From e85622c7f5fb8c617db5f051cb110f52836e8347 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 10:01:59 +0200 Subject: [PATCH 01/19] Normalize recording container handling Centralized recording container configuration by introducing shared allowed/default container constants and using them across settings, UI, and utilities. The recording UI now populates container options from config and keeps the filename extension aligned with the selected container when switching between known video formats, so saved settings and path previews stay consistent. --- dlclivegui/config.py | 5 ++- dlclivegui/gui/main_window.py | 81 ++++++++++++++++++++++++----------- dlclivegui/utils/utils.py | 4 +- 3 files changed, 62 insertions(+), 28 deletions(-) diff --git a/dlclivegui/config.py b/dlclivegui/config.py index 53629bb..339eb3e 100644 --- a/dlclivegui/config.py +++ b/dlclivegui/config.py @@ -20,6 +20,9 @@ # Global settings ## GUI GUI_MAX_DISPLAY_FPS: float = 30.0 +## Recording +ALLOWED_VIDEO_CONTAINERS: set[str] = {"mp4", "avi", "mov"} +DEFAULT_RECORDING_CONTAINER: str = "mp4" ## Debug @@ -512,7 +515,7 @@ class RecordingSettings(BaseModel): enabled: bool = False directory: str = Field(default_factory=lambda: str(Path.home() / "Videos" / "deeplabcut-live")) filename: str = "session.mp4" - container: Literal["mp4", "avi", "mov"] = "mp4" + container: Literal["mp4", "avi", "mov"] = DEFAULT_RECORDING_CONTAINER codec: str = "libx264" crf: int = Field(default=23, ge=0, le=51) fast_encoding: bool = False diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index 2677b8f..2a0e4fd 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -48,7 +48,9 @@ from dlclivegui.cameras import CameraFactory from dlclivegui.config import ( + ALLOWED_VIDEO_CONTAINERS, DEFAULT_CONFIG, + DEFAULT_RECORDING_CONTAINER, ApplicationSettings, BoundingBoxSettings, CameraSettings, @@ -593,7 +595,7 @@ def _build_recording_group(self) -> QGroupBox: self.container_combo.setToolTip("Select the video container/format") self.container_combo.setSizePolicy(QSizePolicy.MinimumExpanding, QSizePolicy.Preferred) self.container_combo.setEditable(True) - self.container_combo.addItems(["mp4", "avi", "mov"]) + self.container_combo.addItems(sorted(ALLOWED_VIDEO_CONTAINERS)) # Ensure it never becomes unreadable: self.container_combo.setMinimumContentsLength(8) self.container_combo.setSizeAdjustPolicy(QComboBox.SizeAdjustPolicy.AdjustToMinimumContentsLengthWithIcon) @@ -809,7 +811,7 @@ def _connect_signals(self) -> None: self.use_timestamp_checkbox.stateChanged.connect(self._on_use_timestamp_changed) self.output_directory_edit.textChanged.connect(lambda _t: self._update_recording_path_preview()) self.filename_edit.textChanged.connect(lambda _t: self._update_recording_path_preview()) - self.container_combo.currentTextChanged.connect(lambda _t: self._update_recording_path_preview()) + self.container_combo.currentTextChanged.connect(self._on_container_changed) self.fast_encoding_checkbox.stateChanged.connect(self._on_fast_encoding_changed) # ------------------------------------------------------------------ @@ -945,11 +947,13 @@ def _dlc_settings_from_ui(self, *, allow_empty_model_path=False) -> DLCProcessor ) def _recording_settings_from_ui(self) -> RecordingSettings: + container = self.container_combo.currentText().strip() or DEFAULT_RECORDING_CONTAINER + filename = self._filename_matching_container(self.filename_edit.text().strip(), container) return RecordingSettings( enabled=True, # Always enabled - recording controlled by button directory=self.output_directory_edit.text().strip(), - filename=self.filename_edit.text().strip() or "session.mp4", - container=self.container_combo.currentText().strip() or "mp4", + filename=filename, + container=container, codec=self.codec_combo.currentText().strip() or "libx264", crf=int(self.crf_spin.value()), fast_encoding=bool( @@ -1162,31 +1166,52 @@ def _refresh_processors(self) -> None: # ------------------------------------------------------------------ # Recording path preview and session name persistence + def _known_recording_extensions(self) -> set[str]: + """Return known recording container extensions without leading dots.""" + known = ALLOWED_VIDEO_CONTAINERS.copy() + if hasattr(self, "container_combo"): + known.update( + self.container_combo.itemText(i).strip().lower().lstrip(".") + for i in range(self.container_combo.count()) + if self.container_combo.itemText(i).strip() + ) + return known + + def _filename_matching_container(self, filename: str, container: str) -> str: + """ + Adjust filename extension to match selected container, but only when + the existing extension is another known recording container. + """ + name = filename.strip() or "recording" + selected_ext = container.strip().lower().lstrip(".") + suffix = Path(name).suffix + + if not suffix or not selected_ext: + return name + + current_ext = suffix.lower().lstrip(".") + if current_ext in self._known_recording_extensions() and current_ext != selected_ext: + return str(Path(name).with_suffix(f".{selected_ext}")) + + return name + + def _on_container_changed(self, text: str) -> None: + """Keep filename extension aligned with selected container when safe.""" + if hasattr(self, "filename_edit"): + current = self.filename_edit.text() + updated = self._filename_matching_container(current, text) + if updated != current: + self.filename_edit.blockSignals(True) + self.filename_edit.setText(updated) + self.filename_edit.blockSignals(False) + + self._update_recording_path_preview() + def _on_session_name_editing_finished(self) -> None: name = self.session_name_edit.text().strip() self._settings_store.set_session_name(name) self._update_recording_path_preview() - # def _update_recording_path_preview(self) -> None: - # """Update the label showing where files will go (best-effort).""" - # if not hasattr(self, "recording_path_preview"): - # return - # out_dir = self.output_directory_edit.text().strip() - # sess = self.session_name_edit.text().strip() if hasattr(self, "session_name_edit") else "" - # base = self.filename_edit.text().strip() - # container = self.container_combo.currentText().strip() if hasattr(self, "container_combo") else "mp4" - # use_ts = self.use_timestamp_checkbox.isChecked() if hasattr(self, "use_timestamp_checkbox") else True - - # # Preview is approximate (since run index/time is decided at start). - # sess_safe = sess.strip() or "session" - # run_hint = "run_" if use_ts else "run_" - # stem_hint = Path(base).stem if base.strip() else "recording" # shows user-provided stem or default - # full_hint = str(Path(out_dir).expanduser() / sess_safe / run_hint / f"{stem_hint}_.{container}") - # self.recording_path_preview.setText(f"{full_hint}") - # self.recording_path_preview.setToolTip( - # f"Click to copy to clipboard :
{full_hint.replace('', '*')}" - # ) - def _update_recording_path_preview(self) -> None: """Update the label showing where files will go (best-effort).""" if not hasattr(self, "recording_path_preview"): @@ -1194,8 +1219,12 @@ def _update_recording_path_preview(self) -> None: out_dir = self.output_directory_edit.text().strip() sess = self.session_name_edit.text().strip() if hasattr(self, "session_name_edit") else "" - base = self.filename_edit.text().strip() - container = self.container_combo.currentText().strip() if hasattr(self, "container_combo") else "mp4" + container = ( + self.container_combo.currentText().strip() + if hasattr(self, "container_combo") + else DEFAULT_RECORDING_CONTAINER + ) + base = self._filename_matching_container(self.filename_edit.text(), container) use_ts = self.use_timestamp_checkbox.isChecked() if hasattr(self, "use_timestamp_checkbox") else True # Preview is approximate (since run index/time is decided at start). diff --git a/dlclivegui/utils/utils.py b/dlclivegui/utils/utils.py index 6af003d..534e573 100644 --- a/dlclivegui/utils/utils.py +++ b/dlclivegui/utils/utils.py @@ -8,6 +8,8 @@ from datetime import datetime from pathlib import Path +from dlclivegui.config import DEFAULT_RECORDING_CONTAINER + _INVALID_CHARS = re.compile(r"[^A-Za-z0-9._-]+") @@ -36,7 +38,7 @@ def split_stem_ext(base_filename: str, container: str) -> tuple[str, str]: If user typed an extension, keep it. Else use container. """ base = (base_filename or "").strip() - container = (container or "mp4").strip().lstrip(".") or "mp4" + container = (container or DEFAULT_RECORDING_CONTAINER).strip().lstrip(".") or DEFAULT_RECORDING_CONTAINER if not base: base = "recording" From dc9161c6afd16c53779770ff66c988e7bfa9faa7 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 10:50:25 +0200 Subject: [PATCH 02/19] Add DLC timing instrumentation in GUI path Introduce a new `DLC_DO_LOG_TIMING` config flag and wire `WorkerTimingStats` into `DLCLiveMainWindow` for DLC enqueue and pose-ready callback timing. The pose callback now logs camera-to-GUI latency in debug mode, marks display state dirty instead of forcing an immediate redraw, and emits periodic timing stats via `maybe_log()`. --- dlclivegui/config.py | 1 + dlclivegui/gui/main_window.py | 34 ++++++++++++++++++++++++++++------ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/dlclivegui/config.py b/dlclivegui/config.py index 339eb3e..f1347af 100644 --- a/dlclivegui/config.py +++ b/dlclivegui/config.py @@ -30,6 +30,7 @@ SINGLE_CAMERA_WORKER_DO_LOG_TIMING: bool = False MULTI_CAMERA_WORKER_DO_LOG_TIMING: bool = False REC_DO_LOG_TIMING: bool = False +DLC_DO_LOG_TIMING: bool = True # MAIN_WINDOW_DO_LOG_TIMING: bool = False #### Backends BASLER_DO_LOG_TIMING: bool = False diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index 2a0e4fd..898540e 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -51,6 +51,7 @@ ALLOWED_VIDEO_CONTAINERS, DEFAULT_CONFIG, DEFAULT_RECORDING_CONTAINER, + DLC_DO_LOG_TIMING, ApplicationSettings, BoundingBoxSettings, CameraSettings, @@ -70,7 +71,7 @@ from ..services.multi_camera_controller import MultiCameraController, MultiFrameData, get_camera_id, get_display_id from ..utils.display import BBoxColors, compute_tile_info, create_tiled_frame, draw_bbox, draw_pose from ..utils.settings_store import DLCLiveGUISettingsStore, ModelPathStore -from ..utils.stats import format_dlc_stats +from ..utils.stats import WorkerTimingStats, format_dlc_stats from ..utils.utils import FPSTracker from .camera_config.camera_config_dialog import CameraConfigDialog from .misc import color_dropdowns as color_ui @@ -131,6 +132,10 @@ def __init__(self, config: ApplicationSettings | None = None): self._rec_manager = RecordingManager() self._dlc = DLCLiveProcessor() self.multi_camera_controller = MultiCameraController() + ### Time debug + self._dlc_timing = WorkerTimingStats( + "GUI - DLC Worker", logger=logger, log_interval=2.0, enabled=DLC_DO_LOG_TIMING + ) self._config = config self._inference_camera_id: str | None = None # Camera ID used for inference @@ -1504,7 +1509,11 @@ def _on_multi_frame_processing_ready(self, frame_data: MultiFrameData) -> None: if self._dlc_active and is_dlc_camera_frame and dlc_cam_id in frame_data.frames: frame = frame_data.frames[dlc_cam_id] timestamp = frame_data.timestamps.get(dlc_cam_id, time.time()) - self._dlc.enqueue_frame(frame, timestamp) + with self._dlc_timing.measure("enqueue_frame"): + self._dlc.enqueue_frame(frame, timestamp) + + self._dlc_timing.note_frame() + self._dlc_timing.maybe_log() def _on_multi_frame_display_ready(self, frame_data: MultiFrameData) -> None: """Throttled UI/display path. @@ -2048,10 +2057,23 @@ def _stop_recording(self) -> None: def _on_pose_ready(self, result: PoseResult) -> None: if not self._dlc_active: return - self._last_pose = result - # logger.debug(f"Pose result: {result.pose}, Timestamp: {result.timestamp}") - if self._current_frame is not None: - self._display_frame(self._current_frame, force=True) + + with self._dlc_timing.measure("DLC.pose_ready_callback"): + self._last_pose = result + + try: + latency_ms = (time.time() - float(result.timestamp)) * 1000.0 + if logger.isEnabledFor(logging.DEBUG): + logger.debug("DLC pose latency camera_timestamp_to_gui=%.2f ms", latency_ms) + except Exception: + pass + + if self._current_frame is not None: + self._display_dirty = True + # with self._dlc_timing.measure("DLC.display_after_pose"): + # self._display_frame(self._current_frame, force=True) + + self._dlc_timing.maybe_log() def _on_dlc_error(self, message: str) -> None: self._stop_inference(show_message=False) From 3efffdec8f5c09c2a4a4856cbfd8bf9133dcd266 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 10:54:05 +0200 Subject: [PATCH 03/19] Prioritize latest frame when queue is full Update `_enqueue_frame` to keep enqueueing the newest frame by removing one queued item when `put_nowait` hits `queue.Full`, instead of dropping the incoming frame. This makes processing more real-time under load and keeps enqueue/drop stats consistent, including safe `task_done()` handling. --- dlclivegui/services/dlc_processor.py | 29 +++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index b4476e1..0ecf8ab 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -258,13 +258,28 @@ def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: if q is None: return - try: - q.put_nowait((frame_c, timestamp, enq_time)) - with self._stats_lock: - self._frames_enqueued += 1 - except queue.Full: - with self._stats_lock: - self._frames_dropped += 1 + item = (frame_c, timestamp, enq_time) + + while True: + try: + q.put_nowait(item) + with self._stats_lock: + self._frames_enqueued += 1 + return + + except queue.Full: + try: + q.get_nowait() + try: + q.task_done() + except ValueError: + pass + + with self._stats_lock: + self._frames_dropped += 1 + + except queue.Empty: + continue def get_stats(self) -> ProcessorStats: """Get current processing statistics.""" From 8f26d5894386199d09fb843820a6dfab98401989 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 11:16:52 +0200 Subject: [PATCH 04/19] Lower camera backend logs to debug Demoted multiple verbose runtime messages from INFO to DEBUG in Basler and GenTL backends. This keeps normal logs cleaner by moving routine configuration/readback details (FPS setup, converter mode, exposure/gain settings, trigger configuration, and startup/close diagnostics) out of INFO-level output while preserving the diagnostics when DEBUG is enabled. --- dlclivegui/cameras/backends/basler_backend.py | 26 +++++++++---------- dlclivegui/cameras/backends/gentl_backend.py | 18 ++++++------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/dlclivegui/cameras/backends/basler_backend.py b/dlclivegui/cameras/backends/basler_backend.py index 2739820..e1177d9 100644 --- a/dlclivegui/cameras/backends/basler_backend.py +++ b/dlclivegui/cameras/backends/basler_backend.py @@ -437,7 +437,7 @@ def _configure_frame_rate(self) -> None: fps = self._positive_float(getattr(self.settings, "fps", 0.0)) if fps is None: - LOG.info("[Basler] FPS: auto/free-run, not forcing AcquisitionFrameRate") + LOG.debug("[Basler] FPS: auto/free-run, not forcing AcquisitionFrameRate") return enable = self._feature("AcquisitionFrameRateEnable") @@ -454,7 +454,7 @@ def _configure_frame_rate(self) -> None: try: min_v = rate.GetMin() max_v = rate.GetMax() - LOG.info("[Basler] AcquisitionFrameRate range: min=%s max=%s requested=%s", min_v, max_v, fps) + LOG.debug("[Basler] AcquisitionFrameRate range: min=%s max=%s requested=%s", min_v, max_v, fps) except Exception: pass @@ -485,7 +485,7 @@ def _configure_frame_rate(self) -> None: if feature is not None: readbacks[name] = self._feature_value(feature, None) - LOG.info("[Basler] FPS readback requested=%s values=%s", fps, readbacks) + LOG.debug("[Basler] Readback requested=%s values=%s", fps, readbacks) try: self._actual_fps = float(readbacks.get("AcquisitionFrameRate")) @@ -510,14 +510,14 @@ def _configure_converter(self) -> None: if self._should_output_mono(): self._converter.OutputPixelFormat = pylon.PixelType_Mono8 - LOG.info( + LOG.debug( "[Basler] Converter configured for Mono8 output (camera PixelFormat=%s preserve_mono=%s)", camera_pixel_format, self._preserve_mono, ) else: self._converter.OutputPixelFormat = pylon.PixelType_BGR8packed - LOG.info( + LOG.debug( "[Basler] Converter configured for BGR8 output (camera PixelFormat=%s preserve_mono=%s)", camera_pixel_format, self._preserve_mono, @@ -548,7 +548,7 @@ def open(self) -> None: self._camera.ExposureTime.SetValue(float(self.settings.exposure)) if hasattr(self._camera, "ExposureTimeAbs"): self._camera.ExposureTimeAbs.SetValue(float(self.settings.exposure)) - LOG.info("[Basler] Exposure set to %s us (auto off)", self.settings.exposure) + LOG.debug("[Basler] Exposure set to %s us (auto off)", self.settings.exposure) except Exception as exc: LOG.warning("[Basler] Failed to set exposure: %s", exc) @@ -558,7 +558,7 @@ def open(self) -> None: if hasattr(self._camera, "GainAuto"): self._camera.GainAuto.SetValue("Off") self._camera.Gain.SetValue(float(self.settings.gain)) - LOG.info("[Basler] Gain set to %s dB (auto off)", self.settings.gain) + LOG.debug("[Basler] Gain set to %s dB (auto off)", self.settings.gain) except Exception as exc: LOG.warning("[Basler] Failed to set gain: %s", exc) @@ -638,7 +638,7 @@ def open(self) -> None: # pylon.GrabStrategy_LatestImageOnly, pylon.GrabStrategy_OneByOne, ) - LOG.info( + LOG.debug( "[Basler] grabbing=%s max_buffers=%s", self._camera.IsGrabbing(), self._camera.MaxNumBuffer.GetValue() if hasattr(self._camera, "MaxNumBuffer") else "N/A", @@ -646,7 +646,7 @@ def open(self) -> None: else: LOG.debug("Fast-start probe: skipping StartGrabbing and converter") - LOG.info( + LOG.debug( "[Basler] open device_id=%s index=%s fast_start=%s requested=(%sx%s @ %s fps exp=%s gain=%s)", getattr(self, "_device_id", None), getattr(self.settings, "index", None), @@ -756,7 +756,7 @@ def read(self) -> CapturedFrame: if not self._logged_first_frame: self._logged_first_frame = True - LOG.info( + LOG.debug( "[Basler] first frame device_id=%s shape=%s dtype=%s nbytes=%.2f MB " "camera_pixel_format=%s output_format=%s preserve_mono=%s", self._device_id, @@ -803,7 +803,7 @@ def read(self) -> CapturedFrame: raise RuntimeError("Failed to retrieve image from Basler camera.") from exc def close(self) -> None: - LOG.info( + LOG.debug( "[Basler] close called camera_exists=%s grabbing=%s open=%s", self._camera is not None, bool(self._camera and self._camera.IsGrabbing()), @@ -1164,7 +1164,7 @@ def _configure_trigger_input(self, cfg, *, strict: bool = False) -> None: self._trigger = CameraTriggerSettings() return - LOG.info( + LOG.debug( "Basler trigger input configured: role=%s selector=%s source=%s activation=%s " "selector_ok=%s source_ok=%s activation_ok=%s", role, @@ -1229,7 +1229,7 @@ def _configure_trigger_master(self, cfg, *, strict: bool = False) -> None: source_ok = self._set_enum_feature("LineSource", output_source, strict=strict) if mode_ok and source_ok: - LOG.info( + LOG.debug( "Basler trigger master configured via Line*: output_line=%s output_source=%s", output_line, output_source, diff --git a/dlclivegui/cameras/backends/gentl_backend.py b/dlclivegui/cameras/backends/gentl_backend.py index e462a1d..f0eb641 100644 --- a/dlclivegui/cameras/backends/gentl_backend.py +++ b/dlclivegui/cameras/backends/gentl_backend.py @@ -1275,7 +1275,7 @@ def _resolve_trigger_source(self, node_map, requested: str, *, strict: bool) -> if requested.lower() == "auto": for candidate in ("Line0", "Line1", "Line2", "Any"): if candidate in available: - LOG.info( + LOG.debug( "GenTL TriggerSource auto-selected '%s'. Available: %s", candidate, available, @@ -1447,7 +1447,7 @@ def _configure_trigger_input(self, node_map, cfg, *, strict: bool = False) -> No self._trigger = CameraTriggerSettings() return - LOG.info( + LOG.debug( "GenTL trigger input configured: role=%s selector=%s source_requested=%s " "source=%s activation=%s selector_ok=%s source_ok=%s activation_ok=%s", role, @@ -1508,7 +1508,7 @@ def _configure_trigger_master(self, node_map, cfg, *, strict: bool = False) -> N node = self._node(node_map, "StrobeDuration") if node is not None: node.value = int(strobe_duration) - LOG.info("Configured GenTL StrobeDuration=%s", int(strobe_duration)) + LOG.debug("Configured GenTL StrobeDuration=%s", int(strobe_duration)) except Exception as exc: if strict: raise RuntimeError(f"Failed to set StrobeDuration={strobe_duration}: {exc}") from exc @@ -1519,7 +1519,7 @@ def _configure_trigger_master(self, node_map, cfg, *, strict: bool = False) -> N node = self._node(node_map, "StrobeDelay") if node is not None: node.value = int(strobe_delay) - LOG.info("Configured GenTL StrobeDelay=%s", int(strobe_delay)) + LOG.debug("Configured GenTL StrobeDelay=%s", int(strobe_delay)) except Exception as exc: if strict: raise RuntimeError(f"Failed to set StrobeDelay={strobe_delay}: {exc}") from exc @@ -1533,7 +1533,7 @@ def _configure_trigger_master(self, node_map, cfg, *, strict: bool = False) -> N ) if enable_ok: - LOG.info( + LOG.debug( "GenTL trigger master configured via Strobe*: " "StrobeEnable=On StrobePolarity=%s polarity_ok=%s " "StrobeOperation=%s operation_ok=%s", @@ -1573,7 +1573,7 @@ def _configure_trigger_master(self, node_map, cfg, *, strict: bool = False) -> N source_ok = self._set_enum_node(node_map, "LineSource", output_source, strict=strict) if mode_ok and source_ok: - LOG.info( + LOG.debug( "GenTL trigger master configured via Line*: output_line=%s output_source=%s", output_line, output_source, @@ -1692,7 +1692,7 @@ def _configure_frame_rate(self, node_map) -> None: return target = float(self.settings.fps) - LOG.info("Configuring GenTL frame rate: requested %.3f FPS", target) + LOG.debug("Configuring GenTL frame rate: requested %.3f FPS", target) for attr in ("AcquisitionFrameRateEnable", "AcquisitionFrameRateControlEnable"): try: @@ -1700,7 +1700,7 @@ def _configure_frame_rate(self, node_map) -> None: before = getattr(node, "value", None) node.value = True after = getattr(node, "value", None) - LOG.info("Enabled GenTL %s: before=%r after=%r", attr, before, after) + LOG.debug("Enabled GenTL %s: before=%r after=%r", attr, before, after) break except Exception: pass @@ -1712,7 +1712,7 @@ def _configure_frame_rate(self, node_map) -> None: node.value = target after = getattr(node, "value", None) - LOG.info( + LOG.debug( "Set GenTL %s: before=%r requested=%.3f after=%r", attr, before, From 7e3a86984765cee07cb465b5677ecad7dd842f98 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 13:30:00 +0200 Subject: [PATCH 05/19] Harden DLC worker startup and timing logs Adds fine-grained `WorkerTimingStats` instrumentation across enqueue, initialization, inference, and emit paths, with error/frame accounting and optional timing logging. It also makes worker startup/stop behavior safer by deferring queue creation until RUNNING, blocking enqueue during STARTING, normalizing input frames before inference, and adding richer debug diagnostics (CUDA/runner state and thread stack dumps on stuck shutdown). --- dlclivegui/services/dlc_processor.py | 286 ++++++++++++++++++++++----- dlclivegui/utils/stats.py | 5 +- dlclivegui/utils/utils.py | 16 ++ 3 files changed, 256 insertions(+), 51 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 0ecf8ab..4d4df75 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -16,9 +16,11 @@ import numpy as np from PySide6.QtCore import QObject, Signal -from dlclivegui.config import DLCProcessorSettings, ModelType +from dlclivegui.config import DLC_DO_LOG_TIMING, DLCProcessorSettings, ModelType from dlclivegui.processors.processor_utils import instantiate_from_scan from dlclivegui.temp import Engine # type: ignore # TODO use main package enum when released +from dlclivegui.utils.stats import WorkerTimingStats +from dlclivegui.utils.utils import format_thread_stack logger = logging.getLogger(__name__) STOP_WORKER_TIMEOUT = 10.0 # # seconds to wait in STOPPING state before scheduling background reaping @@ -181,6 +183,13 @@ def __init__(self) -> None: self._gpu_inference_times: deque[float] = deque(maxlen=60) self._processor_overhead_times: deque[float] = deque(maxlen=60) + self._timing = WorkerTimingStats( + "DLCLiveProcessor", + logger=logger, + log_interval=1.0, + enabled=bool(DLC_DO_LOG_TIMING or ENABLE_PROFILING), + ) + @staticmethod def get_model_backend(model_path: str) -> Engine: return Engine.from_model_path(model_path) @@ -232,24 +241,40 @@ def shutdown(self) -> None: def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: # Keep lifecycle lock held only for quick state checks and snapshots. with self._lifecycle_lock: - if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): + if ( + self._state in (WorkerState.STOPPING, WorkerState.FAULTED, WorkerState.STARTING) + or self._stop_event.is_set() + ): return t = self._worker_thread q = self._queue should_start = t is None or not t.is_alive() - frame_c = frame.copy() + with self._timing.measure("DLC.enqueue.copy_frame"): + frame_c = frame.copy() enq_time = time.perf_counter() if should_start: # Re-acquire the lifecycle lock to safely (re)start the worker if needed. with self._lifecycle_lock: # Re-check state in case it changed while we were copying the frame. - if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): + if ( + self._state in (WorkerState.STOPPING, WorkerState.FAULTED, WorkerState.STARTING) + or self._stop_event.is_set() + ): return t = self._worker_thread if t is None or not t.is_alive(): - # _start_worker_locked expects the lifecycle lock to be held. + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Starting DLC worker from first frame: " + "shape=%s dtype=%s contiguous=%s strides=%s timestamp=%.6f", + frame_c.shape, + frame_c.dtype, + frame_c.flags["C_CONTIGUOUS"], + frame_c.strides, + timestamp, + ) self._start_worker_locked(frame_c, timestamp) return # Worker is now running; refresh queue snapshot. @@ -262,18 +287,20 @@ def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: while True: try: - q.put_nowait(item) + with self._timing.measure("DLC.enqueue.put"): + q.put_nowait(item) with self._stats_lock: self._frames_enqueued += 1 return except queue.Full: try: - q.get_nowait() - try: - q.task_done() - except ValueError: - pass + with self._timing.measure("DLC.enqueue.drop_stale"): + q.get_nowait() + try: + q.task_done() + except ValueError: + pass with self._stats_lock: self._frames_dropped += 1 @@ -332,11 +359,91 @@ def get_stats(self) -> ProcessorStats: avg_processor_overhead=avg_proc_overhead, ) + def _debug_log_dlc_runner_device(self) -> None: + if not logger.isEnabledFor(logging.DEBUG): + return + + try: + import torch + + logger.debug( + "Torch CUDA state: available=%s built=%s device_count=%s current_device=%s device_name=%s " + "allocated=%.2fMB reserved=%.2fMB", + torch.cuda.is_available(), + torch.backends.cuda.is_built(), + torch.cuda.device_count(), + torch.cuda.current_device() if torch.cuda.is_available() else None, + torch.cuda.get_device_name(0) if torch.cuda.is_available() and torch.cuda.device_count() else None, + torch.cuda.memory_allocated(0) / (1024 * 1024) if torch.cuda.is_available() else 0.0, + torch.cuda.memory_reserved(0) / (1024 * 1024) if torch.cuda.is_available() else 0.0, + ) + except Exception: + logger.debug("Could not query torch CUDA state", exc_info=True) + + dlc = self._dlc + runner = getattr(dlc, "runner", None) + + logger.debug( + "DLCLive runner: type=%s runner.device=%r runner.model=%r runner.net=%r", + type(runner).__name__ if runner is not None else None, + getattr(runner, "device", None), + type(getattr(runner, "model", None)).__name__ if getattr(runner, "model", None) is not None else None, + type(getattr(runner, "net", None)).__name__ if getattr(runner, "net", None) is not None else None, + ) + + seen: set[int] = set() + + def walk(obj, path: str, depth: int = 0) -> None: + if obj is None or depth > 7: + return + + oid = id(obj) + if oid in seen: + return + seen.add(oid) + + try: + params = getattr(obj, "parameters", None) + if callable(params): + first_param = next(iter(params()), None) + if first_param is not None: + logger.debug( + "Torch module at %s: parameter device=%s is_cuda=%s dtype=%s shape=%s", + path, + first_param.device, + first_param.is_cuda, + first_param.dtype, + tuple(first_param.shape), + ) + except Exception: + pass + + for name in ( + "runner", + "model", + "net", + "pose_model", + "dlc_model", + "module", + "engine", + "predictor", + "detector", + "backbone", + ): + try: + child = getattr(obj, name, None) + except Exception: + child = None + if child is not None: + walk(child, f"{path}.{name}", depth + 1) + + walk(self._dlc, "self._dlc") + def _start_worker_locked(self, init_frame: np.ndarray, init_timestamp: float) -> None: # lifecycle_lock must already be held if self._worker_thread is not None and self._worker_thread.is_alive(): return - self._queue = queue.Queue(maxsize=1) + self._queue = None self._stop_event.clear() self._state = WorkerState.STARTING self._worker_thread = threading.Thread( @@ -364,7 +471,7 @@ def _stop_worker(self) -> bool: t.join(timeout=STOP_WORKER_TIMEOUT) if t.is_alive(): qsize = self._queue.qsize() if self._queue is not None else -1 - logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)", qsize) + logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)\n%s", qsize, format_thread_stack(t)) self._schedule_reap(t) return False @@ -427,7 +534,8 @@ def _timed_processor(self): def timed_process(pose, _op=original, _holder=holder, **kwargs): start = time.perf_counter() try: - return _op(pose, **kwargs) + with self._timing.measure("DLC.processor.process"): + return _op(pose, **kwargs) finally: _holder[0] = time.perf_counter() - start @@ -438,6 +546,24 @@ def timed_process(pose, _op=original, _holder=holder, **kwargs): # Restore even if inference/errors occur self._processor.process = original + @staticmethod + def _prepare_input_frame(frame: np.ndarray) -> np.ndarray: + """Normalize camera frames for DLCLive inference.""" + arr = np.asarray(frame) + + if arr.ndim == 2: + # Mono8 / grayscale -> 3-channel + arr = np.repeat(arr[:, :, None], 3, axis=2) + elif arr.ndim == 3 and arr.shape[2] == 4: + arr = arr[:, :, :3] + elif arr.ndim != 3 or arr.shape[2] != 3: + raise ValueError(f"Unsupported DLCLive input frame shape: {arr.shape}") + + if arr.dtype != np.uint8: + arr = np.clip(arr, 0, 255).astype(np.uint8, copy=False) + + return np.ascontiguousarray(arr) + def _process_frame( self, frame: np.ndarray, @@ -453,11 +579,23 @@ def _process_frame( if self._dlc is None: raise RuntimeError("DLCLive instance is not initialized.") # Time GPU inference (and processor overhead when present) + with self._timing.measure("DLC.prepare_frame"): + frame = self._prepare_input_frame(frame) with self._timed_processor() as proc_holder: inference_start = time.perf_counter() - raw_pose: Any = self._dlc.get_pose(frame, frame_time=timestamp) + + with self._timing.measure("DLC.process_frame"): + processed_frame = self._dlc.process_frame(frame) + + with self._timing.measure("DLC.runner.get_pose"): + self._dlc.pose = self._dlc.runner.get_pose(processed_frame) + + with self._timing.measure("DLC.post_process_pose"): + raw_pose: Any = self._dlc._post_process_pose(processed_frame, frame_time=timestamp) + inference_time = time.perf_counter() - inference_start - pose_arr: np.ndarray = validate_pose_array(raw_pose, source_backend=PoseBackends.DLC_LIVE) + with self._timing.measure("DLC.validate_pose"): + pose_arr: np.ndarray = validate_pose_array(raw_pose, source_backend=PoseBackends.DLC_LIVE) pose_packet = PosePacket( schema_version=0, keypoints=pose_arr, @@ -475,7 +613,8 @@ def _process_frame( # Emit pose (measure signal overhead) signal_start = time.perf_counter() - self.pose_ready.emit(PoseResult(pose=pose_packet.keypoints, timestamp=timestamp, packet=pose_packet)) + with self._timing.measure("DLC.emit.pose_ready"): + self.pose_ready.emit(PoseResult(pose=pose_packet.keypoints, timestamp=timestamp, packet=pose_packet)) signal_time = time.perf_counter() - signal_start end_ts = time.perf_counter() @@ -496,6 +635,8 @@ def _process_frame( self._gpu_inference_times.append(gpu_inference_time) self._processor_overhead_times.append(processor_overhead) + self._timing.note_frame() + self._timing.maybe_log() self.frame_processed.emit() def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: @@ -504,60 +645,98 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: if not self._settings.model_path: raise RuntimeError("No DLCLive model path configured.") - init_start = time.perf_counter() - dyn = self._settings.dynamic - if not isinstance(dyn, (list, tuple)) or len(dyn) != 3: - try: - dyn = dyn.to_tuple() - except Exception as e: - raise RuntimeError("Invalid dynamic crop settings format.") from e - enabled, margin, max_missing = dyn - - options = { - "model_path": self._settings.model_path, - "model_type": self._settings.model_type, - "processor": self._processor, - "dynamic": [enabled, margin, max_missing], - "resize": self._settings.resize, - "precision": self._settings.precision, - "single_animal": self._settings.single_animal, - } - if self._settings.device is not None: - options["device"] = self._settings.device + with self._timing.measure("DLC.build_options"): + dyn = self._settings.dynamic + if not isinstance(dyn, (list, tuple)) or len(dyn) != 3: + try: + dyn = dyn.to_tuple() + except Exception as e: + raise RuntimeError("Invalid dynamic crop settings format.") from e + enabled, margin, max_missing = dyn + + options = { + "model_path": self._settings.model_path, + "model_type": self._settings.model_type, + "processor": self._processor, + "dynamic": [enabled, margin, max_missing], + "resize": self._settings.resize, + "precision": self._settings.precision, + "single_animal": self._settings.single_animal, + } + if self._settings.device is not None: + options["device"] = self._settings.device + + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "DLC worker starting: model_path=%s model_type=%s device=%s " + "init_frame_shape=%s dtype=%s contiguous=%s", + self._settings.model_path, + self._settings.model_type, + self._settings.device, + init_frame.shape, + init_frame.dtype, + init_frame.flags["C_CONTIGUOUS"], + ) try: if DLCLive is None: raise RuntimeError( "DLCLive class is not available. Ensure the dlclive package is installed and can be imported." ) - self._dlc = DLCLive(**options) + with self._timing.measure("DLC.construct"): + self._dlc = DLCLive(**options) + self._timing.maybe_log() except Exception as exc: + self._timing.note_error() + self._timing.maybe_log() with self._lifecycle_lock: self._state = WorkerState.FAULTED raise RuntimeError( f"Failed to initialize DLCLive with model '{self._settings.model_path}': {exc}" ) from exc + if self._stop_event.is_set(): + logger.debug("DLC worker stop requested during construction; exiting before init_inference.") + return + + with self._timing.measure("DLC.prepare_init_frame"): + init_frame = self._prepare_input_frame(init_frame) + + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Calling DLCLive.init_inference with frame shape=%s dtype=%s contiguous=%s", + init_frame.shape, + init_frame.dtype, + init_frame.flags["C_CONTIGUOUS"], + ) # First inference to initialize - init_inference_start = time.perf_counter() - self._dlc.init_inference(init_frame) - init_inference_time = time.perf_counter() - init_inference_start + with self._timing.measure("DLC.init_inference"): + self._dlc.init_inference(init_frame) + + self._debug_log_dlc_runner_device() + self._timing.note_frame() + self._timing.maybe_log() + + if self._stop_event.is_set(): + logger.debug("DLC worker stop requested after init_inference; exiting before RUNNING state.") + return # Pass DLCLive cfg to processor if available if hasattr(self._dlc, "processor") and hasattr(self._dlc.processor, "set_dlc_cfg"): - self._dlc.processor.set_dlc_cfg(getattr(self._dlc, "cfg", None)) + with self._timing.measure("DLC.processor.set_dlc_cfg"): + self._dlc.processor.set_dlc_cfg(getattr(self._dlc, "cfg", None)) self._initialized = True self.initialized.emit(True) with self._lifecycle_lock: + if self._stop_event.is_set(): + logger.debug("DLC worker stop requested before RUNNING state; exiting.") + return + + self._queue = queue.Queue(maxsize=1) self._state = WorkerState.RUNNING - total_init_time = time.perf_counter() - init_start - logger.info( - "DLCLive model initialized successfully (total: %.3fs, init_inference: %.3fs)", - total_init_time, - init_inference_time, - ) + logger.info("DLCLive model initialized successfully") # Emit pose for init frame & update stats (not dequeued) self._process_frame(init_frame, init_timestamp, time.perf_counter(), queue_wait_time=0.0) @@ -598,6 +777,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: try: self._process_frame(frame, ts, enq, queue_wait_time=0.0) except Exception as exc: + self._timing.note_error() + self._timing.maybe_log() logger.exception("Pose inference failed", exc_info=exc) self.error.emit(str(exc)) finally: @@ -610,11 +791,14 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: # Normal operation: timed get try: wait_start = time.perf_counter() - item = q.get(timeout=0.05) + with self._timing.measure("DLC.queue_get"): + item = q.get(timeout=0.05) queue_wait_time = time.perf_counter() - wait_start except queue.Empty: continue except Exception as exc: + self._timing.note_error() + self._timing.maybe_log() logger.exception("Error getting item from queue", exc_info=exc) with self._lifecycle_lock: self._state = WorkerState.FAULTED @@ -625,6 +809,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: frame, ts, enq = item self._process_frame(frame, ts, enq, queue_wait_time=queue_wait_time) except Exception as exc: + self._timing.note_error() + self._timing.maybe_log() logger.exception("Pose inference failed", exc_info=exc) self.error.emit(str(exc)) finally: @@ -635,6 +821,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: logger.info("DLC worker thread exiting") + self._timing.maybe_log() + class DLCService: """Wrap DLCLiveProcessor lifecycle & configuration.""" diff --git a/dlclivegui/utils/stats.py b/dlclivegui/utils/stats.py index 1edbf78..0ef0528 100644 --- a/dlclivegui/utils/stats.py +++ b/dlclivegui/utils/stats.py @@ -70,6 +70,7 @@ def __init__(self, parent: WorkerTimingStats, name: str): self.parent = parent self.name = name self.t0 = 0.0 + self.elapsed = 0.0 def __enter__(self): if self.parent.enabled: @@ -80,8 +81,8 @@ def __exit__(self, exc_type, exc, tb): if not self.parent.enabled: return False - dt = time.perf_counter() - self.t0 - self.parent._totals[self.name] = self.parent._totals.get(self.name, 0.0) + dt + self.elapsed = time.perf_counter() - self.t0 + self.parent._totals[self.name] = self.parent._totals.get(self.name, 0.0) + self.elapsed self.parent._counts[self.name] = self.parent._counts.get(self.name, 0) + 1 return False diff --git a/dlclivegui/utils/utils.py b/dlclivegui/utils/utils.py index 534e573..bd72958 100644 --- a/dlclivegui/utils/utils.py +++ b/dlclivegui/utils/utils.py @@ -1,7 +1,10 @@ from __future__ import annotations import re +import sys +import threading import time +import traceback from collections import deque from collections.abc import Iterable from dataclasses import dataclass @@ -87,6 +90,19 @@ def build_run_dir(session_dir: Path, *, use_timestamp: bool) -> Path: return run_dir +def format_thread_stack(thread: threading.Thread) -> str: + ident = thread.ident + if ident is None: + return f"Thread {thread.name!r} has no ident." + + frame = sys._current_frames().get(ident) + if frame is None: + return f"No Python stack frame found for thread {thread.name!r} ident={ident}." + + stack = "".join(traceback.format_stack(frame)) + return f"Stack for thread {thread.name!r} ident={ident}:\n{stack}" + + @dataclass(frozen=True) class RecordingPlan: session_dir: Path From 9d8f98f2e1b8cdec3b9234d455dc709dbf8c96eb Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 13:30:37 +0200 Subject: [PATCH 06/19] Disable pose latency debug logging Comment out the camera-to-GUI pose latency calculation and debug log in `pose_ready_callback`. This removes the try/except-wrapped timing log path while leaving pose handling and display update behavior unchanged. --- dlclivegui/gui/main_window.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index 898540e..bbd3ef7 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -2061,12 +2061,12 @@ def _on_pose_ready(self, result: PoseResult) -> None: with self._dlc_timing.measure("DLC.pose_ready_callback"): self._last_pose = result - try: - latency_ms = (time.time() - float(result.timestamp)) * 1000.0 - if logger.isEnabledFor(logging.DEBUG): - logger.debug("DLC pose latency camera_timestamp_to_gui=%.2f ms", latency_ms) - except Exception: - pass + # try: + # latency_ms = (time.time() - float(result.timestamp)) * 1000.0 + # if logger.isEnabledFor(logging.DEBUG): + # logger.debug("DLC pose latency camera_timestamp_to_gui=%.2f ms", latency_ms) + # except Exception: + # pass if self._current_frame is not None: self._display_dirty = True From 035a974872f92b438b02c25156c023573155770c Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 13:31:01 +0200 Subject: [PATCH 07/19] Fix shutdown order in camera preview stop Reorders preview teardown so inference is stopped before stopping the multi-camera controller. This avoids stopping the controller while inference is still active and keeps shutdown state cleanup consistent. --- dlclivegui/gui/main_window.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index bbd3ef7..2d6c3e0 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -1723,9 +1723,9 @@ def _stop_preview(self) -> None: # Stop any active recording first self._stop_multi_camera_recording() - self.multi_camera_controller.stop() self._pending_recording_after_preview = False self._stop_inference(show_message=False) + self.multi_camera_controller.stop() self._fps_tracker.clear() self._last_display_time = 0.0 if hasattr(self, "camera_stats_label"): From e4f22df55e8400117f26965b1ce25c1716e4a53e Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 10:15:13 +0200 Subject: [PATCH 08/19] Add mono-preserving output mode to GenTL Adds a `preserve_mono` option to the GenTL camera backend so Mono pixel formats can remain 2D Mono8 output instead of always converting to BGR. The backend now reports preserve-mono capability, exposes recommended/actual output format based on camera format, and persists detected pixel/output format metadata in settings. It also logs first-frame format details to make runtime format behavior easier to inspect. --- dlclivegui/cameras/backends/gentl_backend.py | 54 +++++++++++++++++++- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/dlclivegui/cameras/backends/gentl_backend.py b/dlclivegui/cameras/backends/gentl_backend.py index f0eb641..db6f185 100644 --- a/dlclivegui/cameras/backends/gentl_backend.py +++ b/dlclivegui/cameras/backends/gentl_backend.py @@ -90,6 +90,8 @@ def __init__(self, settings): ns = {} self._fast_start: bool = bool(ns.get("fast_start", False)) + self._preserve_mono: bool = bool(getattr(settings, "preserve_mono", False) or ns.get("preserve_mono", False)) + self._logged_first_frame: bool = False raw_device_id = ns.get("device_id") or props.get("device_id") legacy_serial = ns.get("serial_number") or ns.get("serial") or props.get("serial_number") or props.get("serial") @@ -184,10 +186,20 @@ def actual_pixel_format(self) -> str | None: """Camera/native pixel format selected on the GenICam PixelFormat node.""" return self._camera_pixel_format or (self._pixel_format if self._pixel_format != "auto" else None) + @property + def recommended_preserve_mono(self) -> bool | None: + if not self._camera_pixel_format: + return None + return self._is_camera_mono() + @property def actual_output_format(self) -> str | None: - """Current GenTL backend emits OpenCV-native BGR uint8 frames.""" - return self._actual_output_format or "BGR8" + """Backend output frame format emitted to the app, e.g. 'Mono8' or 'BGR8'.""" + if self._actual_output_format: + return self._actual_output_format + if not self._camera_pixel_format: + return None + return "Mono8" if self._should_output_mono() else "BGR8" @classmethod def is_available(cls) -> bool: @@ -203,6 +215,7 @@ def static_capabilities(cls) -> dict[str, SupportLevel]: "device_discovery": SupportLevel.SUPPORTED, "stable_identity": SupportLevel.SUPPORTED, "hardware_trigger": SupportLevel.BEST_EFFORT, + "preserve_mono": SupportLevel.SUPPORTED, } def _debug_trigger_nodes(self, node_map, *, context: str = "") -> None: @@ -600,6 +613,13 @@ def waits_for_hardware_trigger(self) -> bool: role = str(self._trigger_attr(getattr(self, "_trigger", None), "role", "off") or "off").lower() return role in {"external", "follower"} + def _is_camera_mono(self) -> bool: + fmt = str(self._camera_pixel_format or self._pixel_format or "").strip() + return fmt.startswith("Mono") + + def _should_output_mono(self) -> bool: + return bool(self._preserve_mono and self._is_camera_mono()) + @staticmethod def _output_format_for_frame(frame: np.ndarray) -> str: if frame.ndim == 2: @@ -654,6 +674,25 @@ def read(self) -> CapturedFrame: except Exception: pass self._actual_output_format = self._output_format_for_frame(frame) + try: + ns = self._ensure_settings_ns() + ns["actual_output_format"] = self._actual_output_format + ns["preserve_mono"] = self._preserve_mono + except Exception: + pass + if not self._logged_first_frame: + self._logged_first_frame = True + LOG.info( + "[GenTL] first frame device_id=%s shape=%s dtype=%s nbytes=%.2f MB " + "camera_pixel_format=%s output_format=%s preserve_mono=%s", + self._device_id, + frame.shape, + frame.dtype, + frame.nbytes / (1024 * 1024), + self._camera_pixel_format, + self.actual_output_format, + self._preserve_mono, + ) return CapturedFrame( frame=frame, @@ -1346,6 +1385,14 @@ def _configure_pixel_format(self, node_map) -> None: pixel_format_node.value = selected self._pixel_format = str(pixel_format_node.value) self._camera_pixel_format = self._pixel_format + try: + ns = self._ensure_settings_ns() + ns["actual_pixel_format"] = self._camera_pixel_format + ns["detected_pixel_format"] = self._camera_pixel_format + ns["actual_output_format"] = self.actual_output_format + ns["preserve_mono"] = self._preserve_mono + except Exception: + pass LOG.debug("GenTL pixel format selected: %s", self._pixel_format) @@ -1851,6 +1898,9 @@ def _convert_frame(self, frame: np.ndarray) -> np.ndarray: frame = cv2.cvtColor(frame, cv2.COLOR_BayerGR2BGR) elif fmt == "BayerBG8": frame = cv2.cvtColor(frame, cv2.COLOR_BayerBG2BGR) + elif self._should_output_mono(): + # Keep Mono* cameras as 2D uint8 frames when explicitly requested. + pass else: frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) From 37ad84d2210ac0b5073dfa4e9faae2485e7703cb Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 15:18:25 +0200 Subject: [PATCH 09/19] Async recording stop and queued frame dispatch Move recording shutdown off the UI thread and finalize it via a Qt signal so stop actions no longer block the interface. RecordingManager now uses a lock-protected recorder map plus a bounded background frame-dispatch queue/thread to decouple frame intake from disk writes, drop frames under sustained backpressure, and cleanly stop/tear down recorders. The recording-with-overlays UI path is also disabled in this change. --- dlclivegui/gui/main_window.py | 54 +++++++-- dlclivegui/gui/recording_manager.py | 180 +++++++++++++++++++++++----- 2 files changed, 191 insertions(+), 43 deletions(-) diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index 2d6c3e0..afb4f0c 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -6,12 +6,13 @@ import json import logging import os +import threading import time from pathlib import Path import cv2 import numpy as np -from PySide6.QtCore import QRect, QSettings, Qt, QTimer, QUrl +from PySide6.QtCore import QRect, QSettings, Qt, QTimer, QUrl, Signal from PySide6.QtGui import ( QAction, QActionGroup, @@ -87,6 +88,8 @@ class DLCLiveMainWindow(QMainWindow): """Main application window.""" + _recording_stopped_async = Signal() + def __init__(self, config: ApplicationSettings | None = None): super().__init__() self.setWindowTitle("DeepLabCut Live GUI") @@ -178,6 +181,9 @@ def __init__(self, config: ApplicationSettings | None = None): self._dlc_tile_scale: tuple[float, float] = (1.0, 1.0) # (scale_x, scale_y) # Display flag (decoupled from frame capture for performance) self._display_dirty: bool = False + # Recording state + self._recording_stopping = False + self._recording_stopped_async.connect(self._on_recording_stopped_async) self._load_icons() self._preview_pixmap = QPixmap(LOGO_ALPHA) @@ -641,11 +647,11 @@ def _build_recording_group(self) -> QGroupBox: form.addRow(grid) # Recording options - self.record_with_overlays_checkbox = QCheckBox("Record video with overlays") - self.record_with_overlays_checkbox.setToolTip( - "Enable to include pose overlays in recorded video (keypoints & bounding boxes)" - ) - self.record_with_overlays_checkbox.setChecked(False) + # self.record_with_overlays_checkbox = QCheckBox("Record video with overlays") + # self.record_with_overlays_checkbox.setToolTip( + # "Enable to include pose overlays in recorded video (keypoints & bounding boxes)" + # ) + # self.record_with_overlays_checkbox.setChecked(False) self.fast_encoding_checkbox = QCheckBox("Use faster encoding parameters") self.fast_encoding_checkbox.setToolTip( @@ -658,7 +664,7 @@ def _build_recording_group(self) -> QGroupBox: recording_options = QWidget() recording_options_layout = QHBoxLayout(recording_options) recording_options_layout.setContentsMargins(0, 0, 0, 0) - recording_options_layout.addWidget(self.record_with_overlays_checkbox) + # recording_options_layout.addWidget(self.record_with_overlays_checkbox) recording_options_layout.addWidget(self.fast_encoding_checkbox) recording_options_layout.addStretch(1) @@ -1454,8 +1460,8 @@ def _on_recording_frame_ready( if not self._rec_manager.is_active: return - if self.record_with_overlays_checkbox.isChecked(): - frame = self._render_overlays_for_recording(camera_id, frame) + # if self.record_with_overlays_checkbox.isChecked(): + # frame = self._render_overlays_for_recording(camera_id, frame) self._rec_manager.write_frame(camera_id, frame, timestamp, timestamp_metadata=timestamp_metadata) @@ -1611,9 +1617,35 @@ def _stop_multi_camera_recording(self) -> None: if not self._rec_manager.is_active: return - self.multi_camera_controller.set_recording_frame_do_emit(False) + if getattr(self, "_recording_stopping", False): + return + + self._recording_stopping = True - self._rec_manager.stop_all() + self.start_record_button.setEnabled(False) + self.stop_record_button.setEnabled(False) + self.statusBar().showMessage("Stopping multi-camera recording…", 3000) + + # Stop frame emission immediately so no new frames enter recording pipeline. + try: + self.multi_camera_controller.set_recording_frame_do_emit(False) + except Exception: + logger.exception("Failed to disable recording frame emission") + + def worker(): + try: + self._rec_manager.stop_all() + finally: + self._recording_stopped_async.emit() + + threading.Thread( + target=worker, + name="StopRecordingWorker", + daemon=True, + ).start() + + def _on_recording_stopped_async(self) -> None: + self._recording_stopping = False self.start_record_button.setEnabled(True) self.stop_record_button.setEnabled(False) self.statusBar().showMessage("Multi-camera recording stopped", 3000) diff --git a/dlclivegui/gui/recording_manager.py b/dlclivegui/gui/recording_manager.py index ddcef47..e08020c 100644 --- a/dlclivegui/gui/recording_manager.py +++ b/dlclivegui/gui/recording_manager.py @@ -1,6 +1,8 @@ from __future__ import annotations import logging +import queue +import threading import time from pathlib import Path @@ -14,6 +16,8 @@ log = logging.getLogger(__name__) +_FRAME_SENTINEL = object() + class RecordingManager: """Handle multi-camera recording lifecycle and filenames.""" @@ -23,21 +27,30 @@ def __init__(self): self._session_dir: Path | None = None self._run_dir: Path | None = None + self._lock = threading.RLock() + self._frame_queue: queue.Queue | None = None + self._dispatch_thread: threading.Thread | None = None + self._dispatch_stop = threading.Event() + @property def is_active(self) -> bool: - return bool(self._recorders) + with self._lock: + return bool(self._recorders) @property def recorders(self) -> dict[str, VideoRecorder]: - return self._recorders + with self._lock: + return dict(self._recorders) @property def session_dir(self) -> Path | None: - return self._session_dir + with self._lock: + return self._session_dir @property def run_dir(self) -> Path | None: - return self._run_dir + with self._lock: + return self._run_dir @staticmethod def _backend_ns(cam: CameraSettings) -> dict: @@ -89,7 +102,71 @@ def _resolve_recording_fps( return None def pop(self, cam_id: str, default=None) -> VideoRecorder | None: - return self._recorders.pop(cam_id, default) + with self._lock: + return self._recorders.pop(cam_id, default) + + def _start_dispatcher(self) -> None: + with self._lock: + if self._dispatch_thread is not None and self._dispatch_thread.is_alive(): + return + + self._dispatch_stop.clear() + self._frame_queue = queue.Queue(maxsize=4096) + self._dispatch_thread = threading.Thread( + target=self._dispatch_loop, + name="RecordingManagerDispatcher", + daemon=True, + ) + self._dispatch_thread.start() + + def _stop_dispatcher(self, timeout: float = 2.0) -> None: + self._dispatch_stop.set() + + with self._lock: + q = self._frame_queue + t = self._dispatch_thread + + if q is not None: + try: + q.put_nowait(_FRAME_SENTINEL) + except queue.Full: + pass + + if t is not None: + t.join(timeout=timeout) + if t.is_alive(): + log.warning("Recording frame dispatcher did not stop within %.1fs", timeout) + + with self._lock: + self._dispatch_thread = None + self._frame_queue = None + self._dispatch_stop.clear() + + def _dispatch_loop(self) -> None: + with self._lock: + q = self._frame_queue + + if q is None: + return + + while not self._dispatch_stop.is_set(): + try: + item = q.get(timeout=0.1) + except queue.Empty: + continue + + try: + if item is _FRAME_SENTINEL: + break + + cam_id, frame, timestamp = item + self._write_frame_now(cam_id, frame, timestamp) + + finally: + try: + q.task_done() + except ValueError: + pass def start_all( self, @@ -117,8 +194,9 @@ def start_all( Returns: run_dir if at least one recorder started, else None. """ - if self._recorders: - return self._run_dir + with self._lock: + if self._recorders: + return self._run_dir if not active_cams: return None @@ -135,8 +213,9 @@ def start_all( log.error("Failed to create run dir: %s", exc) return None - self._session_dir = session_dir - self._run_dir = run_dir + with self._lock: + self._session_dir = session_dir + self._run_dir = run_dir started_any = False @@ -174,7 +253,8 @@ def start_all( ) try: recorder.start() - self._recorders[cam_id] = recorder + with self._lock: + self._recorders[cam_id] = recorder started_any = True log.info("Started recording %s -> %s", cam_id, cam_path) except Exception as exc: @@ -184,30 +264,40 @@ def start_all( return None if not started_any: - self._recorders.clear() - self._session_dir = None - self._run_dir = None + with self._lock: + self._recorders.clear() + self._session_dir = None + self._run_dir = None return None + self._start_dispatcher() return run_dir def stop_all(self) -> None: - for cam_id, rec in self._recorders.items(): + self._stop_dispatcher() + + with self._lock: + recorders = list(self._recorders.items()) + self._recorders.clear() + + for cam_id, rec in recorders: try: rec.stop() log.info("Stopped recording %s", cam_id) except Exception as exc: log.warning("Error stopping recorder for %s: %s", cam_id, exc) - self._recorders.clear() - self._session_dir = None - self._run_dir = None - - def write_frame( - self, cam_id: str, frame: np.ndarray, timestamp: float | None = None, timestamp_metadata: object | None = None - ) -> None: - rec = self._recorders.get(cam_id) + + with self._lock: + self._session_dir = None + self._run_dir = None + + def _write_frame_now(self, cam_id: str, frame: np.ndarray, timestamp: float | None = None, timestamp_metadata: object | None = None) -> None: + with self._lock: + rec = self._recorders.get(cam_id) + if not rec or not rec.is_running: return + try: rec.write( frame, @@ -216,18 +306,40 @@ def write_frame( ) except Exception as exc: log.warning( - "Failed to write frame for %s: %s: %s frame_shape=%s dtype=%s", + "Failed to write frame for %s: %s: %s frame_shape=%s dtype=%s. Removing recorder.", cam_id, type(exc).__name__, str(exc) or repr(exc), getattr(frame, "shape", None), getattr(frame, "dtype", None), ) - try: - rec.stop() - except Exception: - log.exception("Failed to stop recorder for %s after write error.") - self._recorders.pop(cam_id, None) + + with self._lock: + rec = self._recorders.pop(cam_id, None) + + if rec is not None: + try: + rec.stop() + except Exception: + log.exception("Failed to stop recorder for %s after write error.", cam_id) + + def write_frame(self, cam_id: str, frame: np.ndarray, timestamp: float | None = None, timestamp_metadata: object | None = None) -> None: + with self._lock: + q = self._frame_queue + active = cam_id in self._recorders + + if not active or q is None: + return + + try: + q.put_nowait((cam_id, frame, timestamp if timestamp is not None else time.time(), timestamp_metadata)) + except queue.Full: + log.warning( + "Recording manager frame queue full; dropping frame for %s. frame_shape=%s dtype=%s", + cam_id, + getattr(frame, "shape", None), + getattr(frame, "dtype", None), + ) def get_stats_summary(self) -> str: totals = { @@ -241,7 +353,11 @@ def get_stats_summary(self) -> str: "max_latency": 0.0, "avg_latencies": [], } - for rec in self._recorders.values(): + + with self._lock: + recorders = list(self._recorders.values()) + + for rec in recorders: stats: RecorderStats | None = rec.get_stats() if not stats: continue @@ -255,8 +371,8 @@ def get_stats_summary(self) -> str: totals["max_latency"] = max(totals["max_latency"], stats.last_latency) totals["avg_latencies"].append(stats.average_latency) - if len(self._recorders) == 1: - rec = next(iter(self._recorders.values())) + if len(recorders) == 1: + rec = recorders[0] stats = rec.get_stats() if stats: from dlclivegui.utils.stats import format_recorder_stats @@ -271,7 +387,7 @@ def get_stats_summary(self) -> str: fill_pct = (100.0 * totals["queue"] / buffer) if buffer > 0 else 0.0 return ( - f"{len(self._recorders)} cams | {totals['written']}/{totals['enqueued']} frames | " + f"{len(recorders)} cams | {totals['written']}/{totals['enqueued']} frames | " f"writer {totals['write_fps']:.1f} fps | " f"latency {totals['max_latency'] * 1000:.1f}ms (avg {avg * 1000:.1f}ms) | " f"queue {queue_text} ({fill_pct:.0f}%) | " From 1487cfd6cec5028199c20e108aae4bedb55f0c81 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 15:24:09 +0200 Subject: [PATCH 10/19] Make fps value float for write gear --- dlclivegui/config.py | 2 +- dlclivegui/services/video_recorder.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dlclivegui/config.py b/dlclivegui/config.py index f1347af..585ead2 100644 --- a/dlclivegui/config.py +++ b/dlclivegui/config.py @@ -558,7 +558,7 @@ def writegear_options(self, fps: float | None) -> dict[str, Any]: crf_value = int(self.crf) if self.crf is not None else 23 opts: dict[str, Any] = { - "-input_framerate": f"{fps_value:.6f}", + "-input_framerate": float(fps_value), "-vcodec": codec_value, "-crf": str(crf_value), } diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 44369a5..b1831d0 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -184,7 +184,7 @@ def start(self) -> None: writer_kwargs: dict[str, Any] = { "compression_mode": True, "logging": False, - "-input_framerate": fps_value, + "-input_framerate": float(fps_value), "-vcodec": codec_value, "-crf": int(self._crf), } From 279338551a07b3d00bb61d03d1c7fab61c85068a Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 16:28:04 +0200 Subject: [PATCH 11/19] Refactor camera worker and recording pipeline Extracts `SingleCameraWorker` into a new `services/camera_controller.py` module and moves per-frame rotation/cropping plus recording-sink writes into the worker thread. `MultiCameraController` now injects and updates a shared recording sink on workers, and recording enable/disable is propagated directly to active workers. The previous recording-frame emission path and transform handling in the multi-camera slot are removed/commented out to avoid duplicate processing and keep the controller focused on frame aggregation. Also relocates `recording_manager.py` from `gui` to `services` with a file rename. --- dlclivegui/services/camera_controller.py | 255 ++++++++++++++++++ .../services/multi_camera_controller.py | 247 ++--------------- .../{gui => services}/recording_manager.py | 0 3 files changed, 280 insertions(+), 222 deletions(-) create mode 100644 dlclivegui/services/camera_controller.py rename dlclivegui/{gui => services}/recording_manager.py (100%) diff --git a/dlclivegui/services/camera_controller.py b/dlclivegui/services/camera_controller.py new file mode 100644 index 0000000..428503c --- /dev/null +++ b/dlclivegui/services/camera_controller.py @@ -0,0 +1,255 @@ +from __future__ import annotations + +import copy +import logging +import time +from threading import Event, Lock + +import cv2 +import numpy as np +from PySide6.QtCore import QObject, Signal, Slot + +from dlclivegui.cameras import CameraFactory +from dlclivegui.cameras.base import CameraBackend + +# from dlclivegui.config import CameraSettings +from dlclivegui.config import ( + SINGLE_CAMERA_WORKER_DO_LOG_TIMING, + CameraSettings, +) +from dlclivegui.utils.stats import WorkerTimingStats + +logger = logging.getLogger(__name__) + + +class SingleCameraWorker(QObject): + """Worker for a single camera in multi-camera mode.""" + + frame_captured = Signal(str, object, float) # camera_id, frame, timestamp + error_occurred = Signal(str, str) # camera_id, error_message + runtime_info = Signal(str, object) # camera_id, dict of runtime info + started = Signal(str) # camera_id + stopped = Signal(str) # camera_id + + def __init__(self, camera_id: str, settings: CameraSettings): + super().__init__() + self._camera_id = camera_id + self._settings = copy.deepcopy(settings) + self._stop_event = Event() + self._backend: CameraBackend | None = None + self._max_consecutive_errors = 5 + self._retry_delay = 0.1 + self._trigger_timeout_delay = 0.05 + self._trigger_wait_log_interval = 2.0 + self._last_trigger_wait_log = 0.0 + self._trigger_wait_suppressed_count = 0 + + self._recording_sink = None + self._recording_enabled = False + self._recording_sink_lock = Lock() + + # Performance logs + self._timing = WorkerTimingStats( + camera_id, logger=logger, log_interval=1.0, enabled=SINGLE_CAMERA_WORKER_DO_LOG_TIMING + ) + + def set_recording_sink(self, sink) -> None: + with self._recording_sink_lock: + self._recording_sink = sink + + def set_recording_enabled(self, enabled: bool) -> None: + with self._recording_sink_lock: + self._recording_enabled = bool(enabled) + + @Slot() + def run(self) -> None: + self._stop_event.clear() + + try: + logger.debug( + "[Worker %s] before create: backend=%s index=%s properties=%s", + self._camera_id, + self._settings.backend, + self._settings.index, + self._settings.properties, + ) + + self._backend = CameraFactory.create(self._settings) + + logger.debug( + "[Worker %s] after create: backend=%s index=%s properties=%s", + self._camera_id, + self._backend.settings.backend, + self._backend.settings.index, + self._backend.settings.properties, + ) + + self._backend.open() + self.runtime_info.emit( + self._camera_id, + { + "actual_fps": getattr(self._backend, "actual_fps", None), + "actual_resolution": getattr(self._backend, "actual_resolution", None), + "actual_pixel_format": getattr(self._backend, "actual_pixel_format", None), + "actual_output_format": getattr(self._backend, "actual_output_format", None), + }, + ) + except Exception as exc: + logger.exception(f"Failed to initialize camera {self._camera_id}", exc_info=exc) + self.error_occurred.emit(self._camera_id, f"Failed to initialize camera: {exc}") + self.stopped.emit(self._camera_id) + return + + self.started.emit(self._camera_id) + consecutive_errors = 0 + + while not self._stop_event.is_set(): + try: + with self._timing.measure("Single.read"): + frame, timestamp = self._backend.read() + if frame is None or frame.size == 0: + consecutive_errors += 1 + if consecutive_errors >= self._max_consecutive_errors: + self.error_occurred.emit( + self._camera_id, "Too many empty frames.\nWas the device disconnected ?" + ) + break + if self._stop_event.wait(self._retry_delay): + break + continue + + consecutive_errors = 0 + with self._timing.measure("Single.transforms"): + frame = self._apply_worker_transforms(frame) + + with self._recording_sink_lock: + recording_enabled = self._recording_enabled + recording_sink = self._recording_sink + + if recording_enabled and recording_sink is not None: + try: + with self._timing.measure("Single.recording_sink"): + recording_sink(self._camera_id, frame, timestamp) + except Exception as exc: + logger.exception(f"Failed to write frame for camera {self._camera_id}: {exc}") + + with self._timing.measure("Single.emit"): + self.frame_captured.emit(self._camera_id, frame, timestamp) + + self._timing.note_frame() + self._timing.maybe_log() + + except TimeoutError as exc: + self._timing.note_timeout() + self._timing.maybe_log() + if self._stop_event.is_set(): + break + + # In hardware-trigger mode, a timeout usually means: + # "no trigger pulse arrived during this poll interval". + # This is expected and should not count as a camera failure. + if bool(getattr(self._backend, "waits_for_hardware_trigger", False)): + self._log_trigger_wait_throttled(exc) + consecutive_errors = 0 + + if self._stop_event.wait(self._trigger_timeout_delay): + break # Stop event set during wait + continue + + consecutive_errors += 1 + if consecutive_errors >= self._max_consecutive_errors: + self.error_occurred.emit(self._camera_id, f"Camera read timeout: {exc}") + break + if self._stop_event.wait(self._retry_delay): + break + continue + + except Exception as exc: + self._timing.note_error() + self._timing.maybe_log() + consecutive_errors += 1 + if self._stop_event.is_set(): + break + if consecutive_errors >= self._max_consecutive_errors: + self.error_occurred.emit(self._camera_id, f"Camera read error: {exc}") + break + if self._stop_event.wait(self._retry_delay): + break + continue + + # Cleanup + if self._backend is not None: + try: + self._backend.close() + except Exception: + pass + self.stopped.emit(self._camera_id) + + def stop(self) -> None: + self._stop_event.set() + + @staticmethod + def apply_rotation(frame: np.ndarray, degrees: int) -> np.ndarray: + """Apply rotation to frame.""" + if degrees == 90: + return cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) + elif degrees == 180: + return cv2.rotate(frame, cv2.ROTATE_180) + elif degrees == 270: + return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE) + return frame + + @staticmethod + def apply_crop(frame: np.ndarray, crop_region: tuple[int, int, int, int]) -> np.ndarray: + """Apply crop to frame.""" + x0, y0, x1, y1 = crop_region + height, width = frame.shape[:2] + + x0 = max(0, min(x0, width)) + y0 = max(0, min(y0, height)) + x1 = max(x0, min(x1, width)) if x1 > 0 else width + y1 = max(y0, min(y1, height)) if y1 > 0 else height + + if x0 < x1 and y0 < y1: + return frame[y0:y1, x0:x1] + return frame + + def _apply_worker_transforms(self, frame: np.ndarray) -> np.ndarray: + if self._settings.rotation: + frame = self.apply_rotation(frame, self._settings.rotation) + + crop_region = self._settings.get_crop_region() + if crop_region: + frame = self.apply_crop(frame, crop_region) + + return frame + + def _log_trigger_wait_throttled(self, exc: BaseException) -> None: + """Log hardware-trigger wait timeouts at a controlled rate. + + In trigger-waiting modes, read timeouts are expected polling misses. + Without throttling, the log can be flooded at ~10-20 messages/sec/camera. + """ + now = time.monotonic() + + if now - self._last_trigger_wait_log < self._trigger_wait_log_interval: + self._trigger_wait_suppressed_count += 1 + return + + suppressed = self._trigger_wait_suppressed_count + self._trigger_wait_suppressed_count = 0 + self._last_trigger_wait_log = now + + if suppressed: + logger.debug( + "[Worker %s] waiting for hardware trigger: %s (suppressed %d repeated timeout logs)", + self._camera_id, + exc, + suppressed, + ) + else: + logger.debug( + "[Worker %s] waiting for hardware trigger: %s", + self._camera_id, + exc, + ) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 97c53dc..d91f4ea 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -7,26 +7,25 @@ import time from dataclasses import dataclass from functools import partial -from threading import Event, Lock +from threading import Lock import cv2 import numpy as np -from PySide6.QtCore import QObject, QThread, Signal, Slot +from PySide6.QtCore import QObject, QThread, Signal from PySide6.QtGui import QImage, QPixmap -from dlclivegui.cameras import CameraFactory -from dlclivegui.cameras.base import CameraBackend from dlclivegui.cameras.factory import camera_identity_key # from dlclivegui.config import CameraSettings from dlclivegui.config import ( GUI_MAX_DISPLAY_FPS, MULTI_CAMERA_WORKER_DO_LOG_TIMING, - SINGLE_CAMERA_WORKER_DO_LOG_TIMING, CameraSettings, ) from dlclivegui.utils.stats import WorkerTimingStats +from .camera_controller import SingleCameraWorker + LOGGER = logging.getLogger(__name__) QUIT_WAIT_MS = 5000 # wait for cooperative quit (5s) @@ -44,181 +43,6 @@ class MultiFrameData: display_ids: dict[str, str] = None # camera_id -> display_id (for labeling) -class SingleCameraWorker(QObject): - """Worker for a single camera in multi-camera mode.""" - - frame_captured = Signal(str, object, float, object) # camera_id, frame, timestamp, timestamp_metadata - error_occurred = Signal(str, str) # camera_id, error_message - runtime_info = Signal(str, object) # camera_id, dict of runtime info - started = Signal(str) # camera_id - stopped = Signal(str) # camera_id - - def __init__(self, camera_id: str, settings: CameraSettings): - super().__init__() - self._camera_id = camera_id - self._settings = copy.deepcopy(settings) - self._stop_event = Event() - self._backend: CameraBackend | None = None - self._max_consecutive_errors = 5 - self._retry_delay = 0.1 - self._trigger_timeout_delay = 0.05 - - self._trigger_wait_log_interval = 2.0 - self._last_trigger_wait_log = 0.0 - self._trigger_wait_suppressed_count = 0 - - # Performance logs - self._timing = WorkerTimingStats( - camera_id, logger=LOGGER, log_interval=1.0, enabled=SINGLE_CAMERA_WORKER_DO_LOG_TIMING - ) - - @Slot() - def run(self) -> None: - self._stop_event.clear() - - try: - LOGGER.debug( - "[Worker %s] before create: backend=%s index=%s properties=%s", - self._camera_id, - self._settings.backend, - self._settings.index, - self._settings.properties, - ) - - self._backend = CameraFactory.create(self._settings) - - LOGGER.debug( - "[Worker %s] after create: backend=%s index=%s properties=%s", - self._camera_id, - self._backend.settings.backend, - self._backend.settings.index, - self._backend.settings.properties, - ) - - self._backend.open() - self.runtime_info.emit( - self._camera_id, - { - "actual_fps": getattr(self._backend, "actual_fps", None), - "actual_resolution": getattr(self._backend, "actual_resolution", None), - "actual_pixel_format": getattr(self._backend, "actual_pixel_format", None), - "actual_output_format": getattr(self._backend, "actual_output_format", None), - }, - ) - except Exception as exc: - LOGGER.exception(f"Failed to initialize camera {self._camera_id}", exc_info=exc) - self.error_occurred.emit(self._camera_id, f"Failed to initialize camera: {exc}") - self.stopped.emit(self._camera_id) - return - - self.started.emit(self._camera_id) - consecutive_errors = 0 - - while not self._stop_event.is_set(): - try: - with self._timing.measure("Single.read"): - captured = self._backend.read() - frame = captured.frame - timestamp = captured.software_timestamp - timestamp_metadata = captured.timestamp_metadata - if frame is None or frame.size == 0: - consecutive_errors += 1 - if consecutive_errors >= self._max_consecutive_errors: - self.error_occurred.emit( - self._camera_id, "Too many empty frames.\nWas the device disconnected ?" - ) - break - if self._stop_event.wait(self._retry_delay): - break - continue - - consecutive_errors = 0 - with self._timing.measure("Single.emit.frame_captured"): - self.frame_captured.emit(self._camera_id, frame, timestamp, timestamp_metadata) - - self._timing.note_frame() - self._timing.maybe_log() - - except TimeoutError as exc: - self._timing.note_timeout() - self._timing.maybe_log() - if self._stop_event.is_set(): - break - - # In hardware-trigger mode, a timeout usually means: - # "no trigger pulse arrived during this poll interval". - # This is expected and should not count as a camera failure. - if bool(getattr(self._backend, "waits_for_hardware_trigger", False)): - self._log_trigger_wait_throttled(exc) - consecutive_errors = 0 - - if self._stop_event.wait(self._trigger_timeout_delay): - break # Stop event set during wait - continue - - consecutive_errors += 1 - if consecutive_errors >= self._max_consecutive_errors: - self.error_occurred.emit(self._camera_id, f"Camera read timeout: {exc}") - break - if self._stop_event.wait(self._retry_delay): - break - continue - - except Exception as exc: - self._timing.note_error() - self._timing.maybe_log() - consecutive_errors += 1 - if self._stop_event.is_set(): - break - if consecutive_errors >= self._max_consecutive_errors: - self.error_occurred.emit(self._camera_id, f"Camera read error: {exc}") - break - if self._stop_event.wait(self._retry_delay): - break - continue - - # Cleanup - if self._backend is not None: - try: - self._backend.close() - except Exception: - pass - self.stopped.emit(self._camera_id) - - def stop(self) -> None: - self._stop_event.set() - - def _log_trigger_wait_throttled(self, exc: BaseException) -> None: - """Log hardware-trigger wait timeouts at a controlled rate. - - In trigger-waiting modes, read timeouts are expected polling misses. - Without throttling, the log can be flooded at ~10-20 messages/sec/camera. - """ - now = time.monotonic() - - if now - self._last_trigger_wait_log < self._trigger_wait_log_interval: - self._trigger_wait_suppressed_count += 1 - return - - suppressed = self._trigger_wait_suppressed_count - self._trigger_wait_suppressed_count = 0 - self._last_trigger_wait_log = now - - if suppressed: - LOGGER.debug( - "[Worker %s] waiting for hardware trigger: %s (suppressed %d repeated timeout logs)", - self._camera_id, - exc, - suppressed, - ) - else: - LOGGER.debug( - "[Worker %s] waiting for hardware trigger: %s", - self._camera_id, - exc, - ) - - def get_display_id(settings: CameraSettings) -> str: """Return the human-friendly camera label used for GUI display. Intentionally different from get_camera_id(), which should return a stable @@ -325,6 +149,7 @@ def __init__(self): self._frame_lock = Lock() self._running = False self._recording_frame_emission_enabled: bool = False + self._recording_sink = None self._started_cameras: set = set() self._camera_display_order: list[str] = [] self._display_ids: dict[str, str] = {} # camera_id -> display_id (for labeling) @@ -358,12 +183,9 @@ def _timing_for_camera(self, camera_id: str) -> WorkerTimingStats: return timing def set_recording_frame_do_emit(self, enabled: bool) -> None: - """Enable/disable the lightweight per-camera recording frame signal. - - This avoids sending recording-only traffic when the user is only previewing - or running DLC. - """ self._recording_frame_emission_enabled = bool(enabled) + for worker in list(self._workers.values()): + worker.set_recording_enabled(enabled) def _should_emit_display_ready(self) -> bool: """Return True when the UI/display path should be updated. @@ -459,6 +281,8 @@ def _start_camera(self, settings: CameraSettings) -> None: self._display_ids[cam_id] = display_id dc = self._settings[cam_id] worker = SingleCameraWorker(cam_id, dc) + worker.set_recording_sink(self._recording_sink) + worker.set_recording_enabled(self._recording_frame_emission_enabled) thread = QThread() worker.moveToThread(thread) @@ -476,6 +300,11 @@ def _start_camera(self, settings: CameraSettings) -> None: worker.stopped.connect(thread.quit) thread.start() + def set_recording_sink(self, sink) -> None: + self._recording_sink = sink + for worker in list(self._workers.values()): + worker.set_recording_sink(sink) + def _cleanup_camera(self, camera_id: str) -> None: # remove stored frame data with self._frame_lock: @@ -581,20 +410,20 @@ def _on_frame_captured( frame_data: MultiFrameData | None = None with timing.measure("Multi.slot.total"): - settings = self._settings.get(camera_id) + self._settings.get(camera_id) - with timing.measure("Multi.apply_transforms"): - if settings and settings.rotation: - frame = MultiCameraController.apply_rotation(frame, settings.rotation) + # with timing.measure("Multi.apply_transforms"): + # if settings and settings.rotation: + # frame = MultiCameraController.apply_rotation(frame, settings.rotation) - if settings: - crop_region = settings.get_crop_region() - if crop_region: - frame = MultiCameraController.apply_crop(frame, crop_region) + # if settings: + # crop_region = settings.get_crop_region() + # if crop_region: + # frame = MultiCameraController.apply_crop(frame, crop_region) - if self._recording_frame_emission_enabled: - with timing.measure("Multi.emit.recording_frame_ready"): - self.recording_frame_ready.emit(camera_id, frame, timestamp, timestamp_metadata) + # if self._recording_frame_emission_enabled: + # with timing.measure("Multi.emit.recording_frame_ready"): + # self.recording_frame_ready.emit(camera_id, frame, timestamp) with self._frame_lock: with timing.measure("Multi.store_latest"): @@ -670,32 +499,6 @@ def actual_fps_by_camera_id(self) -> dict[str, float]: return out - @staticmethod - def apply_rotation(frame: np.ndarray, degrees: int) -> np.ndarray: - """Apply rotation to frame.""" - if degrees == 90: - return cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) - elif degrees == 180: - return cv2.rotate(frame, cv2.ROTATE_180) - elif degrees == 270: - return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE) - return frame - - @staticmethod - def apply_crop(frame: np.ndarray, crop_region: tuple[int, int, int, int]) -> np.ndarray: - """Apply crop to frame.""" - x0, y0, x1, y1 = crop_region - height, width = frame.shape[:2] - - x0 = max(0, min(x0, width)) - y0 = max(0, min(y0, height)) - x1 = max(x0, min(x1, width)) if x1 > 0 else width - y1 = max(y0, min(y1, height)) if y1 > 0 else height - - if x0 < x1 and y0 < y1: - return frame[y0:y1, x0:x1] - return frame - @staticmethod def apply_resize(frame: np.ndarray, max_w: int, max_h: int, allow_upscale: bool = False) -> np.ndarray: """Resize frame to fit within max dimensions while maintaining aspect ratio.""" diff --git a/dlclivegui/gui/recording_manager.py b/dlclivegui/services/recording_manager.py similarity index 100% rename from dlclivegui/gui/recording_manager.py rename to dlclivegui/services/recording_manager.py From a5d53b254964cfbd48e2fdf85d932a8e55603c10 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 16:28:23 +0200 Subject: [PATCH 12/19] Route recording frames through recording sink Switch `RecordingManager` import to the new `services` package path and update recording flow to use an explicit recording sink callback. Recording now sets `multi_camera_controller.set_recording_sink(self._rec_manager.write_frame)` when starting and clears it on stop, replacing the previous direct `recording_frame_ready` signal hookup. --- dlclivegui/gui/main_window.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index afb4f0c..1770240 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -70,6 +70,7 @@ ) from ..services.dlc_processor import DLCLiveProcessor, PoseResult from ..services.multi_camera_controller import MultiCameraController, MultiFrameData, get_camera_id, get_display_id +from ..services.recording_manager import RecordingManager from ..utils.display import BBoxColors, compute_tile_info, create_tiled_frame, draw_bbox, draw_pose from ..utils.settings_store import DLCLiveGUISettingsStore, ModelPathStore from ..utils.stats import WorkerTimingStats, format_dlc_stats @@ -79,7 +80,6 @@ from .misc import layouts as lyts from .misc.drag_spinbox import ScrubSpinBox from .misc.eliding_label import ElidingPathLabel -from .recording_manager import RecordingManager from .theme import LOGO, LOGO_ALPHA, AppStyle, apply_theme logger = logging.getLogger("DLCLiveGUI") @@ -801,7 +801,7 @@ def _connect_signals(self) -> None: # Multi-camera controller signals (used for both single and multi-camera modes) self.multi_camera_controller.frame_ready.connect(self._on_multi_frame_processing_ready) self.multi_camera_controller.display_ready.connect(self._on_multi_frame_display_ready) - self.multi_camera_controller.recording_frame_ready.connect(self._on_recording_frame_ready) + # self.multi_camera_controller.recording_frame_ready.connect(self._on_recording_frame_ready) self.multi_camera_controller.all_started.connect(self._on_multi_camera_started) self.multi_camera_controller.all_stopped.connect(self._on_multi_camera_stopped) self.multi_camera_controller.camera_error.connect(self._on_multi_camera_error) @@ -1605,6 +1605,7 @@ def _start_multi_camera_recording(self) -> None: if run_dir is None: self._show_error("Failed to start recording.") return + self.multi_camera_controller.set_recording_sink(self._rec_manager.write_frame) self.multi_camera_controller.set_recording_frame_do_emit(True) self._settings_store.set_session_name(session_name) @@ -1629,6 +1630,7 @@ def _stop_multi_camera_recording(self) -> None: # Stop frame emission immediately so no new frames enter recording pipeline. try: self.multi_camera_controller.set_recording_frame_do_emit(False) + self.multi_camera_controller.set_recording_sink(None) except Exception: logger.exception("Failed to disable recording frame emission") From 688cf18f3df6b95247a7f937e553d6d26c372aa4 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 16:28:39 +0200 Subject: [PATCH 13/19] Update recording manager test imports Adjust test fixtures and GUI recording manager tests to import `RecordingManager` and related module symbols from `dlclivegui.services.recording_manager` instead of the old `dlclivegui.gui.recording_manager` path. --- tests/conftest.py | 6 +++--- tests/gui/test_rec_manager.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index f04941e..6fda4be 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -325,7 +325,7 @@ def _fake_start_all(self, recording, active_cams, current_frames, **kwargs): run_dir.mkdir(parents=True, exist_ok=True) return run_dir - from dlclivegui.gui import recording_manager as rm_mod + from dlclivegui.services import recording_manager as rm_mod monkeypatch.setattr(rm_mod.RecordingManager, "start_all", _fake_start_all) return calls @@ -409,7 +409,7 @@ def recording_settings(app_config_two_cams): @pytest.fixture def patch_video_recorder(monkeypatch): - import dlclivegui.gui.recording_manager as rm_mod + import dlclivegui.services.recording_manager as rm_mod monkeypatch.setattr(rm_mod, "VideoRecorder", FakeVideoRecorder) return FakeVideoRecorder @@ -428,7 +428,7 @@ def _fake_write_frame(cam_id, frame, timestamp=None, timestamp_metadata=None): @pytest.fixture def patch_build_run_dir(monkeypatch, tmp_path): - import dlclivegui.gui.recording_manager as rm_mod + import dlclivegui.services.recording_manager as rm_mod spy = {"session_dir": None, "use_timestamp": None} run_dir = tmp_path / "videos" / "Sess_SANITIZED" / "run_TEST" diff --git a/tests/gui/test_rec_manager.py b/tests/gui/test_rec_manager.py index cf4bca2..6c81ac3 100644 --- a/tests/gui/test_rec_manager.py +++ b/tests/gui/test_rec_manager.py @@ -4,8 +4,8 @@ import pytest from dlclivegui.config import CameraSettings -from dlclivegui.gui.recording_manager import RecordingManager from dlclivegui.services.multi_camera_controller import get_camera_id, get_display_id +from dlclivegui.services.recording_manager import RecordingManager from dlclivegui.utils.stats import RecorderStats from dlclivegui.utils.timestamps import FrameTimestampMetadata @@ -214,7 +214,7 @@ def test_write_frame_uses_time_when_timestamp_missing( mgr = RecordingManager() mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - import dlclivegui.gui.recording_manager as rm_mod # noqa: E402 + import dlclivegui.services.recording_manager as rm_mod # noqa: E402 monkeypatch.setattr(rm_mod.time, "time", lambda: 999.0) From 04c97e97ba11bca6ac3a91ed758abda8923c3232 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Thu, 2 Jul 2026 17:31:45 +0200 Subject: [PATCH 14/19] Propagate capture metadata in single-camera flow Update the single-camera pipeline to use structured capture results from backend reads, extracting `frame`, `software_timestamp`, and `timestamp_metadata` and forwarding metadata through frame emission and recording writes. Recording queue handling now expects the metadata field as well. Preview transform helpers were also switched to reuse `SingleCameraWorker` crop/rotation methods instead of `MultiCameraController`. --- dlclivegui/gui/camera_config/preview.py | 5 +++-- dlclivegui/services/camera_controller.py | 9 ++++++--- dlclivegui/services/recording_manager.py | 12 ++++++++---- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/dlclivegui/gui/camera_config/preview.py b/dlclivegui/gui/camera_config/preview.py index bbd1aef..ef2c757 100644 --- a/dlclivegui/gui/camera_config/preview.py +++ b/dlclivegui/gui/camera_config/preview.py @@ -7,6 +7,7 @@ from PySide6.QtCore import QTimer +from ...services.camera_controller import SingleCameraWorker from ...services.multi_camera_controller import MultiCameraController if TYPE_CHECKING: @@ -56,7 +57,7 @@ class PreviewSession: def apply_rotation(frame, rotation): - return MultiCameraController.apply_rotation(frame, rotation) + return SingleCameraWorker.apply_rotation(frame, rotation) def apply_crop(frame, x0, y0, x1, y1): @@ -66,7 +67,7 @@ def apply_crop(frame, x0, y0, x1, y1): x1 = max(x0, min(x1, w)) y1 = max(y0, min(y1, h)) - return MultiCameraController.apply_crop(frame, (x0, y0, x1, y1)) + return SingleCameraWorker.apply_crop(frame, (x0, y0, x1, y1)) def resize_to_fit(frame, max_w=400, max_h=300): diff --git a/dlclivegui/services/camera_controller.py b/dlclivegui/services/camera_controller.py index 428503c..6aebda9 100644 --- a/dlclivegui/services/camera_controller.py +++ b/dlclivegui/services/camera_controller.py @@ -106,7 +106,10 @@ def run(self) -> None: while not self._stop_event.is_set(): try: with self._timing.measure("Single.read"): - frame, timestamp = self._backend.read() + captured = self._backend.read() + frame = captured.frame + timestamp = captured.software_timestamp + timestamp_metadata = captured.timestamp_metadata if frame is None or frame.size == 0: consecutive_errors += 1 if consecutive_errors >= self._max_consecutive_errors: @@ -129,12 +132,12 @@ def run(self) -> None: if recording_enabled and recording_sink is not None: try: with self._timing.measure("Single.recording_sink"): - recording_sink(self._camera_id, frame, timestamp) + recording_sink(self._camera_id, frame, timestamp, timestamp_metadata) except Exception as exc: logger.exception(f"Failed to write frame for camera {self._camera_id}: {exc}") with self._timing.measure("Single.emit"): - self.frame_captured.emit(self._camera_id, frame, timestamp) + self.frame_captured.emit(self._camera_id, frame, timestamp, timestamp_metadata) self._timing.note_frame() self._timing.maybe_log() diff --git a/dlclivegui/services/recording_manager.py b/dlclivegui/services/recording_manager.py index e08020c..48a720c 100644 --- a/dlclivegui/services/recording_manager.py +++ b/dlclivegui/services/recording_manager.py @@ -159,8 +159,8 @@ def _dispatch_loop(self) -> None: if item is _FRAME_SENTINEL: break - cam_id, frame, timestamp = item - self._write_frame_now(cam_id, frame, timestamp) + cam_id, frame, timestamp, timestamp_metadata = item + self._write_frame_now(cam_id, frame, timestamp, timestamp_metadata) finally: try: @@ -291,7 +291,9 @@ def stop_all(self) -> None: self._session_dir = None self._run_dir = None - def _write_frame_now(self, cam_id: str, frame: np.ndarray, timestamp: float | None = None, timestamp_metadata: object | None = None) -> None: + def _write_frame_now( + self, cam_id: str, frame: np.ndarray, timestamp: float | None = None, timestamp_metadata: object | None = None + ) -> None: with self._lock: rec = self._recorders.get(cam_id) @@ -323,7 +325,9 @@ def _write_frame_now(self, cam_id: str, frame: np.ndarray, timestamp: float | No except Exception: log.exception("Failed to stop recorder for %s after write error.", cam_id) - def write_frame(self, cam_id: str, frame: np.ndarray, timestamp: float | None = None, timestamp_metadata: object | None = None) -> None: + def write_frame( + self, cam_id: str, frame: np.ndarray, timestamp: float | None = None, timestamp_metadata: object | None = None + ) -> None: with self._lock: q = self._frame_queue active = cam_id in self._recorders From 57440254faee6cb746bb5571267b8931e7bcb6f8 Mon Sep 17 00:00:00 2001 From: C-Achard Date: Fri, 3 Jul 2026 16:12:48 +0200 Subject: [PATCH 15/19] Add timestamp metadata to frame signal Update `SingleCameraWorker.frame_captured` to emit a fourth argument for timestamp metadata alongside camera ID, frame, and timestamp. This extends the signal contract so downstream multi-camera consumers can receive richer timing context per frame. --- dlclivegui/services/camera_controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlclivegui/services/camera_controller.py b/dlclivegui/services/camera_controller.py index 6aebda9..5717353 100644 --- a/dlclivegui/services/camera_controller.py +++ b/dlclivegui/services/camera_controller.py @@ -25,7 +25,7 @@ class SingleCameraWorker(QObject): """Worker for a single camera in multi-camera mode.""" - frame_captured = Signal(str, object, float) # camera_id, frame, timestamp + frame_captured = Signal(str, object, float, object) # camera_id, frame, timestamp, timestamp_metadata error_occurred = Signal(str, str) # camera_id, error_message runtime_info = Signal(str, object) # camera_id, dict of runtime info started = Signal(str) # camera_id From a420b71cd135503cb08f255b60cf899f9cb01fa5 Mon Sep 17 00:00:00 2001 From: C-Achard Date: Fri, 3 Jul 2026 16:13:11 +0200 Subject: [PATCH 16/19] Comment previous signals --- dlclivegui/services/multi_camera_controller.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index d91f4ea..fdca4fd 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -125,9 +125,9 @@ class MultiCameraController(QObject): # Signals frame_ready = Signal(object) # MultiFrameData (full cam FPS; inference only) - recording_frame_ready = Signal( - str, object, float, object - ) # camera_id, frame, timestamp, timestamp_metadata (full cam FPS; for recording) + # recording_frame_ready = Signal( + # str, object, float, object + # ) # camera_id, frame, timestamp, timestamp_metadata (full cam FPS; for recording) display_ready = Signal(object) # MultiFrameData for GUI display (throttled to GUI_MAX_DISPLAY_FPS) camera_started = Signal(str, object) # camera_id, settings camera_stopped = Signal(str) # camera_id @@ -410,7 +410,7 @@ def _on_frame_captured( frame_data: MultiFrameData | None = None with timing.measure("Multi.slot.total"): - self._settings.get(camera_id) + # self._settings.get(camera_id) # with timing.measure("Multi.apply_transforms"): # if settings and settings.rotation: From 75dad9993f34548d933e564b4bb82a2b6cbc8b1c Mon Sep 17 00:00:00 2001 From: C-Achard Date: Fri, 3 Jul 2026 16:13:43 +0200 Subject: [PATCH 17/19] Fix dispatcher lifecycle and add flush API Refactors recording frame dispatching to start lazily in `write_frame`, simplifies the dispatch loop to block on queue reads, and makes dispatcher shutdown more reliable by waiting to enqueue the sentinel and only clearing thread/queue state when stopping the current thread. Adds `flush(timeout)` to wait for queued frames to be fully dispatched, improving control around recording stop/teardown behavior. --- dlclivegui/services/recording_manager.py | 77 ++++++++++++++++-------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/dlclivegui/services/recording_manager.py b/dlclivegui/services/recording_manager.py index 48a720c..8fb1d60 100644 --- a/dlclivegui/services/recording_manager.py +++ b/dlclivegui/services/recording_manager.py @@ -106,31 +106,28 @@ def pop(self, cam_id: str, default=None) -> VideoRecorder | None: return self._recorders.pop(cam_id, default) def _start_dispatcher(self) -> None: - with self._lock: - if self._dispatch_thread is not None and self._dispatch_thread.is_alive(): - return + if self._dispatch_thread is not None and self._dispatch_thread.is_alive(): + return - self._dispatch_stop.clear() - self._frame_queue = queue.Queue(maxsize=4096) - self._dispatch_thread = threading.Thread( - target=self._dispatch_loop, - name="RecordingManagerDispatcher", - daemon=True, - ) - self._dispatch_thread.start() + self._dispatch_stop.clear() + self._frame_queue = queue.Queue(maxsize=4096) + self._dispatch_thread = threading.Thread( + target=self._dispatch_loop, + name="RecordingManagerDispatcher", + daemon=True, + ) + self._dispatch_thread.start() def _stop_dispatcher(self, timeout: float = 2.0) -> None: - self._dispatch_stop.set() - with self._lock: q = self._frame_queue t = self._dispatch_thread if q is not None: try: - q.put_nowait(_FRAME_SENTINEL) + q.put(_FRAME_SENTINEL, block=True, timeout=timeout) except queue.Full: - pass + log.warning("Recording frame queue full while stopping dispatcher; dispatcher may not stop promptly.") if t is not None: t.join(timeout=timeout) @@ -138,8 +135,9 @@ def _stop_dispatcher(self, timeout: float = 2.0) -> None: log.warning("Recording frame dispatcher did not stop within %.1fs", timeout) with self._lock: - self._dispatch_thread = None - self._frame_queue = None + if self._dispatch_thread is t: + self._dispatch_thread = None + self._frame_queue = None self._dispatch_stop.clear() def _dispatch_loop(self) -> None: @@ -149,11 +147,8 @@ def _dispatch_loop(self) -> None: if q is None: return - while not self._dispatch_stop.is_set(): - try: - item = q.get(timeout=0.1) - except queue.Empty: - continue + while True: + item = q.get() try: if item is _FRAME_SENTINEL: @@ -270,7 +265,6 @@ def start_all( self._run_dir = None return None - self._start_dispatcher() return run_dir def stop_all(self) -> None: @@ -326,13 +320,23 @@ def _write_frame_now( log.exception("Failed to stop recorder for %s after write error.", cam_id) def write_frame( - self, cam_id: str, frame: np.ndarray, timestamp: float | None = None, timestamp_metadata: object | None = None + self, + cam_id: str, + frame: np.ndarray, + timestamp: float | None = None, + timestamp_metadata: object | None = None, ) -> None: with self._lock: - q = self._frame_queue active = cam_id in self._recorders + if not active: + return + + if self._frame_queue is None or self._dispatch_thread is None or not self._dispatch_thread.is_alive(): + self._start_dispatcher() - if not active or q is None: + q = self._frame_queue + + if q is None: return try: @@ -345,6 +349,27 @@ def write_frame( getattr(frame, "dtype", None), ) + def flush(self, timeout: float = 2.0) -> bool: + """Wait until all currently queued recording frames have been dispatched. + + Returns True if the queue drained before timeout, False otherwise. + """ + with self._lock: + q = self._frame_queue + + if q is None: + return True + + done = threading.Event() + + def waiter() -> None: + q.join() + done.set() + + t = threading.Thread(target=waiter, name="RecordingManagerFlush", daemon=True) + t.start() + return done.wait(timeout) + def get_stats_summary(self) -> str: totals = { "enqueued": 0, From e9504bd77554857fd709eff10fa60dc7a3948594 Mon Sep 17 00:00:00 2001 From: C-Achard Date: Fri, 3 Jul 2026 16:19:52 +0200 Subject: [PATCH 18/19] Update tests for CapturedFrame integration Adjust camera backend and factory tests to return `CapturedFrame` objects instead of `(frame, timestamp)` tuples, matching the updated camera API. Extend `tests/conftest.py` fake DLCLive doubles with runner, processing, and post-processing behavior so `DLCLiveProcessor._process_frame` paths are exercised under the new flow. --- tests/cameras/test_backend_discovery.py | 4 ++-- tests/cameras/test_factory.py | 19 ++++++++-------- tests/cameras/test_fake_backend.py | 2 +- tests/conftest.py | 29 +++++++++++++++++++++++++ 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/tests/cameras/test_backend_discovery.py b/tests/cameras/test_backend_discovery.py index 610a90c..0b86e95 100644 --- a/tests/cameras/test_backend_discovery.py +++ b/tests/cameras/test_backend_discovery.py @@ -26,7 +26,7 @@ def _write_temp_backend_package(tmp_path: Path, pkg_name: str = "test_backends_p # A backend module which registers itself as "lazyfake" backend_code = textwrap.dedent( """ - from dlclivegui.cameras.base import register_backend, CameraBackend + from dlclivegui.cameras.base import register_backend, CameraBackend, CapturedFrame from dlclivegui.config import CameraSettings import numpy as np import time @@ -44,7 +44,7 @@ def open(self) -> None: def read(self): # Small deterministic frame + timestamp frame = np.zeros((2, 3, 3), dtype=np.uint8) - return frame, time.time() + return CapturedFrame(frame, time.time(), None) def close(self) -> None: self._opened = False diff --git a/tests/cameras/test_factory.py b/tests/cameras/test_factory.py index cc1d798..43516b4 100644 --- a/tests/cameras/test_factory.py +++ b/tests/cameras/test_factory.py @@ -3,6 +3,7 @@ import pytest from dlclivegui.cameras import CameraFactory, DetectedCamera, base +from dlclivegui.cameras.base import CapturedFrame from dlclivegui.config import CameraSettings @@ -69,7 +70,7 @@ def open(self): raise AssertionError("Probing path should not open when rich discovery returns a list") def read(self): - return None, 0.0 + return CapturedFrame(None, 0.0, None) def close(self): pass @@ -112,7 +113,7 @@ def open(self): pass def read(self): - return None, 0.0 + return CapturedFrame(None, 0.0, None) def close(self): pass @@ -150,7 +151,7 @@ def open(self): raise RuntimeError("no device") def read(self): - return None, 0.0 + return CapturedFrame(None, 0.0, None) def close(self): pass @@ -182,7 +183,7 @@ def open(self): raise RuntimeError("no device") def read(self): - return None, 0.0 + return CapturedFrame(None, 0.0, None) def close(self): pass @@ -220,7 +221,7 @@ def open(self): pass def read(self): - return None, 0.0 + return CapturedFrame(None, 0.0, None) def close(self): pass @@ -252,7 +253,7 @@ def open(self): pass def read(self): - return None, 0.0 + return CapturedFrame(None, 0.0, None) def close(self): pass @@ -280,7 +281,7 @@ def open(self): pass def read(self): - return None, 0.0 + return CapturedFrame(None, 0.0, None) def close(self): pass @@ -311,7 +312,7 @@ def open(self): pass def read(self): - return None, 0.0 + return CapturedFrame(None, 0.0, None) def close(self): pass @@ -341,7 +342,7 @@ def open(self): raise RuntimeError("no device") def read(self): - return None, 0.0 + return CapturedFrame(None, 0.0, None) def close(self): pass diff --git a/tests/cameras/test_fake_backend.py b/tests/cameras/test_fake_backend.py index d85616b..eac6e60 100644 --- a/tests/cameras/test_fake_backend.py +++ b/tests/cameras/test_fake_backend.py @@ -26,7 +26,7 @@ def open(self): def read(self): assert self._opened img = np.zeros((10, 20, 3), dtype=np.uint8) - return img, 123.456 + return base.CapturedFrame(img, 123.456, None) def close(self): self._opened = False diff --git a/tests/conftest.py b/tests/conftest.py index 6fda4be..3eaf353 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -154,6 +154,18 @@ def _factory(settings: CameraSettings): # --------------------------------------------------------------------- # Test doubles # --------------------------------------------------------------------- +class FakeRunner: + """Minimal fake DLCLive runner used by DLCLiveProcessor._process_frame.""" + + def __init__(self, parent): + self._parent = parent + self.device = "cpu" + self.model = None + self.net = None + + def get_pose(self, processed_frame): + self._parent.pose_calls += 1 + return np.ones((2, 3), dtype=float) class FakeDLCLive: @@ -163,14 +175,31 @@ def __init__(self, **opts): self.opts = opts self.init_called = False self.pose_calls = 0 + self.process_frame_calls = 0 + + self.processor = opts.get("processor") + self.cfg = {"fake": True} + self.runner = FakeRunner(self) + self.pose = None def init_inference(self, frame): self.init_called = True + def process_frame(self, frame): + self.process_frame_calls += 1 + return frame + def get_pose(self, frame, frame_time=None): + # Keep this for compatibility with older tests, but production code now + # uses self.runner.get_pose(...). self.pose_calls += 1 return np.ones((2, 3), dtype=float) + def _post_process_pose(self, processed_frame, frame_time=None): + if self.pose is None: + self.pose = self.runner.get_pose(processed_frame) + return self.pose + @pytest.fixture def fake_dlclive_factory(): From 5fd2a4fb372604b647547380fca317a1e89c7357 Mon Sep 17 00:00:00 2001 From: C-Achard Date: Fri, 3 Jul 2026 16:20:16 +0200 Subject: [PATCH 19/19] Stabilize recording and camera tests Update tests to match recent recording/capture API changes and reduce flakiness. Camera dialog E2E stubs now return `CapturedFrame`, multicam tests validate the new recording sink path (including timestamp metadata forwarding), and recording manager tests consistently clean up with `stop_all()` plus queue flushes before assertions. GUI overlay tests tied to removed behavior are now skipped, and config tests now expect numeric `-input_framerate` values instead of formatted strings. --- .../gui/camera_config/test_cam_dialog_e2e.py | 6 +- tests/gui/test_pose_overlay.py | 2 + tests/gui/test_rec_manager.py | 474 ++++++++++-------- tests/services/test_multicam_controller.py | 113 +++-- tests/test_config.py | 6 +- 5 files changed, 344 insertions(+), 257 deletions(-) diff --git a/tests/gui/camera_config/test_cam_dialog_e2e.py b/tests/gui/camera_config/test_cam_dialog_e2e.py index df1c357..9556efc 100644 --- a/tests/gui/camera_config/test_cam_dialog_e2e.py +++ b/tests/gui/camera_config/test_cam_dialog_e2e.py @@ -8,7 +8,7 @@ from PySide6.QtCore import Qt from PySide6.QtWidgets import QMessageBox -from dlclivegui.cameras.base import CameraBackend +from dlclivegui.cameras.base import CameraBackend, CapturedFrame from dlclivegui.cameras.factory import CameraFactory, DetectedCamera from dlclivegui.config import CameraSettings, MultiCameraSettings from dlclivegui.gui.camera_config.camera_config_dialog import CameraConfigDialog @@ -194,7 +194,7 @@ def close(self): self._opened = False def read(self): - return np.zeros((30, 40, 3), dtype=np.uint8), 0.1 + return CapturedFrame(np.zeros((30, 40, 3), dtype=np.uint8), 0.1, None) CountingBackend.opens = 0 monkeypatch.setattr(CameraFactory, "create", staticmethod(lambda s: CountingBackend(s))) @@ -238,7 +238,7 @@ def close(self): self._opened = False def read(self): - return np.zeros((30, 40, 3), dtype=np.uint8), 0.1 + return CapturedFrame(np.zeros((30, 40, 3), dtype=np.uint8), 0.1, None) CountingBackend.opens = 0 monkeypatch.setattr(CameraFactory, "create", staticmethod(lambda s: CountingBackend(s))) diff --git a/tests/gui/test_pose_overlay.py b/tests/gui/test_pose_overlay.py index 511d445..bef210b 100644 --- a/tests/gui/test_pose_overlay.py +++ b/tests/gui/test_pose_overlay.py @@ -9,6 +9,7 @@ def stop(self): @pytest.mark.gui @pytest.mark.timeout(10) +@pytest.mark.skip("Removed functionality.") def test_record_overlay_uses_identity_transform_for_per_camera_recording(window, draw_pose_stub): # Disable event timers to avoid GUI rendering pipelines interfering with test window._display_timer.stop() @@ -47,6 +48,7 @@ def test_record_overlay_uses_identity_transform_for_per_camera_recording(window, @pytest.mark.gui @pytest.mark.timeout(10) +@pytest.mark.skip("Removed functionality.") def test_record_overlay_toggle_affects_frames_sent_to_recorder(window, recording_frame_spy, draw_pose_stub): # Disable event timers to avoid GUI rendering pipelines interfering with test window._display_timer.stop() diff --git a/tests/gui/test_rec_manager.py b/tests/gui/test_rec_manager.py index 6c81ac3..df36f08 100644 --- a/tests/gui/test_rec_manager.py +++ b/tests/gui/test_rec_manager.py @@ -43,37 +43,40 @@ def test_start_all_creates_recorders_and_returns_run_dir( spy, expected_run_dir = patch_build_run_dir mgr = RecordingManager() - run_dir = mgr.start_all( - recording_settings, - _active_cams_two, - current_frames, - session_name="Sess", - use_timestamp=True, - all_or_nothing=False, - ) - - assert run_dir == expected_run_dir - assert mgr.is_active is True - assert mgr.run_dir == expected_run_dir - assert mgr.session_dir is not None - assert len(mgr.recorders) == 2 - - # build_run_dir called with correct use_timestamp - assert spy["use_timestamp"] is True - assert spy["session_dir"] is not None + try: + run_dir = mgr.start_all( + recording_settings, + _active_cams_two, + current_frames, + session_name="Sess", + use_timestamp=True, + all_or_nothing=False, + ) - # Validate per-cam recorder construction - for cam in _active_cams_two: - cam_id = get_camera_id(cam) - rec = mgr.recorders[cam_id] - assert rec.codec == recording_settings.codec - assert rec.crf == recording_settings.crf - assert rec.frame_rate == float(cam.fps) - assert rec.is_running is True - # output file should be inside run dir - assert rec.output.parent == expected_run_dir - # filename should include backend + cam index - assert f"_{cam.backend}_cam{cam.index}" in rec.output.name + assert run_dir == expected_run_dir + assert mgr.is_active is True + assert mgr.run_dir == expected_run_dir + assert mgr.session_dir is not None + assert len(mgr.recorders) == 2 + + # build_run_dir called with correct use_timestamp + assert spy["use_timestamp"] is True + assert spy["session_dir"] is not None + + # Validate per-cam recorder construction + for cam in _active_cams_two: + cam_id = get_camera_id(cam) + rec = mgr.recorders[cam_id] + assert rec.codec == recording_settings.codec + assert rec.crf == recording_settings.crf + assert rec.frame_rate == float(cam.fps) + assert rec.is_running is True + # output file should be inside run dir + assert rec.output.parent == expected_run_dir + # filename should include backend + cam index + assert f"_{cam.backend}_cam{cam.index}" in rec.output.name + finally: + mgr.stop_all() @pytest.mark.unit @@ -83,8 +86,11 @@ def test_start_all_passes_use_timestamp_flag( spy, _expected_run_dir = patch_build_run_dir mgr = RecordingManager() - mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess", use_timestamp=False) - assert spy["use_timestamp"] is False + try: + mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess", use_timestamp=False) + assert spy["use_timestamp"] is False + finally: + mgr.stop_all() @pytest.mark.unit @@ -92,14 +98,18 @@ def test_frame_size_is_inferred_from_current_frames( recording_settings, _active_cams_two, current_frames, patch_video_recorder, patch_build_run_dir ): mgr = RecordingManager() - mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - # cam0 -> 480x640, cam1 -> 720x1280 - for cam in _active_cams_two: - cam_id = get_camera_id(cam) - rec = mgr.recorders[cam_id] - frame = current_frames[cam_id] - assert rec.frame_size == (frame.shape[0], frame.shape[1]) + try: + mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") + + # cam0 -> 480x640, cam1 -> 720x1280 + for cam in _active_cams_two: + cam_id = get_camera_id(cam) + rec = mgr.recorders[cam_id] + frame = current_frames[cam_id] + assert rec.frame_size == (frame.shape[0], frame.shape[1]) + finally: + mgr.stop_all() @pytest.mark.unit @@ -111,10 +121,14 @@ def test_missing_frame_results_in_none_frame_size( current_frames.pop(cam1_id) mgr = RecordingManager() - mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - rec1 = mgr.recorders[cam1_id] - assert rec1.frame_size is None + try: + mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") + + rec1 = mgr.recorders[cam1_id] + assert rec1.frame_size is None + finally: + mgr.stop_all() @pytest.mark.unit @@ -175,6 +189,7 @@ def start_with_failure(self): assert mgr.session_dir is None finally: patch_video_recorder.start = original_start + mgr.stop_all() @pytest.mark.unit @@ -197,14 +212,19 @@ def test_write_frame_uses_given_timestamp( recording_settings, _active_cams_two, current_frames, patch_video_recorder, patch_build_run_dir ): mgr = RecordingManager() - mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - cam0_id = get_camera_id(_active_cams_two[0]) - frame = current_frames[cam0_id] - mgr.write_frame(cam0_id, frame, timestamp=123.0) + try: + mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - rec = mgr.recorders[cam0_id] - assert rec.write_calls[-1][1] == 123.0 + cam0_id = get_camera_id(_active_cams_two[0]) + frame = current_frames[cam0_id] + mgr.write_frame(cam0_id, frame, timestamp=123.0) + assert mgr.flush(timeout=2.0) + + rec = mgr.recorders[cam0_id] + assert rec.write_calls[-1][1] == 123.0 + finally: + mgr.stop_all() @pytest.mark.unit @@ -212,18 +232,23 @@ def test_write_frame_uses_time_when_timestamp_missing( recording_settings, _active_cams_two, current_frames, patch_video_recorder, patch_build_run_dir, monkeypatch ): mgr = RecordingManager() - mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - import dlclivegui.services.recording_manager as rm_mod # noqa: E402 + try: + mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - monkeypatch.setattr(rm_mod.time, "time", lambda: 999.0) + import dlclivegui.services.recording_manager as rm_mod # noqa: E402 - cam0_id = get_camera_id(_active_cams_two[0]) - frame = current_frames[cam0_id] - mgr.write_frame(cam0_id, frame, timestamp=None) + monkeypatch.setattr(rm_mod.time, "time", lambda: 999.0) + + cam0_id = get_camera_id(_active_cams_two[0]) + frame = current_frames[cam0_id] + mgr.write_frame(cam0_id, frame, timestamp=None) + assert mgr.flush(timeout=2.0) - rec = mgr.recorders[cam0_id] - assert rec.write_calls[-1][1] == 999.0 + rec = mgr.recorders[cam0_id] + assert rec.write_calls[-1][1] == 999.0 + finally: + mgr.stop_all() @pytest.mark.unit @@ -231,14 +256,20 @@ def test_write_frame_removes_recorder_on_exception( recording_settings, _active_cams_two, current_frames, patch_video_recorder, patch_build_run_dir ): mgr = RecordingManager() - mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - cam0_id = get_camera_id(_active_cams_two[0]) - rec = mgr.recorders[cam0_id] - rec.raise_on_write = True + try: + mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - mgr.write_frame(cam0_id, current_frames[cam0_id], timestamp=1.0) - assert cam0_id not in mgr.recorders + cam0_id = get_camera_id(_active_cams_two[0]) + rec = mgr.recorders[cam0_id] + rec.raise_on_write = True + + mgr.write_frame(cam0_id, current_frames[cam0_id], timestamp=1.0) + assert mgr.flush(timeout=2.0) + + assert cam0_id not in mgr.recorders + finally: + mgr.stop_all() @pytest.mark.unit @@ -246,17 +277,21 @@ def test_get_stats_summary_single_recorder_uses_formatter( recording_settings, _active_cams_two, current_frames, patch_video_recorder, patch_build_run_dir, monkeypatch ): mgr = RecordingManager() - mgr.start_all(recording_settings, [_active_cams_two[0]], current_frames, session_name="Sess") - cam0_id = get_camera_id(_active_cams_two[0]) - mgr.recorders[cam0_id]._stats = RecorderStats(frames_written=10, frames_enqueued=12) + try: + mgr.start_all(recording_settings, [_active_cams_two[0]], current_frames, session_name="Sess") + + cam0_id = get_camera_id(_active_cams_two[0]) + mgr.recorders[cam0_id]._stats = RecorderStats(frames_written=10, frames_enqueued=12) - # Patch formatter to avoid depending on formatting implementation - import dlclivegui.utils.stats as stats_mod + # Patch formatter to avoid depending on formatting implementation + import dlclivegui.utils.stats as stats_mod - monkeypatch.setattr(stats_mod, "format_recorder_stats", lambda s: "OK_SINGLE") + monkeypatch.setattr(stats_mod, "format_recorder_stats", lambda s: "OK_SINGLE") - assert mgr.get_stats_summary() == "OK_SINGLE" + assert mgr.get_stats_summary() == "OK_SINGLE" + finally: + mgr.stop_all() @pytest.mark.unit @@ -264,39 +299,43 @@ def test_get_stats_summary_multi_aggregates( recording_settings, _active_cams_two, current_frames, patch_video_recorder, patch_build_run_dir ): mgr = RecordingManager() - mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - ids = [get_camera_id(c) for c in _active_cams_two] - - mgr.recorders[ids[0]]._stats = RecorderStats( - frames_enqueued=12, - frames_written=10, - dropped_frames=1, - queue_size=2, - buffer_size=10, - average_latency=0.01, - last_latency=0.02, - write_fps=25.0, - ) - mgr.recorders[ids[1]]._stats = RecorderStats( - frames_enqueued=24, - frames_written=20, - dropped_frames=3, - queue_size=4, - buffer_size=10, - average_latency=0.03, - last_latency=0.05, - write_fps=30.0, - ) - - summary = mgr.get_stats_summary() - - assert "2 cams" in summary - assert "30/36 frames" in summary - assert "writer 55.0 fps" in summary - assert "dropped 4" in summary - assert "queue 6/20" in summary - assert "backlog 6" in summary + try: + mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") + + ids = [get_camera_id(c) for c in _active_cams_two] + + mgr.recorders[ids[0]]._stats = RecorderStats( + frames_enqueued=12, + frames_written=10, + dropped_frames=1, + queue_size=2, + buffer_size=10, + average_latency=0.01, + last_latency=0.02, + write_fps=25.0, + ) + mgr.recorders[ids[1]]._stats = RecorderStats( + frames_enqueued=24, + frames_written=20, + dropped_frames=3, + queue_size=4, + buffer_size=10, + average_latency=0.03, + last_latency=0.05, + write_fps=30.0, + ) + + summary = mgr.get_stats_summary() + + assert "2 cams" in summary + assert "30/36 frames" in summary + assert "writer 55.0 fps" in summary + assert "dropped 4" in summary + assert "queue 6/20" in summary + assert "backlog 6" in summary + finally: + mgr.stop_all() @pytest.mark.unit @@ -307,51 +346,58 @@ def test_recording_manager_uses_stable_camera_id_not_display_id( ): mgr = RecordingManager() - cam = CameraSettings( - name="GenTL cam", - backend="gentl", - index=0, - fps=30.0, - enabled=True, - properties={ - "gentl": { - "device_id": "serial:SER0", - "serial_number": "SER0", - } - }, - ).apply_defaults() - - stable_id = get_camera_id(cam) - display_id = get_display_id(cam) - - assert stable_id == "gentl:serial:SER0" - assert display_id == "GenTL cam" - assert stable_id != display_id - - frame = np.zeros((480, 640, 3), dtype=np.uint8) - current_frames = {stable_id: frame} - - run_dir = mgr.start_all( - recording_settings, - [cam], - current_frames, - session_name="Sess", - ) + try: + cam = CameraSettings( + name="GenTL cam", + backend="gentl", + index=0, + fps=30.0, + enabled=True, + properties={ + "gentl": { + "device_id": "serial:SER0", + "serial_number": "SER0", + } + }, + ).apply_defaults() + + stable_id = get_camera_id(cam) + display_id = get_display_id(cam) + + assert stable_id == "gentl:serial:SER0" + assert display_id == "GenTL cam" + assert stable_id != display_id + + frame = np.zeros((480, 640, 3), dtype=np.uint8) + current_frames = {stable_id: frame} - assert run_dir is not None - assert stable_id in mgr.recorders - assert display_id not in mgr.recorders + run_dir = mgr.start_all( + recording_settings, + [cam], + current_frames, + session_name="Sess", + ) + + assert run_dir is not None + assert stable_id in mgr.recorders + assert display_id not in mgr.recorders - rec = mgr.recorders[stable_id] - assert rec.frame_size == (480, 640) + rec = mgr.recorders[stable_id] + assert rec.frame_size == (480, 640) + + mgr.write_frame(stable_id, frame, timestamp=123.0) + assert mgr.flush(timeout=2.0) + + assert len(rec.write_calls) == 1 + assert rec.write_calls[-1][1] == 123.0 - mgr.write_frame(stable_id, frame, timestamp=123.0) - assert len(rec.write_calls) == 1 - assert rec.write_calls[-1][1] == 123.0 + # Display ID is GUI-only and must not route frames internally. + mgr.write_frame(display_id, frame, timestamp=456.0) + assert mgr.flush(timeout=2.0) - # Display ID is GUI-only and must not route frames internally. - mgr.write_frame(display_id, frame, timestamp=456.0) - assert len(rec.write_calls) == 1 + assert len(rec.write_calls) == 1 + finally: + mgr.stop_all() @pytest.mark.unit @@ -362,41 +408,44 @@ def test_start_all_does_not_infer_frame_size_from_display_id( ): mgr = RecordingManager() - cam = CameraSettings( - name="GenTL cam", - backend="gentl", - index=0, - fps=30.0, - enabled=True, - properties={ - "gentl": { - "device_id": "serial:SER0", - "serial_number": "SER0", - } - }, - ).apply_defaults() - - stable_id = get_camera_id(cam) - display_id = get_display_id(cam) - - frame = np.zeros((480, 640, 3), dtype=np.uint8) - - # Simulate the buggy situation: frames are keyed by display ID. - current_frames = {display_id: frame} - - mgr.start_all( - recording_settings, - [cam], - current_frames, - session_name="Sess", - ) + try: + cam = CameraSettings( + name="GenTL cam", + backend="gentl", + index=0, + fps=30.0, + enabled=True, + properties={ + "gentl": { + "device_id": "serial:SER0", + "serial_number": "SER0", + } + }, + ).apply_defaults() + + stable_id = get_camera_id(cam) + display_id = get_display_id(cam) + + frame = np.zeros((480, 640, 3), dtype=np.uint8) + + # Simulate the buggy situation: frames are keyed by display ID. + current_frames = {display_id: frame} + + mgr.start_all( + recording_settings, + [cam], + current_frames, + session_name="Sess", + ) - assert stable_id in mgr.recorders - assert display_id not in mgr.recorders + assert stable_id in mgr.recorders + assert display_id not in mgr.recorders - # Since RecordingManager uses stable IDs internally, it should not find this frame. - rec = mgr.recorders[stable_id] - assert rec.frame_size is None + # Since RecordingManager uses stable IDs internally, it should not find this frame. + rec = mgr.recorders[stable_id] + assert rec.frame_size is None + finally: + mgr.stop_all() @pytest.mark.unit @@ -412,17 +461,21 @@ def test_start_all_passes_writegear_options( recording_settings.fast_encoding = True mgr = RecordingManager() - mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - for cam in _active_cams_two: - cam_id = get_camera_id(cam) - rec = mgr.recorders[cam_id] + try: + mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - assert rec.writer_options is not None - assert rec.writer_options["-vcodec"] == "libx264" - assert rec.writer_options["-crf"] == "23" - assert rec.writer_options["-preset"] == "ultrafast" - assert rec.writer_options["-tune"] == "zerolatency" + for cam in _active_cams_two: + cam_id = get_camera_id(cam) + rec = mgr.recorders[cam_id] + + assert rec.writer_options is not None + assert rec.writer_options["-vcodec"] == "libx264" + assert rec.writer_options["-crf"] == "23" + assert rec.writer_options["-preset"] == "ultrafast" + assert rec.writer_options["-tune"] == "zerolatency" + finally: + mgr.stop_all() class TestRecordingManagerTimestampMetadata: @@ -436,28 +489,33 @@ def test_write_frame_passes_timestamp_metadata( patch_build_run_dir, ): mgr = RecordingManager() - mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") - - cam0_id = get_camera_id(_active_cams_two[0]) - frame = current_frames[cam0_id] - - meta = FrameTimestampMetadata( - source="grab_result.GetTimeStamp", - backend="basler", - default_reported="seconds", - seconds=0.001, - raw_value=1_000_000, - raw_unit="ticks", - tick_frequency_hz=1_000_000_000.0, - kind="camera_clock", - ) - - mgr.write_frame(cam0_id, frame, timestamp=123.0, timestamp_metadata=meta) - - rec = mgr.recorders[cam0_id] - assert len(rec.write_calls) == 1 - written_frame, written_timestamp, written_metadata = rec.write_calls[0] - assert written_frame is frame - assert written_timestamp == 123.0 - assert written_metadata is meta + try: + mgr.start_all(recording_settings, _active_cams_two, current_frames, session_name="Sess") + + cam0_id = get_camera_id(_active_cams_two[0]) + frame = current_frames[cam0_id] + + meta = FrameTimestampMetadata( + source="grab_result.GetTimeStamp", + backend="basler", + default_reported="seconds", + seconds=0.001, + raw_value=1_000_000, + raw_unit="ticks", + tick_frequency_hz=1_000_000_000.0, + kind="camera_clock", + ) + + mgr.write_frame(cam0_id, frame, timestamp=123.0, timestamp_metadata=meta) + assert mgr.flush(timeout=2.0) + + rec = mgr.recorders[cam0_id] + assert len(rec.write_calls) == 1 + + written_frame, written_timestamp, written_metadata = rec.write_calls[0] + assert written_frame is frame + assert written_timestamp == 123.0 + assert written_metadata is meta + finally: + mgr.stop_all() diff --git a/tests/services/test_multicam_controller.py b/tests/services/test_multicam_controller.py index 4eafbda..056886c 100644 --- a/tests/services/test_multicam_controller.py +++ b/tests/services/test_multicam_controller.py @@ -505,7 +505,7 @@ def _create(settings): @pytest.mark.unit -def test_recording_frame_ready_only_emits_when_enabled(qtbot, patch_factory): +def test_recording_sink_receives_frames_when_enabled(qtbot, patch_factory): mc = MultiCameraController() cam = CameraSettings( @@ -517,26 +517,25 @@ def test_recording_frame_ready_only_emits_when_enabled(qtbot, patch_factory): ).apply_defaults() cam_id = get_camera_id(cam) - seen: list[tuple[str, tuple, float]] = [] + seen: list[tuple[str, tuple, float, object]] = [] - def on_recording_frame(camera_id, frame, timestamp, timestamp_metadata=None): - seen.append((camera_id, frame.shape, timestamp)) - - mc.recording_frame_ready.connect(on_recording_frame) + def sink(camera_id, frame, timestamp, timestamp_metadata=None): + seen.append((camera_id, frame.shape, timestamp, timestamp_metadata)) try: with qtbot.waitSignal(mc.all_started, timeout=1500): mc.start([cam]) - # Disabled by default: should not emit recording frames. + # Disabled by default. qtbot.wait(300) assert seen == [] + mc.set_recording_sink(sink) mc.set_recording_frame_do_emit(True) qtbot.waitUntil(lambda: bool(seen), timeout=2000) - camera_id, shape, timestamp = seen[-1] + camera_id, shape, timestamp, timestamp_metadata = seen[-1] assert camera_id == cam_id assert isinstance(timestamp, float) assert len(shape) in (2, 3) @@ -552,48 +551,76 @@ def on_recording_frame(camera_id, frame, timestamp, timestamp_metadata=None): mc.stop(wait=True) -class TestRecordingFrameTimestamps: - @pytest.mark.unit - def test_recording_frame_ready_forwards_timestamp_metadata(self, qtbot): - mc = MultiCameraController() - mc._running = True - mc._recording_frame_emission_enabled = True +@pytest.mark.unit +def test_recording_sink_forwards_timestamp_metadata(qtbot, monkeypatch): + from dlclivegui.cameras.base import CapturedFrame + from dlclivegui.cameras.factory import CameraFactory + + meta = FrameTimestampMetadata( + source="grab_result.GetTimeStamp", + backend="basler", + default_reported="seconds", + seconds=0.001, + raw_value=1_000_000, + raw_unit="ticks", + tick_frequency_hz=1_000_000_000.0, + kind="camera_clock", + ) + + class TimestampBackend: + waits_for_hardware_trigger = False + + def __init__(self, settings): + self.settings = settings + self._count = 0 + + def open(self): + pass + + def read(self): + self._count += 1 + return CapturedFrame( + frame=np.zeros((10, 10), dtype=np.uint8), + software_timestamp=123.0 + self._count, + timestamp_metadata=meta, + ) + + def close(self): + pass - cam_id = "basler:0815-0000" - mc._settings[cam_id] = CameraSettings( - name="C", - backend="basler", - index=0, - enabled=True, - ).apply_defaults() - mc._camera_display_order = [cam_id] - mc._display_ids[cam_id] = "C" + monkeypatch.setattr(CameraFactory, "create", staticmethod(lambda settings: TimestampBackend(settings))) - frame = np.zeros((10, 10), dtype=np.uint8) - meta = FrameTimestampMetadata( - source="grab_result.GetTimeStamp", - backend="basler", - default_reported="seconds", - seconds=0.001, - raw_value=1_000_000, - raw_unit="ticks", - tick_frequency_hz=1_000_000_000.0, - kind="camera_clock", - ) + mc = MultiCameraController() + cam = CameraSettings( + name="C", + backend="basler", + index=0, + enabled=True, + properties={"basler": {"device_id": "0815-0000"}}, + ).apply_defaults() - seen = [] + cam_id = get_camera_id(cam) + seen = [] - def on_recording_frame(camera_id, emitted_frame, timestamp, timestamp_metadata): - seen.append((camera_id, emitted_frame, timestamp, timestamp_metadata)) + def sink(camera_id, frame, timestamp, timestamp_metadata=None): + seen.append((camera_id, frame, timestamp, timestamp_metadata)) - mc.recording_frame_ready.connect(on_recording_frame) + try: + with qtbot.waitSignal(mc.all_started, timeout=1500): + mc.start([cam]) - mc._on_frame_captured(cam_id, frame, 123.0, meta) + # Recording is disabled by start(); enable the new sink path after cameras are running. + mc.set_recording_sink(sink) + mc.set_recording_frame_do_emit(True) - assert len(seen) == 1 + qtbot.waitUntil(lambda: bool(seen), timeout=2000) - camera_id, emitted_frame, timestamp, timestamp_metadata = seen[0] + camera_id, frame, timestamp, timestamp_metadata = seen[-1] assert camera_id == cam_id - assert emitted_frame is frame - assert timestamp == 123.0 + assert frame.shape == (10, 10) + assert isinstance(timestamp, float) assert timestamp_metadata is meta + + finally: + with qtbot.waitSignal(mc.all_stopped, timeout=2000): + mc.stop(wait=True) diff --git a/tests/test_config.py b/tests/test_config.py index 63b387b..9cb00e7 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -54,7 +54,7 @@ def test_recording_settings_writegear_options_default(): opts = settings.writegear_options(100.0) - assert opts["-input_framerate"] == "100.000000" + assert opts["-input_framerate"] == 100.0 assert opts["-vcodec"] == "libx264" assert opts["-crf"] == "23" assert "-preset" not in opts @@ -66,7 +66,7 @@ def test_recording_settings_writegear_options_fast_encoding_x264(): opts = settings.writegear_options(100.0) - assert opts["-input_framerate"] == "100.000000" + assert opts["-input_framerate"] == 100.0 assert opts["-vcodec"] == "libx264" assert opts["-crf"] == "23" assert opts["-preset"] == "ultrafast" @@ -88,4 +88,4 @@ def test_recording_settings_writegear_options_invalid_fps_falls_back_to_30(): opts = settings.writegear_options(None) - assert opts["-input_framerate"] == "30.000000" + assert opts["-input_framerate"] == 30.0