SXXXXXXX_ProfileAnalyzer/profileanalyzer/core/core.py
2025-06-23 14:25:58 +02:00

236 lines
8.9 KiB
Python

# profileAnalyzer/core/core.py
import pstats
from io import StringIO
import subprocess
import sys
import os
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
import pstats
from pstats import Stats
# --- Dataclass (unchanged) ---
@dataclass
class LaunchProfile:
name: str
run_as_module: bool = False
target_path: str = ""
module_name: str = ""
script_args: str = ""
python_interpreter: str = sys.executable
# --- ProfileAnalyzer Class (Refactored) ---
class ProfileAnalyzer:
"""
Handles loading a profile data file and extracting statistics
by directly accessing the pstats data structures.
"""
def __init__(self):
self.stats: Optional[Stats] = None
self.profile_path: Optional[str] = None
# Cache for mapping function string representations to their tuple keys
self._func_info_map: Optional[Dict[str, Tuple]] = None
def load_profile(self, filepath: str) -> bool:
"""Loads and prepares the profile data."""
try:
self.stats = pstats.Stats(filepath)
self.stats.strip_dirs()
self.profile_path = filepath
# Invalidate cache on new file load
self._func_info_map = None
return True
except (FileNotFoundError, TypeError, OSError) as e:
print(f"Error loading profile file '{filepath}': {e}")
self.stats = None
self.profile_path = None
self._func_info_map = None
return False
def _get_func_info_map(self) -> Dict[str, Tuple]:
"""
Builds and caches a mapping from pstats' string representation
of a function to its internal tuple key.
"""
if self._func_info_map is None and self.stats:
self._func_info_map = {
pstats.func_std_string(func): func
for func in self.stats.stats
}
return self._func_info_map or {}
def _find_func_tuple_by_string(self, func_info_str: str) -> Optional[Tuple]:
"""Finds the internal function tuple key from its string representation."""
return self._get_func_info_map().get(func_info_str)
def get_stats(self, sort_by: str, limit: int = 200) -> List[Tuple]:
"""
Extracts statistics directly from the pstats object.
Args:
sort_by: The key to sort statistics by.
limit: The maximum number of records to return.
Returns:
A list of tuples, each containing stats for one function.
Format: (ncalls, tottime, percall_tottime, cumtime, percall_cumtime, function_details_str)
"""
if not self.stats:
return []
# Map frontend sort keys to internal pstats keys
sort_map = {
"cumulative": 3, # 'ct' (cumulative time)
"tottime": 2, # 'tt' (total time)
"ncalls": 1, # 'nc' (number of calls)
"filename": 4 # Special case, sort by formatted name
}
sort_key_index = sort_map.get(sort_by, 3) # Default to cumulative
all_stats = []
for func_tuple, (cc, nc, tt, ct, callers) in self.stats.stats.items():
func_str = pstats.func_std_string(func_tuple)
percall_tottime = tt / nc if nc > 0 else 0
percall_cumtime = ct / nc if nc > 0 else 0
all_stats.append({
'func_str': func_str,
'func_tuple': func_tuple,
'raw_stats': (cc, nc, tt, ct, percall_tottime, percall_cumtime)
})
# Perform sorting
if sort_by == 'filename':
# Sort alphabetically by the function string representation
all_stats.sort(key=lambda x: x['func_str'])
else:
# Sort numerically by the specified stat, in descending order
all_stats.sort(key=lambda x: x['raw_stats'][sort_key_index], reverse=True)
# Format final output list
results = []
for stat_item in all_stats[:limit]:
s = stat_item['raw_stats']
results.append((s[1], s[2], s[4], s[3], s[5], stat_item['func_str']))
return results
def get_callers(self, func_info: str) -> List[Tuple[str, str]]:
"""
Gets the list of functions that called the specified function.
"""
if not self.stats:
return []
target_func_tuple = self._find_func_tuple_by_string(func_info)
if not target_func_tuple or target_func_tuple not in self.stats.stats:
return []
# The 'callers' are stored directly in the stats for the target function
callers_data = self.stats.stats[target_func_tuple][4]
results = []
for caller_tuple, (cc, nc, tt, ct) in callers_data.items():
caller_info_str = f"{nc} call(s) in {ct:.4f}s"
caller_func_str = pstats.func_std_string(caller_tuple)
results.append((caller_info_str, caller_func_str))
results.sort(key=lambda x: x[1]) # Sort alphabetically by function name
return results
def get_callees(self, func_info: str) -> List[Tuple[str, str]]:
"""
Gets the list of functions called by the specified function.
"""
if not self.stats:
return []
target_func_tuple = self._find_func_tuple_by_string(func_info)
if not target_func_tuple:
return []
# To find callees, we must iterate through all functions and check
# who they were called BY.
callees = {}
for func_tuple, stats_tuple in self.stats.stats.items():
# stats_tuple[4] is the callers dictionary for 'func_tuple'
if target_func_tuple in stats_tuple[4]:
# This means 'target_func_tuple' called 'func_tuple'.
# So, 'func_tuple' is a callee of our target.
call_stats = stats_tuple[4][target_func_tuple]
callees[func_tuple] = call_stats
results = []
for callee_tuple, (cc, nc, tt, ct) in callees.items():
callee_info_str = f"{nc} call(s) for {ct:.4f}s"
callee_func_str = pstats.func_std_string(callee_tuple)
results.append((callee_info_str, callee_func_str))
results.sort(key=lambda x: x[1]) # Sort alphabetically by function name
return results
# --- run_and_profile_script function (unchanged) ---
def run_and_profile_script(profile: LaunchProfile) -> Optional[str]:
"""Runs a target Python script/module under cProfile."""
output_dir = os.path.join(os.getcwd(), "execution_profiles")
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
profile_name_sanitized = profile.name.replace(' ', '_').replace('.', '_')
profile_output_path = os.path.abspath(os.path.join(output_dir, f"{timestamp}_{profile_name_sanitized}.prof"))
command = [profile.python_interpreter, "-m", "cProfile", "-o", profile_output_path]
working_directory = None
if profile.run_as_module:
if not profile.module_name:
print("Error: Module name is required.")
return None
if not profile.target_path or not os.path.isdir(profile.target_path):
print(f"Error: Project Root Folder is not a valid directory: {profile.target_path}")
return None
command.extend(["-m", profile.module_name])
working_directory = profile.target_path
else:
if not profile.target_path or not os.path.exists(profile.target_path):
print(f"Error: Script path does not exist: {profile.target_path}")
return None
command.append(profile.target_path)
working_directory = os.path.dirname(profile.target_path)
if profile.script_args:
command.extend(profile.script_args.split())
try:
print(f"Executing profiling command: {' '.join(command)}")
print(f"Working Directory for subprocess: {working_directory}")
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=working_directory
)
stdout, stderr = process.communicate()
if stdout: print(f"--- STDOUT ---\n{stdout}")
if stderr: print(f"--- STDERR ---\n{stderr}")
if process.returncode != 0:
print(f"Warning: Profiled script exited with non-zero status: {process.returncode}")
if not os.path.exists(profile_output_path) or os.path.getsize(profile_output_path) == 0:
print("Error: Profiling failed and no output file was generated.")
return None
print(f"Profiling data saved to: {profile_output_path}")
return profile_output_path
except Exception as e:
print(f"Failed to run and profile script: {e}")
return None