Chatter communication protocol

Hat defines a communication infrastructure used by Hat back-end components. This communication is based on the TCP protocol (optionally with an SSL layer on top of TCP) and on messages encoded using SBS. The underlying stream is segmented into blocks, each containing a single communication message prefixed by its message header. The header consists of 1+m bytes. The first byte contains the value m (from 0 to 255), which is the byte length of the rest of the header. The following m bytes contain the value k, encoded as big-endian, which is the number of message bytes that follow the header. The value m does not include the first header byte, and k does not include the header length. The k message bytes contain the actual body of the message.

Visualization of the communication stream:

address    ...  |  n  | n+1 |  ...  | n+m | n+m+1 |  ...  | n+m+k |  ...
         -------+-----+-----+-------+-----+-------+-------+-------+-------
   data    ...  |  m  |         k         |        message        |  ...
         -------+-----+-----+-------+-----+-------+-------+-------+-------

where:

  • m is byte length of k

  • k is byte length of message (n+1 is the most significant byte)

  • message is SBS encoded message data.
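As a sketch of the framing described above, the following Python helpers build and split a single block. They assume the message has already been SBS encoded elsewhere and that the sender uses the smallest m able to hold k. encode_block and decode_block are hypothetical names, not part of hat.chatter.

```python
from typing import Optional, Tuple


def encode_block(message: bytes) -> bytes:
    """Prefix an already SBS-encoded message with its Chatter block header."""
    k = len(message)
    # m: number of bytes needed to hold k in big-endian order (0 when k == 0)
    m = (k.bit_length() + 7) // 8
    if m > 255:
        raise ValueError('message too long for a one-byte length prefix')
    return bytes([m]) + k.to_bytes(m, 'big') + message


def decode_block(buffer: bytes) -> Optional[Tuple[bytes, bytes]]:
    """Split the first block off buffer.

    Returns (message, rest), or None if buffer does not yet hold a full block.
    """
    if not buffer:
        return None
    m = buffer[0]
    if len(buffer) < 1 + m:
        return None
    # k is the big-endian value stored in the m header bytes after the first one
    k = int.from_bytes(buffer[1:1 + m], 'big')
    end = 1 + m + k
    if len(buffer) < end:
        return None
    return buffer[1 + m:end], buffer[end:]
```

A stream reader would append incoming bytes to a buffer and call decode_block repeatedly until it returns None.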

SBS data definitions are available in the schemas_sbs directory of the hat-open repository. The file structure of this directory mimics the SBS modules defined in the .sbs definition files. The base module for all Hat messages is Hat. Message data definitions specific to each service (see Services) are available in additional modules, whose names are usually prefixed with Hat.

All communication is peer-to-peer. One peer connects to another and, once the connection is established, each peer can send messages to the other in full-duplex fashion. Either peer can terminate the connection if necessary. Most communication errors (including data serialization errors) result in connection termination.

Message

A communication message is defined by the SBS type Hat.Msg, which contains:

  • unique message identifier (id)

  • conversation descriptor (first, owner, token, last)

  • message data (data) determined by type identifier (module, type)

The message identifier is a number that uniquely identifies a peer’s message. When a new connection is established, each peer binds a counter to the connection. Upon sending a new message, this counter is incremented and its value is used as the new message’s identifier.

The conversation descriptor consists of a set of parameters that bind the message to a specific conversation (see Conversation).

Message data consists of SBS encoded bytes together with the SBS module and SBS type identifiers that describe them.

SBS message schema:

module Hat

Msg = Tuple {
    id:     Integer
    first:  Integer
    owner:  Boolean
    token:  Boolean
    last:   Boolean
    data:   Data
}

Data = Tuple {
    module:  Maybe(String)
    type:    String
    data:    Bytes
}
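The Hat.Msg and Hat.Data tuples can be mirrored in Python as plain dataclasses, which makes the descriptor rules easy to experiment with. This is a hypothetical in-memory model only and performs no SBS encoding. The counter assumes identifiers start at 1 (a counter initialized to 0 and incremented before use), which the text above does not state explicitly; new_id_counter and first_msg are hypothetical helper names.

```python
import itertools
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass(frozen=True)
class Data:
    module: Optional[str]  # Maybe(String): SBS module name
    type: str              # SBS type name
    data: bytes            # SBS-encoded payload


@dataclass(frozen=True)
class Msg:
    id: int
    first: int    # id of the message that started the conversation
    owner: bool   # True if the sender owns the conversation
    token: bool   # True if the sender passes the token
    last: bool    # True if this message ends the conversation
    data: Data


def new_id_counter() -> Iterator[int]:
    # assumption: counter starts at 0 and is incremented before use
    return itertools.count(1)


def first_msg(ids: Iterator[int], data: Data, *,
              token: bool = True, last: bool = True) -> Msg:
    """Build the message that opens a new conversation (hypothetical helper)."""
    msg_id = next(ids)
    # the initiating peer owns the conversation, so first == id and owner is set
    return Msg(id=msg_id, first=msg_id, owner=True,
               token=token, last=last, data=data)
```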

Conversation

A conversation is defined as an ordered, finite sequence of messages. Conversations are used for short, ordered exchanges of messages between two peers. Each peer can start a new conversation and finalize an existing one. Multiple simultaneously active conversations can be bound to a single active connection. A basic example of conversation usage is a request-response message exchange between peers, which also makes it possible to implement operation timeouts.

Each message contains a conversation descriptor consisting of:

  • conversation’s first message identifier (first)

  • owner flag (owner)

  • token passing flag (token)

  • last message flag (last)

A Conversation’s first message identifier contains the message identifier of the message which started the conversation. If this identifier is equal to the current message’s identifier, the message is the first message in the conversation.

The peer who initiates a conversation (sends the first message in the conversation) is considered to be the conversation’s owner. A message’s owner flag (owner) is set to true only if the peer sending the message is also the conversation’s owner.

The token passing flag (token) is set to true if this message is the last message this peer will send before expecting messages (bound to the current conversation) from the other peer. This behavior implements a token passing mechanism. Initially, the token is given to the peer who initiates the conversation. This peer can pass the token to the other peer by setting the token passing flag (token) to true. Only the peer currently holding the token can send messages bound to the conversation.

The last message flag (last) is set to true only if the message is the last message in the current conversation. Once this message is sent, the conversation cannot be used to send new messages. When the last flag (last) is set to true, the value of the token passing flag is ignored.

Each conversation is uniquely identified by its first message identifier and owner flag.
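The token passing rules can be sketched as a small bookkeeping class that one peer runs for all conversations on a connection. ConversationTracker is a hypothetical helper, not part of hat.chatter. Two details follow from the rules above: a received owner flag describes the sender, so the local key uses its negation; and a message is treated as a conversation's first message only when its owner flag is set and its id equals first, because the other peer's independent counter can produce the same number by coincidence.

```python
from typing import Dict, Tuple

# (first message id, owner flag as seen by the local peer)
ConvKey = Tuple[int, bool]


class ConversationTracker:
    """Token bookkeeping for one side of a connection (hypothetical helper)."""

    def __init__(self) -> None:
        # active conversations -> True while the local peer holds the token
        self.active: Dict[ConvKey, bool] = {}

    def _advance(self, key: ConvKey, local_sent: bool,
                 token: bool, last: bool) -> None:
        if last:
            # conversation is finished; the token flag is ignored
            self.active.pop(key, None)
        elif token:
            # the sender hands the token to the other peer
            self.active[key] = not local_sent
        else:
            # the sender keeps the token
            self.active[key] = local_sent

    def _check(self, key: ConvKey, is_first: bool, local_sends: bool) -> None:
        if is_first:
            if key in self.active:
                raise ValueError('conversation already exists')
        elif key not in self.active:
            raise ValueError('unknown or finished conversation')
        elif self.active[key] != local_sends:
            raise ValueError('sender does not hold the token')

    def on_send(self, msg_id: int, first: int, owner: bool,
                token: bool, last: bool) -> ConvKey:
        key = (first, owner)
        # only the owner's own counter can make first == id a first message
        self._check(key, owner and first == msg_id, True)
        self._advance(key, True, token, last)
        return key

    def on_receive(self, msg_id: int, first: int, owner: bool,
                   token: bool, last: bool) -> ConvKey:
        # the received owner flag describes the sender, so invert it locally
        key = (first, not owner)
        self._check(key, owner and first == msg_id, False)
        self._advance(key, False, token, last)
        return key
```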

Addressing

All end-points using Chatter are uniquely identified by a string identifier based on URI definition (RFC 3986). This identifier is formatted as <scheme>://<host>:<port> where

  • <scheme> - one of tcp+sbs, ssl+sbs

  • <host> - remote host’s name

  • <port> - remote tcp port
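A minimal sketch of validating such an address with the Python standard library, assuming only the two schemes listed above are accepted. parse_address and Address are hypothetical names, and the actual hat.chatter parser may behave differently.

```python
from typing import NamedTuple
from urllib.parse import urlparse

SCHEMES = {'tcp+sbs', 'ssl+sbs'}


class Address(NamedTuple):
    scheme: str
    host: str
    port: int

    @property
    def ssl(self) -> bool:
        # ssl+sbs adds an SSL layer on top of TCP
        return self.scheme == 'ssl+sbs'


def parse_address(address: str) -> Address:
    """Parse '<scheme>://<host>:<port>' (hypothetical helper)."""
    url = urlparse(address)
    if url.scheme not in SCHEMES:
        raise ValueError(f'unsupported scheme: {url.scheme!r}')
    # url.port itself raises ValueError for a non-numeric or out-of-range port
    if not url.hostname or url.port is None:
        raise ValueError('address must be <scheme>://<host>:<port>')
    return Address(url.scheme, url.hostname, url.port)
```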

Services

A Service is a self-contained set of functionalities bound to a connection. Each connection can have multiple associated Services. Each Service has a predefined set of message type identifiers available for custom message definitions. These message types are defined in the Service’s SBS definition file in the schemas_sbs directory.

Ping

The Ping communication Service is responsible for detecting a closed connection. This Service periodically sends a Ping request and waits for a corresponding Pong response. If the response isn’t received within a defined timeout period (that is, if the conversation’s timeout is exceeded), the connection is closed.

This is a peer-to-peer Service (there is no distinction between client and server).

The default ping timeout is 30 seconds.

The default conversation timeout period is 5 seconds.

The Ping Service is implemented as an integrated part of hat.chatter.Connection.

A Ping - Pong conversation between peers contains these messages:

  Message | Conversation first | Conversation last | Direction
  --------+--------------------+-------------------+----------
  ping    |         T          |         F         | p1 > p2
  pong    |         F          |         T         | p2 > p1

where ‘p1’ and ‘p2’ are two communicating peers.

SBS schema:

module HatPing

MsgPing = None

MsgPong = None
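The Ping Service behavior can be sketched with asyncio. This sketch assumes that the ping timeout acts as the interval between pings and that the conversation timeout bounds each ping-pong exchange. send_ping and close are hypothetical coroutine callables standing in for "send MsgPing and wait for MsgPong" and "close the connection"; this is not the hat.chatter implementation.

```python
import asyncio
from typing import Awaitable, Callable


async def ping_loop(send_ping: Callable[[], Awaitable[None]],
                    close: Callable[[], Awaitable[None]],
                    ping_interval: float,
                    conv_timeout: float) -> None:
    """Ping the peer periodically and close the connection on a missing pong."""
    while True:
        await asyncio.sleep(ping_interval)
        try:
            # send_ping is assumed to send MsgPing and return once MsgPong arrives
            await asyncio.wait_for(send_ping(), conv_timeout)
        except asyncio.TimeoutError:
            # no pong within the conversation timeout: treat the link as dead
            await close()
            return
```

With the defaults stated above, this would correspond to ping_interval=30 and conv_timeout=5.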

Python implementation


This module implements the basic communication infrastructure used for Hat communication. Hat communication is based on multiple loosely coupled services. To implement communication with other Hat components, the user should always implement an independent communication service (or use one of the predefined services).

hat.chatter.mlog

module logger

Type: logging.Logger

hat.chatter.sbs_repo

chatter message definition SBS repository

Type: sbs.Repository

class hat.chatter.Msg(conn, data, conv, first, last, token)

Bases: tuple

Received message

Create new instance of Msg(conn, data, conv, first, last, token)

conn

connection

Type: Connection

conv

conversation

Type: Conversation

data

data

Type: Data

first

first flag

Type: bool

last

last flag

Type: bool

token

token flag

Type: bool

class hat.chatter.Data(module, type, data)

Bases: tuple

Message data

Create new instance of Data(module, type, data)

data

data

Type: sbs.Data

module

SBS module name

Type: Optional[str]

type

SBS type name

Type: str

class hat.chatter.Conversation(conn, owner, first_id)

Bases: tuple

Create new instance of Conversation(conn, owner, first_id)

conn

connection

Type: Connection

first_id

first message id

Type: int

owner

owner flag

Type: bool

exception hat.chatter.ConnectionClosedError

Bases: Exception

Error signaling closed connection

async hat.chatter.connect(sbs_repo, address, *, pem_file=None, ping_timeout=20, connect_timeout=5, queue_maxsize=0)

Connect to remote server

sbs_repo should include hat.chatter.sbs_repo and additional message data definitions.

address is a string formatted as <scheme>://<host>:<port> where

  • <scheme> - one of tcp+sbs, ssl+sbs

  • <host> - remote host’s name

  • <port> - remote TCP port

The PEM file is used only for SSL connections. If the PEM file is not defined, the authenticity of the remote certificate is not verified.

If ping_timeout is None or 0, the ping service is not registered.

Parameters
  • sbs_repo (hat.sbs.Repository) – chatter SBS repository

  • address (str) – address

  • pem_file (Optional[str]) – path to pem file

  • ping_timeout (float) – ping timeout in seconds

  • connect_timeout (float) – connect timeout in seconds

  • queue_maxsize (int) – receive message queue maximum size (0 - unlimited)

Returns: newly created connection

Return type: Connection

Raises
  • OSError – could not connect to specified address

  • ValueError – wrong address format

  • socket.gaierror – unknown host name

  • asyncio.TimeoutError – connect timeout

async hat.chatter.listen(sbs_repo, address, on_connection_cb, *, pem_file=None, ping_timeout=20, queue_maxsize=0)

Create listening server.

sbs_repo is the same as for connect().

address is the same as for connect().

If an SSL connection is used, pem_file is required.

If ping_timeout is None or 0, the ping service is not registered.

Parameters
  • sbs_repo (hat.sbs.Repository) – chatter SBS repository

  • address (str) – address

  • on_connection_cb (Callable[[Connection], None]) – on connection callback

  • pem_file (Optional[str]) – path to pem file

  • ping_timeout (float) – ping timeout in seconds

  • queue_maxsize (int) – receive message queue maximum size (0 - unlimited)

Returns: Server

Raises
  • OSError – could not listen on specified address

  • ValueError – wrong address format

  • socket.gaierror – unknown host name

class hat.chatter.Server

Bases: object

For creating new server see listen().

property closed

closed future

Type: asyncio.Future

property addresses

listening addresses

Type: List[str]

async async_close()

Close server and all associated connections

class hat.chatter.Connection

Bases: object

Single connection

For creating new connection see connect().

property local_address

Local address

Type: str

property remote_address

Remote address

Type: str

property closed

closed future

Type: asyncio.Future

async async_close()

Async close

async receive()

Receive incoming message

Returns: Msg

Raises: ConnectionClosedError

send(msg_data, *, conv=None, last=True, token=True, timeout=None, timeout_cb=None)

Send message

Conversation timeout callbacks are triggered only while the connection is open. Once the connection is closed, all active conversations are closed without triggering timeout callbacks.

Sending a message on a closed connection silently discards the message.

Parameters
  • msg_data (Data) – message data

  • conv (Optional[Conversation]) – existing conversation or None for new conversation

  • last (bool) – conversation’s last flag

  • token (bool) – conversation’s token flag

  • timeout (Optional[float]) – conversation timeout in seconds or None for unlimited timeout

  • timeout_cb (Optional[Callable[[Conversation],None]]) – conversation timeout callback

Returns: Conversation

Raises
  • ConnectionClosedError

  • Exception
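The send()/receive() pair above is enough to build a request-response exchange. The following sketch keeps one future per open conversation and resolves it when a reply arrives in that conversation. RequestClient is a hypothetical helper that only uses the documented send() and receive() surface. It assumes that the msg.conv of a reply compares equal to the Conversation returned by send() (Conversation is a tuple, so it can serve as a dictionary key) and that timeout_cb receives the timed-out Conversation, as the documented Callable[[Conversation], None] type suggests.

```python
import asyncio
from typing import Any, Dict


class RequestClient:
    """Request-response over a Chatter-like connection (hypothetical helper).

    Relies only on conn.send(...) returning a Conversation and
    conn.receive() returning a Msg with .conv and .data attributes.
    """

    def __init__(self, conn: Any) -> None:
        self._conn = conn
        self._pending: Dict[Any, asyncio.Future] = {}

    async def request(self, data: Any, timeout: float) -> Any:
        future = asyncio.get_running_loop().create_future()

        def on_timeout(conv: Any) -> None:
            pending = self._pending.pop(conv, None)
            if pending is not None and not pending.done():
                pending.set_exception(asyncio.TimeoutError())

        # last=False keeps the conversation open, token=True lets the peer reply
        conv = self._conn.send(data, last=False, token=True,
                               timeout=timeout, timeout_cb=on_timeout)
        # registered before the next await, so receive_loop cannot miss the reply
        self._pending[conv] = future
        return await future

    async def receive_loop(self) -> None:
        while True:
            msg = await self._conn.receive()
            future = self._pending.pop(msg.conv, None)
            if future is not None and not future.done():
                future.set_result(msg.data)
            # other messages would be handed to the service's own handlers
```

receive_loop() is meant to run as a background task for the lifetime of the connection; it ends with ConnectionClosedError once the connection closes. Messages that do not belong to a pending request are ignored here and would need their own handler in a real service.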