# image_pipeline.py """ THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. Handles the image processing pipeline for normal mode operation. Reads current image data (SAR normalized, MFD indices) and parameters (LUTs, palette, rotation, size) from AppState, performs the necessary processing steps (LUT application, colorization, rotation, resizing), and queues the resulting images for display by the DisplayManager. """ # Standard library imports import logging import queue # For type hinting import math from typing import Optional # Third-party imports import numpy as np import cv2 # Local application imports from app_state import AppState from utils import put_queue from image_processing import ( apply_color_palette, # Keep specific imports needed resize_image, # normalize_image is used by receiver/test manager, not directly here ) # Forward declaration for type hinting App instance from typing import TYPE_CHECKING if TYPE_CHECKING: from app import App class ImagePipeline: """Processes SAR and MFD images based on AppState for display.""" def __init__( self, app_state: AppState, sar_queue: queue.Queue, mfd_queue: queue.Queue, app: "App", # Use forward declaration for App type hint ): """ Initializes the ImagePipeline. Args: app_state (AppState): Reference to the shared application state. sar_queue (queue.Queue): Queue for sending processed SAR images for display. mfd_queue (queue.Queue): Queue for sending processed MFD images for display. app (App): Reference to the main application instance (for put_queue context). """ self._log_prefix = "[ImagePipeline]" logging.debug(f"{self._log_prefix} Initializing...") self._app_state: AppState = app_state self._sar_queue: queue.Queue = sar_queue self._mfd_queue: queue.Queue = mfd_queue self._app: "App" = app # Store app reference logging.debug(f"{self._log_prefix} Initialization complete.") def process_sar_for_display(self): """ Processes the current SAR image from AppState (normalization, LUT, palette, rotation, resize) and queues it for display. Checks shutdown and test mode flags from AppState. """ log_prefix = f"{self._log_prefix} SAR Process" # Check flags first if self._app_state.shutting_down: logging.debug(f"{log_prefix} Skipping processing: Shutdown detected.") return if self._app_state.test_mode_active: logging.debug(f"{log_prefix} Skipping processing: Test Mode active.") return # Validate Input Image from state current_normalized_sar = self._app_state.current_sar_normalized if current_normalized_sar is None or current_normalized_sar.size == 0: logging.warning( f"{log_prefix} No normalized SAR image in AppState. Cannot process." ) return # Get parameters from state safely try: bc_lut = self._app_state.brightness_contrast_lut palette = self._app_state.sar_palette geo_info = self._app_state.current_sar_geo_info target_w = self._app_state.sar_display_width target_h = self._app_state.sar_display_height except AttributeError as ae: logging.error( f"{log_prefix} Missing required attribute in AppState: {ae}. Cannot process." ) return # Validate essential parameters if bc_lut is None: logging.warning( f"{log_prefix} Brightness/Contrast LUT missing in AppState. Cannot process." ) return if target_w <= 0 or target_h <= 0: logging.warning( f"{log_prefix} Invalid target display dimensions ({target_w}x{target_h}). Cannot process." ) return logging.debug(f"{log_prefix} Processing SAR image from AppState...") # Work on a copy to avoid modifying state during processing # Ensure the input image is uint8 as expected by subsequent steps if current_normalized_sar.dtype != np.uint8: logging.warning( f"{log_prefix} Input SAR normalized image is not uint8 ({current_normalized_sar.dtype}). Attempting conversion." ) try: base_img = current_normalized_sar.astype(np.uint8) except Exception as e: logging.error( f"{log_prefix} Failed to convert input SAR to uint8: {e}. Aborting." ) return else: base_img = current_normalized_sar.copy() try: # --- Processing Steps --- is_geo_valid = geo_info.get("valid", False) if geo_info else False original_orient_rad = ( geo_info.get("orientation", 0.0) if is_geo_valid else 0.0 ) # Negate the angle for display purposes as requested. orient_rad_for_display = -original_orient_rad logging.debug( f"{log_prefix} Original orientation: {math.degrees(original_orient_rad):.2f} deg. Using {-math.degrees(original_orient_rad):.2f} deg for display rotation." ) # 1. Apply B/C LUT (from state) logging.debug(f"{log_prefix} Applying B/C LUT...") img = cv2.LUT(base_img, bc_lut) if self._app_state.shutting_down: return # Check after each step # 2. Apply Color Palette (from state) if palette != "GRAY": logging.debug(f"{log_prefix} Applying color palette '{palette}'...") img = apply_color_palette(img, palette) # Use imported function if self._app_state.shutting_down: return else: logging.debug(f"{log_prefix} Skipping color palette (GRAY).") # 3. Apply Rotation if is_geo_valid and abs(orient_rad_for_display) > 1e-4: logging.debug( f"{log_prefix} Applying rotation (Angle: {math.degrees(orient_rad_for_display):.2f} deg)..." ) img = self._rotate_image( img, orient_rad_for_display ) # Use helper method if img is None or self._app_state.shutting_down: return # Check helper result and shutdown else: logging.debug( f"{log_prefix} Skipping rotation (Geo invalid or angle near zero)." ) # 4. Resize Image logging.debug(f"{log_prefix} Resizing image to {target_w}x{target_h}...") img = self._resize_sar_image(img, target_w, target_h) # Use helper method if ( img is None or self._app_state.shutting_down ): # Check helper result and shutdown return logging.debug(f"{log_prefix} Resize complete. Final shape: {img.shape}") # 5. Draw Marker (if coordinates exist in state) click_coords = self._app_state.last_sar_click_coords if ( click_coords and isinstance(click_coords, tuple) and len(click_coords) == 2 ): try: marker_x, marker_y = click_coords marker_color = (0, 0, 255) # BGR for Red marker_type = cv2.MARKER_CROSS marker_size = 15 marker_thickness = 2 # Ensure image is BGR before drawing color marker if it was GRAY if img.ndim == 2 or (img.ndim == 3 and img.shape[2] == 1): img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) logging.debug( f"{log_prefix} Drawing SAR click marker at {click_coords}" ) cv2.drawMarker( img, (marker_x, marker_y), marker_color, marker_type, marker_size, marker_thickness, ) except Exception as draw_err: logging.exception( f"{log_prefix} Error drawing SAR marker: {draw_err}" ) # 6. Queue Final Image (era step 5) if self._app_state.shutting_down: return # Final check put_queue(self._sar_queue, img.copy(), "sar", self._app) logging.debug( f"{log_prefix} Queued processed SAR image (with marker if applicable) for display." ) except Exception as e: # Log error only if not during shutdown if not self._app_state.shutting_down: logging.exception(f"{log_prefix} Error during SAR processing pipeline:") # Only the modified function _rotate_image is sent. # The process_sar_for_display function does not need changes for this specific request, # as it will receive the potentially larger image from the modified _rotate_image # and pass it correctly to _resize_sar_image. # Function to be modified: _rotate_image def _rotate_image(self, img: np.ndarray, angle_rad: float) -> Optional[np.ndarray]: """ Helper method to rotate an image using OpenCV, ensuring the entire rotated image is contained within the output canvas by resizing the canvas and adding borders as necessary. Args: img (np.ndarray): Input image (can be grayscale or BGR). angle_rad (float): Rotation angle in radians. Returns: Optional[np.ndarray]: The rotated image within an expanded canvas, or None on critical error. """ log_prefix = f"{self._log_prefix} SAR Rotate Helper" try: # 1. Get image dimensions and center h, w = img.shape[:2] center_x, center_y = w // 2, h // 2 # 2. Calculate rotation matrix for the original center deg = math.degrees(angle_rad) M = cv2.getRotationMatrix2D((center_x, center_y), deg, 1.0) # 3. Calculate the new bounding box dimensions cos = np.abs(M[0, 0]) sin = np.abs(M[0, 1]) new_w = int((h * sin) + (w * cos)) new_h = int((h * cos) + (w * sin)) logging.debug( f"{log_prefix} Original dims ({w}x{h}), Rotated BBox dims ({new_w}x{new_h}) for angle {deg:.2f} deg." ) # 4. Adjust the rotation matrix to account for translation # The matrix needs to shift the image center to the center of the new, larger canvas. M[0, 2] += (new_w / 2) - center_x M[1, 2] += (new_h / 2) - center_y logging.debug(f"{log_prefix} Rotation matrix adjusted for translation.") # 5. Determine border color (black) border_color = [0, 0, 0] if img.ndim == 3 else 0 # 6. Perform the affine transformation on the larger canvas rotated_img = cv2.warpAffine( img, M, (new_w, new_h), # Use the new bounding box dimensions flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=border_color, # Fill borders with black ) logging.debug( f"{log_prefix} Rotation and warpAffine successful. Output shape: {rotated_img.shape}" ) return rotated_img except Exception as e: # Log error and return None to indicate failure logging.exception(f"{log_prefix} Rotation/warpAffine error:") return None def _resize_sar_image( self, img: np.ndarray, target_w: int, target_h: int ) -> Optional[np.ndarray]: """ Helper method to resize SAR image using the utility function. Args: img (np.ndarray): Input image. target_w (int): Target width. target_h (int): Target height. Returns: Optional[np.ndarray]: The resized image, or None on error. """ log_prefix = f"{self._log_prefix} SAR Resize Helper" # Basic validation if img is None or target_w <= 0 or target_h <= 0: logging.error( f"{log_prefix} Invalid input for resize (Image None or invalid dims {target_w}x{target_h})." ) return None # Check if resize is actually needed if img.shape[1] == target_w and img.shape[0] == target_h: logging.debug(f"{log_prefix} Image already target size. Skipping resize.") return img # Perform resize using the function from image_processing resized_img = resize_image( img, target_w, target_h ) # This function handles logging if resized_img is None: logging.error( f"{log_prefix} SAR Resize failed (resize_image returned None)." ) return None # Propagate None on failure else: logging.debug(f"{log_prefix} Resize successful.") return resized_img def process_mfd_for_display(self): """ Applies the MFD LUT from AppState to the indices in AppState and queues the result. Checks shutdown and test mode flags from AppState. """ log_prefix = f"{self._log_prefix} MFD Process" # Check flags first if self._app_state.shutting_down: logging.debug(f"{log_prefix} Skipping processing: Shutdown detected.") return if self._app_state.test_mode_active: logging.debug(f"{log_prefix} Skipping processing: Test Mode active.") return # Validate Input Indices from state current_mfd_indices = self._app_state.current_mfd_indices if current_mfd_indices is None or current_mfd_indices.size == 0: logging.warning(f"{log_prefix} No MFD indices in AppState. Cannot process.") return # Get MFD LUT from state mfd_lut = self._app_state.mfd_lut if mfd_lut is None: logging.warning( f"{log_prefix} MFD LUT missing in AppState. Cannot process." ) return logging.debug(f"{log_prefix} Processing MFD indices from AppState...") try: # --- Check shutdown before potentially heavy LUT application --- if self._app_state.shutting_down: logging.debug(f"{log_prefix} Shutdown detected before LUT application.") return # --- Apply MFD LUT (from state) --- logging.debug( f"{log_prefix} Applying MFD LUT (shape {mfd_lut.shape}) to indices (shape {current_mfd_indices.shape})..." ) mfd_bgr = mfd_lut[current_mfd_indices] logging.debug( f"{log_prefix} MFD LUT applied. Result shape: {mfd_bgr.shape}." ) # --- Check shutdown again before queueing --- if self._app_state.shutting_down: logging.debug(f"{log_prefix} Shutdown detected after LUT application.") return # --- Queue Result --- if mfd_bgr is not None: # Use put_queue utility, passing the app instance for context put_queue(self._mfd_queue, mfd_bgr.copy(), "mfd", self._app) logging.debug(f"{log_prefix} Queued processed MFD image for display.") else: # This case should be rare if LUT is always valid and indices are valid logging.error( f"{log_prefix} MFD BGR image is None after LUT application (unexpected)." ) except IndexError as e: # Handle index errors (e.g., index value > LUT size - 1) min_idx, max_idx = "-", "-" try: # Safely try to get min/max for logging if current_mfd_indices is not None: min_idx = np.min(current_mfd_indices) max_idx = np.max(current_mfd_indices) except ValueError: pass # Ignore if array is empty etc. # Log error only if not shutting down if not self._app_state.shutting_down: logging.error( f"{log_prefix} MFD LUT IndexError: {e}. " f"Indices range maybe ({min_idx},{max_idx}). " f"LUT shape {mfd_lut.shape}" ) except Exception as e: # Log other errors only if not shutting down if not self._app_state.shutting_down: logging.exception(f"{log_prefix} Error applying LUT to MFD indices:")