# target_simulator/gui/performance_analysis_window.py
"""
Performance Analysis Window for detailed packet processing diagnostics.

This window provides in-depth visualization of packet processing performance
including timing breakdowns, spike detection, and statistical analysis.
"""
import csv
import logging
import statistics
import tkinter as tk
import warnings
from tkinter import ttk, messagebox
from typing import Optional, Dict, Any, List

from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk

logger = logging.getLogger(__name__)


class PerformanceAnalysisWindow(tk.Toplevel):
    """
    Dedicated window for analyzing packet processing performance data from a CSV file.

    Displays:
    - Time-series plot of component processing times
    - Statistical summary table
    - Distribution histogram
    """

    # Total processing time (ms) above which a sample is counted as a spike.
    # Drives the statistics, the histogram split, the plot threshold line and
    # every UI label that mentions the threshold.
    SPIKE_THRESHOLD_MS = 100.0

    # Component timing columns (CSV column is "<name>_ms"), in display order.
    # The order also decides ties when picking the bottleneck component.
    _COMPONENTS = ("parse", "hub", "archive", "listener", "clock")

    def __init__(self, parent, performance_csv_path: str):
        """
        Initialize the performance analysis window.

        The window is created immediately; reading the CSV and building the
        widgets is scheduled shortly afterwards behind a modal loading dialog.

        Args:
            parent: Parent Tkinter window
            performance_csv_path: Path to the .perf.csv file to analyze.
        """
        super().__init__(parent)

        self.performance_csv_path = performance_csv_path
        self.performance_samples: List[Dict[str, Any]] = []
        self.metadata: Dict[str, str] = {}
        self.scenario_name = "Unknown"

        # Derived data; filled in by _extract_data_arrays / _compute_statistics.
        self.timestamps: List[float] = []
        self.total_ms: List[float] = []
        self.parse_ms: List[float] = []
        self.hub_ms: List[float] = []
        self.archive_ms: List[float] = []
        self.listener_ms: List[float] = []
        self.clock_ms: List[float] = []
        self.stats: Dict[str, Any] = {}

        self.title("Performance Analysis")
        self.geometry("1200x900")

        # UI State Variables
        self.show_threshold_var = tk.BooleanVar(value=True)

        self._show_loading_and_create_widgets()

    def _threshold_text(self) -> str:
        """Return the spike threshold formatted for UI labels, e.g. ``100ms``."""
        return f"{self.SPIKE_THRESHOLD_MS:g}ms"

    def _load_data_from_csv(self):
        """
        Load performance samples and metadata from the specified CSV file.

        Leading lines starting with ``#`` are parsed as ``key: value``
        metadata; comment lines without a colon are ignored. Every other
        non-comment line is handed to ``csv.DictReader`` and each non-empty
        cell is converted to ``float``.

        Raises:
            IOError: If the file cannot be read, contains no non-comment
                lines, or a cell cannot be converted to a number. The
                original exception is attached as ``__cause__``.
        """
        metadata: Dict[str, str] = {}
        data_lines: List[str] = []
        try:
            with open(self.performance_csv_path, "r", encoding="utf-8") as f:
                in_header = True
                for line in f:
                    if not line.startswith("#"):
                        in_header = False
                        data_lines.append(line)
                        continue
                    # Comment lines after the first data line are dropped
                    # without being treated as metadata.
                    if not in_header:
                        continue
                    key, sep, value = line.strip("# ").strip().partition(":")
                    if sep:
                        metadata[key.strip()] = value.strip()

            if not data_lines:
                raise ValueError("CSV file contains no data rows.")

            samples = [
                {key: float(value) for key, value in row.items() if value}
                for row in csv.DictReader(data_lines)
            ]
        except (OSError, ValueError, TypeError, csv.Error) as e:
            raise IOError(f"Failed to read or parse performance CSV file:\n{e}") from e

        self.metadata.update(metadata)
        self.performance_samples = samples
        self.scenario_name = self.metadata.get("Scenario Name", "Unknown")
        self.title(f"Performance Analysis - {self.scenario_name}")

    def _extract_data_arrays(self):
        """
        Extract per-column lists from the loaded performance samples.

        A column missing from a sample contributes ``0.0``. With no samples
        loaded every list is empty.
        """
        samples = self.performance_samples

        def column(key: str) -> List[float]:
            return [sample.get(key, 0.0) for sample in samples]

        self.timestamps = column("timestamp")
        self.total_ms = column("total_ms")
        self.parse_ms = column("parse_ms")
        self.hub_ms = column("hub_ms")
        self.archive_ms = column("archive_ms")
        self.listener_ms = column("listener_ms")
        self.clock_ms = column("clock_ms")

    def _show_loading_and_create_widgets(self):
        """
        Show a modal loading dialog and schedule the load/build sequence.

        The sequence (read CSV, extract arrays, compute statistics, create
        widgets, render plots) runs on the Tk event loop 100 ms after this
        call. On any failure the error is logged, shown to the user, and the
        window destroys itself.
        """
        loading_dialog = tk.Toplevel(self)
        loading_dialog.title("Loading Performance Data")
        loading_dialog.geometry("350x120")
        loading_dialog.transient(self)
        loading_dialog.grab_set()

        # Center the dialog over this window.
        loading_dialog.update_idletasks()
        x = self.winfo_x() + (self.winfo_width() // 2) - (loading_dialog.winfo_width() // 2)
        y = self.winfo_y() + (self.winfo_height() // 2) - (loading_dialog.winfo_height() // 2)
        loading_dialog.geometry(f"+{x}+{y}")

        label_text = tk.StringVar(value="Loading performance data...")
        ttk.Label(loading_dialog, textvariable=label_text, font=("Segoe UI", 10)).pack(pady=20)
        progress_label = ttk.Label(loading_dialog, text="Please wait...")
        progress_label.pack(pady=5)
        progress_bar = ttk.Progressbar(loading_dialog, mode='indeterminate', length=300)
        progress_bar.pack(pady=10)
        progress_bar.start(10)

        def advance(message: str) -> None:
            # Each step blocks the event loop, so force a repaint before it.
            progress_label.config(text=message)
            self.update()

        def load_and_display():
            try:
                advance("Reading performance file...")
                self._load_data_from_csv()

                label_text.set(f"Processing {len(self.performance_samples):,} samples...")
                advance("Extracting data arrays...")
                self._extract_data_arrays()

                advance("Computing statistics...")
                self._compute_statistics()

                advance("Creating widgets...")
                self._create_widgets()

                advance("Rendering plots...")
                self._populate_plots()

                loading_dialog.destroy()
            except Exception as e:
                # Top-level UI boundary: record the traceback, tell the user,
                # and close the half-built window.
                logger.exception(
                    "Performance analysis failed for %s", self.performance_csv_path
                )
                loading_dialog.destroy()
                messagebox.showerror(
                    "Performance Analysis Error",
                    f"An error occurred:\n{e}",
                    parent=self,
                )
                self.destroy()

        self.after(100, load_and_display)

    def _compute_statistics(self):
        """
        Compute statistical metrics from the extracted data arrays.

        Populates ``self.stats`` with the sample count, summary statistics of
        the total time, mean/max of each component, the spike count and
        percentage (samples above ``SPIKE_THRESHOLD_MS``), and the component
        with the largest time in the sample that has the largest total time.
        ``self.stats`` is left empty when there is no data.
        """
        totals = self.total_ms
        if not totals:
            self.stats = {}
            return

        stats: Dict[str, Any] = {
            'total_samples': len(totals),
            'total': {
                'mean': statistics.mean(totals),
                'median': statistics.median(totals),
                # stdev needs at least two data points.
                'stdev': statistics.stdev(totals) if len(totals) > 1 else 0.0,
                'min': min(totals),
                'max': max(totals),
                'p95': self._percentile(totals, 95),
                'p99': self._percentile(totals, 99),
            },
        }

        series = {name: getattr(self, f"{name}_ms") for name in self._COMPONENTS}
        for name, values in series.items():
            stats[name] = {'mean': statistics.mean(values), 'max': max(values)}

        spike_count = sum(1 for t in totals if t > self.SPIKE_THRESHOLD_MS)
        stats['spike_count'] = spike_count
        stats['spike_percentage'] = spike_count / len(totals) * 100

        worst_total = stats['total']['max']
        if worst_total > 0:
            worst_idx = totals.index(worst_total)
            at_worst = {name: values[worst_idx] for name, values in series.items()}
            stats['max_component'] = max(at_worst, key=at_worst.get)
        else:
            stats['max_component'] = "N/A"

        self.stats = stats

    def _percentile(self, data: List[float], p: float) -> float:
        """
        Calculate a percentile of data using linear interpolation.

        Args:
            data: Values to analyze; need not be sorted.
            p: Percentile in the range 0-100. Values outside the range are
                clamped to it.

        Returns:
            The interpolated percentile, or 0.0 for empty data.
        """
        if not data:
            return 0.0
        ordered = sorted(data)
        # Clamp so an out-of-range p cannot extrapolate past the data.
        p = min(max(p, 0.0), 100.0)
        k = (len(ordered) - 1) * (p / 100.0)
        lower = int(k)
        upper = lower + 1
        if upper >= len(ordered):
            return ordered[-1]
        return ordered[lower] + (ordered[upper] - ordered[lower]) * (k - lower)

    def _create_widgets(self):
        """Create the UI widgets: statistics and info panels on top, plots below."""
        main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
        main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        top_container = ttk.Frame(main_pane)
        main_pane.add(top_container, weight=1)

        # Configure the grid to split the top area into two equal halves.
        top_container.columnconfigure(0, weight=1, uniform="half")
        top_container.columnconfigure(1, weight=1, uniform="half")
        top_container.rowconfigure(0, weight=1)

        stats_frame = ttk.LabelFrame(top_container, text="Performance Statistics", padding=10)
        stats_frame.grid(row=0, column=0, sticky="nsew", padx=(0, 5))
        self._create_stats_table(stats_frame)

        self._create_info_panel(top_container)

        plots_frame = ttk.Frame(main_pane)
        main_pane.add(plots_frame, weight=4)
        self._create_plots(plots_frame)

    def _create_info_panel(self, parent):
        """Create an informational panel explaining the metrics."""
        info_frame = ttk.LabelFrame(parent, text="ℹ About Performance Analysis", padding=10)
        info_frame.grid(row=0, column=1, sticky="nsew", padx=(5, 0))

        # Simplified text to fit without scrolling
        info_text = (
            "Packet processing time analysis.\n"
            "📊 Components:\n"
            "• Parse: Decode SFP payload\n"
            "• Hub: Update SimulationStateHub\n"
            "• Archive: Save data to file\n"
            "• Listener: Broadcast to GUI\n"
            "• Clock: Sync timestamps\n"
            f"⚠ Spikes (>{self._threshold_text()}):\n"
            "Slowdowns from GC, disk I/O,\n"
            "or lock contention.\n"
            "🎯 Bottleneck:\n"
            "Slowest component in worst event."
        )

        info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, font=("Segoe UI", 9))
        info_label.pack(anchor=tk.W, fill=tk.BOTH, expand=True)

    def _create_stats_table(self, parent):
        """Create the statistics table and fill it from ``self.stats``."""
        tree_frame = ttk.Frame(parent)
        tree_frame.pack(fill=tk.BOTH, expand=True)

        scrollbar = ttk.Scrollbar(tree_frame)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        columns = ("Metric", "Value", "Details")
        self.stats_tree = ttk.Treeview(
            tree_frame,
            columns=columns,
            show="headings",
            height=12,
            yscrollcommand=scrollbar.set,
        )
        scrollbar.config(command=self.stats_tree.yview)

        self.stats_tree.heading("Metric", text="Metric")
        self.stats_tree.heading("Value", text="Value")
        self.stats_tree.heading("Details", text="Details")

        self.stats_tree.column("Metric", width=160, anchor='w')
        self.stats_tree.column("Value", width=150, anchor='e')
        self.stats_tree.column("Details", width=300, anchor='w')

        self.stats_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self._populate_stats_table()

    def _populate_stats_table(self):
        """
        Populate the statistics table with computed metrics.

        Inserts a single "No Data" row when ``self.stats`` is empty.
        """
        tree = self.stats_tree
        if not self.stats:
            tree.insert("", "end", values=("No Data", "N/A", ""))
            return

        def add_row(metric: str, value: str = "", details: str = "", header: bool = False) -> None:
            if header:
                tree.insert("", "end", values=(metric, value, details), tags=("header",))
            else:
                tree.insert("", "end", values=(metric, value, details))

        add_row("Total Samples", f"{self.stats['total_samples']:,}")
        add_row(
            f"Spikes (>{self._threshold_text()})",
            f"{self.stats['spike_count']:,}",
            f"{self.stats['spike_percentage']:.2f}% of packets",
        )

        add_row("")
        add_row("Total Processing Time", header=True)
        total = self.stats['total']
        add_row(" Mean", f"{total['mean']:.3f} ms")
        add_row(" Median", f"{total['median']:.3f} ms")
        add_row(" Std Dev", f"{total['stdev']:.3f} ms")
        add_row(" Min / Max", f"{total['min']:.3f} / {total['max']:.1f} ms")
        add_row(" 95th Percentile", f"{total['p95']:.3f} ms")
        add_row(" 99th Percentile", f"{total['p99']:.3f} ms")

        add_row("")
        add_row("Component Breakdown", header=True)
        for comp_name in self._COMPONENTS:
            if comp_name not in self.stats:
                continue
            comp = self.stats[comp_name]
            bottleneck = " ⚠ BOTTLENECK" if comp_name == self.stats['max_component'] else ""
            add_row(
                f" {comp_name.capitalize()}",
                f"{comp['mean']:.3f} ms",
                f"Max: {comp['max']:.1f} ms{bottleneck}",
            )

        tree.tag_configure("header", font=("Segoe UI", 9, "bold"))

    def _create_plots(self, parent):
        """Create the matplotlib figure, its Tk canvas, toolbar and threshold toggle."""
        self.fig = Figure(figsize=(10, 8), dpi=100)

        gs = self.fig.add_gridspec(2, 1, height_ratios=[2, 1], hspace=0.35, top=0.95)
        self.ax_timeseries = self.fig.add_subplot(gs[0, 0])
        self.ax_histogram = self.fig.add_subplot(gs[1, 0])

        canvas_frame = ttk.Frame(parent)
        canvas_frame.pack(fill=tk.BOTH, expand=True)

        toolbar_frame = ttk.Frame(canvas_frame)
        toolbar_frame.pack(side=tk.TOP, fill=tk.X)

        self.canvas = FigureCanvasTkAgg(self.fig, master=canvas_frame)
        toolbar = NavigationToolbar2Tk(self.canvas, toolbar_frame)

        # Add the checkbox to the toolbar
        threshold_check = ttk.Checkbutton(
            toolbar,
            text=f"Show {self._threshold_text()} Threshold",
            variable=self.show_threshold_var,
            command=self._populate_plots,  # Redraw plot on change
        )
        threshold_check.pack(side=tk.RIGHT, padx=5)

        toolbar.update()
        self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)

    def _populate_plots(self):
        """
        Populate the plots with performance data.

        Redraws the per-component time series (optionally with the spike
        threshold line) and the histogram of total processing times split
        into normal samples and spikes. With no timestamps loaded only the
        empty canvas is drawn.
        """
        if not self.timestamps:
            self.canvas.draw()
            return

        threshold = self.SPIKE_THRESHOLD_MS

        # Time series plot
        ts_ax = self.ax_timeseries
        ts_ax.clear()
        ts_ax.set_title("Packet Processing Time Over Simulation")
        ts_ax.set_xlabel("Time (s)")
        ts_ax.set_ylabel("Processing Time (ms)")

        ts_ax.plot(self.timestamps, self.hub_ms, label='Hub', lw=1.5, alpha=0.8)
        ts_ax.plot(self.timestamps, self.archive_ms, label='Archive', lw=1.5, alpha=0.8)
        ts_ax.plot(self.timestamps, self.listener_ms, label='Listener', lw=1.5, alpha=0.8)
        ts_ax.plot(self.timestamps, self.parse_ms, label='Parse', lw=1, alpha=0.7)
        ts_ax.plot(self.timestamps, self.clock_ms, label='Clock', lw=1, alpha=0.7)

        if self.show_threshold_var.get():
            ts_ax.axhline(
                y=threshold,
                color='r',
                linestyle='--',
                lw=1,
                alpha=0.5,
                label=f'{self._threshold_text()} Threshold',
            )

        ts_ax.legend(loc='upper right', fontsize=9)
        ts_ax.grid(True, alpha=0.3)

        # Let Matplotlib handle autoscaling. It will use a linear scale for small values
        # and naturally adjust to larger values if spikes are present.
        ts_ax.relim()
        ts_ax.autoscale_view(True, True, True)

        # Histogram
        hist_ax = self.ax_histogram
        hist_ax.clear()
        hist_ax.set_title("Processing Time Distribution")
        hist_ax.set_xlabel("Processing Time (ms)")
        hist_ax.set_ylabel("Frequency")

        normal_times = [t for t in self.total_ms if t <= threshold]
        spike_times = [t for t in self.total_ms if t > threshold]

        if normal_times:
            hist_ax.hist(normal_times, bins=50, alpha=0.7, label=f'Normal ({len(normal_times)})')
        if spike_times:
            hist_ax.hist(spike_times, bins=20, alpha=0.7, label=f'Spikes ({len(spike_times)})')

        if normal_times or spike_times:
            hist_ax.legend(loc='upper right', fontsize=9)
        hist_ax.grid(True, alpha=0.3)

        # tight_layout may emit a UserWarning about layout compatibility;
        # it is not actionable here, so silence it.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            self.fig.tight_layout()

        self.canvas.draw()