Creating a FastCS Driver#
Demo Simulation#
Within FastCS there is a tickit simulation of a temperature controller. Clone the FastCS
repository and open it in VS Code. The simulation can be run with the
Temp Controller Sim launch config: press Ctrl+P, type debug followed by a space (the
trailing whitespace is required), select the launch config and press Enter. The
simulation will then sit and wait for commands. When it receives a command, it logs it
to the console to show what it is doing.
Note
FastCS must be installed with the demo extra for the demo simulator to run. This can
be done by running pip install 'fastcs[demo]'.
This tutorial will walk through the steps of writing a device driver to control this simulation.
FastCS Controllers#
The core of a FastCS device driver is the Controller. This class implements control of
a device, and instances of it can be loaded into a FastCS application to expose the
device's functionality.
Create a TemperatureController class that inherits from Controller.
Code 1
from fastcs.controllers import Controller


class TemperatureController(Controller):
    pass
FastCS Launcher#
The entrypoint to a FastCS application is the FastCS class. This takes a Controller
and a list of transports to expose the API through and provides a run method to launch
the application. Create a FastCS instance, pass the TemperatureController to it
along with an empty list of transports (for now).
Code 2
from fastcs.controllers import Controller
from fastcs.launch import FastCS


class TemperatureController(Controller):
    pass


fastcs = FastCS(TemperatureController(), [])

if __name__ == "__main__":
    fastcs.run()
Now the application runs, but it still doesn’t expose any API because the Controller
is empty.
FastCS Attributes#
The simulator has an API to get its ID. To expose this in the driver, an Attribute can
be added to the Controller. There are 3 types of Attribute: AttrR, AttrW and
AttrRW, representing the access mode of the API. The ID can be read, but it cannot be
written, so add an AttrR. An Attribute also needs a type. The ID from the simulator
is a string, so String should be used.
Code 3
from fastcs.attributes import AttrR
from fastcs.controllers import Controller
from fastcs.datatypes import String
from fastcs.launch import FastCS


class TemperatureController(Controller):
    device_id = AttrR(String())


fastcs = FastCS(TemperatureController(), [])

if __name__ == "__main__":
    fastcs.run()
Now the controller has a property that will appear in the API, but there are no transports being run on the event loop to expose that API. The controller can be interacted with in the console, but note that it hasn’t populated any values because it doesn’t have a connection.
Interactive Shell
In [1]: controller.device_id
Out[1]: AttrR(String())
In [2]: controller.device_id.get()
Out[2]: ''
FastCS Transports#
FastCS supports multiple transports to expose the API of the loaded Controller. The
following transports are currently supported:
EPICS CA (using pythonSoftIOC)
EPICS PVA (using p4p)
Tango (using pytango)
GraphQL (using strawberry)
HTTP (using fastapi)
One or more of these can be loaded into the application and run in parallel. Add the
EPICS CA transport to the application by creating an EpicsCATransport instance and
passing it in.
Code 4
from fastcs.attributes import AttrR
from fastcs.controllers import Controller
from fastcs.datatypes import String
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsIOCOptions
from fastcs.transports.epics.ca.transport import EpicsCATransport


class TemperatureController(Controller):
    device_id = AttrR(String())


epics_ca = EpicsCATransport(epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
fastcs = FastCS(TemperatureController(), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
There will now be a DEMO:DeviceId PV being served by the application. However, the
record is unset because the Controller is not yet querying the simulator for the
value.
❯ caget -S DEMO:DeviceId
DEMO:DeviceId
Now that the controller has a PV, it would be useful to open a UI. Add EPICS GUI
options to the transport options and generate a demo.bob file to use with Phoebus.
Code 5
from pathlib import Path

from fastcs.attributes import AttrR
from fastcs.controllers import Controller
from fastcs.datatypes import String
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport


class TemperatureController(Controller):
    device_id = AttrR(String())


gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
fastcs = FastCS(TemperatureController(), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
The demo.bob will have been created in the directory the application was run from.
FastCS Device Connection#
The Attributes of a FastCS Controller need some IO with the device in order to get
and set values. This is implemented with AttributeIOs and connections. Generally each
driver implements its own IO and connection logic, but there are some built in options.
Update the controller to create an IPConnection to communicate with the simulator over
TCP and implement a connect method that establishes the connection. The connect
method is called by the FastCS application at the appropriate time during start up to
ensure the connection is established before it is used.
Note
The simulator control connection is on port 25565.
Code 6
from pathlib import Path

from fastcs.attributes import AttrR
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import String
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport


class TemperatureController(Controller):
    device_id = AttrR(String())

    def __init__(self, settings: IPConnectionSettings):
        super().__init__()

        self._ip_settings = settings
        self._connection = IPConnection()

    async def connect(self):
        await self._connection.connect(self._ip_settings)


gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
Warning
The application will now fail to connect if the demo simulation is not running.
The Controller has now established a connection with the simulator. This connection
can be passed to an AttributeIO to enable it to query the device API and update the
value in the device_id attribute. Create an IDAttributeIO child class of AttributeIO
and implement the update method to query the device and set the value of the
attribute. Then create an IDAttributeIORef and pass an instance of it to the
device_id attribute to tell the controller which IO to use to update it.
Note
The update_period field of the IO ref tells the base class how often to call update.
Code 7
from dataclasses import dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import String
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport

NumberT = TypeVar("NumberT", int, float)


@dataclass
class IDAttributeIORef(AttributeIORef):
    update_period: float | None = 0.2


class IDAttributeIO(AttributeIO[NumberT, IDAttributeIORef]):
    def __init__(self, connection: IPConnection):
        super().__init__()

        self._connection = connection

    async def update(self, attr: AttrR[NumberT, IDAttributeIORef]):
        response = await self._connection.send_query("ID?\r\n")
        value = response.strip("\r\n")

        await attr.update(attr.dtype(value))


class TemperatureController(Controller):
    device_id = AttrR(String(), io_ref=IDAttributeIORef())

    def __init__(self, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()

        super().__init__(ios=[IDAttributeIO(self._connection)])

    async def connect(self):
        await self._connection.connect(self._ip_settings)


gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
Note
Errors raised in the update method do not crash the application, but they are printed
to the terminal. Look for a message containing Update loop ... stopped:
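To make the polling behaviour more concrete, here is a minimal standard-library sketch of a periodic update loop. It is not FastCS's implementation: the names poll and flaky_update and the printed text are hypothetical, and stopping the loop on the first error is an assumption based only on the Update loop ... stopped: message mentioned above.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def poll(update: Callable[[], Awaitable[None]], period: float) -> None:
    """Call `update` every `period` seconds until it raises (assumed behaviour)."""
    while True:
        try:
            await update()
        except Exception as exc:
            # Report the failure and end this loop only; the caller keeps running.
            print(f"Update loop stopped: {exc!r}")
            return
        await asyncio.sleep(period)


async def demo() -> list[str]:
    calls: list[str] = []

    async def flaky_update() -> None:
        # Hypothetical update: record the query, then fail on the third call.
        calls.append("ID?")
        if len(calls) == 3:
            raise ConnectionError("simulated failure")

    await poll(flaky_update, period=0.01)
    return calls


calls = asyncio.run(demo())
print(calls)
```

The error ends the polling loop, but demo still returns normally, which is the sense in which a failing update does not take the rest of the program down.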
Now the PV will be set by reading from the simulator and the IOC has one fully functional PV.
❯ caget -S DEMO:DeviceId
DEMO:DeviceId SIMTCONT123
Building Up The API#
The simulator supports many other commands, for example it reports the total power
currently being drawn with the P command. This can be exposed by adding another
AttrR with a Float datatype, but the IO only supports the ID command to get the
device ID. This new attribute could have its own IO, but it is similar enough that the
existing IO can support both.
Modify the IO ref to take a name string and update the IO to use it in the query
string sent to the device. Create a new attribute to read the power usage using this.
Note
All responses from the IPConnection are strings. This is fine for the ID command
because the value is actually a string, but for P the value is a float, so the
update method needs to explicitly cast to the correct type. It can use
Attribute.dtype to call the builtin for its datatype, e.g. int, float, str,
etc.
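The cast described in this note can be sketched without FastCS at all. In the snippet below, parse_response is a hypothetical helper, and the "\r\n" terminator on the responses is an assumption taken from the strip("\r\n") call used in the tutorial code.

```python
def parse_response(response: str, dtype: type):
    """Strip the line terminator and cast the text with the given builtin."""
    return dtype(response.strip("\r\n"))


# The ID is already text, so str leaves it unchanged.
device_id = parse_response("SIMTCONT123\r\n", str)

# The power is sent as text but should be handled as a number.
power = parse_response("56.84\r\n", float)

print(device_id, power)
```

Passing the builtin in as an argument is what lets one update method serve attributes of different datatypes.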
Code 8
from dataclasses import KW_ONLY, dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Float, String
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport

NumberT = TypeVar("NumberT", int, float)


@dataclass
class TemperatureControllerAttributeIORef(AttributeIORef):
    name: str
    _: KW_ONLY
    update_period: float | None = 0.2


class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    def __init__(self, connection: IPConnection):
        super().__init__()

        self._connection = connection

    async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
        query = f"{attr.io_ref.name}?"
        response = await self._connection.send_query(f"{query}\r\n")
        value = response.strip("\r\n")

        await attr.update(attr.dtype(value))


class TemperatureController(Controller):
    device_id = AttrR(String(), io_ref=TemperatureControllerAttributeIORef("ID"))
    power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("P"))

    def __init__(self, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()

        super().__init__(ios=[TemperatureControllerAttributeIO(self._connection)])

    async def connect(self):
        await self._connection.connect(self._ip_settings)


gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
Now the IOC has two PVs being polled periodically. The new PV will be visible in the
Phoebus UI on refresh (right-click). DEMO:Power will read as 0 because the simulator
is not currently running a ramp. To do that the controller needs to be able to set
values on the device, as well as read them back. The ramp rate of the temperature can be
read with the R command and set with the R=... command. This means the IO also needs
a send method to send values to the device.
Update the IO to implement send and then add a new AttrRW with type Float to get
and set the ramp rate.
Note
The set commands do not return a response, so use the send_command method instead of
send_query.
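The difference between a query and a set command can be illustrated with a small in-memory stand-in for the device. FakeDevice is hypothetical and is not the tickit simulator; it only mimics the behaviour described here, where NAME? returns a value and NAME=value returns nothing.

```python
from typing import Optional


class FakeDevice:
    """Hypothetical in-memory stand-in for the simulator's text protocol."""

    def __init__(self) -> None:
        self._values = {"ID": "SIMTCONT123", "R": "2.0"}

    def handle(self, line: str) -> Optional[str]:
        message = line.strip("\r\n")
        if message.endswith("?"):
            # Query: reply with the current value.
            return f"{self._values[message[:-1]]}\r\n"
        # Set command: store the value and send nothing back.
        name, _, value = message.partition("=")
        self._values[name] = value
        return None


device = FakeDevice()
before = device.handle("R?\r\n")
reply = device.handle("R=5.0\r\n")
after = device.handle("R?\r\n")
print(before, reply, after)
```

A client that waited for a reply after R=5.0 would block, which is why the set path needs a method that does not read a response.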
Code 9
from dataclasses import KW_ONLY, dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Float, String
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport

NumberT = TypeVar("NumberT", int, float)


@dataclass
class TemperatureControllerAttributeIORef(AttributeIORef):
    name: str
    _: KW_ONLY
    update_period: float | None = 0.2


class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    def __init__(self, connection: IPConnection):
        super().__init__()

        self._connection = connection

    async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
        query = f"{attr.io_ref.name}?"
        response = await self._connection.send_query(f"{query}\r\n")
        value = response.strip("\r\n")

        await attr.update(attr.dtype(value))

    async def send(
        self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
    ) -> None:
        command = f"{attr.io_ref.name}={attr.dtype(value)}"
        await self._connection.send_command(f"{command}\r\n")


class TemperatureController(Controller):
    device_id = AttrR(String(), io_ref=TemperatureControllerAttributeIORef("ID"))
    power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("P"))
    ramp_rate = AttrRW(Float(), io_ref=TemperatureControllerAttributeIORef("R"))

    def __init__(self, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()

        super().__init__(ios=[TemperatureControllerAttributeIO(self._connection)])

    async def connect(self):
        await self._connection.connect(self._ip_settings)


gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
Two new PVs will be created: one to set the ramp rate and one to read it back.
❯ caget DEMO:RampRate_RBV
DEMO:RampRate_RBV 2
❯ caput DEMO:RampRate 5
Old : DEMO:RampRate 2
New : DEMO:RampRate 5
❯ caget DEMO:RampRate_RBV
DEMO:RampRate_RBV 5
The changes will also be visible in the simulator terminal.
INFO:fastcs.demo.simulation.device:Set ramp rate to 5.0
This adds the first method to modify the device, but more are needed to be able to run a
temperature ramp. The simulator has multiple temperature control loops that can be
ramped independently. Each loop has the same set of commands, addressed by index: for
example, S01=... sets the start point for ramp 1 and E02=... sets the end point for
ramp 2.
Given that the device has n instances of a common interface, it makes sense to create
a class to encapsulate this control and then instantiate it for each ramp the simulator
has. This can be done with the use of sub controllers. Controllers can be arbitrarily
nested to match the structure of a device and this structure is then mirrored to the
transport layer for the visibility of the user.
Create a TemperatureRampController with two AttrRWs for the ramp start and end. Update
the IO to accept an optional suffix for the commands so that it can be shared with
the parent TemperatureController. Then add an argument to TemperatureController that
defines how many ramps there are, and use it to register the correct number of ramp
controllers with the parent.
Code 10
from dataclasses import KW_ONLY, dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Float, Int, String
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport

NumberT = TypeVar("NumberT", int, float)


@dataclass
class TemperatureControllerAttributeIORef(AttributeIORef):
    name: str
    _: KW_ONLY
    update_period: float | None = 0.2


class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    def __init__(self, connection: IPConnection, suffix: str = ""):
        super().__init__()

        self._connection = connection
        self._suffix = suffix

    async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
        query = f"{attr.io_ref.name}{self._suffix}?"
        response = await self._connection.send_query(f"{query}\r\n")
        value = response.strip("\r\n")

        await attr.update(attr.dtype(value))

    async def send(
        self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
    ) -> None:
        command = f"{attr.io_ref.name}{self._suffix}={attr.dtype(value)}"
        await self._connection.send_command(f"{command}\r\n")


class TemperatureRampController(Controller):
    start = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="S"))
    end = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="E"))

    def __init__(self, index: int, connection: IPConnection) -> None:
        suffix = f"{index:02d}"

        super().__init__(
            f"Ramp{suffix}", ios=[TemperatureControllerAttributeIO(connection, suffix)]
        )


class TemperatureController(Controller):
    device_id = AttrR(String(), io_ref=TemperatureControllerAttributeIORef("ID"))
    power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("P"))
    ramp_rate = AttrRW(Float(), io_ref=TemperatureControllerAttributeIORef("R"))

    def __init__(self, ramp_count: int, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()

        super().__init__(ios=[TemperatureControllerAttributeIO(self._connection)])

        self._ramp_controllers: list[TemperatureRampController] = []
        for index in range(1, ramp_count + 1):
            controller = TemperatureRampController(index, self._connection)
            self._ramp_controllers.append(controller)
            self.add_sub_controller(f"R{index}", controller)

    async def connect(self):
        await self._connection.connect(self._ip_settings)


gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(4, connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
New PVs will be added (e.g. DEMO:R1:Start):
DEMO:R{1,2,3,4}:Start
DEMO:R{1,2,3,4}:Start_RBV
DEMO:R{1,2,3,4}:End
DEMO:R{1,2,3,4}:End_RBV
Four buttons will also be added to the Phoebus UI to open sub screens for each ramp.
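The way the controller tree maps onto PV names can be sketched as follows. The naming rule here is only inferred from the PV names shown in this tutorial (snake_case to PascalCase, sub controller names joined with colons, an extra _RBV record for writable attributes); to_pascal_case and pv_names are hypothetical helpers, not FastCS functions.

```python
def to_pascal_case(name: str) -> str:
    """Convert snake_case to PascalCase, e.g. device_id -> DeviceId."""
    return "".join(part.capitalize() for part in name.split("_"))


def pv_names(prefix: str, path: list[str], attr_name: str, writable: bool) -> list[str]:
    """Build the PV name(s) for one attribute at `path` in the controller tree."""
    base = ":".join([prefix, *path, to_pascal_case(attr_name)])
    # Writable attributes get a setpoint PV plus a readback PV.
    return [base, f"{base}_RBV"] if writable else [base]


names = pv_names("DEMO", [], "device_id", writable=False)
for index in range(1, 5):
    names += pv_names("DEMO", [f"R{index}"], "start", writable=True)

print(names)
```

The path argument plays the role of the names passed to add_sub_controller, which is why R1 to R4 appear as a level in the PV names.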
This allows the controller to set the range of every temperature ramp. Again, the
simulator terminal will confirm that the changes are taking effect. The final commands
needed to run a temperature ramp are the N01 and N01= commands, which are used to
enable (and disable) the ramping.
Add an AttrRW to the TemperatureRampControllers with an Enum type, using a
StrEnum with states Off and On.
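The mapping between the device's text values and enum states can be sketched on its own. This example uses a plain enum.Enum with an explicit .value so that it runs on Python versions without StrEnum; the tutorial code uses StrEnum instead. The helpers decode and encode are hypothetical, and the "0"/"1" wire values are taken from the enum defined in the tutorial code.

```python
import enum


class OnOff(enum.Enum):
    Off = "0"
    On = "1"


def decode(response: str) -> OnOff:
    """Look up the enum member whose value matches the device's reply."""
    return OnOff(response.strip("\r\n"))


def encode(name: str, suffix: str, state: OnOff) -> str:
    """Build the set command for a state, e.g. N01=0."""
    return f"{name}{suffix}={state.value}\r\n"


state = decode("1\r\n")
command = encode("N", "01", OnOff.Off)
print(state, repr(command))
```

Calling the enum class with a value is the same lookup that a dtype-style cast performs, so the update path needs no special case for enums.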
Code 11
import enum
from dataclasses import KW_ONLY, dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Enum, Float, Int, String
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport

NumberT = TypeVar("NumberT", int, float)


@dataclass
class TemperatureControllerAttributeIORef(AttributeIORef):
    name: str
    _: KW_ONLY
    update_period: float | None = 0.2


class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    def __init__(self, connection: IPConnection, suffix: str = ""):
        super().__init__()

        self._connection = connection
        self._suffix = suffix

    async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
        query = f"{attr.io_ref.name}{self._suffix}?"
        response = await self._connection.send_query(f"{query}\r\n")
        value = response.strip("\r\n")

        await attr.update(attr.dtype(value))

    async def send(
        self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
    ) -> None:
        command = f"{attr.io_ref.name}{self._suffix}={attr.dtype(value)}"
        await self._connection.send_command(f"{command}\r\n")


class OnOffEnum(enum.StrEnum):
    Off = "0"
    On = "1"


class TemperatureRampController(Controller):
    start = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="S"))
    end = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="E"))
    enabled = AttrRW(Enum(OnOffEnum), io_ref=TemperatureControllerAttributeIORef("N"))

    def __init__(self, index: int, connection: IPConnection) -> None:
        suffix = f"{index:02d}"

        super().__init__(
            f"Ramp{suffix}", ios=[TemperatureControllerAttributeIO(connection, suffix)]
        )


class TemperatureController(Controller):
    device_id = AttrR(String(), io_ref=TemperatureControllerAttributeIORef("ID"))
    power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("P"))
    ramp_rate = AttrRW(Float(), io_ref=TemperatureControllerAttributeIORef("R"))

    def __init__(self, ramp_count: int, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()

        super().__init__(ios=[TemperatureControllerAttributeIO(self._connection)])

        self._ramp_controllers: list[TemperatureRampController] = []
        for index in range(1, ramp_count + 1):
            controller = TemperatureRampController(index, self._connection)
            self._ramp_controllers.append(controller)
            self.add_sub_controller(f"R{index}", controller)

    async def connect(self):
        await self._connection.connect(self._ip_settings)


gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(4, connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
Now the temperature ramp can be run.
❯ caput DEMO:R1:Enabled On
Old : DEMO:R1:Enabled Off
New : DEMO:R1:Enabled On
❯ caget DEMO:Power
DEMO:Power 56.84
❯ caput DEMO:R1:Enabled Off
Old : DEMO:R1:Enabled On
New : DEMO:R1:Enabled Off
❯ caget DEMO:Power
DEMO:Power 0
In the simulator terminal the progress of the ramp can be seen as it happens.
INFO:fastcs.demo.simulation.device:Started ramp 0
INFO:fastcs.demo.simulation.device:Target Temperatures: 10.000, 0.000, 0.000, 0.000
INFO:fastcs.demo.simulation.device:Actual Temperatures: 9.572, 0.000, 0.000, 0.000
INFO:fastcs.demo.simulation.device:Target Temperatures: 10.200, 0.000, 0.000, 0.000
INFO:fastcs.demo.simulation.device:Actual Temperatures: 9.952, 0.000, 0.000, 0.000
INFO:fastcs.demo.simulation.device:Target Temperatures: 10.400, 0.000, 0.000, 0.000
...
INFO:fastcs.demo.simulation.device:Stopped ramp 0
The target and actual temperatures visible in the simulator terminal are also exposed in
the API with the T01? and A01? commands.
FastCS Methods#
The applied voltage for each ramp is also available with the V? command, but the value
is an array with each element corresponding to a ramp. Here it is simplest to fetch
the array manually in the parent controller and pass each value to the corresponding
ramp controller. This can be done with a scan method. Scan methods are called at a
defined rate, similar to the update method of an AttributeIO.
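Splitting one array response across several ramps can be sketched in isolation. The example assumes the V? response is a JSON array of numbers terminated by "\r\n", which is consistent with the json.loads call used in the tutorial code; the sample values and the parse_voltages helper are invented for illustration.

```python
import json


def parse_voltages(response: str) -> list[float]:
    """Decode a JSON array response into one float per ramp."""
    return [float(value) for value in json.loads(response.strip("\r\n"))]


# Hypothetical response for a device with four ramps.
response = "[0.0, 1.5, 0.0, 0.0]\r\n"
voltages = parse_voltages(response)

# Element 0 belongs to ramp 1, element 1 to ramp 2, and so on.
per_ramp = {f"R{index}": value for index, value in enumerate(voltages, start=1)}
print(per_ramp)
```

One query serves every ramp this way, instead of each ramp controller polling the same array separately.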
Add an AttrR for the voltage to the TemperatureRampController, but do not pass it an
IO ref. Then add a method to the TemperatureController with a @scan decorator that
gets the array of voltages and sets each ramp controller with its value. Also add
AttrRs for the target and actual temperature for each ramp as described above.
Code 12
import enum
import json
from dataclasses import KW_ONLY, dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Enum, Float, Int, String
from fastcs.launch import FastCS
from fastcs.methods import scan
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport

NumberT = TypeVar("NumberT", int, float)


@dataclass
class TemperatureControllerAttributeIORef(AttributeIORef):
    name: str
    _: KW_ONLY
    update_period: float | None = 0.2


class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    def __init__(self, connection: IPConnection, suffix: str = ""):
        super().__init__()

        self._connection = connection
        self._suffix = suffix

    async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
        query = f"{attr.io_ref.name}{self._suffix}?"
        response = await self._connection.send_query(f"{query}\r\n")
        value = response.strip("\r\n")

        await attr.update(attr.dtype(value))

    async def send(
        self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
    ) -> None:
        command = f"{attr.io_ref.name}{self._suffix}={attr.dtype(value)}"
        await self._connection.send_command(f"{command}\r\n")


class OnOffEnum(enum.StrEnum):
    Off = "0"
    On = "1"


class TemperatureRampController(Controller):
    start = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="S"))
    end = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="E"))
    enabled = AttrRW(Enum(OnOffEnum), io_ref=TemperatureControllerAttributeIORef("N"))
    target = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("T"))
    actual = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("A"))
    voltage = AttrR(Float())

    def __init__(self, index: int, connection: IPConnection) -> None:
        suffix = f"{index:02d}"

        super().__init__(
            f"Ramp{suffix}", ios=[TemperatureControllerAttributeIO(connection, suffix)]
        )


class TemperatureController(Controller):
    device_id = AttrR(String(), io_ref=TemperatureControllerAttributeIORef("ID"))
    power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("P"))
    ramp_rate = AttrRW(Float(), io_ref=TemperatureControllerAttributeIORef("R"))

    def __init__(self, ramp_count: int, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()

        super().__init__(ios=[TemperatureControllerAttributeIO(self._connection)])

        self._ramp_controllers: list[TemperatureRampController] = []
        for index in range(1, ramp_count + 1):
            controller = TemperatureRampController(index, self._connection)
            self._ramp_controllers.append(controller)
            self.add_sub_controller(f"R{index}", controller)

    async def connect(self):
        await self._connection.connect(self._ip_settings)

    @scan(0.1)
    async def update_voltages(self):
        voltages = json.loads(
            (await self._connection.send_query("V?\r\n")).strip("\r\n")
        )
        for index, controller in enumerate(self._ramp_controllers):
            await controller.voltage.update(float(voltages[index]))


gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(4, connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
Attributes are intended to be a simple API that covers most use cases. Where more
flexibility is needed, wrapped controller methods avoid adding complexity to the IO
just to handle a small subset of attributes. They are also useful for implementing
higher-level logic on top of the attributes that expose the API of a device directly.
For example, it would be useful to have a single button to stop all of the ramps at
the same time. This can be done with a command method. Command methods are similar to
scan methods, except that they create an API in the transport layer in the same way an
attribute does.
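One common way for a framework to discover decorated methods is for the decorator to tag the function so that the methods can be collected later. The sketch below shows that general pattern only. It is not how FastCS implements @command, and command, collect_commands and Ramps are hypothetical names written for this example.

```python
import asyncio


def command():
    """Hypothetical decorator factory that tags a method as a command."""

    def mark(func):
        func._is_command = True
        return func

    return mark


def collect_commands(controller) -> dict:
    """Map PascalCase names to the bound methods tagged as commands."""
    found = {}
    for name, member in vars(type(controller)).items():
        if getattr(member, "_is_command", False):
            pascal = "".join(part.capitalize() for part in name.split("_"))
            found[pascal] = getattr(controller, name)
    return found


class Ramps:
    def __init__(self) -> None:
        self.enabled = [True, True]

    @command()
    async def disable_all(self) -> None:
        self.enabled = [False] * len(self.enabled)

    async def helper(self) -> None:
        """Untagged, so it is not exposed."""


ramps = Ramps()
commands = collect_commands(ramps)
asyncio.run(commands["DisableAll"]())
print(list(commands), ramps.enabled)
```

Only the tagged method is collected, which mirrors the idea that a command method gains an entry in the transport layer while an ordinary method does not.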
Add a method with a @command decorator that sets enabled to Off in every ramp
controller.
Code 13
import asyncio
import enum
import json
from dataclasses import KW_ONLY, dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Enum, Float, Int, String
from fastcs.launch import FastCS
from fastcs.methods import command, scan
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport

NumberT = TypeVar("NumberT", int, float)


@dataclass
class TemperatureControllerAttributeIORef(AttributeIORef):
    name: str
    _: KW_ONLY
    update_period: float | None = 0.2


class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    def __init__(self, connection: IPConnection, suffix: str = ""):
        super().__init__()

        self._connection = connection
        self._suffix = suffix

    async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
        query = f"{attr.io_ref.name}{self._suffix}?"
        response = await self._connection.send_query(f"{query}\r\n")
        value = response.strip("\r\n")

        await attr.update(attr.dtype(value))

    async def send(
        self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
    ) -> None:
        command = f"{attr.io_ref.name}{self._suffix}={attr.dtype(value)}"
        await self._connection.send_command(f"{command}\r\n")


class OnOffEnum(enum.StrEnum):
    Off = "0"
    On = "1"


class TemperatureRampController(Controller):
    start = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="S"))
    end = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="E"))
    enabled = AttrRW(Enum(OnOffEnum), io_ref=TemperatureControllerAttributeIORef("N"))
    target = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("T"))
    actual = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("A"))
    voltage = AttrR(Float())

    def __init__(self, index: int, connection: IPConnection) -> None:
        suffix = f"{index:02d}"

        super().__init__(
            f"Ramp{suffix}", ios=[TemperatureControllerAttributeIO(connection, suffix)]
        )


class TemperatureController(Controller):
    device_id = AttrR(String(), io_ref=TemperatureControllerAttributeIORef("ID"))
    power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("P"))
    ramp_rate = AttrRW(Float(), io_ref=TemperatureControllerAttributeIORef("R"))

    def __init__(self, ramp_count: int, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()

        super().__init__(ios=[TemperatureControllerAttributeIO(self._connection)])

        self._ramp_controllers: list[TemperatureRampController] = []
        for index in range(1, ramp_count + 1):
            controller = TemperatureRampController(index, self._connection)
            self._ramp_controllers.append(controller)
            self.add_sub_controller(f"R{index}", controller)

    async def connect(self):
        await self._connection.connect(self._ip_settings)

    @scan(0.1)
    async def update_voltages(self):
        voltages = json.loads(
            (await self._connection.send_query("V?\r\n")).strip("\r\n")
        )
        for index, controller in enumerate(self._ramp_controllers):
            await controller.voltage.update(float(voltages[index]))

    @command()
    async def disable_all(self) -> None:
        for rc in self._ramp_controllers:
            await rc.enabled.put(OnOffEnum.Off, sync_setpoint=True)
            # TODO: The requests all get concatenated and the sim doesn't handle it
            await asyncio.sleep(0.1)


gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
fastcs = FastCS(TemperatureController(4, connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
The new DEMO:DisableAll PV can be set (the value doesn’t matter) to disable all of the
ramps.
❯ caget DEMO:R1:Enabled_RBV
DEMO:R1:Enabled_RBV On
❯ caput DEMO:DisableAll 1
Old : DEMO:DisableAll
New : DEMO:DisableAll
❯ caget DEMO:R1:Enabled_RBV
DEMO:R1:Enabled_RBV Off
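To make the effect of disable_all concrete, the sketch below reproduces only the string
formatting used by the send method of the IO (attribute name, two-digit ramp suffix, =,
value) and lists the commands that four ramps would receive. The helpers build_command
and disable_all_commands are hypothetical and not part of FastCS, and a plain Enum
stands in for the StrEnum used in the driver.

```python
import enum


class OnOff(enum.Enum):
    """Stand-in for the tutorial's OnOffEnum (same values as the driver)."""

    Off = "0"
    On = "1"


def build_command(name: str, index: int, value: str) -> str:
    """Format one set command the way the IO's send method does."""
    suffix = f"{index:02d}"  # ramp 1 -> "01", as in TemperatureRampController
    return f"{name}{suffix}={value}\r\n"


def disable_all_commands(ramp_count: int) -> list[str]:
    """Commands that disabling every ramp would send, in order."""
    return [
        build_command("N", index, OnOff.Off.value)
        for index in range(1, ramp_count + 1)
    ]


if __name__ == "__main__":
    for command in disable_all_commands(4):
        print(repr(command))
```

In the driver these commands are sent one at a time with a short sleep in between,
because the simulator does not handle requests that arrive concatenated.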
Logging#
FastCS has convenient logging support to provide status and metrics from the
application. To enable logging from the core framework call configure_logging with no
arguments (the default logging level is INFO). To log messages from a driver, either
import the singleton logger directly, or to provide more context to the message, call
bind_logger with a name (usually either the name of the module or the name of the
class).
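The idea of binding a name to a logger can be illustrated with the standard library
alone. This is only an analogy for what bind_logger provides, not a description of how
FastCS implements it. The helper bind_name and the message format are hypothetical.

```python
import io
import logging


def bind_name(name: str, stream: io.StringIO) -> logging.LoggerAdapter:
    """Return a logger that tags every message with `name` (hypothetical helper)."""
    logger = logging.getLogger(f"sketch.{name}")
    logger.setLevel(logging.INFO)  # messages below INFO are dropped
    logger.propagate = False  # keep the sketch's output out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s [%(bound)s]"))
    logger.handlers = [handler]
    # The adapter attaches the bound name to every record as extra context.
    return logging.LoggerAdapter(logger, {"bound": name})


if __name__ == "__main__":
    out = io.StringIO()
    log = bind_name("TemperatureControllerAttributeIO", out)
    log.info("Sending attribute value")
    log.debug("This message is below INFO and is not written")
    print(out.getvalue(), end="")
```

The bound name plays the same role as the bracketed name in the FastCS log output: it
tells the reader which module or class a message came from.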
Create a module-level logger to log status of the application start up. Create a class
logger for TemperatureControllerAttributeIO to log the commands it sends.
Code 14
import asyncio
import enum
import json
from dataclasses import KW_ONLY, dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Enum, Float, Int, String
from fastcs.launch import FastCS
from fastcs.logging import bind_logger, configure_logging
from fastcs.methods import command, scan
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport

logger = bind_logger(__name__)

NumberT = TypeVar("NumberT", int, float)


@dataclass
class TemperatureControllerAttributeIORef(AttributeIORef):
    name: str
    _: KW_ONLY
    update_period: float | None = 0.2


class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    def __init__(self, connection: IPConnection, suffix: str = ""):
        super().__init__()
        self.logger = bind_logger(__class__.__name__)
        self._connection = connection
        self._suffix = suffix

    async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
        query = f"{attr.io_ref.name}{self._suffix}?"
        response = await self._connection.send_query(f"{query}\r\n")
        value = response.strip("\r\n")
        await attr.update(attr.dtype(value))

    async def send(
        self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
    ) -> None:
        command = f"{attr.io_ref.name}{self._suffix}={attr.dtype(value)}"
        self.logger.info("Sending attribute value", command=command, attribute=attr)
        await self._connection.send_command(f"{command}\r\n")


class OnOffEnum(enum.StrEnum):
    Off = "0"
    On = "1"


class TemperatureRampController(Controller):
    start = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="S"))
    end = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="E"))
    enabled = AttrRW(Enum(OnOffEnum), io_ref=TemperatureControllerAttributeIORef("N"))
    target = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("T"))
    actual = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("A"))
    voltage = AttrR(Float())

    def __init__(self, index: int, connection: IPConnection) -> None:
        suffix = f"{index:02d}"
        super().__init__(
            f"Ramp{suffix}", ios=[TemperatureControllerAttributeIO(connection, suffix)]
        )


class TemperatureController(Controller):
    device_id = AttrR(String(), io_ref=TemperatureControllerAttributeIORef("ID"))
    power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("P"))
    ramp_rate = AttrRW(Float(), io_ref=TemperatureControllerAttributeIORef("R"))

    def __init__(self, ramp_count: int, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()
        super().__init__(ios=[TemperatureControllerAttributeIO(self._connection)])

        self._ramp_controllers: list[TemperatureRampController] = []
        for index in range(1, ramp_count + 1):
            controller = TemperatureRampController(index, self._connection)
            self._ramp_controllers.append(controller)
            self.add_sub_controller(f"R{index}", controller)

    async def connect(self):
        await self._connection.connect(self._ip_settings)

    @scan(0.1)
    async def update_voltages(self):
        voltages = json.loads(
            (await self._connection.send_query("V?\r\n")).strip("\r\n")
        )
        for index, controller in enumerate(self._ramp_controllers):
            await controller.voltage.update(float(voltages[index]))

    @command()
    async def disable_all(self) -> None:
        self.log_event("Disabling all ramps")
        for rc in self._ramp_controllers:
            await rc.enabled.put(OnOffEnum.Off, sync_setpoint=True)
            # TODO: The requests all get concatenated and the sim doesn't handle it
            await asyncio.sleep(0.1)


configure_logging()

gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
logger.info("Configuring connection settings", connection_settings=connection_settings)
fastcs = FastCS(TemperatureController(4, connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
Try setting a PV and check the console for the log message it prints.
[2025-11-18 11:26:41.065+0000 I] Sending attribute value [TemperatureControllerAttributeIO] command=E01=70, attribute=AttrRW(path=R1.end, datatype=Int, io_ref=TemperatureControllerAttributeIORef(update_period=0.2, name='E'))
A similar log message could be added for the update method of the IO, but this would be
very verbose. For this use case FastCS provides the Tracer class, which is inherited
by AttributeIO, among other core FastCS classes. This enables logging of TRACE level
messages, which are disabled by default but can be enabled at runtime.
Modify the update method of the IO to log a message showing the query that was sent and
the response from the device. Update the configure_logging call to pass
LogLevel.TRACE as the log level, so that when tracing is enabled the messages are
visible.
Code 15
import asyncio
import enum
import json
from dataclasses import KW_ONLY, dataclass
from pathlib import Path
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Enum, Float, Int, String
from fastcs.launch import FastCS
from fastcs.logging import LogLevel, bind_logger, configure_logging
from fastcs.methods import command, scan
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca import EpicsCATransport

logger = bind_logger(__name__)

NumberT = TypeVar("NumberT", int, float)


@dataclass
class TemperatureControllerAttributeIORef(AttributeIORef):
    name: str
    _: KW_ONLY
    update_period: float | None = 0.2


class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    def __init__(self, connection: IPConnection, suffix: str = ""):
        super().__init__()
        self.logger = bind_logger(__class__.__name__)
        self._connection = connection
        self._suffix = suffix

    async def update(self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]):
        query = f"{attr.io_ref.name}{self._suffix}?"
        response = await self._connection.send_query(f"{query}\r\n")
        value = response.strip("\r\n")
        self.log_event("Query for attribute", query=query, response=value, topic=attr)
        await attr.update(attr.dtype(value))

    async def send(
        self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
    ) -> None:
        command = f"{attr.io_ref.name}{self._suffix}={attr.dtype(value)}"
        self.logger.info("Sending attribute value", command=command, attribute=attr)
        await self._connection.send_command(f"{command}\r\n")


class OnOffEnum(enum.StrEnum):
    Off = "0"
    On = "1"


class TemperatureRampController(Controller):
    start = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="S"))
    end = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="E"))
    enabled = AttrRW(Enum(OnOffEnum), io_ref=TemperatureControllerAttributeIORef("N"))
    target = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("T"))
    actual = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("A"))
    voltage = AttrR(Float())

    def __init__(self, index: int, connection: IPConnection) -> None:
        suffix = f"{index:02d}"
        super().__init__(
            f"Ramp{suffix}", ios=[TemperatureControllerAttributeIO(connection, suffix)]
        )


class TemperatureController(Controller):
    device_id = AttrR(String(), io_ref=TemperatureControllerAttributeIORef("ID"))
    power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef("P"))
    ramp_rate = AttrRW(Float(), io_ref=TemperatureControllerAttributeIORef("R"))

    def __init__(self, ramp_count: int, settings: IPConnectionSettings):
        self._ip_settings = settings
        self._connection = IPConnection()
        super().__init__(ios=[TemperatureControllerAttributeIO(self._connection)])

        self._ramp_controllers: list[TemperatureRampController] = []
        for index in range(1, ramp_count + 1):
            controller = TemperatureRampController(index, self._connection)
            self._ramp_controllers.append(controller)
            self.add_sub_controller(f"R{index}", controller)

    async def connect(self):
        await self._connection.connect(self._ip_settings)

    @scan(0.1)
    async def update_voltages(self):
        voltages = json.loads(
            (await self._connection.send_query("V?\r\n")).strip("\r\n")
        )
        for index, controller in enumerate(self._ramp_controllers):
            await controller.voltage.update(float(voltages[index]))

    @command()
    async def disable_all(self) -> None:
        self.log_event("Disabling all ramps")
        for rc in self._ramp_controllers:
            await rc.enabled.put(OnOffEnum.Off, sync_setpoint=True)
            # TODO: The requests all get concatenated and the sim doesn't handle it
            await asyncio.sleep(0.1)


configure_logging(LogLevel.TRACE)

gui_options = EpicsGUIOptions(
    output_path=Path(".") / "demo.bob", title="Demo Temperature Controller"
)
epics_ca = EpicsCATransport(gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix="DEMO"))
connection_settings = IPConnectionSettings("localhost", 25565)
logger.info("Configuring connection settings", connection_settings=connection_settings)
fastcs = FastCS(TemperatureController(4, connection_settings), [epics_ca])

if __name__ == "__main__":
    fastcs.run()
Enable tracing on the power attribute by calling enable_tracing and then enable a
ramp so that the value updates. Check the console to see the messages. Call
disable_tracing to disable the log messages for power.
In [1]: controller.power.enable_tracing()
[2025-11-18 11:11:12.060+0000 T] Query for attribute [TemperatureControllerAttributeIO] query=P?, response=0.0
[2025-11-18 11:11:12.060+0000 T] Attribute set [AttrR] attribute=AttrR(path=power, datatype=Float, io_ref=TemperatureControllerAttributeIORef(update_period=0.2, name='P')), value=0.0
[2025-11-18 11:11:12.060+0000 T] PV set from attribute [fastcs.transports.epics.ca.ioc] pv=DEMO:Power, value=0.0
[2025-11-18 11:11:12.194+0000 I] PV put: DEMO:R1:Enabled = 1 [fastcs.transports.epics.ca.ioc] pv=DEMO:R1:Enabled, value=1
[2025-11-18 11:11:12.195+0000 I] Sending attribute value [TemperatureControllerAttributeIO] command=N01=1, attribute=AttrRW(path=R1.enabled, datatype=Enum, io_ref=TemperatureControllerAttributeIORef(update_period=0.2, name='N'))
[2025-11-18 11:11:12.261+0000 T] Update attribute [AttrR]
[2025-11-18 11:11:12.262+0000 T] Query for attribute [TemperatureControllerAttributeIO] query=P?, response=29.040181873093132
[2025-11-18 11:11:12.262+0000 T] Attribute set [AttrR] attribute=AttrR(path=power, datatype=Float, io_ref=TemperatureControllerAttributeIORef(update_period=0.2, name='P')), value=29.040181873093132
[2025-11-18 11:11:12.262+0000 T] PV set from attribute [fastcs.transports.epics.ca.ioc] pv=DEMO:Power, value=29.04
[2025-11-18 11:11:12.463+0000 T] Update attribute [AttrR]
[2025-11-18 11:11:12.464+0000 T] Query for attribute [TemperatureControllerAttributeIO] query=P?, response=30.452524641833854
[2025-11-18 11:11:12.464+0000 T] Attribute set [AttrR] attribute=AttrR(path=power, datatype=Float, io_ref=TemperatureControllerAttributeIORef(update_period=0.2, name='P')), value=30.452524641833854
[2025-11-18 11:11:12.465+0000 T] PV set from attribute [fastcs.transports.epics.ca.ioc] pv=DEMO:Power, value=30.45
In [2]: controller.power.disable_tracing()
These log messages include output from other trace loggers that log messages with
power as the topic, so they appear automatically. Together, the messages show changes
to the attribute throughout the stack: the query to the device and its response, the
value the attribute is set to, and the value that the PV in the EPICS CA transport is
set to.
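The way a topic ties messages from different classes together can be sketched without
FastCS. The SketchTracer class below is a simplified stand-in, not the real Tracer. It
assumes that a trace message is emitted when tracing is enabled either on the object
that logs it or on the object passed as the topic. The class name, the sink list and
the io and power instances are all hypothetical.

```python
from __future__ import annotations


class SketchTracer:
    """Simplified stand-in for a tracer with a runtime on/off switch."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._tracing = False  # trace messages are off by default

    def enable_tracing(self) -> None:
        self._tracing = True

    def disable_tracing(self) -> None:
        self._tracing = False

    def log_event(
        self, message: str, sink: list[str], topic: SketchTracer | None = None
    ) -> None:
        """Record the message if this object or its topic is being traced."""
        if self._tracing or (topic is not None and topic._tracing):
            sink.append(f"{message} [{self.name}]")


if __name__ == "__main__":
    messages: list[str] = []
    io = SketchTracer("IO")
    power = SketchTracer("power")

    io.log_event("Query for attribute", messages, topic=power)  # dropped
    power.enable_tracing()
    io.log_event("Query for attribute", messages, topic=power)  # recorded
    power.disable_tracing()
    io.log_event("Query for attribute", messages, topic=power)  # dropped again
    print(messages)
```

Under this assumption, enabling tracing on one attribute is enough to surface messages
from every object that handles it, without enabling tracing on those objects.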
Note
The Tracer can also be used as a module-level instance for use in free functions.
from fastcs.tracer import Tracer

tracer = Tracer(__name__)


def handle_attribute(attr):
    tracer.log_event("Handling attribute", topic=attr)
These messages can then be enabled by calling enable_tracing on the module-level
Tracer, or more likely on a specific attribute.
Summary#
This demonstrates some of the simple use cases for a statically defined FastCS driver. It is also possible to create a driver dynamically by introspecting a device during startup. See the next tutorial for how to do this.