hat.chatter - Python chatter library
This library provides a Python implementation of the Chatter communication protocol.
Client
The hat.chatter.connect coroutine creates a client-initiated chatter connection:
# Create a client-initiated chatter connection and return it as a
# `Connection`.
#
# Parameters:
#   sbs_repo      - SBS repository (`sbs.Repository`); presumably used to
#                   encode/decode message data - TODO confirm.
#   address       - address of the remote endpoint, e.g.
#                   'tcp+sbs://127.0.0.1:<port>' as used in the example.
#   pem_file      - optional, keyword-only; presumably a path to a PEM file
#                   used for a secured connection - TODO confirm.
#   ping_timeout  - keyword-only, defaults to 20; presumably seconds -
#                   TODO confirm.
#   queue_maxsize - keyword-only, defaults to 0; presumably the maximum size
#                   of the receive queue - TODO confirm.
async def connect(sbs_repo: sbs.Repository,
                  address: str,
                  *,
                  pem_file: typing.Optional[str] = None,
                  ping_timeout: float = 20,
                  queue_maxsize: int = 0
                  ) -> 'Connection': ...
Server
The hat.chatter.listen coroutine creates a server listening for incoming chatter connections:
# Create a server listening for incoming chatter connections and return it
# as a `Server`.
#
# Parameters:
#   sbs_repo      - SBS repository (`sbs.Repository`); presumably used to
#                   encode/decode message data - TODO confirm.
#   address       - listening address, e.g. 'tcp+sbs://127.0.0.1:<port>'
#                   as used in the example.
#   connection_cb - callback that receives a `Connection`; the example
#                   passes a queue's `put_nowait` and later reads the
#                   server-side connection from that queue.
#   pem_file      - optional, keyword-only; presumably a path to a PEM file
#                   used for secured connections - TODO confirm.
#   ping_timeout  - keyword-only, defaults to 20; presumably seconds -
#                   TODO confirm.
#   queue_maxsize - keyword-only, defaults to 0; presumably the maximum size
#                   of each connection's receive queue - TODO confirm.
async def listen(sbs_repo: sbs.Repository,
                 address: str,
                 connection_cb: typing.Callable[['Connection'], None],
                 *,
                 pem_file: typing.Optional[str] = None,
                 ping_timeout: float = 20,
                 queue_maxsize: int = 0
                 ) -> 'Server': ...
class Server(aio.Resource):
    """Server listening for incoming chatter connections.

    Instances are obtained from the `listen` coroutine.
    """

    # Group associated with this resource (`aio.Group`).
    @property
    def async_group(self) -> aio.Group: ...

    # List of address strings; presumably the addresses the server is
    # listening on - TODO confirm.
    @property
    def addresses(self) -> typing.List[str]: ...
Connection
Once a chatter connection is established, either by calling connect or by accepting an incoming connection with listen, the communication interface is the same on the client side and on the server side:
class Data(typing.NamedTuple):
    """Message data together with the SBS type that describes it."""

    module: typing.Optional[str]
    """SBS module name"""
    type: str
    """SBS type name"""
    # Payload value, typed as `sbs.Data`.
    data: sbs.Data
class Conversation(typing.NamedTuple):
    """Handle identifying a conversation on a connection.

    Returned by `Connection.send` and carried by every `Msg`.
    """

    # Connection this conversation belongs to.
    conn: 'Connection'
    # Presumably True when the local side started the conversation -
    # TODO confirm.
    owner: bool
    # Presumably the identifier of the conversation's first message -
    # TODO confirm.
    first_id: int
class Msg(typing.NamedTuple):
    """Message as returned by `Connection.receive`."""

    # Message data (`Data`).
    data: Data
    # Conversation this message belongs to (`Conversation`).
    conv: Conversation
    # Presumably True when this is the first message of the conversation -
    # TODO confirm.
    first: bool
    # Presumably True when this is the last message of the conversation;
    # mirrors the `last` argument of `Connection.send` - TODO confirm.
    last: bool
    # Mirrors the `token` argument of `Connection.send`; exact protocol
    # meaning is not shown here - TODO confirm.
    token: bool
class Connection(aio.Resource):
    """Single chatter connection.

    The same interface is used on the client side (obtained from `connect`)
    and on the server side (delivered through the `connection_cb` passed to
    `listen`).
    """

    # Group associated with this resource (`aio.Group`).
    @property
    def async_group(self) -> aio.Group: ...

    # Address string of the local endpoint.
    @property
    def local_address(self) -> str: ...

    # Address string of the remote endpoint.
    @property
    def remote_address(self) -> str: ...

    # Wait for and return the next incoming message.
    async def receive(self) -> Msg: ...

    # Send `msg_data` and return the conversation the message belongs to.
    #
    # All arguments after `msg_data` are keyword-only:
    #   conv       - existing conversation to continue; presumably a new
    #                conversation is started when None - TODO confirm.
    #   last       - defaults to True; presumably marks the message as the
    #                last one in its conversation - TODO confirm.
    #   token      - defaults to True; exact protocol meaning is not shown
    #                here - TODO confirm.
    #   timeout    - optional; presumably the number of seconds to wait for
    #                a response - TODO confirm.
    #   timeout_cb - optional callback taking a `Conversation`; presumably
    #                called when `timeout` expires - TODO confirm.
    def send(self,
             msg_data: Data,
             *,
             conv: typing.Optional[Conversation] = None,
             last: bool = True,
             token: bool = True,
             timeout: typing.Optional[float] = None,
             timeout_cb: typing.Optional[typing.Callable[[Conversation],
                                                         None]] = None
             ) -> Conversation: ...
Example
from hat import aio
from hat import chatter
from hat import sbs
from hat import util
# NOTE: this snippet uses top-level `await`, so it has to run inside a
# coroutine (or an asyncio-aware REPL), not as a plain module.

# Build an SBS repository that extends the chatter schemas with an
# `Example` module defining `Msg` as an Integer.
sbs_repo = sbs.Repository(chatter.sbs_repo, r"""
module Example
Msg = Integer
""")
# Pick a free local TCP port and build the chatter address from it.
port = util.get_unused_tcp_port()
address = f'tcp+sbs://127.0.0.1:{port}'
# Connections handed to the server's callback are collected in this queue.
server_conns = aio.Queue()
server = await chatter.listen(sbs_repo, address, server_conns.put_nowait)
# Connect as a client, then fetch the matching server-side connection.
client_conn = await chatter.connect(sbs_repo, address)
server_conn = await server_conns.get()
# Send an `Example.Msg` value from the client ...
data = chatter.Data('Example', 'Msg', 123)
client_conn.send(data)
# ... and check that the server side receives the same data.
msg = await server_conn.receive()
assert msg.data == data
# Close the server, then wait until both connection ends are closed.
await server.async_close()
await client_conn.wait_closed()
await server_conn.wait_closed()