Source code for dls_bxflow_lib.bx_jobs.standard
import json
import logging
import time
# Database field names.
from dls_bxflow_api.bx_databases.constants import BxJobFieldnames
# Global bx_dataface.
from dls_bxflow_api.bx_datafaces.bx_datafaces import bx_datafaces_get_default
# Global bx_filestore.
from dls_bxflow_lib.bx_filestores.bx_filestores import bx_filestores_get_default
# Base class for a Thing which has a name and traits.
from dls_bxflow_lib.bx_jobs.base import Base
# States of things.
from dls_bxflow_lib.bx_jobs.states import States as BxJobStates
# Objects managing things.
from dls_bxflow_run.bx_gates.bx_gates import BxGates
logger = logging.getLogger(__name__)
thing_type = "dls_bxflow_lib.bx_jobs.standard"
[docs]class Standard(Base):
"""
Object representing a bx_job for a standard command.
"""
# ----------------------------------------------------------------------------------------
def __init__(self, specification=None, predefined_uuid=None):
Base.__init__(self, thing_type, specification, predefined_uuid=predefined_uuid)
self.__blocked_by_bx_gates = BxGates()
self.state(BxJobStates.PREPARING)
# -----------------------------------------------------------------------------
async def enable(self):
# Register ourself.
await self.register()
# Register all the tasks.
await self.register_bx_tasks()
await bx_datafaces_get_default().enable_bx_job(self.uuid())
# -----------------------------------------------------------------------------
async def register(self):
bx_filestores_get_default().pin_job_directory(self)
bx_job_dict = {}
bx_job_dict["uuid"] = self.uuid()
bx_job_dict["type"] = self.thing_type()
bx_job_dict[BxJobFieldnames.LABEL] = self.specification()["label"]
bx_job_dict[BxJobFieldnames.DATA_LABEL] = self.get_data_label()
bx_job_dict[BxJobFieldnames.DIRECTORY] = self.get_directory()
bx_job_dict[BxJobFieldnames.BX_WORKFLOW_UUID] = self.get_workflow_uuid()
bx_job_dict["specification"] = json.dumps(self.specification())
# Beamline and visit are set from the default filestore
# which is in play at the time of job registration.
bx_job_dict[
BxJobFieldnames.BEAMLINE
] = bx_filestores_get_default().get_beamline()
bx_job_dict[BxJobFieldnames.VISIT] = bx_filestores_get_default().get_visit()
await bx_datafaces_get_default().set_bx_jobs([bx_job_dict])
bx_job_blocked_by_bx_gates = []
for blocked_by_bx_gate in self.__blocked_by_bx_gates.list():
bx_job_blocked_by_bx_gates.append(
{"lhs": self.uuid(), "rhs": blocked_by_bx_gate.uuid()}
)
await bx_datafaces_get_default().set_bx_job_blocked_by_bx_gates(
bx_job_blocked_by_bx_gates
)
# -----------------------------------------------------------------------------
async def register_bx_tasks(self):
bx_tasks_list = []
bx_task_dependency_bx_gates = []
for bx_task in self.bx_tasks.list():
bx_filestores_get_default().pin_task_directory(self, bx_task)
bx_task_dict = {}
bx_task_dict["bx_job_uuid"] = self.uuid()
bx_task_dict["type"] = bx_task.thing_type()
bx_task_dict["uuid"] = bx_task.uuid()
bx_task_dict["state"] = bx_task.state()
bx_task_dict["label"] = bx_task.label()
bx_task_dict["directory"] = bx_task.get_directory()
# Put the directory in the specification when we register it.
bx_task.specification()["directory"] = bx_task.get_directory()
bx_task_dict["specification"] = json.dumps(bx_task.specification())
bx_tasks_list.append(bx_task_dict)
await bx_task.controlled_bx_gates.register(self.uuid(), bx_task.uuid())
for dependency_bx_gate in bx_task.dependency_bx_gates.list():
bx_task_dependency_bx_gates.append(
{"lhs": bx_task.uuid(), "rhs": dependency_bx_gate.uuid()}
)
await bx_datafaces_get_default().set_bx_task_dependency_bx_gates(
bx_task_dependency_bx_gates
)
await bx_datafaces_get_default().set_bx_tasks(bx_tasks_list)
# -----------------------------------------------------------------------------
[docs] async def fetch(self):
"""
Fetch object from bx_dataface.
"""
# Get all the bx_tasks for the bx_job.
bx_task_records = await bx_datafaces_get_default().get_bx_tasks(self.uuid())
# --------------------------------------------------------------------------------
# Get all the bx_gates controlled by the bx_tasks in the bx_job.
controlled_bx_gate_records = (
await bx_datafaces_get_default().get_controlled_bx_gates(self.uuid())
)
bx_gates = BxGates()
for controlled_bx_gate_record in controlled_bx_gate_records:
controlled_bx_gate_uuid = controlled_bx_gate_record["uuid"]
bx_gate = bx_gates.build_object(
specification=controlled_bx_gate_record["specification"],
predefined_uuid=controlled_bx_gate_uuid,
)
bx_gate.bx_task_uuid(controlled_bx_gate_record["bx_task_uuid"])
bx_gates.add(bx_gate)
# logger.debug(
# "bx_gate %s controlled by task %s"
# % (bx_gate.uuid(), bx_gate.bx_task_uuid())
# )
# --------------------------------------------------------------------------------
# Assign the in-memory objects.
# Get all the dependency bx_gates needed by the bx_tasks.
bx_task_dependency_bx_gates_records = (
await bx_datafaces_get_default().get_dependency_bx_gates(self.uuid())
)
for bx_task_record in bx_task_records:
bx_task_uuid = bx_task_record["uuid"]
bx_task = self.bx_tasks.build_object(
specification=bx_task_record["specification"],
predefined_uuid=bx_task_uuid,
)
bx_task.set_directory(bx_task_record["directory"])
bx_task.bx_job_uuid(bx_task_record["bx_job_uuid"])
for bx_gate in bx_gates.list():
if bx_gate.bx_task_uuid() == bx_task_uuid:
bx_task.controlled_bx_gates.add(bx_gate)
for (
bx_task_dependency_bx_gates_record
) in bx_task_dependency_bx_gates_records:
if bx_task_dependency_bx_gates_record["lhs"] == bx_task_uuid:
bx_gate_uuid = bx_task_dependency_bx_gates_record["rhs"]
bx_gate = bx_gates.find(bx_gate_uuid)
bx_task.dependency_bx_gates.add(bx_gate)
self.bx_tasks.add(bx_task)
# --------------------------------------------------------------------------------
# Get all the gates which will block the job when high.
blocked_by_bx_gates_records = (
await bx_datafaces_get_default().get_blocked_by_bx_gates(self.uuid())
)
for record in blocked_by_bx_gates_records:
uuid = record["rhs"]
bx_gate = bx_gates.find(uuid)
self.blocked_by_bx_gates.add(bx_gate)
# -----------------------------------------------------------------------------
async def wait(self, timeout=5.0, naptime=0.1):
is_finished = False
t0 = time.time()
while timeout is None or time.time() - t0 < timeout:
bx_job_record = await bx_datafaces_get_default().get_bx_job(self.uuid())
if bx_job_record is None:
raise RuntimeError(f"dataface does not have bx_job {self.uuid()}")
if (
bx_job_record["state"] != BxJobStates.IN_PROGRESS
and bx_job_record["state"] != BxJobStates.READY
):
is_finished = True
break
time.sleep(naptime)
return is_finished
# -----------------------------------------------------------------------------
def _get_blocked_by_bx_gates(self):
return self.__blocked_by_bx_gates
def _set_blocked_by_bx_gates(self, blocked_by_bx_gates):
self.__blocked_by_bx_gates = blocked_by_bx_gates
def _del_blocked_by_bx_gates(self):
del self.__blocked_by_bx_gates
blocked_by_bx_gates = property(
fget=_get_blocked_by_bx_gates,
fset=_set_blocked_by_bx_gates,
fdel=_del_blocked_by_bx_gates,
doc="The blocked_by_bx_gates property.",
)