Source code for httomolib.misc.images

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------------
# Copyright 2022 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------
# Created By  : <scientificsoftware@diamond.ac.uk>
# Created Date: 27/October/2022
# ---------------------------------------------------------------------------
"""Module for loading/saving images"""

import asyncio
from io import BytesIO
import os
import pathlib
from typing import List, Optional, Union
import httomolib

import numpy as np
from PIL import Image, ImageDraw, ImageFont
import aiofiles
import decimal

__all__ = [
    "save_to_images",
]

# number of asyncio workers to use to process saving images
# 40-ish seems to be the sweet spot, but it doesn't matter much
NUM_WORKERS = 40


[docs] def save_to_images( data: np.ndarray, out_dir: Union[str, os.PathLike], subfolder_name: str = "images", axis: int = 1, file_format: str = "tif", jpeg_quality: int = 95, offset: int = 0, watermark_vals: Optional[tuple] = None, asynchronous: bool = False, ): """ Saves data as 2D tif, png or jpeg images. The images will be saved using the same data type as the input data, i.e., data rescaling of the input data is not performed. If the data rescaling is needed, please rescale using the `rescale_to_int` function, also available in this library. Parameters ---------- data : np.ndarray Required input NumPy ndarray. out_dir : str The main output directory for images. subfolder_name : str, optional Subfolder name within the main output directory. Defaults to 'images'. axis : int, optional Specify the axis to use to slice the data (if `data` is a 3D array). file_format : str, optional Specify the file format to use, e.g. "png", "jpeg", or "tif". Defaults to "tif". jpeg_quality : int, optional Specify the quality of the jpeg image. offset: int, optional The offset to start file indexing from, e.g. if offset is 100, images will start at 00100.tif. This is used when executed in parallel context and only partial data is passed in this run. watermark_vals: tuple, optional A tuple with the values that will be written in the image as watermarks. The tuple length must be of the same size as len(data[axis]). asynchronous: bool, optional Perform write operations synchronously or asynchronously. """ bits_data_type = data.dtype.itemsize * 8 if file_format != "tif" and bits_data_type in [16, 32, 64]: raise ValueError( "In order to save the images in jpeg or png format, the data needs to be rescaled to 8 bit first, please use the 'rescale_to_int' function" ) if watermark_vals is not None and data.ndim > 2: # check the length of the tuple and the data slicing dim if len(watermark_vals) != len(data[axis]): raise ValueError( "The length of the watermark_vals tuple should be the same as the length of data's slicing axis" ) fill_val = np.min(data) stroke_val = np.max(data) if data.dtype in ["uint8", "uint16", "uint32"]: fill_val = "black" stroke_val = "white" # create the output folder subfolder_name = f"{subfolder_name}{str(bits_data_type)}bit_{str(file_format)}" path_to_images_dir = pathlib.Path(out_dir) / subfolder_name path_to_images_dir.mkdir(parents=True, exist_ok=True) queue: Optional[asyncio.Queue] = None if asynchronous: # async task queue - we push our tasks for every 2D image here queue = asyncio.Queue() data = np.nan_to_num(data, copy=False, nan=0.0, posinf=0, neginf=0) if data.ndim == 3: slice_dim_size = np.shape(data)[axis] for idx in range(slice_dim_size): filename = f"{idx + offset:05d}.{file_format}" filepath_name = os.path.join(path_to_images_dir, f"{filename}") # note: data.take call is far more time consuming if axis == 0: d = data[idx, :, :] elif axis == 1: d = data[:, idx, :] else: d = data[:, :, idx] if asynchronous: # give the actual saving to the background task assert queue is not None queue.put_nowait( ( d, jpeg_quality, "TIFF" if file_format == "tif" else file_format, filepath_name, ) ) else: Image.fromarray(d).save(filepath_name, quality=jpeg_quality) # after saving the image we check if the watermark needs to be added to that image if watermark_vals is not None: dec_points = __find_decimals(watermark_vals[idx]) string_to_format = "." + str(dec_points) + "f" _add_watermark( filepath_name, format(watermark_vals[idx], string_to_format), fill_val, stroke_val, ) else: filename = f"{1:05d}.{file_format}" filepath_name = os.path.join(path_to_images_dir, f"{filename}") if asynchronous: # give the actual saving to the background task assert queue is not None queue.put_nowait( ( data, jpeg_quality, "TIFF" if file_format == "tif" else file_format, filepath_name, ) ) else: Image.fromarray(data).save(filepath_name, quality=jpeg_quality) # after saving the image we check if the watermark needs to be added to that image if watermark_vals is not None: dec_points = __find_decimals(watermark_vals[0]) string_to_format = "." + str(dec_points) + "f" _add_watermark( filepath_name, format(watermark_vals[0], string_to_format), fill_val, stroke_val, ) if asynchronous: # Start the event loop to save the images - and wait until it's done assert queue is not None asyncio.run(_waiting_loop(queue))
def _add_watermark( filepath_name: str, watermark_str: str, fill_val: Union[str, float], stroke_val: Union[str, float], font_size_perc: int = 4, margin_perc: int = 3, ): """Adding two watermarks, bottom left and bottom right corners""" original_image = Image.open(filepath_name) draw = ImageDraw.Draw(original_image) image_width, image_height = original_image.size # the image can be a non-square one font_size_relative = int(image_height / 100 * font_size_perc) # relative to height margin_relative_w = int(image_width / 100 * margin_perc) margin_relative_h = int(image_height / 100 * margin_perc) # as pillow doesn't provide fonts and the default one cannot be scaled, # we need to ship the font with httomolib ourselves path_to_font = os.path.dirname(httomolib.__file__) font = ImageFont.truetype( path_to_font + "/misc" + "/DejaVuSans.ttf", font_size_relative ) text_height = font_size_relative text_width = draw.textlength(watermark_str, font) # Calculating positions position_left = (margin_relative_w, image_height - margin_relative_h - text_height) position_right = ( image_width - margin_relative_w - text_width, image_height - margin_relative_h - text_height, ) draw.text( position_left, watermark_str, fill=fill_val, stroke_fill=stroke_val, font=font, ) draw.text( position_right, watermark_str, fill=stroke_val, stroke_fill=fill_val, font=font, ) original_image.save(filepath_name) async def _save_single_image(data: np.ndarray, quality: float, format: str, path: str): # We need a binary buffer in order to use aiofiles to write - PIL does not have # async methods itself. # So we convert image into a bytes array synchronously first buffer = BytesIO() Image.fromarray(data).save(buffer, quality=quality, format=format) # and then we write the buffer asynchronously to a file async with aiofiles.open(path, "wb") as file: await file.write(buffer.getbuffer()) async def _image_save_worker(queue): """Asynchronous worker task that waits on the given queue for tasks to save images""" while True: # Get a "work item" out of the queue - this is a suspend point for the task data, quality, format, path = await queue.get() await _save_single_image(data, quality, format, path) # Notify the queue that the "work item" has been processed. queue.task_done() async def _waiting_loop(queue) -> None: """Async loop that assigns workers to process queue tasks and waits for them to finish""" # First, create worker tasks to process the queue concurrently. tasks: List[asyncio.Task] = [] for _ in range(NUM_WORKERS): task = asyncio.create_task(_image_save_worker(queue)) tasks.append(task) # Wait until the queue is fully processed. await queue.join() # Cancel our worker tasks. for task in tasks: task.cancel() # Wait until all worker tasks are cancelled. await asyncio.gather(*tasks, return_exceptions=True) def __find_decimals(value): return abs(decimal.Decimal(str(value)).as_tuple().exponent)