Source code for cdl.core.gui.processor.base

# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.

"""
.. Base processor object (see parent package :mod:`cdl.core.gui.processor`)
"""

# pylint: disable=invalid-name  # Allows short reference names like x, y, ...

from __future__ import annotations

import abc
import multiprocessing
import time
import warnings
from collections.abc import Callable
from multiprocessing.pool import Pool
from typing import TYPE_CHECKING, Any, Generic, Union

import guidata.dataset as gds
import numpy as np
from guidata.dataset import update_dataset
from guidata.qthelpers import exec_dialog
from guidata.widgets.arrayeditor import ArrayEditor
from qtpy import QtCore as QC
from qtpy import QtWidgets as QW

from cdl import env
from cdl.algorithms.datatypes import is_complex_dtype
from cdl.config import Conf, _
from cdl.core.gui.processor.catcher import CompOut, wng_err_func
from cdl.core.model.base import ResultProperties, ResultShape, TypeROI
from cdl.utils.qthelpers import create_progress_bar, qt_try_except
from cdl.widgets.warningerror import show_warning_error

if TYPE_CHECKING:
    from multiprocessing.pool import AsyncResult

    from plotpy.plot import PlotWidget

    from cdl.computation.base import (
        ArithmeticParam,
        ClipParam,
        ConstantParam,
        GaussianParam,
        MovingAverageParam,
        MovingMedianParam,
        NormalizeParam,
    )
    from cdl.core.gui.panel.image import ImagePanel
    from cdl.core.gui.panel.signal import SignalPanel
    from cdl.core.model.image import ImageObj
    from cdl.core.model.signal import SignalObj

    Obj = Union[SignalObj, ImageObj]


# Enable multiprocessing support for Windows, with frozen executable (e.g. PyInstaller)
multiprocessing.freeze_support()

# Set start method to 'spawn' for Linux (default is 'fork' which is not safe here
# because of the use of Qt and multithreading) - for other OS, the default is
# 'spawn' anyway
try:
    multiprocessing.set_start_method("spawn")
except RuntimeError:
    # This exception is raised if the method is already set (this may happen because
    # this module is imported more than once, e.g. when running tests)
    pass


COMPUTATION_TIP = _(
    "DataLab relies on various libraries to perform the computation. During the "
    "computation, errors may occur because of the data (e.g. division by zero, "
    "unexpected data type, etc.) or because of the libraries (e.g. memory error, "
    "etc.). If you encounter an error, before reporting it, please ensure that "
    "the computation is correct, by checking the data and the parameters."
)


POOL: Pool | None = None


[docs] class Worker: """Multiprocessing worker, to run long-running tasks in a separate process""" def __init__(self) -> None: self.asyncresult: AsyncResult = None self.result: Any = None
[docs] @staticmethod def create_pool() -> None: """Create multiprocessing pool""" global POOL # pylint: disable=global-statement # Create a pool with one process POOL = Pool(processes=1) # pylint: disable=not-callable,consider-using-with
[docs] @staticmethod def terminate_pool(wait: bool = False) -> None: """Terminate multiprocessing pool. Args: wait: wait for all tasks to finish. Defaults to False. """ global POOL # pylint: disable=global-statement if POOL is not None: if wait: # Close the pool properly (wait for all tasks to finish) POOL.close() else: # Terminate the pool and stop the timer POOL.terminate() POOL.join() POOL = None
[docs] def restart_pool(self) -> None: """Terminate and recreate the pool""" # Terminate the process and stop the timer self.terminate_pool(wait=False) # Recreate the pool for the next computation self.create_pool()
[docs] def run(self, func: Callable, args: tuple[Any]) -> None: """Run computation. Args: func: function to run args: arguments """ global POOL # pylint: disable=global-statement,global-variable-not-assigned assert POOL is not None self.asyncresult = POOL.apply_async(wng_err_func, (func, args))
[docs] def close(self) -> None: """Close worker: close pool properly and wait for all tasks to finish""" # Close multiprocessing Pool properly, but only if no computation is running, # to avoid blocking the GUI at exit (so, when wait=True, we wait for the # task to finish before closing the pool but there is actually no task running, # so the pool is closed immediately but *properly*) self.terminate_pool(wait=self.asyncresult is None)
[docs] def is_computation_finished(self) -> bool: """Return True if computation is finished. Returns: bool: True if computation is finished """ return self.asyncresult.ready()
[docs] def get_result(self) -> CompOut: """Return computation result. Returns: CompOut: computation result """ self.result = self.asyncresult.get() self.asyncresult = None return self.result
[docs] def is_pairwise_mode() -> bool: """Return True if operation mode is pairwise. Returns: bool: True if operation mode is pairwise """ state = Conf.proc.operation_mode.get() == "pairwise" return state
[docs] class BaseProcessor(QC.QObject, Generic[TypeROI]): """Object handling data processing: operations, processing, analysis. Args: panel: panel plotwidget: plot widget """ SIG_ADD_SHAPE = QC.Signal(str) PARAM_DEFAULTS: dict[str, gds.DataSet] = {} def __init__(self, panel: SignalPanel | ImagePanel, plotwidget: PlotWidget): super().__init__() self.panel = panel self.plotwidget = plotwidget self.worker: Worker | None = None self.set_process_isolation_enabled(Conf.main.process_isolation_enabled.get())
[docs] def close(self): """Close processor properly""" if self.worker is not None: self.worker.close() self.worker = None
[docs] def set_process_isolation_enabled(self, enabled: bool) -> None: """Set process isolation enabled. Args: enabled: enabled """ if enabled: if self.worker is None: self.worker = Worker() self.worker.create_pool() else: if self.worker is not None: self.worker.terminate_pool() self.worker = None
[docs] def has_param_defaults(self, paramclass: type[gds.DataSet]) -> bool: """Return True if parameter defaults are available. Args: paramclass: parameter class Returns: bool: True if parameter defaults are available """ return paramclass.__name__ in self.PARAM_DEFAULTS
[docs] def update_param_defaults(self, param: gds.DataSet) -> None: """Update parameter defaults. Args: param: parameters """ key = param.__class__.__name__ pdefaults = self.PARAM_DEFAULTS.get(key) if pdefaults is not None: update_dataset(param, pdefaults) self.PARAM_DEFAULTS[key] = param
[docs] def init_param( self, param: gds.DataSet, paramclass: type[gds.DataSet], title: str, comment: str | None = None, ) -> tuple[bool, gds.DataSet]: """Initialize processing parameters. Args: param: parameter paramclass: parameter class title: title comment: comment Returns: Tuple (edit, param) where edit is True if parameters have been edited, False otherwise. """ edit = param is None if edit: param = paramclass(title, comment) self.update_param_defaults(param) return edit, param
[docs] def compute_11( self, func: Callable, param: gds.DataSet | None = None, paramclass: gds.DataSet | None = None, title: str | None = None, comment: str | None = None, edit: bool | None = None, ) -> None: """Compute 11 function: 1 object in → 1 object out. Args: func: function param: parameter paramclass: parameter class title: title comment: comment edit: edit parameters """ if (edit is None or param is None) and paramclass is not None: edit, param = self.init_param(param, paramclass, title, comment) if param is not None: if edit and not param.edit(parent=self.panel.parent()): return self._compute_11_subroutine([func], [param], title)
[docs] def compute_1n( self, funcs: list[Callable] | Callable, params: list | None = None, title: str | None = None, edit: bool | None = None, ) -> None: """Compute 1n function: 1 object in → n objects out. Args: funcs: list of functions params: list of parameters title: title edit: edit parameters """ if params is None: assert not isinstance(funcs, Callable) params = [None] * len(funcs) else: group = gds.DataSetGroup(params, title=_("Parameters")) if edit and not group.edit(parent=self.panel.parent()): return if isinstance(funcs, Callable): funcs = [funcs] * len(params) else: assert len(funcs) == len(params) self._compute_11_subroutine(funcs, params, title)
[docs] def handle_output( self, compout: CompOut, context: str, progress: QW.QProgressDialog ) -> SignalObj | ImageObj | ResultShape | ResultProperties | None: """Handle computation output: if error, display error message, if warning, display warning message. Args: compout: computation output context: context (e.g. "Computing: Gaussian filter") progress: progress dialog Returns: Output object: a signal or image object, or a result shape object, or None if error """ if compout.error_msg or compout.warning_msg: mindur = progress.minimumDuration() progress.setMinimumDuration(1000000) if compout.error_msg: show_warning_error( self.panel, "error", context, compout.error_msg, COMPUTATION_TIP ) if compout.warning_msg: show_warning_error(self.panel, "warning", context, compout.warning_msg) progress.setMinimumDuration(mindur) if compout.error_msg: return None return compout.result
def __exec_func( self, func: Callable, args: tuple, progress: QW.QProgressDialog, ) -> CompOut | None: """Execute function, eventually in a separate process. Args: func: function to execute args: function arguments progress: progress dialog Returns: Computation output object or None if canceled """ QW.QApplication.processEvents() if not progress.wasCanceled(): if self.worker is None: return wng_err_func(func, args) self.worker.run(func, args) while not self.worker.is_computation_finished(): QW.QApplication.processEvents() time.sleep(0.1) if progress.wasCanceled(): self.worker.restart_pool() break if self.worker.is_computation_finished(): return self.worker.get_result() return None def _compute_11_subroutine( self, funcs: list[Callable], params: list, title: str ) -> None: """Compute 11 subroutine: used by compute 11 and compute 1n methods. Args: funcs: list of functions to execute params: list of parameters title: title of progress bar """ assert len(funcs) == len(params) objs = self.panel.objview.get_sel_objects(include_groups=True) grps = self.panel.objview.get_sel_groups() new_gids = {} with create_progress_bar( self.panel, title, max_=len(objs) * len(params) ) as progress: for i_row, obj in enumerate(objs): for i_param, (param, func) in enumerate(zip(params, funcs)): name = func.__name__.replace("compute_", "") i_title = f"{title} ({i_row + 1}/{len(objs)})" progress.setLabelText(i_title) pvalue = (i_row + 1) * (i_param + 1) pvalue = 0 if pvalue == 1 else pvalue progress.setValue(pvalue) args = (obj,) if param is None else (obj, param) result = self.__exec_func(func, args, progress) if result is None: break new_obj = self.handle_output( result, _("Computing: %s") % i_title, progress ) if new_obj is None: continue # Is new object a native object (i.e. a Signal object for a Signal # Panel, or an Image object for an Image Panel) ? # (example of non-native object use case: image profile extraction) is_new_obj_native = isinstance(new_obj, self.panel.PARAMCLASS) new_gid = None if grps and is_new_obj_native: # If groups are selected, then it means that there is no # individual object selected: we work on groups only old_gid = self.panel.objmodel.get_object_group_id(obj) new_gid = new_gids.get(old_gid) if new_gid is None: # Create a new group for each selected group old_g = self.panel.objmodel.get_group(old_gid) new_g = self.panel.add_group(f"{name}({old_g.short_id})") new_gids[old_gid] = new_gid = new_g.uuid if is_new_obj_native: self.panel.add_object(new_obj, group_id=new_gid) else: self.panel.mainwindow.add_object(new_obj) # Select newly created groups, if any for group_id in new_gids.values(): self.panel.objview.set_current_item_id(group_id, extend=True)
[docs] def compute_10( self, func: Callable, param: gds.DataSet | None = None, paramclass: gds.DataSet | None = None, title: str | None = None, comment: str | None = None, edit: bool | None = None, ) -> dict[str, ResultShape | ResultProperties]: """Compute 10 function: 1 object in → 0 object out (the result of this method is stored in original object's metadata). Args: func: function to execute param: parameters. Defaults to None. paramclass: parameters class. Defaults to None. title: title of progress bar. Defaults to None. comment: comment. Defaults to None. edit: if True, edit parameters. Defaults to None. Returns: Dictionary of results (keys: object uuid, values: ResultShape or ResultProperties objects) """ if (edit is None or param is None) and paramclass is not None: edit, param = self.init_param(param, paramclass, title, comment) if param is not None: if edit and not param.edit(parent=self.panel.parent()): return None objs = self.panel.objview.get_sel_objects(include_groups=True) current_obj = self.panel.objview.get_current_object() title = func.__name__.replace("compute_", "") if title is None else title with create_progress_bar(self.panel, title, max_=len(objs)) as progress: results: dict[str, ResultShape | ResultProperties] = {} xlabels = None ylabels = [] for idx, obj in enumerate(objs): pvalue = idx + 1 pvalue = 0 if pvalue == 1 else pvalue progress.setValue(pvalue) args = (obj,) if param is None else (obj, param) # Execute function compout = self.__exec_func(func, args, progress) if compout is None: break result = self.handle_output( compout, _("Computing: %s") % title, progress ) if result is None: continue # Add result shape to object's metadata result.add_to(obj) if param is not None: obj.metadata[f"{result.title}Param"] = str(param) results[obj.uuid] = result xlabels = result.headers if obj is current_obj: self.panel.selection_changed(update_items=True) else: self.panel.SIG_REFRESH_PLOT.emit(obj.uuid, True) for i_row_res in range(result.array.shape[0]): ylabel = f"{result.title}({obj.short_id})" i_roi = int(result.array[i_row_res, 0]) if i_roi >= 0: ylabel += f"|ROI{i_roi}" ylabels.append(ylabel) if results: with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) dlg = ArrayEditor(self.panel.parent()) title = _("Results") res = np.vstack([result.shown_array for result in results.values()]) dlg.setup_and_check( res, title, readonly=True, xlabels=xlabels, ylabels=ylabels ) dlg.setObjectName(f"{objs[0].PREFIX}_results") dlg.resize(750, 300) exec_dialog(dlg) return results
def __get_src_grps_gids_objs_nbobj_valid(self) -> tuple[list, list, dict, int]: """Get source groups, group ids, objects, and number of objects, for pairwise mode, and check if the number of objects is valid. Returns: Tuple (source groups, group ids, objects, number of objects, valid) """ # In pairwise mode, we need to create a new object for each pair of objects objs = self.panel.objview.get_sel_objects(include_groups=True) objmodel = self.panel.objmodel src_grps = sorted( {objmodel.get_group_from_object(obj) for obj in objs}, key=lambda x: x.number, ) src_gids = [grp.uuid for grp in src_grps] # [src_objs dictionary] keys: old group id, values: list of old objects src_objs: dict[str, list[Obj]] = {} for src_gid in src_gids: src_objs[src_gid] = [ obj for obj in objs if objmodel.get_object_group_id(obj) == src_gid ] nbobj = len(src_objs[src_gids[0]]) valid = len(src_grps) > 1 if not valid: # In pairwise mode, we need selected objects in at least two groups. if env.execenv.unattended: raise ValueError( "Pairwise mode: objects must be selected in at least two groups" ) QW.QMessageBox.warning( self.panel.parent(), _("Warning"), _( "In pairwise mode, you need to select objects " "in at least two groups." ), ) if valid: valid = all(len(src_objs[src_gid]) == nbobj for src_gid in src_gids) if not valid: if env.execenv.unattended: raise ValueError( "Pairwise mode: invalid number of objects in each group" ) QW.QMessageBox.warning( self.panel.parent(), _("Warning"), _( "In pairwise mode, you need to select " "the same number of objects in each group." ), ) return src_grps, src_gids, src_objs, nbobj, valid
[docs] def compute_n1( self, name: str, func: Callable, param: gds.DataSet | None = None, paramclass: gds.DataSet | None = None, title: str | None = None, comment: str | None = None, func_objs: Callable | None = None, edit: bool | None = None, ) -> None: """Compute n1 function: N(>=2) objects in → 1 object out. Args: name: name of function func: function to execute param: parameters. Defaults to None. paramclass: parameters class. Defaults to None. title: title of progress bar. Defaults to None. comment: comment. Defaults to None. func_objs: function to execute on objects. Defaults to None. edit: if True, edit parameters. Defaults to None. """ if (edit is None or param is None) and paramclass is not None: edit, param = self.init_param(param, paramclass, title, comment) if param is not None: if edit and not param.edit(parent=self.panel.parent()): return objs = self.panel.objview.get_sel_objects(include_groups=True) objmodel = self.panel.objmodel pairwise = is_pairwise_mode() if pairwise: src_grps, src_gids, src_objs, _nbobj, valid = ( self.__get_src_grps_gids_objs_nbobj_valid() ) if not valid: return dst_gname = ( f"{name}({','.join([grp.short_id for grp in src_grps])})|pairwise" ) group_exclusive = len(self.panel.objview.get_sel_groups()) != 0 if not group_exclusive: # This is not a group exclusive selection dst_gname += "[...]" dst_gid = self.panel.add_group(dst_gname).uuid n_pairs = len(src_objs[src_gids[0]]) max_i_pair = min(n_pairs, max(len(src_objs[grp.uuid]) for grp in src_grps)) with create_progress_bar(self.panel, title, max_=n_pairs) as progress: for i_pair, src_obj1 in enumerate(src_objs[src_gids[0]][:max_i_pair]): src_obj1: SignalObj | ImageObj progress.setValue(i_pair + 1) progress.setLabelText(title) src_dtype = src_obj1.data.dtype dst_dtype = complex if is_complex_dtype(src_dtype) else float dst_obj = src_obj1.copy(dtype=dst_dtype) src_objs_pair = [src_obj1] for src_gid in src_gids[1:]: src_obj = src_objs[src_gid][i_pair] src_objs_pair.append(src_obj) if param is None: args = (dst_obj, src_obj) else: args = (dst_obj, src_obj, param) result = self.__exec_func(func, args, progress) if result is None: break dst_obj = self.handle_output( result, _("Calculating: %s") % title, progress ) if dst_obj is None: break dst_obj.update_resultshapes_from(src_obj) if src_obj.roi is not None: if dst_obj.roi is None: dst_obj.roi = src_obj.roi.copy() else: dst_obj.roi.add_roi(src_obj.roi) if func_objs is not None: func_objs(dst_obj, src_objs_pair) short_ids = [obj.short_id for obj in src_objs_pair] dst_obj.title = f'{name}({", ".join(short_ids)})' self.panel.add_object(dst_obj, group_id=dst_gid) else: # In single operand mode, we create a single object for all selected objects # [new_objs dictionary] keys: old group id, values: new object dst_objs: dict[str, Obj] = {} # [src_dtypes dictionary] keys: old group id, values: old data type src_dtypes: dict[str, np.dtype] = {} # [src_objs dictionary] keys: old group id, values: list of old objects src_objs: dict[str, list[Obj]] = {} with create_progress_bar(self.panel, title, max_=len(objs)) as progress: for index, src_obj in enumerate(objs): progress.setValue(index + 1) progress.setLabelText(title) src_gid = objmodel.get_object_group_id(src_obj) dst_obj = dst_objs.get(src_gid) if dst_obj is None: src_dtypes[src_gid] = src_dtype = src_obj.data.dtype dst_dtype = complex if is_complex_dtype(src_dtype) else float dst_objs[src_gid] = dst_obj = src_obj.copy(dtype=dst_dtype) dst_obj.roi = None src_objs[src_gid] = [src_obj] else: src_objs[src_gid].append(src_obj) if param is None: args = (dst_obj, src_obj) else: args = (dst_obj, src_obj, param) result = self.__exec_func(func, args, progress) if result is None: break dst_obj = self.handle_output( result, _("Calculating: %s") % title, progress ) if dst_obj is None: break dst_objs[src_gid] = dst_obj dst_obj.update_resultshapes_from(src_obj) if src_obj.roi is not None: if dst_obj.roi is None: dst_obj.roi = src_obj.roi.copy() else: dst_obj.roi.add_roi(src_obj.roi) grps = self.panel.objview.get_sel_groups() if grps: # (Group exclusive selection) # At least one group is selected: create a new group dst_gname = f"{name}({','.join([grp.short_id for grp in grps])})" dst_gid = self.panel.add_group(dst_gname).uuid else: # (Object exclusive selection) # No group is selected: use each object's group dst_gid = None for src_gid, dst_obj in dst_objs.items(): if func_objs is not None: func_objs(dst_obj, src_objs[src_gid]) short_ids = [obj.short_id for obj in src_objs[src_gid]] dst_obj.title = f'{name}({", ".join(short_ids)})' group_id = dst_gid if dst_gid is not None else src_gid self.panel.add_object(dst_obj, group_id=group_id) # Select newly created group, if any if dst_gid is not None: self.panel.objview.set_current_item_id(dst_gid)
[docs] def compute_n1n( self, obj2: Obj | list[Obj] | None, obj2_name: str, func: Callable, param: gds.DataSet | None = None, paramclass: gds.DataSet | None = None, title: str | None = None, comment: str | None = None, edit: bool | None = None, ) -> None: """Compute n1n function: N(>=1) objects + 1 object in → N objects out. Examples: subtract, divide Args: obj2: second object (or list of objects in case of pairwise operation mode) obj2_name: name of second object func: function to execute param: parameters. Defaults to None. paramclass: parameters class. Defaults to None. title: title of progress bar. Defaults to None. comment: comment. Defaults to None. edit: if True, edit parameters. Defaults to None. """ if (edit is None or param is None) and paramclass is not None: edit, param = self.init_param(param, paramclass, title, comment) objs = self.panel.objview.get_sel_objects(include_groups=True) objmodel = self.panel.objmodel pairwise = is_pairwise_mode() if obj2 is None: objs2 = [] elif isinstance(obj2, list): objs2 = obj2 assert pairwise else: objs2 = [obj2] dlg_title = _("Select %s") % obj2_name if pairwise: group_exclusive = len(self.panel.objview.get_sel_groups()) != 0 src_grps, src_gids, src_objs, nbobj, valid = ( self.__get_src_grps_gids_objs_nbobj_valid() ) if not valid: return if not objs2: objs2 = self.panel.get_objects_with_dialog( dlg_title, _( "<u>Note:</u> operation mode is <i>pairwise</i>: " "%s object(s) expected (i.e. as many as in the first group)" ) % nbobj, nbobj, ) if objs2 is None: return name = func.__name__.replace("compute_", "") n_pairs = len(src_objs[src_gids[0]]) max_i_pair = min(n_pairs, max(len(src_objs[grp.uuid]) for grp in src_grps)) grp2_id = objmodel.get_object_group_id(objs2[0]) grp2 = objmodel.get_group(grp2_id) with create_progress_bar(self.panel, title, max_=len(src_gids)) as progress: for i_group, src_gid in enumerate(src_gids): progress.setValue(i_group + 1) progress.setLabelText(title) if group_exclusive: # This is a group exclusive selection src_grp = objmodel.get_group(src_gid) grp_short_ids = [grp.short_id for grp in (src_grp, grp2)] dst_gname = f"{name}({','.join(grp_short_ids)})|pairwise" else: dst_gname = f"{name}[...]" dst_gid = self.panel.add_group(dst_gname).uuid for i_pair in range(max_i_pair): args = [src_objs[src_gid][i_pair], objs2[i_pair]] if param is not None: args.append(param) result = self.__exec_func(func, tuple(args), progress) if result is None: break new_obj = self.handle_output( result, _("Calculating: %s") % title, progress ) if new_obj is None: continue self.panel.add_object(new_obj, group_id=dst_gid) else: if not objs2: objs2 = self.panel.get_objects_with_dialog( dlg_title, _( "<u>Note:</u> operation mode is <i>single operand</i>: " "1 object expected" ), ) if objs2 is None: return obj2 = objs2[0] with create_progress_bar(self.panel, title, max_=len(objs)) as progress: for index, obj in enumerate(objs): progress.setValue(index + 1) progress.setLabelText(title) args = (obj, obj2) if param is None else (obj, obj2, param) result = self.__exec_func(func, args, progress) if result is None: break new_obj = self.handle_output( result, _("Calculating: %s") % title, progress ) if new_obj is None: continue group_id = objmodel.get_object_group_id(obj) self.panel.add_object(new_obj, group_id=group_id)
# ------Data Operations-------------------------------------------------------------
[docs] @abc.abstractmethod @qt_try_except() def compute_arithmetic( self, obj2: Obj | None = None, param: ArithmeticParam | None = None ) -> None: """Compute arithmetic operation"""
[docs] @abc.abstractmethod @qt_try_except() def compute_sum(self) -> None: """Compute sum"""
[docs] @abc.abstractmethod @qt_try_except() def compute_normalize(self, param: NormalizeParam | None = None) -> None: """Normalize data"""
[docs] @abc.abstractmethod @qt_try_except() def compute_average(self) -> None: """Compute average"""
[docs] @abc.abstractmethod @qt_try_except() def compute_product(self) -> None: """Compute product"""
[docs] @abc.abstractmethod @qt_try_except() def compute_difference(self, obj2: Obj | list[Obj] | None = None) -> None: """Compute difference"""
[docs] @abc.abstractmethod @qt_try_except() def compute_quadratic_difference(self, obj2: Obj | list[Obj] | None = None) -> None: """Compute quadratic difference"""
[docs] @abc.abstractmethod @qt_try_except() def compute_division(self, obj2: Obj | list[Obj] | None = None) -> None: """Compute division"""
[docs] @abc.abstractmethod @qt_try_except() def compute_swap_axes(self) -> None: """Swap data axes"""
[docs] @abc.abstractmethod @qt_try_except() def compute_abs(self) -> None: """Compute absolute value"""
[docs] @abc.abstractmethod @qt_try_except() def compute_re(self) -> None: """Compute real part"""
[docs] @abc.abstractmethod @qt_try_except() def compute_im(self) -> None: """Compute imaginary part"""
[docs] @abc.abstractmethod @qt_try_except() def compute_astype(self) -> None: """Convert data type"""
[docs] @abc.abstractmethod @qt_try_except() def compute_log10(self) -> None: """Compute Log10"""
[docs] @abc.abstractmethod @qt_try_except() def compute_exp(self) -> None: """Compute exponential"""
# ------Data Processing-------------------------------------------------------------
[docs] @abc.abstractmethod @qt_try_except() def compute_calibration(self, param=None) -> None: """Compute data linear calibration"""
[docs] @abc.abstractmethod @qt_try_except() def compute_clip(self, param: ClipParam | None = None) -> None: """Compute maximum data clipping"""
[docs] @abc.abstractmethod @qt_try_except() def compute_gaussian_filter(self, param: GaussianParam | None = None) -> None: """Compute gaussian filter"""
[docs] @abc.abstractmethod @qt_try_except() def compute_moving_average(self, param: MovingAverageParam | None = None) -> None: """Compute moving average"""
[docs] @abc.abstractmethod @qt_try_except() def compute_moving_median(self, param: MovingMedianParam | None = None) -> None: """Compute moving median"""
[docs] @abc.abstractmethod @qt_try_except() def compute_wiener(self) -> None: """Compute Wiener filter"""
[docs] @abc.abstractmethod @qt_try_except() def compute_fft(self) -> None: """Compute iFFT"""
[docs] @abc.abstractmethod @qt_try_except() def compute_ifft(self) -> None: """Compute FFT"""
[docs] @abc.abstractmethod @qt_try_except() def compute_addition_constant(self, param: ConstantParam) -> None: """Compute sum with a constant"""
[docs] @abc.abstractmethod @qt_try_except() def compute_difference_constant(self, param: ConstantParam) -> None: """Compute difference with a constant"""
[docs] @abc.abstractmethod @qt_try_except() def compute_product_constant(self, param: ConstantParam) -> None: """Compute product with a constant"""
[docs] @abc.abstractmethod @qt_try_except() def compute_division_constant(self, param: ConstantParam) -> None: """Compute division by a constant"""
[docs] @qt_try_except() def compute_roi_extraction(self, roi: TypeROI | None = None) -> None: """Extract Region Of Interest (ROI) from data with: - :py:func:`cdl.computation.image.extract_single_roi` for single ROI - :py:func:`cdl.computation.image.extract_multiple_roi` for multiple ROIs""" # Expected behavior: # ----------------- # * If `roi` is not None or not empty, skip the ROI dialog # * If first selected obj has a ROI, use this ROI as default but open # ROI Editor dialog anyway # * If multiple objs are selected, then apply the first obj ROI to all if roi is None or roi.is_empty(): roi = self.edit_regions_of_interest(extract=True) if roi is None or roi.is_empty(): return obj = self.panel.objview.get_sel_objects(include_groups=True)[0] group = roi.to_params(obj) if roi.singleobj and len(group.datasets) > 1: # Extract multiple ROIs into a single object (remove all the ROIs), # if the "Extract all ROIs into a single image object" # option is checked and if there are more than one ROI self._extract_multiple_roi_in_single_object(group) else: # Extract each ROI into a separate object (keep the ROI in the case of # a circular ROI), if the "Extract all ROIs into a single image object" # option is not checked or if there is only one ROI (See Issue #31) self._extract_each_roi_in_separate_object(group)
@abc.abstractmethod @qt_try_except() def _extract_multiple_roi_in_single_object(self, group: gds.DataSetGroup) -> None: """Extract multiple Regions Of Interest (ROIs) from data in a single object""" @abc.abstractmethod @qt_try_except() def _extract_each_roi_in_separate_object(self, group: gds.DataSetGroup) -> None: """Extract each single Region Of Interest (ROI) from data in a separate object (keep the ROI in the case of a circular ROI, for example)""" # ------Analysis-------------------------------------------------------------------
[docs] def edit_regions_of_interest( self, extract: bool = False, ) -> TypeROI | None: """Define Region Of Interest (ROI). Args: extract: If True, ROI is extracted from data. Defaults to False. Returns: ROI object or None if ROI dialog has been canceled. """ # Expected behavior: # ----------------- # * If first selected obj has a ROI, use this ROI as default but open # ROI Editor dialog anyway # * If multiple objs are selected, then apply the first obj ROI to all results = self.panel.get_roi_editor_output(extract=extract) if results is None: return None edited_roi, modified = results obj = self.panel.objview.get_sel_objects(include_groups=True)[0] group = edited_roi.to_params(obj) if ( env.execenv.unattended # Unattended mode (automated unit tests) or edited_roi.is_empty() # No ROI has been defined or group.edit(parent=self.panel.parent()) # ROI dialog has been accepted ): if modified: # If ROI has been modified, save ROI (even in "extract mode") if edited_roi.is_empty(): obj.roi = None else: edited_roi = edited_roi.from_params(obj, group) obj.roi = edited_roi self.SIG_ADD_SHAPE.emit(obj.uuid) self.panel.selection_changed(update_items=True) return edited_roi
[docs] def delete_regions_of_interest(self) -> None: """Delete Regions Of Interest""" for obj in self.panel.objview.get_sel_objects(): if obj.roi is not None: obj.roi = None self.panel.selection_changed(update_items=True)
[docs] @abc.abstractmethod @qt_try_except() def compute_stats(self) -> dict[str, ResultShape]: """Compute data statistics"""