251 lines
9.9 KiB
Python
251 lines
9.9 KiB
Python
# profileAnalyzer/core/core.py
|
|
|
|
import pstats
|
|
from io import StringIO
|
|
import subprocess
|
|
import sys
|
|
import os
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional, Tuple, Dict
|
|
from datetime import datetime
|
|
from pstats import Stats
|
|
import tempfile
|
|
from functools import lru_cache
|
|
|
|
try:
|
|
import graphviz
|
|
GRAPHVIZ_AVAILABLE = True
|
|
except ImportError:
|
|
GRAPHVIZ_AVAILABLE = False
|
|
|
|
|
|
# --- New Utility Function ---
|
|
@lru_cache(maxsize=None)
def is_graphviz_installed() -> bool:
    """
    Checks whether Graphviz can actually be used to render graphs.

    Two conditions must hold: the ``graphviz`` Python package was importable
    (``GRAPHVIZ_AVAILABLE``) and rendering an empty graph through the ``dot``
    executable succeeds.

    Returns:
        True if rendering works, False otherwise. The result is cached, so
        the probe (and its informational message) runs at most once per
        process.
    """
    if not GRAPHVIZ_AVAILABLE:
        return False
    try:
        # Rendering an empty graph invokes 'dot'; it fails if the executable
        # is missing from PATH or cannot run properly.
        graphviz.Digraph().pipe()
    except graphviz.backend.ExecutableNotFound:
        print("INFO: Graphviz executable not found. Graph generation will be disabled.")
        return False
    except subprocess.CalledProcessError as exc:
        # 'dot' exists but exited with a non-zero status (e.g. a broken or
        # unconfigured installation). graphviz raises a subclass of
        # subprocess.CalledProcessError here; treat it as "not usable"
        # instead of letting the probe crash its caller.
        print(f"INFO: Graphviz executable failed to run ({exc}). Graph generation will be disabled.")
        return False
    return True
|
|
|
|
|
|
# --- Dataclass (unchanged) ---
|
|
@dataclass
class LaunchProfile:
    """
    Describes how a target program should be launched under the profiler.

    Either a script file (``run_as_module`` False) or a module
    (``run_as_module`` True) is profiled; ``target_path`` is interpreted
    differently in the two modes.
    """
    # Name of the launch configuration; also used to build the output
    # ``.prof`` file name.
    name: str
    # When True, the target is started with ``-m <module_name>``; otherwise
    # ``target_path`` is run as a script.
    run_as_module: bool = False
    # Script mode: path of the script to run.
    # Module mode: project root directory, used as the working directory.
    target_path: str = ""
    # Module to run; required when ``run_as_module`` is True.
    module_name: str = ""
    # Extra command-line arguments for the target, given as a single string.
    script_args: str = ""
    # Interpreter executable used to launch the profiled program; defaults to
    # the interpreter running this process.
    python_interpreter: str = sys.executable
|
|
|
|
# --- ProfileAnalyzer Class (Updated) ---
|
|
class ProfileAnalyzer:
    """
    Handles loading a profile data file and extracting statistics
    by directly accessing the pstats data structures.

    ``Stats.stats`` maps a function key ``(filename, lineno, name)`` to
    ``(cc, nc, tt, ct, callers)`` where ``cc`` is the primitive call count,
    ``nc`` the total call count, ``tt`` the total (own) time and ``ct`` the
    cumulative time. ``callers`` maps a caller's function key to either an
    ``(nc, cc, tt, ct)`` tuple (cProfile) or a bare call count (``profile``).
    """

    def __init__(self):
        self.stats: Optional[Stats] = None
        self.profile_path: Optional[str] = None
        # Lazily built cache: display string -> pstats function key.
        self._func_info_map: Optional[Dict[str, Tuple]] = None

    def load_profile(self, filepath: str) -> bool:
        """
        Loads a profile dump and makes it the current data set.

        Args:
            filepath: Path of a file written by cProfile/profile.

        Returns:
            True on success. On failure the analyzer is reset to the empty
            state, a message is printed and False is returned.
        """
        try:
            loaded = pstats.Stats(filepath)
            loaded.strip_dirs()
        except (OSError, TypeError, ValueError, EOFError) as e:
            # OSError: missing/unreadable file. TypeError: pstats rejects
            # empty data. ValueError/EOFError: marshal could not decode a
            # corrupt or truncated file.
            print(f"Error loading profile file '{filepath}': {e}")
            self.stats = None
            self.profile_path = None
            self._func_info_map = None
            return False

        self.stats = loaded
        self.profile_path = filepath
        self._func_info_map = None  # invalidate the cache of the previous profile
        return True

    def _get_func_info_map(self) -> Dict[str, Tuple]:
        """Returns (building it on first use) the display-string -> function-key map."""
        if self._func_info_map is None and self.stats:
            self._func_info_map = {
                pstats.func_std_string(func): func
                for func in self.stats.stats
            }
        return self._func_info_map or {}

    def _find_func_tuple_by_string(self, func_info_str: str) -> Optional[Tuple]:
        """Returns the function key whose display string is ``func_info_str``, or None."""
        return self._get_func_info_map().get(func_info_str)

    @staticmethod
    def _describe_calls(call_stats, preposition: str) -> str:
        """
        Formats one caller entry as a short human-readable summary.

        cProfile stores ``(nc, cc, tt, ct)`` tuples (total calls first); the
        pure-Python ``profile`` module stores only a call count, in which
        case no timing can be reported.
        """
        if isinstance(call_stats, tuple):
            total_calls, _primitive_calls, _tottime, cumtime = call_stats
            return f"{total_calls} call(s) {preposition} {cumtime:.4f}s"
        return f"{call_stats} call(s)"

    def get_stats(self, sort_by: str, limit: int = 500) -> List[Tuple]:
        """
        Returns per-function statistics as display rows.

        Args:
            sort_by: "cumulative", "tottime" or "ncalls" (descending), or
                "filename" (ascending by display string). Unknown values
                sort by cumulative time.
            limit: Maximum number of rows returned.

        Returns:
            Tuples ``(ncalls, tottime, tottime_per_call, cumtime,
            cumtime_per_call, function_string)``; empty if nothing is loaded.
        """
        if not self.stats:
            return []

        rows = []
        for func_tuple, (_cc, nc, tt, ct, _callers) in self.stats.stats.items():
            percall_tottime = tt / nc if nc > 0 else 0
            percall_cumtime = ct / nc if nc > 0 else 0
            rows.append((nc, tt, percall_tottime, ct, percall_cumtime,
                         pstats.func_std_string(func_tuple)))

        if sort_by == 'filename':
            rows.sort(key=lambda row: row[5])
        else:
            # Column of the row tuple to sort on; cumulative time by default.
            column = {"ncalls": 0, "tottime": 1, "cumulative": 3}.get(sort_by, 3)
            rows.sort(key=lambda row: row[column], reverse=True)
        return rows[:limit]

    def get_callers(self, func_info: str) -> List[Tuple[str, str]]:
        """
        Lists the functions that call ``func_info``.

        Returns:
            ``(summary, caller_string)`` pairs sorted by caller string; empty
            if no profile is loaded or the function is unknown.
        """
        if not self.stats:
            return []
        target_func_tuple = self._find_func_tuple_by_string(func_info)
        if not target_func_tuple or target_func_tuple not in self.stats.stats:
            return []

        callers_data = self.stats.stats[target_func_tuple][4]
        results = [
            (self._describe_calls(call_stats, "in"), pstats.func_std_string(caller_tuple))
            for caller_tuple, call_stats in callers_data.items()
        ]
        results.sort(key=lambda x: x[1])
        return results

    def get_callees(self, func_info: str) -> List[Tuple[str, str]]:
        """
        Lists the functions called by ``func_info``.

        pstats only stores caller links, so every function's caller table is
        scanned for the target.

        Returns:
            ``(summary, callee_string)`` pairs sorted by callee string; empty
            if no profile is loaded or the function is unknown.
        """
        if not self.stats:
            return []
        target_func_tuple = self._find_func_tuple_by_string(func_info)
        if not target_func_tuple:
            return []

        results = [
            (self._describe_calls(stats_tuple[4][target_func_tuple], "for"),
             pstats.func_std_string(func_tuple))
            for func_tuple, stats_tuple in self.stats.stats.items()
            if target_func_tuple in stats_tuple[4]
        ]
        results.sort(key=lambda x: x[1])
        return results

    def _get_color_for_time(self, time: float, max_time: float) -> str:
        """
        Maps a time to a green -> yellow -> red ``#rrggbb`` colour.

        The ratio ``time / max_time`` is clamped to [0, 1] so that slightly
        negative timings cannot produce an invalid colour string. A
        non-positive ``max_time`` yields a fixed light green.
        """
        if max_time <= 0:
            return "#90EE90"
        ratio = max(0.0, min(time / max_time, 1.0))
        red = int(255 * min(ratio * 2, 1.0))
        green = int(255 * min((1 - ratio) * 2, 1.0))
        return f"#{red:02x}{green:02x}00"

    def generate_call_graph(self, threshold: float = 0.01) -> Optional[str]:
        """
        Renders a call graph of the significant functions to a PNG file.

        Args:
            threshold: Minimum share of the total time a function's own time
                must reach to be included.

        Returns:
            Path of the generated temporary PNG (the caller owns the file),
            or None if there is no data, Graphviz is unavailable, nothing
            passes the threshold, or rendering fails.
        """
        if not self.stats or not self.stats.stats or not is_graphviz_installed():
            return None
        total_time = self.stats.total_tt
        if total_time == 0:
            return None

        all_stats = self.stats.stats
        min_tottime = total_time * threshold
        significant = [func for func, entry in all_stats.items()
                       if entry[2] >= min_tottime]
        if not significant:
            print("No significant functions to graph based on the threshold.")
            return None

        max_tottime = max(entry[2] for entry in all_stats.values())
        # Display strings look like "file.py:12(name)". graphviz splits edge
        # endpoints at ':' into node:port, so they must not be used as node
        # identifiers; use colon-free synthetic ids and keep the display
        # string in the label only.
        node_ids = {func: f"n{index}" for index, func in enumerate(significant)}

        graph = graphviz.Digraph(comment='Call Graph',
                                 graph_attr={'rankdir': 'LR', 'splines': 'true', 'overlap': 'false'},
                                 node_attr={'shape': 'box', 'style': 'filled'})

        for func_tuple in significant:
            _cc, _nc, tt, _ct, callers = all_stats[func_tuple]
            func_str = pstats.func_std_string(func_tuple)
            node_id = node_ids[func_tuple]
            node_label = f"{func_str}\n(tottime: {tt:.4f}s)"
            node_color = self._get_color_for_time(tt, max_tottime)
            graph.node(name=node_id, label=node_label, fillcolor=node_color)

            for caller_tuple, call_stats in callers.items():
                caller_id = node_ids.get(caller_tuple)
                if caller_id is None:
                    continue  # caller is below the threshold (or unknown)
                # Total call count is the first item of a cProfile entry.
                call_count = call_stats[0] if isinstance(call_stats, tuple) else call_stats
                graph.edge(caller_id, node_id, label=f"{call_count} calls")

        output_path = None
        try:
            # Close the handle before rendering so Graphviz can write to the
            # path on every platform and no descriptor is leaked.
            with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as handle:
                output_path = handle.name
            graph.render(outfile=output_path, format='png', view=False, cleanup=True)
            return output_path
        except Exception as e:
            print(f"Failed to render graph: {e}")
            if output_path is not None:
                # Do not leave an empty/partial image behind.
                try:
                    os.remove(output_path)
                except OSError:
                    pass
            return None
|
|
|
|
# --- run_and_profile_script function (unchanged) ---
|
|
def run_and_profile_script(profile: "LaunchProfile") -> Optional[str]:
    """
    Runs the target described by ``profile`` under cProfile.

    The profile dump is written to ``<cwd>/execution_profiles`` as
    ``<timestamp>_<sanitized name>.prof``. The target's stdout/stderr are
    captured and echoed after it finishes.

    Args:
        profile: Launch configuration. In module mode ``target_path`` is the
            project root (used as working directory); in script mode it is
            the script to run, and the script's directory becomes the
            working directory.

    Returns:
        Absolute path of the generated ``.prof`` file, or None if the
        configuration is invalid, the process could not be started, or no
        profile data was produced.
    """
    # Local import: shlex is not among this module's top-level imports.
    import shlex

    # Validate first so an invalid configuration has no side effects.
    if profile.run_as_module:
        if not profile.module_name:
            print("Error: Module name is required.")
            return None
        if not profile.target_path or not os.path.isdir(profile.target_path):
            print(f"Error: Project Root Folder is not a valid directory: {profile.target_path}")
            return None
        target_args = ["-m", profile.module_name]
        working_directory = profile.target_path
    else:
        if not profile.target_path or not os.path.exists(profile.target_path):
            print(f"Error: Script path does not exist: {profile.target_path}")
            return None
        # The child runs with its cwd set to the script's directory, so a
        # relative path would be resolved against the wrong directory (and a
        # bare file name would yield an empty, invalid cwd). Use the
        # absolute path for both.
        script_path = os.path.abspath(profile.target_path)
        target_args = [script_path]
        working_directory = os.path.dirname(script_path)

    extra_args: List[str] = []
    if profile.script_args:
        on_windows = os.name == "nt"
        try:
            # Honour quoting so an argument may contain spaces. Non-POSIX
            # mode on Windows keeps backslashes in paths intact.
            extra_args = shlex.split(profile.script_args, posix=not on_windows)
        except ValueError:
            # Unbalanced quotes: fall back to plain whitespace splitting.
            extra_args = profile.script_args.split()
        else:
            if on_windows:
                # Non-POSIX mode leaves the surrounding quotes in the token.
                extra_args = [
                    arg[1:-1] if len(arg) >= 2 and arg[0] == arg[-1] and arg[0] in "\"'" else arg
                    for arg in extra_args
                ]

    output_dir = os.path.join(os.getcwd(), "execution_profiles")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Keep only characters that are safe in a file name (spaces, dots, path
    # separators, ... become underscores).
    profile_name_sanitized = "".join(
        ch if ch.isalnum() or ch in "-_" else "_" for ch in profile.name
    )
    profile_output_path = os.path.abspath(
        os.path.join(output_dir, f"{timestamp}_{profile_name_sanitized}.prof")
    )

    command = [profile.python_interpreter, "-m", "cProfile", "-o", profile_output_path]
    command.extend(target_args)
    command.extend(extra_args)

    try:
        os.makedirs(output_dir, exist_ok=True)

        print(f"Executing profiling command: {' '.join(command)}")
        print(f"Working Directory for subprocess: {working_directory}")

        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            cwd=working_directory,
        )

        if completed.stdout:
            print(f"--- STDOUT ---\n{completed.stdout}")
        if completed.stderr:
            print(f"--- STDERR ---\n{completed.stderr}")

        # A failing target may still have produced usable profile data, so
        # a non-zero exit status is only a warning.
        if completed.returncode != 0:
            print(f"Warning: Profiled script exited with non-zero status: {completed.returncode}")

        if not os.path.exists(profile_output_path) or os.path.getsize(profile_output_path) == 0:
            print("Error: Profiling failed and no output file was generated.")
            return None

        print(f"Profiling data saved to: {profile_output_path}")
        return profile_output_path

    except Exception as e:
        # Boundary handler: report any launch failure and signal it with None.
        print(f"Failed to run and profile script: {e}")
        return None