# elevation_manager.py import os import math import logging import netrc import time import zipfile import io from typing import Tuple, Optional, Dict, List import requests from requests.auth import HTTPBasicAuth import numpy as np # Importa numpy per il tipo di ritorno di get_hgt_data # Importa rasterio qui se serve solo per get_hgt_data o internamente try: import rasterio from rasterio.errors import RasterioIOError RASTERIO_AVAILABLE = True except ImportError: RASTERIO_AVAILABLE = False RasterioIOError = Exception # Definisci un fallback per l'eccezione logging.warning( "Rasterio library not found. Elevation data reading will not be available." ) logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) # --- Configuration --- NASADEM_DATA_BASE_URL = ( "https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/NASADEM_HGT.001/" ) NASADEM_BROWSE_BASE_URL = ( "https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-public/NASADEM_HGT.001/" ) EARTHDATA_MACHINE = "urs.earthdata.nasa.gov" BROWSE_IMAGE_SUFFIX_VERSION = ".1" BROWSE_IMAGE_EXTENSION = ".jpg" # --- End Configuration --- class ElevationManager: """ Manages fetching, caching, and querying NASADEM HGT v001 elevation data and associated browse images from NASA Earthdata Cloud. Handles authentication, download, extraction, and provides access methods. """ def __init__(self, tile_directory: str = "map_elevation"): """ Initializes the ElevationManager. Args: tile_directory (str): Root directory for storing HGT data and browse images. """ if not isinstance(tile_directory, str) or not tile_directory: raise ValueError("tile_directory must be a non-empty string") self.root_directory: str = tile_directory self.tile_directory: str = os.path.join(self.root_directory, "hgt_tiles") self.browse_directory: str = os.path.join(self.root_directory, "browse_images") self.nodata_value: Optional[int] = -32768 self.earthdata_credentials: Optional[Tuple[str, str]] = None self._read_netrc_credentials() self._setup_directories() def _read_netrc_credentials(self): """Reads Earthdata Login credentials from the .netrc file.""" try: netrc_path = os.path.join(os.path.expanduser("~"), ".netrc") if not os.path.exists(netrc_path): logging.warning( f"Authentication Warning: .netrc file not found at {netrc_path}. Data downloads might fail." ) return netrc_info = netrc.netrc(netrc_path) auth_tokens = netrc_info.authenticators(EARTHDATA_MACHINE) if auth_tokens: username, _, password = auth_tokens if username and password: self.earthdata_credentials = (username, password) logging.info( f"Successfully read credentials for {EARTHDATA_MACHINE} from .netrc." ) else: logging.error( f"Incomplete credentials for {EARTHDATA_MACHINE} in .netrc." ) else: logging.warning( f"No credentials found for machine '{EARTHDATA_MACHINE}' in {netrc_path}." ) except (FileNotFoundError, netrc.NetrcParseError) as e: logging.error(f"Error reading/parsing .netrc file: {e}") except Exception as e: logging.error(f"Unexpected error reading .netrc credentials: {e}") def _setup_directories(self): """Creates the necessary cache directories.""" try: os.makedirs(self.tile_directory, exist_ok=True) os.makedirs(self.browse_directory, exist_ok=True) logging.info(f"HGT tile cache directory set to: {self.tile_directory}") logging.info( f"Browse image cache directory set to: {self.browse_directory}" ) except OSError as e: logging.error( f"Failed to create cache directories under {self.root_directory}: {e}" ) raise # Rilancia errore critico def _get_tile_base_coordinates( self, latitude: float, longitude: float ) -> Tuple[int, int]: """Calculates integer coordinates of the bottom-left corner.""" if longitude >= 180.0: longitude = 179.9999999 if longitude < -180.0: longitude = -180.0 if not (-90 <= latitude < 90): raise ValueError("Latitude must be between -90 and < 90 degrees.") return int(math.floor(latitude)), int(math.floor(longitude)) def _get_nasadem_tile_base_name(self, lat_coord: int, lon_coord: int) -> str: """Generates the NASADEM base name component (e.g., 'n45e007').""" lat_prefix = "n" if lat_coord >= 0 else "s" lon_prefix = "e" if lon_coord >= 0 else "w" return f"{lat_prefix}{abs(lat_coord):02d}{lon_prefix}{abs(lon_coord):03d}" def _get_download_url( self, tile_base_name: str, file_type: str = "data" ) -> Optional[str]: """Constructs the download URL based on file type.""" directory_name = f"NASADEM_HGT_{tile_base_name}" if file_type == "data": if not NASADEM_DATA_BASE_URL: logging.error("DATA URL not configured.") return None filename = f"{directory_name}.zip" url = f"{NASADEM_DATA_BASE_URL.rstrip('/')}/{directory_name}/{filename}" log_prefix = "data" elif file_type == "browse": if not NASADEM_BROWSE_BASE_URL: logging.error("BROWSE URL not configured.") return None filename = ( f"{directory_name}{BROWSE_IMAGE_SUFFIX_VERSION}{BROWSE_IMAGE_EXTENSION}" ) url = f"{NASADEM_BROWSE_BASE_URL.rstrip('/')}/{directory_name}/{filename}" log_prefix = "browse" else: logging.error(f"Invalid file_type: {file_type}") return None logging.debug(f"Constructed {log_prefix} URL: {url}") return url def _get_local_hgt_path(self, lat_coord: int, lon_coord: int) -> str: """Gets the full local path for the extracted HGT file.""" lat_prefix = "N" if lat_coord >= 0 else "S" lon_prefix = "E" if lon_coord >= 0 else "W" hgt_filename = ( f"{lat_prefix}{abs(lat_coord):02d}{lon_prefix}{abs(lon_coord):03d}.hgt" ) return os.path.join(self.tile_directory, hgt_filename) def _get_local_browse_path(self, lat_coord: int, lon_coord: int) -> str: """Gets the full local path for the downloaded browse image file.""" tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord) browse_filename = f"NASADEM_HGT_{tile_base_name}{BROWSE_IMAGE_SUFFIX_VERSION}{BROWSE_IMAGE_EXTENSION}" return os.path.join(self.browse_directory, browse_filename) def _perform_download( self, download_url: str, local_path: str, use_auth: bool, is_zip: bool, max_retries: int = 1, retry_delay: int = 5, ) -> Optional[bytes]: """Helper function to download a file with optional auth and retries.""" if not download_url: return None headers = {"User-Agent": "Mozilla/5.0 ..."} # Definire user agent completo auth = ( HTTPBasicAuth(self.earthdata_credentials[0], self.earthdata_credentials[1]) if use_auth and self.earthdata_credentials else None ) session = requests.Session() file_content = None log_file_type = "ZIP DATA" if is_zip else "BROWSE IMAGE" for attempt in range(max_retries + 1): try: # Scarica tutto in memoria per ora (sia zip che browse) response = session.get( download_url, stream=False, timeout=60 if is_zip else 30, headers=headers, auth=auth, allow_redirects=True, ) response.raise_for_status() file_content = response.content if not file_content: logging.warning( f"Downloaded {log_file_type} content is empty for {download_url}." ) break logging.debug( f"{log_file_type} content downloaded ({len(file_content)} bytes)." ) break # Success except requests.exceptions.HTTPError as e: status_code = e.response.status_code logging.error( f"HTTP error downloading {log_file_type} {download_url}: Status {status_code}" ) if status_code in [404, 401, 403] or status_code < 500: break # Non ritentare errori client except requests.exceptions.RequestException as e: logging.warning( f"Network error ({type(e).__name__}) downloading {log_file_type} {download_url}. Retrying... (Attempt {attempt + 1}/{max_retries + 1})" ) except Exception as e: logging.error( f"Unexpected error during {log_file_type} download {download_url}: {e}", exc_info=True, ) break if attempt < max_retries: time.sleep(retry_delay) else: logging.error( f"{log_file_type} download failed for {download_url} after {max_retries + 1} attempts." ) session.close() return file_content def _download_and_extract_hgt(self, lat_coord: int, lon_coord: int) -> bool: """Downloads zipped HGT (using auth) and extracts it.""" tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord) local_hgt_path = self._get_local_hgt_path(lat_coord, lon_coord) data_download_url = self._get_download_url(tile_base_name, file_type="data") zip_content_bytes = self._perform_download( data_download_url, "memory", use_auth=True, is_zip=True ) if not zip_content_bytes: logging.error(f"Failed to download HGT zip for ({lat_coord},{lon_coord}).") return False try: with io.BytesIO(zip_content_bytes) as zip_content_stream: with zipfile.ZipFile(zip_content_stream, "r") as zip_ref: hgt_files = [ name for name in zip_ref.namelist() if name.lower().endswith(".hgt") ] if not hgt_files: logging.error(f"No .hgt in zip: {tile_base_name}.zip") return False hgt_filename = hgt_files[0] with open(local_hgt_path, "wb") as f_out: f_out.write(zip_ref.read(hgt_filename)) logging.info(f"Successfully extracted HGT to: {local_hgt_path}") return True except zipfile.BadZipFile: logging.error( f"Downloaded data for ({lat_coord},{lon_coord}) is not a valid zip." ) return False except Exception as e: logging.error( f"Failed to extract HGT for ({lat_coord},{lon_coord}): {e}", exc_info=True, ) return False def _download_browse_image(self, lat_coord: int, lon_coord: int) -> bool: """Downloads the public browse image file if it doesn't exist locally.""" local_browse_path = self._get_local_browse_path(lat_coord, lon_coord) if os.path.exists(local_browse_path): return True # Già presente tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord) browse_download_url = self._get_download_url(tile_base_name, file_type="browse") browse_content_bytes = self._perform_download( browse_download_url, local_browse_path, use_auth=False, is_zip=False ) if not browse_content_bytes: logging.warning( f"Failed to download browse image for ({lat_coord},{lon_coord})." ) return False try: browse_dir = os.path.dirname(local_browse_path) os.makedirs(browse_dir, exist_ok=True) # Assicura esista dir with open(local_browse_path, "wb") as f_img: f_img.write(browse_content_bytes) logging.info(f"Successfully saved browse image to: {local_browse_path}") return True except Exception as e: logging.error( f"Failed to save browse image to {local_browse_path}: {e}", exc_info=True, ) return False def _ensure_tile_available(self, lat_coord: int, lon_coord: int) -> bool: """Ensures HGT is available and attempts browse download. Returns HGT status.""" local_hgt_path = self._get_local_hgt_path(lat_coord, lon_coord) hgt_available = os.path.exists(local_hgt_path) if not hgt_available: logging.info( f"HGT for ({lat_coord},{lon_coord}) not found locally, attempting download." ) hgt_available = self._download_and_extract_hgt(lat_coord, lon_coord) if hgt_available: # Se HGT ok, tenta download browse (non bloccante per il risultato) self._download_browse_image(lat_coord, lon_coord) else: logging.error(f"Could not obtain HGT for ({lat_coord},{lon_coord}).") return hgt_available # --- Metodi Pubblici --- def get_tile_info(self, latitude: float, longitude: float) -> Optional[Dict]: """ Returns dictionary with info about the tile containing the coordinates. Ensures the tile (HGT and potentially browse) is downloaded if possible. """ try: lat_coord, lon_coord = self._get_tile_base_coordinates(latitude, longitude) except ValueError as e: logging.error(f"Invalid coordinates for get_tile_info: {e}") return None # Assicura che HGT (e browse) siano tentati di scaricare hgt_success = self._ensure_tile_available(lat_coord, lon_coord) # Costruisci il dizionario informativo tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord) local_hgt_path = self._get_local_hgt_path(lat_coord, lon_coord) local_browse_path = self._get_local_browse_path(lat_coord, lon_coord) info = { "latitude_coord": lat_coord, "longitude_coord": lon_coord, "tile_base_name": tile_base_name, "hgt_file_path": local_hgt_path if os.path.exists(local_hgt_path) else None, "browse_image_path": ( local_browse_path if os.path.exists(local_browse_path) else None ), "hgt_available": hgt_success, "browse_available": ( os.path.exists(local_browse_path) if local_browse_path else False ), } return info def get_elevation(self, latitude: float, longitude: float) -> Optional[float]: """Gets elevation for a point, downloading data if needed.""" if not RASTERIO_AVAILABLE: logging.error("Cannot get elevation: Rasterio library is not installed.") return None tile_info = self.get_tile_info(latitude, longitude) if not tile_info or not tile_info.get("hgt_available"): logging.warning( f"Elevation not available: HGT data could not be obtained for ({latitude},{longitude})." ) return None tile_filepath = tile_info["hgt_file_path"] if not tile_filepath: # Doppia verifica logging.error("Inconsistency: HGT marked available but path is None.") return None try: with rasterio.open(tile_filepath) as dataset: nodata = ( dataset.nodata if dataset.nodata is not None else self.nodata_value ) try: row, col = dataset.index(longitude, latitude) except IndexError: logging.warning(f"Coords outside tile bounds {tile_filepath}.") return None if not (0 <= row < dataset.height and 0 <= col < dataset.width): logging.warning(f"Pixel outside raster dims.") return None window = rasterio.windows.Window( col_off=col, row_off=row, width=1, height=1 ) elevation_value = dataset.read(1, window=window)[0, 0] if nodata is not None and int(elevation_value) == int(nodata): logging.warning(f"Coords fall on nodata point.") return float("nan") if not isinstance( elevation_value, (int, float, complex) ) and not hasattr(elevation_value, "item"): logging.error( f"Unexpected raster data type: {type(elevation_value)}" ) return None elevation_meters = float(elevation_value) logging.info(f"Elevation found: {elevation_meters:.2f} meters") return elevation_meters except RasterioIOError as e: logging.error(f"Rasterio error reading {tile_filepath}: {e}") return None except FileNotFoundError: logging.error(f"HGT file not found for reading: {tile_filepath}") return None except Exception as e: logging.error( f"Unexpected error reading elevation from {tile_filepath}: {e}", exc_info=True, ) return None def get_hgt_data(self, latitude: float, longitude: float) -> Optional[np.ndarray]: """Reads and returns the raw HGT data array for the corresponding tile.""" if not RASTERIO_AVAILABLE: logging.error("Cannot get HGT data: Rasterio library is not installed.") return None tile_info = self.get_tile_info(latitude, longitude) if not tile_info or not tile_info.get("hgt_available"): logging.warning( f"HGT data array not available for ({latitude},{longitude})." ) return None tile_filepath = tile_info["hgt_file_path"] if not tile_filepath: return None try: with rasterio.open(tile_filepath) as dataset: data_array = dataset.read(1) logging.info( f"Successfully read HGT data array from {tile_filepath} (shape: {data_array.shape})" ) return data_array except Exception as e: logging.error( f"Failed to read HGT data from {tile_filepath}: {e}", exc_info=True ) return None def get_browse_image_path(self, latitude: float, longitude: float) -> Optional[str]: """Returns the local path to the browse image for the tile, if available.""" # Questo è un wrapper più semplice, riutilizza get_tile_info tile_info = self.get_tile_info(latitude, longitude) if tile_info and tile_info.get("browse_available"): return tile_info["browse_image_path"] # Non loggare qui, get_tile_info logga già se non trovato return None def get_area_tile_info( self, min_lat: float, min_lon: float, max_lat: float, max_lon: float ) -> List[Dict]: """ Returns a list of tile info dictionaries for all tiles within the bounding box. Does NOT trigger downloads, expects tiles to be potentially downloaded via download_area. """ tile_info_list = [] try: # ... (validazione input bounds come in download_area) ... if not ( -90 <= min_lat < 90 and -90 <= max_lat < 90 and -180 <= min_lon < 180 and -180 <= max_lon < 180 ): raise ValueError("Coords out of valid range.") if min_lat >= max_lat or min_lon >= max_lon: raise ValueError("Invalid area bounds.") except ValueError as e: logging.error(f"Invalid input for get_area_tile_info: {e}") return [] start_lat = math.floor(min_lat) end_lat = math.floor(max_lat) start_lon = math.floor(min_lon) end_lon = math.floor(max_lon) logging.info( f"Getting info for tiles in range: Lat [{start_lat}, {end_lat}], Lon [{start_lon}, {end_lon}]" ) for lat_coord in range(int(start_lat), int(end_lat) + 1): if not (-60 <= lat_coord < 60): continue # Limiti copertura for lon_coord in range(int(start_lon), int(end_lon) + 1): # Ricostruisci info senza triggerare download qui tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord) local_hgt_path = self._get_local_hgt_path(lat_coord, lon_coord) local_browse_path = self._get_local_browse_path(lat_coord, lon_coord) info = { "latitude_coord": lat_coord, "longitude_coord": lon_coord, "tile_base_name": tile_base_name, "hgt_file_path": ( local_hgt_path if os.path.exists(local_hgt_path) else None ), "browse_image_path": ( local_browse_path if os.path.exists(local_browse_path) else None ), "hgt_available": ( os.path.exists(local_hgt_path) if local_hgt_path else False ), "browse_available": ( os.path.exists(local_browse_path) if local_browse_path else False ), } tile_info_list.append(info) return tile_info_list def download_area( self, min_lat: float, min_lon: float, max_lat: float, max_lon: float ) -> Tuple[int, int]: """Downloads HGT and attempts browse download for all tiles in the area.""" logging.info( f"Starting HGT download & browse check for area: Lat [{min_lat}, {max_lat}], Lon [{min_lon}, {max_lon}]" ) try: if not ( -90 <= min_lat < 90 and -90 <= max_lat < 90 and -180 <= min_lon < 180 and -180 <= max_lon < 180 ): raise ValueError("Coords out of valid range.") if min_lat >= max_lat or min_lon >= max_lon: raise ValueError("Invalid area bounds.") except ValueError as e: logging.error(f"Invalid input for download_area: {e}") return 0, 0 start_lat = math.floor(min_lat) end_lat = math.floor(max_lat) start_lon = math.floor(min_lon) end_lon = math.floor(max_lon) logging.info( f"Tile coordinate range: Lat [{start_lat}, {end_lat}], Lon [{start_lon}, {end_lon}]" ) tiles_processed = 0 hgt_tiles_obtained = 0 # Conta HGT ottenuti/trovati con successo for lat_coord in range(int(start_lat), int(end_lat) + 1): if not (-60 <= lat_coord < 60): continue for lon_coord in range(int(start_lon), int(end_lon) + 1): tiles_processed += 1 logging.debug( f"Ensuring tile available for ({lat_coord},{lon_coord}) in download_area." ) # Questa chiamata tenta download HGT e poi browse if self._ensure_tile_available(lat_coord, lon_coord): hgt_tiles_obtained += 1 # Errore già loggato internamente se fallisce logging.info( f"Area processing complete. Tiles checked: {tiles_processed}, HGT tiles successfully obtained/found: {hgt_tiles_obtained}" ) return tiles_processed, hgt_tiles_obtained # --- End of ElevationManager class ---