TPKT

Python implementation

Transport protocol on top of TCP
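
TPKT, as specified in RFC 1006, prefixes every packet with a 4-byte header: a version byte (3), a reserved byte (0), and a 16-bit big-endian length that counts the header itself. The framing can be sketched as follows. `encode_tpkt` and `decode_tpkt` are hypothetical helper names used only for illustration and are not part of `hat.drivers.tpkt`.

```python
import struct

TPKT_VERSION = 3          # version byte defined by RFC 1006
HEADER_SIZE = 4           # version + reserved + 2-byte length
MAX_PACKET_SIZE = 0xFFFF  # length field is 16 bits and includes the header


def encode_tpkt(payload):
    """Prefix payload with a TPKT header (hypothetical helper)."""
    payload = bytes(payload)  # accepts bytes, bytearray or memoryview
    length = HEADER_SIZE + len(payload)
    if length > MAX_PACKET_SIZE:
        raise ValueError("payload too large for a single TPKT packet")
    return struct.pack(">BBH", TPKT_VERSION, 0, length) + payload


def decode_tpkt(packet):
    """Return the payload of one complete TPKT packet (hypothetical helper)."""
    packet = bytes(packet)
    if len(packet) < HEADER_SIZE:
        raise ValueError("packet shorter than TPKT header")
    version, _reserved, length = struct.unpack(">BBH", packet[:HEADER_SIZE])
    if version != TPKT_VERSION:
        raise ValueError(f"unsupported TPKT version {version}")
    if length != len(packet):
        raise ValueError("length field does not match packet size")
    return packet[HEADER_SIZE:]
```

For example, `encode_tpkt(b"abc")` yields the 7-byte packet `b"\x03\x00\x00\x07abc"`.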

hat.drivers.tpkt.mlog

module logger

Type

logging.Logger

hat.drivers.tpkt.Data

Data

alias of Union[bytes, bytearray, memoryview]

class hat.drivers.tpkt.Address(host, port)

Bases: tuple

Create new instance of Address(host, port)

host

host name

Type

str

port

TCP port

Type

int

class hat.drivers.tpkt.ConnectionInfo(local_addr, remote_addr)

Bases: tuple

Create new instance of ConnectionInfo(local_addr, remote_addr)

local_addr

local address

Type

Address

remote_addr

remote address

Type

Address
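
Both `Address` and `ConnectionInfo` derive from `tuple`, so they can be unpacked and indexed as well as accessed by field name. The sketch below uses local stand-in classes that mirror the documented fields, assuming named-tuple behaviour. It does not import the real classes, and the example addresses are made up.

```python
from typing import NamedTuple


class Address(NamedTuple):
    """Local stand-in mirroring the documented fields."""
    host: str
    port: int


class ConnectionInfo(NamedTuple):
    """Local stand-in mirroring the documented fields."""
    local_addr: Address
    remote_addr: Address


info = ConnectionInfo(local_addr=Address("127.0.0.1", 50000),
                      remote_addr=Address("127.0.0.1", 102))

# Field access and tuple unpacking both work
host, port = info.remote_addr
local_port = info.local_addr.port
```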

hat.drivers.tpkt.ConnectionCb

Connection callback

alias of Callable[[Connection], Union[None, Awaitable[None]]]
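
Because the alias allows the callback to return either `None` or an awaitable, both plain functions and coroutine functions qualify. One way a caller could support both is sketched below. `call_connection_cb` is a hypothetical helper, a plain string stands in for a `Connection`, and how the library itself invokes the callback is not stated here.

```python
import asyncio
import inspect


async def call_connection_cb(connection_cb, conn):
    """Call a callback that may be sync or async (hypothetical helper)."""
    result = connection_cb(conn)
    # Only await when the callback actually returned an awaitable
    if inspect.isawaitable(result):
        await result


seen = []


def sync_cb(conn):
    seen.append(("sync", conn))


async def async_cb(conn):
    await asyncio.sleep(0)
    seen.append(("async", conn))


async def demo():
    # Plain strings stand in for Connection objects
    await call_connection_cb(sync_cb, "conn-1")
    await call_connection_cb(async_cb, "conn-2")


asyncio.run(demo())
```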

async hat.drivers.tpkt.connect(addr)

Create new TPKT connection

Parameters

addr (Address) – remote address

Returns

Connection

async hat.drivers.tpkt.listen(connection_cb, addr=Address(host='0.0.0.0', port=102))

Create new TPKT listening server

Parameters
  • connection_cb (ConnectionCb) – new connection callback

  • addr (Address) – local listening address

Returns

Server

class hat.drivers.tpkt.Server

Bases: object

TPKT listening server

To create a new instance, see listen()

property addresses

listening addresses

Type

List[Address]

property closed

closed future

Type

asyncio.Future

async async_close()

Close listening socket

Calling this method doesn’t close active incoming connections

class hat.drivers.tpkt.Connection

Bases: object

TPKT connection

To create a new instance, see connect()

property info

connection info

Type

ConnectionInfo

property closed

closed future

Type

asyncio.Future

async async_close()

Close the connection

async read()

Read data

Returns

Data
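
Reading one TPKT packet from a byte stream means reading the 4-byte header first and then exactly as many payload bytes as the length field announces. The sketch below does this with a plain `asyncio.StreamReader`. `read_tpkt` is a hypothetical helper, and whether `Connection.read()` is implemented this way is an assumption.

```python
import asyncio
import struct


async def read_tpkt(reader):
    """Read one TPKT packet payload from a StreamReader (hypothetical helper)."""
    header = await reader.readexactly(4)
    version, _reserved, length = struct.unpack(">BBH", header)
    if version != 3:
        raise ValueError(f"unsupported TPKT version {version}")
    if length < 4:
        raise ValueError("length field smaller than header size")
    # The length field includes the 4 header bytes
    return await reader.readexactly(length - 4)


async def demo():
    reader = asyncio.StreamReader()
    # Two back-to-back packets with payloads b"hi" and b"!"
    reader.feed_data(b"\x03\x00\x00\x06hi\x03\x00\x00\x05!")
    reader.feed_eof()
    return [await read_tpkt(reader), await read_tpkt(reader)]


packets = asyncio.run(demo())
```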

write(data)

Write data

Parameters

data (Data) – data to write
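
The calls documented above can be combined into a single echo round trip, sketched below under a few assumptions: the package is installed and importable as `hat.drivers.tpkt`, port 10102 is an arbitrary unprivileged port chosen for the example (the default port 102 usually requires elevated privileges), and `echo_once` is a hypothetical function name.

```python
import asyncio


async def echo_once(port=10102):
    # Imported lazily so the sketch can be loaded without the package;
    # actually running it requires hat.drivers.tpkt to be installed.
    from hat.drivers import tpkt

    server_conns = []

    async def on_connection(conn):
        # Keep a reference so the connection can be closed explicitly later
        server_conns.append(conn)
        data = await conn.read()
        conn.write(data)

    addr = tpkt.Address("127.0.0.1", port)
    server = await tpkt.listen(on_connection, addr)
    conn = await tpkt.connect(addr)
    try:
        conn.write(b"ping")
        return await conn.read()
    finally:
        await conn.async_close()
        # Closing the server does not close accepted connections,
        # so they are closed here one by one.
        for server_conn in server_conns:
            await server_conn.async_close()
        await server.async_close()
```

With the package installed, `asyncio.run(echo_once())` would run the round trip.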