Event Server

Event Server is a central component responsible for registering, processing, storing and providing access to events.

Running

By installing Event Server from hat-event package, executable hat-event becomes available and can be used for starting this component.

usage: hat-event [-h] [--conf path]
                 [--additional-json-schemas-path [path [path ...]]]

optional arguments:
  -h, --help            show this help message and exit
  --conf path           configuration defined by hat://event/main.yaml#
                        (default $XDG_CONFIG_HOME/hat/event.yaml)
  --additional-json-schemas-path [path [path ...]]
                        additional json schemas paths

Event

Event is a generic data structure used for communication and storage of relevant changes happening through time in a Hat system. For example, event can represent a data change triggered by a device outside of the system, or an inner notification created by a component inside the system. Each event is immutable and uniquely identified by its event id. Event Server is the only component responsible for creating events - all other components shall request Event Server to create a new event.

Event data structure:

  • id

    A unique Event identifier containing Event Server identifier and event instance identifier. This property is always set by the server.

  • type

    Type is a user (client) defined list of strings. Semantics of the list’s elements and their position in the list is determined by the user and is not predefined by the Event server. This list should be used as the main identifier of the occurred event’s type. Each component registering an event should have its own naming convention defined which does not collide with other components’ naming conventions. This property is set by the user while registering an event. Subtypes ? and * are not allowed as parts of event type.

    When used in querying and subscription, this property has additional semantics. Any string in the list can be replaced with ? while the last string can also be replaced with *. Replacements must substitute an entire string in the list. The semantics of these replacements are:

    • The string ?

      is matched with a single arbitrary string.

    • The string *

      is matched with any number (zero or more) of arbitrary strings.

  • timestamp

    This property determines the moment when the event was registered on the server. It is always set by the server.

  • source timestamp

    This property is optional. It represents the moment the Event occurred as detected by the source of the Event. It is always set by the client.

  • payload

    This property is optional. It can be used to provide additional data bound to an event. The payload property is always set by the client registering the event. Its contents are determined by the Event’s type and can be decoded by clients who understand Events of that type. Payload can be encoded as binary data, JSON data or SBS encoded data. Event server doesn’t decode payload while receiving event requests, storing events or providing query results. Payload can be optionally decoded by Event Server’s modules.

Server - Client communication

Communication between server and client is based on chatter communication utilizing following messages:

Message

Conversation

Direction

First

Last

Token

MsgSubscribe

T

T

T

c > s

MsgNotify

T

T

T

s > c

MsgRegisterReq

T

T/F

T

c > s

MsgRegisterRes

F

T

T

s > c

MsgQueryReq

T

F

T

c > s

MsgQueryRes

F

T

T

s > c

Actions available to clients which are directly mapped to exchange of communication messages:

  • subscribe

    A client can, at any time, subscribe to Events of certain types by sending a subscribe message (MsgSubscribe) to the server. After server receives subscribe message, it will spontaneously notify the client (by sending MsgNotify) whenever an Event occurs with the type matched to any type in the subscription message. Matching is done as described in event’s type property including the ? and * options. A client can send as many subscribe messages as it wants, each new subscription message implicitly invalidates previous subscriptions. Initially, after new connection between server and client is established, client isn’t subscribed to any events. Both subscribe and notify messages can be sent at any time independently of other communication messages. Events that are notified by single MsgNotify are mutually unrelated.

  • register event

    A client can, at any time, send new request for event registration. Those register requests are sent as part of MsgRegisterReq message. Single MsgRegisterReq may contain an arbitrary number of registration requests which are all registered at the same time. Single register event contains event type; and optional source timestamp and payload. Upon receiving MsgRegisterReq, it is responsibility of a server to create new event for each register event. All events created based on a single MsgRegisterReq have the same timestamp. If a client doesn’t end chatter conversation (MsgRegisterReq last flag is false), once associated events are created server will respond with MsgRegisterRes and end conversation. For each register event in MsgRegisterReq, associated MsgRegisterRes contains newly created event, or information about event registration failure.

  • query events

    At any time, client can initiate new event query by sending MsgQueryReq message. Upon receiving query request, server will provide all available events that match query criteria as part of single MsgQueryRes. Single query request can contain multiple filter conditions which ALL must be met for all events provided to client as query result. Query request contains:

    • ids - optional filter condition

      If set, only events with ids which are defined as part of filter condition are matched.

    • types - optional filter condition

      List of event types. If set, event type has to match at least one type from the list. Matching is done as defined in event’s type property description - including the ? and * options.

    • from timestamp - optional filter condition

      If set, only events with timestamp greater than or equal are matched.

    • to timestamp - optional filter condition

      If set, only events with timestamp lower than or equal are matched.

    • from source timestamp - optional filter condition

      If set, only events with source timestamp defined, and greater than or equal, are matched.

    • to source timestamp - optional filter condition

      If set, only events with source timestamp defined, and lower than or equal, are matched.

    • payload - optional filter condition

      If set, only events with payload defined and whose payload is the same as the query’s payload are matched.

    • order

      Can be set to ‘ascending’ or ‘descending’. If set to ‘ascending’, matched Events will be returned ordered from the earliest to the latest dependent on their timestamp or source timestamp (this choice is determined by the order by property of the query). Earliest meaning lower timestamp, latest meaning greater timestamp. If set to descending the same logic applies, but the order is reversed.

    • order by

      Can be set to ‘timestamp’ or ‘source timestamp’. Ordering Events by ‘source timestamp’ has events with ‘source timestamp’ undefined returned last in an arbitrary order.

    • unique type

      If set to true, it determines whether the matched Events will contain only one event instance of the same type. With the query ‘order’ set to ‘descending’, only one Event with the greatest timestamp or source timestamp will be matched. Setting the ‘order’ to ‘ascending’ will match the Event with the lowest timestamp or source timestamp.

    • max results

      If set, limits the number of matched Events to this number. Matched Events are dependent on the query ‘order’ the same way as in ‘unique type’.

Server - Server communication

Todo

define backend engine sync messages

Components

Event Server functionality can be defined by using the following components:

folder "Component 1" <<Component>> {
    component "Event Client" as Client1
}

folder "Component 2" <<Component>> {
    component " Event Client" as Client2
}

folder "Event Server" {
    component Communication
    component "Module Engine" as ModuleEngine
    component "Generic Module 1" <<Module>> as Module1
    component "Generic Module 2" <<Module>> as Module2
    component "Specialized Module Engine" <<Module>> as SpecModuleEngine
    component "Specialized Module 1" <<Specialized Module>> as SpecModule1
    component "Specialized Module 2" <<Specialized Module>> as SpecModule2
    component "Backend Engine" as BackendEngine
    component "Backend" as Backend
    component "Backend 1" <<Backend>> as Backend1
    component "Backend 2" <<Backend>> as Backend2

    interface subscribe
    interface notify
    interface register
    interface query
}

folder "Remote Event Server" {
    component "Backend Engine" as RemoteBackendEngine
}

database "Database 1" <<Database>> as Database1
database "Database 2" <<Database>> as Database2

Communication -- subscribe
Communication -- notify
Communication -- register
Communication -- query

subscribe <-- Client1
notify --> Client1
register <-- Client1
query <-- Client1

subscribe <-- Client2
notify --> Client2
register <-- Client2
query <-- Client2

ModuleEngine <-> Communication
ModuleEngine --> BackendEngine

Module1 --o ModuleEngine
Module2 --o ModuleEngine
SpecModuleEngine --o ModuleEngine
SpecModule1 --o SpecModuleEngine
SpecModule2 --o SpecModuleEngine

BackendEngine o-- Backend
Backend <|-- Backend1
Backend <|-- Backend2

Backend1 --> Database1
Backend2 --> Database2

RemoteBackendEngine <--> BackendEngine

Client

Event client is any component that provides client functionality in Server - Client communication. Package hat-event provides python implementation of hat.event.client module which can be used as a basis for communication with Event Server. This module provides low-level and high-level communication API. For more detail see documentation of hat.event.client module.

Communication

Event Server’s communication module is responsible for providing implementation of server side Server - Client communication. This component translates client requests to module engine’s method calls. At the same time, it observes all new event notifications made by module engine and notifies clients with appropriate messages.

RegisterEvent objects obtained from client’s register requests must be converted to ProcessEvent before they can be passed for further processing to module engine. This conversion is done by module engine, as it is the only entity responsible for creating new ProcessEvent objects.

A unique identifier is assigned to each chatter connection established with communication (unique for the single execution lifetime of Event Server process). This identifier is associated with all ProcessEvent objects obtained from corresponding connection.

Communication associates connection with information received as part of connection’s last subscribe message. This subscription is used as a filter for selecting subset of event notifications which are sent to associated connection.

Communication module is responsible for registering events each time new chatter connection is established and existing chatter connection is closed:

  • ‘event’, ‘communication’, ‘connected’

    • source timestamp - None

    • payload - None

  • ‘event’, ‘communication’, ‘disconnected’

    • source timestamp - None

    • payload - None

Module engine

Module engine is responsible for creating modules and coordinating event registration, processing and querying between communication, modules and backend engine.

Module engine provides method for creating process events utilized by communication and modules. By creating process events, register events are enhanced with unique identifier and source identification. Identifier assigned to process event is the same one that is assigned to corresponding event. Information regarding source identifier is available only during processing of process event and is discarded once event is created.

Process of creating events based on a single set of process events is called session. Module engine starts new session each time communication or module requests new registration. Session ends once backend engine returns result of event registration and all modules are notified with this result. Start and end of each session is notified to each module by creating and closing module session. Each module instantiates its own module session.

During session processing, each module session is notified with a list of new and deleted process events which are not previously presented to that module. Processing these process events by module session can result in new process events which are to be added to current session or list of previously added process events which are to be deleted from session. All other module sessions, except the one that produced list of new and deleted process events, are notified with those process event lists. This process continues iteratively until all module sessions return empty lists for both new and deleted process events. Processing process events by single module session is always sequential - module session is notified with session changes after its previous notification processing is finished. Different module sessions may be processed concurrently. Module engine keeps order of new process events added to session, but it is allowed to aggregate processing results from multiple module sessions into a single session change notification.

Care should be taken by module implementation not to cause self recursive or mutually recursive endless processing loop.

Each module can define its event type filter condition which is used for filtering new and deleted process events that will get notified to module session. When session finishes, module session is closed by calling its async_close method, which receives a list of all newly created events resulting from the session processing. These events are not filtered by module subscription. By calling async_close with list of all newly registered events, module engine provides each module session with opportunity to post-process single session as a whole.

Todo

do we want to filter resulting events passed to module session close with module’s subscription?

Modules

Warning

Event server does not provide sandbox environment for loading end executing modules. Modules have full access to Event Server functionality which is controlled with module execution. Module implementation and configuration should be written in accordance to other modules and Event Server as a whole, keeping in mind processing execution time overhead and possible interference between modules.

Each module represents predefined and configurable closely related functions that can modify the process of registering new events or initiate new event registration sessions. When created, module is provided with reference to module engine which can be used for creating new process events, registering process events and querying events. Responsibility of each module, upon creation, is to create its own source identifier which will be unique for single Event Server process execution.

Modules available as part of hat-event package:

Backend engine

Backend engine is responsible for actions involving persisting process events and querying events. During initialization, backend engine creates a single instance of backend which is used for storage.

During registration of events, backend engine converts process events to backend events. This conversion involves setting event’s timestamp (which is the same for all events registered in a single session) and replacing event type with event type identifier. To enable this, backend engine has to maintain association reference between event types and corresponding event type identifiers. By replacing event types with identifiers, list of strings which represent event type is replaced with a single numeric identifier.

Query request which is passed to backend engine contains event types as optional filter condition. Responsibility of backend engine is to find all matching event type identifiers and query backend based on a list of identifiers.

Todo

can we associate type identifiers based on subtypes and utilize identifier ranges instead of explicitly passing each identifier to backend

Todo

sync between backend engines

Backends

Backends are simple wrappers for storing and retrieving events from specialized storage engines.

Backends available as part of hat-event package:

Python implementation

Common

Common functionality shared between clients and event server

class hat.event.common.Order(value)

Bases: enum.Enum

An enumeration.

  • DESCENDING

  • ASCENDING

DESCENDING = 1
ASCENDING = 2
class hat.event.common.OrderBy(value)

Bases: enum.Enum

An enumeration.

  • TIMESTAMP

  • SOURCE_TIMESTAMP

TIMESTAMP = 1
SOURCE_TIMESTAMP = 2
class hat.event.common.EventPayloadType(value)

Bases: enum.Enum

An enumeration.

  • BINARY

  • JSON

  • SBS

BINARY = 1
JSON = 2
SBS = 3
class hat.event.common.EventId(server, instance)

Bases: tuple

Create new instance of EventId(server, instance)

instance

event instance identifier

Type

int

server

server identifier

Type

int

hat.event.common.EventType

Event type

alias of List[str]

class hat.event.common.EventPayload(type, data)

Bases: tuple

Create new instance of EventPayload(type, data)

data

data

Type

Union[bytes,json.Data,SbsData]

type

payload type

Type

EventPayloadType

class hat.event.common.SbsData(module, type, data)

Bases: tuple

Create new instance of SbsData(module, type, data)

data

data

Type

bytes

module

SBS module name

Type

Optional[str]

type

SBS type name

Type

str

class hat.event.common.Event(event_id, event_type, timestamp, source_timestamp, payload)

Bases: tuple

Create new instance of Event(event_id, event_type, timestamp, source_timestamp, payload)

event_id

event identifier

Type

EventId

event_type

event type

Type

EventType

payload

payload

Type

Optional[EventPayload]

source_timestamp

source timestamp

Type

Optional[Timestamp]

timestamp

timestamp

Type

Timestamp

class hat.event.common.RegisterEvent(event_type, source_timestamp, payload)

Bases: tuple

Create new instance of RegisterEvent(event_type, source_timestamp, payload)

event_type

event type

Type

EventType

payload

payload

Type

Optional[EventPayload]

source_timestamp

source timestamp

Type

Optional[Timestamp]

class hat.event.common.QueryData(event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)

Bases: tuple

Create new instance of QueryData(event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)

event_ids

event identifiers

Type

Optional[List[EventId]]

event_types

event types

Type

Optional[List[EventType]]

max_results

maximum results

Type

Optional[int]

order

order

Type

Order

order_by

order by

Type

OrderBy

payload

payload

Type

Optional[EventPayload]

source_t_from

Optional[Timestamp]

source_t_to

Optional[Timestamp]

t_from

timestamp from

Type

Optional[Timestamp]

t_to

timestamp to

Type

Optional[Timestamp]

unique_type

unique type flag

Type

bool

hat.event.common.matches_query_type(event_type, query_type)

Determine if event type matches query type

Event type is tested if it matches query type according to the following rules:

  • Matching is performed on subtypes in increasing order.

  • Event type is a match only if all its subtypes are matched by corresponding query subtypes.

  • Matching is finished when all query subtypes are exhausted.

  • Query subtype ‘?’ matches exactly one event subtype of any value. The subtype must exist.

  • Query subtype ‘*’ matches 0 or more event subtypes of any value. It must be the last query subtype.

  • All other values of query subtype match exactly one event subtype of the same value.

  • Query type without subtypes is matched only by event type with no subtypes.

As a consequence of aforementioned matching rules, event subtypes ‘*’ and ‘?’ cannot be directly matched and it is advisable not to use them in event types.

Parameters
  • event_type (EventType) – event type

  • query_type (EventType) – query type

Returns

true if matches

Return type

bool

class hat.event.common.SubscriptionRegistry

Bases: object

Registry of event type subscriptions

A tree-like collection that maps event types to hashable values. A map between an event type and a value is called a subscription. Registry enables finding all values that are subscribed to some event type. Each value can be subscribed to multiple event types.

When adding a value to the registry, its subscriptions are specified by a query type. It can be the same as an event type, but also contain ‘?’ and ‘*’ wildcard subtypes. Value is subscribed to all event types that match the given query type. For more details on matching see matches_query_type().

When a value is removed from the registry, all its subscriptions are also removed.

add(value, query_type)

Add and subscribe value

Adds value to the registry and subscribes it to all event types that match query type. If value is already in the registry, new subscriptions will be added to previous.

Parameters
  • value (Hashable) – value

  • query_type (EventType) – query type

remove(value)

Remove and unsubscribe value

Removes value from the registry with all its subscriptions.

Parameters

value (Hashable) – value

find(event_type)

Find subscribed values

Finds and returns all values that are subscribed to event type.

Parameters

event_type (EventType) – event type

Returns

values subscribed to event type

Return type

Set[Hashable]

class hat.event.common.Timestamp(s, us)

Bases: hat.event.common.Timestamp

Create new instance of Timestamp(s, us)

hat.event.common.timestamp_to_bytes(t)

Convert timestamp to 96 bit representation

Bits 0 - 63 are big endian two’s complement encoded Timestamp.s and bits 64 - 95 are big endian two’s complement encoded Timestamp.us.

Parameters

t (Timestamp) – timestamp

Returns

bytes

hat.event.common.timestamp_from_bytes(data)

Create new timestamp from 96 bit representation

Bytes representation is same as defined for timestamp_to_bytes()

Parameters

data (bytes) – 96 bit timestamp

Returns

timestamp

Return type

Timestamp

hat.event.common.timestamp_to_float(t)

Convert timestamp to floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes() / timestamp_from_bytes()

Parameters

t (Timestamp) – timestamp

Returns

timestamp

Return type

float

hat.event.common.timestamp_from_float(ts)

Create new timestamp from floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes() / timestamp_from_bytes()

Parameters

ts (float) – seconds since 1970-01-01

Returns

timestamp

Return type

Timestamp

hat.event.common.timestamp_to_datetime(t)

Convert timestamp to datetime

For precise serialization see timestamp_to_bytes() / timestamp_from_bytes()

Parameters

t (Timestamp) – timestamp

Returns

datetime (representing utc time)

Return type

datetime.datetime

hat.event.common.timestamp_from_datetime(dt)

Create new timestamp from datetime

If tzinfo is not set, it is assumed that provided datetime represents utc time.

For precise serialization see timestamp_to_bytes() / timestamp_from_bytes()

Parameters

dt (datetime.datetime) – datetime

Returns

timestamp

Return type

Timestamp

hat.event.common.timestamp_to_sbs(t)

Convert timestamp to SBS data

Parameters

t (Timestamp) – timestamp

Returns

SBS data

Return type

hat.sbs.Data

hat.event.common.timestamp_from_sbs(data)

Create new timestamp from SBS data

Parameters

data (hat.sbs.Data) – SBS data

Returns

timestamp

Return type

Timestamp

hat.event.common.now()

Create new timestamp representing current time

Returns

timestamp

Return type

Timestamp

hat.event.common.event_to_sbs(event)

Convert Event to SBS data

Parameters

event (Event) – event

Returns

hat.sbs.Data

hat.event.common.event_from_sbs(data)

Create new Event based on SBS data

Parameters

data (hat.sbs.Data) – SBS data

Returns

Event

hat.event.common.register_event_to_sbs(event)

Convert RegisterEvent to SBS data

Parameters

event (RegisterEvent) – register event

Returns

hat.sbs.Data

hat.event.common.register_event_from_sbs(data)

Create new RegisterEvent based on SBS data

Parameters

data (hat.sbs.Data) – SBS data

Returns

RegisterEvent

hat.event.common.query_to_sbs(query)

Convert QueryData to SBS data

Parameters

query (QueryData) – query data

Returns

hat.sbs.Data

hat.event.common.query_from_sbs(data)

Create new QueryData based on SBS data

Parameters

data (hat.sbs.Data) – SBS data

Returns

QueryData

hat.event.common.event_payload_to_sbs(payload)

Convert EventPayload to SBS data

Parameters

payload (EventPayload) – event payload

Returns

hat.sbs.Data

hat.event.common.event_payload_from_sbs(data)

Create new EventPayload based on SBS data

Parameters

data (hat.sbs.Data) – SBS data

Returns

EventPayload

Client

Library used by components for communication with Event Server

This module provides low-level interface (connect/Client) and high-level interface (run_client) for communication with Event Server.

connect() is used for establishing single chatter based connection with Event Server which is represented by Client. Once connection is terminated (signaled with Client.closed()), it is up to user to repeat connect() call and create new Client instance, if additional communication with Event Server is required.

Example of low-level interface usage:

client = await hat.event.client.connect('tcp+sbs://127.0.0.1:23012',
                                        [['x', 'y', 'z']])

registered_events = await client.register_with_response([
    hat.event.common.RegisterEvent(
        event_type=['x', 'y', 'z'],
        source_timestamp=hat.event.common.now(),
        payload=hat.event.common.EventPayload(
            type=hat.event.common.EventPayloadType.BINARY,
            data=b'test'))])

received_events = await client.receive()

queried_events = await client.query(
    hat.event.common.QueryData(
        event_types=[['x', 'y', 'z']],
        max_results=1))

assert registered_events == received_events
assert received_events == queried_events

await client.async_close()

run_client() provides high-level interface for continuous communication with currenty active Event Server based on information obtained from Monitor Server. This function repeatedly tries to create active connection with Event Server. When this connection is created, users code is notified by calling async_run_cb callback. Once connection is closed, execution of async_run_cb is cancelled and run_client() repeats connection estabishment process.

Example of high-level interface usage:

async def monitor_async_run(monitor):
    await hat.event.client.run_client(
        monitor_client=monitor,
        server_group='event servers',
        async_run_cb=event_async_run])

async def event_async_run(client):
    while True:
        assert not client.closed.done()
        await asyncio.sleep(10)

await hat.monitor.client.run_component(
    conf={'name': 'client',
          'group': 'test clients',
          'monitor_address': 'tcp+sbs://127.0.0.1:23010',
          'component_address': None},
    async_run_cb=monitor_async_run)
hat.event.client.mlog

module logger

Type

logging.Logger

hat.event.client.reconnect_delay

delay in seconds before trying to reconnect to event server (used in high-level interface)

Type

int

async hat.event.client.connect(address, subscriptions=None, **kwargs)

Connect to event server

For address format see hat.chatter.connect().

According to Event Server specification, each subscription is event type identifier which can contain special subtypes ? and *. Subtype ? can occure at any position inside event type identifier and is used as replacement for any single subtype. Subtype * is valid only as last subtype in event type identifier and is used as replacement for zero or more arbitrary subtypes.

If subscription is not defined (is None), client doesn’t subscribe for any events and will not receive server’s notifications.

Parameters
  • address (str) – event server’s address

  • subscriptions (Optional[List[common.EventType]]) – subscriptions

  • kwargs – additional arguments passed to hat.chatter.connect()

Returns

Client

class hat.event.client.Client

Bases: object

Event Server client

For creating new client see connect().

property closed

closed future

Type

asyncio.Future

async async_close()

Async close

async receive()

Receive subscribed event notifications

Returns

List[common.Event]

Raises

chatter.ConnectionClosedError – closed chatter connection

register(events)

Register events

Parameters

events (List[common.RegisterEvent]) – register events

Raises
  • chatter.ConnectionClosedError – closed chatter connection

  • Exception

async register_with_response(events)

Register events

Each RegisterEvent from events is paired with results Event if new event was successfuly created or None is new event could not be created.

Parameters

events (List[common.RegisterEvent]) – register events

Returns

List[Optional[common.Event]]

Raises
  • chatter.ConnectionClosedError – closed chatter connection

  • Exception

async query(data)

Query events from server

Parameters

data (common.QueryData) – query data

Returns

List[common.Event]

Raises
  • chatter.ConnectionClosedError – closed chatter connection

  • Exception

async hat.event.client.run_client(monitor_client, server_group, async_run_cb, subscriptions=None)

Continuously communicate with currently active Event Server

This function tries to establish active connection with Event Server. Once this connection is established, async_run_cb is called with currently active Client. Once connection to Event Server is closed or new active Event Server is detected, execution of async_run_cb is canceled. If new connection to Event Server is successfuly established, async_run_cb is called with new instance of Client.

async_run_cb is called when:
  • new active Client is created

async_run_cb execution is cancelled when:
  • run_client() finishes execution

  • connection to Event Server is closed

  • different active Event Server is detected from Monitor Server’s list of components

run_client() finishes execution when:
  • connection to Monitor Server is closed

  • async_run_cb finishes execution (by returning value or raising exception other than asyncio.CancelledError)

Return value of this function is the same as return value of async_run_cb. If async_run_cb finishes by raising exception or if connection to Monitor Server is closed, exception is reraised.

Todo

review conditions when run_client finishes execution

Parameters
  • monitor_client (hat.monitor.client.Client) – monitor client

  • server_group (str) – event server’s component group

  • async_run_cb (Callable[[Client],None]) – run callback

  • subscriptions (Optional[List[common.EventType]]) – subscriptions

Returns

Any

Server

Common event server structures and functionalty

class hat.event.server.common.SourceType(value)

Bases: enum.Enum

An enumeration.

  • COMMUNICATION

  • MODULE

COMMUNICATION = 1
MODULE = 2
class hat.event.server.common.Source(type, name, id)

Bases: tuple

Create new instance of Source(type, name, id)

id

identifier

Type

int

name

source name (module name)

Type

Optional[str]

type

source type

Type

SourceType

class hat.event.server.common.ProcessEvent(event_id, source, event_type, source_timestamp, payload)

Bases: tuple

Create new instance of ProcessEvent(event_id, source, event_type, source_timestamp, payload)

event_id

event identifier

Type

EventId

event_type

event type

Type

EventType

payload

payload

Type

Optional[EventPayload]

source

source

Type

Source

source_timestamp

source timestamp

Type

Optional[Timestamp]

class hat.event.server.common.SessionChanges(new, deleted)

Bases: tuple

Create new instance of SessionChanges(new, deleted)

deleted

deleted register events

Type

List[ProcessEvent]

new

new register events

Type

List[ProcessEvent]

hat.event.server.common.BackendConf

Backend configuration

alias of Union[None, bool, int, float, str, List[Data], Dict[str, Data]]

hat.event.server.common.CreateBackend

Create backend callable

alias of Callable[[Union[None, bool, int, float, str, List[Data], Dict[str, Data]]], Awaitable[Backend]]

class hat.event.server.common.Backend

Bases: abc.ABC

Backend ABC

Backend is implemented as python module which is dynamically imported. It is expected that this module implements:

  • json_schema_id (Optional[str]): JSON schema id

  • create (CreateBackend): create new backend instance

If module defines JSON schema id, it will be used for aditional validation of backend’s configuration.

abstract property closed

closed future

Type

asyncio.Future

abstract async async_close()

Async close

async get_last_event_id(server_id)

Get last registered event id associated with server id

Parameters

server_id (int) – server identifier

Returns

common.EventId

abstract async register(events)

Register events

Todo

do we need list of success flags as result?

Parameters

events (List[Event]) – events

abstract async query(data)

Query events

Parameters

data (QueryData) – query data

Returns

List[Event]

class hat.event.server.common.EventTypeRegistryStorage

Bases: abc.ABC

EventTypeRegistry storage ABC

This interface specifies perzistent storage used by EventTypeRegistry.

abstract async get_event_type_mappings()

Get all event type mappings

Returned dict has event type ids as keys and associated event types as values.

Returns

Dict[int,EventType]

abstract async add_event_type_mappings(mappings)

Add new event type mappings

mappings dict has event type ids as keys and associated event types as values. New mappings are appended to allready existing mappings.

Parameters

mappings (Dict[int,EventType]) – event type mappings

class hat.event.server.common.Module

Bases: abc.ABC

Module ABC

Module is implemented as python module which is dynamically imported. It is expected that this module implements:

  • json_schema_id (Optional[str]): JSON schema id

  • create (CreateModule): create new module instance

If module defines JSON schema id, it will be used for aditional validation of module’s configuration.

Module’s subscriptions are constant during module’s lifetime.

abstract property subscriptions

subscribed event types filter

Type

List[EventType]

abstract property closed

closed future

Type

asyncio.Future

abstract async async_close()

Async close

abstract async create_session()

Create new module session

Returns

ModuleSession

class hat.event.server.common.ModuleSession

Bases: abc.ABC

abstract property closed

closed future

Type

asyncio.Future

abstract async async_close(events)

Async close

This method is called with all registered session events without applying subscription filter.

Todo

do we wan’t to apply subscription filter

Parameters

events (List[Event]) – registered events

abstract async process(changes)

Process session changes

Changes include only process events which are matched by modules subscription filter.

Single module session process is always called sequentially.

Parameters

changes (SessionChanges) – session changes

Returns

SessionChanges

async hat.event.server.common.create_event_type_registry(storage)

Create EventTypeRegistry instance

This class can be used for simple mapping between event types and unique numerical event type identifiers.

class hat.event.server.common.EventTypeRegistry

Bases: object

get_event_type(identifier)

Get event types associated with identifier

async get_identifiers(event_types)

Get identifiers associated with event types

If event type doesn’t have previously defined identifier, new one is created and stored in storage.

Parameters

event_types (Iterable[EventType]) – event types

Returns

List[int]

query_identifiers(event_types)

Get identifiers matching event type queries

Parameters

event_types (EventType) – event type queries

Returns

Set[int]

Event server’s communication

hat.event.server.communication.mlog

module logger

Type

logging.Logger

async hat.event.server.communication.create(conf, engine)

Create communication

Parameters
  • conf (hat.json.Data) – configuration defined by hat://event/main.yaml#/definitions/communication

  • engine (hat.event.module_engine.ModuleEngine) – module engine

Returns

Communication

class hat.event.server.communication.Communication

Bases: object

property closed

closed future

Type

asyncio.Future

async async_close()

Async close

Module engine

exception hat.event.server.module_engine.ModuleEngineClosedError

Bases: Exception

Error signaling closed module engine

async hat.event.server.module_engine.create(conf, backend_engine)

Create module engine

Parameters
  • conf (hat.json.Data) – configuration defined by hat://event/main.yaml#/definitions/module_engine

  • backend_engine (hat.event.backend_engine.BackendEngine) – backend engine

Returns

ModuleEngine

class hat.event.server.module_engine.ModuleEngine

Bases: object

property closed

closed future

Type

asyncio.Future

async async_close()

Async close

register_events_cb(cb)

Register events callback

Parameters

cb (Callable[[List[common.Event]],None]) – change callback

Returns

util.RegisterCallbackHandle

create_process_event(source, event)

Create process event

Parameters
  • source (common.Source) – event source

  • event (common.RegisterEvent) – register event

Returns

common.ProcessEvent

async register(source, events)

Register events

Parameters
  • source (common.Source) – event source

  • events (List[common.RegisterEvent]) – register events

Returns

List[Optional[common.Event]]

Raises
  • ModuleEngineClosedError

  • Exception

async query(data)

Query events

Parameters

data (common.QueryData) – query data

Returns

List[common.Event]

Raises
  • ModuleEngineClosedError

  • Exception

Backend engine

async hat.event.server.backend_engine.create(conf)

Create backend engine

Parameters

conf (hat.json.Data) – configuration defined by hat://event/main.yaml#/definitions/backend_engine

Returns

BackendEngine

class hat.event.server.backend_engine.BackendEngine

Bases: object

property closed

closed future

Type

asyncio.Future

async async_close()

Async close

async get_last_event_id()

Get last registered event id

Returns

common.EventId

async register(events)

Register events

Parameters

events (List[common.ProcessEvent]) – process events

Returns

List[Optional[common.Event]]

async query(data)

Query events

Parameters

data (common.QueryData) – query data

Returns

List[common.Event]