1723 lines
74 KiB
Python
1723 lines
74 KiB
Python
# target_simulator/gui/sfp_debug_window.py
|
|
"""
|
|
Provides a Toplevel window for debugging the SFP transport layer.
|
|
This version uses a thread-safe queue for GUI updates to prevent deadlocks.
|
|
"""
|
|
|
|
import tkinter as tk
|
|
from tkinter import ttk, scrolledtext, messagebox
|
|
import logging
|
|
import datetime
|
|
import os
|
|
import ctypes
|
|
import math
|
|
import socket
|
|
from queue import Queue, Empty
|
|
from typing import Dict, Optional, Any, List
|
|
|
|
try:
|
|
from PIL import Image, ImageTk
|
|
import numpy as np
|
|
import cv2
|
|
|
|
_IMAGE_LIBS_AVAILABLE = True
|
|
except ImportError:
|
|
_IMAGE_LIBS_AVAILABLE = False
|
|
|
|
from target_simulator.core.sfp_transport import SfpTransport
|
|
from target_simulator.core.sfp_structures import ImageLeaderData, SFPHeader
|
|
from target_simulator.gui.payload_router import DebugPayloadRouter
|
|
from target_simulator.core.models import Target, Waypoint, ManeuverType, KNOTS_TO_FPS
|
|
from target_simulator.core import command_builder
|
|
|
|
# Module-level logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Default values pre-loaded into the "Simple Target Sender" input fields
# (see the tk variables created in SfpDebugWindow.__init__). Units follow
# the labels shown next to each field in the sender UI.
DEF_TEST_ID = 1  # target ID
DEF_TEST_RANGE = 30.0  # range, labelled "NM" in the UI
DEF_TEST_AZIMUTH = 10.0  # azimuth, labelled in degrees
DEF_TEST_VELOCITY = 300.0  # velocity, labelled "kn"; converted with KNOTS_TO_FPS before sending
DEF_TEST_HEADING = 0.0  # heading, labelled in degrees
DEF_TEST_ALTITUDE = 10000.0  # altitude, labelled "ft"
|
|
|
|
|
|
class SfpDebugWindow(tk.Toplevel):
|
|
"""Top-level window for SFP debugging and payload inspection."""
|
|
|
|
GUI_POLL_INTERVAL_MS = 100
|
|
|
|
def __init__(self, master=None):
    """Create the debug window and wire it to the application's shared services.

    Args:
        master: Parent widget. When it exposes ``target_communicator``,
            ``connection_config`` or ``config_manager`` attributes they are
            used to share the connection, pre-fill the connection fields and
            override the GUI poll interval. All of them are optional.
    """
    super().__init__(master)
    self.master = master
    self.geometry("1100x700")
    self.protocol("WM_DELETE_WINDOW", self._on_close)
    self.logger = logger

    # Unbounded hand-off queue: listeners enqueue, the Tk loop dequeues.
    self.debug_update_queue = Queue()

    # Reuse the communicator owned by the master window, when there is one.
    self.shared_communicator = getattr(self.master, "target_communicator", None)
    self.payload_router: Optional[DebugPayloadRouter] = None
    if self.shared_communicator:
        # ``router`` may be exposed either as an accessor or as a plain attribute.
        router_attr = getattr(self.shared_communicator, "router", None)
        self.payload_router = router_attr() if callable(router_attr) else router_attr
        self.shared_communicator.add_connection_state_callback(
            self._update_toggle_state
        )

    if self.payload_router:
        self.payload_router.add_ris_target_listener(self._queue_ris_target_update)

    # --- Tk variables backing the input widgets ---------------------------
    self.image_area_size = 150
    self.ip_var = tk.StringVar(value="127.0.0.1")
    self.local_port_var = tk.StringVar(value="60002")
    self.server_port_var = tk.StringVar(value="60003")
    self.script_var = tk.StringVar(value="print('hello from client')")
    self.tgt_id_var = tk.IntVar(value=DEF_TEST_ID)
    self.tgt_range_var = tk.DoubleVar(value=DEF_TEST_RANGE)
    self.tgt_az_var = tk.DoubleVar(value=DEF_TEST_AZIMUTH)
    self.tgt_alt_var = tk.DoubleVar(value=DEF_TEST_ALTITUDE)
    self.tgt_vel_var = tk.DoubleVar(value=DEF_TEST_VELOCITY)
    self.tgt_hdg_var = tk.DoubleVar(value=DEF_TEST_HEADING)
    self.tgt_active_var = tk.BooleanVar(value=True)
    self.tgt_traceable_var = tk.BooleanVar(value=True)
    self.tgt_restart_var = tk.BooleanVar(value=False)
    # "cmd" -> legacy command-line strings, "json" -> JSON info payloads.
    self.send_mode_var = tk.StringVar(value="cmd")
    # Presentation options for the RIS/status tab.
    self.scenario_view_mode = tk.StringVar(value="simplified")
    self.simplified_decimals = tk.IntVar(value=2)
    # Index -> name table used to label the scenario "mode" field.
    self._master_mode_names = [
        "idle_master_mode", "int_bit_master_mode", "gm_master_mode",
        "dbs_master_mode", "rws_master_mode", "vs_master_mode",
        "acm_master_mode", "tws_master_mode", "sea_low_master_mode",
        "sea_high_master_mode", "gmti_master_mode", "bcn_master_mode",
        "sam_master_mode", "ta_master_mode", "wa_master_mode",
        "stt_master_mode", "dtt_master_mode", "sstt_master_mode",
        "acq_master_mode", "ftt_master_mode", "agr_master_mode",
        "sar_master_mode", "invalid_master_mode_", "xtst_dummy_mode",
        "xtst_hw_validation_mode", "boot_master_mode",
        "master_mode_id_cardinality_",
    ]

    self._create_widgets()

    # Pre-fill IP/ports from the central connection settings. The runtime
    # ``connection_config`` wins; the ConfigManager is only a fallback.
    try:
        target_cfg = None
        runtime_cfg = getattr(self.master, "connection_config", None)
        if runtime_cfg:
            target_cfg = runtime_cfg.get("target", {})
        else:
            manager = getattr(self.master, "config_manager", None)
            if manager:
                target_cfg = manager.get_connection_settings().get("target", {})

        sfp_cfg = target_cfg.get("sfp", {}) if target_cfg is not None else None
        if sfp_cfg:
            address = sfp_cfg.get("ip") or sfp_cfg.get("host")
            if address:
                self.ip_var.set(str(address))
            remote_port = sfp_cfg.get("port")
            if remote_port is not None:
                self.server_port_var.set(str(remote_port))
            own_port = sfp_cfg.get("local_port")
            if own_port is not None:
                self.local_port_var.set(str(own_port))
    except Exception:
        # A broken config must never stop the window from opening.
        self.logger.debug(
            "Could not initialise SFP debug connection fields from central config",
            exc_info=True,
        )

    if self.shared_communicator:
        self._update_toggle_state(self.shared_communicator.is_open)

    # The poll interval may be overridden from the general settings under
    # either ``GUI_POLL_INTERVAL_MS`` or ``gui_poll_interval_ms``.
    interval = self.GUI_POLL_INTERVAL_MS
    try:
        manager = getattr(self.master, "config_manager", None)
        if manager:
            settings = manager.get_general_settings() or {}
            override = settings.get(
                "GUI_POLL_INTERVAL_MS", settings.get("gui_poll_interval_ms", None)
            )
            if override is not None:
                interval = int(override)
    except Exception:
        interval = self.GUI_POLL_INTERVAL_MS
    self.gui_poll_interval_ms = interval

    # Start the legacy polling loop used by tests and older UI code.
    # NOTE(review): ``_process_latest_payloads`` is not defined in the part of
    # the file visible here (the visible poller is ``_process_gui_updates``);
    # confirm it exists further down.
    self.after(self.gui_poll_interval_ms, self._process_latest_payloads)
|
|
|
|
def _queue_ris_target_update(self, targets: List[Target]):
|
|
"""Enqueue a list of RIS targets for later processing on the GUI thread.
|
|
|
|
The queue is used to transfer target lists from background threads into
|
|
the Tk mainloop safely. If the queue is full the update is dropped and
|
|
a warning is logged.
|
|
"""
|
|
try:
|
|
self.debug_update_queue.put_nowait(targets)
|
|
except Exception:
|
|
# Queue.Full may not be available across Python versions; swallow
|
|
# and log a warning instead of raising to avoid breaking debug UI.
|
|
self.logger.warning(
|
|
"SFP Debug window update queue is full or unavailable. Skipping an update."
|
|
)
|
|
|
|
def _on_close(self):
    """Unregister this window's callbacks and destroy it.

    The connection-state callback is removed from the shared communicator
    and the RIS target listener from the payload router, so neither keeps
    calling into a destroyed window. Unregistration is best-effort: a
    failure is logged at debug level (with traceback) and never prevents the
    window from closing.
    """
    self.logger.info("SFP Debug Window closing.")

    if self.shared_communicator:
        try:
            self.shared_communicator.remove_connection_state_callback(
                self._update_toggle_state
            )
        except Exception:
            # Previously swallowed silently; keep best-effort but leave a trace.
            self.logger.debug(
                "Could not remove connection-state callback while closing SFP debug window",
                exc_info=True,
            )

    if self.payload_router:
        try:
            self.payload_router.remove_ris_target_listener(
                self._queue_ris_target_update
            )
        except Exception:
            self.logger.debug(
                "Could not remove RIS target listener while closing SFP debug window",
                exc_info=True,
            )

    self.destroy()
|
|
|
|
def _process_gui_updates(self):
    """Drain pending updates on the GUI thread and re-arm the poll timer.

    Steps, in order:
      1. Pop every queued RIS target list and pass it to
         ``update_ppi_targets`` (kept for compatibility).
      2. Fetch the router's latest payloads and route each by flow id:
         ``MFD``/``SAR`` to the image tabs (only when the image libraries
         were importable), ``BIN`` to the hex view, ``JSON`` to the JSON
         view, ``RIS_STATUS_JSON`` to the scenario/target tables.
      3. Show the latest raw packet, if any, and refresh the history tree.
      4. Schedule the next run with ``after``.

    Errors raised while talking to the router or rendering a payload are
    logged and never propagate, so the polling loop keeps running.
    """
    # 1. Queued target updates. ``get_nowait`` raising ``Empty`` ends the drain.
    while True:
        try:
            real_targets = self.debug_update_queue.get_nowait()
        except Empty:
            break
        # This window no longer shows a PPI; the call is a compatibility no-op.
        self.update_ppi_targets(real_targets)

    # 2. Latest buffered payloads, one per flow.
    if self.payload_router:
        try:
            new_payloads = self.payload_router.get_and_clear_latest_payloads()
            if new_payloads:
                for flow_id, payload in new_payloads.items():
                    if flow_id == "MFD" and _IMAGE_LIBS_AVAILABLE:
                        self._display_image_data(payload, self.mfd_tab, "mfd_photo")
                    elif flow_id == "SAR" and _IMAGE_LIBS_AVAILABLE:
                        self._display_image_data(payload, self.sar_tab, "sar_photo")
                    elif flow_id == "BIN":
                        self._display_hex_data(payload, self.bin_tab)
                    elif flow_id == "JSON":
                        self._display_json_data(payload, self.json_tab)
                    elif flow_id == "RIS_STATUS_JSON":
                        try:
                            self._render_ris_status_payload(payload)
                        except Exception:
                            self.logger.exception(
                                "Failed to update RIS tables from JSON payload."
                            )
        except Exception:
            self.logger.exception(
                "Error while fetching latest payloads from router"
            )

    # 3. Latest raw packet.
    if self.payload_router:
        try:
            raw_pkt = self.payload_router.get_and_clear_raw_packet()
            if raw_pkt:
                raw_bytes, addr = raw_pkt
                self._display_raw_packet(raw_bytes, addr)
                self._refresh_history_tree()
        except Exception:
            self.logger.exception("Error while fetching raw packet from router")

    # 4. Re-arm the timer; fall back to the class default if the configured
    # interval is missing or rejected.
    try:
        self.after(self.gui_poll_interval_ms, self._process_gui_updates)
    except Exception:
        self.after(self.GUI_POLL_INTERVAL_MS, self._process_gui_updates)

def _render_ris_status_payload(self, payload):
    """Rebuild the scenario and RIS target tables from one status payload.

    Args:
        payload: UTF-8 encoded JSON (``bytes``/``bytearray``) or an already
            decoded mapping with optional ``scenario`` and ``targets`` keys.

    Exceptions (bad JSON, unexpected structure, widget errors) propagate to
    the caller, which logs them.
    """
    import json  # function-scope import, as in the original inline code

    if isinstance(payload, (bytes, bytearray)):
        struct = json.loads(payload.decode("utf-8"))
    else:
        struct = payload

    simplified = self.scenario_view_mode.get() == "simplified"
    decimals = self.simplified_decimals.get()
    feet_per_metre = 3.28084

    # Scenario table: always cleared, then refilled when data is present.
    self.scenario_tree.delete(*self.scenario_tree.get_children())
    # ``or {}`` also covers an explicit null in the payload.
    scenario = struct.get("scenario") or {}
    for field, value in scenario.items():
        if simplified:
            shown = self._simplify_scenario_value(field, value, decimals)
        else:
            shown = value
        self.scenario_tree.insert("", tk.END, values=(field, shown))

    # Target table: one row per entry, indexed by position in the list.
    self.ris_tree.delete(*self.ris_tree.get_children())
    for i, t in enumerate(struct.get("targets") or []):
        flags_display = f"0x{t.get('flags', 0):X}"
        if simplified:
            # Simplified view: radians -> degrees, metres -> feet.
            heading = f"{t.get('heading', 0.0) * 180.0 / math.pi:.{decimals}f}°"
            x_pos = f"{t.get('x', 0.0) * feet_per_metre:.{decimals}f} ft"
            y_pos = f"{t.get('y', 0.0) * feet_per_metre:.{decimals}f} ft"
            z_pos = f"{t.get('z', 0.0) * feet_per_metre:.{decimals}f} ft"
        else:
            heading = f"{t.get('heading', 0.0):.6f}"
            x_pos = f"{t.get('x', 0.0):.3f}"
            y_pos = f"{t.get('y', 0.0):.3f}"
            z_pos = f"{t.get('z', 0.0):.3f}"
        self.ris_tree.insert(
            "", tk.END, values=(i, flags_display, heading, x_pos, y_pos, z_pos)
        )

def _simplify_scenario_value(self, field, value, decimals):
    """Return the "simplified view" rendering of one scenario field.

    Angles are shown in degrees, velocities in ft/s, barometric altitude in
    ft, latitude/longitude as degrees-minutes-seconds and ``mode`` with its
    symbolic name. A value that cannot be converted (wrong type, ``None``)
    is returned unchanged instead of aborting the whole table refresh;
    lists are shown via ``str``.

    Args:
        field: Scenario field name.
        value: Raw value from the payload.
        decimals: Number of decimals for the numeric conversions.
    """
    feet_per_metre = 3.28084
    try:
        if field in ("platform_azimuth", "true_heading", "ant_nav_az", "ant_nav_el"):
            return f"{value * 180.0 / math.pi:.{decimals}f}°"
        if field in ("vx", "vy", "vz"):
            return f"{value * feet_per_metre:.{decimals}f} ft/s"
        if field == "baro_altitude":
            return f"{value * feet_per_metre:.{decimals}f} ft"
        if field in ("latitude", "longitude"):
            magnitude = abs(value)
            degrees = int(magnitude)
            minutes_full = (magnitude - degrees) * 60
            minutes = int(minutes_full)
            seconds = (minutes_full - minutes) * 60
            if field == "latitude":
                direction = "N" if value >= 0 else "S"
            else:
                direction = "E" if value >= 0 else "W"
            return f"{degrees}° {minutes}' {seconds:.2f}\" {direction}"
        # The lower bound matters: a negative index would silently pick a
        # name from the end of the table.
        if (
            field == "mode"
            and isinstance(value, int)
            and 0 <= value < len(self._master_mode_names)
        ):
            mode_name = self._master_mode_names[value].replace("_master_mode", "")
            return f"{value} ({mode_name})"
    except (TypeError, ValueError):
        # Not convertible: fall through and show the raw value.
        pass
    if isinstance(value, list):
        return str(value)
    return value
|
|
|
|
def _create_widgets(self):
    """Assemble the window: a stack of control sections above the notebook.

    From top to bottom the control area holds the connection row, the
    "Simple Target Sender" box and the "Script to send" box; the notebook
    with the raw/log/RIS/history/image views fills the remaining space.
    """
    header = ttk.Frame(self)
    header.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5)

    # (frame title or None for a plain frame, vertical padding, builder)
    sections = (
        (None, (0, 5), self._create_connection_widgets),
        ("Simple Target Sender", (5, 5), self._create_target_sender_widgets),
        ("Script to send", (0, 5), self._create_script_sender_widgets),
    )
    for title, pady, build in sections:
        if title is None:
            holder = ttk.Frame(header)
        else:
            holder = ttk.LabelFrame(header, text=title)
        holder.pack(side=tk.TOP, fill=tk.X, pady=pady)
        build(holder)

    self.notebook = ttk.Notebook(self)
    self.notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    self._create_notebook_tabs()
|
|
|
|
def update_ppi_targets(self, targets: List[Target]):
    """Accept a target list and do nothing with it.

    Retained only so callers written against the former PPI-capable debug
    window keep working; this window no longer draws a PPI.

    Args:
        targets: Ignored.
    """
    return None
|
|
|
|
def _create_connection_widgets(self, parent):
    """Lay out the compact connection row inside ``parent``.

    The row holds, left to right: IP, local port and server port entries,
    the connect toggle button (stored as ``connect_toggle_btn``) and the
    probe button (stored as ``send_probe_btn``).
    """
    # (label text, label padding, bound variable, entry width)
    entry_specs = (
        ("IP:", (4, 2), self.ip_var, 18),
        ("Local Port:", (0, 2), self.local_port_var, 8),
        ("Server Port:", (0, 2), self.server_port_var, 8),
    )
    for caption, label_pad, variable, width in entry_specs:
        ttk.Label(parent, text=caption).pack(side=tk.LEFT, padx=label_pad)
        ttk.Entry(parent, textvariable=variable, width=width).pack(
            side=tk.LEFT, padx=(0, 6)
        )

    toggle = ttk.Button(parent, text="Connect", command=self._on_toggle_connect)
    self.connect_toggle_btn = toggle
    toggle.pack(side=tk.LEFT, padx=(0, 6))

    probe = ttk.Button(parent, text="Send Probe", command=self._on_send_probe)
    self.send_probe_btn = probe
    probe.pack(side=tk.LEFT, padx=(6, 4))
|
|
|
|
def _create_target_sender_widgets(self, parent):
    """Build the quick target sender inside ``parent``.

    A horizontal paned window splits the area: the left pane holds the
    numeric inputs and flag checkboxes describing a temporary target, the
    right pane holds a small notebook with legacy CMD buttons and JSON
    quick actions.
    """
    split = ttk.Panedwindow(parent, orient=tk.HORIZONTAL)
    split.pack(fill=tk.X, expand=False)

    fields_pane = ttk.Frame(split, padding=5)
    actions_pane = ttk.Frame(split, padding=5)
    split.add(fields_pane, weight=3)
    split.add(actions_pane, weight=2)

    # --- Left pane: labels on row 0, spinboxes on row 1 --------------------
    grid = ttk.Frame(fields_pane)
    grid.pack(fill=tk.X)
    for padded_column in (1, 3, 5):
        grid.columnconfigure(padded_column, pad=15)

    # (label, bound variable, minimum, maximum, spinbox width)
    numeric_fields = (
        ("ID:", self.tgt_id_var, 0, 15, 8),
        ("Range (NM):", self.tgt_range_var, 0, 500, 10),
        ("Azimuth (°):", self.tgt_az_var, -180, 180, 10),
        ("Velocity (kn):", self.tgt_vel_var, 0, 2000, 10),
        ("Heading (°):", self.tgt_hdg_var, 0, 360, 10),
        ("Altitude (ft):", self.tgt_alt_var, 0, 80000, 12),
    )
    for column, (caption, variable, low, high, width) in enumerate(numeric_fields):
        ttk.Label(grid, text=caption).grid(row=0, column=column, sticky=tk.W)
        ttk.Spinbox(
            grid, from_=low, to=high, textvariable=variable, width=width
        ).grid(row=1, column=column, sticky=tk.W)

    # Flags sit on their own row so the numeric inputs stay on one line.
    flags_frame = ttk.Frame(grid)
    flags_frame.grid(row=2, column=0, columnspan=7, sticky="w", pady=(8, 0))
    flag_specs = (
        ("Active", self.tgt_active_var, {}),
        ("Traceable", self.tgt_traceable_var, {"padx": 8}),
        ("Restart", self.tgt_restart_var, {"padx": 8}),
    )
    for caption, variable, extra_pack in flag_specs:
        ttk.Checkbutton(flags_frame, text=caption, variable=variable).pack(
            side=tk.LEFT, anchor="w", **extra_pack
        )

    # --- Right pane: CMD / JSON quick actions ------------------------------
    actions = ttk.Notebook(actions_pane)
    actions.pack(fill=tk.BOTH, expand=True)

    cmd_tab = ttk.Frame(actions)
    json_tab = ttk.Frame(actions)
    actions.add(cmd_tab, text="CMD")
    actions.add(json_tab, text="JSON")

    # Show the CMD tab first so the quick command buttons are visible.
    try:
        actions.select(cmd_tab)
    except Exception:
        pass

    # CMD tab: legacy textual commands in a compact grid, anchored top-left
    # so they are less likely to be clipped on narrow windows.
    cmd_buttons = ttk.Frame(cmd_tab)
    cmd_buttons.pack(side=tk.TOP, anchor="w", padx=4, pady=4)

    cmd_actions = [
        (
            "tgtreset",
            lambda: self._on_send_simple_command(command_builder.build_tgtreset()),
        ),
        (
            "pause",
            lambda: self._on_send_simple_command(command_builder.build_pause()),
        ),
        (
            "continue",
            lambda: self._on_send_simple_command(command_builder.build_continue()),
        ),
        (
            "Send Target",
            lambda: (self.send_mode_var.set("cmd"), self._on_send_target()),
        ),
        ("reset (legacy)", lambda: self._on_send_reset_button()),
    ]
    # Three buttons per row keeps the tab short.
    for position, (caption, callback) in enumerate(cmd_actions):
        grid_row, grid_column = divmod(position, 3)
        ttk.Button(cmd_buttons, text=caption, command=callback, width=12).grid(
            row=grid_row, column=grid_column, padx=6, pady=2, sticky="w"
        )

    # JSON tab: each button switches the send mode to "json" before acting.
    json_actions = (
        (
            "Send Target (JSON)",
            lambda: (self.send_mode_var.set("json"), self._on_send_target()),
        ),
        (
            "Reset (JSON)",
            lambda: (
                self.send_mode_var.set("json"),
                self._on_send_reset_button(),
            ),
        ),
        # Sends a single JSON that zeroes all target IDs (0..31 per the
        # original comment).
        (
            "Reset IDs",
            lambda: (self.send_mode_var.set("json"), self._on_send_reset_ids()),
        ),
    )
    for caption, callback in json_actions:
        ttk.Button(json_tab, text=caption, command=callback).pack(
            side=tk.TOP, anchor="w", padx=4, pady=4
        )

    # No explicit send-mode radio toggle: the CMD and JSON tabs each carry
    # their own send buttons, which set ``send_mode_var`` as needed.
|
|
|
|
def _create_notebook_tabs(self):
    """Populate ``self.notebook`` with every tab the window uses.

    Tabs are added in this order: Raw, Log, RIS, History, MFD, SAR, BIN,
    JSON. The widgets other methods refer to are stored on the instance:
    ``raw_tab_text``, ``log_tab``, ``scenario_tree``, ``ris_tree``,
    ``history_tree``, ``mfd_tab``, ``sar_tab``, ``bin_tab`` and ``json_tab``.
    """
    monospace = ("Consolas", 9)

    def add_text_tab(title, wrap):
        """Add a read-only scrolled text tab and return its text widget."""
        page = ttk.Frame(self.notebook)
        view = scrolledtext.ScrolledText(
            page, height=12, state=tk.DISABLED, wrap=wrap, font=monospace
        )
        view.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.notebook.add(page, text=title)
        return view

    self.raw_tab_text = add_text_tab("Raw", tk.NONE)
    self.log_tab = add_text_tab("Log", tk.WORD)

    # --- RIS tab: view controls above scenario (left) / targets (right) ----
    ris_page = ttk.Frame(self.notebook)

    toolbar = ttk.Frame(ris_page)
    toolbar.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(4, 2))
    ttk.Label(toolbar, text="View:").pack(side=tk.LEFT, padx=(2, 4))
    # Prefer the ttk option menu; fall back to the classic tk widget where
    # the ttk one fails (e.g. headless/test environments).
    try:
        self.scenario_view_mode_opt = ttk.OptionMenu(
            toolbar,
            self.scenario_view_mode,
            self.scenario_view_mode.get(),
            "simplified",
            "raw",
        )
        self.scenario_view_mode_opt.pack(side=tk.LEFT)
    except Exception:
        self.scenario_view_mode.set("simplified")
        self.scenario_view_mode_opt = tk.OptionMenu(
            toolbar, self.scenario_view_mode, "simplified", "raw"
        )
        self.scenario_view_mode_opt.pack(side=tk.LEFT)
    ttk.Label(toolbar, text="Decimals:").pack(side=tk.LEFT, padx=(12, 4))
    self.dec_spin = ttk.Spinbox(
        toolbar, from_=0, to=6, textvariable=self.simplified_decimals, width=4
    )
    self.dec_spin.pack(side=tk.LEFT)

    split = ttk.Panedwindow(ris_page, orient=tk.HORIZONTAL)
    split.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    scenario_pane = ttk.Frame(split)
    targets_pane = ttk.Frame(split)
    split.add(scenario_pane, weight=1)
    split.add(targets_pane, weight=3)

    # Scenario table (field/value pairs) with a vertical scrollbar.
    self.scenario_tree = ttk.Treeview(
        scenario_pane, columns=("field", "value"), show="headings", height=12
    )
    for column_id, title, width, stretch in (
        ("field", "Field", 160, False),
        ("value", "Value", 220, True),
    ):
        self.scenario_tree.heading(column_id, text=title)
        self.scenario_tree.column(column_id, width=width, anchor=tk.W, stretch=stretch)
    scenario_scroll = ttk.Scrollbar(
        scenario_pane, orient=tk.VERTICAL, command=self.scenario_tree.yview
    )
    self.scenario_tree.configure(yscrollcommand=scenario_scroll.set)
    scenario_scroll.pack(side=tk.RIGHT, fill=tk.Y)
    self.scenario_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5)

    # Targets table with vertical and horizontal scrollbars.
    # (column id, heading, width, anchor, stretch)
    target_columns = (
        ("index", "#", 40, tk.CENTER, False),
        ("flags", "flags", 60, tk.CENTER, False),
        ("heading", "heading", 80, tk.CENTER, False),
        ("x", "x", 120, tk.E, True),
        ("y", "y", 120, tk.E, True),
        ("z", "z", 100, tk.E, False),
    )
    cols = tuple(spec[0] for spec in target_columns)
    self.ris_tree = ttk.Treeview(targets_pane, columns=cols, show="headings", height=16)
    for column_id, title, width, anchor, stretch in target_columns:
        self.ris_tree.heading(column_id, text=title)
        self.ris_tree.column(column_id, width=width, anchor=anchor, stretch=stretch)
    vertical_bar = ttk.Scrollbar(
        targets_pane, orient=tk.VERTICAL, command=self.ris_tree.yview
    )
    # Test stubs (DummyTreeview) may lack ``xview``: fall back to ``yview``,
    # or to a no-op when neither exists.
    view_fallback = getattr(self.ris_tree, "yview", lambda *a, **k: None)
    horizontal_cmd = getattr(self.ris_tree, "xview", view_fallback)
    horizontal_bar = ttk.Scrollbar(
        targets_pane, orient=tk.HORIZONTAL, command=horizontal_cmd
    )
    # Likewise only wire ``xscrollcommand`` to a scrollbar that offers ``set``.
    if hasattr(horizontal_bar, "set"):
        horizontal_set = horizontal_bar.set
    else:
        horizontal_set = lambda *a, **k: None
    self.ris_tree.configure(
        yscrollcommand=vertical_bar.set, xscrollcommand=horizontal_set
    )
    vertical_bar.pack(side=tk.RIGHT, fill=tk.Y)
    horizontal_bar.pack(side=tk.BOTTOM, fill=tk.X)
    self.ris_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    self.notebook.add(ris_page, text="RIS")

    # --- History tab -------------------------------------------------------
    history_page = ttk.Frame(self.notebook)
    self.history_tree = ttk.Treeview(
        history_page,
        columns=("time", "flow", "tid", "size"),
        show="headings",
        height=8,
    )
    for column_id, title in (
        ("time", "Time"),
        ("flow", "Flow"),
        ("tid", "TID"),
        ("size", "Size"),
    ):
        self.history_tree.heading(column_id, text=title)
    self.history_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    history_buttons = ttk.Frame(history_page)
    history_buttons.pack(fill=tk.X, padx=5, pady=5)
    ttk.Button(history_buttons, text="Clear", command=self._on_clear_history).pack(
        side=tk.LEFT
    )
    ttk.Button(
        history_buttons, text="Settings", command=self._open_history_settings_dialog
    ).pack(side=tk.LEFT, padx=4)
    self.notebook.add(history_page, text="History")

    # --- Image tabs --------------------------------------------------------
    self.mfd_tab = self._create_image_tab("MFD")
    self.notebook.add(self.mfd_tab["frame"], text="MFD")
    self.sar_tab = self._create_image_tab("SAR")
    self.notebook.add(self.sar_tab["frame"], text="SAR")

    # --- Binary and JSON viewers -------------------------------------------
    self.bin_tab = add_text_tab("BIN", tk.NONE)
    self.json_tab = add_text_tab("JSON", tk.NONE)
|
|
|
|
def _create_script_sender_widgets(self, parent):
    """Build the one-line script sender: label, entry bound to ``script_var``, Send button."""
    row = ttk.Frame(parent, padding=4)
    row.pack(fill=tk.X)

    ttk.Label(row, text="Script:").pack(side=tk.LEFT, padx=(0, 4))
    # The entry takes whatever horizontal space is left in the row.
    ttk.Entry(row, textvariable=self.script_var, width=72).pack(
        side=tk.LEFT, fill=tk.X, expand=True
    )
    ttk.Button(row, text="Send", command=self._on_send_script).pack(
        side=tk.LEFT, padx=(6, 0)
    )
|
|
|
|
def _on_send_target(self):
    """Send the target described by the input fields over the shared communicator.

    A temporary ``Target`` with a single fly-to-point waypoint is built from
    the form values (velocity converted from knots with ``KNOTS_TO_FPS``).
    Depending on ``send_mode_var`` it is sent either as a JSON update
    payload or as a legacy ``tgtset`` command string. Outcomes are written
    to the log widget; invalid input and unexpected errors are additionally
    shown in a message box. Nothing is sent when the communicator is
    missing or closed.
    """
    comm = self.shared_communicator
    if not (comm and comm.is_open):
        self._log_to_widget("ERROR: Cannot send target, not connected.", "ERROR")
        messagebox.showerror(
            "Connection Error", "Communicator is not connected.", parent=self
        )
        return

    try:
        # Reading a tk variable raises TclError when the field text is invalid.
        target_id = self.tgt_id_var.get()
        range_nm = self.tgt_range_var.get()
        az_deg = self.tgt_az_var.get()
        alt_ft = self.tgt_alt_var.get()
        vel_kn = self.tgt_vel_var.get()
        hdg_deg = self.tgt_hdg_var.get()
        is_active = self.tgt_active_var.get()
        is_traceable = self.tgt_traceable_var.get()
        is_restart = self.tgt_restart_var.get()

        vel_fps = vel_kn * KNOTS_TO_FPS

        waypoint = Waypoint(
            maneuver_type=ManeuverType.FLY_TO_POINT,
            target_range_nm=range_nm,
            target_azimuth_deg=az_deg,
            target_altitude_ft=alt_ft,
            target_velocity_fps=vel_fps,
            target_heading_deg=hdg_deg,
        )
        temp_target = Target(target_id=target_id, trajectory=[waypoint])
        # Flags first, then the current state mirrored from the waypoint.
        for attribute, setting in (
            ("active", is_active),
            ("traceable", is_traceable),
            ("restart", is_restart),
            ("current_range_nm", range_nm),
            ("current_azimuth_deg", az_deg),
            ("current_velocity_fps", vel_fps),
            ("current_heading_deg", hdg_deg),
            ("current_altitude_ft", alt_ft),
        ):
            setattr(temp_target, attribute, setting)

        if self.send_mode_var.get() == "json":
            # JSON path: failures are contained here and only logged.
            try:
                json_payload = command_builder.build_json_update([temp_target])
                # Keep a debug copy when the communicator offers the helper.
                if hasattr(comm, "_save_json_payload_to_temp"):
                    try:
                        comm._save_json_payload_to_temp(json_payload, "sfp_debug_send")
                    except Exception:
                        pass
                if not json_payload.endswith("\n"):
                    json_payload = json_payload + "\n"
                self._log_to_widget(
                    f"Built JSON payload for target {target_id}.", "INFO"
                )
                if comm._send_single_command(json_payload):
                    outcome = f"Successfully sent JSON payload for target {target_id}."
                    level = "INFO"
                else:
                    outcome = f"Failed to send JSON payload for target {target_id}."
                    level = "ERROR"
                self._log_to_widget(outcome, level)
            except Exception:
                self.logger.exception(
                    "Failed to build/send JSON payload in debug window"
                )
                self._log_to_widget(
                    "ERROR: Failed to build/send JSON payload.", "ERROR"
                )
        else:
            # Legacy path: build the textual command, then normalise it so
            # the server gets the leading '$' and trailing newline it
            # expects. The log shows the payload exactly as sent.
            command_str = self._ensure_legacy_prefixed(
                command_builder.build_tgtset_from_target_state(
                    temp_target, include_flags=True
                )
            )
            self._log_to_widget(f"Built command: {command_str!r}", "INFO")

            if comm._send_single_command(command_str):
                outcome = f"Successfully sent command for target {target_id}."
                level = "INFO"
            else:
                outcome = f"Failed to send command for target {target_id}."
                level = "ERROR"
            self._log_to_widget(outcome, level)

    except (ValueError, tk.TclError) as e:
        error_msg = f"Invalid input value: {e}"
        self._log_to_widget(f"ERROR: {error_msg}", "ERROR")
        messagebox.showerror("Input Error", error_msg, parent=self)
    except Exception as e:
        self.logger.exception("An unexpected error occurred in _on_send_target.")
        self._log_to_widget(f"ERROR: {e}", "CRITICAL")
        messagebox.showerror("Unexpected Error", str(e), parent=self)
|
|
|
|
def _create_image_tab(self, title: str) -> Dict:
    """Create the widgets for one image tab (not yet added to the notebook).

    The tab holds a fixed-size square image area showing a placeholder label
    and, below it, a read-only hex view.

    Args:
        title: Flow name used in the placeholder text.

    Returns:
        A dict with the keys ``frame``, ``image_label``, ``hex_view`` and
        ``image_container``.
    """
    tab_frame = ttk.Frame(self.notebook)

    side = self.image_area_size
    holder = ttk.Frame(tab_frame, width=side, height=side, relief=tk.SUNKEN)
    holder.pack(pady=5, padx=5)
    # Keep the configured size instead of shrinking to fit the label.
    holder.pack_propagate(False)

    placeholder = ttk.Label(holder, text=f"Waiting for {title}...", anchor=tk.CENTER)
    placeholder.pack(fill=tk.BOTH, expand=True)

    hex_dump = scrolledtext.ScrolledText(
        tab_frame, height=8, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9)
    )
    hex_dump.pack(fill=tk.BOTH, expand=True, pady=5, padx=5)

    return dict(
        frame=tab_frame,
        image_label=placeholder,
        hex_view=hex_dump,
        image_container=holder,
    )
|
|
|
|
def _open_image_size_dialog(self):
    """Open a modal dialog for changing the size of the image areas.

    On Save the entered value must be a positive integer; otherwise an error
    box is shown and the dialog stays open. A valid value is applied to the
    MFD and SAR image containers and, when the master exposes a
    ``config_manager``, stored under ``image_display.size`` in the general
    settings. A failure while persisting is logged and does not keep the
    dialog open, since the new size is already in effect.
    """
    dlg = tk.Toplevel(self)
    dlg.title("Image Size")
    dlg.transient(self)
    dlg.grab_set()  # modal: input is confined to the dialog until it closes
    ttk.Label(dlg, text="Image area size (px):").pack(padx=10, pady=(10, 2))
    size_var = tk.StringVar(value=str(self.image_area_size))
    entry = ttk.Entry(dlg, textvariable=size_var, width=8)
    entry.pack(padx=10, pady=(0, 10))
    btn_frame = ttk.Frame(dlg)
    btn_frame.pack(padx=10, pady=(0, 10))

    def on_save():
        """Validate, apply and persist the new size, then close the dialog."""
        try:
            v = int(size_var.get())
            if v <= 0:
                raise ValueError("image size must be positive")
        except (ValueError, tk.TclError):
            messagebox.showerror(
                "Invalid value",
                "Please enter a positive integer for image size.",
                parent=dlg,
            )
            return

        self.image_area_size = v
        # The tabs may not exist yet, hence the getattr defaults.
        for tab in (getattr(self, "mfd_tab", None), getattr(self, "sar_tab", None)):
            if tab and "image_container" in tab:
                tab["image_container"].config(width=v, height=v)

        gm = getattr(self.master, "config_manager", None)
        if gm:
            try:
                general = gm.get_general_settings() or {}
                # ``or {}`` also covers a stored null for this section.
                image_display = general.get("image_display") or {}
                image_display["size"] = v
                general["image_display"] = image_display
                gm.save_general_settings(general)
            except Exception:
                # Persisting is best-effort; never leave the modal grab held.
                self.logger.exception(
                    "Could not persist image size to general settings"
                )
        dlg.destroy()

    def on_cancel():
        """Close the dialog without changing anything."""
        dlg.destroy()

    ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack(
        side=tk.RIGHT, padx=(0, 5)
    )
    ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT)
|
|
|
|
def _on_toggle_connect(self):
    """Toggle the shared communicator between connected and disconnected.

    When the communicator is open it is disconnected. Otherwise the IP and
    the two port fields are read from the UI and passed to ``connect`` as a
    dict with the keys ``ip``, ``port`` and ``local_port``. Invalid field
    values are reported in the log tab and in an error dialog.
    """
    comm = self.shared_communicator
    if not comm:
        self._log_to_widget("ERROR: No shared communicator available.", "ERROR")
        messagebox.showerror(
            "Error", "No shared communicator available.", parent=self
        )
        return

    if comm.is_open:
        comm.disconnect()
        return

    try:
        # int() raises ValueError on non-numeric port text.
        settings = {
            "ip": self.ip_var.get(),
            "port": int(self.server_port_var.get()),
            "local_port": int(self.local_port_var.get()),
        }
        comm.connect(settings)
    except (ValueError, tk.TclError) as exc:
        self._log_to_widget(f"ERROR: Invalid connection settings: {exc}", "ERROR")
        messagebox.showerror(
            "Input Error", f"Invalid connection settings: {exc}", parent=self
        )
|
|
|
|
def _on_send_probe(self):
    """Send a single UDP probe datagram to the address entered in the UI.

    The datagram ``b"SFP_PROBE\\n"`` is sent from a temporary socket, so the
    shared communicator is not involved. The outcome is written to the log
    tab; nothing is returned.
    """
    ip = self.ip_var.get()
    try:
        port = int(self.server_port_var.get())
    except Exception:
        self._log_to_widget("ERROR: Invalid port number for probe.", "ERROR")
        return

    probe_payload = b"SFP_PROBE\n"
    try:
        # The context manager closes the socket even when sendto() raises,
        # which the previous explicit close() call did not guarantee.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(1.0)
            sock.sendto(probe_payload, (ip, port))
        self._log_to_widget(f"Sent probe to {ip}:{port}", "INFO")
    except Exception as e:
        self._log_to_widget(f"Failed to send probe to {ip}:{port}: {e}", "ERROR")
|
|
|
|
def _on_send_script(self):
    """Send the text of the script entry field through the communicator.

    The text is normalised with ``_ensure_legacy_prefixed`` (whitespace
    stripped, trailing newline added). A blank entry is rejected instead of
    being sent, and the result reported by the communicator is written to
    the log tab. Nothing is returned.
    """
    if not self.shared_communicator or not self.shared_communicator.is_open:
        self._log_to_widget("ERROR: Cannot send script, not connected.", "ERROR")
        return

    try:
        command_str = self._ensure_legacy_prefixed(self.script_var.get())
        if not command_str:
            # Normalisation returns an empty string for blank input.
            self._log_to_widget(
                "ERROR: Cannot send an empty script command.", "ERROR"
            )
            return
        success = self.shared_communicator._send_single_command(command_str)
    except (ValueError, tk.TclError) as e:
        self._log_to_widget(
            f"ERROR: Invalid input for script sending: {e}", "ERROR"
        )
        return

    # Report the outcome the same way _on_send_simple_command does.
    if success:
        self._log_to_widget(f"Successfully sent command: {command_str}", "INFO")
    else:
        self._log_to_widget(f"Failed to send command: {command_str}", "ERROR")
|
|
|
|
def _on_send_reset_button(self):
    """Send a reset using the currently selected send mode.

    In JSON mode a single ``{"CMD":"reset"}`` line is sent. In any other
    mode the two legacy text lines ``mex.t_rows=80`` and ``tgtset /-s`` are
    sent, each newline-terminated; both are always attempted.

    Returns:
        The result of ``_on_send_simple_command`` in JSON mode, ``True`` in
        legacy mode only when both lines were sent successfully, and
        ``False`` when not connected or when an exception occurred.
    """
    comm = self.shared_communicator
    if not (comm and comm.is_open):
        self._log_to_widget("ERROR: Cannot send reset, not connected.", "ERROR")
        messagebox.showerror(
            "Connection Error", "Communicator is not connected.", parent=self
        )
        return False

    try:
        if self.send_mode_var.get() != "json":
            self._log_to_widget("Sending legacy reset sequence...", "INFO")
            legacy_lines = ("mex.t_rows=80\n", "tgtset /-s\n")
            # Build the full list first so the second line is sent even
            # when the first one fails.
            outcomes = [self._on_send_simple_command(line) for line in legacy_lines]
            return all(outcomes)

        self._log_to_widget("Sending JSON reset payload...", "INFO")
        return self._on_send_simple_command('{"CMD":"reset"}' + "\n")
    except Exception:
        self.logger.exception("Failed to send reset command from debug window")
        self._log_to_widget("ERROR: Failed to send reset.", "ERROR")
        return False
|
|
|
|
def _on_send_reset_ids(self):
    """Send zeroed, inactive definitions for target ids 0..31 as JSON.

    This is a client-side "hard reset" for servers without a dedicated
    reset command. The targets are serialised with
    ``command_builder.build_json_update`` in compact form and split into as
    few payloads as fit the transport limit. All batches are planned before
    anything is sent, and every batch is sized individually because higher
    target ids serialise to longer text than lower ones.

    Returns:
        ``True`` when every payload was sent successfully, ``False`` when
        not connected, when a single target does not fit the limit, when
        any send failed or when an exception occurred.
    """
    if not self.shared_communicator or not self.shared_communicator.is_open:
        self._log_to_widget("ERROR: Cannot send reset IDs, not connected.", "ERROR")
        messagebox.showerror(
            "Connection Error", "Communicator is not connected.", parent=self
        )
        return False

    import json as _json

    # Transport limit for one command, in UTF-8 bytes. The terminating
    # newline is counted as well (conservative assumption - confirm against
    # the transport implementation).
    MAX_BYTES = 1020

    def build_compact_payload(tlist):
        """Return the newline-terminated compact JSON text for ``tlist``."""
        s = command_builder.build_json_update(tlist)
        try:
            compact = _json.dumps(_json.loads(s), separators=(",", ":"))
        except Exception:
            # Fallback: at least drop the line breaks.
            compact = s.replace("\n", "").replace("\r", "")
        if not compact.endswith("\n"):
            compact = compact + "\n"
        return compact

    try:
        targets = []
        for tid in range(32):
            # Minimal single-waypoint trajectory with every value at zero.
            wp = Waypoint(
                maneuver_type=ManeuverType.FLY_TO_POINT,
                target_range_nm=0.0,
                target_azimuth_deg=0.0,
                target_altitude_ft=0.0,
                target_velocity_fps=0.0,
                target_heading_deg=0.0,
            )
            t = Target(target_id=tid, trajectory=[wp])
            # Ensure all flags/fields are off/zero.
            t.active = False
            t.traceable = False
            t.restart = False
            t.current_range_nm = 0.0
            t.current_azimuth_deg = 0.0
            t.current_velocity_fps = 0.0
            t.current_heading_deg = 0.0
            t.current_altitude_ft = 0.0
            targets.append(t)

        # Plan the batches: (first index, last index, payload text).
        total = len(targets)
        batches = []
        start = 0
        batch_size = total
        while start < total:
            # Begin with the previous batch size and shrink until it fits.
            batch_size = min(batch_size, total - start)
            payload = ""
            while batch_size > 0:
                payload = build_compact_payload(targets[start : start + batch_size])
                if len(payload.encode("utf-8")) <= MAX_BYTES:
                    break
                batch_size -= 1

            if batch_size == 0:
                self._log_to_widget(
                    "ERROR: Cannot fit even a single target into transport limit.",
                    "ERROR",
                )
                return False

            batches.append((start, start + batch_size - 1, payload))
            start += batch_size

        overall_ok = True
        first_payload_saved = False
        can_save = hasattr(self.shared_communicator, "_save_json_payload_to_temp")
        for first, last, payload in batches:
            # Best effort: keep one payload on disk for debugging.
            if can_save and not first_payload_saved:
                try:
                    self.shared_communicator._save_json_payload_to_temp(
                        payload, f"sfp_debug_reset_ids_part_{first}"
                    )
                    first_payload_saved = True
                except Exception:
                    self.logger.debug(
                        "Could not save Reset IDs debug payload.", exc_info=True
                    )

            self._log_to_widget(
                f"Sending Reset IDs JSON payload part {first}-{last}...", "INFO"
            )
            ok = self.shared_communicator._send_single_command(payload)
            overall_ok = overall_ok and bool(ok)
            if not ok:
                self._log_to_widget(
                    f"Failed to send Reset IDs payload part {first}-{last}.", "ERROR"
                )

        if overall_ok:
            self._log_to_widget(
                "Successfully sent all Reset IDs JSON payloads.", "INFO"
            )
        else:
            self._log_to_widget(
                "One or more Reset IDs JSON payloads failed.", "ERROR"
            )
        return overall_ok
    except Exception:
        self.logger.exception("Failed to build/send Reset IDs payload")
        self._log_to_widget("ERROR: Failed to send Reset IDs.", "ERROR")
        return False
|
|
|
|
def _on_send_simple_command(self, command_str: str):
    """Normalise one command string, send it and log the outcome.

    Args:
        command_str: Command text; it is passed through
            ``_ensure_legacy_prefixed`` before being sent.

    Returns:
        The value returned by the communicator's ``_send_single_command``,
        or ``False`` when not connected or when an exception occurred.
    """
    comm = self.shared_communicator
    if not (comm and comm.is_open):
        self._log_to_widget("ERROR: Cannot send command, not connected.", "ERROR")
        messagebox.showerror(
            "Connection Error", "Communicator is not connected.", parent=self
        )
        return False

    try:
        # Normalisation strips whitespace and appends the trailing newline.
        command_str = self._ensure_legacy_prefixed(command_str)
        sent_ok = self.shared_communicator._send_single_command(command_str)
        if not sent_ok:
            self._log_to_widget(f"Failed to send command: {command_str}", "ERROR")
        else:
            self._log_to_widget(f"Successfully sent command: {command_str}", "INFO")
        return sent_ok
    except Exception as exc:
        self.logger.exception("Unexpected error in _on_send_simple_command")
        self._log_to_widget(f"ERROR: {exc}", "ERROR")
        return False
|
|
|
|
def _on_send_tgtset(self):
    """Send a tgtset for the current fields by delegating to ``_on_send_target``.

    Returns:
        ``True`` when ``_on_send_target`` returned without raising, and
        ``False`` (after logging the traceback) when it raised. ``True``
        therefore only means that no exception escaped the delegate.
    """
    try:
        self._on_send_target()
    except Exception:
        self.logger.exception("Failed while sending tgtset")
        return False
    return True
|
|
|
|
def _update_toggle_state(self, connected: bool):
    """Set the connect button caption to match the connection state.

    The caption becomes "Disconnect" when ``connected`` is truthy and
    "Connect" otherwise. Any error while updating the button is ignored.
    """
    try:
        caption = "Disconnect" if connected else "Connect"
        self.connect_toggle_btn.config(text=caption)
    except Exception:
        # Best effort: the caption is purely cosmetic.
        pass
|
|
|
|
def update_toggle_state(self, connected: bool):
    """Public wrapper that forwards to ``_update_toggle_state``."""
    self._update_toggle_state(connected=connected)
|
|
|
|
def _process_latest_payloads(self):
    """Poll the payload router once, refresh the tabs and re-arm the timer.

    The newest payload of every flow is fetched from the router and handed
    to the matching display routine, then the latest raw packet (if any) is
    shown and the history list refreshed. A failure while displaying one
    flow is logged and does not affect the other flows, so the method
    always reaches the final ``after`` call and polling never stops because
    of a bad payload.
    """
    if not self.payload_router:
        self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
        return

    try:
        new_payloads = self.payload_router.get_and_clear_latest_payloads()
    except Exception:
        self.logger.exception("Error while fetching latest payloads from router")
        new_payloads = {}

    if new_payloads:
        for flow_id, payload in new_payloads.items():
            try:
                self._dispatch_flow_payload(flow_id, payload)
            except Exception:
                self.logger.exception(
                    "Failed to display payload for flow %r.", flow_id
                )

    try:
        raw_pkt = self.payload_router.get_and_clear_raw_packet()
        if raw_pkt:
            raw_bytes, addr = raw_pkt
            self._display_raw_packet(raw_bytes, addr)
            self._refresh_history_tree()
    except Exception:
        self.logger.exception("Error while fetching raw packet from router")

    self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)


def _dispatch_flow_payload(self, flow_id, payload):
    """Route one payload to the display routine of its flow; unknown flows are ignored."""
    if flow_id == "MFD" and _IMAGE_LIBS_AVAILABLE:
        self._display_image_data(payload, self.mfd_tab, "mfd_photo")
    elif flow_id == "SAR" and _IMAGE_LIBS_AVAILABLE:
        self._display_image_data(payload, self.sar_tab, "sar_photo")
    elif flow_id == "BIN":
        self._display_hex_data(payload, self.bin_tab)
    elif flow_id == "JSON":
        self._display_json_data(payload, self.json_tab)
    elif flow_id == "RIS_STATUS_JSON":
        self._update_ris_tables(payload)


def _update_ris_tables(self, payload):
    """Rebuild the scenario and target tables from a RIS status payload.

    ``payload`` is either UTF-8 encoded JSON bytes or an already decoded
    mapping with the optional keys ``scenario`` and ``targets``. Errors are
    logged and swallowed.
    """
    try:
        import json

        struct = (
            json.loads(payload.decode("utf-8"))
            if isinstance(payload, (bytes, bytearray))
            else payload
        )

        # Respect the user's selected view mode and decimals.
        simplified = self.scenario_view_mode.get() == "simplified"
        decimals = self.simplified_decimals.get()

        # 1. Scenario table.
        for iid in self.scenario_tree.get_children():
            self.scenario_tree.delete(iid)
        scenario = struct.get("scenario", {})
        if scenario:
            for field_name, value in scenario.items():
                if simplified:
                    display_value = self._format_ris_scenario_value(
                        field_name, value, decimals
                    )
                else:
                    display_value = value
                # Lists are always shown in Python notation, in both modes.
                if isinstance(display_value, list):
                    display_value = str(display_value)
                self.scenario_tree.insert(
                    "", tk.END, values=(field_name, display_value)
                )

        # 2. Target table (all targets).
        for iid in self.ris_tree.get_children():
            self.ris_tree.delete(iid)
        for index, target in enumerate(struct.get("targets", [])):
            row = self._format_ris_target_row(index, target, simplified, decimals)
            self.ris_tree.insert("", tk.END, values=row)
    except Exception:
        self.logger.exception("Failed to update RIS tables from JSON payload.")


def _format_ris_scenario_value(self, field_name, value, decimals):
    """Return the simplified-view text of one scenario field.

    Angle fields are multiplied by 180/pi, velocity and altitude fields by
    3.28084, latitude/longitude are rendered as degrees/minutes/seconds and
    ``mode`` gets its name appended. Values that cannot be converted fall
    back to ``str(value)``; all other fields are returned unchanged.
    """
    factor = 3.28084  # multiplier used for the "ft" and "ft/s" displays

    def decimal_deg_to_dms(deg, is_lat):
        magnitude = abs(deg)
        degrees = int(magnitude)
        minutes_full = (magnitude - degrees) * 60
        minutes = int(minutes_full)
        seconds = (minutes_full - minutes) * 60
        if is_lat:
            direction = "N" if deg >= 0 else "S"
        else:
            direction = "E" if deg >= 0 else "W"
        return f"{degrees}° {minutes}' {seconds:.2f}\" {direction}"

    try:
        if field_name in ("platform_azimuth", "true_heading", "ant_nav_az", "ant_nav_el"):
            return f"{value * 180.0 / math.pi:.{decimals}f}°"
        if field_name in ("vx", "vy", "vz"):
            return f"{value * factor:.{decimals}f} ft/s"
        if field_name == "baro_altitude":
            return f"{value * factor:.{decimals}f} ft"
        if field_name in ("latitude", "longitude"):
            return decimal_deg_to_dms(value, is_lat=(field_name == "latitude"))
    except Exception:
        return str(value)

    # The lower bound keeps a negative mode from indexing the name list
    # from its end.
    if (
        field_name == "mode"
        and isinstance(value, int)
        and 0 <= value < len(self._master_mode_names)
    ):
        return f"{value} ({self._master_mode_names[value].replace('_master_mode', '')})"
    return value


def _format_ris_target_row(self, index, target, simplified, decimals):
    """Return the ``(index, flags, heading, x, y, z)`` display tuple of one target.

    In simplified view the heading is multiplied by 180/pi and the
    positions by 3.28084; when such a conversion fails, or in raw view, the
    values are formatted as plain fixed-point numbers.
    """
    flags_display = f"0x{target.get('flags', 0):X}"

    heading = None
    positions = None
    if simplified:
        try:
            heading = f"{target.get('heading', 0.0) * 180.0 / math.pi:.{decimals}f}°"
        except Exception:
            heading = None
        try:
            positions = [
                f"{target.get(axis, 0.0) * 3.28084:.{decimals}f} ft"
                for axis in ("x", "y", "z")
            ]
        except Exception:
            positions = None

    if heading is None:
        heading = f"{target.get('heading', 0.0):.6f}"
    if positions is None:
        positions = [f"{target.get(axis, 0.0):.3f}" for axis in ("x", "y", "z")]

    return (index, flags_display, heading, positions[0], positions[1], positions[2])
|
|
|
|
def _refresh_history_tree(self):
    """Rebuild the history list from the router's packet history.

    Entries are inserted newest first (the router's history is iterated in
    reverse), each showing the time stamp with millisecond precision, the
    flow name, the TID and the raw packet size. Without a payload router
    nothing happens. Failures are logged instead of being silently dropped.
    """
    try:
        if not self.payload_router:
            return

        hist = self.payload_router.get_history()
        self.history_tree.delete(*self.history_tree.get_children())
        for entry in reversed(hist):
            # "%f" yields microseconds; drop the last three digits.
            ts = entry["ts"].strftime("%H:%M:%S.%f")[:-3]
            flow_name = entry.get("flow_name", "")
            tid = entry.get("tid", "")
            size = len(entry.get("raw", b""))
            self.history_tree.insert(
                "", tk.END, values=(ts, flow_name, tid, f"{size}B")
            )
    except Exception:
        self.logger.exception("Failed to refresh the packet history view")
|
|
|
|
def _on_history_select(self, event=None):
    """Show the raw packet that belongs to the selected history row.

    The history list is displayed newest first, so the row position is
    mirrored to find the matching entry in the router's history. Nothing
    happens without a selection or when the mirrored index is out of range.
    Errors are logged.
    """
    try:
        selection = self.history_tree.selection()
        if not selection:
            return

        row_ids = list(self.history_tree.get_children())
        row = row_ids.index(selection[0])

        history = self.payload_router.get_history()
        # Row 0 is the newest entry, i.e. the last element of the history.
        position = len(history) - 1 - row
        if position < 0 or position >= len(history):
            return

        picked = history[position]
        self._display_raw_packet(picked["raw"], picked["addr"])
    except Exception:
        self.logger.exception("Error during history selection handling")
|
|
|
|
def _on_clear_history(self):
    """Clear the router's packet history and refresh the history list."""
    router = self.payload_router
    if router:
        router.clear_history()
    # The refresh does nothing when there is no router, so it can be called
    # unconditionally.
    self._refresh_history_tree()
|
|
|
|
def _open_history_settings_dialog(self):
    """Open a modal dialog for the packet-history settings.

    The dialog edits the history size, the "persist raw packets" switch and
    the level of the payload router's logger. On "Save" the values are
    applied to the payload router (when present) and, if the master exposes
    a ``config_manager``, stored in the general settings under
    ``sfp_debug`` and ``logger_panel.saved_levels``; the logger level is
    also applied immediately in that case.
    """
    router_logger_name = "target_simulator.gui.payload_router"
    level_names = ["NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]

    dialog = tk.Toplevel(self)
    dialog.title("History Settings")
    dialog.transient(self)
    dialog.grab_set()

    # Defaults used when no payload router is attached.
    if self.payload_router:
        hist_size = self.payload_router._history_size
    else:
        hist_size = 20
    if self.payload_router:
        persist = self.payload_router._persist
    else:
        persist = False

    ttk.Label(dialog, text="History size (entries):").pack(padx=10, pady=(10, 2))
    size_text = tk.StringVar(value=str(hist_size))
    ttk.Entry(dialog, textvariable=size_text, width=8).pack(padx=10, pady=(0, 10))

    persist_flag = tk.BooleanVar(value=bool(persist))
    ttk.Checkbutton(
        dialog, text="Persist raw packets to Temp/", variable=persist_flag
    ).pack(padx=10, pady=(0, 10))

    # Allow adjusting the payload router logger level from this dialog.
    try:
        router_logger = logging.getLogger(router_logger_name)
        current_level = logging.getLevelName(router_logger.getEffectiveLevel())
    except Exception:
        current_level = "INFO"

    ttk.Label(dialog, text="Payload Router logger level:").pack(padx=10, pady=(4, 2))
    level_text = tk.StringVar(value=current_level)
    ttk.Combobox(
        dialog,
        values=level_names,
        textvariable=level_text,
        state="readonly",
        width=12,
    ).pack(padx=10, pady=(0, 8))

    buttons = ttk.Frame(dialog)
    buttons.pack(padx=10, pady=(0, 10), fill=tk.X)

    def store_logger_level(general):
        """Record the chosen level in ``general`` and apply it (best effort)."""
        try:
            chosen = level_text.get()
            if not chosen:
                return

            panel = general.get("logger_panel", {})
            if not isinstance(panel, dict):
                panel = {}
            saved_levels = panel.get("saved_levels", {})
            if not isinstance(saved_levels, dict):
                saved_levels = {}

            saved_levels[router_logger_name] = chosen
            panel["saved_levels"] = saved_levels
            general["logger_panel"] = panel

            # Apply immediately; getLevelName maps a known name to its number.
            try:
                numeric = logging.getLevelName(chosen)
                if isinstance(numeric, int):
                    logging.getLogger(router_logger_name).setLevel(numeric)
            except Exception:
                pass
        except Exception:
            pass

    def apply_and_close():
        try:
            new_size = int(size_text.get())
            if new_size <= 0:
                raise ValueError()
        except Exception:
            messagebox.showerror(
                "Invalid value",
                "Please enter a positive integer for history size.",
                parent=dialog,
            )
            return

        if self.payload_router:
            self.payload_router.set_history_size(new_size)
            self.payload_router.set_persist(bool(persist_flag.get()))

        config_manager = getattr(self.master, "config_manager", None)
        if config_manager:
            general = config_manager.get_general_settings() or {}

            debug_cfg = general.get("sfp_debug", {})
            debug_cfg["history_size"] = new_size
            debug_cfg["persist_raw"] = bool(persist_flag.get())
            general["sfp_debug"] = debug_cfg

            store_logger_level(general)
            config_manager.save_general_settings(general)

        dialog.destroy()

    ttk.Button(buttons, text="Cancel", command=dialog.destroy).pack(
        side=tk.RIGHT, padx=(0, 5)
    )
    ttk.Button(buttons, text="Save", command=apply_and_close).pack(side=tk.RIGHT)
|
|
|
|
def _display_raw_packet(self, raw_bytes: bytes, addr: tuple):
    """Render one raw SFP packet into the raw-packet text view.

    The header fields are listed in two columns, followed by a line that
    names the set bits of ``SFP_FLAGS`` and a hex dump of the body. When
    the packet cannot be decoded, the error and a hex dump of the whole
    packet are shown instead.

    Args:
        raw_bytes: The complete packet, header included.
        addr: Sender address; it is only interpolated into the output.
    """
    header_fields = (
        "SFP_MARKER",
        "SFP_DIRECTION",
        "SFP_PROT_VER",
        "SFP_PT_SPARE",
        "SFP_TAG",
        "SFP_SRC",
        "SFP_FLOW",
        "SFP_TID",
        "SFP_FLAGS",
        "SFP_WIN",
        "SFP_ERR",
        "SFP_ERR_INFO",
        "SFP_TOTFRGAS",
        "SFP_FRAG",
        "SFP_RECTYPE",
        "SFP_RECSPARE",
        "SFP_PLDAP",
        "SFP_PLEXT",
        "SFP_RECCOUNTER",
        "SFP_PLSIZE",
        "SFP_TOTSIZE",
        "SFP_PLOFFSET",
    )
    # Name of each SFP_FLAGS bit, indexed by bit position.
    flag_names = (
        "ACQ_REQ",
        "RESENT",
        "TRAILER_ACK",
        "RESV3",
        "RESV4",
        "RESV5",
        "RESV6",
        "ERROR",
    )
    column_width = 36

    def show(text):
        """Replace the content of the read-only raw-packet view."""
        view = self.raw_tab_text
        view.config(state=tk.NORMAL)
        view.delete("1.0", tk.END)
        view.insert("1.0", text)
        view.config(state=tk.DISABLED)

    try:
        header_len = SFPHeader.size()
        if len(raw_bytes) < header_len:
            raise ValueError("Packet smaller than SFP header")
        header = SFPHeader.from_buffer_copy(raw_bytes)
        body = raw_bytes[header_len:]

        # One "NAME: value" cell per header field.
        cells = []
        for name in header_fields:
            value = getattr(header, name)
            shown = f"{value} (0x{value:X})" if isinstance(value, int) else str(value)
            cells.append(f"{name:12s}: {shown}")
        flags = header.SFP_FLAGS

        parts = [f"From {addr}\n\nSFP Header:\n\n"]
        # Pair the cells up; an odd trailing cell gets an empty right column.
        right_cells = cells[1::2] + [""]
        for left, right in zip(cells[0::2], right_cells):
            parts.append(f"{left:<{column_width}} {right}\n")

        markers = "".join(
            f" [{name if (flags >> bit) & 1 else ' '}]"
            for bit, name in enumerate(flag_names)
        )
        parts.append(f"SFP_FLAGS : {flags} (0x{flags:X}) " + markers + "\n")
        parts.append("\nBODY (hex):\n" + self._format_hex_dump(body))

        show("".join(parts))
    except Exception as e:
        show(
            f"Failed to format raw packet: {e}\n\nRaw dump:\n{self._format_hex_dump(raw_bytes)}"
        )
|
|
|
|
def _display_image_data(
    self, payload: bytearray, tab_widgets: Dict[str, Any], photo_attr: str
):
    """Decode an image payload and show it in the given image tab.

    The payload starts with an ``ImageLeaderData`` header followed by the
    pixel rows. The pixels are normalised to 8 bit, converted to RGB,
    scaled down to the label if necessary and displayed. On any error the
    label shows the error text instead and the previous image is removed.
    The hex view of the tab is refreshed with the payload in both cases.

    Args:
        payload: Raw payload bytes (header plus pixel data).
        tab_widgets: Widget dict as returned by ``_create_image_tab``.
        photo_attr: Attribute name under which the ``PhotoImage`` is kept
            on ``self`` so that it is not garbage-collected while shown.
    """
    try:
        if len(payload) < ctypes.sizeof(ImageLeaderData):
            raise ValueError("Payload smaller than ImageLeaderData header.")
        # from_buffer_copy also accepts read-only buffers such as bytes,
        # which from_buffer rejects.
        leader = ImageLeaderData.from_buffer_copy(payload)
        h, w, bpp, stride = (
            leader.HEADER_DATA.DY,
            leader.HEADER_DATA.DX,
            leader.HEADER_DATA.BPP,
            leader.HEADER_DATA.STRIDE,
        )
        offset = ctypes.sizeof(ImageLeaderData)
        if not (h > 0 and w > 0 and bpp in [1, 2] and stride >= w):
            raise ValueError(
                f"Invalid image dimensions (h={h}, w={w}, bpp={bpp}, stride={stride})"
            )
        expected_size = stride * h * bpp
        if (offset + expected_size) > len(payload):
            raise ValueError("Incomplete image data")

        # View the pixel rows in place; each row holds `stride` elements of
        # which only the first `w` are displayed.
        pixel_data_view = np.ndarray(
            shape=(h, stride),
            dtype=np.uint8 if bpp == 1 else np.uint16,
            buffer=payload,
            offset=offset,
        )
        image_data = pixel_data_view[:, :w]
        display_img_8bit = cv2.normalize(
            image_data, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U
        )
        img_pil = Image.fromarray(
            cv2.cvtColor(display_img_8bit, cv2.COLOR_GRAY2RGB)
        )
        resized = self._resize_pil_to_label(img_pil, tab_widgets["image_label"])
        photo = ImageTk.PhotoImage(image=resized)
        tab_widgets["image_label"].config(image=photo, text="")
        setattr(self, photo_attr, photo)
    except Exception as e:
        # tkinter ignores options whose value is None, so the image has to
        # be cleared with an empty string for the error text to replace it.
        tab_widgets["image_label"].config(
            image="", text=f"Error parsing image:\n{e}"
        )
        setattr(self, photo_attr, None)

    self._display_hex_data(payload, tab_widgets["hex_view"])
|
|
|
|
def _resize_pil_to_label(
    self, img: "Image.Image", label_widget: ttk.Label
) -> "Image.Image":
    """Shrink ``img`` so that it fits inside ``label_widget``.

    The aspect ratio is kept and images are never enlarged. The image is
    returned unchanged when the label reports a width or height of at most
    one pixel, when it already fits, or when anything goes wrong.
    """
    try:
        box_w = label_widget.winfo_width()
        box_h = label_widget.winfo_height()
        if min(box_w, box_h) <= 1:
            return img

        factor = min(box_w / img.width, box_h / img.height)
        if factor < 1.0:
            target_size = (int(img.width * factor), int(img.height * factor))
            return img.resize(target_size, Image.LANCZOS)
        return img
    except Exception:
        return img
|
|
|
|
def _display_hex_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
    """Replace the content of ``widget`` with a hex dump of ``payload``."""
    rendered = self._format_hex_dump(payload)

    # The widget is unlocked only for the duration of the update.
    widget.config(state=tk.NORMAL)
    widget.delete("1.0", tk.END)
    widget.insert("1.0", rendered)
    widget.config(state=tk.DISABLED)
|
|
|
|
def _display_json_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
    """Show ``payload`` in ``widget`` as indented JSON.

    The payload is decoded as UTF-8 and re-serialised with a two-space
    indent. When that fails, the error and a hex dump of the payload are
    shown instead.
    """
    import json

    try:
        parsed = json.loads(payload.decode("utf-8"))
        text = json.dumps(parsed, indent=2)
    except Exception as exc:
        text = f"--- FAILED TO PARSE JSON ---\n{exc}\n\n--- RAW HEX DUMP ---\n{self._format_hex_dump(payload)}"

    # The widget is unlocked only for the duration of the update.
    widget.config(state=tk.NORMAL)
    widget.delete("1.0", tk.END)
    widget.insert("1.0", text)
    widget.config(state=tk.DISABLED)
|
|
|
|
def _log_to_widget(self, message: str, level: str = "DEBUG"):
    """Write ``message`` to the application logger and to the log tab.

    Args:
        message: Text to log.
        level: Name of a logging level ("DEBUG", "INFO", "ERROR", ...). It
            tags the line in the log tab and selects the severity used for
            the application logger; unknown names are logged as INFO.
    """
    # getLevelName maps a registered level name to its number and returns a
    # string for anything it does not know.
    severity = logging.getLevelName(str(level).upper())
    if not isinstance(severity, int):
        severity = logging.INFO
    self.logger.log(severity, message)

    self.log_tab.config(state=tk.NORMAL)
    self.log_tab.insert(tk.END, f"[{level}] {message}\n")
    self.log_tab.config(state=tk.DISABLED)
    self.log_tab.see(tk.END)
|
|
|
|
def _ensure_legacy_prefixed(self, command: str) -> str:
    """Return ``command`` stripped and terminated by a single newline.

    Despite its name this method no longer adds a ``$`` prefix: JSON and
    legacy text commands are treated alike. Bytes are decoded as UTF-8; if
    that fails, or for any other non-string value, ``str(command)`` is
    used. Input that is empty after stripping yields an empty string.
    """
    text = None
    if isinstance(command, (bytes, bytearray)):
        try:
            text = command.decode("utf-8")
        except Exception:
            text = None
    if text is None:
        text = str(command)

    text = text.strip()
    # Stripping removed any trailing newline, so exactly one is appended.
    return f"{text}\n" if text else text
|
|
|
|
def _on_save_ris_csv(self):
    """Save the currently visible scenario and RIS tables to a CSV file.

    The file ``ris_data_<UTC timestamp>.csv`` is written to the ``Temp``
    directory two levels above this module's directory, which is created
    when missing. The scenario rows (if any) come first, followed by an
    empty row and the target rows. The result is reported in the log tab;
    errors are logged there as well instead of being raised.
    """
    try:
        import csv

        scenario_rows = [
            self.scenario_tree.item(iid, "values")
            for iid in self.scenario_tree.get_children()
        ]
        rows = [
            self.ris_tree.item(iid, "values")
            for iid in self.ris_tree.get_children()
        ]
        if not (scenario_rows or rows):
            self._log_to_widget("No RIS data to save.", "INFO")
            return

        temp_dir = os.path.join(
            os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")),
            "Temp",
        )
        os.makedirs(temp_dir, exist_ok=True)

        # Timezone-aware replacement for the deprecated utcnow(); the
        # formatted string is the same.
        ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%dT%H%M%S")
        path = os.path.join(temp_dir, f"ris_data_{ts}.csv")

        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if scenario_rows:
                writer.writerow(["Scenario Field", "Value"])
                writer.writerows(scenario_rows)
                writer.writerow([])
            writer.writerow(["#", "flags", "heading", "x", "y", "z"])
            writer.writerows(rows)

        self._log_to_widget(f"Saved RIS data to CSV: {path}", "INFO")
    except Exception as e:
        self._log_to_widget(f"Failed to save RIS CSV: {e}", "ERROR")
|
|
|
|
def _format_hex_dump(self, data: bytes, length=16) -> str:
    """Format ``data`` as a hex dump with ``length`` bytes per line.

    Every line consists of the chunk offset as eight hex digits, the bytes
    as two-digit upper-case hex values (padded to a fixed column width) and
    the printable ASCII rendering between ``|`` characters, with ``.``
    standing in for bytes outside 32..126. Empty input yields an empty
    string.
    """
    hex_width = length * 3

    def render(offset):
        chunk = data[offset : offset + length]
        hex_part = " ".join(format(byte, "02X") for byte in chunk)
        ascii_part = "".join(
            chr(byte) if 32 <= byte < 127 else "." for byte in chunk
        )
        return f"{offset:08X} {hex_part:<{hex_width}} |{ascii_part}|"

    return "\n".join(render(offset) for offset in range(0, len(data), length))
|