Source code for malcolm.modules.pmac.parts.cspart

from annotypes import Anno, add_call_types

from malcolm.core import DEFAULT_TIMEOUT, PartRegistrar
from malcolm.modules import builtin, scanning

from ..infos import PmacCsKinematicsInfo
from ..util import CS_AXIS_NAMES

with Anno("Co-ordinate system number"):
    ACS = int
with Anno("Motor position to move to in EGUs"):
    ADemandPosition = float
with Anno("Time to take to perform move in seconds"):
    AMoveTime = float

# Pull re-used annotypes into our namespace in case we are subclassed
AMri = builtin.parts.AMri


[docs]class CSPart(builtin.parts.ChildPart): def __init__(self, mri: AMri, cs: ACS) -> None: super().__init__("CS%d" % cs, mri, initial_visibility=True) self.cs = cs def setup(self, registrar: PartRegistrar) -> None: super().setup(registrar) # Add methods registrar.add_method_model(self.move, "moveCS%d" % self.cs, needs_context=True) # Hooks registrar.hook(scanning.hooks.ReportStatusHook, self.report_status)
[docs] @add_call_types def report_status(self, context: scanning.hooks.AContext) -> scanning.hooks.UInfos: child = context.block_view(self.mri) info = PmacCsKinematicsInfo( child.port, child.qVariables.value, child.forward.value, child.inverse.value ) return info
[docs] @add_call_types def on_init(self, context: builtin.hooks.AContext) -> None: super().on_init(context) # Check the port name matches our CS number child = context.block_view(self.mri) cs_port = child.port.value assert cs_port.endswith( str(self.cs) ), "CS Port %s doesn't end with port number %d" % (cs_port, self.cs)
# Serialize, so use camelCase # noinspection PyPep8Naming
[docs] @add_call_types def move( self, context: builtin.hooks.AContext, a: ADemandPosition = None, b: ADemandPosition = None, c: ADemandPosition = None, u: ADemandPosition = None, v: ADemandPosition = None, w: ADemandPosition = None, x: ADemandPosition = None, y: ADemandPosition = None, z: ADemandPosition = None, moveTime: AMoveTime = 0, ) -> None: """Move the given CS axes using a deferred co-ordinated move""" child = context.block_view(self.mri) child.deferMoves.put_value(True) # Convert move time into milliseconds child.csMoveTime.put_value(moveTime * 1000.0) # Add in the motors we need to move attribute_values = {} for axis in CS_AXIS_NAMES: demand = locals()[axis.lower()] if demand is not None: attribute_values[f"demand{axis}"] = demand fs = child.put_attribute_values_async(attribute_values) # Wait for the demand to have been received by the PV for attr, value in sorted(attribute_values.items()): child.when_value_matches(attr, value, timeout=1.0) # Start the move child.deferMoves.put_value(False) # Wait for them to get there context.wait_all_futures(fs, timeout=moveTime + DEFAULT_TIMEOUT)
# def inverse_kinematics(): # def forward_kinematics():