Source code for softioc.device

import os
import time
import inspect
from ctypes import *
import numpy

from . import alarm
from .fields import DbfCodeToNumpy, DbrToDbfCode
from .imports import dbLoadDatabase, recGblResetAlarms, db_put_field
from .device_core import DeviceSupportCore, RecordLookup


# This is set from softioc.iocInit
# dispatcher(func, *args) will queue a callback to happen
dispatcher = None


# EPICS processing return codes
EPICS_OK = 0
EPICS_ERROR = 1
NO_CONVERT = 2


class ProcessDeviceSupportCore(DeviceSupportCore, RecordLookup):
    '''Implements canonical default processing for records with a _process
    method.  Processing typically either copies a locally set value into the
    record, or else reads a value from the record and triggers an update.
    '''

    # Most records just have an extra process method, but unfortunately ai/ao
    # will have to override this to also add their special_linconv method.
    _dset_extra_ = ([('process', CFUNCTYPE(c_int, c_void_p))], [0])

    # For some record types we want to return a different return code either
    # from record init or processing
    _epics_rc = EPICS_OK

    # Default implementations of read and write, overwritten where necessary.
    def _read_value(self, record):
        return getattr(record, 'VAL')
    def _write_value(self, record, value):
        setattr(record, 'VAL', value)


[docs]class ProcessDeviceSupportIn(ProcessDeviceSupportCore): _link_ = 'INP' def __init__(self, name, **kargs): # We implement update locking via a simple trick which relies on the # Python global interpreter lock: this ensures that assigning or # reading a single value is atomic. We therefore cluster all our # variable state into a single tuple which represents a single value # to be processed. # The tuple contains everything needed to be written: the value, # severity, alarm and optional timestamp. self._value = ( kargs.pop('initial_value', self._default_), alarm.NO_ALARM, alarm.UDF_ALARM, None) self.__super.__init__(name, **kargs) def _process(self, record, _value=None): # For input process we copy the value stored in the instance to the # record. The alarm status is also updated, and a custom timestamp # can also be set. if _value is None: _value = self._value value, severity, alarm, timestamp = _value self._write_value(record, value) self.process_severity(record, severity, alarm) if timestamp is not None: record.TIME = timestamp return self._epics_rc
[docs] def set(self, value, severity=alarm.NO_ALARM, alarm=alarm.UDF_ALARM, timestamp=None): '''Updates the stored value and triggers an update. The alarm severity and timestamp can also be specified if appropriate.''' self._value = (value, severity, alarm, timestamp) self.trigger()
[docs] def set_alarm(self, severity, alarm, timestamp=None): '''Updates the alarm status without changing the stored value. An update is triggered, and a timestamp can optionally be specified.''' self._value = (self._value[0], severity, alarm, timestamp) self.trigger()
[docs] def get(self): '''Returns the last written value.''' return self._value[0]
[docs]class ProcessDeviceSupportOut(ProcessDeviceSupportCore): _link_ = 'OUT' def __init__(self, name, **kargs): on_update = kargs.pop('on_update', None) on_update_name = kargs.pop('on_update_name', None) # At most one of on_update and on_update_name can be specified assert on_update is None or on_update_name is None, \ 'Cannot specify on_update and on_update_name together' if on_update: self.__on_update = on_update elif on_update_name: self.__on_update = lambda value: on_update_name(value, name) else: self.__on_update = None self.__validate = kargs.pop('validate', None) self.__always_update = kargs.pop('always_update', False) self._value = kargs.pop('initial_value', None) self.__enable_write = True self.__super.__init__(name, **kargs) def init_record(self, record): '''Special record initialisation for out records only: implements special record initialisation if an initial value has been specified, allowing out records to have a sensible initial value.''' if self._value is not None: self._write_value(record, self._value) if 'MLST' in self._fields_: record.MLST = self._value record.TIME = time.time() record.UDF = 0 recGblResetAlarms(record) return self._epics_rc def _process(self, record): '''Processing suitable for output records. Performs immediate value validation and asynchronous update notification.''' value = self._read_value(record) if numpy.all(value == self._value) and not self.__always_update: # If the value isn't making a change then don't do anything. return EPICS_OK if self.__enable_write and self.__validate and \ not self.__validate(self, value): # Asynchronous validation rejects value. It's up to the # validation routine to do any logging. In this case restore the # last good value. if self._value is not None: self._write_value(record, self._value) return EPICS_ERROR self._value = value if self.__on_update and self.__enable_write: dispatcher(self.__on_update, value) return EPICS_OK NumpyCharCodeToDbr = { # The following type codes are supported directly: 'S': 0, # DBR_STRING str_ 'h': 1, # DBR_SHORT short = int16 'f': 2, # DBR_FLOAT single = float32 'b': 4, # DBR_CHAR byte = int8 'i': 5, # DBR_LONG intc = int32 'd': 6, # DBR_DOUBLE float_ = float64 # These are supported as related types 'H': 1, # DBR_SHORT ushort = uint16 '?': 4, # DBR_CHAR bool_ 'B': 4, # DBR_CHAR ubyte = uint8 'I': 5, # DBR_LONG uintc = uint32 } if numpy.int_().itemsize == 4: NumpyCharCodeToDbr.update({'l': 5, 'L': 5}) # int_, uint # Converts a Python value into a form suitable for sending over channel # access. Derived from the corresponding cothread implementation. Returns # computed dbrcode, waveform length, raw data pointer and pointer to # underlying data (for lifetime management). def value_to_dbr(self, value): # First convert the data directly into an array. This will help in # subsequent processing: this does most of the type coercion. value = numpy.require(value, requirements = 'C') if value.shape == (): value.shape = (1,) assert value.ndim == 1, 'Can\'t put multidimensional arrays!' if value.dtype.char == 'S': # Need special processing to hack the array so that strings are # actually 40 characters long. new_value = numpy.empty(value.shape, 'S40') new_value[:] = value value = new_value try: dbrtype = self.NumpyCharCodeToDbr[value.dtype.char] except KeyError: # One more special case. caput() of a list of integers on a 64-bit # system will fail at this point because they were automatically # converted to 64-bit integers. Catch this special case and fix it # up by silently converting to 32-bit integers. Not really the # right thing to do (as data can be quietly lost), but the # alternative isn't nice to use either. if value.dtype.char == 'l': value = numpy.require(value, dtype = numpy.int32) dbrtype = 5 else: raise return dbrtype, len(value), value.ctypes.data, value
[docs] def set(self, value, process=True): '''Special routine to set the value directly.''' try: _record = self._record except AttributeError: # Record not initialised yet. Record the value for when # initialisation occurs self._value = value else: datatype, length, data, array = self.value_to_dbr(value) self.__enable_write = process db_put_field( _record.NAME, DbrToDbfCode[datatype], data, length) self.__enable_write = True
[docs] def get(self): return self._value
def _Device(Base, record_type, mlst=False, default=0, convert=True): '''Wrapper for generating simple records.''' class GenericDevice(Base): _record_type_ = record_type _device_name_ = 'devPython_' + record_type _default_ = default _fields_ = ['UDF', 'VAL'] _epics_rc = EPICS_OK if convert else NO_CONVERT if mlst: _fields_.append('MLST') GenericDevice.__name__ = record_type return GenericDevice _In = ProcessDeviceSupportIn _Out = ProcessDeviceSupportOut def _Device_In(type, **kargs): return _Device(_In, type, **kargs) def _Device_Out(type, convert=True, mlst=True): return _Device(_Out, type, convert=convert, mlst=mlst, default=None) longin = _Device_In('longin') longout = _Device_Out('longout') bi = _Device_In('bi', convert=False) bo = _Device_Out('bo', convert=False) stringin = _Device_In('stringin', mlst=False, default='') stringout = _Device_Out('stringout', mlst=False) mbbi = _Device_In('mbbi', convert=False) mbbo = _Device_Out('mbbo', convert=False) dset_process_linconv = ( [('process', CFUNCTYPE(c_int, c_void_p)), ('special_linconv', CFUNCTYPE(c_int, c_void_p, c_int))], [0, 0]) # For ai and ao there's no point in supporting RVAL <-> VAL conversion, so # for these we support no conversion directly. class ai(ProcessDeviceSupportIn): _record_type_ = 'ai' _device_name_ = 'devPython_ai' _default_ = 0.0 _fields_ = ['UDF', 'VAL'] _dset_extra_ = dset_process_linconv _epics_rc = NO_CONVERT def _process(self, record): _value = self._value self.__super._process(record, _value) # Because we're returning NO_CONVERT we need to do the .UDF updating # ourself (otherwise the record support layer does this). record.UDF = int(numpy.isnan(_value[0])) return NO_CONVERT class ao(ProcessDeviceSupportOut): _record_type_ = 'ao' _device_name_ = 'devPython_ao' _fields_ = ['UDF', 'VAL', 'MLST'] _dset_extra_ = dset_process_linconv _epics_rc = NO_CONVERT class WaveformBase(ProcessDeviceSupportCore): _link_ = 'INP' # In the waveform record class, the following four fields are key: # FTVL Type of stored waveform (as a DBF_ code) # BPTR Pointer to raw array containing waveform data # NELM Length of allocated array in number of elements # NORD Currently reported length of array (0 <= NORD <= NELM) _fields_ = ['UDF', 'FTVL', 'BPTR', 'NELM', 'NORD'] def init_record(self, record): self.dtype = DbfCodeToNumpy[record.FTVL] return self.__super.init_record(record) def _read_value(self, record): nord = record.NORD result = numpy.empty(nord, dtype = self.dtype) memmove( result.ctypes.data_as(c_void_p), record.BPTR, self.dtype.itemsize * nord) return result def _write_value(self, record, value): value = numpy.require(value, dtype = self.dtype) if value.shape == (): value.shape = (1,) assert value.ndim == 1, 'Can\'t write multidimensional arrays' nelm = record.NELM nord = len(value) if nord > nelm: nord = nelm memmove( record.BPTR, value.ctypes.data_as(c_void_p), self.dtype.itemsize * nord) record.NORD = nord class waveform(WaveformBase, ProcessDeviceSupportIn): _record_type_ = 'waveform' _device_name_ = 'devPython_waveform' _default_ = () # Because arrays are mutable values it's ever so easy to accidentially call # set() with a value which subsequently changes. To avoid this common class # of bug, at the cost of duplicated code and data, here we ensure a copy is # taken of the value. def set(self, value, severity=alarm.NO_ALARM, alarm=alarm.UDF_ALARM, timestamp=None): '''Updates the stored value and triggers an update. The alarm severity and timestamp can also be specified if appropriate.''' if isinstance(value, str): # Convert a string into an array of characters. This will produce # the correct behaviour when treating a character array as a string. # Note that the trailing null is needed to work around problems with # some clients. value = numpy.fromstring(value + '\0', dtype = 'uint8') value = numpy.require(value, dtype = self.dtype) self._value = (+value, severity, alarm, timestamp) if value.shape == (): value.shape = (1,) assert value.ndim == 1, 'Can\'t write multidimensional arrays' self.trigger() class waveform_out(WaveformBase, ProcessDeviceSupportOut): _record_type_ = 'waveform' _device_name_ = 'devPython_waveform_out' # Ensure the .dbd file is loaded. dbLoadDatabase("device.dbd", os.path.dirname(__file__), None)