hat.drivers.iec104 - IEC 60870-5-104

Asyncio based communication driver for IEC 60870-5-104 protocol.

class Address(typing.NamedTuple):
    host: str
    port: int = 2404

class ConnectionInfo(typing.NamedTuple):
    local_addr: Address
    remote_addr: Address

async def connect(addr: Address,
                  interrogate_cb: typing.Optional[InterrogateCb] = None,
                  counter_interrogate_cb: typing.Optional[CounterInterrogateCb] = None,
                  command_cb: typing.Optional[CommandCb] = None,
                  response_timeout: float = 15,
                  supervisory_timeout: float = 10,
                  test_timeout: float = 20,
                  send_window_size: int = 12,
                  receive_window_size: int = 8
                  ) -> 'Connection': ...

async def listen(connection_cb: ConnectionCb,
                 addr: Address = Address('0.0.0.0'),
                 interrogate_cb: typing.Optional[InterrogateCb] = None,
                 counter_interrogate_cb: typing.Optional[CounterInterrogateCb] = None,
                 command_cb: typing.Optional[CommandCb] = None,
                 response_timeout: float = 15,
                 supervisory_timeout: float = 10,
                 test_timeout: float = 20,
                 send_window_size: int = 12,
                 receive_window_size: int = 8
                 ) -> 'Server': ...

class Server(aio.Resource):

    @property
    def async_group(self) -> aio.Group: ...

    @property
    def addresses(self) -> typing.List[Address]: ...

class Connection(aio.Resource):

    @property
    def async_group(self) -> aio.Group: ...

    @property
    def info(self) -> ConnectionInfo: ...

    def notify_data_change(self, data: typing.List[common.Data]): ...

    async def interrogate(self,
                          asdu_address: int = 0xFFFF
                          ) -> typing.List[common.Data]: ...

    async def counter_interrogate(self,
                                  asdu_address: int = 0xFFFF,
                                  freeze: common.FreezeCode = common.FreezeCode.READ
                                  ) -> typing.List[common.Data]: ...

    async def receive(self) -> typing.List[common.Data]: ...

    async def send_command(self,
                           cmd: common.Command
                           ) -> bool: ...

Example usage:

addr = iec104.Address('127.0.0.1', util.get_unused_tcp_port())
conn2_future = asyncio.Future()
srv = await iec104.listen(conn2_future.set_result, addr)
conn1 = await iec104.connect(addr)
conn2 = await conn2_future

data = iec104.Data(value=iec104.SingleValue.ON,
                   quality=iec104.Quality(invalid=False,
                                          not_topical=False,
                                          substituted=False,
                                          blocked=False,
                                          overflow=False),
                   time=None,
                   asdu_address=123,
                   io_address=321,
                   cause=iec104.Cause.SPONTANEOUS,
                   is_test=False)

# send data from conn1 to conn2
conn1.notify_data_change([data])
result = await conn2.receive()
assert result == [data]

# send data from conn2 to conn1
conn2.notify_data_change([data])
result = await conn1.receive()
assert result == [data]

await conn1.async_close()
await conn2.async_close()
await srv.async_close()

API

API reference is available as part of generated documentation: