470 lines
18 KiB
Python
470 lines
18 KiB
Python
"""Tkinter viewer window for VideoReceiverSFP with MFD parameter controls."""
|
|
import threading
|
|
import time
|
|
import logging
|
|
import tkinter as tk
|
|
from tkinter import ttk, colorchooser
|
|
from typing import Any, Optional, Callable
|
|
import os
|
|
|
|
try:
|
|
from PIL import Image, ImageTk, ImageOps
|
|
except Exception:
|
|
Image = ImageTk = ImageOps = None
|
|
|
|
# Import MFD state if available
|
|
try:
|
|
from VideoReceiverSFP.core.mfd_state import MfdState
|
|
except ImportError:
|
|
import sys
|
|
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
|
|
try:
|
|
from core.mfd_state import MfdState
|
|
except ImportError:
|
|
MfdState = None
|
|
|
|
|
|
class SfpViewerWithParams:
    """Tkinter viewer with MFD parameter controls matching ControlPanel layout.

    The widget shows the most recently queued frame on the left and, when
    ``MfdState`` could be imported, a panel of per-category intensity sliders
    and colour pickers on the right. Frames are handed over with
    :meth:`show_frame` (guarded by a lock) and picked up by a periodic
    ``after`` callback, so all widget updates happen inside :meth:`_poll`.
    """

    # Edge length (px) of the square canvas created before any frame arrives.
    _DEFAULT_SIZE = 484
    # Length (seconds) of the sliding window used for the FPS read-out.
    _FPS_WINDOW_S = 2.0
    # Rows of the parameter panel, in the order used by ControlPanel.
    _CATEGORIES = ("Occlusion", "Cat A", "Cat B", "Cat C", "Cat C1", "Cat C2", "Cat C3")
    # When True every displayed frame is also written to
    # <cwd>/dumps/VideoReceiverSFP_viewer_debug.png (historical behaviour).
    _SAVE_DEBUG_COPY = True

    def __init__(self,
                 window_title: str = "SFP Viewer with MFD Params",
                 on_mfd_param_changed: Optional[Callable] = None,
                 master: Optional[tk.Misc] = None):
        """Initialize viewer with MFD parameter controls.

        Args:
            window_title: Title used when the viewer creates its own window.
            on_mfd_param_changed: Optional callback(param_type, name, value)
                invoked when a parameter or save flag changes.
            master: Optional existing Tk container to embed the UI in. When
                omitted a standalone ``tk.Tk`` root is created and owned by
                this viewer.
        """
        # If a master widget is provided, embed UI inside it; otherwise create a standalone Tk.
        if master is None:
            self._root = tk.Tk()
            self._owns_root = True
            self._root.title(window_title)
            self._root.geometry("900x600")
        else:
            self._root = master
            self._owns_root = False

        # Callback for parameter changes
        self._on_mfd_param_changed = on_mfd_param_changed

        # MFD state manager (local instance)
        if MfdState:
            self._mfd_state = MfdState()
        else:
            self._mfd_state = None
            logging.warning("MfdState not available; params controls disabled")

        # Frame hand-over and FPS bookkeeping. Set up *before* the widgets are
        # built so every attribute exists even when the params panel is skipped.
        self._photo = None  # keeps the PhotoImage referenced while displayed
        self._pending_frame = None
        self._lock = threading.Lock()
        self._frame_times = []
        self._first_frame_received = False
        self._default_size = self._DEFAULT_SIZE

        # Widget registries; filled by _build_mfd_params_frame when available.
        self._intensity_sliders = {}
        self._color_buttons = {}
        self._color_indicators = {}
        self._raw_map_slider = None
        self._save_png_var = None
        self._save_bin_var = None

        # Build UI
        self._build_ui()

        # Schedule GUI polling
        self._poll_interval_ms = 100
        self._running = True
        self._root.after(self._poll_interval_ms, self._poll)

        logging.getLogger().info("SfpViewerWithParams: GUI initialized")
        # Raise window briefly only if we own the root
        if self._owns_root:
            self._raise_window_briefly()

    def _raise_window_briefly(self):
        """Bring the owned window to the front, then drop the topmost flag (best effort)."""
        def _unset_topmost():
            try:
                self._root.attributes("-topmost", False)
            except Exception:
                logging.debug("Could not clear -topmost on viewer window", exc_info=True)

        try:
            self._root.lift()
            self._root.attributes("-topmost", True)
            self._root.update()
            self._root.after(200, _unset_topmost)
        except Exception:
            # Purely cosmetic; never let it break construction.
            logging.debug("Could not raise viewer window", exc_info=True)

    def _build_ui(self):
        """Build main UI: image on left, MFD parameters on right."""
        # Main container
        main_frame = ttk.Frame(self._root)
        main_frame.pack(fill="both", expand=True, padx=5, pady=5)

        # Left: Image display - default-sized square initially
        image_frame = ttk.Frame(main_frame)
        image_frame.grid(row=0, column=0, sticky="nsew", padx=(0, 5))
        self._image_frame = image_frame

        # Use Canvas to ensure fixed size display
        size = self._default_size
        self._img_canvas = tk.Canvas(image_frame, width=size, height=size,
                                     bg="black", highlightthickness=0)
        self._img_canvas.pack(fill="both", expand=True)
        self._img_label = tk.Label(self._img_canvas, bg="black")
        # Keep the canvas item id: Canvas.coords() addresses items, not widgets.
        self._img_window_id = self._img_canvas.create_window(
            size // 2, size // 2, window=self._img_label)

        # Right: MFD Parameters
        if self._mfd_state:
            params_frame = self._build_mfd_params_frame(main_frame)
            params_frame.grid(row=0, column=1, sticky="nsew")

        # Bottom: Status bar
        status_frame = ttk.Frame(main_frame, relief="sunken")
        status_frame.grid(row=1, column=0, columnspan=2, sticky="ew", pady=(5, 0))

        self._fps_label = ttk.Label(status_frame, text="FPS: 0.00")
        self._fps_label.pack(side="left", padx=5)

        # Grid weights
        main_frame.rowconfigure(0, weight=1)
        main_frame.rowconfigure(1, weight=0)
        main_frame.columnconfigure(0, weight=3)
        main_frame.columnconfigure(1, weight=1)

    def _build_mfd_params_frame(self, parent) -> ttk.Frame:
        """Build MFD parameters control panel matching ControlPanel layout.

        Args:
            parent: Container the returned frame is created in.

        Returns:
            The populated (not yet gridded) parameters frame.
        """
        params_frame = ttk.Labelframe(parent, text="MFD Parameters", padding=10)

        # Store widgets for updates
        self._intensity_sliders = {}
        self._color_buttons = {}
        self._color_indicators = {}

        for row, cat_name in enumerate(self._CATEGORIES):
            # Label
            label = ttk.Label(params_frame, text=cat_name, width=10)
            label.grid(row=row, column=0, sticky="w", pady=2)

            # Intensity slider (default args bind cat_name per row)
            intensity_var = tk.IntVar(value=255)
            slider = ttk.Scale(
                params_frame,
                from_=0, to=255,
                orient="horizontal",
                variable=intensity_var,
                command=lambda val, name=cat_name: self._on_intensity_changed(name, val),
            )
            slider.grid(row=row, column=1, sticky="ew", padx=5, pady=2)
            self._intensity_sliders[cat_name] = (slider, intensity_var)

            # Color button
            color_btn = ttk.Button(
                params_frame,
                text="Color",
                width=6,
                command=lambda name=cat_name: self._on_color_button_clicked(name),
            )
            color_btn.grid(row=row, column=2, padx=2, pady=2)
            self._color_buttons[cat_name] = color_btn

            # Color indicator (shows current color)
            color_indicator = tk.Label(params_frame, width=3, bg="#FFFFFF", relief="sunken")
            color_indicator.grid(row=row, column=3, padx=2, pady=2)
            self._color_indicators[cat_name] = color_indicator

        row = len(self._CATEGORIES)

        # Raw Map intensity
        raw_label = ttk.Label(params_frame, text="Raw Map", width=10)
        raw_label.grid(row=row, column=0, sticky="w", pady=2)

        raw_var = tk.IntVar(value=255)
        raw_slider = ttk.Scale(
            params_frame,
            from_=0, to=255,
            orient="horizontal",
            variable=raw_var,
            command=self._on_raw_map_changed,
        )
        raw_slider.grid(row=row, column=1, columnspan=3, sticky="ew", padx=5, pady=2)
        self._raw_map_slider = (raw_slider, raw_var)

        # Configure column weights
        params_frame.columnconfigure(1, weight=1)

        # Initialize color indicators from state
        self._update_color_indicators()

        # Save flags: placed inside MFD Parameters frame for convenience.
        # Provide a small English legend and stack the two checkboxes aligned left.
        legend_text = (
            "Save options: Save received frames (.png) and raw payloads (.bin) "
            "to the 'dumps' folder. Rotation keeps the last N files."
        )
        legend = ttk.Label(params_frame, text=legend_text, wraplength=220)
        params_frame.rowconfigure(row + 1, weight=0)
        legend.grid(row=row + 1, column=0, columnspan=4, sticky='w', pady=(8, 2))

        self._save_png_var = tk.BooleanVar(value=False)
        self._save_bin_var = tk.BooleanVar(value=False)
        save_png_cb = ttk.Checkbutton(params_frame, text="Save .png",
                                      variable=self._save_png_var,
                                      command=self._on_save_png_toggled)
        save_bin_cb = ttk.Checkbutton(params_frame, text="Save .bin",
                                      variable=self._save_bin_var,
                                      command=self._on_save_bin_toggled)
        # stack vertically, left-aligned
        save_png_cb.grid(row=row + 2, column=0, columnspan=4, sticky='w', pady=(2, 2))
        save_bin_cb.grid(row=row + 3, column=0, columnspan=4, sticky='w', pady=(2, 6))

        return params_frame

    def _notify(self, param_type: str, name: Optional[str], value: Any) -> None:
        """Forward a change to the external callback, if one was supplied."""
        if self._on_mfd_param_changed:
            self._on_mfd_param_changed(param_type, name, value)

    def _update_color_indicators(self):
        """Update color indicator labels to reflect current MFD state colors."""
        if not self._mfd_state:
            return

        for cat_name, indicator in self._color_indicators.items():
            try:
                cat_data = self._mfd_state.mfd_params["categories"].get(cat_name)
                if not cat_data:
                    continue
                blue, green, red = (int(c) for c in cat_data["color"][:3])
                # State stores BGR; Tkinter wants #rrggbb.
                indicator.config(bg=f"#{red:02x}{green:02x}{blue:02x}")
            except Exception:
                logging.exception(f"Error updating color indicator for {cat_name}")

    def _on_intensity_changed(self, category_name: str, value):
        """Handle intensity slider change.

        Args:
            category_name: Category whose slider moved.
            value: New slider position; Tk passes it as a float-like string.
        """
        try:
            int_val = int(float(value))
            logging.info("MFD intensity changed: %s = %d", category_name, int_val)

            if self._mfd_state:
                self._mfd_state.update_category_intensity(category_name, int_val)

            # Notify external callback
            self._notify("intensity", category_name, int_val)
        except Exception:
            logging.exception(f"Error updating intensity for {category_name}")

    def _on_color_button_clicked(self, category_name: str):
        """Handle color button click - open color chooser."""
        try:
            # Get current color from state
            current_bgr = (255, 255, 255)
            if self._mfd_state:
                cat_data = self._mfd_state.mfd_params["categories"].get(category_name)
                if cat_data:
                    current_bgr = cat_data["color"]

            # Convert BGR to RGB for colorchooser (plain ints, as a tuple)
            current_rgb = (int(current_bgr[2]), int(current_bgr[1]), int(current_bgr[0]))

            # Open color picker; result is ((r, g, b), "#rrggbb") or (None, None)
            color = colorchooser.askcolor(
                title=f"Choose color for {category_name}",
                initialcolor=current_rgb,
            )
            rgb = color[0] if color else None
            if not rgb:
                return  # dialog cancelled

            # Convert RGB to BGR
            new_bgr = (int(rgb[2]), int(rgb[1]), int(rgb[0]))
            logging.info(f"MFD color changed: {category_name} = BGR{new_bgr}")

            if self._mfd_state:
                self._mfd_state.update_category_color(category_name, new_bgr)
                self._update_color_indicators()

            # Notify external callback
            self._notify("color", category_name, new_bgr)
        except Exception:
            logging.exception(f"Error updating color for {category_name}")

    def _on_raw_map_changed(self, value):
        """Handle raw map intensity slider change."""
        try:
            int_val = int(float(value))
            logging.info("MFD raw map intensity changed: %d", int_val)

            if self._mfd_state:
                self._mfd_state.update_raw_map_intensity(int_val)

            # Notify external callback
            self._notify("raw_map", None, int_val)
        except Exception:
            logging.exception("Error updating raw map intensity")

    def get_mfd_lut(self):
        """Return current MFD LUT from state manager, or None without MfdState."""
        if self._mfd_state:
            return self._mfd_state.get_lut()
        return None

    def _on_save_png_toggled(self):
        """Report the new state of the "Save .png" checkbox to the callback."""
        val = bool(self._save_png_var.get())
        logging.info("UI: Save PNG toggled: %s", val)
        self._notify("save_png", None, val)

    def _on_save_bin_toggled(self):
        """Report the new state of the "Save .bin" checkbox to the callback."""
        val = bool(self._save_bin_var.get())
        logging.info("UI: Save BIN toggled: %s", val)
        self._notify("save_bin", None, val)

    def show_frame(self, frame: Any) -> None:
        """Queue a frame for display (thread-safe).

        Only the newest frame is kept; it is rendered by the next poll tick.

        Args:
            frame: Encoded image bytes or an object exposing ``copy()``
                (presumably a PIL image - verify against callers).
        """
        try:
            # Only build the diagnostic string when it will actually be emitted.
            if logging.getLogger().isEnabledFor(logging.DEBUG):
                frame_info = f"type={type(frame).__name__}"
                if hasattr(frame, 'size'):
                    frame_info += f" size={frame.size}"
                elif hasattr(frame, '__len__'):
                    frame_info += f" len={len(frame)}"
                logging.debug(f"show_frame received: {frame_info}")

            # Monotonic clock: FPS intervals must not jump with wall-clock changes.
            now = time.monotonic()
            cutoff = now - self._FPS_WINDOW_S
            with self._lock:
                self._pending_frame = frame
                stamps = self._frame_times
                stamps.append(now)
                # Trim old timestamps
                while stamps and stamps[0] < cutoff:
                    stamps.pop(0)
        except Exception:
            logging.exception("Error in show_frame")

    def _to_pil_image(self, frame: Any):
        """Turn a queued frame into an image object, or None if unsupported."""
        # Byte-like input must be tested first: bytearray also has .copy().
        if isinstance(frame, (bytes, bytearray, memoryview)):
            import io
            logging.debug(f"_build_tk_image: converting bytes/bytearray (len={len(frame)}) to PIL Image")
            return Image.open(io.BytesIO(frame)).convert("RGB")
        if hasattr(frame, "copy"):
            return frame.copy()
        logging.warning(f"_build_tk_image: unknown frame type {type(frame).__name__}")
        return None

    def _fit_canvas_to_first_frame(self, img) -> None:
        """Resize and re-centre the canvas if the first frame is not default-sized."""
        if img.width == self._default_size and img.height == self._default_size:
            logging.info(f"MFD viewer: first frame is {img.width}x{img.height}, keeping default size")
            return
        try:
            # Resize canvas to accommodate different image size
            self._img_canvas.configure(width=img.width, height=img.height)
            self._img_canvas.coords(self._img_window_id, img.width // 2, img.height // 2)
            logging.info(f"MFD viewer: resized to {img.width}x{img.height} (first frame differs from default {self._default_size}x{self._default_size})")
        except Exception:
            logging.exception("MFD viewer: could not resize canvas for first frame")

    @staticmethod
    def _pixel_range(img):
        """Return (min, max) pixel value; RGB is measured on its luminance."""
        probe = img.convert('L') if img.mode == 'RGB' else img
        extrema = probe.getextrema()
        # Multi-band images yield one (min, max) pair per band.
        if extrema and isinstance(extrema[0], tuple):
            return (int(min(lo for lo, _ in extrema)),
                    int(max(hi for _, hi in extrema)))
        low, high = extrema
        return int(low), int(high)

    def _autocontrast_if_needed(self, img):
        """Return ``img`` stretched with autocontrast when it is dark or flat."""
        try:
            vmin, vmax = self._pixel_range(img)
            logging.debug(f"Frame stats: min={vmin} max={vmax}")

            # Apply autocontrast for low-contrast frames
            if ImageOps and ((vmax - vmin) < 16 or vmax < 64):
                logging.info("Low contrast detected; applying autocontrast")
                return ImageOps.autocontrast(img)
        except Exception:
            logging.exception("Error processing frame stats")
        return img

    def _save_debug_copy(self, img) -> None:
        """Write the frame about to be shown into the 'dumps' folder (best effort)."""
        try:
            dumps_dir = os.path.join(os.getcwd(), 'dumps')
            os.makedirs(dumps_dir, exist_ok=True)
            img.save(os.path.join(dumps_dir, 'VideoReceiverSFP_viewer_debug.png'))
        except Exception:
            logging.debug("Could not save viewer debug image", exc_info=True)

    def _build_tk_image(self, frame: Any):
        """Convert frame to PhotoImage for display.

        Returns:
            An ``ImageTk.PhotoImage``, or None when Pillow is unavailable, the
            frame type is unsupported, or conversion fails.
        """
        try:
            if Image is None or ImageTk is None:
                return None

            logging.debug("_build_tk_image processing: type=%s", type(frame).__name__)

            img = self._to_pil_image(frame)
            if img is None:
                return None

            # Check first frame size and resize widget if needed
            if not self._first_frame_received:
                self._first_frame_received = True
                self._fit_canvas_to_first_frame(img)

            img = self._autocontrast_if_needed(img)

            if self._SAVE_DEBUG_COPY:
                self._save_debug_copy(img)

            return ImageTk.PhotoImage(img)
        except Exception:
            logging.exception("Error building tk image")
            return None

    @staticmethod
    def _compute_fps(stamps) -> float:
        """Average frame rate over ascending timestamps; 0.0 if undetermined."""
        if len(stamps) < 2:
            return 0.0
        elapsed = stamps[-1] - stamps[0]
        return (len(stamps) - 1) / elapsed if elapsed > 0 else 0.0

    def _poll(self):
        """Periodic GUI update: process pending frame and update FPS."""
        try:
            now = time.monotonic()
            # Take the frame and a timestamp snapshot atomically with respect
            # to show_frame(), which may run on another thread.
            with self._lock:
                frame_to_show, self._pending_frame = self._pending_frame, None
                recent = [t for t in self._frame_times if now - t < self._FPS_WINDOW_S]

            if frame_to_show is not None:
                photo = self._build_tk_image(frame_to_show)
                if photo is not None:
                    self._photo = photo
                    self._img_label.config(image=self._photo)

            # Update FPS (falls back to 0.00 once the stream goes quiet)
            self._fps_label.config(text=f"FPS: {self._compute_fps(recent):.2f}")
        except Exception:
            logging.exception("Error in viewer poll")
        finally:
            # Re-arm only while running so a stopped viewer leaves the
            # (possibly host-owned) event loop alone.
            if self._running:
                try:
                    self._root.after(self._poll_interval_ms, self._poll)
                except Exception:
                    logging.debug("Viewer poll could not be rescheduled", exc_info=True)

    def run(self):
        """Start the viewer main loop.

        With an owned root this runs ``mainloop()`` until the window closes.
        In embedded mode it simply blocks the calling thread until
        :meth:`stop` is called.
        """
        self._running = True
        try:
            if self._owns_root:
                try:
                    self._root.mainloop()
                except KeyboardInterrupt:
                    pass
            else:
                # Embedded mode: keep running until stop() is called.
                # NOTE(review): this sleeps on the calling thread, so it is
                # presumably meant for a worker thread - confirm with callers.
                while self._running:
                    time.sleep(0.1)
        finally:
            self._running = False

    def stop(self):
        """Stop the viewer; an owned window is also closed and destroyed."""
        self._running = False
        if not self._owns_root:
            return
        try:
            self._root.quit()
            self._root.destroy()
        except Exception:
            # The window may already be gone (e.g. closed by the user).
            logging.debug("Error while tearing down viewer window", exc_info=True)
|