SXXXXXXX_ControlPanel/VideoReceiverSFP/core/sfp_module.py
2026-01-16 09:37:55 +01:00

1531 lines
72 KiB
Python

"""VideoReceiverSFP: SFP receiver module (simulated and network modes).
This module implements a simple L0-like adapter that can run in simulated
mode by reading images from a local directory and publishing frames to
registered callbacks, or in network mode by listening for SFP UDP packets.
"""
from typing import Callable, Dict, Any, Optional
from collections import deque
import pathlib
import threading
import time
import os
import logging
import io
try:
import numpy as np
except Exception: # pragma: no cover - numpy is bundled via requirements, but guard anyway
np = None # type: ignore
try:
from PIL import Image
except Exception:
Image = None
# Optional network transport
try:
from .sfp_transport import SfpTransport
from .sfp_structures import SFPHeader
except Exception:
SfpTransport = None
SFPHeader = None
# Try to import the authoritative image structures from controlpanel
try:
from controlpanel.core.sfp_structures import ImageLeaderData
except Exception:
ImageLeaderData = None
# Use local image processing helpers (module must be self-contained)
try:
from .image_processing import normalize_image, apply_color_palette, apply_brightness_contrast, resize_image
except Exception:
normalize_image = None
apply_color_palette = None
apply_brightness_contrast = None
resize_image = None
try:
from .mfd_palette import build_mfd_lut, apply_mfd_lut
except Exception:
build_mfd_lut = None # type: ignore
apply_mfd_lut = None # type: ignore
try:
from .dump_manager import DumpManager
except Exception:
DumpManager = None
# --- Image Type Constants ---
# Based on actual payload analysis: TYPE=1 with PALTYPE=1 is MFD, TYPE=2 is SAR
IMAGE_TYPE_UNKNOWN = 0
IMAGE_TYPE_MFD = 1 # MFD images have TYPE=1 and typically PALTYPE=1 (indexed palette)
IMAGE_TYPE_SAR = 2 # SAR images have TYPE=2
def get_image_type_name(type_id):
"""Helper function to get the name of the image type."""
return {
IMAGE_TYPE_SAR: "SAR",
IMAGE_TYPE_MFD: "MFD",
}.get(type_id, f"Unknown ({type_id})")
class SfpConnectorModule:
"""Minimal, self-contained SFP connector for development and testing.
- When initialized with `sim_dir` it will stream images from that folder
at the configured `fps`.
- When initialized with `host`/`port` it will listen for SFP UDP and
dispatch completed payloads to callbacks.
"""
def __init__(self, name: str = "VideoReceiverSFP"):
self.module_name = name
self.is_running = False
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._callbacks = []
self._last_frame = None
self._frame_lock = threading.Lock()
self._sim_dir: Optional[str] = None
self._fps = 5
self._frame_idx = 0
self._host = None
self._port = None
self._transport: Optional[object] = None
# normalization method: None, 'cp' (use controlpanel.normalize_image), or 'autocontrast'
self._normalize_method: Optional[str] = None
self._image_type_filter: Optional[int] = None
self._mfd_lut = None
# saving flags and queues
# Saving of received payloads is disabled by default; use config flags
self._save_png = False
self._save_bin = False
# video recording flags (disabled by default)
self._record_mfd_video = False
self._record_sar_video = False
# runtime tracking of active video writers
self._mfd_video_active = False
self._sar_video_active = False
# desired video fps when starting writers (default 20)
self._video_fps = 20
try:
from .config import DUMP_KEEP_COUNT, DUMPS_DIR
self._dump_keep = int(DUMP_KEEP_COUNT)
self._dumps_dir = str(DUMPS_DIR)
except Exception:
self._dump_keep = 100
self._dumps_dir = os.path.join(os.getcwd(), 'dumps')
# centralized dump manager for saving/rotation
try:
if DumpManager is not None:
self._dump_manager = DumpManager(self._dumps_dir, self._dump_keep)
else:
self._dump_manager = None
except Exception:
self._dump_manager = None
# deques to track saved files for rotation
self._saved_pngs = deque()
self._saved_bins = deque()
# dedicated callback lists for MFD and SAR viewers
self._mfd_callbacks = []
self._sar_callbacks = []
# callbacks for SAR metadata display (receives a formatted string)
self._sar_metadata_callbacks = []
# SAR-specific display controls
self._sar_brightness = 0
self._sar_contrast = 0
self._sar_autocontrast = False
self._saved_sar_pngs = deque()
self._sar_save_png = False
# track whether we've saved the first decoded frame to disk
self._first_frame_saved = False
self._first_frame_path: Optional[str] = None
# internal counters for logging
self._frames_received = 0
def initialize(self, config: Dict[str, Any]) -> bool:
"""Initialize the module.
Config keys:
- sim_dir: optional directory with sample images for simulated mode
- fps: frames per second (default 5)
- host: optional host to bind for network mode
- port: optional UDP port for network mode
"""
self._sim_dir = config.get("sim_dir")
self._fps = int(config.get("fps", 5))
self._host = config.get("host")
self._port = config.get("port")
self._normalize_method = config.get("normalize", None)
image_type_str = config.get("image_type_filter")
if image_type_str:
image_type_str = str(image_type_str).upper()
if image_type_str == 'MFD':
self._image_type_filter = IMAGE_TYPE_MFD
logging.info("VideoReceiverSFP: Filtering for MFD images only.")
elif image_type_str == 'SAR':
self._image_type_filter = IMAGE_TYPE_SAR
logging.info("VideoReceiverSFP: Filtering for SAR images only.")
else:
logging.warning("VideoReceiverSFP: Unknown image_type_filter=%s", image_type_str)
try:
logging.getLogger().debug("VideoReceiverSFP.initialize: sim_dir=%s fps=%s host=%s port=%s", self._sim_dir, self._fps, self._host, self._port)
except Exception:
pass
# Read default recording flags from local module config if present
try:
# Try to prefer centralized app config only if it explicitly defines the keys.
# Otherwise fall back to the local VideoReceiverSFP/core/config.py defaults.
try:
import controlpanel.config as appcfg
from . import config as localcfg
# If controlpanel.config explicitly defines the attributes, use them;
# otherwise use the local module defaults.
if hasattr(appcfg, 'RECORD_MFD_VIDEO'):
try:
self._record_mfd_video = bool(getattr(appcfg, 'RECORD_MFD_VIDEO'))
except Exception:
pass
else:
try:
self._record_mfd_video = bool(getattr(localcfg, 'RECORD_MFD_VIDEO', self._record_mfd_video))
except Exception:
pass
if hasattr(appcfg, 'RECORD_SAR_VIDEO'):
try:
self._record_sar_video = bool(getattr(appcfg, 'RECORD_SAR_VIDEO'))
except Exception:
pass
else:
try:
self._record_sar_video = bool(getattr(localcfg, 'RECORD_SAR_VIDEO', self._record_sar_video))
except Exception:
pass
if hasattr(appcfg, 'VIDEO_FPS'):
try:
self._video_fps = int(getattr(appcfg, 'VIDEO_FPS'))
except Exception:
pass
else:
try:
self._video_fps = int(getattr(localcfg, 'VIDEO_FPS', self._video_fps))
except Exception:
pass
except Exception:
# controlpanel.config not available; use local module config
from .config import RECORD_MFD_VIDEO, RECORD_SAR_VIDEO, VIDEO_FPS
try:
self._record_mfd_video = bool(RECORD_MFD_VIDEO)
except Exception:
pass
try:
self._record_sar_video = bool(RECORD_SAR_VIDEO)
except Exception:
pass
try:
self._video_fps = int(VIDEO_FPS)
except Exception:
pass
except Exception:
pass
# Allow orchestrator to override recording flags/fps via the config dict
try:
# Keys expected: 'record_mfd_video', 'record_sar_video', 'video_fps'
if 'record_mfd_video' in config:
try:
self._record_mfd_video = bool(config.get('record_mfd_video'))
except Exception:
pass
if 'record_sar_video' in config:
try:
self._record_sar_video = bool(config.get('record_sar_video'))
except Exception:
pass
if 'video_fps' in config:
try:
self._video_fps = int(config.get('video_fps'))
except Exception:
pass
except Exception:
pass
return True
def start_session(self) -> None:
if self.is_running:
return
# If network config provided and transport available, start transport
if self._host and self._port and SfpTransport is not None:
try:
# Register distinct handlers per SFP flow so the transport
# can call the correct one and we can rely on the flow id
# (ord('M') for MFD, ord('S') for SAR) instead of only
# trusting the embedded ImageLeader.TYPE which may be 0.
handlers = {
ord("M"): self._network_payload_handler_mfd,
ord("S"): self._network_payload_handler_sar,
}
self._transport = SfpTransport(self._host, int(self._port), handlers)
started = self._transport.start()
if started:
try:
logging.getLogger().info("VideoReceiverSFP: started SfpTransport on %s:%s", self._host, self._port)
except Exception:
pass
self.is_running = True
return
except Exception:
logging.exception("Failed to start SfpTransport; falling back to sim mode")
# fallback to simulated folder stream
self._stop_event.clear()
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
self.is_running = True
def stop_session(self) -> None:
if not self.is_running:
return
# Stop transport if active
if getattr(self, "_transport", None) is not None:
try:
self._transport.shutdown()
except Exception:
pass
self._transport = None
# Ensure any active video recordings are closed
try:
if getattr(self, '_dump_manager', None) is not None:
try:
logging.getLogger().info("VideoReceiverSFP: requesting stop of MFD video writer")
self._dump_manager.stop_video_record('mfd')
except Exception:
pass
try:
logging.getLogger().info("VideoReceiverSFP: requesting stop of SAR video writer")
self._dump_manager.stop_video_record('sar')
except Exception:
pass
except Exception:
pass
self._stop_event.set()
if self._thread is not None:
self._thread.join(timeout=2.0)
# mark writers inactive locally
self._mfd_video_active = False
self._sar_video_active = False
self.is_running = False
def register_callback(self, cb: Callable[[Any], None]) -> None:
if cb not in self._callbacks:
self._callbacks.append(cb)
try:
logging.getLogger().debug("VideoReceiverSFP: registered callback %s", getattr(cb, '__name__', str(cb)))
except Exception:
pass
def register_mfd_callback(self, cb: Callable[[Any], None]) -> None:
if cb not in self._mfd_callbacks:
self._mfd_callbacks.append(cb)
def register_sar_callback(self, cb: Callable[[Any], None]) -> None:
if cb not in self._sar_callbacks:
self._sar_callbacks.append(cb)
def register_sar_metadata_callback(self, cb: Callable[[str], None]) -> None:
"""Register a callback that receives a formatted metadata string for the last SAR image."""
if cb not in self._sar_metadata_callbacks:
self._sar_metadata_callbacks.append(cb)
def _notify_sar_metadata(self, metadata_str: Optional[str]) -> None:
if not metadata_str:
return
for cb in list(self._sar_metadata_callbacks):
try:
cb(metadata_str)
except Exception:
pass
def get_status(self) -> Dict[str, Any]:
return {
"is_running": self.is_running,
"sim_dir": self._sim_dir,
"fps": self._fps,
"host": self._host,
"port": self._port,
}
def get_data_stream(self) -> Any:
with self._frame_lock:
return self._last_frame
def update_mfd_lut(self, new_lut) -> None:
"""Update the MFD LUT used for MFD image decoding.
Args:
new_lut: 256x3 RGB LUT array (uint8)
"""
self._mfd_lut = new_lut
logging.info("VideoReceiverSFP: MFD LUT updated")
# Optionally re-decode last frame if it was MFD type
# (For simplicity, next frame will use new LUT)
def save_last_frame(self, path: str) -> bool:
frame = self.get_data_stream()
if frame is None:
return False
try:
if Image is not None and hasattr(frame, "save"):
frame.save(path)
else:
with open(path, "wb") as fh:
fh.write(frame)
return True
except Exception:
return False
def _list_sim_files(self):
if not self._sim_dir or not os.path.isdir(self._sim_dir):
return []
files = [f for f in sorted(os.listdir(self._sim_dir)) if not f.startswith(".")]
return [os.path.join(self._sim_dir, f) for f in files]
def _load_next_frame(self):
files = self._list_sim_files()
if not files:
return None
path = files[self._frame_idx % len(files)]
self._frame_idx += 1
try:
if Image is not None:
# Open without forced conversion so we can detect grayscale/index images
img = Image.open(path)
try:
mode = getattr(img, 'mode', None)
# If the file is single-channel (likely an MFD index image), attempt to colorize
if mode in ('L', 'P'):
try:
import numpy as _np
inds = _np.array(img.convert('L'))
self._ensure_mfd_lut()
if self._mfd_lut is not None and apply_mfd_lut is not None:
rgb = apply_mfd_lut(inds, self._mfd_lut)
if rgb is not None and Image is not None:
try:
frame = Image.fromarray(rgb.astype('uint8'), mode='RGB')
dumps_dir = os.path.join(os.getcwd(), 'dumps')
os.makedirs(dumps_dir, exist_ok=True)
dbg_path = os.path.join(dumps_dir, 'VideoReceiverSFP_colorized_debug.png')
frame.save(dbg_path)
except Exception:
pass
return frame
except Exception:
pass
# Some dumps are stored as RGB but actually contain index images (identical channels / low cardinality)
if mode == 'RGB':
try:
import numpy as _np
larr = _np.array(img.convert('L'))
unique_vals = _np.unique(larr)
# If unique index count is small (<=32) treat as indices and colorize
if unique_vals.size <= 32:
self._ensure_mfd_lut()
if self._mfd_lut is not None and apply_mfd_lut is not None:
rgb = apply_mfd_lut(larr, self._mfd_lut)
if rgb is not None and Image is not None:
try:
frame = Image.fromarray(rgb.astype('uint8'), mode='RGB')
dumps_dir = os.path.join(os.getcwd(), 'dumps')
os.makedirs(dumps_dir, exist_ok=True)
dbg_path = os.path.join(dumps_dir, 'VideoReceiverSFP_colorized_debug.png')
frame.save(dbg_path)
except Exception:
pass
return frame
except Exception:
pass
# Default: return an RGB-converted copy for display
return img.convert("RGB")
except Exception:
return img.convert("RGB")
else:
with open(path, "rb") as fh:
return fh.read()
except Exception:
return None
def _loop(self):
interval = 1.0 / max(1, self._fps)
while not self._stop_event.is_set():
frame = self._load_next_frame()
if frame is not None:
with self._frame_lock:
self._last_frame = frame
self._frames_received += 1
try:
if self._frames_received % 10 == 0:
logging.getLogger().info("VideoReceiverSFP: simulated frames received=%d", self._frames_received)
except Exception:
pass
for cb in list(self._callbacks):
try:
cb(frame)
except Exception:
pass
time.sleep(interval)
@staticmethod
def _looks_like_encoded_image(payload: bytes) -> bool:
if not payload:
return False
if payload.startswith(b"\x89PNG\r\n\x1a\n"):
return True
if payload.startswith(b"\xff\xd8\xff"):
return True
if payload.startswith(b"BM"):
return True
return False
def _ensure_mfd_lut(self):
if self._mfd_lut is not None:
return
# First try to construct the exact LUT used by ControlPanel (parity)
try:
from controlpanel.app_state import AppState
state = AppState()
params = state.mfd_params
raw_map_factor = params.get("raw_map_intensity", 128) / 255.0
pixel_map = params.get("pixel_to_category", {})
categories = params.get("categories", {})
new_lut = None
try:
import numpy as _np
new_lut = _np.zeros((256, 3), dtype=_np.uint8)
for idx in range(256):
cat_name = pixel_map.get(idx)
if cat_name:
cat_data = categories[cat_name]
bgr = cat_data["color"]
intensity_factor = cat_data["intensity"] / 255.0
new_lut[idx, 0] = _np.clip(int(round(float(bgr[0]) * intensity_factor)), 0, 255)
new_lut[idx, 1] = _np.clip(int(round(float(bgr[1]) * intensity_factor)), 0, 255)
new_lut[idx, 2] = _np.clip(int(round(float(bgr[2]) * intensity_factor)), 0, 255)
elif 32 <= idx <= 255:
raw_intensity = (float(idx) - 32.0) * (255.0 / 223.0)
final_gray = int(round(_np.clip(raw_intensity * raw_map_factor, 0, 255)))
new_lut[idx, :] = final_gray
except Exception:
new_lut = None
if new_lut is not None:
self._mfd_lut = new_lut
logging.getLogger().info("VideoReceiverSFP: built ControlPanel-compatible MFD LUT for colorization")
return
except Exception:
# fall through to other build methods
pass
# Fallback: use local build_mfd_lut if available
if build_mfd_lut is None:
return
try:
self._mfd_lut = build_mfd_lut()
logging.getLogger().info("VideoReceiverSFP: built fallback MFD LUT for colorization")
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to build MFD LUT")
def _extract_index_array(self, width: int, height: int, row_stride: int, pixel_data: bytes):
if np is None:
return None
total_bytes = row_stride * height
if len(pixel_data) < total_bytes:
logging.getLogger().warning(
"VideoReceiverSFP: pixel_data shorter than expected (got=%d need=%d)",
len(pixel_data),
total_bytes,
)
return None
try:
arr = np.frombuffer(pixel_data[:total_bytes], dtype=np.uint8)
row_pixels = width
if row_stride == row_pixels:
arr = arr.reshape((height, width))
else:
arr = arr.reshape((height, row_stride))[:, :width]
return arr.copy()
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to reshape MFD pixel data")
return None
def _build_grayscale_frame(self, width, height, row_stride, row_pixels, pixel_data):
if Image is None:
return None
try:
if row_stride == row_pixels:
img = Image.frombytes("L", (width, height), pixel_data)
else:
rows = []
for r in range(height):
start = r * row_stride
rows.append(pixel_data[start:start + row_pixels])
img = Image.frombytes("L", (width, height), b"".join(rows))
normalized_img = None
if self._normalize_method == 'cp' and normalize_image is not None:
try:
if np is not None:
arr = np.frombuffer(img.tobytes(), dtype=np.uint8).reshape((height, width))
norm = normalize_image(arr, target_type=np.uint8)
if norm is not None:
normalized_img = Image.fromarray(norm, mode='L')
except Exception:
logging.getLogger().exception("VideoReceiverSFP: controlpanel normalize_image failed")
elif self._normalize_method == 'autocontrast':
try:
from PIL import ImageOps
normalized_img = ImageOps.autocontrast(img)
except Exception:
normalized_img = None
if normalized_img is not None:
frame = normalized_img.convert("RGB")
try:
dumps_dir = os.path.join(os.getcwd(), "dumps")
os.makedirs(dumps_dir, exist_ok=True)
dbg_path = os.path.join(dumps_dir, "VideoReceiverSFP_last_normalized.png")
frame.save(dbg_path)
logging.getLogger().info("VideoReceiverSFP: saved normalized debug frame to %s", dbg_path)
except Exception:
pass
return frame
return img.convert("RGB")
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to build grayscale frame")
return None
def _decode_leader_payload(self, leader, payload: bytes):
if ImageLeaderData is None:
return None
header = leader.HEADER_DATA
image_type_id = int(header.TYPE)
if self._image_type_filter is not None and image_type_id != self._image_type_filter:
logging.getLogger().debug(
"VideoReceiverSFP: skipping image_type=%s due to filter",
get_image_type_name(image_type_id),
)
return None
width = int(header.DX)
height = int(header.DY)
stride = int(header.STRIDE)
bpp = int(header.BPP)
comp = int(header.COMP)
pal_type = int(getattr(header, "PALTYPE", 0))
# `header.BPP` in the ControlPanel format is bytes-per-pixel (1 or 2)
# in most cases. Fall back to treating it as bits-per-pixel only if
# it looks larger than 2.
if bpp in (1, 2):
bytes_per_pixel = int(bpp)
else:
if bpp <= 8:
bytes_per_pixel = 1
elif bpp % 8 == 0:
bytes_per_pixel = bpp // 8
else:
bytes_per_pixel = max(1, (bpp + 7) // 8)
row_pixels = width * bytes_per_pixel
row_stride = stride if stride and stride > 0 else row_pixels
# Calculate pixel data offset using ImageLeaderData layout when available
if ImageLeaderData is not None:
try:
# Mirror ControlPanel's calculation to remain compatible with payload layout
pixel_offset = (
leader.HEADER_TAG.size()
+ leader.HEADER_DATA.size()
+ leader.GEO_TAG.size()
+ leader.GEO_DATA.size()
+ leader.RESERVED_TAG.size()
+ ImageLeaderData.get_reserved_data_size()
+ leader.CM_TAG.size()
+ ImageLeaderData.get_colour_map_size()
+ leader.PIXEL_TAG.size()
)
except Exception:
pixel_offset = ImageLeaderData.size()
else:
pixel_offset = 0
# Interpret header.STRIDE as number of pixels per row. Compute stride in bytes.
row_stride_pixels = stride if stride and stride > 0 else width
row_stride_bytes = row_stride_pixels * bytes_per_pixel
total_bytes = row_stride_bytes * height
# We'll reference the original payload buffer and use numpy views with
# proper `offset` and `strides` to match ControlPanel's reconstruction.
pixel_data_offset = pixel_offset
have_bytes = len(payload) - pixel_data_offset
if have_bytes < total_bytes:
logging.getLogger().warning(
"VideoReceiverSFP: truncated pixel payload (have=%d expected=%d)",
have_bytes,
total_bytes,
)
return None
if image_type_id == IMAGE_TYPE_MFD and pal_type == 1 and bytes_per_pixel == 1:
# Extract raw indices using the same approach as ControlPanel:
# create a NumPy view directly on the payload buffer with the
# correct offset and strides (bytes).
try:
# Use the original payload (bytes/bytearray) as the buffer
mview = None
if np is not None:
mview = np.ndarray(
shape=(height, width),
dtype=np.uint8,
buffer=payload,
offset=pixel_data_offset,
strides=(row_stride_pixels * 1, 1),
)
if mview is None:
return None
indices = mview.copy()
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to construct MFD index view")
return None
# Attempt to produce a colorized RGB image using the MFD LUT,
# then enhance visibility (scale non-zero pixels and autocontrast)
rgb = None
try:
self._ensure_mfd_lut()
if self._mfd_lut is not None and apply_mfd_lut is not None:
rgb = apply_mfd_lut(indices, self._mfd_lut)
except Exception:
rgb = None
if rgb is not None:
try:
import numpy as _np
from PIL import ImageOps
rgb_arr = _np.asarray(rgb, dtype=_np.uint8)
mask = _np.any(rgb_arr != 0, axis=2)
pct_nonzero = float(_np.count_nonzero(mask)) / float(mask.size) if mask.size else 0.0
max_val = int(rgb_arr.max()) if rgb_arr.size else 0
if pct_nonzero < 0.10 or max_val < 220:
if _np.count_nonzero(mask) > 0 and max_val > 0:
scale = 255.0 / float(max_val)
rgb_arr = _np.clip(rgb_arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8)
try:
pil_tmp = Image.fromarray(rgb_arr, mode='RGB')
pil_tmp = ImageOps.autocontrast(pil_tmp)
rgb_arr = _np.array(pil_tmp)
except Exception:
pass
if Image is not None:
return Image.fromarray(rgb_arr, mode='RGB')
return rgb_arr
except Exception:
if Image is not None:
return Image.fromarray(rgb, mode='RGB')
return rgb
# Fallback: try again to map indices -> RGB, or return grayscale RGB
try:
self._ensure_mfd_lut()
if self._mfd_lut is not None and apply_mfd_lut is not None:
rgb2 = apply_mfd_lut(indices, self._mfd_lut)
if rgb2 is not None:
try:
import numpy as _np
from PIL import ImageOps
rgb_arr = _np.asarray(rgb2, dtype=_np.uint8)
max_val = int(rgb_arr.max()) if rgb_arr.size else 0
if _np.count_nonzero(_np.any(rgb_arr != 0, axis=2)) > 0 and max_val > 0:
scale = 255.0 / float(max_val)
rgb_arr = _np.clip(rgb_arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8)
try:
pil_tmp = Image.fromarray(rgb_arr, mode='RGB')
pil_tmp = ImageOps.autocontrast(pil_tmp)
rgb_arr = _np.array(pil_tmp)
except Exception:
pass
return Image.fromarray(rgb_arr, mode='RGB')
except Exception:
pass
except Exception:
pass
if Image is not None:
return Image.fromarray(indices, mode='L').convert("RGB")
else:
return indices
# Handle uncompressed grayscale frames. Support both 8-bit and 16-bit SAR images.
if comp == 0:
if bytes_per_pixel == 1:
# For 8-bit grayscale, build frame using bytes-per-row stride
frame = self._build_grayscale_frame(width, height, row_stride_bytes, row_pixels, payload[pixel_data_offset:pixel_data_offset + total_bytes])
if frame is not None:
logging.getLogger().info(
"VideoReceiverSFP: decoded %s frame %dx%d bytes_per_pixel=%d stride=%d",
get_image_type_name(image_type_id),
width,
height,
bytes_per_pixel,
row_stride_bytes,
)
return frame
# 16-bit per pixel handling (common for SAR images)
if bytes_per_pixel == 2:
try:
import numpy as _np
# Create a NumPy view on the original payload with proper offset
# and strides (bytes-per-row, bytes-per-pixel) to exactly mirror
# ControlPanel's reconstruction.
if _np is None:
return None
raw_view = _np.ndarray(
shape=(height, width),
dtype=_np.uint16,
buffer=payload,
offset=pixel_data_offset,
strides=(row_stride_bytes, 2),
)
arr = raw_view.copy()
# Normalize to uint8 for display using local normalize_image if available
norm = None
try:
if normalize_image is not None:
norm = normalize_image(arr, target_type=_np.uint8)
except Exception:
logging.getLogger().exception("VideoReceiverSFP: normalization of uint16 SAR failed")
if norm is None:
try:
norm = (_np.right_shift(arr, 8)).astype(_np.uint8)
except Exception:
logging.getLogger().exception("VideoReceiverSFP: fallback uint16->uint8 conversion failed")
return None
try:
from PIL import Image as _PILImage
pil_img = _PILImage.fromarray(norm, mode='L').convert('RGB')
logging.getLogger().info(
"VideoReceiverSFP: decoded %s frame %dx%d bytes_per_pixel=%d stride=%d (uint16->uint8)",
get_image_type_name(image_type_id),
width,
height,
bytes_per_pixel,
row_stride_bytes,
)
return pil_img
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to build PIL image from normalized SAR data")
return norm
except Exception:
logging.getLogger().exception("VideoReceiverSFP: error decoding uint16 SAR")
return None
logging.getLogger().debug(
"VideoReceiverSFP: unsupported comp=%s bpp=%s bytes_per_pixel=%s for type=%s",
comp,
bpp,
bytes_per_pixel,
get_image_type_name(image_type_id),
)
return None
def _network_payload_handler(self, completed_payload: bytearray):
return self._network_payload_handler_with_flow(completed_payload, forced_flow=None)
def _network_payload_handler_mfd(self, completed_payload: bytearray):
return self._network_payload_handler_with_flow(completed_payload, forced_flow=ord('M'))
def _network_payload_handler_sar(self, completed_payload: bytearray):
return self._network_payload_handler_with_flow(completed_payload, forced_flow=ord('S'))
def _network_payload_handler_with_flow(self, completed_payload: bytearray, forced_flow: Optional[int] = None):
payload = bytes(completed_payload)
frame = None
logging.getLogger().debug("VideoReceiverSFP: network payload received size=%d bytes", len(payload))
leader = None
if ImageLeaderData is not None and not self._looks_like_encoded_image(payload):
leader_size = ImageLeaderData.size()
if len(payload) >= leader_size:
try:
leader = ImageLeaderData.from_buffer_copy(payload[:leader_size])
except Exception:
leader = None
# If we parsed a leader, save a debug dump of header fields and raw payload
if leader is not None:
import time, os, pathlib
dbgdir = os.path.join(os.getcwd(), 'dumps')
os.makedirs(dbgdir, exist_ok=True)
ts = time.strftime('%Y%m%d_%H%M%S')
# Decide whether to save this payload based on configured flags
try:
from .config import SAVE_RAW_MFD, SAVE_RAW_SAR, SAVE_RAW_UNKNOWN
except Exception:
SAVE_RAW_MFD = False
SAVE_RAW_SAR = False
SAVE_RAW_UNKNOWN = False
# Inspect leader to choose flag
try:
hd = leader.HEADER_DATA
ity = int(getattr(hd, 'TYPE', 0))
except Exception:
hd = None
ity = 0
# If transport indicated flow (M or S), prefer that as authoritative
if forced_flow is not None:
if forced_flow == ord('S'):
try:
if hd is not None:
hd.TYPE = int(IMAGE_TYPE_SAR)
ity = IMAGE_TYPE_SAR
except Exception:
pass
elif forced_flow == ord('M'):
try:
if hd is not None:
hd.TYPE = int(IMAGE_TYPE_MFD)
ity = IMAGE_TYPE_MFD
except Exception:
pass
save_this_bin = (
(ity == IMAGE_TYPE_MFD and SAVE_RAW_MFD)
or (ity == IMAGE_TYPE_SAR and SAVE_RAW_SAR)
or (ity == 0 and SAVE_RAW_UNKNOWN)
)
binpath = None
if save_this_bin or getattr(self, '_save_bin', False):
try:
if getattr(self, '_dump_manager', None) is not None:
binpath = self._dump_manager.save_bin(payload, category=('mfd' if ity==IMAGE_TYPE_MFD else ('sar' if ity==IMAGE_TYPE_SAR else 'unknown')))
else:
binpath = os.path.join(dbgdir, f'VideoReceiverSFP_payload_{ts}.bin')
with open(binpath, 'wb') as fh:
fh.write(payload)
self._saved_bins.append(binpath)
while len(self._saved_bins) > self._dump_keep:
old = self._saved_bins.popleft()
try:
pathlib.Path(old).unlink()
except Exception:
pass
except Exception:
logging.getLogger().exception('VideoReceiverSFP: error saving payload bin')
# Log leader fields
try:
logging.getLogger().debug(
'VideoReceiverSFP: Parsed ImageLeader TYPE=%s DX=%s DY=%s BPP=%s STRIDE=%s PALTYPE=%s COMP=%s FCOUNTER=%s saved=%s',
int(hd.TYPE) if hd is not None else -1,
int(hd.DX) if hd is not None else -1,
int(hd.DY) if hd is not None else -1,
int(hd.BPP) if hd is not None else -1,
int(hd.STRIDE) if hd is not None else -1,
int(getattr(hd,'PALTYPE',0)) if hd is not None else 0,
int(hd.COMP) if hd is not None else -1,
int(hd.FCOUNTER) if hd is not None else -1,
binpath,
)
except Exception:
pass
# Build formatted metadata string (if controlpanel helper available)
try:
metadata_str = None
try:
from controlpanel.utils.utils import format_ctypes_structure
metadata_str = format_ctypes_structure(leader)
except Exception:
metadata_str = None
# Only store/notify SAR metadata when leader indicates a SAR image.
try:
ity_local = int(getattr(hd, 'TYPE', 0)) if hd is not None else 0
except Exception:
ity_local = 0
if ity_local == IMAGE_TYPE_SAR:
self._last_sar_metadata_str = metadata_str
try:
self._notify_sar_metadata(metadata_str)
except Exception:
pass
except Exception:
self._last_sar_metadata_str = None
# Optionally write a preview (PNG for MFD, BMP for SAR/UNKNOWN)
try:
from .config import SAVE_BMP_MFD, SAVE_BMP_SAR, SAVE_BMP_UNKNOWN
except Exception:
SAVE_BMP_MFD = SAVE_BMP_SAR = SAVE_BMP_UNKNOWN = False
try:
ity = int(getattr(hd, 'TYPE', 0)) if hd is not None else 0
except Exception:
ity = 0
want_preview = (
(ity == IMAGE_TYPE_MFD and SAVE_BMP_MFD)
or (ity == IMAGE_TYPE_SAR and SAVE_BMP_SAR)
or (ity == 0 and SAVE_BMP_UNKNOWN)
)
if want_preview:
try:
# Decode the payload into the exact frame that will be displayed
preview_frame = None
try:
preview_frame = self._decode_leader_payload(leader, payload)
except Exception:
preview_frame = None
# If we obtained a PIL Image, save that image so the saved BMP
# matches the one shown in the viewer (which may apply autocontrast
# or other conversions).
if preview_frame is not None and Image is not None and hasattr(preview_frame, 'save'):
base_name = 'mfd' if ity == IMAGE_TYPE_MFD else ('sar' if ity == IMAGE_TYPE_SAR else 'unknown')
fmt = 'bmp'
if getattr(self, '_dump_manager', None) is not None:
saved = self._dump_manager.save_preview_from_pil(preview_frame, category=base_name, fmt=fmt)
if saved:
logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', saved)
else:
try:
ts_name = ts + (f"_{int(time.time() * 1000) % 1000:03d}")
dumps_dir = dbgdir
outpath = os.path.join(dumps_dir, f'VideoReceiverSFP_{base_name}_{ts_name}.bmp')
preview_frame.save(outpath)
logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', outpath)
except Exception:
logging.getLogger().exception('VideoReceiverSFP: failed saving payload preview (fallback)')
else:
# Fallback: if decoding didn't return a PIL image, try earlier array-based approach
try:
dx = int(hd.DX)
dy = int(hd.DY)
bpp = int(hd.BPP)
stride_pixels = int(hd.STRIDE) if int(hd.STRIDE) > 0 else dx
bytes_per_pixel = 1 if bpp <= 8 else (bpp // 8)
try:
pixel_offset = (
leader.HEADER_TAG.size()
+ leader.HEADER_DATA.size()
+ leader.GEO_TAG.size()
+ leader.GEO_DATA.size()
+ leader.RESERVED_TAG.size()
+ ImageLeaderData.get_reserved_data_size()
+ leader.CM_TAG.size()
+ ImageLeaderData.get_colour_map_size()
+ leader.PIXEL_TAG.size()
)
except Exception:
pixel_offset = ImageLeaderData.size()
row_stride_bytes = stride_pixels * bytes_per_pixel
total_bytes = row_stride_bytes * dy
pixel_data = payload[pixel_offset:pixel_offset + total_bytes]
if len(pixel_data) >= total_bytes and np is not None:
if bytes_per_pixel == 2:
arr = np.frombuffer(pixel_data[:total_bytes], dtype=np.uint16)
arr = arr.reshape((dy, stride_pixels))[:, :dx]
try:
norm = normalize_image(arr, target_type=np.uint8) if normalize_image is not None else None
except Exception:
norm = None
if norm is None:
norm = (arr >> 8).astype(np.uint8)
else:
arr = np.frombuffer(pixel_data[:total_bytes], dtype=np.uint8)
arr = arr.reshape((dy, stride_pixels))[:, :dx]
try:
norm = normalize_image(arr, target_type=np.uint8) if normalize_image is not None else None
except Exception:
norm = None
if norm is None:
norm = arr.astype(np.uint8)
try:
base_name = 'mfd' if ity == IMAGE_TYPE_MFD else ('sar' if ity == IMAGE_TYPE_SAR else 'unknown')
fmt = 'bmp'
if getattr(self, '_dump_manager', None) is not None:
saved = self._dump_manager.save_preview_from_array(norm, category=base_name, fmt=fmt)
if saved:
logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', saved)
else:
from PIL import Image as _PILImage
ts_name = ts + (f"_{int(time.time() * 1000) % 1000:03d}")
dumps_dir = dbgdir
outpath = os.path.join(dumps_dir, f'VideoReceiverSFP_{base_name}_{ts_name}.bmp')
pil_img = _PILImage.fromarray(norm, mode='L')
pil_img.save(outpath)
logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', outpath)
except Exception:
logging.getLogger().exception('VideoReceiverSFP: failed saving payload preview (array fallback)')
except Exception:
logging.getLogger().exception('VideoReceiverSFP: error creating payload preview (array fallback)')
except Exception:
logging.getLogger().exception('VideoReceiverSFP: error creating payload preview')
if leader is not None:
# Heuristic and diagnostics: some senders set HEADER_DATA.TYPE==0 or
# provide inconsistent DY/STRIDE. Try to infer SAR images from the
# payload size and stride; add extra logging to help debug cases
# where frames are classified as Unknown (0).
try:
hd = leader.HEADER_DATA
hdr_type = int(getattr(hd, 'TYPE', 0))
width = int(getattr(hd, 'DX', 0))
stride = int(getattr(hd, 'STRIDE', 0))
bpp = int(getattr(hd, 'BPP', 8))
if bpp <= 8:
bytes_per_pixel = 1
elif bpp % 8 == 0:
bytes_per_pixel = bpp // 8
else:
bytes_per_pixel = max(1, (bpp + 7) // 8)
row_pixels = width * bytes_per_pixel
row_stride = stride if stride and stride > 0 else row_pixels
try:
pixel_offset = (
leader.HEADER_TAG.size()
+ leader.HEADER_DATA.size()
+ leader.GEO_TAG.size()
+ leader.GEO_DATA.size()
+ leader.RESERVED_TAG.size()
+ ImageLeaderData.get_reserved_data_size()
+ leader.CM_TAG.size()
+ ImageLeaderData.get_colour_map_size()
+ leader.PIXEL_TAG.size()
)
except Exception:
pixel_offset = ImageLeaderData.size()
total_bytes = len(payload) - pixel_offset
# If TYPE is unknown, attempt to infer SAR by payload geometry
if hdr_type == 0 and total_bytes > 0 and row_stride > 0 and (total_bytes % row_stride) == 0:
inferred_h = total_bytes // row_stride
# If we see a common SAR width and inferred height in plausible range,
# assume SAR (TYPE=2). This handles senders that omit TYPE.
if width in (2048,) and 1800 <= inferred_h <= 2100:
try:
hd.DY = int(inferred_h)
hd.TYPE = int(2)
logging.getLogger().warning(
"VideoReceiverSFP: inferred SAR for payload - set TYPE=2 DX=%s DY=%s stride=%s",
width,
inferred_h,
row_stride,
)
except Exception:
pass
# If still unknown, emit a detailed debug message to aid investigation
if int(getattr(hd, 'TYPE', 0)) == 0:
try:
logging.getLogger().warning(
"VideoReceiverSFP: leader TYPE==0 parsed DX=%s DY=%s BPP=%s STRIDE=%s payload_bytes=%d",
width,
int(getattr(hd, 'DY', -1)),
bpp,
row_stride,
len(payload),
)
# save small header+payload sample if debug bin saving is enabled
if getattr(self, '_save_bin', False):
try:
if getattr(self, '_dump_manager', None) is not None:
path = self._dump_manager.save_bin(payload, category='unknown')
if path:
logging.getLogger().info('VideoReceiverSFP: saved unknown-type payload to %s', path)
else:
import time, os, pathlib
dbgdir = os.path.join(os.getcwd(), 'dumps')
os.makedirs(dbgdir, exist_ok=True)
ts = time.strftime('%Y%m%d_%H%M%S')
try:
path = os.path.join(dbgdir, f'VideoReceiverSFP_unknown_TYPE_{ts}.bin')
with open(path, 'wb') as fh:
fh.write(payload)
self._saved_bins.append(path)
while len(self._saved_bins) > self._dump_keep:
old = self._saved_bins.popleft()
try:
pathlib.Path(old).unlink()
except Exception:
pass
logging.getLogger().info('VideoReceiverSFP: saved unknown-type payload to %s', path)
except Exception:
logging.getLogger().exception('VideoReceiverSFP: failed to save unknown-type payload')
except Exception:
logging.getLogger().exception('VideoReceiverSFP: failed to save unknown-type payload')
except Exception:
logging.getLogger().exception('VideoReceiverSFP: diagnostic logging for TYPE==0 failed')
except Exception:
logging.getLogger().exception("VideoReceiverSFP: error while attempting to infer image type from leader")
frame = self._decode_leader_payload(leader, payload)
if frame is None and Image is not None:
try:
img = Image.open(io.BytesIO(payload))
frame = img.convert("RGB")
logging.getLogger().debug("VideoReceiverSFP: payload decoded by PIL as standalone image")
except Exception:
frame = None
if frame is None:
frame = payload
# Save the first decoded frame to disk so user can verify output
if (not getattr(self, "_first_frame_saved", False)) and Image is not None and hasattr(frame, "save"):
try:
dumps_dir = os.path.join(os.getcwd(), "dumps")
os.makedirs(dumps_dir, exist_ok=True)
path = os.path.join(dumps_dir, "VideoReceiverSFP_first_frame.png")
frame.save(path)
self._first_frame_saved = True
self._first_frame_path = path
try:
logging.getLogger().info("VideoReceiverSFP: saved first decoded frame to %s", path)
except Exception:
pass
except Exception:
try:
logging.getLogger().exception("VideoReceiverSFP: failed to save first decoded frame")
except Exception:
pass
# Additionally save an enhanced preview image to help Windows thumbnail/preview
try:
from PIL import ImageOps
import numpy as _np
try:
arr = _np.array(frame.convert('RGB'))
nonzero = _np.count_nonzero(arr)
pct_nonzero = float(nonzero) / arr.size if arr.size > 0 else 0.0
except Exception:
arr = None
pct_nonzero = 0.0
enhanced = None
# If image is very sparse (few non-zero pixels) scale non-zero values to full range
if arr is not None and pct_nonzero < 0.05:
try:
mask = _np.any(arr != 0, axis=2)
if _np.any(mask):
maxv = int(arr[mask].max())
if maxv > 0:
scale = 255.0 / float(maxv)
arr2 = _np.clip(arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8)
enhanced = Image.fromarray(arr2, mode='RGB')
except Exception:
enhanced = None
# Fallback to PIL autocontrast if not produced above
if enhanced is None:
try:
enhanced = ImageOps.autocontrast(frame)
except Exception:
enhanced = None
if enhanced is not None:
try:
preview_path = os.path.join(dumps_dir, "VideoReceiverSFP_first_frame_preview.png")
enhanced.save(preview_path)
logging.getLogger().info("VideoReceiverSFP: saved enhanced preview to %s", preview_path)
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to save enhanced preview")
except Exception:
# Do not break the main flow if preview creation fails
pass
with self._frame_lock:
self._last_frame = frame
self._frames_received += 1
try:
if self._frames_received % 5 == 0:
logging.getLogger().debug("VideoReceiverSFP: total frames_received=%d", self._frames_received)
except Exception:
pass
# Determine image type (if leader available) to route to specialized callbacks
image_type_id = None
try:
if leader is not None:
image_type_id = int(leader.HEADER_DATA.TYPE)
except Exception:
image_type_id = None
# If this is a SAR image, apply SAR-specific transforms and notify SAR callbacks
if image_type_id == IMAGE_TYPE_SAR:
try:
if Image is not None and hasattr(frame, 'copy'):
sar_img = frame.copy()
# Apply autocontrast first if requested
if getattr(self, '_sar_autocontrast', False):
try:
from PIL import ImageOps
sar_img = ImageOps.autocontrast(sar_img)
except Exception:
pass
# Apply brightness/contrast
try:
from PIL import ImageEnhance
bf = max(0.0, 1.0 + (float(self._sar_brightness) / 100.0))
cf = max(0.0, 1.0 + (float(self._sar_contrast) / 100.0))
if bf != 1.0:
sar_img = ImageEnhance.Brightness(sar_img).enhance(bf)
if cf != 1.0:
sar_img = ImageEnhance.Contrast(sar_img).enhance(cf)
except Exception:
pass
# Optionally save SAR PNGs separately
if getattr(self, '_sar_save_png', False):
try:
# Use DumpManager when available
if getattr(self, '_dump_manager', None) is not None:
saved = self._dump_manager.save_preview_from_pil(sar_img, category='sar', fmt='png')
if saved:
logging.getLogger().info("VideoReceiverSFP: saved sar png %s", saved)
else:
ts = time.strftime('%Y%m%d_%H%M%S') + (f"_{int(time.time() * 1000) % 1000:03d}")
dumps_dir = getattr(self, '_dumps_dir', os.path.join(os.getcwd(), 'dumps'))
os.makedirs(dumps_dir, exist_ok=True)
pngpath = os.path.join(dumps_dir, f'VideoReceiverSFP_sar_{ts}.png')
sar_img.save(pngpath)
self._saved_sar_pngs.append(pngpath)
while len(self._saved_sar_pngs) > self._dump_keep:
old = self._saved_sar_pngs.popleft()
try:
pathlib.Path(old).unlink()
except Exception:
pass
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed saving sar png frame")
# Optionally record SAR video (start writer on first valid frame)
try:
if getattr(self, '_record_sar_video', False) and getattr(self, '_dump_manager', None) is not None:
if not getattr(self, '_sar_video_active', False):
try:
w, h = None, None
if Image is not None and hasattr(sar_img, 'size'):
w, h = sar_img.size
else:
try:
import numpy as _np
arr = _np.asarray(sar_img)
if arr is not None and arr.ndim >= 2:
h, w = arr.shape[0], arr.shape[1]
except Exception:
pass
if w and h:
started = self._dump_manager.start_video_record('sar', w, h, fps=getattr(self, '_video_fps', 20))
if started:
self._sar_video_active = True
try:
logging.getLogger().info("VideoReceiverSFP: started SAR video writer (%dx%d @ %s FPS)", w, h, getattr(self, '_video_fps', 20))
except Exception:
pass
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed to start sar video writer")
# write frame if active
if getattr(self, '_sar_video_active', False):
try:
self._dump_manager.write_video_frame(sar_img, 'sar')
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed writing sar frame to video")
except Exception:
pass
# Deliver to SAR callbacks
for scb in list(self._sar_callbacks):
try:
scb(sar_img)
except Exception:
pass
except Exception:
logging.getLogger().exception("VideoReceiverSFP: error processing SAR image for SAR callbacks")
# If this is an MFD image, notify MFD callbacks
if image_type_id == IMAGE_TYPE_MFD:
try:
# If requested, initialize/start MFD video writer on first frame
try:
if getattr(self, '_record_mfd_video', False) and getattr(self, '_dump_manager', None) is not None:
if not getattr(self, '_mfd_video_active', False):
try:
w, h = None, None
if Image is not None and hasattr(frame, 'size'):
w, h = frame.size
else:
try:
import numpy as _np
arr = _np.asarray(frame)
if arr is not None and arr.ndim >= 2:
h, w = arr.shape[0], arr.shape[1]
except Exception:
pass
if w and h:
started = self._dump_manager.start_video_record('mfd', w, h, fps=getattr(self, '_video_fps', 20))
if started:
self._mfd_video_active = True
try:
logging.getLogger().info("VideoReceiverSFP: started MFD video writer (%dx%d @ %s FPS)", w, h, getattr(self, '_video_fps', 20))
except Exception:
pass
except Exception:
logging.getLogger().exception('VideoReceiverSFP: failed to start mfd video writer')
# write frame if active
if getattr(self, '_mfd_video_active', False):
try:
self._dump_manager.write_video_frame(frame, 'mfd')
except Exception:
logging.getLogger().exception('VideoReceiverSFP: failed writing mfd frame to video')
except Exception:
pass
for mcb in list(self._mfd_callbacks):
try:
mcb(frame)
except Exception:
pass
except Exception:
pass
# Fallback: always notify generic callbacks (backwards compatibility)
for cb in list(self._callbacks):
try:
# If saving of PNGs enabled and frame is an Image, save timestamped copy
if getattr(self, '_save_png', False) and Image is not None and hasattr(frame, 'save'):
try:
# Use DumpManager when available
if getattr(self, '_dump_manager', None) is not None:
saved = self._dump_manager.save_preview_from_pil(frame, category='frame', fmt='png')
if saved:
logging.getLogger().info("VideoReceiverSFP: saved frame to %s", saved)
else:
ts = time.strftime('%Y%m%d_%H%M%S') + (f"_{int(time.time() * 1000) % 1000:03d}")
dumps_dir = getattr(self, '_dumps_dir', os.path.join(os.getcwd(), 'dumps'))
os.makedirs(dumps_dir, exist_ok=True)
pngpath = os.path.join(dumps_dir, f'VideoReceiverSFP_frame_{ts}.png')
frame.save(pngpath)
self._saved_pngs.append(pngpath)
while len(self._saved_pngs) > self._dump_keep:
old = self._saved_pngs.popleft()
try:
pathlib.Path(old).unlink()
except Exception:
pass
except Exception:
logging.getLogger().exception("VideoReceiverSFP: failed saving png frame")
cb(frame)
except Exception:
pass
# Methods to control saving flags from external UI
def set_save_png(self, enabled: bool) -> None:
self._save_png = bool(enabled)
logging.getLogger().info("VideoReceiverSFP: save_png set to %s", self._save_png)
def set_save_bin(self, enabled: bool) -> None:
self._save_bin = bool(enabled)
logging.getLogger().info("VideoReceiverSFP: save_bin set to %s", self._save_bin)
def set_record_mfd_video(self, enabled: bool, fps: Optional[int] = None) -> None:
try:
self._record_mfd_video = bool(enabled)
if fps is not None:
self._video_fps = int(fps)
logging.getLogger().info("VideoReceiverSFP: record_mfd_video set to %s (fps=%s)", self._record_mfd_video, self._video_fps)
# if disabling, ensure writer closed
if not self._record_mfd_video and getattr(self, '_dump_manager', None) is not None:
try:
self._dump_manager.stop_video_record('mfd')
except Exception:
pass
self._mfd_video_active = False
except Exception:
pass
def set_record_sar_video(self, enabled: bool, fps: Optional[int] = None) -> None:
try:
self._record_sar_video = bool(enabled)
if fps is not None:
self._video_fps = int(fps)
logging.getLogger().info("VideoReceiverSFP: record_sar_video set to %s (fps=%s)", self._record_sar_video, self._video_fps)
if not self._record_sar_video and getattr(self, '_dump_manager', None) is not None:
try:
self._dump_manager.stop_video_record('sar')
except Exception:
pass
self._sar_video_active = False
except Exception:
pass
# SAR-specific parameter setters
def set_sar_brightness(self, value: int) -> None:
try:
self._sar_brightness = int(value)
logging.getLogger().info("VideoReceiverSFP: sar_brightness set to %s", self._sar_brightness)
except Exception:
pass
def set_sar_contrast(self, value: int) -> None:
try:
self._sar_contrast = int(value)
logging.getLogger().info("VideoReceiverSFP: sar_contrast set to %s", self._sar_contrast)
except Exception:
pass
def set_sar_autocontrast(self, enabled: bool) -> None:
try:
self._sar_autocontrast = bool(enabled)
logging.getLogger().info("VideoReceiverSFP: sar_autocontrast set to %s", self._sar_autocontrast)
except Exception:
pass
def set_sar_save_png(self, enabled: bool) -> None:
try:
self._sar_save_png = bool(enabled)
logging.getLogger().info("VideoReceiverSFP: sar_save_png set to %s", self._sar_save_png)
except Exception:
pass