from typing import Any, Dict, Set
from p4p import Value
from p4p.client.cothread import Context, Subscription
from p4p.client.raw import Disconnected, RemoteError
from p4p.nt import NTURI
from malcolm.core import DEFAULT_TIMEOUT, Alarm, BlockMeta, BlockModel, Model, Queue
from malcolm.modules import builtin
from .pvaconvert import Type, convert_to_type_tuple_value, convert_value_to_dict
class PvaClientComms(builtin.controllers.ClientComms):
    """A class for a client to communicate with the server"""

    # Placeholders only; do_init() replaces all three with per-instance objects
    _monitors = None
    _ctxt = None
    _queues: Dict[str, Queue] = {}

    def do_init(self):
        """Create the pvAccess context and the per-instance bookkeeping."""
        super().do_init()
        self._ctxt = Context("pva", unwrap=False)
        # mri -> Queue that receives None each time that block is regenerated
        self._queues: Dict[str, Queue] = {}
        # Every monitor opened by sync_proxy, so do_disable can close them
        self._monitors: Set[Subscription] = set()

    def do_disable(self):
        """Close every open monitor and then the pvAccess context."""
        super().do_disable()
        # Unsubscribe to all the monitors
        for m in self._monitors:
            m.close()
        # Drop the closed subscriptions so they are not held or closed again
        self._monitors.clear()
        self._ctxt.close()

    def _update_settable_fields(
        self, update_fields: Set[str], dotted_path: str, ob: Any
    ) -> None:
        """Collect the dotted paths of all terminal fields below ``ob``.

        Args:
            update_fields: Set that terminal dotted paths are added to
            dotted_path: Dotted path of ``ob`` relative to the block
            ob: The object to inspect; a Model, or a dict whose values are all
                Models, is recursed into, anything else counts as terminal
        """
        if isinstance(ob, dict):
            # An empty dict counts as "all Models", so it contributes no path
            model_children = all(isinstance(child, Model) for child in ob.values())
        else:
            model_children = False
        if isinstance(ob, Model) or model_children:
            # Recurse down
            for k in ob:
                self._update_settable_fields(
                    update_fields, f"{dotted_path}.{k}", ob[k]
                )
        else:
            # This is a terminal field, add to the set
            update_fields.add(dotted_path)

    def sync_proxy(self, mri, block):
        """Keep this proxy Block in sync with its remote counterpart.

        Starts a pvAccess monitor on ``mri`` and blocks until the first update
        has regenerated the block, or until DEFAULT_TIMEOUT expires.

        Args:
            mri (str): The mri for the remote block
            block (BlockModel): The local proxy Block to keep in sync
        """
        done_queue = Queue()
        self._queues[mri] = done_queue
        # Dotted paths of terminal fields; empty means "regenerate on next update"
        update_fields: Set[str] = set()

        def callback(value=None):
            if isinstance(value, Exception):
                # Disconnect or Cancelled or RemoteError
                if isinstance(value, Disconnected):
                    # We will get a reconnect with a whole new structure
                    update_fields.clear()
                    block.health.set_value(
                        value="pvAccess disconnected",
                        alarm=Alarm.disconnected("pvAccess disconnected"),
                    )
            else:
                with block.notifier.changes_squashed:
                    if not update_fields:
                        self.log.debug("Regenerating from %s", list(value))
                        self._regenerate_block(block, value, update_fields)
                        # Wake whoever is waiting for the (re)generated block
                        done_queue.put(None)
                    else:
                        self._update_block(block, value, update_fields)

        m = self._ctxt.monitor(mri, callback, notify_disconnect=True)
        self._monitors.add(m)
        # Wait for the first update to populate the block
        done_queue.get(timeout=DEFAULT_TIMEOUT)

    def _regenerate_block(
        self, block: BlockModel, value: Value, update_fields: Set[str]
    ) -> None:
        """Rebuild ``block`` from a full structure received from the server.

        Args:
            block: The local proxy Block to rebuild
            value: The complete structure delivered by the monitor
            update_fields: Set filled with the dotted paths of terminal fields
        """
        # This is an initial update, generate the list of all fields
        # TODO: very similar to websocketclientcomms
        # Iterate over a copy because endpoints are removed during the loop
        for field in list(block):
            if field not in ("health", "meta"):
                block.remove_endpoint(field)
        for k, v in value.items():
            if k == "health":
                # Update health attribute
                block.health.set_value(
                    value=v["value"],
                    alarm=convert_value_to_dict(v["alarm"]),
                    ts=convert_value_to_dict(v["timeStamp"]),
                )
            elif k == "meta":
                # Update BlockMeta
                meta: BlockMeta = block.meta
                for n in meta.call_types:
                    meta.apply_change([n], v[n])
            else:
                # Add new Attribute/Method
                v = convert_value_to_dict(v)
                block.set_endpoint_data(k, v)
            # Update the list of fields
            self._update_settable_fields(update_fields, k, block[k])

    def _update_block(
        self, block: BlockModel, value: Value, update_fields: Set[str]
    ) -> None:
        """Apply the changed fields of a subsequent update to ``block``.

        Args:
            block: The local proxy Block to update
            value: The structure delivered by the monitor
            update_fields: Dotted paths of terminal fields that may be applied
        """
        # This is a subsequent update
        changed = value.changedSet(parents=True, expand=False)
        # Only apply changed paths that were recorded as terminal fields
        for k in changed.intersection(update_fields):
            v = value[k]
            if isinstance(v, Value):
                v = convert_value_to_dict(v)
            block.apply_change(k.split("."), v)

    def send_put(self, mri, attribute_name, value):
        """Dispatch a Put of an Attribute value to the server.

        Args:
            mri (str): The mri of the Block
            attribute_name (str): The name of the Attribute within the Block
            value: The value to put

        Raises:
            RemoteError: If the put fails for any Attribute other than
                "exports"
        """
        path = attribute_name + ".value"
        typ, value = convert_to_type_tuple_value(value)
        if isinstance(typ, tuple):
            # Structure, make into a Value
            _, typeid, fields = typ
            value = Value(Type(fields, typeid), value)
        try:
            self._ctxt.put(mri, {path: value}, path)
        except RemoteError:
            if attribute_name == "exports":
                # TODO: use a tag instead of a name
                # This will change the structure of the block
                # Wait for reconnect
                self._queues[mri].get(timeout=DEFAULT_TIMEOUT)
            else:
                # Not expected, raise
                raise

    def send_post(self, mri, method_name, **params):
        """Dispatch a Post (method call) to the server.

        Args:
            mri (str): The mri of the Block
            method_name (str): The name of the Method within the Block
            params: The parameters to send

        Returns:
            The return results from the server
        """
        typ, parameters = convert_to_type_tuple_value(params)
        uri = NTURI(typ[2])
        uri = uri.wrap(path=f"{mri}.{method_name}", kws=parameters, scheme="pva")
        # NOTE(review): timeout=None presumably means wait indefinitely for
        # long-running methods -- confirm against the p4p rpc documentation
        value = self._ctxt.rpc(mri, uri, timeout=None)
        return convert_value_to_dict(value)