# SXXXXXXX_ProfileAnalyzer/profileanalyzer/core/core.py
# 2025-06-23 15:31:29 +02:00
#
# 293 lines
# 12 KiB
# Python

# profileAnalyzer/core/core.py
import pstats
from io import StringIO
import subprocess
import sys
import os
from dataclasses import dataclass
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
from pstats import Stats
import tempfile
from functools import lru_cache
# graphviz is an optional dependency: record whether the Python package could
# be imported so graph features can be switched off when it is missing.
try:
    import graphviz
except ImportError:
    GRAPHVIZ_AVAILABLE = False
else:
    GRAPHVIZ_AVAILABLE = True
# --- Utility functions ---
@lru_cache(maxsize=None)
def is_graphviz_installed() -> bool:
    """
    Report whether call graphs can be rendered with Graphviz.

    Returns False when the ``graphviz`` Python package could not be imported
    (``GRAPHVIZ_AVAILABLE`` is False) or when piping an empty graph raises
    ``ExecutableNotFound``. The result is memoized by ``lru_cache``, so the
    probe runs at most once per process.
    """
    if GRAPHVIZ_AVAILABLE:
        try:
            # Piping an empty graph is the probe; ExecutableNotFound is the
            # only failure treated as "not installed".
            graphviz.Digraph().pipe()
        except graphviz.backend.ExecutableNotFound:
            print("INFO: Graphviz executable not found. Graph generation will be disabled.")
        else:
            return True
    return False
# --- Launch configuration dataclass ---
@dataclass
class LaunchProfile:
    """
    Settings describing one profiling run for ``run_and_profile_script``.

    Only ``name`` is required; every other field has a default.
    """

    # Label of the run; used to build the output ``.prof`` file name.
    name: str
    # True: launch ``module_name`` with ``-m``; False: launch a script file.
    run_as_module: bool = False
    # Script path in script mode, project root directory in module mode.
    target_path: str = ""
    # Module to run with ``-m`` (only read in module mode).
    module_name: str = ""
    # Extra command-line arguments as one whitespace-separated string.
    script_args: str = ""
    # Interpreter used for the profiled process (defaults to the current one).
    python_interpreter: str = sys.executable
# --- ProfileAnalyzer class ---
class ProfileAnalyzer:
    """
    Handles loading, analyzing, and comparing profile data.

    An instance keeps the most recently loaded ``pstats.Stats`` object in
    ``self.stats`` and the file it came from in ``self.profile_path``; the
    query methods (``get_stats``, ``get_callers``, ``get_callees``,
    ``generate_call_graph``) all read that state.
    """

    def __init__(self):
        self.stats: Optional[Stats] = None
        self.profile_path: Optional[str] = None
        # Lazy cache: func_std_string(func) -> func tuple, for self.stats.
        self._func_info_map: Optional[Dict[str, Tuple]] = None

    def load_profile(self, filepath: str) -> Optional[Stats]:
        """
        Load a profile file, store it on the instance and return it.

        Directory prefixes are stripped from file names (``strip_dirs``).

        Args:
            filepath: Path of the profile dump to read.

        Returns:
            The loaded ``Stats`` object, or None when the file is missing,
            unreadable, empty or not a valid profile dump. On failure the
            instance state is reset to "nothing loaded".
        """
        try:
            stats_obj = pstats.Stats(filepath)
            stats_obj.strip_dirs()
        except (FileNotFoundError, OSError, TypeError, ValueError,
                EOFError, AttributeError) as e:
            # EOFError/ValueError: empty or corrupt dump. AttributeError:
            # the file decoded to something that is not a stats mapping.
            print(f"Error loading profile file '{filepath}': {e}")
            self.stats = None
            self.profile_path = None
            self._func_info_map = None
            return None
        self.stats = stats_obj
        self.profile_path = filepath
        self._func_info_map = None  # Invalidate cache
        return stats_obj

    def compare_stats(self, stats_base: Stats, stats_comparison: Stats) -> List[Dict]:
        """
        Compares two pstats.Stats objects and returns the differences.

        Args:
            stats_base: The baseline stats object.
            stats_comparison: The comparison stats object to check against
                the baseline.

        Returns:
            A list of dictionaries, one per function found in either
            profile. Each holds the comparison profile's values
            (``func_str``, ``ncalls``, ``tottime``, ``percall_tottime``,
            ``cumtime``, ``percall_cumtime``) plus ``delta_ncalls``,
            ``delta_tottime`` and ``delta_cumtime`` (comparison minus
            baseline). A function missing from one profile counts as zero
            there. Entries follow the baseline's order, then functions that
            only exist in the comparison profile.
        """
        base = stats_base.stats
        comp = stats_comparison.stats
        missing = (0, 0, 0, 0, {})
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        ordered_funcs = list(dict.fromkeys([*base, *comp]))

        comparison_results = []
        for func_tuple in ordered_funcs:
            base_stats = base.get(func_tuple, missing)
            _cc, nc, tt, ct, _callers = comp.get(func_tuple, missing)
            comparison_results.append({
                'func_str': pstats.func_std_string(func_tuple),
                'ncalls': nc,
                'tottime': tt,
                'percall_tottime': tt / nc if nc > 0 else 0,
                'cumtime': ct,
                'percall_cumtime': ct / nc if nc > 0 else 0,
                'delta_ncalls': nc - base_stats[1],
                'delta_tottime': tt - base_stats[2],
                'delta_cumtime': ct - base_stats[3],
            })
        return comparison_results

    def _get_func_info_map(self) -> Dict[str, Tuple]:
        """Return (building it on first use) the display-string -> func tuple map."""
        if self._func_info_map is None and self.stats:
            self._func_info_map = {
                pstats.func_std_string(func): func
                for func in self.stats.stats
            }
        return self._func_info_map or {}

    def _find_func_tuple_by_string(self, func_info_str: str) -> Optional[Tuple]:
        """Look up the func tuple whose display string equals *func_info_str*."""
        return self._get_func_info_map().get(func_info_str)

    @staticmethod
    def _split_call_stats(call_stats: Any) -> Tuple[int, Optional[float]]:
        """
        Normalize one per-caller entry to ``(total_calls, cumtime)``.

        cProfile dumps store ``(nc, cc, tt, ct)`` tuples with the total call
        count first; dumps from the pure-Python ``profile`` module store a
        bare call count, for which no time is available (None).
        """
        if isinstance(call_stats, tuple):
            total_calls, _primitive_calls, _tottime, cumtime = call_stats
            return total_calls, cumtime
        return call_stats, None

    @classmethod
    def _describe_calls(cls, call_stats: Any, joiner: str) -> str:
        """Format a per-caller entry as ``"N call(s) <joiner> T.TTTTs"``."""
        count, cumtime = cls._split_call_stats(call_stats)
        if cumtime is None:
            return f"{count} call(s)"
        return f"{count} call(s) {joiner} {cumtime:.4f}s"

    def get_stats(self, sort_by: str, limit: int = 500) -> List[Tuple]:
        """
        Extracts statistics, including percentage of total time, filepath
        and line number.

        Args:
            sort_by: One of "cumulative", "tottime", "ncalls" or "filename";
                any other value sorts by cumulative time. Sorting is
                descending, except "filename" which is ascending.
            limit: Maximum number of rows returned.

        Returns:
            A list of tuples in the format:
            (ncalls, tottime, percentage_tottime, percall_tottime, cumtime,
            percall_cumtime, function_details_str, filepath, line_number).
            Empty when no profile is loaded.
        """
        if not self.stats:
            return []
        sort_map = {"cumulative": "cumtime", "tottime": "tottime",
                    "ncalls": "ncalls", "filename": "func_str"}
        sort_key = sort_map.get(sort_by, "cumtime")
        # Guard the percentage computation against a zero total.
        total_tt = self.stats.total_tt or 1

        rows = []
        for func_tuple, (_cc, nc, tt, ct, _callers) in self.stats.stats.items():
            filepath, line_number, _func_name = func_tuple
            rows.append({
                'func_str': pstats.func_std_string(func_tuple),
                'ncalls': nc, 'tottime': tt, 'cumtime': ct,
                'percall_tottime': tt / nc if nc > 0 else 0,
                'percall_cumtime': ct / nc if nc > 0 else 0,
                'percentage_tottime': (tt / total_tt) * 100,
                'filepath': filepath, 'line_number': line_number,
            })
        rows.sort(key=lambda row: row[sort_key], reverse=(sort_key != 'func_str'))

        return [
            (row['ncalls'], row['tottime'], row['percentage_tottime'],
             row['percall_tottime'], row['cumtime'], row['percall_cumtime'],
             row['func_str'], row['filepath'], row['line_number'])
            for row in rows[:limit]
        ]

    def get_callers(self, func_info: str) -> List[Tuple[str, str]]:
        """
        List the functions that call *func_info*.

        Args:
            func_info: Display string of the target function, as produced
                by ``pstats.func_std_string``.

        Returns:
            ``(call_info, caller_display_string)`` pairs sorted by caller
            string; empty when nothing is loaded or the function is unknown.
        """
        if not self.stats:
            return []
        target_func_tuple = self._find_func_tuple_by_string(func_info)
        if not target_func_tuple or target_func_tuple not in self.stats.stats:
            return []
        callers_data = self.stats.stats[target_func_tuple][4]
        results = [
            (self._describe_calls(call_stats, "in"),
             pstats.func_std_string(caller_tuple))
            for caller_tuple, call_stats in callers_data.items()
        ]
        results.sort(key=lambda x: x[1])
        return results

    def get_callees(self, func_info: str) -> List[Tuple[str, str]]:
        """
        List the functions called by *func_info*.

        The stats only record callers, so every function whose caller table
        contains the target is reported as a callee.

        Args:
            func_info: Display string of the target function, as produced
                by ``pstats.func_std_string``.

        Returns:
            ``(call_info, callee_display_string)`` pairs sorted by callee
            string; empty when nothing is loaded or the function is unknown.
        """
        if not self.stats:
            return []
        target_func_tuple = self._find_func_tuple_by_string(func_info)
        if not target_func_tuple:
            return []
        results = [
            (self._describe_calls(entry[4][target_func_tuple], "for"),
             pstats.func_std_string(func_tuple))
            for func_tuple, entry in self.stats.stats.items()
            if target_func_tuple in entry[4]
        ]
        results.sort(key=lambda x: x[1])
        return results

    def _get_color_for_time(self, time: float, max_time: float) -> str:
        """
        Map *time* on a green -> yellow -> red scale relative to *max_time*.

        Returns an ``#rrggbb`` string; light green when there is no usable
        scale (``max_time`` is zero or negative).
        """
        if max_time <= 0:
            return "#90EE90"
        # Clamp to [0, 1] so a negative time cannot yield a malformed colour.
        ratio = max(0.0, min(time / max_time, 1.0))
        red = int(255 * min(ratio * 2, 1.0))
        green = int(255 * min((1 - ratio) * 2, 1.0))
        return f"#{red:02x}{green:02x}00"

    def generate_call_graph(self, threshold: float = 0.01) -> Optional[str]:
        """
        Render a PNG call graph of the significant functions.

        A function is drawn when its own time is at least ``threshold``
        times the profile's total time; an edge is drawn from each caller
        that meets the same cutoff.

        Args:
            threshold: Fraction of total time a function needs to be shown.

        Returns:
            Path of the generated PNG (a temporary file the caller owns),
            or None when nothing is loaded, Graphviz is unavailable, the
            total time is zero, no function passes the threshold, or
            rendering fails.
        """
        if not self.stats or not self.stats.stats or not is_graphviz_installed():
            return None
        total_time = self.stats.total_tt
        if total_time == 0:
            return None

        all_stats = self.stats.stats
        cutoff = total_time * threshold
        max_tottime = max(entry[2] for entry in all_stats.values())
        graph = graphviz.Digraph(
            comment='Call Graph',
            graph_attr={'rankdir': 'LR', 'splines': 'true', 'overlap': 'false'},
            node_attr={'shape': 'box', 'style': 'filled'},
        )

        # Display strings look like "file.py:12(func)". graphviz treats a
        # colon in an edge endpoint as a node:port separator, so nodes get
        # colon-free ids and the display string is used as the label only.
        node_ids: Dict[Tuple, str] = {}

        def node_id_for(func_tuple: Tuple) -> str:
            return node_ids.setdefault(func_tuple, f"n{len(node_ids)}")

        nodes_in_graph = set()
        for func_tuple, (_cc, _nc, tt, _ct, callers) in all_stats.items():
            if tt < cutoff:
                continue
            func_str = pstats.func_std_string(func_tuple)
            nodes_in_graph.add(func_tuple)
            graph.node(
                name=node_id_for(func_tuple),
                label=f"{func_str}\n(tottime: {tt:.4f}s)",
                fillcolor=self._get_color_for_time(tt, max_tottime),
            )
            for caller_tuple, call_stats in callers.items():
                caller_tottime = all_stats.get(caller_tuple, (0, 0, 0, 0))[2]
                if caller_tottime >= cutoff:
                    call_count, _ = self._split_call_stats(call_stats)
                    graph.edge(node_id_for(caller_tuple), node_id_for(func_tuple),
                               label=f"{call_count} calls")

        if not nodes_in_graph:
            print("No significant functions to graph based on the threshold.")
            return None

        # Edge endpoints that never got their own node (callers absent from
        # the stats table) still need a readable label.
        for func_tuple, node_id in node_ids.items():
            if func_tuple not in nodes_in_graph:
                graph.node(name=node_id, label=pstats.func_std_string(func_tuple))

        output_path = None
        try:
            # mkstemp + close: reserve the name without keeping a handle
            # open while the renderer writes to it.
            fd, output_path = tempfile.mkstemp(suffix=".png")
            os.close(fd)
            graph.render(outfile=output_path, format='png', view=False, cleanup=True)
            return output_path
        except Exception as e:
            print(f"Failed to render graph: {e}")
            if output_path is not None:
                # Do not leave an empty placeholder file behind.
                try:
                    os.remove(output_path)
                except OSError:
                    pass
            return None
# --- Script launching and profiling ---
def run_and_profile_script(profile: "LaunchProfile") -> Optional[str]:
    """
    Run the target described by *profile* under cProfile and save the stats.

    The profiled process is started with ``profile.python_interpreter`` and
    ``-m cProfile -o <output>``. In module mode the target is
    ``-m profile.module_name`` with ``profile.target_path`` as working
    directory; in script mode the target is the script file and the working
    directory is the script's folder. ``profile.script_args`` is split on
    whitespace and appended. Captured stdout/stderr are printed.

    The output file is written to ``<cwd>/execution_profiles`` and named
    ``<timestamp>_<sanitized name>.prof``. The launch configuration is
    validated first, so invalid input does not create the directory.

    Args:
        profile: Launch settings (see ``LaunchProfile``).

    Returns:
        Absolute path of the generated ``.prof`` file, or None when the
        configuration is invalid, the process cannot be started, or no
        profile data was written. A non-zero exit status of the profiled
        program only produces a warning.
    """
    if profile.run_as_module:
        if not profile.module_name:
            print("Error: Module name is required.")
            return None
        if not profile.target_path or not os.path.isdir(profile.target_path):
            print(f"Error: Project Root Folder is not a valid directory: {profile.target_path}")
            return None
        target_args = ["-m", profile.module_name]
        working_directory = profile.target_path
    else:
        if not profile.target_path or not os.path.exists(profile.target_path):
            print(f"Error: Script path does not exist: {profile.target_path}")
            return None
        # Resolve before changing directory: a bare file name would give
        # cwd="" (which cannot be entered), and a relative path would stop
        # resolving once the child runs from the script's folder.
        script_path = os.path.abspath(profile.target_path)
        target_args = [script_path]
        working_directory = os.path.dirname(script_path)

    output_dir = os.path.join(os.getcwd(), "execution_profiles")
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Spaces and dots as before, plus characters that are path separators
    # or reserved in file names, so the name cannot escape output_dir.
    unsafe_chars = str.maketrans({ch: "_" for ch in ' .\\/:*?"<>|'})
    profile_name_sanitized = profile.name.translate(unsafe_chars)
    profile_output_path = os.path.abspath(
        os.path.join(output_dir, f"{timestamp}_{profile_name_sanitized}.prof")
    )

    command = [profile.python_interpreter, "-m", "cProfile", "-o", profile_output_path]
    command.extend(target_args)
    if profile.script_args:
        command.extend(profile.script_args.split())

    try:
        print(f"Executing profiling command: {' '.join(command)}")
        print(f"Working Directory for subprocess: {working_directory}")
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            cwd=working_directory,
        )
        if completed.stdout:
            print(f"--- STDOUT ---\n{completed.stdout}")
        if completed.stderr:
            print(f"--- STDERR ---\n{completed.stderr}")
        if completed.returncode != 0:
            print(f"Warning: Profiled script exited with non-zero status: {completed.returncode}")
        if not os.path.exists(profile_output_path) or os.path.getsize(profile_output_path) == 0:
            print("Error: Profiling failed and no output file was generated.")
            return None
        print(f"Profiling data saved to: {profile_output_path}")
        return profile_output_path
    except Exception as e:
        # Best-effort boundary: report the launch failure instead of raising.
        print(f"Failed to run and profile script: {e}")
        return None