Source code for softioc.builder

import os
import numpy

from .device_core import RecordLookup
from .softioc import dbLoadDatabase

from epicsdbbuilder import *

InitialiseDbd()
LoadDbdFile(os.path.join(os.path.dirname(__file__), 'device.dbd'))

from . import device, pythonSoftIoc  # noqa
# Re-export this so users only have to import the builder
from .device import SetBlocking, to_epics_str_array # noqa

PythonDevice = pythonSoftIoc.PythonDevice()


# ----------------------------------------------------------------------------
#  Wrappers for PythonDevice record constructors.
#
# The SCAN field for all input records defaults to I/O Intr.


def _set_in_defaults(fields):
    fields.setdefault('SCAN', 'I/O Intr')
    fields.setdefault('PINI', 'YES')
    fields.setdefault('DISP', 1)
    _set_alarm(fields)

def _set_out_defaults(fields):
    fields.setdefault('OMSL', 'supervisory')

def _set_alarm(fields):
    if 'status' in fields:
        assert 'STAT' not in fields, 'Can\'t specify both status and STAT'
        fields['STAT'] = _statStrings[fields.pop('status')]
    if 'severity' in fields:
        assert 'SEVR' not in fields, 'Can\'t specify both severity and SEVR'
        fields['SEVR'] = _severityStrings[fields.pop('severity')]

# For longout and ao we want DRV{L,H} to match {L,H}OPR by default
def _set_scalar_out_defaults(fields, DRVL, DRVH):
    fields['DRVL'] = DRVL
    fields['DRVH'] = DRVH
    fields.setdefault('LOPR', DRVL)
    fields.setdefault('HOPR', DRVH)


[docs]def aIn(name, LOPR=None, HOPR=None, EGU=None, PREC=None, **fields): _set_in_defaults(fields) return PythonDevice.ai( name, LOPR = LOPR, HOPR = HOPR, EGU = EGU, PREC = PREC, **fields)
[docs]def aOut(name, DRVL=None, DRVH=None, EGU=None, PREC=None, **fields): _set_out_defaults(fields) _set_scalar_out_defaults(fields, DRVL, DRVH) return PythonDevice.ao(name, EGU = EGU, PREC = PREC, **fields)
[docs]def boolIn(name, ZNAM=None, ONAM=None, **fields): _set_in_defaults(fields) return PythonDevice.bi(name, ZNAM = ZNAM, ONAM = ONAM, **fields)
[docs]def boolOut(name, ZNAM=None, ONAM=None, **fields): _set_out_defaults(fields) return PythonDevice.bo(name, ZNAM = ZNAM, ONAM = ONAM, **fields)
[docs]def longIn(name, LOPR=None, HOPR=None, EGU=None, **fields): _set_in_defaults(fields) fields.setdefault('MDEL', -1) return PythonDevice.longin( name, LOPR = LOPR, HOPR = HOPR, EGU = EGU, **fields)
[docs]def longOut(name, DRVL=None, DRVH=None, EGU=None, **fields): _set_out_defaults(fields) _set_scalar_out_defaults(fields, DRVL, DRVH) return PythonDevice.longout(name, EGU = EGU, **fields)
[docs]def int64In(name, LOPR=None, HOPR=None, EGU=None, **fields): _set_in_defaults(fields) fields.setdefault('MDEL', -1) return PythonDevice.int64in( name, LOPR = LOPR, HOPR = HOPR, EGU = EGU, **fields)
[docs]def int64Out(name, DRVL=None, DRVH=None, EGU=None, **fields): _set_out_defaults(fields) _set_scalar_out_defaults(fields, DRVL, DRVH) return PythonDevice.int64out(name, EGU = EGU, **fields)
# Field name prefixes for mbbi/mbbo records. _mbbPrefixes = [ 'ZR', 'ON', 'TW', 'TH', 'FR', 'FV', 'SX', 'SV', # 0-7 'EI', 'NI', 'TE', 'EL', 'TV', 'TT', 'FT', 'FF'] # 8-15 # All the severity strings supported by <prefix>SV _severityStrings = ['NO_ALARM', 'MINOR', 'MAJOR', 'INVALID'] _statStrings = [ 'NO_ALARM', 'READ', 'WRITE', 'HIHI', 'HIGH', 'LOLO', 'LOW', 'STATE', 'COS', 'COMM', 'TIMEOUT', 'HWLIMIT', 'CALC', 'SCAN', 'LINK', 'SOFT', 'BAD_SUB', 'UDF', 'DISABLE', 'SIMM', 'READ_ACCESS', 'WRITE_ACCESS'] # Converts a list of (option [,severity]) values or tuples into field settings # suitable for mbbi and mbbo records. def _process_mbb_values(options, fields): def process_value(prefix, value, option, severity=None): fields[prefix + 'ST'] = option fields[prefix + 'VL'] = value if severity: if isinstance(severity, int): # Map alarm.MINOR_ALARM -> "MINOR" severity = _severityStrings[severity] fields[prefix + 'SV'] = severity # zip() silently ignores extra values in options, so explicitly check length assert len(options) <= 16, 'May not specify more than 16 enum values' for prefix, (value, option) in zip(_mbbPrefixes, enumerate(options)): if isinstance(option, tuple): # The option is tuple consisting of the option name and an optional # alarm severity. process_value(prefix, value, *option) else: # The option is a simple string naming the option. Assign the # default numerical value (and no severity setting). process_value(prefix, value, option)
[docs]def mbbIn(name, *options, **fields): _process_mbb_values(options, fields) _set_in_defaults(fields) return PythonDevice.mbbi(name, **fields)
[docs]def mbbOut(name, *options, **fields): _process_mbb_values(options, fields) _set_out_defaults(fields) return PythonDevice.mbbo(name, **fields)
[docs]def stringIn(name, **fields): _set_in_defaults(fields) return PythonDevice.stringin(name, **fields)
[docs]def stringOut(name, **fields): _set_out_defaults(fields) return PythonDevice.stringout(name, **fields)
[docs]def Action(name, **fields): return boolOut(name, always_update = True, **fields)
# Converts numpy dtype name to FTVL value. NumpyDtypeToDbf = { 'int8': 'CHAR', 'uint8': 'UCHAR', 'int16': 'SHORT', 'uint16': 'USHORT', 'int32': 'LONG', 'uint32': 'ULONG', 'float32': 'FLOAT', 'float64': 'DOUBLE', 'bytes320': 'STRING', # Numpy term for 40-character byte str (40*8 bits) } # Coverts FTVL string to numpy type DbfStringToNumpy = { 'CHAR': 'int8', 'UCHAR': 'uint8', 'SHORT': 'int16', 'USHORT': 'uint16', 'LONG': 'int32', 'ULONG': 'uint32', 'FLOAT': 'float32', 'DOUBLE': 'float64', 'STRING': 'S40', } def _get_length(fields, default = None): '''Helper function for getting 'length' or 'NELM' from arguments''' if 'length' in fields: assert 'NELM' not in fields, 'Cannot specify NELM and length together' return fields.pop('length') elif 'NELM' in fields: return fields.pop('NELM') else: assert default is not None, 'Must specify waveform length' return default def _waveform(value, fields): '''Helper routine for waveform construction. If a value is given it is interpreted as an initial value and used to configure length and datatype (unless these are overridden), otherwise length and datatype must be specified.''' if 'initial_value' in fields: assert not value, 'Can\'t specify initial value twice!' value = (fields.pop('initial_value'),) # Datatype can be specified as keyword argument, taken from FTVL, or derived # from the initial value if 'datatype' in fields: assert 'FTVL' not in fields, \ 'Can\'t specify FTVL and datatype together' datatype = fields.pop('datatype') if datatype == int or datatype == 'int': # Convert Python int to 32-bit integer, it's all we can handle datatype = numpy.dtype('int32') else: datatype = numpy.dtype(datatype) elif 'FTVL' in fields: datatype = numpy.dtype(DbfStringToNumpy[fields['FTVL']]) else: # No datatype specified, will have to infer from initial value datatype = None if value: # If a value is specified it should be the *only* non keyword argument. value, = value initial_value = device._require_waveform(value, datatype) length = _get_length(fields, len(initial_value)) # Special case for [u]int64: if the initial value comes in as 64 bit # integers we cannot represent that, so recast it as [u]int32 # Special case for array of strings to mark each element as conforming # to EPICS 40-character string limit if datatype is None: if initial_value.dtype == numpy.int64: initial_value = numpy.require(initial_value, numpy.int32) elif initial_value.dtype == numpy.uint64: initial_value = numpy.require(initial_value, numpy.uint32) elif initial_value.dtype.char in ('S', 'U'): initial_value = to_epics_str_array(initial_value) else: initial_value = numpy.array([], dtype = datatype) length = _get_length(fields) datatype = initial_value.dtype assert length > 0, 'Array cannot be of zero length' fields['initial_value'] = initial_value fields['_wf_nelm'] = length fields['_wf_dtype'] = datatype fields['NELM'] = length fields['FTVL'] = NumpyDtypeToDbf[datatype.name]
[docs]def WaveformIn(name, *value, **fields): _waveform(value, fields) _set_in_defaults(fields) return PythonDevice.waveform(name, **fields)
# Legacy name Waveform = WaveformIn
[docs]def WaveformOut(name, *value, **fields): _waveform(value, fields) return PythonDevice.waveform_out(name, **fields)
def _long_string(fields): if 'length' in fields: length = fields.pop('length') elif 'initial_value' in fields: length = len(fields['initial_value'].encode(errors = 'replace')) + 1 else: # Default length of 256 length = 256 fields.setdefault('initial_value', '') fields['_wf_nelm'] = length fields['_wf_dtype'] = numpy.dtype('int8') fields['NELM'] = length fields['FTVL'] = 'CHAR' def qform_string(rec): rec.add_info("Q:form", "String") return rec
[docs]def longStringIn(name, **fields): _long_string(fields) _set_in_defaults(fields) return qform_string(PythonDevice.long_stringin(name, **fields))
[docs]def longStringOut(name, **fields): _long_string(fields) return qform_string(PythonDevice.long_stringout(name, **fields))
# ---------------------------------------------------------------------------- # Support routines for builder
[docs]def LoadDatabase(): '''This should be called after all the builder records have been created, but before calling iocInit(). The database is loaded into EPICS memory, ready for operation.''' from tempfile import mkstemp fd, database = mkstemp('.db') os.close(fd) WriteRecords(database) dbLoadDatabase(database) os.unlink(database) pythonSoftIoc.RecordWrapper.reset_builder()
[docs]def ClearRecords(): """Delete all created record information, allowing new record creation""" assert not pythonSoftIoc.RecordWrapper.is_builder_reset(), \ 'Record database has already been loaded' RecordLookup._RecordDirectory.clear() ResetRecords()
# ---------------------------------------------------------------------------- # Record name configuration. A device name prefix must be specified. SetSimpleRecordNames(None, ':') SetDeviceName = SetPrefix
[docs]def SetDeviceName(name): SetPrefix(name)
[docs]def UnsetDevice(): SetPrefix(None)
__all__ = [ # Re-exports from epicsdbbuilder 'records', 'PP', 'CP', 'MS', 'NP', # Wrappers for PythonDevice 'aIn', 'aOut', 'boolIn', 'boolOut', 'longIn', 'longOut', 'stringIn', 'stringOut', 'mbbIn', 'mbbOut', 'Waveform', 'WaveformIn', 'WaveformOut', 'longStringIn', 'longStringOut', 'Action', # Other builder support functions 'LoadDatabase', 'ClearRecords', 'SetDeviceName', 'UnsetDevice', # Device support functions 'SetBlocking' ]