Source code for megafish.process

import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar
import dask.array as da

from .config import USE_GPU, show_resource
if USE_GPU:
    import cupy as cp
    from cupyx.scipy.ndimage import gaussian_filter
else:
    from scipy.ndimage import gaussian_filter


[docs] def projection(zarr_path, group, dim="z", method="max", footer="_mip"): """ Creates a projection of the image data along a specified dimension using the specified method. Args: zarr_path (str): Path to the Zarr file containing the image data. group (str): Group name in the Zarr file where the data is stored. dim (str, optional): The dimension along which to project (e.g., "z"); defaults to "z". method (str, optional): The projection method to use, either "max" for maximum projection or "min" for minimum projection; defaults to "max". footer (str, optional): Footer string to append to the output Zarr group name; defaults to "_mip". Returns: None: The function saves the projection result to a new Zarr group. """ # Open the Zarr dataset and extract the data array ds = xr.open_zarr(zarr_path, group=group + "/0") xar = ds["data"] # Retrieve the original chunk sizes and adjust for the projection dimension original_chunks = xar.chunks chunk_dict = {dim_name: chunk[0] for dim_name, chunk in zip(xar.dims, original_chunks)} chunk_dict.pop(dim) print("Making " + method + " " + dim + " projection: " + group + show_resource()) with ProgressBar(): # Apply the projection method along the specified dimension if method == "max": res = xar.max(dim=dim) elif method == "min": res = xar.min(dim=dim) else: raise ValueError("Unsupported method") # Re-chunk the result and save it to the new Zarr group res = res.chunk(chunks=chunk_dict) res.to_zarr(zarr_path, group=group + footer + "/0", mode="w")
[docs] def masking(zarr_path, group_int, group_mask, invert_mask=False, footer="_msk"): """ Applies a mask to the image data, setting masked regions to zero. Args: zarr_path (str): Path to the Zarr file containing the image data. group_int (str): Group name in the Zarr file for the intensity data to be masked. group_mask (str): Group name in the Zarr file for the mask data. invert_mask (bool, optional): If True, inverts the mask to apply the mask where the value is zero; defaults to False. footer (str, optional): Footer string to append to the output Zarr group name; defaults to "_msk". Returns: None: The function saves the masked result to a new Zarr group. """ # Open the Zarr datasets for the intensity data and the mask ds_int = xr.open_zarr(zarr_path, group=group_int + "/0") xar_int = ds_int["data"] ds_msk = xr.open_zarr(zarr_path, group=group_mask + "/0") xar_msk = ds_msk["data"] # Retrieve the original chunk sizes for the intensity data original_chunks = xar_int.chunks chunk_dict = {dim_name: chunk[0] for dim_name, chunk in zip(xar_int.dims, original_chunks)} # Expand the mask dimensions if the intensity data has 3 dimensions if len(xar_int.dims) == 3: xar_msk_broadcast = xar_msk.expand_dims( dim='cycle', axis=0).broadcast_like(xar_int) else: xar_msk_broadcast = xar_msk print("Masking: " + group_int + show_resource()) with ProgressBar(): # Apply the mask; invert if specified if invert_mask: res = xar_int.where(xar_msk_broadcast == 0, 0) else: res = xar_int.where(xar_msk_broadcast > 0, 0) # Re-chunk the result to match the original chunk sizes res = res.chunk(chunks=chunk_dict) # Save the masked data as a new Zarr group res.to_zarr(zarr_path, group=group_int + footer + "/0", mode="w")
[docs] def gaussian_blur(zarr_path, group, sigma, dtype="float32", footer="_gbr"): """ Applies a Gaussian blur to the image data. Args: zarr_path (str): Path to the Zarr file containing the image data. group (str): Group name in the Zarr file where the data is stored. sigma (float or tuple of float): The standard deviation(s) for the Gaussian kernel. dtype (str, optional): The data type of the output image; defaults to "float32". footer (str, optional): Footer string to append to the output Zarr group name; defaults to "_gbr". Returns: None: The function saves the blurred result to a new Zarr group. """ def _gaussian_filter(img, sigma): """ Applies a Gaussian filter to the input image. Args: img (numpy.ndarray): The input image. sigma (float or tuple of float): The standard deviation(s) for the Gaussian kernel. Returns: numpy.ndarray: The blurred image. """ img = img.astype(np.float32) # Handle 2D or 3D image shapes if len(img.shape) == 2: frm = img elif len(img.shape) == 3: frm = img.squeeze() else: raise ValueError("Unsupported shape") # Apply the Gaussian filter using GPU if available if USE_GPU: result = gaussian_filter(cp.asarray(frm), sigma).get() else: result = gaussian_filter(frm, sigma) # Reshape back if the input was 3D if len(img.shape) == 3: result = result[np.newaxis, :, :] return result # Open the Zarr dataset and extract the data array ds = xr.open_zarr(zarr_path, group=group + "/0") xar = ds["data"] # Retrieve the original chunk sizes and prepare chunking configuration original_chunks = xar.chunks chunk_dict = {dim_name: chunk[0] for dim_name, chunk in zip(xar.dims, original_chunks)} dims = xar.dims coords = xar.coords dar = da.from_zarr(zarr_path, component=group + "/0/data") # Define the depth for the map_overlap operation based on image dimensions if len(dims) == 2: depth = (sigma, sigma) elif len(dims) == 3: depth = (1, sigma, sigma) else: raise ValueError("Unsupported shape") # Apply the Gaussian filter across chunks with overlap res = dar.map_overlap(_gaussian_filter, depth=depth, sigma=sigma, dtype=dtype) print("Gaussian blur: " + group + show_resource()) with ProgressBar(): # Convert the result back to an xarray DataArray and set the chunks out = xr.DataArray(res, dims=dims, coords=coords) out = out.to_dataset(name="data") out = out.chunk(chunk_dict) # Save the blurred data as a new Zarr group out.to_zarr(zarr_path, group=group + footer + "/0", mode="w")
[docs] def binarization(zarr_path, group, threshold, reverse=False, dtype="uint8", footer="_bin"): """ Applies binarization to the image data based on a threshold. Args: zarr_path (str): Path to the Zarr file containing the image data. group (str): Group name in the Zarr file where the data is stored. threshold (float): The threshold value for binarization; values above the threshold are set to 1, others to 0. reverse (bool, optional): If True, reverses the binary values (0 becomes 1 and vice versa); defaults to False. dtype (str, optional): The data type of the output image; defaults to "uint8". footer (str, optional): Footer string to append to the output Zarr group name; defaults to "_bin". Returns: None: The function saves the binarized result to a new Zarr group. """ # Open the Zarr dataset and extract the data array ds = xr.open_zarr(zarr_path, group=group + "/0") xar = ds["data"] # Retrieve the original chunk sizes for the data original_chunks = xar.chunks chunk_dict = {dim_name: chunk[0] for dim_name, chunk in zip(xar.dims, original_chunks)} print("Binarization: " + group + show_resource()) with ProgressBar(): # Apply binarization based on the threshold res = (xar > threshold).astype(dtype) # Reverse binary values if specified if reverse: res = 1 - res # Re-chunk the result to match the original chunk sizes res = res.chunk(chunks=chunk_dict) # Save the binarized data as a new Zarr group res.to_zarr(zarr_path, group=group + footer + "/0", mode="w")