# Source code for fastcs.transports.controller_api

import asyncio
from collections import defaultdict
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field

from fastcs.attributes import Attribute, AttributeIORef, AttrR
from fastcs.logging import bind_logger
from fastcs.methods import Command, Scan, ScanCallback
from fastcs.tracer import Tracer
from fastcs.util import ONCE

# Module-level tracer, named after this module.
tracer = Tracer(name=__name__)
# Module-level logger, bound with this module's name.
logger = bind_logger(logger_name=__name__)


@dataclass
class ControllerAPI:
    """Attributes, Methods and sub APIs of a `Controller` to expose in a transport"""

    path: list[str] = field(default_factory=list)
    """Path within controller tree (empty if this is the root)"""
    # Attributes keyed by name.
    attributes: dict[str, Attribute] = field(default_factory=dict)
    # Command methods keyed by name.
    command_methods: dict[str, Command] = field(default_factory=dict)
    # Scan methods keyed by name.
    scan_methods: dict[str, Scan] = field(default_factory=dict)
    sub_apis: dict[str, "ControllerAPI"] = field(default_factory=dict)
    """APIs of the sub controllers of the `Controller` this API was built from"""
    # Optional free-text description; defaults to None.
    description: str | None = None

    def walk_api(self) -> Iterator["ControllerAPI"]:
        """Walk through all the nested `ControllerAPI` s of this `ControllerAPI`.

        Yields the `ControllerAPI` s from a depth-first traversal of the tree,
        including self.

        """
        # Pre-order: this node first, then each child's subtree in turn.
        yield self
        for child_api in self.sub_apis.values():
            yield from child_api.walk_api()

    def __repr__(self):
        """Return a short representation showing the path and sub API names."""
        sub_api_names = ", ".join(self.sub_apis)
        return f"ControllerAPI(path={self.path}, sub_apis=[{sub_api_names}])"

    def get_scan_and_initial_coros(
        self,
    ) -> tuple[list[ScanCallback], list[ScanCallback]]:
        """Collect the periodic and run-once callbacks for this API tree.

        Every `ControllerAPI` yielded by `walk_api` contributes its scan
        methods and its attribute update callbacks.

        Returns:
            A tuple ``(scan_coros, initial_coros)``: the periodic scan
            coroutine functions built from the callbacks grouped by period,
            and the list of callbacks registered with a period of ``ONCE``.

        """
        callbacks_by_period: dict[float, list[Callable]] = defaultdict(list)
        run_once_callbacks: list[Callable] = []

        for api in self.walk_api():
            _add_scan_method_tasks(callbacks_by_period, run_once_callbacks, api)
            _add_attribute_update_tasks(callbacks_by_period, run_once_callbacks, api)

        return _get_periodic_scan_coros(callbacks_by_period), run_once_callbacks
def _add_scan_method_tasks( scan_dict: dict[float, list[Callable]], initial_coros: list[Callable], controller_api: ControllerAPI, ): for method in controller_api.scan_methods.values(): if method.period is ONCE: initial_coros.append(method.fn) else: scan_dict[method.period].append(method.fn) def _add_attribute_update_tasks( scan_dict: dict[float, list[Callable]], initial_coros: list[Callable], controller_api: ControllerAPI, ): for attribute in controller_api.attributes.values(): match attribute: case ( AttrR(_io_ref=AttributeIORef(update_period=update_period)) as attribute ): if update_period is ONCE: initial_coros.append(attribute.bind_update_callback()) elif update_period is not None: scan_dict[update_period].append(attribute.bind_update_callback()) def _get_periodic_scan_coros( scan_dict: dict[float, list[ScanCallback]], ) -> list[ScanCallback]: periodic_scan_coros: list[ScanCallback] = [] for period, methods in scan_dict.items(): periodic_scan_coros.append(_create_periodic_scan_coro(period, methods)) return periodic_scan_coros def _create_periodic_scan_coro( period: float, scans: list[ScanCallback] ) -> ScanCallback: async def _sleep(): await asyncio.sleep(period) methods = [_sleep] + list(scans) # Create periodic behavior async def scan_coro() -> None: while True: await asyncio.gather(*[method() for method in methods]) return scan_coro