Source code for malcolm.core.models
import inspect
from enum import Enum
from typing import (
Any,
Callable,
Dict,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
Union,
cast,
)
import numpy as np
from annotypes import (
NO_DEFAULT,
Anno,
Array,
FrozenOrderedDict,
Serializable,
WithCallTypes,
deserialize_object,
to_array,
)
from malcolm.compat import OrderedDict
from .alarm import Alarm
from .camel import camel_to_title
from .notifier import DummyNotifier, Notifier
from .table import Table
from .tags import Widget, method_return_unpacked
from .timestamp import TimeStamp
def check_type(value, typ):
if typ != Any:
assert isinstance(value, typ), f"Expected {typ}, got {value!r}"
[docs]class Model(Serializable):
notifier: Union[Notifier, DummyNotifier] = DummyNotifier()
path: List[str] = []
__slots__: List[str] = []
[docs] def set_notifier_path(
self, notifier: Union[Notifier, DummyNotifier], path: List[str]
) -> None:
"""Sets the notifier, and the path from the path from block root
Args:
notifier (Notifier): The Notifier to tell when endpoint data changes
path (list): The absolute path to get to this object
"""
# This function should either change from the DummyNotifier or to
# the DummyNotifier, never between two valid notifiers
assert (
self.notifier is Model.notifier or notifier is Model.notifier
), f"Already have a notifier {self.notifier} path {self.path}"
self.notifier = notifier
self.path = path
# Tell all our children too
for name, ct in self.call_types.items():
if ct.is_mapping:
child = getattr(self, name)
if child and Model.matches_type(ct.typ[1]):
for k, v in child.items():
v.set_notifier_path(notifier, self.path + [name, k])
elif Model.matches_type(ct.typ):
assert not ct.is_array, f"Can't deal with Arrays of Models {ct}"
child = getattr(self, name)
child.set_notifier_path(notifier, self.path + [name])
def set_endpoint_data(self, name: str, value: Any) -> Any:
try:
ct = self.call_types[name]
except KeyError:
raise ValueError(f"{name!r} not in {self!r}.call_types {self.call_types!r}")
else:
if ct.is_array:
# Cast to right type, this will do some cheap validation
value = cast(Array, ct(value))
# Check we have the right type
assert not Model.matches_type(
ct.typ
), "Can't handle Array[Model] at the moment"
if isinstance(value.seq, (tuple, list)):
# Variable array, check types of each instance
# TODO: this might harm performance
typ = ct.typ
for x in value.seq:
assert isinstance(
x, typ
), f"Expected Array[{ct.typ!r}], got {value.seq!r}"
elif ct.is_mapping:
# Check it is the right type
ktype, vtype = ct.typ
for k, v in value.items():
check_type(k, ktype)
check_type(v, vtype)
# If we are setting structures of Models then sort notification
if Model.matches_type(ct.typ[1]):
# If we have old Models then stop them notifying
child = getattr(self, name, {})
if child:
for k, v in child.items():
v.set_notifier_path(Model.notifier, [])
for k, v in value.items():
v.set_notifier_path(self.notifier, self.path + [name, k])
else:
# If we are setting a Model then sort notification
if Model.matches_type(ct.typ):
# If we have an old Model then stop it notifying
child = getattr(self, name, None)
if child:
child.set_notifier_path(Model.notifier, [])
value.set_notifier_path(self.notifier, self.path + [name])
# Make sure it is the right typ
check_type(value, ct.typ)
with self.notifier.changes_squashed:
# Actually set the attribute
setattr(self, name, value)
# Tell the notifier what changed
self.notifier.add_squashed_change(self.path + [name], value)
return value
[docs] def apply_change(self, path: List[str], *args: Any) -> None:
"""Take a single change from a Delta and apply it to this model"""
if len(path) > 1:
# This is for a child
self[path[0]].apply_change(path[1:], *args)
else:
# This is for us
assert (
len(path) == 1 and len(args) == 1
), f"Cannot process change {[self.path + path] + list(args)}"
getattr(self, f"set_{path[0]}")(args[0])
# Types used when deserializing to the class
with Anno("Description of what this element represents"):
AMetaDescription = str
with Anno("Generic text tags for client tools to interpret"):
ATags = Union[Array[str]]
with Anno("Whether this element is currently writeable"):
AWriteable = bool
with Anno("A human readable label for the element"):
ALabel = str
# A more permissive union to allow a wider range of set_* args
UTags = Union[ATags, Sequence[str], str]
class Meta(Model):
"""Base class for describing Blocks, Methods and Attributes"""
__slots__ = ["description", "tags", "writeable", "label"]
def __init__(
self,
description: AMetaDescription = "",
tags: UTags = (),
writeable: AWriteable = False,
label: ALabel = "",
) -> None:
self.description = self.set_description(description)
self.tags = self.set_tags(tags)
self.writeable = self.set_writeable(writeable)
self.label = self.set_label(label)
def set_description(self, description: AMetaDescription) -> AMetaDescription:
return self.set_endpoint_data("description", description)
def set_tags(self, tags: UTags) -> ATags:
return self.set_endpoint_data("tags", tags)
def set_writeable(self, writeable: AWriteable) -> AWriteable:
return self.set_endpoint_data("writeable", writeable)
def set_label(self, label: ALabel) -> ALabel:
return self.set_endpoint_data("label", label)
[docs]class VMeta(Meta):
"""Abstract base class for validating the values of Attributes"""
attribute_class: Union[Type["AttributeModel"], None] = None
_annotype_lookup: Mapping[Tuple[type, bool, bool], Type["VMeta"]] = {}
__slots__: List[str] = []
[docs] def validate(self, value: Any) -> Any:
"""Abstract function to validate a given value
Args:
value: Value to validate
Returns:
The validated value if it passes
"""
raise NotImplementedError(self)
[docs] def create_attribute_model(self, initial_value: Any = None) -> "AttributeModel":
"""Make an AttributeModel instance of the correct type for this Meta
Args:
initial_value: The initial value the Attribute should take
Returns:
AttributeModel: The created attribute model instance
"""
assert self.attribute_class, "No attribute class"
attr = self.attribute_class(meta=self, value=initial_value)
return attr
[docs] def doc_type_string(self) -> str:
"""Abstract function to return the python type string.
For example, "str" or "numpy.int32"
"""
raise NotImplementedError(self)
[docs] def default_widget(self) -> Widget:
"""Abstract function to return the default widget type"""
raise NotImplementedError(self)
[docs] @classmethod
def from_annotype(cls, anno: Anno, writeable: bool, **kwargs: Any) -> "VMeta":
"""Return an instance of this class from an Anno"""
ret = cls(description=anno.description, writeable=writeable, **kwargs)
widget = ret.default_widget()
if widget != Widget.NONE:
ret.set_tags([widget.tag()])
return ret
[docs] @classmethod
def register_annotype_converter(
cls,
types: Union[Sequence[type], type],
is_array: bool = False,
is_mapping: bool = False,
) -> Anno:
"""Register this class as a converter for Anno instances"""
if not isinstance(types, Sequence):
types = [types]
def decorator(subclass):
for typ in types:
cls._annotype_lookup[(typ, is_array, is_mapping)] = subclass
return subclass
return decorator
[docs] @classmethod
def lookup_annotype_converter(cls, anno: Anno) -> Type["VMeta"]:
"""Look up a vmeta based on an Anno"""
bases: Union[List[Any], Tuple]
if hasattr(anno.typ, "__bases__"):
# This is a proper type
bases = inspect.getmro(anno.typ)
else:
# This is a numpy dtype
bases = [anno.typ]
for typ in bases:
key = (typ, bool(anno.is_array), bool(anno.is_mapping))
try:
return cls._annotype_lookup[key]
except KeyError:
pass
raise KeyError(anno)
# Types used when deserializing to the class
with Anno("The current value of the Attribute"):
AValue = Any
with Anno("The current alarm status"):
AAlarm = Alarm
with Anno("The time when the value was last updated"):
ATimeStamp = TimeStamp
with Anno("The validating Meta object describing our value"):
AVMeta = VMeta
# Don't register this with Serializable as we never instantiate it directly,
# only a subclass like NTScalar
[docs]class AttributeModel(Model):
"""Data Model for an Attribute"""
__slots__ = ["value", "alarm", "timeStamp", "meta"]
# noinspection PyPep8Naming
# timeStamp is camelCase to maintain compatibility with EPICS normative
# types
def __init__(
self,
value: AValue = None,
alarm: AAlarm = None,
timeStamp: ATimeStamp = None,
meta: AVMeta = None,
) -> None:
self.meta = self.set_meta(meta)
"""A Meta object describing the Attribute"""
self.value = self.set_value(value, set_alarm_ts=False)
"""The current value of the Attribute"""
self.alarm = self.set_alarm(alarm)
"""An Alarm object indicating any problems"""
self.timeStamp = self.set_timeStamp(timeStamp)
"""When value was last set"""
def set_meta(self, meta: Union[VMeta, None]) -> VMeta:
meta = deserialize_object(meta)
# Check that the meta attribute_class is ourself
assert isinstance(meta, VMeta), f"Expected meta object, got {type(meta)}"
assert isinstance(self, meta.attribute_class), (
f"Meta object needs to be attached to {meta.attribute_class}, "
f"we are a {type(self)}"
)
return self.set_endpoint_data("meta", meta)
[docs] def set_value(
self,
value: Any,
set_alarm_ts: bool = True,
alarm: Alarm = None,
ts: TimeStamp = None,
) -> Any:
"""Set value, calculating alarm and ts if requested"""
value = self.meta.validate(value)
if set_alarm_ts:
if alarm is None:
alarm = Alarm.ok
else:
alarm = cast(Alarm, deserialize_object(alarm, Alarm))
if ts is None:
ts = cast(TimeStamp, TimeStamp())
else:
ts = cast(TimeStamp, deserialize_object(ts, TimeStamp))
self.set_value_alarm_ts(value, alarm, ts)
else:
self.set_endpoint_data("value", value)
return self.value
[docs] def set_value_alarm_ts(self, value: Any, alarm: Alarm, ts: TimeStamp) -> None:
"""Set value with pre-validated alarm and timeStamp"""
with self.notifier.changes_squashed:
# Assume they are of the right format
self.value = value
self.notifier.add_squashed_change(self.path + ["value"], value)
if alarm is not self.alarm:
self.alarm = alarm
self.notifier.add_squashed_change(self.path + ["alarm"], alarm)
self.timeStamp = ts
self.notifier.add_squashed_change(self.path + ["timeStamp"], ts)
def set_alarm(self, alarm: Alarm = None) -> Alarm:
if alarm is None:
alarm = Alarm.ok
else:
alarm = deserialize_object(alarm, Alarm)
return self.set_endpoint_data("alarm", alarm)
# noinspection PyPep8Naming
# timeStamp is camelCase to maintain compatibility with EPICS normative
# types
def set_timeStamp(self, ts: TimeStamp = None) -> TimeStamp:
if ts is None:
ts = TimeStamp()
else:
ts = deserialize_object(ts, TimeStamp)
return self.set_endpoint_data("timeStamp", ts)
def apply_change(self, path, *args: Any) -> None:
if path == ["value"] and args:
self.set_value(args[0], set_alarm_ts=False)
else:
super().apply_change(path, *args)
[docs]@Serializable.register_subclass("epics:nt/NTTable:1.0")
class NTTable(AttributeModel):
"""AttributeModel containing a `TableMeta`"""
__slots__: List[str] = []
def set_value_alarm_ts(self, value: AValue, alarm: Alarm, ts: TimeStamp) -> None:
with self.notifier.changes_squashed:
# Assume they are of the right format
# Work out what changed in value, do a cheap Array id check
changed = [k for k in value if value[k] is not self.value[k]]
self.value = value
if len(changed) == len(value.call_types):
# Everything changed
self.notifier.add_squashed_change(self.path + ["value"], value)
else:
# Only some changed
for k in changed:
self.notifier.add_squashed_change(
self.path + ["value", k], value[k]
)
if alarm is not self.alarm:
self.alarm = alarm
self.notifier.add_squashed_change(self.path + ["alarm"], alarm)
self.timeStamp = ts
self.notifier.add_squashed_change(self.path + ["timeStamp"], ts)
[docs]@Serializable.register_subclass("epics:nt/NTScalarArray:1.0")
class NTScalarArray(AttributeModel):
"""AttributeModel containing a `VArrayMeta`"""
__slots__: List[str] = []
[docs]@Serializable.register_subclass("epics:nt/NTScalar:1.0")
class NTScalar(AttributeModel):
"""AttributeModel containing a `StringMeta`, `BooleanMeta`, `NumberMeta`
or `ChoiceMeta`
"""
__slots__: List[str] = []
[docs]@Serializable.register_subclass("epics:nt/NTUnion:1.0")
class NTUnion(AttributeModel):
"""AttributeModel containing a meta producing some object structure"""
__slots__: List[str] = []
FALSE_STRINGS = {"0", "False", "false", "FALSE", "No", "no", "NO"}
[docs]@Serializable.register_subclass("malcolm:core/BooleanMeta:1.0")
@VMeta.register_annotype_converter(bool)
class BooleanMeta(VMeta):
"""Meta object containing information for a boolean"""
attribute_class = NTScalar
__slots__: List[str] = []
[docs] def validate(self, value: Any) -> bool:
"""Cast value to boolean and return it"""
if value in FALSE_STRINGS:
return False
else:
return bool(value)
def doc_type_string(self) -> str:
return "bool"
def default_widget(self) -> Widget:
if self.writeable:
return Widget.CHECKBOX
else:
return Widget.LED
with Anno("Choices of valid strings"):
AChoices = Union[Array[str]]
UChoices = Union[AChoices, Sequence[Enum], Sequence[str]]
[docs]@Serializable.register_subclass("malcolm:core/ChoiceMeta:1.0")
@VMeta.register_annotype_converter(Enum)
class ChoiceMeta(VMeta):
"""Meta object containing information for a enum"""
attribute_class = NTScalar
__slots__ = ["choices"]
def __init__(
self,
description: AMetaDescription = "",
choices: UChoices = (),
tags: UTags = (),
writeable: AWriteable = False,
label: ALabel = "",
) -> None:
super().__init__(description, tags, writeable, label)
self.choices_lookup: Dict[Any, Union[str, Enum]] = {}
# Used for ChoiceMetaArray subclass only for producing Arrays
self.enum_cls: Union[Type, None] = None
self.choices = self.set_choices(choices)
def set_choices(self, choices: UChoices) -> AChoices:
# Calculate a lookup from all possible entries to the choice value
choices_lookup: Dict[Any, Union[str, Enum]] = {}
new_choices: List[Union[str, Enum]]
new_choices = [] # type: ignore
enum_typ: Union[Type, None] = None
choice: Union[object, None]
for i, choice in enumerate(choices):
# If we already have an enum type it must match
if enum_typ is not None:
assert isinstance(
choice, enum_typ
), f"Expected {enum_typ} choice, got {choice}"
elif not isinstance(choice, str):
enum_typ = type(choice)
if isinstance(choice, Enum):
# Our choice value must be a string
assert isinstance(choice.value, str), (
f"Expected Enum choice to have str value, got {choice!r} with "
f"value {choice.value!r}"
)
# Map the Enum instance and str to the Enum instance
choices_lookup[choice.value] = choice
choices_lookup[choice] = choice
new_choices.append(choice)
else:
assert isinstance(choice, str), f"Expected string choice, got {choice}"
# Map the string to itself
choices_lookup[choice] = choice
new_choices.append(choice)
# Map the index to the choice
choices_lookup[i] = choice
if choices:
# Map the default value to the first choice
choices_lookup[None] = choices[0]
else:
# There are no choices, so the default value is the empty string
choices_lookup[None] = ""
self.choices_lookup = choices_lookup
if enum_typ is None or Model.matches_type(enum_typ):
# We are producing strings
self.enum_cls = str
else:
# We are producing enums
self.enum_cls = enum_typ
self.call_types["choices"].typ = self.enum_cls
return self.set_endpoint_data(
"choices", self.call_types["choices"](new_choices)
)
[docs] def validate(self, value: Any) -> Union[Enum, str]:
"""Check if the value is valid returns it"""
# Our lookup table contains all the possible values
try:
return self.choices_lookup[value]
except KeyError:
raise ValueError(f"{value!r} is not a valid value in {list(self.choices)}")
def doc_type_string(self) -> str:
return " | ".join([repr(x) for x in self.choices])
def default_widget(self) -> Widget:
if self.writeable:
return Widget.COMBO
else:
return Widget.TEXTUPDATE
@classmethod
def from_annotype(cls, anno: Anno, writeable: bool, **kwargs: Any) -> VMeta:
return super().from_annotype(anno, writeable, choices=list(anno.typ))
with Anno("The lower bound of range within which the value must be set"):
ALimitLow = Union[np.float64]
ULimitLow = Union[ALimitLow, float]
with Anno("The upper bound of range within which the value must be set"):
ALimitHigh = Union[np.float64]
ULimitHigh = Union[ALimitHigh, float]
with Anno("Number of significant figures to display"):
APrecision = Union[np.int32]
UPrecision = Union[APrecision, int]
with Anno("The units for the value"):
AUnits = str
[docs]@Serializable.register_subclass("display_t")
class Display(Model):
__slots__ = ["limitLow", "limitHigh", "description", "precision", "units"]
# noinspection PyPep8Naming
# limitLow and limitHigh are camelCase to maintain compatibility with
# EPICS normative types
def __init__(
self,
limitLow: ULimitLow = 0,
limitHigh: ULimitHigh = 0,
description: AMetaDescription = "",
precision: UPrecision = 0,
units: AUnits = "",
) -> None:
# Set initial values
self.limitLow = self.set_limitLow(limitLow)
self.limitHigh = self.set_limitHigh(limitHigh)
self.description = self.set_description(description)
self.precision = self.set_precision(precision)
self.units = self.set_units(units)
# noinspection PyPep8Naming
# limitLow is camelCase to maintain compatibility with EPICS normative
# types
def set_limitLow(self, limitLow: ULimitLow) -> ALimitLow:
return self.set_endpoint_data("limitLow", np.float64(limitLow))
# noinspection PyPep8Naming
# limitHigh is camelCase to maintain compatibility with EPICS normative
# types
def set_limitHigh(self, limitHigh: ULimitHigh) -> ALimitHigh:
return self.set_endpoint_data("limitHigh", np.float64(limitHigh))
def set_precision(self, precision: UPrecision) -> APrecision:
return self.set_endpoint_data("precision", np.int32(precision))
def set_units(self, units: AUnits) -> AUnits:
return self.set_endpoint_data("units", units)
def set_description(self, description: AMetaDescription) -> AMetaDescription:
return self.set_endpoint_data("description", description)
with Anno("Numpy dtype string"):
ADtype = str
with Anno("Display info meta object"):
ADisplay = Display
_dtype_strings = [
"int8",
"uint8",
"int16",
"uint16",
"int32",
"uint32",
"int64",
"uint64",
"float32",
"float64",
]
_dtype_string_lookup = {getattr(np, dtype): dtype for dtype in _dtype_strings}
_dtype_string_lookup.update({int: "int64", float: "float64"})
[docs]@Serializable.register_subclass("malcolm:core/NumberMeta:1.0")
@VMeta.register_annotype_converter(list(_dtype_string_lookup))
class NumberMeta(VMeta):
"""Meta object containing information for a numerical value"""
attribute_class = NTScalar
__slots__ = ["dtype", "display"]
def __init__(
self,
dtype: ADtype = "float64",
description: AMetaDescription = "",
tags: UTags = (),
writeable: AWriteable = False,
label: ALabel = "",
display: ADisplay = None,
) -> None:
super().__init__(description, tags, writeable, label)
# like np.float64
self._np_type: type = np.float64
# like "float64"
self.dtype = self.set_dtype(dtype)
if display is None:
# Guess some defaults for the display object
if dtype in ["float32", "float64"]:
precision = 8
else:
precision = 0
display = Display(precision=precision)
self.display = self.set_display(display)
def set_display(self, display: ADisplay) -> ADisplay:
display = deserialize_object(display, Display)
return self.set_endpoint_data("display", display)
def set_dtype(self, dtype: ADtype) -> ADtype:
assert (
dtype in _dtype_strings
), f"Expected dtype to be in {self._dtypes}, got {dtype}"
self._np_type = getattr(np, dtype)
return self.set_endpoint_data("dtype", dtype)
[docs] def validate(self, value: Any) -> np.number:
"""Check if the value is valid returns it"""
if value is None:
value = 0
cast = self._np_type(value)
return cast
def doc_type_string(self) -> str:
return f"{self.dtype}"
def default_widget(self) -> Widget:
if self.writeable:
return Widget.TEXTINPUT
else:
return Widget.TEXTUPDATE
@classmethod
def from_annotype(cls, anno: Anno, writeable: bool, **kwargs: Any) -> VMeta:
return super().from_annotype(
anno, writeable, dtype=_dtype_string_lookup[anno.typ]
)
[docs]@Serializable.register_subclass("malcolm:core/StringMeta:1.0")
@VMeta.register_annotype_converter(str)
class StringMeta(VMeta):
"""Meta object containing information for a string"""
attribute_class = NTScalar
__slots__: List[str] = []
[docs] def validate(self, value: Any) -> str:
"""Check if the value is valid returns it"""
if value is None:
return ""
elif isinstance(value, str):
return value
else:
return str(value)
def doc_type_string(self) -> str:
return "str"
def default_widget(self) -> Widget:
if self.writeable:
return Widget.TEXTINPUT
else:
return Widget.TEXTUPDATE
[docs]class VArrayMeta(VMeta):
"""Intermediate abstract class so `TableMeta` can say "only arrays" """
attribute_class = NTScalarArray
__slots__: List[str] = []
def to_np_array(dtype, value: Any) -> Any:
# Give the Array the shorthand version
if dtype == np.float64:
dtype = float
elif dtype == np.int64:
dtype = int
if value.__class__ is Array and getattr(value.seq, "dtype", None) == dtype:
# If Array wraps a numpy array of the correct type we are done
return value
else:
if isinstance(value, Sequence):
# Cast to numpy array
value = np.array(value, dtype=dtype)
return to_array(Array[dtype], value)
[docs]@Serializable.register_subclass("malcolm:core/BooleanArrayMeta:1.0")
@VMeta.register_annotype_converter(bool, is_array=True)
class BooleanArrayMeta(VArrayMeta):
"""Meta object containing information for a boolean array"""
[docs] def validate(self, value: Any) -> Array[bool]:
"""Check if the value is valid returns it"""
return to_np_array(bool, value)
def doc_type_string(self) -> str:
return "[bool]"
def default_widget(self) -> Widget:
if self.writeable:
return Widget.CHECKBOX
else:
return Widget.LED
[docs]@Serializable.register_subclass("malcolm:core/ChoiceArrayMeta:1.0")
@VMeta.register_annotype_converter(Enum, is_array=True)
class ChoiceArrayMeta(ChoiceMeta, VArrayMeta):
"""Meta object containing information for a choice array"""
[docs] def validate(self, value: Any) -> Array[str]:
"""Check if the value is valid returns it"""
if value is None:
return Array[self.enum_cls]()
else:
ret = []
if isinstance(value, str):
value = [value]
# If we have an Array of the right type, start off assuming it's the
# same
is_same = value.__class__ is Array and value.typ is self.enum_cls
for i, choice in enumerate(value):
# Our lookup table contains all the possible values
try:
new_choice = self.choices_lookup[choice]
except KeyError:
raise ValueError(
f"{value} is not a valid value in {self.choices} "
f"for element {i}"
)
else:
is_same &= choice == new_choice
ret.append(new_choice)
if is_same:
return value
else:
return to_array(Array[self.enum_cls], ret)
def doc_type_string(self) -> str:
return f"[{super().doc_type_string()}]"
[docs]@Serializable.register_subclass("malcolm:core/NumberArrayMeta:1.0")
@VMeta.register_annotype_converter(list(_dtype_string_lookup), is_array=True)
class NumberArrayMeta(NumberMeta, VArrayMeta):
"""Meta object containing information for an array of numerical values"""
[docs] def validate(self, value: Any) -> Array:
"""Check if the value is valid returns it"""
return to_np_array(self._np_type, value)
def doc_type_string(self) -> str:
return f"[{self.dtype}]"
[docs]@Serializable.register_subclass("malcolm:core/StringArrayMeta:1.0")
@VMeta.register_annotype_converter(str, is_array=True)
class StringArrayMeta(VArrayMeta):
"""Meta object containing information for a string array"""
[docs] def validate(self, value: Any) -> Array:
"""Check if the value is valid returns it"""
cast = to_array(Array[str], value)
for v in cast:
assert isinstance(v, str), f"Expected Array[str], got {value!r}"
return cast
def doc_type_string(self) -> str:
return "[str]"
def default_widget(self) -> Widget:
if self.writeable:
return Widget.TEXTINPUT
else:
return Widget.TEXTUPDATE
with Anno("Elements that should appear in the table instance"):
ATableElements = Mapping[str, VArrayMeta]
[docs]@Serializable.register_subclass("malcolm:core/TableMeta:1.0")
@VMeta.register_annotype_converter(Table)
class TableMeta(VMeta):
__slots__ = ["elements"]
attribute_class = NTTable
def __init__(
self,
description: AMetaDescription = "",
tags: UTags = (),
writeable: AWriteable = False,
label: ALabel = "",
elements: ATableElements = None,
) -> None:
self.table_cls: Union[Type[Table], None] = None
self.elements: Dict[str, Meta] = {}
super().__init__(description, tags, writeable, label)
# Do this after so writeable is honoured
self.set_elements(elements if elements else {})
[docs] def set_elements(self, elements: ATableElements) -> ATableElements:
"""Set the elements dict from a serialized dict"""
deserialized = OrderedDict()
for k, v in elements.items():
if k != "typeid":
deserialized[k] = deserialize_object(v, VArrayMeta)
ret = self.set_endpoint_data("elements", deserialized)
self.set_table_cls(self.table_cls)
return ret
def set_table_cls(self, table_cls: Type[Table] = None) -> None:
if table_cls is None or table_cls.__name__ == "TableSubclass":
# Either autogenerated by this function or not set, so make one
class TableSubclass(Table):
def __init__(self, **kwargs: Any) -> None:
self.__dict__.update(kwargs)
table_cls = TableSubclass
for k, meta in self.elements.items():
# We can distinguish the type by asking for the default
# validate value
default_array: Array = meta.validate(None)
anno = Anno(meta.description, name=k).set_typ(
default_array.typ, is_array=True
)
table_cls.call_types[k] = anno
else:
# User supplied, check it matches element names
assert Table.matches_type(
table_cls
), f"Expecting table subclass, got {table_cls}"
missing = set(self.elements) - set(table_cls.call_types)
assert not missing, f"Supplied Table missing fields {missing}"
extra = set(table_cls.call_types) - set(self.elements)
assert not extra, f"Supplied Table has extra fields {extra}"
self.table_cls = table_cls
def validate(self, value: Any) -> Any:
if value is None:
# Create an empty table
value = {k: None for k in self.elements}
elif isinstance(value, Table):
# Serialize a single level so we can type check it
value = {k: value[k] for k in value.call_types}
elif not isinstance(value, dict):
raise ValueError(f"Expected Table instance or serialized, got {value}")
# We need to make a table instance ourselves
keys = set(x for x in value if x != "typeid")
missing = set(self.elements) - keys
assert not missing, f"Supplied table missing fields {missing}"
extra = keys - set(self.elements)
assert not extra, f"Supplied table has extra fields {extra}"
args = {k: meta.validate(value[k]) for k, meta in self.elements.items()}
assert self.table_cls, "No table set"
value = self.table_cls(**args)
# Check column lengths
value.validate_column_lengths()
# Check the table class give Array elements
for k in args:
assert (
value[k].__class__ is Array
), f"Table Class {self.table_cls} doesn't wrap attr '{k}' with an Array"
return value
def doc_type_string(self) -> str:
return "`Table`"
def default_widget(self) -> Widget:
return Widget.TABLE
[docs] @classmethod
def from_table(
cls,
table_cls: Type[Table],
description: str,
widget: Widget = None,
writeable: List[str] = [],
extra_tags: List[str] = [],
) -> "TableMeta":
"""Create a TableMeta object, using a Table subclass as the spec
Args:
table_cls: The Table class to read __init__ args from
description: The description of the created Meta
widget: The widget of the created Meta
writeable: A list of the writeable field names. If there are any
writeable fields then the whole Meta is writeable
extra_tags: A list of tags to be added to the table meta
"""
elements = OrderedDict()
for k, ct in table_cls.call_types.items():
subclass = cls.lookup_annotype_converter(ct)
elements[k] = subclass.from_annotype(ct, writeable=k in writeable)
ret = cls(description=description, elements=elements, writeable=bool(writeable))
if widget is None:
widget = ret.default_widget()
tags = [widget.tag()]
tags.extend(extra_tags)
ret.set_tags(tags)
ret.set_table_cls(table_cls)
return ret
@classmethod
def from_annotype(cls, anno: Anno, writeable: bool, **kwargs: Any) -> VMeta:
assert Table.matches_type(anno.typ), f"Expected Table, got {anno.typ}"
if writeable:
# All fields are writeable
writeable_fields = list(anno.typ.call_types)
else:
# No fields are writeable
writeable_fields = []
return cls.from_table(anno.typ, anno.description, writeable=writeable_fields)
# Types used when deserializing to the class
with Anno("Meta objects that are used to describe the elements in the map"):
AElements = Mapping[str, VMeta]
with Anno("The required elements in the map"):
ARequired = Union[Array[str]]
# A more permissive union to allow a wider range of set_* args
URequired = Union[ARequired, Sequence[str], str]
[docs]@Serializable.register_subclass("malcolm:core/MapMeta:1.0")
class MapMeta(Model):
"""An object containing a set of ScalarMeta objects"""
__slots__ = ["elements", "required"]
def __init__(
self, elements: Optional[AElements] = None, required: URequired = ()
) -> None:
self.elements = self.set_elements(elements if elements else {})
self.required = self.set_required(required)
def set_elements(self, elements: AElements) -> AElements:
deserialized = OrderedDict()
for k, v in elements.items():
if k != "typeid":
v = deserialize_object(v, VMeta)
if not v.label:
v.set_label(camel_to_title(k))
deserialized[k] = v
return self.set_endpoint_data("elements", deserialized)
def set_required(self, required: URequired) -> ARequired:
for r in required:
assert (
r in self.elements
), f"Expected one of {list(self.elements)!r}, got {r!r}"
return self.set_endpoint_data("required", ARequired(required))
[docs] def validate(
self, param_dict: Optional[Mapping[str, Any]] = None, add_missing: bool = False
) -> Dict[str, Any]:
"""Return a param dict in the right order, with the correct keys and
values of the correct type with no extras or missing"""
if param_dict is None:
param_dict = {}
extra = set(param_dict) - set(self.elements)
assert not extra, (
f"Given keys {list(sorted(param_dict))}, some of which aren't "
f"in allowed keys {list(self.elements)}"
)
args = OrderedDict()
for k, m in self.elements.items():
if k in param_dict:
args[k] = m.validate(param_dict[k])
elif add_missing:
args[k] = m.validate(None)
missing: Set = set(self.required) - set(args)
assert (
not missing
), f"Requires keys {list(self.required)} but only given {list(args)}"
return args
# Types used when deserializing to the class
with Anno("Meta for describing the arguments that should be passed"):
ATakes = MapMeta
with Anno("The required elements in the map"):
ADefaults = Mapping[str, Any]
with Anno("Meta for describing the arguments that will be returned"):
AReturns = MapMeta
[docs]@Serializable.register_subclass("malcolm:core/MethodMeta:1.1")
class MethodMeta(Meta):
"""Exposes a function with metadata for arguments and return values"""
__slots__ = ["takes", "returns", "defaults"]
def __init__(
self,
takes: Optional[ATakes] = None,
defaults: Optional[ADefaults] = None,
description: AMetaDescription = "",
tags: UTags = (),
writeable: AWriteable = False,
label: ALabel = "",
returns: Optional[AReturns] = None,
) -> None:
self.takes = self.set_takes(takes if takes else MapMeta())
self.returns = self.set_returns(returns if returns else MapMeta())
self.defaults = self.set_defaults(defaults if defaults else {})
super().__init__(description, tags, writeable, label)
def set_takes(self, takes: ATakes) -> ATakes:
takes = deserialize_object(takes, MapMeta)
return self.set_endpoint_data("takes", takes)
def set_defaults(self, defaults: ADefaults) -> ADefaults:
defaults = FrozenOrderedDict(
tuple(
(k, self.takes.elements[k].validate(v))
for k, v in defaults.items()
if k != "typeid"
)
)
return self.set_endpoint_data("defaults", defaults)
def set_returns(self, returns: AReturns) -> AReturns:
returns = deserialize_object(returns, MapMeta)
return self.set_endpoint_data("returns", returns)
[docs] @classmethod
def from_callable(
cls,
func: Callable,
description: str = None,
returns: bool = True,
without_takes: Sequence[str] = (),
) -> "MethodMeta":
"""Return an instance of this class from a Callable
Args:
func: @with_call_types decorated Callable to inspect
description: Override description. If None use func.__doc__
returns: If True then scan return_type too
without_takes: A sequence of strings that should not appear in the
takes structure
Returns:
A MethodMeta with takes and returns matching the input func
"""
if description is None:
if func.__doc__ is None:
description = ""
else:
description = func.__doc__
method = cls(description=description)
tags = []
takes_elements = OrderedDict()
defaults = OrderedDict()
takes_required = []
without_takes_set = set(without_takes)
for k, anno in getattr(func, "call_types", {}).items():
if k in without_takes_set:
continue
scls = VMeta.lookup_annotype_converter(anno)
takes_elements[k] = scls.from_annotype(anno, writeable=True)
if anno.default is NO_DEFAULT:
takes_required.append(k)
elif anno.default is not None:
defaults[k] = anno.default
takes = MapMeta(elements=takes_elements, required=takes_required)
method.set_takes(takes)
method.set_defaults(defaults)
if returns:
returns_elements = OrderedDict()
returns_required = []
return_type: Anno = getattr(func, "return_type", None)
if return_type is None or return_type.typ is None:
call_types = {}
elif WithCallTypes.matches_type(return_type.typ):
call_types = return_type.typ.call_types
else:
tags.append(method_return_unpacked())
call_types = {"return": return_type}
for k, anno in call_types.items():
scls = VMeta.lookup_annotype_converter(anno)
returns_elements[k] = scls.from_annotype(anno, writeable=False)
if anno.default is not None:
returns_required.append(k)
returns = MapMeta(elements=returns_elements, required=returns_required)
method.set_returns(returns)
method.set_tags(tags)
return method
# Types used when deserializing to the class
with Anno("The last map this took/returned"):
AMVValue = Mapping[str, Any]
with Anno("The elements that were supplied in the map"):
APresent = Union[Array[str]]
# A more permissive union to allow a wider range of set_* args
UPresent = Union[APresent, Sequence[str], str]
[docs]@Serializable.register_subclass("malcolm:core/MethodLog:1.0")
class MethodLog(Serializable):
"""Exposes a function with metadata for arguments and return values"""
__slots__ = ["value", "alarm", "timeStamp"]
# noinspection PyPep8Naming
# timeStamp is camelCase to maintain compatibility with EPICS normative
# types
def __init__(
self,
value: AMVValue = None,
present: UPresent = (),
alarm: AAlarm = None,
timeStamp: ATimeStamp = None,
) -> None:
self.value: Union[Dict, AMVValue]
if value is None:
self.value = {}
else:
self.value = value
self.present = APresent(present)
if alarm is None:
self.alarm = Alarm.ok
else:
self.alarm = deserialize_object(alarm, Alarm)
if timeStamp is None:
self.timeStamp = TimeStamp()
else:
self.timeStamp = deserialize_object(timeStamp, TimeStamp)
# Types used when deserializing to the class
with Anno("The last arguments that a method call took"):
ATook = MethodLog
with Anno("The last return value produced by a method call"):
AReturned = MethodLog
with Anno("Meta for describing the arguments that will be returned"):
AMethodMeta = MethodMeta
[docs]@Serializable.register_subclass("malcolm:core/Method:1.1")
class MethodModel(Model):
"""Exposes a function with last took and returned arguments"""
__slots__ = ["took", "returned", "meta"]
def __init__(
self, took: ATook = None, returned: AReturned = None, meta: AMethodMeta = None
) -> None:
self.meta = self.set_meta(meta if meta else MethodMeta())
self.took = self.set_took(took)
self.returned = self.set_returned(returned)
def set_meta(self, meta: AMethodMeta) -> AMethodMeta:
meta = deserialize_object(meta, MethodMeta)
return self.set_endpoint_data("meta", meta)
def set_took(self, took: ATook = None) -> ATook:
if took is None:
took = MethodLog(
self.meta.takes.validate(add_missing=True), [], Alarm.ok, TimeStamp.zero
)
else:
took = deserialize_object(took, MethodLog)
return self.set_endpoint_data("took", took)
def set_returned(self, returned: AReturned = None) -> AReturned:
if returned is None:
returned = MethodLog(
self.meta.returns.validate(add_missing=True),
[],
Alarm.ok,
TimeStamp.zero,
)
else:
returned = deserialize_object(returned, MethodLog)
return self.set_endpoint_data("returned", returned)
# Types used when deserializing to the class
with Anno("The list of fields currently in the Block"):
AFields = Union[Array[str]]
# A more permissive union to allow a wider range of set_* args
UFields = Union[AFields, Sequence[str], str]
[docs]@Serializable.register_subclass("malcolm:core/BlockMeta:1.0")
class BlockMeta(Meta):
__slots__ = ["fields"]
def __init__(
self,
description: AMetaDescription = "",
tags: UTags = (),
writeable: AWriteable = True,
label: ALabel = "",
fields: UFields = (),
) -> None:
super().__init__(description, tags, writeable, label)
self.fields = self.set_fields(fields)
def set_fields(self, fields: UFields) -> AFields:
return self.set_endpoint_data("fields", AFields(fields))
[docs]@Serializable.register_subclass("malcolm:core/Block:1.0")
class BlockModel(Model):
"""Data Model for a Block"""
def __init__(self) -> None:
# Make a new call_types dict so we don't modify for all instances
self.call_types = OrderedDict()
self.meta = self.set_endpoint_data("meta", BlockMeta())
def set_endpoint_data(
self,
name: str,
value: Union[AttributeModel, MethodModel, BlockMeta],
) -> Any:
name = deserialize_object(name, str)
if name == "meta":
value = deserialize_object(value, BlockMeta)
else:
value = deserialize_object(value, (AttributeModel, MethodModel))
with self.notifier.changes_squashed:
if name in self.call_types:
# Stop the old Model notifying
getattr(self, name).set_notifier_path(Model.notifier, [])
else:
anno = Anno("Field").set_typ(type(value))
self.call_types[name] = anno
value.set_notifier_path(self.notifier, self.path + [name])
setattr(self, name, value)
# Tell the notifier what changed
self.notifier.add_squashed_change(self.path + [name], value)
self._update_fields()
return value
def _update_fields(self):
self.meta.set_fields([x for x in self.call_types if x != "meta"])
def remove_endpoint(self, name: str) -> None:
with self.notifier.changes_squashed:
getattr(self, name).set_notifier_path(Model.notifier, [])
self.call_types.pop(name)
delattr(self, name)
self._update_fields()
self.notifier.add_squashed_delete(self.path + [name])