# target_simulator/gui/payload_router.py
"""
Payload router for buffering SFP payloads for the GUI.

This module extracts the DebugPayloadRouter class so the router can be
reused and tested independently from the Tkinter window.
"""

import threading
import statistics
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from queue import Queue, Full
from typing import Dict, Optional, Any, List, Callable, Tuple

from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
from target_simulator.utils.clock_synchronizer import ClockSynchronizer

# Module-level logger for this module
logger = logging.getLogger(__name__)

PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None]

# --- Constants ---
M_TO_FT = 3.28084


class DebugPayloadRouter:
    """
    A unified router that handles payloads for the entire application.

    It updates the SimulationStateHub and broadcasts processed data to
    registered listeners. Shared mutable state (latest payloads, history,
    listeners, latency samples, active archive) is guarded by a single
    non-reentrant lock; callbacks into external code (listeners, archive,
    hub) are made with that lock released.
    """

    def __init__(
        self,
        simulation_hub: Optional[SimulationStateHub] = None,
    ):
        """Create a router, optionally bound to a SimulationStateHub.

        Args:
            simulation_hub: Hub that receives ownship/target/antenna updates.
                When None, payloads are only buffered for the debug views,
                archived and broadcast to listeners.
        """
        self.active_archive = None
        self._log_prefix = "[DebugPayloadRouter]"
        self._logger = logger
        self._lock = threading.Lock()
        self._latest_payloads: Dict[str, Any] = {}
        self._last_raw_packet: Optional[tuple] = None
        self._history_size = 20
        self._history = collections.deque(maxlen=self._history_size)
        self._persist = False
        # Recent latency samples (seconds) computed from clock synchronizer
        self._latency_samples = collections.deque(maxlen=1000)
        self._hub = simulation_hub

        # Timestamp for ownship position integration
        self._last_ownship_update_time: Optional[float] = None

        # Listeners for real-time target data broadcasts
        self._ris_target_listeners: List[TargetListListener] = []

        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except OSError:
            # Best effort: persistence is a debug aid, so a missing directory
            # must not prevent the router from starting.
            self._logger.warning(
                "%s Could not create persist directory %s",
                self._log_prefix,
                self._persist_dir,
                exc_info=True,
            )

        self._handlers: Dict[int, PayloadHandler] = {
            ord("M"): lambda p: self._update_last_payload("MFD", p),
            ord("S"): lambda p: self._update_last_payload("SAR", p),
            ord("B"): lambda p: self._update_last_payload("BIN", p),
            # JSON payloads may contain antenna nav fields (ant_nav_az / ant_nav_el).
            # Route JSON to a dedicated handler that will both store the raw
            # payload and, when possible, extract antenna az/el to update the hub
            # so the PPI antenna sweep can animate even when using JSON protocol.
            ord("J"): self._handle_json_payload,
            ord("R"): self._handle_ris_status,
            ord("r"): self._handle_ris_status,
        }

        self._logger.info(
            "%s Initialized (Hub: %s).", self._log_prefix, self._hub is not None
        )

        # Optional clock synchronizer for latency estimation. If it cannot be
        # constructed we simply leave latency estimates at 0.
        try:
            self._clock_sync = ClockSynchronizer()
        except Exception:
            self._clock_sync = None

    def set_archive(self, archive):
        """Set the current archive object used to persist incoming states.

        Args:
            archive: An object implementing the small archive API (e.g.
                add_real_state, add_ownship_state). The router will call
                archive methods while processing incoming payloads.
        """
        with self._lock:
            self.active_archive = archive

    def add_ris_target_listener(self, listener: TargetListListener):
        """Register a listener that will be called with lists of real Targets.

        The listener is invoked from whichever thread calls the RIS payload
        handler. Registering the same listener twice has no effect.
        """
        with self._lock:
            if listener not in self._ris_target_listeners:
                self._ris_target_listeners.append(listener)
                self._logger.info("RIS target listener added: %s", listener)

    def remove_ris_target_listener(self, listener: TargetListListener):
        """Unregister a previously registered RIS target listener.

        Removing a listener that is not registered is silently ignored.
        """
        with self._lock:
            try:
                self._ris_target_listeners.remove(listener)
            except ValueError:
                return
            self._logger.info("RIS target listener removed: %s", listener)

    def get_handlers(self) -> Dict[int, PayloadHandler]:
        """Return the mapping of SFP flow byte values to payload handlers."""
        return self._handlers

    def _update_last_payload(self, flow_id: str, payload: Any):
        """Store the last payload for a logical flow id in a thread-safe way."""
        with self._lock:
            self._latest_payloads[flow_id] = payload

    @staticmethod
    def _target_state_tuple(target: Target) -> Tuple[float, float, float]:
        """Return the (x, y, z) position in feet stored on a parsed target."""
        return (
            getattr(target, "_pos_x_ft", 0.0),
            getattr(target, "_pos_y_ft", 0.0),
            getattr(target, "_pos_z_ft", 0.0),
        )

    def _parse_ris_payload_to_targets(
        self, payload: bytearray
    ) -> Tuple[List[Target], List[int]]:
        """
        Parse RIS payload and return a tuple:
        - list of Target objects for entries with flags != 0 (active targets)
        - list of integer target IDs present with flags == 0 (inactive)

        Any parsing failure is logged; whatever was collected before the
        failure is still returned.
        """
        targets: List[Target] = []
        inactive_ids: List[int] = []
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            for i, ris_target in enumerate(parsed_payload.tgt.tgt):
                if ris_target.flags == 0:
                    inactive_ids.append(int(i))
                    continue

                target = Target(
                    target_id=i, trajectory=[], active=True, traceable=True
                )
                # Server's y-axis is East (our x), x-axis is North (our y)
                target._pos_x_ft = float(ris_target.y) * M_TO_FT
                target._pos_y_ft = float(ris_target.x) * M_TO_FT
                target._pos_z_ft = float(ris_target.z) * M_TO_FT
                target._update_current_polar_coords()

                try:
                    raw_h = float(ris_target.heading)
                    # Heuristic: magnitudes up to ~110% of 2*pi are treated as
                    # radians, anything larger is taken to be degrees already.
                    if abs(raw_h) <= (2 * math.pi * 1.1):
                        hdg_deg = math.degrees(raw_h)
                    else:
                        hdg_deg = raw_h
                    target.current_heading_deg = hdg_deg % 360
                    target._raw_heading = raw_h
                except (ValueError, TypeError):
                    target.current_heading_deg = 0.0

                targets.append(target)
        except Exception:
            self._logger.exception(
                f"{self._log_prefix} Failed to parse RIS payload into Target objects."
            )
        return targets, inactive_ids

    def _handle_ris_status(self, payload: bytearray):
        """
        Parse and process an RIS status payload.

        Args:
            payload: Raw RIS status packet.

        Returns:
            None. Side effects: updates the SimulationStateHub (ownship,
            targets, antenna azimuth), feeds the clock synchronizer, writes
            to the active archive, notifies listeners and refreshes the
            debug views.
        """
        reception_timestamp = time.monotonic()

        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
        except (ValueError, TypeError):
            self._logger.error("Failed to parse SfpRisStatusPayload from buffer.")
            return

        # --- Update Ownship State ---
        if self._hub:
            self._update_ownship_state(parsed_payload, reception_timestamp)

        # --- Feed clock synchronizer with server timetag (if available) ---
        self._feed_clock_sync(parsed_payload, reception_timestamp)

        # --- Update Target States ---
        real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)
        if self._hub:
            self._publish_targets_to_hub(
                real_targets, inactive_ids, reception_timestamp
            )
        self._archive_real_targets(real_targets, reception_timestamp)

        # --- BROADCAST to all registered listeners ---
        self._notify_ris_target_listeners(real_targets)

        # --- Update Debug Views ---
        self._update_debug_views(parsed_payload)

    def _update_ownship_state(
        self, parsed_payload: SfpRisStatusPayload, reception_timestamp: float
    ):
        """Integrate ownship position and push the new state to hub and archive."""
        try:
            sc = parsed_payload.scenario

            delta_t = 0.0
            if self._last_ownship_update_time is not None:
                delta_t = reception_timestamp - self._last_ownship_update_time
            self._last_ownship_update_time = reception_timestamp

            # Get previous ownship state to integrate position
            old_state = self._hub.get_ownship_state()
            old_pos_xy = old_state.get("position_xy_ft", (0.0, 0.0))

            # Server's vy is East (our x), vx is North (our y)
            ownship_vx_fps = float(sc.vy) * M_TO_FT
            ownship_vy_fps = float(sc.vx) * M_TO_FT

            # Integrate position
            new_pos_x_ft = old_pos_xy[0] + ownship_vx_fps * delta_t
            new_pos_y_ft = old_pos_xy[1] + ownship_vy_fps * delta_t

            ownship_heading_deg = math.degrees(float(sc.true_heading)) % 360

            ownship_state = {
                "timestamp": reception_timestamp,
                "position_xy_ft": (new_pos_x_ft, new_pos_y_ft),
                "altitude_ft": float(sc.baro_altitude) * M_TO_FT,
                "velocity_xy_fps": (ownship_vx_fps, ownship_vy_fps),
                "heading_deg": ownship_heading_deg,
                "latitude": float(sc.latitude),
                "longitude": float(sc.longitude),
            }
            self._hub.set_ownship_state(ownship_state)

            # Store ownship state in archive if available. Only the reference
            # is read under the lock; the archive call runs with it released.
            with self._lock:
                archive = self.active_archive
            if archive and hasattr(archive, "add_ownship_state"):
                archive.add_ownship_state(ownship_state)
        except Exception:
            self._logger.exception("Failed to update ownship state.")

    def _feed_clock_sync(
        self, parsed_payload: SfpRisStatusPayload, reception_timestamp: float
    ):
        """Add a (server timetag, local time) sample and record a latency sample.

        Purely best effort: any failure is logged at debug level and ignored.
        """
        if self._clock_sync is None or parsed_payload is None:
            return

        try:
            server_timetag = int(parsed_payload.scenario.timetag)
            # Add a sample: raw server timetag + local reception time
            self._clock_sync.add_sample(server_timetag, reception_timestamp)
        except Exception:
            self._logger.debug(
                "%s Could not feed clock synchronizer.",
                self._log_prefix,
                exc_info=True,
            )
            return

        # Compute a latency sample if the model can estimate client time
        try:
            est_gen = self._clock_sync.to_client_time(server_timetag)
            latency = reception_timestamp - est_gen
        except Exception:
            self._logger.debug(
                "%s Could not estimate latency sample.",
                self._log_prefix,
                exc_info=True,
            )
            return

        # store only non-negative latencies
        if latency >= 0:
            with self._lock:
                self._latency_samples.append(latency)

    def _publish_targets_to_hub(
        self,
        real_targets: List[Target],
        inactive_ids: List[int],
        reception_timestamp: float,
    ):
        """Push parsed target states to the hub and clear inactive targets."""
        hub = self._hub
        try:
            if hasattr(hub, "add_real_packet"):
                hub.add_real_packet(reception_timestamp)

            if hasattr(hub, "clear_real_target_data"):
                for tid in inactive_ids or []:
                    hub.clear_real_target_data(tid)

            for target in real_targets:
                hub.add_real_state(
                    target_id=target.target_id,
                    timestamp=reception_timestamp,
                    state=self._target_state_tuple(target),
                )

                if hasattr(hub, "set_real_heading"):
                    hub.set_real_heading(
                        target.target_id,
                        getattr(target, "current_heading_deg", 0.0),
                        raw_value=getattr(target, "_raw_heading", None),
                    )
        except Exception:
            self._logger.exception("Failed to process RIS targets for Hub.")

    def _archive_real_targets(
        self, real_targets: List[Target], reception_timestamp: float
    ):
        """Append parsed target states to the active archive, if one is set."""
        with self._lock:
            archive = self.active_archive
        if not archive:
            return

        try:
            for target in real_targets:
                archive.add_real_state(
                    target_id=target.target_id,
                    timestamp=reception_timestamp,
                    state=self._target_state_tuple(target),
                )
        except Exception:
            # An archive failure must not prevent listeners and debug views
            # from being updated for this packet.
            self._logger.exception("Failed to archive RIS targets.")

    def _notify_ris_target_listeners(self, real_targets: List[Target]):
        """Invoke every registered RIS target listener with the target list.

        The listener list is snapshotted under the lock and the callbacks run
        with the lock released, so a listener may safely call back into the
        router (the lock is not reentrant).
        """
        with self._lock:
            listeners = list(self._ris_target_listeners)

        for listener in listeners:
            try:
                listener(real_targets)
            except Exception:
                self._logger.exception(f"Error in RIS target listener: {listener}")

    @staticmethod
    def _convert_ctypes(value):
        """Convert a ctypes array or simple value to a plain Python object."""
        if hasattr(value, "_length_"):
            return list(value)
        if isinstance(value, ctypes._SimpleCData):
            return value.value
        return value

    def _update_debug_views(self, parsed_payload: SfpRisStatusPayload):
        """Helper to populate debug views from a parsed payload.

        Stores a text and a JSON rendering of the payload under the
        "RIS_STATUS_TEXT" and "RIS_STATUS_JSON" flow ids and, when a hub is
        attached, propagates the absolute antenna azimuth to it.
        """
        try:
            sc = parsed_payload.scenario

            lines = ["RIS Status Payload:\n", "Scenario:"]
            # ... text generation logic ...
            text_out = "\n".join(lines)
            self._update_last_payload(
                "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
            )

            # ... JSON generation logic ...
            convert = self._convert_ctypes
            scenario_dict = {
                f[0]: convert(getattr(sc, f[0])) for f in sc._fields_
            }
            targets_list = [
                {f[0]: convert(getattr(t, f[0])) for f in t._fields_}
                for t in parsed_payload.tgt.tgt
            ]
            struct = {"scenario": scenario_dict, "targets": targets_list}
            json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
            self._update_last_payload("RIS_STATUS_JSON", json_bytes)

            # --- Propagate antenna azimuth to hub ---
            if self._hub:
                # Get platform heading and relative antenna sweep
                heading_rad = scenario_dict.get(
                    "true_heading", scenario_dict.get("platform_azimuth")
                )
                sweep_rad = scenario_dict.get("ant_nav_az")

                # Without a platform heading there is no absolute azimuth to
                # report, so the sweep is only added when a heading exists.
                if heading_rad is not None:
                    total_az_rad = float(heading_rad)
                    if sweep_rad is not None:
                        total_az_rad += float(sweep_rad)

                    az_deg = math.degrees(total_az_rad)
                    self._hub.set_antenna_azimuth(
                        az_deg, timestamp=time.monotonic()
                    )
        except Exception:
            self._logger.exception("Failed to generate text/JSON for RIS debug view.")

    def _handle_json_payload(self, payload: bytearray):
        """
        Handle JSON payloads. Store raw JSON and, if present, extract
        antenna navigation fields and propagate the resulting absolute
        azimuth to the SimulationStateHub so the PPI antenna can animate.

        Values whose magnitude exceeds 2*pi (plus a small tolerance) are
        assumed to be degrees and converted to radians before summing.
        """
        try:
            # Always store the raw payload for debug inspection
            self._update_last_payload("JSON", payload)

            if not self._hub:
                return

            # Try to decode and parse JSON. UnicodeDecodeError and
            # json.JSONDecodeError are both ValueError subclasses.
            try:
                obj = json.loads(payload.decode("utf-8"))
            except ValueError:
                return

            # Helper to find fields in varied JSON structures
            def _find_val(key_candidates, dct):
                if not isinstance(dct, dict):
                    return None
                # Check top-level
                for key in key_candidates:
                    if key in dct:
                        return dct[key]
                # Check scenario sub-dict if present
                sc = dct.get("scenario") or dct.get("sc")
                if isinstance(sc, dict):
                    for key in key_candidates:
                        if key in sc:
                            return sc[key]
                return None

            def _to_radians(value):
                # Magnitudes beyond a full turn are taken to be degrees.
                angle = float(value)
                if abs(angle) > (2 * math.pi + 0.01):
                    angle = math.radians(angle)
                return angle

            # Find platform heading and relative antenna sweep using
            # multiple potential keys
            heading_val = _find_val(
                ["true_heading", "platform_azimuth", "heading"], obj
            )
            sweep_val = _find_val(
                ["ant_nav_az", "antenna_azimuth", "sweep_azimuth"], obj
            )

            if heading_val is None:
                return

            try:
                total_az_rad = _to_radians(heading_val)
                if sweep_val is not None:
                    total_az_rad += _to_radians(sweep_val)

                az_deg = math.degrees(total_az_rad)

                # Log the successful extraction for debugging
                self._logger.debug(
                    "Found azimuth info in JSON: heading=%s, sweep=%s -> total_deg=%s",
                    heading_val,
                    sweep_val,
                    az_deg,
                )

                self._hub.set_antenna_azimuth(az_deg, timestamp=time.monotonic())
            except Exception as exc:
                self._logger.debug("Error processing azimuth values: %s", exc)
        except Exception:
            self._logger.exception("Error handling JSON payload")

    def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
        """Atomically retrieve and clear the latest payloads cache.

        Returns:
            Dict[str, Any]: The mapping of flow id to latest payload as it
            was at call time. The router replaces its internal cache with a
            new empty dict, so the returned dict is owned by the caller.
        """
        with self._lock:
            new_payloads = self._latest_payloads
            self._latest_payloads = {}
        return new_payloads

    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """Record the last raw packet received and optionally persist it.

        Args:
            raw_bytes (bytes): Raw packet contents.
            addr (tuple): Source address tuple (ip, port).
        """
        # Naive UTC timestamp (same value utcnow() produced, without relying
        # on the deprecated call).
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        entry = {"ts": now_utc, "addr": addr, "raw": raw_bytes}

        # Header metadata is optional: short or malformed packets are still
        # recorded, just without flow/tid information.
        try:
            hdr = SFPHeader.from_buffer_copy(raw_bytes)
            entry["flow"] = int(hdr.SFP_FLOW)
            entry["tid"] = int(hdr.SFP_TID)
            flow_map = {
                ord("M"): "MFD",
                ord("S"): "SAR",
                ord("B"): "BIN",
                ord("J"): "JSON",
                ord("R"): "RIS",
                ord("r"): "ris",
            }
            entry["flow_name"] = flow_map.get(entry["flow"], str(entry["flow"]))
        except (ValueError, TypeError, AttributeError):
            pass

        # Only the shared-state mutation happens under the lock.
        with self._lock:
            self._last_raw_packet = (raw_bytes, addr)
            self._history.append(entry)
            persist = self._persist
            persist_dir = self._persist_dir

        if not persist:
            return

        # Disk I/O is done with the lock released so slow storage cannot
        # stall other router calls. Persistence is a best-effort debug aid.
        try:
            ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
            fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
            path = os.path.join(persist_dir, fname)
            with open(path, "wb") as f:
                f.write(raw_bytes)
        except Exception:
            self._logger.debug(
                "%s Failed to persist raw packet.", self._log_prefix, exc_info=True
            )

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Return the last raw packet (if any) and clear the cached value."""
        with self._lock:
            pkt = self._last_raw_packet
            self._last_raw_packet = None
        return pkt

    # --- Latency estimation API ---
    def get_estimated_latency_s(self) -> float:
        """Return the clock synchronizer's average latency in seconds.

        Returns 0.0 when no synchronizer is available or it fails to
        produce an estimate.
        """
        try:
            if self._clock_sync is not None:
                return float(self._clock_sync.get_average_latency_s())
        except Exception:
            pass
        return 0.0

    def get_latency_samples(self, limit: Optional[int] = None) -> List[float]:
        """Return recent latency samples in seconds (most recent last).

        Args:
            limit: maximum number of samples to return (None or a
                non-positive value = all available)
        """
        with self._lock:
            samples = list(self._latency_samples)

        if limit is not None and limit > 0:
            return samples[-limit:]
        return samples

    def get_latency_stats(self, sample_limit: int = 200) -> Dict[str, Any]:
        """Compute basic statistics over recent latency samples.

        Args:
            sample_limit: Number of most recent samples to consider; a falsy
                or non-positive value uses every stored sample.

        Returns a dict with mean_ms, std_ms, var_ms, min_ms, max_ms, count.
        All values except count are None when no samples are available.
        """
        with self._lock:
            samples = list(self._latency_samples)

        if not samples:
            return {
                "mean_ms": None,
                "std_ms": None,
                "var_ms": None,
                "min_ms": None,
                "max_ms": None,
                "count": 0,
            }

        if sample_limit and sample_limit > 0:
            samples = samples[-sample_limit:]

        # Work in milliseconds for reporting
        ms = [s * 1000.0 for s in samples]
        mean = statistics.mean(ms)
        # Population variance/deviation; a single sample has no spread.
        var = statistics.pvariance(ms) if len(ms) > 1 else 0.0
        std = statistics.pstdev(ms) if len(ms) > 1 else 0.0
        return {
            "mean_ms": round(mean, 3),
            "std_ms": round(std, 3),
            "var_ms": round(var, 3),
            "min_ms": round(min(ms), 3),
            "max_ms": round(max(ms), 3),
            "count": len(ms),
        }

    def get_history(self):
        """Return a shallow copy of the internal history deque as a list."""
        with self._lock:
            return list(self._history)

    def clear_history(self):
        """Clear the in-memory history buffer used by debug views."""
        with self._lock:
            self._history.clear()

    def set_history_size(self, n: int):
        """Adjust the maximum history size kept for debug playback.

        Existing entries are kept; if the new size is smaller, the oldest
        entries are dropped.

        Args:
            n (int): New maximum number of entries to retain (minimum 1).
        """
        size = max(1, int(n))
        with self._lock:
            self._history_size = size
            self._history = collections.deque(self._history, maxlen=size)

    def set_persist(self, enabled: bool):
        """Enable or disable persisting of raw packets to disk for debugging."""
        with self._lock:
            self._persist = bool(enabled)