Source code for datalab.gui.panel.base

# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.

"""
.. Base panel objects (see parent package :mod:`datalab.gui.panel`)
"""

# pylint: disable=invalid-name  # Allows short reference names like x, y, ...

from __future__ import annotations

import abc
import glob
import os
import os.path as osp
import re
import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Generator, Generic, Literal, Type

import guidata.dataset as gds
import guidata.dataset.qtwidgets as gdq
import h5py
import numpy as np
import plotpy.io
from guidata.configtools import get_icon
from guidata.dataset import restore_dataset, update_dataset
from guidata.qthelpers import add_actions, create_action, exec_dialog
from plotpy.plot import BasePlot, BasePlotOptions, PlotDialog, SyncPlotDialog
from plotpy.tools import ActionTool
from qtpy import QtCore as QC  # type: ignore[import]
from qtpy import QtWidgets as QW
from qtpy.compat import (
    getexistingdirectory,
    getopenfilename,
    getopenfilenames,
    getsavefilename,
)
from sigima.io import (
    read_annotations,
    read_metadata,
    read_roi,
    write_annotations,
    write_metadata,
    write_roi,
)
from sigima.io.base import get_file_extensions
from sigima.io.common.basename import format_basenames
from sigima.objects import (
    ImageObj,
    NewImageParam,
    SignalObj,
    TypeObj,
    TypeROI,
    create_image_from_param,
    create_signal,
    create_signal_from_param,
)
from sigima.objects.base import ROI_KEY
from sigima.params import SaveToDirectoryParam

from datalab import objectmodel
from datalab.adapters_metadata import (
    GeometryAdapter,
    ResultData,
    TableAdapter,
    create_resultdata_dict,
    show_resultdata,
)
from datalab.adapters_plotpy import create_adapter_from_object
from datalab.config import APP_NAME, Conf, _
from datalab.env import execenv
from datalab.gui import actionhandler, objectview
from datalab.gui.newobject import (
    CREATION_PARAMETERS_OPTION,
    NewSignalParam,
    extract_creation_parameters,
    insert_creation_parameters,
)
from datalab.gui.processor.base import (
    PROCESSING_PARAMETERS_OPTION,
    ProcessingParameters,
    extract_processing_parameters,
    insert_processing_parameters,
)
from datalab.gui.roieditor import TypeROIEditor
from datalab.objectmodel import ObjectGroup, get_short_id, get_uuid, set_uuid
from datalab.utils.qthelpers import (
    CallbackWorker,
    create_progress_bar,
    qt_long_callback,
    qt_try_except,
    qt_try_loadsave_file,
    save_restore_stds,
)
from datalab.widgets.textimport import TextImportWizard

if TYPE_CHECKING:
    from plotpy.items import CurveItem, LabelItem, MaskedXYImageItem
    from sigima.io.image import ImageIORegistry
    from sigima.io.signal import SignalIORegistry

    from datalab.gui import ObjItf
    from datalab.gui.main import DLMainWindow
    from datalab.gui.plothandler import ImagePlotHandler, SignalPlotHandler
    from datalab.gui.processor.image import ImageProcessor
    from datalab.gui.processor.signal import SignalProcessor
    from datalab.h5.native import NativeH5Reader, NativeH5Writer


# Metadata keys that should not be pasted when copying metadata between objects
METADATA_PASTE_EXCLUSIONS = {
    ROI_KEY,  # ROI has dedicated copy/paste operations
    "__uuid",  # Each object must have a unique identifier
    f"__{PROCESSING_PARAMETERS_OPTION}",  # Object-specific processing history
    f"__{CREATION_PARAMETERS_OPTION}",  # Object-specific creation parameters
}


[docs] def is_plot_item_serializable(item: Any) -> bool: """Return True if plot item is serializable""" try: plotpy.io.item_class_from_name(item.__class__.__name__) return True except AssertionError: return False
[docs] def is_hdf5_file(filename: str, check_content: bool = False) -> bool: """Return True if filename has an HDF5 extension or is an HDF5 file. Args: filename: Path to the file to check check_content: If True, also attempts to open the file to verify it's a valid HDF5 file. If False, only checks the file extension. Returns: True if the file is (likely) an HDF5 file, False otherwise. """ # First, check by extension (fast) has_hdf5_extension = filename.lower().endswith((".h5", ".hdf5", ".hdf", ".he5")) if not check_content: return has_hdf5_extension # If checking content, try to open as HDF5 file if has_hdf5_extension: return True # Trust common HDF5 extensions # For other extensions, attempt to open the file to verify it's HDF5 try: with h5py.File(filename, "r"): return True except (OSError, IOError, ValueError): # Not a valid HDF5 file return False
[docs] @dataclass class ProcessingReport: """Report of processing operation Args: success: True if processing succeeded obj_uuid: UUID of the processed object message: Optional message (error or info) """ success: bool obj_uuid: str | None = None message: str | None = None
[docs] class ObjectProp(QW.QWidget): """Object handling panel properties Args: panel: parent data panel objclass: class of the object handled by the panel (SignalObj or ImageObj) """ def __init__(self, panel: BaseDataPanel, objclass: SignalObj | ImageObj) -> None: super().__init__(panel) # Create the tab widget self.tabwidget = QW.QTabWidget(self) self.tabwidget.setTabBarAutoHide(True) self.tabwidget.setTabPosition(QW.QTabWidget.West) self.panel = panel self.objclass = objclass # Object creation tab self.creation_param_editor: gdq.DataSetEditGroupBox | None = None self.current_creation_obj: SignalObj | ImageObj | None = None self.creation_scroll: QW.QScrollArea | None = None # Object processing tab self.processing_param_editor: gdq.DataSetEditGroupBox | None = None self.current_processing_obj: SignalObj | ImageObj | None = None self.processing_scroll: QW.QScrollArea | None = None # Properties tab self.properties = gdq.DataSetEditGroupBox("", objclass) self.properties.SIG_APPLY_BUTTON_CLICKED.connect(panel.properties_changed) self.properties.setEnabled(False) self.__original_values: dict[str, Any] = {} # Create Analysis and History widgets font = Conf.proc.small_mono_font.get_font() self.processing_history = QW.QTextEdit() self.processing_history.setReadOnly(True) self.processing_history.setFont(font) self.analysis_parameters = QW.QTextEdit() self.analysis_parameters.setReadOnly(True) self.analysis_parameters.setFont(font) # Track newly created objects to show Creation tab only once self._newly_created_obj_uuid: str | None = None # Track when analysis results were just computed self._fresh_analysis_obj_uuid: str | None = None # Track when object was just processed (1-to-1) self._fresh_processing_obj_uuid: str | None = None self.tabwidget.addTab( self.processing_history, get_icon("history.svg"), _("History") ) self.tabwidget.addTab( self.analysis_parameters, get_icon("analysis.svg"), _("Analysis parameters") ) self.tabwidget.addTab( self.properties, get_icon("properties.svg"), _("Properties") ) self.processing_history.textChanged.connect(self._update_tab_visibility) self.analysis_parameters.textChanged.connect(self._update_tab_visibility) # Create vertical layout for the container layout = QW.QVBoxLayout(self) layout.setContentsMargins(0, 0, 0, 0) layout.setSpacing(0) # Add tab widget and button area to main layout layout.addWidget(self.tabwidget) # Here we could add another widget or layout if needed (in DataLab v0.20, we # had a permanent button area here, but it was removed to avoid clutter) def _update_tab_visibility(self) -> None: """Update visibility of tabs based on their content.""" for textedit in (self.processing_history, self.analysis_parameters): tab_index = self.tabwidget.indexOf(textedit) if tab_index >= 0: has_content = bool(textedit.toPlainText().strip()) self.tabwidget.setTabVisible(tab_index, has_content)
[docs] def display_analysis_parameters(self, obj: SignalObj | ImageObj) -> bool: """Set analysis parameter label. Args: obj: Signal or Image object Returns: True if analysis parameters were found and displayed, False otherwise. """ text = "" # Iterate through all result adapters and extract parameter info for adapter_class in (GeometryAdapter, TableAdapter): for adapter in adapter_class.iterate_from_obj(obj): param = adapter.get_param() if param is not None: if text: text += "<br><br>" # Get function name for context func_name = adapter.func_name if func_name: # Add function name as a header for better context param.set_comment( "(" + _("Parameters for function `%s`") % func_name + ")" ) text += param.to_html() self.analysis_parameters.setText(text) return bool(text)
def _build_processing_history(self, obj: SignalObj | ImageObj) -> str: """Build processing history as a simple text list. Args: obj: Signal or Image object Returns: Processing history as text """ history_items = [] current_obj = obj max_depth = 20 # Prevent infinite loops # Walk backwards through processing chain, collecting items while current_obj is not None and len(history_items) < max_depth: proc_params = extract_processing_parameters(current_obj) if proc_params is None: # Check for creation parameters creation_params = extract_creation_parameters(current_obj) if creation_params is not None: text = f"{_('Created')}: {creation_params.title}" history_items.append(text) else: history_items.append(_("Original object")) break # Skip 1-to-0 operations (analysis) as they don't transform the object # They just add metadata, so they shouldn't appear in processing history if proc_params.pattern == "1-to-0": # For 1-to-0 operations, there's no processing history to show # (they analyze but don't transform the object) # Check if there's any earlier processing creation_params = extract_creation_parameters(current_obj) if creation_params is not None: text = f"{_('Created')}: {creation_params.title}" history_items.append(text) else: history_items.append(_("Original object")) break # Add current processing step func_name = proc_params.func_name.replace("_", " ").title() history_items.append(func_name) # Try to find source object if proc_params.source_uuid: current_obj = self.panel.mainwindow.find_object_by_uuid( proc_params.source_uuid ) if current_obj is None: history_items.append(_("(source deleted)")) break else: if proc_params.source_uuids: # Multiple sources (n-to-1 or 2-to-1 pattern) history_items.append(_("(multiple sources)")) break if len(history_items) <= 1: return "" # Shows the history tab only when there is some history # Reverse to show from oldest to newest, then add indentation history_items.reverse() history_lines = [] for i, item in enumerate(history_items): indent = " " * i history_lines.append(f"{indent}└─ {item}") return "\n".join(history_lines)
[docs] def display_processing_history(self, obj: SignalObj | ImageObj) -> bool: """Display processing history. Args: obj: Signal or Image object Returns: True if processing history was found and displayed, False otherwise. """ history_text = self._build_processing_history(obj) self.processing_history.setText(history_text) return bool(history_text)
def __update_properties_dataset(self, obj: SignalObj | ImageObj) -> None: """Update properties dataset from signal/image dataset Args: obj: Signal or Image object """ dataset: SignalObj | ImageObj = self.properties.dataset dataset.set_defaults() update_dataset(dataset, obj) self.properties.get() self.properties.apply_button.setEnabled(False)
[docs] def update_properties_from( self, obj: SignalObj | ImageObj | None = None, force_tab: Literal["creation", "processing", "analysis", None] | None = None, ) -> None: """Update properties panel (properties, creation, processing) from object. Args: obj: Signal or Image object force_tab: Force a specific tab to be current """ self.properties.setDisabled(obj is None) if obj is None: obj = self.objclass() # Update the properties dataset self.__update_properties_dataset(obj) # Store original values to detect which properties have changed # (using `restore_dataset` to convert the dataset to a dictionary) self.__original_values = {} restore_dataset(self.properties.dataset, self.__original_values) # Display analysis parameters and processing history has_analysis_parameters = self.display_analysis_parameters(obj) self.display_processing_history(obj) # Remove only Creation and Processing tabs (dynamic tabs) # Use widget references instead of text labels for reliable identification if self.creation_scroll is not None: index = self.tabwidget.indexOf(self.creation_scroll) if index >= 0: self.tabwidget.removeTab(index) if self.processing_scroll is not None: index = self.tabwidget.indexOf(self.processing_scroll) if index >= 0: self.tabwidget.removeTab(index) # Reset references for dynamic tabs self.creation_param_editor = None self.current_creation_obj = None self.creation_scroll = None self.processing_param_editor = None self.current_processing_obj = None self.processing_scroll = None # Setup Creation and Processing tabs (if applicable) has_creation_tab = False has_processing_tab = False if obj is not None: has_creation_tab = self.setup_creation_tab(obj) has_processing_tab = self.setup_processing_tab(obj) # Processing tab setup # Trigger visibility update for History and Analysis parameters tabs # (will be called via textChanged signals, but we call explicitly # here to ensure initial state is correct) self._update_tab_visibility() # Determine which tab to show based on force_tab parameter: # - If force_tab="creation" and Creation tab exists, show it # - If force_tab="processing" and Processing tab exists, show it # - If force_tab="analysis" and Analysis tab has content, show it # - Otherwise, always show Properties tab (default behavior) if force_tab == "creation" and has_creation_tab: self.tabwidget.setCurrentWidget(self.creation_scroll) elif force_tab == "processing" and has_processing_tab: self.tabwidget.setCurrentWidget(self.processing_scroll) elif force_tab == "analysis" and has_analysis_parameters: self.tabwidget.setCurrentWidget(self.analysis_parameters) else: # Default: always show Properties tab when switching objects self.tabwidget.setCurrentWidget(self.properties)
[docs] def mark_as_newly_created(self, obj: SignalObj | ImageObj) -> None: """Mark object to show Creation tab on next selection. Args: obj: Object to mark """ self._newly_created_obj_uuid = get_uuid(obj)
[docs] def mark_as_freshly_processed(self, obj: SignalObj | ImageObj) -> None: """Mark object to show Processing tab on next selection. Args: obj: Object to mark """ self._fresh_processing_obj_uuid = get_uuid(obj)
[docs] def mark_as_fresh_analysis(self, obj: SignalObj | ImageObj) -> None: """Mark object to show Analysis tab on next selection. Args: obj: Object to mark """ self._fresh_analysis_obj_uuid = get_uuid(obj)
[docs] def get_changed_properties(self) -> dict[str, Any]: """Get dictionary of properties that have changed from original values. Returns: Dictionary mapping property names to their new values, containing only the properties that were modified by the user. """ dataset = self.properties.dataset changed = {} # Get current values as a dictionary current_values = {} restore_dataset(dataset, current_values) # Compare with original values for key, current_value in current_values.items(): original_value = self.__original_values.get(key) # Check if value has changed if not self._values_equal(current_value, original_value): changed[key] = current_value return changed
[docs] def update_original_values(self) -> None: """Update the stored original values to the current dataset values. This should be called after applying changes to reset the baseline for detecting future changes. """ dataset = self.properties.dataset self.__original_values = {} restore_dataset(dataset, self.__original_values)
@staticmethod def _values_equal(val1: Any, val2: Any) -> bool: """Compare two values, handling special cases like numpy arrays. Args: val1: first value val2: second value Returns: True if values are equal """ # Handle numpy arrays if isinstance(val1, np.ndarray) or isinstance(val2, np.ndarray): if not isinstance(val1, np.ndarray) or not isinstance(val2, np.ndarray): return False return np.array_equal(val1, val2) # Handle regular comparison return val1 == val2
[docs] def setup_creation_tab( self, obj: SignalObj | ImageObj, set_current: bool = False ) -> bool: """Setup the Creation tab with parameter editor for interactive object creation. Args: obj: Signal or Image object set_current: If True, set the Creation tab as current after creation Returns: True if Creation tab was set up, False otherwise """ param = extract_creation_parameters(obj) if param is None: return False # Create parameter editor widget using the actual parameter class # (which is a subclass of NewSignalParam or NewImageParam) editor = gdq.DataSetEditGroupBox(_("Creation Parameters"), param.__class__) update_dataset(editor.dataset, param) editor.get() # Connect Apply button to recreation handler editor.SIG_APPLY_BUTTON_CLICKED.connect(self.apply_creation_parameters) editor.set_apply_button_state(False) # Store reference to be able to retrieve it later self.creation_param_editor = editor self.current_creation_obj = obj # Remove existing Creation tab if it exists if self.creation_scroll is not None: index = self.tabwidget.indexOf(self.creation_scroll) if index >= 0: self.tabwidget.removeTab(index) # Set the parameter editor as the scroll area widget # Creation tab is always at index 0 (before all other tabs) self.creation_scroll = QW.QScrollArea() self.creation_scroll.setWidgetResizable(True) self.creation_scroll.setWidget(editor) icon_name = "new_sig.svg" if isinstance(obj, SignalObj) else "new_ima.svg" self.tabwidget.insertTab( 0, self.creation_scroll, get_icon(icon_name), _("Creation") ) # Set as current tab if requested if set_current: self.tabwidget.setCurrentWidget(self.creation_scroll) return True
[docs] def apply_creation_parameters(self) -> None: """Apply creation parameters: recreate object with updated parameters.""" editor = self.creation_param_editor if editor is None or self.current_creation_obj is None: return if isinstance(self.current_creation_obj, SignalObj): otext = _("Signal was modified in-place.") else: otext = _("Image was modified in-place.") text = f"⚠️ {otext} ⚠️ " text += _( "If computation were performed based on this object, " "they may need to be redone." ) self.panel.SIG_STATUS_MESSAGE.emit(text, 20000) # Recreate object with new parameters # (serialization is done automatically in create_signal/image_from_param) param = editor.dataset try: if isinstance(self.current_creation_obj, SignalObj): new_obj = create_signal_from_param(param) else: # ImageObj new_obj = create_image_from_param(param) except Exception as exc: # pylint: disable=broad-exception-caught if execenv.unattended: raise exc QW.QMessageBox.warning( self, _("Error"), _("Failed to recreate object with new parameters:\n%s") % str(exc), ) return # Update the current object in-place obj_uuid = get_uuid(self.current_creation_obj) self.current_creation_obj.title = new_obj.title if isinstance(self.current_creation_obj, SignalObj): self.current_creation_obj.xydata = new_obj.xydata else: # ImageObj self.current_creation_obj.data = new_obj.data # Invalidate ROI mask cache when image dimensions change # (the mask is computed based on image shape, so it must be recomputed) self.current_creation_obj.invalidate_maskdata_cache() # Update metadata with new creation parameters insert_creation_parameters(self.current_creation_obj, param) # Auto-recompute analysis if the object had analysis parameters # Since the data has changed, any analysis results are now invalid # Use the processor for the current object's type obj_processor = self.__get_processor_associated_to(self.current_creation_obj) obj_processor.auto_recompute_analysis(self.current_creation_obj) # Update the tree view item (to show new title if it changed) self.panel.objview.update_item(obj_uuid) # Refresh only the plot, not the entire panel # (avoid calling selection_changed which would trigger a full refresh # of the Properties tab and could cause recursion issues) self.panel.refresh_plot(obj_uuid, update_items=True, force=True) # Update the Properties tab to reflect the new object properties # (e.g., data type, dimensions, etc.) self.__update_properties_dataset(self.current_creation_obj) # Refresh the Creation tab with the new parameters # Use QTimer to defer this until after the current event is processed # Set the Creation tab as current to keep it visible after refresh QC.QTimer.singleShot( 0, lambda: self.setup_creation_tab( self.current_creation_obj, set_current=True ), )
[docs] def setup_processing_tab( self, obj: SignalObj | ImageObj, reset_params: bool = True, set_current: bool = False, ) -> bool: """Setup the Processing tab with parameter editor for re-processing. Args: obj: Signal or Image object reset_params: If True, call update_from_obj() to reset parameters from source object. If False, use parameters as stored in metadata. set_current: If True, set the Processing tab as current after creation Returns: True if Processing tab was set up, False otherwise """ # Extract processing parameters proc_params = extract_processing_parameters(obj) if proc_params is None: return False # Check if the pattern type is 1-to-1 (only interactive pattern) if proc_params.pattern != "1-to-1": return False # Store reference to be able to retrieve it later self.current_processing_obj = obj # Check if object has processing parameter param = proc_params.param if param is None: return False # Skip interactive processing for list of parameters # (e.g., ROI extraction, erase operations) if isinstance(param, list): return False # Eventually call the `update_from_obj` method to properly initialize # the parameter object from the current object state. # Only do this when reset_params is True (initial setup), not when # refreshing after user has modified parameters. if reset_params and hasattr(param, "update_from_obj"): # Warning: the `update_from_obj` method takes the input object as argument, # not the output object (`obj` is the processed object here): # Retrieve the input object from the source UUID if proc_params.source_uuid is not None: source_obj = self.panel.mainwindow.find_object_by_uuid( proc_params.source_uuid ) if source_obj is not None: param.update_from_obj(source_obj) # Create parameter editor widget editor = gdq.DataSetEditGroupBox( _("Processing Parameters"), param.__class__, wordwrap=True ) update_dataset(editor.dataset, param) editor.get() # Connect Apply button to reprocessing handler editor.SIG_APPLY_BUTTON_CLICKED.connect(self.apply_processing_parameters) editor.set_apply_button_state(False) # Store reference to be able to retrieve it later self.processing_param_editor = editor # Remove existing Processing tab if it exists if self.processing_scroll is not None: index = self.tabwidget.indexOf(self.processing_scroll) if index >= 0: self.tabwidget.removeTab(index) # Processing tab comes after Creation tab (if it exists) # Find the correct insertion index: after Creation (index 0) if it exists, # otherwise at index 0 has_creation = ( self.creation_scroll is not None and self.tabwidget.indexOf(self.creation_scroll) >= 0 ) insert_index = 1 if has_creation else 0 # Create new processing scroll area and tab self.processing_scroll = QW.QScrollArea() self.processing_scroll.setWidgetResizable(True) self.processing_scroll.setHorizontalScrollBarPolicy(QC.Qt.ScrollBarAlwaysOff) self.processing_scroll.setSizePolicy( QW.QSizePolicy.Expanding, QW.QSizePolicy.Preferred ) self.processing_scroll.setWidget(editor) self.tabwidget.insertTab( insert_index, self.processing_scroll, get_icon("libre-tech-ram.svg"), _("Processing"), ) # Set as current tab if requested if set_current: self.tabwidget.setCurrentWidget(self.processing_scroll) return True
def __get_processor_associated_to( self, obj: SignalObj | ImageObj ) -> SignalProcessor | ImageProcessor: """Get the processor associated to the given object type. Args: obj: Signal or Image object Returns: Processor associated to the object's type """ assert isinstance(obj, (SignalObj, ImageObj)) if isinstance(obj, SignalObj): return self.panel.mainwindow.signalpanel.processor return self.panel.mainwindow.imagepanel.processor
[docs] def apply_processing_parameters( self, obj: SignalObj | ImageObj | None = None, interactive: bool = True ) -> ProcessingReport: """Apply processing parameters: re-run processing with updated parameters. Args: obj: Signal or Image object to reprocess. If None, uses the current object. interactive: If True, show progress and error messages in the UI. Returns: ProcessingReport with success status, object UUID, and optional message. """ if execenv.unattended: interactive = False report = ProcessingReport(success=False) editor = self.processing_param_editor obj = obj or self.current_processing_obj if obj is None: report.message = _("No processing object available.") return report report.obj_uuid = get_uuid(obj) # Extract processing parameters proc_params = extract_processing_parameters(obj) if proc_params is None: report.message = _("Processing metadata is incomplete.") if interactive: QW.QMessageBox.critical(self, _("Error"), report.message) return report # Check if source object still exists if proc_params.source_uuid is None: report.message = _( "Processing metadata is incomplete (missing source UUID)." ) if interactive: QW.QMessageBox.critical(self, _("Error"), report.message) return report # Find source object source_obj = self.panel.mainwindow.find_object_by_uuid(proc_params.source_uuid) if source_obj is None: report.message = _("Source object no longer exists.") if interactive: QW.QMessageBox.critical( self, _("Error"), report.message + "\n\n" + _( "The object that was used to create this processed object " "has been deleted and cannot be used for reprocessing." ), ) return report # Get updated parameters from editor param = editor.dataset if editor is not None else proc_params.param # For cross-panel computations, we need to use the processor from the panel # that owns the source object (e.g., radial_profile is in ImageProcessor) source_processor = self.__get_processor_associated_to(source_obj) # Recompute using the dedicated method (with multiprocessing support) try: new_obj = source_processor.recompute_1_to_1( proc_params.func_name, source_obj, param ) except Exception as exc: # pylint: disable=broad-exception-caught report.message = _("Failed to reprocess object:\n%s") % str(exc) if interactive: QW.QMessageBox.warning(self, _("Error"), report.message) return report if new_obj is None: # User cancelled the operation report.message = _("Processing was cancelled.") else: report.success = True # Update the current object in-place with data from new object obj.title = new_obj.title if isinstance(obj, SignalObj): obj.xydata = new_obj.xydata else: # ImageObj obj.data = new_obj.data # Invalidate ROI mask cache when image dimensions may have changed # (the mask is computed based on image shape, so it must be recomputed) obj.invalidate_maskdata_cache() # Update metadata with new processing parameters updated_proc_params = ProcessingParameters( func_name=proc_params.func_name, pattern=proc_params.pattern, param=param, source_uuid=proc_params.source_uuid, ) insert_processing_parameters(obj, updated_proc_params) # Auto-recompute analysis if the object had analysis parameters # Since the data has changed, any analysis results are now invalid # Use the processor for the current object's type (not source object's type) obj_processor = self.__get_processor_associated_to(obj) obj_processor.auto_recompute_analysis(obj) # Update the tree view item and refresh plot obj_uuid = get_uuid(obj) self.panel.objview.update_item(obj_uuid) self.panel.refresh_plot(obj_uuid, update_items=True, force=True) # Update the Properties tab to reflect the new object properties # (e.g., data type, dimensions, etc.) self.__update_properties_dataset(obj) # Refresh the Processing tab with the new parameters # Don't reset parameters from source object - keep the user's values # Set the Processing tab as current to keep it visible after refresh QC.QTimer.singleShot( 0, lambda: self.setup_processing_tab( obj, reset_params=False, set_current=True ), ) if isinstance(obj, SignalObj): report.message = _("Signal was reprocessed.") else: report.message = _("Image was reprocessed.") self.panel.SIG_STATUS_MESSAGE.emit("✅ " + report.message, 5000) return report
[docs] class AbstractPanelMeta(type(QW.QSplitter), abc.ABCMeta): """Mixed metaclass to avoid conflicts"""
[docs] class AbstractPanel(QW.QSplitter, metaclass=AbstractPanelMeta): """Object defining DataLab panel interface, based on a vertical QSplitter widget A panel handle an object list (objects are signals, images, macros...). Each object must implement ``datalab.gui.ObjItf`` interface """ H5_PREFIX = "" SIG_OBJECT_ADDED = QC.Signal() SIG_OBJECT_REMOVED = QC.Signal() @abc.abstractmethod def __init__(self, parent): super().__init__(QC.Qt.Vertical, parent) self.setObjectName(self.__class__.__name__[0].lower()) # Check if the class implements __len__, __getitem__ and __iter__ for method in ("__len__", "__getitem__", "__iter__"): if not hasattr(self, method): raise NotImplementedError( f"Class {self.__class__.__name__} must implement method {method}" ) # pylint: disable=unused-argument
[docs] def get_serializable_name(self, obj: ObjItf) -> str: """Return serializable name of object""" title = re.sub("[^-a-zA-Z0-9_.() ]+", "", obj.title.replace("/", "_")) name = f"{get_short_id(obj)}: {title}" return name
[docs] def serialize_object_to_hdf5(self, obj: ObjItf, writer: NativeH5Writer) -> None: """Serialize object to HDF5 file""" with writer.group(self.get_serializable_name(obj)): obj.serialize(writer)
[docs] def deserialize_object_from_hdf5( self, reader: NativeH5Reader, name: str, reset_all: bool = False ) -> ObjItf: """Deserialize object from a HDF5 file Args: reader: HDF5 reader name: Object name in HDF5 file reset_all: If True, preserve original UUIDs (workspace reload). If False, regenerate UUIDs (importing objects). """ with reader.group(name): obj = self.create_object() obj.deserialize(reader) # Only regenerate UUIDs when importing objects (reset_all=False). # When reopening a workspace (reset_all=True), preserve original UUIDs # so that processing parameter references (source_uuid, source_uuids) # remain valid and features like "Show source" and "Recompute" work. # When importing, only regenerate UUID if it conflicts with an existing one. if not reset_all and isinstance(obj, (SignalObj, ImageObj, ObjectGroup)): if self.objmodel.has_uuid(get_uuid(obj)): set_uuid(obj) return obj
[docs] @abc.abstractmethod def serialize_to_hdf5(self, writer: NativeH5Writer) -> None: """Serialize whole panel to a HDF5 file"""
[docs] @abc.abstractmethod def deserialize_from_hdf5( self, reader: NativeH5Reader, reset_all: bool = False ) -> None: """Deserialize whole panel from a HDF5 file Args: reader: HDF5 reader reset_all: If True, preserve original UUIDs (workspace reload). If False, regenerate UUIDs (importing objects). """
[docs] @abc.abstractmethod def create_object(self) -> ObjItf: """Create and return object"""
[docs] @abc.abstractmethod def add_object(self, obj: ObjItf) -> None: """Add object to panel"""
[docs] @abc.abstractmethod def remove_all_objects(self): """Remove all objects""" self.SIG_OBJECT_REMOVED.emit()
[docs] class PasteMetadataParam(gds.DataSet): """Paste metadata parameters""" keep_roi = gds.BoolItem(_("Regions of interest"), default=True) keep_geometry = gds.BoolItem(_("Geometry results"), default=False).set_pos(col=1) keep_tables = gds.BoolItem(_("Table results"), default=False).set_pos(col=1) keep_other = gds.BoolItem(_("Other metadata"), default=True)
[docs] class NonModalInfoDialog(QW.QMessageBox): """Non-modal information message box with selectable text. This widget displays an information message in a message dialog box, allowing users to select and copy the text content. """ def __init__(self, parent: QW.QWidget, title: str, text: str) -> None: """Create a non-modal information message box with selectable text. Args: parent: The parent widget. title: The title of the message box. text: The text to display in the message box. """ super().__init__(parent) self.setIcon(QW.QMessageBox.Information) self.setWindowTitle(title) if re.search(r"<[a-zA-Z/][^>]*>", text): self.setTextFormat(QC.Qt.RichText) # type: ignore[attr-defined] self.setTextInteractionFlags( QC.Qt.TextBrowserInteraction # type: ignore[attr-defined] ) else: self.setTextFormat(QC.Qt.PlainText) # type: ignore[attr-defined] self.setTextInteractionFlags( QC.Qt.TextSelectableByMouse # type: ignore[attr-defined] | QC.Qt.TextSelectableByKeyboard # type: ignore[attr-defined] ) self.setText(text) self.setStandardButtons(QW.QMessageBox.Close) self.setDefaultButton(QW.QMessageBox.Close) # ! Necessary only on non-Windows platforms self.setWindowFlags(QC.Qt.Window) # type: ignore[attr-defined] self.setModal(False)
[docs] class SaveToDirectoryGUIParam(gds.DataSet, title=_("Save to directory")): """Save to directory parameters""" def __init__( self, objs: list[TypeObj] | None = None, extensions: list[str] | None = None ) -> None: super().__init__() self.__objs = objs or [] self.__extensions = extensions or []
[docs] def on_button_click( self: SaveToDirectoryGUIParam, _item: gds.ButtonItem, _value: None, parent: QW.QWidget, ) -> None: """Help button callback.""" text = "<br>".join( [ """Pattern accepts a Python format string. Standard Python format specifiers apply. Two extra modifiers are supported: 'upper' for uppercase and 'lower' for lowercase.""", "", "<b>Available placeholders:</b>", """ <table border="1" cellspacing="0" cellpadding="4"> <tr><th>Keyword</th><th>Description</th></tr> <tr><td>{title}</td><td>Title</td></tr> <tr><td>{index}</td><td>1-based index</td></tr> <tr><td>{count}</td><td>Total number of selected objects</td></tr> <tr><td>{xlabel}, {xunit}, {ylabel}, {yunit}</td> <td>Axis information for signals</td></tr> <tr><td>{metadata[key]}</td><td>Specific metadata value<br> <i>(direct {metadata} use is ignored)</i></td></tr> </table> """, "", "<b>Examples:</b>", """ <table border="1" cellspacing="0" cellpadding="4"> <tr><th>Pattern</th><th>Description</th></tr> <tr> <td>{index:03d}</td> <td>3-digit index with leading zeros</td> </tr> <tr> <td>{title:20.20}</td> <td>Title truncated to 20 characters</td> </tr> <tr> <td>{title:20.20upper}</td> <td>Title truncated to 20 characters, upper case</td> </tr> <tr> <td>{title:20.20lower}</td> <td>Title truncated to 20 characters, lower case</td> </tr> </table> """, ] ) NonModalInfoDialog(parent, "Pattern help", text).show()
[docs] def get_extension_choices(self, _item=None, _value=None): """Return list of available extensions for choice item.""" return [("." + ext, "." + ext, None) for ext in self.__extensions]
[docs] def build_filenames(self, objs: list[TypeObj] | None = None) -> list[str]: """Build filenames according to current parameters.""" objs = objs or self.__objs extension = self.extension if self.extension is not None else "" filenames = format_basenames(objs, self.basename + extension) used: set[str] = set() # Ensure all filenames are unique. for i, filename in enumerate(filenames): root, ext = osp.splitext(filename) filepath = osp.join(self.directory, filename) k = 1 while (filename in used) or (not self.overwrite and osp.exists(filepath)): filename = f"{root}_{k}{ext}" filepath = osp.join(self.directory, filename) k += 1 used.add(filename) filenames[i] = filename return filenames
[docs] def generate_filepath_obj_pairs( self, objs: list[TypeObj] ) -> Generator[tuple[str, TypeObj], None, None]: """Iterate over (filepath, object) pairs to be saved.""" for filename, obj in zip(self.build_filenames(objs), objs): yield osp.join(self.directory, filename), obj
[docs] def update_preview(self, _item=None, _value=None) -> None: """Update preview.""" try: filenames = self.build_filenames() preview_lines = [] for i, (obj, filename) in enumerate(zip(self.__objs, filenames), start=1): # Try to get short ID if object has been added to panel try: obj_id = get_short_id(obj) except (ValueError, KeyError): # Fallback to simple index for objects not yet in panel obj_id = str(i) preview_lines.append(f"{obj_id}: {filename}") self.preview = "\n".join(preview_lines) except (ValueError, KeyError, TypeError) as exc: # Handle formatting errors gracefully (e.g., incomplete format string) self.preview = f"Invalid pattern:{os.linesep}{exc}"
directory = gds.DirectoryItem(_("Directory"), default=Conf.main.base_dir.get()) basename = gds.StringItem( _("Basename pattern"), default="{title}", help=_("Python format string. See description for details."), ).set_prop("display", callback=update_preview) help = gds.ButtonItem(_("Help"), on_button_click, "MessageBoxInformation").set_pos( col=1 ) extension = gds.ChoiceItem(_("Extension"), get_extension_choices).set_prop( "display", callback=update_preview ) overwrite = gds.BoolItem( _("Overwrite"), default=False, help=_("Overwrite existing files") ).set_pos(col=1) preview = gds.TextItem( _("Preview"), default=None, regexp=r"^(?!Invalid).*" ).set_prop("display", readonly=True)
[docs] class AddMetadataParam( gds.DataSet, title=_("Add metadata"), comment=_( "Add a new metadata item to the selected objects.<br><br>" "The metadata key will be the same for all objects, " "but the value can use a pattern to generate different values.<br>" "Click the <b>Help</b> button for details on the pattern syntax.<br>" ), ): """Add metadata parameters""" def __init__(self, objs: list[TypeObj] | None = None) -> None: super().__init__() self.__objs = objs or []
[docs] def on_help_button_click( self: AddMetadataParam, _item: gds.ButtonItem, _value: None, parent: QW.QWidget, ) -> None: """Help button callback.""" text = "<br>".join( [ """Pattern accepts a Python format string. Standard Python format specifiers apply. Two extra modifiers are supported: 'upper' for uppercase and 'lower' for lowercase.""", "", "<b>Available placeholders:</b>", """ <table border="1" cellspacing="0" cellpadding="4"> <tr><th>Keyword</th><th>Description</th></tr> <tr><td>{title}</td><td>Title</td></tr> <tr><td>{index}</td><td>1-based index</td></tr> <tr><td>{count}</td><td>Total number of selected objects</td></tr> <tr><td>{xlabel}, {xunit}, {ylabel}, {yunit}</td> <td>Axis information for signals</td></tr> <tr><td>{metadata[key]}</td><td>Specific metadata value<br> <i>(direct {metadata} use is ignored)</i></td></tr> </table> """, "", "<b>Examples:</b>", """ <table border="1" cellspacing="0" cellpadding="4"> <tr><th>Pattern</th><th>Description</th></tr> <tr> <td>{index:03d}</td> <td>3-digit index with leading zeros</td> </tr> <tr> <td>{title:20.20}</td> <td>Title truncated to 20 characters</td> </tr> <tr> <td>{title:20.20upper}</td> <td>Title truncated to 20 characters, upper case</td> </tr> <tr> <td>{title:20.20lower}</td> <td>Title truncated to 20 characters, lower case</td> </tr> </table> """, ] ) NonModalInfoDialog(parent, "Pattern help", text).show()
[docs] def get_conversion_choices(self, _item=None, _value=None): """Return list of available conversion choices.""" return [ ("string", _("String"), None), ("float", _("Float"), None), ("int", _("Integer"), None), ("bool", _("Boolean"), None), ]
[docs] def build_values( self, objs: list[TypeObj] | None = None ) -> list[str | float | int | bool]: """Build values according to current parameters. Raises: ValueError: If a value cannot be converted to the target type. """ objs = objs or self.__objs # Generate values using the pattern raw_values = format_basenames(objs, self.value_pattern) # Convert values according to the selected conversion type converted_values = [] for i, value_str in enumerate(raw_values, start=1): if self.conversion == "string": converted_values.append(value_str) elif self.conversion == "float": try: converted_values.append(float(value_str)) except ValueError as exc: raise ValueError( f"Cannot convert value at index {i} to float: '{value_str}'" ) from exc elif self.conversion == "int": try: converted_values.append(int(value_str)) except ValueError as exc: raise ValueError( f"Cannot convert value at index {i} to integer: '{value_str}'" ) from exc elif self.conversion == "bool": # Convert to boolean: "true", "1", "yes" -> True, others -> False lower_val = value_str.lower() converted_values.append(lower_val in ("true", "1", "yes", "on")) return converted_values
[docs] def update_preview(self, _item=None, _value=None) -> None: """Update preview.""" try: values = self.build_values() preview_lines = [] for i, (obj, value) in enumerate(zip(self.__objs, values), start=1): # Try to get short ID if object has been added to panel try: obj_id = get_short_id(obj) except (ValueError, KeyError): # Fallback to simple index for objects not yet in panel obj_id = str(i) preview_lines.append(f"{obj_id}: {self.metadata_key} = {value!r}") self.preview = "\n".join(preview_lines) except ValueError as exc: # Handle conversion errors self.preview = f"Invalid conversion:{os.linesep}{exc}" except (KeyError, TypeError) as exc: # Handle formatting errors (e.g., incomplete format string) self.preview = f"Invalid pattern:{os.linesep}{exc}"
metadata_key = gds.StringItem( _("Metadata key"), default="custom_key", notempty=True, regexp=r"^[a-zA-Z_][a-zA-Z0-9_]*$", help=_("The key name for the metadata item"), ).set_prop("display", callback=update_preview) value_pattern = gds.StringItem( _("Value pattern"), default="{index}", help=_("Python format string. See description for details."), ).set_prop("display", callback=update_preview) help = gds.ButtonItem( _("Help"), on_help_button_click, "MessageBoxInformation" ).set_pos(col=1) conversion = gds.ChoiceItem( _("Conversion"), get_conversion_choices, default="string" ).set_prop("display", callback=update_preview) preview = gds.TextItem(_("Preview"), default="", regexp=r"^(?!Invalid).*").set_prop( "display", readonly=True )
[docs] class BaseDataPanel(AbstractPanel, Generic[TypeObj, TypeROI, TypeROIEditor]): """Object handling the item list, the selected item properties and plot""" PANEL_STR = "" # e.g. "Signal Panel" PANEL_STR_ID = "" # e.g. "signal" PARAMCLASS: TypeObj = None # Replaced in child object ANNOTATION_TOOLS = () MINDIALOGSIZE = (800, 600) MAXDIALOGSIZE = 0.95 # % of DataLab's main window size # Replaced by the right class in child object: IO_REGISTRY: SignalIORegistry | ImageIORegistry | None = None SIG_STATUS_MESSAGE = QC.Signal(str, int) # emitted by "qt_try_except" decorator SIG_REFRESH_PLOT = QC.Signal( str, bool, bool, bool, bool ) # Connected to PlotHandler.refresh_plot
[docs] @staticmethod @abc.abstractmethod def get_roi_class() -> Type[TypeROI]: """Return ROI class"""
[docs] @staticmethod @abc.abstractmethod def get_roieditor_class() -> Type[TypeROIEditor]: """Return ROI editor class"""
@abc.abstractmethod def __init__(self, parent: QW.QWidget) -> None: super().__init__(parent) self.mainwindow: DLMainWindow = parent self.objprop = ObjectProp(self, self.PARAMCLASS) self.objmodel = objectmodel.ObjectModel(f"g{self.PARAMCLASS.PREFIX}") self.objview = objectview.ObjectView(self, self.objmodel) self.objview.SIG_IMPORT_FILES.connect(self.handle_dropped_files) self.objview.populate_tree() self.plothandler: SignalPlotHandler | ImagePlotHandler = None self.processor: SignalProcessor | ImageProcessor = None self.acthandler: actionhandler.BaseActionHandler = None self.metadata_clipboard = {} self.annotations_clipboard: list[dict[str, Any]] = [] self.__roi_clipboard: TypeROI | None = None self.context_menu = QW.QMenu() self.__separate_views: dict[QW.QDialog, TypeObj] = {}
[docs] def closeEvent(self, event): """Reimplement QMainWindow method""" self.processor.close() super().closeEvent(event)
# ------AbstractPanel interface-----------------------------------------------------
[docs] def plot_item_parameters_changed( self, item: CurveItem | MaskedXYImageItem | LabelItem ) -> None: """Plot items changed: update metadata of all objects from plot items""" # Find the object corresponding to the plot item obj = self.plothandler.get_obj_from_item(item) if obj is not None: # Unselect the item in the plot so that we update the item parameters # in the right state (fix issue #184): item.unselect() # Ensure that item's parameters are up-to-date: item.param.update_param(item) # Update object metadata from plot item parameters create_adapter_from_object(obj).update_metadata_from_plot_item(item) if obj is self.objview.get_current_object(): self.objprop.update_properties_from(obj) self.plothandler.update_resultproperty_from_plot_item(item)
# pylint: disable=unused-argument
[docs] def plot_item_moved( self, item: LabelItem, x0: float, y0: float, x1: float, y1: float ) -> None: """Plot item moved: update metadata of all objects from plot items Args: item: Plot item x0: new x0 coordinate y0: new y0 coordinate x1: new x1 coordinate y1: new y1 coordinate """ self.plothandler.update_resultproperty_from_plot_item(item)
[docs] def serialize_object_to_hdf5(self, obj: TypeObj, writer: NativeH5Writer) -> None: """Serialize object to HDF5 file""" # Before serializing, update metadata from plot item parameters, in order to # save the latest visualization settings: try: item = self.plothandler[get_uuid(obj)] create_adapter_from_object(obj).update_metadata_from_plot_item(item) except KeyError: # Plot item has not been created yet (this happens when auto-refresh has # been disabled) pass super().serialize_object_to_hdf5(obj, writer)
[docs] def serialize_to_hdf5(self, writer: NativeH5Writer) -> None: """Serialize whole panel to a HDF5 file""" with writer.group(self.H5_PREFIX): for group in self.objmodel.get_groups(): with writer.group(self.get_serializable_name(group)): with writer.group("title"): writer.write_str(group.title) for obj in group.get_objects(): self.serialize_object_to_hdf5(obj, writer)
[docs] def deserialize_from_hdf5( self, reader: NativeH5Reader, reset_all: bool = False ) -> None: """Deserialize whole panel from a HDF5 file Args: reader: HDF5 reader reset_all: If True, preserve original UUIDs (workspace reload). If False, regenerate UUIDs (importing objects). """ with reader.group(self.H5_PREFIX): for name in reader.h5.get(self.H5_PREFIX, []): with reader.group(name): group = self.add_group("") with reader.group("title"): group.title = reader.read_str() for obj_name in reader.h5.get(f"{self.H5_PREFIX}/{name}", []): obj = self.deserialize_object_from_hdf5( reader, obj_name, reset_all ) self.add_object(obj, get_uuid(group), set_current=False) self.selection_changed()
def __len__(self) -> int: """Return number of objects""" return len(self.objmodel) def __getitem__(self, nb: int) -> TypeObj: """Return object from its number (1 to N)""" return self.objmodel.get_object_from_number(nb) def __iter__(self): """Iterate over objects""" return iter(self.objmodel)
[docs] def create_object(self) -> TypeObj: """Create object (signal or image) Returns: SignalObj or ImageObj object """ return self.PARAMCLASS() # pylint: disable=not-callable
[docs] @qt_try_except() def add_object( self, obj: TypeObj, group_id: str | None = None, set_current: bool = True, ) -> None: """Add object Args: obj: SignalObj or ImageObj object group_id: group id to which the object belongs. If None or empty string, the object is added to the current group. set_current: if True, set the added object as current """ if obj in self.objmodel: # Prevent adding the same object twice raise ValueError( f"Object {hex(id(obj))} already in panel. " f"The same object cannot be added twice: " f"please use a copy of the object." ) if group_id is None or group_id == "": group_id = self.objview.get_current_group_id() if group_id is None: groups = self.objmodel.get_groups() if groups: group_id = get_uuid(groups[0]) else: group_id = get_uuid(self.add_group("")) obj.check_data() self.objmodel.add_object(obj, group_id) # Mark this object as newly created to show Creation tab on first selection # BUT: Don't overwrite if this object is already marked as freshly processed # or has fresh analysis results (those take precedence) obj_uuid = get_uuid(obj) if ( obj_uuid != self.objprop._fresh_processing_obj_uuid and obj_uuid != self.objprop._fresh_analysis_obj_uuid ): self.objprop.mark_as_newly_created(obj) # Block signals to avoid updating the plot (unnecessary refresh) self.objview.blockSignals(True) self.objview.add_object_item(obj, group_id, set_current=set_current) self.objview.blockSignals(False) # Emit signal to ensure that the data panel is shown in the main window and # that the plot is updated (trigger a refresh of the plot) self.SIG_OBJECT_ADDED.emit() self.objview.update_tree()
[docs] def remove_all_objects(self) -> None: """Remove all objects""" # iterate over a copy of self.__separate_views dict keys to avoid RuntimeError: # dictionary changed size during iteration for dlg in list(self.__separate_views): dlg.done(QW.QDialog.DialogCode.Rejected) self.objmodel.clear() self.plothandler.clear() self.objview.populate_tree() self.refresh_plot("selected", True, False) super().remove_all_objects() # Update object properties panel to clear creation/processing tabs self.selection_changed()
# ---- Signal/Image Panel API ------------------------------------------------------
[docs] def setup_panel(self) -> None: """Setup panel""" self.acthandler.create_all_actions() self.processor.SIG_ADD_SHAPE.connect(self.plothandler.add_shapes) self.SIG_REFRESH_PLOT.connect(self.plothandler.refresh_plot) self.objview.SIG_SELECTION_CHANGED.connect(self.selection_changed) self.objview.SIG_ITEM_DOUBLECLICKED.connect( lambda oid: self.open_separate_view([oid]) ) self.objview.SIG_CONTEXT_MENU.connect(self.__popup_contextmenu) self.objprop.properties.SIG_APPLY_BUTTON_CLICKED.connect( self.properties_changed ) self.addWidget(self.objview) self.addWidget(self.objprop)
[docs] def refresh_plot( self, what: str, update_items: bool = True, force: bool = False, only_visible: bool = True, only_existing: bool = False, ) -> None: """Refresh plot. Args: what: string describing the objects to refresh. Valid values are "selected" (refresh the selected objects), "all" (refresh all objects), "existing" (refresh existing plot items), or an object uuid. update_items: if True, update the items. If False, only show the items (do not update them). Defaults to True. force: if True, force refresh even if auto refresh is disabled. Defaults to False. only_visible: if True, only refresh visible items. Defaults to True. Visible items are the ones that are not hidden by other items or the items except the first one if the option "Show first only" is enabled. This is useful for images, where the last image is the one that is shown. If False, all items are refreshed. only_existing: if True, only refresh existing items. Defaults to False. Existing items are the ones that have already been created and are associated to the object uuid. If False, create new items for the objects that do not have an item yet. Raises: ValueError: if `what` is not a valid value """ if what not in ("selected", "all", "existing") and not isinstance(what, str): raise ValueError(f"Invalid value for 'what': {what}") self.SIG_REFRESH_PLOT.emit( what, update_items, force, only_visible, only_existing )
[docs] def manual_refresh(self) -> None: """Manual refresh""" self.refresh_plot("selected", True, True)
[docs] def get_category_actions( self, category: actionhandler.ActionCategory ) -> list[QW.QAction]: # pragma: no cover """Return actions for category""" return self.acthandler.feature_actions.get(category, [])
[docs] def get_context_menu(self) -> QW.QMenu: """Update and return context menu""" # Note: For now, this is completely unnecessary to clear context menu everytime, # but implementing it this way could be useful in the future in menu contents # should take into account current object selection self.context_menu.clear() actions = self.get_category_actions(actionhandler.ActionCategory.CONTEXT_MENU) add_actions(self.context_menu, actions) return self.context_menu
def __popup_contextmenu(self, position: QC.QPoint) -> None: # pragma: no cover """Popup context menu at position""" menu = self.get_context_menu() menu.popup(position) # ------Creating, adding, removing objects------------------------------------------
[docs] def add_group(self, title: str, select: bool = False) -> objectmodel.ObjectGroup: """Add group Args: title: group title select: if True, select the group in the tree view. Defaults to False. Returns: Created group object """ group = self.objmodel.add_group(title) self.objview.add_group_item(group) if select: self.objview.select_groups([group]) return group
def __duplicate_individual_obj( self, oid: str, new_group_id: str | None = None, set_current: bool = True ) -> None: """Duplicate individual object""" obj = self.objmodel[oid] if new_group_id is None: new_group_id = self.objmodel.get_object_group_id(obj) self.add_object(obj.copy(), group_id=new_group_id, set_current=set_current)
[docs] def duplicate_object(self) -> None: """Duplication signal/image object""" if not self.mainwindow.confirm_memory_state(): return # Duplicate individual objects (exclusive with respect to groups) for oid in self.objview.get_sel_object_uuids(): self.__duplicate_individual_obj(oid, set_current=False) # Duplicate groups (exclusive with respect to individual objects) for group in self.objview.get_sel_groups(): new_group = self.add_group(group.title) for oid in self.objmodel.get_group_object_ids(get_uuid(group)): self.__duplicate_individual_obj( oid, get_uuid(new_group), set_current=False ) self.selection_changed(update_items=True)
[docs] def copy_metadata(self) -> None: """Copy object metadata""" obj = self.objview.get_sel_objects()[0] self.metadata_clipboard = obj.metadata.copy() # Rename geometry results to avoid conflicts when pasting to same object type new_pref = get_short_id(obj) + "_" self._rename_results_in_clipboard(new_pref) # Update action states (e.g., "Paste metadata" should now be enabled) self.selection_changed()
def _rename_results_in_clipboard(self, prefix: str) -> None: """Rename geometry and table results in clipboard to avoid conflicts. Args: prefix: Prefix to add to result titles """ for aclass in (GeometryAdapter, TableAdapter): result_keys = [ k for k, v in self.metadata_clipboard.items() if aclass.match(k, v) ] for dict_key in result_keys: try: # Get the result data result_data = self.metadata_clipboard[dict_key] # Update the title in the result data if isinstance(result_data, dict) and "title" in result_data: result_data = result_data.copy() # Don't modify original result_data["title"] = prefix + result_data["title"] # Create new key with updated title new_dict_key = dict_key.replace( aclass.META_PREFIX, aclass.META_PREFIX + prefix, 1 ) # Remove old entry and add new one del self.metadata_clipboard[dict_key] self.metadata_clipboard[new_dict_key] = result_data except (KeyError, ValueError, IndexError, TypeError): # If we can't process this result, leave it as is continue
[docs] def paste_metadata(self, param: PasteMetadataParam | None = None) -> None: """Paste metadata to selected object(s)""" if param is None: param = PasteMetadataParam( _("Paste metadata"), comment=_( "Select what to keep from the clipboard.<br><br>" "Result shapes and annotations, if kept, will be merged with " "existing ones. <u>All other metadata will be replaced</u>." ), ) if not param.edit(parent=self.parentWidget()): return metadata = {} if param.keep_roi and ROI_KEY in self.metadata_clipboard: metadata[ROI_KEY] = self.metadata_clipboard[ROI_KEY] if param.keep_geometry: for key, value in self.metadata_clipboard.items(): if GeometryAdapter.match(key, value): metadata[key] = value if param.keep_tables: for key, value in self.metadata_clipboard.items(): if TableAdapter.match(key, value): metadata[key] = value if param.keep_other: for key, value in self.metadata_clipboard.items(): if ( not GeometryAdapter.match(key, value) and not TableAdapter.match(key, value) and key not in METADATA_PASTE_EXCLUSIONS ): metadata[key] = value sel_objects = self.objview.get_sel_objects(include_groups=True) for obj in sorted(sel_objects, key=get_short_id, reverse=True): obj.update_metadata_from(metadata) # We have to do a special refresh in order to force the plot handler to update # all plot items, even the ones that are not visible (otherwise, image masks # would not be updated after pasting the metadata: see issue #123) self.refresh_plot( "selected", update_items=True, only_visible=False, only_existing=True )
[docs] def add_metadata(self, param: AddMetadataParam | None = None) -> None: """Add metadata item to selected object(s) Args: param: Add metadata parameters """ sel_objects = self.objview.get_sel_objects(include_groups=True) if not sel_objects: return if param is None: param = AddMetadataParam(sel_objects) # Restore settings from config saved_param = Conf.io.add_metadata_settings.get(default=AddMetadataParam()) update_dataset(param, saved_param) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=gds.DataItemValidationWarning) if not param.edit(parent=self.parentWidget(), wordwrap=False): return # Save settings to config Conf.io.add_metadata_settings.set(param) # Build values for all selected objects values = param.build_values(sel_objects) # Add metadata to each object for obj, value in zip(sel_objects, values): obj.metadata[param.metadata_key] = value # Refresh the plot to update any changes self.refresh_plot( "selected", update_items=True, only_visible=False, only_existing=True )
[docs] def copy_roi(self) -> None: """Copy regions of interest""" obj = self.objview.get_sel_objects()[0] self.__roi_clipboard = obj.roi.copy()
[docs] def paste_roi(self) -> None: """Paste regions of interest""" sel_objects = self.objview.get_sel_objects(include_groups=True) for obj in sel_objects: if obj.roi is None: obj.roi = self.__roi_clipboard.copy() else: obj.roi = obj.roi.combine_with(self.__roi_clipboard) self.selection_changed(update_items=True) self.refresh_plot( "selected", update_items=True, only_visible=False, only_existing=True )
[docs] def remove_object(self, force: bool = False) -> None: """Remove signal/image object Args: force: if True, remove object without confirmation. Defaults to False. """ sel_groups = self.objview.get_sel_groups() if sel_groups and not force and not execenv.unattended: answer = QW.QMessageBox.warning( self, _("Delete group(s)"), _("Are you sure you want to delete the selected group(s)?"), QW.QMessageBox.Yes | QW.QMessageBox.No, ) if answer == QW.QMessageBox.No: return sel_objects = self.objview.get_sel_objects(include_groups=True) for obj in sorted(sel_objects, key=get_short_id, reverse=True): dlg_list: list[QW.QDialog] = [] for dlg, obj_i in self.__separate_views.items(): if obj_i is obj: dlg_list.append(dlg) for dlg in dlg_list: dlg.done(QW.QDialog.DialogCode.Rejected) self.plothandler.remove_item(get_uuid(obj)) self.objview.remove_item(get_uuid(obj), refresh=False) self.objmodel.remove_object(obj) for group in sel_groups: self.objview.remove_item(get_uuid(group), refresh=False) self.objmodel.remove_group(group) self.objview.update_tree() self.selection_changed(update_items=True) self.SIG_OBJECT_REMOVED.emit()
[docs] def delete_all_objects(self) -> None: # pragma: no cover """Confirm before removing all objects""" if len(self) == 0: return if execenv.unattended: raise RuntimeError( "This method should not be executed in unattended mode: " "call remove_all_objects instead." ) answer = QW.QMessageBox.warning( self, _("Delete all"), _("Do you want to delete all objects (%s)?") % self.PANEL_STR, QW.QMessageBox.Yes | QW.QMessageBox.No, ) if answer == QW.QMessageBox.Yes: self.remove_all_objects()
[docs] def delete_metadata( self, refresh_plot: bool = True, keep_roi: bool | None = None ) -> None: """Delete metadata of selected objects Args: refresh_plot: Refresh plot. Defaults to True. keep_roi: Keep regions of interest, if any. Defaults to None (ask user). """ sel_objs = self.objview.get_sel_objects(include_groups=True) # Check if there are regions of interest first: roi_backup: dict[TypeObj, np.ndarray] = {} if any(obj.roi is not None for obj in sel_objs): if execenv.unattended and keep_roi is None: keep_roi = False elif keep_roi is None: answer = QW.QMessageBox.warning( self, _("Delete metadata"), _( "Some selected objects have regions of interest.<br>" "Do you want to delete them as well?" ), QW.QMessageBox.Yes | QW.QMessageBox.No | QW.QMessageBox.Cancel, ) if answer == QW.QMessageBox.Cancel: return keep_roi = answer == QW.QMessageBox.No if keep_roi: for obj in sel_objs: if obj.roi is not None: roi_backup[obj] = obj.roi # Delete metadata: for index, obj in enumerate(sel_objs): obj.reset_metadata_to_defaults() if not keep_roi: obj.mark_roi_as_changed() if obj in roi_backup: obj.roi = roi_backup[obj] if index == 0: self.selection_changed() # When calling object `reset_metadata_to_defaults` method, we removed all # metadata application options, among them the object number which is used # to compute the short ID of the object. # So we have to reset the short IDs of all objects in the model to recalculate # the object numbers: self.objmodel.reset_short_ids() if refresh_plot: # We have to do a special refresh in order to force the plot handler to # update all plot items, even the ones that are not visible (otherwise, # image masks would remained visible after deleting the ROI for example: # see issue #122) self.refresh_plot( "selected", update_items=True, only_visible=False, only_existing=True )
[docs] def add_annotations_from_items( self, items: list, refresh_plot: bool = True ) -> None: """Add object annotations (annotation plot items). Args: items: annotation plot items refresh_plot: refresh plot. Defaults to True. """ for obj in self.objview.get_sel_objects(include_groups=True): create_adapter_from_object(obj).add_annotations_from_items(items) if refresh_plot: self.refresh_plot("selected", True, False)
[docs] def update_metadata_view_settings(self) -> None: """Update metadata view settings""" def_dict = Conf.view.get_def_dict(self.__class__.__name__[:3].lower()) for obj in self.objmodel: obj.set_metadata_options_defaults(def_dict, overwrite=True) self.refresh_plot("all", True, False)
[docs] def copy_titles_to_clipboard(self) -> None: """Copy object titles to clipboard (for reproducibility)""" QW.QApplication.clipboard().setText(str(self.objview))
[docs] def new_group(self) -> None: """Create a new group""" # Open a message box to enter the group name group_name, ok = QW.QInputDialog.getText(self, _("New group"), _("Group name:")) if ok: self.add_group(group_name)
[docs] def rename_selected_object_or_group(self, new_name: str | None = None) -> None: """Rename selected object or group Args: new_name: new name (default: None, i.e. ask user) """ sel_objects = self.objview.get_sel_objects(include_groups=False) sel_groups = self.objview.get_sel_groups() if (not sel_objects and not sel_groups) or len(sel_objects) + len( sel_groups ) > 1: # Won't happen in the application, but could happen in tests or using the # API directly raise ValueError("Select one object or group to rename") if sel_objects: obj = sel_objects[0] if new_name is None: new_name, ok = QW.QInputDialog.getText( self, _("Rename object"), _("Object name:"), QW.QLineEdit.Normal, obj.title, ) if not ok: return obj.title = new_name self.objview.update_item(get_uuid(obj)) self.objprop.update_properties_from(obj) elif sel_groups: group = sel_groups[0] if new_name is None: new_name, ok = QW.QInputDialog.getText( self, _("Rename group"), _("Group name:"), QW.QLineEdit.Normal, group.title, ) if not ok: return group.title = new_name self.objview.update_item(get_uuid(group))
[docs] @abc.abstractmethod def get_newparam_from_current( self, newparam: NewSignalParam | NewImageParam | None = None ) -> NewSignalParam | NewImageParam | None: """Get new object parameters from the current object. Args: newparam: new object parameters. If None, create a new one. Returns: New object parameters """
[docs] @abc.abstractmethod def new_object( self, param: NewSignalParam | NewImageParam | None = None, edit: bool = False, add_to_panel: bool = True, ) -> TypeObj | None: """Create a new object (signal/image). Args: param: new object parameters edit: Open a dialog box to edit parameters (default: False). When False, the object is created with default parameters and creation parameters are stored in metadata for interactive editing. add_to_panel: Add object to panel (default: True) Returns: New object """
[docs] def set_current_object_title(self, title: str) -> None: """Set current object title""" obj = self.objview.get_current_object() obj.title = title self.objview.update_item(get_uuid(obj))
def __load_from_file( self, filename: str, create_group: bool = True, add_objects: bool = True ) -> list[SignalObj] | list[ImageObj]: """Open objects from file (signal/image), add them to DataLab and return them. Args: filename: file name create_group: if True, create a new group if more than one object is loaded. Defaults to True. If False, all objects are added to the current group. add_objects: if True, add objects to the panel. Defaults to True. Returns: New object or list of new objects """ worker = CallbackWorker(lambda worker: self.IO_REGISTRY.read(filename, worker)) objs = qt_long_callback(self, _("Reading objects from file"), worker, True) group_id = None if len(objs) > 1 and create_group: # Create a new group if more than one object is loaded group_id = get_uuid(self.add_group(osp.basename(filename))) with create_progress_bar( self, _("Adding objects to workspace"), max_=len(objs) - 1 ) as progress: for i_obj, obj in enumerate(objs): progress.setValue(i_obj + 1) if progress.wasCanceled(): break if add_objects: set_uuid(obj) # In case the object UUID was serialized in the file, # we need to reset it to a new UUID to avoid conflicts # (e.g. HDF5 file) self.add_object(obj, group_id=group_id, set_current=obj is objs[-1]) self.selection_changed() return objs def __save_to_file(self, obj: TypeObj, filename: str) -> None: """Save object to file (signal/image). Args: obj: object filename: file name """ self.IO_REGISTRY.write(filename, obj)
[docs] def load_from_directory(self, directory: str | None = None) -> list[TypeObj]: """Open objects from directory (signals or images, depending on the panel), add them to DataLab and return them. If the directory is not specified, ask the user to select a directory. Args: directory: directory name Returns: list of new objects """ if not self.mainwindow.confirm_memory_state(): return [] if directory is None: # pragma: no cover basedir = Conf.main.base_dir.get() with save_restore_stds(): directory = getexistingdirectory(self, _("Open"), basedir) if not directory: return [] folders = [ path for path in glob.glob(osp.join(directory, "**"), recursive=True) if osp.isdir(path) and len(os.listdir(path)) > 0 ] objs = [] with create_progress_bar( self, _("Scanning directory"), max_=len(folders) - 1 ) as progress: # Iterate over all subfolders in the directory: for i_path, path in enumerate(folders): progress.setValue(i_path + 1) if progress.wasCanceled(): break path = osp.normpath(path) fnames = sorted( [ osp.join(path, fname) for fname in os.listdir(path) if osp.isfile(osp.join(path, fname)) ] ) new_objs = self.load_from_files( fnames, create_group=False, add_objects=False, ignore_errors=True, ) if new_objs: objs += new_objs grp_name = osp.relpath(path, directory) if grp_name == ".": grp_name = osp.basename(path) grp = self.add_group(grp_name) for obj in new_objs: self.add_object(obj, group_id=get_uuid(grp), set_current=False) return objs
[docs] def load_from_files( self, filenames: list[str] | None = None, create_group: bool = False, add_objects: bool = True, ignore_errors: bool = False, ) -> list[TypeObj]: """Open objects from file (signals/images), add them to DataLab and return them. Args: filenames: File names create_group: if True, create a new group if more than one object is loaded for a single file. Defaults to False: all objects are added to the current group. add_objects: if True, add objects to the panel. Defaults to True. ignore_errors: if True, ignore errors when loading files. Defaults to False. Returns: list of new objects """ if not self.mainwindow.confirm_memory_state(): return [] if filenames is None: # pragma: no cover basedir = Conf.main.base_dir.get() filters = self.IO_REGISTRY.get_read_filters() with save_restore_stds(): filenames, _filt = getopenfilenames(self, _("Open"), basedir, filters) # Sort filenames to ensure consistent alphabetical order across all platforms filenames = sorted(filenames) objs = [] for filename in filenames: with qt_try_loadsave_file(self.parentWidget(), filename, "load"): Conf.main.base_dir.set(filename) try: objs += self.__load_from_file( filename, create_group=create_group, add_objects=add_objects ) except Exception as exc: # pylint: disable=broad-exception-caught if ignore_errors: # Ignore unknown file types pass else: raise exc return objs
[docs] def save_to_files(self, filenames: list[str] | str | None = None) -> None: """Save selected objects to files (signal/image). Args: filenames: File names """ objs = self.objview.get_sel_objects(include_groups=True) if filenames is None: # pragma: no cover filenames = [None] * len(objs) assert len(filenames) == len(objs), ( "Number of filenames must match number of objects" ) for index, obj in enumerate(objs): filename = filenames[index] if filename is None: basedir = Conf.main.base_dir.get() filters = self.IO_REGISTRY.get_write_filters() with save_restore_stds(): filename, _filt = getsavefilename( self, _("Save as"), basedir, filters ) if filename: with qt_try_loadsave_file(self.parentWidget(), filename, "save"): Conf.main.base_dir.set(filename) self.__save_to_file(obj, filename)
[docs] def save_to_directory(self, param: SaveToDirectoryParam | None = None) -> None: """Save signals or images to directory using a filename pattern. Opens a dialog to select the output directory, the basename pattern and the extension. Args: param: parameters. """ objs = self.objview.get_sel_objects(include_groups=True) if param is None: extensions = get_file_extensions(self.IO_REGISTRY.get_write_filters()) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=gds.DataItemValidationWarning) guiparam = SaveToDirectoryGUIParam(objs, extensions) # Restore settings from config saved_param = Conf.io.save_to_directory_settings.get( default=SaveToDirectoryParam() ) update_dataset(guiparam, saved_param) # Validate extension: set to first if None or not in available list # Note: extensions list has no dots, but guiparam.extension has dot extensions_with_dot = ["." + ext for ext in extensions] if ( guiparam.extension is None or guiparam.extension not in extensions_with_dot ): guiparam.extension = extensions_with_dot[0] if extensions else "" if not guiparam.edit(parent=self.parentWidget()): return param = SaveToDirectoryParam() update_dataset(param, guiparam) # Save settings to config Conf.io.save_to_directory_settings.set(param) Conf.main.base_dir.set(param.directory) with create_progress_bar(self, _("Saving..."), max_=len(objs)) as progress: for i, (path, obj) in enumerate(param.generate_filepath_obj_pairs(objs)): progress.setValue(i + 1) if progress.wasCanceled(): break with qt_try_loadsave_file(self.parentWidget(), path, "save"): self.__save_to_file(obj, path)
[docs] def handle_dropped_files(self, filenames: list[str] | None = None) -> None: """Handle dropped files Args: filenames: File names Returns: None """ dirnames = [fname for fname in filenames if osp.isdir(fname)] h5_fnames = [ fname for fname in filenames if is_hdf5_file(fname, check_content=True) ] other_fnames = sorted(list(set(filenames) - set(h5_fnames) - set(dirnames))) if dirnames: for dirname in dirnames: self.load_from_directory(dirname) if h5_fnames: self.mainwindow.open_h5_files(h5_fnames, import_all=True) if other_fnames: self.load_from_files(other_fnames)
[docs] def exec_import_wizard(self) -> None: """Execute import wizard""" wizard = TextImportWizard(self.PANEL_STR_ID, parent=self.parentWidget()) if exec_dialog(wizard): objs = wizard.get_objs() if objs: with create_progress_bar( self, _("Adding objects to workspace"), max_=len(objs) - 1 ) as progress: for idx, obj in enumerate(objs): progress.setValue(idx) QW.QApplication.processEvents() if progress.wasCanceled(): break self.add_object(obj)
[docs] def import_metadata_from_file(self, filename: str | None = None) -> None: """Import metadata from file (JSON). Args: filename: File name """ if filename is None: # pragma: no cover basedir = Conf.main.base_dir.get() with save_restore_stds(): filename, _filter = getopenfilename( self, _("Import metadata"), basedir, "*.dlabmeta" ) if filename: with qt_try_loadsave_file(self.parentWidget(), filename, "load"): Conf.main.base_dir.set(filename) obj = self.objview.get_sel_objects(include_groups=True)[0] obj.metadata = read_metadata(filename) self.refresh_plot("selected", True, False)
[docs] def export_metadata_from_file(self, filename: str | None = None) -> None: """Export metadata to file (JSON). Args: filename: File name """ obj = self.objview.get_sel_objects(include_groups=True)[0] if filename is None: # pragma: no cover basedir = Conf.main.base_dir.get() with save_restore_stds(): filename, _filt = getsavefilename( self, _("Export metadata"), basedir, "*.dlabmeta" ) if filename: with qt_try_loadsave_file(self.parentWidget(), filename, "save"): Conf.main.base_dir.set(filename) write_metadata(filename, obj.metadata)
[docs] def copy_annotations(self) -> None: """Copy annotations from selected object""" obj = self.objview.get_sel_objects(include_groups=True)[0] self.annotations_clipboard = obj.get_annotations() # Update action states (e.g., "Paste annotations" should now be enabled) self.selection_changed()
[docs] def paste_annotations(self) -> None: """Paste annotations to selected object(s)""" if not self.annotations_clipboard: return sel_objects = self.objview.get_sel_objects(include_groups=True) for obj in sel_objects: obj.set_annotations(self.annotations_clipboard) self.refresh_plot("selected", True, False) # Update action states (e.g., annotation-related actions should now be enabled) self.selection_changed()
[docs] def import_annotations_from_file(self, filename: str | None = None) -> None: """Import annotations from file (JSON). Args: filename: File name """ if filename is None: # pragma: no cover basedir = Conf.main.base_dir.get() with save_restore_stds(): filename, _filter = getopenfilename( self, _("Import annotations"), basedir, "*.dlabann" ) if filename: with qt_try_loadsave_file(self.parentWidget(), filename, "load"): Conf.main.base_dir.set(filename) obj = self.objview.get_sel_objects(include_groups=True)[0] annotations = read_annotations(filename) obj.set_annotations(annotations) self.refresh_plot("selected", True, False) # Update action states (annotation-related actions should now be enabled) self.selection_changed()
[docs] def export_annotations_from_file(self, filename: str | None = None) -> None: """Export annotations to file (JSON). Args: filename: File name """ obj = self.objview.get_sel_objects(include_groups=True)[0] if filename is None: # pragma: no cover basedir = Conf.main.base_dir.get() with save_restore_stds(): filename, _filt = getsavefilename( self, _("Export annotations"), basedir, "*.dlabann" ) if filename: with qt_try_loadsave_file(self.parentWidget(), filename, "save"): Conf.main.base_dir.set(filename) annotations = obj.get_annotations() write_annotations(filename, annotations)
[docs] def delete_annotations(self) -> None: """Delete all annotations from selected object(s)""" sel_objects = self.objview.get_sel_objects(include_groups=True) for obj in sel_objects: obj.clear_annotations() self.refresh_plot("selected", True, False) # Update action states (annotation-related actions should now be disabled) self.selection_changed()
[docs] def import_roi_from_file(self, filename: str | None = None) -> None: """Import regions of interest from file (JSON). Args: filename: File name """ if filename is None: # pragma: no cover basedir = Conf.main.base_dir.get() with save_restore_stds(): filename, _filter = getopenfilename( self, _("Import ROI"), basedir, "*.dlabroi" ) if filename: with qt_try_loadsave_file(self.parentWidget(), filename, "load"): Conf.main.base_dir.set(filename) obj = self.objview.get_sel_objects(include_groups=True)[0] roi = read_roi(filename) if obj.roi is None: obj.roi = roi else: obj.roi = obj.roi.combine_with(roi) self.selection_changed(update_items=True) self.refresh_plot("selected", True, False)
[docs] def export_roi_to_file(self, filename: str | None = None) -> None: """Export regions of interest to file (JSON). Args: filename: File name """ obj = self.objview.get_sel_objects(include_groups=True)[0] assert obj.roi is not None if filename is None: # pragma: no cover basedir = Conf.main.base_dir.get() with save_restore_stds(): filename, _filt = getsavefilename( self, _("Export ROI"), basedir, "*.dlabroi" ) if filename: with qt_try_loadsave_file(self.parentWidget(), filename, "save"): Conf.main.base_dir.set(filename) write_roi(filename, obj.roi)
# ------Refreshing GUI--------------------------------------------------------------
[docs] def selection_changed(self, update_items: bool = False) -> None: """Object selection changed: update object properties, refresh plot and update object view. Args: update_items: Update plot items (default: False) """ selected_objects = self.objview.get_sel_objects(include_groups=True) selected_groups = self.objview.get_sel_groups() # Determine which tab to show based on object state current_obj = self.objview.get_current_object() force_tab = None if current_obj is not None: obj_uuid = get_uuid(current_obj) # Show Creation tab for newly created objects (only once) if obj_uuid == self.objprop._newly_created_obj_uuid: force_tab = "creation" self.objprop._newly_created_obj_uuid = None # Show Processing tab for freshly processed objects (only once) elif obj_uuid == self.objprop._fresh_processing_obj_uuid: force_tab = "processing" self.objprop._fresh_processing_obj_uuid = None # Show Analysis tab for objects with fresh analysis results elif obj_uuid == self.objprop._fresh_analysis_obj_uuid: force_tab = "analysis" self.objprop._fresh_analysis_obj_uuid = None self.objprop.update_properties_from(current_obj, force_tab=force_tab) self.acthandler.selected_objects_changed(selected_groups, selected_objects) self.refresh_plot("selected", update_items, False)
[docs] def properties_changed(self) -> None: """The properties 'Apply' button was clicked: update object properties, refresh plot and update object view.""" # Get only the properties that have changed from the original values changed_props = self.objprop.get_changed_properties() # Apply only the changed properties to all selected objects for obj in self.objview.get_sel_objects(include_groups=True): obj.mark_roi_as_changed() # Update only the changed properties instead of all properties update_dataset(obj, changed_props) self.objview.update_item(get_uuid(obj)) # Auto-recompute analysis if the object had analysis parameters # Since properties have changed, any analysis results may now be invalid self.processor.auto_recompute_analysis(obj) # Refresh all selected items, including non-visible ones (only_visible=False) # This ensures that plot items are updated for all selected objects, even if # they are temporarily hidden behind other objects self.refresh_plot( "selected", update_items=True, force=False, only_visible=False ) # Update the stored original values to reflect the new state # This ensures subsequent changes are compared against the current values self.objprop.update_original_values()
[docs] def recompute_processing(self) -> None: """Recompute/rerun selected objects or group with stored processing parameters. This method handles both single objects and groups. For each object, it checks if it has 1-to-1 processing parameters that can be recomputed. Objects without recomputable parameters are skipped. """ # Get selected objects (handles both individual selection and groups) objects = self.objview.get_sel_objects(include_groups=True) if not objects: return # Filter objects that have recomputable processing parameters recomputable_objects: list[SignalObj | ImageObj] = [] for obj in objects: proc_params = extract_processing_parameters(obj) if proc_params is not None and proc_params.pattern == "1-to-1": recomputable_objects.append(obj) if not recomputable_objects: if not execenv.unattended: QW.QMessageBox.information( self, _("Recompute"), _( "Selected object(s) do not have processing parameters " "that can be recomputed." ), ) return # Recompute each object with create_progress_bar( self, _("Recomputing objects"), max_=len(recomputable_objects) ) as progress: for index, obj in enumerate(recomputable_objects): progress.setValue(index + 1) QW.QApplication.processEvents() if progress.wasCanceled(): break # Temporarily set this object as current to use existing infrastructure self.objview.set_current_object(obj) report = self.objprop.apply_processing_parameters( obj=obj, interactive=False ) if not report.success and not execenv.unattended: failtxt = _("Failed to recompute object") if index == len(recomputable_objects) - 1: QW.QMessageBox.warning( self, _("Recompute"), f"{failtxt} '{obj.title}':\n{report.message}", ) else: conttxt = _("Do you want to continue with the next object?") answer = QW.QMessageBox.warning( self, _("Recompute"), f"{failtxt} '{obj.title}':\n{report.message}\n\n{conttxt}", QW.QMessageBox.Yes | QW.QMessageBox.No, ) if answer == QW.QMessageBox.No: break
[docs] def select_source_objects(self) -> None: """Select source objects associated with the selected object's processing. This method retrieves the source object UUIDs from the selected object's processing parameters and selects them in the object view. """ # Get the selected object (should be exactly one) objects = self.objview.get_sel_objects(include_groups=False) if len(objects) != 1: return obj = objects[0] # Extract processing parameters proc_params = extract_processing_parameters(obj) if proc_params is None: if not execenv.unattended: QW.QMessageBox.information( self, _("Select source objects"), _("Selected object does not have processing metadata."), ) return # Get source UUIDs source_uuids = [] if proc_params.source_uuid: source_uuids.append(proc_params.source_uuid) if proc_params.source_uuids: source_uuids.extend(proc_params.source_uuids) if not source_uuids: if not execenv.unattended: QW.QMessageBox.information( self, _("Select source objects"), _("Selected object does not have source object references."), ) return # Check if source objects still exist (search across all panels) existing_objs = [] for uuid in source_uuids: obj = self.mainwindow.find_object_by_uuid(uuid) if obj is not None: existing_objs.append(obj) if not existing_objs: if not execenv.unattended: if len(source_uuids) == 1: msg = _("Source object no longer exists.") else: msg = _("Source objects no longer exist.") QW.QMessageBox.warning(self, _("Select source objects"), msg) return # Determine which panel contains the source objects # Source objects are always in the same panel (either all signals or all images) if all(uuid in self.objmodel.get_object_ids() for uuid in source_uuids): source_panel = self elif isinstance(existing_objs[0], SignalObj): source_panel = self.mainwindow.signalpanel else: # ImageObj source_panel = self.mainwindow.imagepanel # Switch to the source panel if needed if source_panel is not self: self.mainwindow.set_current_panel(source_panel) # Select the existing source objects # Note: Since all sources are in the same panel, all UUIDs in existing_objs # are guaranteed to be in source_panel source_panel.objview.clearSelection() for obj in existing_objs: source_panel.objview.set_current_item_id(get_uuid(obj), extend=True) # Show info if some sources were deleted missing_count = len(source_uuids) - len(existing_objs) if missing_count > 0 and not execenv.unattended: if len(existing_objs) == 1: msg = _("Selected a single source object") else: msg = _("Selected %d source objects") % len(existing_objs) msg += " (" if missing_count == 1: msg += _("1 source object no longer exists") else: msg += _("%d source objects no longer exist") % missing_count msg += ")" QW.QMessageBox.warning(self, _("Select source objects"), msg)
# ------Plotting data in modal dialogs----------------------------------------------
[docs] def add_plot_items_to_dialog(self, dlg: PlotDialog, oids: list[str]) -> None: """Add plot items to dialog Args: dlg: Dialog oids: Object IDs """ objs = self.objmodel.get_objects(oids) plot = dlg.get_plot() with create_progress_bar( self, _("Creating plot items"), max_=len(objs) ) as progress: for index, obj in enumerate(objs): progress.setValue(index + 1) QW.QApplication.processEvents() if progress.wasCanceled(): return None item = create_adapter_from_object(obj).make_item( update_from=self.plothandler[get_uuid(obj)] ) item.set_readonly(True) plot.add_item(item, z=0) plot.set_active_item(item) item.unselect() plot.replot() return dlg
[docs] def open_separate_view( self, oids: list[str] | None = None, edit_annotations: bool = False ) -> PlotDialog | None: """ Open separate view for visualizing selected objects Args: oids: Object IDs (default: None) edit_annotations: Edit annotations (default: False) Returns: Instance of PlotDialog """ if oids is None: oids = self.objview.get_sel_object_uuids(include_groups=True) obj = self.objmodel[oids[-1]] # last selected object if not all(oid in self.plothandler for oid in oids): # This happens for example when opening an already saved workspace with # multiple images, and if the user tries to view in a new window a group of # images without having selected any object yet. In this case, only the # last image is actually plotted (because if the other have the same size # and position, they are hidden), and the plot item of every other image is # not created yet. So we need to refresh the plot to create the plot item of # those images. self.plothandler.refresh_plot( "selected", update_items=True, force=True, only_visible=False ) # Create a new dialog and add plot items to it dlg = self.create_new_dialog( title=obj.title if len(oids) == 1 else None, edit=True, name=f"{obj.PREFIX}_new_window", ) if dlg is None: return None self.add_plot_items_to_dialog(dlg, oids) mgr = dlg.get_manager() toolbar = QW.QToolBar(_("Annotations"), self) dlg.button_layout.insertWidget(0, toolbar) mgr.add_toolbar(toolbar, id(toolbar)) toolbar.setToolButtonStyle(QC.Qt.ToolButtonTextUnderIcon) for tool in self.ANNOTATION_TOOLS: mgr.add_tool(tool, toolbar_id=id(toolbar)) def toggle_annotations(enabled: bool): """Toggle annotation tools / edit buttons visibility""" for widget in (dlg.button_box, toolbar, mgr.get_itemlist_panel()): if enabled: widget.show() else: widget.hide() edit_ann_action = create_action( dlg, _("Annotations"), toggled=toggle_annotations, icon=get_icon("annotations.svg"), ) mgr.add_tool(ActionTool, edit_ann_action) default_toolbar = mgr.get_default_toolbar() action_btn = default_toolbar.widgetForAction(edit_ann_action) action_btn.setToolButtonStyle(QC.Qt.ToolButtonTextBesideIcon) plot = dlg.get_plot() for item in plot.items: item.set_selectable(False) for item in create_adapter_from_object(obj).iterate_shape_items(editable=True): plot.add_item(item) self.__separate_views[dlg] = obj toggle_annotations(edit_annotations) if len(oids) > 1: # If multiple objects are displayed, show the item list panel # (otherwise, it is hidden by default to lighten the dialog, except # if `edit_annotations` is True): plot.manager.get_itemlist_panel().show() if edit_annotations: edit_ann_action.setChecked(True) dlg.show() dlg.finished.connect(self.__separate_view_finished) return dlg
def __separate_view_finished(self, result: int) -> None: """Separate view was closed Args: result: result """ dlg: PlotDialog = self.sender() if result == QW.QDialog.DialogCode.Accepted: rw_items = [] for item in dlg.get_plot().get_items(): if not item.is_readonly() and is_plot_item_serializable(item): rw_items.append(item) obj = self.__separate_views[dlg] # Use the annotation adapter to set annotations in the new format adapter = create_adapter_from_object(obj) adapter.set_annotations_from_items(rw_items) self.selection_changed(update_items=True) self.__separate_views.pop(dlg) dlg.deleteLater()
[docs] def view_images_side_by_side(self, oids: list[str] | None = None) -> None: """ View selected images side-by-side in a grid layout Args: oids: Object IDs (default: None, uses selected objects) """ if oids is None: oids = self.objview.get_sel_object_uuids(include_groups=True) if len(oids) < 2: return objs = self.objmodel.get_objects(oids) # Compute grid dimensions max_cols = 4 num_cols = min(len(objs), max_cols) num_rows = (len(objs) + num_cols - 1) // num_cols # Create dialog with synchronized plots dlg = SyncPlotDialog(title=_("View images side-by-side"), toolbar=False) dlg.setObjectName(f"{self.PANEL_STR_ID}_side_by_side") # Add each image to the grid for idx, obj in enumerate(objs): row = idx // num_cols col = idx % num_cols # Create plot with title plot = BasePlot(options=BasePlotOptions(title=obj.title, type="image")) # Create plot item from object adapter = create_adapter_from_object(obj) item = adapter.make_item() item.set_readonly(True) plot.add_item(item, z=0) # Add ROI items if available if obj.roi is not None: for roi_item in adapter.iterate_roi_items(editable=False): plot.add_item(roi_item) # Add to synchronized dialog dlg.add_plot(row, col, plot, sync=True) # Finalize and show dialog dlg.finalize_configuration() # Set explicit size for proper display dlg.resize(20 + 440 * num_cols, 20 + 400 * num_rows) exec_dialog(dlg)
[docs] def get_dialog_size(self) -> tuple[int, int]: """Get dialog size (minimum and maximum)""" # Resize the dialog so that it's at least MINDIALOGSIZE (absolute values), # and at most MAXDIALOGSIZE (% of the main window size): minwidth, minheight = self.MINDIALOGSIZE maxwidth = int(self.mainwindow.width() * self.MAXDIALOGSIZE) maxheight = int(self.mainwindow.height() * self.MAXDIALOGSIZE) size = min(minwidth, maxwidth), min(minheight, maxheight) return size
[docs] def create_new_dialog( self, edit: bool = False, toolbar: bool = True, title: str | None = None, name: str | None = None, options: dict[str, Any] | None = None, ) -> PlotDialog | None: """Create new pop-up signal/image plot dialog. Args: edit: Edit mode toolbar: Show toolbar title: Dialog title name: Dialog object name options: Plot options Returns: Plot dialog instance """ plot_options = self.plothandler.get_plot_options() if options is not None: plot_options = plot_options.copy(options) # pylint: disable=not-callable dlg = PlotDialog( parent=self.parentWidget(), title=APP_NAME if title is None else f"{title} - {APP_NAME}", options=plot_options, toolbar=toolbar, icon="DataLab.svg", edit=edit, size=self.get_dialog_size(), ) dlg.setObjectName(name) return dlg
[docs] def get_roi_editor_output( self, mode: Literal["apply", "extract", "define"] = "apply" ) -> tuple[TypeROI, bool] | None: """Get ROI data (array) from specific dialog box. Args: mode: Mode of operation, either "apply" (define ROI, then apply it to selected objects), "extract" (define ROI, then extract data from it), or "define" (define ROI without applying or extracting). Returns: A tuple containing the ROI object and a boolean indicating whether the dialog was accepted or not. """ obj = self.objview.get_sel_objects(include_groups=True)[-1] item = create_adapter_from_object(obj).make_item( update_from=self.plothandler[get_uuid(obj)] ) roi_editor_class = self.get_roieditor_class() # pylint: disable=not-callable roi_editor = roi_editor_class( parent=self.parentWidget(), obj=obj, mode=mode, item=item, options=self.plothandler.get_plot_options(), size=self.get_dialog_size(), ) if exec_dialog(roi_editor): return roi_editor.get_roieditor_results() return None
[docs] def get_objects_with_dialog( self, title: str, comment: str = "", nb_objects: int = 1, parent: QW.QWidget | None = None, ) -> TypeObj | None: """Get object with dialog box. Args: title: Dialog title comment: Optional dialog comment nb_objects: Number of objects to select parent: Parent widget Returns: Object(s) (signal(s) or image(s), or None if dialog was canceled) """ parent = self if parent is None else parent dlg = objectview.GetObjectsDialog(parent, self, title, comment, nb_objects) if exec_dialog(dlg): return dlg.get_selected_objects() return None
def __show_no_result_warning(self): """Show no result warning""" msg = "<br>".join( [ _("No result currently available for this object."), "", _( "This feature leverages the results of previous analysis " "performed on the selected object(s).<br><br>" "To compute results, select one or more objects and choose " "a feature in the <u>Analysis</u> menu." ), ] ) if not execenv.unattended: QW.QMessageBox.information(self, APP_NAME, msg)
[docs] def show_results(self) -> None: """Show results""" objs = self.objview.get_sel_objects(include_groups=True) rdatadict = create_resultdata_dict(objs) if rdatadict: for rdata in rdatadict.values(): show_resultdata(self.parentWidget(), rdata, f"{objs[0].PREFIX}_results") else: self.__show_no_result_warning()
[docs] def toggle_result_label_visibility(self, state: bool) -> None: """Toggle the visibility of the merged result label on the plot. Args: state: True to show the label, False to hide it """ show_label = state # Update the configuration Conf.view.show_result_label.set(show_label) # Synchronize the other panel's action state for panel in (self.mainwindow.signalpanel, self.mainwindow.imagepanel): if panel is not self and panel.acthandler.show_label_action is not None: panel.acthandler.show_label_action.blockSignals(True) panel.acthandler.show_label_action.setChecked(show_label) panel.acthandler.show_label_action.blockSignals(False) # Refresh the plot to show/hide the label self.plothandler.toggle_result_label_visibility(show_label)
def __add_result_signal( self, x: np.ndarray | list[float], y: np.ndarray | list[float], title: str, xaxis: str, yaxis: str, group_id: str | None = None, ) -> None: """Add result signal Args: x: X data y: Y data title: Signal title xaxis: X axis label yaxis: Y axis label group_id: UUID of the group to add the signal to. If None, add to current group. """ xdata = np.array(x, dtype=float) ydata = np.array(y, dtype=float) obj = create_signal( title=f"{title}: {yaxis} = f({xaxis})", x=xdata, y=ydata, labels=[xaxis, yaxis], ) self.mainwindow.signalpanel.add_object(obj, group_id=group_id) def __create_plot_result_param_class(self, rdata: ResultData) -> type[gds.DataSet]: """Create PlotResultParam class dynamically based on result data. Args: rdata: Result data Returns: PlotResultParam class """ # Build X and Y choices from result data headers xchoices = (("indices", _("Indices")),) for xlabel in rdata.headers: # If this column data is not numeric, we skip it: if not isinstance( rdata.results[0].get_column_values(xlabel)[0], (int, float, np.number) ): continue xchoices += ((xlabel, xlabel),) ychoices = xchoices[1:] # Determine default plot kind based on result data default_kind = ( "one_curve_per_object" if any(len(result.to_dataframe()) > 1 for result in rdata.results) else "one_curve_per_title" ) class PlotResultParam(gds.DataSet): """Plot results parameters""" kind = gds.ChoiceItem( _("Plot kind"), ( ( "one_curve_per_object", _("One curve per object (or ROI) and per result title"), ), ("one_curve_per_title", _("One curve per result title")), ), default=default_kind, ) xaxis = gds.ChoiceItem(_("X axis"), xchoices, default="indices") yaxis = gds.ChoiceItem(_("Y axis"), ychoices, default=ychoices[0][0]) return PlotResultParam def __plot_result( self, category: str, rdata: ResultData, objs: list[SignalObj | ImageObj], param: gds.DataSet | None = None, result_group_id: str | None = None, ) -> None: """Plot results for a specific category Args: category: Result category rdata: Result data objs: List of objects param: Plot result parameters. If None, show dialog to get parameters. result_group_id: UUID of the group to add result signals to. If None, add to current group. """ # Regrouping ResultShape results by their `title` attribute: grouped_results: dict[str, list[GeometryAdapter | TableAdapter]] = {} for result in rdata.results: grouped_results.setdefault(result.title, []).append(result) # From here, results are already grouped by their `category` attribute, # and then by their `title` attribute. We can now plot them. # # Now, we have two common use cases: # 1. Plotting one curve per object (signal/image) and per `title` # attribute, if each selected object contains a result object # with multiple values (e.g. from a blob detection feature). # 2. Plotting one curve per `title` attribute, if each selected object # contains a result object with a single value (e.g. from a FHWM # feature) - in that case, we select only the first value of each # result object. if param is None: # Create parameter class and show dialog PlotResultParam = self.__create_plot_result_param_class(rdata) comment = ( _( "Plot results obtained from previous analyses.<br><br>" "This plot is based on results associated with '%s' prefix." ) % category ) param = PlotResultParam(_("Plot results"), comment=comment) if not param.edit(parent=self.parentWidget()): return if param.kind == "one_curve_per_title": # One curve per ROI (if any) and per result title # ------------------------------------------------------------------ # Begin by checking if all results have the same number of ROIs: # for simplicity, let's check the number of unique ROI indices. all_roi_indexes = [ result.get_unique_roi_indices() for result in rdata.results ] # Check if all roi_indexes are the same: if len(set(map(tuple, all_roi_indexes))) > 1: if not execenv.unattended: QW.QMessageBox.warning( self, _("Plot results"), _( "All objects associated with results must have the " "same regions of interest (ROIs) to plot results " "together." ), ) return obj = objs[0] # Build a string with source object short IDs (max 3, then use "...") max_ids_to_show = 3 short_ids = [get_short_id(obj) for obj in objs] if len(short_ids) <= max_ids_to_show: source_ids = ", ".join(short_ids) else: # Show first 2, "...", then last one: "s001, s002, ..., s010" source_ids = ( ", ".join(short_ids[: max_ids_to_show - 1]) + ", ..., " + short_ids[-1] ) for i_roi in all_roi_indexes[0]: roi_suffix = f"|ROI{int(i_roi + 1)}" if i_roi >= 0 else "" for title, results in grouped_results.items(): # title x, y = [], [] for index, result in enumerate(results): if param.xaxis == "indices": x.append(index) else: x.append(result.get_column_values(param.xaxis, i_roi)[0]) y.append(result.get_column_values(param.yaxis, i_roi)[0]) if i_roi >= 0: roi_suffix = f"|{obj.roi.get_single_roi_title(int(i_roi))}" self.__add_result_signal( x, y, f"{title} ({source_ids}){roi_suffix}", param.xaxis, param.yaxis, result_group_id, ) else: # One curve per result title, per object and per ROI # ------------------------------------------------------------------ for title, results in grouped_results.items(): # title for index, result in enumerate(results): # object obj = objs[index] roi_indices = result.get_unique_roi_indices() for i_roi in roi_indices: # ROI roi_suffix = "" if i_roi >= 0: roi_suffix = f"|{obj.roi.get_single_roi_title(int(i_roi))}" roi_data = result.get_roi_data(i_roi) if param.xaxis == "indices": x = list(range(len(roi_data))) else: x = roi_data[param.xaxis].values y = roi_data[param.yaxis].values shid = get_short_id(objs[index]) stitle = f"{title} ({shid}){roi_suffix}" self.__add_result_signal( x, y, stitle, param.xaxis, param.yaxis, result_group_id )
[docs] def plot_results( self, kind: str | None = None, xaxis: str | None = None, yaxis: str | None = None, ) -> None: """Plot results Args: kind: Plot kind. Either "one_curve_per_object" or "one_curve_per_title". If None, show dialog to get parameters. xaxis: X axis column name. If None, show dialog to get parameters. yaxis: Y axis column name. If None, show dialog to get parameters. """ objs = self.objview.get_sel_objects(include_groups=True) rdatadict = create_resultdata_dict(objs) if rdatadict: # Always use or create a "Results" group for all plot results rgroup_title = _("Results") target_panel = self.mainwindow.signalpanel try: # Check if a "Results" group already exists in the signal panel rgroup = target_panel.objmodel.get_group_from_title(rgroup_title) except KeyError: # Create the group if it doesn't exist rgroup = target_panel.add_group(rgroup_title) result_group_id = get_uuid(rgroup) for category, rdata in rdatadict.items(): param = None if kind is not None and xaxis is not None and yaxis is not None: # Create parameter object programmatically PlotResultParam = self.__create_plot_result_param_class(rdata) param = PlotResultParam() param.kind = kind param.xaxis = xaxis param.yaxis = yaxis self.__plot_result(category, rdata, objs, param, result_group_id) else: self.__show_no_result_warning()
[docs] def delete_results(self) -> None: """Delete results""" objs = self.objview.get_sel_objects(include_groups=True) rdatadict = create_resultdata_dict(objs) if rdatadict: if execenv.unattended: confirmed = True else: answer = QW.QMessageBox.warning( self, _("Delete results"), _( "Are you sure you want to delete all results " "of the selected object(s)?" ), QW.QMessageBox.Yes | QW.QMessageBox.No, ) confirmed = answer == QW.QMessageBox.Yes if confirmed: objs = self.objview.get_sel_objects(include_groups=True) for obj in objs: # Remove all table and geometry results using adapter methods TableAdapter.remove_all_from(obj) GeometryAdapter.remove_all_from(obj) if obj is self.objview.get_current_object(): self.objprop.update_properties_from(obj) # Update action states to reflect the removal selected_groups = self.objview.get_sel_groups() self.acthandler.selected_objects_changed(selected_groups, objs) self.refresh_plot("selected", True, False) else: self.__show_no_result_warning()
[docs] def add_label_with_title( self, title: str | None = None, ignore_msg: bool = True ) -> None: """Add a label with object title on the associated plot Args: title: Label title. Defaults to None. If None, the title is the object title. ignore_msg: If True, do not show the information message. Defaults to True. If False, show a message box to inform the user that the label has been added as an annotation, and that it can be edited or removed using the annotation editing window. """ objs = self.objview.get_sel_objects(include_groups=True) for obj in objs: create_adapter_from_object(obj).add_label_with_title(title=title) if ( not Conf.view.ignore_title_insertion_msg.get(False) and not ignore_msg and not execenv.unattended ): answer = QW.QMessageBox.information( self, _("Annotation added"), _( "The label has been added as an annotation. " "You can edit or remove it using the annotation editing window." "<br><br>" "Choosing to ignore this message will prevent it " "from being displayed again." ), QW.QMessageBox.Ok | QW.QMessageBox.Ignore, ) if answer == QW.QMessageBox.Ignore: Conf.view.ignore_title_insertion_msg.set(True) self.refresh_plot("selected", True, False)