Source code for dls_bxflow_epsic.algorithms.mib_converter
import glob
import logging
import os
# Utilities.
from dls_utilpack.callsign import callsign
from dls_utilpack.describe import describe
from dls_utilpack.explain import explain
# Function to get a data label from a data_filename.
from dls_bxflow_epsic.utilities import (
data_filename_2_byv,
mib_filename_2_data_label,
mib_filename_2_filestore_directory,
)
logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------------
class _logging_filter:
"""
Python logging filter to remove annoying messages.
These are not super useful to see all the time at the INIT or even DEBUG level.
TODO: Remove matplotlib from imports done by mib2hdfConvert package.
"""
def filter(self, record):
if (
"matplotlib/__init__.py" in record.pathname
and "loaded modules" in record.msg
):
return 0
return 1
# -----------------------------------------------------------------------------------
[docs]class MibConverter:
"""
Class which converts a single mib file to hdf5.
Uses the method "convert" imported from mib2hdfConvert package in https://github.com/ePSIC-DLS/epsic_tools.
This class is called from the EpsicWorkflow::add_mib_convert_task method.
This runs in the cluster.
"""
def __init__(self, mib_filename, bx_task=None):
if bx_task is None:
raise RuntimeError(
f"MibConverter has not been given a bx_task instance for mib_filename {mib_filename}"
)
# Keep references from the constructor.
self.__bx_task = bx_task
self.__mib_filename = mib_filename
# Using mib_filename, derive various other names.
self.__data_label = mib_filename_2_data_label(self.__mib_filename)
self.__filestore_directory = mib_filename_2_filestore_directory(
self.__mib_filename
)
self.__conversion_prefix = self.__data_label.split("/")[-1]
self.__beamline, self.__year, self.__visit = data_filename_2_byv(
self.__mib_filename
)
# ------------------------------------------------------------------------------
[docs] async def run(self):
"""
Run the conversion.
Even though async, the caller knows this be long-blocking.
"""
if self.has_any_converted_filenames():
logger.info(f"the conversion is apparently done for {self.__mib_filename}")
else:
# If we want the data stored under a subdirectory with the material name,
# then we have to give a folder.
# TODO: Make sure we can collect and process mib files which are not in a material subdirectory.
material = self.__data_label.split("/")
if len(material) > 1:
# Material is data label, minus the last path component.
material = "/".join(material[:-1])
folder = f"Merlin/{material}"
else:
folder = None
logger.info(
f"converting {self.__beamline} {self.__year} {self.__visit} {self.__mib_filename} -folder {folder}"
)
# Importing from mib2hdf_watch_convert drags in a lot of dependencies
# which can produce a lot of logging.
for handler in logging.getLogger().handlers:
# Filter out the unwanted log lines.
handler.addFilter(_logging_filter())
# Presume the "module load bxflow/epsic/latest" sets the PYTHONPATH for this import.
from batch_mib_convert.mib2hdf_watch_convert import convert
convert(
self.__beamline,
self.__year,
self.__visit,
[self.__mib_filename],
folder=folder,
)
for filename in self.get_interesting_converted_filenames():
# Turn any interesting converted filenames to artefacts for the catalog (ispyb).
self.__bx_task.propose_artefact(filename)
# Also make a symlink into the task's directory for the GUI to see.
symlink = f"{self.__bx_task.get_directory()}/{os.path.basename(filename)}"
try:
os.symlink(filename, symlink)
except Exception as exception:
raise RuntimeError(
explain(exception, f"creating symlink for {filename}")
)
return 0
# ------------------------------------------------------------------------------
[docs] def get_interesting_converted_filenames(self):
"""
Return list of interesting converted filenames from the conversion.
"""
suffixes = [".hdf5", "_ibf.jpg", "_subset_dp.jpg"]
filenames = []
for suffix in suffixes:
filename = (
f"{self.__filestore_directory}/{self.__conversion_prefix}{suffix}"
)
if os.path.exists(filename):
filenames.append(filename)
logger.debug(describe(f"{callsign(self)} interesting files", filenames))
return filenames
# ------------------------------------------------------------------------------
[docs] def has_any_converted_filenames(self):
"""
Return true if any conversion output filenames are existing.
"""
pattern = f"{self.__filestore_directory}/{self.__conversion_prefix}*"
filenames = glob.glob(pattern, recursive=False)
logger.debug(f"{callsign(self)} glob({pattern}) matched {len(filenames)} files")
if len(filenames) > 0:
return True
else:
return False