Source code for softioc.softioc

import os
import sys
from ctypes import *

from epicsdbbuilder.recordset import recordset

from . import imports, device

__all__ = ['dbLoadDatabase', 'iocInit', 'interactive_ioc']


epicsExit = imports.epicsExit


[docs]def iocInit(dispatcher=None): """This must be called exactly once after loading all EPICS database files. After this point the EPICS IOC is running and serving PVs. Args: dispatcher: A callable with signature ``dispatcher(func, *args)``. Will be called in response to caput on a record. If not supplied use `cothread` as a dispatcher. See Also: `softioc.asyncio_dispatcher` is a dispatcher for `asyncio` applications """ if dispatcher is None: # Fallback to cothread import cothread # Create our own cothread callback queue so that our callbacks # processing doesn't interfere with other callback processing. dispatcher = cothread.cothread._Callback() # Set the dispatcher for record processing callbacks device.dispatcher = dispatcher imports.iocInit()
def safeEpicsExit(): '''Calls epicsExit() after ensuring Python exit handlers called.''' if hasattr(sys, 'exitfunc'): try: # Calling epicsExit() will bypass any atexit exit handlers, so call # them explicitly now. sys.exitfunc() finally: # Make sure we don't try the exit handlers more than once! del sys.exitfunc epicsExit() # The following identifiers will be exported to interactive shell. command_names = [] # IOC Test facilities def ExportTest(name, argtypes, defaults=(), description='no description yet', lib=imports.dbCore): f = getattr(lib, name) f.argtypes = argtypes f.restype = None length = len(argtypes) def call_f(*args): missing = length - len(args) if missing > 0: # Add in the missing arguments from the given defaults args = args + defaults[-missing:] f(*args) call_f.__doc__ = description call_f.__name__ = name globals()[name] = call_f command_names.append(name) auto_encode = imports.auto_encode ExportTest('dba', (auto_encode,), (), '''\ dba(field) Prints value of each field in dbAddr structure associated with field.''') ExportTest('dbl', (auto_encode, auto_encode,), ('', ''), '''\ dbl(pattern='', fields='') Prints the names of records in the database matching pattern. If a (space separated) list of fields is also given then the values of the fields are also printed.''') ExportTest('dbnr', (c_int,), (0,), '''\ dbnr(all=0) Print number of records of each record type.''') ExportTest('dbgrep', (auto_encode,), (), '''\ dbgrep(pattern) Lists all record names that match the pattern. * matches any number of characters in a record name.''') ExportTest('dbgf', (auto_encode,), (), '''\ dbgf(field) Prints field type and value.''') ExportTest('dbpf', (auto_encode, auto_encode,), (), '''\ dbpf(field, value) Writes the given value into the field.''') ExportTest('dbpr', (auto_encode, c_int,), (0,), '''\ dbpr(record, interest=0) Prints all the fields in record up to the indicated interest level: = ======================================================== 0 Application fields which change during record processing 1 Application fields which are fixed during processing 2 System developer fields of major interest 3 System developer fields of minor interest 4 All other fields = ========================================================''') ExportTest('dbtr', (auto_encode,), (), '''\ dbtr(record) Tests processing of the specified record.''') ExportTest('dbtgf', (auto_encode,), (), '''\ dbtgf(field_name) This performs a dbNameToAddr and then calls dbGetField with all possible request types and options. It prints the results of each call. This routine is of most interest to system developers for testing database access.''') ExportTest('dbtpf', (auto_encode, auto_encode,), (), '''\ dbtpf(field_name, value) This command performs a dbNameToAddr, then calls dbPutField, followed by dbgf for each possible request type. This routine is of interest to system developers for testing database access.''') ExportTest('dbtpn', (auto_encode, auto_encode,), (), '''\ dbtpn(field, value) This command performs a dbProcessNotify request. If a non-null value argument string is provided it issues a putProcessRequest to the named record; if no value is provided it issues a processGetRequest. This routine is mainly of interest to system developers for testing database access.''') ExportTest('dbior', (auto_encode, c_int,), ('', 0,), '''\ dbior(driver='', interest=0) Prints driver reports for the selected driver (or all drivers if driver is omitted) at the given interest level.''') ExportTest('dbhcr', (), (), '''\ dbhcr() Prints hardware configuration report.''') ExportTest('gft', (auto_encode,), (), '''\ gft(field) Get Field Test for old database access''') ExportTest('pft', (auto_encode,), (), '''\ pft(field, value) Put Field Test for old database access''') ExportTest('tpn', (auto_encode, auto_encode,), (), '''\ tpn(field, value) Test Process Notify for old database access''') ExportTest('dblsr', (auto_encode, c_int,), (), '''\ dblsr(recordname, level) This command generates a report showing the lock set to which each record belongs. If recordname is 0, "", or "*" all records are shown, otherwise only records in the same lock set as recordname are shown. level can have the following values: = ======================================================= 0 Show lock set information only 1 Show each record in the lock set 2 Show each record and all database links in the lock set = =======================================================''') ExportTest('dbLockShowLocked', (c_int,), (), '''\ dbLockShowLocked(level) This command generates a report showing all locked locksets, the records they contain, the lockset state and the thread that currently owns the lockset. The level argument is passed to epicsMutexShow to adjust the information reported about each locked epicsMutex.''') ExportTest('scanppl', (c_double,), (0.0,), '''\ scanppl(rate=0.0) Prints all records with the selected scan rate (or all if rate=0).''') ExportTest('scanpel', (c_int,), (0,), '''\ scanpel(event=0) Prints all records with selected event number (or all if event=0).''') ExportTest('scanpiol', (), (), '''\ scanpiol() Prints all records in the I/O event scan lists.''') ExportTest('generalTimeReport', (c_int,), (0,), '''\ generalTimeReport(int level) This routine displays the time providers and their priority levels that have registered with the General Time subsystem for both current and event times. At level 1 it also shows the current time as obtained from each provider.''', lib=imports.Com) ExportTest('eltc', (c_int,), (), '''\ eltc(noYes) TThis determines if error messages are displayed on the IOC console. 0 means no and any other value means yes.''', lib=imports.Com) # Hacked up exit object so that when soft IOC framework sends us an exit command # we actually exit. class Exiter: def __repr__(self): safeEpicsExit() def __call__(self): safeEpicsExit() exit = Exiter() command_names.append('exit')
[docs]def dbLoadDatabase(database, path = None, substitutions = None): '''Loads a database file and applies any given substitutions.''' imports.dbLoadDatabase(database, path, substitutions)
def _add_records_from_file(dir, file, macros): # This is very naive, for instance macros are added to but never removed, # but it works well enough for devIocStats with open(os.path.join(dir, file)) as f: for line in f.readlines(): line = line.rstrip() if line.startswith('substitute'): # substitute "QUEUE=scanOnce, QUEUE_CAPS=SCANONCE for sub in line.split('"')[1].split(','): k, v = sub.split('=') macros[k.strip()] = v.strip() elif line.startswith('include'): # include "iocQueue.db" _add_records_from_file(dir, line.split('"')[1], macros) else: # A record line for k, v in macros.items(): line = line.replace('$(%s)' % k, v) recordset.AddBodyLine(line)
[docs]def devIocStats(ioc_name): '''This will load a template for the devIocStats library with the specified IOC name. This should be called before `iocInit`''' macros = dict(IOCNAME=ioc_name, TODFORMAT='%m/%d/%Y %H:%M:%S') iocstats_dir = os.path.join(os.path.dirname(__file__), 'iocStatsDb') _add_records_from_file(iocstats_dir, 'ioc.template', macros)
[docs]def interactive_ioc(context = {}, call_exit = True): '''Fires up the interactive IOC prompt with the given context. Args: context: A dictionary of values that will be made available to the interactive Python shell together with a number of EPICS test functions call_exit: If `True`, the IOC will be terminated by calling epicsExit which means that `interactive_ioc` will not return ''' # Add all our commands to the given context. exports = dict((key, globals()[key]) for key in command_names) import code if sys.version_info < (3, 6): interact_args = {} else: # This suppresses irritating exit message introduced by Python3. Alas, # this option is only available in Python 3.6! interact_args = dict(exitmsg = '') code.interact(local = dict(exports, **context), **interact_args) if call_exit: safeEpicsExit()