Module hat.event.client
Library used by components for communication with Event Server
This module provides low-level interface (connect/Client) and high-level interface (run_client) for communication with Event Server.
:func:connect()
is used for establishing single chatter based connection
with Event Server which is represented by :class:Client
. Once connection
is terminated (signaled with :meth:Client.closed
), it is up to user to
repeat :func:connect()
call and create new :class:Client
instance, if
additional communication with Event Server is required.
Example of low-level interface usage::
client = await hat.event.client.connect('tcp+sbs://127.0.0.1:23012',
[['x', 'y', 'z']])
registered_events = await client.register_with_response([
hat.event.common.RegisterEvent(
event_type=['x', 'y', 'z'],
source_timestamp=hat.event.common.now(),
payload=hat.event.common.EventPayload(
type=hat.event.common.EventPayloadType.BINARY,
data=b'test'))])
received_events = await client.receive()
queried_events = await client.query(
hat.event.common.QueryData(
event_types=[['x', 'y', 'z']],
max_results=1))
assert registered_events == received_events
assert received_events == queried_events
await client.async_close()
:func:run_client()
provides high-level interface for continuous communication
with currenty active Event Server based on information obtained from Monitor
Server. This function repeatedly tries to create active connection with
Event Server. When this connection is created, users code is notified by
calling async_run_cb
callback. Once connection is closed, execution of
async_run_cb
is cancelled and :func:run_client()
repeats connection
estabishment process.
Example of high-level interface usage::
async def monitor_async_run(component):
await hat.event.client.run_client(
monitor_client=monitor,
server_group='event servers',
async_run_cb=event_async_run])
async def event_async_run(client):
while True:
assert not client.is_closed
await asyncio.sleep(10)
monitor = await hat.monitor.client.connect({
'name': 'client',
'group': 'test clients',
'monitor_address': 'tcp+sbs://127.0.0.1:23010',
'component_address': None})
component = hat.monitor.client.Component(monitor, monitor_async_run)
component.set_enabled(True)
await monitor.async_close()
Expand source code
"""Library used by components for communication with Event Server
This module provides low-level interface (connect/Client) and high-level
interface (run_client) for communication with Event Server.
:func:`connect` is used for establishing single chatter based connection
with Event Server which is represented by :class:`Client`. Once connection
is terminated (signaled with :meth:`Client.closed`), it is up to user to
repeat :func:`connect` call and create new :class:`Client` instance, if
additional communication with Event Server is required.
Example of low-level interface usage::
client = await hat.event.client.connect('tcp+sbs://127.0.0.1:23012',
[['x', 'y', 'z']])
registered_events = await client.register_with_response([
hat.event.common.RegisterEvent(
event_type=['x', 'y', 'z'],
source_timestamp=hat.event.common.now(),
payload=hat.event.common.EventPayload(
type=hat.event.common.EventPayloadType.BINARY,
data=b'test'))])
received_events = await client.receive()
queried_events = await client.query(
hat.event.common.QueryData(
event_types=[['x', 'y', 'z']],
max_results=1))
assert registered_events == received_events
assert received_events == queried_events
await client.async_close()
:func:`run_client` provides high-level interface for continuous communication
with currenty active Event Server based on information obtained from Monitor
Server. This function repeatedly tries to create active connection with
Event Server. When this connection is created, users code is notified by
calling `async_run_cb` callback. Once connection is closed, execution of
`async_run_cb` is cancelled and :func:`run_client` repeats connection
estabishment process.
Example of high-level interface usage::
async def monitor_async_run(component):
await hat.event.client.run_client(
monitor_client=monitor,
server_group='event servers',
async_run_cb=event_async_run])
async def event_async_run(client):
while True:
assert not client.is_closed
await asyncio.sleep(10)
monitor = await hat.monitor.client.connect({
'name': 'client',
'group': 'test clients',
'monitor_address': 'tcp+sbs://127.0.0.1:23010',
'component_address': None})
component = hat.monitor.client.Component(monitor, monitor_async_run)
component.set_enabled(True)
await monitor.async_close()
"""
import asyncio
import contextlib
import logging
import typing
from hat import aio
from hat import chatter
from hat import util
from hat.event import common
import hat.monitor.client
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""
reconnect_delay: float = 0.5
"""Delay in seconds before trying to reconnect to event server
(used in high-level interface)"""
async def connect(address: str,
subscriptions: typing.List[common.EventType] = [],
**kwargs
) -> 'Client':
"""Connect to event server
For address format see `hat.chatter.connect` coroutine.
According to Event Server specification, each subscription is event
type identifier which can contain special subtypes ``?`` and ``*``.
Subtype ``?`` can occure at any position inside event type identifier
and is used as replacement for any single subtype. Subtype ``*`` is valid
only as last subtype in event type identifier and is used as replacement
for zero or more arbitrary subtypes.
If subscription is empty list, client doesn't subscribe for any events and
will not receive server's notifications.
Args:
address: event server's address
subscriptions: subscriptions
kwargs: additional arguments passed to `hat.chatter.connect` coroutine
"""
client = Client()
client._async_group = aio.Group()
client._conv_futures = {}
client._event_queue = aio.Queue()
client._conn = await chatter.connect(common.sbs_repo, address, **kwargs)
client._async_group.spawn(aio.call_on_cancel, client._conn.async_close)
if subscriptions:
client._conn.send(chatter.Data(module='HatEvent',
type='MsgSubscribe',
data=[list(i) for i in subscriptions]))
client._async_group.spawn(client._receive_loop)
return client
class Client(aio.Resource):
"""Event Server client
For creating new client see `connect` coroutine.
"""
@property
def async_group(self) -> aio.Group:
"""Async group"""
return self._async_group
async def receive(self) -> typing.List[common.Event]:
"""Receive subscribed event notifications
Raises:
ConnectionError
"""
try:
return await self._event_queue.get()
except aio.QueueClosedError:
raise ConnectionError()
def register(self, events: typing.List[common.RegisterEvent]):
"""Register events
Raises:
ConnectionError
"""
msg_data = chatter.Data(module='HatEvent',
type='MsgRegisterReq',
data=[common.register_event_to_sbs(i)
for i in events])
self._conn.send(msg_data)
async def register_with_response(self,
events: typing.List[common.RegisterEvent]
) -> typing.List[typing.Optional[common.Event]]: # NOQA
"""Register events
Each `common.RegisterEvent` from `events` is paired with results
`common.Event` if new event was successfuly created or ``None`` is new
event could not be created.
Raises:
ConnectionError
"""
msg_data = chatter.Data(module='HatEvent',
type='MsgRegisterReq',
data=[common.register_event_to_sbs(i)
for i in events])
conv = self._conn.send(msg_data, last=False)
return await self._wait_conv_res(conv)
async def query(self,
data: common.QueryData
) -> typing.List[common.Event]:
"""Query events from server
Raises:
ConnectionError
"""
msg_data = chatter.Data(module='HatEvent',
type='MsgQueryReq',
data=common.query_to_sbs(data))
conv = self._conn.send(msg_data, last=False)
return await self._wait_conv_res(conv)
async def _receive_loop(self):
mlog.debug("starting receive loop")
try:
while True:
mlog.debug("waiting for incoming message")
msg = await self._conn.receive()
msg_type = msg.data.module, msg.data.type
if msg_type == ('HatEvent', 'MsgNotify'):
mlog.debug("received event notification")
self._process_msg_notify(msg)
elif msg_type == ('HatEvent', 'MsgQueryRes'):
mlog.debug("received query response")
self._process_msg_query_res(msg)
elif msg_type == ('HatEvent', 'MsgRegisterRes'):
mlog.debug("received register response")
self._process_msg_register_res(msg)
else:
raise Exception("unsupported message type")
except ConnectionError:
pass
except Exception as e:
mlog.error("read loop error: %s", e, exc_info=e)
finally:
mlog.debug("stopping receive loop")
self._async_group.close()
self._event_queue.close()
for f in self._conv_futures.values():
f.set_exception(ConnectionError())
async def _wait_conv_res(self, conv):
if not self.is_open:
raise ConnectionError()
response_future = asyncio.Future()
self._conv_futures[conv] = response_future
try:
return await response_future
finally:
self._conv_futures.pop(conv, None)
def _process_msg_notify(self, msg):
events = [common.event_from_sbs(e) for e in msg.data.data]
self._event_queue.put_nowait(events)
def _process_msg_query_res(self, msg):
f = self._conv_futures.get(msg.conv)
if not f or f.done():
return
events = [common.event_from_sbs(e) for e in msg.data.data]
f.set_result(events)
def _process_msg_register_res(self, msg):
f = self._conv_futures.get(msg.conv)
if not f or f.done():
return
events = [common.event_from_sbs(e) if t == 'event' else None
for t, e in msg.data.data]
f.set_result(events)
async def run_client(monitor_client: hat.monitor.client.Client,
server_group: str,
async_run_cb: typing.Callable[[Client],
typing.Awaitable[None]],
subscriptions: typing.List[common.EventType] = []
) -> typing.Any:
"""Continuously communicate with currently active Event Server
This function tries to establish active connection with Event Server
within monitor component group `server_group`. Once this connection is
established, `async_run_cb` is called with currently active `Client`
instance. Once connection to Event Server is closed or new active Event
Server is detected, execution of `async_run_cb` is canceled. If new
connection to Event Server is successfully established,
`async_run_cb` is called with new instance of `Client`.
`async_run_cb` is called when:
* new active `Client` is created
`async_run_cb` execution is cancelled when:
* `run_client` finishes execution
* connection to Event Server is closed
* different active Event Server is detected from Monitor Server's list
of components
`run_client` finishes execution when:
* connection to Monitor Server is closed
* `async_run_cb` finishes execution (by returning value or raising
exception, other than `asyncio.CancelledError`)
Return value of this function is the same as return value of
`async_run_cb`. If `async_run_cb` finishes by raising exception or if
connection to Monitor Server is closed, ConnectionError is raised.
If execution of `run_client` is canceled while `async_run_cb` is
running, connection to event server is closed after `async_run_cb`
cancellation finishes.
"""
async_group = aio.Group()
address_queue = aio.Queue()
async_group.spawn(aio.call_on_done, monitor_client.wait_closing(),
address_queue.close)
async_group.spawn(_address_loop, monitor_client, server_group,
address_queue)
address = None
try:
while True:
while not address:
address = await address_queue.get_until_empty()
async with async_group.create_subgroup() as subgroup:
address_future = subgroup.spawn(address_queue.get_until_empty)
client_future = subgroup.spawn(_client_loop, address,
subscriptions, async_run_cb)
await asyncio.wait([address_future, client_future],
return_when=asyncio.FIRST_COMPLETED)
if address_future.done():
address = address_future.result()
else:
return client_future.result()
except aio.QueueClosedError:
raise ConnectionError()
finally:
await aio.uncancellable(async_group.async_close())
async def _address_loop(monitor_client, server_group, address_queue):
last_address = None
changes = aio.Queue()
with monitor_client.register_change_cb(lambda: changes.put_nowait(None)):
while True:
info = util.first(monitor_client.components, lambda c: (
c.group == server_group and
c.blessing is not None and
c.blessing == c.ready))
address = info.address if info else None
if address != last_address and not address_queue.is_closed:
mlog.debug("new server address: %s", address)
last_address = address
address_queue.put_nowait(address)
await changes.get()
async def _client_loop(address, subscriptions, async_run_cb):
while True:
async_group = aio.Group()
try:
mlog.debug("connecting to server %s", address)
try:
client = await connect(address, subscriptions)
except Exception as e:
mlog.warning("error connecting to server: %s", e, exc_info=e)
await asyncio.sleep(reconnect_delay)
continue
mlog.debug("connected to server - running async_run_cb")
async_group.spawn(aio.call_on_cancel, client.async_close)
async_group.spawn(aio.call_on_done, client.wait_closing(),
async_group.close)
async with async_group.create_subgroup() as subgroup:
run_future = subgroup.spawn(async_run_cb, client)
await asyncio.wait([run_future])
with contextlib.suppress(asyncio.CancelledError):
return run_future.result()
finally:
await aio.uncancellable(async_group.async_close())
mlog.debug("connection to server closed")
await asyncio.sleep(reconnect_delay)
Global variables
var mlog : logging.Logger
-
Module logger
var reconnect_delay : float
-
Delay in seconds before trying to reconnect to event server (used in high-level interface)
Functions
async def connect(address: str, subscriptions: typing.List[typing.Tuple[str, ...]] = [], **kwargs) ‑> Client
-
Connect to event server
For address format see
connect()
coroutine.According to Event Server specification, each subscription is event type identifier which can contain special subtypes
?
and*
. Subtype?
can occure at any position inside event type identifier and is used as replacement for any single subtype. Subtype*
is valid only as last subtype in event type identifier and is used as replacement for zero or more arbitrary subtypes.If subscription is empty list, client doesn't subscribe for any events and will not receive server's notifications.
Args
address
- event server's address
subscriptions
- subscriptions
kwargs
- additional arguments passed to
connect()
coroutine
Expand source code
async def connect(address: str, subscriptions: typing.List[common.EventType] = [], **kwargs ) -> 'Client': """Connect to event server For address format see `hat.chatter.connect` coroutine. According to Event Server specification, each subscription is event type identifier which can contain special subtypes ``?`` and ``*``. Subtype ``?`` can occure at any position inside event type identifier and is used as replacement for any single subtype. Subtype ``*`` is valid only as last subtype in event type identifier and is used as replacement for zero or more arbitrary subtypes. If subscription is empty list, client doesn't subscribe for any events and will not receive server's notifications. Args: address: event server's address subscriptions: subscriptions kwargs: additional arguments passed to `hat.chatter.connect` coroutine """ client = Client() client._async_group = aio.Group() client._conv_futures = {} client._event_queue = aio.Queue() client._conn = await chatter.connect(common.sbs_repo, address, **kwargs) client._async_group.spawn(aio.call_on_cancel, client._conn.async_close) if subscriptions: client._conn.send(chatter.Data(module='HatEvent', type='MsgSubscribe', data=[list(i) for i in subscriptions])) client._async_group.spawn(client._receive_loop) return client
async def run_client(monitor_client: Client, server_group: str, async_run_cb: typing.Callable[[Client], typing.Awaitable[NoneType]], subscriptions: typing.List[typing.Tuple[str, ...]] = []) ‑> typing.Any
-
Continuously communicate with currently active Event Server
This function tries to establish active connection with Event Server within monitor component group
server_group
. Once this connection is established,async_run_cb
is called with currently activeClient
instance. Once connection to Event Server is closed or new active Event Server is detected, execution ofasync_run_cb
is canceled. If new connection to Event Server is successfully established,async_run_cb
is called with new instance ofClient
.async_run_cb
is called when: * new activeClient
is createdasync_run_cb
execution is cancelled when: *run_client()
finishes execution * connection to Event Server is closed * different active Event Server is detected from Monitor Server's list of componentsrun_client()
finishes execution when: * connection to Monitor Server is closed *async_run_cb
finishes execution (by returning value or raising exception, other thanasyncio.CancelledError
)Return value of this function is the same as return value of
async_run_cb
. Ifasync_run_cb
finishes by raising exception or if connection to Monitor Server is closed, ConnectionError is raised.If execution of
run_client()
is canceled whileasync_run_cb
is running, connection to event server is closed afterasync_run_cb
cancellation finishes.Expand source code
async def run_client(monitor_client: hat.monitor.client.Client, server_group: str, async_run_cb: typing.Callable[[Client], typing.Awaitable[None]], subscriptions: typing.List[common.EventType] = [] ) -> typing.Any: """Continuously communicate with currently active Event Server This function tries to establish active connection with Event Server within monitor component group `server_group`. Once this connection is established, `async_run_cb` is called with currently active `Client` instance. Once connection to Event Server is closed or new active Event Server is detected, execution of `async_run_cb` is canceled. If new connection to Event Server is successfully established, `async_run_cb` is called with new instance of `Client`. `async_run_cb` is called when: * new active `Client` is created `async_run_cb` execution is cancelled when: * `run_client` finishes execution * connection to Event Server is closed * different active Event Server is detected from Monitor Server's list of components `run_client` finishes execution when: * connection to Monitor Server is closed * `async_run_cb` finishes execution (by returning value or raising exception, other than `asyncio.CancelledError`) Return value of this function is the same as return value of `async_run_cb`. If `async_run_cb` finishes by raising exception or if connection to Monitor Server is closed, ConnectionError is raised. If execution of `run_client` is canceled while `async_run_cb` is running, connection to event server is closed after `async_run_cb` cancellation finishes. """ async_group = aio.Group() address_queue = aio.Queue() async_group.spawn(aio.call_on_done, monitor_client.wait_closing(), address_queue.close) async_group.spawn(_address_loop, monitor_client, server_group, address_queue) address = None try: while True: while not address: address = await address_queue.get_until_empty() async with async_group.create_subgroup() as subgroup: address_future = subgroup.spawn(address_queue.get_until_empty) client_future = subgroup.spawn(_client_loop, address, subscriptions, async_run_cb) await asyncio.wait([address_future, client_future], return_when=asyncio.FIRST_COMPLETED) if address_future.done(): address = address_future.result() else: return client_future.result() except aio.QueueClosedError: raise ConnectionError() finally: await aio.uncancellable(async_group.async_close())
Classes
class Client
-
Event Server client
For creating new client see
connect()
coroutine.Expand source code
class Client(aio.Resource): """Event Server client For creating new client see `connect` coroutine. """ @property def async_group(self) -> aio.Group: """Async group""" return self._async_group async def receive(self) -> typing.List[common.Event]: """Receive subscribed event notifications Raises: ConnectionError """ try: return await self._event_queue.get() except aio.QueueClosedError: raise ConnectionError() def register(self, events: typing.List[common.RegisterEvent]): """Register events Raises: ConnectionError """ msg_data = chatter.Data(module='HatEvent', type='MsgRegisterReq', data=[common.register_event_to_sbs(i) for i in events]) self._conn.send(msg_data) async def register_with_response(self, events: typing.List[common.RegisterEvent] ) -> typing.List[typing.Optional[common.Event]]: # NOQA """Register events Each `common.RegisterEvent` from `events` is paired with results `common.Event` if new event was successfuly created or ``None`` is new event could not be created. Raises: ConnectionError """ msg_data = chatter.Data(module='HatEvent', type='MsgRegisterReq', data=[common.register_event_to_sbs(i) for i in events]) conv = self._conn.send(msg_data, last=False) return await self._wait_conv_res(conv) async def query(self, data: common.QueryData ) -> typing.List[common.Event]: """Query events from server Raises: ConnectionError """ msg_data = chatter.Data(module='HatEvent', type='MsgQueryReq', data=common.query_to_sbs(data)) conv = self._conn.send(msg_data, last=False) return await self._wait_conv_res(conv) async def _receive_loop(self): mlog.debug("starting receive loop") try: while True: mlog.debug("waiting for incoming message") msg = await self._conn.receive() msg_type = msg.data.module, msg.data.type if msg_type == ('HatEvent', 'MsgNotify'): mlog.debug("received event notification") self._process_msg_notify(msg) elif msg_type == ('HatEvent', 'MsgQueryRes'): mlog.debug("received query response") self._process_msg_query_res(msg) elif msg_type == ('HatEvent', 'MsgRegisterRes'): mlog.debug("received register response") self._process_msg_register_res(msg) else: raise Exception("unsupported message type") except ConnectionError: pass except Exception as e: mlog.error("read loop error: %s", e, exc_info=e) finally: mlog.debug("stopping receive loop") self._async_group.close() self._event_queue.close() for f in self._conv_futures.values(): f.set_exception(ConnectionError()) async def _wait_conv_res(self, conv): if not self.is_open: raise ConnectionError() response_future = asyncio.Future() self._conv_futures[conv] = response_future try: return await response_future finally: self._conv_futures.pop(conv, None) def _process_msg_notify(self, msg): events = [common.event_from_sbs(e) for e in msg.data.data] self._event_queue.put_nowait(events) def _process_msg_query_res(self, msg): f = self._conv_futures.get(msg.conv) if not f or f.done(): return events = [common.event_from_sbs(e) for e in msg.data.data] f.set_result(events) def _process_msg_register_res(self, msg): f = self._conv_futures.get(msg.conv) if not f or f.done(): return events = [common.event_from_sbs(e) if t == 'event' else None for t, e in msg.data.data] f.set_result(events)
Ancestors
- Resource
- abc.ABC
Instance variables
var async_group : Group
-
Async group
Expand source code
@property def async_group(self) -> aio.Group: """Async group""" return self._async_group
Methods
async def query(self, data: QueryData) ‑> typing.List[Event]
-
Query events from server
Raises
ConnectionError
Expand source code
async def query(self, data: common.QueryData ) -> typing.List[common.Event]: """Query events from server Raises: ConnectionError """ msg_data = chatter.Data(module='HatEvent', type='MsgQueryReq', data=common.query_to_sbs(data)) conv = self._conn.send(msg_data, last=False) return await self._wait_conv_res(conv)
async def receive(self) ‑> typing.List[Event]
-
Receive subscribed event notifications
Raises
ConnectionError
Expand source code
async def receive(self) -> typing.List[common.Event]: """Receive subscribed event notifications Raises: ConnectionError """ try: return await self._event_queue.get() except aio.QueueClosedError: raise ConnectionError()
def register(self, events: typing.List[RegisterEvent])
-
Register events
Raises
ConnectionError
Expand source code
def register(self, events: typing.List[common.RegisterEvent]): """Register events Raises: ConnectionError """ msg_data = chatter.Data(module='HatEvent', type='MsgRegisterReq', data=[common.register_event_to_sbs(i) for i in events]) self._conn.send(msg_data)
async def register_with_response(self, events: typing.List[RegisterEvent]) ‑> typing.List[typing.Union[Event, NoneType]]
-
Register events
Each
common.RegisterEvent
fromevents
is paired with resultscommon.Event
if new event was successfuly created orNone
is new event could not be created.Raises
ConnectionError
Expand source code
async def register_with_response(self, events: typing.List[common.RegisterEvent] ) -> typing.List[typing.Optional[common.Event]]: # NOQA """Register events Each `common.RegisterEvent` from `events` is paired with results `common.Event` if new event was successfuly created or ``None`` is new event could not be created. Raises: ConnectionError """ msg_data = chatter.Data(module='HatEvent', type='MsgRegisterReq', data=[common.register_event_to_sbs(i) for i in events]) conv = self._conn.send(msg_data, last=False) return await self._wait_conv_res(conv)
Inherited members