"""Payload decoder extracted from sfp_module._decode_leader_payload. This module exposes `decode_leader_payload(module, leader, payload)` which mirrors the original instance method but is callable as a helper to reduce `SfpConnectorModule` size. """ from typing import Any import logging try: import numpy as np except Exception: np = None try: from PIL import Image except Exception: Image = None # local helpers likely available in the package try: from .mfd_palette import apply_mfd_lut except Exception: apply_mfd_lut = None try: from .image_processing import normalize_image except Exception: normalize_image = None # ImageLeaderData may be provided by controlpanel or present in module try: from controlpanel.core.sfp_structures import ImageLeaderData as _ILD except Exception: _ILD = None IMAGE_TYPE_MFD = 1 IMAGE_TYPE_SAR = 2 def decode_leader_payload(module: Any, leader, payload: bytes): """Decode payload given a parsed leader. Returns PIL Image / ndarray / None The function mirrors the logic previously inside SfpConnectorModule._decode_leader_payload. """ if leader is None: return None # leader expected to be a ctypes-like structure with HEADER_DATA header = leader.HEADER_DATA try: image_type_id = int(header.TYPE) except Exception: image_type_id = 0 # honor module filter if present try: if getattr(module, '_image_type_filter', None) is not None and image_type_id != module._image_type_filter: logging.getLogger().debug("VideoReceiverSFP: skipping image_type=%s due to filter", image_type_id) return None except Exception: pass try: width = int(header.DX) height = int(header.DY) stride = int(header.STRIDE) bpp = int(header.BPP) comp = int(header.COMP) pal_type = int(getattr(header, 'PALTYPE', 0)) except Exception: return None # compute bytes_per_pixel if bpp in (1, 2): bytes_per_pixel = int(bpp) else: if bpp <= 8: bytes_per_pixel = 1 elif bpp % 8 == 0: bytes_per_pixel = bpp // 8 else: bytes_per_pixel = max(1, (bpp + 7) // 8) row_pixels = width * bytes_per_pixel row_stride = stride if stride and stride > 0 else row_pixels # pixel offset calculation pixel_offset = 0 try: if _ILD is not None: pixel_offset = ( leader.HEADER_TAG.size() + leader.HEADER_DATA.size() + leader.GEO_TAG.size() + leader.GEO_DATA.size() + leader.RESERVED_TAG.size() + _ILD.get_reserved_data_size() + leader.CM_TAG.size() + _ILD.get_colour_map_size() + leader.PIXEL_TAG.size() ) else: pixel_offset = getattr(leader, 'size', lambda: 0)() except Exception: try: pixel_offset = _ILD.size() if _ILD is not None else 0 except Exception: pixel_offset = 0 row_stride_pixels = stride if stride and stride > 0 else width row_stride_bytes = row_stride_pixels * bytes_per_pixel total_bytes = row_stride_bytes * height pixel_data_offset = pixel_offset have_bytes = len(payload) - pixel_data_offset if have_bytes < total_bytes: logging.getLogger().warning( "VideoReceiverSFP: truncated pixel payload (have=%d expected=%d)", have_bytes, total_bytes, ) return None # MFD indexed palette if image_type_id == IMAGE_TYPE_MFD and pal_type == 1 and bytes_per_pixel == 1: try: mview = None if np is not None: mview = np.ndarray( shape=(height, width), dtype=np.uint8, buffer=payload, offset=pixel_data_offset, strides=(row_stride_pixels * 1, 1), ) if mview is None: return None indices = mview.copy() except Exception: logging.getLogger().exception("VideoReceiverSFP: failed to construct MFD index view") return None rgb = None try: # ensure LUT on module try: module._ensure_mfd_lut() except Exception: pass if getattr(module, '_mfd_lut', None) is not None and apply_mfd_lut is not None: rgb = apply_mfd_lut(indices, module._mfd_lut) except Exception: rgb = None if rgb is not None: try: import numpy as _np from PIL import Image as _PILImage, ImageOps as _ImageOps rgb_arr = _np.asarray(rgb, dtype=_np.uint8) mask = _np.any(rgb_arr != 0, axis=2) pct_nonzero = float(_np.count_nonzero(mask)) / float(mask.size) if mask.size else 0.0 max_val = int(rgb_arr.max()) if rgb_arr.size else 0 if pct_nonzero < 0.10 or max_val < 220: if _np.count_nonzero(mask) > 0 and max_val > 0: scale = 255.0 / float(max_val) rgb_arr = _np.clip(rgb_arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8) try: pil_tmp = _PILImage.fromarray(rgb_arr, mode='RGB') pil_tmp = _ImageOps.autocontrast(pil_tmp) rgb_arr = _np.array(pil_tmp) except Exception: pass if Image is not None: return Image.fromarray(rgb_arr, mode='RGB') return rgb_arr except Exception: if Image is not None: return Image.fromarray(rgb, mode='RGB') return rgb try: try: module._ensure_mfd_lut() except Exception: pass if getattr(module, '_mfd_lut', None) is not None and apply_mfd_lut is not None: rgb2 = apply_mfd_lut(indices, module._mfd_lut) if rgb2 is not None: try: import numpy as _np from PIL import Image as _PILImage, ImageOps as _ImageOps rgb_arr = _np.asarray(rgb2, dtype=_np.uint8) max_val = int(rgb_arr.max()) if rgb_arr.size else 0 if _np.count_nonzero(_np.any(rgb_arr != 0, axis=2)) > 0 and max_val > 0: scale = 255.0 / float(max_val) rgb_arr = _np.clip(rgb_arr.astype(_np.float32) * scale, 0, 255).astype(_np.uint8) try: pil_tmp = _PILImage.fromarray(rgb_arr, mode='RGB') pil_tmp = _ImageOps.autocontrast(pil_tmp) rgb_arr = _np.array(pil_tmp) except Exception: pass return Image.fromarray(rgb_arr, mode='RGB') except Exception: pass except Exception: pass if Image is not None: return Image.fromarray(indices, mode='L').convert('RGB') else: return indices # grayscale handling if comp == 0: if bytes_per_pixel == 1: frame = module._build_grayscale_frame(width, height, row_stride_bytes, row_pixels, payload[pixel_data_offset:pixel_data_offset + total_bytes]) if frame is not None: logging.getLogger().info( "VideoReceiverSFP: decoded %s frame %dx%d bytes_per_pixel=%d stride=%d", "MFD" if image_type_id == IMAGE_TYPE_MFD else ("SAR" if image_type_id == IMAGE_TYPE_SAR else str(image_type_id)), width, height, bytes_per_pixel, row_stride_bytes, ) return frame if bytes_per_pixel == 2: try: import numpy as _np if _np is None: return None raw_view = _np.ndarray( shape=(height, width), dtype=_np.uint16, buffer=payload, offset=pixel_data_offset, strides=(row_stride_bytes, 2), ) arr = raw_view.copy() norm = None try: if normalize_image is not None: norm = normalize_image(arr, target_type=_np.uint8) except Exception: logging.getLogger().exception("VideoReceiverSFP: normalization of uint16 SAR failed") if norm is None: try: norm = (_np.right_shift(arr, 8)).astype(_np.uint8) except Exception: logging.getLogger().exception("VideoReceiverSFP: fallback uint16->uint8 conversion failed") return None try: from PIL import Image as _PILImage pil_img = _PILImage.fromarray(norm, mode='L').convert('RGB') logging.getLogger().info( "VideoReceiverSFP: decoded %s frame %dx%d bytes_per_pixel=%d stride=%d (uint16->uint8)", "MFD" if image_type_id == IMAGE_TYPE_MFD else ("SAR" if image_type_id == IMAGE_TYPE_SAR else str(image_type_id)), width, height, bytes_per_pixel, row_stride_bytes, ) return pil_img except Exception: logging.getLogger().exception("VideoReceiverSFP: failed to build PIL image from normalized SAR data") return norm except Exception: logging.getLogger().exception("VideoReceiverSFP: error decoding uint16 SAR") return None logging.getLogger().debug( "VideoReceiverSFP: unsupported comp=%s bpp=%s bytes_per_pixel=%s for type=%s", comp, bpp, bytes_per_pixel, image_type_id, ) return None