import enum
from collections.abc import Callable
from dataclasses import asdict
from typing import Any
from softioc import builder
from softioc.pythonSoftIoc import RecordWrapper
from fastcs.attributes import AttrR, AttrRW, AttrW
from fastcs.datatypes import Bool, DataType, DType_T, Enum, Float, Int, String, Waveform
from fastcs.exceptions import FastCSError
_MBB_FIELD_PREFIXES = (
"ZR",
"ON",
"TW",
"TH",
"FR",
"FV",
"SX",
"SV",
"EI",
"NI",
"TE",
"EL",
"TV",
"TT",
"FT",
"FF",
)
MBB_STATE_FIELDS = tuple(f"{p}ST" for p in _MBB_FIELD_PREFIXES)
MBB_VALUE_FIELDS = tuple(f"{p}VL" for p in _MBB_FIELD_PREFIXES)
MBB_MAX_CHOICES = len(_MBB_FIELD_PREFIXES)
EPICS_ALLOWED_DATATYPES = (Bool, Enum, Float, Int, String, Waveform)
DEFAULT_STRING_WAVEFORM_LENGTH = 256
DATATYPE_FIELD_TO_IN_RECORD_FIELD = {
"prec": "PREC",
"units": "EGU",
"min_alarm": "LOPR",
"max_alarm": "HOPR",
}
DATATYPE_FIELD_TO_OUT_RECORD_FIELD = {
"prec": "PREC",
"units": "EGU",
"min": "DRVL",
"max": "DRVH",
"min_alarm": "LOPR",
"max_alarm": "HOPR",
}
def _make_in_record(pv: str, attribute: AttrR) -> RecordWrapper:
common_fields = {
"DESC": attribute.description,
"initial_value": cast_to_epics_type(attribute.datatype, attribute.get()),
}
match attribute.datatype:
case Bool():
record = builder.boolIn(pv, ZNAM="False", ONAM="True", **common_fields)
case Int():
record = builder.longIn(
pv,
LOPR=attribute.datatype.min_alarm,
HOPR=attribute.datatype.max_alarm,
EGU=attribute.datatype.units,
**common_fields,
)
case Float():
record = builder.aIn(
pv,
LOPR=attribute.datatype.min_alarm,
HOPR=attribute.datatype.max_alarm,
EGU=attribute.datatype.units,
PREC=attribute.datatype.prec,
**common_fields,
)
case String():
record = builder.longStringIn(
pv,
length=attribute.datatype.length or DEFAULT_STRING_WAVEFORM_LENGTH,
**common_fields,
)
case Enum():
if len(attribute.datatype.members) > MBB_MAX_CHOICES:
record = builder.longStringIn(
pv,
**common_fields,
)
else:
common_fields.update(create_state_keys(attribute.datatype))
record = builder.mbbIn(
pv,
**common_fields,
)
case Waveform():
record = builder.WaveformIn(
pv, length=attribute.datatype.shape[0], **common_fields
)
case _:
raise FastCSError(
f"EPICS unsupported datatype on {attribute}: {attribute.datatype}"
)
def datatype_updater(datatype: DataType):
for name, value in asdict(datatype).items():
if name in DATATYPE_FIELD_TO_IN_RECORD_FIELD:
record.set_field(DATATYPE_FIELD_TO_IN_RECORD_FIELD[name], value)
attribute.add_update_datatype_callback(datatype_updater)
return record
def _make_out_record(pv: str, attribute: AttrW, on_update: Callable) -> RecordWrapper:
common_fields = {
"DESC": attribute.description,
"initial_value": cast_to_epics_type(
attribute.datatype,
attribute.get()
if isinstance(attribute, AttrRW)
else attribute.datatype.initial_value,
),
"on_update": on_update,
"always_update": True,
"blocking": True,
}
match attribute.datatype:
case Bool():
record = builder.boolOut(pv, ZNAM="False", ONAM="True", **common_fields)
case Int():
record = builder.longOut(
pv,
LOPR=attribute.datatype.min_alarm,
HOPR=attribute.datatype.max_alarm,
EGU=attribute.datatype.units,
DRVL=attribute.datatype.min,
DRVH=attribute.datatype.max,
**common_fields,
)
case Float():
record = builder.aOut(
pv,
LOPR=attribute.datatype.min_alarm,
HOPR=attribute.datatype.max_alarm,
EGU=attribute.datatype.units,
PREC=attribute.datatype.prec,
DRVL=attribute.datatype.min,
DRVH=attribute.datatype.max,
**common_fields,
)
case String():
record = builder.longStringOut(
pv,
length=attribute.datatype.length or DEFAULT_STRING_WAVEFORM_LENGTH,
**common_fields,
)
case Enum():
if len(attribute.datatype.members) > MBB_MAX_CHOICES:
datatype: Enum = attribute.datatype
def _verify_in_datatype(_, value):
return value in datatype.names
record = builder.longStringOut(
pv,
validate=_verify_in_datatype,
**common_fields,
)
else:
common_fields.update(create_state_keys(attribute.datatype))
record = builder.mbbOut(
pv,
**common_fields,
)
case Waveform():
record = builder.WaveformOut(
pv,
length=attribute.datatype.shape[0],
**common_fields,
)
case _:
raise FastCSError(
f"EPICS unsupported datatype on {attribute}: {attribute.datatype}"
)
def datatype_updater(datatype: DataType):
for name, value in asdict(datatype).items():
if name in DATATYPE_FIELD_TO_OUT_RECORD_FIELD:
record.set_field(DATATYPE_FIELD_TO_OUT_RECORD_FIELD[name], value)
attribute.add_update_datatype_callback(datatype_updater)
return record
[docs]
def create_state_keys(datatype: Enum):
"""Creates a dictionary of state field keys to names"""
return dict(
zip(
MBB_STATE_FIELDS,
datatype.names,
strict=False,
)
)
[docs]
def cast_from_epics_type(datatype: DataType[DType_T], value: object) -> DType_T:
"""Casts from an EPICS datatype to a FastCS datatype."""
match datatype:
case Bool():
if value == 0:
return False
elif value == 1:
return True
else:
raise ValueError(f"Invalid bool value from EPICS record {value}")
case Enum():
if len(datatype.members) <= MBB_MAX_CHOICES:
assert isinstance(value, int), "Got non-integer value for Enum"
return datatype.validate(datatype.members[value])
else: # enum backed by string record
assert isinstance(value, str), "Got non-string value for long Enum"
# python typing can't narrow the nested generic enum_cls
assert issubclass(datatype.enum_cls, enum.Enum), "Invalid Enum.enum_cls"
enum_member = datatype.enum_cls[value]
return datatype.validate(enum_member)
case datatype if issubclass(type(datatype), EPICS_ALLOWED_DATATYPES):
return datatype.validate(value) # type: ignore
case _:
raise ValueError(f"Unsupported datatype {datatype}")
[docs]
def cast_to_epics_type(datatype: DataType[DType_T], value: DType_T) -> Any:
"""Casts from an attribute's datatype to an EPICS datatype."""
match datatype:
case Enum():
if len(datatype.members) <= MBB_MAX_CHOICES:
return datatype.index_of(datatype.validate(value))
else: # enum backed by string record
return datatype.validate(value).name
case String() as string:
if string.length is not None:
return value[: string.length]
else:
return value[:DEFAULT_STRING_WAVEFORM_LENGTH]
case datatype if issubclass(type(datatype), EPICS_ALLOWED_DATATYPES):
return value
case _:
raise ValueError(f"Unsupported datatype {datatype}")