Python utility library

The hat-util package provides common utility functions that are not part of the Python standard library. These functions are organized into the following modules:

  • hat.util.common

    Extensions of basic built-in Python data types.

  • hat.util.aio

    Utility functions and data structures that extend the asyncio library.

  • hat.util.json

    Functions related to serialization of JSON data and validation based on JSON Schemas.

  • hat.util.qt

    Parallel execution of Qt and asyncio threads.

hat.util.common

Common utility functions

hat.util.common.first(xs, fn=<function <lambda>>, default=None)

Return the first element from iterable that satisfies predicate fn, or default if no such element exists.

Parameters
  • xs (Iterable[T]) – collection

  • fn (Callable[[T], bool]) – predicate

  • default (Optional[T]) – default value

Return type

Optional[T]
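
The behaviour described above can be sketched with the standard library alone. The function name first_sketch is hypothetical, and the default predicate (one that accepts every element) is an assumption, since the signature only shows an anonymous lambda. This is an illustration of the documented semantics, not the library's implementation.

```python
from typing import Callable, Iterable, Optional, TypeVar

T = TypeVar('T')


def first_sketch(xs: Iterable[T],
                 fn: Callable[[T], bool] = lambda _: True,
                 default: Optional[T] = None) -> Optional[T]:
    """Hypothetical stand-in for the documented behaviour of first()."""
    # next() stops at the first element for which the predicate holds
    # and falls back to default when the generator is exhausted
    return next((x for x in xs if fn(x)), default)


result_match = first_sketch([1, 2, 3, 4], lambda x: x > 2)
result_default = first_sketch([1, 2], lambda x: x > 5, default=-1)
result_empty = first_sketch([])
```

Because the iterable is consumed lazily, elements after the first match are never evaluated.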

hat.util.common.namedtuple(name, *field_props)

Create a documented Type[collections.namedtuple].

The name can be a string, or a tuple containing the name and its documentation.

The field_props are an iterable of tuple field properties. A property can be a field name; a tuple of field name and documentation; or a tuple of field name, documentation and default value.

Parameters
  • name (Union[str, Tuple[str, str]]) – tuple name with optional documentation

  • field_props (Iterable[Union[str, Tuple[str, str], Tuple[str, str, Any]]]) – tuple field properties

Return type

Type[collections.namedtuple]
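
The three accepted forms of a field property can be illustrated with a small stand-in built on collections.namedtuple. The name documented_namedtuple_sketch is hypothetical, and the requirement that fields with defaults come last is inherited from the standard library, not stated by this documentation.

```python
import collections


def documented_namedtuple_sketch(name, *field_props):
    """Hypothetical stand-in for the documented namedtuple() behaviour."""
    # name is either 'Name' or ('Name', 'documentation')
    name, doc = (name, None) if isinstance(name, str) else name
    # normalise each property to a tuple: (field,), (field, doc) or
    # (field, doc, default)
    props = [(p,) if isinstance(p, str) else tuple(p) for p in field_props]
    defaults = [p[2] for p in props if len(p) > 2]
    cls = collections.namedtuple(name, [p[0] for p in props],
                                 defaults=defaults)
    if doc is not None:
        cls.__doc__ = doc
    for p in props:
        if len(p) > 1:
            # field docstrings of a namedtuple class are writable
            getattr(cls, p[0]).__doc__ = p[1]
    return cls


Point = documented_namedtuple_sketch(
    ('Point', 'A 2D point'),
    'x',
    ('y', 'vertical coordinate'),
    ('label', 'optional label', ''))
p = Point(1, 2)
```

Here x is given as a bare field name, y carries documentation, and label carries both documentation and a default value.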

hat.util.common.extend_enum_doc(cls, description=None)

Extend enumeration documentation with a list of all members.

Parameters
  • cls (enum.EnumMeta) –

  • description (str) –

Return type

enum.EnumMeta

class hat.util.common.RegisterCallbackHandle(cancel)

Bases: hat.util.common.RegisterCallbackHandle

Handle for canceling callback registration.

Parameters

cancel (Callable[[],None]) – cancel callback registration

Create new instance of RegisterCallbackHandle(cancel,)

class hat.util.common.CallbackRegistry(exception_cb=None)

Bases: object

Registry that enables callback registration and notification.

Callbacks in the registry are notified sequentially with CallbackRegistry.notify(). If a callback raises an exception, the exception is caught and the exception_cb handler is called; notification of subsequent callbacks is not interrupted. If exception_cb is None, the exception is reraised and no subsequent callback is notified.

Parameters

exception_cb (Optional[Callable[[Exception], None]]) – exception handler

register(cb)

Register a callback. A handle for registration canceling is returned.

Parameters

cb (Callable) – callback

Return type

hat.util.common.RegisterCallbackHandle

notify(*args, **kwargs)

Notify all registered callbacks.
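
The notification rules described for CallbackRegistry can be sketched as follows. RegistrySketch is a hypothetical class written only to mirror the documented semantics; in particular, returning a plain cancel function instead of a RegisterCallbackHandle is a simplification of this sketch.

```python
from typing import Callable, List, Optional


class RegistrySketch:
    """Hypothetical stand-in mirroring the documented notify() semantics."""

    def __init__(self,
                 exception_cb: Optional[Callable[[Exception], None]] = None):
        self._exception_cb = exception_cb
        self._cbs: List[Callable] = []

    def register(self, cb: Callable) -> Callable[[], None]:
        self._cbs.append(cb)
        # the returned function plays the role of the handle's cancel
        return lambda: self._cbs.remove(cb)

    def notify(self, *args, **kwargs) -> None:
        # iterate over a copy so a callback may cancel during notification
        for cb in list(self._cbs):
            try:
                cb(*args, **kwargs)
            except Exception as e:
                if self._exception_cb is None:
                    raise  # no handler: stop notifying and propagate
                self._exception_cb(e)


def failing(x):
    raise ValueError(x)


errors, seen = [], []
registry = RegistrySketch(exception_cb=errors.append)
registry.register(failing)
cancel = registry.register(seen.append)
registry.notify(1)   # failing() raises, seen.append is still called
cancel()
registry.notify(2)   # only failing() is still registered
```

With a handler installed, the failing callback does not prevent the second callback from being notified; without one, the first exception would end the notification.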

hat.util.common.parse_url_query(query)

Parse url query string.

Returns a dictionary of field names and their values.

Parameters

query (str) – url query string

Return type

Dict[str, str]
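
One plausible way to obtain such a dictionary with the standard library is shown below. The function name parse_url_query_sketch is hypothetical; how the library treats repeated field names, blank values or percent-escapes is not stated here, so the choices in the comments are assumptions of this sketch.

```python
from typing import Dict
from urllib.parse import parse_qsl


def parse_url_query_sketch(query: str) -> Dict[str, str]:
    # parse_qsl yields (name, value) pairs; dict() keeps the last value
    # when a field name repeats (an assumption of this sketch)
    return dict(parse_qsl(query, keep_blank_values=True))


parsed = parse_url_query_sketch('a=1&b=two&c=')
```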

hat.util.common.parse_env_path(path)

Parse path with environment variables.

Parse file path and replace all file path segments equal to $<name> with the value of the <name> environment variable. If the environment variable is not set, the segment is replaced with "." (a single dot).

Parameters

path (os.PathLike) –

Return type

pathlib.Path
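
The segment-wise substitution can be sketched as follows. The name parse_env_path_sketch and the environment variable names are hypothetical, and the sketch reads the fallback for unset variables as a single dot; it is an illustration rather than the library's code.

```python
import os
import pathlib


def parse_env_path_sketch(path) -> pathlib.Path:
    segments = []
    for segment in pathlib.Path(path).parts:
        if segment.startswith('$'):
            # only whole segments are substituted: '$NAME' is replaced,
            # 'prefix$NAME' is left untouched
            segment = os.environ.get(segment[1:], '.')
        segments.append(segment)
    return pathlib.Path(*segments)


os.environ['HAT_SKETCH_DIR'] = 'data'
os.environ.pop('HAT_SKETCH_UNSET', None)
resolved = parse_env_path_sketch('$HAT_SKETCH_DIR/logs/app.log')
fallback = parse_env_path_sketch('$HAT_SKETCH_UNSET/app.log')
```

Note that pathlib drops "." segments when joining, so the fallback path in this sketch ends up relative to the current directory.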

class hat.util.common.LogRotatingFileHandler(filename, *vargs, **kwargs)

Bases: logging.handlers.RotatingFileHandler

Filename is parsed with parse_env_path().

For other arguments, see: logging.handlers.RotatingFileHandler.

class hat.util.common.EnvPathArgParseAction(option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None)

Bases: argparse.Action

Argparse action for parsing file paths with environment variables.

Each path is parsed with parse_env_path().

hat.util.common.get_unused_tcp_port()

Search for an unused TCP port.

Return type

int
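
A common technique for finding a free port is to bind a socket to port 0 and let the operating system choose. Whether the library uses this approach is not stated here; unused_tcp_port_sketch is a hypothetical name, and binding to the loopback address is an assumption of the sketch.

```python
import socket


def unused_tcp_port_sketch() -> int:
    # binding to port 0 asks the operating system for any free port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(('127.0.0.1', 0))
        return sock.getsockname()[1]


port = unused_tcp_port_sketch()
```

The port is only known to be free at the moment of the check; another process may claim it before it is used.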

hat.util.common.get_unused_udp_port()

Search for an unused UDP port.

Return type

int

hat.util.aio

Async utility functions

hat.util.aio.mlog

module logger

Type

logging.Logger

async hat.util.aio.first(xs, fn=<function <lambda>>, default=None)

Return the first element from async iterable that satisfies predicate fn, or default if no such element exists.

Parameters
  • xs (AsyncIterable[T]) – async collection

  • fn (Callable[[T], bool]) – predicate

  • default (Optional[T]) – default value

Return type

Optional[T]

async hat.util.aio.uncancellable(f, raise_cancel=True)

Uncancellable execution of a Future.

Future is shielded and its execution cannot be interrupted.

If raise_cancel is True and the Future gets canceled, asyncio.CancelledError is reraised after the Future finishes.

Warning

If raise_cancel is False, this method suppresses asyncio.CancelledError and stops its propagation. Use with caution.

Parameters
  • f (_asyncio.Future) – future

  • raise_cancel (bool) – raise CancelledError flag

Returns

future’s result

Return type

Any
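
The shielding behaviour can be sketched with asyncio.shield(). The function uncancellable_sketch below is a hypothetical stand-in written from the description above, not the library's code; the demo cancels the outer task while the shielded work is still running.

```python
import asyncio


async def uncancellable_sketch(f, raise_cancel=True):
    cancelled = None
    task = asyncio.ensure_future(f)
    while not task.done():
        try:
            # shield() lets this await be cancelled while task keeps running
            await asyncio.shield(task)
        except asyncio.CancelledError as e:
            if raise_cancel:
                cancelled = e   # remember it, reraise after task finishes
    if cancelled:
        raise cancelled
    return task.result()


async def demo():
    finished = []

    async def work():
        await asyncio.sleep(0.05)
        finished.append('done')

    outer = asyncio.ensure_future(uncancellable_sketch(work()))
    await asyncio.sleep(0.01)   # let work() start
    outer.cancel()              # cancellation arrives mid-execution
    try:
        await outer
        reraised = False
    except asyncio.CancelledError:
        reraised = True
    return finished, reraised


finished, reraised = asyncio.run(demo())
```

In this run the work completes despite the cancellation, and the CancelledError surfaces only afterwards, which matches the raise_cancel=True case described above.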

hat.util.aio.AsyncCallable

Async callable

alias of collections.abc.Callable[…, Optional[Awaitable]]

async hat.util.aio.call(fn, *args, **kwargs)

Call a function or a coroutine (or other callable object).

Call the fn with args and kwargs. If result of this call is awaitable, it is awaited and returned. Otherwise, result is immediately returned.

Parameters
  • fn (collections.abc.Callable[..., Optional[Awaitable]]) – callable object

  • args – additional positional arguments

  • kwargs – additional keyword arguments

Returns

awaited result or result

Return type

Any

Example

    import asyncio
    import hat.util.aio

    def f1(x):
        return x

    def f2(x):
        f = asyncio.Future()
        f.set_result(x)
        return f

    async def f3(x):
        return x

    # the following lines must run inside a coroutine
    assert 'f1' == await hat.util.aio.call(f1, 'f1')
    assert 'f2' == await hat.util.aio.call(f2, 'f2')
    assert 'f3' == await hat.util.aio.call(f3, 'f3')

async hat.util.aio.call_on_cancel(fn, *args, **kwargs)

Call a function or a coroutine when canceled.

When canceled, fn is called with args and kwargs by using call().

Parameters
  • fn (collections.abc.Callable[..., Optional[Awaitable]]) – function or coroutine

  • args – additional function arguments

  • kwargs – additional function keyword arguments

Returns

function result

Return type

Any

async hat.util.aio.call_on_done(f, fn, *args, **kwargs)

Call a function or a coroutine when awaitable is done.

When f is done, fn is called with args and kwargs by using call().

Parameters
  • f (Awaitable) – awaitable future

  • fn (collections.abc.Callable[..., Optional[Awaitable]]) – function or coroutine

  • args – additional function arguments

  • kwargs – additional function keyword arguments

Returns

function result

Return type

Any

hat.util.aio.create_executor(*args, executor_cls=<class 'concurrent.futures.thread.ThreadPoolExecutor'>, loop=None)

Create asyncio.loop.run_in_executor() wrapper.

Returns a coroutine that takes a function and its arguments, executes the function using executor created from executor_cls and args; and returns the result.

Parameters
  • args (Any) – executor args

  • executor_cls (Type) – executor class

  • loop (Optional[asyncio.events.AbstractEventLoop]) – asyncio loop

Returns

executor coroutine

Return type

Callable[[Callable,..],Awaitable[Any]]
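
The wrapper idea can be sketched on top of loop.run_in_executor(). The name create_executor_sketch is hypothetical, and the sketch leaves out the loop parameter by always using the running loop; it illustrates the pattern and is not the library's implementation.

```python
import asyncio
import concurrent.futures


def create_executor_sketch(
        *args, executor_cls=concurrent.futures.ThreadPoolExecutor):
    # the executor is created once and shared by all calls of the wrapper
    executor = executor_cls(*args)

    async def executor_wrapper(fn, *fn_args):
        loop = asyncio.get_running_loop()
        # fn runs in the executor, the event loop stays responsive
        return await loop.run_in_executor(executor, fn, *fn_args)

    return executor_wrapper


async def main():
    run_blocking = create_executor_sketch(1)   # one worker thread
    return await run_blocking(pow, 2, 10)


value = asyncio.run(main())
```

Passing 1 as the executor argument gives a single worker thread, so calls made through the same wrapper execute one at a time.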

hat.util.aio.init_asyncio()

Initialize asyncio.

Sets event loop policy to uvloop.EventLoopPolicy if possible.

On Windows, sets policy to asyncio.WindowsProactorEventLoopPolicy.

hat.util.aio.run_asyncio(future)

Run asyncio loop until the future is completed and return the result.

SIGINT and SIGTERM handlers are temporarily overridden. Instead of raising KeyboardInterrupt on every signal reception, Future is canceled only once. Additional signals are ignored.

On Windows, SIGBREAK (CTRL_BREAK_EVENT) handler is also overridden and asyncio loop gets periodically woken up (every 0.5 seconds).

Parameters

future (Awaitable) – future or coroutine

Returns

future’s result

Return type

Any

exception hat.util.aio.QueueClosedError

Bases: Exception

Raised when trying to use a closed queue.

exception hat.util.aio.QueueEmptyError

Bases: Exception

Raised if queue is empty.

exception hat.util.aio.QueueFullError

Bases: Exception

Raised if queue is full.

class hat.util.aio.Queue(maxsize=0)

Bases: object

Asyncio queue which implements AsyncIterable and can be closed.

Interface and implementation are based on asyncio.Queue.

If maxsize is less than or equal to zero, the queue size is infinite.

Parameters

maxsize (int) – maximum number of items in the queue

property maxsize

Maximum number of items in the queue.

property closed

Closed future.

empty()

True if queue is empty, False otherwise.

Return type

bool

full()

True if queue is full, False otherwise.

Return type

bool

qsize()

Number of items currently in the queue.

Return type

int

close()

Close the queue.

get_nowait()

Return an item if one is immediately available, else raise QueueEmptyError.

Raises

QueueEmptyError

Return type

Any

put_nowait(item)

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFullError.

Raises

QueueFullError

Parameters

item (Any) –

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

Raises

QueueClosedError

Return type

Any

async put(item)

Put an item into the queue.

If the queue is full, wait until a free slot is available before adding the item.

Raises

QueueClosedError

Parameters

item (Any) –

async get_until_empty()

Empty the queue and return the last item.

If queue is empty, wait until at least one item is available.

Raises

QueueClosedError

Return type

Any

get_nowait_until_empty()

Empty the queue and return the last item if at least one item is immediately available, else raise QueueEmptyError.

Raises

QueueEmptyError

Return type

Any
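
The "empty the queue and return the last item" behaviour can be illustrated with the standard asyncio.Queue. The helper drain_last_sketch is hypothetical, and it raises asyncio.QueueEmpty where the class documented here raises QueueEmptyError.

```python
import asyncio


def drain_last_sketch(queue: asyncio.Queue):
    # raises asyncio.QueueEmpty if nothing is immediately available
    item = queue.get_nowait()
    # keep removing items; only the most recent one is returned
    while not queue.empty():
        item = queue.get_nowait()
    return item


async def main():
    queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)
    return drain_last_sketch(queue), queue.empty()


last, emptied = asyncio.run(main())
```

This pattern suits consumers that only care about the latest state and can skip intermediate updates.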

class hat.util.aio.Group(exception_cb=None, *, loop=None)

Bases: object

Group of asyncio Tasks.

Group enables creation and management of related asyncio Tasks. The Group ensures uninterrupted execution of Tasks and Task completion upon Group closing.

Group can contain subgroups, which are independent Groups managed by the parent Group.

If a Task raises exception, other Tasks continue to execute.

If exception_cb handler is None, exceptions are logged with level WARNING.

Parameters
  • exception_cb (Optional[Callable[[Exception], None]]) – exception handler

  • loop (Optional[asyncio.events.AbstractEventLoop]) – asyncio loop

property is_open

True if group is not closing or closed, False otherwise.

property closing

Closing Future.

property closed

Closed Future.

create_subgroup()

Create new Group as a child of this Group. Return the new Group.

When a parent Group gets closed, all of its children are closed. Closing of a subgroup has no effect on the parent Group.

Subgroup inherits exception handler from its parent.

Return type

hat.util.aio.Group

wrap(future)

Wrap the Future into a Task and schedule its execution. Return the Task object.

Resulting task is shielded and can be canceled only with Group.async_close().

Parameters

future (_asyncio.Future) –

Return type

_asyncio.Task

spawn(fn, *args, **kwargs)

Wrap the result of a fn into a Task and schedule its execution. Return the Task object.

Function is called with provided args and kwargs. Resulting Task is shielded and can be canceled only with Group.async_close().

Parameters
  • fn (Callable[[..], Awaitable]) – function

  • args – function arguments

  • kwargs – function keyword arguments

Return type

_asyncio.Task

close(cancel=True)

Schedule Group closing.

Closing Future is set immediately. All subgroups are closed, and all running tasks are optionally canceled. Once closing of all subgroups and execution of all tasks is completed, closed Future is set.

Parameters

cancel (bool) – cancel running tasks

async async_close(cancel=True)

Close Group and wait until closed Future is completed.

Parameters

cancel (bool) – cancel running tasks

hat.util.json

JSON data manipulation and validation

hat.util.json.Data

JSON data type identifier.

alias of Union[None, bool, int, float, str, List[Data], Dict[str, Data]]

class hat.util.json.Format(value)

Bases: enum.Enum

Encoding format

JSON

YAML

hat.util.json.equals(a, b)

Equality comparison of json serializable data.

Tests for equality of data according to JSON format. Notably, bool values are not considered equal to numeric values in any case. This is different from default equality comparison, which considers False equal to 0 and 0.0; and True equal to 1 and 1.0.

Parameters
  • a (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) –

  • b (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) –

Return type

bool
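
The difference from Python's default comparison can be sketched as a short recursive function. The name json_equals_sketch is hypothetical and the code is written from the description above, not taken from the library.

```python
def json_equals_sketch(a, b) -> bool:
    # a bool may only equal another bool, never a number
    if isinstance(a, bool) != isinstance(b, bool):
        return False
    if a != b:
        return False
    # recurse so that nested values such as [0] and [False] also differ
    if isinstance(a, dict):
        return all(json_equals_sketch(a[k], b[k]) for k in a)
    if isinstance(a, list):
        return all(json_equals_sketch(x, y) for x, y in zip(a, b))
    return True


python_says_equal = (False == 0)
flat = json_equals_sketch(False, 0)
nested = json_equals_sketch([1.0], [True])
numbers = json_equals_sketch({'a': [1, None]}, {'a': [1.0, None]})
```

In this sketch integers and floats with the same value still compare equal, while any bool paired with a number compares unequal at every nesting level.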

hat.util.json.diff(src, dst)

Generate JSON Patch diff.

Parameters
  • src (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) –

  • dst (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) –

Return type

Union[None, bool, int, float, str, List[Union[None, bool, int, float, str, List[Data], Dict[str, Data]]], Dict[str, Union[None, bool, int, float, str, List[Data], Dict[str, Data]]]]

hat.util.json.patch(data, diff)

Apply JSON Patch diff.

Parameters
  • data (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) –

  • diff (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) –

Return type

Union[None, bool, int, float, str, List[Union[None, bool, int, float, str, List[Data], Dict[str, Data]]], Dict[str, Union[None, bool, int, float, str, List[Data], Dict[str, Data]]]]

hat.util.json.encode(data, format=<Format.JSON: 1>, indent=None)

Encode JSON data.

Parameters
  • data (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) – JSON data

  • format (hat.util.json.Format) – encoding format

  • indent (Optional[int]) – indentation size

Return type

str

hat.util.json.decode(data_str, format=<Format.JSON: 1>)

Decode JSON data.

Parameters
  • data_str (str) – encoded JSON data

  • format (hat.util.json.Format) – encoding format

Return type

Union[None, bool, int, float, str, List[Union[None, bool, int, float, str, List[Data], Dict[str, Data]]], Dict[str, Union[None, bool, int, float, str, List[Data], Dict[str, Data]]]]

hat.util.json.encode_file(data, path, format=None, indent=4)

Encode JSON data to file.

If format is None, encoding format is derived from path suffix.

Parameters
  • data (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) – JSON data

  • path (pathlib.PurePath) – file path

  • format (Optional[hat.util.json.Format]) – encoding format

  • indent (Optional[int]) – indentation size
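
Deriving the format from the path suffix could look like the sketch below. FormatSketch is a hypothetical stand-in for the Format enumeration, the recognized suffixes are taken from the SchemaRepository description, and raising ValueError for other suffixes is an assumption of this sketch.

```python
import enum
import pathlib


class FormatSketch(enum.Enum):
    """Hypothetical stand-in for hat.util.json.Format."""
    JSON = 'json'
    YAML = 'yaml'


def format_from_suffix_sketch(path: pathlib.PurePath) -> FormatSketch:
    if path.suffix == '.json':
        return FormatSketch.JSON
    if path.suffix in ('.yaml', '.yml'):
        return FormatSketch.YAML
    # behaviour for other suffixes is an assumption of this sketch
    raise ValueError(f'cannot derive format from suffix {path.suffix!r}')


json_format = format_from_suffix_sketch(pathlib.PurePath('conf/app.json'))
yaml_format = format_from_suffix_sketch(pathlib.PurePath('conf/app.yml'))
```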

hat.util.json.decode_file(path, format=None)

Decode JSON data from file.

If format is None, encoding format is derived from path suffix.

Parameters
  • path (pathlib.PurePath) – file path

  • format (Optional[hat.util.json.Format]) – encoding format

Return type

Union[None, bool, int, float, str, List[Union[None, bool, int, float, str, List[Data], Dict[str, Data]]], Dict[str, Union[None, bool, int, float, str, List[Data], Dict[str, Data]]]]

class hat.util.json.SchemaRepository(*args)

Bases: object

JSON Schema repository.

A repository that holds json schemas and enables validation against them.

Repository can be initialized with multiple arguments, which can be instances of pathlib.PurePath, Data or SchemaRepository.

If an argument is of type pathlib.PurePath and the path points to a file with the suffix ‘.json’, ‘.yml’ or ‘.yaml’, json serializable data is decoded from the file. Otherwise, it is assumed that the path points to a directory, which is recursively searched for json and yaml files. All decoded schemas are added to the repository. If a schema with the same id was previously added, an exception is raised.

If an argument is of type Data, it should be a json serializable data representation of a JSON schema. If a schema with the same id was previously added, an exception is raised.

If an argument is of type SchemaRepository, its schemas are added to the new repository. Previously added schemas with the same id are replaced.

Parameters

args (Union[pathlib.PurePath, None, bool, int, float, str, List[Data], Dict[str, Data], SchemaRepository]) –

validate(schema_id, data)

Validate data against JSON schema.

Parameters
  • schema_id (str) – JSON schema identifier

  • data (Union[None, bool, int, float, str, List[Data], Dict[str, Data]]) – data to be validated

Raises

jsonschema.ValidationError

to_json()

Export repository content as json serializable data.

Entire repository content is exported as json serializable data. New repository can be created from the exported content by using SchemaRepository.from_json().

Return type

Union[None, bool, int, float, str, List[Union[None, bool, int, float, str, List[Data], Dict[str, Data]]], Dict[str, Union[None, bool, int, float, str, List[Data], Dict[str, Data]]]]

static from_json(data)

Create new repository from content exported as json serializable data.

Creates a new repository from content of another repository that was exported by using SchemaRepository.to_json().

Parameters

data (Union[pathlib.PurePath, None, bool, int, float, str, List[Data], Dict[str, Data]]) – repository data

Return type

hat.util.json.SchemaRepository

hat.util.qt

Parallel execution of Qt and asyncio threads

hat.util.qt.QtExecutor

The first argument is a Callable that is called with the additional arguments.

alias of Callable[[…], Awaitable[Any]]

hat.util.qt.AsyncMain

First argument is QtExecutor

alias of Callable[[…], Awaitable[Any]]

hat.util.qt.run(async_main, *args, **kwargs)

Run a Qt application with an additional asyncio thread.

Parameters
  • async_main (Callable[[..], Awaitable[Any]]) – asyncio main entry point

  • args – additional positional arguments passed to async_main

  • kwargs – additional keyword arguments passed to async_main