Source code for dls_bxflow_epsic.epsic_workflow

import json
import logging
from typing import Dict, Optional, Union

# Filestore interface.
from dls_bxflow_lib.bx_filestores.bx_filestores import bx_filestores_get_default

# Base class for workflows.
from dls_bxflow_lib.bx_workflows.base import Base as BxWorkflowBase

# Object managers.
from dls_bxflow_run.bx_tasks.bx_tasks import BxTasks
from dls_bxflow_run.bx_tasks.constants import Types as BxTaskTypes

# Utilities.
from dls_utilpack.callsign import callsign
from dls_utilpack.require import require
from dls_utilpack.search_file import search_file

# Output location convention.
from dls_bxflow_epsic.utilities import data_label_2_filestore_directory

# Versions of things.
from dls_bxflow_epsic.version import meta as dls_bxflow_epsic_meta

logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------------


class EpsicWorkflow(BxWorkflowBase):
    """
    This class provides some beamline-specific helper methods.

    It makes some assumptions:

    - tasks are added in order, and each depends only on the one before it finishing
    - the final task ends the job with either of its standard gates
    - any failure gate ends the job
    """

    # ------------------------------------------------------------------
    def __init__(self, **kwargs):

        # Put versions into the log.
        meta_dict = dls_bxflow_epsic_meta()
        meta_json = json.dumps(meta_dict, indent=4)
        logger.debug(
            f"[APPVERS] instantiating workflow object using versions\n{meta_json}"
        )

        # In personal mode, we don't make beamline paths.
        use_data_label = (
            bx_filestores_get_default().specification().get("use_data_label", True)
        )

        if use_data_label:
            # On this beamline, all workflows should have a data_label in the kwargs.
            data_label = require(f"{callsign(self)} kwargs", kwargs, "data_label")

            # Derive a place to put the output files from the job execution.
            # This usually uses the data_label to save output files in a location
            # similar to that of the original data file.
            # This needs the configurator to give the current beamline, year and visit.
            filestore_directory = data_label_2_filestore_directory(data_label)

            # Tell the workflow builder where the output files should go.
            bx_filestores_get_default().set_directory(filestore_directory)

            logger.info(
                f"{callsign(self)} sets filestore_directory to {filestore_directory}"
            )

        # Init the base class only AFTER the filestore_directory is set.
        BxWorkflowBase.__init__(self, **kwargs)

        # There might be no data label in the constructor kwargs.
        if hasattr(self, "data_label"):
            # Modify the job's data_label.
            self.bx_job.set_data_label(self.data_label)

    # ------------------------------------------------------------------
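    # Illustrative usage (a sketch, not part of the original module): a beamline
    # workflow would normally subclass EpsicWorkflow and supply data_label in the
    # constructor kwargs, unless the filestore specification sets use_data_label
    # to False (personal mode). The subclass name and data_label value below are
    # hypothetical.
    #
    #     class MyEpsicWorkflow(EpsicWorkflow):
    #         def __init__(self, **kwargs):
    #             EpsicWorkflow.__init__(self, **kwargs)
    #             # ... add tasks here ...
    #
    #     workflow = MyEpsicWorkflow(data_label="some_data_label")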
    def add_notebook_task(
        self,
        notebook_name: str,
        modify_cells: Optional[Dict] = None,
        remex_hints: Optional[Union[Dict, str]] = None,
        label_suffix: Optional[str] = None,
    ):
        """
        Add a notebook task.

        Args:
            notebook_name (str): Name of the notebook, without the root directory
                or the .ipynb suffix.
            modify_cells (Optional[Dict]): Python code to be put into cells,
                given as a dict whose keys are the cell numbers.
                Defaults to None, which means don't replace any cells.
            remex_hints (Optional[Union[Dict, str]]): Remote execution hints for
                this task, given as a dict or a string. Defaults to None, in which
                case the beamline default "standard_science_cluster" is used.
            label_suffix (Optional[str]): Suffix appended to the notebook name to
                form the task label, for example when several tasks of the same
                class are run on the same inputs. Defaults to None.

        Raises:
            RuntimeError: Any kind of error in this method.

        Returns:
            BxTask: The constructed task object.
        """

        label = notebook_name

        try:
            # Make the full path to the notebook to run.
            ipynb_filename = search_file(
                self.bx_configurator.require("epsic.notebook_paths"),
                f"{notebook_name}.ipynb",
            )

            if label_suffix is not None:
                label = f"{label}{label_suffix}"

            # Let all notebooks have a default remex hint for the beamline.
            if remex_hints is None:
                remex_hints = "standard_science_cluster"

            # Specify the bxflow task.
            bx_task_specification = {
                "type": BxTaskTypes.JUPYTER,
                "label": label,
                # The scheduler attempts to match a launcher with sympathetic remex hints.
                # Then the chosen launcher attempts to honor the remex_hints when it creates the shell process.
                # Finally, the individual task may also use remex hints when it runs.
                "remex_hints": remex_hints,
                "type_specific_tbd": {
                    "ipynb_filename": ipynb_filename,
                    "modify_cells": modify_cells,
                },
            }

            # Assemble remex_hints from task and configuration.
            self.assemble_remex_hints(bx_task_specification)

            # Build the task.
            bx_task = BxTasks().build_object(
                bx_task_specification,
            )

            # Add it to the workflow.
            self.add_task(bx_task)
        except Exception:
            raise RuntimeError(f"error adding notebook task for {label}")

        return bx_task
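    # Illustrative usage (a sketch, not part of the original module): adding two
    # notebook tasks, the second reusing the same notebook with a label suffix so
    # the task labels stay unique. The notebook name and cell contents below are
    # hypothetical; the modify_cells keys follow the documented convention of cell
    # numbers mapped to replacement Python code.
    #
    #     workflow.add_notebook_task("calibrate")
    #     workflow.add_notebook_task(
    #         "calibrate",
    #         modify_cells={2: "threshold = 0.5"},
    #         label_suffix="_rerun",
    #     )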
    # ------------------------------------------------------------------
    def add_mib_convert_task(
        self,
        mib_filename,
    ):
        """
        Add a mib_convert task.

        Args:
            mib_filename: Filename of the .mib file to be converted.

        Raises:
            RuntimeError: Any kind of error in this method.

        Returns:
            BxTask: The constructed task object.
        """

        try:
            # Specify the bxflow task.
            bx_task_specification = {
                "type": BxTaskTypes.MODULE_CLASSNAME,
                "label": "mib_convert",
                # The scheduler attempts to match a launcher with sympathetic remex hints.
                # Then the chosen launcher attempts to honor the remex_hints when it creates the shell process.
                # Finally, the individual task may also use remex hints when it runs.
                "remex_hints": "mib_convert",
                "type_specific_tbd": {
                    "module_classname": "dls_bxflow_epsic.algorithms.mib_converter::MibConverter",
                    "constructor_args": [mib_filename],
                    # This task needs the bx_task reference in order to give it artifacts.
                    # This is a placeholder here... the value will be filled in at runtime.
                    "constructor_kwargs": {"bx_task": None},
                },
            }

            # Assemble remex_hints from task and configuration.
            self.assemble_remex_hints(bx_task_specification)

            # Build the task.
            bx_task = BxTasks().build_object(
                bx_task_specification,
            )

            # Add it to the workflow.
            self.add_task(bx_task)
        except Exception:
            raise RuntimeError("error adding mib_convert task")

        return bx_task
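
# Illustrative usage (a sketch, not part of the original module): a minimal job
# which converts a .mib file and then runs a notebook on the converted output.
# Because tasks added in order depend only on the one before them (see the class
# docstring), the notebook task waits for mib_convert to finish. The filename
# and notebook name below are hypothetical.
#
#     workflow.add_mib_convert_task("scan_001.mib")
#     workflow.add_notebook_task("post_convert_analysis")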