import os
import time
from typing import Dict, Iterator, List, Optional
from xml.etree import ElementTree as ET
from annotypes import Anno, add_call_types
from scanpointgenerator import CompoundGenerator, Dimension
from malcolm.compat import et_to_string
from malcolm.core import (
APartName,
Block,
BooleanMeta,
Future,
Info,
PartRegistrar,
TimeoutError,
Widget,
config_tag,
)
from malcolm.modules import builtin, scanning
from ..infos import (
CalculatedNDAttributeDatasetInfo,
FilePathTranslatorInfo,
NDArrayDatasetInfo,
NDAttributeDatasetInfo,
)
from ..util import FRAME_TIMEOUT, APartRunsOnWindows, make_xml_filename

PartInfo = Dict[str, List[Info]]
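
# Suffixes for the HDF writer's 10 dimension attributes: N, X, Y, then 3-9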
SUFFIXES = "NXY3456789"

with Anno("Toggle writing of all ND attributes to HDF file"):
    AWriteAllNDAttributes = bool

# Pull re-used annotypes into our namespace in case we are subclassed
APartName = APartName
AMri = builtin.parts.AMri
APartRunsOnWindows = APartRunsOnWindows


def greater_than_zero(v: int) -> bool:
    return v > 0


def create_dataset_infos(
name: str,
part_info: scanning.hooks.APartInfo,
generator: CompoundGenerator,
filename: str,
) -> Iterator[Info]:
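    """Yield a DatasetProducedInfo for each dataset the HDF writer will write,
    recording its name, type, rank and path within the HDF5 file."""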
# Update the dataset table
uniqueid = "/entry/NDAttributes/NDArrayUniqueId"
generator_rank = len(generator.dimensions)
    # Get the primary source of detector data, if one was reported
ndarray_infos: List[NDArrayDatasetInfo] = NDArrayDatasetInfo.filter_values(
part_info
)
assert len(ndarray_infos) in (0, 1), (
"More than one NDArrayDatasetInfo defined %s" % ndarray_infos
)
    # Default detector rank is 2 (a 2D image per point)
detector_rank = 2
# Add the primary datasource
if ndarray_infos:
ndarray_info = ndarray_infos[0]
detector_rank = ndarray_info.rank
yield scanning.infos.DatasetProducedInfo(
name="%s.data" % name,
filename=filename,
type=scanning.util.DatasetType.PRIMARY,
rank=detector_rank + generator_rank,
path="/entry/detector/detector",
uniqueid=uniqueid,
)
# Add any secondary datasources
calculated_infos: List[
CalculatedNDAttributeDatasetInfo
] = CalculatedNDAttributeDatasetInfo.filter_values(part_info)
for calculated_info in calculated_infos:
yield scanning.infos.DatasetProducedInfo(
name="%s.%s" % (name, calculated_info.name),
filename=filename,
type=scanning.util.DatasetType.SECONDARY,
rank=detector_rank + generator_rank,
path="/entry/%s/%s" % (calculated_info.name, calculated_info.name),
uniqueid=uniqueid,
)
# Add all the other datasources
dataset_infos: List[NDAttributeDatasetInfo] = NDAttributeDatasetInfo.filter_values(
part_info
)
for dataset_info in dataset_infos:
yield scanning.infos.DatasetProducedInfo(
name=dataset_info.name,
filename=filename,
type=dataset_info.type,
# All attributes share the same rank as the detector image
rank=detector_rank + generator_rank,
path=f"/entry/{dataset_info.name}/{dataset_info.name}",
uniqueid=uniqueid,
)
    # Add any setpoint dimensions
    for axis in generator.axes:
        yield scanning.infos.DatasetProducedInfo(
            name="%s.value_set" % axis,
            filename=filename,
            type=scanning.util.DatasetType.POSITION_SET,
            rank=1,
            path="/entry/detector/%s_set" % axis,
            uniqueid="",
        )


def set_dimensions(child: Block, generator: CompoundGenerator) -> List[Future]:
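    """Set the HDF writer's dimension attributes from the generator shape,
    returning futures for the asynchronous attribute puts."""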
num_dims = len(generator.dimensions)
assert num_dims <= 10, "Can only do 10 dims, you gave me %s" % num_dims
attr_dict: Dict = dict(numExtraDims=num_dims - 1)
# Fill in dim name and size
# NOTE: HDF writer has these filled with fastest moving first
# while dimensions is slowest moving first
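    # e.g. with 2 dims: posNameDimN="d1" (fastest moving), posNameDimX="d0" (slowest)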
for i in range(10):
suffix = SUFFIXES[i]
if i < num_dims:
forward_i = num_dims - i - 1
index_name = "d%d" % forward_i
index_size = generator.dimensions[forward_i].size
else:
index_name = ""
index_size = 1
attr_dict["posNameDim%s" % suffix] = index_name
attr_dict["extraDimSize%s" % suffix] = index_size
futures = child.put_attribute_values_async(attr_dict)
return futures


def make_set_points(
dimension: Dimension, axis: str, data_el: ET.Element, units: str
) -> None:
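    """Add a constant dataset of setpoint positions (and units) for an axis."""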
axis_vals = ["%.12g" % p for p in dimension.get_positions(axis)]
axis_el = ET.SubElement(
data_el,
"dataset",
name="%s_set" % axis,
source="constant",
type="float",
value=",".join(axis_vals),
)
if units:
ET.SubElement(
axis_el,
"attribute",
name="units",
source="constant",
value=units,
type="string",
)


def make_nxdata(
name: str,
rank: int,
entry_el: ET.Element,
generator: CompoundGenerator,
link: bool = False,
) -> ET.Element:
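    """Make an NXdata group with signal/axes attributes for the named dataset."""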
# Make a dataset for the data
data_el = ET.SubElement(entry_el, "group", name=name)
ET.SubElement(
data_el,
"attribute",
name="signal",
source="constant",
value=name,
type="string",
)
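    # Build the NeXus "axes" attribute: one entry per scan dimension (the
    # setpoint dataset for single-axis dimensions, else "."), then "." for
    # each detector frame dimension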
pad_dims = []
for d in generator.dimensions:
if len(d.axes) == 1:
pad_dims.append("%s_set" % d.axes[0])
else:
pad_dims.append(".")
pad_dims += ["."] * rank
ET.SubElement(
data_el,
"attribute",
name="axes",
source="constant",
value=",".join(pad_dims),
type="string",
)
ET.SubElement(
data_el,
"attribute",
name="NX_class",
source="constant",
value="NXdata",
type="string",
)
# Add in the indices into the dimensions array that our axes refer to
for i, d in enumerate(generator.dimensions):
for axis in d.axes:
ET.SubElement(
data_el,
"attribute",
name="%s_set_indices" % axis,
source="constant",
value=str(i),
type="string",
)
if link:
ET.SubElement(
data_el,
"hardlink",
name="%s_set" % axis,
target="/entry/detector/%s_set" % axis,
)
else:
make_set_points(d, axis, data_el, generator.units[axis])
return data_el


def make_layout_xml(
generator: CompoundGenerator,
part_info: scanning.hooks.APartInfo,
write_all_nd_attributes: bool = False,
) -> str:
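    """Build the layout XML that tells the HDF writer where each dataset goes."""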
# Make a root element with an NXEntry
root_el = ET.Element("hdf5_layout", auto_ndattr_default="false")
entry_el = ET.SubElement(root_el, "group", name="entry")
ET.SubElement(
entry_el,
"attribute",
name="NX_class",
source="constant",
value="NXentry",
type="string",
)
# Check that there is only one primary source of detector data
ndarray_infos: List[NDArrayDatasetInfo] = NDArrayDatasetInfo.filter_values(
part_info
)
if not ndarray_infos:
# Still need to put the data in the file, so manufacture something
primary_rank = 2
else:
primary_rank = ndarray_infos[0].rank
# Make an NXData element with the detector data in it in
# /entry/detector/detector
data_el = make_nxdata("detector", primary_rank, entry_el, generator)
det_el = ET.SubElement(
data_el, "dataset", name="detector", source="detector", det_default="true"
)
ET.SubElement(
det_el,
"attribute",
name="NX_class",
source="constant",
value="SDS",
type="string",
)
# Now add any calculated sources of data
calc_dataset_infos: List[
CalculatedNDAttributeDatasetInfo
] = CalculatedNDAttributeDatasetInfo.filter_values(part_info)
for calc_dataset_info in calc_dataset_infos:
# if we are a secondary source, use the same rank as the det
attr_el = make_nxdata(
calc_dataset_info.name, primary_rank, entry_el, generator, link=True
)
ET.SubElement(
attr_el,
"dataset",
name=calc_dataset_info.name,
source="ndattribute",
ndattribute=calc_dataset_info.attr,
)
# And then any other attribute sources of data
dataset_infos: List[NDAttributeDatasetInfo] = NDAttributeDatasetInfo.filter_values(
part_info
)
for dataset_info in dataset_infos:
# if we are a secondary source, use the same rank as the det
attr_el = make_nxdata(
dataset_info.name, primary_rank, entry_el, generator, link=True
)
ET.SubElement(
attr_el,
"dataset",
name=dataset_info.name,
source="ndattribute",
ndattribute=dataset_info.attr,
)
# Add a group for attributes
ndattr_default = "true" if write_all_nd_attributes else "false"
nd_attributes_el = ET.SubElement(
entry_el, "group", name="NDAttributes", ndattr_default=ndattr_default
)
ET.SubElement(
nd_attributes_el,
"attribute",
name="NX_class",
source="constant",
value="NXcollection",
type="string",
)
ET.SubElement(
nd_attributes_el,
"dataset",
name="NDArrayUniqueId",
source="ndattribute",
ndattribute="NDArrayUniqueId",
)
ET.SubElement(
nd_attributes_el,
"dataset",
name="NDArrayTimeStamp",
source="ndattribute",
ndattribute="NDArrayTimeStamp",
)
xml = et_to_string(root_el)
return xml


# We will set these attributes on the child block, so don't save them
@builtin.util.no_save(
"positionMode",
"enableCallbacks",
"fileWriteMode",
"swmrMode",
"storeAttr",
"dimAttDatasets",
"lazyOpen",
"arrayCounter",
"filePath",
"fileName",
"fileTemplate",
"numExtraDims",
"flushAttrPerNFrames",
"xmlLayout",
"flushDataPerNFrames",
"numCapture",
)
@builtin.util.no_save("posNameDim%s" % SUFFIXES[i] for i in range(10))
@builtin.util.no_save("extraDimSize%s" % SUFFIXES[i] for i in range(10))
class HDFWriterPart(builtin.parts.ChildPart):
"""Part for controlling an `hdf_writer_block` in a Device"""
def __init__(
self,
name: APartName,
mri: AMri,
runs_on_windows: APartRunsOnWindows = False,
write_all_nd_attributes: AWriteAllNDAttributes = True,
) -> None:
super().__init__(name, mri)
# Future for the start action
self.start_future: Optional[Future] = None
self.array_future: Optional[Future] = None
self.done_when_reaches = 0
# This is when uniqueId last updated
self.last_id_update: Optional[float] = None
# CompletedSteps = arrayCounter + self.uniqueid_offset
self.uniqueid_offset = 0
# The HDF5 layout file we write to say where the datasets go
self.layout_filename: Optional[str] = None
self.runs_on_windows = runs_on_windows
# How long to wait between frame updates before error
self.frame_timeout = 0.0
self.write_all_nd_attributes = BooleanMeta(
"Toggles whether all NDAttributes are written to "
"file, or only those specified in the dataset",
writeable=True,
tags=[Widget.CHECKBOX.tag(), config_tag()],
).create_attribute_model(write_all_nd_attributes)

    @add_call_types
def on_reset(self, context: scanning.hooks.AContext) -> None:
super().on_reset(context)
self.on_abort(context)
        # The HDFWriter might still be writing, so stop() doesn't guarantee all
        # frames have been flushed. start_future is in a different context so
        # we can't wait for it; just wait for the running attribute to be False
child = context.block_view(self.mri)
child.when_value_matches("running", False)
# Delete the layout XML file
if self.layout_filename and os.path.isfile(self.layout_filename):
os.remove(self.layout_filename)
child.xmlLayout.put_value("")

    def setup(self, registrar: PartRegistrar) -> None:
super().setup(registrar)
# Hooks
registrar.hook(scanning.hooks.ConfigureHook, self.on_configure)
registrar.hook(
(scanning.hooks.PostRunArmedHook, scanning.hooks.SeekHook), self.on_seek
)
registrar.hook(scanning.hooks.RunHook, self.on_run)
registrar.hook(scanning.hooks.PostRunReadyHook, self.on_post_run_ready)
registrar.hook(scanning.hooks.AbortHook, self.on_abort)
# Attributes
registrar.add_attribute_model(
"writeAllNdAttributes",
self.write_all_nd_attributes,
self.write_all_nd_attributes.set_value,
)
# Tell the controller to expose some extra configure parameters
registrar.report(scanning.hooks.ConfigureHook.create_info(self.on_configure))

    # Allow CamelCase as these parameters will be serialized
    # noinspection PyPep8Naming
    def _check_xml_is_valid(self, child):
assert child.xmlLayoutValid.value, "%s: invalid XML layout file (%s)" % (
self.mri,
child.xmlErrorMsg.value,
)

    @add_call_types
def on_seek(
self,
completed_steps: scanning.hooks.ACompletedSteps,
steps_to_do: scanning.hooks.AStepsToDo,
) -> None:
# This is rewinding or setting up for another batch, so the detector
# will skip to a uniqueID that has not been produced yet
self.uniqueid_offset = completed_steps - self.done_when_reaches
self.done_when_reaches += steps_to_do

    @add_call_types
def on_run(self, context: scanning.hooks.AContext) -> None:
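        """Wait for the writer to reach the target uniqueId, flushing as we go."""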
context.wait_all_futures(self.array_future)
context.unsubscribe_all()
self.last_id_update = None
child = context.block_view(self.mri)
child.uniqueId.subscribe_value(self.update_completed_steps)
f_done = child.when_value_matches_async("uniqueId", self.done_when_reaches)
while True:
try:
context.wait_all_futures(f_done, timeout=1)
except TimeoutError:
# This is ok, means we aren't done yet, so flush
self._flush_if_still_writing(child)
# Check it hasn't been too long
                if (
                    self.last_id_update
                    and time.time() > self.last_id_update + self.frame_timeout
                ):
                    raise TimeoutError(
                        "HDF writer stalled, last updated at %s"
                        % self.last_id_update
                    )
# TODO: what happens if we miss the last frame?
else:
return

    def _flush_if_still_writing(self, child):
        assert self.start_future, "No start future"
        # Check that the start_future hasn't errored
        if self.start_future.done():
# This will raise if it errored
self.start_future.result()
else:
# Flush the hdf frames to disk
child.flushNow()

    @add_call_types
def on_post_run_ready(self, context: scanning.hooks.AContext) -> None:
# Do one last flush and then we're done
child = context.block_view(self.mri)
self._flush_if_still_writing(child)

    @add_call_types
def on_abort(self, context: scanning.hooks.AContext) -> None:
child = context.block_view(self.mri)
child.stop()

    def update_completed_steps(self, value: int) -> None:
completed_steps = value + self.uniqueid_offset
self.last_id_update = time.time()
assert self.registrar, "No registrar assigned"
        # Stop negative values being reported on the first subscription
        # callback when we have a non-zero offset
if completed_steps >= 0:
self.registrar.report(scanning.infos.RunProgressInfo(completed_steps))