107 lines
3.7 KiB
Python
107 lines
3.7 KiB
Python
"""Small collection of image helper functions factored out of sfp_module.
|
|
These are intentionally lightweight wrappers that avoid depending on the
|
|
SfpConnectorModule instance state; callers should pass necessary parameters
|
|
(e.g. normalize method) when required.
|
|
"""
|
|
from typing import Optional
|
|
import os
|
|
import logging
|
|
|
|
try:
|
|
import numpy as np
|
|
except Exception: # pragma: no cover - best-effort
|
|
np = None
|
|
|
|
try:
|
|
from PIL import Image, ImageOps
|
|
except Exception:
|
|
Image = None
|
|
ImageOps = None
|
|
|
|
# Try to import the local normalize_image if available
|
|
try:
|
|
from .image_processing import normalize_image
|
|
except Exception:
|
|
normalize_image = None
|
|
|
|
logger = logging.getLogger("VideoReceiverSFP.image_helpers")
|
|
|
|
|
|
def looks_like_encoded_image(payload: bytes) -> bool:
    """Report whether ``payload`` begins with a known encoded-image signature.

    Only the leading bytes are inspected; the payload is never decoded.
    An empty (or otherwise falsy) payload is never considered an image.

    Args:
        payload: Raw bytes to inspect.

    Returns:
        True when the payload starts with one of the recognised signatures,
        False otherwise.
    """
    known_signatures = (
        b"\x89PNG\r\n\x1a\n",  # PNG file signature
        b"\xff\xd8\xff",  # JPEG start-of-image marker
        b"BM",  # BMP header; only two bytes, so raw data may match by chance
    )
    return bool(payload) and payload.startswith(known_signatures)
|
|
|
|
|
|
def extract_index_array(width: int, height: int, row_stride: int, pixel_data: bytes):
    """Build a ``(height, width)`` uint8 array from a row-padded byte buffer.

    Each of the ``height`` rows occupies ``row_stride`` bytes in
    ``pixel_data``. The first ``width`` bytes of every row are kept and any
    trailing per-row padding is dropped. Bytes beyond
    ``row_stride * height`` are ignored.

    Args:
        width: Number of bytes (one per pixel) to keep from each row.
        height: Number of rows.
        row_stride: Number of bytes each row occupies in ``pixel_data``;
            must be at least ``width``.
        pixel_data: Source buffer holding at least ``row_stride * height``
            bytes.

    Returns:
        A freshly allocated, writable ``numpy.ndarray`` of dtype uint8 and
        shape ``(height, width)``, or ``None`` when numpy is unavailable, the
        geometry is invalid, the buffer is too short, or the reshape fails.
    """
    if np is None:
        return None

    # A stride narrower than the row (or a negative dimension) cannot describe
    # a valid image; slicing would silently yield an array of the wrong shape.
    if width < 0 or height < 0 or row_stride < width:
        logger.warning(
            "extract_index_array: invalid geometry (width=%d height=%d row_stride=%d)",
            width,
            height,
            row_stride,
        )
        return None

    total_bytes = row_stride * height
    if len(pixel_data) < total_bytes:
        logger.warning("extract_index_array: pixel_data shorter than expected (got=%d need=%d)", len(pixel_data), total_bytes)
        return None

    try:
        # Slice through a memoryview so the source buffer is not copied here;
        # the single copy() below detaches the result from the caller's buffer.
        flat = np.frombuffer(memoryview(pixel_data)[:total_bytes], dtype=np.uint8)
        # When row_stride == width the column slice is a no-op, so one
        # expression covers both the packed and the padded layout.
        return flat.reshape((height, row_stride))[:, :width].copy()
    except Exception:
        logger.exception("extract_index_array: failed to reshape pixel data")
        return None
|
|
|
|
|
|
def _strip_row_padding(height, row_stride, row_pixels, pixel_data):
    """Return ``pixel_data`` with only the first ``row_pixels`` bytes of each row."""
    return b"".join(
        pixel_data[r * row_stride:r * row_stride + row_pixels]
        for r in range(height)
    )


def _normalize_grayscale(img, width, height, normalize_method):
    """Return a normalized copy of the "L" image ``img``, or None if not applied."""
    if normalize_method == 'cp' and normalize_image is not None:
        if np is None:
            return None
        try:
            arr = np.frombuffer(img.tobytes(), dtype=np.uint8).reshape((height, width))
            norm = normalize_image(arr, target_type=np.uint8)
            if norm is not None:
                return Image.fromarray(norm, mode='L')
        except Exception:
            logger.exception("build_grayscale_frame: controlpanel normalize_image failed")
        return None

    if normalize_method == 'autocontrast' and ImageOps is not None:
        try:
            return ImageOps.autocontrast(img)
        except Exception:
            # Previously swallowed silently; keep the fallback but leave a trace.
            logger.exception("build_grayscale_frame: autocontrast failed")
    return None


def _save_normalized_debug_frame(frame):
    """Best-effort dump of ``frame`` to ``<cwd>/dumps`` for debugging."""
    try:
        dumps_dir = os.path.join(os.getcwd(), "dumps")
        os.makedirs(dumps_dir, exist_ok=True)
        dbg_path = os.path.join(dumps_dir, "VideoReceiverSFP_last_normalized.png")
        frame.save(dbg_path)
        logger.info("build_grayscale_frame: saved normalized debug frame to %s", dbg_path)
    except Exception:
        # The dump is a debugging aid only: never let it break frame delivery,
        # but record why it failed instead of hiding the error completely.
        logger.debug("build_grayscale_frame: could not save normalized debug frame", exc_info=True)


def build_grayscale_frame(width, height, row_stride, row_pixels, pixel_data, normalize_method: Optional[str] = None):
    """Build an RGB PIL image from raw 8-bit grayscale pixel data.

    When ``row_stride`` equals ``row_pixels`` the buffer is handed to Pillow
    as-is; otherwise the first ``row_pixels`` bytes of every ``row_stride``-byte
    row are concatenated first. The resulting "L" image is optionally
    normalized and finally converted to "RGB".

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        row_stride: Number of bytes each row occupies in ``pixel_data``.
        row_pixels: Number of bytes taken from each row.
            NOTE(review): presumably equal to ``width`` for 8-bit data;
            confirm against callers.
        pixel_data: Raw pixel buffer.
        normalize_method: ``'cp'`` applies the local ``normalize_image`` helper
            (requires it and numpy to be importable), ``'autocontrast'`` applies
            ``ImageOps.autocontrast``; any other value skips normalization.

    Returns:
        An "RGB" PIL image, or ``None`` when Pillow is unavailable or the frame
        cannot be built. If normalization is requested but unavailable or
        fails, the un-normalized frame is returned.

    Side effects:
        Every successfully normalized frame is also written to
        ``<cwd>/dumps/VideoReceiverSFP_last_normalized.png`` (best effort).
    """
    if Image is None:
        return None

    try:
        if row_stride == row_pixels:
            raw = pixel_data
        else:
            raw = _strip_row_padding(height, row_stride, row_pixels, pixel_data)
        img = Image.frombytes("L", (width, height), raw)

        normalized_img = _normalize_grayscale(img, width, height, normalize_method)
        if normalized_img is None:
            return img.convert("RGB")

        frame = normalized_img.convert("RGB")
        _save_normalized_debug_frame(frame)
        return frame
    except Exception:
        logger.exception("build_grayscale_frame: failed to build grayscale frame")
        return None
|