# File viewer metadata (extraction residue): 414 lines, 16 KiB, Python
"""Tkinter viewer window for VideoReceiverSFP with MFD parameter controls."""
|
|
import io
import logging
import os
import threading
import time
import tkinter as tk
from tkinter import ttk, colorchooser
from tkinter.scrolledtext import ScrolledText
from typing import Any, Optional, Callable

try:
    # Use the integration wrapper from the ControlPanel package if available
    from controlpanel.gui import logger_integration
except Exception:
    logger_integration = None

try:
    from PIL import Image, ImageTk, ImageOps
except Exception:
    Image = ImageTk = ImageOps = None

# Import MFD state if available
try:
    from VideoReceiverSFP.core.mfd_state import MfdState
except ImportError:
    import sys

    sys.path.append(os.path.dirname(os.path.dirname(__file__)))
    try:
        from core.mfd_state import MfdState
    except ImportError:
        MfdState = None
|
|
|
|
|
|
class SfpViewerWithParams:
    """Tkinter viewer with MFD parameter controls matching ControlPanel layout.

    The viewer shows the most recent frame handed to :meth:`show_frame` on a
    black canvas. ``show_frame`` only stores the frame under a lock; the
    actual conversion to a ``PhotoImage`` happens in :meth:`_poll`, which is
    scheduled on the Tk event loop with ``after``. A right-click context menu
    opens a separate window with the MFD parameter controls.
    """

    # Category rows shown in the MFD parameter panel, in display order.
    _CATEGORIES = ("Occlusion", "Cat A", "Cat B", "Cat C", "Cat C1", "Cat C2", "Cat C3")
    # Lower bound enforced by set_mfd_display_size().
    _MIN_DISPLAY_SIZE = 16
    # Frame arrival timestamps older than this many seconds are discarded.
    _FRAME_TIME_WINDOW_S = 2.0

    def __init__(self,
                 window_title: str = "SFP Viewer with MFD Params",
                 on_mfd_param_changed: Optional[Callable] = None,
                 master: Optional[tk.Widget] = None,
                 logs_master: Optional[tk.Widget] = None):
        """Initialize viewer with MFD parameter controls.

        Args:
            window_title: Title used when the viewer creates its own root.
            on_mfd_param_changed: Optional callback invoked as
                ``callback(kind, category_name_or_None, value)`` whenever a
                control changes.
            master: Existing Tk container to build into. When ``None`` a new
                ``tk.Tk`` root is created and owned by this viewer.
            logs_master: Optional container for the log widget. When ``None``
                a "Logs" frame is packed into the root.
        """
        if master is None:
            self._root = tk.Tk()
            self._owns_root = True
            self._root.title(window_title)
            self._root.geometry("900x600")
        else:
            self._root = master
            self._owns_root = False

        self._on_mfd_param_changed = on_mfd_param_changed
        self._logs_master = logs_master

        if MfdState:
            self._mfd_state = MfdState()
        else:
            self._mfd_state = None
            logging.warning("MfdState not available; params controls disabled")

        # Runtime state is created before the widgets so that no callback can
        # ever observe a half-initialised viewer.
        self._photo = None
        self._pending_frame = None
        self._lock = threading.Lock()
        self._frame_times = []
        self._first_frame_received = False
        self._canvas_size = None  # last (width, height) requested for the canvas
        self._default_size = 484
        # Display size for MFD preview (pixels per side). Raw MFD frames are often 484x484.
        self._mfd_display_size = 484
        self._context_menu = None
        self._controls_window = None
        self._intensity_sliders = {}
        self._color_buttons = {}
        self._color_indicators = {}

        self._build_ui()

        self._poll_interval_ms = 100
        self._running = True
        self._root.after(self._poll_interval_ms, self._poll)

        logging.getLogger().info("SfpViewerWithParams: GUI initialized")
        if self._owns_root:
            self._bring_to_front()

    def _bring_to_front(self):
        """Raise the owned root window and make it topmost for 200 ms (best effort)."""
        try:
            self._root.lift()
            self._root.attributes("-topmost", True)
            self._root.update()
            self._root.after(200, self._unset_topmost)
        except Exception:
            logging.debug("SfpViewerWithParams: could not raise window", exc_info=True)

    def _unset_topmost(self):
        """Clear the temporary ``-topmost`` flag set by :meth:`_bring_to_front`."""
        try:
            self._root.attributes("-topmost", False)
        except Exception:
            logging.debug("SfpViewerWithParams: could not clear topmost flag", exc_info=True)

    def _build_ui(self):
        """Build main UI: image canvas only with context menu for parameters."""
        # Main container - padding=0 for absolute edge-to-edge
        main_frame = ttk.Frame(self._root, padding=0)
        main_frame.pack(fill="both", expand=True)

        # Canvas: highlightthickness=0 and bd=0 remove the gray focus ring/border
        self._img_canvas = tk.Canvas(main_frame, bg="black", highlightthickness=0, bd=0)
        self._img_canvas.pack(fill="both", expand=True, padx=0, pady=0)

        # We use a Label inside the canvas window to display the image,
        # placed at the top-left of the canvas.
        self._img_label = tk.Label(self._img_canvas, bg="black", bd=0, padx=0, pady=0)
        self._canvas_win_id = self._img_canvas.create_window(
            0, 0, window=self._img_label, anchor="nw"
        )

        self._build_context_menu()
        self._build_log_panel()

    def _build_context_menu(self):
        """Create the right-click menu and bind it to the canvas and image label."""
        try:
            self._context_menu = tk.Menu(self._root, tearoff=0)
            self._context_menu.add_command(
                label="Open Controls", command=self._open_controls_window
            )
        except Exception:
            logging.debug("SfpViewerWithParams: could not create context menu", exc_info=True)

        # The label covers the canvas once an image is shown, so both need the binding.
        for widget in (self._img_label, self._img_canvas):
            try:
                widget.bind("<Button-3>", self._on_canvas_right_click)
            except Exception:
                logging.debug("SfpViewerWithParams: could not bind right-click", exc_info=True)

    def _build_log_panel(self):
        """Create the log text widget and attach it to the logger integration."""
        try:
            if self._logs_master is not None:
                log_parent = self._logs_master
            else:
                logs_container = ttk.Frame(self._root)
                logs_container.pack(fill="both", expand=False, padx=5, pady=(5, 5))
                log_parent = ttk.Labelframe(logs_container, text="Logs", padding=5)
                log_parent.pack(fill="both", expand=True)

            self._log_widget = ScrolledText(
                log_parent, height=6, state=tk.DISABLED, wrap=tk.WORD
            )
            self._log_widget.pack(fill=tk.BOTH, expand=True)

            if logger_integration is not None:
                try:
                    logger_integration.init_logger(
                        self._root, enable_console=True, root_level=logging.INFO
                    )
                    logger_integration.add_widget(
                        self._log_widget, level_colors=None, max_lines=2000
                    )
                except Exception as e:
                    logging.warning("SfpViewerWithParams: logger integration failed: %s", e)
        except Exception as e:
            logging.warning("SfpViewerWithParams: could not create Logs frame: %s", e)

    def _build_mfd_params_frame(self, parent) -> ttk.Frame:
        """Build MFD parameters control panel.

        Creates one row per category (label, intensity slider, colour button,
        colour indicator), a "Raw Map" slider row, and the two save
        checkboxes. The widget dictionaries on ``self`` are replaced with the
        widgets of the newly built frame.

        Args:
            parent: Container widget the frame is created in.

        Returns:
            The populated ``ttk.Labelframe`` (not yet packed or gridded).
        """
        params_frame = ttk.Labelframe(parent, text="MFD Parameters", padding=10)

        self._intensity_sliders = {}
        self._color_buttons = {}
        self._color_indicators = {}

        # NOTE(review): sliders always start at 255; they are not initialised
        # from the current MfdState values - confirm whether that is intended.
        for row, cat_name in enumerate(self._CATEGORIES):
            ttk.Label(params_frame, text=cat_name, width=10).grid(
                row=row, column=0, sticky="w", pady=2
            )

            intensity_var = tk.IntVar(value=255)
            slider = ttk.Scale(
                params_frame,
                from_=0, to=255,
                orient="horizontal",
                variable=intensity_var,
                # name=cat_name binds the current category (avoids late binding).
                command=lambda val, name=cat_name: self._on_intensity_changed(name, val),
            )
            slider.grid(row=row, column=1, sticky="ew", padx=5, pady=2)
            self._intensity_sliders[cat_name] = (slider, intensity_var)

            color_btn = ttk.Button(
                params_frame,
                text="Color",
                width=6,
                command=lambda name=cat_name: self._on_color_button_clicked(name),
            )
            color_btn.grid(row=row, column=2, padx=2, pady=2)
            self._color_buttons[cat_name] = color_btn

            color_indicator = tk.Label(params_frame, width=3, bg="#FFFFFF", relief="sunken")
            color_indicator.grid(row=row, column=3, padx=2, pady=2)
            self._color_indicators[cat_name] = color_indicator

        row = len(self._CATEGORIES)
        ttk.Label(params_frame, text="Raw Map", width=10).grid(
            row=row, column=0, sticky="w", pady=2
        )
        raw_var = tk.IntVar(value=255)
        raw_slider = ttk.Scale(
            params_frame,
            from_=0, to=255,
            orient="horizontal",
            variable=raw_var,
            command=self._on_raw_map_changed,
        )
        raw_slider.grid(row=row, column=1, columnspan=3, sticky="ew", padx=5, pady=2)
        self._raw_map_slider = (raw_slider, raw_var)
        params_frame.columnconfigure(1, weight=1)

        self._update_color_indicators()

        legend_text = "Save options: PNG frames and raw payloads to 'dumps' folder."
        legend = ttk.Label(params_frame, text=legend_text, wraplength=220)
        legend.grid(row=row + 1, column=0, columnspan=4, sticky='w', pady=(8, 2))

        # Keep the checkbox variables across rebuilds so a reopened controls
        # window shows the last choice instead of silently resetting to False.
        if getattr(self, "_save_png_var", None) is None:
            self._save_png_var = tk.BooleanVar(value=False)
        if getattr(self, "_save_bin_var", None) is None:
            self._save_bin_var = tk.BooleanVar(value=False)
        save_png_cb = ttk.Checkbutton(
            params_frame, text="Save .png",
            variable=self._save_png_var, command=self._on_save_png_toggled,
        )
        save_bin_cb = ttk.Checkbutton(
            params_frame, text="Save .bin",
            variable=self._save_bin_var, command=self._on_save_bin_toggled,
        )
        save_png_cb.grid(row=row + 2, column=0, columnspan=4, sticky='w', pady=(2, 2))
        save_bin_cb.grid(row=row + 3, column=0, columnspan=4, sticky='w', pady=(2, 6))

        return params_frame

    @staticmethod
    def _bgr_to_hex(bgr) -> str:
        """Return a ``#rrggbb`` string for a (blue, green, red) triple."""
        return f"#{bgr[2]:02x}{bgr[1]:02x}{bgr[0]:02x}"

    def _update_color_indicators(self):
        """Refresh every colour indicator from the colours stored in the MFD state."""
        if not self._mfd_state:
            return
        for cat_name, indicator in getattr(self, "_color_indicators", {}).items():
            try:
                cat_data = self._mfd_state.mfd_params["categories"].get(cat_name)
                if cat_data:
                    indicator.config(bg=self._bgr_to_hex(cat_data["color"]))
            except Exception:
                logging.debug(
                    "SfpViewerWithParams: could not update colour indicator for %s",
                    cat_name, exc_info=True,
                )

    def _notify(self, kind: str, category_name, value) -> None:
        """Invoke the parameter-changed callback, if one was supplied."""
        if self._on_mfd_param_changed:
            self._on_mfd_param_changed(kind, category_name, value)

    def _on_intensity_changed(self, category_name: str, value):
        """Handle a category intensity slider move.

        ``value`` arrives from the scale widget as a string/float and is
        truncated to an int before being stored and reported.
        """
        try:
            int_val = int(float(value))
            if self._mfd_state:
                self._mfd_state.update_category_intensity(category_name, int_val)
            self._notify("intensity", category_name, int_val)
        except Exception:
            logging.warning(
                "SfpViewerWithParams: intensity update failed for %s",
                category_name, exc_info=True,
            )

    def _on_color_button_clicked(self, category_name: str):
        """Ask the user for a new category colour and apply it.

        Colours are stored as (blue, green, red); the chooser works in RGB,
        so the channel order is swapped in both directions.
        """
        try:
            current_bgr = (255, 255, 255)
            if self._mfd_state:
                cat_data = self._mfd_state.mfd_params["categories"].get(category_name)
                if cat_data:
                    current_bgr = cat_data["color"]
            current_rgb = (current_bgr[2], current_bgr[1], current_bgr[0])

            color = colorchooser.askcolor(
                title=f"Choose color for {category_name}", initialcolor=current_rgb
            )
            if not (color and color[0]):
                return  # dialog cancelled

            rgb = color[0]
            new_bgr = (int(rgb[2]), int(rgb[1]), int(rgb[0]))
            if self._mfd_state:
                self._mfd_state.update_category_color(category_name, new_bgr)
            self._update_color_indicators()
            self._notify("color", category_name, new_bgr)
        except Exception:
            logging.warning(
                "SfpViewerWithParams: colour update failed for %s",
                category_name, exc_info=True,
            )

    def _on_raw_map_changed(self, value):
        """Handle a "Raw Map" slider move (value truncated to int)."""
        try:
            int_val = int(float(value))
            if self._mfd_state:
                self._mfd_state.update_raw_map_intensity(int_val)
            self._notify("raw_map", None, int_val)
        except Exception:
            logging.warning("SfpViewerWithParams: raw map update failed", exc_info=True)

    def get_mfd_lut(self):
        """Return ``get_lut()`` of the MFD state, or ``None`` when no state is available."""
        if self._mfd_state:
            return self._mfd_state.get_lut()
        return None

    def set_mfd_display_size(self, size: int) -> None:
        """Set maximum side length (pixels) for displayed MFD preview images.

        The incoming raw MFD frames are kept intact; this only affects displayed copy size.
        Values below 16 are raised to 16; values that cannot be converted to
        ``int`` are ignored and the previous size is kept.
        """
        try:
            self._mfd_display_size = max(int(size), self._MIN_DISPLAY_SIZE)
        except (TypeError, ValueError):
            logging.warning("SfpViewerWithParams: ignoring invalid display size %r", size)

    def _on_save_png_toggled(self):
        """Report the new state of the "Save .png" checkbox."""
        self._notify("save_png", None, bool(self._save_png_var.get()))

    def _on_save_bin_toggled(self):
        """Report the new state of the "Save .bin" checkbox."""
        self._notify("save_bin", None, bool(self._save_bin_var.get()))

    def _on_canvas_right_click(self, event):
        """Pop up the context menu at the pointer position."""
        menu = getattr(self, "_context_menu", None)
        if not menu:
            return
        try:
            menu.tk_popup(event.x_root, event.y_root)
        except Exception:
            logging.debug("SfpViewerWithParams: could not show context menu", exc_info=True)

    def _open_controls_window(self):
        """Open the MFD controls window, reusing it when it is already open."""
        try:
            existing = getattr(self, "_controls_window", None)
            if existing is not None and existing.winfo_exists():
                # A second window would rebuild the widget dictionaries and
                # orphan the indicators of the first one.
                existing.deiconify()
                existing.lift()
                return

            win = tk.Toplevel(self._root)
            win.title("MFD Controls")
            frame = self._build_mfd_params_frame(win)
            frame.pack(fill='both', expand=True, padx=6, pady=6)
            win.transient(self._root)
            win.lift()
            self._controls_window = win
        except Exception:
            logging.warning("SfpViewerWithParams: could not open controls window", exc_info=True)

    def show_frame(self, frame: Any) -> None:
        """Queue ``frame`` for display; only the most recent frame is kept.

        Also records the arrival time and drops timestamps older than two
        seconds. Never raises.
        """
        try:
            now = time.time()
            cutoff = now - self._FRAME_TIME_WINDOW_S
            with self._lock:
                self._pending_frame = frame
                times = self._frame_times
                times.append(now)
                while times and times[0] < cutoff:
                    times.pop(0)
        except Exception:
            logging.debug("SfpViewerWithParams: could not queue frame", exc_info=True)

    def _frame_to_image(self, frame: Any):
        """Return a private image for ``frame``, or ``None`` if unsupported.

        Encoded bytes are decoded first: ``bytearray`` also has ``.copy()``,
        so testing for ``copy`` first would skip the decoder.
        """
        if isinstance(frame, (bytes, bytearray)):
            return Image.open(io.BytesIO(frame)).convert("RGB")
        if hasattr(frame, "copy"):
            return frame.copy()
        return None

    def _apply_autocontrast(self, img):
        """Return ``img`` with autocontrast applied when it is dark or flat.

        The heuristic needs numpy; without it the image is returned unchanged.
        """
        try:
            import numpy as np

            arr = np.asarray(img.convert('L')) if img.mode == 'RGB' else np.asarray(img)
            vmin, vmax = int(arr.min()), int(arr.max())
            if ImageOps and ((vmax - vmin) < 16 or vmax < 64):
                return ImageOps.autocontrast(img)
        except ImportError:
            pass  # numpy is optional
        except Exception:
            logging.debug("SfpViewerWithParams: autocontrast skipped", exc_info=True)
        return img

    def _fit_to_display(self, img):
        """Shrink ``img`` in place so neither side exceeds the display size."""
        try:
            max_side = int(self._mfd_display_size) if self._mfd_display_size else None
        except (TypeError, ValueError):
            max_side = None

        if not max_side or (img.width <= max_side and img.height <= max_side):
            return img

        bounds = (max_side, max_side)
        try:
            img.thumbnail(bounds, getattr(Image, 'LANCZOS', 1))
        except Exception:
            try:
                img.thumbnail(bounds)  # retry with the default filter
            except Exception:
                logging.debug("SfpViewerWithParams: could not resize frame", exc_info=True)
        return img

    def _sync_canvas_size(self, width: int, height: int) -> None:
        """Request a new canvas size when the displayed image size changes.

        The last requested size is tracked instead of reading
        ``winfo_width()``, because the laid-out size depends on the packer
        and would differ from the image on almost every frame.
        """
        size = (width, height)
        if size != getattr(self, "_canvas_size", None):
            self._canvas_size = size
            self._first_frame_received = True
            self._img_canvas.config(width=width, height=height)

    def _build_tk_image(self, frame: Any):
        """Convert frame to PhotoImage and sync canvas size.

        Returns:
            An ``ImageTk.PhotoImage``, or ``None`` when PIL is unavailable,
            the frame type is unsupported, or conversion fails.
        """
        try:
            if Image is None or ImageTk is None:
                return None
            img = self._frame_to_image(frame)
            if img is None:
                return None
            img = self._apply_autocontrast(img)
            img = self._fit_to_display(img)
            self._sync_canvas_size(img.width, img.height)
            return ImageTk.PhotoImage(img)
        except Exception:
            logging.debug("SfpViewerWithParams: could not build display image", exc_info=True)
            return None

    def _poll(self):
        """Display the pending frame, if any, then re-arm the timer.

        The timer is only re-armed while the viewer is running, so
        :meth:`stop` also ends polling on a root the viewer does not own.
        """
        try:
            with self._lock:
                frame_to_show, self._pending_frame = self._pending_frame, None

            if frame_to_show is not None:
                photo = self._build_tk_image(frame_to_show)
                if photo is not None:
                    # Keep a reference on self so the image is not garbage collected.
                    self._photo = photo
                    self._img_label.config(image=self._photo)
        except Exception:
            logging.debug("SfpViewerWithParams: poll failed", exc_info=True)
        finally:
            if self._running:
                try:
                    self._root.after(self._poll_interval_ms, self._poll)
                except Exception:
                    logging.debug("SfpViewerWithParams: could not reschedule poll", exc_info=True)

    def run(self):
        """Block until the viewer stops.

        With an owned root this runs the Tk main loop; otherwise it sleeps in
        100 ms steps until :meth:`stop` clears the running flag.
        """
        self._running = True
        try:
            if self._owns_root:
                self._root.mainloop()
            else:
                while self._running:
                    time.sleep(0.1)
        finally:
            self._running = False

    def stop(self):
        """Clear the running flag and, for an owned root, quit and destroy it."""
        self._running = False
        if not self._owns_root:
            return
        try:
            self._root.quit()
            self._root.destroy()
        except Exception:
            logging.debug("SfpViewerWithParams: error while closing window", exc_info=True)