1285 lines
56 KiB
Python
1285 lines
56 KiB
Python
# --- START OF FILE map_integration.py ---
|
|
|
|
# map_integration.py
|
|
"""
|
|
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
|
|
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
|
PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
|
|
Manages map integration functionalities, including service interaction,
|
|
tile fetching/caching, display window management, SAR overlay application (with alpha
|
|
blending and user-defined shifting), coordinate conversion, and map saving.
|
|
Acts as an intermediary between the main application and map-specific modules.
|
|
Implements masked blending for SAR overlay and rapid recomposition for alpha changes.
|
|
"""
|
|
|
|
# Standard library imports
|
|
import logging
|
|
import threading
|
|
import queue
|
|
import math
|
|
import os
|
|
import datetime
|
|
from typing import Optional, Dict, Any, Tuple, List
|
|
from pathlib import Path
|
|
|
|
# Third-party imports
|
|
import numpy as np
|
|
|
|
try:
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
|
|
ImageType = Image.Image
|
|
except ImportError:
|
|
Image = None
|
|
ImageDraw = None
|
|
ImageFont = None
|
|
ImageType = None # type: ignore
|
|
|
|
try:
|
|
import mercantile
|
|
except ImportError:
|
|
mercantile = None
|
|
try:
|
|
import pyproj
|
|
except ImportError:
|
|
pyproj = None
|
|
|
|
# OpenCV is needed for warping and blending
|
|
import cv2
|
|
|
|
# Local application imports
|
|
import config
|
|
from app_state import AppState
|
|
from utils import put_queue, decimal_to_dms
|
|
|
|
# Map specific modules
|
|
from map_services import get_map_service, BaseMapService
|
|
from map_manager import MapTileManager
|
|
from map_utils import (
|
|
get_bounding_box_from_center_size,
|
|
get_tile_ranges_for_bbox,
|
|
MapCalculationError,
|
|
calculate_meters_per_pixel,
|
|
)
|
|
from map_display import MapDisplayWindow
|
|
|
|
# Import image processing function needed for overlay
|
|
from image_processing import apply_color_palette
|
|
|
|
# Forward declaration for type hinting App instance
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from ControlPanel import ControlPanelApp
|
|
|
|
|
|
class MapIntegrationManager:
|
|
"""Orchestrates map services, tile management, and display including SAR overlay."""
|
|
|
|
DEFAULT_SAVE_DIRECTORY = "saved_map_views"
|
|
|
|
def __init__(
|
|
self,
|
|
app_state: AppState,
|
|
tkinter_queue: queue.Queue,
|
|
app: "ControlPanelApp",
|
|
map_x: int,
|
|
map_y: int,
|
|
):
|
|
"""Initializes the MapIntegrationManager."""
|
|
self._log_prefix = "[MapIntegrationManager]"
|
|
logging.debug(f"{self._log_prefix} Initializing...")
|
|
|
|
self._app_state: AppState = app_state
|
|
self._tkinter_queue: queue.Queue = tkinter_queue
|
|
self._app: "ControlPanelApp" = app
|
|
|
|
# Dependency Checks
|
|
if Image is None:
|
|
raise ImportError("Pillow library not found")
|
|
if mercantile is None:
|
|
raise ImportError("mercantile library not found")
|
|
if pyproj is None:
|
|
raise ImportError("pyproj library not found")
|
|
if ImageDraw is None or ImageFont is None:
|
|
logging.warning(f"{self._log_prefix} Pillow ImageDraw/ImageFont not found.")
|
|
|
|
# Attributes
|
|
self._map_service: Optional[BaseMapService] = None
|
|
self._map_tile_manager: Optional[MapTileManager] = None
|
|
self._map_display_window: Optional[MapDisplayWindow] = None
|
|
self._map_initial_display_thread: Optional[threading.Thread] = None
|
|
self._geod: Optional[pyproj.Geod] = None
|
|
self._map_update_lock = threading.Lock()
|
|
|
|
try:
|
|
# Geodetic Calculator
|
|
self._geod = pyproj.Geod(ellps="WGS84")
|
|
# Map Service
|
|
service_name = getattr(config, "MAP_SERVICE_PROVIDER", "osm")
|
|
api_key = getattr(config, "MAP_API_KEY", None)
|
|
self._map_service = get_map_service(service_name, api_key)
|
|
if not self._map_service:
|
|
raise ValueError(f"Failed to get map service '{service_name}'.")
|
|
# Tile Manager
|
|
cache_dir = getattr(config, "MAP_CACHE_DIRECTORY", None)
|
|
online_fetch = getattr(config, "ENABLE_ONLINE_MAP_FETCHING", None)
|
|
self._map_tile_manager = MapTileManager(
|
|
self._map_service, cache_dir, online_fetch
|
|
)
|
|
# Map Display Window
|
|
self._map_display_window = MapDisplayWindow(
|
|
self._app, "Map Overlay", map_x, map_y
|
|
)
|
|
# Initial Map Display Thread
|
|
self._app.set_status("Loading initial map...")
|
|
self._map_initial_display_thread = threading.Thread(
|
|
target=self._display_initial_map_area_thread,
|
|
name="InitialMapDisplayThread",
|
|
daemon=True,
|
|
)
|
|
self._map_initial_display_thread.start()
|
|
except Exception as init_err:
|
|
logging.critical(
|
|
f"{self._log_prefix} Initialization failed: {init_err}", exc_info=True
|
|
)
|
|
self._geod = None
|
|
self._map_service = None
|
|
self._map_tile_manager = None
|
|
self._map_display_window = None
|
|
raise
|
|
|
|
def _display_initial_map_area_thread(self):
|
|
"""(Runs in background thread) Calculates initial map area and queues for display."""
|
|
# (Implementation unchanged from previous version)
|
|
log_prefix = f"{self._log_prefix} InitialMap"
|
|
base_map_image_pil: Optional[ImageType] = None
|
|
map_bounds_deg: Optional[Tuple[float, float, float, float]] = None
|
|
zoom: Optional[int] = None
|
|
if config.SAR_CENTER_LAT == 0.0 and config.SAR_CENTER_LON == 0.0:
|
|
self._update_app_state_map_context(None, None, None)
|
|
self._queue_map_for_display(None)
|
|
return
|
|
if not (self._map_tile_manager and self._map_display_window and self._geod):
|
|
self._update_app_state_map_context(None, None, None)
|
|
self._queue_map_for_display(None)
|
|
return
|
|
if self._app_state.shutting_down:
|
|
return
|
|
try:
|
|
zoom = config.DEFAULT_MAP_ZOOM_LEVEL
|
|
center_lat_deg = config.SAR_CENTER_LAT
|
|
center_lon_deg = config.SAR_CENTER_LON
|
|
size_km = config.SAR_IMAGE_SIZE_KM
|
|
fetch_bbox_deg = get_bounding_box_from_center_size(
|
|
center_lat_deg, center_lon_deg, size_km * 1.2
|
|
)
|
|
if fetch_bbox_deg is None:
|
|
raise MapCalculationError("BBox calc failed.")
|
|
tile_ranges = get_tile_ranges_for_bbox(fetch_bbox_deg, zoom)
|
|
if tile_ranges is None:
|
|
raise MapCalculationError("Tile range calc failed.")
|
|
if self._app_state.shutting_down:
|
|
return
|
|
map_bounds_deg = self._get_bounds_for_tile_range(zoom, tile_ranges)
|
|
if map_bounds_deg is None:
|
|
raise MapCalculationError("Map bounds calc failed.")
|
|
base_map_image_pil = self._map_tile_manager.stitch_map_image(
|
|
zoom, tile_ranges[0], tile_ranges[1]
|
|
)
|
|
if self._app_state.shutting_down:
|
|
return
|
|
if base_map_image_pil:
|
|
base_map_image_pil = self._draw_scale_bar(
|
|
base_map_image_pil, center_lat_deg, zoom
|
|
)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error calculating initial map:")
|
|
base_map_image_pil = None
|
|
map_bounds_deg = None
|
|
zoom = None
|
|
finally:
|
|
if not self._app_state.shutting_down:
|
|
self._update_app_state_map_context(
|
|
base_map_image_pil, map_bounds_deg, zoom
|
|
)
|
|
self._queue_map_for_display(
|
|
base_map_image_pil
|
|
) # Initial map is also the composed map initially
|
|
logging.debug(f"{log_prefix} Initial map display thread finished.")
|
|
|
|
# --- >>> START OF MODIFIED FUNCTION <<< ---
|
|
def update_map_overlay(
|
|
self,
|
|
sar_normalized_uint8: Optional[np.ndarray],
|
|
geo_info_radians: Optional[Dict[str, Any]],
|
|
):
|
|
"""
|
|
Calculates the map overlay based on current state. Fetches/stitches map,
|
|
applies SAR shift, processes SAR image, warps/blends overlay using masked alpha,
|
|
draws bounding box, draws map click marker, adds scale bar, updates AppState
|
|
(base map, context, processed SAR, warp info, corners, composed map),
|
|
and queues the result for display.
|
|
Uses a lock to prevent concurrent updates.
|
|
"""
|
|
log_prefix = f"{self._log_prefix} Map Update"
|
|
if not self._map_update_lock.acquire(blocking=False):
|
|
logging.debug(f"{log_prefix} Map update already in progress. Skipping.")
|
|
return
|
|
|
|
# Initialize variables for this update cycle outside the try block
|
|
# So they can be referenced in the final 'finally' block for AppState update
|
|
base_map_pil: Optional[ImageType] = None
|
|
final_composed_pil: Optional[ImageType] = None
|
|
map_bounds_deg: Optional[Tuple[float, float, float, float]] = None
|
|
zoom: Optional[int] = None
|
|
processed_sar_overlay: Optional[np.ndarray] = (
|
|
None # Store processed SAR for AppState
|
|
)
|
|
sar_warp_matrix: Optional[np.ndarray] = None # Store warp matrix for AppState
|
|
sar_corners_pixels: Optional[List[Tuple[int, int]]] = (
|
|
None # Store corners for AppState
|
|
)
|
|
|
|
try:
|
|
# Prerequisite Checks
|
|
if self._app_state.shutting_down or self._app_state.test_mode_active:
|
|
logging.debug(f"{log_prefix} Skipping: Shutdown or Test Mode active.")
|
|
return # Return early if shutting down or in test mode
|
|
if not (self._map_tile_manager and self._map_display_window and self._geod):
|
|
logging.warning(f"{log_prefix} Skipping: Map components not ready.")
|
|
return # Return early if components missing
|
|
if not geo_info_radians or not geo_info_radians.get("valid", False):
|
|
logging.warning(f"{log_prefix} Skipping: Invalid GeoInfo.")
|
|
return # Return early if GeoInfo invalid
|
|
if Image is None or mercantile is None or pyproj is None or cv2 is None:
|
|
logging.error(
|
|
f"{log_prefix} Skipping: Required libraries missing (PIL/mercantile/pyproj/cv2)."
|
|
)
|
|
return # Return early if libs missing
|
|
|
|
# State Variables
|
|
overlay_enabled = self._app_state.map_sar_overlay_enabled
|
|
overlay_alpha = self._app_state.map_sar_overlay_alpha
|
|
lat_shift_deg = self._app_state.sar_lat_shift_deg
|
|
lon_shift_deg = self._app_state.sar_lon_shift_deg
|
|
|
|
# SAR Data Check (only if overlay enabled)
|
|
if overlay_enabled and (
|
|
sar_normalized_uint8 is None or sar_normalized_uint8.size == 0
|
|
):
|
|
logging.warning(
|
|
f"{log_prefix} Disabling overlay for this frame: Missing SAR data."
|
|
)
|
|
overlay_enabled = False # Disable locally for this frame
|
|
|
|
logging.debug(
|
|
f"{log_prefix} Starting (Overlay:{overlay_enabled}, Alpha:{overlay_alpha:.2f}, Shift:{lat_shift_deg:.6f},{lon_shift_deg:.6f})..."
|
|
)
|
|
|
|
# Use a nested try...except specifically for map/overlay calculations
|
|
try:
|
|
# Apply Shift to Center Coordinates
|
|
center_lat_rad = geo_info_radians.get("lat", 0.0)
|
|
center_lon_rad = geo_info_radians.get("lon", 0.0)
|
|
shifted_lat_deg = max(
|
|
-90.0, min(90.0, math.degrees(center_lat_rad) + lat_shift_deg)
|
|
)
|
|
shifted_lon_deg = (
|
|
(math.degrees(center_lon_rad) + lon_shift_deg + 180) % 360
|
|
) - 180
|
|
|
|
# Calculate Common Parameters (using SHIFTED center)
|
|
scale_x = geo_info_radians.get("scale_x", 0.0)
|
|
width_px = geo_info_radians.get("width_px", 0)
|
|
size_km = (
|
|
(scale_x * width_px / 1000.0)
|
|
if (scale_x > 0 and width_px > 0)
|
|
else config.SAR_IMAGE_SIZE_KM
|
|
)
|
|
zoom = config.DEFAULT_MAP_ZOOM_LEVEL # Use configured zoom
|
|
|
|
# Fetch and Stitch Base Map
|
|
fetch_bbox_deg = get_bounding_box_from_center_size(
|
|
shifted_lat_deg,
|
|
shifted_lon_deg,
|
|
size_km * 1.2, # Fetch slightly larger area
|
|
)
|
|
if fetch_bbox_deg is None:
|
|
raise MapCalculationError("Tile BBox calc failed.")
|
|
|
|
tile_ranges = get_tile_ranges_for_bbox(fetch_bbox_deg, zoom)
|
|
if tile_ranges is None:
|
|
raise MapCalculationError("Tile range calc failed.")
|
|
|
|
if self._app_state.shutting_down:
|
|
return # Check shutdown before long operation
|
|
|
|
map_bounds_deg = self._get_bounds_for_tile_range(zoom, tile_ranges)
|
|
if map_bounds_deg is None:
|
|
raise MapCalculationError("Map bounds calc failed.")
|
|
|
|
base_map_pil = self._map_tile_manager.stitch_map_image(
|
|
zoom, tile_ranges[0], tile_ranges[1]
|
|
)
|
|
if base_map_pil is None:
|
|
raise MapCalculationError("Base map stitch failed.")
|
|
|
|
if self._app_state.shutting_down:
|
|
return # Check again after potential long operation
|
|
|
|
# Calculate SAR Footprint Pixels on Map
|
|
shifted_geo_info = geo_info_radians.copy()
|
|
shifted_geo_info["lat"] = math.radians(shifted_lat_deg)
|
|
shifted_geo_info["lon"] = math.radians(shifted_lon_deg)
|
|
sar_corners_deg = self._calculate_sar_corners_geo(shifted_geo_info)
|
|
if sar_corners_deg is None:
|
|
raise MapCalculationError("SAR corner geo calc failed.")
|
|
|
|
map_shape_yx = base_map_pil.size[::-1] # PIL size is (W,H), need (H,W)
|
|
sar_corners_pixels = self._geo_coords_to_map_pixels(
|
|
sar_corners_deg, map_bounds_deg, tile_ranges, zoom, map_shape_yx
|
|
)
|
|
if sar_corners_pixels is None:
|
|
raise MapCalculationError("SAR corner pixel conversion failed.")
|
|
|
|
# Prepare Base Map Image (NumPy BGR) for drawing/blending
|
|
map_cv_bgr = cv2.cvtColor(np.array(base_map_pil), cv2.COLOR_RGB2BGR)
|
|
map_h, map_w = map_cv_bgr.shape[:2]
|
|
display_cv_bgr = (
|
|
map_cv_bgr.copy()
|
|
) # Start with base map for final composition
|
|
|
|
# --- Apply SAR Overlay OR Draw Bounding Box ---
|
|
if overlay_enabled: # SAR data validity checked earlier
|
|
logging.debug(f"{log_prefix} Processing SAR image for overlay...")
|
|
processed_sar_overlay = self._process_sar_for_overlay(
|
|
sar_normalized_uint8
|
|
) # Store this for AppState cache
|
|
|
|
if processed_sar_overlay is not None:
|
|
logging.debug(
|
|
f"{log_prefix} Warping SAR image onto map perspective..."
|
|
)
|
|
try:
|
|
sar_h, sar_w = processed_sar_overlay.shape[:2]
|
|
pts_sar = np.float32(
|
|
[
|
|
[0, 0],
|
|
[sar_w - 1, 0],
|
|
[sar_w - 1, sar_h - 1],
|
|
[0, sar_h - 1],
|
|
]
|
|
)
|
|
pts_map = np.float32(sar_corners_pixels)
|
|
sar_warp_matrix = cv2.getPerspectiveTransform(
|
|
pts_sar, pts_map
|
|
) # Store this for AppState cache
|
|
|
|
# Warp the processed SAR image
|
|
warped_sar_bgr = cv2.warpPerspective(
|
|
src=processed_sar_overlay,
|
|
M=sar_warp_matrix,
|
|
dsize=(map_w, map_h),
|
|
flags=cv2.INTER_LINEAR,
|
|
borderMode=cv2.BORDER_CONSTANT,
|
|
borderValue=(
|
|
0,
|
|
0,
|
|
0,
|
|
), # Use black border for mask generation
|
|
)
|
|
|
|
# Create Mask from warped SAR (non-black pixels)
|
|
gray_warped = cv2.cvtColor(
|
|
warped_sar_bgr, cv2.COLOR_BGR2GRAY
|
|
)
|
|
_, sar_mask = cv2.threshold(
|
|
gray_warped, 1, 255, cv2.THRESH_BINARY
|
|
)
|
|
|
|
# --- MASKED Alpha Blending ---
|
|
# Convert mask to float (0.0 to 1.0) and multiply by alpha
|
|
mask_alpha = (
|
|
sar_mask.astype(np.float32) / 255.0
|
|
) * overlay_alpha
|
|
# Expand mask_alpha to 3 channels
|
|
mask_alpha_3ch = cv2.cvtColor(
|
|
mask_alpha, cv2.COLOR_GRAY2BGR
|
|
)
|
|
|
|
logging.debug(
|
|
f"{log_prefix} Blending map and warped SAR using mask (Alpha: {overlay_alpha:.2f})..."
|
|
)
|
|
# Blend: map * (1 - mask*alpha) + warped_sar * (mask*alpha)
|
|
term1 = map_cv_bgr.astype(np.float32) * (
|
|
1.0 - mask_alpha_3ch
|
|
)
|
|
term2 = warped_sar_bgr.astype(np.float32) * mask_alpha_3ch
|
|
blended_float = term1 + term2
|
|
display_cv_bgr = np.clip(blended_float, 0, 255).astype(
|
|
np.uint8
|
|
) # Final blended result
|
|
|
|
logging.debug(
|
|
f"{log_prefix} Masked alpha blending complete."
|
|
)
|
|
|
|
except cv2.error as warp_err:
|
|
logging.exception(
|
|
f"{log_prefix} OpenCV error during SAR warp/blend:"
|
|
)
|
|
overlay_enabled = False # Fallback to drawing box
|
|
except Exception as warp_generic_err:
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error during SAR warp/blend:"
|
|
)
|
|
overlay_enabled = False # Fallback to drawing box
|
|
else:
|
|
logging.warning(
|
|
f"{log_prefix} SAR processing for overlay failed. Disabling overlay."
|
|
)
|
|
overlay_enabled = False # Fallback to drawing box
|
|
|
|
# Draw Red Box (If overlay was disabled or failed during processing)
|
|
if not overlay_enabled:
|
|
logging.debug(f"{log_prefix} Drawing SAR bounding box...")
|
|
try:
|
|
pts = np.array(sar_corners_pixels, np.int32).reshape((-1, 1, 2))
|
|
cv2.polylines(
|
|
display_cv_bgr,
|
|
[pts],
|
|
isClosed=True,
|
|
color=(0, 0, 255),
|
|
thickness=2,
|
|
)
|
|
except Exception as draw_err:
|
|
logging.exception(
|
|
f"{log_prefix} Error drawing SAR bounding box:"
|
|
)
|
|
|
|
# --- >>> START OF MAP MARKER DRAWING <<< ---
|
|
# Draw Map Click Marker onto the composed BGR image BEFORE converting to PIL
|
|
map_click_coords = self._app_state.last_map_click_coords
|
|
if (
|
|
map_click_coords
|
|
and isinstance(map_click_coords, tuple)
|
|
and len(map_click_coords) == 2
|
|
):
|
|
try:
|
|
marker_x, marker_y = map_click_coords
|
|
marker_color = (0, 0, 255) # BGR for Red
|
|
marker_type = cv2.MARKER_CROSS
|
|
marker_size = 15
|
|
marker_thickness = 2
|
|
logging.debug(
|
|
f"{log_prefix} Drawing Map click marker at {map_click_coords}"
|
|
)
|
|
# Draw directly onto display_cv_bgr
|
|
cv2.drawMarker(
|
|
display_cv_bgr,
|
|
(marker_x, marker_y),
|
|
marker_color,
|
|
marker_type,
|
|
marker_size,
|
|
marker_thickness,
|
|
)
|
|
except Exception as draw_err:
|
|
logging.exception(
|
|
f"{log_prefix} Error drawing Map marker: {draw_err}"
|
|
)
|
|
# --- >>> END OF MAP MARKER DRAWING <<< ---
|
|
|
|
# Convert Final CV Image back to PIL
|
|
final_composed_pil = Image.fromarray(
|
|
cv2.cvtColor(display_cv_bgr, cv2.COLOR_BGR2RGB)
|
|
)
|
|
|
|
# Add Scale Bar (to PIL image)
|
|
final_composed_pil = self._draw_scale_bar(
|
|
final_composed_pil, shifted_lat_deg, zoom
|
|
)
|
|
|
|
except MapCalculationError as e:
|
|
logging.error(f"{log_prefix} Map overlay calculation failed: {e}")
|
|
final_composed_pil = (
|
|
self._get_fallback_map()
|
|
) # Use last base map if possible
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error during map overlay update:"
|
|
)
|
|
final_composed_pil = (
|
|
self._get_fallback_map()
|
|
) # Use last base map if possible
|
|
|
|
finally:
|
|
# --- Update AppState and Queue ---
|
|
# This block runs even if errors occurred during calculation
|
|
if not self._app_state.shutting_down:
|
|
# Update context (bounds, zoom, base map) - use values calculated in the 'try' block
|
|
self._update_app_state_map_context(
|
|
base_map_pil, # The stitched map before overlay/markers
|
|
map_bounds_deg,
|
|
zoom,
|
|
)
|
|
# Update SAR overlay cache data
|
|
self._app_state.last_processed_sar_for_overlay = (
|
|
processed_sar_overlay.copy()
|
|
if processed_sar_overlay is not None
|
|
else None
|
|
)
|
|
self._app_state.last_sar_warp_matrix = (
|
|
sar_warp_matrix.copy() if sar_warp_matrix is not None else None
|
|
)
|
|
self._app_state.last_sar_corners_pixels_map = (
|
|
sar_corners_pixels.copy()
|
|
if sar_corners_pixels is not None
|
|
else None
|
|
)
|
|
|
|
# Queue the final composed image (or fallback) for display
|
|
# This also updates self._app_state.last_composed_map_pil
|
|
self._queue_map_for_display(final_composed_pil)
|
|
|
|
self._map_update_lock.release() # Ensure lock is released
|
|
logging.debug(f"{log_prefix} Map update finished.")
|
|
|
|
# Function modified: _recompose_map_overlay
|
|
def _recompose_map_overlay(self):
|
|
"""
|
|
Re-applies SAR overlay with current alpha/settings using cached data
|
|
and draws the last map click marker.
|
|
If cached data is insufficient, it logs a warning and exits.
|
|
Uses a lock to prevent concurrent updates.
|
|
"""
|
|
log_prefix = f"{self._log_prefix} RecomposeMap"
|
|
if not self._map_update_lock.acquire(blocking=False):
|
|
logging.debug(
|
|
f"{log_prefix} Map update/recomposition already in progress. Skipping."
|
|
)
|
|
return
|
|
|
|
final_recomposed_pil: Optional[ImageType] = None # Initialize here
|
|
try:
|
|
if self._app_state.shutting_down:
|
|
logging.debug(f"{log_prefix} Skipping: Shutdown detected.")
|
|
return
|
|
logging.debug(f"{log_prefix} Starting map recomposition...")
|
|
|
|
# --- Get cached data & current state ---
|
|
base_map_pil = self._app_state.last_map_image_pil
|
|
processed_sar = self._app_state.last_processed_sar_for_overlay
|
|
warp_matrix = self._app_state.last_sar_warp_matrix
|
|
sar_corner_pixels = self._app_state.last_sar_corners_pixels_map
|
|
map_click_coords = (
|
|
self._app_state.last_map_click_coords
|
|
) # Get marker coords
|
|
overlay_enabled = self._app_state.map_sar_overlay_enabled
|
|
overlay_alpha = self._app_state.map_sar_overlay_alpha
|
|
# Get lat/zoom from state for scale bar calculation (if possible)
|
|
latitude_deg = 0.0
|
|
zoom = None
|
|
current_geo = self._app_state.current_sar_geo_info
|
|
if current_geo and current_geo.get("valid"):
|
|
latitude_deg = math.degrees(current_geo.get("lat", 0.0))
|
|
# Apply shift to latitude if scale bar should reflect shifted center
|
|
latitude_deg += self._app_state.sar_lat_shift_deg
|
|
latitude_deg = max(-90.0, min(90.0, latitude_deg)) # Clamp after shift
|
|
zoom = self._app_state.map_current_zoom
|
|
|
|
# --- Validate necessary cached data for the CURRENT desired state ---
|
|
if base_map_pil is None:
|
|
logging.warning(
|
|
f"{log_prefix} Cannot recompose: Base map image missing."
|
|
)
|
|
return # Cannot proceed without base map
|
|
|
|
# Check data needed based on whether overlay is enabled or not
|
|
if overlay_enabled and (processed_sar is None or warp_matrix is None):
|
|
logging.error(
|
|
f"{log_prefix} Cannot recompose overlay: SAR cache data missing "
|
|
f"(ProcessedSAR:{processed_sar is not None}, WarpMatrix:{warp_matrix is not None})."
|
|
"Consider triggering a full update."
|
|
)
|
|
# Decide fallback: Trigger full update or just show base map?
|
|
# For now, just log and exit, letting the user trigger full update if needed.
|
|
return
|
|
elif not overlay_enabled and sar_corner_pixels is None:
|
|
# If overlay is off, we need corner pixels to draw the box
|
|
logging.warning(
|
|
f"{log_prefix} Cannot draw bounding box during recompose: Corner pixels missing."
|
|
"Showing base map without box."
|
|
)
|
|
# Fallback: Use base map directly, skip drawing box/overlay
|
|
final_recomposed_pil = base_map_pil.copy() # Start with base map
|
|
# Skip further processing, jump to scale bar / queuing later
|
|
|
|
# --- Recomposition Steps (only if not fallen back to base map already) ---
|
|
if final_recomposed_pil is None:
|
|
logging.debug(f"{log_prefix} Starting recomposition drawing...")
|
|
map_cv_bgr = cv2.cvtColor(np.array(base_map_pil), cv2.COLOR_RGB2BGR)
|
|
map_h, map_w = map_cv_bgr.shape[:2]
|
|
display_cv_bgr = map_cv_bgr.copy() # Start with base map
|
|
|
|
# Apply Overlay if enabled (we already checked cache validity)
|
|
if overlay_enabled:
|
|
logging.debug(
|
|
f"{log_prefix} Recomposing SAR overlay (Alpha: {overlay_alpha:.2f})..."
|
|
)
|
|
try:
|
|
# Warp using cached matrix
|
|
warped_sar_bgr = cv2.warpPerspective(
|
|
processed_sar,
|
|
warp_matrix,
|
|
(map_w, map_h),
|
|
flags=cv2.INTER_LINEAR,
|
|
borderMode=cv2.BORDER_CONSTANT,
|
|
borderValue=(0, 0, 0),
|
|
)
|
|
# Create mask
|
|
warped_sar_gray = cv2.cvtColor(
|
|
warped_sar_bgr, cv2.COLOR_BGR2GRAY
|
|
)
|
|
_, sar_mask = cv2.threshold(
|
|
warped_sar_gray, 1, 255, cv2.THRESH_BINARY
|
|
)
|
|
|
|
# Blend using current alpha
|
|
mask_alpha = (
|
|
sar_mask.astype(np.float32) / 255.0
|
|
) * overlay_alpha
|
|
mask_alpha_3ch = cv2.cvtColor(mask_alpha, cv2.COLOR_GRAY2BGR)
|
|
term1 = map_cv_bgr.astype(np.float32) * (1.0 - mask_alpha_3ch)
|
|
term2 = warped_sar_bgr.astype(np.float32) * mask_alpha_3ch
|
|
blended_float = term1 + term2
|
|
display_cv_bgr = np.clip(blended_float, 0, 255).astype(
|
|
np.uint8
|
|
) # Update display image
|
|
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error during SAR overlay recomposition warp/blend:"
|
|
)
|
|
# Fallback to base map if recompose fails catastrophically
|
|
display_cv_bgr = map_cv_bgr # Reset to base map
|
|
|
|
# Draw Bounding Box if overlay disabled (and corners exist)
|
|
elif sar_corner_pixels is not None:
|
|
logging.debug(f"{log_prefix} Recomposing SAR bounding box...")
|
|
try:
|
|
pts = np.array(sar_corner_pixels, np.int32).reshape((-1, 1, 2))
|
|
cv2.polylines(
|
|
display_cv_bgr,
|
|
[pts],
|
|
isClosed=True,
|
|
color=(0, 0, 255),
|
|
thickness=2,
|
|
)
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error recomposing bounding box:"
|
|
)
|
|
# display_cv_bgr remains base map
|
|
|
|
# --- >>> START OF MAP MARKER DRAWING <<< ---
|
|
# Draw Map Click Marker onto the recomposed BGR image
|
|
if (
|
|
map_click_coords
|
|
and isinstance(map_click_coords, tuple)
|
|
and len(map_click_coords) == 2
|
|
):
|
|
try:
|
|
marker_x, marker_y = map_click_coords
|
|
marker_color = (0, 0, 255) # BGR for Red
|
|
marker_type = cv2.MARKER_CROSS
|
|
marker_size = 15
|
|
marker_thickness = 2
|
|
logging.debug(
|
|
f"{log_prefix} Recomposing: Drawing Map click marker at {map_click_coords}"
|
|
)
|
|
# Draw directly onto display_cv_bgr
|
|
cv2.drawMarker(
|
|
display_cv_bgr,
|
|
(marker_x, marker_y),
|
|
marker_color,
|
|
marker_type,
|
|
marker_size,
|
|
marker_thickness,
|
|
)
|
|
except Exception as draw_err:
|
|
logging.exception(
|
|
f"{log_prefix} Recomposing: Error drawing Map marker: {draw_err}"
|
|
)
|
|
# --- >>> END OF MAP MARKER DRAWING <<< ---
|
|
|
|
# Convert final OpenCV image back to PIL
|
|
final_recomposed_pil = Image.fromarray(
|
|
cv2.cvtColor(display_cv_bgr, cv2.COLOR_BGR2RGB)
|
|
)
|
|
# End of drawing logic if not fallen back to base map
|
|
|
|
# --- Add Scale Bar (if possible) ---
|
|
if (
|
|
final_recomposed_pil
|
|
and zoom is not None
|
|
and math.isfinite(latitude_deg)
|
|
):
|
|
# Add scale bar to the potentially modified final_recomposed_pil
|
|
final_recomposed_pil = self._draw_scale_bar(
|
|
final_recomposed_pil, latitude_deg, zoom
|
|
)
|
|
elif final_recomposed_pil: # Log if scale bar cannot be added
|
|
logging.warning(
|
|
f"{log_prefix} Cannot draw scale bar during recompose (missing zoom/lat context? Zoom:{zoom}, LatFinite:{math.isfinite(latitude_deg)})."
|
|
)
|
|
|
|
# --- Update state and Queue ---
|
|
# Queue the recomposed image (or the base map fallback) for display
|
|
# This call also updates AppState.last_composed_map_pil
|
|
self._queue_map_for_display(final_recomposed_pil)
|
|
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error during map recomposition:"
|
|
)
|
|
# Attempt to queue base map as fallback if something unexpected happened
|
|
if not self._app_state.shutting_down:
|
|
base_map = self._app_state.last_map_image_pil
|
|
if base_map:
|
|
logging.warning(
|
|
f"{log_prefix} Queuing base map as fallback due to error."
|
|
)
|
|
self._queue_map_for_display(base_map)
|
|
else:
|
|
logging.error(
|
|
f"{log_prefix} Cannot queue fallback, base map is also None."
|
|
)
|
|
|
|
finally:
|
|
self._map_update_lock.release() # Release lock regardless of success/failure
|
|
logging.debug(f"{log_prefix} Map recomposition finished.")
|
|
|
|
def _process_sar_for_overlay(
|
|
self, sar_normalized_uint8: np.ndarray
|
|
) -> Optional[np.ndarray]:
|
|
"""Applies B/C LUT and Palette to SAR image for overlay."""
|
|
# (Implementation unchanged)
|
|
log_prefix = f"{self._log_prefix} ProcessSAROverlay"
|
|
try:
|
|
bc_lut = self._app_state.brightness_contrast_lut
|
|
palette = self._app_state.sar_palette
|
|
if bc_lut is None:
|
|
raise ValueError("B/C LUT is None")
|
|
processed_sar = cv2.LUT(sar_normalized_uint8, bc_lut)
|
|
if palette != "GRAY":
|
|
processed_sar = apply_color_palette(processed_sar, palette)
|
|
elif processed_sar.ndim == 2:
|
|
processed_sar = cv2.cvtColor(processed_sar, cv2.COLOR_GRAY2BGR)
|
|
return processed_sar
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error:")
|
|
return None
|
|
|
|
def _get_bounds_for_tile_range(
|
|
self, zoom: int, tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]]
|
|
) -> Optional[Tuple[float, float, float, float]]:
|
|
"""Calculate the precise geographic bounds covered by a given range of tiles."""
|
|
# (Implementation corrected in previous step)
|
|
log_prefix = f"{self._log_prefix} TileBounds"
|
|
if mercantile is None:
|
|
return None
|
|
try:
|
|
min_x, max_x = tile_ranges[0]
|
|
min_y, max_y = tile_ranges[1]
|
|
ul_bound = mercantile.bounds(min_x, min_y, zoom)
|
|
lr_bound = mercantile.bounds(max_x, max_y, zoom)
|
|
west = ul_bound[0]
|
|
east = lr_bound[2]
|
|
south = lr_bound[1]
|
|
north = ul_bound[3]
|
|
return (west, south, east, north)
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error calculating bounds for tile range: {e}"
|
|
)
|
|
return None
|
|
|
|
def _update_app_state_map_context(
|
|
self,
|
|
base_map_pil: Optional[ImageType],
|
|
bounds_deg: Optional[Tuple[float, float, float, float]],
|
|
zoom: Optional[int],
|
|
):
|
|
"""Safely updates map context information in AppState."""
|
|
# (Implementation unchanged)
|
|
log_prefix = f"{self._log_prefix} UpdateMapContext"
|
|
try:
|
|
self._app_state.last_map_image_pil = (
|
|
base_map_pil.copy() if base_map_pil else None
|
|
)
|
|
self._app_state.map_current_bounds_deg = bounds_deg
|
|
self._app_state.map_current_zoom = zoom
|
|
# Shape is updated later by display_map
|
|
logging.debug(
|
|
f"{log_prefix} Updated AppState: Bounds={bounds_deg}, Zoom={zoom}, BasePIL={'Set' if base_map_pil else 'None'}"
|
|
)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error updating AppState map context:")
|
|
|
|
# --- >>> START OF MODIFIED FUNCTION <<< ---
|
|
def _queue_map_for_display(self, image_pil: Optional[ImageType]):
|
|
"""Puts the final composed map image onto the Tkinter queue AND updates composed state."""
|
|
log_prefix = f"{self._log_prefix} QueueMap"
|
|
payload_type = type(image_pil).__name__ if image_pil else "None"
|
|
|
|
# --- Update composed map state ---
|
|
try:
|
|
self._app_state.last_composed_map_pil = (
|
|
image_pil.copy() if image_pil else None
|
|
)
|
|
logging.debug(
|
|
f"{log_prefix} Updated last_composed_map_pil in AppState (Type: {payload_type})."
|
|
)
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error updating last_composed_map_pil in AppState:"
|
|
)
|
|
|
|
# --- Queue for display ---
|
|
logging.debug(
|
|
f"{log_prefix} Queueing SHOW_MAP command (Payload type: {payload_type})"
|
|
)
|
|
put_queue(self._tkinter_queue, ("SHOW_MAP", image_pil), "tkinter", self._app)
|
|
|
|
#
|
|
|
|
def display_map(self, map_image_pil: Optional[ImageType]):
|
|
"""Instructs the MapDisplayWindow to show the provided map image and updates app state."""
|
|
# (Implementation unchanged)
|
|
log_prefix = f"{self._log_prefix} Display"
|
|
displayed_shape = None
|
|
if self._map_display_window:
|
|
try:
|
|
self._map_display_window.show_map(map_image_pil)
|
|
displayed_shape = self._map_display_window._last_displayed_shape
|
|
if (
|
|
displayed_shape
|
|
and displayed_shape[0] > 0
|
|
and displayed_shape[1] > 0
|
|
):
|
|
self._app_state.map_current_shape_px = displayed_shape
|
|
else:
|
|
self._app_state.map_current_shape_px = None
|
|
self._update_app_status_after_map_load(map_image_pil is not None)
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error calling MapDisplayWindow.show_map:"
|
|
)
|
|
self._app_state.map_current_shape_px = None
|
|
else:
|
|
self._app_state.map_current_shape_px = None
|
|
|
|
def _update_app_status_after_map_load(self, success: bool):
|
|
"""Updates the main application status after the initial map load attempt."""
|
|
# (Implementation unchanged)
|
|
log_prefix = f"{self._log_prefix} Status Update"
|
|
try:
|
|
statusbar_ref = getattr(self._app, "statusbar", None)
|
|
if statusbar_ref and "Loading initial map" in statusbar_ref.cget("text"):
|
|
status_msg = "Error Loading Map"
|
|
if success: # Determine correct 'Ready' status
|
|
mode = self._app.state.test_mode_active
|
|
is_local = config.USE_LOCAL_IMAGES
|
|
is_net = not is_local and not mode
|
|
if mode:
|
|
status_msg = "Ready (Test Mode)"
|
|
elif is_local:
|
|
status_msg = "Ready (Local Mode)"
|
|
elif is_net:
|
|
status_msg = (
|
|
f"Listening UDP {self._app.local_ip}:{self._app.local_port}"
|
|
if self._app.udp_socket
|
|
else "Error: No Socket"
|
|
)
|
|
else:
|
|
status_msg = "Ready"
|
|
self._app.set_status(status_msg)
|
|
except Exception as e:
|
|
logging.warning(
|
|
f"{log_prefix} Error checking/updating app status after map load: {e}"
|
|
)
|
|
|
|
def shutdown(self):
|
|
"""Cleans up map-related resources."""
|
|
# (Implementation unchanged)
|
|
log_prefix = f"{self._log_prefix} Shutdown"
|
|
if self._map_display_window:
|
|
try:
|
|
self._map_display_window.destroy_window()
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error destroying MapDisplayWindow:")
|
|
self._map_display_window = None
|
|
logging.debug(f"{log_prefix} Map integration shutdown complete.")
|
|
|
|
# --- Geo Calculation Helpers ---
|
|
# (_calculate_sar_corners_geo, _geo_coords_to_map_pixels - Unchanged)
|
|
def _calculate_sar_corners_geo(
|
|
self, geo_info: Dict[str, Any]
|
|
) -> Optional[List[Tuple[float, float]]]:
|
|
# (Implementation unchanged from previous version)
|
|
log_prefix = f"{self._log_prefix} SAR Corners Geo"
|
|
if not self._geod:
|
|
return None
|
|
try:
|
|
lat_rad, lon_rad = geo_info["lat"], geo_info["lon"]
|
|
orient_rad = geo_info["orientation"]
|
|
ref_x, ref_y = geo_info["ref_x"], geo_info["ref_y"]
|
|
scale_x, scale_y = geo_info["scale_x"], geo_info["scale_y"]
|
|
width, height = geo_info["width_px"], geo_info["height_px"]
|
|
calc_orient_rad = -orient_rad
|
|
if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
|
|
return None
|
|
corners_pixel = [
|
|
(0 - ref_x, ref_y - 0),
|
|
(width - 1 - ref_x, ref_y - 0),
|
|
(width - 1 - ref_x, ref_y - (height - 1)),
|
|
(0 - ref_x, ref_y - (height - 1)),
|
|
]
|
|
corners_meters = [(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel]
|
|
corners_meters_rotated = []
|
|
if abs(calc_orient_rad) > 1e-6:
|
|
cos_o, sin_o = math.cos(calc_orient_rad), math.sin(calc_orient_rad)
|
|
corners_meters_rotated = [
|
|
(dx * cos_o - dy * sin_o, dx * sin_o + dy * cos_o)
|
|
for dx, dy in corners_meters
|
|
]
|
|
else:
|
|
corners_meters_rotated = corners_meters
|
|
corners_geo_deg = []
|
|
lon_deg, lat_deg = math.degrees(lon_rad), math.degrees(lat_rad)
|
|
for dx_m, dy_m in corners_meters_rotated:
|
|
dist = math.hypot(dx_m, dy_m)
|
|
az = math.degrees(math.atan2(dx_m, dy_m))
|
|
endlon, endlat, _ = self._geod.fwd(lon_deg, lat_deg, az, dist)
|
|
corners_geo_deg.append((endlon, endlat))
|
|
return corners_geo_deg if len(corners_geo_deg) == 4 else None
|
|
except KeyError as ke:
|
|
logging.error(f"{log_prefix} Missing key: {ke}")
|
|
return None
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error:")
|
|
return None
|
|
|
|
def _geo_coords_to_map_pixels(
|
|
self,
|
|
coords_deg: List[Tuple[float, float]],
|
|
map_bounds: Tuple[float, float, float, float],
|
|
map_tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]],
|
|
zoom: int,
|
|
stitched_map_shape: Tuple[int, int],
|
|
) -> Optional[List[Tuple[int, int]]]:
|
|
# (Implementation unchanged from previous version)
|
|
log_prefix = f"{self._log_prefix} Geo to Pixel"
|
|
if mercantile is None:
|
|
return None
|
|
if (
|
|
not stitched_map_shape
|
|
or stitched_map_shape[0] <= 0
|
|
or stitched_map_shape[1] <= 0
|
|
):
|
|
return None
|
|
pixel_coords = []
|
|
map_h, map_w = stitched_map_shape
|
|
map_west, map_south, map_east, map_north = map_bounds
|
|
try:
|
|
map_ul_x, map_ul_y = mercantile.xy(map_west, map_north)
|
|
map_lr_x, map_lr_y = mercantile.xy(map_east, map_south)
|
|
map_w_merc = map_lr_x - map_ul_x
|
|
map_h_merc = map_ul_y - map_lr_y
|
|
if map_w_merc <= 0 or map_h_merc <= 0:
|
|
return None
|
|
for lon, lat in coords_deg:
|
|
pt_x, pt_y = mercantile.xy(lon, lat)
|
|
rel_x = pt_x - map_ul_x
|
|
rel_y = map_ul_y - pt_y
|
|
px = int(round((rel_x / map_w_merc) * map_w))
|
|
py = int(round((rel_y / map_h_merc) * map_h))
|
|
pixel_coords.append(
|
|
(max(0, min(px, map_w - 1)), max(0, min(py, map_h - 1)))
|
|
)
|
|
return pixel_coords
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error:")
|
|
return None
|
|
|
|
# --- Pixel to Geo Conversion ---
|
|
def get_geo_coords_from_map_pixel(
|
|
self, pixel_x: int, pixel_y: int
|
|
) -> Optional[Tuple[float, float]]:
|
|
"""Converts pixel coordinates from the map display window to geographic coordinates (lat, lon degrees)."""
|
|
# (Implementation unchanged from previous version)
|
|
log_prefix = f"{self._log_prefix} Pixel to Geo"
|
|
map_bounds_deg = self._app_state.map_current_bounds_deg
|
|
map_shape_px = self._app_state.map_current_shape_px
|
|
if (
|
|
map_bounds_deg is None
|
|
or map_shape_px is None
|
|
or map_shape_px[0] <= 0
|
|
or map_shape_px[1] <= 0
|
|
or mercantile is None
|
|
):
|
|
return None
|
|
map_h, map_w = map_shape_px
|
|
map_west, map_south, map_east, map_north = map_bounds_deg
|
|
try:
|
|
if not (0 <= pixel_x < map_w and 0 <= pixel_y < map_h):
|
|
return None # Outside bounds
|
|
map_ul_x, map_ul_y = mercantile.xy(map_west, map_north)
|
|
map_lr_x, map_lr_y = mercantile.xy(map_east, map_south)
|
|
map_w_merc = map_lr_x - map_ul_x
|
|
map_h_merc = map_ul_y - map_lr_y
|
|
if map_w_merc <= 0 or map_h_merc <= 0:
|
|
return None
|
|
rel_x = pixel_x / map_w
|
|
rel_y = pixel_y / map_h
|
|
pt_x = map_ul_x + (rel_x * map_w_merc)
|
|
pt_y = map_ul_y - (rel_y * map_h_merc)
|
|
lon_deg, lat_deg = mercantile.lnglat(pt_x, pt_y)
|
|
return (lat_deg, lon_deg)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error:")
|
|
return None
|
|
|
|
# --- Map Saving ---
|
|
def save_map_view_to_file(
|
|
self, directory: Optional[str] = None, filename: Optional[str] = None
|
|
) -> bool:
|
|
"""Saves the last composed map view (from AppState.last_composed_map_pil) to a file."""
|
|
# (Implementation unchanged from previous version)
|
|
log_prefix = f"{self._log_prefix} SaveMap"
|
|
image_to_save = self._app_state.last_composed_map_pil
|
|
if image_to_save is None:
|
|
self._app.set_status("Error: No map view to save.")
|
|
return False
|
|
if Image is None:
|
|
self._app.set_status("Error: Pillow library missing.")
|
|
return False
|
|
try:
|
|
save_dir_path: Path
|
|
if directory:
|
|
save_dir_path = Path(directory)
|
|
else:
|
|
app_path = (
|
|
Path(sys.executable).parent
|
|
if getattr(sys, "frozen", False)
|
|
else Path(sys.argv[0]).parent
|
|
)
|
|
save_dir_path = app_path / self.DEFAULT_SAVE_DIRECTORY
|
|
save_dir_path.mkdir(parents=True, exist_ok=True)
|
|
save_filename = (
|
|
(Path(filename).stem + ".png")
|
|
if filename
|
|
else f"map_view_{datetime.datetime.now():%Y%m%d_%H%M%S}.png"
|
|
)
|
|
full_save_path = save_dir_path / save_filename
|
|
if ImageDraw and ImageFont: # Add timestamp
|
|
try:
|
|
draw = ImageDraw.Draw(image_to_save)
|
|
font = (
|
|
ImageFont.truetype("arial.ttf", 14)
|
|
if ImageFont
|
|
else ImageFont.load_default()
|
|
)
|
|
text = f"Saved: {datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
|
|
draw.text(
|
|
(10, image_to_save.height - 20), text, font=font, fill=(0, 0, 0)
|
|
)
|
|
except Exception as draw_err:
|
|
logging.warning(
|
|
f"{log_prefix} Could not draw text on saved image: {draw_err}"
|
|
)
|
|
image_to_save.save(full_save_path, "PNG")
|
|
logging.info(f"{log_prefix} Map view saved to: {full_save_path}")
|
|
self._app.set_status(f"Map view saved: {save_filename}")
|
|
return True
|
|
except OSError as e:
|
|
logging.error(
|
|
f"{log_prefix} OS Error saving map view to '{full_save_path}': {e}"
|
|
)
|
|
self._app.set_status("Error: Failed to save map view (OS Error).")
|
|
return False
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Unexpected error saving map view:")
|
|
self._app.set_status("Error: Failed to save map view (Exception).")
|
|
return False
|
|
|
|
# --- Scale Bar Drawing ---
|
|
def _draw_scale_bar(
|
|
self, image_pil: Optional[ImageType], latitude_deg: float, zoom: int
|
|
) -> Optional[ImageType]:
|
|
"""Draws a simple scale bar onto the map image (PIL). Returns modified image or original on error."""
|
|
log_prefix = f"{self._log_prefix} ScaleBar"
|
|
if image_pil is None or not (Image and cv2):
|
|
return image_pil # Return original if no image or dependencies missing
|
|
|
|
try:
|
|
meters_per_pixel = calculate_meters_per_pixel(latitude_deg, zoom)
|
|
if meters_per_pixel is None or meters_per_pixel <= 0:
|
|
logging.warning(
|
|
f"{log_prefix} Invalid meters/pixel ({meters_per_pixel}). Skipping scale bar."
|
|
)
|
|
return image_pil # Cannot calculate scale
|
|
|
|
img_w, img_h = image_pil.size
|
|
# Target bar width relative to image width
|
|
target_bar_px = max(
|
|
50, min(150, img_w // 5)
|
|
) # Aim for ~1/5 width, 50-150px
|
|
|
|
# Predefined "nice" distances in kilometers
|
|
possible_distances_km = [
|
|
0.005,
|
|
0.01,
|
|
0.02,
|
|
0.05,
|
|
0.1,
|
|
0.2,
|
|
0.5,
|
|
1,
|
|
2,
|
|
5,
|
|
10,
|
|
20,
|
|
50,
|
|
100,
|
|
200,
|
|
500,
|
|
1000,
|
|
2000,
|
|
5000,
|
|
]
|
|
|
|
best_dist_km = None # Initialize best distance found
|
|
min_diff = float("inf") # Initialize minimum difference
|
|
|
|
# Find the "nicest" distance that results in a bar length close to the target
|
|
for dist_km in possible_distances_km:
|
|
pixels_for_dist = (dist_km * 1000.0) / meters_per_pixel
|
|
# Ensure the calculated bar length is reasonable (e.g., >= 10 pixels)
|
|
if pixels_for_dist >= 10:
|
|
# --- >>> START OF FIX <<< ---
|
|
# Calculate difference *after* ensuring pixels_for_dist is valid
|
|
diff = abs(pixels_for_dist - target_bar_px)
|
|
# If this difference is smaller than the current minimum, update best distance
|
|
if diff < min_diff:
|
|
min_diff = diff
|
|
best_dist_km = dist_km
|
|
# --- >>> END OF FIX <<< ---
|
|
|
|
# If no suitable distance was found (e.g., very high zoom, small image)
|
|
if best_dist_km is None:
|
|
logging.warning(
|
|
f"{log_prefix} Could not find suitable scale bar distance for zoom {zoom} and image width {img_w}. Skipping."
|
|
)
|
|
return image_pil # Return original image
|
|
|
|
# Calculate final bar length in pixels for the best distance found
|
|
scale_distance_km = best_dist_km
|
|
scale_distance_meters = scale_distance_km * 1000.0
|
|
scale_bar_pixels = int(round(scale_distance_meters / meters_per_pixel))
|
|
|
|
# Final check if calculated bar is somehow too small (redundant due to pixels>=10 check)
|
|
if scale_bar_pixels < 10:
|
|
return image_pil # Should not happen now
|
|
|
|
# --- Draw the scale bar using OpenCV ---
|
|
map_cv = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
|
|
# Define bar parameters (position, size, color)
|
|
bar_x_start = 15 # X position of the start of the bar
|
|
bar_y = map_cv.shape[0] - 25 # Y position (near bottom)
|
|
bar_tick_height = 6 # Height of the end ticks
|
|
bar_thickness = 2
|
|
text_gap = 5 # Gap between bar and text label
|
|
text_color = (0, 0, 0) # Black
|
|
bar_color = (0, 0, 0) # Black
|
|
|
|
# Draw main horizontal line
|
|
cv2.line(
|
|
map_cv,
|
|
(bar_x_start, bar_y),
|
|
(bar_x_start + scale_bar_pixels, bar_y),
|
|
bar_color,
|
|
bar_thickness,
|
|
)
|
|
# Draw vertical ticks at ends
|
|
cv2.line(
|
|
map_cv,
|
|
(bar_x_start, bar_y - bar_tick_height // 2),
|
|
(bar_x_start, bar_y + bar_tick_height // 2),
|
|
bar_color,
|
|
bar_thickness,
|
|
)
|
|
cv2.line(
|
|
map_cv,
|
|
(bar_x_start + scale_bar_pixels, bar_y - bar_tick_height // 2),
|
|
(bar_x_start + scale_bar_pixels, bar_y + bar_tick_height // 2),
|
|
bar_color,
|
|
bar_thickness,
|
|
)
|
|
|
|
# Prepare label text (e.g., "10 km", "500 m")
|
|
if scale_distance_km >= 1:
|
|
label = (
|
|
f"{int(scale_distance_km)} km"
|
|
if scale_distance_km.is_integer()
|
|
else f"{scale_distance_km:.1f} km"
|
|
)
|
|
else:
|
|
label = f"{int(scale_distance_meters)} m"
|
|
|
|
# Add text label above the bar
|
|
font = cv2.FONT_HERSHEY_SIMPLEX
|
|
font_scale = 0.5
|
|
font_thickness = 1
|
|
(text_w, text_h), _ = cv2.getTextSize(
|
|
label, font, font_scale, font_thickness
|
|
)
|
|
# Calculate centered X position for the text
|
|
text_x = max(5, bar_x_start + (scale_bar_pixels // 2) - (text_w // 2))
|
|
# Calculate Y position above the bar
|
|
text_y = bar_y - bar_tick_height // 2 - text_gap
|
|
# Draw text
|
|
cv2.putText(
|
|
map_cv,
|
|
label,
|
|
(text_x, text_y),
|
|
font,
|
|
font_scale,
|
|
text_color,
|
|
font_thickness,
|
|
cv2.LINE_AA,
|
|
)
|
|
|
|
# Convert back to PIL image
|
|
image_pil_with_scale = Image.fromarray(
|
|
cv2.cvtColor(map_cv, cv2.COLOR_BGR2RGB)
|
|
)
|
|
logging.debug(
|
|
f"{log_prefix} Scale bar drawn ({label}, {scale_bar_pixels}px)."
|
|
)
|
|
return image_pil_with_scale
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error drawing scale bar:")
|
|
return image_pil # Return original image on any unexpected error
|
|
|
|
def _get_fallback_map(self) -> Optional[ImageType]:
|
|
"""Returns the last known base map or None."""
|
|
# Helper to reduce code duplication in error handling
|
|
return self._app_state.last_map_image_pil
|
|
|
|
|
|
# --- END OF FILE map_integration.py ---
|