S1005403_RisCC/target_simulator/gui/analysis_window.py
VALLONGOL 8c439b60c3 Chore: Stop tracking files based on .gitignore update.
Untracked files matching the following rules:
- Rule "!.vscode/launch.json": 1 file
- Rule "!.vscode/settings.json": 1 file
- Rule "!.vscode/tasks.json": 1 file
- Rule "*.bak*": 8 files
2025-10-31 13:46:11 +01:00

312 lines
12 KiB
Python

# target_simulator/gui/analysis_window.py
"""
A Toplevel window for displaying real-time performance analysis, including
error statistics and plots.
"""
import tkinter as tk
from tkinter import ttk, messagebox
from typing import Optional, Dict
from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
# Matplotlib (with its Tk backend) is an optional dependency: record whether
# it could be imported so the GUI can report a clear error instead of failing
# with an ImportError at module load time.
try:
    from matplotlib.figure import Figure
    from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
except ImportError:
    MATPLOTLIB_AVAILABLE = False
else:
    MATPLOTLIB_AVAILABLE = True

# Period, in milliseconds, between two refreshes of the analysis views
# (1000 ms = one refresh per second).
UPDATE_INTERVAL_MS = 1000
class AnalysisWindow(tk.Toplevel):
    """
    A window that displays real-time analysis of tracking performance.

    The window contains a statistics table (mean / standard deviation / RMSE
    of the per-axis error) and an error-over-time plot for the target chosen
    in a combobox. It refreshes itself every ``UPDATE_INTERVAL_MS``
    milliseconds through Tk's ``after`` mechanism.

    While the simulation is not running the last displayed values are kept on
    screen; a "No Data" placeholder row is shown only when nothing has been
    displayed yet.
    """

    # Row shown in the statistics table when there is nothing to display.
    _NO_DATA_ROW = ("No Data", "-", "-", "-")

    def __init__(self, master, analyzer: PerformanceAnalyzer, hub: SimulationStateHub):
        """
        Build the window and start the periodic refresh.

        Args:
            master: Parent widget. If it exposes an ``is_simulation_running``
                attribute with a ``get()`` method, that value is polled to
                decide whether live analysis should be performed.
            analyzer: Object whose ``analyze()`` result is indexed by target id.
            hub: Source of the known target ids and of the per-target history.

        If Matplotlib is not available an error dialog is shown and the
        window destroys itself immediately.
        """
        super().__init__(master)
        self.title("Performance Analysis")
        self.geometry("800x600")
        self.transient(master)

        # Initialised before any early return so that every instance, even a
        # half-built one, has a consistent "not refreshing" state.
        self._active = False
        self._after_id: Optional[str] = None

        if not MATPLOTLIB_AVAILABLE:
            messagebox.showerror(
                "Dependency Missing",
                "Matplotlib is required for the analysis window. Please install it (`pip install matplotlib`).",
                parent=self,
            )
            self.destroy()
            return

        self._analyzer = analyzer
        self._hub = hub
        self._active = True

        # Cache last displayed state so we can preserve it between simulations.
        # If no new analysis data is available (e.g. simulation stopped), we
        # keep showing these values until a new simulation populates the hub.
        self._last_displayed_target_id = None
        self._has_last_values = False

        self.selected_target_id = tk.IntVar()

        self._create_widgets()
        self.protocol("WM_DELETE_WINDOW", self._on_close)
        self._update_loop()

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def _create_widgets(self):
        """Create the vertical pane holding the stats table and the plot."""
        main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
        main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # --- Top Frame for Stats Table ---
        stats_frame = ttk.LabelFrame(main_pane, text="Error Statistics (feet)")
        main_pane.add(stats_frame, weight=1)
        self._create_stats_widgets(stats_frame)

        # --- Bottom Frame for Plot ---
        plot_frame = ttk.LabelFrame(main_pane, text="Error Over Time (feet)")
        main_pane.add(plot_frame, weight=3)
        self._create_plot_widgets(plot_frame)

    def _create_stats_widgets(self, parent):
        """Create the target selector and the statistics table inside *parent*."""
        top_bar = ttk.Frame(parent)
        top_bar.pack(fill=tk.X, padx=5, pady=5)

        ttk.Label(top_bar, text="Select Target ID:").pack(side=tk.LEFT)
        self.target_selector = ttk.Combobox(
            top_bar, textvariable=self.selected_target_id, state="readonly", width=5
        )
        self.target_selector.pack(side=tk.LEFT, padx=5)
        self.target_selector.bind("<<ComboboxSelected>>", self._on_target_select)

        columns = ("metric", "x_error", "y_error", "z_error")
        self.stats_tree = ttk.Treeview(parent, columns=columns, show="headings")

        self.stats_tree.heading("metric", text="Metric")
        self.stats_tree.heading("x_error", text="Error X")
        self.stats_tree.heading("y_error", text="Error Y")
        self.stats_tree.heading("z_error", text="Error Z")

        self.stats_tree.column("metric", width=120, anchor=tk.W)
        self.stats_tree.column("x_error", anchor=tk.E)
        self.stats_tree.column("y_error", anchor=tk.E)
        self.stats_tree.column("z_error", anchor=tk.E)

        self.stats_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    def _create_plot_widgets(self, parent):
        """Create the Matplotlib figure with one line per error axis."""
        fig = Figure(figsize=(5, 3), dpi=100)
        self.ax = fig.add_subplot(111)
        self.ax.set_title("Instantaneous Error")
        self.ax.set_xlabel("Time (s)")
        self.ax.set_ylabel("Error (ft)")

        # The lines start empty; their data is replaced on every refresh.
        (self.line_x,) = self.ax.plot([], [], lw=2, label="Error X")
        (self.line_y,) = self.ax.plot([], [], lw=2, label="Error Y")
        (self.line_z,) = self.ax.plot([], [], lw=2, label="Error Z")

        self.ax.legend()
        self.ax.grid(True)
        fig.tight_layout()

        self.canvas = FigureCanvasTkAgg(fig, master=parent)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    # ------------------------------------------------------------------
    # State probes
    # ------------------------------------------------------------------
    def _is_simulation_running(self) -> bool:
        """
        Return True when the master reports a running simulation.

        The master is probed defensively: a missing attribute or any error
        while reading it is treated as "not running".
        """
        flag = getattr(self.master, "is_simulation_running", None)
        if flag is None:
            return False
        try:
            return bool(flag.get())
        except Exception:
            # Best effort: the master is an opaque object, so any failure to
            # read the flag simply means live analysis is skipped.
            return False

    def _hub_has_active_real_targets(self) -> bool:
        """
        Return True when the displayed values should be cleared.

        That is the case when the hub reports active real targets, or when
        querying the hub fails. A hub without ``has_active_real_targets``
        yields False, which preserves the currently displayed values.
        """
        try:
            hub = self._hub
            return bool(
                hub
                and hasattr(hub, "has_active_real_targets")
                and hub.has_active_real_targets()
            )
        except Exception:
            # If the hub cannot be queried, err on the side of clearing
            # possibly stale data.
            return True

    # ------------------------------------------------------------------
    # Periodic refresh
    # ------------------------------------------------------------------
    def _update_loop(self):
        """Refresh the views once and schedule the next refresh."""
        # The timer that invoked us (if any) has fired and is no longer pending.
        self._after_id = None
        if not self._active:
            return

        self._update_target_selector()
        sel = self.selected_target_id.get()

        if self._is_simulation_running():
            # When running, perform live analysis and update views.
            self._refresh_selected(sel)
        elif not self._has_last_values:
            # Simulation not running and nothing cached: tell the user that
            # data will appear once a simulation runs. Cached values, when
            # present, are left untouched to avoid flicker/clearing.
            self._show_no_data()

        self._after_id = self.after(UPDATE_INTERVAL_MS, self._update_loop)

    def _update_target_selector(self):
        """
        Synchronise the combobox with the target ids known to the hub.

        The combobox is only updated when the hub reports at least one id, so
        the selector is not emptied when the hub is cleared at the end of a
        simulation and the last results stay selectable.
        """
        try:
            target_ids = sorted(self._hub.get_all_target_ids())
        except Exception:
            # Best effort: a failing hub is treated like an empty one.
            target_ids = []

        if not target_ids:
            return

        self.target_selector["values"] = target_ids
        if self.selected_target_id.get() not in target_ids:
            # If previous selection isn't available, pick the first available.
            self.selected_target_id.set(target_ids[0])

    def _on_target_select(self, event=None):
        """
        React immediately to a change of the selected target.

        Args:
            event: Tk event passed by the ``<<ComboboxSelected>>`` binding;
                unused.
        """
        sel = self.selected_target_id.get()

        if self._is_simulation_running():
            self._refresh_selected(sel)
            return

        # Simulation stopped: the analyzer is not called. Keep what is on
        # screen only if it belongs to the newly selected target.
        if self._has_last_values and self._last_displayed_target_id == sel:
            return

        # Otherwise drop the previous target's table rows *and* curves so the
        # plot never shows data for a target other than the selected one.
        self._clear_views()
        self._show_no_data()
        self._has_last_values = False

    def _refresh_selected(self, sel):
        """
        Run the analyzer and display the results for target *sel*.

        When the analyzer has no entry for *sel*, the views are cleared if
        the hub reports active real targets (or cannot be queried); otherwise
        the last displayed values are preserved.
        """
        analysis_results = self._analyzer.analyze()
        if sel in analysis_results:
            # Fresh analysis for the selected target: update and cache.
            self._update_stats_table(analysis_results[sel])
            self._update_plot(sel)
            self._last_displayed_target_id = sel
            self._has_last_values = True
        elif self._hub_has_active_real_targets():
            self._clear_views()
            self._has_last_values = False

    # ------------------------------------------------------------------
    # View updates
    # ------------------------------------------------------------------
    def _show_no_data(self):
        """Replace the statistics table content with the placeholder row."""
        self.stats_tree.delete(*self.stats_tree.get_children())
        self.stats_tree.insert("", "end", values=self._NO_DATA_ROW)

    def _update_stats_table(self, results: Dict):
        """
        Fill the statistics table from *results*.

        Args:
            results: Mapping indexed as ``results[axis][metric]`` with axes
                ``"x"``, ``"y"``, ``"z"`` and metrics ``"mean"``,
                ``"std_dev"``, ``"rmse"``; values are formatted with three
                decimals.
        """
        self.stats_tree.delete(*self.stats_tree.get_children())
        for metric in ("mean", "std_dev", "rmse"):
            label = metric.replace("_", " ").title()
            cells = [f"{results[axis][metric]:.3f}" for axis in ("x", "y", "z")]
            self.stats_tree.insert("", "end", values=(label, *cells))

    def _set_error_series(self, times, errors_x, errors_y, errors_z):
        """Load the three error curves, rescale the axes and request a redraw."""
        self.line_x.set_data(times, errors_x)
        self.line_y.set_data(times, errors_y)
        self.line_z.set_data(times, errors_z)
        self.ax.relim()
        self.ax.autoscale_view()
        self.canvas.draw_idle()

    def _update_plot(self, target_id: int):
        """
        Recompute and draw the per-axis error curves for *target_id*.

        For every real sample, the simulated history is interpolated at the
        sample's timestamp (using the analyzer's helpers) and the difference
        real - interpolated is plotted. Real samples that cannot be bracketed
        by two simulated points are skipped. The plot is emptied when there
        is no real sample or fewer than two simulated samples.
        """
        history = self._hub.get_target_history(target_id)
        if not history or not history["real"] or len(history["simulated"]) < 2:
            self._set_error_series([], [], [], [])
            return

        times, errors_x, errors_y, errors_z = [], [], [], []
        sim_hist = sorted(history["simulated"])

        # Each real sample is unpacked as (timestamp, x, y, z).
        for real_ts, real_x, real_y, real_z in history["real"]:
            p1, p2 = self._analyzer._find_bracketing_points(real_ts, sim_hist)
            if p1 and p2:
                _ts, interp_x, interp_y, interp_z = self._analyzer._interpolate(
                    real_ts, p1, p2
                )
                times.append(real_ts)
                errors_x.append(real_x - interp_x)
                errors_y.append(real_y - interp_y)
                errors_z.append(real_z - interp_z)

        self._set_error_series(times, errors_x, errors_y, errors_z)

    def _clear_views(self):
        """Empty both the statistics table and the plot."""
        self.stats_tree.delete(*self.stats_tree.get_children())
        self._set_error_series([], [], [], [])

    # ------------------------------------------------------------------
    # Shutdown
    # ------------------------------------------------------------------
    def _on_close(self):
        """Stop the periodic refresh and destroy the window."""
        self._active = False
        # Cancel the pending timer so Tk does not try to invoke the refresh
        # callback after the window (and its callbacks) have been destroyed.
        if self._after_id is not None:
            self.after_cancel(self._after_id)
            self._after_id = None
        self.destroy()