# elevation_manager.py import os import math import logging import netrc import time import zipfile import io from typing import Tuple, Optional import requests from requests.auth import HTTPBasicAuth import rasterio from rasterio.errors import RasterioIOError logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # --- Configuration --- # URL Base per i DATI HGT (protetti, richiedono auth) NASADEM_DATA_BASE_URL = "https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/NASADEM_HGT.001/" # URL Base per le IMMAGINI BROWSE (pubbliche, senza auth) NASADEM_BROWSE_BASE_URL = "https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-public/NASADEM_HGT.001/" EARTHDATA_MACHINE = 'urs.earthdata.nasa.gov' # Estensione e suffisso per le immagini browse pubbliche BROWSE_IMAGE_SUFFIX_VERSION = ".1" # Il ".1" osservato BROWSE_IMAGE_EXTENSION = ".jpg" # --- End Configuration --- class ElevationManager: """ Manages NASADEM HGT v001 data & browse images from NASA Earthdata Cloud. Downloads protected HGT data (zip) using .netrc auth. Downloads public browse images (jpg) without auth. """ def __init__(self, tile_directory: str = "map_elevation"): # ... (init come prima, incluso _read_netrc_credentials) ... if not isinstance(tile_directory, str) or not tile_directory: raise ValueError("tile_directory must be a non-empty string") self.tile_directory: str = tile_directory self.browse_directory: str = os.path.join(tile_directory, "browse_images") self.nodata_value: Optional[int] = -32768 self.earthdata_credentials: Optional[Tuple[str, str]] = None self._read_netrc_credentials() try: os.makedirs(self.tile_directory, exist_ok=True) os.makedirs(self.browse_directory, exist_ok=True) logging.info(f"Tile cache directory set to: {self.tile_directory}") logging.info(f"Browse image cache directory set to: {self.browse_directory}") except OSError as e: logging.error(f"Failed to create directories under {tile_directory}: {e}") raise def _read_netrc_credentials(self): # ... (come prima) ... try: netrc_path = os.path.join(os.path.expanduser("~"), ".netrc") if not os.path.exists(netrc_path): logging.error(f"Authentication Error: .netrc file not found at {netrc_path}.") return netrc_info = netrc.netrc(netrc_path) auth_tokens = netrc_info.authenticators(EARTHDATA_MACHINE) if auth_tokens: username, _, password = auth_tokens if username and password: self.earthdata_credentials = (username, password) logging.info(f"Successfully read credentials for {EARTHDATA_MACHINE} from .netrc.") else: logging.error(f"Incomplete credentials found for {EARTHDATA_MACHINE} in .netrc.") else: logging.error(f"No credentials found for machine '{EARTHDATA_MACHINE}' in {netrc_path}.") except (FileNotFoundError, netrc.NetrcParseError) as e: logging.error(f"Error reading or parsing .netrc file: {e}") except Exception as e: logging.error(f"Unexpected error reading .netrc credentials: {e}") def _get_tile_base_coordinates(self, latitude: float, longitude: float) -> Tuple[int, int]: # ... (come prima) ... if longitude >= 180.0: longitude = 179.9999999 if longitude < -180.0: longitude = -180.0 if not (-90 <= latitude < 90): raise ValueError("Latitude must be between -90 and < 90 degrees.") lat_coord = math.floor(latitude) lon_coord = math.floor(longitude) return int(lat_coord), int(lon_coord) def _get_nasadem_tile_base_name(self, lat_coord: int, lon_coord: int) -> str: # ... (come prima, es. 'n45e007') ... lat_prefix = 'n' if lat_coord >= 0 else 's' lon_prefix = 'e' if lon_coord >= 0 else 'w' base_name = f"{lat_prefix}{abs(lat_coord):02d}{lon_prefix}{abs(lon_coord):03d}" return base_name # === MODIFICATO: Usa URL base diversi e nomi file diversi === def _get_download_url(self, tile_base_name: str, file_type: str = 'data') -> Optional[str]: """Constructs the download URL based on file type (data or browse).""" directory_name = f"NASADEM_HGT_{tile_base_name}" # Es: NASADEM_HGT_n45e007 if file_type == 'data': if not NASADEM_DATA_BASE_URL: logging.error("NASADEM_DATA_BASE_URL is not configured.") return None zip_filename = f"{directory_name}.zip" url = f"{NASADEM_DATA_BASE_URL.rstrip('/')}/{directory_name}/{zip_filename}" logging.debug(f"Constructed data URL: {url}") return url elif file_type == 'browse': if not NASADEM_BROWSE_BASE_URL: logging.error("NASADEM_BROWSE_BASE_URL is not configured.") return None # Costruisce nome file browse con suffisso/versione e estensione browse_filename = f"{directory_name}{BROWSE_IMAGE_SUFFIX_VERSION}{BROWSE_IMAGE_EXTENSION}" # Es: NASADEM_HGT_n45e007.1.jpg url = f"{NASADEM_BROWSE_BASE_URL.rstrip('/')}/{directory_name}/{browse_filename}" logging.debug(f"Constructed browse URL: {url}") return url else: logging.error(f"Invalid file_type specified: {file_type}") return None def _get_local_tile_path(self, lat_coord: int, lon_coord: int) -> str: # ... (come prima, percorso per .hgt estratto) ... lat_prefix = 'N' if lat_coord >= 0 else 'S' lon_prefix = 'E' if lon_coord >= 0 else 'W' hgt_filename = f"{lat_prefix}{abs(lat_coord):02d}{lon_prefix}{abs(lon_coord):03d}.hgt" return os.path.join(self.tile_directory, hgt_filename) # === MODIFICATO: Usa il nome file browse corretto per il path locale === def _get_local_browse_path(self, lat_coord: int, lon_coord: int) -> str: """Gets the full local path for the downloaded browse image file.""" tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord) # Usa suffisso/versione ed estensione corretti per il nome locale browse_filename = f"NASADEM_HGT_{tile_base_name}{BROWSE_IMAGE_SUFFIX_VERSION}{BROWSE_IMAGE_EXTENSION}" # Es: NASADEM_HGT_n45e007.1.jpg return os.path.join(self.browse_directory, browse_filename) # === MODIFICATO: Separato download dati (con auth) e browse (senza auth) === def _download_and_unzip_tile(self, lat_coord: int, lon_coord: int, max_retries: int = 1, retry_delay: int = 5) -> bool: """ Downloads zipped HGT (using auth), extracts HGT. Returns True on HGT success. Does NOT download browse image here anymore. """ if not self.earthdata_credentials: logging.error("Cannot attempt HGT download: Earthdata credentials not loaded.") return False tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord) local_hgt_path = self._get_local_tile_path(lat_coord, lon_coord) data_download_url = self._get_download_url(tile_base_name, file_type='data') if not data_download_url: return False logging.info(f"Attempting to download DATA ZIP: {data_download_url}") headers = {'User-Agent': 'Mozilla/5.0 ...'} # User agent come prima auth = HTTPBasicAuth(self.earthdata_credentials[0], self.earthdata_credentials[1]) session = requests.Session() hgt_download_ok = False zip_content = None # Blocco Download ZIP Dati (invariato, usa auth) for attempt in range(max_retries + 1): try: response = session.get(data_download_url, stream=True, timeout=60, headers=headers, auth=auth, allow_redirects=True) response.raise_for_status() zip_content = io.BytesIO(response.content) logging.debug("Data ZIP downloaded successfully.") hgt_download_ok = True break except requests.exceptions.HTTPError as e: status_code = e.response.status_code logging.error(f"HTTP error downloading DATA {data_download_url}: Status {status_code}, Response: {e.response.text[:200]}") if status_code in [404, 401, 403] or status_code < 500 : break except requests.exceptions.RequestException as e: logging.warning(f"Network error ({type(e).__name__}) downloading DATA {data_download_url}. Retrying... (Attempt {attempt + 1}/{max_retries + 1})") except Exception as e: logging.error(f"Unexpected error during DATA ZIP download {tile_base_name}.zip: {e}", exc_info=True) session.close(); return False if attempt < max_retries: time.sleep(retry_delay) else: logging.error(f"DATA ZIP Download failed for {data_download_url} after {max_retries + 1} attempts.") session.close() # Chiudi sessione dopo download dati # Blocco Estrazione HGT (invariato) hgt_extraction_ok = False if hgt_download_ok and zip_content: try: with zipfile.ZipFile(zip_content, 'r') as zip_ref: hgt_files_in_zip = [name for name in zip_ref.namelist() if name.lower().endswith('.hgt')] if not hgt_files_in_zip: logging.error(f"No .hgt file found inside {tile_base_name}.zip") else: hgt_filename_in_zip = hgt_files_in_zip[0] with open(local_hgt_path, 'wb') as f_out: f_out.write(zip_ref.read(hgt_filename_in_zip)) logging.info(f"Successfully extracted HGT to: {local_hgt_path}") hgt_extraction_ok = True except zipfile.BadZipFile: logging.error(f"Downloaded {data_download_url} is not a valid zip.") except Exception as e: logging.error(f"Failed to extract {tile_base_name}.zip: {e}", exc_info=True) return hgt_extraction_ok # Restituisce successo solo per HGT # === NUOVO METODO: Download separato per browse image (senza auth) === def _download_browse_image(self, lat_coord: int, lon_coord: int, max_retries: int = 1, retry_delay: int = 3) -> bool: """Downloads the public browse image file if it doesn't exist locally.""" tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord) local_browse_path = self._get_local_browse_path(lat_coord, lon_coord) browse_download_url = self._get_download_url(tile_base_name, file_type='browse') if os.path.exists(local_browse_path): logging.debug(f"Browse image already exists locally: {local_browse_path}") return True if not browse_download_url: return False logging.info(f"Attempting to download BROWSE IMAGE: {browse_download_url}") headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } session = requests.Session() # Nuova sessione, non autenticata download_successful = False browse_content = None # Blocco Download Contenuto Immagine for attempt in range(max_retries + 1): try: # Usiamo stream=False, assumendo browse non enormi, per semplicità browse_response = session.get(browse_download_url, stream=False, timeout=30, headers=headers, allow_redirects=True) browse_response.raise_for_status() browse_content = browse_response.content if not browse_content: logging.warning(f"Downloaded browse image content is empty for {browse_download_url}.") break # Tratta come fallimento se vuoto logging.debug(f"Browse image content downloaded successfully (size: {len(browse_content)} bytes).") download_successful = True break # Successo except requests.exceptions.HTTPError as e_img: status_code = e_img.response.status_code logging.warning(f"Failed to download BROWSE image {browse_download_url}: Status {status_code}") if status_code < 500: break # Non ritentare errori client except requests.exceptions.RequestException as e_img: logging.warning(f"Network error downloading BROWSE image {browse_download_url}: {e_img}. Retrying... (Attempt {attempt + 1}/{max_retries + 1})") except Exception as e_img: logging.warning(f"Unexpected error during BROWSE image download request {browse_download_url}: {e_img}") break if attempt < max_retries: time.sleep(retry_delay) else: logging.error(f"BROWSE image download failed for {browse_download_url} after {max_retries + 1} attempts.") session.close() # Blocco Scrittura File (solo se download OK e contenuto non vuoto) if download_successful and browse_content: try: # === DIAGNOSI: Controlla/Crea directory PRIMA di aprire === browse_dir = os.path.dirname(local_browse_path) if not os.path.exists(browse_dir): logging.warning(f"Browse directory '{browse_dir}' not found before writing. Attempting to create.") try: os.makedirs(browse_dir, exist_ok=True) # Tenta (ri)creazione logging.info(f"Successfully created browse directory: {browse_dir}") except Exception as mkdir_e: logging.error(f"Failed to create browse directory '{browse_dir}': {mkdir_e}") return False # Impossibile salvare else: logging.debug(f"Browse directory '{browse_dir}' exists.") # === FINE DIAGNOSI === logging.debug(f"Attempting to write browse image to: {local_browse_path}") with open(local_browse_path, 'wb') as f_img: f_img.write(browse_content) # Scrivi contenuto in un colpo solo logging.info(f"Successfully saved browse image to: {local_browse_path}") return True # Scrittura riuscita except (IOError, OSError) as e_write: logging.error(f"Failed to write browse image to {local_browse_path}: {e_write}") if os.path.exists(local_browse_path): try: os.remove(local_browse_path) # Tenta rimozione parziale except Exception as e_del: logging.warning(f"Could not remove partial file {local_browse_path}: {e_del}") return False except Exception as e_other: logging.error(f"Unexpected error saving browse image to {local_browse_path}: {e_other}", exc_info=True) return False else: logging.debug("Browse image download did not succeed or content was empty, skipping save.") return False # Fallito # === MODIFICATO: Chiama download separati === def _ensure_tile_available(self, lat_coord: int, lon_coord: int) -> bool: """ Ensures HGT is available (downloads+extracts if needed). Independently ensures browse image is available (downloads if needed). Returns True if HGT is available, False otherwise. """ hgt_local_path = self._get_local_tile_path(lat_coord, lon_coord) hgt_available = False if os.path.exists(hgt_local_path): logging.debug(f"HGT Tile already exists locally: {hgt_local_path}") hgt_available = True else: logging.info(f"Extracted HGT tile not found locally, attempting download and extraction: {hgt_local_path}") # Tenta download+estrazione HGT if self._download_and_unzip_tile(lat_coord, lon_coord): hgt_available = True else: logging.error(f"Failed to obtain HGT tile for coordinates ({lat_coord},{lon_coord}).") return False # Se HGT fallisce, l'operazione principale fallisce # Se HGT è disponibile (o era già presente), tenta il download del browse (se non già presente) if hgt_available: self._download_browse_image(lat_coord, lon_coord) # Tenta download browse, ignora risultato bool qui return hgt_available # Ritorna successo basato solo su HGT # === Metodo get_browse_image_path rimane invariato === def get_browse_image_path(self, latitude: float, longitude: float) -> Optional[str]: """Returns the local path to the browse image, if it exists.""" try: lat_coord, lon_coord = self._get_tile_base_coordinates(latitude, longitude) local_browse_path = self._get_local_browse_path(lat_coord, lon_coord) if os.path.exists(local_browse_path): return local_browse_path else: logging.debug(f"Browse image not found locally at expected path: {local_browse_path}") return None except ValueError: return None except Exception as e: logging.error(f"Error getting browse image path: {e}"); return None # === Metodo get_elevation rimane invariato === def get_elevation(self, latitude: float, longitude: float) -> Optional[float]: """Gets elevation using cached NASADEM data. Ensures HGT and attempts browse download.""" logging.info(f"Requesting elevation for Lat: {latitude}, Lon: {longitude}") try: lat_coord, lon_coord = self._get_tile_base_coordinates(latitude, longitude) except ValueError as e: logging.error(f"Invalid coordinates: {e}"); return None # Questa chiamata ora gestisce HGT e browse separatamente if not self._ensure_tile_available(lat_coord, lon_coord): # Errore già loggato da _ensure_tile_available se HGT fallisce return None tile_filepath = self._get_local_tile_path(lat_coord, lon_coord) # ... (resto della logica di lettura rasterio invariata) ... try: with rasterio.open(tile_filepath) as dataset: nodata = dataset.nodata if dataset.nodata is not None else self.nodata_value try: row, col = dataset.index(longitude, latitude) except IndexError: logging.warning(f"Coords ({longitude},{latitude}) outside tile bounds {tile_filepath}."); return None if not (0 <= row < dataset.height and 0 <= col < dataset.width): logging.warning(f"Pixel ({row},{col}) outside raster dims."); return None window = rasterio.windows.Window(col_off=col, row_off=row, width=1, height=1) elevation_value = dataset.read(1, window=window)[0, 0] if nodata is not None and int(elevation_value) == int(nodata): logging.warning(f"Coords fall on nodata point."); return float('nan') if not isinstance(elevation_value, (int, float, complex)) and not hasattr(elevation_value, 'item'): logging.error(f"Unexpected raster data type: {type(elevation_value)}"); return None elevation_meters = float(elevation_value) logging.info(f"Elevation found: {elevation_meters:.2f} meters") return elevation_meters except RasterioIOError as e: logging.error(f"Error reading raster file {tile_filepath}: {e}"); return None except FileNotFoundError: logging.error(f"Raster file not found despite check: {tile_filepath}"); return None except Exception as e: logging.error(f"Unexpected error reading elevation: {e}", exc_info=True); return None # === Metodo download_area modificato per chiamare _ensure_tile_available === def download_area(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> Tuple[int, int]: """Downloads HGT and browse images for all tiles in the specified area.""" logging.info(f"Starting HGT download & browse check for area: Lat [{min_lat}, {max_lat}], Lon [{min_lon}, {max_lon}]") try: # ... (validazione input invariata) ... if not (-90 <= min_lat < 90 and -90 <= max_lat < 90 and -180 <= min_lon < 180 and -180 <= max_lon < 180): raise ValueError("Coords out of valid range.") if min_lat >= max_lat or min_lon >= max_lon: raise ValueError("Invalid area bounds.") except ValueError as e: logging.error(f"Invalid input for download_area: {e}"); return 0, 0 start_lat_coord = math.floor(min_lat) end_lat_coord = math.floor(max_lat) start_lon_coord = math.floor(min_lon) end_lon_coord = math.floor(max_lon) logging.info(f"Tile coordinate range: Lat [{start_lat_coord}, {end_lat_coord}], Lon [{start_lon_coord}, {end_lon_coord}]") tiles_processed = 0 hgt_tiles_obtained = 0 # Conta HGT ottenuti con successo for lat_coord in range(int(start_lat_coord), int(end_lat_coord) + 1): if not (-60 <= lat_coord < 60): continue # Salta fuori copertura for lon_coord in range(int(start_lon_coord), int(end_lon_coord) + 1): tiles_processed += 1 logging.info(f"Ensuring tile available for coordinates: Lat {lat_coord}, Lon {lon_coord}") # _ensure_tile_available gestisce download HGT e browse if self._ensure_tile_available(lat_coord, lon_coord): hgt_tiles_obtained += 1 # L'errore per HGT è già loggato dentro _ensure_tile_available logging.info(f"Area processing complete. Tiles checked: {tiles_processed}, HGT tiles successfully obtained/found: {hgt_tiles_obtained}") return tiles_processed, hgt_tiles_obtained # --- End of ElevationManager class ---