"""Simple Tkinter-based viewer for displaying frames from the VideoReceiverSFP module.""" import threading import time import logging import tkinter as tk from typing import Any import os try: from PIL import Image, ImageTk except Exception: Image = None ImageTk = None class SfpViewer: def __init__(self, window_title: str = "SFP Viewer"): self._root = tk.Tk() self._root.title(window_title) # Image display self._img_label = tk.Label(self._root) self._img_label.pack() # FPS label under the image self._fps_label = tk.Label(self._root, text="FPS: 0", font=("Arial", 10)) self._fps_label.pack() self._photo = None self._pending_frame = None self._lock = threading.Lock() self._running = False # timestamps of received frames for FPS calculation self._frame_times = [] # schedule GUI polling self._poll_interval_ms = 100 self._root.after(self._poll_interval_ms, self._poll) try: logging.getLogger().info("SfpViewer: GUI initialized") # Try to raise the window to the front try: self._root.lift() self._root.attributes("-topmost", True) self._root.update() # disable topmost after a short delay so it doesn't steal focus continuously def _unset_topmost(): try: self._root.attributes("-topmost", False) except Exception: pass self._root.after(200, _unset_topmost) except Exception: pass except Exception: pass def show_frame(self, frame: Any) -> None: # Called from any thread: store pending frame and add timestamp try: with self._lock: self._pending_frame = frame self._frame_times.append(time.time()) # Trim timestamps older than 2s cutoff = time.time() - 2.0 while self._frame_times and self._frame_times[0] < cutoff: self._frame_times.pop(0) except Exception: return def _build_tk_image(self, frame: Any): try: if Image is None or ImageTk is None: return None if hasattr(frame, "copy"): img = frame.copy() elif isinstance(frame, (bytes, bytearray)): import io img = Image.open(io.BytesIO(frame)).convert("RGB") else: return None # Lightweight diagnostics: log min/max and save a debug image copy try: arr = None if hasattr(img, 'mode') and img.mode in ('L', 'RGB'): import numpy as _np if img.mode == 'RGB': arr = _np.asarray(img.convert('L')) else: arr = _np.asarray(img) if arr is not None: vmin = int(arr.min()) vmax = int(arr.max()) logging.getLogger().debug("SfpViewer: frame stats min=%d max=%d", vmin, vmax) # If image is very low contrast (almost black), apply autocontrast to improve visibility try: if (vmax - vmin) < 16 or vmax < 64: from PIL import ImageOps logging.getLogger().info("SfpViewer: low contrast frame detected; applying autocontrast") img = ImageOps.autocontrast(img) # recompute arr for debug logging import numpy as _np arr = _np.asarray(img.convert('L')) vmin = int(arr.min()); vmax = int(arr.max()) logging.getLogger().info("SfpViewer: post-autocontrast min=%d max=%d", vmin, vmax) except Exception: logging.getLogger().exception("SfpViewer: autocontrast failed") dumps_dir = os.path.join(os.getcwd(), 'dumps') os.makedirs(dumps_dir, exist_ok=True) debug_path = os.path.join(dumps_dir, 'VideoReceiverSFP_last_frame_debug.png') try: img.save(debug_path) except Exception: pass except Exception: pass return ImageTk.PhotoImage(img) except Exception: return None def _poll(self): # Poll for pending frames and update GUI try: frame = None with self._lock: if self._pending_frame is not None: frame = self._pending_frame self._pending_frame = None if frame is not None: tkimg = self._build_tk_image(frame) if tkimg is not None: self._photo = tkimg self._img_label.config(image=self._photo) # update FPS label now = time.time() cutoff = now - 1.0 with self._lock: fps = len([t for t in self._frame_times if t >= cutoff]) self._fps_label.config(text=f"FPS: {fps}") except Exception: pass finally: try: self._root.after(self._poll_interval_ms, self._poll) except Exception: pass def run(self, duration: int = 10) -> None: def _stop(): try: self._root.quit() except Exception: pass try: self._root.after(int(duration * 1000), _stop) try: self._root.mainloop() except tk.TclError: start = time.time() while (time.time() - start) < duration: try: self._root.update() except tk.TclError: break time.sleep(0.02) finally: try: self._root.destroy() except Exception: pass self._running = False def run_async(self, duration: int = 10) -> None: threading.Thread(target=self.run, args=(duration,), daemon=True).start()