Source code for fastcs.transports.tango.util
from dataclasses import asdict
from typing import Any
from tango import AttrDataFormat
from fastcs.attributes import Attribute
from fastcs.datatypes import (
Bool,
DataType,
DType,
DType_T,
Enum,
Float,
Int,
String,
Waveform,
)
TANGO_ALLOWED_DATATYPES = (Bool, DataType, Enum, Float, Int, String, Waveform)
DATATYPE_FIELD_TO_SERVER_FIELD = {
"units": "unit",
"min": "min_value",
"max": "max_value",
"min_alarm": "min_alarm",
"max_alarm": "min_alarm",
}
[docs]
def get_server_metadata_from_attribute(
attribute: Attribute[DType],
) -> dict[str, Any]:
"""Gets the metadata for a Tango field from an attribute."""
arguments = {}
arguments["doc"] = attribute.description if attribute.description else ""
return arguments
[docs]
def get_server_metadata_from_datatype(datatype: DataType[DType]) -> dict[str, str]:
"""Gets the metadata for a Tango field from a FastCS datatype."""
arguments = {
DATATYPE_FIELD_TO_SERVER_FIELD[field]: value
for field, value in asdict(datatype).items()
if field in DATATYPE_FIELD_TO_SERVER_FIELD
}
dtype = datatype.dtype
match datatype:
case Waveform():
dtype = datatype.array_dtype
match len(datatype.shape):
case 1:
arguments["max_dim_x"] = datatype.shape[0]
arguments["dformat"] = AttrDataFormat.SPECTRUM
case 2:
arguments["max_dim_x"], arguments["max_dim_y"] = datatype.shape
arguments["dformat"] = AttrDataFormat.IMAGE
case _:
raise TypeError(
f"Unsupported shape {datatype.shape}, Tango supports up "
"to 2D arrays"
)
case Float():
arguments["format"] = f"%.{datatype.prec}"
arguments["dtype"] = dtype
for argument, value in arguments.items():
if value is None:
arguments[argument] = ""
return arguments
[docs]
def cast_to_tango_type(datatype: DataType[DType_T], value: DType_T) -> object:
"""Casts a value from FastCS to tango datatype."""
match datatype:
case Enum():
return datatype.index_of(datatype.validate(value))
case datatype if issubclass(type(datatype), TANGO_ALLOWED_DATATYPES):
return datatype.validate(value)
case _:
raise ValueError(f"Unsupported datatype {datatype}")
[docs]
def cast_from_tango_type(datatype: DataType[DType_T], value: object) -> DType_T:
"""Casts a value from tango to FastCS datatype."""
match datatype:
case Enum():
assert isinstance(value, int), "Got non-integer value for Enum"
return datatype.validate(datatype.members[value])
case datatype if issubclass(type(datatype), TANGO_ALLOWED_DATATYPES):
return datatype.validate(value) # type: ignore
case _:
raise ValueError(f"Unsupported datatype {datatype}")