Source code for dodal.devices.thawer
from asyncio import Task, create_task, sleep
from enum import Enum
from bluesky.protocols import Stoppable
from ophyd_async.core import AsyncStatus, Device, SignalRW, StandardReadable
from ophyd_async.epics.signal import epics_signal_rw
class ThawingException(Exception):
pass
[docs]
class ThawerStates(str, Enum):
OFF = "Off"
ON = "On"
[docs]
class ThawingTimer(Device):
def __init__(self, control_signal: SignalRW[ThawerStates]) -> None:
self._control_signal = control_signal
self._thawing_task: Task | None = None
super().__init__("thaw_for_time_s")
@AsyncStatus.wrap
async def set(self, time_to_thaw_for: float):
await self._control_signal.set(ThawerStates.ON)
if self._thawing_task and not self._thawing_task.done():
raise ThawingException("Thawing task already in progress")
self._thawing_task = create_task(sleep(time_to_thaw_for))
try:
await self._thawing_task
finally:
await self._control_signal.set(ThawerStates.OFF)
async def stop(self):
if self._thawing_task:
self._thawing_task.cancel()
[docs]
class Thawer(StandardReadable, Stoppable):
def __init__(self, prefix: str, name: str = "") -> None:
self.control = epics_signal_rw(ThawerStates, prefix + ":CTRL")
self.thaw_for_time_s = ThawingTimer(self.control)
super().__init__(name)
async def stop(self):
await self.thaw_for_time_s.stop()
await self.control.set(ThawerStates.OFF)