Source code for dodal.devices.current_amplifiers.current_amplifier_detector
import asyncio
from bluesky.protocols import Preparable, Reading
from ophyd_async.core import (
AsyncStatus,
Reference,
StandardReadable,
StandardReadableFormat,
soft_signal_r_and_setter,
soft_signal_rw,
)
from dodal.devices.current_amplifiers.current_amplifier import (
CurrentAmp,
CurrentAmpCounter,
)
from dodal.log import LOGGER
[docs]
class CurrentAmpDet(StandardReadable, Preparable):
"""
CurrentAmpDet composed of a CurrentAmp and a CurrentAmpCounter. It provides
the option for automatically changing the CurrentAmp gain to within the optimal
range. It also converts the currentAmp/counter output back into the detector
current output in Amp.
Attributes:
current_amp (currentAmp): Current amplifier type device.
counter (CurrentAmpCounter): Counter that capture the current amplifier output.
current (SignalRW([float]): Soft signal to store the corrected current.
auto_mode (signalR([bool])): Soft signal to store the flag for auto gain.
name (str): Name of the device.
"""
def __init__(
self,
current_amp: CurrentAmp,
counter: CurrentAmpCounter,
name: str = "",
) -> None:
self.current_amp = Reference(current_amp)
self.counter = Reference(counter)
with self.add_children_as_readables():
self.current, self._set_current = soft_signal_r_and_setter(
float, initial_value=None, units="Amp"
)
with self.add_children_as_readables(StandardReadableFormat.CONFIG_SIGNAL):
self.auto_mode = soft_signal_rw(bool, initial_value=True)
super().__init__(name)
[docs]
async def read(self) -> dict[str, Reading]:
"""
Read is modified so that if auto_mode is true it will optimise gain before
taking the final reading
"""
if await self.auto_mode.get_value():
LOGGER.info(f"{self.name}-Attempting auto-gain")
status = self.auto_gain()
try:
await status
LOGGER.info(
f"{self.name} new gain = {await self.current_amp().get_gain()}."
)
except ValueError as ex:
LOGGER.warning(f"{self.name} gain went outside limits")
# Further details are provided to the user in the logged exception
LOGGER.exception(ex)
current = await self.get_corrected_current()
self._set_current(current)
return await super().read()
@AsyncStatus.wrap
async def auto_gain(self) -> None:
within_limits = False
while not within_limits:
reading = abs(await self.counter().get_voltage_per_sec())
upper_limit, lower_limit = await asyncio.gather(
self.current_amp().get_upperlimit(),
self.current_amp().get_lowerlimit(),
)
if reading > upper_limit:
await self.current_amp().decrease_gain()
elif reading < lower_limit:
await self.current_amp().increase_gain()
else:
within_limits = True
[docs]
async def get_corrected_current(self) -> float:
"""
Convert the output(count and gain) back into the read detector output in Amp.
"""
current_gain, voltage_per_sec = await asyncio.gather(
self.current_amp().get_gain(),
self.counter().get_voltage_per_sec(),
)
correction_factor = current_gain.value
corrected_current = voltage_per_sec / correction_factor
return corrected_current
@AsyncStatus.wrap
async def stage(self) -> None:
await self.counter().stage()
@AsyncStatus.wrap
async def unstage(self) -> None:
await self.counter().unstage()
@AsyncStatus.wrap
async def prepare(self, value) -> None:
await self.counter().prepare(value=value)