|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import warnings |
| 4 | +from typing import TYPE_CHECKING, cast |
| 5 | +from weakref import WeakValueDictionary |
| 6 | + |
| 7 | +import ndv |
| 8 | +from pymmcore_plus.mda.handlers import TensorStoreHandler |
| 9 | +from PyQt6.QtCore import QObject, Qt, QTimer |
| 10 | +from PyQt6.QtWidgets import QWidget |
| 11 | + |
| 12 | +if TYPE_CHECKING: |
| 13 | + from collections.abc import Iterator |
| 14 | + |
| 15 | + import numpy as np |
| 16 | + import useq |
| 17 | + from ndv.models._array_display_model import IndexMap |
| 18 | + from pymmcore_plus import CMMCorePlus |
| 19 | + from pymmcore_plus.mda import SupportsFrameReady |
| 20 | + from pymmcore_plus.metadata import FrameMetaV1, SummaryMetaV1 |
| 21 | + from useq import MDASequence |
| 22 | + |
| 23 | + |
| 24 | +# NOTE: we make this a QObject mostly so that the lifetime of this object is tied to |
| 25 | +# the lifetime of the parent QMainWindow. If inheriting from QObject is removed in |
| 26 | +# the future, make sure not to store a strong reference to this main_window |
| 27 | +class NDVViewersManager(QObject): |
| 28 | + """Object that mediates a connection between the MDA experiment and ndv viewers. |
| 29 | +
|
| 30 | + Parameters |
| 31 | + ---------- |
| 32 | + parent : QWidget |
| 33 | + The parent widget. |
| 34 | + mmcore : CMMCorePlus |
| 35 | + The CMMCorePlus instance. |
| 36 | + """ |
| 37 | + |
| 38 | + def __init__(self, parent: QWidget, mmcore: CMMCorePlus): |
| 39 | + super().__init__(parent) |
| 40 | + self._mmc = mmcore |
| 41 | + |
| 42 | + # weakref map of {sequence_uid: ndv.ArrayViewer} |
| 43 | + self._seq_viewers = WeakValueDictionary[str, ndv.ArrayViewer]() |
| 44 | + # currently active viewer |
| 45 | + self._active_viewer: ndv.ArrayViewer | None = None |
| 46 | + |
| 47 | + # We differentiate between handlers that were created by someone else, and |
| 48 | + # gathered using mda.get_output_handlers(), vs handlers that were created by us. |
| 49 | + # because we need to call frameReady/sequenceFinished manually on the latter. |
| 50 | + self._handler: SupportsFrameReady | None = None |
| 51 | + self._own_handler: TensorStoreHandler | None = None |
| 52 | + |
| 53 | + # CONNECTIONS --------------------------------------------------------- |
| 54 | + |
| 55 | + self._mmc.mda.events.sequenceStarted.connect(self._on_sequence_started) |
| 56 | + self._mmc.mda.events.frameReady.connect(self._on_frame_ready) |
| 57 | + self._mmc.mda.events.sequenceFinished.connect(self._on_sequence_finished) |
| 58 | + parent.destroyed.connect(self._cleanup) |
| 59 | + |
| 60 | + def _cleanup(self, obj: QObject | None = None) -> None: |
| 61 | + self._active_viewer = None |
| 62 | + self._handler = None |
| 63 | + self._own_handler = None |
| 64 | + |
| 65 | + def _on_sequence_started( |
| 66 | + self, sequence: useq.MDASequence, meta: SummaryMetaV1 |
| 67 | + ) -> None: |
| 68 | + """Called when a new MDA sequence has been started. |
| 69 | +
|
| 70 | + We grab the first handler in the list of output handlers, or create a new |
| 71 | + TensorStoreHandler if none exist. Then we create a new ndv viewer and show it. |
| 72 | + """ |
| 73 | + self._own_handler = self._handler = None |
| 74 | + if handlers := self._mmc.mda.get_output_handlers(): |
| 75 | + # someone else has created a handler for this sequence |
| 76 | + self._handler = handlers[0] |
| 77 | + else: |
| 78 | + # if it does not exist, create a new TensorStoreHandler |
| 79 | + self._own_handler = TensorStoreHandler(driver="zarr", kvstore="memory://") |
| 80 | + self._own_handler.reset(sequence) |
| 81 | + |
| 82 | + # since the handler is empty at this point, create a ndv viewer with no data |
| 83 | + self._active_viewer = viewer = self._create_ndv_viewer(sequence) |
| 84 | + self._seq_viewers[str(sequence.uid)] = viewer |
| 85 | + |
| 86 | + def _on_frame_ready( |
| 87 | + self, frame: np.ndarray, event: useq.MDAEvent, meta: FrameMetaV1 |
| 88 | + ) -> None: |
| 89 | + """Create a viewer if it does not exist, otherwise update the current index.""" |
| 90 | + # at this point the viewer should exist |
| 91 | + if self._own_handler is not None: |
| 92 | + self._own_handler.frameReady(frame, event, meta) |
| 93 | + |
| 94 | + if (viewer := self._active_viewer) is None: |
| 95 | + return # pragma: no cover |
| 96 | + |
| 97 | + # if the viewer does not yet have data, it's likely the very first frame |
| 98 | + # so update the viewer's data source to the underlying handlers store |
| 99 | + if viewer.data_wrapper is None: |
| 100 | + handler = self._handler or self._own_handler |
| 101 | + if isinstance(handler, TensorStoreHandler): |
| 102 | + # TODO: temporary. maybe create the DataWrapper for the handlers |
| 103 | + viewer.data = handler.store |
| 104 | + else: |
| 105 | + warnings.warn( |
| 106 | + f"don't know how to show data of type {type(handler)}", |
| 107 | + stacklevel=2, |
| 108 | + ) |
| 109 | + # otherwise update the sliders to the most recently acquired frame |
| 110 | + else: |
| 111 | + # Add a small delay to make sure the data are available in the handler |
| 112 | + # This is a bit of a hack to get around the data handlers can write data |
| 113 | + # asynchronously, so the data may not be available immediately to the viewer |
| 114 | + # after the handler's frameReady method is called. |
| 115 | + current_index = viewer.display_model.current_index |
| 116 | + |
| 117 | + def _update(_idx: IndexMap = current_index) -> None: |
| 118 | + try: |
| 119 | + _idx.update(event.index.items()) |
| 120 | + except Exception: # pragma: no cover |
| 121 | + # this happens if the viewer has been closed in the meantime |
| 122 | + # usually it's a RuntimeError, but could be an EmitLoopError |
| 123 | + pass |
| 124 | + |
| 125 | + QTimer.singleShot(10, _update) |
| 126 | + |
| 127 | + def _on_sequence_finished(self, sequence: useq.MDASequence) -> None: |
| 128 | + """Called when a sequence has finished.""" |
| 129 | + if self._own_handler is not None: |
| 130 | + self._own_handler.sequenceFinished(sequence) |
| 131 | + # cleanup pointers somehow? |
| 132 | + |
| 133 | + def _create_ndv_viewer(self, sequence: MDASequence) -> ndv.ArrayViewer: |
| 134 | + """Create a new ndv viewer with no data.""" |
| 135 | + ndv_viewer = ndv.ArrayViewer() |
| 136 | + q_viewer = cast("QWidget", ndv_viewer.widget()) |
| 137 | + |
| 138 | + if isinstance(par := self.parent(), QWidget): |
| 139 | + q_viewer.setParent(par) |
| 140 | + |
| 141 | + sha = str(sequence.uid)[:8] |
| 142 | + q_viewer.setObjectName(f"ndv-{sha}") |
| 143 | + q_viewer.setWindowTitle(f"MDA {sha}") |
| 144 | + q_viewer.setWindowFlags(Qt.WindowType.Dialog) |
| 145 | + q_viewer.show() |
| 146 | + return ndv_viewer |
| 147 | + |
| 148 | + def __repr__(self) -> str: # pragma: no cover |
| 149 | + return f"<{self.__class__.__name__} {hex(id(self))} ({len(self)} viewer)>" |
| 150 | + |
| 151 | + def __len__(self) -> int: |
| 152 | + return len(self._seq_viewers) |
| 153 | + |
| 154 | + def viewers(self) -> Iterator[ndv.ArrayViewer]: |
| 155 | + yield from (self._seq_viewers.values()) |
0 commit comments