# target_simulator/gui/payload_router.py
"""Payload router for buffering SFP payloads for the GUI.

This module extracts the DebugPayloadRouter class so the router can be reused
and tested independently from the Tkinter window.
"""

import threading
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from queue import Queue, Full
from typing import Dict, Optional, Any, List, Callable, Tuple

from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
from target_simulator.utils.clock_synchronizer import ClockSynchronizer

# Module-level logger for this module
logger = logging.getLogger(__name__)

PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None]


class DebugPayloadRouter:
    """
    A unified router that handles payloads for the entire application.

    It updates the SimulationStateHub and broadcasts processed data to
    registered listeners. Mutable router state (latest payloads, raw packet
    history, listener list, active archive) is guarded by an internal lock.
    Listener callbacks are invoked *without* the lock held, so a listener may
    safely call back into the router.
    """

    # Metres-to-feet factor applied to RIS target positions.
    _M_TO_FT = 3.280839895

    # Angles whose magnitude is at most this value (about 2*pi plus 10%) are
    # treated as radians; larger magnitudes are assumed to already be degrees.
    _MAX_RADIANS_MAGNITUDE = 2 * math.pi * 1.1

    # Human-readable names for the SFP flow identifiers stored in the history.
    _FLOW_NAMES = {
        ord("M"): "MFD",
        ord("S"): "SAR",
        ord("B"): "BIN",
        ord("J"): "JSON",
        ord("R"): "RIS",
        ord("r"): "ris",
    }

    def __init__(
        self,
        simulation_hub: Optional[SimulationStateHub] = None,
    ):
        """
        Create the router.

        Args:
            simulation_hub: Optional hub that receives real-target states,
                headings and antenna azimuth. When omitted, payloads are only
                buffered and broadcast to listeners.
        """
        self.active_archive = None
        self._log_prefix = "[DebugPayloadRouter]"
        self._logger = logger
        self._lock = threading.Lock()
        self._latest_payloads: Dict[str, Any] = {}
        self._last_raw_packet: Optional[tuple] = None
        self._history_size = 20
        self._history = collections.deque(maxlen=self._history_size)
        self._persist = False
        self._hub = simulation_hub
        self._clock_synchronizer = ClockSynchronizer()

        # Listeners for real-time target data broadcasts
        self._ris_target_listeners: List[TargetListListener] = []

        # Raw packets are persisted (when enabled) under <project_root>/Temp.
        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except Exception:
            # Best effort: the directory is only needed when persistence is on.
            self._logger.warning(
                "%s Could not create persist directory %s",
                self._log_prefix,
                self._persist_dir,
                exc_info=True,
            )

        # Flow id (ASCII code) -> handler invoked with the payload bytes.
        self._handlers: Dict[int, PayloadHandler] = {
            ord("M"): lambda p: self._update_last_payload("MFD", p),
            ord("S"): lambda p: self._update_last_payload("SAR", p),
            ord("B"): lambda p: self._update_last_payload("BIN", p),
            ord("J"): lambda p: self._update_last_payload("JSON", p),
            ord("R"): self._handle_ris_status,
            ord("r"): self._handle_ris_status,
        }
        logger.info(
            "%s Initialized (Hub: %s).", self._log_prefix, self._hub is not None
        )

    def set_archive(self, archive):
        """Set the current archive session used for recording."""
        with self._lock:
            self.active_archive = archive

    def add_ris_target_listener(self, listener: TargetListListener):
        """Registers a callback function to receive updates for real targets."""
        with self._lock:
            if listener in self._ris_target_listeners:
                return
            self._ris_target_listeners.append(listener)
        self._logger.info("RIS target listener added: %s", listener)

    def remove_ris_target_listener(self, listener: TargetListListener):
        """Unregisters a callback function; unknown listeners are ignored."""
        with self._lock:
            try:
                self._ris_target_listeners.remove(listener)
            except ValueError:
                return
        self._logger.info("RIS target listener removed: %s", listener)

    def get_handlers(self) -> Dict[int, PayloadHandler]:
        """Return the flow-id -> handler mapping (the internal dict itself)."""
        return self._handlers

    def _update_last_payload(self, flow_id: str, payload: Any):
        """Store ``payload`` as the most recent payload for ``flow_id``."""
        with self._lock:
            self._latest_payloads[flow_id] = payload

    @classmethod
    def _angle_to_degrees(cls, value: float) -> Tuple[float, str]:
        """
        Convert an angle of ambiguous unit to degrees.

        Values whose magnitude looks like radians (<= ~2*pi) are converted to
        degrees; anything larger is assumed to already be in degrees.

        Returns:
            ``(degrees, unit)`` where ``unit`` is ``"rad"`` or ``"deg"`` and
            names the unit that was assumed for the input.
        """
        if abs(value) <= cls._MAX_RADIANS_MAGNITUDE:
            return math.degrees(value), "rad"
        return value, "deg"

    @staticmethod
    def _position_state(target: Target) -> Tuple[float, float, float]:
        """Return the (x, y, z) position in feet stored on a parsed target."""
        return (
            getattr(target, "_pos_x_ft", 0.0),
            getattr(target, "_pos_y_ft", 0.0),
            getattr(target, "_pos_z_ft", 0.0),
        )

    def _parse_ris_payload_to_targets(
        self, payload: bytearray
    ) -> Tuple[List[Target], List[int]]:
        """
        Parse RIS payload and return a tuple:
        - list of Target objects for entries with flags != 0 (active targets)
        - list of integer target IDs present with flags == 0 (inactive)

        Parsing errors are logged; whatever was collected before the error is
        still returned.
        """
        targets: List[Target] = []
        inactive_ids: List[int] = []
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            for i, ris_target in enumerate(parsed_payload.tgt.tgt):
                if ris_target.flags == 0:
                    inactive_ids.append(i)
                    continue

                target = Target(
                    target_id=i, trajectory=[], active=True, traceable=True
                )
                # NOTE(review): payload x/y are swapped when mapped onto the
                # Target axes -- presumably the two sides use different axis
                # conventions; confirm against the protocol definition.
                target._pos_x_ft = float(ris_target.y) * self._M_TO_FT
                target._pos_y_ft = float(ris_target.x) * self._M_TO_FT
                target._pos_z_ft = float(ris_target.z) * self._M_TO_FT
                target._update_current_polar_coords()

                try:
                    raw_h = float(ris_target.heading)
                    # Server should send heading in radians; but be tolerant:
                    # see _angle_to_degrees for the heuristic.
                    hdg_deg, unit = self._angle_to_degrees(raw_h)
                    target.current_heading_deg = hdg_deg % 360
                    # Store the raw value on the Target for later correlation
                    try:
                        target._raw_heading = raw_h
                    except Exception:
                        self._logger.debug(
                            "Could not store raw heading on target %s",
                            i,
                            exc_info=True,
                        )
                    self._logger.debug(
                        "Parsed RIS heading for target %s: raw=%s assumed=%s "
                        "hdg_deg=%.6f",
                        i,
                        raw_h,
                        unit,
                        target.current_heading_deg,
                    )
                except (ValueError, TypeError):
                    target.current_heading_deg = 0.0

                targets.append(target)
        except Exception:
            self._logger.exception(
                f"{self._log_prefix} Failed to parse RIS payload into Target objects."
            )
        return targets, inactive_ids

    def _handle_ris_status(self, payload: bytearray):
        """
        Handle one RIS status payload.

        Steps: feed the clock synchronizer with the server timetag, parse the
        targets, update the hub and the active archive, broadcast the active
        targets to listeners and refresh the text/JSON debug views.
        """
        client_reception_time = time.monotonic()

        # Attempt to parse payload and server timetag for synchronization.
        parsed_payload = None
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            server_timetag = parsed_payload.scenario.timetag
            # 1. Update the synchronization model with the new sample
            self._clock_synchronizer.add_sample(
                server_timetag, client_reception_time
            )
            # 2. Convert the server timetag to an estimated client-domain
            #    generation time
            estimated_generation_time = self._clock_synchronizer.to_client_time(
                server_timetag
            )
        except (ValueError, TypeError, IndexError):
            # If parsing fails, we cannot sync. Fallback to reception time.
            self._logger.warning(
                "Could not parse RIS payload for timetag. Using reception time for sync."
            )
            estimated_generation_time = client_reception_time

        real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)

        self._update_hub_from_ris(
            real_targets,
            inactive_ids,
            client_reception_time,
            estimated_generation_time,
        )
        self._record_ris_in_archive(real_targets, estimated_generation_time)
        self._broadcast_ris_targets(real_targets)
        self._publish_ris_debug_views(
            payload, parsed_payload, client_reception_time
        )

    def _update_hub_from_ris(
        self,
        real_targets: List[Target],
        inactive_ids: List[int],
        reception_time: float,
        generation_time: float,
    ):
        """Push packet arrival, target states and headings to the hub."""
        hub = self._hub
        if not hub:
            return
        try:
            # Record a single packet-level arrival timestamp
            if hasattr(hub, "add_real_packet"):
                hub.add_real_packet(reception_time)

            # Clear inactive targets
            if hasattr(hub, "clear_real_target_data"):
                for tid in inactive_ids or []:
                    hub.clear_real_target_data(tid)

            # Add real states for active targets using the ESTIMATED
            # generation time (not the reception time).
            for target in real_targets:
                hub.add_real_state(
                    target_id=target.target_id,
                    timestamp=generation_time,
                    state=self._position_state(target),
                )

            # Propagate heading information
            if hasattr(hub, "set_real_heading"):
                for target in real_targets:
                    hub.set_real_heading(
                        target.target_id,
                        getattr(target, "current_heading_deg", 0.0),
                        raw_value=getattr(target, "_raw_heading", None),
                    )
        except Exception:
            self._logger.exception(
                "DebugPayloadRouter: Failed to process RIS for Hub."
            )

    def _record_ris_in_archive(
        self, real_targets: List[Target], generation_time: float
    ):
        """Record the active targets' states in the current archive, if any."""
        with self._lock:
            archive = self.active_archive
        if not archive:
            return
        try:
            for target in real_targets:
                archive.add_real_state(
                    target_id=target.target_id,
                    timestamp=generation_time,
                    state=self._position_state(target),
                )
        except Exception:
            # A failing archive must not prevent the broadcast to listeners
            # or the debug-view refresh that follow.
            self._logger.exception(
                "%s Failed to record RIS states in the archive.",
                self._log_prefix,
            )

    def _broadcast_ris_targets(self, real_targets: List[Target]):
        """Call every registered listener with the list of active targets."""
        # Snapshot under the lock, call outside it: the lock is not
        # re-entrant, so a listener calling back into the router (e.g. to
        # unregister itself) would otherwise deadlock.
        with self._lock:
            listeners = list(self._ris_target_listeners)
        for listener in listeners:
            try:
                listener(real_targets)
            except Exception:
                self._logger.exception(
                    f"Error in RIS target listener: {listener}"
                )

    @staticmethod
    def _convert_ctypes(value):
        """Convert a ctypes array or simple value to a plain Python object."""
        if hasattr(value, "_length_"):
            return list(value)
        if isinstance(value, ctypes._SimpleCData):
            return value.value
        return value

    def _publish_ris_debug_views(
        self, payload: bytearray, parsed_payload, client_reception_time: float
    ):
        """
        Refresh the RIS text/JSON debug payloads and forward the antenna
        azimuth to the hub.

        Args:
            payload: Raw RIS payload bytes.
            parsed_payload: Already-parsed payload, or ``None`` to parse
                ``payload`` here.
            client_reception_time: Timestamp attached to the azimuth update.
        """
        try:
            if len(payload) < SfpRisStatusPayload.size():
                return
            # Re-parse if not already done (for robustness)
            if parsed_payload is None:
                parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            sc = parsed_payload.scenario

            # ... (text generation logic remains unchanged) ...
            lines = ["RIS Status Payload:\n", "Scenario:"]
            lines.append(f" timetag : {sc.timetag}")
            # ... etc.

            text_bytes = "\n".join(lines).encode("utf-8")
            # Each key gets its own bytearray so consumers cannot alias.
            self._update_last_payload("RIS_STATUS", bytearray(text_bytes))
            self._update_last_payload("RIS_STATUS_TEXT", bytearray(text_bytes))

            convert = self._convert_ctypes
            scenario_dict = {
                f[0]: convert(getattr(parsed_payload.scenario, f[0]))
                for f in parsed_payload.scenario._fields_
            }
            targets_list = [
                {f[0]: convert(getattr(t, f[0])) for f in t._fields_}
                for t in parsed_payload.tgt.tgt
            ]
            struct = {"scenario": scenario_dict, "targets": targets_list}
            json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
            self._update_last_payload("RIS_STATUS_JSON", json_bytes)

            self._forward_antenna_azimuth(scenario_dict, client_reception_time)
        except Exception:
            self._logger.exception(
                "Failed to generate text/JSON for RIS debug view."
            )

    def _forward_antenna_azimuth(
        self, scenario_dict: Dict[str, Any], timestamp: float
    ):
        """Send the scenario's antenna/platform azimuth (in degrees) to the hub."""
        try:
            if "ant_nav_az" in scenario_dict:
                plat = scenario_dict.get("ant_nav_az")
            elif "platform_azimuth" in scenario_dict:
                plat = scenario_dict.get("platform_azimuth")
            else:
                plat = None

            hub = self._hub
            if (
                plat is None
                or not hub
                or not hasattr(hub, "set_platform_azimuth")
            ):
                return

            deg, _unit = self._angle_to_degrees(float(plat))
            if hasattr(hub, "set_antenna_azimuth"):
                hub.set_antenna_azimuth(deg, timestamp=timestamp)
            else:
                hub.set_platform_azimuth(deg, timestamp=timestamp)
        except Exception:
            self._logger.debug(
                "Error while extracting antenna azimuth from RIS payload",
                exc_info=True,
            )

    def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
        """Return the payloads buffered since the last call and reset the buffer."""
        with self._lock:
            new_payloads, self._latest_payloads = self._latest_payloads, {}
        return new_payloads

    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """
        Record a raw packet: keep it as the latest packet, append it to the
        bounded history and, when persistence is enabled, dump it to disk.

        Args:
            raw_bytes: Raw packet bytes as received.
            addr: Sender address; ``addr[0]`` and ``addr[1]`` are used in the
                persisted file name.
        """
        # Naive UTC timestamp (same value the deprecated utcnow() produced).
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )
        entry = {"ts": now_utc, "addr": addr, "raw": raw_bytes}
        try:
            hdr = SFPHeader.from_buffer_copy(raw_bytes)
            entry["flow"] = int(hdr.SFP_FLOW)
            entry["tid"] = int(hdr.SFP_TID)
            entry["flow_name"] = self._FLOW_NAMES.get(
                entry["flow"], str(entry["flow"])
            )
        except Exception:
            # Deliberate best effort: packets that cannot be decoded as an
            # SFP header are still recorded, just without the header fields.
            pass

        # Only the shared-state mutation needs the lock; header parsing above
        # and disk I/O below are done without it.
        with self._lock:
            self._last_raw_packet = (raw_bytes, addr)
            self._history.append(entry)
            persist = self._persist

        if persist:
            self._persist_raw_packet(entry)

    def _persist_raw_packet(self, entry: Dict[str, Any]):
        """Write one history entry's raw bytes to the persist directory."""
        try:
            addr = entry["addr"]
            ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
            host = addr[0].replace(":", "_")
            fname = f"sfp_raw_{ts}_{host}_{addr[1]}.bin"
            path = os.path.join(self._persist_dir, fname)
            with open(path, "wb") as f:
                f.write(entry["raw"])
        except Exception:
            # Persistence is a debugging aid; never let it break reception.
            self._logger.debug(
                "%s Failed to persist raw packet", self._log_prefix, exc_info=True
            )

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Return the last ``(raw_bytes, addr)`` packet, if any, and clear it."""
        with self._lock:
            pkt, self._last_raw_packet = self._last_raw_packet, None
        return pkt

    def get_history(self):
        """Return a list copy of the raw packet history (oldest first)."""
        with self._lock:
            return list(self._history)

    def clear_history(self):
        """Remove every entry from the raw packet history."""
        with self._lock:
            self._history.clear()

    def set_history_size(self, n: int):
        """
        Resize the raw packet history to hold at most ``n`` entries.

        ``n`` is clamped to a minimum of 1; values that cannot be converted
        to an integer are ignored. The most recent entries are kept.
        """
        try:
            size = max(1, int(n))
        except (TypeError, ValueError, OverflowError):
            return
        with self._lock:
            self._history_size = size
            self._history = collections.deque(self._history, maxlen=size)

    def set_persist(self, enabled: bool):
        """Enable or disable writing raw packets to disk."""
        with self._lock:
            self._persist = bool(enabled)