S1005403_RisCC/target_simulator/gui/sfp_debug_window.py
2025-10-21 12:44:19 +02:00

1703 lines
76 KiB
Python

from target_simulator.gui.ppi_display import PPIDisplay
# target_simulator/gui/sfp_debug_window.py
"""
Provides a Toplevel window for debugging the SFP transport layer.
This version uses a sampling approach to handle high-frequency data streams
without overwhelming the GUI thread.
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import logging
import threading
import collections
import datetime
import os
import ctypes
import time
import math
from typing import Dict, Callable, Optional, Any
import socket
# Third-party imports for image display
try:
from PIL import Image, ImageTk
import numpy as np
import cv2
_IMAGE_LIBS_AVAILABLE = True
except ImportError:
_IMAGE_LIBS_AVAILABLE = False
# Imports from the project structure
from target_simulator.core.sfp_transport import SfpTransport
from target_simulator.core.sfp_structures import (
ImageLeaderData,
SFPHeader,
SfpRisStatusPayload,
)
from target_simulator.gui.payload_router import DebugPayloadRouter
from target_simulator.core.models import (
Target,
Waypoint,
ManeuverType,
KNOTS_TO_FPS,
)
from target_simulator.core import command_builder
# Default values for testing the FDX protocol; they pre-fill the
# "Simple Target Sender" form fields (units as labelled in that form).
DEF_TEST_ID = 1  # target ID
DEF_TEST_RANGE = 30.0  # range, NM
DEF_TEST_AZIMUTH = 10.0  # azimuth, degrees
DEF_TEST_VELOCITY = 300.0  # velocity, knots
DEF_TEST_HEADING = 0.0  # heading, degrees
DEF_TEST_ALTITUDE = 10000.0  # altitude, ft
class SfpDebugWindow(tk.Toplevel):
"""Top-level window for SFP debugging and payload inspection."""
GUI_POLL_INTERVAL_MS = 250
def __init__(self, master=None):
    """Create the SFP debug window, build its widgets and start polling.

    Args:
        master: Optional parent widget handed to ``tk.Toplevel``; it is also
            stored as ``self.master``.
    """
    super().__init__(master)
    self.master = master
    # Create the logger first so the best-effort steps below can report
    # failures instead of swallowing them.
    self.logger = logging.getLogger(__name__)
    try:
        self.geometry("1100x700")
    except Exception:
        # Sizing is cosmetic: keep the default geometry and carry on.
        self.logger.debug("Could not apply the default geometry", exc_info=True)
    self.payload_router = DebugPayloadRouter()
    self.sfp_transport: Optional[SfpTransport] = None
    self.image_area_size = 150
    self._ppi_visible = False
    # --- TK Variables ---
    self.ip_var = tk.StringVar(value="127.0.0.1")
    # Local port to bind the client socket (where server will send status)
    self.local_port_var = tk.StringVar(value="60002")
    # Server port where we send script/command messages
    self.server_port_var = tk.StringVar(value="60001")
    self.script_var = tk.StringVar(value="print('hello from client')")
    # Variables backing the "Simple Target Sender" form
    self.tgt_id_var = tk.IntVar(value=DEF_TEST_ID)
    self.tgt_range_var = tk.DoubleVar(value=DEF_TEST_RANGE)
    self.tgt_az_var = tk.DoubleVar(value=DEF_TEST_AZIMUTH)
    self.tgt_alt_var = tk.DoubleVar(value=DEF_TEST_ALTITUDE)
    self.tgt_vel_var = tk.DoubleVar(value=DEF_TEST_VELOCITY)
    self.tgt_hdg_var = tk.DoubleVar(value=DEF_TEST_HEADING)
    self.tgt_active_var = tk.BooleanVar(value=True)
    self.tgt_traceable_var = tk.BooleanVar(value=True)
    self.tgt_restart_var = tk.BooleanVar(value=False)
    # Names used to display a numeric master-mode value; the list position is
    # presumably the mode id (verify against the RIS interface definition).
    self._master_mode_names = [
        "idle_master_mode",
        "int_bit_master_mode",
        "gm_master_mode",
        "dbs_master_mode",
        "rws_master_mode",
        "vs_master_mode",
        "acm_master_mode",
        "tws_master_mode",
        "sea_low_master_mode",
        "sea_high_master_mode",
        "gmti_master_mode",
        "bcn_master_mode",
        "sam_master_mode",
        "ta_master_mode",
        "wa_master_mode",
        "stt_master_mode",
        "dtt_master_mode",
        "sstt_master_mode",
        "acq_master_mode",
        "ftt_master_mode",
        "agr_master_mode",
        "sar_master_mode",
        "invalid_master_mode_",
        "xtst_dummy_mode",
        "xtst_hw_validation_mode",
        "boot_master_mode",
        "master_mode_id_cardinality_",
    ]
    # --- UI Construction ---
    self._create_widgets()
    # Route the window-manager close button through _on_close so the
    # transport is shut down before the window is destroyed.
    self.protocol("WM_DELETE_WINDOW", self._on_close)
    # Start the periodic GUI poll loop
    try:
        self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
    except Exception:
        # Without the poll loop the tabs never refresh; make that visible.
        self.logger.exception("Failed to schedule the GUI poll loop")
def _create_widgets(self):
    """Lay out the window: a strip of control panels above the data notebook."""
    controls = ttk.Frame(self)
    controls.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5)

    # Connection row (IP / ports / connect buttons)
    connection_row = ttk.Frame(controls)
    connection_row.pack(side=tk.TOP, fill=tk.X, pady=(0, 5))
    self._create_connection_widgets(connection_row)

    # Simple target sender panel
    sender_box = ttk.LabelFrame(controls, text="Simple Target Sender")
    sender_box.pack(side=tk.TOP, fill=tk.X, pady=(5, 5))
    self._create_target_sender_widgets(sender_box)

    # Free-form script sender panel
    script_box = ttk.LabelFrame(controls, text="Script to send")
    script_box.pack(side=tk.TOP, fill=tk.X, pady=(0, 5))
    self._create_script_sender_widgets(script_box)

    # Notebook holding the data views; the PPI widget is created inside the
    # RIS tab by _create_notebook_tabs.
    self.notebook = ttk.Notebook(self)
    self.notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    self._create_notebook_tabs()
def _toggle_ppi(self):
    """Swap the RIS pane between the numeric table and the PPI map.

    Any failure (for example the containers not existing yet) is logged and
    otherwise ignored.
    """
    try:
        if self._ppi_visible:
            leaving, entering = "ris_ppi_container", "ris_table_container"
            caption = "Show PPI Map"
        else:
            leaving, entering = "ris_table_container", "ris_ppi_container"
            caption = "Hide PPI Map"
        # Attributes are resolved one at a time, in hide-then-show order.
        getattr(self, leaving).pack_forget()
        getattr(self, entering).pack(fill=tk.BOTH, expand=True)
        self._ppi_visible = entering == "ris_ppi_container"
        self.ppi_toggle_btn.config(text=caption)
    except Exception:
        self.logger.exception("Toggle PPI failed")
def update_ppi_targets(self, targets):
    """Forward ``targets`` to the RIS PPI widget, if one exists.

    Errors raised while looking up or updating the widget are logged and
    suppressed.
    """
    try:
        widget = self.ris_ppi_widget
        if not widget:
            return
        widget.update_targets(targets)
    except Exception:
        self.logger.exception("Failed to update RIS PPI targets")
def on_ris_status_update(self, ris_status_payload):
    """Turn a RIS status payload into display targets and refresh the PPI.

    Each entry of ``ris_status_payload.tgt.tgt`` whose ``flags`` bit 0 is set
    becomes a minimal ``Target`` (id = entry index) that is passed on to
    ``update_ppi_targets``.
    """
    display_targets = []
    for index, ris_tgt in enumerate(getattr(ris_status_payload.tgt, "tgt", [])):
        # Only entries flagged valid/active (bit 0) are shown.
        if not (getattr(ris_tgt, "flags", 0) & 1):
            continue
        shown = Target(
            target_id=index,
            trajectory=[],
            active=True,
            traceable=True,
        )
        # 6076.12 is presumably feet per nautical mile (x/y in ft) - verify.
        shown.current_range_nm = (ris_tgt.x**2 + ris_tgt.y**2) ** 0.5 / 6076.12
        try:
            # Use atan2(y, x) so azimuth follows the conventional (x East, y North)
            shown.current_azimuth_deg = math.degrees(math.atan2(ris_tgt.y, ris_tgt.x))
        except Exception:
            shown.current_azimuth_deg = 0.0
        shown.current_altitude_ft = ris_tgt.z
        shown.current_heading_deg = getattr(ris_tgt, "heading", 0.0)
        try:
            self._log_to_widget(
                f"RIS JSON heading raw[{index}]: {getattr(ris_tgt, 'heading', None)}",
                "DEBUG",
            )
        except Exception:
            # Logging is best-effort and must not block the display update.
            pass
        shown.active = True
        display_targets.append(shown)
    self.update_ppi_targets(display_targets)
def _create_connection_widgets(self, parent):
    """Build the connection row: address entries plus connect/probe buttons."""
    # (label text, bound variable, entry width, label padding)
    address_fields = (
        ("IP:", self.ip_var, 18, (4, 2)),
        ("Local Port:", self.local_port_var, 8, (0, 2)),
        ("Server Port:", self.server_port_var, 8, (0, 2)),
    )
    for caption, variable, width, label_pad in address_fields:
        ttk.Label(parent, text=caption).pack(side=tk.LEFT, padx=label_pad)
        ttk.Entry(parent, textvariable=variable, width=width).pack(
            side=tk.LEFT, padx=(0, 6)
        )

    self.connect_btn = ttk.Button(parent, text="Connect", command=self._on_connect)
    self.connect_btn.pack(side=tk.LEFT, padx=(0, 6))

    # Disconnect starts disabled; _on_connect enables it after a successful start.
    self.disconnect_btn = ttk.Button(
        parent, text="Disconnect", command=self._on_disconnect, state=tk.DISABLED
    )
    self.disconnect_btn.pack(side=tk.LEFT, padx=(0, 6))

    self.send_probe_btn = ttk.Button(
        parent, text="Send Probe", command=self._on_send_probe
    )
    self.send_probe_btn.pack(side=tk.LEFT, padx=(6, 4))

    # Right-aligned frame kept from the earlier layout; the quick command
    # buttons now live in the Simple Target Sender area.
    quick_frame = ttk.Frame(parent)
    quick_frame.pack(side=tk.RIGHT)
def _create_target_sender_widgets(self, parent):
    """Build the "Simple Target Sender" form and the quick-command button row."""
    grid = ttk.Frame(parent, padding=5)
    grid.pack(fill=tk.X)
    # Extra padding on alternating columns keeps neighbouring fields apart.
    for padded_column in (1, 3, 5):
        grid.columnconfigure(padded_column, pad=15)

    # One grid column per field: (label, variable, minimum, maximum, width).
    numeric_fields = (
        ("ID:", self.tgt_id_var, 0, 15, 8),
        ("Range (NM):", self.tgt_range_var, 0, 500, 10),
        ("Azimuth (°):", self.tgt_az_var, -180, 180, 10),
        ("Velocity (kn):", self.tgt_vel_var, 0, 2000, 10),
        ("Heading (°):", self.tgt_hdg_var, 0, 360, 10),
        ("Altitude (ft):", self.tgt_alt_var, 0, 80000, 12),
    )
    for column, (caption, variable, low, high, width) in enumerate(numeric_fields):
        ttk.Label(grid, text=caption).grid(row=0, column=column, sticky=tk.W)
        ttk.Spinbox(
            grid, from_=low, to=high, textvariable=variable, width=width
        ).grid(row=1, column=column, sticky=tk.W)

    # Column 6: flag checkboxes followed by the send button.
    controls_frame = ttk.Frame(grid)
    controls_frame.grid(row=1, column=6, sticky="nsew", padx=(20, 0))
    flag_boxes = (
        ("Active", self.tgt_active_var, {}),
        ("Traceable", self.tgt_traceable_var, {"padx": 5}),
        ("Restart", self.tgt_restart_var, {"padx": 5}),
    )
    for caption, variable, pack_extra in flag_boxes:
        ttk.Checkbutton(controls_frame, text=caption, variable=variable).pack(
            side=tk.LEFT, anchor="w", **pack_extra
        )
    ttk.Button(
        controls_frame, text="Send Target", command=self._on_send_target
    ).pack(side=tk.LEFT, padx=(10, 0))

    # Quick command buttons (moved here from the connection frame).
    quick_cmd_frame = ttk.Frame(parent)
    quick_cmd_frame.pack(fill=tk.X, pady=(6, 0))
    quick_commands = (
        ("tgtreset", "build_tgtreset"),
        ("pause", "build_pause"),
        ("continue", "build_continue"),
    )
    for caption, builder_name in quick_commands:
        # The builder is looked up and invoked at click time; the default
        # argument pins the name for each button.
        ttk.Button(
            quick_cmd_frame,
            text=caption,
            command=lambda name=builder_name: self._on_send_simple_command(
                getattr(command_builder, name)()
            ),
        ).pack(side=tk.LEFT, padx=4)
    ttk.Button(
        quick_cmd_frame, text="tgtset (cur)", command=lambda: self._on_send_tgtset()
    ).pack(side=tk.LEFT, padx=8)

    # PPI toggle button (moved here); its label is updated by _toggle_ppi.
    self.ppi_toggle_btn = ttk.Button(
        quick_cmd_frame, text="Show PPI Map", command=self._toggle_ppi
    )
    self.ppi_toggle_btn.pack(side=tk.RIGHT, padx=4)
def _create_script_sender_widgets(self, parent):
    """Build the script row: label, expanding entry and the send button."""
    caption = ttk.Label(parent, text="Script to send:")
    caption.pack(side=tk.LEFT, padx=(5, 2))

    # The entry takes all remaining horizontal space in the row.
    script_entry = ttk.Entry(parent, textvariable=self.script_var, width=60)
    script_entry.pack(side=tk.LEFT, expand=True, fill=tk.X, padx=(0, 5))

    self.send_script_btn = ttk.Button(
        parent, text="Send script", command=self._on_send_script
    )
    self.send_script_btn.pack(side=tk.LEFT, padx=5)
def _create_notebook_tabs(self):
    """Create every notebook tab.

    Tabs are added in this order: "Raw Log", the optional "MFD Image" and
    "SAR Image" tabs (only when the imaging libraries imported), "RIS Status",
    "Binary (Hex)" and "JSON". "SFP Raw" is inserted at index 1, or appended
    if the insert fails.
    """
    self.log_tab = scrolledtext.ScrolledText(
        self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9)
    )
    self.notebook.add(self.log_tab, text="Raw Log")

    if _IMAGE_LIBS_AVAILABLE:
        self.mfd_tab = self._create_image_tab("MFD Image")
        self.notebook.add(self.mfd_tab["frame"], text="MFD Image")
        self.sar_tab = self._create_image_tab("SAR Image")
        self.notebook.add(self.sar_tab["frame"], text="SAR Image")

    self.notebook.add(self._build_ris_status_tab(), text="RIS Status")

    raw_frame = self._build_sfp_raw_tab()
    try:
        self.notebook.insert(1, raw_frame, text="SFP Raw")
    except Exception:
        # Positioning is cosmetic; append the tab if the insert is rejected.
        self.notebook.add(raw_frame, text="SFP Raw")

    self.bin_tab = scrolledtext.ScrolledText(
        self.notebook, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 10)
    )
    self.notebook.add(self.bin_tab, text="Binary (Hex)")
    self.json_tab = scrolledtext.ScrolledText(
        self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 10)
    )
    self.notebook.add(self.json_tab, text="JSON")

def _apply_small_treeview_style(self, *trees):
    """Give ``trees`` the compact "Small.Treeview" font (best effort)."""
    try:
        ttk.Style().configure("Small.Treeview", font=("Consolas", 8))
        for tree in trees:
            tree.configure(style="Small.Treeview")
    except Exception:
        # Purely cosmetic; the default Treeview style remains usable.
        self.logger.debug("Could not apply Small.Treeview style", exc_info=True)

def _build_ris_status_tab(self):
    """Build the "RIS Status" tab and return its frame (not yet added)."""
    ris_frame = ttk.Frame(self.notebook)
    paned = ttk.Panedwindow(ris_frame, orient=tk.HORIZONTAL)
    paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    # Left pane: scenario field/value table.
    left = ttk.Frame(paned)
    self.scenario_tree = ttk.Treeview(
        left, columns=("field", "value"), show="headings", height=12
    )
    self.scenario_tree.heading("field", text="Field")
    self.scenario_tree.heading("value", text="Value")
    self.scenario_tree.column("field", width=140, anchor="w")
    self.scenario_tree.column("value", width=160, anchor="w")
    self.scenario_tree.pack(fill=tk.BOTH, expand=True)
    paned.add(left, weight=1)

    # Right pane: two swappable containers, the numeric table and the PPI.
    right = ttk.Frame(paned)
    self.ris_table_container = ttk.Frame(right)
    self.ris_ppi_container = ttk.Frame(right)
    cols = ("idx", "flags", "heading", "x", "y", "z")
    self.ris_tree = ttk.Treeview(
        self.ris_table_container, columns=cols, show="headings", height=12
    )
    for c, txt in zip(cols, ("#", "flags", "heading", "x", "y", "z")):
        self.ris_tree.heading(c, text=txt)
        self.ris_tree.column(c, width=70, anchor="center")
    self._apply_small_treeview_style(self.ris_tree, self.scenario_tree)
    self.ris_tree.pack(fill=tk.BOTH, expand=True)
    # The table is shown initially; the PPI container stays unpacked (hidden)
    # until _toggle_ppi swaps them.
    self.ris_table_container.pack(fill=tk.BOTH, expand=True)
    paned.add(right, weight=2)

    try:
        self.ris_ppi_widget = PPIDisplay(self.ris_ppi_container, max_range_nm=100)
        self.ris_ppi_widget.pack(fill=tk.BOTH, expand=True)
    except Exception:
        # The window stays usable without the map, but record why it is missing.
        self.logger.exception("Failed to create the RIS PPI widget")
        self.ris_ppi_widget = None

    # Bottom row: scenario view mode, decimals selector and CSV export.
    btn_frame = ttk.Frame(ris_frame)
    btn_frame.pack(fill=tk.X, padx=5, pady=(0, 5))
    self.scenario_view_mode = tk.StringVar(value="simplified")
    mode_frame = ttk.Frame(btn_frame)
    mode_frame.pack(side=tk.LEFT, padx=(4, 0))
    ttk.Label(mode_frame, text="View:").pack(side=tk.LEFT, padx=(0, 6))
    ttk.Radiobutton(
        mode_frame, text="Raw", value="raw", variable=self.scenario_view_mode
    ).pack(side=tk.LEFT)
    ttk.Radiobutton(
        mode_frame,
        text="Simplified",
        value="simplified",
        variable=self.scenario_view_mode,
    ).pack(side=tk.LEFT)
    self.simplified_decimals = tk.IntVar(value=4)
    ttk.Label(mode_frame, text=" Decimals:").pack(side=tk.LEFT, padx=(8, 2))
    try:
        tk.Spinbox(
            mode_frame,
            from_=0,
            to=8,
            width=3,
            textvariable=self.simplified_decimals,
        ).pack(side=tk.LEFT)
    except Exception:
        # Fall back to a plain entry if the spinbox cannot be created.
        ttk.Entry(
            mode_frame, textvariable=self.simplified_decimals, width=3
        ).pack(side=tk.LEFT)
    self.ris_save_csv_btn = ttk.Button(
        btn_frame, text="Save CSV", command=lambda: self._on_save_ris_csv()
    )
    self.ris_save_csv_btn.pack(side=tk.RIGHT)
    return ris_frame

def _build_sfp_raw_tab(self):
    """Build the "SFP Raw" tab (history list + raw text) and return its frame."""
    raw_frame = ttk.Frame(self.notebook)
    history_frame = ttk.Frame(raw_frame, width=380)
    history_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(5, 2), pady=5)
    ttk.Label(history_frame, text="History (latest)").pack(anchor=tk.W, padx=4)

    list_container = ttk.Frame(history_frame)
    list_container.pack(fill=tk.BOTH, expand=True, padx=4, pady=(2, 4))
    # (column id, heading text, width, anchor)
    history_columns = (
        ("ts", "Timestamp", 100, "w"),
        ("flow", "Flow", 50, "w"),
        ("tid", "TID", 40, "center"),
        ("size", "Size", 50, "e"),
    )
    self.history_tree = ttk.Treeview(
        list_container,
        columns=tuple(spec[0] for spec in history_columns),
        show="headings",
        height=20,
    )
    for column_id, title, _width, _anchor in history_columns:
        self.history_tree.heading(column_id, text=title)
    for column_id, _title, width, anchor in history_columns:
        self.history_tree.column(column_id, width=width, anchor=anchor)
    self._apply_small_treeview_style(self.history_tree)
    self.history_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.history_vscroll = ttk.Scrollbar(
        list_container, orient=tk.VERTICAL, command=self.history_tree.yview
    )
    self.history_vscroll.pack(side=tk.RIGHT, fill=tk.Y)
    self.history_tree.config(yscrollcommand=self.history_vscroll.set)

    hb_frame = ttk.Frame(history_frame)
    hb_frame.pack(fill=tk.X, padx=4, pady=(4, 4))
    self.history_settings_btn = ttk.Button(
        hb_frame,
        text="Settings",
        command=lambda: self._open_history_settings_dialog(),
    )
    self.history_settings_btn.pack(side=tk.LEFT)
    self.history_clear_btn = ttk.Button(
        hb_frame, text="Clear", command=lambda: self._on_clear_history()
    )
    self.history_clear_btn.pack(side=tk.RIGHT)

    self.raw_tab_text = scrolledtext.ScrolledText(
        raw_frame, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9)
    )
    self.raw_tab_text.pack(
        side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(2, 5), pady=5
    )
    try:
        # Colour tags configured for the raw view.
        self.raw_tab_text.tag_config(
            "flag_set", background="#d4ffd4", foreground="#006400"
        )
        self.raw_tab_text.tag_config(
            "flag_unset", background="#f0f0f0", foreground="#808080"
        )
        self.raw_tab_text.tag_config(
            "flag_error", background="#ffd4d4", foreground="#800000"
        )
        self.raw_tab_text.tag_config("hdr_field", foreground="#000080")
    except Exception:
        # Colours are optional; plain text still works.
        self.logger.debug("Could not configure raw-view tags", exc_info=True)
    return raw_frame
def _on_send_target(self):
    """Build a ``tgtinit`` command from the form values and send it.

    Shows an error dialog and returns when the transport is not connected.
    All form reads happen inside the ``try`` so that a malformed port or
    number is reported as an input error instead of escaping the callback.
    """
    transport = self.sfp_transport
    if not transport or not getattr(transport, "_socket", None):
        self._log_to_widget("ERROR: Cannot send target, not connected.", "ERROR")
        messagebox.showerror(
            "Connection Error",
            "SFP transport is not connected. Please connect first.",
            parent=self,
        )
        return
    try:
        # 1. Collect data from UI (each .get() may raise on invalid text)
        ip = self.ip_var.get()
        port = int(self.server_port_var.get())
        destination = (ip, port)
        target_id = self.tgt_id_var.get()
        range_nm = self.tgt_range_var.get()
        az_deg = self.tgt_az_var.get()
        alt_ft = self.tgt_alt_var.get()
        vel_kn = self.tgt_vel_var.get()
        hdg_deg = self.tgt_hdg_var.get()
        is_active = self.tgt_active_var.get()
        is_traceable = self.tgt_traceable_var.get()
        is_restart = self.tgt_restart_var.get()
        self._log_to_widget(
            f"DEBUG: is_active={is_active}, is_traceable={is_traceable}, is_restart={is_restart}",
            "DEBUG",
        )
        vel_fps = vel_kn * KNOTS_TO_FPS
        # 2. Create a temporary Target object to feed the command builder
        initial_waypoint = Waypoint(
            maneuver_type=ManeuverType.FLY_TO_POINT,
            target_range_nm=range_nm,
            target_azimuth_deg=az_deg,
            target_altitude_ft=alt_ft,
            target_velocity_fps=vel_fps,
            target_heading_deg=hdg_deg,
        )
        temp_target = Target(
            target_id=target_id,
            trajectory=[initial_waypoint],
        )
        # Set the properties after construction (the original note says
        # "after reset_simulation" - presumably run by the Target
        # constructor; verify).
        temp_target.active = is_active
        temp_target.traceable = is_traceable
        temp_target.restart = is_restart
        temp_target.current_range_nm = range_nm
        temp_target.current_azimuth_deg = az_deg
        temp_target.current_velocity_fps = vel_fps
        temp_target.current_heading_deg = hdg_deg
        temp_target.current_altitude_ft = alt_ft
        # 3. Build the command string: trimmed and newline-terminated.
        # No '$' prefix is added here.
        command_str = command_builder.build_tgtinit(temp_target).strip() + "\n"
        self._log_to_widget(f"Built command: {command_str!r}", "INFO")
        # 4. Send using the transport layer
        success = self.sfp_transport.send_script_command(command_str, destination)
        if success:
            self._log_to_widget(
                f"Successfully sent command for target {target_id}.", "INFO"
            )
        else:
            self._log_to_widget(
                f"Failed to send command for target {target_id}.", "ERROR"
            )
    except (ValueError, tk.TclError) as e:
        error_msg = f"Invalid input value: {e}"
        self._log_to_widget(f"ERROR: {error_msg}", "ERROR")
        messagebox.showerror("Input Error", error_msg, parent=self)
    except Exception as e:
        self.logger.exception("An unexpected error occurred in _on_send_target.")
        self._log_to_widget(f"ERROR: {e}", "CRITICAL")
        messagebox.showerror("Unexpected Error", str(e), parent=self)
# --- (The rest of the file content remains the same) ---
def _create_image_tab(self, title: str) -> Dict:
    """Build one image tab: a fixed-size preview box above a hex view.

    Args:
        title: Name shown in the "Waiting for ..." placeholder label.

    Returns:
        Dict with the created widgets under the keys ``frame``,
        ``image_label``, ``hex_view`` and ``image_container``.
    """
    tab_frame = ttk.Frame(self.notebook)
    side = self.image_area_size
    preview_box = ttk.Frame(tab_frame, width=side, height=side, relief=tk.SUNKEN)
    preview_box.pack(pady=5, padx=5)
    # Keep the box at its configured size instead of shrinking to its content.
    preview_box.pack_propagate(False)

    placeholder = ttk.Label(
        preview_box, text=f"Waiting for {title}...", anchor=tk.CENTER
    )
    placeholder.pack(fill=tk.BOTH, expand=True)

    hex_dump = scrolledtext.ScrolledText(
        tab_frame, height=8, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9)
    )
    hex_dump.pack(fill=tk.BOTH, expand=True, pady=5, padx=5)

    return dict(
        frame=tab_frame,
        image_label=placeholder,
        hex_view=hex_dump,
        image_container=preview_box,
    )
def _open_image_size_dialog(self):
    """Show a modal dialog for changing the image preview area size.

    On "Save" the new size is applied to the MFD/SAR image containers (when
    those tabs exist) and, if ``self.master`` exposes a ``config_manager``,
    stored under ``general["image_display"]["size"]``.
    """
    dlg = tk.Toplevel(self)
    dlg.title("Image Size")
    dlg.transient(self)
    dlg.grab_set()
    ttk.Label(dlg, text="Image area size (px):").pack(padx=10, pady=(10, 2))
    size_var = tk.StringVar(value=str(self.image_area_size))
    entry = ttk.Entry(dlg, textvariable=size_var, width=8)
    entry.pack(padx=10, pady=(0, 10))
    btn_frame = ttk.Frame(dlg)
    btn_frame.pack(padx=10, pady=(0, 10))

    def on_save():
        """Validate the entry, apply and persist the size, then close."""
        try:
            v = int(size_var.get())
            if v <= 0:
                raise ValueError()
        except (ValueError, tk.TclError):
            message = "Please enter a positive integer for image size."
            try:
                # Use the module-level import, like the rest of the file.
                messagebox.showerror("Invalid value", message, parent=dlg)
            except Exception:
                self.logger.exception("Could not display the validation error")
            return
        self.image_area_size = v
        for tab in (getattr(self, "mfd_tab", None), getattr(self, "sar_tab", None)):
            if tab and "image_container" in tab:
                tab["image_container"].config(width=v, height=v)
        gm = getattr(self.master, "config_manager", None)
        if gm:
            try:
                general = gm.get_general_settings() or {}
                image_display = general.get("image_display", {})
                image_display["size"] = v
                general["image_display"] = image_display
                gm.save_general_settings(general)
            except Exception:
                # The size is already applied; a persistence failure must not
                # leave the modal dialog open holding the input grab.
                self.logger.exception("Could not persist the image size setting")
        dlg.destroy()

    def on_cancel():
        """Close the dialog without changing anything."""
        dlg.destroy()

    ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack(
        side=tk.RIGHT, padx=(0, 5)
    )
    ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT)
def _on_connect(self):
ip = self.ip_var.get()
try:
port = int(self.local_port_var.get())
except ValueError:
self._log_to_widget("ERROR: Invalid port number.", "ERROR")
return
self._log_to_widget(f"Attempting to connect to {ip}:{port}...")
ack_config = {ord("M"): 32, ord("S"): 16}
self.sfp_transport = SfpTransport(
host=ip,
port=port,
payload_handlers=self.payload_router.get_handlers(),
ack_config=ack_config,
raw_packet_callback=self.payload_router.update_raw_packet,
)
if self.sfp_transport.start():
self._log_to_widget(
"Connection successful. Listening for packets...", "INFO"
)
self.connect_btn.config(state=tk.DISABLED)
self.disconnect_btn.config(state=tk.NORMAL)
else:
self._log_to_widget("Connection failed. Check IP/Port and logs.", "ERROR")
self.sfp_transport = None
try:
self.history_tree.bind(
"<<TreeviewSelect>>", lambda e: self._on_history_select()
)
except Exception:
pass
def _on_send_probe(self):
ip = self.ip_var.get()
try:
port = int(self.server_port_var.get())
except Exception:
self._log_to_widget("ERROR: Invalid port number for probe.", "ERROR")
return
probe_payload = b"SFP_PROBE\n"
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1.0)
sock.sendto(probe_payload, (ip, port))
sock.close()
self._log_to_widget(f"Sent probe to {ip}:{port}", "INFO")
except Exception as e:
self._log_to_widget(f"Failed to send probe to {ip}:{port}: {e}", "ERROR")
def _on_send_script(self):
if not self.sfp_transport or not self.sfp_transport._socket:
self._log_to_widget("ERROR: Cannot send script, not connected.", "ERROR")
return
try:
ip = self.ip_var.get()
port = int(self.server_port_var.get())
destination = (ip, port)
command_str = self.script_var.get()
command_str = command_str.strip()
# if command_str and not command_str.startswith("$"):
# command_str = "$" + command_str.lstrip()
if command_str and not command_str.endswith("\n"):
command_str = command_str + "\n"
self.sfp_transport.send_script_command(command_str, destination)
except (ValueError, tk.TclError) as e:
self._log_to_widget(
f"ERROR: Invalid input for script sending: {e}", "ERROR"
)
def _on_send_simple_command(self, command_str: str):
"""Send a simple script command string to the configured server port.
Validates transport/socket and logs the result.
"""
if not self.sfp_transport or not getattr(self.sfp_transport, "_socket", None):
self._log_to_widget("ERROR: Cannot send command, not connected.", "ERROR")
messagebox.showerror(
"Connection Error",
"SFP transport is not connected. Please connect first.",
parent=self,
)
return False
try:
ip = self.ip_var.get()
port = int(self.server_port_var.get())
destination = (ip, port)
command_str = (command_str or "").strip()
# Always prefix with '$' (no space after $)
# if not command_str.startswith("$"):
# command_str = "$" + command_str.lstrip()
if command_str and not command_str.endswith("\n"):
command_str = command_str + "\n"
self._log_to_widget(
f"Sending command to {destination}: {command_str!r}", "INFO"
)
success = self.sfp_transport.send_script_command(command_str, destination)
if success:
self._log_to_widget(f"Successfully sent command: {command_str}", "INFO")
else:
self._log_to_widget(f"Failed to send command: {command_str}", "ERROR")
return success
except Exception as e:
self.logger.exception("Unexpected error in _on_send_simple_command")
self._log_to_widget(f"ERROR: {e}", "ERROR")
return False
def _on_send_tgtset(self):
    """Send a ``tgtset`` command built from the current form values.

    Returns:
        The result of ``_on_send_simple_command``, or ``False`` when a form
        value cannot be read.
    """
    try:
        target_id = self.tgt_id_var.get()
        # Read the numeric fields in a fixed order: range, azimuth, altitude,
        # velocity, heading.
        range_nm, az_deg, alt_ft, vel_kn, hdg_deg = (
            variable.get()
            for variable in (
                self.tgt_range_var,
                self.tgt_az_var,
                self.tgt_alt_var,
                self.tgt_vel_var,
                self.tgt_hdg_var,
            )
        )
        vel_fps = vel_kn * KNOTS_TO_FPS
        is_active = self.tgt_active_var.get()
        is_traceable = self.tgt_traceable_var.get()

        numeric_updates = (
            ("range_nm", range_nm),
            ("azimuth_deg", az_deg),
            ("velocity_fps", vel_fps),
            ("heading_deg", hdg_deg),
            ("altitude_ft", alt_ft),
        )
        # Numbers are sent as two-decimal strings; the flags stay booleans.
        updates = {key: f"{value:.2f}" for key, value in numeric_updates}
        updates["active"] = is_active
        updates["traceable"] = is_traceable

        command_str = command_builder.build_tgtset_selective(target_id, updates)
        command_str = command_str.strip()
        if command_str:
            command_str += "\n"
        return self._on_send_simple_command(command_str)
    except (ValueError, tk.TclError) as e:
        self._log_to_widget(f"ERROR: Invalid input for tgtset: {e}", "ERROR")
        return False
def _on_disconnect(self):
if self.sfp_transport:
self._log_to_widget("Disconnecting...", "INFO")
self.sfp_transport.shutdown()
self.sfp_transport = None
self.connect_btn.config(state=tk.NORMAL)
self.disconnect_btn.config(state=tk.DISABLED)
self._log_to_widget("Disconnected.", "INFO")
def _on_close(self):
self.logger.info("SFP Debug Window closing.")
self._on_disconnect()
self.destroy()
def _process_latest_payloads(self):
new_payloads = self.payload_router.get_and_clear_latest_payloads()
if new_payloads:
self._log_to_widget(
f"Processing {len(new_payloads)} new payload(s) for flows: {list(new_payloads.keys())}"
)
for flow_id, payload in new_payloads.items():
if flow_id == "MFD" and _IMAGE_LIBS_AVAILABLE:
self._display_image_data(payload, self.mfd_tab, "mfd_photo")
elif flow_id == "SAR" and _IMAGE_LIBS_AVAILABLE:
self._display_image_data(payload, self.sar_tab, "sar_photo")
elif flow_id == "BIN":
self._display_hex_data(payload, self.bin_tab)
elif flow_id == "JSON":
self._display_json_data(payload, self.json_tab)
elif flow_id == "RIS_STATUS_JSON":
try:
import json
struct = (
json.loads(payload.decode("utf-8"))
if isinstance(payload, (bytes, bytearray))
else payload
)
for iid in self.scenario_tree.get_children():
self.scenario_tree.delete(iid)
scenario = (
struct.get("scenario", {})
if isinstance(struct, dict)
else {}
)
if scenario:
import math
def to_deg(v):
try:
return float(v) * (180.0 / math.pi)
except Exception:
return v
def m_s_to_ft_s(v):
try:
return float(v) * 3.280839895
except Exception:
return v
def m_to_ft(v):
try:
return float(v) * 3.280839895
except Exception:
return v
order = [
("timetag", "timetag", ""),
("platform_azimuth", "platform_azimuth", "°"),
("ant_nav_az", "ant_nav_az", "°"),
("ant_nav_el", "ant_nav_el", "°"),
("flags", "flags", ""),
("mode", "mode", ""),
("vx", "vx", "ft/s"),
("vy", "vy", "ft/s"),
("vz", "vz", "ft/s"),
("baro_altitude", "baro_altitude", "ft"),
("latitude", "latitude", "°"),
("longitude", "longitude", "°"),
("true_heading", "true_heading", "°"),
]
view_mode = (
self.scenario_view_mode.get()
if hasattr(self, "scenario_view_mode")
else "simplified"
)
dec_simp = int(
self.simplified_decimals.get()
if hasattr(self, "simplified_decimals")
else 4
)
def fmt_raw_number(v, key_name=None):
try:
fv = float(v)
if key_name in ("latitude", "longitude"):
return f"{fv:.8f}"
return f"{fv:.6f}"
except Exception:
return str(v)
def fmt_simplified_number(v, unit_str, decimals=4):
try:
fv = float(v)
return (
f"{fv:.{decimals}f} {unit_str}"
if unit_str
else f"{fv:.{decimals}f}"
)
except Exception:
return str(v)
def try_float(v):
try:
return float(v)
except Exception:
return None
def decimal_deg_to_dms(deg, is_lat=True):
try:
d = float(deg)
except Exception:
return str(deg)
if is_lat:
direction = "N" if d >= 0 else "S"
else:
direction = "E" if d >= 0 else "W"
ad = abs(d)
degrees = int(ad)
minutes_full = (ad - degrees) * 60
minutes = int(minutes_full)
seconds = (minutes_full - minutes) * 60
return f"{degrees}°{minutes}'{seconds:.2f} {direction}"
scenario_rows = []
for label, key, unit in order:
if key in scenario:
val = scenario.get(key)
if view_mode == "raw":
if key in (
"platform_azimuth",
"true_heading",
"ant_nav_az",
"ant_nav_el",
):
display_val = fmt_raw_number(val, key)
elif key in ("vx", "vy", "vz", "baro_altitude"):
display_val = fmt_raw_number(val, key)
elif key in ("latitude", "longitude"):
display_val = fmt_raw_number(val, key)
elif key == "flags":
try:
display_val = (
f"{int(val)} (0x{int(val):X})"
)
except Exception:
display_val = str(val)
else:
display_val = str(val)
else:
if key in ("platform_azimuth", "true_heading"):
fv = try_float(val)
if fv is not None:
conv = to_deg(fv)
display_val = fmt_simplified_number(
conv, "°", dec_simp
)
else:
display_val = str(val)
elif key in ("ant_nav_az", "ant_nav_el"):
fv = try_float(val)
if fv is not None:
conv = to_deg(fv)
display_val = fmt_simplified_number(
conv, "°", dec_simp
)
else:
display_val = str(val)
elif key in ("vx", "vy", "vz"):
fv = try_float(val)
if fv is not None:
conv = m_s_to_ft_s(fv)
display_val = fmt_simplified_number(
conv, "ft/s", dec_simp
)
else:
display_val = str(val)
elif key == "baro_altitude":
fv = try_float(val)
if fv is not None:
conv = m_to_ft(fv)
display_val = fmt_simplified_number(
conv, "ft", dec_simp
)
else:
display_val = str(val)
elif key in ("latitude", "longitude"):
fv = try_float(val)
if fv is not None:
is_lat = key == "latitude"
display_val = decimal_deg_to_dms(
fv, is_lat=is_lat
)
else:
display_val = str(val)
elif key == "flags":
try:
display_val = (
f"{int(val)} (0x{int(val):X})"
)
except Exception:
display_val = str(val)
elif key == "mode":
try:
midx = int(val)
if (
0
<= midx
< len(self._master_mode_names)
):
name = self._master_mode_names[midx]
short = name
for suffix in (
"_master_mode",
"_mode",
"_master_mode_",
"_",
):
if short.endswith(suffix):
short = short[
: -len(suffix)
]
short = short.replace(
"master", ""
).strip("_")
display_val = short
else:
display_val = str(midx)
except Exception:
display_val = str(val)
else:
display_val = str(val)
scenario_rows.append((label, display_val))
for l, v in scenario_rows:
self.scenario_tree.insert(
"", tk.END, values=(f"{l}", v)
)
for iid in self.ris_tree.get_children():
self.ris_tree.delete(iid)
targets = (
struct.get("targets", [])
if isinstance(struct, dict)
else []
)
try:
view_mode = (
self.scenario_view_mode.get()
if hasattr(self, "scenario_view_mode")
else "simplified"
)
self.ris_tree.heading("heading", text="heading")
self.ris_tree.heading("x", text="x")
self.ris_tree.heading("y", text="y")
self.ris_tree.heading("z", text="z")
except Exception:
pass
view_mode = (
self.scenario_view_mode.get()
if hasattr(self, "scenario_view_mode")
else "simplified"
)
dec_simp = int(
self.simplified_decimals.get()
if hasattr(self, "simplified_decimals")
else 4
)
for t in targets:
try:
idx = t.get("index")
if idx is None:
idx = t.get("idx")
if idx is None:
idx = t.get("#")
raw_flags = t.get("flags", t.get("flag", 0))
try:
flags_val = int(raw_flags)
flags_display = f"{flags_val} (0x{flags_val:X})"
except Exception:
flags_display = str(raw_flags)
if view_mode == "raw":
hfv = try_float(t.get("heading"))
heading_val = (
f"{hfv:.6f}"
if hfv is not None
else str(t.get("heading"))
)
xfv = try_float(t.get("x"))
yfv = try_float(t.get("y"))
zfv = try_float(t.get("z"))
x_val = (
f"{xfv:.6f}" if xfv is not None else t.get("x")
)
y_val = (
f"{yfv:.6f}" if yfv is not None else t.get("y")
)
z_val = (
f"{zfv:.6f}" if zfv is not None else t.get("z")
)
vals = (
idx,
flags_display,
heading_val,
x_val,
y_val,
z_val,
)
else:
hfv = try_float(t.get("heading"))
if hfv is not None:
heading_deg = hfv * (180.0 / 3.141592653589793)
heading_val = f"{heading_deg:.{dec_simp}f} °"
else:
heading_val = str(t.get("heading"))
xfv = try_float(t.get("x"))
yfv = try_float(t.get("y"))
zfv = try_float(t.get("z"))
x_val = (
f"{xfv:.{dec_simp}f} m"
if xfv is not None
else str(t.get("x"))
)
y_val = (
f"{yfv:.{dec_simp}f} m"
if yfv is not None
else str(t.get("y"))
)
if zfv is not None:
z_val = f"{(zfv * 3.280839895):.{dec_simp}f} ft"
else:
z_val = str(t.get("z"))
vals = (
idx,
flags_display,
heading_val,
x_val,
y_val,
z_val,
)
if (
not isinstance(vals, (list, tuple))
or len(vals) != 6
):
vals = (idx, flags_display, "", "", "", "")
self.ris_tree.insert("", tk.END, values=vals)
except Exception as _e:
try:
self.ris_tree.insert(
"",
tk.END,
values=(
None,
None,
str(t.get("heading")),
str(t.get("x")),
str(t.get("y")),
str(t.get("z")),
),
)
except Exception:
pass
# --- Update PPI with active targets that fit the current PPI scale/sector ---
try:
# Only proceed if we have a PPI widget available
ppi = getattr(self, "ris_ppi_widget", None)
if ppi is not None:
ppi_targets = []
METERS_PER_NM = 1852.0
# Determine current display range from the PPI widget
current_range = (
ppi.range_var.get()
if hasattr(ppi, "range_var")
else ppi.max_range
)
# Debug: show parsed target count and sample raw values
try:
#self._log_to_widget(
# f"PPI: parsed {len(targets)} JSON target(s); current_range={current_range}",
# "DEBUG",
#)
for i, tt in enumerate(targets[:6]):
try:
fx = tt.get("flags", None)
tx = tt.get("x", None)
ty = tt.get("y", None)
tz = tt.get("z", None)
# compute derived quantities
try:
xm = float(tx)
ym = float(ty)
zm = float(tz)
rng_m = (xm * xm + ym * ym) ** 0.5
rng_nm = rng_m / 1852.0
# Use atan2(y, x) for conventional azimuth (x East, y North)
az_deg = math.degrees(
math.atan2(ym, xm)
)
# elevation from z and slant range
elev_rad = (
math.atan2(zm, rng_m)
if rng_m > 0
else 0.0
)
elev_deg = math.degrees(elev_rad)
except Exception:
rng_m = rng_nm = az_deg = elev_deg = (
None
)
#msg = (
# f"PPI RAW[{i}]: flags={fx} x={tx} y={ty} z={tz} | range_m={rng_m:.1f} range_nm={rng_nm:.2f} az={az_deg:.2f}° elev={elev_deg:.2f}°"
# if rng_m is not None
# else f"PPI RAW[{i}]: flags={fx} x={tx} y={ty} z={tz}"
#)
#self._log_to_widget(msg, "DEBUG")
except Exception:
pass
except Exception:
pass
for t in targets:
raw_flags = t.get("flags", 0)
# Only show enabled/active targets (non-zero flags)
if int(raw_flags) == 0:
continue
x = float(t.get("x", 0.0))
y = float(t.get("y", 0.0))
z = float(t.get("z", 0.0))
# Compute range in NM assuming x/y are meters
range_nm = ((x**2 + y**2) ** 0.5) / METERS_PER_NM
if range_nm > current_range:
continue
# Compute azimuth in degrees using atan2(y, x)
az_deg = math.degrees(math.atan2(y, x))
# Heading in JSON is in radians -> convert to degrees
heading = t.get("heading", 0.0)
try:
hdg_deg = float(heading) * (180.0 / math.pi)
except Exception:
hdg_deg = 0.0
tgt = Target(
target_id=int(t.get("index", 0)),
trajectory=[],
active=True,
traceable=True,
)
tgt.current_range_nm = range_nm
tgt.current_azimuth_deg = az_deg
tgt.current_heading_deg = hdg_deg
# convert altitude from meters to feet
try:
tgt.current_altitude_ft = float(z) * 3.280839895
except Exception:
tgt.current_altitude_ft = 0.0
tgt.active = True
ppi_targets.append(tgt)
# Push to PPI (log debug info)
try:
self._log_to_widget(
f"PPI: prepared {len(ppi_targets)} target(s) for display",
"DEBUG",
)
if ppi_targets:
for pt in ppi_targets[:5]:
self._log_to_widget(
f"PPI target sample: id={getattr(pt, 'target_id', None)} r_nm={getattr(pt,'current_range_nm',None):.2f} az={getattr(pt,'current_azimuth_deg',None):.2f} hdg={getattr(pt,'current_heading_deg',None):.2f}",
"DEBUG",
)
self.update_ppi_targets(ppi_targets)
except Exception:
self.logger.exception(
"Failed to push targets to PPI"
)
except Exception:
self.logger.exception(
"Error while preparing RIS targets for PPI"
)
except Exception:
pass
self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
raw_pkt = self.payload_router.get_and_clear_raw_packet()
if raw_pkt:
raw_bytes, addr = raw_pkt
self._display_raw_packet(raw_bytes, addr)
try:
self._refresh_history_tree()
except Exception:
pass
def _refresh_history_tree(self):
    """Repopulate the history tree from the payload router, newest entry first.

    Every existing row is removed and one row per history entry is inserted
    with the columns (time, flow name, TID, raw size). This is best-effort:
    any exception aborts the refresh silently, leaving whatever rows were
    already inserted.
    """
    try:
        history = self.payload_router.get_history()
        tree = self.history_tree
        for item_id in tree.get_children():
            tree.delete(item_id)
        for record in reversed(history):
            # Trim microseconds down to milliseconds for display.
            stamp = record["ts"].strftime("%H:%M:%S.%f")[:-3]
            row = (
                stamp,
                record.get("flow_name", ""),
                record.get("tid", ""),
                f"{len(record.get('raw', b''))}B",
            )
            tree.insert("", tk.END, values=row)
    except Exception:
        pass
def _on_history_select(self):
    """Show the raw packet that corresponds to the selected history row.

    The row's position in the tree is used as an index into the router's
    history in newest-first order, which is the order used when the tree is
    filled. Nothing happens when no row is selected or the position does not
    map to a history entry. Any exception is swallowed.
    """
    try:
        tree = self.history_tree
        picked = tree.selection()
        if not picked:
            return
        row_ids = list(tree.get_children())
        position = row_ids.index(picked[0]) if picked[0] in row_ids else None
        newest_first = list(reversed(self.payload_router.get_history()))
        if position is not None and 0 <= position < len(newest_first):
            chosen = newest_first[position]
            self._display_raw_packet(chosen["raw"], chosen["addr"])
    except Exception:
        pass
def _on_clear_history(self):
    """Clear the router's packet history, then refresh the history tree.

    The tree is only refreshed when clearing succeeded; errors from either
    step are ignored.
    """
    try:
        self.payload_router.clear_history()
    except Exception:
        return
    try:
        self._refresh_history_tree()
    except Exception:
        pass
def _open_history_settings_dialog(self):
    """Open a modal dialog for the history size and raw-packet persistence.

    On "Save" the values are validated, applied to the payload router and,
    when the master window exposes a ``config_manager``, stored under the
    ``sfp_debug`` key of the general settings. Failures while applying or
    storing the values are logged; the dialog is closed in either case.
    "Cancel" closes the dialog without changes.
    """
    dlg = tk.Toplevel(self)
    dlg.title("History Settings")
    dlg.transient(self)
    dlg.grab_set()

    # The current values are read from the router's private attributes;
    # fall back to defaults if the router does not have them.
    try:
        hist_size = self.payload_router._history_size
        persist = self.payload_router._persist
    except AttributeError:
        hist_size = 20
        persist = False

    ttk.Label(dlg, text="History size (entries):").pack(padx=10, pady=(10, 2))
    size_var = tk.StringVar(value=str(hist_size))
    entry = ttk.Entry(dlg, textvariable=size_var, width=8)
    entry.pack(padx=10, pady=(0, 10))

    persist_var = tk.BooleanVar(value=bool(persist))
    ttk.Checkbutton(
        dlg, text="Persist raw packets to Temp/", variable=persist_var
    ).pack(padx=10, pady=(0, 10))

    btn_frame = ttk.Frame(dlg)
    btn_frame.pack(padx=10, pady=(0, 10), fill=tk.X)

    def on_save():
        """Validate the inputs, apply them, persist them, close the dialog."""
        try:
            v = int(size_var.get())
            if v <= 0:
                raise ValueError("history size must be positive")
        except (ValueError, tk.TclError):
            try:
                messagebox.showerror(
                    "Invalid value",
                    "Please enter a positive integer for history size.",
                    parent=dlg,
                )
            except tk.TclError:
                self.logger.exception("Could not show the validation error box")
            return

        try:
            persist_enabled = bool(persist_var.get())
        except tk.TclError:
            self.logger.exception("Could not read the persist checkbox state")
            persist_enabled = bool(persist)

        try:
            self.payload_router.set_history_size(v)
            self.payload_router.set_persist(persist_enabled)
        except Exception:
            # Keep going: the dialog must still close and the values are
            # still worth persisting, but do not hide the failure.
            self.logger.exception("Failed to apply history settings to the router")

        try:
            gm = getattr(self.master, "config_manager", None)
            if gm:
                general = gm.get_general_settings() or {}
                # "or {}" also covers a key that is present but set to None.
                sfp_debug = general.get("sfp_debug") or {}
                sfp_debug["history_size"] = v
                sfp_debug["persist_raw"] = persist_enabled
                general["sfp_debug"] = sfp_debug
                gm.save_general_settings(general)
        except Exception:
            self.logger.exception("Failed to persist SFP debug history settings")

        dlg.destroy()

    def on_cancel():
        """Close the dialog without applying anything."""
        dlg.destroy()

    ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack(
        side=tk.RIGHT, padx=(0, 5)
    )
    ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT)
def _display_raw_packet(self, raw_bytes: bytes, addr: tuple):
    """Decode the SFP header of a raw packet and show it in the raw tab.

    The header fields are rendered in two columns, followed by a per-bit
    breakdown of ``SFP_FLAGS`` (each flag is shown with its 0/1 state) and a
    legend. The bytes after the header are shown as a hex dump in the BIN
    tab. If decoding fails, the raw tab shows the error message and a hex
    dump of the whole packet instead.

    Args:
        raw_bytes: The complete packet, header included.
        addr: Sender address; only interpolated into the "From ..." line.
    """
    try:
        header_size = SFPHeader.size()
        if len(raw_bytes) < header_size:
            raise ValueError("Packet smaller than SFP header")
        header = SFPHeader.from_buffer_copy(raw_bytes)
        body = raw_bytes[header_size:]

        field_list = (
            "SFP_MARKER",
            "SFP_DIRECTION",
            "SFP_PROT_VER",
            "SFP_PT_SPARE",
            "SFP_TAG",
            "SFP_SRC",
            "SFP_FLOW",
            "SFP_TID",
            "SFP_FLAGS",
            "SFP_WIN",
            "SFP_ERR",
            "SFP_ERR_INFO",
            "SFP_TOTFRGAS",
            "SFP_FRAG",
            "SFP_RECTYPE",
            "SFP_RECSPARE",
            "SFP_PLDAP",
            "SFP_PLEXT",
            "SFP_RECCOUNTER",
            "SFP_PLSIZE",
            "SFP_TOTSIZE",
            "SFP_PLOFFSET",
        )

        def describe(value):
            """Render ints as 'dec (0xHEX)', anything else via str()."""
            if isinstance(value, int):
                return f"{value} (0x{value:X})"
            return str(value)

        # Fields missing from the header structure are shown as "<N/A>".
        pairs = [
            (name, describe(getattr(header, name, "<N/A>"))) for name in field_list
        ]
        flag_val = getattr(header, "SFP_FLAGS", "<N/A>")

        out_lines = [f"From {addr}\n\nSFP Header:\n\n"]
        col_width = 36
        # Two header fields per output line.
        for i in range(0, len(pairs), 2):
            left = pairs[i]
            right = pairs[i + 1] if (i + 1) < len(pairs) else None
            left_text = f"{left[0]:12s}: {left[1]}"
            if right:
                right_text = f"{right[0]:12s}: {right[1]}"
                line = f"{left_text:<{col_width}} {right_text}"
            else:
                line = left_text
            out_lines.append(line + "\n")

        # (bit position, flag name, legend text)
        flag_defs = (
            (0, "ACQ_REQ", "Acquisition required/requested"),
            (1, "RESENT", "Fragment resent / please resend"),
            (2, "TRAILER_ACK", "Request trailer acknowledgement"),
            (3, "RESV3", "Reserved"),
            (4, "RESV4", "Reserved"),
            (5, "RESV5", "Reserved"),
            (6, "RESV6", "Reserved"),
            (7, "ERROR", "Packet-level error flag"),
        )
        if isinstance(flag_val, int):
            out_lines.append(f"SFP_FLAGS : {flag_val} (0x{flag_val:X}) ")
            for bit, name, _ in flag_defs:
                # Show the state of every bit so set and clear flags can
                # be told apart in plain text.
                out_lines.append(f" [{name}={(flag_val >> bit) & 1}]")
        else:
            # Without an integer value there are no bits to break down.
            out_lines.append(f"SFP_FLAGS : {flag_val}")

        out_lines.append("\n\nFlags legend:\n")
        for _, name, desc in flag_defs:
            out_lines.append(f" {name:12s}: {desc}\n")

        self.raw_tab_text.config(state=tk.NORMAL)
        self.raw_tab_text.delete("1.0", tk.END)
        self.raw_tab_text.insert(tk.END, "".join(out_lines) + "\n", "hdr_field")
        self.raw_tab_text.config(state=tk.DISABLED)

        # The body goes to the BIN tab. A failure there must not replace the
        # header that was just rendered with the generic failure text.
        try:
            self._display_hex_data(body, self.bin_tab)
        except Exception:
            self.logger.exception("Failed to show packet body in the BIN tab")
    except Exception as e:
        text = f"Failed to format raw packet: {e}\n\nRaw dump:\n"
        text += self._format_hex_dump(raw_bytes)
        self.raw_tab_text.config(state=tk.NORMAL)
        self.raw_tab_text.delete("1.0", tk.END)
        self.raw_tab_text.insert("1.0", text)
        self.raw_tab_text.config(state=tk.DISABLED)
def _display_image_data(
    self, payload: bytearray, tab_widgets: Dict[str, Any], photo_attr: str
):
    """Decode an image payload and show it in the given tab.

    The payload is expected to start with an ``ImageLeaderData`` structure
    followed by the pixel rows. The pixels are normalised to 8 bit, converted
    to RGB, scaled down to fit the tab and shown in
    ``tab_widgets["image_label"]``. On any error the label shows the error
    text instead. In both cases a hex dump of the payload is written to the
    BIN tab, falling back to ``tab_widgets["hex_view"]``.

    Args:
        payload: Writable buffer with the image leader and pixel data
            (``from_buffer`` needs a writable object such as ``bytearray``).
        tab_widgets: Widgets of the target tab; ``"image_label"`` is
            required, ``"image_container"`` and ``"hex_view"`` are optional.
        photo_attr: Attribute name on ``self`` that keeps a reference to the
            ``PhotoImage`` so it is not garbage collected while displayed.
    """
    try:
        if len(payload) < ctypes.sizeof(ImageLeaderData):
            raise ValueError("Payload smaller than ImageLeaderData header.")
        leader = ImageLeaderData.from_buffer(payload)
        h, w, bpp = (
            leader.HEADER_DATA.DY,
            leader.HEADER_DATA.DX,
            leader.HEADER_DATA.BPP,
        )
        # The stride is used as the row length in pixels: it is compared
        # with the width and becomes the column count of the array below.
        stride = leader.HEADER_DATA.STRIDE
        offset = ctypes.sizeof(ImageLeaderData)
        if not (h > 0 and w > 0 and bpp in (1, 2) and stride >= w):
            raise ValueError(
                f"Invalid image dimensions: {w}x{h}, bpp={bpp}, stride={stride}"
            )
        dtype = np.uint8 if bpp == 1 else np.uint16
        expected_size = stride * h * bpp
        if (offset + expected_size) > len(payload):
            # NOTE(review): alternative layout, presumably a payload that
            # still carries the SFP header and has no pixel tag - confirm.
            offset_fallback = (
                ctypes.sizeof(SFPHeader)
                + ctypes.sizeof(ImageLeaderData)
                - ctypes.sizeof(leader.PIXEL_TAG)
            )
            if (offset_fallback + expected_size) <= len(payload):
                offset = offset_fallback
            else:
                raise ValueError(
                    f"Incomplete image data. Expected {expected_size}, got {len(payload) - offset}"
                )
        # View over the payload buffer; the slice drops the row padding.
        pixel_data_view = np.ndarray(
            shape=(h, stride), dtype=dtype, buffer=payload, offset=offset
        )
        image_data = pixel_data_view[:, :w]
        display_img_8bit = cv2.normalize(
            image_data, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U
        )
        img_pil = Image.fromarray(
            cv2.cvtColor(display_img_8bit, cv2.COLOR_GRAY2RGB)
        )
        resized = self._resize_pil_to_label(
            img_pil, tab_widgets.get("image_container", tab_widgets["image_label"])
        )
        photo = ImageTk.PhotoImage(image=resized)
        tab_widgets["image_label"].config(image=photo, text="")
        setattr(self, photo_attr, photo)
    except Exception as e:
        self.logger.error("Error parsing image payload: %s", e)
        # Tkinter skips options whose value is None, so image=None would
        # leave the previous image configured; an empty string clears it
        # and lets the error text show.
        tab_widgets["image_label"].config(
            image="", text=f"Error parsing image:\n{e}"
        )
        setattr(self, photo_attr, None)
    try:
        self._display_hex_data(payload, self.bin_tab)
    except Exception:
        try:
            self._display_hex_data(payload, tab_widgets["hex_view"])
        except Exception:
            pass
def _resize_pil_to_label(
self, img: "Image.Image", label_widget: ttk.Label
) -> "Image.Image":
try:
width, height = label_widget.winfo_width(), label_widget.winfo_height()
if width <= 1 or height <= 1:
return img
img_w, img_h = img.size
scale = min(width / img_w, height / img_h)
if scale >= 1.0:
return img
new_w, new_h = max(1, int(img_w * scale)), max(1, int(img_h * scale))
return img.resize((new_w, new_h), Image.LANCZOS)
except Exception:
return img
def _display_hex_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
hex_dump = self._format_hex_dump(payload)
widget.config(state=tk.NORMAL)
widget.delete("1.0", tk.END)
widget.insert("1.0", hex_dump)
widget.config(state=tk.DISABLED)
def _display_json_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
try:
import json
text = json.dumps(json.loads(payload.decode("utf-8")), indent=2)
except Exception as e:
text = f"--- FAILED TO PARSE JSON ---\n{e}\n\n--- RAW HEX DUMP ---"
text += self._format_hex_dump(payload)
widget.config(state=tk.NORMAL)
widget.delete("1.0", tk.END)
widget.insert("1.0", text)
widget.config(state=tk.DISABLED)
def _log_to_widget(self, message: str, level: str = "DEBUG"):
self.logger.info(message)
self.log_tab.config(state=tk.NORMAL)
self.log_tab.insert(tk.END, f"[{level}] {message}\n")
self.log_tab.config(state=tk.DISABLED)
self.log_tab.see(tk.END)
def _on_save_ris_csv(self):
    """Export the scenario and RIS target tables to a timestamped CSV file.

    The file is written to the ``Temp`` directory two levels above this
    module's directory (created if missing) as ``ris_targets_<UTC time>.csv``.
    Scenario rows, if any, come first under a "Scenario Field"/"Value"
    header and are followed by an empty row; the target rows follow under
    their own header. The outcome is reported in the log tab; nothing is
    written when both tables are empty.
    """
    try:
        import csv

        scenario_rows = [
            self.scenario_tree.item(iid, "values")
            for iid in self.scenario_tree.get_children()
        ]
        rows = [
            self.ris_tree.item(iid, "values")
            for iid in self.ris_tree.get_children()
        ]
        if not scenario_rows and not rows:
            self._log_to_widget("No RIS data to save.", "INFO")
            return

        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        temp_dir = os.path.join(project_root, "Temp")
        os.makedirs(temp_dir, exist_ok=True)

        # Aware UTC "now"; datetime.utcnow() is deprecated and the formatted
        # timestamp is the same.
        ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%dT%H%M%S")
        fname = f"ris_targets_{ts}.csv"
        path = os.path.join(temp_dir, fname)

        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if scenario_rows:
                writer.writerow(["Scenario Field", "Value"])
                writer.writerows(scenario_rows)
                # Blank row between the two tables.
                writer.writerow([])
            writer.writerow(["index", "flags", "heading", "x", "y", "z"])
            writer.writerows(rows)
        self._log_to_widget(f"Saved RIS targets CSV to {path}", "INFO")
    except Exception as e:
        self._log_to_widget(f"Failed to save RIS CSV: {e}", "ERROR")
def _format_hex_dump(self, data: bytes, length=16) -> str:
lines = []
for i in range(0, len(data), length):
chunk = data[i : i + length]
hex_part = " ".join(f"{b:02X}" for b in chunk)
ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
lines.append(f"{i:08X} {hex_part:<{length*3}} |{ascii_part}|")
return "\n".join(lines)