# target_simulator/gui/analysis_window.py """ A Toplevel window for displaying real-time performance analysis, including error statistics and plots. """ import tkinter as tk from tkinter import ttk, messagebox from typing import Optional, Dict import json import os from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer from target_simulator.analysis.simulation_state_hub import SimulationStateHub try: from matplotlib.figure import Figure from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg MATPLOTLIB_AVAILABLE = True except ImportError: MATPLOTLIB_AVAILABLE = False UPDATE_INTERVAL_MS = 1000 # Update analysis every second class AnalysisWindow(tk.Toplevel): """ A window that displays real-time analysis of tracking performance. """ def __init__(self, master, archive_filepath: str): super().__init__(master) self.title(f"Analysis for: {os.path.basename(archive_filepath)}") self.geometry("900x750") # State variables self.selected_target_id = tk.IntVar(value=0) self._active = True self._filtered_errors = None # Cache for spike-filtered errors used in statistics # Show loading window and process data in background self._show_loading_window(archive_filepath) def _load_data_and_setup(self, filepath: str): try: with open(filepath, "r", encoding="utf-8") as f: archive_data = json.load(f) except Exception as e: messagebox.showerror( "Loading Error", f"Could not load archive file.\n{e}", ) self.destroy() return # Extract estimated latency from metadata metadata = archive_data.get("metadata", {}) self.estimated_latency_ms = metadata.get("estimated_latency_ms") self.prediction_offset_ms = metadata.get("prediction_offset_ms") # Load latency samples (new format only: [[timestamp, latency_ms], ...]) latency_samples = metadata.get("latency_samples", []) if latency_samples and isinstance(latency_samples[0], list): self.latency_timestamps = [s[0] for s in latency_samples] self.latency_values_ms = [s[1] for s in latency_samples] else: # No valid latency data self.latency_timestamps = [] self.latency_values_ms = [] # Store performance samples for dedicated analysis window self.performance_samples = metadata.get("performance_samples", []) self.scenario_name = metadata.get("scenario_name", "Unknown") # Create a temporary hub with unlimited history size for analysis # The default history_size of 200 is too small for full simulation playback self._hub = SimulationStateHub(history_size=100000) results = archive_data.get("simulation_results", {}) for target_id_str, data in results.items(): target_id = int(target_id_str) for state in data.get("simulated", []): self._hub.add_simulated_state(target_id, state[0], tuple(state[1:])) for state in data.get("real", []): self._hub.add_real_state(target_id, state[0], tuple(state[1:])) # Create the analyzer with the populated hub self._analyzer = PerformanceAnalyzer(self._hub) def _show_loading_window(self, archive_filepath: str): """Show a loading dialog and load data asynchronously.""" # Create a modal loading dialog loading_dialog = tk.Toplevel(self) loading_dialog.title("Loading Analysis") loading_dialog.geometry("400x150") loading_dialog.transient(self) loading_dialog.grab_set() # Center the dialog loading_dialog.update_idletasks() x = self.winfo_x() + (self.winfo_width() // 2) - (loading_dialog.winfo_width() // 2) y = self.winfo_y() + (self.winfo_height() // 2) - (loading_dialog.winfo_height() // 2) loading_dialog.geometry(f"+{x}+{y}") # Add loading message and progress indicator ttk.Label( loading_dialog, text="Loading simulation data...", font=("Segoe UI", 11) ).pack(pady=(20, 10)) progress_label = ttk.Label( loading_dialog, text="Please wait", font=("Segoe UI", 9) ) progress_label.pack(pady=5) progress = ttk.Progressbar( loading_dialog, mode='indeterminate', length=300 ) progress.pack(pady=10) progress.start(10) # Schedule the actual loading to happen after the dialog is shown def load_and_display(): try: progress_label.config(text="Reading archive file...") self.update() # Load data self._load_data_and_setup(archive_filepath) progress_label.config(text="Creating widgets...") self.update() # Create widgets self._create_widgets() progress_label.config(text="Analyzing data...") self.update() # Populate analysis self._populate_analysis() # Close loading dialog loading_dialog.destroy() except Exception as e: loading_dialog.destroy() messagebox.showerror( "Analysis Error", f"Failed to load analysis:\n{e}", parent=self ) self.destroy() # Schedule loading after dialog is visible self.after(100, load_and_display) def _populate_analysis(self): """Esegue l'analisi e popola i widget una sola volta.""" self._update_target_selector() # Ora usa l'hub locale # Seleziona il primo target di default target_ids = self.target_selector["values"] if target_ids: self.selected_target_id.set(target_ids[0]) analysis_results = self._analyzer.analyze() sel_id = self.selected_target_id.get() if sel_id in analysis_results: self._update_stats_table(analysis_results[sel_id]) self._update_plot(sel_id) else: # Provide diagnostic information when analysis cannot be # produced for the selected target (common cause: no # overlapping timestamps between simulated and real samples). self._show_insufficient_data_info(sel_id) # Update the latency plot regardless of target selection self._update_latency_plot() def _create_widgets(self): main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL) main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # --- Top Frame for Stats Table --- stats_frame = ttk.LabelFrame(main_pane, text="Error Statistics (feet)") # Keep stats frame compact so the plot below has more space main_pane.add(stats_frame, weight=1) self._create_stats_widgets(stats_frame) # --- Bottom Frame for Plot --- plot_frame = ttk.LabelFrame(main_pane, text="Error Over Time (feet)") # Give the plot more vertical weight so it occupies most of the window main_pane.add(plot_frame, weight=4) self._create_plot_widgets(plot_frame) def _create_stats_widgets(self, parent): """Create the statistics widgets (table and explanatory legend). Args: parent (tk.Widget): Parent container where stats widgets will be placed. """ # Build a horizontal area: left = table, right = legend/explanations container = ttk.Frame(parent) container.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) left = ttk.Frame(container) left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) right = ttk.Frame(container) right.pack(side=tk.RIGHT, fill=tk.Y) top_bar = ttk.Frame(left) top_bar.pack(fill=tk.X, padx=0, pady=(0, 6)) ttk.Label(top_bar, text="Select Target ID:").pack(side=tk.LEFT) self.target_selector = ttk.Combobox( top_bar, textvariable=self.selected_target_id, state="readonly", width=5 ) self.target_selector.pack(side=tk.LEFT, padx=5) self.target_selector.bind("<>", self._on_target_select) # Frame per le metriche di sincronizzazione sync_frame = ttk.Frame(top_bar) sync_frame.pack(side=tk.LEFT, padx=(20, 0)) if self.estimated_latency_ms is not None: ttk.Label(sync_frame, text="Avg. Latency:").pack(side=tk.LEFT) ttk.Label( sync_frame, text=f"{self.estimated_latency_ms:.1f} ms", font=("Segoe UI", 9, "bold"), foreground="blue", ).pack(side=tk.LEFT, padx=4) if self.prediction_offset_ms is not None: ttk.Label(sync_frame, text="Prediction Offset:").pack( side=tk.LEFT, padx=(10, 0) ) ttk.Label( sync_frame, text=f"{self.prediction_offset_ms:.1f} ms", font=("Segoe UI", 9, "bold"), foreground="green", ).pack(side=tk.LEFT, padx=4) # Button to open performance analysis window if self.performance_samples: perf_button = ttk.Button( sync_frame, text="Performance Analysis...", command=self._open_performance_window ) perf_button.pack(side=tk.LEFT, padx=(20, 0)) # Table with metrics as columns and error types as rows columns = ("error_type", "mean", "std_dev", "rmse") self.stats_tree = ttk.Treeview(left, columns=columns, show="headings", height=4) self.stats_tree.heading("error_type", text="") self.stats_tree.heading("mean", text="Mean (ft)") self.stats_tree.heading("std_dev", text="Std Dev (ft)") self.stats_tree.heading("rmse", text="RMSE (ft)") self.stats_tree.column("error_type", width=100, anchor=tk.W) self.stats_tree.column("mean", anchor=tk.E, width=100) self.stats_tree.column("std_dev", anchor=tk.E, width=100) self.stats_tree.column("rmse", anchor=tk.E, width=100) self.stats_tree.pack(fill=tk.BOTH, expand=True) # Right side: explanatory legend box (compact) legend_title = ttk.Label( right, text="How to Interpret Results:", font=(None, 9, "bold") ) legend_title.pack(anchor=tk.NW, padx=(6, 6), pady=(4, 4)) explanation_text = ( "Formula: Error = Real Position - Simulated Position\n\n" "Sign of Error (e.g., on X axis):\n" "• Positive Error (+): Real target is at a larger X coordinate.\n" " - If moving R -> L (X increases): Real is AHEAD.\n" " - If moving L -> R (X decreases): Real is BEHIND.\n\n" "• Negative Error (-): Real target is at a smaller X coordinate.\n" " - If moving R -> L (X increases): Real is BEHIND.\n" " - If moving L -> R (X decreases): Real is AHEAD.\n\n" "Prediction Offset:\n" "A manual offset to compensate for server processing delay, " "aiming to minimize the Mean Error." ) try: ttk.Label( right, text=explanation_text, justify=tk.LEFT, wraplength=280, ).pack(anchor=tk.NW, padx=(6, 6)) except Exception: pass def _create_plot_widgets(self, parent): """Create and configure the matplotlib Figure and canvas used for plotting. Args: parent (tk.Widget): Parent container where the plot canvas will be packed. """ fig = Figure(figsize=(5, 6), dpi=100) # Use GridSpec for aligned subplots with shared x-axis alignment gs = fig.add_gridspec(2, 1, height_ratios=[2, 1], hspace=0.35, top=0.95) # Top subplot: Instantaneous Error self.ax = fig.add_subplot(gs[0, 0]) self.ax.set_title("Instantaneous Error") self.ax.set_xlabel("Time (s)") self.ax.set_ylabel("Error (ft)") (self.line_x,) = self.ax.plot([], [], lw=2, label="Error X") (self.line_y,) = self.ax.plot([], [], lw=2, label="Error Y") (self.line_z,) = self.ax.plot([], [], lw=2, label="Error Z") # Place legend inside the plot area try: self.ax.grid(True) # horizontal zero line for reference self.ax.axhline(0.0, color="black", lw=1, linestyle="--", alpha=0.8) self.ax.legend(loc="upper right", fontsize=9) except Exception: pass # Bottom subplot: Latency over time - SHARE X-AXIS with top plot for synchronized zoom self.ax_latency = fig.add_subplot(gs[1, 0], sharex=self.ax) self.ax_latency.set_title("Latency Evolution") self.ax_latency.set_xlabel( "Time (s)" ) # Will be updated if no timestamps available self.ax_latency.set_ylabel("Latency (ms)") (self.line_latency,) = self.ax_latency.plot( [], [], lw=2, color="orange", label="Latency" ) try: self.ax_latency.grid(True) self.ax_latency.legend(loc="upper right", fontsize=9) except Exception: pass fig.tight_layout() # Create frame for toolbar and canvas plot_container = ttk.Frame(parent) plot_container.pack(fill=tk.BOTH, expand=True) # Add matplotlib navigation toolbar for zoom/pan functionality at the top from matplotlib.backends.backend_tkagg import NavigationToolbar2Tk toolbar_frame = ttk.Frame(plot_container) toolbar_frame.pack(side=tk.TOP, fill=tk.X) self.canvas = FigureCanvasTkAgg(fig, master=plot_container) toolbar = NavigationToolbar2Tk(self.canvas, toolbar_frame) toolbar.update() # Pack canvas after toolbar so it's below self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True) self.canvas.draw() def _update_target_selector(self): """Refresh the target selector Combobox with IDs from the state hub. Only updates the combobox when the hub reports IDs; if no IDs are available the existing values are preserved to avoid flicker in the UI. """ # Only update the combobox values when the hub reports target ids. # This prevents the selector from being emptied when the hub is cleared # at the end of a simulation and allows the analysis window to continue # showing the last results until the next simulation starts. try: target_ids = sorted(self._hub.get_all_target_ids()) except Exception: target_ids = [] if target_ids: self.target_selector["values"] = target_ids if self.selected_target_id.get() not in target_ids: # If previous selection isn't available, pick the first available self.selected_target_id.set(target_ids[0]) else: # Do not overwrite existing combobox values when no targets are present. # This preserves the user's last view after a simulation ends. pass def _update_stats_table(self, results: Dict): """Populate the stats Treeview with aggregated error metrics. Uses filtered error data (excluding spikes) if available. Args: results (Dict): A mapping containing 'x', 'y', 'z' metrics and numerical aggregates such as mean, std_dev and rmse. """ self.stats_tree.delete(*self.stats_tree.get_children()) # Use filtered errors if available, otherwise fall back to analyzer results if hasattr(self, '_filtered_errors') and self._filtered_errors: import math # Compute statistics from filtered data for axis in ["x", "y", "z"]: errors = self._filtered_errors[axis] if errors: n = len(errors) mean = sum(errors) / n variance = sum((x - mean) ** 2 for x in errors) / n std_dev = math.sqrt(variance) rmse = math.sqrt(sum(x**2 for x in errors) / n) self.stats_tree.insert( "", "end", values=( f"Error {axis.upper()}", f"{mean:.3f}", f"{std_dev:.3f}", f"{rmse:.3f}", ), ) else: # No data after filtering self.stats_tree.insert( "", "end", values=( f"Error {axis.upper()}", "N/A", "N/A", "N/A", ), ) else: # Fallback to analyzer results (unfiltered) for axis in ["x", "y", "z"]: self.stats_tree.insert( "", "end", values=( f"Error {axis.upper()}", f"{results[axis]['mean']:.3f}", f"{results[axis]['std_dev']:.3f}", f"{results[axis]['rmse']:.3f}", ), ) # Add latency row if available if self.estimated_latency_ms is not None: # Calculate latency stats from samples if available if self.latency_values_ms: import statistics lat_mean = statistics.mean(self.latency_values_ms) lat_std = ( statistics.stdev(self.latency_values_ms) if len(self.latency_values_ms) > 1 else 0.0 ) lat_min = min(self.latency_values_ms) lat_max = max(self.latency_values_ms) self.stats_tree.insert( "", "end", values=( "Latency (ms)", f"{lat_mean:.2f}", f"{lat_std:.2f}", f"{lat_min:.2f} - {lat_max:.2f}", ), ) else: # Fallback to estimated latency only self.stats_tree.insert( "", "end", values=( "Latency (ms)", f"{self.estimated_latency_ms:.2f}", "N/A", "N/A", ), ) def _update_plot(self, target_id: int): """Update the matplotlib plot for the given target using hub history. Also computes and stores filtered errors (excluding outliers) for statistics. Args: target_id (int): The identifier of the target to plot errors for. """ history = self._hub.get_target_history(target_id) if not history or not history["real"] or len(history["simulated"]) < 2: self.line_x.set_data([], []) self.line_y.set_data([], []) self.line_z.set_data([], []) self.ax.relim() self.ax.autoscale_view() self.canvas.draw_idle() # Clear filtered data cache self._filtered_errors = None return times, errors_x, errors_y, errors_z = [], [], [], [] sim_hist = sorted(history["simulated"]) for real_state in history["real"]: real_ts, real_x, real_y, real_z = real_state p1, p2 = self._analyzer._find_bracketing_points(real_ts, sim_hist) if p1 and p2: interp_state = self._analyzer._interpolate(real_ts, p1, p2) _ts, interp_x, interp_y, interp_z = interp_state times.append(real_ts) errors_x.append(real_x - interp_x) errors_y.append(real_y - interp_y) errors_z.append(real_z - interp_z) if not times: self.line_x.set_data([], []) self.line_y.set_data([], []) self.line_z.set_data([], []) self.ax.relim() self.ax.autoscale_view() self.canvas.draw_idle() self._filtered_errors = None return # Filter initial transient/acquisition spikes # Use a threshold based on the median of stable data (after initial period) filtered_times, filtered_x, filtered_y, filtered_z = [], [], [], [] outlier_times, outlier_x, outlier_y, outlier_z = [], [], [], [] # Take a sample window after initial seconds to compute typical error magnitude min_time = min(times) sample_window_start = min_time + 5.0 # Skip first 5 seconds sample_window_end = min_time + 15.0 # Sample from 5s to 15s sample_errors = [] for i, t in enumerate(times): if sample_window_start <= t <= sample_window_end: err_magnitude = (errors_x[i]**2 + errors_y[i]**2 + errors_z[i]**2) ** 0.5 sample_errors.append(err_magnitude) # Compute threshold: median of sample + 10x (very permissive for normal errors) if sample_errors: import statistics median_err = statistics.median(sample_errors) # Use 20x median as threshold to catch only extreme outliers threshold = max(median_err * 20, 500.0) # At least 500 ft threshold else: # Fallback if no sample data available threshold = 1000.0 # Classify points as normal or outliers for i, t in enumerate(times): err_magnitude = (errors_x[i]**2 + errors_y[i]**2 + errors_z[i]**2) ** 0.5 if err_magnitude > threshold: outlier_times.append(t) outlier_x.append(errors_x[i]) outlier_y.append(errors_y[i]) outlier_z.append(errors_z[i]) else: filtered_times.append(t) filtered_x.append(errors_x[i]) filtered_y.append(errors_y[i]) filtered_z.append(errors_z[i]) # Store filtered errors for statistics computation self._filtered_errors = { 'x': filtered_x, 'y': filtered_y, 'z': filtered_z } # Plot filtered (normal) data self.line_x.set_data(filtered_times, filtered_x) self.line_y.set_data(filtered_times, filtered_y) self.line_z.set_data(filtered_times, filtered_z) # Add annotation if outliers were detected # Clear previous annotations for txt in getattr(self.ax, '_spike_annotations', []): txt.remove() self.ax._spike_annotations = [] if outlier_times: # Add text annotation about filtered spikes outlier_count = len(outlier_times) max_outlier_mag = max((outlier_x[i]**2 + outlier_y[i]**2 + outlier_z[i]**2)**0.5 for i in range(len(outlier_times))) annotation_text = (f"⚠ {outlier_count} acquisition spike(s) filtered\n" f"(max error: {max_outlier_mag:.0f} ft at t={outlier_times[0]:.1f}s)\n" f"Spikes excluded from statistics") txt = self.ax.text(0.02, 0.98, annotation_text, transform=self.ax.transAxes, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.7), fontsize=9) self.ax._spike_annotations = [txt] self.ax.relim() self.ax.autoscale_view() self.canvas.draw_idle() def _update_latency_plot(self): """Update the latency subplot with the latency samples from the archive. Plots latency measurements over time to show how latency evolved during the simulation, aligned with the error plot above. """ if not self.latency_values_ms or not self.latency_timestamps: # No latency data or invalid format - clear the plot self.line_latency.set_data([], []) self.ax_latency.relim() self.ax_latency.autoscale_view() self.canvas.draw_idle() return # Plot latencies - they are already filtered to simulation time range self.line_latency.set_data(self.latency_timestamps, self.latency_values_ms) self.ax_latency.relim() self.ax_latency.autoscale_view() self.canvas.draw_idle() def _update_performance_plot(self): """Update the performance subplot with processing time breakdown. Plots packet processing times to identify bottlenecks. """ if not self.ax_perf or not self.perf_timestamps: return # Plot each component self.line_perf_hub.set_data(self.perf_timestamps, self.perf_hub_ms) self.line_perf_archive.set_data(self.perf_timestamps, self.perf_archive_ms) self.line_perf_listener.set_data(self.perf_timestamps, self.perf_listener_ms) self.line_perf_parse.set_data(self.perf_timestamps, self.perf_parse_ms) self.line_perf_clock.set_data(self.perf_timestamps, self.perf_clock_ms) self.ax_perf.relim() self.ax_perf.autoscale_view() self.canvas.draw_idle() def _clear_views(self): """Clear statistics and plots, leaving the UI in an empty state. This is used as a safe fallback when no data is available or when an error occurs while generating analysis results. """ self.stats_tree.delete(*self.stats_tree.get_children()) self.line_x.set_data([], []) self.line_y.set_data([], []) self.line_z.set_data([], []) self.line_latency.set_data([], []) self.ax.relim() self.ax.autoscale_view() self.ax_latency.relim() self.ax_latency.autoscale_view() self.canvas.draw_idle() def _show_insufficient_data_info(self, target_id: int): """Display helpful information in the stats table when a target cannot be analyzed (for example because simulated and real time ranges do not overlap). This avoids an empty UI and gives the user actionable context. """ try: # Clear previous contents self.stats_tree.delete(*self.stats_tree.get_children()) history = self._hub.get_target_history(target_id) if history is None: self.stats_tree.insert( "", "end", values=("Info", "Target not found", "", "") ) self._clear_views() return sim_times = [s[0] for s in history.get("simulated", [])] real_times = [r[0] for r in history.get("real", [])] sim_count = len(sim_times) real_count = len(real_times) sim_range = (min(sim_times), max(sim_times)) if sim_times else (None, None) real_range = ( (min(real_times), max(real_times)) if real_times else (None, None) ) # populate the small table with human-readable diagnostic rows self.stats_tree.insert( "", "end", values=("Info", f"Target {target_id}", "", ""), ) self.stats_tree.insert( "", "end", values=("Sim samples", str(sim_count), "", ""), ) self.stats_tree.insert( "", "end", values=("Sim time range", f"{sim_range[0]} -> {sim_range[1]}", "", ""), ) self.stats_tree.insert( "", "end", values=("Real samples", str(real_count), "", ""), ) self.stats_tree.insert( "", "end", values=( "Real time range", f"{real_range[0]} -> {real_range[1]}", "", "", ), ) # keep plot cleared self.line_x.set_data([], []) self.line_y.set_data([], []) self.line_z.set_data([], []) self.ax.relim() self.ax.autoscale_view() self.canvas.draw_idle() except Exception: # Fail silently to avoid breaking the analysis window; show # the cleared view as a fallback. self._clear_views() def _on_close(self): """Handle the window close event by marking the window inactive and destroying it. """ self._active = False self.destroy() def _open_performance_window(self): """Open the dedicated performance analysis window.""" try: from target_simulator.gui.performance_analysis_window import PerformanceAnalysisWindow perf_window = PerformanceAnalysisWindow( parent=self, performance_samples=self.performance_samples, scenario_name=self.scenario_name ) except Exception as e: messagebox.showerror( "Performance Analysis Error", f"Failed to open performance analysis:\n{e}", parent=self ) def _on_target_select(self, event=None): """Handle combobox selection changes and update stats/plot.""" try: sel = self.selected_target_id.get() analysis_results = self._analyzer.analyze() if sel in analysis_results: self._update_stats_table(analysis_results[sel]) self._update_plot(sel) else: self._clear_views() # Latency plot doesn't depend on target selection self._update_latency_plot() except Exception: pass