import time
from typing import Optional
from annotypes import Anno
from malcolm.core import MethodModel, Part, PartRegistrar, Queue, TimeoutError, tags
from malcolm.modules import builtin
from .. import util
# Annotated type aliases used for the CAActionPart constructor arguments.
# Each alias is declared inside a ``with Anno(<description>)`` block, the
# annotypes convention for pairing a type alias with its description.
# NOTE(review): keep one alias per ``with`` block and keep the order; the
# annotypes mechanism is presumed to rely on this exact shape.

# PV names
with Anno("Status pv to see if successful"):
    AStatusPv = str
with Anno("Good value for status pv"):
    AGoodStatus = str
with Anno("PV containing error message if unsuccessful"):
    AMessagePv = str

# What to write and how to wait for it
with Anno("Value to write to pv when method called"):
    AValue = int
with Anno("Wait for caput callback?"):
    AWait = bool
with Anno("How long to wait for status_pv == good_status before returning"):
    AStatusTimeout = int
class CAActionPart(Part):
    """Group a number of PVs together that represent a method like acquire()

    Commonly a group of pvs are used to represent a method call like this::

        caput(pv, wait=True)
        assert caget(status_pv) == good_status, (
            "Action failed with message: %s" % caget(message_pv)
        )

    This `Part` wraps up this design pattern as a Malcolm `Method`
    """

    def __init__(
        self,
        name: util.APartName,
        description: util.AMetaDescription,
        pv: util.APv = "",
        status_pv: AStatusPv = "",
        good_status: AGoodStatus = "",
        status_timeout: AStatusTimeout = 1,
        message_pv: AMessagePv = "",
        value: AValue = 1,
        wait: AWait = True,
        throw: util.AThrow = True,
    ) -> None:
        """Store the PV names and options; no Channel Access is done here.

        Args:
            name: Part name, also used as the name of the exposed Method.
            description: Description given to the exposed Method.
            pv: PV that ``value`` is written to when the Method is called.
            status_pv: Optional PV monitored after the write; if empty, no
                status check is made.
            good_status: Value of ``status_pv`` that counts as success.
            status_timeout: Seconds to wait for ``status_pv`` to reach
                ``good_status``.
            message_pv: Optional PV read to enrich the failure message.
            value: Value written to ``pv``.
            wait: Passed as ``wait`` to the underlying caput.
            throw: If True, connection failures in `reconnect` are raised;
                if False, the Method is tagged as hidden instead.
        """
        super().__init__(name)
        self.description = description
        self.pv = pv
        self.status_pv = status_pv
        self.good_status = good_status
        self.status_timeout = status_timeout
        self.message_pv = message_pv
        self.value = value
        self.wait = wait
        self.throw = throw
        # Filled in by setup(); stays None until the part is registered
        self.method: Optional[MethodModel] = None

    def setup(self, registrar: PartRegistrar) -> None:
        """Hook `reconnect` onto Init/Reset and register `caput` as a Method."""
        super().setup(registrar)
        # Hooks
        registrar.hook(
            (builtin.hooks.InitHook, builtin.hooks.ResetHook), self.reconnect
        )
        # Methods
        self.method = registrar.add_method_model(
            self.caput, self.name, self.description
        )

    def reconnect(self):
        """Check that every configured PV can be read over Channel Access.

        Raises:
            AssertionError: If a PV failed to connect and ``throw`` is True.
                When ``throw`` is False the Method is tagged as hidden
                instead of raising.
        """
        pvs = [self.pv]
        if self.status_pv:
            pvs.append(self.status_pv)
        if self.message_pv:
            pvs.append(self.message_pv)
        ca_values = util.catools.caget(pvs, throw=self.throw)
        # check connection is ok
        try:
            for v in ca_values:
                # Explicit raises rather than ``assert`` so the check is not
                # stripped when Python runs with -O
                if isinstance(v, util.catools.ca_nothing):
                    raise AssertionError("CA connect failed")
                if not v.ok:
                    raise AssertionError(
                        f"CA connect failed with {v.state_strings[v.state]}"
                    )
        except AssertionError:
            if self.throw:
                raise
            # Hide the method, but do not stack a duplicate tag each time a
            # repeated Init/Reset fails again
            hidden_tag = tags.method_hidden()
            current_tags = list(self.method.meta.tags)
            if hidden_tag not in current_tags:
                self.method.meta.set_tags(current_tags + [hidden_tag])

    def wait_for_good_status(self, deadline: float):
        """Monitor ``status_pv`` until it equals ``good_status`` or time is up.

        Args:
            deadline: Absolute time (as returned by ``time.time()``) after
                which waiting stops.

        Returns:
            The last status update received: ``good_status`` on success, the
            most recent other value on timeout, or None if no update arrived
            before the deadline.
        """
        q = Queue()
        m = util.catools.camonitor(
            self.status_pv, q.put, datatype=util.catools.DBR_STRING
        )
        status = None
        try:
            while True:
                # Never hand a negative timeout to the queue once the
                # deadline has already passed
                remaining = max(0.0, deadline - time.time())
                try:
                    status = q.get(remaining)
                except TimeoutError:
                    return status
                if status == self.good_status:
                    return status
        finally:
            # Always drop the monitor subscription, however we leave
            m.close()

    def caput(self):
        """Write ``value`` to ``pv`` and, if configured, verify the status PV.

        Raises:
            AssertionError: If ``status_pv`` is set and did not reach
                ``good_status`` within ``status_timeout`` seconds.
        """
        self.log.info("caput %s %s", self.pv, self.value)
        util.catools.caput(self.pv, self.value, wait=self.wait, timeout=None)
        if not self.status_pv:
            return
        # Wait for up to status_timeout for the right results to come in
        deadline = time.time() + self.status_timeout
        status = self.wait_for_good_status(deadline)
        if self.message_pv:
            message = " %s:" % util.catools.caget(
                self.message_pv, datatype=util.catools.DBR_CHAR_STR
            )
        else:
            message = ""
        # Explicit raise rather than ``assert`` so a failed action is still
        # reported when Python runs with -O
        if status != self.good_status:
            raise AssertionError(
                "Status %s:%s while performing 'caput %s %s'"
                % (status, message, self.pv, self.value)
            )