767 lines
40 KiB
Python
767 lines
40 KiB
Python
# --- START OF FILE map_integration.py ---
|
|
|
|
# map_integration.py
|
|
"""
|
|
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
|
|
Manages map integration functionalities, including service interaction,
|
|
tile fetching/caching, display window management, and overlay updates.
|
|
Acts as an intermediary between the main application and map-specific modules.
|
|
"""
|
|
|
|
# Standard library imports
|
|
import logging
|
|
import threading
|
|
import queue # For type hinting
|
|
import math
|
|
from typing import Optional, Dict, Any, Tuple, List
|
|
|
|
# Third-party imports
|
|
import numpy as np
|
|
try:
|
|
from PIL import Image
|
|
except ImportError:
|
|
Image = None # Handled in dependent modules, but check here too
|
|
try:
|
|
import mercantile
|
|
except ImportError:
|
|
mercantile = None
|
|
try:
|
|
import pyproj
|
|
except ImportError:
|
|
pyproj = None
|
|
|
|
import cv2
|
|
|
|
# Local application imports
|
|
import config
|
|
from app_state import AppState
|
|
from utils import put_queue
|
|
# Map specific modules that this manager orchestrates
|
|
from map_services import get_map_service, BaseMapService
|
|
from map_manager import MapTileManager
|
|
from map_utils import (
|
|
get_bounding_box_from_center_size,
|
|
get_tile_ranges_for_bbox,
|
|
MapCalculationError,
|
|
calculate_meters_per_pixel
|
|
)
|
|
from map_display import MapDisplayWindow
|
|
|
|
# Forward declaration for type hinting App instance
|
|
from typing import TYPE_CHECKING
|
|
if TYPE_CHECKING:
|
|
from app import App
|
|
|
|
|
|
class MapIntegrationManager:
|
|
"""Orchestrates map services, tile management, and display."""
|
|
|
|
def __init__(
|
|
self,
|
|
app_state: AppState,
|
|
tkinter_queue: queue.Queue,
|
|
app: 'App',
|
|
map_x: int,
|
|
map_y: int,
|
|
):
|
|
"""
|
|
Initializes the MapIntegrationManager.
|
|
# ... (docstring args, raises) ...
|
|
"""
|
|
self._log_prefix = "[MapIntegrationManager]"
|
|
logging.debug(f"{self._log_prefix} Initializing...")
|
|
|
|
self._app_state: AppState = app_state
|
|
self._tkinter_queue: queue.Queue = tkinter_queue
|
|
self._app: 'App' = app
|
|
|
|
# --- Dependency Checks ---
|
|
if Image is None: raise ImportError("Pillow library not found")
|
|
if mercantile is None: raise ImportError("mercantile library not found")
|
|
if pyproj is None: raise ImportError("pyproj library not found")
|
|
|
|
# --- Initialize Attributes ---
|
|
self._map_service: Optional[BaseMapService] = None
|
|
self._map_tile_manager: Optional[MapTileManager] = None
|
|
self._map_display_window: Optional[MapDisplayWindow] = None
|
|
self._map_initial_display_thread: Optional[threading.Thread] = None
|
|
self._geod: Optional[pyproj.Geod] = None # Initialize as None first
|
|
|
|
try:
|
|
# --- Geodetic Calculator Initialization ---
|
|
# Create the Geod object here
|
|
self._geod = pyproj.Geod(ellps="WGS84")
|
|
logging.debug(f"{self._log_prefix} pyproj Geod object initialized (WGS84).")
|
|
|
|
# --- Initialize Other Map Components ---
|
|
# 1. Get Map Service
|
|
service_name = getattr(config, 'MAP_SERVICE_PROVIDER', 'osm')
|
|
api_key = getattr(config, 'MAP_API_KEY', None)
|
|
self._map_service = get_map_service(service_name, api_key)
|
|
if not self._map_service:
|
|
raise ValueError(f"Failed to get map service '{service_name}'.")
|
|
logging.debug(f"{self._log_prefix} Map service '{self._map_service.name}' loaded.")
|
|
|
|
# 2. Create Tile Manager
|
|
self._map_tile_manager = MapTileManager(
|
|
map_service=self._map_service,
|
|
cache_base_dir=getattr(config, 'MAP_CACHE_DIRECTORY', None),
|
|
enable_online_fetching=getattr(config, 'ENABLE_ONLINE_MAP_FETCHING', None)
|
|
)
|
|
logging.debug(f"{self._log_prefix} MapTileManager created.")
|
|
|
|
# 3. Create Map Display Window Manager
|
|
self._map_display_window = MapDisplayWindow(
|
|
window_name="Map Overlay", x_pos=map_x, y_pos=map_y
|
|
)
|
|
logging.debug(f"{self._log_prefix} MapDisplayWindow created at ({map_x},{map_y}).")
|
|
|
|
# 4. Trigger Initial Map Display in Background Thread
|
|
logging.debug(f"{self._log_prefix} Starting initial map display thread...")
|
|
# Set status before starting thread
|
|
self._app.set_status("Loading initial map...")
|
|
self._map_initial_display_thread = threading.Thread(
|
|
target=self._display_initial_map_area_thread,
|
|
name="InitialMapDisplayThread",
|
|
daemon=True
|
|
)
|
|
self._map_initial_display_thread.start()
|
|
|
|
except (ImportError, ValueError, pyproj.exceptions.CRSError) as init_err: # Catch pyproj errors too
|
|
logging.critical(f"{self._log_prefix} Initialization failed: {init_err}")
|
|
# Ensure components are None if init fails midway
|
|
self._geod = None
|
|
self._map_service = None
|
|
self._map_tile_manager = None
|
|
self._map_display_window = None
|
|
raise # Re-raise critical errors
|
|
except Exception as e:
|
|
logging.exception(f"{self._log_prefix} Unexpected error during initialization:")
|
|
self._geod = None
|
|
self._map_service = None
|
|
self._map_tile_manager = None
|
|
self._map_display_window = None
|
|
raise # Re-raise other unexpected errors # Re-raise other unexpected errors
|
|
|
|
|
|
def _display_initial_map_area_thread(self):
|
|
"""
|
|
(Runs in background thread) Calculates the initial map area based on default
|
|
config settings and queues the result for display on the main thread,
|
|
*unless* the default coordinates in config are set to (0,0) which signals
|
|
to skip the initial display.
|
|
"""
|
|
log_prefix = f"{self._log_prefix} InitialMap"
|
|
|
|
# Check if default lat/lon are set to 0.0 to prevent initial display
|
|
if config.SAR_CENTER_LAT == 0.0 and config.SAR_CENTER_LON == 0.0:
|
|
# ... (codice per saltare e aggiornare lo stato, come prima) ...
|
|
# ... (assicurati che questa parte sia corretta come nella risposta precedente) ...
|
|
logging.debug(f"{log_prefix} Initial map display skipped based on config defaults (0,0). Waiting for valid GeoInfo.")
|
|
if not self._app_state.shutting_down:
|
|
status_msg = "Status Unavailable" # Default
|
|
try:
|
|
if self._app_state.test_mode_active:
|
|
status_msg = "Ready (Test Mode)"
|
|
elif config.USE_LOCAL_IMAGES:
|
|
status_msg = "Ready (Local Mode)"
|
|
else:
|
|
socket_ok = False
|
|
listening_info = "Error: No Network Socket"
|
|
if hasattr(self._app, 'udp_socket') and self._app.udp_socket:
|
|
if hasattr(self._app, 'local_ip') and hasattr(self._app, 'local_port'):
|
|
listening_info = f"Listening UDP {self._app.local_ip}:{self._app.local_port}"
|
|
socket_ok = True
|
|
else:
|
|
listening_info = "Listening UDP (IP/Port Unknown)"
|
|
socket_ok = True
|
|
status_msg = listening_info
|
|
status_msg += " | Map Ready (Waiting for GeoData)"
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Unexpected error determining status message:")
|
|
status_msg = "Error Getting Status | Map Ready (Waiting for GeoData)"
|
|
self._app.set_status(status_msg)
|
|
return # Esce dal thread
|
|
|
|
# Se le coordinate di default *non* sono (0,0), procedi
|
|
logging.debug(f"{log_prefix} Calculating initial map area based on non-zero config defaults...")
|
|
|
|
# Check dependencies
|
|
if not (self._map_tile_manager and self._map_display_window):
|
|
logging.error(f"{log_prefix} Map components not initialized. Aborting thread.")
|
|
put_queue(self._tkinter_queue, ('SHOW_MAP', None), "tkinter", self._app)
|
|
return
|
|
if self._app_state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected. Aborting.")
|
|
return
|
|
|
|
map_image_pil: Optional[Image.Image] = None
|
|
try:
|
|
# --- MODIFICA QUI: Definisci 'zoom' PRIMA di usarlo ---
|
|
zoom = config.DEFAULT_MAP_ZOOM_LEVEL
|
|
logging.debug(f"{log_prefix} Using default zoom level: {zoom}")
|
|
# --- FINE MODIFICA ---
|
|
|
|
# Usa default center/size da config
|
|
bbox = get_bounding_box_from_center_size(
|
|
config.SAR_CENTER_LAT, config.SAR_CENTER_LON, config.SAR_IMAGE_SIZE_KM
|
|
)
|
|
if bbox is None:
|
|
raise MapCalculationError("Failed to calculate initial bounding box.")
|
|
|
|
# Calcola i tile ranges USANDO la variabile zoom definita sopra
|
|
tile_ranges = get_tile_ranges_for_bbox(bbox, zoom)
|
|
if tile_ranges is None:
|
|
raise MapCalculationError("Failed to calculate initial tile ranges.")
|
|
|
|
if self._app_state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected before stitching.")
|
|
return
|
|
|
|
# Ora puoi usare 'zoom' nel messaggio didebug(f"{log_prefix} Stitching initial map tiles (Zoom: {zoom}, X: {tile_ranges[0]}, Y: {tile_ranges[1]})...")
|
|
map_image_pil = self._map_tile_manager.stitch_map_image(
|
|
zoom, tile_ranges[0], tile_ranges[1]
|
|
)
|
|
|
|
if self._app_state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected after stitching.")
|
|
return
|
|
|
|
if map_image_pil:
|
|
logging.debug(f"{log_prefix} Initial map area stitched successfully.")
|
|
center_lat_deg = config.SAR_CENTER_LAT # Usa il default per la mappa iniziale
|
|
map_image_pil = self._draw_scale_bar(map_image_pil, center_lat_deg, zoom)
|
|
else:
|
|
logging.error(f"{log_prefix} Failed to stitch initial map area.")
|
|
|
|
except (ImportError, MapCalculationError) as e:
|
|
logging.error(f"{log_prefix} Calculation error: {e}")
|
|
map_image_pil = None
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Unexpected error calculating initial map:")
|
|
map_image_pil = None
|
|
finally:
|
|
if not self._app_state.shutting_down:
|
|
logging.debug(f"{log_prefix} Queueing SHOW_MAP command for initial map.")
|
|
put_queue(self._tkinter_queue, ('SHOW_MAP', map_image_pil), "tkinter", self._app)
|
|
logging.debug(f"{log_prefix} Initial map display thread finished.")
|
|
|
|
|
|
def update_map_overlay(self, sar_normalized_uint8: np.ndarray, geo_info_radians: Dict[str, Any]):
|
|
"""
|
|
Calculates the map overlay based on current SAR data.
|
|
Currently fetches map tiles and draws the SAR bounding box.
|
|
Queues the result for display on the main thread.
|
|
|
|
Args:
|
|
sar_normalized_uint8 (np.ndarray): Current normalized SAR image (uint8). Unused in Phase 1.
|
|
geo_info_radians (Dict[str, Any]): Current SAR georeferencing info (in radians).
|
|
"""
|
|
log_prefix = f"{self._log_prefix} Map Update"
|
|
|
|
# --- Prerequisite Checks (Shutdown, Test Mode, Components, GeoInfo) ---
|
|
if self._app_state.shutting_down:
|
|
logging.debug(f"{log_prefix} Skipping update: Shutdown detected.")
|
|
return
|
|
if self._app_state.test_mode_active:
|
|
logging.debug(f"{log_prefix} Skipping update: Test Mode active.")
|
|
return
|
|
|
|
# +++ DETAILED COMPONENT CHECK +++
|
|
if not self._map_tile_manager:
|
|
logging.warning(f"{log_prefix} Skipping update: _map_tile_manager is not available (None or evaluates False).")
|
|
return
|
|
if not self._map_display_window:
|
|
logging.warning(f"{log_prefix} Skipping update: _map_display_window is not available (None or evaluates False).")
|
|
return
|
|
if not self._geod:
|
|
logging.warning(f"{log_prefix} Skipping update: _geod (geodetic calculator) is not available (None or evaluates False).")
|
|
return
|
|
# +++ END DETAILED CHECK +++
|
|
|
|
# Check GeoInfo validity
|
|
if not geo_info_radians or not geo_info_radians.get("valid", False):
|
|
logging.warning(f"{log_prefix} Skipping update: Invalid GeoInfo provided.")
|
|
return
|
|
# Check libraries (redundant if init succeeded, but safe)
|
|
if Image is None or mercantile is None or pyproj is None:
|
|
logging.error(f"{log_prefix} Skipping update: Missing required map libraries.")
|
|
return
|
|
|
|
# Log start of calculation
|
|
logging.debug(f"{log_prefix} Starting map overlay calculation (Phase 1: BBox)...")
|
|
# Initialize variables to store map images
|
|
map_image_with_overlay: Optional[Image.Image] = None
|
|
stitched_map_image: Optional[Image.Image] = None # Store the base map
|
|
|
|
try:
|
|
# --- Calculate SAR Footprint Parameters ---
|
|
# Extract center coordinates (convert back to degrees for utility functions)
|
|
center_lat_deg = math.degrees(geo_info_radians.get('lat', 0.0))
|
|
center_lon_deg = math.degrees(geo_info_radians.get('lon', 0.0))
|
|
# Extract scale and dimensions to calculate size
|
|
scale_x = geo_info_radians.get('scale_x', 0.0)
|
|
width_px = geo_info_radians.get('width_px', 0)
|
|
# Calculate size in KM, using default from config as fallback
|
|
if scale_x > 0 and width_px > 0:
|
|
size_km = (scale_x * width_px) / 1000.0
|
|
logging.debug(f"{log_prefix} Calculated approximate size based on scale_x * width_px: {size_km:.2f} km")
|
|
else:
|
|
logging.error(
|
|
f"{log_prefix} Invalid scale_x ({scale_x}) or width_px ({width_px}) in received GeoInfo. "
|
|
f"Cannot determine map size from data. Using fallback default size: {config.SAR_IMAGE_SIZE_KM} km."
|
|
)
|
|
size_km = config.SAR_IMAGE_SIZE_KM
|
|
# Get zoom level from config
|
|
zoom = config.DEFAULT_MAP_ZOOM_LEVEL
|
|
|
|
# --- Fetch and Stitch Base Map ---
|
|
# 1. Calculate Geographic Bounding Box for fetching tiles
|
|
logging.debug(f"{log_prefix} Calculating map tile BBox for center ({center_lat_deg:.4f},{center_lon_deg:.4f}), size {size_km*1.2:.1f}km.")
|
|
fetch_bbox = get_bounding_box_from_center_size(center_lat_deg, center_lon_deg, size_km * 1.2)
|
|
if fetch_bbox is None:
|
|
raise MapCalculationError("Tile Bounding Box calculation failed.")
|
|
|
|
# 2. Calculate Tile Ranges needed for the bounding box
|
|
logging.debug(f"{log_prefix} Calculating tile ranges for zoom {zoom}...")
|
|
tile_ranges = get_tile_ranges_for_bbox(fetch_bbox, zoom)
|
|
if tile_ranges is None:
|
|
raise MapCalculationError("Tile range calculation failed.")
|
|
|
|
# --- Check shutdown flag before potentially long operation ---
|
|
if self._app_state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected before stitching base map.")
|
|
return
|
|
|
|
# 3. Stitch Background Map Image using MapTileManager
|
|
logging.debug(f"{log_prefix} Stitching base map tiles (Zoom: {zoom}, X: {tile_ranges[0]}, Y: {tile_ranges[1]})...")
|
|
stitched_map_image = self._map_tile_manager.stitch_map_image(zoom, tile_ranges[0], tile_ranges[1])
|
|
|
|
# --- Validate Stitched Image and Log ---
|
|
if stitched_map_image is None:
|
|
logging.error(f"{log_prefix} MapTileManager.stitch_map_image returned None. Cannot proceed.")
|
|
map_image_with_overlay = None
|
|
raise MapCalculationError("Failed to stitch base map image.")
|
|
else:
|
|
logging.debug(f"{log_prefix} Base map stitched successfully (PIL Size: {stitched_map_image.size}, Mode: {stitched_map_image.mode}).")
|
|
map_image_with_overlay = stitched_map_image.copy()
|
|
|
|
# --- Check shutdown flag after stitching ---
|
|
if self._app_state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected after stitching base map.")
|
|
return
|
|
|
|
# --- Calculate and Draw SAR Bounding Box ---
|
|
# 4. Calculate Geographic Coordinates of SAR Corners
|
|
logging.debug(f"{log_prefix} Calculating SAR corner geographic coordinates...")
|
|
sar_corners_deg = self._calculate_sar_corners_geo(geo_info_radians)
|
|
if sar_corners_deg is None:
|
|
raise MapCalculationError("SAR corner geographic coordinate calculation failed.")
|
|
|
|
# 5. Convert SAR Corner Geographic Coords to Pixel Coords on Stitched Map
|
|
logging.debug(f"{log_prefix} Converting SAR corners to map pixel coordinates...")
|
|
top_left_tile = mercantile.Tile(x=tile_ranges[0][0], y=tile_ranges[1][0], z=zoom)
|
|
map_display_bounds = mercantile.bounds(top_left_tile)
|
|
sar_corners_pixels = self._geo_coords_to_map_pixels(
|
|
coords_deg=sar_corners_deg,
|
|
map_bounds=map_display_bounds,
|
|
map_tile_ranges=tile_ranges,
|
|
zoom=zoom,
|
|
stitched_map_shape=map_image_with_overlay.size[::-1]
|
|
)
|
|
if sar_corners_pixels is None:
|
|
raise MapCalculationError("SAR corner to map pixel conversion failed.")
|
|
|
|
# 6. Draw the SAR Bounding Box Polygon on the map image copy
|
|
logging.debug(f"{log_prefix} Drawing SAR bounding box polygon on map image...")
|
|
try:
|
|
map_cv = cv2.cvtColor(np.array(map_image_with_overlay), cv2.COLOR_RGB2BGR)
|
|
pts = np.array(sar_corners_pixels, np.int32).reshape((-1, 1, 2))
|
|
cv2.polylines(map_cv, [pts], isClosed=True, color=(0, 0, 255), thickness=2)
|
|
map_image_with_overlay = Image.fromarray(cv2.cvtColor(map_cv, cv2.COLOR_BGR2RGB))
|
|
logging.debug(f"{log_prefix} SAR bounding box drawn successfully on map.")
|
|
except Exception as draw_err:
|
|
logging.exception(f"{log_prefix} Error drawing SAR bounding box on map:")
|
|
map_image_with_overlay = stitched_map_image
|
|
logging.warning(f"{log_prefix} Proceeding with map display without SAR bounding box due to drawing error.")
|
|
|
|
current_center_lat_deg = math.degrees(geo_info_radians['lat']) # Usa la lat attuale
|
|
map_image_with_overlay = self._draw_scale_bar(map_image_with_overlay, current_center_lat_deg, zoom)
|
|
|
|
except MapCalculationError as e:
|
|
logging.error(f"{log_prefix} Map overlay calculation failed: {e}")
|
|
map_image_with_overlay = stitched_map_image
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Unexpected error during map overlay update:")
|
|
map_image_with_overlay = stitched_map_image
|
|
finally:
|
|
# --- Queue Result for Main Thread Display ---
|
|
if not self._app_state.shutting_down:
|
|
payload_type = type(map_image_with_overlay)
|
|
payload_size = getattr(map_image_with_overlay, 'size', 'N/A')
|
|
logging.debug(f"{log_prefix} Queueing SHOW_MAP command for updated map overlay. Payload Type: {payload_type}, Size: {payload_size}")
|
|
put_queue(self._tkinter_queue, ('SHOW_MAP', map_image_with_overlay), "tkinter", self._app)
|
|
else:
|
|
logging.debug(f"{log_prefix} Skipping queue put due to shutdown.")
|
|
|
|
def display_map(self, map_image_pil: Optional[Image.Image]):
|
|
"""
|
|
Instructs the MapDisplayWindow to show the provided map image.
|
|
This method is intended to be called from the main thread (e.g., via tkinter_queue).
|
|
|
|
Args:
|
|
map_image_pil (Optional[Image.Image]): The PIL map image to display, or None for placeholder.
|
|
"""
|
|
log_prefix = f"{self._log_prefix} Display"
|
|
if self._map_display_window:
|
|
logging.debug(f"{log_prefix} Calling MapDisplayWindow.show_map...")
|
|
try:
|
|
self._map_display_window.show_map(map_image_pil)
|
|
# Update app status only *after* the initial map load attempt completes
|
|
self._update_app_status_after_map_load(map_image_pil is not None)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error calling MapDisplayWindow.show_map:")
|
|
else:
|
|
logging.warning(f"{log_prefix} Map display window not available. Cannot display map.")
|
|
|
|
def _update_app_status_after_map_load(self, success: bool):
|
|
"""Updates the main application status after the initial map load attempt."""
|
|
log_prefix = f"{self._log_prefix} Status Update"
|
|
try:
|
|
# Check if the status bar still shows the loading message
|
|
# Access status bar via self._app reference
|
|
if hasattr(self._app, 'statusbar') and "Loading initial map" in self._app.statusbar.cget("text"):
|
|
# Determine the final status based on success and current mode
|
|
if success:
|
|
current_mode = self._app.state.test_mode_active # Check current mode
|
|
status_msg = "Ready (Test Mode)" if current_mode else \
|
|
("Ready (Local Mode)" if config.USE_LOCAL_IMAGES else \
|
|
(f"Listening UDP {self._app.local_ip}:{self._app.local_port}" if self._app.udp_socket else "Error: No Socket"))
|
|
else:
|
|
status_msg = "Error Loading Map"
|
|
|
|
logging.debug(f"{log_prefix} Initial map load finished (Success: {success}). Setting App status to: '{status_msg}'")
|
|
self._app.set_status(status_msg) # Use App's method for thread safety
|
|
#else: # Status already updated by something else, do nothing
|
|
# logging.debug(f"{log_prefix} Skipping status update, map loading message not present.")
|
|
except Exception as e:
|
|
logging.warning(f"{log_prefix} Error checking/updating app status after map load: {e}")
|
|
|
|
|
|
def shutdown(self):
|
|
"""Cleans up map-related resources, like closing the display window."""
|
|
log_prefix = f"{self._log_prefix} Shutdown"
|
|
logging.debug(f"{log_prefix} Shutting down map integration components...")
|
|
|
|
# Destroy Map Display Window
|
|
if self._map_display_window:
|
|
logging.debug(f"{log_prefix} Requesting MapDisplayWindow destroy...")
|
|
try:
|
|
self._map_display_window.destroy_window() # Handles internal logging
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error destroying MapDisplayWindow:")
|
|
self._map_display_window = None
|
|
|
|
# Stop initial display thread if still running (less critical as it's daemon)
|
|
# No explicit stop needed, just rely on shutdown flag check within the thread
|
|
|
|
# Clear tile manager cache? Optional, maybe not needed on normal shutdown.
|
|
|
|
logging.debug(f"{log_prefix} Map integration shutdown complete.")
|
|
|
|
def _calculate_sar_corners_geo(
|
|
self, geo_info: Dict[str, Any]
|
|
) -> Optional[List[Tuple[float, float]]]:
|
|
"""
|
|
Calculates the geographic coordinates (latitude, longitude in degrees)
|
|
of the four corners of the SAR image based on its georeferencing info.
|
|
|
|
Args:
|
|
geo_info (Dict[str, Any]): The georeferencing dictionary from AppState.
|
|
Expects keys like 'lat', 'lon', 'orientation' (radians),
|
|
'ref_x', 'ref_y', 'scale_x', 'scale_y', 'width_px', 'height_px'.
|
|
|
|
Returns:
|
|
Optional[List[Tuple[float, float]]]: A list of four (lon, lat) tuples in degrees
|
|
representing the corners (e.g., TL, TR, BR, BL),
|
|
or None on error.
|
|
"""
|
|
log_prefix = f"{self._log_prefix} SAR Corners Geo"
|
|
logging.debug(f"{log_prefix} Calculating SAR corner geographic coordinates...")
|
|
|
|
if not self._geod:
|
|
logging.error(f"{log_prefix} Geodetic calculator (pyproj.Geod) not initialized.")
|
|
return None
|
|
|
|
try:
|
|
# Extract necessary info (ensure keys exist and values are valid)
|
|
center_lat_rad = geo_info['lat']
|
|
center_lon_rad = geo_info['lon']
|
|
orient_rad = geo_info['orientation']
|
|
ref_x = geo_info['ref_x']
|
|
ref_y = geo_info['ref_y']
|
|
scale_x = geo_info['scale_x'] # meters/pixel
|
|
scale_y = geo_info['scale_y'] # meters/pixel
|
|
width = geo_info['width_px']
|
|
height = geo_info['height_px']
|
|
|
|
if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
|
|
logging.error(f"{log_prefix} Invalid scale or dimensions in geo_info.")
|
|
return None
|
|
|
|
# 1. Calculate pixel coordinates of corners relative to the reference pixel (ref_x, ref_y)
|
|
# Origin (0,0) is top-left. Y increases downwards in pixel space.
|
|
corners_pixel = [
|
|
(0 - ref_x, ref_y - 0), # Top-Left (dx, dy relative to ref, y inverted)
|
|
(width - 1 - ref_x, ref_y - 0), # Top-Right
|
|
(width - 1 - ref_x, ref_y - (height - 1)), # Bottom-Right
|
|
(0 - ref_x, ref_y - (height - 1)) # Bottom-Left
|
|
]
|
|
|
|
# 2. Convert pixel offsets to meter offsets
|
|
corners_meters = [
|
|
(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel
|
|
] # (delta_meters_east, delta_meters_north)
|
|
|
|
# 3. Apply inverse rotation to meter offsets if necessary
|
|
# The map needs the *geographic* corners, so we need to find where
|
|
# the image corners land geographically. We start from the geo center
|
|
# and calculate the destination point by applying the *rotated* meter offsets.
|
|
corners_meters_rotated = []
|
|
if abs(orient_rad) > 1e-6: # Apply rotation if significant
|
|
cos_o = math.cos(orient_rad)
|
|
sin_o = math.sin(orient_rad)
|
|
for dx_m, dy_m in corners_meters:
|
|
# Rotate the offset vector (dx_m, dy_m) by orient_rad
|
|
rot_dx = dx_m * cos_o - dy_m * sin_o
|
|
rot_dy = dx_m * sin_o + dy_m * cos_o
|
|
corners_meters_rotated.append((rot_dx, rot_dy))
|
|
logging.debug(f"{log_prefix} Applied rotation ({math.degrees(orient_rad):.2f} deg) to meter offsets.")
|
|
else:
|
|
corners_meters_rotated = corners_meters # No rotation needed
|
|
logging.debug(f"{log_prefix} Skipping rotation for meter offsets (angle near zero).")
|
|
|
|
|
|
# 4. Calculate geographic coordinates of corners using pyproj.Geod.fwd
|
|
# This requires calculating distance and azimuth from the center to each rotated meter offset.
|
|
sar_corners_geo_deg = []
|
|
for dx_m_rot, dy_m_rot in corners_meters_rotated:
|
|
# Calculate distance from center (0,0) in rotated meter space
|
|
distance_m = math.sqrt(dx_m_rot**2 + dy_m_rot**2)
|
|
# Calculate azimuth from center (North=0, East=90)
|
|
# atan2(dx, dy) gives angle relative to North axis
|
|
azimuth_rad = math.atan2(dx_m_rot, dy_m_rot)
|
|
azimuth_deg = math.degrees(azimuth_rad)
|
|
|
|
# Use geod.fwd from the known center lat/lon (radians needed for input?)
|
|
# pyproj fwd expects degrees for lon, lat, az
|
|
center_lon_deg = math.degrees(center_lon_rad)
|
|
center_lat_deg = math.degrees(center_lat_rad)
|
|
|
|
# Calculate the destination point
|
|
endlon, endlat, _ = self._geod.fwd(center_lon_deg, center_lat_deg, azimuth_deg, distance_m)
|
|
|
|
# Append (lon, lat) tuple in degrees
|
|
sar_corners_geo_deg.append((endlon, endlat))
|
|
logging.debug(f"{log_prefix} Calculated corner: Dist={distance_m:.1f}m, Az={azimuth_deg:.2f}deg -> Lon={endlon:.6f}, Lat={endlat:.6f}")
|
|
|
|
if len(sar_corners_geo_deg) != 4:
|
|
logging.error(f"{log_prefix} Failed to calculate all 4 corner coordinates.")
|
|
return None
|
|
|
|
logging.debug(f"{log_prefix} Successfully calculated 4 SAR corner geographic coordinates.")
|
|
return sar_corners_geo_deg
|
|
|
|
except KeyError as ke:
|
|
logging.error(f"{log_prefix} Missing required key in geo_info: {ke}")
|
|
return None
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error calculating SAR corner coordinates:")
|
|
return None
|
|
|
|
# --- NUOVA FUNZIONE HELPER (SCHELETRO/PLACEHOLDER) ---
|
|
def _geo_coords_to_map_pixels(
|
|
self,
|
|
coords_deg: List[Tuple[float, float]],
|
|
map_bounds: mercantile.LngLatBbox,
|
|
map_tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]],
|
|
zoom: int,
|
|
stitched_map_shape: Tuple[int, int] # (height, width)
|
|
) -> Optional[List[Tuple[int, int]]]:
|
|
"""
|
|
Converts a list of geographic coordinates (lon, lat degrees) to pixel
|
|
coordinates (x, y) relative to the top-left corner of the stitched map image.
|
|
|
|
Args:
|
|
coords_deg (List[Tuple[float, float]]): List of (longitude, latitude) tuples in degrees.
|
|
map_bounds (mercantile.LngLatBbox): Geographic bounds of the *top-left tile* used for stitching.
|
|
Used as the reference for pixel conversion.
|
|
map_tile_ranges (Tuple[Tuple[int, int], Tuple[int, int]]): ((min_x, max_x), (min_y, max_y)) tile indices.
|
|
zoom (int): The zoom level of the map tiles.
|
|
stitched_map_shape (Tuple[int, int]): The shape (height, width) of the stitched map image in pixels.
|
|
|
|
Returns:
|
|
Optional[List[Tuple[int, int]]]: List of (x, y) pixel coordinates corresponding
|
|
to the input geographic coordinates, relative to the
|
|
top-left of the stitched map image. Returns None on error.
|
|
"""
|
|
log_prefix = f"{self._log_prefix} Geo to Pixel"
|
|
logging.debug(f"{log_prefix} Converting {len(coords_deg)} geo coordinates to map pixels...")
|
|
|
|
if mercantile is None:
|
|
logging.error(f"{log_prefix} Mercantile library not available.")
|
|
return None
|
|
if not stitched_map_shape or stitched_map_shape[0] <= 0 or stitched_map_shape[1] <= 0:
|
|
logging.error(f"{log_prefix} Invalid stitched map shape: {stitched_map_shape}")
|
|
return None
|
|
|
|
pixel_coords = []
|
|
map_height_px, map_width_px = stitched_map_shape
|
|
# Tile size from config or service? Assume 256 for mercantile functions
|
|
tile_size = self._map_service.tile_size if self._map_service else 256
|
|
|
|
try:
|
|
# Get the coordinates of the top-left corner of the entire stitched map in the world pixel space (at the given zoom)
|
|
# This is the top-left corner of the top-left tile (min_x, min_y)
|
|
min_tile_x = map_tile_ranges[0][0]
|
|
min_tile_y = map_tile_ranges[1][0]
|
|
# mercantile.xy_bounds(tile) gives bounds in projected meters, not pixels
|
|
# We need the pixel coordinates using mercantile.xy() perhaps?
|
|
|
|
# Let's try converting each geographic point to its world pixel coordinate at the given zoom
|
|
# and then find its position relative to the top-left corner of our stitched map area.
|
|
|
|
# Calculate the world pixel coordinate (at zoom level) of the top-left corner of our stitched map area
|
|
# This corresponds to the top-left of tile (min_tile_x, min_tile_y)
|
|
tl_tile_bounds = mercantile.xy_bounds(min_tile_x, min_tile_y, zoom)
|
|
# mercantile.xy() converts lon/lat to projected meters (Web Mercator)
|
|
# We need a function to convert lon/lat directly to *tile pixel coordinates* or *world pixel coordinates*
|
|
# mercantile doesn't seem to offer this directly. We might need to implement the math:
|
|
# https://developers.google.com/maps/documentation/javascript/examples/map-coordinates
|
|
|
|
# --- Alternative Approach using mercantile.xy and relating to tile bounds ---
|
|
# 1. Find the projected meter coordinates (Web Mercator) of the top-left corner of the stitched area.
|
|
tl_tile_mercator_bounds = mercantile.xy_bounds(min_tile_x, min_tile_y, zoom)
|
|
map_origin_x_mercator = tl_tile_mercator_bounds.left
|
|
map_origin_y_mercator = tl_tile_mercator_bounds.top # Top has higher Y in Mercator
|
|
|
|
# 2. Calculate the total span of the stitched map in Mercator meters
|
|
max_tile_x = map_tile_ranges[0][1]
|
|
max_tile_y = map_tile_ranges[1][1]
|
|
br_tile_mercator_bounds = mercantile.xy_bounds(max_tile_x, max_tile_y, zoom)
|
|
map_total_width_mercator = br_tile_mercator_bounds.right - map_origin_x_mercator
|
|
map_total_height_mercator = map_origin_y_mercator - br_tile_mercator_bounds.bottom # Top Y > Bottom Y
|
|
|
|
if map_total_width_mercator <= 0 or map_total_height_mercator <=0:
|
|
logging.error(f"{log_prefix} Invalid map span in Mercator coordinates calculated.")
|
|
return None
|
|
|
|
# 3. For each input geographic coordinate:
|
|
for lon, lat in coords_deg:
|
|
# a. Convert geo coord to Mercator meters
|
|
point_x_mercator, point_y_mercator = mercantile.xy(lon, lat)
|
|
|
|
# b. Calculate the coordinate relative to the map's top-left origin in Mercator meters
|
|
relative_x_mercator = point_x_mercator - map_origin_x_mercator
|
|
relative_y_mercator = map_origin_y_mercator - point_y_mercator # Invert Y difference
|
|
|
|
# c. Scale the relative Mercator coordinates to pixel coordinates based on the total map span and pixel dimensions
|
|
pixel_x = int(round((relative_x_mercator / map_total_width_mercator) * map_width_px))
|
|
pixel_y = int(round((relative_y_mercator / map_total_height_mercator) * map_height_px))
|
|
|
|
# Clamp pixel coordinates to be within the stitched map bounds
|
|
pixel_x_clamped = max(0, min(pixel_x, map_width_px - 1))
|
|
pixel_y_clamped = max(0, min(pixel_y, map_height_px - 1))
|
|
|
|
if pixel_x != pixel_x_clamped or pixel_y != pixel_y_clamped:
|
|
logging.warning(f"{log_prefix} Clamped pixel coords for ({lon:.4f},{lat:.4f}): ({pixel_x},{pixel_y}) -> ({pixel_x_clamped},{pixel_y_clamped})")
|
|
|
|
pixel_coords.append((pixel_x_clamped, pixel_y_clamped))
|
|
logging.debug(f"{log_prefix} Converted ({lon:.4f},{lat:.4f}) -> MercatorRel({relative_x_mercator:.1f},{relative_y_mercator:.1f}) -> Pixel({pixel_x_clamped},{pixel_y_clamped})")
|
|
|
|
logging.debug(f"{log_prefix} Successfully converted {len(pixel_coords)} coordinates to map pixels.")
|
|
return pixel_coords
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error converting geo coordinates to map pixels:")
|
|
return None
|
|
|
|
def _draw_scale_bar(self, image_pil: Image.Image, latitude_deg: float, zoom: int) -> Image.Image:
|
|
"""Draws a simple scale bar onto the map image."""
|
|
log_prefix = f"{self._log_prefix} ScaleBar"
|
|
if image_pil is None: return None # Non fare nulla se l'immagine non c'è
|
|
|
|
try:
|
|
# 1. Calculate meters/pixel
|
|
meters_per_pixel = calculate_meters_per_pixel(latitude_deg, zoom)
|
|
if meters_per_pixel is None or meters_per_pixel <= 0:
|
|
logging.warning(f"{log_prefix} Invalid meters_per_pixel ({meters_per_pixel}). Cannot draw scale bar.")
|
|
return image_pil # Restituisci l'immagine originale
|
|
|
|
# 2. Choose scale distance based on meters/pixel or image width
|
|
# Obiettivo: barra lunga ~100-150 pixel
|
|
img_w, img_h = image_pil.size
|
|
target_bar_px = max(50, min(150, img_w // 4)) # Lunghezza desiderata in pixel (adattiva)
|
|
|
|
# Trova una distanza "tonda" (1, 2, 5, 10, 20, 50, 100... km)
|
|
# che corrisponda a circa target_bar_px
|
|
possible_distances_km = [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000]
|
|
best_dist_km = 1
|
|
min_diff = float('inf')
|
|
|
|
for dist_km in possible_distances_km:
|
|
dist_m = dist_km * 1000.0
|
|
pixels = dist_m / meters_per_pixel
|
|
diff = abs(pixels - target_bar_px)
|
|
if diff < min_diff and pixels > 10: # Assicura una barra minima
|
|
min_diff = diff
|
|
best_dist_km = dist_km
|
|
|
|
scale_distance_km = best_dist_km
|
|
scale_distance_meters = scale_distance_km * 1000.0
|
|
scale_bar_pixels = int(round(scale_distance_meters / meters_per_pixel))
|
|
|
|
if scale_bar_pixels < 10: # Troppo piccola per essere utile
|
|
logging.warning(f"{log_prefix} Calculated scale bar length too small ({scale_bar_pixels}px). Skipping draw.")
|
|
return image_pil
|
|
|
|
# 3. Prepare for drawing (Convert to OpenCV BGR)
|
|
map_cv = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
|
|
h, w = map_cv.shape[:2]
|
|
|
|
# 4. Define drawing parameters
|
|
bar_x_start = 15
|
|
bar_y = h - 20 # Posiziona in basso
|
|
bar_thickness = 2
|
|
text_offset_y = -5
|
|
font = cv2.FONT_HERSHEY_SIMPLEX
|
|
font_scale = 0.5
|
|
font_thickness = 1
|
|
color = (0, 0, 0) # Nero
|
|
|
|
# 5. Draw the scale bar line
|
|
cv2.line(map_cv, (bar_x_start, bar_y), (bar_x_start + scale_bar_pixels, bar_y), color, bar_thickness)
|
|
# Draw small ticks at the ends
|
|
cv2.line(map_cv, (bar_x_start, bar_y - 3), (bar_x_start, bar_y + 3), color, bar_thickness)
|
|
cv2.line(map_cv, (bar_x_start + scale_bar_pixels, bar_y - 3), (bar_x_start + scale_bar_pixels, bar_y + 3), color, bar_thickness)
|
|
|
|
|
|
# 6. Draw the text label
|
|
label = f"{scale_distance_km} km" if scale_distance_km >= 1 else f"{int(scale_distance_meters)} m"
|
|
text_size, _ = cv2.getTextSize(label, font, font_scale, font_thickness)
|
|
text_x = bar_x_start + (scale_bar_pixels // 2) - (text_size[0] // 2)
|
|
text_y = bar_y + text_offset_y - 5 # Posiziona sopra la barra
|
|
cv2.putText(map_cv, label, (text_x, text_y), font, font_scale, color, font_thickness, cv2.LINE_AA)
|
|
|
|
# 7. Convert back to PIL RGB
|
|
image_pil_with_scale = Image.fromarray(cv2.cvtColor(map_cv, cv2.COLOR_BGR2RGB))
|
|
logging.debug(f"{log_prefix} Scale bar drawn ({label}, {scale_bar_pixels}px).")
|
|
return image_pil_with_scale
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error drawing scale bar:")
|
|
return image_pil # Restituisci l'originale in caso di errore
|
|
|
|
|
|
# --- END OF FILE map_integration.py --- |