SXXXXXXX_ProfileAnalyzer/profileanalyzer/core/core.py
2025-06-23 14:37:58 +02:00

251 lines
9.9 KiB
Python

# profileAnalyzer/core/core.py
import pstats
from io import StringIO
import subprocess
import sys
import os
from dataclasses import dataclass
from typing import List, Optional, Tuple, Dict
from datetime import datetime
from pstats import Stats
import tempfile
from functools import lru_cache
try:
import graphviz
GRAPHVIZ_AVAILABLE = True
except ImportError:
GRAPHVIZ_AVAILABLE = False
# --- New Utility Function ---
@lru_cache(maxsize=None)
def is_graphviz_installed() -> bool:
"""
Checks if the Graphviz executable is installed and in the system's PATH.
Returns True if available, False otherwise. Caches the result.
"""
if not GRAPHVIZ_AVAILABLE:
return False
try:
# This command is lightweight and fails if 'dot' is not found.
graphviz.Digraph().pipe()
return True
except graphviz.backend.ExecutableNotFound:
print("INFO: Graphviz executable not found. Graph generation will be disabled.")
return False
# --- Dataclass (unchanged) ---
@dataclass
class LaunchProfile:
# ... (il codice rimane identico)
name: str
run_as_module: bool = False
target_path: str = ""
module_name: str = ""
script_args: str = ""
python_interpreter: str = sys.executable
# --- ProfileAnalyzer Class (Updated) ---
class ProfileAnalyzer:
# ... (tutta la classe ProfileAnalyzer, incluso generate_call_graph, rimane identica alla versione precedente)
"""
Handles loading a profile data file and extracting statistics
by directly accessing the pstats data structures.
"""
def __init__(self):
self.stats: Optional[Stats] = None
self.profile_path: Optional[str] = None
self._func_info_map: Optional[Dict[str, Tuple]] = None
def load_profile(self, filepath: str) -> bool:
try:
self.stats = pstats.Stats(filepath)
self.stats.strip_dirs()
self.profile_path = filepath
self._func_info_map = None
return True
except (FileNotFoundError, TypeError, OSError) as e:
print(f"Error loading profile file '{filepath}': {e}")
self.stats = None
self.profile_path = None
self._func_info_map = None
return False
def _get_func_info_map(self) -> Dict[str, Tuple]:
if self._func_info_map is None and self.stats:
self._func_info_map = {
pstats.func_std_string(func): func
for func in self.stats.stats
}
return self._func_info_map or {}
def _find_func_tuple_by_string(self, func_info_str: str) -> Optional[Tuple]:
return self._get_func_info_map().get(func_info_str)
def get_stats(self, sort_by: str, limit: int = 500) -> List[Tuple]:
if not self.stats:
return []
sort_map = {
"cumulative": 3, "tottime": 2, "ncalls": 1, "filename": 4
}
sort_key_index = sort_map.get(sort_by, 3)
all_stats = []
for func_tuple, (cc, nc, tt, ct, callers) in self.stats.stats.items():
func_str = pstats.func_std_string(func_tuple)
percall_tottime = tt / nc if nc > 0 else 0
percall_cumtime = ct / nc if nc > 0 else 0
all_stats.append({
'func_str': func_str, 'func_tuple': func_tuple,
'raw_stats': (cc, nc, tt, ct, percall_tottime, percall_cumtime)
})
if sort_by == 'filename':
all_stats.sort(key=lambda x: x['func_str'])
else:
all_stats.sort(key=lambda x: x['raw_stats'][sort_key_index], reverse=True)
results = []
for stat_item in all_stats[:limit]:
s = stat_item['raw_stats']
results.append((s[1], s[2], s[4], s[3], s[5], stat_item['func_str']))
return results
def get_callers(self, func_info: str) -> List[Tuple[str, str]]:
if not self.stats: return []
target_func_tuple = self._find_func_tuple_by_string(func_info)
if not target_func_tuple or target_func_tuple not in self.stats.stats: return []
callers_data = self.stats.stats[target_func_tuple][4]
results = []
for caller_tuple, (cc, nc, tt, ct) in callers_data.items():
caller_info_str = f"{nc} call(s) in {ct:.4f}s"
caller_func_str = pstats.func_std_string(caller_tuple)
results.append((caller_info_str, caller_func_str))
results.sort(key=lambda x: x[1])
return results
def get_callees(self, func_info: str) -> List[Tuple[str, str]]:
if not self.stats: return []
target_func_tuple = self._find_func_tuple_by_string(func_info)
if not target_func_tuple: return []
callees = {}
for func_tuple, stats_tuple in self.stats.stats.items():
if target_func_tuple in stats_tuple[4]:
call_stats = stats_tuple[4][target_func_tuple]
callees[func_tuple] = call_stats
results = []
for callee_tuple, (cc, nc, tt, ct) in callees.items():
callee_info_str = f"{nc} call(s) for {ct:.4f}s"
callee_func_str = pstats.func_std_string(callee_tuple)
results.append((callee_info_str, callee_func_str))
results.sort(key=lambda x: x[1])
return results
def _get_color_for_time(self, time: float, max_time: float) -> str:
if max_time == 0: return "#90EE90"
ratio = min(time / max_time, 1.0)
red = int(255 * min(ratio * 2, 1.0))
green = int(255 * min((1 - ratio) * 2, 1.0))
return f"#{red:02x}{green:02x}00"
def generate_call_graph(self, threshold: float = 0.01) -> Optional[str]:
if not self.stats or not self.stats.stats or not is_graphviz_installed(): return None
total_time = self.stats.total_tt
if total_time == 0: return None
max_tottime = max(s[2] for s in self.stats.stats.values()) if self.stats.stats else 0
graph = graphviz.Digraph(comment='Call Graph',
graph_attr={'rankdir': 'LR', 'splines': 'true', 'overlap': 'false'},
node_attr={'shape': 'box', 'style': 'filled'})
nodes_in_graph = set()
for func_tuple, (cc, nc, tt, ct, callers) in self.stats.stats.items():
if tt < total_time * threshold:
continue
func_str = pstats.func_std_string(func_tuple)
nodes_in_graph.add(func_tuple)
node_label = f"{func_str}\n(tottime: {tt:.4f}s)"
node_color = self._get_color_for_time(tt, max_tottime)
graph.node(name=func_str, label=node_label, fillcolor=node_color)
for caller_tuple, call_stats in callers.items():
caller_tottime = self.stats.stats.get(caller_tuple, (0,0,0,0))[2]
if caller_tottime >= total_time * threshold:
caller_str = pstats.func_std_string(caller_tuple)
graph.edge(caller_str, func_str, label=f"{call_stats[1]} calls")
if not nodes_in_graph:
print("No significant functions to graph based on the threshold.")
return None
try:
output_path = tempfile.NamedTemporaryFile(suffix=".png", delete=False).name
graph.render(outfile=output_path, format='png', view=False, cleanup=True)
return output_path
except Exception as e:
print(f"Failed to render graph: {e}")
return None
# --- run_and_profile_script function (unchanged) ---
def run_and_profile_script(profile: LaunchProfile) -> Optional[str]:
# ... (il codice rimane identico)
output_dir = os.path.join(os.getcwd(), "execution_profiles")
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
profile_name_sanitized = profile.name.replace(' ', '_').replace('.', '_')
profile_output_path = os.path.abspath(os.path.join(output_dir, f"{timestamp}_{profile_name_sanitized}.prof"))
command = [profile.python_interpreter, "-m", "cProfile", "-o", profile_output_path]
working_directory = None
if profile.run_as_module:
if not profile.module_name:
print("Error: Module name is required.")
return None
if not profile.target_path or not os.path.isdir(profile.target_path):
print(f"Error: Project Root Folder is not a valid directory: {profile.target_path}")
return None
command.extend(["-m", profile.module_name])
working_directory = profile.target_path
else:
if not profile.target_path or not os.path.exists(profile.target_path):
print(f"Error: Script path does not exist: {profile.target_path}")
return None
command.append(profile.target_path)
working_directory = os.path.dirname(profile.target_path)
if profile.script_args:
command.extend(profile.script_args.split())
try:
print(f"Executing profiling command: {' '.join(command)}")
print(f"Working Directory for subprocess: {working_directory}")
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=working_directory
)
stdout, stderr = process.communicate()
if stdout: print(f"--- STDOUT ---\n{stdout}")
if stderr: print(f"--- STDERR ---\n{stderr}")
if process.returncode != 0:
print(f"Warning: Profiled script exited with non-zero status: {process.returncode}")
if not os.path.exists(profile_output_path) or os.path.getsize(profile_output_path) == 0:
print("Error: Profiling failed and no output file was generated.")
return None
print(f"Profiling data saved to: {profile_output_path}")
return profile_output_path
except Exception as e:
print(f"Failed to run and profile script: {e}")
return None