276 lines
10 KiB
Python
276 lines
10 KiB
Python
"""Payload decoder extracted from sfp_module._decode_leader_payload.
|
|
This module exposes `decode_leader_payload(module, leader, payload)` which
|
|
mirrors the original instance method but is callable as a helper to reduce
|
|
`SfpConnectorModule` size.
|
|
"""
|
|
from typing import Any
|
|
import logging
|
|
|
|
try:
|
|
import numpy as np
|
|
except Exception:
|
|
np = None
|
|
|
|
try:
|
|
from PIL import Image
|
|
except Exception:
|
|
Image = None
|
|
|
|
# local helpers likely available in the package
|
|
try:
|
|
from .mfd_palette import apply_mfd_lut
|
|
except Exception:
|
|
apply_mfd_lut = None
|
|
|
|
try:
|
|
from .image_processing import normalize_image
|
|
except Exception:
|
|
normalize_image = None
|
|
|
|
# ImageLeaderData may be provided by controlpanel or present in module
|
|
try:
|
|
from controlpanel.core.sfp_structures import ImageLeaderData as _ILD
|
|
except Exception:
|
|
_ILD = None
|
|
|
|
IMAGE_TYPE_MFD = 1
|
|
IMAGE_TYPE_SAR = 2
|
|
|
|
|
|
def decode_leader_payload(module: Any, leader, payload: bytes):
|
|
"""Decode payload given a parsed leader. Returns PIL Image / ndarray / None
|
|
The function mirrors the logic previously inside SfpConnectorModule._decode_leader_payload.
|
|
"""
|
|
if leader is None:
|
|
return None
|
|
# leader expected to be a ctypes-like structure with HEADER_DATA
|
|
header = leader.HEADER_DATA
|
|
try:
|
|
image_type_id = int(header.TYPE)
|
|
except Exception:
|
|
image_type_id = 0
|
|
# honor module filter if present
|
|
try:
|
|
if getattr(module, '_image_type_filter', None) is not None and image_type_id != module._image_type_filter:
|
|
logging.getLogger().debug("VideoReceiverSFP: skipping image_type=%s due to filter", image_type_id)
|
|
return None
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
width = int(header.DX)
|
|
height = int(header.DY)
|
|
stride = int(header.STRIDE)
|
|
bpp = int(header.BPP)
|
|
comp = int(header.COMP)
|
|
pal_type = int(getattr(header, 'PALTYPE', 0))
|
|
except Exception:
|
|
return None
|
|
|
|
# compute bytes_per_pixel
|
|
if bpp in (1, 2):
|
|
bytes_per_pixel = int(bpp)
|
|
else:
|
|
if bpp <= 8:
|
|
bytes_per_pixel = 1
|
|
elif bpp % 8 == 0:
|
|
bytes_per_pixel = bpp // 8
|
|
else:
|
|
bytes_per_pixel = max(1, (bpp + 7) // 8)
|
|
|
|
row_pixels = width * bytes_per_pixel
|
|
row_stride = stride if stride and stride > 0 else row_pixels
|
|
# pixel offset calculation
|
|
pixel_offset = 0
|
|
try:
|
|
if _ILD is not None:
|
|
pixel_offset = (
|
|
leader.HEADER_TAG.size()
|
|
+ leader.HEADER_DATA.size()
|
|
+ leader.GEO_TAG.size()
|
|
+ leader.GEO_DATA.size()
|
|
+ leader.RESERVED_TAG.size()
|
|
+ _ILD.get_reserved_data_size()
|
|
+ leader.CM_TAG.size()
|
|
+ _ILD.get_colour_map_size()
|
|
+ leader.PIXEL_TAG.size()
|
|
)
|
|
else:
|
|
pixel_offset = getattr(leader, 'size', lambda: 0)()
|
|
except Exception:
|
|
try:
|
|
pixel_offset = _ILD.size() if _ILD is not None else 0
|
|
except Exception:
|
|
pixel_offset = 0
|
|
|
|
row_stride_pixels = stride if stride and stride > 0 else width
|
|
row_stride_bytes = row_stride_pixels * bytes_per_pixel
|
|
total_bytes = row_stride_bytes * height
|
|
pixel_data_offset = pixel_offset
|
|
have_bytes = len(payload) - pixel_data_offset
|
|
if have_bytes < total_bytes:
|
|
logging.getLogger().warning(
|
|
"VideoReceiverSFP: truncated pixel payload (have=%d expected=%d)",
|
|
have_bytes,
|
|
total_bytes,
|
|
)
|
|
return None
|
|
|
|
# MFD indexed palette
|
|
if image_type_id == IMAGE_TYPE_MFD and pal_type == 1 and bytes_per_pixel == 1:
|
|
try:
|
|
mview = None
|
|
if np is not None:
|
|
mview = np.ndarray(
|
|
shape=(height, width),
|
|
dtype=np.uint8,
|
|
buffer=payload,
|
|
offset=pixel_data_offset,
|
|
strides=(row_stride_pixels * 1, 1),
|
|
)
|
|
if mview is None:
|
|
return None
|
|
indices = mview.copy()
|
|
except Exception:
|
|
logging.getLogger().exception("VideoReceiverSFP: failed to construct MFD index view")
|
|
return None
|
|
|
|
rgb = None
|
|
try:
|
|
# ensure LUT on module
|
|
try:
|
|
module._ensure_mfd_lut()
|
|
except Exception:
|
|
pass
|
|
if getattr(module, '_mfd_lut', None) is not None and apply_mfd_lut is not None:
|
|
rgb = apply_mfd_lut(indices, module._mfd_lut)
|
|
except Exception:
|
|
rgb = None
|
|
|
|
if rgb is not None:
|
|
try:
|
|
import numpy as _np
|
|
from PIL import Image as _PILImage, ImageOps as _ImageOps
|
|
rgb_arr = _np.asarray(rgb, dtype=_np.uint8)
|
|
mask = _np.any(rgb_arr != 0, axis=2)
|
|
pct_nonzero = float(_np.count_nonzero(mask)) / float(mask.size) if mask.size else 0.0
|
|
max_val = int(rgb_arr.max()) if rgb_arr.size else 0
|
|
if pct_nonzero < 0.10 or max_val < 220:
|
|
if _np.count_nonzero(mask) > 0 and max_val > 0:
|
|
scale = 255.0 / float(max_val)
|
|
rgb_arr = _np.clip(rgb_arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8)
|
|
try:
|
|
pil_tmp = _PILImage.fromarray(rgb_arr, mode='RGB')
|
|
pil_tmp = _ImageOps.autocontrast(pil_tmp)
|
|
rgb_arr = _np.array(pil_tmp)
|
|
except Exception:
|
|
pass
|
|
if Image is not None:
|
|
return Image.fromarray(rgb_arr, mode='RGB')
|
|
return rgb_arr
|
|
except Exception:
|
|
if Image is not None:
|
|
return Image.fromarray(rgb, mode='RGB')
|
|
return rgb
|
|
|
|
try:
|
|
try:
|
|
module._ensure_mfd_lut()
|
|
except Exception:
|
|
pass
|
|
if getattr(module, '_mfd_lut', None) is not None and apply_mfd_lut is not None:
|
|
rgb2 = apply_mfd_lut(indices, module._mfd_lut)
|
|
if rgb2 is not None:
|
|
try:
|
|
import numpy as _np
|
|
from PIL import Image as _PILImage, ImageOps as _ImageOps
|
|
rgb_arr = _np.asarray(rgb2, dtype=_np.uint8)
|
|
max_val = int(rgb_arr.max()) if rgb_arr.size else 0
|
|
if _np.count_nonzero(_np.any(rgb_arr != 0, axis=2)) > 0 and max_val > 0:
|
|
scale = 255.0 / float(max_val)
|
|
rgb_arr = _np.clip(rgb_arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8)
|
|
try:
|
|
pil_tmp = _PILImage.fromarray(rgb_arr, mode='RGB')
|
|
pil_tmp = _ImageOps.autocontrast(pil_tmp)
|
|
rgb_arr = _np.array(pil_tmp)
|
|
except Exception:
|
|
pass
|
|
return Image.fromarray(rgb_arr, mode='RGB')
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
if Image is not None:
|
|
return Image.fromarray(indices, mode='L').convert('RGB')
|
|
else:
|
|
return indices
|
|
|
|
# grayscale handling
|
|
if comp == 0:
|
|
if bytes_per_pixel == 1:
|
|
frame = module._build_grayscale_frame(width, height, row_stride_bytes, row_pixels, payload[pixel_data_offset:pixel_data_offset + total_bytes])
|
|
if frame is not None:
|
|
logging.getLogger().info(
|
|
"VideoReceiverSFP: decoded %s frame %dx%d bytes_per_pixel=%d stride=%d",
|
|
"MFD" if image_type_id == IMAGE_TYPE_MFD else ("SAR" if image_type_id == IMAGE_TYPE_SAR else str(image_type_id)),
|
|
width,
|
|
height,
|
|
bytes_per_pixel,
|
|
row_stride_bytes,
|
|
)
|
|
return frame
|
|
|
|
if bytes_per_pixel == 2:
|
|
try:
|
|
import numpy as _np
|
|
if _np is None:
|
|
return None
|
|
raw_view = _np.ndarray(
|
|
shape=(height, width),
|
|
dtype=_np.uint16,
|
|
buffer=payload,
|
|
offset=pixel_data_offset,
|
|
strides=(row_stride_bytes, 2),
|
|
)
|
|
arr = raw_view.copy()
|
|
norm = None
|
|
try:
|
|
if normalize_image is not None:
|
|
norm = normalize_image(arr, target_type=_np.uint8)
|
|
except Exception:
|
|
logging.getLogger().exception("VideoReceiverSFP: normalization of uint16 SAR failed")
|
|
if norm is None:
|
|
try:
|
|
norm = (_np.right_shift(arr, 8)).astype(_np.uint8)
|
|
except Exception:
|
|
logging.getLogger().exception("VideoReceiverSFP: fallback uint16->uint8 conversion failed")
|
|
return None
|
|
try:
|
|
from PIL import Image as _PILImage
|
|
pil_img = _PILImage.fromarray(norm, mode='L').convert('RGB')
|
|
logging.getLogger().info(
|
|
"VideoReceiverSFP: decoded %s frame %dx%d bytes_per_pixel=%d stride=%d (uint16->uint8)",
|
|
"MFD" if image_type_id == IMAGE_TYPE_MFD else ("SAR" if image_type_id == IMAGE_TYPE_SAR else str(image_type_id)),
|
|
width,
|
|
height,
|
|
bytes_per_pixel,
|
|
row_stride_bytes,
|
|
)
|
|
return pil_img
|
|
except Exception:
|
|
logging.getLogger().exception("VideoReceiverSFP: failed to build PIL image from normalized SAR data")
|
|
return norm
|
|
except Exception:
|
|
logging.getLogger().exception("VideoReceiverSFP: error decoding uint16 SAR")
|
|
return None
|
|
|
|
logging.getLogger().debug(
|
|
"VideoReceiverSFP: unsupported comp=%s bpp=%s bytes_per_pixel=%s for type=%s",
|
|
comp,
|
|
bpp,
|
|
bytes_per_pixel,
|
|
image_type_id,
|
|
)
|
|
return None
|