597 lines
24 KiB
Python
597 lines
24 KiB
Python
# elevation_manager.py
|
|
|
|
import os
|
|
import math
|
|
import logging
|
|
import netrc
|
|
import time
|
|
import zipfile
|
|
import io
|
|
from typing import Tuple, Optional, Dict, List
|
|
import requests
|
|
from requests.auth import HTTPBasicAuth
|
|
import numpy as np # Importa numpy per il tipo di ritorno di get_hgt_data
|
|
|
|
# Importa rasterio qui se serve solo per get_hgt_data o internamente
|
|
try:
|
|
import rasterio
|
|
from rasterio.errors import RasterioIOError
|
|
|
|
RASTERIO_AVAILABLE = True
|
|
except ImportError:
|
|
RASTERIO_AVAILABLE = False
|
|
RasterioIOError = Exception # Definisci un fallback per l'eccezione
|
|
logging.warning(
|
|
"Rasterio library not found. Elevation data reading will not be available."
|
|
)
|
|
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
|
)
|
|
|
|
# --- Configuration ---
|
|
NASADEM_DATA_BASE_URL = (
|
|
"https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/NASADEM_HGT.001/"
|
|
)
|
|
NASADEM_BROWSE_BASE_URL = (
|
|
"https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-public/NASADEM_HGT.001/"
|
|
)
|
|
EARTHDATA_MACHINE = "urs.earthdata.nasa.gov"
|
|
BROWSE_IMAGE_SUFFIX_VERSION = ".1"
|
|
BROWSE_IMAGE_EXTENSION = ".jpg"
|
|
# --- End Configuration ---
|
|
|
|
|
|
class ElevationManager:
|
|
"""
|
|
Manages fetching, caching, and querying NASADEM HGT v001 elevation data
|
|
and associated browse images from NASA Earthdata Cloud.
|
|
Handles authentication, download, extraction, and provides access methods.
|
|
"""
|
|
|
|
def __init__(self, tile_directory: str = "map_elevation"):
|
|
"""
|
|
Initializes the ElevationManager.
|
|
|
|
Args:
|
|
tile_directory (str): Root directory for storing HGT data and browse images.
|
|
"""
|
|
if not isinstance(tile_directory, str) or not tile_directory:
|
|
raise ValueError("tile_directory must be a non-empty string")
|
|
|
|
self.root_directory: str = tile_directory
|
|
self.tile_directory: str = os.path.join(self.root_directory, "hgt_tiles")
|
|
self.browse_directory: str = os.path.join(self.root_directory, "browse_images")
|
|
self.nodata_value: Optional[int] = -32768
|
|
self.earthdata_credentials: Optional[Tuple[str, str]] = None
|
|
|
|
self._read_netrc_credentials()
|
|
self._setup_directories()
|
|
|
|
def _read_netrc_credentials(self):
|
|
"""Reads Earthdata Login credentials from the .netrc file."""
|
|
try:
|
|
netrc_path = os.path.join(os.path.expanduser("~"), ".netrc")
|
|
if not os.path.exists(netrc_path):
|
|
logging.warning(
|
|
f"Authentication Warning: .netrc file not found at {netrc_path}. Data downloads might fail."
|
|
)
|
|
return
|
|
netrc_info = netrc.netrc(netrc_path)
|
|
auth_tokens = netrc_info.authenticators(EARTHDATA_MACHINE)
|
|
if auth_tokens:
|
|
username, _, password = auth_tokens
|
|
if username and password:
|
|
self.earthdata_credentials = (username, password)
|
|
logging.info(
|
|
f"Successfully read credentials for {EARTHDATA_MACHINE} from .netrc."
|
|
)
|
|
else:
|
|
logging.error(
|
|
f"Incomplete credentials for {EARTHDATA_MACHINE} in .netrc."
|
|
)
|
|
else:
|
|
logging.warning(
|
|
f"No credentials found for machine '{EARTHDATA_MACHINE}' in {netrc_path}."
|
|
)
|
|
except (FileNotFoundError, netrc.NetrcParseError) as e:
|
|
logging.error(f"Error reading/parsing .netrc file: {e}")
|
|
except Exception as e:
|
|
logging.error(f"Unexpected error reading .netrc credentials: {e}")
|
|
|
|
def _setup_directories(self):
|
|
"""Creates the necessary cache directories."""
|
|
try:
|
|
os.makedirs(self.tile_directory, exist_ok=True)
|
|
os.makedirs(self.browse_directory, exist_ok=True)
|
|
logging.info(f"HGT tile cache directory set to: {self.tile_directory}")
|
|
logging.info(
|
|
f"Browse image cache directory set to: {self.browse_directory}"
|
|
)
|
|
except OSError as e:
|
|
logging.error(
|
|
f"Failed to create cache directories under {self.root_directory}: {e}"
|
|
)
|
|
raise # Rilancia errore critico
|
|
|
|
def _get_tile_base_coordinates(
|
|
self, latitude: float, longitude: float
|
|
) -> Tuple[int, int]:
|
|
"""Calculates integer coordinates of the bottom-left corner."""
|
|
if longitude >= 180.0:
|
|
longitude = 179.9999999
|
|
if longitude < -180.0:
|
|
longitude = -180.0
|
|
if not (-90 <= latitude < 90):
|
|
raise ValueError("Latitude must be between -90 and < 90 degrees.")
|
|
return int(math.floor(latitude)), int(math.floor(longitude))
|
|
|
|
def _get_nasadem_tile_base_name(self, lat_coord: int, lon_coord: int) -> str:
|
|
"""Generates the NASADEM base name component (e.g., 'n45e007')."""
|
|
lat_prefix = "n" if lat_coord >= 0 else "s"
|
|
lon_prefix = "e" if lon_coord >= 0 else "w"
|
|
return f"{lat_prefix}{abs(lat_coord):02d}{lon_prefix}{abs(lon_coord):03d}"
|
|
|
|
def _get_download_url(
|
|
self, tile_base_name: str, file_type: str = "data"
|
|
) -> Optional[str]:
|
|
"""Constructs the download URL based on file type."""
|
|
directory_name = f"NASADEM_HGT_{tile_base_name}"
|
|
if file_type == "data":
|
|
if not NASADEM_DATA_BASE_URL:
|
|
logging.error("DATA URL not configured.")
|
|
return None
|
|
filename = f"{directory_name}.zip"
|
|
url = f"{NASADEM_DATA_BASE_URL.rstrip('/')}/{directory_name}/{filename}"
|
|
log_prefix = "data"
|
|
elif file_type == "browse":
|
|
if not NASADEM_BROWSE_BASE_URL:
|
|
logging.error("BROWSE URL not configured.")
|
|
return None
|
|
filename = (
|
|
f"{directory_name}{BROWSE_IMAGE_SUFFIX_VERSION}{BROWSE_IMAGE_EXTENSION}"
|
|
)
|
|
url = f"{NASADEM_BROWSE_BASE_URL.rstrip('/')}/{directory_name}/{filename}"
|
|
log_prefix = "browse"
|
|
else:
|
|
logging.error(f"Invalid file_type: {file_type}")
|
|
return None
|
|
logging.debug(f"Constructed {log_prefix} URL: {url}")
|
|
return url
|
|
|
|
def _get_local_hgt_path(self, lat_coord: int, lon_coord: int) -> str:
|
|
"""Gets the full local path for the extracted HGT file."""
|
|
lat_prefix = "N" if lat_coord >= 0 else "S"
|
|
lon_prefix = "E" if lon_coord >= 0 else "W"
|
|
hgt_filename = (
|
|
f"{lat_prefix}{abs(lat_coord):02d}{lon_prefix}{abs(lon_coord):03d}.hgt"
|
|
)
|
|
return os.path.join(self.tile_directory, hgt_filename)
|
|
|
|
def _get_local_browse_path(self, lat_coord: int, lon_coord: int) -> str:
|
|
"""Gets the full local path for the downloaded browse image file."""
|
|
tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord)
|
|
browse_filename = f"NASADEM_HGT_{tile_base_name}{BROWSE_IMAGE_SUFFIX_VERSION}{BROWSE_IMAGE_EXTENSION}"
|
|
return os.path.join(self.browse_directory, browse_filename)
|
|
|
|
def _perform_download(
|
|
self,
|
|
download_url: str,
|
|
local_path: str,
|
|
use_auth: bool,
|
|
is_zip: bool,
|
|
max_retries: int = 1,
|
|
retry_delay: int = 5,
|
|
) -> Optional[bytes]:
|
|
"""Helper function to download a file with optional auth and retries."""
|
|
if not download_url:
|
|
return None
|
|
|
|
headers = {"User-Agent": "Mozilla/5.0 ..."} # Definire user agent completo
|
|
auth = (
|
|
HTTPBasicAuth(self.earthdata_credentials[0], self.earthdata_credentials[1])
|
|
if use_auth and self.earthdata_credentials
|
|
else None
|
|
)
|
|
session = requests.Session()
|
|
file_content = None
|
|
|
|
log_file_type = "ZIP DATA" if is_zip else "BROWSE IMAGE"
|
|
|
|
for attempt in range(max_retries + 1):
|
|
try:
|
|
# Scarica tutto in memoria per ora (sia zip che browse)
|
|
response = session.get(
|
|
download_url,
|
|
stream=False,
|
|
timeout=60 if is_zip else 30,
|
|
headers=headers,
|
|
auth=auth,
|
|
allow_redirects=True,
|
|
)
|
|
response.raise_for_status()
|
|
file_content = response.content
|
|
if not file_content:
|
|
logging.warning(
|
|
f"Downloaded {log_file_type} content is empty for {download_url}."
|
|
)
|
|
break
|
|
logging.debug(
|
|
f"{log_file_type} content downloaded ({len(file_content)} bytes)."
|
|
)
|
|
break # Success
|
|
except requests.exceptions.HTTPError as e:
|
|
status_code = e.response.status_code
|
|
logging.error(
|
|
f"HTTP error downloading {log_file_type} {download_url}: Status {status_code}"
|
|
)
|
|
if status_code in [404, 401, 403] or status_code < 500:
|
|
break # Non ritentare errori client
|
|
except requests.exceptions.RequestException as e:
|
|
logging.warning(
|
|
f"Network error ({type(e).__name__}) downloading {log_file_type} {download_url}. Retrying... (Attempt {attempt + 1}/{max_retries + 1})"
|
|
)
|
|
except Exception as e:
|
|
logging.error(
|
|
f"Unexpected error during {log_file_type} download {download_url}: {e}",
|
|
exc_info=True,
|
|
)
|
|
break
|
|
if attempt < max_retries:
|
|
time.sleep(retry_delay)
|
|
else:
|
|
logging.error(
|
|
f"{log_file_type} download failed for {download_url} after {max_retries + 1} attempts."
|
|
)
|
|
|
|
session.close()
|
|
return file_content
|
|
|
|
def _download_and_extract_hgt(self, lat_coord: int, lon_coord: int) -> bool:
|
|
"""Downloads zipped HGT (using auth) and extracts it."""
|
|
tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord)
|
|
local_hgt_path = self._get_local_hgt_path(lat_coord, lon_coord)
|
|
data_download_url = self._get_download_url(tile_base_name, file_type="data")
|
|
|
|
zip_content_bytes = self._perform_download(
|
|
data_download_url, "memory", use_auth=True, is_zip=True
|
|
)
|
|
|
|
if not zip_content_bytes:
|
|
logging.error(f"Failed to download HGT zip for ({lat_coord},{lon_coord}).")
|
|
return False
|
|
|
|
try:
|
|
with io.BytesIO(zip_content_bytes) as zip_content_stream:
|
|
with zipfile.ZipFile(zip_content_stream, "r") as zip_ref:
|
|
hgt_files = [
|
|
name
|
|
for name in zip_ref.namelist()
|
|
if name.lower().endswith(".hgt")
|
|
]
|
|
if not hgt_files:
|
|
logging.error(f"No .hgt in zip: {tile_base_name}.zip")
|
|
return False
|
|
hgt_filename = hgt_files[0]
|
|
with open(local_hgt_path, "wb") as f_out:
|
|
f_out.write(zip_ref.read(hgt_filename))
|
|
logging.info(f"Successfully extracted HGT to: {local_hgt_path}")
|
|
return True
|
|
except zipfile.BadZipFile:
|
|
logging.error(
|
|
f"Downloaded data for ({lat_coord},{lon_coord}) is not a valid zip."
|
|
)
|
|
return False
|
|
except Exception as e:
|
|
logging.error(
|
|
f"Failed to extract HGT for ({lat_coord},{lon_coord}): {e}",
|
|
exc_info=True,
|
|
)
|
|
return False
|
|
|
|
def _download_browse_image(self, lat_coord: int, lon_coord: int) -> bool:
|
|
"""Downloads the public browse image file if it doesn't exist locally."""
|
|
local_browse_path = self._get_local_browse_path(lat_coord, lon_coord)
|
|
if os.path.exists(local_browse_path):
|
|
return True # Già presente
|
|
|
|
tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord)
|
|
browse_download_url = self._get_download_url(tile_base_name, file_type="browse")
|
|
|
|
browse_content_bytes = self._perform_download(
|
|
browse_download_url, local_browse_path, use_auth=False, is_zip=False
|
|
)
|
|
|
|
if not browse_content_bytes:
|
|
logging.warning(
|
|
f"Failed to download browse image for ({lat_coord},{lon_coord})."
|
|
)
|
|
return False
|
|
|
|
try:
|
|
browse_dir = os.path.dirname(local_browse_path)
|
|
os.makedirs(browse_dir, exist_ok=True) # Assicura esista dir
|
|
with open(local_browse_path, "wb") as f_img:
|
|
f_img.write(browse_content_bytes)
|
|
logging.info(f"Successfully saved browse image to: {local_browse_path}")
|
|
return True
|
|
except Exception as e:
|
|
logging.error(
|
|
f"Failed to save browse image to {local_browse_path}: {e}",
|
|
exc_info=True,
|
|
)
|
|
return False
|
|
|
|
def _ensure_tile_available(self, lat_coord: int, lon_coord: int) -> bool:
|
|
"""Ensures HGT is available and attempts browse download. Returns HGT status."""
|
|
local_hgt_path = self._get_local_hgt_path(lat_coord, lon_coord)
|
|
hgt_available = os.path.exists(local_hgt_path)
|
|
|
|
if not hgt_available:
|
|
logging.info(
|
|
f"HGT for ({lat_coord},{lon_coord}) not found locally, attempting download."
|
|
)
|
|
hgt_available = self._download_and_extract_hgt(lat_coord, lon_coord)
|
|
|
|
if hgt_available:
|
|
# Se HGT ok, tenta download browse (non bloccante per il risultato)
|
|
self._download_browse_image(lat_coord, lon_coord)
|
|
else:
|
|
logging.error(f"Could not obtain HGT for ({lat_coord},{lon_coord}).")
|
|
|
|
return hgt_available
|
|
|
|
# --- Metodi Pubblici ---
|
|
|
|
def get_tile_info(self, latitude: float, longitude: float) -> Optional[Dict]:
|
|
"""
|
|
Returns dictionary with info about the tile containing the coordinates.
|
|
Ensures the tile (HGT and potentially browse) is downloaded if possible.
|
|
"""
|
|
try:
|
|
lat_coord, lon_coord = self._get_tile_base_coordinates(latitude, longitude)
|
|
except ValueError as e:
|
|
logging.error(f"Invalid coordinates for get_tile_info: {e}")
|
|
return None
|
|
|
|
# Assicura che HGT (e browse) siano tentati di scaricare
|
|
hgt_success = self._ensure_tile_available(lat_coord, lon_coord)
|
|
|
|
# Costruisci il dizionario informativo
|
|
tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord)
|
|
local_hgt_path = self._get_local_hgt_path(lat_coord, lon_coord)
|
|
local_browse_path = self._get_local_browse_path(lat_coord, lon_coord)
|
|
|
|
info = {
|
|
"latitude_coord": lat_coord,
|
|
"longitude_coord": lon_coord,
|
|
"tile_base_name": tile_base_name,
|
|
"hgt_file_path": local_hgt_path if os.path.exists(local_hgt_path) else None,
|
|
"browse_image_path": (
|
|
local_browse_path if os.path.exists(local_browse_path) else None
|
|
),
|
|
"hgt_available": hgt_success,
|
|
"browse_available": (
|
|
os.path.exists(local_browse_path) if local_browse_path else False
|
|
),
|
|
}
|
|
return info
|
|
|
|
def get_elevation(self, latitude: float, longitude: float) -> Optional[float]:
|
|
"""Gets elevation for a point, downloading data if needed."""
|
|
if not RASTERIO_AVAILABLE:
|
|
logging.error("Cannot get elevation: Rasterio library is not installed.")
|
|
return None
|
|
|
|
tile_info = self.get_tile_info(latitude, longitude)
|
|
|
|
if not tile_info or not tile_info.get("hgt_available"):
|
|
logging.warning(
|
|
f"Elevation not available: HGT data could not be obtained for ({latitude},{longitude})."
|
|
)
|
|
return None
|
|
|
|
tile_filepath = tile_info["hgt_file_path"]
|
|
if not tile_filepath: # Doppia verifica
|
|
logging.error("Inconsistency: HGT marked available but path is None.")
|
|
return None
|
|
|
|
try:
|
|
with rasterio.open(tile_filepath) as dataset:
|
|
nodata = (
|
|
dataset.nodata if dataset.nodata is not None else self.nodata_value
|
|
)
|
|
try:
|
|
row, col = dataset.index(longitude, latitude)
|
|
except IndexError:
|
|
logging.warning(f"Coords outside tile bounds {tile_filepath}.")
|
|
return None
|
|
if not (0 <= row < dataset.height and 0 <= col < dataset.width):
|
|
logging.warning(f"Pixel outside raster dims.")
|
|
return None
|
|
window = rasterio.windows.Window(
|
|
col_off=col, row_off=row, width=1, height=1
|
|
)
|
|
elevation_value = dataset.read(1, window=window)[0, 0]
|
|
if nodata is not None and int(elevation_value) == int(nodata):
|
|
logging.warning(f"Coords fall on nodata point.")
|
|
return float("nan")
|
|
if not isinstance(
|
|
elevation_value, (int, float, complex)
|
|
) and not hasattr(elevation_value, "item"):
|
|
logging.error(
|
|
f"Unexpected raster data type: {type(elevation_value)}"
|
|
)
|
|
return None
|
|
elevation_meters = float(elevation_value)
|
|
logging.info(f"Elevation found: {elevation_meters:.2f} meters")
|
|
return elevation_meters
|
|
except RasterioIOError as e:
|
|
logging.error(f"Rasterio error reading {tile_filepath}: {e}")
|
|
return None
|
|
except FileNotFoundError:
|
|
logging.error(f"HGT file not found for reading: {tile_filepath}")
|
|
return None
|
|
except Exception as e:
|
|
logging.error(
|
|
f"Unexpected error reading elevation from {tile_filepath}: {e}",
|
|
exc_info=True,
|
|
)
|
|
return None
|
|
|
|
def get_hgt_data(self, latitude: float, longitude: float) -> Optional[np.ndarray]:
|
|
"""Reads and returns the raw HGT data array for the corresponding tile."""
|
|
if not RASTERIO_AVAILABLE:
|
|
logging.error("Cannot get HGT data: Rasterio library is not installed.")
|
|
return None
|
|
|
|
tile_info = self.get_tile_info(latitude, longitude)
|
|
|
|
if not tile_info or not tile_info.get("hgt_available"):
|
|
logging.warning(
|
|
f"HGT data array not available for ({latitude},{longitude})."
|
|
)
|
|
return None
|
|
|
|
tile_filepath = tile_info["hgt_file_path"]
|
|
if not tile_filepath:
|
|
return None
|
|
|
|
try:
|
|
with rasterio.open(tile_filepath) as dataset:
|
|
data_array = dataset.read(1)
|
|
logging.info(
|
|
f"Successfully read HGT data array from {tile_filepath} (shape: {data_array.shape})"
|
|
)
|
|
return data_array
|
|
except Exception as e:
|
|
logging.error(
|
|
f"Failed to read HGT data from {tile_filepath}: {e}", exc_info=True
|
|
)
|
|
return None
|
|
|
|
def get_browse_image_path(self, latitude: float, longitude: float) -> Optional[str]:
|
|
"""Returns the local path to the browse image for the tile, if available."""
|
|
# Questo è un wrapper più semplice, riutilizza get_tile_info
|
|
tile_info = self.get_tile_info(latitude, longitude)
|
|
if tile_info and tile_info.get("browse_available"):
|
|
return tile_info["browse_image_path"]
|
|
# Non loggare qui, get_tile_info logga già se non trovato
|
|
return None
|
|
|
|
def get_area_tile_info(
|
|
self, min_lat: float, min_lon: float, max_lat: float, max_lon: float
|
|
) -> List[Dict]:
|
|
"""
|
|
Returns a list of tile info dictionaries for all tiles within the bounding box.
|
|
Does NOT trigger downloads, expects tiles to be potentially downloaded via download_area.
|
|
"""
|
|
tile_info_list = []
|
|
try:
|
|
# ... (validazione input bounds come in download_area) ...
|
|
if not (
|
|
-90 <= min_lat < 90
|
|
and -90 <= max_lat < 90
|
|
and -180 <= min_lon < 180
|
|
and -180 <= max_lon < 180
|
|
):
|
|
raise ValueError("Coords out of valid range.")
|
|
if min_lat >= max_lat or min_lon >= max_lon:
|
|
raise ValueError("Invalid area bounds.")
|
|
except ValueError as e:
|
|
logging.error(f"Invalid input for get_area_tile_info: {e}")
|
|
return []
|
|
|
|
start_lat = math.floor(min_lat)
|
|
end_lat = math.floor(max_lat)
|
|
start_lon = math.floor(min_lon)
|
|
end_lon = math.floor(max_lon)
|
|
|
|
logging.info(
|
|
f"Getting info for tiles in range: Lat [{start_lat}, {end_lat}], Lon [{start_lon}, {end_lon}]"
|
|
)
|
|
|
|
for lat_coord in range(int(start_lat), int(end_lat) + 1):
|
|
if not (-60 <= lat_coord < 60):
|
|
continue # Limiti copertura
|
|
for lon_coord in range(int(start_lon), int(end_lon) + 1):
|
|
# Ricostruisci info senza triggerare download qui
|
|
tile_base_name = self._get_nasadem_tile_base_name(lat_coord, lon_coord)
|
|
local_hgt_path = self._get_local_hgt_path(lat_coord, lon_coord)
|
|
local_browse_path = self._get_local_browse_path(lat_coord, lon_coord)
|
|
info = {
|
|
"latitude_coord": lat_coord,
|
|
"longitude_coord": lon_coord,
|
|
"tile_base_name": tile_base_name,
|
|
"hgt_file_path": (
|
|
local_hgt_path if os.path.exists(local_hgt_path) else None
|
|
),
|
|
"browse_image_path": (
|
|
local_browse_path if os.path.exists(local_browse_path) else None
|
|
),
|
|
"hgt_available": (
|
|
os.path.exists(local_hgt_path) if local_hgt_path else False
|
|
),
|
|
"browse_available": (
|
|
os.path.exists(local_browse_path)
|
|
if local_browse_path
|
|
else False
|
|
),
|
|
}
|
|
tile_info_list.append(info)
|
|
return tile_info_list
|
|
|
|
def download_area(
|
|
self, min_lat: float, min_lon: float, max_lat: float, max_lon: float
|
|
) -> Tuple[int, int]:
|
|
"""Downloads HGT and attempts browse download for all tiles in the area."""
|
|
logging.info(
|
|
f"Starting HGT download & browse check for area: Lat [{min_lat}, {max_lat}], Lon [{min_lon}, {max_lon}]"
|
|
)
|
|
try:
|
|
if not (
|
|
-90 <= min_lat < 90
|
|
and -90 <= max_lat < 90
|
|
and -180 <= min_lon < 180
|
|
and -180 <= max_lon < 180
|
|
):
|
|
raise ValueError("Coords out of valid range.")
|
|
if min_lat >= max_lat or min_lon >= max_lon:
|
|
raise ValueError("Invalid area bounds.")
|
|
except ValueError as e:
|
|
logging.error(f"Invalid input for download_area: {e}")
|
|
return 0, 0
|
|
|
|
start_lat = math.floor(min_lat)
|
|
end_lat = math.floor(max_lat)
|
|
start_lon = math.floor(min_lon)
|
|
end_lon = math.floor(max_lon)
|
|
logging.info(
|
|
f"Tile coordinate range: Lat [{start_lat}, {end_lat}], Lon [{start_lon}, {end_lon}]"
|
|
)
|
|
|
|
tiles_processed = 0
|
|
hgt_tiles_obtained = 0 # Conta HGT ottenuti/trovati con successo
|
|
|
|
for lat_coord in range(int(start_lat), int(end_lat) + 1):
|
|
if not (-60 <= lat_coord < 60):
|
|
continue
|
|
for lon_coord in range(int(start_lon), int(end_lon) + 1):
|
|
tiles_processed += 1
|
|
logging.debug(
|
|
f"Ensuring tile available for ({lat_coord},{lon_coord}) in download_area."
|
|
)
|
|
# Questa chiamata tenta download HGT e poi browse
|
|
if self._ensure_tile_available(lat_coord, lon_coord):
|
|
hgt_tiles_obtained += 1
|
|
# Errore già loggato internamente se fallisce
|
|
|
|
logging.info(
|
|
f"Area processing complete. Tiles checked: {tiles_processed}, HGT tiles successfully obtained/found: {hgt_tiles_obtained}"
|
|
)
|
|
return tiles_processed, hgt_tiles_obtained
|
|
|
|
|
|
# --- End of ElevationManager class ---
|