fatto refactoring con submodulo map-manager

This commit is contained in:
VALLONGOL 2025-12-02 09:50:29 +01:00
parent e2f5fdd1d7
commit ebb2546d84
21 changed files with 250 additions and 5224 deletions

25
.gitignore vendored
View File

@ -1,14 +1,19 @@
.svn
map_elevation/
map_elevation/*
.hgt
.jpg
.png
elevation_cache/
__pycache__/
_build/
_dist/
_req_packages/
map_tile_cache/
map_tile_cache_ge/
geoelevation_dem_cache/
build/
elevation_cache/*
__pycache__/*
_build/*
_dist/*
_req_packages/*
map_tile_cache/*
map_tile_cache_ge/*
geoelevation_dem_cache/*
build/*
elevation_data_cache_gui_fallback_critical/*
elevation_data_cache/*
debug_map_cache/*
test_cache_dir/*
.pytest_cache/*

149
doc/map_manager.md Normal file
View File

@ -0,0 +1,149 @@
Ecco il piano operativo completo per il refactoring e la creazione del modulo `python-map-manager`. Questo documento è strutturato per essere inserito direttamente nella documentazione tecnica del progetto.
---
# Piano Operativo: Refactoring e Creazione Modulo `python-map-manager`
## 1. Obiettivo del Refactoring
L'obiettivo primario è disaccoppiare la logica di gestione, recupero e visualizzazione delle mappe geografiche (attualmente integrata nell'applicazione `geoelevation`) per creare un componente software autonomo, riutilizzabile e manutenibile separatamente.
Questo nuovo componente sarà gestito come **Git Submodule** e denominato **`python-map-manager`**.
### Obiettivi Specifici
1. **Indipendenza**: Il modulo non deve avere dipendenze dalla logica di business dell'applicazione ospite (es. non deve conoscere `ElevationManager`).
2. **Modularità**: Separazione netta tra logica di elaborazione (`Engine`) e logica di visualizzazione (`Visualizer`).
3. **Testabilità**: Inclusione di un tool di debug integrato (`debug_tool.py`) per lo sviluppo e il test isolato delle funzionalità.
4. **Interfaccia Chiara**: Esposizione di API semplici per richiedere immagini di mappe basate su aree, punti o raggi.
---
## 2. Architettura del Nuovo Modulo
Il modulo `python-map-manager` esporrà due componenti principali:
### A. `MapEngine` (Logica Backend)
È il cervello del modulo. Non ha interfaccia grafica.
* **Responsabilità**:
* Gestione dei provider di mappe (es. OpenStreetMap).
* Gestione della cache su disco (download, salvataggio, recupero).
* Calcoli matematici (conversioni coordinate Geo <-> Pixel, calcolo Bounding Box).
* Stitching (unione) delle tile per formare un'unica immagine PIL.
* **Funzionalità Chiave**:
* `get_image_for_area(bbox, max_size)`: Restituisce un'immagine ottimizzata per coprire un'area.
* `get_image_for_point(lat, lon, zoom, size)`: Restituisce un'immagine centrata su un punto.
### B. `MapVisualizer` (Interfaccia Frontend - Opzionale)
È il componente di visualizzazione interattiva (basato su OpenCV).
* **Responsabilità**:
* Apertura e gestione della finestra grafica.
* Gestione dell'input utente (Mouse, Zoom, Pan).
* Rendering dell'immagine fornita dall'`Engine`.
* **Disaccoppiamento**:
* Invece di chiamare funzioni esterne, il Visualizer emette **Eventi** (tramite callback) quando l'utente interagisce (es. `on_map_click`, `on_area_selected`). L'applicazione ospite si sottoscrive a questi eventi.
---
## 3. Struttura del Repository `python-map-manager`
```text
python-map-manager/
├── map_manager/ # Package Python principale
│ ├── __init__.py # Espone MapEngine e MapVisualizer
│ ├── engine.py # Classe MapEngine (Facade logica)
│ ├── visualizer.py # Classe MapVisualizer (Gestione Window/OpenCV)
│ ├── tile_manager.py # Gestione download e cache (ex map_manager.py)
│ ├── services.py # Definizioni Provider Mappe (ex map_services.py)
│ ├── utils.py # Calcoli geografici puri (ex map_utils.py)
│ └── drawing.py # Funzioni di disegno su PIL (ex map_drawing.py)
├── debug_tool.py # Tool CLI/GUI per testare il modulo isolatamente
├── requirements.txt # Dipendenze (requests, Pillow, opencv-python, mercantile, pyproj)
└── README.md # Documentazione API
```
---
## 4. Fasi di Implementazione
### Fase 1: Setup dell'Ambiente e Migrazione File "Puri"
In questa fase si crea la struttura base e si migrano le librerie di utilità che non richiedono refactoring logico.
1. Creare la cartella `python-map-manager` e inizializzare git.
2. Creare la struttura cartelle `map_manager/`.
3. **Migrazione Diretta**:
* Copiare `geoelevation/map_viewer/map_services.py` -> `map_manager/services.py`.
* Copiare `geoelevation/map_viewer/map_utils.py` -> `map_manager/utils.py`.
* Copiare `geoelevation/map_viewer/map_drawing.py` -> `map_manager/drawing.py`.
* Copiare `geoelevation/map_viewer/map_manager.py` -> `map_manager/tile_manager.py` (Rinominato per chiarezza).
4. **Normalizzazione Import**: Aggiornare gli import interni ai file copiati per puntare ai nuovi percorsi relativi (es. `from .services import ...` invece di `from .map_services import ...`).
### Fase 2: Implementazione di `MapEngine` (`engine.py`)
In questa fase si astrae la logica di calcolo e recupero immagini.
1. Creare `map_manager/engine.py`.
2. Definire la classe `MapEngine`.
3. Implementare il metodo `__init__` per configurare il `MapTileManager` e la cache.
4. Implementare `get_image_for_area`:
* Deve accettare coordinate `(min_lat, min_lon, max_lat, max_lon)`.
* Deve usare `utils.py` per calcolare lo zoom ottimale in base alle dimensioni pixel richieste.
* Deve chiamare `tile_manager.stitch_map_image`.
5. Implementare `get_image_for_point`:
* Accetta centro e livello di zoom.
* Calcola il Bounding Box necessario usando `utils.py`.
* Richiede lo stitching.
### Fase 3: Implementazione di `MapVisualizer` (`visualizer.py`)
In questa fase si crea il gestore della finestra, rimuovendo ogni logica di business specifica di `geoelevation`.
1. Creare `map_manager/visualizer.py`.
2. Definire la classe `MapVisualizer` che accetta un'istanza di `MapEngine`.
3. Estrarre la logica OpenCV da `geo_map_viewer.py` e `map_display.py`.
4. Implementare il loop di gestione eventi mouse (`cv2.setMouseCallback`).
5. **Refactoring Eventi**:
* Definire una proprietà `callback_on_click` (funzione che accetta lat, lon).
* Quando avviene un click, usare `engine` o `utils` per convertire Pixel -> Lat/Lon.
* Invocare `self.callback_on_click(lat, lon)` invece di chiamare `elevation_manager`.
### Fase 4: Creazione del `debug_tool.py`
Uno strumento essenziale per garantire che il modulo funzioni "out of the box".
1. Creare `debug_tool.py` nella root del repository.
2. Lo script deve:
* Istanziare `MapEngine` (con una cache temporanea o di debug).
* Istanziare `MapVisualizer`.
* Definire una funzione dummy: `def on_click(lat, lon): print(f"Clicked: {lat}, {lon}")`.
* Collegare la funzione al visualizzatore.
* Avviare la mappa su coordinate di default (es. Roma).
3. Questo tool servirà per verificare lo zoom, il pan e la correttezza del download delle tile.
### Fase 5: Integrazione nell'Applicazione Principale
Una volta che il submodule è stabile e pushato sul repository remoto.
1. In `geoelevation`, rimuovere la cartella `map_viewer` esistente.
2. Aggiungere il submodule:
```bash
git submodule add -b master <URL_REPO> external/python-map-manager
```
3. Configurare i path in `geoelevation/__init__.py` (o file di setup path dedicato) per includere il submodule.
4. Modificare `geoelevation/process_targets.py` (o dove risiede il processo mappa):
* Importare `MapEngine` e `MapVisualizer` dal submodule.
* Nel processo dedicato alla mappa, definire la funzione di callback reale che interroga `ElevationManager`.
* Passare questa callback al `MapVisualizer`.
---
## 5. Specifiche Tecniche e Standard
* **PEP8**: Tutto il codice deve seguire rigorosamente lo standard PEP8.
* **Type Hinting**: Ogni funzione deve avere le annotazioni di tipo (`-> Optional[Image.Image]`, ecc.).
* **Docstrings**: Ogni classe e metodo pubblico deve avere docstring in Inglese.
* **Dipendenze**:
* Non usare `try-except ImportError` per nascondere dipendenze mancanti all'interno del modulo. Se `MapVisualizer` richiede OpenCV, l'import deve fallire esplicitamente se manca, o essere gestito a livello di `__init__.py` per esporre le funzionalità disponibili.
* Il file `requirements.txt` deve elencare le versioni minime testate.
## 6. Risultato Atteso
Al termine di questo processo, avremo:
1. Un repository `python-map-manager` autonomo.
2. La possibilità di sviluppare e migliorare la gestione mappe lanciando solo `python debug_tool.py`.
3. L'applicazione `geoelevation` più leggera, che delega tutta la complessità cartografica al modulo esterno, mantenendo solo la logica di "cosa fare quando l'utente clicca un punto" (ovvero chiedere l'elevazione).

View File

@ -38,6 +38,19 @@ if not library_logger.hasHandlers():
# --- Import Core Components and Configuration ---
# These imports make key classes and constants available and are used by
# the public API functions defined in this __init__.py.
## Ensure external `python-map-manager` package (workspace submodule) is importable
# This helps top-level modules (e.g. elevation_gui) import `map_manager` early.
try:
_pkg_root = os.path.dirname(__file__)
_candidate = os.path.normpath(os.path.join(_pkg_root, "..", "external", "python-map-manager"))
if os.path.isdir(_candidate) and _candidate not in sys.path:
# Prepend so it takes precedence over other installations
sys.path.insert(0, _candidate)
library_logger.debug(f"Added external python-map-manager to sys.path: {_candidate}")
except Exception:
# Non-fatal: if path manipulation fails, imports will raise as before and fallback handles it.
pass
try:
from .elevation_manager import ElevationManager
from .elevation_manager import RASTERIO_AVAILABLE # Critical dependency check

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -49,12 +49,12 @@ try:
# MODIFIED: Import the deg_to_dms_string utility function from map_utils.py
# WHY: Needed to convert coordinates to DMS format for the GUI display.
# HOW: Added the import statement from the map_viewer.map_utils sub-module.
from geoelevation.map_viewer.map_utils import deg_to_dms_string
# MODIFIED: Import the utility functions for geographic bounds calculation from map_utils.py.
from map_manager.utils import deg_to_dms_string
# MODIFIED: Import the utility functions for geographic bounds calculation from map_manager.utils.
# WHY: Needed to get geographic bounds for 2D plot aspect ratio correction.
# HOW: Added the import statements.
from geoelevation.map_viewer.map_utils import get_hgt_tile_geographic_bounds # For single tile browse
from geoelevation.map_viewer.map_utils import get_combined_geographic_bounds_from_tile_info_list # For area composite
from map_manager.utils import get_hgt_tile_geographic_bounds # For single tile browse
from map_manager.utils import get_combined_geographic_bounds_from_tile_info_list # For area composite
# MODIFIED: Import the multiprocessing target functions from the new module.
# WHY: These functions have been moved to their own module as part of refactoring.
# HOW: Added the import statement.

View File

@ -1,61 +0,0 @@
# geoelevation/map_viewer/__init__.py
"""
GeoElevation Map Viewer Subpackage.
This package provides the components necessary for displaying and interacting
with tiled web maps (e.g., OpenStreetMap) within the main GeoElevation application.
It is designed to run in a separate process to keep the main GUI responsive.
Key components include:
- GeoElevationMapViewer: Orchestrates the map display, data fetching for tiles,
user interaction (mouse clicks), and communication with
the main application GUI.
- MapDisplayWindow: Manages the OpenCV window used for rendering the map and
capturing mouse events.
- MapTileManager: Handles the logic for retrieving map tiles from a specified
map service, including caching and stitching tiles together.
- MapServices: Defines an interface (BaseMapService) and concrete implementations
(e.g., OpenStreetMapService) for different map tile providers.
- MapUtils: Contains utility functions for common geographic and map-tile
related calculations (e.g., bounding boxes, tile ranges).
"""
# To make the main orchestrator class easily importable from this subpackage,
# e.g., `from geoelevation.map_viewer import GeoElevationMapViewer`
# We perform a guarded import here. If critical dependencies for GeoElevationMapViewer
# (like OpenCV, Pillow, which are checked in geo_map_viewer.py itself) are missing,
# this import might fail. The main application (elevation_gui.py) has its own
# checks for MAP_VIEWER_SYSTEM_AVAILABLE to handle this gracefully.
try:
from .geo_map_viewer import GeoElevationMapViewer
# You could also expose other key classes or factory functions if desired:
# from .map_services import OpenStreetMapService, get_map_service_instance
# from .map_utils import get_bounding_box_from_center_size
# Define what `from geoelevation.map_viewer import *` will import.
# It's generally good practice to be explicit.
__all__ = [
"GeoElevationMapViewer",
# "OpenStreetMapService",
# "get_map_service_instance",
# "get_bounding_box_from_center_size",
# Add other names here if you want them to be part of the public API
# of this sub-package when imported with '*'.
]
except ImportError as e_map_viewer_init_import:
# This might happen if, for example, OpenCV is not installed, and thus
# geo_map_viewer.py (or one of its imports like map_display.py) fails to load.
# The main application GUI (elevation_gui.py) has more robust checks.
import logging
# Use the name of this __init__.py file for the logger
logger_init = logging.getLogger(__name__) # Will be 'geoelevation.map_viewer'
logger_init.warning(
f"Could not import GeoElevationMapViewer or other components from the "
f"map_viewer subpackage, possibly due to missing dependencies (e.g., OpenCV, Pillow): {e_map_viewer_init_import}. "
f"Map functionality might be limited if this subpackage is used directly."
)
# If the core component fails to import, __all__ should reflect that
# nothing (or very little) is available.
__all__ = []

File diff suppressed because it is too large Load Diff

View File

@ -1,630 +0,0 @@
# geoelevation/map_viewer/map_display.py
"""
Manages the dedicated OpenCV window for displaying map tiles.
This module handles the creation and updating of an OpenCV window,
displaying map images. It applies a display scale factor (provided by its
app_facade) to the incoming map image before rendering. It also captures
mouse events within the map window and performs conversions between pixel
coordinates (on the displayed, scaled image) and geographic coordinates,
relying on the 'mercantile' library for Web Mercator projections.
"""
# Standard library imports
import logging
import sys # Import sys for logging stream
from typing import Optional, Tuple, Any # 'Any' for app_facade type hint
# Third-party imports
try:
from PIL import Image
ImageType = Image.Image # type: ignore
PIL_LIB_AVAILABLE_DISPLAY = True
except ImportError:
Image = None # type: ignore
ImageType = None # type: ignore
PIL_LIB_AVAILABLE_DISPLAY = False
# Logging might not be set up if this is imported very early by a child process
# So, direct print or rely on higher-level logger configuration.
print("ERROR: MapDisplay - Pillow (PIL) library not found. Image conversion from PIL might fail.")
try:
import cv2 # OpenCV for windowing and drawing
import numpy as np
CV2_NUMPY_LIBS_AVAILABLE_DISPLAY = True
except ImportError:
cv2 = None # type: ignore
np = None # type: ignore
CV2_NUMPY_LIBS_AVAILABLE_DISPLAY = False
print("ERROR: MapDisplay - OpenCV or NumPy not found. Drawing and image operations will fail.")
try:
import mercantile # For Web Mercator tile calculations and coordinate conversions
MERCANTILE_LIB_AVAILABLE_DISPLAY = True
except ImportError:
mercantile = None # type: ignore
MERCANTILE_LIB_AVAILABLE_DISPLAY = False
print("ERROR: MapDisplay - 'mercantile' library not found. Coordinate conversions will fail.")
# Module-level logger
logger = logging.getLogger(__name__) # Uses 'geoelevation.map_viewer.map_display'
# Default window properties (can be overridden or extended if needed)
DEFAULT_CV_WINDOW_X_POSITION = 150
DEFAULT_CV_WINDOW_Y_POSITION = 150
class MapDisplayWindow:
"""
Manages an OpenCV window for displaying map images and handling mouse interactions.
The displayed image is scaled according to a factor provided by its
app_facade.
"""
def __init__(
self,
app_facade: Any, # Instance of GeoElevationMapViewer (or similar providing scale and click handler)
window_name_str: str = "GeoElevation - Interactive Map",
initial_screen_x_pos: int = DEFAULT_CV_WINDOW_X_POSITION,
initial_screen_y_pos: int = DEFAULT_CV_WINDOW_Y_POSITION
) -> None:
"""
Initializes the MapDisplayWindow manager.
Args:
app_facade: An object that has a 'handle_map_click_event(event_type, x, y, flags)' method
and an attribute 'current_display_scale_factor'.
window_name_str: The name for the OpenCV window.
initial_screen_x_pos: Initial X screen position for the window.
initial_screen_y_pos: Initial Y screen position for the window.
"""
logger.info(f"Initializing MapDisplayWindow with name: '{window_name_str}'")
# MODIFIED: Added a check for critical dependencies at init.
# WHY: Ensure the class can function before proceeding.
# HOW: Raise ImportError if dependencies are missing.
if not CV2_NUMPY_LIBS_AVAILABLE_DISPLAY:
critical_msg = "OpenCV and/or NumPy are not available for MapDisplayWindow operation."
logger.critical(critical_msg)
raise ImportError(critical_msg)
# PIL is needed for image conversion, but not strictly for windowing itself,
# though show_map will likely fail without it for non-numpy inputs.
# mercantile is needed for pixel-geo conversions, not for windowing.
# We'll check those where they are strictly needed.
self.app_facade_handler: Any = app_facade # Facade to access scale and report clicks
self.opencv_window_name: str = window_name_str
self.window_start_x_position: int = initial_screen_x_pos
self.window_start_y_position: int = initial_screen_y_pos
self.is_opencv_window_initialized: bool = False
self.is_opencv_mouse_callback_set: bool = False
# Stores the shape (height, width) of the *scaled image actually displayed* by imshow
self._last_displayed_scaled_image_shape: Tuple[int, int] = (0, 0)
def show_map(self, map_pil_image_input: Optional[ImageType]) -> None:
"""
Displays the provided map image (PIL format) in the OpenCV window.
The image is first converted to BGR, then scaled using the factor
from `app_facade.current_display_scale_factor`, and then displayed.
The OpenCV window autosizes to the scaled image.
Args:
map_pil_image_input: The map image (PIL.Image) to display.
If None, a placeholder image is shown.
"""
if not CV2_NUMPY_LIBS_AVAILABLE_DISPLAY:
logger.error("Cannot show map: OpenCV/NumPy not available.")
return
bgr_image_unscaled: Optional[np.ndarray] # type: ignore
if map_pil_image_input is None:
logger.warning("Received None PIL image for display. Generating a placeholder.")
bgr_image_unscaled = self._create_placeholder_bgr_numpy_array()
# MODIFIED: Added more explicit check for PIL availability and instance type.
# WHY: Ensure PIL is available before attempting conversion from a PIL object.
# HOW: Included PIL_LIB_AVAILABLE_DISPLAY in the check.
elif PIL_LIB_AVAILABLE_DISPLAY and isinstance(map_pil_image_input, Image.Image): # type: ignore
logger.debug(
f"Converting input PIL Image (Size: {map_pil_image_input.size}, Mode: {map_pil_image_input.mode}) to BGR."
)
bgr_image_unscaled = self._convert_pil_image_to_bgr_numpy_array(map_pil_image_input)
else:
# This else branch handles cases where input is not None, not a PIL Image, or PIL is not available.
logger.error(
f"Received unexpected image type for display: {type(map_pil_image_input)}. Or Pillow is missing. Using placeholder."
)
bgr_image_unscaled = self._create_placeholder_bgr_numpy_array()
if bgr_image_unscaled is None: # Fallback if conversion or placeholder failed
logger.error("Failed to obtain a BGR image (unscaled) for display. Using minimal black square.")
# MODIFIED: Create a minimal black image using NumPy for robustness.
# WHY: Ensure a displayable image is created even if placeholder creation fails.
# HOW: Use np.zeros.
if np: # Ensure np is available
bgr_image_unscaled = np.zeros((100, 100, 3), dtype=np.uint8) # type: ignore
else:
logger.critical("NumPy not available, cannot even create fallback black image for imshow.")
return # Cannot proceed without NumPy
# --- Apply Display Scaling ---
scaled_bgr_image_for_display: np.ndarray = bgr_image_unscaled # type: ignore
try:
display_scale = 1.0 # Default scale if not found on facade
# MODIFIED: Added check that app_facade_handler is not None before accessing its attribute.
# WHY: Avoids AttributeError if facade is unexpectedly None.
# HOW: Check 'if self.app_facade_handler and hasattr(...)'.
if self.app_facade_handler and hasattr(self.app_facade_handler, 'current_display_scale_factor'):
# MODIFIED: Added try-except around float conversion of scale factor.
# WHY: Defend against non-numeric scale factor values.
# HOW: Use a try-except block.
try:
display_scale = float(self.app_facade_handler.current_display_scale_factor)
except (ValueError, TypeError) as e_scale_conv:
logger.warning(f"Could not convert scale factor from facade to float: {self.app_facade_handler.current_display_scale_factor}. Using 1.0. Error: {e_scale_conv}")
display_scale = 1.0
if display_scale <= 0: # Prevent invalid scale
logger.warning(f"Invalid scale factor {display_scale} from facade. Using 1.0.")
display_scale = 1.0
else:
logger.warning("Display scale factor not found on app_facade. Defaulting to 1.0.")
unscaled_h, unscaled_w = bgr_image_unscaled.shape[:2]
logger.debug(
f"Unscaled BGR image size: {unscaled_w}x{unscaled_h}. Applying display scale: {display_scale:.3f}"
)
# Only resize if scale is not 1.0 (with tolerance) and image dimensions are valid
if abs(display_scale - 1.0) > 1e-6 and unscaled_w > 0 and unscaled_h > 0:
target_w = max(1, int(round(unscaled_w * display_scale)))
target_h = max(1, int(round(unscaled_h * display_scale)))
interpolation_method = cv2.INTER_LINEAR if display_scale > 1.0 else cv2.INTER_AREA # type: ignore
logger.debug(f"Resizing image from {unscaled_w}x{unscaled_h} to {target_w}x{target_h} using {interpolation_method}.")
scaled_bgr_image_for_display = cv2.resize( # type: ignore
bgr_image_unscaled, (target_w, target_h), interpolation=interpolation_method
)
# else: no scaling needed, scaled_bgr_image_for_display remains bgr_image_unscaled
except Exception as e_scaling_img:
logger.exception(f"Error during image scaling: {e_scaling_img}. Displaying unscaled image.")
scaled_bgr_image_for_display = bgr_image_unscaled # Fallback to unscaled
# --- End Display Scaling ---
current_disp_h, current_disp_w = scaled_bgr_image_for_display.shape[:2]
logger.debug(f"Final scaled image shape for display: {current_disp_w}x{current_disp_h}")
# Recreate OpenCV window if its initialized state suggests it's needed,
# or if the size of the (scaled) image to be displayed has changed.
# Only recreate if window is initialized and its size changed from the last displayed size.
# We also add a check if the window exists.
window_exists = False
try:
# Check if the window property can be retrieved without error
if CV2_NUMPY_LIBS_AVAILABLE_DISPLAY and cv2.getWindowProperty(self.opencv_window_name, cv2.WND_PROP_AUTOSIZE) >= 0: # type: ignore # Any property check works
window_exists = True
except cv2.error: # type: ignore
window_exists = False # Error means window is gone
if self.is_opencv_window_initialized and window_exists and \
(current_disp_h, current_disp_w) != self._last_displayed_scaled_image_shape:
logger.info(
f"Scaled image size changed ({self._last_displayed_scaled_image_shape} -> "
f"{(current_disp_h, current_disp_w)}). Recreating OpenCV window."
)
try:
cv2.destroyWindow(self.opencv_window_name) # type: ignore
cv2.waitKey(5) # Allow OS to process window destruction
self.is_opencv_window_initialized = False
self.is_opencv_mouse_callback_set = False # Callback must be reset
self._last_displayed_scaled_image_shape = (0, 0) # Reset stored size
except cv2.error as e_cv_destroy: # type: ignore
logger.warning(f"Error destroying OpenCV window before recreation: {e_cv_destroy}")
self.is_opencv_window_initialized = False # Force recreation attempt
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
# Ensure OpenCV window exists and mouse callback is properly set
# Create window if not initialized or if it was destroyed unexpectedly
if not self.is_opencv_window_initialized or not window_exists:
logger.debug(f"Creating/moving OpenCV window: '{self.opencv_window_name}' (AUTOSIZE)")
try:
cv2.namedWindow(self.opencv_window_name, cv2.WINDOW_AUTOSIZE) # type: ignore
# Try moving the window. This might fail on some systems or if window creation is delayed.
try:
cv2.moveWindow(self.opencv_window_name, self.window_start_x_position, self.window_start_y_position) # type: ignore
except cv2.error as e_cv_move: # type: ignore
logger.warning(f"Could not move OpenCV window '{self.opencv_window_name}': {e_cv_move}")
self.is_opencv_window_initialized = True # Assume created even if move failed
logger.info(f"OpenCV window '{self.opencv_window_name}' (AUTOSIZE) is ready.")
except Exception as e_window_create:
logger.error(f"Failed to create OpenCV window '{self.opencv_window_name}': {e_window_create}")
self.is_opencv_window_initialized = False # Mark as not initialized
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
# Cannot proceed to imshow or set mouse callback if window creation failed.
return
# Set mouse callback if the window is initialized and callback hasn't been set
if self.is_opencv_window_initialized and not self.is_opencv_mouse_callback_set:
# MODIFIED: Added check that app_facade_handler is not None before setting callback param.
# WHY: Avoids passing None as param, although OpenCV might handle it, it's safer.
# HOW: Check 'if self.app_facade_handler:'.
if self.app_facade_handler:
try:
# MODIFIED: Set the mouse callback to _opencv_mouse_callback, passing the facade as param.
# WHY: This callback will receive all mouse events (including right click) and flags.
# HOW: Changed the function name and ensured the facade is passed as param.
cv2.setMouseCallback(self.opencv_window_name, self._opencv_mouse_callback, param=self.app_facade_handler) # type: ignore
self.is_opencv_mouse_callback_set = True
logger.info(f"Mouse callback successfully set for '{self.opencv_window_name}'.")
except cv2.error as e_cv_callback: # type: ignore
logger.error(f"Failed to set mouse callback for '{self.opencv_window_name}': {e_cv_callback}")
self.is_opencv_mouse_callback_set = False # Mark as failed to set
else:
logger.warning("App facade is None, cannot set mouse callback parameter.")
self.is_opencv_mouse_callback_set = False
# Display the final (scaled) image if the window is initialized
if self.is_opencv_window_initialized:
try:
cv2.imshow(self.opencv_window_name, scaled_bgr_image_for_display) # type: ignore
# Store the shape of the image that was actually displayed (scaled)
self._last_displayed_scaled_image_shape = (current_disp_h, current_disp_w)
# cv2.waitKey(1) is important for OpenCV to process GUI events.
# The main event loop for this window is expected to be handled by the
# calling process (e.g., the run_map_viewer_process_target loop).
except cv2.error as e_cv_imshow: # type: ignore
# Catch specific OpenCV errors that indicate the window is gone
error_str = str(e_cv_imshow).lower()
if "null window" in error_str or "invalid window" in error_str or "checkview" in error_str:
logger.warning(f"OpenCV window '{self.opencv_window_name}' seems closed or invalid during imshow operation.")
# Reset state flags as the window is gone
self.is_opencv_window_initialized = False
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
else: # Re-raise other OpenCV errors if not related to window state
logger.exception(f"OpenCV error during map display (imshow): {e_cv_imshow}")
except Exception as e_disp_final:
logger.exception(f"Unexpected error displaying final map image: {e_disp_final}")
else:
logger.error("Cannot display image: OpenCV window is not initialized.")
# MODIFIED: Modified mouse callback signature to receive event_type and flags.
# WHY: To detect different mouse events (left/right click) and modifier key states (Shift).
# HOW: Added event_type and flags parameters.
def _opencv_mouse_callback(self, event_type: int, x_coord: int, y_coord: int, flags: int, app_facade_param: Any) -> None:
"""
Internal OpenCV mouse callback function.
Invoked by OpenCV when a mouse event occurs in the managed window.
Clamps coordinates and calls the app_facade's handler method with all event details.
'app_facade_param' is expected to be the GeoElevationMapViewer instance.
This callback runs in the OpenCV internal thread.
"""
# MODIFIED: Process event only if it's a button press event (left, right, middle).
# WHY: Ignore mouse movements and other non-click events for simplicity.
# HOW: Check event_type against button press constants.
if event_type in [cv2.EVENT_LBUTTONDOWN, cv2.EVENT_RBUTTONDOWN, cv2.EVENT_MBUTTONDOWN]: # type: ignore
logger.debug(f"OpenCV Mouse Event: {event_type} at raw pixel ({x_coord},{y_coord}), flags: {flags}")
# Get the dimensions of the image currently displayed (which is the scaled image)
current_displayed_height, current_displayed_width = self._last_displayed_scaled_image_shape
if current_displayed_width <= 0 or current_displayed_height <= 0:
logger.warning("Mouse click on map, but no valid displayed image dimensions are stored.")
return # Cannot process click without knowing the displayed image size
# Clamp clicked x, y coordinates to be within the bounds of the displayed (scaled) image
# This is important because the click coordinates can sometimes be slightly outside the window bounds,
# or the image size might momentarily not match the window size.
x_coord_clamped = max(0, min(x_coord, current_displayed_width - 1))
y_coord_clamped = max(0, min(y_coord, current_displayed_height - 1))
logger.debug(
f"Map Window Click (OpenCV raw): ({x_coord},{y_coord}), "
f"Clamped to displayed image ({current_displayed_width}x{current_displayed_height}): "
f"({x_coord_clamped},{y_coord_clamped})"
)
# The app_facade_param should be the GeoElevationMapViewer instance.
# We call its handler method, passing the clamped pixel coordinates on the *displayed* image
# along with the event type and flags.
if app_facade_param and hasattr(app_facade_param, 'handle_map_click_event'):
try:
# MODIFIED: Call the designated handler method on the GeoElevationMapViewer instance,
# passing all relevant event details.
# WHY: GeoElevationMapViewer needs event type and flags to handle different actions (left click, right click, etc.).
# HOW: Changed method name and arguments passed.
app_facade_param.handle_map_click_event(event_type, x_coord_clamped, y_coord_clamped, flags)
logger.debug("Called facade's handle_map_click_event.")
except Exception as e_handle_click:
logger.exception(f"Error executing handle_map_click_event on app facade: {e_handle_click}")
else:
logger.error(
"app_facade_param not correctly passed to OpenCV mouse callback, or it lacks "
"the 'handle_map_click_event' method."
)
def pixel_to_geo_on_current_map(
self,
pixel_x_on_displayed: int, # X coordinate on the (potentially scaled) displayed image
pixel_y_on_displayed: int, # Y coordinate on the (potentially scaled) displayed image
current_map_geo_bounds: Tuple[float, float, float, float], # west, south, east, north of UN SCALED map
displayed_map_pixel_shape: Tuple[int, int], # height, width of SCALED image shown
current_map_native_zoom: int # Zoom level of the UN SCALED map data
) -> Optional[Tuple[float, float]]: # Returns (latitude, longitude)
"""
Converts pixel coordinates from the (potentially scaled) displayed map image
to geographic WGS84 coordinates (latitude, longitude).
This method is called by GeoElevationMapViewer.handle_map_mouse_click.
It uses the stored context of the original, unscaled map (`current_map_geo_bounds`, `current_map_native_zoom`)
and the shape of the image *actually displayed* (`displayed_map_pixel_shape`) to perform the conversion.
"""
if not MERCANTILE_LIB_AVAILABLE_DISPLAY:
logger.error("mercantile library not available for pixel_to_geo conversion.")
return None
if not (current_map_geo_bounds and displayed_map_pixel_shape and current_map_native_zoom is not None):
# This warning indicates the context needed for conversion wasn't properly stored/passed.
logger.warning("Cannot convert pixel to geo: Current map context for conversion is incomplete.")
return None
# Dimensions of the image *as it is displayed* (after scaling)
displayed_height, displayed_width = displayed_map_pixel_shape
# Geographic bounds of the *original, unscaled* map tile data
map_west_lon, map_south_lat, map_east_lon, map_north_lat = current_map_geo_bounds
# Basic validation of dimensions
if displayed_width <= 0 or displayed_height <= 0:
logger.error("Cannot convert pixel to geo: Invalid displayed map dimensions.")
return None
try:
# Use mercantile to get Web Mercator coordinates of the unscaled map's corners
map_ul_merc_x, map_ul_merc_y = mercantile.xy(map_west_lon, map_north_lat) # type: ignore
map_lr_merc_x, map_lr_merc_y = mercantile.xy(map_east_lon, map_south_lat) # type: ignore
total_map_width_merc = abs(map_lr_merc_x - map_ul_merc_x)
total_map_height_merc = abs(map_ul_merc_y - map_lr_merc_y)
# Handle zero dimensions in Mercator space (e.g., invalid geo bounds)
if total_map_width_merc <= 0 or total_map_height_merc <= 0:
logger.error("Cannot convert pixel to geo: Invalid Mercator dimensions for map bounds.")
return None
# Calculate relative position of the click within the *displayed (scaled)* map (0.0 to 1.0)
# Ensure we don't divide by zero if dimensions are unexpectedly zero
relative_x_on_displayed_map = pixel_x_on_displayed / displayed_width if displayed_width > 0 else 0.0
relative_y_on_displayed_map = pixel_y_on_displayed / displayed_height if displayed_height > 0 else 0.0
# Use these relative positions to find the corresponding Web Mercator coordinate
# within the *unscaled* map's Mercator extent.
# Y Mercator increases towards North, pixel Y increases downwards. So use map_ul_merc_y - ...
clicked_point_merc_x = map_ul_merc_x + (relative_x_on_displayed_map * total_map_width_merc)
clicked_point_merc_y = map_ul_merc_y - (relative_y_on_displayed_map * total_map_height_merc)
# Convert Mercator coordinates back to geographic (longitude, latitude)
clicked_lon, clicked_lat = mercantile.lnglat(clicked_point_merc_x, clicked_point_merc_y) # type: ignore
# Return as (latitude, longitude) tuple
return (clicked_lat, clicked_lon)
except Exception as e_px_to_geo:
logger.exception(f"Error during pixel_to_geo conversion for pixel ({pixel_x_on_displayed},{pixel_y_on_displayed}): {e_px_to_geo}")
return None
def geo_to_pixel_on_current_map(
self,
latitude_deg: float,
longitude_deg: float,
current_map_geo_bounds: Tuple[float, float, float, float], # west, south, east, north of UN SCALED map
displayed_map_pixel_shape: Tuple[int, int], # height, width of SCALED image shown
current_map_native_zoom: int # Zoom level of the UN SCALED map data
) -> Optional[Tuple[int, int]]: # Returns (pixel_x, pixel_y) on the SCALED image
"""
Converts geographic WGS84 coordinates to pixel coordinates (x, y)
on the currently displayed (potentially scaled) map image.
This method might be called by the app_facade (GeoElevationMapViewer)
to determine where to draw a marker on the *displayed* image, although
the current drawing implementation in GeoElevationMapViewer draws on the
*unscaled* image and relies on its own direct geo-to-pixel logic for the unscaled image.
This method is kept here for completeness and potential future use if
drawing logic were moved to this class or needed scaled coordinates.
"""
if not MERCANTILE_LIB_AVAILABLE_DISPLAY:
logger.error("mercantile library not available for geo_to_pixel conversion.")
return None
if not (current_map_geo_bounds and displayed_map_pixel_shape and current_map_native_zoom is not None):
# This warning indicates the context needed for conversion wasn't properly stored/passed.
logger.warning("Cannot convert geo to pixel: Current map context for conversion is incomplete.")
return None
# Dimensions of the image *as it is displayed* (after scaling)
displayed_height, displayed_width = displayed_map_pixel_shape
# Geographic bounds of the *original, unscaled* map tile data
map_west_lon, map_south_lat, map_east_lon, map_north_lat = current_map_geo_bounds
# Basic validation of dimensions
if displayed_width <= 0 or displayed_height <= 0:
logger.error("Cannot convert geo to pixel: Invalid displayed map dimensions.")
return None
try:
# Use mercantile to get Web Mercator coordinates of the unscaled map's corners
map_ul_merc_x, map_ul_merc_y = mercantile.xy(map_west_lon, map_north_lat) # type: ignore
map_lr_merc_x, map_lr_merc_y = mercantile.xy(map_east_lon, map_south_lat) # type: ignore
total_map_width_merc = abs(map_lr_merc_x - map_ul_merc_x)
total_map_height_merc = abs(map_ul_merc_y - map_lr_merc_y)
# Handle zero dimensions in Mercator space (e.g., invalid geo bounds)
if total_map_width_merc <= 0 or total_map_height_merc <= 0:
logger.error("Cannot convert geo to pixel: Invalid Mercator dimensions for map bounds.")
return None
# Get Web Mercator coordinates of the target geographic point
target_merc_x, target_merc_y = mercantile.xy(longitude_deg, latitude_deg) # type: ignore
# Calculate relative position of the target geo point within the *unscaled* map's Mercator extent.
# Ensure we don't divide by zero.
relative_merc_x_in_map = (target_merc_x - map_ul_merc_x) / total_map_width_merc if total_map_width_merc > 0 else 0.0
relative_merc_y_in_map = (map_ul_merc_y - target_merc_y) / total_map_height_merc if total_map_height_merc > 0 else 0.0 # Y Mercator increases North, pixel Y downwards
# Convert these relative positions to pixel coordinates on the *displayed (scaled)* image
pixel_x_on_displayed = int(round(relative_merc_x_in_map * displayed_width))
pixel_y_on_displayed = int(round(relative_merc_y_in_map * displayed_height))
# Clamp to the boundaries of the displayed (scaled) image
px_clamped = max(0, min(pixel_x_on_displayed, displayed_width - 1))
py_clamped = max(0, min(pixel_y_on_displayed, displayed_height - 1))
return (px_clamped, py_clamped)
except Exception as e_geo_to_px:
logger.exception(f"Error during geo_to_pixel conversion for geo ({latitude_deg:.5f},{longitude_deg:.5f}): {e_geo_to_px}")
return None
def _create_placeholder_bgr_numpy_array(self) -> np.ndarray: # type: ignore
"""Creates a simple BGR NumPy array to use as a placeholder image."""
placeholder_h = 256 # Default placeholder dimensions
placeholder_w = 256
bgr_light_grey = (220, 220, 220) # BGR color for light grey
# MODIFIED: Added check for NumPy availability before creation.
# WHY: Defend against scenarios where NumPy is None despite initial check (unlikely but safe).
# HOW: Check 'if np:'.
if np: # type: ignore
try:
return np.full((placeholder_h, placeholder_w, 3), bgr_light_grey, dtype=np.uint8) # type: ignore
except Exception as e_np_full:
logger.exception(f"Error creating NumPy full array for placeholder: {e_np_full}. Using zeros fallback.")
# Fallback to zeros array if full() fails
return np.zeros((100, 100, 3), dtype=np.uint8) # type: ignore # Minimal array
else: # Fallback if NumPy somehow became None (should not happen if CV2_NUMPY_AVAILABLE is true)
# This case is highly unlikely if __init__ guard passed.
logger.critical("NumPy became unavailable unexpectedly during placeholder creation.")
# Cannot create a NumPy array, return None which might cause further errors in imshow.
# This indicates a severe issue.
return None # type: ignore
def _convert_pil_image_to_bgr_numpy_array(self, pil_image: ImageType) -> Optional[np.ndarray]: # type: ignore
"""
Converts a PIL Image object to a NumPy BGR array for OpenCV display.
Handles different PIL modes (RGB, RGBA, L/Grayscale).
"""
# MODIFIED: Added check for PIL and CV2/NumPy availability.
# WHY: Ensure dependencies are present before attempting conversion.
# HOW: Added checks.
if not (PIL_LIB_AVAILABLE_DISPLAY and CV2_NUMPY_LIBS_AVAILABLE_DISPLAY and pil_image):
logger.error("Cannot convert PIL to BGR: Pillow, OpenCV/NumPy missing, or input image is None.")
return None
try:
# Convert PIL image to NumPy array. This retains the number of channels.
numpy_image_array = np.array(pil_image) # type: ignore
# Convert based on the number of channels (shape[2]) or dimension (ndim)
if numpy_image_array.ndim == 2: # Grayscale or L mode PIL image
logger.debug("Converting grayscale/L PIL image to BGR NumPy array.")
return cv2.cvtColor(numpy_image_array, cv2.COLOR_GRAY2BGR) # type: ignore
elif numpy_image_array.ndim == 3:
if numpy_image_array.shape[2] == 3: # RGB image
logger.debug("Converting RGB PIL image to BGR NumPy array.")
return cv2.cvtColor(numpy_image_array, cv2.COLOR_RGB2BGR) # type: ignore
elif numpy_image_array.shape[2] == 4: # RGBA image (alpha channel will be stripped)
logger.debug("Converting RGBA PIL image to BGR NumPy array (stripping Alpha).")
return cv2.cvtColor(numpy_image_array, cv2.COLOR_RGBA2BGR) # type: ignore
else:
logger.warning(
f"Unsupported NumPy image shape after PIL conversion ({numpy_image_array.shape}). Cannot convert to BGR."
)
return None
else: # Unexpected number of dimensions
logger.warning(
f"Unexpected NumPy image dimensions ({numpy_image_array.ndim}) after PIL conversion. Cannot convert to BGR."
)
return None
except Exception as e_conv_pil_bgr:
logger.exception(f"Error converting PIL image to BGR NumPy array: {e_conv_pil_bgr}")
return None
def is_window_alive(self) -> bool:
"""Checks if the OpenCV window is likely still open and initialized."""
# MODIFIED: Added check for CV2/NumPy availability.
# WHY: Prevent errors if dependencies are gone.
# HOW: Added initial check.
if not self.is_opencv_window_initialized or not CV2_NUMPY_LIBS_AVAILABLE_DISPLAY:
return False # Not initialized or OpenCV gone
try:
# WND_PROP_VISIBLE returns >= 1.0 if window is visible, 0.0 if hidden/occluded,
# and < 0 (typically -1.0) if window does not exist.
# Check for any property to see if the window handle is still valid.
# getWindowProperty returns -1 if the window does not exist.
window_property_value = cv2.getWindowProperty(self.opencv_window_name, cv2.WND_PROP_AUTOSIZE) # type: ignore
# A value of -1.0 indicates the window does not exist.
if window_property_value >= 0.0: # Window exists
logger.debug(f"Window '{self.opencv_window_name}' property check >= 0.0 ({window_property_value}). Assuming alive.")
# We can also check for visibility specifically if needed:
# visibility = cv2.getWindowProperty(self.opencv_window_name, cv2.WND_PROP_VISIBLE)
# if visibility >= 1.0: return True else False
return True # Window exists and is likely alive
else: # Window likely closed or an issue occurred (property < 0)
logger.debug(
f"Window '{self.opencv_window_name}' property check < 0.0 ({window_property_value}). "
"Assuming it's closed or invalid for interaction."
)
self.is_opencv_window_initialized = False # Update internal state
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
return False
except cv2.error: # type: ignore
# OpenCV error (e.g., window name invalid/destroyed).
# This happens if the window was destroyed by user action or other means.
logger.debug(f"OpenCV error when checking property for window '{self.opencv_window_name}'. Assuming closed.")
self.is_opencv_window_initialized = False
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
return False
except Exception as e_unexpected_alive_check:
logger.exception(f"Unexpected error checking if window '{self.opencv_window_name}' is alive: {e_unexpected_alive_check}")
self.is_opencv_window_initialized = False # Assume not alive on any other error
self.is_opencv_mouse_callback_set = False
return False
def destroy_window(self) -> None:
"""Explicitly destroys the managed OpenCV window and resets state flags."""
logger.info(f"Attempting to destroy OpenCV window: '{self.opencv_window_name}'")
# MODIFIED: Added check for CV2/NumPy availability before destroying.
# WHY: Prevent errors if dependencies are gone.
# HOW: Added initial check.
if self.is_opencv_window_initialized and CV2_NUMPY_LIBS_AVAILABLE_DISPLAY:
try:
cv2.destroyWindow(self.opencv_window_name) # type: ignore
# It is important to call cv2.waitKey() after destroyWindow to process the event queue
# and ensure the window is actually closed by the OS. A small delay helps.
cv2.waitKey(5) # Give OpenCV a moment to process the destruction
logger.info(f"Window '{self.opencv_window_name}' explicitly destroyed.")
except cv2.error as e_cv_destroy_final: # type: ignore
logger.warning(
f"Ignoring OpenCV error during explicit destroyWindow '{self.opencv_window_name}' "
f"(window might have been already closed by user): {e_cv_destroy_final}"
)
except Exception as e_destroy_final_generic:
logger.exception(
f"Unexpected error during explicit destroy window '{self.opencv_window_name}': {e_destroy_final_generic}"
)
else:
logger.debug(
f"Window '{self.opencv_window_name}' was not marked as initialized or OpenCV is not available. "
"No explicit destroy action taken."
)
# Always reset flags after attempting destroy to ensure clean state regardless of outcome.
self.is_opencv_window_initialized = False
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)

View File

@ -1,689 +0,0 @@
# geoelevation/map_viewer/map_drawing.py
"""
Utility functions for drawing overlays on map images (PIL).
Handles conversions between geographic coordinates and pixel coordinates
on a stitched map image, and draws markers, boundaries, and labels.
"""
# Standard library imports
import logging
import math
from typing import Optional, Tuple, List, Dict, Any
# Third-party imports
try:
# MODIFIED: Ensure ImageFont is imported correctly from PIL.
# WHY: The ImageFont module is needed directly for font loading, not as a submodule of ImageDraw.
# HOW: Changed the import statement to include ImageFont at the top level.
from PIL import Image, ImageDraw, ImageFont # Import ImageFont for text size/position
PIL_LIB_AVAILABLE_DRAWING = True
except ImportError:
Image = None # type: ignore
ImageDraw = None # type: ignore # Define as None if import fails
# MODIFIED: Define dummy ImageFont as None if import fails.
# WHY: Ensures the ImageFont variable exists and is None if the import failed,
# allowing subsequent checks (e.g., `if ImageFont is None`) to work correctly.
# HOW: Added ImageFont = None.
ImageFont = None # type: ignore # Define as None if import fails
PIL_LIB_AVAILABLE_DRAWING = False
logging.error("MapDrawing: Pillow (PIL) library not found. Drawing operations will fail.")
try:
import cv2 # OpenCV for drawing markers (optional but used)
import numpy as np # Needed by cv2, also for calculations
CV2_NUMPY_LIBS_AVAILABLE_DRAWING = True
except ImportError:
cv2 = None # type: ignore
np = None # type: ignore
CV2_NUMPY_LIBS_AVAILABLE_DRAWING = False
logging.warning("MapDrawing: OpenCV or NumPy not found. Some drawing operations (markers) will be disabled.")
try:
import mercantile # For Web Mercator tile calculations and coordinate conversions
MERCANTILE_LIB_AVAILABLE_DRAWING = True
except ImportError:
mercantile = None # type: ignore
MERCANTILE_LIB_AVAILABLE_DRAWING = False
logging.error("MapDrawing: 'mercantile' library not found. Coordinate conversions will fail.")
# Local application/package imports
# Import constants and potentially shared font from image_processor for consistent style
try:
# MODIFIED: Import DEFAULT_FONT from image_processor directly.
# WHY: The font loading logic in image_processor is preferred.
# HOW: Imported DEFAULT_FONT.
from geoelevation.image_processor import TILE_TEXT_COLOR, TILE_TEXT_BG_COLOR, DEFAULT_FONT
# MODIFIED: Use the imported DEFAULT_FONT directly as the initial font source for labels.
# WHY: This font is loaded based on system availability in image_processor.
# HOW: Assigned DEFAULT_FONT to _DEFAULT_FONT_FOR_LABELS.
_DEFAULT_FONT_FOR_LABELS = DEFAULT_FONT # Use the font loaded in image_processor
# Reusing constants defined in geo_map_viewer for drawing styles
# MODIFIED: Import constants directly from geo_map_viewer.
# WHY: Use the defined constants for consistency.
# HOW: Added import.
from .geo_map_viewer import DEM_BOUNDARY_COLOR, DEM_BOUNDARY_THICKNESS_PX
from .geo_map_viewer import AREA_BOUNDARY_COLOR, AREA_BOUNDARY_THICKNESS_PX
from .geo_map_viewer import DEM_TILE_LABEL_BASE_FONT_SIZE, DEM_TILE_LABEL_BASE_ZOOM # For font scaling
except ImportError:
# MODIFIED: Set _DEFAULT_FONT_FOR_LABELS to None if image_processor import fails.
# WHY: This variable should be None if the preferred font loading method failed.
# HOW: Set the variable to None.
_DEFAULT_FONT_FOR_LABELS = None
# Fallback constants if geo_map_viewer or image_processor constants are not available
DEM_BOUNDARY_COLOR = "red"
DEM_BOUNDARY_THICKNESS_PX = 3
AREA_BOUNDARY_COLOR = "blue"
AREA_BOUNDARY_THICKNESS_PX = 2
TILE_TEXT_COLOR = "white"
TILE_TEXT_BG_COLOR = "rgba(0, 0, 0, 150)"
DEM_TILE_LABEL_BASE_FONT_SIZE = 12
DEM_TILE_LABEL_BASE_ZOOM = 10
logging.warning("MapDrawing: Could not import style constants or default font from image_processor/geo_map_viewer. Using fallbacks.")
# Import necessary map_utils functions
from .map_utils import get_hgt_tile_geographic_bounds # Needed for DEM bounds for drawing
from .map_utils import MapCalculationError # Re-raise calculation errors
# Module-level logger
logger = logging.getLogger(__name__) # Uses 'geoelevation.map_viewer.map_drawing'
# --- Helper for Geo to Pixel Conversion on Unscaled Stitched Image ---
def _geo_to_pixel_on_unscaled_map(
latitude_deg: float,
longitude_deg: float,
current_map_geo_bounds: Optional[Tuple[float, float, float, float]], # west, south, east, north
current_stitched_map_pixel_shape: Optional[Tuple[int, int]] # height, width
) -> Optional[Tuple[int, int]]:
"""
Converts geographic coordinates to pixel coordinates on the UN SCALED stitched PIL map image.
Args:
latitude_deg: Latitude in degrees.
longitude_deg: Longitude in degrees.
current_map_geo_bounds: The geographic bounds of the unscaled stitched map image.
current_stitched_map_pixel_shape: The pixel shape (height, width) of the unscaled image.
Returns:
A tuple (pixel_x, pixel_y) on the unscaled image, or None if conversion fails
or map context is incomplete.
"""
# MODIFIED: Check for necessary libraries and map context at the start.
# WHY: Ensure conditions for conversion are met.
# HOW: Added checks.
if not MERCANTILE_LIB_AVAILABLE_DRAWING or current_map_geo_bounds is None or current_stitched_map_pixel_shape is None:
logger.warning("Map context incomplete or mercantile missing for geo_to_pixel_on_unscaled_map conversion.")
return None
unscaled_height, unscaled_width = current_stitched_map_pixel_shape
map_west_lon, map_south_lat, map_east_lon, map_north_lat = current_map_geo_bounds
if unscaled_width <= 0 or unscaled_height <= 0:
logger.warning("Unscaled map dimensions are zero, cannot convert geo to pixel.")
return None
try:
# Use mercantile to get Web Mercator coordinates of the unscaled map's corners
map_ul_merc_x, map_ul_merc_y = mercantile.xy(map_west_lon, map_north_lat) # type: ignore
map_lr_merc_x, map_lr_merc_y = mercantile.xy(map_east_lon, map_south_lat) # type: ignore
total_map_width_merc = abs(map_lr_merc_x - map_ul_merc_x)
total_map_height_merc = abs(map_ul_merc_y - map_lr_merc_y)
if total_map_width_merc <= 0 or total_map_height_merc <= 0:
logger.warning("Map Mercator extent is zero, cannot convert geo to pixel on unscaled map.")
return None
target_merc_x, target_merc_y = mercantile.xy(longitude_deg, latitude_deg) # type: ignore
# Relative position within the *unscaled* map's Mercator extent
relative_merc_x_in_map = (target_merc_x - map_ul_merc_x) / total_map_width_merc if total_map_width_merc > 0 else 0.0
relative_merc_y_in_map = (map_ul_merc_y - target_merc_y) / total_map_height_merc if total_map_height_merc > 0 else 0.0 # Y Mercator increases North, pixel Y downwards
# Convert to pixel coordinates on the *unscaled* image
pixel_x_on_unscaled = int(round(relative_merc_x_in_map * unscaled_width))
pixel_y_on_unscaled = int(round(relative_merc_y_in_map * unscaled_height))
# Clamp to the boundaries of the unscaled image (allow slight overflow for boundary drawing)
# This clamping logic is more appropriate for drawing lines/points near edges
# A simple clamp to [0, dim-1] might clip drawing unexpectedly.
# A padding based on line thickness is better. Let's use a default padding.
# For point markers, clamping strictly to bounds is often preferred.
# Let's keep this helper simple and clamp to the image bounds.
px_clamped = max(0, min(pixel_x_on_unscaled, unscaled_width -1))
py_clamped = max(0, min(pixel_y_on_unscaled, unscaled_height -1))
return (px_clamped, py_clamped)
except Exception as e_geo_to_px_unscaled:
logger.exception(f"Error during geo_to_pixel_on_unscaled_map conversion for geo ({latitude_deg:.5f},{longitude_deg:.5f}): {e_geo_to_px_unscaled}")
return None
# --- Drawing Functions (formerly methods of GeoElevationMapViewer) ---
def draw_point_marker(
pil_image_to_draw_on: Image.Image,
latitude_deg: float,
longitude_deg: float,
current_map_geo_bounds: Optional[Tuple[float, float, float, float]],
current_stitched_map_pixel_shape: Optional[Tuple[int, int]]
) -> Image.Image:
"""
Draws a point marker at specified geographic coordinates on the provided PIL Image.
Requires OpenCV and NumPy for drawing.
"""
# MODIFIED: Pass map context explicitly. Check for PIL_LIB_AVAILABLE_DRAWING.
# WHY: The function is now standalone and needs context. Ensure PIL is available.
if not PIL_LIB_AVAILABLE_DRAWING or pil_image_to_draw_on is None or current_map_geo_bounds is None or current_stitched_map_pixel_shape is None:
logger.warning("Cannot draw point marker: PIL image or map context missing.")
return pil_image_to_draw_on # Return original image if drawing not possible
# Use the helper method to convert geo to pixel on the UN SCALED map image.
pixel_coords_on_unscaled = _geo_to_pixel_on_unscaled_map(
latitude_deg, longitude_deg,
current_map_geo_bounds, current_stitched_map_pixel_shape # Pass context to helper
)
if pixel_coords_on_unscaled:
px_clamped, py_clamped = pixel_coords_on_unscaled
logger.debug(f"Drawing point marker at unscaled pixel ({px_clamped},{py_clamped}) for geo ({latitude_deg:.5f},{longitude_deg:.5f})")
# MODIFIED: Check for CV2 and NumPy availability before using them.
# WHY: Ensure dependencies are present for drawing with OpenCV.
# HOW: Added check.
if CV2_NUMPY_LIBS_AVAILABLE_DRAWING and cv2 and np:
try:
# Convert PIL image to OpenCV format (BGR) for drawing
# Ensure image is in a mode OpenCV can handle (BGR)
# Converting here ensures drawing is possible if the input image was, e.g., L mode
if pil_image_to_draw_on.mode != 'RGB':
# Convert to RGB first if not already, then to BGR
map_cv_bgr = cv2.cvtColor(np.array(pil_image_to_draw_on.convert('RGB')), cv2.COLOR_RGB2BGR) # type: ignore
else:
map_cv_bgr = cv2.cvtColor(np.array(pil_image_to_draw_on), cv2.COLOR_RGB2BGR) # type: ignore
# Draw a cross marker at the calculated unscaled pixel coordinates
# Note: Marker color (0,0,255) is BGR for red
cv2.drawMarker(map_cv_bgr, (px_clamped, py_clamped), (0,0,255), cv2.MARKER_CROSS, markerSize=15, thickness=2) # type: ignore
# Convert back to PIL format (RGB)
return Image.fromarray(cv2.cvtColor(map_cv_bgr, cv2.COLOR_BGR2RGB)) # type: ignore
except Exception as e_draw_click_cv:
logger.exception(f"Error drawing point marker with OpenCV: {e_draw_click_cv}")
return pil_image_to_draw_on # Return original image on error
else:
logger.warning("CV2 or NumPy not available, cannot draw point marker using OpenCV.")
return pil_image_to_draw_on # Return original image if CV2/NumPy are somehow missing here
else:
logger.warning(f"Geo-to-pixel conversion failed for ({latitude_deg:.5f},{longitude_deg:.5f}), cannot draw point marker.")
return pil_image_to_draw_on # Return original image
def draw_area_bounding_box(
pil_image_to_draw_on: Image.Image,
area_geo_bbox: Tuple[float, float, float, float], # west, south, east, north
current_map_geo_bounds: Optional[Tuple[float, float, float, float]],
current_stitched_map_pixel_shape: Optional[Tuple[int, int]],
color: str = "blue", # Allow specifying color
thickness: int = 2 # Allow specifying thickness
) -> Image.Image:
"""
Draws an area bounding box on the provided PIL Image.
Requires PIL and ImageDraw.
"""
# MODIFIED: Pass map context explicitly. Check for PIL_LIB_AVAILABLE_DRAWING and ImageDraw.
# WHY: The function is now standalone and needs context. Ensure PIL/ImageDraw are available.
# MODIFIED: Use predefined DEM boundary style constants.
# WHY: Centralize styling.
# HOW: Pass DEM_BOUNDARY_COLOR and DEM_BOUNDARY_THICKNESS_PX to draw_area_bounding_box.
if not PIL_LIB_AVAILABLE_DRAWING or ImageDraw is None or pil_image_to_draw_on is None or current_map_geo_bounds is None or current_stitched_map_pixel_shape is None:
logger.warning("Cannot draw area BBox: PIL image, ImageDraw, or map context missing.")
return pil_image_to_draw_on # Return original image if drawing not possible
west, south, east, north = area_geo_bbox
# Corners of the box in geographic degrees
corners_geo = [(west, north), (east, north), (east, south), (west, south)]
pixel_corners: List[Tuple[int,int]] = []
try:
# Convert all corners to pixel coordinates on the *unscaled* image, suitable for drawing lines.
# Recalculate pixel coords relative to the unscaled map using mercantile for line drawing accuracy,
# and clamp with padding.
unscaled_height, unscaled_width = current_stitched_map_pixel_shape
map_west_lon_stitched, map_south_lat_stitched, map_east_lon_stitched, map_north_lat_stitched = current_map_geo_bounds
map_ul_merc_x_stitched, map_ul_merc_y_stitched = mercantile.xy(map_west_lon_stitched, map_north_lat_stitched) # type: ignore
map_lr_merc_x_stitched, map_lr_merc_y_stitched = mercantile.xy(map_east_lon_stitched, map_south_lat_stitched) # type: ignore
total_map_width_merc = abs(map_lr_merc_x_stitched - map_ul_merc_x_stitched)
total_map_height_merc = abs(map_ul_merc_y_stitched - map_lr_merc_y_stitched)
if total_map_width_merc <= 0 or total_map_height_merc <= 0:
raise ValueError("Map Mercator extent is zero for drawing lines.")
import mercantile as local_mercantile # Use mercantile directly here
if local_mercantile is None: raise ImportError("mercantile not available locally.")
for lon, lat in corners_geo:
target_merc_x, target_merc_y = local_mercantile.xy(lon, lat) # type: ignore
relative_merc_x_in_map = (target_merc_x - map_ul_merc_x_stitched) / total_map_width_merc if total_map_width_merc > 0 else 0.0
relative_merc_y_in_map = (map_ul_merc_y_stitched - target_merc_y) / total_map_height_merc if total_map_height_merc > 0 else 0.0 # Y Mercator increases North, pixel Y downwards
pixel_x_on_unscaled_raw = int(round(relative_merc_x_in_map * unscaled_width))
pixel_y_on_unscaled_raw = int(round(relative_merc_y_in_map * unscaled_height))
# Clamp with padding for line drawing - allow coordinates slightly outside image bounds
px_clamped_for_line = max(-thickness, min(pixel_x_on_unscaled_raw, unscaled_width + thickness))
py_clamped_for_line = max(-thickness, min(pixel_y_on_unscaled_raw, unscaled_height + thickness))
pixel_corners.append((px_clamped_for_line, py_clamped_for_line))
except Exception as e_geo_to_px_bbox:
logger.exception(f"Error during geo_to_pixel conversion for BBox drawing: {e_geo_to_px_bbox}")
return pil_image_to_draw_on # Return original image on error
# MODIFIED: Check ImageDraw availability before using it.
# WHY: Ensure dependency is present. (Already checked at function start, but defensive)
# HOW: Added check.
if len(pixel_corners) == 4 and ImageDraw is not None:
logger.debug(f"Drawing area BBox with unscaled pixel corners: {pixel_corners}")
# Ensure image is in a mode that supports drawing (RGB or RGBA)
# Converting here ensures drawing is possible if the input image was, e.g., L mode
if pil_image_to_draw_on.mode not in ("RGB", "RGBA"):
pil_image_to_draw_on = pil_image_to_draw_on.convert("RGBA") # Prefer RGBA for drawing
draw = ImageDraw.Draw(pil_image_to_draw_on)
# Draw lines connecting the corner points
try:
draw.line([pixel_corners[0], pixel_corners[1]], fill=color, width=thickness) # Top edge
draw.line([pixel_corners[1], pixel_corners[2]], fill=color, width=thickness) # Right edge
draw.line([pixel_corners[2], pixel_corners[3]], fill=color, width=thickness) # Bottom edge
draw.line([pixel_corners[3], pixel_corners[0]], fill=color, width=thickness) # Left edge
except Exception as e_draw_lines:
logger.exception(f"Error drawing BBox lines: {e_draw_lines}")
return pil_image_to_draw_on
else:
logger.warning("Not enough pixel corners calculated for BBox, or ImageDraw missing.")
return pil_image_to_draw_on # Return original image
def draw_dem_tile_boundary(
pil_image_to_draw_on: Image.Image,
dem_tile_geo_bbox: Tuple[float, float, float, float],
current_map_geo_bounds: Optional[Tuple[float, float, float, float]],
current_stitched_map_pixel_shape: Optional[Tuple[int, int]]
) -> Image.Image:
"""
Draws a boundary box for a single DEM tile on the provided PIL Image.
Requires PIL and ImageDraw. Uses predefined DEM boundary style.
"""
# MODIFIED: Pass map context explicitly. Check for PIL_LIB_AVAILABLE_DRAWING and ImageDraw.
# WHY: The function is now standalone and needs context. Ensure PIL/ImageDraw are available.
# MODIFIED: Use predefined DEM boundary style constants.
# WHY: Centralize styling.
# HOW: Pass DEM_BOUNDARY_COLOR and DEM_BOUNDARY_THICKNESS_PX to draw_area_bounding_box.
if not PIL_LIB_AVAILABLE_DRAWING or ImageDraw is None or pil_image_to_draw_on is None or current_map_geo_bounds is None or current_stitched_map_pixel_shape is None:
logger.warning("Cannot draw DEM tile boundary: PIL image, ImageDraw, or map context missing.")
return pil_image_to_draw_on
logger.debug(f"Drawing DEM tile boundary on map for bbox: {dem_tile_geo_bbox}")
# Use the generic area drawing function with specific style
return draw_area_bounding_box(
pil_image_to_draw_on,
dem_tile_geo_bbox,
current_map_geo_bounds,
current_stitched_map_pixel_shape,
color=DEM_BOUNDARY_COLOR,
thickness=DEM_BOUNDARY_THICKNESS_PX
)
def draw_dem_tile_boundaries_with_labels(
pil_image_to_draw_on: Image.Image,
dem_tiles_info_list: List[Dict],
current_map_geo_bounds: Optional[Tuple[float, float, float, float]],
current_map_render_zoom: Optional[int], # Needed for font scaling
current_stitched_map_pixel_shape: Optional[Tuple[int, int]]
) -> Image.Image:
"""
Draws boundaries and names for a list of DEM tiles on the provided PIL Image.
Draws only for tiles marked as available HGT.
Requires PIL, ImageDraw, and map_utils.get_hgt_tile_geographic_bounds.
"""
# MODIFIED: Pass map context explicitly. Check for necessary libraries and context.
# WHY: The function is now standalone and needs context. Ensure dependencies are available.
if not PIL_LIB_AVAILABLE_DRAWING or ImageDraw is None or pil_image_to_draw_on is None or current_map_geo_bounds is None or current_map_render_zoom is None or current_stitched_map_pixel_shape is None:
logger.warning("Cannot draw multiple DEM tile boundaries/labels: PIL image, ImageDraw, or map context missing.")
return pil_image_to_draw_on
if not dem_tiles_info_list:
logger.debug("No DEM tile info provided for drawing multiple boundaries.")
return pil_image_to_draw_on # Nothing to draw
logger.debug(f"Drawing {len(dem_tiles_info_list)} DEM tile boundaries and labels.")
# Ensure image is in a mode that supports drawing (RGB or RGBA)
# Converting here ensures drawing is possible if the input image was, e.g., L mode
if pil_image_to_draw_on.mode not in ("RGB", "RGBA"):
pil_image_to_draw_on = pil_image_to_draw_on.convert("RGBA")
draw = ImageDraw.Draw(pil_image_to_draw_on)
# Attempt to use a font loaded in image_processor for consistency (_DEFAULT_FONT_FOR_LABELS)
# If that failed (e.g., image_processor not imported or font load failed there),
# fallback to default PIL font if ImageFont module is available.
font_to_use = _DEFAULT_FONT_FOR_LABELS
# MODIFIED: Corrected the fallback logic for loading the default PIL font.
# WHY: The previous attempt `ImageDraw.ImageFont.load_default()` caused an AttributeError.
# The correct way is to use `ImageFont.load_default()` if the `ImageFont` module
# was successfully imported at the top of the file.
# HOW: Changed `ImageDraw.ImageFont.load_default()` to `ImageFont.load_default()` and added a check
# `if ImageFont is not None` to ensure the module is available before calling its method.
if font_to_use is None: # If the preferred font from image_processor is not available
if PIL_LIB_AVAILABLE_DRAWING and ImageFont is not None: # Check if PIL and ImageFont module are available
try:
font_to_use = ImageFont.load_default() # type: ignore # Use the correct way to load default font
logger.debug("Using default PIL font for DEM tile labels (fallback).")
except Exception as e_default_font:
logger.warning(f"Could not load default PIL font: {e_default_font}. Cannot draw text labels.")
font_to_use = None # Ensure font_to_use is None if loading default also fails
else:
logger.debug("Pillow (PIL) or ImageFont module not available, skipping text label drawing.")
# font_to_use remains None, text drawing logic will be skipped below.
# MODIFIED: Calculate font size based on current map zoom.
# WHY: To make labels more readable at different zoom levels.
# HOW: Use a simple linear scaling based on a base zoom and base font size.
current_map_zoom = current_map_render_zoom # Use the zoom level the map was rendered at
if current_map_zoom is None: # Should not be None based on function signature and checks, but defensive
logger.warning("Current map zoom is None, cannot scale font. Using base size.")
current_map_zoom = DEM_TILE_LABEL_BASE_ZOOM # Default to base zoom for size calc
# Simple linear scaling: size increases by 1 for each zoom level above base zoom
# Ensure minimum sensible font size (e.g., 6)
scaled_font_size = max(6, DEM_TILE_LABEL_BASE_FONT_SIZE + (current_map_zoom - DEM_TILE_LABEL_BASE_ZOOM))
logger.debug(f"Calculated label font size {scaled_font_size} for zoom {current_map_zoom}.")
# Update the font instance with the calculated size (if using truetype font)
# If load_default is used, resizing is often not possible or behaves differently.
# Let's re-load the font with the scaled size if it's a truetype font.
# This requires knowing the path of the original font used by image_processor, which is tricky.
# Alternative: Store the font path and size calculation logic from image_processor here.
# Or, maybe simpler, if using load_default fallback, just use the default size.
# Let's assume if _DEFAULT_FONT_FOR_LABELS is not None and has the 'font' attribute, it's a truetype font we can resize.
# Also need to check if ImageFont module is available before using ImageFont.truetype.
if font_to_use and hasattr(font_to_use, 'font') and ImageFont is not None: # Check if it looks like a truetype font object and ImageFont module is available
try:
# Get the original font object's path and index from Pillow's internal structure
original_font_path = font_to_use.font.path # type: ignore
font_index = font_to_use.font.index # type: ignore # Handle TTC files
# MODIFIED: Call ImageFont.truetype correctly.
# WHY: Ensure the call uses the correctly imported module.
# HOW: Used ImageFont.truetype instead of ImageDraw.ImageFont.truetype.
font_to_use = ImageFont.truetype(original_font_path, scaled_font_size, index=font_index) # type: ignore
logger.debug(f"Resized truetype font to {scaled_font_size}.")
except Exception as e_resize_font:
logger.warning(f"Could not resize truetype font: {e_resize_font}. Using original size.")
# Keep the font_to_use as it was (original size)
for tile_info in dem_tiles_info_list:
# Draw only if HGT data is available for this tile
if not tile_info.get("hgt_available"):
logger.debug(f"Skipping drawing boundary/label for tile {tile_info.get('tile_base_name', '?')}: HGT not available.")
continue # Skip this tile if no HGT
lat_coord = tile_info.get("latitude_coord")
lon_coord = tile_info.get("longitude_coord")
tile_base_name = tile_info.get("tile_base_name")
if lat_coord is None or lon_coord is None or tile_base_name is None:
logger.warning(f"Skipping drawing for invalid tile info entry: {tile_info}")
continue
try:
# Get the precise geographic bounds for this HGT tile
tile_geo_bbox = get_hgt_tile_geographic_bounds(lat_coord, lon_coord)
if not tile_geo_bbox:
logger.warning(f"Could not get geographic bounds for tile ({lat_coord},{lon_coord}), skipping drawing.")
continue # Skip tile if bounds calculation fails
west, south, east, north = tile_geo_bbox
# Corners of this specific tile's bbox in geographic degrees
tile_corners_geo = [(west, north), (east, north), (east, south), (west, south)]
tile_pixel_corners_on_unscaled: List[Tuple[int,int]] = []
# Convert tile corners to unscaled pixel coordinates, suitable for drawing lines.
# Recalculate pixel coords relative to the unscaled map using mercantile for line drawing accuracy,
# and clamp with padding.
if current_map_geo_bounds is not None and current_stitched_map_pixel_shape is not None:
unscaled_height, unscaled_width = current_stitched_map_pixel_shape
map_west_lon_stitched, map_south_lat_stitched, map_east_lon_stitched, map_north_lat_stitched = current_map_geo_bounds
map_ul_merc_x_stitched, map_ul_merc_y_stitched = mercantile.xy(map_west_lon_stitched, map_north_lat_stitched) # type: ignore
map_lr_merc_x_stitched, map_lr_merc_y_stitched = mercantile.xy(map_east_lon_stitched, map_south_lat_stitched) # type: ignore
total_map_width_merc = abs(map_lr_merc_x_stitched - map_ul_merc_x_stitched)
total_map_height_merc = abs(map_ul_merc_y_stitched - map_lr_merc_y_stitched)
if total_map_width_merc <= 0 or total_map_height_merc <= 0:
logger.warning("Map Mercator extent is zero for drawing tile boundaries.")
continue # Skip this tile if map extent is zero
import mercantile as local_mercantile # Use mercantile directly here
if local_mercantile is None: raise ImportError("mercantile not available locally.")
for lon, lat in tile_corners_geo:
target_merc_x, target_merc_y = local_mercantile.xy(lon, lat) # type: ignore
relative_merc_x_in_map = (target_merc_x - map_ul_merc_x_stitched) / total_map_width_merc if total_map_width_merc > 0 else 0.0
relative_merc_y_in_map = (map_ul_merc_y_stitched - target_merc_y) / total_map_height_merc if total_map_height_merc > 0 else 0.0 # Y Mercator increases North, pixel Y downwards
pixel_x_on_unscaled_raw = int(round(relative_merc_x_in_map * unscaled_width))
pixel_y_on_unscaled_raw = int(round(relative_merc_y_in_map * unscaled_height))
# Clamp with padding for line drawing - allow coordinates slightly outside image bounds
px_clamped_for_line = max(-DEM_BOUNDARY_THICKNESS_PX, min(pixel_x_on_unscaled_raw, unscaled_width + DEM_BOUNDARY_THICKNESS_PX))
py_clamped_for_line = max(-DEM_BOUNDARY_THICKNESS_PX, min(pixel_y_on_unscaled_raw, unscaled_height + DEM_BOUNDARY_THICKNESS_PX))
tile_pixel_corners_on_unscaled.append((px_clamped_for_line, py_clamped_for_line))
else:
logger.warning(f"Unscaled map dimensions or geo bounds not available, cannot clamp pixel corners for tile ({lat_coord},{lon_coord}).")
continue # Skip this tile if map context is missing
if len(tile_pixel_corners_on_unscaled) == 4:
# Draw the tile boundary (red)
draw.line([tile_pixel_corners_on_unscaled[0], tile_pixel_corners_on_unscaled[1]], fill=DEM_BOUNDARY_COLOR, width=DEM_BOUNDARY_THICKNESS_PX) # Top
draw.line([tile_pixel_corners_on_unscaled[1], tile_pixel_corners_on_unscaled[2]], fill=DEM_BOUNDARY_COLOR, width=DEM_BOUNDARY_THICKNESS_PX) # Right
draw.line([tile_pixel_corners_on_unscaled[2], tile_pixel_corners_on_unscaled[3]], fill=DEM_BOUNDARY_COLOR, width=DEM_BOUNDARY_THICKNESS_PX) # Bottom
draw.line([tile_pixel_corners_on_unscaled[3], tile_pixel_corners_on_unscaled[0]], fill=DEM_BOUNDARY_COLOR, width=DEM_BOUNDARY_THICKNESS_PX) # Left
# --- Draw Tile Name Label ---
label_text = tile_base_name.upper()
# Find pixel position for label - bottom-right corner area of the tile's pixel box.
# Get the precise unscaled pixel coords for the SE corner using the helper (which clamps to edge)
se_pixel_on_unscaled = _geo_to_pixel_on_unscaled_map(
south, east,
current_map_geo_bounds, # Pass context to helper
current_stitched_map_pixel_shape # Pass context to helper
)
label_margin = 3 # Small margin from the border
# Draw text only if a font is available and position is calculable
# MODIFIED: Check if font_to_use is not None before attempting to use it.
# WHY: The font might not have been loaded due to missing libraries or errors.
# HOW: Added the check.
if font_to_use is not None and se_pixel_on_unscaled and current_stitched_map_pixel_shape is not None:
try:
unscaled_height, unscaled_width = current_stitched_map_pixel_shape
se_px, se_py = se_pixel_on_unscaled
# Calculate text size using textbbox (requires Pillow >= 8.0)
# Use the bottom-right corner pixel as the anchor point for calculation (not drawing)
try:
# textbbox relative to (0,0)
# MODIFIED: Use font_to_use variable directly instead of ImageFont.truetype.
# WHY: font_to_use already holds the appropriate font object (potentially scaled).
# HOW: Changed font parameter in textbbox and text calls.
text_bbox = draw.textbbox((0,0), label_text, font=font_to_use) # type: ignore
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
# Target bottom-right corner for text drawing relative to the unscaled image pixel
target_text_br_x = se_px - label_margin
target_text_br_y = se_py - label_margin
# Top-left corner for drawing the text based on target bottom-right and text size
label_text_x = target_text_br_x - text_width
label_text_y = target_text_br_y - text_height
# Clamp text position to be within the visible area of the unscaled map image
label_text_x = max(0, min(label_text_x, unscaled_width - text_width - 1))
label_text_y = max(0, min(label_text_y, unscaled_height - text_height - 1))
# Draw background rectangle for the text
bg_padding = 1 # Small padding around text background
bg_coords = [
label_text_x - bg_padding,
label_text_y - bg_padding,
label_text_x + text_width + bg_padding,
label_text_y + text_height + bg_padding,
]
draw.rectangle(bg_coords, fill=TILE_TEXT_BG_COLOR)
# Draw the text itself
draw.text((label_text_x, label_text_y), label_text, fill=TILE_TEXT_COLOR, font=font_to_use) # type: ignore
except AttributeError:
# Fallback for older Pillow versions using textsize
logger.warning("Pillow textbbox not available (Pillow < 8.0). Using textsize fallback for labels.")
# MODIFIED: Use font_to_use variable directly.
# WHY: Ensure the correct font object is used.
# HOW: Changed font parameter.
text_width, text_height = draw.textsize(label_text, font=font_to_use) # type: ignore
# Rough position calculation based on textsize
if current_stitched_map_pixel_shape and se_pixel_on_unscaled:
unscaled_height, unscaled_width = current_stitched_map_pixel_shape
se_px, se_py = se_pixel_on_unscaled
label_text_x = se_px - text_width - label_margin
label_text_y = se_py - text_height - label_margin
# Clamp text position to be within the visible area
label_text_x = max(0, min(label_text_x, unscaled_width - text_width - 1))
label_text_y = max(0, min(label_text_y, unscaled_height - text_height - 1))
# Draw background
bg_padding = 1
bg_coords = [label_text_x - bg_padding, label_text_y - bg_padding,
label_text_x + text_width + bg_padding, label_text_y + text_height + bg_padding]
draw.rectangle(bg_coords, fill=TILE_TEXT_BG_COLOR)
# Draw text using font fallback
# MODIFIED: Use font_to_use variable directly.
# WHY: Ensure the correct font object is used.
# HOW: Changed font parameter.
draw.text((label_text_x, label_text_y), label_text, fill=TILE_TEXT_COLOR, font=font_to_use) # type: ignore
else:
logger.warning(f"Could not get SE pixel coords for tile ({lat_coord},{lon_coord}) label positioning (textsize fallback).")
except Exception as e_draw_label:
logger.warning(f"Error drawing label '{label_text}' for tile ({lat_coord},{lon_coord}): {e_draw_label}")
else:
# This log message is now accurate as the outer if font_to_use check handles the case where no font was loaded.
logger.debug(f"No font available, skipping drawing label '{label_text}' for tile ({lat_coord},{lon_coord}).")
except ValueError as ve: # Catch explicit ValueErrors raised for conversion/drawing issues for a single tile
logger.warning(f"Value error during drawing for tile ({lat_coord},{lon_coord}): {ve}. Skipping this tile.")
pass # Skip this tile and continue with the next
except Exception as e_draw_tile:
logger.exception(f"Unexpected error drawing boundary/label for tile ({lat_coord},{lon_coord}): {e_draw_tile}. Skipping this tile.")
pass # Skip this tile and continue with the next
# Return the image with all drawn boundaries and labels
return pil_image_to_draw_on
def draw_user_click_marker(
pil_image_to_draw_on: Image.Image,
last_user_click_pixel_coords_on_displayed_image: Optional[Tuple[int, int]],
current_display_scale_factor: float,
current_stitched_map_pixel_shape: Optional[Tuple[int, int]]
) -> Optional[Image.Image]:
"""
Draws a marker at the last user-clicked pixel location on the (unscaled) PIL map image.
Requires OpenCV and NumPy for drawing.
"""
# MODIFIED: Pass map context explicitly. Check for necessary libraries and context.
# WHY: The function is now standalone and needs context. Ensure dependencies are available.
if not PIL_LIB_AVAILABLE_DRAWING or pil_image_to_draw_on is None or not CV2_NUMPY_LIBS_AVAILABLE_DRAWING or cv2 is None or np is None or current_stitched_map_pixel_shape is None or last_user_click_pixel_coords_on_displayed_image is None:
logger.debug("Conditions not met for drawing user click marker (no click, no image, no context, or libraries missing).")
return pil_image_to_draw_on # Return original image if drawing not possible
# Unscale the click coordinates from displayed (scaled) image to original stitched image coordinates
clicked_px_scaled, clicked_py_scaled = last_user_click_pixel_coords_on_displayed_image
# Get the shape of the original unscaled stitched image
unscaled_height, unscaled_width = current_stitched_map_pixel_shape
# Calculate the unscaled pixel coordinates corresponding to the clicked scaled pixel
unscaled_target_px = int(round(clicked_px_scaled / current_display_scale_factor))
unscaled_target_py = int(round(clicked_py_scaled / current_display_scale_factor))
# Clamp to unscaled image dimensions
unscaled_target_px = max(0, min(unscaled_target_px, unscaled_width - 1))
unscaled_target_py = max(0, min(unscaled_target_py, unscaled_height - 1))
# MODIFIED: Check for CV2 and NumPy availability before using them.
# WHY: Ensure dependencies are present for drawing with OpenCV. (Already checked at start, but defensive).
# HOW: Added check.
if cv2 and np:
try:
logger.debug(f"Drawing user click marker at unscaled pixel ({unscaled_target_px},{unscaled_target_py})")
# Convert PIL image to OpenCV format (BGR) for drawing
# Ensure image is in a mode OpenCV can handle (BGR)
# Converting here ensures drawing is possible if the input image was, e.g., L mode
if pil_image_to_draw_on.mode != 'RGB':
# Convert to RGB first if not already, then to BGR
map_cv_bgr = cv2.cvtColor(np.array(pil_image_to_draw_on.convert('RGB')), cv2.COLOR_RGB2BGR) # type: ignore
else:
map_cv_bgr = cv2.cvtColor(np.array(pil_image_to_draw_on), cv2.COLOR_RGB2BGR) # type: ignore
# Draw a cross marker at the calculated unscaled pixel coordinates
# Note: Marker color (0,0,255) is BGR for red
cv2.drawMarker(map_cv_bgr, (unscaled_target_px,unscaled_target_py), (0,0,255), cv2.MARKER_CROSS, markerSize=15, thickness=2) # type: ignore
# Convert back to PIL format (RGB)
return Image.fromarray(cv2.cvtColor(map_cv_bgr, cv2.COLOR_BGR2RGB)) # type: ignore
except Exception as e_draw_click_cv:
logger.exception(f"Error drawing user click marker with OpenCV: {e_draw_click_cv}")
return pil_image_to_draw_on # Return original image on error
else:
logger.warning("CV2 or NumPy not available, cannot draw user click marker.")
return pil_image_to_draw_on # Return original image if CV2/NumPy are somehow missing here

View File

@ -1,775 +0,0 @@
# geoelevation/map_viewer/map_manager.py
"""
Manages the retrieval, caching, and stitching of map tiles from a selected map service.
This module interacts with a map service provider (implementing BaseMapService),
handles local disk caching of tiles to support offline use and reduce network
requests, and assembles individual tiles into a complete map image.
"""
# Standard library imports
import logging
import os
import time
import threading
from pathlib import Path # For robust path manipulation
from typing import Tuple, Optional, List, Dict # Ensure these are available
import io # To handle byte stream from requests as an image
import shutil # For cache clearing operations
# Third-party imports
try:
import requests # For downloading map tiles
REQUESTS_AVAILABLE = True
except ImportError:
requests = None # type: ignore
REQUESTS_AVAILABLE = False
logging.error("MapTileManager: 'requests' library not found. Online tile fetching will fail.")
try:
# MODIFIED: Import ImageDraw along with Image from PIL.
# WHY: ImageDraw is required for drawing on placeholder images and potentially other image manipulation tasks.
# HOW: Added ImageDraw to the import list from PIL.
from PIL import Image, ImageDraw
ImageType = Image.Image # type: ignore
PIL_AVAILABLE_MANAGER = True
except ImportError:
Image = None # type: ignore
ImageDraw = None # type: ignore # Define as None if import fails
ImageType = None # type: ignore
logging.error("MapTileManager: 'Pillow' library not found. Image operations will fail.")
# Local application/package imports
# Assumes map_services is in the same subpackage 'map_viewer'
from .map_services import BaseMapService
# Module-level logger
logger = logging.getLogger(__name__)
# Default values for the manager if not provided or configured
DEFAULT_MAP_TILE_CACHE_ROOT_DIR = "map_tile_cache" # Root for all service caches
DEFAULT_ENABLE_ONLINE_FETCHING = True
DEFAULT_NETWORK_TIMEOUT_SECONDS = 10 # Increased timeout slightly
DEFAULT_DOWNLOAD_RETRY_DELAY_SECONDS = 2
DEFAULT_MAX_DOWNLOAD_RETRIES = 2
DEFAULT_PLACEHOLDER_COLOR_RGB = (220, 220, 220) # Light grey placeholder
class MapTileManager:
"""
Manages fetching, caching, and assembling map tiles for a given map service.
Requires 'requests' and 'Pillow' libraries to be installed.
"""
def __init__(
self,
map_service: BaseMapService,
cache_root_directory: Optional[str] = None,
enable_online_tile_fetching: Optional[bool] = None,
# MODIFIED: Added tile_pixel_size parameter to the constructor.
# WHY: To allow the caller (GeoElevationMapViewer) to explicitly specify the tile size
# based on the selected map service configuration.
# HOW: Added the parameter with an Optional[int] type hint and default None.
tile_pixel_size: Optional[int] = None
) -> None:
"""
Initializes the MapTileManager.
Args:
map_service_instance: An instance of a map service provider
(e.g., OpenStreetMapService).
cache_root_directory: The root directory for caching tiles from all services.
If None, uses DEFAULT_MAP_TILE_CACHE_ROOT_DIR.
A subdirectory for the specific service will be created.
enable_online_tile_fetching: Whether to download tiles if not found in cache.
If None, uses DEFAULT_ENABLE_ONLINE_FETCHING.
tile_pixel_size: The pixel dimension (width/height) of map tiles for this manager.
If None, the size is taken from the map_service instance.
Raises:
TypeError: If map_service_instance is not a valid BaseMapService instance.
ImportError: If 'requests' or 'Pillow' libraries are not installed.
ValueError: If a tile_pixel_size is provided but invalid.
"""
logger.info("Initializing MapTileManager...")
if not REQUESTS_AVAILABLE:
raise ImportError("'requests' library is required by MapTileManager but not found.")
# MODIFIED: Check for ImageDraw availability as well if Pillow is expected.
# WHY: Drawing on placeholders requires ImageDraw.
# HOW: Added ImageDraw check.
if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None):
raise ImportError("'Pillow' library (or its drawing module ImageDraw) is required by MapTileManager but not found.")
if not isinstance(map_service, BaseMapService):
logger.critical("Invalid map_service_instance provided. Must be an instance of BaseMapService.")
raise TypeError("map_service_instance must be an instance of BaseMapService.")
self.map_service: BaseMapService = map_service
self.service_identifier_name: str = self.map_service.name
# MODIFIED: Set the tile_size attribute using the provided parameter or the service's size.
# WHY: The manager needs to know the pixel dimensions of the tiles it handles for stitching and placeholder creation.
# HOW: Check if tile_pixel_size was provided; if so, validate and use it. Otherwise, use the size from the map_service instance.
if tile_pixel_size is not None:
if not isinstance(tile_pixel_size, int) or tile_pixel_size <= 0:
logger.error(f"Invalid provided tile_pixel_size: {tile_pixel_size}. Must be a positive integer.")
# Fallback to service size or raise error? Let's raise for clarity if a bad value is explicitly passed.
raise ValueError(f"Invalid provided tile_pixel_size: {tile_pixel_size}. Must be a positive integer.")
self.tile_size: int = tile_pixel_size
logger.info(f"Map tile size explicitly set to {self.tile_size}px.")
else:
# Use the size from the provided map service instance
self.tile_size: int = self.map_service.tile_size
logger.info(f"Map tile size inherited from service '{self.map_service.name}': {self.tile_size}px.")
# Determine cache directory path
effective_cache_root_dir = cache_root_directory if cache_root_directory is not None \
else DEFAULT_MAP_TILE_CACHE_ROOT_DIR
self.service_specific_cache_dir: Path = Path(effective_cache_root_dir) / self.service_identifier_name
logger.info(f"Service-specific cache directory set to: {self.service_specific_cache_dir}")
# Determine online fetching status
self.is_online_fetching_enabled: bool = enable_online_tile_fetching \
if enable_online_tile_fetching is not None else DEFAULT_ENABLE_ONLINE_FETCHING
logger.info(f"Online tile fetching enabled: {self.is_online_fetching_enabled}")
# Network request parameters
self.http_user_agent: str = "GeoElevationMapViewer/0.1 (Python Requests)"
self.http_request_headers: Dict[str, str] = {"User-Agent": self.http_user_agent}
self.http_request_timeout_seconds: int = DEFAULT_NETWORK_TIMEOUT_SECONDS
self.download_max_retries: int = DEFAULT_MAX_DOWNLOAD_RETRIES
self.download_retry_delay_seconds: int = DEFAULT_DOWNLOAD_RETRY_DELAY_SECONDS
self._ensure_service_cache_directory_exists()
self._cache_access_lock = threading.Lock() # For thread-safe cache operations
logger.info(
f"MapTileManager initialized for service '{self.service_identifier_name}'. Online: {self.is_online_fetching_enabled}"
)
def _ensure_service_cache_directory_exists(self) -> None:
"""Creates the service-specific cache directory if it doesn't exist."""
try:
self.service_specific_cache_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"Cache directory verified/created: {self.service_specific_cache_dir}")
except OSError as e_mkdir:
logger.error(
f"Failed to create cache directory '{self.service_specific_cache_dir}': {e_mkdir}"
)
except Exception as e_unexpected_mkdir:
logger.exception(
f"Unexpected error ensuring cache directory exists: {e_unexpected_mkdir}"
)
def _get_tile_cache_file_path(self, z: int, x: int, y: int) -> Path:
"""
Constructs the full local file path for a specific map tile.
The structure is: <service_cache_dir>/<zoom>/<x_tile>/<y_tile>.png
"""
return self.service_specific_cache_dir / str(z) / str(x) / f"{y}.png"
def _load_tile_from_cache(
self, tile_cache_path: Path, tile_coordinates_log_str: str
) -> Optional[ImageType]:
"""Attempts to load a tile image from a cache file (thread-safe read)."""
logger.debug(f"Checking cache for tile {tile_coordinates_log_str} at {tile_cache_path}")
try:
with self._cache_access_lock: # Protect file system access
if tile_cache_path.is_file():
logger.info(f"Cache hit for tile {tile_coordinates_log_str}. Loading from disk.")
pil_image = Image.open(tile_cache_path) # type: ignore
pil_image.load() # Load image data into memory to release file lock sooner
# Ensure consistency by converting to RGB if needed
# MODIFIED: Ensure consistency by converting to RGB or RGBA depending on service need (currently RGB).
# WHY: Consistent format is important for image processing and display.
# HOW: Convert to RGB.
if pil_image.mode != "RGB":
logger.debug(
f"Converting cached image {tile_coordinates_log_str} from mode {pil_image.mode} to RGB."
)
pil_image = pil_image.convert("RGB")
return pil_image
else:
logger.debug(f"Cache miss for tile {tile_coordinates_log_str}.")
return None
except IOError as e_io_cache:
logger.error(
f"IOError reading cached tile {tile_cache_path}: {e_io_cache}. Treating as cache miss."
)
return None
except Exception as e_cache_unexpected:
logger.exception(
f"Unexpected error accessing cache file {tile_cache_path}: {e_cache_unexpected}"
)
return None
def _download_and_save_tile_to_cache(
self,
zoom_level: int,
tile_x: int,
tile_y: int,
tile_cache_path: Path,
tile_coordinates_log_str: str
) -> Optional[ImageType]:
"""Attempts to download a tile, process it, and save it to the cache."""
if not self.is_online_fetching_enabled:
logger.debug(f"Online fetching disabled. Cannot download tile {tile_coordinates_log_str}.")
return None
tile_download_url = self.map_service.get_tile_url(zoom_level, tile_x, tile_y)
if not tile_download_url:
logger.error(f"Failed to get URL for tile {tile_coordinates_log_str} from service.")
return None
logger.info(f"Downloading tile {tile_coordinates_log_str} from: {tile_download_url}")
downloaded_pil_image: Optional[ImageType] = None
for attempt_num in range(self.download_max_retries + 1):
try:
response = requests.get( # type: ignore
tile_download_url,
headers=self.http_request_headers,
timeout=self.http_request_timeout_seconds,
stream=True # Efficient for binary content
)
response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx)
image_binary_data = response.content
if not image_binary_data:
logger.warning(f"Downloaded empty content for tile {tile_coordinates_log_str}.")
break # Stop retrying if server sends empty response
# Process downloaded image data and save to cache
try:
pil_image = Image.open(io.BytesIO(image_binary_data)) # type: ignore
pil_image.load()
# MODIFIED: Convert downloaded image to RGB mode before saving/returning.
# WHY: Consistency in image format within the manager.
# HOW: Added .convert("RGB").
if pil_image.mode != "RGB":
pil_image = pil_image.convert("RGB")
# Optional: Resize downloaded tile if its size doesn't match self.tile_size
# This would be needed if the service URL returns tiles of different sizes,
# which is uncommon for standard XYZ services, but could happen.
# For standard services, the service.tile_size should be correct.
# if pil_image.size != (self.tile_size, self.tile_size):
# logger.warning(f"Downloaded tile size {pil_image.size} doesn't match expected {self.tile_size}. Resizing.")
# pil_image = pil_image.resize((self.tile_size, self.tile_size), Image.Resampling.LANCZOS)
logger.debug(f"Tile {tile_coordinates_log_str} downloaded (Attempt {attempt_num + 1}).")
self._save_image_to_cache_file(tile_cache_path, pil_image)
downloaded_pil_image = pil_image
break # Success, exit retry loop
except (IOError, Image.UnidentifiedImageError) as e_img_proc: # type: ignore
logger.error(
f"Failed to process image data for {tile_coordinates_log_str}: {e_img_proc}"
)
break # Don't retry if image data is corrupt
except Exception as e_proc_unexpected:
logger.exception(
f"Unexpected error processing downloaded image {tile_coordinates_log_str}: {e_proc_unexpected}"
)
break
except requests.exceptions.Timeout: # type: ignore
logger.warning(
f"Timeout downloading tile {tile_coordinates_log_str} (Attempt {attempt_num + 1})."
)
except requests.exceptions.RequestException as e_req: # type: ignore
status = getattr(e_req.response, "status_code", "N/A")
logger.warning(
f"Request error for tile {tile_coordinates_log_str} (Status: {status}, Attempt {attempt_num + 1}): {e_req}"
)
if status == 404: break # No point retrying a 404 Not Found
except Exception as e_dl_unexpected:
logger.exception(
f"Unexpected error downloading tile {tile_coordinates_log_str} (Attempt {attempt_num + 1}): {e_dl_unexpected}"
)
break # Stop on other unexpected download errors
if attempt_num < self.download_max_retries:
logger.debug(f"Waiting {self.download_retry_delay_seconds}s before retrying for {tile_coordinates_log_str}.")
time.sleep(self.download_retry_delay_seconds)
if downloaded_pil_image is None:
logger.error(f"Failed to download tile {tile_coordinates_log_str} after all retries.")
return downloaded_pil_image
def _save_image_to_cache_file(self, tile_cache_path: Path, pil_image: ImageType) -> None:
"""Saves a PIL Image object to a file in the cache (thread-safe write)."""
with self._cache_access_lock: # Protect file system access
try:
# Ensure parent directory for the tile file exists
tile_cache_path.parent.mkdir(parents=True, exist_ok=True)
# Use 'png' format explicitly as it's lossless and common for map tiles
pil_image.save(tile_cache_path, format='PNG') # MODIFIED: Explicitly save as PNG. WHY: Standard format.
logger.debug(f"Saved tile to cache: {tile_cache_path}")
except IOError as e_io_save:
logger.error(f"IOError saving tile to cache {tile_cache_path}: {e_io_save}")
except Exception as e_save_unexpected:
logger.exception(
f"Unexpected error saving tile to cache {tile_cache_path}: {e_save_unexpected}"
)
def get_tile_image(
self,
zoom_level: int,
tile_x: int,
tile_y: int,
force_online_refresh: bool = False
) -> Optional[ImageType]:
"""
Retrieves a map tile image, using local cache first.
If not cached or refresh is forced, attempts to download (if enabled).
Returns a placeholder image if the tile cannot be retrieved.
Args:
zoom_level: The zoom level of the tile.
tile_x: The X coordinate of the tile.
tile_y: The Y coordinate of the tile.
force_online_refresh: If True, bypasses cache and attempts download.
Returns:
A PIL Image object of the tile, or a placeholder image on failure.
Returns None only if placeholder creation itself fails critically.
"""
tile_coords_log_str = f"({zoom_level},{tile_x},{tile_y})"
logger.debug(f"Requesting tile image for {tile_coords_log_str}")
# MODIFIED: Check if the zoom level is valid for the map service.
# WHY: Avoid requesting tiles for invalid zoom levels from the service or cache.
# HOW: Added a check using self.map_service.is_zoom_level_valid.
if not self.map_service.is_zoom_level_valid(zoom_level):
logger.error(f"Invalid zoom level {zoom_level} for map service '{self.service_identifier_name}'. Cannot get tile.")
# Return a placeholder for invalid zoom levels
return self._create_placeholder_tile_image(f"Invalid Zoom {zoom_level}")
tile_cache_file = self._get_tile_cache_file_path(zoom_level, tile_x, tile_y)
retrieved_image: Optional[ImageType] = None
if not force_online_refresh:
retrieved_image = self._load_tile_from_cache(tile_cache_file, tile_coords_log_str)
if retrieved_image is None: # Cache miss or force_refresh
retrieved_image = self._download_and_save_tile_to_cache(
zoom_level, tile_x, tile_y, tile_cache_file, tile_coords_log_str
)
if retrieved_image is None: # All attempts failed
logger.warning(f"Failed to retrieve tile {tile_coords_log_str}. Using placeholder.")
# MODIFIED: Pass tile coordinates to placeholder for debugging/visual info.
# WHY: Helps identify which specific tile failed when looking at the stitched map.
# HOW: Pass a string identifier to the placeholder creation function.
retrieved_image = self._create_placeholder_tile_image(tile_coords_log_str)
if retrieved_image is None: # Should be rare if Pillow is working
logger.critical("Failed to create even a placeholder tile. Returning None.")
return retrieved_image
def stitch_map_image(
self,
zoom_level: int,
x_tile_range: Tuple[int, int], # (min_x, max_x)
y_tile_range: Tuple[int, int] # (min_y, max_y)
) -> Optional[ImageType]:
"""
Retrieves and stitches multiple map tiles to form a larger composite map image.
Args:
zoom_level: The zoom level for all tiles.
x_tile_range: Inclusive start and end X tile coordinates (min_x, max_x).
y_tile_range: Inclusive start and end Y tile coordinates (min_y, max_y).
Returns:
A PIL Image object of the stitched map, or None if a critical error occurs.
Missing individual tiles are replaced by placeholders.
"""
logger.info(
f"Request to stitch map: Zoom {zoom_level}, X-Range {x_tile_range}, Y-Range {y_tile_range}"
)
min_tile_x, max_tile_x = x_tile_range
min_tile_y, max_tile_y = y_tile_range
# Basic validation of ranges
if not (min_tile_x <= max_tile_x and min_tile_y <= max_tile_y):
logger.error(f"Invalid tile ranges for stitching: X={x_tile_range}, Y={y_tile_range}")
return None
# MODIFIED: Use the tile_size attribute of the manager.
# WHY: Consistency. The manager's size should be used, not necessarily the service's size again here.
# HOW: Changed self.map_service.tile_size to self.tile_size.
single_tile_pixel_size = self.tile_size
if single_tile_pixel_size <= 0:
logger.error(f"Invalid tile size ({single_tile_pixel_size}) stored in manager. Cannot stitch.")
# MODIFIED: Return placeholder instead of None on invalid tile size.
# WHY: Provide a visual indication that stitching failed due to config, rather than a blank window.
# HOW: Create and return a large placeholder image.
try:
# Ensure Pillow/ImageDraw are available for placeholder creation
if PIL_AVAILABLE_MANAGER and ImageDraw is not None: # type: ignore
# Create a large placeholder image (e.g., 3x3 tiles size)
placeholder_img = Image.new("RGB", (256 * 3, 256 * 3), DEFAULT_PLACEHOLDER_COLOR_RGB) # Use a fixed reasonable size for error image
draw = ImageDraw.Draw(placeholder_img) # type: ignore
# Add error text
error_text = f"Stitch Failed\nInvalid Tile Size:\n{single_tile_pixel_size}"
# This simple text drawing assumes basic PIL text capabilities
try:
# Try drawing with a font loaded in image_processor
from geoelevation.image_processor import DEFAULT_FONT # Assume font loading logic in image_processor
font_to_use = DEFAULT_FONT # type: ignore # Use font loaded in image_processor
if font_to_use:
# Calculate text size and position using the font
# Note: textbbox requires Pillow >= 8.0
try:
text_bbox = draw.textbbox((0,0), error_text, font=font_to_use, spacing=2) # type: ignore
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_pos = ((placeholder_img.width - text_width) // 2, (placeholder_img.height - text_height) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use, spacing=2, anchor="lt") # type: ignore # Draw text using font
except AttributeError: # Fallback for textbbox if Pillow < 8.0
text_width, text_height = draw.textsize(error_text, font=font_to_use) # type: ignore
text_pos = ((placeholder_img.width - text_width) // 2, (placeholder_img.height - text_height) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use) # type: ignore # Draw text using font fallback
except Exception as e_font_draw:
logger.warning(f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore # Simple draw if font drawing fails
else:
draw.text((10, 10), error_text, fill="black") # type: ignore # Simple draw if no font was loaded
except Exception as e_draw:
logger.warning(f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore # Final fallback
return placeholder_img
else:
logger.error("Pillow or ImageDraw not available to create placeholder image.")
return None # Cannot create placeholder without PIL
except Exception as e_placeholder_fail:
logger.exception(f"Failed to create large placeholder for stitching error: {e_placeholder_fail}")
return None # Return None if placeholder creation fails
num_tiles_wide = (max_tile_x - min_tile_x) + 1
num_tiles_high = (max_tile_y - min_tile_y) + 1
total_image_width = num_tiles_wide * single_tile_pixel_size
total_image_height = num_tiles_high * single_tile_pixel_size
logger.debug(
f"Stitched image dimensions: {total_image_width}x{total_image_height} "
f"({num_tiles_wide}x{num_tiles_high} tiles of {single_tile_pixel_size}px)"
)
# Handle potential excessively large image size request
MAX_IMAGE_DIMENSION = 16384 # Arbitrary limit to prevent crashes with massive requests
if total_image_width > MAX_IMAGE_DIMENSION or total_image_height > MAX_IMAGE_DIMENSION:
logger.error(
f"Requested stitched image size ({total_image_width}x{total_image_height}) "
f"exceeds maximum allowed dimension ({MAX_IMAGE_DIMENSION}). Aborting stitch."
)
# Return placeholder for excessive size request
try:
if PIL_AVAILABLE_MANAGER and ImageDraw is not None: # type: ignore
placeholder_img = Image.new("RGB", (256 * 3, 256 * 3), (255, 100, 100)) # Reddish placeholder
draw = ImageDraw.Draw(placeholder_img) # type: ignore
error_text = f"Stitch Failed\nImage too large:\n{total_image_width}x{total_image_height}px"
try:
from geoelevation.image_processor import DEFAULT_FONT
font_to_use = DEFAULT_FONT # type: ignore
if font_to_use:
try:
text_bbox = draw.textbbox((0,0), error_text, font=font_to_use, spacing=2) # type: ignore
text_w = text_bbox[2] - text_bbox[0]
text_h = text_bbox[3] - text_bbox[1]
text_pos = ((placeholder_img.width - text_w) // 2, (placeholder_img.height - text_h) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use, spacing=2, anchor="lt") # type: ignore
except AttributeError:
text_w, text_h = draw.textsize(error_text, font=font_to_use) # type: ignore
text_pos = ((placeholder_img.width - text_w) // 2, (placeholder_img.height - text_h) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use) # type: ignore
except Exception as e_font_draw:
logger.warning(f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore
else:
draw.text((10, 10), error_text, fill="black") # type: ignore
except Exception as e_draw:
logger.warning(f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore # Final fallback
return placeholder_img
else:
logger.error("Pillow or ImageDraw not available to create placeholder image.")
return None # Cannot create placeholder without PIL
except Exception as e_placeholder_fail:
logger.exception(f"Failed to create large placeholder for size error: {e_placeholder_fail}")
return None # Return None if placeholder fails
try:
# Create a new blank RGB image to paste tiles onto
# MODIFIED: Ensure PIL_AVAILABLE_MANAGER is true before creating Image.new.
# WHY: Avoids NameError if PIL import failed.
# HOW: Added check.
if PIL_AVAILABLE_MANAGER:
stitched_map_image = Image.new("RGB", (total_image_width, total_image_height)) # type: ignore
else:
raise ImportError("Pillow not available to create new image.")
except Exception as e_create_blank:
logger.exception(f"Failed to create blank image for stitching: {e_create_blank}. Dimensions: {total_image_width}x{total_image_height}")
# Return placeholder if blank image creation fails (e.g., out of memory)
try:
if PIL_AVAILABLE_MANAGER and ImageDraw is not None: # type: ignore
placeholder_img = Image.new("RGB", (256 * 3, 256 * 3), (255, 100, 100)) # Reddish placeholder
draw = ImageDraw.Draw(placeholder_img) # type: ignore
error_text = f"Stitch Failed\nCannot create image:\n{e_create_blank}"
try:
from geoelevation.image_processor import DEFAULT_FONT
font_to_use = DEFAULT_FONT # type: ignore
if font_to_use:
try:
text_bbox = draw.textbbox((0,0), error_text, font=font_to_use, spacing=2) # type: ignore
text_w = text_bbox[2] - text_bbox[0]
text_h = text_bbox[3] - text_bbox[1]
text_pos = ((placeholder_img.width - text_w) // 2, (placeholder_img.height - text_h) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use, spacing=2, anchor="lt") # type: ignore
except AttributeError:
text_w, text_h = draw.textsize(error_text, font=font_to_use) # type: ignore
text_pos = ((placeholder_img.width - text_w) // 2, (placeholder_img.height - text_h) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use) # type: ignore
except Exception as e_font_draw:
logger.warning(f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore
else:
draw.text((10, 10), error_text, fill="black") # type: ignore
except Exception as e_draw:
logger.warning(f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore # Final fallback
return placeholder_img
else:
logger.error("Pillow or ImageDraw not available to create placeholder image.")
return None # Cannot create placeholder without PIL
except Exception as e_placeholder_fail:
logger.exception(f"Failed to create large placeholder for memory error: {e_placeholder_fail}")
return None # Return None if placeholder fails
# Iterate through the required tile coordinates, fetch, and paste
for row_index, current_tile_y in enumerate(range(min_tile_y, max_tile_y + 1)):
for col_index, current_tile_x in enumerate(range(min_tile_x, max_tile_x + 1)):
tile_image_pil = self.get_tile_image(zoom_level, current_tile_x, current_tile_y)
if tile_image_pil is None:
# This implies even placeholder creation failed, which is critical.
logger.critical(
f"Critical error: get_tile_image returned None for ({zoom_level},{current_tile_x},{current_tile_y}). Aborting stitch."
)
return None # Abort stitching on critical tile failure
# Calculate top-left pixel position to paste this tile
paste_position_x = col_index * single_tile_pixel_size
paste_position_y = row_index * single_tile_pixel_size
logger.debug(
f"Pasting tile ({zoom_level},{current_tile_x},{current_tile_y}) at "
f"pixel position ({paste_position_x},{paste_position_y})"
)
try:
# Ensure the tile image is the correct size before pasting
# MODIFIED: Check if tile_image_pil is valid before checking its size.
# WHY: Avoids AttributeError if tile_image_pil is None (shouldn't happen if get_tile_image handles None, but defensive).
# HOW: Added `if tile_image_pil and tile_image_pil.size...`.
if tile_image_pil and tile_image_pil.size != (self.tile_size, self.tile_size):
# This might happen if the downloaded tile or placeholder was the wrong size.
# Resize it to match the expected tile size for stitching consistency.
logger.warning(f"Tile image size {tile_image_pil.size} doesn't match expected {self.tile_size}. Resizing for stitch.")
# MODIFIED: Check PIL_AVAILABLE_MANAGER before resizing.
# WHY: Resize requires PIL.
# HOW: Added check.
if PIL_AVAILABLE_MANAGER:
tile_image_pil = tile_image_pil.resize((self.tile_size, self.tile_size), Image.Resampling.LANCZOS) # type: ignore
else:
logger.error("Pillow not available, cannot resize tile for stitch.")
# Decide fallback: skip pasting this tile or use placeholder?
# Leaving it blank might be okay, or replace with a placeholder of correct size.
# Let's just leave it blank (skip paste) if resize fails due to missing lib.
continue # Skip pasting this tile
# MODIFIED: Check if tile_image_pil is still valid before pasting.
# WHY: It might have become None if resize failed due to missing PIL.
# HOW: Added `if tile_image_pil:`.
if tile_image_pil:
stitched_map_image.paste(tile_image_pil, (paste_position_x, paste_position_y)) # type: ignore
except Exception as e_paste:
logger.exception(
f"Error pasting tile ({zoom_level},{current_tile_x},{current_tile_y}) "
f"at ({paste_position_x},{paste_position_y}): {e_paste}"
)
# Continue, leaving that part of the map blank or with placeholder color
logger.info(f"Map stitching complete for zoom {zoom_level}, X={x_tile_range}, Y={y_tile_range}.")
return stitched_map_image
def _create_placeholder_tile_image(self, identifier: str = "N/A") -> Optional[ImageType]:
"""
Creates and returns a placeholder tile image (e.g., a grey square).
Includes optional text identifier on the placeholder.
"""
# MODIFIED: Added check for ImageDraw availability.
# WHY: Drawing on placeholders requires ImageDraw.
# HOW: Added check.
if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None):
logger.warning("Cannot create placeholder tile: Pillow or ImageDraw library not available.")
return None
try:
tile_pixel_size = self.tile_size # Use the manager's stored tile size
# Ensure placeholder_color is a valid RGB tuple
placeholder_color = DEFAULT_PLACEHOLDER_COLOR_RGB
# No need to re-validate color if it's a fixed constant, but defensive check
# if not (isinstance(placeholder_color, tuple) and len(placeholder_color) == 3 and
# all(isinstance(c, int) and 0 <= c <= 255 for c in placeholder_color)):
# logger.warning(f"Invalid placeholder color '{placeholder_color}'. Using default grey.")
# placeholder_color = (220, 220, 220)
placeholder_img = Image.new("RGB", (tile_pixel_size, tile_pixel_size), color=placeholder_color) # type: ignore
draw = ImageDraw.Draw(placeholder_img) # type: ignore
# Add text overlay indicating failure and identifier
overlay_text = f"Tile Fail\n{identifier}"
try:
# Attempt to use a font loaded in image_processor for consistency
from geoelevation.image_processor import DEFAULT_FONT # Assume font loading logic exists
font_to_use = DEFAULT_FONT # type: ignore # Use the shared loaded font
# Calculate text position for centering or top-left
# Using textbbox for accurate size calculation (requires Pillow >= 8.0)
try:
# textbbox returns (left, top, right, bottom) relative to the anchor (0,0)
text_bbox = draw.textbbox((0,0), overlay_text, font=font_to_use, spacing=2) # type: ignore
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
# Center the text (approx)
text_x = (tile_pixel_size - text_width) // 2
text_y = (tile_pixel_size - text_height) // 2
# Draw text with the loaded font, anchored at the top-left of the text bbox
draw.text((text_x, text_y), overlay_text, fill="black", font=font_to_use, spacing=2, anchor="lt") # type: ignore
except AttributeError: # Fallback for textbbox if Pillow < 8.0
logger.warning("Pillow textbbox not available (Pillow < 8.0). Using textsize fallback.")
# textsize might not handle multiline spacing well
text_width, text_height = draw.textsize(overlay_text, font=font_to_use) # type: ignore
# Add approximated height for multiline if needed
if "\n" in overlay_text:
line_count = overlay_text.count("\n") + 1
text_height += line_count * 2 # Rough approximation
# Center text based on textsize (less accurate for multiline)
text_x = (tile_pixel_size - text_width) // 2
text_y = (tile_pixel_size - text_height) // 2
draw.text((text_x, text_y), overlay_text, fill="black", font=font_to_use) # type: ignore # Draw text using font fallback
except Exception as e_font_draw:
logger.warning(f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw.")
# Fallback to simple draw if font drawing fails
draw.text((10, 10), overlay_text, fill="black") # type: ignore # Simple draw near top-left
except Exception as e_draw:
logger.warning(f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw.")
draw.text((10, 10), overlay_text, fill="black") # type: ignore # Final fallback
return placeholder_img
except Exception as e_placeholder:
logger.exception(f"Error creating placeholder tile image: {e_placeholder}")
return None
def _get_bounds_for_tile_range(
self,
zoom: int,
tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]] # ((min_x, max_x), (min_y, max_y))
) -> Optional[Tuple[float, float, float, float]]: # (west, south, east, north)
"""
Calculates the precise geographic bounds covered by a given range of tiles.
This method might be better placed in map_utils if mercantile is available there,
or kept here if MapTileManager is the primary user of mercantile for this.
Requires 'mercantile' library.
"""
# Check if mercantile is available (it should be if MapTileManager initialized without error)
try:
import mercantile as local_mercantile # Local import for this method
# MODIFIED: Check if mercantile is actually available after import attempt.
# WHY: Defend against scenarios where the import succeeds but mercantile is None.
# HOW: Add explicit check.
if local_mercantile is None:
raise ImportError("mercantile is None after import.")
except ImportError:
logger.error("mercantile library not found, cannot calculate bounds for tile range.")
return None
try:
min_x, max_x = tile_ranges[0]
min_y, max_y = tile_ranges[1]
# Get bounds of the top-left tile and bottom-right tile
# mercantile.bounds(x, y, z) returns (west, south, east, north)
top_left_tile_bounds = local_mercantile.bounds(min_x, min_y, zoom)
bottom_right_tile_bounds = local_mercantile.bounds(max_x, max_y, zoom)
# The overall bounding box is:
# West longitude from the top-left tile
# South latitude from the bottom-right tile
# East longitude from the bottom-right tile
# North latitude from the top-left tile
overall_west_lon = top_left_tile_bounds.west
overall_south_lat = bottom_right_tile_bounds.south
overall_east_lon = bottom_right_tile_bounds.east
overall_north_lat = top_left_tile_bounds.north
return (overall_west_lon, overall_south_lat, overall_east_lon, overall_north_lat)
except Exception as e_bounds_calc:
logger.exception(
f"Error calculating geographic bounds for tile range {tile_ranges} at zoom {zoom}: {e_bounds_calc}"
)
return None
def clear_entire_service_cache(self) -> None:
"""Deletes all cached tiles for the current map service."""
logger.info(f"Attempting to clear entire cache for service '{self.service_identifier_name}' at {self.service_specific_cache_dir}")
if not self.service_specific_cache_dir.exists():
logger.warning(f"Cache directory '{self.service_specific_cache_dir}' does not exist. Nothing to clear.")
return
with self._cache_access_lock: # Ensure exclusive access during deletion
try:
if self.service_specific_cache_dir.is_dir():
shutil.rmtree(self.service_specific_cache_dir)
logger.info(f"Successfully cleared cache at {self.service_specific_cache_dir}.")
# Recreate the base directory for this service after clearing
self._ensure_service_cache_directory_exists()
else:
logger.warning(f"Cache path '{self.service_specific_cache_dir}' is not a directory.")
except OSError as e_os_clear:
logger.error(f"OS Error clearing cache at '{self.service_specific_cache_dir}': {e_os_clear}")
except Exception as e_clear_unexpected:
logger.exception(
f"Unexpected error clearing cache '{self.service_specific_cache_dir}': {e_clear_unexpected}"
)

View File

@ -1,250 +0,0 @@
# geoelevation/map_viewer/map_services.py
"""
Defines an abstract base class for map tile services and provides concrete
implementations for specific map providers (e.g., OpenStreetMap).
This allows the application to interact with different map sources through a
common, well-defined interface, facilitating extensibility to other services.
"""
# Standard library imports
import abc # Abstract Base Classes
import logging
from urllib.parse import urlparse # For basic URL validation
from typing import Optional, Tuple, Dict, Any
# Module-level logger
logger = logging.getLogger(__name__)
class BaseMapService(abc.ABC):
"""
Abstract Base Class for map tile service providers.
Subclasses must implement the 'name', 'attribution', and 'get_tile_url'
properties and methods.
"""
DEFAULT_TILE_PIXEL_SIZE: int = 256
DEFAULT_MAX_ZOOM_LEVEL: int = 19 # Common for many services like OSM
def __init__(
self,
service_api_key: Optional[str] = None,
tile_pixel_dim: int = DEFAULT_TILE_PIXEL_SIZE,
max_supported_zoom: int = DEFAULT_MAX_ZOOM_LEVEL
) -> None:
"""
Initializes the BaseMapService.
Args:
service_api_key: API key required by the service, if any.
tile_pixel_dim: The pixel dimension (width/height) of map tiles.
max_supported_zoom: The maximum zoom level supported by this service.
"""
# Use class name of the concrete subclass for logging
self._service_log_prefix = f"[{self.__class__.__name__}]"
logger.debug(f"{self._service_log_prefix} Initializing base map service.")
self.api_key: Optional[str] = service_api_key
self.tile_size: int = tile_pixel_dim
self.max_zoom: int = max_supported_zoom
# Validate provided tile_size and max_zoom
if not (isinstance(self.tile_size, int) and self.tile_size > 0):
logger.warning(
f"{self._service_log_prefix} Invalid tile_size '{self.tile_size}'. "
f"Using default: {self.DEFAULT_TILE_PIXEL_SIZE}px."
)
self.tile_size = self.DEFAULT_TILE_PIXEL_SIZE
# Practical limits for Web Mercator zoom levels
if not (isinstance(self.max_zoom, int) and 0 <= self.max_zoom <= 25):
logger.warning(
f"{self._service_log_prefix} Invalid max_zoom '{self.max_zoom}'. "
f"Using default: {self.DEFAULT_MAX_ZOOM_LEVEL}."
)
self.max_zoom = self.DEFAULT_MAX_ZOOM_LEVEL
@property
@abc.abstractmethod
def name(self) -> str:
"""
Returns the unique, short name of the map service (e.g., 'osm').
This is used for identification and potentially for cache directory naming.
"""
pass
@property
@abc.abstractmethod
def attribution(self) -> str:
"""
Returns the required attribution text for the map service.
This text should be displayed whenever map tiles from this service are shown.
"""
pass
@abc.abstractmethod
def get_tile_url(self, z: int, x: int, y: int) -> Optional[str]:
"""
Generates the full URL for a specific map tile based on its ZXY coordinates.
Args:
z: The zoom level of the tile.
x: The X coordinate of the tile.
y: The Y coordinate of the tile.
Returns:
The fully formed URL string for the tile, or None if the zoom level
is invalid for this service or if URL construction fails.
"""
pass
def is_zoom_level_valid(self, zoom_level: int) -> bool:
"""
Checks if the requested zoom level is within the valid range for this service.
Args:
zoom_level: The zoom level to validate.
Returns:
True if the zoom level is valid, False otherwise.
"""
is_valid = 0 <= zoom_level <= self.max_zoom
if not is_valid:
logger.warning(
f"{self._service_log_prefix} Requested zoom level {zoom_level} is outside "
f"the valid range [0, {self.max_zoom}] for this service."
)
return is_valid
def _is_generated_url_structurally_valid(self, url_string: str) -> bool:
"""Performs a basic structural validation of a generated URL string."""
if not url_string: # Check for empty string
logger.error(f"{self._service_log_prefix} Generated URL is empty.")
return False
try:
parsed_url = urlparse(url_string)
# A valid URL typically has a scheme (http/https) and a netloc (domain name).
has_scheme_and_netloc = bool(parsed_url.scheme and parsed_url.netloc)
if not has_scheme_and_netloc:
logger.error(f"{self._service_log_prefix} Generated URL '{url_string}' appears malformed (missing scheme or netloc).")
return has_scheme_and_netloc
except Exception as e_url_parse: # Catch potential errors from urlparse itself
logger.error(
f"{self._service_log_prefix} Error parsing generated URL '{url_string}': {e_url_parse}"
)
return False
def __repr__(self) -> str:
"""Provides a concise and informative string representation of the service object."""
return (
f"<{self.__class__.__name__}(Name: '{self.name}', MaxZoom: {self.max_zoom}, TileSize: {self.tile_size})>"
)
class OpenStreetMapService(BaseMapService):
"""
Concrete implementation for fetching map tiles from OpenStreetMap (OSM).
This service does not require an API key.
"""
SERVICE_IDENTIFIER_NAME: str = "osm"
SERVICE_ATTRIBUTION_TEXT: str = \
"© OpenStreetMap contributors (openstreetmap.org/copyright)"
# Standard URL template for OSM tiles. Subdomains (a,b,c) can be used for load balancing.
TILE_URL_TEMPLATE: str = "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png"
# OSM standard tile servers typically support up to zoom level 19.
OSM_MAX_ZOOM_LEVEL: int = 19
# Optional: cycle through subdomains for better load distribution
SUBDOMAINS: Tuple[str, ...] = ('a', 'b', 'c')
_subdomain_index: int = 0 # Class variable for simple round-robin
def __init__(self) -> None:
"""Initializes the OpenStreetMap tile service."""
super().__init__(
service_api_key=None, # OSM does not require an API key
max_supported_zoom=self.OSM_MAX_ZOOM_LEVEL
)
logger.info(f"{self._service_log_prefix} OpenStreetMap service instance ready.")
@property
def name(self) -> str:
"""Returns the unique name for the OpenStreetMap service."""
return self.SERVICE_IDENTIFIER_NAME
@property
def attribution(self) -> str:
"""Returns the required attribution text for OpenStreetMap."""
return self.SERVICE_ATTRIBUTION_TEXT
def get_tile_url(self, z: int, x: int, y: int) -> Optional[str]:
"""
Generates the tile URL for an OpenStreetMap tile.
Args:
z: The zoom level.
x: The tile X coordinate.
y: The tile Y coordinate.
Returns:
The tile URL string, or None if the zoom level is invalid.
"""
if not self.is_zoom_level_valid(z):
# Warning logged by is_zoom_level_valid
return None
# Simple round-robin for subdomains
subdomain = self.SUBDOMAINS[OpenStreetMapService._subdomain_index % len(self.SUBDOMAINS)]
OpenStreetMapService._subdomain_index += 1
try:
# Format the URL using the class template and selected subdomain
tile_url = self.TILE_URL_TEMPLATE.format(s=subdomain, z=z, x=x, y=y)
if not self._is_generated_url_structurally_valid(tile_url):
# Error logged by _is_generated_url_structurally_valid
return None
logger.debug(f"{self._service_log_prefix} Generated URL for ({z},{x},{y}): {tile_url}")
return tile_url
except Exception as e_url_format: # Catch potential errors during .format()
logger.error(
f"{self._service_log_prefix} Error formatting tile URL for ({z},{x},{y}): {e_url_format}"
)
return None
# --- Factory Function to Get Map Service Instances ---
def get_map_service_instance(
service_name_key: str,
api_key_value: Optional[str] = None
) -> Optional[BaseMapService]:
"""
Factory function to create and return an instance of a specific map service.
Args:
service_name_key: The unique string identifier for the desired service (e.g., 'osm').
api_key_value: The API key, if required by the selected service.
Returns:
An instance of a BaseMapService subclass, or None if the service_name_key
is unknown or if a required API key is missing.
"""
log_prefix_factory = "[MapServiceFactory]"
normalized_service_name = service_name_key.lower().strip()
logger.debug(f"{log_prefix_factory} Requesting map service instance for '{normalized_service_name}'.")
if normalized_service_name == OpenStreetMapService.SERVICE_IDENTIFIER_NAME:
return OpenStreetMapService()
# Example for a future service requiring an API key:
# elif normalized_service_name == "some_other_service_key":
# if api_key_value:
# return SomeOtherMapService(api_key=api_key_value)
# else:
# logger.error(f"{log_prefix_factory} API key is required for '{normalized_service_name}' but was not provided.")
# return None
else:
logger.error(f"{log_prefix_factory} Unknown map service name specified: '{service_name_key}'.")
return None

File diff suppressed because it is too large Load Diff

View File

@ -226,55 +226,62 @@ def run_map_viewer_process_target(
child_map_viewer_instance: Optional[Any] = None
critical_libs_available = False
try:
# Local imports needed by this target function
from geoelevation.map_viewer.geo_map_viewer import GeoElevationMapViewer as ChildGeoMapViewer
# Local imports: use the external map_manager package for map rendering and engine
from map_manager.engine import MapEngine as ChildMapEngine
from map_manager.visualizer import MapVisualizer as ChildMapVisualizer
from geoelevation.elevation_manager import ElevationManager as ChildElevationManager
import cv2 as child_cv2 # Ensure cv2 is available in the child process
# Also check other libraries that geo_map_viewer depends on and checks internally
# If GeoElevationMapViewer import succeeds, it means PIL and Mercantile were likely available,
# as GeoElevationMapViewer's __init__ raises ImportError if they're missing.
# We still need cv2 and numpy for the loop itself.
import numpy as child_np # Ensure numpy is available
import cv2 as child_cv2 # Optional, used by MapVisualizer if available
import numpy as child_np
critical_libs_available = True
except ImportError as e_child_imp_map:
child_logger.critical(f"CRITICAL: Map viewer components or essential libraries not found in child process: {e_child_imp_map}", exc_info=True)
# Send an error message back to the GUI queue if critical imports fail in child process.
error_payload = {"type": "map_info_update", "latitude": None, "longitude": None,
"latitude_dms_str": "Error", "longitude_dms_str": "Error",
"elevation_str": f"Fatal Error: {type(e_child_imp_map).__name__} (Import)",
"map_area_size_str": "Map System N/A"}
try: map_interaction_q.put(error_payload)
except Exception as e_put_err: child_logger.error(f"Failed to put error payload to queue before exit: {e_put_err}")
return # Exit if critical libraries are missing
return
# --- Initialize Map Viewer and Run Main Loop ---
try:
child_logger.info(f"Initializing GeoElevationMapViewer instance for mode '{operation_mode}', display scale: {display_scale_factor:.2f}...")
# Initialize ElevationManager and GeoElevationMapViewer within the child process
# Each process needs its own instance, but they share the filesystem cache.
# Initialize ElevationManager and the map engine/visualizer from external package
local_em = ChildElevationManager(tile_directory=dem_data_cache_dir)
# Initialize map engine and visualizer
engine = ChildMapEngine(service_name='osm', cache_dir=dem_data_cache_dir, enable_online=True)
visual = ChildMapVisualizer(engine)
# MODIFIED: Pass initial view parameters to the GeoElevationMapViewer constructor.
# WHY: The class now loads the initial view internally based on these parameters.
# HOW: Added initial_operation_mode, initial_point_coords, initial_area_bbox arguments.
# The constructor will handle which parameters are relevant based on initial_operation_mode.
child_map_viewer_instance = ChildGeoMapViewer(
elevation_manager_instance=local_em,
gui_output_communication_queue=map_interaction_q,
initial_display_scale=display_scale_factor,
initial_operation_mode=operation_mode,
initial_point_coords=(center_latitude, center_longitude) if operation_mode == "point" else None,
initial_area_bbox=area_bounding_box if operation_mode == "area" else None
)
child_logger.info("Child GeoElevationMapViewer instance initialized.")
child_logger.info("Child MapEngine and MapVisualizer instances initialized.")
# MODIFIED: REMOVE the old block that called display_map_for_point/area.
# WHY: The initial map view loading is now handled by the GeoElevationMapViewer constructor.
# HOW: Deleted the following if/elif/else block:
# Show map depending on requested operation mode
if operation_mode == 'point' and center_latitude is not None and center_longitude is not None:
child_logger.info(f"Showing point {center_latitude},{center_longitude} via MapVisualizer.show_point")
try:
visual.show_point(center_latitude, center_longitude)
except Exception as e_vis:
child_logger.error(f"Error while showing point via MapVisualizer: {e_vis}")
elif operation_mode == 'area' and area_bounding_box:
child_logger.info(f"Creating area image for bbox {area_bounding_box} via MapEngine.get_image_for_area")
try:
img = engine.get_image_for_area(area_bounding_box, zoom=None, max_size=800)
if img:
visual.show_pil_image(img)
else:
child_logger.error("MapEngine returned no image for area request")
except Exception as e_area:
child_logger.error(f"Error while creating/showing area image: {e_area}")
else:
child_logger.error(f"Invalid operation mode ('{operation_mode}') or missing parameters passed to map process target.")
error_payload = {"type": "map_info_update", "latitude": None, "longitude": None,
"latitude_dms_str": "Error", "longitude_dms_str": "Error",
"elevation_str": f"Fatal Error: Invalid Map Args",
"map_area_size_str": "Invalid Args"}
try: map_interaction_q.put(error_payload)
except Exception as e_put_err: child_logger.error(f"Failed to put error payload to queue before exit: {e_put_err}")
return
# if operation_mode == "point" and center_latitude is not None and center_longitude is not None:
# child_logger.info(f"Calling display_map_for_point for ({center_latitude:.5f},{center_longitude:.5f}).")
# child_map_viewer_instance.display_map_for_point(center_latitude, center_longitude)
@ -294,45 +301,43 @@ def run_map_viewer_process_target(
child_logger.info("Initial map display call complete. Entering OpenCV event loop.")
# Main loop to keep the OpenCV window alive and process events.
# cv2.waitKey() is essential for processing window events and mouse callbacks.
# A non-zero argument (e.g., 100ms) makes it wait for a key press for that duration,
# but more importantly, it allows OpenCV to process the window's message queue.
# Without regular calls to waitKey, window updates and callbacks won't happen.
# Use OpenCV window visibility checks rather than relying on an application-specific
# `map_display_window_controller` attribute which may not exist with the external visualizer.
is_map_active = True
window_name = 'Map' if 'child_cv2' in locals() and child_cv2 is not None else None
while is_map_active:
# Pass a short delay to yield CPU time
key = child_cv2.waitKey(100) # Process events every 100ms
# Pass a short delay to yield CPU time and let OpenCV process window events
key = child_cv2.waitKey(100) if window_name else -1
# Check if the map window is still alive.
# The MapDisplayWindow instance holds the OpenCV window name and can check its property.
if child_map_viewer_instance.map_display_window_controller:
if child_map_viewer_instance.map_display_window_controller.is_window_alive():
# Check for specific key presses (like 'q' or Escape) to allow closing the window via keyboard
if key != -1: # A key was pressed
child_logger.debug(f"Map window received key press: {key}")
# Convert key code to character if it's printable for logging
try:
# Check for 'q' or 'Q' keys (example)
if chr(key & 0xFF) in ('q', 'Q'):
child_logger.info("Map window closing due to 'q'/'Q' key press.")
is_map_active = False # Signal loop to exit
# Check for Escape key (common window close shortcut)
if key == 27:
child_logger.info("Map window closing due to Escape key press.")
is_map_active = False # Signal loop to exit
except ValueError: # Ignore non-printable keys
pass # Do nothing for non-printable keys
try:
# If there's an OpenCV window, check its visibility property. If the window is closed
# by the user, getWindowProperty returns a value < 1. Use that as the signal to exit.
if window_name:
try:
vis = child_cv2.getWindowProperty(window_name, child_cv2.WND_PROP_VISIBLE)
if vis < 1:
child_logger.info("Map window detected as closed (OpenCV).")
break
except Exception:
# If getWindowProperty is unsupported for some reason, fall back to key checks.
pass
# Check for specific key presses (like 'q' or Escape) to allow closing via keyboard
if key != -1:
child_logger.debug(f"Map window received key press: {key}")
try:
if chr(key & 0xFF) in ('q', 'Q'):
child_logger.info("Map window closing due to 'q'/'Q' key press.")
break
if key == 27:
child_logger.info("Map window closing due to Escape key press.")
break
except Exception:
pass
else:
# is_window_alive() returned False, meaning the window was closed (e.g., by user clicking the 'X').
child_logger.info("Map window detected as closed.")
is_map_active = False # Signal loop to exit
else:
# The controller instance is None, indicating an issue or shutdown signal.
child_logger.info("MapDisplayWindow controller is None. Exiting map process loop.")
is_map_active = False # Signal loop to exit
except Exception as e_loop_check:
child_logger.debug(f"Error during OpenCV loop checks: {e_loop_check}")
break
child_logger.info("OpenCV event loop finished. Map viewer process target returning.")