Source code for softioc.device

import os
import time
import ctypes
from ctypes import *
import numpy

from . import alarm
from . import fields
from .imports import (
    create_callback_capsule,
    dbLoadDatabase,
    signal_processing_complete,
    recGblResetAlarms,
    db_put_field,
    db_get_field,
)
from .device_core import DeviceSupportCore, RecordLookup


# This is set from softioc.iocInit
dispatcher = None

# Global blocking flag, used to mark asynchronous (False) or synchronous (True)
# processing modes for Out records.
# Default False to maintain behaviour from previous versions.
blocking = False

# Set the current global blocking flag, and return the previous value.
[docs]def SetBlocking(new_val): global blocking old_val = blocking blocking = new_val return old_val
# EPICS processing return codes EPICS_OK = 0 EPICS_ERROR = 1 NO_CONVERT = 2 class ProcessDeviceSupportCore(DeviceSupportCore, RecordLookup): '''Implements canonical default processing for records with a _process method. Processing typically either copies a locally set value into the record, or else reads a value from the record and triggers an update. ''' # Most records just have an extra process method, but unfortunately ai/ao # will have to override this to also add their special_linconv method. _dset_extra_ = ([('process', CFUNCTYPE(c_int, c_void_p))], [0]) # For some record types we want to return a different return code either # from record init or processing _epics_rc_ = EPICS_OK # Most subclasses (all except waveforms) define a ctypes constructor for the # underlying EPICS compatible value. def _value_to_epics(self, value): return self._ctype_(value) def _epics_to_value(self, epics): return epics.value def _default_value(self): return self._ctype_() def _compare_values(self, value1, value2): return value1.value == value2.value # This method is called during Out record processing to return the # underlying value in EPICS format. def _read_value(self, record): # Take a true copy of the value read to avoid accidental sharing result = self._ctype_() result.value = record.read_val().value return result # This method is called during In record processing to update the # underlying value (the value must be in EPICS compatible format). This is # also called during Out record initialisation and value reversion when # required. def _write_value(self, record, value): record.write_val(value) def get_field(self, field): ''' Returns the given field value as a string.''' assert hasattr(self, "_record"), \ 'get_field may only be called after iocInit' data = (c_char * 40)() name = self._name + '.' + field db_get_field(name, fields.DBF_STRING, addressof(data), 1) return _string_at(data, 40) def set_field(self, field, value): '''Sets the given field to the given value. Value will be transported as a DBF_STRING.''' assert hasattr(self, "_record"), \ 'set_field may only be called after iocInit' data = (c_char * 40)() data.value = str(value).encode() + b'\0' name = self._name + '.' + field db_put_field(name, fields.DBF_STRING, addressof(data), 1)
[docs]class ProcessDeviceSupportIn(ProcessDeviceSupportCore): _link_ = 'INP' def __init__(self, name, **kargs): if 'initial_value' in kargs: value = self._value_to_epics(kargs.pop('initial_value')) else: value = self._default_value() # We implement update locking via a simple trick which relies on the # Python global interpreter lock: this ensures that assigning or # reading a single value is atomic. We therefore cluster all our # variable state into a single tuple which represents a single value # to be processed. # The tuple contains everything needed to be written: the value, # severity, alarm and optional timestamp. self._value = (value, alarm.NO_ALARM, alarm.UDF_ALARM, None) self.__super.__init__(name, **kargs) def _process(self, record): # For input process we copy the value stored in the instance to the # record. The alarm status is also updated, and a custom timestamp # can also be set. value, severity, alarm, timestamp = self._value self._write_value(record, value) self.process_severity(record, severity, alarm) if timestamp is not None: record.TIME = timestamp record.UDF = 0 return self._epics_rc_
[docs] def set(self, value, severity=alarm.NO_ALARM, alarm=alarm.NO_ALARM, timestamp=None): '''Updates the stored value and triggers an update. The alarm severity and timestamp can also be specified if appropriate.''' value = self._value_to_epics(value) self._value = (value, severity, alarm, timestamp) self.trigger()
[docs] def set_alarm(self, severity, alarm, timestamp=None): '''Updates the alarm status without changing the stored value. An update is triggered, and a timestamp can optionally be specified.''' self._value = (self._value[0], severity, alarm, timestamp) self.trigger()
[docs] def get(self): '''Returns the last written value.''' return self._epics_to_value(self._value[0])
[docs]class ProcessDeviceSupportOut(ProcessDeviceSupportCore): _link_ = 'OUT' def __init__(self, name, **kargs): on_update = kargs.pop('on_update', None) on_update_name = kargs.pop('on_update_name', None) # At most one of on_update and on_update_name can be specified assert on_update is None or on_update_name is None, \ 'Cannot specify on_update and on_update_name together' if on_update: self.__on_update = on_update elif on_update_name: self.__on_update = lambda value: on_update_name(value, name) else: self.__on_update = None self.__validate = kargs.pop('validate', None) self.__always_update = kargs.pop('always_update', False) self.__enable_write = True if 'initial_value' in kargs: value = self._value_to_epics(kargs.pop('initial_value')) initial_severity = alarm.NO_ALARM initial_status = alarm.NO_ALARM else: value = self._default_value() # To maintain backwards compatibility, if there is no initial value # we mark the record as invalid initial_severity = alarm.INVALID_ALARM initial_status = alarm.UDF_ALARM self._value = (value, initial_severity, initial_status) self._blocking = kargs.pop('blocking', blocking) if self._blocking: self._callback = create_callback_capsule() self.__super.__init__(name, **kargs) def init_record(self, record): '''Special record initialisation for out records only: implements special record initialisation if an initial value has been specified, allowing out records to have a sensible initial value.''' self._write_value(record, self._value[0]) if 'MLST' in self._fields_: record.MLST = self._value[0] record.TIME = time.time() record.UDF = 0 record.NSEV = self._value[1] record.NSTA = self._value[2] recGblResetAlarms(record) return self._epics_rc_ def __completion(self, record): '''Signals that all on_update processing is finished''' if self._blocking: signal_processing_complete(record, self._callback) def _process(self, record): '''Processing suitable for output records. Performs immediate value validation and asynchronous update notification.''' if record.PACT: return EPICS_OK # Ignore memoized value, retrieve it from the VAL field instead value = self._read_value(record) _, severity, alarm = self._value self.process_severity(record, severity, alarm) if not self.__always_update and \ self._compare_values(value, self._value[0]): # If the value isn't making a change then don't do anything. return EPICS_OK python_value = self._epics_to_value(value) if self.__enable_write and self.__validate and \ not self.__validate(self, python_value): # Asynchronous validation rejects value, so restore the last good # value. self._write_value(record, self._value[0]) return EPICS_ERROR else: # Value is good. Hang onto it, let users know the value has changed self._value = (value, severity, alarm) record.UDF = 0 if self.__on_update and self.__enable_write: record.PACT = self._blocking dispatcher( self.__on_update, func_args=(python_value,), completion = self.__completion, completion_args=(record,)) return EPICS_OK def _value_to_dbr(self, value): return self._dbf_type_, 1, addressof(value), value
[docs] def set_alarm(self, severity, alarm): '''Updates the alarm status without changing the stored value. An update is triggered, and a timestamp can optionally be specified.''' self._value = (self._value[0], severity, alarm) self.set( self.get(), severity=severity, alarm=alarm)
[docs] def set(self, value, process=True, severity=alarm.NO_ALARM, alarm=alarm.NO_ALARM): '''Special routine to set the value directly.''' value = self._value_to_epics(value) try: _record = self._record except AttributeError: # Record not initialised yet. Record data for when # initialisation occurs self._value = (value, severity, alarm) else: # The array parameter is used to keep the raw pointer alive dbf_code, length, data, array = self._value_to_dbr(value) self.__enable_write = process db_put_field(_record.NAME, dbf_code, data, length) self.__enable_write = True
[docs] def get(self): return self._epics_to_value(self._value[0])
def _Device(Base, record_type, ctype, dbf_type, epics_rc, mlst = False): '''Wrapper for generating simple records.''' class GenericDevice(Base): _record_type_ = record_type _device_name_ = 'devPython_' + record_type _fields_ = ['UDF', 'VAL'] _epics_rc_ = epics_rc _ctype_ = staticmethod(ctype) _dbf_type_ = dbf_type if mlst: _fields_.append('MLST') GenericDevice.__name__ = record_type return GenericDevice _In = ProcessDeviceSupportIn _Out = ProcessDeviceSupportOut def _Device_In(*args, **kargs): return _Device(_In, mlst = False, *args, **kargs) def _Device_Out(*args, **kargs): return _Device(_Out, mlst = True, *args, **kargs) longin = _Device_In('longin', c_int32, fields.DBF_LONG, EPICS_OK) longout = _Device_Out('longout', c_int32, fields.DBF_LONG, EPICS_OK) int64in = _Device_In('int64in', c_int64, fields.DBF_INT64, EPICS_OK) int64out = _Device_Out('int64out', c_int64, fields.DBF_INT64, EPICS_OK) bi = _Device_In('bi', c_uint16, fields.DBF_CHAR, NO_CONVERT) bo = _Device_Out('bo', c_uint16, fields.DBF_CHAR, NO_CONVERT) mbbi = _Device_In('mbbi', c_uint16, fields.DBF_SHORT, NO_CONVERT) mbbo = _Device_Out('mbbo', c_uint16, fields.DBF_SHORT, NO_CONVERT) def _string_at(value, count): # Need string_at() twice to ensure string is size limited *and* null # terminated. value = ctypes.string_at(ctypes.string_at(value, count)) # Convert bytes to unicode string return value.decode(errors = 'replace') class EpicsString: _fields_ = ['UDF', 'VAL'] _epics_rc_ = EPICS_OK _ctype_ = c_char * 40 _dbf_type_ = fields.DBF_STRING def _value_to_epics(self, value): # It's a little odd: we can't simply construct a value from the byte # string, but we can update the array in an existing value. # Value being written must be a string, and will be automatically null # terminated where possible. result = self._ctype_() result.value = value.encode() + b'\0' return result def _epics_to_value(self, epics): return _string_at(epics, 40) class stringin(EpicsString, ProcessDeviceSupportIn): _record_type_ = 'stringin' _device_name_ = 'devPython_stringin' class stringout(EpicsString, ProcessDeviceSupportOut): _record_type_ = 'stringout' _device_name_ = 'devPython_stringout' dset_process_linconv = ( [('process', CFUNCTYPE(c_int, c_void_p)), ('special_linconv', CFUNCTYPE(c_int, c_void_p, c_int))], [0, 0]) # For ai and ao there's no point in supporting RVAL <-> VAL conversion, so # for these we support no conversion directly. class ai(ProcessDeviceSupportIn): _record_type_ = 'ai' _device_name_ = 'devPython_ai' _fields_ = ['UDF', 'VAL'] _dset_extra_ = dset_process_linconv _epics_rc_ = NO_CONVERT _ctype_ = c_double _dbf_type_ = fields.DBF_DOUBLE def _process(self, record): # Because we're returning NO_CONVERT we need to do the .UDF updating # ourself (otherwise the record support layer does this). record.UDF = int(numpy.isnan(self._value[0])) return self.__super._process(record) class ao(ProcessDeviceSupportOut): _record_type_ = 'ao' _device_name_ = 'devPython_ao' _fields_ = ['UDF', 'VAL', 'MLST'] _dset_extra_ = dset_process_linconv _epics_rc_ = NO_CONVERT _ctype_ = c_double _dbf_type_ = fields.DBF_DOUBLE def to_epics_str_array(value): """Convert the given array of Python strings to an array of EPICS nul-terminated strings""" result = numpy.empty(len(value), 'S40') for n, s in enumerate(value): if isinstance(s, str): val = EpicsString._ctype_() val.value = s.encode() + b'\0' result[n] = val.value else: result[n] = s return result def _require_waveform(value, dtype): if dtype and dtype.char == 'S': return to_epics_str_array(value) else: if isinstance(value, bytes): # Special case hack for byte arrays. Surprisingly tricky: value = numpy.frombuffer(value, dtype = numpy.uint8) value = numpy.require(value, dtype = dtype) if value.shape == (): value.shape = (1,) assert value.ndim == 1, 'Can\'t write multidimensional arrays' return value class WaveformBase(ProcessDeviceSupportCore): _link_ = 'INP' # In the waveform record class, the following four fields are key: # FTVL Type of stored waveform (as a DBF_ code) # BPTR Pointer to raw array containing waveform data # NELM Length of allocated array in number of elements # NORD Currently reported length of array (0 <= NORD <= NELM) _fields_ = ['UDF', 'FTVL', 'BPTR', 'NELM', 'NORD'] def __init__(self, name, _wf_nelm, _wf_dtype, **kargs): self._dtype = _wf_dtype self._nelm = _wf_nelm self.__super.__init__(name, **kargs) def init_record(self, record): self._dbf_type_ = record.FTVL return self.__super.init_record(record) def _read_value(self, record): nord = record.NORD result = numpy.empty(nord, dtype = self._dtype) memmove( result.ctypes.data_as(c_void_p), record.BPTR, self._dtype.itemsize * nord) return result def _write_value(self, record, value): nord = len(value) memmove( record.BPTR, value.ctypes.data_as(c_void_p), self._dtype.itemsize * nord) record.NORD = nord def _compare_values(self, value, other): return numpy.array_equal(value, other) def _value_to_epics(self, value): # Ensure we always convert incoming value into numpy array, regardless # of whether the record has been initialised or not value = _require_waveform(value, self._dtype) # Because arrays are mutable values it's ever so easy to accidentially # call set() with a value which subsequently changes. To avoid this # common class of bug, at the cost of duplicated code and data, here we # ensure a copy is taken of the value. assert len(value) <= self._nelm, 'Value too long for waveform' value = numpy.copy(value) # As we return a reference to the numpy array, ensure it cannot be # modified under our noses value.flags.writeable = False return value def _epics_to_value(self, value): if self._dtype.char == 'S': return [_string_at(s, 40) for s in value] else: return value def _value_to_dbr(self, value): return self._dbf_type_, len(value), value.ctypes.data, value class waveform(WaveformBase, ProcessDeviceSupportIn): _record_type_ = 'waveform' _device_name_ = 'devPython_waveform' class waveform_out(WaveformBase, ProcessDeviceSupportOut): _record_type_ = 'waveform' _device_name_ = 'devPython_waveform_out' class LongStringBase(WaveformBase): _dtype = numpy.dtype('uint8') def _value_to_epics(self, value): value = value.encode(errors = 'replace') # Convert a string into an array of characters. This will produce # the correct behaviour when treating a character array as a string. # Note that the trailing null is needed to work around problems with # some clients. value = numpy.frombuffer(value + b'\0', dtype = numpy.uint8) # Ensure string isn't too long to fit into waveform assert len(value) <= self._nelm, 'Value too long for waveform' return value def _epics_to_value(self, value): return _string_at(value.ctypes, len(value)) class long_stringin(LongStringBase, ProcessDeviceSupportIn): _record_type_ = 'waveform' _device_name_ = 'devPython_long_stringin' class long_stringout(LongStringBase, ProcessDeviceSupportOut): _record_type_ = 'waveform' _device_name_ = 'devPython_long_stringout' # Ensure the .dbd file is loaded. dbLoadDatabase('device.dbd', os.path.dirname(__file__), None)