SXXXXXXX_FlightMonitor/flightmonitor/map/map_manager.py
2025-05-27 07:33:08 +02:00

1252 lines
56 KiB
Python

# geoelevation/map_viewer/map_manager.py
"""
Manages the retrieval, caching, and stitching of map tiles from a selected map service.
This module interacts with a map service provider (implementing BaseMapService),
handles local disk caching of tiles to support offline use and reduce network
requests, and assembles individual tiles into a complete map image.
"""
# Standard library imports
import logging
import os
import time
import threading
from pathlib import Path # For robust path manipulation
from typing import Tuple, Optional, List, Dict # Ensure these are available
import io # To handle byte stream from requests as an image
import shutil # For cache clearing operations
# Third-party imports
try:
import requests # For downloading map tiles
REQUESTS_AVAILABLE = True
except ImportError:
requests = None # type: ignore
REQUESTS_AVAILABLE = False
logging.error(
"MapTileManager: 'requests' library not found. Online tile fetching will fail."
)
try:
from PIL import Image, ImageDraw
ImageType = Image.Image # type: ignore
PIL_AVAILABLE_MANAGER = True
except ImportError:
Image = None # type: ignore
ImageDraw = None # type: ignore
ImageType = None # type: ignore
PIL_AVAILABLE_MANAGER = False
logging.error(
"MapTileManager: 'Pillow' library not found. Image operations will fail."
)
try:
# MODIFIED: Import mercantile here.
# WHY: map_manager uses mercantile directly in _get_bounds_for_tile_range.
# HOW: Added import.
import mercantile
except ImportError:
mercantile = None # type: ignore
# No module-specific flag needed here, just check `if mercantile is None:` where used.
logging.warning(
"MapTileManager: 'mercantile' library not found. Tile bounds calculation will fail."
)
# Local application/package imports
from .map_services import BaseMapService
try:
from ..utils.logger import get_logger
logger = get_logger(__name__)
except ImportError:
logger = logging.getLogger(__name__)
logger.warning(
"MapTileManager: Could not import application logger. Using standard logging."
)
# Default values for the manager if not provided or configured
DEFAULT_MAP_TILE_CACHE_ROOT_DIR = "map_tile_cache"
DEFAULT_ENABLE_ONLINE_FETCHING = True
DEFAULT_NETWORK_TIMEOUT_SECONDS = 10
DEFAULT_DOWNLOAD_RETRY_DELAY_SECONDS = 2
DEFAULT_MAX_DOWNLOAD_RETRIES = 2
DEFAULT_PLACEHOLDER_COLOR_RGB = (220, 220, 220)
class MapTileManager:
"""
Manages fetching, caching, and assembling map tiles for a given map service.
Requires 'requests' and 'Pillow' libraries to be installed. 'mercantile'
is required for calculating geographic bounds of tile ranges.
"""
def __init__(
self,
map_service: BaseMapService,
cache_root_directory: Optional[str] = None,
enable_online_tile_fetching: Optional[bool] = None,
tile_pixel_size: Optional[int] = None,
) -> None:
"""
Initializes the MapTileManager.
Args:
map_service_instance: An instance of a map service provider
(e.g., OpenStreetMapService).
cache_root_directory: The root directory for caching tiles from all services.
If None, uses DEFAULT_MAP_TILE_CACHE_ROOT_DIR.
A subdirectory for the specific service will be created.
enable_online_tile_fetching: Whether to download tiles if not found in cache.
If None, uses DEFAULT_ENABLE_ONLINE_FETCHING.
tile_pixel_size: The pixel dimension (width/height) of map tiles for this manager.
If None, the size is taken from the map_service instance.
Raises:
TypeError: If map_service_instance is not a valid BaseMapService instance.
ImportError: If 'requests' or 'Pillow' libraries are not installed.
ValueError: If a tile_pixel_size is provided but invalid.
"""
logger.info("Initializing MapTileManager...")
if not REQUESTS_AVAILABLE:
raise ImportError(
"'requests' library is required by MapTileManager but not found."
)
if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None):
raise ImportError(
"'Pillow' library (or its drawing module ImageDraw) is required by MapTileManager but not fully available."
)
# MODIFIED: Add a check for mercantile availability here, as it's used later.
# WHY: Inform the user early if a required dependency is missing.
# HOW: Added if mercantile is None check.
if mercantile is None:
logger.error(
"'mercantile' library is required for tile bounds calculation but is not found or is None."
)
# Decide if this is a fatal error for the manager init.
# The manager can still fetch/stitch if mercantile is missing, but cannot provide bounds.
# Let's log an error but not raise, allowing partial functionality.
if not isinstance(map_service, BaseMapService):
logger.critical(
"Invalid map_service_instance provided. Must be an instance of BaseMapService."
)
raise TypeError(
"map_service_instance must be an instance of BaseMapService."
)
self.map_service: BaseMapService = map_service
self.service_identifier_name: str = self.map_service.name
if tile_pixel_size is not None:
if not isinstance(tile_pixel_size, int) or tile_pixel_size <= 0:
logger.error(
f"Invalid provided tile_pixel_size: {tile_pixel_size}. Must be a positive integer."
)
raise ValueError(
f"Invalid provided tile_pixel_size: {tile_pixel_size}. Must be a positive integer."
)
self.tile_size: int = tile_pixel_size
logger.info(f"Map tile size explicitly set to {self.tile_size}px.")
else:
self.tile_size: int = self.map_service.tile_size
logger.info(
f"Map tile size inherited from service '{self.map_service.name}': {self.tile_size}px."
)
effective_cache_root_dir = (
cache_root_directory
if cache_root_directory is not None
else DEFAULT_MAP_TILE_CACHE_ROOT_DIR
)
self.service_specific_cache_dir: Path = (
Path(effective_cache_root_dir) / self.service_identifier_name
)
logger.info(
f"Service-specific cache directory set to: {self.service_specific_cache_dir}"
)
self.is_online_fetching_enabled: bool = (
enable_online_tile_fetching
if enable_online_tile_fetching is not None
else DEFAULT_ENABLE_ONLINE_FETCHING
)
logger.info(f"Online tile fetching enabled: {self.is_online_fetching_enabled}")
self.http_user_agent: str = "GeoElevationMapViewer/0.1 (Python Requests)"
self.http_request_headers: Dict[str, str] = {"User-Agent": self.http_user_agent}
self.http_request_timeout_seconds: int = DEFAULT_NETWORK_TIMEOUT_SECONDS
self.download_max_retries: int = DEFAULT_MAX_DOWNLOAD_RETRIES
self.download_retry_delay_seconds: int = DEFAULT_DOWNLOAD_RETRY_DELAY_SECONDS
self._ensure_service_cache_directory_exists()
self._cache_access_lock = threading.Lock()
logger.info(
f"MapTileManager initialized for service '{self.service_identifier_name}'. Online: {self.is_online_fetching_enabled}"
)
def _ensure_service_cache_directory_exists(self) -> None:
"""Creates the service-specific cache directory if it doesn't exist."""
try:
self.service_specific_cache_dir.mkdir(parents=True, exist_ok=True)
logger.debug(
f"Cache directory verified/created: {self.service_specific_cache_dir}"
)
except OSError as e_mkdir:
logger.error(
f"Failed to create cache directory '{self.service_specific_cache_dir}': {e_mkdir}"
)
except Exception as e_unexpected_mkdir:
logger.exception(
f"Unexpected error ensuring cache directory exists: {e_unexpected_mkdir}"
)
def _get_tile_cache_file_path(self, z: int, x: int, y: int) -> Path:
"""
Constructs the full local file path for a specific map tile.
The structure is: <service_cache_dir>/<zoom>/<x_tile>/<y_tile>.png
"""
return self.service_specific_cache_dir / str(z) / str(x) / f"{y}.png"
def _load_tile_from_cache(
self, tile_cache_path: Path, tile_coordinates_log_str: str
) -> Optional[ImageType]:
"""Attempts to load a tile image from a cache file (thread-safe read)."""
logger.debug(
f"Checking cache for tile {tile_coordinates_log_str} at {tile_cache_path}"
)
if not PIL_AVAILABLE_MANAGER:
logger.error("Pillow not available, cannot load tile from cache.")
return None
try:
with self._cache_access_lock:
if tile_cache_path.is_file():
logger.info(
f"Cache hit for tile {tile_coordinates_log_str}. Loading from disk."
)
if Image:
pil_image = Image.open(tile_cache_path)
pil_image.load()
if pil_image.mode != "RGB":
logger.debug(
f"Converting cached image {tile_coordinates_log_str} from mode {pil_image.mode} to RGB."
)
pil_image = pil_image.convert("RGB")
return pil_image
else:
logger.error(
"Pillow's Image class is None, cannot open image from cache."
)
return None
else:
logger.debug(f"Cache miss for tile {tile_coordinates_log_str}.")
return None
except IOError as e_io_cache:
logger.error(
f"IOError reading cached tile {tile_cache_path}: {e_io_cache}. Treating as cache miss."
)
return None
except Exception as e_cache_unexpected:
logger.exception(
f"Unexpected error accessing cache file {tile_cache_path}: {e_cache_unexpected}"
)
return None
def _download_and_save_tile_to_cache(
self,
zoom_level: int,
tile_x: int,
tile_y: int,
tile_cache_path: Path,
tile_coordinates_log_str: str,
) -> Optional[ImageType]:
"""Attempts to download a tile, process it, and save it to the cache."""
if not PIL_AVAILABLE_MANAGER:
logger.error(
"Pillow not available, cannot download, process, or save tile."
)
return None
if not self.is_online_fetching_enabled:
logger.debug(
f"Online fetching disabled. Cannot download tile {tile_coordinates_log_str}."
)
return None
tile_download_url = self.map_service.get_tile_url(zoom_level, tile_x, tile_y)
if not tile_download_url:
logger.error(
f"Failed to get URL for tile {tile_coordinates_log_str} from service."
)
return None
logger.info(
f"Downloading tile {tile_coordinates_log_str} from: {tile_download_url}"
)
downloaded_pil_image: Optional[ImageType] = None
for attempt_num in range(self.download_max_retries + 1):
try:
if requests:
response = requests.get(
tile_download_url,
headers=self.http_request_headers,
timeout=self.http_request_timeout_seconds,
stream=True,
)
response.raise_for_status()
else:
logger.error("Requests library is None, cannot perform HTTP GET.")
break
image_binary_data = response.content
if not image_binary_data:
logger.warning(
f"Downloaded empty content for tile {tile_coordinates_log_str}."
)
break
try:
if Image:
pil_image = Image.open(io.BytesIO(image_binary_data))
pil_image.load()
if pil_image.mode != "RGB":
pil_image = pil_image.convert("RGB")
logger.debug(
f"Tile {tile_coordinates_log_str} downloaded (Attempt {attempt_num + 1})."
)
if tile_cache_path:
self._save_image_to_cache_file(tile_cache_path, pil_image)
else:
logger.error(
f"Invalid tile_cache_path for tile {tile_coordinates_log_str}. Cannot save."
)
downloaded_pil_image = pil_image
break
else:
logger.error(
"Pillow's Image class is None, cannot process downloaded image data."
)
break
except (IOError, Image.UnidentifiedImageError) as e_img_proc:
logger.error(
f"Failed to process image data for {tile_coordinates_log_str}: {e_img_proc}"
)
break
except Exception as e_proc_unexpected:
logger.exception(
f"Unexpected error processing downloaded image {tile_coordinates_log_str}: {e_proc_unexpected}"
)
break
except requests.exceptions.Timeout:
logger.warning(
f"Timeout downloading tile {tile_coordinates_log_str} (Attempt {attempt_num + 1})."
)
except requests.exceptions.RequestException as e_req:
status = getattr(e_req.response, "status_code", "N/A")
logger.warning(
f"Request error for tile {tile_coordinates_log_str} (Status: {status}, Attempt {attempt_num + 1}): {e_req}"
)
if status == 404:
break
except Exception as e_dl_unexpected:
logger.exception(
f"Unexpected error downloading tile {tile_coordinates_log_str} (Attempt {attempt_num + 1}): {e_dl_unexpected}"
)
break
if attempt_num < self.download_max_retries:
logger.debug(
f"Waiting {self.download_retry_delay_seconds}s before retrying for {tile_coordinates_log_str}."
)
time.sleep(self.download_retry_delay_seconds)
if downloaded_pil_image is None:
logger.error(
f"Failed to download tile {tile_coordinates_log_str} after all retries."
)
return downloaded_pil_image
def _save_image_to_cache_file(
self, tile_cache_path: Path, pil_image: ImageType
) -> None:
"""Saves a PIL Image object to a file in the cache (thread-safe write)."""
if not PIL_AVAILABLE_MANAGER or pil_image is None:
logger.error(
"Pillow not available or image is None, cannot save tile to cache."
)
return
with self._cache_access_lock:
try:
tile_cache_path.parent.mkdir(parents=True, exist_ok=True)
if Image:
pil_image.save(tile_cache_path, format="PNG")
logger.debug(f"Saved tile to cache: {tile_cache_path}")
else:
logger.error("Pillow's Image class is None, cannot save image.")
except IOError as e_io_save:
logger.error(
f"IOError saving tile to cache {tile_cache_path}: {e_io_save}"
)
except Exception as e_save_unexpected:
logger.exception(
f"Unexpected error saving tile to cache {tile_cache_path}: {e_save_unexpected}"
)
def get_tile_image(
self,
zoom_level: int,
tile_x: int,
tile_y: int,
force_online_refresh: bool = False,
) -> Optional[ImageType]:
"""
Retrieves a map tile image, using local cache first.
If not cached or refresh is forced, attempts to download (if enabled).
Returns a placeholder image if the tile cannot be retrieved.
Args:
zoom_level: The zoom level of the tile.
tile_x: The X coordinate of the tile.
tile_y: The Y coordinate of the tile.
force_online_refresh: If True, bypasses cache and attempts download.
Returns:
A PIL Image object of the tile, or a placeholder image on failure.
Returns None only if placeholder creation itself fails critically.
"""
tile_coords_log_str = f"({zoom_level},{tile_x},{tile_y})"
logger.debug(f"Requesting tile image for {tile_coords_log_str}")
if not self.map_service.is_zoom_level_valid(zoom_level):
logger.error(
f"Invalid zoom level {zoom_level} for map service '{self.service_identifier_name}'. Cannot get tile."
)
if PIL_AVAILABLE_MANAGER:
return self._create_placeholder_tile_image(
f"Invalid Zoom {zoom_level} Tile {tile_coords_log_str}"
)
else:
logger.error(
"Pillow not available, cannot create placeholder for invalid zoom tile."
)
return None
tile_cache_file = self._get_tile_cache_file_path(zoom_level, tile_x, tile_y)
retrieved_image: Optional[ImageType] = None
if not force_online_refresh:
retrieved_image = self._load_tile_from_cache(
tile_cache_file, tile_coords_log_str
)
if retrieved_image is None:
retrieved_image = self._download_and_save_tile_to_cache(
zoom_level, tile_x, tile_y, tile_cache_file, tile_coords_log_str
)
if retrieved_image is None:
logger.warning(
f"Failed to retrieve tile {tile_coords_log_str}. Using placeholder."
)
retrieved_image = self._create_placeholder_tile_image(tile_coords_log_str)
if retrieved_image is None:
logger.critical(
"Failed to create even a placeholder tile. Returning None."
)
return retrieved_image
def stitch_map_image(
self,
zoom_level: int,
x_tile_range: Tuple[int, int],
y_tile_range: Tuple[int, int],
) -> Optional[ImageType]:
"""
Retrieves and stitches multiple map tiles to form a larger composite map image.
Args:
zoom_level: The zoom level for all tiles.
x_tile_range: Inclusive start and end X tile coordinates (min_x, max_x).
y_tile_range: Inclusive start and end Y tile coordinates (min_y, max_y).
Returns:
A PIL Image object of the stitched map, or None if a critical error occurs.
Missing individual tiles are replaced by placeholders.
"""
logger.info(
f"Request to stitch map: Zoom {zoom_level}, X-Range {x_tile_range}, Y-Range {y_tile_range}"
)
min_tile_x, max_tile_x = x_tile_range
min_tile_y, max_tile_y = y_tile_range
if not (min_tile_x <= max_tile_x and min_tile_y <= max_tile_y):
logger.error(
f"Invalid tile ranges for stitching: X={x_tile_range}, Y={y_tile_range}"
)
if PIL_AVAILABLE_MANAGER:
try:
placeholder_img = Image.new(
"RGB", (self.tile_size * 3, self.tile_size * 3), (255, 150, 150)
)
if ImageDraw:
draw = ImageDraw.Draw(placeholder_img)
error_text = f"Stitch Failed\nInvalid Tile Ranges:\nX={x_tile_range}, Y={y_tile_range}"
try:
try:
from geoelevation.image_processor import DEFAULT_FONT
font_to_use = DEFAULT_FONT
except ImportError:
font_to_use = None
if font_to_use:
if hasattr(draw, "textbbox"):
try:
text_bbox = draw.textbbox(
(0, 0),
error_text,
font=font_to_use,
spacing=2,
)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_x = (
placeholder_img.width - text_width
) // 2
text_y = (
placeholder_img.height - text_height
) // 2
draw.text(
(text_x, text_y),
error_text,
fill="black",
font=font_to_use,
spacing=2,
anchor="lt",
)
except AttributeError:
logger.warning(
"Pillow textbbox not available (Pillow < 8.0). Using textsize fallback for error text."
)
text_width, text_height = draw.textsize(
error_text, font=font_to_use
)
text_x = (
placeholder_img.width - text_width
) // 2
text_y = (
placeholder_img.height - text_height
) // 2
draw.text(
(text_x, text_y),
error_text,
fill="black",
font=font_to_use,
)
except Exception as e_font_draw:
logger.warning(
f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw."
)
draw.text((10, 10), error_text, fill="black")
else:
if font_to_use:
text_width, text_height = draw.textsize(
error_text, font=font_to_use
)
text_x = (
placeholder_img.width - text_width
) // 2
text_y = (
placeholder_img.height - text_height
) // 2
draw.text(
(text_x, text_y),
error_text,
fill="black",
font=font_to_use,
)
else:
draw.text((10, 10), error_text, fill="black")
except Exception as e_draw:
logger.warning(
f"Error drawing text on placeholder for invalid ranges: {e_draw}. Falling back to simple draw."
)
if ImageDraw:
draw.text((10, 10), error_text, fill="black")
return placeholder_img
except Exception as e_placeholder_fail:
logger.exception(
f"Failed to create placeholder for invalid tile ranges: {e_placeholder_fail}"
)
return None
else:
logger.error(
"Pillow not available, cannot create placeholder for invalid ranges."
)
return None
single_tile_pixel_size = self.tile_size
if single_tile_pixel_size <= 0:
logger.error(
f"Invalid tile size ({single_tile_pixel_size}) stored in manager. Cannot stitch."
)
if PIL_AVAILABLE_MANAGER:
try:
if ImageDraw is not None:
placeholder_img = Image.new(
"RGB", (256 * 3, 256 * 3), DEFAULT_PLACEHOLDER_COLOR_RGB
)
draw = ImageDraw.Draw(placeholder_img)
error_text = f"Stitch Failed\nInvalid Tile Size:\n{single_tile_pixel_size}"
try:
try:
from geoelevation.image_processor import DEFAULT_FONT
font_to_use = DEFAULT_FONT
except ImportError:
font_to_use = None
if font_to_use:
if hasattr(draw, "textbbox"):
try:
text_bbox = draw.textbbox(
(0, 0),
error_text,
font=font_to_use,
spacing=2,
)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_pos = (
(placeholder_img.width - text_width) // 2,
(placeholder_img.height - text_height) // 2,
)
draw.text(
text_pos,
error_text,
fill="black",
font=font_to_use,
spacing=2,
anchor="lt",
)
except AttributeError:
text_width, text_height = draw.textsize(
error_text, font=font_to_use
)
text_pos = (
(placeholder_img.width - text_width) // 2,
(placeholder_img.height - text_height) // 2,
)
draw.text(
text_pos,
error_text,
fill="black",
font=font_to_use,
)
except Exception as e_font_draw:
logger.warning(
f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw."
)
draw.text((10, 10), error_text, fill="black")
else:
if font_to_use:
text_width, text_height = draw.textsize(
error_text, font=font_to_use
)
text_pos = (
(placeholder_img.width - text_width) // 2,
(placeholder_img.height - text_height) // 2,
)
draw.text(
text_pos,
error_text,
fill="black",
font=font_to_use,
)
else:
draw.text((10, 10), error_text, fill="black")
else:
draw.text((10, 10), error_text, fill="black")
except Exception as e_draw:
logger.warning(
f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw."
)
if ImageDraw:
draw.text((10, 10), error_text, fill="black")
return placeholder_img
else:
logger.error(
"Pillow's ImageDraw class is None, cannot create placeholder image."
)
return None
except Exception as e_placeholder_fail:
logger.exception(
f"Failed to create large placeholder for stitching error: {e_placeholder_fail}"
)
return None
else:
logger.error(
"Pillow not available, cannot create placeholder for invalid tile size."
)
return None
num_tiles_wide = (max_tile_x - min_tile_x) + 1
num_tiles_high = (max_tile_y - min_tile_y) + 1
total_image_width = num_tiles_wide * single_tile_pixel_size
total_image_height = num_tiles_high * single_tile_pixel_size
logger.debug(
f"Stitched image dimensions: {total_image_width}x{total_image_height} "
f"({num_tiles_wide}x{num_tiles_high} tiles of {single_tile_pixel_size}px)"
)
MAX_IMAGE_DIMENSION = 16384
if (
total_image_width > MAX_IMAGE_DIMENSION
or total_image_height > MAX_IMAGE_DIMENSION
):
logger.error(
f"Requested stitched image size ({total_image_width}x{total_image_height}) "
f"exceeds maximum allowed dimension ({MAX_IMAGE_DIMENSION}). Aborting stitch."
)
if PIL_AVAILABLE_MANAGER:
try:
if ImageDraw is not None:
placeholder_img = Image.new(
"RGB", (256 * 3, 256 * 3), (255, 100, 100)
)
draw = ImageDraw.Draw(placeholder_img)
error_text = f"Stitch Failed\nImage too large:\n{total_image_width}x{total_image_height}px"
try:
try:
from geoelevation.image_processor import DEFAULT_FONT
font_to_use = DEFAULT_FONT
except ImportError:
font_to_use = None
if font_to_use:
if hasattr(draw, "textbbox"):
try:
text_bbox = draw.textbbox(
(0, 0),
error_text,
font=font_to_use,
spacing=2,
)
text_w = text_bbox[2] - text_bbox[0]
text_h = text_bbox[3] - text_bbox[1]
text_pos = (
(placeholder_img.width - text_w) // 2,
(placeholder_img.height - text_h) // 2,
)
draw.text(
text_pos,
error_text,
fill="black",
font=font_to_use,
spacing=2,
anchor="lt",
)
except AttributeError:
text_w, text_h = draw.textsize(
error_text, font=font_to_use
)
text_pos = (
(placeholder_img.width - text_w) // 2,
(placeholder_img.height - text_h) // 2,
)
draw.text(
text_pos,
error_text,
fill="black",
font=font_to_use,
)
except Exception as e_font_draw:
logger.warning(
f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw."
)
draw.text((10, 10), error_text, fill="black")
else:
if font_to_use:
text_w, text_h = draw.textsize(
error_text, font=font_to_use
)
text_pos = (
(placeholder_img.width - text_w) // 2,
(placeholder_img.height - text_h) // 2,
)
draw.text(
text_pos,
error_text,
fill="black",
font=font_to_use,
)
else:
draw.text((10, 10), error_text, fill="black")
else:
draw.text((10, 10), error_text, fill="black")
except Exception as e_draw:
logger.warning(
f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw."
)
if ImageDraw:
draw.text((10, 10), error_text, fill="black")
return placeholder_img
else:
logger.error(
"Pillow's ImageDraw class is None, cannot create placeholder image."
)
return None
except Exception as e_placeholder_fail:
logger.exception(
f"Failed to create large placeholder for size error: {e_placeholder_fail}"
)
return None
else:
logger.error(
"Pillow not available, cannot create placeholder for excessive size request."
)
return None
try:
if PIL_AVAILABLE_MANAGER:
if Image:
stitched_map_image = Image.new(
"RGB", (total_image_width, total_image_height)
)
else:
raise ImportError("Pillow's Image class is None.")
else:
raise ImportError("Pillow not available to create new image.")
except Exception as e_create_blank:
logger.exception(
f"Failed to create blank image for stitching: {e_create_blank}. Dimensions: {total_image_width}x{total_image_height}"
)
if PIL_AVAILABLE_MANAGER:
try:
if ImageDraw is not None:
placeholder_img = Image.new(
"RGB", (256 * 3, 256 * 3), (255, 100, 100)
)
draw = ImageDraw.Draw(placeholder_img)
error_text = (
f"Stitch Failed\nCannot create image:\n{e_create_blank}"
)
try:
try:
from geoelevation.image_processor import DEFAULT_FONT
font_to_use = DEFAULT_FONT
except ImportError:
font_to_use = None
if font_to_use:
if hasattr(draw, "textbbox"):
try:
text_bbox = draw.textbbox(
(0, 0),
error_text,
font=font_to_use,
spacing=2,
)
text_w = text_bbox[2] - text_bbox[0]
text_h = text_bbox[3] - text_bbox[1]
text_pos = (
(placeholder_img.width - text_w) // 2,
(placeholder_img.height - text_h) // 2,
)
draw.text(
text_pos,
error_text,
fill="black",
font=font_to_use,
spacing=2,
anchor="lt",
)
except AttributeError:
text_w, text_h = draw.textsize(
error_text, font=font_to_use
)
text_pos = (
(placeholder_img.width - text_w) // 2,
(placeholder_img.height - text_h) // 2,
)
draw.text(
text_pos,
error_text,
fill="black",
font=font_to_use,
)
except Exception as e_font_draw:
logger.warning(
f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw."
)
draw.text((10, 10), error_text, fill="black")
else:
if font_to_use:
text_w, text_h = draw.textsize(
error_text, font=font_to_use
)
text_pos = (
(placeholder_img.width - text_w) // 2,
(placeholder_img.height - text_h) // 2,
)
draw.text(
text_pos,
error_text,
fill="black",
font=font_to_use,
)
else:
draw.text((10, 10), error_text, fill="black")
else:
draw.text((10, 10), error_text, fill="black")
except Exception as e_draw:
logger.warning(
f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw."
)
if ImageDraw:
draw.text((10, 10), error_text, fill="black")
return placeholder_img
else:
logger.error(
"Pillow's ImageDraw class is None, cannot create placeholder image."
)
return None
except Exception as e_placeholder_fail:
logger.exception(
f"Failed to create large placeholder for memory error: {e_placeholder_fail}"
)
return None
else:
logger.error(
"Pillow not available, cannot create placeholder for image creation error."
)
return None
for row_index, current_tile_y in enumerate(range(min_tile_y, max_tile_y + 1)):
for col_index, current_tile_x in enumerate(
range(min_tile_x, max_tile_x + 1)
):
tile_image_pil = self.get_tile_image(
zoom_level, current_tile_x, current_tile_y
)
if tile_image_pil is None:
logger.critical(
f"Critical error: get_tile_image returned None for ({zoom_level},{current_tile_x},{current_tile_y}). Aborting stitch."
)
return None
paste_position_x = col_index * single_tile_pixel_size
paste_position_y = row_index * single_tile_pixel_size
logger.debug(
f"Pasting tile ({zoom_level},{current_tile_x},{current_tile_y}) at "
f"pixel position ({paste_position_x},{paste_position_y})"
)
try:
if tile_image_pil and tile_image_pil.size != (
self.tile_size,
self.tile_size,
):
logger.warning(
f"Tile image size {tile_image_pil.size} doesn't match expected {self.tile_size}. Resizing for stitch."
)
if PIL_AVAILABLE_MANAGER and Image:
tile_image_pil = tile_image_pil.resize(
(self.tile_size, self.tile_size),
Image.Resampling.LANCZOS,
)
else:
logger.error(
"Pillow not available, cannot resize tile for stitch."
)
continue
if tile_image_pil:
stitched_map_image.paste(
tile_image_pil, (paste_position_x, paste_position_y)
)
except Exception as e_paste:
logger.exception(
f"Error pasting tile ({zoom_level},{current_tile_x},{current_tile_y}) "
f"at ({paste_position_x},{paste_position_y}): {e_paste}"
)
logger.info(
f"Map stitching complete for zoom {zoom_level}, X={x_tile_range}, Y={y_tile_range}."
)
return stitched_map_image
def _create_placeholder_tile_image(
self, identifier: str = "N/A"
) -> Optional[ImageType]:
"""
Creates and returns a placeholder tile image (e.g., a grey square).
Includes optional text identifier on the placeholder.
"""
if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None):
logger.warning(
"Cannot create placeholder tile: Pillow or ImageDraw library not available."
)
return None
try:
tile_pixel_size = self.tile_size
placeholder_color = DEFAULT_PLACEHOLDER_COLOR_RGB
if Image:
placeholder_img = Image.new(
"RGB", (tile_pixel_size, tile_pixel_size), color=placeholder_color
)
else:
logger.error(
"Pillow's Image class is None, cannot create placeholder image."
)
return None
if ImageDraw:
draw = ImageDraw.Draw(placeholder_img)
else:
logger.error(
"Pillow's ImageDraw class is None, cannot draw on placeholder."
)
return placeholder_img
overlay_text = f"Tile Fail\n{identifier}"
try:
try:
from geoelevation.image_processor import DEFAULT_FONT
font_to_use = DEFAULT_FONT
except ImportError:
font_to_use = None
if font_to_use is not None:
if hasattr(draw, "textbbox"):
try:
text_bbox = draw.textbbox(
(0, 0), overlay_text, font=font_to_use, spacing=2
)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_x = (tile_pixel_size - text_width) // 2
text_y = (tile_pixel_size - text_height) // 2
bg_padding = 1
bg_coords = [
text_x - bg_padding,
text_y - bg_padding,
text_x + text_width + bg_padding,
text_y + text_height + bg_padding,
]
draw.rectangle(bg_coords, fill="rgba(0, 0, 0, 150)")
draw.text(
(text_x, text_y),
overlay_text,
fill="white",
font=font_to_use,
spacing=2,
anchor="lt",
)
except AttributeError:
logger.warning(
"Pillow textbbox not available (Pillow < 8.0). Using textsize fallback for placeholder text."
)
if font_to_use:
text_width, text_height = draw.textsize(
overlay_text, font=font_to_use
)
if "\n" in overlay_text:
line_count = overlay_text.count("\n") + 1
pass
text_x = (tile_pixel_size - text_width) // 2
text_y = (tile_pixel_size - text_height) // 2
bg_padding = 1
bg_coords = [
text_x - bg_padding,
text_y - bg_padding,
text_x + text_width + bg_padding,
text_y + text_height + bg_padding,
]
draw.rectangle(bg_coords, fill="rgba(0, 0, 0, 150)")
draw.text(
(text_x, text_y),
overlay_text,
fill="white",
font=font_to_use,
)
else:
draw.text((10, 10), overlay_text, fill="black")
except Exception as e_font_draw:
logger.warning(
f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw."
)
draw.text((10, 10), overlay_text, fill="black")
else:
if font_to_use:
draw.text(
(10, 10), overlay_text, fill="black", font=font_to_use
)
else:
draw.text((10, 10), overlay_text, fill="black")
else:
logger.debug("No font available for placeholder text drawing.")
draw.text((10, 10), overlay_text, fill="black")
except Exception as e_draw:
logger.warning(
f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw."
)
if ImageDraw:
draw.text((10, 10), overlay_text, fill="black")
return placeholder_img
except Exception as e_placeholder:
logger.exception(f"Error creating placeholder tile image: {e_placeholder}")
return None
def _get_bounds_for_tile_range(
self, zoom: int, tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]]
) -> Optional[Tuple[float, float, float, float]]:
"""
Calculates the precise geographic bounds covered by a given range of tiles.
Requires 'mercantile' library.
"""
# MODIFIED: Removed check for MERCANTILE_MODULE_LOCALLY_AVAILABLE.
# WHY: This constant is defined in map_utils.py, not map_manager.py.
# Instead, check if the locally imported `mercantile` is None.
# HOW: Removed the check for MERCANTILE_MODULE_LOCALLY_AVAILABLE.
if (
mercantile is None
): # Check if mercantile is None from the local import attempt
logger.error(
"mercantile library not found or is None, cannot calculate bounds for tile range."
)
return None
try:
min_x, max_x = tile_ranges[0]
min_y, max_y = tile_ranges[1]
# Get bounds of the top-left tile and bottom-right tile
# mercantile.bounds(x, y, z) returns (west, south, east, north)
# MODIFIED: Check for mercantile before using mercantile.bounds.
# WHY: Prevent AttributeError. (Redundant with the check at the start of the method, but defensive).
# HOW: Added `if mercantile:`.
if mercantile:
top_left_tile_bounds = mercantile.bounds(min_x, min_y, zoom)
bottom_right_tile_bounds = mercantile.bounds(max_x, max_y, zoom)
else:
# This branch should ideally not be hit due to the check at the method start.
logger.error(
"mercantile library is None during bounds calculation step."
)
return None
overall_west_lon = top_left_tile_bounds.west
overall_south_lat = bottom_right_tile_bounds.south
overall_east_lon = bottom_right_tile_bounds.east
overall_north_lat = top_left_tile_bounds.north
return (
overall_west_lon,
overall_south_lat,
overall_east_lon,
overall_north_lat,
)
except Exception as e_bounds_calc:
logger.exception(
f"Error calculating geographic bounds for tile range {tile_ranges} at zoom {zoom}: {e_bounds_calc}"
)
return None
def clear_entire_service_cache(self) -> None:
"""Deletes all cached tiles for the current map service."""
logger.info(
f"Attempting to clear entire cache for service '{self.service_identifier_name}' at {self.service_specific_cache_dir}"
)
if not self.service_specific_cache_dir.exists():
logger.warning(
f"Cache directory '{self.service_specific_cache_dir}' does not exist. Nothing to clear."
)
return
with self._cache_access_lock:
try:
if self.service_specific_cache_dir.is_dir():
shutil.rmtree(self.service_specific_cache_dir)
logger.info(
f"Successfully cleared cache at {self.service_specific_cache_dir}."
)
self._ensure_service_cache_directory_exists()
else:
logger.warning(
f"Cache path '{self.service_specific_cache_dir}' is not a directory."
)
except OSError as e_os_clear:
logger.error(
f"OS Error clearing cache at '{self.service_specific_cache_dir}': {e_os_clear}"
)
except Exception as e_clear_unexpected:
logger.exception(
f"Unexpected error clearing cache '{self.service_specific_cache_dir}': {e_clear_unexpected}"
)