# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.
"""
.. Signal processor object (see parent package :mod:`cdl.core.gui.processor`)
"""
# pylint: disable=invalid-name # Allows short reference names like x, y, ...
from __future__ import annotations
import re
from collections.abc import Callable
import guidata.dataset as gds
import numpy as np
from guidata.qthelpers import exec_dialog
import cdl.computation.base as cpb
import cdl.computation.signal as cps
import cdl.param
from cdl.config import Conf, _
from cdl.core.gui.processor.base import BaseProcessor
from cdl.core.model.base import ResultProperties, ResultShape
from cdl.core.model.signal import ROI1DParam, SignalObj, SignalROI, create_signal
from cdl.utils.qthelpers import qt_try_except
from cdl.widgets import fitdialog, signalbaseline, signalpeak
[docs]
class SignalProcessor(BaseProcessor[SignalROI]):
"""Object handling signal processing: operations, processing, analysis"""
# pylint: disable=duplicate-code
[docs]
@qt_try_except()
def compute_sum(self) -> None:
"""Compute sum with :py:func:`cdl.computation.signal.compute_addition`"""
self.compute_n1("Σ", cps.compute_addition, title=_("Sum"))
[docs]
@qt_try_except()
def compute_addition_constant(self, param: cpb.ConstantParam | None = None) -> None:
"""Compute sum with a constant
with :py:func:`cdl.computation.signal.compute_addition_constant`"""
self.compute_11(
cps.compute_addition_constant,
param,
paramclass=cpb.ConstantParam,
title=_("Sum with constant"),
)
[docs]
@qt_try_except()
def compute_average(self) -> None:
"""Compute average with :py:func:`cdl.computation.signal.compute_addition`
and divide by the number of signals"""
def func_objs(new_obj: SignalObj, old_objs: list[SignalObj]) -> None:
"""Finalize average computation"""
new_obj.data = new_obj.data / float(len(old_objs))
if new_obj.dy is not None:
new_obj.dy = new_obj.dy / float(len(old_objs))
self.compute_n1(
"μ", cps.compute_addition, func_objs=func_objs, title=_("Average")
)
[docs]
@qt_try_except()
def compute_product(self) -> None:
"""Compute product with :py:func:`cdl.computation.signal.compute_product`"""
self.compute_n1("Π", cps.compute_product, title=_("Product"))
[docs]
@qt_try_except()
def compute_product_constant(self, param: cpb.ConstantParam | None = None) -> None:
"""Compute product with a constant
with :py:func:`cdl.computation.signal.compute_product_constant`"""
self.compute_11(
cps.compute_product_constant,
param,
paramclass=cpb.ConstantParam,
title=_("Product with constant"),
)
[docs]
@qt_try_except()
def compute_swap_axes(self) -> None:
"""Swap data axes with :py:func:`cdl.computation.signal.compute_swap_axes`"""
self.compute_11(cps.compute_swap_axes, title=_("Swap axes"))
[docs]
@qt_try_except()
def compute_abs(self) -> None:
"""Compute absolute value with :py:func:`cdl.computation.signal.compute_abs`"""
self.compute_11(cps.compute_abs, title=_("Absolute value"))
[docs]
@qt_try_except()
def compute_re(self) -> None:
"""Compute real part with :py:func:`cdl.computation.signal.compute_re`"""
self.compute_11(cps.compute_re, title=_("Real part"))
[docs]
@qt_try_except()
def compute_im(self) -> None:
"""Compute imaginary part with :py:func:`cdl.computation.signal.compute_im`"""
self.compute_11(cps.compute_im, title=_("Imaginary part"))
[docs]
@qt_try_except()
def compute_astype(self, param: cdl.param.DataTypeSParam | None = None) -> None:
"""Convert data type with :py:func:`cdl.computation.signal.compute_astype`"""
self.compute_11(
cps.compute_astype, param, cps.DataTypeSParam, title=_("Convert data type")
)
[docs]
@qt_try_except()
def compute_log10(self) -> None:
"""Compute Log10 with :py:func:`cdl.computation.signal.compute_log10`"""
self.compute_11(cps.compute_log10, title="Log10")
[docs]
@qt_try_except()
def compute_exp(self) -> None:
"""Compute Log10 with :py:func:`cdl.computation.signal.compute_exp`"""
self.compute_11(cps.compute_exp, title=_("Exponential"))
[docs]
@qt_try_except()
def compute_sqrt(self) -> None:
"""Compute square root with :py:func:`cdl.computation.signal.compute_sqrt`"""
self.compute_11(cps.compute_sqrt, title=_("Square root"))
[docs]
@qt_try_except()
def compute_power(self, param: cps.PowerParam | None = None) -> None:
"""Compute power with :py:func:`cdl.computation.signal.compute_power`"""
self.compute_11(cps.compute_power, param, cps.PowerParam, title="Power")
[docs]
@qt_try_except()
def compute_arithmetic(
self, obj2: SignalObj | None = None, param: cpb.ArithmeticParam | None = None
) -> None:
"""Compute arithmetic operation between two signals
with :py:func:`cdl.computation.signal.compute_arithmetic`"""
self.compute_n1n(
obj2,
_("signal to operate with"),
cps.compute_arithmetic,
param=param,
paramclass=cpb.ArithmeticParam,
title=_("Arithmetic"),
)
[docs]
@qt_try_except()
def compute_difference(
self, obj2: SignalObj | list[SignalObj] | None = None
) -> None:
"""Compute difference between two signals
with :py:func:`cdl.computation.signal.compute_difference`"""
self.compute_n1n(
obj2,
_("signal to subtract"),
cps.compute_difference,
title=_("Difference"),
)
[docs]
@qt_try_except()
def compute_difference_constant(
self, param: cpb.ConstantParam | None = None
) -> None:
"""Compute difference with a constant
with :py:func:`cdl.computation.signal.compute_difference_constant`"""
self.compute_11(
cps.compute_difference_constant,
param,
paramclass=cpb.ConstantParam,
title=_("Difference with constant"),
edit=True,
)
[docs]
@qt_try_except()
def compute_quadratic_difference(
self, obj2: SignalObj | list[SignalObj] | None = None
) -> None:
"""Compute quadratic difference between two signals
with :py:func:`cdl.computation.signal.compute_quadratic_difference`"""
self.compute_n1n(
obj2,
_("signal to subtract"),
cps.compute_quadratic_difference,
title=_("Quadratic difference"),
)
[docs]
@qt_try_except()
def compute_division(self, obj2: SignalObj | list[SignalObj] | None = None) -> None:
"""Compute division between two signals
with :py:func:`cdl.computation.signal.compute_division`"""
self.compute_n1n(
obj2,
_("divider"),
cps.compute_division,
title=_("Division"),
)
[docs]
@qt_try_except()
def compute_division_constant(self, param: cpb.ConstantParam | None = None) -> None:
"""Compute division by a constant
with :py:func:`cdl.computation.signal.compute_division_constant`"""
self.compute_11(
cps.compute_division_constant,
param,
paramclass=cpb.ConstantParam,
title=_("Division by constant"),
)
[docs]
@qt_try_except()
def compute_peak_detection(
self, param: cdl.param.PeakDetectionParam | None = None
) -> None:
"""Detect peaks from data
with :py:func:`cdl.computation.signal.compute_peak_detection`"""
obj = self.panel.objview.get_sel_objects(include_groups=True)[0]
edit, param = self.init_param(
param, cps.PeakDetectionParam, _("Peak detection")
)
if edit:
dlg = signalpeak.SignalPeakDetectionDialog(obj, parent=self.panel.parent())
if exec_dialog(dlg):
param.threshold = int(dlg.get_threshold() * 100)
param.min_dist = dlg.get_min_dist()
else:
return
self.compute_11(cps.compute_peak_detection, param)
[docs]
@qt_try_except()
def compute_reverse_x(self) -> None:
"""Reverse X axis with :py:func:`cdl.computation.signal.compute_reverse_x`"""
self.compute_11(cps.compute_reverse_x, title=_("Reverse X axis"))
# ------Signal Processing
[docs]
@qt_try_except()
def compute_normalize(self, param: cdl.param.NormalizeParam | None = None) -> None:
"""Normalize data with :py:func:`cdl.computation.signal.compute_normalize`"""
self.compute_11(
cps.compute_normalize, param, cps.NormalizeParam, title=_("Normalize")
)
[docs]
@qt_try_except()
def compute_derivative(self) -> None:
"""Compute derivative
with :py:func:`cdl.computation.signal.compute_derivative`"""
self.compute_11(cps.compute_derivative, title=_("Derivative"))
[docs]
@qt_try_except()
def compute_integral(self) -> None:
"""Compute integral with :py:func:`cdl.computation.signal.compute_integral`"""
self.compute_11(cps.compute_integral, title=_("Integral"))
[docs]
@qt_try_except()
def compute_calibration(
self, param: cdl.param.XYCalibrateParam | None = None
) -> None:
"""Compute data linear calibration
with :py:func:`cdl.computation.signal.compute_calibration`"""
self.compute_11(
cps.compute_calibration,
param,
cps.XYCalibrateParam,
title=_("Linear calibration"),
comment="y = a.x + b",
)
[docs]
@qt_try_except()
def compute_clip(self, param: cpb.ClipParam | None = None) -> None:
"""Compute maximum data clipping
with :py:func:`cdl.computation.signal.compute_clip`"""
self.compute_11(cps.compute_clip, param, cpb.ClipParam, title=_("Clipping"))
[docs]
@qt_try_except()
def compute_offset_correction(self, param: ROI1DParam | None = None) -> None:
"""Compute offset correction
with :py:func:`cdl.computation.signal.compute_offset_correction`"""
obj = self.panel.objview.get_sel_objects(include_groups=True)[0]
if param is None:
dlg = signalbaseline.SignalBaselineDialog(obj, parent=self.panel.parent())
if exec_dialog(dlg):
param = ROI1DParam()
param.xmin, param.xmax = dlg.get_x_range()
else:
return
self.compute_11(cps.compute_offset_correction, param)
[docs]
@qt_try_except()
def compute_gaussian_filter(self, param: cpb.GaussianParam | None = None) -> None:
"""Compute gaussian filter
with :py:func:`cdl.computation.signal.compute_gaussian_filter`"""
self.compute_11(
cps.compute_gaussian_filter,
param,
cpb.GaussianParam,
title=_("Gaussian filter"),
)
[docs]
@qt_try_except()
def compute_moving_average(
self, param: cpb.MovingAverageParam | None = None
) -> None:
"""Compute moving average
with :py:func:`cdl.computation.signal.compute_moving_average`"""
self.compute_11(
cps.compute_moving_average,
param,
cpb.MovingAverageParam,
title=_("Moving average"),
)
[docs]
@qt_try_except()
def compute_wiener(self) -> None:
"""Compute Wiener filter
with :py:func:`cdl.computation.signal.compute_wiener`"""
self.compute_11(cps.compute_wiener, title=_("Wiener filter"))
def __freq_filter(
self,
param: cdl.param.LowPassFilterParam
| cdl.param.HighPassFilterParam
| cdl.param.BandPassFilterParam
| cdl.param.BandStopFilterParam,
paramclass: type[
cdl.param.LowPassFilterParam
| cdl.param.HighPassFilterParam
| cdl.param.BandPassFilterParam
| cdl.param.BandStopFilterParam
],
title: str,
) -> None:
"""Compute frequency filter"""
edit, param = self.init_param(param, paramclass, title)
if edit:
obj = self.panel.objview.get_sel_objects(include_groups=True)[0]
param.update_from_signal(obj)
self.compute_11(cps.compute_filter, param, title=title, edit=edit)
[docs]
@qt_try_except()
def compute_lowpass(
self, param: cdl.param.LowPassFilterParam | None = None
) -> None:
"""Compute high-pass filter
with :py:func:`cdl.computation.signal.compute_filter`"""
self.__freq_filter(param, cdl.param.LowPassFilterParam, _("Low-pass filter"))
[docs]
@qt_try_except()
def compute_highpass(
self, param: cdl.param.HighPassFilterParam | None = None
) -> None:
"""Compute high-pass filter
with :py:func:`cdl.computation.signal.compute_filter`"""
self.__freq_filter(param, cdl.param.HighPassFilterParam, _("High-pass filter"))
[docs]
@qt_try_except()
def compute_bandpass(
self, param: cdl.param.BandPassFilterParam | None = None
) -> None:
"""Compute band-pass filter
with :py:func:`cdl.computation.signal.compute_filter`"""
self.__freq_filter(param, cdl.param.BandPassFilterParam, _("Band-pass filter"))
[docs]
@qt_try_except()
def compute_bandstop(
self, param: cdl.param.BandStopFilterParam | None = None
) -> None:
"""Compute band-stop filter
with :py:func:`cdl.computation.signal.compute_filter`"""
self.__freq_filter(param, cdl.param.BandStopFilterParam, _("Band-stop filter"))
[docs]
@qt_try_except()
def compute_fft(self, param: cdl.param.FFTParam | None = None) -> None:
"""Compute FFT with :py:func:`cdl.computation.signal.compute_fft`"""
if param is None:
param = cpb.FFTParam.create(shift=Conf.proc.fft_shift_enabled.get())
self.compute_11(cps.compute_fft, param, title=_("FFT"), edit=False)
[docs]
@qt_try_except()
def compute_ifft(self, param: cdl.param.FFTParam | None = None) -> None:
"""Compute iFFT with :py:func:`cdl.computation.signal.compute_ifft`"""
if param is None:
param = cpb.FFTParam.create(shift=Conf.proc.fft_shift_enabled.get())
self.compute_11(cps.compute_ifft, param, title=_("iFFT"), edit=False)
[docs]
@qt_try_except()
def compute_magnitude_spectrum(
self, param: cdl.param.SpectrumParam | None = None
) -> None:
"""Compute magnitude spectrum
with :py:func:`cdl.computation.signal.compute_magnitude_spectrum`"""
self.compute_11(
cps.compute_magnitude_spectrum,
param,
cdl.param.SpectrumParam,
title=_("Magnitude spectrum"),
)
[docs]
@qt_try_except()
def compute_phase_spectrum(self) -> None:
"""Compute phase spectrum
with :py:func:`cdl.computation.signal.compute_phase_spectrum`"""
self.compute_11(cps.compute_phase_spectrum, title=_("Phase spectrum"))
[docs]
@qt_try_except()
def compute_psd(self, param: cdl.param.SpectrumParam | None = None) -> None:
"""Compute power spectral density
with :py:func:`cdl.computation.signal.compute_psd`"""
self.compute_11(cps.compute_psd, param, cdl.param.SpectrumParam, title=_("PSD"))
[docs]
@qt_try_except()
def compute_interpolation(
self,
obj2: SignalObj | None = None,
param: cdl.param.InterpolationParam | None = None,
):
"""Compute interpolation
with :py:func:`cdl.computation.signal.compute_interpolation`"""
self.compute_n1n(
obj2,
_("signal for X values"),
cps.compute_interpolation,
param,
cps.InterpolationParam,
title=_("Interpolation"),
)
[docs]
@qt_try_except()
def compute_resampling(self, param: cdl.param.ResamplingParam | None = None):
"""Compute resampling
with :py:func:`cdl.computation.signal.compute_resampling`"""
edit, param = self.init_param(param, cps.ResamplingParam, _("Resampling"))
if edit:
obj = self.panel.objview.get_sel_objects(include_groups=True)[0]
if param.xmin is None:
param.xmin = obj.x[0]
if param.xmax is None:
param.xmax = obj.x[-1]
if param.dx is None:
param.dx = obj.x[1] - obj.x[0]
if param.nbpts is None:
param.nbpts = len(obj.x)
self.compute_11(
cps.compute_resampling,
param,
cps.ResamplingParam,
title=_("Resampling"),
edit=edit,
)
[docs]
@qt_try_except()
def compute_detrending(self, param: cdl.param.DetrendingParam | None = None):
"""Compute detrending
with :py:func:`cdl.computation.signal.compute_detrending`"""
self.compute_11(
cps.compute_detrending,
param,
cps.DetrendingParam,
title=_("Detrending"),
)
[docs]
@qt_try_except()
def compute_convolution(self, obj2: SignalObj | None = None) -> None:
"""Compute convolution
with :py:func:`cdl.computation.signal.compute_convolution`"""
self.compute_n1n(
obj2,
_("signal to convolve with"),
cps.compute_convolution,
title=_("Convolution"),
)
[docs]
@qt_try_except()
def compute_windowing(self, param: cdl.param.WindowingParam | None = None) -> None:
"""Compute windowing
with :py:func:`cdl.computation.signal.compute_windowing`"""
self.compute_11(
cps.compute_windowing,
param,
cdl.param.WindowingParam,
title=_("Windowing"),
)
[docs]
@qt_try_except()
def compute_polyfit(
self, param: cdl.param.PolynomialFitParam | None = None
) -> None:
"""Compute polynomial fitting curve"""
txt = _("Polynomial fit")
edit, param = self.init_param(param, cps.PolynomialFitParam, txt)
if not edit or param.edit(self.panel.parent()):
dlgfunc = fitdialog.polynomialfit
def polynomialfit(x, y, parent=None):
"""Polynomial fit dialog function"""
return dlgfunc(x, y, param.degree, parent=parent)
self.compute_fit(txt, polynomialfit)
def __row_compute_fit(
self, obj: SignalObj, name: str, fitdlgfunc: Callable
) -> None:
"""Curve fitting computing sub-method"""
output = fitdlgfunc(obj.x, obj.y, parent=self.panel.parent())
if output is not None:
y, params = output
params: list[fitdialog.FitParam]
pvalues = {}
for param in params:
if re.match(r"[\S\_]*\d{2}$", param.name):
shname = param.name[:-2]
value = pvalues.get(shname, np.array([]))
pvalues[shname] = np.array(list(value) + [param.value])
else:
pvalues[param.name] = param.value
# Creating new signal
metadata = {fitdlgfunc.__name__: pvalues}
signal = create_signal(f"{name}({obj.title})", obj.x, y, metadata=metadata)
# Creating new plot item
self.panel.add_object(signal)
[docs]
@qt_try_except()
def compute_fit(self, title: str, fitdlgfunc: Callable) -> None:
"""Compute fitting curve using an interactive dialog
Args:
title: Title of the dialog
fitdlgfunc: Fitting dialog function
"""
for obj in self.panel.objview.get_sel_objects():
self.__row_compute_fit(obj, title, fitdlgfunc)
[docs]
@qt_try_except()
def compute_multigaussianfit(self) -> None:
"""Compute multi-Gaussian fitting curve using an interactive dialog"""
fitdlgfunc = fitdialog.multigaussianfit
for obj in self.panel.objview.get_sel_objects():
dlg = signalpeak.SignalPeakDetectionDialog(obj, parent=self.panel)
if exec_dialog(dlg):
# Computing x, y
peaks = dlg.get_peak_indices()
def multigaussianfit(x, y, parent=None):
"""Multi-Gaussian fit dialog function"""
# pylint: disable=cell-var-from-loop
return fitdlgfunc(x, y, peaks, parent=parent)
self.__row_compute_fit(obj, _("Multi-Gaussian fit"), multigaussianfit)
@qt_try_except()
def _extract_multiple_roi_in_single_object(self, group: gds.DataSetGroup) -> None:
"""Extract multiple Regions Of Interest (ROIs) from data in a single object"""
self.compute_11(cps.extract_multiple_roi, group, title=_("Extract ROI"))
@qt_try_except()
def _extract_each_roi_in_separate_object(self, group: gds.DataSetGroup) -> None:
"""Extract each single Region Of Interest (ROI) from data in a separate
object (keep the ROI in the case of a circular ROI, for example)"""
self.compute_1n(cps.extract_single_roi, group.datasets, "ROI", edit=False)
# ------Signal Analysis
[docs]
@qt_try_except()
def compute_fwhm(
self, param: cdl.param.FWHMParam | None = None
) -> dict[str, ResultShape]:
"""Compute FWHM with :py:func:`cdl.computation.signal.compute_fwhm`"""
return self.compute_10(cps.compute_fwhm, param, cps.FWHMParam, title=_("FWHM"))
[docs]
@qt_try_except()
def compute_fw1e2(self) -> dict[str, ResultShape]:
"""Compute FW at 1/e² with :py:func:`cdl.computation.signal.compute_fw1e2`"""
return self.compute_10(cps.compute_fw1e2, title=_("FW") + "1/e²")
[docs]
@qt_try_except()
def compute_stats(self) -> dict[str, ResultProperties]:
"""Compute data statistics
with :py:func:`cdl.computation.signal.compute_stats`"""
return self.compute_10(cps.compute_stats, title=_("Statistics"))
[docs]
@qt_try_except()
def compute_histogram(
self, param: cdl.param.HistogramParam | None = None
) -> dict[str, ResultShape]:
"""Compute histogram
with :py:func:`cdl.computation.signal.compute_histogram`"""
return self.compute_11(
cps.compute_histogram, param, cps.HistogramParam, title=_("Histogram")
)
[docs]
@qt_try_except()
def compute_contrast(self) -> dict[str, ResultProperties]:
"""Compute contrast with :py:func:`cdl.computation.signal.compute_contrast`"""
return self.compute_10(cps.compute_contrast, title=_("Contrast"))
[docs]
@qt_try_except()
def compute_x_at_minmax(self) -> dict[str, ResultProperties]:
"""Compute x at min/max
with :py:func:`cdl.computation.signal.compute_x_at_minmax`"""
return self.compute_10(cps.compute_x_at_minmax, title="X @ min,max")
[docs]
@qt_try_except()
def compute_sampling_rate_period(self) -> dict[str, ResultProperties]:
"""Compute sampling rate and period (mean and std)
with :py:func:`cdl.computation.signal.compute_sampling_rate_period`"""
return self.compute_10(
cps.compute_sampling_rate_period, title=_("Sampling rate and period")
)
[docs]
@qt_try_except()
def compute_bandwidth_3db(self) -> None:
"""Compute bandwidth at -3dB
with :py:func:`cdl.computation.signal.compute_bandwidth_3db`"""
self.compute_10(cps.compute_bandwidth_3db, title=_("Bandwidth"))
[docs]
@qt_try_except()
def compute_dynamic_parameters(
self, param: cps.DynamicParam | None = None
) -> dict[str, ResultProperties]:
"""Compute Dynamic Parameters (ENOB, SINAD, THD, SFDR, SNR)
with :py:func:`cdl.computation.signal.compute_dynamic_parameters`"""
return self.compute_10(
cps.compute_dynamic_parameters,
param,
cps.DynamicParam,
title=_("Dynamic parameters"),
)