diff --git a/target_simulator/analysis/__init__.py b/target_simulator/analysis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/target_simulator/communication/__init__.py b/target_simulator/communication/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/target_simulator/gui/payload_router.py b/target_simulator/gui/payload_router.py index 8d0d133..bc6870f 100644 --- a/target_simulator/gui/payload_router.py +++ b/target_simulator/gui/payload_router.py @@ -21,6 +21,7 @@ from typing import Dict, Optional, Any, List, Callable, Tuple from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload from target_simulator.analysis.simulation_state_hub import SimulationStateHub from target_simulator.core.models import Target +from target_simulator.utils.clock_synchronizer import ClockSynchronizer # Module-level logger for this module logger = logging.getLogger(__name__) @@ -51,6 +52,8 @@ class DebugPayloadRouter: self._hub = simulation_hub + self._clock_synchronizer = ClockSynchronizer() + # Listeners for real-time target data broadcasts self._ris_target_listeners: List[TargetListListener] = [] @@ -162,35 +165,39 @@ class DebugPayloadRouter: return targets, inactive_ids def _handle_ris_status(self, payload: bytearray): - real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload) + # --- MODIFICA INIZIO --- + client_reception_time = time.monotonic() + # Attempt to parse payload and server timetag for synchronization + try: + parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload) + server_timetag = parsed_payload.scenario.timetag + + # 1. Update the synchronization model with the new sample + self._clock_synchronizer.add_sample(server_timetag, client_reception_time) + + # 2. 
Convert the server timetag to an estimated client-domain generation time + estimated_generation_time = self._clock_synchronizer.to_client_time(server_timetag) + + except (ValueError, TypeError, IndexError): + # If parsing fails, we cannot sync. Fallback to reception time. + self._logger.warning("Could not parse RIS payload for timetag. Using reception time for sync.") + estimated_generation_time = client_reception_time + + real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload) + if self._hub: try: - # Use the client's monotonic clock as the single source of truth for timestamps - # to ensure simulated and real data can be correlated. - reception_timestamp = time.monotonic() + # Record a single packet-level arrival timestamp + if hasattr(self._hub, "add_real_packet"): + self._hub.add_real_packet(client_reception_time) - # Record a single packet-level arrival timestamp so callers can - # measure packets/sec (instead of per-target events/sec). - try: - if hasattr(self._hub, "add_real_packet"): - self._hub.add_real_packet(reception_timestamp) - except Exception: - # Non-fatal: continue even if packet recording fails - pass + # Clear inactive targets + for tid in inactive_ids or []: + if hasattr(self._hub, "clear_real_target_data"): + self._hub.clear_real_target_data(tid) - # If payload included inactive targets (flags==0), clear their stored - # real data so they disappear from the PPI immediately. 
- try: - for tid in inactive_ids or []: - if hasattr(self._hub, "clear_real_target_data"): - self._hub.clear_real_target_data(tid) - except Exception: - self._logger.debug( - "Failed to clear inactive target data in hub", exc_info=True - ) - - # Add real states for active targets + # Add real states for active targets using the ESTIMATED generation time for target in real_targets: state_tuple = ( getattr(target, "_pos_x_ft", 0.0), @@ -199,32 +206,20 @@ class DebugPayloadRouter: ) self._hub.add_real_state( target_id=target.target_id, - timestamp=reception_timestamp, + timestamp=estimated_generation_time, # <-- MODIFICA CHIAVE state=state_tuple, ) - # Propagate heading information (if available) into the hub - try: - for target in real_targets: - if hasattr(self._hub, "set_real_heading"): - raw_val = getattr(target, "_raw_heading", None) - self._hub.set_real_heading( - target.target_id, - getattr(target, "current_heading_deg", 0.0), - raw_value=raw_val, - ) - except Exception: - self._logger.debug( - "Failed to propagate heading to hub", exc_info=True - ) + # Propagate heading information + for target in real_targets: + if hasattr(self._hub, "set_real_heading"): + raw_val = getattr(target, "_raw_heading", None) + self._hub.set_real_heading( + target.target_id, + getattr(target, "current_heading_deg", 0.0), + raw_value=raw_val, + ) - # if self._update_queue: - # try: - # self._update_queue.put_nowait([]) - # except Full: - # self._logger.warning( - # f"{self._log_prefix} GUI update queue is full; dropped notification." - # ) except Exception: self._logger.exception( "DebugPayloadRouter: Failed to process RIS for Hub." 
@@ -234,7 +229,6 @@ class DebugPayloadRouter: archive = self.active_archive if archive: - reception_timestamp = time.monotonic() for target in real_targets: state_tuple = ( getattr(target, "_pos_x_ft", 0.0), @@ -243,9 +237,10 @@ class DebugPayloadRouter: ) archive.add_real_state( target_id=target.target_id, - timestamp=reception_timestamp, + timestamp=estimated_generation_time, # <-- MODIFICA CHIAVE state=state_tuple, ) + # --- MODIFICA FINE --- # --- BROADCAST to all registered listeners --- with self._lock: @@ -254,21 +249,22 @@ class DebugPayloadRouter: listener(real_targets) except Exception: self._logger.exception(f"Error in RIS target listener: {listener}") - - # --- Buffer other data for debug tabs --- + + # ... (il resto della funzione per il debug rimane invariato) try: if len(payload) >= SfpRisStatusPayload.size(): - parsed = SfpRisStatusPayload.from_buffer_copy(payload) - sc = parsed.scenario + # Re-parse if not already done (for robustness) + if 'parsed_payload' not in locals(): + parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload) + + sc = parsed_payload.scenario # ... (Text generation logic remains unchanged) ... lines = ["RIS Status Payload:\n", "Scenario:"] lines.append(f" timetag : {sc.timetag}") # ... etc. 
text_out = "\n".join(lines) - # Keep backward compatibility: store textual summary under the legacy key self._update_last_payload( "RIS_STATUS", bytearray(text_out.encode("utf-8")) ) - # Also provide explicitly-named text and JSON variants for newer UI consumers self._update_last_payload( "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8")) ) @@ -281,20 +277,17 @@ class DebugPayloadRouter: return value scenario_dict = { - f[0]: _convert_ctypes(getattr(parsed.scenario, f[0])) - for f in parsed.scenario._fields_ + f[0]: _convert_ctypes(getattr(parsed_payload.scenario, f[0])) + for f in parsed_payload.scenario._fields_ } targets_list = [ {f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_} - for t in parsed.tgt.tgt + for t in parsed_payload.tgt.tgt ] struct = {"scenario": scenario_dict, "targets": targets_list} json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8")) self._update_last_payload("RIS_STATUS_JSON", json_bytes) - # Propagate antenna azimuth into the hub so the GUI can render - # the antenna orientation. Prefer the RIS field `ant_nav_az` (this - # is the antenna navigation azimuth). For backward compatibility - # fall back to `platform_azimuth` if `ant_nav_az` is not present. 
+ try: plat = None if "ant_nav_az" in scenario_dict: @@ -309,31 +302,18 @@ class DebugPayloadRouter: ): try: val = float(plat) - # If the value looks like radians (<= ~2*pi) convert to degrees if abs(val) <= (2 * math.pi * 1.1): deg = math.degrees(val) else: deg = val - recv_ts = ( - reception_timestamp - if "reception_timestamp" in locals() - else time.monotonic() - ) - try: - # New API: set_antenna_azimuth - if hasattr(self._hub, "set_antenna_azimuth"): - self._hub.set_antenna_azimuth( - deg, timestamp=recv_ts - ) - else: - # Fallback to legacy name if present - self._hub.set_platform_azimuth( - deg, timestamp=recv_ts - ) - except Exception: - self._logger.debug( - "Failed to set antenna/platform azimuth on hub", - exc_info=True, + + if hasattr(self._hub, "set_antenna_azimuth"): + self._hub.set_antenna_azimuth( + deg, timestamp=client_reception_time + ) + else: + self._hub.set_platform_azimuth( + deg, timestamp=client_reception_time ) except Exception: pass diff --git a/target_simulator/simulation/__init__.py b/target_simulator/simulation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/target_simulator/utils/clock_synchronizer.py b/target_simulator/utils/clock_synchronizer.py new file mode 100644 index 0000000..3452529 --- /dev/null +++ b/target_simulator/utils/clock_synchronizer.py @@ -0,0 +1,130 @@ +# target_simulator/utils/clock_synchronizer.py + +""" +Provides a ClockSynchronizer class to model the relationship between a remote +server's wrapping 32-bit timetag and the local monotonic clock. +""" +import collections +import threading +import time +from typing import List, Tuple + +# NumPy is a strong recommendation for linear regression. +# If it's not already a dependency, it should be added. 
# NumPy is required for the regression fit; record availability so the
# failure surfaces as a clear ImportError at construction time.
try:
    import numpy as np

    NUMPY_AVAILABLE = True
except ImportError:
    NUMPY_AVAILABLE = False


class ClockSynchronizer:
    """
    Synchronizes a remote wrapping 32-bit counter with the local monotonic
    clock.

    A linear model ``client_time = m * server_ticks + b`` is fitted by least
    squares over a sliding window of (unwrapped server timetag, client
    reception time) pairs, capturing both clock offset and relative drift.

    Thread-safe: every public method acquires an internal lock.

    NOTE(review): no tick rate is assumed here (the slope ``m`` absorbs the
    unit), but callers should confirm the server timetag is a monotonically
    increasing counter rather than wall-clock time — TODO confirm.
    """

    # Range of the 32-bit counter.
    _COUNTER_MAX = 2**32
    # A backwards jump larger than half the range is treated as a wrap-around.
    _WRAP_THRESHOLD = 2**31

    def __init__(self, history_size: int = 100, min_samples_for_fit: int = 10):
        """
        Initializes the ClockSynchronizer.

        Args:
            history_size: Number of recent samples kept for the regression.
            min_samples_for_fit: Minimum number of samples required before a
                linear regression fit is attempted.

        Raises:
            ImportError: If NumPy is not installed.
        """
        if not NUMPY_AVAILABLE:
            raise ImportError("NumPy is required for the ClockSynchronizer.")

        self._lock = threading.Lock()
        self._history: collections.deque = collections.deque(maxlen=history_size)
        self._min_samples = min_samples_for_fit

        # State for timestamp unwrapping.
        self._wrap_count: int = 0
        self._last_raw_timetag: int | None = None

        # Linear model parameters: client_time = m * server_unwrapped_ticks + b.
        self._m: float = 0.0  # Slope (client seconds per server tick).
        self._b: float = 0.0  # Intercept (client time at server tick 0).
        # BUG FIX: track whether a fit has actually happened so that
        # to_client_time() never evaluates the zero-initialized model
        # (which would return 0.0 for every query before the first fit).
        self._fitted: bool = False

    def add_sample(self, raw_server_timetag: int, client_reception_time: float):
        """
        Adds a new sample pair and refreshes the synchronization model.

        Args:
            raw_server_timetag: The raw 32-bit timetag from the server.
            client_reception_time: The local monotonic time of reception.
        """
        with self._lock:
            # --- Timestamp unwrapping: a large backwards jump means the
            # 32-bit counter wrapped since the previous sample. ---
            if self._last_raw_timetag is not None:
                if self._last_raw_timetag - raw_server_timetag > self._WRAP_THRESHOLD:
                    self._wrap_count += 1
            self._last_raw_timetag = raw_server_timetag

            unwrapped_timetag = (
                raw_server_timetag + self._wrap_count * self._COUNTER_MAX
            )

            # Store the sample and re-fit the model.
            self._history.append((unwrapped_timetag, client_reception_time))
            self._update_model()

    def _update_model(self):
        """
        Re-fits the linear model parameters (m, b) over the stored history.

        Must be called with ``self._lock`` held.
        """
        if len(self._history) < self._min_samples:
            # Not enough data for a reliable fit.
            return

        x_vals = np.array([sample[0] for sample in self._history], dtype=float)
        y_vals = np.array([sample[1] for sample in self._history], dtype=float)

        try:
            m, b = np.polyfit(x_vals, y_vals, 1)
        except np.linalg.LinAlgError:
            # Ill-conditioned data; keep the previous model parameters.
            return

        self._m = float(m)
        self._b = float(b)
        self._fitted = True

    def to_client_time(self, raw_server_timetag: int) -> float:
        """
        Estimates the local client monotonic time for a raw server timetag.

        Args:
            raw_server_timetag: The raw 32-bit timetag from the server.

        Returns:
            The estimated client monotonic time of the event. Before the
            first successful fit this falls back to the most recent sample's
            reception time (or 0.0 if no sample was ever added) instead of
            evaluating the uninitialized model.
        """
        with self._lock:
            # BUG FIX: the original returned m*x + b unconditionally, which
            # yields 0.0 for every call made before min_samples_for_fit
            # samples had been collected.
            if not self._fitted:
                if self._history:
                    return self._history[-1][1]
                return 0.0

            # Choose the wrap count for THIS timetag: a timetag slightly
            # older than the latest sample may predate the last wrap.
            current_wrap_count = self._wrap_count
            if self._last_raw_timetag is not None:
                if self._last_raw_timetag - raw_server_timetag < -self._WRAP_THRESHOLD:
                    current_wrap_count -= 1

            unwrapped_timetag = (
                raw_server_timetag + current_wrap_count * self._COUNTER_MAX
            )

            # Apply the linear model.
            return self._m * unwrapped_timetag + self._b