SXXXXXXX_ControlPanel/VideoReceiverSFP/core/payload_preview.py
2026-01-16 10:09:51 +01:00

135 lines
5.9 KiB
Python

"""Helpers to save payload preview images (PIL or array-based) for VideoReceiverSFP.
This centralizes preview-saving logic so `sfp_module` can be slimmer.
"""
import os
import logging
try:
import numpy as np
except Exception:
np = None
try:
from PIL import Image
except Exception:
Image = None
logger = logging.getLogger("VideoReceiverSFP.payload_preview")
def save_payload_preview(module, leader, payload, ity, dbgdir, ts):
"""Save a preview image for the given payload using module's helpers.
Args:
module: the SfpConnectorModule instance (used for _decode_leader_payload and DumpManager)
leader: the parsed ImageLeaderData structure
payload: raw payload bytes
ity: integer image type
dbgdir: dumps directory path
ts: timestamp string
"""
try:
preview_frame = None
try:
preview_frame = module._decode_leader_payload(leader, payload)
except Exception:
preview_frame = None
# If we obtained a PIL Image, save via DumpManager or fallback
if preview_frame is not None and Image is not None and hasattr(preview_frame, 'save'):
base_name = 'mfd' if ity == 1 else ('sar' if ity == 2 else 'unknown')
fmt = 'bmp'
if getattr(module, '_dump_manager', None) is not None:
try:
saved = module._dump_manager.save_preview_from_pil(preview_frame, category=base_name, fmt=fmt)
if saved:
logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', saved)
return
except Exception:
logger.exception('save_payload_preview: DumpManager save_preview_from_pil failed')
try:
ts_name = ts + (f"_{int(__import__('time').time() * 1000) % 1000:03d}")
outpath = os.path.join(dbgdir, f'VideoReceiverSFP_{base_name}_{ts_name}.bmp')
preview_frame.save(outpath)
logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', outpath)
return
except Exception:
logger.exception('save_payload_preview: failed saving preview (fallback)')
# Fallback: array-based approach
try:
hd = leader.HEADER_DATA
dx = int(hd.DX)
dy = int(hd.DY)
bpp = int(hd.BPP)
stride_pixels = int(hd.STRIDE) if int(hd.STRIDE) > 0 else dx
bytes_per_pixel = 1 if bpp <= 8 else (bpp // 8)
try:
pixel_offset = (
leader.HEADER_TAG.size()
+ leader.HEADER_DATA.size()
+ leader.GEO_TAG.size()
+ leader.GEO_DATA.size()
+ leader.RESERVED_TAG.size()
+ getattr(leader, 'get_reserved_data_size', lambda: 0)()
+ leader.CM_TAG.size()
+ getattr(leader, 'get_colour_map_size', lambda: 0)()
+ leader.PIXEL_TAG.size()
)
except Exception:
try:
# as last resort, use ImageLeaderData.size() if available on module
from .sfp_module import ImageLeaderData as _ILD
pixel_offset = _ILD.size()
except Exception:
pixel_offset = 0
row_stride_bytes = stride_pixels * bytes_per_pixel
total_bytes = row_stride_bytes * dy
pixel_data = payload[pixel_offset:pixel_offset + total_bytes]
if len(pixel_data) >= total_bytes and np is not None:
if bytes_per_pixel == 2:
arr = np.frombuffer(pixel_data[:total_bytes], dtype=np.uint16)
arr = arr.reshape((dy, stride_pixels))[:, :dx]
try:
norm = module._normalize_and_convert(arr) if hasattr(module, '_normalize_and_convert') else None
except Exception:
norm = None
if norm is None:
norm = (arr >> 8).astype(np.uint8)
else:
arr = np.frombuffer(pixel_data[:total_bytes], dtype=np.uint8)
arr = arr.reshape((dy, stride_pixels))[:, :dx]
try:
norm = module._normalize_and_convert(arr) if hasattr(module, '_normalize_and_convert') else None
except Exception:
norm = None
if norm is None:
norm = arr.astype(np.uint8)
try:
base_name = 'mfd' if ity == 1 else ('sar' if ity == 2 else 'unknown')
fmt = 'bmp'
if getattr(module, '_dump_manager', None) is not None:
saved = module._dump_manager.save_preview_from_array(norm, category=base_name, fmt=fmt)
if saved:
logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', saved)
return
else:
from PIL import Image as _PILImage
ts_name = ts + (f"_{int(__import__('time').time() * 1000) % 1000:03d}")
outpath = os.path.join(dbgdir, f'VideoReceiverSFP_{base_name}_{ts_name}.bmp')
pil_img = _PILImage.fromarray(norm, mode='L')
pil_img.save(outpath)
logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', outpath)
return
except Exception:
logger.exception('save_payload_preview: failed saving payload preview (array fallback)')
except Exception:
logger.exception('save_payload_preview: error creating payload preview (array fallback)')
except Exception:
logger.exception('save_payload_preview: unexpected error')