#!/usr/bin/env python3# -*- coding: utf-8 -*-# ---------------------------------------------------------------------------# Copyright 2022 Diamond Light Source Ltd.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# ---------------------------------------------------------------------------# Created By : <scientificsoftware@diamond.ac.uk># Created Date: 27/October/2022# ---------------------------------------------------------------------------""" Module for loading/saving images """importasynciofromioimportBytesIOimportosimportpathlibfromtypingimportList,Optional,UnionimporthttomolibimportnumpyasnpfromnumpyimportndarrayfromPILimportImage,ImageDraw,ImageFontfromskimageimportexposureimportdecimalimportaiofiles__all__=["save_to_images",]# number of asyncio workers to use to process saving images# 40-ish seems to be the sweet spot, but it doesn't matter muchNUM_WORKERS=40
[docs]defsave_to_images(data:ndarray,out_dir:Union[str,os.PathLike],subfolder_name:str="images",axis:int=1,file_format:str="tif",jpeg_quality:int=95,offset:int=0,watermark_vals:Optional[tuple]=None,asynchronous:bool=False,):""" Saves data as 2D images. Rescaling of input isn't performed, so if rescaling is needed please rescale the input data first (such as with the `rescale_to_int` function from the `httomolibgpu` package). Parameters ---------- data : np.ndarray Required input NumPy ndarray. out_dir : str The main output directory for images. subfolder_name : str, optional Subfolder name within the main output directory. Defaults to 'images'. axis : int, optional Specify the axis to use to slice the data (if `data` is a 3D array). file_format : str, optional Specify the file format to use, e.g. "png", "jpeg", or "tif". Defaults to "tif". jpeg_quality : int, optional Specify the quality of the jpeg image. offset: int, optional The offset to start file indexing from, e.g. if offset is 100, images will start at 00100.tif. This is used when executed in parallel context and only partial data is passed in this run. watermark_vals: tuple, optional A tuple with the values that will be written in the image as watermarks. The tuple length must be of the same size as len(data[axis]). asynchronous: bool, optional Perform write operations synchronously or asynchronously. """ifdata.dtypenotin[np.uint8,np.uint16,np.uint32]:print("The input data is not in uint(8, 16 or 32 bit) data type and it will be rescaled to 8 uint bit")data=exposure.rescale_intensity(data,out_range=(0,255)).astype(np.uint8)bits_data_type=data.dtype.itemsize*8ifwatermark_valsisnotNoneanddata.ndim>2:# check the length of the tuple and the data slicing dimiflen(watermark_vals)!=len(data[axis]):raiseValueError("The length of the watermark_vals tuple should be the same as the length of data's slicing axis")# create the output foldersubfolder_name=f"{subfolder_name}{str(bits_data_type)}bit_{str(file_format)}"path_to_images_dir=pathlib.Path(out_dir)/subfolder_namepath_to_images_dir.mkdir(parents=True,exist_ok=True)queue:Optional[asyncio.Queue]=Noneifasynchronous:# async task queue - we push our tasks for every 2D image herequeue=asyncio.Queue()data=np.nan_to_num(data,copy=False,nan=0.0,posinf=0,neginf=0)ifdata.ndim==3:slice_dim_size=np.shape(data)[axis]foridxinrange(slice_dim_size):filename=f"{idx+offset:05d}.{file_format}"filepath_name=os.path.join(path_to_images_dir,f"{filename}")# note: data.take call is far more time consumingifaxis==0:d=data[idx,:,:]elifaxis==1:d=data[:,idx,:]else:d=data[:,:,idx]ifasynchronous:# give the actual saving to the background taskassertqueueisnotNonequeue.put_nowait((d,jpeg_quality,"TIFF"iffile_format=="tif"elsefile_format,filepath_name,))else:Image.fromarray(d).save(filepath_name,quality=jpeg_quality)# after saving the image we check if the watermark needs to be added to that imageifwatermark_valsisnotNone:dec_points=__find_decimals(watermark_vals[idx])string_to_format="."+str(dec_points)+"f"_add_watermark(filepath_name,format(watermark_vals[idx],string_to_format))else:filename=f"{1:05d}.{file_format}"filepath_name=os.path.join(path_to_images_dir,f"{filename}")ifasynchronous:# give the actual saving to the background taskassertqueueisnotNonequeue.put_nowait((data,jpeg_quality,"TIFF"iffile_format=="tif"elsefile_format,filepath_name,))else:Image.fromarray(data).save(filepath_name,quality=jpeg_quality)# after saving the image we check if the watermark needs to be added to that imageifwatermark_valsisnotNone:dec_points=__find_decimals(watermark_vals[0])string_to_format="."+str(dec_points)+"f"_add_watermark(filepath_name,format(watermark_vals[0],string_to_format))ifasynchronous:# Start the event loop to save the images - and wait until it's doneassertqueueisnotNoneasyncio.run(_waiting_loop(queue))
def_add_watermark(filepath_name:str,watermark_str:str,font_size_perc:int=4,margin_perc:int=3,):"""Adding two watermarks, bottom left and bottom right corners"""original_image=Image.open(filepath_name)draw=ImageDraw.Draw(original_image)image_width,image_height=original_image.size# the image can be a non-square onefont_size_relative=int(image_height/100*font_size_perc)# relative to heightmargin_relative_w=int(image_width/100*margin_perc)margin_relative_h=int(image_height/100*margin_perc)# as pillow doesn't provide fonts and the default one cannot be scaled,# we need to ship the font with httomolib ourselvespath_to_font=os.path.dirname(httomolib.__file__)font=ImageFont.truetype(path_to_font+"/misc"+"/DejaVuSans.ttf",font_size_relative)text_height=font_size_relativetext_width=draw.textlength(watermark_str,font)# Calculating positionsposition_left=(margin_relative_w,image_height-margin_relative_h-text_height)position_right=(image_width-margin_relative_w-text_width,image_height-margin_relative_h-text_height,)draw.text(position_left,watermark_str,fill="white",stroke_fill="black",font=font,)draw.text(position_right,watermark_str,fill="black",stroke_fill="white",font=font,)original_image.save(filepath_name)asyncdef_save_single_image(data:np.ndarray,quality:float,format:str,path:str):# We need a binary buffer in order to use aiofiles to write - PIL does not have# async methods itself.# So we convert image into a bytes array synchronously firstbuffer=BytesIO()Image.fromarray(data).save(buffer,quality=quality,format=format)# and then we write the buffer asynchronously to a fileasyncwithaiofiles.open(path,"wb")asfile:awaitfile.write(buffer.getbuffer())asyncdef_image_save_worker(queue):"""Asynchronous worker task that waits on the given queue for tasks to save images"""whileTrue:# Get a "work item" out of the queue - this is a suspend point for the taskdata,quality,format,path=awaitqueue.get()await_save_single_image(data,quality,format,path)# Notify the queue that the "work item" has been processed.queue.task_done()asyncdef_waiting_loop(queue)->None:"""Async loop that assigns workers to process queue tasks and waits for them to finish"""# First, create worker tasks to process the queue concurrently.tasks:List[asyncio.Task]=[]for_inrange(NUM_WORKERS):task=asyncio.create_task(_image_save_worker(queue))tasks.append(task)# Wait until the queue is fully processed.awaitqueue.join()# Cancel our worker tasks.fortaskintasks:task.cancel()# Wait until all worker tasks are cancelled.awaitasyncio.gather(*tasks,return_exceptions=True)def__find_decimals(value):returnabs(decimal.Decimal(str(value)).as_tuple().exponent)