# target_simulator/gui/payload_router.py
"""
Payload router for buffering SFP payloads for the GUI.

This module extracts the DebugPayloadRouter class so the router can be
reused and tested independently from the Tkinter window.
"""

import threading
import statistics
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from queue import Queue, Full
from typing import Dict, Optional, Any, List, Callable, Tuple

from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
from target_simulator.utils.clock_synchronizer import ClockSynchronizer

# Module-level logger for this module
logger = logging.getLogger(__name__)

PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None]

# --- Constants ---
M_TO_FT = 3.28084


class DebugPayloadRouter:
    """
    A unified router that handles payloads for the entire application.

    It updates the SimulationStateHub and broadcasts processed data to
    registered listeners. State shared with other threads (latest payloads,
    raw-packet history, latency samples, listener list, active archive) is
    guarded by an internal ``threading.Lock``. The lock is never held while
    calling listeners or doing file I/O, so callbacks may safely call back
    into the router.
    """

    # Maps the numeric SFP flow identifier to the label stored in the
    # raw-packet history under "flow_name".
    _FLOW_NAMES = {
        ord("M"): "MFD",
        ord("S"): "SAR",
        ord("B"): "BIN",
        ord("J"): "JSON",
        ord("R"): "RIS",
        ord("r"): "ris",
    }

    def __init__(
        self,
        simulation_hub: Optional[SimulationStateHub] = None,
    ):
        """Create a router.

        Args:
            simulation_hub: Optional hub that receives ownship/target state.
                When omitted, payloads are still parsed, archived and
                broadcast, but no hub is updated.
        """
        self.active_archive = None
        self._log_prefix = "[DebugPayloadRouter]"
        self._lock = threading.Lock()
        self._latest_payloads: Dict[str, Any] = {}
        self._last_raw_packet: Optional[tuple] = None
        self._history_size = 20
        self._history = collections.deque(maxlen=self._history_size)
        self._persist = False
        # Recent latency samples (seconds) computed from clock synchronizer
        self._latency_samples = collections.deque(maxlen=1000)
        self._hub = simulation_hub
        # Timestamp for ownship position integration
        self._last_ownship_update_time: Optional[float] = None
        # Listeners for real-time target data broadcasts
        self._ris_target_listeners: List[TargetListListener] = []

        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except Exception:
            # Best effort: persistence is optional, so a missing directory
            # must not prevent the router from being created.
            logger.debug(
                "%s Could not create persist directory %s",
                self._log_prefix,
                self._persist_dir,
                exc_info=True,
            )

        self._handlers: Dict[int, PayloadHandler] = {
            ord("M"): lambda p: self._update_last_payload("MFD", p),
            ord("S"): lambda p: self._update_last_payload("SAR", p),
            ord("B"): lambda p: self._update_last_payload("BIN", p),
            ord("J"): lambda p: self._update_last_payload("JSON", p),
            ord("R"): self._handle_ris_status,
            ord("r"): self._handle_ris_status,
        }
        logger.info(f"{self._log_prefix} Initialized (Hub: {self._hub is not None}).")
        self._logger = logger

        # Optional clock synchronizer for latency estimation. ClockSynchronizer
        # requires numpy; if unavailable we simply leave latency estimates at 0.
        try:
            self._clock_sync = ClockSynchronizer()
        except Exception:
            self._logger.debug(
                "%s ClockSynchronizer unavailable; latency estimation disabled.",
                self._log_prefix,
                exc_info=True,
            )
            self._clock_sync = None

    def set_archive(self, archive):
        """Sets the current archive session for recording."""
        with self._lock:
            self.active_archive = archive

    def add_ris_target_listener(self, listener: TargetListListener):
        """Registers a callback function to receive updates for real targets.

        Registering the same callback twice has no effect.
        """
        with self._lock:
            if listener not in self._ris_target_listeners:
                self._ris_target_listeners.append(listener)
                self._logger.info(f"RIS target listener added: {listener}")

    def remove_ris_target_listener(self, listener: TargetListListener):
        """Unregisters a callback function; unknown callbacks are ignored."""
        with self._lock:
            try:
                self._ris_target_listeners.remove(listener)
                self._logger.info(f"RIS target listener removed: {listener}")
            except ValueError:
                pass

    def get_handlers(self) -> Dict[int, PayloadHandler]:
        """Return the flow-id -> handler mapping used to dispatch payloads."""
        return self._handlers

    def _update_last_payload(self, flow_id: str, payload: Any):
        """Store ``payload`` as the most recent one for ``flow_id``."""
        with self._lock:
            self._latest_payloads[flow_id] = payload

    def _parse_ris_payload_to_targets(
        self, payload: bytearray
    ) -> Tuple[List[Target], List[int]]:
        """
        Parse RIS payload and return a tuple:
        - list of Target objects for entries with flags != 0 (active targets)
        - list of integer target IDs present with flags == 0 (inactive)

        Any parsing error is logged; whatever was collected before the error
        is still returned.
        """
        targets: List[Target] = []
        inactive_ids: List[int] = []
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            for i, ris_target in enumerate(parsed_payload.tgt.tgt):
                if ris_target.flags == 0:
                    inactive_ids.append(int(i))
                    continue

                target = Target(
                    target_id=i, trajectory=[], active=True, traceable=True
                )
                # Server's y-axis is East (our x), x-axis is North (our y)
                target._pos_x_ft = float(ris_target.y) * M_TO_FT
                target._pos_y_ft = float(ris_target.x) * M_TO_FT
                target._pos_z_ft = float(ris_target.z) * M_TO_FT
                target._update_current_polar_coords()

                try:
                    raw_h = float(ris_target.heading)
                    # Heuristic: values within ~110% of a full turn in
                    # radians are treated as radians, anything larger as
                    # degrees.
                    if abs(raw_h) <= (2 * math.pi * 1.1):
                        hdg_deg = math.degrees(raw_h)
                    else:
                        hdg_deg = raw_h
                    target.current_heading_deg = hdg_deg % 360
                    target._raw_heading = raw_h
                except (ValueError, TypeError):
                    target.current_heading_deg = 0.0

                targets.append(target)
        except Exception:
            self._logger.exception(
                f"{self._log_prefix} Failed to parse RIS payload into Target objects."
            )
        return targets, inactive_ids

    def _handle_ris_status(self, payload: bytearray):
        """Process one RIS status payload.

        Steps, in order: update ownship state in the hub, feed the clock
        synchronizer, push target states to the hub, record targets in the
        active archive, broadcast targets to listeners, refresh debug views.
        A payload that cannot be parsed is logged and dropped.
        """
        reception_timestamp = time.monotonic()
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
        except (ValueError, TypeError):
            self._logger.error("Failed to parse SfpRisStatusPayload from buffer.")
            return

        # --- Update Ownship State ---
        if self._hub:
            self._update_ownship_state(parsed_payload, reception_timestamp)

        # --- Feed clock synchronizer with server timetag (if available) ---
        self._feed_clock_sync(parsed_payload, reception_timestamp)

        # --- Update Target States ---
        real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)
        if self._hub:
            self._publish_targets_to_hub(
                real_targets, inactive_ids, reception_timestamp
            )
        self._archive_real_targets(real_targets, reception_timestamp)

        # --- BROADCAST to all registered listeners ---
        self._broadcast_targets(real_targets)

        # --- Update Debug Views ---
        self._update_debug_views(parsed_payload)

    @staticmethod
    def _target_state_tuple(target) -> Tuple[float, float, float]:
        """Return the (x, y, z) position in feet stored on ``target``."""
        return (
            getattr(target, "_pos_x_ft", 0.0),
            getattr(target, "_pos_y_ft", 0.0),
            getattr(target, "_pos_z_ft", 0.0),
        )

    def _update_ownship_state(self, parsed_payload, reception_timestamp: float):
        """Integrate ownship position and push the new state to hub/archive."""
        try:
            sc = parsed_payload.scenario

            delta_t = 0.0
            if self._last_ownship_update_time is not None:
                delta_t = reception_timestamp - self._last_ownship_update_time
            self._last_ownship_update_time = reception_timestamp

            # Get previous ownship state to integrate position
            old_state = self._hub.get_ownship_state()
            old_pos_xy = old_state.get("position_xy_ft", (0.0, 0.0))

            # Server's vy is East (our x), vx is North (our y)
            ownship_vx_fps = float(sc.vy) * M_TO_FT
            ownship_vy_fps = float(sc.vx) * M_TO_FT

            # Integrate position
            new_pos_x_ft = old_pos_xy[0] + ownship_vx_fps * delta_t
            new_pos_y_ft = old_pos_xy[1] + ownship_vy_fps * delta_t

            ownship_heading_deg = math.degrees(float(sc.true_heading)) % 360

            ownship_state = {
                "timestamp": reception_timestamp,
                "position_xy_ft": (new_pos_x_ft, new_pos_y_ft),
                "altitude_ft": float(sc.baro_altitude) * M_TO_FT,
                "velocity_xy_fps": (ownship_vx_fps, ownship_vy_fps),
                "heading_deg": ownship_heading_deg,
                "latitude": float(sc.latitude),
                "longitude": float(sc.longitude),
            }
            self._hub.set_ownship_state(ownship_state)

            # Store ownship state in archive if available
            with self._lock:
                archive = self.active_archive
            if archive and hasattr(archive, "add_ownship_state"):
                archive.add_ownship_state(ownship_state)
        except Exception:
            self._logger.exception("Failed to update ownship state.")

    def _feed_clock_sync(self, parsed_payload, reception_timestamp: float):
        """Feed the clock synchronizer and record a latency sample if possible."""
        try:
            if self._clock_sync is None:
                return
            server_timetag = int(parsed_payload.scenario.timetag)
            # Add a sample: raw server timetag + local reception time
            self._clock_sync.add_sample(server_timetag, reception_timestamp)
            # Compute a latency sample if model can estimate client time
            est_gen = self._clock_sync.to_client_time(server_timetag)
            latency = reception_timestamp - est_gen
            # store only non-negative latencies
            if latency >= 0:
                with self._lock:
                    self._latency_samples.append(latency)
        except Exception:
            # Latency estimation is best effort and must never disturb
            # payload processing.
            self._logger.debug(
                "%s Clock synchronizer update failed.",
                self._log_prefix,
                exc_info=True,
            )

    def _publish_targets_to_hub(
        self,
        real_targets: List[Target],
        inactive_ids: List[int],
        reception_timestamp: float,
    ):
        """Push active target states to the hub and clear inactive ones."""
        try:
            hub = self._hub
            if hasattr(hub, "add_real_packet"):
                hub.add_real_packet(reception_timestamp)

            if hasattr(hub, "clear_real_target_data"):
                for tid in inactive_ids:
                    hub.clear_real_target_data(tid)

            can_set_heading = hasattr(hub, "set_real_heading")
            for target in real_targets:
                hub.add_real_state(
                    target_id=target.target_id,
                    timestamp=reception_timestamp,
                    state=self._target_state_tuple(target),
                )
                if can_set_heading:
                    hub.set_real_heading(
                        target.target_id,
                        getattr(target, "current_heading_deg", 0.0),
                        raw_value=getattr(target, "_raw_heading", None),
                    )
        except Exception:
            self._logger.exception("Failed to process RIS targets for Hub.")

    def _archive_real_targets(
        self, real_targets: List[Target], reception_timestamp: float
    ):
        """Record active target states in the current archive, if any."""
        with self._lock:
            archive = self.active_archive
        if not archive:
            return
        try:
            for target in real_targets:
                archive.add_real_state(
                    target_id=target.target_id,
                    timestamp=reception_timestamp,
                    state=self._target_state_tuple(target),
                )
        except Exception:
            # An archive failure must not prevent the listener broadcast or
            # the debug-view update that follow.
            self._logger.exception("Failed to record RIS targets in archive.")

    def _broadcast_targets(self, real_targets: List[Target]):
        """Call every registered RIS target listener with ``real_targets``."""
        # Snapshot under the lock, call outside it: the lock is not
        # re-entrant, so a listener that calls back into the router (for
        # example to unregister itself) would otherwise deadlock.
        with self._lock:
            listeners = list(self._ris_target_listeners)
        for listener in listeners:
            try:
                listener(real_targets)
            except Exception:
                self._logger.exception(f"Error in RIS target listener: {listener}")

    def _update_debug_views(self, parsed_payload: SfpRisStatusPayload):
        """Helper to populate debug views from a parsed payload.

        Publishes a text and a JSON rendering under the ``RIS_STATUS_TEXT``
        and ``RIS_STATUS_JSON`` flow ids and, when a hub is attached,
        forwards the antenna azimuth (converted to degrees) to it.
        """
        try:
            sc = parsed_payload.scenario
            lines = ["RIS Status Payload:\n", "Scenario:"]
            # ... text generation logic ...
            text_out = "\n".join(lines)
            self._update_last_payload(
                "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
            )

            # ... JSON generation logic ...
            def _convert_ctypes(value):
                # ctypes arrays expose _length_; turn them into plain lists.
                if hasattr(value, "_length_"):
                    return list(value)
                if isinstance(value, ctypes._SimpleCData):
                    return value.value
                return value

            scenario_dict = {
                f[0]: _convert_ctypes(getattr(sc, f[0])) for f in sc._fields_
            }
            targets_list = [
                {f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_}
                for t in parsed_payload.tgt.tgt
            ]
            struct = {"scenario": scenario_dict, "targets": targets_list}
            json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
            self._update_last_payload("RIS_STATUS_JSON", json_bytes)

            # --- Propagate antenna azimuth to hub ---
            if self._hub:
                plat_az_rad = scenario_dict.get(
                    "ant_nav_az", scenario_dict.get("platform_azimuth")
                )
                if plat_az_rad is not None:
                    az_deg = math.degrees(float(plat_az_rad))
                    self._hub.set_antenna_azimuth(az_deg, timestamp=time.monotonic())
        except Exception:
            self._logger.exception("Failed to generate text/JSON for RIS debug view.")

    def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
        """Return the payloads buffered since the last call and reset the buffer."""
        with self._lock:
            new_payloads = self._latest_payloads
            self._latest_payloads = {}
            return new_payloads

    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """Record a raw packet in the history and optionally persist it.

        Args:
            raw_bytes: The packet exactly as received.
            addr: Sender address; ``addr[0]`` and ``addr[1]`` are used to
                build the persisted file name.
        """
        with self._lock:
            self._last_raw_packet = (raw_bytes, addr)
            # Naive UTC timestamp, the same value utcnow() produced, without
            # using the API deprecated since Python 3.12.
            now_utc = datetime.datetime.now(datetime.timezone.utc).replace(
                tzinfo=None
            )
            entry = {"ts": now_utc, "addr": addr, "raw": raw_bytes}
            try:
                hdr = SFPHeader.from_buffer_copy(raw_bytes)
                entry["flow"] = int(hdr.SFP_FLOW)
                entry["tid"] = int(hdr.SFP_TID)
                entry["flow_name"] = self._FLOW_NAMES.get(
                    entry["flow"], str(entry["flow"])
                )
            except Exception:
                # Packets without a decodable SFP header are still recorded,
                # just without the flow/tid metadata.
                pass
            self._history.append(entry)
            persist = self._persist

        # Disk I/O happens outside the lock so a slow write cannot block
        # other router methods.
        if persist:
            self._persist_raw_packet(entry)

    def _persist_raw_packet(self, entry: Dict[str, Any]):
        """Write the raw bytes of a history entry to the persist directory."""
        try:
            addr = entry["addr"]
            ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
            fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
            path = os.path.join(self._persist_dir, fname)
            with open(path, "wb") as f:
                f.write(entry["raw"])
        except Exception:
            # Persistence is best effort and must never break reception.
            self._logger.debug(
                "%s Failed to persist raw packet.", self._log_prefix, exc_info=True
            )

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Return the last ``(raw_bytes, addr)`` pair, or None, and reset it."""
        with self._lock:
            pkt = self._last_raw_packet
            self._last_raw_packet = None
            return pkt

    # --- Latency estimation API ---
    def get_estimated_latency_s(self) -> float:
        """Return estimated one-way latency in seconds, or 0.0 if unknown."""
        try:
            if self._clock_sync is not None:
                return float(self._clock_sync.get_average_latency_s())
        except Exception:
            self._logger.debug(
                "%s Could not read average latency.", self._log_prefix, exc_info=True
            )
        return 0.0

    def get_latency_samples(self, limit: Optional[int] = None) -> List[float]:
        """Return recent latency samples in seconds (most recent last).

        Args:
            limit: maximum number of samples to return (None = all available)
        """
        with self._lock:
            samples = list(self._latency_samples)
        if limit is not None and limit > 0:
            return samples[-limit:]
        return samples

    def get_latency_stats(self, sample_limit: int = 200) -> Dict[str, Any]:
        """Compute basic statistics over recent latency samples.

        Args:
            sample_limit: Use only the most recent ``sample_limit`` samples;
                a falsy or non-positive value uses all stored samples.

        Returns:
            A dict with mean_ms, std_ms, var_ms, min_ms, max_ms (rounded to
            3 decimals; population variance and standard deviation) and
            count. With no samples all values are None and count is 0.
        """
        with self._lock:
            samples = list(self._latency_samples)

        if not samples:
            return {
                "mean_ms": None,
                "std_ms": None,
                "var_ms": None,
                "min_ms": None,
                "max_ms": None,
                "count": 0,
            }

        if sample_limit and sample_limit > 0:
            samples = samples[-sample_limit:]

        # Work in milliseconds for reporting
        ms = [s * 1000.0 for s in samples]
        mean = statistics.mean(ms)
        var = statistics.pvariance(ms) if len(ms) > 1 else 0.0
        std = statistics.pstdev(ms) if len(ms) > 1 else 0.0
        return {
            "mean_ms": round(mean, 3),
            "std_ms": round(std, 3),
            "var_ms": round(var, 3),
            "min_ms": round(min(ms), 3),
            "max_ms": round(max(ms), 3),
            "count": len(ms),
        }

    def get_history(self):
        """Return a list copy of the raw-packet history (oldest first)."""
        with self._lock:
            return list(self._history)

    def clear_history(self):
        """Remove every entry from the raw-packet history."""
        with self._lock:
            self._history.clear()

    def set_history_size(self, n: int):
        """Resize the raw-packet history to ``n`` entries (minimum 1).

        When shrinking, the most recent entries are kept.
        """
        with self._lock:
            n = max(1, int(n))
            self._history_size = n
            self._history = collections.deque(
                self._history, maxlen=self._history_size
            )

    def set_persist(self, enabled: bool):
        """Enable or disable writing each raw packet to the persist directory."""
        with self._lock:
            self._persist = bool(enabled)