Source code for fastcs.transport.epics.pva.ioc

import asyncio

from p4p.server import Server, StaticProvider

from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW
from fastcs.controller_api import ControllerAPI
from fastcs.util import snake_to_pascal

from ._pv_handlers import (
    make_command_pv,
    make_shared_read_pv,
    make_shared_write_pv,
)
from .pvi_tree import AccessModeType, PviTree


def _attribute_to_access(attribute: Attribute) -> AccessModeType:
    match attribute:
        case AttrRW():
            return "rw"
        case AttrR():
            return "r"
        case AttrW():
            return "w"
        case _:
            raise ValueError(f"Unknown attribute type {type(attribute)}")


[docs] def get_pv_name(pv_prefix: str, *attribute_names: str) -> str: """Converts from an attribute name to a pv name.""" pv_formatted = ":".join([snake_to_pascal(attr) for attr in attribute_names]) return f"{pv_prefix}:{pv_formatted}" if pv_formatted else pv_prefix
[docs] async def parse_attributes( root_pv_prefix: str, root_controller_api: ControllerAPI ) -> list[StaticProvider]: """Parses `Attribute` s into p4p signals in handlers.""" pvi_tree = PviTree(root_pv_prefix) provider = StaticProvider(root_pv_prefix) for controller_api in root_controller_api.walk_api(): pv_prefix = get_pv_name(root_pv_prefix, *controller_api.path) pvi_tree.add_sub_device(pv_prefix, controller_api.description) for attr_name, attribute in controller_api.attributes.items(): pv_name = get_pv_name(pv_prefix, attr_name) match attribute: case AttrRW(): attribute_pv = make_shared_write_pv(attribute) attribute_pv_rbv = make_shared_read_pv(attribute) provider.add(pv_name, attribute_pv) provider.add(f"{pv_name}_RBV", attribute_pv_rbv) pvi_tree.add_signal(pv_name, "rw") case AttrR(): attribute_pv = make_shared_read_pv(attribute) provider.add(pv_name, attribute_pv) pvi_tree.add_signal(pv_name, "r") case AttrW(): attribute_pv = make_shared_write_pv(attribute) provider.add(pv_name, attribute_pv) pvi_tree.add_signal(pv_name, "w") for attr_name, method in controller_api.command_methods.items(): pv_name = get_pv_name(pv_prefix, attr_name) command_pv = make_command_pv(method.fn) provider.add(pv_name, command_pv) pvi_tree.add_signal(pv_name, "x") return [provider, pvi_tree.make_provider()]
[docs] class P4PIOC: """A P4P IOC which handles a controller. Avoid running directly, instead use `fastcs.launch.FastCS`. """ def __init__(self, pv_prefix: str, controller_api: ControllerAPI): self.pv_prefix = pv_prefix self.controller_api = controller_api async def run(self): providers = await parse_attributes(self.pv_prefix, self.controller_api) endless_event = asyncio.Event() with Server(providers): await endless_event.wait()