SXXXXXXX_GeoElevation/geoelevation/map_viewer/geo_map_viewer.py
2025-05-13 14:18:48 +02:00

467 lines
24 KiB
Python

# geoelevation/map_viewer/geo_map_viewer.py
"""
Orchestrates map display functionalities for the GeoElevation application.
This module initializes and manages map services, tile fetching/caching,
and the map display window. It handles requests to show maps centered on
specific points or covering defined areas, applying a specified display scale.
It also processes user interactions (mouse clicks) on the map, converting
pixel coordinates to geographic coordinates, fetching elevation for those points
using the core ElevationManager, and sending this information back to the
main GUI via a queue.
"""
# Standard library imports
import logging
import math
import queue # For type hinting, actual queue object is passed in
from typing import Optional, Tuple, Dict, Any, List # Added List
# Third-party imports
try:
from PIL import Image
ImageType = Image.Image # type: ignore
PIL_IMAGE_LIB_AVAILABLE = True
except ImportError:
Image = None # type: ignore
ImageType = None # type: ignore
PIL_IMAGE_LIB_AVAILABLE = False
logging.error("GeoMapViewer: Pillow (PIL) library not found. Image operations will fail.")
try:
import cv2 # OpenCV for drawing operations
import numpy as np
CV2_NUMPY_LIBS_AVAILABLE = True
except ImportError:
cv2 = None # type: ignore
np = None # type: ignore
CV2_NUMPY_LIBS_AVAILABLE = False
logging.error("GeoMapViewer: OpenCV or NumPy not found. Drawing and image operations will fail.")
# Local application/package imports
# Imports from other modules within the 'map_viewer' subpackage
from .map_services import BaseMapService
from .map_services import OpenStreetMapService # Default service if none specified
from .map_manager import MapTileManager
from .map_utils import get_bounding_box_from_center_size
from .map_utils import get_tile_ranges_for_bbox
from .map_utils import MapCalculationError
# from .map_utils import calculate_meters_per_pixel # If scale bar is drawn here
# Imports from the parent 'geoelevation' package
from geoelevation.elevation_manager import ElevationManager
# Module-level logger
# Named after this module so log records can be filtered per component.
logger = logging.getLogger(__name__)
# Default configuration values specific to the map viewer's operation
# Passed to MapTileManager as its cache root directory.
DEFAULT_MAP_TILE_CACHE_DIRECTORY = "map_tile_cache_ge" # Specific cache for map tiles
# Zoom level used by the display methods when the caller passes no explicit zoom.
DEFAULT_MAP_DISPLAY_ZOOM_LEVEL = 15
# Side length of the area shown around a point; display_map_for_point fetches
# 1.2x this size so the point has some surrounding context.
DEFAULT_MAP_VIEW_AREA_SIZE_KM = 5.0 # Default size of map area around a point in km
class GeoElevationMapViewer:
    """
    Manages the display of maps (e.g., OpenStreetMap) and user interaction
    for the GeoElevation application. This class is intended to be run in
    a separate process.

    The viewer keeps the most recently stitched (unscaled) map image together
    with its geographic bounds, render zoom and pixel shape. Mouse clicks
    reported by the display window are converted to geographic coordinates,
    an elevation is requested from the ElevationManager, and the result is
    put on the GUI queue as a ``map_click_data`` dictionary. After a click
    the map is redrawn with the primary overlay (point marker or area
    outline) plus a marker at the clicked position.
    """

    def __init__(
        self,
        elevation_manager_instance: ElevationManager,
        gui_output_communication_queue: queue.Queue,  # For sending data back to GUI
        initial_display_scale: float = 1.0  # Scale factor for the map image
    ) -> None:
        """
        Initializes the GeoElevationMapViewer.

        Args:
            elevation_manager_instance: An instance of ElevationManager to fetch elevations.
            gui_output_communication_queue: A queue to send interaction data back to the main GUI.
            initial_display_scale: The initial scaling factor for displaying the map image.
                Example: 0.5 means 50% of original stitched map size.

        Raises:
            ImportError: If Pillow, OpenCV or NumPy could not be imported, or if a
                required map component cannot be imported during initialization.
            Exception: Any other error raised while the internal components are
                created is logged and re-raised.
        """
        logger.info("Initializing GeoElevationMapViewer...")
        if not (CV2_NUMPY_LIBS_AVAILABLE and PIL_IMAGE_LIB_AVAILABLE):
            # Logged during import, but critical for class functionality
            raise ImportError("Pillow, OpenCV, or NumPy not available. GeoElevationMapViewer cannot function.")

        self.elevation_manager: ElevationManager = elevation_manager_instance
        self.gui_com_queue: queue.Queue = gui_output_communication_queue
        self.current_display_scale_factor: float = initial_display_scale

        self.map_service_provider: Optional[BaseMapService] = None
        self.map_tile_fetch_manager: Optional[MapTileManager] = None
        # Quoted forward reference: MapDisplayWindow is imported lazily.
        self.map_display_window_controller: Optional['MapDisplayWindow'] = None

        # State attributes for the currently displayed map context
        self._current_stitched_map_pil: Optional[ImageType] = None
        # Stitched map with its primary overlay (point marker / area outline);
        # used as the base when the map is redrawn after a click.
        self._current_map_with_overlay_pil: Optional[ImageType] = None
        self._current_map_geo_bounds_deg: Optional[Tuple[float, float, float, float]] = None
        self._current_map_render_zoom: Optional[int] = None
        # Pixel shape of the *stitched* map before display scaling
        self._current_stitched_map_pixel_shape: Optional[Tuple[int, int]] = None  # Height, Width
        # Same attribute name that the display and click methods read and write,
        # so it always exists even before the first click.
        self._last_user_click_pixel_coords_on_displayed_image: Optional[Tuple[int, int]] = None

        self._initialize_map_viewer_components()
        logger.info("GeoElevationMapViewer initialization complete.")

    def _initialize_map_viewer_components(self) -> None:
        """
        Initializes internal components like map service, tile manager, and display window.

        Raises:
            ImportError: If a required map component cannot be imported.
            Exception: Any other initialization failure (logged, then re-raised).
        """
        logger.debug("Initializing internal map viewer components...")
        try:
            # Import MapDisplayWindow here to manage potential import complexities
            # and ensure it's only imported when GeoElevationMapViewer is instantiated.
            from .map_display import MapDisplayWindow

            # Default to OpenStreetMap service
            self.map_service_provider = OpenStreetMapService()
            if not self.map_service_provider:  # Should not happen if constructor is fine
                raise ValueError("Failed to initialize OpenStreetMapService.")
            logger.info(f"Map service provider '{self.map_service_provider.name}' initialized.")

            self.map_tile_fetch_manager = MapTileManager(
                map_service=self.map_service_provider,
                cache_root_directory=DEFAULT_MAP_TILE_CACHE_DIRECTORY,
                enable_online_tile_fetching=True  # Assume online capability
            )
            logger.info("MapTileManager initialized.")

            # The MapDisplayWindow will use 'self' (this GeoElevationMapViewer instance)
            # as its 'app_facade' to call back 'handle_map_mouse_click'.
            # It will also access 'self.current_display_scale_factor' for scaling.
            self.map_display_window_controller = MapDisplayWindow(
                app_facade=self,
                window_name="GeoElevation - Interactive Map"
            )
            logger.info("MapDisplayWindow controller initialized.")
        except ImportError as e_imp_comp:
            logger.critical(f"Failed to import a required map component: {e_imp_comp}", exc_info=True)
            raise  # Propagate error to signal fatal initialization failure
        except Exception as e_init_comp:
            logger.critical(f"Failed to initialize map components: {e_init_comp}", exc_info=True)
            raise

    def _show_placeholder_map(self) -> None:
        """Asks the display window (if any) to show its placeholder instead of a map."""
        if self.map_display_window_controller:
            self.map_display_window_controller.show_map(None)

    def _stitch_and_store_map(
        self,
        geographic_bbox: Tuple[float, float, float, float],
        effective_zoom: int,
        display_kind: str
    ) -> Optional[ImageType]:
        """
        Fetches and stitches the tiles covering a bounding box and records the map context.

        Args:
            geographic_bbox: Area to cover, passed to get_tile_ranges_for_bbox.
            effective_zoom: Zoom level at which tiles are requested.
            display_kind: "point" or "area"; only used in log/error messages.

        Returns:
            The stitched PIL image, or None if stitching produced no image
            (in which case the previously stored map context is left untouched).

        Raises:
            MapCalculationError: If no tile range could be computed.
        """
        map_tile_xy_ranges = get_tile_ranges_for_bbox(geographic_bbox, effective_zoom)
        if not map_tile_xy_ranges:
            raise MapCalculationError(f"Tile range calculation for {display_kind} display failed.")

        # Retrieve and stitch the map tiles into a single PIL image
        stitched_map_pil_image = self.map_tile_fetch_manager.stitch_map_image(  # type: ignore
            effective_zoom, map_tile_xy_ranges[0], map_tile_xy_ranges[1]  # x_range, y_range
        )
        if not stitched_map_pil_image:
            logger.error(f"Failed to stitch map image for {display_kind} display.")
            return None

        # Update internal state with details of the new map
        self._current_stitched_map_pil = stitched_map_pil_image
        # Any overlay image belongs to the previous map; drop it until the caller
        # registers the overlay for the new one.
        self._current_map_with_overlay_pil = None
        # Actual geographic bounds of the stitched map for accurate coordinate conversions
        self._current_map_geo_bounds_deg = self.map_tile_fetch_manager._get_bounds_for_tile_range(  # type: ignore
            effective_zoom, map_tile_xy_ranges
        )
        self._current_map_render_zoom = effective_zoom
        self._current_stitched_map_pixel_shape = (
            stitched_map_pil_image.height, stitched_map_pil_image.width
        )
        return stitched_map_pil_image

    def _present_primary_map(self, map_with_overlay_pil: ImageType) -> None:
        """Stores the overlay image as redraw base, shows it, and forgets the last click."""
        self._current_map_with_overlay_pil = map_with_overlay_pil
        # MapDisplayWindow will handle the scaling using self.current_display_scale_factor
        self.map_display_window_controller.show_map(map_with_overlay_pil)  # type: ignore
        self._last_user_click_pixel_coords_on_displayed_image = None  # Reset last click

    def display_map_for_point(
        self,
        center_latitude: float,
        center_longitude: float,
        target_map_zoom: Optional[int] = None
    ) -> None:
        """
        Displays a map centered on the given geographic point, scaled by current factor.

        Args:
            center_latitude: Latitude of the point to center on and mark.
            center_longitude: Longitude of the point to center on and mark.
            target_map_zoom: Zoom level to render; DEFAULT_MAP_DISPLAY_ZOOM_LEVEL when None.

        Errors are logged and result in the display window showing its placeholder;
        nothing is raised to the caller.
        """
        if not self.map_tile_fetch_manager or not self.map_display_window_controller:
            logger.error("Map components not ready. Cannot display map for point.")
            return

        effective_zoom = target_map_zoom if target_map_zoom is not None else DEFAULT_MAP_DISPLAY_ZOOM_LEVEL
        logger.info(
            f"Requesting map display for point: ({center_latitude:.5f}, {center_longitude:.5f}), "
            f"Zoom: {effective_zoom}, Scale: {self.current_display_scale_factor:.2f}"
        )
        try:
            # Fetch a slightly larger area (factor 1.2) than the target display size
            # to provide some context around the point.
            map_area_km_for_fetch = DEFAULT_MAP_VIEW_AREA_SIZE_KM * 1.2
            tile_fetch_geographic_bbox = get_bounding_box_from_center_size(
                center_latitude, center_longitude, map_area_km_for_fetch
            )
            if not tile_fetch_geographic_bbox:
                raise MapCalculationError("Bounding box calculation for point display failed.")

            stitched_map_pil_image = self._stitch_and_store_map(
                tile_fetch_geographic_bbox, effective_zoom, "point"
            )
            if stitched_map_pil_image is None:
                self.map_display_window_controller.show_map(None)  # Show placeholder
                return

            # Draw a marker at the specified center point (on a copy of the stitched map)
            map_with_center_marker_pil = self._draw_point_marker_on_map(
                stitched_map_pil_image.copy(), center_latitude, center_longitude
            )
            self._present_primary_map(map_with_center_marker_pil)
        except MapCalculationError as e_map_calc_pt:
            logger.error(f"Map calculation error during point display: {e_map_calc_pt}")
            self._show_placeholder_map()
        except Exception as e_disp_map_pt:
            logger.exception(f"Unexpected error displaying map for point: {e_disp_map_pt}")
            self._show_placeholder_map()

    def display_map_for_area(
        self,
        area_geographic_bbox: Tuple[float, float, float, float],  # west, south, east, north
        target_map_zoom: Optional[int] = None
    ) -> None:
        """
        Displays a map covering the specified geographic area, scaled by current factor.

        Args:
            area_geographic_bbox: (west, south, east, north) of the area to show and outline.
            target_map_zoom: Zoom level to render; DEFAULT_MAP_DISPLAY_ZOOM_LEVEL when None.

        Errors are logged and result in the display window showing its placeholder;
        nothing is raised to the caller.
        """
        if not self.map_tile_fetch_manager or not self.map_display_window_controller:
            logger.error("Map components not ready. Cannot display map for area.")
            return

        effective_zoom = target_map_zoom if target_map_zoom is not None else DEFAULT_MAP_DISPLAY_ZOOM_LEVEL
        logger.info(
            f"Requesting map display for area: BBox {area_geographic_bbox}, "
            f"Zoom: {effective_zoom}, Scale: {self.current_display_scale_factor:.2f}"
        )
        try:
            stitched_map_pil_image = self._stitch_and_store_map(
                area_geographic_bbox, effective_zoom, "area"
            )
            if stitched_map_pil_image is None:
                self.map_display_window_controller.show_map(None)
                return

            # Outline the requested area (on a copy of the stitched map)
            map_with_area_bbox_pil = self._draw_area_bounding_box_on_map(
                stitched_map_pil_image.copy(), area_geographic_bbox
            )
            self._present_primary_map(map_with_area_bbox_pil)
        except MapCalculationError as e_map_calc_area:
            logger.error(f"Map calculation error during area display: {e_map_calc_area}")
            self._show_placeholder_map()
        except Exception as e_disp_map_area:
            logger.exception(f"Unexpected error displaying map for area: {e_disp_map_area}")
            self._show_placeholder_map()

    def _can_perform_drawing_operations(self) -> bool:
        """Checks if conditions are met for drawing markers/boxes on the map."""
        return bool(
            self._current_map_geo_bounds_deg and
            self._current_map_render_zoom is not None and
            self._current_stitched_map_pixel_shape and  # Shape of the *unscaled* stitched map
            self.map_display_window_controller and  # The window handler must exist
            CV2_NUMPY_LIBS_AVAILABLE and PIL_IMAGE_LIB_AVAILABLE  # Core drawing libs
        )

    def _render_with_opencv(
        self,
        pil_image_to_draw_on: ImageType,
        draw_on_bgr_array,
        drawing_description: str
    ) -> ImageType:
        """
        Runs an OpenCV drawing callback on a BGR copy of a PIL image.

        Args:
            pil_image_to_draw_on: Source image; it is not modified.
            draw_on_bgr_array: Callable receiving the BGR NumPy array to draw on in place.
            drawing_description: Short label used in the error log message.

        Returns:
            A new RGB PIL image containing the drawing, or the original image if
            conversion or drawing raised an exception (the exception is logged).
        """
        try:
            # COLOR_RGB2BGR needs exactly three channels; normalise other modes first.
            rgb_image = pil_image_to_draw_on
            if rgb_image.mode != "RGB":
                rgb_image = rgb_image.convert("RGB")
            map_cv_bgr = cv2.cvtColor(np.array(rgb_image), cv2.COLOR_RGB2BGR)  # type: ignore
            draw_on_bgr_array(map_cv_bgr)
            return Image.fromarray(cv2.cvtColor(map_cv_bgr, cv2.COLOR_BGR2RGB))  # type: ignore
        except Exception as e_draw:
            logger.exception(f"Error drawing {drawing_description}: {e_draw}")
            return pil_image_to_draw_on  # Return original on drawing error

    def _draw_point_marker_on_map(
        self,
        pil_image_to_draw_on: ImageType,
        latitude_deg: float,
        longitude_deg: float
    ) -> ImageType:
        """
        Draws a marker for a single point on the provided PIL map image.

        Returns the image with a red cross at the point, or the input image
        unchanged when the drawing context is not ready, the point cannot be
        converted to a pixel position, or drawing fails.
        """
        if not self._can_perform_drawing_operations():
            logger.warning("Cannot draw point marker: drawing context or libraries not ready.")
            return pil_image_to_draw_on

        # Convert geographic coordinates to pixel coordinates on the *unscaled* stitched map
        pixel_coords_on_stitched_map = self.map_display_window_controller.geo_to_pixel_on_current_map(  # type: ignore
            latitude_deg, longitude_deg,
            self._current_map_geo_bounds_deg,  # type: ignore # Bounds of the unscaled map
            self._current_stitched_map_pixel_shape,  # type: ignore # Shape of the unscaled map
            self._current_map_render_zoom  # type: ignore # Zoom of the unscaled map
        )
        if pixel_coords_on_stitched_map and cv2 and np:
            px, py = pixel_coords_on_stitched_map
            logger.debug(f"Drawing point marker at pixel ({px},{py}) on stitched map.")

            def _draw_cross(map_cv_bgr) -> None:
                # Red (BGR 0,0,255) cross marker
                cv2.drawMarker(  # type: ignore
                    map_cv_bgr, (px, py), (0, 0, 255), cv2.MARKER_CROSS,
                    markerSize=20, thickness=2
                )

            return self._render_with_opencv(pil_image_to_draw_on, _draw_cross, "point marker")

        logger.warning("Failed to convert geo to pixel for point marker, or OpenCV/NumPy missing.")
        return pil_image_to_draw_on  # Return original if conversion failed

    def _draw_area_bounding_box_on_map(
        self,
        pil_image_to_draw_on: ImageType,
        area_bbox_deg: Tuple[float, float, float, float]  # west, south, east, north
    ) -> ImageType:
        """
        Draws the outline of the area's bounding box on the PIL map image.

        Returns the image with a closed blue outline through the four corners,
        or the input image unchanged when the drawing context is not ready,
        any corner cannot be converted to a pixel position, or drawing fails.
        """
        if not self._can_perform_drawing_operations():
            logger.warning("Cannot draw area BBox: drawing context or libraries not ready.")
            return pil_image_to_draw_on

        west, south, east, north = area_bbox_deg
        corners_geo_coords = [
            (west, north), (east, north), (east, south), (west, south)  # TL, TR, BR, BL
        ]
        pixel_corner_coords_list: List[Tuple[int, int]] = []
        for lon, lat in corners_geo_coords:
            px_coords = self.map_display_window_controller.geo_to_pixel_on_current_map(  # type: ignore
                lat, lon,
                self._current_map_geo_bounds_deg,  # type: ignore
                self._current_stitched_map_pixel_shape,  # type: ignore
                self._current_map_render_zoom  # type: ignore
            )
            if px_coords:
                pixel_corner_coords_list.append(px_coords)

        if len(pixel_corner_coords_list) == 4 and cv2 and np:
            logger.debug(f"Drawing area BBox with pixel corners: {pixel_corner_coords_list}")

            def _draw_outline(map_cv_bgr) -> None:
                pts_for_cv = np.array(pixel_corner_coords_list, np.int32).reshape((-1, 1, 2))  # type: ignore
                # Blue (BGR 255,0,0) closed polygon
                cv2.polylines(map_cv_bgr, [pts_for_cv], isClosed=True, color=(255, 0, 0), thickness=2)  # type: ignore

            return self._render_with_opencv(pil_image_to_draw_on, _draw_outline, "area BBox")

        logger.warning("Failed to convert all corners for area BBox, or OpenCV/NumPy missing.")
        return pil_image_to_draw_on

    @staticmethod
    def _unscale_and_clamp_pixel(
        displayed_px: int,
        displayed_py: int,
        display_scale_factor: float,
        stitched_shape_hw: Tuple[int, int]
    ) -> Optional[Tuple[int, int]]:
        """
        Maps a pixel on the scaled (displayed) image back onto the unscaled stitched image.

        Args:
            displayed_px: X coordinate on the displayed image.
            displayed_py: Y coordinate on the displayed image.
            display_scale_factor: Factor by which the stitched image was scaled for display.
            stitched_shape_hw: (height, width) of the unscaled stitched image.

        Returns:
            (x, y) clamped to the stitched image, or None when the scale factor is
            not a finite positive number (division would fail or be meaningless).
        """
        if not math.isfinite(display_scale_factor) or display_scale_factor <= 0:
            return None
        h_unscaled, w_unscaled = stitched_shape_hw
        unscaled_px = int(round(displayed_px / display_scale_factor))
        unscaled_py = int(round(displayed_py / display_scale_factor))
        return (
            max(0, min(unscaled_px, w_unscaled - 1)),
            max(0, min(unscaled_py, h_unscaled - 1)),
        )

    def _draw_user_click_marker_on_map(
        self,
        pil_image_to_draw_on: ImageType
    ) -> Optional[ImageType]:
        """
        Draws a marker at the last user-clicked pixel location on the map image.

        The stored click position refers to the SCALED image shown in the display
        window, while ``pil_image_to_draw_on`` is UNSCALED, so the position is
        un-scaled and clamped before drawing.

        Returns the image with a red cross at the click, or the input image
        unchanged when there is no click, the drawing context is not ready,
        the scale factor is invalid, or drawing fails.
        """
        last_click = self._last_user_click_pixel_coords_on_displayed_image
        if not last_click or \
           pil_image_to_draw_on is None or \
           not self._can_perform_drawing_operations():
            logger.debug("Conditions not met for drawing user click marker.")
            return pil_image_to_draw_on

        if not self._current_stitched_map_pixel_shape:
            logger.warning("Cannot accurately unscale click for marker: unscaled map shape unknown.")
            return pil_image_to_draw_on

        clicked_px_on_displayed, clicked_py_on_displayed = last_click
        unscaled_click = self._unscale_and_clamp_pixel(
            clicked_px_on_displayed, clicked_py_on_displayed,
            self.current_display_scale_factor,
            self._current_stitched_map_pixel_shape
        )
        if unscaled_click is None:
            logger.warning(
                f"Cannot draw user click marker: invalid display scale factor "
                f"{self.current_display_scale_factor!r}."
            )
            return pil_image_to_draw_on

        if cv2 and np:
            unscaled_click_px, unscaled_click_py = unscaled_click
            logger.debug(f"Drawing user click marker at unscaled pixel ({unscaled_click_px},{unscaled_click_py})")

            def _draw_cross(map_cv_bgr) -> None:
                # Red cross marker for user click
                cv2.drawMarker(  # type: ignore
                    map_cv_bgr, (unscaled_click_px, unscaled_click_py), (0, 0, 255),
                    cv2.MARKER_CROSS, markerSize=15, thickness=2
                )

            return self._render_with_opencv(pil_image_to_draw_on, _draw_cross, "user click marker")
        return pil_image_to_draw_on

    def _redraw_map_with_click_marker(self) -> None:
        """Shows the current map (with its primary overlay, if any) plus the latest click marker."""
        if not self.map_display_window_controller:
            return
        # Prefer the image that already carries the point marker / area outline so
        # that it does not vanish after the first click.
        redraw_base = self._current_map_with_overlay_pil
        if redraw_base is None:
            redraw_base = self._current_stitched_map_pil
        if redraw_base is None:
            return

        base_map_for_drawing = redraw_base.copy()
        map_with_new_marker = self._draw_user_click_marker_on_map(base_map_for_drawing)
        if map_with_new_marker is not None:
            self.map_display_window_controller.show_map(map_with_new_marker)
        else:  # Fallback if marker drawing fails
            self.map_display_window_controller.show_map(base_map_for_drawing)

    def handle_map_mouse_click(self, pixel_x_on_displayed: int, pixel_y_on_displayed: int) -> None:
        """
        Handles a mouse click event from MapDisplayWindow (coordinates are on scaled image).

        Converts pixel to geographic, fetches elevation, sends data to GUI queue,
        and redraws the map with the new click marker.

        Args:
            pixel_x_on_displayed: X coordinate of the click on the displayed (scaled) image.
            pixel_y_on_displayed: Y coordinate of the click on the displayed (scaled) image.
        """
        logger.debug(f"Map mouse click (on scaled img) received at pixel ({pixel_x_on_displayed}, {pixel_y_on_displayed})")
        self._last_user_click_pixel_coords_on_displayed_image = (pixel_x_on_displayed, pixel_y_on_displayed)

        if not self._can_perform_drawing_operations() or not self.map_display_window_controller:
            logger.warning("Cannot process map click: map context or display handler not fully loaded.")
            return

        # pixel_to_geo_on_current_map receives pixel coordinates relative to the
        # *displayed* (potentially scaled) image together with that image's shape,
        # while the map bounds and zoom always refer to the unscaled map data.
        geo_coords_clicked = self.map_display_window_controller.pixel_to_geo_on_current_map(  # type: ignore
            pixel_x_on_displayed, pixel_y_on_displayed,
            self._current_map_geo_bounds_deg,  # type: ignore # Geographic bounds of the full unscaled map
            self.map_display_window_controller._last_displayed_image_shape,  # Pixel shape of displayed (scaled) map
            self._current_map_render_zoom  # type: ignore # Zoom level of the unscaled map
        )

        elevation_value: Optional[float] = None
        elevation_display_text = "N/A"
        latitude_value: Optional[float] = None
        longitude_value: Optional[float] = None

        if geo_coords_clicked:
            latitude_value, longitude_value = geo_coords_clicked
            logger.info(f"Map click converted to Geo: Lat={latitude_value:.5f}, Lon={longitude_value:.5f}")
            elevation_value = self.elevation_manager.get_elevation(latitude_value, longitude_value)
            if elevation_value is None:
                elevation_display_text = "Unavailable"
            elif math.isnan(elevation_value):
                elevation_display_text = "NoData"
            else:
                elevation_display_text = f"{elevation_value:.2f} m"
            logger.info(f"Elevation at clicked geo point: {elevation_display_text}")
        else:
            logger.warning(f"Could not convert pixel click ({pixel_x_on_displayed}, {pixel_y_on_displayed}) to geo.")
            elevation_display_text = "Error: Click out of bounds?"

        try:
            click_data_to_gui = {
                "type": "map_click_data",
                "latitude": latitude_value,
                "longitude": longitude_value,
                "elevation_str": elevation_display_text,
                "elevation_val": elevation_value
            }
            self.gui_com_queue.put(click_data_to_gui)
            logger.debug(f"Sent map_click_data to GUI queue: {click_data_to_gui}")
        except Exception as e_queue_send:
            # Best effort: a queue failure must not prevent the map from being redrawn.
            logger.exception(f"Error putting click data onto GUI queue: {e_queue_send}")

        # MapDisplayWindow.show_map() will re-apply the current_display_scale_factor.
        self._redraw_map_with_click_marker()

    def shutdown(self) -> None:
        """Cleans up resources, particularly the map display window."""
        logger.info("Shutting down GeoElevationMapViewer and its display window.")
        if self.map_display_window_controller:
            self.map_display_window_controller.destroy_window()
        logger.info("GeoElevationMapViewer shutdown procedure complete.")