# target_simulator/gui/analysis_window.py
"""
A Toplevel window for displaying performance analysis loaded from a
simulation archive file, including error statistics and plots.
"""
import json
import logging
import os
import tkinter as tk
from tkinter import ttk, messagebox
from typing import Optional, Dict

from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
from target_simulator.analysis.simulation_state_hub import SimulationStateHub

try:
    from matplotlib.figure import Figure
    from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg

    MATPLOTLIB_AVAILABLE = True
except ImportError:
    MATPLOTLIB_AVAILABLE = False

logger = logging.getLogger(__name__)

UPDATE_INTERVAL_MS = 1000  # Update analysis every second


class AnalysisWindow(tk.Toplevel):
    """
    A window that displays an analysis of tracking performance.

    The data is read once from a JSON archive file, loaded into a private
    SimulationStateHub and analyzed with a PerformanceAnalyzer. The window
    shows a table of error statistics and a plot of the error over time for
    the target selected in the combobox.
    """

    def __init__(self, master, archive_filepath: str):
        """Build the window and populate it from ``archive_filepath``.

        Args:
            master: Parent Tk widget.
            archive_filepath: Path of the JSON archive to analyze.

        If the archive cannot be loaded, an error dialog is shown, the
        window is destroyed and no widgets are created.
        """
        super().__init__(master)
        self.title(f"Analysis for: {os.path.basename(archive_filepath)}")
        self.geometry("800x600")

        # State variables
        self.selected_target_id = tk.IntVar(value=0)
        self._active = True

        # Defaults, so that every attribute exists even when loading fails.
        self.estimated_latency_ms: Optional[float] = None
        self.prediction_offset_ms: Optional[float] = None
        self._hub = None
        self._analyzer = None

        # Load the data and initialize the analyzer.
        self._load_data_and_setup(archive_filepath)
        if self._analyzer is None:
            # Loading failed: the window has already been destroyed, so no
            # widget may be created on it.
            return

        # Make sure closing the window goes through our handler.
        self.protocol("WM_DELETE_WINDOW", self._on_close)

        self._create_widgets()

        # There is no update loop: the analysis is populated a single time.
        self._populate_analysis()

    def _load_data_and_setup(self, filepath: str):
        """Read the archive and build the local hub and analyzer.

        On success ``self._hub``, ``self._analyzer``,
        ``self.estimated_latency_ms`` and ``self.prediction_offset_ms`` are
        set. On any failure (unreadable file, invalid JSON, unexpected
        structure) an error dialog is shown, the window is destroyed and
        ``self._analyzer`` is left as ``None``.

        Args:
            filepath: Path of the JSON archive file.
        """
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                archive_data = json.load(f)

            if not isinstance(archive_data, dict):
                raise ValueError("archive root must be a JSON object")

            # Extract the synchronization metrics from the metadata.
            metadata = archive_data.get("metadata") or {}
            estimated_latency_ms = metadata.get("estimated_latency_ms")
            prediction_offset_ms = metadata.get("prediction_offset_ms")

            # Create a temporary hub and populate it with historical data.
            hub = SimulationStateHub()
            results = archive_data.get("simulation_results") or {}
            for target_id_str, data in results.items():
                target_id = int(target_id_str)
                # Each sample is a sequence: first item is passed as the
                # timestamp argument, the remaining items as a tuple.
                for state in data.get("simulated", []):
                    hub.add_simulated_state(target_id, state[0], tuple(state[1:]))
                for state in data.get("real", []):
                    hub.add_real_state(target_id, state[0], tuple(state[1:]))

            # Create the analyzer with the populated hub.
            analyzer = PerformanceAnalyzer(hub)
        except Exception as e:
            # UI boundary: report any load problem to the user instead of
            # letting it escape from the constructor.
            logger.exception("Could not load archive file %r", filepath)
            messagebox.showerror(
                "Loading Error",
                f"Could not load archive file.\n{e}",
            )
            self._active = False
            self.destroy()
            return

        # Publish the state only once everything has loaded successfully.
        self.estimated_latency_ms = estimated_latency_ms
        self.prediction_offset_ms = prediction_offset_ms
        self._hub = hub
        self._analyzer = analyzer

    def _populate_analysis(self):
        """Run the analysis and populate the widgets a single time."""
        self._update_target_selector()  # Uses the local hub.

        # Select the first target by default. The combobox may hand the
        # values back as strings, so convert before storing in the IntVar.
        target_ids = self.target_selector["values"]
        if target_ids:
            self.selected_target_id.set(int(target_ids[0]))

        self._refresh_selected_target()

    def _refresh_selected_target(self):
        """Analyze and display the currently selected target.

        When the analyzer returns no result for the selected target,
        diagnostic information is shown instead of statistics.
        """
        sel_id = self.selected_target_id.get()
        analysis_results = self._analyzer.analyze()

        if sel_id in analysis_results:
            self._update_stats_table(analysis_results[sel_id])
            self._update_plot(sel_id)
        else:
            # Provide diagnostic information when analysis cannot be
            # produced for the selected target (common cause: no
            # overlapping timestamps between simulated and real samples).
            self._show_insufficient_data_info(sel_id)

    def _create_widgets(self):
        """Create the main layout: statistics on top, plot below."""
        main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
        main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # --- Top Frame for Stats Table ---
        stats_frame = ttk.LabelFrame(main_pane, text="Error Statistics (feet)")
        # Keep stats frame compact so the plot below has more space
        main_pane.add(stats_frame, weight=1)
        self._create_stats_widgets(stats_frame)

        # --- Bottom Frame for Plot ---
        plot_frame = ttk.LabelFrame(main_pane, text="Error Over Time (feet)")
        # Give the plot more vertical weight so it occupies most of the window
        main_pane.add(plot_frame, weight=4)
        self._create_plot_widgets(plot_frame)

    def _create_stats_widgets(self, parent):
        """Create the statistics widgets (table and explanatory legend).

        Args:
            parent (tk.Widget): Parent container where stats widgets will
                be placed.
        """
        # Build a horizontal area: left = table, right = legend/explanations
        container = ttk.Frame(parent)
        container.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        left = ttk.Frame(container)
        left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        right = ttk.Frame(container)
        right.pack(side=tk.RIGHT, fill=tk.Y)

        top_bar = ttk.Frame(left)
        top_bar.pack(fill=tk.X, padx=0, pady=(0, 6))

        ttk.Label(top_bar, text="Select Target ID:").pack(side=tk.LEFT)
        self.target_selector = ttk.Combobox(
            top_bar, textvariable=self.selected_target_id, state="readonly", width=5
        )
        self.target_selector.pack(side=tk.LEFT, padx=5)
        # Virtual event generated by ttk.Combobox when the user picks an item.
        self.target_selector.bind("<<ComboboxSelected>>", self._on_target_select)

        # Frame for the synchronization metrics.
        sync_frame = ttk.Frame(top_bar)
        sync_frame.pack(side=tk.LEFT, padx=(20, 0))

        if self.estimated_latency_ms is not None:
            ttk.Label(sync_frame, text="Avg. Latency:").pack(side=tk.LEFT)
            ttk.Label(
                sync_frame,
                text=f"{self.estimated_latency_ms:.1f} ms",
                font=("Segoe UI", 9, "bold"),
                foreground="blue",
            ).pack(side=tk.LEFT, padx=4)

        if self.prediction_offset_ms is not None:
            ttk.Label(sync_frame, text="Prediction Offset:").pack(
                side=tk.LEFT, padx=(10, 0)
            )
            ttk.Label(
                sync_frame,
                text=f"{self.prediction_offset_ms:.1f} ms",
                font=("Segoe UI", 9, "bold"),
                foreground="green",
            ).pack(side=tk.LEFT, padx=4)

        columns = ("metric", "x_error", "y_error", "z_error")
        self.stats_tree = ttk.Treeview(left, columns=columns, show="headings")

        self.stats_tree.heading("metric", text="Metric")
        self.stats_tree.heading("x_error", text="Error X")
        self.stats_tree.heading("y_error", text="Error Y")
        self.stats_tree.heading("z_error", text="Error Z")

        self.stats_tree.column("metric", width=120, anchor=tk.W)
        self.stats_tree.column("x_error", anchor=tk.E, width=120)
        self.stats_tree.column("y_error", anchor=tk.E, width=120)
        self.stats_tree.column("z_error", anchor=tk.E, width=120)

        self.stats_tree.pack(fill=tk.BOTH, expand=True)

        # Right side: explanatory legend box (compact)
        legend_title = ttk.Label(
            right, text="How to Interpret Results:", font=(None, 9, "bold")
        )
        legend_title.pack(anchor=tk.NW, padx=(6, 6), pady=(4, 4))

        explanation_text = (
            "Formula: Error = Real Position - Simulated Position\n\n"
            "Sign of Error (e.g., on X axis):\n"
            "• Positive Error (+): Real target is at a larger X coordinate.\n"
            " - If moving R -> L (X increases): Real is AHEAD.\n"
            " - If moving L -> R (X decreases): Real is BEHIND.\n\n"
            "• Negative Error (-): Real target is at a smaller X coordinate.\n"
            " - If moving R -> L (X increases): Real is BEHIND.\n"
            " - If moving L -> R (X decreases): Real is AHEAD.\n\n"
            "Prediction Offset:\n"
            "A manual offset to compensate for server processing delay, "
            "aiming to minimize the Mean Error."
        )
        try:
            ttk.Label(
                right,
                text=explanation_text,
                justify=tk.LEFT,
                wraplength=280,
            ).pack(anchor=tk.NW, padx=(6, 6))
        except Exception:
            # The legend is purely informative; the window stays usable
            # without it, so only record the problem.
            logger.warning("Could not create the explanatory legend", exc_info=True)

    def _create_plot_widgets(self, parent):
        """Create and configure the matplotlib Figure and canvas used for plotting.

        When matplotlib is not installed a notice is shown instead and the
        plot attributes (``ax``, ``canvas``, ``line_x``, ``line_y``,
        ``line_z``) are set to ``None`` so plot updates become no-ops.

        Args:
            parent (tk.Widget): Parent container where the plot canvas will
                be packed.
        """
        if not MATPLOTLIB_AVAILABLE:
            self.ax = None
            self.canvas = None
            self.line_x = self.line_y = self.line_z = None
            ttk.Label(
                parent,
                text="Matplotlib is not installed; plots are unavailable.",
            ).pack(anchor=tk.NW, padx=6, pady=6)
            return

        fig = Figure(figsize=(5, 3), dpi=100)
        self.ax = fig.add_subplot(111)

        self.ax.set_title("Instantaneous Error")
        self.ax.set_xlabel("Time (s)")
        self.ax.set_ylabel("Error (ft)")

        (self.line_x,) = self.ax.plot([], [], lw=2, label="Error X")
        (self.line_y,) = self.ax.plot([], [], lw=2, label="Error Y")
        (self.line_z,) = self.ax.plot([], [], lw=2, label="Error Z")

        try:
            self.ax.grid(True)
            # horizontal zero line for reference
            self.ax.axhline(0.0, color="black", lw=1, linestyle="--", alpha=0.8)
            # Place legend outside the axes to keep plot area clear
            self.ax.legend(loc="upper left", bbox_to_anchor=(1.02, 1), fontsize=9)
        except Exception:
            # Decorations are optional; keep the bare plot if they fail.
            logger.warning("Could not decorate the error plot", exc_info=True)

        fig.tight_layout()

        self.canvas = FigureCanvasTkAgg(fig, master=parent)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    def _set_plot_data(self, times, errors_x, errors_y, errors_z):
        """Replace the data of the three error lines and redraw the plot.

        Does nothing when the plot could not be created (no matplotlib).

        Args:
            times: X values shared by the three lines.
            errors_x: Y values for the "Error X" line.
            errors_y: Y values for the "Error Y" line.
            errors_z: Y values for the "Error Z" line.
        """
        if self.canvas is None:
            return

        self.line_x.set_data(times, errors_x)
        self.line_y.set_data(times, errors_y)
        self.line_z.set_data(times, errors_z)

        # Recompute the axis limits for the new data before redrawing.
        self.ax.relim()
        self.ax.autoscale_view()
        self.canvas.draw_idle()

    def _reset_plot(self):
        """Remove all data from the plot lines and redraw."""
        self._set_plot_data([], [], [], [])

    def _update_target_selector(self):
        """Refresh the target selector Combobox with IDs from the state hub.

        The combobox is only updated when the hub reports IDs; if no IDs
        are available the existing values are left untouched.
        """
        try:
            target_ids = sorted(self._hub.get_all_target_ids())
        except Exception:
            logger.warning("Could not read target IDs from the hub", exc_info=True)
            target_ids = []

        if not target_ids:
            # Do not overwrite existing combobox values when no targets
            # are present.
            return

        self.target_selector["values"] = target_ids
        if self.selected_target_id.get() not in target_ids:
            # If previous selection isn't available, pick the first available
            self.selected_target_id.set(target_ids[0])

    def _update_stats_table(self, results: Dict):
        """Populate the stats Treeview with aggregated error metrics.

        Args:
            results (Dict): A mapping with keys 'x', 'y' and 'z', each
                mapping 'mean', 'std_dev' and 'rmse' to a number.
        """
        self.stats_tree.delete(*self.stats_tree.get_children())

        # One row per metric, formatted with three decimals per axis.
        for metric in ("mean", "std_dev", "rmse"):
            self.stats_tree.insert(
                "",
                "end",
                values=(
                    metric.replace("_", " ").title(),
                    f"{results['x'][metric]:.3f}",
                    f"{results['y'][metric]:.3f}",
                    f"{results['z'][metric]:.3f}",
                ),
            )

    def _update_plot(self, target_id: int):
        """Update the matplotlib plot for the given target using hub history.

        For every real sample the simulated state is interpolated at the
        same timestamp (using the analyzer helpers) and the per-axis
        difference ``real - simulated`` is plotted against time.

        Args:
            target_id (int): The identifier of the target to plot errors for.
        """
        if self.canvas is None:
            # No plot available (matplotlib missing); skip the computation.
            return

        history = self._hub.get_target_history(target_id)
        # At least one real sample and two simulated samples are required
        # to interpolate anything.
        if not history or not history["real"] or len(history["simulated"]) < 2:
            self._reset_plot()
            return

        times, errors_x, errors_y, errors_z = [], [], [], []
        sim_hist = sorted(history["simulated"])

        for real_state in history["real"]:
            real_ts, real_x, real_y, real_z = real_state
            p1, p2 = self._analyzer._find_bracketing_points(real_ts, sim_hist)
            if p1 and p2:
                interp_state = self._analyzer._interpolate(real_ts, p1, p2)
                _ts, interp_x, interp_y, interp_z = interp_state

                times.append(real_ts)
                errors_x.append(real_x - interp_x)
                errors_y.append(real_y - interp_y)
                errors_z.append(real_z - interp_z)

        self._set_plot_data(times, errors_x, errors_y, errors_z)

    def _clear_views(self):
        """Clear statistics and plots, leaving the UI in an empty state.

        This is used as a safe fallback when no data is available or when
        an error occurs while generating analysis results.
        """
        self.stats_tree.delete(*self.stats_tree.get_children())
        self._reset_plot()

    def _show_insufficient_data_info(self, target_id: int):
        """Display diagnostic information in the stats table when a target
        cannot be analyzed (for example because simulated and real time
        ranges do not overlap).

        The table shows the number of samples and the time range of the
        simulated and real histories; the plot is cleared.

        Args:
            target_id (int): The identifier of the target to describe.
        """
        try:
            # Clear previous contents
            self.stats_tree.delete(*self.stats_tree.get_children())

            history = self._hub.get_target_history(target_id)
            if history is None:
                self.stats_tree.insert(
                    "", "end", values=("Info", "Target not found", "", "")
                )
                # Only reset the plot: clearing the table as well would
                # erase the message that was just inserted.
                self._reset_plot()
                return

            sim_times = [s[0] for s in history.get("simulated", [])]
            real_times = [r[0] for r in history.get("real", [])]

            sim_range = (min(sim_times), max(sim_times)) if sim_times else (None, None)
            real_range = (
                (min(real_times), max(real_times)) if real_times else (None, None)
            )

            # populate the small table with human-readable diagnostic rows
            rows = (
                ("Info", f"Target {target_id}", "", ""),
                ("Sim samples", str(len(sim_times)), "", ""),
                ("Sim time range", f"{sim_range[0]} -> {sim_range[1]}", "", ""),
                ("Real samples", str(len(real_times)), "", ""),
                ("Real time range", f"{real_range[0]} -> {real_range[1]}", "", ""),
            )
            for row in rows:
                self.stats_tree.insert("", "end", values=row)

            # keep plot cleared
            self._reset_plot()
        except Exception:
            # Do not break the analysis window; record the problem and show
            # the cleared view as a fallback.
            logger.warning(
                "Could not build diagnostics for target %s", target_id, exc_info=True
            )
            self._clear_views()

    def _on_close(self):
        """Handle the window close event by marking the window inactive
        and destroying it.
        """
        self._active = False
        self.destroy()

    def _on_target_select(self, event=None):
        """Handle combobox selection changes and update stats/plot.

        Args:
            event: Tk event object passed by the binding (unused).
        """
        try:
            self._refresh_selected_target()
        except Exception:
            # Keep the window responsive: log the failure and fall back to
            # an empty view instead of silently ignoring it.
            logger.exception("Could not update the analysis for the selected target")
            self._clear_views()