# Source code for fastcs.transports.controller_api
import asyncio
from collections import defaultdict
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
from fastcs.attributes import Attribute, AttributeIORef, AttrR
from fastcs.methods import Command, Scan, ScanCallback
from fastcs.util import ONCE
# [docs] (Sphinx viewcode link marker; not part of the module)
@dataclass
class ControllerAPI:
    """Attributes, Methods and sub APIs of a `Controller` to expose in a transport"""

    path: list[str] = field(default_factory=list)
    """Path within controller tree (empty if this is the root)"""
    attributes: dict[str, Attribute] = field(default_factory=dict)
    """`Attribute` s of this API, keyed by string"""
    command_methods: dict[str, Command] = field(default_factory=dict)
    """`Command` methods of this API, keyed by string"""
    scan_methods: dict[str, Scan] = field(default_factory=dict)
    """`Scan` methods of this API, keyed by string"""
    sub_apis: dict[str, "ControllerAPI"] = field(default_factory=dict)
    """APIs of the sub controllers of the `Controller` this API was built from"""
    description: str | None = None
    """Optional description (``None`` when not given)"""

    def walk_api(self) -> Iterator["ControllerAPI"]:
        """Walk through all the nested `ControllerAPI` s of this `ControllerAPI`.

        Yields the `ControllerAPI` s from a depth-first traversal of the tree,
        including self.
        """
        yield self
        for api in self.sub_apis.values():
            yield from api.walk_api()

    def __repr__(self) -> str:
        """Return a compact summary showing only the path and the sub API keys.

        Attributes and methods are deliberately left out; only ``path`` and the
        keys of ``sub_apis`` appear in the result.
        """
        sub_api_names = ", ".join(self.sub_apis)
        return f"ControllerAPI(path={self.path}, sub_apis=[{sub_api_names}])"

    def get_scan_and_initial_coros(
        self,
    ) -> tuple[list[ScanCallback], list[ScanCallback]]:
        """Collect the periodic and one-shot callbacks of this API tree.

        Every `ControllerAPI` reachable through `walk_api` (including self)
        contributes its scan methods and its attribute update callbacks.

        Returns:
            A ``(scan_coros, initial_coros)`` tuple. ``scan_coros`` holds one
            looping coroutine function per distinct period; ``initial_coros``
            holds the callbacks whose period is `ONCE`.
        """
        # Callbacks are grouped by period so each period gets a single loop
        scan_dict: dict[float, list[Callable]] = defaultdict(list)
        initial_coros: list[Callable] = []

        for controller_api in self.walk_api():
            _add_scan_method_tasks(scan_dict, initial_coros, controller_api)
            _add_attribute_update_tasks(scan_dict, initial_coros, controller_api)

        scan_coros = _get_periodic_scan_coros(scan_dict)
        return scan_coros, initial_coros
def _add_scan_method_tasks(
    scan_dict: dict[float, list[Callable]],
    initial_coros: list[Callable],
    controller_api: ControllerAPI,
):
    """Sort the scan methods of ``controller_api`` into one-shot and periodic.

    A method whose ``period`` is `ONCE` has its ``fn`` appended to
    ``initial_coros``; any other method has its ``fn`` appended to the
    ``scan_dict`` entry for its period. Both containers are mutated in place.
    """
    for scan in controller_api.scan_methods.values():
        bucket = initial_coros if scan.period is ONCE else scan_dict[scan.period]
        bucket.append(scan.fn)
def _add_attribute_update_tasks(
scan_dict: dict[float, list[Callable]],
initial_coros: list[Callable],
controller_api: ControllerAPI,
):
for attribute in controller_api.attributes.values():
match attribute:
case (
AttrR(_io_ref=AttributeIORef(update_period=update_period)) as attribute
):
if update_period is ONCE:
initial_coros.append(attribute.bind_update_callback())
elif update_period is not None:
scan_dict[update_period].append(attribute.bind_update_callback())
def _get_periodic_scan_coros(
    scan_dict: dict[float, list[ScanCallback]],
) -> list[ScanCallback]:
    """Build one periodic coroutine function per entry of ``scan_dict``.

    Each ``period -> callbacks`` item is handed to
    `_create_periodic_scan_coro`; results are returned in the dict's
    iteration order.
    """
    return [
        _create_periodic_scan_coro(period, callbacks)
        for period, callbacks in scan_dict.items()
    ]
def _create_periodic_scan_coro(
    period: float, scans: list[ScanCallback]
) -> ScanCallback:
    """Wrap ``scans`` in a coroutine function that runs them forever.

    Args:
        period: Seconds passed to `asyncio.sleep` on every cycle.
        scans: Zero-argument callables returning awaitables; all are called
            on every cycle.

    Returns:
        A coroutine function that loops without end. It only stops when an
        awaited callable raises or the surrounding task is cancelled.
    """

    async def _wait_one_period() -> None:
        await asyncio.sleep(period)

    # The sleep is gathered together with the scans, so a cycle finishes only
    # once the sleep and every scan have finished.
    callbacks = [_wait_one_period, *scans]

    async def scan_coro() -> None:
        while True:
            await asyncio.gather(*(callback() for callback in callbacks))

    return scan_coro