Juggler communication protocol

Juggler is communication protocol used for communication between back-end and GUI front-end parts of components. As underlying protocol, Juggler uses WebSocket communication and text messages encoding JSON data. Communication between peers is symmetrical full duplex - once client and server establish WebSocket communication, each party can send messages independently of other and no further distinction between client and server is made.

Model

As prerequisite for Juggler communication, each peer should implement data model consisting of local and remote data. Data can be arbitrary application specific JSON serializable data.

Initially, null value is assumed for both local and remote data.

Each peer can freely change local data. Once local data is changed, JSON patch between previous and current local data is generated and transmitted to other peer. Upon receiving data change, other peer applies JSON patch to it’s remote data and notifies of remote data change. This enables continuous synchronization of local and remote data on both peers.

Communication

Juggler provides communication based on continuous synchronization of JSON data and communication based on asynchronous message passing.

Local and remote data synchronization assumes existence of time delayed caching implementation. Once local data changes, peer should wait for predefined time period prior to sending changes to other peer. In this time period, peer expects other local data changes which should all be delivered as single change to other peer. This caching provides optimization of message number and size of data exchanged between peers. At same time, additional communication latency is introduced.

When transfer of data as they are is required, asynchronous message passing should be used. By explicitly sending JSON messages, each peer can transmit arbitrary data. This communication assumes that no additional caching is done by Juggler implementation. Asynchronous message passing is independent of continuous data synchronization communication.

Messages

All messages are UTF-8 encoded JSON data defined by following JSON Schema:

type: object
required:
    - type
    - payload
properties:
    type:
        enum:
            - DATA
            - MESSAGE

DATA messages are used for continuous synchronization of JSON data. It’s payload parameter contains JSON patch value as defined by https://tools.ietf.org/html/rfc6902. This messages are sent by peer once local data changes.

MESSAGE messages provide asynchronous message passing communication.

Python implementation

Juggler communication protocol

This module implements basic communication infrastructure used for communication between back-end and GUI front-end parts of Hat components.

Example usage of Juggler communication:

srv_conn = None

def on_connection(conn):
    global srv_conn
    srv_conn = conn

srv = await listen('127.0.0.1', 1234, on_connection)

conn = await connect('ws://127.0.0.1:1234/ws')

conn.set_local_data(123)

await asyncio.sleep(0.3)

assert srv_conn.remote_data == conn.local_data

await conn.async_close()
await srv.async_close()
hat.juggler.mlog

module logger

Type

logging.Logger

async hat.juggler.connect(address, *, autoflush_delay=0.2)

Connect to remote server

Address represents remote WebSocket URL formated as <schema>://<host>:<port>/<path> where <schema> is ws or wss.

Argument autoflush_delay defines maximum time delay for automatic synchronization of local_data changes. If autoflush_delay is set to None, automatic synchronization is disabled and user is responsible for calling Connection.flush_local_data(). If autoflush_delay is set to 0, synchronization of local_data is performed on each change of local_data.

Parameters
  • address (str) – remote server address

  • autoflush_delay (Optional[float]) – autoflush delay

Return type

hat.juggler.Connection

async hat.juggler.listen(host, port, connection_cb, *, ws_path='/ws', static_dir=None, index_path='/index.html', pem_file=None, autoflush_delay=0.2, shutdown_timeout=0.1)

Create listening server

Each time server receives new incomming juggler connection, connection_cb is called with newly created connection.

If static_dir is set, server serves static files is addition to providing juggler communication.

If index_path is set, request for url path / are redirected to index_path.

If pem_file is set, server provides https/wss communication instead of http/ws communication.

Argument autoflush_delay is associated with all connections associated with this server (see connect()).

shutdown_timeout defines maximum time duration server will wait for regular connection close procedures during server shutdown. All connections that are not closed during this period are forcefully closed.

Parameters
  • host (str) – listening hostname

  • port (int) – listening TCP port

  • connection_cb (Callable[[Connection], None]) – connection callback

  • ws_path (str) – WebSocket url path segment

  • static_dir (Optional[pathlib.Path]) – static files directory path

  • index_path (Optional[str]) – index path

  • pem_file (Optional[pathlib.Path]) – PEM file path

  • autoflush_delay (Optional[float]) – autoflush delay

  • shutdown_timeout (float) – shutdown timeout

Return type

hat.juggler.Server

class hat.juggler.Server

Bases: object

For creating new server see listen().

property closed

Closed future

async async_close()

Async close server and all active connections

class hat.juggler.Connection

Bases: object

For creating new connection see connect().

property closed

Closed future

property local_data

Local data

property remote_data

remote data

register_change_cb(cb)

Register remote data change callback

Parameters

cb (Callable[[], None]) –

Return type

hat.util.common.RegisterCallbackHandle

async async_close()

Async close

set_local_data(data)

Set local data

Raises

ConnectionError

Parameters

data (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) –

async flush_local_data()

Force synchronization of local data

Raises

ConnectionError

async send(msg)

Send message

Raises

ConnectionError

Parameters

msg (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) –

async receive()

Receive message

Raises

ConnectionError

Return type

Union[None, bool, int, float, str, List[Data], Dict[str, Data]]