# target_simulator/gui/sfp_debug_window.py """ Provides a Toplevel window for debugging the SFP transport layer. This version uses a sampling approach to handle high-frequency data streams without overwhelming the GUI thread. """ import tkinter as tk from tkinter import ttk, scrolledtext import logging import threading import collections import datetime import os import ctypes import time from typing import Dict, Callable, Optional, Any # Third-party imports for image display try: from PIL import Image, ImageTk import numpy as np import cv2 _IMAGE_LIBS_AVAILABLE = True except ImportError: _IMAGE_LIBS_AVAILABLE = False # Imports from the project structure from target_simulator.core.sfp_transport import SfpTransport, PayloadHandler from target_simulator.core.sfp_structures import ImageLeaderData, SFPHeader # --- Helper Class for Routing and Buffering Payloads --- class DebugPayloadRouter: """ A router that buffers the last received payload for each flow, allowing the GUI to sample the data at a lower frequency. This class is thread-safe. """ def __init__(self): self._log_prefix = "[DebugPayloadRouter]" self._lock = threading.Lock() # Buffer to store the last received payload for each flow type self._latest_payloads: Dict[str, bytearray] = {} # Buffer to store the last raw packet received (bytes, addr) self._last_raw_packet: Optional[tuple] = None # History of raw packets (timestamp, addr, raw bytes) self._history_size = 20 self._history = collections.deque(maxlen=self._history_size) self._persist = False # default persist dir: repository Temp/ folder project_root = os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "..") ) self._persist_dir = os.path.join(project_root, "Temp") try: os.makedirs(self._persist_dir, exist_ok=True) except Exception: pass logging.info(f"{self._log_prefix} Initialized.") def get_handlers(self) -> Dict[int, PayloadHandler]: """Returns handlers that update the internal last-payload buffer.""" return { ord("M"): lambda payload: self._update_last_payload("MFD", payload), ord("S"): lambda payload: self._update_last_payload("SAR", payload), ord("B"): lambda payload: self._update_last_payload("BIN", payload), ord("J"): lambda payload: self._update_last_payload("JSON", payload), } def _update_last_payload(self, flow_id: str, payload: bytearray): """Thread-safely stores the latest payload for a given flow.""" with self._lock: self._latest_payloads[flow_id] = payload def get_and_clear_latest_payloads(self) -> Dict[str, bytearray]: """ Thread-safely retrieves all new payloads received since the last call and clears the internal buffer. Returns: Dict[str, bytearray]: A dictionary of the latest payload for each flow. """ with self._lock: # Atomically swap the buffer with an empty one new_payloads = self._latest_payloads self._latest_payloads = {} return new_payloads def update_raw_packet(self, raw_bytes: bytes, addr: tuple): """Store the last raw packet received (overwritten by subsequent packets).""" with self._lock: # Keep last packet for immediate display self._last_raw_packet = (raw_bytes, addr) # Append to history with timestamp and small metadata entry = { "ts": datetime.datetime.utcnow(), "addr": addr, "raw": raw_bytes, } # Try to parse SFP header to capture flow/TID for list display try: hdr = SFPHeader.from_buffer_copy(raw_bytes) entry["flow"] = int(hdr.SFP_FLOW) entry["tid"] = int(hdr.SFP_TID) # map common flows to names when possible flow_map = { ord("M"): "MFD", ord("S"): "SAR", ord("B"): "BIN", ord("J"): "JSON", } entry["flow_name"] = flow_map.get( entry["flow"], ( chr(entry["flow"]) if 32 <= entry["flow"] < 127 else str(entry["flow"]) ), ) except Exception: # best-effort: leave flow/tid absent pass self._history.append(entry) # Optionally persist to disk (each entry as binary) if self._persist: try: ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f") fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin" path = os.path.join(self._persist_dir, fname) with open(path, "wb") as f: f.write(raw_bytes) except Exception: # don't propagate persistence errors to caller pass def get_and_clear_raw_packet(self) -> Optional[tuple]: with self._lock: pkt = self._last_raw_packet self._last_raw_packet = None return pkt def get_history(self): with self._lock: return list(self._history) def clear_history(self): with self._lock: self._history.clear() def set_history_size(self, n: int): with self._lock: try: n = max(1, int(n)) except Exception: return self._history_size = n new_deque = collections.deque(self._history, maxlen=self._history_size) self._history = new_deque def set_persist(self, enabled: bool): with self._lock: self._persist = bool(enabled) # --- Main Debug Window Class --- class SfpDebugWindow(tk.Toplevel): """A self-contained SFP debugging and packet inspection window.""" GUI_POLL_INTERVAL_MS = 250 # Poll for new data 4 times per second def __init__(self, master): super().__init__(master) self.title("SFP Packet Inspector") self.geometry("900x700") self.transient(master) self.logger = logging.getLogger(__name__) self.sfp_transport: Optional[SfpTransport] = None self.payload_router = DebugPayloadRouter() # Try to apply saved debug settings (history size, persist) from ConfigManager try: gm = getattr(master, "config_manager", None) general = gm.get_general_settings() if gm else {} sfp_debug_conf = general.get("sfp_debug", {}) hist_size = int(sfp_debug_conf.get("history_size", 20)) persist_raw = bool(sfp_debug_conf.get("persist_raw", False)) # apply to router try: self.payload_router.set_history_size(hist_size) self.payload_router.set_persist(persist_raw) except Exception: pass except Exception: pass self.mfd_photo: Optional[ImageTk.PhotoImage] = None self.sar_photo: Optional[ImageTk.PhotoImage] = None # Read image display size from settings (general.image_display.size) try: gm = getattr(master, "config_manager", None) general = gm.get_general_settings() if gm else {} img_conf = general.get("image_display", {}) self.image_area_size = int(img_conf.get("size", 150)) except Exception: self.image_area_size = 150 self._create_widgets() self.protocol("WM_DELETE_WINDOW", self._on_close) self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads) def _create_widgets(self): # --- Connection Controls (unchanged) --- conn_frame = ttk.LabelFrame(self, text="Connection", padding=5) conn_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5) ttk.Label(conn_frame, text="IP:").pack(side=tk.LEFT, padx=(5, 2)) self.ip_var = tk.StringVar(value="127.0.0.1") ttk.Entry(conn_frame, textvariable=self.ip_var, width=15).pack(side=tk.LEFT) ttk.Label(conn_frame, text="Port:").pack(side=tk.LEFT, padx=(10, 2)) self.port_var = tk.StringVar(value="55556") ttk.Entry(conn_frame, textvariable=self.port_var, width=7).pack(side=tk.LEFT) self.connect_btn = ttk.Button( conn_frame, text="Connect", command=self._on_connect ) self.connect_btn.pack(side=tk.LEFT, padx=(10, 5)) self.disconnect_btn = ttk.Button( conn_frame, text="Disconnect", command=self._on_disconnect, state=tk.DISABLED, ) self.disconnect_btn.pack(side=tk.LEFT, padx=5) # Button to configure image display size self.image_size_btn = ttk.Button( conn_frame, text="Image size...", command=self._open_image_size_dialog ) self.image_size_btn.pack(side=tk.LEFT, padx=5) # --- Data Display Notebook (unchanged) --- self.notebook = ttk.Notebook(self) self.notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) self.log_tab = scrolledtext.ScrolledText( self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9) ) self.notebook.add(self.log_tab, text="Raw Log") if _IMAGE_LIBS_AVAILABLE: self.mfd_tab = self._create_image_tab("MFD Image") self.notebook.add(self.mfd_tab["frame"], text="MFD Image") self.sar_tab = self._create_image_tab("SAR Image") self.notebook.add(self.sar_tab["frame"], text="SAR Image") # Raw SFP packet view with history on the left and details on the right raw_frame = ttk.Frame(self.notebook) # Left: history listbox history_frame = ttk.Frame(raw_frame, width=380) history_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(5, 2), pady=5) ttk.Label(history_frame, text="History (latest)").pack(anchor=tk.W, padx=4) # smaller font so more fits on one line try: history_font = ("Consolas", 8) except Exception: history_font = None # container for Treeview + scrollbar so buttons can sit under it list_container = ttk.Frame(history_frame) list_container.pack(fill=tk.BOTH, expand=True, padx=4, pady=(2, 4)) # Create a Treeview with columns: Timestamp | Flow | TID | Size columns = ("ts", "flow", "tid", "size") self.history_tree = ttk.Treeview( list_container, columns=columns, show="headings", height=20 ) self.history_tree.heading("ts", text="Timestamp") self.history_tree.heading("flow", text="Flow") self.history_tree.heading("tid", text="TID") self.history_tree.heading("size", text="Size") # set column widths (ts wider) self.history_tree.column("ts", width=100, anchor="w") self.history_tree.column("flow", width=50, anchor="w") self.history_tree.column("tid", width=40, anchor="center") self.history_tree.column("size", width=50, anchor="e") # smaller font for tree rows try: style = ttk.Style() style.configure("Small.Treeview", font=history_font) self.history_tree.configure(style="Small.Treeview") except Exception: pass self.history_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) # scrollbar self.history_vscroll = ttk.Scrollbar( list_container, orient=tk.VERTICAL, command=self.history_tree.yview ) self.history_vscroll.pack(side=tk.RIGHT, fill=tk.Y) self.history_tree.config(yscrollcommand=self.history_vscroll.set) hb_frame = ttk.Frame(history_frame) hb_frame.pack(fill=tk.X, padx=4, pady=(4, 4)) # Settings and clear buttons self.history_settings_btn = ttk.Button( hb_frame, text="Settings", command=lambda: self._open_history_settings_dialog(), ) self.history_settings_btn.pack(side=tk.LEFT) self.history_clear_btn = ttk.Button( hb_frame, text="Clear", command=lambda: self._on_clear_history() ) self.history_clear_btn.pack(side=tk.RIGHT) # Right: details view (previously raw_tab) self.raw_tab_text = scrolledtext.ScrolledText( raw_frame, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9) ) self.raw_tab_text.pack( side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(2, 5), pady=5 ) try: # insert as second tab (index 1) self.notebook.insert(1, raw_frame, text="SFP Raw") except Exception: # fallback self.notebook.add(raw_frame, text="SFP Raw") # Configure visual tags for flags (set/unset) on raw_tab_text try: self.raw_tab_text.tag_config( "flag_set", background="#d4ffd4", foreground="#006400" ) self.raw_tab_text.tag_config( "flag_unset", background="#f0f0f0", foreground="#808080" ) self.raw_tab_text.tag_config( "flag_error", background="#ffd4d4", foreground="#800000" ) self.raw_tab_text.tag_config("hdr_field", foreground="#000080") except Exception: pass self.bin_tab = scrolledtext.ScrolledText( self.notebook, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 10) ) self.notebook.add(self.bin_tab, text="Binary (Hex)") self.json_tab = scrolledtext.ScrolledText( self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 10) ) self.notebook.add(self.json_tab, text="JSON") def _create_image_tab(self, title: str) -> Dict: frame = ttk.Frame(self.notebook) # Fixed-size container to keep UI tidy. Image area will be size x size px. image_container = ttk.Frame( frame, width=self.image_area_size, height=self.image_area_size, relief=tk.SUNKEN, ) image_container.pack(pady=5, padx=5) image_container.pack_propagate(False) image_label = ttk.Label( image_container, text=f"Waiting for {title}...", anchor=tk.CENTER ) image_label.pack(fill=tk.BOTH, expand=True) hex_view = scrolledtext.ScrolledText( frame, height=8, state=tk.DISABLED, wrap=tk.NONE, font=("Consolas", 9) ) hex_view.pack(fill=tk.BOTH, expand=True, pady=5, padx=5) return { "frame": frame, "image_label": image_label, "hex_view": hex_view, "image_container": image_container, } def _open_image_size_dialog(self): """Open a small dialog to change the image display size and persist it to settings.""" dlg = tk.Toplevel(self) dlg.title("Image Size") dlg.transient(self) dlg.grab_set() ttk.Label(dlg, text="Image area size (px):").pack(padx=10, pady=(10, 2)) size_var = tk.StringVar(value=str(self.image_area_size)) entry = ttk.Entry(dlg, textvariable=size_var, width=8) entry.pack(padx=10, pady=(0, 10)) btn_frame = ttk.Frame(dlg) btn_frame.pack(padx=10, pady=(0, 10)) def on_save(): try: v = int(size_var.get()) if v <= 0: raise ValueError() except Exception: message = "Please enter a positive integer for image size." try: tk.messagebox.showerror("Invalid value", message, parent=dlg) except Exception: pass return # Apply to current window self.image_area_size = v # Update existing containers if present for tab in (getattr(self, "mfd_tab", None), getattr(self, "sar_tab", None)): if tab and "image_container" in tab: tab["image_container"].config(width=v, height=v) # Persist to settings via ConfigManager on master (if available) gm = getattr(self.master, "config_manager", None) if gm: general = gm.get_general_settings() or {} image_display = general.get("image_display", {}) image_display["size"] = v general["image_display"] = image_display gm.save_general_settings(general) dlg.destroy() def on_cancel(): dlg.destroy() ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack( side=tk.RIGHT, padx=(0, 5) ) ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT) def _on_connect(self): ip = self.ip_var.get() try: port = int(self.port_var.get()) except ValueError: self._log_to_widget("ERROR: Invalid port number.", "ERROR") return self._log_to_widget(f"Attempting to connect to {ip}:{port}...") ack_config = {ord("M"): 32, ord("S"): 16} self.sfp_transport = SfpTransport( host=ip, port=port, payload_handlers=self.payload_router.get_handlers(), ack_config=ack_config, ) if self.sfp_transport.start(): self._log_to_widget( "Connection successful. Listening for packets...", "INFO" ) self.connect_btn.config(state=tk.DISABLED) self.disconnect_btn.config(state=tk.NORMAL) else: self._log_to_widget("Connection failed. Check IP/Port and logs.", "ERROR") self.sfp_transport = None # Register raw packet callback regardless of start result (safe no-op if None) if self.sfp_transport: # Provide the router.update_raw_packet method as callback try: self.sfp_transport._raw_packet_callback = ( self.payload_router.update_raw_packet ) except Exception: self.logger.exception( "Failed to register raw_packet_callback on SfpTransport" ) # Bind history tree selection to show past packet try: self.history_tree.bind( "<>", lambda e: self._on_history_select() ) except Exception: pass def _on_disconnect(self): if self.sfp_transport: self._log_to_widget("Disconnecting...", "INFO") self.sfp_transport.shutdown() self.sfp_transport = None self.connect_btn.config(state=tk.NORMAL) self.disconnect_btn.config(state=tk.DISABLED) self._log_to_widget("Disconnected.", "INFO") def _on_close(self): self.logger.info("SFP Debug Window closing.") self._on_disconnect() self.destroy() def _process_latest_payloads(self): """GUI-thread loop to sample and display the latest payloads.""" # Get all new payloads that have arrived since the last check new_payloads = self.payload_router.get_and_clear_latest_payloads() # If there are new payloads, process them if new_payloads: self._log_to_widget( f"Processing {len(new_payloads)} new payload(s) for flows: {list(new_payloads.keys())}" ) for flow_id, payload in new_payloads.items(): if flow_id == "MFD" and _IMAGE_LIBS_AVAILABLE: self._display_image_data(payload, self.mfd_tab, "mfd_photo") # self.notebook.select(self.mfd_tab["frame"]) elif flow_id == "SAR" and _IMAGE_LIBS_AVAILABLE: self._display_image_data(payload, self.sar_tab, "sar_photo") # self.notebook.select(self.sar_tab["frame"]) elif flow_id == "BIN": self._display_hex_data(payload, self.bin_tab) # self.notebook.select(self.bin_tab) elif flow_id == "JSON": self._display_json_data(payload, self.json_tab) # self.notebook.select(self.json_tab) # Reschedule the next check self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads) # Also check and display last raw packet raw_pkt = self.payload_router.get_and_clear_raw_packet() if raw_pkt: raw_bytes, addr = raw_pkt self._display_raw_packet(raw_bytes, addr) # Refresh history tree to show new entry try: self._refresh_history_tree() except Exception: pass def _refresh_history_tree(self): try: hist = self.payload_router.get_history() # clear current for iid in self.history_tree.get_children(): self.history_tree.delete(iid) # insert reversed (latest first) for i, entry in enumerate(reversed(hist)): ts = entry["ts"].strftime("%H:%M:%S.%f")[:-3] flow_name = entry.get("flow_name", "") tid = entry.get("tid", "") size = len(entry.get("raw", b"")) self.history_tree.insert( "", tk.END, values=(ts, flow_name, tid, f"{size}B") ) except Exception: pass def _on_history_select(self): try: sel = self.history_tree.selection() if not sel: return iid = sel[0] # find index of item among children (0-based latest-first) children = list(self.history_tree.get_children()) try: idx = children.index(iid) except ValueError: idx = None hist = list(reversed(self.payload_router.get_history())) if idx is None or idx < 0 or idx >= len(hist): return entry = hist[idx] self._display_raw_packet(entry["raw"], entry["addr"]) except Exception: pass def _on_clear_history(self): try: self.payload_router.clear_history() self._refresh_history_listbox() except Exception: pass def _open_history_settings_dialog(self): dlg = tk.Toplevel(self) dlg.title("History Settings") dlg.transient(self) dlg.grab_set() # Current values try: hist_size = self.payload_router._history_size persist = self.payload_router._persist except Exception: hist_size = 20 persist = False ttk.Label(dlg, text="History size (entries):").pack(padx=10, pady=(10, 2)) size_var = tk.StringVar(value=str(hist_size)) entry = ttk.Entry(dlg, textvariable=size_var, width=8) entry.pack(padx=10, pady=(0, 10)) persist_var = tk.BooleanVar(value=bool(persist)) ttk.Checkbutton( dlg, text="Persist raw packets to Temp/", variable=persist_var ).pack(padx=10, pady=(0, 10)) btn_frame = ttk.Frame(dlg) btn_frame.pack(padx=10, pady=(0, 10), fill=tk.X) def on_save(): try: v = int(size_var.get()) if v <= 0: raise ValueError() except Exception: try: tk.messagebox.showerror( "Invalid value", "Please enter a positive integer for history size.", parent=dlg, ) except Exception: pass return # Apply try: self.payload_router.set_history_size(v) self.payload_router.set_persist(bool(persist_var.get())) except Exception: pass # Persist into settings.json via ConfigManager (master.config_manager) try: gm = getattr(self.master, "config_manager", None) if gm: general = gm.get_general_settings() or {} sfp_debug = general.get("sfp_debug", {}) sfp_debug["history_size"] = v sfp_debug["persist_raw"] = bool(persist_var.get()) general["sfp_debug"] = sfp_debug gm.save_general_settings(general) except Exception: pass dlg.destroy() def on_cancel(): dlg.destroy() ttk.Button(btn_frame, text="Cancel", command=on_cancel).pack( side=tk.RIGHT, padx=(0, 5) ) ttk.Button(btn_frame, text="Save", command=on_save).pack(side=tk.RIGHT) def _display_raw_packet(self, raw_bytes: bytes, addr: tuple): """Show the raw SFP packet bytes and the parsed header (if possible).""" try: header_size = SFPHeader.size() if len(raw_bytes) < header_size: raise ValueError("Packet smaller than SFP header") header = SFPHeader.from_buffer_copy(raw_bytes) body = raw_bytes[header_size:] # Build a compact two-column header table to save horizontal space field_list = [ "SFP_MARKER", "SFP_DIRECTION", "SFP_PROT_VER", "SFP_PT_SPARE", "SFP_TAG", "SFP_SRC", "SFP_FLOW", "SFP_TID", "SFP_FLAGS", "SFP_WIN", "SFP_ERR", "SFP_ERR_INFO", "SFP_TOTFRGAS", "SFP_FRAG", "SFP_RECTYPE", "SFP_RECSPARE", "SFP_PLDAP", "SFP_PLEXT", "SFP_RECCOUNTER", "SFP_PLSIZE", "SFP_TOTSIZE", "SFP_PLOFFSET", ] # Collect (label, value) pairs, handle FLAGS specially pairs = [] flag_val = None for f in field_list: try: val = getattr(header, f) except Exception: val = "" if f == "SFP_FLAGS": flag_val = val # still include placeholder for alignment; actual flags printed later pairs.append( (f, f"{val} (0x{val:X})" if isinstance(val, int) else str(val)) ) continue if isinstance(val, int): pairs.append((f, f"{val} (0x{val:X})")) else: pairs.append((f, str(val))) # Render two columns: pair up items two-per-line self.raw_tab_text.config(state=tk.NORMAL) self.raw_tab_text.delete("1.0", tk.END) self.raw_tab_text.insert( tk.END, f"From {addr}\n\nSFP Header:\n\n", "hdr_field" ) col_width = 36 # width for each column block for i in range(0, len(pairs), 2): left = pairs[i] right = pairs[i + 1] if (i + 1) < len(pairs) else None left_text = f"{left[0]:12s}: {left[1]}" if right: right_text = f"{right[0]:12s}: {right[1]}" # Pad left_text to column width then append right_text line = f"{left_text:<{col_width}} {right_text}" else: line = left_text self.raw_tab_text.insert(tk.END, line + "\n", "hdr_field") # FLAG decoding based on provided enum frag_flags_t # bit0 = frag_flag_acq_required # bit1 = frag_flag_resent / please_resend # bit2 = frag_flag_please_trailer_ack # bit7 = frag_flag_error flag_defs = [ (0, "ACQ_REQ"), (1, "RESENT"), (2, "TRAILER_ACK"), (3, "RESV3"), (4, "RESV4"), (5, "RESV5"), (6, "RESV6"), (7, "ERROR"), ] flag_line_start = self.raw_tab_text.index(tk.END) self.raw_tab_text.insert( tk.END, f"SFP_FLAGS : {flag_val} (0x{flag_val:X}) " ) # Append colored flag labels; use 'flag_error' tag for ERROR for bit, name in flag_defs: is_set = False try: is_set = bool((flag_val >> bit) & 1) except Exception: is_set = False if name == "ERROR" and is_set: tag = "flag_error" else: tag = "flag_set" if is_set else "flag_unset" self.raw_tab_text.insert(tk.END, f" [{name}]", tag) # Fixed legend text for flags (always visible) self.raw_tab_text.insert(tk.END, "\n\nFlags legend:\n", "hdr_field") legend_map = { "ACQ_REQ": "Acquisition required/requested", "RESENT": "Fragment resent / please resend", "TRAILER_ACK": "Request trailer acknowledgement", "ERROR": "Packet-level error flag", "RESV3": "Reserved", "RESV4": "Reserved", "RESV5": "Reserved", "RESV6": "Reserved", } for _, name in flag_defs: desc = legend_map.get(name, "") self.raw_tab_text.insert(tk.END, f" {name:12s}: {desc}\n") self.raw_tab_text.insert(tk.END, "\nBODY (hex):\n") self.raw_tab_text.insert(tk.END, self._format_hex_dump(body)) self.raw_tab_text.config(state=tk.DISABLED) return except Exception as e: text = f"Failed to format raw packet: {e}\n\nRaw dump:\n" text += self._format_hex_dump(raw_bytes) self.raw_tab_text.config(state=tk.NORMAL) self.raw_tab_text.delete("1.0", tk.END) self.raw_tab_text.insert("1.0", text) self.raw_tab_text.config(state=tk.DISABLED) def _display_image_data( self, payload: bytearray, tab_widgets: Dict[str, Any], photo_attr: str ): """Parses an image payload and displays it. Now handles simplified structure.""" try: if len(payload) < ctypes.sizeof(ImageLeaderData): raise ValueError("Payload smaller than ImageLeaderData header.") leader = ImageLeaderData.from_buffer(payload) h, w, bpp = ( leader.HEADER_DATA.DY, leader.HEADER_DATA.DX, leader.HEADER_DATA.BPP, ) stride = leader.HEADER_DATA.STRIDE offset = ctypes.sizeof(ImageLeaderData) if not (h > 0 and w > 0 and bpp in [1, 2] and stride >= w): raise ValueError( f"Invalid image dimensions in header: {w}x{h}, bpp={bpp}, stride={stride}" ) if bpp == 1: dtype = np.uint8 else: dtype = np.uint16 expected_size = stride * h * bpp if (offset + expected_size) > len(payload): # Fallback for old format where PIXEL_TAG was at the end of leader offset_fallback = ( ctypes.sizeof(SFPHeader) + ctypes.sizeof(ImageLeaderData) - ctypes.sizeof(leader.PIXEL_TAG) ) if (offset_fallback + expected_size) <= len(payload): offset = offset_fallback else: raise ValueError( f"Incomplete image data. Expected {expected_size} bytes, got {len(payload) - offset}" ) pixel_data_view = np.ndarray( shape=(h, stride), dtype=dtype, buffer=payload, offset=offset ) # Crop to actual width if stride is larger image_data = pixel_data_view[:, :w] display_img_8bit = cv2.normalize( image_data, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U ) img_pil = Image.fromarray( cv2.cvtColor(display_img_8bit, cv2.COLOR_GRAY2RGB) ) # Resize image to fit the label area while preserving aspect ratio try: # Use the fixed-size container (150x150) for resizing target resized = self._resize_pil_to_label( img_pil, tab_widgets.get("image_container", tab_widgets["image_label"]), ) except Exception: # Fallback to original if anything goes wrong resized = img_pil photo = ImageTk.PhotoImage(image=resized) tab_widgets["image_label"].config(image=photo, text="") setattr(self, photo_attr, photo) except Exception as e: self.logger.error(f"Error parsing image payload: {e}") tab_widgets["image_label"].config( image=None, text=f"Error parsing image:\n{e}" ) setattr(self, photo_attr, None) self._display_hex_data(payload, tab_widgets["hex_view"]) def _resize_pil_to_label( self, img: "Image.Image", label_widget: ttk.Label ) -> "Image.Image": """Resize a PIL Image to fit within the current label widget size. If the label widget has not been mapped yet (width/height == 1), this will fallback to the image's original size. """ try: # Get current allocated size for label (in pixels) width = label_widget.winfo_width() height = label_widget.winfo_height() # If the widget isn't yet laid out, width/height may be 1 -> use geometry if width <= 1 or height <= 1: geom = self.geometry() # format: WxH+X+Y if "x" in geom: parts = geom.split("+", 1)[0].split("x") win_w, win_h = int(parts[0]), int(parts[1]) # Use a fraction of window size for image area width = max(1, int(win_w * 0.9)) height = max(1, int(win_h * 0.6)) if width <= 1 or height <= 1: return img img_w, img_h = img.size # Compute scale preserving aspect ratio scale = min(width / img_w, height / img_h) if scale >= 1.0: return img new_w = max(1, int(img_w * scale)) new_h = max(1, int(img_h * scale)) return img.resize((new_w, new_h), Image.LANCZOS) except Exception: return img def _display_hex_data(self, payload: bytearray, widget: scrolledtext.ScrolledText): hex_dump = self._format_hex_dump(payload) widget.config(state=tk.NORMAL) widget.delete("1.0", tk.END) widget.insert("1.0", hex_dump) widget.config(state=tk.DISABLED) def _display_json_data(self, payload: bytearray, widget: scrolledtext.ScrolledText): try: import json text = json.dumps(json.loads(payload.decode("utf-8")), indent=2) except Exception as e: text = f"--- FAILED TO PARSE JSON ---\n{e}\n\n--- RAW HEX DUMP ---\n" text += self._format_hex_dump(payload) widget.config(state=tk.NORMAL) widget.delete("1.0", tk.END) widget.insert("1.0", text) widget.config(state=tk.DISABLED) def _log_to_widget(self, message: str, level: str = "DEBUG"): self.logger.info(message) self.log_tab.config(state=tk.NORMAL) self.log_tab.insert(tk.END, f"[{level}] {message}\n") self.log_tab.config(state=tk.DISABLED) self.log_tab.see(tk.END) def _format_hex_dump(self, data: bytes, length=16) -> str: lines = [] for i in range(0, len(data), length): chunk = data[i : i + length] hex_part = " ".join(f"{b:02X}" for b in chunk) ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk) lines.append(f"{i:08X} {hex_part:<{length*3}} |{ascii_part}|") return "\n".join(lines)