Source code for virtac.virtac_server

"""Contains the VirtacServer class which creates and manages the PV interface to the
VIRTAC. It also provides various public methods to interact with Virtac"""

import csv
import logging
import typing
from collections import defaultdict
from enum import StrEnum
from pathlib import Path
from typing import cast

import atip
import numpy
import pytac

from .pv import (
    BasePV,
    CollationPV,
    InversionPV,
    MonitorPV,
    ReadSimPV,
    ReadWriteSimPV,
    RecordData,
    RecordTypes,
    RecordValueType,
    RefreshPV,
    SummationPV,
)

LimitsDictType = dict[str, tuple[str, str, str, str, str, str]]


class MirrorType(StrEnum):
    BASIC = "basic"
    INVERSE = "inverse"
    SUMMATE = "summate"
    COLLATE = "collate"


MIRROR_TYPES = {
    MirrorType.BASIC: MonitorPV,
    MirrorType.INVERSE: InversionPV,
    MirrorType.SUMMATE: SummationPV,
    MirrorType.COLLATE: CollationPV,
}


[docs] class VirtacServer: """The soft-ioc server which contains the configuration and PVs for the VIRTAC. It allows ATIP to be interfaced using EPICS in the same manner as the live machine. Attributes: lattice (pytac.lattice.Lattice): An instance of a Pytac lattice with a simulator data source derived from pyAT. """ def __init__( self, ring_mode: str, limits_csv: Path | None = None, bba_csv: Path | None = None, feedback_csv: Path | None = None, mirror_csv: Path | None = None, tune_csv: Path | None = None, disable_emittance: bool = False, disable_tunefb: bool = False, ) -> None: """ Args: ring_mode: The ring mode to create the lattice in. limits_csv: The filepath to the .csv file from which to load the pv limits. For more information see create_csv.py. bba_csv: The filepath to the .csv file from which to load the bba records, for more information see create_csv.py. feedback_csv: The filepath to the .csv file from which to load the feedback records, for more information see create_csv.py. mirror_csv: The filepath to the .csv file from which to load the mirror records, for more information see create_csv.py. tune_csv: The filepath to the .csv file from which to load the tune feedback records, for more information see create_csv.py. disable_emittance: Whether emittance should be disabled. disable_tunefb: Whether tune feedback should be disabled. """ self._disable_emittance: bool = disable_emittance self._disable_tunefb: bool = disable_tunefb self._pv_monitoring: bool = True self.lattice: pytac.lattice.EpicsLattice = atip.utils.loader( ring_mode, self.update_pvs, self._disable_emittance ) self.lattice.set_default_data_source(pytac.SIM) # Holding dictionary for all PVs self._pv_dict: dict[str, BasePV] = {} # Dictionary for the PVs which should be automatically updated when the # simulation data is recalculated self._readback_pvs_dict: dict[str, ReadSimPV] = {} print("Starting PV creation.") self._create_core_pvs(limits_csv) if bba_csv is not None: self._create_bba_records(bba_csv) if feedback_csv is not None: self._create_feedback_records(feedback_csv) if mirror_csv is not None: self._create_mirror_records(mirror_csv) if not disable_tunefb and tune_csv is not None: self._setup_tune_feedback(tune_csv) self.print_virtac_stats()
[docs] def update_pvs(self) -> None: """The callback function passed to ATSimulator during lattice creation, which is called each time a calculation of physics data is completed and updates all the in records that do not have a corresponding out record with the latest values from the simulator. """ logging.info("Updating output PVs") for pv in self._readback_pvs_dict.values(): pv.update_from_sim() logging.debug("Finished updating output PVs")
def _create_core_pvs(self, limits_csv: Path | None) -> None: """Create the core records required for the virtac using both lattice and element pytac data. Args: limits_csv: The filepath to the .csv file from which to load pv field data to configure softioc records with. """ limits_dict: LimitsDictType = {} if limits_csv is not None: with open(limits_csv) as f: csv_reader = csv.DictReader(f) for line in csv_reader: limits_dict[line["pv"]] = ( str(line["upper"]), str(line["lower"]), str(line["precision"]), str(line["drive_high"]), str(line["drive_low"]), str(line["scan"]), ) # Create PVs from lattice elements. self._create_element_pvs(limits_dict) # Create PVs from the lattice itself. self._create_lattice_pvs(limits_dict) def _create_element_pvs(self, limits_dict: LimitsDictType) -> None: """Create a PV for each simulated field on each pytac lattice element. .. note:: The one exception to the rule of one PV per field is for the bend magnets. Each of the 50 bend magnets shares a single PV which stores their current value as they have a shared power supply in the real machine. .. note:: For fields which have an in type record (RB) and an out type record (SP)we create SetpointPVs (or a derivative). SetpointPVs are used to set the pytac element with their SP record, the RB record merely reflects the set value. .. note:: For fields which only have an (RB) record and no (SP) record we just create regular PVs and we set their update_from_lattice to true. This means that when the Pytac lattice is updated after a PyAT physics recalculation, these PVs read their value from it. Args: limits_dict: A dictionary containing the limits data for the PVs. """ bend_in_record = None for element in self.lattice: # There is only 1 bend PV for all bend magnets, each bend element is added # to this PV if element.type_.upper() == "BEND" and bend_in_record is not None: bend_in_record.append_pytac_item(element) else: for field in cast( dict[str, list[str]], element.get_fields()[pytac.SIM] ): value = element.get_value( field, units=pytac.ENG, data_source=pytac.SIM ) read_pv_name = cast(str, element.get_pv_name(field, pytac.RB)) upper, lower, precision, drive_high, drive_low, scan = ( limits_dict.get( read_pv_name, (None, None, None, None, None, "I/O Intr") ) ) record_data = RecordData( RecordTypes.AI, lower=lower, upper=upper, precision=precision, drive_high=drive_high, drive_low=drive_low, initial_value=value, scan=scan, ) read_pv = ReadSimPV( read_pv_name, record_data, pytac_items=[element], field=field ) self._pv_dict[read_pv_name] = read_pv # Readback PVs without a setpoint PV are updated from the simulation # after recalculation. Readback PVs with a setpoint PV are updated # when their associated setpoint PV is updated. try: read_write_pv_name = cast( str, element.get_pv_name(field, pytac.SP) ) except pytac.exceptions.HandleException: # Only triggered if this element has an RB PV but no SP PV. # Add to list of PVs to be updated from the simulation self._readback_pvs_dict[read_pv_name] = read_pv else: upper, lower, precision, drive_high, drive_low, scan = ( limits_dict.get( read_write_pv_name, (None, None, None, None, None, "Passive"), ) ) record_data = RecordData( RecordTypes.AO, lower=lower, upper=upper, precision=precision, drive_high=drive_high, drive_low=drive_low, initial_value=value, always_update=True, ) read_write_pv = ReadWriteSimPV( read_write_pv_name, record_data, read_pv, pytac_items=[element], pytac_field=field, ) self._pv_dict[read_write_pv_name] = read_write_pv if element.type_.upper() == "BEND" and bend_in_record is None: bend_in_record = read_write_pv def _create_lattice_pvs(self, limits_dict: LimitsDictType) -> None: """Create a PV for each simulated field on each pytac lattice itself. .. note:: For fields which have an in type record (RB) and an out type record (SP) we create SetpointPVs (or a derivative). SetpointPVs are used to set the pytac element with their SP record, the RB record merely reflects the set value. .. note:: For fields which only have an (RB) record and no (SP) record we just create regular PVs and we set their update_from_lattice to true. This means that when the pytac lattice is recalculated, these PVs read their value from the lattice. Args: limits_dict: A dictionary containing the limits data for the PVs. """ lat_field_dict = cast(dict[str, list[str]], self.lattice.get_fields()) lat_field_set = set(lat_field_dict[pytac.LIVE]) & set(lat_field_dict[pytac.SIM]) if self._disable_emittance: lat_field_set -= {"emittance_x", "emittance_y"} for field in lat_field_set: # Ignore basic devices as they do not have PVs. if not isinstance( self.lattice.get_device(field), pytac.device.SimpleDevice ): get_pv_name = cast(str, self.lattice.get_pv_name(field, pytac.RB)) upper, lower, precision, _, _, scan = limits_dict.get( get_pv_name, (None, None, None, None, None, "I/O Intr") ) value = self.lattice.get_value( field, units=pytac.ENG, data_source=pytac.SIM ) record_data = RecordData( RecordTypes.AI, lower=lower, upper=upper, precision=precision, scan=scan, initial_value=value, ) read_pv = ReadSimPV( get_pv_name, record_data, pytac_items=[self.lattice], field=field ) self._pv_dict[get_pv_name] = read_pv self._readback_pvs_dict[get_pv_name] = read_pv def _create_bba_records(self, bba_csv: Path) -> None: """Create all the beam-based-alignment records from the .csv file at the location passed, see create_csv.py for more information. Args: bba_csv: The filepath to the .csv file to load the records in accordance with. """ self._create_feedback_or_bba_records_from_csv(bba_csv) def _create_feedback_records(self, feedback_csv: Path) -> None: """Create all the feedback records from the .csv file at the location passed, see create_csv.py for more information; records for one edge case are also created. Args: feedback_csv: The filepath to the .csv file to load the records in accordance with. """ self._create_feedback_or_bba_records_from_csv(feedback_csv) # We can choose to not calculate emittance as it is not always required, # which decreases computation time. if not self._disable_emittance: name = "SR-DI-EMIT-01:STATUS" record_data = RecordData(RecordTypes.MBBI, zrvl="0", zrst="Successful") emit_status_pv = BasePV(name, record_data) self._pv_dict[name] = emit_status_pv def _create_feedback_or_bba_records_from_csv(self, csv_file: Path) -> None: """Read the csv file and create the corresponding records based on its contents. Args: csv_file: The filepath to the .csv file to load the records in accordance with. """ # We don't set limits or precision but this shouldn't be an issue as these # records aren't intended to be set to by a user. with open(csv_file) as f: csv_reader = csv.DictReader(f) for line in csv_reader: val: typing.Any = 0 name = line["pv"] try: # Waveform records may have values stored as a list such as: [5 1 3] # We convert that into a numpy array for initialising the record if (line["value"][0], line["value"][-1]) == ("[", "]"): val = numpy.fromstring((line["value"])[1:-1], sep=" ") else: val = float(line["value"]) except (AssertionError, ValueError) as exc: raise ValueError( f"Invalid initial value for {line['record_type']} record: " f"{line['value']}" ) from exc else: record_data = RecordData(line["record_type"], initial_value=val) pv = ReadSimPV( name, record_data, [self.lattice[int(line["index"]) - 1]], line["field"], ) self._pv_dict[name] = pv def _create_mirror_records(self, mirror_csv: Path) -> None: """Create all the mirror records from the .csv file at the location passed, see create_csv.py for more information. Args: mirror_csv : The filepath to the .csv file to load the records in accordance with. """ with open(mirror_csv) as f: val: RecordValueType = 0 csv_reader = csv.DictReader(f) for line in csv_reader: # Get a list of input pvs, these are all virtac owned pvs input_pv_names = line["in_pv"].split(", ") if (len(input_pv_names) > 1) and (line["mirror_type"] in ["basic"]): raise IndexError( "Transformation mirror type takes only one input PV." ) elif (len(input_pv_names) < 2) and ( line["mirror_type"] in ["collate", "summate"] ): raise IndexError( "collation and summation mirror types take at least two input " "PVs." ) input_records: list[BasePV] = [] for pv in input_pv_names: try: # Lookup pv in our dictionary of softioc records input_records.append(self._pv_dict[pv]) except KeyError: logging.exception(f"PV {pv} does not exist within virtac") try: # Waveform records may have values stored as a list such as: [5 1 3] # We convert that into a numpy array for initialising the record if (line["value"][0], line["value"][-1]) == ("[", "]"): val = numpy.fromstring((line["value"])[1:-1], sep=" ") else: val = float(line["value"]) except (AssertionError, ValueError) as exc: raise ValueError( f"Invalid initial value for {line['output_type']} record: " f"{line['value']}" ) from exc else: out_pv_name = line["out_pv"] record_data = RecordData( line["output_type"], initial_value=val, scan=line["scan"], ) try: mirror_type = MIRROR_TYPES[MirrorType(line["mirror_type"])] if mirror_type == MIRROR_TYPES[MirrorType.BASIC]: mirror_type = cast(type[MonitorPV], mirror_type) # MonitorPV requires a list of str rather than a list of PV output_pv = mirror_type( out_pv_name, record_data, input_pv_names ) else: mirror_type = cast( type[InversionPV] | type[SummationPV] | type[CollationPV], mirror_type, ) output_pv = mirror_type( out_pv_name, record_data, input_records ) except KeyError as e: raise TypeError( f"{line['mirror_type']} is not valid, please use one of: " f"({', '.join(n.value for n in MirrorType)})" ) from e self._pv_dict[out_pv_name] = output_pv def _setup_tune_feedback(self, tune_csv: Path) -> None: """Read the tune feedback .csv and find the associated offset PVs, before starting monitoring them for a change to mimic the behaviour of the quadrupoles used by the tune feedback system on the live machine. .. note:: This is intended to be on the recieving end of the tune feedback system and doesn't actually perfom tune feedback itself. .. note:: The 'offset_pv' is the PV which monitors a 'delta_pv' and when the 'delta_pv' changes, stores its value and triggers the 'set_pv' to process. When the 'set_pv' processes, it gets the value we just stored to the 'offset_pv' and adds it to its own value. Args: tune_csv: A path to a tune feedback .csv file to be used instead of the default filepath passed at startup. """ with open(tune_csv) as f: csv_reader = csv.DictReader(f) for line in csv_reader: assert isinstance(self._pv_dict[line["set_pv"]], ReadWriteSimPV) self._pv_dict[line["offset_pv"]] set_record = cast(ReadWriteSimPV, self._pv_dict[line["set_pv"]]) old_set_record = cast(ReadWriteSimPV, self._pv_dict[line["offset_pv"]]) # We overwrite the old_set_record with the new RefreshPV which has # the required capabilities for tunefb new_set_record = RefreshPV( line["offset_pv"], line["delta_pv"], set_record, old_set_record ) set_record.attach_offset_record(new_set_record) self._pv_dict[line["offset_pv"]] = new_set_record
[docs] def enable_monitoring(self) -> None: """Enable monitoring for all MonitorPV derived PVs. This will allow tune feedback and vertical emittance feedback to work again """ if self._pv_monitoring: logging.warning("PV monitoring is already enabled, nothing to do.") else: logging.info("Enabling PV monitoring") for pv in self._pv_dict.values(): if isinstance(pv, MonitorPV): pv.enable_monitoring() self._pv_monitoring = True
# TODO: Is this needed? It essentially just pauses a subset of the virtacs # functionality
[docs] def disable_monitoring(self) -> None: """Disable monitoring for all MonitorPV derived PVs. This will disable tune feedback and vertical emittance feedback """ if not self._pv_monitoring: logging.warning("PV monitoring is already disabled, nothing to do.") else: logging.info("Disabling PV monitoring") for _, pv in self._pv_dict.items(): if isinstance(pv, MonitorPV): pv.disable_monitoring() self._pv_monitoring = False
[docs] def print_virtac_stats(self, verbosity: int = 0) -> None: """Print helpful statistics based on passed verbosity level Args: verbosity: The verbosity level to print at, higher levels print more information. """ pv_type_count: dict[type[BasePV], int] = defaultdict(int) for pv in self._pv_dict.values(): pv_type_count[type(pv)] += 1 print("Virtac stats:") print( "\t Tune feedbacks is " f"{('disabled' if self._disable_tunefb else 'enabled')}" ) print( "\t Emittance calculations are " f"{('disabled' if self._disable_emittance else 'enabled')}" ) print( f"\t PV monitoring is {('enabled' if self._pv_monitoring else 'disabled')}" ) print(f"\t Total pvs: {len(self._pv_dict)}") for pv_type, count in pv_type_count.items(): print(f"\t\t {pv_type.__name__} pvs: {count}") if verbosity >= 1: print("\tAvailable PVs") for pv in self._pv_dict.values(): print(f"\t\t{pv.name}, {type(pv)}")