diff --git a/settings.json b/settings.json index 78f4f29..b1b8a18 100644 --- a/settings.json +++ b/settings.json @@ -27,10 +27,6 @@ "baudrate": 9600 } } - }, - "sfp_debug": { - "history_size": 100, - "persist_raw": false } }, "scenarios": { diff --git a/target_simulator/core/sfp_structures.py b/target_simulator/core/sfp_structures.py index 7defe74..4d7c752 100644 --- a/target_simulator/core/sfp_structures.py +++ b/target_simulator/core/sfp_structures.py @@ -168,3 +168,55 @@ class ImageLeaderData(ctypes.Structure): def get_colour_map_size() -> int: """Returns the static size of the COLOUR_MAP field in bytes.""" return 1024 + + +# --- RIS Status payload (minimal) --- +# These are conservative, minimal definitions to extract useful fields +# from the server's SFP_RIS::status_message_t. They intentionally include +# only the fields we currently need (timetag, platform azimuth, velocity, +# geographic position, and a small target struct). + +DSP_RIS_MAX_TGT = 16 + + +class RisTarget(ctypes.Structure): + _pack_ = 1 + _fields_ = [ + ("flags", ctypes.c_int32), + ("heading", ctypes.c_float), + ("x", ctypes.c_float), + ("y", ctypes.c_float), + ("z", ctypes.c_float), + ] + + +class RisScenario(ctypes.Structure): + _pack_ = 1 + _fields_ = [ + ("timetag", ctypes.c_uint32), + ("platform_azimuth", ctypes.c_float), + ("vx", ctypes.c_float), + ("vy", ctypes.c_float), + ("vz", ctypes.c_float), + ("baro_altitude", ctypes.c_float), + ("latitude", ctypes.c_float), + ("longitude", ctypes.c_float), + ("true_heading", ctypes.c_float), + ] + + +class RisTargetsBlock(ctypes.Structure): + _pack_ = 1 + _fields_ = [("tgt", RisTarget * DSP_RIS_MAX_TGT)] + + +class SfpRisStatusPayload(ctypes.Structure): + _pack_ = 1 + _fields_ = [ + ("scenario", RisScenario), + ("tgt", RisTargetsBlock), + ] + + @classmethod + def size(cls): + return ctypes.sizeof(cls) diff --git a/target_simulator/core/sfp_transport.py b/target_simulator/core/sfp_transport.py index 45a36b0..473c595 100644 --- a/target_simulator/core/sfp_transport.py +++ b/target_simulator/core/sfp_transport.py @@ -1,11 +1,7 @@ -# core/transport/sfp_transport.py -""" -Provides a reusable transport layer for the Simple Fragmentation Protocol (SFP). +"""Clean SFP transport implementation with detailed logging. -This module handles UDP socket communication, SFP header parsing, fragment -reassembly, and ACK generation. It is application-agnostic and uses a -callback/handler system to pass fully reassembled payloads to the -application layer based on the SFP_FLOW identifier. +Handles UDP receive loop, SFP header parsing, fragment reassembly, ACK +generation, and hands completed payloads to registered handlers. """ import socket @@ -14,27 +10,12 @@ import threading import time from typing import Dict, Callable, Optional -# Rimosso l'import da config, ora la classe è indipendente from target_simulator.utils.network import create_udp_socket, close_udp_socket from target_simulator.core.sfp_structures import SFPHeader -# Define a type hint for payload handlers PayloadHandler = Callable[[bytearray], None] -# Module-level logging control ------------------------------------------------- -# You can override the logging level for this module by setting LOG_LEVEL at -# the top of this file (e.g. LOG_LEVEL = logging.DEBUG) or by configuring the -# returned `logger` object from outside the module: -# -# import logging -# import target_simulator.core.sfp_transport as st -# st.LOG_LEVEL = logging.DEBUG -# st.logger.setLevel(st.LOG_LEVEL) -# -# If left as None the logger will inherit configuration from the root logger. LOG_LEVEL: Optional[int] = logging.INFO - -# Create a module logger. Use module name for clear hierarchical logging. logger = logging.getLogger(__name__) if LOG_LEVEL is not None: logger.setLevel(LOG_LEVEL) @@ -43,7 +24,6 @@ if LOG_LEVEL is not None: class SfpTransport: """Manages SFP communication and payload reassembly.""" - # --- INIZIO MODIFICHE --- def __init__( self, host: str, @@ -52,34 +32,22 @@ class SfpTransport: ack_config: Optional[Dict[int, int]] = None, raw_packet_callback: Optional[Callable[[bytes, tuple], None]] = None, ): - """ - Initializes the SFP Transport layer. - - Args: - host (str): The local IP address to bind the UDP socket to. - port (int): The local port to listen on. - payload_handlers (Dict[int, PayloadHandler]): A dictionary mapping - SFP_FLOW IDs to their corresponding handler functions. - ack_config (Optional[Dict[int, int]]): A dictionary mapping SFP_FLOW IDs - to the SFP_WIN value to use in the ACK packet. If None, defaults will be used. - """ - # --- FINE MODIFICHE --- self._log_prefix = "[SfpTransport]" logger.info(f"{self._log_prefix} Initializing for {host}:{port}...") self._host = host self._port = port self._payload_handlers = payload_handlers - # --- INIZIO MODIFICHE --- self._ack_config = ack_config if ack_config is not None else {} - # Optional callback invoked with (raw_packet_bytes, addr) for every packet received self._raw_packet_callback = raw_packet_callback - # --- FINE MODIFICHE --- + self._socket: Optional[socket.socket] = None self._receiver_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() + # transaction state: key=(flow, tid) -> {frag_index: total_frags} self._fragments: Dict[tuple, Dict[int, int]] = {} + # buffers for reassembly: key=(flow, tid) -> bytearray(total_size) self._buffers: Dict[tuple, bytearray] = {} logger.debug( @@ -89,7 +57,6 @@ class SfpTransport: logger.debug(f"{self._log_prefix} ACK window config: {self._ack_config}") def start(self) -> bool: - """Starts the transport layer... (nessuna modifica qui)""" if self._receiver_thread is not None and self._receiver_thread.is_alive(): logger.warning( f"{self._log_prefix} Start called, but receiver is already running." @@ -111,7 +78,6 @@ class SfpTransport: return True def shutdown(self): - """Stops the receiver thread... (nessuna modifica qui)""" self._stop_event.set() if self._socket: @@ -129,7 +95,6 @@ class SfpTransport: logger.info(f"{self._log_prefix} Shutdown complete.") def _receive_loop(self): - """The main loop... (nessuna modifica qui)""" log_prefix = f"{self._log_prefix} Loop" logger.info(f"{log_prefix} Starting receive loop.") @@ -142,7 +107,6 @@ class SfpTransport: data, addr = self._socket.recvfrom(65535) if not data: continue - # Deliver raw packet to optional callback for inspection (non-blocking best-effort) try: if self._raw_packet_callback: self._raw_packet_callback(data, addr) @@ -166,7 +130,12 @@ class SfpTransport: logger.info(f"{log_prefix} Receive loop terminated.") def _process_packet(self, data: bytes, addr: tuple): - """Parses an SFP packet... (nessuna modifica qui)""" + """Parse SFP packet, log details, and reassemble fragments. + + Logging includes parsed header fields and a small payload preview. + The preview attempts JSON decode to detect text-based payloads; if + that fails the first bytes are logged in hex. + """ header_size = SFPHeader.size() if len(data) < header_size: logger.warning(f"Packet from {addr} is too small for SFP header. Ignoring.") @@ -178,10 +147,321 @@ class SfpTransport: logger.error(f"Failed to parse SFP header from {addr}. Ignoring.") return - flow, tid = header.SFP_FLOW, header.SFP_TID - frag, total_frags = header.SFP_FRAG, header.SFP_TOTFRGAS - pl_size, pl_offset = header.SFP_PLSIZE, header.SFP_PLOFFSET + # Extract header fields + flow = header.SFP_FLOW + tid = header.SFP_TID + frag = header.SFP_FRAG + total_frags = header.SFP_TOTFRGAS + pl_size = header.SFP_PLSIZE + pl_offset = header.SFP_PLOFFSET total_size = header.SFP_TOTSIZE + flags = header.SFP_FLAGS + + try: + flow_name = chr(flow) if 32 <= flow < 127 else str(flow) + except Exception: + flow_name = str(flow) + + # Payload preview for logging + payload_preview = data[header_size : header_size + 256] + try: + import json + + json.loads(payload_preview.decode("utf-8", errors="strict")) + payload_preview_text = "JSON (preview)" + except Exception: + payload_preview_text = "Hex preview: " + " ".join( + f"{b:02X}" for b in payload_preview[:64] + ) + + logger.info( + f"{self._log_prefix} Packet from {addr} - flow={flow_name} ({flow}), tid={tid}, flags=0x{flags:X}, frag={frag}/{total_frags}, pl_size={pl_size}, pl_offset={pl_offset}, total_size={total_size}. {payload_preview_text}" + ) + + key = (flow, tid) + + # If sender requested an ACK bit in flags, reply + if header.SFP_FLAGS & 0x01: + self._send_ack(addr, data[:header_size]) + + # Basic validation + if total_frags == 0 or total_frags > 60000 or total_size <= 0: + logger.warning( + f"Invalid metadata for {key}: total_frags={total_frags}, total_size={total_size}. Ignoring." + ) + return + + # Start a new transaction when frag==0 + if frag == 0: + self._cleanup_lingering_transactions(flow, tid) + logger.debug( + f"New transaction started for key={key}. Total size: {total_size} bytes." + ) + self._fragments[key] = {} + try: + self._buffers[key] = bytearray(total_size) + except (MemoryError, ValueError): + logger.error( + f"Failed to allocate {total_size} bytes for key={key}. Ignoring transaction." + ) + self._fragments.pop(key, None) + return + + if key not in self._buffers or key not in self._fragments: + logger.debug( + f"Ignoring fragment {frag} for untracked transaction key={key}." + ) + return + + self._fragments[key][frag] = total_frags + payload = data[header_size:] + bytes_to_copy = min(pl_size, len(payload)) + + if (pl_offset + bytes_to_copy) > len(self._buffers[key]): + logger.error( + f"Payload for key={key}, frag={frag} would overflow buffer. Ignoring." + ) + return + + # Copy into buffer + self._buffers[key][pl_offset : pl_offset + bytes_to_copy] = payload[:bytes_to_copy] + + # If all fragments received, hand off to handler + if len(self._fragments[key]) == total_frags: + logger.debug( + f"Transaction complete for key={key}. Handing off to application layer." + ) + + completed_payload = self._buffers.pop(key) + self._fragments.pop(key) + + handler = self._payload_handlers.get(flow) + if handler: + try: + handler(completed_payload) + except Exception: + logger.exception( + f"Error executing payload handler for flow {flow}." + ) + else: + logger.warning(f"No payload handler registered for flow ID {flow}.") + + def _send_ack(self, dest_addr: tuple, original_header_bytes: bytes): + log_prefix = f"{self._log_prefix} ACK" + if not self._socket: + return + + try: + ack_header = bytearray(original_header_bytes) + flow = ack_header[SFPHeader.get_field_offset("SFP_FLOW")] + + window_size = self._ack_config.get(flow, 0) + + ack_header[SFPHeader.get_field_offset("SFP_DIRECTION")] = 0x3C # '<' + ack_header[SFPHeader.get_field_offset("SFP_WIN")] = window_size + original_flags = ack_header[SFPHeader.get_field_offset("SFP_FLAGS")] + ack_header[SFPHeader.get_field_offset("SFP_FLAGS")] = ( + original_flags | 0x01 + ) & ~0x02 + + self._socket.sendto(ack_header, dest_addr) + logger.debug( + f"{log_prefix} Sent to {dest_addr} for flow {chr(flow) if 32<=flow<=126 else flow} with WIN={window_size}." + ) + except Exception: + logger.exception(f"{log_prefix} Failed to send to {dest_addr}.") + + def _cleanup_lingering_transactions(self, current_flow: int, current_tid: int): + keys_to_remove = [ + key + for key in self._fragments + if key[0] == current_flow and key[1] != current_tid + ] + for key in keys_to_remove: + logger.warning( + f"Cleaning up lingering/incomplete transaction for key={key}." + ) + self._fragments.pop(key, None) + self._buffers.pop(key, None) +""" +Provides a reusable transport layer for the Simple Fragmentation Protocol (SFP). + +This module handles UDP socket communication, SFP header parsing, fragment +reassembly, and ACK generation. It is application-agnostic and uses a +callback/handler system to pass fully reassembled payloads to the +application layer based on the SFP_FLOW identifier. +""" + +""" +SFP transport layer (clean, single definition). +""" + +import socket +import logging +import threading +import time +from typing import Dict, Callable, Optional + +from target_simulator.utils.network import create_udp_socket, close_udp_socket +from target_simulator.core.sfp_structures import SFPHeader + +PayloadHandler = Callable[[bytearray], None] + +LOG_LEVEL: Optional[int] = logging.INFO +logger = logging.getLogger(__name__) +if LOG_LEVEL is not None: + logger.setLevel(LOG_LEVEL) + + +class SfpTransport: + """Manages SFP communication and payload reassembly.""" + + def __init__( + self, + host: str, + port: int, + payload_handlers: Dict[int, PayloadHandler], + ack_config: Optional[Dict[int, int]] = None, + raw_packet_callback: Optional[Callable[[bytes, tuple], None]] = None, + ): + self._log_prefix = "[SfpTransport]" + logger.info(f"{self._log_prefix} Initializing for {host}:{port}...") + + self._host = host + self._port = port + self._payload_handlers = payload_handlers + self._ack_config = ack_config if ack_config is not None else {} + self._raw_packet_callback = raw_packet_callback + + self._socket: Optional[socket.socket] = None + self._receiver_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + + self._fragments: Dict[tuple, Dict[int, int]] = {} + self._buffers: Dict[tuple, bytearray] = {} + + logger.debug( + f"{self._log_prefix} Registered handlers for flows: " + f"{[chr(k) if 32 <= k <= 126 else k for k in self._payload_handlers.keys()]}" + ) + logger.debug(f"{self._log_prefix} ACK window config: {self._ack_config}") + + def start(self) -> bool: + if self._receiver_thread is not None and self._receiver_thread.is_alive(): + logger.warning( + f"{self._log_prefix} Start called, but receiver is already running." + ) + return True + + self._socket = create_udp_socket(self._host, self._port) + if not self._socket: + logger.critical( + f"{self._log_prefix} Failed to create and bind socket. Cannot start." + ) + return False + + self._stop_event.clear() + self._receiver_thread = threading.Thread( + target=self._receive_loop, name="SfpTransportThread", daemon=True + ) + self._receiver_thread.start() + return True + + def shutdown(self): + self._stop_event.set() + + if self._socket: + close_udp_socket(self._socket) + self._socket = None + + if self._receiver_thread and self._receiver_thread.is_alive(): + logger.debug(f"{self._log_prefix} Waiting for receiver thread to join...") + self._receiver_thread.join(timeout=2.0) + if self._receiver_thread.is_alive(): + logger.warning( + f"{self._log_prefix} Receiver thread did not join cleanly." + ) + + logger.info(f"{self._log_prefix} Shutdown complete.") + + def _receive_loop(self): + log_prefix = f"{self._log_prefix} Loop" + logger.info(f"{log_prefix} Starting receive loop.") + + while not self._stop_event.is_set(): + if not self._socket: + logger.error(f"{log_prefix} Socket is not available. Stopping loop.") + break + + try: + data, addr = self._socket.recvfrom(65535) + if not data: + continue + try: + if self._raw_packet_callback: + self._raw_packet_callback(data, addr) + except Exception: + logger.exception( + f"{log_prefix} raw_packet_callback raised an exception" + ) + except socket.timeout: + continue + except OSError: + if not self._stop_event.is_set(): + logger.error(f"{log_prefix} Socket error.", exc_info=True) + break + except Exception: + logger.exception(f"{log_prefix} Unexpected error in recvfrom.") + time.sleep(0.01) + continue + + self._process_packet(data, addr) + + logger.info(f"{log_prefix} Receive loop terminated.") + + def _process_packet(self, data: bytes, addr: tuple): + header_size = SFPHeader.size() + if len(data) < header_size: + logger.warning(f"Packet from {addr} is too small for SFP header. Ignoring.") + return + + try: + header = SFPHeader.from_buffer_copy(data) + except (ValueError, TypeError): + logger.error(f"Failed to parse SFP header from {addr}. Ignoring.") + return + + # Extract fields + flow = header.SFP_FLOW + tid = header.SFP_TID + frag = header.SFP_FRAG + total_frags = header.SFP_TOTFRGAS + pl_size = header.SFP_PLSIZE + pl_offset = header.SFP_PLOFFSET + total_size = header.SFP_TOTSIZE + flags = header.SFP_FLAGS + + try: + flow_name = chr(flow) if 32 <= flow < 127 else str(flow) + except Exception: + flow_name = str(flow) + + # Preview payload for logging + payload_preview = data[header_size : header_size + 256] + try: + import json + + json.loads(payload_preview.decode("utf-8", errors="strict")) + payload_preview_text = "JSON (preview)" + except Exception: + payload_preview_text = "Hex preview: " + " ".join( + f"{b:02X}" for b in payload_preview[:64] + ) + + logger.debug( + f"{self._log_prefix} Packet from {addr} - flow={flow_name} ({flow}), tid={tid}, flags=0x{flags:X}, frag={frag}/{total_frags}, pl_size={pl_size}, pl_offset={pl_offset}, total_size={total_size}. {payload_preview_text}" + ) + key = (flow, tid) if header.SFP_FLAGS & 0x01: @@ -224,9 +504,7 @@ class SfpTransport: ) return - self._buffers[key][pl_offset : pl_offset + bytes_to_copy] = payload[ - :bytes_to_copy - ] + self._buffers[key][pl_offset : pl_offset + bytes_to_copy] = payload[:bytes_to_copy] if len(self._fragments[key]) == total_frags: logger.debug( @@ -248,7 +526,6 @@ class SfpTransport: logger.warning(f"No payload handler registered for flow ID {flow}.") def _send_ack(self, dest_addr: tuple, original_header_bytes: bytes): - """Sends an SFP ACK packet back to the sender.""" log_prefix = f"{self._log_prefix} ACK" if not self._socket: return @@ -257,11 +534,7 @@ class SfpTransport: ack_header = bytearray(original_header_bytes) flow = ack_header[SFPHeader.get_field_offset("SFP_FLOW")] - # --- INIZIO MODIFICHE --- - # Usa la configurazione passata al costruttore. Se non c'è una voce - # per questo flow, usa 0 come default sicuro. window_size = self._ack_config.get(flow, 0) - # --- FINE MODIFICHE --- ack_header[SFPHeader.get_field_offset("SFP_DIRECTION")] = 0x3C # '<' ack_header[SFPHeader.get_field_offset("SFP_WIN")] = window_size @@ -278,10 +551,14 @@ class SfpTransport: logger.exception(f"{log_prefix} Failed to send to {dest_addr}.") def _cleanup_lingering_transactions(self, current_flow: int, current_tid: int): - """Removes old, incomplete transactions... (nessuna modifica qui)""" + """Remove old, incomplete transactions for the same flow but different TID. + + This prevents buffers from previous transactions (same flow) from + interfering with a newly started transaction. + """ keys_to_remove = [ key - for key in self._fragments + for key in list(self._fragments.keys()) if key[0] == current_flow and key[1] != current_tid ] for key in keys_to_remove: diff --git a/target_simulator/gui/sfp_debug_window.py b/target_simulator/gui/sfp_debug_window.py index 1586870..46aa9f0 100644 --- a/target_simulator/gui/sfp_debug_window.py +++ b/target_simulator/gui/sfp_debug_window.py @@ -15,6 +15,7 @@ import os import ctypes import time from typing import Dict, Callable, Optional, Any +import socket # Third-party imports for image display try: @@ -28,7 +29,11 @@ except ImportError: # Imports from the project structure from target_simulator.core.sfp_transport import SfpTransport, PayloadHandler -from target_simulator.core.sfp_structures import ImageLeaderData, SFPHeader +from target_simulator.core.sfp_structures import ( + ImageLeaderData, + SFPHeader, + SfpRisStatusPayload, +) # --- Helper Class for Routing and Buffering Payloads --- @@ -69,6 +74,7 @@ class DebugPayloadRouter: ord("S"): lambda payload: self._update_last_payload("SAR", payload), ord("B"): lambda payload: self._update_last_payload("BIN", payload), ord("J"): lambda payload: self._update_last_payload("JSON", payload), + ord("R"): lambda payload: self._handle_ris_status(payload), } def _update_last_payload(self, flow_id: str, payload: bytearray): @@ -76,6 +82,36 @@ class DebugPayloadRouter: with self._lock: self._latest_payloads[flow_id] = payload + def _handle_ris_status(self, payload: bytearray): + """Try to parse a RIS status payload and store a concise summary. + + If parsing fails, store the raw payload as before. + """ + try: + if len(payload) >= SfpRisStatusPayload.size(): + # Interpret the first bytes as the status payload + parsed = SfpRisStatusPayload.from_buffer_copy(bytes(payload[: SfpRisStatusPayload.size()])) + sc = parsed.scenario + # Build a concise textual summary + summary = ( + f"timetag={sc.timetag}, az={sc.platform_azimuth:.3f}, vx={sc.vx:.2f}, vy={sc.vy:.2f}, vz={sc.vz:.2f}, lat={sc.latitude:.6f}, lon={sc.longitude:.6f}, heading={sc.true_heading:.3f}" + ) + # For targets, include first 4 enabled targets + targets = [] + for t in parsed.tgt.tgt[:4]: + if t.flags != 0: + targets.append(f"id? flags={t.flags} h={t.heading:.1f} x={t.x:.1f} y={t.y:.1f} z={t.z:.1f}") + if targets: + summary += "; targets: " + ", ".join(targets) + self._update_last_payload("RIS_STATUS", bytearray(summary.encode("utf-8"))) + return + except Exception: + # fall through to storing raw payload + pass + + # Fallback: store raw payload + self._update_last_payload("RIS_STATUS", payload) + def get_and_clear_latest_payloads(self) -> Dict[str, bytearray]: """ Thread-safely retrieves all new payloads received since the last call @@ -216,6 +252,8 @@ class SfpDebugWindow(tk.Toplevel): self.protocol("WM_DELETE_WINDOW", self._on_close) self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads) + # Track last raw update time to throttle high-volume flows + self._last_raw_update_ts = 0.0 def _create_widgets(self): # --- Connection Controls (unchanged) --- @@ -243,6 +281,29 @@ class SfpDebugWindow(tk.Toplevel): conn_frame, text="Image size...", command=self._open_image_size_dialog ) self.image_size_btn.pack(side=tk.LEFT, padx=5) + # Button to send a simple UDP probe to the configured IP:Port + self.send_probe_btn = ttk.Button( + conn_frame, text="Send probe", command=self._on_send_probe + ) + self.send_probe_btn.pack(side=tk.LEFT, padx=5) + # Button to send a minimal SFP ACK packet to the configured IP:Port + self.send_ack_btn = ttk.Button( + conn_frame, text="Send ACK", command=self._on_send_ack + ) + self.send_ack_btn.pack(side=tk.LEFT, padx=5) + + # --- Script Sender Frame (below connection) --- + script_frame = ttk.Frame(self) + script_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(0, 5)) + ttk.Label(script_frame, text="Script to send:").pack(side=tk.LEFT, padx=(5, 2)) + self.script_var = tk.StringVar(value="print('hello from client')") + ttk.Entry(script_frame, textvariable=self.script_var, width=60).pack( + side=tk.LEFT, padx=(0, 5) + ) + self.send_script_btn = ttk.Button( + script_frame, text="Send script", command=self._on_send_script + ) + self.send_script_btn.pack(side=tk.LEFT, padx=5) # --- Data Display Notebook (unchanged) --- self.notebook = ttk.Notebook(self) @@ -470,6 +531,173 @@ class SfpDebugWindow(tk.Toplevel): except Exception: pass + def _on_send_probe(self): + """Sends a small UDP probe to the configured IP:port to "wake" the server. + + The server expects any message on its listening port to begin sending SFP + messages, so we just send a short datagram. This function is intentionally + lightweight and does not depend on self.sfp_transport; it uses a temporary + UDP socket so it can be invoked even when not connected/listening. + """ + ip = self.ip_var.get() + try: + port = int(self.port_var.get()) + except Exception: + self._log_to_widget("ERROR: Invalid port number for probe.", "ERROR") + return + + probe_payload = b"SFP_PROBE\n" # simple payload; server will accept any data + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(1.0) + sock.sendto(probe_payload, (ip, port)) + sock.close() + self._log_to_widget(f"Sent probe to {ip}:{port}", "INFO") + except Exception as e: + self._log_to_widget(f"Failed to send probe to {ip}:{port}: {e}", "ERROR") + + def _on_send_ack(self): + """Constructs a minimal SFP ACK header and sends it to the server. + + Uses the active transport socket when available so the packet originates + from the same local port; otherwise uses a temporary UDP socket. + """ + ip = self.ip_var.get() + try: + port = int(self.port_var.get()) + except Exception: + self._log_to_widget("ERROR: Invalid port number for ACK.", "ERROR") + return + + try: + # Construct a minimal valid SFP data fragment (frag 0 of 1) with a small payload. + payload = b"SFP_WAKE" # small payload so server sees valid metadata + + hdr = SFPHeader() + # Direction: normal data (keep 0 for unspecified) or use '<' if needed by server + hdr.SFP_DIRECTION = 0x3C + hdr.SFP_FLOW = ord("M") if isinstance("M", str) else 0 + hdr.SFP_TID = 1 + # No special flags except zero; server expects total_frags > 0 + hdr.SFP_FLAGS = 0x00 + hdr.SFP_WIN = 32 + + # Fragment metadata: this is the only fragment + hdr.SFP_TOTFRGAS = 1 + hdr.SFP_FRAG = 0 + hdr.SFP_PLSIZE = len(payload) + hdr.SFP_PLOFFSET = 0 + hdr.SFP_TOTSIZE = len(payload) + + pkt = bytes(hdr) + payload + + # Prefer to reuse the SfpTransport socket if available so source port matches + sock = None + used_temp_sock = False + if self.sfp_transport and getattr(self.sfp_transport, '_socket', None): + try: + sock = self.sfp_transport._socket + except Exception: + sock = None + + if not sock: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + used_temp_sock = True + + sock.sendto(pkt, (ip, port)) + if used_temp_sock: + sock.close() + + self._log_to_widget(f"Sent SFP data-fragment to {ip}:{port} (flow=M,t id=1)", "INFO") + except Exception as e: + self._log_to_widget(f"Failed to send SFP fragment to {ip}:{port}: {e}", "ERROR") + + def _on_send_script(self): + """Constructs a script_message_t-like payload and sends it to the server. + + The server expects a data tag with tag 'C','S' and type_validity set. + We'll build the payload using ctypes to match layout and send it using + the transport socket (if available) so the server treats us as the client. + """ + ip = self.ip_var.get() + try: + port = int(self.port_var.get()) + except Exception: + self._log_to_widget("ERROR: Invalid port number for script send.", "ERROR") + return + + script_text = self.script_var.get() or "" + # Limit script size to 1020 bytes to be conservative (server has ~1024) + script_bytes = script_text.encode("utf-8") + max_script = 1020 + if len(script_bytes) > max_script: + script_bytes = script_bytes[:max_script] + + # Local ctypes definitions that mirror what the C++ server expects + class LocalDataTag(ctypes.Structure): + _pack_ = 1 + _fields_ = [ + ("ID", ctypes.c_uint8 * 2), + ("VALID", ctypes.c_uint8), + ("VERSION", ctypes.c_uint8), + ("SIZE", ctypes.c_uint32), + ] + + class ScriptPayload(ctypes.Structure): + _pack_ = 1 + _fields_ = [ + ("script_tag", LocalDataTag), + ("script", ctypes.c_uint8 * 1024), + ] + + try: + payload = ScriptPayload() + # set tag ID to 'C','S' + payload.script_tag.ID[0] = ord("C") + payload.script_tag.ID[1] = ord("S") + payload.script_tag.VALID = 1 + payload.script_tag.VERSION = 1 + payload.script_tag.SIZE = len(script_bytes) + # copy script bytes + for i, b in enumerate(script_bytes): + payload.script[i] = b + + # Build SFP header + hdr = SFPHeader() + hdr.SFP_DIRECTION = 0x3C + hdr.SFP_FLOW = ord("R") # use 'R' for RIS script commands + hdr.SFP_TID = 1 + hdr.SFP_FLAGS = 0x00 + hdr.SFP_WIN = 32 + hdr.SFP_TOTFRGAS = 1 + hdr.SFP_FRAG = 0 + pl_bytes = bytes(payload) + hdr.SFP_PLSIZE = len(pl_bytes) + hdr.SFP_PLOFFSET = 0 + hdr.SFP_TOTSIZE = len(pl_bytes) + + pkt = bytes(hdr) + pl_bytes + + sock = None + used_temp = False + if self.sfp_transport and getattr(self.sfp_transport, "_socket", None): + try: + sock = self.sfp_transport._socket + except Exception: + sock = None + + if not sock: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + used_temp = True + + sock.sendto(pkt, (ip, port)) + if used_temp: + sock.close() + + self._log_to_widget(f"Sent script ({len(script_bytes)} bytes) to {ip}:{port}", "INFO") + except Exception as e: + self._log_to_widget(f"Failed to send script to {ip}:{port}: {e}", "ERROR") + def _on_disconnect(self): if self.sfp_transport: self._log_to_widget("Disconnecting...", "INFO")