Source code for fastcs.transport.tango.util
from dataclasses import asdict
from typing import Any
from tango import AttrDataFormat
from fastcs.attributes import Attribute
from fastcs.datatypes import Bool, DataType, Enum, Float, Int, String, T, Waveform
TANGO_ALLOWED_DATATYPES = (Bool, DataType, Enum, Float, Int, String, Waveform)
DATATYPE_FIELD_TO_SERVER_FIELD = {
"units": "unit",
"min": "min_value",
"max": "max_value",
"min_alarm": "min_alarm",
"max_alarm": "min_alarm",
}
[docs]
def get_server_metadata_from_attribute(
attribute: Attribute[T],
) -> dict[str, Any]:
"""Gets the metadata for a Tango field from an attribute."""
arguments = {}
arguments["doc"] = attribute.description if attribute.description else ""
return arguments
[docs]
def get_server_metadata_from_datatype(datatype: DataType[T]) -> dict[str, str]:
"""Gets the metadata for a Tango field from a FastCS datatype."""
arguments = {
DATATYPE_FIELD_TO_SERVER_FIELD[field]: value
for field, value in asdict(datatype).items()
if field in DATATYPE_FIELD_TO_SERVER_FIELD
}
dtype = datatype.dtype
match datatype:
case Waveform():
dtype = datatype.array_dtype
match len(datatype.shape):
case 1:
arguments["max_dim_x"] = datatype.shape[0]
arguments["dformat"] = AttrDataFormat.SPECTRUM
case 2:
arguments["max_dim_x"], arguments["max_dim_y"] = datatype.shape
arguments["dformat"] = AttrDataFormat.IMAGE
case _:
raise TypeError(
f"Unsupported shape {datatype.shape}, Tango supports up "
"to 2D arrays"
)
case Float():
arguments["format"] = f"%.{datatype.prec}"
arguments["dtype"] = dtype
for argument, value in arguments.items():
if value is None:
arguments[argument] = ""
return arguments
[docs]
def cast_to_tango_type(datatype: DataType[T], value: T) -> object:
"""Casts a value from FastCS to tango datatype."""
match datatype:
case Enum():
return datatype.index_of(datatype.validate(value))
case datatype if issubclass(type(datatype), TANGO_ALLOWED_DATATYPES):
return datatype.validate(value)
case _:
raise ValueError(f"Unsupported datatype {datatype}")
[docs]
def cast_from_tango_type(datatype: DataType[T], value: object) -> T:
"""Casts a value from tango to FastCS datatype."""
match datatype:
case Enum():
return datatype.validate(datatype.members[value])
case datatype if issubclass(type(datatype), TANGO_ALLOWED_DATATYPES):
return datatype.validate(value) # type: ignore
case _:
raise ValueError(f"Unsupported datatype {datatype}")