SXXXXXXX_ControlPanel/VideoReceiverSFP/core/sfp_module.py
2026-01-13 15:14:22 +01:00

783 lines
33 KiB
Python

"""VideoReceiverSFP: SFP receiver module (simulated and network modes).
This module implements a simple L0-like adapter that can run in simulated
mode by reading images from a local directory and publishing frames to
registered callbacks, or in network mode by listening for SFP UDP packets.
"""
from typing import Callable, Dict, Any, Optional
from collections import deque
import pathlib
import threading
import time
import os
import logging
import io
try:
import numpy as np
except Exception: # pragma: no cover - numpy is bundled via requirements, but guard anyway
np = None # type: ignore
try:
from PIL import Image
except Exception:
Image = None
# Optional network transport
try:
from .sfp_transport import SfpTransport
from .sfp_structures import SFPHeader
except Exception:
SfpTransport = None
SFPHeader = None
# Try to import the authoritative image structures from controlpanel
try:
from controlpanel.core.sfp_structures import ImageLeaderData
except Exception:
ImageLeaderData = None
# Use local image processing helpers (module must be self-contained)
try:
from .image_processing import normalize_image, apply_color_palette, apply_brightness_contrast, resize_image
except Exception:
normalize_image = None
apply_color_palette = None
apply_brightness_contrast = None
resize_image = None
try:
from .mfd_palette import build_mfd_lut, apply_mfd_lut
except Exception:
build_mfd_lut = None # type: ignore
apply_mfd_lut = None # type: ignore
# --- Image Type Constants ---
# Based on actual payload analysis: TYPE=1 with PALTYPE=1 is MFD, TYPE=2 is SAR
IMAGE_TYPE_UNKNOWN = 0
IMAGE_TYPE_MFD = 1 # MFD images have TYPE=1 and typically PALTYPE=1 (indexed palette)
IMAGE_TYPE_SAR = 2 # SAR images have TYPE=2
def get_image_type_name(type_id):
"""Helper function to get the name of the image type."""
return {
IMAGE_TYPE_SAR: "SAR",
IMAGE_TYPE_MFD: "MFD",
}.get(type_id, f"Unknown ({type_id})")
class SfpConnectorModule:
"""Minimal, self-contained SFP connector for development and testing.
- When initialized with `sim_dir` it will stream images from that folder
at the configured `fps`.
- When initialized with `host`/`port` it will listen for SFP UDP and
dispatch completed payloads to callbacks.
"""
def __init__(self, name: str = "VideoReceiverSFP"):
self.module_name = name
self.is_running = False
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._callbacks = []
self._last_frame = None
self._frame_lock = threading.Lock()
self._sim_dir: Optional[str] = None
self._fps = 5
self._frame_idx = 0
self._host = None
self._port = None
self._transport: Optional[object] = None
# normalization method: None, 'cp' (use controlpanel.normalize_image), or 'autocontrast'
self._normalize_method: Optional[str] = None
self._image_type_filter: Optional[int] = None
self._mfd_lut = None
# saving flags and queues
self._save_png = False
self._save_bin = False
try:
from .config import DUMP_KEEP_COUNT, DUMPS_DIR
self._dump_keep = int(DUMP_KEEP_COUNT)
self._dumps_dir = str(DUMPS_DIR)
except Exception:
self._dump_keep = 100
self._dumps_dir = os.path.join(os.getcwd(), 'dumps')
# deques to track saved files for rotation
self._saved_pngs = deque()
self._saved_bins = deque()
# track whether we've saved the first decoded frame to disk
self._first_frame_saved = False
self._first_frame_path: Optional[str] = None
# internal counters for logging
self._frames_received = 0
def initialize(self, config: Dict[str, Any]) -> bool:
"""Initialize the module.
Config keys:
- sim_dir: optional directory with sample images for simulated mode
- fps: frames per second (default 5)
- host: optional host to bind for network mode
- port: optional UDP port for network mode
"""
self._sim_dir = config.get("sim_dir")
self._fps = int(config.get("fps", 5))
self._host = config.get("host")
self._port = config.get("port")
self._normalize_method = config.get("normalize", None)
image_type_str = config.get("image_type_filter")
if image_type_str:
image_type_str = str(image_type_str).upper()
if image_type_str == 'MFD':
self._image_type_filter = IMAGE_TYPE_MFD
logging.info("VideoReceiverSFP: Filtering for MFD images only.")
elif image_type_str == 'SAR':
self._image_type_filter = IMAGE_TYPE_SAR
logging.info("VideoReceiverSFP: Filtering for SAR images only.")
else:
logging.warning("VideoReceiverSFP: Unknown image_type_filter=%s", image_type_str)
try:
logging.getLogger().debug("VideoReceiverSFP.initialize: sim_dir=%s fps=%s host=%s port=%s", self._sim_dir, self._fps, self._host, self._port)
except Exception:
pass
return True
def start_session(self) -> None:
if self.is_running:
return
# If network config provided and transport available, start transport
if self._host and self._port and SfpTransport is not None:
try:
handlers = {
ord("M"): self._network_payload_handler,
ord("S"): self._network_payload_handler,
}
self._transport = SfpTransport(self._host, int(self._port), handlers)
started = self._transport.start()
if started:
try:
logging.getLogger().info("VideoReceiverSFP: started SfpTransport on %s:%s", self._host, self._port)
except Exception:
pass
self.is_running = True
return
except Exception:
logging.exception("Failed to start SfpTransport; falling back to sim mode")
# fallback to simulated folder stream
self._stop_event.clear()
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
self.is_running = True
def stop_session(self) -> None:
if not self.is_running:
return
# Stop transport if active
if getattr(self, "_transport", None) is not None:
try:
self._transport.shutdown()
except Exception:
pass
self._transport = None
self._stop_event.set()
if self._thread is not None:
self._thread.join(timeout=2.0)
self.is_running = False
def register_callback(self, cb: Callable[[Any], None]) -> None:
if cb not in self._callbacks:
self._callbacks.append(cb)
try:
logging.getLogger().debug("VideoReceiverSFP: registered callback %s", getattr(cb, '__name__', str(cb)))
except Exception:
pass
def get_status(self) -> Dict[str, Any]:
return {
"is_running": self.is_running,
"sim_dir": self._sim_dir,
"fps": self._fps,
"host": self._host,
"port": self._port,
}
def get_data_stream(self) -> Any:
with self._frame_lock:
return self._last_frame
def update_mfd_lut(self, new_lut: 'np.ndarray') -> None:
"""Update the MFD LUT used for MFD image decoding.
Args:
new_lut: 256x3 RGB LUT array (uint8)
"""
self._mfd_lut = new_lut
logging.info("VideoReceiverSFP: MFD LUT updated")
# Optionally re-decode last frame if it was MFD type
# (For simplicity, next frame will use new LUT)
def save_last_frame(self, path: str) -> bool:
frame = self.get_data_stream()
if frame is None:
return False
try:
if Image is not None and hasattr(frame, "save"):
frame.save(path)
else:
with open(path, "wb") as fh:
fh.write(frame)
return True
except Exception:
return False
def _list_sim_files(self):
if not self._sim_dir or not os.path.isdir(self._sim_dir):
return []
files = [f for f in sorted(os.listdir(self._sim_dir)) if not f.startswith(".")]
return [os.path.join(self._sim_dir, f) for f in files]
def _load_next_frame(self):
files = self._list_sim_files()
if not files:
return None
path = files[self._frame_idx % len(files)]
self._frame_idx += 1
try:
if Image is not None:
# Open without forced conversion so we can detect grayscale/index images
img = Image.open(path)
try:
mode = getattr(img, 'mode', None)
# If the file is single-channel (likely an MFD index image), attempt to colorize
if mode in ('L', 'P'):
try:
import numpy as _np
inds = _np.array(img.convert('L'))
self._ensure_mfd_lut()
if self._mfd_lut is not None and apply_mfd_lut is not None:
rgb = apply_mfd_lut(inds, self._mfd_lut)
if rgb is not None and Image is not None:
try:
frame = Image.fromarray(rgb.astype('uint8'), mode='RGB')
dumps_dir = os.path.join(os.getcwd(), 'dumps')
os.makedirs(dumps_dir, exist_ok=True)
dbg_path = os.path.join(dumps_dir, 'VideoReceiverSFP_colorized_debug.png')
frame.save(dbg_path)
except Exception:
pass
return frame
except Exception:
pass
# Some dumps are stored as RGB but actually contain index images (identical channels / low cardinality)
if mode == 'RGB':
try:
import numpy as _np
larr = _np.array(img.convert('L'))
unique_vals = _np.unique(larr)
# If unique index count is small (<=32) treat as indices and colorize
if unique_vals.size <= 32:
self._ensure_mfd_lut()
if self._mfd_lut is not None and apply_mfd_lut is not None:
rgb = apply_mfd_lut(larr, self._mfd_lut)
if rgb is not None and Image is not None:
try:
frame = Image.fromarray(rgb.astype('uint8'), mode='RGB')
dumps_dir = os.path.join(os.getcwd(), 'dumps')
os.makedirs(dumps_dir, exist_ok=True)
dbg_path = os.path.join(dumps_dir, 'VideoReceiverSFP_colorized_debug.png')
frame.save(dbg_path)
except Exception:
pass
return frame
except Exception:
pass
# Default: return an RGB-converted copy for display
return img.convert("RGB")
except Exception:
return img.convert("RGB")
else:
with open(path, "rb") as fh:
return fh.read()
except Exception:
return None
def _loop(self):
interval = 1.0 / max(1, self._fps)
while not self._stop_event.is_set():
frame = self._load_next_frame()
if frame is not None:
with self._frame_lock:
self._last_frame = frame
self._frames_received += 1
try:
if self._frames_received % 10 == 0:
logging.getLogger().info("VideoReceiverSFP: simulated frames received=%d", self._frames_received)
except Exception:
pass
for cb in list(self._callbacks):
try:
cb(frame)
except Exception:
pass
time.sleep(interval)
@staticmethod
def _looks_like_encoded_image(payload: bytes) -> bool:
if not payload:
return False
if payload.startswith(b"\x89PNG\r\n\x1a\n"):
return True
if payload.startswith(b"\xff\xd8\xff"):
return True
if payload.startswith(b"BM"):
return True
return False
def _ensure_mfd_lut(self):
if self._mfd_lut is not None:
return
# First try to construct the exact LUT used by ControlPanel (parity)
try:
from controlpanel.app_state import AppState
state = AppState()
params = state.mfd_params
raw_map_factor = params.get("raw_map_intensity", 128) / 255.0
pixel_map = params.get("pixel_to_category", {})
categories = params.get("categories", {})
new_lut = None
try:
import numpy as _np
new_lut = _np.zeros((256, 3), dtype=_np.uint8)
for idx in range(256):
cat_name = pixel_map.get(idx)
if cat_name:
cat_data = categories[cat_name]
bgr = cat_data["color"]
intensity_factor = cat_data["intensity"] / 255.0
new_lut[idx, 0] = _np.clip(int(round(float(bgr[0]) * intensity_factor)), 0, 255)
new_lut[idx, 1] = _np.clip(int(round(float(bgr[1]) * intensity_factor)), 0, 255)
new_lut[idx, 2] = _np.clip(int(round(float(bgr[2]) * intensity_factor)), 0, 255)
elif 32 <= idx <= 255:
raw_intensity = (float(idx) - 32.0) * (255.0 / 223.0)
final_gray = int(round(_np.clip(raw_intensity * raw_map_factor, 0, 255)))
new_lut[idx, :] = final_gray
except Exception:
new_lut = None
if new_lut is not None:
self._mfd_lut = new_lut
logging.getLogger().info("VideoReceiverSFP: built ControlPanel-compatible MFD LUT for colorization")
return
except Exception:
# fall through to other build methods
pass
# Fallback: use local build_mfd_lut if available
if build_mfd_lut is None:
return
try:
self._mfd_lut = build_mfd_lut()
logging.getLogger().info("VideoReceiverSFP: built fallback MFD LUT for colorization")
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to build MFD LUT")
def _extract_index_array(self, width: int, height: int, row_stride: int, pixel_data: bytes):
if np is None:
return None
total_bytes = row_stride * height
if len(pixel_data) < total_bytes:
logging.getLogger().warning(
"VideoReceiverSFP: pixel_data shorter than expected (got=%d need=%d)",
len(pixel_data),
total_bytes,
)
return None
try:
arr = np.frombuffer(pixel_data[:total_bytes], dtype=np.uint8)
row_pixels = width
if row_stride == row_pixels:
arr = arr.reshape((height, width))
else:
arr = arr.reshape((height, row_stride))[:, :width]
return arr.copy()
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to reshape MFD pixel data")
return None
def _build_grayscale_frame(self, width, height, row_stride, row_pixels, pixel_data):
if Image is None:
return None
try:
if row_stride == row_pixels:
img = Image.frombytes("L", (width, height), pixel_data)
else:
rows = []
for r in range(height):
start = r * row_stride
rows.append(pixel_data[start:start + row_pixels])
img = Image.frombytes("L", (width, height), b"".join(rows))
normalized_img = None
if self._normalize_method == 'cp' and normalize_image is not None:
try:
if np is not None:
arr = np.frombuffer(img.tobytes(), dtype=np.uint8).reshape((height, width))
norm = normalize_image(arr, target_type=np.uint8)
if norm is not None:
normalized_img = Image.fromarray(norm, mode='L')
except Exception:
logging.getLogger().exception("VideoReceiverSFP: controlpanel normalize_image failed")
elif self._normalize_method == 'autocontrast':
try:
from PIL import ImageOps
normalized_img = ImageOps.autocontrast(img)
except Exception:
normalized_img = None
if normalized_img is not None:
frame = normalized_img.convert("RGB")
try:
dumps_dir = os.path.join(os.getcwd(), "dumps")
os.makedirs(dumps_dir, exist_ok=True)
dbg_path = os.path.join(dumps_dir, "VideoReceiverSFP_last_normalized.png")
frame.save(dbg_path)
logging.getLogger().info("VideoReceiverSFP: saved normalized debug frame to %s", dbg_path)
except Exception:
pass
return frame
return img.convert("RGB")
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to build grayscale frame")
return None
def _decode_leader_payload(self, leader, payload: bytes):
if ImageLeaderData is None:
return None
header = leader.HEADER_DATA
image_type_id = int(header.TYPE)
if self._image_type_filter is not None and image_type_id != self._image_type_filter:
logging.getLogger().debug(
"VideoReceiverSFP: skipping image_type=%s due to filter",
get_image_type_name(image_type_id),
)
return None
width = int(header.DX)
height = int(header.DY)
stride = int(header.STRIDE)
bpp = int(header.BPP)
comp = int(header.COMP)
pal_type = int(getattr(header, "PALTYPE", 0))
if bpp <= 8:
bytes_per_pixel = 1
elif bpp % 8 == 0:
bytes_per_pixel = bpp // 8
else:
bytes_per_pixel = max(1, (bpp + 7) // 8)
row_pixels = width * bytes_per_pixel
row_stride = stride if stride and stride > 0 else row_pixels
# Calculate pixel data offset using ImageLeaderData layout when available
if ImageLeaderData is not None:
try:
# Mirror ControlPanel's calculation to remain compatible with payload layout
pixel_offset = (
leader.HEADER_TAG.size()
+ leader.HEADER_DATA.size()
+ leader.GEO_TAG.size()
+ leader.GEO_DATA.size()
+ leader.RESERVED_TAG.size()
+ ImageLeaderData.get_reserved_data_size()
+ leader.CM_TAG.size()
+ ImageLeaderData.get_colour_map_size()
+ leader.PIXEL_TAG.size()
)
except Exception:
pixel_offset = ImageLeaderData.size()
else:
pixel_offset = 0
total_bytes = row_stride * height
pixel_data = payload[pixel_offset:pixel_offset + total_bytes]
if len(pixel_data) < total_bytes:
logging.getLogger().warning(
"VideoReceiverSFP: truncated pixel payload (have=%d expected=%d)",
len(pixel_data),
total_bytes,
)
return None
if image_type_id == IMAGE_TYPE_MFD and pal_type == 1 and bytes_per_pixel == 1:
# Extract raw indices first
indices = self._extract_index_array(width, height, row_stride, pixel_data)
if indices is None:
return None
# Attempt to produce a colorized RGB image using the MFD LUT,
# then enhance visibility (scale non-zero pixels and autocontrast)
rgb = None
try:
self._ensure_mfd_lut()
if self._mfd_lut is not None and apply_mfd_lut is not None:
rgb = apply_mfd_lut(indices, self._mfd_lut)
except Exception:
rgb = None
if rgb is not None:
try:
import numpy as _np
from PIL import ImageOps
rgb_arr = _np.asarray(rgb, dtype=_np.uint8)
mask = _np.any(rgb_arr != 0, axis=2)
pct_nonzero = float(_np.count_nonzero(mask)) / float(mask.size) if mask.size else 0.0
max_val = int(rgb_arr.max()) if rgb_arr.size else 0
if pct_nonzero < 0.10 or max_val < 220:
if _np.count_nonzero(mask) > 0 and max_val > 0:
scale = 255.0 / float(max_val)
rgb_arr = _np.clip(rgb_arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8)
try:
pil_tmp = Image.fromarray(rgb_arr, mode='RGB')
pil_tmp = ImageOps.autocontrast(pil_tmp)
rgb_arr = _np.array(pil_tmp)
except Exception:
pass
if Image is not None:
return Image.fromarray(rgb_arr, mode='RGB')
return rgb_arr
except Exception:
if Image is not None:
return Image.fromarray(rgb, mode='RGB')
return rgb
# Fallback: try again to map indices -> RGB, or return grayscale RGB
try:
self._ensure_mfd_lut()
if self._mfd_lut is not None and apply_mfd_lut is not None:
rgb2 = apply_mfd_lut(indices, self._mfd_lut)
if rgb2 is not None:
try:
import numpy as _np
from PIL import ImageOps
rgb_arr = _np.asarray(rgb2, dtype=_np.uint8)
max_val = int(rgb_arr.max()) if rgb_arr.size else 0
if _np.count_nonzero(_np.any(rgb_arr != 0, axis=2)) > 0 and max_val > 0:
scale = 255.0 / float(max_val)
rgb_arr = _np.clip(rgb_arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8)
try:
pil_tmp = Image.fromarray(rgb_arr, mode='RGB')
pil_tmp = ImageOps.autocontrast(pil_tmp)
rgb_arr = _np.array(pil_tmp)
except Exception:
pass
return Image.fromarray(rgb_arr, mode='RGB')
except Exception:
pass
except Exception:
pass
if Image is not None:
return Image.fromarray(indices, mode='L').convert("RGB")
else:
return indices
if comp == 0 and bytes_per_pixel == 1:
frame = self._build_grayscale_frame(width, height, row_stride, row_pixels, pixel_data)
if frame is not None:
logging.getLogger().info(
"VideoReceiverSFP: decoded %s frame %dx%d bytes_per_pixel=%d stride=%d",
get_image_type_name(image_type_id),
width,
height,
bytes_per_pixel,
row_stride,
)
return frame
logging.getLogger().debug(
"VideoReceiverSFP: unsupported comp=%s bpp=%s bytes_per_pixel=%s for type=%s",
comp,
bpp,
bytes_per_pixel,
get_image_type_name(image_type_id),
)
return None
def _network_payload_handler(self, completed_payload: bytearray):
payload = bytes(completed_payload)
frame = None
logging.getLogger().debug("VideoReceiverSFP: network payload received size=%d bytes", len(payload))
leader = None
if ImageLeaderData is not None and not self._looks_like_encoded_image(payload):
leader_size = ImageLeaderData.size()
if len(payload) >= leader_size:
try:
leader = ImageLeaderData.from_buffer_copy(payload[:leader_size])
except Exception:
leader = None
# If we parsed a leader, save a debug dump of header fields and raw payload
try:
if leader is not None:
try:
import time, os
dbgdir = os.path.join(os.getcwd(), 'dumps')
os.makedirs(dbgdir, exist_ok=True)
ts = time.strftime('%Y%m%d_%H%M%S')
# Save raw payload for offline analysis only if enabled
if getattr(self, '_save_bin', False):
binpath = os.path.join(dbgdir, f'VideoReceiverSFP_payload_{ts}.bin')
with open(binpath, 'wb') as fh:
fh.write(payload)
# manage rotation queue
try:
self._saved_bins.append(binpath)
while len(self._saved_bins) > self._dump_keep:
old = self._saved_bins.popleft()
try:
pathlib.Path(old).unlink()
except Exception:
pass
except Exception:
pass
else:
binpath = None
# Log key header fields
hd = leader.HEADER_DATA
gd = getattr(leader, 'GEO_DATA', None)
logging.getLogger().debug(
'VideoReceiverSFP: Parsed ImageLeader TYPE=%s DX=%s DY=%s BPP=%s STRIDE=%s PALTYPE=%s COMP=%s FCOUNTER=%s saved=%s',
int(hd.TYPE), int(hd.DX), int(hd.DY), int(hd.BPP), int(hd.STRIDE), int(getattr(hd,'PALTYPE',0)), int(hd.COMP), int(hd.FCOUNTER), binpath
)
except Exception:
logging.getLogger().exception('VideoReceiverSFP: error saving payload debug file')
except Exception:
pass
if leader is not None:
frame = self._decode_leader_payload(leader, payload)
if frame is None and Image is not None:
try:
img = Image.open(io.BytesIO(payload))
frame = img.convert("RGB")
logging.getLogger().debug("VideoReceiverSFP: payload decoded by PIL as standalone image")
except Exception:
frame = None
if frame is None:
frame = payload
# Save the first decoded frame to disk so user can verify output
if (not getattr(self, "_first_frame_saved", False)) and Image is not None and hasattr(frame, "save"):
try:
dumps_dir = os.path.join(os.getcwd(), "dumps")
os.makedirs(dumps_dir, exist_ok=True)
path = os.path.join(dumps_dir, "VideoReceiverSFP_first_frame.png")
frame.save(path)
self._first_frame_saved = True
self._first_frame_path = path
try:
logging.getLogger().info("VideoReceiverSFP: saved first decoded frame to %s", path)
except Exception:
pass
except Exception:
try:
logging.getLogger().exception("VideoReceiverSFP: failed to save first decoded frame")
except Exception:
pass
# Additionally save an enhanced preview image to help Windows thumbnail/preview
try:
from PIL import ImageOps
import numpy as _np
try:
arr = _np.array(frame.convert('RGB'))
nonzero = _np.count_nonzero(arr)
pct_nonzero = float(nonzero) / arr.size if arr.size > 0 else 0.0
except Exception:
arr = None
pct_nonzero = 0.0
enhanced = None
# If image is very sparse (few non-zero pixels) scale non-zero values to full range
if arr is not None and pct_nonzero < 0.05:
try:
mask = _np.any(arr != 0, axis=2)
if _np.any(mask):
maxv = int(arr[mask].max())
if maxv > 0:
scale = 255.0 / float(maxv)
arr2 = _np.clip(arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8)
enhanced = Image.fromarray(arr2, mode='RGB')
except Exception:
enhanced = None
# Fallback to PIL autocontrast if not produced above
if enhanced is None:
try:
enhanced = ImageOps.autocontrast(frame)
except Exception:
enhanced = None
if enhanced is not None:
try:
preview_path = os.path.join(dumps_dir, "VideoReceiverSFP_first_frame_preview.png")
enhanced.save(preview_path)
logging.getLogger().info("VideoReceiverSFP: saved enhanced preview to %s", preview_path)
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to save enhanced preview")
except Exception:
# Do not break the main flow if preview creation fails
pass
with self._frame_lock:
self._last_frame = frame
self._frames_received += 1
try:
if self._frames_received % 5 == 0:
logging.getLogger().debug("VideoReceiverSFP: total frames_received=%d", self._frames_received)
except Exception:
pass
for cb in list(self._callbacks):
try:
# If saving of PNGs enabled and frame is an Image, save timestamped copy
if getattr(self, '_save_png', False) and Image is not None and hasattr(frame, 'save'):
try:
ts = time.strftime('%Y%m%d_%H%M%S') + (f"_{int(time.time() * 1000) % 1000:03d}")
dumps_dir = getattr(self, '_dumps_dir', os.path.join(os.getcwd(), 'dumps'))
os.makedirs(dumps_dir, exist_ok=True)
pngpath = os.path.join(dumps_dir, f'VideoReceiverSFP_frame_{ts}.png')
frame.save(pngpath)
self._saved_pngs.append(pngpath)
while len(self._saved_pngs) > self._dump_keep:
old = self._saved_pngs.popleft()
try:
pathlib.Path(old).unlink()
except Exception:
pass
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed saving png frame")
cb(frame)
except Exception:
pass
# Methods to control saving flags from external UI
def set_save_png(self, enabled: bool) -> None:
self._save_png = bool(enabled)
logging.getLogger().info("VideoReceiverSFP: save_png set to %s", self._save_png)
def set_save_bin(self, enabled: bool) -> None:
self._save_bin = bool(enabled)
logging.getLogger().info("VideoReceiverSFP: save_bin set to %s", self._save_bin)