Source code for fastcs.backend

import asyncio
from collections import defaultdict
from collections.abc import Callable, Coroutine

from fastcs.cs_methods import Command, Put, Scan
from fastcs.datatypes import T

from .attributes import ONCE, AttrHandlerR, AttrHandlerW, AttrR, AttrW
from .controller import BaseController, Controller
from .controller_api import ControllerAPI
from .exceptions import FastCSException
from .util import validate_hinted_attributes


[docs] class Backend: """For keeping track of tasks during FastCS serving.""" def __init__( self, controller: Controller, loop: asyncio.AbstractEventLoop, ): self._controller = controller self._loop = loop self._initial_coros = [controller.connect] self._scan_tasks: set[asyncio.Task] = set() # Initialise controller and then build its APIs loop.run_until_complete(controller.initialise()) loop.run_until_complete(controller.attribute_initialise()) validate_hinted_attributes(controller) self.controller_api = build_controller_api(controller) self._link_process_tasks() def _link_process_tasks(self): for controller_api in self.controller_api.walk_api(): _link_put_tasks(controller_api) _link_attribute_sender_class(controller_api) def __del__(self): self._stop_scan_tasks() async def serve(self): scans, initials = _get_scan_and_initial_coros(self.controller_api) self._initial_coros += initials await self._run_initial_coros() await self._start_scan_tasks(scans) async def _run_initial_coros(self): for coro in self._initial_coros: await coro() async def _start_scan_tasks( self, coros: list[Callable[[], Coroutine[None, None, None]]] ): self._scan_tasks = {self._loop.create_task(coro()) for coro in coros} for task in self._scan_tasks: task.add_done_callback(self._scan_done) def _scan_done(self, task: asyncio.Task): try: task.result() except Exception as e: raise FastCSException( "Exception raised in scan method of " f"{self._controller.__class__.__name__}" ) from e def _stop_scan_tasks(self): for task in self._scan_tasks: if not task.done(): try: task.cancel() except asyncio.CancelledError: pass
def _link_put_tasks(controller_api: ControllerAPI) -> None: for name, method in controller_api.put_methods.items(): name = name.removeprefix("put_") attribute = controller_api.attributes[name] match attribute: case AttrW(): attribute.add_process_callback(method.fn) case _: raise FastCSException( f"Mode {attribute.access_mode} does not " f"support put operations for {name}" ) def _link_attribute_sender_class(controller_api: ControllerAPI) -> None: for attr_name, attribute in controller_api.attributes.items(): match attribute: case AttrW(sender=AttrHandlerW()): assert not attribute.has_process_callback(), ( f"Cannot assign both put method and Sender object to {attr_name}" ) callback = _create_sender_callback(attribute) attribute.add_process_callback(callback) def _create_sender_callback(attribute): async def callback(value): await attribute.sender.put(attribute, value) return callback def _get_scan_and_initial_coros( root_controller_api: ControllerAPI, ) -> tuple[list[Callable], list[Callable]]: scan_dict: dict[float, list[Callable]] = defaultdict(list) initial_coros: list[Callable] = [] for controller_api in root_controller_api.walk_api(): _add_scan_method_tasks(scan_dict, controller_api) _add_attribute_updater_tasks(scan_dict, initial_coros, controller_api) scan_coros = _get_periodic_scan_coros(scan_dict) return scan_coros, initial_coros def _add_scan_method_tasks( scan_dict: dict[float, list[Callable]], controller_api: ControllerAPI ): for method in controller_api.scan_methods.values(): scan_dict[method.period].append(method.fn) def _add_attribute_updater_tasks( scan_dict: dict[float, list[Callable]], initial_coros: list[Callable], controller_api: ControllerAPI, ): for attribute in controller_api.attributes.values(): match attribute: case AttrR(updater=AttrHandlerR(update_period=update_period)) as attribute: callback = _create_updater_callback(attribute) if update_period is ONCE: initial_coros.append(callback) elif update_period is not None: scan_dict[update_period].append(callback) def _create_updater_callback(attribute: AttrR[T]): updater = attribute.updater assert updater is not None async def callback(): try: await updater.update(attribute) except Exception as e: print(f"Update loop in {updater} stopped:\n{e.__class__.__name__}: {e}") raise return callback def _get_periodic_scan_coros(scan_dict: dict[float, list[Callable]]) -> list[Callable]: periodic_scan_coros: list[Callable] = [] for period, methods in scan_dict.items(): periodic_scan_coros.append(_create_periodic_scan_coro(period, methods)) return periodic_scan_coros def _create_periodic_scan_coro(period, methods: list[Callable]) -> Callable: async def _sleep(): await asyncio.sleep(period) methods.append(_sleep) # Create periodic behavior async def scan_coro() -> None: while True: await asyncio.gather(*[method() for method in methods]) return scan_coro
[docs] def build_controller_api(controller: Controller) -> ControllerAPI: """Build a `ControllerAPI` for a `BaseController` and its sub controllers""" return _build_controller_api(controller, [])
def _build_controller_api(controller: BaseController, path: list[str]) -> ControllerAPI: """Build a `ControllerAPI` for a `BaseController` and its sub controllers""" scan_methods: dict[str, Scan] = {} put_methods: dict[str, Put] = {} command_methods: dict[str, Command] = {} for attr_name in dir(controller): attr = getattr(controller, attr_name) match attr: case Put(enabled=True): put_methods[attr_name] = attr case Scan(enabled=True): scan_methods[attr_name] = attr case Command(enabled=True): command_methods[attr_name] = attr case _: pass return ControllerAPI( path=path, attributes=controller.attributes, command_methods=command_methods, put_methods=put_methods, scan_methods=scan_methods, sub_apis={ name: _build_controller_api(sub_controller, path + [name]) for name, sub_controller in controller.get_sub_controllers().items() }, description=controller.description, )