Measure communication latencies between server and client

VALLONGOL 2025-11-04 13:29:40 +01:00
parent 47eeed88fd
commit 1ebf680064
8 changed files with 202 additions and 69 deletions

View File

@@ -4,7 +4,7 @@ import os
import json
import time
from datetime import datetime
from typing import Dict, List, Any, Tuple
from typing import Dict, List, Any, Tuple, Optional
from target_simulator.core.models import Scenario
@@ -62,22 +62,32 @@ class SimulationArchive:
full_state: RecordedState = (timestamp, state[0], state[1], state[2])
self.recorded_data[target_id]["real"].append(full_state)
def save(self) -> str:
def save(self, extra_metadata: Optional[Dict[str, Any]] = None) -> str:
"""
Salva l'archivio completo della simulazione in un file JSON.
Il nome del file è generato dal timestamp e dal nome dello scenario.
Saves the complete simulation archive to a JSON file.
The filename is generated from the timestamp and scenario name.
Ritorna:
Il percorso del file salvato.
Args:
extra_metadata: An optional dictionary of metadata to add or
overwrite in the final archive file.
Returns:
The path of the saved file.
"""
end_time = time.monotonic()
archive_content = {
"metadata": {
metadata = {
"scenario_name": self.scenario_name,
"start_timestamp_utc": datetime.utcnow().isoformat(),
"duration_seconds": end_time - self.start_time,
},
}
# Merge extra metadata if provided
if extra_metadata:
metadata.update(extra_metadata)
archive_content = {
"metadata": metadata,
"scenario_definition": self.scenario_data,
"simulation_results": self.recorded_data,
}
@@ -92,8 +102,8 @@ class SimulationArchive:
try:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(archive_content, f, indent=4)
print(f"Archivio di simulazione salvato in: {filepath}")
print(f"Simulation archive saved to: {filepath}")
return filepath
except IOError as e:
print(f"Errore durante il salvataggio dell'archivio di simulazione: {e}")
print(f"Error saving simulation archive: {e}")
return ""
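
The merge semantics of the new extra_metadata parameter are those of dict.update: caller-supplied keys are added to the defaults built in save(), and overwrite them on collision. A minimal self-contained sketch of that behavior (merge_metadata is a hypothetical stand-in, not part of the commit):

from typing import Any, Dict, Optional

def merge_metadata(base: Dict[str, Any], extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Mirrors the merge in save(): extra keys add to or overwrite the defaults."""
    merged = dict(base)
    if extra:
        merged.update(extra)  # later keys win on collision
    return merged

# A caller can therefore inject fields such as estimated_latency_ms:
print(merge_metadata(
    {"scenario_name": "demo", "duration_seconds": 12.0},
    {"estimated_latency_ms": 42.7},
))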

View File

@@ -6,6 +6,7 @@ broadcast target states, supporting different operational modes.
"""
import threading
import time
import copy
from queue import Queue
from typing import Optional
@@ -52,6 +53,7 @@ class SimulationEngine(threading.Thread):
self.archive = archive # Archive path if needed
self.time_multiplier = 1.0
self.update_interval_s = 1.0
self.prediction_horizon_s = 0.0 # Latency compensation in seconds
# Determine communication protocol from the communicator's config
self.use_json_protocol = False
@@ -69,6 +71,14 @@ class SimulationEngine(threading.Thread):
self._last_update_time = 0.0
self._is_paused = False
def set_prediction_horizon(self, horizon_s: float):
"""
Sets the prediction horizon for latency compensation.
This should be the estimated one-way client-to-server latency.
"""
# Set a reasonable cap to avoid excessive prediction
self.prediction_horizon_s = max(0.0, min(horizon_s, 1.0))
def load_scenario(self, scenario: Scenario):
"""Loads a new scenario into the engine and resets its simulation state."""
self.logger.info(f"Loading scenario '{scenario.name}' into simulation engine.")
@@ -197,16 +207,30 @@ class SimulationEngine(threading.Thread):
if self.communicator and self.communicator.is_open:
commands_to_send = []
# Create a list of targets to be sent, potentially predicted
targets_to_send = []
if self.prediction_horizon_s > 0.0 and active_targets:
# Apply prediction
for target in active_targets:
# Create a deep copy to avoid altering the main simulation state
predicted_target = copy.deepcopy(target)
# Advance its state by the prediction horizon
predicted_target.update_state(self.prediction_horizon_s)
targets_to_send.append(predicted_target)
else:
# No prediction, use current state
targets_to_send = active_targets
# --- Protocol-dependent command generation ---
if self.use_json_protocol:
# --- JSON Protocol Logic ---
if active_targets:
if targets_to_send: # Use the (potentially predicted) list
json_payload = command_builder.build_json_update(
active_targets
targets_to_send
)
commands_to_send.append(json_payload)
# Log to CSV for debugging
# Log to CSV for debugging (log the original state)
for target in active_targets:
state_tuple = (
getattr(target, "_pos_x_ft", 0.0),
@@ -223,19 +247,22 @@ class SimulationEngine(threading.Thread):
)
else:
# --- Legacy Protocol Logic ---
for target in active_targets:
for target in targets_to_send: # Use the (potentially predicted) list
cmd = command_builder.build_tgtset_from_target_state(target)
commands_to_send.append(cmd)
# Log to CSV for debugging
# Log to CSV for debugging (log the original state)
# Find the original target corresponding to the predicted one
original_target = next((t for t in active_targets if t.target_id == target.target_id), None)
if original_target:
state_tuple = (
getattr(target, "_pos_x_ft", 0.0),
getattr(target, "_pos_y_ft", 0.0),
getattr(target, "_pos_z_ft", 0.0),
getattr(original_target, "_pos_x_ft", 0.0),
getattr(original_target, "_pos_y_ft", 0.0),
getattr(original_target, "_pos_z_ft", 0.0),
)
append_sent_position(
tick_timestamp,
target.target_id,
original_target.target_id,
state_tuple[0],
state_tuple[1],
state_tuple[2],
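
The compensation scheme is plain dead reckoning: each target is deep-copied and the copy is advanced by the prediction horizon before serialization, so the authoritative simulation state is never mutated. A self-contained sketch under the assumption of a constant-velocity target with an update_state(dt) method (_Target is a toy stand-in for the engine's target model):

import copy
from dataclasses import dataclass
from typing import List

@dataclass
class _Target:
    """Toy stand-in for the engine's target model (constant velocity)."""
    target_id: int
    pos_x_ft: float
    vel_x_ft_s: float

    def update_state(self, dt_s: float) -> None:
        self.pos_x_ft += self.vel_x_ft_s * dt_s

def predict_for_send(targets: List[_Target], horizon_s: float) -> List[_Target]:
    """Deep-copies each target and advances the copy by the horizon,
    leaving the originals untouched."""
    if horizon_s <= 0.0:
        return targets
    predicted = []
    for t in targets:
        p = copy.deepcopy(t)
        p.update_state(horizon_s)
        predicted.append(p)
    return predicted

live = [_Target(1, pos_x_ft=100.0, vel_x_ft_s=50.0)]
sent = predict_for_send(live, horizon_s=0.2)
print(sent[0].pos_x_ft, live[0].pos_x_ft)  # 110.0 100.0: original unchanged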

View File

@@ -53,13 +53,19 @@ class AnalysisWindow(tk.Toplevel):
archive_data = json.load(f)
except Exception as e:
messagebox.showerror(
"Errore di Caricamento",
f"Impossibile caricare il file di archivio.\n{e}",
"Loading Error",
f"Could not load archive file.\n{e}",
)
self.destroy()
return
# Crea un hub temporaneo e popolalo con i dati storici
# --- NEW ADDITION ---
# Extract estimated latency from metadata
metadata = archive_data.get("metadata", {})
self.estimated_latency_ms = metadata.get("estimated_latency_ms")
# --- END ADDITION ---
# Create a temporary hub and populate it with historical data
self._hub = SimulationStateHub()
results = archive_data.get("simulation_results", {})
for target_id_str, data in results.items():
@@ -69,7 +75,7 @@ class AnalysisWindow(tk.Toplevel):
for state in data.get("real", []):
self._hub.add_real_state(target_id, state[0], tuple(state[1:]))
# Crea l'analizzatore con l'hub popolato
# Create the analyzer with the populated hub
self._analyzer = PerformanceAnalyzer(self._hub)
def _populate_analysis(self):
@@ -127,6 +133,17 @@ class AnalysisWindow(tk.Toplevel):
self.target_selector.pack(side=tk.LEFT, padx=5)
self.target_selector.bind("<<ComboboxSelected>>", self._on_target_select)
if self.estimated_latency_ms is not None:
latency_frame = ttk.Frame(top_bar)
latency_frame.pack(side=tk.LEFT, padx=(20, 0))
ttk.Label(latency_frame, text="Avg. Latency:").pack(side=tk.LEFT)
ttk.Label(
latency_frame,
text=f"{self.estimated_latency_ms:.1f} ms",
font=("Segoe UI", 9, "bold"),
foreground="blue"
).pack(side=tk.LEFT, padx=4)
columns = ("metric", "x_error", "y_error", "z_error")
self.stats_tree = ttk.Treeview(left, columns=columns, show="headings")
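
Because estimated_latency_ms is read with dict.get, archives saved before this commit simply yield None and the latency label is not created. A small sketch of the backward-compatible lookup (read_estimated_latency_ms is a hypothetical helper mirroring the window's loading code):

import json
from typing import Optional

def read_estimated_latency_ms(path: str) -> Optional[float]:
    """Returns the archived latency estimate, or None for archives
    written before the field existed."""
    with open(path, "r", encoding="utf-8") as f:
        archive = json.load(f)
    return archive.get("metadata", {}).get("estimated_latency_ms")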

View File

@@ -130,6 +130,7 @@ class MainView(tk.Tk):
self.after(GUI_REFRESH_RATE_MS, self._gui_refresh_loop)
self.after(1000, self._update_rate_status)
self.after(1000, self._update_latency_status)
def _create_main_layout(self):
v_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
@@ -280,6 +281,7 @@ class MainView(tk.Tk):
self.status_bar.place(relx=0.0, rely=1.0, anchor="sw", relwidth=1.0, height=24)
self.status_var = self.status_bar.status_var
self.rate_status_var = self.status_bar.rate_status_var
self.latency_status_var = self.status_bar.latency_status_var
self._status_after_id = None
def show_status_message(self, text: str, timeout_ms: Optional[int] = 3000):
@@ -425,6 +427,32 @@ class MainView(tk.Tk):
finally:
self.after(1000, self._update_rate_status)
def _update_latency_status(self):
"""Periodically updates the latency display and prediction horizon."""
try:
latency_s = 0.0
if self.target_communicator and hasattr(self.target_communicator, 'router'):
router = self.target_communicator.router()
if router:
latency_s = router.get_estimated_latency_s()
# Update the status bar display
if self.latency_status_var:
if latency_s > 0:
self.latency_status_var.set(f"Latency: {latency_s * 1000:.1f} ms")
else:
self.latency_status_var.set("") # Clear when no latency estimate is available
# Update the simulation engine's prediction horizon if it's running
if self.simulation_engine and self.simulation_engine.is_running():
self.simulation_engine.set_prediction_horizon(latency_s)
except Exception as e:
self.logger.debug(f"Error updating latency status: {e}")
finally:
# Schedule the next update
self.after(1000, self._update_latency_status)
def _on_seek(self):
if not self.simulation_engine or not self.simulation_engine.scenario:
return
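
The update loop relies on Tk's after scheduling rather than a background thread: each tick reschedules itself from a finally block, so a failed poll cannot kill the loop. A minimal generic sketch of the same pattern (start_status_poller and poll_fn are illustrative names, not part of the commit):

import tkinter as tk
from typing import Callable

def start_status_poller(root: tk.Tk, poll_fn: Callable[[], None], interval_ms: int = 1000) -> None:
    """Self-rescheduling poll loop on the Tk event loop."""
    def _tick() -> None:
        try:
            poll_fn()
        except Exception:
            pass  # a failed poll must not stop future polls
        finally:
            root.after(interval_ms, _tick)  # always schedule the next tick
    root.after(interval_ms, _tick)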

View File

@@ -389,3 +389,14 @@ class DebugPayloadRouter:
def set_persist(self, enabled: bool):
with self._lock:
self._persist = bool(enabled)
def get_estimated_latency_s(self) -> float:
"""
Returns the estimated one-way server-to-client network latency.
Returns:
The estimated latency in seconds, or 0.0 if not available.
"""
if hasattr(self, '_clock_synchronizer') and self._clock_synchronizer:
return self._clock_synchronizer.get_average_latency_s()
return 0.0

View File

@@ -31,9 +31,7 @@ class StatusBar(ttk.Frame):
def __init__(self, parent, resource_poll_s: float = 1.0, height: int = 24):
super().__init__(parent, relief=tk.SUNKEN)
# Keep the status bar a fixed small height so it remains visible on
# vertically-constrained windows. Prevent children from forcing the
# frame's size.
# Keep the status bar a fixed small height
try:
self.configure(height=int(height))
self.pack_propagate(False)
@@ -55,33 +53,45 @@ class StatusBar(ttk.Frame):
self.status_var = tk.StringVar(value="Ready")
ttk.Label(self, textvariable=self.status_var, anchor=tk.W).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6)
# Right: rate and resource indicators
try:
self.rate_status_var = tk.StringVar(value="")
ttk.Label(self, textvariable=self.rate_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(6, 8))
except Exception:
self.rate_status_var = None
# Resource usage (optional). We create the var even if psutil missing so
# callers can safely call getattr(..., 'resource_var', None).
# Right: rate, latency, and resource indicators
try:
# Resource usage (optional) - pack this first to appear on the far right
self.resource_var = tk.StringVar(value="")
ttk.Label(self, textvariable=self.resource_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(6, 8))
except Exception:
self.resource_var = None
# Separator before latency
ttk.Separator(self, orient=tk.VERTICAL).pack(side=tk.RIGHT, fill=tk.Y, padx=5, pady=4)
try:
# Latency indicator
self.latency_status_var = tk.StringVar(value="")
ttk.Label(self, textvariable=self.latency_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(0, 6))
except Exception:
self.latency_status_var = None
# Separator before rate
ttk.Separator(self, orient=tk.VERTICAL).pack(side=tk.RIGHT, fill=tk.Y, padx=5, pady=4)
try:
self.rate_status_var = tk.StringVar(value="")
ttk.Label(self, textvariable=self.rate_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(0, 6))
except Exception:
self.rate_status_var = None
# Internal state
self._status_after_id: Optional[str] = None
self._res_stop_event = threading.Event()
self._res_thread: Optional[threading.Thread] = None
self._resource_poll_s = float(resource_poll_s)
# Start background monitor if psutil is available and we have a var
# Start background monitor if psutil is available
if _HAS_PSUTIL and self.resource_var is not None:
try:
self.start_resource_monitor(self._resource_poll_s)
except Exception:
# Don't fail construction if monitor can't start
pass
def _draw_status_indicator(self, canvas: tk.Canvas, color: str) -> None:
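
The right-hand indicators depend on pack order: with side=tk.RIGHT, the first widget packed ends up furthest to the right, which is why the resource label is packed before the latency and rate labels. A runnable illustration of that ordering (labels and texts are placeholders):

import tkinter as tk
from tkinter import ttk

root = tk.Tk()
bar = ttk.Frame(root, relief=tk.SUNKEN)
bar.pack(fill=tk.X)
# Packed first -> far right; each later widget lands to its left.
for text in ("CPU 3%", "Latency: 12.0 ms", "Rate: 1.0 Hz"):
    ttk.Label(bar, text=text).pack(side=tk.RIGHT, padx=(0, 6))
    ttk.Separator(bar, orient=tk.VERTICAL).pack(side=tk.RIGHT, fill=tk.Y, padx=5, pady=4)
root.mainloop()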

View File

@@ -172,7 +172,24 @@ class SimulationController:
def _stop_or_finish_simulation(self, main_view, was_stopped_by_user: bool):
"""Unified logic for handling simulation end, either by user or naturally."""
if self.current_archive:
self.current_archive.save()
# --- NEW ADDITION START ---
# Retrieve estimated latency before saving the archive
estimated_latency_s = 0.0
extra_metadata = {}
try:
target_comm = getattr(self.communicator_manager, "target_communicator", None)
if target_comm and hasattr(target_comm, 'router'):
router = target_comm.router()
if router and hasattr(router, 'get_estimated_latency_s'):
estimated_latency_s = router.get_estimated_latency_s()
if estimated_latency_s > 0:
extra_metadata['estimated_latency_ms'] = round(estimated_latency_s * 1000, 2)
except Exception as e:
self.logger.warning(f"Could not retrieve estimated latency for archive: {e}")
# --- NEW ADDITION END ---
self.current_archive.save(extra_metadata=extra_metadata)
self.current_archive = main_view.current_archive = None
target_comm = getattr(self.communicator_manager, "target_communicator", None)
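
Every hop in the lookup chain is guarded, so a missing communicator, router, or latency accessor degrades to an archive without the latency field rather than an exception. The same pattern as a self-contained helper (latency_metadata is a hypothetical refactoring, not in the commit):

from typing import Any, Dict

def latency_metadata(communicator_manager: Any) -> Dict[str, Any]:
    """Best-effort lookup: any missing attribute yields empty metadata."""
    try:
        comm = getattr(communicator_manager, "target_communicator", None)
        router = comm.router() if comm and hasattr(comm, "router") else None
        if router and hasattr(router, "get_estimated_latency_s"):
            latency_s = router.get_estimated_latency_s()
            if latency_s > 0:
                return {"estimated_latency_ms": round(latency_s * 1000, 2)}
    except Exception:
        pass
    return {}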

View File

@@ -1,5 +1,3 @@
# target_simulator/utils/clock_synchronizer.py
"""
Provides a ClockSynchronizer class to model the relationship between a remote
server's wrapping 32-bit timetag and the local monotonic clock.
@@ -8,7 +6,6 @@ import collections
import threading
import time
from typing import List, Tuple
# NumPy is strongly recommended for the linear regression.
# If it's not already a dependency, it should be added.
try:
@@ -16,14 +13,11 @@ try:
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
class ClockSynchronizer:
"""
Synchronizes a remote wrapping 32-bit counter with the local monotonic clock
using linear regression to model clock offset and drift.
"""
# Constants for a 32-bit counter
_COUNTER_MAX = 2**32
_WRAP_THRESHOLD = 2**31 # Detect wrap if decrease is > half the max value
@@ -52,6 +46,9 @@ class ClockSynchronizer:
self._m: float = 0.0 # Slope (client seconds per server tick)
self._b: float = 0.0 # Intercept (client time when server time was 0)
# Estimated one-way latency from server to client
self._average_latency_s: float = 0.0
def add_sample(self, raw_server_timetag: int, client_reception_time: float):
"""
Adds a new sample pair to update the synchronization model.
@@ -81,7 +78,7 @@ class ClockSynchronizer:
def _update_model(self):
"""
Performs linear regression on the stored history to update the
model parameters (m and b).
model parameters (m and b) and the average latency.
This method must be called within a locked context.
"""
if len(self._history) < self._min_samples:
@@ -91,14 +88,24 @@ class ClockSynchronizer:
x_vals = np.array([sample[0] for sample in self._history])
y_vals = np.array([sample[1] for sample in self._history])
# Use polyfit to find the slope (m) and intercept (b) of the best-fit line
try:
m, b = np.polyfit(x_vals, y_vals, 1)
self._m = m
self._b = b
# --- Calculate Average Latency ---
# Client reception time predicted by the fitted model for each sample
estimated_generation_times = self._m * x_vals + self._b
# Per-sample latency estimate: residual between the actual reception
# time and the model's prediction
latencies = y_vals - estimated_generation_times
# Update the average latency, filtering out negative residuals,
# which are artifacts of the fit
positive_latencies = latencies[latencies >= 0]
if len(positive_latencies) > 0:
self._average_latency_s = np.mean(positive_latencies)
else:
self._average_latency_s = 0.0
except np.linalg.LinAlgError:
# This can happen if data is not well-conditioned, though unlikely here.
# In this case, we just keep the old model parameters.
pass
def to_client_time(self, raw_server_timetag: int) -> float:
@@ -113,18 +120,24 @@ class ClockSynchronizer:
The estimated client monotonic time when the event occurred.
"""
with self._lock:
# Determine the correct wrap count for this specific timestamp.
# This handles cases where the timetag might be slightly older
# than the most recent sample.
current_wrap_count = self._wrap_count
if self._last_raw_timetag is not None:
diff = self._last_raw_timetag - raw_server_timetag
if diff < -self._WRAP_THRESHOLD:
# This timetag is from just before the last wrap
current_wrap_count -= 1
unwrapped_timetag = raw_server_timetag + current_wrap_count * self._COUNTER_MAX
# Apply the linear model
estimated_time = self._m * unwrapped_timetag + self._b
return estimated_time
def get_average_latency_s(self) -> float:
"""
Returns the current estimated average one-way network latency from
server to client in seconds.
Returns:
The average latency in seconds, or 0.0 if not yet computed.
"""
with self._lock:
return self._average_latency_s
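
The synchronization model reduces to unwrapping the 32-bit counter and fitting client reception time against unwrapped ticks with np.polyfit: the slope captures the tick period (including drift), the intercept the offset. A self-contained worked example on synthetic samples (the 1 kHz tick rate is assumed purely for illustration):

import numpy as np

COUNTER_MAX = 2**32
WRAP_THRESHOLD = 2**31  # a drop larger than half the range means a wrap

def unwrap(timetags):
    """Unwraps a wrapping 32-bit counter into a monotonic sequence."""
    wraps, prev, out = 0, None, []
    for t in timetags:
        if prev is not None and prev - t > WRAP_THRESHOLD:
            wraps += 1
        prev = t
        out.append(t + wraps * COUNTER_MAX)
    return out

# Synthetic samples: server ticks at 1 kHz with one wrap between the second
# and third sample, received at exactly one-second intervals on the client.
ticks = [2**32 - 1500, 2**32 - 500, 500, 1500]
rx_times = [10.0, 11.0, 12.0, 13.0]
m, b = np.polyfit(unwrap(ticks), rx_times, 1)
print(f"slope = {m:.6f} s/tick (~{1.0 / m:.0f} ticks per second)")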