141 lines
6.4 KiB
Python
141 lines
6.4 KiB
Python
"""Helpers to save payload preview images (PIL or array-based) for VideoReceiverSFP.
|
|
This centralizes preview-saving logic so `sfp_module` can be slimmer.
|
|
"""
|
|
import logging
import os
import time

try:
    import numpy as np
except Exception:
    np = None

try:
    from PIL import Image
except Exception:
    Image = None
|
|
|
|
logger = logging.getLogger("VideoReceiverSFP.payload_preview")
|
|
|
|
|
|
def _preview_category(ity):
    """Map the integer image type to the category used in dump names."""
    if ity == 1:
        return 'mfd'
    if ity == 2:
        return 'sar'
    return 'unknown'


def _pixel_data_offset(leader):
    """Return the byte offset of the pixel data inside the payload.

    The offset is the sum of the sizes of every leader section that precedes
    the pixels. If the leader does not expose those sizes, fall back to
    ``ImageLeaderData.size()`` and finally to 0.
    """
    try:
        return (
            leader.HEADER_TAG.size()
            + leader.HEADER_DATA.size()
            + leader.GEO_TAG.size()
            + leader.GEO_DATA.size()
            + leader.RESERVED_TAG.size()
            + getattr(leader, 'get_reserved_data_size', lambda: 0)()
            + leader.CM_TAG.size()
            + getattr(leader, 'get_colour_map_size', lambda: 0)()
            + leader.PIXEL_TAG.size()
        )
    except Exception:
        try:
            # as last resort, use ImageLeaderData.size() if available on module
            from .sfp_module import ImageLeaderData as _ILD
            return _ILD.size()
        except Exception:
            return 0


def _payload_to_gray_array(module, leader, payload):
    """Build a 2-D uint8 preview array from the raw payload, or return None.

    None is returned (without logging an error) when there is nothing usable:
    numpy is unavailable, the header declares an empty image, or the payload
    is shorter than the header claims. Unsupported pixel depths are reported
    with a warning and also yield None.
    """
    hd = leader.HEADER_DATA
    dx = int(hd.DX)
    dy = int(hd.DY)
    bpp = int(hd.BPP)
    stride = int(hd.STRIDE)
    stride_pixels = stride if stride > 0 else dx
    bytes_per_pixel = 1 if bpp <= 8 else (bpp // 8)

    # An empty image cannot be previewed; bail out before reshape/fromarray
    # turn it into an error traceback.
    if dx <= 0 or dy <= 0:
        return None
    if np is None:
        return None
    if bytes_per_pixel not in (1, 2):
        # Only 8-bit and 16-bit samples can be turned into a mode 'L' image.
        logger.warning(
            'save_payload_preview: unsupported pixel depth %d bpp for array preview', bpp
        )
        return None

    pixel_offset = _pixel_data_offset(leader)
    total_bytes = stride_pixels * bytes_per_pixel * dy
    pixel_data = payload[pixel_offset:pixel_offset + total_bytes]
    if len(pixel_data) < total_bytes:
        return None

    dtype = np.uint16 if bytes_per_pixel == 2 else np.uint8
    # Rows are `stride_pixels` wide in memory; only the first `dx` are visible.
    arr = np.frombuffer(pixel_data, dtype=dtype).reshape((dy, stride_pixels))[:, :dx]

    norm = None
    normalize = getattr(module, '_normalize_and_convert', None)
    if normalize is not None:
        try:
            norm = normalize(arr)
        except Exception:
            logger.debug('save_payload_preview: _normalize_and_convert failed', exc_info=True)
            norm = None
    if norm is None:
        # Plain conversion: keep the high byte of 16-bit samples.
        norm = (arr >> 8).astype(np.uint8) if bytes_per_pixel == 2 else arr.astype(np.uint8)
    return norm


def _save_pil_preview(module, pil_img, category, dbgdir, ts, *, manager_error, file_error):
    """Save ``pil_img`` via the module's DumpManager, else directly into ``dbgdir``.

    The direct write is attempted whenever the DumpManager is missing, raises,
    or returns a falsy result. Returns True when a preview was saved.
    """
    dump_manager = getattr(module, '_dump_manager', None)
    if dump_manager is not None:
        try:
            saved = dump_manager.save_preview_from_pil(pil_img, category=category, fmt='bmp')
            if saved:
                logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', saved)
                return True
        except Exception:
            logger.exception(manager_error)

    try:
        # Millisecond suffix keeps names distinct within the same timestamp.
        ts_name = f"{ts}_{int(time.time() * 1000) % 1000:03d}"
        outpath = os.path.join(dbgdir, f'VideoReceiverSFP_{category}_{ts_name}.bmp')
        pil_img.save(outpath)
        logging.getLogger().info('VideoReceiverSFP: saved payload preview %s', outpath)
        return True
    except Exception:
        logger.exception(file_error)
    return False


def save_payload_preview(module, leader, payload, ity, dbgdir, ts):
    """Save a preview image for the given payload using module's helpers.

    The payload is first decoded with ``module._decode_leader_payload``; if
    that yields an object with a ``save`` method (and PIL is importable) it is
    saved as a BMP. Otherwise the pixel bytes are located from the leader
    header and converted to an 8-bit grayscale image with numpy. In both
    cases the image goes through ``module._dump_manager`` when present and is
    written directly into ``dbgdir`` if that is missing or fails.

    Args:
        module: the SfpConnectorModule instance (used for
            ``_decode_leader_payload``, and optionally ``_dump_manager`` and
            ``_normalize_and_convert``)
        leader: the parsed ImageLeaderData structure
        payload: raw payload bytes
        ity: integer image type (1 -> 'mfd', 2 -> 'sar', anything else ->
            'unknown')
        dbgdir: dumps directory path
        ts: timestamp string used in the fallback file name

    Returns:
        None. This is a best-effort helper: exceptions are logged and not
        propagated to the caller.
    """
    try:
        category = _preview_category(ity)

        try:
            preview_frame = module._decode_leader_payload(leader, payload)
        except Exception:
            logger.debug('save_payload_preview: _decode_leader_payload failed', exc_info=True)
            preview_frame = None

        # If we obtained a PIL Image, save via DumpManager or fallback
        if preview_frame is not None and Image is not None and hasattr(preview_frame, 'save'):
            if _save_pil_preview(
                module, preview_frame, category, dbgdir, ts,
                manager_error='save_payload_preview: DumpManager save_preview_from_pil failed',
                file_error='save_payload_preview: failed saving preview (fallback)',
            ):
                return

        # Fallback: array-based approach
        try:
            norm = _payload_to_gray_array(module, leader, payload)
        except Exception:
            logger.exception('save_payload_preview: error creating payload preview (array fallback)')
            return
        if norm is None:
            return

        try:
            # Imported here so a missing PIL only disables this path.
            from PIL import Image as _PILImage
            pil_img = _PILImage.fromarray(norm, mode='L')
        except Exception:
            logger.exception('save_payload_preview: failed saving payload preview (array fallback)')
            return

        _save_pil_preview(
            module, pil_img, category, dbgdir, ts,
            manager_error='save_payload_preview: DumpManager save_preview_from_pil failed for array input',
            file_error='save_payload_preview: failed saving payload preview (array fallback)',
        )
    except Exception:
        logger.exception('save_payload_preview: unexpected error')
|