# profileAnalyzer/core/core.py
"""Core profiling utilities: load, analyze, compare, and visualize cProfile data."""

import pstats
from io import StringIO
import subprocess
import sys
import os
import shlex
from dataclasses import dataclass
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
from pstats import Stats
import tempfile
from functools import lru_cache

try:
    import graphviz
    GRAPHVIZ_AVAILABLE = True
except ImportError:
    GRAPHVIZ_AVAILABLE = False


# --- Utility Function ---
@lru_cache(maxsize=None)
def is_graphviz_installed() -> bool:
    """Return True if both the graphviz package and its executable are usable.

    Cached for the process lifetime; the answer cannot change at runtime.
    """
    if not GRAPHVIZ_AVAILABLE:
        return False
    try:
        # Piping an empty graph forces the library to locate the `dot` binary.
        graphviz.Digraph().pipe()
        return True
    except graphviz.backend.ExecutableNotFound:
        print("INFO: Graphviz executable not found. Graph generation will be disabled.")
        return False


# --- Dataclass ---
@dataclass
class LaunchProfile:
    """Configuration describing how to launch a script or module for profiling."""
    name: str
    run_as_module: bool = False
    target_path: str = ""  # script path, or project root dir when run_as_module
    module_name: str = ""  # dotted module name (used only when run_as_module)
    script_args: str = ""  # extra CLI arguments as a single shell-like string
    python_interpreter: str = sys.executable


# --- ProfileAnalyzer Class ---
class ProfileAnalyzer:
    """ Handles loading, analyzing, and comparing profile data. """

    def __init__(self):
        self.stats: Optional[Stats] = None
        self.profile_path: Optional[str] = None
        # Lazily built map: func_std_string -> pstats function tuple.
        self._func_info_map: Optional[Dict[str, Tuple]] = None

    def load_profile(self, filepath: str) -> Optional[Stats]:
        """
        Loads a profile file and returns the Stats object.
        If this instance is used for single analysis, it also sets self.stats.

        Returns None (and clears instance state) on any load failure.
        """
        try:
            stats_obj = pstats.Stats(filepath)
            stats_obj.strip_dirs()
            # For single analysis mode, update instance state
            self.stats = stats_obj
            self.profile_path = filepath
            self._func_info_map = None  # Invalidate cache
            return stats_obj
        # ValueError/EOFError cover corrupt or truncated profile data,
        # which marshal/pstats raise while unpickling the dump.
        except (FileNotFoundError, TypeError, OSError, ValueError, EOFError) as e:
            print(f"Error loading profile file '{filepath}': {e}")
            self.stats = None
            self.profile_path = None
            self._func_info_map = None
            return None

    def compare_stats(self, stats_base: Stats, stats_comparison: Stats) -> List[Dict]:
        """
        Compares two pstats.Stats objects and returns the differences.

        Args:
            stats_base: The baseline stats object.
            stats_comparison: The comparison stats object to check against the baseline.

        Returns:
            A list of dictionaries, where each dictionary contains stats for a
            function from the comparison profile plus the deltas.
        """
        base = stats_base.stats
        comp = stats_comparison.stats

        # Union of function keys so functions present in only one profile
        # still appear (with zero stats on the missing side).
        all_funcs = set(base.keys()) | set(comp.keys())
        comparison_results = []

        for func_tuple in all_funcs:
            # Get stats, defaulting to zero-tuple if function not in profile
            base_stats = base.get(func_tuple, (0, 0, 0, 0, {}))
            comp_stats = comp.get(func_tuple, (0, 0, 0, 0, {}))

            # Unpack stats for comparison profile: (primitive calls, total
            # calls, total time, cumulative time, callers)
            cc, nc, tt, ct, _ = comp_stats

            # Calculate deltas against the baseline
            delta_nc = nc - base_stats[1]
            delta_tt = tt - base_stats[2]
            delta_ct = ct - base_stats[3]

            # Per-call stats for the comparison profile (guard divide-by-zero)
            percall_tottime = tt / nc if nc > 0 else 0
            percall_cumtime = ct / nc if nc > 0 else 0

            comparison_results.append({
                'func_str': pstats.func_std_string(func_tuple),
                'ncalls': nc,
                'tottime': tt,
                'percall_tottime': percall_tottime,
                'cumtime': ct,
                'percall_cumtime': percall_cumtime,
                'delta_ncalls': delta_nc,
                'delta_tottime': delta_tt,
                'delta_cumtime': delta_ct
            })
        return comparison_results

    def _get_func_info_map(self) -> Dict[str, Tuple]:
        """Build (once) and return the func_std_string -> func tuple lookup map."""
        if self._func_info_map is None and self.stats:
            self._func_info_map = {
                pstats.func_std_string(func): func for func in self.stats.stats
            }
        return self._func_info_map or {}

    def _find_func_tuple_by_string(self, func_info_str: str) -> Optional[Tuple]:
        """Reverse-lookup the pstats function tuple for a display string."""
        return self._get_func_info_map().get(func_info_str)

    def get_stats(self, sort_by: str, limit: int = 500) -> List[Tuple]:
        """
        Extracts statistics, now including the percentage of total time.

        Returns:
            A list of tuples in the format:
            (ncalls, tottime, percentage_tottime, percall_tottime,
             cumtime, percall_cumtime, function_details_str)
        """
        if not self.stats:
            return []

        # Map frontend sort keys to internal pstats keys or custom keys
        sort_map = {
            "cumulative": "cumtime",
            "tottime": "tottime",
            "ncalls": "ncalls",
            "filename": "func_str"
        }
        sort_key = sort_map.get(sort_by, "cumtime")

        all_stats = []
        total_tt = self.stats.total_tt or 1  # Avoid division by zero

        for func_tuple, (cc, nc, tt, ct, callers) in self.stats.stats.items():
            percall_tottime = tt / nc if nc > 0 else 0
            percall_cumtime = ct / nc if nc > 0 else 0
            percentage_tottime = (tt / total_tt) * 100

            all_stats.append({
                'func_str': pstats.func_std_string(func_tuple),
                'ncalls': nc,
                'tottime': tt,
                'cumtime': ct,
                'percall_tottime': percall_tottime,
                'percall_cumtime': percall_cumtime,
                'percentage_tottime': percentage_tottime
            })

        # Numeric columns sort descending; the function-name column ascending.
        all_stats.sort(key=lambda x: x[sort_key], reverse=(sort_key != 'func_str'))

        # Format final output list
        results = []
        for stat_item in all_stats[:limit]:
            results.append((
                stat_item['ncalls'],
                stat_item['tottime'],
                stat_item['percentage_tottime'],
                stat_item['percall_tottime'],
                stat_item['cumtime'],
                stat_item['percall_cumtime'],
                stat_item['func_str']
            ))
        return results

    def get_callers(self, func_info: str) -> List[Tuple[str, str]]:
        """Return (summary, caller_name) pairs for every caller of func_info."""
        if not self.stats:
            return []
        target_func_tuple = self._find_func_tuple_by_string(func_info)
        if not target_func_tuple or target_func_tuple not in self.stats.stats:
            return []

        # Element 4 of a pstats entry is the callers dict.
        callers_data = self.stats.stats[target_func_tuple][4]
        results = []
        for caller_tuple, (cc, nc, tt, ct) in callers_data.items():
            caller_info_str = f"{nc} call(s) in {ct:.4f}s"
            caller_func_str = pstats.func_std_string(caller_tuple)
            results.append((caller_info_str, caller_func_str))
        results.sort(key=lambda x: x[1])
        return results

    def get_callees(self, func_info: str) -> List[Tuple[str, str]]:
        """Return (summary, callee_name) pairs for every function func_info calls."""
        if not self.stats:
            return []
        target_func_tuple = self._find_func_tuple_by_string(func_info)
        if not target_func_tuple:
            return []

        # pstats stores only caller edges, so invert: scan every entry and
        # keep those whose callers dict contains the target.
        callees = {}
        for func_tuple, stats_tuple in self.stats.stats.items():
            if target_func_tuple in stats_tuple[4]:
                call_stats = stats_tuple[4][target_func_tuple]
                callees[func_tuple] = call_stats

        results = []
        for callee_tuple, (cc, nc, tt, ct) in callees.items():
            callee_info_str = f"{nc} call(s) for {ct:.4f}s"
            callee_func_str = pstats.func_std_string(callee_tuple)
            results.append((callee_info_str, callee_func_str))
        results.sort(key=lambda x: x[1])
        return results

    def _get_color_for_time(self, time: float, max_time: float) -> str:
        """Map a time value onto a green(fast)->red(slow) hex color."""
        if max_time == 0:
            return "#90EE90"
        ratio = min(time / max_time, 1.0)
        red = int(255 * min(ratio * 2, 1.0))
        green = int(255 * min((1 - ratio) * 2, 1.0))
        return f"#{red:02x}{green:02x}00"

    def generate_call_graph(self, threshold: float = 0.01) -> Optional[str]:
        """
        Render the loaded profile as a PNG call graph and return its file path.

        Only functions whose tottime exceeds `threshold` * total runtime are
        drawn. Returns None if stats are missing, graphviz is unavailable,
        nothing passes the threshold, or rendering fails.
        """
        if not self.stats or not self.stats.stats or not is_graphviz_installed():
            return None

        total_time = self.stats.total_tt
        if total_time == 0:
            return None
        max_tottime = max(s[2] for s in self.stats.stats.values()) if self.stats.stats else 0

        graph = graphviz.Digraph(
            comment='Call Graph',
            graph_attr={'rankdir': 'LR', 'splines': 'true', 'overlap': 'false'},
            node_attr={'shape': 'box', 'style': 'filled'})

        nodes_in_graph = set()
        for func_tuple, (cc, nc, tt, ct, callers) in self.stats.stats.items():
            if tt < total_time * threshold:
                continue
            func_str = pstats.func_std_string(func_tuple)
            nodes_in_graph.add(func_tuple)
            node_label = f"{func_str}\n(tottime: {tt:.4f}s)"
            node_color = self._get_color_for_time(tt, max_tottime)
            graph.node(name=func_str, label=node_label, fillcolor=node_color)
            for caller_tuple, call_stats in callers.items():
                # Only draw edges from callers that are themselves significant.
                caller_tottime = self.stats.stats.get(caller_tuple, (0, 0, 0, 0))[2]
                if caller_tottime >= total_time * threshold:
                    caller_str = pstats.func_std_string(caller_tuple)
                    graph.edge(caller_str, func_str, label=f"{call_stats[1]} calls")

        if not nodes_in_graph:
            print("No significant functions to graph based on the threshold.")
            return None
        try:
            # mkstemp + close avoids leaking an open handle (NamedTemporaryFile
            # left one open, which can also block render() on Windows).
            fd, output_path = tempfile.mkstemp(suffix=".png")
            os.close(fd)
            graph.render(outfile=output_path, format='png', view=False, cleanup=True)
            return output_path
        except Exception as e:
            print(f"Failed to render graph: {e}")
            return None


# --- run_and_profile_script function ---
def run_and_profile_script(profile: LaunchProfile) -> Optional[str]:
    """
    Run the target described by `profile` under cProfile in a subprocess.

    Writes the profile dump into ./execution_profiles/ and returns the
    absolute path of the .prof file, or None on any failure.
    """
    output_dir = os.path.join(os.getcwd(), "execution_profiles")
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    profile_name_sanitized = profile.name.replace(' ', '_').replace('.', '_')
    profile_output_path = os.path.abspath(
        os.path.join(output_dir, f"{timestamp}_{profile_name_sanitized}.prof"))

    command = [profile.python_interpreter, "-m", "cProfile", "-o", profile_output_path]
    working_directory = None

    if profile.run_as_module:
        if not profile.module_name:
            print("Error: Module name is required.")
            return None
        if not profile.target_path or not os.path.isdir(profile.target_path):
            print(f"Error: Project Root Folder is not a valid directory: {profile.target_path}")
            return None
        command.extend(["-m", profile.module_name])
        working_directory = profile.target_path
    else:
        if not profile.target_path or not os.path.exists(profile.target_path):
            print(f"Error: Script path does not exist: {profile.target_path}")
            return None
        command.append(profile.target_path)
        working_directory = os.path.dirname(profile.target_path)

    if profile.script_args:
        # shlex.split honors quoting ("two words" stays one argument),
        # unlike the naive str.split it replaces.
        command.extend(shlex.split(profile.script_args))

    try:
        print(f"Executing profiling command: {' '.join(command)}")
        print(f"Working Directory for subprocess: {working_directory}")
        process = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE, text=True,
                                   cwd=working_directory)
        stdout, stderr = process.communicate()
        if stdout:
            print(f"--- STDOUT ---\n{stdout}")
        if stderr:
            print(f"--- STDERR ---\n{stderr}")
        if process.returncode != 0:
            print(f"Warning: Profiled script exited with non-zero status: {process.returncode}")
        if not os.path.exists(profile_output_path) or os.path.getsize(profile_output_path) == 0:
            print("Error: Profiling failed and no output file was generated.")
            return None
        print(f"Profiling data saved to: {profile_output_path}")
        return profile_output_path
    except Exception as e:
        print(f"Failed to run and profile script: {e}")
        return None