From 9ac75a07e9b00b29843d3a7491dcb05fba1e8312 Mon Sep 17 00:00:00 2001 From: VALLONGOL Date: Wed, 19 Nov 2025 08:17:02 +0100 Subject: [PATCH] - aggiunto tool per la profilazione esterna dei tempi di risposta nella connessione, da usare con wireshark - modificato il sistema di aggiornamento della ppi per evitare draw bloccanti --- target_simulator/_version.py | 45 +++--- target_simulator/gui/external_profiler.py | 96 ++++++++++++ .../gui/external_profiler_window.py | 140 ++++++++++++++++++ target_simulator/gui/main_view.py | 8 +- target_simulator/gui/payload_router.py | 134 +++++++++++++---- target_simulator/gui/ppi_display.py | 24 ++- tools/test_udp_send_simple.py | 47 ++++++ 7 files changed, 432 insertions(+), 62 deletions(-) create mode 100644 target_simulator/gui/external_profiler.py create mode 100644 target_simulator/gui/external_profiler_window.py create mode 100644 tools/test_udp_send_simple.py diff --git a/target_simulator/_version.py b/target_simulator/_version.py index e93510e..b386fb5 100644 --- a/target_simulator/_version.py +++ b/target_simulator/_version.py @@ -6,10 +6,10 @@ import re # --- Version Data (Generated) --- -__version__ = "v.0.0.0.99-0-g2d7f8ea" -GIT_COMMIT_HASH = "2d7f8ea75d9d74355a975c05ba89e8a7330a63ef" +__version__ = "v.0.0.1.1-0-g5908e72-dirty" +GIT_COMMIT_HASH = "5908e72ae3da9451d2963f2e24d16d12d47655ea" GIT_BRANCH = "master" -BUILD_TIMESTAMP = "2025-11-17T12:34:39.102051+00:00" +BUILD_TIMESTAMP = "2025-11-18T14:12:47.478320+00:00" IS_GIT_REPO = True # --- Default Values (for comparison or fallback) --- @@ -17,7 +17,6 @@ DEFAULT_VERSION = "0.0.0+unknown" DEFAULT_COMMIT = "Unknown" DEFAULT_BRANCH = "Unknown" - # --- Helper Function --- def get_version_string(format_string=None): """ @@ -45,39 +44,29 @@ def get_version_string(format_string=None): replacements = {} try: - replacements["version"] = __version__ if __version__ else DEFAULT_VERSION - replacements["commit"] = GIT_COMMIT_HASH if GIT_COMMIT_HASH else DEFAULT_COMMIT - replacements["commit_short"] = ( - GIT_COMMIT_HASH[:7] - if GIT_COMMIT_HASH and len(GIT_COMMIT_HASH) >= 7 - else DEFAULT_COMMIT - ) - replacements["branch"] = GIT_BRANCH if GIT_BRANCH else DEFAULT_BRANCH - replacements["timestamp"] = BUILD_TIMESTAMP if BUILD_TIMESTAMP else "Unknown" - replacements["timestamp_short"] = ( - BUILD_TIMESTAMP.split("T")[0] - if BUILD_TIMESTAMP and "T" in BUILD_TIMESTAMP - else "Unknown" - ) - replacements["is_git"] = "Git" if IS_GIT_REPO else "Unknown" - replacements["dirty"] = ( - "-dirty" if __version__ and __version__.endswith("-dirty") else "" - ) + replacements['version'] = __version__ if __version__ else DEFAULT_VERSION + replacements['commit'] = GIT_COMMIT_HASH if GIT_COMMIT_HASH else DEFAULT_COMMIT + replacements['commit_short'] = GIT_COMMIT_HASH[:7] if GIT_COMMIT_HASH and len(GIT_COMMIT_HASH) >= 7 else DEFAULT_COMMIT + replacements['branch'] = GIT_BRANCH if GIT_BRANCH else DEFAULT_BRANCH + replacements['timestamp'] = BUILD_TIMESTAMP if BUILD_TIMESTAMP else "Unknown" + replacements['timestamp_short'] = BUILD_TIMESTAMP.split('T')[0] if BUILD_TIMESTAMP and 'T' in BUILD_TIMESTAMP else "Unknown" + replacements['is_git'] = "Git" if IS_GIT_REPO else "Unknown" + replacements['dirty'] = "-dirty" if __version__ and __version__.endswith('-dirty') else "" tag = DEFAULT_VERSION if __version__ and IS_GIT_REPO: - match = re.match(r"^(v?([0-9]+(?:\.[0-9]+)*))", __version__) + match = re.match(r'^(v?([0-9]+(?:\.[0-9]+)*))', __version__) if match: tag = match.group(1) - replacements["tag"] = tag + replacements['tag'] = tag output_string = format_string for placeholder, value in replacements.items(): - pattern = re.compile(r"{{\s*" + re.escape(placeholder) + r"\s*}}") - output_string = pattern.sub(str(value), output_string) + pattern = re.compile(r'{{\s*' + re.escape(placeholder) + r'\s*}}') + output_string = pattern.sub(str(value), output_string) - if re.search(r"{\s*\w+\s*}", output_string): - pass # Or log a warning: print(f"Warning: Unreplaced placeholders found: {output_string}") + if re.search(r'{\s*\w+\s*}', output_string): + pass # Or log a warning: print(f"Warning: Unreplaced placeholders found: {output_string}") return output_string diff --git a/target_simulator/gui/external_profiler.py b/target_simulator/gui/external_profiler.py new file mode 100644 index 0000000..a770ad5 --- /dev/null +++ b/target_simulator/gui/external_profiler.py @@ -0,0 +1,96 @@ +# target_simulator/gui/external_profiler.py +import socket +import logging + +class ExternalProfiler: + """ + Gestisce la logica di forwarding dei pacchetti su porte dedicate per il profiling esterno. + """ + def __init__(self): + self.logger = logging.getLogger(__name__) + self.port_a = 0 + self.port_b = 0 + self.socket_a = None + self.socket_b = None + self.is_active = False + self.use_broadcast = False + + def start(self, port_a: int, port_b: int, use_broadcast: bool = False) -> bool: + """ + Crea i socket e avvia il profiler. + + Args: + port_a: Porta per l'invio del pacchetto raw (inizio elaborazione) + port_b: Porta per l'invio dopo elaborazione (fine elaborazione) + use_broadcast: Se True, invia in broadcast invece che solo a localhost + """ + if self.is_active: + self.logger.warning("Profiler is already active.") + return True + + self.port_a = port_a + self.port_b = port_b + self.use_broadcast = use_broadcast + + try: + self.socket_a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket_a.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if use_broadcast: + self.socket_a.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + self.socket_b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket_b.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if use_broadcast: + self.socket_b.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + self.is_active = True + target = "broadcast" if use_broadcast else "127.0.0.1" + self.logger.info(f"External Profiler started. Forwarding to {target} on ports A:{port_a}, B:{port_b}") + return True + except Exception as e: + self.logger.error(f"Failed to create sockets for profiling: {e}") + self.stop() + return False + + def stop(self): + """ + Ferma il profiler e chiude i socket. + """ + if self.socket_a: + self.socket_a.close() + self.socket_a = None + if self.socket_b: + self.socket_b.close() + self.socket_b = None + self.is_active = False + self.logger.info("External Profiler stopped.") + + def forward_to_port_a(self, raw_bytes: bytes, addr: tuple): + """ + Callback per la Porta A: inoltra il pacchetto raw se è un messaggio di scenario. + """ + if not self.is_active or not self.socket_a or len(raw_bytes) <= 6: + return + + flow = raw_bytes[6] + if flow in (ord('R'), ord('r')): + try: + dest_addr = '255.255.255.255' if self.use_broadcast else '127.0.0.1' + self.socket_a.sendto(raw_bytes, (dest_addr, self.port_a)) + except Exception as e: + self.logger.error(f"Failed to forward to Port A: {e}") + + def forward_to_port_b(self, raw_bytes: bytes): + """ + Callback per la Porta B: inoltra il pacchetto raw dopo l'elaborazione. + """ + if not self.is_active or not self.socket_b or len(raw_bytes) <= 6: + return + + flow = raw_bytes[6] + if flow in (ord('R'), ord('r')): + try: + dest_addr = '255.255.255.255' if self.use_broadcast else '127.0.0.1' + self.socket_b.sendto(raw_bytes, (dest_addr, self.port_b)) + except Exception as e: + self.logger.error(f"Failed to forward to Port B: {e}") \ No newline at end of file diff --git a/target_simulator/gui/external_profiler_window.py b/target_simulator/gui/external_profiler_window.py new file mode 100644 index 0000000..3592cde --- /dev/null +++ b/target_simulator/gui/external_profiler_window.py @@ -0,0 +1,140 @@ +# target_simulator/gui/external_profiler_window.py +import tkinter as tk +from tkinter import ttk, messagebox +from target_simulator.gui.external_profiler import ExternalProfiler +from target_simulator.core.sfp_communicator import SFPCommunicator + +class ExternalProfilerWindow(tk.Toplevel): + """ + Finestra di dialogo per controllare lo strumento di profiling esterno. + """ + def __init__(self, master): + super().__init__(master) + self.title("External Profiling Tool") + self.geometry("500x450") + self.transient(master) + self.grab_set() + + self.profiler = ExternalProfiler() + self.router = None + + # Ottieni il router dal communicator SFP principale + communicator = getattr(master, "target_communicator", None) + if isinstance(communicator, SFPCommunicator): + self.router = communicator.router() + + self._create_widgets() + self.protocol("WM_DELETE_WINDOW", self._on_close) + + def _create_widgets(self): + main_frame = ttk.Frame(self, padding=10) + main_frame.pack(fill=tk.BOTH, expand=True) + + # Info panel + info_frame = ttk.LabelFrame(main_frame, text="ℹ️ Information", padding=10) + info_frame.pack(fill='x', pady=(0, 10)) + + info_text = ( + "This tool broadcasts RIS packets to specified UDP ports for network latency analysis.\n\n" + "• Port A: Packets sent when received from server (start of processing)\n" + "• Port B: Packets sent after processing (end of processing)\n\n" + "To capture packets, use Wireshark with filter:\n" + " udp.port == || udp.port == \n\n" + "Enable Broadcast when the server is on a different machine." + ) + info_label = ttk.Label(info_frame, text=info_text, justify='left', wraplength=460) + info_label.pack(anchor='w') + + # Configuration frame + config_frame = ttk.LabelFrame(main_frame, text="Configuration", padding=10) + config_frame.pack(fill='x', pady=(0, 10)) + + ttk.Label(config_frame, text="Port A (Start of Processing):").pack(anchor='w') + self.port_a_var = tk.StringVar(value="55001") + self.port_a_entry = ttk.Entry(config_frame, textvariable=self.port_a_var) + self.port_a_entry.pack(fill='x', pady=(0, 10)) + + ttk.Label(config_frame, text="Port B (End of Processing):").pack(anchor='w') + self.port_b_var = tk.StringVar(value="55002") + self.port_b_entry = ttk.Entry(config_frame, textvariable=self.port_b_var) + self.port_b_entry.pack(fill='x', pady=(0, 10)) + + # Broadcast checkbox + self.broadcast_var = tk.BooleanVar(value=False) + self.broadcast_check = ttk.Checkbutton( + config_frame, + text="Enable Broadcast (for remote servers)", + variable=self.broadcast_var + ) + self.broadcast_check.pack(anchor='w', pady=(5, 0)) + + broadcast_hint = ttk.Label( + config_frame, + text="⚠ Localhost (127.0.0.1) if disabled, Broadcast (255.255.255.255) if enabled", + font=('TkDefaultFont', 8), + foreground='gray' + ) + broadcast_hint.pack(anchor='w', padx=(20, 0)) + + # Control frame + control_frame = ttk.Frame(main_frame) + control_frame.pack(fill='x', pady=(10, 0)) + + self.start_stop_button = ttk.Button(control_frame, text="Start Profiling", command=self._toggle_profiling) + self.start_stop_button.pack(pady=(0, 10)) + + self.status_var = tk.StringVar(value="Status: Inactive") + status_label = ttk.Label(control_frame, textvariable=self.status_var, font=('TkDefaultFont', 9, 'bold')) + status_label.pack() + + def _toggle_profiling(self): + if self.profiler.is_active: + self._stop_profiling() + else: + self._start_profiling() + + def _start_profiling(self): + if not self.router: + messagebox.showerror("Error", "SFP communicator's router not found.", parent=self) + return + + try: + port_a = int(self.port_a_var.get()) + port_b = int(self.port_b_var.get()) + if port_a == port_b: + raise ValueError("Ports must be different.") + except ValueError as e: + messagebox.showerror("Invalid Input", f"Please enter valid and different port numbers.\n{e}", parent=self) + return + + use_broadcast = self.broadcast_var.get() + + if self.profiler.start(port_a, port_b, use_broadcast): + # Registra i callback sul router + self.router.add_raw_packet_listener(self.profiler.forward_to_port_a) + self.router.add_post_processing_listener(self.profiler.forward_to_port_b) + + target = "broadcast" if use_broadcast else "localhost" + self.status_var.set(f"Status: Active - Sending to {target} ports {port_a}/{port_b}") + self.start_stop_button.config(text="Stop Profiling") + self.port_a_entry.config(state='disabled') + self.port_b_entry.config(state='disabled') + self.broadcast_check.config(state='disabled') + + def _stop_profiling(self): + self.profiler.stop() + if self.router: + # Deregistra i callback per pulizia + self.router.remove_raw_packet_listener(self.profiler.forward_to_port_a) + self.router.remove_post_processing_listener(self.profiler.forward_to_port_b) + + self.status_var.set("Status: Inactive") + self.start_stop_button.config(text="Start Profiling") + self.port_a_entry.config(state='normal') + self.port_b_entry.config(state='normal') + self.broadcast_check.config(state='normal') + + def _on_close(self): + if self.profiler.is_active: + self._stop_profiling() + self.destroy() \ No newline at end of file diff --git a/target_simulator/gui/main_view.py b/target_simulator/gui/main_view.py index 2137f09..71e620f 100644 --- a/target_simulator/gui/main_view.py +++ b/target_simulator/gui/main_view.py @@ -63,6 +63,7 @@ from target_simulator.core import command_builder from target_simulator.analysis.simulation_archive import SimulationArchive from target_simulator.communication.communicator_manager import CommunicatorManager from target_simulator.simulation.simulation_controller import SimulationController +from target_simulator.gui.external_profiler_window import ExternalProfilerWindow from target_simulator.gui.sync_tool_window import SyncToolWindow @@ -74,7 +75,7 @@ try: except ImportError: WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)" -GUI_REFRESH_RATE_MS = 40 +GUI_REFRESH_RATE_MS = 100 #40 default 25 fps class MainView(tk.Tk): @@ -362,6 +363,7 @@ class MainView(tk.Tk): ) debug_menu.add_separator() debug_menu.add_command(label="Sync Tool...", command=self._open_sync_tool) + debug_menu.add_command(label="External Profiler...", command=self._open_external_profiler) def _create_statusbar(self): """Create and place the application's status bar widget and expose vars.""" @@ -985,3 +987,7 @@ class MainView(tk.Tk): "Sync Tool requires an active SFP communicator.", parent=self, ) + + def _open_external_profiler(self): + """Apre la finestra dello strumento di profiling esterno.""" + ExternalProfilerWindow(self) diff --git a/target_simulator/gui/payload_router.py b/target_simulator/gui/payload_router.py index 6799669..e026056 100644 --- a/target_simulator/gui/payload_router.py +++ b/target_simulator/gui/payload_router.py @@ -35,6 +35,11 @@ logger = logging.getLogger(__name__) PayloadHandler = Callable[[bytearray], None] TargetListListener = Callable[[List[Target]], None] +# Definiamo i tipi per i nuovi listener per maggiore chiarezza +RawPacketListener = Callable[[bytes, tuple], None] +PostProcessingListener = Callable[[bytes], None] + + # --- Constants --- M_TO_FT = 3.28084 PERFORMANCE_SAMPLES_BUFFER_SIZE = 10000 @@ -56,6 +61,11 @@ class DebugPayloadRouter: self.logger = logger self._log_prefix = "[DebugPayloadRouter]" self._lock = threading.Lock() + + # Liste per i nuovi listener di profiling esterno + self._raw_packet_listeners: List[RawPacketListener] = [] + self._post_processing_listeners: List[PostProcessingListener] = [] + self._latest_payloads: Dict[str, Any] = {} self._last_raw_packet: Optional[tuple] = None self._sfp_debug_history_size = 20 @@ -127,6 +137,38 @@ class DebugPayloadRouter: except Exception: self._clock_sync = None + def add_raw_packet_listener(self, listener: RawPacketListener): + """Registra un listener per essere notificato all'arrivo di un pacchetto raw (Hook Porta A).""" + with self._lock: + if listener not in self._raw_packet_listeners: + self._raw_packet_listeners.append(listener) + self.logger.debug(f"Raw packet listener registered. Total: {len(self._raw_packet_listeners)}") + + def remove_raw_packet_listener(self, listener: RawPacketListener): + """De-registra un listener per i pacchetti raw.""" + with self._lock: + try: + self._raw_packet_listeners.remove(listener) + self.logger.debug(f"Raw packet listener removed. Total: {len(self._raw_packet_listeners)}") + except ValueError: + pass + + def add_post_processing_listener(self, listener: PostProcessingListener): + """Registra un listener per essere notificato dopo l'elaborazione di un pacchetto (Hook Porta B).""" + with self._lock: + if listener not in self._post_processing_listeners: + self._post_processing_listeners.append(listener) + self.logger.debug(f"Post-processing listener registered. Total: {len(self._post_processing_listeners)}") + + def remove_post_processing_listener(self, listener: PostProcessingListener): + """De-registra un listener di post-elaborazione.""" + with self._lock: + try: + self._post_processing_listeners.remove(listener) + self.logger.debug(f"Post-processing listener removed. Total: {len(self._post_processing_listeners)}") + except ValueError: + pass + def shutdown(self): """Signals the processing worker thread to stop.""" self.logger.info("Shutting down payload router worker thread...") @@ -144,12 +186,15 @@ class DebugPayloadRouter: self.logger.info("Payload processing worker thread started.") while not self._stop_worker.is_set(): try: + # CORREZIONE: L'item in coda è ora (payload, reception_timestamp, raw_packet_originale) item = self._processing_queue.get(timeout=1.0) if item is None: break - payload, reception_timestamp = item - self._dispatch_payload(payload, reception_timestamp) + payload, reception_timestamp, raw_packet = item + + self._dispatch_payload(bytearray(payload), reception_timestamp, raw_packet) + self._processing_queue.task_done() except Empty: @@ -158,10 +203,9 @@ class DebugPayloadRouter: self.logger.exception("Unexpected error in payload processing loop.") self.logger.info("Payload processing worker thread stopped.") - def _dispatch_payload(self, payload: bytearray, reception_timestamp: float): + def _dispatch_payload(self, payload: bytearray, reception_timestamp: float, raw_bytes_for_profiling: bytes): """ Worker Thread: Analyzes the payload and routes it to the correct handler. - Distinguishes between RIS status messages and SYNC replies. """ try: if len(payload) < ctypes.sizeof(DataTag): @@ -171,17 +215,21 @@ class DebugPayloadRouter: return tag = DataTag.from_buffer_copy(payload) - tag_str = f"{chr(tag.ID[0]) if 32 <= tag.ID[0] <= 126 else tag.ID[0]}{chr(tag.ID[1]) if 32 <= tag.ID[1] <= 126 else tag.ID[1]}" - - # 'SY' -> SYNC Reply (ha priorità perché è specifico) + if tag.ID[0] == ord("S") and tag.ID[1] == ord("Y"): self._process_sync_reply_payload(payload, reception_timestamp) else: - # Tutti gli altri payload sul flow 'R' sono RIS Status - # (che iniziano con scenario_tag, target_tag, ecc.) self._process_ris_status_payload(payload, reception_timestamp) except Exception: self.logger.exception("Error during payload dispatch.") + finally: + # Notifica i listener di post-elaborazione + with self._lock: + for listener in self._post_processing_listeners: + try: + listener(raw_bytes_for_profiling) + except Exception as e: + self.logger.error(f"Error in post-processing listener: {e}") def _process_sync_reply_payload( self, payload: bytearray, reception_timestamp: float @@ -190,20 +238,13 @@ class DebugPayloadRouter: Worker Thread: Handles a SYNC reply. """ try: - # The payload here is the DataTag + SfpRisSyncPayload - # Skip the DataTag (8 bytes) to get to the actual sync data sync_data = payload[ctypes.sizeof(DataTag) :] - if len(sync_data) < 16: self.logger.warning( f"SYNC payload too short: {len(sync_data)} bytes, need at least 16" ) return - - # Usa la struttura ctypes che ora corrisponde esattamente al C++ - # Ordine corretto: flags, cc_cookie, ris_timetag, tx_period_ms sync_payload = SfpRisSyncPayload.from_buffer_copy(sync_data) - result = { "cookie": sync_payload.cc_cookie, "server_timetag": sync_payload.ris_timetag, @@ -211,15 +252,10 @@ class DebugPayloadRouter: "flags": sync_payload.flags, "tx_period_ms": sync_payload.tx_period_ms, } - - # Put the result into the dedicated queue for the Sync Tool window try: self._sync_results_queue.put_nowait(result) except Full: - # La coda è piena (probabilmente la finestra Sync Tool non è aperta o non sta consumando) - # Scartiamo silenziosamente - non è un errore critico pass - except Exception as e: self.logger.warning( f"Failed to parse SYNC reply payload: {e}", exc_info=True @@ -227,14 +263,19 @@ class DebugPayloadRouter: def _handle_ris_status(self, payload: bytearray): """ - Network Thread: Queues an incoming RIS status payload for processing. + Network Thread: This handler is called by SfpTransport. + It receives the REASSEMBLED PAYLOAD (without SFP Header). + + CORREZIONE: Questa funzione è stata rimossa perché la logica è stata spostata. + Il `PayloadRouter` ora riceve i pacchetti raw tramite `update_raw_packet` e gestisce + la decodifica e l'inoltro internamente. + + Per compatibilità con il dizionario `_handlers`, la lasciamo, ma + ora la logica di accodamento è in `update_raw_packet` per avere accesso + al pacchetto completo. """ - reception_timestamp = time.monotonic() - try: - self._processing_queue.put_nowait((payload, reception_timestamp)) - except Full: - self.logger.error("Payload processing queue is full! A packet was dropped.") - + pass # La logica è ora in update_raw_packet + def _process_ris_status_payload( self, payload: bytearray, reception_timestamp: float ): @@ -250,14 +291,17 @@ class DebugPayloadRouter: parsed_payload = None t_parse_start = time.perf_counter() try: + # Qui usiamo 'payload' che è già stato privato dell'header SFP parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload) - except (ValueError, TypeError): - self.logger.error("Failed to parse SfpRisStatusPayload from buffer.") + except (ValueError, TypeError) as e: + # Aggiungiamo contesto all'errore + self.logger.error(f"Failed to parse SfpRisStatusPayload from buffer (len={len(payload)}). Error: {e}") return t_parse_end = time.perf_counter() if self._profiling_enabled: self._perf_counters["parse_time_total"] += t_parse_end - t_parse_start + # ... (TUTTO il resto del metodo _process_ris_status_payload rimane IDENTICO) ... t_hub_start = time.perf_counter() if self._hub: try: @@ -451,6 +495,9 @@ class DebugPayloadRouter: pass def get_handlers(self) -> Dict[int, PayloadHandler]: + # Questa funzione non è più il modo primario con cui il router riceve i dati, + # ma la manteniamo per compatibilità con il transport. + # Il vero punto di ingresso è `update_raw_packet`. return self._handlers def _update_last_payload(self, flow_id: str, payload: Any): @@ -573,6 +620,31 @@ class DebugPayloadRouter: return new_payloads def update_raw_packet(self, raw_bytes: bytes, addr: tuple): + # Notifica i listener del pacchetto raw (Hook per la Porta A) + with self._lock: + for listener in self._raw_packet_listeners: + try: + listener(raw_bytes, addr) + except Exception as e: + self.logger.error(f"Error in raw packet listener: {e}") + + # CORREZIONE: Gestiamo qui l'accodamento dei pacchetti RIS + header_size = SFPHeader.size() + if len(raw_bytes) > header_size: + try: + # Leggiamo il flow ID direttamente dal pacchetto raw + flow = raw_bytes[6] # L'offset del campo SFP_FLOW è 6 + if flow in (ord('R'), ord('r')): + reception_timestamp = time.monotonic() + # Estraiamo la payload per il worker thread + payload = raw_bytes[header_size:] + # Mettiamo in coda la tupla completa per il worker + self._processing_queue.put_nowait((payload, reception_timestamp, raw_bytes)) + except Full: + self.logger.error("Payload processing queue is full! A RIS packet was dropped.") + except Exception as e: + self.logger.error(f"Error queuing RIS packet in update_raw_packet: {e}") + with self._lock: self._last_raw_packet = (raw_bytes, addr) entry = {"ts": datetime.datetime.utcnow(), "addr": addr, "raw": raw_bytes} @@ -700,4 +772,4 @@ class DebugPayloadRouter: def set_persist(self, enabled: bool): with self._lock: - self._persist = bool(enabled) + self._persist = bool(enabled) \ No newline at end of file diff --git a/target_simulator/gui/ppi_display.py b/target_simulator/gui/ppi_display.py index 3800347..04c1985 100644 --- a/target_simulator/gui/ppi_display.py +++ b/target_simulator/gui/ppi_display.py @@ -369,8 +369,19 @@ class PPIDisplay(ttk.Frame): def update_simulated_targets(self, targets: List[Target]): """Updates and redraws only the simulated targets.""" self._update_target_category(targets, "simulated") + # Schedule a non-blocking redraw to avoid blocking the Tk event loop. + # The main view already calls `canvas.draw_idle()` once per GUI loop, + # but calling `draw_idle()` here helps ensure an update is requested + # when this method is invoked directly by UI actions. if self.canvas: - self.canvas.draw() + try: + self.canvas.draw_idle() + except Exception: + # Fall back to a blocking draw if draw_idle is not available + try: + self.canvas.draw() + except Exception: + logger.exception("Failed to redraw PPI canvas") def update_real_targets(self, targets: List[Target]): """Updates and redraws only the real targets.""" @@ -385,8 +396,17 @@ class PPIDisplay(ttk.Frame): pass self._update_target_category(targets, "real") + # Use non-blocking redraw to prevent long Matplotlib draw times from + # stalling the Tk event loop. This keeps the GUI refresh loop closer + # to the configured `GUI_REFRESH_RATE_MS` even under heavy load. if self.canvas: - self.canvas.draw() + try: + self.canvas.draw_idle() + except Exception: + try: + self.canvas.draw() + except Exception: + logger.exception("Failed to redraw PPI canvas") def get_real_update_rate(self, window_seconds: float = 1.0) -> float: """ diff --git a/tools/test_udp_send_simple.py b/tools/test_udp_send_simple.py new file mode 100644 index 0000000..ba90678 --- /dev/null +++ b/tools/test_udp_send_simple.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +""" +Script di test per verificare che l'invio UDP funzioni correttamente. +Prova ad inviare alcuni pacchetti alle porte 55001 e 55002. +""" +import socket +import time + +def test_udp_send(): + """Testa l'invio di pacchetti UDP sulle porte di profiling.""" + + # Crea un socket UDP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + test_data = b"TEST_PACKET_" + str(time.time()).encode('ascii') + + port_a = 55001 + port_b = 55002 + dest = '127.0.0.1' + + print(f"Tentativo di invio su {dest}:{port_a}") + try: + bytes_sent = sock.sendto(test_data, (dest, port_a)) + print(f"✓ Inviati {bytes_sent} bytes alla porta {port_a}") + except Exception as e: + print(f"✗ ERRORE nell'invio alla porta {port_a}: {e}") + import traceback + traceback.print_exc() + + time.sleep(0.1) + + print(f"\nTentativo di invio su {dest}:{port_b}") + try: + bytes_sent = sock.sendto(test_data, (dest, port_b)) + print(f"✓ Inviati {bytes_sent} bytes alla porta {port_b}") + except Exception as e: + print(f"✗ ERRORE nell'invio alla porta {port_b}: {e}") + import traceback + traceback.print_exc() + + sock.close() + print("\nTest completato. Controlla Wireshark per vedere se i pacchetti sono stati catturati.") + print("Filtro Wireshark suggerito: udp.port == 55001 || udp.port == 55002") + +if __name__ == '__main__': + test_udp_send()