# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.
"""
.. Base processor object (see parent package :mod:`cdl.core.gui.processor`)
"""
# pylint: disable=invalid-name # Allows short reference names like x, y, ...
from __future__ import annotations
import abc
import multiprocessing
import time
import warnings
from collections.abc import Callable
from multiprocessing.pool import Pool
from typing import TYPE_CHECKING, Any, Union
import guidata.dataset as gds
import numpy as np
from guidata.dataset import update_dataset
from guidata.qthelpers import exec_dialog
from guidata.widgets.arrayeditor import ArrayEditor
from qtpy import QtCore as QC
from qtpy import QtWidgets as QW
from cdl import env
from cdl.algorithms.datatypes import is_complex_dtype
from cdl.computation.base import ROIDataParam
from cdl.config import Conf, _
from cdl.core.gui.processor.catcher import CompOut, wng_err_func
from cdl.core.model.base import ResultProperties, ResultShape
from cdl.utils.qthelpers import create_progress_bar, qt_try_except
from cdl.widgets.warningerror import show_warning_error
if TYPE_CHECKING:
from multiprocessing.pool import AsyncResult
from plotpy.plot import PlotWidget
from cdl.computation.base import (
ArithmeticParam,
ClipParam,
ConstantParam,
GaussianParam,
MovingAverageParam,
MovingMedianParam,
NormalizeParam,
)
from cdl.core.gui.panel.image import ImagePanel
from cdl.core.gui.panel.signal import SignalPanel
from cdl.core.model.image import ImageObj
from cdl.core.model.signal import SignalObj
Obj = Union[SignalObj, ImageObj]
# Enable multiprocessing support for Windows, with frozen executable (e.g. PyInstaller)
multiprocessing.freeze_support()
# Set start method to 'spawn' for Linux (default is 'fork' which is not safe here
# because of the use of Qt and multithreading) - for other OS, the default is
# 'spawn' anyway
try:
multiprocessing.set_start_method("spawn")
except RuntimeError:
# This exception is raised if the method is already set (this may happen because
# this module is imported more than once, e.g. when running tests)
pass
COMPUTATION_TIP = _(
"DataLab relies on various libraries to perform the computation. During the "
"computation, errors may occur because of the data (e.g. division by zero, "
"unexpected data type, etc.) or because of the libraries (e.g. memory error, "
"etc.). If you encounter an error, before reporting it, please ensure that "
"the computation is correct, by checking the data and the parameters."
)
POOL: Pool | None = None
[docs]
class Worker:
"""Multiprocessing worker, to run long-running tasks in a separate process"""
def __init__(self) -> None:
self.asyncresult: AsyncResult = None
self.result: Any = None
[docs]
@staticmethod
def create_pool() -> None:
"""Create multiprocessing pool"""
global POOL # pylint: disable=global-statement
# Create a pool with one process
POOL = Pool(processes=1) # pylint: disable=not-callable,consider-using-with
[docs]
@staticmethod
def terminate_pool(wait: bool = False) -> None:
"""Terminate multiprocessing pool.
Args:
wait: wait for all tasks to finish. Defaults to False.
"""
global POOL # pylint: disable=global-statement
if POOL is not None:
if wait:
# Close the pool properly (wait for all tasks to finish)
POOL.close()
else:
# Terminate the pool and stop the timer
POOL.terminate()
POOL.join()
POOL = None
[docs]
def restart_pool(self) -> None:
"""Terminate and recreate the pool"""
# Terminate the process and stop the timer
self.terminate_pool(wait=False)
# Recreate the pool for the next computation
self.create_pool()
[docs]
def run(self, func: Callable, args: tuple[Any]) -> None:
"""Run computation.
Args:
func: function to run
args: arguments
"""
global POOL # pylint: disable=global-statement,global-variable-not-assigned
assert POOL is not None
self.asyncresult = POOL.apply_async(wng_err_func, (func, args))
[docs]
def close(self) -> None:
"""Close worker: close pool properly and wait for all tasks to finish"""
# Close multiprocessing Pool properly, but only if no computation is running,
# to avoid blocking the GUI at exit (so, when wait=True, we wait for the
# task to finish before closing the pool but there is actually no task running,
# so the pool is closed immediately but *properly*)
self.terminate_pool(wait=self.asyncresult is None)
[docs]
def is_computation_finished(self) -> bool:
"""Return True if computation is finished.
Returns:
bool: True if computation is finished
"""
return self.asyncresult.ready()
[docs]
def get_result(self) -> CompOut:
"""Return computation result.
Returns:
CompOut: computation result
"""
self.result = self.asyncresult.get()
self.asyncresult = None
return self.result
[docs]
class BaseProcessor(QC.QObject):
"""Object handling data processing: operations, processing, analysis.
Args:
panel: panel
plotwidget: plot widget
"""
SIG_ADD_SHAPE = QC.Signal(str)
PARAM_DEFAULTS: dict[str, gds.DataSet] = {}
def __init__(self, panel: SignalPanel | ImagePanel, plotwidget: PlotWidget):
super().__init__()
self.panel = panel
self.plotwidget = plotwidget
self.worker: Worker | None = None
self.set_process_isolation_enabled(Conf.main.process_isolation_enabled.get())
[docs]
def close(self):
"""Close processor properly"""
if self.worker is not None:
self.worker.close()
self.worker = None
[docs]
def set_process_isolation_enabled(self, enabled: bool) -> None:
"""Set process isolation enabled.
Args:
enabled: enabled
"""
if enabled:
if self.worker is None:
self.worker = Worker()
self.worker.create_pool()
else:
if self.worker is not None:
self.worker.terminate_pool()
self.worker = None
[docs]
def has_param_defaults(self, paramclass: type[gds.DataSet]) -> bool:
"""Return True if parameter defaults are available.
Args:
paramclass: parameter class
Returns:
bool: True if parameter defaults are available
"""
return paramclass.__name__ in self.PARAM_DEFAULTS
[docs]
def update_param_defaults(self, param: gds.DataSet) -> None:
"""Update parameter defaults.
Args:
param: parameters
"""
key = param.__class__.__name__
pdefaults = self.PARAM_DEFAULTS.get(key)
if pdefaults is not None:
update_dataset(param, pdefaults)
self.PARAM_DEFAULTS[key] = param
[docs]
def init_param(
self,
param: gds.DataSet,
paramclass: type[gds.DataSet],
title: str,
comment: str | None = None,
) -> tuple[bool, gds.DataSet]:
"""Initialize processing parameters.
Args:
param: parameter
paramclass: parameter class
title: title
comment: comment
Returns:
Tuple (edit, param) where edit is True if parameters have been edited,
False otherwise.
"""
edit = param is None
if edit:
param = paramclass(title, comment)
self.update_param_defaults(param)
return edit, param
[docs]
def compute_11(
self,
func: Callable,
param: gds.DataSet | None = None,
paramclass: gds.DataSet | None = None,
title: str | None = None,
comment: str | None = None,
edit: bool | None = None,
) -> None:
"""Compute 11 function: 1 object in → 1 object out.
Args:
func: function
param: parameter
paramclass: parameter class
title: title
comment: comment
edit: edit parameters
"""
if (edit is None or param is None) and paramclass is not None:
edit, param = self.init_param(param, paramclass, title, comment)
if param is not None:
if edit and not param.edit(parent=self.panel.parent()):
return
self._compute_11_subroutine([func], [param], title)
[docs]
def compute_1n(
self,
funcs: list[Callable] | Callable,
params: list | None = None,
title: str | None = None,
edit: bool | None = None,
) -> None:
"""Compute 1n function: 1 object in → n objects out.
Args:
funcs: list of functions
params: list of parameters
title: title
edit: edit parameters
"""
if params is None:
assert not isinstance(funcs, Callable)
params = [None] * len(funcs)
else:
group = gds.DataSetGroup(params, title=_("Parameters"))
if edit and not group.edit(parent=self.panel.parent()):
return
if isinstance(funcs, Callable):
funcs = [funcs] * len(params)
else:
assert len(funcs) == len(params)
self._compute_11_subroutine(funcs, params, title)
[docs]
def handle_output(
self, compout: CompOut, context: str, progress: QW.QProgressDialog
) -> SignalObj | ImageObj | ResultShape | ResultProperties | None:
"""Handle computation output: if error, display error message,
if warning, display warning message.
Args:
compout: computation output
context: context (e.g. "Computing: Gaussian filter")
progress: progress dialog
Returns:
Output object: a signal or image object, or a result shape object,
or None if error
"""
if compout.error_msg or compout.warning_msg:
mindur = progress.minimumDuration()
progress.setMinimumDuration(1000000)
if compout.error_msg:
show_warning_error(
self.panel, "error", context, compout.error_msg, COMPUTATION_TIP
)
if compout.warning_msg:
show_warning_error(self.panel, "warning", context, compout.warning_msg)
progress.setMinimumDuration(mindur)
if compout.error_msg:
return None
return compout.result
def __exec_func(
self,
func: Callable,
args: tuple,
progress: QW.QProgressDialog,
) -> CompOut | None:
"""Execute function, eventually in a separate process.
Args:
func: function to execute
args: function arguments
progress: progress dialog
Returns:
Computation output object or None if canceled
"""
QW.QApplication.processEvents()
if not progress.wasCanceled():
if self.worker is None:
return wng_err_func(func, args)
self.worker.run(func, args)
while not self.worker.is_computation_finished():
QW.QApplication.processEvents()
time.sleep(0.1)
if progress.wasCanceled():
self.worker.restart_pool()
break
if self.worker.is_computation_finished():
return self.worker.get_result()
return None
def _compute_11_subroutine(
self, funcs: list[Callable], params: list, title: str
) -> None:
"""Compute 11 subroutine: used by compute 11 and compute 1n methods.
Args:
funcs: list of functions to execute
params: list of parameters
title: title of progress bar
"""
assert len(funcs) == len(params)
objs = self.panel.objview.get_sel_objects(include_groups=True)
grps = self.panel.objview.get_sel_groups()
new_gids = {}
with create_progress_bar(
self.panel, title, max_=len(objs) * len(params)
) as progress:
for i_row, obj in enumerate(objs):
for i_param, (param, func) in enumerate(zip(params, funcs)):
name = func.__name__.replace("compute_", "")
i_title = f"{title} ({i_row + 1}/{len(objs)})"
progress.setLabelText(i_title)
pvalue = (i_row + 1) * (i_param + 1)
pvalue = 0 if pvalue == 1 else pvalue
progress.setValue(pvalue)
args = (obj,) if param is None else (obj, param)
result = self.__exec_func(func, args, progress)
if result is None:
break
new_obj = self.handle_output(
result, _("Computing: %s") % i_title, progress
)
if new_obj is None:
continue
# Is new object a native object (i.e. a Signal object for a Signal
# Panel, or an Image object for an Image Panel) ?
# (example of non-native object use case: image profile extraction)
is_new_obj_native = isinstance(new_obj, self.panel.PARAMCLASS)
new_gid = None
if grps and is_new_obj_native:
# If groups are selected, then it means that there is no
# individual object selected: we work on groups only
old_gid = self.panel.objmodel.get_object_group_id(obj)
new_gid = new_gids.get(old_gid)
if new_gid is None:
# Create a new group for each selected group
old_g = self.panel.objmodel.get_group(old_gid)
new_g = self.panel.add_group(f"{name}({old_g.short_id})")
new_gids[old_gid] = new_gid = new_g.uuid
if is_new_obj_native:
self.panel.add_object(new_obj, group_id=new_gid)
else:
self.panel.mainwindow.add_object(new_obj)
# Select newly created groups, if any
for group_id in new_gids.values():
self.panel.objview.set_current_item_id(group_id, extend=True)
[docs]
def compute_10(
self,
func: Callable,
param: gds.DataSet | None = None,
paramclass: gds.DataSet | None = None,
title: str | None = None,
comment: str | None = None,
edit: bool | None = None,
) -> dict[str, ResultShape | ResultProperties]:
"""Compute 10 function: 1 object in → 0 object out
(the result of this method is stored in original object's metadata).
Args:
func: function to execute
param: parameters. Defaults to None.
paramclass: parameters class. Defaults to None.
title: title of progress bar. Defaults to None.
comment: comment. Defaults to None.
edit: if True, edit parameters. Defaults to None.
Returns:
Dictionary of results (keys: object uuid, values: ResultShape or
ResultProperties objects)
"""
if (edit is None or param is None) and paramclass is not None:
edit, param = self.init_param(param, paramclass, title, comment)
if param is not None:
if edit and not param.edit(parent=self.panel.parent()):
return None
objs = self.panel.objview.get_sel_objects(include_groups=True)
current_obj = self.panel.objview.get_current_object()
title = func.__name__.replace("compute_", "") if title is None else title
with create_progress_bar(self.panel, title, max_=len(objs)) as progress:
results: dict[str, ResultShape | ResultProperties] = {}
xlabels = None
ylabels = []
for idx, obj in enumerate(objs):
pvalue = idx + 1
pvalue = 0 if pvalue == 1 else pvalue
progress.setValue(pvalue)
args = (obj,) if param is None else (obj, param)
# Execute function
compout = self.__exec_func(func, args, progress)
if compout is None:
break
result = self.handle_output(
compout, _("Computing: %s") % title, progress
)
if result is None:
continue
# Add result shape to object's metadata
result.add_to(obj)
if param is not None:
obj.metadata[f"{result.title}Param"] = str(param)
results[obj.uuid] = result
xlabels = result.headers
if obj is current_obj:
self.panel.selection_changed(update_items=True)
else:
self.panel.SIG_REFRESH_PLOT.emit(obj.uuid, True)
for i_row_res in range(result.array.shape[0]):
ylabel = f"{result.title}({obj.short_id})"
i_roi = int(result.array[i_row_res, 0])
if i_roi >= 0:
ylabel += f"|ROI{i_roi}"
ylabels.append(ylabel)
if results:
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
dlg = ArrayEditor(self.panel.parent())
title = _("Results")
res = np.vstack([result.shown_array for result in results.values()])
dlg.setup_and_check(
res, title, readonly=True, xlabels=xlabels, ylabels=ylabels
)
dlg.setObjectName(f"{objs[0].PREFIX}_results")
dlg.resize(750, 300)
exec_dialog(dlg)
return results
[docs]
def compute_n1(
self,
name: str,
func: Callable,
param: gds.DataSet | None = None,
paramclass: gds.DataSet | None = None,
title: str | None = None,
comment: str | None = None,
func_objs: Callable | None = None,
edit: bool | None = None,
) -> None:
"""Compute n1 function: N(>=2) objects in → 1 object out.
Args:
name: name of function
func: function to execute
param: parameters. Defaults to None.
paramclass: parameters class. Defaults to None.
title: title of progress bar. Defaults to None.
comment: comment. Defaults to None.
func_objs: function to execute on objects. Defaults to None.
edit: if True, edit parameters. Defaults to None.
"""
if (edit is None or param is None) and paramclass is not None:
edit, param = self.init_param(param, paramclass, title, comment)
if param is not None:
if edit and not param.edit(parent=self.panel.parent()):
return
objs = self.panel.objview.get_sel_objects(include_groups=True)
# [new_objs dictionary] keys: old group id, values: new object
dst_objs: dict[str, Obj] = {}
# [src_dtypes dictionary] keys: old group id, values: old data type
src_dtypes: dict[str, np.dtype] = {}
# [src_objs dictionary] keys: old group id, values: list of old objects
src_objs: dict[str, list[Obj]] = {}
with create_progress_bar(self.panel, title, max_=len(objs)) as progress:
for index, src_obj in enumerate(objs):
progress.setValue(index + 1)
progress.setLabelText(title)
src_gid = self.panel.objmodel.get_object_group_id(src_obj)
dst_obj = dst_objs.get(src_gid)
if dst_obj is None:
src_dtypes[src_gid] = src_dtype = src_obj.data.dtype
dst_dtype = complex if is_complex_dtype(src_dtype) else float
dst_objs[src_gid] = dst_obj = src_obj.copy(dtype=dst_dtype)
dst_obj.roi = None
src_objs[src_gid] = [src_obj]
else:
src_objs[src_gid].append(src_obj)
if param is None:
args = (dst_obj, src_obj)
else:
args = (dst_obj, src_obj, param)
result = self.__exec_func(func, args, progress)
if result is None:
break
dst_obj = self.handle_output(
result, _("Calculating: %s") % title, progress
)
if dst_obj is None:
break
dst_objs[src_gid] = dst_obj
dst_obj.update_resultshapes_from(src_obj)
if src_obj.roi is not None:
if dst_obj.roi is None:
dst_obj.roi = src_obj.roi.copy()
else:
dst_obj.roi = np.vstack((dst_obj.roi, src_obj.roi))
grps = self.panel.objview.get_sel_groups()
if grps:
# (Group exclusive selection)
# At least one group is selected: create a new group
dst_gname = f"{name}({','.join([grp.short_id for grp in grps])})"
dst_gid = self.panel.add_group(dst_gname).uuid
else:
# (Object exclusive selection)
# No group is selected: use each object's group
dst_gid = None
for src_gid, dst_obj in dst_objs.items():
if func_objs is not None:
func_objs(dst_obj, src_objs[src_gid])
short_ids = [obj.short_id for obj in src_objs[src_gid]]
dst_obj.title = f'{name}({", ".join(short_ids)})'
group_id = dst_gid if dst_gid is not None else src_gid
self.panel.add_object(dst_obj, group_id=group_id)
# Select newly created group, if any
if dst_gid is not None:
self.panel.objview.set_current_item_id(dst_gid)
[docs]
def compute_n1n(
self,
obj2: Obj | None,
obj2_name: str,
func: Callable,
param: gds.DataSet | None = None,
paramclass: gds.DataSet | None = None,
title: str | None = None,
comment: str | None = None,
edit: bool | None = None,
) -> None:
"""Compute n1n function: N(>=1) objects + 1 object in → N objects out.
Examples: subtract, divide
Args:
obj2: second object
obj2_name: name of second object
func: function to execute
param: parameters. Defaults to None.
paramclass: parameters class. Defaults to None.
title: title of progress bar. Defaults to None.
comment: comment. Defaults to None.
edit: if True, edit parameters. Defaults to None.
"""
if (edit is None or param is None) and paramclass is not None:
edit, param = self.init_param(param, paramclass, title, comment)
if obj2 is None:
obj2 = self.panel.get_object_with_dialog(_("Select %s") % obj2_name)
if obj2 is None:
return
if param is not None:
if edit and not param.edit(parent=self.panel.parent()):
return
objs = self.panel.objview.get_sel_objects(include_groups=True)
# name = func.__name__.replace("compute_", "")
with create_progress_bar(self.panel, title, max_=len(objs)) as progress:
for index, obj in enumerate(objs):
progress.setValue(index + 1)
progress.setLabelText(title)
args = (obj, obj2) if param is None else (obj, obj2, param)
result = self.__exec_func(func, args, progress)
if result is None:
break
new_obj = self.handle_output(
result, _("Calculating: %s") % title, progress
)
if new_obj is None:
continue
group_id = self.panel.objmodel.get_object_group_id(obj)
self.panel.add_object(new_obj, group_id=group_id)
# ------Data Operations-------------------------------------------------------------
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_arithmetic(
self, obj2: Obj | None = None, param: ArithmeticParam | None = None
) -> None:
"""Compute arithmetic operation"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_sum(self) -> None:
"""Compute sum"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_normalize(self, param: NormalizeParam | None = None) -> None:
"""Normalize data"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_average(self) -> None:
"""Compute average"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_product(self) -> None:
"""Compute product"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_difference(self, obj2: Obj | None = None) -> None:
"""Compute difference"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_quadratic_difference(self, obj2: Obj | None = None) -> None:
"""Compute quadratic difference"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_division(self, obj2: Obj | None = None) -> None:
"""Compute division"""
def _get_roidataparam(self, param: ROIDataParam | None = None) -> ROIDataParam:
"""Eventually open ROI Editing Dialog, and return ROI editor data.
Args:
param: ROI data parameters.
Defaults to None.
Returns:
ROI data parameters.
"""
# Expected behavior:
# -----------------
# * If param.roidata argument is not None, skip the ROI dialog
# * If first selected obj has a ROI, use this ROI as default but open
# ROI Editor dialog anyway
# * If multiple objs are selected, then apply the first obj ROI to all
if param is None:
param = ROIDataParam()
if param.roidata is None:
param = self.edit_regions_of_interest(
extract=True, singleobj=param.singleobj
)
return param
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_swap_axes(self) -> None:
"""Swap data axes"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_abs(self) -> None:
"""Compute absolute value"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_re(self) -> None:
"""Compute real part"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_im(self) -> None:
"""Compute imaginary part"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_astype(self) -> None:
"""Convert data type"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_log10(self) -> None:
"""Compute Log10"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_exp(self) -> None:
"""Compute exponential"""
# ------Data Processing-------------------------------------------------------------
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_calibration(self, param=None) -> None:
"""Compute data linear calibration"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_clip(self, param: ClipParam | None = None) -> None:
"""Compute maximum data clipping"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_gaussian_filter(self, param: GaussianParam | None = None) -> None:
"""Compute gaussian filter"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_moving_average(self, param: MovingAverageParam | None = None) -> None:
"""Compute moving average"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_wiener(self) -> None:
"""Compute Wiener filter"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_fft(self) -> None:
"""Compute iFFT"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_ifft(self) -> None:
"""Compute FFT"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_addition_constant(self, param: ConstantParam) -> None:
"""Compute sum with a constant"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_difference_constant(self, param: ConstantParam) -> None:
"""Compute difference with a constant"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_product_constant(self, param: ConstantParam) -> None:
"""Compute product with a constant"""
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_division_constant(self, param: ConstantParam) -> None:
"""Compute division by a constant"""
# ------Analysis-------------------------------------------------------------------
[docs]
def edit_regions_of_interest(
self,
extract: bool = False,
singleobj: bool | None = None,
add_roi: bool = False,
) -> ROIDataParam | None:
"""Define Region Of Interest (ROI).
Args:
extract: If True, ROI is extracted from data. Defaults to False.
singleobj: If True, ROI is extracted from first selected object only.
If False, ROI is extracted from all selected objects. If None, ROI is
extracted from all selected objects only if they all have the same ROI.
Defaults to None.
add_roi: If True, add ROI to data immediately after opening the ROI editor.
Defaults to False.
Returns:
ROI data parameters or None if ROI dialog has been canceled.
"""
results = self.panel.get_roi_editor_output(
extract=extract, singleobj=singleobj, add_roi=add_roi
)
if results is None:
return None
roieditordata, modified = results
obj = self.panel.objview.get_sel_objects(include_groups=True)[0]
roigroup = obj.roidata_to_params(roieditordata.roidata)
if (
env.execenv.unattended
or roieditordata.roidata.size == 0
or roigroup.edit(parent=self.panel.parent())
):
roidata = obj.params_to_roidata(roigroup)
if modified:
roieditordata.roidata = roidata
# If ROI has been modified, save ROI (even in "extract mode")
obj.roi = roidata
self.SIG_ADD_SHAPE.emit(obj.uuid)
self.panel.selection_changed(update_items=True)
return roieditordata
[docs]
def delete_regions_of_interest(self) -> None:
"""Delete Regions Of Interest"""
for obj in self.panel.objview.get_sel_objects():
if obj.roi is not None:
obj.roi = None
self.panel.selection_changed(update_items=True)
[docs]
@abc.abstractmethod
@qt_try_except()
def compute_stats(self) -> dict[str, ResultShape]:
"""Compute data statistics"""