- added a tool for external profiling of connection response times, to be used with Wireshark

- reworked the PPI update mechanism to avoid blocking draws
VALLONGOL 2025-11-19 08:17:02 +01:00
parent 5908e72ae3
commit 9ac75a07e9
7 changed files with 432 additions and 62 deletions

View File

@ -6,10 +6,10 @@
import re import re
# --- Version Data (Generated) --- # --- Version Data (Generated) ---
__version__ = "v.0.0.0.99-0-g2d7f8ea" __version__ = "v.0.0.1.1-0-g5908e72-dirty"
GIT_COMMIT_HASH = "2d7f8ea75d9d74355a975c05ba89e8a7330a63ef" GIT_COMMIT_HASH = "5908e72ae3da9451d2963f2e24d16d12d47655ea"
GIT_BRANCH = "master" GIT_BRANCH = "master"
BUILD_TIMESTAMP = "2025-11-17T12:34:39.102051+00:00" BUILD_TIMESTAMP = "2025-11-18T14:12:47.478320+00:00"
IS_GIT_REPO = True IS_GIT_REPO = True
# --- Default Values (for comparison or fallback) --- # --- Default Values (for comparison or fallback) ---
@ -17,7 +17,6 @@ DEFAULT_VERSION = "0.0.0+unknown"
DEFAULT_COMMIT = "Unknown" DEFAULT_COMMIT = "Unknown"
DEFAULT_BRANCH = "Unknown" DEFAULT_BRANCH = "Unknown"
# --- Helper Function --- # --- Helper Function ---
def get_version_string(format_string=None): def get_version_string(format_string=None):
""" """
@ -45,39 +44,29 @@ def get_version_string(format_string=None):
replacements = {} replacements = {}
try: try:
replacements["version"] = __version__ if __version__ else DEFAULT_VERSION replacements['version'] = __version__ if __version__ else DEFAULT_VERSION
replacements["commit"] = GIT_COMMIT_HASH if GIT_COMMIT_HASH else DEFAULT_COMMIT replacements['commit'] = GIT_COMMIT_HASH if GIT_COMMIT_HASH else DEFAULT_COMMIT
replacements["commit_short"] = ( replacements['commit_short'] = GIT_COMMIT_HASH[:7] if GIT_COMMIT_HASH and len(GIT_COMMIT_HASH) >= 7 else DEFAULT_COMMIT
GIT_COMMIT_HASH[:7] replacements['branch'] = GIT_BRANCH if GIT_BRANCH else DEFAULT_BRANCH
if GIT_COMMIT_HASH and len(GIT_COMMIT_HASH) >= 7 replacements['timestamp'] = BUILD_TIMESTAMP if BUILD_TIMESTAMP else "Unknown"
else DEFAULT_COMMIT replacements['timestamp_short'] = BUILD_TIMESTAMP.split('T')[0] if BUILD_TIMESTAMP and 'T' in BUILD_TIMESTAMP else "Unknown"
) replacements['is_git'] = "Git" if IS_GIT_REPO else "Unknown"
replacements["branch"] = GIT_BRANCH if GIT_BRANCH else DEFAULT_BRANCH replacements['dirty'] = "-dirty" if __version__ and __version__.endswith('-dirty') else ""
replacements["timestamp"] = BUILD_TIMESTAMP if BUILD_TIMESTAMP else "Unknown"
replacements["timestamp_short"] = (
BUILD_TIMESTAMP.split("T")[0]
if BUILD_TIMESTAMP and "T" in BUILD_TIMESTAMP
else "Unknown"
)
replacements["is_git"] = "Git" if IS_GIT_REPO else "Unknown"
replacements["dirty"] = (
"-dirty" if __version__ and __version__.endswith("-dirty") else ""
)
tag = DEFAULT_VERSION tag = DEFAULT_VERSION
if __version__ and IS_GIT_REPO: if __version__ and IS_GIT_REPO:
match = re.match(r"^(v?([0-9]+(?:\.[0-9]+)*))", __version__) match = re.match(r'^(v?([0-9]+(?:\.[0-9]+)*))', __version__)
if match: if match:
tag = match.group(1) tag = match.group(1)
replacements["tag"] = tag replacements['tag'] = tag
output_string = format_string output_string = format_string
for placeholder, value in replacements.items(): for placeholder, value in replacements.items():
pattern = re.compile(r"{{\s*" + re.escape(placeholder) + r"\s*}}") pattern = re.compile(r'{{\s*' + re.escape(placeholder) + r'\s*}}')
output_string = pattern.sub(str(value), output_string) output_string = pattern.sub(str(value), output_string)
if re.search(r"{\s*\w+\s*}", output_string): if re.search(r'{\s*\w+\s*}', output_string):
pass # Or log a warning: print(f"Warning: Unreplaced placeholders found: {output_string}") pass # Or log a warning: print(f"Warning: Unreplaced placeholders found: {output_string}")
return output_string return output_string

View File

@ -0,0 +1,96 @@
# target_simulator/gui/external_profiler.py
import socket
import logging
class ExternalProfiler:
"""
Handles packet forwarding to dedicated ports for external profiling.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.port_a = 0
self.port_b = 0
self.socket_a = None
self.socket_b = None
self.is_active = False
self.use_broadcast = False
def start(self, port_a: int, port_b: int, use_broadcast: bool = False) -> bool:
"""
Creates the sockets and starts the profiler.
Args:
port_a: Port used to send the raw packet (start of processing)
port_b: Port used to send the packet after processing (end of processing)
use_broadcast: If True, send as broadcast instead of to localhost only
"""
if self.is_active:
self.logger.warning("Profiler is already active.")
return True
self.port_a = port_a
self.port_b = port_b
self.use_broadcast = use_broadcast
try:
self.socket_a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket_a.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if use_broadcast:
self.socket_a.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.socket_b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket_b.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if use_broadcast:
self.socket_b.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.is_active = True
target = "broadcast" if use_broadcast else "127.0.0.1"
self.logger.info(f"External Profiler started. Forwarding to {target} on ports A:{port_a}, B:{port_b}")
return True
except Exception as e:
self.logger.error(f"Failed to create sockets for profiling: {e}")
self.stop()
return False
def stop(self):
"""
Stops the profiler and closes the sockets.
"""
if self.socket_a:
self.socket_a.close()
self.socket_a = None
if self.socket_b:
self.socket_b.close()
self.socket_b = None
self.is_active = False
self.logger.info("External Profiler stopped.")
def forward_to_port_a(self, raw_bytes: bytes, addr: tuple):
"""
Callback for Port A: forwards the raw packet if it is a scenario message.
"""
if not self.is_active or not self.socket_a or len(raw_bytes) <= 6:
return
flow = raw_bytes[6]
if flow in (ord('R'), ord('r')):
try:
dest_addr = '255.255.255.255' if self.use_broadcast else '127.0.0.1'
self.socket_a.sendto(raw_bytes, (dest_addr, self.port_a))
except Exception as e:
self.logger.error(f"Failed to forward to Port A: {e}")
def forward_to_port_b(self, raw_bytes: bytes):
"""
Callback for Port B: forwards the raw packet after processing.
"""
if not self.is_active or not self.socket_b or len(raw_bytes) <= 6:
return
flow = raw_bytes[6]
if flow in (ord('R'), ord('r')):
try:
dest_addr = '255.255.255.255' if self.use_broadcast else '127.0.0.1'
self.socket_b.sendto(raw_bytes, (dest_addr, self.port_b))
except Exception as e:
self.logger.error(f"Failed to forward to Port B: {e}")

View File

@ -0,0 +1,140 @@
# target_simulator/gui/external_profiler_window.py
import tkinter as tk
from tkinter import ttk, messagebox
from target_simulator.gui.external_profiler import ExternalProfiler
from target_simulator.core.sfp_communicator import SFPCommunicator
class ExternalProfilerWindow(tk.Toplevel):
"""
Dialog window for controlling the external profiling tool.
"""
def __init__(self, master):
super().__init__(master)
self.title("External Profiling Tool")
self.geometry("500x450")
self.transient(master)
self.grab_set()
self.profiler = ExternalProfiler()
self.router = None
# Get the router from the main SFP communicator
communicator = getattr(master, "target_communicator", None)
if isinstance(communicator, SFPCommunicator):
self.router = communicator.router()
self._create_widgets()
self.protocol("WM_DELETE_WINDOW", self._on_close)
def _create_widgets(self):
main_frame = ttk.Frame(self, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# Info panel
info_frame = ttk.LabelFrame(main_frame, text=" Information", padding=10)
info_frame.pack(fill='x', pady=(0, 10))
info_text = (
"This tool broadcasts RIS packets to specified UDP ports for network latency analysis.\n\n"
"• Port A: Packets sent when received from server (start of processing)\n"
"• Port B: Packets sent after processing (end of processing)\n\n"
"To capture packets, use Wireshark with filter:\n"
" udp.port == <PortA> || udp.port == <PortB>\n\n"
"Enable Broadcast when the server is on a different machine."
)
info_label = ttk.Label(info_frame, text=info_text, justify='left', wraplength=460)
info_label.pack(anchor='w')
# Configuration frame
config_frame = ttk.LabelFrame(main_frame, text="Configuration", padding=10)
config_frame.pack(fill='x', pady=(0, 10))
ttk.Label(config_frame, text="Port A (Start of Processing):").pack(anchor='w')
self.port_a_var = tk.StringVar(value="55001")
self.port_a_entry = ttk.Entry(config_frame, textvariable=self.port_a_var)
self.port_a_entry.pack(fill='x', pady=(0, 10))
ttk.Label(config_frame, text="Port B (End of Processing):").pack(anchor='w')
self.port_b_var = tk.StringVar(value="55002")
self.port_b_entry = ttk.Entry(config_frame, textvariable=self.port_b_var)
self.port_b_entry.pack(fill='x', pady=(0, 10))
# Broadcast checkbox
self.broadcast_var = tk.BooleanVar(value=False)
self.broadcast_check = ttk.Checkbutton(
config_frame,
text="Enable Broadcast (for remote servers)",
variable=self.broadcast_var
)
self.broadcast_check.pack(anchor='w', pady=(5, 0))
broadcast_hint = ttk.Label(
config_frame,
text="⚠ Localhost (127.0.0.1) if disabled, Broadcast (255.255.255.255) if enabled",
font=('TkDefaultFont', 8),
foreground='gray'
)
broadcast_hint.pack(anchor='w', padx=(20, 0))
# Control frame
control_frame = ttk.Frame(main_frame)
control_frame.pack(fill='x', pady=(10, 0))
self.start_stop_button = ttk.Button(control_frame, text="Start Profiling", command=self._toggle_profiling)
self.start_stop_button.pack(pady=(0, 10))
self.status_var = tk.StringVar(value="Status: Inactive")
status_label = ttk.Label(control_frame, textvariable=self.status_var, font=('TkDefaultFont', 9, 'bold'))
status_label.pack()
def _toggle_profiling(self):
if self.profiler.is_active:
self._stop_profiling()
else:
self._start_profiling()
def _start_profiling(self):
if not self.router:
messagebox.showerror("Error", "SFP communicator's router not found.", parent=self)
return
try:
port_a = int(self.port_a_var.get())
port_b = int(self.port_b_var.get())
if port_a == port_b:
raise ValueError("Ports must be different.")
except ValueError as e:
messagebox.showerror("Invalid Input", f"Please enter valid and different port numbers.\n{e}", parent=self)
return
use_broadcast = self.broadcast_var.get()
if self.profiler.start(port_a, port_b, use_broadcast):
# Register the callbacks on the router
self.router.add_raw_packet_listener(self.profiler.forward_to_port_a)
self.router.add_post_processing_listener(self.profiler.forward_to_port_b)
target = "broadcast" if use_broadcast else "localhost"
self.status_var.set(f"Status: Active - Sending to {target} ports {port_a}/{port_b}")
self.start_stop_button.config(text="Stop Profiling")
self.port_a_entry.config(state='disabled')
self.port_b_entry.config(state='disabled')
self.broadcast_check.config(state='disabled')
def _stop_profiling(self):
self.profiler.stop()
if self.router:
# Unregister the callbacks for cleanup
self.router.remove_raw_packet_listener(self.profiler.forward_to_port_a)
self.router.remove_post_processing_listener(self.profiler.forward_to_port_b)
self.status_var.set("Status: Inactive")
self.start_stop_button.config(text="Start Profiling")
self.port_a_entry.config(state='normal')
self.port_b_entry.config(state='normal')
self.broadcast_check.config(state='normal')
def _on_close(self):
if self.profiler.is_active:
self._stop_profiling()
self.destroy()

View File

@ -63,6 +63,7 @@ from target_simulator.core import command_builder
from target_simulator.analysis.simulation_archive import SimulationArchive from target_simulator.analysis.simulation_archive import SimulationArchive
from target_simulator.communication.communicator_manager import CommunicatorManager from target_simulator.communication.communicator_manager import CommunicatorManager
from target_simulator.simulation.simulation_controller import SimulationController from target_simulator.simulation.simulation_controller import SimulationController
from target_simulator.gui.external_profiler_window import ExternalProfilerWindow
from target_simulator.gui.sync_tool_window import SyncToolWindow from target_simulator.gui.sync_tool_window import SyncToolWindow
@ -74,7 +75,7 @@ try:
except ImportError: except ImportError:
WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)" WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"
GUI_REFRESH_RATE_MS = 40 GUI_REFRESH_RATE_MS = 100  # was 40 (25 fps); 100 ms ≈ 10 fps
class MainView(tk.Tk): class MainView(tk.Tk):
@ -362,6 +363,7 @@ class MainView(tk.Tk):
) )
debug_menu.add_separator() debug_menu.add_separator()
debug_menu.add_command(label="Sync Tool...", command=self._open_sync_tool) debug_menu.add_command(label="Sync Tool...", command=self._open_sync_tool)
debug_menu.add_command(label="External Profiler...", command=self._open_external_profiler)
def _create_statusbar(self): def _create_statusbar(self):
"""Create and place the application's status bar widget and expose vars.""" """Create and place the application's status bar widget and expose vars."""
@ -985,3 +987,7 @@ class MainView(tk.Tk):
"Sync Tool requires an active SFP communicator.", "Sync Tool requires an active SFP communicator.",
parent=self, parent=self,
) )
def _open_external_profiler(self):
"""Apre la finestra dello strumento di profiling esterno."""
ExternalProfilerWindow(self)

View File

@ -35,6 +35,11 @@ logger = logging.getLogger(__name__)
PayloadHandler = Callable[[bytearray], None] PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None] TargetListListener = Callable[[List[Target]], None]
# Type aliases for the new listeners, for clarity
RawPacketListener = Callable[[bytes, tuple], None]
PostProcessingListener = Callable[[bytes], None]
# --- Constants --- # --- Constants ---
M_TO_FT = 3.28084 M_TO_FT = 3.28084
PERFORMANCE_SAMPLES_BUFFER_SIZE = 10000 PERFORMANCE_SAMPLES_BUFFER_SIZE = 10000
@ -56,6 +61,11 @@ class DebugPayloadRouter:
self.logger = logger self.logger = logger
self._log_prefix = "[DebugPayloadRouter]" self._log_prefix = "[DebugPayloadRouter]"
self._lock = threading.Lock() self._lock = threading.Lock()
# Lists holding the new external-profiling listeners
self._raw_packet_listeners: List[RawPacketListener] = []
self._post_processing_listeners: List[PostProcessingListener] = []
self._latest_payloads: Dict[str, Any] = {} self._latest_payloads: Dict[str, Any] = {}
self._last_raw_packet: Optional[tuple] = None self._last_raw_packet: Optional[tuple] = None
self._sfp_debug_history_size = 20 self._sfp_debug_history_size = 20
@ -127,6 +137,38 @@ class DebugPayloadRouter:
except Exception: except Exception:
self._clock_sync = None self._clock_sync = None
def add_raw_packet_listener(self, listener: RawPacketListener):
"""Registra un listener per essere notificato all'arrivo di un pacchetto raw (Hook Porta A)."""
with self._lock:
if listener not in self._raw_packet_listeners:
self._raw_packet_listeners.append(listener)
self.logger.debug(f"Raw packet listener registered. Total: {len(self._raw_packet_listeners)}")
def remove_raw_packet_listener(self, listener: RawPacketListener):
"""De-registra un listener per i pacchetti raw."""
with self._lock:
try:
self._raw_packet_listeners.remove(listener)
self.logger.debug(f"Raw packet listener removed. Total: {len(self._raw_packet_listeners)}")
except ValueError:
pass
def add_post_processing_listener(self, listener: PostProcessingListener):
"""Registra un listener per essere notificato dopo l'elaborazione di un pacchetto (Hook Porta B)."""
with self._lock:
if listener not in self._post_processing_listeners:
self._post_processing_listeners.append(listener)
self.logger.debug(f"Post-processing listener registered. Total: {len(self._post_processing_listeners)}")
def remove_post_processing_listener(self, listener: PostProcessingListener):
"""De-registra un listener di post-elaborazione."""
with self._lock:
try:
self._post_processing_listeners.remove(listener)
self.logger.debug(f"Post-processing listener removed. Total: {len(self._post_processing_listeners)}")
except ValueError:
pass
def shutdown(self): def shutdown(self):
"""Signals the processing worker thread to stop.""" """Signals the processing worker thread to stop."""
self.logger.info("Shutting down payload router worker thread...") self.logger.info("Shutting down payload router worker thread...")
@ -144,12 +186,15 @@ class DebugPayloadRouter:
self.logger.info("Payload processing worker thread started.") self.logger.info("Payload processing worker thread started.")
while not self._stop_worker.is_set(): while not self._stop_worker.is_set():
try: try:
# FIX: the queued item is now (payload, reception_timestamp, original raw packet)
item = self._processing_queue.get(timeout=1.0) item = self._processing_queue.get(timeout=1.0)
if item is None: if item is None:
break break
payload, reception_timestamp = item payload, reception_timestamp, raw_packet = item
self._dispatch_payload(payload, reception_timestamp)
self._dispatch_payload(bytearray(payload), reception_timestamp, raw_packet)
self._processing_queue.task_done() self._processing_queue.task_done()
except Empty: except Empty:
@ -158,10 +203,9 @@ class DebugPayloadRouter:
self.logger.exception("Unexpected error in payload processing loop.") self.logger.exception("Unexpected error in payload processing loop.")
self.logger.info("Payload processing worker thread stopped.") self.logger.info("Payload processing worker thread stopped.")
def _dispatch_payload(self, payload: bytearray, reception_timestamp: float): def _dispatch_payload(self, payload: bytearray, reception_timestamp: float, raw_bytes_for_profiling: bytes):
""" """
Worker Thread: Analyzes the payload and routes it to the correct handler. Worker Thread: Analyzes the payload and routes it to the correct handler.
Distinguishes between RIS status messages and SYNC replies.
""" """
try: try:
if len(payload) < ctypes.sizeof(DataTag): if len(payload) < ctypes.sizeof(DataTag):
@ -171,17 +215,21 @@ class DebugPayloadRouter:
return return
tag = DataTag.from_buffer_copy(payload) tag = DataTag.from_buffer_copy(payload)
tag_str = f"{chr(tag.ID[0]) if 32 <= tag.ID[0] <= 126 else tag.ID[0]}{chr(tag.ID[1]) if 32 <= tag.ID[1] <= 126 else tag.ID[1]}"
# 'SY' -> SYNC reply (takes priority because it is specific)
if tag.ID[0] == ord("S") and tag.ID[1] == ord("Y"): if tag.ID[0] == ord("S") and tag.ID[1] == ord("Y"):
self._process_sync_reply_payload(payload, reception_timestamp) self._process_sync_reply_payload(payload, reception_timestamp)
else: else:
# All other payloads on flow 'R' are RIS Status
# (starting with scenario_tag, target_tag, etc.)
self._process_ris_status_payload(payload, reception_timestamp) self._process_ris_status_payload(payload, reception_timestamp)
except Exception: except Exception:
self.logger.exception("Error during payload dispatch.") self.logger.exception("Error during payload dispatch.")
finally:
# Notify the post-processing listeners
with self._lock:
for listener in self._post_processing_listeners:
try:
listener(raw_bytes_for_profiling)
except Exception as e:
self.logger.error(f"Error in post-processing listener: {e}")
def _process_sync_reply_payload( def _process_sync_reply_payload(
self, payload: bytearray, reception_timestamp: float self, payload: bytearray, reception_timestamp: float
@ -190,20 +238,13 @@ class DebugPayloadRouter:
Worker Thread: Handles a SYNC reply. Worker Thread: Handles a SYNC reply.
""" """
try: try:
# The payload here is the DataTag + SfpRisSyncPayload
# Skip the DataTag (8 bytes) to get to the actual sync data
sync_data = payload[ctypes.sizeof(DataTag) :] sync_data = payload[ctypes.sizeof(DataTag) :]
if len(sync_data) < 16: if len(sync_data) < 16:
self.logger.warning( self.logger.warning(
f"SYNC payload too short: {len(sync_data)} bytes, need at least 16" f"SYNC payload too short: {len(sync_data)} bytes, need at least 16"
) )
return return
# Use the ctypes structure that now matches the C++ layout exactly
# Correct order: flags, cc_cookie, ris_timetag, tx_period_ms
sync_payload = SfpRisSyncPayload.from_buffer_copy(sync_data) sync_payload = SfpRisSyncPayload.from_buffer_copy(sync_data)
result = { result = {
"cookie": sync_payload.cc_cookie, "cookie": sync_payload.cc_cookie,
"server_timetag": sync_payload.ris_timetag, "server_timetag": sync_payload.ris_timetag,
@ -211,15 +252,10 @@ class DebugPayloadRouter:
"flags": sync_payload.flags, "flags": sync_payload.flags,
"tx_period_ms": sync_payload.tx_period_ms, "tx_period_ms": sync_payload.tx_period_ms,
} }
# Put the result into the dedicated queue for the Sync Tool window
try: try:
self._sync_results_queue.put_nowait(result) self._sync_results_queue.put_nowait(result)
except Full: except Full:
# The queue is full (the Sync Tool window is probably not open or not consuming)
# Drop it silently; this is not a critical error
pass pass
except Exception as e: except Exception as e:
self.logger.warning( self.logger.warning(
f"Failed to parse SYNC reply payload: {e}", exc_info=True f"Failed to parse SYNC reply payload: {e}", exc_info=True
@ -227,13 +263,18 @@ class DebugPayloadRouter:
def _handle_ris_status(self, payload: bytearray): def _handle_ris_status(self, payload: bytearray):
""" """
Network Thread: Queues an incoming RIS status payload for processing. Network Thread: This handler is called by SfpTransport.
It receives the REASSEMBLED PAYLOAD (without SFP Header).
FIX: The body of this function was removed because its logic has been moved.
The `PayloadRouter` now receives raw packets via `update_raw_packet` and handles
decoding and forwarding internally.
It is kept for compatibility with the `_handlers` dictionary, but the queuing
logic now lives in `update_raw_packet` so that the full packet is available.
""" """
reception_timestamp = time.monotonic() pass  # The logic now lives in update_raw_packet
try:
self._processing_queue.put_nowait((payload, reception_timestamp))
except Full:
self.logger.error("Payload processing queue is full! A packet was dropped.")
def _process_ris_status_payload( def _process_ris_status_payload(
self, payload: bytearray, reception_timestamp: float self, payload: bytearray, reception_timestamp: float
@ -250,14 +291,17 @@ class DebugPayloadRouter:
parsed_payload = None parsed_payload = None
t_parse_start = time.perf_counter() t_parse_start = time.perf_counter()
try: try:
# Here 'payload' already has the SFP header stripped
parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload) parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
except (ValueError, TypeError): except (ValueError, TypeError) as e:
self.logger.error("Failed to parse SfpRisStatusPayload from buffer.") # Aggiungiamo contesto all'errore
self.logger.error(f"Failed to parse SfpRisStatusPayload from buffer (len={len(payload)}). Error: {e}")
return return
t_parse_end = time.perf_counter() t_parse_end = time.perf_counter()
if self._profiling_enabled: if self._profiling_enabled:
self._perf_counters["parse_time_total"] += t_parse_end - t_parse_start self._perf_counters["parse_time_total"] += t_parse_end - t_parse_start
# ... (the ENTIRE rest of _process_ris_status_payload is UNCHANGED) ...
t_hub_start = time.perf_counter() t_hub_start = time.perf_counter()
if self._hub: if self._hub:
try: try:
@ -451,6 +495,9 @@ class DebugPayloadRouter:
pass pass
def get_handlers(self) -> Dict[int, PayloadHandler]: def get_handlers(self) -> Dict[int, PayloadHandler]:
# This function is no longer the primary way the router receives data,
# but it is kept for compatibility with the transport.
# The real entry point is `update_raw_packet`.
return self._handlers return self._handlers
def _update_last_payload(self, flow_id: str, payload: Any): def _update_last_payload(self, flow_id: str, payload: Any):
@ -573,6 +620,31 @@ class DebugPayloadRouter:
return new_payloads return new_payloads
def update_raw_packet(self, raw_bytes: bytes, addr: tuple): def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
# Notify the raw packet listeners (Port A hook)
with self._lock:
for listener in self._raw_packet_listeners:
try:
listener(raw_bytes, addr)
except Exception as e:
self.logger.error(f"Error in raw packet listener: {e}")
# FIX: RIS packet queuing is handled here
header_size = SFPHeader.size()
if len(raw_bytes) > header_size:
try:
# Read the flow ID directly from the raw packet
flow = raw_bytes[6] # The SFP_FLOW field is at offset 6
if flow in (ord('R'), ord('r')):
reception_timestamp = time.monotonic()
# Extract the payload for the worker thread
payload = raw_bytes[header_size:]
# Queue the full tuple for the worker
self._processing_queue.put_nowait((payload, reception_timestamp, raw_bytes))
except Full:
self.logger.error("Payload processing queue is full! A RIS packet was dropped.")
except Exception as e:
self.logger.error(f"Error queuing RIS packet in update_raw_packet: {e}")
with self._lock: with self._lock:
self._last_raw_packet = (raw_bytes, addr) self._last_raw_packet = (raw_bytes, addr)
entry = {"ts": datetime.datetime.utcnow(), "addr": addr, "raw": raw_bytes} entry = {"ts": datetime.datetime.utcnow(), "addr": addr, "raw": raw_bytes}

View File

@ -369,8 +369,19 @@ class PPIDisplay(ttk.Frame):
def update_simulated_targets(self, targets: List[Target]): def update_simulated_targets(self, targets: List[Target]):
"""Updates and redraws only the simulated targets.""" """Updates and redraws only the simulated targets."""
self._update_target_category(targets, "simulated") self._update_target_category(targets, "simulated")
# Schedule a non-blocking redraw to avoid blocking the Tk event loop.
# The main view already calls `canvas.draw_idle()` once per GUI loop,
# but calling `draw_idle()` here helps ensure an update is requested
# when this method is invoked directly by UI actions.
if self.canvas: if self.canvas:
self.canvas.draw() try:
self.canvas.draw_idle()
except Exception:
# Fall back to a blocking draw if draw_idle is not available
try:
self.canvas.draw()
except Exception:
logger.exception("Failed to redraw PPI canvas")
def update_real_targets(self, targets: List[Target]): def update_real_targets(self, targets: List[Target]):
"""Updates and redraws only the real targets.""" """Updates and redraws only the real targets."""
@ -385,8 +396,17 @@ class PPIDisplay(ttk.Frame):
pass pass
self._update_target_category(targets, "real") self._update_target_category(targets, "real")
# Use non-blocking redraw to prevent long Matplotlib draw times from
# stalling the Tk event loop. This keeps the GUI refresh loop closer
# to the configured `GUI_REFRESH_RATE_MS` even under heavy load.
if self.canvas: if self.canvas:
self.canvas.draw() try:
self.canvas.draw_idle()
except Exception:
try:
self.canvas.draw()
except Exception:
logger.exception("Failed to redraw PPI canvas")
def get_real_update_rate(self, window_seconds: float = 1.0) -> float: def get_real_update_rate(self, window_seconds: float = 1.0) -> float:
""" """

View File

@ -0,0 +1,47 @@
#!/usr/bin/env python
"""
Test script to verify that UDP sending works correctly.
It sends a few packets to ports 55001 and 55002.
"""
import socket
import time
def test_udp_send():
"""Testa l'invio di pacchetti UDP sulle porte di profiling."""
# Crea un socket UDP
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
test_data = b"TEST_PACKET_" + str(time.time()).encode('ascii')
port_a = 55001
port_b = 55002
dest = '127.0.0.1'
print(f"Tentativo di invio su {dest}:{port_a}")
try:
bytes_sent = sock.sendto(test_data, (dest, port_a))
print(f"✓ Inviati {bytes_sent} bytes alla porta {port_a}")
except Exception as e:
print(f"✗ ERRORE nell'invio alla porta {port_a}: {e}")
import traceback
traceback.print_exc()
time.sleep(0.1)
print(f"\nTentativo di invio su {dest}:{port_b}")
try:
bytes_sent = sock.sendto(test_data, (dest, port_b))
print(f"✓ Inviati {bytes_sent} bytes alla porta {port_b}")
except Exception as e:
print(f"✗ ERRORE nell'invio alla porta {port_b}: {e}")
import traceback
traceback.print_exc()
sock.close()
print("\nTest completato. Controlla Wireshark per vedere se i pacchetti sono stati catturati.")
print("Filtro Wireshark suggerito: udp.port == 55001 || udp.port == 55002")
if __name__ == '__main__':
test_udp_send()
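As a hypothetical counterpart to the send test above (not part of the repository), a minimal receive-side sketch that listens on both profiling ports and prints the Port A to Port B delta for matching packets, as an alternative to measuring in Wireshark:

#!/usr/bin/env python
"""Listens on the two profiling ports and prints the time delta between the copy of a
packet seen on Port A and the identical copy later seen on Port B."""
import select
import socket
import time

def listen_and_measure(port_a=55001, port_b=55002):
    sock_a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock_b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock_a.bind(("", port_a))
    sock_b.bind(("", port_b))
    pending = {}  # raw packet bytes -> arrival time on Port A
    print(f"Listening on ports {port_a} (A) and {port_b} (B), Ctrl+C to stop...")
    while True:
        readable, _, _ = select.select([sock_a, sock_b], [], [], 1.0)
        now = time.perf_counter()
        for sock in readable:
            data, _ = sock.recvfrom(65535)
            if sock is sock_a:
                pending[data] = now
            else:
                t0 = pending.pop(data, None)
                if t0 is not None:
                    print(f"A -> B delta: {(now - t0) * 1000:.2f} ms")

if __name__ == '__main__':
    listen_and_measure()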