ACSE

Python implementation

Association Control Service Element

hat.drivers.acse.IdentifiedEntity

Identified entity

alias of Tuple[List[Union[int, Tuple[str, int]]], hat.asn1.common.Entity]

class hat.drivers.acse.Address(host, port)

Bases: tuple

Create new instance of Address(host, port)

host

host name

Type

str

port

TCP port

Type

int
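Because Address is a plain named tuple, it supports positional unpacking and immutable copies. A minimal sketch of that behaviour, using a stand-in class with the documented fields (it is not imported from hat.drivers.acse, so it runs without the library; the host and port values are illustrative):

```python
from typing import NamedTuple


class Address(NamedTuple):
    """Stand-in mirroring the documented hat.drivers.acse.Address fields."""
    host: str
    port: int


# Port 102 is the default port shown in the listen() signature
addr = Address(host='127.0.0.1', port=102)

# Tuple behaviour: positional unpacking and modified copies via _replace
host, port = addr
other = addr._replace(port=10102)
```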

class hat.drivers.acse.SyntaxNames(syntax_names)

Bases: object

Syntax name registry

Parameters

syntax_names (List[asn1.ObjectIdentifier]) – list of ASN.1 ObjectIdentifiers representing syntax names

get_name(syntax_id)

Get syntax name associated with id

Parameters

syntax_id (int) – syntax id

Returns

asn1.ObjectIdentifier

get_id(syntax_name)

Get syntax id associated with name

Parameters

syntax_name (asn1.ObjectIdentifier) – syntax name

Returns

int
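The two lookups above form a two-way mapping between syntax ids and syntax names. The following is a hypothetical stand-in, not the library's implementation: the class name SyntaxNamesSketch and the id assignment (odd integers in list order) are assumptions made only for illustration, since the id scheme is not documented here.

```python
from typing import Dict, List, Tuple, Union

# Stand-in for asn1.ObjectIdentifier (a list of int or (str, int) items)
ObjectIdentifier = List[Union[int, Tuple[str, int]]]


class SyntaxNamesSketch:
    """Hypothetical two-way registry between syntax ids and syntax names."""

    def __init__(self, syntax_names: List[ObjectIdentifier]):
        # Assumption: ids are odd integers assigned in list order
        self._id_to_name: Dict[int, ObjectIdentifier] = {
            2 * i + 1: name for i, name in enumerate(syntax_names)}
        # Lists are unhashable, so names are keyed by their tuple form
        self._name_to_id = {
            tuple(name): syntax_id
            for syntax_id, name in self._id_to_name.items()}

    def get_name(self, syntax_id: int) -> ObjectIdentifier:
        return self._id_to_name[syntax_id]

    def get_id(self, syntax_name: ObjectIdentifier) -> int:
        return self._name_to_id[tuple(syntax_name)]


# Illustrative object identifiers only
registry = SyntaxNamesSketch([[2, 1, 1], [1, 0, 9506, 2, 1]])
```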

class hat.drivers.acse.ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)

Bases: tuple

Create new instance of ConnectionInfo(local_addr, local_tsel, local_ssel, local_psel, local_ap_title, local_ae_qualifier, remote_addr, remote_tsel, remote_ssel, remote_psel, remote_ap_title, remote_ae_qualifier)

local_addr

local address

Type

Address

local_ae_qualifier

local AE qualifier

Type

Optional[int]

local_ap_title

local AP title

Type

Optional[asn1.ObjectIdentifier]

local_psel

local COPP selector

Type

Optional[int]

local_ssel

local COSP selector

Type

Optional[int]

local_tsel

local COTP selector

Type

Optional[int]

remote_addr

remote address

Type

Address

remote_ae_qualifier

remote AE qualifier

Type

Optional[int]

remote_ap_title

remote AP title

Type

Optional[asn1.ObjectIdentifier]

remote_psel

remote COPP selector

Type

Optional[int]

remote_ssel

remote COSP selector

Type

Optional[int]

remote_tsel

remote COTP selector

Type

Optional[int]
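ConnectionInfo groups the local and remote endpoint parameters in one tuple, which makes it convenient for logging. A minimal sketch, assuming stand-in classes with the documented field names and order; the describe helper and the sample values are hypothetical:

```python
from typing import NamedTuple, Optional


class Address(NamedTuple):
    host: str
    port: int


class ConnectionInfo(NamedTuple):
    """Stand-in with the documented field names and order."""
    local_addr: Address
    local_tsel: Optional[int]
    local_ssel: Optional[int]
    local_psel: Optional[int]
    local_ap_title: Optional[list]
    local_ae_qualifier: Optional[int]
    remote_addr: Address
    remote_tsel: Optional[int]
    remote_ssel: Optional[int]
    remote_psel: Optional[int]
    remote_ap_title: Optional[list]
    remote_ae_qualifier: Optional[int]


def describe(info: ConnectionInfo) -> str:
    """Format both endpoints as host:port plus their T/S/P selectors."""
    def side(addr, tsel, ssel, psel):
        return (f'{addr.host}:{addr.port} '
                f'(tsel={tsel}, ssel={ssel}, psel={psel})')
    local = side(info.local_addr, info.local_tsel,
                 info.local_ssel, info.local_psel)
    remote = side(info.remote_addr, info.remote_tsel,
                  info.remote_ssel, info.remote_psel)
    return f'{local} -> {remote}'


info = ConnectionInfo(
    local_addr=Address('10.0.0.1', 50000), local_tsel=1, local_ssel=None,
    local_psel=None, local_ap_title=None, local_ae_qualifier=None,
    remote_addr=Address('10.0.0.2', 102), remote_tsel=2, remote_ssel=None,
    remote_psel=None, remote_ap_title=None, remote_ae_qualifier=None)
```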

hat.drivers.acse.ValidateResult

Validate result

alias of Optional[Tuple[List[Union[int, Tuple[str, int]]], hat.asn1.common.Entity]]

hat.drivers.acse.ValidateCb

Validate callback

alias of Callable[[hat.drivers.copp.SyntaxNames, Tuple[List[Union[int, Tuple[str, int]]], hat.asn1.common.Entity]], Union[Tuple[List[Union[int, Tuple[str, int]]], hat.asn1.common.Entity], None, Awaitable[Optional[Tuple[List[Union[int, Tuple[str, int]]], hat.asn1.common.Entity]]]]]
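According to the alias above, a validate callback may be either a plain function or a coroutine. The sketch below shows both shapes, plus one common way a caller can support either by awaiting the result only when it is awaitable. The helper call_validate is hypothetical, and what the server does with the returned value is not described here.

```python
import asyncio
import inspect


def validate_sync(syntax_names, user_data):
    # Plain function: the result is returned directly
    return None


async def validate_async(syntax_names, user_data):
    # Coroutine: the result is available once awaited
    await asyncio.sleep(0)
    return user_data


async def call_validate(validate_cb, syntax_names, user_data):
    """Call a ValidateCb-shaped callback, awaiting the result if needed."""
    result = validate_cb(syntax_names, user_data)
    if inspect.isawaitable(result):
        result = await result
    return result
```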

hat.drivers.acse.ConnectionCb

Connection callback

alias of Callable[[Connection], Union[None, Awaitable[None]]]

async hat.drivers.acse.connect(syntax_name_list, app_context_name, addr, local_tsel=None, remote_tsel=None, local_ssel=None, remote_ssel=None, local_psel=None, remote_psel=None, local_ap_title=None, remote_ap_title=None, local_ae_qualifier=None, remote_ae_qualifier=None, user_data=None)

Connect to ACSE server

Parameters
  • syntax_name_list (List[asn1.ObjectIdentifier]) – syntax names

  • app_context_name (asn1.ObjectIdentifier) – application context name

  • addr (Address) – remote address

  • local_tsel (Optional[int]) – local COTP selector

  • remote_tsel (Optional[int]) – remote COTP selector

  • local_ssel (Optional[int]) – local COSP selector

  • remote_ssel (Optional[int]) – remote COSP selector

  • local_psel (Optional[int]) – local COPP selector

  • remote_psel (Optional[int]) – remote COPP selector

  • local_ap_title (Optional[asn1.ObjectIdentifier]) – local AP title

  • remote_ap_title (Optional[asn1.ObjectIdentifier]) – remote AP title

  • local_ae_qualifier (Optional[int]) – local AE qualifier

  • remote_ae_qualifier (Optional[int]) – remote AE qualifier

  • user_data (Optional[IdentifiedEntity]) – connect request user data

Returns

Connection
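A possible client-side usage sketch based only on the signature above. The object identifier values, the selector values, and the helper names connect_kwargs and run_client are assumptions for illustration; the library is imported lazily, so the module itself loads without hat.drivers installed.

```python
import asyncio

# Hypothetical object identifiers, for illustration only
SYNTAX_NAME = [2, 1, 1]
APP_CONTEXT_NAME = [1, 0, 9506, 2, 3]


def connect_kwargs(addr):
    """Collect connect() arguments; omitted selectors default to None."""
    return {'syntax_name_list': [SYNTAX_NAME],
            'app_context_name': APP_CONTEXT_NAME,
            'addr': addr,
            'local_tsel': 1,
            'remote_tsel': 2}


async def run_client(host, port=102):
    # Imported lazily so the sketch can be loaded without the library
    from hat.drivers import acse

    conn = await acse.connect(**connect_kwargs(acse.Address(host, port)))
    try:
        print(conn.info)
    finally:
        await conn.async_close()


# asyncio.run(run_client('127.0.0.1'))  # needs a reachable ACSE server
```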

async hat.drivers.acse.listen(validate_cb, connection_cb, addr=Address(host='0.0.0.0', port=102))

Create ACSE listening server

Parameters
  • validate_cb (ValidateCb) – callback function or coroutine called on each new incoming connection request, before the connection object is created

  • connection_cb (ConnectionCb) – new connection callback

  • addr (Address) – local listening address

Returns

Server
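A possible server-side usage sketch based only on the signature above. The callbacks, the helper run_server, and the port 10102 are hypothetical; in particular, returning None from the validate callback is an arbitrary choice here, since the effect of each return value is not described in this section.

```python
import asyncio


def validate(syntax_names, user_data):
    # Hypothetical policy: return None for every connection request
    return None


async def on_connection(conn):
    # Called with each new Connection; here it only logs and closes
    print('new connection', conn.info)
    await conn.async_close()


async def run_server(host='127.0.0.1', port=10102):
    # Imported lazily so the sketch can be loaded without the library
    from hat.drivers import acse

    server = await acse.listen(validate, on_connection,
                               acse.Address(host, port))
    try:
        print('listening on', server.addresses)
        # Block until the server's closed future completes
        await server.closed
    finally:
        await server.async_close()


# asyncio.run(run_server())  # requires hat.drivers to be installed
```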

class hat.drivers.acse.Server

Bases: object

ACSE listening server

To create a new server, see listen()

property addresses

listening addresses

Type

List[Address]

property closed

closed future

Type

asyncio.Future

async async_close()

Close listening socket

Calling this method doesn’t close active incoming connections
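Since closing the server leaves active connections open, an application that wants a full shutdown has to close them itself. One way to do that is sketched below, assuming a hypothetical ConnectionTracker whose on_connection method is passed as connection_cb; FakeConnection is a test double exposing only async_close().

```python
import asyncio


class ConnectionTracker:
    """Hypothetical helper remembering connections given to connection_cb."""

    def __init__(self):
        self._conns = set()

    def on_connection(self, conn):
        # Usable as a ConnectionCb: remember the new connection
        self._conns.add(conn)

    async def close_all(self):
        # Close every tracked connection concurrently, then forget them
        conns, self._conns = self._conns, set()
        await asyncio.gather(*(conn.async_close() for conn in conns))


class FakeConnection:
    """Test double exposing only async_close()."""

    def __init__(self):
        self.is_closed = False

    async def async_close(self):
        self.is_closed = True
```

In a real shutdown sequence, close_all() would be awaited alongside the server's own async_close().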

class hat.drivers.acse.Connection

Bases: object

ACSE connection

To create a new connection, see connect()

property info

connection info

Type

ConnectionInfo

property conn_req_user_data

connect request’s user data

Type

IdentifiedEntity

property conn_res_user_data

connect response’s user data

Type

IdentifiedEntity

property closed

closed future

Type

asyncio.Future

async async_close()

Async close

async read()

Read data

Returns

IdentifiedEntity

write(data)

Write data

Parameters

data (IdentifiedEntity) – data
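Note that read() is a coroutine while write() is a regular method. A minimal echo sketch illustrating that difference; the echo helper is hypothetical and is exercised here against a FakeConnection test double rather than a real Connection. The behaviour of read() on a closed connection is not covered.

```python
import asyncio
from collections import deque


async def echo(conn, count):
    """Read `count` items from conn and write each one back."""
    for _ in range(count):
        data = await conn.read()  # read() must be awaited
        conn.write(data)          # write() is called without await


class FakeConnection:
    """Test double with the documented read()/write() shape."""

    def __init__(self, incoming):
        self._incoming = deque(incoming)
        self.written = []

    async def read(self):
        await asyncio.sleep(0)
        return self._incoming.popleft()

    def write(self, data):
        self.written.append(data)
```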