Source code for malcolm.modules.pandablocks.controllers.pandablockcontroller

import os
from typing import Any, Dict, Optional, Union, cast

from annotypes import Anno

from malcolm.core import (
    Alarm,
    AMri,
    BooleanMeta,
    ChoiceMeta,
    NumberMeta,
    Port,
    StringMeta,
    TableMeta,
    TimeStamp,
    VMeta,
    Widget,
    badge_value_tag,
    config_tag,
    group_tag,
    linked_value_tag,
    snake_to_camel,
    without_linked_value_tags,
)
from malcolm.modules import builtin

from ..pandablocksclient import BlockData, FieldData
from ..parts.pandaactionpart import PandAActionPart
from ..parts.pandafieldpart import PandAFieldPart
from ..parts.pandaiconpart import PandAIconPart
from ..parts.pandalabelpart import PandALabelPart
from ..parts.pandaluticonpart import PandALutIconPart
from ..parts.pandapulseiconpart import PandAPulseIconPart
from ..parts.pandasrgateiconpart import PandASRGateIconPart
from ..parts.pandatablepart import PandATablePart
from ..util import SVG_DIR, ABlockName, AClient, ADocUrlBase

ChangeHandler = Union[PandAFieldPart, PandALabelPart]


with Anno("Prefix to put on the beginning of the Block Name to make MRI"):
    AMriPrefix = str
with Anno("The BlockData object showing the fields of the Block"):
    ABlockData = BlockData

# Pull re-used annotypes into our namespace in case we are subclassed
AClient = AClient
ADocUrlBase = ADocUrlBase
ABlockName = ABlockName


def make_meta(subtyp, description, tags, writeable=True, labels=None):
    if subtyp == "enum":
        meta = ChoiceMeta(description, labels)
    elif subtyp == "bit":
        meta = BooleanMeta(description)
    elif subtyp in ("uint", ""):
        meta = NumberMeta("uint32", description)
    elif subtyp in ("int", "pos"):
        meta = NumberMeta("int32", description)
    elif subtyp == "scalar":
        meta = NumberMeta("float64", description)
    elif subtyp == "lut":
        meta = StringMeta(description)
    else:
        raise ValueError(f"Unknown subtype {subtyp!r}")
    meta.set_writeable(writeable)
    tags.append(meta.default_widget().tag())
    meta.set_tags(tags)
    return meta


[docs]class PandABlockController(builtin.controllers.BasicController): def __init__( self, client: AClient, mri_prefix: AMri, block_name: ABlockName, block_data: ABlockData, doc_url_base: ADocUrlBase, ) -> None: super().__init__(mri=f"{mri_prefix}:{block_name}") # Store self.client = client self.mri_prefix = mri_prefix self.block_name = block_name self.block_data = block_data self.doc_url_base = doc_url_base # {field_name: part} self.field_parts: Dict[str, Optional[ChangeHandler]] = {} # {field_name: attr.meta} self.mux_metas: Dict[str, VMeta] = {} # Make an icon, label and help for the Block self.icon_part: PandAIconPart = self._make_common_parts() # Create parts for each field for field_name, field_data in block_data.fields.items(): self._make_parts_for(field_name, field_data) def handle_changes(self, changes: Dict[str, Any], ts: TimeStamp) -> None: with self.changes_squashed: icon_needs_update = False if isinstance(changes, Dict): for k, v in changes.items(): # Health changes are for us if k.upper() == "HEALTH": if v.upper() == "OK": alarm = Alarm.ok else: alarm = Alarm.major(v) self.update_health( self, builtin.infos.HealthInfo(cast(Alarm, alarm), ts) ) continue # Work out if there is a part we need to notify try: part = self.field_parts[k] except KeyError: self.log.exception(f"Can't handle field {self.block_name}.{k}") part = None if part is None: continue part.handle_change(v, ts) if not icon_needs_update: icon_needs_update = k in self.icon_part.update_fields try: mux_meta = self.mux_metas[k] except KeyError: pass else: self._handle_mux_update(mux_meta, v) if icon_needs_update: d = {} for key in self.icon_part.update_fields: if key in self.field_parts: field_part = self.field_parts[key] if field_part: d[key] = field_part.attr.value icon = builtin.util.SVGIcon(self.icon_part.svg_text) self.icon_part.update_icon(icon, d) self.icon_part.attr.set_value(str(icon), ts=ts) def _handle_mux_update(self, mux_meta, v): # Mux changed its value, update its link to a different # Attribute tags = without_linked_value_tags(mux_meta.tags) split = v.split(".") if len(split) == 2: block_name, field_name = split attr_name = snake_to_camel(field_name.replace(".", "_")) block_mri = f"{self.mri_prefix}:{block_name}" tags.append(linked_value_tag(block_mri, attr_name)) mux_meta.set_tags(tags) def _make_common_parts(self) -> PandAIconPart: block_type = self.block_name.rstrip("0123456789") block_number = self.block_name[len(block_type) :] svg_path = os.path.join(SVG_DIR, block_type + ".svg") if block_type == "LUT": icon_cls = PandALutIconPart elif block_type in ("PULSE", "PCAP"): icon_cls = PandAPulseIconPart elif block_type == "SRGATE": icon_cls = PandASRGateIconPart else: icon_cls = PandAIconPart icon_part = icon_cls(self.client, self.block_name, svg_path) self.add_part(icon_part) label = self.block_data.description metadata_field = f"LABEL_{self.block_name}" if block_number: # If we have multiple blocks, make the labels unique label += f" {block_number}" else: # If we only have one block, the metadata field still has numbers metadata_field += "1" label_part = PandALabelPart(self.client, metadata_field, value=label) self.add_part(label_part) self.field_parts["LABEL"] = label_part self.add_part( builtin.parts.HelpPart( f"{self.doc_url_base}/build/{block_type.lower()}_doc.html" ) ) return icon_part def _make_parts_for(self, field_name, field_data): """Create the relevant parts for this field Args: field_name (str): Short field name, e.g. VAL field_data (FieldData): Field data object """ if field_name.upper() == "HEALTH": # Ignore health, as we already have a health field return typ = field_data.field_type subtyp = field_data.field_subtype if typ == "read": writeable = False else: writeable = True if typ == "time" or typ in ("param", "read") and subtyp == "time": self._make_time(field_name, field_data, writeable) elif typ == "write" and subtyp == "action": self._make_action(field_name, field_data) elif typ in ("param", "read", "write"): self._make_param(field_name, field_data, writeable) elif typ == "bit_out": self._make_out(field_name, field_data, "bit") elif typ == "pos_out": self._make_out(field_name, field_data, "pos") # Some attributes are handled by the top level busses table # so mark as present but ignored for suffix in ("CAPTURE", "UNITS", "SCALE", "OFFSET", "DATA_DELAY"): self.field_parts[f"{field_name}.{suffix}"] = None elif typ == "ext_out": if subtyp == "bits": # Bits is handled by the top level table, so mark it as being # present, but ignored self.field_parts[field_name + ".CAPTURE"] = None else: self._make_ext_capture(field_name, field_data) elif typ == "bit_mux": self._make_mux(field_name, field_data, Port.BOOL) self._make_mux_delay(field_name) elif typ == "pos_mux": self._make_mux(field_name, field_data, Port.INT32) elif typ == "table": self._make_table(field_name, field_data) else: raise ValueError(f"Unknown type {typ!r} subtype {subtyp!r}") def _make_group(self, attr_name: str) -> str: if attr_name not in self.parts: self.add_part( builtin.parts.GroupPart(attr_name, f"All {attr_name} attributes") ) group = group_tag(attr_name) return group def _make_field_part( self, field_name, meta, writeable, initial_value=None, iteration=1 ): if writeable: meta.set_tags(list(meta.tags) + [config_tag(iteration)]) meta.set_writeable(True) part = PandAFieldPart( self.client, meta, self.block_name, field_name, initial_value ) self.add_part(part) self.field_parts[field_name] = part def _make_time( self, field_name: str, field_data: FieldData, writeable: bool ) -> None: description = field_data.description if writeable: widget = Widget.TEXTINPUT group = self._make_group("parameters") else: widget = Widget.TEXTUPDATE group = self._make_group("readbacks") meta = NumberMeta("float64", description, [group, widget.tag()]) # We must change time units before value, so restore value in 2nd # iteration self._make_field_part(field_name, meta, writeable, iteration=2) meta = ChoiceMeta( description + " time units", ["min", "s", "ms", "us"], tags=[group, Widget.COMBO.tag()], ) self._make_field_part(field_name + ".UNITS", meta, writeable=True) def _make_action(self, field_name: str, field_data: FieldData) -> None: group = self._make_group("parameters") self.add_part( PandAActionPart( self.client, self.block_name, field_name, field_data.description, [group], ) ) def _make_param( self, field_name: str, field_data: FieldData, writeable: bool ) -> None: if writeable: group = self._make_group("parameters") else: group = self._make_group("readbacks") meta = make_meta( field_data.field_subtype, field_data.description, [group], writeable, field_data.labels, ) self._make_field_part(field_name, meta, writeable) def _make_out(self, field_name: str, field_data: FieldData, typ: str) -> None: group = self._make_group("outputs") if typ == "bit": port_type = Port.BOOL else: port_type = Port.INT32 flow_tag = port_type.source_port_tag(f"{self.block_name}.{field_name}") meta = make_meta( typ, field_data.description, tags=[group, flow_tag], writeable=False ) self._make_field_part(field_name, meta, writeable=False) def _make_ext_capture(self, field_name: str, field_data: FieldData) -> None: group = self._make_group("outputs") meta = ChoiceMeta( f"Capture {field_name} in PCAP?", field_data.labels, tags=[group, Widget.COMBO.tag()], ) self._make_field_part(field_name + ".CAPTURE", meta, writeable=True) def _make_mux( self, field_name: str, field_data: FieldData, port_type: Port ) -> None: group = self._make_group("inputs") labels = [x for x in field_data.labels if x in ("ZERO", "ONE")] + sorted( x for x in field_data.labels if x not in ("ZERO", "ONE") ) tags = [group, port_type.sink_port_tag("ZERO"), Widget.COMBO.tag()] if port_type == Port.BOOL: # Bits have a delay, use it as a badge delay_name = snake_to_camel(field_name) + "Delay" tags.append(badge_value_tag(self.mri, delay_name)) meta = ChoiceMeta(field_data.description, labels, tags=tags) self._make_field_part(field_name, meta, writeable=True) self.mux_metas[field_name] = meta def _make_mux_delay(self, field_name: str) -> None: group = self._make_group("inputs") meta = NumberMeta( "uint8", "How many FPGA ticks to delay input", tags=[group, Widget.TEXTINPUT.tag()], ) self._make_field_part(field_name + ".DELAY", meta, writeable=True) def _make_table(self, field_name: str, field_data: FieldData) -> None: group = self._make_group("parameters") tags = [Widget.TABLE.tag(), group, config_tag()] meta = TableMeta(field_data.description, tags, writeable=True) part = PandATablePart(self.client, meta, self.block_name, field_name) self.add_part(part) self.field_parts[field_name] = part