aggiunta la funzione di analisi a posteriori della simulazione

This commit is contained in:
VALLONGOL 2025-11-03 08:40:21 +01:00
parent 73a7817f5c
commit aa9f8e69a2
6 changed files with 9751 additions and 140 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,92 @@
# target_simulator/analysis/simulation_archive.py
import os
import json
import time
from datetime import datetime
from typing import Dict, List, Any, Tuple
from target_simulator.core.models import Scenario
# Definisci la struttura per uno stato registrato
RecordedState = Tuple[float, float, float, float] # (timestamp, x_ft, y_ft, z_ft)
class SimulationArchive:
"""
Gestisce la raccolta dei dati per una singola esecuzione di simulazione e la salva su file.
"""
ARCHIVE_FOLDER = "archive_simulations"
def __init__(self, scenario: Scenario):
"""
Inizializza una nuova sessione di archivio per un dato scenario.
"""
self.start_time = time.monotonic()
self.scenario_name = scenario.name
self.scenario_data = scenario.to_dict()
# Struttura dati per contenere gli eventi registrati, indicizzati per target_id
# self.recorded_data[target_id]['simulated'] = [(ts, x, y, z), ...]
# self.recorded_data[target_id]['real'] = [(ts, x, y, z), ...]
self.recorded_data: Dict[int, Dict[str, List[RecordedState]]] = {}
self._ensure_archive_directory()
def _ensure_archive_directory(self):
"""Crea la directory principale dell'archivio se non esiste."""
if not os.path.exists(self.ARCHIVE_FOLDER):
try:
os.makedirs(self.ARCHIVE_FOLDER)
except OSError as e:
print(f"Errore nella creazione della directory di archivio: {e}")
def add_simulated_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]):
"""Aggiunge uno stato simulato all'archivio."""
if target_id not in self.recorded_data:
self.recorded_data[target_id] = {"simulated": [], "real": []}
full_state: RecordedState = (timestamp, state[0], state[1], state[2])
self.recorded_data[target_id]["simulated"].append(full_state)
def add_real_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]):
"""Aggiunge uno stato reale (dal server) all'archivio."""
if target_id not in self.recorded_data:
self.recorded_data[target_id] = {"simulated": [], "real": []}
full_state: RecordedState = (timestamp, state[0], state[1], state[2])
self.recorded_data[target_id]["real"].append(full_state)
def save(self) -> str:
"""
Salva l'archivio completo della simulazione in un file JSON.
Il nome del file è generato dal timestamp e dal nome dello scenario.
Ritorna:
Il percorso del file salvato.
"""
end_time = time.monotonic()
archive_content = {
"metadata": {
"scenario_name": self.scenario_name,
"start_timestamp_utc": datetime.utcnow().isoformat(),
"duration_seconds": end_time - self.start_time,
},
"scenario_definition": self.scenario_data,
"simulation_results": self.recorded_data,
}
ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_scenario_name = "".join(c for c in self.scenario_name if c.isalnum() or c in (' ', '_')).rstrip()
filename = f"{ts_str}_{safe_scenario_name}.json"
filepath = os.path.join(self.ARCHIVE_FOLDER, filename)
try:
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(archive_content, f, indent=4)
print(f"Archivio di simulazione salvato in: {filepath}")
return filepath
except IOError as e:
print(f"Errore durante il salvataggio dell'archivio di simulazione: {e}")
return ""

View File

@ -27,12 +27,14 @@ class SimulationEngine(threading.Thread):
self, self,
communicator: Optional[CommunicatorInterface], communicator: Optional[CommunicatorInterface],
simulation_hub: Optional[SimulationStateHub] = None, simulation_hub: Optional[SimulationStateHub] = None,
archive: Optional[str] = None
): ):
super().__init__(daemon=True, name="SimulationEngineThread") super().__init__(daemon=True, name="SimulationEngineThread")
self.logger = get_logger(__name__) self.logger = get_logger(__name__)
self.communicator = communicator self.communicator = communicator
self.simulation_hub = simulation_hub # Hub for data analysis self.simulation_hub = simulation_hub # Hub for data analysis
self.archive = archive # Archive path if needed
self.time_multiplier = 1.0 self.time_multiplier = 1.0
self.update_interval_s = 1.0 self.update_interval_s = 1.0
@ -109,6 +111,18 @@ class SimulationEngine(threading.Thread):
simulated_delta_time = delta_time * self.time_multiplier simulated_delta_time = delta_time * self.time_multiplier
self.scenario.update_state(simulated_delta_time) self.scenario.update_state(simulated_delta_time)
if self.archive:
log_timestamp = time.monotonic()
for target in self.scenario.get_all_targets():
if target.active:
state_tuple = (
getattr(target, "_pos_x_ft", 0.0),
getattr(target, "_pos_y_ft", 0.0),
getattr(target, "_pos_z_ft", 0.0),
)
self.archive.add_simulated_state(
target.target_id, log_timestamp, state_tuple
)
# --- High-Frequency State Logging --- # --- High-Frequency State Logging ---
tick_timestamp = time.monotonic() tick_timestamp = time.monotonic()

View File

@ -7,6 +7,8 @@ error statistics and plots.
import tkinter as tk import tkinter as tk
from tkinter import ttk, messagebox from tkinter import ttk, messagebox
from typing import Optional, Dict from typing import Optional, Dict
import json
import os
from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
from target_simulator.analysis.simulation_state_hub import SimulationStateHub from target_simulator.analysis.simulation_state_hub import SimulationStateHub
@ -27,37 +29,63 @@ class AnalysisWindow(tk.Toplevel):
A window that displays real-time analysis of tracking performance. A window that displays real-time analysis of tracking performance.
""" """
def __init__(self, master, analyzer: PerformanceAnalyzer, hub: SimulationStateHub): def __init__(self, master, archive_filepath: str):
super().__init__(master) super().__init__(master)
self.title("Performance Analysis") self.title(f"Analysis for: {os.path.basename(archive_filepath)}")
self.geometry("800x600") self.geometry("800x600")
self.transient(master)
if not MATPLOTLIB_AVAILABLE: # State variables
messagebox.showerror( self.selected_target_id = tk.IntVar(value=0)
"Dependency Missing", self._active = True
"Matplotlib is required for the analysis window. Please install it (`pip install matplotlib`).",
parent=self, # Carica i dati e inizializza l'analizzatore
) self._load_data_and_setup(archive_filepath)
# ... il resto del codice di creazione widget rimane simile ...
self._create_widgets()
# Non c'è più un loop, ma un singolo aggiornamento
self._populate_analysis()
def _load_data_and_setup(self, filepath: str):
try:
with open(filepath, 'r', encoding='utf-8') as f:
archive_data = json.load(f)
except Exception as e:
messagebox.showerror("Errore di Caricamento", f"Impossibile caricare il file di archivio.\n{e}")
self.destroy() self.destroy()
return return
self._analyzer = analyzer # Crea un hub temporaneo e popolalo con i dati storici
self._hub = hub self._hub = SimulationStateHub()
self._active = True results = archive_data.get("simulation_results", {})
for target_id_str, data in results.items():
target_id = int(target_id_str)
for state in data.get("simulated", []):
self._hub.add_simulated_state(target_id, state[0], tuple(state[1:]))
for state in data.get("real", []):
self._hub.add_real_state(target_id, state[0], tuple(state[1:]))
# Cache last displayed state so we can preserve it between simulations. # Crea l'analizzatore con l'hub popolato
# If no new analysis data is available (e.g. simulation stopped), we self._analyzer = PerformanceAnalyzer(self._hub)
# will keep showing these values until a new simulation populates the hub.
self._last_displayed_target_id = None
self._has_last_values = False
self.selected_target_id = tk.IntVar() def _populate_analysis(self):
"""Esegue l'analisi e popola i widget una sola volta."""
self._update_target_selector() # Ora usa l'hub locale
self._create_widgets() # Seleziona il primo target di default
self._update_loop() target_ids = self.target_selector["values"]
if target_ids:
self.selected_target_id.set(target_ids[0])
self.protocol("WM_DELETE_WINDOW", self._on_close) analysis_results = self._analyzer.analyze()
sel_id = self.selected_target_id.get()
if sel_id in analysis_results:
self._update_stats_table(analysis_results[sel_id])
self._update_plot(sel_id)
else:
self._clear_views()
def _create_widgets(self): def _create_widgets(self):
main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL) main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
@ -65,17 +93,29 @@ class AnalysisWindow(tk.Toplevel):
# --- Top Frame for Stats Table --- # --- Top Frame for Stats Table ---
stats_frame = ttk.LabelFrame(main_pane, text="Error Statistics (feet)") stats_frame = ttk.LabelFrame(main_pane, text="Error Statistics (feet)")
# Keep stats frame compact so the plot below has more space
main_pane.add(stats_frame, weight=1) main_pane.add(stats_frame, weight=1)
self._create_stats_widgets(stats_frame) self._create_stats_widgets(stats_frame)
# --- Bottom Frame for Plot --- # --- Bottom Frame for Plot ---
plot_frame = ttk.LabelFrame(main_pane, text="Error Over Time (feet)") plot_frame = ttk.LabelFrame(main_pane, text="Error Over Time (feet)")
main_pane.add(plot_frame, weight=3) # Give the plot more vertical weight so it occupies most of the window
main_pane.add(plot_frame, weight=4)
self._create_plot_widgets(plot_frame) self._create_plot_widgets(plot_frame)
def _create_stats_widgets(self, parent): def _create_stats_widgets(self, parent):
top_bar = ttk.Frame(parent) # Build a horizontal area: left = table, right = legend/explanations
top_bar.pack(fill=tk.X, padx=5, pady=5) container = ttk.Frame(parent)
container.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
left = ttk.Frame(container)
left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
right = ttk.Frame(container)
right.pack(side=tk.RIGHT, fill=tk.Y)
top_bar = ttk.Frame(left)
top_bar.pack(fill=tk.X, padx=0, pady=(0, 6))
ttk.Label(top_bar, text="Select Target ID:").pack(side=tk.LEFT) ttk.Label(top_bar, text="Select Target ID:").pack(side=tk.LEFT)
self.target_selector = ttk.Combobox( self.target_selector = ttk.Combobox(
@ -85,7 +125,7 @@ class AnalysisWindow(tk.Toplevel):
self.target_selector.bind("<<ComboboxSelected>>", self._on_target_select) self.target_selector.bind("<<ComboboxSelected>>", self._on_target_select)
columns = ("metric", "x_error", "y_error", "z_error") columns = ("metric", "x_error", "y_error", "z_error")
self.stats_tree = ttk.Treeview(parent, columns=columns, show="headings") self.stats_tree = ttk.Treeview(left, columns=columns, show="headings")
self.stats_tree.heading("metric", text="Metric") self.stats_tree.heading("metric", text="Metric")
self.stats_tree.heading("x_error", text="Error X") self.stats_tree.heading("x_error", text="Error X")
@ -93,11 +133,27 @@ class AnalysisWindow(tk.Toplevel):
self.stats_tree.heading("z_error", text="Error Z") self.stats_tree.heading("z_error", text="Error Z")
self.stats_tree.column("metric", width=120, anchor=tk.W) self.stats_tree.column("metric", width=120, anchor=tk.W)
self.stats_tree.column("x_error", anchor=tk.E) self.stats_tree.column("x_error", anchor=tk.E, width=120)
self.stats_tree.column("y_error", anchor=tk.E) self.stats_tree.column("y_error", anchor=tk.E, width=120)
self.stats_tree.column("z_error", anchor=tk.E) self.stats_tree.column("z_error", anchor=tk.E, width=120)
self.stats_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) self.stats_tree.pack(fill=tk.BOTH, expand=True)
# Right side: explanatory legend box (compact)
legend_title = ttk.Label(right, text="How to read these results:", font=(None, 9, "bold"))
legend_title.pack(anchor=tk.NW, padx=(6, 6), pady=(4, 2))
legend_text = (
"mean = average error\nstd_dev = standard deviation\nrmse = root-mean-square error\n\n"
"Sign convention: value = real - simulated.\n"
"> 0 => real is ahead of simulated.\n< 0 => real lags simulated.\n\nUnits: feet"
)
try:
ttk.Label(right, text=legend_text, foreground="gray", justify=tk.LEFT, wraplength=260).pack(
anchor=tk.NW, padx=(6, 6)
)
except Exception:
pass
def _create_plot_widgets(self, parent): def _create_plot_widgets(self, parent):
fig = Figure(figsize=(5, 3), dpi=100) fig = Figure(figsize=(5, 3), dpi=100)
@ -111,74 +167,20 @@ class AnalysisWindow(tk.Toplevel):
(self.line_y,) = self.ax.plot([], [], lw=2, label="Error Y") (self.line_y,) = self.ax.plot([], [], lw=2, label="Error Y")
(self.line_z,) = self.ax.plot([], [], lw=2, label="Error Z") (self.line_z,) = self.ax.plot([], [], lw=2, label="Error Z")
self.ax.legend() # Place legend outside the axes to keep plot area clear
self.ax.grid(True) try:
self.ax.grid(True)
# horizontal zero line for reference
self.ax.axhline(0.0, color="black", lw=1, linestyle="--", alpha=0.8)
self.ax.legend(loc="upper left", bbox_to_anchor=(1.02, 1), fontsize=9)
except Exception:
pass
fig.tight_layout() fig.tight_layout()
self.canvas = FigureCanvasTkAgg(fig, master=parent) self.canvas = FigureCanvasTkAgg(fig, master=parent)
self.canvas.draw() self.canvas.draw()
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def _update_loop(self):
if not self._active:
return
self._update_target_selector()
# If the simulation is not running, do not process new analysis data.
# Keep showing the last cached values (if any) until a simulation is
# actively running again.
try:
running = False
if hasattr(self.master, "is_simulation_running"):
try:
running = bool(self.master.is_simulation_running.get())
except Exception:
running = False
except Exception:
running = False
sel = self.selected_target_id.get()
if not running:
# Simulation not running: do not call analyze or update plots.
# Preserve last displayed values to avoid UI flicker/clearing.
# However, if there are no cached values, show a message in the
# stats table to indicate data will appear when simulation runs.
if not self._has_last_values:
self.stats_tree.delete(*self.stats_tree.get_children())
self.stats_tree.insert(
"",
"end",
values=("No Data", "-", "-", "-"),
)
self.after(UPDATE_INTERVAL_MS, self._update_loop)
return
# When running, perform live analysis and update views.
analysis_results = self._analyzer.analyze()
if sel in analysis_results:
# We have fresh analysis for the selected target: update and cache
self._update_stats_table(analysis_results[sel])
self._update_plot(sel)
self._last_displayed_target_id = sel
self._has_last_values = True
else:
# No fresh analysis for selected target. Decide whether to clear or
# keep the last displayed values. If the hub currently reports any
# active real targets, it means data should be cleared for missing
# selection; otherwise preserve.
try:
if self._hub and hasattr(self._hub, "has_active_real_targets") and self._hub.has_active_real_targets():
self._clear_views()
self._has_last_values = False
else:
pass
except Exception:
self._clear_views()
self._has_last_values = False
self.after(UPDATE_INTERVAL_MS, self._update_loop)
def _update_target_selector(self): def _update_target_selector(self):
# Only update the combobox values when the hub reports target ids. # Only update the combobox values when the hub reports target ids.
@ -201,51 +203,6 @@ class AnalysisWindow(tk.Toplevel):
# This preserves the user's last view after a simulation ends. # This preserves the user's last view after a simulation ends.
pass pass
def _on_target_select(self, event=None):
# Trigger an immediate update when user changes selection
# Only perform an immediate analysis if the simulation is running.
running = False
try:
if hasattr(self.master, "is_simulation_running"):
running = bool(self.master.is_simulation_running.get())
except Exception:
running = False
sel = self.selected_target_id.get()
if not running:
# If we have cached values for this selection, keep them; otherwise
# show a placeholder and don't call the analyzer.
if self._has_last_values and self._last_displayed_target_id == sel:
return
else:
self.stats_tree.delete(*self.stats_tree.get_children())
self.stats_tree.insert(
"",
"end",
values=("No Data", "-", "-", "-"),
)
self._has_last_values = False
return
# Simulation running -> perform analysis for selection
analysis_results = self._analyzer.analyze()
if sel in analysis_results:
self._update_stats_table(analysis_results[sel])
self._update_plot(sel)
self._last_displayed_target_id = sel
self._has_last_values = True
else:
try:
if self._hub and hasattr(self._hub, "has_active_real_targets") and self._hub.has_active_real_targets():
self._clear_views()
self._has_last_values = False
else:
pass
except Exception:
self._clear_views()
self._has_last_values = False
def _update_stats_table(self, results: Dict): def _update_stats_table(self, results: Dict):
self.stats_tree.delete(*self.stats_tree.get_children()) self.stats_tree.delete(*self.stats_tree.get_children())
@ -309,3 +266,16 @@ class AnalysisWindow(tk.Toplevel):
def _on_close(self): def _on_close(self):
self._active = False self._active = False
self.destroy() self.destroy()
def _on_target_select(self, event=None):
"""Handle combobox selection changes and update stats/plot."""
try:
sel = self.selected_target_id.get()
analysis_results = self._analyzer.analyze()
if sel in analysis_results:
self._update_stats_table(analysis_results[sel])
self._update_plot(sel)
else:
self._clear_views()
except Exception:
pass

View File

@ -9,6 +9,9 @@ from queue import Queue, Empty
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
import time import time
import math import math
import os
import json
from datetime import datetime
# Use absolute imports for robustness and clarity # Use absolute imports for robustness and clarity
from target_simulator.gui.ppi_display import PPIDisplay from target_simulator.gui.ppi_display import PPIDisplay
@ -32,6 +35,7 @@ from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
from target_simulator.gui.analysis_window import AnalysisWindow from target_simulator.gui.analysis_window import AnalysisWindow
from target_simulator.core import command_builder from target_simulator.core import command_builder
from target_simulator.analysis.simulation_archive import SimulationArchive
GUI_QUEUE_POLL_INTERVAL_MS = 100 GUI_QUEUE_POLL_INTERVAL_MS = 100
@ -46,6 +50,8 @@ class MainView(tk.Tk):
self.logger = get_logger(__name__) self.logger = get_logger(__name__)
self.config_manager = ConfigManager() self.config_manager = ConfigManager()
self.current_archive: Optional[SimulationArchive] = None
# --- Load Settings --- # --- Load Settings ---
settings = self.config_manager.get_general_settings() settings = self.config_manager.get_general_settings()
self.scan_limit = settings.get("scan_limit", 60) self.scan_limit = settings.get("scan_limit", 60)
@ -376,6 +382,11 @@ class MainView(tk.Tk):
) )
send_lru_button.pack(side=tk.RIGHT) send_lru_button.pack(side=tk.RIGHT)
# --- TAB 4: Analysis ---
analysis_tab = ttk.Frame(left_notebook)
left_notebook.add(analysis_tab, text="Analysis")
self._create_analysis_tab_widgets(analysis_tab) # Nuovo metodo
# --- Bottom Pane (Logs) --- # --- Bottom Pane (Logs) ---
log_frame_container = ttk.LabelFrame(v_pane, text="Logs") log_frame_container = ttk.LabelFrame(v_pane, text="Logs")
v_pane.add(log_frame_container, weight=1) v_pane.add(log_frame_container, weight=1)
@ -383,6 +394,75 @@ class MainView(tk.Tk):
log_frame_container, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9) log_frame_container, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9)
) )
self.log_text_widget.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) self.log_text_widget.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Ensure the log pane starts smaller so the rate/status bar remains visible
# on lower-resolution screens. We compute the window geometry after
# idle tasks have run (so sizes are realistic) and then move the sash
# so the Logs pane has a small fixed minimum height (~80px) or ~18%.
try:
def _shrink_log_pane_once(event=None):
# Run only once
if getattr(self, "_log_pane_shrunk", False):
return
try:
# Ensure geometry/layout is updated
try:
self.update_idletasks()
except Exception:
pass
# Prefer the paned window's current height if available
total_h = v_pane.winfo_height() or self.winfo_height() or 800
# Determine desired log pane height (min 80px, or ~18% of window)
desired_log_h = max(80, int(total_h * 0.18))
# Compute sash position so that bottom pane (logs) has desired_log_h
pos = max(40, int(total_h - desired_log_h))
# Apply sash position (index 0 for the only sash in vertical pane)
try:
v_pane.sashpos(0, pos)
except Exception:
# Some platforms may not support sashpos until fully realized;
# ignore and rely on later Configure event.
pass
# Mark as done so we don't repeatedly adjust
setattr(self, "_log_pane_shrunk", True)
except Exception:
pass
# Try shortly after init (gives Tk time to compute geometry)
self.after(150, _shrink_log_pane_once)
# Also bind to a single Configure event in case geometry wasn't ready
def _on_config_once(ev):
_shrink_log_pane_once()
try:
v_pane.unbind("<Configure>", onconf_id)
except Exception:
pass
onconf_id = v_pane.bind("<Configure>", _on_config_once)
except Exception:
pass
def _create_analysis_tab_widgets(self, parent):
self.analysis_tree = ttk.Treeview(parent, columns=("datetime", "scenario", "duration"), show="headings")
self.analysis_tree.heading("datetime", text="Date/Time")
self.analysis_tree.heading("scenario", text="Scenario Name")
self.analysis_tree.heading("duration", text="Duration (s)")
self.analysis_tree.column("datetime", width=150)
self.analysis_tree.column("scenario", width=200)
self.analysis_tree.column("duration", width=80, anchor=tk.E)
self.analysis_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
btn_frame = ttk.Frame(parent)
btn_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Button(btn_frame, text="Refresh List", command=self._refresh_analysis_list).pack(side=tk.LEFT)
ttk.Button(btn_frame, text="Analyze Selected", command=self._on_analyze_run).pack(side=tk.RIGHT)
# Popola la lista all'avvio
self.after(100, self._refresh_analysis_list)
def _create_menubar(self): def _create_menubar(self):
menubar = tk.Menu(self) menubar = tk.Menu(self)
@ -876,6 +956,13 @@ class MainView(tk.Tk):
pass pass
self._update_simulation_progress_display() self._update_simulation_progress_display()
self.current_archive = SimulationArchive(self.scenario)
self.simulation_engine.archive = self.current_archive
if self.target_communicator and hasattr(self.target_communicator, "router"):
router = self.target_communicator.router()
if router:
router.set_archive(self.current_archive)
self.simulation_engine.start() self.simulation_engine.start()
# Set running state and update buttons AFTER starting the thread # Set running state and update buttons AFTER starting the thread
@ -886,6 +973,15 @@ class MainView(tk.Tk):
if not self.is_simulation_running.get() or not self.simulation_engine: if not self.is_simulation_running.get() or not self.simulation_engine:
return return
if self.current_archive:
self.current_archive.save()
self.current_archive = None
if self.target_communicator and hasattr(self.target_communicator, "router"):
router = self.target_communicator.router()
if router:
router.set_archive(None) # Disattiva l'archiviazione
self._refresh_analysis_list() # Aggiorna subito la lista
self.logger.info("Stopping live simulation (user request)...") self.logger.info("Stopping live simulation (user request)...")
try: try:
self.simulation_engine.stop() self.simulation_engine.stop()
@ -915,6 +1011,15 @@ class MainView(tk.Tk):
"""Handle the natural end-of-simulation event.""" """Handle the natural end-of-simulation event."""
self.logger.info("Handling simulation finished (engine signalled completion).") self.logger.info("Handling simulation finished (engine signalled completion).")
if self.current_archive:
self.current_archive.save()
self.current_archive = None
if self.target_communicator and hasattr(self.target_communicator, "router"):
router = self.target_communicator.router()
if router:
router.set_archive(None) # Disattiva l'archiviazione
self._refresh_analysis_list() # Aggiorna subito la lista
if self.simulation_engine and self.simulation_engine.is_running(): if self.simulation_engine and self.simulation_engine.is_running():
try: try:
self.simulation_engine.stop() self.simulation_engine.stop()
@ -1643,3 +1748,46 @@ class MainView(tk.Tk):
# Reschedule the next refresh cycle # Reschedule the next refresh cycle
self.after(GUI_REFRESH_RATE_MS, self._gui_refresh_loop) self.after(GUI_REFRESH_RATE_MS, self._gui_refresh_loop)
def _refresh_analysis_list(self):
self.analysis_tree.delete(*self.analysis_tree.get_children())
archive_folder = SimulationArchive.ARCHIVE_FOLDER
if not os.path.exists(archive_folder):
return
runs = []
for filename in os.listdir(archive_folder):
if filename.endswith(".json"):
filepath = os.path.join(archive_folder, filename)
try:
# Leggiamo solo i metadati per non caricare tutto il file
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
metadata = data.get("metadata", {})
# Usiamo il timestamp del nome del file per l'ordinamento
dt_str = filename.split('_')[0]
run_info = {
"datetime": datetime.strptime(dt_str, "%Y%m%d").strftime("%Y-%m-%d") + " " + filename.split('_')[1].replace(".json",""),
"scenario": metadata.get("scenario_name", "N/A"),
"duration": f"{metadata.get('duration_seconds', 0):.1f}",
"filepath": filepath
}
runs.append(run_info)
except Exception as e:
self.logger.warning(f"Impossibile leggere l'archivio {filename}: {e}")
# Ordina dal più recente al più vecchio
for run in sorted(runs, key=lambda r: r['datetime'], reverse=True):
self.analysis_tree.insert("", tk.END, values=(run['datetime'], run['scenario'], run['duration']), iid=run['filepath'])
def _on_analyze_run(self):
selected_item = self.analysis_tree.focus()
if not selected_item:
messagebox.showinfo("Nessuna Selezione", "Seleziona una simulazione da analizzare.")
return
archive_filepath = selected_item # L'IID è il filepath
# Apri la finestra di analisi passando il percorso del file
# (dovremo modificare AnalysisWindow per accettarlo)
AnalysisWindow(self, archive_filepath=archive_filepath)

View File

@ -40,6 +40,7 @@ class DebugPayloadRouter:
self, self,
simulation_hub: Optional[SimulationStateHub] = None, simulation_hub: Optional[SimulationStateHub] = None,
): ):
self.active_archive = None
self._log_prefix = "[DebugPayloadRouter]" self._log_prefix = "[DebugPayloadRouter]"
self._lock = threading.Lock() self._lock = threading.Lock()
self._latest_payloads: Dict[str, Any] = {} self._latest_payloads: Dict[str, Any] = {}
@ -75,6 +76,11 @@ class DebugPayloadRouter:
) )
self._logger = logger self._logger = logger
def set_archive(self, archive):
"""Imposta la sessione di archivio corrente per la registrazione."""
with self._lock:
self.active_archive = archive
def add_ris_target_listener(self, listener: TargetListListener): def add_ris_target_listener(self, listener: TargetListListener):
"""Registers a callback function to receive updates for real targets.""" """Registers a callback function to receive updates for real targets."""
with self._lock: with self._lock:
@ -226,6 +232,23 @@ class DebugPayloadRouter:
"DebugPayloadRouter: Failed to process RIS for Hub." "DebugPayloadRouter: Failed to process RIS for Hub."
) )
with self._lock:
archive = self.active_archive
if archive:
reception_timestamp = time.monotonic()
for target in real_targets:
state_tuple = (
getattr(target, "_pos_x_ft", 0.0),
getattr(target, "_pos_y_ft", 0.0),
getattr(target, "_pos_z_ft", 0.0),
)
archive.add_real_state(
target_id=target.target_id,
timestamp=reception_timestamp,
state=state_tuple
)
# --- BROADCAST to all registered listeners --- # --- BROADCAST to all registered listeners ---
with self._lock: with self._lock:
for listener in self._ris_target_listeners: for listener in self._ris_target_listeners: