Source code for echolocator_lib.guis.aiohttp

import csv
import logging
import multiprocessing
import threading
from pathlib import Path
from typing import List

from dls_servbase_api.constants import Keywords as ProtocoljKeywords

# Utilities.
from dls_utilpack.callsign import callsign
from dls_utilpack.exceptions import EndOfList, ProgrammingFault
from dls_utilpack.require import require

# Basic things.
from dls_utilpack.thing import Thing
from dls_utilpack.visit import get_xchem_directory

# The model which describes the crystal wells to be injected into soakdb3.
from soakdb3_api.models.crystal_well_model import (
    CrystalWellModel as Soakdb3CrystalWellModel,
)

# Things xchembku provides.
from xchembku_api.datafaces.context import Context as XchembkuDatafaceClientContext
from xchembku_api.models.crystal_plate_filter_model import CrystalPlateFilterModel
from xchembku_api.models.crystal_well_droplocation_model import (
    CrystalWellDroplocationModel,
)
from xchembku_api.models.crystal_well_filter_model import (
    CrystalWellFilterModel,
    CrystalWellFilterSortbyEnum,
)
from xchembku_api.models.crystal_well_needing_droplocation_model import (
    CrystalWellNeedingDroplocationModel,
)

# Base class for an aiohttp server.
from echolocator_lib.base_aiohttp import BaseAiohttp

# Object managing echolocator_composers.
from echolocator_lib.composers.composers import (
    Composers,
    echolocator_composers_get_default,
    echolocator_composers_has_default,
    echolocator_composers_set_default,
)

# Gui protocolj things (must agree with javascript).
from echolocator_lib.guis.constants import Commands, Cookies, Keywords

logger = logging.getLogger(__name__)

thing_type = "echolocator_lib.echolocator_guis.aiohttp"


# ------------------------------------------------------------------------------------------
[docs]class Aiohttp(Thing, BaseAiohttp): """ Object implementing remote procedure calls for the echolocator API. """ # ---------------------------------------------------------------------------------------- def __init__(self, specification=None): Thing.__init__(self, thing_type, specification) BaseAiohttp.__init__( self, specification["type_specific_tbd"]["aiohttp_specification"], calling_file=__file__, ) self.__xchembku_client_context = None self.__xchembku = None # ---------------------------------------------------------------------------------------- def callsign(self): """""" return "%s %s" % ("Gui.Aiohttp", BaseAiohttp.callsign(self)) # ---------------------------------------------------------------------------------------- def activate_process(self): """""" try: multiprocessing.current_process().name = "gui" self.activate_process_base() except Exception as exception: logger.exception( f"unable to start {callsign(self)} process", exc_info=exception ) # ----------------------------------------------------------------------------------------
[docs] def activate_thread(self, loop): """ Called from inside a newly created thread. """ try: threading.current_thread().name = "gui" self.activate_thread_base(loop) except Exception as exception: logger.exception( f"unable to start {callsign(self)} thread", exc_info=exception )
# ---------------------------------------------------------------------------------------- async def activate_coro(self): """""" try: # No special routes, we will use protocolj dispathcing only route_tuples = [] # Start the actual coro to listen for incoming http requests. await self.activate_coro_base(route_tuples) # No default composer is set up yet? if not echolocator_composers_has_default(): # The echolocator_composer to use. echolocator_composer_specification = { "type": "echolocator_lib.echolocator_composers.html" } # Set up the default echolocator_composer. echolocator_composer = Composers().build_object( echolocator_composer_specification ) echolocator_composers_set_default(echolocator_composer) # Make the xchembku client context. s = require( f"{callsign(self)} specification", self.specification(), "type_specific_tbd", ) s = require( f"{callsign(self)} type_specific_tbd", s, "xchembku_dataface_specification", ) self.__xchembku_client_context = XchembkuDatafaceClientContext(s) # Activate the context. await self.__xchembku_client_context.aenter() # Get a reference to the xchembku interface provided by the context. self.__xchembku = self.__xchembku_client_context.get_interface() self.__export_directory = require( f"{callsign(self)} specification", self.specification(), "export_directory", ) self.__export_subdirectory = require( f"{callsign(self)} specification", self.specification(), "export_subdirectory", ) except Exception: raise RuntimeError(f"unable to start {callsign(self)} server coro") # ---------------------------------------------------------------------------------------- async def direct_shutdown(self): """""" # Forget we have an xchembku client reference. self.__xchembku = None if self.__xchembku_client_context is not None: await self.__xchembku_client_context.aexit() self.__xchembku_client_context = None # Let the base class stop the server event looping. await self.base_direct_shutdown() # ---------------------------------------------------------------------------------------- async def dispatch(self, request_dict, opaque): """""" command = require("request json", request_dict, Keywords.COMMAND) # Having no xchembku client reference means we must be shutting down. if self.__xchembku is None: raise RuntimeError( "refusing to execute command %s because server is shutting down" % (command) ) if command == Commands.LOAD_TABS: return await self.__load_tabs(opaque, request_dict) if command == Commands.SELECT_TAB: return await self.__select_tab(opaque, request_dict) elif command == Commands.REPORT_PLATES: return await self.__report_plates(opaque, request_dict) elif command == Commands.FETCH_IMAGE_LIST: return await self.__fetch_image_list(opaque, request_dict) if command == Commands.FETCH_IMAGE: return await self.__fetch_image(opaque, request_dict) elif command == Commands.EXPORT_TO_SOAKDB3: return await self.__export_to_soakdb3(opaque, request_dict) elif command == Commands.EXPORT_TO_CSV: return await self.__export_to_csv(opaque, request_dict) elif command == Commands.UPDATE: return await self.__update(opaque, request_dict) else: raise RuntimeError("invalid command %s" % (command)) # ---------------------------------------------------------------------------------------- async def __load_tabs(self, opaque, request_dict): tab_id = await self.get_cookie_content( opaque, Cookies.TABS_MANAGER, Keywords.TAB_ID ) logger.debug(f"[GUITABS] tab_id from cookie content is {tab_id}") # Reply with tabs. response = {Keywords.TAB_ID: tab_id} return response # ---------------------------------------------------------------------------------------- async def __select_tab(self, opaque, request_dict): tab_id = require("request json", request_dict, Keywords.TAB_ID) logger.debug(f"[GUITABS] tab_id in request is {tab_id}") # Put the tab_id into the cookie. self.set_cookie_content(opaque, Cookies.TABS_MANAGER, Keywords.TAB_ID, tab_id) response = {} return response # ---------------------------------------------------------------------------------------- async def __report_plates(self, opaque, request_dict): # Remember last posted value for auto_update_enabled. auto_update_enabled = await self._handle_auto_update( opaque, request_dict, Cookies.PLATE_LIST_UX ) visit_filter = await self.set_or_get_cookie_content( opaque, Cookies.PLATE_LIST_UX, "visit_filter", request_dict.get("visit_filter"), None, ) should_show_only_needing_intervention = await self.set_or_get_cookie_content( opaque, Cookies.PLATE_LIST_UX, "should_show_only_needing_intervention", request_dict.get("should_show_only_needing_intervention"), False, ) filters = { "visit_filter": visit_filter, "should_show_only_needing_intervention": should_show_only_needing_intervention, } # Start a filter where default is sorted on reverse. filter = CrystalPlateFilterModel(direction=-1) logger.debug( f"fetching image records, visit_filter is '{visit_filter}' and " f" should_show_only_needing_intervention is '{should_show_only_needing_intervention}'" ) if visit_filter is not None and visit_filter.strip() != "": filter.visit = visit_filter if should_show_only_needing_intervention: filter.needing_intervention = True # Fetch the list from the xchembku. crystal_plate_report_models = await self.__xchembku.report_crystal_plates( filter ) html = echolocator_composers_get_default().compose_crystal_plate_report( crystal_plate_report_models ) response = { "html": html, "filters": filters, "auto_update_enabled": auto_update_enabled, } return response # ---------------------------------------------------------------------------------------- async def __fetch_image_list(self, opaque, request_dict): # Remember last posted value for auto_update_enabled. auto_update_enabled = await self._handle_auto_update( opaque, request_dict, Cookies.IMAGE_LIST_UX ) visit_filter = await self.set_or_get_cookie_content( opaque, Cookies.IMAGE_LIST_UX, "visit_filter", request_dict.get("visit_filter"), None, ) barcode_filter = await self.set_or_get_cookie_content( opaque, Cookies.IMAGE_LIST_UX, "barcode_filter", request_dict.get("barcode_filter"), None, ) should_show_only_undecided = await self.set_or_get_cookie_content( opaque, Cookies.IMAGE_LIST_UX, "should_show_only_undecided", request_dict.get("should_show_only_undecided"), False, ) show_first_image = request_dict.get("show_first_image", False) if visit_filter is None: visit_filter = "" visit_filter = visit_filter.strip() if barcode_filter is None: barcode_filter = "" barcode_filter = barcode_filter.strip() filters = { "visit_filter": visit_filter, "barcode_filter": barcode_filter, "should_show_only_undecided": should_show_only_undecided, } if visit_filter == "": html = "please enter a visit" crystal_well_index = None crystal_well_count = None else: logger.debug( f"fetching image records, visit_filter is '{visit_filter}' and " f" barcode_filter is '{barcode_filter}' and " f" should_show_only_undecided is '{should_show_only_undecided}' and" f" show_first_image is '{show_first_image}'" ) # Start a filter for the list based on visit only. filter = CrystalWellFilterModel( visit=visit_filter, sortby=CrystalWellFilterSortbyEnum.NUMBER_OF_CRYSTALS, ) if barcode_filter != "": filter.barcode = barcode_filter should_show_only_undecided = await self.set_or_get_cookie_content( opaque, Cookies.IMAGE_LIST_UX, "should_show_only_undecided", request_dict.get("should_show_only_undecided"), False, ) if should_show_only_undecided: filter.is_decided = False # Fetch the properly ordered and filtered list from the xchembku. crystal_well_models = ( await self.__xchembku.fetch_crystal_wells_needing_droplocation(filter) ) crystal_well_count = len(crystal_well_models) # Remember the crystal well uuids in the list in the cookie. crystal_well_uuids = [m.uuid for m in crystal_well_models] self.set_cookie_content( opaque, Cookies.IMAGE_LIST_UX, "crystal_well_uuids", crystal_well_uuids, ) # Asking for first image only, and there are any? if show_first_image and crystal_well_count > 0: # Give the first image index in the response. crystal_well_index = 0 # Don't give html in the response. html = None # Asking for image list? else: # Give the html. html = echolocator_composers_get_default().compose_image_list( crystal_well_models ) crystal_well_index = None response = { "html": html, Keywords.CRYSTAL_WELL_INDEX: crystal_well_index, Keywords.CRYSTAL_WELL_COUNT: crystal_well_count, "filters": filters, "auto_update_enabled": auto_update_enabled, } return response # ---------------------------------------------------------------------------------------- async def __fetch_image(self, opaque, request_dict): # Get uuid from the cookie if it's not being posted here. crystal_well_index = await self.set_or_get_cookie_content( opaque, Cookies.IMAGE_EDIT_UX, Keywords.CRYSTAL_WELL_INDEX, request_dict.get(Keywords.CRYSTAL_WELL_INDEX), None, ) logger.info(f"fetching image from crystal_well_index {crystal_well_index}") # Not able to get an image from posted value or cookie? # Usually first time visiting Image Details tab when no image picked from list. if crystal_well_index is None: raise ProgrammingFault( f"no value for {Keywords.CRYSTAL_WELL_INDEX} in post or cookie {Cookies.IMAGE_EDIT_UX}" ) # Get uuids of the current list from the cookie. crystal_well_uuids = await self.get_cookie_content( opaque, Cookies.IMAGE_LIST_UX, "crystal_well_uuids", None, ) if crystal_well_uuids is None: raise ProgrammingFault( f"cookie {Cookies.IMAGE_LIST_UX} has no value for crystal_well_uuids" ) crystal_well_count = len(crystal_well_uuids) if crystal_well_index >= crystal_well_count: raise EndOfList( f"crystal_well_index {crystal_well_index} exceeds crystal_well_uuids length {crystal_well_count}" ) # The uuid at the posted index. crystal_well_uuid = crystal_well_uuids[crystal_well_index] # Filter to get just the single well. filter = CrystalWellFilterModel(anchor=crystal_well_uuid) # Get the single record. crystal_well_models = ( await self.__xchembku.fetch_crystal_wells_needing_droplocation(filter) ) if len(crystal_well_models) == 0: raise ProgrammingFault( f"crystal_well_index {crystal_well_index} for uuid {crystal_well_uuid} doesn't exist in the database" ) # Presumably there is only one image of interest. record = crystal_well_models[0].dict() # Filestore's full path to the image file. record["filename"] = "filestore" + record["filename"] response = { "record": record, # Give back current one and length of the list to help with the prev/next button composing. Keywords.CRYSTAL_WELL_INDEX: crystal_well_index, Keywords.CRYSTAL_WELL_COUNT: crystal_well_count, } return response # ---------------------------------------------------------------------------------------- async def __update(self, opaque, request_dict): t = require("ajax request", request_dict, "crystal_well_droplocation_model") # Wrap a model around the posted fields. crystal_well_droplocation_model = CrystalWellDroplocationModel(**t) # Update the database record. await self.__xchembku.upsert_crystal_well_droplocations( [crystal_well_droplocation_model], only_fields=list(t.keys()), ) # Caller wants to select the next image automatically? if request_dict.get(Keywords.SHOULD_ADVANCE, False): # There is a next image in the sequence? crystal_well_index = request_dict.get(Keywords.CRYSTAL_WELL_INDEX_NEXT) if crystal_well_index is not None: # Advance by fetching the next image record after the update. # Filters are provided in the IMAGE_LIST_UX cookie. next_request_dict = { ProtocoljKeywords.ENABLE_COOKIES: [ Cookies.IMAGE_EDIT_UX, Cookies.IMAGE_LIST_UX, ], Keywords.COMMAND: Commands.FETCH_IMAGE, Keywords.CRYSTAL_WELL_INDEX: crystal_well_index, } response = await self.__fetch_image(opaque, next_request_dict) response[ "confirmation" ] = "drop location has been updated and view advanced to next image" else: response = { "record": None, "confirmation": "drop location has been updated and have reached the end of the list", } else: response = {"confirmation": "drop location has been updated"} return response # ---------------------------------------------------------------------------------------- async def __export_to_soakdb3(self, opaque, request_dict): # Caller must provide a visit. visit_filter = request_dict.get("visit_filter") if visit_filter is None: raise RuntimeError("programming error: visit not submitted with request") # Visit must not be blank. visit_filter = visit_filter.strip() if visit_filter == "": response = {"error": "blank visit was given"} return response # Caller may provide a barcode. barcode_filter = request_dict.get("barcode_filter", "") barcode_filter = barcode_filter.strip() if barcode_filter == "": barcode_filter = None # Get a filter for wells we want to export. crystal_well_filter = CrystalWellFilterModel( visit=visit_filter, barcode=barcode_filter, is_decided=True, is_usable=True, sortby=CrystalWellFilterSortbyEnum.POSITION, ) # Fetch the list of wells according to the filter. crystal_well_models: List[ CrystalWellNeedingDroplocationModel ] = await self.__xchembku.fetch_crystal_wells_needing_droplocation( crystal_well_filter, why="[EXPFIL] get list to be epxorted" ) # Sort the list based on the (computed) row_first_position field. sorted_models = sorted( crystal_well_models, key=lambda crystal_well_model: crystal_well_model.row_first_position(), ) logger.debug(f"[EXPFIL] found {len(crystal_well_models)} to be exported") # Export the crystal wells to the appropriate soakdb3 visit. await self.__export_to_soakdb3_visit(visit_filter, sorted_models) # Make the list of droplocations to update with the exported flag. crystal_well_droplocation_models: List[CrystalWellDroplocationModel] = list() for crystal_well_model in crystal_well_models: # We only need the crystal_well_uuid for upserting. # This will have to be revisited if we ever want multiple droplocations on a single well. crystal_well_droplocation_model = CrystalWellDroplocationModel( crystal_well_uuid=crystal_well_model.uuid, is_exported_to_soakdb3=True, ) crystal_well_droplocation_models.append(crystal_well_droplocation_model) # Update the database for those that just got exported. # TODO: Use a bulk update when setting is_exported_to_soakdb3 field after export. result_counts = await self.__xchembku.upsert_crystal_well_droplocations( crystal_well_droplocation_models, only_fields=["is_exported_to_soakdb3"], why="setting is_exported_to_soakdb3 field after export", ) logger.debug(f"[EXPFIL] result_counts {result_counts}") # ---------------------------------------------------------------------- # Start a filter where we anchor on the given image. filter = CrystalPlateFilterModel(direction=-1) # Fetch the list from the xchembku. crystal_plate_report_models = await self.__xchembku.report_crystal_plates( filter ) html = echolocator_composers_get_default().compose_crystal_plate_report( crystal_plate_report_models ) response = { "confirmation": f"exported {len(crystal_well_models)} rows to soakdb3 visit {visit_filter}", "html": html, } return response # ---------------------------------------------------------------------------------------- async def __export_to_soakdb3_visit( self, visit, crystal_well_models: List[CrystalWellNeedingDroplocationModel], ) -> None: soakdb3_crystal_well_models = [] for m in crystal_well_models: soakdb3_crystal_well_models.append( Soakdb3CrystalWellModel( LabVisit=visit, CrystalPlate=m.rockminer_collected_stem, CrystalWell=m.position, EchoX=m.confirmed_microns_x, EchoY=m.confirmed_microns_y, ) ) visit_directory = Path(get_xchem_directory(self.__export_directory, visit)) logger.debug( f"exporting {len(soakdb3_crystal_well_models)} to {str(visit_directory)}" ) # Append well records to soakdb3 database. # Soakdb3 wants the "/processing" to be on the end of the visitid. await self.__xchembku.inject_soakdb3_crystal_wells( str(visit_directory / "processing"), soakdb3_crystal_well_models ) # ---------------------------------------------------------------------------------------- async def __export_to_csv(self, opaque, request_dict): # Caller must provide a visit. visit_filter = request_dict.get("visit_filter") if visit_filter is None: raise RuntimeError("visit not submitted with request (programming error)") # Visit must not be blank. visit_filter = visit_filter.strip() if visit_filter == "": response = {"error": "blank visit was given"} return response # Get a filter for wells we want to export. crystal_well_filter = CrystalWellFilterModel( visit=visit_filter, is_decided=True, is_usable=True, sortby=CrystalWellFilterSortbyEnum.POSITION, ) # Fetch the list of wells according to the filter. crystal_well_models: List[ CrystalWellNeedingDroplocationModel ] = await self.__xchembku.fetch_crystal_wells_needing_droplocation( crystal_well_filter ) # Group the wells by the plate they belong to. plates_crystal_well_models = {} for crystal_well_model in crystal_well_models: crystal_plate_uuid = crystal_well_model.crystal_plate_uuid if crystal_plate_uuid not in plates_crystal_well_models: plates_crystal_well_models[crystal_plate_uuid] = [] plates_crystal_well_models[crystal_plate_uuid].append(crystal_well_model) # Keep a list of the confirmations we will get from exporting the plates. confirmations = [] # Go through each plate separately. for ( crystal_plate_uuid, plate_crystal_well_models, ) in plates_crystal_well_models.items(): # Sort the list based on the (computed) row_first_position field. sorted_models = sorted( plate_crystal_well_models, key=lambda crystal_well_model: crystal_well_model.row_first_position(), ) # Export the crystal wells for this plate to the appropriate csv file named for the plate. filename = await self.__export_to_csv_plate( visit_filter, crystal_plate_uuid, sorted_models ) confirmations.append( f"exported {len(plate_crystal_well_models)} rows to {filename}" ) response = {"confirmation": "\n".join(confirmations)} return response # ---------------------------------------------------------------------------------------- async def __export_to_csv_plate( self, visit, crystal_plate_uuid, crystal_well_models: List[CrystalWellNeedingDroplocationModel], ): # Fetch the plate record for visit. crystal_plate_filter = CrystalPlateFilterModel(uuid=crystal_plate_uuid) crystal_plate_models = await self.__xchembku.fetch_crystal_plates( crystal_plate_filter ) if len(crystal_plate_models) == 0: raise RuntimeError( f'database integrity error: no crystal plate for uuid "{crystal_plate_uuid}"' ) crystal_plate_model = crystal_plate_models[0] # Use the stem from the rockmaker Luigi pipeline to form the csv filename. visit_directory = Path(get_xchem_directory(self.__export_directory, visit)) targets_directory = visit_directory / self.__export_subdirectory filename = ( targets_directory / f"{crystal_plate_model.rockminer_collected_stem}_targets.csv" ) if not filename.parent.is_dir(): raise RuntimeError(f"the directory does not exist: {str(filename.parent)}") with open(filename, "w", newline="") as f: writer = csv.writer(f) for m in crystal_well_models: row = [] row.append(m.position) row.append(m.confirmed_microns_x or "") row.append(m.confirmed_microns_y or "") writer.writerow(row) return filename # ---------------------------------------------------------------------------------------- async def _handle_auto_update(self, opaque, request_dict, cookie_name): # Remember last posted value for auto_update_enabled. auto_update_enabled = request_dict.get("auto_update_enabled") # logger.debug( # describe( # f"[AUTOUP] request_dict auto_update_enabled for cookie {cookie_name}", # auto_update_enabled, # ) # ) auto_update_enabled = await self.set_or_get_cookie_content( opaque, cookie_name, "auto_update_enabled", auto_update_enabled, False, ) # logger.debug( # describe( # f"[AUTOUP] request_set_or_get_cookie_content auto_update_enabled", # auto_update_enabled, # ) # ) return auto_update_enabled