SXXXXXXX_GeoElevation/geoelevation/elevation_gui.py
2025-05-13 08:26:12 +02:00

458 lines
23 KiB
Python

# elevation_gui.py
import tkinter as tk
from tkinter import ttk, messagebox
import logging
import math
import multiprocessing
import threading
import os
from typing import Optional, Tuple, List, Dict, TYPE_CHECKING
# Use ABSOLUTE imports (assuming 'geoelevation' is the top-level package name)
from geoelevation.elevation_manager import ElevationManager, RASTERIO_AVAILABLE
from geoelevation.image_processor import (
load_prepare_single_browse,
create_composite_area_image,
PIL_AVAILABLE,
)
from geoelevation.visualizer import (
show_image_matplotlib,
show_3d_matplotlib,
MATPLOTLIB_AVAILABLE,
SCIPY_AVAILABLE # Import check for SciPy (needed for smoothing/interpolation)
)
# Type hint for numpy array without direct import if possible
if TYPE_CHECKING:
import numpy as np_typing
# Configure the root logger at import time: INFO and above, timestamped records.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Tile directory handed to ElevationManager when the app creates its own manager.
DEFAULT_CACHE_DIR = "elevation_cache"
# === MULTIPROCESSING TARGET FUNCTIONS ===
def process_target_show_image(image_path: str, tile_name: str, window_title: str):
    """Multiprocessing target: prepare a single browse image and display it.

    All project imports are performed inside the function body.  Any failure
    (including a failed import) is reported on stdout together with a
    traceback; nothing is raised to the caller and nothing is returned.

    Args:
        image_path: Path of the browse image to load.
        tile_name: Tile name passed to ``load_prepare_single_browse``.
        window_title: Title passed to ``show_image_matplotlib``.
    """
    try:
        from geoelevation.image_processor import load_prepare_single_browse, PIL_AVAILABLE
        from geoelevation.visualizer import show_image_matplotlib, MATPLOTLIB_AVAILABLE
        import os

        # Both Pillow and Matplotlib are required to show anything.
        if not (PIL_AVAILABLE and MATPLOTLIB_AVAILABLE):
            print("PROCESS ERROR (show_image): Pillow/Matplotlib not available in child.")
            return

        browse_image = load_prepare_single_browse(image_path, tile_name)
        if not browse_image:
            print(f"PROCESS ERROR (show_image): Could not prepare {os.path.basename(image_path)}")
            return

        print(f"PROCESS (show_image): Showing '{window_title}'")
        show_image_matplotlib(browse_image, window_title)
    except Exception as exc:
        import traceback

        print(f"PROCESS ERROR in process_target_show_image: {exc}")
        traceback.print_exc()
def process_target_show_3d(
    hgt_data: Optional["np_typing.ndarray"],
    plot_title: str,
    initial_subsample: int,
    smooth_sigma: Optional[float] = None,
    interpolation_factor: int = 1,
    plot_grid_points: int = 500
):
    """Multiprocessing target: show a 3D plot, optionally smoothed/interpolated.

    The visualizer is imported inside the function body.  Any failure
    (including a failed import) is printed to stdout with a traceback;
    nothing is raised to the caller and nothing is returned.

    Args:
        hgt_data: Elevation array to plot; ``None`` is reported as an error.
        plot_title: Title forwarded to ``show_3d_matplotlib``.
        initial_subsample: Forwarded unchanged to ``show_3d_matplotlib``.
        smooth_sigma: Smoothing parameter forwarded to the visualizer, or
            ``None`` for no smoothing.  Reset to ``None`` when SciPy is not
            available.
        interpolation_factor: Interpolation factor forwarded to the
            visualizer.  Reset to ``1`` when SciPy is not available.
        plot_grid_points: Forwarded unchanged to ``show_3d_matplotlib``.
    """
    try:
        from geoelevation.visualizer import show_3d_matplotlib, MATPLOTLIB_AVAILABLE, SCIPY_AVAILABLE

        if not MATPLOTLIB_AVAILABLE:
            print("PROCESS ERROR (show_3d): Matplotlib not available in child.")
            return

        # Smoothing and interpolation need SciPy; fall back to the raw grid
        # rather than failing when it is missing.
        needs_scipy = interpolation_factor > 1 or smooth_sigma is not None
        if needs_scipy and not SCIPY_AVAILABLE:
            print("PROCESS WARNING (show_3d): SciPy not available in child. Disabling smoothing/interpolation.")
            smooth_sigma = None
            interpolation_factor = 1

        if hgt_data is None:
            print("PROCESS ERROR (show_3d): No HGT data received.")
            return

        print(f"PROCESS (show_3d): Plotting '{plot_title}' (InitialSub:{initial_subsample}, Smooth:{smooth_sigma}, Interp:{interpolation_factor}x, PlotGridTarget:{plot_grid_points})")
        show_3d_matplotlib(
            hgt_data,
            plot_title,
            initial_subsample=initial_subsample,
            smooth_sigma=smooth_sigma,
            interpolation_factor=interpolation_factor,
            plot_grid_points=plot_grid_points
        )
    except Exception as e:
        import traceback

        print(f"PROCESS ERROR in process_target_show_3d: {e}")
        traceback.print_exc()
def process_target_create_show_area(tile_info_list: List[Dict], window_title: str):
    """Multiprocessing target: build the composite area image and display it.

    Project imports are performed inside the function body.  Any failure
    (including a failed import) is printed to stdout with a traceback;
    nothing is raised to the caller and nothing is returned.

    Args:
        tile_info_list: Tile info dictionaries passed to
            ``create_composite_area_image``.
        window_title: Title passed to ``show_image_matplotlib``.
    """
    try:
        from geoelevation.image_processor import create_composite_area_image, PIL_AVAILABLE
        from geoelevation.visualizer import show_image_matplotlib, MATPLOTLIB_AVAILABLE

        # Both Pillow and Matplotlib are required to show anything.
        if not (PIL_AVAILABLE and MATPLOTLIB_AVAILABLE):
            print("PROCESS ERROR (show_area): Pillow/Matplotlib not available in child.")
            return

        print("PROCESS (show_area): Creating composite image...")
        mosaic = create_composite_area_image(tile_info_list)
        if not mosaic:
            print("PROCESS ERROR (show_area): Failed to create composite image.")
            return

        print(f"PROCESS (show_area): Showing '{window_title}'")
        show_image_matplotlib(mosaic, window_title)
    except Exception as exc:
        import traceback

        print(f"PROCESS ERROR in process_target_create_show_area: {exc}")
        traceback.print_exc()
# === END MULTIPROCESSING TARGET FUNCTIONS ===
# --- Import Version Info FOR THE WRAPPER ITSELF ---
try:
    # Use absolute import based on package name
    from geoelevation import _version as wrapper_version

    WRAPPER_APP_VERSION_STRING = (
        f"{wrapper_version.__version__} "
        f"({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})"
    )
    WRAPPER_BUILD_INFO = f"Wrapper Built: {wrapper_version.BUILD_TIMESTAMP}"
except (ImportError, AttributeError, TypeError):
    # ImportError: running from source without a generated _version.py.
    # AttributeError / TypeError: a _version module that is present but
    # incomplete (missing field, or a commit hash that cannot be sliced).
    # In every case fall back to placeholders instead of breaking the GUI import.
    WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"
    WRAPPER_BUILD_INFO = "Wrapper build time unknown"
# --- End Import Version Info ---

# --- Constants for Version Generation ---
# Placeholder values; not referenced elsewhere in this module.
DEFAULT_VERSION = "0.0.0+unknown"
DEFAULT_COMMIT = "Unknown"
DEFAULT_BRANCH = "Unknown"
# --- End Constants ---
class ElevationApp:
    """Main application class for the Elevation Tool GUI.

    The window has two sections:

    * a point section: look up the elevation of a latitude/longitude pair and,
      once a lookup succeeded, show the tile's browse image (2D) or DEM (3D);
    * an area section: pre-download the tiles of a bounding box and show a
      composite of their browse images.

    Plot windows are started in separate daemon processes.  Area downloads run
    in a daemon thread that hands its result back to the Tk loop through
    ``root.after``.
    """

    def __init__(
        self,
        parent_widget: "tk.Tk",
        elevation_manager: "Optional[ElevationManager]" = None,
    ):
        """Create the widgets and attach an elevation manager.

        Args:
            parent_widget: Tk root window hosting the application.
            elevation_manager: Manager to use.  When ``None`` a new
                ``ElevationManager`` is created on ``DEFAULT_CACHE_DIR``; if
                that raises, an error dialog is shown, ``self.manager`` is set
                to ``None`` and the lookup/download buttons are disabled.
        """
        self.root = parent_widget
        if elevation_manager is None:
            try:
                if not RASTERIO_AVAILABLE:
                    logging.warning("Rasterio not available. Elevation functions limited.")
                self.manager = ElevationManager(tile_directory=DEFAULT_CACHE_DIR)
            except Exception as e:
                logging.critical(f"Failed to initialize ElevationManager: {e}", exc_info=True)
                messagebox.showerror("Init Error", f"Could not start Elevation Manager:\n{e}", parent=self.root)
                self.manager = None
        else:
            self.manager = elevation_manager

        self.root.title(f"Elevation Tool - {WRAPPER_APP_VERSION_STRING}")
        self.root.minsize(450, 350)

        # Coordinates of the last successful point lookup / last requested area.
        self.last_valid_point_coords: Optional[Tuple[float, float]] = None
        self.last_area_coords: Optional[Tuple[float, float, float, float]] = None
        # True while a lookup or download is running; blocks re-entrant requests.
        self.is_processing: bool = False

        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)

        self._build_point_section(main_frame)
        self._build_area_section(main_frame)

        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(0, weight=0)
        main_frame.rowconfigure(1, weight=0)

        if self.manager is None:
            # Without a manager nothing can be looked up or downloaded.
            self.get_elevation_button.config(state=tk.DISABLED)
            self.download_area_button.config(state=tk.DISABLED)
            self.result_label.config(text="Result: Manager Init Failed.")
            self.download_status_label.config(text="Status: Manager Failed.")

    def _build_point_section(self, main_frame):
        """Create the "Get Elevation for Point" widgets inside ``main_frame``."""
        point_frame = ttk.LabelFrame(main_frame, text="Get Elevation for Point", padding="10")
        point_frame.grid(row=0, column=0, padx=5, pady=5, sticky=(tk.W, tk.E))
        point_frame.columnconfigure(1, weight=1)

        ttk.Label(point_frame, text="Latitude:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=3)
        self.lat_entry = ttk.Entry(point_frame, width=15)
        self.lat_entry.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=5, pady=3)
        self.lat_entry.insert(0, "45.0")

        ttk.Label(point_frame, text="Longitude:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=3)
        self.lon_entry = ttk.Entry(point_frame, width=15)
        self.lon_entry.grid(row=1, column=1, sticky=(tk.W, tk.E), padx=5, pady=3)
        self.lon_entry.insert(0, "7.0")

        self.get_elevation_button = ttk.Button(point_frame, text="Get Elevation", command=self.run_get_elevation)
        self.get_elevation_button.grid(row=2, column=0, columnspan=2, pady=5, sticky=(tk.W, tk.E))

        self.result_label = ttk.Label(point_frame, text="Result: ", wraplength=400, justify=tk.LEFT)
        self.result_label.grid(row=3, column=0, columnspan=2, sticky=tk.W, pady=5)

        action_frame = ttk.Frame(point_frame)
        action_frame.grid(row=4, column=0, columnspan=2, pady=(5, 0), sticky=(tk.W, tk.E))
        action_frame.columnconfigure(0, weight=1)
        action_frame.columnconfigure(1, weight=1)

        # Display buttons start disabled; run_get_elevation enables them after
        # a successful lookup, provided the required libraries are available.
        self.show_2d_button = ttk.Button(action_frame, text="Show Browse Image (2D)", command=self.trigger_2d_display, state=tk.DISABLED)
        self.show_2d_button.grid(row=0, column=0, padx=2, sticky=(tk.W, tk.E))
        if not MATPLOTLIB_AVAILABLE or not PIL_AVAILABLE:
            self.show_2d_button.config(state=tk.DISABLED, text="Show Browse (Libs N/A)")

        self.show_3d_button = ttk.Button(action_frame, text="Show DEM Tile (3D)", command=self.trigger_3d_display, state=tk.DISABLED)
        self.show_3d_button.grid(row=0, column=1, padx=2, sticky=(tk.W, tk.E))
        if not MATPLOTLIB_AVAILABLE:
            self.show_3d_button.config(state=tk.DISABLED, text="Show DEM Tile (Matplotlib N/A)")

    def _build_area_section(self, main_frame):
        """Create the "Pre-Download Tiles for Area" widgets inside ``main_frame``."""
        area_frame = ttk.LabelFrame(main_frame, text="Pre-Download Tiles for Area", padding="10")
        area_frame.grid(row=1, column=0, padx=5, pady=5, sticky=(tk.W, tk.E))
        area_frame.columnconfigure(1, weight=1)
        area_frame.columnconfigure(3, weight=1)

        ttk.Label(area_frame, text="Min Lat:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=3)
        self.min_lat_entry = ttk.Entry(area_frame, width=10)
        self.min_lat_entry.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=5, pady=3)
        self.min_lat_entry.insert(0, "44.0")

        ttk.Label(area_frame, text="Max Lat:").grid(row=0, column=2, sticky=tk.W, padx=(10, 5), pady=3)
        self.max_lat_entry = ttk.Entry(area_frame, width=10)
        self.max_lat_entry.grid(row=0, column=3, sticky=(tk.W, tk.E), padx=5, pady=3)
        self.max_lat_entry.insert(0, "45.0")

        ttk.Label(area_frame, text="Min Lon:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=3)
        self.min_lon_entry = ttk.Entry(area_frame, width=10)
        self.min_lon_entry.grid(row=1, column=1, sticky=(tk.W, tk.E), padx=5, pady=3)
        self.min_lon_entry.insert(0, "7.0")

        ttk.Label(area_frame, text="Max Lon:").grid(row=1, column=2, sticky=tk.W, padx=(10, 5), pady=3)
        self.max_lon_entry = ttk.Entry(area_frame, width=10)
        self.max_lon_entry.grid(row=1, column=3, sticky=(tk.W, tk.E), padx=5, pady=3)
        self.max_lon_entry.insert(0, "8.0")

        self.download_area_button = ttk.Button(area_frame, text="Download Area Tiles", command=self.run_download_area)
        self.download_area_button.grid(row=2, column=0, columnspan=4, pady=10, sticky=(tk.W, tk.E))

        # Enabled by _area_download_complete_ui after a successful download.
        self.show_area_button = ttk.Button(area_frame, text="Show Area Browse Images (2D)", command=self.trigger_area_display, state=tk.DISABLED)
        self.show_area_button.grid(row=3, column=0, columnspan=4, pady=5, sticky=(tk.W, tk.E))
        if not MATPLOTLIB_AVAILABLE or not PIL_AVAILABLE:
            self.show_area_button.config(state=tk.DISABLED, text="Show Area (Libs N/A)")

        self.download_status_label = ttk.Label(area_frame, text="Status: Idle", wraplength=400, justify=tk.LEFT)
        self.download_status_label.grid(row=4, column=0, columnspan=4, sticky=tk.W, pady=5)

    def _set_busy(self, busy: bool):
        """Record the busy flag and toggle the lookup/download buttons.

        The buttons are left untouched when no manager is available, so they
        stay disabled as set in ``__init__``.
        """
        self.is_processing = busy
        state = tk.DISABLED if busy else tk.NORMAL
        if self.manager is not None:
            self.get_elevation_button.config(state=state)
            self.download_area_button.config(state=state)

    @staticmethod
    def _parse_coordinate(text: str, label: str, low: float, high: float) -> float:
        """Parse one coordinate string and check it against ``[low, high)``.

        Surrounding whitespace is removed before the emptiness check, so a
        whitespace-only entry is reported as empty rather than as an
        unparsable float.

        Raises:
            ValueError: If the text is empty, not a float, or out of range.
        """
        cleaned = text.strip() if text else ""
        if not cleaned:
            raise ValueError(f"{label} empty.")
        value = float(cleaned)
        if not (low <= value < high):
            raise ValueError(f"{label} out of range [{low}, {high}).")
        return value

    def _validate_coordinates(self, lat_str: str, lon_str: str) -> Optional[Tuple[float, float]]:
        """Parse and range-check a latitude/longitude pair.

        Returns:
            ``(lat, lon)`` as floats, or ``None`` after logging the problem and
            showing an error dialog.
        """
        try:
            lat = self._parse_coordinate(lat_str, "Latitude", -90, 90)
            lon = self._parse_coordinate(lon_str, "Longitude", -180, 180)
            return lat, lon
        except ValueError as e:
            logging.error(f"Invalid coordinate: {e}")
            messagebox.showerror("Input Error", f"Invalid coordinate:\n{e}", parent=self.root)
            return None

    def _validate_area_bounds(self) -> Optional[Tuple[float, float, float, float]]:
        """Read the four area entries and validate them as a bounding box.

        Returns:
            ``(min_lat, min_lon, max_lat, max_lon)``, or ``None`` after logging
            the problem and showing an error dialog.
        """
        try:
            min_l_s, max_l_s = self.min_lat_entry.get().strip(), self.max_lat_entry.get().strip()
            min_o_s, max_o_s = self.min_lon_entry.get().strip(), self.max_lon_entry.get().strip()
            if not all([min_l_s, max_l_s, min_o_s, max_o_s]):
                raise ValueError("All bounds must be filled.")
            min_l, max_l = float(min_l_s), float(max_l_s)
            min_o, max_o = float(min_o_s), float(max_o_s)
            if not (-90 <= min_l < 90 and -90 <= max_l < 90 and -180 <= min_o < 180 and -180 <= max_o < 180):
                raise ValueError("Coordinates out of valid range.")
            if min_l >= max_l:
                raise ValueError("Min Lat >= Max Lat.")
            if min_o >= max_o:
                raise ValueError("Min Lon >= Max Lon.")
            return min_l, min_o, max_l, max_o
        except ValueError as e:
            logging.error(f"Invalid area: {e}")
            messagebox.showerror("Input Error", f"Invalid area:\n{e}", parent=self.root)
            return None

    def run_get_elevation(self):
        """Button handler: look up the elevation of the entered point.

        The manager call runs synchronously in the Tk thread.  On a usable
        result (including a NoData point) the coordinates are remembered and
        the 2D/3D display buttons are enabled when their libraries are present.
        """
        if self.is_processing or self.manager is None:
            return
        coords = self._validate_coordinates(self.lat_entry.get(), self.lon_entry.get())
        if not coords:
            return
        lat, lon = coords
        self._set_busy(True)
        self.result_label.config(text="Result: Requesting...")
        self.show_2d_button.config(state=tk.DISABLED)
        self.show_3d_button.config(state=tk.DISABLED)
        self.last_valid_point_coords = None
        # Flush the pending redraw so "Requesting..." is visible during the call.
        self.root.update_idletasks()
        try:
            elevation = self.manager.get_elevation(lat, lon)
            if elevation is None:
                res_text = "Result: Elevation data unavailable."
                messagebox.showwarning("Info", "Could not retrieve elevation.", parent=self.root)
            elif math.isnan(elevation):
                # NaN means the tile exists but has no value at this point; the
                # tile itself can still be displayed.
                res_text = "Result: Point on NoData area."
                self.last_valid_point_coords = (lat, lon)
            else:
                res_text = f"Result: Elevation {elevation:.2f}m"
                self.last_valid_point_coords = (lat, lon)
            self.result_label.config(text=res_text)
            if self.last_valid_point_coords:
                if MATPLOTLIB_AVAILABLE and PIL_AVAILABLE:
                    self.show_2d_button.config(state=tk.NORMAL)
                if MATPLOTLIB_AVAILABLE:
                    self.show_3d_button.config(state=tk.NORMAL)
        except Exception as e:
            logging.exception("GUI Error: get_elevation")
            messagebox.showerror("Error", f"Unexpected error:\n{e}", parent=self.root)
            self.result_label.config(text="Result: Error.")
        finally:
            self._set_busy(False)

    def run_download_area(self):
        """Button handler: validate the bounds and start the download thread."""
        if self.is_processing or self.manager is None:
            return
        bounds = self._validate_area_bounds()
        if not bounds:
            return
        self.last_area_coords = bounds
        self._set_busy(True)
        self.download_status_label.config(text="Status: Starting...")
        self.show_area_button.config(state=tk.DISABLED)
        self.root.update_idletasks()
        # bounds is (min_lat, min_lon, max_lat, max_lon), matching the task signature.
        thread = threading.Thread(target=self._perform_area_download_task, args=bounds, daemon=True)
        thread.start()

    def _perform_area_download_task(self, min_lat, min_lon, max_lat, max_lon):
        """Worker-thread body: download the area and report back to the UI.

        Widget updates are scheduled with ``root.after`` instead of being made
        directly from this thread.  The completion callback is always
        scheduled, whether the download succeeded or raised.
        """
        status, success, p_count, o_count = "Status: Unknown error.", False, 0, 0
        try:
            self.root.after(0, lambda: self.download_status_label.config(text="Status: Downloading..."))
            if self.manager:
                p_count, o_count = self.manager.download_area(min_lat, min_lon, max_lat, max_lon)
                status = f"Status: Complete. Processed {p_count}, Obtained {o_count} HGT."
                success = True
            else:
                status = "Status: Error - Manager N/A."
        except Exception as e:
            logging.exception("GUI Error: area download task")
            status = f"Status: Error: {type(e).__name__}"
        finally:
            self.root.after(0, self._area_download_complete_ui, status, success, p_count, o_count)

    def _area_download_complete_ui(self, status_msg, success, processed, obtained):
        """Tk-thread callback: show the download outcome and re-enable the UI.

        Args:
            status_msg: Text for the status label.
            success: Whether the download finished without error.
            processed: Number of tiles processed, as reported by the manager.
            obtained: Number of HGT files obtained, as reported by the manager.
        """
        self.download_status_label.config(text=status_msg)
        self._set_busy(False)
        if success:
            summary = f"Processed {processed} tiles.\nObtained {obtained} HGT files."
            messagebox.showinfo("Download Complete", summary, parent=self.root)
            if MATPLOTLIB_AVAILABLE and PIL_AVAILABLE:
                self.show_area_button.config(state=tk.NORMAL)
        else:
            # Keep only the text after the last colon of the status message.
            err = status_msg.split(":")[-1].strip()
            messagebox.showerror("Download Error", f"Area download failed: {err}\nCheck logs.", parent=self.root)
            self.show_area_button.config(state=tk.DISABLED)

    def trigger_2d_display(self):
        """Button handler: show the browse image of the last looked-up tile.

        The image window is opened by ``process_target_show_image`` in a
        separate daemon process.
        """
        if not (MATPLOTLIB_AVAILABLE and PIL_AVAILABLE):
            messagebox.showwarning("Deps Error", "Matplotlib/PIL N/A.", parent=self.root)
            return
        if not self.last_valid_point_coords:
            messagebox.showinfo("Info", "Get elevation first.", parent=self.root)
            return
        if self.manager is None:
            return
        lat, lon = self.last_valid_point_coords
        tile_info = self.manager.get_tile_info(lat, lon)
        if tile_info and tile_info.get("browse_available") and tile_info.get("browse_image_path"):
            img_path = tile_info["browse_image_path"]
            t_name = tile_info.get("tile_base_name", "Unknown")
            win_title = f"Browse: {t_name.upper()}"
            try:
                proc = multiprocessing.Process(target=process_target_show_image, args=(img_path, t_name, win_title), daemon=True)
                proc.start()
            except Exception as e:
                logging.exception("GUI Error: start 2D process")
                messagebox.showerror("Process Error", f"Could not start 2D display:\n{e}", parent=self.root)
        else:
            messagebox.showinfo("Image Info", "Browse image N/A.", parent=self.root)

    def trigger_3d_display(self):
        """Button handler: show a 3D plot of the last looked-up tile.

        The HGT data is fetched here and passed to ``process_target_show_3d``,
        which runs in a separate daemon process.  Smoothing and interpolation
        are switched off when SciPy is unavailable.
        """
        if not MATPLOTLIB_AVAILABLE:
            messagebox.showwarning("Deps Error", "Matplotlib N/A.", parent=self.root)
            return
        if not self.last_valid_point_coords:
            messagebox.showinfo("Info", "Get elevation first.", parent=self.root)
            return
        if self.manager is None:
            return
        lat, lon = self.last_valid_point_coords
        hgt_data = self.manager.get_hgt_data(lat, lon)
        if hgt_data is not None:
            tile_info = self.manager.get_tile_info(lat, lon)
            t_name = tile_info.get("tile_base_name", "Unknown").upper() if tile_info else "Unknown"
            p_title = f"3D View: Tile {t_name}"
            # --- Configuration for Plotting ---
            initial_subsample_val = 1
            smooth_sigma_val: Optional[float] = 0.5  # Light smoothing or None
            interpolation_factor_val: int = 2  # 2, 3, or 4 typically
            plot_grid_points_val: int = 150  # Target plot grid size handed to the visualizer
            if not SCIPY_AVAILABLE:
                if smooth_sigma_val is not None:
                    logging.warning("SciPy N/A. Disabling 3D smoothing.")
                    smooth_sigma_val = None
                if interpolation_factor_val > 1:
                    logging.warning("SciPy N/A. Disabling 3D interpolation.")
                    interpolation_factor_val = 1
            # --- End Configuration ---
            try:
                proc_kwargs = {
                    "initial_subsample": initial_subsample_val,
                    "smooth_sigma": smooth_sigma_val,
                    "interpolation_factor": interpolation_factor_val,
                    "plot_grid_points": plot_grid_points_val
                }
                proc = multiprocessing.Process(target=process_target_show_3d, args=(hgt_data, p_title), kwargs=proc_kwargs, daemon=True)
                proc.start()
                logging.info(f"Started 3D display process {proc.pid} (InitialSub:{initial_subsample_val}, Smooth:{smooth_sigma_val}, Interp:{interpolation_factor_val}x, PlotGrid:{plot_grid_points_val})")
            except Exception as e:
                logging.exception("GUI Error: start 3D process")
                messagebox.showerror("Process Error", f"Could not start 3D display:\n{e}", parent=self.root)
        else:
            messagebox.showerror("3D Data Error", "Could not retrieve HGT data.\nCheck logs.", parent=self.root)

    def trigger_area_display(self):
        """Button handler: show the composite browse image of the last area.

        The tile info list is gathered here and passed to
        ``process_target_create_show_area``, which runs in a separate daemon
        process.
        """
        if not (MATPLOTLIB_AVAILABLE and PIL_AVAILABLE):
            messagebox.showwarning("Deps Error", "Matplotlib/PIL N/A.", parent=self.root)
            return
        if not self.last_area_coords:
            messagebox.showinfo("Info", "Download area first.", parent=self.root)
            return
        if self.manager is None:
            return
        min_lat, min_lon, max_lat, max_lon = self.last_area_coords
        tile_info_list = self.manager.get_area_tile_info(min_lat, min_lon, max_lat, max_lon)
        if not tile_info_list:
            messagebox.showinfo("Area Info", "No tile info found.", parent=self.root)
            return
        win_title = f"Area: Lat [{min_lat:.1f}-{max_lat:.1f}], Lon [{min_lon:.1f}-{max_lon:.1f}]"
        try:
            proc = multiprocessing.Process(target=process_target_create_show_area, args=(tile_info_list, win_title), daemon=True)
            proc.start()
        except Exception as e:
            logging.exception("GUI Error: start area display process")
            messagebox.showerror("Process Error", f"Could not start area display:\n{e}", parent=self.root)
# --- Main Execution Block (for direct script testing) ---
if __name__ == "__main__":
    # freeze_support() has to run straight after the __main__ guard.  It is
    # called here because run_gui_application (which normally calls it) is not
    # used when this module is run directly.
    multiprocessing.freeze_support()

    print("Running elevation_gui.py directly for testing...")
    if not RASTERIO_AVAILABLE:
        print("WARNING (Test): Rasterio N/A.")
    if not PIL_AVAILABLE:
        print("WARNING (Test): Pillow N/A.")
    if not MATPLOTLIB_AVAILABLE:
        print("WARNING (Test): Matplotlib N/A.")
    if SCIPY_AVAILABLE:
        print("INFO (Test): SciPy available.")
    else:
        print("WARNING (Test): SciPy N/A (no smooth/interp).")

    root = tk.Tk()
    try:
        app = ElevationApp(root)
        root.mainloop()
    except Exception as e_main:
        logging.critical(f"Error in direct run of elevation_gui: {e_main}", exc_info=True)
        try:
            # Best effort: tell the user through a dialog on a hidden root.
            err_root = tk.Tk()
            err_root.withdraw()
            messagebox.showerror("Fatal Error (Test Run)", f"App failed:\n{e_main}")
            err_root.destroy()
        except Exception:
            # The dialog is optional; record why it could not be shown.
            logging.exception("Could not display the fatal error dialog.")
        # SystemExit rather than the site-provided exit() helper, which is
        # not guaranteed to exist in every run mode.
        raise SystemExit(1)