Update Attribute Values from a Device#
There are different patterns for pushing values from a device into attributes to suit different use cases. Choose the pattern that fits how the device API delivers data.
Update Tasks via AttributeIO.update#
Use this pattern when each attribute maps to an independent request to the device. The
AttributeIO.update method is called periodically as a background task, once per
attribute, at the rate set by update_period in the attribute’s AttributeIORef.
Define an AttributeIORef with an update_period and implement AttributeIO.update
to query the device and call attr.update with the result:
from dataclasses import KW_ONLY, dataclass
from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.controllers import Controller
from fastcs.datatypes import Float, String
@dataclass
class MyDeviceIORef(AttributeIORef):
register: str
_: KW_ONLY
update_period: float | None = 0.5
class MyDeviceIO(AttributeIO[float, MyDeviceIORef]):
def __init__(self, connection):
super().__init__()
self._connection = connection
async def update(self, attr: AttrR[float, MyDeviceIORef]):
response = await self._connection.send_query(f"{attr.io_ref.register}?\r\n")
await attr.update(float(response.strip()))
async def send(self, attr: AttrW[float, MyDeviceIORef], value: float):
await self._connection.send_command(f"{attr.io_ref.register}={value}\r\n")
class MyController(Controller):
temperature = AttrR(Float(), io_ref=MyDeviceIORef("T"))
setpoint = AttrRW(Float(), io_ref=MyDeviceIORef("S", update_period=1.0))
label = AttrR(String(), io_ref=MyDeviceIORef("L", update_period=None))
def __init__(self, connection):
super().__init__(ios=[MyDeviceIO(connection)])
Setting update_period to:
A positive
float— polls at that interval in seconds.None— no automatic updates; the attribute value is only set explicitly (e.g. from a scan method or subscription callback).ONCE(imported fromfastcs) — called once on startup and not again.
Initial Read with Event-Driven Updates from Puts#
Use this pattern when attributes need their initial value read on startup, but subsequent updates arrive as side-effects of write operations rather than on a fixed poll cycle. This is common for devices that echo back related parameter values in their response to a set command.
Set update_period=ONCE on the AttributeIORef so that AttributeIO.update is called
once when the application starts. Then, in AttributeIO.send, parse the device’s
response to the put and call attr.update on any attributes whose values have changed:
from collections.abc import Awaitable, Callable
from dataclasses import KW_ONLY, dataclass
from fastcs import ONCE
from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.controllers import Controller
from fastcs.datatypes import Float
@dataclass
class MyDeviceIORef(AttributeIORef):
register: str
_: KW_ONLY
update_period: float | None = ONCE
PutResponseCallback = Callable[[str], Awaitable[None]]
class MyDeviceIO(AttributeIO[float, MyDeviceIORef]):
def __init__(self, connection, on_put_response: PutResponseCallback | None = None):
super().__init__()
self._connection = connection
self._on_put_response = on_put_response
async def update(self, attr: AttrR[float, MyDeviceIORef]):
response = await self._connection.send_query(f"{attr.io_ref.register}?\r\n")
await attr.update(float(response.strip()))
async def send(self, attr: AttrW[float, MyDeviceIORef], value: float):
# Device responds with a snapshot of all current values after a set
response = await self._connection.send_query(
f"{attr.io_ref.register}={value}\r\n"
)
if self._on_put_response is not None:
await self._on_put_response(response)
class MyController(Controller):
setpoint = AttrRW(Float(), io_ref=MyDeviceIORef("S"))
actual_temperature = AttrR(Float(), io_ref=MyDeviceIORef("T"))
power = AttrR(Float(), io_ref=MyDeviceIORef("P"))
status = AttrR(Float(), io_ref=MyDeviceIORef("X"))
def __init__(self, connection):
super().__init__(ios=[MyDeviceIO(connection, self._handle_put_response)])
async def _handle_put_response(self, response: str) -> None:
actual, power, status = response.strip().split(",")
await self.actual_temperature.update(float(actual))
await self.power.update(float(power))
await self.status.update(float(status))
Attributes that are updated as side-effects of puts can still carry update_period=ONCE
so they also get their initial value on startup. Set update_period=None instead if the
device response to the put is the only source of truth and no initial poll is needed.
Batched Updates via a Scan Method#
Use this pattern when the device returns values for multiple attributes in a single
response. A @scan method runs periodically on the controller and distributes the
results by calling attr.update directly on each attribute.
Attributes that are updated this way do not need an io_ref with an update_period
because the scan method drives the updates rather than individual IO tasks.
import json
from fastcs.attributes import AttrR
from fastcs.controllers import Controller
from fastcs.datatypes import Float
from fastcs.methods import scan
class ChannelController(Controller):
voltage = AttrR(Float()) # No io_ref — updated by parent scan method
def __init__(self, index: int, connection):
super().__init__(f"Ch{index:02d}")
self._index = index
self._connection = connection
class MultiChannelController(Controller):
def __init__(self, channel_count: int, connection):
self._connection = connection
super().__init__()
self._channels: list[ChannelController] = []
for i in range(channel_count):
ch = ChannelController(i, connection)
self._channels.append(ch)
self.add_sub_controller(f"Ch{i:02d}", ch)
@scan(0.1)
async def update_voltages(self):
# One request returns all channel voltages
voltages = json.loads(
(await self._connection.send_query("V?\r\n")).strip()
)
for channel, voltage in zip(self._channels, voltages):
await channel.voltage.update(float(voltage))
The scan period (here 0.1 seconds) sets how often the batched query runs. Scans that
raise an exception will pause and wait for reconnect() to be called before resuming.
Scan as a cache for AttributeIO.update#
When there are many attributes to update from a batched response, calling attr.update
for each one inside the scan method becomes verbose. Instead, the scan can populate a
cache on the AttributeIO, and each attribute’s regular update task reads from that
cache rather than querying the device while the device is still only queried once per
cycle.
import json
from dataclasses import KW_ONLY, dataclass
from fastcs.attributes import AttributeIO, AttributeIORef, AttrR
from fastcs.controllers import Controller
from fastcs.datatypes import Float
from fastcs.methods import scan
@dataclass
class ChannelIORef(AttributeIORef):
index: int
_: KW_ONLY
update_period: float | None = 0.1
class ChannelIO(AttributeIO[float, ChannelIORef]):
def __init__(self):
super().__init__()
self._cache: dict[int, float] = {}
def update_cache(self, values: dict[int, float]) -> None:
self._cache = values
async def update(self, attr: AttrR[float, ChannelIORef]):
cached = self._cache.get(attr.io_ref.index)
if cached is not None:
await attr.update(cached)
class ChannelController(Controller):
def __init__(self, index: int, io: ChannelIO):
super().__init__(f"Ch{index:02d}", ios=[io])
self.voltage = AttrR(Float(), io_ref=ChannelIORef(index))
class MultiChannelController(Controller):
def __init__(self, channel_count: int, connection):
self._connection = connection
self._channel_io = ChannelIO()
super().__init__()
for i in range(channel_count):
self.add_sub_controller(f"Ch{i:02d}", ChannelController(i, self._channel_io))
@scan(0.1)
async def fetch_voltages(self):
voltages = json.loads(
(await self._connection.send_query("V?\r\n")).strip()
)
self._channel_io.update_cache(dict(enumerate(map(float, voltages))))
Subscription Callbacks#
Use this pattern when the device library (or protocol) delivers value changes by calling
a user-supplied callback rather than responding to polls. Wrap attr.update in an async
callback and register it with the library.
import asyncio
from fastcs.attributes import AttrR
from fastcs.controllers import Controller
from fastcs.datatypes import Float
class SubscriptionController(Controller):
temperature = AttrR(Float())
def __init__(self, subscription_client):
super().__init__()
self._client = subscription_client
async def connect(self):
# Register an async callback that forwards updates into the attribute.
async def on_temperature_change(value: float) -> None:
await self.temperature.update(value)
await self._client.subscribe("temperature", on_temperature_change)
await super().connect()
If the library only supports synchronous callbacks, schedule the coroutine onto the running event loop:
def on_temperature_change_sync(value: float) -> None:
asyncio.get_event_loop().call_soon_threadsafe(
asyncio.ensure_future, self.temperature.update(value)
)
self._client.subscribe("temperature", on_temperature_change_sync)