SXXXXXXX_ControlPanel/VideoReceiverSFP/core/payload_decoder.py
2026-01-16 10:09:51 +01:00

276 lines
10 KiB
Python

"""Payload decoder extracted from sfp_module._decode_leader_payload.
This module exposes `decode_leader_payload(module, leader, payload)` which
mirrors the original instance method but is callable as a helper to reduce
`SfpConnectorModule` size.
"""
from typing import Any
import logging
try:
import numpy as np
except Exception:
np = None
try:
from PIL import Image
except Exception:
Image = None
# local helpers likely available in the package
try:
from .mfd_palette import apply_mfd_lut
except Exception:
apply_mfd_lut = None
try:
from .image_processing import normalize_image
except Exception:
normalize_image = None
# ImageLeaderData may be provided by controlpanel or present in module
try:
from controlpanel.core.sfp_structures import ImageLeaderData as _ILD
except Exception:
_ILD = None
# Image type identifiers. decode_leader_payload() compares the leader's
# HEADER_DATA.TYPE field against these values to select the MFD indexed-palette
# path and to label decoded frames in log messages.
IMAGE_TYPE_MFD = 1
IMAGE_TYPE_SAR = 2
def _bytes_per_pixel(bpp: int) -> int:
    """Return the storage size of one pixel, in bytes, for a header BPP value.

    Values 1 and 2 are used directly as a byte count.  Any other value is
    treated as a bit depth: up to 8 bits fit in one byte, larger depths are
    rounded up to whole bytes.
    """
    if bpp in (1, 2):
        return bpp
    if bpp <= 8:
        return 1
    return (bpp + 7) // 8


def _pixel_data_offset(leader: Any) -> int:
    """Return the byte offset of the first pixel inside the payload.

    When ``ImageLeaderData`` is importable the offset is the sum of the sizes
    of every section that precedes the pixel data.  Otherwise the leader's own
    ``size()`` is used.  Any failure degrades to ``ImageLeaderData.size()``
    and finally to 0, so this helper never raises.
    """
    try:
        if _ILD is not None:
            return (
                leader.HEADER_TAG.size()
                + leader.HEADER_DATA.size()
                + leader.GEO_TAG.size()
                + leader.GEO_DATA.size()
                + leader.RESERVED_TAG.size()
                + _ILD.get_reserved_data_size()
                + leader.CM_TAG.size()
                + _ILD.get_colour_map_size()
                + leader.PIXEL_TAG.size()
            )
        return getattr(leader, 'size', lambda: 0)()
    except Exception:
        try:
            return _ILD.size() if _ILD is not None else 0
        except Exception:
            return 0


def _image_type_label(image_type_id: int) -> str:
    """Return the name used in log messages for an image type id."""
    if image_type_id == IMAGE_TYPE_MFD:
        return "MFD"
    if image_type_id == IMAGE_TYPE_SAR:
        return "SAR"
    return str(image_type_id)


def _lookup_mfd_rgb(module: Any, indices: Any) -> Any:
    """Map palette indices through the module's MFD LUT.

    Returns whatever ``apply_mfd_lut`` returns, or ``None`` when the LUT or
    the helper is unavailable or the lookup fails.
    """
    try:
        try:
            module._ensure_mfd_lut()
        except Exception:
            # Best effort: the module may not provide a LUT loader; the
            # check on ``_mfd_lut`` below decides whether we can proceed.
            pass
        lut = getattr(module, '_mfd_lut', None)
        if lut is not None and apply_mfd_lut is not None:
            return apply_mfd_lut(indices, lut)
    except Exception:
        logging.getLogger().debug(
            "VideoReceiverSFP: MFD LUT lookup failed", exc_info=True
        )
    return None


def _stretch_rgb(rgb: Any, only_if_dim: bool) -> Any:
    """Return ``rgb`` as a uint8 array, contrast-stretched when appropriate.

    The image is scaled so its brightest value becomes 255 and then passed
    through Pillow's ``autocontrast``.  With ``only_if_dim`` set, this is done
    only when fewer than 10% of the pixels are non-black or the peak value is
    below 220; otherwise the converted array is returned untouched.  Images
    with no non-black pixel are never modified.

    Raises whatever numpy / Pillow raise (including ``ImportError`` when
    either is missing); callers decide on the fallback.
    """
    import numpy as _np
    from PIL import Image as _PILImage, ImageOps as _ImageOps

    rgb_arr = _np.asarray(rgb, dtype=_np.uint8)
    lit = _np.any(rgb_arr != 0, axis=2)
    lit_count = int(_np.count_nonzero(lit))
    max_val = int(rgb_arr.max()) if rgb_arr.size else 0

    if only_if_dim:
        pct_nonzero = float(lit_count) / float(lit.size) if lit.size else 0.0
        if not (pct_nonzero < 0.10 or max_val < 220):
            return rgb_arr

    if lit_count > 0 and max_val > 0:
        scale = 255.0 / float(max_val)
        rgb_arr = _np.clip(rgb_arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8)
        try:
            pil_tmp = _PILImage.fromarray(rgb_arr, mode='RGB')
            pil_tmp = _ImageOps.autocontrast(pil_tmp)
            rgb_arr = _np.array(pil_tmp)
        except Exception:
            # Autocontrast is cosmetic; keep the linearly scaled image.
            pass
    return rgb_arr


def _decode_mfd_indexed(module: Any, payload: bytes, pixel_data_offset: int,
                        width: int, height: int, row_stride_pixels: int) -> Any:
    """Decode an 8-bit palette-indexed MFD frame.

    Tries, in order: LUT colouring with conditional contrast stretch, the raw
    LUT output, a second LUT lookup with unconditional stretch (only when the
    first lookup produced nothing), and finally the indices rendered as
    grayscale.  Returns a PIL image when Pillow is available, otherwise an
    ndarray; ``None`` when numpy is missing or the index view cannot be built.
    """
    log = logging.getLogger()
    if np is None:
        return None
    try:
        indices = np.ndarray(
            shape=(height, width),
            dtype=np.uint8,
            buffer=payload,
            offset=pixel_data_offset,
            strides=(row_stride_pixels, 1),
        ).copy()
    except Exception:
        log.exception("VideoReceiverSFP: failed to construct MFD index view")
        return None

    rgb = _lookup_mfd_rgb(module, indices)
    if rgb is not None:
        try:
            rgb_arr = _stretch_rgb(rgb, only_if_dim=True)
            if Image is not None:
                return Image.fromarray(rgb_arr, mode='RGB')
            return rgb_arr
        except Exception:
            log.debug(
                "VideoReceiverSFP: MFD contrast enhancement failed, using raw LUT output",
                exc_info=True,
            )
        # Guarded fallback: a failure here used to escape to the caller.
        try:
            if Image is not None:
                return Image.fromarray(rgb, mode='RGB')
            return rgb
        except Exception:
            log.exception("VideoReceiverSFP: failed to build image from MFD LUT output")
    else:
        # The first lookup yielded nothing; retry once (the LUT may have
        # become available) and stretch unconditionally.
        rgb_retry = _lookup_mfd_rgb(module, indices)
        if rgb_retry is not None and Image is not None:
            try:
                return Image.fromarray(_stretch_rgb(rgb_retry, only_if_dim=False), mode='RGB')
            except Exception:
                log.debug(
                    "VideoReceiverSFP: MFD LUT retry failed, falling back to grayscale",
                    exc_info=True,
                )

    # Last resort: show the raw palette indices as grayscale.
    if Image is not None:
        return Image.fromarray(indices, mode='L').convert('RGB')
    return indices


def _decode_gray16(payload: bytes, pixel_data_offset: int, width: int, height: int,
                   row_stride_bytes: int, bytes_per_pixel: int, image_type_id: int) -> Any:
    """Decode an uncompressed 16-bit frame and reduce it to 8 bits.

    Uses ``normalize_image`` when available and falls back to dropping the
    low byte.  Returns an RGB PIL image, the 8-bit ndarray when Pillow cannot
    build the image, or ``None`` on failure.
    """
    log = logging.getLogger()
    try:
        import numpy as _np

        arr = _np.ndarray(
            shape=(height, width),
            dtype=_np.uint16,
            buffer=payload,
            offset=pixel_data_offset,
            strides=(row_stride_bytes, 2),
        ).copy()

        norm = None
        try:
            if normalize_image is not None:
                norm = normalize_image(arr, target_type=_np.uint8)
        except Exception:
            log.exception("VideoReceiverSFP: normalization of uint16 SAR failed")
        if norm is None:
            try:
                norm = (_np.right_shift(arr, 8)).astype(_np.uint8)
            except Exception:
                log.exception("VideoReceiverSFP: fallback uint16->uint8 conversion failed")
                return None

        try:
            from PIL import Image as _PILImage

            pil_img = _PILImage.fromarray(norm, mode='L').convert('RGB')
            log.info(
                "VideoReceiverSFP: decoded %s frame %dx%d bytes_per_pixel=%d stride=%d (uint16->uint8)",
                _image_type_label(image_type_id),
                width,
                height,
                bytes_per_pixel,
                row_stride_bytes,
            )
            return pil_img
        except Exception:
            log.exception("VideoReceiverSFP: failed to build PIL image from normalized SAR data")
            return norm
    except Exception:
        log.exception("VideoReceiverSFP: error decoding uint16 SAR")
        return None


def decode_leader_payload(module: Any, leader: Any, payload: bytes) -> Any:
    """Decode the pixel payload described by a parsed image leader.

    Mirrors the logic previously inside
    ``SfpConnectorModule._decode_leader_payload``.

    Args:
        module: Owning connector object.  Optional attributes used here:
            ``_image_type_filter`` (frames of any other type are skipped),
            ``_ensure_mfd_lut()`` / ``_mfd_lut`` (palette for MFD frames) and
            ``_build_grayscale_frame(...)`` (8-bit grayscale decoding).
        leader: Parsed leader exposing ``HEADER_DATA`` with ``TYPE``, ``DX``,
            ``DY``, ``STRIDE``, ``BPP``, ``COMP`` and optionally ``PALTYPE``.
            ``STRIDE`` is interpreted in pixels; 0 or less means "same as DX".
        payload: Raw buffer containing the leader followed by pixel data.

    Returns:
        A PIL image, a numpy array (when Pillow is unavailable), or ``None``
        when the frame is filtered out, malformed, truncated or unsupported.
    """
    if leader is None:
        return None

    log = logging.getLogger()
    # leader is expected to be a ctypes-like structure with HEADER_DATA
    header = leader.HEADER_DATA

    try:
        image_type_id = int(header.TYPE)
    except Exception:
        image_type_id = 0

    # Honour the module's image type filter if one is set.
    try:
        type_filter = getattr(module, '_image_type_filter', None)
        if type_filter is not None and image_type_id != type_filter:
            log.debug("VideoReceiverSFP: skipping image_type=%s due to filter", image_type_id)
            return None
    except Exception:
        pass

    try:
        width = int(header.DX)
        height = int(header.DY)
        stride = int(header.STRIDE)
        bpp = int(header.BPP)
        comp = int(header.COMP)
        pal_type = int(getattr(header, 'PALTYPE', 0))
    except Exception:
        return None

    # A frame without a positive extent cannot be decoded; without this guard
    # it would pass the truncation check below (expected size <= 0).
    if width <= 0 or height <= 0:
        log.debug("VideoReceiverSFP: ignoring frame with invalid geometry %dx%d", width, height)
        return None

    bytes_per_pixel = _bytes_per_pixel(bpp)
    row_bytes = width * bytes_per_pixel
    row_stride_pixels = stride if stride > 0 else width
    row_stride_bytes = row_stride_pixels * bytes_per_pixel
    total_bytes = row_stride_bytes * height

    pixel_data_offset = _pixel_data_offset(leader)
    have_bytes = len(payload) - pixel_data_offset
    if have_bytes < total_bytes:
        log.warning(
            "VideoReceiverSFP: truncated pixel payload (have=%d expected=%d)",
            have_bytes,
            total_bytes,
        )
        return None

    # MFD indexed palette
    if image_type_id == IMAGE_TYPE_MFD and pal_type == 1 and bytes_per_pixel == 1:
        return _decode_mfd_indexed(
            module, payload, pixel_data_offset, width, height, row_stride_pixels
        )

    # Uncompressed grayscale
    if comp == 0:
        if bytes_per_pixel == 1:
            frame = module._build_grayscale_frame(
                width,
                height,
                row_stride_bytes,
                row_bytes,
                payload[pixel_data_offset:pixel_data_offset + total_bytes],
            )
            if frame is not None:
                log.info(
                    "VideoReceiverSFP: decoded %s frame %dx%d bytes_per_pixel=%d stride=%d",
                    _image_type_label(image_type_id),
                    width,
                    height,
                    bytes_per_pixel,
                    row_stride_bytes,
                )
            return frame
        if bytes_per_pixel == 2:
            return _decode_gray16(
                payload,
                pixel_data_offset,
                width,
                height,
                row_stride_bytes,
                bytes_per_pixel,
                image_type_id,
            )

    log.debug(
        "VideoReceiverSFP: unsupported comp=%s bpp=%s bytes_per_pixel=%s for type=%s",
        comp,
        bpp,
        bytes_per_pixel,
        image_type_id,
    )
    return None