Module hat.event.common

Common functionality shared between clients and event server

Expand source code
"""Common functionality shared between clients and event server"""

from pathlib import Path
import collections
import datetime
import enum
import struct
import typing

from hat import chatter
from hat import json
from hat import sbs
import hat.monitor.common


# Directory containing this module; the schema files below are loaded
# relative to it.
package_path: Path = Path(__file__).parent
"""Python package path"""

# Built from `json.json_schema_repo`, `hat.monitor.common.json_schema_repo`
# and the repository loaded from `json_schema_repo.json` found in
# `package_path`.
json_schema_repo: json.SchemaRepository = json.SchemaRepository(
    json.json_schema_repo,
    hat.monitor.common.json_schema_repo,
    json.SchemaRepository.from_json(package_path / 'json_schema_repo.json'))
"""JSON schema repository"""

# Built from `chatter.sbs_repo` and the repository loaded from
# `sbs_repo.json` found in `package_path`.
sbs_repo: sbs.Repository = sbs.Repository(
    chatter.sbs_repo,
    sbs.Repository.from_json(package_path / 'sbs_repo.json'))
"""SBS schema repository"""

# An event type is a tuple of subtype strings of arbitrary length
# (see `matches_query_type` for how subtypes are matched).
EventType: typing.Type = typing.Tuple[str, ...]
"""Event type"""


class Order(enum.Enum):
    """Sort direction selectable through `QueryData.order`"""

    DESCENDING = 1
    ASCENDING = 2


class OrderBy(enum.Enum):
    """Timestamp field selectable through `QueryData.order_by`"""

    TIMESTAMP = 1
    SOURCE_TIMESTAMP = 2


class EventPayloadType(enum.Enum):
    """Tag selecting the representation of `EventPayload.data`"""

    BINARY = 1
    JSON = 2
    SBS = 3


class EventId(typing.NamedTuple):
    """Event identifier made of a server and an instance identifier"""
    server: int
    """server identifier"""
    instance: int
    """event instance identifier"""


class EventPayload(typing.NamedTuple):
    """Event payload: a type tag together with the payload data"""
    # tag inspected by `event_payload_to_sbs` to choose the encoding of `data`
    type: EventPayloadType
    # passed through unchanged for BINARY, run through `json.encode` for JSON
    # and expected to be `SbsData` for SBS (see `event_payload_to_sbs`)
    data: typing.Union[bytes, json.Data, 'SbsData']


class SbsData(typing.NamedTuple):
    """Payload data accompanied by the SBS module and type names"""
    module: typing.Optional[str]
    """SBS module name"""
    type: str
    """SBS type name"""
    # NOTE(review): presumably the value serialized according to
    # `module`/`type` - confirm against the producers of this payload
    data: bytes


class Event(typing.NamedTuple):
    """Event as converted by `event_to_sbs` and `event_from_sbs`"""
    # identifier of this event
    event_id: EventId
    # tuple of subtypes (see `matches_query_type`)
    event_type: EventType
    # NOTE(review): presumably assigned by the event server - confirm
    timestamp: 'Timestamp'
    # optional; NOTE(review): presumably provided by the event source - confirm
    source_timestamp: typing.Optional['Timestamp']
    # optional payload
    payload: typing.Optional[EventPayload]


class RegisterEvent(typing.NamedTuple):
    """Event registration data: `Event` fields without `event_id` and
    `timestamp`"""
    # tuple of subtypes (see `matches_query_type`)
    event_type: EventType
    # optional; NOTE(review): presumably provided by the event source - confirm
    source_timestamp: typing.Optional['Timestamp']
    # optional payload
    payload: typing.Optional[EventPayload]


class QueryData(typing.NamedTuple):
    """Query parameters as converted by `query_to_sbs` and `query_from_sbs`

    Every field has a default, so ``QueryData()`` is a valid instance.

    NOTE(review): how each field restricts the result is decided by the code
    executing the query, which is not part of this module - the comments
    below state only what the field names and types suggest.

    """
    # presumably restricts results to the listed event ids
    event_ids: typing.Optional[typing.List[EventId]] = None
    # presumably query types in the sense of `matches_query_type`
    event_types: typing.Optional[typing.List[EventType]] = None
    # presumably bounds applied to `Event.timestamp`
    t_from: typing.Optional['Timestamp'] = None
    t_to: typing.Optional['Timestamp'] = None
    # presumably bounds applied to `Event.source_timestamp`
    source_t_from: typing.Optional['Timestamp'] = None
    source_t_to: typing.Optional['Timestamp'] = None
    # presumably restricts results to events with this payload
    payload: typing.Optional[EventPayload] = None
    # ordering direction and the timestamp field used for ordering
    order: Order = Order.DESCENDING
    order_by: OrderBy = OrderBy.TIMESTAMP
    # presumably limits results to one event per event type - confirm
    unique_type: bool = False
    # presumably upper bound on the number of returned events
    max_results: typing.Optional[int] = None


def matches_query_type(event_type: EventType,
                       query_type: EventType
                       ) -> bool:
    """Check whether `event_type` is matched by `query_type`

    Subtypes are compared position by position, starting with the first one:

        * query subtype '?' accepts any single event subtype, which has to
          be present
        * query subtype '*' is meaningful only as the last query subtype,
          where it accepts all remaining event subtypes (including none)
        * any other query subtype accepts only an event subtype of the
          same value
        * without a trailing '*', event type and query type must have the
          same number of subtypes, therefore an empty query type is matched
          only by an empty event type

    Because '?' and '*' are interpreted as wildcards in query types, event
    subtypes with these values cannot be matched exactly and should be
    avoided in event types.

    """
    # a trailing '*' is not compared itself - it only relaxes the length check
    open_ended = len(query_type) > 0 and query_type[-1] == '*'
    fixed_part = query_type[:-1] if open_ended else query_type

    surplus = len(event_type) - len(fixed_part)
    if surplus < 0:
        return False
    if surplus > 0 and not open_ended:
        return False

    return all(query_subtype == '?' or event_subtype == query_subtype
               for event_subtype, query_subtype in zip(event_type,
                                                       fixed_part))


class Subscription:
    """Subscription defined by query event types

    Query types are kept as a tree of ``(is_leaf, children)`` nodes.
    `is_leaf` is ``True`` when a query type ends at the node and `children`
    maps a query subtype to the node of the following subtype. A ``'*'``
    child is always the only child of its parent because it already covers
    every other continuation.

    `ValueError` is raised during construction if a query type contains
    ``'*'`` followed by other subtypes.

    """

    _Node = typing.Tuple[bool,                  # is_leaf
                         typing.Dict[str,       # subtype
                                     '_Node']]  # child

    def __init__(self, query_types: typing.Iterable[EventType]):
        self._root = False, {}
        for query_type in query_types:
            self._root = Subscription._add_query_type(self._root, query_type)

    def get_query_types(self) -> typing.Iterable[EventType]:
        """Calculate sanitized query event types

        Query types already covered by a ``'*'`` query type of the same
        subscription are not repeated.

        """
        yield from Subscription._get_query_types(self._root)

    def matches(self, event_type: EventType) -> bool:
        """Does `event_type` match subscription"""
        return Subscription._matches(self._root, event_type)

    def union(self, *others: 'Subscription') -> 'Subscription':
        """Create new subscription including event types from this and
        other subscriptions."""
        result = Subscription([])
        result._root = Subscription._union(
            [self._root, *(other._root for other in others)])
        return result

    def isdisjoint(self, other: 'Subscription') -> bool:
        """Return ``True`` if this subscription has no event types in common
        with other subscription."""
        return Subscription._isdisjoint(self._root, other._root)

    @staticmethod
    def _add_query_type(node, query_type):
        """Return `node` extended with `query_type` (children are modified
        in place)"""
        is_leaf, children = node

        # existing '*' already covers anything that could be added
        if '*' in children:
            return node

        # query type ends here - mark the node as leaf
        if not query_type:
            return True, children

        head, rest = query_type[0], query_type[1:]

        if head == '*':
            if rest:
                raise ValueError('invalid query event type')
            # '*' makes all previously added siblings redundant
            children.clear()
            children['*'] = True, {}

        else:
            child = children.get(head, (False, {}))
            child = Subscription._add_query_type(child, rest)
            children[head] = child

        return node

    @staticmethod
    def _get_query_types(node):
        """Yield all query types stored in the subtree rooted at `node`"""
        is_leaf, children = node

        # empty continuation is reported only if '*' does not cover it
        if is_leaf and '*' not in children:
            yield ()

        for head, child in children.items():
            for rest in Subscription._get_query_types(child):
                yield (head, *rest)

    @staticmethod
    def _matches(node, event_type):
        """Return ``True`` if `event_type` is matched by the subtree rooted
        at `node`"""
        is_leaf, children = node

        if '*' in children:
            return True

        if not event_type:
            return is_leaf

        head, rest = event_type[0], event_type[1:]

        # exact subtype first, then the single subtype wildcard
        for name in (head, '?'):
            child = children.get(name)
            if child is None:
                continue
            if Subscription._matches(child, rest):
                return True

        return False

    @staticmethod
    def _union(nodes):
        """Merge `nodes` (non-empty sequence) into a single node

        A single input node is returned as is (not copied).

        """
        if len(nodes) < 2:
            return nodes[0]

        is_leaf = any(i for i, _ in nodes)

        names = {}
        for _, node_children in nodes:
            for name, node_child in node_children.items():
                # '*' in any of the nodes covers children of all nodes
                if name == '*':
                    return is_leaf, {'*': (True, {})}
                if name not in names:
                    names[name] = collections.deque()
                names[name].append(node_child)

        children = {name: Subscription._union(named_children)
                    for name, named_children in names.items()}

        return is_leaf, children

    @staticmethod
    def _isdisjoint(first_node, second_node):
        """Return ``True`` if no event type is matched by both subtrees"""
        first_is_leaf, first_children = first_node
        second_is_leaf, second_children = second_node

        if first_is_leaf and second_is_leaf:
            return False

        # '*' matches zero or more subtypes, so it overlaps with everything
        # the other node matches: a type ending at the other node (leaf) as
        # well as any type continuing through its children
        if (('*' in first_children and
                (second_is_leaf or second_children)) or
                ('*' in second_children and
                 (first_is_leaf or first_children))):
            return False

        # '?' of the first node overlaps with every child of the second node
        if '?' in first_children:
            for child in second_children.values():
                if not Subscription._isdisjoint(first_children['?'], child):
                    return False

        # '?' of the second node overlaps with every named child of the
        # first node ('?' against '?' was handled above)
        if '?' in second_children:
            for name, child in first_children.items():
                if name == '?':
                    continue
                if not Subscription._isdisjoint(second_children['?'], child):
                    return False

        # remaining possibility - subtypes of the same name in both nodes
        names = set(first_children.keys()).intersection(second_children.keys())
        for name in names:
            if name == '?':
                continue
            if not Subscription._isdisjoint(first_children[name],
                                            second_children[name]):
                return False

        return True


class Timestamp(typing.NamedTuple):
    s: int
    """seconds since 1970-01-01 (can be negative)"""
    us: int
    """microseconds added to timestamp seconds in range [0, 1e6)"""

    # Comparisons and hashing work on the total number of microseconds
    # (``s * 1000000 + us``). When compared with something that is not a
    # `Timestamp`, `NotImplemented` is returned.

    def __lt__(self, other):
        if isinstance(other, Timestamp):
            return (self.s * 1000000 + self.us <
                    other.s * 1000000 + other.us)
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, Timestamp):
            return (self.s * 1000000 + self.us >
                    other.s * 1000000 + other.us)
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, Timestamp):
            return (self.s * 1000000 + self.us ==
                    other.s * 1000000 + other.us)
        return NotImplemented

    def __ne__(self, other):
        is_equal = self == other
        return not is_equal

    def __le__(self, other):
        if self < other:
            return True
        return self == other

    def __ge__(self, other):
        if self > other:
            return True
        return self == other

    def __hash__(self):
        return self.s * 1000000 + self.us

    def add(self, s: float) -> 'Timestamp':
        """Return new timestamp moved by `s` seconds (`s` can be negative)"""
        whole = int(s)
        # fractional part is rounded to microseconds; the sum can leave the
        # [0, 1e6) range, so it is carried over into seconds
        total_us = self.us + round((s - whole) * 1e6)
        carry, micro = divmod(total_us, int(1e6))
        return Timestamp(s=self.s + whole + carry,
                         us=micro)


def timestamp_to_bytes(t: Timestamp) -> bytes:
    """Convert timestamp to 12 byte representation

    Bytes [0, 8) are big endian unsigned `Timestamp.s` + 2^63 and
    bytes [8, 12) are big endian unsigned `Timestamp.us`.

    """
    # offset moves signed seconds into the unsigned 64 bit range
    shifted_s = t.s + (1 << 63)
    return struct.pack(">QI", shifted_s, t.us)


def timestamp_from_bytes(data: bytes) -> Timestamp:
    """Create new timestamp from 12 byte representation

    The first 8 bytes are read as big endian unsigned seconds shifted by
    2^63 and the last 4 bytes as big endian unsigned microseconds (inverse
    of `timestamp_to_bytes`).

    """
    shifted_s, micro = struct.unpack(">QI", data)
    seconds = shifted_s - (1 << 63)
    return Timestamp(seconds, micro)


def timestamp_to_float(t: Timestamp) -> float:
    """Convert timestamp to floating number of seconds since 1970-01-01 UTC

    The result is subject to floating point rounding - for precise
    serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    fraction = t.us * 1E-6
    return t.s + fraction


def timestamp_from_float(ts: float) -> Timestamp:
    """Create timestamp from floating number of seconds since 1970-01-01 UTC

    The fractional part is rounded to microseconds. For precise
    serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    # for negative values seconds are taken one below the truncated value,
    # which keeps the remainder non-negative
    whole = int(ts) - 1 if ts < 0 else int(ts)
    micro = round((ts - whole) * 1E6)
    # remainder rounded up to a full second is carried into seconds
    if micro != 1000000:
        return Timestamp(whole, micro)
    return Timestamp(whole + 1, 0)


def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
    """Convert timestamp to timezone aware datetime in UTC

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    utc = datetime.timezone.utc
    try:
        whole_seconds = datetime.datetime.fromtimestamp(t.s, utc)
    except OSError:
        # conversion failed with OSError - derive the same value by adding
        # the seconds to the epoch instead
        epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
        whole_seconds = epoch + datetime.timedelta(seconds=t.s)
    # microseconds are taken from the timestamp itself
    return whole_seconds.replace(microsecond=t.us)


def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
    """Create new timestamp from datetime

    If `dt` is naive (no `tzinfo`, or `tzinfo` providing no UTC offset), it
    is assumed that provided datetime represents utc time.

    Seconds and microseconds are calculated with integer arithmetic as the
    difference from 1970-01-01 UTC, so the result is exact for every
    datetime, including times before 1970 that fall on a whole second.

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    utc = datetime.timezone.utc
    if dt.utcoffset() is None:
        dt = dt.replace(tzinfo=utc)
    # timedelta is normalized so that only `days` can be negative while
    # `seconds` and `microseconds` are non-negative - this gives floored
    # seconds and microseconds in range [0, 1e6) without float rounding
    delta = dt - datetime.datetime(1970, 1, 1, tzinfo=utc)
    return Timestamp(s=delta.days * 86400 + delta.seconds,
                     us=delta.microseconds)


def timestamp_to_sbs(t: Timestamp) -> sbs.Data:
    """Represent timestamp as SBS data with keys ``'s'`` and ``'us'``"""
    return dict(s=t.s, us=t.us)


def timestamp_from_sbs(data: sbs.Data) -> Timestamp:
    """Create new timestamp from SBS data with keys ``'s'`` and ``'us'``"""
    seconds = data['s']
    micro = data['us']
    return Timestamp(s=seconds, us=micro)


def now() -> Timestamp:
    """Create new timestamp representing current time"""
    current = datetime.datetime.now(datetime.timezone.utc)
    return timestamp_from_datetime(current)


def event_to_sbs(event: Event) -> sbs.Data:
    """Convert Event to SBS data

    Optional fields (`source_timestamp`, `payload`) are encoded as
    ``('Just', value)`` or ``('Nothing', None)``.

    """
    event_id = _event_id_to_sbs(event.event_id)
    event_type = list(event.event_type)
    timestamp = timestamp_to_sbs(event.timestamp)
    source_timestamp = _optional_to_sbs(event.source_timestamp,
                                        timestamp_to_sbs)
    payload = _optional_to_sbs(event.payload, event_payload_to_sbs)
    return {'id': event_id,
            'type': event_type,
            'timestamp': timestamp,
            'sourceTimestamp': source_timestamp,
            'payload': payload}


def event_from_sbs(data: sbs.Data) -> Event:
    """Create new Event based on SBS data

    Optional fields (``'sourceTimestamp'``, ``'payload'``) are expected as
    ``('Just', value)`` or ``('Nothing', None)``.

    """
    event_id = _event_id_from_sbs(data['id'])
    event_type = tuple(data['type'])
    timestamp = timestamp_from_sbs(data['timestamp'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    return Event(event_id=event_id,
                 event_type=event_type,
                 timestamp=timestamp,
                 source_timestamp=source_timestamp,
                 payload=payload)


def register_event_to_sbs(event: RegisterEvent) -> sbs.Data:
    """Convert RegisterEvent to SBS data

    Optional fields (`source_timestamp`, `payload`) are encoded as
    ``('Just', value)`` or ``('Nothing', None)``.

    """
    event_type = list(event.event_type)
    source_timestamp = _optional_to_sbs(event.source_timestamp,
                                        timestamp_to_sbs)
    payload = _optional_to_sbs(event.payload, event_payload_to_sbs)
    return {'type': event_type,
            'sourceTimestamp': source_timestamp,
            'payload': payload}


def register_event_from_sbs(data: sbs.Data) -> RegisterEvent:
    """Create new RegisterEvent based on SBS data

    Optional fields (``'sourceTimestamp'``, ``'payload'``) are expected as
    ``('Just', value)`` or ``('Nothing', None)``.

    """
    event_type = tuple(data['type'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    return RegisterEvent(event_type=event_type,
                         source_timestamp=source_timestamp,
                         payload=payload)


def query_to_sbs(query: QueryData) -> sbs.Data:
    """Convert QueryData to SBS data

    Optional fields are encoded as ``('Just', value)`` or
    ``('Nothing', None)``; `order` and `order_by` are encoded as
    ``(name, None)`` pairs.

    """
    def encode_ids(event_ids):
        return [_event_id_to_sbs(event_id) for event_id in event_ids]

    def encode_types(event_types):
        return [list(event_type) for event_type in event_types]

    order_choices = {Order.DESCENDING: ('descending', None),
                     Order.ASCENDING: ('ascending', None)}
    order_by_choices = {OrderBy.TIMESTAMP: ('timestamp', None),
                        OrderBy.SOURCE_TIMESTAMP: ('sourceTimestamp', None)}

    result = {}
    result['ids'] = _optional_to_sbs(query.event_ids, encode_ids)
    result['types'] = _optional_to_sbs(query.event_types, encode_types)
    result['tFrom'] = _optional_to_sbs(query.t_from, timestamp_to_sbs)
    result['tTo'] = _optional_to_sbs(query.t_to, timestamp_to_sbs)
    result['sourceTFrom'] = _optional_to_sbs(query.source_t_from,
                                             timestamp_to_sbs)
    result['sourceTTo'] = _optional_to_sbs(query.source_t_to,
                                           timestamp_to_sbs)
    result['payload'] = _optional_to_sbs(query.payload, event_payload_to_sbs)
    result['order'] = order_choices[query.order]
    result['orderBy'] = order_by_choices[query.order_by]
    result['uniqueType'] = query.unique_type
    result['maxResults'] = _optional_to_sbs(query.max_results)
    return result


def query_from_sbs(data: sbs.Data) -> QueryData:
    """Create new QueryData based on SBS data

    Optional fields are expected as ``('Just', value)`` or
    ``('Nothing', None)``; ``'order'`` and ``'orderBy'`` are expected as
    pairs whose first element is the choice name.

    """
    def decode_ids(ids):
        return [_event_id_from_sbs(event_id) for event_id in ids]

    def decode_types(event_types):
        return [tuple(event_type) for event_type in event_types]

    order_choices = {'descending': Order.DESCENDING,
                     'ascending': Order.ASCENDING}
    order_by_choices = {'timestamp': OrderBy.TIMESTAMP,
                        'sourceTimestamp': OrderBy.SOURCE_TIMESTAMP}

    event_ids = _optional_from_sbs(data['ids'], decode_ids)
    event_types = _optional_from_sbs(data['types'], decode_types)
    t_from = _optional_from_sbs(data['tFrom'], timestamp_from_sbs)
    t_to = _optional_from_sbs(data['tTo'], timestamp_from_sbs)
    source_t_from = _optional_from_sbs(data['sourceTFrom'],
                                       timestamp_from_sbs)
    source_t_to = _optional_from_sbs(data['sourceTTo'], timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    order = order_choices[data['order'][0]]
    order_by = order_by_choices[data['orderBy'][0]]
    unique_type = data['uniqueType']
    max_results = _optional_from_sbs(data['maxResults'])

    return QueryData(event_ids=event_ids,
                     event_types=event_types,
                     t_from=t_from,
                     t_to=t_to,
                     source_t_from=source_t_from,
                     source_t_to=source_t_to,
                     payload=payload,
                     order=order,
                     order_by=order_by,
                     unique_type=unique_type,
                     max_results=max_results)


def event_payload_to_sbs(payload: EventPayload) -> sbs.Data:
    """Convert EventPayload to SBS data

    The result is a ``(name, data)`` pair where name is ``'binary'``,
    ``'json'`` or ``'sbs'``. `ValueError` is raised for any other payload
    type.

    """
    payload_type = payload.type

    if payload_type == EventPayloadType.BINARY:
        encoded = 'binary', payload.data

    elif payload_type == EventPayloadType.JSON:
        # JSON data is carried in its encoded form
        encoded = 'json', json.encode(payload.data)

    elif payload_type == EventPayloadType.SBS:
        encoded = 'sbs', _sbs_data_to_sbs(payload.data)

    else:
        raise ValueError('unsupported payload type')

    return encoded


def event_payload_from_sbs(data: sbs.Data) -> EventPayload:
    """Create new EventPayload based on SBS data

    `data` is a ``(name, data)`` pair where name is ``'binary'``,
    ``'json'`` or ``'sbs'``. `ValueError` is raised for any other name.

    """
    name, content = data

    if name == 'binary':
        payload_type, payload_data = EventPayloadType.BINARY, content

    elif name == 'json':
        # JSON data is carried in its encoded form
        payload_type = EventPayloadType.JSON
        payload_data = json.decode(content)

    elif name == 'sbs':
        payload_type = EventPayloadType.SBS
        payload_data = _sbs_data_from_sbs(content)

    else:
        raise ValueError('unsupported payload type')

    return EventPayload(type=payload_type,
                        data=payload_data)


def _event_id_to_sbs(event_id):
    """Represent event id as SBS data with keys 'server' and 'instance'"""
    return dict(server=event_id.server,
                instance=event_id.instance)


def _event_id_from_sbs(data):
    """Create EventId from SBS data with keys 'server' and 'instance'"""
    server = data['server']
    instance = data['instance']
    return EventId(server=server,
                   instance=instance)


def _sbs_data_to_sbs(data):
    """Represent SbsData as SBS data (`module` is encoded as optional)"""
    module = _optional_to_sbs(data.module)
    return {'module': module,
            'type': data.type,
            'data': data.data}


def _sbs_data_from_sbs(data):
    """Create SbsData from SBS data (`module` is decoded as optional)"""
    module = _optional_from_sbs(data['module'])
    type_name = data['type']
    content = data['data']
    return SbsData(module=module,
                   type=type_name,
                   data=content)


def _optional_to_sbs(value, fn=lambda i: i):
    """Encode optional value: ``('Nothing', None)`` for ``None``, otherwise
    ``('Just', fn(value))``"""
    if value is None:
        return 'Nothing', None
    return 'Just', fn(value)


def _optional_from_sbs(data, fn=lambda i: i):
    """Decode optional value: ``fn(data[1])`` if ``data[0]`` is ``'Just'``,
    otherwise ``None``"""
    if data[0] == 'Just':
        return fn(data[1])
    return None

Global variables

var EventType : typing.Type

Event type

var json_schema_repo : SchemaRepository

JSON schema repository

var package_path : pathlib.Path

Python package path

var sbs_repo

SBS schema repository

Functions

def event_from_sbs(data: typing.Union[bool, int, float, str, bytes, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], typing.Tuple[str, ForwardRef('Data')]]) ‑> Event

Create new Event based on SBS data

Expand source code
def event_from_sbs(data: sbs.Data) -> Event:
    """Create new Event based on SBS data

    Optional fields (``'sourceTimestamp'``, ``'payload'``) are expected as
    ``('Just', value)`` or ``('Nothing', None)``.

    """
    event_id = _event_id_from_sbs(data['id'])
    event_type = tuple(data['type'])
    timestamp = timestamp_from_sbs(data['timestamp'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    return Event(event_id=event_id,
                 event_type=event_type,
                 timestamp=timestamp,
                 source_timestamp=source_timestamp,
                 payload=payload)
def event_payload_from_sbs(data: typing.Union[bool, int, float, str, bytes, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], typing.Tuple[str, ForwardRef('Data')]]) ‑> EventPayload

Create new EventPayload based on SBS data

Expand source code
def event_payload_from_sbs(data: sbs.Data) -> EventPayload:
    """Create new EventPayload based on SBS data

    `data` is a ``(name, data)`` pair where name is ``'binary'``,
    ``'json'`` or ``'sbs'``. `ValueError` is raised for any other name.

    """
    name, content = data

    if name == 'binary':
        payload_type, payload_data = EventPayloadType.BINARY, content

    elif name == 'json':
        # JSON data is carried in its encoded form
        payload_type = EventPayloadType.JSON
        payload_data = json.decode(content)

    elif name == 'sbs':
        payload_type = EventPayloadType.SBS
        payload_data = _sbs_data_from_sbs(content)

    else:
        raise ValueError('unsupported payload type')

    return EventPayload(type=payload_type,
                        data=payload_data)
def event_payload_to_sbs(payload: EventPayload) ‑> typing.Union[bool, int, float, str, bytes, typing.List[abc.Data], typing.Dict[str, Data], typing.Tuple[str, Data]]

Convert EventPayload to SBS data

Expand source code
def event_payload_to_sbs(payload: EventPayload) -> sbs.Data:
    """Convert EventPayload to SBS data

    The result is a ``(name, data)`` pair where name is ``'binary'``,
    ``'json'`` or ``'sbs'``. `ValueError` is raised for any other payload
    type.

    """
    payload_type = payload.type

    if payload_type == EventPayloadType.BINARY:
        encoded = 'binary', payload.data

    elif payload_type == EventPayloadType.JSON:
        # JSON data is carried in its encoded form
        encoded = 'json', json.encode(payload.data)

    elif payload_type == EventPayloadType.SBS:
        encoded = 'sbs', _sbs_data_to_sbs(payload.data)

    else:
        raise ValueError('unsupported payload type')

    return encoded
def event_to_sbs(event: Event) ‑> typing.Union[bool, int, float, str, bytes, typing.List[abc.Data], typing.Dict[str, Data], typing.Tuple[str, Data]]

Convert Event to SBS data

Expand source code
def event_to_sbs(event: Event) -> sbs.Data:
    """Convert Event to SBS data

    Optional fields (`source_timestamp`, `payload`) are encoded as
    ``('Just', value)`` or ``('Nothing', None)``.

    """
    event_id = _event_id_to_sbs(event.event_id)
    event_type = list(event.event_type)
    timestamp = timestamp_to_sbs(event.timestamp)
    source_timestamp = _optional_to_sbs(event.source_timestamp,
                                        timestamp_to_sbs)
    payload = _optional_to_sbs(event.payload, event_payload_to_sbs)
    return {'id': event_id,
            'type': event_type,
            'timestamp': timestamp,
            'sourceTimestamp': source_timestamp,
            'payload': payload}
def matches_query_type(event_type: typing.Tuple[str, ...], query_type: typing.Tuple[str, ...]) ‑> bool

Determine if event type matches query type

Event type is tested if it matches query type according to the following rules:

* Matching is performed on subtypes in increasing order.
* Event type is a match only if all its subtypes are matched by
  corresponding query subtypes.
* Matching is finished when all query subtypes are exhausted.
* Query subtype '?' matches exactly one event subtype of any value.
  The subtype must exist.
* Query subtype '*' matches 0 or more event subtypes of any value. It
  must be the last query subtype.
* All other values of query subtype match exactly one event subtype
  of the same value.
* Query type without subtypes is matched only by event type with no
  subtypes.

As a consequence of aforementioned matching rules, event subtypes '*' and '?' cannot be directly matched and it is advisable not to use them in event types.

Expand source code
def matches_query_type(event_type: EventType,
                       query_type: EventType
                       ) -> bool:
    """Check whether `event_type` is matched by `query_type`

    Subtypes are compared position by position, starting with the first one:

        * query subtype '?' accepts any single event subtype, which has to
          be present
        * query subtype '*' is meaningful only as the last query subtype,
          where it accepts all remaining event subtypes (including none)
        * any other query subtype accepts only an event subtype of the
          same value
        * without a trailing '*', event type and query type must have the
          same number of subtypes, therefore an empty query type is matched
          only by an empty event type

    Because '?' and '*' are interpreted as wildcards in query types, event
    subtypes with these values cannot be matched exactly and should be
    avoided in event types.

    """
    # a trailing '*' is not compared itself - it only relaxes the length check
    open_ended = len(query_type) > 0 and query_type[-1] == '*'
    fixed_part = query_type[:-1] if open_ended else query_type

    surplus = len(event_type) - len(fixed_part)
    if surplus < 0:
        return False
    if surplus > 0 and not open_ended:
        return False

    return all(query_subtype == '?' or event_subtype == query_subtype
               for event_subtype, query_subtype in zip(event_type,
                                                       fixed_part))
def now() ‑> Timestamp

Create new timestamp representing current time

Expand source code
def now() -> Timestamp:
    """Create new timestamp representing current time"""
    current = datetime.datetime.now(datetime.timezone.utc)
    return timestamp_from_datetime(current)
def query_from_sbs(data: typing.Union[bool, int, float, str, bytes, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], typing.Tuple[str, ForwardRef('Data')]]) ‑> QueryData

Create new QueryData based on SBS data

Expand source code
def query_from_sbs(data: sbs.Data) -> QueryData:
    """Create new QueryData based on SBS data

    Optional fields are expected as ``('Just', value)`` or
    ``('Nothing', None)``; ``'order'`` and ``'orderBy'`` are expected as
    pairs whose first element is the choice name.

    """
    def decode_ids(ids):
        return [_event_id_from_sbs(event_id) for event_id in ids]

    def decode_types(event_types):
        return [tuple(event_type) for event_type in event_types]

    order_choices = {'descending': Order.DESCENDING,
                     'ascending': Order.ASCENDING}
    order_by_choices = {'timestamp': OrderBy.TIMESTAMP,
                        'sourceTimestamp': OrderBy.SOURCE_TIMESTAMP}

    event_ids = _optional_from_sbs(data['ids'], decode_ids)
    event_types = _optional_from_sbs(data['types'], decode_types)
    t_from = _optional_from_sbs(data['tFrom'], timestamp_from_sbs)
    t_to = _optional_from_sbs(data['tTo'], timestamp_from_sbs)
    source_t_from = _optional_from_sbs(data['sourceTFrom'],
                                       timestamp_from_sbs)
    source_t_to = _optional_from_sbs(data['sourceTTo'], timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    order = order_choices[data['order'][0]]
    order_by = order_by_choices[data['orderBy'][0]]
    unique_type = data['uniqueType']
    max_results = _optional_from_sbs(data['maxResults'])

    return QueryData(event_ids=event_ids,
                     event_types=event_types,
                     t_from=t_from,
                     t_to=t_to,
                     source_t_from=source_t_from,
                     source_t_to=source_t_to,
                     payload=payload,
                     order=order,
                     order_by=order_by,
                     unique_type=unique_type,
                     max_results=max_results)
def query_to_sbs(query: QueryData) ‑> typing.Union[bool, int, float, str, bytes, typing.List[abc.Data], typing.Dict[str, Data], typing.Tuple[str, Data]]

Convert QueryData to SBS data

Expand source code
def query_to_sbs(query: QueryData) -> sbs.Data:
    """Convert QueryData to SBS data

    Optional fields are encoded as ``('Just', value)`` or
    ``('Nothing', None)``; `order` and `order_by` are encoded as
    ``(name, None)`` pairs.

    """
    def encode_ids(event_ids):
        return [_event_id_to_sbs(event_id) for event_id in event_ids]

    def encode_types(event_types):
        return [list(event_type) for event_type in event_types]

    order_choices = {Order.DESCENDING: ('descending', None),
                     Order.ASCENDING: ('ascending', None)}
    order_by_choices = {OrderBy.TIMESTAMP: ('timestamp', None),
                        OrderBy.SOURCE_TIMESTAMP: ('sourceTimestamp', None)}

    result = {}
    result['ids'] = _optional_to_sbs(query.event_ids, encode_ids)
    result['types'] = _optional_to_sbs(query.event_types, encode_types)
    result['tFrom'] = _optional_to_sbs(query.t_from, timestamp_to_sbs)
    result['tTo'] = _optional_to_sbs(query.t_to, timestamp_to_sbs)
    result['sourceTFrom'] = _optional_to_sbs(query.source_t_from,
                                             timestamp_to_sbs)
    result['sourceTTo'] = _optional_to_sbs(query.source_t_to,
                                           timestamp_to_sbs)
    result['payload'] = _optional_to_sbs(query.payload, event_payload_to_sbs)
    result['order'] = order_choices[query.order]
    result['orderBy'] = order_by_choices[query.order_by]
    result['uniqueType'] = query.unique_type
    result['maxResults'] = _optional_to_sbs(query.max_results)
    return result
def register_event_from_sbs(data: typing.Union[bool, int, float, str, bytes, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], typing.Tuple[str, ForwardRef('Data')]]) ‑> RegisterEvent

Create new RegisterEvent based on SBS data

Expand source code
def register_event_from_sbs(data: sbs.Data) -> RegisterEvent:
    """Create new RegisterEvent based on SBS data

    Optional fields (``'sourceTimestamp'``, ``'payload'``) are expected as
    ``('Just', value)`` or ``('Nothing', None)``.

    """
    event_type = tuple(data['type'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    return RegisterEvent(event_type=event_type,
                         source_timestamp=source_timestamp,
                         payload=payload)
def register_event_to_sbs(event: RegisterEvent) ‑> typing.Union[bool, int, float, str, bytes, typing.List[abc.Data], typing.Dict[str, Data], typing.Tuple[str, Data]]

Convert RegisterEvent to SBS data

Expand source code
def register_event_to_sbs(event: RegisterEvent) -> sbs.Data:
    """Convert RegisterEvent to SBS data

    Optional fields (`source_timestamp`, `payload`) are encoded as
    ``('Just', value)`` or ``('Nothing', None)``.

    """
    event_type = list(event.event_type)
    source_timestamp = _optional_to_sbs(event.source_timestamp,
                                        timestamp_to_sbs)
    payload = _optional_to_sbs(event.payload, event_payload_to_sbs)
    return {'type': event_type,
            'sourceTimestamp': source_timestamp,
            'payload': payload}
def timestamp_from_bytes(data: bytes) ‑> Timestamp

Create new timestamp from 12 byte representation

Bytes representation is same as defined for timestamp_to_bytes() function.

Expand source code
def timestamp_from_bytes(data: bytes) -> Timestamp:
    """Create new timestamp from 12 byte representation

    The first 8 bytes are read as big endian unsigned seconds shifted by
    2^63 and the last 4 bytes as big endian unsigned microseconds (inverse
    of `timestamp_to_bytes`).

    """
    shifted_s, micro = struct.unpack(">QI", data)
    seconds = shifted_s - (1 << 63)
    return Timestamp(seconds, micro)
def timestamp_from_datetime(dt: datetime.datetime) ‑> Timestamp

Create new timestamp from datetime

If tzinfo is not set, it is assumed that provided datetime represents utc time.

For precise serialization see timestamp_to_bytes()/timestamp_from_bytes().

Expand source code
def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
    """Create new timestamp from datetime

    If `dt` is naive (no `tzinfo`, or `tzinfo` providing no UTC offset), it
    is assumed that provided datetime represents utc time.

    Seconds and microseconds are calculated with integer arithmetic as the
    difference from 1970-01-01 UTC, so the result is exact for every
    datetime, including times before 1970 that fall on a whole second.

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    utc = datetime.timezone.utc
    if dt.utcoffset() is None:
        dt = dt.replace(tzinfo=utc)
    # timedelta is normalized so that only `days` can be negative while
    # `seconds` and `microseconds` are non-negative - this gives floored
    # seconds and microseconds in range [0, 1e6) without float rounding
    delta = dt - datetime.datetime(1970, 1, 1, tzinfo=utc)
    return Timestamp(s=delta.days * 86400 + delta.seconds,
                     us=delta.microseconds)
def timestamp_from_float(ts: float) ‑> Timestamp

Create timestamp from floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes()/timestamp_from_bytes().

Expand source code
def timestamp_from_float(ts: float) -> Timestamp:
    """Create timestamp from floating number of seconds since 1970-01-01 UTC

    The fractional part is rounded to microseconds. For precise
    serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    # for negative values seconds are taken one below the truncated value,
    # which keeps the remainder non-negative
    whole = int(ts) - 1 if ts < 0 else int(ts)
    micro = round((ts - whole) * 1E6)
    # remainder rounded up to a full second is carried into seconds
    if micro != 1000000:
        return Timestamp(whole, micro)
    return Timestamp(whole + 1, 0)
def timestamp_from_sbs(data: typing.Union[bool, int, float, str, bytes, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], typing.Tuple[str, ForwardRef('Data')]]) ‑> Timestamp

Create new timestamp from SBS data

Expand source code
def timestamp_from_sbs(data: sbs.Data) -> Timestamp:
    """Create new timestamp from SBS data with keys ``'s'`` and ``'us'``"""
    seconds = data['s']
    micro = data['us']
    return Timestamp(s=seconds, us=micro)
def timestamp_to_bytes(t: Timestamp) ‑> bytes

Convert timestamp to 12 byte representation

Bytes [0, 8) are big endian unsigned Timestamp.s + 2^63 and bytes [8, 12) are big endian unsigned Timestamp.us.

Expand source code
def timestamp_to_bytes(t: Timestamp) -> bytes:
    """Convert timestamp to 12 byte representation

    Bytes [0, 8) are big endian unsigned `Timestamp.s` + 2^63 and
    bytes [8, 12) are big endian unsigned `Timestamp.us`.

    """
    # offset moves signed seconds into the unsigned 64 bit range
    shifted_s = t.s + (1 << 63)
    return struct.pack(">QI", shifted_s, t.us)
def timestamp_to_datetime(t: Timestamp) ‑> datetime.datetime

Convert timestamp to datetime (representing utc time)

For precise serialization see timestamp_to_bytes()/timestamp_from_bytes().

Expand source code
def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
    """Convert timestamp to timezone aware datetime in UTC

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    utc = datetime.timezone.utc
    try:
        whole_seconds = datetime.datetime.fromtimestamp(t.s, utc)
    except OSError:
        # conversion failed with OSError - derive the same value by adding
        # the seconds to the epoch instead
        epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
        whole_seconds = epoch + datetime.timedelta(seconds=t.s)
    # microseconds are taken from the timestamp itself
    return whole_seconds.replace(microsecond=t.us)
def timestamp_to_float(t: Timestamp) ‑> float

Convert timestamp to floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes()/timestamp_from_bytes().

Expand source code
def timestamp_to_float(t: Timestamp) -> float:
    """Express timestamp as float seconds elapsed since 1970-01-01 UTC

    The result is `Timestamp.s` plus `Timestamp.us` scaled to seconds.

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    fraction = t.us * 1E-6
    return t.s + fraction
def timestamp_to_sbs(t: Timestamp) ‑> typing.Union[bool, int, float, str, bytes, typing.List[abc.Data], typing.Dict[str, Data], typing.Tuple[str, Data]]

Convert timestamp to SBS data

Expand source code
def timestamp_to_sbs(t: Timestamp) -> sbs.Data:
    """Represent timestamp as SBS data

    The result is a dictionary with `Timestamp.s` under ``'s'`` and
    `Timestamp.us` under ``'us'``.

    """
    return dict(s=t.s, us=t.us)

Classes

class Event (event_id: EventId, event_type: typing.Tuple[str, ...], timestamp: Timestamp, source_timestamp: typing.Union[ForwardRef('Timestamp'), NoneType], payload: typing.Union[EventPayload, NoneType])

Event(event_id, event_type, timestamp, source_timestamp, payload)

Expand source code
class Event(typing.NamedTuple):
    """Event

    Immutable named tuple grouping an event's identifier, type, timestamps
    and optional payload.

    """
    # identifier of this event (see `EventId`)
    event_id: EventId
    # event type, a tuple of strings (see `EventType`)
    event_type: EventType
    # event timestamp (quoted forward reference to `Timestamp`)
    timestamp: 'Timestamp'
    # source timestamp; may be ``None``
    source_timestamp: typing.Optional['Timestamp']
    # event payload; may be ``None``
    payload: typing.Optional[EventPayload]

Ancestors

  • builtins.tuple

Instance variables

var event_id : EventId

Alias for field number 0

var event_type : typing.Tuple[str, ...]

Alias for field number 1

var payload : typing.Union[EventPayload, NoneType]

Alias for field number 4

var source_timestamp : typing.Union[Timestamp, NoneType]

Alias for field number 3

var timestamp : Timestamp

Alias for field number 2

class EventId (server: int, instance: int)

EventId(server, instance)

Expand source code
class EventId(typing.NamedTuple):
    """Event identifier

    Immutable named tuple pairing a server identifier with an event
    instance identifier.

    """
    server: int
    """server identifier"""
    instance: int
    """event instance identifier"""

Ancestors

  • builtins.tuple

Instance variables

var instance : int

event instance identifier

var server : int

server identifier

class EventPayload (type: EventPayloadType, data: typing.Union[bytes, NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], ForwardRef('SbsData')])

EventPayload(type, data)

Expand source code
class EventPayload(typing.NamedTuple):
    """Event payload

    Immutable named tuple pairing a payload type with the payload data.

    """
    # payload type, one of the `EventPayloadType` members
    type: EventPayloadType
    # payload data: `bytes`, `json.Data` or `SbsData`
    # NOTE(review): presumably the data form corresponds to `type`
    # (BINARY/JSON/SBS respectively) - nothing in this definition enforces
    # it; confirm against the encoding/decoding code
    data: typing.Union[bytes, json.Data, 'SbsData']

Ancestors

  • builtins.tuple

Instance variables

var data

Alias for field number 1

var type

Alias for field number 0

class EventPayloadType (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Ancestors

  • enum.Enum

Class variables

var BINARY
var JSON
var SBS
class Order (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Ancestors

  • enum.Enum

Class variables

var ASCENDING
var DESCENDING
class OrderBy (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Ancestors

  • enum.Enum

Class variables

var SOURCE_TIMESTAMP
var TIMESTAMP
class QueryData (event_ids: typing.Union[typing.List[EventId], NoneType] = None, event_types: typing.Union[typing.List[typing.Tuple[str, ...]], NoneType] = None, t_from: typing.Union[ForwardRef('Timestamp'), NoneType] = None, t_to: typing.Union[ForwardRef('Timestamp'), NoneType] = None, source_t_from: typing.Union[ForwardRef('Timestamp'), NoneType] = None, source_t_to: typing.Union[ForwardRef('Timestamp'), NoneType] = None, payload: typing.Union[EventPayload, NoneType] = None, order: Order = Order.DESCENDING, order_by: OrderBy = OrderBy.TIMESTAMP, unique_type: bool = False, max_results: typing.Union[int, NoneType] = None)

QueryData(event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)

Expand source code
class QueryData(typing.NamedTuple):
    """Query parameters

    Every field has a default value, so ``QueryData()`` is a valid instance.

    NOTE(review): the code interpreting these fields is not part of this
    definition; the per-field notes below are inferred from field names and
    types and should be confirmed against the query implementation.

    """
    # presumably restricts results to these event ids; default ``None``
    event_ids: typing.Optional[typing.List[EventId]] = None
    # presumably restricts results to these event types; default ``None``
    event_types: typing.Optional[typing.List[EventType]] = None
    # presumably lower bound on event timestamp; default ``None``
    t_from: typing.Optional['Timestamp'] = None
    # presumably upper bound on event timestamp; default ``None``
    t_to: typing.Optional['Timestamp'] = None
    # presumably lower bound on event source timestamp; default ``None``
    source_t_from: typing.Optional['Timestamp'] = None
    # presumably upper bound on event source timestamp; default ``None``
    source_t_to: typing.Optional['Timestamp'] = None
    # presumably restricts results to this payload; default ``None``
    payload: typing.Optional[EventPayload] = None
    # result ordering direction; default `Order.DESCENDING`
    order: Order = Order.DESCENDING
    # timestamp used for ordering; default `OrderBy.TIMESTAMP`
    order_by: OrderBy = OrderBy.TIMESTAMP
    # default ``False``; presumably limits results to one event per
    # event type when ``True`` - TODO confirm
    unique_type: bool = False
    # presumably maximum number of returned events; default ``None``
    max_results: typing.Optional[int] = None

Ancestors

  • builtins.tuple

Instance variables

var event_ids : typing.Union[typing.List[EventId], NoneType]

Alias for field number 0

var event_types : typing.Union[typing.List[typing.Tuple[str, ...]], NoneType]

Alias for field number 1

var max_results : typing.Union[int, NoneType]

Alias for field number 10

var order : Order

Alias for field number 7

var order_by : OrderBy

Alias for field number 8

var payload : typing.Union[EventPayload, NoneType]

Alias for field number 6

var source_t_from : typing.Union[Timestamp, NoneType]

Alias for field number 4

var source_t_to : typing.Union[Timestamp, NoneType]

Alias for field number 5

var t_from : typing.Union[Timestamp, NoneType]

Alias for field number 2

var t_to : typing.Union[Timestamp, NoneType]

Alias for field number 3

var unique_type : bool

Alias for field number 9

class RegisterEvent (event_type: typing.Tuple[str, ...], source_timestamp: typing.Union[ForwardRef('Timestamp'), NoneType], payload: typing.Union[EventPayload, NoneType])

RegisterEvent(event_type, source_timestamp, payload)

Expand source code
class RegisterEvent(typing.NamedTuple):
    """Register event

    Immutable named tuple with an event type, optional source timestamp and
    optional payload.

    NOTE(review): presumably describes an event that is yet to be
    registered - confirm against the code that consumes it.

    """
    # event type, a tuple of strings (see `EventType`)
    event_type: EventType
    # source timestamp; may be ``None``
    source_timestamp: typing.Optional['Timestamp']
    # event payload; may be ``None``
    payload: typing.Optional[EventPayload]

Ancestors

  • builtins.tuple

Instance variables

var event_type : typing.Tuple[str, ...]

Alias for field number 0

var payload : typing.Union[EventPayload, NoneType]

Alias for field number 2

var source_timestamp : typing.Union[Timestamp, NoneType]

Alias for field number 1

class SbsData (module: typing.Union[str, NoneType], type: str, data: bytes)

SbsData(module, type, data)

Expand source code
class SbsData(typing.NamedTuple):
    """SBS data

    Immutable named tuple with an optional SBS module name, an SBS type name
    and binary data.

    """
    module: typing.Optional[str]
    """SBS module name"""
    type: str
    """SBS type name"""
    # NOTE(review): presumably the value serialized according to
    # `module`/`type` - confirm against the encoding code
    data: bytes

Ancestors

  • builtins.tuple

Instance variables

var data : bytes

Alias for field number 2

var module : typing.Union[str, NoneType]

SBS module name

var type : str

SBS type name

class Subscription (query_types: typing.Iterable[typing.Tuple[str, ...]])

Subscription defined by query event types

Expand source code
class Subscription:
    """Subscription defined by query event types

    Query event types are stored as a tree keyed by subtype. Within a query
    event type, ``'?'`` stands for exactly one arbitrary subtype and ``'*'``
    (allowed only as the last subtype) stands for any number of trailing
    subtypes, including none.

    Args:
        query_types: query event types included in the subscription

    Raises:
        ValueError: if ``'*'`` is followed by other subtypes

    """

    _Node = typing.Tuple[bool,                  # is_leaf
                         typing.Dict[str,       # subtype
                                     '_Node']]  # child

    def __init__(self, query_types: typing.Iterable[EventType]):
        self._root = False, {}
        for query_type in query_types:
            self._root = Subscription._add_query_type(self._root, query_type)

    def get_query_types(self) -> typing.Iterable[EventType]:
        """Calculate sanitized query event types"""
        yield from Subscription._get_query_types(self._root)

    def matches(self, event_type: EventType) -> bool:
        """Does `event_type` match subscription"""
        return Subscription._matches(self._root, event_type)

    def union(self, *others: 'Subscription') -> 'Subscription':
        """Create new subscription including event types from this and
        other subscriptions."""
        result = Subscription([])
        result._root = Subscription._union(
            [self._root, *(other._root for other in others)])
        return result

    def isdisjoint(self, other: 'Subscription') -> bool:
        """Return ``True`` if this subscription has no event types in common
        with other subscription."""
        return Subscription._isdisjoint(self._root, other._root)

    @staticmethod
    def _add_query_type(node, query_type):
        """Add `query_type` to the tree rooted at `node`

        The children dictionary of `node` is updated in place. The returned
        node replaces `node` in its parent.

        """
        is_leaf, children = node

        # '*' already covers everything below this node
        if '*' in children:
            return node

        # query type ends here - mark this node as leaf
        if not query_type:
            return True, children

        head, rest = query_type[0], query_type[1:]

        if head == '*':
            if rest:
                raise ValueError('invalid query event type')
            # '*' supersedes all previously added children
            children.clear()
            children['*'] = True, {}

        else:
            child = children.get(head, (False, {}))
            child = Subscription._add_query_type(child, rest)
            children[head] = child

        return node

    @staticmethod
    def _get_query_types(node):
        """Yield query event types stored in the tree rooted at `node`"""
        is_leaf, children = node

        # a leaf that also has '*' child is covered by the '*' query type
        if is_leaf and '*' not in children:
            yield ()

        for head, child in children.items():
            for rest in Subscription._get_query_types(child):
                yield (head, *rest)

    @staticmethod
    def _matches(node, event_type):
        """Check if `event_type` is matched by the tree rooted at `node`"""
        is_leaf, children = node

        # '*' matches any remaining subtypes, including none
        if '*' in children:
            return True

        if not event_type:
            return is_leaf

        head, rest = event_type[0], event_type[1:]

        # try the exact subtype first, then the single subtype wildcard
        for i in (head, '?'):
            child = children.get(i)
            if child is None:
                continue
            if Subscription._matches(child, rest):
                return True

        return False

    @staticmethod
    def _union(nodes):
        """Merge non-empty sequence of nodes into a single node

        A single node is returned as is (it is not copied).

        """
        if len(nodes) < 2:
            return nodes[0]

        is_leaf = any(i for i, _ in nodes)

        # children of all nodes grouped by subtype
        names = {}
        for _, node_children in nodes:
            for name, node_child in node_children.items():
                if name == '*':
                    # '*' supersedes every other child
                    return is_leaf, {'*': (True, {})}
                if name not in names:
                    names[name] = collections.deque()
                names[name].append(node_child)

        children = {name: Subscription._union(named_children)
                    for name, named_children in names.items()}

        return is_leaf, children

    @staticmethod
    def _isdisjoint(first_node, second_node):
        """Check that no event type is matched by both nodes"""
        first_is_leaf, first_children = first_node
        second_is_leaf, second_children = second_node

        if first_is_leaf and second_is_leaf:
            return False

        # '*' also matches zero remaining subtypes (see `_matches`), so it
        # overlaps with a node that is a leaf as well as with a node that
        # has children
        if (('*' in first_children and
                (second_children or second_is_leaf)) or
                ('*' in second_children and
                 (first_children or first_is_leaf))):
            return False

        # '?' of the first node overlaps with any child of the second node
        if '?' in first_children:
            for child in second_children.values():
                if not Subscription._isdisjoint(first_children['?'], child):
                    return False

        # '?' of the second node overlaps with any child of the first node;
        # the '?'/'?' pair is already covered by the previous loop
        if '?' in second_children:
            for name, child in first_children.items():
                if name == '?':
                    continue
                if not Subscription._isdisjoint(second_children['?'], child):
                    return False

        # remaining overlaps are possible only between equally named children
        names = set(first_children.keys()).intersection(second_children.keys())
        for name in names:
            if name == '?':
                continue
            if not Subscription._isdisjoint(first_children[name],
                                            second_children[name]):
                return False

        return True

Methods

def get_query_types(self) ‑> typing.Iterable[typing.Tuple[str, ...]]

Calculate sanitized query event types

Expand source code
def get_query_types(self) -> typing.Iterable[EventType]:
    """Yield the sanitized query event types of this subscription"""
    root = self._root
    yield from Subscription._get_query_types(root)
def isdisjoint(self, other: Subscription) ‑> bool

Return True if this subscription has no event types in common with other subscription.

Expand source code
def isdisjoint(self, other: 'Subscription') -> bool:
    """Check that this subscription and `other` share no event types

    Returns ``True`` when the two subscriptions have no event types in
    common.

    """
    own_root = self._root
    other_root = other._root
    return Subscription._isdisjoint(own_root, other_root)
def matches(self, event_type: typing.Tuple[str, ...]) ‑> bool

Does event_type match subscription

Expand source code
def matches(self, event_type: EventType) -> bool:
    """Check whether `event_type` is matched by this subscription"""
    root = self._root
    return Subscription._matches(root, event_type)
def union(self, *others: Subscription) ‑> Subscription

Create new subscription including event types from this and other subscriptions.

Expand source code
def union(self, *others: 'Subscription') -> 'Subscription':
    """Combine this subscription with `others` into a new subscription

    The new subscription includes event types from this and all other
    subscriptions.

    """
    roots = [self._root]
    roots.extend(other._root for other in others)
    merged = Subscription([])
    merged._root = Subscription._union(roots)
    return merged
class Timestamp (s: int, us: int)

Timestamp(s, us)

Expand source code
class Timestamp(typing.NamedTuple):
    """Point in time expressed as seconds and microseconds

    Comparison and hashing are based on the total number of microseconds
    (``s * 1000000 + us``). Ordering and equality against objects that are
    not `Timestamp` instances are left to the other operand.

    """
    s: int
    """seconds since 1970-01-01 (can be negative)"""
    us: int
    """microseconds added to timestamp seconds in range [0, 1e6)"""

    def __lt__(self, other):
        if isinstance(other, Timestamp):
            mine = self.s * 1000000 + self.us
            theirs = other.s * 1000000 + other.us
            return mine < theirs
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, Timestamp):
            mine = self.s * 1000000 + self.us
            theirs = other.s * 1000000 + other.us
            return mine > theirs
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, Timestamp):
            mine = self.s * 1000000 + self.us
            theirs = other.s * 1000000 + other.us
            return mine == theirs
        return NotImplemented

    def __ne__(self, other):
        is_equal = self == other
        return not is_equal

    def __le__(self, other):
        if self < other:
            return True
        return self == other

    def __ge__(self, other):
        if self > other:
            return True
        return self == other

    def __hash__(self):
        # same quantity as used by the comparison operators
        return self.s * 1000000 + self.us

    def add(self, s: float) -> 'Timestamp':
        """Return a new timestamp offset from this one by `s` seconds"""
        whole_seconds = int(s)
        fraction_us = round((s - whole_seconds) * 1e6)
        # carry microsecond overflow/underflow into seconds
        carry, us = divmod(self.us + fraction_us, int(1e6))
        return Timestamp(s=self.s + whole_seconds + carry, us=us)

Ancestors

  • builtins.tuple

Instance variables

var s : int

seconds since 1970-01-01 (can be negative)

var us : int

microseconds added to timestamp seconds in range [0, 1e6)

Methods

def add(self, s: float) ‑> Timestamp

Create new timestamp by adding seconds to existing timestamp

Expand source code
def add(self, s: float) -> 'Timestamp':
    """Return a new timestamp offset from this one by `s` seconds"""
    whole_seconds = int(s)
    fraction_us = round((s - whole_seconds) * 1e6)
    # carry microsecond overflow/underflow into seconds
    carry, us = divmod(self.us + fraction_us, int(1e6))
    return Timestamp(s=self.s + whole_seconds + carry, us=us)