import os
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
from annotypes import add_call_types
from ruamel.yaml import YAML, YAMLError
from scanpointgenerator import CompoundGenerator, LineGenerator
from malcolm.core import (
AbortedError,
NotWriteableError,
NumberMeta,
PartRegistrar,
StringMeta,
TimeoutError,
Widget,
config_tag,
)
from malcolm.modules import builtin
from malcolm.modules.builtin.parts import ChildPart
from malcolm.modules.scanning.util import RunnableStates
from ..hooks import AContext
# Pull re-used annotypes
APartName = builtin.parts.APartName
AMri = builtin.parts.AMri
class EntryType(Enum):
GENERATOR = 0
SCAN = 1
class GeneratorType(Enum):
LINE = 0
class RunnerStates(Enum):
IDLE = 0
LOADING = 1
CONFIGURED = 2
RUNNING = 3
FINISHED = 4
FAULT = 5
ABORTED = 6
class ScanOutcome(Enum):
SUCCESS = 0
TIMEOUT = 1
NOTWRITEABLE = 2
ABORTED = 3
MISCONFIGURED = 4
FAIL = 5
OTHER = 99
class Scan:
def __init__(
self, name: str, generator: CompoundGenerator, repeats: int = 1
) -> None:
self.name = name
self.generator = generator
self.repeats = repeats
[docs]class ScanRunnerPart(ChildPart):
"""Used to run sets of scans defined in a YAML file with a scan block"""
def __init__(self, name: APartName, mri: AMri) -> None:
super().__init__(name, mri, stateful=False, initial_visibility=True)
self.runner_config = None
self.context: Optional[AContext] = None
self.scan_sets: Dict[str, Scan] = {}
self.runner_state = StringMeta(
"Runner state", tags=Widget.TEXTUPDATE.tag()
).create_attribute_model("Idle")
self.runner_status_message = StringMeta(
"Runner status message", tags=Widget.TEXTUPDATE.tag()
).create_attribute_model("Idle")
self.scan_file = StringMeta(
"Path to input scan file", tags=[config_tag(), Widget.TEXTINPUT.tag()]
).create_attribute_model()
self.scans_configured = NumberMeta(
"int64", "Number of configured scans", tags=Widget.TEXTUPDATE.tag()
).create_attribute_model()
self.current_scan_set = StringMeta(
"Current scan set", tags=Widget.TEXTUPDATE.tag()
).create_attribute_model()
self.scans_completed = NumberMeta(
"int64", "Number of scans completed", tags=Widget.TEXTUPDATE.tag()
).create_attribute_model()
self.scan_successes = NumberMeta(
"int64", "Successful scans", tags=[Widget.TEXTUPDATE.tag()]
).create_attribute_model()
self.scan_failures = NumberMeta(
"int64", "Failed scans", tags=[Widget.TEXTUPDATE.tag()]
).create_attribute_model()
self.output_directory = StringMeta(
"Root output directory (will create a sub-directory inside)",
tags=[config_tag(), Widget.TEXTINPUT.tag()],
).create_attribute_model()
def setup(self, registrar: PartRegistrar) -> None:
super().setup(registrar)
# Register attributes
registrar.add_attribute_model(
"runnerState", self.runner_state, self.runner_state.set_value
)
registrar.add_attribute_model(
"runnerStatusMessage",
self.runner_status_message,
self.runner_status_message.set_value,
)
registrar.add_attribute_model(
"scanFile", self.scan_file, self.scan_file.set_value
)
registrar.add_attribute_model(
"scansConfigured", self.scans_configured, self.scans_configured.set_value
)
registrar.add_attribute_model(
"currentScanSet", self.current_scan_set, self.current_scan_set.set_value
)
registrar.add_attribute_model(
"scansCompleted", self.scans_completed, self.scans_completed.set_value
)
registrar.add_attribute_model(
"scanSuccesses", self.scan_successes, self.scan_successes.set_value
)
registrar.add_attribute_model(
"scanFailures", self.scan_failures, self.scan_failures.set_value
)
registrar.add_attribute_model(
"outputDirectory", self.output_directory, self.output_directory.set_value
)
# Methods
registrar.add_method_model(self.loadFile)
registrar.add_method_model(self.run, needs_context=True)
registrar.add_method_model(self.abort, needs_context=True)
def get_file_contents(self) -> str:
try:
with open(self.scan_file.value, "r") as input_file:
return input_file.read()
except IOError:
self.set_runner_state(RunnerStates.FAULT)
self.runner_status_message.set_value("Could not read scan file")
raise
def parse_yaml(self, string: str) -> Any:
try:
yaml = YAML(typ="safe", pure=True)
parsed_yaml = yaml.load(string)
return parsed_yaml
except YAMLError:
self.set_runner_state(RunnerStates.FAULT)
self.runner_status_message.set_value("Could not parse scan file")
raise
@staticmethod
def get_kwargs_from_dict(input_dict, kwargs_list):
kwargs = {}
if not isinstance(kwargs_list, list):
kwargs_list = [kwargs_list]
for kwarg in kwargs_list:
if kwarg in input_dict:
kwargs[kwarg] = input_dict[kwarg]
return kwargs
@staticmethod
def parse_compound_generator(entry: dict) -> CompoundGenerator:
generators = []
generators_dict = entry["generators"]
for generator in generators_dict:
generators.append(LineGenerator.from_dict(generator["line"]))
entry["generators"] = generators
compound_generator = CompoundGenerator.from_dict(entry)
if compound_generator.duration <= 0.0:
raise ValueError(
"Negative generator duration - is it missing from the YAML?"
)
return compound_generator
def parse_scan(self, entry: dict) -> None:
name = entry["name"]
generator = self.parse_compound_generator(entry["generator"])
kwargs = self.get_kwargs_from_dict(entry, "repeats")
self.scan_sets[name] = Scan(name, generator, **kwargs)
@staticmethod
def get_current_datetime(time_separator: str = ":") -> str:
return datetime.now().strftime(
f"%Y-%m-%d-%H{time_separator}%M{time_separator}%S"
)
# noinspection PyPep8Naming
def loadFile(self) -> None:
# Update state
self.set_runner_state(RunnerStates.LOADING)
self.runner_status_message.set_value("Loading scan file")
# Read contents of file into string
file_contents = self.get_file_contents()
# Parse the string
parsed_yaml = self.parse_yaml(file_contents)
# Empty the current dictionaries
self.scan_sets = {}
# Parse the configuration
for item in parsed_yaml:
key_name = list(item.keys())[0].upper()
if key_name == EntryType.SCAN.name:
self.parse_scan(item["scan"])
else:
self.set_runner_state(RunnerStates.FAULT)
self.runner_status_message.value = "Unidentified key in YAML"
raise ValueError(f"Unidentified object in YAML: {key_name}")
# Count the number of scans configured
self.update_scans_configured()
self.set_runner_state(RunnerStates.CONFIGURED)
self.runner_status_message.set_value("Load complete")
def update_scans_configured(self) -> None:
number_of_scans = 0
for key in self.scan_sets:
number_of_scans += self.scan_sets[key].repeats
self.scans_configured.set_value(number_of_scans)
def create_directory(self, directory: str) -> None:
try:
os.mkdir(directory)
except OSError:
self.set_runner_state(RunnerStates.FAULT)
self.runner_status_message.set_value("Could not create directory")
raise IOError(f"ERROR: unable to create directory: {directory}")
def create_and_get_sub_directory(self, root_directory: str) -> str:
today_str = self.get_current_datetime(time_separator="-")
sub_directory = f"{root_directory}/{self.mri}-{today_str}"
self.create_directory(sub_directory)
return sub_directory
def get_root_directory(self):
root_directory = self.output_directory.value
if root_directory[-1] == "/":
root_directory = root_directory[:-1]
return root_directory
[docs] @add_call_types
def abort(self, context: AContext) -> None:
if self.context:
# Stop the context
self.context.stop()
# Stop the current scan
context.block_view(self.mri).abort()
# Update status
self.set_runner_state(RunnerStates.ABORTED)
self.runner_status_message.set_value("Aborted scans")
[docs] @add_call_types
def run(self, context: AContext) -> None:
# Check that we have loaded some scan sets
if len(self.scan_sets) == 0:
self.runner_status_message.set_value("No scan file loaded")
raise ValueError("No scan sets configured. Have you loaded a YAML file?")
# Root file directory
root_directory = self.get_root_directory()
# Sub-directory to create for this run
sub_directory = self.create_and_get_sub_directory(root_directory)
# Top-level report filepath
report_filepath = f"{sub_directory}/report.txt"
# Reset counters and set state
self.scans_completed.set_value(0)
self.scan_successes.set_value(0)
self.scan_failures.set_value(0)
self.set_runner_state(RunnerStates.RUNNING)
# Get our scan block and store context
self.context = context
scan_block = self.context.block_view(self.mri)
# Cycle through the scan sets
for key in self.scan_sets:
self.run_scan_set(
self.scan_sets[key], scan_block, sub_directory, report_filepath
)
self.set_runner_state(RunnerStates.FINISHED)
self.current_scan_set.set_value("")
self.runner_status_message.set_value("Scans complete")
def create_and_get_set_directory(self, sub_directory: str, set_name: str) -> str:
set_directory = f"{sub_directory}/scanset-{set_name}"
self.create_directory(set_directory)
return set_directory
def run_scan_set(
self,
scan_set: Scan,
scan_block: Any,
sub_directory: str,
report_filepath: str,
) -> None:
# Update scan set
self.current_scan_set.set_value(scan_set.name)
# Directory where to save scans for this set
set_directory = self.create_and_get_set_directory(sub_directory, scan_set.name)
# Run each scan
for scan_number in range(1, scan_set.repeats + 1):
self.run_scan(
scan_set.name,
scan_block,
set_directory,
scan_number,
report_filepath,
scan_set.generator,
)
def create_and_get_scan_directory(
self, set_directory: str, scan_number: int
) -> str:
scan_directory = f"{set_directory}/scan-{scan_number}"
self.create_directory(scan_directory)
return scan_directory
@staticmethod
def scan_is_aborting(scan_block):
return scan_block.state.value is RunnableStates.ABORTING
def run_scan(
self,
set_name: str,
scan_block: Any,
set_directory: str,
scan_number: int,
report_filepath: str,
generator: CompoundGenerator,
) -> None:
self.runner_status_message.set_value(f"Running {set_name}: {scan_number}")
assert self.context, "No context found"
# Make individual scan directory
scan_directory = self.create_and_get_scan_directory(set_directory, scan_number)
# Check if scan can be reset or run
while self.scan_is_aborting(scan_block):
self.context.sleep(0.1)
# Run the scan and capture the outcome
if scan_block.state.value is not RunnableStates.READY:
scan_block.reset()
# Configure first
outcome = None
try:
scan_block.configure(generator, fileDir=scan_directory)
except AssertionError:
outcome = ScanOutcome.MISCONFIGURED
except Exception as e:
outcome = ScanOutcome.MISCONFIGURED
self.log.error(
f"Unhandled exception for scan {scan_number} in {set_name}: "
f"({type(e)}) {e}"
)
# Run if configure was successful
start_time = self.get_current_datetime()
if outcome is None:
try:
scan_block.run()
except TimeoutError:
outcome = ScanOutcome.TIMEOUT
except NotWriteableError:
outcome = ScanOutcome.NOTWRITEABLE
except AbortedError:
outcome = ScanOutcome.ABORTED
except AssertionError:
outcome = ScanOutcome.FAIL
except Exception as e:
outcome = ScanOutcome.OTHER
self.log.error(
(
f"Unhandled exception for scan {scan_number} in {set_name}: "
f"({type(e)}) {e}"
)
)
else:
outcome = ScanOutcome.SUCCESS
# Record the outcome
end_time = self.get_current_datetime()
report_string = self.get_report_string(
set_name, scan_number, outcome, start_time, end_time
)
self.add_report_line(report_filepath, report_string)
if outcome is ScanOutcome.SUCCESS:
self.increment_scan_successes()
else:
self.increment_scan_failures()
def increment_scan_successes(self):
self.scan_successes.set_value(self.scan_successes.value + 1)
self.increment_scans_completed()
def increment_scan_failures(self):
self.scan_failures.set_value(self.scan_failures.value + 1)
self.increment_scans_completed()
def increment_scans_completed(self):
self.scans_completed.set_value(self.scans_completed.value + 1)
def get_report_string(
self,
set_name: str,
scan_number: int,
scan_outcome: ScanOutcome,
start_time: str,
end_time: str,
) -> str:
report_str = (
f"{set_name:<30}{scan_number:<10}{self.get_enum_label(scan_outcome):<14}"
f"{start_time:<20}{end_time}"
)
return report_str
def add_report_line(self, report_filepath: str, report_string: str) -> None:
try:
with open(report_filepath, "a+") as report_file:
report_file.write(f"{report_string}\n")
except IOError:
self.set_runner_state(RunnerStates.FAULT)
self.runner_status_message.set_value("Error writing report file")
raise IOError(f"Could not write to report file {report_filepath}")
@staticmethod
def get_enum_label(enum_state: Enum) -> str:
return enum_state.name.capitalize()
def set_runner_state(self, runner_state: RunnerStates) -> None:
self.runner_state.set_value(self.get_enum_label(runner_state))