Source code for fastcs.transports.epics.pva.ioc
import asyncio
from p4p.server import Server, StaticProvider
from fastcs.attributes import AttrR, AttrRW, AttrW
from fastcs.transports.controller_api import ControllerAPI
from fastcs.transports.epics.util import controller_pv_prefix
from fastcs.util import snake_to_pascal
from ._pv_handlers import make_command_pv, make_shared_read_pv, make_shared_write_pv
from .pvi import add_pvi_info
[docs]
async def parse_attributes(
root_pv_prefix: str, root_controller_api: ControllerAPI
) -> StaticProvider:
"""Parses `Attribute` s into p4p signals in handlers."""
provider = StaticProvider(root_pv_prefix)
for controller_api in root_controller_api.walk_api():
pv_prefix = controller_pv_prefix(root_pv_prefix, controller_api)
add_pvi_info(
provider=provider,
pv_prefix=pv_prefix,
controller_api=controller_api,
description=controller_api.description,
)
for attr_name, attribute in controller_api.attributes.items():
full_pv_name = f"{pv_prefix}:{snake_to_pascal(attr_name)}"
match attribute:
case AttrRW():
attribute_pv = make_shared_write_pv(attribute)
attribute_pv_rbv = make_shared_read_pv(attribute)
provider.add(f"{full_pv_name}", attribute_pv)
provider.add(f"{full_pv_name}_RBV", attribute_pv_rbv)
case AttrR():
attribute_pv = make_shared_read_pv(attribute)
provider.add(f"{full_pv_name}", attribute_pv)
case AttrW():
attribute_pv = make_shared_write_pv(attribute)
provider.add(f"{full_pv_name}", attribute_pv)
for attr_name, method in controller_api.command_methods.items():
full_pv_name = f"{pv_prefix}:{snake_to_pascal(attr_name)}"
command_pv = make_command_pv(method.fn)
provider.add(f"{full_pv_name}", command_pv)
return provider
[docs]
class P4PIOC:
"""A P4P IOC which handles a controller"""
def __init__(self, pv_prefix: str, controller_api: ControllerAPI):
self.pv_prefix = pv_prefix
self.controller_api = controller_api
async def run(self):
provider = await parse_attributes(self.pv_prefix, self.controller_api)
endless_event = asyncio.Event()
with Server([provider]):
await endless_event.wait()