# Copyright (c) 2025-2026 profiler-qgis-plugin contributors.
#
#
# This file is part of profiler-qgis-plugin.
#
# profiler-qgis-plugin is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# profiler-qgis-plugin is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with profiler-qgis-plugin. If not, see <https://www.gnu.org/licenses/>.
"""cProfile integration for QGIS profiling.
Contains :class:`QCProfiler`, a :class:`cProfile.Profile` subclass that can merge
QgsRuntimeProfiler data with standard cProfile statistics, and :class:`ProfilerEntry`,
a dataclass for representing individual profiling entries.
"""
import cProfile
import io
import pstats
from collections.abc import Generator, Sequence
from contextlib import contextmanager
from copy import deepcopy
from dataclasses import dataclass, field
from types import CodeType
from typing import TYPE_CHECKING, Any, Union
from qgis_profiler.constants import EPSILON
if TYPE_CHECKING:
from _lsprof import profiler_entry # noqa: SC200
[docs]
@dataclass
class ProfilerEntry:  # noqa: PLW1641
    """Class representing a single profiling entry.

    Inspired by _lsprof.profiler_entry and _lsprof.profiler_subentry
    but made more flexible and easier to work with.

    The attribute names mirror the ones ``cProfile`` reads from its own
    entries, so instances can be returned from ``getstats()``.
    """

    code: str  # code object or built-in function name
    callcount: int = 1  # how many times this was called
    inlinetime: float = 0.0  # inline time in this entry (not in subcalls)
    reccallcount: int = 0  # how many times called recursively
    totaltime: float = 0.0  # total time in this entry
    calls: list["ProfilerEntry"] = field(default_factory=list)  # details of the calls

    @staticmethod
    def from_cprofiler(cprofiler: "cProfile.Profile") -> list["ProfilerEntry"]:
        """Turn cProfile.Profile stats into a list of ProfilerEntry objects.

        :param cprofiler: Profiler whose ``getstats()`` result is converted.
        :return: One entry per stat returned by the profiler.
        """
        return [ProfilerEntry.from_stat(stat) for stat in cprofiler.getstats()]

    @staticmethod
    def from_stat(stat: "profiler_entry") -> "ProfilerEntry":
        """Convert a profiler_entry stat into a ProfilerEntry.

        Times are rounded to three decimals. Sub-entries found in
        ``stat.calls`` are converted recursively; a missing or empty
        ``calls`` attribute yields an empty list.

        :param stat: Object exposing ``code``, ``callcount``, ``inlinetime``,
            ``reccallcount``, ``totaltime`` and optionally ``calls``.
        :return: The converted entry.
        """
        sub_stats = getattr(stat, "calls", None)
        calls = [ProfilerEntry.from_stat(sub) for sub in sub_stats] if sub_stats else []
        # Real code objects are reduced to their function name; built-ins
        # already carry a plain string.
        code = stat.code.co_name if isinstance(stat.code, CodeType) else stat.code
        return ProfilerEntry(
            code,
            stat.callcount,
            round(stat.inlinetime, 3),
            round(stat.reccallcount, 3),
            round(stat.totaltime, 3),
            calls,
        )

    def __add__(self, other: "ProfilerEntry") -> "ProfilerEntry":
        """Add two entries with the same code together.

        Call counts and times are summed. ``reccallcount`` and ``calls`` are
        taken from the left operand only.

        :raises ValueError: If the entries have different codes.
        """
        if self.code != other.code:
            raise ValueError("Cannot add entries with different codes")  # noqa: TRY003
        return ProfilerEntry(
            self.code,
            self.callcount + other.callcount,
            self.inlinetime + other.inlinetime,
            self.reccallcount,
            self.totaltime + other.totaltime,
            self.calls,
        )

    def __eq__(self, other: object) -> bool:
        """Compare with approximate equality for floating point values.

        Counts, code and sub-calls must match exactly; the two time fields
        may differ by at most ``EPSILON`` after rounding to three decimals.
        """
        if not isinstance(other, ProfilerEntry):
            return NotImplemented
        return (
            self.code == other.code
            and self.callcount == other.callcount
            and round(abs(self.inlinetime - other.inlinetime), 3) <= EPSILON
            and self.reccallcount == other.reccallcount
            and round(abs(self.totaltime - other.totaltime), 3) <= EPSILON
            and self.calls == other.calls
        )

    def _extend_calls(self, calls: list["ProfilerEntry"]) -> None:
        """Merge ``calls`` into ``self.calls``, combining entries by code.

        Entries whose code is already present are added to the existing one
        (via ``__add__``); the others are appended.
        """
        call_dict = {call.code: call for call in self.calls}
        for call in calls:
            if call.code in call_dict:
                call_dict[call.code] += call  # type: ignore
            else:
                call_dict[call.code] = call
        self.calls = list(call_dict.values())

    @staticmethod
    def parse_from_qgis_profiler_text(text: str) -> list["ProfilerEntry"]:  # noqa: C901
        """Parse a given profiler text into a list of `ProfilerEntry` objects.

        Process hierarchical structure based on indentation and generate profiling
        entries.

        The first line is treated as the group name and skipped. Every following
        line is expected to look like ``<dashes> <name>: <time>``, where the
        number of leading dashes is the nesting level (starting at 1). Parsing
        stops at the first line without leading dashes.

        :param text: Profiler text to parse.
        :return: One entry per distinct name, in order of first appearance.
        :raises ValueError: If a line is nested more than one level deeper than
            its predecessor, or if its time value is not a number.
        """
        profile_entries: dict[str, ProfilerEntry] = {}

        def profiler_lines_into_entries(
            lines: list[str],
            parents: set[str],
            level: int = 1,
        ) -> list["ProfilerEntry"]:
            """Insert profiling data into the profile_entries dict.

            Consumes lines from the front of ``lines`` while they belong to
            ``level`` (or deeper) and returns the entries seen at this level.
            """
            results = []
            while lines:
                line = lines[0]
                # Only the leading run of dashes encodes the level. A "-" in
                # the name or in the time (e.g. "1e-05") must not be counted.
                marked = line.lstrip()
                line_level = len(marked) - len(marked.lstrip("-"))
                if line_level < level:
                    # This line belongs to a parent level or is a group name
                    break
                if line_level > level:
                    # Without this guard the line would never be consumed and
                    # the loop would spin forever.
                    msg = (
                        f"Unexpected nesting level {line_level} "
                        f"(expected at most {level}): {line!r}"
                    )
                    raise ValueError(msg)

                # This line is at the current level
                lines.pop(0)
                # The time follows the last ": ", so names may contain ": "
                raw_name, _, raw_time = marked[line_level:].rpartition(": ")
                name = raw_name.strip()
                total_time = round(float(raw_time.strip()), 3)
                entry = ProfilerEntry(name, totaltime=total_time)

                new_entry = name not in profile_entries
                if new_entry:
                    profile_entries[name] = entry
                if name in parents:
                    profile_entries[name].reccallcount += 1

                # Recursively parse children
                calls = profiler_lines_into_entries(lines, parents | {name}, level + 1)
                if calls:
                    profile_entries[name]._extend_calls(calls)
                else:
                    # No children, inline time should be total time
                    entry.inlinetime = total_time
                if not new_entry:
                    profile_entries[name] += entry

                if level > 1:
                    if entry.calls:
                        # The parent only needs a flat record of this call
                        entry = deepcopy(entry)
                        entry.calls = []
                    results.append(entry)
            return results

        # The first line is the name of the group
        profiler_lines_into_entries(
            text.splitlines()[1:],
            set(),
        )
        return list(profile_entries.values())
[docs]
class QCProfiler(cProfile.Profile):
"""Extend cProfile.Profile with QGIS-specific functionality and extra utilities."""
def __init__(self) -> None:
"""Initialize the profiler with empty QGIS stats."""
super().__init__()
self._qgis_stats: list[ProfilerEntry] = []
self._profiling: bool = False
[docs]
@contextmanager
def qgis_profiler_data(self, profiler_text: str) -> Generator[None, Any, None]:
"""Temporarily set QGIS profiler stats from the given text."""
self._qgis_stats = ProfilerEntry.parse_from_qgis_profiler_text(profiler_text)
try:
yield
finally:
self._qgis_stats = []
[docs]
def enable(self, subcalls: bool = True, builtins: bool = True) -> None: # noqa: FBT001 FBT002
"""Enable profiling."""
super().enable(subcalls, builtins)
self._profiling = True
[docs]
def disable(self) -> None:
"""Disable profiling."""
super().disable()
self._profiling = False
[docs]
def getstats(self) -> Sequence[Union[ProfilerEntry, "profiler_entry"]]: # type: ignore[override]
"""Return QGIS stats if available, otherwise standard cProfile stats."""
if self._qgis_stats:
return self._qgis_stats
return super().getstats()
[docs]
def is_profiling(self) -> bool:
"""Return whether profiling is currently active."""
return self._profiling
[docs]
def get_stat_report(
self,
sort: str | tuple[str, ...] | int = -1,
max_line_count: int = 1000,
trim_zeros: bool = False, # noqa: FBT001 FBT002
) -> str:
"""Get the profile report as a string.
:param sort: Sort method. Can be a string or a tuple of strings.
:param max_line_count: Maximum number of lines to return.
:param trim_zeros: Trim lines with zero times from the report.
:return: The profile report as a string.
"""
if not isinstance(sort, tuple):
sort = (sort,) # type: ignore[assignment]
with io.StringIO() as stream:
pstats.Stats(self, stream=stream).strip_dirs().sort_stats( # type: ignore[misc]
*sort
).print_stats()
report = stream.getvalue()
report_lines = []
if trim_zeros:
for line in report.splitlines():
if "0.000 0.000 0.000 0.000" in line:
continue
report_lines.append(line)
else:
report_lines = report.splitlines()
# First 5 lines are part of the header
return "\n".join(report_lines[: max_line_count + 5])