import collections
import os
from shutil import copyfile
from typing import TYPE_CHECKING, Optional
import h5py
import numpy as np
from annotypes import Anno, add_call_types
from malcolm.core import APartName, Info, PartRegistrar
from malcolm.modules import builtin, pandablocks, pmac, scanning
from malcolm.modules.builtin.util import LayoutTable
if TYPE_CHECKING:
    from typing import Dict, List

    # Type alias for hook part_info arguments (presumably keyed by part
    # name — verify against scanning.hooks)
    PartInfo = Dict[str, List[Info]]

# Pull re-used annotypes into our namespace in case we are subclassed
APartName = APartName
AMri = builtin.parts.AMri

with Anno("name of CS port"):
    ACsPort = str
with Anno("mri suffix of malcolm CS block [$(pmac_mri):$(suffix)]"):
    ACsMriSuffix = str
with Anno("mri suffix of malcolm Status block [$(pmac_mri):$(suffix)]"):
    AStatusMriSuffix = str

# Where a P variable dataset lives (HDF5 path within a file) and the
# dataset name Savu should see it as (e.g. "p1mean", "p1min", "p1max")
PVar = collections.namedtuple("PVar", "path file p_number")
def MRES_VAR(axis):
    """Return the pmac P variable name that holds the MRES for *axis*."""
    return f"P48{axis:02d}"
def OFFSET_VAR(axis):
    """Return the pmac P variable name that holds the offset for *axis*."""
    return f"P49{axis:02d}"
# We will set these attributes on the child block, so don't save them
class KinematicsSavuPart(builtin.parts.ChildPart):
    """Part for writing out files to send to Savu for post processing
    of forward kinematics.

    Creates the following files:

    - <ID>-savu.nxs - Input data file for Savu. Links to Panda data, and
      datasets which contain the kinematics code and variables.
    - <ID>-savu_pl.nxs - Savu process list, copied from /kinematics directory
    - <ID>-vds.nxs - VDS file linking to Savu processed data (when processed)
    """
def __init__(
    self,
    name: APartName,
    mri: AMri,
    cs_port: ACsPort = None,
    cs_mri_suffix: ACsMriSuffix = ":CS",
    status_mri_suffix: AStatusMriSuffix = ":STATUS",
) -> None:
    """Initialise the part; all scan state starts empty.

    Args:
        name: Part name.
        mri: Malcolm resource id of the child block.
        cs_port: Name of the CS port, or None.
        cs_mri_suffix: Suffix of the CS block mri.
            NOTE(review): default ":CS" already contains a colon but
            on_post_configure joins it with another ":" — confirm intended.
        status_mri_suffix: Suffix of the Status block mri.
            NOTE(review): stored but never read in this chunk;
            on_post_configure hardcodes ":STATUS" instead — confirm.
    """
    super(KinematicsSavuPart, self).__init__(name, mri, stateful=False)
    # Paths of the files we generate; filled in before create_files runs
    self.nxs_full_filename = ""
    self.vds_full_filename = ""
    self.savu_pl_filename = ""
    self.savu_full_filename = ""
    # Forward kinematics program lines and the NAME -> VALUE input variables
    self.savu_code_lines: List[str] = []
    self.savu_variables: Dict[str, str] = {}
    # Maps raw Q numbers to axis dataset names for the VDS file
    self.q_value_mapping: Dict[int, str] = {}
    # External P value datasets to link into the -savu.nxs file
    self.p_vars: List[PVar] = []
    # True while every axis has min/max datasets as well as mean;
    # cleared in on_post_configure if any axis lacks them
    self.use_min_max = True
    self.savu_file = None
    self.layout_table: Optional[LayoutTable] = None
    # PandA position-bus table (datasetName/scale/offset columns are read
    # by check_mres_and_pos); set elsewhere — not visible in this chunk
    self.pos_table = None
    self.cs_port = cs_port
    self.cs_mri_suffix = cs_mri_suffix
    self.status_mri_suffix = status_mri_suffix
    # Scan shape tuple, used for the VDS virtual layout
    self.shape = None
    self.pmac_mri: str = ""
    self.panda_mri: str = ""
    # Maps scannable name -> CS axis number
    self.axis_numbers: Dict[str, int] = {}
    self.generator = None
def setup(self, registrar: PartRegistrar) -> None:
    """Register hooks and report the extra configure parameters."""
    super(KinematicsSavuPart, self).setup(registrar)
    # Tell the controller to expose some extra configure parameters
    # (on_configure is defined elsewhere — not visible in this chunk)
    registrar.report(scanning.hooks.ConfigureHook.create_info(self.on_configure))
    # Hooks
    registrar.hook(scanning.hooks.ConfigureHook, self.on_configure)
    registrar.hook(scanning.hooks.PostConfigureHook, self.on_post_configure)

# Allow CamelCase as these parameters will be serialized
# noinspection PyPep8Naming
@add_call_types
def on_post_configure(
    self, context: scanning.hooks.AContext, part_info: scanning.hooks.APartInfo
) -> None:
    """Gather dataset locations and kinematics code, then write files.

    For each scannable axis, record where its mean (and, if present,
    min/max) position datasets live so create_files can link them into
    the Savu input file; then read the forward kinematics program and
    its I/P/M/Q input variables from the pmac blocks, parse them, and
    create the output files.
    """
    # Map these in the file
    dataset_infos = scanning.infos.DatasetProducedInfo.filter_values(part_info)
    for scannable, axis_num in self.axis_numbers.items():
        min_i, max_i, value_i = None, None, None
        # Find the min/max/value dataset infos for this scannable
        for info in dataset_infos:
            if info.name.startswith(scannable + "."):
                if info.type == scanning.infos.DatasetType.POSITION_MIN:
                    min_i = info
                elif info.type == scanning.infos.DatasetType.POSITION_MAX:
                    max_i = info
                elif info.type == scanning.infos.DatasetType.POSITION_VALUE:
                    value_i = info
        # Always make sure .value is there
        assert value_i, f"No value dataset for {scannable}"
        self.p_vars.append(
            PVar(
                path=value_i.path,
                file=value_i.filename,
                p_number="p%dmean" % axis_num,
            )
        )
        if min_i and max_i:
            self.p_vars.append(
                PVar(
                    path=min_i.path,
                    file=min_i.filename,
                    p_number="p%dmin" % axis_num,
                )
            )
            self.p_vars.append(
                PVar(
                    path=max_i.path,
                    file=max_i.filename,
                    p_number="p%dmax" % axis_num,
                )
            )
        else:
            # Any axis missing min/max downgrades the whole scan to
            # mean-only processing
            self.use_min_max = False
    # Get Forward Kinematics code lines and I,P,M,Q input variables
    # NOTE(review): hardcodes ":STATUS" rather than using
    # self.status_mri_suffix — confirm which is intended
    pmac_status_child = context.block_view(self.pmac_mri + ":STATUS")
    raw_input_vars = " ".join(
        [
            pmac_status_child.iVariables.value,
            pmac_status_child.pVariables.value,
            pmac_status_child.mVariables.value,
        ]
    )
    pmac_cs_child = context.block_view(self.pmac_mri + ":" + self.cs_mri_suffix)
    raw_kinematics_program_code = pmac_cs_child.forwardKinematic.value
    raw_input_vars += " " + pmac_cs_child.qVariables.value
    self.savu_code_lines = raw_kinematics_program_code.splitlines()
    self.parse_input_variables(raw_input_vars)
    self.create_files()
def check_mres_and_pos(self, split_var):
    """Neutralise MRES/offset P variables already applied by the PandA.

    ``split_var`` is a mutable ``[name, value]`` pair. If the name is an
    axis MRES variable and the PandA pos-bus already scales that axis
    (scale != 1.0), the value is forced to "1.0" so the scale is not
    applied twice; likewise an offset variable is forced to "0.0" when
    the PandA applies a non-zero offset. Returns the (possibly
    modified) pair.
    """
    var_name = split_var[0]
    for scannable, axis_num in self.axis_numbers.items():
        row = self.pos_table.datasetName.index(scannable)
        if var_name == MRES_VAR(axis_num):
            # Scale already applied in the PandA -> don't apply MRES again
            if self.pos_table.scale[row] != 1.0:
                split_var[1] = "1.0"
        elif var_name == OFFSET_VAR(axis_num):
            # Offset already applied in the PandA -> don't apply it again
            if self.pos_table.offset[row] != 0.0:
                split_var[1] = "0.0"
    return split_var
def parse_input_variables(self, raw_input_vars):
try:
for var in raw_input_vars.split(" "):
if var:
split_var = var.split("=")
# ignore any values in hex
if not split_var[1].startswith("$"):
split_var = self.check_mres_and_pos(split_var)
self.savu_variables[split_var[0]] = split_var[1]
except IndexError:
raise ValueError(
f"Error getting kinematic input variables from {raw_input_vars}"
)
def create_files(self):
    """Create the files that will be used by Savu

    - <ID>-savu.nxs - Input data file for Savu. Links to Panda data, and
      datasets which contain the kinematics code and variables, and
      whether to use min, mean and max datasets, or just the mean.
    - <ID>-savu_pl.nxs - Savu process list
    - <ID>-kinematics-vds.nxs - VDS file linking to Savu processed data
    """
    # Create the -savu.nxs file which contains the input data for Savu
    with h5py.File(self.nxs_full_filename, "w", libver="latest") as savu_file:
        savu_file.attrs["default"] = "entry"
        nxentry = savu_file.create_group("entry")
        nxentry.attrs["NX_class"] = "NXentry"
        nxentry.attrs["default"] = "inputs"
        nxcollection = nxentry.create_group("inputs")
        nxcollection.attrs["NX_class"] = "NXcollection"
        # Program code lines dataset (one variable-length string per line)
        program_dset = nxcollection.create_dataset(
            "program", (len(self.savu_code_lines),), h5py.special_dtype(vlen=str)
        )
        program_dset[...] = self.savu_code_lines
        program_dset.attrs["long_name"] = "Kinematic Program lines"
        # Fixed variables dataset: compound (Name, Value) records
        comp_type = np.dtype(
            [("Name", h5py.special_dtype(vlen=str)), ("Value", "f")]
        )
        data = np.array(list(self.savu_variables.items()), dtype=comp_type)
        variables_dset = nxcollection.create_dataset(
            "variables", (len(data),), comp_type
        )
        variables_dset.attrs["long_name"] = "Fixed program variables"
        variables_dset[...] = data
        # Use MinMax dataset: single boolean flag
        minmax_data = np.array([self.use_min_max])
        minmax_dset = nxcollection.create_dataset("use_minmax", data=minmax_data)
        minmax_dset.attrs["long_name"] = "Use min and max dataset"
        # Link to external P values collected in on_post_configure
        for p_var in self.p_vars:
            savu_file["/entry/inputs/" + p_var.p_number] = h5py.ExternalLink(
                p_var.file, p_var.path
            )
    # Create Savu plugin list file, copied from the kinematics directory
    # that sits next to this module's package directory
    src = os.path.realpath(__file__)
    src = os.path.dirname(src)
    if self.use_min_max:
        kinematics_file = "min_mean_max.nxs"
    else:
        kinematics_file = "only_mean.nxs"
    src = os.path.join(src, "..", "kinematics", kinematics_file)
    copyfile(src, self.savu_pl_filename)
    # Create the finished VDS file which links to the processed Savu data
    self.create_vds_file()
def create_vds_file(self):
    """Create the VDS file that points to the processed savu files.

    Assumes that savu is called with the argument to specify the location
    of the processed data is in a data folder with the suffix '-savuproc'
    """
    # Savu result has 9 Q values stacked ahead of the scan shape
    # (assumes self.shape is a tuple set before this runs — not visible
    # in this chunk)
    virtual_shape = (9,) + self.shape
    with h5py.File(self.vds_full_filename, "w", libver="latest") as f:
        f.require_group("/entry/")
        if self.use_min_max:
            datatypes = ["min", "mean", "max"]
        else:
            datatypes = ["mean"]
        for datatype in datatypes:
            for i in range(9):
                # One virtual dataset per Q value, sliced out of the
                # Savu result dataset for this datatype
                layout = h5py.VirtualLayout(shape=self.shape, dtype=np.float64)
                v_source = h5py.VirtualSource(
                    self.savu_full_filename,
                    f"/entry/final_result_q{datatype}/data",
                    shape=virtual_shape,
                )
                layout[:] = v_source[i]
                # Use axis name if have it, otherwise use raw Q number
                if i + 1 in self.q_value_mapping:
                    f.create_virtual_dataset(
                        "/entry/" + self.q_value_mapping[i + 1] + "." + datatype,
                        layout,
                        fillvalue=-1,
                    )
                else:
                    f.create_virtual_dataset(
                        "/entry/rawQ" + "%02d" % (i + 1) + "." + datatype,
                        layout,
                        fillvalue=-1,
                    )
        # Add any setpoint dimensions from the scan generator
        for dim in self.generator.dimensions:
            for axis in dim.axes:
                f.create_dataset(
                    name=f"/entry/{axis}_set/{axis}.value_set",
                    dtype=np.float64,
                    data=[p for p in dim.get_positions(axis)],
                )