import logging
from ctypes import c_short, c_ubyte, c_ushort
from typing import Union
from tickit.devices.cryostream.states import AlarmCodes, PhaseIds, RunModes
from tickit.devices.cryostream.status import ExtendedStatus, Status
LOGGER = logging.getLogger(__name__)
[docs]class CryostreamBase:
"""A base class for cryostream device logic."""
min_temp: int = 8000 # cK
max_temp: int = 40000 # ck
min_rate: int = 1
max_rate: int = 360
min_plat_duration: int = 1
max_plat_duration: int = 1440
default_temp_shutdown: int = 30000
default_ramp_rate: int = 360
random_value: int = 20 # for status packet unknown values
def __init__(self) -> None:
"""A CryostreamBase constructor which assigns initial values."""
self.run_mode: int = 0
self.phase_id: int = 0
self.alarm_code: int = 0
self.turbo_mode: int = 0
self.gas_temp: int = 30000
self.ramp_rate: int = 0
self.gas_flow: int = 0
self._target_temp: int = 0
self.time_at_last_update: float = 0.0
[docs] async def restart(self) -> None:
"""Stop Cryostream and re-initialise system back to Ready."""
start_target_temp = 10000
await self.stop()
self.run_mode = RunModes.STARTUP.value
await self.cool(start_target_temp)
if self.gas_temp == start_target_temp:
self.run_mode = RunModes.STARTUPOK.value
else:
self.run_mode = RunModes.STARTUPFAIL.value
[docs] async def ramp(self, ramp_rate: int, target_temp: int) -> None:
"""Change gas temperature to a set value at a controlled rate.
Args:
ramp_rate (int): The rate at which the temperature should change.
target_temp (int): The target temperature.
"""
if ramp_rate < self.min_rate or ramp_rate > self.max_rate:
self.alarm_code = AlarmCodes.TEMP_CONTROL_ERROR
else:
self.ramp_rate = ramp_rate
if target_temp < self.min_temp or target_temp > self.max_temp:
self.alarm_code = AlarmCodes.TEMP_CONTROL_ERROR
else:
self._target_temp = target_temp
# Todo
"""
difference_temp = abs(self.gas_temp - self.target_temp)
if 10 < difference_temp < 25:
self.alarm_code = AlarmCodes.TEMP_WARNING
if difference_temp > 25:
self.alarm_code = AlarmCodes.TEMP_CONTROL_ERROR
"""
self.run_mode = RunModes.RUN.value
if self.phase_id != PhaseIds.COOL.value:
self.phase_id = PhaseIds.RAMP.value
if (
self.phase_id == PhaseIds.COOL.value
or (self.turbo_mode == 1 and self._target_temp < 31000)
or self._target_temp < 90000
):
self.gas_flow = 10
else:
self.gas_flow = 5
[docs] async def plat(self, duration: int) -> None:
"""Maintain the current temperature for a set amount of time.
Args:
duration (int): The duration for which the temperature should be held.
"""
if duration < self.min_plat_duration or duration > self.max_plat_duration:
raise Exception("Duration set to less than minimum plat duration time")
if duration > self.max_plat_duration:
raise Exception("Duration set to more than maximum plat duration time")
self.plat_duration = duration
self._target_temp = self.gas_temp
self.run_mode = RunModes.RUN.value
self.phase_id = PhaseIds.PLAT.value
[docs] async def hold(self) -> None:
"""Maintain the current temperature indefinitely."""
self.run_mode = RunModes.RUN.value
self.phase_id = PhaseIds.HOLD.value
self._target_temp = self.gas_temp
[docs] async def cool(self, target_temp: int) -> None:
"""Make gas temperature decrease to a set value as quickly as possible.
Args:
target_temp (int): The target temperature.
"""
self.run_mode = RunModes.RUN
self.phase_id = PhaseIds.COOL.value
self._target_temp = target_temp
await self.ramp(self.default_ramp_rate, self._target_temp)
[docs] async def end(self, ramp_rate: int) -> None:
"""Bring the gas temperature to 300 K at ramp rate, then halt and stop.
Args:
ramp_rate (int): The rate at which the temperature should change.
"""
if self.run_mode not in (5, 6):
self.phase_id = PhaseIds.END.value
await self.ramp(ramp_rate, self.default_temp_shutdown)
if self.gas_temp == self.default_temp_shutdown:
self.gas_flow = 0
self.run_mode = RunModes.SHUTDOWNOK.value
else:
self.run_mode = RunModes.SHUTDOWNFAIL.value
[docs] async def purge(self) -> None:
"""Bring the gas temperature to 300 K at max rate, then halt and stop."""
if self.run_mode not in (5, 6):
self.phase_id = PhaseIds.PURGE.value
self.gas_flow = 0
await self.ramp(self.default_ramp_rate, self.default_temp_shutdown)
self.phase_id = PhaseIds.PURGE.value
if self.gas_temp == self.default_temp_shutdown:
self.run_mode = RunModes.SHUTDOWNOK.value
else:
self.run_mode = RunModes.SHUTDOWNFAIL.value
[docs] async def pause(self) -> None:
"""Interrupt and maintain the current gas temperature until resumed."""
# Todo interrupt other commands from running
...
[docs] async def resume(self) -> None:
"""Resume the previous command."""
if self.phase_id == PhaseIds.HOLD.value:
LOGGER.warn("Cannot return to previous command") # Todo keep commands
[docs] async def stop(self) -> None:
"""Gas flow is halted and the system is stopped at the current temperature."""
if self.run_mode not in (5, 6):
self.gas_flow = 0
self._target_temp = self.gas_temp
self.run_mode = RunModes.SHUTDOWNOK.value
[docs] async def turbo(self, turbo_on: int) -> None:
"""Set turbo mode to use maximum achievable flow.
The system will use 10 l/min gas flow except above 310 K
Above 310 K available heater power limits the maximum achievable
flow to 5 l/min.
Args:
turbo_on (int): The desired turbo mode, where 0 denotes off and 1 denotes
on.
"""
if turbo_on == 1:
self.turbo_mode = 1
if self.gas_temp < 310:
self.gas_flow = 10
else:
self.gas_flow = 5
else:
self.turbo_mode = 0
[docs] def update_temperature(self, time: float) -> int:
"""Update the Cryostream gas temperature according to mode and time.
Args:
time (float): The current simulation time (in nanoseconds).
Returns:
int: The current gas temperature.
"""
delta_time = (time - self.time_at_last_update) / 1e9
difference = self._target_temp - self.gas_temp
if (self.gas_temp - self._target_temp) < 10:
self.phase_id = PhaseIds.HOLD.value
delta_temp = self.gas_flow * delta_time
if difference < 0:
delta_temp = -delta_temp
self.gas_temp = int(self.gas_temp + delta_temp)
self.time_at_last_update = time
return self.gas_temp
[docs] async def get_status(self, status_format: int) -> Union[Status, ExtendedStatus]:
"""Get a status packet corresponding to the specified format.
Args:
status_format (int): The status packet format, where 0 denotes a standard
status packet and 1 denotes an extended status packet.
Returns:
Union[
tickit.devices.cryostream.status.Status,
tickit.devices.cryostream.status.ExtendedStatus]: The status packet.
"""
if status_format == 0:
return self.status
if status_format == 1:
return self.extended_status
else:
raise Exception("Key error")