You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
335 lines
11 KiB
335 lines
11 KiB
from collections import deque
|
|
from typing import TYPE_CHECKING
|
|
from typing import Any
|
|
from typing import Dict # deprecated since 3.9: https://peps.python.org/pep-0585/
|
|
|
|
from .dispatcher import ObjectConfig
|
|
from .dispatcher import resolver_factory
|
|
from .event import Event
|
|
from .event_data import EventData
|
|
from .event_data import TriggerData
|
|
from .exceptions import InvalidDefinition
|
|
from .exceptions import InvalidStateValue
|
|
from .exceptions import TransitionNotAllowed
|
|
from .factory import StateMachineMetaclass
|
|
from .i18n import _
|
|
from .model import Model
|
|
from .states import States
|
|
from .transition import Transition
|
|
|
|
if TYPE_CHECKING:
|
|
from .state import State
|
|
|
|
|
|
class StateMachine(metaclass=StateMachineMetaclass):
    """A state machine whose current state value is stored on a model object.

    Args:
        model: An optional external object to store state. See :ref:`domain models`.
            When ``None`` (the default) an internal ``Model`` instance is created.

        state_field (str): The model's field which stores the current state.
            Default: ``state``.

        start_value: An optional start state value if there's no current state assigned
            on the :ref:`domain models`. Default: ``None``.

        rtc (bool): Controls the :ref:`processing model`. Defaults to ``True``
            that corresponds to a **run-to-completion** (RTC) model.

        allow_event_without_transition: If ``False`` when an event does not result in a
            transition, an exception ``TransitionNotAllowed`` will be raised.
            If ``True`` the state machine allows triggering events that may not lead to a
            state :ref:`transition`, including tolerance to unknown :ref:`event` triggers.
            Default: ``False``.

    """

    TransitionNotAllowed = TransitionNotAllowed
    """Shortcut for easy exception handling.

    Example::

        try:
            sm.send("an-inexistent-event")
        except sm.TransitionNotAllowed:
            pass
    """

    def __init__(
        self,
        model: Any = None,
        state_field: str = "state",
        start_value: Any = None,
        rtc: bool = True,
        allow_event_without_transition: bool = False,
    ):
        """Bind the machine to a model and enter the initial state if none is set.

        Raises:
            InvalidDefinition: If the class is abstract (``_abstract`` is truthy).
            InvalidStateValue: If the resolved start value is not a known state value.
        """
        # Compare against ``None`` explicitly: a user supplied model may legitimately be
        # falsy (e.g. it defines ``__len__`` or ``__bool__``) and must not be replaced.
        self.model = model if model is not None else Model()
        self.state_field = state_field
        self.start_value = start_value
        self.allow_event_without_transition = allow_event_without_transition
        self.__rtc = rtc
        self.__processing: bool = False
        self._external_queue: deque = deque()

        assert hasattr(self, "_abstract")
        if self._abstract:
            raise InvalidDefinition(_("There are no states or transitions."))

        initial_transition = Transition(
            None, self._get_initial_state(), event="__initial__"
        )
        self._setup(self.states, initial_transition)
        self._activate_initial_state(initial_transition)

    if TYPE_CHECKING:
        # Makes mypy happy with dynamically created attributes.

        def __getattr__(self, attribute: str) -> Any:
            ...

    def __repr__(self) -> str:
        """Return a debug representation with the model, state field and current state id."""
        # ``is not None`` (instead of truthiness) so that falsy state values such as ``0``
        # are still reported, matching the check done in ``_activate_initial_state``.
        current_state_id = (
            self.current_state.id if self.current_state_value is not None else None
        )
        return (
            f"{type(self).__name__}(model={self.model!r}, state_field={self.state_field!r}, "
            f"current_state={current_state_id!r})"
        )

    def _get_initial_state(self):
        """Return the state matching ``start_value``, or the declared initial state.

        Raises:
            InvalidStateValue: If the chosen value is not a key of ``states_map``.
        """
        # Only ``None`` means "not provided"; falsy values such as ``0`` or ``""`` can be
        # perfectly valid state values and must be honored.
        if self.start_value is not None:
            current_state_value = self.start_value
        else:
            current_state_value = self.initial_state.value

        try:
            return self.states_map[current_state_value]
        except KeyError as err:
            raise InvalidStateValue(current_state_value) from err

    def _activate_initial_state(self, initial_transition):
        """Enter the initial state, but only when the model has no state value yet."""
        if self.current_state_value is None:
            # Send a one-time event `__initial__` to enter the current state.
            # The before/on/after callback lists of the initial transition are emptied
            # before activation.
            initial_transition.before.clear()
            initial_transition.on.clear()
            initial_transition.after.clear()

            event_data = EventData(
                trigger_data=TriggerData(
                    machine=self,
                    event=initial_transition.event,
                ),
                transition=initial_transition,
            )
            self._activate(event_data)

    def _get_protected_attrs(self):
        """Return the attribute names passed as ``skip_attrs`` for the machine object.

        The set contains a fixed list of machine attributes plus the id of every state.
        """
        return {
            "_abstract",
            "model",
            "state_field",
            "start_value",
            "initial_state",
            "final_states",
            "states",
            "_events",
            "states_map",
            "send",
        } | {s.id for s in self.states}

    def _visit_states_and_transitions(self, visitor):
        """Call ``visitor`` with each state, followed by each of that state's transitions."""
        for state in self.states:
            visitor(state)
            for transition in state.transitions:
                visitor(transition)

    def _setup(self, class_states: States, initial_transition: Transition):
        """Create per-instance states and wire states and transitions to this machine.

        Args:
            class_states: The original (shared) instances of :ref:`State` living on the
                user's concrete :ref:`StateMachine` class. These instances cannot be modified.
                So we will clone the states in order to bind the new instances with concrete
                actions and event handlers.

            initial_transition: A special :ref:`transition` that triggers the enter on the
                `initial` :ref:`State`.
        """
        machine = ObjectConfig(self, skip_attrs=self._get_protected_attrs())
        model = ObjectConfig(self.model, skip_attrs={self.state_field})
        default_resolver = resolver_factory(machine, model)

        # Clone states and transitions to avoid sharing callback references between instances.
        self.states = States(
            {s.id: s.clone()._setup(self, default_resolver) for s in class_states}
        )

        # Lookup table from state value to the instance-level state.
        self.states_map: Dict[Any, State] = {
            state.value: state for state in self.states
        }

        for state in self.states:
            for transition in state.transitions:
                transition._setup(self, default_resolver)

        initial_transition._setup(self, default_resolver)
        self.add_observer(machine, model)

    def add_observer(self, *observers):
        """Add an observer.

        Observers are a way to generically add behavior to a :ref:`StateMachine` without
        changing its internal implementation.

        Returns:
            The state machine itself, so calls can be chained.

        .. seealso::

            :ref:`observers`.
        """
        resolvers = [resolver_factory(ObjectConfig.from_obj(o)) for o in observers]
        self._visit_states_and_transitions(lambda x: x._add_observer(*resolvers))
        return self

    def _repr_html_(self):
        """Return the SVG diagram of the machine wrapped in a ``div`` element."""
        return f'<div class="statemachine">{self._repr_svg_()}</div>'

    def _repr_svg_(self):
        """Return the machine diagram as a decoded SVG string."""
        return self._graph().create_svg().decode()

    def _graph(self):
        """Build the graph of this machine using ``DotGraphMachine``."""
        # Imported here rather than at module level, so the diagram module is only
        # loaded when a graph is actually requested.
        from .contrib.diagram import DotGraphMachine

        return DotGraphMachine(self).get_graph()

    @property
    def current_state_value(self):
        """Get/Set the current :ref:`state` value.

        This is a low level API, that can be used to assign any valid state value
        completely bypassing all the hooks and validations.

        The getter returns ``None`` when the model does not have the state field.
        The setter raises ``InvalidStateValue`` for values missing from ``states_map``.
        """
        return getattr(self.model, self.state_field, None)

    @current_state_value.setter
    def current_state_value(self, value):
        if value not in self.states_map:
            raise InvalidStateValue(value)
        setattr(self.model, self.state_field, value)

    @property
    def current_state(self) -> "State":
        """Get/Set the current :ref:`state`.

        This is a low level API, that can be used to assign any valid state
        completely bypassing all the hooks and validations.
        """
        return self.states_map[self.current_state_value]

    @current_state.setter
    def current_state(self, value):
        self.current_state_value = value.value

    @property
    def events(self):
        """Return the ``events`` attribute looked up on the machine's class."""
        return self.__class__.events

    @property
    def allowed_events(self):
        """List of the current allowed events."""
        return [
            getattr(self, event)
            for event in self.current_state.transitions.unique_events
        ]

    def _process(self, trigger):
        """Process event triggers.

        The simplest implementation is the non-RTC (synchronous),
        where the trigger will be run immediately and the result collected as the return.

        .. note::

            While processing the trigger, if other events are generated, they
            will also be processed immediately, so a "nested" behavior happens.

        If the machine is on ``rtc`` model (queued), the event is put on a queue, and only the
        first event will have the result collected.

        .. note::
            While processing the queue items, if other events are generated, they
            will be processed sequentially (and not nested).

        """
        if not self.__rtc:
            # The machine is in "synchronous" mode.
            return trigger()

        # The machine is in "queued" mode.
        # Add the trigger to the queue and start processing in a loop.
        self._external_queue.append(trigger)

        # We make sure that only the first event enters the processing critical section,
        # next events will only be put on the queue and processed by the same loop.
        if self.__processing:
            return

        return self._processing_loop()

    def _processing_loop(self):
        """Execute the triggers in the queue in order until the queue is empty.

        Returns:
            The result of the first trigger executed by this loop, or ``None`` when the
            queue was already empty.

        Raises:
            Exception: Any exception raised by a trigger is re-raised after the remaining
                queued triggers are discarded.
        """
        self.__processing = True

        # We will collect the first result as the processing result to keep backwards
        # compatibility, so we need to use a sentinel object instead of `None` because the
        # first result may also be `None`, and in this case the `first_result` could be
        # overridden by another result.
        sentinel = object()
        first_result = sentinel
        try:
            while self._external_queue:
                trigger = self._external_queue.popleft()
                try:
                    result = trigger()
                    if first_result is sentinel:
                        first_result = result
                except Exception:
                    # We clear the queue as we don't have an expected behavior
                    # and cannot keep processing.
                    self._external_queue.clear()
                    raise
        finally:
            # Always leave the critical section, even when a trigger raised.
            self.__processing = False
        return first_result if first_result is not sentinel else None

    def _activate(self, event_data: EventData):
        """Run a transition: callbacks, state change, and result collection.

        The order is: ``before`` callbacks, source ``exit`` (skipped for internal
        transitions or when there is no source), ``on`` callbacks, state assignment,
        target ``enter`` (skipped for internal transitions) and ``after`` callbacks.

        Returns:
            ``None`` when the ``before`` and ``on`` callbacks produced no results, the single
            result when there is exactly one, otherwise the whole collection of results.
        """
        transition = event_data.transition
        source = event_data.state
        target = transition.target

        result = transition.before.call(*event_data.args, **event_data.extended_kwargs)
        if source is not None and not transition.internal:
            source.exit.call(*event_data.args, **event_data.extended_kwargs)

        result += transition.on.call(*event_data.args, **event_data.extended_kwargs)

        self.current_state = target
        event_data.state = target

        if not transition.internal:
            target.enter.call(*event_data.args, **event_data.extended_kwargs)
        transition.after.call(*event_data.args, **event_data.extended_kwargs)

        if len(result) == 0:
            result = None
        elif len(result) == 1:
            result = result[0]

        return result

    def send(self, event, *args, **kwargs):
        """Send an :ref:`Event` to the state machine.

        .. seealso::

            See: :ref:`triggering events`.

        """
        event = Event(event)
        return event.trigger(self, *args, **kwargs)