# profileAnalyzer/core/core.py
|
|
|
|
# ... (all imports and the other classes/functions remain unchanged) ...
|
|
import pstats
|
|
from io import StringIO
|
|
import subprocess
|
|
import sys
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional, Tuple
|
|
from datetime import datetime
|
|
|
|
@dataclass
class LaunchProfile:
    """Settings describing how a target program is launched for profiling.

    Attributes:
        name: Label of the profile; it is also used to build the name of the
            generated ``.prof`` file.
        run_as_module: When true the target is started with
            ``-m <module_name>`` instead of being run as a script file.
        target_path: Path of the script to run, or the project root folder
            when ``run_as_module`` is true.
        module_name: Module name passed to ``-m``; only used in module mode.
        script_args: Extra command-line arguments, given as a single string.
        python_interpreter: Interpreter executable used to launch the target.
            The default is captured from ``sys.executable`` when this module
            is imported.
    """

    # Required: every profile must carry a label.
    name: str

    # How the target is located and started.
    run_as_module: bool = False
    target_path: str = ""
    module_name: str = ""

    # What is passed to the target, and which interpreter runs it.
    script_args: str = ""
    python_interpreter: str = sys.executable
|
|
|
|
class ProfileAnalyzer:
    """Load a profile dump and expose its statistics as plain tuples.

    Every query is answered from the ``pstats.Stats`` object kept in memory
    after :meth:`load_profile`; the profile file is read only once and the
    formatted text output of ``pstats`` is never parsed.
    """

    def __init__(self):
        # Both attributes stay ``None`` until a profile is loaded successfully.
        self.stats: Optional[pstats.Stats] = None
        self.profile_path: Optional[str] = None

    def load_profile(self, filepath: str) -> bool:
        """Load the profile dump stored at ``filepath``.

        Args:
            filepath: Path of a file written by ``cProfile``/``profile``.

        Returns:
            True when the file was loaded. False when it is missing,
            unreadable, empty or not valid profile data; in that case any
            previously loaded profile is discarded and an error is printed.
        """
        try:
            loaded = pstats.Stats(filepath)
            loaded.strip_dirs()
        except (OSError, TypeError, ValueError, EOFError) as e:
            # OSError covers missing/unreadable files; ValueError and EOFError
            # are what ``marshal`` raises for corrupt or empty files.
            print(f"Error loading profile file '{filepath}': {e}")
            self.stats = None
            self.profile_path = None
            return False

        self.stats = loaded
        self.profile_path = filepath
        return True

    def get_stats(self, sort_by: str, limit: int = 200) -> list:
        """Return per-function statistics sorted by ``sort_by``.

        Args:
            sort_by: A sort key accepted by ``pstats.Stats.sort_stats``
                (for example ``"cumulative"`` or ``"tottime"``).
            limit: Maximum number of rows returned. A negative value (or a
                non-integer) means no limit.

        Returns:
            A list of ``(ncalls, tottime, percall, cumtime, percall, function)``
            tuples. ``ncalls`` is a string (``"total/primitive"`` for
            recursive functions), the four timings are floats rounded to three
            decimals, and ``function`` is the ``file:line(name)`` identifier
            used by ``pstats``. Empty when no profile is loaded.
        """
        if not self.stats:
            return []

        self.stats.sort_stats(sort_by)
        ordered = list(self.stats.fcn_list or [])
        if isinstance(limit, int) and limit >= 0:
            ordered = ordered[:limit]

        table = self.stats.stats
        rows = []
        for func in ordered:
            # cc = primitive (non-recursive) calls, nc = total calls.
            cc, nc, tt, ct, _callers = table[func]
            ncalls = str(nc) if nc == cc else f"{nc}/{cc}"
            rows.append((
                ncalls,
                self._round3(tt),
                self._round3(tt / nc) if nc else 0.0,
                self._round3(ct),
                self._round3(ct / cc) if cc else 0.0,
                pstats.func_std_string(func),
            ))
        return rows

    @staticmethod
    def _round3(value: float) -> float:
        """Round like the three-decimal columns printed by ``pstats``."""
        return float("%.3f" % value)

    @staticmethod
    def _format_call_timing(value) -> str:
        """Format one caller/callee record as ``"ncalls tottime cumtime"``."""
        if isinstance(value, tuple):
            nc, cc, tt, ct = value
            calls = str(nc) if nc == cc else f"{nc}/{cc}"
            return f"{calls} {tt:8.3f} {ct:8.3f}"
        # Dumps written by the pure-Python ``profile`` module only store a
        # call count for each caller.
        return str(value)

    def _find_functions(self, func_info: str) -> list:
        """Return the function keys whose pstats identifier matches ``func_info``.

        An exact match on the ``file:line(name)`` identifier is preferred; when
        there is none, identifiers containing ``func_info`` as a plain
        substring are returned. The text is never treated as a regular
        expression, so parentheses and dots in identifiers are safe.
        """
        wanted = (func_info or "").strip()
        if not wanted or not self.stats:
            return []
        labelled = [(pstats.func_std_string(f), f) for f in self.stats.stats]
        exact = [f for label, f in labelled if label == wanted]
        if exact:
            return exact
        return [f for label, f in labelled if wanted in label]

    def _parse_pstats_output(self, output: str) -> List[Tuple[str, str]]:
        """Split arrow-separated lines of pstats call output into pairs.

        Kept for code that still parses ``print_callers``/``print_callees``
        text. Lines containing exactly one ``->`` (callees) or ``<-``
        (callers) arrow yield ``(text_left_of_arrow, text_right_of_arrow)``;
        every other line is ignored.
        """
        results = []
        for raw_line in output.splitlines():
            line = raw_line.strip()
            for arrow in ("->", "<-"):
                parts = line.split(arrow)
                if len(parts) == 2:
                    results.append((parts[0].strip(), parts[1].strip()))
                    break
        return results

    def get_callers(self, func_info: str) -> List[Tuple[str, str]]:
        """
        Gets the list of functions that called the specified function.

        Args:
            func_info: The identifier string for the function from pstats
                (the last element of a ``get_stats`` row).

        Returns:
            A list of tuples, where each tuple is
            (caller_timing_info, caller_function_info). The timing string is
            ``"ncalls tottime cumtime"`` for the calls made by that caller.
            Empty when no profile is loaded or nothing matches.
        """
        if not self.stats:
            return []

        table = self.stats.stats
        results = []
        for func in self._find_functions(func_info):
            callers = table[func][4]
            for caller in sorted(callers):
                results.append((
                    self._format_call_timing(callers[caller]),
                    pstats.func_std_string(caller),
                ))
        return results

    def get_callees(self, func_info: str) -> List[Tuple[str, str]]:
        """
        Gets the list of functions called by the specified function.

        Args:
            func_info: The identifier string for the function from pstats
                (the last element of a ``get_stats`` row).

        Returns:
            A list of tuples, where each tuple is
            (callee_timing_info, callee_function_info). The timing string is
            ``"ncalls tottime cumtime"`` for the calls made to that callee.
            Empty when no profile is loaded or nothing matches.
        """
        if not self.stats:
            return []

        targets = set(self._find_functions(func_info))
        if not targets:
            return []

        # The profile stores callers per function; invert that relation to
        # find everything the target function(s) called.
        found = []
        for callee, entry in self.stats.stats.items():
            for caller, value in entry[4].items():
                if caller in targets:
                    found.append((callee, value))
        found.sort(key=lambda item: item[0])
        return [
            (self._format_call_timing(value), pstats.func_std_string(callee))
            for callee, value in found
        ]
|
|
|
|
def _split_script_args(raw_args: str) -> List[str]:
    """Split an argument string into argv tokens, keeping quoted text together."""
    import shlex  # local import: only this helper needs it

    posix = os.name != "nt"
    try:
        tokens = shlex.split(raw_args, posix=posix)
    except ValueError:
        # Unbalanced quote: fall back to plain whitespace splitting.
        return raw_args.split()
    if posix:
        return tokens
    # Non-POSIX mode keeps backslashes intact (Windows paths) but leaves the
    # enclosing quotes on each token, so strip them here.
    return [
        tok[1:-1] if len(tok) >= 2 and tok[0] == tok[-1] and tok[0] in "\"'" else tok
        for tok in tokens
    ]


def run_and_profile_script(profile: "LaunchProfile") -> Optional[str]:
    """Run a target Python script or module under cProfile.

    The profile data is written to
    ``<current working directory>/execution_profiles/<timestamp>_<name>.prof``.

    Args:
        profile: Launch settings. In module mode ``target_path`` is the
            project root (used as working directory) and ``module_name`` is
            run with ``-m``; otherwise ``target_path`` is the script to run
            and its folder is used as working directory.

    Returns:
        The absolute path of the generated ``.prof`` file, or ``None`` when
        the settings are invalid, the process could not be started, or no
        profile data was produced. Problems are reported with ``print``.
    """
    # --- Output file path (always absolute, the child runs in another cwd) ---
    output_dir = os.path.join(os.getcwd(), "execution_profiles")
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Keep only characters that are safe in a file name on every platform;
    # path separators in the profile name would otherwise break the path.
    safe_name = "".join(
        ch if ch.isalnum() or ch in "-_" else "_" for ch in profile.name
    )
    profile_output_path = os.path.abspath(
        os.path.join(output_dir, f"{timestamp}_{safe_name}.prof")
    )

    # --- Command and working directory ---
    command = [profile.python_interpreter, "-m", "cProfile", "-o", profile_output_path]
    working_directory = None

    if profile.run_as_module:
        if not profile.module_name:
            print("Error: Module name is required.")
            return None
        # In module mode target_path is the project root.
        if not profile.target_path or not os.path.isdir(profile.target_path):
            print(f"Error: Project Root Folder is not a valid directory: {profile.target_path}")
            return None

        # cProfile accepts '-m <module>' after its own options.
        command.extend(["-m", profile.module_name])
        working_directory = profile.target_path
    else:
        # In script mode target_path is the script itself.
        if not profile.target_path or not os.path.exists(profile.target_path):
            print(f"Error: Script path does not exist: {profile.target_path}")
            return None
        # Resolve the path first: the child runs in the script's folder, so a
        # relative path would no longer point at the script (and a bare file
        # name would produce an empty, invalid working directory).
        script_path = os.path.abspath(profile.target_path)
        command.append(script_path)
        working_directory = os.path.dirname(script_path)

    if profile.script_args:
        command.extend(_split_script_args(profile.script_args))

    try:
        print(f"Executing profiling command: {' '.join(command)}")
        print(f"Working Directory for subprocess: {working_directory}")

        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Undecodable bytes in the child's output must not abort the run.
            errors="replace",
            cwd=working_directory,
        )

        if completed.stdout:
            print(f"--- STDOUT ---\n{completed.stdout}")
        if completed.stderr:
            print(f"--- STDERR ---\n{completed.stderr}")

        if completed.returncode != 0:
            print(f"Warning: Profiled script exited with non-zero status: {completed.returncode}")

        if not os.path.exists(profile_output_path) or os.path.getsize(profile_output_path) == 0:
            print("Error: Profiling failed and no output file was generated.")
            return None

        print(f"Profiling data saved to: {profile_output_path}")
        return profile_output_path

    except Exception as e:
        # Boundary of the launcher: report any failure and signal it with None.
        print(f"Failed to run and profile script: {e}")
        return None