Source code for fastcs.transport.epics.gui

from pvi._format.dls import DLSFormatter  # type: ignore
from pvi.device import (
    LED,
    ButtonPanel,
    CheckBox,
    ComboBox,
    ComponentUnion,
    Device,
    Grid,
    Group,
    ReadWidgetUnion,
    SignalR,
    SignalRW,
    SignalW,
    SignalX,
    SubScreen,
    TableRead,
    TableWrite,
    TextFormat,
    TextRead,
    TextWrite,
    ToggleButton,
    Tree,
    WriteWidgetUnion,
)
from pydantic import ValidationError

from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW
from fastcs.controller_api import ControllerAPI
from fastcs.cs_methods import Command
from fastcs.datatypes import (
    Bool,
    DataType,
    Enum,
    Float,
    Int,
    String,
    Table,
    Waveform,
)
from fastcs.exceptions import FastCSException
from fastcs.util import numpy_to_fastcs_datatype, snake_to_pascal

from .options import EpicsGUIFormat, EpicsGUIOptions


[docs] class EpicsGUI: """For creating gui in the EPICS transports.""" def __init__(self, controller_api: ControllerAPI, pv_prefix: str) -> None: self._controller_api = controller_api self._pv_prefix = pv_prefix def _get_pv(self, attr_path: list[str], name: str): attr_prefix = ":".join( [self._pv_prefix] + [snake_to_pascal(node) for node in attr_path] ) return f"{attr_prefix}:{snake_to_pascal(name)}" def _get_read_widget(self, fastcs_datatype: DataType) -> ReadWidgetUnion | None: match fastcs_datatype: case Bool(): return LED() case Int() | Float(): return TextRead() case String(): return TextRead(format=TextFormat.string) case Enum(): return TextRead(format=TextFormat.string) case Waveform(): return None case datatype: raise FastCSException(f"Unsupported type {type(datatype)}: {datatype}") def _get_write_widget(self, fastcs_datatype: DataType) -> WriteWidgetUnion | None: match fastcs_datatype: case Bool(): return ToggleButton() case Int() | Float(): return TextWrite() case String(): return TextWrite(format=TextFormat.string) case Enum(): return ComboBox(choices=fastcs_datatype.names) case Waveform(): return None case datatype: raise FastCSException(f"Unsupported type {type(datatype)}: {datatype}") def _get_attribute_component( self, attr_path: list[str], name: str, attribute: Attribute ) -> SignalR | SignalW | SignalRW | None: pv = self._get_pv(attr_path, name) name = snake_to_pascal(name) match attribute: case AttrRW(): read_widget = self._get_read_widget(attribute.datatype) write_widget = self._get_write_widget(attribute.datatype) if write_widget is None or read_widget is None: return None return SignalRW( name=name, write_pv=pv, write_widget=write_widget, read_pv=pv + "_RBV", read_widget=read_widget, ) case AttrR(): read_widget = self._get_read_widget(attribute.datatype) if read_widget is None: return None return SignalR(name=name, read_pv=pv, read_widget=read_widget) case AttrW(): write_widget = self._get_write_widget(attribute.datatype) if write_widget is None: return None return SignalW(name=name, write_pv=pv, write_widget=write_widget) case _: raise FastCSException(f"Unsupported attribute type: {type(attribute)}") def _get_command_component(self, attr_path: list[str], name: str): pv = self._get_pv(attr_path, name) name = snake_to_pascal(name) return SignalX( name=name, write_pv=pv, value="1", write_widget=ButtonPanel(actions={name: "1"}), ) def create_gui(self, options: EpicsGUIOptions | None = None) -> None: if options is None: options = EpicsGUIOptions() if options.file_format is EpicsGUIFormat.edl: raise FastCSException("FastCS does not support .edl screens.") assert options.output_path.suffix == options.file_format.value options.output_path.parent.mkdir(parents=True, exist_ok=True) components = self.extract_api_components(self._controller_api) device = Device(label=options.title, children=components) formatter = DLSFormatter() formatter.format(device, options.output_path.resolve()) def extract_api_components(self, controller_api: ControllerAPI) -> Tree: components: Tree = [] for name, api in controller_api.sub_apis.items(): components.append( Group( name=snake_to_pascal(name), layout=SubScreen(), children=self.extract_api_components(api), ) ) groups: dict[str, list[ComponentUnion]] = {} for attr_name, attribute in controller_api.attributes.items(): try: signal = self._get_attribute_component( controller_api.path, attr_name, attribute, ) except ValidationError as e: print(f"Invalid name:\n{e}") continue if signal is None: continue match attribute: case Attribute(group=group) if group is not None: if group not in groups: groups[group] = [] # Remove duplication of group name and signal name signal.name = signal.name.removeprefix(group) groups[group].append(signal) case _: components.append(signal) for name, command in controller_api.command_methods.items(): signal = self._get_command_component(controller_api.path, name) match command: case Command(group=group) if group is not None: if group not in groups: groups[group] = [] groups[group].append(signal) case _: components.append(signal) for name, children in groups.items(): components.append(Group(name=name, layout=Grid(), children=children)) return components
[docs] class PvaEpicsGUI(EpicsGUI): """For creating gui in the PVA EPICS transport.""" def _get_pv(self, attr_path: list[str], name: str): return f"pva://{super()._get_pv(attr_path, name)}" def _get_read_widget(self, fastcs_datatype: DataType) -> ReadWidgetUnion | None: if isinstance(fastcs_datatype, Table): fastcs_datatypes = [ numpy_to_fastcs_datatype(datatype) for _, datatype in fastcs_datatype.structured_dtype ] base_get_read_widget = super()._get_read_widget widgets = [base_get_read_widget(datatype) for datatype in fastcs_datatypes] return TableRead(widgets=widgets) # type: ignore else: return super()._get_read_widget(fastcs_datatype) def _get_write_widget(self, fastcs_datatype: DataType) -> WriteWidgetUnion | None: if isinstance(fastcs_datatype, Table): widgets = [] for _, datatype in fastcs_datatype.structured_dtype: fastcs_datatype = numpy_to_fastcs_datatype(datatype) if isinstance(fastcs_datatype, Bool): # Replace with compact version for Table row widget = CheckBox() else: widget = super()._get_write_widget(fastcs_datatype) widgets.append(widget) return TableWrite(widgets=widgets) else: return super()._get_write_widget(fastcs_datatype)