Source code for qgis_profiler.meters.thread_health_checker

#  Copyright (c) 2025-2026 profiler-qgis-plugin contributors.
#
#
#  This file is part of profiler-qgis-plugin.
#
#  profiler-qgis-plugin is free software: you can redistribute it and/or
#  modify it under the terms of the GNU General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  profiler-qgis-plugin is distributed in the hope that it will be
#  useful, but WITHOUT ANY WARRANTY; without even the implied warranty
#  of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with profiler-qgis-plugin. If not, see <https://www.gnu.org/licenses/>.

"""Meter that monitors main thread responsiveness.

Contains :class:`MainThreadHealthChecker`, which polls the main thread from a
background thread and reports an anomaly when the response time exceeds a
configurable threshold.
"""

import logging
import time
import warnings
from collections.abc import Callable
from typing import Optional

from qgis.core import QgsApplication
from qgis.PyQt.QtCore import (
    QElapsedTimer,
    QEventLoop,
    QObject,
    QThread,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)

from qgis_profiler.meters.meter import Meter
from qgis_profiler.settings import Settings

LOGGER = logging.getLogger(__name__)


class ThreadPoller(QObject):
    """Poll the main thread at regular intervals from a background thread.

    Every poll restarts an elapsed timer and emits :attr:`poll`. A new poll
    is only emitted once the previous one has been acknowledged through
    :meth:`set_poll_finished`, so at most one poll is pending at a time.
    """

    poll = pyqtSignal()

    def __init__(self, poll_interval_ms: int) -> None:
        """Create the poller.

        :param poll_interval_ms: interval between poll attempts, in
            milliseconds.
        """
        super().__init__()
        self._poll_interval = poll_interval_ms
        # The periodic timer is created lazily in _setup_timer().
        self._timer: QTimer | None = None
        self._elapsed_timer = QElapsedTimer()
        self._event_loop = QEventLoop(self)
        self._polling_active: bool = False

    @pyqtSlot()
    def start(self) -> None:
        """Emit a first poll, arm the periodic timer and run the event loop.

        The call returns only after the poller's event loop has exited
        (see :meth:`stop`).
        """
        LOGGER.debug("Starting thread poller")
        self._run_polling()
        self._setup_timer()
        self._event_loop.exec()

    @pyqtSlot()
    def stop(self) -> None:
        """Stop the periodic timer (if any) and exit the event loop."""
        LOGGER.debug("Stopping thread poller")
        timer = self._timer
        if timer:
            timer.stop()
        self._event_loop.exit()

    def elapsed_ms_after_last_ping(self) -> int:
        """Return the milliseconds elapsed since the last emitted poll."""
        elapsed_timer = self._elapsed_timer
        return elapsed_timer.elapsed()

    def set_poll_finished(self) -> None:
        """Acknowledge the pending poll so that the next one may be emitted."""
        self._polling_active = False

    @pyqtSlot()
    def _run_polling(self) -> None:
        """Emit ``poll`` and restart the elapsed timer.

        Does nothing while a previously emitted poll is still unacknowledged.
        """
        if not self._polling_active:
            self._polling_active = True
            self._elapsed_timer.restart()
            self.poll.emit()

    def _setup_timer(self) -> None:
        """Create and start the QTimer that triggers the periodic polls."""
        self._timer = periodic = QTimer(self)
        periodic.timeout.connect(self._run_polling)
        periodic.start(self._poll_interval)


class MainThreadHealthChecker(Meter):
    """Monitors the health of the main application thread.

    Provides mechanisms to measure delays between thread pings and detect
    anomalies based on a defined threshold. A :class:`ThreadPoller` running
    in its own :class:`QThread` emits polls; the time it takes until
    :meth:`_on_poll_event` handles a poll is recorded as the delay.
    """

    _short_name = "main_thread"
    _instance: Optional["MainThreadHealthChecker"] = None

    def __init__(self, poll_interval_s: float, threshold_s: float) -> None:
        """Initialize with poll interval and threshold in seconds.

        :param poll_interval_s: interval between polls, in seconds.
        :param threshold_s: delay, in seconds, above which a poll response
            is reported as an anomaly.
        """
        super().__init__(supports_continuous_measurement=True)
        # Both values are stored in milliseconds to match QElapsedTimer.
        self._poll_interval_ms = poll_interval_s * 1000
        self._threshold_ms = threshold_s * 1000
        self._poller: ThreadPoller | None = None
        self._polling_thread: QThread | None = None
        self._last_delay_ms: int = 0
        # Explicit "a poll has been handled" marker. A measured delay of
        # 0 ms is a valid result, so _last_delay_ms cannot double as one.
        self._poll_answered: bool = False
        LOGGER.debug("Health checker parameters initialized: %s", self)

    def __str__(self) -> str:
        """Return a string representation of the health checker parameters."""
        return (
            f"MainThreadHealthChecker("
            f"poll_interval_s={self._poll_interval_ms / 1000}, "
            f"threshold_s={self._threshold_ms / 1000})"
        )

    @classmethod
    def get(cls) -> "MainThreadHealthChecker":
        """Return the singleton MainThreadHealthChecker instance.

        The instance is created from the current settings on first use. Its
        ``enabled`` attribute is refreshed from the settings on every call.
        """
        if cls._instance is None:
            cls._instance = MainThreadHealthChecker(
                poll_interval_s=Settings.thread_health_checker_poll_interval.get(),
                threshold_s=Settings.thread_health_checker_threshold.get(),
            )
        cls._instance.enabled = Settings.thread_health_checker_enabled.get()
        return cls._instance

    @classmethod
    def monitor(  # noqa: PLR0913
        cls,
        function: Callable | None = None,
        *,
        name: str | None = None,
        group: str | None = None,
        name_args: list[str] | None = None,
        connect_to_profiler: bool = True,
        start_continuous_measuring: bool = True,
        measure_after_call: bool = False,
    ) -> Callable:
        """Decorate a function to monitor it, warning if measure_after_call is used.

        All arguments are forwarded unchanged to the base class ``monitor``.
        """
        if measure_after_call:
            # stacklevel=2 attributes the warning to the code that called
            # monitor() instead of to this line.
            warnings.warn(
                "measure_after_call is not recommended for MainThreadHealthChecker",
                stacklevel=2,
            )
        return super().monitor(
            function,
            name=name,
            group=group,
            name_args=name_args,
            connect_to_profiler=connect_to_profiler,
            start_continuous_measuring=start_continuous_measuring,
            measure_after_call=measure_after_call,
        )

    def reset_parameters(self) -> None:
        """Reset measurement parameters from current settings."""
        self._poll_interval_ms = (
            Settings.thread_health_checker_poll_interval.get() * 1000
        )
        self._threshold_ms = Settings.thread_health_checker_threshold.get() * 1000
        self.enabled = Settings.thread_health_checker_enabled.get()
        LOGGER.debug("Health checker parameters reset: %s", self)

    def _start_measuring(self) -> bool:
        """Create the poller, move it to a new thread and start that thread.

        :return: always ``True``.
        """
        LOGGER.debug("Starting health checking")
        self._poller = ThreadPoller(int(self._poll_interval_ms))
        self._poller.poll.connect(self._on_poll_event)

        self._polling_thread = QThread(self)
        self._poller.moveToThread(self._polling_thread)
        # When the thread finishes: stop the poller, then schedule both the
        # poller and the thread object for deletion.
        self._polling_thread.finished.connect(self._poller.stop)
        self._polling_thread.finished.connect(self._poller.deleteLater)
        self._polling_thread.finished.connect(self._polling_thread.deleteLater)
        self._polling_thread.started.connect(self._poller.start)
        self._polling_thread.start()
        return True

    def _stop_measuring(self) -> None:
        """Disconnect from the poller and shut the polling thread down."""
        LOGGER.debug("Stopping health checking")
        if self._poller:
            self._poller.poll.disconnect(self._on_poll_event)
        if self._polling_thread and self._polling_thread.isRunning():
            self._polling_thread.quit()
            self._polling_thread.wait()
        self._polling_thread = None
        self._poller = None

    def _measure(self) -> tuple[float, bool]:
        """Measure the delay between thread poll and main thread response.

        If a poller is already running, the most recently recorded delay is
        returned immediately. Otherwise measuring is started, events are
        processed until the first poll has been handled (or twice the
        configured poll interval has passed), and measuring is cleaned up
        again.

        :return: the last delay in seconds and whether it exceeded the
            threshold.
        """
        if self._poller:
            return self._last_delay_ms / 1000, self._last_delay_ms > self._threshold_ms

        self._last_delay_ms = 0
        self._poll_answered = False
        self.start_measuring()
        try:
            timeout_s = Settings.thread_health_checker_poll_interval.get() * 2
            # Monotonic clock: immune to wall-clock adjustments.
            deadline = time.monotonic() + timeout_s
            # Wait on the explicit flag, not on a non-zero delay: a healthy
            # main thread may legitimately answer within 0 ms.
            while not self._poll_answered and time.monotonic() < deadline:
                QgsApplication.processEvents()
        finally:
            # Always tear down, even if event processing raised.
            self.cleanup()
        return self._last_delay_ms / 1000, self._last_delay_ms > self._threshold_ms

    @pyqtSlot()
    def _on_poll_event(self) -> None:
        """Handle poll events.

        Records the time elapsed since the poll was emitted, emits an
        anomaly (in seconds, rounded to three decimals) when it exceeds the
        threshold, and acknowledges the poll so the next one can be sent.
        """
        if not self._poller:
            # Should not be possible
            return
        elapsed_ms = self._poller.elapsed_ms_after_last_ping()
        if elapsed_ms > self._threshold_ms:
            LOGGER.debug("Took too long %s ms", elapsed_ms)
            self._emit_anomaly(round(elapsed_ms / 1000, 3))
        self._poller.set_poll_finished()
        self._last_delay_ms = elapsed_ms
        self._poll_answered = True