# Source code for malcolm.core.notifier

from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple

from annotypes import Array, FrozenOrderedDict

from .concurrency import RLock
from .loggable import Loggable
from .request import Subscribe, Unsubscribe
from .response import Response

if TYPE_CHECKING:
    from .models import BlockModel

    Callback = Callable[[Response], None]
    CallbackResponses = List[Tuple[Callback, Response]]
    SubscriptionKeys = Dict[Tuple[Callback, int], Subscribe]


class DummyNotifier:
    """Stand-in notifier whose squashing API does nothing.

    It offers the same ``changes_squashed`` / ``add_squashed_change`` /
    ``add_squashed_delete`` names as ``Notifier`` in this module, but every
    call is a no-op.
    """

    @property  # type: ignore
    @contextmanager
    def changes_squashed(self):
        """Context manager that does nothing on entry or on exit."""
        yield None

    def add_squashed_change(self, path: List[str], data: Any) -> None:
        """Accept a change for ``path`` and discard it."""
        return None

    def add_squashed_delete(self, path: List[str]) -> None:
        """Accept a deletion of ``path`` and discard it."""
        return None


def freeze(o):
    """Return ``o`` with any models found inside it frozen into dicts.

    * An object with a ``notifier`` attribute is treated as a model and becomes
      a ``FrozenOrderedDict`` holding ``typeid`` followed by one frozen entry
      per name in ``o.call_types``.
    * A ``dict`` becomes a ``FrozenOrderedDict`` with every value frozen.
    * An ``Array`` whose ``typ`` has a ``notifier`` attribute becomes a plain
      list of its frozen elements.
    * Anything else is returned unchanged.
    """
    # Attribute probe is cheaper than a subclass check and finds Models for us
    if hasattr(o, "notifier"):
        pairs = [("typeid", o.typeid)]
        pairs.extend((key, freeze(getattr(o, key))) for key in o.call_types)
        return FrozenOrderedDict(tuple(pairs))

    # Recurse down in case there are any models down there
    if isinstance(o, dict):
        return FrozenOrderedDict(
            tuple((key, freeze(value)) for key, value in o.items())
        )

    # Recurse down only if the element type suggests it holds models
    if o.__class__ is Array and hasattr(o.typ, "notifier"):
        return [freeze(element) for element in o.seq]

    return o


class Notifier(Loggable):
    """Object that can service callbacks on given endpoints"""

    def __init__(self, mri: str, lock: RLock, block: "BlockModel") -> None:
        """Create a notifier rooted at ``block``.

        Args:
            mri (str): Passed to ``set_logger`` to configure this object's log
            lock (RLock): Lock acquired for the duration of a squashed
                section and released before callbacks are made
            block (BlockModel): The data held by the root of the
                subscription tree
        """
        self.set_logger(mri=mri)
        self._tree = NotifierNode(block)
        self._lock = lock
        # Incremented every time we do with changes_squashed
        self._squashed_count = 0
        # Pending [path, optional data] entries gathered while squashing
        self._squashed_changes: List[List] = []
        # Maps request.generate_key() to the Subscribe that registered it
        self._subscription_keys: SubscriptionKeys = {}

    def handle_subscribe(self, request: Subscribe) -> "CallbackResponses":
        """Handle a Subscribe request from outside. Called with lock taken

        Args:
            request (Subscribe): The request to register; its path with the
                first element dropped is used to walk the tree

        Returns:
            list: [(callback, Response)] that need to be called
        """
        ret = self._tree.handle_subscribe(request, request.path[1:])
        self._subscription_keys[request.generate_key()] = request
        return ret

    def handle_unsubscribe(self, request: Unsubscribe) -> "CallbackResponses":
        """Handle a Unsubscribe request from outside. Called with lock taken

        Args:
            request (Unsubscribe): The request whose generated key identifies
                the Subscribe to remove

        Returns:
            list: [(callback, Response)] that need to be called

        Raises:
            KeyError: If no subscription is stored under the generated key
        """
        subscribe = self._subscription_keys.pop(request.generate_key())
        ret = self._tree.handle_unsubscribe(subscribe, subscribe.path[1:])
        return ret

    @property
    def changes_squashed(self) -> "Notifier":
        """Context manager to allow multiple calls to notify_change() to be
        made and all changes squashed into one consistent set. E.g:

        with notifier.changes_squashed:
            attr.set_value(1)
            attr.set_alarm(MINOR)
        """
        return self

    def add_squashed_change(self, path: List[str], data: Any) -> None:
        """Register a squashed change to a particular path

        Args:
            path (list): The path of what has changed, relative from Block
            data (object): The new data

        Raises:
            AssertionError: If called outside a ``changes_squashed`` section
        """
        # Raised explicitly (not via ``assert``) so the guard survives -O
        if not self._squashed_count:
            raise AssertionError("Called while not squashing changes")
        self._squashed_changes.append([path[1:], data])

    def add_squashed_delete(self, path: List[str]) -> None:
        """Register a squashed deletion of a particular path

        Args:
            path (list): The path of what has changed, relative from Block

        Raises:
            AssertionError: If called outside a ``changes_squashed`` section
        """
        # Raised explicitly (not via ``assert``) so the guard survives -O
        if not self._squashed_count:
            raise AssertionError("Called while not squashing changes")
        self._squashed_changes.append([path[1:]])

    def __enter__(self):
        """So we can use this as a context manager for squashing changes"""
        self._lock.acquire()
        self._squashed_count += 1

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        """So we can use this as a context manager for squashing changes"""
        responses = []
        try:
            self._squashed_count -= 1
            # Only the outermost exit flushes the accumulated changes
            if self._squashed_count == 0:
                changes = self._squashed_changes
                self._squashed_changes = []
                # TODO: squash intermediate deltas here?
                responses += self._tree.notify_changes(changes)
        finally:
            # Callbacks are made only after the lock has been released
            self._lock.release()
            self._callback_responses(responses)

    def _callback_responses(self, responses: "CallbackResponses") -> None:
        """Call each callback with its response, logging and re-raising the
        first exception that a callback raises"""
        for cb, response in responses:
            try:
                cb(response)
            except Exception:
                self.log.exception(f"Exception notifying {response}")
                raise
class NotifierNode:
    """One node of the subscription tree.

    Holds the data for its path, the Subscribe requests registered at exactly
    this path, and child nodes keyed by the next path element. Children are
    created on demand by handle_subscribe() and removed by
    handle_unsubscribe() once they have no subscribers or children left.
    """

    # Define slots so it uses less resources to make these
    __slots__ = ["delta_requests", "update_requests", "children", "parent", "data"]

    def __init__(self, data: Any, parent: "NotifierNode" = None) -> None:
        """
        Args:
            data (object): The data held at this node's path
            parent (NotifierNode): The node that owns this one, or None
        """
        self.delta_requests: List[Subscribe] = []
        self.update_requests: List[Subscribe] = []
        self.children: Dict[str, NotifierNode] = {}
        self.parent = parent
        self.data = data

    def notify_changes(self, changes: List[List]) -> "CallbackResponses":
        """Set our data and notify anyone listening

        Args:
            changes (list): [[path, optional data]] where path is the path to
                what has changed, and data is the unserialized object that
                has changed

        Returns:
            list: [(callback, Response)] that need to be called
        """
        ret = []
        child_changes: Dict[str, List] = {}
        for change in changes:
            # Add any changes that our children need to know about
            self._add_child_change(change, child_changes)

        # If we have update subscribers, freeze at this level
        if self.update_requests:
            frozen = freeze(self.data)
            for request in self.update_requests:
                ret.append(request.update_response(frozen))

        # If we have delta subscribers, freeze the change value
        if self.delta_requests:
            for change in changes:
                # A deletion is just [path], so there is no value to freeze
                if len(change) == 2:
                    change[1] = freeze(change[1])
            for request in self.delta_requests:
                ret.append(request.delta_response(changes))

        # Now notify our children
        for name, changes_for_child in child_changes.items():
            ret += self.children[name].notify_changes(changes_for_child)
        return ret

    def _add_child_change(self, change: List, child_changes: Dict[str, List]) -> None:
        """Apply one change here or queue it for the child it addresses

        Args:
            change (list): [path, optional data] relative to this node
            child_changes (dict): {child_name: [change]} accumulator that is
                appended to in place
        """
        path = change[0]
        if path:
            # This is for one of our children
            name = path[0]
            # Changes below a path nobody subscribed to are dropped here
            if name in self.children:
                if len(change) == 2:
                    child_change = [path[1:], change[1]]
                else:
                    child_change = [path[1:]]
                child_changes.setdefault(name, []).append(child_change)
        else:
            # This is for us
            if len(change) == 2:
                child_change_dict = self._update_data(change[1])
            else:
                child_change_dict = self._update_data(None)
            for name, child_change in child_change_dict.items():
                child_changes.setdefault(name, []).append(child_change)

    def _update_data(self, data: Any) -> Dict[str, List]:
        """Set our data and notify any subscribers of children what has changed

        Args:
            data (object): The new data

        Returns:
            dict: {child_name: [path_list, optional child_data]} of the change
                that needs to be passed to a child as a result of this
        """
        self.data = data
        child_change_dict: Dict[str, List] = {}
        # Reflect change of data to children
        for name in self.children:
            child_data = getattr(data, name, None)
            if child_data is None:
                # Deletion
                child_change_dict[name] = [[]]
            else:
                # Change
                child_change_dict[name] = [[], child_data]
        return child_change_dict

    def handle_subscribe(
        self, request: "Subscribe", path: List[str]
    ) -> "CallbackResponses":
        """Add to the list of request to notify, and notify the initial value
        of the data held

        Args:
            request (Subscribe): The subscribe request
            path (list): The relative path from ourself

        Returns:
            list: [(callback, Response)] that need to be called
        """
        ret = []
        if path:
            # Recurse down
            name = path[0]
            if name not in self.children:
                self.children[name] = NotifierNode(getattr(self.data, name, None), self)
            ret += self.children[name].handle_subscribe(request, path[1:])
        else:
            # This is for us
            frozen = freeze(self.data)
            if request.delta:
                self.delta_requests.append(request)
                ret.append(request.delta_response([[[], frozen]]))
            else:
                self.update_requests.append(request)
                ret.append(request.update_response(frozen))
        return ret

    def handle_unsubscribe(
        self, request: "Subscribe", path: List[str]
    ) -> "CallbackResponses":
        """Remove from the notifier list and send a return

        Args:
            request (Subscribe): The original subscribe request
            path (list): The relative path from ourself

        Returns:
            list: [(callback, Response)] that need to be called
        """
        ret = []
        if path:
            # Recurse down
            name = path[0]
            child = self.children[name]
            ret += child.handle_unsubscribe(request, path[1:])
            # Prune the child once nothing below it is subscribed any more
            if (
                not child.children
                and not child.update_requests
                and not child.delta_requests
            ):
                del self.children[name]
        else:
            # This is for us
            if request in self.update_requests:
                self.update_requests.remove(request)
            else:
                self.delta_requests.remove(request)
            ret.append(request.return_response())
        return ret