# target_simulator/gui/analysis_window.py """ A Toplevel window for displaying real-time performance analysis, including error statistics and plots. """ import tkinter as tk from tkinter import ttk, messagebox from typing import Optional, Dict from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer from target_simulator.analysis.simulation_state_hub import SimulationStateHub try: from matplotlib.figure import Figure from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg MATPLOTLIB_AVAILABLE = True except ImportError: MATPLOTLIB_AVAILABLE = False UPDATE_INTERVAL_MS = 1000 # Update analysis every second class AnalysisWindow(tk.Toplevel): """ A window that displays real-time analysis of tracking performance. """ def __init__(self, master, analyzer: PerformanceAnalyzer, hub: SimulationStateHub): super().__init__(master) self.title("Performance Analysis") self.geometry("800x600") self.transient(master) if not MATPLOTLIB_AVAILABLE: messagebox.showerror( "Dependency Missing", "Matplotlib is required for the analysis window. Please install it (`pip install matplotlib`).", parent=self, ) self.destroy() return self._analyzer = analyzer self._hub = hub self._active = True # Cache last displayed state so we can preserve it between simulations. # If no new analysis data is available (e.g. simulation stopped), we # will keep showing these values until a new simulation populates the hub. self._last_displayed_target_id = None self._has_last_values = False self.selected_target_id = tk.IntVar() self._create_widgets() self._update_loop() self.protocol("WM_DELETE_WINDOW", self._on_close) def _create_widgets(self): main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL) main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # --- Top Frame for Stats Table --- stats_frame = ttk.LabelFrame(main_pane, text="Error Statistics (feet)") main_pane.add(stats_frame, weight=1) self._create_stats_widgets(stats_frame) # --- Bottom Frame for Plot --- plot_frame = ttk.LabelFrame(main_pane, text="Error Over Time (feet)") main_pane.add(plot_frame, weight=3) self._create_plot_widgets(plot_frame) def _create_stats_widgets(self, parent): top_bar = ttk.Frame(parent) top_bar.pack(fill=tk.X, padx=5, pady=5) ttk.Label(top_bar, text="Select Target ID:").pack(side=tk.LEFT) self.target_selector = ttk.Combobox( top_bar, textvariable=self.selected_target_id, state="readonly", width=5 ) self.target_selector.pack(side=tk.LEFT, padx=5) self.target_selector.bind("<>", self._on_target_select) columns = ("metric", "x_error", "y_error", "z_error") self.stats_tree = ttk.Treeview(parent, columns=columns, show="headings") self.stats_tree.heading("metric", text="Metric") self.stats_tree.heading("x_error", text="Error X") self.stats_tree.heading("y_error", text="Error Y") self.stats_tree.heading("z_error", text="Error Z") self.stats_tree.column("metric", width=120, anchor=tk.W) self.stats_tree.column("x_error", anchor=tk.E) self.stats_tree.column("y_error", anchor=tk.E) self.stats_tree.column("z_error", anchor=tk.E) self.stats_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) def _create_plot_widgets(self, parent): fig = Figure(figsize=(5, 3), dpi=100) self.ax = fig.add_subplot(111) self.ax.set_title("Instantaneous Error") self.ax.set_xlabel("Time (s)") self.ax.set_ylabel("Error (ft)") (self.line_x,) = self.ax.plot([], [], lw=2, label="Error X") (self.line_y,) = self.ax.plot([], [], lw=2, label="Error Y") (self.line_z,) = self.ax.plot([], [], lw=2, label="Error Z") self.ax.legend() self.ax.grid(True) fig.tight_layout() self.canvas = FigureCanvasTkAgg(fig, master=parent) self.canvas.draw() self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) def _update_loop(self): if not self._active: return self._update_target_selector() # If the simulation is not running, do not process new analysis data. # Keep showing the last cached values (if any) until a simulation is # actively running again. try: running = False if hasattr(self.master, "is_simulation_running"): try: running = bool(self.master.is_simulation_running.get()) except Exception: running = False except Exception: running = False sel = self.selected_target_id.get() if not running: # Simulation not running: do not call analyze or update plots. # Preserve last displayed values to avoid UI flicker/clearing. # However, if there are no cached values, show a message in the # stats table to indicate data will appear when simulation runs. if not self._has_last_values: self.stats_tree.delete(*self.stats_tree.get_children()) self.stats_tree.insert( "", "end", values=("No Data", "-", "-", "-"), ) self.after(UPDATE_INTERVAL_MS, self._update_loop) return # When running, perform live analysis and update views. analysis_results = self._analyzer.analyze() if sel in analysis_results: # We have fresh analysis for the selected target: update and cache self._update_stats_table(analysis_results[sel]) self._update_plot(sel) self._last_displayed_target_id = sel self._has_last_values = True else: # No fresh analysis for selected target. Decide whether to clear or # keep the last displayed values. If the hub currently reports any # active real targets, it means data should be cleared for missing # selection; otherwise preserve. try: if self._hub and hasattr(self._hub, "has_active_real_targets") and self._hub.has_active_real_targets(): self._clear_views() self._has_last_values = False else: pass except Exception: self._clear_views() self._has_last_values = False self.after(UPDATE_INTERVAL_MS, self._update_loop) def _update_target_selector(self): # Only update the combobox values when the hub reports target ids. # This prevents the selector from being emptied when the hub is cleared # at the end of a simulation and allows the analysis window to continue # showing the last results until the next simulation starts. try: target_ids = sorted(self._hub.get_all_target_ids()) except Exception: target_ids = [] if target_ids: self.target_selector["values"] = target_ids if self.selected_target_id.get() not in target_ids: # If previous selection isn't available, pick the first available self.selected_target_id.set(target_ids[0]) else: # Do not overwrite existing combobox values when no targets are present. # This preserves the user's last view after a simulation ends. pass def _on_target_select(self, event=None): # Trigger an immediate update when user changes selection # Only perform an immediate analysis if the simulation is running. running = False try: if hasattr(self.master, "is_simulation_running"): running = bool(self.master.is_simulation_running.get()) except Exception: running = False sel = self.selected_target_id.get() if not running: # If we have cached values for this selection, keep them; otherwise # show a placeholder and don't call the analyzer. if self._has_last_values and self._last_displayed_target_id == sel: return else: self.stats_tree.delete(*self.stats_tree.get_children()) self.stats_tree.insert( "", "end", values=("No Data", "-", "-", "-"), ) self._has_last_values = False return # Simulation running -> perform analysis for selection analysis_results = self._analyzer.analyze() if sel in analysis_results: self._update_stats_table(analysis_results[sel]) self._update_plot(sel) self._last_displayed_target_id = sel self._has_last_values = True else: try: if self._hub and hasattr(self._hub, "has_active_real_targets") and self._hub.has_active_real_targets(): self._clear_views() self._has_last_values = False else: pass except Exception: self._clear_views() self._has_last_values = False def _update_stats_table(self, results: Dict): self.stats_tree.delete(*self.stats_tree.get_children()) metrics = ["mean", "std_dev", "rmse"] for metric in metrics: self.stats_tree.insert( "", "end", values=( metric.replace("_", " ").title(), f"{results['x'][metric]:.3f}", f"{results['y'][metric]:.3f}", f"{results['z'][metric]:.3f}", ), ) def _update_plot(self, target_id: int): history = self._hub.get_target_history(target_id) if not history or not history["real"] or len(history["simulated"]) < 2: self.line_x.set_data([], []) self.line_y.set_data([], []) self.line_z.set_data([], []) self.ax.relim() self.ax.autoscale_view() self.canvas.draw_idle() return times, errors_x, errors_y, errors_z = [], [], [], [] sim_hist = sorted(history["simulated"]) for real_state in history["real"]: real_ts, real_x, real_y, real_z = real_state p1, p2 = self._analyzer._find_bracketing_points(real_ts, sim_hist) if p1 and p2: interp_state = self._analyzer._interpolate(real_ts, p1, p2) _ts, interp_x, interp_y, interp_z = interp_state times.append(real_ts) errors_x.append(real_x - interp_x) errors_y.append(real_y - interp_y) errors_z.append(real_z - interp_z) self.line_x.set_data(times, errors_x) self.line_y.set_data(times, errors_y) self.line_z.set_data(times, errors_z) self.ax.relim() self.ax.autoscale_view() self.canvas.draw_idle() def _clear_views(self): self.stats_tree.delete(*self.stats_tree.get_children()) self.line_x.set_data([], []) self.line_y.set_data([], []) self.line_z.set_data([], []) self.ax.relim() self.ax.autoscale_view() self.canvas.draw_idle() def _on_close(self): self._active = False self.destroy()