WIP: added handling of the data flow as provided by the DSP's SFP server

This commit is contained in:
VALLONGOL 2025-10-16 20:24:11 +02:00
parent f35f0112af
commit c77f56a599
4 changed files with 614 additions and 61 deletions

View File

@ -27,10 +27,6 @@
"baudrate": 9600 "baudrate": 9600
} }
} }
},
"sfp_debug": {
"history_size": 100,
"persist_raw": false
} }
}, },
"scenarios": { "scenarios": {

View File

@ -168,3 +168,55 @@ class ImageLeaderData(ctypes.Structure):
def get_colour_map_size() -> int: def get_colour_map_size() -> int:
"""Returns the static size of the COLOUR_MAP field in bytes.""" """Returns the static size of the COLOUR_MAP field in bytes."""
return 1024 return 1024
# --- RIS Status payload (minimal) ---
# These are conservative, minimal definitions to extract useful fields
# from the server's SFP_RIS::status_message_t. They intentionally include
# only the fields we currently need (timetag, platform azimuth, velocity,
# geographic position, and a small target struct).
# --- RIS Status payload (minimal) ---
# Conservative ctypes mirrors of the server's SFP_RIS::status_message_t,
# intentionally limited to the fields the client currently consumes
# (timetag, platform azimuth, velocity, geographic position, targets).

DSP_RIS_MAX_TGT = 16  # number of target slots in the status block


class RisTarget(ctypes.Structure):
    """Single target entry: status flags, heading and cartesian position."""

    _pack_ = 1
    _fields_ = [
        ("flags", ctypes.c_int32),
        ("heading", ctypes.c_float),
        ("x", ctypes.c_float),
        ("y", ctypes.c_float),
        ("z", ctypes.c_float),
    ]


class RisScenario(ctypes.Structure):
    """Platform state: timetag plus attitude, velocity and geographic fix."""

    _pack_ = 1
    # All fields after the timetag are 32-bit floats; build them in order.
    _fields_ = [("timetag", ctypes.c_uint32)] + [
        (field_name, ctypes.c_float)
        for field_name in (
            "platform_azimuth",
            "vx",
            "vy",
            "vz",
            "baro_altitude",
            "latitude",
            "longitude",
            "true_heading",
        )
    ]


class RisTargetsBlock(ctypes.Structure):
    """Fixed-size array of DSP_RIS_MAX_TGT target entries."""

    _pack_ = 1
    _fields_ = [("tgt", RisTarget * DSP_RIS_MAX_TGT)]


class SfpRisStatusPayload(ctypes.Structure):
    """Top-level RIS status payload: scenario block followed by the targets."""

    _pack_ = 1
    _fields_ = [
        ("scenario", RisScenario),
        ("tgt", RisTargetsBlock),
    ]

    @classmethod
    def size(cls) -> int:
        """Return the packed size of the payload in bytes."""
        return ctypes.sizeof(cls)

View File

@ -1,11 +1,7 @@
# core/transport/sfp_transport.py """Clean SFP transport implementation with detailed logging.
"""
Provides a reusable transport layer for the Simple Fragmentation Protocol (SFP).
This module handles UDP socket communication, SFP header parsing, fragment Handles UDP receive loop, SFP header parsing, fragment reassembly, ACK
reassembly, and ACK generation. It is application-agnostic and uses a generation, and hands completed payloads to registered handlers.
callback/handler system to pass fully reassembled payloads to the
application layer based on the SFP_FLOW identifier.
""" """
import socket import socket
@ -14,27 +10,12 @@ import threading
import time import time
from typing import Dict, Callable, Optional from typing import Dict, Callable, Optional
# Rimosso l'import da config, ora la classe è indipendente
from target_simulator.utils.network import create_udp_socket, close_udp_socket from target_simulator.utils.network import create_udp_socket, close_udp_socket
from target_simulator.core.sfp_structures import SFPHeader from target_simulator.core.sfp_structures import SFPHeader
# Define a type hint for payload handlers
PayloadHandler = Callable[[bytearray], None] PayloadHandler = Callable[[bytearray], None]
# Module-level logging control -------------------------------------------------
# You can override the logging level for this module by setting LOG_LEVEL at
# the top of this file (e.g. LOG_LEVEL = logging.DEBUG) or by configuring the
# returned `logger` object from outside the module:
#
# import logging
# import target_simulator.core.sfp_transport as st
# st.LOG_LEVEL = logging.DEBUG
# st.logger.setLevel(st.LOG_LEVEL)
#
# If left as None the logger will inherit configuration from the root logger.
LOG_LEVEL: Optional[int] = logging.INFO LOG_LEVEL: Optional[int] = logging.INFO
# Create a module logger. Use module name for clear hierarchical logging.
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if LOG_LEVEL is not None: if LOG_LEVEL is not None:
logger.setLevel(LOG_LEVEL) logger.setLevel(LOG_LEVEL)
@ -43,7 +24,6 @@ if LOG_LEVEL is not None:
class SfpTransport: class SfpTransport:
"""Manages SFP communication and payload reassembly.""" """Manages SFP communication and payload reassembly."""
# --- INIZIO MODIFICHE ---
def __init__( def __init__(
self, self,
host: str, host: str,
@ -52,34 +32,22 @@ class SfpTransport:
ack_config: Optional[Dict[int, int]] = None, ack_config: Optional[Dict[int, int]] = None,
raw_packet_callback: Optional[Callable[[bytes, tuple], None]] = None, raw_packet_callback: Optional[Callable[[bytes, tuple], None]] = None,
): ):
"""
Initializes the SFP Transport layer.
Args:
host (str): The local IP address to bind the UDP socket to.
port (int): The local port to listen on.
payload_handlers (Dict[int, PayloadHandler]): A dictionary mapping
SFP_FLOW IDs to their corresponding handler functions.
ack_config (Optional[Dict[int, int]]): A dictionary mapping SFP_FLOW IDs
to the SFP_WIN value to use in the ACK packet. If None, defaults will be used.
"""
# --- FINE MODIFICHE ---
self._log_prefix = "[SfpTransport]" self._log_prefix = "[SfpTransport]"
logger.info(f"{self._log_prefix} Initializing for {host}:{port}...") logger.info(f"{self._log_prefix} Initializing for {host}:{port}...")
self._host = host self._host = host
self._port = port self._port = port
self._payload_handlers = payload_handlers self._payload_handlers = payload_handlers
# --- INIZIO MODIFICHE ---
self._ack_config = ack_config if ack_config is not None else {} self._ack_config = ack_config if ack_config is not None else {}
# Optional callback invoked with (raw_packet_bytes, addr) for every packet received
self._raw_packet_callback = raw_packet_callback self._raw_packet_callback = raw_packet_callback
# --- FINE MODIFICHE ---
self._socket: Optional[socket.socket] = None self._socket: Optional[socket.socket] = None
self._receiver_thread: Optional[threading.Thread] = None self._receiver_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event() self._stop_event = threading.Event()
# transaction state: key=(flow, tid) -> {frag_index: total_frags}
self._fragments: Dict[tuple, Dict[int, int]] = {} self._fragments: Dict[tuple, Dict[int, int]] = {}
# buffers for reassembly: key=(flow, tid) -> bytearray(total_size)
self._buffers: Dict[tuple, bytearray] = {} self._buffers: Dict[tuple, bytearray] = {}
logger.debug( logger.debug(
@ -89,7 +57,6 @@ class SfpTransport:
logger.debug(f"{self._log_prefix} ACK window config: {self._ack_config}") logger.debug(f"{self._log_prefix} ACK window config: {self._ack_config}")
def start(self) -> bool: def start(self) -> bool:
"""Starts the transport layer... (nessuna modifica qui)"""
if self._receiver_thread is not None and self._receiver_thread.is_alive(): if self._receiver_thread is not None and self._receiver_thread.is_alive():
logger.warning( logger.warning(
f"{self._log_prefix} Start called, but receiver is already running." f"{self._log_prefix} Start called, but receiver is already running."
@ -111,7 +78,6 @@ class SfpTransport:
return True return True
def shutdown(self): def shutdown(self):
"""Stops the receiver thread... (nessuna modifica qui)"""
self._stop_event.set() self._stop_event.set()
if self._socket: if self._socket:
@ -129,7 +95,6 @@ class SfpTransport:
logger.info(f"{self._log_prefix} Shutdown complete.") logger.info(f"{self._log_prefix} Shutdown complete.")
def _receive_loop(self): def _receive_loop(self):
"""The main loop... (nessuna modifica qui)"""
log_prefix = f"{self._log_prefix} Loop" log_prefix = f"{self._log_prefix} Loop"
logger.info(f"{log_prefix} Starting receive loop.") logger.info(f"{log_prefix} Starting receive loop.")
@ -142,7 +107,6 @@ class SfpTransport:
data, addr = self._socket.recvfrom(65535) data, addr = self._socket.recvfrom(65535)
if not data: if not data:
continue continue
# Deliver raw packet to optional callback for inspection (non-blocking best-effort)
try: try:
if self._raw_packet_callback: if self._raw_packet_callback:
self._raw_packet_callback(data, addr) self._raw_packet_callback(data, addr)
@ -166,7 +130,12 @@ class SfpTransport:
logger.info(f"{log_prefix} Receive loop terminated.") logger.info(f"{log_prefix} Receive loop terminated.")
def _process_packet(self, data: bytes, addr: tuple): def _process_packet(self, data: bytes, addr: tuple):
"""Parses an SFP packet... (nessuna modifica qui)""" """Parse SFP packet, log details, and reassemble fragments.
Logging includes parsed header fields and a small payload preview.
The preview attempts JSON decode to detect text-based payloads; if
that fails the first bytes are logged in hex.
"""
header_size = SFPHeader.size() header_size = SFPHeader.size()
if len(data) < header_size: if len(data) < header_size:
logger.warning(f"Packet from {addr} is too small for SFP header. Ignoring.") logger.warning(f"Packet from {addr} is too small for SFP header. Ignoring.")
@ -178,10 +147,321 @@ class SfpTransport:
logger.error(f"Failed to parse SFP header from {addr}. Ignoring.") logger.error(f"Failed to parse SFP header from {addr}. Ignoring.")
return return
flow, tid = header.SFP_FLOW, header.SFP_TID # Extract header fields
frag, total_frags = header.SFP_FRAG, header.SFP_TOTFRGAS flow = header.SFP_FLOW
pl_size, pl_offset = header.SFP_PLSIZE, header.SFP_PLOFFSET tid = header.SFP_TID
frag = header.SFP_FRAG
total_frags = header.SFP_TOTFRGAS
pl_size = header.SFP_PLSIZE
pl_offset = header.SFP_PLOFFSET
total_size = header.SFP_TOTSIZE total_size = header.SFP_TOTSIZE
flags = header.SFP_FLAGS
try:
flow_name = chr(flow) if 32 <= flow < 127 else str(flow)
except Exception:
flow_name = str(flow)
# Payload preview for logging
payload_preview = data[header_size : header_size + 256]
try:
import json
json.loads(payload_preview.decode("utf-8", errors="strict"))
payload_preview_text = "JSON (preview)"
except Exception:
payload_preview_text = "Hex preview: " + " ".join(
f"{b:02X}" for b in payload_preview[:64]
)
logger.info(
f"{self._log_prefix} Packet from {addr} - flow={flow_name} ({flow}), tid={tid}, flags=0x{flags:X}, frag={frag}/{total_frags}, pl_size={pl_size}, pl_offset={pl_offset}, total_size={total_size}. {payload_preview_text}"
)
key = (flow, tid)
# If sender requested an ACK bit in flags, reply
if header.SFP_FLAGS & 0x01:
self._send_ack(addr, data[:header_size])
# Basic validation
if total_frags == 0 or total_frags > 60000 or total_size <= 0:
logger.warning(
f"Invalid metadata for {key}: total_frags={total_frags}, total_size={total_size}. Ignoring."
)
return
# Start a new transaction when frag==0
if frag == 0:
self._cleanup_lingering_transactions(flow, tid)
logger.debug(
f"New transaction started for key={key}. Total size: {total_size} bytes."
)
self._fragments[key] = {}
try:
self._buffers[key] = bytearray(total_size)
except (MemoryError, ValueError):
logger.error(
f"Failed to allocate {total_size} bytes for key={key}. Ignoring transaction."
)
self._fragments.pop(key, None)
return
if key not in self._buffers or key not in self._fragments:
logger.debug(
f"Ignoring fragment {frag} for untracked transaction key={key}."
)
return
self._fragments[key][frag] = total_frags
payload = data[header_size:]
bytes_to_copy = min(pl_size, len(payload))
if (pl_offset + bytes_to_copy) > len(self._buffers[key]):
logger.error(
f"Payload for key={key}, frag={frag} would overflow buffer. Ignoring."
)
return
# Copy into buffer
self._buffers[key][pl_offset : pl_offset + bytes_to_copy] = payload[:bytes_to_copy]
# If all fragments received, hand off to handler
if len(self._fragments[key]) == total_frags:
logger.debug(
f"Transaction complete for key={key}. Handing off to application layer."
)
completed_payload = self._buffers.pop(key)
self._fragments.pop(key)
handler = self._payload_handlers.get(flow)
if handler:
try:
handler(completed_payload)
except Exception:
logger.exception(
f"Error executing payload handler for flow {flow}."
)
else:
logger.warning(f"No payload handler registered for flow ID {flow}.")
def _send_ack(self, dest_addr: tuple, original_header_bytes: bytes):
log_prefix = f"{self._log_prefix} ACK"
if not self._socket:
return
try:
ack_header = bytearray(original_header_bytes)
flow = ack_header[SFPHeader.get_field_offset("SFP_FLOW")]
window_size = self._ack_config.get(flow, 0)
ack_header[SFPHeader.get_field_offset("SFP_DIRECTION")] = 0x3C # '<'
ack_header[SFPHeader.get_field_offset("SFP_WIN")] = window_size
original_flags = ack_header[SFPHeader.get_field_offset("SFP_FLAGS")]
ack_header[SFPHeader.get_field_offset("SFP_FLAGS")] = (
original_flags | 0x01
) & ~0x02
self._socket.sendto(ack_header, dest_addr)
logger.debug(
f"{log_prefix} Sent to {dest_addr} for flow {chr(flow) if 32<=flow<=126 else flow} with WIN={window_size}."
)
except Exception:
logger.exception(f"{log_prefix} Failed to send to {dest_addr}.")
def _cleanup_lingering_transactions(self, current_flow: int, current_tid: int):
keys_to_remove = [
key
for key in self._fragments
if key[0] == current_flow and key[1] != current_tid
]
for key in keys_to_remove:
logger.warning(
f"Cleaning up lingering/incomplete transaction for key={key}."
)
self._fragments.pop(key, None)
self._buffers.pop(key, None)
"""
Provides a reusable transport layer for the Simple Fragmentation Protocol (SFP).
This module handles UDP socket communication, SFP header parsing, fragment
reassembly, and ACK generation. It is application-agnostic and uses a
callback/handler system to pass fully reassembled payloads to the
application layer based on the SFP_FLOW identifier.
"""
"""
SFP transport layer (clean, single definition).
"""
import socket
import logging
import threading
import time
from typing import Dict, Callable, Optional
from target_simulator.utils.network import create_udp_socket, close_udp_socket
from target_simulator.core.sfp_structures import SFPHeader
PayloadHandler = Callable[[bytearray], None]
LOG_LEVEL: Optional[int] = logging.INFO
logger = logging.getLogger(__name__)
if LOG_LEVEL is not None:
logger.setLevel(LOG_LEVEL)
class SfpTransport:
"""Manages SFP communication and payload reassembly."""
def __init__(
    self,
    host: str,
    port: int,
    payload_handlers: Dict[int, PayloadHandler],
    ack_config: Optional[Dict[int, int]] = None,
    raw_packet_callback: Optional[Callable[[bytes, tuple], None]] = None,
):
    """Initialize the transport state; no socket is opened until start().

    Args:
        host: Local IP address to bind the UDP socket to.
        port: Local UDP port to listen on.
        payload_handlers: Maps SFP_FLOW ids to handlers invoked with each
            fully reassembled payload.
        ack_config: Optional mapping of SFP_FLOW id to the SFP_WIN value
            used when ACKing that flow; defaults to an empty mapping.
        raw_packet_callback: Optional callback invoked with every raw
            (packet_bytes, addr) datagram before SFP processing.
    """
    self._log_prefix = "[SfpTransport]"
    logger.info(f"{self._log_prefix} Initializing for {host}:{port}...")

    # Endpoint and application-level configuration.
    self._host = host
    self._port = port
    self._payload_handlers = payload_handlers
    self._ack_config = {} if ack_config is None else ack_config
    self._raw_packet_callback = raw_packet_callback

    # Runtime state: socket, receiver thread, stop signalling.
    self._socket: Optional[socket.socket] = None
    self._receiver_thread: Optional[threading.Thread] = None
    self._stop_event = threading.Event()

    # Reassembly state, keyed by (flow, tid).
    self._fragments: Dict[tuple, Dict[int, int]] = {}
    self._buffers: Dict[tuple, bytearray] = {}

    logger.debug(
        f"{self._log_prefix} Registered handlers for flows: "
        f"{[chr(k) if 32 <= k <= 126 else k for k in self._payload_handlers.keys()]}"
    )
    logger.debug(f"{self._log_prefix} ACK window config: {self._ack_config}")
def start(self) -> bool:
    """Bind the UDP socket and launch the receiver thread.

    Returns:
        True when the receiver is running (whether newly started or
        already alive); False when the socket could not be created/bound.
    """
    thread = self._receiver_thread
    if thread is not None and thread.is_alive():
        logger.warning(
            f"{self._log_prefix} Start called, but receiver is already running."
        )
        return True

    self._socket = create_udp_socket(self._host, self._port)
    if not self._socket:
        logger.critical(
            f"{self._log_prefix} Failed to create and bind socket. Cannot start."
        )
        return False

    self._stop_event.clear()
    worker = threading.Thread(
        target=self._receive_loop, name="SfpTransportThread", daemon=True
    )
    self._receiver_thread = worker
    worker.start()
    return True
def shutdown(self):
    """Stop the receive loop, close the socket and join the receiver thread."""
    self._stop_event.set()

    sock = self._socket
    if sock:
        close_udp_socket(sock)
        self._socket = None

    worker = self._receiver_thread
    if worker and worker.is_alive():
        logger.debug(f"{self._log_prefix} Waiting for receiver thread to join...")
        worker.join(timeout=2.0)
        if worker.is_alive():
            logger.warning(
                f"{self._log_prefix} Receiver thread did not join cleanly."
            )
    logger.info(f"{self._log_prefix} Shutdown complete.")
def _receive_loop(self):
    """Blocking UDP receive loop, run on the dedicated receiver thread.

    Reads datagrams until the stop event is set; each packet is first
    offered to the optional raw-packet callback, then parsed/reassembled
    via _process_packet.
    """
    log_prefix = f"{self._log_prefix} Loop"
    logger.info(f"{log_prefix} Starting receive loop.")
    while not self._stop_event.is_set():
        if not self._socket:
            logger.error(f"{log_prefix} Socket is not available. Stopping loop.")
            break
        try:
            data, addr = self._socket.recvfrom(65535)
            if not data:
                continue
            # Best-effort delivery of the raw datagram to the inspection
            # callback; a faulty callback must not kill the loop.
            try:
                if self._raw_packet_callback:
                    self._raw_packet_callback(data, addr)
            except Exception:
                logger.exception(
                    f"{log_prefix} raw_packet_callback raised an exception"
                )
        except socket.timeout:
            continue
        except OSError:
            # Closing the socket during shutdown raises OSError; only log
            # when the stop event was not the cause, then exit the loop.
            if not self._stop_event.is_set():
                logger.error(f"{log_prefix} Socket error.", exc_info=True)
            break
        except Exception:
            # Unexpected failure: log and back off briefly to avoid a
            # tight error loop, then keep receiving.
            logger.exception(f"{log_prefix} Unexpected error in recvfrom.")
            time.sleep(0.01)
            continue
        self._process_packet(data, addr)
    logger.info(f"{log_prefix} Receive loop terminated.")
def _process_packet(self, data: bytes, addr: tuple):
header_size = SFPHeader.size()
if len(data) < header_size:
logger.warning(f"Packet from {addr} is too small for SFP header. Ignoring.")
return
try:
header = SFPHeader.from_buffer_copy(data)
except (ValueError, TypeError):
logger.error(f"Failed to parse SFP header from {addr}. Ignoring.")
return
# Extract fields
flow = header.SFP_FLOW
tid = header.SFP_TID
frag = header.SFP_FRAG
total_frags = header.SFP_TOTFRGAS
pl_size = header.SFP_PLSIZE
pl_offset = header.SFP_PLOFFSET
total_size = header.SFP_TOTSIZE
flags = header.SFP_FLAGS
try:
flow_name = chr(flow) if 32 <= flow < 127 else str(flow)
except Exception:
flow_name = str(flow)
# Preview payload for logging
payload_preview = data[header_size : header_size + 256]
try:
import json
json.loads(payload_preview.decode("utf-8", errors="strict"))
payload_preview_text = "JSON (preview)"
except Exception:
payload_preview_text = "Hex preview: " + " ".join(
f"{b:02X}" for b in payload_preview[:64]
)
logger.debug(
f"{self._log_prefix} Packet from {addr} - flow={flow_name} ({flow}), tid={tid}, flags=0x{flags:X}, frag={frag}/{total_frags}, pl_size={pl_size}, pl_offset={pl_offset}, total_size={total_size}. {payload_preview_text}"
)
key = (flow, tid) key = (flow, tid)
if header.SFP_FLAGS & 0x01: if header.SFP_FLAGS & 0x01:
@ -224,9 +504,7 @@ class SfpTransport:
) )
return return
self._buffers[key][pl_offset : pl_offset + bytes_to_copy] = payload[ self._buffers[key][pl_offset : pl_offset + bytes_to_copy] = payload[:bytes_to_copy]
:bytes_to_copy
]
if len(self._fragments[key]) == total_frags: if len(self._fragments[key]) == total_frags:
logger.debug( logger.debug(
@ -248,7 +526,6 @@ class SfpTransport:
logger.warning(f"No payload handler registered for flow ID {flow}.") logger.warning(f"No payload handler registered for flow ID {flow}.")
def _send_ack(self, dest_addr: tuple, original_header_bytes: bytes): def _send_ack(self, dest_addr: tuple, original_header_bytes: bytes):
"""Sends an SFP ACK packet back to the sender."""
log_prefix = f"{self._log_prefix} ACK" log_prefix = f"{self._log_prefix} ACK"
if not self._socket: if not self._socket:
return return
@ -257,11 +534,7 @@ class SfpTransport:
ack_header = bytearray(original_header_bytes) ack_header = bytearray(original_header_bytes)
flow = ack_header[SFPHeader.get_field_offset("SFP_FLOW")] flow = ack_header[SFPHeader.get_field_offset("SFP_FLOW")]
# --- INIZIO MODIFICHE ---
# Usa la configurazione passata al costruttore. Se non c'è una voce
# per questo flow, usa 0 come default sicuro.
window_size = self._ack_config.get(flow, 0) window_size = self._ack_config.get(flow, 0)
# --- FINE MODIFICHE ---
ack_header[SFPHeader.get_field_offset("SFP_DIRECTION")] = 0x3C # '<' ack_header[SFPHeader.get_field_offset("SFP_DIRECTION")] = 0x3C # '<'
ack_header[SFPHeader.get_field_offset("SFP_WIN")] = window_size ack_header[SFPHeader.get_field_offset("SFP_WIN")] = window_size
@ -278,10 +551,14 @@ class SfpTransport:
logger.exception(f"{log_prefix} Failed to send to {dest_addr}.") logger.exception(f"{log_prefix} Failed to send to {dest_addr}.")
def _cleanup_lingering_transactions(self, current_flow: int, current_tid: int): def _cleanup_lingering_transactions(self, current_flow: int, current_tid: int):
"""Removes old, incomplete transactions... (nessuna modifica qui)""" """Remove old, incomplete transactions for the same flow but different TID.
This prevents buffers from previous transactions (same flow) from
interfering with a newly started transaction.
"""
keys_to_remove = [ keys_to_remove = [
key key
for key in self._fragments for key in list(self._fragments.keys())
if key[0] == current_flow and key[1] != current_tid if key[0] == current_flow and key[1] != current_tid
] ]
for key in keys_to_remove: for key in keys_to_remove:

View File

@ -15,6 +15,7 @@ import os
import ctypes import ctypes
import time import time
from typing import Dict, Callable, Optional, Any from typing import Dict, Callable, Optional, Any
import socket
# Third-party imports for image display # Third-party imports for image display
try: try:
@ -28,7 +29,11 @@ except ImportError:
# Imports from the project structure # Imports from the project structure
from target_simulator.core.sfp_transport import SfpTransport, PayloadHandler from target_simulator.core.sfp_transport import SfpTransport, PayloadHandler
from target_simulator.core.sfp_structures import ImageLeaderData, SFPHeader from target_simulator.core.sfp_structures import (
ImageLeaderData,
SFPHeader,
SfpRisStatusPayload,
)
# --- Helper Class for Routing and Buffering Payloads --- # --- Helper Class for Routing and Buffering Payloads ---
@ -69,6 +74,7 @@ class DebugPayloadRouter:
ord("S"): lambda payload: self._update_last_payload("SAR", payload), ord("S"): lambda payload: self._update_last_payload("SAR", payload),
ord("B"): lambda payload: self._update_last_payload("BIN", payload), ord("B"): lambda payload: self._update_last_payload("BIN", payload),
ord("J"): lambda payload: self._update_last_payload("JSON", payload), ord("J"): lambda payload: self._update_last_payload("JSON", payload),
ord("R"): lambda payload: self._handle_ris_status(payload),
} }
def _update_last_payload(self, flow_id: str, payload: bytearray): def _update_last_payload(self, flow_id: str, payload: bytearray):
@ -76,6 +82,36 @@ class DebugPayloadRouter:
with self._lock: with self._lock:
self._latest_payloads[flow_id] = payload self._latest_payloads[flow_id] = payload
def _handle_ris_status(self, payload: bytearray):
    """Try to parse a RIS status payload and store a concise text summary.

    The summary (scenario fields plus up to 4 active targets) is stored
    under the "RIS_STATUS" key as UTF-8 bytes. If the payload is too short
    or parsing raises, the raw payload is stored unchanged instead.
    """
    try:
        if len(payload) >= SfpRisStatusPayload.size():
            # Interpret the leading bytes as the fixed-size status payload;
            # any trailing bytes are ignored.
            parsed = SfpRisStatusPayload.from_buffer_copy(bytes(payload[: SfpRisStatusPayload.size()]))
            sc = parsed.scenario
            # Build a concise textual summary of the platform state.
            summary = (
                f"timetag={sc.timetag}, az={sc.platform_azimuth:.3f}, vx={sc.vx:.2f}, vy={sc.vy:.2f}, vz={sc.vz:.2f}, lat={sc.latitude:.6f}, lon={sc.longitude:.6f}, heading={sc.true_heading:.3f}"
            )
            # Include the first 4 targets whose flags are nonzero
            # (nonzero flags presumably mean "enabled" — TODO confirm
            # against the server's status_message_t definition).
            targets = []
            for t in parsed.tgt.tgt[:4]:
                if t.flags != 0:
                    targets.append(f"id? flags={t.flags} h={t.heading:.1f} x={t.x:.1f} y={t.y:.1f} z={t.z:.1f}")
            if targets:
                summary += "; targets: " + ", ".join(targets)
            self._update_last_payload("RIS_STATUS", bytearray(summary.encode("utf-8")))
            return
    except Exception:
        # Parsing is best-effort: fall through to storing the raw payload.
        pass
    # Fallback: store raw payload
    self._update_last_payload("RIS_STATUS", payload)
def get_and_clear_latest_payloads(self) -> Dict[str, bytearray]: def get_and_clear_latest_payloads(self) -> Dict[str, bytearray]:
""" """
Thread-safely retrieves all new payloads received since the last call Thread-safely retrieves all new payloads received since the last call
@ -216,6 +252,8 @@ class SfpDebugWindow(tk.Toplevel):
self.protocol("WM_DELETE_WINDOW", self._on_close) self.protocol("WM_DELETE_WINDOW", self._on_close)
self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads) self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
# Track last raw update time to throttle high-volume flows
self._last_raw_update_ts = 0.0
def _create_widgets(self): def _create_widgets(self):
# --- Connection Controls (unchanged) --- # --- Connection Controls (unchanged) ---
@ -243,6 +281,29 @@ class SfpDebugWindow(tk.Toplevel):
conn_frame, text="Image size...", command=self._open_image_size_dialog conn_frame, text="Image size...", command=self._open_image_size_dialog
) )
self.image_size_btn.pack(side=tk.LEFT, padx=5) self.image_size_btn.pack(side=tk.LEFT, padx=5)
# Button to send a simple UDP probe to the configured IP:Port
self.send_probe_btn = ttk.Button(
conn_frame, text="Send probe", command=self._on_send_probe
)
self.send_probe_btn.pack(side=tk.LEFT, padx=5)
# Button to send a minimal SFP ACK packet to the configured IP:Port
self.send_ack_btn = ttk.Button(
conn_frame, text="Send ACK", command=self._on_send_ack
)
self.send_ack_btn.pack(side=tk.LEFT, padx=5)
# --- Script Sender Frame (below connection) ---
script_frame = ttk.Frame(self)
script_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(0, 5))
ttk.Label(script_frame, text="Script to send:").pack(side=tk.LEFT, padx=(5, 2))
self.script_var = tk.StringVar(value="print('hello from client')")
ttk.Entry(script_frame, textvariable=self.script_var, width=60).pack(
side=tk.LEFT, padx=(0, 5)
)
self.send_script_btn = ttk.Button(
script_frame, text="Send script", command=self._on_send_script
)
self.send_script_btn.pack(side=tk.LEFT, padx=5)
# --- Data Display Notebook (unchanged) --- # --- Data Display Notebook (unchanged) ---
self.notebook = ttk.Notebook(self) self.notebook = ttk.Notebook(self)
@ -470,6 +531,173 @@ class SfpDebugWindow(tk.Toplevel):
except Exception: except Exception:
pass pass
def _on_send_probe(self):
    """Send a small UDP probe to the configured IP:port to "wake" the server.

    The server starts emitting SFP messages after receiving any datagram on
    its listening port, so a short payload is enough. A temporary UDP socket
    is used (not self.sfp_transport) so the probe can be sent even when not
    connected/listening.
    """
    ip = self.ip_var.get()
    try:
        port = int(self.port_var.get())
    except Exception:
        self._log_to_widget("ERROR: Invalid port number for probe.", "ERROR")
        return
    probe_payload = b"SFP_PROBE\n"  # simple payload; server will accept any data
    try:
        # Context manager guarantees the socket is closed even when
        # sendto raises (the original leaked the fd on failure).
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(1.0)
            sock.sendto(probe_payload, (ip, port))
        self._log_to_widget(f"Sent probe to {ip}:{port}", "INFO")
    except Exception as e:
        self._log_to_widget(f"Failed to send probe to {ip}:{port}: {e}", "ERROR")
def _on_send_ack(self):
    """Construct a minimal SFP data fragment and send it to the server.

    Uses the active transport socket when available so the packet originates
    from the same local port; otherwise a temporary UDP socket is used and
    always closed, even when the send fails.
    """
    ip = self.ip_var.get()
    try:
        port = int(self.port_var.get())
    except Exception:
        self._log_to_widget("ERROR: Invalid port number for ACK.", "ERROR")
        return
    try:
        # Minimal valid SFP data fragment (frag 0 of 1) with a small payload.
        payload = b"SFP_WAKE"  # small payload so server sees valid metadata
        hdr = SFPHeader()
        hdr.SFP_DIRECTION = 0x3C  # '<'
        # Flow 'M' (the original `ord("M") if isinstance("M", str) else 0`
        # was a dead conditional — the else branch could never run).
        hdr.SFP_FLOW = ord("M")
        hdr.SFP_TID = 1
        hdr.SFP_FLAGS = 0x00  # no flags; server only requires total_frags > 0
        hdr.SFP_WIN = 32
        # Fragment metadata: this is the only fragment.
        hdr.SFP_TOTFRGAS = 1
        hdr.SFP_FRAG = 0
        hdr.SFP_PLSIZE = len(payload)
        hdr.SFP_PLOFFSET = 0
        hdr.SFP_TOTSIZE = len(payload)
        pkt = bytes(hdr) + payload

        # Prefer the SfpTransport socket so the source port matches.
        sock = getattr(self.sfp_transport, "_socket", None) if self.sfp_transport else None
        if sock:
            sock.sendto(pkt, (ip, port))
        else:
            temp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                temp_sock.sendto(pkt, (ip, port))
            finally:
                # Always release the temporary socket (the original leaked
                # it when sendto raised).
                temp_sock.close()
        self._log_to_widget(f"Sent SFP data-fragment to {ip}:{port} (flow=M, tid=1)", "INFO")
    except Exception as e:
        self._log_to_widget(f"Failed to send SFP fragment to {ip}:{port}: {e}", "ERROR")
def _on_send_script(self):
    """Build a script_message_t-like payload and send it to the server.

    The server expects a data tag with ID 'C','S' and the validity flag set.
    The payload layout is mirrored locally with ctypes to match the C++
    struct, wrapped in a single-fragment SFP packet, and sent through the
    transport socket when available so the server treats us as the client.
    """
    ip = self.ip_var.get()
    try:
        port = int(self.port_var.get())
    except Exception:
        self._log_to_widget("ERROR: Invalid port number for script send.", "ERROR")
        return
    script_text = self.script_var.get() or ""
    # Limit script size to 1020 bytes to be conservative (server has ~1024).
    script_bytes = script_text.encode("utf-8")[:1020]

    # Local ctypes definitions that mirror what the C++ server expects.
    class LocalDataTag(ctypes.Structure):
        _pack_ = 1
        _fields_ = [
            ("ID", ctypes.c_uint8 * 2),
            ("VALID", ctypes.c_uint8),
            ("VERSION", ctypes.c_uint8),
            ("SIZE", ctypes.c_uint32),
        ]

    class ScriptPayload(ctypes.Structure):
        _pack_ = 1
        _fields_ = [
            ("script_tag", LocalDataTag),
            ("script", ctypes.c_uint8 * 1024),
        ]

    try:
        payload = ScriptPayload()
        # Tag ID 'C','S' identifies a script command.
        payload.script_tag.ID[0] = ord("C")
        payload.script_tag.ID[1] = ord("S")
        payload.script_tag.VALID = 1
        payload.script_tag.VERSION = 1
        payload.script_tag.SIZE = len(script_bytes)
        # Bulk copy into the fixed buffer instead of a per-byte Python loop.
        ctypes.memmove(payload.script, script_bytes, len(script_bytes))

        # Single-fragment SFP header wrapping the whole payload.
        hdr = SFPHeader()
        hdr.SFP_DIRECTION = 0x3C
        hdr.SFP_FLOW = ord("R")  # use 'R' for RIS script commands
        hdr.SFP_TID = 1
        hdr.SFP_FLAGS = 0x00
        hdr.SFP_WIN = 32
        hdr.SFP_TOTFRGAS = 1
        hdr.SFP_FRAG = 0
        pl_bytes = bytes(payload)
        hdr.SFP_PLSIZE = len(pl_bytes)
        hdr.SFP_PLOFFSET = 0
        hdr.SFP_TOTSIZE = len(pl_bytes)
        pkt = bytes(hdr) + pl_bytes

        # Reuse the transport socket when present so the source port matches.
        sock = getattr(self.sfp_transport, "_socket", None) if self.sfp_transport else None
        if sock:
            sock.sendto(pkt, (ip, port))
        else:
            temp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                temp_sock.sendto(pkt, (ip, port))
            finally:
                # Always release the temporary socket (the original leaked
                # it when sendto raised).
                temp_sock.close()
        self._log_to_widget(f"Sent script ({len(script_bytes)} bytes) to {ip}:{port}", "INFO")
    except Exception as e:
        self._log_to_widget(f"Failed to send script to {ip}:{port}: {e}", "ERROR")
def _on_disconnect(self): def _on_disconnect(self):
if self.sfp_transport: if self.sfp_transport:
self._log_to_widget("Disconnecting...", "INFO") self._log_to_widget("Disconnecting...", "INFO")