SXXXXXXX_GeoElevation/elevation_manager.py
2025-04-14 12:17:43 +02:00

392 lines
22 KiB
Python

# elevation_manager.py
import os
import math
import logging
import netrc
import time
import zipfile
import io
from typing import Tuple, Optional
import requests
from requests.auth import HTTPBasicAuth
import rasterio
from rasterio.errors import RasterioIOError
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Configuration ---
# URL Base per i DATI HGT (protetti, richiedono auth)
NASADEM_DATA_BASE_URL = "https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/NASADEM_HGT.001/"
# URL Base per le IMMAGINI BROWSE (pubbliche, senza auth)
NASADEM_BROWSE_BASE_URL = "https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-public/NASADEM_HGT.001/"
EARTHDATA_MACHINE = 'urs.earthdata.nasa.gov'
# Estensione e suffisso per le immagini browse pubbliche
BROWSE_IMAGE_SUFFIX_VERSION = ".1" # Il ".1" osservato
BROWSE_IMAGE_EXTENSION = ".jpg"
# --- End Configuration ---
class ElevationManager:
"""
Manages NASADEM HGT v001 data & browse images from NASA Earthdata Cloud.
Downloads protected HGT data (zip) using .netrc auth.
Downloads public browse images (jpg) without auth.
"""
def __init__(self, tile_directory: str = "map_elevation"):
# ... (init come prima, incluso _read_netrc_credentials) ...
if not isinstance(tile_directory, str) or not tile_directory:
raise ValueError("tile_directory must be a non-empty string")
self.tile_directory: str = tile_directory
self.browse_directory: str = os.path.join(tile_directory, "browse_images")
self.nodata_value: Optional[int] = -32768
self.earthdata_credentials: Optional[Tuple[str, str]] = None
self._read_netrc_credentials()
try:
os.makedirs(self.tile_directory, exist_ok=True)
os.makedirs(self.browse_directory, exist_ok=True)
logging.info(f"Tile cache directory set to: {self.tile_directory}")
logging.info(f"Browse image cache directory set to: {self.browse_directory}")
except OSError as e:
logging.error(f"Failed to create directories under {tile_directory}: {e}")
raise
def _read_netrc_credentials(self):
# ... (come prima) ...
try:
netrc_path = os.path.join(os.path.expanduser("~"), ".netrc")
if not os.path.exists(netrc_path):
logging.error(f"Authentication Error: .netrc file not found at {netrc_path}.")
return
netrc_info = netrc.netrc(netrc_path)
auth_tokens = netrc_info.authenticators(EARTHDATA_MACHINE)
if auth_tokens:
username, _, password = auth_tokens
if username and password:
self.earthdata_credentials = (username, password)
logging.info(f"Successfully read credentials for {EARTHDATA_MACHINE} from .netrc.")
else:
logging.error(f"Incomplete credentials found for {EARTHDATA_MACHINE} in .netrc.")
else:
logging.error(f"No credentials found for machine '{EARTHDATA_MACHINE}' in {netrc_path}.")
except (FileNotFoundError, netrc.NetrcParseError) as e:
logging.error(f"Error reading or parsing .netrc file: {e}")
except Exception as e:
logging.error(f"Unexpected error reading .netrc credentials: {e}")
def _get_tile_base_coordinates(self, latitude: float, longitude: float) -> Tuple[int, int]:
# ... (come prima) ...
if longitude >= 180.0: longitude = 179.9999999
if longitude < -180.0: longitude = -180.0
if not (-90 <= latitude < 90): raise ValueError("Latitude must be between -90 and < 90 degrees.")
lat_coord = math.floor(latitude)
lon_coord = math.floor(longitude)
return int(lat_coord), int(lon_coord)
def _get_nasadem_tile_base_name(self, lat_coord: int, lon_coord: int) -> str:
# ... (come prima, es. 'n45e007') ...
lat_prefix = 'n' if lat_coord >= 0 else 's'
lon_prefix = 'e' if lon_coord >= 0 else 'w'
base_name = f"{lat_prefix}{abs(lat_coord):02d}{lon_prefix}{abs(lon_coord):03d}"
return base_name
# === MODIFICATO: Usa URL base diversi e nomi file diversi ===
def _get_download_url(self, tile_base_name: str, file_type: str = 'data') -> Optional[str]:
"""Constructs the download URL based on file type (data or browse)."""
directory_name = f"NASADEM_HGT_{tile_base_name}" # Es: NASADEM_HGT_n45e007
if file_type == 'data':
if not NASADEM_DATA_BASE_URL:
logging.error("NASADEM_DATA_BASE_URL is not configured.")
return None
zip_filename = f"{directory_name}.zip"
url = f"{NASADEM_DATA_BASE_URL.rstrip('/')}/{directory_name}/{zip_filename}"
logging.debug(f"Constructed data URL: {url}")
return url
elif file_type == 'browse':
if not NASADEM_BROWSE_BASE_URL:
logging.error("NASADEM_BROWSE_BASE_URL is not configured.")
return None
# Costruisce nome file browse con suffisso/versione e estensione
browse_filename = f"{directory_name}{BROWSE_IMAGE_SUFFIX_VERSION}{BROWSE_IMAGE_EXTENSION}" # Es: NASADEM_HGT_n45e007.1.jpg
url = f"{NASADEM_BROWSE_BASE_URL.rstrip('/')}/{directory_name}/{browse_filename}"
logging.debug(f"Constructed browse URL: {url}")
return url
else:
logging.error(f"Invalid file_type specified: {file_type}")
return None
def _get_local_tile_path(self, lat_coord: int, lon_coord: int) -> str:
# ... (come prima, percorso per .hgt estratto) ...
lat_prefix = 'N' if lat_coord >= 0 else 'S'
lon_prefix = 'E' if lon_coord >= 0 else 'W'
hgt_filename = f"{lat_prefix}{abs(lat_coord):02d}{lon_prefix}{abs(lon_coord):03d}.hgt"
return os.path.join(self.tile_directory, hgt_filename)
# === MODIFICATO: Usa il nome file browse corretto per il path locale ===
def _get_local_browse_path(self, lat_coord: int, lon_coord: int) -> str:
"""Gets the full local path for the downloaded browse image file."""
tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord)
# Usa suffisso/versione ed estensione corretti per il nome locale
browse_filename = f"NASADEM_HGT_{tile_base_name}{BROWSE_IMAGE_SUFFIX_VERSION}{BROWSE_IMAGE_EXTENSION}" # Es: NASADEM_HGT_n45e007.1.jpg
return os.path.join(self.browse_directory, browse_filename)
# === MODIFICATO: Separato download dati (con auth) e browse (senza auth) ===
def _download_and_unzip_tile(self, lat_coord: int, lon_coord: int, max_retries: int = 1, retry_delay: int = 5) -> bool:
"""
Downloads zipped HGT (using auth), extracts HGT. Returns True on HGT success.
Does NOT download browse image here anymore.
"""
if not self.earthdata_credentials:
logging.error("Cannot attempt HGT download: Earthdata credentials not loaded.")
return False
tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord)
local_hgt_path = self._get_local_tile_path(lat_coord, lon_coord)
data_download_url = self._get_download_url(tile_base_name, file_type='data')
if not data_download_url: return False
logging.info(f"Attempting to download DATA ZIP: {data_download_url}")
headers = {'User-Agent': 'Mozilla/5.0 ...'} # User agent come prima
auth = HTTPBasicAuth(self.earthdata_credentials[0], self.earthdata_credentials[1])
session = requests.Session()
hgt_download_ok = False
zip_content = None
# Blocco Download ZIP Dati (invariato, usa auth)
for attempt in range(max_retries + 1):
try:
response = session.get(data_download_url, stream=True, timeout=60, headers=headers, auth=auth, allow_redirects=True)
response.raise_for_status()
zip_content = io.BytesIO(response.content)
logging.debug("Data ZIP downloaded successfully.")
hgt_download_ok = True
break
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
logging.error(f"HTTP error downloading DATA {data_download_url}: Status {status_code}, Response: {e.response.text[:200]}")
if status_code in [404, 401, 403] or status_code < 500 : break
except requests.exceptions.RequestException as e:
logging.warning(f"Network error ({type(e).__name__}) downloading DATA {data_download_url}. Retrying... (Attempt {attempt + 1}/{max_retries + 1})")
except Exception as e:
logging.error(f"Unexpected error during DATA ZIP download {tile_base_name}.zip: {e}", exc_info=True)
session.close(); return False
if attempt < max_retries: time.sleep(retry_delay)
else: logging.error(f"DATA ZIP Download failed for {data_download_url} after {max_retries + 1} attempts.")
session.close() # Chiudi sessione dopo download dati
# Blocco Estrazione HGT (invariato)
hgt_extraction_ok = False
if hgt_download_ok and zip_content:
try:
with zipfile.ZipFile(zip_content, 'r') as zip_ref:
hgt_files_in_zip = [name for name in zip_ref.namelist() if name.lower().endswith('.hgt')]
if not hgt_files_in_zip:
logging.error(f"No .hgt file found inside {tile_base_name}.zip")
else:
hgt_filename_in_zip = hgt_files_in_zip[0]
with open(local_hgt_path, 'wb') as f_out:
f_out.write(zip_ref.read(hgt_filename_in_zip))
logging.info(f"Successfully extracted HGT to: {local_hgt_path}")
hgt_extraction_ok = True
except zipfile.BadZipFile: logging.error(f"Downloaded {data_download_url} is not a valid zip.")
except Exception as e: logging.error(f"Failed to extract {tile_base_name}.zip: {e}", exc_info=True)
return hgt_extraction_ok # Restituisce successo solo per HGT
# === NUOVO METODO: Download separato per browse image (senza auth) ===
def _download_browse_image(self, lat_coord: int, lon_coord: int, max_retries: int = 1, retry_delay: int = 3) -> bool:
"""Downloads the public browse image file if it doesn't exist locally."""
tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord)
local_browse_path = self._get_local_browse_path(lat_coord, lon_coord)
browse_download_url = self._get_download_url(tile_base_name, file_type='browse')
if os.path.exists(local_browse_path):
logging.debug(f"Browse image already exists locally: {local_browse_path}")
return True
if not browse_download_url: return False
logging.info(f"Attempting to download BROWSE IMAGE: {browse_download_url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
session = requests.Session() # Nuova sessione, non autenticata
download_successful = False
browse_content = None
# Blocco Download Contenuto Immagine
for attempt in range(max_retries + 1):
try:
# Usiamo stream=False, assumendo browse non enormi, per semplicità
browse_response = session.get(browse_download_url, stream=False, timeout=30, headers=headers, allow_redirects=True)
browse_response.raise_for_status()
browse_content = browse_response.content
if not browse_content:
logging.warning(f"Downloaded browse image content is empty for {browse_download_url}.")
break # Tratta come fallimento se vuoto
logging.debug(f"Browse image content downloaded successfully (size: {len(browse_content)} bytes).")
download_successful = True
break # Successo
except requests.exceptions.HTTPError as e_img:
status_code = e_img.response.status_code
logging.warning(f"Failed to download BROWSE image {browse_download_url}: Status {status_code}")
if status_code < 500: break # Non ritentare errori client
except requests.exceptions.RequestException as e_img:
logging.warning(f"Network error downloading BROWSE image {browse_download_url}: {e_img}. Retrying... (Attempt {attempt + 1}/{max_retries + 1})")
except Exception as e_img:
logging.warning(f"Unexpected error during BROWSE image download request {browse_download_url}: {e_img}")
break
if attempt < max_retries: time.sleep(retry_delay)
else: logging.error(f"BROWSE image download failed for {browse_download_url} after {max_retries + 1} attempts.")
session.close()
# Blocco Scrittura File (solo se download OK e contenuto non vuoto)
if download_successful and browse_content:
try:
# === DIAGNOSI: Controlla/Crea directory PRIMA di aprire ===
browse_dir = os.path.dirname(local_browse_path)
if not os.path.exists(browse_dir):
logging.warning(f"Browse directory '{browse_dir}' not found before writing. Attempting to create.")
try:
os.makedirs(browse_dir, exist_ok=True) # Tenta (ri)creazione
logging.info(f"Successfully created browse directory: {browse_dir}")
except Exception as mkdir_e:
logging.error(f"Failed to create browse directory '{browse_dir}': {mkdir_e}")
return False # Impossibile salvare
else:
logging.debug(f"Browse directory '{browse_dir}' exists.")
# === FINE DIAGNOSI ===
logging.debug(f"Attempting to write browse image to: {local_browse_path}")
with open(local_browse_path, 'wb') as f_img:
f_img.write(browse_content) # Scrivi contenuto in un colpo solo
logging.info(f"Successfully saved browse image to: {local_browse_path}")
return True # Scrittura riuscita
except (IOError, OSError) as e_write:
logging.error(f"Failed to write browse image to {local_browse_path}: {e_write}")
if os.path.exists(local_browse_path):
try: os.remove(local_browse_path) # Tenta rimozione parziale
except Exception as e_del: logging.warning(f"Could not remove partial file {local_browse_path}: {e_del}")
return False
except Exception as e_other:
logging.error(f"Unexpected error saving browse image to {local_browse_path}: {e_other}", exc_info=True)
return False
else:
logging.debug("Browse image download did not succeed or content was empty, skipping save.")
return False # Fallito
# === MODIFICATO: Chiama download separati ===
def _ensure_tile_available(self, lat_coord: int, lon_coord: int) -> bool:
"""
Ensures HGT is available (downloads+extracts if needed).
Independently ensures browse image is available (downloads if needed).
Returns True if HGT is available, False otherwise.
"""
hgt_local_path = self._get_local_tile_path(lat_coord, lon_coord)
hgt_available = False
if os.path.exists(hgt_local_path):
logging.debug(f"HGT Tile already exists locally: {hgt_local_path}")
hgt_available = True
else:
logging.info(f"Extracted HGT tile not found locally, attempting download and extraction: {hgt_local_path}")
# Tenta download+estrazione HGT
if self._download_and_unzip_tile(lat_coord, lon_coord):
hgt_available = True
else:
logging.error(f"Failed to obtain HGT tile for coordinates ({lat_coord},{lon_coord}).")
return False # Se HGT fallisce, l'operazione principale fallisce
# Se HGT è disponibile (o era già presente), tenta il download del browse (se non già presente)
if hgt_available:
self._download_browse_image(lat_coord, lon_coord) # Tenta download browse, ignora risultato bool qui
return hgt_available # Ritorna successo basato solo su HGT
# === Metodo get_browse_image_path rimane invariato ===
def get_browse_image_path(self, latitude: float, longitude: float) -> Optional[str]:
"""Returns the local path to the browse image, if it exists."""
try:
lat_coord, lon_coord = self._get_tile_base_coordinates(latitude, longitude)
local_browse_path = self._get_local_browse_path(lat_coord, lon_coord)
if os.path.exists(local_browse_path):
return local_browse_path
else:
logging.debug(f"Browse image not found locally at expected path: {local_browse_path}")
return None
except ValueError: return None
except Exception as e: logging.error(f"Error getting browse image path: {e}"); return None
# === Metodo get_elevation rimane invariato ===
def get_elevation(self, latitude: float, longitude: float) -> Optional[float]:
"""Gets elevation using cached NASADEM data. Ensures HGT and attempts browse download."""
logging.info(f"Requesting elevation for Lat: {latitude}, Lon: {longitude}")
try:
lat_coord, lon_coord = self._get_tile_base_coordinates(latitude, longitude)
except ValueError as e: logging.error(f"Invalid coordinates: {e}"); return None
# Questa chiamata ora gestisce HGT e browse separatamente
if not self._ensure_tile_available(lat_coord, lon_coord):
# Errore già loggato da _ensure_tile_available se HGT fallisce
return None
tile_filepath = self._get_local_tile_path(lat_coord, lon_coord)
# ... (resto della logica di lettura rasterio invariata) ...
try:
with rasterio.open(tile_filepath) as dataset:
nodata = dataset.nodata if dataset.nodata is not None else self.nodata_value
try: row, col = dataset.index(longitude, latitude)
except IndexError: logging.warning(f"Coords ({longitude},{latitude}) outside tile bounds {tile_filepath}."); return None
if not (0 <= row < dataset.height and 0 <= col < dataset.width): logging.warning(f"Pixel ({row},{col}) outside raster dims."); return None
window = rasterio.windows.Window(col_off=col, row_off=row, width=1, height=1)
elevation_value = dataset.read(1, window=window)[0, 0]
if nodata is not None and int(elevation_value) == int(nodata): logging.warning(f"Coords fall on nodata point."); return float('nan')
if not isinstance(elevation_value, (int, float, complex)) and not hasattr(elevation_value, 'item'): logging.error(f"Unexpected raster data type: {type(elevation_value)}"); return None
elevation_meters = float(elevation_value)
logging.info(f"Elevation found: {elevation_meters:.2f} meters")
return elevation_meters
except RasterioIOError as e: logging.error(f"Error reading raster file {tile_filepath}: {e}"); return None
except FileNotFoundError: logging.error(f"Raster file not found despite check: {tile_filepath}"); return None
except Exception as e: logging.error(f"Unexpected error reading elevation: {e}", exc_info=True); return None
# === Metodo download_area modificato per chiamare _ensure_tile_available ===
def download_area(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> Tuple[int, int]:
"""Downloads HGT and browse images for all tiles in the specified area."""
logging.info(f"Starting HGT download & browse check for area: Lat [{min_lat}, {max_lat}], Lon [{min_lon}, {max_lon}]")
try:
# ... (validazione input invariata) ...
if not (-90 <= min_lat < 90 and -90 <= max_lat < 90 and -180 <= min_lon < 180 and -180 <= max_lon < 180): raise ValueError("Coords out of valid range.")
if min_lat >= max_lat or min_lon >= max_lon: raise ValueError("Invalid area bounds.")
except ValueError as e: logging.error(f"Invalid input for download_area: {e}"); return 0, 0
start_lat_coord = math.floor(min_lat)
end_lat_coord = math.floor(max_lat)
start_lon_coord = math.floor(min_lon)
end_lon_coord = math.floor(max_lon)
logging.info(f"Tile coordinate range: Lat [{start_lat_coord}, {end_lat_coord}], Lon [{start_lon_coord}, {end_lon_coord}]")
tiles_processed = 0
hgt_tiles_obtained = 0 # Conta HGT ottenuti con successo
for lat_coord in range(int(start_lat_coord), int(end_lat_coord) + 1):
if not (-60 <= lat_coord < 60): continue # Salta fuori copertura
for lon_coord in range(int(start_lon_coord), int(end_lon_coord) + 1):
tiles_processed += 1
logging.info(f"Ensuring tile available for coordinates: Lat {lat_coord}, Lon {lon_coord}")
# _ensure_tile_available gestisce download HGT e browse
if self._ensure_tile_available(lat_coord, lon_coord):
hgt_tiles_obtained += 1
# L'errore per HGT è già loggato dentro _ensure_tile_available
logging.info(f"Area processing complete. Tiles checked: {tiles_processed}, HGT tiles successfully obtained/found: {hgt_tiles_obtained}")
return tiles_processed, hgt_tiles_obtained
# --- End of ElevationManager class ---