# Source code for fastcs.demo.controllers

from __future__ import annotations

import asyncio
import enum
import json
from dataclasses import dataclass
from typing import TypeVar

from fastcs.attribute_io import AttributeIO
from fastcs.attribute_io_ref import AttributeIORef
from fastcs.attributes import AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controller import Controller
from fastcs.datatypes import Enum, Float, Int
from fastcs.wrappers import command, scan

# Type variable constrained to ``int`` or ``float``; used below as the value
# type parameter of ``TemperatureControllerAttributeIO``.
NumberT = TypeVar("NumberT", int, float)


class OnOffEnum(enum.StrEnum):
    Off = "0"
    On = "1"


@dataclass
class TemperatureControllerSettings:
    """Settings consumed by ``TemperatureController``."""

    # Number of ``TemperatureRampController`` sub controllers to create
    # (they are indexed from 1 up to and including this value).
    num_ramp_controllers: int
    # Connection settings handed to ``IPConnection.connect`` when the
    # controller connects.
    ip_settings: IPConnectionSettings
@dataclass(kw_only=True)
class TemperatureControllerAttributeIORef(AttributeIORef):
    """IO reference describing how an attribute maps onto the device protocol.

    All fields are keyword-only (``kw_only=True``).
    """

    # Protocol prefix for the attribute. ``TemperatureControllerAttributeIO``
    # combines it with its suffix to build ``<name><suffix>=<value>`` commands
    # and ``<name><suffix>?`` queries.
    name: str
    # NOTE(review): presumably the polling period (seconds) used by the
    # framework when updating the attribute - confirm against AttributeIORef.
    update_period: float | None = 0.2

    def __post_init__(self):
        # Call __init__ of non-dataclass parent
        super().__init__()
class TemperatureControllerAttributeIO(
    AttributeIO[NumberT, TemperatureControllerAttributeIORef]
):
    """Attribute IO that talks to the device over a shared ``IPConnection``.

    Messages are built from the attribute's ``io_ref.name`` followed by this
    IO's ``suffix`` and are terminated with ``\\r\\n``.
    """

    def __init__(self, connection: IPConnection, suffix: str):
        """Store the connection and the suffix appended to every attribute name.

        Args:
            connection: Connection used to send commands and queries.
            suffix: Text appended to ``io_ref.name`` in every message (the
                empty string for the top-level controller, a zero-padded
                index for ramp controllers).
        """
        # Initialise the parent so inherited helpers such as ``log_event``
        # have their state set up before use.
        super().__init__()

        self._connection = connection
        self.suffix = suffix

    async def send(
        self, attr: AttrW[NumberT, TemperatureControllerAttributeIORef], value: NumberT
    ) -> None:
        """Send ``<name><suffix>=<value>`` to the device for ``attr``.

        Args:
            attr: Attribute being written; its ``io_ref.name`` selects the
                protocol prefix and its ``dtype`` is applied to ``value``.
            value: New value to write.
        """
        # Local is called ``message`` so it does not shadow the imported
        # ``command`` decorator; the log keyword is kept as ``command``.
        message = f"{attr.io_ref.name}{self.suffix}={attr.dtype(value)}"

        await self._connection.send_command(f"{message}\r\n")
        self.log_event("Send command for attribute", topic=attr, command=message)

    async def update(
        self, attr: AttrR[NumberT, TemperatureControllerAttributeIORef]
    ) -> None:
        """Query ``<name><suffix>?`` and set ``attr`` from the response.

        Args:
            attr: Attribute to refresh; the stripped response is converted
                with ``attr.dtype`` before being set.
        """
        # ``query`` already ends with "?"; only the line terminator is added
        # when sending so the wire message matches the logged query.
        query = f"{attr.io_ref.name}{self.suffix}?"
        response = await self._connection.send_query(f"{query}\r\n")
        response = response.strip("\r\n")

        self.log_event(
            "Query for attribute",
            topic=attr,
            query=query,
            response=response,
        )

        await attr.set(attr.dtype(response))


class TemperatureController(Controller):
    """Top-level demo controller owning the connection and ramp sub controllers.

    Exposes ``ramp_rate`` (prefix ``R``) and ``power`` (prefix ``P``) and
    registers one ``TemperatureRampController`` per configured ramp.
    """

    ramp_rate = AttrRW(Float(), io_ref=TemperatureControllerAttributeIORef(name="R"))
    power = AttrR(Float(), io_ref=TemperatureControllerAttributeIORef(name="P"))

    def __init__(self, settings: TemperatureControllerSettings) -> None:
        """Create the connection, the IO and the ramp sub controllers.

        Args:
            settings: Provides the number of ramp controllers to create and
                the IP settings used later by ``connect``.
        """
        self.connection = IPConnection()
        # Top-level attributes carry no suffix in their messages.
        self.suffix = ""
        super().__init__(
            ios=[TemperatureControllerAttributeIO(self.connection, self.suffix)]
        )

        self._settings = settings

        self._ramp_controllers: list[TemperatureRampController] = []
        # Ramp controllers are numbered from 1 and registered as "R1", "R2", ...
        for index in range(1, settings.num_ramp_controllers + 1):
            controller = TemperatureRampController(index, self.connection)
            self._ramp_controllers.append(controller)
            self.register_sub_controller(f"R{index}", controller)

    @command()
    async def cancel_all(self) -> None:
        """Set ``enabled`` to ``OnOffEnum.Off`` on every ramp controller."""
        for rc in self._ramp_controllers:
            await rc.enabled.process(OnOffEnum.Off)
            # TODO: The requests all get concatenated and the sim doesn't handle it
            await asyncio.sleep(0.1)

    async def connect(self) -> None:
        """Open the connection using the IP settings supplied at construction."""
        await self.connection.connect(self._settings.ip_settings)

    async def close(self) -> None:
        """Close the connection."""
        await self.connection.close()

    @scan(0.1)
    async def update_voltages(self):
        """Query ``V?`` and set each ramp controller's ``voltage``.

        The response is stripped of ``\\r\\n`` and parsed as JSON; element
        ``i`` of the parsed value is converted to ``float`` and set on the
        ``i``-th ramp controller (in creation order).
        """
        query = "V?"
        voltages = json.loads(
            (await self.connection.send_query(f"{query}\r\n")).strip("\r\n")
        )
        for index, controller in enumerate(self._ramp_controllers):
            self.log_event(
                "Update voltages",
                topic=controller.voltage,
                query=query,
                response=voltages,
            )
            await controller.voltage.set(float(voltages[index]))


class TemperatureRampController(Controller):
    """Sub controller for a single ramp, addressed by a two-digit suffix.

    ``start``, ``end``, ``enabled``, ``target`` and ``actual`` use the
    prefixes ``S``, ``E``, ``N``, ``T`` and ``A`` respectively. ``voltage``
    has no IO reference; it is set by
    ``TemperatureController.update_voltages``.
    """

    start = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="S"))
    end = AttrRW(Int(), io_ref=TemperatureControllerAttributeIORef(name="E"))
    enabled = AttrRW(
        Enum(OnOffEnum), io_ref=TemperatureControllerAttributeIORef(name="N")
    )
    target = AttrR(Float(prec=3), io_ref=TemperatureControllerAttributeIORef(name="T"))
    actual = AttrR(Float(prec=3), io_ref=TemperatureControllerAttributeIORef(name="A"))
    voltage = AttrR(Float(prec=3))

    def __init__(self, index: int, conn: IPConnection) -> None:
        """Create the ramp controller for ``index`` on the shared connection.

        Args:
            index: Ramp number; formatted as two digits (``01``, ``02``, ...)
                to form the message suffix.
            conn: Connection shared with the parent controller.
        """
        suffix = f"{index:02d}"
        super().__init__(
            f"Ramp{suffix}", ios=[TemperatureControllerAttributeIO(conn, suffix)]
        )
        self.connection = conn