# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.
"""
.. Base panel objects (see parent package :mod:`datalab.gui.panel`)
"""
# pylint: disable=invalid-name # Allows short reference names like x, y, ...
from __future__ import annotations
import abc
import glob
import os
import os.path as osp
import re
import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Generator, Generic, Literal, Type
import guidata.dataset as gds
import guidata.dataset.qtwidgets as gdq
import h5py
import numpy as np
import plotpy.io
from guidata.configtools import get_icon
from guidata.dataset import restore_dataset, update_dataset
from guidata.qthelpers import add_actions, create_action, exec_dialog
from plotpy.plot import BasePlot, BasePlotOptions, PlotDialog, SyncPlotDialog
from plotpy.tools import ActionTool
from qtpy import QtCore as QC # type: ignore[import]
from qtpy import QtWidgets as QW
from qtpy.compat import (
getexistingdirectory,
getopenfilename,
getopenfilenames,
getsavefilename,
)
from sigima.io import (
read_annotations,
read_metadata,
read_roi,
write_annotations,
write_metadata,
write_roi,
)
from sigima.io.base import get_file_extensions
from sigima.io.common.basename import format_basenames
from sigima.objects import (
ImageObj,
NewImageParam,
SignalObj,
TypeObj,
TypeROI,
create_image_from_param,
create_signal,
create_signal_from_param,
)
from sigima.objects.base import ROI_KEY
from sigima.params import SaveToDirectoryParam
from datalab import objectmodel
from datalab.adapters_metadata import (
GeometryAdapter,
ResultData,
TableAdapter,
create_resultdata_dict,
show_resultdata,
)
from datalab.adapters_plotpy import create_adapter_from_object
from datalab.config import APP_NAME, Conf, _
from datalab.env import execenv
from datalab.gui import actionhandler, objectview
from datalab.gui.newobject import (
CREATION_PARAMETERS_OPTION,
NewSignalParam,
extract_creation_parameters,
insert_creation_parameters,
)
from datalab.gui.processor.base import (
PROCESSING_PARAMETERS_OPTION,
ProcessingParameters,
extract_processing_parameters,
insert_processing_parameters,
)
from datalab.gui.roieditor import TypeROIEditor
from datalab.objectmodel import ObjectGroup, get_short_id, get_uuid, set_uuid
from datalab.utils.qthelpers import (
CallbackWorker,
create_progress_bar,
qt_long_callback,
qt_try_except,
qt_try_loadsave_file,
save_restore_stds,
)
from datalab.widgets.textimport import TextImportWizard
if TYPE_CHECKING:
from plotpy.items import CurveItem, LabelItem, MaskedXYImageItem
from sigima.io.image import ImageIORegistry
from sigima.io.signal import SignalIORegistry
from datalab.gui import ObjItf
from datalab.gui.main import DLMainWindow
from datalab.gui.plothandler import ImagePlotHandler, SignalPlotHandler
from datalab.gui.processor.image import ImageProcessor
from datalab.gui.processor.signal import SignalProcessor
from datalab.h5.native import NativeH5Reader, NativeH5Writer
# Metadata keys that must not be pasted when copying metadata between objects:
# each of them carries information that is specific to a single object.
METADATA_PASTE_EXCLUSIONS = {
    ROI_KEY,  # ROI has dedicated copy/paste operations
    "__uuid",  # Each object must have a unique identifier
    f"__{PROCESSING_PARAMETERS_OPTION}",  # Object-specific processing history
    f"__{CREATION_PARAMETERS_OPTION}",  # Object-specific creation parameters
}
[docs]
def is_plot_item_serializable(item: Any) -> bool:
    """Tell whether a plot item can be serialized.

    The item is considered serializable when ``plotpy.io`` is able to resolve
    its class name, i.e. when the lookup does not raise ``AssertionError``.

    Args:
        item: Plot item to check

    Returns:
        True if the class name of the item is known to ``plotpy.io``
    """
    class_name = item.__class__.__name__
    try:
        plotpy.io.item_class_from_name(class_name)
    except AssertionError:
        return False
    else:
        return True
[docs]
def is_hdf5_file(filename: str, check_content: bool = False) -> bool:
    """Return True if filename has an HDF5 extension or is an HDF5 file.

    Args:
        filename: Path to the file to check
        check_content: If True, a file without a known HDF5 extension is opened
         to verify that it is a valid HDF5 file. If False, only the file
         extension is checked.

    Returns:
        True if the file is (likely) an HDF5 file, False otherwise.
    """
    hdf5_suffixes = (".h5", ".hdf5", ".hdf", ".he5")
    # Fast path: common HDF5 extensions are trusted, whatever `check_content` is
    if filename.lower().endswith(hdf5_suffixes):
        return True
    if not check_content:
        return False
    # Unknown extension: the only way to know is to try opening the file
    try:
        with h5py.File(filename, "r"):
            return True
    except (OSError, IOError, ValueError):
        # Not a valid (or not a readable) HDF5 file
        return False
[docs]
@dataclass
class ProcessingReport:
    """Report of a processing operation.

    Args:
        success: True if processing succeeded
        obj_uuid: UUID of the processed object (None if no object was involved)
        message: Optional message (error or info)
    """

    # Outcome of the operation (only mandatory field)
    success: bool
    # UUID of the processed object, when there is one
    obj_uuid: str | None = None
    # Human-readable error or information message, when there is one
    message: str | None = None
[docs]
class ObjectProp(QW.QWidget):
"""Object handling panel properties
Args:
panel: parent data panel
objclass: class of the object handled by the panel (SignalObj or ImageObj)
"""
def __init__(
    self, panel: BaseDataPanel, objclass: type[SignalObj] | type[ImageObj]
) -> None:
    """Build the tabbed properties widget.

    Args:
        panel: parent data panel
        objclass: class (not instance) of the objects handled by the panel; it
         is used to build the properties editor and to create blank objects
    """
    super().__init__(panel)
    # Create the tab widget
    self.tabwidget = QW.QTabWidget(self)
    self.tabwidget.setTabBarAutoHide(True)
    self.tabwidget.setTabPosition(QW.QTabWidget.West)
    self.panel = panel
    self.objclass = objclass
    # Object creation tab (dynamic: built on demand by `setup_creation_tab`)
    self.creation_param_editor: gdq.DataSetEditGroupBox | None = None
    self.current_creation_obj: SignalObj | ImageObj | None = None
    self.creation_scroll: QW.QScrollArea | None = None
    # Object processing tab (dynamic: built on demand by `setup_processing_tab`)
    self.processing_param_editor: gdq.DataSetEditGroupBox | None = None
    self.current_processing_obj: SignalObj | ImageObj | None = None
    self.processing_scroll: QW.QScrollArea | None = None
    # Properties tab
    self.properties = gdq.DataSetEditGroupBox("", objclass)
    self.properties.SIG_APPLY_BUTTON_CLICKED.connect(panel.properties_changed)
    self.properties.setEnabled(False)
    # Baseline values used by `get_changed_properties` to detect user edits
    self.__original_values: dict[str, Any] = {}
    # Create Analysis and History widgets
    font = Conf.proc.small_mono_font.get_font()
    self.processing_history = QW.QTextEdit()
    self.processing_history.setReadOnly(True)
    self.processing_history.setFont(font)
    self.analysis_parameters = QW.QTextEdit()
    self.analysis_parameters.setReadOnly(True)
    self.analysis_parameters.setFont(font)
    # Track newly created objects to show Creation tab only once
    self._newly_created_obj_uuid: str | None = None
    # Track when analysis results were just computed
    self._fresh_analysis_obj_uuid: str | None = None
    # Track when object was just processed (1-to-1)
    self._fresh_processing_obj_uuid: str | None = None
    # Permanent tabs, in display order: History, Analysis parameters, Properties
    self.tabwidget.addTab(
        self.processing_history, get_icon("history.svg"), _("History")
    )
    self.tabwidget.addTab(
        self.analysis_parameters, get_icon("analysis.svg"), _("Analysis parameters")
    )
    self.tabwidget.addTab(
        self.properties, get_icon("properties.svg"), _("Properties")
    )
    # Hide the History / Analysis parameters tabs whenever they become empty
    self.processing_history.textChanged.connect(self._update_tab_visibility)
    self.analysis_parameters.textChanged.connect(self._update_tab_visibility)
    # Create vertical layout for the container
    layout = QW.QVBoxLayout(self)
    layout.setContentsMargins(0, 0, 0, 0)
    layout.setSpacing(0)
    # Add tab widget to main layout (it is the only child widget)
    layout.addWidget(self.tabwidget)
    # Here we could add another widget or layout if needed (in DataLab v0.20, we
    # had a permanent button area here, but it was removed to avoid clutter)
def _update_tab_visibility(self) -> None:
"""Update visibility of tabs based on their content."""
for textedit in (self.processing_history, self.analysis_parameters):
tab_index = self.tabwidget.indexOf(textedit)
if tab_index >= 0:
has_content = bool(textedit.toPlainText().strip())
self.tabwidget.setTabVisible(tab_index, has_content)
[docs]
def display_analysis_parameters(self, obj: SignalObj | ImageObj) -> bool:
    """Fill the Analysis parameters tab from the results attached to an object.

    Args:
        obj: Signal or Image object

    Returns:
        True if analysis parameters were found and displayed, False otherwise.
    """
    separator = "<br><br>"
    html = ""
    # Both kinds of result adapters may carry the parameters of their function
    for adapter_class in (GeometryAdapter, TableAdapter):
        for adapter in adapter_class.iterate_from_obj(obj):
            param = adapter.get_param()
            if param is None:
                continue
            if html:
                html += separator
            func_name = adapter.func_name
            if func_name:
                # The function name is shown as a header, for better context
                param.set_comment(
                    "(" + _("Parameters for function `%s`") % func_name + ")"
                )
            html += param.to_html()
    self.analysis_parameters.setText(html)
    return bool(html)
def _build_processing_history(self, obj: SignalObj | ImageObj) -> str:
    """Describe the chain of operations that led to an object.

    The chain is followed backwards, from the object to its successive sources,
    then rendered from the oldest step to the newest one (one line per step,
    each line being indented one more character than the previous one).

    Args:
        obj: Signal or Image object

    Returns:
        Processing history as text (empty string when there is at most one step)
    """
    max_depth = 20  # Prevent infinite loops
    steps: list[str] = []
    node = obj
    while node is not None and len(steps) < max_depth:
        proc_params = extract_processing_parameters(node)
        # The chain stops on objects without processing parameters, and on
        # 1-to-0 operations (analysis): the latter only add metadata, they do
        # not transform the object, so they are not part of the history
        if proc_params is None or proc_params.pattern == "1-to-0":
            creation_params = extract_creation_parameters(node)
            if creation_params is None:
                steps.append(_("Original object"))
            else:
                steps.append(f"{_('Created')}: {creation_params.title}")
            break
        # Regular processing step: show a readable version of the function name
        steps.append(proc_params.func_name.replace("_", " ").title())
        if not proc_params.source_uuid:
            if proc_params.source_uuids:
                # Multiple sources (n-to-1 or 2-to-1 pattern)
                steps.append(_("(multiple sources)"))
            break
        # Move on to the source object
        node = self.panel.mainwindow.find_object_by_uuid(proc_params.source_uuid)
        if node is None:
            steps.append(_("(source deleted)"))
            break
    if len(steps) <= 1:
        return ""  # Shows the history tab only when there is some history
    # Oldest step first, with an indentation growing with the depth
    return "\n".join(
        f"{' ' * depth}└─ {label}" for depth, label in enumerate(reversed(steps))
    )
[docs]
def display_processing_history(self, obj: SignalObj | ImageObj) -> bool:
    """Fill the History tab with the processing history of an object.

    Args:
        obj: Signal or Image object

    Returns:
        True if processing history was found and displayed, False otherwise.
    """
    text = self._build_processing_history(obj)
    # An empty text is set as well, so that a previous history is cleared
    self.processing_history.setText(text)
    return bool(text)
def __update_properties_dataset(self, obj: SignalObj | ImageObj) -> None:
    """Update properties dataset from signal/image dataset

    Args:
        obj: Signal or Image object
    """
    dataset: SignalObj | ImageObj = self.properties.dataset
    # Reset the editor dataset before copying the object values into it
    dataset.set_defaults()
    update_dataset(dataset, obj)
    # Refresh the editor widgets from the dataset
    self.properties.get()
    # Nothing has been edited yet: there is nothing to apply
    self.properties.apply_button.setEnabled(False)
[docs]
def update_properties_from(
    self,
    obj: SignalObj | ImageObj | None = None,
    force_tab: Literal["creation", "processing", "analysis"] | None = None,
) -> None:
    """Update properties panel (properties, creation, processing) from object.

    Args:
        obj: Signal or Image object. If None, the Properties tab is disabled
         and the panel is filled from a blank object.
        force_tab: Force a specific tab to be current. The request is ignored
         (Properties tab is shown) when the requested tab is not available.
    """
    self.properties.setDisabled(obj is None)
    if obj is None:
        # Nothing to show: use a blank object so that all widgets are reset
        obj = self.objclass()
    # Update the properties dataset
    self.__update_properties_dataset(obj)
    # Store original values to detect which properties have changed
    self.update_original_values()
    # Display analysis parameters and processing history
    has_analysis_parameters = self.display_analysis_parameters(obj)
    self.display_processing_history(obj)
    # Remove only Creation and Processing tabs (dynamic tabs)
    # Use widget references instead of text labels for reliable identification
    for scroll in (self.creation_scroll, self.processing_scroll):
        if scroll is None:
            continue
        index = self.tabwidget.indexOf(scroll)
        if index >= 0:
            self.tabwidget.removeTab(index)
    # Reset references for dynamic tabs
    self.creation_param_editor = None
    self.current_creation_obj = None
    self.creation_scroll = None
    self.processing_param_editor = None
    self.current_processing_obj = None
    self.processing_scroll = None
    # Setup Creation and Processing tabs (if applicable): `obj` is never None
    # at this point, and both methods return False when they have nothing to show
    has_creation_tab = self.setup_creation_tab(obj)
    has_processing_tab = self.setup_processing_tab(obj)
    # Trigger visibility update for History and Analysis parameters tabs
    # (will be called via textChanged signals, but we call explicitly
    # here to ensure initial state is correct)
    self._update_tab_visibility()
    # Determine which tab to show based on force_tab parameter:
    # - If force_tab="creation" and Creation tab exists, show it
    # - If force_tab="processing" and Processing tab exists, show it
    # - If force_tab="analysis" and Analysis tab has content, show it
    # - Otherwise, always show Properties tab (default behavior)
    if force_tab == "creation" and has_creation_tab:
        current_widget = self.creation_scroll
    elif force_tab == "processing" and has_processing_tab:
        current_widget = self.processing_scroll
    elif force_tab == "analysis" and has_analysis_parameters:
        current_widget = self.analysis_parameters
    else:
        # Default: always show Properties tab when switching objects
        current_widget = self.properties
    self.tabwidget.setCurrentWidget(current_widget)
[docs]
def mark_as_newly_created(self, obj: SignalObj | ImageObj) -> None:
    """Mark object to show Creation tab on next selection.

    Only the UUID of the object is recorded here; marking another object
    replaces the previous mark.
    NOTE(review): the code consuming this mark is not part of this class as
    seen here - presumably the panel selection handler, to be confirmed.

    Args:
        obj: Object to mark
    """
    self._newly_created_obj_uuid = get_uuid(obj)
[docs]
def mark_as_freshly_processed(self, obj: SignalObj | ImageObj) -> None:
    """Mark object to show Processing tab on next selection.

    Only the UUID of the object is recorded here; marking another object
    replaces the previous mark.
    NOTE(review): the code consuming this mark is not part of this class as
    seen here - presumably the panel selection handler, to be confirmed.

    Args:
        obj: Object to mark
    """
    self._fresh_processing_obj_uuid = get_uuid(obj)
[docs]
def mark_as_fresh_analysis(self, obj: SignalObj | ImageObj) -> None:
    """Mark object to show Analysis tab on next selection.

    Only the UUID of the object is recorded here; marking another object
    replaces the previous mark.
    NOTE(review): the code consuming this mark is not part of this class as
    seen here - presumably the panel selection handler, to be confirmed.

    Args:
        obj: Object to mark
    """
    self._fresh_analysis_obj_uuid = get_uuid(obj)
[docs]
def get_changed_properties(self) -> dict[str, Any]:
    """Get dictionary of properties that have changed from original values.

    Returns:
        Dictionary mapping property names to their new values, containing only
        the properties whose current value differs from the stored baseline.
    """
    # Snapshot of the values currently held by the properties dataset
    # (`restore_dataset` is used to convert the dataset to a dictionary)
    snapshot: dict[str, Any] = {}
    restore_dataset(self.properties.dataset, snapshot)
    baseline = self.__original_values
    return {
        name: value
        for name, value in snapshot.items()
        if not self._values_equal(value, baseline.get(name))
    }
[docs]
def update_original_values(self) -> None:
    """Take the current dataset values as the new reference values.

    This should be called after applying changes to reset the baseline
    for detecting future changes.
    """
    dataset = self.properties.dataset
    baseline: dict[str, Any] = {}
    self.__original_values = baseline
    # `restore_dataset` fills the dictionary with the dataset values
    restore_dataset(dataset, baseline)
@staticmethod
def _values_equal(val1: Any, val2: Any) -> bool:
"""Compare two values, handling special cases like numpy arrays.
Args:
val1: first value
val2: second value
Returns:
True if values are equal
"""
# Handle numpy arrays
if isinstance(val1, np.ndarray) or isinstance(val2, np.ndarray):
if not isinstance(val1, np.ndarray) or not isinstance(val2, np.ndarray):
return False
return np.array_equal(val1, val2)
# Handle regular comparison
return val1 == val2
[docs]
def setup_creation_tab(
    self, obj: SignalObj | ImageObj, set_current: bool = False
) -> bool:
    """Setup the Creation tab with parameter editor for interactive object creation.

    The tab is only created for objects carrying creation parameters (as
    returned by ``extract_creation_parameters``). Any Creation tab that is
    already present is replaced.

    Args:
        obj: Signal or Image object
        set_current: If True, set the Creation tab as current after creation

    Returns:
        True if Creation tab was set up, False otherwise
    """
    param = extract_creation_parameters(obj)
    if param is None:
        # Object has no creation parameters: no Creation tab
        return False
    # Create parameter editor widget using the actual parameter class
    # (which is a subclass of NewSignalParam or NewImageParam)
    editor = gdq.DataSetEditGroupBox(_("Creation Parameters"), param.__class__)
    # Copy the stored parameter values into the editor, then refresh its widgets
    update_dataset(editor.dataset, param)
    editor.get()
    # Connect Apply button to recreation handler
    editor.SIG_APPLY_BUTTON_CLICKED.connect(self.apply_creation_parameters)
    editor.set_apply_button_state(False)
    # Store reference to be able to retrieve it later
    self.creation_param_editor = editor
    self.current_creation_obj = obj
    # Remove existing Creation tab if it exists
    if self.creation_scroll is not None:
        index = self.tabwidget.indexOf(self.creation_scroll)
        if index >= 0:
            self.tabwidget.removeTab(index)
    # Set the parameter editor as the scroll area widget
    # Creation tab is always at index 0 (before all other tabs)
    self.creation_scroll = QW.QScrollArea()
    self.creation_scroll.setWidgetResizable(True)
    self.creation_scroll.setWidget(editor)
    icon_name = "new_sig.svg" if isinstance(obj, SignalObj) else "new_ima.svg"
    self.tabwidget.insertTab(
        0, self.creation_scroll, get_icon(icon_name), _("Creation")
    )
    # Set as current tab if requested
    if set_current:
        self.tabwidget.setCurrentWidget(self.creation_scroll)
    return True
[docs]
def apply_creation_parameters(self) -> None:
    """Apply creation parameters: recreate object with updated parameters.

    A new object is created from the parameters of the Creation tab editor, then
    the current object is updated in-place (title, data and creation metadata).
    Analysis results are recomputed, and the tree view, the plot and the tabs
    are refreshed. Nothing is done when there is no Creation tab.

    Raises:
        Exception: whatever the object creation raised, in unattended mode only
         (otherwise the error is reported in a message box)
    """
    editor = self.creation_param_editor
    # Keep a local reference: the attribute is reset when selection changes
    obj = self.current_creation_obj
    if editor is None or obj is None:
        return
    is_signal = isinstance(obj, SignalObj)
    # Recreate object with new parameters
    # (serialization is done automatically in create_signal/image_from_param)
    param = editor.dataset
    try:
        if is_signal:
            new_obj = create_signal_from_param(param)
        else:  # ImageObj
            new_obj = create_image_from_param(param)
    except Exception as exc:  # pylint: disable=broad-exception-caught
        if execenv.unattended:
            raise
        QW.QMessageBox.warning(
            self,
            _("Error"),
            _("Failed to recreate object with new parameters:\n%s") % str(exc),
        )
        return
    # Warn the user only once the recreation succeeded, i.e. when the object
    # is actually going to be modified
    if is_signal:
        otext = _("Signal was modified in-place.")
    else:
        otext = _("Image was modified in-place.")
    text = f"⚠️ {otext} ⚠️ "
    text += _(
        "If computation were performed based on this object, "
        "they may need to be redone."
    )
    self.panel.SIG_STATUS_MESSAGE.emit(text, 20000)
    # Update the current object in-place
    obj_uuid = get_uuid(obj)
    obj.title = new_obj.title
    if is_signal:
        obj.xydata = new_obj.xydata
    else:  # ImageObj
        obj.data = new_obj.data
        # Invalidate ROI mask cache when image dimensions change
        # (the mask is computed based on image shape, so it must be recomputed)
        obj.invalidate_maskdata_cache()
    # Update metadata with new creation parameters
    insert_creation_parameters(obj, param)
    # Auto-recompute analysis if the object had analysis parameters
    # Since the data has changed, any analysis results are now invalid
    # Use the processor for the current object's type
    obj_processor = self.__get_processor_associated_to(obj)
    obj_processor.auto_recompute_analysis(obj)
    # Update the tree view item (to show new title if it changed)
    self.panel.objview.update_item(obj_uuid)
    # Refresh only the plot, not the entire panel
    # (avoid calling selection_changed which would trigger a full refresh
    # of the Properties tab and could cause recursion issues)
    self.panel.refresh_plot(obj_uuid, update_items=True, force=True)
    # Update the Properties tab to reflect the new object properties
    # (e.g., data type, dimensions, etc.)
    self.__update_properties_dataset(obj)

    def refresh_creation_tab() -> None:
        """Rebuild the Creation tab, unless another object took over."""
        # The panel may have switched to another object (or to no object at
        # all) before the timer fires: in that case the tabs were already
        # rebuilt for the new selection and must be left untouched
        if self.current_creation_obj is obj:
            self.setup_creation_tab(obj, set_current=True)

    # Refresh the Creation tab with the new parameters
    # Use QTimer to defer this until after the current event is processed
    # Set the Creation tab as current to keep it visible after refresh
    QC.QTimer.singleShot(0, refresh_creation_tab)
[docs]
def setup_processing_tab(
    self,
    obj: SignalObj | ImageObj,
    reset_params: bool = True,
    set_current: bool = False,
) -> bool:
    """Setup the Processing tab with parameter editor for re-processing.

    The tab is only created for objects produced by a 1-to-1 processing whose
    stored parameter is a single parameter object (neither None nor a list).
    Any Processing tab that is already present is replaced.

    Args:
        obj: Signal or Image object
        reset_params: If True, call update_from_obj() to reset parameters from
         source object. If False, use parameters as stored in metadata.
        set_current: If True, set the Processing tab as current after creation

    Returns:
        True if Processing tab was set up, False otherwise
    """
    # Extract processing parameters
    proc_params = extract_processing_parameters(obj)
    if proc_params is None:
        return False
    # Check if the pattern type is 1-to-1 (only interactive pattern)
    if proc_params.pattern != "1-to-1":
        return False
    # Store reference to be able to retrieve it later
    # (note that this is done even if no tab is created in the end)
    self.current_processing_obj = obj
    # Check if object has processing parameter
    param = proc_params.param
    if param is None:
        return False
    # Skip interactive processing for list of parameters
    # (e.g., ROI extraction, erase operations)
    if isinstance(param, list):
        return False
    # Eventually call the `update_from_obj` method to properly initialize
    # the parameter object from the current object state.
    # Only do this when reset_params is True (initial setup), not when
    # refreshing after user has modified parameters.
    if reset_params and hasattr(param, "update_from_obj"):
        # Warning: the `update_from_obj` method takes the input object as argument,
        # not the output object (`obj` is the processed object here):
        # Retrieve the input object from the source UUID
        if proc_params.source_uuid is not None:
            source_obj = self.panel.mainwindow.find_object_by_uuid(
                proc_params.source_uuid
            )
            # The source object may have been deleted in the meantime
            if source_obj is not None:
                param.update_from_obj(source_obj)
    # Create parameter editor widget
    editor = gdq.DataSetEditGroupBox(
        _("Processing Parameters"), param.__class__, wordwrap=True
    )
    # Copy the parameter values into the editor, then refresh its widgets
    update_dataset(editor.dataset, param)
    editor.get()
    # Connect Apply button to reprocessing handler
    editor.SIG_APPLY_BUTTON_CLICKED.connect(self.apply_processing_parameters)
    editor.set_apply_button_state(False)
    # Store reference to be able to retrieve it later
    self.processing_param_editor = editor
    # Remove existing Processing tab if it exists
    if self.processing_scroll is not None:
        index = self.tabwidget.indexOf(self.processing_scroll)
        if index >= 0:
            self.tabwidget.removeTab(index)
    # Processing tab comes after Creation tab (if it exists)
    # Find the correct insertion index: after Creation (index 0) if it exists,
    # otherwise at index 0
    has_creation = (
        self.creation_scroll is not None
        and self.tabwidget.indexOf(self.creation_scroll) >= 0
    )
    insert_index = 1 if has_creation else 0
    # Create new processing scroll area and tab
    self.processing_scroll = QW.QScrollArea()
    self.processing_scroll.setWidgetResizable(True)
    self.processing_scroll.setHorizontalScrollBarPolicy(QC.Qt.ScrollBarAlwaysOff)
    self.processing_scroll.setSizePolicy(
        QW.QSizePolicy.Expanding, QW.QSizePolicy.Preferred
    )
    self.processing_scroll.setWidget(editor)
    self.tabwidget.insertTab(
        insert_index,
        self.processing_scroll,
        get_icon("libre-tech-ram.svg"),
        _("Processing"),
    )
    # Set as current tab if requested
    if set_current:
        self.tabwidget.setCurrentWidget(self.processing_scroll)
    return True
def __get_processor_associated_to(
    self, obj: SignalObj | ImageObj
) -> SignalProcessor | ImageProcessor:
    """Return the processor matching the type of an object.

    Args:
        obj: Signal or Image object

    Returns:
        Processor of the signal panel for a signal, of the image panel otherwise
    """
    assert isinstance(obj, (SignalObj, ImageObj))
    mainwindow = self.panel.mainwindow
    owner_panel = (
        mainwindow.signalpanel if isinstance(obj, SignalObj) else mainwindow.imagepanel
    )
    return owner_panel.processor
[docs]
def apply_processing_parameters(
    self, obj: SignalObj | ImageObj | None = None, interactive: bool = True
) -> ProcessingReport:
    """Apply processing parameters: re-run processing with updated parameters.

    The source object is processed again, and the result replaces the content of
    `obj` in-place (title, data and processing metadata). Analysis results are
    recomputed, and the tree view, the plot and the tabs are refreshed.

    The parameters of the Processing tab editor are used only when the editor
    was built for `obj`; otherwise the parameters stored in the metadata of
    `obj` are used.

    Args:
        obj: Signal or Image object to reprocess. If None, uses the current object.
        interactive: If True, show progress and error messages in the UI
         (always disabled in unattended mode).

    Returns:
        ProcessingReport with success status, object UUID, and optional message.
    """
    if execenv.unattended:
        interactive = False
    report = ProcessingReport(success=False)
    # Explicit None check: do not rely on the truth value of the object
    if obj is None:
        obj = self.current_processing_obj
    if obj is None:
        report.message = _("No processing object available.")
        return report
    report.obj_uuid = get_uuid(obj)
    # Extract processing parameters
    proc_params = extract_processing_parameters(obj)
    if proc_params is None:
        report.message = _("Processing metadata is incomplete.")
        if interactive:
            QW.QMessageBox.critical(self, _("Error"), report.message)
        return report
    # Check if source object still exists
    if proc_params.source_uuid is None:
        report.message = _(
            "Processing metadata is incomplete (missing source UUID)."
        )
        if interactive:
            QW.QMessageBox.critical(self, _("Error"), report.message)
        return report
    # Find source object
    source_obj = self.panel.mainwindow.find_object_by_uuid(proc_params.source_uuid)
    if source_obj is None:
        report.message = _("Source object no longer exists.")
        if interactive:
            QW.QMessageBox.critical(
                self,
                _("Error"),
                report.message
                + "\n\n"
                + _(
                    "The object that was used to create this processed object "
                    "has been deleted and cannot be used for reprocessing."
                ),
            )
        return report
    # Get updated parameters from the editor, but only if the editor was built
    # for this very object: the editor of another object holds the parameters
    # of another processing and must not be applied here
    editor = self.processing_param_editor
    edited_obj = self.current_processing_obj
    editor_is_for_obj = (
        editor is not None
        and edited_obj is not None
        and get_uuid(edited_obj) == report.obj_uuid
    )
    param = editor.dataset if editor_is_for_obj else proc_params.param
    # For cross-panel computations, we need to use the processor from the panel
    # that owns the source object (e.g., radial_profile is in ImageProcessor)
    source_processor = self.__get_processor_associated_to(source_obj)
    # Recompute using the dedicated method (with multiprocessing support)
    try:
        new_obj = source_processor.recompute_1_to_1(
            proc_params.func_name, source_obj, param
        )
    except Exception as exc:  # pylint: disable=broad-exception-caught
        report.message = _("Failed to reprocess object:\n%s") % str(exc)
        if interactive:
            QW.QMessageBox.warning(self, _("Error"), report.message)
        return report
    if new_obj is None:
        # User cancelled the operation
        report.message = _("Processing was cancelled.")
        return report
    report.success = True
    # Update the current object in-place with data from new object
    obj.title = new_obj.title
    if isinstance(obj, SignalObj):
        obj.xydata = new_obj.xydata
    else:  # ImageObj
        obj.data = new_obj.data
        # Invalidate ROI mask cache when image dimensions may have changed
        # (the mask is computed based on image shape, so it must be recomputed)
        obj.invalidate_maskdata_cache()
    # Update metadata with new processing parameters
    updated_proc_params = ProcessingParameters(
        func_name=proc_params.func_name,
        pattern=proc_params.pattern,
        param=param,
        source_uuid=proc_params.source_uuid,
    )
    insert_processing_parameters(obj, updated_proc_params)
    # Auto-recompute analysis if the object had analysis parameters
    # Since the data has changed, any analysis results are now invalid
    # Use the processor for the current object's type (not source object's type)
    obj_processor = self.__get_processor_associated_to(obj)
    obj_processor.auto_recompute_analysis(obj)
    # Update the tree view item and refresh plot
    obj_uuid = get_uuid(obj)
    self.panel.objview.update_item(obj_uuid)
    self.panel.refresh_plot(obj_uuid, update_items=True, force=True)
    # Update the Properties tab to reflect the new object properties
    # (e.g., data type, dimensions, etc.)
    self.__update_properties_dataset(obj)
    # Refresh the Processing tab with the new parameters
    # Don't reset parameters from source object - keep the user's values
    # Set the Processing tab as current to keep it visible after refresh
    # (deferred until after the current event is processed)
    QC.QTimer.singleShot(
        0,
        lambda: self.setup_processing_tab(
            obj, reset_params=False, set_current=True
        ),
    )
    if isinstance(obj, SignalObj):
        report.message = _("Signal was reprocessed.")
    else:
        report.message = _("Image was reprocessed.")
    self.panel.SIG_STATUS_MESSAGE.emit("✅ " + report.message, 5000)
    return report
[docs]
class AbstractPanel(QW.QSplitter, metaclass=AbstractPanelMeta):
"""Object defining DataLab panel interface,
based on a vertical QSplitter widget
A panel handle an object list (objects are signals, images, macros...).
Each object must implement ``datalab.gui.ObjItf`` interface
"""
H5_PREFIX = ""
SIG_OBJECT_ADDED = QC.Signal()
SIG_OBJECT_REMOVED = QC.Signal()
@abc.abstractmethod
def __init__(self, parent):
    """Create the vertical splitter and check the container interface.

    The Qt object name is set to the lowercased first letter of the class name.

    Raises:
        NotImplementedError: if the class lacks one of the ``__len__``,
         ``__getitem__`` or ``__iter__`` methods
    """
    super().__init__(QC.Qt.Vertical, parent)
    class_name = self.__class__.__name__
    self.setObjectName(class_name[0].lower())
    # A panel is expected to behave like a container of objects: report the
    # first missing method (if any)
    required_methods = ("__len__", "__getitem__", "__iter__")
    missing = next(
        (name for name in required_methods if not hasattr(self, name)), None
    )
    if missing is not None:
        raise NotImplementedError(
            f"Class {class_name} must implement method {missing}"
        )
# pylint: disable=unused-argument
[docs]
def get_serializable_name(self, obj: ObjItf) -> str:
    """Return the name under which an object is serialized.

    The name is made of the short ID of the object followed by its title, in
    which slashes are replaced with underscores and any character other than
    letters, digits, ``-``, ``_``, ``.``, parentheses and spaces is removed.
    """
    flattened = obj.title.replace("/", "_")
    cleaned = re.sub("[^-a-zA-Z0-9_.() ]+", "", flattened)
    return f"{get_short_id(obj)}: {cleaned}"
[docs]
def serialize_object_to_hdf5(self, obj: ObjItf, writer: NativeH5Writer) -> None:
    """Serialize an object to a HDF5 file, in a group named after the object

    Args:
        obj: Object to serialize
        writer: HDF5 writer
    """
    group_name = self.get_serializable_name(obj)
    with writer.group(group_name):
        obj.serialize(writer)
[docs]
def deserialize_object_from_hdf5(
    self, reader: NativeH5Reader, name: str, reset_all: bool = False
) -> ObjItf:
    """Create an object and fill it from a group of a HDF5 file

    Args:
        reader: HDF5 reader
        name: Object name in HDF5 file
        reset_all: If True, preserve original UUIDs (workspace reload).
         If False (importing objects), the UUID of a signal, image or group is
         regenerated when it is already used in the object model.

    Returns:
        Deserialized object
    """
    with reader.group(name):
        obj = self.create_object()
        obj.deserialize(reader)
        # When reopening a workspace (reset_all=True), original UUIDs are kept
        # so that processing parameter references (source_uuid, source_uuids)
        # remain valid and features like "Show source" and "Recompute" work.
        # When importing, a UUID is regenerated only if it conflicts with an
        # existing one.
        importing = not reset_all
        if (
            importing
            and isinstance(obj, (SignalObj, ImageObj, ObjectGroup))
            and self.objmodel.has_uuid(get_uuid(obj))
        ):
            set_uuid(obj)
    return obj
[docs]
@abc.abstractmethod
def serialize_to_hdf5(self, writer: NativeH5Writer) -> None:
    """Serialize whole panel to a HDF5 file

    Args:
        writer: HDF5 writer
    """
[docs]
@abc.abstractmethod
def deserialize_from_hdf5(
    self, reader: NativeH5Reader, reset_all: bool = False
) -> None:
    """Deserialize whole panel from a HDF5 file

    Args:
        reader: HDF5 reader
        reset_all: If True, preserve original UUIDs (workspace reload).
         If False, regenerate UUIDs (importing objects).
    """
[docs]
@abc.abstractmethod
def create_object(self) -> ObjItf:
    """Create and return object

    Returns:
        New object, which is then filled by the deserialization code
    """
[docs]
@abc.abstractmethod
def add_object(self, obj: ObjItf) -> None:
    """Add object to panel

    Args:
        obj: Object to add
    """
[docs]
@abc.abstractmethod
def remove_all_objects(self) -> None:
    """Remove all objects

    This base implementation only emits the ``SIG_OBJECT_REMOVED`` signal: the
    method being abstract, the removal itself is left to the subclasses.
    """
    self.SIG_OBJECT_REMOVED.emit()
[docs]
class NonModalInfoDialog(QW.QMessageBox):
    """Non-modal information message box with selectable text.

    The message is shown in a message dialog box with a single Close button, and
    its text may be selected (and thus copied) by the user.
    """

    def __init__(self, parent: QW.QWidget, title: str, text: str) -> None:
        """Create a non-modal information message box with selectable text.

        Args:
            parent: The parent widget.
            title: The title of the message box.
            text: The text to display in the message box. It is rendered as rich
             text if it contains something looking like an HTML tag, as plain
             text otherwise.
        """
        super().__init__(parent)
        self.setIcon(QW.QMessageBox.Information)
        self.setWindowTitle(title)
        looks_like_html = re.search(r"<[a-zA-Z/][^>]*>", text) is not None
        if looks_like_html:
            text_format = QC.Qt.RichText  # type: ignore[attr-defined]
            interaction = QC.Qt.TextBrowserInteraction  # type: ignore[attr-defined]
        else:
            text_format = QC.Qt.PlainText  # type: ignore[attr-defined]
            interaction = (
                QC.Qt.TextSelectableByMouse  # type: ignore[attr-defined]
                | QC.Qt.TextSelectableByKeyboard  # type: ignore[attr-defined]
            )
        # Format and interaction flags are set before the text itself
        self.setTextFormat(text_format)
        self.setTextInteractionFlags(interaction)
        self.setText(text)
        close_button = QW.QMessageBox.Close
        self.setStandardButtons(close_button)
        self.setDefaultButton(close_button)
        # ! Necessary only on non-Windows platforms
        self.setWindowFlags(QC.Qt.Window)  # type: ignore[attr-defined]
        self.setModal(False)
[docs]
class SaveToDirectoryGUIParam(gds.DataSet, title=_("Save to directory")):
    """Dataset edited in the "Save to directory" dialog.

    It holds the target directory, the basename pattern, the file extension and
    the overwrite flag, and shows a live preview of the resulting file names.
    """

    def __init__(
        self, objs: list[TypeObj] | None = None, extensions: list[str] | None = None
    ) -> None:
        super().__init__()
        # Falsy arguments (None or empty list) are replaced by a new empty list
        self.__objs = objs if objs else []
        self.__extensions = extensions if extensions else []

    def get_extension_choices(self, _item=None, _value=None):
        """Return the choices of the extension item.

        Each choice is a ``(".ext", ".ext", None)`` tuple, one per extension.
        """
        choices = []
        for ext in self.__extensions:
            dotted = "." + ext
            choices.append((dotted, dotted, None))
        return choices

    def build_filenames(self, objs: list[TypeObj] | None = None) -> list[str]:
        """Build the file names (without directory) for the given objects.

        Args:
            objs: objects to name. When None or empty, the objects given to the
             constructor are used.

        Returns:
            File names, made unique by appending ``_1``, ``_2``, ... before the
            extension. A name is also changed when the file already exists in the
            target directory and overwriting is disabled.
        """
        if not objs:
            objs = self.__objs
        suffix = "" if self.extension is None else self.extension
        filenames = format_basenames(objs, self.basename + suffix)
        taken: set[str] = set()  # Names already attributed during this call
        for index, original in enumerate(filenames):
            stem, ext = osp.splitext(original)
            candidate = original
            target = osp.join(self.directory, candidate)
            counter = 1
            while candidate in taken or not (
                self.overwrite or not osp.exists(target)
            ):
                candidate = f"{stem}_{counter}{ext}"
                target = osp.join(self.directory, candidate)
                counter += 1
            taken.add(candidate)
            filenames[index] = candidate
        return filenames

    def generate_filepath_obj_pairs(
        self, objs: list[TypeObj]
    ) -> Generator[tuple[str, TypeObj], None, None]:
        """Yield one ``(file path, object)`` pair per object to be saved.

        Args:
            objs: objects to save
        """
        names = self.build_filenames(objs)
        for name, obj in zip(names, objs):
            yield osp.join(self.directory, name), obj

    def update_preview(self, _item=None, _value=None) -> None:
        """Refresh the preview text from the current parameters."""
        try:
            names = self.build_filenames()
            lines = []
            for position, (obj, name) in enumerate(zip(self.__objs, names), start=1):
                try:
                    # Short ID is available once the object belongs to a panel
                    label = get_short_id(obj)
                except (ValueError, KeyError):
                    # Otherwise, use the 1-based position in the list
                    label = str(position)
                lines.append(f"{label}: {name}")
            self.preview = "\n".join(lines)
        except (ValueError, KeyError, TypeError) as exc:
            # Formatting errors (e.g., incomplete format string) are shown in the
            # preview instead of being raised
            self.preview = f"Invalid pattern:{os.linesep}{exc}"

    directory = gds.DirectoryItem(_("Directory"), default=Conf.main.base_dir.get())
    basename = gds.StringItem(
        _("Basename pattern"),
        default="{title}",
        help=_("Python format string. See description for details."),
    ).set_prop("display", callback=update_preview)
    help = gds.ButtonItem(_("Help"), on_button_click, "MessageBoxInformation").set_pos(
        col=1
    )
    extension = gds.ChoiceItem(_("Extension"), get_extension_choices).set_prop(
        "display", callback=update_preview
    )
    overwrite = gds.BoolItem(
        _("Overwrite"), default=False, help=_("Overwrite existing files")
    ).set_pos(col=1)
    preview = gds.TextItem(
        _("Preview"), default=None, regexp=r"^(?!Invalid).*"
    ).set_prop("display", readonly=True)
[docs]
class BaseDataPanel(AbstractPanel, Generic[TypeObj, TypeROI, TypeROIEditor]):
    """Object handling the item list, the selected item properties and plot

    Generic base class parametrized by the object type, the ROI type and the ROI
    editor type.
    """

    PANEL_STR = ""  # e.g. "Signal Panel"
    PANEL_STR_ID = ""  # e.g. "signal"
    # Object class handled by the panel (instantiated by `create_object`):
    PARAMCLASS: TypeObj = None  # Replaced in child object
    ANNOTATION_TOOLS = ()  # NOTE(review): presumably annotation tool classes
    MINDIALOGSIZE = (800, 600)
    MAXDIALOGSIZE = 0.95  # % of DataLab's main window size
    # I/O registry used to read/write objects from/to files.
    # Replaced by the right class in child object:
    IO_REGISTRY: SignalIORegistry | ImageIORegistry | None = None
    SIG_STATUS_MESSAGE = QC.Signal(str, int)  # emitted by "qt_try_except" decorator
    # Arguments: what, update_items, force, only_visible, only_existing
    # (see `refresh_plot`)
    SIG_REFRESH_PLOT = QC.Signal(
        str, bool, bool, bool, bool
    )  # Connected to PlotHandler.refresh_plot
[docs]
@staticmethod
@abc.abstractmethod
def get_roi_class() -> Type[TypeROI]:
    """Return ROI class

    Returns:
        The ROI class (not an instance)
    """
[docs]
@staticmethod
@abc.abstractmethod
def get_roieditor_class() -> Type[TypeROIEditor]:
    """Return ROI editor class

    Returns:
        The ROI editor class (not an instance)
    """
@abc.abstractmethod
def __init__(self, parent: QW.QWidget) -> None:
    """Create the object model, the object view and the properties widget

    Args:
        parent: parent widget, also stored as `mainwindow`
    """
    super().__init__(parent)
    self.mainwindow: DLMainWindow = parent
    self.objprop = ObjectProp(self, self.PARAMCLASS)
    # The model is created with a prefix derived from the object class prefix
    self.objmodel = objectmodel.ObjectModel(f"g{self.PARAMCLASS.PREFIX}")
    self.objview = objectview.ObjectView(self, self.objmodel)
    # Files imported through the object view are handled by the panel
    self.objview.SIG_IMPORT_FILES.connect(self.handle_dropped_files)
    self.objview.populate_tree()
    # The three handlers below start unset
    # NOTE(review): presumably assigned by the concrete panel - confirm
    self.plothandler: SignalPlotHandler | ImagePlotHandler = None
    self.processor: SignalProcessor | ImageProcessor = None
    self.acthandler: actionhandler.BaseActionHandler = None
    # Clipboards used by the copy/paste features (metadata, annotations, ROI)
    self.metadata_clipboard = {}
    self.annotations_clipboard: list[dict[str, Any]] = []
    self.__roi_clipboard: TypeROI | None = None
    self.context_menu = QW.QMenu()
    # Dialogs of the separate views currently opened, mapped to their object
    self.__separate_views: dict[QW.QDialog, TypeObj] = {}
[docs]
def closeEvent(self, event):
    """Reimplement QMainWindow method

    The processor is closed before the event is forwarded to the parent class.

    Args:
        event: close event
    """
    self.processor.close()
    super().closeEvent(event)
# ------AbstractPanel interface-----------------------------------------------------
[docs]
def plot_item_parameters_changed(
    self, item: CurveItem | MaskedXYImageItem | LabelItem
) -> None:
    """Propagate a change of plot item parameters to the associated object.

    Args:
        item: plot item whose parameters have changed
    """
    owner = self.plothandler.get_obj_from_item(item)
    if owner is not None:
        # The item is unselected first, so that its parameters are read in the
        # right state (fix issue #184)
        item.unselect()
        # Bring the item's parameters up-to-date
        item.param.update_param(item)
        # Copy the plot item parameters into the object metadata
        adapter = create_adapter_from_object(owner)
        adapter.update_metadata_from_plot_item(item)
        owner_is_current = owner is self.objview.get_current_object()
        if owner_is_current:
            self.objprop.update_properties_from(owner)
    self.plothandler.update_resultproperty_from_plot_item(item)
# pylint: disable=unused-argument
[docs]
def plot_item_moved(
    self, item: LabelItem, x0: float, y0: float, x1: float, y1: float
) -> None:
    """Plot item moved: update metadata of all objects from plot items

    The coordinates are not used here: the item is simply handed over to the
    plot handler.

    Args:
        item: Plot item
        x0: new x0 coordinate
        y0: new y0 coordinate
        x1: new x1 coordinate
        y1: new y1 coordinate
    """
    self.plothandler.update_resultproperty_from_plot_item(item)
[docs]
def serialize_object_to_hdf5(self, obj: TypeObj, writer: NativeH5Writer) -> None:
    """Write a single object to the HDF5 file.

    Args:
        obj: object to serialize
        writer: HDF5 writer
    """
    # The latest visualization settings are saved too: the object metadata is
    # first refreshed from its plot item parameters
    try:
        plot_item = self.plothandler[get_uuid(obj)]
        adapter = create_adapter_from_object(obj)
        adapter.update_metadata_from_plot_item(plot_item)
    except KeyError:
        # No plot item yet for this object (this happens when auto-refresh has
        # been disabled): nothing to update
        pass
    super().serialize_object_to_hdf5(obj, writer)
[docs]
def serialize_to_hdf5(self, writer: NativeH5Writer) -> None:
    """Write all groups and their objects to the HDF5 file.

    Args:
        writer: HDF5 writer
    """
    with writer.group(self.H5_PREFIX):
        for grp in self.objmodel.get_groups():
            group_key = self.get_serializable_name(grp)
            with writer.group(group_key):
                # The group title comes first, then the objects of the group
                with writer.group("title"):
                    writer.write_str(grp.title)
                for member in grp.get_objects():
                    self.serialize_object_to_hdf5(member, writer)
[docs]
def deserialize_from_hdf5(
    self, reader: NativeH5Reader, reset_all: bool = False
) -> None:
    """Read all groups and their objects from the HDF5 file.

    Args:
        reader: HDF5 reader
        reset_all: If True, preserve original UUIDs (workspace reload).
         If False, regenerate UUIDs (importing objects).
    """
    with reader.group(self.H5_PREFIX):
        for group_name in reader.h5.get(self.H5_PREFIX, []):
            with reader.group(group_name):
                # The group is created with an empty title, which is then read
                # from the file
                new_group = self.add_group("")
                with reader.group("title"):
                    new_group.title = reader.read_str()
                member_names = reader.h5.get(f"{self.H5_PREFIX}/{group_name}", [])
                for member_name in member_names:
                    loaded = self.deserialize_object_from_hdf5(
                        reader, member_name, reset_all
                    )
                    self.add_object(loaded, get_uuid(new_group), set_current=False)
    self.selection_changed()
def __len__(self) -> int:
    """Return number of objects (length of the object model)"""
    return len(self.objmodel)
def __getitem__(self, nb: int) -> TypeObj:
    """Return object from its number (1 to N)

    Args:
        nb: object number, as expected by the object model
    """
    return self.objmodel.get_object_from_number(nb)
def __iter__(self):
    """Iterate over objects (delegates to the object model iterator)"""
    return iter(self.objmodel)
[docs]
def create_object(self) -> TypeObj:
    """Instantiate the object class of the panel (signal or image).

    Returns:
        SignalObj or ImageObj object
    """
    object_class = self.PARAMCLASS
    return object_class()  # pylint: disable=not-callable
[docs]
@qt_try_except()
def add_object(
    self,
    obj: TypeObj,
    group_id: str | None = None,
    set_current: bool = True,
) -> None:
    """Add object

    Args:
        obj: SignalObj or ImageObj object
        group_id: group id to which the object belongs. If None or empty string,
         the object is added to the current group. Without a current group, the
         first existing group is used, and a new group is created if there is
         none.
        set_current: if True, set the added object as current

    Raises:
        ValueError: if the object is already in the panel
    """
    if obj in self.objmodel:
        # Prevent adding the same object twice
        raise ValueError(
            f"Object {hex(id(obj))} already in panel. "
            "The same object cannot be added twice: "
            "please use a copy of the object."
        )
    if group_id is None or group_id == "":
        group_id = self.objview.get_current_group_id()
        if group_id is None:
            groups = self.objmodel.get_groups()
            if groups:
                group_id = get_uuid(groups[0])
            else:
                group_id = get_uuid(self.add_group(""))
    obj.check_data()
    self.objmodel.add_object(obj, group_id)
    # Mark this object as newly created to show Creation tab on first selection
    # BUT: Don't overwrite if this object is already marked as freshly processed
    # or has fresh analysis results (those take precedence)
    obj_uuid = get_uuid(obj)
    if (
        obj_uuid != self.objprop._fresh_processing_obj_uuid
        and obj_uuid != self.objprop._fresh_analysis_obj_uuid
    ):
        self.objprop.mark_as_newly_created(obj)
    # Block signals to avoid updating the plot (unnecessary refresh).
    # `blockSignals` returns the previous blocking state: it is restored in a
    # `finally` clause, so that the view never stays muted if adding the item
    # fails, and so that a blocking set by the caller is not undone here.
    was_blocked = self.objview.blockSignals(True)
    try:
        self.objview.add_object_item(obj, group_id, set_current=set_current)
    finally:
        self.objview.blockSignals(was_blocked)
    # Emit signal to ensure that the data panel is shown in the main window and
    # that the plot is updated (trigger a refresh of the plot)
    self.SIG_OBJECT_ADDED.emit()
    self.objview.update_tree()
[docs]
def remove_all_objects(self) -> None:
    """Close the separate views, then empty the model, the plot and the tree."""
    # Closing a dialog may change the dictionary of separate views: the loop
    # runs on a snapshot of its keys to avoid "RuntimeError: dictionary changed
    # size during iteration"
    for dialog in tuple(self.__separate_views):
        dialog.done(QW.QDialog.DialogCode.Rejected)
    self.objmodel.clear()
    self.plothandler.clear()
    self.objview.populate_tree()
    self.refresh_plot("selected", update_items=True, force=False)
    super().remove_all_objects()
    # The object properties panel is updated to clear creation/processing tabs
    self.selection_changed()
# ---- Signal/Image Panel API ------------------------------------------------------
[docs]
def setup_panel(self) -> None:
    """Create the actions, connect the signals and add the child widgets."""

    def open_single_view(oid):
        # A double-click opens the separate view of the clicked object only
        self.open_separate_view([oid])

    self.acthandler.create_all_actions()
    self.processor.SIG_ADD_SHAPE.connect(self.plothandler.add_shapes)
    self.SIG_REFRESH_PLOT.connect(self.plothandler.refresh_plot)
    tree = self.objview
    tree.SIG_SELECTION_CHANGED.connect(self.selection_changed)
    tree.SIG_ITEM_DOUBLECLICKED.connect(open_single_view)
    tree.SIG_CONTEXT_MENU.connect(self.__popup_contextmenu)
    apply_clicked = self.objprop.properties.SIG_APPLY_BUTTON_CLICKED
    apply_clicked.connect(self.properties_changed)
    self.addWidget(self.objview)
    self.addWidget(self.objprop)
[docs]
def refresh_plot(
    self,
    what: str,
    update_items: bool = True,
    force: bool = False,
    only_visible: bool = True,
    only_existing: bool = False,
) -> None:
    """Request a plot refresh by emitting ``SIG_REFRESH_PLOT``.

    Args:
        what: string describing the objects to refresh.
         Valid values are "selected" (refresh the selected objects),
         "all" (refresh all objects), "existing" (refresh existing plot items),
         or an object uuid.
        update_items: if True, update the items.
         If False, only show the items (do not update them).
         Defaults to True.
        force: if True, force refresh even if auto refresh is disabled.
         Defaults to False.
        only_visible: if True, only refresh visible items. Defaults to True.
         Visible items are the ones that are not hidden by other items or the
         items except the first one if the option "Show first only" is enabled.
         This is useful for images, where the last image is the one that is
         shown. If False, all items are refreshed.
        only_existing: if True, only refresh existing items. Defaults to False.
         Existing items are the ones that have already been created and are
         associated to the object uuid. If False, create new items for the
         objects that do not have an item yet.

    Raises:
        ValueError: if `what` is neither one of the keywords nor a string
    """
    keywords = ("selected", "all", "existing")
    is_keyword = what in keywords
    if not is_keyword and not isinstance(what, str):
        raise ValueError(f"Invalid value for 'what': {what}")
    signal_args = (what, update_items, force, only_visible, only_existing)
    self.SIG_REFRESH_PLOT.emit(*signal_args)
[docs]
def manual_refresh(self) -> None:
    """Refresh the selected objects, even if auto refresh is disabled."""
    self.refresh_plot("selected", update_items=True, force=True)
[docs]
def get_category_actions(
    self, category: actionhandler.ActionCategory
) -> list[QW.QAction]:  # pragma: no cover
    """Return the actions registered for a category.

    Args:
        category: action category

    Returns:
        List of actions (empty list if the category is unknown)
    """
    actions_by_category = self.acthandler.feature_actions
    return actions_by_category.get(category, [])
def __popup_contextmenu(self, position: QC.QPoint) -> None:  # pragma: no cover
    """Show the context menu at the given position.

    Args:
        position: position passed to the menu `popup` method
    """
    self.get_context_menu().popup(position)
# ------Creating, adding, removing objects------------------------------------------
[docs]
def add_group(self, title: str, select: bool = False) -> objectmodel.ObjectGroup:
    """Create a group in the model and show it in the tree view.

    Args:
        title: group title
        select: if True, select the group in the tree view. Defaults to False.

    Returns:
        Created group object
    """
    new_group = self.objmodel.add_group(title)
    tree = self.objview
    tree.add_group_item(new_group)
    if select:
        tree.select_groups([new_group])
    return new_group
def __duplicate_individual_obj(
    self, oid: str, new_group_id: str | None = None, set_current: bool = True
) -> None:
    """Add a copy of an object to the panel.

    Args:
        oid: uuid of the object to duplicate
        new_group_id: group receiving the copy. If None, the copy goes to the
         group of the original object.
        set_current: if True, set the copy as current
    """
    source = self.objmodel[oid]
    if new_group_id is None:
        target_group_id = self.objmodel.get_object_group_id(source)
    else:
        target_group_id = new_group_id
    duplicate = source.copy()
    self.add_object(duplicate, group_id=target_group_id, set_current=set_current)
[docs]
def duplicate_object(self) -> None:
    """Duplicate the selected objects and the selected groups."""
    if not self.mainwindow.confirm_memory_state():
        return
    # Selected objects (exclusive with respect to groups)
    for object_id in self.objview.get_sel_object_uuids():
        self.__duplicate_individual_obj(object_id, set_current=False)
    # Selected groups (exclusive with respect to individual objects): each one is
    # duplicated as a new group with the same title and a copy of every object
    for source_group in self.objview.get_sel_groups():
        target_group = self.add_group(source_group.title)
        member_ids = self.objmodel.get_group_object_ids(get_uuid(source_group))
        for object_id in member_ids:
            self.__duplicate_individual_obj(
                object_id, get_uuid(target_group), set_current=False
            )
    self.selection_changed(update_items=True)
def _rename_results_in_clipboard(self, prefix: str) -> None:
    """Prefix geometry and table results of the metadata clipboard.

    Both the title stored in the result data and the clipboard key are prefixed,
    to avoid conflicts.

    Args:
        prefix: Prefix to add to result titles
    """
    for adapter_class in (GeometryAdapter, TableAdapter):
        # Keys are collected first, because the clipboard is modified below
        matching_keys = [
            key
            for key, value in self.metadata_clipboard.items()
            if adapter_class.match(key, value)
        ]
        for old_key in matching_keys:
            try:
                payload = self.metadata_clipboard[old_key]
                if isinstance(payload, dict) and "title" in payload:
                    payload = payload.copy()  # The original data is left untouched
                    payload["title"] = prefix + payload["title"]
                # Only the first occurrence of the metadata prefix is replaced
                meta_prefix = adapter_class.META_PREFIX
                renamed_key = old_key.replace(meta_prefix, meta_prefix + prefix, 1)
                # The old entry is dropped before the renamed one is stored
                del self.metadata_clipboard[old_key]
                self.metadata_clipboard[renamed_key] = payload
            except (KeyError, ValueError, IndexError, TypeError):
                # A result that cannot be processed is left as is
                continue
[docs]
def copy_roi(self) -> None:
    """Store a copy of the ROI of the first selected object in the clipboard."""
    first_selected = self.objview.get_sel_objects()[0]
    roi_copy = first_selected.roi.copy()
    self.__roi_clipboard = roi_copy
[docs]
def paste_roi(self) -> None:
    """Paste regions of interest

    The ROI of the clipboard is copied to the selected objects that have no ROI,
    and combined with the existing ROI of the other ones. Nothing is done when
    the clipboard is empty, i.e. when no ROI has been copied yet.
    """
    clipboard = self.__roi_clipboard
    if clipboard is None:
        # The clipboard starts as None: there is nothing to paste
        return
    for obj in self.objview.get_sel_objects(include_groups=True):
        if obj.roi is None:
            # Each object gets its own copy of the clipboard content
            obj.roi = clipboard.copy()
        else:
            obj.roi = obj.roi.combine_with(clipboard)
    self.selection_changed(update_items=True)
    self.refresh_plot(
        "selected", update_items=True, only_visible=False, only_existing=True
    )
[docs]
def remove_object(self, force: bool = False) -> None:
    """Remove the selected objects and groups.

    Args:
        force: if True, remove object without confirmation. Defaults to False.
    """
    groups_to_remove = self.objview.get_sel_groups()
    # Deleting groups requires a confirmation, except when forced or unattended
    needs_confirmation = groups_to_remove and not force and not execenv.unattended
    if needs_confirmation:
        reply = QW.QMessageBox.warning(
            self,
            _("Delete group(s)"),
            _("Are you sure you want to delete the selected group(s)?"),
            QW.QMessageBox.Yes | QW.QMessageBox.No,
        )
        if reply == QW.QMessageBox.No:
            return
    objects_to_remove = self.objview.get_sel_objects(include_groups=True)
    for obj in sorted(objects_to_remove, key=get_short_id, reverse=True):
        # Separate views showing this object are closed first. They are collected
        # before being closed, so that the dictionary is not iterated meanwhile.
        open_dialogs: list[QW.QDialog] = [
            dlg for dlg, viewed in self.__separate_views.items() if viewed is obj
        ]
        for dlg in open_dialogs:
            dlg.done(QW.QDialog.DialogCode.Rejected)
        self.plothandler.remove_item(get_uuid(obj))
        self.objview.remove_item(get_uuid(obj), refresh=False)
        self.objmodel.remove_object(obj)
    for grp in groups_to_remove:
        self.objview.remove_item(get_uuid(grp), refresh=False)
        self.objmodel.remove_group(grp)
    self.objview.update_tree()
    self.selection_changed(update_items=True)
    self.SIG_OBJECT_REMOVED.emit()
[docs]
def delete_all_objects(self) -> None:  # pragma: no cover
    """Ask for confirmation, then remove all objects.

    Raises:
        RuntimeError: if called in unattended mode with a non-empty panel
    """
    if len(self) == 0:
        return
    if execenv.unattended:
        raise RuntimeError(
            "This method should not be executed in unattended mode: "
            "call remove_all_objects instead."
        )
    reply = QW.QMessageBox.warning(
        self,
        _("Delete all"),
        _("Do you want to delete all objects (%s)?") % self.PANEL_STR,
        QW.QMessageBox.Yes | QW.QMessageBox.No,
    )
    if reply != QW.QMessageBox.Yes:
        return
    self.remove_all_objects()
[docs]
def add_annotations_from_items(
    self, items: list, refresh_plot: bool = True
) -> None:
    """Add annotation plot items to every selected object.

    Args:
        items: annotation plot items
        refresh_plot: refresh plot. Defaults to True.
    """
    targets = self.objview.get_sel_objects(include_groups=True)
    for target in targets:
        adapter = create_adapter_from_object(target)
        adapter.add_annotations_from_items(items)
    if refresh_plot:
        self.refresh_plot("selected", True, False)
[docs]
def copy_titles_to_clipboard(self) -> None:
    """Copy the text representation of the object view to the clipboard.

    This is meant for copying object titles (for reproducibility).
    """
    clipboard = QW.QApplication.clipboard()
    clipboard.setText(str(self.objview))
[docs]
def new_group(self) -> None:
    """Ask the user for a group name, then create the group."""
    entered_name, accepted = QW.QInputDialog.getText(
        self, _("New group"), _("Group name:")
    )
    if not accepted:
        # The dialog was dismissed: no group is created
        return
    self.add_group(entered_name)
[docs]
def rename_selected_object_or_group(self, new_name: str | None = None) -> None:
    """Rename the selected object or the selected group.

    Args:
        new_name: new name (default: None, i.e. ask user)

    Raises:
        ValueError: if the selection is not exactly one object or one group
    """

    def ask_name(caption: str, label: str, current: str):
        # Input dialog prefilled with the current title
        return QW.QInputDialog.getText(
            self, caption, label, QW.QLineEdit.Normal, current
        )

    sel_objects = self.objview.get_sel_objects(include_groups=False)
    sel_groups = self.objview.get_sel_groups()
    if len(sel_objects) + len(sel_groups) != 1:
        # Won't happen in the application, but could happen in tests or using the
        # API directly
        raise ValueError("Select one object or group to rename")
    if sel_objects:
        obj = sel_objects[0]
        if new_name is None:
            new_name, accepted = ask_name(
                _("Rename object"), _("Object name:"), obj.title
            )
            if not accepted:
                return
        obj.title = new_name
        self.objview.update_item(get_uuid(obj))
        self.objprop.update_properties_from(obj)
        return
    grp = sel_groups[0]
    if new_name is None:
        new_name, accepted = ask_name(_("Rename group"), _("Group name:"), grp.title)
        if not accepted:
            return
    grp.title = new_name
    self.objview.update_item(get_uuid(grp))
[docs]
@abc.abstractmethod
def get_newparam_from_current(
    self, newparam: NewSignalParam | NewImageParam | None = None
) -> NewSignalParam | NewImageParam | None:
    """Get new object parameters from the current object.

    Abstract: concrete panels provide the implementation.

    Args:
        newparam: new object parameters. If None, create a new one.

    Returns:
        New object parameters
    """
[docs]
@abc.abstractmethod
def new_object(
    self,
    param: NewSignalParam | NewImageParam | None = None,
    edit: bool = False,
    add_to_panel: bool = True,
) -> TypeObj | None:
    """Create a new object (signal/image).

    Abstract: concrete panels provide the implementation.

    Args:
        param: new object parameters
        edit: Open a dialog box to edit parameters (default: False).
         When False, the object is created with default parameters and creation
         parameters are stored in metadata for interactive editing.
        add_to_panel: Add object to panel (default: True)

    Returns:
        New object
    """
[docs]
def set_current_object_title(self, title: str) -> None:
    """Change the title of the current object and update its tree item.

    Args:
        title: new title
    """
    current = self.objview.get_current_object()
    current.title = title
    current_uuid = get_uuid(current)
    self.objview.update_item(current_uuid)
def __load_from_file(
    self, filename: str, create_group: bool = True, add_objects: bool = True
) -> list[SignalObj] | list[ImageObj]:
    """Read the objects of a file (signal/image) and optionally add them.

    Args:
        filename: file name
        create_group: if True, create a new group if more than one object is
         loaded. Defaults to True. If False, all objects are added to the
         current group.
        add_objects: if True, add objects to the panel. Defaults to True.

    Returns:
        List of the objects read from the file
    """

    def read_file(progress_worker):
        # Reading is delegated to the I/O registry of the panel
        return self.IO_REGISTRY.read(filename, progress_worker)

    worker = CallbackWorker(read_file)
    objs = qt_long_callback(self, _("Reading objects from file"), worker, True)
    group_id = None
    if len(objs) > 1 and create_group:
        # Several objects in a single file: they go to a group named after it
        new_group = self.add_group(osp.basename(filename))
        group_id = get_uuid(new_group)
    with create_progress_bar(
        self, _("Adding objects to workspace"), max_=len(objs) - 1
    ) as progress:
        for index, obj in enumerate(objs):
            progress.setValue(index + 1)
            if progress.wasCanceled():
                break
            if not add_objects:
                continue
            # In case the object UUID was serialized in the file, we need to
            # reset it to a new UUID to avoid conflicts (e.g. HDF5 file)
            set_uuid(obj)
            is_last = obj is objs[-1]
            self.add_object(obj, group_id=group_id, set_current=is_last)
    self.selection_changed()
    return objs
def __save_to_file(self, obj: TypeObj, filename: str) -> None:
    """Write an object (signal/image) to a file using the I/O registry.

    Args:
        obj: object
        filename: file name
    """
    registry = self.IO_REGISTRY
    registry.write(filename, obj)
[docs]
def load_from_directory(self, directory: str | None = None) -> list[TypeObj]:
    """Load the objects found in a directory and in its subdirectories.

    Objects are signals or images, depending on the panel. Each folder from which
    at least one object is loaded gets its own group. If the directory is not
    specified, the user is asked to select one.

    Args:
        directory: directory name

    Returns:
        list of new objects
    """
    if not self.mainwindow.confirm_memory_state():
        return []
    if directory is None:  # pragma: no cover
        basedir = Conf.main.base_dir.get()
        with save_restore_stds():
            directory = getexistingdirectory(self, _("Open"), basedir)
        if not directory:
            return []
    # Non-empty folders among the recursive matches
    folders = []
    for candidate in glob.glob(osp.join(directory, "**"), recursive=True):
        if osp.isdir(candidate) and len(os.listdir(candidate)) > 0:
            folders.append(candidate)
    loaded = []
    with create_progress_bar(
        self, _("Scanning directory"), max_=len(folders) - 1
    ) as progress:
        for index, folder in enumerate(folders):
            progress.setValue(index + 1)
            if progress.wasCanceled():
                break
            folder = osp.normpath(folder)
            # Files of this folder only, in sorted order
            file_paths = []
            for entry in os.listdir(folder):
                entry_path = osp.join(folder, entry)
                if osp.isfile(entry_path):
                    file_paths.append(entry_path)
            file_paths.sort()
            folder_objs = self.load_from_files(
                file_paths,
                create_group=False,
                add_objects=False,
                ignore_errors=True,
            )
            if not folder_objs:
                continue
            loaded += folder_objs
            # The group is named after the path relative to the root directory
            # (or after the root directory itself)
            group_name = osp.relpath(folder, directory)
            if group_name == ".":
                group_name = osp.basename(folder)
            folder_group = self.add_group(group_name)
            for obj in folder_objs:
                self.add_object(
                    obj, group_id=get_uuid(folder_group), set_current=False
                )
    return loaded
[docs]
def load_from_files(
    self,
    filenames: list[str] | None = None,
    create_group: bool = False,
    add_objects: bool = True,
    ignore_errors: bool = False,
) -> list[TypeObj]:
    """Open objects from file (signals/images), add them to DataLab and return them.

    Args:
        filenames: File names. If None, the user is asked to select files.
        create_group: if True, create a new group if more than one object is
         loaded for a single file. Defaults to False: all objects are added to
         the current group.
        add_objects: if True, add objects to the panel. Defaults to True.
        ignore_errors: if True, ignore errors when loading files. Defaults to
         False.

    Returns:
        list of new objects
    """
    if not self.mainwindow.confirm_memory_state():
        return []
    if filenames is None:  # pragma: no cover
        basedir = Conf.main.base_dir.get()
        filters = self.IO_REGISTRY.get_read_filters()
        with save_restore_stds():
            filenames, _filt = getopenfilenames(self, _("Open"), basedir, filters)
        # Sort filenames to ensure consistent alphabetical order across all
        # platforms
        filenames = sorted(filenames)
    objs = []
    for filename in filenames:
        with qt_try_loadsave_file(self.parentWidget(), filename, "load"):
            Conf.main.base_dir.set(filename)
            try:
                objs.extend(
                    self.__load_from_file(
                        filename, create_group=create_group, add_objects=add_objects
                    )
                )
            except Exception:  # pylint: disable=broad-exception-caught
                # Errors (e.g. unknown file types) are skipped only on request.
                # Otherwise the exception is re-raised as is, with its original
                # traceback.
                if not ignore_errors:
                    raise
    return objs
[docs]
def save_to_files(self, filenames: list[str] | str | None = None) -> None:
    """Save selected objects to files (signal/image).

    Args:
        filenames: File names, one per selected object. A single string is
         accepted as the file name of a single selected object. If None, the
         user is asked for a file name for each object.

    Raises:
        AssertionError: if the number of file names does not match the number of
         selected objects
    """
    objs = self.objview.get_sel_objects(include_groups=True)
    if filenames is None:  # pragma: no cover
        filenames = [None] * len(objs)
    elif isinstance(filenames, str):
        # A bare string is one file name, not a sequence of characters
        filenames = [filenames]
    assert len(filenames) == len(objs), (
        "Number of filenames must match number of objects"
    )
    for obj, filename in zip(objs, filenames):
        if filename is None:
            basedir = Conf.main.base_dir.get()
            filters = self.IO_REGISTRY.get_write_filters()
            with save_restore_stds():
                filename, _filt = getsavefilename(
                    self, _("Save as"), basedir, filters
                )
        if filename:
            with qt_try_loadsave_file(self.parentWidget(), filename, "save"):
                Conf.main.base_dir.set(filename)
                self.__save_to_file(obj, filename)
[docs]
def save_to_directory(self, param: SaveToDirectoryParam | None = None) -> None:
    """Save the selected signals or images to a directory.

    File names are built from a basename pattern and an extension. When no
    parameters are given, a dialog is opened to select the output directory, the
    basename pattern and the extension.

    Args:
        param: parameters.
    """
    objs = self.objview.get_sel_objects(include_groups=True)
    if param is None:
        write_filters = self.IO_REGISTRY.get_write_filters()
        extensions = get_file_extensions(write_filters)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=gds.DataItemValidationWarning)
            guiparam = SaveToDirectoryGUIParam(objs, extensions)
            # Settings of the previous session are restored from config
            saved_param = Conf.io.save_to_directory_settings.get(
                default=SaveToDirectoryParam()
            )
            update_dataset(guiparam, saved_param)
            # Extension fallback: the first available one (or an empty string) is
            # used if the restored value is None or not available.
            # Note: extensions list has no dots, but guiparam.extension has dot
            dotted_extensions = ["." + ext for ext in extensions]
            restored = guiparam.extension
            if restored is None or restored not in dotted_extensions:
                guiparam.extension = dotted_extensions[0] if extensions else ""
            if not guiparam.edit(parent=self.parentWidget()):
                return
        param = SaveToDirectoryParam()
        update_dataset(param, guiparam)
        # Settings are saved to config
        Conf.io.save_to_directory_settings.set(param)
    Conf.main.base_dir.set(param.directory)
    with create_progress_bar(self, _("Saving..."), max_=len(objs)) as progress:
        pairs = param.generate_filepath_obj_pairs(objs)
        for count, (path, obj) in enumerate(pairs, start=1):
            progress.setValue(count)
            if progress.wasCanceled():
                break
            with qt_try_loadsave_file(self.parentWidget(), path, "save"):
                self.__save_to_file(obj, path)
[docs]
def handle_dropped_files(self, filenames: list[str] | None = None) -> None:
    """Handle dropped files

    Directories are loaded with `load_from_directory`, HDF5 files are opened by
    the main window and the remaining files are loaded with `load_from_files`.
    Nothing is done when there is no file name.

    Args:
        filenames: File names (None or an empty list is accepted)

    Returns:
        None
    """
    if not filenames:
        # `None` is the default value and cannot be iterated
        return
    dirnames = [fname for fname in filenames if osp.isdir(fname)]
    h5_fnames = [
        fname for fname in filenames if is_hdf5_file(fname, check_content=True)
    ]
    # Remaining files, sorted because the set difference loses the order
    other_fnames = sorted(set(filenames) - set(h5_fnames) - set(dirnames))
    for dirname in dirnames:
        self.load_from_directory(dirname)
    if h5_fnames:
        self.mainwindow.open_h5_files(h5_fnames, import_all=True)
    if other_fnames:
        self.load_from_files(other_fnames)
[docs]
def exec_import_wizard(self) -> None:
    """Run the text import wizard and add the resulting objects to the panel."""
    wizard = TextImportWizard(self.PANEL_STR_ID, parent=self.parentWidget())
    if not exec_dialog(wizard):
        return
    imported = wizard.get_objs()
    if not imported:
        return
    with create_progress_bar(
        self, _("Adding objects to workspace"), max_=len(imported) - 1
    ) as progress:
        for position, new_obj in enumerate(imported):
            progress.setValue(position)
            QW.QApplication.processEvents()
            if progress.wasCanceled():
                break
            self.add_object(new_obj)
[docs]
def copy_annotations(self) -> None:
    """Store the annotations of the first selected object in the clipboard."""
    selected = self.objview.get_sel_objects(include_groups=True)
    source = selected[0]
    self.annotations_clipboard = source.get_annotations()
    # Action states are updated (e.g., "Paste annotations" should now be enabled)
    self.selection_changed()
[docs]
def paste_annotations(self) -> None:
    """Set the clipboard annotations on every selected object.

    Nothing is done when the annotations clipboard is empty.
    """
    clipboard = self.annotations_clipboard
    if not clipboard:
        return
    for target in self.objview.get_sel_objects(include_groups=True):
        target.set_annotations(clipboard)
    self.refresh_plot("selected", True, False)
    # Action states are updated (e.g., annotation-related actions should now be
    # enabled)
    self.selection_changed()
[docs]
def import_annotations_from_file(self, filename: str | None = None) -> None:
    """Read annotations from a file and set them on the first selected object.

    Args:
        filename: File name. If None, the user is asked to select a file.
    """
    if filename is None:  # pragma: no cover
        basedir = Conf.main.base_dir.get()
        with save_restore_stds():
            filename, _filter = getopenfilename(
                self, _("Import annotations"), basedir, "*.dlabann"
            )
    if not filename:
        return
    with qt_try_loadsave_file(self.parentWidget(), filename, "load"):
        Conf.main.base_dir.set(filename)
        target = self.objview.get_sel_objects(include_groups=True)[0]
        loaded = read_annotations(filename)
        target.set_annotations(loaded)
    self.refresh_plot("selected", True, False)
    # Action states are updated (annotation-related actions should now be enabled)
    self.selection_changed()
[docs]
def export_annotations_from_file(self, filename: str | None = None) -> None:
    """Write the annotations of the first selected object to a file.

    Args:
        filename: File name. If None, the user is asked to select a file.
    """
    source = self.objview.get_sel_objects(include_groups=True)[0]
    if filename is None:  # pragma: no cover
        basedir = Conf.main.base_dir.get()
        with save_restore_stds():
            filename, _filt = getsavefilename(
                self, _("Export annotations"), basedir, "*.dlabann"
            )
    if not filename:
        return
    with qt_try_loadsave_file(self.parentWidget(), filename, "save"):
        Conf.main.base_dir.set(filename)
        exported = source.get_annotations()
        write_annotations(filename, exported)
[docs]
def delete_annotations(self) -> None:
    """Clear the annotations of every selected object."""
    targets = self.objview.get_sel_objects(include_groups=True)
    for target in targets:
        target.clear_annotations()
    self.refresh_plot("selected", True, False)
    # Action states are updated (annotation-related actions should now be
    # disabled)
    self.selection_changed()
[docs]
def import_roi_from_file(self, filename: str | None = None) -> None:
    """Read regions of interest from a file for the first selected object.

    The imported ROI replaces a missing ROI, or is combined with the existing
    one.

    Args:
        filename: File name. If None, the user is asked to select a file.
    """
    if filename is None:  # pragma: no cover
        basedir = Conf.main.base_dir.get()
        with save_restore_stds():
            filename, _filter = getopenfilename(
                self, _("Import ROI"), basedir, "*.dlabroi"
            )
    if not filename:
        return
    with qt_try_loadsave_file(self.parentWidget(), filename, "load"):
        Conf.main.base_dir.set(filename)
        target = self.objview.get_sel_objects(include_groups=True)[0]
        imported = read_roi(filename)
        if target.roi is None:
            target.roi = imported
        else:
            target.roi = target.roi.combine_with(imported)
    self.selection_changed(update_items=True)
    self.refresh_plot("selected", True, False)
[docs]
def export_roi_to_file(self, filename: str | None = None) -> None:
    """Write the regions of interest of the first selected object to a file.

    Args:
        filename: File name. If None, the user is asked to select a file.
    """
    source = self.objview.get_sel_objects(include_groups=True)[0]
    assert source.roi is not None
    if filename is None:  # pragma: no cover
        basedir = Conf.main.base_dir.get()
        with save_restore_stds():
            filename, _filt = getsavefilename(
                self, _("Export ROI"), basedir, "*.dlabroi"
            )
    if not filename:
        return
    with qt_try_loadsave_file(self.parentWidget(), filename, "save"):
        Conf.main.base_dir.set(filename)
        write_roi(filename, source.roi)
# ------Refreshing GUI--------------------------------------------------------------
[docs]
def selection_changed(self, update_items: bool = False) -> None:
    """React to a change of the object selection.

    The properties panel is updated from the current object, the action handler
    is told about the selected groups and objects, and the plot of the
    selection is refreshed.

    Args:
        update_items: Update plot items (default: False)
    """
    sel_objs = self.objview.get_sel_objects(include_groups=True)
    sel_groups = self.objview.get_sel_groups()
    current = self.objview.get_current_object()

    # One-shot markers stored on the properties panel, checked in this order:
    # the first one matching the current object decides which tab is forced.
    one_shot_markers = (
        ("_newly_created_obj_uuid", "creation"),
        ("_fresh_processing_obj_uuid", "processing"),
        ("_fresh_analysis_obj_uuid", "analysis"),
    )
    tab_to_force = None
    if current is not None:
        current_uuid = get_uuid(current)
        for marker, tab in one_shot_markers:
            if current_uuid == getattr(self.objprop, marker):
                tab_to_force = tab
                # Reset the marker so that the tab is forced only once
                setattr(self.objprop, marker, None)
                break

    self.objprop.update_properties_from(current, force_tab=tab_to_force)
    self.acthandler.selected_objects_changed(sel_groups, sel_objs)
    self.refresh_plot("selected", update_items, False)
[docs]
def properties_changed(self) -> None:
    """Apply edited properties to the selection ('Apply' button was clicked).

    Only the properties reported as changed by the properties panel are written
    to each selected object; the object view and the plot are then refreshed.
    """
    modified = self.objprop.get_changed_properties()
    for target in self.objview.get_sel_objects(include_groups=True):
        target.mark_roi_as_changed()
        # Write the changed properties only, not the whole property set
        update_dataset(target, modified)
        self.objview.update_item(get_uuid(target))
        # Properties have changed, so existing analysis results may be invalid:
        # let the processor recompute them if the object had analysis parameters
        self.processor.auto_recompute_analysis(target)
    # only_visible=False: plot items are updated for all selected objects, even
    # for those temporarily hidden behind other objects
    refresh_options = {"update_items": True, "force": False, "only_visible": False}
    self.refresh_plot("selected", **refresh_options)
    # The new state becomes the reference for detecting subsequent changes
    self.objprop.update_original_values()
[docs]
def recompute_processing(self) -> None:
    """Re-run the stored processing of the selected objects (or groups).

    Only objects whose stored processing parameters use the "1-to-1" pattern are
    recomputed; the other ones are skipped. When nothing can be recomputed, an
    information message is shown (except in unattended mode).
    """
    selection = self.objview.get_sel_objects(include_groups=True)
    if not selection:
        return

    def is_recomputable(candidate) -> bool:
        """Tell whether `candidate` stores 1-to-1 processing parameters"""
        stored = extract_processing_parameters(candidate)
        return stored is not None and stored.pattern == "1-to-1"

    recomputable_objects: list[SignalObj | ImageObj] = [
        candidate for candidate in selection if is_recomputable(candidate)
    ]
    if not recomputable_objects:
        if not execenv.unattended:
            QW.QMessageBox.information(
                self,
                _("Recompute"),
                _(
                    "Selected object(s) do not have processing parameters "
                    "that can be recomputed."
                ),
            )
        return

    total = len(recomputable_objects)
    with create_progress_bar(self, _("Recomputing objects"), max_=total) as progress:
        for position, obj in enumerate(recomputable_objects, start=1):
            progress.setValue(position)
            QW.QApplication.processEvents()
            if progress.wasCanceled():
                break
            # Make the object current so that the properties panel can be used
            # to apply its processing parameters
            self.objview.set_current_object(obj)
            report = self.objprop.apply_processing_parameters(
                obj=obj, interactive=False
            )
            if report.success or execenv.unattended:
                continue
            failtxt = _("Failed to recompute object")
            if position == total:
                # Last object: there is nothing left to continue with
                QW.QMessageBox.warning(
                    self,
                    _("Recompute"),
                    f"{failtxt} '{obj.title}':\n{report.message}",
                )
                continue
            conttxt = _("Do you want to continue with the next object?")
            answer = QW.QMessageBox.warning(
                self,
                _("Recompute"),
                f"{failtxt} '{obj.title}':\n{report.message}\n\n{conttxt}",
                QW.QMessageBox.Yes | QW.QMessageBox.No,
            )
            if answer == QW.QMessageBox.No:
                break
[docs]
def select_source_objects(self) -> None:
    """Select the source objects referenced by the selected object's processing.

    The source object UUIDs are read from the processing parameters of the
    selected object, looked up through the main window, and the objects that
    still exist are selected in the object view of the panel holding them.
    Nothing happens unless exactly one object is selected.
    """
    selection = self.objview.get_sel_objects(include_groups=False)
    if len(selection) != 1:
        return
    proc_params = extract_processing_parameters(selection[0])
    if proc_params is None:
        if not execenv.unattended:
            QW.QMessageBox.information(
                self,
                _("Select source objects"),
                _("Selected object does not have processing metadata."),
            )
        return

    # Collect the single source reference first, then the multiple ones
    source_uuids = [proc_params.source_uuid] if proc_params.source_uuid else []
    source_uuids += proc_params.source_uuids or []
    if not source_uuids:
        if not execenv.unattended:
            QW.QMessageBox.information(
                self,
                _("Select source objects"),
                _("Selected object does not have source object references."),
            )
        return

    # Keep the source objects that still exist (search across all panels)
    lookups = (self.mainwindow.find_object_by_uuid(uuid) for uuid in source_uuids)
    existing_objs = [found for found in lookups if found is not None]
    if not existing_objs:
        if not execenv.unattended:
            if len(source_uuids) == 1:
                msg = _("Source object no longer exists.")
            else:
                msg = _("Source objects no longer exist.")
            QW.QMessageBox.warning(self, _("Select source objects"), msg)
        return

    # Find the panel holding the source objects: they are always in the same
    # panel (either all signals or all images)
    if all(uuid in self.objmodel.get_object_ids() for uuid in source_uuids):
        source_panel = self
    elif isinstance(existing_objs[0], SignalObj):
        source_panel = self.mainwindow.signalpanel
    else:  # ImageObj
        source_panel = self.mainwindow.imagepanel
    if source_panel is not self:
        self.mainwindow.set_current_panel(source_panel)

    # Since all sources are in the same panel, every existing object can be
    # selected in the object view of `source_panel`
    source_panel.objview.clearSelection()
    for source in existing_objs:
        source_panel.objview.set_current_item_id(get_uuid(source), extend=True)

    # Tell the user when some of the referenced sources were not found
    found_count = len(existing_objs)
    missing_count = len(source_uuids) - found_count
    if missing_count <= 0 or execenv.unattended:
        return
    if found_count == 1:
        selected_txt = _("Selected a single source object")
    else:
        selected_txt = _("Selected %d source objects") % found_count
    if missing_count == 1:
        missing_txt = _("1 source object no longer exists")
    else:
        missing_txt = _("%d source objects no longer exist") % missing_count
    QW.QMessageBox.warning(
        self, _("Select source objects"), f"{selected_txt} ({missing_txt})"
    )
# ------Plotting data in modal dialogs----------------------------------------------
[docs]
def add_plot_items_to_dialog(
    self, dlg: PlotDialog, oids: list[str]
) -> PlotDialog | None:
    """Add read-only plot items for the given objects to a plot dialog.

    One plot item is created per object (updated from the item registered in
    the plot handler for that object) and added to the dialog's plot. The last
    added item is made active and unselected, then the plot is redrawn.

    Args:
        dlg: Dialog
        oids: Object IDs

    Returns:
        The dialog passed as `dlg`, or None if the operation was canceled from
        the progress bar
    """
    objs = self.objmodel.get_objects(oids)
    plot = dlg.get_plot()
    last_item = None
    with create_progress_bar(
        self, _("Creating plot items"), max_=len(objs)
    ) as progress:
        for index, obj in enumerate(objs):
            progress.setValue(index + 1)
            QW.QApplication.processEvents()
            if progress.wasCanceled():
                return None
            last_item = create_adapter_from_object(obj).make_item(
                update_from=self.plothandler[get_uuid(obj)]
            )
            last_item.set_readonly(True)
            plot.add_item(last_item, z=0)
    if last_item is not None:
        # No item is created when there is no object: nothing to activate then
        plot.set_active_item(last_item)
        last_item.unselect()
    plot.replot()
    return dlg
[docs]
def open_separate_view(
    self, oids: list[str] | None = None, edit_annotations: bool = False
) -> PlotDialog | None:
    """
    Open separate view for visualizing selected objects

    The dialog shows the plot items of the given objects and provides a
    toolbar of annotation tools. The annotation shapes of the last object of
    `oids` are added as editable items.

    Args:
        oids: Object IDs (default: None, uses the selected objects)
        edit_annotations: Edit annotations (default: False)

    Returns:
        Instance of PlotDialog, or None if there is no object to show or if no
        dialog was created
    """
    if oids is None:
        oids = self.objview.get_sel_object_uuids(include_groups=True)
    if not oids:
        # Nothing to show: without this guard, `oids[-1]` raises IndexError
        return None
    obj = self.objmodel[oids[-1]]  # last selected object

    if not all(oid in self.plothandler for oid in oids):
        # This happens for example when opening an already saved workspace with
        # multiple images, and if the user tries to view in a new window a group of
        # images without having selected any object yet. In this case, only the
        # last image is actually plotted (because if the other have the same size
        # and position, they are hidden), and the plot item of every other image is
        # not created yet. So we need to refresh the plot to create the plot item of
        # those images.
        self.plothandler.refresh_plot(
            "selected", update_items=True, force=True, only_visible=False
        )

    # Create a new dialog and add plot items to it
    dlg = self.create_new_dialog(
        title=obj.title if len(oids) == 1 else None,
        edit=True,
        name=f"{obj.PREFIX}_new_window",
    )
    if dlg is None:
        return None
    self.add_plot_items_to_dialog(dlg, oids)

    # Dedicated toolbar for the annotation tools, inserted at the beginning of
    # the dialog button layout
    mgr = dlg.get_manager()
    toolbar = QW.QToolBar(_("Annotations"), self)
    dlg.button_layout.insertWidget(0, toolbar)
    mgr.add_toolbar(toolbar, id(toolbar))
    toolbar.setToolButtonStyle(QC.Qt.ToolButtonTextUnderIcon)
    for tool in self.ANNOTATION_TOOLS:
        mgr.add_tool(tool, toolbar_id=id(toolbar))

    def toggle_annotations(enabled: bool):
        """Toggle annotation tools / edit buttons visibility"""
        for widget in (dlg.button_box, toolbar, mgr.get_itemlist_panel()):
            if enabled:
                widget.show()
            else:
                widget.hide()

    # Toggle action, added to the default toolbar, to show/hide annotation tools
    edit_ann_action = create_action(
        dlg,
        _("Annotations"),
        toggled=toggle_annotations,
        icon=get_icon("annotations.svg"),
    )
    mgr.add_tool(ActionTool, edit_ann_action)
    default_toolbar = mgr.get_default_toolbar()
    action_btn = default_toolbar.widgetForAction(edit_ann_action)
    action_btn.setToolButtonStyle(QC.Qt.ToolButtonTextBesideIcon)

    plot = dlg.get_plot()
    # Items already in the plot are made non-selectable; the annotation shapes
    # of the last object are then added as editable items
    for item in plot.items:
        item.set_selectable(False)
    for item in create_adapter_from_object(obj).iterate_shape_items(editable=True):
        plot.add_item(item)
    # Remember which object this dialog is associated with: it is read back
    # when the dialog is finished
    self.__separate_views[dlg] = obj
    toggle_annotations(edit_annotations)
    if len(oids) > 1:
        # If multiple objects are displayed, show the item list panel
        # (otherwise, it is hidden by default to lighten the dialog, except
        # if `edit_annotations` is True):
        plot.manager.get_itemlist_panel().show()
    if edit_annotations:
        edit_ann_action.setChecked(True)
    dlg.show()
    dlg.finished.connect(self.__separate_view_finished)
    return dlg
def __separate_view_finished(self, result: int) -> None:
    """Handle the end of a separate view dialog.

    When the dialog was accepted, its writable and serializable plot items are
    stored as annotations of the object associated with the dialog. In every
    case, the dialog is removed from the separate views and scheduled for
    deletion.

    Args:
        result: Dialog result code
    """
    dlg: PlotDialog = self.sender()
    if result == QW.QDialog.DialogCode.Accepted:
        editable_items = [
            item
            for item in dlg.get_plot().get_items()
            if not item.is_readonly() and is_plot_item_serializable(item)
        ]
        # Use the annotation adapter to set annotations in the new format
        adapter = create_adapter_from_object(self.__separate_views[dlg])
        adapter.set_annotations_from_items(editable_items)
        self.selection_changed(update_items=True)
    self.__separate_views.pop(dlg)
    dlg.deleteLater()
[docs]
def view_images_side_by_side(self, oids: list[str] | None = None) -> None:
    """
    Show images next to each other in a grid of synchronized plots

    The grid has at most 4 columns. Nothing is shown if fewer than two object
    IDs are given.

    Args:
        oids: Object IDs (default: None, uses selected objects)
    """
    if oids is None:
        oids = self.objview.get_sel_object_uuids(include_groups=True)
    if len(oids) < 2:
        return
    objs = self.objmodel.get_objects(oids)

    # Grid dimensions: as many columns as objects (up to `max_cols`), and as
    # many rows as needed to place every object
    max_cols = 4
    count = len(objs)
    grid_cols = min(count, max_cols)
    grid_rows = (count + grid_cols - 1) // grid_cols

    dlg = SyncPlotDialog(title=_("View images side-by-side"), toolbar=False)
    dlg.setObjectName(f"{self.PANEL_STR_ID}_side_by_side")

    for position, obj in enumerate(objs):
        # Fill the grid row by row
        row, col = divmod(position, grid_cols)
        plot = BasePlot(options=BasePlotOptions(title=obj.title, type="image"))
        adapter = create_adapter_from_object(obj)
        image_item = adapter.make_item()
        image_item.set_readonly(True)
        plot.add_item(image_item, z=0)
        if obj.roi is not None:
            # Show the regions of interest as non-editable items
            for roi_item in adapter.iterate_roi_items(editable=False):
                plot.add_item(roi_item)
        dlg.add_plot(row, col, plot, sync=True)

    dlg.finalize_configuration()
    # Explicit size, proportional to the grid dimensions
    dlg.resize(20 + 440 * grid_cols, 20 + 400 * grid_rows)
    exec_dialog(dlg)
[docs]
def get_dialog_size(self) -> tuple[int, int]:
    """Return the (width, height) used for pop-up dialogs.

    Each dimension is the MINDIALOGSIZE value (absolute), capped by the
    corresponding main window dimension multiplied by MAXDIALOGSIZE.
    """
    base_width, base_height = self.MINDIALOGSIZE
    # Caps are relative to the current size of the main window
    width_cap = int(self.mainwindow.width() * self.MAXDIALOGSIZE)
    height_cap = int(self.mainwindow.height() * self.MAXDIALOGSIZE)
    return min(base_width, width_cap), min(base_height, height_cap)
[docs]
def create_new_dialog(
    self,
    edit: bool = False,
    toolbar: bool = True,
    title: str | None = None,
    name: str | None = None,
    options: dict[str, Any] | None = None,
) -> PlotDialog | None:
    """Build a pop-up signal/image plot dialog.

    The dialog uses the plot options of the plot handler (on which `options`
    is applied, if given) and the size returned by `get_dialog_size`.

    Args:
        edit: Edit mode
        toolbar: Show toolbar
        title: Dialog title (the application name is used if None, and is
         appended to the title otherwise)
        name: Dialog object name
        options: Plot options

    Returns:
        Plot dialog instance
    """
    base_options = self.plothandler.get_plot_options()
    dialog_options = base_options if options is None else base_options.copy(options)
    window_title = APP_NAME if title is None else f"{title} - {APP_NAME}"
    # pylint: disable=not-callable
    new_dialog = PlotDialog(
        parent=self.parentWidget(),
        title=window_title,
        options=dialog_options,
        toolbar=toolbar,
        icon="DataLab.svg",
        edit=edit,
        size=self.get_dialog_size(),
    )
    new_dialog.setObjectName(name)
    return new_dialog
[docs]
def get_roi_editor_output(
    self, mode: Literal["apply", "extract", "define"] = "apply"
) -> tuple[TypeROI, bool] | None:
    """Run the ROI editor dialog on the last selected object.

    Args:
        mode: Mode of operation, either "apply" (define ROI, then apply it to
         selected objects), "extract" (define ROI, then extract data from it),
         or "define" (define ROI without applying or extracting).

    Returns:
        The results of the ROI editor if the dialog was accepted, None otherwise.
    """
    last_selected = self.objview.get_sel_objects(include_groups=True)[-1]
    # Plot item for the editor, updated from the item registered in the plot
    # handler for this object
    reference_item = self.plothandler[get_uuid(last_selected)]
    editor_item = create_adapter_from_object(last_selected).make_item(
        update_from=reference_item
    )
    editor_class = self.get_roieditor_class()  # pylint: disable=not-callable
    roi_editor = editor_class(
        parent=self.parentWidget(),
        obj=last_selected,
        mode=mode,
        item=editor_item,
        options=self.plothandler.get_plot_options(),
        size=self.get_dialog_size(),
    )
    return roi_editor.get_roieditor_results() if exec_dialog(roi_editor) else None
[docs]
def get_objects_with_dialog(
    self,
    title: str,
    comment: str = "",
    nb_objects: int = 1,
    parent: QW.QWidget | None = None,
) -> TypeObj | None:
    """Let the user choose object(s) of this panel in a dialog box.

    Args:
        title: Dialog title
        comment: Optional dialog comment
        nb_objects: Number of objects to select
        parent: Parent widget (this panel if None)

    Returns:
        Object(s) (signal(s) or image(s), or None if dialog was canceled)
    """
    owner = parent if parent is not None else self
    chooser = objectview.GetObjectsDialog(owner, self, title, comment, nb_objects)
    return chooser.get_selected_objects() if exec_dialog(chooser) else None
def __show_no_result_warning(self):
    """Inform the user that no result is available (skipped when unattended)"""
    headline = _("No result currently available for this object.")
    explanation = _(
        "This feature leverages the results of previous analysis "
        "performed on the selected object(s).<br><br>"
        "To compute results, select one or more objects and choose "
        "a feature in the <u>Analysis</u> menu."
    )
    # The empty element produces a blank line between the two parts
    msg = "<br>".join((headline, "", explanation))
    if execenv.unattended:
        return
    QW.QMessageBox.information(self, APP_NAME, msg)
[docs]
def show_results(self) -> None:
    """Display the results of the selected object(s).

    A warning is shown instead when the selection has no result.
    """
    objs = self.objview.get_sel_objects(include_groups=True)
    rdatadict = create_resultdata_dict(objs)
    if not rdatadict:
        self.__show_no_result_warning()
        return
    for rdata in rdatadict.values():
        show_resultdata(self.parentWidget(), rdata, f"{objs[0].PREFIX}_results")
[docs]
def toggle_result_label_visibility(self, state: bool) -> None:
    """Show or hide the merged result label on the plot.

    The choice is saved in the configuration and mirrored on the corresponding
    action of the other panel.

    Args:
        state: True to show the label, False to hide it
    """
    Conf.view.show_result_label.set(state)
    for panel in (self.mainwindow.signalpanel, self.mainwindow.imagepanel):
        if panel is self:
            continue
        if panel.acthandler.show_label_action is None:
            continue
        # Signals are blocked while the checked state of the other panel's
        # action is synchronized
        other_action = panel.acthandler.show_label_action
        other_action.blockSignals(True)
        other_action.setChecked(state)
        other_action.blockSignals(False)
    # Refresh the plot to show/hide the label
    self.plothandler.toggle_result_label_visibility(state)
def __add_result_signal(
    self,
    x: np.ndarray | list[float],
    y: np.ndarray | list[float],
    title: str,
    xaxis: str,
    yaxis: str,
    group_id: str | None = None,
) -> None:
    """Create a signal from result data and add it to the signal panel.

    Args:
        x: X data
        y: Y data
        title: Signal title
        xaxis: X axis label
        yaxis: Y axis label
        group_id: UUID of the group to add the signal to. If None, add to
         current group.
    """
    # Both coordinates are converted to float arrays
    xdata, ydata = (np.array(values, dtype=float) for values in (x, y))
    signal_title = f"{title}: {yaxis} = f({xaxis})"
    result_signal = create_signal(
        title=signal_title, x=xdata, y=ydata, labels=[xaxis, yaxis]
    )
    self.mainwindow.signalpanel.add_object(result_signal, group_id=group_id)
def __create_plot_result_param_class(self, rdata: ResultData) -> type[gds.DataSet]:
    """Build the PlotResultParam dataset class matching some result data.

    The axis choices are taken from the headers of `rdata` whose first value
    (in the first result) is numeric; "indices" is an additional X axis choice.

    Args:
        rdata: Result data

    Returns:
        PlotResultParam class
    """
    numeric_types = (int, float, np.number)
    indices_choice = ("indices", _("Indices"))
    # Headers whose column data is not numeric are left out
    ychoices = tuple(
        (header, header)
        for header in rdata.headers
        if isinstance(rdata.results[0].get_column_values(header)[0], numeric_types)
    )
    xchoices = (indices_choice,) + ychoices

    # Default plot kind: one curve per object as soon as one result has more
    # than one row, one curve per title otherwise
    if any(len(result.to_dataframe()) > 1 for result in rdata.results):
        default_kind = "one_curve_per_object"
    else:
        default_kind = "one_curve_per_title"

    class PlotResultParam(gds.DataSet):
        """Plot results parameters"""

        kind = gds.ChoiceItem(
            _("Plot kind"),
            (
                (
                    "one_curve_per_object",
                    _("One curve per object (or ROI) and per result title"),
                ),
                ("one_curve_per_title", _("One curve per result title")),
            ),
            default=default_kind,
        )
        xaxis = gds.ChoiceItem(_("X axis"), xchoices, default="indices")
        # The first numeric column is the default Y axis
        yaxis = gds.ChoiceItem(_("Y axis"), ychoices, default=ychoices[0][0])

    return PlotResultParam
def __plot_result(
    self,
    category: str,
    rdata: ResultData,
    objs: list[SignalObj | ImageObj],
    param: gds.DataSet | None = None,
    result_group_id: str | None = None,
) -> None:
    """Plot results for a specific category

    Each curve is created as a new signal through `__add_result_signal`.

    Args:
        category: Result category
        rdata: Result data
        objs: List of objects
        param: Plot result parameters. If None, show dialog to get parameters.
        result_group_id: UUID of the group to add result signals to. If None,
         add to current group.
    """
    # Regrouping results by their `title` attribute:
    grouped_results: dict[str, list[GeometryAdapter | TableAdapter]] = {}
    for result in rdata.results:
        grouped_results.setdefault(result.title, []).append(result)
    # From here, results are already grouped by their `category` attribute,
    # and then by their `title` attribute. We can now plot them.
    #
    # Now, we have two common use cases:
    # 1. Plotting one curve per object (signal/image) and per `title`
    #    attribute, if each selected object contains a result object
    #    with multiple values (e.g. from a blob detection feature).
    # 2. Plotting one curve per `title` attribute, if each selected object
    #    contains a result object with a single value (e.g. from a FWHM
    #    feature) - in that case, we select only the first value of each
    #    result object.
    if param is None:
        # Create parameter class and show dialog
        PlotResultParam = self.__create_plot_result_param_class(rdata)
        comment = (
            _(
                "Plot results obtained from previous analyses.<br><br>"
                "This plot is based on results associated with '%s' prefix."
            )
            % category
        )
        param = PlotResultParam(_("Plot results"), comment=comment)
        # Nothing is plotted if the parameter dialog is not validated
        if not param.edit(parent=self.parentWidget()):
            return
    if param.kind == "one_curve_per_title":
        # One curve per ROI (if any) and per result title
        # ------------------------------------------------------------------
        # Begin by checking if all results have the same number of ROIs:
        # for simplicity, let's check the number of unique ROI indices.
        all_roi_indexes = [
            result.get_unique_roi_indices() for result in rdata.results
        ]
        # Check if all roi_indexes are the same:
        if len(set(map(tuple, all_roi_indexes))) > 1:
            if not execenv.unattended:
                QW.QMessageBox.warning(
                    self,
                    _("Plot results"),
                    _(
                        "All objects associated with results must have the "
                        "same regions of interest (ROIs) to plot results "
                        "together."
                    ),
                )
            return
        # The first object is the one used below to get ROI titles
        obj = objs[0]
        # Build a string with source object short IDs (max 3, then use "...")
        max_ids_to_show = 3
        short_ids = [get_short_id(obj) for obj in objs]
        if len(short_ids) <= max_ids_to_show:
            source_ids = ", ".join(short_ids)
        else:
            # Show first 2, "...", then last one: "s001, s002, ..., s010"
            source_ids = (
                ", ".join(short_ids[: max_ids_to_show - 1])
                + ", ..., "
                + short_ids[-1]
            )
        for i_roi in all_roi_indexes[0]:
            # NOTE(review): when i_roi >= 0, this suffix is replaced below by
            # the ROI title before being used - confirm it is still needed
            roi_suffix = f"|ROI{int(i_roi + 1)}" if i_roi >= 0 else ""
            for title, results in grouped_results.items():  # title
                x, y = [], []
                for index, result in enumerate(results):
                    if param.xaxis == "indices":
                        x.append(index)
                    else:
                        x.append(result.get_column_values(param.xaxis, i_roi)[0])
                    # Only the first value of the column is used
                    y.append(result.get_column_values(param.yaxis, i_roi)[0])
                if i_roi >= 0:
                    # NOTE(review): assumes `obj.roi` is not None when results
                    # have non-negative ROI indices - confirm
                    roi_suffix = f"|{obj.roi.get_single_roi_title(int(i_roi))}"
                self.__add_result_signal(
                    x,
                    y,
                    f"{title} ({source_ids}){roi_suffix}",
                    param.xaxis,
                    param.yaxis,
                    result_group_id,
                )
    else:
        # One curve per result title, per object and per ROI
        # ------------------------------------------------------------------
        for title, results in grouped_results.items():  # title
            for index, result in enumerate(results):  # object
                # NOTE(review): assumes the results sharing a title are in the
                # same order as `objs` (one result per object) - confirm
                obj = objs[index]
                roi_indices = result.get_unique_roi_indices()
                for i_roi in roi_indices:  # ROI
                    roi_suffix = ""
                    if i_roi >= 0:
                        roi_suffix = f"|{obj.roi.get_single_roi_title(int(i_roi))}"
                    roi_data = result.get_roi_data(i_roi)
                    if param.xaxis == "indices":
                        x = list(range(len(roi_data)))
                    else:
                        x = roi_data[param.xaxis].values
                    y = roi_data[param.yaxis].values
                    shid = get_short_id(objs[index])
                    stitle = f"{title} ({shid}){roi_suffix}"
                    self.__add_result_signal(
                        x, y, stitle, param.xaxis, param.yaxis, result_group_id
                    )
[docs]
def plot_results(
    self,
    kind: str | None = None,
    xaxis: str | None = None,
    yaxis: str | None = None,
) -> None:
    """Plot the results of the selected object(s) as signals.

    The signals are added to the "Results" group of the signal panel, which is
    created if it does not exist. A warning is shown instead when the selection
    has no result.

    Args:
        kind: Plot kind. Either "one_curve_per_object" or "one_curve_per_title".
         If None, show dialog to get parameters.
        xaxis: X axis column name. If None, show dialog to get parameters.
        yaxis: Y axis column name. If None, show dialog to get parameters.
    """
    objs = self.objview.get_sel_objects(include_groups=True)
    rdatadict = create_resultdata_dict(objs)
    if not rdatadict:
        self.__show_no_result_warning()
        return

    # Always use or create a "Results" group for all plot results
    rgroup_title = _("Results")
    signal_panel = self.mainwindow.signalpanel
    try:
        rgroup = signal_panel.objmodel.get_group_from_title(rgroup_title)
    except KeyError:
        # The group does not exist yet
        rgroup = signal_panel.add_group(rgroup_title)
    result_group_id = get_uuid(rgroup)

    # Parameters are set programmatically only if all of them are given;
    # otherwise they are left to None (parameters are then asked in a dialog)
    fully_specified = all(value is not None for value in (kind, xaxis, yaxis))
    for category, rdata in rdatadict.items():
        param = None
        if fully_specified:
            param = self.__create_plot_result_param_class(rdata)()
            param.kind = kind
            param.xaxis = xaxis
            param.yaxis = yaxis
        self.__plot_result(category, rdata, objs, param, result_group_id)
[docs]
def delete_results(self) -> None:
    """Remove all results from the selected object(s).

    A confirmation is requested first (except in unattended mode). A warning is
    shown instead when the selection has no result.
    """
    objs = self.objview.get_sel_objects(include_groups=True)
    if not create_resultdata_dict(objs):
        self.__show_no_result_warning()
        return

    # In unattended mode, the deletion is confirmed without asking
    confirmed = execenv.unattended or (
        QW.QMessageBox.warning(
            self,
            _("Delete results"),
            _(
                "Are you sure you want to delete all results "
                "of the selected object(s)?"
            ),
            QW.QMessageBox.Yes | QW.QMessageBox.No,
        )
        == QW.QMessageBox.Yes
    )
    if not confirmed:
        return

    objs = self.objview.get_sel_objects(include_groups=True)
    for obj in objs:
        # Remove all table and geometry results using adapter methods
        TableAdapter.remove_all_from(obj)
        GeometryAdapter.remove_all_from(obj)
        if obj is self.objview.get_current_object():
            self.objprop.update_properties_from(obj)
    # Update action states to reflect the removal
    sel_groups = self.objview.get_sel_groups()
    self.acthandler.selected_objects_changed(sel_groups, objs)
    self.refresh_plot("selected", True, False)
[docs]
def add_label_with_title(
    self, title: str | None = None, ignore_msg: bool = True
) -> None:
    """Add a title label to the plot of each selected object.

    Args:
        title: Label title. Defaults to None.
         If None, the title is the object title.
        ignore_msg: If True, do not show the information message. Defaults to True.
         If False, show a message box to inform the user that the label has been
         added as an annotation, and that it can be edited or removed using the
         annotation editing window.
    """
    for selected in self.objview.get_sel_objects(include_groups=True):
        create_adapter_from_object(selected).add_label_with_title(title=title)

    # The message is skipped if it was disabled in the configuration, if the
    # caller asked to ignore it, or in unattended mode
    skip_message = (
        Conf.view.ignore_title_insertion_msg.get(False)
        or ignore_msg
        or execenv.unattended
    )
    if not skip_message:
        answer = QW.QMessageBox.information(
            self,
            _("Annotation added"),
            _(
                "The label has been added as an annotation. "
                "You can edit or remove it using the annotation editing window."
                "<br><br>"
                "Choosing to ignore this message will prevent it "
                "from being displayed again."
            ),
            QW.QMessageBox.Ok | QW.QMessageBox.Ignore,
        )
        if answer == QW.QMessageBox.Ignore:
            # Remember the choice so that the message is not shown again
            Conf.view.ignore_title_insertion_msg.set(True)
    self.refresh_plot("selected", True, False)