Source code for dodal.plans.check_topup

import bluesky.plan_stubs as bps

from dodal.devices.synchrotron import Synchrotron, SynchrotronMode
from dodal.log import LOGGER

ALLOWED_MODES = [SynchrotronMode.USER, SynchrotronMode.SPECIAL]
DECAY_MODE_COUNTDOWN = -1  # Value of the start_countdown PV when in decay mode
COUNTDOWN_DURING_TOPUP = 0


def _in_decay_mode(time_to_topup):
    if time_to_topup == DECAY_MODE_COUNTDOWN:
        LOGGER.info("Machine in decay mode, gating disabled")
        return True
    return False


def _gating_permitted(machine_mode: SynchrotronMode):
    if machine_mode in ALLOWED_MODES:
        LOGGER.info("Machine in allowed mode, gating top up enabled.")
        return True
    LOGGER.info("Machine not in allowed mode, gating disabled")
    return False


def _delay_to_avoid_topup(total_run_time, time_to_topup):
    if total_run_time > time_to_topup:
        LOGGER.info(
            """
            Total run time for this collection exceeds time to next top up.
            Collection delayed until top up done.
            """
        )
        return True
    LOGGER.info(
        """
        Total run time less than time to next topup. Proceeding with collection.
        """
    )
    return False


[docs] def wait_for_topup_complete(synchrotron: Synchrotron): LOGGER.info("Waiting for topup to complete") start = yield from bps.rd(synchrotron.top_up_start_countdown) while start == COUNTDOWN_DURING_TOPUP: yield from bps.sleep(0.1) start = yield from bps.rd(synchrotron.top_up_start_countdown)
[docs] def check_topup_and_wait_if_necessary( synchrotron: Synchrotron, total_exposure_time: float, ops_time: float, # Account for xray centering, rotation speed, etc ): # See https://github.com/DiamondLightSource/hyperion/issues/932 """A small plan to check if topup gating is permitted and sleep until the topup\ is over if it starts before the end of collection. Args: synchrotron (Synchrotron): Synchrotron device. total_exposure_time (float): Expected total exposure time for \ collection, in seconds. ops_time (float): Additional time to account for various operations,\ eg. x-ray centering, in seconds. Defaults to 30.0. """ machine_mode = yield from bps.rd(synchrotron.synchrotron_mode) assert isinstance(machine_mode, SynchrotronMode) time_to_topup = yield from bps.rd(synchrotron.top_up_start_countdown) if _in_decay_mode(time_to_topup) or not _gating_permitted(machine_mode): yield from bps.null() return tot_run_time = total_exposure_time + ops_time end_topup = yield from bps.rd(synchrotron.top_up_end_countdown) time_to_wait = ( end_topup if _delay_to_avoid_topup(tot_run_time, time_to_topup) else 0.0 ) yield from bps.sleep(time_to_wait) check_start = yield from bps.rd(synchrotron.top_up_start_countdown) if check_start == COUNTDOWN_DURING_TOPUP: yield from wait_for_topup_complete(synchrotron)