Source code for softioc.softioc

import os
import sys
import atexit
from ctypes import *
from tempfile import NamedTemporaryFile

from . import imports, device
from . import cothread_dispatcher

# Names picked up by a wildcard import of this module.  Other public helpers
# defined below are still reachable as attributes of the module.
__all__ = ['dbLoadDatabase', 'iocInit', 'interactive_ioc']


# tie in epicsAtExit() to interpreter lifecycle
def epicsAtPyExit():
    '''Invoke ``imports.epicsExitCallAtExits()``.

    Registered with `atexit` immediately below so that it runs when the
    Python interpreter shuts down.
    '''
    imports.epicsExitCallAtExits()


# Explicit registration; equivalent to decorating with @atexit.register.
atexit.register(epicsAtPyExit)


def iocInit(dispatcher=None):
    '''Start the IOC.  Call this exactly once, after every EPICS database
    file has been loaded.  Once it has been called the EPICS IOC is running
    and serving PVs.

    Args:
        dispatcher: A callable with signature ``dispatcher(func, *args)``. It
            will be called in response to caput on a record. When omitted a
            `cothread` based dispatcher is created and used instead.

    See Also:
        `softioc.asyncio_dispatcher` is a dispatcher for `asyncio` applications
    '''
    # Publish the dispatcher used for record processing callbacks, falling
    # back to a freshly created cothread dispatcher when none was supplied.
    device.dispatcher = (
        cothread_dispatcher.CothreadDispatcher()
        if dispatcher is None else dispatcher)
    imports.iocInit()
def safeEpicsExit(code=0):
    '''Calls epicsExit() after ensuring Python exit handlers called.

    Args:
        code: Exit status forwarded to ``imports.epicsExit``.
    '''
    if hasattr(sys, 'exitfunc'):  # py 2.x
        try:
            # Calling epicsExit() will bypass any atexit exit handlers, so call
            # them explicitly now.
            sys.exitfunc()
        finally:
            # Make sure we don't try the exit handlers more than once!
            del sys.exitfunc
    elif hasattr(atexit, '_run_exitfuncs'):  # py 3.x
        atexit._run_exitfuncs()

    # calls epicsExitCallAtExits()
    # and then OS exit()
    imports.epicsExit(code)


# Alias under which the safe exit routine is also available.
epicsExit = safeEpicsExit

# The following identifiers will be exported to interactive shell.
command_names = []


# IOC Test facilities
def ExportTest(name, argtypes, defaults=(), description='no description yet',
               lib=imports.dbCore):
    '''Wrap the C function ``name`` from ``lib`` as a module level Python
    function and record it in `command_names`.

    Args:
        name: Name of the function to look up in ``lib``; also the name under
            which the wrapper is stored in this module's globals.
        argtypes: Sequence assigned to the function's ``argtypes``.
        defaults: Values for trailing arguments the caller may omit.
        description: Text used as the wrapper's ``__doc__``.
        lib: Library object the function is fetched from.
    '''
    f = getattr(lib, name)
    f.argtypes = argtypes
    f.restype = None

    length = len(argtypes)

    def call_f(*args):
        missing = length - len(args)
        if missing > 0:
            # Add in the missing arguments from the given defaults.  The
            # slice is coerced to a tuple so that a list of defaults can be
            # concatenated with the argument tuple as well.
            args = args + tuple(defaults[-missing:])
        f(*args)

    call_f.__doc__ = description
    call_f.__name__ = name
    globals()[name] = call_f
    command_names.append(name)


auto_encode = imports.auto_encode


ExportTest('dba', (auto_encode,), (), '''\
dba(field)

Prints value of each field in dbAddr structure associated with field.''')

ExportTest('dbl', (auto_encode, auto_encode,), ('', ''), '''\
dbl(pattern='', fields='')

Prints the names of records in the database matching pattern. If a (space
separated) list of fields is also given then the values of the fields are also
printed.''')

ExportTest('dbnr', (c_int,), (0,), '''\
dbnr(all=0)

Print number of records of each record type.''')

ExportTest('dbgrep', (auto_encode,), (), '''\
dbgrep(pattern)

Lists all record names that match the pattern.
* matches any number of characters in a record name.''')

ExportTest('dbgf', (auto_encode,), (), '''\
dbgf(field)

Prints field type and value.''')

ExportTest('dbpf', (auto_encode, auto_encode,), (), '''\
dbpf(field, value)

Writes the given value into the field.''')

ExportTest('dbpr', (auto_encode, c_int,), (0,), '''\
dbpr(record, interest=0)

Prints all the fields in record up to the indicated interest level:

= ========================================================
0 Application fields which change during record processing
1 Application fields which are fixed during processing
2 System developer fields of major interest
3 System developer fields of minor interest
4 All other fields
= ========================================================''')

ExportTest('dbtr', (auto_encode,), (), '''\
dbtr(record)

Tests processing of the specified record.''')

ExportTest('dbtgf', (auto_encode,), (), '''\
dbtgf(field_name)

This performs a dbNameToAddr and then calls dbGetField with all possible request
types and options. It prints the results of each call. This routine is of most
interest to system developers for testing database access.''')

ExportTest('dbtpf', (auto_encode, auto_encode,), (), '''\
dbtpf(field_name, value)

This command performs a dbNameToAddr, then calls dbPutField, followed by dbgf
for each possible request type. This routine is of interest to system developers
for testing database access.''')

ExportTest('dbtpn', (auto_encode, auto_encode,), (), '''\
dbtpn(field, value)

This command performs a dbProcessNotify request. If a non-null value argument
string is provided it issues a putProcessRequest to the named record; if no
value is provided it issues a processGetRequest.
This routine is mainly of interest to system developers for testing database
access.''')

ExportTest('dbior', (auto_encode, c_int,), ('', 0,), '''\
dbior(driver='', interest=0)

Prints driver reports for the selected driver (or all drivers if driver is
omitted) at the given interest level.''')

ExportTest('dbhcr', (), (), '''\
dbhcr()

Prints hardware configuration report.''')

ExportTest('gft', (auto_encode,), (), '''\
gft(field)

Get Field Test for old database access''')

# pft takes a field and a value (see its help text and the matching tpn
# entry below), so both arguments must be declared for encoding.
ExportTest('pft', (auto_encode, auto_encode,), (), '''\
pft(field, value)

Put Field Test for old database access''')

ExportTest('tpn', (auto_encode, auto_encode,), (), '''\
tpn(field, value)

Test Process Notify for old database access''')

ExportTest('dblsr', (auto_encode, c_int,), (), '''\
dblsr(recordname, level)

This command generates a report showing the lock set to which each record
belongs. If recordname is 0, "", or "*" all records are shown, otherwise only
records in the same lock set as recordname are shown.

level can have the following values:

= =======================================================
0 Show lock set information only
1 Show each record in the lock set
2 Show each record and all database links in the lock set
= =======================================================''')

ExportTest('dbLockShowLocked', (c_int,), (), '''\
dbLockShowLocked(level)

This command generates a report showing all locked locksets, the records they
contain, the lockset state and the thread that currently owns the lockset.
The level argument is passed to epicsMutexShow to adjust the information
reported about each locked epicsMutex.''')

ExportTest('scanppl', (c_double,), (0.0,), '''\
scanppl(rate=0.0)

Prints all records with the selected scan rate (or all if rate=0).''')

ExportTest('scanpel', (c_int,), (0,), '''\
scanpel(event=0)

Prints all records with selected event number (or all if event=0).''')

ExportTest('scanpiol', (), (), '''\
scanpiol()

Prints all records in the I/O event scan lists.''')

ExportTest('casr', (c_int,), (0,), '''\
casr(level=0)

Channel Access Server Report - print information regarding all connected CA
clients. Information printed determined by level:

= ==================================================================
0 Show a one line summary of each connected client
1 Show additional information for each client
2 Also show additional information about the server's free resources
= ==================================================================
''')

ExportTest('dbel', (auto_encode,), (), '''\
dbel(record_name)

This routine prints the Channel Access event list for the specified record.''')

ExportTest('dbcar', (auto_encode, c_int), ("*", 0,), '''\
dbcar(record_name, level)

Database to channel access report. This command generates a report showing
database channel access links. If record_name is “*” then information about all
records is shown otherwise only information about the specified record.

level can have the following values:

= ==================================================================
0 Show summary information only.
1 Show summary and each CA link that is not connected.
2 Show summary and status of each CA link
= ==================================================================
''')

ExportTest('generalTimeReport', (c_int,), (0,), '''\
generalTimeReport(int level)

This routine displays the time providers and their priority levels that have
registered with the General Time subsystem for both current and event times.
At level 1 it also shows the current time as obtained from each provider.''',
    lib=imports.Com)

ExportTest('eltc', (c_int,), (), '''\
eltc(noYes)

This determines if error messages are displayed on the IOC console. 0 means no
and any other value means yes.''',
    lib=imports.Com)


# Hacked up exit object so that when soft IOC framework sends us an exit command
# we actually exit.
class Exiter:
    '''Object bound to the name ``exit`` in the interactive shell.'''

    def __repr__(self):
        # hack to exit when "called" with no parenthesis: displaying the
        # object asks for its repr, which raises SystemExit instead.
        sys.exit(0)

    def __call__(self, code=0):
        # Normal call form, exit(code).
        sys.exit(code)


exit = Exiter()
command_names.append('exit')
def dbLoadDatabase(database, path = None, substitutions = None):
    '''Loads a database file and applies any given substitutions.

    All three arguments are forwarded unchanged, in this order, to
    ``imports.dbLoadDatabase``.

    Args:
        database: The database file to load.
        path: Optional; passed straight through (presumably a search path for
            the database file -- confirm against ``imports.dbLoadDatabase``).
        substitutions: Optional macro substitutions to apply.  Elsewhere in
            this module a string such as ``'A=1, B=2'`` is passed.
    '''
    imports.dbLoadDatabase(database, path, substitutions)
def _add_records_from_file(dirname, file, substitutions):
    '''Load the template ``file`` found in ``dirname``, following includes.

    Lines starting with ``substitute`` set extra macros for later includes,
    lines starting with ``include`` are loaded recursively (from the same
    directory) and every other line is collected, written to a temporary
    ``.db`` file and loaded with `dbLoadDatabase`.

    Args:
        dirname: Directory containing ``file`` and anything it includes.
        file: Name of the template file to read.
        substitutions: Macro substitution string applied when loading, or
            `None` for no substitutions.
    '''
    # This is very naive, it loads all includes before their parents which
    # possibly can put them out of order, but it works well enough for
    # devIocStats
    lines, include_subs = [], ''
    with open(os.path.join(dirname, file)) as f:
        for line in f:
            line = line.rstrip()
            if line.startswith('substitute'):
                # substitute "QUEUE=scanOnce, QUEUE_CAPS=SCANONCE"
                # keep hold of the substitutions
                include_subs = line.split('"')[1]
            elif line.startswith('include'):
                # include "iocQueue.db"
                # Combine whichever of the two macro strings are non-empty;
                # if neither is, hand the caller's value on untouched so that
                # None stays None.
                joined = ', '.join(
                    part for part in (substitutions, include_subs) if part)
                subs = joined if joined else substitutions
                _add_records_from_file(dirname, line.split('"')[1], subs)
            else:
                # A record line
                lines.append(line)

    # Write a tempfile and load it.  The file is created with delete=False so
    # that it can be closed before being loaded by name; the finally clause
    # makes sure it is removed even if writing or loading fails.
    tmp = NamedTemporaryFile(suffix='.db', delete=False)
    try:
        with tmp:
            tmp.write(os.linesep.join(lines).encode())
        dbLoadDatabase(tmp.name, substitutions=substitutions)
    finally:
        os.unlink(tmp.name)
def devIocStats(ioc_name):
    '''Load the devIocStats template for the IOC called ``ioc_name``.

    The template ``ioc.template`` is read from the ``iocStats/iocAdmin/Db``
    directory that sits next to this module.  This should be called before
    `iocInit`.
    '''
    template_dir = os.path.join(
        os.path.dirname(__file__), 'iocStats', 'iocAdmin', 'Db')
    # Macro string handed to the template loader.
    macros = ''.join(
        ('IOCNAME=', ioc_name, ', TODFORMAT=%m/%d/%Y %H:%M:%S'))
    _add_records_from_file(template_dir, 'ioc.template', macros)
def interactive_ioc(context=None, call_exit=True):
    '''Fires up the interactive IOC prompt with the given context.

    Args:
        context: A dictionary of values that will be made available to the
            interactive Python shell together with a number of EPICS test
            functions. Entries here take precedence over the built in
            commands of the same name. Defaults to no extra values.
        call_exit: If `True`, the IOC will be terminated by calling epicsExit
            which means that `interactive_ioc` will not return

    Raises:
        SystemExit: Re-raised if the shell exits via ``SystemExit`` and the
            call to `safeEpicsExit` was skipped or returned.
    '''
    # Add all our commands to a fresh namespace, then layer the caller's
    # context on top.  A new dict is built on every call so neither the
    # caller's dictionary nor a shared default is ever handed to the shell.
    namespace = {name: globals()[name] for name in command_names}
    if context:
        namespace.update(context)

    import code

    try:
        code.interact(local=namespace, exitmsg='')
    except SystemExit as e:
        if call_exit:
            safeEpicsExit(e.code)
        raise

    if call_exit:
        safeEpicsExit(0)
def non_interactive_ioc():
    '''Function to run the IOC in non-interactive mode. This mode is useful for
    running the IOC as a background process without user interaction.
    This function expects a stop signal. When it receives one, the IOC stops.

    Blocks in ``device.dispatcher.wait_for_quit()`` and, once that returns,
    calls ``safeEpicsExit(0)``.
    '''
    # NOTE(review): device.dispatcher is assigned by iocInit in this module,
    # so presumably iocInit must have been called first -- confirm.  The
    # dispatcher in use must also provide a wait_for_quit() method.
    device.dispatcher.wait_for_quit()
    safeEpicsExit(0)