# Source code for dodal.devices.thawer

from asyncio import Task, create_task, sleep

from bluesky.protocols import Stoppable
from ophyd_async.core import (
    AsyncStatus,
    Device,
    Reference,
    SignalRW,
    StandardReadable,
    StrictEnum,
)
from ophyd_async.epics.core import epics_signal_rw


class ThawingException(Exception):
    """Error raised by the thawer devices in this module."""


class ThawerStates(StrictEnum):
    """Values that can be written to the thawer control signal."""

    # The string values are what is sent to / read from the control signal.
    OFF = "Off"
    ON = "On"


class ThawingTimer(Device, Stoppable):
    """Device that switches a thawer on for a requested number of seconds.

    Setting this device turns the referenced control signal ``ON``, waits for
    the requested time and then turns the signal ``OFF`` again. Only one
    thawing task may be in progress at a time.
    """

    def __init__(self, control_signal: SignalRW[ThawerStates]) -> None:
        """Create the timer.

        Args:
            control_signal: Signal used to switch the thawer on and off.
        """
        # Held through a Reference rather than as a plain attribute
        # (presumably so the signal is not adopted as a child of this
        # device -- confirm against the ophyd_async Reference docs).
        self._control_signal_ref = Reference(control_signal)
        # The currently running (or most recently run) sleep task, if any.
        self._thawing_task: Task | None = None
        super().__init__("thaw_for_time_s")

    @AsyncStatus.wrap
    async def set(self, time_to_thaw_for: float):
        """Turn the thawer on, wait ``time_to_thaw_for`` seconds, turn it off.

        Args:
            time_to_thaw_for: How long to keep the thawer on, in seconds.

        Raises:
            ThawingException: If a previous thawing task has not finished.
        """
        # Validate before touching the hardware: a rejected request must not
        # write to the control signal while another thaw owns it.
        if self._thawing_task is not None and not self._thawing_task.done():
            raise ThawingException("Thawing task already in progress")

        control_signal = self._control_signal_ref()
        await control_signal.set(ThawerStates.ON)
        self._thawing_task = create_task(sleep(time_to_thaw_for))
        try:
            await self._thawing_task
        finally:
            # Always switch off again, whether the sleep completed normally
            # or the task was cancelled (e.g. through stop()).
            await control_signal.set(ThawerStates.OFF)

    @AsyncStatus.wrap
    async def stop(self, *args, **kwargs):
        """Cancel the thawing task, if one has been started.

        The control signal is not written here; it is switched off by the
        ``finally`` clause in ``set`` once the cancelled task is awaited.
        """
        if self._thawing_task:
            self._thawing_task.cancel()


class Thawer(StandardReadable, Stoppable):
    """Thawer device exposing an on/off control and a timed-thaw helper."""

    def __init__(self, prefix: str, name: str = "") -> None:
        """Build the control signal and the timer that drives it.

        Args:
            prefix: PV prefix; the control PV is ``prefix + ":CTRL"``.
            name: Device name passed to the parent constructor.
        """
        control_pv = prefix + ":CTRL"
        self.control = epics_signal_rw(ThawerStates, control_pv)
        # The timer drives the same control signal created above.
        self.thaw_for_time_s = ThawingTimer(self.control)
        super().__init__(name)

    @AsyncStatus.wrap
    async def stop(self, *args, **kwargs):
        """Stop the thawing timer, then switch the thawer off."""
        timer = self.thaw_for_time_s
        await timer.stop()
        await self.control.set(ThawerStates.OFF)