Source code for malcolm.modules.builtin.controllers.statefulcontroller

from typing import Dict, Union

from malcolm.compat import OrderedDict
from malcolm.core import (
    Alarm,
    AttributeModel,
    ChoiceMeta,
    Context,
    MethodModel,
    NotWriteableError,
    Part,
    ProcessStartHook,
    ProcessStopHook,
    Widget,
)

from ..hooks import DisableHook, HaltHook, InitHook, ResetHook
from ..infos import HealthInfo
from ..util import StatefulStates
from .basiccontroller import ADescription, AMri, BasicController

Field = Union[AttributeModel, MethodModel]
ChildrenWriteable = Dict[str, Dict[Field, bool]]


ss = StatefulStates

# Pull re-used annotypes into our namespace in case we are subclassed
AMri = AMri
ADescription = ADescription


[docs]class StatefulController(BasicController): """A controller that implements `StatefulStates`""" # The state_set that this controller implements state_set = ss() def __init__(self, mri: AMri, description: ADescription = "") -> None: super().__init__(mri, description) self._children_writeable: ChildrenWriteable = {} self.state = ChoiceMeta( "StateMachine State of Block", self.state_set.possible_states, tags=[Widget.MULTILINETEXTUPDATE.tag()] # Start DISABLING so we can immediately go to DISABLED ).create_attribute_model(ss.DISABLING) self.field_registry.add_attribute_model("state", self.state) self.field_registry.add_method_model(self.disable) self.set_writeable_in( self.field_registry.add_method_model(self.reset), ss.DISABLED, ss.FAULT ) self.transition(ss.DISABLED) self.register_hooked(ProcessStartHook, self.init) self.register_hooked(ProcessStopHook, self.halt) def set_writeable_in(self, field, *states): # Field has defined when it should be writeable, just check that # this is valid for this state_set for state in states: assert ( state in self.state_set.possible_states ), "State %s is not one of the valid states %s" % ( state, self.state_set.possible_states, ) for state in self.state_set.possible_states: state_writeable = self._children_writeable.setdefault(state, {}) state_writeable[field] = state in states def create_part_contexts(self) -> Dict[Part, Context]: part_contexts = OrderedDict() assert self.process, "No attached process" for part in self.parts.values(): part_contexts[part] = Context(self.process) return part_contexts def init(self): self.try_stateful_function(ss.RESETTING, ss.READY, self.do_init) def do_init(self): self.run_hooks( InitHook(part, context) for part, context in self.create_part_contexts().items() ) def halt(self): self.run_hooks( HaltHook(part, context) for part, context in self.create_part_contexts().items() ) self.disable() def disable(self): self.try_stateful_function(ss.DISABLING, ss.DISABLED, self.do_disable) def do_disable(self): self.run_hooks( DisableHook(part, context) for part, context in self.create_part_contexts().items() ) def reset(self): self.try_stateful_function(ss.RESETTING, ss.READY, self.do_reset) def do_reset(self): self.run_hooks( ResetHook(part, context) for part, context in self.create_part_contexts().items() ) def go_to_error_state(self, exception): if self.state.value != ss.FAULT: self.transition(ss.FAULT, str(exception)) def check_field_writeable(self, field): try: super().check_field_writeable(field) except NotWriteableError as e: msg = f"{e}, maybe because Block state = {self.state.value}" raise NotWriteableError(msg)
[docs] def transition(self, state, message=""): """Change to a new state if the transition is allowed Args: state (str): State to transition to message (str): Message if the transition is to a fault state """ with self.changes_squashed: initial_state = self.state.value if self.state_set.transition_allowed( initial_state=initial_state, target_state=state ): self.log.debug( "%s: Transitioning from %s to %s", self.mri, initial_state, state ) if state == ss.DISABLED: alarm = Alarm.invalid("Disabled") elif state == ss.FAULT: alarm = Alarm.major(message) else: alarm = Alarm() self.update_health(self, HealthInfo(alarm)) self.state.set_value(state) self.state.set_alarm(alarm) for child, writeable in self._children_writeable[state].items(): child.meta.set_writeable(writeable) else: raise TypeError(f"Cannot transition from {initial_state} to {state}")
def try_stateful_function(self, start_state, end_state, func, *args, **kwargs): try: self.transition(start_state) func(*args, **kwargs) self.transition(end_state) except Exception as e: # pylint:disable=broad-except self.log.debug( "Exception running %s %s %s transitioning from %s to %s", func, args, kwargs, start_state, end_state, exc_info=True, ) self.go_to_error_state(e) raise def add_block_field(self, name, child, writeable_func, needs_context): super().add_block_field(name, child, writeable_func, needs_context) # If we don't have a writeable func it can never be writeable if writeable_func is None: return # If we have already registered an explicit set then we are done for state in self.state_set.possible_states: state_writeable = self._children_writeable.get(state, {}) if child in state_writeable: return # Field is writeable but has not defined when it should be # writeable, so calculate it from the possible states states = [ state for state in self.state_set.possible_states if state not in (ss.DISABLING, ss.DISABLED) ] for state in self.state_set.possible_states: state_writeable = self._children_writeable.setdefault(state, {}) state_writeable[child] = state in states