Dynamic FastCS drivers
Demo Simulation
The demo simulation used in the previous tutorial has a command, API?, that lists all of
its commands. This allows you to introspect the device and create the API dynamically,
instead of defining all the attributes statically. The response will look like this:
{
"Device ID": {"command": "ID", "type": "str", "access_mode": "r"},
"Power": {"command": "P", "type": "float", "access_mode": "rw"},
"Ramp Rate": {"command": "R", "type": "float", "access_mode": "rw"},
"Ramps": [
{
"Start": {"command": "S01", "type": "int", "access_mode": "rw"},
"End": {"command": "E01", "type": "int", "access_mode": "rw"},
"Enabled": {"command": "N01", "type": "int", "access_mode": "rw"},
"Target": {"command": "T01", "type": "float", "access_mode": "rw"},
"Actual": {"command": "A01", "type": "float", "access_mode": "rw"},
},
...,
],
}
This contains all the metadata about the parameters in the API needed to create the
Attributes from the previous tutorial. For a real device, this might also include
fields such as the units of numerical parameters, limits that a parameter can be set to,
or a description for the parameter.
FastCS Initialisation
Specific Controller classes can optionally implement an async initialise method to
perform any start-up logic. The intention is that the __init__ method should be
minimal and the initialise method should perform any long-running calls, such as
querying an API, allowing FastCS to run these concurrently to reduce start-up times.
Take the driver implementation from the previous tutorial and remove the
statically defined Attributes and creation of sub controllers in __init__. Then
implement an initialise method to create these dynamically instead.
Create a pydantic model to validate the response from the device
from pydantic import BaseModel, ConfigDict, ValidationError
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport
class TemperatureControllerParameter(BaseModel):
model_config = ConfigDict(extra="forbid")
command: str
type: Literal["bool", "int", "float", "str"]
access_mode: Literal["r", "rw"]
@property
def fastcs_datatype(self) -> DataType:
match self.type:
case "bool":
return Bool()
case "int":
return Int()
case "float":
return Float()
case "str":
return String()
Create a function to parse the dictionary, validate the entries against the model and
create Attributes.
def create_attributes(parameters: dict[str, Any]) -> dict[str, Attribute]:
attributes: dict[str, Attribute] = {}
for name, parameter in parameters.items():
name = name.replace(" ", "_").lower()
try:
parameter = TemperatureControllerParameter.model_validate(parameter)
except ValidationError as e:
print(f"Failed to validate parameter '{parameter}'\n{e}")
continue
io_ref = TemperatureControllerAttributeIORef(parameter.command)
match parameter.access_mode:
case "r":
Update the controllers to not define attributes statically and implement initialise methods to create these attributes dynamically.
) -> None:
command = f"{attr.io_ref.name}={attr.dtype(value)}"
await self._connection.send_command(f"{command}\r\n")
class TemperatureRampController(Controller):
    """Sub controller for one ramp; its attributes are created in ``initialise``.

    Args:
        index: Ramp number, used to build the ``Ramp{index}`` string passed to
            the base ``Controller``.
        parameters: Raw, unvalidated parameter metadata for this ramp, keyed by
            parameter name. Validation happens in ``create_attributes`` when
            ``initialise`` runs.
        io: AttributeIO handed to the base ``Controller`` via ``ios``.
    """

    def __init__(
        self,
        index: int,
        parameters: dict[str, Any],
        io: TemperatureControllerAttributeIO,
    ):
        # Only store the metadata here; attribute creation is deferred to
        # initialise so that __init__ stays minimal.
        self._parameters = parameters
        super().__init__(f"Ramp{index}", ios=[io])

    async def initialise(self):
        """Create and register an attribute for every valid ramp parameter."""
        for name, attribute in create_attributes(self._parameters).items():
            self.add_attribute(name, attribute)
class TemperatureController(Controller):
    """Top-level controller that builds its API by introspecting the device."""

    def __init__(self, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()

        # A single IO instance is shared with every ramp sub controller.
        self._io = TemperatureControllerAttributeIO(self._connection)
        super().__init__(ios=[self._io])

    async def connect(self):
        """Open the IP connection using the settings given at construction."""
        await self._connection.connect(self._ip_settings)

    async def initialise(self):
        """Query the device API and create attributes and ramp sub controllers.

        Sends ``API?`` to the device, parses the JSON reply, registers one
        attribute per valid top-level parameter, and adds one initialised
        ``TemperatureRampController`` per entry of the ``"Ramps"`` list.

        Raises:
            KeyError: If the reply has no ``"Ramps"`` entry.
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        await self.connect()
        try:
            api = json.loads(
                (await self._connection.send_query("API?\r\n")).strip("\r\n")
            )

            # Ramps are handled by sub controllers, so take them out before
            # creating the top-level attributes.
            ramps_api = api.pop("Ramps")
            for name, attribute in create_attributes(api).items():
                self.add_attribute(name, attribute)

            for idx, ramp_parameters in enumerate(ramps_api, start=1):
                ramp_controller = TemperatureRampController(
                    idx, ramp_parameters, self._io
                )
                await ramp_controller.initialise()
                self.add_sub_controller(f"Ramp{idx:02d}", ramp_controller)
        finally:
            # Release the connection even when introspection fails.
            # NOTE(review): presumably FastCS re-opens it later via connect() -
            # confirm against the FastCS controller lifecycle.
            await self._connection.close()
The suffix field should also be removed from TemperatureController and
TemperatureRampController, and no longer used in TemperatureControllerAttributeIO,
because the command field on TemperatureControllerParameter already includes the suffix.
TODO: Add enabled back in to TemperatureRampController and recreate disable_all to
demonstrate validation of introspected Attributes.
The full code is as follows
Code
import json
from dataclasses import KW_ONLY, dataclass
from typing import Any, Literal, TypeVar
from pydantic import BaseModel, ConfigDict, ValidationError
from fastcs.attributes import (
Attribute,
AttributeIO,
AttributeIORef,
AttrR,
AttrRW,
AttrW,
)
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Bool, DataType, Float, Int, String
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport
class TemperatureControllerParameter(BaseModel):
    """Validated metadata for a single parameter reported by the device.

    Unknown keys are rejected (``extra="forbid"``), and ``type`` and
    ``access_mode`` must each be one of the literal values listed below.
    """

    model_config = ConfigDict(extra="forbid")

    # Command string identifying the parameter on the device, e.g. "P" or "S01".
    command: str
    # Name of the value type; mapped to a FastCS datatype by `fastcs_datatype`.
    type: Literal["bool", "int", "float", "str"]
    # "r" becomes an AttrR and "rw" an AttrRW in create_attributes.
    access_mode: Literal["r", "rw"]

    @property
    def fastcs_datatype(self) -> DataType:
        """Return a new FastCS datatype instance corresponding to ``type``."""
        kind = self.type
        if kind == "bool":
            return Bool()
        if kind == "int":
            return Int()
        if kind == "float":
            return Float()
        if kind == "str":
            return String()
def create_attributes(parameters: dict[str, Any]) -> dict[str, Attribute]:
    """Turn introspected parameter metadata into FastCS attributes.

    Every entry is validated against ``TemperatureControllerParameter``; an
    entry that fails validation is reported with ``print`` and left out of the
    result.

    Args:
        parameters: Mapping of parameter name as reported by the device (e.g.
            ``"Ramp Rate"``) to its raw, unvalidated metadata.

    Returns:
        Mapping of attribute name (spaces replaced by underscores, lower-cased)
        to an ``AttrR`` for access mode ``"r"`` or an ``AttrRW`` for ``"rw"``.
    """
    created: dict[str, Attribute] = {}
    for raw_name, raw_parameter in parameters.items():
        # "Ramp Rate" -> "ramp_rate"
        attribute_name = raw_name.replace(" ", "_").lower()

        try:
            validated = TemperatureControllerParameter.model_validate(raw_parameter)
        except ValidationError as error:
            print(f"Failed to validate parameter '{raw_parameter}'\n{error}")
            continue

        io_ref = TemperatureControllerAttributeIORef(validated.command)
        # Pick the attribute class from the access mode reported by the device.
        attribute_type = {"r": AttrR, "rw": AttrRW}.get(validated.access_mode)
        if attribute_type is not None:
            created[attribute_name] = attribute_type(
                validated.fastcs_datatype, io_ref=io_ref
            )

    return created
# Type variable constrained to int or float; used to parameterise
# TemperatureControllerAttributeIO and the attributes it handles.
# NOTE(review): the introspected API can also yield "bool" and "str" parameters
# (see TemperatureControllerParameter.type), so this constraint may be narrower
# than the values that actually flow through the IO - confirm.
NumberT = TypeVar("NumberT", int, float)
@dataclass
class TemperatureControllerAttributeIORef(AttributeIORef):
    """IO reference identifying which device parameter an attribute maps to."""

    # Command name; TemperatureControllerAttributeIO builds "<name>?" queries
    # and "<name>=<value>" commands from it.
    name: str
    # Fields declared after this sentinel are keyword-only in the generated
    # __init__.
    _: KW_ONLY
    # NOTE(review): presumably the polling period in seconds, with None
    # disabling polling - confirm against the AttributeIORef documentation.
    update_period: float | None = 0.2
class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    """AttributeIO that reads and writes attribute values over an IPConnection.

    Reads are sent as ``<name>?`` and writes as ``<name>=<value>``, each
    terminated with ``\\r\\n``, where ``<name>`` is the ``name`` of the
    attribute's io_ref.
    """

    def __init__(self, connection: IPConnection):
        super().__init__()
        self._connection = connection

    async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
        """Query the device for ``attr`` and pass the reply to ``attr.update``."""
        reply = await self._connection.send_query(f"{attr.io_ref.name}?\r\n")
        # Drop the line terminator before converting with the attribute's dtype.
        await attr.update(attr.dtype(reply.strip("\r\n")))

    async def send(
        self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
    ) -> None:
        """Send ``value`` to the device as a ``<name>=<value>`` command."""
        target = attr.io_ref.name
        converted = attr.dtype(value)
        await self._connection.send_command(f"{target}={converted}\r\n")
class TemperatureRampController(Controller):
    """Sub controller for one ramp; its attributes are created in ``initialise``.

    Args:
        index: Ramp number, used to build the ``Ramp{index}`` string passed to
            the base ``Controller``.
        parameters: Raw, unvalidated parameter metadata for this ramp, keyed by
            parameter name. Validation happens in ``create_attributes`` when
            ``initialise`` runs.
        io: AttributeIO handed to the base ``Controller`` via ``ios``.
    """

    def __init__(
        self,
        index: int,
        parameters: dict[str, Any],
        io: TemperatureControllerAttributeIO,
    ):
        # Only store the metadata here; attribute creation is deferred to
        # initialise so that __init__ stays minimal.
        self._parameters = parameters
        super().__init__(f"Ramp{index}", ios=[io])

    async def initialise(self):
        """Create and register an attribute for every valid ramp parameter."""
        for name, attribute in create_attributes(self._parameters).items():
            self.add_attribute(name, attribute)
class TemperatureController(Controller):
    """Top-level controller that builds its API by introspecting the device."""

    def __init__(self, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()

        # A single IO instance is shared with every ramp sub controller.
        self._io = TemperatureControllerAttributeIO(self._connection)
        super().__init__(ios=[self._io])

    async def connect(self):
        """Open the IP connection using the settings given at construction."""
        await self._connection.connect(self._ip_settings)

    async def initialise(self):
        """Query the device API and create attributes and ramp sub controllers.

        Sends ``API?`` to the device, parses the JSON reply, registers one
        attribute per valid top-level parameter, and adds one initialised
        ``TemperatureRampController`` per entry of the ``"Ramps"`` list.

        Raises:
            KeyError: If the reply has no ``"Ramps"`` entry.
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        await self.connect()
        try:
            api = json.loads(
                (await self._connection.send_query("API?\r\n")).strip("\r\n")
            )

            # Ramps are handled by sub controllers, so take them out before
            # creating the top-level attributes.
            ramps_api = api.pop("Ramps")
            for name, attribute in create_attributes(api).items():
                self.add_attribute(name, attribute)

            for idx, ramp_parameters in enumerate(ramps_api, start=1):
                ramp_controller = TemperatureRampController(
                    idx, ramp_parameters, self._io
                )
                await ramp_controller.initialise()
                self.add_sub_controller(f"Ramp{idx:02d}", ramp_controller)
        finally:
            # Release the connection even when introspection fails.
            # NOTE(review): presumably FastCS re-opens it later via connect() -
            # confirm against the FastCS controller lifecycle.
            await self._connection.close()
# EPICS Channel Access transport configured with pv_prefix "DEMO".
epics_ca = EpicsCATransport(
    epicsca=EpicsIOCOptions(pv_prefix="DEMO"),
)
# Host and port of the device the controller connects to.
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(
    TemperatureController(connection_settings),
    [epics_ca],
)

# Only start FastCS when executed as a script, not when imported.
if __name__ == "__main__":
    fastcs.run()