# SXXXXXXX_ControlPanel/image_pipeline.py
# 2025-04-08 07:53:55 +02:00
#
# 311 lines
# 13 KiB
# Python

# --- START OF FILE image_pipeline.py ---
# image_pipeline.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.

Handles the image processing pipeline for normal mode operation.

Reads the current image data (normalized SAR image, MFD indices) and parameters
(LUTs, palette, rotation, size) from AppState, performs the necessary processing
steps (LUT application, colorization, rotation, resizing), and queues the
resulting images for display by the DisplayManager.
"""
# Standard library imports
import logging
import queue # For type hinting
import math
from typing import Optional
# Third-party imports
import numpy as np
import cv2
# Local application imports
from app_state import AppState
from utils import put_queue
from image_processing import (
apply_color_palette, # Keep specific imports needed
resize_image,
# normalize_image is used by receiver/test manager, not directly here
)
# Forward declaration for type hinting App instance
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from app import App
class ImagePipeline:
    """Processes SAR and MFD images based on AppState for display.

    The pipeline owns no image data itself: every call reads the current
    image and parameters from the shared AppState, builds a display-ready
    array and hands it to ``put_queue`` together with the target queue.
    Both entry points return early (without queueing anything) when AppState
    reports ``shutting_down`` or ``test_mode_active``.
    """

    def __init__(
        self,
        app_state: "AppState",
        sar_queue: queue.Queue,
        mfd_queue: queue.Queue,
        app: 'App',  # Use forward declaration for App type hint
    ):
        """
        Initializes the ImagePipeline.

        Args:
            app_state (AppState): Reference to the shared application state.
            sar_queue (queue.Queue): Queue for sending processed SAR images for display.
            mfd_queue (queue.Queue): Queue for sending processed MFD images for display.
            app (App): Reference to the main application instance (for put_queue context).
        """
        self._log_prefix = "[ImagePipeline]"
        logging.debug(f"{self._log_prefix} Initializing...")
        self._app_state: "AppState" = app_state
        self._sar_queue: queue.Queue = sar_queue
        self._mfd_queue: queue.Queue = mfd_queue
        self._app: 'App' = app  # Store app reference
        logging.debug(f"{self._log_prefix} Initialization complete.")

    @staticmethod
    def _saturate_to_uint8(image: np.ndarray) -> np.ndarray:
        """
        Returns a new uint8 array holding `image` clamped to the 0..255 range.

        A bare ``astype(np.uint8)`` wraps out-of-range integers modulo 256
        (e.g. 300 becomes 44) and is undefined for NaN; clamping first keeps
        over-range pixels saturated instead of turning them into noise.

        Args:
            image (np.ndarray): Input array of any numeric dtype.

        Returns:
            np.ndarray: A uint8 array of the same shape (always a new array).
        """
        if image.dtype == np.uint8:
            return image.copy()
        if np.issubdtype(image.dtype, np.integer):
            # Keep the clip bounds inside the source dtype so narrow types
            # such as int8 never see an unrepresentable bound.
            limits = np.iinfo(image.dtype)
            lower = max(limits.min, 0)
            upper = min(limits.max, 255)
            return np.clip(image, lower, upper).astype(np.uint8)
        if np.issubdtype(image.dtype, np.floating):
            # NaN -> 0 and +/-inf -> huge finite values, which the clip then clamps.
            return np.clip(np.nan_to_num(image), 0, 255).astype(np.uint8)
        # Other dtypes (e.g. bool): fall back to a plain cast.
        return image.astype(np.uint8)

    def process_sar_for_display(self):
        """
        Processes the current SAR image from AppState (uint8 conversion if
        needed, LUT, palette, rotation, resize) and queues it for display.

        Checks shutdown and test mode flags from AppState. Returns None in all
        cases; on any validation failure or processing error the method logs
        and returns without queueing an image.
        """
        log_prefix = f"{self._log_prefix} SAR Process"

        # Check flags first
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Skipping processing: Shutdown detected.")
            return
        if self._app_state.test_mode_active:
            logging.debug(f"{log_prefix} Skipping processing: Test Mode active.")
            return

        # Validate Input Image from state
        current_normalized_sar = self._app_state.current_sar_normalized
        if current_normalized_sar is None or current_normalized_sar.size == 0:
            logging.warning(f"{log_prefix} No normalized SAR image in AppState. Cannot process.")
            return

        # Get parameters from state safely
        try:
            bc_lut = self._app_state.brightness_contrast_lut
            palette = self._app_state.sar_palette
            geo_info = self._app_state.current_sar_geo_info
            target_w = self._app_state.sar_display_width
            target_h = self._app_state.sar_display_height
        except AttributeError as ae:
            logging.error(f"{log_prefix} Missing required attribute in AppState: {ae}. Cannot process.")
            return

        # Validate essential parameters
        if bc_lut is None:
            logging.warning(f"{log_prefix} Brightness/Contrast LUT missing in AppState. Cannot process.")
            return
        if target_w <= 0 or target_h <= 0:
            logging.warning(f"{log_prefix} Invalid target display dimensions ({target_w}x{target_h}). Cannot process.")
            return

        logging.debug(f"{log_prefix} Processing SAR image from AppState...")

        # Work on a copy to avoid modifying state during processing.
        # Ensure the input image is uint8 as expected by subsequent steps.
        if current_normalized_sar.dtype != np.uint8:
            logging.warning(f"{log_prefix} Input SAR normalized image is not uint8 ({current_normalized_sar.dtype}). Attempting conversion.")
            try:
                # Saturating conversion: out-of-range values clamp instead of wrapping.
                base_img = self._saturate_to_uint8(current_normalized_sar)
            except Exception as e:
                logging.error(f"{log_prefix} Failed to convert input SAR to uint8: {e}. Aborting.")
                return
        else:
            base_img = current_normalized_sar.copy()

        try:
            # --- Processing Steps ---
            # Rotation is only applied when the geo info dict flags itself valid.
            is_geo_valid = geo_info.get("valid", False) if geo_info else False
            orient_rad = geo_info.get("orientation", 0.0) if is_geo_valid else 0.0

            # 1. Apply B/C LUT (from state)
            logging.debug(f"{log_prefix} Applying B/C LUT...")
            img = cv2.LUT(base_img, bc_lut)
            if self._app_state.shutting_down:
                return  # Check after each step

            # 2. Apply Color Palette (from state)
            if palette != "GRAY":
                logging.debug(f"{log_prefix} Applying color palette '{palette}'...")
                img = apply_color_palette(img, palette)  # Use imported function
                if self._app_state.shutting_down:
                    return
            else:
                logging.debug(f"{log_prefix} Skipping color palette (GRAY).")

            # 3. Apply Rotation (skipped for angles within 1e-4 rad of zero)
            if is_geo_valid and abs(orient_rad) > 1e-4:
                logging.debug(f"{log_prefix} Applying rotation (Angle: {math.degrees(orient_rad):.2f} deg)...")
                img = self._rotate_image(img, orient_rad)  # Use helper method
                if img is None or self._app_state.shutting_down:
                    return  # Check helper result and shutdown
            else:
                logging.debug(f"{log_prefix} Skipping rotation (Geo invalid or angle near zero).")

            # 4. Resize Image
            logging.debug(f"{log_prefix} Resizing image to {target_w}x{target_h}...")
            img = self._resize_sar_image(img, target_w, target_h)  # Use helper method
            if img is None or self._app_state.shutting_down:  # Check helper result and shutdown
                return
            logging.debug(f"{log_prefix} Resize complete. Final shape: {img.shape}")

            # 5. Queue Final Image
            # Use put_queue utility, passing the app instance for context
            put_queue(self._sar_queue, img.copy(), "sar", self._app)
            logging.debug(f"{log_prefix} Queued processed SAR image for display.")
        except Exception:
            # Log error only if not during shutdown
            if not self._app_state.shutting_down:
                logging.exception(f"{log_prefix} Error during SAR processing pipeline:")

    def _rotate_image(self, img: np.ndarray, angle_rad: float) -> Optional[np.ndarray]:
        """
        Helper method to rotate an image using OpenCV.

        The image is rotated about its integer center (w // 2, h // 2) at
        scale 1.0; the output keeps the input width and height, and areas
        uncovered by the rotation are filled with black.

        Args:
            img (np.ndarray): Input image.
            angle_rad (float): Rotation angle in radians (converted to degrees
                for cv2.getRotationMatrix2D).

        Returns:
            Optional[np.ndarray]: The rotated image, or None on critical error.
        """
        log_prefix = f"{self._log_prefix} SAR Rotate Helper"
        try:
            deg = math.degrees(angle_rad)
            h, w = img.shape[:2]
            center = (w // 2, h // 2)
            # Get rotation matrix
            M = cv2.getRotationMatrix2D(center, deg, 1.0)
            # Determine border color (black for BGR, 0 for grayscale)
            border_color = [0, 0, 0] if img.ndim == 3 else 0
            # Perform affine warp
            rotated_img = cv2.warpAffine(
                img,
                M,
                (w, h),  # Output size same as input
                flags=cv2.INTER_LINEAR,  # Linear interpolation
                borderMode=cv2.BORDER_CONSTANT,
                borderValue=border_color,  # Fill borders with black
            )
            logging.debug(f"{log_prefix} Rotation successful.")
            return rotated_img
        except Exception:
            # Log error and return None to indicate failure
            logging.exception(f"{log_prefix} Rotation warpAffine error:")
            return None

    def _resize_sar_image(self, img: np.ndarray, target_w: int, target_h: int) -> Optional[np.ndarray]:
        """
        Helper method to resize SAR image using the utility function.

        Args:
            img (np.ndarray): Input image.
            target_w (int): Target width.
            target_h (int): Target height.

        Returns:
            Optional[np.ndarray]: The resized image (the input object itself
            when it already has the target size), or None on error.
        """
        log_prefix = f"{self._log_prefix} SAR Resize Helper"

        # Basic validation
        if img is None or target_w <= 0 or target_h <= 0:
            logging.error(f"{log_prefix} Invalid input for resize (Image None or invalid dims {target_w}x{target_h}).")
            return None

        # Check if resize is actually needed
        if img.shape[1] == target_w and img.shape[0] == target_h:
            logging.debug(f"{log_prefix} Image already target size. Skipping resize.")
            return img

        # Perform resize using the function from image_processing
        resized_img = resize_image(img, target_w, target_h)  # This function handles logging
        if resized_img is None:
            logging.error(f"{log_prefix} SAR Resize failed (resize_image returned None).")
            return None  # Propagate None on failure

        logging.debug(f"{log_prefix} Resize successful.")
        return resized_img

    def process_mfd_for_display(self):
        """
        Applies the MFD LUT from AppState to the indices in AppState and queues the result.

        Checks shutdown and test mode flags from AppState. Returns None in all
        cases; errors during the lookup are logged (unless shutting down) and
        nothing is queued.
        """
        log_prefix = f"{self._log_prefix} MFD Process"

        # Check flags first
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Skipping processing: Shutdown detected.")
            return
        if self._app_state.test_mode_active:
            logging.debug(f"{log_prefix} Skipping processing: Test Mode active.")
            return

        # Validate Input Indices from state
        current_mfd_indices = self._app_state.current_mfd_indices
        if current_mfd_indices is None or current_mfd_indices.size == 0:
            logging.warning(f"{log_prefix} No MFD indices in AppState. Cannot process.")
            return

        # Get MFD LUT from state
        mfd_lut = self._app_state.mfd_lut
        if mfd_lut is None:
            logging.warning(f"{log_prefix} MFD LUT missing in AppState. Cannot process.")
            return

        logging.debug(f"{log_prefix} Processing MFD indices from AppState...")
        try:
            # --- Check shutdown before potentially heavy LUT application ---
            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown detected before LUT application.")
                return

            # --- Apply MFD LUT (from state) ---
            # NOTE(review): with a signed index dtype, negative values would
            # wrap around silently (NumPy negative indexing) instead of raising
            # IndexError - confirm the indices are always unsigned.
            logging.debug(f"{log_prefix} Applying MFD LUT (shape {mfd_lut.shape}) to indices (shape {current_mfd_indices.shape})...")
            mfd_bgr = mfd_lut[current_mfd_indices]
            logging.debug(f"{log_prefix} MFD LUT applied. Result shape: {mfd_bgr.shape}.")

            # --- Check shutdown again before queueing ---
            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown detected after LUT application.")
                return

            # --- Queue Result ---
            # Indexing either raises or yields an array, so no None check is needed.
            # Use put_queue utility, passing the app instance for context
            put_queue(self._mfd_queue, mfd_bgr.copy(), "mfd", self._app)
            logging.debug(f"{log_prefix} Queued processed MFD image for display.")
        except IndexError as e:
            # Handle index errors (e.g., index value > LUT size - 1)
            min_idx, max_idx = "-", "-"
            try:  # Safely try to get min/max for logging
                min_idx = np.min(current_mfd_indices)
                max_idx = np.max(current_mfd_indices)
            except (ValueError, TypeError):
                # Diagnostics are best-effort; never let them raise from the handler.
                pass
            # Log error only if not shutting down
            if not self._app_state.shutting_down:
                logging.error(
                    f"{log_prefix} MFD LUT IndexError: {e}. "
                    f"Indices range maybe ({min_idx},{max_idx}). "
                    f"LUT shape {mfd_lut.shape}"
                )
        except Exception:
            # Log other errors only if not shutting down
            if not self._app_state.shutting_down:
                logging.exception(f"{log_prefix} Error applying LUT to MFD indices:")
# --- END OF FILE image_pipeline.py ---