S1005403_RisCC/target_simulator/gui/analysis_window.py

324 lines
12 KiB
Python

# target_simulator/gui/analysis_window.py
"""
A Toplevel window for displaying the performance analysis of an archived
simulation run, including error statistics and plots.
"""
import tkinter as tk
from tkinter import ttk, messagebox
from typing import Optional, Dict
import json
import os
from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
# Matplotlib is treated as optional: record whether the Tk plotting backend
# could be imported instead of failing when this module is imported.
try:
    from matplotlib.figure import Figure
    from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
except ImportError:
    MATPLOTLIB_AVAILABLE = False
else:
    MATPLOTLIB_AVAILABLE = True

# Analysis refresh period in milliseconds (one second).
UPDATE_INTERVAL_MS = 1000
class AnalysisWindow(tk.Toplevel):
    """
    A window that displays the tracking-performance analysis of an archive.

    The archive (a JSON file) is loaded once, its simulated and real states
    are replayed into a private ``SimulationStateHub``, and a
    ``PerformanceAnalyzer`` built on that hub feeds a statistics table and an
    error-over-time plot.

    If the archive cannot be loaded, an error dialog is shown, the window is
    destroyed and ``_active`` is set to ``False``; no widgets are created.
    """

    def __init__(self, master, archive_filepath: str):
        """
        Build the window and populate it from ``archive_filepath``.

        Args:
            master: Parent Tk widget.
            archive_filepath: Path of the JSON archive to analyse.
        """
        super().__init__(master)
        self.title(f"Analysis for: {os.path.basename(archive_filepath)}")
        self.geometry("800x600")

        # State variables
        self.selected_target_id = tk.IntVar(value=0)
        self._active = True

        # Defaults, so every attribute exists even if loading fails or
        # matplotlib is not installed.
        self.estimated_latency_ms = None
        self.prediction_offset_ms = None
        self._hub = None
        self._analyzer = None
        self.ax = None
        self.canvas = None
        self.line_x = self.line_y = self.line_z = None

        # Make sure closing the window goes through _on_close, which also
        # clears the _active flag.
        self.protocol("WM_DELETE_WINDOW", self._on_close)

        # Load the data and initialise the analyzer. On failure the window
        # has already been destroyed, so no widget may be created on it.
        if not self._load_data_and_setup(archive_filepath):
            return

        self._create_widgets()
        # There is no periodic loop: the views are populated exactly once.
        self._populate_analysis()

    def _load_data_and_setup(self, filepath: str) -> bool:
        """
        Load the archive and build the hub and the analyzer from it.

        Args:
            filepath: Path of the JSON archive.

        Returns:
            True when the archive was loaded; False when loading failed (the
            user has been notified and the window has been destroyed).
        """
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                archive_data = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers both invalid JSON and undecodable bytes.
            return self._abort_load(exc)

        # Parse the archive into plain Python data first, so that structural
        # problems in the file are reported like any other loading error.
        try:
            metadata = archive_data.get("metadata", {})
            estimated_latency_ms = metadata.get("estimated_latency_ms")
            prediction_offset_ms = metadata.get("prediction_offset_ms")

            replay = []
            results = archive_data.get("simulation_results", {})
            for target_id_str, data in results.items():
                target_id = int(target_id_str)
                # Each archived row is [timestamp, *state_components].
                simulated = [(s[0], tuple(s[1:])) for s in data.get("simulated", [])]
                real = [(s[0], tuple(s[1:])) for s in data.get("real", [])]
                replay.append((target_id, simulated, real))
        except (AttributeError, IndexError, KeyError, TypeError, ValueError) as exc:
            return self._abort_load(exc)

        # Estimated latency and prediction offset come from the metadata.
        self.estimated_latency_ms = estimated_latency_ms
        self.prediction_offset_ms = prediction_offset_ms

        # Create a temporary hub and populate it with the historical data.
        self._hub = SimulationStateHub()
        for target_id, simulated, real in replay:
            for timestamp, state in simulated:
                self._hub.add_simulated_state(target_id, timestamp, state)
            for timestamp, state in real:
                self._hub.add_real_state(target_id, timestamp, state)

        # Create the analyzer on top of the populated hub.
        self._analyzer = PerformanceAnalyzer(self._hub)
        return True

    def _abort_load(self, error: Exception) -> bool:
        """Report a loading failure, destroy the window and return False."""
        messagebox.showerror(
            "Loading Error",
            f"Could not load archive file.\n{error}",
        )
        self._active = False
        self.destroy()
        return False

    def _populate_analysis(self):
        """Run the analysis and populate the widgets a single time."""
        self._update_target_selector()
        # Select the first target by default.
        target_ids = self.target_selector["values"]
        if target_ids:
            self.selected_target_id.set(target_ids[0])
        self._refresh_views()

    def _refresh_views(self):
        """Recompute the analysis and show the currently selected target."""
        analysis_results = self._analyzer.analyze()
        sel_id = self.selected_target_id.get()
        if sel_id in analysis_results:
            self._update_stats_table(analysis_results[sel_id])
            self._update_plot(sel_id)
        else:
            self._clear_views()

    def _create_widgets(self):
        """Create the vertical pane holding the stats table and the plot."""
        main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
        main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # --- Top Frame for Stats Table ---
        stats_frame = ttk.LabelFrame(main_pane, text="Error Statistics (feet)")
        # Keep stats frame compact so the plot below has more space
        main_pane.add(stats_frame, weight=1)
        self._create_stats_widgets(stats_frame)

        # --- Bottom Frame for Plot ---
        plot_frame = ttk.LabelFrame(main_pane, text="Error Over Time (feet)")
        # Give the plot more vertical weight so it occupies most of the window
        main_pane.add(plot_frame, weight=4)
        self._create_plot_widgets(plot_frame)

    def _create_stats_widgets(self, parent):
        """Create the target selector, sync metrics, stats table and legend."""
        # Build a horizontal area: left = table, right = legend/explanations
        container = ttk.Frame(parent)
        container.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        left = ttk.Frame(container)
        left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        right = ttk.Frame(container)
        right.pack(side=tk.RIGHT, fill=tk.Y)

        top_bar = ttk.Frame(left)
        top_bar.pack(fill=tk.X, padx=0, pady=(0, 6))
        ttk.Label(top_bar, text="Select Target ID:").pack(side=tk.LEFT)
        self.target_selector = ttk.Combobox(
            top_bar, textvariable=self.selected_target_id, state="readonly", width=5
        )
        self.target_selector.pack(side=tk.LEFT, padx=5)
        self.target_selector.bind("<<ComboboxSelected>>", self._on_target_select)

        # Frame for the synchronisation metrics read from the archive metadata.
        sync_frame = ttk.Frame(top_bar)
        sync_frame.pack(side=tk.LEFT, padx=(20, 0))
        if self.estimated_latency_ms is not None:
            self._add_sync_metric(
                sync_frame, "Avg. Latency:", self.estimated_latency_ms, "blue"
            )
        if self.prediction_offset_ms is not None:
            self._add_sync_metric(
                sync_frame,
                "Prediction Offset:",
                self.prediction_offset_ms,
                "green",
                caption_padx=(10, 0),
            )

        columns = ("metric", "x_error", "y_error", "z_error")
        self.stats_tree = ttk.Treeview(left, columns=columns, show="headings")
        self.stats_tree.heading("metric", text="Metric")
        self.stats_tree.column("metric", width=120, anchor=tk.W)
        for column, title in (
            ("x_error", "Error X"),
            ("y_error", "Error Y"),
            ("z_error", "Error Z"),
        ):
            self.stats_tree.heading(column, text=title)
            self.stats_tree.column(column, anchor=tk.E, width=120)
        self.stats_tree.pack(fill=tk.BOTH, expand=True)

        # Right side: explanatory legend box (compact)
        legend_title = ttk.Label(
            right, text="How to Interpret Results:", font=(None, 9, "bold")
        )
        legend_title.pack(anchor=tk.NW, padx=(6, 6), pady=(4, 4))
        explanation_text = (
            "Formula: Error = Real Position - Simulated Position\n\n"
            "Sign of Error (e.g., on X axis):\n"
            "• Positive Error (+): Real target is at a larger X coordinate.\n"
            " - If moving R -> L (X increases): Real is AHEAD.\n"
            " - If moving L -> R (X decreases): Real is BEHIND.\n\n"
            "• Negative Error (-): Real target is at a smaller X coordinate.\n"
            " - If moving R -> L (X increases): Real is BEHIND.\n"
            " - If moving L -> R (X decreases): Real is AHEAD.\n\n"
            "Prediction Offset:\n"
            "A manual offset to compensate for server processing delay, "
            "aiming to minimize the Mean Error."
        )
        ttk.Label(
            right,
            text=explanation_text,
            justify=tk.LEFT,
            wraplength=280,
        ).pack(anchor=tk.NW, padx=(6, 6))

    def _add_sync_metric(self, parent, caption, value_ms, color, caption_padx=0):
        """Add a 'caption: value ms' label pair to ``parent``."""
        ttk.Label(parent, text=caption).pack(side=tk.LEFT, padx=caption_padx)
        ttk.Label(
            parent,
            text=f"{value_ms:.1f} ms",
            font=("Segoe UI", 9, "bold"),
            foreground=color,
        ).pack(side=tk.LEFT, padx=4)

    def _create_plot_widgets(self, parent):
        """Create the error plot, or a notice when matplotlib is missing."""
        if not MATPLOTLIB_AVAILABLE:
            # Figure / FigureCanvasTkAgg are undefined in this case; leave
            # self.canvas as None so the plot helpers become no-ops.
            ttk.Label(
                parent,
                text="Matplotlib is not installed; the error plot is unavailable.",
            ).pack(anchor=tk.NW, padx=6, pady=6)
            return

        fig = Figure(figsize=(5, 3), dpi=100)
        self.ax = fig.add_subplot(111)
        self.ax.set_title("Instantaneous Error")
        self.ax.set_xlabel("Time (s)")
        self.ax.set_ylabel("Error (ft)")
        (self.line_x,) = self.ax.plot([], [], lw=2, label="Error X")
        (self.line_y,) = self.ax.plot([], [], lw=2, label="Error Y")
        (self.line_z,) = self.ax.plot([], [], lw=2, label="Error Z")

        self.ax.grid(True)
        # Horizontal zero line for reference.
        self.ax.axhline(0.0, color="black", lw=1, linestyle="--", alpha=0.8)
        # Place legend outside the axes to keep plot area clear.
        self.ax.legend(loc="upper left", bbox_to_anchor=(1.02, 1), fontsize=9)
        fig.tight_layout()

        self.canvas = FigureCanvasTkAgg(fig, master=parent)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    def _update_target_selector(self):
        """Fill the combobox with the hub's target ids, if there are any."""
        try:
            target_ids = sorted(self._hub.get_all_target_ids())
        except (AttributeError, TypeError):
            # Hub missing or ids not sortable: behave as if it had no targets.
            target_ids = []

        # When the hub reports no targets the existing combobox values are
        # deliberately left untouched.
        if not target_ids:
            return

        self.target_selector["values"] = target_ids
        if self.selected_target_id.get() not in target_ids:
            # If the previous selection isn't available, pick the first one.
            self.selected_target_id.set(target_ids[0])

    def _update_stats_table(self, results: Dict):
        """
        Replace the table rows with the statistics in ``results``.

        Args:
            results: Mapping with keys 'x', 'y' and 'z', each a mapping that
                provides 'mean', 'std_dev' and 'rmse'.
        """
        self.stats_tree.delete(*self.stats_tree.get_children())
        for metric in ("mean", "std_dev", "rmse"):
            row = [metric.replace("_", " ").title()]
            row.extend(f"{results[axis][metric]:.3f}" for axis in ("x", "y", "z"))
            self.stats_tree.insert("", "end", values=tuple(row))

    def _update_plot(self, target_id: int):
        """
        Plot the per-axis error (real minus interpolated simulated position)
        over time for ``target_id``.
        """
        if self.canvas is None:
            # No plot was created (matplotlib unavailable).
            return

        history = self._hub.get_target_history(target_id)
        if not history or not history["real"] or len(history["simulated"]) < 2:
            self._set_plot_data([], [], [], [])
            return

        times, errors_x, errors_y, errors_z = [], [], [], []
        sim_hist = sorted(history["simulated"])
        for real_ts, real_x, real_y, real_z in history["real"]:
            p1, p2 = self._analyzer._find_bracketing_points(real_ts, sim_hist)
            if not (p1 and p2):
                # No simulated samples bracket this real timestamp.
                continue
            _ts, interp_x, interp_y, interp_z = self._analyzer._interpolate(
                real_ts, p1, p2
            )
            times.append(real_ts)
            errors_x.append(real_x - interp_x)
            errors_y.append(real_y - interp_y)
            errors_z.append(real_z - interp_z)

        self._set_plot_data(times, errors_x, errors_y, errors_z)

    def _set_plot_data(self, times, errors_x, errors_y, errors_z):
        """Assign the three error series, rescale the axes and redraw."""
        self.line_x.set_data(times, errors_x)
        self.line_y.set_data(times, errors_y)
        self.line_z.set_data(times, errors_z)
        self.ax.relim()
        self.ax.autoscale_view()
        self.canvas.draw_idle()

    def _clear_views(self):
        """Empty the statistics table and, when present, the plot."""
        self.stats_tree.delete(*self.stats_tree.get_children())
        if self.canvas is not None:
            self._set_plot_data([], [], [], [])

    def _on_close(self):
        """Mark the window inactive and destroy it."""
        self._active = False
        self.destroy()

    def _on_target_select(self, event=None):
        """Handle combobox selection changes and update stats/plot."""
        try:
            self._refresh_views()
        except Exception as exc:
            # UI callback boundary: keep the window usable, but tell the user
            # instead of silently discarding the failure.
            messagebox.showwarning(
                "Analysis Error",
                f"Could not update the analysis view.\n{exc}",
                parent=self,
            )