S1005403_RisCC/target_simulator/gui/analysis_window.py
2025-11-13 10:57:10 +01:00

537 lines
20 KiB
Python

# target_simulator/gui/analysis_window.py
"""
A Toplevel window for displaying real-time performance analysis, including
error statistics and plots.
"""
import tkinter as tk
from tkinter import ttk, messagebox
from typing import Optional, Dict
import json
import os
from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
try:
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
MATPLOTLIB_AVAILABLE = True
except ImportError:
MATPLOTLIB_AVAILABLE = False
UPDATE_INTERVAL_MS = 1000 # Update analysis every second
class AnalysisWindow(tk.Toplevel):
"""
A window that displays real-time analysis of tracking performance.
"""
def __init__(self, master, archive_filepath: str):
super().__init__(master)
self.title(f"Analysis for: {os.path.basename(archive_filepath)}")
self.geometry("900x750")
# State variables
self.selected_target_id = tk.IntVar(value=0)
self._active = True
# Carica i dati e inizializza l'analizzatore
self._load_data_and_setup(archive_filepath)
# ... il resto del codice di creazione widget rimane simile ...
self._create_widgets()
# Non c'è più un loop, ma un singolo aggiornamento
self._populate_analysis()
def _load_data_and_setup(self, filepath: str):
try:
with open(filepath, "r", encoding="utf-8") as f:
archive_data = json.load(f)
except Exception as e:
messagebox.showerror(
"Loading Error",
f"Could not load archive file.\n{e}",
)
self.destroy()
return
# Extract estimated latency from metadata
metadata = archive_data.get("metadata", {})
self.estimated_latency_ms = metadata.get("estimated_latency_ms")
self.prediction_offset_ms = metadata.get("prediction_offset_ms")
# Load latency samples (new format only: [[timestamp, latency_ms], ...])
latency_samples = metadata.get("latency_samples", [])
if latency_samples and isinstance(latency_samples[0], list):
self.latency_timestamps = [s[0] for s in latency_samples]
self.latency_values_ms = [s[1] for s in latency_samples]
else:
# No valid latency data
self.latency_timestamps = []
self.latency_values_ms = []
# Create a temporary hub and populate it with historical data
self._hub = SimulationStateHub()
results = archive_data.get("simulation_results", {})
for target_id_str, data in results.items():
target_id = int(target_id_str)
for state in data.get("simulated", []):
self._hub.add_simulated_state(target_id, state[0], tuple(state[1:]))
for state in data.get("real", []):
self._hub.add_real_state(target_id, state[0], tuple(state[1:]))
# Create the analyzer with the populated hub
self._analyzer = PerformanceAnalyzer(self._hub)
def _populate_analysis(self):
"""Esegue l'analisi e popola i widget una sola volta."""
self._update_target_selector() # Ora usa l'hub locale
# Seleziona il primo target di default
target_ids = self.target_selector["values"]
if target_ids:
self.selected_target_id.set(target_ids[0])
analysis_results = self._analyzer.analyze()
sel_id = self.selected_target_id.get()
if sel_id in analysis_results:
self._update_stats_table(analysis_results[sel_id])
self._update_plot(sel_id)
else:
# Provide diagnostic information when analysis cannot be
# produced for the selected target (common cause: no
# overlapping timestamps between simulated and real samples).
self._show_insufficient_data_info(sel_id)
# Update the latency plot regardless of target selection
self._update_latency_plot()
def _create_widgets(self):
main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# --- Top Frame for Stats Table ---
stats_frame = ttk.LabelFrame(main_pane, text="Error Statistics (feet)")
# Keep stats frame compact so the plot below has more space
main_pane.add(stats_frame, weight=1)
self._create_stats_widgets(stats_frame)
# --- Bottom Frame for Plot ---
plot_frame = ttk.LabelFrame(main_pane, text="Error Over Time (feet)")
# Give the plot more vertical weight so it occupies most of the window
main_pane.add(plot_frame, weight=4)
self._create_plot_widgets(plot_frame)
def _create_stats_widgets(self, parent):
"""Create the statistics widgets (table and explanatory legend).
Args:
parent (tk.Widget): Parent container where stats widgets will be placed.
"""
# Build a horizontal area: left = table, right = legend/explanations
container = ttk.Frame(parent)
container.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
left = ttk.Frame(container)
left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
right = ttk.Frame(container)
right.pack(side=tk.RIGHT, fill=tk.Y)
top_bar = ttk.Frame(left)
top_bar.pack(fill=tk.X, padx=0, pady=(0, 6))
ttk.Label(top_bar, text="Select Target ID:").pack(side=tk.LEFT)
self.target_selector = ttk.Combobox(
top_bar, textvariable=self.selected_target_id, state="readonly", width=5
)
self.target_selector.pack(side=tk.LEFT, padx=5)
self.target_selector.bind("<<ComboboxSelected>>", self._on_target_select)
# Frame per le metriche di sincronizzazione
sync_frame = ttk.Frame(top_bar)
sync_frame.pack(side=tk.LEFT, padx=(20, 0))
if self.estimated_latency_ms is not None:
ttk.Label(sync_frame, text="Avg. Latency:").pack(side=tk.LEFT)
ttk.Label(
sync_frame,
text=f"{self.estimated_latency_ms:.1f} ms",
font=("Segoe UI", 9, "bold"),
foreground="blue",
).pack(side=tk.LEFT, padx=4)
if self.prediction_offset_ms is not None:
ttk.Label(sync_frame, text="Prediction Offset:").pack(
side=tk.LEFT, padx=(10, 0)
)
ttk.Label(
sync_frame,
text=f"{self.prediction_offset_ms:.1f} ms",
font=("Segoe UI", 9, "bold"),
foreground="green",
).pack(side=tk.LEFT, padx=4)
# Table with metrics as columns and error types as rows
columns = ("error_type", "mean", "std_dev", "rmse")
self.stats_tree = ttk.Treeview(left, columns=columns, show="headings", height=4)
self.stats_tree.heading("error_type", text="")
self.stats_tree.heading("mean", text="Mean (ft)")
self.stats_tree.heading("std_dev", text="Std Dev (ft)")
self.stats_tree.heading("rmse", text="RMSE (ft)")
self.stats_tree.column("error_type", width=100, anchor=tk.W)
self.stats_tree.column("mean", anchor=tk.E, width=100)
self.stats_tree.column("std_dev", anchor=tk.E, width=100)
self.stats_tree.column("rmse", anchor=tk.E, width=100)
self.stats_tree.pack(fill=tk.BOTH, expand=True)
# Right side: explanatory legend box (compact)
legend_title = ttk.Label(
right, text="How to Interpret Results:", font=(None, 9, "bold")
)
legend_title.pack(anchor=tk.NW, padx=(6, 6), pady=(4, 4))
explanation_text = (
"Formula: Error = Real Position - Simulated Position\n\n"
"Sign of Error (e.g., on X axis):\n"
"• Positive Error (+): Real target is at a larger X coordinate.\n"
" - If moving R -> L (X increases): Real is AHEAD.\n"
" - If moving L -> R (X decreases): Real is BEHIND.\n\n"
"• Negative Error (-): Real target is at a smaller X coordinate.\n"
" - If moving R -> L (X increases): Real is BEHIND.\n"
" - If moving L -> R (X decreases): Real is AHEAD.\n\n"
"Prediction Offset:\n"
"A manual offset to compensate for server processing delay, "
"aiming to minimize the Mean Error."
)
try:
ttk.Label(
right,
text=explanation_text,
justify=tk.LEFT,
wraplength=280,
).pack(anchor=tk.NW, padx=(6, 6))
except Exception:
pass
def _create_plot_widgets(self, parent):
"""Create and configure the matplotlib Figure and canvas used for plotting.
Args:
parent (tk.Widget): Parent container where the plot canvas will be packed.
"""
fig = Figure(figsize=(5, 6), dpi=100)
# Use GridSpec for aligned subplots with shared x-axis alignment
gs = fig.add_gridspec(2, 1, height_ratios=[2, 1], hspace=0.3)
# Top subplot: Instantaneous Error
self.ax = fig.add_subplot(gs[0, 0])
self.ax.set_title("Instantaneous Error")
self.ax.set_xlabel("Time (s)")
self.ax.set_ylabel("Error (ft)")
(self.line_x,) = self.ax.plot([], [], lw=2, label="Error X")
(self.line_y,) = self.ax.plot([], [], lw=2, label="Error Y")
(self.line_z,) = self.ax.plot([], [], lw=2, label="Error Z")
# Place legend inside the plot area
try:
self.ax.grid(True)
# horizontal zero line for reference
self.ax.axhline(0.0, color="black", lw=1, linestyle="--", alpha=0.8)
self.ax.legend(loc="upper right", fontsize=9)
except Exception:
pass
# Bottom subplot: Latency over time
self.ax_latency = fig.add_subplot(gs[1, 0], sharex=None)
self.ax_latency.set_title("Latency Evolution")
self.ax_latency.set_xlabel(
"Time (s)"
) # Will be updated if no timestamps available
self.ax_latency.set_ylabel("Latency (ms)")
(self.line_latency,) = self.ax_latency.plot(
[], [], lw=2, color="orange", label="Latency"
)
try:
self.ax_latency.grid(True)
self.ax_latency.legend(loc="upper right", fontsize=9)
except Exception:
pass
fig.tight_layout()
self.canvas = FigureCanvasTkAgg(fig, master=parent)
self.canvas.draw()
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def _update_target_selector(self):
"""Refresh the target selector Combobox with IDs from the state hub.
Only updates the combobox when the hub reports IDs; if no IDs are
available the existing values are preserved to avoid flicker in the UI.
"""
# Only update the combobox values when the hub reports target ids.
# This prevents the selector from being emptied when the hub is cleared
# at the end of a simulation and allows the analysis window to continue
# showing the last results until the next simulation starts.
try:
target_ids = sorted(self._hub.get_all_target_ids())
except Exception:
target_ids = []
if target_ids:
self.target_selector["values"] = target_ids
if self.selected_target_id.get() not in target_ids:
# If previous selection isn't available, pick the first available
self.selected_target_id.set(target_ids[0])
else:
# Do not overwrite existing combobox values when no targets are present.
# This preserves the user's last view after a simulation ends.
pass
def _update_stats_table(self, results: Dict):
"""Populate the stats Treeview with aggregated error metrics.
Args:
results (Dict): A mapping containing 'x', 'y', 'z' metrics and
numerical aggregates such as mean, std_dev and rmse.
"""
self.stats_tree.delete(*self.stats_tree.get_children())
# Add rows for each error axis (X, Y, Z)
for axis in ["x", "y", "z"]:
self.stats_tree.insert(
"",
"end",
values=(
f"Error {axis.upper()}",
f"{results[axis]['mean']:.3f}",
f"{results[axis]['std_dev']:.3f}",
f"{results[axis]['rmse']:.3f}",
),
)
# Add latency row if available
if self.estimated_latency_ms is not None:
# Calculate latency stats from samples if available
if self.latency_values_ms:
import statistics
lat_mean = statistics.mean(self.latency_values_ms)
lat_std = (
statistics.stdev(self.latency_values_ms)
if len(self.latency_values_ms) > 1
else 0.0
)
lat_min = min(self.latency_values_ms)
lat_max = max(self.latency_values_ms)
self.stats_tree.insert(
"",
"end",
values=(
"Latency (ms)",
f"{lat_mean:.2f}",
f"{lat_std:.2f}",
f"{lat_min:.2f} - {lat_max:.2f}",
),
)
else:
# Fallback to estimated latency only
self.stats_tree.insert(
"",
"end",
values=(
"Latency (ms)",
f"{self.estimated_latency_ms:.2f}",
"N/A",
"N/A",
),
)
def _update_plot(self, target_id: int):
"""Update the matplotlib plot for the given target using hub history.
Args:
target_id (int): The identifier of the target to plot errors for.
"""
history = self._hub.get_target_history(target_id)
if not history or not history["real"] or len(history["simulated"]) < 2:
self.line_x.set_data([], [])
self.line_y.set_data([], [])
self.line_z.set_data([], [])
self.ax.relim()
self.ax.autoscale_view()
self.canvas.draw_idle()
return
times, errors_x, errors_y, errors_z = [], [], [], []
sim_hist = sorted(history["simulated"])
for real_state in history["real"]:
real_ts, real_x, real_y, real_z = real_state
p1, p2 = self._analyzer._find_bracketing_points(real_ts, sim_hist)
if p1 and p2:
interp_state = self._analyzer._interpolate(real_ts, p1, p2)
_ts, interp_x, interp_y, interp_z = interp_state
times.append(real_ts)
errors_x.append(real_x - interp_x)
errors_y.append(real_y - interp_y)
errors_z.append(real_z - interp_z)
self.line_x.set_data(times, errors_x)
self.line_y.set_data(times, errors_y)
self.line_z.set_data(times, errors_z)
self.ax.relim()
self.ax.autoscale_view()
self.canvas.draw_idle()
def _update_latency_plot(self):
"""Update the latency subplot with the latency samples from the archive.
Plots latency measurements over time to show how latency evolved
during the simulation, aligned with the error plot above.
"""
if not self.latency_values_ms or not self.latency_timestamps:
# No latency data or invalid format - clear the plot
self.line_latency.set_data([], [])
self.ax_latency.relim()
self.ax_latency.autoscale_view()
self.canvas.draw_idle()
return
# Plot latencies - they are already filtered to simulation time range
self.line_latency.set_data(self.latency_timestamps, self.latency_values_ms)
self.ax_latency.relim()
self.ax_latency.autoscale_view()
self.canvas.draw_idle()
def _clear_views(self):
"""Clear statistics and plots, leaving the UI in an empty state.
This is used as a safe fallback when no data is available or when an
error occurs while generating analysis results.
"""
self.stats_tree.delete(*self.stats_tree.get_children())
self.line_x.set_data([], [])
self.line_y.set_data([], [])
self.line_z.set_data([], [])
self.line_latency.set_data([], [])
self.ax.relim()
self.ax.autoscale_view()
self.ax_latency.relim()
self.ax_latency.autoscale_view()
self.canvas.draw_idle()
def _show_insufficient_data_info(self, target_id: int):
"""Display helpful information in the stats table when a target
cannot be analyzed (for example because simulated and real time
ranges do not overlap). This avoids an empty UI and gives the
user actionable context.
"""
try:
# Clear previous contents
self.stats_tree.delete(*self.stats_tree.get_children())
history = self._hub.get_target_history(target_id)
if history is None:
self.stats_tree.insert(
"", "end", values=("Info", "Target not found", "", "")
)
self._clear_views()
return
sim_times = [s[0] for s in history.get("simulated", [])]
real_times = [r[0] for r in history.get("real", [])]
sim_count = len(sim_times)
real_count = len(real_times)
sim_range = (min(sim_times), max(sim_times)) if sim_times else (None, None)
real_range = (
(min(real_times), max(real_times)) if real_times else (None, None)
)
# populate the small table with human-readable diagnostic rows
self.stats_tree.insert(
"",
"end",
values=("Info", f"Target {target_id}", "", ""),
)
self.stats_tree.insert(
"",
"end",
values=("Sim samples", str(sim_count), "", ""),
)
self.stats_tree.insert(
"",
"end",
values=("Sim time range", f"{sim_range[0]} -> {sim_range[1]}", "", ""),
)
self.stats_tree.insert(
"",
"end",
values=("Real samples", str(real_count), "", ""),
)
self.stats_tree.insert(
"",
"end",
values=(
"Real time range",
f"{real_range[0]} -> {real_range[1]}",
"",
"",
),
)
# keep plot cleared
self.line_x.set_data([], [])
self.line_y.set_data([], [])
self.line_z.set_data([], [])
self.ax.relim()
self.ax.autoscale_view()
self.canvas.draw_idle()
except Exception:
# Fail silently to avoid breaking the analysis window; show
# the cleared view as a fallback.
self._clear_views()
def _on_close(self):
"""Handle the window close event by marking the window inactive and
destroying it.
"""
self._active = False
self.destroy()
def _on_target_select(self, event=None):
"""Handle combobox selection changes and update stats/plot."""
try:
sel = self.selected_target_id.get()
analysis_results = self._analyzer.analyze()
if sel in analysis_results:
self._update_stats_table(analysis_results[sel])
self._update_plot(sel)
else:
self._clear_views()
# Latency plot doesn't depend on target selection
self._update_latency_plot()
except Exception:
pass