Source code for fastcs.demo.controllers

import asyncio
import enum
import json
from dataclasses import KW_ONLY, dataclass
from typing import TypeVar

from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controllers import Controller
from fastcs.datatypes import Enum, Float, Int
from fastcs.methods import command, scan

NumberT = TypeVar("NumberT", int, float)


class OnOffEnum(enum.StrEnum):
    Off = "0"
    On = "1"


[docs] @dataclass class TemperatureControllerSettings: num_ramp_controllers: int ip_settings: IPConnectionSettings
[docs] @dataclass class TemperatureControllerAttributeIORef(AttributeIORef): name: str _: KW_ONLY update_period: float | None = 0.2
class TemperatureControllerAttributeIO( AttributeIO[NumberT, TemperatureControllerAttributeIORef] ): def __init__(self, connection: IPConnection, suffix: str): super().__init__() self._connection = connection self.suffix = suffix async def send( self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT ) -> None: command = f"{attr.io_ref.name}{self.suffix}={attr.dtype(value)}" await self._connection.send_command(f"{command}\r\n") self.log_event("Send command for attribute", topic=attr, command=command) async def update( self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef] ) -> None: query = f"{attr.io_ref.name}{self.suffix}?" response = await self._connection.send_query(f"{query}\r\n") response = response.strip("\r\n") self.log_event( "Query for attribute", topic=attr, query=query, response=response, ) await attr.update(attr.dtype(response)) class TemperatureController(Controller): ramp_rate = AttrRW(Float(), io_ref=TemperatureControllerAttributeIORef(name="R")) power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef(name="P")) def __init__(self, settings: TemperatureControllerSettings) -> None: self.connection = IPConnection() self.suffix = "" super().__init__( ios=[TemperatureControllerAttributeIO(self.connection, self.suffix)] ) self._settings = settings self._ramp_controllers: list[TemperatureRampController] = [] for index in range(1, settings.num_ramp_controllers + 1): controller = TemperatureRampController(index, self.connection) self._ramp_controllers.append(controller) self.add_sub_controller(f"R{index}", controller) @command() async def cancel_all(self) -> None: for rc in self._ramp_controllers: await rc.enabled.put(OnOffEnum.Off, sync_setpoint=True) # TODO: The requests all get concatenated and the sim doesn't handle it await asyncio.sleep(0.1) async def connect(self) -> None: await self.connection.connect(self._settings.ip_settings) async def close(self) -> None: await self.connection.close() @scan(0.1) async def update_voltages(self): query = "V?" voltages = json.loads( (await self.connection.send_query(f"{query}\r\n")).strip("\r\n") ) for index, controller in enumerate(self._ramp_controllers): self.log_event( "Update voltages", topic=controller.voltage, query=query, response=voltages, ) await controller.voltage.update(float(voltages[index])) class TemperatureRampController(Controller): start = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="S")) end = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="E")) enabled = AttrRW( Enum(OnOffEnum), io_ref=TemperatureControllerAttributeIORef(name="N") ) target = AttrR(Float(prec=3), io_ref=TemperatureControllerAttributeIORef(name="T")) actual = AttrR(Float(prec=3), io_ref=TemperatureControllerAttributeIORef(name="A")) voltage = AttrR(Float(prec=3)) def __init__(self, index: int, conn: IPConnection) -> None: suffix = f"{index:02d}" super().__init__( f"Ramp{suffix}", ios=[TemperatureControllerAttributeIO(conn, suffix)] ) self.connection = conn