# --- START OF FILE map_integration.py ---

# map_integration.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.

Manages map integration functionalities, including service interaction,
tile fetching/caching, display window management, and overlay updates.
Acts as an intermediary between the main application and map-specific modules.
"""

# Standard library imports
import logging
import threading
import queue  # For type hinting
import math
from typing import Optional, Dict, Any, Tuple, List

# Third-party imports
import numpy as np

try:
    from PIL import Image, ImageOps  # ImageOps might be useful
except ImportError:
    Image = None
    ImageOps = None
try:
    import mercantile
except ImportError:
    mercantile = None
try:
    import pyproj
except ImportError:
    pyproj = None
# OpenCV is needed for warping and blending
import cv2

# Local application imports
import config
from app_state import AppState
from utils import put_queue

# Map specific modules that this manager orchestrates
from map_services import get_map_service, BaseMapService
from map_manager import MapTileManager
from map_utils import (
    get_bounding_box_from_center_size,
    get_tile_ranges_for_bbox,
    MapCalculationError,
    calculate_meters_per_pixel,
)
from map_display import MapDisplayWindow

# Import image processing function needed for overlay
from image_processing import apply_color_palette

# Forward declaration for type hinting App instance
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Use the new main app class name if renamed
    from ControlPanel import ControlPanelApp


class MapIntegrationManager:
    """Orchestrates map services, tile management, and display including SAR overlay.

    NOTE: annotations that mention the optional libraries (Pillow, mercantile)
    are written as string forward references on purpose. Evaluating e.g.
    ``Image.Image`` at definition time would raise ``AttributeError`` when the
    guarded import above fell back to ``None``, which would hide the explicit
    ``ImportError`` raised by ``__init__``.
    """

    def __init__(
        self,
        app_state: AppState,
        tkinter_queue: queue.Queue,
        app: "ControlPanelApp",  # Use correct type hint for main app class
        map_x: int,
        map_y: int,
    ):
        """
        Initializes the MapIntegrationManager.

        Args:
            app_state (AppState): Reference to the shared application state.
            tkinter_queue (queue.Queue): Queue for sending display commands to the main thread.
            app (ControlPanelApp): Reference to the main application instance.
            map_x (int): Initial X position for the map window.
            map_y (int): Initial Y position for the map window.

        Raises:
            ImportError: If required libraries (Pillow, mercantile, pyproj) are missing.
            ValueError: If the configured map service cannot be loaded.
            pyproj.exceptions.CRSError: If the geodetic calculator fails to initialize.
        """
        self._log_prefix = "[MapIntegrationManager]"
        logging.debug(f"{self._log_prefix} Initializing...")

        self._app_state: AppState = app_state
        self._tkinter_queue: queue.Queue = tkinter_queue
        self._app: "ControlPanelApp" = app  # Store App instance

        # --- Dependency Checks ---
        if Image is None:
            raise ImportError("Pillow library not found")
        if mercantile is None:
            raise ImportError("mercantile library not found")
        if pyproj is None:
            raise ImportError("pyproj library not found")

        # --- Initialize Attributes ---
        self._map_service: Optional[BaseMapService] = None
        self._map_tile_manager: Optional[MapTileManager] = None
        self._map_display_window: Optional[MapDisplayWindow] = None
        self._map_initial_display_thread: Optional[threading.Thread] = None
        self._geod: "Optional[pyproj.Geod]" = None

        try:
            # --- Geodetic Calculator Initialization ---
            self._geod = pyproj.Geod(ellps="WGS84")
            logging.debug(f"{self._log_prefix} pyproj Geod object initialized (WGS84).")

            # --- Initialize Other Map Components ---
            # 1. Get Map Service
            service_name = getattr(config, "MAP_SERVICE_PROVIDER", "osm")
            api_key = getattr(config, "MAP_API_KEY", None)
            self._map_service = get_map_service(service_name, api_key)
            if not self._map_service:
                raise ValueError(f"Failed to get map service '{service_name}'.")
            logging.debug(
                f"{self._log_prefix} Map service '{self._map_service.name}' loaded."
            )

            # 2. Create Tile Manager
            cache_dir = getattr(config, "MAP_CACHE_DIRECTORY", None)
            online_fetch = getattr(config, "ENABLE_ONLINE_MAP_FETCHING", None)
            self._map_tile_manager = MapTileManager(
                map_service=self._map_service,
                cache_base_dir=cache_dir,
                enable_online_fetching=online_fetch,
            )
            logging.debug(f"{self._log_prefix} MapTileManager created.")

            # 3. Create Map Display Window Manager (Passing App instance)
            self._map_display_window = MapDisplayWindow(
                app=self._app,  # Pass the app instance
                window_name="Map Overlay",
                x_pos=map_x,
                y_pos=map_y,
            )
            logging.debug(
                f"{self._log_prefix} MapDisplayWindow created at ({map_x},{map_y})."
            )

            # 4. Trigger Initial Map Display in Background Thread
            logging.debug(f"{self._log_prefix} Starting initial map display thread...")
            self._app.set_status("Loading initial map...")  # Set initial status
            self._map_initial_display_thread = threading.Thread(
                target=self._display_initial_map_area_thread,
                name="InitialMapDisplayThread",
                daemon=True,
            )
            self._map_initial_display_thread.start()

        except (ImportError, ValueError, pyproj.exceptions.CRSError) as init_err:
            logging.critical(f"{self._log_prefix} Initialization failed: {init_err}")
            self._reset_components()
            raise  # Re-raise critical errors
        except Exception:
            logging.exception(
                f"{self._log_prefix} Unexpected error during initialization:"
            )
            self._reset_components()
            raise  # Re-raise critical errors

    def _reset_components(self):
        """Drops references to all map components after a failed initialization."""
        self._geod = None
        self._map_service = None
        self._map_tile_manager = None
        self._map_display_window = None

    def _display_initial_map_area_thread(self):
        """
        (Runs in background thread) Calculates initial map area and queues for display.
        Skips if default lat/lon in config are (0,0).
        Stores the resulting PIL image in AppState before queueing.
        """
        log_prefix = f"{self._log_prefix} InitialMap"
        map_image_pil = None  # Optional PIL image; stays None on any failure

        # Check skip condition based on config defaults
        if config.SAR_CENTER_LAT == 0.0 and config.SAR_CENTER_LON == 0.0:
            logging.debug(
                f"{log_prefix} Initial map display skipped (config default 0,0). Waiting for valid GeoInfo."
            )
            # Update status is handled by _update_app_status_after_map_load called by display_map
            # Send None to potentially create placeholder window
            self._app_state.last_map_image_pil = None  # Ensure state is cleared
            put_queue(self._tkinter_queue, ("SHOW_MAP", None), "tkinter", self._app)
            return

        # Proceed with calculation if default lat/lon are non-zero
        logging.debug(
            f"{log_prefix} Calculating initial map area based on non-zero config defaults..."
        )
        if not (self._map_tile_manager and self._map_display_window):
            logging.error(
                f"{log_prefix} Map components not initialized. Aborting thread."
            )
            self._app_state.last_map_image_pil = None
            put_queue(self._tkinter_queue, ("SHOW_MAP", None), "tkinter", self._app)
            return
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Shutdown detected. Aborting.")
            return

        try:
            zoom = config.DEFAULT_MAP_ZOOM_LEVEL
            logging.debug(f"{log_prefix} Using default zoom level: {zoom}")

            bbox = get_bounding_box_from_center_size(
                config.SAR_CENTER_LAT, config.SAR_CENTER_LON, config.SAR_IMAGE_SIZE_KM
            )
            if bbox is None:
                raise MapCalculationError("Failed to calculate initial bounding box.")

            tile_ranges = get_tile_ranges_for_bbox(bbox, zoom)
            if tile_ranges is None:
                raise MapCalculationError("Failed to calculate initial tile ranges.")

            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown detected before stitching.")
                return

            logging.debug(
                f"{log_prefix} Stitching initial map tiles (Zoom: {zoom}, X: {tile_ranges[0]}, Y: {tile_ranges[1]})..."
            )
            map_image_pil = self._map_tile_manager.stitch_map_image(
                zoom, tile_ranges[0], tile_ranges[1]
            )

            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown detected after stitching.")
                return

            if map_image_pil:
                logging.debug(f"{log_prefix} Initial map area stitched successfully.")
                center_lat_deg = config.SAR_CENTER_LAT
                # Add scale bar
                map_image_pil = self._draw_scale_bar(
                    map_image_pil, center_lat_deg, zoom
                )
            else:
                logging.error(f"{log_prefix} Failed to stitch initial map area.")

        except (ImportError, MapCalculationError) as e:
            logging.error(f"{log_prefix} Calculation error: {e}")
            map_image_pil = None
        except Exception:
            logging.exception(f"{log_prefix} Unexpected error calculating initial map:")
            map_image_pil = None
        finally:
            # --- Store final image in AppState and Queue ---
            # Runs on the early "shutdown" returns above too, hence the re-check.
            if not self._app_state.shutting_down:
                logging.debug(
                    f"{log_prefix} Storing initial map in AppState before queueing."
                )
                # Store copy in state
                self._app_state.last_map_image_pil = (
                    map_image_pil.copy() if map_image_pil else None
                )
                logging.debug(
                    f"{log_prefix} Queueing SHOW_MAP command for initial map."
                )
                put_queue(
                    self._tkinter_queue,
                    ("SHOW_MAP", map_image_pil),
                    "tkinter",
                    self._app,
                )
            logging.debug(f"{log_prefix} Initial map display thread finished.")

    def _prepare_sar_for_overlay(
        self, sar_normalized_uint8: np.ndarray, log_prefix: str
    ) -> np.ndarray:
        """Applies the B/C LUT and palette from AppState and returns the SAR image for warping.

        Raises:
            ValueError: If the brightness/contrast LUT in AppState is None.
        """
        bc_lut = self._app_state.brightness_contrast_lut
        palette = self._app_state.sar_palette
        if bc_lut is None:
            raise ValueError("B/C LUT is None in AppState")

        # Apply B/C LUT to the normalized SAR data
        logging.debug(f"{log_prefix} Applying B/C LUT to SAR data...")
        processed_sar = cv2.LUT(sar_normalized_uint8, bc_lut)

        if palette != "GRAY":
            logging.debug(f"{log_prefix} Applying Palette '{palette}' to SAR data...")
            processed_sar = apply_color_palette(processed_sar, palette)
        elif processed_sar.ndim == 2:
            # Ensure BGR format even for grayscale for consistent warping/blending
            processed_sar = cv2.cvtColor(processed_sar, cv2.COLOR_GRAY2BGR)

        logging.debug(
            f"{log_prefix} SAR processed for warp (Shape: {processed_sar.shape})."
        )
        return processed_sar

    def _warp_and_blend_sar(
        self,
        map_cv_bgr: np.ndarray,
        sar_img_to_warp: np.ndarray,
        sar_corners_pixels_map: List[Tuple[int, int]],
        log_prefix: str,
    ) -> np.ndarray:
        """Warps the SAR image onto the map grid and alpha-blends it inside its footprint only.

        The SAR corner order must be top-left, top-right, bottom-right,
        bottom-left, matching ``sar_corners_pixels_map``. Pixels outside the
        warped footprint keep the original map values.
        """
        map_h, map_w = map_cv_bgr.shape[:2]
        sar_h, sar_w = sar_img_to_warp.shape[:2]

        # Source points: corners of the original SAR image (0,0 top-left)
        pts_sar = np.float32(
            [
                [0, 0],
                [sar_w - 1, 0],
                [sar_w - 1, sar_h - 1],
                [0, sar_h - 1],
            ]
        )
        # Destination points: calculated SAR corners on the map pixel grid
        pts_map = np.float32(sar_corners_pixels_map)

        matrix = cv2.getPerspectiveTransform(pts_sar, pts_map)
        logging.debug(f"{log_prefix} Perspective transform matrix calculated.")

        # Warp onto a canvas the size of the map. A constant (zero) border is
        # used instead of BORDER_TRANSPARENT: with a freshly allocated
        # destination the "transparent" pixels would be undefined.
        warped_sar = cv2.warpPerspective(
            src=sar_img_to_warp,
            M=matrix,
            dsize=(map_w, map_h),  # Output size = map size
            flags=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_CONSTANT,
            borderValue=0,
        )
        # Warp an all-ones mask with the same transform to know exactly which
        # map pixels are covered by SAR data.
        footprint_mask = cv2.warpPerspective(
            src=np.full((sar_h, sar_w), 255, dtype=np.uint8),
            M=matrix,
            dsize=(map_w, map_h),
            flags=cv2.INTER_NEAREST,
            borderMode=cv2.BORDER_CONSTANT,
            borderValue=0,
        )
        logging.debug(f"{log_prefix} SAR image warped successfully.")

        # Alpha Blend
        overlay_alpha = self._app_state.map_sar_overlay_alpha
        map_weight = 1.0 - overlay_alpha
        logging.debug(
            f"{log_prefix} Blending map ({map_weight:.2f}) and warped SAR ({overlay_alpha:.2f})..."
        )
        blended_cv = cv2.addWeighted(
            src1=map_cv_bgr,
            alpha=map_weight,
            src2=warped_sar,
            beta=overlay_alpha,
            gamma=0.0,  # No scalar added
        )

        # Keep the untouched map outside the footprint; blending the whole
        # canvas would darken everything not covered by the SAR image.
        inside_footprint = footprint_mask > 0
        result = map_cv_bgr.copy()
        result[inside_footprint] = blended_cv[inside_footprint]
        logging.debug(f"{log_prefix} Alpha blending complete.")
        return result

    def _base_map_with_scale_bar(
        self,
        stitched_map_pil: "Optional[Image.Image]",
        latitude_deg: Optional[float],
        zoom: Optional[int],
    ) -> "Optional[Image.Image]":
        """Returns the plain stitched map with a scale bar, used as fallback on errors.

        Falls back to the config defaults for latitude/zoom when the actual
        values were not determined before the error occurred.
        """
        if stitched_map_pil is None:
            return None
        if latitude_deg is None:
            latitude_deg = config.SAR_CENTER_LAT
        if zoom is None:
            zoom = config.DEFAULT_MAP_ZOOM_LEVEL
        return self._draw_scale_bar(stitched_map_pil, latitude_deg, zoom)

    # --- >>> START OF MODIFIED FUNCTION <<< ---
    def update_map_overlay(
        self, sar_normalized_uint8: np.ndarray, geo_info_radians: Dict[str, Any]
    ):
        """
        Calculates the map overlay. If enabled in AppState, it stitches the map,
        processes the SAR image (B/C LUT, Palette), warps it to the geographic footprint,
        blends it with the map using alpha from AppState, adds a scale bar,
        and queues the result. Otherwise, it draws the red bounding box.
        Stores the final PIL image in AppState before queueing.

        Args:
            sar_normalized_uint8 (np.ndarray): Normalized SAR image; only required
                when the SAR overlay is enabled in AppState.
            geo_info_radians (Dict[str, Any]): GeoInfo dictionary (angles in radians)
                that must contain a truthy "valid" entry.
        """
        log_prefix = f"{self._log_prefix} Map Update"
        map_image_final = None  # Final PIL image to queue
        stitched_map_pil = None  # Base stitched map (PIL)
        map_cv_final = None  # Final OpenCV (BGR) image
        # Scale-bar parameters of the stitched map; None until known.
        center_lat_deg: Optional[float] = None
        zoom: Optional[int] = None

        # --- Prerequisite Checks ---
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Skipping: Shutdown.")
            return
        if self._app_state.test_mode_active:
            logging.debug(f"{log_prefix} Skipping: Test Mode.")
            return
        if not (self._map_tile_manager and self._map_display_window and self._geod):
            logging.warning(f"{log_prefix} Skipping: Map components not ready.")
            return
        if not geo_info_radians or not geo_info_radians.get("valid", False):
            logging.warning(f"{log_prefix} Skipping: Invalid GeoInfo.")
            return
        # Check libraries needed for this function
        if Image is None or mercantile is None or pyproj is None or cv2 is None:
            logging.error(
                f"{log_prefix} Skipping: Missing required libraries (Pillow/mercantile/pyproj/cv2)."
            )
            return

        # --- Check SAR data only if overlay is enabled ---
        overlay_enabled = self._app_state.map_sar_overlay_enabled
        if overlay_enabled and (
            sar_normalized_uint8 is None or sar_normalized_uint8.size == 0
        ):
            logging.warning(f"{log_prefix} Skipping SAR overlay: Missing SAR data.")
            # Disable overlay just for this frame if SAR data is missing
            overlay_enabled = False

        logging.debug(
            f"{log_prefix} Starting map overlay update (SAR Overlay Enabled: {overlay_enabled})..."
        )

        try:
            # --- Calculate Common Parameters ---
            center_lat_deg = math.degrees(geo_info_radians.get("lat", 0.0))
            center_lon_deg = math.degrees(geo_info_radians.get("lon", 0.0))
            scale_x = geo_info_radians.get("scale_x", 0.0)
            width_px = geo_info_radians.get("width_px", 0)
            size_km = config.SAR_IMAGE_SIZE_KM  # Default fallback
            if scale_x > 0 and width_px > 0:
                size_km = (scale_x * width_px) / 1000.0
            else:
                logging.error(
                    f"{log_prefix} Invalid scale/width in GeoInfo. Using fallback size."
                )
            zoom = config.DEFAULT_MAP_ZOOM_LEVEL

            # --- Fetch and Stitch Base Map ---
            logging.debug(
                f"{log_prefix} Calculating fetch BBox (Center: {center_lat_deg:.4f},{center_lon_deg:.4f}, Size: {size_km*1.2:.1f}km)"
            )
            # Fetch slightly larger area than SAR image size
            fetch_bbox = get_bounding_box_from_center_size(
                center_lat_deg, center_lon_deg, size_km * 1.2
            )
            if fetch_bbox is None:
                raise MapCalculationError("Tile BBox calculation failed.")

            logging.debug(f"{log_prefix} Calculating tile ranges (Zoom {zoom})...")
            tile_ranges = get_tile_ranges_for_bbox(fetch_bbox, zoom)
            if tile_ranges is None:
                raise MapCalculationError("Tile range calculation failed.")

            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown detected before stitching.")
                return

            logging.debug(
                f"{log_prefix} Stitching base map tiles (Zoom: {zoom}, X: {tile_ranges[0]}, Y: {tile_ranges[1]})..."
            )
            stitched_map_pil = self._map_tile_manager.stitch_map_image(
                zoom, tile_ranges[0], tile_ranges[1]
            )
            if stitched_map_pil is None:
                raise MapCalculationError("Failed to stitch base map image.")
            logging.debug(
                f"{log_prefix} Base map stitched successfully (PIL Size: {stitched_map_pil.size})."
            )

            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown detected after stitching.")
                return

            # --- Calculate SAR Footprint Pixels on Map ---
            logging.debug(
                f"{log_prefix} Calculating SAR corner geographic coordinates..."
            )
            sar_corners_deg = self._calculate_sar_corners_geo(geo_info_radians)
            if sar_corners_deg is None:
                raise MapCalculationError("SAR corner geo calculation failed.")

            logging.debug(
                f"{log_prefix} Converting SAR corners to map pixel coordinates..."
            )
            # Need top-left tile coords and map size for conversion
            top_left_tile = mercantile.Tile(
                x=tile_ranges[0][0], y=tile_ranges[1][0], z=zoom
            )
            map_display_bounds = mercantile.bounds(top_left_tile)
            map_shape_yx = stitched_map_pil.size[::-1]  # (height, width) from PIL size
            # Corners are intentionally NOT clamped here: clamping an off-map
            # corner would move it and distort both the perspective transform
            # and the bounding box. OpenCV handles out-of-image points itself.
            sar_corners_pixels_map = self._geo_coords_to_map_pixels(
                coords_deg=sar_corners_deg,
                map_bounds=map_display_bounds,
                map_tile_ranges=tile_ranges,
                zoom=zoom,
                stitched_map_shape=map_shape_yx,  # Pass (height, width)
                clamp=False,
            )
            if sar_corners_pixels_map is None:
                raise MapCalculationError("SAR corner pixel conversion failed.")

            # --- Prepare Base Map Image (NumPy BGR) ---
            # Convert PIL map to NumPy BGR for OpenCV operations
            map_cv_bgr = cv2.cvtColor(np.array(stitched_map_pil), cv2.COLOR_RGB2BGR)

            # --- Apply SAR Overlay OR Draw Bounding Box ---
            if overlay_enabled:
                logging.debug(f"{log_prefix} Processing SAR image for overlay...")
                # 1. Process SAR Image (using current state)
                sar_img_to_warp = None
                try:
                    sar_img_to_warp = self._prepare_sar_for_overlay(
                        sar_normalized_uint8, log_prefix
                    )
                except Exception:
                    logging.exception(
                        f"{log_prefix} Error processing SAR image for overlay:"
                    )
                    # Fallback: disable overlay for this frame if processing fails
                    overlay_enabled = False

                # 2./3. Warp and blend (if processing succeeded)
                if overlay_enabled and sar_img_to_warp is not None:
                    logging.debug(
                        f"{log_prefix} Warping SAR image onto map perspective..."
                    )
                    try:
                        map_cv_final = self._warp_and_blend_sar(
                            map_cv_bgr,
                            sar_img_to_warp,
                            sar_corners_pixels_map,
                            log_prefix,
                        )
                    except cv2.error:
                        logging.exception(
                            f"{log_prefix} OpenCV error during SAR warp/blend:"
                        )
                        # Fallback to red box on error
                        overlay_enabled = False
                    except Exception:
                        logging.exception(
                            f"{log_prefix} Unexpected error during SAR warp/blend:"
                        )
                        # Fallback to red box on error
                        overlay_enabled = False

            # --- Draw Red Box (If overlay disabled or failed) ---
            if not overlay_enabled:
                logging.debug(
                    f"{log_prefix} Drawing SAR bounding box (overlay disabled or failed)."
                )
                try:
                    # Reshape points for polylines
                    pts = np.array(sar_corners_pixels_map, np.int32).reshape((-1, 1, 2))
                    # Draw red polyline on the base map BGR image (in place)
                    cv2.polylines(
                        map_cv_bgr, [pts], isClosed=True, color=(0, 0, 255), thickness=2
                    )
                except Exception:
                    logging.exception(f"{log_prefix} Error drawing SAR bounding box:")
                # Map with the box, or just the base map if drawing failed
                map_cv_final = map_cv_bgr

            # --- Convert Final CV Image back to PIL ---
            if map_cv_final is not None:
                # Convert BGR back to RGB for PIL
                map_image_final = Image.fromarray(
                    cv2.cvtColor(map_cv_final, cv2.COLOR_BGR2RGB)
                )
                logging.debug(
                    f"{log_prefix} Final map image prepared for display (PIL)."
                )
            else:
                # This case should ideally not happen if map_cv_bgr was valid
                map_image_final = stitched_map_pil  # Fallback to base stitched map
                logging.error(f"{log_prefix} map_cv_final was None unexpectedly.")

            # --- Add Scale Bar (applies to the final PIL image) ---
            if map_image_final is not None:
                map_image_final = self._draw_scale_bar(
                    map_image_final, center_lat_deg, zoom
                )

        except MapCalculationError as e:
            logging.error(f"{log_prefix} Map overlay calculation failed: {e}")
            # Use base map (with scale bar, if it exists) on known calc error
            map_image_final = self._base_map_with_scale_bar(
                stitched_map_pil, center_lat_deg, zoom
            )
        except Exception:
            logging.exception(
                f"{log_prefix} Unexpected error during map overlay update:"
            )
            # Use base map (with scale bar, if it exists) on unexpected error
            map_image_final = self._base_map_with_scale_bar(
                stitched_map_pil, center_lat_deg, zoom
            )
        finally:
            # --- Store final image in AppState and Queue ---
            if not self._app_state.shutting_down:
                payload_type = type(map_image_final)
                payload_size = getattr(map_image_final, "size", "N/A")
                logging.debug(
                    f"{log_prefix} Storing final map overlay in AppState. Type: {payload_type}, Size: {payload_size}"
                )
                # Store copy in state
                self._app_state.last_map_image_pil = (
                    map_image_final.copy() if map_image_final else None
                )
                logging.debug(
                    f"{log_prefix} Queueing SHOW_MAP command for updated map overlay."
                )
                put_queue(
                    self._tkinter_queue,
                    ("SHOW_MAP", map_image_final),
                    "tkinter",
                    self._app,
                )
            else:
                logging.debug(f"{log_prefix} Skipping queue put due to shutdown.")

    # --- >>> END OF MODIFIED FUNCTION <<< ---

    def display_map(self, map_image_pil: "Optional[Image.Image]"):
        """Instructs the MapDisplayWindow to show the provided map image.

        Also refreshes the application status bar (see
        ``_update_app_status_after_map_load``); errors from the window are
        logged and not propagated.
        """
        log_prefix = f"{self._log_prefix} Display"
        if self._map_display_window:
            logging.debug(f"{log_prefix} Calling MapDisplayWindow.show_map...")
            try:
                self._map_display_window.show_map(map_image_pil)
                # Update app status only after initial load attempt completes
                self._update_app_status_after_map_load(map_image_pil is not None)
            except Exception:
                logging.exception(
                    f"{log_prefix} Error calling MapDisplayWindow.show_map:"
                )
        else:
            logging.warning(f"{log_prefix} Map display window not available.")

    def _update_app_status_after_map_load(self, success: bool):
        """Updates the main application status after the initial map load attempt.

        Only acts while the status bar still shows the "Loading initial map"
        message, so later calls do not overwrite unrelated status texts.
        """
        log_prefix = f"{self._log_prefix} Status Update"
        try:
            # Check if status bar still shows loading message
            statusbar_ref = getattr(self._app, "statusbar", None)
            if statusbar_ref and "Loading initial map" in statusbar_ref.cget("text"):
                status_msg = "Error Loading Map"  # Default if failed
                if success:
                    # Determine correct 'Ready' status based on current app mode
                    current_mode = self._app.state.test_mode_active
                    is_local = config.USE_LOCAL_IMAGES
                    is_network = not is_local and not current_mode
                    if current_mode:
                        status_msg = "Ready (Test Mode)"
                    elif is_local:
                        status_msg = "Ready (Local Mode)"
                    elif is_network:
                        socket_ok = self._app.udp_socket is not None
                        status_msg = (
                            f"Listening UDP {self._app.local_ip}:{self._app.local_port}"
                            if socket_ok
                            else "Error: No Socket"
                        )
                    else:  # Fallback
                        status_msg = "Ready"
                logging.debug(
                    f"{log_prefix} Initial map load finished (Success: {success}). "
                    f"Setting App status to: '{status_msg}'"
                )
                # Use the app's set_status method
                self._app.set_status(status_msg)
        except Exception as e:
            # Best effort: a status-bar problem must never break map display.
            logging.warning(
                f"{log_prefix} Error checking/updating app status after map load: {e}"
            )

    def shutdown(self):
        """Cleans up map-related resources."""
        log_prefix = f"{self._log_prefix} Shutdown"
        logging.debug(f"{log_prefix} Shutting down map integration components...")
        if self._map_display_window:
            logging.debug(f"{log_prefix} Requesting MapDisplayWindow destroy...")
            try:
                self._map_display_window.destroy_window()
            except Exception:
                logging.exception(f"{log_prefix} Error destroying MapDisplayWindow:")
            self._map_display_window = None
        logging.debug(f"{log_prefix} Map integration shutdown complete.")

    def _calculate_sar_corners_geo(
        self, geo_info: Dict[str, Any]
    ) -> Optional[List[Tuple[float, float]]]:
        """Calculates the geographic coordinates (lon, lat degrees) of the four SAR image corners.

        Args:
            geo_info (Dict[str, Any]): Must provide "lat", "lon", "orientation"
                (radians), "ref_x", "ref_y", "scale_x", "scale_y", "width_px"
                and "height_px".

        Returns:
            Four (lon, lat) tuples in the order top-left, top-right,
            bottom-right, bottom-left, or None on any error.
        """
        log_prefix = f"{self._log_prefix} SAR Corners Geo"
        logging.debug(f"{log_prefix} Calculating SAR corner geographic coordinates...")

        if not self._geod:
            logging.error(f"{log_prefix} Geodetic calculator not initialized.")
            return None

        try:
            # Extract data, handle potential KeyErrors
            center_lat_rad = geo_info["lat"]
            center_lon_rad = geo_info["lon"]
            orient_rad = geo_info["orientation"]
            ref_x = geo_info["ref_x"]
            ref_y = geo_info["ref_y"]
            scale_x = geo_info["scale_x"]
            scale_y = geo_info["scale_y"]
            width = geo_info["width_px"]
            height = geo_info["height_px"]
            # Inverse angle for calculation
            calc_orient_rad = -orient_rad

            if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
                logging.error(f"{log_prefix} Invalid scale/dimensions.")
                return None

            # Pixel coordinates relative to reference center
            # (x grows to the right, y is flipped so that it grows upwards)
            corners_pixel = [
                (0 - ref_x, ref_y - 0),  # Top-Left
                (width - 1 - ref_x, ref_y - 0),  # Top-Right
                (width - 1 - ref_x, ref_y - (height - 1)),  # Bottom-Right
                (0 - ref_x, ref_y - (height - 1)),  # Bottom-Left
            ]
            # Convert pixel offsets to meter offsets
            corners_meters = [(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel]

            # Apply rotation to meter offsets if angle is significant
            if abs(calc_orient_rad) > 1e-6:
                cos_o = math.cos(calc_orient_rad)
                sin_o = math.sin(calc_orient_rad)
                corners_meters_rotated = [
                    (dx_m * cos_o - dy_m * sin_o, dx_m * sin_o + dy_m * cos_o)
                    for dx_m, dy_m in corners_meters
                ]
                logging.debug(
                    f"{log_prefix} Applied rotation ({math.degrees(calc_orient_rad):.2f} deg) to meter offsets."
                )
            else:
                corners_meters_rotated = corners_meters
                logging.debug(f"{log_prefix} Skipping rotation (angle near zero).")

            # Calculate final geographic coordinates using forward geodetic problem
            sar_corners_geo_deg = []
            center_lon_deg = math.degrees(center_lon_rad)
            center_lat_deg = math.degrees(center_lat_rad)
            for dx_m_rot, dy_m_rot in corners_meters_rotated:
                # Distance from center to corner
                distance_m = math.hypot(dx_m_rot, dy_m_rot)
                # Azimuth (bearing) from center to corner: atan2(x, y) measures
                # the angle from North, clockwise.
                azimuth_deg = math.degrees(math.atan2(dx_m_rot, dy_m_rot))
                # Use geod.fwd to calculate endpoint coordinates
                endlon, endlat, _ = self._geod.fwd(
                    lons=center_lon_deg,
                    lats=center_lat_deg,
                    az=azimuth_deg,
                    dist=distance_m,
                )
                sar_corners_geo_deg.append((endlon, endlat))  # Append (lon, lat) tuple
                logging.debug(
                    f"{log_prefix} Corner: Dist={distance_m:.1f}m, Az={azimuth_deg:.2f}deg "
                    f"-> Lon={endlon:.6f}, Lat={endlat:.6f}"
                )

            if len(sar_corners_geo_deg) == 4:
                logging.debug(
                    f"{log_prefix} Successfully calculated 4 SAR corner coordinates."
                )
                return sar_corners_geo_deg
            logging.error(f"{log_prefix} Failed to calculate all 4 corners.")
            return None

        except KeyError as ke:
            logging.error(f"{log_prefix} Missing key in geo_info: {ke}")
            return None
        except Exception:
            logging.exception(f"{log_prefix} Error calculating SAR corners:")
            return None

    def _geo_coords_to_map_pixels(
        self,
        coords_deg: List[Tuple[float, float]],
        map_bounds: "mercantile.LngLatBbox",
        map_tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]],
        zoom: int,
        stitched_map_shape: Tuple[int, int],  # Expect (height, width)
        clamp: bool = True,
    ) -> Optional[List[Tuple[int, int]]]:
        """Converts geographic coordinates (lon, lat degrees) to pixel coordinates relative to the stitched map image.

        Args:
            coords_deg: (lon, lat) tuples in degrees.
            map_bounds: Accepted for interface compatibility; not used by the
                calculation, which derives the extent from ``map_tile_ranges``.
            map_tile_ranges: ((min_x, max_x), (min_y, max_y)) tile indices of
                the stitched map.
            zoom: Zoom level of the tiles.
            stitched_map_shape: (height, width) of the stitched map in pixels.
            clamp: When True (default) each result is clamped into the image
                rectangle. Pass False when the points feed a geometric
                transform, because clamping moves off-map points and distorts
                the shape.

        Returns:
            A list of (x, y) pixel tuples, or None on any error.
        """
        log_prefix = f"{self._log_prefix} Geo to Pixel"
        logging.debug(
            f"{log_prefix} Converting {len(coords_deg)} geo coordinates to map pixels..."
        )

        if mercantile is None:
            logging.error(f"{log_prefix} Mercantile library not available.")
            return None
        if (
            not stitched_map_shape
            or stitched_map_shape[0] <= 0
            or stitched_map_shape[1] <= 0
        ):
            logging.error(f"{log_prefix} Invalid map shape: {stitched_map_shape}")
            return None

        pixel_coords = []
        map_height_px, map_width_px = stitched_map_shape

        try:
            # Get tile info for the top-left corner of the stitched map
            min_tile_x = map_tile_ranges[0][0]
            min_tile_y = map_tile_ranges[1][0]
            # Get mercator bounds of the top-left tile
            tl_tile_mercator_bounds = mercantile.xy_bounds(min_tile_x, min_tile_y, zoom)
            # Map origin is the top-left corner in Mercator coordinates
            map_origin_x_mercator = tl_tile_mercator_bounds.left
            map_origin_y_mercator = tl_tile_mercator_bounds.top  # Top has higher Y value

            # Calculate total map span in Mercator units
            max_tile_x = map_tile_ranges[0][1]
            max_tile_y = map_tile_ranges[1][1]
            br_tile_mercator_bounds = mercantile.xy_bounds(max_tile_x, max_tile_y, zoom)
            map_total_width_mercator = (
                br_tile_mercator_bounds.right - map_origin_x_mercator
            )
            # Height is Top Y - Bottom Y
            map_total_height_mercator = (
                map_origin_y_mercator - br_tile_mercator_bounds.bottom
            )

            if map_total_width_mercator <= 0 or map_total_height_mercator <= 0:
                logging.error(f"{log_prefix} Invalid map span in Mercator units.")
                return None

            # Convert each geographic coordinate to pixel coordinate
            for lon, lat in coords_deg:
                # Convert geographic coords to mercator coords
                point_x_mercator, point_y_mercator = mercantile.xy(lon, lat)
                # Calculate position relative to map origin in mercator units
                relative_x_mercator = point_x_mercator - map_origin_x_mercator
                # Y difference is inverted because map origin Y is top, point Y decreases going down
                relative_y_mercator = map_origin_y_mercator - point_y_mercator

                # Convert relative mercator position to pixel position using ratio
                pixel_x = int(
                    round(
                        (relative_x_mercator / map_total_width_mercator) * map_width_px
                    )
                )
                pixel_y = int(
                    round(
                        (relative_y_mercator / map_total_height_mercator)
                        * map_height_px
                    )
                )

                if clamp:
                    # Clamp pixel coordinates to be within the map boundaries
                    pixel_x_clamped = max(0, min(pixel_x, map_width_px - 1))
                    pixel_y_clamped = max(0, min(pixel_y, map_height_px - 1))
                    if pixel_x != pixel_x_clamped or pixel_y != pixel_y_clamped:
                        logging.debug(
                            f"{log_prefix} Clamped pixel coords for ({lon:.4f},{lat:.4f}): "
                            f"Orig=({pixel_x},{pixel_y}), Clamped=({pixel_x_clamped},{pixel_y_clamped})"
                        )
                else:
                    pixel_x_clamped, pixel_y_clamped = pixel_x, pixel_y

                pixel_coords.append((pixel_x_clamped, pixel_y_clamped))
                logging.debug(
                    f"{log_prefix} Converted ({lon:.4f},{lat:.4f}) -> Pixel({pixel_x_clamped},{pixel_y_clamped})"
                )

            logging.debug(
                f"{log_prefix} Successfully converted {len(pixel_coords)} coordinates."
            )
            return pixel_coords

        except Exception:
            logging.exception(f"{log_prefix} Error converting geo to map pixels:")
            return None

    @staticmethod
    def _format_scale_label(scale_distance_km: float) -> str:
        """Formats a scale-bar distance as e.g. "500 m", "2 km" or "2.5 km".

        The value is converted to ``float`` first because ``int.is_integer()``
        only exists from Python 3.12 on, and the distance table mixes ints
        and floats.
        """
        km_value = float(scale_distance_km)
        if km_value >= 1:
            if km_value.is_integer():
                return f"{int(km_value)} km"
            return f"{km_value:.1f} km"
        return f"{int(round(km_value * 1000.0))} m"

    def _draw_scale_bar(
        self, image_pil: "Image.Image", latitude_deg: float, zoom: int
    ) -> "Optional[Image.Image]":
        """Draws a simple scale bar onto the map image.

        Args:
            image_pil: Map image to annotate (not modified in place).
            latitude_deg: Latitude used to compute the ground resolution.
            zoom: Zoom level of the map image.

        Returns:
            A new image with the scale bar, the original image when the bar
            cannot be computed or drawn, or None if ``image_pil`` is None.
        """
        log_prefix = f"{self._log_prefix} ScaleBar"
        if image_pil is None:
            return None  # Return None if input is None

        try:
            meters_per_pixel = calculate_meters_per_pixel(latitude_deg, zoom)
            if meters_per_pixel is None or meters_per_pixel <= 0:
                logging.warning(
                    f"{log_prefix} Invalid meters/pixel ({meters_per_pixel}). Skipping scale bar."
                )
                return image_pil  # Return original image if scale cannot be calculated

            img_w, img_h = image_pil.size

            # --- Calculate reasonable scale bar size ---
            # Aim for roughly 1/4 of image width, min 50px, max 150px
            target_bar_px = max(50, min(150, img_w // 4))

            # Find a "nice" distance (1, 2, 5, 10, etc.) that results in a bar
            # length close to target_bar_px
            possible_distances_km = [
                0.01, 0.02, 0.05, 0.1, 0.2, 0.5,
                1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000,
            ]  # Expanded range for different zooms
            best_dist_km = 1
            min_diff = float("inf")
            for dist_km in possible_distances_km:
                pixels = (dist_km * 1000.0) / meters_per_pixel
                # Prioritize lengths close to target, but ensure minimum length
                if pixels >= 10:  # Ensure bar is at least 10 pixels
                    diff = abs(pixels - target_bar_px)
                    if diff < min_diff:
                        min_diff = diff
                        best_dist_km = dist_km

            scale_distance_km = best_dist_km
            scale_distance_meters = scale_distance_km * 1000.0
            scale_bar_pixels = int(round(scale_distance_meters / meters_per_pixel))

            # Final check if calculated bar is too small
            if scale_bar_pixels < 10:
                logging.warning(
                    f"{log_prefix} Calculated scale bar too small ({scale_bar_pixels}px). Skipping."
                )
                return image_pil

            # --- Draw the scale bar using OpenCV ---
            map_cv = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
            h, w = map_cv.shape[:2]

            # Position near bottom-left
            bar_x_start = 15
            bar_y = h - 20
            bar_height = 6  # Ticks height
            bar_thickness = 2
            text_gap = 5
            color = (0, 0, 0)  # Black
            bar_x_end = bar_x_start + scale_bar_pixels

            # Draw main horizontal line
            cv2.line(
                img=map_cv,
                pt1=(bar_x_start, bar_y),
                pt2=(bar_x_end, bar_y),
                color=color,
                thickness=bar_thickness,
            )
            # Draw vertical ticks at ends
            for tick_x in (bar_x_start, bar_x_end):
                cv2.line(
                    img=map_cv,
                    pt1=(tick_x, bar_y - bar_height // 2),
                    pt2=(tick_x, bar_y + bar_height // 2),
                    color=color,
                    thickness=bar_thickness,
                )

            # Prepare label text
            label = self._format_scale_label(scale_distance_km)

            # Add text label above the bar
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.5
            font_thickness = 1
            # Get text size to center it
            (text_w, text_h), _ = cv2.getTextSize(
                label, font, font_scale, font_thickness
            )
            text_x = bar_x_start + (scale_bar_pixels // 2) - (text_w // 2)
            text_y = bar_y - bar_height // 2 - text_gap  # Position above the bar
            # Ensure text doesn't go off-screen left
            text_x = max(5, text_x)

            # Draw text
            cv2.putText(
                img=map_cv,
                text=label,
                org=(text_x, text_y),
                fontFace=font,
                fontScale=font_scale,
                color=color,
                thickness=font_thickness,
                lineType=cv2.LINE_AA,
            )

            # Convert back to PIL image
            image_pil_with_scale = Image.fromarray(
                cv2.cvtColor(map_cv, cv2.COLOR_BGR2RGB)
            )
            logging.debug(
                f"{log_prefix} Scale bar drawn ({label}, {scale_bar_pixels}px)."
            )
            return image_pil_with_scale

        except Exception:
            logging.exception(f"{log_prefix} Error drawing scale bar:")
            return image_pil  # Return original image on error


# --- END OF FILE map_integration.py ---