hat.stc - Python statechart library

This library provides a basic implementation of a hierarchical state machine engine. A statechart definition can be provided as structures defined by the API or as an SCXML definition. Additionally, a Graphviz DOT graph can be generated from a state definition, for use together with the Sphinx extension hat.sphinx.scxml.

Notable differences between hat.stc and SCXML standard:

  • initial child state (in scxml and state tag) should be defined only by setting parent’s initial attribute

  • transitions without associated event name are not supported

  • parallel substates are not supported

  • history pseudo-state is not supported

  • data model is not supported

  • external communication is not supported

  • all actions and conditions are identified by name - arbitrary expressions or executable contents are not supported

  • transition event identifiers are used as exact event names without support for substring segmentation matching
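
The last difference can be made concrete with a short sketch. It compares exact-name matching, as described above for hat.stc, with the dot-separated prefix matching of a single event descriptor defined by the SCXML standard. Both functions are illustrative helpers written for this comparison and are not part of the hat.stc API.

```python
def matches_exact(descriptor: str, event_name: str) -> bool:
    """Exact matching: the descriptor must equal the event name."""
    return descriptor == event_name


def matches_scxml(descriptor: str, event_name: str) -> bool:
    """SCXML-style matching of a single event descriptor.

    The descriptor matches when its dot-separated segments are a prefix
    of the event name's segments. '*' matches any event and a trailing
    '.*' is ignored.
    """
    if descriptor == '*':
        return True
    if descriptor.endswith('.*'):
        descriptor = descriptor[:-2]
    desc_segments = descriptor.split('.')
    name_segments = event_name.split('.')
    return name_segments[:len(desc_segments)] == desc_segments


# compare both strategies for the descriptor 'door'
for name in ['door', 'door.close', 'doors']:
    print(name, matches_exact('door', name), matches_scxml('door', name))
```

With the descriptor door, exact matching accepts only the event door, while SCXML-style matching also accepts door.close. The event doors matches under neither strategy.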

Tutorial

Hierarchical state machines (also known as statecharts) are abstractions used as an organization/implementation basis for algorithms that execute continuous state changes. Interaction between the “outside world” and statecharts is usually represented by a sequence of events that are directly responsible for state changes.

In this tutorial, we will gradually introduce statechart concepts and the hat.stc functions/structures which help implement those concepts. All examples are available as part of the git repository.

Statechart definition

As an example of trivial state machine, let us borrow simple example from Wikipedia:

[Figure: tutorial.svg, statechart diagram of the two-state door]

This diagram models a simple door with only two states: opened and closed. Opened is the initial state, which transitions to the closed state once a close event occurs. Similarly, once in the closed state, the door changes to the opened state if an open event occurs. Both states have an associated entry action (named printState) which is triggered each time the state is entered.

The same statechart can be described with following SCXML definition:

<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" initial="opened" version="1.0">
    <state id="opened">
        <onentry>printState</onentry>
        <transition event="close" target="closed"/>
    </state>
    <state id="closed">
        <onentry>printState</onentry>
        <transition event="open" target="opened"/>
    </state>
</scxml>

hat.stc library provides function hat.stc.parse_scxml which can be used for parsing SCXML definitions into state definitions usable by hat.stc.Statechart. Equivalent definition of this SCXML represented by hat.stc.State definitions is:

[State(name='opened',
       transitions=[Transition(event='close', target='closed')],
       entries=['printState']),
 State(name='closed',
       transitions=[Transition(event='open', target='opened')],
       entries=['printState'])]

Both the SCXML and the hat.stc.State based definitions represent the identical statechart, and it is up to the user to choose the more appropriate notation. In the rest of this tutorial, we will be using SCXML definitions.
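
To show how the two notations line up, the following sketch reads the door definition with Python's standard xml.etree.ElementTree module and collects, for each state, its name, entry actions and transitions. It is a simplified illustration that handles flat states only and uses plain dictionaries instead of hat.stc.State. The helper read_flat_states is hypothetical and does not show how hat.stc.parse_scxml is implemented.

```python
import xml.etree.ElementTree as ET

# the door definition from above (XML declaration omitted for brevity)
DOOR_SCXML = """
<scxml xmlns="http://www.w3.org/2005/07/scxml" initial="opened" version="1.0">
    <state id="opened">
        <onentry>printState</onentry>
        <transition event="close" target="closed"/>
    </state>
    <state id="closed">
        <onentry>printState</onentry>
        <transition event="open" target="opened"/>
    </state>
</scxml>
"""

# namespace prefix used in the element lookups below
NS = {'sc': 'http://www.w3.org/2005/07/scxml'}


def read_flat_states(text):
    """Return (initial state name, list of state dicts) for flat SCXML."""
    root = ET.fromstring(text)
    states = []
    for state_el in root.findall('sc:state', NS):
        states.append({
            'name': state_el.get('id'),
            'entries': [(el.text or '').strip()
                        for el in state_el.findall('sc:onentry', NS)],
            'transitions': [(el.get('event'), el.get('target'))
                            for el in state_el.findall('sc:transition', NS)]})
    return root.get('initial'), states


initial, states = read_flat_states(DOOR_SCXML)
print('initial:', initial)
for state in states:
    print(state)
```

Each printed dictionary carries the same three pieces of information (name, transitions, entries) as the corresponding hat.stc.State entry in the listing above.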

Creating statechart instance

Once we have prepared statechart definition (e.g. door_01.scxml), we can create execution environment with new statechart instance:

import asyncio
import hat.stc

async def main():

    def act_print_state(door, evt):
        print('current state:', door.state)

    states = hat.stc.parse_scxml("door_01.scxml")
    actions = {'printState': act_print_state}
    door = hat.stc.Statechart(states, actions)

    # TODO: run statechart

if __name__ == '__main__':
    asyncio.run(main())

During instance initialization, together with state definitions, we provide action bindings. Action bindings are a dictionary which maps every action name used in the state definitions to a regular function implementing that action. In our case, the definition contains the action printState (used as the entry action for both the opened and the closed state), which is associated with act_print_state, a simple function that prints the current state of our statechart instance.

By keeping state definition separate from associated actions and statechart instances, single list of state definitions can be used as blueprint for creating arbitrary number of mutually independent instances.

Running statechart

Execution of statechart logic is controlled with execution of hat.stc.Statechart.run coroutine. Once started, statechart will transition to initial state and wait for registered events that will cause state transitions:

    run_task = asyncio.create_task(door.run())
    await asyncio.sleep(1)

    print('registering close event')
    door.register(hat.stc.Event('close'))
    await asyncio.sleep(1)

    print('registering open event')
    door.register(hat.stc.Event('open'))
    await asyncio.sleep(1)

    run_task.cancel()

By executing this example, following output can be expected:

current state: opened
registering close event
current state: closed
registering open event
current state: opened

Representing statechart as python class

To provide clean interface, we can encapsulate our derived statechart functionality as single class:

door_states = hat.stc.parse_scxml('door_01.scxml')

class Door:

    def __init__(self):
        actions = {'printState': self._act_print_state}
        self._stc = hat.stc.Statechart(door_states, actions)
        self._run_task = asyncio.create_task(self._stc.run())

    def close(self):
        print('registering close event')
        self._stc.register(hat.stc.Event('close'))

    def open(self):
        print('registering open event')
        self._stc.register(hat.stc.Event('open'))

    def finish(self):
        self._run_task.cancel()

    def _act_print_state(self, inst, evt):
        print('current state:', self._stc.state)

Now we can instantiate and test our simple door:

    door = Door()
    await asyncio.sleep(1)

    door.close()
    await asyncio.sleep(1)

    door.open()
    await asyncio.sleep(1)

    door.finish()

This execution produces same result as our previous example:

current state: opened
registering close event
current state: closed
registering open event
current state: opened

Processing registered events

To help our analysis of event processing, we will introduce “force” to our operations of door closing and opening. This “force” will be represented with number in range [0, 100] where 0 represents minimal opening/closing force and 100 represents maximal opening/closing force.

This enhancement can be represented with following changes to door methods:

    def close(self, force):
        print('registering close event')
        self._stc.register(hat.stc.Event('close', force))

    def open(self, force):
        print('registering open event')
        self._stc.register(hat.stc.Event('open', force))

    def _act_print_state(self, inst, evt):
        force = evt.payload if evt else None
        print(f'force {force} caused transition to {self._stc.state}')

Now our test sequence:

    door = Door()
    await asyncio.sleep(1)

    door.close(10)
    await asyncio.sleep(1)

    door.open(20)
    await asyncio.sleep(1)

    door.finish()

results in:

force None caused transition to opened
registering close event
force 10 caused transition to closed
registering open event
force 20 caused transition to opened

Each instance of hat.stc.Statechart has its own event queue. All registered events are added to the end of this queue. During execution of hat.stc.Statechart.run, events are taken one at a time from the beginning of the event queue and checked for possible transitions. When a transition is found, it causes the statechart instance to change its state and execute all appropriate actions. If no transition paired with the event can be found, the event is discarded and the statechart does not change its state. Once all events from the event queue are processed, hat.stc.Statechart.run waits for new events to be added to the event queue.
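
The queue behaviour described above can be sketched with a plain collections.deque and a lookup table for the two-state door. This is only a simplified, synchronous model for reasoning about event order. The names TRANSITIONS and process are made up for this sketch and it does not reflect how hat.stc.Statechart.run is implemented.

```python
from collections import deque

# (current state, event name) -> next state, for the two-state door
TRANSITIONS = {('opened', 'close'): 'closed',
               ('closed', 'open'): 'opened'}


def process(initial, events):
    """Drain a FIFO queue of (name, payload) events.

    Returns a list of (payload, entered state) pairs.
    """
    queue = deque(events)
    state = initial
    log = [(None, state)]      # the initial state is entered without an event
    while queue:
        name, payload = queue.popleft()      # oldest event first
        target = TRANSITIONS.get((state, name))
        if target is None:
            continue           # no matching transition: event is discarded
        state = target
        log.append((payload, state))
    return log


events = [('open', 10), ('close', 20), ('close', 30), ('open', 40)]
for force, state in process('opened', events):
    print(f'force {force} caused transition to {state}')
```

In this run, the open event with force 10 and the close event with force 30 are discarded, because no transition is defined for them in the state that is active when they are taken from the queue.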

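Taking this into account, we can expect the same transitions even if we omit the asyncio.sleep calls between opening and closing the door. The events are queued immediately, but they are processed only once the run task gets a chance to execute, which is why both “registering” lines are printed first. Therefore: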

    door = Door()
    door.close(20)
    door.open(50)

    await asyncio.sleep(1)
    door.finish()

results in:

registering close event
registering open event
force None caused transition to opened
force 20 caused transition to closed
force 50 caused transition to opened

Also, if we try to open an already opened door or close an already closed door, these operations will be ignored. Therefore:

    door = Door()
    door.open(10)
    door.close(20)
    door.close(30)
    door.open(40)

    await asyncio.sleep(1)
    door.finish()

results in:

registering open event
registering close event
registering close event
registering open event
force None caused transition to opened
force 20 caused transition to closed
force 40 caused transition to opened

Working with timeouts

As you have probably noticed, our door model reacts to a close event by immediately closing the door and to an open event by immediately opening it. A more realistic model would include transition states (closing and opening). The time the door spends in one of these transition states should decrease as the applied force increases.

[Figure: tutorial1.svg, statechart diagram of the door with closing and opening transition states]

<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" initial="opened" version="1.0">
    <state id="opened">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="close" target="closing">logTransition</transition>
    </state>
    <state id="closing">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <onentry>startTimer</onentry>
        <transition event="timeout" target="closed">logTransition</transition>
    </state>
    <state id="closed">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="open" target="opening">logTransition</transition>
    </state>
    <state id="opening">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <onentry>startTimer</onentry>
        <transition event="timeout" target="opened">logTransition</transition>
    </state>
</scxml>

To help us follow state transitions, we have added logging actions which inform us when a state is entered (logEnter), when a state is exited (logExit) and when a transition action is triggered (logTransition). In addition to these logging actions, the states closing and opening have an additional action responsible for starting a timer with a calculated delay.

To implement timer behaviour, we will be using asyncio loop.call_later:

door_states = hat.stc.parse_scxml('door_02.scxml')

class Door:

    def __init__(self):
        actions = {'logEnter': self._act_log_enter,
                   'logExit': self._act_log_exit,
                   'logTransition': self._act_log_transition,
                   'startTimer': self._act_start_timer}
        self._stc = hat.stc.Statechart(door_states, actions)
        self._run_task = asyncio.create_task(self._stc.run())

    def finish(self):
        self._run_task.cancel()

    def close(self, force):
        print('registering close event')
        self._stc.register(hat.stc.Event('close', force))

    def open(self, force):
        print('registering open event')
        self._stc.register(hat.stc.Event('open', force))

    def _act_log_enter(self, inst, evt):
        print(f'entering state {self._stc.state}')

    def _act_log_exit(self, inst, evt):
        print(f'exiting state {self._stc.state}')

    def _act_log_transition(self, inst, evt):
        print(f'transitioning because of event {evt}')

    def _act_start_timer(self, inst, evt):
        force = evt.payload
        delay = force_to_delay(force)
        print(f'waiting for {delay} seconds')
        loop = asyncio.get_running_loop()
        loop.call_later(delay, self._stc.register, hat.stc.Event('timeout'))

def force_to_delay(force):
    if force <= 0:
        return 0.1
    if force >= 100:
        return 0
    return (100 - force) * 0.001
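
As a quick check of the force_to_delay mapping above, the sketch below repeats the function and evaluates it for a few forces. The results are rounded for display, since the multiplication is done in floating point.

```python
def force_to_delay(force):
    # same mapping as in the Door example: stronger force, shorter delay
    if force <= 0:
        return 0.1
    if force >= 100:
        return 0
    return (100 - force) * 0.001


# evaluate the mapping at both limits and at two intermediate forces
for force in (0, 30, 60, 100):
    print(force, round(force_to_delay(force), 3))
```

The delay falls linearly from 0.1 seconds at force 0 to 0 seconds at force 100. For forces 30 and 60 it is roughly 0.07 and 0.04 seconds.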

Executing our simple testing sequence:

    door = Door()
    await asyncio.sleep(1)

    door.close(30)
    await asyncio.sleep(1)

    door.open(60)
    await asyncio.sleep(1)

    door.finish()

will result in:

entering state opened
registering close event
exiting state opened
transitioning because of event Event(name='close', payload=30)
entering state closing
waiting for 0.07 seconds
exiting state closing
transitioning because of event Event(name='timeout', payload=None)
entering state closed
registering open event
exiting state closed
transitioning because of event Event(name='open', payload=60)
entering state opening
waiting for 0.04 seconds
exiting state opening
transitioning because of event Event(name='timeout', payload=None)
entering state opened

Composite states

Our previous example had one major drawback: once the opening/closing state is entered, open/close events have no effect until the timeout occurs. This can be especially problematic with long timer durations, which leave the statechart “unresponsive” for a long time.

This problem can be tackled by adding transitions to the closing/opening states which enable “operation cancellation” by returning to the originating state.

In case of more complex statechart definitions, addition of transitions for each possible event to each state can soon become hard to maintain. Alternative to this approach is grouping states into hierarchy.

Each hierarchical statechart can be represented by equivalent non-hierarchical state diagram but this can result in state diagrams that are harder to reason about. Because of this, in most cases of complex definitions, we will prefer hierarchical approach.

Hierarchical statechart definition:

[Figure: tutorial2.svg, hierarchical statechart diagram of the door with open_group and close_group]

<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" initial="open_group" version="1.0">
    <state id="open_group" initial="opened">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="close" target="close_group">logTransition</transition>
        <state id="opened">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
        </state>
        <state id="opening">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
            <onentry>startTimer</onentry>
            <onexit>stopTimer</onexit>
            <transition event="timeout" target="opened">logTransition</transition>
        </state>
    </state>
    <state id="close_group" initial="closing">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="open" target="opening">logTransition</transition>
        <state id="closing">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
            <onentry>startTimer</onentry>
            <onexit>stopTimer</onexit>
            <transition event="timeout" target="closed">logTransition</transition>
        </state>
        <state id="closed">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
        </state>
    </state>
</scxml>

Implementation of Door class can remain mostly the same with addition of stopTimer action:

    def _act_start_timer(self, inst, evt):
        force = evt.payload
        delay = force_to_delay(force)
        print(f'waiting for {delay} seconds')
        loop = asyncio.get_running_loop()
        self._timer = loop.call_later(delay, self._stc.register,
                                      hat.stc.Event('timeout'))

    def _act_stop_timer(self, inst, evt):
        self._timer.cancel()
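
The timer handling above relies on the handle returned by loop.call_later. The following standalone sketch, which does not use hat.stc, shows that cancelling such a handle prevents its callback from running. The names fired, kept and cancelled exist only in this example.

```python
import asyncio


async def main():
    fired = []
    loop = asyncio.get_running_loop()

    # schedule two callbacks; keep the handles so they can be cancelled
    kept = loop.call_later(0.01, fired.append, 'kept')
    cancelled = loop.call_later(0.01, fired.append, 'cancelled')

    cancelled.cancel()           # this callback will never run
    await asyncio.sleep(0.05)    # give the remaining timer time to fire

    kept.cancel()                # cancelling after the callback ran is harmless
    return fired


print(asyncio.run(main()))
```

Only the callback whose handle was not cancelled in time appends to the list. This is why stopTimer can be used as an exit action regardless of whether the state is left because of the timeout or because of another event.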

Execution of testing sequence

    door = Door()
    await asyncio.sleep(1)

    door.close(30)
    # sleeping for shorter period than it takes for door to close
    await asyncio.sleep(0.05)

    door.open(60)
    await asyncio.sleep(1)

    door.finish()

produces following output:

entering state open_group
entering state opened
registering close event
exiting state opened
exiting state open_group
transitioning because of event Event(name='close', payload=30)
entering state close_group
entering state closing
waiting for 0.07 seconds
registering open event
exiting state closing
exiting state close_group
transitioning because of event Event(name='open', payload=60)
entering state open_group
entering state opening
waiting for 0.04 seconds
exiting state opening
transitioning because of event Event(name='timeout', payload=None)
entering state opened
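
The order of the exiting and entering lines in this output can be reasoned about with a small sketch. It computes which states are left and which are entered by comparing the ancestor paths of the active state and of the target state. This is a simplified model under stated assumptions: it ignores self-transitions and the distinction between internal and external transitions, the names PARENT, INITIAL, path_from_root and plan are made up for the example, and it is not the algorithm used by hat.stc.

```python
# child -> parent for the hierarchical door; None marks a top-level state
PARENT = {'open_group': None, 'opened': 'open_group',
          'opening': 'open_group', 'close_group': None,
          'closing': 'close_group', 'closed': 'close_group'}

# composite state -> initial child
INITIAL = {'open_group': 'opened', 'close_group': 'closing'}


def path_from_root(state):
    """Return [top-level ancestor, ..., state]."""
    path = []
    while state is not None:
        path.append(state)
        state = PARENT[state]
    return path[::-1]


def plan(active, target):
    """Return (states to exit, states to enter) for active leaf -> target."""
    while target in INITIAL:         # descend to the initial leaf state
        target = INITIAL[target]
    old, new = path_from_root(active), path_from_root(target)
    common = 0                       # length of the shared ancestor prefix
    while (common < len(old) and common < len(new)
           and old[common] == new[common]):
        common += 1
    # exit from the innermost state outwards, enter from the outermost inwards
    return old[common:][::-1], new[common:]


print(plan('opened', 'close_group'))
print(plan('closing', 'opening'))
print(plan('opening', 'opened'))
```

For the timeout transition from opening to opened, both states share the parent open_group, so only opening is exited and only opened is entered. The close and open transitions cross group boundaries, so both the leaf state and its group are exited.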

Advanced transitions

Together with regular transitions, hat.stc provides support for local transitions and internal transitions.

Local transitions can not be used to change state and do not have target state. Therefore, only useful part of local transition is possibility to associate action which will be executed once appropriate event occurs without any state changes.

This functionality can be used in our door simulation to process open events inside open_group state and close events in close_group state.

[Figure: tutorial3.svg, hierarchical statechart diagram of the door with local transitions]

<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" initial="open_group" version="1.0">
    <state id="open_group" initial="opened">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="close" target="close_group">logTransition</transition>
        <transition event="open">logInvalid</transition>
        <state id="opened">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
        </state>
        <state id="opening">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
            <onentry>startTimer</onentry>
            <onexit>stopTimer</onexit>
            <transition event="timeout" target="opened">logTransition</transition>
        </state>
    </state>
    <state id="close_group" initial="closing">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="open" target="opening">logTransition</transition>
        <transition event="close">logInvalid</transition>
        <state id="closing">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
            <onentry>startTimer</onentry>
            <onexit>stopTimer</onexit>
            <transition event="timeout" target="closed">logTransition</transition>
        </state>
        <state id="closed">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
        </state>
    </state>
</scxml>

In door implementation, we should add implementation of logInvalid action:

    def _act_log_invalid(self, inst, evt):
        print(f'invalid operation {evt.name} in state {self._stc.state}')

Execution of testing sequence

    door = Door()
    await asyncio.sleep(1)

    door.open(10)
    await asyncio.sleep(1)

    door.close(20)
    await asyncio.sleep(1)

    door.close(30)
    await asyncio.sleep(1)

    door.finish()

produces following output:

entering state open_group
entering state opened
registering open event
invalid operation open in state opened
registering close event
exiting state opened
exiting state open_group
transitioning because of event Event(name='close', payload=20)
entering state close_group
entering state closing
waiting for 0.08 seconds
exiting state closing
transitioning because of event Event(name='timeout', payload=None)
entering state closed
registering close event
invalid operation close in state closed

Transition guards

Each transition can have list of conditions which have to be met for event to cause state transition. These condition guards are convenient way of adding additional statechart logic without adding new states.
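
A minimal sketch of guard evaluation, independent of hat.stc, could look as follows. Transitions are plain dictionaries, conditions are looked up by name, and the first transition for the event whose conditions all hold is selected. The helper select_transition, the dictionary layout and the first-match ordering are assumptions made for this illustration.

```python
def select_transition(transitions, event_name, conditions):
    """Return the first transition for event_name whose guards all pass."""
    for transition in transitions:
        if transition['event'] != event_name:
            continue
        guard_names = transition.get('conds', [])
        if all(conditions[name]() for name in guard_names):
            return transition
    return None


# a counter that allows exactly one guarded operation
counter = {'value': 0, 'max': 1}
conditions = {'isOperational': lambda: counter['value'] < counter['max']}
transitions = [{'event': 'close', 'target': 'close_group',
                'conds': ['isOperational']}]

print(select_transition(transitions, 'close', conditions))  # guard passes
counter['value'] += 1
print(select_transition(transitions, 'close', conditions))  # guard fails
```

Once the counter reaches its maximum, the guard fails and no transition is selected, so the event is dropped without introducing any additional state.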

To demonstrate usage of transition conditions, we will update the behavior of our door by limiting the number of successful open/close operations that can occur during a single door instance's lifetime. Let's say that we can open or close the door at most N times.

[Figure: tutorial4.svg, hierarchical statechart diagram of the door with guarded transitions]

<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" initial="open_group" version="1.0">
    <state id="open_group" initial="opened">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="close" target="close_group" cond="isOperational">
            logTransition
            incCounter
        </transition>
        <transition event="open">logInvalid</transition>
        <state id="opened">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
        </state>
        <state id="opening">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
            <onentry>startTimer</onentry>
            <onexit>stopTimer</onexit>
            <transition event="timeout" target="opened">logTransition</transition>
        </state>
    </state>
    <state id="close_group" initial="closing">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="open" target="opening" cond="isOperational">
            logTransition
            incCounter
        </transition>
        <transition event="close">logInvalid</transition>
        <state id="closing">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
            <onentry>startTimer</onentry>
            <onexit>stopTimer</onexit>
            <transition event="timeout" target="closed">logTransition</transition>
        </state>
        <state id="closed">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
        </state>
    </state>
</scxml>

Condition isOperational and incCounter action have to be added to our implementation:

    def __init__(self, max_counter):
        actions = {'logEnter': self._act_log_enter,
                   'logExit': self._act_log_exit,
                   'logTransition': self._act_log_transition,
                   'logInvalid': self._act_log_invalid,
                   'startTimer': self._act_start_timer,
                   'stopTimer': self._act_stop_timer,
                   'incCounter': self._act_inc_counter}
        conditions = {'isOperational': self._cond_is_operational}
        self._max_counter = max_counter
        self._counter = 0
        self._timer = None
        self._stc = hat.stc.Statechart(door_states, actions, conditions)
        self._run_task = asyncio.create_task(self._stc.run())

    def _act_inc_counter(self, inst, evt):
        self._counter += 1

    def _cond_is_operational(self, inst, evt):
        return self._counter < self._max_counter

Execution of testing sequence

    door = Door(3)
    await asyncio.sleep(1)

    door.close(10)
    await asyncio.sleep(1)

    door.open(20)
    await asyncio.sleep(1)

    door.close(30)
    await asyncio.sleep(1)

    # we already operated on this door instance 3 times
    door.open(40)
    await asyncio.sleep(1)

    door.finish()

produces following output:

entering state open_group
entering state opened
registering close event
exiting state opened
exiting state open_group
transitioning because of event Event(name='close', payload=10)
entering state close_group
entering state closing
waiting for 0.09 seconds
exiting state closing
transitioning because of event Event(name='timeout', payload=None)
entering state closed
registering open event
exiting state closed
exiting state close_group
transitioning because of event Event(name='open', payload=20)
entering state open_group
entering state opening
waiting for 0.08 seconds
exiting state opening
transitioning because of event Event(name='timeout', payload=None)
entering state opened
registering close event
exiting state opened
exiting state open_group
transitioning because of event Event(name='close', payload=30)
entering state close_group
entering state closing
waiting for 0.07 seconds
exiting state closing
transitioning because of event Event(name='timeout', payload=None)
entering state closed
registering open event

Final state

In our previous example, once the counter reaches its maximum value, the door is no longer operational. Instead of ignoring further operations, we could let the user try to operate the door one more time. However, since the door has reached its operational limit, this next operation is expected to permanently disable the door instance.

For cases where further changes of statechart state are no longer possible, a final state can be used. A final state is a state like any other, with one exception: once the statechart enters this state, hat.stc.Statechart.run ceases its execution.
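
The effect of a final state on a running task can be sketched without hat.stc. In the simplified model below, a run coroutine consumes events from an asyncio.Queue and returns as soon as a final state is reached, which makes the surrounding task report done. The run function, its parameters and the transition table are hypothetical and only mimic the behaviour described above.

```python
import asyncio


async def run(queue, transitions, state, final_states):
    """Process events until a final state is reached; return that state."""
    while state not in final_states:
        event = await queue.get()
        # events without a matching transition leave the state unchanged
        state = transitions.get((state, event), state)
    return state


async def main():
    queue = asyncio.Queue()
    transitions = {('closed', 'open'): 'final'}
    task = asyncio.create_task(run(queue, transitions, 'closed', {'final'}))

    await asyncio.sleep(0)       # let the task start and wait for events
    done_before = task.done()

    queue.put_nowait('open')
    await asyncio.sleep(0.01)    # let the task process the event
    return done_before, task.done(), task.result()


print(asyncio.run(main()))
```

Before the open event is queued the task is still pending. After the transition to the final state the coroutine returns, so checking the task with done() is enough to detect that the machine has finished.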

[Figure: tutorial5.svg, hierarchical statechart diagram of the door with a final state]

<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" initial="open_group" version="1.0">
    <state id="open_group" initial="opened">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="close" target="close_group" cond="isOperational">
            logTransition
            incCounter
        </transition>
        <transition event="close" target="final" cond="isNotOperational">
            logTransition
        </transition>
        <transition event="open">logInvalid</transition>
        <state id="opened">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
        </state>
        <state id="opening">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
            <onentry>startTimer</onentry>
            <onexit>stopTimer</onexit>
            <transition event="timeout" target="opened">logTransition</transition>
        </state>
    </state>
    <state id="close_group" initial="closing">
        <onentry>logEnter</onentry>
        <onexit>logExit</onexit>
        <transition event="open" target="opening" cond="isOperational">
            logTransition
            incCounter
        </transition>
        <transition event="open" target="final" cond="isNotOperational">
            logTransition
        </transition>
        <transition event="close">logInvalid</transition>
        <state id="closing">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
            <onentry>startTimer</onentry>
            <onexit>stopTimer</onexit>
            <transition event="timeout" target="closed">logTransition</transition>
        </state>
        <state id="closed">
            <onentry>logEnter</onentry>
            <onexit>logExit</onexit>
        </state>
    </state>
    <final id="final">
        <onentry>logEnter</onentry>
    </final>
</scxml>

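Modifications to the implementation (the isNotOperational condition also has to be bound to _cond_is_not_operational in the conditions dictionary):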

    @property
    def is_finished(self):
        return self._run_task.done()

    def _cond_is_not_operational(self, inst, evt):
        return self._counter >= self._max_counter

Execution of testing sequence

    door = Door(3)
    await asyncio.sleep(1)

    door.close(10)
    await asyncio.sleep(1)

    door.open(20)
    await asyncio.sleep(1)

    door.close(30)
    await asyncio.sleep(1)

    print('is finished:', door.is_finished)

    # we already operated on this door instance 3 times
    door.open(40)
    await asyncio.sleep(1)

    print('is finished:', door.is_finished)

produces following output:

entering state open_group
entering state opened
registering close event
exiting state opened
exiting state open_group
transitioning because of event Event(name='close', payload=10)
entering state close_group
entering state closing
waiting for 0.09 seconds
exiting state closing
transitioning because of event Event(name='timeout', payload=None)
entering state closed
registering open event
exiting state closed
exiting state close_group
transitioning because of event Event(name='open', payload=20)
entering state open_group
entering state opening
waiting for 0.08 seconds
exiting state opening
transitioning because of event Event(name='timeout', payload=None)
entering state opened
registering close event
exiting state opened
exiting state open_group
transitioning because of event Event(name='close', payload=30)
entering state close_group
entering state closing
waiting for 0.07 seconds
exiting state closing
transitioning because of event Event(name='timeout', payload=None)
entering state closed
is finished: False
registering open event
exiting state closed
exiting state close_group
transitioning because of event Event(name='open', payload=40)
entering state final
is finished: True

API

API reference is available as part of generated documentation: