# --- START OF FILE map_integration.py --- # map_integration.py """ THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. Manages map integration functionalities, including service interaction, tile fetching/caching, display window management, SAR overlay application (with alpha blending and user-defined shifting), coordinate conversion, and map saving. Acts as an intermediary between the main application and map-specific modules. Implements masked blending for SAR overlay and rapid recomposition for alpha changes. """ # Standard library imports import logging import threading import queue import math import os import datetime from typing import Optional, Dict, Any, Tuple, List from pathlib import Path # Third-party imports import numpy as np try: from PIL import Image, ImageDraw, ImageFont ImageType = Image.Image except ImportError: Image = None ImageDraw = None ImageFont = None ImageType = None # type: ignore try: import mercantile except ImportError: mercantile = None try: import pyproj except ImportError: pyproj = None # OpenCV is needed for warping and blending import cv2 # Local application imports import config from app_state import AppState from utils import put_queue, decimal_to_dms # Map specific modules from map_services import get_map_service, BaseMapService from map_manager import MapTileManager from map_utils import ( get_bounding_box_from_center_size, get_tile_ranges_for_bbox, MapCalculationError, calculate_meters_per_pixel, ) from map_display import MapDisplayWindow # Import image processing function needed for overlay from image_processing import apply_color_palette # Forward declaration for type hinting App instance from typing import TYPE_CHECKING if TYPE_CHECKING: from ControlPanel import ControlPanelApp class MapIntegrationManager: """Orchestrates map services, tile management, and display including 
SAR overlay.""" DEFAULT_SAVE_DIRECTORY = "saved_map_views" def __init__( self, app_state: AppState, tkinter_queue: queue.Queue, app: "ControlPanelApp", map_x: int, map_y: int, ): """Initializes the MapIntegrationManager.""" self._log_prefix = "[MapIntegrationManager]" logging.debug(f"{self._log_prefix} Initializing...") self._app_state: AppState = app_state self._tkinter_queue: queue.Queue = tkinter_queue self._app: "ControlPanelApp" = app # Dependency Checks if Image is None: raise ImportError("Pillow library not found") if mercantile is None: raise ImportError("mercantile library not found") if pyproj is None: raise ImportError("pyproj library not found") if ImageDraw is None or ImageFont is None: logging.warning(f"{self._log_prefix} Pillow ImageDraw/ImageFont not found.") # Attributes self._map_service: Optional[BaseMapService] = None self._map_tile_manager: Optional[MapTileManager] = None self._map_display_window: Optional[MapDisplayWindow] = None self._map_initial_display_thread: Optional[threading.Thread] = None self._geod: Optional[pyproj.Geod] = None self._map_update_lock = threading.Lock() try: # Geodetic Calculator self._geod = pyproj.Geod(ellps="WGS84") # Map Service service_name = getattr(config, "MAP_SERVICE_PROVIDER", "osm") api_key = getattr(config, "MAP_API_KEY", None) self._map_service = get_map_service(service_name, api_key) if not self._map_service: raise ValueError(f"Failed to get map service '{service_name}'.") # Tile Manager cache_dir = getattr(config, "MAP_CACHE_DIRECTORY", None) online_fetch = getattr(config, "ENABLE_ONLINE_MAP_FETCHING", None) self._map_tile_manager = MapTileManager( self._map_service, cache_dir, online_fetch ) # Map Display Window self._map_display_window = MapDisplayWindow( self._app, "Map Overlay", map_x, map_y ) # Initial Map Display Thread self._app.set_status("Loading initial map...") self._map_initial_display_thread = threading.Thread( target=self._display_initial_map_area_thread, name="InitialMapDisplayThread", 
daemon=True, ) self._map_initial_display_thread.start() except Exception as init_err: logging.critical( f"{self._log_prefix} Initialization failed: {init_err}", exc_info=True ) self._geod = None self._map_service = None self._map_tile_manager = None self._map_display_window = None raise def _display_initial_map_area_thread(self): """(Runs in background thread) Calculates initial map area and queues for display.""" # (Implementation unchanged from previous version) log_prefix = f"{self._log_prefix} InitialMap" base_map_image_pil: Optional[ImageType] = None map_bounds_deg: Optional[Tuple[float, float, float, float]] = None zoom: Optional[int] = None if config.SAR_CENTER_LAT == 0.0 and config.SAR_CENTER_LON == 0.0: self._update_app_state_map_context(None, None, None) self._queue_map_for_display(None) return if not (self._map_tile_manager and self._map_display_window and self._geod): self._update_app_state_map_context(None, None, None) self._queue_map_for_display(None) return if self._app_state.shutting_down: return try: zoom = config.DEFAULT_MAP_ZOOM_LEVEL center_lat_deg = config.SAR_CENTER_LAT center_lon_deg = config.SAR_CENTER_LON size_km = config.SAR_IMAGE_SIZE_KM fetch_bbox_deg = get_bounding_box_from_center_size( center_lat_deg, center_lon_deg, size_km * 1.2 ) if fetch_bbox_deg is None: raise MapCalculationError("BBox calc failed.") tile_ranges = get_tile_ranges_for_bbox(fetch_bbox_deg, zoom) if tile_ranges is None: raise MapCalculationError("Tile range calc failed.") if self._app_state.shutting_down: return map_bounds_deg = self._get_bounds_for_tile_range(zoom, tile_ranges) if map_bounds_deg is None: raise MapCalculationError("Map bounds calc failed.") base_map_image_pil = self._map_tile_manager.stitch_map_image( zoom, tile_ranges[0], tile_ranges[1] ) if self._app_state.shutting_down: return if base_map_image_pil: base_map_image_pil = self._draw_scale_bar( base_map_image_pil, center_lat_deg, zoom ) except Exception as e: logging.exception(f"{log_prefix} Error 
    def update_map_overlay(
        self,
        sar_normalized_uint8: Optional[np.ndarray],
        geo_info_radians: Optional[Dict[str, Any]],
    ):
        """
        Rebuild the composed map view for the current SAR frame and queue it for display.

        Steps performed while holding ``_map_update_lock``:
          1. Apply the user-defined lat/lon shift from AppState to the SAR center.
          2. Fetch and stitch the base map for an area 1.2x the SAR footprint size.
          3. Project the four SAR corners into map pixel coordinates.
          4. If the overlay is enabled: process the SAR image, warp it onto the
             map and blend it with a masked alpha; otherwise (or on failure)
             draw the footprint as a red polygon.
          5. Draw the last map click marker and the scale bar.
          6. Store base map, bounds, zoom, processed SAR, warp matrix and
             corner pixels in AppState and queue the composed image.

        Args:
            sar_normalized_uint8: SAR image passed to ``_process_sar_for_overlay``;
                may be None/empty, in which case only the bounding box is drawn.
            geo_info_radians: Geo-reference dictionary; this method reads the
                keys "valid", "lat", "lon", "scale_x" and "width_px" (lat/lon
                are converted with ``math.degrees``).

        Returns:
            None. If another update/recomposition holds the lock the call
            returns immediately without doing anything.

        NOTE(review): the early "Skipping" returns below sit inside the outer
        ``try`` whose ``finally`` still runs. Unless the app is shutting down,
        a skipped update therefore stores ``None`` as map context / SAR cache
        in AppState and queues ``None`` for display. Confirm this is intended.
        """
        log_prefix = f"{self._log_prefix} Map Update"
        # Non-blocking acquire: a frame arriving during an update is dropped
        # instead of queuing up behind it.
        if not self._map_update_lock.acquire(blocking=False):
            logging.debug(f"{log_prefix} Map update already in progress. Skipping.")
            return

        # Initialized before the try block so the 'finally' block can always
        # reference them when updating AppState.
        base_map_pil: Optional[ImageType] = None
        final_composed_pil: Optional[ImageType] = None
        map_bounds_deg: Optional[Tuple[float, float, float, float]] = None
        zoom: Optional[int] = None
        processed_sar_overlay: Optional[np.ndarray] = None  # Cached in AppState for recomposition
        sar_warp_matrix: Optional[np.ndarray] = None  # Cached in AppState for recomposition
        sar_corners_pixels: Optional[List[Tuple[int, int]]] = None  # Cached in AppState for recomposition

        try:
            # --- Prerequisite checks (each early return still runs 'finally') ---
            if self._app_state.shutting_down or self._app_state.test_mode_active:
                logging.debug(f"{log_prefix} Skipping: Shutdown or Test Mode active.")
                return
            if not (self._map_tile_manager and self._map_display_window and self._geod):
                logging.warning(f"{log_prefix} Skipping: Map components not ready.")
                return
            if not geo_info_radians or not geo_info_radians.get("valid", False):
                logging.warning(f"{log_prefix} Skipping: Invalid GeoInfo.")
                return
            if Image is None or mercantile is None or pyproj is None or cv2 is None:
                logging.error(f"{log_prefix} Skipping: Required libraries missing (PIL/mercantile/pyproj/cv2).")
                return

            # --- Snapshot of the user-controlled settings for this frame ---
            overlay_enabled = self._app_state.map_sar_overlay_enabled
            overlay_alpha = self._app_state.map_sar_overlay_alpha
            lat_shift_deg = self._app_state.sar_lat_shift_deg
            lon_shift_deg = self._app_state.sar_lon_shift_deg

            # Without SAR pixels there is nothing to blend; fall back to the box
            if overlay_enabled and (
                sar_normalized_uint8 is None or sar_normalized_uint8.size == 0
            ):
                logging.warning(
                    f"{log_prefix} Disabling overlay for this frame: Missing SAR data."
                )
                overlay_enabled = False  # Local to this frame; AppState is not changed

            logging.debug(
                f"{log_prefix} Starting (Overlay:{overlay_enabled}, Alpha:{overlay_alpha:.2f}, Shift:{lat_shift_deg:.6f},{lon_shift_deg:.6f})..."
            )

            # Nested try: calculation errors are turned into a fallback image
            # while the outer 'finally' still updates state and releases the lock.
            try:
                # Apply the shift to the center; latitude is clamped to
                # [-90, 90] and longitude wrapped into [-180, 180).
                center_lat_rad = geo_info_radians.get("lat", 0.0)
                center_lon_rad = geo_info_radians.get("lon", 0.0)
                shifted_lat_deg = max(
                    -90.0, min(90.0, math.degrees(center_lat_rad) + lat_shift_deg)
                )
                shifted_lon_deg = (
                    (math.degrees(center_lon_rad) + lon_shift_deg + 180) % 360
                ) - 180

                # Footprint size from geo info (scale_x * width_px / 1000),
                # else the configured default size.
                scale_x = geo_info_radians.get("scale_x", 0.0)
                width_px = geo_info_radians.get("width_px", 0)
                size_km = (
                    (scale_x * width_px / 1000.0)
                    if (scale_x > 0 and width_px > 0)
                    else config.SAR_IMAGE_SIZE_KM
                )
                zoom = config.DEFAULT_MAP_ZOOM_LEVEL  # Use configured zoom

                # Fetch and stitch the base map around the SHIFTED center
                fetch_bbox_deg = get_bounding_box_from_center_size(
                    shifted_lat_deg, shifted_lon_deg, size_km * 1.2  # Fetch slightly larger area
                )
                if fetch_bbox_deg is None:
                    raise MapCalculationError("Tile BBox calc failed.")
                tile_ranges = get_tile_ranges_for_bbox(fetch_bbox_deg, zoom)
                if tile_ranges is None:
                    raise MapCalculationError("Tile range calc failed.")

                if self._app_state.shutting_down:
                    return  # Check shutdown before the potentially long stitch

                map_bounds_deg = self._get_bounds_for_tile_range(zoom, tile_ranges)
                if map_bounds_deg is None:
                    raise MapCalculationError("Map bounds calc failed.")

                base_map_pil = self._map_tile_manager.stitch_map_image(
                    zoom, tile_ranges[0], tile_ranges[1]
                )
                if base_map_pil is None:
                    raise MapCalculationError("Base map stitch failed.")

                if self._app_state.shutting_down:
                    return  # Check again after the potentially long stitch

                # SAR footprint corners: geographic first, then map pixels.
                # A shallow copy keeps the caller's dictionary untouched.
                shifted_geo_info = geo_info_radians.copy()
                shifted_geo_info["lat"] = math.radians(shifted_lat_deg)
                shifted_geo_info["lon"] = math.radians(shifted_lon_deg)
                sar_corners_deg = self._calculate_sar_corners_geo(shifted_geo_info)
                if sar_corners_deg is None:
                    raise MapCalculationError("SAR corner geo calc failed.")

                map_shape_yx = base_map_pil.size[::-1]  # PIL size is (W,H), need (H,W)
                sar_corners_pixels = self._geo_coords_to_map_pixels(
                    sar_corners_deg, map_bounds_deg, tile_ranges, zoom, map_shape_yx
                )
                if sar_corners_pixels is None:
                    raise MapCalculationError("SAR corner pixel conversion failed.")

                # Base map as NumPy BGR for OpenCV drawing/blending
                map_cv_bgr = cv2.cvtColor(np.array(base_map_pil), cv2.COLOR_RGB2BGR)
                map_h, map_w = map_cv_bgr.shape[:2]
                display_cv_bgr = map_cv_bgr.copy()  # Composition starts from the base map

                # --- Apply SAR overlay OR draw bounding box ---
                if overlay_enabled:  # SAR data presence was checked above
                    logging.debug(f"{log_prefix} Processing SAR image for overlay...")
                    processed_sar_overlay = self._process_sar_for_overlay(
                        sar_normalized_uint8
                    )

                    if processed_sar_overlay is not None:
                        logging.debug(f"{log_prefix} Warping SAR image onto map perspective...")
                        try:
                            # Map the SAR image corners onto the footprint
                            # corners computed above.
                            sar_h, sar_w = processed_sar_overlay.shape[:2]
                            pts_sar = np.float32([[0, 0], [sar_w - 1, 0], [sar_w - 1, sar_h - 1], [0, sar_h - 1]])
                            pts_map = np.float32(sar_corners_pixels)
                            sar_warp_matrix = cv2.getPerspectiveTransform(pts_sar, pts_map)

                            # Warp into a map-sized canvas; everything outside
                            # the footprint is filled with black.
                            warped_sar_bgr = cv2.warpPerspective(
                                src=processed_sar_overlay,
                                M=sar_warp_matrix,
                                dsize=(map_w, map_h),
                                flags=cv2.INTER_LINEAR,
                                borderMode=cv2.BORDER_CONSTANT,
                                borderValue=(0, 0, 0)  # Black border is what the mask keys on
                            )

                            # Mask: pixels whose gray value is > 1 count as SAR
                            # data; darker pixels (border fill, but also
                            # genuinely black SAR pixels) keep the base map.
                            gray_warped = cv2.cvtColor(warped_sar_bgr, cv2.COLOR_BGR2GRAY)
                            _, sar_mask = cv2.threshold(gray_warped, 1, 255, cv2.THRESH_BINARY)

                            # --- Masked alpha blending ---
                            # Per-pixel weight: 0 outside the mask, overlay_alpha inside
                            mask_alpha = (sar_mask.astype(np.float32) / 255.0) * overlay_alpha
                            # Replicate the weight to 3 channels
                            mask_alpha_3ch = cv2.cvtColor(mask_alpha, cv2.COLOR_GRAY2BGR)

                            logging.debug(f"{log_prefix} Blending map and warped SAR using mask (Alpha: {overlay_alpha:.2f})...")
                            # Blend: map * (1 - mask*alpha) + warped_sar * (mask*alpha)
                            term1 = map_cv_bgr.astype(np.float32) * (1.0 - mask_alpha_3ch)
                            term2 = warped_sar_bgr.astype(np.float32) * mask_alpha_3ch
                            blended_float = term1 + term2
                            display_cv_bgr = np.clip(blended_float, 0, 255).astype(np.uint8)
                            logging.debug(f"{log_prefix} Masked alpha blending complete.")

                        except cv2.error as warp_err:
                            logging.exception(f"{log_prefix} OpenCV error during SAR warp/blend:")
                            overlay_enabled = False  # Fall back to drawing the box
                        except Exception as warp_generic_err:
                            logging.exception(f"{log_prefix} Unexpected error during SAR warp/blend:")
                            overlay_enabled = False  # Fall back to drawing the box
                    else:
                        logging.warning(f"{log_prefix} SAR processing for overlay failed. Disabling overlay.")
                        overlay_enabled = False  # Fall back to drawing the box

                # Red footprint polygon when the overlay is off or failed above
                if not overlay_enabled:
                    logging.debug(f"{log_prefix} Drawing SAR bounding box...")
                    try:
                        pts = np.array(sar_corners_pixels, np.int32).reshape((-1, 1, 2))
                        cv2.polylines(display_cv_bgr, [pts], isClosed=True, color=(0, 0, 255), thickness=2)
                    except Exception as draw_err:
                        logging.exception(f"{log_prefix} Error drawing SAR bounding box:")

                # --- Map click marker ---
                # Drawn on the composed BGR image before converting back to PIL
                map_click_coords = self._app_state.last_map_click_coords
                if map_click_coords and isinstance(map_click_coords, tuple) and len(map_click_coords) == 2:
                    try:
                        marker_x, marker_y = map_click_coords
                        marker_color = (0, 0, 255)  # BGR for red
                        marker_type = cv2.MARKER_CROSS
                        marker_size = 15
                        marker_thickness = 2
                        logging.debug(f"{log_prefix} Drawing Map click marker at {map_click_coords}")
                        cv2.drawMarker(display_cv_bgr, (marker_x, marker_y), marker_color, marker_type, marker_size, marker_thickness)
                    except Exception as draw_err:
                        logging.exception(f"{log_prefix} Error drawing Map marker: {draw_err}")

                # Convert the final OpenCV image back to PIL (RGB)
                final_composed_pil = Image.fromarray(
                    cv2.cvtColor(display_cv_bgr, cv2.COLOR_BGR2RGB)
                )

                # Scale bar is drawn on the PIL image, at the shifted latitude
                final_composed_pil = self._draw_scale_bar(
                    final_composed_pil, shifted_lat_deg, zoom
                )

            except MapCalculationError as e:
                logging.error(f"{log_prefix} Map overlay calculation failed: {e}")
                final_composed_pil = self._get_fallback_map()  # Fallback image supplied by helper
            except Exception as e:
                logging.exception(f"{log_prefix} Unexpected error during map overlay update:")
                final_composed_pil = self._get_fallback_map()  # Fallback image supplied by helper

        finally:
            # --- Update AppState and queue ---
            # Runs for successful updates, failed calculations AND early returns.
            if not self._app_state.shutting_down:
                # Map context: whatever was computed before a failure/return
                # (values still None otherwise).
                self._update_app_state_map_context(
                    base_map_pil,  # The stitched map before overlay/markers
                    map_bounds_deg,
                    zoom
                )
                # SAR overlay cache used by _recompose_map_overlay; copies keep
                # the cache independent of the local objects.
                self._app_state.last_processed_sar_for_overlay = processed_sar_overlay.copy() if processed_sar_overlay is not None else None
                self._app_state.last_sar_warp_matrix = sar_warp_matrix.copy() if sar_warp_matrix is not None else None
                self._app_state.last_sar_corners_pixels_map = sar_corners_pixels.copy() if sar_corners_pixels is not None else None

                # Queue the composed image (or fallback); this call also
                # updates AppState.last_composed_map_pil.
                self._queue_map_for_display(final_composed_pil)

            self._map_update_lock.release()  # Always release the lock taken above
            logging.debug(f"{log_prefix} Map update finished.")
If cached data is insufficient, it logs a warning and exits. Uses a lock to prevent concurrent updates. """ log_prefix = f"{self._log_prefix} RecomposeMap" if not self._map_update_lock.acquire(blocking=False): logging.debug( f"{log_prefix} Map update/recomposition already in progress. Skipping." ) return final_recomposed_pil: Optional[ImageType] = None # Initialize here try: if self._app_state.shutting_down: logging.debug(f"{log_prefix} Skipping: Shutdown detected.") return logging.debug(f"{log_prefix} Starting map recomposition...") # --- Get cached data & current state --- base_map_pil = self._app_state.last_map_image_pil processed_sar = self._app_state.last_processed_sar_for_overlay warp_matrix = self._app_state.last_sar_warp_matrix sar_corner_pixels = self._app_state.last_sar_corners_pixels_map map_click_coords = self._app_state.last_map_click_coords # Get marker coords overlay_enabled = self._app_state.map_sar_overlay_enabled overlay_alpha = self._app_state.map_sar_overlay_alpha # Get lat/zoom from state for scale bar calculation (if possible) latitude_deg = 0.0 zoom = None current_geo = self._app_state.current_sar_geo_info if current_geo and current_geo.get('valid'): latitude_deg = math.degrees(current_geo.get('lat', 0.0)) # Apply shift to latitude if scale bar should reflect shifted center latitude_deg += self._app_state.sar_lat_shift_deg latitude_deg = max(-90.0, min(90.0, latitude_deg)) # Clamp after shift zoom = self._app_state.map_current_zoom # --- Validate necessary cached data for the CURRENT desired state --- if base_map_pil is None: logging.warning(f"{log_prefix} Cannot recompose: Base map image missing.") return # Cannot proceed without base map # Check data needed based on whether overlay is enabled or not if overlay_enabled and (processed_sar is None or warp_matrix is None): logging.error( f"{log_prefix} Cannot recompose overlay: SAR cache data missing " f"(ProcessedSAR:{processed_sar is not None}, WarpMatrix:{warp_matrix is not None})." 
"Consider triggering a full update." ) # Decide fallback: Trigger full update or just show base map? # For now, just log and exit, letting the user trigger full update if needed. return elif not overlay_enabled and sar_corner_pixels is None: # If overlay is off, we need corner pixels to draw the box logging.warning( f"{log_prefix} Cannot draw bounding box during recompose: Corner pixels missing." "Showing base map without box." ) # Fallback: Use base map directly, skip drawing box/overlay final_recomposed_pil = base_map_pil.copy() # Start with base map # Skip further processing, jump to scale bar / queuing later # --- Recomposition Steps (only if not fallen back to base map already) --- if final_recomposed_pil is None: logging.debug(f"{log_prefix} Starting recomposition drawing...") map_cv_bgr = cv2.cvtColor(np.array(base_map_pil), cv2.COLOR_RGB2BGR) map_h, map_w = map_cv_bgr.shape[:2] display_cv_bgr = map_cv_bgr.copy() # Start with base map # Apply Overlay if enabled (we already checked cache validity) if overlay_enabled: logging.debug(f"{log_prefix} Recomposing SAR overlay (Alpha: {overlay_alpha:.2f})...") try: # Warp using cached matrix warped_sar_bgr = cv2.warpPerspective( processed_sar, warp_matrix, (map_w, map_h), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0), ) # Create mask warped_sar_gray = cv2.cvtColor(warped_sar_bgr, cv2.COLOR_BGR2GRAY) _, sar_mask = cv2.threshold(warped_sar_gray, 1, 255, cv2.THRESH_BINARY) # Blend using current alpha mask_alpha = (sar_mask.astype(np.float32) / 255.0) * overlay_alpha mask_alpha_3ch = cv2.cvtColor(mask_alpha, cv2.COLOR_GRAY2BGR) term1 = map_cv_bgr.astype(np.float32) * (1.0 - mask_alpha_3ch) term2 = warped_sar_bgr.astype(np.float32) * mask_alpha_3ch blended_float = term1 + term2 display_cv_bgr = np.clip(blended_float, 0, 255).astype(np.uint8) # Update display image except Exception as e: logging.exception(f"{log_prefix} Error during SAR overlay recomposition warp/blend:") # Fallback to base 
map if recompose fails catastrophically display_cv_bgr = map_cv_bgr # Reset to base map # Draw Bounding Box if overlay disabled (and corners exist) elif sar_corner_pixels is not None: logging.debug(f"{log_prefix} Recomposing SAR bounding box...") try: pts = np.array(sar_corner_pixels, np.int32).reshape((-1, 1, 2)) cv2.polylines(display_cv_bgr, [pts], isClosed=True, color=(0, 0, 255), thickness=2) except Exception as e: logging.exception(f"{log_prefix} Error recomposing bounding box:") # display_cv_bgr remains base map # --- >>> START OF MAP MARKER DRAWING <<< --- # Draw Map Click Marker onto the recomposed BGR image if map_click_coords and isinstance(map_click_coords, tuple) and len(map_click_coords) == 2: try: marker_x, marker_y = map_click_coords marker_color = (0, 0, 255) # BGR for Red marker_type = cv2.MARKER_CROSS marker_size = 15 marker_thickness = 2 logging.debug(f"{log_prefix} Recomposing: Drawing Map click marker at {map_click_coords}") # Draw directly onto display_cv_bgr cv2.drawMarker(display_cv_bgr, (marker_x, marker_y), marker_color, marker_type, marker_size, marker_thickness) except Exception as draw_err: logging.exception(f"{log_prefix} Recomposing: Error drawing Map marker: {draw_err}") # --- >>> END OF MAP MARKER DRAWING <<< --- # Convert final OpenCV image back to PIL final_recomposed_pil = Image.fromarray( cv2.cvtColor(display_cv_bgr, cv2.COLOR_BGR2RGB) ) # End of drawing logic if not fallen back to base map # --- Add Scale Bar (if possible) --- if final_recomposed_pil and zoom is not None and math.isfinite(latitude_deg): # Add scale bar to the potentially modified final_recomposed_pil final_recomposed_pil = self._draw_scale_bar( final_recomposed_pil, latitude_deg, zoom ) elif final_recomposed_pil: # Log if scale bar cannot be added logging.warning( f"{log_prefix} Cannot draw scale bar during recompose (missing zoom/lat context? Zoom:{zoom}, LatFinite:{math.isfinite(latitude_deg)})." 
) # --- Update state and Queue --- # Queue the recomposed image (or the base map fallback) for display # This call also updates AppState.last_composed_map_pil self._queue_map_for_display(final_recomposed_pil) except Exception as e: logging.exception(f"{log_prefix} Unexpected error during map recomposition:") # Attempt to queue base map as fallback if something unexpected happened if not self._app_state.shutting_down: base_map = self._app_state.last_map_image_pil if base_map: logging.warning(f"{log_prefix} Queuing base map as fallback due to error.") self._queue_map_for_display(base_map) else: logging.error(f"{log_prefix} Cannot queue fallback, base map is also None.") finally: self._map_update_lock.release() # Release lock regardless of success/failure logging.debug(f"{log_prefix} Map recomposition finished.") def _process_sar_for_overlay( self, sar_normalized_uint8: np.ndarray ) -> Optional[np.ndarray]: """Applies B/C LUT and Palette to SAR image for overlay.""" # (Implementation unchanged) log_prefix = f"{self._log_prefix} ProcessSAROverlay" try: bc_lut = self._app_state.brightness_contrast_lut palette = self._app_state.sar_palette if bc_lut is None: raise ValueError("B/C LUT is None") processed_sar = cv2.LUT(sar_normalized_uint8, bc_lut) if palette != "GRAY": processed_sar = apply_color_palette(processed_sar, palette) elif processed_sar.ndim == 2: processed_sar = cv2.cvtColor(processed_sar, cv2.COLOR_GRAY2BGR) return processed_sar except Exception as e: logging.exception(f"{log_prefix} Error:") return None def _get_bounds_for_tile_range( self, zoom: int, tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]] ) -> Optional[Tuple[float, float, float, float]]: """Calculate the precise geographic bounds covered by a given range of tiles.""" # (Implementation corrected in previous step) log_prefix = f"{self._log_prefix} TileBounds" if mercantile is None: return None try: min_x, max_x = tile_ranges[0] min_y, max_y = tile_ranges[1] ul_bound = mercantile.bounds(min_x, 
min_y, zoom) lr_bound = mercantile.bounds(max_x, max_y, zoom) west = ul_bound[0] east = lr_bound[2] south = lr_bound[1] north = ul_bound[3] return (west, south, east, north) except Exception as e: logging.exception( f"{log_prefix} Error calculating bounds for tile range: {e}" ) return None def _update_app_state_map_context( self, base_map_pil: Optional[ImageType], bounds_deg: Optional[Tuple[float, float, float, float]], zoom: Optional[int], ): """Safely updates map context information in AppState.""" # (Implementation unchanged) log_prefix = f"{self._log_prefix} UpdateMapContext" try: self._app_state.last_map_image_pil = ( base_map_pil.copy() if base_map_pil else None ) self._app_state.map_current_bounds_deg = bounds_deg self._app_state.map_current_zoom = zoom # Shape is updated later by display_map logging.debug( f"{log_prefix} Updated AppState: Bounds={bounds_deg}, Zoom={zoom}, BasePIL={'Set' if base_map_pil else 'None'}" ) except Exception as e: logging.exception(f"{log_prefix} Error updating AppState map context:") # --- >>> START OF MODIFIED FUNCTION <<< --- def _queue_map_for_display(self, image_pil: Optional[ImageType]): """Puts the final composed map image onto the Tkinter queue AND updates composed state.""" log_prefix = f"{self._log_prefix} QueueMap" payload_type = type(image_pil).__name__ if image_pil else "None" # --- Update composed map state --- try: self._app_state.last_composed_map_pil = ( image_pil.copy() if image_pil else None ) logging.debug( f"{log_prefix} Updated last_composed_map_pil in AppState (Type: {payload_type})." 
) except Exception as e: logging.exception( f"{log_prefix} Error updating last_composed_map_pil in AppState:" ) # --- Queue for display --- logging.debug( f"{log_prefix} Queueing SHOW_MAP command (Payload type: {payload_type})" ) put_queue(self._tkinter_queue, ("SHOW_MAP", image_pil), "tkinter", self._app) # def display_map(self, map_image_pil: Optional[ImageType]): """Instructs the MapDisplayWindow to show the provided map image and updates app state.""" # (Implementation unchanged) log_prefix = f"{self._log_prefix} Display" displayed_shape = None if self._map_display_window: try: self._map_display_window.show_map(map_image_pil) displayed_shape = self._map_display_window._last_displayed_shape if ( displayed_shape and displayed_shape[0] > 0 and displayed_shape[1] > 0 ): self._app_state.map_current_shape_px = displayed_shape else: self._app_state.map_current_shape_px = None self._update_app_status_after_map_load(map_image_pil is not None) except Exception as e: logging.exception( f"{log_prefix} Error calling MapDisplayWindow.show_map:" ) self._app_state.map_current_shape_px = None else: self._app_state.map_current_shape_px = None def _update_app_status_after_map_load(self, success: bool): """Updates the main application status after the initial map load attempt.""" # (Implementation unchanged) log_prefix = f"{self._log_prefix} Status Update" try: statusbar_ref = getattr(self._app, "statusbar", None) if statusbar_ref and "Loading initial map" in statusbar_ref.cget("text"): status_msg = "Error Loading Map" if success: # Determine correct 'Ready' status mode = self._app.state.test_mode_active is_local = config.USE_LOCAL_IMAGES is_net = not is_local and not mode if mode: status_msg = "Ready (Test Mode)" elif is_local: status_msg = "Ready (Local Mode)" elif is_net: status_msg = ( f"Listening UDP {self._app.local_ip}:{self._app.local_port}" if self._app.udp_socket else "Error: No Socket" ) else: status_msg = "Ready" self._app.set_status(status_msg) except Exception as e: 
logging.warning( f"{log_prefix} Error checking/updating app status after map load: {e}" ) def shutdown(self): """Cleans up map-related resources.""" # (Implementation unchanged) log_prefix = f"{self._log_prefix} Shutdown" if self._map_display_window: try: self._map_display_window.destroy_window() except Exception as e: logging.exception(f"{log_prefix} Error destroying MapDisplayWindow:") self._map_display_window = None logging.debug(f"{log_prefix} Map integration shutdown complete.") # --- Geo Calculation Helpers --- # (_calculate_sar_corners_geo, _geo_coords_to_map_pixels - Unchanged) def _calculate_sar_corners_geo( self, geo_info: Dict[str, Any] ) -> Optional[List[Tuple[float, float]]]: # (Implementation unchanged from previous version) log_prefix = f"{self._log_prefix} SAR Corners Geo" if not self._geod: return None try: lat_rad, lon_rad = geo_info["lat"], geo_info["lon"] orient_rad = geo_info["orientation"] ref_x, ref_y = geo_info["ref_x"], geo_info["ref_y"] scale_x, scale_y = geo_info["scale_x"], geo_info["scale_y"] width, height = geo_info["width_px"], geo_info["height_px"] calc_orient_rad = -orient_rad if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0): return None corners_pixel = [ (0 - ref_x, ref_y - 0), (width - 1 - ref_x, ref_y - 0), (width - 1 - ref_x, ref_y - (height - 1)), (0 - ref_x, ref_y - (height - 1)), ] corners_meters = [(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel] corners_meters_rotated = [] if abs(calc_orient_rad) > 1e-6: cos_o, sin_o = math.cos(calc_orient_rad), math.sin(calc_orient_rad) corners_meters_rotated = [ (dx * cos_o - dy * sin_o, dx * sin_o + dy * cos_o) for dx, dy in corners_meters ] else: corners_meters_rotated = corners_meters corners_geo_deg = [] lon_deg, lat_deg = math.degrees(lon_rad), math.degrees(lat_rad) for dx_m, dy_m in corners_meters_rotated: dist = math.hypot(dx_m, dy_m) az = math.degrees(math.atan2(dx_m, dy_m)) endlon, endlat, _ = self._geod.fwd(lon_deg, lat_deg, az, dist) 
corners_geo_deg.append((endlon, endlat)) return corners_geo_deg if len(corners_geo_deg) == 4 else None except KeyError as ke: logging.error(f"{log_prefix} Missing key: {ke}") return None except Exception as e: logging.exception(f"{log_prefix} Error:") return None def _geo_coords_to_map_pixels( self, coords_deg: List[Tuple[float, float]], map_bounds: Tuple[float, float, float, float], map_tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]], zoom: int, stitched_map_shape: Tuple[int, int], ) -> Optional[List[Tuple[int, int]]]: # (Implementation unchanged from previous version) log_prefix = f"{self._log_prefix} Geo to Pixel" if mercantile is None: return None if ( not stitched_map_shape or stitched_map_shape[0] <= 0 or stitched_map_shape[1] <= 0 ): return None pixel_coords = [] map_h, map_w = stitched_map_shape map_west, map_south, map_east, map_north = map_bounds try: map_ul_x, map_ul_y = mercantile.xy(map_west, map_north) map_lr_x, map_lr_y = mercantile.xy(map_east, map_south) map_w_merc = map_lr_x - map_ul_x map_h_merc = map_ul_y - map_lr_y if map_w_merc <= 0 or map_h_merc <= 0: return None for lon, lat in coords_deg: pt_x, pt_y = mercantile.xy(lon, lat) rel_x = pt_x - map_ul_x rel_y = map_ul_y - pt_y px = int(round((rel_x / map_w_merc) * map_w)) py = int(round((rel_y / map_h_merc) * map_h)) pixel_coords.append( (max(0, min(px, map_w - 1)), max(0, min(py, map_h - 1))) ) return pixel_coords except Exception as e: logging.exception(f"{log_prefix} Error:") return None # --- Pixel to Geo Conversion --- def get_geo_coords_from_map_pixel( self, pixel_x: int, pixel_y: int ) -> Optional[Tuple[float, float]]: """Converts pixel coordinates from the map display window to geographic coordinates (lat, lon degrees).""" # (Implementation unchanged from previous version) log_prefix = f"{self._log_prefix} Pixel to Geo" map_bounds_deg = self._app_state.map_current_bounds_deg map_shape_px = self._app_state.map_current_shape_px if ( map_bounds_deg is None or map_shape_px is None or 
map_shape_px[0] <= 0 or map_shape_px[1] <= 0 or mercantile is None ): return None map_h, map_w = map_shape_px map_west, map_south, map_east, map_north = map_bounds_deg try: if not (0 <= pixel_x < map_w and 0 <= pixel_y < map_h): return None # Outside bounds map_ul_x, map_ul_y = mercantile.xy(map_west, map_north) map_lr_x, map_lr_y = mercantile.xy(map_east, map_south) map_w_merc = map_lr_x - map_ul_x map_h_merc = map_ul_y - map_lr_y if map_w_merc <= 0 or map_h_merc <= 0: return None rel_x = pixel_x / map_w rel_y = pixel_y / map_h pt_x = map_ul_x + (rel_x * map_w_merc) pt_y = map_ul_y - (rel_y * map_h_merc) lon_deg, lat_deg = mercantile.lnglat(pt_x, pt_y) return (lat_deg, lon_deg) except Exception as e: logging.exception(f"{log_prefix} Error:") return None # --- Map Saving --- def save_map_view_to_file( self, directory: Optional[str] = None, filename: Optional[str] = None ) -> bool: """Saves the last composed map view (from AppState.last_composed_map_pil) to a file.""" # (Implementation unchanged from previous version) log_prefix = f"{self._log_prefix} SaveMap" image_to_save = self._app_state.last_composed_map_pil if image_to_save is None: self._app.set_status("Error: No map view to save.") return False if Image is None: self._app.set_status("Error: Pillow library missing.") return False try: save_dir_path: Path if directory: save_dir_path = Path(directory) else: app_path = ( Path(sys.executable).parent if getattr(sys, "frozen", False) else Path(sys.argv[0]).parent ) save_dir_path = app_path / self.DEFAULT_SAVE_DIRECTORY save_dir_path.mkdir(parents=True, exist_ok=True) save_filename = ( (Path(filename).stem + ".png") if filename else f"map_view_{datetime.datetime.now():%Y%m%d_%H%M%S}.png" ) full_save_path = save_dir_path / save_filename if ImageDraw and ImageFont: # Add timestamp try: draw = ImageDraw.Draw(image_to_save) font = ( ImageFont.truetype("arial.ttf", 14) if ImageFont else ImageFont.load_default() ) text = f"Saved: {datetime.datetime.now():%Y-%m-%d 
%H:%M:%S}" draw.text( (10, image_to_save.height - 20), text, font=font, fill=(0, 0, 0) ) except Exception as draw_err: logging.warning( f"{log_prefix} Could not draw text on saved image: {draw_err}" ) image_to_save.save(full_save_path, "PNG") logging.info(f"{log_prefix} Map view saved to: {full_save_path}") self._app.set_status(f"Map view saved: {save_filename}") return True except OSError as e: logging.error( f"{log_prefix} OS Error saving map view to '{full_save_path}': {e}" ) self._app.set_status("Error: Failed to save map view (OS Error).") return False except Exception as e: logging.exception(f"{log_prefix} Unexpected error saving map view:") self._app.set_status("Error: Failed to save map view (Exception).") return False # --- Scale Bar Drawing --- def _draw_scale_bar( self, image_pil: Optional[ImageType], latitude_deg: float, zoom: int ) -> Optional[ImageType]: """Draws a simple scale bar onto the map image (PIL). Returns modified image or original on error.""" log_prefix = f"{self._log_prefix} ScaleBar" if image_pil is None or not (Image and cv2): return image_pil # Return original if no image or dependencies missing try: meters_per_pixel = calculate_meters_per_pixel(latitude_deg, zoom) if meters_per_pixel is None or meters_per_pixel <= 0: logging.warning( f"{log_prefix} Invalid meters/pixel ({meters_per_pixel}). Skipping scale bar." 
) return image_pil # Cannot calculate scale img_w, img_h = image_pil.size # Target bar width relative to image width target_bar_px = max( 50, min(150, img_w // 5) ) # Aim for ~1/5 width, 50-150px # Predefined "nice" distances in kilometers possible_distances_km = [ 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, ] best_dist_km = None # Initialize best distance found min_diff = float("inf") # Initialize minimum difference # Find the "nicest" distance that results in a bar length close to the target for dist_km in possible_distances_km: pixels_for_dist = (dist_km * 1000.0) / meters_per_pixel # Ensure the calculated bar length is reasonable (e.g., >= 10 pixels) if pixels_for_dist >= 10: # --- >>> START OF FIX <<< --- # Calculate difference *after* ensuring pixels_for_dist is valid diff = abs(pixels_for_dist - target_bar_px) # If this difference is smaller than the current minimum, update best distance if diff < min_diff: min_diff = diff best_dist_km = dist_km # --- >>> END OF FIX <<< --- # If no suitable distance was found (e.g., very high zoom, small image) if best_dist_km is None: logging.warning( f"{log_prefix} Could not find suitable scale bar distance for zoom {zoom} and image width {img_w}. Skipping." 
) return image_pil # Return original image # Calculate final bar length in pixels for the best distance found scale_distance_km = best_dist_km scale_distance_meters = scale_distance_km * 1000.0 scale_bar_pixels = int(round(scale_distance_meters / meters_per_pixel)) # Final check if calculated bar is somehow too small (redundant due to pixels>=10 check) if scale_bar_pixels < 10: return image_pil # Should not happen now # --- Draw the scale bar using OpenCV --- map_cv = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR) # Define bar parameters (position, size, color) bar_x_start = 15 # X position of the start of the bar bar_y = map_cv.shape[0] - 25 # Y position (near bottom) bar_tick_height = 6 # Height of the end ticks bar_thickness = 2 text_gap = 5 # Gap between bar and text label text_color = (0, 0, 0) # Black bar_color = (0, 0, 0) # Black # Draw main horizontal line cv2.line( map_cv, (bar_x_start, bar_y), (bar_x_start + scale_bar_pixels, bar_y), bar_color, bar_thickness, ) # Draw vertical ticks at ends cv2.line( map_cv, (bar_x_start, bar_y - bar_tick_height // 2), (bar_x_start, bar_y + bar_tick_height // 2), bar_color, bar_thickness, ) cv2.line( map_cv, (bar_x_start + scale_bar_pixels, bar_y - bar_tick_height // 2), (bar_x_start + scale_bar_pixels, bar_y + bar_tick_height // 2), bar_color, bar_thickness, ) # Prepare label text (e.g., "10 km", "500 m") if scale_distance_km >= 1: label = ( f"{int(scale_distance_km)} km" if scale_distance_km.is_integer() else f"{scale_distance_km:.1f} km" ) else: label = f"{int(scale_distance_meters)} m" # Add text label above the bar font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 0.5 font_thickness = 1 (text_w, text_h), _ = cv2.getTextSize( label, font, font_scale, font_thickness ) # Calculate centered X position for the text text_x = max(5, bar_x_start + (scale_bar_pixels // 2) - (text_w // 2)) # Calculate Y position above the bar text_y = bar_y - bar_tick_height // 2 - text_gap # Draw text cv2.putText( map_cv, label, (text_x, 
text_y), font, font_scale, text_color, font_thickness, cv2.LINE_AA, ) # Convert back to PIL image image_pil_with_scale = Image.fromarray( cv2.cvtColor(map_cv, cv2.COLOR_BGR2RGB) ) logging.debug( f"{log_prefix} Scale bar drawn ({label}, {scale_bar_pixels}px)." ) return image_pil_with_scale except Exception as e: logging.exception(f"{log_prefix} Error drawing scale bar:") return image_pil # Return original image on any unexpected error def _get_fallback_map(self) -> Optional[ImageType]: """Returns the last known base map or None.""" # Helper to reduce code duplication in error handling return self._app_state.last_map_image_pil # --- END OF FILE map_integration.py ---