Source code for fastcs.demo.controllers

from __future__ import annotations

import asyncio
import enum
import json
from dataclasses import dataclass
from typing import Any

from fastcs.attributes import AttrHandlerRW, AttrR, AttrRW, AttrW
from fastcs.connections import IPConnection, IPConnectionSettings
from fastcs.controller import BaseController, Controller, SubController
from fastcs.datatypes import Enum, Float, Int
from fastcs.wrappers import command, scan


class OnOffEnum(enum.StrEnum):
    Off = "0"
    On = "1"


[docs] @dataclass class TemperatureControllerSettings: num_ramp_controllers: int ip_settings: IPConnectionSettings
[docs] @dataclass class TemperatureControllerHandler(AttrHandlerRW): name: str update_period: float | None = 0.2 _controller: TemperatureController | TemperatureRampController | None = None async def initialise(self, controller: BaseController): assert isinstance(controller, TemperatureController | TemperatureRampController) self._controller = controller @property def controller(self) -> TemperatureController | TemperatureRampController: if self._controller is None: raise RuntimeError("Handler not initialised") return self._controller async def put(self, attr: AttrW, value: Any) -> None: await self.controller.connection.send_command( f"{self.name}{self.controller.suffix}={attr.dtype(value)}\r\n" ) async def update(self, attr: AttrR) -> None: response = await self.controller.connection.send_query( f"{self.name}{self.controller.suffix}?\r\n" ) response = response.strip("\r\n") await attr.set(attr.dtype(response))
class TemperatureController(Controller): ramp_rate = AttrRW(Float(), handler=TemperatureControllerHandler("R")) power = AttrR(Float(), handler=TemperatureControllerHandler("P")) def __init__(self, settings: TemperatureControllerSettings) -> None: super().__init__() self.suffix = "" self._settings = settings self.connection = IPConnection() self._ramp_controllers: list[TemperatureRampController] = [] for index in range(1, settings.num_ramp_controllers + 1): controller = TemperatureRampController(index, self.connection) self._ramp_controllers.append(controller) self.register_sub_controller(f"R{index}", controller) @command() async def cancel_all(self) -> None: for rc in self._ramp_controllers: await rc.enabled.process(OnOffEnum.Off) # TODO: The requests all get concatenated and the sim doesn't handle it await asyncio.sleep(0.1) async def connect(self) -> None: await self.connection.connect(self._settings.ip_settings) async def close(self) -> None: await self.connection.close() @scan(0.1) async def update_voltages(self): voltages = json.loads( (await self.connection.send_query("V?\r\n")).strip("\r\n") ) for index, controller in enumerate(self._ramp_controllers): await controller.voltage.set(float(voltages[index])) class TemperatureRampController(SubController): start = AttrRW(Int(), handler=TemperatureControllerHandler("S")) end = AttrRW(Int(), handler=TemperatureControllerHandler("E")) enabled = AttrRW(Enum(OnOffEnum), handler=TemperatureControllerHandler("N")) target = AttrR(Float(prec=3), handler=TemperatureControllerHandler("T")) actual = AttrR(Float(prec=3), handler=TemperatureControllerHandler("A")) voltage = AttrR(Float(prec=3)) def __init__(self, index: int, conn: IPConnection) -> None: self.suffix = f"{index:02d}" super().__init__(f"Ramp{self.suffix}") self.connection = conn