Source code for cdl.algorithms.image

# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.

"""
.. Image Processing Algorithms (see parent package :mod:`cdl.algorithms`)
"""

# pylint: disable=invalid-name  # Allows short reference names like x, y, ...

from __future__ import annotations

from typing import Literal

import cv2
import numpy as np
import scipy.ndimage as spi
import scipy.spatial as spt
from numpy import ma
from skimage import exposure, feature, measure, transform

# MARK: Level adjustment ---------------------------------------------------------------


def scale_data_to_min_max(
    data: np.ndarray, zmin: float | int, zmax: float | int
) -> np.ndarray:
    """Scale array `data` to fit [zmin, zmax] dynamic range

    Args:
        data: Input data
        zmin: Minimum value of output data
        zmax: Maximum value of output data

    Returns:
        Scaled data
    """
    dmin = data.min()
    dmax = data.max()
    fdata = np.array(data, dtype=float)
    fdata -= dmin
    fdata *= float(zmax - zmin) / (dmax - dmin)
    fdata += float(zmin)
    return np.array(fdata, data.dtype)


def normalize(
    data: np.ndarray,
    parameter: Literal["maximum", "amplitude", "area", "energy", "rms"] = "maximum",
) -> np.ndarray:
    """Normalize input array to a given parameter.

    Args:
        data: Input data
        parameter: Normalization parameter (default: "maximum")

    Returns:
        Normalized array
    """
    if parameter == "maximum":
        return scale_data_to_min_max(data, data.min() / data.max(), 1.0)
    if parameter == "amplitude":
        return scale_data_to_min_max(data, 0.0, 1.0)
    fdata = np.array(data, dtype=float)
    if parameter == "area":
        return fdata / fdata.sum()
    if parameter == "energy":
        return fdata / np.sqrt(np.sum(fdata * fdata.conjugate()))
    if parameter == "rms":
        return fdata / np.sqrt(np.mean(fdata * fdata.conjugate()))
    raise ValueError(f"Unsupported parameter {parameter}")


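# Editor's note: the function below is an illustrative usage sketch, not part
# of the original module. It shows how the two level-adjustment helpers above
# could rescale a synthetic float image to the 8-bit display range, or to unit
# maximum.
def _example_level_adjustment() -> None:
    """Usage sketch for `scale_data_to_min_max` and `normalize` (illustrative)"""
    rng = np.random.default_rng(0)
    img = rng.normal(loc=10.0, scale=2.0, size=(64, 64))
    img8 = scale_data_to_min_max(img, 0, 255)  # stretched to [0, 255]
    unit = normalize(img, parameter="maximum")  # maximum mapped to 1.0
    print(img8.min(), img8.max(), unit.max())

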
# MARK: Fourier analysis ---------------------------------------------------------------


def fft2d(z: np.ndarray, shift: bool = True) -> np.ndarray:
    """Compute FFT of complex array `z`

    Args:
        z: Input data
        shift: Shift zero frequency to center (default: True)

    Returns:
        FFT of input data
    """
    z1 = np.fft.fft2(z)
    if shift:
        z1 = np.fft.fftshift(z1)
    return z1


def ifft2d(z: np.ndarray, shift: bool = True) -> np.ndarray:
    """Compute inverse FFT of complex array `z`

    Args:
        z: Input data
        shift: Undo the zero-frequency shift before inverting (default: True)

    Returns:
        Inverse FFT of input data
    """
    if shift:
        z = np.fft.ifftshift(z)
    z1 = np.fft.ifft2(z)
    return z1


def magnitude_spectrum(z: np.ndarray, log_scale: bool = False) -> np.ndarray:
    """Compute magnitude spectrum of complex array `z`

    Args:
        z: Input data
        log_scale: Use log scale (default: False)

    Returns:
        Magnitude spectrum of input data
    """
    z1 = np.abs(fft2d(z))
    if log_scale:
        z1 = 20 * np.log10(z1.clip(1e-10))
    return z1


def phase_spectrum(z: np.ndarray) -> np.ndarray:
    """Compute phase spectrum of complex array `z`

    Args:
        z: Input data

    Returns:
        Phase spectrum of input data (in degrees)
    """
    return np.rad2deg(np.angle(fft2d(z)))


def psd(z: np.ndarray, log_scale: bool = False) -> np.ndarray:
    """Compute power spectral density of complex array `z`

    Args:
        z: Input data
        log_scale: Use log scale (default: False)

    Returns:
        Power spectral density of input data
    """
    z1 = np.abs(fft2d(z)) ** 2
    if log_scale:
        z1 = 10 * np.log10(z1.clip(1e-10))
    return z1


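# Editor's note: illustrative sketch, not part of the original module. It
# demonstrates the round trip `ifft2d(fft2d(z))` and a log-scaled magnitude
# spectrum, on a synthetic Gaussian spot.
def _example_fourier() -> None:
    """Usage sketch for the Fourier analysis helpers above (illustrative)"""
    y, x = np.mgrid[-32:32, -32:32]
    z = np.exp(-(x**2 + y**2) / 50.0).astype(complex)
    spectrum = magnitude_spectrum(z, log_scale=True)  # dB-like scale
    roundtrip = ifft2d(fft2d(z))  # shift and unshift cancel out exactly
    print(spectrum.shape, np.allclose(roundtrip, z))

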
# MARK: Binning ------------------------------------------------------------------------

BINNING_OPERATIONS = ("sum", "average", "median", "min", "max")


def binning(
    data: np.ndarray,
    sx: int,
    sy: int,
    operation: Literal["sum", "average", "median", "min", "max"],
    dtype=None,
) -> np.ndarray:
    """Perform image pixel binning

    Args:
        data: Input data
        sx: Binning size along x (number of pixels to bin together)
        sy: Binning size along y (number of pixels to bin together)
        operation: Binning operation
        dtype: Output data type (default: None, i.e. same as input)

    Returns:
        Binned data
    """
    ny, nx = data.shape
    shape = (ny // sy, sy, nx // sx, sx)
    try:
        bdata = data[: ny - ny % sy, : nx - nx % sx].reshape(shape)
    except ValueError as err:
        raise ValueError("Binning is not a multiple of image dimensions") from err
    if operation == "sum":
        bdata = np.array(bdata, dtype=float).sum(axis=(-1, 1))
    elif operation == "average":
        bdata = bdata.mean(axis=(-1, 1))
    elif operation == "median":
        bdata = ma.median(bdata, axis=(-1, 1))
    elif operation == "min":
        bdata = bdata.min(axis=(-1, 1))
    elif operation == "max":
        bdata = bdata.max(axis=(-1, 1))
    else:
        valid = ", ".join(BINNING_OPERATIONS)
        raise ValueError(f"Invalid operation {operation} (valid values: {valid})")
    return np.array(bdata, dtype=data.dtype if dtype is None else np.dtype(dtype))


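# Editor's note: illustrative sketch, not part of the original module. A 2x2
# "average" binning halves each dimension; here an 8x8 ramp becomes 4x4.
def _example_binning() -> None:
    """Usage sketch for `binning` (illustrative)"""
    img = np.arange(64, dtype=np.float64).reshape(8, 8)
    binned = binning(img, sx=2, sy=2, operation="average", dtype=np.float64)
    print(binned.shape)  # (4, 4)

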
# MARK: Background subtraction ---------------------------------------------------------


def flatfield(
    rawdata: np.ndarray, flatdata: np.ndarray, threshold: float | None = None
) -> np.ndarray:
    """Compute flat-field correction

    Args:
        rawdata: Raw data
        flatdata: Flat-field data
        threshold: Raw data values at or below this threshold are left
            uncorrected (default: None, i.e. correct all pixels)

    Returns:
        Flat-field corrected data
    """
    dtemp = np.array(rawdata, dtype=float, copy=True) * flatdata.mean()
    dunif = np.array(flatdata, dtype=float, copy=True)
    dunif[dunif == 0] = 1.0  # avoid division by zero
    dcorr_all = np.array(dtemp / dunif, dtype=rawdata.dtype)
    if threshold is None:
        # No threshold: apply the correction to the whole image
        return dcorr_all
    dcorr = np.array(rawdata, copy=True)
    dcorr[rawdata > threshold] = dcorr_all[rawdata > threshold]
    return dcorr


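# Editor's note: illustrative sketch, not part of the original module. The
# correction divides the raw image by the flat-field (normalized to its mean);
# with a threshold, low-level pixels are left untouched.
def _example_flatfield() -> None:
    """Usage sketch for `flatfield` (illustrative)"""
    rng = np.random.default_rng(1)
    flat = rng.uniform(0.8, 1.2, size=(32, 32))
    raw = 100.0 * flat  # a uniform scene seen through the flat-field
    corrected = flatfield(raw, flat, threshold=10.0)
    print(corrected.std() < raw.std())  # correction flattens the image

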
# MARK: Misc. analyses -----------------------------------------------------------------


def get_centroid_fourier(data: np.ndarray) -> tuple[float, float]:
    """Return image centroid using Fourier algorithm

    Args:
        data: Input data

    Returns:
        Centroid coordinates (row, col)
    """
    # Fourier transform method as discussed by Weisshaar et al.
    # (http://www.mnd-umwelttechnik.fh-wiesbaden.de/pig/weisshaar_u5.pdf)
    rows, cols = data.shape
    if rows == 1 or cols == 1:
        return 0, 0
    i = np.arange(0, rows).reshape(1, rows)
    sin_a = np.sin((i - 1) * 2 * np.pi / (rows - 1)).T
    cos_a = np.cos((i - 1) * 2 * np.pi / (rows - 1)).T
    j = np.arange(0, cols).reshape(cols, 1)
    sin_b = np.sin((j - 1) * 2 * np.pi / (cols - 1)).T
    cos_b = np.cos((j - 1) * 2 * np.pi / (cols - 1)).T
    a = (cos_a * data).sum()
    b = (sin_a * data).sum()
    c = (data * cos_b).sum()
    d = (data * sin_b).sum()
    rphi = (0 if b > 0 else 2 * np.pi) if a > 0 else np.pi
    cphi = (0 if d > 0 else 2 * np.pi) if c > 0 else np.pi
    if a * c == 0.0:
        return 0, 0
    row = (np.arctan(b / a) + rphi) * (rows - 1) / (2 * np.pi) + 1
    col = (np.arctan(d / c) + cphi) * (cols - 1) / (2 * np.pi) + 1
    try:
        row = int(row)
    except ma.MaskError:
        row = np.nan
    try:
        col = int(col)
    except ma.MaskError:
        col = np.nan
    return row, col


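# Editor's note: illustrative sketch, not part of the original module. The
# Fourier centroid of a symmetric spot falls at its geometric center.
def _example_centroid() -> None:
    """Usage sketch for `get_centroid_fourier` (illustrative)"""
    y, x = np.mgrid[0:64, 0:64]
    img = np.exp(-((x - 20) ** 2 + (y - 30) ** 2) / 30.0)
    print(get_centroid_fourier(img))  # expected: close to (30, 20) (row, col)

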
def get_absolute_level(data: np.ndarray, level: float) -> float:
    """Return absolute level

    Args:
        data: Input data
        level: Relative level (0.0 to 1.0)

    Returns:
        Absolute level
    """
    if not isinstance(level, float) or level < 0.0 or level > 1.0:
        raise ValueError("Level must be a float between 0. and 1.")
    return (float(np.nanmin(data)) + float(np.nanmax(data))) * level


def get_enclosing_circle(
    data: np.ndarray, level: float = 0.5
) -> tuple[int, int, float]:
    """Return (x, y, radius) for the circle contour enclosing image values
    above threshold relative level (.5 means FWHM)

    Args:
        data: Input data
        level: Relative level (default: 0.5)

    Returns:
        A tuple (x, y, radius)

    Raises:
        ValueError: No contour was found
    """
    data_th = data.copy()
    data_th[data <= get_absolute_level(data, level)] = 0.0
    contours = measure.find_contours(data_th)
    model = measure.CircleModel()
    result = None
    max_radius = 1.0
    for contour in contours:
        if model.estimate(contour):
            yc, xc, radius = model.params
            if radius > max_radius:
                result = (int(xc), int(yc), radius)
                max_radius = radius
    if result is None:
        raise ValueError("No contour was found")
    return result


def get_radial_profile(
    data: np.ndarray, center: tuple[int, int]
) -> tuple[np.ndarray, np.ndarray]:
    """Return radial profile of image data

    Args:
        data: Input data (2D array)
        center: Coordinates of the center of the profile (x, y)

    Returns:
        Radial profile (X, Y) where X is the distance from the center (1D array)
        and Y is the average value of pixels at this distance (1D array)
    """
    y, x = np.indices(data.shape)  # Get the indices of pixels
    x0, y0 = center
    r = np.sqrt((x - x0) ** 2 + (y - y0) ** 2)  # Calculate distance to the center
    r = r.astype(int)
    # Average over the same distance
    tbin = np.bincount(r.ravel(), data.ravel())  # Sum of pixel values at each distance
    nr = np.bincount(r.ravel())  # Number of pixels at each distance
    yprofile = tbin / nr  # this is the half radial profile
    # Let's mirror it to get the full radial profile (the first element is the center)
    yprofile = np.concatenate((yprofile[::-1], yprofile[1:]))
    # The x axis is the distance from the center (0 is the center)
    xprofile = np.arange(len(yprofile)) - len(yprofile) // 2
    return xprofile, yprofile


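# Editor's note: illustrative sketch, not part of the original module. For a
# centered Gaussian spot, the radial profile decreases monotonically away from
# the center.
def _example_radial_profile() -> None:
    """Usage sketch for `get_radial_profile` (illustrative)"""
    y, x = np.mgrid[0:64, 0:64]
    img = np.exp(-((x - 32) ** 2 + (y - 32) ** 2) / 100.0)
    xprof, yprof = get_radial_profile(img, center=(32, 32))
    print(xprof.shape, yprof.shape, float(yprof.max()))

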
def distance_matrix(coords: list) -> np.ndarray:
    """Return distance matrix from coords

    Args:
        coords: List of coordinates

    Returns:
        Distance matrix
    """
    return np.triu(spt.distance.cdist(coords, coords, "euclidean"))


def get_2d_peaks_coords(
    data: np.ndarray, size: int | None = None, level: float = 0.5
) -> np.ndarray:
    """Detect peaks in image data, return coordinates.

    If neighborhood size is None, it defaults to the larger of 50 pixels and
    1/40th of the smallest image dimension.

    Detection threshold level is relative to the difference between data
    maximum and minimum values.

    Args:
        data: Input data
        size: Neighborhood size (default: None)
        level: Relative level (default: 0.5)

    Returns:
        Coordinates of peaks
    """
    if size is None:
        size = max(min(data.shape) // 40, 50)
    data_max = spi.maximum_filter(data, size)
    data_min = spi.minimum_filter(data, size)
    data_diff = data_max - data_min
    diff = (data_max - data_min) > get_absolute_level(data_diff, level)
    maxima = data == data_max
    maxima[diff == 0] = 0
    labeled, _num_objects = spi.label(maxima)
    slices = spi.find_objects(labeled)
    coords = []
    for dy, dx in slices:
        x_center = int(0.5 * (dx.start + dx.stop - 1))
        y_center = int(0.5 * (dy.start + dy.stop - 1))
        coords.append((x_center, y_center))
    if len(coords) > 1:
        # Remove duplicate peaks that are closer than the neighborhood size
        dist = distance_matrix(coords)
        for index in reversed(np.unique(np.where((dist < size) & (dist > 0))[1])):
            coords.pop(index)
    return np.array(coords)


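# Editor's note: illustrative sketch, not part of the original module. Two
# well-separated Gaussian spots should be reported as two peak coordinates.
def _example_peak_detection() -> None:
    """Usage sketch for `get_2d_peaks_coords` (illustrative)"""
    y, x = np.mgrid[0:200, 0:200]
    img = np.exp(-((x - 50) ** 2 + (y - 50) ** 2) / 50.0)
    img += np.exp(-((x - 150) ** 2 + (y - 140) ** 2) / 50.0)
    print(get_2d_peaks_coords(img))  # expected: [[50, 50], [150, 140]]

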
def get_contour_shapes(
    data: np.ndarray | ma.MaskedArray,
    shape: Literal["circle", "ellipse", "polygon"] = "ellipse",
    level: float = 0.5,
) -> np.ndarray:
    """Find iso-valued contours in a 2D array, above relative level (.5 means
    FWHM), then fit contours with shape ('ellipse' or 'circle')

    Args:
        data: Input data
        shape: Shape to fit. Default is 'ellipse'
        level: Relative level (default: 0.5)

    Returns:
        Coordinates of shapes
    """
    # pylint: disable=too-many-locals
    assert shape in ("circle", "ellipse", "polygon")
    contours = measure.find_contours(data, level=get_absolute_level(data, level))
    coords = []
    for contour in contours:
        # `contour` is a (N, 2) array (rows, cols): we need to check if all those
        # coordinates are masked: if so, we skip this contour
        if isinstance(data, ma.MaskedArray) and np.all(
            data.mask[contour[:, 0].astype(int), contour[:, 1].astype(int)]
        ):
            continue
        if shape == "circle":
            model = measure.CircleModel()
            if model.estimate(contour):
                yc, xc, r = model.params
                if r <= 1.0:
                    continue
                coords.append([xc, yc, r])
        elif shape == "ellipse":
            model = measure.EllipseModel()
            if model.estimate(contour):
                yc, xc, b, a, theta = model.params
                if a <= 1.0 or b <= 1.0:
                    continue
                coords.append([xc, yc, a, b, theta])
        elif shape == "polygon":
            # `contour` is a (N, 2) array (rows, cols): we need to convert it
            # to a list of x, y coordinates flattened in a single list
            coords.append(contour[:, ::-1].flatten())
        else:
            raise NotImplementedError(f"Invalid contour shape {shape}")
    if shape == "polygon":
        # `coords` is a list of arrays of shape (N, 2) where N is the number of
        # points that can vary from one array to another, so we need to pad each
        # array with NaNs to get a regular array:
        max_len = max(coord.shape[0] for coord in coords)
        arr = np.full((len(coords), max_len), np.nan)
        for i_row, coord in enumerate(coords):
            arr[i_row, : coord.shape[0]] = coord
        return arr
    return np.array(coords)


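# Editor's note: illustrative sketch, not part of the original module. The
# FWHM contour of a Gaussian spot is fitted with a circle model.
def _example_contour_fit() -> None:
    """Usage sketch for `get_contour_shapes` (illustrative)"""
    y, x = np.mgrid[0:100, 0:100]
    img = np.exp(-((x - 50) ** 2 + (y - 50) ** 2) / 200.0)
    coords = get_contour_shapes(img, shape="circle", level=0.5)
    print(coords)  # expected: one row close to [50, 50, radius]

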
def get_hough_circle_peaks(
    data: np.ndarray,
    min_radius: float | None = None,
    max_radius: float | None = None,
    nb_radius: int | None = None,
    min_distance: int = 1,
) -> np.ndarray:
    """Detect peaks in image from circle Hough transform, return circle
    coordinates.

    Args:
        data: Input data
        min_radius: Minimum radius (default: None)
        max_radius: Maximum radius (default: None)
        nb_radius: Number of radii (default: None)
        min_distance: Minimum distance between circles (default: 1)

    Returns:
        Coordinates of circles
    """
    assert min_radius is not None and max_radius is not None
    assert max_radius > min_radius
    if nb_radius is None:
        nb_radius = max_radius - min_radius + 1
    hough_radii = np.arange(
        min_radius, max_radius + 1, (max_radius - min_radius + 1) // nb_radius
    )
    hough_res = transform.hough_circle(data, hough_radii)
    _accums, cx, cy, radii = transform.hough_circle_peaks(
        hough_res, hough_radii, min_xdistance=min_distance, min_ydistance=min_distance
    )
    return np.vstack([cx, cy, radii]).T


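# Editor's note: illustrative sketch, not part of the original module. The
# Hough transform operates on an edge image; here, a circle outline of known
# radius is drawn and then recovered.
def _example_hough_circles() -> None:
    """Usage sketch for `get_hough_circle_peaks` (illustrative)"""
    edges = np.zeros((100, 100), dtype=np.uint8)
    theta = np.linspace(0.0, 2 * np.pi, 360)
    rows = (50 + 20 * np.sin(theta)).astype(int)
    cols = (50 + 20 * np.cos(theta)).astype(int)
    edges[rows, cols] = 1
    # Expected to include a circle near (50, 50) with radius 20
    print(get_hough_circle_peaks(edges, min_radius=15, max_radius=25))

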
# MARK: Blob detection -----------------------------------------------------------------


def __blobs_to_coords(blobs: np.ndarray) -> np.ndarray:
    """Convert blobs to coordinates

    Args:
        blobs: Blobs

    Returns:
        Coordinates
    """
    cy, cx, radii = blobs.T
    coords = np.vstack([cx, cy, radii]).T
    return coords


def find_blobs_dog(
    data: np.ndarray,
    min_sigma: float = 1,
    max_sigma: float = 30,
    overlap: float = 0.5,
    threshold_rel: float = 0.2,
    exclude_border: bool = True,
) -> np.ndarray:
    """
    Finds blobs in the given grayscale image using the Difference of Gaussians
    (DoG) method.

    Args:
        data: The grayscale input image.
        min_sigma: The minimum blob radius in pixels.
        max_sigma: The maximum blob radius in pixels.
        overlap: The minimum overlap fraction (between 0 and 1) above which
            the smaller of two overlapping blobs is discarded. For instance,
            with ``overlap`` set to 0.5, if the area of two blobs overlaps by
            more than 50%, only the larger blob is returned.
        threshold_rel: The relative lower bound for scale space maxima. Local
            maxima smaller than ``threshold_rel`` times the maximum of the
            scale space are ignored. Reduce this to detect blobs with lower
            intensities.
        exclude_border: If ``True``, exclude blobs from detection if they are
            too close to the border of the image. Border size is ``min_sigma``.

    Returns:
        Coordinates of blobs
    """
    # Use scikit-image's Difference of Gaussians (DoG) method
    blobs = feature.blob_dog(
        data,
        min_sigma=min_sigma,
        max_sigma=max_sigma,
        overlap=overlap,
        threshold_rel=threshold_rel,
        exclude_border=exclude_border,
    )
    return __blobs_to_coords(blobs)


def find_blobs_doh(
    data: np.ndarray,
    min_sigma: float = 1,
    max_sigma: float = 30,
    overlap: float = 0.5,
    log_scale: bool = False,
    threshold_rel: float = 0.2,
) -> np.ndarray:
    """
    Finds blobs in the given grayscale image using the Determinant of Hessian
    (DoH) method.

    Args:
        data: The grayscale input image.
        min_sigma: The minimum blob radius in pixels.
        max_sigma: The maximum blob radius in pixels.
        overlap: The minimum overlap fraction (between 0 and 1) above which
            the smaller of two overlapping blobs is discarded. For instance,
            with ``overlap`` set to 0.5, if the area of two blobs overlaps by
            more than 50%, only the larger blob is returned.
        log_scale: If ``True``, intermediate scale values are interpolated on
            a logarithmic scale; otherwise, linear interpolation is used.
        threshold_rel: The relative lower bound for scale space maxima. Local
            maxima smaller than ``threshold_rel`` times the maximum of the
            scale space are ignored. Reduce this to detect blobs with lower
            intensities.

    Returns:
        Coordinates of blobs
    """
    # Use scikit-image's Determinant of Hessian (DoH) method to detect blobs
    blobs = feature.blob_doh(
        data,
        min_sigma=min_sigma,
        max_sigma=max_sigma,
        num_sigma=int(max_sigma - min_sigma + 1),
        threshold=None,
        threshold_rel=threshold_rel,
        overlap=overlap,
        log_scale=log_scale,
    )
    return __blobs_to_coords(blobs)


def find_blobs_log(
    data: np.ndarray,
    min_sigma: float = 1,
    max_sigma: float = 30,
    overlap: float = 0.5,
    log_scale: bool = False,
    threshold_rel: float = 0.2,
    exclude_border: bool = True,
) -> np.ndarray:
    """Finds blobs in the given grayscale image using the Laplacian of
    Gaussian (LoG) method.

    Args:
        data: The grayscale input image.
        min_sigma: The minimum blob radius in pixels.
        max_sigma: The maximum blob radius in pixels.
        overlap: The minimum overlap fraction (between 0 and 1) above which
            the smaller of two overlapping blobs is discarded. For instance,
            with ``overlap`` set to 0.5, if the area of two blobs overlaps by
            more than 50%, only the larger blob is returned.
        log_scale: If ``True``, intermediate scale values are interpolated on
            a logarithmic scale; otherwise, linear interpolation is used.
        threshold_rel: The relative lower bound for scale space maxima. Local
            maxima smaller than ``threshold_rel`` times the maximum of the
            scale space are ignored. Reduce this to detect blobs with lower
            intensities.
        exclude_border: If ``True``, exclude blobs from detection if they are
            too close to the border of the image. Border size is ``min_sigma``.

    Returns:
        Coordinates of blobs
    """
    # Use scikit-image's Laplacian of Gaussian (LoG) method to detect blobs
    blobs = feature.blob_log(
        data,
        min_sigma=min_sigma,
        max_sigma=max_sigma,
        num_sigma=int(max_sigma - min_sigma + 1),
        threshold=None,
        threshold_rel=threshold_rel,
        overlap=overlap,
        log_scale=log_scale,
        exclude_border=exclude_border,
    )
    return __blobs_to_coords(blobs)


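# Editor's note: illustrative sketch, not part of the original module,
# comparing the three scikit-image blob detectors above on a single bright
# Gaussian spot (sigma = 6, within the 2-10 search range).
def _example_skimage_blobs() -> None:
    """Usage sketch for `find_blobs_dog`, `find_blobs_doh`, `find_blobs_log`"""
    y, x = np.mgrid[0:128, 0:128]
    img = np.exp(-((x - 64) ** 2 + (y - 64) ** 2) / 72.0)
    for func in (find_blobs_dog, find_blobs_doh, find_blobs_log):
        print(func.__name__, func(img, min_sigma=2, max_sigma=10))

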
def remove_overlapping_disks(coords: np.ndarray) -> np.ndarray:
    """Remove overlapping disks among coordinates

    Args:
        coords: The coordinates of the disks

    Returns:
        The coordinates of the disks with overlapping disks removed
    """
    # Get the radii of each disk from the coordinates
    radii = coords[:, 2]
    # Calculate the distance between the center of each pair of disks
    dist = np.sqrt(np.sum((coords[:, None, :2] - coords[:, :2]) ** 2, axis=-1))
    # Create a boolean mask where the distance between the centers
    # is less than the sum of the radii
    mask = dist < (radii[:, None] + radii)
    # Find the indices of overlapping disks
    overlapping_indices = np.argwhere(mask)
    # Remove the smaller disk from each overlapping pair
    for i, j in overlapping_indices:
        if i != j:
            if radii[i] < radii[j]:
                coords[i] = [np.nan, np.nan, np.nan]
            else:
                coords[j] = [np.nan, np.nan, np.nan]
    # Remove rows with NaN values
    coords = coords[~np.isnan(coords).any(axis=1)]
    return coords


def find_blobs_opencv(
    data: np.ndarray,
    min_threshold: float | None = None,
    max_threshold: float | None = None,
    min_repeatability: int | None = None,
    min_dist_between_blobs: float | None = None,
    filter_by_color: bool | None = None,
    blob_color: int | None = None,
    filter_by_area: bool | None = None,
    min_area: float | None = None,
    max_area: float | None = None,
    filter_by_circularity: bool | None = None,
    min_circularity: float | None = None,
    max_circularity: float | None = None,
    filter_by_inertia: bool | None = None,
    min_inertia_ratio: float | None = None,
    max_inertia_ratio: float | None = None,
    filter_by_convexity: bool | None = None,
    min_convexity: float | None = None,
    max_convexity: float | None = None,
) -> np.ndarray:
    """
    Finds blobs in the given grayscale image using OpenCV's SimpleBlobDetector.

    Args:
        data: The grayscale input image.
        min_threshold: The minimum blob intensity.
        max_threshold: The maximum blob intensity.
        min_repeatability: The minimum number of times a blob is detected
            before it is reported.
        min_dist_between_blobs: The minimum distance between blobs.
        filter_by_color: If ``True``, blobs are filtered by color.
        blob_color: The color of the blobs to filter by.
        filter_by_area: If ``True``, blobs are filtered by area.
        min_area: The minimum blob area.
        max_area: The maximum blob area.
        filter_by_circularity: If ``True``, blobs are filtered by circularity.
        min_circularity: The minimum blob circularity.
        max_circularity: The maximum blob circularity.
        filter_by_inertia: If ``True``, blobs are filtered by inertia.
        min_inertia_ratio: The minimum blob inertia ratio.
        max_inertia_ratio: The maximum blob inertia ratio.
        filter_by_convexity: If ``True``, blobs are filtered by convexity.
        min_convexity: The minimum blob convexity.
        max_convexity: The maximum blob convexity.

    Returns:
        Coordinates of blobs
    """
    params = cv2.SimpleBlobDetector_Params()
    if min_threshold is not None:
        params.minThreshold = min_threshold
    if max_threshold is not None:
        params.maxThreshold = max_threshold
    if min_repeatability is not None:
        params.minRepeatability = min_repeatability
    if min_dist_between_blobs is not None:
        params.minDistBetweenBlobs = min_dist_between_blobs
    if filter_by_color is not None:
        params.filterByColor = filter_by_color
    if blob_color is not None:
        params.blobColor = blob_color
    if filter_by_area is not None:
        params.filterByArea = filter_by_area
    if min_area is not None:
        params.minArea = min_area
    if max_area is not None:
        params.maxArea = max_area
    if filter_by_circularity is not None:
        params.filterByCircularity = filter_by_circularity
    if min_circularity is not None:
        params.minCircularity = min_circularity
    if max_circularity is not None:
        params.maxCircularity = max_circularity
    if filter_by_inertia is not None:
        params.filterByInertia = filter_by_inertia
    if min_inertia_ratio is not None:
        params.minInertiaRatio = min_inertia_ratio
    if max_inertia_ratio is not None:
        params.maxInertiaRatio = max_inertia_ratio
    if filter_by_convexity is not None:
        params.filterByConvexity = filter_by_convexity
    if min_convexity is not None:
        params.minConvexity = min_convexity
    if max_convexity is not None:
        params.maxConvexity = max_convexity
    detector = cv2.SimpleBlobDetector_create(params)
    image = exposure.rescale_intensity(data, out_range=np.uint8)
    keypoints = detector.detect(image)
    if keypoints:
        coords = cv2.KeyPoint_convert(keypoints)
        radii = 0.5 * np.array([kp.size for kp in keypoints])
        blobs = np.vstack([coords[:, 1], coords[:, 0], radii]).T
        blobs = remove_overlapping_disks(blobs)
    else:
        blobs = np.array([]).reshape((0, 3))
    return __blobs_to_coords(blobs)


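# Editor's note: illustrative sketch, not part of the original module. It
# shows a typical call with area filtering on a synthetic image containing two
# dark disks on a bright background (SimpleBlobDetector looks for dark blobs
# by default).
def _example_blob_detection() -> None:
    """Usage sketch for `find_blobs_opencv` (illustrative)"""
    img = np.full((128, 128), 255, dtype=np.uint8)
    y, x = np.mgrid[0:128, 0:128]
    img[(x - 40) ** 2 + (y - 40) ** 2 < 100] = 0  # disk of radius 10
    img[(x - 90) ** 2 + (y - 90) ** 2 < 100] = 0  # disk of radius 10
    blobs = find_blobs_opencv(img, filter_by_area=True, min_area=50.0)
    print(blobs)  # expected: two rows of (x, y, radius)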