# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.""".. Signal computation objects (see parent package :mod:`cdl.computation`)"""# pylint: disable=invalid-name # Allows short reference names like x, y, ...# MARK: Important note# --------------------# All `guidata.dataset.DataSet` classes must also be imported in the `cdl.param` module.from__future__importannotationsfromcollections.abcimportCallablefromenumimportEnumfromtypingimportAny,Literalimportguidata.datasetasgdsimportnumpyasnpimportscipy.integrateassptimportscipy.ndimageasspiimportscipy.signalasspsimportcdl.algorithms.signalasalgfromcdl.computation.baseimport(ArithmeticParam,ClipParam,ConstantParam,FFTParam,GaussianParam,HistogramParam,MovingAverageParam,MovingMedianParam,NormalizeParam,SpectrumParam,calc_resultproperties,dst_11,dst_n1n,new_signal_result,)fromcdl.configimport_fromcdl.objimportResultProperties,ResultShape,ROI1DParam,SignalObjVALID_DTYPES_STRLIST=SignalObj.get_valid_dtypenames()
[docs]defrestore_data_outside_roi(dst:SignalObj,src:SignalObj)->None:"""Restore data outside the region of interest, after a computation, only if the source signal has a ROI, if the data types are the same and if the shapes are the same. Otherwise, do nothing. Args: dst: destination signal object src: source signal object """if(src.maskdataisnotNoneanddst.xydata.dtype==src.xydata.dtypeanddst.xydata.shape==src.xydata.shape):dst.xydata[src.maskdata]=src.xydata[src.maskdata]
[docs]classWrap11Func:"""Wrap a 1 array → 1 array function (the simple case of y1 = f(y0)) to produce a 1 signal → 1 signal function, which can be used inside DataLab's infrastructure to perform computations with :class:`cdl.core.gui.processor.signal.SignalProcessor`. This wrapping mechanism using a class is necessary for the resulted function to be pickable by the ``multiprocessing`` module. The instance of this wrapper is callable and returns a :class:`cdl.obj.SignalObj` object. Example: >>> import numpy as np >>> from cdl.computation.signal import Wrap11Func >>> import cdl.obj >>> def square(y): ... return y**2 >>> compute_square = Wrap11Func(square) >>> x = np.linspace(0, 10, 100) >>> y = np.sin(x) >>> sig0 = cdl.obj.create_signal("Example", x, y) >>> sig1 = compute_square(sig0) Args: func: 1 array → 1 array function *args: Additional positional arguments to pass to the function **kwargs: Additional keyword arguments to pass to the function """def__init__(self,func:Callable,*args:Any,**kwargs:Any)->None:self.func=funcself.args=argsself.kwargs=kwargsself.__name__=func.__name__self.__doc__=func.__doc__self.__call__.__func__.__doc__=self.func.__doc__def__call__(self,src:SignalObj)->SignalObj:"""Compute the function on the input signal and return the result signal Args: src: input signal object Returns: Result signal object """suffix=", ".join([str(arg)forarginself.args]+[f"{k}={v}"fork,vinself.kwargs.items()ifvisnotNone])dst=dst_11(src,self.func.__name__,suffix)x,y=src.get_data()dst.set_xydata(x,self.func(y,*self.args,**self.kwargs))restore_data_outside_roi(dst,src)returndst
# MARK: compute_n1 functions -----------------------------------------------------------# Functions with N input signals and 1 output signal# --------------------------------------------------------------------------------------# Those functions are perfoming a computation on N input signals and return a single# output signal. If we were only executing these functions locally, we would not need# to define them here, but since we are using the multiprocessing module, we need to# define them here so that they can be pickled and sent to the worker processes.# Also, we need to systematically return the output signal object, even if it is already# modified in place, because the multiprocessing module will not be able to retrieve# the modified object from the worker processes.
[docs]defcompute_addition(dst:SignalObj,src:SignalObj)->SignalObj:"""Add **dst** and **src** signals and return **dst** signal modified in place Args: dst: destination signal src: source signal Returns: Modified **dst** signal (modified in place) """dst.y+=np.array(src.y,dtype=dst.y.dtype)ifdst.dyisnotNone:dst.dy=np.sqrt(dst.dy**2+src.dy**2)restore_data_outside_roi(dst,src)returndst
[docs]defcompute_product(dst:SignalObj,src:SignalObj)->SignalObj:"""Multiply **dst** and **src** signals and return **dst** signal modified in place Args: dst: destination signal src: source signal Returns: Modified **dst** signal (modified in place) """dst.y*=np.array(src.y,dtype=dst.y.dtype)ifdst.dyisnotNone:dst.dy=dst.y*np.sqrt((dst.dy/dst.y)**2+(src.dy/src.y)**2)restore_data_outside_roi(dst,src)returndst
[docs]defcompute_addition_constant(src:SignalObj,p:ConstantParam)->SignalObj:"""Add **dst** and a constant value and return a the new result signal object Args: src: input signal object p: constant value Returns: Result signal object **src** + **p.value** (new object) """dst=dst_11(src,"+",str(p.value))dst.y+=p.valuerestore_data_outside_roi(dst,src)returndst
[docs]defcompute_difference_constant(src:SignalObj,p:ConstantParam)->SignalObj:"""Subtract a constant value from a signal Args: src: input signal object p: constant value Returns: Result signal object **src** - **p.value** (new object) """dst=dst_11(src,"-",str(p.value))dst.y-=p.valuerestore_data_outside_roi(dst,src)returndst
[docs]defcompute_product_constant(src:SignalObj,p:ConstantParam)->SignalObj:"""Multiply **dst** by a constant value and return the new result signal object Args: src: input signal object p: constant value Returns: Result signal object **src** * **p.value** (new object) """dst=dst_11(src,"×",str(p.value))dst.y*=p.valuerestore_data_outside_roi(dst,src)returndst
[docs]defcompute_division_constant(src:SignalObj,p:ConstantParam)->SignalObj:"""Divide a signal by a constant value Args: src: input signal object p: constant value Returns: Result signal object **src** / **p.value** (new object) """dst=dst_11(src,"/",str(p.value))dst.y/=p.valuerestore_data_outside_roi(dst,src)returndst
# MARK: compute_n1n functions ----------------------------------------------------------# Functions with N input images + 1 input image and N output images# --------------------------------------------------------------------------------------
[docs]defcompute_arithmetic(src1:SignalObj,src2:SignalObj,p:ArithmeticParam)->SignalObj:"""Perform arithmetic operation on two signals Args: src1: source signal 1 src2: source signal 2 p: parameters Returns: Result signal object """initial_dtype=src1.xydata.dtypetitle=p.operation.replace("obj1",src1.short_id).replace("obj2",src2.short_id)dst=src1.copy(title=title)o,a,b=p.operator,p.factor,p.constantifoin("×","/")anda==0.0:dst.y=np.ones_like(src1.y)*belifp.operator=="+":dst.y=(src1.y+src2.y)*a+belifp.operator=="-":dst.y=(src1.y-src2.y)*a+belifp.operator=="×":dst.y=(src1.y*src2.y)*a+belifp.operator=="/":dst.y=(src1.y/src2.y)*a+bifdst.dyisnotNoneandp.operatorin("+","-"):dst.dy=np.sqrt(src1.dy**2+src2.dy**2)ifdst.dyisnotNone:dst.dy*=p.factor# Eventually convert to initial data typeifp.restore_dtype:dst.xydata=dst.xydata.astype(initial_dtype)restore_data_outside_roi(dst,src1)returndst
[docs]defcompute_difference(src1:SignalObj,src2:SignalObj)->SignalObj:"""Compute difference between two signals .. note:: If uncertainty is available, it is propagated. Args: src1: source signal 1 src2: source signal 2 Returns: Result signal object **src1** - **src2** """dst=dst_n1n(src1,src2,"-")dst.y=src1.y-src2.yifdst.dyisnotNone:dst.dy=np.sqrt(src1.dy**2+src2.dy**2)restore_data_outside_roi(dst,src1)returndst
[docs]defcompute_quadratic_difference(src1:SignalObj,src2:SignalObj)->SignalObj:"""Compute quadratic difference between two signals .. note:: If uncertainty is available, it is propagated. Args: src1: source signal 1 src2: source signal 2 Returns: Result signal object (**src1** - **src2**) / sqrt(2.0) """dst=dst_n1n(src1,src2,"quadratic_difference")x1,y1=src1.get_data()_x2,y2=src2.get_data()dst.set_xydata(x1,(y1-np.array(y2,dtype=y1.dtype))/np.sqrt(2.0))ifnp.issubdtype(dst.data.dtype,np.unsignedinteger):dst.data[src1.data<src2.data]=0ifdst.dyisnotNone:dst.dy=np.sqrt(src1.dy**2+src2.dy**2)restore_data_outside_roi(dst,src1)returndst
[docs]defcompute_division(src1:SignalObj,src2:SignalObj)->SignalObj:"""Compute division between two signals Args: src1: source signal 1 src2: source signal 2 Returns: Result signal object **src1** / **src2** """dst=dst_n1n(src1,src2,"/")x1,y1=src1.get_data()_x2,y2=src2.get_data()dst.set_xydata(x1,y1/np.array(y2,dtype=y1.dtype))restore_data_outside_roi(dst,src1)returndst
# MARK: compute_11 functions -----------------------------------------------------------# Functions with 1 input image and 1 output image# --------------------------------------------------------------------------------------
[docs]defextract_multiple_roi(src:SignalObj,group:gds.DataSetGroup)->SignalObj:"""Extract multiple regions of interest from data Args: src: source signal group: group of parameters Returns: Signal with multiple regions of interest """suffix=Noneiflen(group.datasets)==1:p:ROI1DParam=group.datasets[0]suffix=f"{p.xmin:.3g}≤x≤{p.xmax:.3g}"dst=dst_11(src,"extract_multiple_roi",suffix)x,y=src.get_data()xout,yout=np.ones_like(x)*np.nan,np.ones_like(y)*np.nanforpingroup.datasets:idx1,idx2=np.searchsorted(x,p.xmin),np.searchsorted(x,p.xmax)slice0=slice(idx1,idx2)xout[slice0],yout[slice0]=x[slice0],y[slice0]nans=np.isnan(xout)|np.isnan(yout)dst.set_xydata(xout[~nans],yout[~nans])# TODO: [P2] Instead of removing geometric shapes, apply roi extractdst.remove_all_shapes()returndst
[docs]defextract_single_roi(src:SignalObj,p:ROI1DParam)->SignalObj:"""Extract single region of interest from data Args: src: source signal p: ROI parameters Returns: Signal with single region of interest """dst=dst_11(src,"extract_single_roi",f"{p.xmin:.3g}≤x≤{p.xmax:.3g}")x,y=p.get_data(src).copy()dst.set_xydata(x,y)# TODO: [P2] Instead of removing geometric shapes, apply roi extractdst.remove_all_shapes()returndst
[docs]defcompute_swap_axes(src:SignalObj)->SignalObj:"""Swap axes Args: src: source signal Returns: Result signal object """dst=dst_11(src,"swap_axes")x,y=src.get_data()dst.set_xydata(y,x)returndst
[docs]defcompute_abs(src:SignalObj)->SignalObj:"""Compute absolute value with :py:data:`numpy.absolute` Args: src: source signal Returns: Result signal object """returnWrap11Func(np.absolute)(src)
[docs]defcompute_re(src:SignalObj)->SignalObj:"""Compute real part with :py:func:`numpy.real` Args: src: source signal Returns: Result signal object """returnWrap11Func(np.real)(src)
[docs]defcompute_im(src:SignalObj)->SignalObj:"""Compute imaginary part with :py:func:`numpy.imag` Args: src: source signal Returns: Result signal object """returnWrap11Func(np.imag)(src)
[docs]classDataTypeSParam(gds.DataSet):"""Convert signal data type parameters"""dtype_str=gds.ChoiceItem(_("Destination data type"),list(zip(VALID_DTYPES_STRLIST,VALID_DTYPES_STRLIST)),help=_("Output image data type."),)
[docs]defcompute_astype(src:SignalObj,p:DataTypeSParam)->SignalObj:"""Convert data type with :py:func:`numpy.astype` Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"astype",f"dtype={p.dtype_str}")dst.xydata=src.xydata.astype(p.dtype_str)returndst
[docs]defcompute_log10(src:SignalObj)->SignalObj:"""Compute Log10 with :py:data:`numpy.log10` Args: src: source signal Returns: Result signal object """returnWrap11Func(np.log10)(src)
[docs]defcompute_exp(src:SignalObj)->SignalObj:"""Compute exponential with :py:data:`numpy.exp` Args: src: source signal Returns: Result signal object """returnWrap11Func(np.exp)(src)
[docs]defcompute_sqrt(src:SignalObj)->SignalObj:"""Compute square root with :py:data:`numpy.sqrt` Args: src: source signal Returns: Result signal object """returnWrap11Func(np.sqrt)(src)
[docs]defcompute_power(src:SignalObj,p:PowerParam)->SignalObj:"""Compute power with :py:data:`numpy.power` Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"^",str(p.power))dst.y=np.power(src.y,p.power)restore_data_outside_roi(dst,src)returndst
[docs]defcompute_peak_detection(src:SignalObj,p:PeakDetectionParam)->SignalObj:"""Peak detection with :py:func:`cdl.algorithms.signal.peak_indices` Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"peak_detection",f"threshold={p.threshold}%, min_dist={p.min_dist}pts")x,y=src.get_data()indices=alg.peak_indices(y,thres=p.threshold*0.01,min_dist=p.min_dist)dst.set_xydata(x[indices],y[indices])dst.metadata["curvestyle"]="Sticks"returndst
[docs]defcompute_normalize(src:SignalObj,p:NormalizeParam)->SignalObj:"""Normalize data with :py:func:`cdl.algorithms.signal.normalize` Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"normalize",f"ref={p.method}")x,y=src.get_data()dst.set_xydata(x,alg.normalize(y,p.method))restore_data_outside_roi(dst,src)returndst
[docs]defcompute_derivative(src:SignalObj)->SignalObj:"""Compute derivative with :py:func:`numpy.gradient` Args: src: source signal Returns: Result signal object """dst=dst_11(src,"derivative")x,y=src.get_data()dst.set_xydata(x,np.gradient(y,x))restore_data_outside_roi(dst,src)returndst
[docs]defcompute_integral(src:SignalObj)->SignalObj:"""Compute integral with :py:func:`scipy.integrate.cumulative_trapezoid` Args: src: source signal Returns: Result signal object """dst=dst_11(src,"integral")x,y=src.get_data()dst.set_xydata(x,spt.cumulative_trapezoid(y,x,initial=0.0))restore_data_outside_roi(dst,src)returndst
[docs]defcompute_calibration(src:SignalObj,p:XYCalibrateParam)->SignalObj:"""Compute linear calibration Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"calibration",f"{p.axis}={p.a}*{p.axis}+{p.b}")x,y=src.get_data()ifp.axis=="x":dst.set_xydata(p.a*x+p.b,y)else:dst.set_xydata(x,p.a*y+p.b)restore_data_outside_roi(dst,src)returndst
[docs]defcompute_clip(src:SignalObj,p:ClipParam)->SignalObj:"""Compute maximum data clipping with :py:func:`numpy.clip` Args: src: source signal p: parameters Returns: Result signal object """returnWrap11Func(np.clip,a_min=p.lower,a_max=p.upper)(src)
[docs]defcompute_offset_correction(src:SignalObj,p:ROI1DParam)->SignalObj:"""Correct offset: subtract the mean value of the signal in the specified range (baseline correction) Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"offset_correction",f"{p.xmin:.3g}≤x≤{p.xmax:.3g}")_roi_x,roi_y=p.get_data(src)dst.y-=np.mean(roi_y)restore_data_outside_roi(dst,src)returndst
[docs]defcompute_gaussian_filter(src:SignalObj,p:GaussianParam)->SignalObj:"""Compute gaussian filter with :py:func:`scipy.ndimage.gaussian_filter` Args: src: source signal p: parameters Returns: Result signal object """returnWrap11Func(spi.gaussian_filter,sigma=p.sigma)(src)
[docs]defcompute_moving_average(src:SignalObj,p:MovingAverageParam)->SignalObj:"""Compute moving average with :py:func:`scipy.ndimage.uniform_filter` Args: src: source signal p: parameters Returns: Result signal object """returnWrap11Func(spi.uniform_filter,size=p.n,mode=p.mode)(src)
[docs]defcompute_moving_median(src:SignalObj,p:MovingMedianParam)->SignalObj:"""Compute moving median with :py:func:`scipy.ndimage.median_filter` Args: src: source signal p: parameters Returns: Result signal object """returnWrap11Func(spi.median_filter,size=p.n,mode=p.mode)(src)
[docs]defcompute_wiener(src:SignalObj)->SignalObj:"""Compute Wiener filter with :py:func:`scipy.signal.wiener` Args: src: source signal Returns: Result signal object """returnWrap11Func(sps.wiener)(src)
[docs]classBaseHighLowBandParam(gds.DataSet):"""Base class for high-pass, low-pass, band-pass and band-stop filters"""methods=(("bessel",_("Bessel")),("butter",_("Butterworth")),("cheby1",_("Chebyshev type 1")),("cheby2",_("Chebyshev type 2")),("ellip",_("Elliptic")),)TYPE:FilterType=FilterType.LOWPASS_type_prop=gds.GetAttrProp("TYPE")# Must be overwriten by the child class_method_prop=gds.GetAttrProp("method")method=gds.ChoiceItem(_("Filter method"),methods).set_prop("display",store=_method_prop)order=gds.IntItem(_("Filter order"),default=3,min=1)f_cut0=gds.FloatItem(_("Low cutoff frequency"),min=0,nonzero=True,unit="Hz").set_prop("display",hide=gds.FuncProp(_type_prop,lambdax:xisFilterType.HIGHPASS))f_cut1=gds.FloatItem(_("High cutoff frequency"),min=0,nonzero=True,unit="Hz").set_prop("display",hide=gds.FuncProp(_type_prop,lambdax:xisFilterType.LOWPASS))rp=gds.FloatItem(_("Passband ripple"),min=0,default=1,nonzero=True,unit="dB").set_prop("display",active=gds.FuncProp(_method_prop,lambdax:xin("cheby1","ellip")),)rs=gds.FloatItem(_("Stopband attenuation"),min=0,default=1,nonzero=True,unit="dB").set_prop("display",active=gds.FuncProp(_method_prop,lambdax:xin("cheby2","ellip")),)
[docs]@staticmethoddefget_nyquist_frequency(obj:SignalObj)->float:"""Return the Nyquist frequency of a signal object Args: obj: signal object """fs=float(obj.x.size-1)/(obj.x[-1]-obj.x[0])returnfs/2.0
[docs]defupdate_from_signal(self,obj:SignalObj)->None:"""Update the filter parameters from a signal object Args: obj: signal object """f_nyquist=self.get_nyquist_frequency(obj)ifself.f_cut0isNone:self.f_cut0=0.1*f_nyquistifself.f_cut1isNone:self.f_cut1=0.9*f_nyquist
[docs]defget_filter_params(self,obj:SignalObj)->tuple[float|str,float|str]:"""Return the filter parameters (a and b) as a tuple. These parameters are used in the scipy.signal filter functions (eg. `scipy.signal.filtfilt`). Args: obj: signal object Returns: tuple: filter parameters """f_nyquist=self.get_nyquist_frequency(obj)func=getattr(sps,self.method)args:list[float|str|tuple[float,...]]=[self.order]# type: ignoreifself.method=="cheby1":args+=[self.rp]elifself.method=="cheby2":args+=[self.rs]elifself.method=="ellip":args+=[self.rp,self.rs]ifself.TYPEisFilterType.HIGHPASS:args+=[self.f_cut1/f_nyquist]elifself.TYPEisFilterType.LOWPASS:args+=[self.f_cut0/f_nyquist]else:args+=[[self.f_cut0/f_nyquist,self.f_cut1/f_nyquist]]args+=[self.TYPE.value]returnfunc(*args)
[docs]defcompute_filter(src:SignalObj,p:BaseHighLowBandParam)->SignalObj:"""Compute frequency filter (low-pass, high-pass, band-pass, band-stop), with :py:func:`scipy.signal.filtfilt` Args: src: source signal p: parameters Returns: Result signal object """name=f"{p.TYPE.value}"suffix=f"order={p.order:d}"ifp.TYPEisFilterType.LOWPASS:suffix+=f", cutoff={p.f_cut0:.2f}"elifp.TYPEisFilterType.HIGHPASS:suffix+=f", cutoff={p.f_cut1:.2f}"else:suffix+=f", cutoff={p.f_cut0:.2f}:{p.f_cut1:.2f}"dst=dst_11(src,name,suffix)b,a=p.get_filter_params(dst)dst.y=sps.filtfilt(b,a,dst.y)restore_data_outside_roi(dst,src)returndst
[docs]defcompute_fft(src:SignalObj,p:FFTParam|None=None)->SignalObj:"""Compute FFT with :py:func:`cdl.algorithms.signal.fft1d` Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"fft")x,y=src.get_data()dst.set_xydata(*alg.fft1d(x,y,shift=TrueifpisNoneelsep.shift))dst.save_attr_to_metadata("xunit","Hz"ifdst.xunit=="s"else"")dst.save_attr_to_metadata("yunit","")dst.save_attr_to_metadata("xlabel",_("Frequency"))returndst
[docs]defcompute_ifft(src:SignalObj,p:FFTParam|None=None)->SignalObj:"""Compute iFFT with :py:func:`cdl.algorithms.signal.ifft1d` Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"ifft")x,y=src.get_data()dst.set_xydata(*alg.ifft1d(x,y,shift=TrueifpisNoneelsep.shift))dst.restore_attr_from_metadata("xunit","s"ifsrc.xunit=="Hz"else"")dst.restore_attr_from_metadata("yunit","")dst.restore_attr_from_metadata("xlabel","")returndst
[docs]defcompute_magnitude_spectrum(src:SignalObj,p:SpectrumParam|None=None)->SignalObj:"""Compute magnitude spectrum with :py:func:`cdl.algorithms.signal.magnitude_spectrum` Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"magnitude_spectrum")x,y=src.get_data()log_scale=pisnotNoneandp.logdst.set_xydata(*alg.magnitude_spectrum(x,y,log_scale=log_scale))dst.xlabel=_("Frequency")dst.xunit="Hz"ifdst.xunit=="s"else""dst.yunit="dB"iflog_scaleelse""returndst
[docs]defcompute_phase_spectrum(src:SignalObj)->SignalObj:"""Compute phase spectrum with :py:func:`cdl.algorithms.signal.phase_spectrum` Args: src: source signal Returns: Result signal object """dst=dst_11(src,"phase_spectrum")x,y=src.get_data()dst.set_xydata(*alg.phase_spectrum(x,y))dst.xlabel=_("Frequency")dst.xunit="Hz"ifdst.xunit=="s"else""dst.yunit=""returndst
[docs]defcompute_psd(src:SignalObj,p:SpectrumParam|None=None)->SignalObj:"""Compute power spectral density with :py:func:`cdl.algorithms.signal.psd` Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"psd")x,y=src.get_data()log_scale=pisnotNoneandp.logpsd_x,psd_y=alg.psd(x,y,log_scale=log_scale)dst.xydata=np.vstack((psd_x,psd_y))dst.xlabel=_("Frequency")dst.xunit="Hz"ifdst.xunit=="s"else""dst.yunit="dB/Hz"iflog_scaleelse""returndst
[docs]defcompute_histogram(src:SignalObj,p:HistogramParam)->SignalObj:"""Compute histogram with :py:func:`numpy.histogram` Args: src: source signal p: parameters Returns: Result signal object """data=src.get_masked_view().compressed()suffix=p.get_suffix(data)# Also updates p.lower and p.upper# Compute histogram:y,bin_edges=np.histogram(data,bins=p.bins,range=(p.lower,p.upper))x=(bin_edges[:-1]+bin_edges[1:])/2# Note: we use the `new_signal_result` function to create the result signal object# because the `dst_11` would copy the source signal, which is not what we want here# (we want a brand new signal object).dst=new_signal_result(src,"histogram",suffix=suffix,units=(src.yunit,""),labels=(src.ylabel,_("Counts")),)dst.set_xydata(x,y)dst.metadata["shade"]=0.5returndst
[docs]classInterpolationParam(gds.DataSet):"""Interpolation parameters"""methods=(("linear",_("Linear")),("spline",_("Spline")),("quadratic",_("Quadratic")),("cubic",_("Cubic")),("barycentric",_("Barycentric")),("pchip",_("PCHIP")),)method=gds.ChoiceItem(_("Interpolation method"),methods,default="linear")fill_value=gds.FloatItem(_("Fill value"),default=None,help=_("Value to use for points outside the interpolation domain (used only ""with linear, cubic and pchip methods)."),check=False,)
[docs]defcompute_interpolation(src1:SignalObj,src2:SignalObj,p:InterpolationParam)->SignalObj:"""Interpolate data with :py:func:`cdl.algorithms.signal.interpolate` Args: src1: source signal 1 src2: source signal 2 p: parameters Returns: Result signal object """suffix=f"method={p.method}"ifp.fill_valueisnotNoneandp.methodin("linear","cubic","pchip"):suffix+=f", fill_value={p.fill_value}"dst=dst_n1n(src1,src2,"interpolation",suffix)x1,y1=src1.get_data()xnew,_y2=src2.get_data()ynew=alg.interpolate(x1,y1,xnew,p.method,p.fill_value)dst.set_xydata(xnew,ynew)returndst
[docs]classResamplingParam(InterpolationParam):"""Resample parameters"""xmin=gds.FloatItem(_("X<sub>min</sub>"))xmax=gds.FloatItem(_("X<sub>max</sub>"))_prop=gds.GetAttrProp("dx_or_nbpts")_modes=(("dx","ΔX"),("nbpts",_("Number of points")))mode=gds.ChoiceItem(_("Mode"),_modes,default="nbpts",radio=True).set_prop("display",store=_prop)dx=gds.FloatItem("ΔX").set_prop("display",active=gds.FuncProp(_prop,lambdax:x=="dx"))nbpts=gds.IntItem(_("Number of points")).set_prop("display",active=gds.FuncProp(_prop,lambdax:x=="nbpts"))
[docs]defcompute_resampling(src:SignalObj,p:ResamplingParam)->SignalObj:"""Resample data with :py:func:`cdl.algorithms.signal.interpolate` Args: src: source signal p: parameters Returns: Result signal object """suffix=f"method={p.method}"ifp.fill_valueisnotNoneandp.methodin("linear","cubic","pchip"):suffix+=f", fill_value={p.fill_value}"ifp.mode=="dx":suffix+=f", dx={p.dx:.3f}"else:suffix+=f", nbpts={p.nbpts:d}"dst=dst_11(src,"resample",suffix)x,y=src.get_data()ifp.mode=="dx":xnew=np.arange(p.xmin,p.xmax,p.dx)else:xnew=np.linspace(p.xmin,p.xmax,p.nbpts)ynew=alg.interpolate(x,y,xnew,p.method,p.fill_value)dst.set_xydata(xnew,ynew)returndst
[docs]defcompute_detrending(src:SignalObj,p:DetrendingParam)->SignalObj:"""Detrend data with :py:func:`scipy.signal.detrend` Args: src: source signal p: parameters Returns: Result signal object """dst=dst_11(src,"detrending",f"method={p.method}")x,y=src.get_data()dst.set_xydata(x,sps.detrend(y,type=p.method))restore_data_outside_roi(dst,src)returndst
[docs]defcompute_convolution(src1:SignalObj,src2:SignalObj)->SignalObj:"""Compute convolution of two signals with :py:func:`scipy.signal.convolve` Args: src1: source signal 1 src2: source signal 2 Returns: Result signal object """dst=dst_n1n(src1,src2,"⊛")x1,y1=src1.get_data()_x2,y2=src2.get_data()ynew=np.real(sps.convolve(y1,y2,mode="same"))dst.set_xydata(x1,ynew)restore_data_outside_roi(dst,src1)returndst
[docs]classWindowingParam(gds.DataSet):"""Windowing parameters"""methods=(("barthann","Barthann"),("bartlett","Bartlett"),("blackman","Blackman"),("blackman-harris","Blackman-Harris"),("bohman","Bohman"),("boxcar","Boxcar"),("cosine",_("Cosine")),("exponential",_("Exponential")),("flat-top",_("Flat top")),("gaussian",_("Gaussian")),("hamming","Hamming"),("hanning","Hanning"),("kaiser","Kaiser"),("lanczos","Lanczos"),("nuttall","Nuttall"),("parzen","Parzen"),("rectangular",_("Rectangular")),("taylor","Taylor"),("tukey","Tukey"),)_meth_prop=gds.GetAttrProp("method")method=gds.ChoiceItem(_("Method"),methods,default="hamming").set_prop("display",store=_meth_prop)alpha=gds.FloatItem("Alpha",default=0.5,help=_("Shape parameter of the Tukey windowing function"),).set_prop("display",active=gds.FuncProp(_meth_prop,lambdax:x=="tukey"))beta=gds.FloatItem("Beta",default=14.0,help=_("Shape parameter of the Kaiser windowing function"),).set_prop("display",active=gds.FuncProp(_meth_prop,lambdax:x=="kaiser"))sigma=gds.FloatItem("Sigma",default=0.5,help=_("Shape parameter of the Gaussian windowing function"),).set_prop("display",active=gds.FuncProp(_meth_prop,lambdax:x=="gaussian"))
[docs]defcompute_windowing(src:SignalObj,p:WindowingParam)->SignalObj:"""Compute windowing (available methods: hamming, hanning, bartlett, blackman, tukey, rectangular) with :py:func:`cdl.algorithms.signal.windowing` Args: dst: destination signal src: source signal Returns: Result signal object """suffix=f"method={p.method}"ifp.method=="tukey":suffix+=f", alpha={p.alpha:.3f}"elifp.method=="kaiser":suffix+=f", beta={p.beta:.3f}"elifp.method=="gaussian":suffix+=f", sigma={p.sigma:.3f}"dst=dst_11(src,"windowing",suffix)# type: ignoredst.y=alg.windowing(dst.y,p.method,p.alpha)# type: ignorerestore_data_outside_roi(dst,src)returndst
[docs]defcompute_reverse_x(src:SignalObj)->SignalObj:"""Reverse x-axis Args: src: source signal Returns: Result signal object """dst=dst_11(src,"reverse_x")dst.y=dst.y[::-1]returndst
# MARK: compute_10 functions -----------------------------------------------------------# Functions with 1 input signal and 0 output signals (ResultShape or ResultProperties)# --------------------------------------------------------------------------------------
[docs]defcalc_resultshape(title:str,shape:Literal["rectangle","circle","ellipse","segment","marker","point","polygon"],obj:SignalObj,func:Callable,*args:Any,add_label:bool=False,)->ResultShape|None:"""Calculate result shape by executing a computation function on a signal object, taking into account the signal ROIs. Args: title: result title shape: result shape kind obj: input image object func: computation function *args: computation function arguments add_label: if True, add a label item (and the geometrical shape) to plot (default to False) Returns: Result shape object or None if no result is found .. warning:: The computation function must take either a single argument (the data) or multiple arguments (the data followed by the computation parameters). Moreover, the computation function must return a 1D NumPy array (or a list, or a tuple) containing the result of the computation. """res=[]fori_roiinobj.iterate_roi_indices():data_roi=obj.get_data(i_roi)ifargsisNone:results:np.ndarray=func(data_roi)else:results:np.ndarray=func(data_roi,*args)ifnotisinstance(results,(np.ndarray,list,tuple)):raiseValueError("The computation function must return a NumPy array, a list or a tuple")results=np.array(results)ifresults.size:ifresults.ndim!=1:raiseValueError("The computation function must return a 1D NumPy array")results=np.array([0ifi_roiisNoneelsei_roi]+results.tolist())res.append(results)ifres:returnResultShape(title,np.vstack(res),shape,add_label=add_label)returnNone
[docs]classFWHMParam(gds.DataSet):"""FWHM parameters"""methods=(("zero-crossing",_("Zero-crossing")),("gauss",_("Gaussian fit")),("lorentz",_("Lorentzian fit")),("voigt",_("Voigt fit")),)method=gds.ChoiceItem(_("Method"),methods,default="zero-crossing")xmin=gds.FloatItem("X<sub>MIN</sub>",default=None,check=False,help=_("Lower X boundary (empty for no limit, i.e. start of the signal)"),)xmax=gds.FloatItem("X<sub>MAX</sub>",default=None,check=False,help=_("Upper X boundary (empty for no limit, i.e. end of the signal)"),)
[docs]defcompute_fwhm(obj:SignalObj,param:FWHMParam)->ResultShape|None:"""Compute FWHM with :py:func:`cdl.algorithms.signal.fwhm` Args: obj: source signal param: parameters Returns: Segment coordinates """returncalc_resultshape("fwhm","segment",obj,alg.fwhm,param.method,param.xmin,param.xmax,add_label=True,)
[docs]defcompute_fw1e2(obj:SignalObj)->ResultShape|None:"""Compute FW at 1/e² with :py:func:`cdl.algorithms.signal.fw1e2` Args: obj: source signal Returns: Segment coordinates """returncalc_resultshape("fw1e2","segment",obj,alg.fw1e2,add_label=True)
[docs]defcompute_stats(obj:SignalObj)->ResultProperties:"""Compute statistics on a signal Args: obj: source signal Returns: Result properties object """statfuncs={"min(y) = %g {.yunit}":lambdaxy:xy[1].min(),"max(y) = %g {.yunit}":lambdaxy:xy[1].max(),"<y> = %g {.yunit}":lambdaxy:xy[1].mean(),"median(y) = %g {.yunit}":lambdaxy:np.median(xy[1]),"σ(y) = %g {.yunit}":lambdaxy:xy[1].std(),"<y>/σ(y)":lambdaxy:xy[1].mean()/xy[1].std(),"peak-to-peak(y) = %g {.yunit}":lambdaxy:np.ptp(xy[1]),"Σ(y) = %g {.yunit}":lambdaxy:xy[1].sum(),"∫ydx":lambdaxy:spt.trapezoid(xy[1],xy[0]),}returncalc_resultproperties("stats",obj,statfuncs)
[docs]defcompute_bandwidth_3db(obj:SignalObj)->ResultProperties:"""Compute bandwidth at -3 dB with :py:func:`cdl.algorithms.signal.bandwidth` Args: obj: source signal Returns: Result properties with bandwidth """returncalc_resultshape("bandwidth","segment",obj,alg.bandwidth,3.0,add_label=True)
[docs]classDynamicParam(gds.DataSet):"""Parameters for dynamic range computation (ENOB, SNR, SINAD, THD, SFDR)"""full_scale=gds.FloatItem(_("Full scale"),default=0.16,min=0.0,unit="V")_units=("dBc","dBFS")unit=gds.ChoiceItem(_("Unit"),zip(_units,_units),default="dBc",help=_("Unit for SINAD"))nb_harm=gds.IntItem(_("Number of harmonics"),default=5,min=1,help=_("Number of harmonics to consider for THD"),)
[docs]defcompute_dynamic_parameters(src:SignalObj,p:DynamicParam)->ResultProperties:"""Compute Dynamic parameters using the following functions: - Freq: :py:func:`cdl.algorithms.signal.sinus_frequency` - ENOB: :py:func:`cdl.algorithms.signal.enob` - SNR: :py:func:`cdl.algorithms.signal.snr` - SINAD: :py:func:`cdl.algorithms.signal.sinad` - THD: :py:func:`cdl.algorithms.signal.thd` - SFDR: :py:func:`cdl.algorithms.signal.sfdr` Args: src: source signal p: parameters Returns: Result properties with ENOB, SNR, SINAD, THD, SFDR """dsfx=f" = %g {p.unit}"funcs={"Freq":lambdaxy:alg.sinus_frequency(xy[0],xy[1]),"ENOB = %.1f bits":lambdaxy:alg.enob(xy[0],xy[1],p.full_scale),"SNR"+dsfx:lambdaxy:alg.snr(xy[0],xy[1],p.unit),"SINAD"+dsfx:lambdaxy:alg.sinad(xy[0],xy[1],p.unit),"THD"+dsfx:lambdaxy:alg.thd(xy[0],xy[1],p.full_scale,p.unit,p.nb_harm),"SFDR"+dsfx:lambdaxy:alg.sfdr(xy[0],xy[1],p.full_scale,p.unit),}returncalc_resultproperties("ADC",src,funcs)
[docs]defcompute_sampling_rate_period(obj:SignalObj)->ResultProperties:"""Compute sampling rate and period using the following functions: - fs: :py:func:`cdl.algorithms.signal.sampling_rate` - T: :py:func:`cdl.algorithms.signal.sampling_period` Args: obj: source signal Returns: Result properties with sampling rate and period """returncalc_resultproperties("sampling_rate_period",obj,{"fs = %g":lambdaxy:alg.sampling_rate(xy[0]),"T = %g {.xunit}":lambdaxy:alg.sampling_period(xy[0]),},)
[docs]defcompute_contrast(obj:SignalObj)->ResultProperties:"""Compute contrast with :py:func:`cdl.algorithms.signal.contrast`"""returncalc_resultproperties("contrast",obj,{"contrast":lambdaxy:alg.contrast(xy[1]),},)
[docs]defcompute_x_at_minmax(obj:SignalObj)->ResultProperties:"""Compute x at min/max"""returncalc_resultproperties("x@min,max",obj,{"X@Ymin = %g {.xunit}":lambdaxy:xy[0][np.argmin(xy[1])],"X@Ymax = %g {.xunit}":lambdaxy:xy[0][np.argmax(xy[1])],},)