Source code for fastcs.controller

from __future__ import annotations

from collections import Counter
from collections.abc import Sequence
from copy import deepcopy
from typing import get_type_hints

from fastcs.attribute_io import AttributeIO
from fastcs.attribute_io_ref import AttributeIORefT
from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW
from fastcs.datatypes import T
from fastcs.tracer import Tracer


[docs] class BaseController(Tracer): """Base class for controller.""" #: Attributes passed from the device at runtime. attributes: dict[str, Attribute] description: str | None = None def __init__( self, path: list[str] | None = None, description: str | None = None, ios: Sequence[AttributeIO[T, AttributeIORefT]] | None = None, ) -> None: super().__init__() if ( description is not None ): # Use the argument over the one class defined description. self.description = description if not hasattr(self, "attributes"): self.attributes = {} self._path: list[str] = path or [] self.__sub_controller_tree: dict[str, Controller] = {} self._bind_attrs() ios = ios or [] self._attribute_ref_io_map = {io.ref_type: io for io in ios} self._validate_io(ios) async def initialise(self): pass
[docs] async def attribute_initialise(self) -> None: """Register update and send callbacks for attributes on this controller and all subcontrollers""" self._add_io_callbacks() for controller in self.get_sub_controllers().values(): await controller.attribute_initialise()
def _add_io_callbacks(self): for attr in self.attributes.values(): ref = attr.io_ref if attr.has_io_ref() else None io = self._attribute_ref_io_map.get(type(ref)) if isinstance(attr, AttrW): attr.add_process_callback(self._create_send_callback(io, attr, ref)) if isinstance(attr, AttrR): attr.add_update_callback(self._create_update_callback(io, attr, ref)) def _create_send_callback(self, io, attr, ref): if ref is None: async def send_callback(value): await attr.update_display_without_process(value) if isinstance(attr, AttrRW): await attr.set(value) else: async def send_callback(value): await io.send(attr, value) return send_callback def _create_update_callback(self, io, attr, ref): if ref is None: async def error_callback(): raise RuntimeError("Can't call update on Attributes without an io_ref") return error_callback else: async def update_callback(): await io.update(attr) return update_callback @property def path(self) -> list[str]: """Path prefix of attributes, recursively including parent Controllers.""" return self._path def set_path(self, path: list[str]): if self._path: raise ValueError(f"sub controller is already registered under {self.path}") self._path = path def _bind_attrs(self) -> None: """Search for `Attributes` and `Methods` to bind them to this instance. This method will search the attributes of this controller class to bind them to this specific instance. For `Attribute`s, this is just a case of copying and re-assigning to `self` to make it unique across multiple instances of this controller class. For `Method`s, this requires creating a bound method from a class method and a controller instance, so that it can be called from any context with the controller instance passed as the `self` argument. """ # Lazy import to avoid circular references from fastcs.cs_methods import UnboundCommand, UnboundPut, UnboundScan # Using a dictionary instead of a set to maintain order. class_dir = {key: None for key in dir(type(self)) if not key.startswith("_")} class_type_hints = { key: value for key, value in get_type_hints(type(self)).items() if not key.startswith("_") } for attr_name in {**class_dir, **class_type_hints}: if attr_name == "root_attribute": continue attr = getattr(self, attr_name, None) if isinstance(attr, Attribute): if ( attr_name in self.attributes and self.attributes[attr_name] is not attr ): raise ValueError( f"`{type(self).__name__}` has conflicting attribute " f"`{attr_name}` already present in the attributes dict." ) new_attribute = deepcopy(attr) setattr(self, attr_name, new_attribute) self.attributes[attr_name] = new_attribute elif isinstance(attr, UnboundPut | UnboundScan | UnboundCommand): setattr(self, attr_name, attr.bind(self)) def _validate_io(self, ios: Sequence[AttributeIO[T, AttributeIORefT]]): """Validate that there is exactly one AttributeIO class registered to the controller for each type of AttributeIORef belonging to the attributes of the controller""" for ref_type, count in Counter([io.ref_type for io in ios]).items(): if count > 1: raise RuntimeError( f"More than one AttributeIO class handles {ref_type.__name__}" ) for attr in self.attributes.values(): if not attr.has_io_ref(): continue assert type(attr.io_ref) in self._attribute_ref_io_map, ( f"{self.__class__.__name__} does not have an AttributeIO to handle " f"{attr.io_ref.__class__.__name__}" ) def register_sub_controller(self, name: str, sub_controller: Controller): if name in self.__sub_controller_tree.keys(): raise ValueError( f"Controller {self} already has a sub controller registered as {name}" ) self.__sub_controller_tree[name] = sub_controller sub_controller.set_path(self.path + [name]) if isinstance(sub_controller.root_attribute, Attribute): if name in self.attributes: raise TypeError( f"Cannot set sub controller `{name}` root attribute " f"on the parent controller `{type(self).__name__}` " f"as it already has an attribute of that name." ) self.attributes[name] = sub_controller.root_attribute def get_sub_controllers(self) -> dict[str, Controller]: return self.__sub_controller_tree def __repr__(self): return f"""\ {type(self).__name__}({self.path}, {list(self.__sub_controller_tree.keys())})\ """
[docs] class Controller(BaseController): """Top-level controller for a device. This is the primary class for implementing device support in FastCS. Instances of this class can be loaded into a backend to access its ``Attribute``s. The backend can then perform a specific function with the set of ``Attributes``, such as generating a UI or creating parameters for a control system. """ root_attribute: Attribute | None = None def __init__( self, description: str | None = None, ios: Sequence[AttributeIO[T, AttributeIORefT]] | None = None, ) -> None: super().__init__(description=description, ios=ios) async def connect(self) -> None: pass