1259 lines
50 KiB
Python
1259 lines
50 KiB
Python
# target_simulator/gui/sfp_debug_window.py
|
|
"""
|
|
Provides a Toplevel window for debugging the SFP transport layer.
|
|
This version uses a sampling approach to handle high-frequency data streams
|
|
without overwhelming the GUI thread.
|
|
"""
|
|
|
|
import tkinter as tk
|
|
from tkinter import ttk, scrolledtext
|
|
import logging
|
|
import threading
|
|
import collections
|
|
import datetime
|
|
import os
|
|
import ctypes
|
|
import time
|
|
from typing import Dict, Callable, Optional, Any
|
|
import socket
|
|
|
|
# Third-party imports for image display
|
|
try:
|
|
from PIL import Image, ImageTk
|
|
import numpy as np
|
|
import cv2
|
|
|
|
_IMAGE_LIBS_AVAILABLE = True
|
|
except ImportError:
|
|
_IMAGE_LIBS_AVAILABLE = False
|
|
|
|
# Imports from the project structure
|
|
from target_simulator.core.sfp_transport import SfpTransport, PayloadHandler
|
|
from target_simulator.core.sfp_structures import (
|
|
ImageLeaderData,
|
|
SFPHeader,
|
|
SfpRisStatusPayload,
|
|
)
|
|
|
|
# --- Helper Class for Routing and Buffering Payloads ---
|
|
|
|
|
|
class DebugPayloadRouter:
    """
    Buffers the most recent payload of each SFP flow for low-rate GUI sampling.

    The handlers returned by :meth:`get_handlers` only overwrite a per-flow
    slot, so a consumer can poll :meth:`get_and_clear_latest_payloads` at its
    own pace without being flooded. Raw packets are additionally kept in a
    bounded history and can optionally be written to disk.

    All shared state is guarded by a single lock; disk I/O is performed
    outside of that lock so a slow disk cannot stall the polling side.
    """

    # Flow codes that get a friendly name in the raw-packet history.
    _FLOW_NAMES = {
        ord("M"): "MFD",
        ord("S"): "SAR",
        ord("B"): "BIN",
        ord("J"): "JSON",
    }

    def __init__(self):
        self._log_prefix = "[DebugPayloadRouter]"
        self._lock = threading.Lock()
        # Last payload received for each flow name ("MFD", "SAR", ...).
        self._latest_payloads: Dict[str, bytearray] = {}
        # Last raw packet as (raw_bytes, addr); None once it has been consumed.
        self._last_raw_packet: Optional[tuple] = None
        # Bounded history of raw packets (dicts with "ts", "addr", "raw", ...).
        self._history_size = 20
        self._history = collections.deque(maxlen=self._history_size)
        self._persist = False
        # Default persistence directory: "Temp" two levels above this file.
        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except Exception:
            # Best effort: persistence is optional, so only leave a trace.
            logging.debug(
                "%s Could not create persist dir %r",
                self._log_prefix,
                self._persist_dir,
                exc_info=True,
            )
        logging.info(f"{self._log_prefix} Initialized.")

    def get_handlers(self) -> "Dict[int, PayloadHandler]":
        """Returns handlers, keyed by flow code, that update the last-payload buffer."""
        return {
            ord("M"): lambda payload: self._update_last_payload("MFD", payload),
            ord("S"): lambda payload: self._update_last_payload("SAR", payload),
            ord("B"): lambda payload: self._update_last_payload("BIN", payload),
            ord("J"): lambda payload: self._update_last_payload("JSON", payload),
            # Both uppercase 'R' and lowercase 'r' are treated as RIS/status flows.
            ord("R"): lambda payload: self._handle_ris_status(payload),
            ord("r"): lambda payload: self._handle_ris_status(payload),
        }

    def _update_last_payload(self, flow_id: str, payload: bytearray):
        """Thread-safely stores the latest payload for a given flow."""
        with self._lock:
            self._latest_payloads[flow_id] = payload

    @staticmethod
    def _format_ris_text(parsed, payload) -> str:
        """Renders a parsed RIS status structure as the text shown in the RIS tab."""
        sc = parsed.scenario
        lines = [
            "RIS Status Payload:\n",
            "Scenario:",
            f" timetag : {sc.timetag}",
            f" platform_azim : {sc.platform_azimuth:.6f}",
            f" vx,vy,vz : {sc.vx:.3f}, {sc.vy:.3f}, {sc.vz:.3f}",
            f" baro_altitude : {sc.baro_altitude:.3f}",
            f" latitude : {sc.latitude:.6f}",
            f" longitude : {sc.longitude:.6f}",
            f" true_heading : {sc.true_heading:.3f}\n",
            "Targets (first non-zero flags shown):",
        ]
        # Only targets whose flags field is non-zero are listed.
        target_lines = [
            f" [{idx}] flags={t.flags} heading={t.heading:.3f} x={t.x:.3f} y={t.y:.3f} z={t.z:.3f}"
            for idx, t in enumerate(parsed.tgt.tgt)
            if t.flags != 0
        ]
        lines.extend(target_lines or [" (no enabled targets)"])

        # Short hex sample of the payload start so the user can correlate bytes.
        sample_len = min(48, len(payload))
        hex_sample = " ".join(f"{b:02X}" for b in payload[:sample_len])
        lines.append("\nPayload sample (hex, first %d bytes):" % sample_len)
        lines.append(f" {hex_sample}")
        return "\n".join(lines)

    @staticmethod
    def _ris_to_json(parsed) -> bytearray:
        """Serializes a parsed RIS status structure to UTF-8 JSON (``{}`` on failure)."""
        try:
            import json

            sc = parsed.scenario
            struct = {
                "scenario": {
                    "timetag": int(sc.timetag),
                    "platform_azimuth": float(sc.platform_azimuth),
                    "vx": float(sc.vx),
                    "vy": float(sc.vy),
                    "vz": float(sc.vz),
                    "baro_altitude": float(sc.baro_altitude),
                    "latitude": float(sc.latitude),
                    "longitude": float(sc.longitude),
                    "true_heading": float(sc.true_heading),
                },
                "targets": [
                    {
                        "index": idx,
                        "flags": int(t.flags),
                        "heading": float(t.heading),
                        "x": float(t.x),
                        "y": float(t.y),
                        "z": float(t.z),
                    }
                    for idx, t in enumerate(parsed.tgt.tgt)
                ],
            }
            return bytearray(json.dumps(struct).encode("utf-8"))
        except Exception:
            return bytearray(b"{}")

    def _handle_ris_status(self, payload: bytearray):
        """Parses a RIS status payload and stores a text summary plus JSON.

        On success two buffer entries are written: ``"RIS_STATUS"`` (text) and
        ``"RIS_STATUS_JSON"`` (structured data). If the payload is too short or
        parsing fails, ``"RIS_STATUS"`` receives a hex dump of the raw payload
        (one byte per line) instead.
        """
        try:
            struct_size = SfpRisStatusPayload.size()
            if len(payload) >= struct_size:
                # Interpret the leading bytes as the status structure.
                parsed = SfpRisStatusPayload.from_buffer_copy(
                    bytes(payload[:struct_size])
                )
                text_out = self._format_ris_text(parsed, payload)
                json_bytes = self._ris_to_json(parsed)
                self._update_last_payload(
                    "RIS_STATUS", bytearray(text_out.encode("utf-8"))
                )
                self._update_last_payload("RIS_STATUS_JSON", json_bytes)
                return
        except Exception:
            # Fall through to the raw dump, but keep a trace for debugging.
            logging.debug(
                "%s RIS status parsing failed", self._log_prefix, exc_info=True
            )

        # Fallback: store the raw payload as a hex dump.
        try:
            text_out = "\n".join([f"{b:02X}" for b in payload])
            self._update_last_payload("RIS_STATUS", bytearray(text_out.encode("utf-8")))
        except Exception:
            self._update_last_payload("RIS_STATUS", payload)

    def get_and_clear_latest_payloads(self) -> Dict[str, bytearray]:
        """
        Thread-safely retrieves all payloads received since the last call
        and clears the internal buffer.

        Returns:
            Dict[str, bytearray]: The latest payload for each flow name.
        """
        with self._lock:
            # Swap the buffer with an empty one while holding the lock.
            new_payloads = self._latest_payloads
            self._latest_payloads = {}
            return new_payloads

    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """Records a raw packet as "last packet", in the history and optionally on disk.

        Args:
            raw_bytes: The packet exactly as received.
            addr: Sender address; ``addr[0]`` and ``addr[1]`` are used to build
                the file name when persistence is enabled.
        """
        entry = {
            # Naive UTC timestamp (same value utcnow() produced, without the
            # deprecation warning raised on Python 3.12+).
            "ts": datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
            "addr": addr,
            "raw": raw_bytes,
        }
        # Best effort: parse the SFP header to capture flow/TID for list display.
        try:
            hdr = SFPHeader.from_buffer_copy(raw_bytes)
            entry["flow"] = int(hdr.SFP_FLOW)
            entry["tid"] = int(hdr.SFP_TID)
            flow = entry["flow"]
            # Known flows get a name; other printable codes are shown as a char.
            entry["flow_name"] = self._FLOW_NAMES.get(
                flow, chr(flow) if 32 <= flow < 127 else str(flow)
            )
        except Exception:
            # Leave flow/tid absent when the header cannot be decoded.
            pass

        with self._lock:
            self._last_raw_packet = (raw_bytes, addr)
            self._history.append(entry)
            persist = self._persist

        # Disk I/O happens outside the lock so readers are never blocked by it.
        if persist:
            try:
                ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
                fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
                path = os.path.join(self._persist_dir, fname)
                with open(path, "wb") as f:
                    f.write(raw_bytes)
            except Exception:
                # Never propagate persistence errors to the caller.
                logging.debug(
                    "%s Failed to persist raw packet", self._log_prefix, exc_info=True
                )

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Returns the last ``(raw_bytes, addr)`` pair and resets it to None."""
        with self._lock:
            pkt = self._last_raw_packet
            self._last_raw_packet = None
            return pkt

    def get_history(self):
        """Returns a snapshot list of the history entries, oldest first."""
        with self._lock:
            return list(self._history)

    def clear_history(self):
        """Removes every entry from the raw-packet history."""
        with self._lock:
            self._history.clear()

    def set_history_size(self, n: int):
        """Resizes the history to at most ``n`` entries (minimum 1).

        Values that cannot be converted to ``int`` are ignored. When shrinking,
        the most recent entries are the ones that are kept.
        """
        with self._lock:
            try:
                n = max(1, int(n))
            except Exception:
                return
            self._history_size = n
            self._history = collections.deque(self._history, maxlen=n)

    def set_persist(self, enabled: bool):
        """Enables or disables writing each raw packet to the persist directory."""
        with self._lock:
            self._persist = bool(enabled)
|
|
|
|
|
|
# --- Main Debug Window Class ---
|
|
|
|
|
|
class SfpDebugWindow(tk.Toplevel):
|
|
"""A self-contained SFP debugging and packet inspection window."""
|
|
|
|
GUI_POLL_INTERVAL_MS = 250 # Poll for new data 4 times per second
|
|
|
|
def __init__(self, master):
    """Creates the inspector window as a transient child of ``master``.

    Debug settings (history size, raw persistence) and the image display size
    are read from ``master.config_manager`` when that attribute exists; any
    failure while reading them silently keeps the defaults. The periodic
    payload polling is scheduled before returning.
    """
    super().__init__(master)
    self.title("SFP Packet Inspector")
    self.geometry("900x700")
    self.transient(master)

    self.logger = logging.getLogger(__name__)
    self.sfp_transport: Optional[SfpTransport] = None
    self.payload_router = DebugPayloadRouter()

    # Saved debug settings (general.sfp_debug) are applied to the router.
    try:
        config_manager = getattr(master, "config_manager", None)
        settings = config_manager.get_general_settings() if config_manager else {}
        debug_section = settings.get("sfp_debug", {})
        saved_history_size = int(debug_section.get("history_size", 20))
        saved_persist = bool(debug_section.get("persist_raw", False))
        try:
            self.payload_router.set_history_size(saved_history_size)
            self.payload_router.set_persist(saved_persist)
        except Exception:
            pass
    except Exception:
        pass

    # References to the displayed images (None until an image is shown).
    self.mfd_photo: Optional[ImageTk.PhotoImage] = None
    self.sar_photo: Optional[ImageTk.PhotoImage] = None

    # Image area edge length in pixels (general.image_display.size, default 150).
    try:
        config_manager = getattr(master, "config_manager", None)
        settings = config_manager.get_general_settings() if config_manager else {}
        display_section = settings.get("image_display", {})
        self.image_area_size = int(display_section.get("size", 150))
    except Exception:
        self.image_area_size = 150

    self._create_widgets()
    self.protocol("WM_DELETE_WINDOW", self._on_close)

    # Timestamp of the last raw update, kept for throttling high-volume flows.
    self._last_raw_update_ts = 0.0
    self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
|
|
|
|
def _create_widgets(self):
    """Builds the whole window: connection bar, script sender and the tab notebook.

    Final tab order: Raw Log, SFP Raw, (MFD Image, SAR Image when the image
    libraries are available), RIS Status, Binary (Hex), JSON.
    """
    mono_9 = ("Consolas", 9)
    mono_10 = ("Consolas", 10)

    # --- Connection controls ---
    conn_frame = ttk.LabelFrame(self, text="Connection", padding=5)
    conn_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5)

    ttk.Label(conn_frame, text="IP:").pack(side=tk.LEFT, padx=(5, 2))
    self.ip_var = tk.StringVar(value="127.0.0.1")
    ttk.Entry(conn_frame, textvariable=self.ip_var, width=15).pack(side=tk.LEFT)

    ttk.Label(conn_frame, text="Port:").pack(side=tk.LEFT, padx=(10, 2))
    self.port_var = tk.StringVar(value="60002")  # 55556 for MFD
    ttk.Entry(conn_frame, textvariable=self.port_var, width=7).pack(side=tk.LEFT)

    self.connect_btn = ttk.Button(
        conn_frame, text="Connect", command=self._on_connect
    )
    self.connect_btn.pack(side=tk.LEFT, padx=(10, 5))

    self.disconnect_btn = ttk.Button(
        conn_frame,
        text="Disconnect",
        command=self._on_disconnect,
        state=tk.DISABLED,
    )
    self.disconnect_btn.pack(side=tk.LEFT, padx=5)

    # Opens the dialog that changes the image display size.
    self.image_size_btn = ttk.Button(
        conn_frame, text="Image size...", command=self._open_image_size_dialog
    )
    self.image_size_btn.pack(side=tk.LEFT, padx=5)

    # Sends a simple UDP probe to the configured IP:Port.
    self.send_probe_btn = ttk.Button(
        conn_frame, text="Send probe", command=self._on_send_probe
    )
    self.send_probe_btn.pack(side=tk.LEFT, padx=5)

    # Sends a minimal SFP packet to the configured IP:Port.
    self.send_ack_btn = ttk.Button(
        conn_frame, text="Send ACK", command=self._on_send_ack
    )
    self.send_ack_btn.pack(side=tk.LEFT, padx=5)

    # --- Script sender row (below the connection bar) ---
    script_frame = ttk.Frame(self)
    script_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(0, 5))
    ttk.Label(script_frame, text="Script to send:").pack(side=tk.LEFT, padx=(5, 2))
    self.script_var = tk.StringVar(value="print('hello from client')")
    script_entry = ttk.Entry(script_frame, textvariable=self.script_var, width=60)
    script_entry.pack(side=tk.LEFT, padx=(0, 5))
    self.send_script_btn = ttk.Button(
        script_frame, text="Send script", command=self._on_send_script
    )
    self.send_script_btn.pack(side=tk.LEFT, padx=5)

    # --- Data display notebook ---
    self.notebook = ttk.Notebook(self)
    self.notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    self.log_tab = scrolledtext.ScrolledText(
        self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=mono_9
    )
    self.notebook.add(self.log_tab, text="Raw Log")

    if _IMAGE_LIBS_AVAILABLE:
        self.mfd_tab = self._create_image_tab("MFD Image")
        self.notebook.add(self.mfd_tab["frame"], text="MFD Image")
        self.sar_tab = self._create_image_tab("SAR Image")
        self.notebook.add(self.sar_tab["frame"], text="SAR Image")

    # RIS status tab: decoded RIS payloads as text.
    self.ris_tab = scrolledtext.ScrolledText(
        self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=mono_10
    )
    self.notebook.add(self.ris_tab, text="RIS Status")

    # --- Raw SFP packet tab: history on the left, details on the right ---
    raw_frame = ttk.Frame(self.notebook)

    history_frame = ttk.Frame(raw_frame, width=380)
    history_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(5, 2), pady=5)
    ttk.Label(history_frame, text="History (latest)").pack(anchor=tk.W, padx=4)

    # Smaller font so more fits on one line.
    history_font = ("Consolas", 8)

    # Container for Treeview + scrollbar so the buttons can sit underneath.
    list_container = ttk.Frame(history_frame)
    list_container.pack(fill=tk.BOTH, expand=True, padx=4, pady=(2, 4))

    # Treeview columns: Timestamp | Flow | TID | Size
    self.history_tree = ttk.Treeview(
        list_container,
        columns=("ts", "flow", "tid", "size"),
        show="headings",
        height=20,
    )
    for column_id, caption in (
        ("ts", "Timestamp"),
        ("flow", "Flow"),
        ("tid", "TID"),
        ("size", "Size"),
    ):
        self.history_tree.heading(column_id, text=caption)
    for column_id, column_width, column_anchor in (
        ("ts", 100, "w"),
        ("flow", 50, "w"),
        ("tid", 40, "center"),
        ("size", 50, "e"),
    ):
        self.history_tree.column(column_id, width=column_width, anchor=column_anchor)

    # Apply the smaller font to the tree rows through a dedicated style.
    try:
        tree_style = ttk.Style()
        tree_style.configure("Small.Treeview", font=history_font)
        self.history_tree.configure(style="Small.Treeview")
    except Exception:
        pass
    self.history_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

    self.history_vscroll = ttk.Scrollbar(
        list_container, orient=tk.VERTICAL, command=self.history_tree.yview
    )
    self.history_vscroll.pack(side=tk.RIGHT, fill=tk.Y)
    self.history_tree.config(yscrollcommand=self.history_vscroll.set)

    # Settings and clear buttons under the history list.
    hb_frame = ttk.Frame(history_frame)
    hb_frame.pack(fill=tk.X, padx=4, pady=(4, 4))
    self.history_settings_btn = ttk.Button(
        hb_frame,
        text="Settings",
        command=lambda: self._open_history_settings_dialog(),
    )
    self.history_settings_btn.pack(side=tk.LEFT)
    self.history_clear_btn = ttk.Button(
        hb_frame, text="Clear", command=lambda: self._on_clear_history()
    )
    self.history_clear_btn.pack(side=tk.RIGHT)

    # Right-hand side: details of the selected / latest raw packet.
    self.raw_tab_text = scrolledtext.ScrolledText(
        raw_frame, state=tk.DISABLED, wrap=tk.NONE, font=mono_9
    )
    self.raw_tab_text.pack(
        side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(2, 5), pady=5
    )

    # Place the raw tab second (index 1); append it if insertion is rejected.
    try:
        self.notebook.insert(1, raw_frame, text="SFP Raw")
    except Exception:
        self.notebook.add(raw_frame, text="SFP Raw")

    # Text tags used to colour flags and header fields in the details view.
    try:
        for tag_name, tag_options in (
            ("flag_set", {"background": "#d4ffd4", "foreground": "#006400"}),
            ("flag_unset", {"background": "#f0f0f0", "foreground": "#808080"}),
            ("flag_error", {"background": "#ffd4d4", "foreground": "#800000"}),
            ("hdr_field", {"foreground": "#000080"}),
        ):
            self.raw_tab_text.tag_config(tag_name, **tag_options)
    except Exception:
        pass

    self.bin_tab = scrolledtext.ScrolledText(
        self.notebook, state=tk.DISABLED, wrap=tk.NONE, font=mono_10
    )
    self.notebook.add(self.bin_tab, text="Binary (Hex)")

    self.json_tab = scrolledtext.ScrolledText(
        self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=mono_10
    )
    self.notebook.add(self.json_tab, text="JSON")
|
|
|
|
def _create_image_tab(self, title: str) -> Dict:
    """Creates the widgets of one image tab and returns them in a dict.

    Args:
        title: Name shown in the "Waiting for ..." placeholder text.

    Returns:
        Dict with the keys ``"frame"``, ``"image_label"``, ``"hex_view"`` and
        ``"image_container"``.
    """
    tab_frame = ttk.Frame(self.notebook)

    # Fixed-size square container; propagation is disabled so its children
    # cannot resize it.
    edge = self.image_area_size
    container = ttk.Frame(tab_frame, width=edge, height=edge, relief=tk.SUNKEN)
    container.pack(pady=5, padx=5)
    container.pack_propagate(False)

    placeholder = ttk.Label(
        container, text=f"Waiting for {title}...", anchor=tk.CENTER
    )
    placeholder.pack(fill=tk.BOTH, expand=True)

    hex_text = scrolledtext.ScrolledText(
        tab_frame, height=8, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9)
    )
    hex_text.pack(fill=tk.BOTH, expand=True, pady=5, padx=5)

    widgets = {}
    widgets["frame"] = tab_frame
    widgets["image_label"] = placeholder
    widgets["hex_view"] = hex_text
    widgets["image_container"] = container
    return widgets
|
|
|
|
def _open_image_size_dialog(self):
    """Opens a modal dialog to change the image display size and save it to settings.

    On "Save" the value must be a positive integer. It is applied to the
    existing image containers and, when ``self.master`` has a
    ``config_manager``, stored under ``general["image_display"]["size"]``.
    """
    # "import tkinter" does not load the messagebox submodule, so
    # tk.messagebox is not guaranteed to exist; import it explicitly.
    from tkinter import messagebox

    dlg = tk.Toplevel(self)
    dlg.title("Image Size")
    dlg.transient(self)
    dlg.grab_set()

    ttk.Label(dlg, text="Image area size (px):").pack(padx=10, pady=(10, 2))
    size_var = tk.StringVar(value=str(self.image_area_size))
    entry = ttk.Entry(dlg, textvariable=size_var, width=8)
    entry.pack(padx=10, pady=(0, 10))

    btn_frame = ttk.Frame(dlg)
    btn_frame.pack(padx=10, pady=(0, 10))

    def on_save():
        try:
            v = int(size_var.get())
            if v <= 0:
                raise ValueError()
        except Exception:
            message = "Please enter a positive integer for image size."
            try:
                messagebox.showerror("Invalid value", message, parent=dlg)
            except tk.TclError:
                # The dialog may already be gone; nothing left to report to.
                pass
            return

        # Apply to the current window.
        self.image_area_size = v
        # Resize the existing image containers (absent without image libs).
        for tab in (getattr(self, "mfd_tab", None), getattr(self, "sar_tab", None)):
            if tab and "image_container" in tab:
                tab["image_container"].config(width=v, height=v)

        # Persist through the master's ConfigManager when available. A failed
        # save is logged and must not leave the modal dialog stuck open.
        try:
            gm = getattr(self.master, "config_manager", None)
            if gm:
                general = gm.get_general_settings() or {}
                image_display = general.get("image_display", {})
                image_display["size"] = v
                general["image_display"] = image_display
                gm.save_general_settings(general)
        except Exception:
            self.logger.exception("Failed to persist image display size")

        dlg.destroy()

    def on_cancel():
        dlg.destroy()

    ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack(
        side=tk.RIGHT, padx=(0, 5)
    )
    ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT)
|
|
|
|
def _on_connect(self):
    """Starts an SfpTransport for the IP/port typed into the connection bar.

    A non-integer port is reported in the log widget and aborts the attempt.
    On success the Connect/Disconnect buttons are toggled and the router is
    registered as the transport's raw-packet callback; on failure the
    transport reference is dropped again.
    """
    host = self.ip_var.get()
    try:
        port_number = int(self.port_var.get())
    except ValueError:
        self._log_to_widget("ERROR: Invalid port number.", "ERROR")
        return

    self._log_to_widget(f"Attempting to connect to {host}:{port_number}...")

    # Per-flow values passed as ack_config (keys are the 'M' and 'S' flow codes).
    self.sfp_transport = SfpTransport(
        host=host,
        port=port_number,
        payload_handlers=self.payload_router.get_handlers(),
        ack_config={ord("M"): 32, ord("S"): 16},
    )

    if not self.sfp_transport.start():
        self._log_to_widget("Connection failed. Check IP/Port and logs.", "ERROR")
        self.sfp_transport = None
    else:
        self._log_to_widget(
            "Connection successful. Listening for packets...", "INFO"
        )
        self.connect_btn.config(state=tk.DISABLED)
        self.disconnect_btn.config(state=tk.NORMAL)

    # Only a transport that started is still referenced at this point.
    if self.sfp_transport:
        try:
            # Raw packets are forwarded to the router's update_raw_packet.
            self.sfp_transport._raw_packet_callback = (
                self.payload_router.update_raw_packet
            )
        except Exception:
            self.logger.exception(
                "Failed to register raw_packet_callback on SfpTransport"
            )

    # Selecting a history row shows that past packet in the details view.
    try:
        self.history_tree.bind(
            "<<TreeviewSelect>>", lambda e: self._on_history_select()
        )
    except Exception:
        pass
|
|
|
|
def _on_send_probe(self):
    """Sends a small UDP probe to the configured IP:port to "wake" the server.

    The probe is a short datagram sent through a temporary UDP socket, so it
    does not depend on ``self.sfp_transport`` and can be used while
    disconnected. The outcome is reported in the log widget.
    """
    ip = self.ip_var.get()
    try:
        port = int(self.port_var.get())
    except ValueError:
        self._log_to_widget("ERROR: Invalid port number for probe.", "ERROR")
        return

    probe_payload = b"SFP_PROBE\n"  # simple payload; server will accept any data
    try:
        # The context manager closes the socket even when sendto() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(1.0)
            sock.sendto(probe_payload, (ip, port))
        self._log_to_widget(f"Sent probe to {ip}:{port}", "INFO")
    except Exception as e:
        # Broad on purpose: resolution errors are OSError, but an
        # out-of-range port raises OverflowError.
        self._log_to_widget(f"Failed to send probe to {ip}:{port}: {e}", "ERROR")
|
|
|
|
def _on_send_ack(self):
    """Sends a minimal single-fragment SFP packet on flow 'M' to the server.

    Despite the button label ("Send ACK"), the packet built here is a data
    fragment (fragment 0 of 1) carrying the payload ``b"SFP_WAKE"``.

    The active transport's socket is used when available so the packet
    originates from the same local port; otherwise a temporary UDP socket is
    created and closed again.
    """
    ip = self.ip_var.get()
    try:
        port = int(self.port_var.get())
    except ValueError:
        self._log_to_widget("ERROR: Invalid port number for ACK.", "ERROR")
        return

    try:
        payload = b"SFP_WAKE"  # small payload so the size fields are non-zero

        hdr = SFPHeader()
        hdr.SFP_DIRECTION = 0x3C  # ASCII '<'
        hdr.SFP_FLOW = ord("M")
        hdr.SFP_TID = 1
        hdr.SFP_FLAGS = 0x00
        hdr.SFP_WIN = 32

        # Fragment metadata: this is the only fragment.
        hdr.SFP_TOTFRGAS = 1
        hdr.SFP_FRAG = 0
        hdr.SFP_PLSIZE = len(payload)
        hdr.SFP_PLOFFSET = 0
        hdr.SFP_TOTSIZE = len(payload)

        pkt = bytes(hdr) + payload

        # Prefer the transport's own socket so the source port matches.
        transport_sock = (
            getattr(self.sfp_transport, "_socket", None)
            if self.sfp_transport
            else None
        )
        if transport_sock:
            transport_sock.sendto(pkt, (ip, port))
        else:
            # Temporary socket; closed by the context manager even on error.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as tmp_sock:
                tmp_sock.sendto(pkt, (ip, port))

        self._log_to_widget(
            f"Sent SFP data-fragment to {ip}:{port} (flow=M, tid=1)", "INFO"
        )
    except Exception as e:
        self._log_to_widget(f"Failed to send SFP fragment to {ip}:{port}: {e}", "ERROR")
|
|
|
|
def _on_send_script(self):
    """Builds a script message and sends it to the server on flow 'R'.

    The payload is a packed structure made of an 8-byte data tag (ID 'C','S',
    VALID=1, VERSION=1, SIZE=<script length>) followed by a fixed 1024-byte
    script buffer. It is wrapped in a single-fragment SFP packet and sent
    through the transport socket when available, otherwise through a
    temporary UDP socket.

    Scripts longer than 1020 UTF-8 bytes are truncated; the cut is made on a
    character boundary so the transmitted text stays valid UTF-8.
    """
    ip = self.ip_var.get()
    try:
        port = int(self.port_var.get())
    except ValueError:
        self._log_to_widget("ERROR: Invalid port number for script send.", "ERROR")
        return

    script_text = self.script_var.get() or ""
    # Stay below the 1024-byte script buffer declared in ScriptPayload.
    max_script = 1020
    script_bytes = script_text.encode("utf-8")
    if len(script_bytes) > max_script:
        # errors="ignore" drops a multi-byte sequence split by the cut.
        script_bytes = (
            script_bytes[:max_script]
            .decode("utf-8", errors="ignore")
            .encode("utf-8")
        )
        self._log_to_widget(
            f"Script truncated to {len(script_bytes)} bytes before sending.", "INFO"
        )

    # Local ctypes definitions mirroring the layout the server is said to expect.
    class LocalDataTag(ctypes.Structure):
        _pack_ = 1
        _fields_ = [
            ("ID", ctypes.c_uint8 * 2),
            ("VALID", ctypes.c_uint8),
            ("VERSION", ctypes.c_uint8),
            ("SIZE", ctypes.c_uint32),
        ]

    class ScriptPayload(ctypes.Structure):
        _pack_ = 1
        _fields_ = [
            ("script_tag", LocalDataTag),
            ("script", ctypes.c_uint8 * 1024),
        ]

    try:
        payload = ScriptPayload()
        # Tag ID is the two characters 'C', 'S'.
        payload.script_tag.ID[0] = ord("C")
        payload.script_tag.ID[1] = ord("S")
        payload.script_tag.VALID = 1
        payload.script_tag.VERSION = 1
        payload.script_tag.SIZE = len(script_bytes)
        # Bulk copy; the rest of the buffer stays zero-initialised.
        ctypes.memmove(payload.script, script_bytes, len(script_bytes))

        pl_bytes = bytes(payload)

        # Single-fragment SFP header for the script payload.
        hdr = SFPHeader()
        hdr.SFP_DIRECTION = 0x3C  # ASCII '<'
        hdr.SFP_FLOW = ord("R")  # 'R' is used for RIS script commands
        hdr.SFP_TID = 1
        hdr.SFP_FLAGS = 0x00
        hdr.SFP_WIN = 32
        hdr.SFP_TOTFRGAS = 1
        hdr.SFP_FRAG = 0
        hdr.SFP_PLSIZE = len(pl_bytes)
        hdr.SFP_PLOFFSET = 0
        hdr.SFP_TOTSIZE = len(pl_bytes)

        pkt = bytes(hdr) + pl_bytes

        # Prefer the transport's own socket so the source port matches.
        transport_sock = (
            getattr(self.sfp_transport, "_socket", None)
            if self.sfp_transport
            else None
        )
        if transport_sock:
            transport_sock.sendto(pkt, (ip, port))
        else:
            # Temporary socket; closed by the context manager even on error.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as tmp_sock:
                tmp_sock.sendto(pkt, (ip, port))

        self._log_to_widget(f"Sent script ({len(script_bytes)} bytes) to {ip}:{port}", "INFO")
    except Exception as e:
        self._log_to_widget(f"Failed to send script to {ip}:{port}: {e}", "ERROR")
|
|
|
|
def _on_disconnect(self):
|
|
if self.sfp_transport:
|
|
self._log_to_widget("Disconnecting...", "INFO")
|
|
self.sfp_transport.shutdown()
|
|
self.sfp_transport = None
|
|
self.connect_btn.config(state=tk.NORMAL)
|
|
self.disconnect_btn.config(state=tk.DISABLED)
|
|
self._log_to_widget("Disconnected.", "INFO")
|
|
|
|
def _on_close(self):
|
|
self.logger.info("SFP Debug Window closing.")
|
|
self._on_disconnect()
|
|
self.destroy()
|
|
|
|
def _process_latest_payloads(self):
|
|
"""GUI-thread loop to sample and display the latest payloads."""
|
|
# Get all new payloads that have arrived since the last check
|
|
new_payloads = self.payload_router.get_and_clear_latest_payloads()
|
|
|
|
# If there are new payloads, process them
|
|
if new_payloads:
|
|
self._log_to_widget(
|
|
f"Processing {len(new_payloads)} new payload(s) for flows: {list(new_payloads.keys())}"
|
|
)
|
|
for flow_id, payload in new_payloads.items():
|
|
if flow_id == "MFD" and _IMAGE_LIBS_AVAILABLE:
|
|
self._display_image_data(payload, self.mfd_tab, "mfd_photo")
|
|
# self.notebook.select(self.mfd_tab["frame"])
|
|
elif flow_id == "SAR" and _IMAGE_LIBS_AVAILABLE:
|
|
self._display_image_data(payload, self.sar_tab, "sar_photo")
|
|
# self.notebook.select(self.sar_tab["frame"])
|
|
elif flow_id == "BIN":
|
|
self._display_hex_data(payload, self.bin_tab)
|
|
# self.notebook.select(self.bin_tab)
|
|
elif flow_id == "JSON":
|
|
self._display_json_data(payload, self.json_tab)
|
|
elif flow_id == "RIS_STATUS":
|
|
# Display the textual RIS status in the RIS tab
|
|
try:
|
|
text = (
|
|
payload.decode("utf-8")
|
|
if isinstance(payload, (bytes, bytearray))
|
|
else str(payload)
|
|
)
|
|
except Exception:
|
|
text = str(payload)
|
|
self.ris_tab.config(state=tk.NORMAL)
|
|
self.ris_tab.delete("1.0", tk.END)
|
|
self.ris_tab.insert("1.0", text)
|
|
self.ris_tab.config(state=tk.DISABLED)
|
|
# Optionally show the RIS tab automatically
|
|
# self.notebook.select(self.ris_tab)
|
|
# self.notebook.select(self.json_tab)
|
|
|
|
# Reschedule the next check
|
|
self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
|
|
|
|
# Also check and display last raw packet
|
|
raw_pkt = self.payload_router.get_and_clear_raw_packet()
|
|
if raw_pkt:
|
|
raw_bytes, addr = raw_pkt
|
|
self._display_raw_packet(raw_bytes, addr)
|
|
# Refresh history tree to show new entry
|
|
try:
|
|
self._refresh_history_tree()
|
|
except Exception:
|
|
pass
|
|
|
|
def _refresh_history_tree(self):
|
|
try:
|
|
hist = self.payload_router.get_history()
|
|
# clear current
|
|
for iid in self.history_tree.get_children():
|
|
self.history_tree.delete(iid)
|
|
# insert reversed (latest first)
|
|
for i, entry in enumerate(reversed(hist)):
|
|
ts = entry["ts"].strftime("%H:%M:%S.%f")[:-3]
|
|
flow_name = entry.get("flow_name", "")
|
|
tid = entry.get("tid", "")
|
|
size = len(entry.get("raw", b""))
|
|
self.history_tree.insert(
|
|
"", tk.END, values=(ts, flow_name, tid, f"{size}B")
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
def _on_history_select(self):
|
|
try:
|
|
sel = self.history_tree.selection()
|
|
if not sel:
|
|
return
|
|
iid = sel[0]
|
|
# find index of item among children (0-based latest-first)
|
|
children = list(self.history_tree.get_children())
|
|
try:
|
|
idx = children.index(iid)
|
|
except ValueError:
|
|
idx = None
|
|
hist = list(reversed(self.payload_router.get_history()))
|
|
if idx is None or idx < 0 or idx >= len(hist):
|
|
return
|
|
entry = hist[idx]
|
|
self._display_raw_packet(entry["raw"], entry["addr"])
|
|
except Exception:
|
|
pass
|
|
|
|
def _on_clear_history(self):
|
|
try:
|
|
self.payload_router.clear_history()
|
|
self._refresh_history_listbox()
|
|
except Exception:
|
|
pass
|
|
|
|
def _open_history_settings_dialog(self):
    """Opens a modal dialog to edit the history size and the raw-persist flag.

    On "Save" the history size must be a positive integer. The values are
    applied to the payload router and, when ``self.master`` has a
    ``config_manager``, stored under ``general["sfp_debug"]``.
    """
    # "import tkinter" does not load the messagebox submodule, so
    # tk.messagebox is not guaranteed to exist; import it explicitly.
    from tkinter import messagebox

    dlg = tk.Toplevel(self)
    dlg.title("History Settings")
    dlg.transient(self)
    dlg.grab_set()

    # Current values, read from the router (defaults if unavailable).
    try:
        hist_size = self.payload_router._history_size
        persist = self.payload_router._persist
    except Exception:
        hist_size = 20
        persist = False

    ttk.Label(dlg, text="History size (entries):").pack(padx=10, pady=(10, 2))
    size_var = tk.StringVar(value=str(hist_size))
    entry = ttk.Entry(dlg, textvariable=size_var, width=8)
    entry.pack(padx=10, pady=(0, 10))

    persist_var = tk.BooleanVar(value=bool(persist))
    ttk.Checkbutton(
        dlg, text="Persist raw packets to Temp/", variable=persist_var
    ).pack(padx=10, pady=(0, 10))

    btn_frame = ttk.Frame(dlg)
    btn_frame.pack(padx=10, pady=(0, 10), fill=tk.X)

    def on_save():
        try:
            v = int(size_var.get())
            if v <= 0:
                raise ValueError()
        except Exception:
            try:
                messagebox.showerror(
                    "Invalid value",
                    "Please enter a positive integer for history size.",
                    parent=dlg,
                )
            except tk.TclError:
                # The dialog may already be gone; nothing left to report to.
                pass
            return

        # Apply to the router.
        try:
            self.payload_router.set_history_size(v)
            self.payload_router.set_persist(bool(persist_var.get()))
        except Exception:
            pass

        # Persist through the master's ConfigManager when available.
        try:
            gm = getattr(self.master, "config_manager", None)
            if gm:
                general = gm.get_general_settings() or {}
                sfp_debug = general.get("sfp_debug", {})
                sfp_debug["history_size"] = v
                sfp_debug["persist_raw"] = bool(persist_var.get())
                general["sfp_debug"] = sfp_debug
                gm.save_general_settings(general)
        except Exception:
            pass
        dlg.destroy()

    def on_cancel():
        dlg.destroy()

    ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack(
        side=tk.RIGHT, padx=(0, 5)
    )
    ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT)
|
|
|
|
def _display_raw_packet(self, raw_bytes: bytes, addr: tuple):
    """Show the raw SFP packet bytes and the parsed header (if possible).

    The raw tab is rewritten with, in order: the sender address, the header
    fields as a two-column table, the SFP_FLAGS value followed by one label
    per flag bit, a fixed flags legend, and a hex dump of the bytes that
    follow the header. If parsing or formatting fails, the tab shows the
    error message followed by a hex dump of the whole packet instead.

    Args:
        raw_bytes: The packet as received, SFP header included.
        addr: Sender address; only interpolated into the "From ..." line.
    """
    try:
        header_size = SFPHeader.size()
        if len(raw_bytes) < header_size:
            raise ValueError("Packet smaller than SFP header")

        header = SFPHeader.from_buffer_copy(raw_bytes)
        body = raw_bytes[header_size:]

        # Header fields, in display order (rendered two per line below).
        field_list = [
            "SFP_MARKER",
            "SFP_DIRECTION",
            "SFP_PROT_VER",
            "SFP_PT_SPARE",
            "SFP_TAG",
            "SFP_SRC",
            "SFP_FLOW",
            "SFP_TID",
            "SFP_FLAGS",
            "SFP_WIN",
            "SFP_ERR",
            "SFP_ERR_INFO",
            "SFP_TOTFRGAS",
            "SFP_FRAG",
            "SFP_RECTYPE",
            "SFP_RECSPARE",
            "SFP_PLDAP",
            "SFP_PLEXT",
            "SFP_RECCOUNTER",
            "SFP_PLSIZE",
            "SFP_TOTSIZE",
            "SFP_PLOFFSET",
        ]

        def format_value(value):
            """Render ints as 'decimal (0xHEX)', anything else via str()."""
            if isinstance(value, int):
                return f"{value} (0x{value:X})"
            return str(value)

        # Collect (label, text) pairs; remember the raw FLAGS value for decoding.
        pairs = []
        flag_val = None
        for field_name in field_list:
            value = getattr(header, field_name, "<N/A>")
            if field_name == "SFP_FLAGS":
                flag_val = value
            pairs.append((field_name, format_value(value)))

        # Two-column table: pair up items two per line.
        col_width = 36  # width for each column block
        intro_parts = [f"From {addr}\n\nSFP Header:\n\n"]
        for i in range(0, len(pairs), 2):
            left_text = f"{pairs[i][0]:12s}: {pairs[i][1]}"
            if i + 1 < len(pairs):
                right_text = f"{pairs[i + 1][0]:12s}: {pairs[i + 1][1]}"
                # Pad left_text to column width then append right_text
                line = f"{left_text:<{col_width}} {right_text}"
            else:
                line = left_text
            intro_parts.append(line + "\n")

        # FLAG decoding based on provided enum frag_flags_t
        # bit0 = frag_flag_acq_required
        # bit1 = frag_flag_resent / please_resend
        # bit2 = frag_flag_please_trailer_ack
        # bit7 = frag_flag_error
        flag_defs = [
            (0, "ACQ_REQ"),
            (1, "RESENT"),
            (2, "TRAILER_ACK"),
            (3, "RESV3"),
            (4, "RESV4"),
            (5, "RESV5"),
            (6, "RESV6"),
            (7, "ERROR"),
        ]

        # format_value() copes with a non-integer FLAGS value, so a header
        # without a usable SFP_FLAGS no longer discards the whole view.
        intro_parts.append(f"SFP_FLAGS : {format_value(flag_val)} ")

        # One (label, tag) pair per flag bit. Bits of a non-integer FLAGS
        # value are reported as unset.
        flags_known = isinstance(flag_val, int)
        flag_labels = []
        for bit, name in flag_defs:
            is_set = flags_known and bool((flag_val >> bit) & 1)
            if is_set and name == "ERROR":
                tag = "flag_error"
            else:
                tag = "flag_set" if is_set else "flag_unset"
            flag_labels.append((f" [{name}]", tag))

        # Fixed legend text for flags (always visible)
        legend_map = {
            "ACQ_REQ": "Acquisition required/requested",
            "RESENT": "Fragment resent / please resend",
            "TRAILER_ACK": "Request trailer acknowledgement",
            "ERROR": "Packet-level error flag",
            "RESV3": "Reserved",
            "RESV4": "Reserved",
            "RESV5": "Reserved",
            "RESV6": "Reserved",
        }
        legend_parts = ["\n\nFlags legend:\n"]
        legend_parts.extend(
            f" {name:12s}: {legend_map.get(name, '')}\n" for _, name in flag_defs
        )
        # Blank line separating the header section from the body dump.
        legend_parts.append("\n")

        hex_dump = self._format_hex_dump(body)

        # Display in the widget, tagging the header section as before and
        # additionally tagging each flag label with its state.
        # NOTE(review): 'flag_set' / 'flag_unset' / 'flag_error' are presumably
        # configured where raw_tab_text is created (not visible here); a tag
        # without configuration has no visual effect, so this is safe either way.
        view = self.raw_tab_text
        view.config(state=tk.NORMAL)
        view.delete("1.0", tk.END)
        view.insert(tk.END, "".join(intro_parts), "hdr_field")
        for label, tag in flag_labels:
            view.insert(tk.END, label, ("hdr_field", tag))
        view.insert(tk.END, "".join(legend_parts), "hdr_field")
        view.insert(tk.END, "BODY (hex):\n")
        view.insert(tk.END, hex_dump)
        view.config(state=tk.DISABLED)
    except Exception as e:
        # Best-effort fallback: report the problem and dump the whole packet.
        text = f"Failed to format raw packet: {e}\n\nRaw dump:\n"
        text += self._format_hex_dump(raw_bytes)
        self.raw_tab_text.config(state=tk.NORMAL)
        self.raw_tab_text.delete("1.0", tk.END)
        self.raw_tab_text.insert("1.0", text)
        self.raw_tab_text.config(state=tk.DISABLED)
|
|
|
|
def _display_image_data(
|
|
self, payload: bytearray, tab_widgets: Dict[str, Any], photo_attr: str
|
|
):
|
|
"""Parses an image payload and displays it. Now handles simplified structure."""
|
|
try:
|
|
if len(payload) < ctypes.sizeof(ImageLeaderData):
|
|
raise ValueError("Payload smaller than ImageLeaderData header.")
|
|
|
|
leader = ImageLeaderData.from_buffer(payload)
|
|
h, w, bpp = (
|
|
leader.HEADER_DATA.DY,
|
|
leader.HEADER_DATA.DX,
|
|
leader.HEADER_DATA.BPP,
|
|
)
|
|
stride = leader.HEADER_DATA.STRIDE
|
|
offset = ctypes.sizeof(ImageLeaderData)
|
|
|
|
if not (h > 0 and w > 0 and bpp in [1, 2] and stride >= w):
|
|
raise ValueError(
|
|
f"Invalid image dimensions in header: {w}x{h}, bpp={bpp}, stride={stride}"
|
|
)
|
|
|
|
if bpp == 1:
|
|
dtype = np.uint8
|
|
else:
|
|
dtype = np.uint16
|
|
|
|
expected_size = stride * h * bpp
|
|
if (offset + expected_size) > len(payload):
|
|
# Fallback for old format where PIXEL_TAG was at the end of leader
|
|
offset_fallback = (
|
|
ctypes.sizeof(SFPHeader)
|
|
+ ctypes.sizeof(ImageLeaderData)
|
|
- ctypes.sizeof(leader.PIXEL_TAG)
|
|
)
|
|
if (offset_fallback + expected_size) <= len(payload):
|
|
offset = offset_fallback
|
|
else:
|
|
raise ValueError(
|
|
f"Incomplete image data. Expected {expected_size} bytes, got {len(payload) - offset}"
|
|
)
|
|
|
|
pixel_data_view = np.ndarray(
|
|
shape=(h, stride), dtype=dtype, buffer=payload, offset=offset
|
|
)
|
|
# Crop to actual width if stride is larger
|
|
image_data = pixel_data_view[:, :w]
|
|
|
|
display_img_8bit = cv2.normalize(
|
|
image_data, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U
|
|
)
|
|
img_pil = Image.fromarray(
|
|
cv2.cvtColor(display_img_8bit, cv2.COLOR_GRAY2RGB)
|
|
)
|
|
|
|
# Resize image to fit the label area while preserving aspect ratio
|
|
try:
|
|
# Use the fixed-size container (150x150) for resizing target
|
|
resized = self._resize_pil_to_label(
|
|
img_pil,
|
|
tab_widgets.get("image_container", tab_widgets["image_label"]),
|
|
)
|
|
except Exception:
|
|
# Fallback to original if anything goes wrong
|
|
resized = img_pil
|
|
|
|
photo = ImageTk.PhotoImage(image=resized)
|
|
tab_widgets["image_label"].config(image=photo, text="")
|
|
setattr(self, photo_attr, photo)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error parsing image payload: {e}")
|
|
tab_widgets["image_label"].config(
|
|
image=None, text=f"Error parsing image:\n{e}"
|
|
)
|
|
setattr(self, photo_attr, None)
|
|
|
|
self._display_hex_data(payload, tab_widgets["hex_view"])
|
|
|
|
def _resize_pil_to_label(
|
|
self, img: "Image.Image", label_widget: ttk.Label
|
|
) -> "Image.Image":
|
|
"""Resize a PIL Image to fit within the current label widget size.
|
|
|
|
If the label widget has not been mapped yet (width/height == 1), this
|
|
will fallback to the image's original size.
|
|
"""
|
|
try:
|
|
# Get current allocated size for label (in pixels)
|
|
width = label_widget.winfo_width()
|
|
height = label_widget.winfo_height()
|
|
# If the widget isn't yet laid out, width/height may be 1 -> use geometry
|
|
if width <= 1 or height <= 1:
|
|
geom = self.geometry() # format: WxH+X+Y
|
|
if "x" in geom:
|
|
parts = geom.split("+", 1)[0].split("x")
|
|
win_w, win_h = int(parts[0]), int(parts[1])
|
|
# Use a fraction of window size for image area
|
|
width = max(1, int(win_w * 0.9))
|
|
height = max(1, int(win_h * 0.6))
|
|
|
|
if width <= 1 or height <= 1:
|
|
return img
|
|
|
|
img_w, img_h = img.size
|
|
# Compute scale preserving aspect ratio
|
|
scale = min(width / img_w, height / img_h)
|
|
if scale >= 1.0:
|
|
return img
|
|
|
|
new_w = max(1, int(img_w * scale))
|
|
new_h = max(1, int(img_h * scale))
|
|
return img.resize((new_w, new_h), Image.LANCZOS)
|
|
except Exception:
|
|
return img
|
|
|
|
def _display_hex_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
|
|
hex_dump = self._format_hex_dump(payload)
|
|
widget.config(state=tk.NORMAL)
|
|
widget.delete("1.0", tk.END)
|
|
widget.insert("1.0", hex_dump)
|
|
widget.config(state=tk.DISABLED)
|
|
|
|
def _display_json_data(self, payload: bytearray, widget: scrolledtext.ScrolledText):
|
|
try:
|
|
import json
|
|
|
|
text = json.dumps(json.loads(payload.decode("utf-8")), indent=2)
|
|
except Exception as e:
|
|
text = f"--- FAILED TO PARSE JSON ---\n{e}\n\n--- RAW HEX DUMP ---\n"
|
|
text += self._format_hex_dump(payload)
|
|
widget.config(state=tk.NORMAL)
|
|
widget.delete("1.0", tk.END)
|
|
widget.insert("1.0", text)
|
|
widget.config(state=tk.DISABLED)
|
|
|
|
def _log_to_widget(self, message: str, level: str = "DEBUG"):
|
|
self.logger.info(message)
|
|
self.log_tab.config(state=tk.NORMAL)
|
|
self.log_tab.insert(tk.END, f"[{level}] {message}\n")
|
|
self.log_tab.config(state=tk.DISABLED)
|
|
self.log_tab.see(tk.END)
|
|
|
|
def _format_hex_dump(self, data: bytes, length=16) -> str:
|
|
lines = []
|
|
for i in range(0, len(data), length):
|
|
chunk = data[i : i + length]
|
|
hex_part = " ".join(f"{b:02X}" for b in chunk)
|
|
ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
|
|
lines.append(f"{i:08X} {hex_part:<{length*3}} |{ascii_part}|")
|
|
return "\n".join(lines)
|