# Source code for fastcs.transports.epics.ca.ioc

import asyncio
from collections.abc import Callable
from typing import Any, Literal

from softioc import builder, softioc
from softioc.asyncio_dispatcher import AsyncioDispatcher
from softioc.pythonSoftIoc import RecordWrapper

from fastcs.attributes import AttrR, AttrRW, AttrW
from fastcs.datatypes import DataType, DType_T
from fastcs.logging import bind_logger
from fastcs.methods import Command
from fastcs.tracer import Tracer
from fastcs.transports.controller_api import ControllerAPI
from fastcs.transports.epics import EpicsIOCOptions
from fastcs.transports.epics.ca.util import (
    builder_callable_from_attribute,
    cast_from_epics_type,
    cast_to_epics_type,
    record_metadata_from_attribute,
    record_metadata_from_datatype,
)
from fastcs.transports.epics.util import controller_pv_prefix
from fastcs.util import snake_to_pascal

EPICS_MAX_NAME_LENGTH = 60


tracer = Tracer(name=__name__)
logger = bind_logger(logger_name=__name__)


class EpicsCAIOC:
    """A softioc which handles a controller.

    Constructing an instance registers the PVI records, the attribute PVs and
    the command PVs for the given controller API with the softioc ``builder``.
    Calling `run` then loads the database and initialises the IOC.
    """

    def __init__(
        self,
        pv_prefix: str,
        controller_api: ControllerAPI,
        options: EpicsIOCOptions | None = None,
    ):
        """Create the records for ``controller_api`` under ``pv_prefix``.

        Args:
            pv_prefix: PV prefix of the IOC; the root PVI PV is
                ``{pv_prefix}:PVI``
            controller_api: API of the root controller to expose
            options: IOC options; a default ``EpicsIOCOptions()`` is used when
                a falsy value (e.g. ``None``) is given

        """
        self._options = options if options else EpicsIOCOptions()
        self._controller_api = controller_api

        # Root PVI record first, then links from each controller to its
        # sub controllers, then the attribute and command records themselves.
        root_pvi = f"{pv_prefix}:PVI"
        _add_pvi_info(root_pvi)
        _add_sub_controller_pvi_info(pv_prefix, controller_api)

        _create_and_link_attribute_pvs(pv_prefix, controller_api)
        _create_and_link_command_pvs(pv_prefix, controller_api)

    def run(
        self,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Load the record database and initialise the IOC.

        Args:
            loop: Event loop handed to the ``AsyncioDispatcher``

        """
        dispatcher = AsyncioDispatcher(loop)  # Needs running loop

        builder.LoadDatabase()
        softioc.iocInit(dispatcher)
def _add_pvi_info(
    pvi: str,
    parent_pvi: str = "",
    name: str = "",
):
    """Add PVI metadata for a controller.

    A ``longStringIn`` record named ``{pvi}_PV`` is created and a ``Q:group``
    info tag describing the controller's PVI group is attached to it. When
    both ``parent_pvi`` and ``name`` are non-empty, the same info tag also
    adds a ``value.{name}.d`` entry to the parent's group.

    Args:
        pvi: PVI PV of controller
        parent_pvi: PVI PV of parent controller
        name: Name to register controller with parent as

    """
    # Group definition for this controller's own PVI PV; attribute records add
    # their own entries to the same group via their info tags.
    group_info = {
        pvi: {
            "+id": "epics:nt/NTPVI:1.0",
            "display.description": {"+type": "plain", "+channel": "DESC"},
            "": {"+type": "meta", "+channel": "VAL"},
        }
    }

    # Link this controller into its parent's group, if it has one
    if parent_pvi and name:
        link_field = f"value.{name}.d"
        group_info[parent_pvi] = {
            link_field: {
                "+channel": "VAL",
                "+type": "plain",
                "+trigger": link_field,
            }
        }

    # The record that carries the info tag; its value is the PVI PV name
    pvi_record = builder.longStringIn(
        f"{pvi}_PV",
        initial_value=pvi,
        DESC="The records in this controller",
    )
    pvi_record.add_info("Q:group", group_info)


def _add_sub_controller_pvi_info(pv_prefix: str, parent: ControllerAPI):
    """Add PVI references from controller to its sub controllers, recursively.

    Args:
        pv_prefix: PV Prefix of IOC
        parent: Controller to add PVI refs for

    """
    parent_pvi = f"{controller_pv_prefix(pv_prefix, parent)}:PVI"

    for sub_api in parent.sub_apis.values():
        sub_pvi = f"{controller_pv_prefix(pv_prefix, sub_api)}:PVI"

        leaf = sub_api.path[-1]
        if leaf.isdigit():
            # Sub-Controller of ControllerVector
            registered_name = f"__{leaf}"
        else:
            registered_name = leaf

        _add_pvi_info(sub_pvi, parent_pvi, registered_name.lower())

        # Descend so that grandchildren are linked to this child
        _add_sub_controller_pvi_info(pv_prefix, sub_api)


def _create_and_link_attribute_pvs(
    root_pv_prefix: str, root_controller_api: ControllerAPI
) -> None:
    """Create PVs for every attribute of every controller in the API tree.

    Attributes whose full PV name would be longer than
    ``EPICS_MAX_NAME_LENGTH`` are disabled and skipped. Read-write attributes
    get a setpoint PV plus a ``_RBV`` readback PV, so they are also disabled
    when the name with the 4-character ``_RBV`` suffix would be too long.

    Args:
        root_pv_prefix: PV prefix of the IOC
        root_controller_api: API of the root controller to walk

    """
    for api in root_controller_api.walk_api():
        prefix = controller_pv_prefix(root_pv_prefix, api)

        for attr_name, attribute in api.attributes.items():
            pv_name = snake_to_pascal(attr_name)
            name_length = len(f"{prefix}:{pv_name}")

            if name_length > EPICS_MAX_NAME_LENGTH:
                attribute.enabled = False
                print(
                    f"Not creating PV for {attr_name} for controller"
                    f" {api.path} as full name would exceed"
                    f" {EPICS_MAX_NAME_LENGTH} characters"
                )
                continue

            # AttrRW must be tested first, mirroring the original case order
            if isinstance(attribute, AttrRW):
                if name_length > (EPICS_MAX_NAME_LENGTH - 4):
                    print(
                        f"Not creating PVs for {attr_name} as _RBV PV"
                        f" name would exceed {EPICS_MAX_NAME_LENGTH}"
                        " characters"
                    )
                    attribute.enabled = False
                    continue

                _create_and_link_read_pv(
                    prefix, f"{pv_name}_RBV", attr_name, attribute
                )
                _create_and_link_write_pv(prefix, pv_name, attr_name, attribute)
            elif isinstance(attribute, AttrR):
                _create_and_link_read_pv(prefix, pv_name, attr_name, attribute)
            elif isinstance(attribute, AttrW):
                _create_and_link_write_pv(prefix, pv_name, attr_name, attribute)


def _create_and_link_read_pv(
    pv_prefix: str, pv_name: str, attr_name: str, attribute: AttrR[DType_T]
) -> None:
    """Create a readback record and keep it updated from the attribute.

    Args:
        pv_prefix: PV prefix of the controller
        pv_name: Name of the PV, without prefix
        attr_name: Name of the attribute, used for the PVI entry
        attribute: Attribute whose updates are pushed to the record

    """
    full_pv = f"{pv_prefix}:{pv_name}"

    record = _make_record(full_pv, attribute)
    _add_attr_pvi_info(record, pv_prefix, attr_name, "r")

    async def async_record_set(value: DType_T):
        # Push the new attribute value into the record
        tracer.log_event(
            "PV set from attribute", topic=attribute, pv=full_pv, value=value
        )
        epics_value = cast_to_epics_type(attribute.datatype, value)
        record.set(epics_value)

    attribute.add_on_update_callback(async_record_set)


def _make_record(
    pv: str,
    attribute: AttrR | AttrW | AttrRW,
    on_update: Callable | None = None,
    out_record: bool = False,
) -> RecordWrapper:
    """Build a record for an attribute and keep its metadata in sync.

    Args:
        pv: Full name of the PV to create
        attribute: Attribute the record represents
        on_update: Callback passed to the record as ``on_update``; when given,
            the record is also created with ``always_update=True`` and
            ``blocking=True``
        out_record: Forwarded to ``record_metadata_from_datatype``

    Returns:
        The created record

    """
    make_record = builder_callable_from_attribute(attribute, on_update is None)
    datatype_fields = record_metadata_from_datatype(attribute.datatype, out_record)
    attribute_fields = record_metadata_from_attribute(attribute)

    update_kwargs = {}
    if on_update:
        update_kwargs = {
            "on_update": on_update,
            "always_update": True,
            "blocking": True,
        }

    # Kept as separate ** unpackings so a key duplicated between the mappings
    # still raises TypeError rather than being silently overridden.
    record = make_record(pv, **update_kwargs, **datatype_fields, **attribute_fields)

    def datatype_updater(datatype: DataType):
        # Re-derive the record fields whenever the attribute's datatype changes
        new_fields = record_metadata_from_datatype(datatype, out_record)
        for field_name, field_value in new_fields.items():
            record.set_field(field_name, field_value)

    attribute.add_update_datatype_callback(datatype_updater)

    return record


def _create_and_link_write_pv(
    pv_prefix: str, pv_name: str, attr_name: str, attribute: AttrW[DType_T]
) -> None:
    """Create a setpoint record and link it to the attribute in both directions.

    Puts to the PV are forwarded to ``attribute.put``; setpoint changes coming
    from the attribute are written to the record with ``process=False``.

    Args:
        pv_prefix: PV prefix of the controller
        pv_name: Name of the PV, without prefix
        attr_name: Name of the attribute, used for the PVI entry
        attribute: Attribute to link the record to

    """
    full_pv = f"{pv_prefix}:{pv_name}"

    async def on_update(value):
        # PV -> attribute
        logger.info("PV put: {pv} = {value}", pv=full_pv, value=value)
        attribute_value = cast_from_epics_type(attribute.datatype, value)
        await attribute.put(attribute_value)

    record = _make_record(full_pv, attribute, on_update=on_update, out_record=True)
    _add_attr_pvi_info(record, pv_prefix, attr_name, "w")

    async def set_setpoint_without_process(value: DType_T):
        # attribute -> PV, set with process=False
        tracer.log_event(
            "PV setpoint set from attribute",
            topic=attribute,
            pv=full_pv,
            value=value,
        )
        epics_value = cast_to_epics_type(attribute.datatype, value)
        record.set(epics_value, process=False)

    attribute.add_sync_setpoint_callback(set_setpoint_without_process)


def _create_and_link_command_pvs(
    root_pv_prefix: str, root_controller_api: ControllerAPI
) -> None:
    """Create PVs for every command of every controller in the API tree.

    Commands whose full PV name would be longer than ``EPICS_MAX_NAME_LENGTH``
    are disabled and skipped.

    Args:
        root_pv_prefix: PV prefix of the IOC
        root_controller_api: API of the root controller to walk

    """
    for api in root_controller_api.walk_api():
        prefix = controller_pv_prefix(root_pv_prefix, api)

        for attr_name, method in api.command_methods.items():
            pv_name = snake_to_pascal(attr_name)

            if len(f"{prefix}:{pv_name}") > EPICS_MAX_NAME_LENGTH:
                print(
                    f"Not creating PV for {attr_name} as full name would exceed"
                    f" {EPICS_MAX_NAME_LENGTH} characters"
                )
                method.enabled = False
                continue

            _create_and_link_command_pv(prefix, pv_name, attr_name, method)


def _create_and_link_command_pv(
    pv_prefix: str, pv_name: str, attr_name: str, method: Command
) -> None:
    """Create an ``Action`` record that runs a command when it is put to.

    Args:
        pv_prefix: PV prefix of the controller
        pv_name: Name of the PV, without prefix
        attr_name: Name of the command, used for the PVI entry
        method: Command whose ``fn`` is awaited on each put

    """
    full_pv = f"{pv_prefix}:{pv_name}"

    async def wrapped_method(_: Any):
        # The value that was put is ignored; any put runs the command
        tracer.log_event("Command PV put", topic=method, pv=full_pv)
        await method.fn()

    record = builder.Action(
        full_pv,
        on_update=wrapped_method,
        blocking=True,
        initial_value=0,
        ZNAM="Idle",
        ONAM="Active",
    )
    _add_attr_pvi_info(record, pv_prefix, attr_name, "x")


def _add_attr_pvi_info(
    record: RecordWrapper,
    prefix: str,
    name: str,
    access_mode: Literal["r", "w", "rw", "x"],
):
    """Add an info tag to a record to include it in the PVI for the controller.

    Args:
        record: Record to add info tag to
        prefix: PV prefix of controller
        name: Name of parameter to add to PVI
        access_mode: Access mode of parameter

    """
    pvi_field = f"value.{name}.{access_mode}"
    group_info = {
        f"{prefix}:PVI": {
            pvi_field: {
                "+channel": "NAME",
                "+type": "plain",
                "+trigger": pvi_field,
            }
        }
    }
    record.add_info("Q:group", group_info)