1734 lines
77 KiB
Python
1734 lines
77 KiB
Python
from target_simulator.gui.ppi_display import PPIDisplay
|
|
|
|
# target_simulator/gui/sfp_debug_window.py
|
|
"""
|
|
Provides a Toplevel window for debugging the SFP transport layer.
|
|
This version uses a sampling approach to handle high-frequency data streams
|
|
without overwhelming the GUI thread.
|
|
"""
|
|
|
|
import tkinter as tk
|
|
from tkinter import ttk, scrolledtext, messagebox
|
|
import logging
|
|
import threading
|
|
import collections
|
|
import datetime
|
|
import os
|
|
import ctypes
|
|
import time
|
|
import math
|
|
from typing import Dict, Callable, Optional, Any
|
|
import socket
|
|
|
|
# Third-party imports for image display
|
|
try:
|
|
from PIL import Image, ImageTk
|
|
import numpy as np
|
|
import cv2
|
|
|
|
_IMAGE_LIBS_AVAILABLE = True
|
|
except ImportError:
|
|
_IMAGE_LIBS_AVAILABLE = False
|
|
|
|
# Imports from the project structure
|
|
from target_simulator.core.sfp_transport import SfpTransport
|
|
from target_simulator.core.sfp_structures import (
|
|
ImageLeaderData,
|
|
SFPHeader,
|
|
SfpRisStatusPayload,
|
|
)
|
|
from target_simulator.gui.payload_router import DebugPayloadRouter
|
|
from target_simulator.core.models import (
|
|
Target,
|
|
Waypoint,
|
|
ManeuverType,
|
|
KNOTS_TO_FPS,
|
|
)
|
|
from target_simulator.core import command_builder
|
|
|
|
|
|
# default value for testing fdx protocolo
|
|
DEF_TEST_ID = 1
|
|
DEF_TEST_RANGE = 30.0
|
|
DEF_TEST_AZIMUTH = 10.0
|
|
DEF_TEST_VELOCITY = 300.0
|
|
DEF_TEST_HEADING = 0.0
|
|
DEF_TEST_ALTITUDE = 10000.0
|
|
|
|
|
|
class SfpDebugWindow(tk.Toplevel):
|
|
"""Top-level window for SFP debugging and payload inspection."""
|
|
|
|
GUI_POLL_INTERVAL_MS = 250
|
|
|
|
def __init__(self, master=None):
|
|
super().__init__(master)
|
|
self.master = master
|
|
try:
|
|
self.geometry("1100x700")
|
|
except Exception:
|
|
pass
|
|
self.logger = logging.getLogger(__name__)
|
|
self.payload_router = DebugPayloadRouter()
|
|
self.sfp_transport: Optional[SfpTransport] = None
|
|
self.image_area_size = 150
|
|
self._ppi_visible = False
|
|
|
|
# --- TK Variables ---
|
|
self.ip_var = tk.StringVar(value="127.0.0.1")
|
|
# Local port to bind the client socket (where server will send status)
|
|
self.local_port_var = tk.StringVar(value="60002")
|
|
# Server port where we send script/command messages
|
|
self.server_port_var = tk.StringVar(value="60001")
|
|
self.script_var = tk.StringVar(value="print('hello from client')")
|
|
|
|
# Variables for the new target sender UI
|
|
self.tgt_id_var = tk.IntVar(value=DEF_TEST_ID)
|
|
self.tgt_range_var = tk.DoubleVar(value=DEF_TEST_RANGE)
|
|
self.tgt_az_var = tk.DoubleVar(value=DEF_TEST_AZIMUTH)
|
|
self.tgt_alt_var = tk.DoubleVar(value=DEF_TEST_ALTITUDE)
|
|
self.tgt_vel_var = tk.DoubleVar(value=DEF_TEST_VELOCITY)
|
|
self.tgt_hdg_var = tk.DoubleVar(value=DEF_TEST_HEADING)
|
|
self.tgt_active_var = tk.BooleanVar(value=True)
|
|
self.tgt_traceable_var = tk.BooleanVar(value=True)
|
|
self.tgt_restart_var = tk.BooleanVar(value=False)
|
|
|
|
self._master_mode_names = [
|
|
"idle_master_mode",
|
|
"int_bit_master_mode",
|
|
"gm_master_mode",
|
|
"dbs_master_mode",
|
|
"rws_master_mode",
|
|
"vs_master_mode",
|
|
"acm_master_mode",
|
|
"tws_master_mode",
|
|
"sea_low_master_mode",
|
|
"sea_high_master_mode",
|
|
"gmti_master_mode",
|
|
"bcn_master_mode",
|
|
"sam_master_mode",
|
|
"ta_master_mode",
|
|
"wa_master_mode",
|
|
"stt_master_mode",
|
|
"dtt_master_mode",
|
|
"sstt_master_mode",
|
|
"acq_master_mode",
|
|
"ftt_master_mode",
|
|
"agr_master_mode",
|
|
"sar_master_mode",
|
|
"invalid_master_mode_",
|
|
"xtst_dummy_mode",
|
|
"xtst_hw_validation_mode",
|
|
"boot_master_mode",
|
|
"master_mode_id_cardinality_",
|
|
]
|
|
|
|
# --- UI Construction ---
|
|
self._create_widgets()
|
|
|
|
# Start the periodic GUI poll loop
|
|
try:
|
|
self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
|
|
except Exception:
|
|
pass
|
|
|
|
def _create_widgets(self):
|
|
# --- Top Controls Container ---
|
|
top_controls_frame = ttk.Frame(self)
|
|
top_controls_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5)
|
|
|
|
# --- Connection Frame ---
|
|
conn_frame = ttk.Frame(top_controls_frame)
|
|
conn_frame.pack(side=tk.TOP, fill=tk.X, pady=(0, 5))
|
|
self._create_connection_widgets(conn_frame)
|
|
|
|
# --- Simple Target Sender Frame ---
|
|
target_sender_frame = ttk.LabelFrame(
|
|
top_controls_frame, text="Simple Target Sender"
|
|
)
|
|
target_sender_frame.pack(side=tk.TOP, fill=tk.X, pady=(5, 5))
|
|
self._create_target_sender_widgets(target_sender_frame)
|
|
|
|
# --- Script Sender Frame (optional, can be removed if not needed) ---
|
|
script_frame = ttk.LabelFrame(top_controls_frame, text="Script to send")
|
|
script_frame.pack(side=tk.TOP, fill=tk.X, pady=(0, 5))
|
|
self._create_script_sender_widgets(script_frame)
|
|
|
|
# --- Data Display Notebook ---
|
|
self.notebook = ttk.Notebook(self)
|
|
self.notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
|
|
self._create_notebook_tabs()
|
|
|
|
# (PPI widget is created inside the RIS tab right pane; see _create_notebook_tabs)
|
|
|
|
def _toggle_ppi(self):
|
|
# Swap RIS table and RIS PPI container
|
|
try:
|
|
if self._ppi_visible:
|
|
# show table, hide ppi
|
|
self.ris_ppi_container.pack_forget()
|
|
self.ris_table_container.pack(fill=tk.BOTH, expand=True)
|
|
self._ppi_visible = False
|
|
self.ppi_toggle_btn.config(text="Show PPI Map")
|
|
else:
|
|
# hide table, show ppi
|
|
self.ris_table_container.pack_forget()
|
|
self.ris_ppi_container.pack(fill=tk.BOTH, expand=True)
|
|
self._ppi_visible = True
|
|
self.ppi_toggle_btn.config(text="Hide PPI Map")
|
|
except Exception:
|
|
# Fallback: if containers are missing, do nothing
|
|
self.logger.exception("Toggle PPI failed")
|
|
|
|
def update_ppi_targets(self, targets):
|
|
try:
|
|
if self.ris_ppi_widget:
|
|
self.ris_ppi_widget.update_targets(targets)
|
|
except Exception:
|
|
self.logger.exception("Failed to update RIS PPI targets")
|
|
|
|
def on_ris_status_update(self, ris_status_payload):
|
|
# Convert RIS targets to Target objects (minimal, for display)
|
|
targets = []
|
|
for i in range(getattr(ris_status_payload.tgt, "tgt", []).__len__()):
|
|
ris_tgt = ris_status_payload.tgt.tgt[i]
|
|
# Only show if valid/active (customize as needed)
|
|
if getattr(ris_tgt, "flags", 0) & 1:
|
|
t = Target(
|
|
target_id=i,
|
|
trajectory=[],
|
|
active=True,
|
|
traceable=True,
|
|
)
|
|
t.current_range_nm = (ris_tgt.x**2 + ris_tgt.y**2) ** 0.5 / 6076.12
|
|
try:
|
|
# Use atan2(y, x) so azimuth follows the conventional (x East, y North)
|
|
t.current_azimuth_deg = math.degrees(
|
|
math.atan2(ris_tgt.y, ris_tgt.x)
|
|
)
|
|
except Exception:
|
|
t.current_azimuth_deg = 0.0
|
|
t.current_altitude_ft = ris_tgt.z
|
|
t.current_heading_deg = getattr(ris_tgt, "heading", 0.0)
|
|
try:
|
|
self._log_to_widget(
|
|
f"RIS JSON heading raw[{i}]: {getattr(ris_tgt, 'heading', None)}",
|
|
"DEBUG",
|
|
)
|
|
except Exception:
|
|
pass
|
|
t.active = True
|
|
targets.append(t)
|
|
self.update_ppi_targets(targets)
|
|
|
|
def _create_connection_widgets(self, parent):
|
|
ttk.Label(parent, text="IP:").pack(side=tk.LEFT, padx=(4, 2))
|
|
ttk.Entry(parent, textvariable=self.ip_var, width=18).pack(
|
|
side=tk.LEFT, padx=(0, 6)
|
|
)
|
|
ttk.Label(parent, text="Local Port:").pack(side=tk.LEFT, padx=(0, 2))
|
|
ttk.Entry(parent, textvariable=self.local_port_var, width=8).pack(
|
|
side=tk.LEFT, padx=(0, 6)
|
|
)
|
|
ttk.Label(parent, text="Server Port:").pack(side=tk.LEFT, padx=(0, 2))
|
|
ttk.Entry(parent, textvariable=self.server_port_var, width=8).pack(
|
|
side=tk.LEFT, padx=(0, 6)
|
|
)
|
|
self.connect_btn = ttk.Button(parent, text="Connect", command=self._on_connect)
|
|
self.connect_btn.pack(side=tk.LEFT, padx=(0, 6))
|
|
self.disconnect_btn = ttk.Button(
|
|
parent, text="Disconnect", command=self._on_disconnect, state=tk.DISABLED
|
|
)
|
|
self.disconnect_btn.pack(side=tk.LEFT, padx=(0, 6))
|
|
self.send_probe_btn = ttk.Button(
|
|
parent, text="Send Probe", command=self._on_send_probe
|
|
)
|
|
self.send_probe_btn.pack(side=tk.LEFT, padx=(6, 4))
|
|
# Quick commands frame
|
|
quick_frame = ttk.Frame(parent)
|
|
quick_frame.pack(side=tk.RIGHT)
|
|
# Always prefix commands with '$' to satisfy server's mex parser
|
|
|
|
# Quick command buttons moved to the Simple Target Sender area
|
|
|
|
def _create_target_sender_widgets(self, parent):
|
|
grid = ttk.Frame(parent, padding=5)
|
|
grid.pack(fill=tk.X)
|
|
# Configure columns to have padding between them
|
|
grid.columnconfigure(1, pad=15)
|
|
grid.columnconfigure(3, pad=15)
|
|
grid.columnconfigure(5, pad=15)
|
|
|
|
# --- Column 0 ---
|
|
ttk.Label(grid, text="ID:").grid(row=0, column=0, sticky=tk.W)
|
|
ttk.Spinbox(grid, from_=0, to=15, textvariable=self.tgt_id_var, width=8).grid(
|
|
row=1, column=0, sticky=tk.W
|
|
)
|
|
|
|
# --- Column 1 ---
|
|
ttk.Label(grid, text="Range (NM):").grid(row=0, column=1, sticky=tk.W)
|
|
ttk.Spinbox(
|
|
grid, from_=0, to=500, textvariable=self.tgt_range_var, width=10
|
|
).grid(row=1, column=1, sticky=tk.W)
|
|
|
|
# --- Column 2 ---
|
|
ttk.Label(grid, text="Azimuth (°):").grid(row=0, column=2, sticky=tk.W)
|
|
ttk.Spinbox(
|
|
grid, from_=-180, to=180, textvariable=self.tgt_az_var, width=10
|
|
).grid(row=1, column=2, sticky=tk.W)
|
|
|
|
# --- Column 3 ---
|
|
ttk.Label(grid, text="Velocity (kn):").grid(row=0, column=3, sticky=tk.W)
|
|
ttk.Spinbox(
|
|
grid, from_=0, to=2000, textvariable=self.tgt_vel_var, width=10
|
|
).grid(row=1, column=3, sticky=tk.W)
|
|
|
|
# --- Column 4 ---
|
|
ttk.Label(grid, text="Heading (°):").grid(row=0, column=4, sticky=tk.W)
|
|
ttk.Spinbox(
|
|
grid, from_=0, to=360, textvariable=self.tgt_hdg_var, width=10
|
|
).grid(row=1, column=4, sticky=tk.W)
|
|
|
|
# --- Column 5 ---
|
|
ttk.Label(grid, text="Altitude (ft):").grid(row=0, column=5, sticky=tk.W)
|
|
ttk.Spinbox(
|
|
grid, from_=0, to=80000, textvariable=self.tgt_alt_var, width=12
|
|
).grid(row=1, column=5, sticky=tk.W)
|
|
|
|
# --- Column 6 (Controls) ---
|
|
controls_frame = ttk.Frame(grid)
|
|
controls_frame.grid(row=1, column=6, sticky="nsew", padx=(20, 0))
|
|
ttk.Checkbutton(
|
|
controls_frame, text="Active", variable=self.tgt_active_var
|
|
).pack(side=tk.LEFT, anchor="w")
|
|
ttk.Checkbutton(
|
|
controls_frame, text="Traceable", variable=self.tgt_traceable_var
|
|
).pack(side=tk.LEFT, anchor="w", padx=5)
|
|
ttk.Checkbutton(
|
|
controls_frame, text="Restart", variable=self.tgt_restart_var
|
|
).pack(side=tk.LEFT, anchor="w", padx=5)
|
|
|
|
send_button = ttk.Button(
|
|
controls_frame, text="Send Target", command=self._on_send_target
|
|
)
|
|
send_button.pack(side=tk.LEFT, padx=(10, 0))
|
|
# Small helper label to clarify that this debug button will include
|
|
# state qualifiers (/s /t /r) in the sent tgtset command.
|
|
ttk.Label(
|
|
controls_frame,
|
|
text="(debug: sends /s /t /r if set)",
|
|
foreground="#555555",
|
|
font=("Segoe UI", 8),
|
|
).pack(side=tk.LEFT, padx=(6, 0))
|
|
|
|
# --- Quick command buttons (moved here from connection frame) ---
|
|
quick_cmd_frame = ttk.Frame(parent)
|
|
quick_cmd_frame.pack(fill=tk.X, pady=(6, 0))
|
|
ttk.Button(
|
|
quick_cmd_frame,
|
|
text="tgtreset",
|
|
command=lambda: self._on_send_simple_command(
|
|
command_builder.build_tgtreset()
|
|
),
|
|
).pack(side=tk.LEFT, padx=4)
|
|
ttk.Button(
|
|
quick_cmd_frame,
|
|
text="pause",
|
|
command=lambda: self._on_send_simple_command(command_builder.build_pause()),
|
|
).pack(side=tk.LEFT, padx=4)
|
|
ttk.Button(
|
|
quick_cmd_frame,
|
|
text="continue",
|
|
command=lambda: self._on_send_simple_command(
|
|
command_builder.build_continue()
|
|
),
|
|
).pack(side=tk.LEFT, padx=4)
|
|
ttk.Button(
|
|
quick_cmd_frame, text="tgtset (cur)", command=lambda: self._on_send_tgtset()
|
|
).pack(side=tk.LEFT, padx=8)
|
|
# PPI toggle button (moved here)
|
|
self.ppi_toggle_btn = ttk.Button(
|
|
quick_cmd_frame, text="Show PPI Map", command=self._toggle_ppi
|
|
)
|
|
self.ppi_toggle_btn.pack(side=tk.RIGHT, padx=4)
|
|
|
|
def _create_script_sender_widgets(self, parent):
|
|
ttk.Label(parent, text="Script to send:").pack(side=tk.LEFT, padx=(5, 2))
|
|
ttk.Entry(parent, textvariable=self.script_var, width=60).pack(
|
|
side=tk.LEFT, expand=True, fill=tk.X, padx=(0, 5)
|
|
)
|
|
self.send_script_btn = ttk.Button(
|
|
parent, text="Send script", command=self._on_send_script
|
|
)
|
|
self.send_script_btn.pack(side=tk.LEFT, padx=5)
|
|
|
|
def _create_notebook_tabs(self):
|
|
# The implementation of tab creation is unchanged, so it's omitted for brevity
|
|
# but would be here in the full file.
|
|
self.log_tab = scrolledtext.ScrolledText(
|
|
self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9)
|
|
)
|
|
self.notebook.add(self.log_tab, text="Raw Log")
|
|
# ... (rest of the tab creation code is identical) ...
|
|
# --- (The rest of the file content remains the same as your provided version) ---
|
|
if _IMAGE_LIBS_AVAILABLE:
|
|
self.mfd_tab = self._create_image_tab("MFD Image")
|
|
self.notebook.add(self.mfd_tab["frame"], text="MFD Image")
|
|
self.sar_tab = self._create_image_tab("SAR Image")
|
|
self.notebook.add(self.sar_tab["frame"], text="SAR Image")
|
|
ris_frame = ttk.Frame(self.notebook)
|
|
paned = ttk.Panedwindow(ris_frame, orient=tk.HORIZONTAL)
|
|
paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
|
|
left = ttk.Frame(paned)
|
|
self.scenario_tree = ttk.Treeview(
|
|
left, columns=("field", "value"), show="headings", height=12
|
|
)
|
|
self.scenario_tree.heading("field", text="Field")
|
|
self.scenario_tree.heading("value", text="Value")
|
|
self.scenario_tree.column("field", width=140, anchor="w")
|
|
self.scenario_tree.column("value", width=160, anchor="w")
|
|
self.scenario_tree.pack(fill=tk.BOTH, expand=True)
|
|
paned.add(left, weight=1)
|
|
right = ttk.Frame(paned)
|
|
# Containers: one for the numeric table, one for the PPI (swapable)
|
|
self.ris_table_container = ttk.Frame(right)
|
|
self.ris_ppi_container = ttk.Frame(right)
|
|
|
|
cols = ("idx", "flags", "heading", "x", "y", "z")
|
|
self.ris_tree = ttk.Treeview(
|
|
self.ris_table_container, columns=cols, show="headings", height=12
|
|
)
|
|
for c, txt in zip(cols, ("#", "flags", "heading", "x", "y", "z")):
|
|
self.ris_tree.heading(c, text=txt)
|
|
self.ris_tree.column(c, width=70, anchor="center")
|
|
try:
|
|
style = ttk.Style()
|
|
small_font = ("Consolas", 8)
|
|
style.configure("Small.Treeview", font=small_font)
|
|
self.ris_tree.configure(style="Small.Treeview")
|
|
self.scenario_tree.configure(style="Small.Treeview")
|
|
except Exception:
|
|
pass
|
|
self.ris_tree.pack(fill=tk.BOTH, expand=True)
|
|
# Initially show table container, keep ppi container ready (hidden)
|
|
self.ris_table_container.pack(fill=tk.BOTH, expand=True)
|
|
paned.add(right, weight=2)
|
|
# Create PPI widget inside ris_ppi_container but keep it hidden
|
|
try:
|
|
# Determine trail length from the configuration manager if available
|
|
gm = getattr(self.master, "config_manager", None)
|
|
trail_len = None
|
|
try:
|
|
general = gm.get_general_settings() if gm else {}
|
|
trail_len = general.get("ppi_trail_length") if isinstance(general, dict) else None
|
|
except Exception:
|
|
trail_len = None
|
|
self.ris_ppi_widget = PPIDisplay(self.ris_ppi_container, max_range_nm=100, trail_length=trail_len)
|
|
self.ris_ppi_widget.pack(fill=tk.BOTH, expand=True)
|
|
except Exception:
|
|
self.ris_ppi_widget = None
|
|
btn_frame = ttk.Frame(ris_frame)
|
|
btn_frame.pack(fill=tk.X, padx=5, pady=(0, 5))
|
|
self.scenario_view_mode = tk.StringVar(value="simplified")
|
|
mode_frame = ttk.Frame(btn_frame)
|
|
mode_frame.pack(side=tk.LEFT, padx=(4, 0))
|
|
ttk.Label(mode_frame, text="View:").pack(side=tk.LEFT, padx=(0, 6))
|
|
ttk.Radiobutton(
|
|
mode_frame, text="Raw", value="raw", variable=self.scenario_view_mode
|
|
).pack(side=tk.LEFT)
|
|
ttk.Radiobutton(
|
|
mode_frame,
|
|
text="Simplified",
|
|
value="simplified",
|
|
variable=self.scenario_view_mode,
|
|
).pack(side=tk.LEFT)
|
|
self.simplified_decimals = tk.IntVar(value=4)
|
|
ttk.Label(mode_frame, text=" Decimals:").pack(side=tk.LEFT, padx=(8, 2))
|
|
try:
|
|
sp = tk.Spinbox(
|
|
mode_frame,
|
|
from_=0,
|
|
to=8,
|
|
width=3,
|
|
textvariable=self.simplified_decimals,
|
|
)
|
|
sp.pack(side=tk.LEFT)
|
|
except Exception:
|
|
e = ttk.Entry(mode_frame, textvariable=self.simplified_decimals, width=3)
|
|
e.pack(side=tk.LEFT)
|
|
self.ris_save_csv_btn = ttk.Button(
|
|
btn_frame, text="Save CSV", command=lambda: self._on_save_ris_csv()
|
|
)
|
|
self.ris_save_csv_btn.pack(side=tk.RIGHT)
|
|
self.notebook.add(ris_frame, text="RIS Status")
|
|
raw_frame = ttk.Frame(self.notebook)
|
|
history_frame = ttk.Frame(raw_frame, width=380)
|
|
history_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(5, 2), pady=5)
|
|
ttk.Label(history_frame, text="History (latest)").pack(anchor=tk.W, padx=4)
|
|
try:
|
|
history_font = ("Consolas", 8)
|
|
except Exception:
|
|
history_font = None
|
|
list_container = ttk.Frame(history_frame)
|
|
list_container.pack(fill=tk.BOTH, expand=True, padx=4, pady=(2, 4))
|
|
columns = ("ts", "flow", "tid", "size")
|
|
self.history_tree = ttk.Treeview(
|
|
list_container, columns=columns, show="headings", height=20
|
|
)
|
|
self.history_tree.heading("ts", text="Timestamp")
|
|
self.history_tree.heading("flow", text="Flow")
|
|
self.history_tree.heading("tid", text="TID")
|
|
self.history_tree.heading("size", text="Size")
|
|
self.history_tree.column("ts", width=100, anchor="w")
|
|
self.history_tree.column("flow", width=50, anchor="w")
|
|
self.history_tree.column("tid", width=40, anchor="center")
|
|
self.history_tree.column("size", width=50, anchor="e")
|
|
try:
|
|
style = ttk.Style()
|
|
style.configure("Small.Treeview", font=history_font)
|
|
self.history_tree.configure(style="Small.Treeview")
|
|
except Exception:
|
|
pass
|
|
self.history_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
|
self.history_vscroll = ttk.Scrollbar(
|
|
list_container, orient=tk.VERTICAL, command=self.history_tree.yview
|
|
)
|
|
self.history_vscroll.pack(side=tk.RIGHT, fill=tk.Y)
|
|
self.history_tree.config(yscrollcommand=self.history_vscroll.set)
|
|
hb_frame = ttk.Frame(history_frame)
|
|
hb_frame.pack(fill=tk.X, padx=4, pady=(4, 4))
|
|
self.history_settings_btn = ttk.Button(
|
|
hb_frame,
|
|
text="Settings",
|
|
command=lambda: self._open_history_settings_dialog(),
|
|
)
|
|
self.history_settings_btn.pack(side=tk.LEFT)
|
|
self.history_clear_btn = ttk.Button(
|
|
hb_frame, text="Clear", command=lambda: self._on_clear_history()
|
|
)
|
|
self.history_clear_btn.pack(side=tk.RIGHT)
|
|
self.raw_tab_text = scrolledtext.ScrolledText(
|
|
raw_frame, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9)
|
|
)
|
|
self.raw_tab_text.pack(
|
|
side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(2, 5), pady=5
|
|
)
|
|
try:
|
|
self.notebook.insert(1, raw_frame, text="SFP Raw")
|
|
except Exception:
|
|
self.notebook.add(raw_frame, text="SFP Raw")
|
|
try:
|
|
self.raw_tab_text.tag_config(
|
|
"flag_set", background="#d4ffd4", foreground="#006400"
|
|
)
|
|
self.raw_tab_text.tag_config(
|
|
"flag_unset", background="#f0f0f0", foreground="#808080"
|
|
)
|
|
self.raw_tab_text.tag_config(
|
|
"flag_error", background="#ffd4d4", foreground="#800000"
|
|
)
|
|
self.raw_tab_text.tag_config("hdr_field", foreground="#000080")
|
|
except Exception:
|
|
pass
|
|
self.bin_tab = scrolledtext.ScrolledText(
|
|
self.notebook, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 10)
|
|
)
|
|
self.notebook.add(self.bin_tab, text="Binary (Hex)")
|
|
self.json_tab = scrolledtext.ScrolledText(
|
|
self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 10)
|
|
)
|
|
self.notebook.add(self.json_tab, text="JSON")
|
|
|
|
def _on_send_target(self):
|
|
"""Callback to build and send a debug 'tgtset' command for a simple target.
|
|
|
|
Notes:
|
|
- This debug sender uses `tgtset` with qualifiers (/s, /t, /r) when the
|
|
corresponding checkboxes are set in the UI. The runtime simulation
|
|
should continue to use `tgtset` without qualifiers for continuous
|
|
updates; qualifiers are only included here for manual debugging.
|
|
"""
|
|
|
|
# 1. Collect data from UI
|
|
ip = self.ip_var.get()
|
|
port = int(self.server_port_var.get())
|
|
destination = (ip, port)
|
|
|
|
target_id = self.tgt_id_var.get()
|
|
range_nm = self.tgt_range_var.get()
|
|
az_deg = self.tgt_az_var.get()
|
|
alt_ft = self.tgt_alt_var.get()
|
|
vel_kn = self.tgt_vel_var.get()
|
|
hdg_deg = self.tgt_hdg_var.get()
|
|
is_active = self.tgt_active_var.get()
|
|
is_traceable = self.tgt_traceable_var.get()
|
|
is_restart = self.tgt_restart_var.get()
|
|
self._log_to_widget(
|
|
f"DEBUG: is_active={is_active}, is_traceable={is_traceable}, is_restart={is_restart}",
|
|
"DEBUG",
|
|
)
|
|
|
|
if not self.sfp_transport or not self.sfp_transport._socket:
|
|
self._log_to_widget("ERROR: Cannot send target, not connected.", "ERROR")
|
|
messagebox.showerror(
|
|
"Connection Error",
|
|
"SFP transport is not connected. Please connect first.",
|
|
parent=self,
|
|
)
|
|
return
|
|
|
|
try:
|
|
# 1. Collect data from UI
|
|
ip = self.ip_var.get()
|
|
port = int(self.server_port_var.get())
|
|
destination = (ip, port)
|
|
|
|
target_id = self.tgt_id_var.get()
|
|
range_nm = self.tgt_range_var.get()
|
|
az_deg = self.tgt_az_var.get()
|
|
alt_ft = self.tgt_alt_var.get()
|
|
vel_kn = self.tgt_vel_var.get()
|
|
hdg_deg = self.tgt_hdg_var.get()
|
|
is_active = self.tgt_active_var.get()
|
|
is_traceable = self.tgt_traceable_var.get()
|
|
is_restart = self.tgt_restart_var.get()
|
|
|
|
vel_fps = vel_kn * KNOTS_TO_FPS
|
|
|
|
# 2. Create a temporary Target object to feed the command builder
|
|
initial_waypoint = Waypoint(
|
|
maneuver_type=ManeuverType.FLY_TO_POINT,
|
|
target_range_nm=range_nm,
|
|
target_azimuth_deg=az_deg,
|
|
target_altitude_ft=alt_ft,
|
|
target_velocity_fps=vel_fps,
|
|
target_heading_deg=hdg_deg,
|
|
)
|
|
temp_target = Target(
|
|
target_id=target_id,
|
|
trajectory=[initial_waypoint],
|
|
)
|
|
# Imposta le proprietà dopo reset_simulation
|
|
temp_target.active = is_active
|
|
temp_target.traceable = is_traceable
|
|
temp_target.restart = is_restart
|
|
temp_target.current_range_nm = range_nm
|
|
temp_target.current_azimuth_deg = az_deg
|
|
temp_target.current_velocity_fps = vel_fps
|
|
temp_target.current_heading_deg = hdg_deg
|
|
temp_target.current_altitude_ft = alt_ft
|
|
|
|
# 3. Build the command string: use tgtset but INCLUDE qualifiers
|
|
# so that the debug window sends the flags (/s, /t, /r) while the
|
|
# simulation runtime continues to use tgtset without flags.
|
|
command_str = command_builder.build_tgtset_from_target_state(
|
|
temp_target, include_flags=True
|
|
)
|
|
# Ensure the command is trimmed, prefixed with '$' and terminated with a newline
|
|
command_str = command_str.strip()
|
|
# if not command_str.startswith("$"):
|
|
# command_str = "$" + command_str.lstrip()
|
|
if not command_str.endswith("\n"):
|
|
command_str = command_str + "\n"
|
|
self._log_to_widget(f"Built command: {command_str!r}", "INFO")
|
|
|
|
# 4. Send using the transport layer
|
|
success = self.sfp_transport.send_script_command(command_str, destination)
|
|
|
|
if success:
|
|
self._log_to_widget(
|
|
f"Successfully sent command for target {target_id}.", "INFO"
|
|
)
|
|
else:
|
|
self._log_to_widget(
|
|
f"Failed to send command for target {target_id}.", "ERROR"
|
|
)
|
|
|
|
except (ValueError, tk.TclError) as e:
|
|
error_msg = f"Invalid input value: {e}"
|
|
self._log_to_widget(f"ERROR: {error_msg}", "ERROR")
|
|
messagebox.showerror("Input Error", error_msg, parent=self)
|
|
except Exception as e:
|
|
self.logger.exception("An unexpected error occurred in _on_send_target.")
|
|
self._log_to_widget(f"ERROR: {e}", "CRITICAL")
|
|
messagebox.showerror("Unexpected Error", str(e), parent=self)
|
|
|
|
# --- (The rest of the file content remains the same) ---
|
|
def _create_image_tab(self, title: str) -> Dict:
|
|
frame = ttk.Frame(self.notebook)
|
|
image_container = ttk.Frame(
|
|
frame,
|
|
width=self.image_area_size,
|
|
height=self.image_area_size,
|
|
relief=tk.SUNKEN,
|
|
)
|
|
image_container.pack(pady=5, padx=5)
|
|
image_container.pack_propagate(False)
|
|
image_label = ttk.Label(
|
|
image_container, text=f"Waiting for {title}...", anchor=tk.CENTER
|
|
)
|
|
image_label.pack(fill=tk.BOTH, expand=True)
|
|
hex_view = scrolledtext.ScrolledText(
|
|
frame, height=8, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9)
|
|
)
|
|
hex_view.pack(fill=tk.BOTH, expand=True, pady=5, padx=5)
|
|
return {
|
|
"frame": frame,
|
|
"image_label": image_label,
|
|
"hex_view": hex_view,
|
|
"image_container": image_container,
|
|
}
|
|
|
|
def _open_image_size_dialog(self):
|
|
dlg = tk.Toplevel(self)
|
|
dlg.title("Image Size")
|
|
dlg.transient(self)
|
|
dlg.grab_set()
|
|
ttk.Label(dlg, text="Image area size (px):").pack(padx=10, pady=(10, 2))
|
|
size_var = tk.StringVar(value=str(self.image_area_size))
|
|
entry = ttk.Entry(dlg, textvariable=size_var, width=8)
|
|
entry.pack(padx=10, pady=(0, 10))
|
|
btn_frame = ttk.Frame(dlg)
|
|
btn_frame.pack(padx=10, pady=(0, 10))
|
|
|
|
def on_save():
|
|
try:
|
|
v = int(size_var.get())
|
|
if v <= 0:
|
|
raise ValueError()
|
|
except Exception:
|
|
message = "Please enter a positive integer for image size."
|
|
try:
|
|
tk.messagebox.showerror("Invalid value", message, parent=dlg)
|
|
except Exception:
|
|
pass
|
|
return
|
|
self.image_area_size = v
|
|
for tab in (getattr(self, "mfd_tab", None), getattr(self, "sar_tab", None)):
|
|
if tab and "image_container" in tab:
|
|
tab["image_container"].config(width=v, height=v)
|
|
gm = getattr(self.master, "config_manager", None)
|
|
if gm:
|
|
general = gm.get_general_settings() or {}
|
|
image_display = general.get("image_display", {})
|
|
image_display["size"] = v
|
|
general["image_display"] = image_display
|
|
gm.save_general_settings(general)
|
|
dlg.destroy()
|
|
|
|
def on_cancel():
|
|
dlg.destroy()
|
|
|
|
ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack(
|
|
side=tk.RIGHT, padx=(0, 5)
|
|
)
|
|
ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT)
|
|
|
|
def _on_connect(self):
|
|
ip = self.ip_var.get()
|
|
try:
|
|
port = int(self.local_port_var.get())
|
|
except ValueError:
|
|
self._log_to_widget("ERROR: Invalid port number.", "ERROR")
|
|
return
|
|
self._log_to_widget(f"Attempting to connect to {ip}:{port}...")
|
|
ack_config = {ord("M"): 32, ord("S"): 16}
|
|
self.sfp_transport = SfpTransport(
|
|
host=ip,
|
|
port=port,
|
|
payload_handlers=self.payload_router.get_handlers(),
|
|
ack_config=ack_config,
|
|
raw_packet_callback=self.payload_router.update_raw_packet,
|
|
)
|
|
if self.sfp_transport.start():
|
|
self._log_to_widget(
|
|
"Connection successful. Listening for packets...", "INFO"
|
|
)
|
|
self.connect_btn.config(state=tk.DISABLED)
|
|
self.disconnect_btn.config(state=tk.NORMAL)
|
|
else:
|
|
self._log_to_widget("Connection failed. Check IP/Port and logs.", "ERROR")
|
|
self.sfp_transport = None
|
|
try:
|
|
self.history_tree.bind(
|
|
"<<TreeviewSelect>>", lambda e: self._on_history_select()
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
def _on_send_probe(self):
|
|
ip = self.ip_var.get()
|
|
try:
|
|
port = int(self.server_port_var.get())
|
|
except Exception:
|
|
self._log_to_widget("ERROR: Invalid port number for probe.", "ERROR")
|
|
return
|
|
probe_payload = b"SFP_PROBE\n"
|
|
try:
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.settimeout(1.0)
|
|
sock.sendto(probe_payload, (ip, port))
|
|
sock.close()
|
|
self._log_to_widget(f"Sent probe to {ip}:{port}", "INFO")
|
|
except Exception as e:
|
|
self._log_to_widget(f"Failed to send probe to {ip}:{port}: {e}", "ERROR")
|
|
|
|
def _on_send_script(self):
|
|
if not self.sfp_transport or not self.sfp_transport._socket:
|
|
self._log_to_widget("ERROR: Cannot send script, not connected.", "ERROR")
|
|
return
|
|
try:
|
|
ip = self.ip_var.get()
|
|
port = int(self.server_port_var.get())
|
|
destination = (ip, port)
|
|
command_str = self.script_var.get()
|
|
command_str = command_str.strip()
|
|
# if command_str and not command_str.startswith("$"):
|
|
# command_str = "$" + command_str.lstrip()
|
|
if command_str and not command_str.endswith("\n"):
|
|
command_str = command_str + "\n"
|
|
self.sfp_transport.send_script_command(command_str, destination)
|
|
except (ValueError, tk.TclError) as e:
|
|
self._log_to_widget(
|
|
f"ERROR: Invalid input for script sending: {e}", "ERROR"
|
|
)
|
|
|
|
def _on_send_simple_command(self, command_str: str):
|
|
"""Send a simple script command string to the configured server port.
|
|
|
|
Validates transport/socket and logs the result.
|
|
"""
|
|
if not self.sfp_transport or not getattr(self.sfp_transport, "_socket", None):
|
|
self._log_to_widget("ERROR: Cannot send command, not connected.", "ERROR")
|
|
messagebox.showerror(
|
|
"Connection Error",
|
|
"SFP transport is not connected. Please connect first.",
|
|
parent=self,
|
|
)
|
|
return False
|
|
|
|
try:
|
|
ip = self.ip_var.get()
|
|
port = int(self.server_port_var.get())
|
|
destination = (ip, port)
|
|
command_str = (command_str or "").strip()
|
|
# Always prefix with '$' (no space after $)
|
|
# if not command_str.startswith("$"):
|
|
# command_str = "$" + command_str.lstrip()
|
|
if command_str and not command_str.endswith("\n"):
|
|
command_str = command_str + "\n"
|
|
self._log_to_widget(
|
|
f"Sending command to {destination}: {command_str!r}", "INFO"
|
|
)
|
|
success = self.sfp_transport.send_script_command(command_str, destination)
|
|
if success:
|
|
self._log_to_widget(f"Successfully sent command: {command_str}", "INFO")
|
|
else:
|
|
self._log_to_widget(f"Failed to send command: {command_str}", "ERROR")
|
|
return success
|
|
except Exception as e:
|
|
self.logger.exception("Unexpected error in _on_send_simple_command")
|
|
self._log_to_widget(f"ERROR: {e}", "ERROR")
|
|
return False
|
|
|
|
def _on_send_tgtset(self):
|
|
"""Build and send a 'tgtset' command using current UI values."""
|
|
try:
|
|
target_id = self.tgt_id_var.get()
|
|
range_nm = self.tgt_range_var.get()
|
|
az_deg = self.tgt_az_var.get()
|
|
alt_ft = self.tgt_alt_var.get()
|
|
vel_kn = self.tgt_vel_var.get()
|
|
hdg_deg = self.tgt_hdg_var.get()
|
|
|
|
vel_fps = vel_kn * KNOTS_TO_FPS
|
|
|
|
is_active = self.tgt_active_var.get()
|
|
is_traceable = self.tgt_traceable_var.get()
|
|
updates = {
|
|
"range_nm": f"{range_nm:.2f}",
|
|
"azimuth_deg": f"{az_deg:.2f}",
|
|
"velocity_fps": f"{vel_fps:.2f}",
|
|
"heading_deg": f"{hdg_deg:.2f}",
|
|
"altitude_ft": f"{alt_ft:.2f}",
|
|
"active": is_active,
|
|
"traceable": is_traceable,
|
|
}
|
|
command_str = command_builder.build_tgtset_selective(target_id, updates)
|
|
command_str = command_str.strip()
|
|
if command_str and not command_str.endswith("\n"):
|
|
command_str = command_str + "\n"
|
|
return self._on_send_simple_command(command_str)
|
|
except (ValueError, tk.TclError) as e:
|
|
self._log_to_widget(f"ERROR: Invalid input for tgtset: {e}", "ERROR")
|
|
return False
|
|
|
|
def _on_disconnect(self):
|
|
if self.sfp_transport:
|
|
self._log_to_widget("Disconnecting...", "INFO")
|
|
self.sfp_transport.shutdown()
|
|
self.sfp_transport = None
|
|
self.connect_btn.config(state=tk.NORMAL)
|
|
self.disconnect_btn.config(state=tk.DISABLED)
|
|
self._log_to_widget("Disconnected.", "INFO")
|
|
|
|
def _on_close(self):
|
|
self.logger.info("SFP Debug Window closing.")
|
|
self._on_disconnect()
|
|
self.destroy()
|
|
|
|
def _process_latest_payloads(self):
|
|
new_payloads = self.payload_router.get_and_clear_latest_payloads()
|
|
if new_payloads:
|
|
self._log_to_widget(
|
|
f"Processing {len(new_payloads)} new payload(s) for flows: {list(new_payloads.keys())}"
|
|
)
|
|
for flow_id, payload in new_payloads.items():
|
|
if flow_id == "MFD" and _IMAGE_LIBS_AVAILABLE:
|
|
self._display_image_data(payload, self.mfd_tab, "mfd_photo")
|
|
elif flow_id == "SAR" and _IMAGE_LIBS_AVAILABLE:
|
|
self._display_image_data(payload, self.sar_tab, "sar_photo")
|
|
elif flow_id == "BIN":
|
|
self._display_hex_data(payload, self.bin_tab)
|
|
elif flow_id == "JSON":
|
|
self._display_json_data(payload, self.json_tab)
|
|
elif flow_id == "RIS_STATUS_JSON":
|
|
try:
|
|
import json
|
|
|
|
struct = (
|
|
json.loads(payload.decode("utf-8"))
|
|
if isinstance(payload, (bytes, bytearray))
|
|
else payload
|
|
)
|
|
for iid in self.scenario_tree.get_children():
|
|
self.scenario_tree.delete(iid)
|
|
scenario = (
|
|
struct.get("scenario", {})
|
|
if isinstance(struct, dict)
|
|
else {}
|
|
)
|
|
if scenario:
|
|
import math
|
|
|
|
def to_deg(v):
|
|
try:
|
|
return float(v) * (180.0 / math.pi)
|
|
except Exception:
|
|
return v
|
|
|
|
def m_s_to_ft_s(v):
|
|
try:
|
|
return float(v) * 3.280839895
|
|
except Exception:
|
|
return v
|
|
|
|
def m_to_ft(v):
|
|
try:
|
|
return float(v) * 3.280839895
|
|
except Exception:
|
|
return v
|
|
|
|
order = [
|
|
("timetag", "timetag", ""),
|
|
("platform_azimuth", "platform_azimuth", "°"),
|
|
("ant_nav_az", "ant_nav_az", "°"),
|
|
("ant_nav_el", "ant_nav_el", "°"),
|
|
("flags", "flags", ""),
|
|
("mode", "mode", ""),
|
|
("vx", "vx", "ft/s"),
|
|
("vy", "vy", "ft/s"),
|
|
("vz", "vz", "ft/s"),
|
|
("baro_altitude", "baro_altitude", "ft"),
|
|
("latitude", "latitude", "°"),
|
|
("longitude", "longitude", "°"),
|
|
("true_heading", "true_heading", "°"),
|
|
]
|
|
view_mode = (
|
|
self.scenario_view_mode.get()
|
|
if hasattr(self, "scenario_view_mode")
|
|
else "simplified"
|
|
)
|
|
dec_simp = int(
|
|
self.simplified_decimals.get()
|
|
if hasattr(self, "simplified_decimals")
|
|
else 4
|
|
)
|
|
|
|
def fmt_raw_number(v, key_name=None):
|
|
try:
|
|
fv = float(v)
|
|
if key_name in ("latitude", "longitude"):
|
|
return f"{fv:.8f}"
|
|
return f"{fv:.6f}"
|
|
except Exception:
|
|
return str(v)
|
|
|
|
def fmt_simplified_number(v, unit_str, decimals=4):
|
|
try:
|
|
fv = float(v)
|
|
return (
|
|
f"{fv:.{decimals}f} {unit_str}"
|
|
if unit_str
|
|
else f"{fv:.{decimals}f}"
|
|
)
|
|
except Exception:
|
|
return str(v)
|
|
|
|
def try_float(v):
|
|
try:
|
|
return float(v)
|
|
except Exception:
|
|
return None
|
|
|
|
def decimal_deg_to_dms(deg, is_lat=True):
|
|
try:
|
|
d = float(deg)
|
|
except Exception:
|
|
return str(deg)
|
|
if is_lat:
|
|
direction = "N" if d >= 0 else "S"
|
|
else:
|
|
direction = "E" if d >= 0 else "W"
|
|
ad = abs(d)
|
|
degrees = int(ad)
|
|
minutes_full = (ad - degrees) * 60
|
|
minutes = int(minutes_full)
|
|
seconds = (minutes_full - minutes) * 60
|
|
return f"{degrees}°{minutes}'{seconds:.2f} {direction}"
|
|
|
|
scenario_rows = []
|
|
for label, key, unit in order:
|
|
if key in scenario:
|
|
val = scenario.get(key)
|
|
if view_mode == "raw":
|
|
if key in (
|
|
"platform_azimuth",
|
|
"true_heading",
|
|
"ant_nav_az",
|
|
"ant_nav_el",
|
|
):
|
|
display_val = fmt_raw_number(val, key)
|
|
elif key in ("vx", "vy", "vz", "baro_altitude"):
|
|
display_val = fmt_raw_number(val, key)
|
|
elif key in ("latitude", "longitude"):
|
|
display_val = fmt_raw_number(val, key)
|
|
elif key == "flags":
|
|
try:
|
|
display_val = (
|
|
f"{int(val)} (0x{int(val):X})"
|
|
)
|
|
except Exception:
|
|
display_val = str(val)
|
|
else:
|
|
display_val = str(val)
|
|
else:
|
|
if key in ("platform_azimuth", "true_heading"):
|
|
fv = try_float(val)
|
|
if fv is not None:
|
|
conv = to_deg(fv)
|
|
display_val = fmt_simplified_number(
|
|
conv, "°", dec_simp
|
|
)
|
|
else:
|
|
display_val = str(val)
|
|
elif key in ("ant_nav_az", "ant_nav_el"):
|
|
fv = try_float(val)
|
|
if fv is not None:
|
|
conv = to_deg(fv)
|
|
display_val = fmt_simplified_number(
|
|
conv, "°", dec_simp
|
|
)
|
|
else:
|
|
display_val = str(val)
|
|
elif key in ("vx", "vy", "vz"):
|
|
fv = try_float(val)
|
|
if fv is not None:
|
|
conv = m_s_to_ft_s(fv)
|
|
display_val = fmt_simplified_number(
|
|
conv, "ft/s", dec_simp
|
|
)
|
|
else:
|
|
display_val = str(val)
|
|
elif key == "baro_altitude":
|
|
fv = try_float(val)
|
|
if fv is not None:
|
|
conv = m_to_ft(fv)
|
|
display_val = fmt_simplified_number(
|
|
conv, "ft", dec_simp
|
|
)
|
|
else:
|
|
display_val = str(val)
|
|
elif key in ("latitude", "longitude"):
|
|
fv = try_float(val)
|
|
if fv is not None:
|
|
is_lat = key == "latitude"
|
|
display_val = decimal_deg_to_dms(
|
|
fv, is_lat=is_lat
|
|
)
|
|
else:
|
|
display_val = str(val)
|
|
elif key == "flags":
|
|
try:
|
|
display_val = (
|
|
f"{int(val)} (0x{int(val):X})"
|
|
)
|
|
except Exception:
|
|
display_val = str(val)
|
|
elif key == "mode":
|
|
try:
|
|
midx = int(val)
|
|
if (
|
|
0
|
|
<= midx
|
|
< len(self._master_mode_names)
|
|
):
|
|
name = self._master_mode_names[midx]
|
|
short = name
|
|
for suffix in (
|
|
"_master_mode",
|
|
"_mode",
|
|
"_master_mode_",
|
|
"_",
|
|
):
|
|
if short.endswith(suffix):
|
|
short = short[
|
|
: -len(suffix)
|
|
]
|
|
short = short.replace(
|
|
"master", ""
|
|
).strip("_")
|
|
display_val = short
|
|
else:
|
|
display_val = str(midx)
|
|
except Exception:
|
|
display_val = str(val)
|
|
else:
|
|
display_val = str(val)
|
|
scenario_rows.append((label, display_val))
|
|
for l, v in scenario_rows:
|
|
self.scenario_tree.insert(
|
|
"", tk.END, values=(f"{l}", v)
|
|
)
|
|
for iid in self.ris_tree.get_children():
|
|
self.ris_tree.delete(iid)
|
|
targets = (
|
|
struct.get("targets", [])
|
|
if isinstance(struct, dict)
|
|
else []
|
|
)
|
|
try:
|
|
view_mode = (
|
|
self.scenario_view_mode.get()
|
|
if hasattr(self, "scenario_view_mode")
|
|
else "simplified"
|
|
)
|
|
self.ris_tree.heading("heading", text="heading")
|
|
self.ris_tree.heading("x", text="x")
|
|
self.ris_tree.heading("y", text="y")
|
|
self.ris_tree.heading("z", text="z")
|
|
except Exception:
|
|
pass
|
|
view_mode = (
|
|
self.scenario_view_mode.get()
|
|
if hasattr(self, "scenario_view_mode")
|
|
else "simplified"
|
|
)
|
|
dec_simp = int(
|
|
self.simplified_decimals.get()
|
|
if hasattr(self, "simplified_decimals")
|
|
else 4
|
|
)
|
|
for t in targets:
|
|
try:
|
|
idx = t.get("index")
|
|
if idx is None:
|
|
idx = t.get("idx")
|
|
if idx is None:
|
|
idx = t.get("#")
|
|
raw_flags = t.get("flags", t.get("flag", 0))
|
|
try:
|
|
flags_val = int(raw_flags)
|
|
flags_display = f"{flags_val} (0x{flags_val:X})"
|
|
except Exception:
|
|
flags_display = str(raw_flags)
|
|
if view_mode == "raw":
|
|
hfv = try_float(t.get("heading"))
|
|
heading_val = (
|
|
f"{hfv:.6f}"
|
|
if hfv is not None
|
|
else str(t.get("heading"))
|
|
)
|
|
xfv = try_float(t.get("x"))
|
|
yfv = try_float(t.get("y"))
|
|
zfv = try_float(t.get("z"))
|
|
x_val = (
|
|
f"{xfv:.6f}" if xfv is not None else t.get("x")
|
|
)
|
|
y_val = (
|
|
f"{yfv:.6f}" if yfv is not None else t.get("y")
|
|
)
|
|
z_val = (
|
|
f"{zfv:.6f}" if zfv is not None else t.get("z")
|
|
)
|
|
vals = (
|
|
idx,
|
|
flags_display,
|
|
heading_val,
|
|
x_val,
|
|
y_val,
|
|
z_val,
|
|
)
|
|
else:
|
|
hfv = try_float(t.get("heading"))
|
|
if hfv is not None:
|
|
heading_deg = hfv * (180.0 / 3.141592653589793)
|
|
heading_val = f"{heading_deg:.{dec_simp}f} °"
|
|
else:
|
|
heading_val = str(t.get("heading"))
|
|
xfv = try_float(t.get("x"))
|
|
yfv = try_float(t.get("y"))
|
|
zfv = try_float(t.get("z"))
|
|
x_val = (
|
|
f"{xfv:.{dec_simp}f} m"
|
|
if xfv is not None
|
|
else str(t.get("x"))
|
|
)
|
|
y_val = (
|
|
f"{yfv:.{dec_simp}f} m"
|
|
if yfv is not None
|
|
else str(t.get("y"))
|
|
)
|
|
if zfv is not None:
|
|
z_val = f"{(zfv * 3.280839895):.{dec_simp}f} ft"
|
|
else:
|
|
z_val = str(t.get("z"))
|
|
vals = (
|
|
idx,
|
|
flags_display,
|
|
heading_val,
|
|
x_val,
|
|
y_val,
|
|
z_val,
|
|
)
|
|
if (
|
|
not isinstance(vals, (list, tuple))
|
|
or len(vals) != 6
|
|
):
|
|
vals = (idx, flags_display, "", "", "", "")
|
|
self.ris_tree.insert("", tk.END, values=vals)
|
|
except Exception as _e:
|
|
try:
|
|
self.ris_tree.insert(
|
|
"",
|
|
tk.END,
|
|
values=(
|
|
None,
|
|
None,
|
|
str(t.get("heading")),
|
|
str(t.get("x")),
|
|
str(t.get("y")),
|
|
str(t.get("z")),
|
|
),
|
|
)
|
|
except Exception:
|
|
pass
|
|
# --- Update PPI with active targets that fit the current PPI scale/sector ---
|
|
try:
|
|
# Only proceed if we have a PPI widget available
|
|
ppi = getattr(self, "ris_ppi_widget", None)
|
|
if ppi is not None:
|
|
ppi_targets = []
|
|
METERS_PER_NM = 1852.0
|
|
# Determine current display range from the PPI widget
|
|
current_range = (
|
|
ppi.range_var.get()
|
|
if hasattr(ppi, "range_var")
|
|
else ppi.max_range
|
|
)
|
|
# Debug: show parsed target count and sample raw values
|
|
try:
|
|
# self._log_to_widget(
|
|
# f"PPI: parsed {len(targets)} JSON target(s); current_range={current_range}",
|
|
# "DEBUG",
|
|
# )
|
|
for i, tt in enumerate(targets[:6]):
|
|
try:
|
|
fx = tt.get("flags", None)
|
|
tx = tt.get("x", None)
|
|
ty = tt.get("y", None)
|
|
tz = tt.get("z", None)
|
|
# compute derived quantities
|
|
try:
|
|
xm = float(tx)
|
|
ym = float(ty)
|
|
zm = float(tz)
|
|
rng_m = (xm * xm + ym * ym) ** 0.5
|
|
rng_nm = rng_m / 1852.0
|
|
# Use atan2(y, x) for conventional azimuth (x East, y North)
|
|
az_deg = math.degrees(
|
|
math.atan2(ym, xm)
|
|
)
|
|
# elevation from z and slant range
|
|
elev_rad = (
|
|
math.atan2(zm, rng_m)
|
|
if rng_m > 0
|
|
else 0.0
|
|
)
|
|
elev_deg = math.degrees(elev_rad)
|
|
except Exception:
|
|
rng_m = rng_nm = az_deg = elev_deg = (
|
|
None
|
|
)
|
|
# msg = (
|
|
# f"PPI RAW[{i}]: flags={fx} x={tx} y={ty} z={tz} | range_m={rng_m:.1f} range_nm={rng_nm:.2f} az={az_deg:.2f}° elev={elev_deg:.2f}°"
|
|
# if rng_m is not None
|
|
# else f"PPI RAW[{i}]: flags={fx} x={tx} y={ty} z={tz}"
|
|
# )
|
|
# self._log_to_widget(msg, "DEBUG")
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
for t in targets:
|
|
raw_flags = t.get("flags", 0)
|
|
# Only show enabled/active targets (non-zero flags)
|
|
if int(raw_flags) == 0:
|
|
continue
|
|
x = float(t.get("x", 0.0))
|
|
y = float(t.get("y", 0.0))
|
|
z = float(t.get("z", 0.0))
|
|
# Compute range in NM assuming x/y are meters
|
|
range_nm = ((x**2 + y**2) ** 0.5) / METERS_PER_NM
|
|
if range_nm > current_range:
|
|
continue
|
|
# Compute azimuth in degrees using atan2(y, x)
|
|
az_deg = math.degrees(math.atan2(y, x))
|
|
# Heading in JSON is in radians -> convert to degrees
|
|
heading = t.get("heading", 0.0)
|
|
try:
|
|
hdg_deg = float(heading) * (180.0 / math.pi)
|
|
except Exception:
|
|
hdg_deg = 0.0
|
|
|
|
tgt = Target(
|
|
target_id=int(t.get("index", 0)),
|
|
trajectory=[],
|
|
active=True,
|
|
traceable=True,
|
|
)
|
|
tgt.current_range_nm = range_nm
|
|
tgt.current_azimuth_deg = az_deg
|
|
tgt.current_heading_deg = hdg_deg
|
|
# convert altitude from meters to feet
|
|
try:
|
|
tgt.current_altitude_ft = float(z) * 3.280839895
|
|
except Exception:
|
|
tgt.current_altitude_ft = 0.0
|
|
tgt.active = True
|
|
ppi_targets.append(tgt)
|
|
# Push to PPI (log debug info)
|
|
try:
|
|
self._log_to_widget(
|
|
f"PPI: prepared {len(ppi_targets)} target(s) for display",
|
|
"DEBUG",
|
|
)
|
|
if ppi_targets:
|
|
for pt in ppi_targets[:5]:
|
|
self._log_to_widget(
|
|
f"PPI target sample: id={getattr(pt, 'target_id', None)} r_nm={getattr(pt,'current_range_nm',None):.2f} az={getattr(pt,'current_azimuth_deg',None):.2f} hdg={getattr(pt,'current_heading_deg',None):.2f}",
|
|
"DEBUG",
|
|
)
|
|
# Mark these as server-sent 'real' targets so
|
|
# PPIDisplay will draw them using the 'real'
|
|
# styling (red). Use the dict form expected
|
|
# by PPIDisplay to distinguish origins.
|
|
self.update_ppi_targets({"real": ppi_targets})
|
|
except Exception:
|
|
self.logger.exception(
|
|
"Failed to push targets to PPI"
|
|
)
|
|
except Exception:
|
|
self.logger.exception(
|
|
"Error while preparing RIS targets for PPI"
|
|
)
|
|
except Exception:
|
|
pass
|
|
self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
|
|
raw_pkt = self.payload_router.get_and_clear_raw_packet()
|
|
if raw_pkt:
|
|
raw_bytes, addr = raw_pkt
|
|
self._display_raw_packet(raw_bytes, addr)
|
|
try:
|
|
self._refresh_history_tree()
|
|
except Exception:
|
|
pass
|
|
|
|
def _refresh_history_tree(self):
|
|
try:
|
|
hist = self.payload_router.get_history()
|
|
for iid in self.history_tree.get_children():
|
|
self.history_tree.delete(iid)
|
|
for i, entry in enumerate(reversed(hist)):
|
|
ts = entry["ts"].strftime("%H:%M:%S.%f")[:-3]
|
|
flow_name = entry.get("flow_name", "")
|
|
tid = entry.get("tid", "")
|
|
size = len(entry.get("raw", b""))
|
|
self.history_tree.insert(
|
|
"", tk.END, values=(ts, flow_name, tid, f"{size}B")
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
def _on_history_select(self):
|
|
try:
|
|
sel = self.history_tree.selection()
|
|
if not sel:
|
|
return
|
|
iid = sel[0]
|
|
children = list(self.history_tree.get_children())
|
|
try:
|
|
idx = children.index(iid)
|
|
except ValueError:
|
|
idx = None
|
|
hist = list(reversed(self.payload_router.get_history()))
|
|
if idx is None or idx < 0 or idx >= len(hist):
|
|
return
|
|
entry = hist[idx]
|
|
self._display_raw_packet(entry["raw"], entry["addr"])
|
|
except Exception:
|
|
pass
|
|
|
|
def _on_clear_history(self):
|
|
try:
|
|
self.payload_router.clear_history()
|
|
self._refresh_history_tree()
|
|
except Exception:
|
|
pass
|
|
|
|
def _open_history_settings_dialog(self):
|
|
dlg = tk.Toplevel(self)
|
|
dlg.title("History Settings")
|
|
dlg.transient(self)
|
|
dlg.grab_set()
|
|
try:
|
|
hist_size = self.payload_router._history_size
|
|
persist = self.payload_router._persist
|
|
except Exception:
|
|
hist_size = 20
|
|
persist = False
|
|
ttk.Label(dlg, text="History size (entries):").pack(padx=10, pady=(10, 2))
|
|
size_var = tk.StringVar(value=str(hist_size))
|
|
entry = ttk.Entry(dlg, textvariable=size_var, width=8)
|
|
entry.pack(padx=10, pady=(0, 10))
|
|
persist_var = tk.BooleanVar(value=bool(persist))
|
|
ttk.Checkbutton(
|
|
dlg, text="Persist raw packets to Temp/", variable=persist_var
|
|
).pack(padx=10, pady=(0, 10))
|
|
btn_frame = ttk.Frame(dlg)
|
|
btn_frame.pack(padx=10, pady=(0, 10), fill=tk.X)
|
|
|
|
def on_save():
|
|
try:
|
|
v = int(size_var.get())
|
|
if v <= 0:
|
|
raise ValueError()
|
|
except Exception:
|
|
try:
|
|
tk.messagebox.showerror(
|
|
"Invalid value",
|
|
"Please enter a positive integer for history size.",
|
|
parent=dlg,
|
|
)
|
|
except Exception:
|
|
pass
|
|
return
|
|
try:
|
|
self.payload_router.set_history_size(v)
|
|
self.payload_router.set_persist(bool(persist_var.get()))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
gm = getattr(self.master, "config_manager", None)
|
|
if gm:
|
|
general = gm.get_general_settings() or {}
|
|
sfp_debug = general.get("sfp_debug", {})
|
|
sfp_debug["history_size"] = v
|
|
sfp_debug["persist_raw"] = bool(persist_var.get())
|
|
general["sfp_debug"] = sfp_debug
|
|
gm.save_general_settings(general)
|
|
except Exception:
|
|
pass
|
|
dlg.destroy()
|
|
|
|
def on_cancel():
|
|
dlg.destroy()
|
|
|
|
ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack(
|
|
side=tk.RIGHT, padx=(0, 5)
|
|
)
|
|
ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT)
|
|
|
|
def _display_raw_packet(self, raw_bytes: bytes, addr: tuple):
|
|
try:
|
|
header_size = SFPHeader.size()
|
|
if len(raw_bytes) < header_size:
|
|
raise ValueError("Packet smaller than SFP header")
|
|
header = SFPHeader.from_buffer_copy(raw_bytes)
|
|
body = raw_bytes[header_size:]
|
|
field_list = [
|
|
"SFP_MARKER",
|
|
"SFP_DIRECTION",
|
|
"SFP_PROT_VER",
|
|
"SFP_PT_SPARE",
|
|
"SFP_TAG",
|
|
"SFP_SRC",
|
|
"SFP_FLOW",
|
|
"SFP_TID",
|
|
"SFP_FLAGS",
|
|
"SFP_WIN",
|
|
"SFP_ERR",
|
|
"SFP_ERR_INFO",
|
|
"SFP_TOTFRGAS",
|
|
"SFP_FRAG",
|
|
"SFP_RECTYPE",
|
|
"SFP_RECSPARE",
|
|
"SFP_PLDAP",
|
|
"SFP_PLEXT",
|
|
"SFP_RECCOUNTER",
|
|
"SFP_PLSIZE",
|
|
"SFP_TOTSIZE",
|
|
"SFP_PLOFFSET",
|
|
]
|
|
pairs = []
|
|
flag_val = None
|
|
for f in field_list:
|
|
try:
|
|
val = getattr(header, f)
|
|
except Exception:
|
|
val = "<N/A>"
|
|
if f == "SFP_FLAGS":
|
|
flag_val = val
|
|
pairs.append(
|
|
(f, f"{val} (0x{val:X})" if isinstance(val, int) else str(val))
|
|
)
|
|
continue
|
|
if isinstance(val, int):
|
|
pairs.append((f, f"{val} (0x{val:X})"))
|
|
else:
|
|
pairs.append((f, str(val)))
|
|
out_lines = [f"From {addr}\n\nSFP Header:\n\n"]
|
|
col_width = 36
|
|
for i in range(0, len(pairs), 2):
|
|
left = pairs[i]
|
|
right = pairs[i + 1] if (i + 1) < len(pairs) else None
|
|
left_text = f"{left[0]:12s}: {left[1]}"
|
|
if right:
|
|
right_text = f"{right[0]:12s}: {right[1]}"
|
|
line = f"{left_text:<{col_width}} {right_text}"
|
|
else:
|
|
line = left_text
|
|
out_lines.append(line + "\n")
|
|
flag_defs = [
|
|
(0, "ACQ_REQ"),
|
|
(1, "RESENT"),
|
|
(2, "TRAILER_ACK"),
|
|
(3, "RESV3"),
|
|
(4, "RESV4"),
|
|
(5, "RESV5"),
|
|
(6, "RESV6"),
|
|
(7, "ERROR"),
|
|
]
|
|
out_lines.append(f"SFP_FLAGS : {flag_val} (0x{flag_val:X}) ")
|
|
for bit, name in flag_defs:
|
|
is_set = bool((flag_val >> bit) & 1)
|
|
out_lines.append(f" [{name}]")
|
|
out_lines.append("\n\nFlags legend:\n")
|
|
legend_map = {
|
|
"ACQ_REQ": "Acquisition required/requested",
|
|
"RESENT": "Fragment resent / please resend",
|
|
"TRAILER_ACK": "Request trailer acknowledgement",
|
|
"ERROR": "Packet-level error flag",
|
|
"RESV3": "Reserved",
|
|
"RESV4": "Reserved",
|
|
"RESV5": "Reserved",
|
|
"RESV6": "Reserved",
|
|
}
|
|
for _, name in flag_defs:
|
|
desc = legend_map.get(name, "")
|
|
out_lines.append(f" {name:12s}: {desc}\n")
|
|
out_lines.append("\nBODY (hex):\n")
|
|
hex_dump = self._format_hex_dump(body)
|
|
out_lines.append(hex_dump)
|
|
full_text = "".join(out_lines)
|
|
self.raw_tab_text.config(state=tk.NORMAL)
|
|
self.raw_tab_text.delete("1.0", tk.END)
|
|
header_block, _, _ = full_text.partition("\nBODY (hex):\n")
|
|
self.raw_tab_text.insert(tk.END, header_block + "\n", "hdr_field")
|
|
self.raw_tab_text.config(state=tk.DISABLED)
|
|
try:
|
|
self._display_hex_data(body, self.bin_tab)
|
|
except Exception:
|
|
self.bin_tab.config(state=tk.NORMAL)
|
|
self.bin_tab.delete("1.0", tk.END)
|
|
self.bin_tab.insert("1.0", hex_dump)
|
|
self.bin_tab.config(state=tk.DISABLED)
|
|
except Exception as e:
|
|
text = f"Failed to format raw packet: {e}\n\nRaw dump:\n"
|
|
text += self._format_hex_dump(raw_bytes)
|
|
self.raw_tab_text.config(state=tk.NORMAL)
|
|
self.raw_tab_text.delete("1.0", tk.END)
|
|
self.raw_tab_text.insert("1.0", text)
|
|
self.raw_tab_text.config(state=tk.DISABLED)
|
|
|
|
def _display_image_data(
|
|
self, payload: bytearray, tab_widgets: Dict[str, Any], photo_attr: str
|
|
):
|
|
try:
|
|
if len(payload) < ctypes.sizeof(ImageLeaderData):
|
|
raise ValueError("Payload smaller than ImageLeaderData header.")
|
|
leader = ImageLeaderData.from_buffer(payload)
|
|
h, w, bpp = (
|
|
leader.HEADER_DATA.DY,
|
|
leader.HEADER_DATA.DX,
|
|
leader.HEADER_DATA.BPP,
|
|
)
|
|
stride = leader.HEADER_DATA.STRIDE
|
|
offset = ctypes.sizeof(ImageLeaderData)
|
|
if not (h > 0 and w > 0 and bpp in [1, 2] and stride >= w):
|
|
raise ValueError(
|
|
f"Invalid image dimensions: {w}x{h}, bpp={bpp}, stride={stride}"
|
|
)
|
|
dtype = np.uint8 if bpp == 1 else np.uint16
|
|
expected_size = stride * h * bpp
|
|
if (offset + expected_size) > len(payload):
|
|
offset_fallback = (
|
|
ctypes.sizeof(SFPHeader)
|
|
+ ctypes.sizeof(ImageLeaderData)
|
|
- ctypes.sizeof(leader.PIXEL_TAG)
|
|
)
|
|
if (offset_fallback + expected_size) <= len(payload):
|
|
offset = offset_fallback
|
|
else:
|
|
raise ValueError(
|
|
f"Incomplete image data. Expected {expected_size}, got {len(payload) - offset}"
|
|
)
|
|
pixel_data_view = np.ndarray(
|
|
shape=(h, stride), dtype=dtype, buffer=payload, offset=offset
|
|
)
|
|
image_data = pixel_data_view[:, :w]
|
|
display_img_8bit = cv2.normalize(
|
|
image_data, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U
|
|
)
|
|
img_pil = Image.fromarray(
|
|
cv2.cvtColor(display_img_8bit, cv2.COLOR_GRAY2RGB)
|
|
)
|
|
resized = self._resize_pil_to_label(
|
|
img_pil, tab_widgets.get("image_container", tab_widgets["image_label"])
|
|
)
|
|
photo = ImageTk.PhotoImage(image=resized)
|
|
tab_widgets["image_label"].config(image=photo, text="")
|
|
setattr(self, photo_attr, photo)
|
|
except Exception as e:
|
|
self.logger.error(f"Error parsing image payload: {e}")
|
|
tab_widgets["image_label"].config(
|
|
image=None, text=f"Error parsing image:\n{e}"
|
|
)
|
|
setattr(self, photo_attr, None)
|
|
try:
|
|
self._display_hex_data(payload, self.bin_tab)
|
|
except Exception:
|
|
try:
|
|
self._display_hex_data(payload, tab_widgets["hex_view"])
|
|
except Exception:
|
|
pass
|
|
|
|
def _resize_pil_to_label(
|
|
self, img: "Image.Image", label_widget: ttk.Label
|
|
) -> "Image.Image":
|
|
try:
|
|
width, height = label_widget.winfo_width(), label_widget.winfo_height()
|
|
if width <= 1 or height <= 1:
|
|
return img
|
|
img_w, img_h = img.size
|
|
scale = min(width / img_w, height / img_h)
|
|
if scale >= 1.0:
|
|
return img
|
|
new_w, new_h = max(1, int(img_w * scale)), max(1, int(img_h * scale))
|
|
return img.resize((new_w, new_h), Image.LANCZOS)
|
|
except Exception:
|
|
return img
|
|
|
|
def _display_hex_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
|
|
hex_dump = self._format_hex_dump(payload)
|
|
widget.config(state=tk.NORMAL)
|
|
widget.delete("1.0", tk.END)
|
|
widget.insert("1.0", hex_dump)
|
|
widget.config(state=tk.DISABLED)
|
|
|
|
def _display_json_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
|
|
try:
|
|
import json
|
|
|
|
text = json.dumps(json.loads(payload.decode("utf-8")), indent=2)
|
|
except Exception as e:
|
|
text = f"--- FAILED TO PARSE JSON ---\n{e}\n\n--- RAW HEX DUMP ---"
|
|
text += self._format_hex_dump(payload)
|
|
widget.config(state=tk.NORMAL)
|
|
widget.delete("1.0", tk.END)
|
|
widget.insert("1.0", text)
|
|
widget.config(state=tk.DISABLED)
|
|
|
|
def _log_to_widget(self, message: str, level: str = "DEBUG"):
|
|
self.logger.info(message)
|
|
self.log_tab.config(state=tk.NORMAL)
|
|
self.log_tab.insert(tk.END, f"[{level}] {message}\n")
|
|
self.log_tab.config(state=tk.DISABLED)
|
|
self.log_tab.see(tk.END)
|
|
|
|
def _on_save_ris_csv(self):
|
|
try:
|
|
import csv
|
|
|
|
scenario_rows = [
|
|
self.scenario_tree.item(iid, "values")
|
|
for iid in self.scenario_tree.get_children()
|
|
]
|
|
rows = [
|
|
self.ris_tree.item(iid, "values")
|
|
for iid in self.ris_tree.get_children()
|
|
]
|
|
if not scenario_rows and not rows:
|
|
self._log_to_widget("No RIS data to save.", "INFO")
|
|
return
|
|
project_root = os.path.abspath(
|
|
os.path.join(os.path.dirname(__file__), "..", "..")
|
|
)
|
|
temp_dir = os.path.join(project_root, "Temp")
|
|
os.makedirs(temp_dir, exist_ok=True)
|
|
ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%S")
|
|
fname = f"ris_targets_{ts}.csv"
|
|
path = os.path.join(temp_dir, fname)
|
|
with open(path, "w", newline="", encoding="utf-8") as f:
|
|
writer = csv.writer(f)
|
|
if scenario_rows:
|
|
writer.writerow(["Scenario Field", "Value"])
|
|
for s in scenario_rows:
|
|
writer.writerow(s)
|
|
writer.writerow([])
|
|
writer.writerow(["index", "flags", "heading", "x", "y", "z"])
|
|
for r in rows:
|
|
writer.writerow(r)
|
|
self._log_to_widget(f"Saved RIS targets CSV to {path}", "INFO")
|
|
except Exception as e:
|
|
self._log_to_widget(f"Failed to save RIS CSV: {e}", "ERROR")
|
|
|
|
def _format_hex_dump(self, data: bytes, length=16) -> str:
|
|
lines = []
|
|
for i in range(0, len(data), length):
|
|
chunk = data[i : i + length]
|
|
hex_part = " ".join(f"{b:02X}" for b in chunk)
|
|
ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
|
|
lines.append(f"{i:08X} {hex_part:<{length*3}} |{ascii_part}|")
|
|
return "\n".join(lines)
|