import json
import re
import time
from typing import Any, Dict, Sequence, Set, Tuple
from annotypes import Anno
from cothread.cosocket import socket
from malcolm.core import Display, NumberMeta, Queue, TimeoutError, TimeStamp, Widget
from malcolm.modules import builtin
from malcolm.modules.builtin.util import LayoutTable
from ..pandablocksclient import PandABlocksClient
from ..parts.pandaactionpart import PandAActionPart
from ..parts.pandabussespart import PandABussesPart
from ..util import DOC_URL_BASE, ADocUrlBase
from .pandablockcontroller import PandABlockController
with Anno("Hostname of the box"):
AHostname = str
with Anno("Port number of the TCP server control port"):
APort = int
with Anno("Time between polls of PandA current value changes"):
APollPeriod = float
AMri = builtin.controllers.AMri
AConfigDir = builtin.controllers.AConfigDir
ATemplateDesigns = builtin.controllers.ATemplateDesigns
AInitialDesign = builtin.controllers.AInitialDesign
ADescription = builtin.controllers.ADescription
# Minimum period in seconds between updates of the last poll period attribute
POLL_PERIOD_REPORT = 1
[docs]class PandAManagerController(builtin.controllers.ManagerController):
def __init__(
self,
mri: AMri,
config_dir: AConfigDir,
hostname: AHostname = "localhost",
port: APort = 8888,
doc_url_base: ADocUrlBase = DOC_URL_BASE,
poll_period: APollPeriod = 0.1,
template_designs: ATemplateDesigns = "",
initial_design: AInitialDesign = "",
description: ADescription = "",
) -> None:
super().__init__(
mri=mri,
config_dir=config_dir,
template_designs=template_designs,
initial_design=initial_design,
description=description,
)
self._poll_period = poll_period
self._doc_url_base = doc_url_base
# All the bit_out fields and their values
# {block_name.field_name: value}
self._bit_outs: Dict[str, bool] = {}
# The bit_out field values that need toggling since the last handle
# {block_name.field_name: value}
self._bit_out_changes: Dict[str, bool] = {}
# The fields that busses needs to know about
# {block_name.field_name[.subfield_name]}
self._bus_fields: Set[str] = set()
# The child controllers we have created
self._child_controllers: Dict[str, PandABlockController] = {}
# The PandABlock client that does the comms
self._client = PandABlocksClient(hostname, port, Queue)
# The json layout stored in PandA
self._json_layout: Dict[str, Dict[str, float]] = {}
# Filled in on reset
self._stop_queue = None
self._poll_spawned = None
# Poll period reporting
self.last_poll_period = NumberMeta(
"float64",
"The time between the last 2 polls of the hardware",
tags=[Widget.TEXTUPDATE.tag()],
display=Display(units="s", precision=3),
).create_attribute_model(poll_period)
self.field_registry.add_attribute_model("lastPollPeriod", self.last_poll_period)
# Bus tables
self.busses: PandABussesPart = self._make_busses()
self.add_part(self.busses)
def do_init(self):
# start the poll loop and make block parts first to fill in our parts
# before calling _set_block_children()
self.start_poll_loop()
super().do_init()
def start_poll_loop(self):
# queue to listen for stop events
if not self._client.started:
self._stop_queue = Queue()
if self._client.started:
self._client.stop()
self._client.start(self.process.spawn, socket)
if not self._child_controllers:
self._make_child_controllers()
if self._poll_spawned is None:
self._poll_spawned = self.process.spawn(self._poll_loop)
def do_disable(self):
super().do_disable()
self.stop_poll_loop()
def do_reset(self):
self.start_poll_loop()
super().do_reset()
def _poll_loop(self):
"""At self.poll_period poll for changes"""
last_poll_update = time.time()
next_poll = time.time() + self._poll_period
try:
while True:
# Need to make sure we don't consume all the CPU, allow us to be
# active for 50% of the poll period, so we must sleep at least
# 50% of the poll period
min_sleep = self._poll_period * 0.5
sleep_for = next_poll - time.time()
if sleep_for < min_sleep:
# Going too fast, slow down a bit
last_poll_period = self._poll_period + min_sleep - sleep_for
sleep_for = min_sleep
else:
last_poll_period = self._poll_period
try:
# If told to stop, we will get something here and return
return self._stop_queue.get(timeout=sleep_for)
except TimeoutError:
# No stop, no problem
pass
# Poll for changes
self.handle_changes(self._client.get_changes())
if (
last_poll_period != self.last_poll_period.value
and next_poll - last_poll_update > POLL_PERIOD_REPORT
):
self.last_poll_period.set_value(last_poll_period)
last_poll_update = next_poll
next_poll += last_poll_period
except Exception as e:
self.go_to_error_state(e)
raise
def stop_poll_loop(self):
if self._poll_spawned:
self._stop_queue.put(None)
self._poll_spawned.wait()
self._poll_spawned = None
if self._client.started:
self._client.stop()
def _make_child_controllers(self):
self._child_controllers = {}
controllers = []
child_parts = []
pos_names = []
blocks_data = self._client.get_blocks_data()
for block_rootname, block_data in blocks_data.items():
block_names = []
if block_data.number == 1:
block_names.append(block_rootname)
else:
for i in range(block_data.number):
block_names.append("%s%d" % (block_rootname, i + 1))
for block_name in block_names:
# Look through the BlockData for things we are interested in
for field_name, field_data in block_data.fields.items():
if field_data.field_type == "pos_out":
pos_names.append(f"{block_name}.{field_name}")
# Make the child controller and add it to the process
controller, child_part = self._make_child_block(block_name, block_data)
controllers += [controller]
child_parts += [child_part]
self._child_controllers[block_name] = controller
# If there is only one, make an alias with "1" appended for
# *METADATA.LABEL lookup
if block_data.number == 1:
self._child_controllers[block_name + "1"] = controller
self.process.add_controllers(controllers)
for part in child_parts:
self.add_part(part)
# Create the busses from their initial sets of values
pcap_bit_fields = self._client.get_pcap_bits_fields()
self.busses.create_busses(pcap_bit_fields, pos_names)
# Handle the pos_names that busses needs
self._bus_fields = set(pos_names)
for pos_name in pos_names:
for suffix in ("CAPTURE", "UNITS", "SCALE", "OFFSET"):
self._bus_fields.add(f"{pos_name}.{suffix}")
# Handle the bit_outs, keeping a list for toggling and adding them
# to the set of things that the busses need
self._bit_outs = {k: 0 for k in self.busses.bits.value.name}
self._bit_out_changes = {}
self._bus_fields |= set(self._bit_outs)
for capture_field in pcap_bit_fields:
self._bus_fields.add(capture_field)
# Handle the initial set of changes to get an initial value
self.handle_changes(self._client.get_changes())
# Then once more to let bit_outs toggle back
self.handle_changes(())
assert (
not self._bit_out_changes
), f"There are still bit_out changes {self._bit_out_changes}"
def _make_busses(self) -> PandABussesPart:
return PandABussesPart("busses", self._client)
def _make_child_block(self, block_name, block_data):
controller = PandABlockController(
self._client, self.mri, block_name, block_data, self._doc_url_base
)
if block_name == "PCAP":
controller.add_part(
PandAActionPart(
self._client, "*PCAP", "ARM", "Arm position capture", []
)
)
controller.add_part(
PandAActionPart(
self._client, "*PCAP", "DISARM", "Disarm position capture", []
)
)
child_part = builtin.parts.ChildPart(
name=block_name, mri=controller.mri, stateful=False
)
return controller, child_part
def _handle_change(self, k, v, bus_changes, block_changes, bit_out_changes):
# Handle bit changes
try:
current_v = self._bit_outs[k]
except KeyError:
# Not a bit
pass
else:
# Convert to a boolean
v = bool(int(v))
try:
changed_to = bit_out_changes[k]
except KeyError:
# We didn't already make a change
if v == current_v:
# Value is the same, store the negation, and set it
# back next time
self._bit_out_changes[k] = v
v = not v
else:
# Already made a change, defer this value til next time
# if it is different
if changed_to != v:
self._bit_out_changes[k] = v
return
self._bit_outs[k] = v
# Notify the bus tables if they need to know
if k in self._bus_fields:
bus_changes[k] = v
# Add to the relevant Block changes dict
block_name, field_name = k.split(".", 1)
if block_name == "*METADATA":
if field_name.startswith("LABEL_"):
field_name, block_name = field_name.split("_", 1)
elif field_name == "LAYOUT" and v:
# Set the layout table first so that related fields are set after
# possible deletion
self._json_layout = json.loads("".join(v))
if self.layout.value.name:
# Only set the layout after the initial call to set_layout
self.set_layout(LayoutTable([], [], [], [], []))
return
else:
# Don't support any non-label metadata fields at the moment
return
block_changes.setdefault(block_name, {})[field_name] = v
def handle_changes(self, changes: Sequence[Tuple[str, str]]) -> None:
ts = TimeStamp()
# {block_name: {field_name: field_value}}
block_changes: Dict[str, Any] = {}
# {full_field: field_value}
bus_changes = {}
# Process bit outs that need changing
bit_out_changes = self._bit_out_changes
self._bit_out_changes = {}
for k, v in bit_out_changes.items():
self._bit_outs[k] = v
bus_changes[k] = v
block_name, field_name = k.split(".")
block_changes.setdefault(block_name, {})[field_name] = v
# Work out which change is needed for which block
for key, value in changes:
self._handle_change(key, value, bus_changes, block_changes, bit_out_changes)
# Notify the Blocks that they need to handle these changes
if bus_changes:
self.busses.handle_changes(bus_changes, ts)
for block_name, block_changes_values in block_changes.items():
self._child_controllers[block_name].handle_changes(block_changes_values, ts)
def set_layout(self, value):
if not value.name:
# Blank layout table means read from PandA provided json layout
# Called when PandA supplies a layout
x, y, visible = [], [], []
names = list(set(self.layout.value.name).union(self._json_layout))
for name in names:
visible.append(name in self._json_layout)
x.append(self._json_layout.get(name, {"x": 0.0})["x"])
y.append(self._json_layout.get(name, {"y": 0.0})["y"])
value = LayoutTable(names, names, x, y, visible)
super().set_layout(value)
old_json_layout = self._json_layout.copy()
for name, _, x, y, visible in self.layout.value.rows():
if visible:
self._json_layout[name] = dict(x=x, y=y)
else:
self._json_layout.pop(name, "")
if self._json_layout != old_json_layout:
# Custom encoding so the lines aren't too long and there aren't too many
lines = re.split(r'(?<=,) (?!"y")', json.dumps(self._json_layout))
self._client.set_table("*METADATA", "LAYOUT", lines)