# Copyright (c) 2025-2026 profiler-qgis-plugin contributors.
#
#
# This file is part of profiler-qgis-plugin.
#
# profiler-qgis-plugin is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# profiler-qgis-plugin is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with profiler-qgis-plugin. If not, see <https://www.gnu.org/licenses/>.
"""Meter that measures how long QGIS takes to recover after a freeze.
Contains :class:`RecoveryMeasurer`, which repeatedly processes main-thread events
and reports an anomaly when the recovery time exceeds a configurable threshold.
"""
import logging
from typing import Optional
from qgis.PyQt.QtCore import QCoreApplication, QElapsedTimer
from qgis_profiler.meters.meter import Meter
from qgis_profiler.settings import Settings
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
class RecoveryMeasurer(Meter):
    """Measure how long QGIS takes to become fully responsive after a freeze.

    A measurement repeatedly processes a batch of pending Qt events and times
    each batch. As long as a batch takes longer than the configured threshold
    the measurement keeps going; it stops once a batch completes within the
    threshold or the overall timeout has been exceeded.
    """

    _short_name = "recovery"
    # Lazily created shared instance; populated by ``get()``.
    _instance: Optional["RecoveryMeasurer"] = None

    def __init__(
        self,
        process_event_count: int,
        threshold_s: int,
        timeout_s: int,
    ) -> None:
        """Initialize with event count, threshold, and timeout parameters.

        :param process_event_count: Number of ``processEvents()`` calls that
            make up one timed batch.
        :param threshold_s: Batch duration, in seconds, above which the
            application is considered to be still recovering.
        :param timeout_s: Total measurement duration, in seconds, after which
            waiting for recovery is abandoned.
        """
        super().__init__()
        self._process_event_count = process_event_count
        # Qt timers report milliseconds, so limits are stored in the same unit.
        self._threshold_ms = threshold_s * 1000
        self._timeout_ms = timeout_s * 1000
        # Times one whole measurement (used for the timeout and the result).
        self._elapsed_timer = QElapsedTimer()
        # Times a single batch of processed events.
        self._recovery_timer = QElapsedTimer()
        LOGGER.debug("Recovery parameters initialized: %s", self)

    def __str__(self) -> str:
        """Return a string representation of the measurer parameters.

        The threshold and timeout are shown in seconds, i.e. in the unit they
        were configured with.
        """
        return (
            f"RecoveryMeasurer("
            f"process_event_count={self._process_event_count}, "
            f"threshold_s={self._threshold_ms / 1000}, "
            f"timeout_s={self._timeout_ms / 1000})"
        )

    @classmethod
    def get(cls) -> "RecoveryMeasurer":
        """Return the singleton RecoveryMeasurer instance.

        On first use the instance is built from the current ``Settings``
        values; later calls return the same object without re-reading them.
        """
        if cls._instance is None:
            cls._instance = cls(
                process_event_count=Settings.recovery_process_event_count.get(),
                threshold_s=Settings.recovery_threshold.get(),
                timeout_s=Settings.recovery_timeout.get(),
            )
            # NOTE(review): ``enabled`` is presumably provided by ``Meter``.
            cls._instance.enabled = Settings.recovery_meter_enabled.get()
        return cls._instance

    def reset_parameters(self) -> None:
        """Reset measurement parameters from current settings.

        Re-reads the event count, threshold, timeout and enabled flag from
        ``Settings`` and stores them on this instance.
        """
        self._process_event_count = Settings.recovery_process_event_count.get()
        self._threshold_ms = Settings.recovery_threshold.get() * 1000
        self._timeout_ms = Settings.recovery_timeout.get() * 1000
        self.enabled = Settings.recovery_meter_enabled.get()
        LOGGER.debug("Recovery parameters reset: %s", self)

    def _measure(self) -> tuple[float, bool]:
        """Run one recovery measurement.

        :return: A tuple ``(elapsed_seconds, over_threshold)`` where
            ``elapsed_seconds`` is the total time spent waiting, rounded to
            three decimals, and ``over_threshold`` tells whether any batch
            took longer than the threshold.
        """
        self._elapsed_timer.start()
        over_threshold = self._wait_for_recovery()
        elapsed_seconds = round(self._elapsed_timer.elapsed() / 1000, 3)
        return elapsed_seconds, over_threshold

    def _wait_for_recovery(self) -> bool:
        """Process event batches until one completes within the threshold.

        Relies on ``_elapsed_timer`` having been started by the caller, since
        it is used to enforce the timeout.

        :return: ``True`` if at least one batch exceeded the threshold,
            ``False`` if the very first batch was already fast enough.
        """
        over_threshold = False
        while (
            recovery_time := self._time_to_process_main_thread_events()
        ) > self._threshold_ms:
            over_threshold = True
            LOGGER.debug("Recovery time: %sms", recovery_time)
            # The timeout is only checked after a slow batch, so the loop can
            # overshoot it by up to one batch duration.
            if self._elapsed_timer.elapsed() > self._timeout_ms:
                LOGGER.warning("Recovery time exceeded timeout")
                break
            QCoreApplication.processEvents()
        return over_threshold

    def _time_to_process_main_thread_events(self) -> int:
        """Time one batch of ``processEvents()`` calls.

        :return: Milliseconds taken to call ``QCoreApplication.processEvents``
            ``process_event_count`` times.
        """
        self._recovery_timer.start()
        for _ in range(self._process_event_count):
            QCoreApplication.processEvents()
        return self._recovery_timer.elapsed()  # ms