diff --git a/target_simulator/analysis/simulation_archive.py b/target_simulator/analysis/simulation_archive.py index 6c64a3c..8caa30c 100644 --- a/target_simulator/analysis/simulation_archive.py +++ b/target_simulator/analysis/simulation_archive.py @@ -4,7 +4,7 @@ import os import json import time from datetime import datetime -from typing import Dict, List, Any, Tuple +from typing import Dict, List, Any, Tuple, Optional from target_simulator.core.models import Scenario @@ -62,22 +62,32 @@ class SimulationArchive: full_state: RecordedState = (timestamp, state[0], state[1], state[2]) self.recorded_data[target_id]["real"].append(full_state) - def save(self) -> str: + def save(self, extra_metadata: Optional[Dict[str, Any]] = None) -> str: """ - Salva l'archivio completo della simulazione in un file JSON. - Il nome del file è generato dal timestamp e dal nome dello scenario. + Saves the complete simulation archive to a JSON file. + The filename is generated from the timestamp and scenario name. - Ritorna: - Il percorso del file salvato. + Args: + extra_metadata: An optional dictionary of metadata to add or + overwrite in the final archive file. + + Returns: + The path of the saved file. """ end_time = time.monotonic() + metadata = { + "scenario_name": self.scenario_name, + "start_timestamp_utc": datetime.utcnow().isoformat(), + "duration_seconds": end_time - self.start_time, + } + + # Merge extra metadata if provided + if extra_metadata: + metadata.update(extra_metadata) + archive_content = { - "metadata": { - "scenario_name": self.scenario_name, - "start_timestamp_utc": datetime.utcnow().isoformat(), - "duration_seconds": end_time - self.start_time, - }, + "metadata": metadata, "scenario_definition": self.scenario_data, "simulation_results": self.recorded_data, } @@ -92,8 +102,8 @@ class SimulationArchive: try: with open(filepath, "w", encoding="utf-8") as f: json.dump(archive_content, f, indent=4) - print(f"Archivio di simulazione salvato in: {filepath}") + print(f"Simulation archive saved to: {filepath}") return filepath except IOError as e: - print(f"Errore durante il salvataggio dell'archivio di simulazione: {e}") + print(f"Error saving simulation archive: {e}") return "" diff --git a/target_simulator/core/simulation_engine.py b/target_simulator/core/simulation_engine.py index 52fbba8..8a365fe 100644 --- a/target_simulator/core/simulation_engine.py +++ b/target_simulator/core/simulation_engine.py @@ -6,6 +6,7 @@ broadcast target states, supporting different operational modes. """ import threading import time +import copy from queue import Queue from typing import Optional @@ -52,6 +53,7 @@ class SimulationEngine(threading.Thread): self.archive = archive # Archive path if needed self.time_multiplier = 1.0 self.update_interval_s = 1.0 + self.prediction_horizon_s = 0.0 # Latency compensation in seconds # Determine communication protocol from the communicator's config self.use_json_protocol = False @@ -68,6 +70,14 @@ class SimulationEngine(threading.Thread): self._last_tick_time = 0.0 self._last_update_time = 0.0 self._is_paused = False + + def set_prediction_horizon(self, horizon_s: float): + """ + Sets the prediction horizon for latency compensation. + This should be the estimated one-way client-to-server latency. + """ + # Set a reasonable cap to avoid excessive prediction + self.prediction_horizon_s = max(0.0, min(horizon_s, 1.0)) def load_scenario(self, scenario: Scenario): """Loads a new scenario into the engine and resets its simulation state.""" @@ -197,16 +207,30 @@ class SimulationEngine(threading.Thread): if self.communicator and self.communicator.is_open: commands_to_send = [] + # Create a list of targets to be sent, potentially predicted + targets_to_send = [] + if self.prediction_horizon_s > 0.0 and active_targets: + # Apply prediction + for target in active_targets: + # Create a deep copy to avoid altering the main simulation state + predicted_target = copy.deepcopy(target) + # Advance its state by the prediction horizon + predicted_target.update_state(self.prediction_horizon_s) + targets_to_send.append(predicted_target) + else: + # No prediction, use current state + targets_to_send = active_targets + # --- Protocol-dependent command generation --- if self.use_json_protocol: # --- JSON Protocol Logic --- - if active_targets: + if targets_to_send: # Usa la lista (potenzialmente predetta) json_payload = command_builder.build_json_update( - active_targets + targets_to_send ) commands_to_send.append(json_payload) - # Log to CSV for debugging + # Log to CSV for debugging (log the original state) for target in active_targets: state_tuple = ( getattr(target, "_pos_x_ft", 0.0), @@ -223,24 +247,27 @@ class SimulationEngine(threading.Thread): ) else: # --- Legacy Protocol Logic --- - for target in active_targets: + for target in targets_to_send: # Usa la lista (potenzialmente predetta) cmd = command_builder.build_tgtset_from_target_state(target) commands_to_send.append(cmd) - # Log to CSV for debugging - state_tuple = ( - getattr(target, "_pos_x_ft", 0.0), - getattr(target, "_pos_y_ft", 0.0), - getattr(target, "_pos_z_ft", 0.0), - ) - append_sent_position( - tick_timestamp, - target.target_id, - state_tuple[0], - state_tuple[1], - state_tuple[2], - cmd, - ) + # Log to CSV for debugging (log the original state) + # Find the original target corresponding to the predicted one + original_target = next((t for t in active_targets if t.target_id == target.target_id), None) + if original_target: + state_tuple = ( + getattr(original_target, "_pos_x_ft", 0.0), + getattr(original_target, "_pos_y_ft", 0.0), + getattr(original_target, "_pos_z_ft", 0.0), + ) + append_sent_position( + tick_timestamp, + original_target.target_id, + state_tuple[0], + state_tuple[1], + state_tuple[2], + cmd, + ) # --- Send the batch of commands --- if commands_to_send: diff --git a/target_simulator/gui/analysis_window.py b/target_simulator/gui/analysis_window.py index 1bff084..a5a127c 100644 --- a/target_simulator/gui/analysis_window.py +++ b/target_simulator/gui/analysis_window.py @@ -53,13 +53,19 @@ class AnalysisWindow(tk.Toplevel): archive_data = json.load(f) except Exception as e: messagebox.showerror( - "Errore di Caricamento", - f"Impossibile caricare il file di archivio.\n{e}", + "Loading Error", + f"Could not load archive file.\n{e}", ) self.destroy() return - # Crea un hub temporaneo e popolalo con i dati storici + # --- NUOVA AGGIUNTA --- + # Extract estimated latency from metadata + metadata = archive_data.get("metadata", {}) + self.estimated_latency_ms = metadata.get("estimated_latency_ms") + # --- FINE AGGIUNTA --- + + # Create a temporary hub and populate it with historical data self._hub = SimulationStateHub() results = archive_data.get("simulation_results", {}) for target_id_str, data in results.items(): @@ -69,7 +75,7 @@ class AnalysisWindow(tk.Toplevel): for state in data.get("real", []): self._hub.add_real_state(target_id, state[0], tuple(state[1:])) - # Crea l'analizzatore con l'hub popolato + # Create the analyzer with the populated hub self._analyzer = PerformanceAnalyzer(self._hub) def _populate_analysis(self): @@ -126,6 +132,17 @@ class AnalysisWindow(tk.Toplevel): ) self.target_selector.pack(side=tk.LEFT, padx=5) self.target_selector.bind("<>", self._on_target_select) + + if self.estimated_latency_ms is not None: + latency_frame = ttk.Frame(top_bar) + latency_frame.pack(side=tk.LEFT, padx=(20, 0)) + ttk.Label(latency_frame, text="Avg. Latency:").pack(side=tk.LEFT) + ttk.Label( + latency_frame, + text=f"{self.estimated_latency_ms:.1f} ms", + font=("Segoe UI", 9, "bold"), + foreground="blue" + ).pack(side=tk.LEFT, padx=4) columns = ("metric", "x_error", "y_error", "z_error") self.stats_tree = ttk.Treeview(left, columns=columns, show="headings") diff --git a/target_simulator/gui/main_view.py b/target_simulator/gui/main_view.py index 0eeeeed..99542b7 100644 --- a/target_simulator/gui/main_view.py +++ b/target_simulator/gui/main_view.py @@ -130,6 +130,7 @@ class MainView(tk.Tk): self.after(GUI_REFRESH_RATE_MS, self._gui_refresh_loop) self.after(1000, self._update_rate_status) + self.after(1000, self._update_latency_status) def _create_main_layout(self): v_pane = ttk.PanedWindow(self, orient=tk.VERTICAL) @@ -280,6 +281,7 @@ class MainView(tk.Tk): self.status_bar.place(relx=0.0, rely=1.0, anchor="sw", relwidth=1.0, height=24) self.status_var = self.status_bar.status_var self.rate_status_var = self.status_bar.rate_status_var + self.latency_status_var = self.status_bar.latency_status_var self._status_after_id = None def show_status_message(self, text: str, timeout_ms: Optional[int] = 3000): @@ -424,6 +426,32 @@ class MainView(tk.Tk): self.logger.debug(f"Error updating rate status: {e}") finally: self.after(1000, self._update_rate_status) + + def _update_latency_status(self): + """Periodically updates the latency display and prediction horizon.""" + try: + latency_s = 0.0 + if self.target_communicator and hasattr(self.target_communicator, 'router'): + router = self.target_communicator.router() + if router: + latency_s = router.get_estimated_latency_s() + + # Update the status bar display + if self.latency_status_var: + if latency_s > 0: + self.latency_status_var.set(f"Latency: {latency_s * 1000:.1f} ms") + else: + self.latency_status_var.set("") # Pulisce se non c'è latenza + + # Update the simulation engine's prediction horizon if it's running + if self.simulation_engine and self.simulation_engine.is_running(): + self.simulation_engine.set_prediction_horizon(latency_s) + + except Exception as e: + self.logger.debug(f"Error updating latency status: {e}") + finally: + # Schedule the next update + self.after(1000, self._update_latency_status) def _on_seek(self): if not self.simulation_engine or not self.simulation_engine.scenario: diff --git a/target_simulator/gui/payload_router.py b/target_simulator/gui/payload_router.py index bc6870f..41f8555 100644 --- a/target_simulator/gui/payload_router.py +++ b/target_simulator/gui/payload_router.py @@ -389,3 +389,14 @@ class DebugPayloadRouter: def set_persist(self, enabled: bool): with self._lock: self._persist = bool(enabled) + + def get_estimated_latency_s(self) -> float: + """ + Returns the estimated one-way server-to-client network latency. + + Returns: + The estimated latency in seconds, or 0.0 if not available. + """ + if hasattr(self, '_clock_synchronizer') and self._clock_synchronizer: + return self._clock_synchronizer.get_average_latency_s() + return 0.0 diff --git a/target_simulator/gui/status_bar.py b/target_simulator/gui/status_bar.py index fce3c9c..3151a9f 100644 --- a/target_simulator/gui/status_bar.py +++ b/target_simulator/gui/status_bar.py @@ -31,9 +31,7 @@ class StatusBar(ttk.Frame): def __init__(self, parent, resource_poll_s: float = 1.0, height: int = 24): super().__init__(parent, relief=tk.SUNKEN) - # Keep the status bar a fixed small height so it remains visible on - # vertically-constrained windows. Prevent children from forcing the - # frame's size. + # Keep the status bar a fixed small height try: self.configure(height=int(height)) self.pack_propagate(False) @@ -55,33 +53,45 @@ class StatusBar(ttk.Frame): self.status_var = tk.StringVar(value="Ready") ttk.Label(self, textvariable=self.status_var, anchor=tk.W).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6) - # Right: rate and resource indicators - try: - self.rate_status_var = tk.StringVar(value="") - ttk.Label(self, textvariable=self.rate_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(6, 8)) - except Exception: - self.rate_status_var = None - - # Resource usage (optional). We create the var even if psutil missing so - # callers can safely call getattr(..., 'resource_var', None). + # Right: rate, latency, and resource indicators try: + # Resource usage (optional) - pack this first to appear on the far right self.resource_var = tk.StringVar(value="") ttk.Label(self, textvariable=self.resource_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(6, 8)) except Exception: self.resource_var = None + # Separator before latency + ttk.Separator(self, orient=tk.VERTICAL).pack(side=tk.RIGHT, fill=tk.Y, padx=5, pady=4) + + try: + # Latency indicator + self.latency_status_var = tk.StringVar(value="") + ttk.Label(self, textvariable=self.latency_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(0, 6)) + except Exception: + self.latency_status_var = None + + # Separator before rate + ttk.Separator(self, orient=tk.VERTICAL).pack(side=tk.RIGHT, fill=tk.Y, padx=5, pady=4) + + try: + self.rate_status_var = tk.StringVar(value="") + ttk.Label(self, textvariable=self.rate_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(0, 6)) + except Exception: + self.rate_status_var = None + + # Internal state self._status_after_id: Optional[str] = None self._res_stop_event = threading.Event() self._res_thread: Optional[threading.Thread] = None self._resource_poll_s = float(resource_poll_s) - # Start background monitor if psutil is available and we have a var + # Start background monitor if psutil is available if _HAS_PSUTIL and self.resource_var is not None: try: self.start_resource_monitor(self._resource_poll_s) except Exception: - # Don't fail construction if monitor can't start pass def _draw_status_indicator(self, canvas: tk.Canvas, color: str) -> None: diff --git a/target_simulator/simulation/simulation_controller.py b/target_simulator/simulation/simulation_controller.py index c604f2a..72b52f4 100644 --- a/target_simulator/simulation/simulation_controller.py +++ b/target_simulator/simulation/simulation_controller.py @@ -172,7 +172,24 @@ class SimulationController: def _stop_or_finish_simulation(self, main_view, was_stopped_by_user: bool): """Unified logic for handling simulation end, either by user or naturally.""" if self.current_archive: - self.current_archive.save() + # --- NUOVA AGGIUNTA INIZIO --- + # Retrieve estimated latency before saving the archive + estimated_latency_s = 0.0 + extra_metadata = {} + try: + target_comm = getattr(self.communicator_manager, "target_communicator", None) + if target_comm and hasattr(target_comm, 'router'): + router = target_comm.router() + if router and hasattr(router, 'get_estimated_latency_s'): + estimated_latency_s = router.get_estimated_latency_s() + + if estimated_latency_s > 0: + extra_metadata['estimated_latency_ms'] = round(estimated_latency_s * 1000, 2) + except Exception as e: + self.logger.warning(f"Could not retrieve estimated latency for archive: {e}") + # --- NUOVA AGGIUNTA FINE --- + + self.current_archive.save(extra_metadata=extra_metadata) self.current_archive = main_view.current_archive = None target_comm = getattr(self.communicator_manager, "target_communicator", None) diff --git a/target_simulator/utils/clock_synchronizer.py b/target_simulator/utils/clock_synchronizer.py index 3452529..62d5504 100644 --- a/target_simulator/utils/clock_synchronizer.py +++ b/target_simulator/utils/clock_synchronizer.py @@ -1,5 +1,3 @@ -# target_simulator/utils/clock_synchronizer.py - """ Provides a ClockSynchronizer class to model the relationship between a remote server's wrapping 32-bit timetag and the local monotonic clock. @@ -8,22 +6,18 @@ import collections import threading import time from typing import List, Tuple - -# NumPy is a strong recommendation for linear regression. -# If it's not already a dependency, it should be added. +#NumPy is a strong recommendation for linear regression. +#If it's not already a dependency, it should be added. try: import numpy as np NUMPY_AVAILABLE = True except ImportError: NUMPY_AVAILABLE = False - - class ClockSynchronizer: """ Synchronizes a remote wrapping 32-bit counter with the local monotonic clock using linear regression to model clock offset and drift. """ - # Constants for a 32-bit counter _COUNTER_MAX = 2**32 _WRAP_THRESHOLD = 2**31 # Detect wrap if decrease is > half the max value @@ -35,7 +29,7 @@ class ClockSynchronizer: Args: history_size: The number of recent samples to use for regression. min_samples_for_fit: The minimum number of samples required to - perform a linear regression fit. + perform a linear regression fit. """ if not NUMPY_AVAILABLE: raise ImportError("NumPy is required for the ClockSynchronizer.") @@ -52,6 +46,9 @@ class ClockSynchronizer: self._m: float = 0.0 # Slope (client seconds per server tick) self._b: float = 0.0 # Intercept (client time when server time was 0) + # Estimated one-way latency from server to client + self._average_latency_s: float = 0.0 + def add_sample(self, raw_server_timetag: int, client_reception_time: float): """ Adds a new sample pair to update the synchronization model. @@ -81,7 +78,7 @@ class ClockSynchronizer: def _update_model(self): """ Performs linear regression on the stored history to update the - model parameters (m and b). + model parameters (m and b) and the average latency. This method must be called within a locked context. """ if len(self._history) < self._min_samples: @@ -91,14 +88,24 @@ class ClockSynchronizer: x_vals = np.array([sample[0] for sample in self._history]) y_vals = np.array([sample[1] for sample in self._history]) - # Use polyfit to find the slope (m) and intercept (b) of the best-fit line try: m, b = np.polyfit(x_vals, y_vals, 1) self._m = m self._b = b + + # --- Calculate Average Latency --- + # Estimated generation time for each sample based on the new model + estimated_generation_times = self._m * x_vals + self._b + # Latency is the difference between reception and estimated generation + latencies = y_vals - estimated_generation_times + # Update the average latency, filtering out negative values which are artifacts + positive_latencies = latencies[latencies >= 0] + if len(positive_latencies) > 0: + self._average_latency_s = np.mean(positive_latencies) + else: + self._average_latency_s = 0.0 + except np.linalg.LinAlgError: - # This can happen if data is not well-conditioned, though unlikely here. - # In this case, we just keep the old model parameters. pass def to_client_time(self, raw_server_timetag: int) -> float: @@ -113,18 +120,24 @@ class ClockSynchronizer: The estimated client monotonic time when the event occurred. """ with self._lock: - # Determine the correct wrap count for this specific timestamp. - # This handles cases where the timetag might be slightly older - # than the most recent sample. current_wrap_count = self._wrap_count if self._last_raw_timetag is not None: diff = self._last_raw_timetag - raw_server_timetag if diff < -self._WRAP_THRESHOLD: - # This timetag is from just before the last wrap current_wrap_count -= 1 unwrapped_timetag = raw_server_timetag + current_wrap_count * self._COUNTER_MAX - # Apply the linear model estimated_time = self._m * unwrapped_timetag + self._b - return estimated_time \ No newline at end of file + return estimated_time + + def get_average_latency_s(self) -> float: + """ + Returns the current estimated average one-way network latency from + server to client in seconds. + + Returns: + The average latency in seconds, or 0.0 if not yet computed. + """ + with self._lock: + return self._average_latency_s \ No newline at end of file