Source code for dodal.devices.oav.pin_image_recognition.utils

from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum
from typing import Final

import cv2
import numpy as np

from dodal.log import LOGGER


[docs] class ScanDirections(Enum): FORWARD = 1 REVERSE = -1
[docs] def identity(*args, **kwargs) -> Callable[[np.ndarray], np.ndarray]: return lambda arr: arr
[docs] def erode(ksize: int, iterations: int) -> Callable[[np.ndarray], np.ndarray]: element = cv2.getStructuringElement(cv2.MORPH_RECT, (ksize, ksize)) return lambda arr: cv2.erode(arr, element, iterations=iterations)
[docs] def dilate(ksize: int, iterations: int) -> Callable[[np.ndarray], np.ndarray]: element = cv2.getStructuringElement(cv2.MORPH_RECT, (ksize, ksize)) return lambda arr: cv2.dilate(arr, element, iterations=iterations)
def _morph( ksize: int, iterations: int, morph_type: int ) -> Callable[[np.ndarray], np.ndarray]: element = cv2.getStructuringElement(cv2.MORPH_RECT, (ksize, ksize)) return lambda arr: cv2.morphologyEx(arr, morph_type, element, iterations=iterations)
[docs] def open_morph(ksize: int, iterations: int) -> Callable[[np.ndarray], np.ndarray]: return _morph(ksize=ksize, iterations=iterations, morph_type=cv2.MORPH_OPEN)
[docs] def close(ksize: int, iterations: int) -> Callable[[np.ndarray], np.ndarray]: return _morph(ksize=ksize, iterations=iterations, morph_type=cv2.MORPH_CLOSE)
[docs] def gradient(ksize: int, iterations: int) -> Callable[[np.ndarray], np.ndarray]: return _morph(ksize=ksize, iterations=iterations, morph_type=cv2.MORPH_GRADIENT)
[docs] def top_hat(ksize: int, iterations: int) -> Callable[[np.ndarray], np.ndarray]: return _morph(ksize=ksize, iterations=iterations, morph_type=cv2.MORPH_TOPHAT)
[docs] def black_hat(ksize: int, iterations: int) -> Callable[[np.ndarray], np.ndarray]: return _morph(ksize=ksize, iterations=iterations, morph_type=cv2.MORPH_BLACKHAT)
[docs] def blur(ksize: int, *args, **kwargs) -> Callable[[np.ndarray], np.ndarray]: return lambda arr: cv2.blur(arr, ksize=(ksize, ksize))
[docs] def gaussian_blur(ksize: int, *args, **kwargs) -> Callable[[np.ndarray], np.ndarray]: # Kernel size should be odd. if not ksize % 2: ksize += 1 return lambda arr: cv2.GaussianBlur(arr, (ksize, ksize), 0)
[docs] def median_blur(ksize: int, *args, **kwargs) -> Callable[[np.ndarray], np.ndarray]: if not ksize % 2: ksize += 1 return lambda arr: cv2.medianBlur(arr, ksize)
ARRAY_PROCESSING_FUNCTIONS_MAP = { 0: erode, 1: dilate, 2: open_morph, 3: close, 4: gradient, 5: top_hat, 6: black_hat, 7: blur, 8: gaussian_blur, 9: median_blur, 10: identity, } # A substitute for "None" which can fit into an numpy int array. # Also used as a substitute for a not-found sample position. NONE_VALUE: Final[int] = -1
[docs] @dataclass class SampleLocation: """ Holder type for results from sample detection. """ tip_x: int | None tip_y: int | None edge_top: np.ndarray edge_bottom: np.ndarray
[docs] class MxSampleDetect: def __init__( self, *, preprocess: Callable[[np.ndarray], np.ndarray] = lambda arr: arr, canny_upper: int = 100, canny_lower: int = 50, close_ksize: int = 5, close_iterations: int = 5, scan_direction: ScanDirections = ScanDirections.FORWARD, min_tip_height: int = 5, ): """ Configures sample detection parameters. Args: preprocess: A preprocessing function applied to the array after conversion to grayscale. See implementations of common functions above for predefined conversions Defaults to a no-op (i.e. no preprocessing) canny_upper: upper threshold for canny edge detection canny_lower: lower threshold for canny edge detection close_ksize: kernel size for "close" operation close_iterations: number of iterations for "close" operation scan_direction: ScanDirections.FORWARD for left-to-right, ScanDirections.REVERSE for right-to-left min_tip_height: minimum height of pin tip """ self.preprocess = preprocess self.canny_upper = canny_upper self.canny_lower = canny_lower self.close_ksize = close_ksize self.close_iterations = close_iterations self.scan_direction = scan_direction self.min_tip_height = min_tip_height def processArray(self, arr: np.ndarray) -> SampleLocation: # Get a greyscale version of the input. if arr.ndim == 3: gray_arr = cv2.cvtColor(arr, cv2.COLOR_BGR2GRAY) else: assert arr.ndim == 2 gray_arr = arr # Preprocess the array. (Use the greyscale one.) pp_arr = self.preprocess(gray_arr) # Find some edges. edge_arr = cv2.Canny(pp_arr, self.canny_upper, self.canny_lower) closed_arr = close(self.close_ksize, self.close_iterations)(edge_arr) # Find the sample. return self._locate_sample(closed_arr) @staticmethod def _first_and_last_nonzero_by_columns( arr: np.ndarray, ) -> tuple[np.ndarray, np.ndarray]: """ Finds the indexes of the first & last non-zero values by column in a 2d array. Outputs will contain NONE_VALUE if no non-zero values exist in a column. i.e. for input: [ [0, 0, 0, 1], [1, 1, 0, 0], [0, 1, 0, 1], ] first_nonzero will be: [1, 1, NONE_VALUE, 0] last_nonzero will be [1, 2, NONE_VALUE, 2] """ nonzero = arr.astype(dtype=bool, copy=False) any_nonzero_in_column = nonzero.any(axis=0) first_nonzero = np.where( any_nonzero_in_column, nonzero.argmax(axis=0), NONE_VALUE ) flipped = nonzero.shape[0] - np.flip(nonzero, axis=0).argmax(axis=0) - 1 last_nonzero = np.where(any_nonzero_in_column, flipped, NONE_VALUE) return first_nonzero, last_nonzero def _locate_sample(self, edge_arr: np.ndarray) -> SampleLocation: height, width = edge_arr.shape top, bottom = MxSampleDetect._first_and_last_nonzero_by_columns(edge_arr) # Calculate widths. In general if bottom == top this has width 1. # special case for bottom == top == NONE_VALUE (i.e. no edge at all), that has width 0. widths = np.where(top != NONE_VALUE, bottom - top + 1, 0) # Generate the indices of columns with widths larger than the specified min tip height. non_narrow_widths = widths >= self.min_tip_height column_indices_with_non_narrow_widths = np.flatnonzero(non_narrow_widths) if column_indices_with_non_narrow_widths.shape[0] == 0: # No non-narrow locations - sample not in picture? # Or wrong parameters for edge-finding, ... LOGGER.warning( "pin-tip detection: No non-narrow edges found - cannot locate pin tip" ) return SampleLocation( tip_x=None, tip_y=None, edge_bottom=bottom, edge_top=top ) # Choose our starting point - i.e. first column with non-narrow width for positive scan, last one for negative scan. if self.scan_direction == ScanDirections.FORWARD: start_column = int(column_indices_with_non_narrow_widths[0]) else: start_column = int(column_indices_with_non_narrow_widths[-1]) x = start_column # Move backwards to where there were no edges at all... while top[x] != NONE_VALUE: x += -self.scan_direction.value if x == -1 or x == width: # (In this case the sample is off the edge of the picture.) LOGGER.warning( "pin-tip detection: Pin tip may be outside image area - assuming at edge." ) break x += self.scan_direction.value # ...and forward one step. x is now at the tip. tip_x = x tip_y = int(round(0.5 * (top[x] + bottom[x]))) # clear edges to the left (right) of the tip. if self.scan_direction.value == 1: top[:x] = NONE_VALUE bottom[:x] = NONE_VALUE else: top[x + 1 :] = NONE_VALUE bottom[x + 1 :] = NONE_VALUE LOGGER.info( f"pin-tip detection: Successfully located pin tip at (x={tip_x}, y={tip_y})" ) return SampleLocation( tip_x=tip_x, tip_y=tip_y, edge_bottom=bottom, edge_top=top )