Source code for dodal.devices.linkam3
import asyncio
import time
from enum import Enum
from bluesky.protocols import Location
from ophyd_async.core import (
ConfigSignal,
HintedSignal,
StandardReadable,
WatchableAsyncStatus,
WatcherUpdate,
observe_value,
)
from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw
[docs]
class PumpControl(str, Enum):
    """String-valued enumeration of the two pump control modes.

    Each member's value is the same text as its name, and because the
    class also derives from ``str`` a member compares equal to that text.
    """

    Manual = "Manual"
    Auto = "Auto"
[docs]
class Linkam3(StandardReadable):
    """Device to represent a Linkam3 temperature controller.

    Attributes:
        tolerance (float): Deadband around the setpoint within which the
            position is assumed to have been reached (class default 0.5)
        settle_time (int): The delay between reaching the setpoint and the
            move being considered complete; handed to ``asyncio.sleep``
            (class default 0)

    Args:
        prefix (str): PV prefix for this device
        name (str): unique name for this device
    """

    tolerance: float = 0.5
    settle_time: int = 0

    def __init__(self, prefix: str, name: str):
        """Create the device's signals, register the readables, then name it."""

        def pv(suffix: str) -> str:
            # Every PV name is the device prefix followed by a fixed suffix.
            return prefix + suffix

        self.temp = epics_signal_r(float, pv("TEMP:"))
        self.dsc = epics_signal_r(float, pv("DSC:"))
        self.start_heat = epics_signal_rw(bool, pv("STARTHEAT:"))
        # The read/write signals below take two PV names each: the plain
        # suffix first, then the matching ":SET:" suffix.
        self.ramp_rate = epics_signal_rw(float, pv("RAMPRATE:"), pv("RAMPRATE:SET:"))
        self.ramp_time = epics_signal_r(float, pv("RAMPTIME:"))
        self.set_point = epics_signal_rw(float, pv("SETPOINT:"), pv("SETPOINT:SET:"))
        self.pump_control = epics_signal_r(PumpControl, pv("LNP_MODE:SET:"))
        self.speed = epics_signal_rw(float, pv("LNP_SPEED:"), pv("LNP_SPEED:SET:"))
        self.chamber_vac = epics_signal_r(float, pv("VAC_CHAMBER:"))
        self.sensor_vac = epics_signal_r(float, pv("VAC_DATA1:"))
        self.error = epics_signal_r(str, pv("CTRLLR:ERR:"))
        # status is a bitfield stored in a double?
        self.status = epics_signal_r(float, pv("STATUS:"))

        # temp is registered with the HintedSignal wrapper; ramp_rate, speed
        # and set_point are registered with the ConfigSignal wrapper.
        hinted = (self.temp,)
        configuration = (self.ramp_rate, self.speed, self.set_point)
        self.add_readables(hinted, wrapper=HintedSignal)
        self.add_readables(configuration, wrapper=ConfigSignal)

        # The base class is initialised last, once every signal is assigned.
        super().__init__(name=name)

    @WatchableAsyncStatus.wrap
    async def set(self, new_position: float, timeout: float | None = None):
        """Write a new setpoint and report progress until temp reaches it.

        The current setpoint is read first and used as the ``initial`` value
        of every update. After the new setpoint has been written, one
        ``WatcherUpdate`` is yielded for each value produced by
        ``observe_value(self.temp)``. Once a value lies strictly within
        ``tolerance`` of ``new_position`` the method sleeps for
        ``settle_time`` and then finishes.

        Args:
            new_position: Target value written to ``set_point``.
            timeout: Not referenced in this body.
                NOTE(review): presumably consumed by
                ``WatchableAsyncStatus.wrap`` - confirm against ophyd-async.

        Yields:
            WatcherUpdate carrying the device name, the latest temp value,
            the setpoint read before the write, the target and the elapsed
            time.
        """
        # time.monotonic won't go backwards in case of NTP corrections
        started_at = time.monotonic()
        previous_setpoint = await self.set_point.get_value()
        await self.set_point.set(new_position, wait=True)

        async for temperature in observe_value(self.temp):
            yield WatcherUpdate(
                name=self.name,
                current=temperature,
                initial=previous_setpoint,
                target=new_position,
                time_elapsed=time.monotonic() - started_at,
            )
            # Strict comparison: a reading exactly `tolerance` away from the
            # target does not count as arrived.
            within_deadband = abs(temperature - new_position) < self.tolerance
            if within_deadband:
                await asyncio.sleep(self.settle_time)
                break

    # TODO: Make use of values in Status.
    # https://github.com/DiamondLightSource/dodal/issues/338
    async def _is_nth_bit_set(self, n: int) -> bool:
        """Return True when bit ``n`` of the integer-converted status is set."""
        status_word = int(await self.status.get_value())
        mask = 1 << n
        return bool(status_word & mask)

    async def in_error(self) -> bool:
        """Report bit 0 of the status value (the error flag, going by the name)."""
        return await self._is_nth_bit_set(0)

    async def at_setpoint(self) -> bool:
        """Report bit 1 of the status value (at-setpoint flag, going by the name)."""
        return await self._is_nth_bit_set(1)

    async def heater_on(self) -> bool:
        """Report bit 2 of the status value (heater-on flag, going by the name)."""
        return await self._is_nth_bit_set(2)

    async def pump_on(self) -> bool:
        """Report bit 3 of the status value (pump-on flag, going by the name)."""
        return await self._is_nth_bit_set(3)

    async def pump_auto(self) -> bool:
        """Report bit 4 of the status value (pump-auto flag, going by the name)."""
        return await self._is_nth_bit_set(4)

    async def locate(self) -> Location:
        """Return the current temp value as readback and set_point as setpoint."""
        readback = await self.temp.get_value()
        setpoint = await self.set_point.get_value()
        return {"readback": readback, "setpoint": setpoint}