from random import random
from typing import Awaitable, Callable, Optional
from tickit.adapters.composed import ComposedAdapter
from tickit.adapters.interpreters.command.command_interpreter import CommandInterpreter
from tickit.adapters.interpreters.command.regex_command import RegexCommand
from tickit.adapters.servers.tcp import TcpServer
from tickit.core.adapter import ConfigurableAdapter
from tickit.core.device import ConfigurableDevice, DeviceUpdate
from tickit.core.typedefs import SimTime
from tickit.utils.byte_format import ByteFormat
from tickit.utils.compat.typing_compat import TypedDict
[docs]class Shutter(ConfigurableDevice):
"""A toy device which downscales flux according to a set position.
A toy device which produces an output flux which is downscaled from the input flux
according to an internal state position. The position may be altered by setting new
a target which will be matched by position over a period of time determined by the
rate.
"""
#: A typed mapping containing the 'flux' input value
Inputs: TypedDict = TypedDict("Inputs", {"flux": float})
#: A typed mapping containing the 'flux' output value
Outputs: TypedDict = TypedDict("Outputs", {"flux": float})
def __init__(
self, default_position: float, initial_position: Optional[float] = None
) -> None:
"""A Shutter constructor which configures the initial and default position.
Args:
default_position (float): The initial target position of the shutter
initial_position (Optional[float]): The initial position of the shutter. If
None, a random value in the range [0.0,1.0) will be used. Defaults to
None.
"""
self.target_position = default_position
self.position = initial_position if initial_position else random()
self.rate = 2e-10
self.last_time: Optional[SimTime] = None
[docs] @staticmethod
def move(position: float, target: float, rate: float, period: SimTime) -> float:
"""A helper method used to compute the new position of a shutter.
A helper method used to compute the new position of a shutter given a target
position, a rate of change and a period over which the change occurs. Movement
is performed at the defined rate and comes to a "hard" stop when the desired
position is reached.
Args:
position (float): The prior position of the shutter.
target (float): The target position of the shutter.
rate (float): The rate of change of shutter position.
period (SimTime): The period over which the change occurs.
Returns:
float: The posterior position of the shutter.
"""
if position < target:
position = min(position + rate * period, target)
elif position > target:
position = max(position - rate * period, target)
return position
[docs] def update(self, time: SimTime, inputs: Inputs) -> DeviceUpdate[Outputs]:
"""The update method which moves the shutter and produces a downscaled flux.
The update method which adjusts the position according to the target position,
computes the transmitted flux and produces the output flux with a request to be
called back in 100ms if the if the shutter continues to move.
Args:
time (SimTime): The current simulation time (in nanoseconds).
inputs (State): A mapping of inputs to the device and their values.
Returns:
DeviceUpdate[Outputs]:
The produced update event which contains the value of the transmitted
flux, and requests a callback after 100ms if the shutter continues to
move.
"""
if self.last_time:
self.position = Shutter.move(
self.position,
self.target_position,
self.rate,
SimTime(time - self.last_time),
)
self.last_time = time
call_at = (
None if self.position == self.target_position else SimTime(time + int(1e8))
)
output_flux = inputs["flux"] * self.position
return DeviceUpdate(Shutter.Outputs(flux=output_flux), call_at)
[docs]class ShutterAdapter(ComposedAdapter, ConfigurableAdapter):
"""A toy composed adapter which gets shutter position and target and sets target."""
_device: Shutter
def __init__(
self,
device: Shutter,
raise_interrupt: Callable[[], Awaitable[None]],
host: str = "localhost",
port: int = 25565,
) -> None:
"""A Shutter which instantiates a TcpServer with configured host and port.
Args:
device (Device): The device which this adapter is attached to
raise_interrupt (Callable): A callback to request that the device is
updated immediately.
host (Optional[str]): The host address of the TcpServer. Defaults to
"localhost".
port (Optional[int]): The bound port of the TcpServer. Defaults to 25565.
"""
super().__init__(
device,
raise_interrupt,
TcpServer(host, port, ByteFormat(b"%b\r\n")),
CommandInterpreter(),
)
[docs] @RegexCommand(r"P\?", False, "utf-8")
async def get_position(self) -> bytes:
"""A regex string command which returns the utf-8 encoded value of position.
Returns:
bytes: The utf-8 encoded value of position.
"""
return str(self._device.position).encode("utf-8")
[docs] @RegexCommand(r"T\?", False, "utf-8")
async def get_target(self) -> bytes:
"""A regex string command which returns the utf-8 encoded value of target.
Returns:
bytes: The utf-8 encoded value of target.
"""
return str(self._device.target_position).encode("utf-8")
[docs] @RegexCommand(r"T=(\d+\.?\d*)", True, "utf-8")
async def set_target(self, target: str) -> None:
"""A regex string command which sets the target position of the shutter.
Args:
target (str): The target position of the shutter.
"""
self._device.target_position = float(target)
self._device.last_time = None