SXXXXXXX_ControlPanel/map_integration.py
2025-04-09 13:37:43 +02:00

789 lines
33 KiB
Python

# --- START OF FILE map_integration.py ---
# map_integration.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
Manages map integration functionalities, including service interaction,
tile fetching/caching, display window management, and overlay updates.
Acts as an intermediary between the main application and map-specific modules.
"""
# Standard library imports
import logging
import threading
import queue # For type hinting
import math
from typing import Optional, Dict, Any, Tuple, List
# Third-party imports
import numpy as np
try:
from PIL import Image
except ImportError:
Image = None # Handled in dependent modules, but check here too
try:
import mercantile
except ImportError:
mercantile = None
try:
import pyproj
except ImportError:
pyproj = None
import cv2 # Keep for drawing overlay
# Local application imports
import config
from app_state import AppState
from utils import put_queue
# Map specific modules that this manager orchestrates
from map_services import get_map_service, BaseMapService
from map_manager import MapTileManager
from map_utils import (
get_bounding_box_from_center_size,
get_tile_ranges_for_bbox,
MapCalculationError,
calculate_meters_per_pixel,
)
from map_display import MapDisplayWindow
# Forward declaration for type hinting App instance
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from app import App
class MapIntegrationManager:
"""Orchestrates map services, tile management, and display."""
def __init__(
    self,
    app_state: AppState,
    tkinter_queue: queue.Queue,
    app: "App",  # Accept App instance
    map_x: int,
    map_y: int,
):
    """
    Set up the map service, tile manager and display window, then launch
    the background thread that produces the first map image.

    Args:
        app_state (AppState): Reference to the shared application state.
        tkinter_queue (queue.Queue): Queue for sending display commands to the main thread.
        app (App): Main application instance; handed to MapDisplayWindow and
            used for status updates.
        map_x (int): Initial X position for the map window.
        map_y (int): Initial Y position for the map window.

    Raises:
        ImportError: If Pillow, mercantile or pyproj could not be imported.
        ValueError: If the configured map service cannot be loaded.
        pyproj.exceptions.CRSError: If the geodetic calculator fails to initialize.
    """
    self._log_prefix = "[MapIntegrationManager]"
    logging.debug(f"{self._log_prefix} Initializing...")

    self._app_state: AppState = app_state
    self._tkinter_queue: queue.Queue = tkinter_queue
    self._app: "App" = app

    # The optional imports at module level fall back to None; fail fast here.
    for module, label in (
        (Image, "Pillow"),
        (mercantile, "mercantile"),
        (pyproj, "pyproj"),
    ):
        if module is None:
            raise ImportError(f"{label} library not found")

    # Every component starts out unset so other methods can test for readiness.
    self._map_service: Optional[BaseMapService] = None
    self._map_tile_manager: Optional[MapTileManager] = None
    self._map_display_window: Optional[MapDisplayWindow] = None
    self._map_initial_display_thread: Optional[threading.Thread] = None
    self._geod: Optional[pyproj.Geod] = None

    def _discard_components() -> None:
        # Shared by both failure handlers: drop whatever was built so far.
        self._geod = None
        self._map_service = None
        self._map_tile_manager = None
        self._map_display_window = None

    try:
        # Geodetic calculator on the WGS84 ellipsoid.
        self._geod = pyproj.Geod(ellps="WGS84")
        logging.debug(f"{self._log_prefix} pyproj Geod object initialized (WGS84).")

        # 1. Map service selected through config (defaults to "osm").
        provider = getattr(config, "MAP_SERVICE_PROVIDER", "osm")
        provider_key = getattr(config, "MAP_API_KEY", None)
        self._map_service = get_map_service(provider, provider_key)
        if not self._map_service:
            raise ValueError(f"Failed to get map service '{provider}'.")
        logging.debug(
            f"{self._log_prefix} Map service '{self._map_service.name}' loaded."
        )

        # 2. Tile manager bound to that service.
        cache_dir = getattr(config, "MAP_CACHE_DIRECTORY", None)
        online_fetching = getattr(config, "ENABLE_ONLINE_MAP_FETCHING", None)
        self._map_tile_manager = MapTileManager(
            map_service=self._map_service,
            cache_base_dir=cache_dir,
            enable_online_fetching=online_fetching,
        )
        logging.debug(f"{self._log_prefix} MapTileManager created.")

        # 3. Display window manager; it receives the App instance.
        self._map_display_window = MapDisplayWindow(
            app=self._app,
            window_name="Map Overlay",
            x_pos=map_x,
            y_pos=map_y,
        )
        logging.debug(
            f"{self._log_prefix} MapDisplayWindow created at ({map_x},{map_y})."
        )

        # 4. Kick off the initial map computation on a daemon thread.
        logging.debug(f"{self._log_prefix} Starting initial map display thread...")
        self._app.set_status("Loading initial map...")
        loader = threading.Thread(
            target=self._display_initial_map_area_thread,
            name="InitialMapDisplayThread",
            daemon=True,
        )
        self._map_initial_display_thread = loader
        loader.start()
    except (ImportError, ValueError, pyproj.exceptions.CRSError) as init_err:
        logging.critical(f"{self._log_prefix} Initialization failed: {init_err}")
        _discard_components()
        raise
    except Exception:
        logging.exception(
            f"{self._log_prefix} Unexpected error during initialization:"
        )
        _discard_components()
        raise
def _display_initial_map_area_thread(self):
    """
    (Runs in background thread) Calculates initial map area and queues for display.

    When config.SAR_CENTER_LAT and config.SAR_CENTER_LON are both 0.0 no map
    is calculated: AppState.last_map_image_pil is cleared and
    ("SHOW_MAP", None) is queued instead.

    Otherwise the bounding box and tile ranges are derived from the config
    defaults, the tiles are stitched, a scale bar is drawn, and the result
    (or None on failure) is stored in AppState and queued as
    ("SHOW_MAP", image). Nothing is stored or queued once
    AppState.shutting_down is set.
    """
    log_prefix = f"{self._log_prefix} InitialMap"
    map_image_pil: Optional[Image.Image] = None

    # Check skip condition based on config defaults
    if config.SAR_CENTER_LAT == 0.0 and config.SAR_CENTER_LON == 0.0:
        logging.debug(
            f"{log_prefix} Initial map display skipped (config default 0,0). Waiting for valid GeoInfo."
        )
        self._app_state.last_map_image_pil = None  # Ensure state is cleared
        # Send None to potentially create placeholder window
        put_queue(self._tkinter_queue, ("SHOW_MAP", None), "tkinter", self._app)
        return

    # Proceed with calculation if default lat/lon are non-zero
    logging.debug(
        f"{log_prefix} Calculating initial map area based on non-zero config defaults..."
    )
    if not (self._map_tile_manager and self._map_display_window):
        logging.error(
            f"{log_prefix} Map components not initialized. Aborting thread."
        )
        self._app_state.last_map_image_pil = None
        put_queue(self._tkinter_queue, ("SHOW_MAP", None), "tkinter", self._app)
        return
    if self._app_state.shutting_down:
        logging.debug(f"{log_prefix} Shutdown detected. Aborting.")
        return

    try:
        zoom = config.DEFAULT_MAP_ZOOM_LEVEL
        logging.debug(f"{log_prefix} Using default zoom level: {zoom}")
        bbox = get_bounding_box_from_center_size(
            config.SAR_CENTER_LAT, config.SAR_CENTER_LON, config.SAR_IMAGE_SIZE_KM
        )
        if bbox is None:
            raise MapCalculationError("Failed to calculate initial bounding box.")
        tile_ranges = get_tile_ranges_for_bbox(bbox, zoom)
        if tile_ranges is None:
            raise MapCalculationError("Failed to calculate initial tile ranges.")

        # Stitching may be slow; re-check for shutdown on both sides of it.
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Shutdown detected before stitching.")
            return
        logging.debug(
            f"{log_prefix} Stitching initial map tiles (Zoom: {zoom}, X: {tile_ranges[0]}, Y: {tile_ranges[1]})..."
        )
        map_image_pil = self._map_tile_manager.stitch_map_image(
            zoom, tile_ranges[0], tile_ranges[1]
        )
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Shutdown detected after stitching.")
            return

        if map_image_pil:
            logging.debug(f"{log_prefix} Initial map area stitched successfully.")
            map_image_pil = self._draw_scale_bar(
                map_image_pil, config.SAR_CENTER_LAT, zoom
            )  # Add scale bar
        else:
            logging.error(f"{log_prefix} Failed to stitch initial map area.")
    except (ImportError, MapCalculationError) as e:
        logging.error(f"{log_prefix} Calculation error: {e}")
        map_image_pil = None
    except Exception:
        logging.exception(f"{log_prefix} Unexpected error calculating initial map:")
        map_image_pil = None
    finally:
        # --- Store final image in AppState and Queue ---
        # Also runs after the early returns above, hence the shutdown re-check.
        if not self._app_state.shutting_down:
            logging.debug(
                f"{log_prefix} Storing initial map in AppState before queueing."
            )
            # AppState keeps its own copy, separate from the queued image.
            self._app_state.last_map_image_pil = (
                map_image_pil.copy() if map_image_pil else None
            )
            logging.debug(
                f"{log_prefix} Queueing SHOW_MAP command for initial map."
            )
            put_queue(
                self._tkinter_queue,
                ("SHOW_MAP", map_image_pil),
                "tkinter",
                self._app,
            )
        logging.debug(f"{log_prefix} Initial map display thread finished.")
def update_map_overlay(
self, sar_normalized_uint8: np.ndarray, geo_info_radians: Dict[str, Any]
):
"""
Calculates the map overlay based on SAR data, draws footprint,
and queues the result for display. Stores the resulting PIL image
in AppState before queueing.
"""
log_prefix = f"{self._log_prefix} Map Update"
map_image_with_overlay: Optional[Image.Image] = None # Initialize
stitched_map_image: Optional[Image.Image] = None
# --- Prerequisite Checks ---
if self._app_state.shutting_down:
logging.debug(f"{log_prefix} Skipping: Shutdown.")
return
if self._app_state.test_mode_active:
logging.debug(f"{log_prefix} Skipping: Test Mode.")
return
if not (self._map_tile_manager and self._map_display_window and self._geod):
logging.warning(f"{log_prefix} Skipping: Map components not ready.")
return
if not geo_info_radians or not geo_info_radians.get("valid", False):
logging.warning(f"{log_prefix} Skipping: Invalid GeoInfo.")
return
if Image is None or mercantile is None or pyproj is None:
logging.error(f"{log_prefix} Skipping: Missing libraries.")
return
logging.debug(f"{log_prefix} Starting map overlay calculation...")
try:
# --- Calculate SAR Footprint Parameters ---
center_lat_deg = math.degrees(geo_info_radians.get("lat", 0.0))
center_lon_deg = math.degrees(geo_info_radians.get("lon", 0.0))
scale_x = geo_info_radians.get("scale_x", 0.0)
width_px = geo_info_radians.get("width_px", 0)
if scale_x > 0 and width_px > 0:
size_km = (scale_x * width_px) / 1000.0
else:
logging.error(
f"{log_prefix} Invalid scale/width in GeoInfo. Using fallback size."
)
size_km = config.SAR_IMAGE_SIZE_KM
zoom = config.DEFAULT_MAP_ZOOM_LEVEL
# --- Fetch and Stitch Base Map ---
logging.debug(
f"{log_prefix} Calculating fetch BBox (Center: {center_lat_deg:.4f},{center_lon_deg:.4f}, Size: {size_km*1.2:.1f}km)"
)
fetch_bbox = get_bounding_box_from_center_size(
center_lat_deg, center_lon_deg, size_km * 1.2
)
if fetch_bbox is None:
raise MapCalculationError("Tile BBox calculation failed.")
logging.debug(f"{log_prefix} Calculating tile ranges (Zoom {zoom})...")
tile_ranges = get_tile_ranges_for_bbox(fetch_bbox, zoom)
if tile_ranges is None:
raise MapCalculationError("Tile range calculation failed.")
if self._app_state.shutting_down:
logging.debug(f"{log_prefix} Shutdown before stitching.")
return
logging.debug(
f"{log_prefix} Stitching base map tiles (Zoom: {zoom}, X: {tile_ranges[0]}, Y: {tile_ranges[1]})..."
)
stitched_map_image = self._map_tile_manager.stitch_map_image(
zoom, tile_ranges[0], tile_ranges[1]
)
if stitched_map_image is None:
raise MapCalculationError("Failed to stitch base map image.")
logging.debug(
f"{log_prefix} Base map stitched successfully (PIL Size: {stitched_map_image.size})."
)
map_image_with_overlay = (
stitched_map_image.copy()
) # Start overlay with base map
if self._app_state.shutting_down:
logging.debug(f"{log_prefix} Shutdown after stitching.")
return
# --- Calculate and Draw SAR Bounding Box ---
logging.debug(
f"{log_prefix} Calculating SAR corner geographic coordinates..."
)
sar_corners_deg = self._calculate_sar_corners_geo(geo_info_radians)
if sar_corners_deg is None:
raise MapCalculationError("SAR corner geo calculation failed.")
logging.debug(
f"{log_prefix} Converting SAR corners to map pixel coordinates..."
)
top_left_tile = mercantile.Tile(
x=tile_ranges[0][0], y=tile_ranges[1][0], z=zoom
)
map_display_bounds = mercantile.bounds(top_left_tile)
sar_corners_pixels = self._geo_coords_to_map_pixels(
coords_deg=sar_corners_deg,
map_bounds=map_display_bounds,
map_tile_ranges=tile_ranges,
zoom=zoom,
stitched_map_shape=map_image_with_overlay.size[::-1],
)
if sar_corners_pixels is None:
raise MapCalculationError("SAR corner pixel conversion failed.")
logging.debug(f"{log_prefix} Drawing SAR bounding box polygon...")
try: # Draw polygon on the map copy
map_cv = cv2.cvtColor(
np.array(map_image_with_overlay), cv2.COLOR_RGB2BGR
)
pts = np.array(sar_corners_pixels, np.int32).reshape((-1, 1, 2))
cv2.polylines(
map_cv, [pts], isClosed=True, color=(0, 0, 255), thickness=2
) # Red outline
map_image_with_overlay = Image.fromarray(
cv2.cvtColor(map_cv, cv2.COLOR_BGR2RGB)
)
logging.debug(f"{log_prefix} SAR bounding box drawn.")
except Exception as draw_err:
logging.exception(f"{log_prefix} Error drawing SAR bounding box:")
map_image_with_overlay = stitched_map_image # Fallback to base map
# Draw scale bar using current latitude
current_center_lat_deg = math.degrees(geo_info_radians["lat"])
map_image_with_overlay = self._draw_scale_bar(
map_image_with_overlay, current_center_lat_deg, zoom
)
except MapCalculationError as e:
logging.error(f"{log_prefix} Map overlay calculation failed: {e}")
map_image_with_overlay = stitched_map_image # Use base map on error
except Exception as e:
logging.exception(
f"{log_prefix} Unexpected error during map overlay update:"
)
map_image_with_overlay = stitched_map_image # Use base map on error
finally:
# --- Store final image in AppState and Queue ---
if not self._app_state.shutting_down:
payload_type = type(map_image_with_overlay)
payload_size = getattr(map_image_with_overlay, "size", "N/A")
logging.debug(
f"{log_prefix} Storing final map overlay in AppState. Type: {payload_type}, Size: {payload_size}"
)
self._app_state.last_map_image_pil = (
map_image_with_overlay.copy() if map_image_with_overlay else None
) # Store copy
logging.debug(
f"{log_prefix} Queueing SHOW_MAP command for updated map overlay."
)
put_queue(
self._tkinter_queue,
("SHOW_MAP", map_image_with_overlay),
"tkinter",
self._app,
)
else:
logging.debug(f"{log_prefix} Skipping queue put due to shutdown.")
def display_map(self, map_image_pil: Optional[Image.Image]):
"""Instructs the MapDisplayWindow to show the provided map image."""
log_prefix = f"{self._log_prefix} Display"
if self._map_display_window:
logging.debug(f"{log_prefix} Calling MapDisplayWindow.show_map...")
try:
self._map_display_window.show_map(map_image_pil)
self._update_app_status_after_map_load(
map_image_pil is not None
) # Update status after display attempt
except Exception as e:
logging.exception(
f"{log_prefix} Error calling MapDisplayWindow.show_map:"
)
else:
logging.warning(f"{log_prefix} Map display window not available.")
def _update_app_status_after_map_load(self, success: bool):
"""Updates the main application status after the initial map load attempt."""
log_prefix = f"{self._log_prefix} Status Update"
try:
if hasattr(
self._app, "statusbar"
) and "Loading initial map" in self._app.statusbar.cget("text"):
status_msg = "Error Loading Map" # Default if failed
if success:
current_mode = self._app.state.test_mode_active
status_msg = (
"Ready (Test Mode)"
if current_mode
else (
"Ready (Local Mode)"
if config.USE_LOCAL_IMAGES
else (
f"Listening UDP {self._app.local_ip}:{self._app.local_port}"
if self._app.udp_socket
else "Error: No Socket"
)
)
)
logging.debug(
f"{log_prefix} Initial map load finished (Success: {success}). Setting App status to: '{status_msg}'"
)
self._app.set_status(status_msg)
except Exception as e:
logging.warning(
f"{log_prefix} Error checking/updating app status after map load: {e}"
)
def shutdown(self):
"""Cleans up map-related resources."""
log_prefix = f"{self._log_prefix} Shutdown"
logging.debug(f"{log_prefix} Shutting down map integration components...")
if self._map_display_window:
logging.debug(f"{log_prefix} Requesting MapDisplayWindow destroy...")
try:
self._map_display_window.destroy_window()
except Exception as e:
logging.exception(f"{log_prefix} Error destroying MapDisplayWindow:")
self._map_display_window = None
logging.debug(f"{log_prefix} Map integration shutdown complete.")
def _calculate_sar_corners_geo(
self, geo_info: Dict[str, Any]
) -> Optional[List[Tuple[float, float]]]:
"""Calculates the geographic coordinates (lon, lat degrees) of the four SAR image corners."""
log_prefix = f"{self._log_prefix} SAR Corners Geo"
logging.debug(f"{log_prefix} Calculating SAR corner geographic coordinates...")
if not self._geod:
logging.error(f"{log_prefix} Geodetic calculator not initialized.")
return None
try:
center_lat_rad = geo_info["lat"]
center_lon_rad = geo_info["lon"]
orient_rad = geo_info["orientation"]
ref_x = geo_info["ref_x"]
ref_y = geo_info["ref_y"]
scale_x = geo_info["scale_x"]
scale_y = geo_info["scale_y"]
width = geo_info["width_px"]
height = geo_info["height_px"]
# Invert orientation for calculation as per requirement analysis
calc_orient_rad = -orient_rad # Apply inversion for calculation
if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
logging.error(f"{log_prefix} Invalid scale/dimensions.")
return None
corners_pixel = [
(0 - ref_x, ref_y - 0),
(width - 1 - ref_x, ref_y - 0),
(width - 1 - ref_x, ref_y - (height - 1)),
(0 - ref_x, ref_y - (height - 1)),
]
corners_meters = [(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel]
corners_meters_rotated = []
if abs(calc_orient_rad) > 1e-6:
cos_o = math.cos(calc_orient_rad)
sin_o = math.sin(calc_orient_rad)
for dx_m, dy_m in corners_meters:
corners_meters_rotated.append(
(dx_m * cos_o - dy_m * sin_o, dx_m * sin_o + dy_m * cos_o)
)
logging.debug(
f"{log_prefix} Applied rotation ({math.degrees(calc_orient_rad):.2f} deg) to meter offsets."
)
else:
corners_meters_rotated = corners_meters
logging.debug(f"{log_prefix} Skipping rotation (angle near zero).")
sar_corners_geo_deg = []
center_lon_deg = math.degrees(center_lon_rad)
center_lat_deg = math.degrees(center_lat_rad)
for dx_m_rot, dy_m_rot in corners_meters_rotated:
distance_m = math.sqrt(dx_m_rot**2 + dy_m_rot**2)
azimuth_rad = math.atan2(dx_m_rot, dy_m_rot)
azimuth_deg = math.degrees(azimuth_rad)
endlon, endlat, _ = self._geod.fwd(
center_lon_deg, center_lat_deg, azimuth_deg, distance_m
)
sar_corners_geo_deg.append((endlon, endlat))
logging.debug(
f"{log_prefix} Corner: Dist={distance_m:.1f}m, Az={azimuth_deg:.2f}deg -> Lon={endlon:.6f}, Lat={endlat:.6f}"
)
if len(sar_corners_geo_deg) == 4:
logging.debug(
f"{log_prefix} Successfully calculated 4 SAR corner coordinates."
)
return sar_corners_geo_deg
else:
logging.error(f"{log_prefix} Failed to calculate all 4 corners.")
return None
except KeyError as ke:
logging.error(f"{log_prefix} Missing key in geo_info: {ke}")
return None
except Exception as e:
logging.exception(f"{log_prefix} Error calculating SAR corners:")
return None
def _geo_coords_to_map_pixels(
self,
coords_deg: List[Tuple[float, float]],
map_bounds: mercantile.LngLatBbox,
map_tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]],
zoom: int,
stitched_map_shape: Tuple[int, int],
) -> Optional[List[Tuple[int, int]]]:
"""Converts geographic coordinates (lon, lat degrees) to pixel coordinates relative to the stitched map image."""
log_prefix = f"{self._log_prefix} Geo to Pixel"
logging.debug(
f"{log_prefix} Converting {len(coords_deg)} geo coordinates to map pixels..."
)
if mercantile is None:
logging.error(f"{log_prefix} Mercantile library not available.")
return None
if (
not stitched_map_shape
or stitched_map_shape[0] <= 0
or stitched_map_shape[1] <= 0
):
logging.error(f"{log_prefix} Invalid map shape: {stitched_map_shape}")
return None
pixel_coords = []
map_height_px, map_width_px = stitched_map_shape
tile_size = self._map_service.tile_size if self._map_service else 256
try:
min_tile_x = map_tile_ranges[0][0]
min_tile_y = map_tile_ranges[1][0]
tl_tile_mercator_bounds = mercantile.xy_bounds(min_tile_x, min_tile_y, zoom)
map_origin_x_mercator = tl_tile_mercator_bounds.left
map_origin_y_mercator = tl_tile_mercator_bounds.top # Top has higher Y
max_tile_x = map_tile_ranges[0][1]
max_tile_y = map_tile_ranges[1][1]
br_tile_mercator_bounds = mercantile.xy_bounds(max_tile_x, max_tile_y, zoom)
map_total_width_mercator = (
br_tile_mercator_bounds.right - map_origin_x_mercator
)
map_total_height_mercator = (
map_origin_y_mercator - br_tile_mercator_bounds.bottom
) # Top Y > Bottom Y
if map_total_width_mercator <= 0 or map_total_height_mercator <= 0:
logging.error(f"{log_prefix} Invalid map span in Mercator.")
return None
for lon, lat in coords_deg:
point_x_mercator, point_y_mercator = mercantile.xy(lon, lat)
relative_x_mercator = point_x_mercator - map_origin_x_mercator
relative_y_mercator = (
map_origin_y_mercator - point_y_mercator
) # Invert Y difference
pixel_x = int(
round(
(relative_x_mercator / map_total_width_mercator) * map_width_px
)
)
pixel_y = int(
round(
(relative_y_mercator / map_total_height_mercator)
* map_height_px
)
)
pixel_x_clamped = max(0, min(pixel_x, map_width_px - 1))
pixel_y_clamped = max(0, min(pixel_y, map_height_px - 1))
if pixel_x != pixel_x_clamped or pixel_y != pixel_y_clamped:
logging.warning(
f"{log_prefix} Clamped pixel coords for ({lon:.4f},{lat:.4f})"
)
pixel_coords.append((pixel_x_clamped, pixel_y_clamped))
logging.debug(
f"{log_prefix} Converted ({lon:.4f},{lat:.4f}) -> Pixel({pixel_x_clamped},{pixel_y_clamped})"
)
logging.debug(
f"{log_prefix} Successfully converted {len(pixel_coords)} coordinates."
)
return pixel_coords
except Exception as e:
logging.exception(f"{log_prefix} Error converting geo to map pixels:")
return None
def _draw_scale_bar(
self, image_pil: Image.Image, latitude_deg: float, zoom: int
) -> Image.Image:
"""Draws a simple scale bar onto the map image."""
log_prefix = f"{self._log_prefix} ScaleBar"
if image_pil is None:
return None
try:
meters_per_pixel = calculate_meters_per_pixel(latitude_deg, zoom)
if meters_per_pixel is None or meters_per_pixel <= 0:
logging.warning(
f"{log_prefix} Invalid meters/pixel. Skipping scale bar."
)
return image_pil
img_w, img_h = image_pil.size
target_bar_px = max(50, min(150, img_w // 4))
possible_distances_km = [
0.1,
0.2,
0.5,
1,
2,
5,
10,
20,
50,
100,
200,
500,
1000,
]
best_dist_km = 1
min_diff = float("inf")
for dist_km in possible_distances_km:
pixels = (dist_km * 1000.0) / meters_per_pixel
diff = abs(pixels - target_bar_px)
if diff < min_diff and pixels > 10:
min_diff = diff
best_dist_km = dist_km
scale_distance_km = best_dist_km
scale_distance_meters = scale_distance_km * 1000.0
scale_bar_pixels = int(round(scale_distance_meters / meters_per_pixel))
if scale_bar_pixels < 10:
logging.warning(
f"{log_prefix} Scale bar too small ({scale_bar_pixels}px). Skipping."
)
return image_pil
map_cv = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
h, w = map_cv.shape[:2]
bar_x_start = 15
bar_y = h - 20
bar_thickness = 2
color = (0, 0, 0) # Black
cv2.line(
map_cv,
(bar_x_start, bar_y),
(bar_x_start + scale_bar_pixels, bar_y),
color,
bar_thickness,
)
cv2.line(
map_cv,
(bar_x_start, bar_y - 3),
(bar_x_start, bar_y + 3),
color,
bar_thickness,
)
cv2.line(
map_cv,
(bar_x_start + scale_bar_pixels, bar_y - 3),
(bar_x_start + scale_bar_pixels, bar_y + 3),
color,
bar_thickness,
)
label = (
f"{scale_distance_km} km"
if scale_distance_km >= 1
else f"{int(scale_distance_meters)} m"
)
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.5
font_thickness = 1
text_size, _ = cv2.getTextSize(label, font, font_scale, font_thickness)
text_x = bar_x_start + (scale_bar_pixels // 2) - (text_size[0] // 2)
text_y = bar_y - 10
cv2.putText(
map_cv,
label,
(text_x, text_y),
font,
font_scale,
color,
font_thickness,
cv2.LINE_AA,
)
image_pil_with_scale = Image.fromarray(
cv2.cvtColor(map_cv, cv2.COLOR_BGR2RGB)
)
logging.debug(
f"{log_prefix} Scale bar drawn ({label}, {scale_bar_pixels}px)."
)
return image_pil_with_scale
except Exception as e:
logging.exception(f"{log_prefix} Error drawing scale bar:")
return image_pil
# --- END OF FILE map_integration.py ---