SXXXXXXX_ControlPanel/map_integration.py
VALLONGOL cd66dab8db add go to google map and google earth button
add "x" red for pointing mouse on sar and map images
2025-04-15 12:25:00 +02:00

1143 lines
52 KiB
Python

# --- START OF FILE map_integration.py ---
# map_integration.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED.
Manages map integration functionalities, including service interaction,
tile fetching/caching, display window management, SAR overlay application (with alpha
blending and user-defined shifting), coordinate conversion, and map saving.
Acts as an intermediary between the main application and map-specific modules.
Implements masked blending for SAR overlay and rapid recomposition for alpha changes.
"""
# Standard library imports
import logging
import threading
import queue
import math
import os
import datetime
from typing import Optional, Dict, Any, Tuple, List
from pathlib import Path
# Third-party imports
import numpy as np
try:
from PIL import Image, ImageDraw, ImageFont
ImageType = Image.Image
except ImportError:
Image = None
ImageDraw = None
ImageFont = None
ImageType = None # type: ignore
try:
import mercantile
except ImportError:
mercantile = None
try:
import pyproj
except ImportError:
pyproj = None
# OpenCV is needed for warping and blending
import cv2
# Local application imports
import config
from app_state import AppState
from utils import put_queue, decimal_to_dms
# Map specific modules
from map_services import get_map_service, BaseMapService
from map_manager import MapTileManager
from map_utils import (
get_bounding_box_from_center_size,
get_tile_ranges_for_bbox,
MapCalculationError,
calculate_meters_per_pixel,
)
from map_display import MapDisplayWindow
# Import image processing function needed for overlay
from image_processing import apply_color_palette
# Forward declaration for type hinting App instance
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ControlPanel import ControlPanelApp
class MapIntegrationManager:
"""Orchestrates map services, tile management, and display including SAR overlay."""
DEFAULT_SAVE_DIRECTORY = "saved_map_views"
def __init__(
self,
app_state: AppState,
tkinter_queue: queue.Queue,
app: "ControlPanelApp",
map_x: int,
map_y: int,
):
"""Initializes the MapIntegrationManager."""
self._log_prefix = "[MapIntegrationManager]"
logging.debug(f"{self._log_prefix} Initializing...")
self._app_state: AppState = app_state
self._tkinter_queue: queue.Queue = tkinter_queue
self._app: "ControlPanelApp" = app
# Dependency Checks
if Image is None:
raise ImportError("Pillow library not found")
if mercantile is None:
raise ImportError("mercantile library not found")
if pyproj is None:
raise ImportError("pyproj library not found")
if ImageDraw is None or ImageFont is None:
logging.warning(f"{self._log_prefix} Pillow ImageDraw/ImageFont not found.")
# Attributes
self._map_service: Optional[BaseMapService] = None
self._map_tile_manager: Optional[MapTileManager] = None
self._map_display_window: Optional[MapDisplayWindow] = None
self._map_initial_display_thread: Optional[threading.Thread] = None
self._geod: Optional[pyproj.Geod] = None
self._map_update_lock = threading.Lock()
try:
# Geodetic Calculator
self._geod = pyproj.Geod(ellps="WGS84")
# Map Service
service_name = getattr(config, "MAP_SERVICE_PROVIDER", "osm")
api_key = getattr(config, "MAP_API_KEY", None)
self._map_service = get_map_service(service_name, api_key)
if not self._map_service:
raise ValueError(f"Failed to get map service '{service_name}'.")
# Tile Manager
cache_dir = getattr(config, "MAP_CACHE_DIRECTORY", None)
online_fetch = getattr(config, "ENABLE_ONLINE_MAP_FETCHING", None)
self._map_tile_manager = MapTileManager(
self._map_service, cache_dir, online_fetch
)
# Map Display Window
self._map_display_window = MapDisplayWindow(
self._app, "Map Overlay", map_x, map_y
)
# Initial Map Display Thread
self._app.set_status("Loading initial map...")
self._map_initial_display_thread = threading.Thread(
target=self._display_initial_map_area_thread,
name="InitialMapDisplayThread",
daemon=True,
)
self._map_initial_display_thread.start()
except Exception as init_err:
logging.critical(
f"{self._log_prefix} Initialization failed: {init_err}", exc_info=True
)
self._geod = None
self._map_service = None
self._map_tile_manager = None
self._map_display_window = None
raise
def _display_initial_map_area_thread(self):
"""(Runs in background thread) Calculates initial map area and queues for display."""
# (Implementation unchanged from previous version)
log_prefix = f"{self._log_prefix} InitialMap"
base_map_image_pil: Optional[ImageType] = None
map_bounds_deg: Optional[Tuple[float, float, float, float]] = None
zoom: Optional[int] = None
if config.SAR_CENTER_LAT == 0.0 and config.SAR_CENTER_LON == 0.0:
self._update_app_state_map_context(None, None, None)
self._queue_map_for_display(None)
return
if not (self._map_tile_manager and self._map_display_window and self._geod):
self._update_app_state_map_context(None, None, None)
self._queue_map_for_display(None)
return
if self._app_state.shutting_down:
return
try:
zoom = config.DEFAULT_MAP_ZOOM_LEVEL
center_lat_deg = config.SAR_CENTER_LAT
center_lon_deg = config.SAR_CENTER_LON
size_km = config.SAR_IMAGE_SIZE_KM
fetch_bbox_deg = get_bounding_box_from_center_size(
center_lat_deg, center_lon_deg, size_km * 1.2
)
if fetch_bbox_deg is None:
raise MapCalculationError("BBox calc failed.")
tile_ranges = get_tile_ranges_for_bbox(fetch_bbox_deg, zoom)
if tile_ranges is None:
raise MapCalculationError("Tile range calc failed.")
if self._app_state.shutting_down:
return
map_bounds_deg = self._get_bounds_for_tile_range(zoom, tile_ranges)
if map_bounds_deg is None:
raise MapCalculationError("Map bounds calc failed.")
base_map_image_pil = self._map_tile_manager.stitch_map_image(
zoom, tile_ranges[0], tile_ranges[1]
)
if self._app_state.shutting_down:
return
if base_map_image_pil:
base_map_image_pil = self._draw_scale_bar(
base_map_image_pil, center_lat_deg, zoom
)
except Exception as e:
logging.exception(f"{log_prefix} Error calculating initial map:")
base_map_image_pil = None
map_bounds_deg = None
zoom = None
finally:
if not self._app_state.shutting_down:
self._update_app_state_map_context(
base_map_image_pil, map_bounds_deg, zoom
)
self._queue_map_for_display(
base_map_image_pil
) # Initial map is also the composed map initially
logging.debug(f"{log_prefix} Initial map display thread finished.")
# --- >>> START OF MODIFIED FUNCTION <<< ---
def update_map_overlay(
self,
sar_normalized_uint8: Optional[np.ndarray],
geo_info_radians: Optional[Dict[str, Any]],
):
"""
Calculates the map overlay based on current state. Fetches/stitches map,
applies SAR shift, processes SAR image, warps/blends overlay using masked alpha,
draws bounding box, draws map click marker, adds scale bar, updates AppState
(base map, context, processed SAR, warp info, corners, composed map),
and queues the result for display.
Uses a lock to prevent concurrent updates.
"""
log_prefix = f"{self._log_prefix} Map Update"
if not self._map_update_lock.acquire(blocking=False):
logging.debug(f"{log_prefix} Map update already in progress. Skipping.")
return
# Initialize variables for this update cycle outside the try block
# So they can be referenced in the final 'finally' block for AppState update
base_map_pil: Optional[ImageType] = None
final_composed_pil: Optional[ImageType] = None
map_bounds_deg: Optional[Tuple[float, float, float, float]] = None
zoom: Optional[int] = None
processed_sar_overlay: Optional[np.ndarray] = None # Store processed SAR for AppState
sar_warp_matrix: Optional[np.ndarray] = None # Store warp matrix for AppState
sar_corners_pixels: Optional[List[Tuple[int, int]]] = None # Store corners for AppState
try:
# Prerequisite Checks
if self._app_state.shutting_down or self._app_state.test_mode_active:
logging.debug(f"{log_prefix} Skipping: Shutdown or Test Mode active.")
return # Return early if shutting down or in test mode
if not (self._map_tile_manager and self._map_display_window and self._geod):
logging.warning(f"{log_prefix} Skipping: Map components not ready.")
return # Return early if components missing
if not geo_info_radians or not geo_info_radians.get("valid", False):
logging.warning(f"{log_prefix} Skipping: Invalid GeoInfo.")
return # Return early if GeoInfo invalid
if Image is None or mercantile is None or pyproj is None or cv2 is None:
logging.error(f"{log_prefix} Skipping: Required libraries missing (PIL/mercantile/pyproj/cv2).")
return # Return early if libs missing
# State Variables
overlay_enabled = self._app_state.map_sar_overlay_enabled
overlay_alpha = self._app_state.map_sar_overlay_alpha
lat_shift_deg = self._app_state.sar_lat_shift_deg
lon_shift_deg = self._app_state.sar_lon_shift_deg
# SAR Data Check (only if overlay enabled)
if overlay_enabled and (
sar_normalized_uint8 is None or sar_normalized_uint8.size == 0
):
logging.warning(
f"{log_prefix} Disabling overlay for this frame: Missing SAR data."
)
overlay_enabled = False # Disable locally for this frame
logging.debug(
f"{log_prefix} Starting (Overlay:{overlay_enabled}, Alpha:{overlay_alpha:.2f}, Shift:{lat_shift_deg:.6f},{lon_shift_deg:.6f})..."
)
# Use a nested try...except specifically for map/overlay calculations
try:
# Apply Shift to Center Coordinates
center_lat_rad = geo_info_radians.get("lat", 0.0)
center_lon_rad = geo_info_radians.get("lon", 0.0)
shifted_lat_deg = max(
-90.0, min(90.0, math.degrees(center_lat_rad) + lat_shift_deg)
)
shifted_lon_deg = (
(math.degrees(center_lon_rad) + lon_shift_deg + 180) % 360
) - 180
# Calculate Common Parameters (using SHIFTED center)
scale_x = geo_info_radians.get("scale_x", 0.0)
width_px = geo_info_radians.get("width_px", 0)
size_km = (
(scale_x * width_px / 1000.0)
if (scale_x > 0 and width_px > 0)
else config.SAR_IMAGE_SIZE_KM
)
zoom = config.DEFAULT_MAP_ZOOM_LEVEL # Use configured zoom
# Fetch and Stitch Base Map
fetch_bbox_deg = get_bounding_box_from_center_size(
shifted_lat_deg, shifted_lon_deg, size_km * 1.2 # Fetch slightly larger area
)
if fetch_bbox_deg is None: raise MapCalculationError("Tile BBox calc failed.")
tile_ranges = get_tile_ranges_for_bbox(fetch_bbox_deg, zoom)
if tile_ranges is None: raise MapCalculationError("Tile range calc failed.")
if self._app_state.shutting_down: return # Check shutdown before long operation
map_bounds_deg = self._get_bounds_for_tile_range(zoom, tile_ranges)
if map_bounds_deg is None: raise MapCalculationError("Map bounds calc failed.")
base_map_pil = self._map_tile_manager.stitch_map_image(
zoom, tile_ranges[0], tile_ranges[1]
)
if base_map_pil is None: raise MapCalculationError("Base map stitch failed.")
if self._app_state.shutting_down: return # Check again after potential long operation
# Calculate SAR Footprint Pixels on Map
shifted_geo_info = geo_info_radians.copy()
shifted_geo_info["lat"] = math.radians(shifted_lat_deg)
shifted_geo_info["lon"] = math.radians(shifted_lon_deg)
sar_corners_deg = self._calculate_sar_corners_geo(shifted_geo_info)
if sar_corners_deg is None: raise MapCalculationError("SAR corner geo calc failed.")
map_shape_yx = base_map_pil.size[::-1] # PIL size is (W,H), need (H,W)
sar_corners_pixels = self._geo_coords_to_map_pixels(
sar_corners_deg, map_bounds_deg, tile_ranges, zoom, map_shape_yx
)
if sar_corners_pixels is None: raise MapCalculationError("SAR corner pixel conversion failed.")
# Prepare Base Map Image (NumPy BGR) for drawing/blending
map_cv_bgr = cv2.cvtColor(np.array(base_map_pil), cv2.COLOR_RGB2BGR)
map_h, map_w = map_cv_bgr.shape[:2]
display_cv_bgr = map_cv_bgr.copy() # Start with base map for final composition
# --- Apply SAR Overlay OR Draw Bounding Box ---
if overlay_enabled: # SAR data validity checked earlier
logging.debug(f"{log_prefix} Processing SAR image for overlay...")
processed_sar_overlay = self._process_sar_for_overlay(
sar_normalized_uint8
) # Store this for AppState cache
if processed_sar_overlay is not None:
logging.debug(f"{log_prefix} Warping SAR image onto map perspective...")
try:
sar_h, sar_w = processed_sar_overlay.shape[:2]
pts_sar = np.float32([[0, 0], [sar_w - 1, 0], [sar_w - 1, sar_h - 1], [0, sar_h - 1]])
pts_map = np.float32(sar_corners_pixels)
sar_warp_matrix = cv2.getPerspectiveTransform(pts_sar, pts_map) # Store this for AppState cache
# Warp the processed SAR image
warped_sar_bgr = cv2.warpPerspective(
src=processed_sar_overlay,
M=sar_warp_matrix,
dsize=(map_w, map_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0) # Use black border for mask generation
)
# Create Mask from warped SAR (non-black pixels)
gray_warped = cv2.cvtColor(warped_sar_bgr, cv2.COLOR_BGR2GRAY)
_, sar_mask = cv2.threshold(gray_warped, 1, 255, cv2.THRESH_BINARY)
# --- MASKED Alpha Blending ---
# Convert mask to float (0.0 to 1.0) and multiply by alpha
mask_alpha = (sar_mask.astype(np.float32) / 255.0) * overlay_alpha
# Expand mask_alpha to 3 channels
mask_alpha_3ch = cv2.cvtColor(mask_alpha, cv2.COLOR_GRAY2BGR)
logging.debug(f"{log_prefix} Blending map and warped SAR using mask (Alpha: {overlay_alpha:.2f})...")
# Blend: map * (1 - mask*alpha) + warped_sar * (mask*alpha)
term1 = map_cv_bgr.astype(np.float32) * (1.0 - mask_alpha_3ch)
term2 = warped_sar_bgr.astype(np.float32) * mask_alpha_3ch
blended_float = term1 + term2
display_cv_bgr = np.clip(blended_float, 0, 255).astype(np.uint8) # Final blended result
logging.debug(f"{log_prefix} Masked alpha blending complete.")
except cv2.error as warp_err:
logging.exception(f"{log_prefix} OpenCV error during SAR warp/blend:")
overlay_enabled = False # Fallback to drawing box
except Exception as warp_generic_err:
logging.exception(f"{log_prefix} Unexpected error during SAR warp/blend:")
overlay_enabled = False # Fallback to drawing box
else:
logging.warning(f"{log_prefix} SAR processing for overlay failed. Disabling overlay.")
overlay_enabled = False # Fallback to drawing box
# Draw Red Box (If overlay was disabled or failed during processing)
if not overlay_enabled:
logging.debug(f"{log_prefix} Drawing SAR bounding box...")
try:
pts = np.array(sar_corners_pixels, np.int32).reshape((-1, 1, 2))
cv2.polylines(display_cv_bgr, [pts], isClosed=True, color=(0, 0, 255), thickness=2)
except Exception as draw_err:
logging.exception(f"{log_prefix} Error drawing SAR bounding box:")
# --- >>> START OF MAP MARKER DRAWING <<< ---
# Draw Map Click Marker onto the composed BGR image BEFORE converting to PIL
map_click_coords = self._app_state.last_map_click_coords
if map_click_coords and isinstance(map_click_coords, tuple) and len(map_click_coords) == 2:
try:
marker_x, marker_y = map_click_coords
marker_color = (0, 0, 255) # BGR for Red
marker_type = cv2.MARKER_CROSS
marker_size = 15
marker_thickness = 2
logging.debug(f"{log_prefix} Drawing Map click marker at {map_click_coords}")
# Draw directly onto display_cv_bgr
cv2.drawMarker(display_cv_bgr, (marker_x, marker_y), marker_color, marker_type, marker_size, marker_thickness)
except Exception as draw_err:
logging.exception(f"{log_prefix} Error drawing Map marker: {draw_err}")
# --- >>> END OF MAP MARKER DRAWING <<< ---
# Convert Final CV Image back to PIL
final_composed_pil = Image.fromarray(
cv2.cvtColor(display_cv_bgr, cv2.COLOR_BGR2RGB)
)
# Add Scale Bar (to PIL image)
final_composed_pil = self._draw_scale_bar(
final_composed_pil, shifted_lat_deg, zoom
)
except MapCalculationError as e:
logging.error(f"{log_prefix} Map overlay calculation failed: {e}")
final_composed_pil = self._get_fallback_map() # Use last base map if possible
except Exception as e:
logging.exception(f"{log_prefix} Unexpected error during map overlay update:")
final_composed_pil = self._get_fallback_map() # Use last base map if possible
finally:
# --- Update AppState and Queue ---
# This block runs even if errors occurred during calculation
if not self._app_state.shutting_down:
# Update context (bounds, zoom, base map) - use values calculated in the 'try' block
self._update_app_state_map_context(
base_map_pil, # The stitched map before overlay/markers
map_bounds_deg,
zoom
)
# Update SAR overlay cache data
self._app_state.last_processed_sar_for_overlay = processed_sar_overlay.copy() if processed_sar_overlay is not None else None
self._app_state.last_sar_warp_matrix = sar_warp_matrix.copy() if sar_warp_matrix is not None else None
self._app_state.last_sar_corners_pixels_map = sar_corners_pixels.copy() if sar_corners_pixels is not None else None
# Queue the final composed image (or fallback) for display
# This also updates self._app_state.last_composed_map_pil
self._queue_map_for_display(final_composed_pil)
self._map_update_lock.release() # Ensure lock is released
logging.debug(f"{log_prefix} Map update finished.")
# Function modified: _recompose_map_overlay
def _recompose_map_overlay(self):
"""
Re-applies SAR overlay with current alpha/settings using cached data
and draws the last map click marker.
If cached data is insufficient, it logs a warning and exits.
Uses a lock to prevent concurrent updates.
"""
log_prefix = f"{self._log_prefix} RecomposeMap"
if not self._map_update_lock.acquire(blocking=False):
logging.debug(
f"{log_prefix} Map update/recomposition already in progress. Skipping."
)
return
final_recomposed_pil: Optional[ImageType] = None # Initialize here
try:
if self._app_state.shutting_down:
logging.debug(f"{log_prefix} Skipping: Shutdown detected.")
return
logging.debug(f"{log_prefix} Starting map recomposition...")
# --- Get cached data & current state ---
base_map_pil = self._app_state.last_map_image_pil
processed_sar = self._app_state.last_processed_sar_for_overlay
warp_matrix = self._app_state.last_sar_warp_matrix
sar_corner_pixels = self._app_state.last_sar_corners_pixels_map
map_click_coords = self._app_state.last_map_click_coords # Get marker coords
overlay_enabled = self._app_state.map_sar_overlay_enabled
overlay_alpha = self._app_state.map_sar_overlay_alpha
# Get lat/zoom from state for scale bar calculation (if possible)
latitude_deg = 0.0
zoom = None
current_geo = self._app_state.current_sar_geo_info
if current_geo and current_geo.get('valid'):
latitude_deg = math.degrees(current_geo.get('lat', 0.0))
# Apply shift to latitude if scale bar should reflect shifted center
latitude_deg += self._app_state.sar_lat_shift_deg
latitude_deg = max(-90.0, min(90.0, latitude_deg)) # Clamp after shift
zoom = self._app_state.map_current_zoom
# --- Validate necessary cached data for the CURRENT desired state ---
if base_map_pil is None:
logging.warning(f"{log_prefix} Cannot recompose: Base map image missing.")
return # Cannot proceed without base map
# Check data needed based on whether overlay is enabled or not
if overlay_enabled and (processed_sar is None or warp_matrix is None):
logging.error(
f"{log_prefix} Cannot recompose overlay: SAR cache data missing "
f"(ProcessedSAR:{processed_sar is not None}, WarpMatrix:{warp_matrix is not None})."
"Consider triggering a full update."
)
# Decide fallback: Trigger full update or just show base map?
# For now, just log and exit, letting the user trigger full update if needed.
return
elif not overlay_enabled and sar_corner_pixels is None:
# If overlay is off, we need corner pixels to draw the box
logging.warning(
f"{log_prefix} Cannot draw bounding box during recompose: Corner pixels missing."
"Showing base map without box."
)
# Fallback: Use base map directly, skip drawing box/overlay
final_recomposed_pil = base_map_pil.copy() # Start with base map
# Skip further processing, jump to scale bar / queuing later
# --- Recomposition Steps (only if not fallen back to base map already) ---
if final_recomposed_pil is None:
logging.debug(f"{log_prefix} Starting recomposition drawing...")
map_cv_bgr = cv2.cvtColor(np.array(base_map_pil), cv2.COLOR_RGB2BGR)
map_h, map_w = map_cv_bgr.shape[:2]
display_cv_bgr = map_cv_bgr.copy() # Start with base map
# Apply Overlay if enabled (we already checked cache validity)
if overlay_enabled:
logging.debug(f"{log_prefix} Recomposing SAR overlay (Alpha: {overlay_alpha:.2f})...")
try:
# Warp using cached matrix
warped_sar_bgr = cv2.warpPerspective(
processed_sar,
warp_matrix,
(map_w, map_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0),
)
# Create mask
warped_sar_gray = cv2.cvtColor(warped_sar_bgr, cv2.COLOR_BGR2GRAY)
_, sar_mask = cv2.threshold(warped_sar_gray, 1, 255, cv2.THRESH_BINARY)
# Blend using current alpha
mask_alpha = (sar_mask.astype(np.float32) / 255.0) * overlay_alpha
mask_alpha_3ch = cv2.cvtColor(mask_alpha, cv2.COLOR_GRAY2BGR)
term1 = map_cv_bgr.astype(np.float32) * (1.0 - mask_alpha_3ch)
term2 = warped_sar_bgr.astype(np.float32) * mask_alpha_3ch
blended_float = term1 + term2
display_cv_bgr = np.clip(blended_float, 0, 255).astype(np.uint8) # Update display image
except Exception as e:
logging.exception(f"{log_prefix} Error during SAR overlay recomposition warp/blend:")
# Fallback to base map if recompose fails catastrophically
display_cv_bgr = map_cv_bgr # Reset to base map
# Draw Bounding Box if overlay disabled (and corners exist)
elif sar_corner_pixels is not None:
logging.debug(f"{log_prefix} Recomposing SAR bounding box...")
try:
pts = np.array(sar_corner_pixels, np.int32).reshape((-1, 1, 2))
cv2.polylines(display_cv_bgr, [pts], isClosed=True, color=(0, 0, 255), thickness=2)
except Exception as e:
logging.exception(f"{log_prefix} Error recomposing bounding box:")
# display_cv_bgr remains base map
# --- >>> START OF MAP MARKER DRAWING <<< ---
# Draw Map Click Marker onto the recomposed BGR image
if map_click_coords and isinstance(map_click_coords, tuple) and len(map_click_coords) == 2:
try:
marker_x, marker_y = map_click_coords
marker_color = (0, 0, 255) # BGR for Red
marker_type = cv2.MARKER_CROSS
marker_size = 15
marker_thickness = 2
logging.debug(f"{log_prefix} Recomposing: Drawing Map click marker at {map_click_coords}")
# Draw directly onto display_cv_bgr
cv2.drawMarker(display_cv_bgr, (marker_x, marker_y), marker_color, marker_type, marker_size, marker_thickness)
except Exception as draw_err:
logging.exception(f"{log_prefix} Recomposing: Error drawing Map marker: {draw_err}")
# --- >>> END OF MAP MARKER DRAWING <<< ---
# Convert final OpenCV image back to PIL
final_recomposed_pil = Image.fromarray(
cv2.cvtColor(display_cv_bgr, cv2.COLOR_BGR2RGB)
)
# End of drawing logic if not fallen back to base map
# --- Add Scale Bar (if possible) ---
if final_recomposed_pil and zoom is not None and math.isfinite(latitude_deg):
# Add scale bar to the potentially modified final_recomposed_pil
final_recomposed_pil = self._draw_scale_bar(
final_recomposed_pil, latitude_deg, zoom
)
elif final_recomposed_pil: # Log if scale bar cannot be added
logging.warning(
f"{log_prefix} Cannot draw scale bar during recompose (missing zoom/lat context? Zoom:{zoom}, LatFinite:{math.isfinite(latitude_deg)})."
)
# --- Update state and Queue ---
# Queue the recomposed image (or the base map fallback) for display
# This call also updates AppState.last_composed_map_pil
self._queue_map_for_display(final_recomposed_pil)
except Exception as e:
logging.exception(f"{log_prefix} Unexpected error during map recomposition:")
# Attempt to queue base map as fallback if something unexpected happened
if not self._app_state.shutting_down:
base_map = self._app_state.last_map_image_pil
if base_map:
logging.warning(f"{log_prefix} Queuing base map as fallback due to error.")
self._queue_map_for_display(base_map)
else:
logging.error(f"{log_prefix} Cannot queue fallback, base map is also None.")
finally:
self._map_update_lock.release() # Release lock regardless of success/failure
logging.debug(f"{log_prefix} Map recomposition finished.")
def _process_sar_for_overlay(
self, sar_normalized_uint8: np.ndarray
) -> Optional[np.ndarray]:
"""Applies B/C LUT and Palette to SAR image for overlay."""
# (Implementation unchanged)
log_prefix = f"{self._log_prefix} ProcessSAROverlay"
try:
bc_lut = self._app_state.brightness_contrast_lut
palette = self._app_state.sar_palette
if bc_lut is None:
raise ValueError("B/C LUT is None")
processed_sar = cv2.LUT(sar_normalized_uint8, bc_lut)
if palette != "GRAY":
processed_sar = apply_color_palette(processed_sar, palette)
elif processed_sar.ndim == 2:
processed_sar = cv2.cvtColor(processed_sar, cv2.COLOR_GRAY2BGR)
return processed_sar
except Exception as e:
logging.exception(f"{log_prefix} Error:")
return None
def _get_bounds_for_tile_range(
self, zoom: int, tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]]
) -> Optional[Tuple[float, float, float, float]]:
"""Calculate the precise geographic bounds covered by a given range of tiles."""
# (Implementation corrected in previous step)
log_prefix = f"{self._log_prefix} TileBounds"
if mercantile is None:
return None
try:
min_x, max_x = tile_ranges[0]
min_y, max_y = tile_ranges[1]
ul_bound = mercantile.bounds(min_x, min_y, zoom)
lr_bound = mercantile.bounds(max_x, max_y, zoom)
west = ul_bound[0]
east = lr_bound[2]
south = lr_bound[1]
north = ul_bound[3]
return (west, south, east, north)
except Exception as e:
logging.exception(
f"{log_prefix} Error calculating bounds for tile range: {e}"
)
return None
def _update_app_state_map_context(
self,
base_map_pil: Optional[ImageType],
bounds_deg: Optional[Tuple[float, float, float, float]],
zoom: Optional[int],
):
"""Safely updates map context information in AppState."""
# (Implementation unchanged)
log_prefix = f"{self._log_prefix} UpdateMapContext"
try:
self._app_state.last_map_image_pil = (
base_map_pil.copy() if base_map_pil else None
)
self._app_state.map_current_bounds_deg = bounds_deg
self._app_state.map_current_zoom = zoom
# Shape is updated later by display_map
logging.debug(
f"{log_prefix} Updated AppState: Bounds={bounds_deg}, Zoom={zoom}, BasePIL={'Set' if base_map_pil else 'None'}"
)
except Exception as e:
logging.exception(f"{log_prefix} Error updating AppState map context:")
# --- >>> START OF MODIFIED FUNCTION <<< ---
def _queue_map_for_display(self, image_pil: Optional[ImageType]):
"""Puts the final composed map image onto the Tkinter queue AND updates composed state."""
log_prefix = f"{self._log_prefix} QueueMap"
payload_type = type(image_pil).__name__ if image_pil else "None"
# --- Update composed map state ---
try:
self._app_state.last_composed_map_pil = (
image_pil.copy() if image_pil else None
)
logging.debug(
f"{log_prefix} Updated last_composed_map_pil in AppState (Type: {payload_type})."
)
except Exception as e:
logging.exception(
f"{log_prefix} Error updating last_composed_map_pil in AppState:"
)
# --- Queue for display ---
logging.debug(
f"{log_prefix} Queueing SHOW_MAP command (Payload type: {payload_type})"
)
put_queue(self._tkinter_queue, ("SHOW_MAP", image_pil), "tkinter", self._app)
#
def display_map(self, map_image_pil: Optional[ImageType]):
"""Instructs the MapDisplayWindow to show the provided map image and updates app state."""
# (Implementation unchanged)
log_prefix = f"{self._log_prefix} Display"
displayed_shape = None
if self._map_display_window:
try:
self._map_display_window.show_map(map_image_pil)
displayed_shape = self._map_display_window._last_displayed_shape
if (
displayed_shape
and displayed_shape[0] > 0
and displayed_shape[1] > 0
):
self._app_state.map_current_shape_px = displayed_shape
else:
self._app_state.map_current_shape_px = None
self._update_app_status_after_map_load(map_image_pil is not None)
except Exception as e:
logging.exception(
f"{log_prefix} Error calling MapDisplayWindow.show_map:"
)
self._app_state.map_current_shape_px = None
else:
self._app_state.map_current_shape_px = None
def _update_app_status_after_map_load(self, success: bool):
"""Updates the main application status after the initial map load attempt."""
# (Implementation unchanged)
log_prefix = f"{self._log_prefix} Status Update"
try:
statusbar_ref = getattr(self._app, "statusbar", None)
if statusbar_ref and "Loading initial map" in statusbar_ref.cget("text"):
status_msg = "Error Loading Map"
if success: # Determine correct 'Ready' status
mode = self._app.state.test_mode_active
is_local = config.USE_LOCAL_IMAGES
is_net = not is_local and not mode
if mode:
status_msg = "Ready (Test Mode)"
elif is_local:
status_msg = "Ready (Local Mode)"
elif is_net:
status_msg = (
f"Listening UDP {self._app.local_ip}:{self._app.local_port}"
if self._app.udp_socket
else "Error: No Socket"
)
else:
status_msg = "Ready"
self._app.set_status(status_msg)
except Exception as e:
logging.warning(
f"{log_prefix} Error checking/updating app status after map load: {e}"
)
def shutdown(self):
"""Cleans up map-related resources."""
# (Implementation unchanged)
log_prefix = f"{self._log_prefix} Shutdown"
if self._map_display_window:
try:
self._map_display_window.destroy_window()
except Exception as e:
logging.exception(f"{log_prefix} Error destroying MapDisplayWindow:")
self._map_display_window = None
logging.debug(f"{log_prefix} Map integration shutdown complete.")
# --- Geo Calculation Helpers ---
# (_calculate_sar_corners_geo, _geo_coords_to_map_pixels - Unchanged)
def _calculate_sar_corners_geo(
self, geo_info: Dict[str, Any]
) -> Optional[List[Tuple[float, float]]]:
# (Implementation unchanged from previous version)
log_prefix = f"{self._log_prefix} SAR Corners Geo"
if not self._geod:
return None
try:
lat_rad, lon_rad = geo_info["lat"], geo_info["lon"]
orient_rad = geo_info["orientation"]
ref_x, ref_y = geo_info["ref_x"], geo_info["ref_y"]
scale_x, scale_y = geo_info["scale_x"], geo_info["scale_y"]
width, height = geo_info["width_px"], geo_info["height_px"]
calc_orient_rad = -orient_rad
if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
return None
corners_pixel = [
(0 - ref_x, ref_y - 0),
(width - 1 - ref_x, ref_y - 0),
(width - 1 - ref_x, ref_y - (height - 1)),
(0 - ref_x, ref_y - (height - 1)),
]
corners_meters = [(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel]
corners_meters_rotated = []
if abs(calc_orient_rad) > 1e-6:
cos_o, sin_o = math.cos(calc_orient_rad), math.sin(calc_orient_rad)
corners_meters_rotated = [
(dx * cos_o - dy * sin_o, dx * sin_o + dy * cos_o)
for dx, dy in corners_meters
]
else:
corners_meters_rotated = corners_meters
corners_geo_deg = []
lon_deg, lat_deg = math.degrees(lon_rad), math.degrees(lat_rad)
for dx_m, dy_m in corners_meters_rotated:
dist = math.hypot(dx_m, dy_m)
az = math.degrees(math.atan2(dx_m, dy_m))
endlon, endlat, _ = self._geod.fwd(lon_deg, lat_deg, az, dist)
corners_geo_deg.append((endlon, endlat))
return corners_geo_deg if len(corners_geo_deg) == 4 else None
except KeyError as ke:
logging.error(f"{log_prefix} Missing key: {ke}")
return None
except Exception as e:
logging.exception(f"{log_prefix} Error:")
return None
def _geo_coords_to_map_pixels(
self,
coords_deg: List[Tuple[float, float]],
map_bounds: Tuple[float, float, float, float],
map_tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]],
zoom: int,
stitched_map_shape: Tuple[int, int],
) -> Optional[List[Tuple[int, int]]]:
# (Implementation unchanged from previous version)
log_prefix = f"{self._log_prefix} Geo to Pixel"
if mercantile is None:
return None
if (
not stitched_map_shape
or stitched_map_shape[0] <= 0
or stitched_map_shape[1] <= 0
):
return None
pixel_coords = []
map_h, map_w = stitched_map_shape
map_west, map_south, map_east, map_north = map_bounds
try:
map_ul_x, map_ul_y = mercantile.xy(map_west, map_north)
map_lr_x, map_lr_y = mercantile.xy(map_east, map_south)
map_w_merc = map_lr_x - map_ul_x
map_h_merc = map_ul_y - map_lr_y
if map_w_merc <= 0 or map_h_merc <= 0:
return None
for lon, lat in coords_deg:
pt_x, pt_y = mercantile.xy(lon, lat)
rel_x = pt_x - map_ul_x
rel_y = map_ul_y - pt_y
px = int(round((rel_x / map_w_merc) * map_w))
py = int(round((rel_y / map_h_merc) * map_h))
pixel_coords.append(
(max(0, min(px, map_w - 1)), max(0, min(py, map_h - 1)))
)
return pixel_coords
except Exception as e:
logging.exception(f"{log_prefix} Error:")
return None
# --- Pixel to Geo Conversion ---
def get_geo_coords_from_map_pixel(
self, pixel_x: int, pixel_y: int
) -> Optional[Tuple[float, float]]:
"""Converts pixel coordinates from the map display window to geographic coordinates (lat, lon degrees)."""
# (Implementation unchanged from previous version)
log_prefix = f"{self._log_prefix} Pixel to Geo"
map_bounds_deg = self._app_state.map_current_bounds_deg
map_shape_px = self._app_state.map_current_shape_px
if (
map_bounds_deg is None
or map_shape_px is None
or map_shape_px[0] <= 0
or map_shape_px[1] <= 0
or mercantile is None
):
return None
map_h, map_w = map_shape_px
map_west, map_south, map_east, map_north = map_bounds_deg
try:
if not (0 <= pixel_x < map_w and 0 <= pixel_y < map_h):
return None # Outside bounds
map_ul_x, map_ul_y = mercantile.xy(map_west, map_north)
map_lr_x, map_lr_y = mercantile.xy(map_east, map_south)
map_w_merc = map_lr_x - map_ul_x
map_h_merc = map_ul_y - map_lr_y
if map_w_merc <= 0 or map_h_merc <= 0:
return None
rel_x = pixel_x / map_w
rel_y = pixel_y / map_h
pt_x = map_ul_x + (rel_x * map_w_merc)
pt_y = map_ul_y - (rel_y * map_h_merc)
lon_deg, lat_deg = mercantile.lnglat(pt_x, pt_y)
return (lat_deg, lon_deg)
except Exception as e:
logging.exception(f"{log_prefix} Error:")
return None
# --- Map Saving ---
def save_map_view_to_file(
self, directory: Optional[str] = None, filename: Optional[str] = None
) -> bool:
"""Saves the last composed map view (from AppState.last_composed_map_pil) to a file."""
# (Implementation unchanged from previous version)
log_prefix = f"{self._log_prefix} SaveMap"
image_to_save = self._app_state.last_composed_map_pil
if image_to_save is None:
self._app.set_status("Error: No map view to save.")
return False
if Image is None:
self._app.set_status("Error: Pillow library missing.")
return False
try:
save_dir_path: Path
if directory:
save_dir_path = Path(directory)
else:
app_path = (
Path(sys.executable).parent
if getattr(sys, "frozen", False)
else Path(sys.argv[0]).parent
)
save_dir_path = app_path / self.DEFAULT_SAVE_DIRECTORY
save_dir_path.mkdir(parents=True, exist_ok=True)
save_filename = (
(Path(filename).stem + ".png")
if filename
else f"map_view_{datetime.datetime.now():%Y%m%d_%H%M%S}.png"
)
full_save_path = save_dir_path / save_filename
if ImageDraw and ImageFont: # Add timestamp
try:
draw = ImageDraw.Draw(image_to_save)
font = (
ImageFont.truetype("arial.ttf", 14)
if ImageFont
else ImageFont.load_default()
)
text = f"Saved: {datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
draw.text(
(10, image_to_save.height - 20), text, font=font, fill=(0, 0, 0)
)
except Exception as draw_err:
logging.warning(
f"{log_prefix} Could not draw text on saved image: {draw_err}"
)
image_to_save.save(full_save_path, "PNG")
logging.info(f"{log_prefix} Map view saved to: {full_save_path}")
self._app.set_status(f"Map view saved: {save_filename}")
return True
except OSError as e:
logging.error(
f"{log_prefix} OS Error saving map view to '{full_save_path}': {e}"
)
self._app.set_status("Error: Failed to save map view (OS Error).")
return False
except Exception as e:
logging.exception(f"{log_prefix} Unexpected error saving map view:")
self._app.set_status("Error: Failed to save map view (Exception).")
return False
# --- Scale Bar Drawing ---
def _draw_scale_bar(
self, image_pil: Optional[ImageType], latitude_deg: float, zoom: int
) -> Optional[ImageType]:
"""Draws a simple scale bar onto the map image (PIL). Returns modified image or original on error."""
log_prefix = f"{self._log_prefix} ScaleBar"
if image_pil is None or not (Image and cv2):
return image_pil # Return original if no image or dependencies missing
try:
meters_per_pixel = calculate_meters_per_pixel(latitude_deg, zoom)
if meters_per_pixel is None or meters_per_pixel <= 0:
logging.warning(
f"{log_prefix} Invalid meters/pixel ({meters_per_pixel}). Skipping scale bar."
)
return image_pil # Cannot calculate scale
img_w, img_h = image_pil.size
# Target bar width relative to image width
target_bar_px = max(
50, min(150, img_w // 5)
) # Aim for ~1/5 width, 50-150px
# Predefined "nice" distances in kilometers
possible_distances_km = [
0.005,
0.01,
0.02,
0.05,
0.1,
0.2,
0.5,
1,
2,
5,
10,
20,
50,
100,
200,
500,
1000,
2000,
5000,
]
best_dist_km = None # Initialize best distance found
min_diff = float("inf") # Initialize minimum difference
# Find the "nicest" distance that results in a bar length close to the target
for dist_km in possible_distances_km:
pixels_for_dist = (dist_km * 1000.0) / meters_per_pixel
# Ensure the calculated bar length is reasonable (e.g., >= 10 pixels)
if pixels_for_dist >= 10:
# --- >>> START OF FIX <<< ---
# Calculate difference *after* ensuring pixels_for_dist is valid
diff = abs(pixels_for_dist - target_bar_px)
# If this difference is smaller than the current minimum, update best distance
if diff < min_diff:
min_diff = diff
best_dist_km = dist_km
# --- >>> END OF FIX <<< ---
# If no suitable distance was found (e.g., very high zoom, small image)
if best_dist_km is None:
logging.warning(
f"{log_prefix} Could not find suitable scale bar distance for zoom {zoom} and image width {img_w}. Skipping."
)
return image_pil # Return original image
# Calculate final bar length in pixels for the best distance found
scale_distance_km = best_dist_km
scale_distance_meters = scale_distance_km * 1000.0
scale_bar_pixels = int(round(scale_distance_meters / meters_per_pixel))
# Final check if calculated bar is somehow too small (redundant due to pixels>=10 check)
if scale_bar_pixels < 10:
return image_pil # Should not happen now
# --- Draw the scale bar using OpenCV ---
map_cv = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
# Define bar parameters (position, size, color)
bar_x_start = 15 # X position of the start of the bar
bar_y = map_cv.shape[0] - 25 # Y position (near bottom)
bar_tick_height = 6 # Height of the end ticks
bar_thickness = 2
text_gap = 5 # Gap between bar and text label
text_color = (0, 0, 0) # Black
bar_color = (0, 0, 0) # Black
# Draw main horizontal line
cv2.line(
map_cv,
(bar_x_start, bar_y),
(bar_x_start + scale_bar_pixels, bar_y),
bar_color,
bar_thickness,
)
# Draw vertical ticks at ends
cv2.line(
map_cv,
(bar_x_start, bar_y - bar_tick_height // 2),
(bar_x_start, bar_y + bar_tick_height // 2),
bar_color,
bar_thickness,
)
cv2.line(
map_cv,
(bar_x_start + scale_bar_pixels, bar_y - bar_tick_height // 2),
(bar_x_start + scale_bar_pixels, bar_y + bar_tick_height // 2),
bar_color,
bar_thickness,
)
# Prepare label text (e.g., "10 km", "500 m")
if scale_distance_km >= 1:
label = (
f"{int(scale_distance_km)} km"
if scale_distance_km.is_integer()
else f"{scale_distance_km:.1f} km"
)
else:
label = f"{int(scale_distance_meters)} m"
# Add text label above the bar
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.5
font_thickness = 1
(text_w, text_h), _ = cv2.getTextSize(
label, font, font_scale, font_thickness
)
# Calculate centered X position for the text
text_x = max(5, bar_x_start + (scale_bar_pixels // 2) - (text_w // 2))
# Calculate Y position above the bar
text_y = bar_y - bar_tick_height // 2 - text_gap
# Draw text
cv2.putText(
map_cv,
label,
(text_x, text_y),
font,
font_scale,
text_color,
font_thickness,
cv2.LINE_AA,
)
# Convert back to PIL image
image_pil_with_scale = Image.fromarray(
cv2.cvtColor(map_cv, cv2.COLOR_BGR2RGB)
)
logging.debug(
f"{log_prefix} Scale bar drawn ({label}, {scale_bar_pixels}px)."
)
return image_pil_with_scale
except Exception as e:
logging.exception(f"{log_prefix} Error drawing scale bar:")
return image_pil # Return original image on any unexpected error
def _get_fallback_map(self) -> Optional[ImageType]:
"""Returns the last known base map or None."""
# Helper to reduce code duplication in error handling
return self._app_state.last_map_image_pil
# --- END OF FILE map_integration.py ---