from typing import List
from annotypes import Anno
from malcolm.core import (
Alarm,
ChoiceMeta,
NumberMeta,
Part,
PartRegistrar,
Port,
StringMeta,
)
from malcolm.modules import builtin, ca
from ..util import CS_AXIS_NAMES
# Annotype describing the ``pv_prefix`` argument of RawMotorSinkPortsPart
with Anno("PV prefix for CSPort and CSAxis records"):
    APvPrefix = str
# Pull re-used annotypes into our namespace in case we are subclassed
AGroup = ca.util.AGroup
class RawMotorSinkPortsPart(Part):
    """Defines the `Attribute` instances holding the CS port/axis assignment
    and the parent PMAC asyn port of a raw motor, tagged so that they are
    depicted as Sink Ports on a Block"""

    def __init__(self, pv_prefix: APvPrefix, group: ca.util.AGroup = None) -> None:
        """Create the attribute models; no channel access is done here.

        Args:
            pv_prefix: Prefix that ":CsPort", ":CsAxis", their "_RBV"
                readbacks and ".OUT" are appended to
            group: Optional group tag applied to all three attributes
        """
        super().__init__("sinkPorts")
        # Demand PVs written by caput(), in the order (port, axis)
        self.pvs = [pv_prefix + ":CsPort", pv_prefix + ":CsAxis"]
        # Readback PVs; the position in this list is the ``index`` that
        # _update_value() dispatches on
        self.rbvs = [
            pv_prefix + ":CsPort_RBV",
            pv_prefix + ":CsAxis_RBV",
            pv_prefix + ".OUT",
        ]
        meta = ChoiceMeta("CS Axis")
        builtin.util.set_tags(meta, writeable=True, group=group, sink_port=Port.MOTOR)
        self.cs_attr = meta.create_attribute_model()
        meta = StringMeta("Parent PMAC Port name")
        builtin.util.set_tags(meta, group=group, sink_port=Port.MOTOR)
        self.pmac_attr = meta.create_attribute_model()
        meta = NumberMeta("int32", "Parent PMAC Axis number")
        builtin.util.set_tags(meta, group=group)
        self.axis_num_attr = meta.create_attribute_model()
        # Subscriptions
        self.monitors: List = []
        # Last CS port name seen: None means bad/disconnected, "" means no port
        self.port = None
        # Last valid CS axis seen, or None if bad/disconnected
        self.axis = None
        # Enum strings of the CsPort PV, filled in by reconnect()
        self.port_choices: List = []

    def setup(self, registrar: PartRegistrar) -> None:
        """Register the three attributes and the connect/disconnect hooks.

        Args:
            registrar: The registrar used to add attribute models and hooks
        """
        registrar.add_attribute_model("pmac", self.pmac_attr)
        registrar.add_attribute_model("axisNumber", self.axis_num_attr)
        # "cs" is the only attribute given a put function
        registrar.add_attribute_model("cs", self.cs_attr, self.caput)
        # Hooks
        registrar.hook(builtin.hooks.DisableHook, self.disconnect)
        registrar.hook(
            (builtin.hooks.InitHook, builtin.hooks.ResetHook), self.reconnect
        )

    def reconnect(self):
        """Drop any existing monitors, read the current PV values to
        initialise the attributes, then start monitoring the readback PVs"""
        # release old monitors
        self.disconnect()
        # make sure we can connect to the pvs
        ca_values = ca.util.assert_connected(
            ca.util.catools.caget(
                self.pvs + self.rbvs, format=ca.util.catools.FORMAT_CTRL
            )
        )
        # ca_values follows self.pvs + self.rbvs:
        # [CsPort, CsAxis, CsPort_RBV, CsAxis_RBV, .OUT]
        self.port_choices = ca_values[0].enums
        # Enum entry 0 is offered as "" (no port); every other port is
        # offered once per axis name as "PORT,AXIS"
        axis_names = CS_AXIS_NAMES + ["I"]
        choices = [""]
        for choice in self.port_choices[1:]:
            choices.extend(f"{choice},{axis}" for axis in axis_names)
        self.cs_attr.meta.set_choices(choices)
        # Set initial value. The port is stored directly rather than via
        # _update_value() so cs_attr is only published once the axis is known
        self.port = self._port_name(ca_values[2])
        self._update_value(ca_values[3], 1)
        self._update_value(ca_values[4], 2)
        # Setup monitor on rbvs
        self.monitors = ca.util.catools.camonitor(
            self.rbvs,
            self._update_value,
            format=ca.util.catools.FORMAT_TIME,
            notify_disconnect=True,
        )

    def disconnect(self):
        """Close every active monitor and forget them"""
        for monitor in self.monitors:
            monitor.close()
        self.monitors = []

    def _port_name(self, port_index):
        """Translate a CsPort enum index into the port name used in cs_attr.

        Index 0 always maps to "" so that it agrees with the "" entry placed
        first in the choices list and with the index written by caput() for
        an empty value.
        """
        if port_index == 0:
            return ""
        return self.port_choices[port_index]

    @staticmethod
    def _parse_asyn_link(link):
        """Split "@asyn(PORT,num)" into ("PORT", "num").

        Returns:
            The stripped (port, num) strings, or None if the link has no "("
            or no "," separated second field
        """
        fields = link.split("(")
        if len(fields) < 2:
            return None
        args = fields[1].rstrip(")").split(",")
        if len(args) < 2:
            return None
        return args[0].strip(), args[1].strip()

    def _update_value(self, value, index):
        """Handle a new readback value and republish the affected attributes.

        Args:
            value: Channel access value with an ``ok`` flag that is False
                when the PV is disconnected or the read failed
            index: Position of the PV in self.rbvs (0: CS port, 1: CS axis,
                anything else: motor record OUT link)
        """
        if index == 0:
            # Got CS Port
            self.port = self._port_name(value) if value.ok else None
        elif index == 1:
            # Got CS Axis
            if value.ok and str(value) in CS_AXIS_NAMES + ["I"]:
                self.axis = value
            else:
                self.axis = None
        else:
            # Got PMAC Port name
            link = self._parse_asyn_link(value) if value.ok else None
            if link is None:
                # Disconnected, or not of the form "@asyn(PORT,num)"
                self.pmac_attr.set_value(None, alarm=Alarm.invalid("Bad PV value"))
                self.axis_num_attr.set_value(
                    None, alarm=Alarm.invalid("Bad PV value")
                )
            else:
                pmac_port, axis_num = link
                self.pmac_attr.set_value(pmac_port)
                self.axis_num_attr.set_value(axis_num)
        # cs_attr is recomputed on every update, whichever PV changed
        if self.port is None or self.axis is None:
            # Bad value or PV disconnected
            self.cs_attr.set_value(None, alarm=Alarm.invalid("Bad PV value"))
        elif self.port and self.axis:
            # Connected to a port
            self.cs_attr.set_value(f"{self.port},{self.axis}")
        else:
            # Not connected to a port
            self.cs_attr.set_value("")

    def caput(self, value):
        """Write a new CS assignment to the demand PVs and refresh cs_attr
        from the readbacks.

        Args:
            value: "PORT,AXIS" to assign the motor, or an empty value to
                write port index 0 and an empty axis

        Raises:
            ValueError: If PORT is not one of self.port_choices or value
                does not split into exactly two comma separated fields
        """
        if value:
            port, axis = value.split(",")
            port_index = self.port_choices.index(port)
        else:
            port_index = 0
            axis = ""
        ca.util.catools.caput(self.pvs, (port_index, axis), wait=True)
        # now do a caget
        values = ca.util.catools.caget(self.rbvs, format=ca.util.catools.FORMAT_TIME)
        # Store the port first, then let the axis update republish cs_attr
        self.port = self._port_name(values[0])
        self._update_value(values[1], 1)