# Source code for fastcs.transport.epics.ca.ioc

import asyncio
from collections.abc import Callable
from typing import Any, Literal

from softioc import builder, softioc
from softioc.asyncio_dispatcher import AsyncioDispatcher
from softioc.pythonSoftIoc import RecordWrapper

from fastcs.attributes import AttrR, AttrRW, AttrW
from fastcs.controller_api import ControllerAPI
from fastcs.datatypes import DataType, T
from fastcs.transport.epics.ca.util import (
    builder_callable_from_attribute,
    cast_from_epics_type,
    cast_to_epics_type,
    record_metadata_from_attribute,
    record_metadata_from_datatype,
)
from fastcs.transport.epics.options import EpicsIOCOptions
from fastcs.util import snake_to_pascal

EPICS_MAX_NAME_LENGTH = 60


class EpicsCAIOC:
    """A softioc which handles a controller.

    Avoid running directly, instead use `fastcs.launch.FastCS`.
    """

    def __init__(
        self,
        pv_prefix: str,
        controller_api: ControllerAPI,
        options: EpicsIOCOptions | None = None,
    ) -> None:
        """Create the PVI, attribute and command records for a controller.

        All records are created here, at construction time; the IOC itself is
        only started by `run`.

        Args:
            pv_prefix: PV prefix of the IOC
            controller_api: Root controller API to expose
            options: IOC options; a default `EpicsIOCOptions` is used if omitted

        """
        self._options = options or EpicsIOCOptions()
        self._controller_api = controller_api

        # PVI records first: root controller, then every sub controller
        _add_pvi_info(f"{pv_prefix}:PVI")
        _add_sub_controller_pvi_info(pv_prefix, controller_api)

        _create_and_link_attribute_pvs(pv_prefix, controller_api)
        _create_and_link_command_pvs(pv_prefix, controller_api)

    def run(
        self,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Load the record database and initialise the IOC.

        Args:
            loop: Event loop handed to the `AsyncioDispatcher`

        """
        dispatcher = AsyncioDispatcher(loop)  # Needs running loop
        builder.LoadDatabase()
        softioc.iocInit(dispatcher)


def _add_pvi_info( pvi: str, parent_pvi: str = "", name: str = "", ): """Add PVI metadata for a controller. Args: pvi: PVI PV of controller parent_pvi: PVI PV of parent controller name: Name to register controller with parent as """ # Create a record to attach the info tags to record = builder.longStringIn( f"{pvi}_PV", initial_value=pvi, DESC="The records in this controller", ) # Create PVI PV in preparation for adding attribute info tags to it q_group = { pvi: { "+id": "epics:nt/NTPVI:1.0", "display.description": {"+type": "plain", "+channel": "DESC"}, "": {"+type": "meta", "+channel": "VAL"}, } } # If this controller has a parent, add a link in the parent to this controller if parent_pvi and name: q_group.update( { parent_pvi: { f"value.{name}.d": { "+channel": "VAL", "+type": "plain", "+trigger": f"value.{name}.d", } } } ) record.add_info("Q:group", q_group) def _add_sub_controller_pvi_info(pv_prefix: str, parent: ControllerAPI): """Add PVI references from controller to its sub controllers, recursively. 
Args: pv_prefix: PV Prefix of IOC parent: Controller to add PVI refs for """ parent_pvi = ":".join([pv_prefix] + parent.path + ["PVI"]) for child in parent.sub_apis.values(): child_pvi = ":".join([pv_prefix] + child.path + ["PVI"]) child_name = child.path[-1].lower() _add_pvi_info(child_pvi, parent_pvi, child_name) _add_sub_controller_pvi_info(pv_prefix, child) def _create_and_link_attribute_pvs( pv_prefix: str, root_controller_api: ControllerAPI ) -> None: for controller_api in root_controller_api.walk_api(): path = controller_api.path for attr_name, attribute in controller_api.attributes.items(): pv_name = snake_to_pascal(attr_name) _pv_prefix = ":".join([pv_prefix] + path) full_pv_name_length = len(f"{_pv_prefix}:{pv_name}") if full_pv_name_length > EPICS_MAX_NAME_LENGTH: attribute.enabled = False print( f"Not creating PV for {attr_name} for controller" f" {controller_api.path} as full name would exceed" f" {EPICS_MAX_NAME_LENGTH} characters" ) continue match attribute: case AttrRW(): if full_pv_name_length > (EPICS_MAX_NAME_LENGTH - 4): print( f"Not creating PVs for {attr_name} as _RBV PV" f" name would exceed {EPICS_MAX_NAME_LENGTH}" " characters" ) attribute.enabled = False else: _create_and_link_read_pv( _pv_prefix, f"{pv_name}_RBV", attr_name, attribute ) _create_and_link_write_pv( _pv_prefix, pv_name, attr_name, attribute ) case AttrR(): _create_and_link_read_pv(_pv_prefix, pv_name, attr_name, attribute) case AttrW(): _create_and_link_write_pv(_pv_prefix, pv_name, attr_name, attribute) def _create_and_link_read_pv( pv_prefix: str, pv_name: str, attr_name: str, attribute: AttrR[T] ) -> None: async def async_record_set(value: T): record.set(cast_to_epics_type(attribute.datatype, value)) record = _make_record(f"{pv_prefix}:{pv_name}", attribute) _add_attr_pvi_info(record, pv_prefix, attr_name, "r") attribute.add_update_callback(async_record_set) def _make_record( pv: str, attribute: AttrR | AttrW | AttrRW, on_update: Callable | None = None, out_record: bool = 
False, ) -> RecordWrapper: builder_callable = builder_callable_from_attribute(attribute, on_update is None) datatype_record_metadata = record_metadata_from_datatype( attribute.datatype, out_record ) attribute_record_metadata = record_metadata_from_attribute(attribute) update = {"always_update": True, "on_update": on_update} if on_update else {} record = builder_callable( pv, **update, **datatype_record_metadata, **attribute_record_metadata ) def datatype_updater(datatype: DataType): for name, value in record_metadata_from_datatype(datatype, out_record).items(): record.set_field(name, value) attribute.add_update_datatype_callback(datatype_updater) return record def _create_and_link_write_pv( pv_prefix: str, pv_name: str, attr_name: str, attribute: AttrW[T] ) -> None: async def on_update(value): await attribute.process_without_display_update( cast_from_epics_type(attribute.datatype, value) ) async def async_write_display(value: T): record.set(cast_to_epics_type(attribute.datatype, value), process=False) record = _make_record( f"{pv_prefix}:{pv_name}", attribute, on_update=on_update, out_record=True ) _add_attr_pvi_info(record, pv_prefix, attr_name, "w") attribute.add_write_display_callback(async_write_display) def _create_and_link_command_pvs( pv_prefix: str, root_controller_api: ControllerAPI ) -> None: for controller_api in root_controller_api.walk_api(): path = controller_api.path for attr_name, method in controller_api.command_methods.items(): pv_name = snake_to_pascal(attr_name) _pv_prefix = ":".join([pv_prefix] + path) if len(f"{_pv_prefix}:{pv_name}") > EPICS_MAX_NAME_LENGTH: print( f"Not creating PV for {attr_name} as full name would exceed" f" {EPICS_MAX_NAME_LENGTH} characters" ) method.enabled = False else: _create_and_link_command_pv( _pv_prefix, pv_name, attr_name, method.fn, ) def _create_and_link_command_pv( pv_prefix: str, pv_name: str, attr_name: str, method: Callable ) -> None: async def wrapped_method(_: Any): await method() record = 
builder.Action( f"{pv_prefix}:{pv_name}", on_update=wrapped_method, blocking=True, ) _add_attr_pvi_info(record, pv_prefix, attr_name, "x") def _add_attr_pvi_info( record: RecordWrapper, prefix: str, name: str, access_mode: Literal["r", "w", "rw", "x"], ): """Add an info tag to a record to include it in the PVI for the controller. Args: record: Record to add info tag to prefix: PV prefix of controller name: Name of parameter to add to PVI access_mode: Access mode of parameter """ record.add_info( "Q:group", { f"{prefix}:PVI": { f"value.{name}.{access_mode}": { "+channel": "NAME", "+type": "plain", "+trigger": f"value.{name}.{access_mode}", } } }, )