ok view point on map

This commit is contained in:
VALLONGOL 2025-05-13 16:01:44 +02:00
parent e7ca4855ab
commit 6925e8e323
6 changed files with 2104 additions and 380 deletions

View File

@ -19,7 +19,7 @@ DEFAULT_MAP_TILE_CACHE_ROOT_DIR: str = "geoelevation_map_tile_cache"
# --- Map Viewer Default Settings ---
# Default zoom level for map displays when not otherwise specified.
DEFAULT_MAP_DISPLAY_ZOOM_LEVEL: int = 15
DEFAULT_MAP_DISPLAY_ZOOM_LEVEL: int = 3
# Default area size (in kilometers) to display around a point on the map
# when showing a map for a single point.

View File

@ -146,29 +146,49 @@ def run_map_viewer_process_target(
display_scale_factor: float
) -> None:
child_logger = logging.getLogger("GeoElevationMapViewerChildProcess")
# Configure logger if it hasn't been configured in this process yet
if not child_logger.handlers:
stream_handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
child_logger.addHandler(stream_handler)
child_logger.setLevel(logging.INFO)
child_logger.propagate = False
# Use basicConfig as this is a fresh process, ensure it doesn't inherit handlers from root
# and send output to sys.stdout/stderr which is captured by the parent process's terminal.
# Level should ideally be inherited or passed, but setting INFO here as a default.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stdout # Explicitly route to stdout
)
child_map_viewer_instance: Optional[Any] = None
child_map_system_ok = False
try:
# Attempt to import components needed in the child process
from geoelevation.map_viewer.geo_map_viewer import GeoElevationMapViewer as ChildGeoMapViewer
from geoelevation.elevation_manager import ElevationManager as ChildElevationManager
import cv2 as child_cv2
import cv2 as child_cv2 # Ensure cv2 is available in the child process
child_map_system_ok = True
except ImportError as e_child_imp_map_corrected:
child_logger.error(f"Map viewer components or OpenCV not found in child: {e_child_imp_map_corrected}")
child_map_system_ok = False
child_logger.error(f"Map viewer components or OpenCV not found in child process: {e_child_imp_map_corrected}")
# MODIFIED: Send an error message back to the GUI queue if critical imports fail in child process.
# WHY: The GUI process needs to know that the map process failed to start properly.
# HOW: Put a specific error message in the queue before exiting.
error_payload = {"type": "map_info_update", "latitude": None, "longitude": None,
"elevation_str": f"Fatal Error: {type(e_child_imp_map_corrected).__name__}",
"map_area_size_str": "Map System N/A"}
try: map_interaction_q.put(error_payload)
except Exception as e_put_err: child_logger.error(f"Failed to put error payload to queue before exit: {e_put_err}")
# Note: No return here yet, the finally block will execute.
# Let's add a return right after sending the error.
return # Exit if critical libraries are missing
if not child_map_system_ok:
return
# MODIFIED: Added the main operational logic for the map viewer process.
# WHY: Centralizes the try/except/finally for the core map viewing loop.
# HOW: Wrapped the map viewer initialization and display logic in a try block.
try:
child_logger.info(f"Initializing for mode '{operation_mode}', display scale: {display_scale_factor:.2f}")
# MODIFIED: Pass the correct DEM cache directory path to the child ElevationManager.
# WHY: The map viewer process needs its own ElevationManager instance, configured with the same cache path.
# HOW: Used the dem_data_cache_dir parameter passed from the GUI.
local_em = ChildElevationManager(tile_directory=dem_data_cache_dir)
child_map_viewer_instance = ChildGeoMapViewer(
elevation_manager_instance=local_em,
@ -176,35 +196,76 @@ def run_map_viewer_process_target(
initial_display_scale=display_scale_factor
)
if operation_mode == "point" and center_latitude is not None and center_longitude is not None:
# MODIFIED: Call display_map_for_point with the center coordinates.
# WHY: This method contains the logic to stitch the map, draw markers, and send initial info.
# HOW: Called the method on the initialized child_map_viewer_instance.
child_map_viewer_instance.display_map_for_point(center_latitude, center_longitude)
elif operation_mode == "area" and area_bounding_box:
# MODIFIED: Call display_map_for_area with the area bounding box.
# WHY: This method handles stitching the map for an area and potentially drawing a boundary.
# HOW: Called the method on the initialized child_map_viewer_instance.
child_map_viewer_instance.display_map_for_area(area_bounding_box)
else:
child_logger.error(f"Invalid mode ('{operation_mode}') or missing parameters.")
if hasattr(child_map_viewer_instance, 'shutdown'): child_map_viewer_instance.shutdown()
return
child_logger.info("Display method called. Map window should be active.")
child_logger.error(f"Invalid mode ('{operation_mode}') or missing parameters passed to map process.")
# MODIFIED: Send an error message to the GUI queue if the mode or parameters are invalid.
# WHY: The GUI process needs feedback if the map process received bad instructions.
# HOW: Put a specific error message in the queue before exiting.
error_payload = {"type": "map_info_update", "latitude": None, "longitude": None,
"elevation_str": f"Fatal Error: Invalid Map Args",
"map_area_size_str": "Invalid Args"}
try: map_interaction_q.put(error_payload)
except Exception as e_put_err: child_logger.error(f"Failed to put error payload to queue before exit: {e_put_err}")
# No need to call shutdown here, as the instance might not be fully initialized
return # Exit on invalid parameters
child_logger.info("Map display method called. Entering event loop.")
# The map display window (managed by MapDisplayWindow) runs its own internal
# event loop when cv2.imshow is called periodically, and handles user input via callbacks.
# The loop here just needs to keep the process alive and allow the window
# to process events via waitKey. We also check if the window is still alive.
is_map_active = True
while is_map_active:
# cv2.waitKey(milliseconds) is crucial for processing window events and mouse callbacks
key = child_cv2.waitKey(100) # Check for key press every 100ms (optional, but good practice)
# Check if the window still exists and is active.
# is_window_alive() uses cv2.getWindowProperty which is thread-safe.
if child_map_viewer_instance.map_display_window_controller:
if child_map_viewer_instance.map_display_window_controller.is_window_alive():
child_cv2.waitKey(100)
# Optional: Check for specific keys to close the window (e.g., 'q' or Escape)
if key != -1: # A key was pressed
child_logger.debug(f"Map window received key press: {key}")
if key == ord('q') or key == 27: # 'q' or Escape key
child_logger.info("Map window closing due to 'q' or Escape key press.")
is_map_active = False # Signal loop to exit
else:
child_logger.info("Map window reported as not alive by is_window_alive().")
is_map_active = False
is_map_active = False # Signal loop to exit if window closed by user 'X' button
else:
child_logger.info("map_display_window_controller is None. Assuming window closed.")
is_map_active = False
child_logger.info("Map window loop in child process exited.")
child_logger.info("map_display_window_controller is None. Assuming window closed or initialization failed.")
is_map_active = False # Signal loop to exit
child_logger.info("Map window process event loop exited.")
except Exception as e_map_proc_child_fatal_final:
child_logger.error(f"Fatal error in map viewer process target: {e_map_proc_child_fatal_final}", exc_info=True)
# MODIFIED: Send a fatal error message back to the GUI queue if an exception occurs in the main loop.
# WHY: The GUI process needs to know if the map process crashed unexpectedly.
# HOW: Put a specific error message in the queue before exiting.
error_payload = {"type": "map_info_update", "latitude": None, "longitude": None,
"elevation_str": f"Fatal Error: {type(e_map_proc_child_fatal_final).__name__}",
"map_area_size_str": "Fatal Error"}
try: map_interaction_q.put(error_payload)
except Exception as e_put_err: child_logger.error(f"Failed to put error payload to queue before exit: {e_put_err}")
finally:
# MODIFIED: Ensure shutdown is called regardless of how the process loop exits.
# WHY: Cleans up OpenCV resources.
# HOW: Moved the shutdown call into the finally block.
if child_map_viewer_instance and hasattr(child_map_viewer_instance, 'shutdown'):
child_map_viewer_instance.shutdown()
child_logger.info("Terminated child map viewer process.")
# --- END MULTIPROCESSING TARGET FUNCTIONS ---
class ElevationApp:
"""Main application class for the GeoElevation Tool GUI."""
@ -230,7 +291,7 @@ class ElevationApp:
# HOW: Replaced the hardcoded or previously mis-aliased variable name
# with the correct imported constant. The try...except block ensures
# a fallback is used if the import somehow fails.
default_dem_cache = GEOELEVATION_DEM_CACHE_DEFAULT
default_dem_cache = GEOELEVATION_DEM_CACHE_DEFAULT
if self.elevation_manager is None:
try:
@ -264,6 +325,11 @@ class ElevationApp:
self.map_viewer_process_handle: Optional[multiprocessing.Process] = None
self.map_interaction_message_queue: Optional[multiprocessing.Queue] = None
# MODIFIED: Store the default DEM cache path to pass to the map viewer process.
# WHY: The map process needs this path to initialize its own ElevationManager.
# HOW: Assigned the resolved default_dem_cache to a new instance attribute.
self._dem_data_cache_dir_for_map_process = default_dem_cache
if MAP_VIEWER_SYSTEM_AVAILABLE:
try:
self.map_interaction_message_queue = multiprocessing.Queue()
@ -274,7 +340,14 @@ class ElevationApp:
self._apply_initial_widget_states()
if self.map_interaction_message_queue:
# MODIFIED: Start processing the map queue periodically.
# WHY: The GUI needs to receive messages from the map viewer process.
# HOW: Scheduled the _process_map_interaction_queue_messages method to run after a delay.
self.root.after(100, self._process_map_interaction_queue_messages)
# MODIFIED: Handle window closing event gracefully.
# WHY: To terminate the separate map process when the main GUI closes.
# HOW: Set the WM_DELETE_WINDOW protocol to call _on_application_closing.
self.root.protocol("WM_DELETE_WINDOW", self._on_application_closing)
def _get_scale_display_text_from_value(self, target_scale_value: float) -> str:
@ -356,7 +429,7 @@ class ElevationApp:
self.view_map_for_area_button.grid(row=4, column=0, columnspan=4, pady=5, sticky=(tk.W,tk.E))
self.area_download_status_label = ttk.Label(area_download_frame, text="Status: Idle", wraplength=400, justify=tk.LEFT)
self.area_download_status_label.grid(row=5, column=0, columnspan=4, sticky=tk.W, pady=5)
map_display_options_frame = ttk.LabelFrame(main_app_frame, text="Map Display Options", padding="10")
map_display_options_frame.grid(row=2, column=0, padx=5, pady=5, sticky=(tk.W, tk.E))
map_display_options_frame.columnconfigure(1, weight=1)
@ -373,18 +446,28 @@ class ElevationApp:
self.map_scale_combobox.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=5, pady=3)
self.map_scale_combobox.bind("<<ComboboxSelected>>", self._on_map_display_scale_changed)
map_click_info_frame = ttk.LabelFrame(main_app_frame, text="Map Click Information", padding="10")
map_click_info_frame.grid(row=3, column=0, padx=5, pady=5, sticky=(tk.W, tk.E))
map_click_info_frame.columnconfigure(1, weight=1)
ttk.Label(map_click_info_frame, text="Clicked Latitude:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=2)
self.map_click_latitude_label = ttk.Label(map_click_info_frame, text="N/A")
# MODIFIED: Changed LabelFrame text from "Map Click Information" to "Map Info".
# WHY: The panel will now show information beyond just the click location (e.g., area size).
# HOW: Updated the text option in the LabelFrame constructor.
map_info_frame = ttk.LabelFrame(main_app_frame, text="Map Info", padding="10")
map_info_frame.grid(row=3, column=0, padx=5, pady=5, sticky=(tk.W, tk.E))
map_info_frame.columnconfigure(1, weight=1)
ttk.Label(map_info_frame, text="Latitude:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=2)
self.map_click_latitude_label = ttk.Label(map_info_frame, text="N/A") # Keep name for now for less refactoring
self.map_click_latitude_label.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=5, pady=2)
ttk.Label(map_click_info_frame, text="Clicked Longitude:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=2)
self.map_click_longitude_label = ttk.Label(map_click_info_frame, text="N/A")
ttk.Label(map_info_frame, text="Longitude:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=2)
self.map_click_longitude_label = ttk.Label(map_info_frame, text="N/A") # Keep name for now
self.map_click_longitude_label.grid(row=1, column=1, sticky=(tk.W, tk.E), padx=5, pady=2)
ttk.Label(map_click_info_frame, text="Elevation at Click:").grid(row=2, column=0, sticky=tk.W, padx=5, pady=2)
self.map_click_elevation_label = ttk.Label(map_click_info_frame, text="N/A")
ttk.Label(map_info_frame, text="Elevation:").grid(row=2, column=0, sticky=tk.W, padx=5, pady=2)
self.map_click_elevation_label = ttk.Label(map_info_frame, text="N/A") # Keep name for now
self.map_click_elevation_label.grid(row=2, column=1, sticky=(tk.W, tk.E), padx=5, pady=2)
# MODIFIED: Added a new Label for displaying the map area size.
# WHY: To fulfill the requirement to show the size of the currently displayed map patch.
# HOW: Created and gridded a new ttk.Label.
ttk.Label(map_info_frame, text="Map Area Size:").grid(row=3, column=0, sticky=tk.W, padx=5, pady=2)
self.map_area_size_label = ttk.Label(map_info_frame, text="N/A", wraplength=300, justify=tk.LEFT)
self.map_area_size_label.grid(row=3, column=1, sticky=(tk.W, tk.E), padx=5, pady=2)
main_app_frame.columnconfigure(0, weight=1)
main_app_frame.rowconfigure(0, weight=0)
@ -394,7 +477,11 @@ class ElevationApp:
if not MAP_VIEWER_SYSTEM_AVAILABLE:
map_display_options_frame.grid_remove()
map_click_info_frame.grid_remove()
# MODIFIED: Also hide the renamed "Map Info" frame if map viewer system is unavailable.
# WHY: The panel is only relevant when the map viewer is working.
# HOW: Called grid_remove on the map_info_frame.
map_info_frame.grid_remove()
def _apply_initial_widget_states(self) -> None:
if self.elevation_manager is None:
@ -402,7 +489,7 @@ class ElevationApp:
self.download_area_button.config(state=tk.DISABLED)
self.point_result_label.config(text="Result: Elevation Manager Init Failed.")
self.area_download_status_label.config(text="Status: Elevation Manager Init Failed.")
self.show_2d_browse_button.config(state=tk.DISABLED)
self.show_3d_dem_button.config(state=tk.DISABLED)
self.show_area_composite_button.config(state=tk.DISABLED)
@ -419,7 +506,7 @@ class ElevationApp:
scale_combo_state = "readonly"
point_map_text = "View Point on Map"
area_map_text = "View Area on Map"
self.view_map_for_point_button.config(state=point_map_initial_state, text=point_map_text)
self.view_map_for_area_button.config(state=area_map_initial_state, text=area_map_text)
self.map_scale_combobox.config(state=scale_combo_state)
@ -449,11 +536,26 @@ class ElevationApp:
logger.warning(f"Invalid map scale option selected: '{selected_display_text}'")
def _set_busy_state(self, is_busy: bool) -> None:
"""Sets the GUI to a busy or idle state."""
self.is_processing_task = is_busy
new_widget_state = tk.DISABLED if is_busy else tk.NORMAL
# MODIFIED: Add check for elevation_manager before configuring buttons.
# WHY: Avoids AttributeError if manager init failed.
# HOW: Added 'if self.elevation_manager:'.
if self.elevation_manager:
new_widget_state = tk.DISABLED if is_busy else tk.NORMAL
self.get_elevation_button.config(state=new_widget_state)
self.download_area_button.config(state=new_widget_state)
# MODIFIED: Also disable/enable visualization buttons based on busy state.
# WHY: Prevent triggering new tasks while one is in progress.
# HOW: Include visualization buttons in the state change.
self.show_2d_browse_button.config(state=tk.DISABLED if is_busy else (tk.NORMAL if self.last_valid_point_coords and MATPLOTLIB_AVAILABLE and PIL_AVAILABLE else tk.DISABLED))
self.show_3d_dem_button.config(state=tk.DISABLED if is_busy else (tk.NORMAL if self.last_valid_point_coords and MATPLOTLIB_AVAILABLE else tk.DISABLED))
self.show_area_composite_button.config(state=tk.DISABLED if is_busy else (tk.NORMAL if self.last_area_coords and MATPLOTLIB_AVAILABLE and PIL_AVAILABLE else tk.DISABLED))
if MAP_VIEWER_SYSTEM_AVAILABLE:
# Re-enable map buttons based on busy state and data availability.
self.view_map_for_point_button.config(state=tk.DISABLED if is_busy else (tk.NORMAL if self.last_valid_point_coords else tk.DISABLED))
self.view_map_for_area_button.config(state=tk.DISABLED if is_busy else (tk.NORMAL if self.last_area_coords else tk.DISABLED))
def _validate_point_coordinates(self, lat_s: str, lon_s: str) -> Optional[Tuple[float, float]]:
try:
@ -480,192 +582,529 @@ class ElevationApp:
except ValueError as e: logger.error(f"Invalid area: {e}"); messagebox.showerror("Input Error", f"Invalid area:\n{e}", parent=self.root); return None
def _trigger_get_elevation_task(self) -> None:
"""Starts a background thread to get elevation for a point."""
if self.is_processing_task or not self.elevation_manager: return
coords = self._validate_point_coordinates(self.latitude_entry.get(), self.longitude_entry.get())
if not coords: return
lat, lon = coords
# MODIFIED: Start busy state and update status label immediately on GUI thread.
# WHY: Provide immediate visual feedback that processing has started.
# HOW: Call _set_busy_state(True) and update label text.
self._set_busy_state(True)
self.point_result_label.config(text="Result: Requesting...")
self.show_2d_browse_button.config(state=tk.DISABLED)
self.show_3d_dem_button.config(state=tk.DISABLED)
if MAP_VIEWER_SYSTEM_AVAILABLE: self.view_map_for_point_button.config(state=tk.DISABLED)
self.last_valid_point_coords = None
self.root.update_idletasks()
self.point_result_label.config(text="Result: Requesting elevation... Please wait.")
self.root.update_idletasks() # Force GUI update
# MODIFIED: Start the elevation retrieval in a separate thread.
# WHY: Prevent the GUI from freezing while waiting for network or disk I/O.
# HOW: Create and start a threading.Thread.
elev_thread = threading.Thread(
target=self._perform_background_get_elevation,
args=(lat, lon),
daemon=True # Thread will exit if main program exits
)
elev_thread.start()
# MODIFIED: Added a new background task function for point elevation retrieval.
# WHY: Encapsulates the potentially blocking operation to run in a separate thread.
# HOW: Created this method to call elevation_manager.get_elevation.
def _perform_background_get_elevation(self, latitude: float, longitude: float) -> None:
"""Background task to retrieve elevation."""
result_elevation: Optional[float] = None
exception_occurred: Optional[Exception] = None
try:
elev = self.elevation_manager.get_elevation(lat, lon)
res_txt = "Result: "
if elev is None: res_txt += "Data unavailable."; messagebox.showwarning("Info", "Could not retrieve elevation.", parent=self.root)
elif math.isnan(elev): res_txt += "Point on NoData area."; self.last_valid_point_coords = (lat, lon)
else: res_txt += f"Elevation {elev:.2f}m"; self.last_valid_point_coords = (lat, lon)
self.point_result_label.config(text=res_txt)
if self.last_valid_point_coords:
if MATPLOTLIB_AVAILABLE and PIL_AVAILABLE: self.show_2d_browse_button.config(state=tk.NORMAL)
if MATPLOTLIB_AVAILABLE: self.show_3d_dem_button.config(state=tk.NORMAL)
if MAP_VIEWER_SYSTEM_AVAILABLE: self.view_map_for_point_button.config(state=tk.NORMAL)
except Exception as e: logger.exception("GUI Error: get_elevation"); messagebox.showerror("Error", f"Error:\n{e}", parent=self.root); self.point_result_label.config(text="Result: Error.")
finally: self._set_busy_state(False)
# Call the actual elevation retrieval logic (which might involve downloads)
# Using the shared manager instance in the main process.
if self.elevation_manager:
logger.info(f"GUI Thread: Calling elevation_manager.get_elevation for ({latitude:.5f},{longitude:.5f}) in background thread.")
# This call will handle its own progress (e.g., the NullHandler logging or the CLI progress dialog if applicable)
result_elevation = self.elevation_manager.get_elevation(latitude, longitude)
else:
raise RuntimeError("ElevationManager is not initialized.")
except Exception as e:
logger.exception(f"GUI Thread Error: Exception during background get_elevation for ({latitude:.5f},{longitude:.5f}).")
exception_occurred = e
finally:
# MODIFIED: Use root.after to call the UI update method on the main GUI thread.
# WHY: Tkinter GUI updates must happen on the main thread.
# HOW: Schedule _get_elevation_task_complete_ui_update call.
self.root.after(
0, # Schedule to run as soon as the main loop is free
self._get_elevation_task_complete_ui_update, # Callback function
result_elevation, # Pass the result/error back
exception_occurred,
latitude, longitude # Pass original coords for context
)
# MODIFIED: Added a new UI update function for point elevation task completion.
# WHY: Centralizes the logic to update GUI elements after the background task finishes.
# HOW: Created this method to update labels and button states.
def _get_elevation_task_complete_ui_update(
self,
elevation_result: Optional[float],
exception_occurred: Optional[Exception],
original_latitude: float,
original_longitude: float
) -> None:
"""Updates GUI elements after a point elevation task completes."""
res_txt = "Result: "
self.last_valid_point_coords = None # Reset valid point state initially
if exception_occurred:
res_txt += f"Error: {type(exception_occurred).__name__}"
messagebox.showerror("Error", f"Error retrieving elevation:\n{exception_occurred}", parent=self.root)
logger.error(f"GUI: Point elevation task completed with error for ({original_latitude:.5f},{original_longitude:.5f}).")
elif elevation_result is None:
res_txt += "Data unavailable."
messagebox.showwarning("Info", "Could not retrieve elevation for the point.", parent=self.root)
logger.warning(f"GUI: Point elevation task completed, data unavailable for ({original_latitude:.5f},{original_longitude:.5f}).")
elif isinstance(elevation_result, float) and math.isnan(elevation_result):
res_txt += "Point on NoData area."
self.last_valid_point_coords = (original_latitude, original_longitude) # Valid coords, result is NoData
logger.info(f"GUI: Point elevation task completed, NoData for ({original_latitude:.5f},{original_longitude:.5f}).")
else:
res_txt += f"Elevation {elevation_result:.2f}m"
self.last_valid_point_coords = (original_latitude, original_longitude) # Valid coords, got elevation
logger.info(f"GUI: Point elevation task completed, elevation {elevation_result:.2f}m for ({original_latitude:.5f},{original_longitude:.5f}).")
self.point_result_label.config(text=res_txt)
# MODIFIED: Re-enable point-specific visualization buttons if valid coords were obtained.
# WHY: These actions require a valid point location.
# HOW: Check if last_valid_point_coords is set and dependency libraries are available.
# The _set_busy_state(False) call already handles re-enabling based on last_valid_point_coords,
# but let's ensure the state is correctly set before clearing busy.
if self.last_valid_point_coords:
if MATPLOTLIB_AVAILABLE and PIL_AVAILABLE: self.show_2d_browse_button.config(state=tk.NORMAL)
if MATPLOTLIB_AVAILABLE: self.show_3d_dem_button.config(state=tk.NORMAL)
if MAP_VIEWER_SYSTEM_AVAILABLE: self.view_map_for_point_button.config(state=tk.NORMAL)
else:
# Ensure buttons are disabled if no valid point was obtained
self.show_2d_browse_button.config(state=tk.DISABLED)
self.show_3d_dem_button.config(state=tk.DISABLED)
if MAP_VIEWER_SYSTEM_AVAILABLE: self.view_map_for_point_button.config(state=tk.DISABLED)
self._set_busy_state(False) # Clear busy state, which also updates button states
def _trigger_download_area_task(self) -> None:
"""Starts a background thread to download tiles for an area."""
if self.is_processing_task or not self.elevation_manager: return
bounds = self._validate_area_boundary_coordinates()
if not bounds: return
self.last_area_coords = bounds
self.last_area_coords = bounds # Store validated bounds
# MODIFIED: Start busy state and update status label immediately on GUI thread.
# WHY: Provide immediate visual feedback that processing has started.
# HOW: Call _set_busy_state(True) and update label text.
self._set_busy_state(True)
self.area_download_status_label.config(text="Status: Starting...")
self.show_area_composite_button.config(state=tk.DISABLED)
if MAP_VIEWER_SYSTEM_AVAILABLE: self.view_map_for_area_button.config(state=tk.DISABLED)
self.root.update_idletasks()
self.area_download_status_label.config(text="Status: Starting download task... Please wait.")
self.root.update_idletasks() # Force GUI update
# Start download in a background thread to keep GUI responsive
# Pass bounds as separate arguments to avoid potential issues with tuple packing/unpacking in target
dl_thread = threading.Thread(target=self._perform_background_area_download, args=bounds, daemon=True)
dl_thread.start()
def _perform_background_area_download(self, min_l: float, min_o: float, max_l: float, max_o: float) -> None:
status_str, success_bool, p_c, o_c = "Status: Unknown error.", False, 0, 0
"""Background task for downloading area tiles."""
status_str, success_bool, processed_count, obtained_count = "Status: Unknown error.", False, 0, 0
try:
# Update status label on the GUI thread using root.after
self.root.after(0, lambda: self.area_download_status_label.config(text="Status: Downloading..."))
if self.elevation_manager:
p_c,o_c = self.elevation_manager.download_area(min_l,min_o,max_l,max_o)
status_str = f"Status: Complete. Processed {p_c}, Obtained {o_c} HGT."
logger.info(f"GUI Thread: Starting background download for area: Lat [{min_l:.2f}-{max_l:.2f}], Lon [{min_o:.2f}-{max_o:.2f}].")
# Call ElevationManager method to perform the download
processed_count, obtained_count = self.elevation_manager.download_area(min_l, min_o, max_l, max_o)
status_str = f"Status: Complete. Processed {processed_count}, Obtained {obtained_count} HGT."
success_bool = True
else: status_str = "Status: Error - Manager N/A."
except Exception as e: logger.exception("GUI Error: area download task"); status_str = f"Status: Error: {type(e).__name__}"
finally: self.root.after(0, self._area_download_task_complete_ui_update, status_str, success_bool, p_c, o_c)
logger.info(f"GUI Thread: Background download complete: {status_str}")
else:
status_str = "Status: Error - Elevation Manager N/A."
logger.error("GUI Thread: Background download failed - Elevation Manager is None.")
except Exception as e:
logger.exception("GUI Thread Error: Unhandled exception during area download task.")
status_str = f"Status: Error: {type(e).__name__}"
finally:
# Update GUI and re-enable buttons on the GUI thread regardless of success/failure
self.root.after(0, self._area_download_task_complete_ui_update, status_str, success_bool, processed_count, obtained_count)
def _area_download_task_complete_ui_update(self, msg: str, success: bool, proc: int, obt: int) -> None:
"""Updates GUI after area download task finishes."""
self.area_download_status_label.config(text=msg)
self._set_busy_state(False)
# MODIFIED: Re-enable area visualization buttons if the download was successful.
# WHY: These actions require downloaded area data.
# HOW: Check success flag and library availability.
if success:
summary = f"Processed {proc} tiles.\nObtained {obt} HGT files."
summary = f"Processed {proc} tile locations.\nSuccessfully obtained {obt} HGT files (downloaded or found in cache)."
messagebox.showinfo("Download Complete", summary, parent=self.root)
if MATPLOTLIB_AVAILABLE and PIL_AVAILABLE: self.show_area_composite_button.config(state=tk.NORMAL)
if MAP_VIEWER_SYSTEM_AVAILABLE and obt > 0: self.view_map_for_area_button.config(state=tk.NORMAL)
if MATPLOTLIB_AVAILABLE and PIL_AVAILABLE:
# Re-enable composite image button only if at least one tile was obtained
self.show_area_composite_button.config(state=tk.NORMAL if obt > 0 else tk.DISABLED)
if MAP_VIEWER_SYSTEM_AVAILABLE:
# Re-enable area map button only if at least one tile was obtained
self.view_map_for_area_button.config(state=tk.NORMAL if obt > 0 else tk.DISABLED)
else:
err_brief = msg.split(":")[-1].strip(); messagebox.showerror("Download Error", f"Failed: {err_brief}\nCheck logs.", parent=self.root)
err_brief = msg.split(":")[-1].strip() if ":" in msg else "An error occurred."
messagebox.showerror("Download Error", f"Area download failed:\n{err_brief}\nCheck logs for details.", parent=self.root)
# MODIFIED: Ensure area visualization buttons are disabled on failure.
# WHY: No data available for visualization.
# HOW: Explicitly disable buttons.
self.show_area_composite_button.config(state=tk.DISABLED)
if MAP_VIEWER_SYSTEM_AVAILABLE: self.view_map_for_area_button.config(state=tk.DISABLED)
self._set_busy_state(False) # Clear busy state, which also updates button states
def _start_visualization_process(self, func: callable, args_t: tuple, name_id: str) -> None:
"""Helper to start a visualization task in a separate process."""
logger.info(f"GUI: Attempting to start visualization process '{name_id}'...")
try:
# Configure logging for the child process if needed (optional, can also configure in target function)
# multiprocessing.log_to_stderr(logging.DEBUG) # Example: uncomment for verbose process logging
proc = multiprocessing.Process(target=func, args=args_t, daemon=True, name=name_id)
proc.start()
logger.info(f"Started process '{name_id}' (PID: {proc.pid}).")
except Exception as e: logger.exception(f"Failed to start '{name_id}'."); messagebox.showerror("Process Error", f"Could not start {name_id}:\n{e}", parent=self.root)
logger.info(f"GUI: Started process '{name_id}' (PID: {proc.pid}).")
# Note: The GUI main thread does not wait for these visualization processes to finish.
except Exception as e:
logger.exception(f"GUI Error: Failed to start visualization process '{name_id}'.")
messagebox.showerror("Process Error", f"Could not start {name_id}:\n{e}", parent=self.root)
def _trigger_2d_browse_display(self) -> None:
if not (MATPLOTLIB_AVAILABLE and PIL_AVAILABLE): messagebox.showwarning("Deps Error", "MPL/PIL N/A.", parent=self.root); return
if not self.last_valid_point_coords or not self.elevation_manager: messagebox.showinfo("Info", "Get elevation first.", parent=self.root); return
"""Triggers display of the browse image for the last valid point."""
if not (MATPLOTLIB_AVAILABLE and PIL_AVAILABLE):
messagebox.showwarning("Dependencies Missing", "Matplotlib or Pillow not available.", parent=self.root); return
if not self.last_valid_point_coords or not self.elevation_manager:
messagebox.showinfo("Info", "Please get elevation for a point first.", parent=self.root); return
lat, lon = self.last_valid_point_coords
logger.info(f"GUI: Requesting 2D browse image display for ({lat:.5f},{lon:.5f}).")
# get_tile_info ensures browse image download is *attempted*, but doesn't guarantee success.
info = self.elevation_manager.get_tile_info(lat, lon)
# Pass image path and tile name to the target process
if info and info.get("browse_available") and info.get("browse_image_path"):
args = (info["browse_image_path"], info.get("tile_base_name","?"), f"Browse: {info.get('tile_base_name','?').upper()}")
browse_img_path = info["browse_image_path"]
tile_name = info.get("tile_base_name","?")
window_title = f"Browse: {tile_name.upper()}"
# The process target will load and display the image from the path
args = (browse_img_path, tile_name, window_title)
self._start_visualization_process(process_target_show_image, args, "2DBrowse")
else: messagebox.showinfo("Image Info", "Browse image N/A.", parent=self.root)
else:
logger.warning(f"GUI: Browse image not available for ({lat:.5f},{lon:.5f}).")
messagebox.showinfo("Image Info", "Browse image not available for this tile.", parent=self.root)
def _trigger_3d_dem_display(self) -> None:
if not MATPLOTLIB_AVAILABLE: messagebox.showwarning("Deps Error", "MPL N/A.", parent=self.root); return
if not self.last_valid_point_coords or not self.elevation_manager: messagebox.showinfo("Info", "Get elevation first.", parent=self.root); return
"""Triggers display of the 3D DEM plot for the last valid point."""
if not MATPLOTLIB_AVAILABLE:
messagebox.showwarning("Dependencies Missing", "Matplotlib not available.", parent=self.root); return
# SciPy is optional for advanced features but required by the plotting function implementation
# Let's make sure we warn the user if SciPy is needed for full functionality
if not SCIPY_AVAILABLE:
logger.warning("GUI: SciPy not available. Advanced 3D plot features (smoothing/interpolation) will be disabled.")
# Decide if we want to show a warning box here or just rely on the log.
# messagebox.showwarning("Dependencies Missing", "SciPy not available. Advanced 3D features disabled.", parent=self.root)
if not self.last_valid_point_coords or not self.elevation_manager:
messagebox.showinfo("Info", "Please get elevation for a point first.", parent=self.root); return
lat, lon = self.last_valid_point_coords
logger.info(f"GUI: Requesting 3D DEM display for ({lat:.5f},{lon:.5f}).")
# get_hgt_data ensures HGT download is *attempted*.
data = self.elevation_manager.get_hgt_data(lat, lon)
if data is not None:
# Retrieve tile info again for tile name, even if HGT data was obtained directly.
# get_tile_info won't re-download if HGT is already there.
info = self.elevation_manager.get_tile_info(lat, lon)
t_name = info.get("tile_base_name","?").upper() if info else "?"
p_title = f"3D View: Tile {t_name}"
cfg_is, cfg_ss, cfg_if, cfg_pgp = 1, 0.5, 3, 300
if not SCIPY_AVAILABLE: cfg_ss, cfg_if = None, 1; logger.warning("SciPy N/A. Advanced 3D plot disabled.")
args = (data, p_title, cfg_is, cfg_ss, cfg_if, cfg_pgp)
tile_name = info.get("tile_base_name","?") if info else "?"
plot_title = f"3D View: Tile {tile_name.upper()}"
# Configuration parameters for the 3D plot processing (passed to the target process)
# These can be made configurable in the GUI if needed later.
config_initial_subsample = 1 # Initial subsampling factor of the raw data
config_smooth_sigma = 0.5 # Sigma for Gaussian smoothing (set to None to disable)
config_interpolation_factor = 3 # Interpolation factor (e.g., 3x for 3x denser grid)
config_plot_grid_points = 300 # Target number of points along each axis for the final plot grid
# Pass the raw NumPy array and config to the target process
# The process target will handle data processing (subsampling, smoothing, interpolation) and plotting.
args = (data, plot_title, config_initial_subsample, config_smooth_sigma,
config_interpolation_factor, config_plot_grid_points)
self._start_visualization_process(process_target_show_3d, args, "3DDEM")
else: messagebox.showerror("3D Data Error", "Could not retrieve HGT data.", parent=self.root)
else:
logger.warning(f"GUI: HGT data not available for ({lat:.5f},{lon:.5f}). Cannot show 3D plot.")
messagebox.showerror("3D Data Error", "Could not retrieve HGT data for this tile.", parent=self.root)
def _trigger_area_composite_display(self) -> None:
if not (MATPLOTLIB_AVAILABLE and PIL_AVAILABLE): messagebox.showwarning("Deps Error", "MPL/PIL N/A.", parent=self.root); return
if not self.last_area_coords or not self.elevation_manager: messagebox.showinfo("Info", "Download area first.", parent=self.root); return
min_l,min_o,max_l,max_o = self.last_area_coords
info_list = self.elevation_manager.get_area_tile_info(min_l,min_o,max_l,max_o)
if not info_list: messagebox.showinfo("Area Info", "No tile info found.", parent=self.root); return
"""Triggers display of the composite browse image for the last downloaded area."""
if not (MATPLOTLIB_AVAILABLE and PIL_AVAILABLE):
messagebox.showwarning("Dependencies Missing", "Matplotlib or Pillow not available.", parent=self.root); return
if not self.last_area_coords or not self.elevation_manager:
messagebox.showinfo("Info", "Please download an area first.", parent=self.root); return
min_l, min_o, max_l, max_o = self.last_area_coords
logger.info(f"GUI: Requesting area composite display for area: Lat [{min_l:.2f}-{max_l:.2f}], Lon [{min_o:.2f}-{max_o:.2f}]")
# get_area_tile_info returns info for tiles that *should* cover the area,
# and includes paths to locally available files (doesn't trigger download here).
info_list = self.elevation_manager.get_area_tile_info(min_l, min_o, max_l, max_o)
# Check if any browse images are potentially available before starting the process
available_browse_images = [
info for info in info_list if info.get("browse_available") and info.get("browse_image_path")
]
if not available_browse_images:
logger.warning("GUI: No browse images found for the downloaded area.")
messagebox.showinfo("Area Info", "No browse images were found locally for the specified area.", parent=self.root); return
title = f"Area: Lat [{min_l:.1f}-{max_l:.1f}], Lon [{min_o:.1f}-{max_o:.1f}]"
args = (info_list, title)
# Pass the filtered list of available info dicts to the process
# The process target will create the composite image from these.
args = (available_browse_images, title)
self._start_visualization_process(process_target_create_show_area, args, "AreaComposite")
def _start_map_viewer_process(self, base_args_tuple: tuple) -> None:
"""Helper to start the map viewer process."""
if not MAP_VIEWER_SYSTEM_AVAILABLE:
messagebox.showerror("Map System Unavailable", "Map viewer dependencies (OpenCV, etc.) are not available.", parent=self.root)
logger.error("GUI: Map viewer process start failed: MAP_VIEWER_SYSTEM_AVAILABLE is False.")
return
if not self.map_interaction_message_queue:
logger.error("Cannot start map viewer: interaction queue missing."); messagebox.showerror("Internal Error", "Map comms not ready.", parent=self.root); return
logger.error("GUI: Cannot start map viewer: interaction queue not initialized.");
messagebox.showerror("Internal Error", "Map communication queue not ready.", parent=self.root); return
# Check if a map viewer process is already running
if self.map_viewer_process_handle and self.map_viewer_process_handle.is_alive():
messagebox.showinfo("Info", "Map viewer is already open.", parent=self.root)
logger.info("GUI: Map viewer process already alive.")
return
logger.info("GUI: Attempting to start map viewer process...")
try:
# The target function `run_map_viewer_process_target` requires the DEM cache directory path.
# This was stored during GUI initialization.
dem_cache_dir = self._dem_data_cache_dir_for_map_process
# Include the DEM cache directory and display scale in the arguments tuple.
# base_args_tuple already contains (map_interaction_q, operation_mode, center_latitude, center_longitude, area_bounding_box).
# Add dem_cache_dir and display_scale_factor to the end.
scale_val = self.map_display_scale_factor_var.get()
full_args = base_args_tuple + (scale_val,)
self.map_viewer_process_handle = multiprocessing.Process(target=run_map_viewer_process_target, args=full_args, daemon=True, name="GeoElevationMapViewer")
full_args = base_args_tuple + (dem_cache_dir, scale_val)
self.map_viewer_process_handle = multiprocessing.Process(
target=run_map_viewer_process_target,
args=full_args,
daemon=True, # Daemon process will be terminated when the main GUI process exits
name="GeoElevationMapViewer"
)
self.map_viewer_process_handle.start()
logger.info(f"Started map viewer PID: {self.map_viewer_process_handle.pid}, Scale: {scale_val:.3f}.")
self.root.after(100, self._process_map_interaction_queue_messages)
except Exception as e: logger.exception("Failed to start map viewer."); messagebox.showerror("Process Error",f"Could not start map viewer:\n{e}",parent=self.root)
logger.info(f"GUI: Started map viewer process PID: {self.map_viewer_process_handle.pid}, Scale: {scale_val:.3f}.")
# Ensure the GUI starts checking the queue for messages from the new process.
# The initial `self.root.after` call in __init__ handles this, but doesn't hurt to be sure.
# self.root.after(100, self._process_map_interaction_queue_messages) # Already scheduled
except Exception as e:
logger.exception("GUI Error: Failed to start map viewer process.")
messagebox.showerror("Process Error",f"Could not start map viewer:\n{e}",parent=self.root)
def _trigger_view_map_for_point(self) -> None:
if not MAP_VIEWER_SYSTEM_AVAILABLE: messagebox.showerror("Feature N/A", "Map viewer system N/A.", parent=self.root); return
if not self.last_valid_point_coords: messagebox.showinfo("Info", "Get elevation first.", parent=self.root); return
if not self.map_interaction_message_queue: messagebox.showerror("Internal Error", "Map queue N/A.", parent=self.root); return
if self.map_viewer_process_handle and self.map_viewer_process_handle.is_alive(): messagebox.showinfo("Info", "Map already open.", parent=self.root); return
"""Triggers the display of a map centered on the last valid point."""
# MODIFIED: Check map system availability first.
# WHY: Prevent starting process logic if dependencies are missing.
# HOW: Added initial check.
if not MAP_VIEWER_SYSTEM_AVAILABLE:
messagebox.showerror("Map System Unavailable", "Map viewer dependencies are not available.", parent=self.root); return
if not self.last_valid_point_coords:
messagebox.showinfo("Info", "Please get elevation for a point first.", parent=self.root); return
lat, lon = self.last_valid_point_coords
# MODIFIED: Use the correctly imported GEOELEVATION_DEM_CACHE_DEFAULT if elevation_manager is None
# WHY: Consistency in using the configured default cache path if the manager wasn't initialized
# due to an error but we still attempt to start the map viewer.
# HOW: Added a check for self.elevation_manager and used the constant if it's None.
dem_cache = self.elevation_manager.tile_directory if self.elevation_manager else GEOELEVATION_DEM_CACHE_DEFAULT
base_args = (self.map_interaction_message_queue, "point", lat, lon, None, dem_cache)
logger.info(f"GUI: Requesting map view for point ({lat:.5f},{lon:.5f}).")
# Prepare base arguments for the map viewer process target function.
# It needs the queue, mode, point coords, area bbox (None for point mode), and cache dir (added in _start_map_viewer_process).
# The display scale is also added in _start_map_viewer_process.
base_args = (self.map_interaction_message_queue, "point", lat, lon, None)
# Call the helper to start the process
self._start_map_viewer_process(base_args)
def _trigger_view_map_for_area(self) -> None:
if not MAP_VIEWER_SYSTEM_AVAILABLE: messagebox.showerror("Feature N/A", "Map viewer system N/A.", parent=self.root); return
if not self.last_area_coords: messagebox.showinfo("Info", "Download area first.", parent=self.root); return
if not self.map_interaction_message_queue: messagebox.showerror("Internal Error", "Map queue N/A.", parent=self.root); return
if self.map_viewer_process_handle and self.map_viewer_process_handle.is_alive(): messagebox.showinfo("Info", "Map already open.", parent=self.root); return
"""Triggers the display of a map covering the last downloaded area."""
# MODIFIED: Check map system availability first.
# WHY: Prevent starting process logic if dependencies are missing.
# HOW: Added initial check.
if not MAP_VIEWER_SYSTEM_AVAILABLE:
messagebox.showerror("Map System Unavailable", "Map viewer dependencies are not available.", parent=self.root); return
if not self.last_area_coords:
messagebox.showinfo("Info", "Please download an area first.", parent=self.root); return
min_l, min_o, max_l, max_o = self.last_area_coords
bbox = (min_o, min_l, max_o, max_l)
# MODIFIED: Use the correctly imported GEOELEVATION_DEM_CACHE_DEFAULT if elevation_manager is None
# WHY: Consistency in using the configured default cache path if the manager wasn't initialized
# due to an error but we still attempt to start the map viewer.
# HOW: Added a check for self.elevation_manager and used the constant if it's None.
dem_cache = self.elevation_manager.tile_directory if self.elevation_manager else GEOELEVATION_DEM_CACHE_DEFAULT
base_args = (self.map_interaction_message_queue, "area", None, None, bbox, dem_cache)
logger.info(f"GUI: Requesting map view for area Lat [{min_l:.2f}-{max_l:.2f}], Lon [{min_o:.2f}-{max_o:.2f}].")
# Prepare base arguments for the map viewer process target function.
# It needs the queue, mode, point coords (None for area mode), area bbox, and cache dir (added in _start_map_viewer_process).
# The display scale is also added in _start_map_viewer_process.
area_bbox_for_process = (min_o, min_l, max_o, max_l) # Pass as (west, south, east, north) as expected by map_utils/geo_map_viewer
base_args = (self.map_interaction_message_queue, "area", None, None, area_bbox_for_process)
# Call the helper to start the process
self._start_map_viewer_process(base_args)
def _process_map_interaction_queue_messages(self) -> None:
if not self.map_interaction_message_queue: return
"""Periodically checks the queue for messages from the map viewer process and updates the GUI."""
if not self.map_interaction_message_queue:
logger.debug("GUI: Map interaction queue is not initialized, stopping message processing.")
return # Stop scheduling if queue is gone
try:
# Process all messages currently in the queue without blocking
while not self.map_interaction_message_queue.empty():
msg = self.map_interaction_message_queue.get_nowait()
msg_t = msg.get("type")
if msg_t == "map_click_data":
lat,lon,elev_s = msg.get("latitude"), msg.get("longitude"), msg.get("elevation_str", "Error")
lat_txt,lon_txt = (f"{lat:.5f}" if lat is not None else "N/A"), (f"{lon:.5f}" if lon is not None else "N/A")
# MODIFIED: Handle the new message type 'map_info_update'.
# WHY: This message is sent by the map process for initial point info, click info, and errors.
# HOW: Check for the type and update all relevant Map Info labels.
if msg_t == "map_info_update":
lat_val = msg.get("latitude")
lon_val = msg.get("longitude")
elev_str = msg.get("elevation_str", "N/A") # Default to N/A if key is missing
map_area_size_str = msg.get("map_area_size_str", "N/A") # Default to N/A
# Format coordinates for display
lat_txt = f"{lat_val:.5f}" if isinstance(lat_val, (int, float)) and not math.isnan(lat_val) else "N/A"
lon_txt = f"{lon_val:.5f}" if isinstance(lon_val, (int, float)) and not math.isnan(lon_val) else "N/A"
self.map_click_latitude_label.config(text=lat_txt)
self.map_click_longitude_label.config(text=lon_txt)
self.map_click_elevation_label.config(text=elev_s)
except queue.Empty: pass
except Exception as e: logger.error(f"Error processing map queue: {e}")
self.map_click_elevation_label.config(text=elev_str)
# MODIFIED: Update the new map area size label.
# WHY: Display the size received from the map process.
# HOW: Set the text of map_area_size_label.
self.map_area_size_label.config(text=map_area_size_str)
logger.debug(f"GUI: Updated Map Info panel with: Lat={lat_txt}, Lon={lon_txt}, Elev='{elev_str}', Size='{map_area_size_str}'")
# MODIFIED: Handle the new message type 'map_fetching_status'.
# WHY: Receive progress/status updates specifically for map tile fetching.
# HOW: Update the map info labels with the status message.
elif msg_t == "map_fetching_status":
status_message = msg.get("status", "Fetching...")
# Update all map info labels to show the status, maybe clear coords/elev temporarily
self.map_click_latitude_label.config(text="") # Clear coords
self.map_click_longitude_label.config(text="")
self.map_click_elevation_label.config(text=status_message) # Show status as elevation text
self.map_area_size_label.config(text="") # Clear size
# elif msg_t == "another_message_type":
# # Handle other message types if needed in the future
# pass
else:
# Log any messages with unrecognized types
logger.warning(f"GUI: Received unrecognized message type from map queue: {msg_t} (Full message: {msg})")
except queue.Empty:
# This exception is expected when the queue is empty, just means no messages were ready.
pass
except Exception as e:
# Catch any other unexpected errors while processing messages
logger.error(f"GUI Error: Exception during map queue message processing: {e}", exc_info=True)
# Optional: Update GUI status to indicate communication error
finally:
if self.map_interaction_message_queue: self.root.after(250, self._process_map_interaction_queue_messages)
# Schedule this method to run again after a short delay to keep checking the queue.
# Only reschedule if the root window still exists.
if self.root.winfo_exists():
self.root.after(250, self._process_map_interaction_queue_messages)
else:
logger.debug("GUI: Root window no longer exists, stopping map queue message processing scheduling.")
def _on_application_closing(self) -> None:
"""Handles cleanup when the main application window is closed."""
logger.info("Main GUI application closing...")
# MODIFIED: Terminate the map viewer process if it's running.
# WHY: The separate process should not be left running when the main application exits.
# HOW: Check if the process handle exists and is alive, then attempt to terminate/kill it.
if self.map_viewer_process_handle and self.map_viewer_process_handle.is_alive():
logger.info("Terminating active map viewer process...")
self.map_viewer_process_handle.terminate()
self.map_viewer_process_handle.join(timeout=1.5)
if self.map_viewer_process_handle.is_alive():
logger.warning("Map viewer process did not terminate via SIGTERM. Killing."); self.map_viewer_process_handle.kill()
self.map_viewer_process_handle.join(timeout=0.5)
logger.info(f"GUI: Terminating active map viewer process PID: {self.map_viewer_process_handle.pid}...")
try:
# Use terminate (sends SIGTERM) first for graceful shutdown
self.map_viewer_process_handle.terminate()
# Wait a bit for the process to exit cleanly
self.map_viewer_process_handle.join(timeout=3.0) # Increased timeout slightly
# If it's still alive, resort to killing (SIGKILL)
if self.map_viewer_process_handle.is_alive():
logger.warning(f"GUI: Map viewer process PID {self.map_viewer_process_handle.pid} did not terminate cleanly. Killing.")
self.map_viewer_process_handle.kill()
self.map_viewer_process_handle.join(timeout=1.0) # Wait briefly after killing
if self.map_viewer_process_handle.is_alive():
logger.error(f"GUI: Map viewer process PID {self.map_viewer_process_handle.pid} is still alive after killing. May be orphaned.")
else:
logger.info(f"GUI: Map viewer process PID {self.map_viewer_process_handle.pid} has exited.")
except Exception as e_terminate:
logger.exception(f"GUI Error: Exception during map viewer process termination:")
# MODIFIED: Close the multiprocessing queue if it exists.
# WHY: Release resources associated with the queue.
# HOW: Call close() and potentially join_thread().
if self.map_interaction_message_queue:
logger.debug("Closing map interaction queue."); self.map_interaction_message_queue.close()
try: self.map_interaction_message_queue.join_thread()
except Exception as e: logger.warning(f"Error joining map queue thread: {e}")
logger.debug("GUI: Closing map interaction queue.")
try:
# Closing the queue signals that no more data will be added.
self.map_interaction_message_queue.close()
# join_thread() waits for the background thread that flushes the queue buffer.
# Can sometimes block if the other end is gone or blocked. Use a timeout.
# self.map_interaction_message_queue.join_thread(timeout=1.0) # Optional, may hang
except Exception as e_queue_close:
logger.warning(f"GUI Error: Exception closing map queue: {e_queue_close}")
# Finally, destroy the main Tkinter window
self.root.destroy()
logger.info("Main GUI window destroyed.")
# --- Main Execution Block (for direct script testing) ---
if __name__ == "__main__":
print("Running elevation_gui.py directly for testing purposes...")
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s-%(levelname)s-%(name)s-%(module)s-%(message)s")
# Configure logging for the main process when running this script directly
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s-%(levelname)s-%(name)s-%(module)s-%(message)s", stream=sys.stdout)
logger.info(f"TEST: Rasterio Avail: {RASTERIO_AVAILABLE}, PIL Avail: {PIL_AVAILABLE}, MPL Avail: {MATPLOTLIB_AVAILABLE}, SciPy Avail: {SCIPY_AVAILABLE}, MapViewerSys Avail: {MAP_VIEWER_SYSTEM_AVAILABLE}")
# multiprocessing.freeze_support() is essential for creating standalone executables
# that use multiprocessing, especially on Windows. It must be called within the
# main block of the script that starts the other processes.
# MODIFIED: Keep freeze_support() here in the __main__ block as it's the entry point for the GUI when run directly.
# WHY: It's a requirement for multiprocessing when the script is frozen (e.g., by PyInstaller).
# HOW: Keep the call here. It's also called in geoelevation/__main__.py, which is the
# primary entry point for the package as a whole. Calling it here ensures it runs
# if *this specific script* is executed directly (python elevation_gui.py), which is
# only for testing but good practice.
multiprocessing.freeze_support()
test_root = tk.Tk()
app_test_instance: Optional[ElevationApp] = None
try:
@ -673,6 +1112,13 @@ if __name__ == "__main__":
test_root.mainloop()
except Exception as e_test_run:
logger.critical(f"Fatal error in direct test run: {e_test_run}", exc_info=True)
if app_test_instance: app_test_instance._on_application_closing() # Try cleanup
elif test_root.winfo_exists(): test_root.destroy()
sys.exit(1)
# Attempt cleanup on fatal error
if app_test_instance:
try:
app_test_instance._on_application_closing()
except Exception as e_cleanup:
logger.error(f"Error during cleanup after fatal test run error: {e_cleanup}")
elif test_root.winfo_exists():
# If app instance wasn't created, just destroy the root if it still exists
test_root.destroy()
sys.exit(1) # Exit with error code

File diff suppressed because it is too large Load Diff

View File

@ -12,21 +12,10 @@ relying on the 'mercantile' library for Web Mercator projections.
# Standard library imports
import logging
import sys # Import sys for logging stream
from typing import Optional, Tuple, Any # 'Any' for app_facade type hint
# Third-party imports
try:
import cv2 # OpenCV for windowing and drawing
import numpy as np
CV2_NUMPY_LIBS_AVAILABLE_DISPLAY = True
except ImportError:
cv2 = None # type: ignore
np = None # type: ignore
CV2_NUMPY_LIBS_AVAILABLE_DISPLAY = False
# Logging might not be set up if this is imported very early by a child process
print("ERROR: MapDisplay - OpenCV or NumPy not found. Map display cannot function.")
try:
from PIL import Image
ImageType = Image.Image # type: ignore
@ -35,7 +24,20 @@ except ImportError:
Image = None # type: ignore
ImageType = None # type: ignore
PIL_LIB_AVAILABLE_DISPLAY = False
print("WARNING: MapDisplay - Pillow (PIL) not found. Image conversion from PIL might fail.")
# Logging might not be set up if this is imported very early by a child process
# So, direct print or rely on higher-level logger configuration.
print("ERROR: MapDisplay - Pillow (PIL) library not found. Image conversion from PIL might fail.")
try:
import cv2 # OpenCV for windowing and drawing
import numpy as np
CV2_NUMPY_LIBS_AVAILABLE_DISPLAY = True
except ImportError:
cv2 = None # type: ignore
np = None # type: ignore
CV2_NUMPY_LIBS_AVAILABLE_DISPLAY = False
print("ERROR: MapDisplay - OpenCV or NumPy not found. Drawing and image operations will fail.")
try:
import mercantile # For Web Mercator tile calculations and coordinate conversions
@ -71,16 +73,24 @@ class MapDisplayWindow:
Args:
app_facade: An object that has a 'handle_map_mouse_click(x, y)' method
and an attribute 'current_display_scale_factor'.
and an attribute 'current_display_scale_factor'.
window_name_str: The name for the OpenCV window.
initial_screen_x_pos: Initial X screen position for the window.
initial_screen_y_pos: Initial Y screen position for the window.
"""
logger.info(f"Initializing MapDisplayWindow with name: '{window_name_str}'")
# MODIFIED: Added a check for critical dependencies at init.
# WHY: Ensure the class can function before proceeding.
# HOW: Raise ImportError if dependencies are missing.
if not CV2_NUMPY_LIBS_AVAILABLE_DISPLAY:
# This error should ideally prevent the application from reaching this point
# if map functionality is critical and relies on this module.
raise ImportError("OpenCV and/or NumPy are not available for MapDisplayWindow operation.")
critical_msg = "OpenCV and/or NumPy are not available for MapDisplayWindow operation."
logger.critical(critical_msg)
raise ImportError(critical_msg)
# PIL is needed for image conversion, but not strictly for windowing itself,
# though show_map will likely fail without it for non-numpy inputs.
# mercantile is needed for pixel-geo conversions, not for windowing.
# We'll check those where they are strictly needed.
self.app_facade_handler: Any = app_facade # Facade to access scale and report clicks
self.opencv_window_name: str = window_name_str
@ -113,27 +123,49 @@ class MapDisplayWindow:
if map_pil_image_input is None:
logger.warning("Received None PIL image for display. Generating a placeholder.")
bgr_image_unscaled = self._create_placeholder_bgr_numpy_array()
# MODIFIED: Added more explicit check for PIL availability and instance type.
# WHY: Ensure PIL is available before attempting conversion from a PIL object.
# HOW: Included PIL_LIB_AVAILABLE_DISPLAY in the check.
elif PIL_LIB_AVAILABLE_DISPLAY and isinstance(map_pil_image_input, Image.Image): # type: ignore
logger.debug(
f"Converting input PIL Image (Size: {map_pil_image_input.size}, Mode: {map_pil_image_input.mode}) to BGR."
)
bgr_image_unscaled = self._convert_pil_image_to_bgr_numpy_array(map_pil_image_input)
else:
# This else branch handles cases where input is not None, not a PIL Image, or PIL is not available.
logger.error(
f"Received unexpected image type for display: {type(map_pil_image_input)}. Using placeholder."
f"Received unexpected image type for display: {type(map_pil_image_input)}. Or Pillow is missing. Using placeholder."
)
bgr_image_unscaled = self._create_placeholder_bgr_numpy_array()
if bgr_image_unscaled is None: # Fallback if conversion or placeholder failed
logger.error("Failed to obtain a BGR image (unscaled) for display. Using minimal black square.")
bgr_image_unscaled = np.zeros((256, 256, 3), dtype=np.uint8) # type: ignore
# MODIFIED: Create a minimal black image using NumPy for robustness.
# WHY: Ensure a displayable image is created even if placeholder creation fails.
# HOW: Use np.zeros.
if np: # Ensure np is available
bgr_image_unscaled = np.zeros((100, 100, 3), dtype=np.uint8) # type: ignore
else:
logger.critical("NumPy not available, cannot even create fallback black image for imshow.")
return # Cannot proceed without NumPy
# --- Apply Display Scaling ---
scaled_bgr_image_for_display: np.ndarray = bgr_image_unscaled # type: ignore
try:
display_scale = 1.0 # Default scale if not found on facade
if hasattr(self.app_facade_handler, 'current_display_scale_factor'):
display_scale = float(self.app_facade_handler.current_display_scale_factor)
# MODIFIED: Added check that app_facade_handler is not None before accessing its attribute.
# WHY: Avoids AttributeError if facade is unexpectedly None.
# HOW: Check 'if self.app_facade_handler and hasattr(...)'.
if self.app_facade_handler and hasattr(self.app_facade_handler, 'current_display_scale_factor'):
# MODIFIED: Added try-except around float conversion of scale factor.
# WHY: Defend against non-numeric scale factor values.
# HOW: Use a try-except block.
try:
display_scale = float(self.app_facade_handler.current_display_scale_factor)
except (ValueError, TypeError) as e_scale_conv:
logger.warning(f"Could not convert scale factor from facade to float: {self.app_facade_handler.current_display_scale_factor}. Using 1.0. Error: {e_scale_conv}")
display_scale = 1.0
if display_scale <= 0: # Prevent invalid scale
logger.warning(f"Invalid scale factor {display_scale} from facade. Using 1.0.")
display_scale = 1.0
@ -145,12 +177,13 @@ class MapDisplayWindow:
f"Unscaled BGR image size: {unscaled_w}x{unscaled_h}. Applying display scale: {display_scale:.3f}"
)
# Only resize if scale is not 1.0 (with tolerance) and image dimensions are valid
if abs(display_scale - 1.0) > 1e-6 and unscaled_w > 0 and unscaled_h > 0:
target_w = max(1, int(round(unscaled_w * display_scale)))
target_h = max(1, int(round(unscaled_h * display_scale)))
interpolation_method = cv2.INTER_LINEAR if display_scale > 1.0 else cv2.INTER_AREA # type: ignore
logger.debug(f"Resizing image from {unscaled_w}x{unscaled_h} to {target_w}x{target_h}")
logger.debug(f"Resizing image from {unscaled_w}x{unscaled_h} to {target_w}x{target_h} using {interpolation_method}.")
scaled_bgr_image_for_display = cv2.resize( # type: ignore
bgr_image_unscaled, (target_w, target_h), interpolation=interpolation_method
)
@ -166,7 +199,17 @@ class MapDisplayWindow:
# Recreate OpenCV window if its initialized state suggests it's needed,
# or if the size of the (scaled) image to be displayed has changed.
if self.is_opencv_window_initialized and \
# Only recreate if window is initialized and its size changed from the last displayed size.
# We also add a check if the window exists.
window_exists = False
try:
# Check if the window property can be retrieved without error
if CV2_NUMPY_LIBS_AVAILABLE_DISPLAY and cv2.getWindowProperty(self.opencv_window_name, cv2.WND_PROP_AUTOSIZE) >= 0: # type: ignore # Any property check works
window_exists = True
except cv2.error: # type: ignore
window_exists = False # Error means window is gone
if self.is_opencv_window_initialized and window_exists and \
(current_disp_h, current_disp_w) != self._last_displayed_scaled_image_shape:
logger.info(
f"Scaled image size changed ({self._last_displayed_scaled_image_shape} -> "
@ -177,50 +220,76 @@ class MapDisplayWindow:
cv2.waitKey(5) # Allow OS to process window destruction
self.is_opencv_window_initialized = False
self.is_opencv_mouse_callback_set = False # Callback must be reset
self._last_displayed_scaled_image_shape = (0, 0) # Reset stored size
except cv2.error as e_cv_destroy: # type: ignore
logger.warning(f"Error destroying OpenCV window before recreation: {e_cv_destroy}")
self.is_opencv_window_initialized = False # Force recreation attempt
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
# Ensure OpenCV window exists and mouse callback is properly set
if not self.is_opencv_window_initialized:
# Create window if not initialized or if it was destroyed unexpectedly
if not self.is_opencv_window_initialized or not window_exists:
logger.debug(f"Creating/moving OpenCV window: '{self.opencv_window_name}' (AUTOSIZE)")
cv2.namedWindow(self.opencv_window_name, cv2.WINDOW_AUTOSIZE) # type: ignore
try:
cv2.moveWindow(self.opencv_window_name, self.window_start_x_position, self.window_start_y_position) # type: ignore
except cv2.error as e_cv_move: # type: ignore
logger.warning(f"Could not move OpenCV window '{self.opencv_window_name}': {e_cv_move}")
self.is_opencv_window_initialized = True # Assume created even if move failed
logger.info(f"OpenCV window '{self.opencv_window_name}' (AUTOSIZE) is ready.")
cv2.namedWindow(self.opencv_window_name, cv2.WINDOW_AUTOSIZE) # type: ignore
# Try moving the window. This might fail on some systems or if window creation is delayed.
try:
cv2.moveWindow(self.opencv_window_name, self.window_start_x_position, self.window_start_y_position) # type: ignore
except cv2.error as e_cv_move: # type: ignore
logger.warning(f"Could not move OpenCV window '{self.opencv_window_name}': {e_cv_move}")
self.is_opencv_window_initialized = True # Assume created even if move failed
logger.info(f"OpenCV window '{self.opencv_window_name}' (AUTOSIZE) is ready.")
except Exception as e_window_create:
logger.error(f"Failed to create OpenCV window '{self.opencv_window_name}': {e_window_create}")
self.is_opencv_window_initialized = False # Mark as not initialized
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
# Cannot proceed to imshow or set mouse callback if window creation failed.
return
# Set mouse callback if the window is initialized and callback hasn't been set
if self.is_opencv_window_initialized and not self.is_opencv_mouse_callback_set:
try:
cv2.setMouseCallback(self.opencv_window_name, self._opencv_mouse_callback, param=self.app_facade_handler) # type: ignore
self.is_opencv_mouse_callback_set = True
logger.info(f"Mouse callback successfully set for '{self.opencv_window_name}'.")
except cv2.error as e_cv_callback: # type: ignore
logger.error(f"Failed to set mouse callback for '{self.opencv_window_name}': {e_cv_callback}")
# MODIFIED: Added check that app_facade_handler is not None before setting callback param.
# WHY: Avoids passing None as param, although OpenCV might handle it, it's safer.
# HOW: Check 'if self.app_facade_handler:'.
if self.app_facade_handler:
try:
cv2.setMouseCallback(self.opencv_window_name, self._opencv_mouse_callback, param=self.app_facade_handler) # type: ignore
self.is_opencv_mouse_callback_set = True
logger.info(f"Mouse callback successfully set for '{self.opencv_window_name}'.")
except cv2.error as e_cv_callback: # type: ignore
logger.error(f"Failed to set mouse callback for '{self.opencv_window_name}': {e_cv_callback}")
self.is_opencv_mouse_callback_set = False # Mark as failed to set
else:
logger.warning("App facade is None, cannot set mouse callback parameter.")
self.is_opencv_mouse_callback_set = False
# Display the final (scaled) image
try:
cv2.imshow(self.opencv_window_name, scaled_bgr_image_for_display) # type: ignore
# Store the shape of the image that was actually displayed (scaled)
self._last_displayed_scaled_image_shape = (current_disp_h, current_disp_w)
# cv2.waitKey(1) is important for OpenCV to process GUI events.
# The main event loop for this window is expected to be handled by the
# calling process (e.g., the run_map_viewer_process_target loop).
except cv2.error as e_cv_imshow: # type: ignore
if "NULL window" in str(e_cv_imshow).lower() or \
"invalid window" in str(e_cv_imshow).lower() or \
"checkView" in str(e_cv_imshow).lower():
logger.warning(f"OpenCV window '{self.opencv_window_name}' seems closed or invalid during imshow operation.")
self.is_opencv_window_initialized = False
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
else: # Re-raise other OpenCV errors if not related to window state
logger.exception(f"OpenCV error during map display (imshow): {e_cv_imshow}")
except Exception as e_disp_final:
logger.exception(f"Unexpected error displaying final map image: {e_disp_final}")
# Display the final (scaled) image if the window is initialized
if self.is_opencv_window_initialized:
try:
cv2.imshow(self.opencv_window_name, scaled_bgr_image_for_display) # type: ignore
# Store the shape of the image that was actually displayed (scaled)
self._last_displayed_scaled_image_shape = (current_disp_h, current_disp_w)
# cv2.waitKey(1) is important for OpenCV to process GUI events.
# The main event loop for this window is expected to be handled by the
# calling process (e.g., the run_map_viewer_process_target loop).
except cv2.error as e_cv_imshow: # type: ignore
# Catch specific OpenCV errors that indicate the window is gone
error_str = str(e_cv_imshow).lower()
if "null window" in error_str or "invalid window" in error_str or "checkview" in error_str:
logger.warning(f"OpenCV window '{self.opencv_window_name}' seems closed or invalid during imshow operation.")
# Reset state flags as the window is gone
self.is_opencv_window_initialized = False
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
else: # Re-raise other OpenCV errors if not related to window state
logger.exception(f"OpenCV error during map display (imshow): {e_cv_imshow}")
except Exception as e_disp_final:
logger.exception(f"Unexpected error displaying final map image: {e_disp_final}")
else:
logger.error("Cannot display image: OpenCV window is not initialized.")
def _opencv_mouse_callback(self, event_type: int, x_coord: int, y_coord: int, flags: int, app_facade_param: Any) -> None:
@ -229,27 +298,43 @@ class MapDisplayWindow:
Invoked by OpenCV when a mouse event occurs in the managed window.
Clamps coordinates and calls the app_facade's handler method.
'app_facade_param' is expected to be the GeoElevationMapViewer instance.
This callback runs in the OpenCV internal thread.
"""
# MODIFIED: Added check for left mouse button down event.
# WHY: Only process clicks, ignore other mouse events like move etc.
# HOW: Check event_type.
if event_type == cv2.EVENT_LBUTTONDOWN: # type: ignore # Check for left mouse button down event
logger.debug(f"OpenCV Mouse Event: LBUTTONDOWN at raw pixel ({x_coord},{y_coord})")
# Get the dimensions of the image currently displayed (which is the scaled image)
current_displayed_height, current_displayed_width = self._last_displayed_scaled_image_shape
if current_displayed_width <= 0 or current_displayed_height <= 0:
logger.warning("Mouse click on map, but no valid displayed image dimensions are stored.")
return # Cannot process click without knowing the displayed image size
# Clamp clicked x, y coordinates to be within the bounds of the displayed (scaled) image
# This is important because the click coordinates can sometimes be slightly outside the window bounds,
# or the image size might momentarily not match the window size.
x_coord_clamped = max(0, min(x_coord, current_displayed_width - 1))
y_coord_clamped = max(0, min(y_coord, current_displayed_height - 1))
logger.debug(
f"Map Window Left Click (OpenCV raw): ({x_coord},{y_coord}), "
f"Clamped to displayed image: ({x_coord_clamped},{y_coord_clamped})"
f"Clamped to displayed image ({current_displayed_width}x{current_displayed_height}): "
f"({x_coord_clamped},{y_coord_clamped})"
)
# The app_facade_param should be the GeoElevationMapViewer instance.
# We call its handler method, passing the clamped pixel coordinates on the *displayed* image.
if app_facade_param and hasattr(app_facade_param, 'handle_map_mouse_click'):
# Call the designated handler method on the GeoElevationMapViewer instance
app_facade_param.handle_map_mouse_click(x_coord_clamped, y_coord_clamped)
try:
# Call the designated handler method on the GeoElevationMapViewer instance
# Pass the clamped pixel coordinates on the SCALED, DISPLAYED image
app_facade_param.handle_map_mouse_click(x_coord_clamped, y_coord_clamped)
logger.debug("Called facade's handle_map_mouse_click.")
except Exception as e_handle_click:
logger.exception(f"Error executing handle_map_mouse_click on app facade: {e_handle_click}")
else:
logger.error(
"app_facade_param not correctly passed to OpenCV mouse callback, or it lacks "
@ -267,11 +352,16 @@ class MapDisplayWindow:
"""
Converts pixel coordinates from the (potentially scaled) displayed map image
to geographic WGS84 coordinates (latitude, longitude).
This method is called by GeoElevationMapViewer.handle_map_mouse_click.
It uses the stored context of the original, unscaled map (`current_map_geo_bounds`, `current_map_native_zoom`)
and the shape of the image *actually displayed* (`displayed_map_pixel_shape`) to perform the conversion.
"""
if not MERCANTILE_LIB_AVAILABLE_DISPLAY:
logger.error("mercantile library not available for pixel_to_geo conversion.")
return None
if not (current_map_geo_bounds and displayed_map_pixel_shape and current_map_native_zoom is not None):
# This warning indicates the context needed for conversion wasn't properly stored/passed.
logger.warning("Cannot convert pixel to geo: Current map context for conversion is incomplete.")
return None
@ -280,34 +370,41 @@ class MapDisplayWindow:
# Geographic bounds of the *original, unscaled* map tile data
map_west_lon, map_south_lat, map_east_lon, map_north_lat = current_map_geo_bounds
# Basic validation of dimensions
if displayed_width <= 0 or displayed_height <= 0:
logger.error("Cannot convert pixel to geo: Invalid displayed map dimensions.")
return None
try:
# Get Web Mercator coordinates of the unscaled map's top-left and bottom-right corners
# Use mercantile to get Web Mercator coordinates of the unscaled map's corners
map_ul_merc_x, map_ul_merc_y = mercantile.xy(map_west_lon, map_north_lat) # type: ignore
map_lr_merc_x, map_lr_merc_y = mercantile.xy(map_east_lon, map_south_lat) # type: ignore
total_map_width_merc = abs(map_lr_merc_x - map_ul_merc_x)
total_map_height_merc = abs(map_ul_merc_y - map_lr_merc_y)
# Handle zero dimensions in Mercator space (e.g., invalid geo bounds)
if total_map_width_merc <= 0 or total_map_height_merc <= 0:
logger.error("Cannot convert pixel to geo: Invalid Mercator dimensions for map bounds.")
return None
# Calculate relative position of the click within the *displayed (scaled)* map (0.0 to 1.0)
relative_x_on_displayed_map = pixel_x_on_displayed / displayed_width
relative_y_on_displayed_map = pixel_y_on_displayed / displayed_height
# Ensure we don't divide by zero if dimensions are unexpectedly zero
relative_x_on_displayed_map = pixel_x_on_displayed / displayed_width if displayed_width > 0 else 0.0
relative_y_on_displayed_map = pixel_y_on_displayed / displayed_height if displayed_height > 0 else 0.0
# Use these relative positions to find the corresponding Web Mercator coordinate
# within the *unscaled* map's Mercator extent.
# Y Mercator increases towards North, pixel Y increases downwards. So use map_ul_merc_y - ...
clicked_point_merc_x = map_ul_merc_x + (relative_x_on_displayed_map * total_map_width_merc)
clicked_point_merc_y = map_ul_merc_y - (relative_y_on_displayed_map * total_map_height_merc)
# Convert Mercator coordinates back to geographic (longitude, latitude)
clicked_lon, clicked_lat = mercantile.lnglat(clicked_point_merc_x, clicked_point_merc_y) # type: ignore
# Return as (latitude, longitude) tuple
return (clicked_lat, clicked_lon)
except Exception as e_px_to_geo:
logger.exception(f"Error during pixel_to_geo conversion: {e_px_to_geo}")
logger.exception(f"Error during pixel_to_geo conversion for pixel ({pixel_x_on_displayed},{pixel_y_on_displayed}): {e_px_to_geo}")
return None
def geo_to_pixel_on_current_map(
@ -321,36 +418,53 @@ class MapDisplayWindow:
"""
Converts geographic WGS84 coordinates to pixel coordinates (x, y)
on the currently displayed (potentially scaled) map image.
This method might be called by the app_facade (GeoElevationMapViewer)
to determine where to draw a marker on the *displayed* image, although
the current drawing implementation in GeoElevationMapViewer draws on the
*unscaled* image and relies on its own direct geo-to-pixel logic for the unscaled image.
This method is kept here for completeness and potential future use if
drawing logic were moved to this class or needed scaled coordinates.
"""
if not MERCANTILE_LIB_AVAILABLE_DISPLAY:
logger.error("mercantile library not available for geo_to_pixel conversion.")
return None
if not (current_map_geo_bounds and displayed_map_pixel_shape and current_map_native_zoom is not None):
# This warning indicates the context needed for conversion wasn't properly stored/passed.
logger.warning("Cannot convert geo to pixel: Current map context for conversion is incomplete.")
return None
# Dimensions of the image *as it is displayed* (after scaling)
displayed_height, displayed_width = displayed_map_pixel_shape
# Geographic bounds of the *original, unscaled* map tile data
map_west_lon, map_south_lat, map_east_lon, map_north_lat = current_map_geo_bounds
# Basic validation of dimensions
if displayed_width <= 0 or displayed_height <= 0:
logger.error("Cannot convert geo to pixel: Invalid displayed map dimensions.")
return None
try:
# Use mercantile to get Web Mercator coordinates of the unscaled map's corners
map_ul_merc_x, map_ul_merc_y = mercantile.xy(map_west_lon, map_north_lat) # type: ignore
map_lr_merc_x, map_lr_merc_y = mercantile.xy(map_east_lon, map_south_lat) # type: ignore
total_map_width_merc = abs(map_lr_merc_x - map_ul_merc_x)
total_map_height_merc = abs(map_ul_merc_y - map_lr_merc_y)
# Handle zero dimensions in Mercator space (e.g., invalid geo bounds)
if total_map_width_merc <= 0 or total_map_height_merc <= 0:
logger.error("Cannot convert geo to pixel: Invalid Mercator dimensions for map bounds.")
return None
# Get Web Mercator coordinates of the target geographic point
target_merc_x, target_merc_y = mercantile.xy(longitude_deg, latitude_deg) # type: ignore
# Relative position of the target geo point within the *unscaled* map's Mercator extent
relative_merc_x_in_map = (target_merc_x - map_ul_merc_x) / total_map_width_merc
relative_merc_y_in_map = (map_ul_merc_y - target_merc_y) / total_map_height_merc
# Calculate relative position of the target geo point within the *unscaled* map's Mercator extent.
# Ensure we don't divide by zero.
relative_merc_x_in_map = (target_merc_x - map_ul_merc_x) / total_map_width_merc if total_map_width_merc > 0 else 0.0
relative_merc_y_in_map = (map_ul_merc_y - target_merc_y) / total_map_height_merc if total_map_height_merc > 0 else 0.0 # Y Mercator increases North, pixel Y downwards
# Convert these relative positions to pixel coordinates on the *displayed (scaled)* image
pixel_x_on_displayed = int(round(relative_merc_x_in_map * displayed_width))
@ -361,7 +475,7 @@ class MapDisplayWindow:
py_clamped = max(0, min(pixel_y_on_displayed, displayed_height - 1))
return (px_clamped, py_clamped)
except Exception as e_geo_to_px:
logger.exception(f"Error during geo_to_pixel conversion: {e_geo_to_px}")
logger.exception(f"Error during geo_to_pixel conversion for geo ({latitude_deg:.5f},{longitude_deg:.5f}): {e_geo_to_px}")
return None
def _create_placeholder_bgr_numpy_array(self) -> np.ndarray: # type: ignore
@ -369,63 +483,103 @@ class MapDisplayWindow:
placeholder_h = 256 # Default placeholder dimensions
placeholder_w = 256
bgr_light_grey = (220, 220, 220) # BGR color for light grey
# MODIFIED: Added check for NumPy availability before creation.
# WHY: Defend against scenarios where NumPy is None despite initial check (unlikely but safe).
# HOW: Check 'if np:'.
if np: # type: ignore
return np.full((placeholder_h, placeholder_w, 3), bgr_light_grey, dtype=np.uint8) # type: ignore
try:
return np.full((placeholder_h, placeholder_w, 3), bgr_light_grey, dtype=np.uint8) # type: ignore
except Exception as e_np_full:
logger.exception(f"Error creating NumPy full array for placeholder: {e_np_full}. Using zeros fallback.")
# Fallback to zeros array if full() fails
return np.zeros((100, 100, 3), dtype=np.uint8) # type: ignore # Minimal array
else: # Fallback if NumPy somehow became None (should not happen if CV2_NUMPY_AVAILABLE is true)
# This case is highly unlikely if __init__ guard passed.
# Creating a manual list of lists if np is None is too complex for a simple placeholder.
# Returning a pre-made small array or raising error might be better.
# For now, this indicates a severe issue.
logger.critical("NumPy became unavailable unexpectedly during placeholder creation.")
# Return a minimal black image to avoid crashing imshow if possible
return [[[0,0,0]]] * 10 # type: ignore # Minimal 10x1 black image (highly not ideal)
# Cannot create a NumPy array, return None which might cause further errors in imshow.
# This indicates a severe issue.
return None # type: ignore
def _convert_pil_image_to_bgr_numpy_array(self, pil_image: ImageType) -> Optional[np.ndarray]: # type: ignore
"""Converts a PIL Image object to a NumPy BGR array for OpenCV display."""
"""
Converts a PIL Image object to a NumPy BGR array for OpenCV display.
Handles different PIL modes (RGB, RGBA, L/Grayscale).
"""
# MODIFIED: Added check for PIL and CV2/NumPy availability.
# WHY: Ensure dependencies are present before attempting conversion.
# HOW: Added checks.
if not (PIL_LIB_AVAILABLE_DISPLAY and CV2_NUMPY_LIBS_AVAILABLE_DISPLAY and pil_image):
logger.error("Cannot convert PIL to BGR: Pillow, OpenCV/NumPy missing, or input image is None.")
return None
try:
# Convert PIL image to NumPy array. This retains the number of channels.
numpy_image_array = np.array(pil_image) # type: ignore
if numpy_image_array.ndim == 2: # Grayscale image
# Convert based on the number of channels (shape[2]) or dimension (ndim)
if numpy_image_array.ndim == 2: # Grayscale or L mode PIL image
logger.debug("Converting grayscale/L PIL image to BGR NumPy array.")
return cv2.cvtColor(numpy_image_array, cv2.COLOR_GRAY2BGR) # type: ignore
elif numpy_image_array.ndim == 3:
if numpy_image_array.shape[2] == 3: # RGB image
logger.debug("Converting RGB PIL image to BGR NumPy array.")
return cv2.cvtColor(numpy_image_array, cv2.COLOR_RGB2BGR) # type: ignore
elif numpy_image_array.shape[2] == 4: # RGBA image (alpha channel will be stripped)
logger.debug("Converting RGBA PIL image to BGR NumPy array (stripping Alpha).")
return cv2.cvtColor(numpy_image_array, cv2.COLOR_RGBA2BGR) # type: ignore
logger.warning(
f"Unsupported NumPy image shape after PIL conversion: {numpy_image_array.shape}. Cannot convert to BGR."
)
return None
else:
logger.warning(
f"Unsupported NumPy image shape after PIL conversion ({numpy_image_array.shape}). Cannot convert to BGR."
)
return None
else: # Unexpected number of dimensions
logger.warning(
f"Unexpected NumPy image dimensions ({numpy_image_array.ndim}) after PIL conversion. Cannot convert to BGR."
)
return None
except Exception as e_conv_pil_bgr:
logger.exception(f"Error converting PIL image to BGR NumPy array: {e_conv_pil_bgr}")
return None
def is_window_alive(self) -> bool:
"""Checks if the OpenCV window is likely still open and initialized."""
# MODIFIED: Added check for CV2/NumPy availability.
# WHY: Prevent errors if dependencies are gone.
# HOW: Added initial check.
if not self.is_opencv_window_initialized or not CV2_NUMPY_LIBS_AVAILABLE_DISPLAY:
return False # Not initialized or OpenCV gone
try:
# WND_PROP_VISIBLE returns >= 1.0 if window is visible, 0.0 if hidden/occluded,
# and < 0 (typically -1.0) if window does not exist.
window_visibility = cv2.getWindowProperty(self.opencv_window_name, cv2.WND_PROP_VISIBLE) # type: ignore
if window_visibility >= 1.0: # Window exists and is visible
return True
else: # Window likely closed or an issue occurred
# Check for any property to see if the window handle is still valid.
# getWindowProperty returns -1 if the window does not exist.
window_property_value = cv2.getWindowProperty(self.opencv_window_name, cv2.WND_PROP_AUTOSIZE) # type: ignore
# A value of -1.0 indicates the window does not exist.
if window_property_value >= 0.0: # Window exists
logger.debug(f"Window '{self.opencv_window_name}' property check >= 0.0 ({window_property_value}). Assuming alive.")
# We can also check for visibility specifically if needed:
# visibility = cv2.getWindowProperty(self.opencv_window_name, cv2.WND_PROP_VISIBLE)
# if visibility >= 1.0: return True else False
return True # Window exists and is likely alive
else: # Window likely closed or an issue occurred (property < 0)
logger.debug(
f"Window '{self.opencv_window_name}' not reported as visible (prop_val={window_visibility}). "
f"Window '{self.opencv_window_name}' property check < 0.0 ({window_property_value}). "
"Assuming it's closed or invalid for interaction."
)
self.is_opencv_window_initialized = False # Update internal state
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
return False
except cv2.error: # type: ignore
# OpenCV error (e.g., window name invalid/destroyed)
# OpenCV error (e.g., window name invalid/destroyed).
# This happens if the window was destroyed by user action or other means.
logger.debug(f"OpenCV error when checking property for window '{self.opencv_window_name}'. Assuming closed.")
self.is_opencv_window_initialized = False
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)
return False
except Exception as e_unexpected_alive_check:
logger.exception(f"Unexpected error checking if window '{self.opencv_window_name}' is alive: {e_unexpected_alive_check}")
@ -436,9 +590,14 @@ class MapDisplayWindow:
def destroy_window(self) -> None:
"""Explicitly destroys the managed OpenCV window and resets state flags."""
logger.info(f"Attempting to destroy OpenCV window: '{self.opencv_window_name}'")
# MODIFIED: Added check for CV2/NumPy availability before destroying.
# WHY: Prevent errors if dependencies are gone.
# HOW: Added initial check.
if self.is_opencv_window_initialized and CV2_NUMPY_LIBS_AVAILABLE_DISPLAY:
try:
cv2.destroyWindow(self.opencv_window_name) # type: ignore
# It is important to call cv2.waitKey() after destroyWindow to process the event queue
# and ensure the window is actually closed by the OS. A small delay helps.
cv2.waitKey(5) # Give OpenCV a moment to process the destruction
logger.info(f"Window '{self.opencv_window_name}' explicitly destroyed.")
except cv2.error as e_cv_destroy_final: # type: ignore
@ -455,7 +614,7 @@ class MapDisplayWindow:
f"Window '{self.opencv_window_name}' was not marked as initialized or OpenCV is not available. "
"No explicit destroy action taken."
)
# Always reset flags after attempting destroy to ensure clean state
# Always reset flags after attempting destroy to ensure clean state regardless of outcome.
self.is_opencv_window_initialized = False
self.is_opencv_mouse_callback_set = False
self._last_displayed_scaled_image_shape = (0, 0)

View File

@ -27,13 +27,16 @@ except ImportError:
logging.error("MapTileManager: 'requests' library not found. Online tile fetching will fail.")
try:
from PIL import Image # For handling image data (opening, saving, stitching)
# MODIFIED: Import ImageDraw along with Image from PIL.
# WHY: ImageDraw is required for drawing on placeholder images and potentially other image manipulation tasks.
# HOW: Added ImageDraw to the import list from PIL.
from PIL import Image, ImageDraw
ImageType = Image.Image # type: ignore
PIL_AVAILABLE_MANAGER = True
except ImportError:
Image = None # type: ignore
ImageDraw = None # type: ignore # Define as None if import fails
ImageType = None # type: ignore
PIL_AVAILABLE_MANAGER = False
logging.error("MapTileManager: 'Pillow' library not found. Image operations will fail.")
@ -63,7 +66,12 @@ class MapTileManager:
self,
map_service: BaseMapService,
cache_root_directory: Optional[str] = None,
enable_online_tile_fetching: Optional[bool] = None
enable_online_tile_fetching: Optional[bool] = None,
# MODIFIED: Added tile_pixel_size parameter to the constructor.
# WHY: To allow the caller (GeoElevationMapViewer) to explicitly specify the tile size
# based on the selected map service configuration.
# HOW: Added the parameter with an Optional[int] type hint and default None.
tile_pixel_size: Optional[int] = None
) -> None:
"""
Initializes the MapTileManager.
@ -76,17 +84,23 @@ class MapTileManager:
A subdirectory for the specific service will be created.
enable_online_tile_fetching: Whether to download tiles if not found in cache.
If None, uses DEFAULT_ENABLE_ONLINE_FETCHING.
tile_pixel_size: The pixel dimension (width/height) of map tiles for this manager.
If None, the size is taken from the map_service instance.
Raises:
TypeError: If map_service_instance is not a valid BaseMapService instance.
ImportError: If 'requests' or 'Pillow' libraries are not installed.
ValueError: If a tile_pixel_size is provided but invalid.
"""
logger.info("Initializing MapTileManager...")
if not REQUESTS_AVAILABLE:
raise ImportError("'requests' library is required by MapTileManager but not found.")
if not PIL_AVAILABLE_MANAGER:
raise ImportError("'Pillow' library is required by MapTileManager but not found.")
# MODIFIED: Check for ImageDraw availability as well if Pillow is expected.
# WHY: Drawing on placeholders requires ImageDraw.
# HOW: Added ImageDraw check.
if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None):
raise ImportError("'Pillow' library (or its drawing module ImageDraw) is required by MapTileManager but not found.")
if not isinstance(map_service, BaseMapService):
logger.critical("Invalid map_service_instance provided. Must be an instance of BaseMapService.")
@ -95,6 +109,21 @@ class MapTileManager:
self.map_service: BaseMapService = map_service
self.service_identifier_name: str = self.map_service.name
# MODIFIED: Set the tile_size attribute using the provided parameter or the service's size.
# WHY: The manager needs to know the pixel dimensions of the tiles it handles for stitching and placeholder creation.
# HOW: Check if tile_pixel_size was provided; if so, validate and use it. Otherwise, use the size from the map_service instance.
if tile_pixel_size is not None:
if not isinstance(tile_pixel_size, int) or tile_pixel_size <= 0:
logger.error(f"Invalid provided tile_pixel_size: {tile_pixel_size}. Must be a positive integer.")
# Fallback to service size or raise error? Let's raise for clarity if a bad value is explicitly passed.
raise ValueError(f"Invalid provided tile_pixel_size: {tile_pixel_size}. Must be a positive integer.")
self.tile_size: int = tile_pixel_size
logger.info(f"Map tile size explicitly set to {self.tile_size}px.")
else:
# Use the size from the provided map service instance
self.tile_size: int = self.map_service.tile_size
logger.info(f"Map tile size inherited from service '{self.map_service.name}': {self.tile_size}px.")
# Determine cache directory path
effective_cache_root_dir = cache_root_directory if cache_root_directory is not None \
else DEFAULT_MAP_TILE_CACHE_ROOT_DIR
@ -154,6 +183,9 @@ class MapTileManager:
pil_image.load() # Load image data into memory to release file lock sooner
# Ensure consistency by converting to RGB if needed
# MODIFIED: Ensure consistency by converting to RGB or RGBA depending on service need (currently RGB).
# WHY: Consistent format is important for image processing and display.
# HOW: Convert to RGB.
if pil_image.mode != "RGB":
logger.debug(
f"Converting cached image {tile_coordinates_log_str} from mode {pil_image.mode} to RGB."
@ -214,9 +246,21 @@ class MapTileManager:
try:
pil_image = Image.open(io.BytesIO(image_binary_data)) # type: ignore
pil_image.load()
# MODIFIED: Convert downloaded image to RGB mode before saving/returning.
# WHY: Consistency in image format within the manager.
# HOW: Added .convert("RGB").
if pil_image.mode != "RGB":
pil_image = pil_image.convert("RGB")
# Optional: Resize downloaded tile if its size doesn't match self.tile_size
# This would be needed if the service URL returns tiles of different sizes,
# which is uncommon for standard XYZ services, but could happen.
# For standard services, the service.tile_size should be correct.
# if pil_image.size != (self.tile_size, self.tile_size):
# logger.warning(f"Downloaded tile size {pil_image.size} doesn't match expected {self.tile_size}. Resizing.")
# pil_image = pil_image.resize((self.tile_size, self.tile_size), Image.Resampling.LANCZOS)
logger.debug(f"Tile {tile_coordinates_log_str} downloaded (Attempt {attempt_num + 1}).")
self._save_image_to_cache_file(tile_cache_path, pil_image)
downloaded_pil_image = pil_image
@ -264,7 +308,8 @@ class MapTileManager:
try:
# Ensure parent directory for the tile file exists
tile_cache_path.parent.mkdir(parents=True, exist_ok=True)
pil_image.save(tile_cache_path) # PIL infers format from extension (e.g., .png)
# Use 'png' format explicitly as it's lossless and common for map tiles
pil_image.save(tile_cache_path, format='PNG') # MODIFIED: Explicitly save as PNG. WHY: Standard format.
logger.debug(f"Saved tile to cache: {tile_cache_path}")
except IOError as e_io_save:
logger.error(f"IOError saving tile to cache {tile_cache_path}: {e_io_save}")
@ -298,6 +343,15 @@ class MapTileManager:
tile_coords_log_str = f"({zoom_level},{tile_x},{tile_y})"
logger.debug(f"Requesting tile image for {tile_coords_log_str}")
# MODIFIED: Check if the zoom level is valid for the map service.
# WHY: Avoid requesting tiles for invalid zoom levels from the service or cache.
# HOW: Added a check using self.map_service.is_zoom_level_valid.
if not self.map_service.is_zoom_level_valid(zoom_level):
logger.error(f"Invalid zoom level {zoom_level} for map service '{self.service_identifier_name}'. Cannot get tile.")
# Return a placeholder for invalid zoom levels
return self._create_placeholder_tile_image(f"Invalid Zoom {zoom_level}")
tile_cache_file = self._get_tile_cache_file_path(zoom_level, tile_x, tile_y)
retrieved_image: Optional[ImageType] = None
@ -311,10 +365,13 @@ class MapTileManager:
if retrieved_image is None: # All attempts failed
logger.warning(f"Failed to retrieve tile {tile_coords_log_str}. Using placeholder.")
retrieved_image = self._create_placeholder_tile_image()
# MODIFIED: Pass tile coordinates to placeholder for debugging/visual info.
# WHY: Helps identify which specific tile failed when looking at the stitched map.
# HOW: Pass a string identifier to the placeholder creation function.
retrieved_image = self._create_placeholder_tile_image(tile_coords_log_str)
if retrieved_image is None: # Should be rare if Pillow is working
logger.critical("Failed to create even a placeholder tile. Returning None.")
return retrieved_image
@ -343,17 +400,68 @@ class MapTileManager:
min_tile_x, max_tile_x = x_tile_range
min_tile_y, max_tile_y = y_tile_range
# Basic validation of ranges
if not (min_tile_x <= max_tile_x and min_tile_y <= max_tile_y):
logger.error(f"Invalid tile ranges for stitching: X={x_tile_range}, Y={y_tile_range}")
return None
num_tiles_wide = (max_tile_x - min_tile_x) + 1
num_tiles_high = (max_tile_y - min_tile_y) + 1
single_tile_pixel_size = self.map_service.tile_size
# MODIFIED: Use the tile_size attribute of the manager.
# WHY: Consistency. The manager's size should be used, not necessarily the service's size again here.
# HOW: Changed self.map_service.tile_size to self.tile_size.
single_tile_pixel_size = self.tile_size
if single_tile_pixel_size <= 0:
logger.error(f"Invalid tile size ({single_tile_pixel_size}) from map service. Cannot stitch.")
return None
logger.error(f"Invalid tile size ({single_tile_pixel_size}) stored in manager. Cannot stitch.")
# MODIFIED: Return placeholder instead of None on invalid tile size.
# WHY: Provide a visual indication that stitching failed due to config, rather than a blank window.
# HOW: Create and return a large placeholder image.
try:
# Ensure Pillow/ImageDraw are available for placeholder creation
if PIL_AVAILABLE_MANAGER and ImageDraw is not None: # type: ignore
# Create a large placeholder image (e.g., 3x3 tiles size)
placeholder_img = Image.new("RGB", (256 * 3, 256 * 3), DEFAULT_PLACEHOLDER_COLOR_RGB) # Use a fixed reasonable size for error image
draw = ImageDraw.Draw(placeholder_img) # type: ignore
# Add error text
error_text = f"Stitch Failed\nInvalid Tile Size:\n{single_tile_pixel_size}"
# This simple text drawing assumes basic PIL text capabilities
try:
# Try drawing with a font loaded in image_processor
from geoelevation.image_processor import DEFAULT_FONT # Assume font loading logic in image_processor
font_to_use = DEFAULT_FONT # type: ignore # Use font loaded in image_processor
if font_to_use:
# Calculate text size and position using the font
# Note: textbbox requires Pillow >= 8.0
try:
text_bbox = draw.textbbox((0,0), error_text, font=font_to_use, spacing=2) # type: ignore
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_pos = ((placeholder_img.width - text_width) // 2, (placeholder_img.height - text_height) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use, spacing=2, anchor="lt") # type: ignore # Draw text using font
except AttributeError: # Fallback for textbbox if Pillow < 8.0
text_width, text_height = draw.textsize(error_text, font=font_to_use) # type: ignore
text_pos = ((placeholder_img.width - text_width) // 2, (placeholder_img.height - text_height) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use) # type: ignore # Draw text using font fallback
except Exception as e_font_draw:
logger.warning(f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore # Simple draw if font drawing fails
else:
draw.text((10, 10), error_text, fill="black") # type: ignore # Simple draw if no font was loaded
except Exception as e_draw:
logger.warning(f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore # Final fallback
return placeholder_img
else:
logger.error("Pillow or ImageDraw not available to create placeholder image.")
return None # Cannot create placeholder without PIL
except Exception as e_placeholder_fail:
logger.exception(f"Failed to create large placeholder for stitching error: {e_placeholder_fail}")
return None # Return None if placeholder creation fails
num_tiles_wide = (max_tile_x - min_tile_x) + 1
num_tiles_high = (max_tile_y - min_tile_y) + 1
total_image_width = num_tiles_wide * single_tile_pixel_size
total_image_height = num_tiles_high * single_tile_pixel_size
@ -362,12 +470,102 @@ class MapTileManager:
f"({num_tiles_wide}x{num_tiles_high} tiles of {single_tile_pixel_size}px)"
)
# Handle potential excessively large image size request
MAX_IMAGE_DIMENSION = 16384 # Arbitrary limit to prevent crashes with massive requests
if total_image_width > MAX_IMAGE_DIMENSION or total_image_height > MAX_IMAGE_DIMENSION:
logger.error(
f"Requested stitched image size ({total_image_width}x{total_image_height}) "
f"exceeds maximum allowed dimension ({MAX_IMAGE_DIMENSION}). Aborting stitch."
)
# Return placeholder for excessive size request
try:
if PIL_AVAILABLE_MANAGER and ImageDraw is not None: # type: ignore
placeholder_img = Image.new("RGB", (256 * 3, 256 * 3), (255, 100, 100)) # Reddish placeholder
draw = ImageDraw.Draw(placeholder_img) # type: ignore
error_text = f"Stitch Failed\nImage too large:\n{total_image_width}x{total_image_height}px"
try:
from geoelevation.image_processor import DEFAULT_FONT
font_to_use = DEFAULT_FONT # type: ignore
if font_to_use:
try:
text_bbox = draw.textbbox((0,0), error_text, font=font_to_use, spacing=2) # type: ignore
text_w = text_bbox[2] - text_bbox[0]
text_h = text_bbox[3] - text_bbox[1]
text_pos = ((placeholder_img.width - text_w) // 2, (placeholder_img.height - text_h) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use, spacing=2, anchor="lt") # type: ignore
except AttributeError:
text_w, text_h = draw.textsize(error_text, font=font_to_use) # type: ignore
text_pos = ((placeholder_img.width - text_w) // 2, (placeholder_img.height - text_h) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use) # type: ignore
except Exception as e_font_draw:
logger.warning(f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore
else:
draw.text((10, 10), error_text, fill="black") # type: ignore
except Exception as e_draw:
logger.warning(f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore # Final fallback
return placeholder_img
else:
logger.error("Pillow or ImageDraw not available to create placeholder image.")
return None # Cannot create placeholder without PIL
except Exception as e_placeholder_fail:
logger.exception(f"Failed to create large placeholder for size error: {e_placeholder_fail}")
return None # Return None if placeholder fails
try:
# Create a new blank RGB image to paste tiles onto
stitched_map_image = Image.new("RGB", (total_image_width, total_image_height)) # type: ignore
# MODIFIED: Ensure PIL_AVAILABLE_MANAGER is true before creating Image.new.
# WHY: Avoids NameError if PIL import failed.
# HOW: Added check.
if PIL_AVAILABLE_MANAGER:
stitched_map_image = Image.new("RGB", (total_image_width, total_image_height)) # type: ignore
else:
raise ImportError("Pillow not available to create new image.")
except Exception as e_create_blank:
logger.exception(f"Failed to create blank image for stitching: {e_create_blank}")
return None
logger.exception(f"Failed to create blank image for stitching: {e_create_blank}. Dimensions: {total_image_width}x{total_image_height}")
# Return placeholder if blank image creation fails (e.g., out of memory)
try:
if PIL_AVAILABLE_MANAGER and ImageDraw is not None: # type: ignore
placeholder_img = Image.new("RGB", (256 * 3, 256 * 3), (255, 100, 100)) # Reddish placeholder
draw = ImageDraw.Draw(placeholder_img) # type: ignore
error_text = f"Stitch Failed\nCannot create image:\n{e_create_blank}"
try:
from geoelevation.image_processor import DEFAULT_FONT
font_to_use = DEFAULT_FONT # type: ignore
if font_to_use:
try:
text_bbox = draw.textbbox((0,0), error_text, font=font_to_use, spacing=2) # type: ignore
text_w = text_bbox[2] - text_bbox[0]
text_h = text_bbox[3] - text_bbox[1]
text_pos = ((placeholder_img.width - text_w) // 2, (placeholder_img.height - text_h) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use, spacing=2, anchor="lt") # type: ignore
except AttributeError:
text_w, text_h = draw.textsize(error_text, font=font_to_use) # type: ignore
text_pos = ((placeholder_img.width - text_w) // 2, (placeholder_img.height - text_h) // 2)
draw.text(text_pos, error_text, fill="black", font=font_to_use) # type: ignore
except Exception as e_font_draw:
logger.warning(f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore
else:
draw.text((10, 10), error_text, fill="black") # type: ignore
except Exception as e_draw:
logger.warning(f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw.")
draw.text((10, 10), error_text, fill="black") # type: ignore # Final fallback
return placeholder_img
else:
logger.error("Pillow or ImageDraw not available to create placeholder image.")
return None # Cannot create placeholder without PIL
except Exception as e_placeholder_fail:
logger.exception(f"Failed to create large placeholder for memory error: {e_placeholder_fail}")
return None # Return None if placeholder fails
# Iterate through the required tile coordinates, fetch, and paste
for row_index, current_tile_y in enumerate(range(min_tile_y, max_tile_y + 1)):
@ -379,7 +577,7 @@ class MapTileManager:
logger.critical(
f"Critical error: get_tile_image returned None for ({zoom_level},{current_tile_x},{current_tile_y}). Aborting stitch."
)
return None
return None # Abort stitching on critical tile failure
# Calculate top-left pixel position to paste this tile
paste_position_x = col_index * single_tile_pixel_size
@ -389,7 +587,32 @@ class MapTileManager:
f"pixel position ({paste_position_x},{paste_position_y})"
)
try:
stitched_map_image.paste(tile_image_pil, (paste_position_x, paste_position_y))
# Ensure the tile image is the correct size before pasting
# MODIFIED: Check if tile_image_pil is valid before checking its size.
# WHY: Avoids AttributeError if tile_image_pil is None (shouldn't happen if get_tile_image handles None, but defensive).
# HOW: Added `if tile_image_pil and tile_image_pil.size...`.
if tile_image_pil and tile_image_pil.size != (self.tile_size, self.tile_size):
# This might happen if the downloaded tile or placeholder was the wrong size.
# Resize it to match the expected tile size for stitching consistency.
logger.warning(f"Tile image size {tile_image_pil.size} doesn't match expected {self.tile_size}. Resizing for stitch.")
# MODIFIED: Check PIL_AVAILABLE_MANAGER before resizing.
# WHY: Resize requires PIL.
# HOW: Added check.
if PIL_AVAILABLE_MANAGER:
tile_image_pil = tile_image_pil.resize((self.tile_size, self.tile_size), Image.Resampling.LANCZOS) # type: ignore
else:
logger.error("Pillow not available, cannot resize tile for stitch.")
# Decide fallback: skip pasting this tile or use placeholder?
# Leaving it blank might be okay, or replace with a placeholder of correct size.
# Let's just leave it blank (skip paste) if resize fails due to missing lib.
continue # Skip pasting this tile
# MODIFIED: Check if tile_image_pil is still valid before pasting.
# WHY: It might have become None if resize failed due to missing PIL.
# HOW: Added `if tile_image_pil:`.
if tile_image_pil:
stitched_map_image.paste(tile_image_pil, (paste_position_x, paste_position_y)) # type: ignore
except Exception as e_paste:
logger.exception(
f"Error pasting tile ({zoom_level},{current_tile_x},{current_tile_y}) "
@ -401,21 +624,79 @@ class MapTileManager:
return stitched_map_image
def _create_placeholder_tile_image(self) -> Optional[ImageType]:
"""Creates and returns a placeholder tile image (e.g., a grey square)."""
if not PIL_AVAILABLE_MANAGER:
logger.warning("Cannot create placeholder tile: Pillow library not available.")
def _create_placeholder_tile_image(self, identifier: str = "N/A") -> Optional[ImageType]:
"""
Creates and returns a placeholder tile image (e.g., a grey square).
Includes optional text identifier on the placeholder.
"""
# MODIFIED: Added check for ImageDraw availability.
# WHY: Drawing on placeholders requires ImageDraw.
# HOW: Added check.
if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None):
logger.warning("Cannot create placeholder tile: Pillow or ImageDraw library not available.")
return None
try:
tile_pixel_size = self.map_service.tile_size
tile_pixel_size = self.tile_size # Use the manager's stored tile size
# Ensure placeholder_color is a valid RGB tuple
placeholder_color = DEFAULT_PLACEHOLDER_COLOR_RGB
if not (isinstance(placeholder_color, tuple) and len(placeholder_color) == 3 and
all(isinstance(c, int) and 0 <= c <= 255 for c in placeholder_color)):
logger.warning(f"Invalid placeholder color '{placeholder_color}'. Using default grey.")
placeholder_color = (220, 220, 220)
# No need to re-validate color if it's a fixed constant, but defensive check
# if not (isinstance(placeholder_color, tuple) and len(placeholder_color) == 3 and
# all(isinstance(c, int) and 0 <= c <= 255 for c in placeholder_color)):
# logger.warning(f"Invalid placeholder color '{placeholder_color}'. Using default grey.")
# placeholder_color = (220, 220, 220)
placeholder_img = Image.new("RGB", (tile_pixel_size, tile_pixel_size), color=placeholder_color) # type: ignore
draw = ImageDraw.Draw(placeholder_img) # type: ignore
# Add text overlay indicating failure and identifier
overlay_text = f"Tile Fail\n{identifier}"
try:
# Attempt to use a font loaded in image_processor for consistency
from geoelevation.image_processor import DEFAULT_FONT # Assume font loading logic exists
font_to_use = DEFAULT_FONT # type: ignore # Use the shared loaded font
# Calculate text position for centering or top-left
# Using textbbox for accurate size calculation (requires Pillow >= 8.0)
try:
# textbbox returns (left, top, right, bottom) relative to the anchor (0,0)
text_bbox = draw.textbbox((0,0), overlay_text, font=font_to_use, spacing=2) # type: ignore
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
# Center the text (approx)
text_x = (tile_pixel_size - text_width) // 2
text_y = (tile_pixel_size - text_height) // 2
# Draw text with the loaded font, anchored at the top-left of the text bbox
draw.text((text_x, text_y), overlay_text, fill="black", font=font_to_use, spacing=2, anchor="lt") # type: ignore
except AttributeError: # Fallback for textbbox if Pillow < 8.0
logger.warning("Pillow textbbox not available (Pillow < 8.0). Using textsize fallback.")
# textsize might not handle multiline spacing well
text_width, text_height = draw.textsize(overlay_text, font=font_to_use) # type: ignore
# Add approximated height for multiline if needed
if "\n" in overlay_text:
line_count = overlay_text.count("\n") + 1
text_height += line_count * 2 # Rough approximation
# Center text based on textsize (less accurate for multiline)
text_x = (tile_pixel_size - text_width) // 2
text_y = (tile_pixel_size - text_height) // 2
draw.text((text_x, text_y), overlay_text, fill="black", font=font_to_use) # type: ignore # Draw text using font fallback
except Exception as e_font_draw:
logger.warning(f"Error drawing text with font on placeholder: {e_font_draw}. Falling back to simple draw.")
# Fallback to simple draw if font drawing fails
draw.text((10, 10), overlay_text, fill="black") # type: ignore # Simple draw near top-left
except Exception as e_draw:
logger.warning(f"Error drawing text on placeholder: {e_draw}. Falling back to simple draw.")
draw.text((10, 10), overlay_text, fill="black") # type: ignore # Final fallback
return placeholder_img
return Image.new("RGB", (tile_pixel_size, tile_pixel_size), color=placeholder_color) # type: ignore
except Exception as e_placeholder:
logger.exception(f"Error creating placeholder tile image: {e_placeholder}")
return None
@ -431,9 +712,14 @@ class MapTileManager:
or kept here if MapTileManager is the primary user of mercantile for this.
Requires 'mercantile' library.
"""
# Check if mercantile is available (it should be if class initialized)
# Check if mercantile is available (it should be if MapTileManager initialized without error)
try:
import mercantile as local_mercantile # Local import for this method
# MODIFIED: Check if mercantile is actually available after import attempt.
# WHY: Defend against scenarios where the import succeeds but mercantile is None.
# HOW: Add explicit check.
if local_mercantile is None:
raise ImportError("mercantile is None after import.")
except ImportError:
logger.error("mercantile library not found, cannot calculate bounds for tile range.")
return None

View File

@ -103,7 +103,7 @@ def get_bounding_box_from_center_size(
west_boundary_lon, _, _ = geodetic_calculator.fwd(
lons=center_longitude_deg, lats=center_latitude_deg, az=270.0, dist=half_side_length_meters
)
# Handle potential latitude clamping at poles
# pyproj should handle this correctly, but a sanity check can be useful
north_boundary_lat = min(90.0, max(-90.0, north_boundary_lat))
@ -156,9 +156,9 @@ def get_tile_ranges_for_bbox(
# mercantile.tiles expects (west, south, east, north) and zoom as integer
# It returns a generator of Tile(x, y, z) objects.
tiles_in_bbox_generator = mercantile.tiles( # type: ignore
west_lon, south_lat, east_lon, north_lat, zooms=zoom_level
west_lon, south_lat, east_lon, north_lat, zooms=[zoom_level] # Pass zoom as a list
)
list_of_tiles = list(tiles_in_bbox_generator)
if not list_of_tiles:
@ -170,9 +170,13 @@ def get_tile_ranges_for_bbox(
)
center_lon = (west_lon + east_lon) / 2.0
center_lat = (south_lat + north_lat) / 2.0
# Clamp center_lat to avoid mercantile issues near poles if bbox extends beyond valid range
center_lat = max(-85.0, min(85.0, center_lat)) # Mercantile limits
center_lon = max(-180.0, min(180.0, center_lon)) # Mercantile limits
# mercantile.tile(lon, lat, zoom)
center_point_tile = mercantile.tile(center_lon, center_lat, zoom_level) # type: ignore
min_tile_x = center_point_tile.x
max_tile_x = center_point_tile.x
min_tile_y = center_point_tile.y
@ -233,19 +237,229 @@ def calculate_meters_per_pixel(
# Earth's equatorial circumference in meters (WGS84)
EARTH_CIRCUMFERENCE_METERS = 40075016.686
latitude_radians = math.radians(latitude_degrees)
# Formula: Resolution = (Circumference * cos(latitude_rad)) / (tile_size * 2^zoom)
resolution_m_px = (EARTH_CIRCUMFERENCE_METERS * math.cos(latitude_radians)) / \
(tile_pixel_size * (2**zoom_level))
# Avoid returning non-finite values if cos(latitude) is near zero at poles
if not math.isfinite(resolution_m_px) or resolution_m_px <= 0:
logger.warning(f"Calculated non-finite or non-positive m/px ({resolution_m_px}) at Lat {latitude_degrees}. Returning None.")
return None
logger.debug(
f"Calculated meters/pixel at lat {latitude_degrees:.4f}, zoom {zoom_level}, "
f"tile_size {tile_pixel_size}px: {resolution_m_px:.4f} m/px"
)
return resolution_m_px
except Exception as e_mpp_calc:
logger.exception(f"Error calculating meters per pixel: {e_mpp_calc}")
return None
# MODIFIED: Added function to calculate geographic size of a bounding box.
# WHY: Needed to report the displayed map area size in the GUI.
# HOW: Implemented logic using pyproj to calculate distances for width and height.
def calculate_geographic_bbox_size_km(
bounding_box_deg: Tuple[float, float, float, float] # (west, south, east, north)
) -> Optional[Tuple[float, float]]: # Returns (approx_width_km, approx_height_km)
"""
Calculates the approximate geographic width and height of a bounding box in kilometers.
Uses pyproj if available. Width is calculated along the center latitude, height along center longitude.
Args:
bounding_box_deg: A tuple (west_lon, south_lat, east_lon, north_lat) in degrees.
Returns:
A tuple (approx_width_km, approx_height_km), or None if calculation fails or pyproj is unavailable.
"""
if not PYPROJ_AVAILABLE:
logger.error(
"'pyproj' library is required for geographic size calculation but is not found."
)
return None
west_lon, south_lat, east_lon, north_lat = bounding_box_deg
# Basic validation
if not (-90.0 <= south_lat <= north_lat <= 90.0):
logger.warning(f"Invalid latitude range for size calculation: {south_lat}, {north_lat}")
return None
# For longitude, check range is within a reasonable bound, but allow west > east for dateline crossing
# The geographic width should be calculated carefully to handle wrapping around the globe.
# The height calculation is simpler as latitude is bounded.
try:
geodetic_calculator = pyproj.Geod(ellps="WGS84") # type: ignore
# Calculate approximate width along the center latitude
# The 'inv' method from pyproj.Geod is suitable for this, it handles the antimeridian.
center_lat = (south_lat + north_lat) / 2.0
# Clamp center lat to avoid issues near poles for geod.inv
center_lat = max(-89.9, min(89.9, center_lat))
# geod.inv returns (forward_azimuth, backward_azimuth, distance)
_, _, width_meters = geodetic_calculator.inv(west_lon, center_lat, east_lon, center_lat)
# Calculate approximate height along the center longitude
# This is simpler, distance between south_lat and north_lat at center_lon
# Need to handle potential longitude wrap around - use inv carefully
# The straight line distance calculation below should be generally fine for height.
center_lon = (west_lon + east_lon) / 2.0 # Use the average longitude for height calculation line
# Clamp center lon
center_lon = max(-179.9, min(179.9, center_lon))
_, _, height_meters = geodetic_calculator.inv(center_lon, south_lat, center_lon, north_lat)
approx_width_km = abs(width_meters) / 1000.0 # Ensure positive distance
approx_height_km = abs(height_meters) / 1000.0
# Add a sanity check: if width or height are zero, something is wrong (e.g., points are identical)
if approx_width_km <= 0 or approx_height_km <= 0:
logger.warning(f"Calculated non-positive width or height for BBox {bounding_box_deg}. Result: ({approx_width_km:.2f}, {approx_height_km:.2f}). Returning None.")
return None
logger.debug(
f"Calculated BBox size for {bounding_box_deg}: "
f"Approx. {approx_width_km:.2f}km W x {approx_height_km:.2f}km H"
)
return (approx_width_km, approx_height_km)
except Exception as e_size_calc:
logger.exception(f"Error calculating geographic bounding box size: {e_size_calc}")
return None
# MODIFIED: Added function to calculate the geographic bounds of an HGT tile from its integer coordinates.
# WHY: Needed to get the exact bounds of the DEM tile to determine the map fetch area and potentially draw the boundary.
# HOW: Based on HGT tile naming conventions (e.g., N45E007 covers 7E-8E, 45N-46N), the integer coordinates are the southwest corner.
def get_hgt_tile_geographic_bounds(lat_coord: int, lon_coord: int) -> Tuple[float, float, float, float]:
"""
Calculates the precise geographic bounding box (W, S, E, N) for an HGT tile
based on its integer latitude and longitude coordinates.
Assumes standard HGT 1x1 degree tile coverage where lat_coord is the
southern boundary latitude and lon_coord is the western boundary longitude.
E.g., tile N45E007 (lat_coord=45, lon_coord=7) covers 7E-8E, 45N-46N.
Args:
lat_coord: The integer latitude coordinate of the tile (e.g., 45 for N45).
lon_coord: The integer longitude coordinate of the tile (e.g., 7 for E007).
Returns:
A tuple (west_lon, south_lat, east_lon, north_lat) in degrees.
"""
west_lon = float(lon_coord)
south_lat = float(lat_coord)
east_lon = float(lon_coord + 1)
north_lat = float(lat_coord + 1)
# Clamping to strict WGS84 bounds for sanity, though tile coords should respect this
west_lon = max(-180.0, min(180.0, west_lon))
south_lat = max(-90.0, min(90.0, south_lat))
east_lon = max(-180.0, min(180.0, east_lon))
north_lat = max(-90.0, min(90.0, north_lat))
logger.debug(f"Calculated HGT tile bounds for ({lat_coord},{lon_coord}): ({west_lon:.6f}, {south_lat:.6f}, {east_lon:.6f}, {north_lat:.6f})")
return (west_lon, south_lat, east_lon, north_lat)
# MODIFIED: Added function to calculate required zoom level for a target geographic size to fit in a target pixel size.
# WHY: To determine the appropriate zoom level for displaying the 1x1 degree DEM tile area within a manageable pixel size.
# HOW: Used the inverse of the calculate_meters_per_pixel formula to solve for the zoom level.
def calculate_zoom_level_for_geographic_size(
latitude_degrees: float,
geographic_height_meters: float, # The height of the area you want to fit in pixels
target_pixel_height: int, # The desired pixel height for that geographic area
tile_pixel_size: int = 256
) -> Optional[int]:
"""
Calculates the approximate Web Mercator zoom level required for a given
geographic height (in meters at a specific latitude) to span a target pixel height.
This is useful for determining the zoom needed to fit a known geographic area
(like a DEM tile's height) into a certain number of pixels on a map composed of tiles.
Args:
latitude_degrees: The latitude at which the geographic_height_meters is measured.
geographic_height_meters: The actual height of the geographic area in meters at that latitude.
target_pixel_height: The desired height in pixels for that geographic area on the map.
tile_pixel_size: The size of one side of a map tile in pixels (usually 256).
Returns:
The approximate integer zoom level, or None if calculation fails or inputs are invalid.
"""
if not (-90.0 <= latitude_degrees <= 90.0):
logger.warning(f"Invalid latitude for zoom calculation: {latitude_degrees}")
return None
if not (isinstance(geographic_height_meters, (int, float)) and geographic_height_meters > 0):
logger.warning(f"Invalid geographic_height_meters for zoom calculation: {geographic_height_meters}. Must be positive.")
return None
if not (isinstance(target_pixel_height, int) and target_pixel_height > 0):
logger.warning(f"Invalid target_pixel_height for zoom calculation: {target_pixel_height}. Must be positive integer.")
return None
if not (isinstance(tile_pixel_size, int) and tile_pixel_size > 0):
logger.warning(f"Invalid tile_pixel_size for zoom calculation: {tile_pixel_size}. Must be positive integer.")
return None
try:
# Earth's equatorial circumference in meters (WGS84)
EARTH_CIRCUMFERENCE_METERS = 40075016.686
# Calculate the required meters per pixel (resolution) to fit the geographic height into the target pixel height
required_resolution_m_px = geographic_height_meters / target_pixel_height
# Avoid division by zero or non-finite results
if required_resolution_m_px <= 0 or not math.isfinite(required_resolution_m_px):
logger.warning(f"Calculated non-positive or non-finite required resolution ({required_resolution_m_px} m/px). Cannot calculate zoom.")
return None
# Use the inverse of the resolution formula: Resolution = (Circumference * cos(latitude_rad)) / (tile_size * 2^z)
# Rearranging for 2^z: 2^z = (Circumference * cos(latitude_rad)) / (tile_size * Resolution)
# Solving for z: z = log2( (Circumference * cos(latitude_rad)) / (tile_size * Resolution) )
latitude_radians = math.radians(latitude_degrees)
cos_lat = math.cos(latitude_radians)
# Avoid division by zero or log of zero/negative if cos_lat is near zero (at poles)
# The latitude validation should prevent hitting exactly 90/-90, but check for very small values.
if abs(cos_lat) < 1e-9: # Handle near poles
logger.warning(f"Latitude {latitude_degrees} is too close to a pole for reliable zoom calculation.")
# Return a very low zoom level as a fallback? Or None?
# Given the context (mapping a DEM tile), this likely won't happen as DEMs stop at 60 deg.
return None # Return None for latitudes very close to poles
term_for_log = (EARTH_CIRCUMFERENCE_METERS * cos_lat) / (tile_pixel_size * required_resolution_m_px)
# Ensure the argument for log2 is positive
if term_for_log <= 0:
logger.warning(f"Calculated non-positive term for log2 ({term_for_log}) during zoom calculation. Cannot calculate zoom.")
return None
# Calculate the precise zoom level (can be fractional)
precise_zoom = math.log2(term_for_log)
# We need an integer zoom level for tile fetching. Rounding to the nearest integer is common.
# Floor might get fewer tiles than needed, ceil might get more. Rounding is a good balance.
integer_zoom = int(round(precise_zoom))
# Clamp the calculated zoom level to a reasonable range (e.g., 0 to 20)
# A zoom level too high (e.g., > 22) might not be supported by map services.
# A level too low (negative) indicates an issue or a request for an impossibly large area.
# We should probably cap it to the map service's max zoom as well, but that info isn't available here.
# Let's clamp it to a general reasonable range.
clamped_zoom = max(0, min(integer_zoom, 20)) # Max zoom of 20 is usually safe for OSM
logger.debug(
f"Calculated zoom for {geographic_height_meters:.2f}m at Lat {latitude_degrees:.4f} "
f"to fit in {target_pixel_height}px: Precise Zoom {precise_zoom:.2f}, Clamped Integer Zoom {clamped_zoom}"
)
return clamped_zoom
except Exception as e_zoom_calc:
logger.exception(f"Error calculating zoom level: {e_zoom_calc}")
return None