import os
import subprocess
from typing import Dict, List, Sequence, Set
from annotypes import Anno, add_call_types, deserialize_object, json_decode, json_encode
from malcolm.compat import OrderedDict
from malcolm.core import (
CAMEL_RE,
Alarm,
AlarmSeverity,
AlarmStatus,
AttributeModel,
BooleanMeta,
ChoiceArrayMeta,
ChoiceMeta,
Context,
Delta,
Part,
StringMeta,
Subscribe,
TableMeta,
Unsubscribe,
Widget,
camel_to_title,
config_tag,
get_config_tag,
without_config_tags,
)
from malcolm.core.tags import Port, without_group_tags
from ..hooks import LayoutHook, LoadHook, SaveHook
from ..infos import LayoutInfo, PartExportableInfo, PartModifiedInfo, PortInfo
from ..util import ExportTable, LayoutTable, ManagerStates
from .statefulcontroller import ADescription, AMri, StatefulController
# Short alias for the manager state set, used for the READY/SAVING/LOADING
# states referenced below
ss = ManagerStates

# Annotated argument types used in the signatures of ManagerController.
# NOTE(review): each description string is presumably attached by annotypes
# to the alias assigned inside its `with` block - confirm against annotypes.
with Anno("Directory to write save/load config to"):
    AConfigDir = str
with Anno("Design to load at init"):
    AInitialDesign = str
with Anno("Name of design to save, if different from current design"):
    ASaveDesign = str
with Anno(
    "A directory of templates with which to initially populate designs "
    "Attribute. These cannot be saved over."
):
    ATemplateDesigns = str
with Anno("Name of part"):
    APartName = str
with Anno("Name of attribute"):
    AAttributeName = str
# Pull re-used annotypes into our namespace in case we are subclassed
AMri = AMri
ADescription = ADescription
class ManagerController(StatefulController):
    """RunnableDevice implementer that also exposes GUI for child parts

    On top of StatefulController it publishes a layout table, an exports
    table, a design selector, a modified indicator and a save method, and
    persists designs as json files under the config directory.
    """

    # Instance of the manager state set used by this controller
    state_set = ss()
def __init__(
    self,
    mri: AMri,
    config_dir: AConfigDir,
    template_designs: ATemplateDesigns = "",
    initial_design: AInitialDesign = "",
    description: ADescription = "",
) -> None:
    """Register the manager's own attributes, save method and info listeners.

    Args:
        mri: Passed through to the StatefulController constructor
        config_dir: Existing directory that designs are saved under
        template_designs: Directory of template designs, may be empty
        initial_design: Name of a design for do_init to load, may be empty
        description: Passed through to the StatefulController constructor
    """
    super().__init__(mri=mri, description=description)
    assert os.path.isdir(config_dir), f"{config_dir} is not a directory"
    self.config_dir = config_dir
    self.initial_design = initial_design
    self.template_designs = template_designs
    # Layout visibility and exports table as of the last save or load,
    # None until the first layout has been written
    self.saved_visibility = None
    self.saved_exports = None
    # ((name, AttributeModel/MethodModel, setter, needs_context))
    self._current_part_fields = ()
    self._subscriptions: List[Subscribe] = []
    self.port_info: Dict[APartName, List[PortInfo]] = {}
    self.part_exportable: Dict[Part, Sequence[AAttributeName]] = {}
    # TODO: turn this into "exported attribute modified"
    self.context_modified: Dict[Part, Set[str]] = {}
    self.part_modified: Dict[Part, PartModifiedInfo] = {}
    # Config-tagged attributes published by our parts, keyed by field name
    self.our_config_attributes: Dict[str, AttributeModel] = {}

    # Infos reported by parts that we react to
    infos = self.info_registry
    infos.add_reportable(PartModifiedInfo, self.update_modified)
    infos.add_reportable(PartExportableInfo, self.update_exportable)

    fields = self.field_registry

    # A source port carrying our own MRI
    mri_meta = StringMeta(
        "A port for giving our MRI to things that might use us",
        tags=[Port.BLOCK.source_port_tag(self.mri)],
    )
    fields.add_attribute_model("mri", mri_meta.create_attribute_model(self.mri))

    # Layout table for setting block positions and visibility
    layout_meta = TableMeta.from_table(
        LayoutTable,
        "Layout of child blocks",
        Widget.FLOWGRAPH,
        writeable=["x", "y", "visible"],
    )
    self.layout = layout_meta.create_attribute_model()
    self.set_writeable_in(self.layout, ss.READY)
    fields.add_attribute_model("layout", self.layout, self.set_layout)

    # Design selector for loading an existing layout
    design_meta = ChoiceMeta(
        "Design name to load", tags=[config_tag(), Widget.COMBO.tag()]
    )
    self.design = design_meta.create_attribute_model()
    fields.add_attribute_model("design", self.design, self.set_design)
    self.set_writeable_in(self.design, ss.READY)

    # Export table for mirroring fields of child blocks
    exports_meta = TableMeta.from_table(
        ExportTable,
        "Exported fields of child blocks",
        writeable=list(ExportTable.call_types),
    )
    self.exports = exports_meta.create_attribute_model()
    # Swap the source column meta for a ChoiceArrayMeta so it offers choices
    self.exports.meta.elements["source"] = ChoiceArrayMeta(
        "Name of the block.field to export",
        writeable=True,
        tags=[Widget.COMBO.tag()],
    )
    self.set_writeable_in(self.exports, ss.READY)
    fields.add_attribute_model("exports", self.exports, self.set_exports)

    # Read-only indicator for when things are modified
    modified_meta = BooleanMeta(
        "Whether the design is modified", tags=[Widget.LED.tag()]
    )
    self.modified = modified_meta.create_attribute_model()
    fields.add_attribute_model("modified", self.modified)

    # The save method, only callable in READY
    save_model = fields.add_method_model(self.save)
    self.set_writeable_in(save_model, ss.READY)
def do_init(self):
    """Populate the design choices, then load the initial design if one
    was given, otherwise write a default layout."""
    super().do_init()
    # List the config_dir and add to choices
    self._set_layout_names()
    if not self.initial_design:
        # This will trigger all parts to report their layout, making sure
        # the layout table has a valid value. This will also call
        # self._update_block_endpoints()
        self.set_default_layout()
        return
    # A default config was given, so load it
    self.do_load(self.initial_design, init=True)
def set_default_layout(self):
    """Call set_layout with a LayoutTable whose five columns are all empty."""
    empty_columns = [[] for _ in range(5)]
    self.set_layout(LayoutTable(*empty_columns))
def set_layout(self, value):
    """Set the layout table value. Called on attribute put

    Runs the LayoutHook on every part (visible or not), rebuilds the layout
    table from the LayoutInfo objects that come back and stores it. When
    visibility changed, or this is the first layout ever written, the
    modified flag, the exportable choices and the block endpoints are
    refreshed too.

    Args:
        value: The requested layout table, handed to each LayoutHook
    """
    # Can't do this with changes_squashed as it will call update_modified
    # from another thread and deadlock. Need RLock.is_owned() from update_*
    part_info = self.run_hooks(
        LayoutHook(p, c, self.port_info, value)
        for p, c in self.create_part_contexts(only_visible=False).items()
    )
    with self.changes_squashed:
        layout_parts = LayoutInfo.filter_parts(part_info)
        # One table row per LayoutInfo reported by each part
        name, mri, x, y, visible = [], [], [], [], []
        for part_name, layout_infos in layout_parts.items():
            for layout_info in layout_infos:
                name.append(part_name)
                mri.append(layout_info.mri)
                x.append(layout_info.x)
                y.append(layout_info.y)
                visible.append(layout_info.visible)
        layout_table = LayoutTable(name, mri, x, y, visible)
        # Compare against the old table before it is overwritten
        visibility_changed = layout_table.visible != self.layout.value.visible
        self.layout.set_value(layout_table)
        if self.saved_visibility is None:
            # First write of table, set layout and exports saves
            self.saved_visibility = layout_table.visible
            self.saved_exports = self.exports.value
            # Force visibility changed so we update_block_endpoints
            # even if there weren't any visible
            visibility_changed = True
        if visibility_changed:
            self.update_modified()
            self.update_exportable()
            # Part visibility changed, might have attributes or methods
            # that we need to hide or show
            self.update_block_endpoints()
def set_exports(self, value):
    """Check every export name against CAMEL_RE, then store the exports
    table and refresh the modified flag and block endpoints.

    Args:
        value: The requested exports table
    """
    for export_name in value.export:
        is_camel = CAMEL_RE.match(export_name)
        assert is_camel, f"Field {export_name!r} is not camelCase"
    # All names passed, apply the table and its consequences in one go
    with self.changes_squashed:
        self.exports.set_value(value)
        self.update_modified()
        self.update_block_endpoints()
def update_modified(self, part: Part = None, info: PartModifiedInfo = None) -> None:
    """Recalculate the modified attribute and its alarm.

    Args:
        part: If given, the part whose modified info should be replaced
        info: The new info for that part, required when part is given
    """
    with self.changes_squashed:
        if part:
            assert info, "No info to update part"
            # Remember the latest report from this part
            self.part_modified[part] = info
        messages = []
        all_ours = True
        table = self.layout.value
        # Gather the modification messages of each visible part
        for child_name, is_visible in zip(table.name, table.visible):
            child = self.parts[child_name]
            child_info = self.part_modified.get(child, None)
            if not (is_visible and child_info):
                continue
            for name, message in sorted(child_info.modified.items()):
                # Attribute flagged as been modified, is it by the
                # context we passed to the part?
                if name in self.context_modified.get(child, {}):
                    message = f"(We modified) {message}"
                else:
                    all_ours = False
                messages.append(message)
        # The layout and export tables can be modified in their own right
        if self.layout.value.visible != self.saved_visibility:
            messages.append("layout changed")
            all_ours = False
        if self.exports.value != self.saved_exports:
            messages.append("exports changed")
            all_ours = False
        if not messages:
            self.modified.set_value(False)
            return
        # Changes we made ourselves do not raise the severity
        severity = (
            AlarmSeverity.NO_ALARM if all_ours else AlarmSeverity.MINOR_ALARM
        )
        alarm = Alarm(severity, AlarmStatus.CONF_STATUS, "\n".join(messages))
        self.modified.set_value(True, alarm=alarm)
def update_exportable(
    self, part: Part = None, info: PartExportableInfo = None
) -> None:
    """Refresh the choices offered by the exports table source column.

    Args:
        part: If given, the part whose exportable names and port infos
            should be replaced
        info: The new info for that part, required when part is given
    """
    with self.changes_squashed:
        if part:
            assert info, "No info to update part"
            self.part_exportable[part] = info.names
            self.port_info[part.name] = info.port_infos
        # If we haven't saved visibility yet these have been called
        # during do_init, so don't update block endpoints yet, this will
        # be done as a batch at the end of do_init
        if self.saved_visibility is None:
            return
        # Every "part.field" that any of our parts says can be exported
        names = [
            f"{part.name}.{attr_name}"
            for part in self.parts.values()
            for attr_name in self.part_exportable.get(part, [])
        ]
        source_meta = self.exports.meta.elements["source"]
        added_or_removed = set(names).symmetric_difference(source_meta.choices)
        affected_exports = added_or_removed.intersection(
            self.exports.value.source
        )
        source_meta.set_choices(names)
        # Update the block endpoints if anything currently exported is
        # added or deleted
        if affected_exports:
            self.update_block_endpoints()
def update_block_endpoints(self):
    """Remove the endpoints added last time round, then add whatever
    _get_current_part_fields yields now."""
    # Take down every field we previously added, along with its entry in
    # each per-state writeable mapping
    for old_name, old_child, _, _ in self._current_part_fields:
        self._block.remove_endpoint(old_name)
        for writeable_children in self._children_writeable.values():
            writeable_children.pop(old_child, None)
    # Exhaust the generator before storing, so a failure keeps the old tuple
    fresh_fields = tuple(self._get_current_part_fields())
    self._current_part_fields = fresh_fields
    for field in fresh_fields:
        name, child, writeable_func, needs_context = field
        self.add_block_field(name, child, writeable_func, needs_context)
def add_part(self, part: Part) -> None:
    """Add the part, then take ownership of its config-tagged attributes.

    Args:
        part: The part being added
    """
    super().add_part(part)
    # Strip out the config tags of what we just added, as we will be
    # saving them ourself
    for name, field, _, _ in self.field_registry.fields.get(part, []):
        if not isinstance(field, AttributeModel):
            continue
        tags = field.meta.tags
        if not get_config_tag(tags):
            continue
        # Remove the "config" tags and remember the attribute instead
        field.meta.set_tags(without_config_tags(tags))
        self.our_config_attributes[name] = field
def add_initial_part_fields(self):
    """Add just the fields registered under the ``None`` key to the block."""
    # Only add our own fields to start with, the rest will be added on load
    own_fields = self.field_registry.fields[None]
    for field_name, model, writeable_func, needs_context in own_fields:
        self.add_block_field(field_name, model, writeable_func, needs_context)
def _get_current_part_fields(self):
    """Yield (name, model, setter, needs_context) for every field the block
    should currently show.

    Existing export subscriptions are cancelled first. Fields of parts not
    marked invisible in the layout come next, then one exported field per
    row of the exports table whose part is visible and lists the attribute
    as exportable.
    """
    # Clear out the current subscriptions
    for old in self._subscriptions:
        controller = self.process.get_controller(old.path[0])
        request = Unsubscribe(old.id)
        request.set_callback(old.callback)
        controller.handle_request(request)
    self._subscriptions = []
    # Split the layout rows into mris of visible parts and hidden names
    table = self.layout.value
    rows = list(zip(table.name, table.mri, table.visible))
    visible_mris = {row_name: mri for row_name, mri, shown in rows if shown}
    hidden = {row_name for row_name, _, shown in rows if not shown}
    # Add fields from parts that aren't invisible
    for part_name, part in self.parts.items():
        if part_name in hidden:
            continue
        yield from self.field_registry.fields.get(part, [])
    # Add exported fields from visible parts
    for source, export_name in self.exports.value.rows():
        part_name, attr_name = source.rsplit(".", 1)
        part = self.parts[part_name]
        # Only visible parts have an mri recorded
        mri = visible_mris.get(part_name, None)
        if not mri or attr_name not in self.part_exportable.get(part, []):
            continue
        # A blank export name means export under the attribute's own name
        export_name = export_name or attr_name
        export, setter = self._make_export_field(mri, attr_name, export_name)
        yield export_name, export, setter, False
def _make_export_field(self, mri, attr_name, export_name):
    """Subscribe to a child field and build the model that mirrors it.

    Args:
        mri: First element of the path to subscribe to
        attr_name: Second element of the path to subscribe to
        export_name: Name used to generate the label of the mirrored field

    Returns:
        tuple: (mirrored model, function that forwards a put or post)
    """
    controller = self.process.get_controller(mri)
    path = [mri, attr_name]
    label = camel_to_title(export_name)
    # Filled in by the first Delta: holds the "export" and "setter" entries
    ret = {}

    def setter(v):
        # Forward a put on the mirrored attribute to the child
        context = Context(self.process)
        context.put(path, v)

    def setter_star_args(*args):
        # Forward a call of the mirrored method to the child
        context = Context(self.process)
        context.post(path, *args)

    def update_field(response):
        if not isinstance(response, Delta):
            # Return or Error is the end of our subscription, log and ignore
            self.log.debug("Export got response %r", response)
            return
        if ret:
            # Subsequent calls, update the object we already made
            with self.changes_squashed:
                for change in response.changes:
                    ret["export"].apply_change(*change)
            return
        # First call, create the initial object
        export = deserialize_object(response.changes[0][1])
        if isinstance(export, AttributeModel):
            # Strip out tags that we shouldn't export
            # TODO: need to strip out port tags too...
            kept_tags = without_config_tags(without_group_tags(export.meta.tags))
            export.meta.set_tags(kept_tags)
            ret["setter"] = setter
        else:
            ret["setter"] = setter_star_args
        # Regenerate label
        export.meta.set_label(label)
        ret["export"] = export

    subscription = Subscribe(path=path, delta=True)
    subscription.set_callback(update_field)
    self._subscriptions.append(subscription)
    # When we have waited for the subscription, the first update_field
    # will have been called
    controller.handle_request(subscription).wait()
    return ret["export"], ret["setter"]
def create_part_contexts(self, only_visible=True):
    """Create the part contexts, optionally leaving out invisible parts.

    Args:
        only_visible: If True, contexts of parts marked invisible in the
            layout are dropped, and each visible part's
            notify_dispatch_request is registered on its context

    Returns:
        The mapping of part to context from the superclass, filtered as
        described above
    """
    part_contexts = super().create_part_contexts()
    if not only_visible:
        return part_contexts
    table = self.layout.value
    for part_name, visible in zip(table.name, table.visible):
        part = self.parts[part_name]
        if visible:
            context = part_contexts[part]
            context.set_notify_dispatch_request(part.notify_dispatch_request)
        else:
            part_contexts.pop(part)
    return part_contexts
# Allow CamelCase for arguments as they will be exposed in the Block Method
# noinspection PyPep8Naming
@add_call_types
def save(self, designName: ASaveDesign = "") -> None:
    """Save the current design to file

    Runs do_save through try_stateful_function, using SAVING as the
    transient state and READY as the state to return to.

    Args:
        designName: Name to save under; when empty do_save falls back to
            the current value of the design attribute
    """
    self.try_stateful_function(ss.SAVING, ss.READY, self.do_save, designName)
def do_save(self, design=""):
    """Write the current design to its json file and mark us clean.

    The file holds the layout table, the exports table, our config
    attributes and whatever each part's SaveHook returns.

    Args:
        design: Name to save under; when empty the current value of the
            design attribute is used
    """
    design = design or self.design.value
    assert design, "Please specify save design name when saving from new"
    assert not design.startswith("template_"), "Cannot save over a template"
    # Position and visibility of each block, keyed by part name
    layout = OrderedDict()
    for name, mri, x, y, visible in self.layout.value.rows():
        layout[name] = OrderedDict([("x", x), ("y", y), ("visible", visible)])
    # Export name for each exported source field
    exports = OrderedDict(
        (source, export) for source, export in self.exports.value.rows()
    )
    attributes = OrderedDict([("layout", layout), ("exports", exports)])
    # Values of the config attributes we took over from our parts
    for name, attribute in self.our_config_attributes.items():
        attributes[name] = attribute.value
    structure = OrderedDict([("attributes", attributes)])
    # Add any structure that a child part wants to save
    structure["children"] = self.run_hooks(
        SaveHook(p, c)
        for p, c in self.create_part_contexts(only_visible=False).items()
    )
    text = json_encode(structure, indent=2)
    filename = self._validated_config_filename(design)
    if filename.startswith("/tmp"):
        self.log.warning(f"Saving to tmp directory {filename}")
    with open(filename, "w") as f:
        f.write(text)
    # Run a sync command to make sure we flush this file to disk
    subprocess.call("sync")
    self._mark_clean(design)
def _set_layout_names(self, extra_name=None):
    """Rebuild the choices of the design attribute from what is on disk.

    Args:
        extra_name: A name to offer even if it has no json file yet
    """
    dir_name = self._make_config_dir()
    # The blank entry comes first, followed by one name per saved json file
    names = [""]
    names.extend(
        f.split(".json")[0]
        for f in os.listdir(dir_name)
        if os.path.isfile(os.path.join(dir_name, f)) and f.endswith(".json")
    )
    if extra_name:
        extra = str(extra_name)
        if extra not in names:
            names.append(extra)
    names.sort()
    # Templates go after the sorted saved designs, in their own sorted order
    if os.path.isdir(self.template_designs):
        for f in sorted(os.listdir(self.template_designs)):
            assert f.startswith("template_") and f.endswith(".json"), (
                "Template design %s/%s should start with 'template_' "
                "and end with .json" % (self.template_designs, f)
            )
            t_name = f.split(".json")[0]
            if t_name not in names:
                names.append(t_name)
    self.design.meta.set_choices(names)
def _validated_config_filename(self, name):
    """Check a design name and return the full path of its json file

    Names starting with "template_" resolve into the template designs
    directory. Any other name resolves into the per-MRI config directory,
    which is created if it does not exist yet.

    Args:
        name (str): Filename without dir or extension. Anything from the
            first ".json" onwards is dropped before the extension is added

    Returns:
        str: Full path including extension

    Raises:
        ValueError: If name contains a directory component
    """
    # Design names can come from callers of the exposed save method, so
    # refuse anything that would resolve outside the design directory
    if os.path.basename(name) != name:
        raise ValueError(
            f"Design name {name!r} must not contain a directory component"
        )
    if name.startswith("template_"):
        # Templates live in the templates dir
        dir_name = self.template_designs
    else:
        # Everything else lives in the config dir
        dir_name = self._make_config_dir()
    filename = os.path.join(dir_name, name.split(".json")[0] + ".json")
    return filename
def _make_config_dir(self):
    """Try to create the directory named after our mri inside config_dir.

    Returns:
        str: Path of that directory, whether or not creating it succeeded
    """
    design_dir = os.path.join(self.config_dir, self.mri)
    try:
        os.mkdir(design_dir)
    except OSError:
        # OK if already exists, if not then it will fail on write anyway
        pass
    return design_dir
def set_design(self, value):
    """Validate a design name against the design meta, then load it.

    Args:
        value: The design name to load
    """
    design = self.design.meta.validate(value)
    # LOADING is the transient state, READY the one to return to
    self.try_stateful_function(ss.LOADING, ss.READY, self.do_load, design)
def do_load(self, design: str, init: bool = False) -> None:
    """Load a design name, running the child LoadHooks.

    The layout table, the exports table and our config attributes are
    restored from the file first, then each part is handed its own saved
    structure, and finally we are marked clean.

    Args:
        design: Name of the design json file, without extension. An empty
            name loads an empty structure without reading any file
        init: Passed to the LoadHook to tell the children if this is being
            run at Init or not
    """
    if design:
        filename = self._validated_config_filename(design)
        with open(filename, "r") as f:
            text = f.read()
        structure = json_decode(text)
    else:
        structure = {}
    # Attributes and Children used to be merged, support this
    attributes = structure.get("attributes", structure)
    children = structure.get("children", structure)
    # Set the layout table
    name, mri, x, y, visible = [], [], [], [], []
    for part_name, d in attributes.get("layout", {}).items():
        name.append(part_name)
        # The file does not store mris, so pass blanks to set_layout
        mri.append("")
        x.append(d["x"])
        y.append(d["y"])
        visible.append(d["visible"])
    self.set_layout(LayoutTable(name, mri, x, y, visible))
    # Set the exports table
    source, export = [], []
    for source_name, export_name in attributes.get("exports", {}).items():
        source.append(source_name)
        export.append(export_name)
    self.exports.set_value(ExportTable(source, export))
    # Set other attributes, ignoring any key that is not one of ours
    our_values = {
        k: v for k, v in attributes.items() if k in self.our_config_attributes
    }
    block = self.block_view()
    block.put_attribute_values(our_values)
    # Run the load hook to get parts to load their own structure
    self.run_hooks(
        LoadHook(p, c, children.get(p.name, {}), init)
        for p, c in self.create_part_contexts(only_visible=False).items()
    )
    self._mark_clean(design, init)
def _mark_clean(self, design, init=False):
    """Record the current layout and exports as saved and select a design.

    Args:
        design: Design name to add to the design choices and set as the
            value of the design attribute
        init: If True, modified infos already reported by parts are kept
    """
    with self.changes_squashed:
        # Snapshot what update_modified compares the live tables against
        self.saved_visibility = self.layout.value.visible
        self.saved_exports = self.exports.value
        # Now we are clean, modified should clear
        if not init:
            # Don't clear at init, because some things may not be
            # clean at init
            self.part_modified = {}
        self.update_modified()
        # Pass design as the extra name so it is always among the choices
        self._set_layout_names(design)
        self.design.set_value(design)
        self.update_block_endpoints()