Source code for fastcs.attributes

from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, Generic

from fastcs.attribute_io_ref import AttributeIORefT
from fastcs.datatypes import ATTRIBUTE_TYPES, DataType, T
from fastcs.logging import bind_logger
from fastcs.tracer import Tracer

ONCE = float("inf")
"""Special value to indicate that an attribute should be updated once on start up."""

logger = bind_logger(logger_name=__name__)


[docs] class Attribute(Generic[T, AttributeIORefT], Tracer): """Base FastCS attribute. Instances of this class added to a ``Controller`` will be used by the FastCS class. """ def __init__( self, datatype: DataType[T], io_ref: AttributeIORefT | None = None, group: str | None = None, description: str | None = None, ) -> None: super().__init__() assert issubclass(datatype.dtype, ATTRIBUTE_TYPES), ( f"Attr type must be one of {ATTRIBUTE_TYPES}, " "received type {datatype.dtype}" ) self._io_ref = io_ref self._datatype: DataType[T] = datatype self._group = group self.enabled = True self.description = description # A callback to use when setting the datatype to a different value, for example # changing the units on an int. self._update_datatype_callbacks: list[Callable[[DataType[T]], None]] = [] # Path and name to be filled in by Controller it is bound to self._name = "" self._path = [] @property def io_ref(self) -> AttributeIORefT: if self._io_ref is None: raise RuntimeError(f"{self} has no AttributeIORef") return self._io_ref def has_io_ref(self): return self._io_ref is not None @property def datatype(self) -> DataType[T]: return self._datatype @property def dtype(self) -> type[T]: return self._datatype.dtype @property def group(self) -> str | None: return self._group @property def name(self) -> str: return self._name @property def path(self) -> list[str]: return self._path def add_update_datatype_callback( self, callback: Callable[[DataType[T]], None] ) -> None: self._update_datatype_callbacks.append(callback) def update_datatype(self, datatype: DataType[T]) -> None: if not isinstance(self._datatype, type(datatype)): raise ValueError( f"Attribute datatype must be of type {type(self._datatype)}" ) self._datatype = datatype for callback in self._update_datatype_callbacks: callback(datatype) def set_name(self, name: str): if self._name: raise RuntimeError( f"Attribute is already registered with a controller as {self._name}" ) self._name = name def set_path(self, path: list[str]): if self._path: raise RuntimeError( f"Attribute is already registered with a controller at {self._path}" ) self._path = path def __repr__(self): name = self.__class__.__name__ path = ".".join(self._path + [self._name]) or None datatype = self._datatype.__class__.__name__ return f"{name}(path={path}, datatype={datatype}, io_ref={self._io_ref})"
AttrIOUpdateCallback = Callable[["AttrR[T, Any]"], Awaitable[None]] """An AttributeIO callback that takes an AttrR and updates its value""" AttrUpdateCallback = Callable[[], Awaitable[None]] """A callback to be called periodically to update an attribute""" AttrOnUpdateCallback = Callable[[T], Awaitable[None]] """A callback to be called when the value of the attribute is updated"""
[docs] class AttrR(Attribute[T, AttributeIORefT]): """A read-only ``Attribute``.""" def __init__( self, datatype: DataType[T], io_ref: AttributeIORefT | None = None, group: str | None = None, initial_value: T | None = None, description: str | None = None, ) -> None: super().__init__(datatype, io_ref, group, description=description) self._value: T = ( datatype.initial_value if initial_value is None else initial_value ) self._update_callback: AttrIOUpdateCallback[T] | None = None """Callback to update the value of the attribute with an IO to the source""" self._on_update_callbacks: list[AttrOnUpdateCallback[T]] | None = None """Callbacks to publish changes to the value of the attribute"""
[docs] def get(self) -> T: """Get the cached value of the attribute.""" return self._value
[docs] async def update(self, value: T) -> None: """Update the value of the attibute This sets the cached value of the attribute presented in the API. It should generally only be called from an IO or a controller that is updating the value from some underlying source. To request a change to the setpoint of the attribute, use the ``put`` method, which will attempt to apply the change to the underlying source. """ self.log_event("Attribute set", attribute=self, value=value) self._value = self._datatype.validate(value) if self._on_update_callbacks is not None: try: await asyncio.gather( *[cb(self._value) for cb in self._on_update_callbacks] ) except Exception as e: logger.opt(exception=e).error( "On update callback failed", attribute=self, value=value ) raise
[docs] def add_on_update_callback(self, callback: AttrOnUpdateCallback[T]) -> None: """Add a callback to be called when the value of the attribute is updated The callback will be called with the updated value. """ if self._on_update_callbacks is None: self._on_update_callbacks = [] self._on_update_callbacks.append(callback)
[docs] def set_update_callback(self, callback: AttrIOUpdateCallback[T]): """Set the callback to update the value of the attribute from the source The callback will be converted to an async task and called periodically. """ if self._update_callback is not None: raise RuntimeError("Attribute already has an IO update callback") self._update_callback = callback
[docs] def bind_update_callback(self) -> AttrUpdateCallback: """Bind self into the registered IO update callback""" if self._update_callback is None: raise RuntimeError("Attribute has no update callback") else: update_callback = self._update_callback async def update_attribute(): try: self.log_event("Update attribute", topic=self) await update_callback(self) except Exception as e: logger.opt(exception=e).error("Update loop failed", attribute=self) raise return update_attribute
AttrOnPutCallback = Callable[["AttrW[T, Any]", T], Awaitable[None]] """Callbacks to be called when the setpoint of an attribute is changed""" AttrSyncSetpointCallback = Callable[[T], Awaitable[None]] """Callbacks to be called when the setpoint of an attribute is changed"""
[docs] class AttrW(Attribute[T, AttributeIORefT]): """A write-only ``Attribute``.""" def __init__( self, datatype: DataType[T], io_ref: AttributeIORefT | None = None, group: str | None = None, description: str | None = None, ) -> None: super().__init__( datatype, # type: ignore io_ref, group, description=description, ) self._on_put_callback: AttrOnPutCallback[T] | None = None """Callback to action a change to the setpoint of the attribute""" self._sync_setpoint_callbacks: list[AttrSyncSetpointCallback[T]] = [] """Callbacks to publish changes to the setpoint of the attribute"""
[docs] async def put(self, setpoint: T, sync_setpoint: bool = False) -> None: """Set the setpoint of the attribute This should be called by clients to the attribute such as transports to apply a change to the attribute. The ``_on_put_callback`` will be called with this new setpoint, which may or may not take effect depending on the validity of the new value. For example, if the attribute has an IO to some device, the value might be rejected. To directly change the value of the attribute, for example from an update loop that has read a new value from some underlying source, call the ``update`` method. """ setpoint = self._datatype.validate(setpoint) if self._on_put_callback is not None: try: await self._on_put_callback(self, setpoint) except Exception as e: logger.opt(exception=e).error( "Put failed", attribute=self, setpoint=setpoint ) if sync_setpoint: try: await self._call_sync_setpoint_callbacks(setpoint) except Exception as e: logger.opt(exception=e).error( "Sync setpoint failed", attribute=self, setpoint=setpoint )
async def _call_sync_setpoint_callbacks(self, setpoint: T) -> None: if self._sync_setpoint_callbacks: await asyncio.gather( *[cb(setpoint) for cb in self._sync_setpoint_callbacks] )
[docs] def set_on_put_callback(self, callback: AttrOnPutCallback[T]) -> None: """Set the callback to call when the setpoint is changed The callback will be called with the attribute and the new setpoint. """ if self._on_put_callback is not None: raise RuntimeError("Attribute already has an on put callback") self._on_put_callback = callback
[docs] def add_sync_setpoint_callback(self, callback: AttrSyncSetpointCallback[T]) -> None: """Add a callback to publish changes to the setpoint of the attribute The callback will be called with the new setpoint. """ self._sync_setpoint_callbacks.append(callback)
[docs] class AttrRW(AttrR[T, AttributeIORefT], AttrW[T, AttributeIORefT]): """A read-write ``Attribute``.""" def __init__( self, datatype: DataType[T], io_ref: AttributeIORefT | None = None, group: str | None = None, initial_value: T | None = None, description: str | None = None, ): super().__init__(datatype, io_ref, group, initial_value, description) self._setpoint_initialised = False if io_ref is None: self.set_on_put_callback(self._internal_update) async def _internal_update(self, attr: AttrW[T, AttributeIORefT], value: T): """Update value directly when Attribute has no IO""" assert attr is self await self.update(value) async def update(self, value: T): await super().update(value) if not self._setpoint_initialised: await self._call_sync_setpoint_callbacks(value) self._setpoint_initialised = True