# Source code for malcolm.modules.pva.controllers.pvaclientcomms

from typing import Any, Dict, Set

from p4p import Value
from p4p.client.cothread import Context, Subscription
from p4p.client.raw import Disconnected, RemoteError
from p4p.nt import NTURI

from malcolm.core import DEFAULT_TIMEOUT, Alarm, BlockMeta, BlockModel, Model, Queue
from malcolm.modules import builtin

from .pvaconvert import Type, convert_to_type_tuple_value, convert_value_to_dict


class PvaClientComms(builtin.controllers.ClientComms):
    """A class for a client to communicate with the server over pvAccess.

    Each synced Block is backed by one p4p monitor subscription. The first
    update on a (re)connected channel regenerates the whole local Block; later
    updates only apply the fields that changed.
    """

    # Class-level placeholders; do_init() replaces all three with per-instance
    # objects.
    _monitors = None
    _ctxt = None
    _queues: Dict[str, Queue] = {}

    def do_init(self):
        """Create the pva Context and the per-instance bookkeeping."""
        super().do_init()
        self._ctxt = Context("pva", unwrap=False)
        # mri -> Queue that gets a None put on it whenever the Block for that
        # mri has been regenerated
        self._queues: Dict[str, Queue] = {}
        # Every Subscription created by sync_proxy(), so they can be closed
        self._monitors: Set[Subscription] = set()

    def do_disable(self):
        """Close every monitor subscription and then the pva Context."""
        super().do_disable()
        # Unsubscribe to all the monitors
        for m in self._monitors:
            m.close()
        # Drop the references to the closed subscriptions so that a repeated
        # disable does not try to close them again
        self._monitors.clear()
        self._ctxt.close()

    def _update_settable_fields(
        self, update_fields: Set[str], dotted_path: str, ob: Any
    ) -> None:
        """Collect the dotted paths of all terminal fields below ``ob``.

        Args:
            update_fields: Set that the terminal dotted paths are added to
            dotted_path: Dotted path of ``ob`` itself
            ob: A Model, a dict, or any other (terminal) value
        """
        if isinstance(ob, dict):
            # NOTE(review): an empty dict makes all() True, so it is treated
            # as a container of Models and never added as a terminal field --
            # confirm that this is intended
            model_children = all(isinstance(ob[k], Model) for k in ob)
        else:
            model_children = False
        if isinstance(ob, Model) or model_children:
            # Recurse down
            for k in ob:
                self._update_settable_fields(
                    update_fields, f"{dotted_path}.{k}", ob[k]
                )
        else:
            # This is a terminal field, add to the set
            update_fields.add(dotted_path)

    def sync_proxy(self, mri, block):
        """Keep this proxy Block in sync with its remote counterpart.

        Starts a monitor on ``mri`` and waits until the first update has
        regenerated the Block, or until DEFAULT_TIMEOUT has passed.

        Args:
            mri (str): The mri for the remote block
            block (BlockModel): The local proxy Block to keep in sync
        """
        done_queue = Queue()
        self._queues[mri] = done_queue
        # Dotted paths of the fields that subsequent updates may change. Empty
        # means that the next update has to regenerate the whole Block.
        update_fields: Set[str] = set()

        def callback(value=None):
            if isinstance(value, Exception):
                # Disconnect or Cancelled or RemoteError
                if isinstance(value, Disconnected):
                    # We will get a reconnect with a whole new structure
                    update_fields.clear()
                    block.health.set_value(
                        value="pvAccess disconnected",
                        alarm=Alarm.disconnected("pvAccess disconnected"),
                    )
                elif isinstance(value, RemoteError):
                    # Nothing to apply to the Block, but do not lose the error
                    self.log.warning(
                        "Monitor on %s reported remote error: %r", mri, value
                    )
                else:
                    self.log.debug("Monitor on %s reported %r", mri, value)
            else:
                with block.notifier.changes_squashed:
                    if not update_fields:
                        self.log.debug("Regenerating from %s", list(value))
                        self._regenerate_block(block, value, update_fields)
                        # Wake up whoever is waiting for the new structure
                        done_queue.put(None)
                    else:
                        self._update_block(block, value, update_fields)

        m = self._ctxt.monitor(mri, callback, notify_disconnect=True)
        self._monitors.add(m)
        done_queue.get(timeout=DEFAULT_TIMEOUT)

    def _regenerate_block(
        self, block: BlockModel, value: Value, update_fields: Set[str]
    ) -> None:
        """Rebuild ``block`` from a complete structure sent by the server.

        Every endpoint except "health" and "meta" is removed and recreated
        from ``value``; "health" and "meta" are updated in place.

        Args:
            block: The local proxy Block to rebuild
            value: The whole structure received from the monitor
            update_fields: Set that is filled with the dotted paths of the
                terminal fields of the newly added endpoints
        """
        # This is an initial update, generate the list of all fields
        # TODO: very similar to websocketclientcomms
        # Iterate over a copy, as endpoints are removed while looping
        for field in list(block):
            if field not in ("health", "meta"):
                block.remove_endpoint(field)
        for k, v in value.items():
            if k == "health":
                # Update health attribute
                block.health.set_value(
                    value=v["value"],
                    alarm=convert_value_to_dict(v["alarm"]),
                    ts=convert_value_to_dict(v["timeStamp"]),
                )
            elif k == "meta":
                # Update BlockMeta
                meta: BlockMeta = block.meta
                for n in meta.call_types:
                    meta.apply_change([n], v[n])
            else:
                # Add new Attribute/Method
                v = convert_value_to_dict(v)
                block.set_endpoint_data(k, v)
                # Update the list of fields
                self._update_settable_fields(update_fields, k, block[k])

    def _update_block(
        self, block: BlockModel, value: Value, update_fields: Set[str]
    ) -> None:
        """Apply the changed fields of a subsequent update to ``block``.

        Args:
            block: The local proxy Block to update
            value: The structure received from the monitor
            update_fields: Dotted paths that may be applied; changed fields
                that are not in this set are ignored
        """
        # This is a subsequent update
        changed = value.changedSet(parents=True, expand=False)
        for k in changed.intersection(update_fields):
            v = value[k]
            if isinstance(v, Value):
                v = convert_value_to_dict(v)
            block.apply_change(k.split("."), v)

    def send_put(self, mri, attribute_name, value):
        """Dispatch a Put of ``<attribute_name>.value`` to the server.

        Args:
            mri (str): The mri of the Block
            attribute_name (str): The name of the Attribute within the Block
            value: The value to put

        Raises:
            RemoteError: If the server rejects the Put, unless the Attribute
                is "exports" (see the handler below)
        """
        path = attribute_name + ".value"
        typ, value = convert_to_type_tuple_value(value)
        if isinstance(typ, tuple):
            # Structure, make into a Value
            _, typeid, fields = typ
            value = Value(Type(fields, typeid), value)
        try:
            self._ctxt.put(mri, {path: value}, path)
        except RemoteError:
            if attribute_name == "exports":
                # TODO: use a tag instead of a name
                # This will change the structure of the block
                # Wait for reconnect
                # NOTE(review): the monitor callback puts onto this queue on
                # every regeneration, also when nobody is waiting, so an
                # older item may satisfy this get() immediately -- confirm
                self._queues[mri].get(timeout=DEFAULT_TIMEOUT)
            else:
                # Not expected, raise
                raise

    def send_post(self, mri, method_name, **params):
        """Dispatch a Post to the server as an NTURI-wrapped RPC.

        The RPC is made with ``timeout=None``.

        Args:
            mri (str): The mri of the Block
            method_name (str): The name of the Method within the Block
            params: The parameters to send

        Returns:
            The return results from the server, converted with
            convert_value_to_dict
        """
        typ, parameters = convert_to_type_tuple_value(params)
        # typ[2] holds the field definitions of the parameter structure
        uri = NTURI(typ[2])
        uri = uri.wrap(path=f"{mri}.{method_name}", kws=parameters, scheme="pva")
        value = self._ctxt.rpc(mri, uri, timeout=None)
        return convert_value_to_dict(value)