S1005403_RisCC/target_simulator/gui/sfp_debug_window.py

877 lines
46 KiB
Python

# target_simulator/gui/sfp_debug_window.py
"""
Provides a Toplevel window for debugging the SFP transport layer.
This version uses a thread-safe queue for GUI updates to prevent deadlocks.
"""
import ctypes
import datetime
import logging
import math
import os
import socket
import tkinter as tk
from queue import Queue, Empty, Full
from tkinter import ttk, scrolledtext, messagebox
from typing import Dict, Optional, Any, List

try:
    from PIL import Image, ImageTk
    import numpy as np
    import cv2

    _IMAGE_LIBS_AVAILABLE = True
except ImportError:
    _IMAGE_LIBS_AVAILABLE = False

from target_simulator.core.sfp_transport import SfpTransport
from target_simulator.core.sfp_structures import ImageLeaderData, SFPHeader
from target_simulator.gui.payload_router import DebugPayloadRouter
from target_simulator.core.models import Target, Waypoint, ManeuverType, KNOTS_TO_FPS
from target_simulator.core import command_builder
from target_simulator.gui.ppi_display import PPIDisplay
# Initial values for the "Simple Target Sender" form variables created in
# SfpDebugWindow.__init__. Units follow the labels shown next to each field.
DEF_TEST_ID = 1  # target ID
DEF_TEST_RANGE = 30.0  # range, labelled "Range (NM)"
DEF_TEST_AZIMUTH = 10.0  # azimuth, labelled in degrees
DEF_TEST_VELOCITY = 300.0  # velocity, labelled "Velocity (kn)"
DEF_TEST_HEADING = 0.0  # heading, labelled in degrees
DEF_TEST_ALTITUDE = 10000.0  # altitude, labelled "Altitude (ft)"
class SfpDebugWindow(tk.Toplevel):
"""Top-level window for SFP debugging and payload inspection."""
GUI_POLL_INTERVAL_MS = 100
def __init__(self, master=None):
    """Create the window, hook it to the shared communicator and start polling.

    Args:
        master: Parent widget. If it exposes a ``target_communicator``
            attribute, that object is used as the shared communicator and
            its ``router`` (attribute or zero-argument callable) becomes the
            payload router.
    """
    super().__init__(master)
    self.master = master
    self.geometry("1100x700")
    self.protocol("WM_DELETE_WINDOW", self._on_close)
    self.logger = logging.getLogger(__name__)

    # Target updates are handed over through this queue and consumed by
    # _process_gui_updates.
    self.debug_update_queue = Queue()

    self.shared_communicator = getattr(self.master, 'target_communicator', None)
    self.payload_router: Optional[DebugPayloadRouter] = None
    communicator = self.shared_communicator
    if communicator:
        router_attr = getattr(communicator, 'router', None)
        # "router" may be exposed either as an accessor or as a plain attribute.
        self.payload_router = router_attr() if callable(router_attr) else router_attr
        communicator.add_connection_state_callback(self._update_toggle_state)
    if self.payload_router:
        self.payload_router.add_ris_target_listener(self._queue_ris_target_update)

    self.image_area_size = 150
    self._ppi_visible = False

    # Connection / script form state.
    self.ip_var = tk.StringVar(value="127.0.0.1")
    self.local_port_var = tk.StringVar(value="60002")
    self.server_port_var = tk.StringVar(value="60001")
    self.script_var = tk.StringVar(value="print('hello from client')")

    # "Simple Target Sender" form state.
    self.tgt_id_var = tk.IntVar(value=DEF_TEST_ID)
    self.tgt_range_var = tk.DoubleVar(value=DEF_TEST_RANGE)
    self.tgt_az_var = tk.DoubleVar(value=DEF_TEST_AZIMUTH)
    self.tgt_alt_var = tk.DoubleVar(value=DEF_TEST_ALTITUDE)
    self.tgt_vel_var = tk.DoubleVar(value=DEF_TEST_VELOCITY)
    self.tgt_hdg_var = tk.DoubleVar(value=DEF_TEST_HEADING)
    self.tgt_active_var = tk.BooleanVar(value=True)
    self.tgt_traceable_var = tk.BooleanVar(value=True)
    self.tgt_restart_var = tk.BooleanVar(value=False)

    # Indexed by the numeric "mode" field of the RIS scenario status.
    self._master_mode_names = [
        "idle_master_mode",
        "int_bit_master_mode",
        "gm_master_mode",
        "dbs_master_mode",
        "rws_master_mode",
        "vs_master_mode",
        "acm_master_mode",
        "tws_master_mode",
        "sea_low_master_mode",
        "sea_high_master_mode",
        "gmti_master_mode",
        "bcn_master_mode",
        "sam_master_mode",
        "ta_master_mode",
        "wa_master_mode",
        "stt_master_mode",
        "dtt_master_mode",
        "sstt_master_mode",
        "acq_master_mode",
        "ftt_master_mode",
        "agr_master_mode",
        "sar_master_mode",
        "invalid_master_mode_",
        "xtst_dummy_mode",
        "xtst_hw_validation_mode",
        "boot_master_mode",
        "master_mode_id_cardinality_",
    ]

    self._create_widgets()
    if communicator:
        # Make the Connect/Disconnect button reflect the current link state.
        self._update_toggle_state(communicator.is_open)
    self.after(self.GUI_POLL_INTERVAL_MS, self._process_gui_updates)
def _queue_ris_target_update(self, targets: List[Target]):
try:
self.debug_update_queue.put_nowait(targets)
except Queue.Full:
self.logger.warning("SFP Debug window update queue is full. Skipping an update.")
def _on_close(self):
self.logger.info("SFP Debug Window closing.")
if self.shared_communicator:
self.shared_communicator.remove_connection_state_callback(self._update_toggle_state)
if self.payload_router:
self.payload_router.remove_ris_target_listener(self._queue_ris_target_update)
self.destroy()
def _process_gui_updates(self):
"""
This method runs on the GUI thread and processes all queued updates.
"""
# 1. Process target updates for the PPI
try:
while not self.debug_update_queue.empty():
real_targets = self.debug_update_queue.get_nowait()
self.update_ppi_targets(real_targets)
except Empty:
pass
# 2. Process other buffered payloads (for text tabs)
if self.payload_router:
try:
new_payloads = self.payload_router.get_and_clear_latest_payloads()
if new_payloads:
for flow_id, payload in new_payloads.items():
if flow_id == "MFD" and _IMAGE_LIBS_AVAILABLE:
self._display_image_data(payload, self.mfd_tab, "mfd_photo")
elif flow_id == "SAR" and _IMAGE_LIBS_AVAILABLE:
self._display_image_data(payload, self.sar_tab, "sar_photo")
elif flow_id == "BIN":
self._display_hex_data(payload, self.bin_tab)
elif flow_id == "JSON":
self._display_json_data(payload, self.json_tab)
elif flow_id == "RIS_STATUS_JSON":
try:
import json
struct = json.loads(payload.decode("utf-8")) if isinstance(payload, (bytes, bytearray)) else payload
# --- START OF NEW LOGIC ---
view_mode = self.scenario_view_mode.get()
decimals = self.simplified_decimals.get()
# Helper functions for conversion
def to_deg(rad): return rad * 180.0 / math.pi
def m_s_to_ft_s(ms): return ms * 3.28084
def m_to_ft(m): return m * 3.28084
def decimal_deg_to_dms(deg, is_lat):
d = abs(deg)
degrees = int(d)
minutes_full = (d - degrees) * 60
minutes = int(minutes_full)
seconds = (minutes_full - minutes) * 60
direction = ""
if is_lat:
direction = 'N' if deg >= 0 else 'S'
else:
direction = 'E' if deg >= 0 else 'W'
return f"{degrees}° {minutes}' {seconds:.2f}\" {direction}"
# 1. Update scenario tree
self.scenario_tree.delete(*self.scenario_tree.get_children())
scenario = struct.get("scenario", {})
if scenario:
for field, value in scenario.items():
display_value = value
if view_mode == 'simplified':
if field in ['platform_azimuth', 'true_heading', 'ant_nav_az', 'ant_nav_el']:
display_value = f"{to_deg(value):.{decimals}f}°"
elif field in ['vx', 'vy', 'vz']:
display_value = f"{m_s_to_ft_s(value):.{decimals}f} ft/s"
elif field == 'baro_altitude':
display_value = f"{m_to_ft(value):.{decimals}f} ft"
elif field in ['latitude', 'longitude']:
display_value = decimal_deg_to_dms(value, is_lat=(field == 'latitude'))
elif field == 'mode' and value < len(self._master_mode_names):
display_value = f"{value} ({self._master_mode_names[value].replace('_master_mode', '')})"
elif isinstance(value, list):
display_value = str(value) # Keep arrays as string
self.scenario_tree.insert("", tk.END, values=(field, display_value))
# 2. Update ris_tree (all targets)
self.ris_tree.delete(*self.ris_tree.get_children())
targets = struct.get("targets", [])
for i, t in enumerate(targets):
flags_val = t.get("flags", 0)
flags_display = f"0x{flags_val:X}"
if view_mode == 'simplified':
heading = f"{to_deg(t.get('heading', 0.0)):.{decimals}f}°"
x_pos = f"{m_to_ft(t.get('x', 0.0)):.{decimals}f} ft"
y_pos = f"{m_to_ft(t.get('y', 0.0)):.{decimals}f} ft"
z_pos = f"{m_to_ft(t.get('z', 0.0)):.{decimals}f} ft"
else: # Raw mode
heading = f"{t.get('heading', 0.0):.6f}"
x_pos = f"{t.get('x', 0.0):.3f}"
y_pos = f"{t.get('y', 0.0):.3f}"
z_pos = f"{t.get('z', 0.0):.3f}"
vals = (i, flags_display, heading, x_pos, y_pos, z_pos)
self.ris_tree.insert("", tk.END, values=vals)
# --- END OF NEW LOGIC ---
except Exception:
self.logger.exception("Failed to update RIS tables from JSON payload.")
except Exception:
self.logger.exception("Error while fetching latest payloads from router")
# (raw packet display logic remains unchanged)
if self.payload_router:
try:
raw_pkt = self.payload_router.get_and_clear_raw_packet()
if raw_pkt:
raw_bytes, addr = raw_pkt
self._display_raw_packet(raw_bytes, addr)
self._refresh_history_tree()
except Exception:
self.logger.exception("Error while fetching raw packet from router")
self.after(self.GUI_POLL_INTERVAL_MS, self._process_gui_updates)
def _create_widgets(self):
    """Lay out the control strip at the top and the notebook below it."""
    controls = ttk.Frame(self)
    controls.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5)

    def place(section, pady, populate):
        # Every control section is stacked top-down and stretched horizontally.
        section.pack(side=tk.TOP, fill=tk.X, pady=pady)
        populate(section)

    place(ttk.Frame(controls), (0, 5), self._create_connection_widgets)
    place(
        ttk.LabelFrame(controls, text="Simple Target Sender"),
        (5, 5),
        self._create_target_sender_widgets,
    )
    place(
        ttk.LabelFrame(controls, text="Script to send"),
        (0, 5),
        self._create_script_sender_widgets,
    )

    self.notebook = ttk.Notebook(self)
    self.notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    self._create_notebook_tabs()
def _toggle_ppi(self):
try:
if self._ppi_visible:
self.ris_ppi_container.pack_forget()
self.ris_table_container.pack(fill=tk.BOTH, expand=True)
self._ppi_visible = False
self.ppi_toggle_btn.config(text="Show PPI Map")
else:
self.ris_table_container.pack_forget()
self.ris_ppi_container.pack(fill=tk.BOTH, expand=True)
self._ppi_visible = True
self.ppi_toggle_btn.config(text="Hide PPI Map")
except Exception:
self.logger.exception("Toggle PPI failed")
def update_ppi_targets(self, targets: List[Target]):
    """Push ``targets`` to the RIS PPI widget under the ``"real"`` key.

    Failures are logged and never propagated to the caller.
    """
    try:
        ppi = self.ris_ppi_widget
        if ppi:
            ppi.update_targets({"real": targets})
    except Exception:
        self.logger.exception("Failed to update RIS PPI targets")
def _create_connection_widgets(self, parent):
    """Build the IP/port entries plus the Connect and Send Probe buttons.

    Args:
        parent: Container the widgets are packed into, left to right.
    """
    # (caption, bound variable, entry width, label padding)
    entry_specs = (
        ("IP:", self.ip_var, 18, (4, 2)),
        ("Local Port:", self.local_port_var, 8, (0, 2)),
        ("Server Port:", self.server_port_var, 8, (0, 2)),
    )
    for caption, variable, width, label_pad in entry_specs:
        ttk.Label(parent, text=caption).pack(side=tk.LEFT, padx=label_pad)
        ttk.Entry(parent, textvariable=variable, width=width).pack(side=tk.LEFT, padx=(0, 6))

    self.connect_toggle_btn = ttk.Button(parent, text="Connect", command=self._on_toggle_connect)
    self.connect_toggle_btn.pack(side=tk.LEFT, padx=(0, 6))

    self.send_probe_btn = ttk.Button(parent, text="Send Probe", command=self._on_send_probe)
    self.send_probe_btn.pack(side=tk.LEFT, padx=(6, 4))
def _create_target_sender_widgets(self, parent):
    """Build the target parameter form, its flag checkboxes and quick commands.

    Args:
        parent: Container that receives the form grid and the quick-command row.
    """
    grid = ttk.Frame(parent, padding=5)
    grid.pack(fill=tk.X)
    for padded_column in (1, 3, 5):
        grid.columnconfigure(padded_column, pad=15)

    # (caption, lower bound, upper bound, bound variable, spinbox width);
    # the tuple position is the grid column.
    spin_specs = (
        ("ID:", 0, 15, self.tgt_id_var, 8),
        ("Range (NM):", 0, 500, self.tgt_range_var, 10),
        ("Azimuth (°):", -180, 180, self.tgt_az_var, 10),
        ("Velocity (kn):", 0, 2000, self.tgt_vel_var, 10),
        ("Heading (°):", 0, 360, self.tgt_hdg_var, 10),
        ("Altitude (ft):", 0, 80000, self.tgt_alt_var, 12),
    )
    for column, (caption, low, high, variable, width) in enumerate(spin_specs):
        ttk.Label(grid, text=caption).grid(row=0, column=column, sticky=tk.W)
        ttk.Spinbox(grid, from_=low, to=high, textvariable=variable, width=width).grid(
            row=1, column=column, sticky=tk.W
        )

    controls_frame = ttk.Frame(grid)
    controls_frame.grid(row=1, column=6, sticky="nsew", padx=(20, 0))
    ttk.Checkbutton(controls_frame, text="Active", variable=self.tgt_active_var).pack(
        side=tk.LEFT, anchor="w"
    )
    ttk.Checkbutton(controls_frame, text="Traceable", variable=self.tgt_traceable_var).pack(
        side=tk.LEFT, anchor="w", padx=5
    )
    ttk.Checkbutton(controls_frame, text="Restart", variable=self.tgt_restart_var).pack(
        side=tk.LEFT, anchor="w", padx=5
    )
    send_button = ttk.Button(controls_frame, text="Send Target", command=self._on_send_target)
    send_button.pack(side=tk.LEFT, padx=(10, 0))
    ttk.Label(
        controls_frame,
        text="(debug: sends /s /t /r if set)",
        foreground="#555555",
        font=("Segoe UI", 8),
    ).pack(side=tk.LEFT, padx=(6, 0))

    quick_cmd_frame = ttk.Frame(parent)
    quick_cmd_frame.pack(fill=tk.X, pady=(6, 0))
    # Commands are built when the button is clicked, not when it is created.
    quick_buttons = (
        ("tgtreset", lambda: self._on_send_simple_command(command_builder.build_tgtreset()), 4),
        ("pause", lambda: self._on_send_simple_command(command_builder.build_pause()), 4),
        ("continue", lambda: self._on_send_simple_command(command_builder.build_continue()), 4),
        ("tgtset (cur)", lambda: self._on_send_tgtset(), 8),
    )
    for caption, handler, pad in quick_buttons:
        ttk.Button(quick_cmd_frame, text=caption, command=handler).pack(side=tk.LEFT, padx=pad)

    self.ppi_toggle_btn = ttk.Button(quick_cmd_frame, text="Show PPI Map", command=self._toggle_ppi)
    self.ppi_toggle_btn.pack(side=tk.RIGHT, padx=4)
def _create_script_sender_widgets(self, parent):
    """Build the script entry line and its send button inside ``parent``."""
    caption = ttk.Label(parent, text="Script to send:")
    caption.pack(side=tk.LEFT, padx=(5, 2))

    script_entry = ttk.Entry(parent, textvariable=self.script_var, width=60)
    script_entry.pack(side=tk.LEFT, expand=True, fill=tk.X, padx=(0, 5))

    self.send_script_btn = ttk.Button(parent, text="Send script", command=self._on_send_script)
    self.send_script_btn.pack(side=tk.LEFT, padx=5)
def _create_notebook_tabs(self):
    """Create every notebook tab: raw log, images, RIS status, SFP raw, hex and JSON.

    The image tabs are only created when the optional imaging libraries were
    imported successfully.
    """
    # --- Raw log tab ---
    self.log_tab = scrolledtext.ScrolledText(self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9))
    self.notebook.add(self.log_tab, text="Raw Log")
    # --- Image tabs (optional) ---
    if _IMAGE_LIBS_AVAILABLE:
        self.mfd_tab = self._create_image_tab("MFD Image")
        self.notebook.add(self.mfd_tab["frame"], text="MFD Image")
        self.sar_tab = self._create_image_tab("SAR Image")
        self.notebook.add(self.sar_tab["frame"], text="SAR Image")
    # --- RIS status tab: scenario fields on the left, targets on the right ---
    ris_frame = ttk.Frame(self.notebook)
    paned = ttk.Panedwindow(ris_frame, orient=tk.HORIZONTAL)
    paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    left = ttk.Frame(paned)
    self.scenario_tree = ttk.Treeview(left, columns=("field", "value"), show="headings", height=12)
    self.scenario_tree.heading("field", text="Field")
    self.scenario_tree.heading("value", text="Value")
    self.scenario_tree.column("field", width=140, anchor="w")
    self.scenario_tree.column("value", width=160, anchor="w")
    self.scenario_tree.pack(fill=tk.BOTH, expand=True)
    paned.add(left, weight=1)
    right = ttk.Frame(paned)
    # The right pane hosts two alternative containers; _toggle_ppi swaps them.
    self.ris_table_container = ttk.Frame(right)
    self.ris_ppi_container = ttk.Frame(right)
    cols = ("idx", "flags", "heading", "x", "y", "z")
    self.ris_tree = ttk.Treeview(self.ris_table_container, columns=cols, show="headings", height=12)
    for c, txt in zip(cols, ("#", "flags", "heading", "x", "y", "z")):
        self.ris_tree.heading(c, text=txt)
        self.ris_tree.column(c, width=70, anchor="center")
    self.ris_tree.pack(fill=tk.BOTH, expand=True)
    # Only the table container is packed here, so the table is shown first.
    self.ris_table_container.pack(fill=tk.BOTH, expand=True)
    paned.add(right, weight=2)
    # Optional PPI trail length from the master's config manager, if it has one.
    gm = getattr(self.master, "config_manager", None)
    trail_len = None
    if gm:
        general = gm.get_general_settings() or {}
        trail_len = general.get("ppi_trail_length")
    self.ris_ppi_widget = PPIDisplay(self.ris_ppi_container, max_range_nm=100, trail_length=trail_len)
    self.ris_ppi_widget.pack(fill=tk.BOTH, expand=True)
    # Bottom bar of the RIS tab: view-mode selector, decimals and CSV export.
    btn_frame = ttk.Frame(ris_frame)
    btn_frame.pack(fill=tk.X, padx=5, pady=(0, 5))
    self.scenario_view_mode = tk.StringVar(value="simplified")
    mode_frame = ttk.Frame(btn_frame)
    mode_frame.pack(side=tk.LEFT, padx=(4, 0))
    ttk.Label(mode_frame, text="View:").pack(side=tk.LEFT, padx=(0, 6))
    ttk.Radiobutton(mode_frame, text="Raw", value="raw", variable=self.scenario_view_mode).pack(side=tk.LEFT)
    ttk.Radiobutton(mode_frame, text="Simplified", value="simplified", variable=self.scenario_view_mode).pack(side=tk.LEFT)
    self.simplified_decimals = tk.IntVar(value=4)
    ttk.Label(mode_frame, text=" Decimals:").pack(side=tk.LEFT, padx=(8, 2))
    ttk.Spinbox(mode_frame, from_=0, to=8, width=3, textvariable=self.simplified_decimals).pack(side=tk.LEFT)
    self.ris_save_csv_btn = ttk.Button(btn_frame, text="Save CSV", command=self._on_save_ris_csv)
    self.ris_save_csv_btn.pack(side=tk.RIGHT)
    self.notebook.add(ris_frame, text="RIS Status")
    # --- SFP raw tab: packet history list on the left, decoded packet on the right ---
    raw_frame = ttk.Frame(self.notebook)
    history_frame = ttk.Frame(raw_frame, width=380)
    history_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(5, 2), pady=5)
    ttk.Label(history_frame, text="History (latest)").pack(anchor=tk.W, padx=4)
    list_container = ttk.Frame(history_frame)
    list_container.pack(fill=tk.BOTH, expand=True, padx=4, pady=(2, 4))
    columns = ("ts", "flow", "tid", "size")
    self.history_tree = ttk.Treeview(list_container, columns=columns, show="headings", height=20)
    self.history_tree.heading("ts", text="Timestamp")
    self.history_tree.heading("flow", text="Flow")
    self.history_tree.heading("tid", text="TID")
    self.history_tree.heading("size", text="Size")
    self.history_tree.column("ts", width=100, anchor="w")
    self.history_tree.column("flow", width=50, anchor="w")
    self.history_tree.column("tid", width=40, anchor="center")
    self.history_tree.column("size", width=50, anchor="e")
    self.history_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.history_vscroll = ttk.Scrollbar(list_container, orient=tk.VERTICAL, command=self.history_tree.yview)
    self.history_vscroll.pack(side=tk.RIGHT, fill=tk.Y)
    self.history_tree.config(yscrollcommand=self.history_vscroll.set)
    # NOTE(review): no binding of history_tree selection to _on_history_select
    # is visible in this method — confirm it is wired up elsewhere.
    hb_frame = ttk.Frame(history_frame)
    hb_frame.pack(fill=tk.X, padx=4, pady=(4, 4))
    self.history_settings_btn = ttk.Button(hb_frame, text="Settings", command=self._open_history_settings_dialog)
    self.history_settings_btn.pack(side=tk.LEFT)
    self.history_clear_btn = ttk.Button(hb_frame, text="Clear", command=self._on_clear_history)
    self.history_clear_btn.pack(side=tk.RIGHT)
    self.raw_tab_text = scrolledtext.ScrolledText(raw_frame, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9))
    self.raw_tab_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(2, 5), pady=5)
    # Inserted at index 1 so this tab sits right after "Raw Log".
    self.notebook.insert(1, raw_frame, text="SFP Raw")
    # --- Binary (hex) and JSON tabs ---
    self.bin_tab = scrolledtext.ScrolledText(self.notebook, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 10))
    self.notebook.add(self.bin_tab, text="Binary (Hex)")
    self.json_tab = scrolledtext.ScrolledText(self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 10))
    self.notebook.add(self.json_tab, text="JSON")
def _on_send_target(self):
    """Build a ``tgtset`` command from the form values and send it.

    Requires an open shared communicator. The velocity entered in knots is
    converted with ``KNOTS_TO_FPS`` before the command is built. Input errors
    and unexpected failures are logged and reported in a message box.
    """
    communicator = self.shared_communicator
    if not (communicator and communicator.is_open):
        self._log_to_widget("ERROR: Cannot send target, not connected.", "ERROR")
        messagebox.showerror("Connection Error", "Communicator is not connected.", parent=self)
        return

    try:
        target_id = self.tgt_id_var.get()
        range_nm = self.tgt_range_var.get()
        az_deg = self.tgt_az_var.get()
        alt_ft = self.tgt_alt_var.get()
        vel_kn = self.tgt_vel_var.get()
        hdg_deg = self.tgt_hdg_var.get()
        is_active = self.tgt_active_var.get()
        is_traceable = self.tgt_traceable_var.get()
        is_restart = self.tgt_restart_var.get()
        vel_fps = vel_kn * KNOTS_TO_FPS

        temp_target = Target(
            target_id=target_id,
            trajectory=[
                Waypoint(
                    maneuver_type=ManeuverType.FLY_TO_POINT,
                    target_range_nm=range_nm,
                    target_azimuth_deg=az_deg,
                    target_altitude_ft=alt_ft,
                    target_velocity_fps=vel_fps,
                    target_heading_deg=hdg_deg,
                )
            ],
        )
        # Mirror the form values onto the target's flags and current state.
        state = (
            ("active", is_active),
            ("traceable", is_traceable),
            ("restart", is_restart),
            ("current_range_nm", range_nm),
            ("current_azimuth_deg", az_deg),
            ("current_velocity_fps", vel_fps),
            ("current_heading_deg", hdg_deg),
            ("current_altitude_ft", alt_ft),
        )
        for attr_name, attr_value in state:
            setattr(temp_target, attr_name, attr_value)

        built = command_builder.build_tgtset_from_target_state(temp_target, include_flags=True)
        # strip() drops any trailing newline, so exactly one is appended.
        command_str = built.strip() + "\n"
        self._log_to_widget(f"Built command: {command_str!r}", "INFO")

        if communicator._send_single_command(command_str):
            self._log_to_widget(f"Successfully sent command for target {target_id}.", "INFO")
        else:
            self._log_to_widget(f"Failed to send command for target {target_id}.", "ERROR")
    except (ValueError, tk.TclError) as e:
        error_msg = f"Invalid input value: {e}"
        self._log_to_widget(f"ERROR: {error_msg}", "ERROR")
        messagebox.showerror("Input Error", error_msg, parent=self)
    except Exception as e:
        self.logger.exception("An unexpected error occurred in _on_send_target.")
        self._log_to_widget(f"ERROR: {e}", "CRITICAL")
        messagebox.showerror("Unexpected Error", str(e), parent=self)
def _create_image_tab(self, title: str) -> Dict:
    """Create one image tab: a fixed-size image area above a hex view.

    Args:
        title: Name used in the placeholder text shown until an image arrives.

    Returns:
        Dict with the created widgets under the keys ``frame``,
        ``image_label``, ``hex_view`` and ``image_container``.
    """
    widgets = {"frame": ttk.Frame(self.notebook)}

    side = self.image_area_size
    container = ttk.Frame(widgets["frame"], width=side, height=side, relief=tk.SUNKEN)
    container.pack(pady=5, padx=5)
    # Keep the container at the requested size regardless of its content.
    container.pack_propagate(False)

    label = ttk.Label(container, text=f"Waiting for {title}...", anchor=tk.CENTER)
    label.pack(fill=tk.BOTH, expand=True)
    widgets["image_label"] = label

    hex_view = scrolledtext.ScrolledText(
        widgets["frame"], height=8, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9)
    )
    hex_view.pack(fill=tk.BOTH, expand=True, pady=5, padx=5)
    widgets["hex_view"] = hex_view

    widgets["image_container"] = container
    return widgets
def _open_image_size_dialog(self):
    """Open a modal dialog to change the image area size (in pixels).

    On save, the new size is applied to the existing image tabs and stored in
    the master's config manager when one is available.
    """
    dlg = tk.Toplevel(self)
    dlg.title("Image Size")
    dlg.transient(self)
    dlg.grab_set()

    ttk.Label(dlg, text="Image area size (px):").pack(padx=10, pady=(10, 2))
    size_var = tk.StringVar(value=str(self.image_area_size))
    ttk.Entry(dlg, textvariable=size_var, width=8).pack(padx=10, pady=(0, 10))

    btn_frame = ttk.Frame(dlg)
    btn_frame.pack(padx=10, pady=(0, 10))

    def on_save():
        try:
            new_size = int(size_var.get())
        except Exception:
            new_size = None
        if new_size is None or new_size <= 0:
            messagebox.showerror(
                "Invalid value", "Please enter a positive integer for image size.", parent=dlg
            )
            return

        self.image_area_size = new_size
        # The image tabs only exist when the imaging libraries are available.
        for attr_name in ("mfd_tab", "sar_tab"):
            tab = getattr(self, attr_name, None)
            if tab and "image_container" in tab:
                tab["image_container"].config(width=new_size, height=new_size)

        gm = getattr(self.master, "config_manager", None)
        if gm:
            general = gm.get_general_settings() or {}
            image_display = general.get("image_display", {})
            image_display["size"] = new_size
            general["image_display"] = image_display
            gm.save_general_settings(general)
        dlg.destroy()

    ttk.Button(btn_frame, text="Cancel", command=lambda: dlg.destroy()).pack(
        side=tk.RIGHT, padx=(0, 5)
    )
    ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT)
def _on_toggle_connect(self):
if not self.shared_communicator:
self._log_to_widget("ERROR: No shared communicator available.", "ERROR")
messagebox.showerror("Error", "No shared communicator available.", parent=self)
return
if self.shared_communicator.is_open:
self.shared_communicator.disconnect()
else:
try:
ip = self.ip_var.get()
server_port = int(self.server_port_var.get())
local_port = int(self.local_port_var.get())
config = {"ip": ip, "port": server_port, "local_port": local_port}
self.shared_communicator.connect(config)
except (ValueError, tk.TclError) as e:
self._log_to_widget(f"ERROR: Invalid connection settings: {e}", "ERROR")
messagebox.showerror("Input Error", f"Invalid connection settings: {e}", parent=self)
def _on_send_probe(self):
ip = self.ip_var.get()
try:
port = int(self.server_port_var.get())
except Exception:
self._log_to_widget("ERROR: Invalid port number for probe.", "ERROR")
return
probe_payload = b"SFP_PROBE\n"
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1.0)
sock.sendto(probe_payload, (ip, port))
sock.close()
self._log_to_widget(f"Sent probe to {ip}:{port}", "INFO")
except Exception as e:
self._log_to_widget(f"Failed to send probe to {ip}:{port}: {e}", "ERROR")
def _on_send_script(self):
if not self.shared_communicator or not self.shared_communicator.is_open:
self._log_to_widget("ERROR: Cannot send script, not connected.", "ERROR")
return
try:
command_str = self.script_var.get()
self.shared_communicator._send_single_command(command_str)
except (ValueError, tk.TclError) as e:
self._log_to_widget(f"ERROR: Invalid input for script sending: {e}", "ERROR")
def _on_send_simple_command(self, command_str: str):
if not self.shared_communicator or not self.shared_communicator.is_open:
self._log_to_widget("ERROR: Cannot send command, not connected.", "ERROR")
messagebox.showerror("Connection Error", "Communicator is not connected.", parent=self)
return False
try:
success = self.shared_communicator._send_single_command(command_str)
if success:
self._log_to_widget(f"Successfully sent command: {command_str}", "INFO")
else:
self._log_to_widget(f"Failed to send command: {command_str}", "ERROR")
return success
except Exception as e:
self.logger.exception("Unexpected error in _on_send_simple_command")
self._log_to_widget(f"ERROR: {e}", "ERROR")
return False
def _on_send_tgtset(self):
    """Send a selective ``tgtset`` built from the current form values.

    Returns:
        The result of ``_on_send_simple_command``, or ``False`` when a form
        value could not be read.
    """
    try:
        target_id = self.tgt_id_var.get()
        range_nm = self.tgt_range_var.get()
        az_deg = self.tgt_az_var.get()
        alt_ft = self.tgt_alt_var.get()
        vel_kn = self.tgt_vel_var.get()
        hdg_deg = self.tgt_hdg_var.get()
        vel_fps = vel_kn * KNOTS_TO_FPS
        is_active = self.tgt_active_var.get()
        is_traceable = self.tgt_traceable_var.get()

        # Numeric fields are passed as strings with two decimals.
        numeric = (
            ("range_nm", range_nm),
            ("azimuth_deg", az_deg),
            ("velocity_fps", vel_fps),
            ("heading_deg", hdg_deg),
            ("altitude_ft", alt_ft),
        )
        updates = {key: f"{number:.2f}" for key, number in numeric}
        updates["active"] = is_active
        updates["traceable"] = is_traceable

        command_str = command_builder.build_tgtset_selective(target_id, updates).strip()
        if command_str:
            # A stripped string never ends with a newline, so add one.
            command_str += "\n"
        return self._on_send_simple_command(command_str)
    except (ValueError, tk.TclError) as e:
        self._log_to_widget(f"ERROR: Invalid input for tgtset: {e}", "ERROR")
        return False
def _on_close(self):
self.logger.info("SFP Debug Window closing.")
if self.shared_communicator:
self.shared_communicator.remove_connection_state_callback(self._update_toggle_state)
if self.payload_router:
self.payload_router.remove_ris_target_listener(self.update_ppi_targets)
self.destroy()
def _update_toggle_state(self, connected: bool):
try:
if connected:
self.connect_toggle_btn.config(text="Disconnect")
else:
self.connect_toggle_btn.config(text="Connect")
except Exception:
pass
def update_toggle_state(self, connected: bool) -> None:
    """Public alias that forwards to ``_update_toggle_state``.

    Args:
        connected: Current connection state to reflect on the connect button.
    """
    self._update_toggle_state(connected)
def _process_latest_payloads(self):
    """Poll the payload router, refresh the tabs, then reschedule itself.

    NOTE(review): ``__init__`` schedules ``_process_gui_updates``, not this
    method, and no caller is visible here. It looks like an older variant of
    the same poll loop (raw values only, no PPI queue) — confirm before use.
    """
    if not self.payload_router:
        # Nothing to poll yet; keep the timer alive and try again later.
        self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
        return
    try:
        new_payloads = self.payload_router.get_and_clear_latest_payloads()
    except Exception:
        self.logger.exception("Error while fetching latest payloads from router")
        new_payloads = {}
    if new_payloads:
        # Route each flow to the tab that displays it.
        for flow_id, payload in new_payloads.items():
            if flow_id == "MFD" and _IMAGE_LIBS_AVAILABLE:
                self._display_image_data(payload, self.mfd_tab, "mfd_photo")
            elif flow_id == "SAR" and _IMAGE_LIBS_AVAILABLE:
                self._display_image_data(payload, self.sar_tab, "sar_photo")
            elif flow_id == "BIN":
                self._display_hex_data(payload, self.bin_tab)
            elif flow_id == "JSON":
                self._display_json_data(payload, self.json_tab)
            elif flow_id == "RIS_STATUS_JSON":
                try:
                    import json
                    # Payload may arrive as encoded JSON or as a decoded mapping.
                    struct = json.loads(payload.decode("utf-8")) if isinstance(payload, (bytes, bytearray)) else payload
                    # Rebuild the scenario table from scratch.
                    for iid in self.scenario_tree.get_children(): self.scenario_tree.delete(iid)
                    scenario = struct.get("scenario", {})
                    if scenario:
                        for field_name, value in scenario.items():
                            self.scenario_tree.insert("", tk.END, values=(field_name, str(value)))
                    # Rebuild the target table from scratch.
                    for iid in self.ris_tree.get_children(): self.ris_tree.delete(iid)
                    targets = struct.get("targets", [])
                    for i, t in enumerate(targets):
                        flags_val = t.get("flags", 0)
                        flags_display = f"0x{flags_val:X}"
                        vals = (i, flags_display, f"{t.get('heading', 0.0):.3f}", f"{t.get('x', 0.0):.3f}", f"{t.get('y', 0.0):.3f}", f"{t.get('z', 0.0):.3f}")
                        self.ris_tree.insert("", tk.END, values=vals)
                except Exception:
                    self.logger.exception("Failed to update RIS tables from JSON payload.")
    try:
        # Show the newest raw packet, if any, and refresh the history list.
        raw_pkt = self.payload_router.get_and_clear_raw_packet() if self.payload_router else None
        if raw_pkt:
            raw_bytes, addr = raw_pkt
            self._display_raw_packet(raw_bytes, addr)
            self._refresh_history_tree()
    except Exception:
        self.logger.exception("Error while fetching raw packet from router")
    self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
def _refresh_history_tree(self):
try:
if self.payload_router:
hist = self.payload_router.get_history()
self.history_tree.delete(*self.history_tree.get_children())
for i, entry in enumerate(reversed(hist)):
ts = entry["ts"].strftime("%H:%M:%S.%f")[:-3]
flow_name = entry.get("flow_name", "")
tid = entry.get("tid", "")
size = len(entry.get("raw", b""))
self.history_tree.insert("", tk.END, values=(ts, flow_name, tid, f"{size}B"))
except Exception: pass
def _on_history_select(self, event=None):
try:
sel = self.history_tree.selection()
if not sel: return
iid = sel[0]
children = list(self.history_tree.get_children())
idx = children.index(iid)
hist = self.payload_router.get_history()
rev_idx = len(hist) - 1 - idx
if not (0 <= rev_idx < len(hist)): return
entry = hist[rev_idx]
self._display_raw_packet(entry["raw"], entry["addr"])
except Exception:
self.logger.exception("Error during history selection handling")
def _on_clear_history(self):
if self.payload_router:
self.payload_router.clear_history()
self._refresh_history_tree()
def _open_history_settings_dialog(self):
    """Open a modal dialog to edit history size and raw-packet persistence.

    On save, the values are applied to the payload router (when present) and
    stored under ``sfp_debug`` in the master's general settings (when a
    config manager is available).
    """
    dlg = tk.Toplevel(self)
    dlg.title("History Settings")
    dlg.transient(self)
    dlg.grab_set()

    # Without a router, fall back to the dialog's own defaults.
    hist_size = self.payload_router._history_size if self.payload_router else 20
    persist = self.payload_router._persist if self.payload_router else False

    ttk.Label(dlg, text="History size (entries):").pack(padx=10, pady=(10, 2))
    size_var = tk.StringVar(value=str(hist_size))
    entry = ttk.Entry(dlg, textvariable=size_var, width=8)
    entry.pack(padx=10, pady=(0, 10))

    persist_var = tk.BooleanVar(value=bool(persist))
    ttk.Checkbutton(dlg, text="Persist raw packets to Temp/", variable=persist_var).pack(padx=10, pady=(0, 10))

    btn_frame = ttk.Frame(dlg)
    btn_frame.pack(padx=10, pady=(0, 10), fill=tk.X)

    def on_save():
        try:
            v = int(size_var.get())
            # Enforce what the error message promises, as the image size
            # dialog does: zero and negative sizes are rejected.
            if v <= 0:
                raise ValueError()
        except Exception:
            messagebox.showerror("Invalid value", "Please enter a positive integer for history size.", parent=dlg)
            return
        if self.payload_router:
            self.payload_router.set_history_size(v)
            self.payload_router.set_persist(bool(persist_var.get()))
        gm = getattr(self.master, "config_manager", None)
        if gm:
            general = gm.get_general_settings() or {}
            sfp_debug = general.get("sfp_debug", {})
            sfp_debug["history_size"] = v
            sfp_debug["persist_raw"] = bool(persist_var.get())
            general["sfp_debug"] = sfp_debug
            gm.save_general_settings(general)
        dlg.destroy()

    def on_cancel():
        dlg.destroy()

    ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack(side=tk.RIGHT, padx=(0, 5))
    ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT)
def _display_raw_packet(self, raw_bytes: bytes, addr: tuple):
    """Decode one raw SFP packet and show header, flags and body hex dump.

    If decoding or rendering fails, the error and a hex dump of the whole
    packet are shown instead.

    Args:
        raw_bytes: Complete packet, header included.
        addr: Sender address, printed as-is in the first line.
    """

    def show(text):
        # The text widget is kept read-only except while being rewritten.
        self.raw_tab_text.config(state=tk.NORMAL)
        self.raw_tab_text.delete("1.0", tk.END)
        self.raw_tab_text.insert("1.0", text)
        self.raw_tab_text.config(state=tk.DISABLED)

    try:
        header_size = SFPHeader.size()
        if len(raw_bytes) < header_size:
            raise ValueError("Packet smaller than SFP header")
        header = SFPHeader.from_buffer_copy(raw_bytes)
        body = raw_bytes[header_size:]

        field_list = [
            "SFP_MARKER", "SFP_DIRECTION", "SFP_PROT_VER", "SFP_PT_SPARE",
            "SFP_TAG", "SFP_SRC", "SFP_FLOW", "SFP_TID",
            "SFP_FLAGS", "SFP_WIN", "SFP_ERR", "SFP_ERR_INFO",
            "SFP_TOTFRGAS", "SFP_FRAG", "SFP_RECTYPE", "SFP_RECSPARE",
            "SFP_PLDAP", "SFP_PLEXT", "SFP_RECCOUNTER", "SFP_PLSIZE",
            "SFP_TOTSIZE", "SFP_PLOFFSET",
        ]
        flag_val = 0
        cells = []
        for field_name in field_list:
            val = getattr(header, field_name)
            if field_name == "SFP_FLAGS":
                flag_val = val
            rendered = f"{val} (0x{val:X})" if isinstance(val, int) else str(val)
            cells.append(f"{field_name:12s}: {rendered}")

        # Header fields are laid out two per line.
        out_lines = [f"From {addr}\n\nSFP Header:\n\n"]
        col_width = 36
        for pos in range(0, len(cells), 2):
            left_text = cells[pos]
            right_text = cells[pos + 1] if (pos + 1) < len(cells) else ""
            out_lines.append(f"{left_text:<{col_width}} {right_text}\n")

        flag_defs = [(0, "ACQ_REQ"), (1, "RESENT"), (2, "TRAILER_ACK"), (3, "RESV3"), (4, "RESV4"), (5, "RESV5"), (6, "RESV6"), (7, "ERROR")]
        # One bracket per flag bit; the name is shown only when the bit is set.
        markers = "".join(f" [{(name if (flag_val >> bit) & 1 else ' ')}]" for bit, name in flag_defs)
        out_lines.append(f"SFP_FLAGS : {flag_val} (0x{flag_val:X}) " + markers + "\n")
        out_lines.append("\nBODY (hex):\n" + self._format_hex_dump(body))
        show("".join(out_lines))
    except Exception as e:
        text = f"Failed to format raw packet: {e}\n\nRaw dump:\n{self._format_hex_dump(raw_bytes)}"
        show(text)
def _display_image_data(self, payload: bytearray, tab_widgets: Dict[str, Any], photo_attr: str):
    """Decode an image payload into the tab's image label and refresh its hex view.

    The payload must begin with an ``ImageLeaderData`` structure whose
    ``HEADER_DATA`` provides the height (DY), width (DX), bytes per pixel
    (BPP, only 1 or 2 accepted) and row stride. The pixel rows that follow
    the leader are cropped to the width, normalized to 8 bits and shown in
    ``tab_widgets["image_label"]``. On any failure the label shows the error
    text instead and the stored photo reference is cleared. The hex view is
    refreshed in both cases.

    Args:
        payload: Mutable buffer holding the leader followed by the pixel data.
        tab_widgets: Widgets of the target tab; the "image_label" and
            "hex_view" entries are used.
        photo_attr: Name of the attribute on ``self`` that holds the
            PhotoImage so it is not garbage-collected while displayed.
    """
    image_label = tab_widgets["image_label"]
    try:
        leader_size = ctypes.sizeof(ImageLeaderData)
        if len(payload) < leader_size:
            raise ValueError("Payload smaller than ImageLeaderData header.")
        leader = ImageLeaderData.from_buffer(payload)
        info = leader.HEADER_DATA
        h, w, bpp, stride = info.DY, info.DX, info.BPP, info.STRIDE
        if not (h > 0 and w > 0 and bpp in (1, 2) and stride >= w):
            raise ValueError("Invalid image dimensions")
        expected_size = stride * h * bpp
        if (leader_size + expected_size) > len(payload):
            raise ValueError("Incomplete image data")

        # View the pixel rows in place; each row holds `stride` pixels, of
        # which only the first `w` are kept.
        pixel_rows = np.ndarray(
            shape=(h, stride),
            dtype=np.uint8 if bpp == 1 else np.uint16,
            buffer=payload,
            offset=leader_size,
        )
        image_data = pixel_rows[:, :w]
        display_img_8bit = cv2.normalize(image_data, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)
        img_pil = Image.fromarray(cv2.cvtColor(display_img_8bit, cv2.COLOR_GRAY2RGB))
        resized = self._resize_pil_to_label(img_pil, image_label)
        photo = ImageTk.PhotoImage(image=resized)
        image_label.config(image=photo, text="")
        setattr(self, photo_attr, photo)
    except Exception as e:
        # tkinter silently drops options whose value is None, so image=None
        # would leave the previous image attached to the label. The empty
        # string is what actually clears it and lets the error text show.
        image_label.config(image="", text=f"Error parsing image:\n{e}")
        setattr(self, photo_attr, None)
    self._display_hex_data(payload, tab_widgets["hex_view"])
def _resize_pil_to_label(self, img: "Image.Image", label_widget: ttk.Label) -> "Image.Image":
try:
width, height = label_widget.winfo_width(), label_widget.winfo_height()
if width <= 1 or height <= 1: return img
scale = min(width / img.width, height / img.height)
if scale >= 1.0: return img
new_w, new_h = int(img.width * scale), int(img.height * scale)
return img.resize((new_w, new_h), Image.LANCZOS)
except Exception: return img
def _display_hex_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
    """Replace the contents of ``widget`` with a hex dump of ``payload``."""
    # Build the dump first so the widget is only touched once the text is ready.
    dump_text = self._format_hex_dump(payload)

    # Enable the widget for the replacement, then disable it again.
    widget.config(state=tk.NORMAL)
    widget.delete("1.0", tk.END)
    widget.insert("1.0", dump_text)
    widget.config(state=tk.DISABLED)
def _display_json_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
    """Show ``payload`` in ``widget`` as pretty-printed JSON.

    The payload is decoded as UTF-8 and re-serialized with a two-space
    indent. If decoding or parsing fails, the widget shows the error followed
    by a hex dump of the payload instead.
    """
    try:
        import json

        decoded = payload.decode("utf-8")
        parsed = json.loads(decoded)
        rendered = json.dumps(parsed, indent=2)
    except Exception as e:
        rendered = (
            f"--- FAILED TO PARSE JSON ---\n{e}\n\n"
            f"--- RAW HEX DUMP ---\n{self._format_hex_dump(payload)}"
        )

    # Enable the widget for the replacement, then disable it again.
    widget.config(state=tk.NORMAL)
    widget.delete("1.0", tk.END)
    widget.insert("1.0", rendered)
    widget.config(state=tk.DISABLED)
def _log_to_widget(self, message: str, level: str = "DEBUG"):
    """Record ``message`` in the Python logger and append it to the log tab.

    Args:
        message: Text to log.
        level: Level name shown as the ``[LEVEL]`` prefix in the log tab and
            used to choose the logger level. Names the ``logging`` module
            does not know are logged at INFO.
    """
    # Honour the requested level instead of always logging at INFO, so that
    # e.g. "ERROR" messages are recorded as errors by the logger as well.
    numeric_level = logging.getLevelName(str(level).upper())
    if not isinstance(numeric_level, int):
        # getLevelName returns a "Level X" string for unknown names.
        numeric_level = logging.INFO
    self.logger.log(numeric_level, message)

    # Enable the widget for the append, disable it again, then scroll to the
    # newest line.
    self.log_tab.config(state=tk.NORMAL)
    self.log_tab.insert(tk.END, f"[{level}] {message}\n")
    self.log_tab.config(state=tk.DISABLED)
    self.log_tab.see(tk.END)
def _on_save_ris_csv(self):
    """Export the rows of the scenario and RIS tree views to a timestamped CSV file.

    The file is written to a ``Temp`` directory located two levels above this
    module's directory (created if missing) and named
    ``ris_data_<UTC timestamp>.csv``. The scenario rows, when present, come
    first and are separated from the RIS rows by an empty line. Success and
    failure are both reported through the log tab; nothing is raised.
    """
    try:
        import csv

        scenario_rows = [
            self.scenario_tree.item(iid, "values")
            for iid in self.scenario_tree.get_children()
        ]
        rows = [
            self.ris_tree.item(iid, "values")
            for iid in self.ris_tree.get_children()
        ]
        if not (scenario_rows or rows):
            self._log_to_widget("No RIS data to save.", "INFO")
            return

        project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
        temp_dir = os.path.join(project_root, "Temp")
        os.makedirs(temp_dir, exist_ok=True)

        # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
        # "now" formats to exactly the same timestamp string.
        ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%dT%H%M%S")
        path = os.path.join(temp_dir, f"ris_data_{ts}.csv")

        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if scenario_rows:
                writer.writerow(["Scenario Field", "Value"])
                writer.writerows(scenario_rows)
                writer.writerow([])
            writer.writerow(["#", "flags", "heading", "x", "y", "z"])
            writer.writerows(rows)
        self._log_to_widget(f"Saved RIS data to CSV: {path}", "INFO")
    except Exception as e:
        # UI callback boundary: report the failure in the log tab rather
        # than letting it propagate into the Tk event loop.
        self._log_to_widget(f"Failed to save RIS CSV: {e}", "ERROR")
def _format_hex_dump(self, data: bytes, length=16) -> str:
lines = []
for i in range(0, len(data), length):
chunk = data[i : i + length]
hex_part = " ".join(f"{b:02X}" for b in chunk)
ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
lines.append(f"{i:08X} {hex_part:<{length*3}} |{ascii_part}|")
return "\n".join(lines)