Source code for httomolib.misc.images

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------
# Copyright 2022 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------
# Created By  : <scientificsoftware@diamond.ac.uk>
# Created Date: 27/October/2022
# ---------------------------------------------------------------------------
""" Module for loading/saving images """

import asyncio
from io import BytesIO
import os
import pathlib
from typing import List, Optional, Union
import httomolib

import numpy as np
from numpy import ndarray
from PIL import Image, ImageDraw, ImageFont
from skimage import exposure
import decimal

import aiofiles

__all__ = [
    "save_to_images",
]

# number of asyncio workers to use to process saving images
# 40-ish seems to be the sweet spot, but it doesn't matter much
NUM_WORKERS = 40


[docs]def save_to_images( data: ndarray, out_dir: Union[str, os.PathLike], subfolder_name: str = "images", axis: int = 1, file_format: str = "tif", jpeg_quality: int = 95, offset: int = 0, watermark_vals: Optional[tuple] = None, asynchronous: bool = False, ): """ Saves data as 2D images. Rescaling of input isn't performed, so if rescaling is needed please rescale the input data first (such as with the `rescale_to_int` function from the `httomolibgpu` package). Parameters ---------- data : np.ndarray Required input NumPy ndarray. out_dir : str The main output directory for images. subfolder_name : str, optional Subfolder name within the main output directory. Defaults to 'images'. axis : int, optional Specify the axis to use to slice the data (if `data` is a 3D array). file_format : str, optional Specify the file format to use, e.g. "png", "jpeg", or "tif". Defaults to "tif". jpeg_quality : int, optional Specify the quality of the jpeg image. offset: int, optional The offset to start file indexing from, e.g. if offset is 100, images will start at 00100.tif. This is used when executed in parallel context and only partial data is passed in this run. watermark_vals: tuple, optional A tuple with the values that will be written in the image as watermarks. The tuple length must be of the same size as len(data[axis]). asynchronous: bool, optional Perform write operations synchronously or asynchronously. """ if data.dtype not in [np.uint8, np.uint16, np.uint32]: print( "The input data is not in uint(8, 16 or 32 bit) data type and it will be rescaled to 8 uint bit" ) data = exposure.rescale_intensity(data, out_range=(0, 255)).astype(np.uint8) bits_data_type = data.dtype.itemsize * 8 if watermark_vals is not None and data.ndim > 2: # check the length of the tuple and the data slicing dim if len(watermark_vals) != len(data[axis]): raise ValueError( "The length of the watermark_vals tuple should be the same as the length of data's slicing axis" ) # create the output folder subfolder_name = f"{subfolder_name}{str(bits_data_type)}bit_{str(file_format)}" path_to_images_dir = pathlib.Path(out_dir) / subfolder_name path_to_images_dir.mkdir(parents=True, exist_ok=True) queue: Optional[asyncio.Queue] = None if asynchronous: # async task queue - we push our tasks for every 2D image here queue = asyncio.Queue() data = np.nan_to_num(data, copy=False, nan=0.0, posinf=0, neginf=0) if data.ndim == 3: slice_dim_size = np.shape(data)[axis] for idx in range(slice_dim_size): filename = f"{idx + offset:05d}.{file_format}" filepath_name = os.path.join(path_to_images_dir, f"{filename}") # note: data.take call is far more time consuming if axis == 0: d = data[idx, :, :] elif axis == 1: d = data[:, idx, :] else: d = data[:, :, idx] if asynchronous: # give the actual saving to the background task assert queue is not None queue.put_nowait( ( d, jpeg_quality, "TIFF" if file_format == "tif" else file_format, filepath_name, ) ) else: Image.fromarray(d).save(filepath_name, quality=jpeg_quality) # after saving the image we check if the watermark needs to be added to that image if watermark_vals is not None: dec_points = __find_decimals(watermark_vals[idx]) string_to_format = "." + str(dec_points) + "f" _add_watermark( filepath_name, format(watermark_vals[idx], string_to_format) ) else: filename = f"{1:05d}.{file_format}" filepath_name = os.path.join(path_to_images_dir, f"{filename}") if asynchronous: # give the actual saving to the background task assert queue is not None queue.put_nowait( ( data, jpeg_quality, "TIFF" if file_format == "tif" else file_format, filepath_name, ) ) else: Image.fromarray(data).save(filepath_name, quality=jpeg_quality) # after saving the image we check if the watermark needs to be added to that image if watermark_vals is not None: dec_points = __find_decimals(watermark_vals[0]) string_to_format = "." + str(dec_points) + "f" _add_watermark(filepath_name, format(watermark_vals[0], string_to_format)) if asynchronous: # Start the event loop to save the images - and wait until it's done assert queue is not None asyncio.run(_waiting_loop(queue))
def _add_watermark( filepath_name: str, watermark_str: str, font_size_perc: int = 4, margin_perc: int = 3, ): """Adding two watermarks, bottom left and bottom right corners""" original_image = Image.open(filepath_name) draw = ImageDraw.Draw(original_image) image_width, image_height = original_image.size # the image can be a non-square one font_size_relative = int(image_height / 100 * font_size_perc) # relative to height margin_relative_w = int(image_width / 100 * margin_perc) margin_relative_h = int(image_height / 100 * margin_perc) # as pillow doesn't provide fonts and the default one cannot be scaled, # we need to ship the font with httomolib ourselves path_to_font = os.path.dirname(httomolib.__file__) font = ImageFont.truetype( path_to_font + "/misc" + "/DejaVuSans.ttf", font_size_relative ) text_height = font_size_relative text_width = draw.textlength(watermark_str, font) # Calculating positions position_left = (margin_relative_w, image_height - margin_relative_h - text_height) position_right = ( image_width - margin_relative_w - text_width, image_height - margin_relative_h - text_height, ) draw.text( position_left, watermark_str, fill="white", stroke_fill="black", font=font, ) draw.text( position_right, watermark_str, fill="black", stroke_fill="white", font=font, ) original_image.save(filepath_name) async def _save_single_image(data: np.ndarray, quality: float, format: str, path: str): # We need a binary buffer in order to use aiofiles to write - PIL does not have # async methods itself. # So we convert image into a bytes array synchronously first buffer = BytesIO() Image.fromarray(data).save(buffer, quality=quality, format=format) # and then we write the buffer asynchronously to a file async with aiofiles.open(path, "wb") as file: await file.write(buffer.getbuffer()) async def _image_save_worker(queue): """Asynchronous worker task that waits on the given queue for tasks to save images""" while True: # Get a "work item" out of the queue - this is a suspend point for the task data, quality, format, path = await queue.get() await _save_single_image(data, quality, format, path) # Notify the queue that the "work item" has been processed. queue.task_done() async def _waiting_loop(queue) -> None: """Async loop that assigns workers to process queue tasks and waits for them to finish""" # First, create worker tasks to process the queue concurrently. tasks: List[asyncio.Task] = [] for _ in range(NUM_WORKERS): task = asyncio.create_task(_image_save_worker(queue)) tasks.append(task) # Wait until the queue is fully processed. await queue.join() # Cancel our worker tasks. for task in tasks: task.cancel() # Wait until all worker tasks are cancelled. await asyncio.gather(*tasks, return_exceptions=True) def __find_decimals(value): return abs(decimal.Decimal(str(value)).as_tuple().exponent)