# target_simulator/gui/payload_router.py
"""
Payload router for buffering SFP payloads for the GUI.

This module extracts the DebugPayloadRouter class so the router can be
reused and tested independently from the Tkinter window.
"""

import threading
import statistics
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from queue import Queue, Empty, Full
from typing import Dict, Optional, Any, List, Callable, Tuple

from target_simulator.core.sfp_structures import (
    SFPHeader,
    SfpRisStatusPayload,
    DataTag,
    SfpRisSyncPayload,
)
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
from target_simulator.utils.clock_synchronizer import ClockSynchronizer

# Module-level logger for this module
logger = logging.getLogger(__name__)

PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None]

# Listener types used by the external profiling hooks.
RawPacketListener = Callable[[bytes, tuple], None]
PostProcessingListener = Callable[[bytes], None]

# --- Constants ---
M_TO_FT = 3.28084
PERFORMANCE_SAMPLES_BUFFER_SIZE = 10000

# Byte offset of the SFP_FLOW field inside a raw SFP packet.
_SFP_FLOW_OFFSET = 6
# Minimum number of bytes that must follow the DataTag in a SYNC reply.
_SYNC_PAYLOAD_MIN_SIZE = 16
# Human-readable names for the flow IDs recorded in the packet history.
_FLOW_NAMES: Dict[int, str] = {
    ord("M"): "MFD",
    ord("S"): "SAR",
    ord("B"): "BIN",
    ord("J"): "JSON",
    ord("R"): "RIS",
}


class DebugPayloadRouter:
    """
    A unified router that handles payloads for the entire application.

    It receives payloads from the network thread, queues them, and processes
    them on a dedicated worker thread to avoid blocking the network receiver.
    This class is thread-safe.

    Listener callbacks are always invoked *outside* the internal lock, on a
    snapshot of the listener list, so a callback may safely call back into
    the router (for example to unregister itself).
    """

    def __init__(
        self,
        simulation_hub: Optional[SimulationStateHub] = None,
    ):
        """
        Build the router and start its processing worker thread.

        Args:
            simulation_hub: Optional hub that receives ownship/target state.
                When omitted, hub updates are skipped.
        """
        self.active_archive = None
        self.logger = logger
        self._log_prefix = "[DebugPayloadRouter]"
        self._lock = threading.Lock()

        # Listener lists for the external profiling hooks.
        self._raw_packet_listeners: List[RawPacketListener] = []
        self._post_processing_listeners: List[PostProcessingListener] = []

        self._latest_payloads: Dict[str, Any] = {}
        self._last_raw_packet: Optional[tuple] = None
        self._sfp_debug_history_size = 20
        self._history = collections.deque(maxlen=self._sfp_debug_history_size)
        self._persist = False
        self._hub = simulation_hub
        self._last_ownship_update_time: Optional[float] = None
        self._ris_target_listeners: List[TargetListListener] = []

        # --- Worker Thread and Queue for Processing ---
        self._processing_queue: Queue = Queue(maxsize=100)
        # Queue for SYNC results (large buffer).
        self._sync_results_queue: Queue = Queue(maxsize=500)
        self._stop_worker = threading.Event()
        self._processing_thread = threading.Thread(
            target=self._processing_loop,
            name="PayloadProcessingThread",
            daemon=True,
        )

        # Imported here (not at module level), as in the original code.
        from target_simulator.config import DEBUG_CONFIG

        self._profiling_enabled = DEBUG_CONFIG.get(
            "enable_performance_profiling", False
        )
        self._perf_counters = {
            "packet_count": 0,
            "_total_packet_count": 0,
            "parse_time_total": 0.0,
            "hub_update_time_total": 0.0,
            "archive_time_total": 0.0,
            "listener_time_total": 0.0,
            "clock_sync_time_total": 0.0,
            "last_report_time": time.time(),
            "max_processing_time": 0.0,
        }
        if self._profiling_enabled:
            self._perf_samples = collections.deque(
                maxlen=PERFORMANCE_SAMPLES_BUFFER_SIZE
            )
        else:
            self._perf_samples = None

        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except Exception:
            # Best effort: persistence is optional, the router still works.
            self.logger.debug(
                "Could not create persist directory.", exc_info=True
            )

        self._handlers: Dict[int, PayloadHandler] = {
            ord("M"): lambda p: self._update_last_payload("MFD", p),
            ord("S"): lambda p: self._update_last_payload("SAR", p),
            ord("B"): lambda p: self._update_last_payload("BIN", p),
            ord("J"): self._handle_json_payload,
            ord("R"): self._handle_ris_status,
            ord("r"): self._handle_ris_status,
        }
        self.logger.info(
            f"{self._log_prefix} Initialized (Hub: {self._hub is not None})."
        )

        try:
            self._clock_sync = ClockSynchronizer()
        except Exception:
            self.logger.warning(
                "ClockSynchronizer unavailable; latency estimation disabled.",
                exc_info=True,
            )
            self._clock_sync = None

        # Start the worker only once every attribute it may touch exists.
        # Starting it earlier would also leak the thread if one of the steps
        # above raised.
        self._processing_thread.start()

    def add_raw_packet_listener(self, listener: RawPacketListener):
        """Register a listener notified when a raw packet arrives (Port A hook)."""
        with self._lock:
            if listener not in self._raw_packet_listeners:
                self._raw_packet_listeners.append(listener)
                self.logger.debug(f"Raw packet listener registered. Total: {len(self._raw_packet_listeners)}")

    def remove_raw_packet_listener(self, listener: RawPacketListener):
        """Unregister a raw packet listener; unknown listeners are ignored."""
        with self._lock:
            try:
                self._raw_packet_listeners.remove(listener)
                self.logger.debug(f"Raw packet listener removed. Total: {len(self._raw_packet_listeners)}")
            except ValueError:
                pass

    def add_post_processing_listener(self, listener: PostProcessingListener):
        """Register a listener notified after a packet is processed (Port B hook)."""
        with self._lock:
            if listener not in self._post_processing_listeners:
                self._post_processing_listeners.append(listener)
                self.logger.debug(f"Post-processing listener registered. Total: {len(self._post_processing_listeners)}")

    def remove_post_processing_listener(self, listener: PostProcessingListener):
        """Unregister a post-processing listener; unknown listeners are ignored."""
        with self._lock:
            try:
                self._post_processing_listeners.remove(listener)
                self.logger.debug(f"Post-processing listener removed. Total: {len(self._post_processing_listeners)}")
            except ValueError:
                pass

    def shutdown(self):
        """Signals the processing worker thread to stop."""
        self.logger.info("Shutting down payload router worker thread...")
        self._stop_worker.set()
        try:
            # Wake the worker immediately instead of waiting for its timeout.
            self._processing_queue.put_nowait(None)
        except Full:
            # The worker will still notice the stop event on its next pass.
            pass
        self._processing_thread.join(timeout=2.0)
        if self._processing_thread.is_alive():
            self.logger.warning("Payload processing thread did not shut down cleanly.")

    def _processing_loop(self):
        """Worker thread loop that processes payloads from the queue."""
        self.logger.info("Payload processing worker thread started.")
        while not self._stop_worker.is_set():
            try:
                # Queue items are (payload, reception_timestamp, raw_packet),
                # or None as the shutdown sentinel.
                item = self._processing_queue.get(timeout=1.0)
            except Empty:
                continue
            try:
                if item is None:
                    break
                payload, reception_timestamp, raw_packet = item
                self._dispatch_payload(
                    bytearray(payload), reception_timestamp, raw_packet
                )
            except Exception:
                self.logger.exception("Unexpected error in payload processing loop.")
            finally:
                # Every successful get() is balanced by exactly one
                # task_done(), including the sentinel and failed items.
                self._processing_queue.task_done()
        self.logger.info("Payload processing worker thread stopped.")

    def _dispatch_payload(self, payload: bytearray, reception_timestamp: float, raw_bytes_for_profiling: bytes):
        """
        Worker Thread: Analyzes the payload and routes it to the correct handler.

        Args:
            payload: Packet payload with the SFP header already stripped.
            reception_timestamp: Timestamp taken when the packet was received.
            raw_bytes_for_profiling: The original raw packet, forwarded
                unchanged to the post-processing listeners.
        """
        try:
            if len(payload) < ctypes.sizeof(DataTag):
                self.logger.warning(
                    f"Payload too short: {len(payload)} bytes, need at least {ctypes.sizeof(DataTag)}"
                )
                return
            tag = DataTag.from_buffer_copy(payload)
            if tag.ID[0] == ord("S") and tag.ID[1] == ord("Y"):
                self._process_sync_reply_payload(payload, reception_timestamp)
            else:
                self._process_ris_status_payload(payload, reception_timestamp)
        except Exception:
            self.logger.exception("Error during payload dispatch.")
        finally:
            # Notify post-processing listeners. The list is copied under the
            # lock and the callbacks run outside it: the lock is not
            # reentrant, so a listener calling back into the router would
            # otherwise deadlock.
            with self._lock:
                listeners = list(self._post_processing_listeners)
            for listener in listeners:
                try:
                    listener(raw_bytes_for_profiling)
                except Exception as e:
                    self.logger.error(f"Error in post-processing listener: {e}")

    def _process_sync_reply_payload(
        self, payload: bytearray, reception_timestamp: float
    ):
        """
        Worker Thread: Handles a SYNC reply.

        The decoded reply is pushed onto the SYNC results queue; if that
        queue is full the result is dropped.
        """
        try:
            sync_data = payload[ctypes.sizeof(DataTag):]
            if len(sync_data) < _SYNC_PAYLOAD_MIN_SIZE:
                self.logger.warning(
                    f"SYNC payload too short: {len(sync_data)} bytes, need at least 16"
                )
                return
            sync_payload = SfpRisSyncPayload.from_buffer_copy(sync_data)
            result = {
                "cookie": sync_payload.cc_cookie,
                "server_timetag": sync_payload.ris_timetag,
                "reception_timestamp": reception_timestamp,
                "flags": sync_payload.flags,
                "tx_period_ms": sync_payload.tx_period_ms,
            }
            try:
                self._sync_results_queue.put_nowait(result)
            except Full:
                # Nobody is draining the queue; drop the result.
                pass
        except Exception as e:
            self.logger.warning(
                f"Failed to parse SYNC reply payload: {e}", exc_info=True
            )

    def _handle_ris_status(self, payload: bytearray):
        """
        Network Thread: handler registered for the RIS flow IDs.

        Intentionally a no-op. RIS packets are queued by `update_raw_packet`,
        which has access to the complete raw packet; this method is kept only
        so the `_handlers` dictionary stays compatible with the transport.
        """
        pass  # The logic now lives in update_raw_packet

    def _process_ris_status_payload(
        self, payload: bytearray, reception_timestamp: float
    ):
        """
        Worker Thread: Parses and processes a single RIS status payload.

        Steps: parse the payload, update ownship state on the hub, feed the
        clock synchronizer, publish target states to hub and archive, notify
        RIS target listeners, record profiling data, refresh debug views.
        """
        t_start = time.perf_counter() if self._profiling_enabled else None
        if self._profiling_enabled:
            self._perf_counters["packet_count"] += 1
            self._perf_counters["_total_packet_count"] += 1

        # One archive snapshot per packet, so the whole packet is attributed
        # to a single archive even if set_archive() runs concurrently.
        with self._lock:
            archive = self.active_archive

        t_parse_start = time.perf_counter()
        try:
            # 'payload' has already been stripped of the SFP header.
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
        except (ValueError, TypeError) as e:
            self.logger.error(f"Failed to parse SfpRisStatusPayload from buffer (len={len(payload)}). Error: {e}")
            return
        t_parse_end = time.perf_counter()
        if self._profiling_enabled:
            self._perf_counters["parse_time_total"] += t_parse_end - t_parse_start

        t_hub_start = time.perf_counter()
        if self._hub:
            try:
                sc = parsed_payload.scenario
                delta_t = 0.0
                if self._last_ownship_update_time is not None:
                    delta_t = reception_timestamp - self._last_ownship_update_time
                self._last_ownship_update_time = reception_timestamp

                old_state = self._hub.get_ownship_state()
                old_pos_xy = old_state.get("position_xy_ft", (0.0, 0.0))
                # NOTE(review): x/y are deliberately crossed here (vy -> x,
                # vx -> y), presumably an axis-convention change; confirm.
                ownship_vx_fps = float(sc.vy) * M_TO_FT
                ownship_vy_fps = float(sc.vx) * M_TO_FT
                new_pos_x_ft = old_pos_xy[0] + ownship_vx_fps * delta_t
                new_pos_y_ft = old_pos_xy[1] + ownship_vy_fps * delta_t
                ownship_heading_deg = math.degrees(float(sc.true_heading)) % 360
                ownship_state = {
                    "timestamp": reception_timestamp,
                    "position_xy_ft": (new_pos_x_ft, new_pos_y_ft),
                    "altitude_ft": float(sc.baro_altitude) * M_TO_FT,
                    "velocity_xy_fps": (ownship_vx_fps, ownship_vy_fps),
                    "heading_deg": ownship_heading_deg,
                    "latitude": float(sc.latitude),
                    "longitude": float(sc.longitude),
                }
                self._hub.set_ownship_state(ownship_state)
                if archive and hasattr(archive, "add_ownship_state"):
                    archive.add_ownship_state(ownship_state)
            except Exception:
                self.logger.exception("Failed to update ownship state.")

        t_clock_start = time.perf_counter()
        if self._clock_sync is not None and parsed_payload is not None:
            try:
                server_timetag = int(parsed_payload.scenario.timetag)
                self._clock_sync.add_sample(server_timetag, reception_timestamp)
                est_gen = self._clock_sync.to_client_time(server_timetag)
                latency = reception_timestamp - est_gen
                if latency >= 0 and archive is not None:
                    latency_ms = latency * 1000
                    if hasattr(archive, "add_latency_sample"):
                        archive.add_latency_sample(reception_timestamp, latency_ms)
            except Exception:
                # Best effort: a failed sync sample must not stop processing.
                self.logger.debug(
                    "Clock synchronisation update failed.", exc_info=True
                )
        t_clock_end = time.perf_counter()
        clock_elapsed = t_clock_end - t_clock_start
        if self._profiling_enabled:
            self._perf_counters["clock_sync_time_total"] += clock_elapsed

        real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)
        # Position tuples are computed once and shared by hub and archive.
        target_states = [
            (
                target,
                (
                    getattr(target, "_pos_x_ft", 0.0),
                    getattr(target, "_pos_y_ft", 0.0),
                    getattr(target, "_pos_z_ft", 0.0),
                ),
            )
            for target in real_targets
        ]

        if self._hub:
            try:
                self._hub.add_real_packet(reception_timestamp)
                for tid in inactive_ids or []:
                    self._hub.clear_real_target_data(tid)
                for target, state_tuple in target_states:
                    self._hub.add_real_state(
                        target.target_id, reception_timestamp, state_tuple
                    )
                    self._hub.set_real_heading(
                        target.target_id,
                        target.current_heading_deg,
                        raw_value=getattr(target, "_raw_heading", None),
                    )
            except Exception:
                self.logger.exception("Failed to process RIS targets for Hub.")
        t_hub_end = time.perf_counter()
        # The hub window encloses the clock-sync window; subtract it so the
        # clock cost is not counted twice in the reported average.
        hub_elapsed = (t_hub_end - t_hub_start) - clock_elapsed
        if self._profiling_enabled:
            self._perf_counters["hub_update_time_total"] += hub_elapsed

        t_archive_start = time.perf_counter()
        if archive:
            try:
                for target, state_tuple in target_states:
                    archive.add_real_state(
                        target.target_id, reception_timestamp, state_tuple
                    )
            except Exception:
                # An archive failure must not prevent listener notification
                # or the debug view refresh below.
                self.logger.exception("Failed to archive RIS target states.")
        t_archive_end = time.perf_counter()
        if self._profiling_enabled:
            self._perf_counters["archive_time_total"] += t_archive_end - t_archive_start

        t_listener_start = time.perf_counter()
        # Copy under the lock, call outside it (the lock is not reentrant).
        with self._lock:
            target_listeners = list(self._ris_target_listeners)
        for listener in target_listeners:
            try:
                listener(real_targets)
            except Exception:
                self.logger.exception(f"Error in RIS target listener: {listener}")
        t_listener_end = time.perf_counter()
        if self._profiling_enabled:
            self._perf_counters["listener_time_total"] += (
                t_listener_end - t_listener_start
            )

        if (
            self._profiling_enabled
            and t_start is not None
            and archive is not None
        ):
            total_processing_time = time.perf_counter() - t_start
            self._perf_counters["max_processing_time"] = max(
                self._perf_counters["max_processing_time"], total_processing_time
            )
            # Keep a detailed sample for slow packets (> 10 ms) and for
            # every 100th packet.
            if (
                total_processing_time > 0.010
                or self._perf_counters["_total_packet_count"] % 100 == 0
            ):
                if self._perf_samples is not None:
                    sample = {
                        "timestamp": reception_timestamp,
                        "total_ms": round(total_processing_time * 1000, 3),
                        "parse_ms": round((t_parse_end - t_parse_start) * 1000, 3),
                        "hub_ms": round(hub_elapsed * 1000, 3),
                        "archive_ms": round(
                            (t_archive_end - t_archive_start) * 1000, 3
                        ),
                        "listener_ms": round(
                            (t_listener_end - t_listener_start) * 1000, 3
                        ),
                        "clock_ms": round(clock_elapsed * 1000, 3),
                    }
                    self._perf_samples.append(sample)
                    if len(self._perf_samples) % 500 == 0:
                        self.logger.debug(
                            f"Performance samples buffer: {len(self._perf_samples)} samples"
                        )
            current_time = time.time()
            if current_time - self._perf_counters["last_report_time"] >= 5.0:
                self._report_performance_stats()
                self._perf_counters["last_report_time"] = current_time

        self._update_debug_views(parsed_payload)

    def get_sync_result(self) -> Optional[Dict]:
        """Pop one result from the SYNC queue without blocking, or return None."""
        try:
            return self._sync_results_queue.get_nowait()
        except Empty:
            return None

    def set_archive(self, archive):
        """
        Set (or clear, with None) the active archive.

        Setting a non-None archive also resets the performance counters and
        the performance sample buffer.
        """
        with self._lock:
            self.active_archive = archive
            if archive is not None:
                if self._perf_samples is not None:
                    self._perf_samples.clear()
                self._perf_counters.update(
                    {
                        "packet_count": 0,
                        "_total_packet_count": 0,
                        "parse_time_total": 0.0,
                        "hub_update_time_total": 0.0,
                        "archive_time_total": 0.0,
                        "listener_time_total": 0.0,
                        "clock_sync_time_total": 0.0,
                        "max_processing_time": 0.0,
                        "last_report_time": time.time(),
                    }
                )
                self.logger.debug("Performance buffers cleared for new simulation")

    def add_ris_target_listener(self, listener: TargetListListener):
        """Register a listener that receives the list of real targets per packet."""
        with self._lock:
            if listener not in self._ris_target_listeners:
                self._ris_target_listeners.append(listener)
                self.logger.info(f"RIS target listener added: {listener}")

    def remove_ris_target_listener(self, listener: TargetListListener):
        """Unregister a RIS target listener; unknown listeners are ignored."""
        with self._lock:
            try:
                self._ris_target_listeners.remove(listener)
                self.logger.info(f"RIS target listener removed: {listener}")
            except ValueError:
                pass

    def get_handlers(self) -> Dict[int, PayloadHandler]:
        """
        Return the flow-ID -> handler mapping.

        This is no longer the primary way the router receives data; it is
        kept for compatibility with the transport. The real entry point is
        `update_raw_packet`.
        """
        return self._handlers

    def _update_last_payload(self, flow_id: str, payload: Any):
        """Store `payload` as the latest payload for `flow_id`."""
        with self._lock:
            self._latest_payloads[flow_id] = payload

    def _parse_ris_payload_to_targets(
        self, payload: bytearray
    ) -> Tuple[List[Target], List[int]]:
        """
        Convert a RIS status payload into Target objects.

        Returns:
            A tuple `(targets, inactive_ids)`: targets whose flags are
            non-zero, and the indices of the slots whose flags are zero.
            On a parsing error the error is logged and whatever was
            collected so far is returned.
        """
        targets: List[Target] = []
        inactive_ids: List[int] = []
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            for i, ris_target in enumerate(parsed_payload.tgt.tgt):
                if ris_target.flags == 0:
                    inactive_ids.append(int(i))
                    continue

                target = Target(
                    target_id=i, trajectory=[], active=True, traceable=True
                )
                # NOTE(review): x and y are crossed on purpose (same
                # convention as the ownship velocity); confirm with the
                # payload specification.
                target._pos_x_ft = float(ris_target.y) * M_TO_FT
                target._pos_y_ft = float(ris_target.x) * M_TO_FT
                target._pos_z_ft = float(ris_target.z) * M_TO_FT
                target._update_current_polar_coords()
                try:
                    raw_h = float(ris_target.heading)
                    # Heuristic: magnitudes up to 7.0 are treated as radians,
                    # anything larger as degrees.
                    target.current_heading_deg = (
                        math.degrees(raw_h) if abs(raw_h) <= 7.0 else raw_h
                    ) % 360
                    target._raw_heading = raw_h
                except (ValueError, TypeError):
                    target.current_heading_deg = 0.0
                targets.append(target)
        except Exception:
            self.logger.exception("Failed to parse RIS payload into Target objects.")
        return targets, inactive_ids

    def _update_debug_views(self, parsed_payload: SfpRisStatusPayload):
        """
        Publish text and JSON renderings of the payload for the debug views
        and forward the antenna azimuth to the hub when available.
        """
        try:
            sc = parsed_payload.scenario
            lines = ["RIS Status Payload:\n", "Scenario:"]
            text_out = "\n".join(lines)
            self._update_last_payload(
                "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
            )

            def _convert_ctypes(value):
                # Arrays expose `_length_`; simple ctypes scalars expose
                # `.value`; everything else is passed through unchanged.
                if hasattr(value, "_length_"):
                    return list(value)
                if isinstance(value, ctypes._SimpleCData):
                    return value.value
                return value

            scenario_dict = {
                f[0]: _convert_ctypes(getattr(sc, f[0])) for f in sc._fields_
            }
            targets_list = [
                {f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_}
                for t in parsed_payload.tgt.tgt
            ]
            struct = {"scenario": scenario_dict, "targets": targets_list}
            json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
            self._update_last_payload("RIS_STATUS_JSON", json_bytes)

            if self._hub:
                sweep_rad = scenario_dict.get("ant_nav_az")
                if sweep_rad is not None:
                    try:
                        sweep_deg = math.degrees(float(sweep_rad))
                        self._hub.set_antenna_azimuth(
                            sweep_deg, timestamp=time.monotonic()
                        )
                    except Exception:
                        # Best effort: the azimuth is only a display aid.
                        self.logger.debug(
                            "Could not update antenna azimuth.", exc_info=True
                        )
        except Exception:
            self.logger.exception("Failed to generate text/JSON for RIS debug view.")

    def _handle_json_payload(self, payload: bytearray):
        """
        Store a JSON payload and, if it carries an antenna azimuth, forward
        that azimuth to the hub.
        """
        try:
            self._update_last_payload("JSON", payload)
            if not self._hub:
                return
            try:
                obj = json.loads(payload.decode("utf-8"))
            except Exception:
                # Not valid JSON/UTF-8: nothing more to extract.
                return

            def _find_val(keys, dct):
                # Look at the top level first, then inside "scenario".
                if not isinstance(dct, dict):
                    return None
                for key in keys:
                    if key in dct:
                        return dct[key]
                sc = dct.get("scenario", {})
                if isinstance(sc, dict):
                    for key in keys:
                        if key in sc:
                            return sc[key]
                return None

            sweep_val = _find_val(["ant_nav_az", "antenna_azimuth"], obj)
            if sweep_val is not None:
                try:
                    s_rad = float(sweep_val)
                    # Heuristic: magnitudes above 7.0 are assumed to be
                    # degrees and converted to radians first.
                    if abs(s_rad) > 7.0:
                        s_rad = math.radians(s_rad)
                    sweep_deg = math.degrees(s_rad)
                    self._hub.set_antenna_azimuth(sweep_deg, timestamp=time.monotonic())
                except Exception as e:
                    self.logger.debug(f"Error processing antenna azimuth: {e}")
        except Exception:
            self.logger.exception("Error handling JSON payload")

    def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
        """Return the latest payloads per flow and reset the internal store."""
        with self._lock:
            new_payloads, self._latest_payloads = self._latest_payloads, {}
        return new_payloads

    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """
        Entry point for every raw SFP packet received from the network.

        Notifies raw-packet listeners, queues RIS packets for the worker
        thread, records the packet in the debug history and, when
        persistence is enabled, writes it to disk.

        Args:
            raw_bytes: The complete raw packet, SFP header included.
            addr: Sender address; `addr[0]` and `addr[1]` are used to build
                the persisted file name.
        """
        # Take the timestamp first so the time spent in listeners does not
        # leak into latency measurements.
        reception_timestamp = time.monotonic()

        # Notify raw packet listeners (Port A hook). Copy under the lock,
        # call outside it, because the lock is not reentrant.
        with self._lock:
            raw_listeners = list(self._raw_packet_listeners)
        for listener in raw_listeners:
            try:
                listener(raw_bytes, addr)
            except Exception as e:
                self.logger.error(f"Error in raw packet listener: {e}")

        # Queue RIS packets for the worker thread.
        header_size = SFPHeader.size()
        if len(raw_bytes) > header_size:
            try:
                # Read the flow ID straight from the raw packet.
                flow = raw_bytes[_SFP_FLOW_OFFSET]
                if flow in (ord('R'), ord('r')):
                    # Strip the header; the worker gets the full tuple.
                    payload = raw_bytes[header_size:]
                    self._processing_queue.put_nowait(
                        (payload, reception_timestamp, raw_bytes)
                    )
            except Full:
                self.logger.error("Payload processing queue is full! A RIS packet was dropped.")
            except Exception as e:
                self.logger.error(f"Error queuing RIS packet in update_raw_packet: {e}")

        # Naive UTC timestamp, same value datetime.utcnow() produced, but
        # without the call deprecated since Python 3.12.
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        entry = {"ts": now_utc, "addr": addr, "raw": raw_bytes}
        try:
            hdr = SFPHeader.from_buffer_copy(raw_bytes)
            entry.update(flow=int(hdr.SFP_FLOW), tid=int(hdr.SFP_TID))
            entry["flow_name"] = _FLOW_NAMES.get(entry["flow"], str(entry["flow"]))
        except Exception:
            # Packets too short or malformed are still recorded, just
            # without the decoded header fields.
            pass

        with self._lock:
            self._last_raw_packet = (raw_bytes, addr)
            self._history.append(entry)
            persist = self._persist

        if persist:
            # File I/O happens outside the lock so a slow disk cannot stall
            # other threads using the router.
            try:
                ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
                fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
                with open(os.path.join(self._persist_dir, fname), "wb") as f:
                    f.write(raw_bytes)
            except Exception:
                # Best effort: persistence failures must not affect routing.
                self.logger.debug("Could not persist raw packet.", exc_info=True)

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Return the last `(raw_bytes, addr)` pair and clear it, or None."""
        with self._lock:
            pkt, self._last_raw_packet = self._last_raw_packet, None
        return pkt

    def get_estimated_latency_s(self) -> float:
        """Return the synchronizer's average latency, or 0.0 if unavailable."""
        try:
            if self._clock_sync:
                return self._clock_sync.get_average_latency_s()
        except Exception:
            self.logger.debug("Could not read average latency.", exc_info=True)
        return 0.0

    def get_latency_samples(self, limit: Optional[int] = None) -> List[tuple]:
        """
        Return the latency history from the clock synchronizer.

        Args:
            limit: When truthy, only the last `limit` samples are returned.

        Returns:
            The samples, or an empty list if none are available.
        """
        try:
            if self._clock_sync:
                samples = self._clock_sync.get_latency_history()
                return samples[-limit:] if limit else samples
        except Exception:
            self.logger.debug("Could not read latency history.", exc_info=True)
        return []

    def get_latency_stats(self, sample_limit: int = 200) -> Dict[str, Any]:
        """
        Compute mean/std/min/max (in ms) over the latest latency samples.

        Returns:
            A statistics dictionary, or `{"count": 0}` when there are no
            samples or the computation fails.
        """
        try:
            samples = self.get_latency_samples(limit=sample_limit)
            if not samples:
                return {"count": 0}
            # The second element of each sample is multiplied by 1000.0 to
            # obtain milliseconds.
            ms = [s[1] * 1000.0 for s in samples]
            return {
                "mean_ms": round(statistics.mean(ms), 3),
                "std_ms": round(statistics.stdev(ms) if len(ms) > 1 else 0.0, 3),
                "min_ms": round(min(ms), 3),
                "max_ms": round(max(ms), 3),
                "count": len(ms),
            }
        except Exception:
            self.logger.debug("Could not compute latency stats.", exc_info=True)
        return {"count": 0}

    def get_history(self):
        """Return a copy of the raw packet history as a list."""
        with self._lock:
            return list(self._history)

    def get_performance_samples(self):
        """Return a copy of the collected performance samples (may be empty)."""
        result = (
            list(self._perf_samples)
            if self._profiling_enabled and self._perf_samples
            else []
        )
        self.logger.debug(
            f"get_performance_samples called: profiling={self._profiling_enabled}, samples_count={len(result)}"
        )
        return result

    def _report_performance_stats(self):
        """Log average per-stage timings, then reset the interval counters."""
        try:
            count = self._perf_counters["packet_count"]
            if count == 0:
                return
            avg_parse = (self._perf_counters["parse_time_total"] / count) * 1000
            avg_hub = (self._perf_counters["hub_update_time_total"] / count) * 1000
            avg_archive = (self._perf_counters["archive_time_total"] / count) * 1000
            avg_listener = (self._perf_counters["listener_time_total"] / count) * 1000
            avg_clock = (self._perf_counters["clock_sync_time_total"] / count) * 1000
            max_proc = self._perf_counters["max_processing_time"] * 1000
            total_avg = avg_parse + avg_hub + avg_archive + avg_listener + avg_clock
            self.logger.info(
                f"[PERF] Pkts: {count} | Avg: {total_avg:.2f}ms "
                f"(parse:{avg_parse:.2f} hub:{avg_hub:.2f} arch:{avg_archive:.2f} listen:{avg_listener:.2f} sync:{avg_clock:.2f}) | "
                f"Max: {max_proc:.2f}ms"
            )
            # "_total_packet_count" and "last_report_time" are deliberately
            # not reset here.
            self._perf_counters["packet_count"] = 0
            self._perf_counters["parse_time_total"] = 0.0
            self._perf_counters["hub_update_time_total"] = 0.0
            self._perf_counters["archive_time_total"] = 0.0
            self._perf_counters["listener_time_total"] = 0.0
            self._perf_counters["clock_sync_time_total"] = 0.0
            self._perf_counters["max_processing_time"] = 0.0
        except Exception as e:
            self.logger.debug(f"Error reporting performance stats: {e}")

    def clear_history(self):
        """Remove every entry from the raw packet history."""
        with self._lock:
            self._history.clear()

    def set_history_size(self, n: int):
        """Resize the packet history to `n` entries (minimum 1), keeping the newest."""
        with self._lock:
            self._sfp_debug_history_size = max(1, int(n))
            self._history = collections.deque(
                self._history, maxlen=self._sfp_debug_history_size
            )

    def set_persist(self, enabled: bool):
        """Enable or disable writing each raw packet to the persist directory."""
        with self._lock:
            self._persist = bool(enabled)