add boundary, images, point to google earth

This commit is contained in:
VALLONGOL 2025-04-28 12:39:47 +02:00
parent acd1965882
commit 8feef98b58
4 changed files with 447 additions and 197 deletions

3
.gitignore vendored
View File

@ -6,4 +6,5 @@ dumps/
__pycache__/
_build/
_dist/
sar_images/
sar_images/
kml_output/

View File

@ -71,13 +71,13 @@ from utils import (
clear_queue,
decimal_to_dms,
dms_string_to_decimal,
generate_sar_kml,
generate_sar_kml, # Rimane invariato
launch_google_earth,
cleanup_old_kml_files,
# cleanup_old_kml_files, # Rimuovere o commentare il vecchio import
cleanup_kml_output_directory, # <<< NUOVO IMPORT
open_google_maps,
generate_lookat_and_point_kml,
generate_multi_point_kml,
generate_lookat_and_point_kml,
generate_composite_kml, # Modificato precedentemente per usare questa
_simplekml_available,
_pyproj_available,
format_ctypes_structure,
@ -991,12 +991,15 @@ class ControlPanelApp:
self.set_status(f"Error launching GE for {source_desc}.")
def go_to_all_gearth(self):
"""Callback for 'GE All' button; opens Google Earth with multiple points."""
"""
Callback for 'GE All' button. Generates a composite KML including
points, SAR footprint, and SAR ground overlay, then launches Google Earth.
"""
log_prefix = "[App CB GE All]"
if not hasattr(self, "state") or self.state.shutting_down:
return
if not _simplekml_available:
logging.error(f"{log_prefix} Cannot proceed: simplekml missing.")
if not _simplekml_available or not _pyproj_available:
logging.error(f"{log_prefix} Cannot proceed: simplekml or pyproj missing.")
self.set_status("Error: KML library missing.")
return
@ -1006,21 +1009,15 @@ class ControlPanelApp:
return
points_to_plot: List[Tuple[float, float, str, Optional[str]]] = []
# Map internal source names to KML names and UI variables
# --- 1. Collect Point Data (same as before) ---
source_map = {
"SAR Center": ("SAR Center", control_panel_ref.sar_center_coords_var),
"SAR Mouse": ("Mouse on SAR", control_panel_ref.mouse_coords_var),
"Map Mouse": ("Mouse on Map", control_panel_ref.map_mouse_coords_var),
}
# Iterate through sources, parse coordinates, add valid points
for internal_name, (kml_name, tk_var) in source_map.items():
coords_text = tk_var.get()
lat_deg, lon_deg = None, None
logging.debug(
f"{log_prefix} Processing {internal_name} (KML: {kml_name}) - Text: '{coords_text}'"
)
if (
coords_text
and "N/A" not in coords_text
@ -1058,53 +1055,82 @@ class ControlPanelApp:
f"Source: {internal_name}\nCoords: {coords_text}",
)
)
logging.debug(
f"{log_prefix} Added valid point: {kml_name} ({lat_deg:.6f}, {lon_deg:.6f})"
)
else:
logging.warning(
f"{log_prefix} Could not parse coords for {internal_name} from '{coords_text}'"
)
except ValueError as ve:
logging.error(
f"{log_prefix} Parsing error for {internal_name}: {ve} (Text: '{coords_text}')"
)
except Exception as parse_err:
logging.error(
f"{log_prefix} Error parsing coords for {internal_name}: {parse_err}"
)
else:
logging.warning(
f"{log_prefix} Skipping invalid/unavailable coords for {internal_name}."
)
# Check if any valid points were found
if not points_to_plot:
self.set_status("Error: No valid coordinates available.")
logging.warning(f"{log_prefix} No valid points found.")
# --- 2. Get SAR Data and GeoInfo from AppState ---
sar_normalized_uint8 = self.state.current_sar_normalized
geo_info = self.state.current_sar_geo_info
if sar_normalized_uint8 is None or sar_normalized_uint8.size == 0:
logging.error(f"{log_prefix} No current SAR image data available.")
self.set_status("Error: SAR image data missing.")
return
if not geo_info or not geo_info.get("valid", False):
logging.error(f"{log_prefix} Invalid or missing SAR GeoInfo available.")
self.set_status("Error: SAR GeoInfo missing.")
return
# Generate Multi-Point KML
try:
logging.debug(
f"{log_prefix} Generating multi-point KML for {len(points_to_plot)} points..."
)
temp_kml_path = generate_multi_point_kml(points_to_plot)
# --- 3. Process SAR Image for Overlay (B/C, Palette) ---
# Retrieve current parameters from state
bc_lut = self.state.brightness_contrast_lut
palette = self.state.sar_palette
if temp_kml_path:
# Launch Google Earth
if bc_lut is None:
logging.error(f"{log_prefix} SAR B/C LUT is missing. Cannot process image.")
self.set_status("Error: SAR LUT missing.")
return
logging.debug(f"{log_prefix} Processing current SAR image for KML overlay...")
try:
# Start with the normalized uint8 image from state
img_for_kml = sar_normalized_uint8.copy()
# Apply B/C LUT
img_for_kml = cv2.LUT(img_for_kml, bc_lut)
# Apply Color Palette (if not GRAY)
if palette != "GRAY":
img_for_kml = apply_color_palette(img_for_kml, palette)
# Ensure BGR format even if GRAY palette was used
elif img_for_kml.ndim == 2:
img_for_kml = cv2.cvtColor(img_for_kml, cv2.COLOR_GRAY2BGR)
if img_for_kml is None:
raise ValueError("Image processing for KML resulted in None.")
logging.debug(f"{log_prefix} SAR image processed for KML (shape: {img_for_kml.shape}).")
except Exception as proc_err:
logging.exception(f"{log_prefix} Error processing SAR image for KML:")
self.set_status("Error processing SAR image.")
return
# --- 4. Generate Composite KML ---
try:
logging.debug(f"{log_prefix} Generating composite KML...")
# Pass the collected points, the *processed* SAR image, and geo_info
composite_kml_path = generate_composite_kml(
points_to_plot, img_for_kml, geo_info
)
if composite_kml_path:
# --- 5. Launch Google Earth ---
logging.debug(
f"{log_prefix} Launching GE with multi-point KML: {temp_kml_path}"
f"{log_prefix} Launching GE with composite KML: {composite_kml_path}"
)
launch_google_earth(temp_kml_path)
launch_google_earth(composite_kml_path)
self.set_status("Launched Google Earth with composite view.")
# Optionally: Clean up old KML files (including this one eventually)
cleanup_kml_output_directory(config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES)
else:
logging.error(f"{log_prefix} Failed to generate multi-point KML file.")
logging.error(f"{log_prefix} Failed to generate composite KML file.")
self.set_status("Error: Failed to create KML.")
except Exception as e:
logging.exception(
f"{log_prefix} Error preparing/launching GE for multiple points:"
f"{log_prefix} Error generating/launching composite KML:"
)
self.set_status("Error launching Google Earth.")
self.set_status("Error during GE All generation.")
def toggle_sar_metadata_display(self):
"""Callback for 'Show SAR Metadata' checkbox. Shows/Hides the metadata panel using grid."""
@ -1699,12 +1725,34 @@ class ControlPanelApp:
self._update_fps_stats("sar")
def _handle_kml_generation(self, geo_info):
"""Handles KML generation, cleanup, and optional launch."""
"""
Handles KML generation, cleanup, and optional launch.
Checks the ENABLE_AUTO_SAR_KML_GENERATION flag before proceeding.
"""
log_prefix = "[App KML]" # Local log prefix
# --- >>> START OF NEW CODE <<< ---
# Check the configuration flag first
if not config.ENABLE_AUTO_SAR_KML_GENERATION:
# Log only if the general KML generation is enabled but auto is disabled
if config.ENABLE_KML_GENERATION:
logging.debug(
f"{log_prefix} Automatic KML generation for SAR footprint is disabled via config flag. Skipping."
)
return # Exit if automatic generation is disabled
# --- >>> END OF NEW CODE <<< ---
# Check if KML generation is globally disabled
if not config.ENABLE_KML_GENERATION:
# This condition might be redundant if the caller already checks,
# but added for safety.
return
# Check if libraries needed for KML were loaded
if not _simplekml_available or not _pyproj_available:
# Log this only once maybe? Or check flag? For now, log each time.
logging.warning(
"[App KML] Skipping KML generation: simplekml or pyproj missing."
f"{log_prefix} Skipping KML generation: simplekml or pyproj missing."
)
return
try:
@ -1715,16 +1763,21 @@ class ControlPanelApp:
fn = f"sar_footprint_{ts}.kml"
fp = os.path.join(kml_dir, fn)
# Call utility function to generate KML
logging.debug(f"{log_prefix} Generating KML file: {fp}")
success = generate_sar_kml(geo_info, fp)
if success:
logging.debug(f"{log_prefix} KML generation successful.")
# Call utility function to clean up old KML files
cleanup_old_kml_files(config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES)
cleanup_kml_output_directory(
config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES
)
# Optionally launch Google Earth
if config.AUTO_LAUNCH_GOOGLE_EARTH:
launch_google_earth(fp) # Use utility function
# else: error logged by generate_sar_kml
else:
logging.error(f"{log_prefix} KML generation failed (utility returned False).")
except Exception as e:
logging.exception(f"[App KML] Error during KML handling: {e}")
logging.exception(f"{log_prefix} Error during KML handling: {e}")
def _process_mfd_update_on_main_thread(self):
"""Processes MFD updates in the main thread: pipeline, FPS."""

View File

@ -197,7 +197,10 @@ DEFAULT_MAX_SAR_RECORDINGS = (
)
# --- KML / Google Earth Integration Configuration ---
ENABLE_KML_GENERATION = True
ENABLE_KML_GENERATION = True # Set to False to completely disable KML features
ENABLE_AUTO_SAR_KML_GENERATION = (
True # Set to False to disable KML on every SAR packet
)
KML_OUTPUT_DIRECTORY = "kml_output"
AUTO_LAUNCH_GOOGLE_EARTH = False
MAX_KML_FILES = 30 # Max KMLs to keep (0 or less disables cleanup)

481
utils.py
View File

@ -24,40 +24,77 @@ import shutil
from pathlib import Path
import webbrowser
import urllib.parse
import re # For DMS parsing
from typing import Optional, Tuple, List, Any, Dict # Added List, Any, Dict
import re # For DMS parsing
from typing import Optional, Tuple, List, Any, Dict
import tempfile
import ctypes # Needed for format_ctypes_structure
import ctypes # Needed for format_ctypes_structure
# Third-party imports (ensure these are installed)
import numpy as np
# Import KML and GEO libraries, handling ImportError
# --- Availability Flags & Library Imports ---
# Define flags *before* use, handle ImportErrors
_simplekml_available = False
try:
import simplekml
_simplekml_available = True
except ImportError:
simplekml = None
_simplekml_available = False
logging.warning(
"[Utils KML] Library 'simplekml' not found. KML generation disabled. "
"[Utils Init] Library 'simplekml' not found. KML generation disabled. "
"(pip install simplekml)"
)
_pyproj_available = False
try:
import pyproj
_pyproj_available = True
except ImportError:
pyproj = None
_pyproj_available = False
# Pyproj is needed for accurate geo calculations, log warning if missing
logging.warning(
"[Utils Geo] Library 'pyproj' not found. "
"[Utils Init] Library 'pyproj' not found. "
"Some geometric calculations (KML corners, BBox) may be less accurate. "
"(pip install pyproj)"
)
_cv2_available = False
try:
import cv2
_cv2_available = True
except ImportError:
cv2 = None
logging.warning(
"[Utils Init] Library 'opencv-python' not found. "
"Cannot save images for KML overlays."
)
_lxml_available = False
try:
from lxml import etree
_lxml_available = True
except ImportError:
etree = None
logging.warning("[Utils Init] Library 'lxml' not found. Cannot use gx:LatLonQuad fallback.")
# --- Local application imports (after third-party and flag definitions) ---
import config # Import config after potential library warnings
_apply_color_palette_available = False
try:
# Import only after cv2 availability check might be safer, but fine here for now
from image_processing import apply_color_palette
_apply_color_palette_available = True
except ImportError:
apply_color_palette = None
logging.warning(
"[Utils Init] Function 'apply_color_palette' not found. "
"Color palettes in KML overlay might not work."
)
# --- Queue Management Functions ---
def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None):
"""
Safely puts an item into a queue using non-blocking put.
@ -78,28 +115,23 @@ def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None):
and hasattr(app_instance, "state")
and app_instance.state.shutting_down
):
# Silently discard if shutting down (or log debug -1 if needed)
# logging.log(logging.DEBUG - 1, f"{log_prefix} Discarding item for queue '{queue_name}': Shutdown.")
return
return # Silently discard if shutting down
try:
# Put item without blocking
queue_obj.put(item, block=False)
# Log successful put at very low debug level if needed
# logging.log(logging.DEBUG - 1, f"{log_prefix} Item put onto queue '{queue_name}'.")
except queue.Full:
# Log queue full warning (consider reducing level if too noisy)
# Log queue full warning
logging.warning(f"{log_prefix} Queue '{queue_name}' is full. Dropping item.")
# Increment the specific drop counter via the AppState instance
if app_instance and hasattr(app_instance, "state"):
try:
app_instance.state.increment_dropped_count(queue_name)
except Exception as e:
# Log error only if incrementing fails (shouldn't happen)
logging.error(
f"{log_prefix} Failed to increment drop count for '{queue_name}': {e}"
)
else:
# Log error if app instance/state is not available to increment count
logging.error(
f"{log_prefix} Cannot increment drop count for '{queue_name}': "
"App instance or state missing."
@ -115,10 +147,13 @@ def clear_queue(q: queue.Queue):
"""Removes all items currently in the specified queue."""
log_prefix = "[Utils Queue Clear]"
try:
q_size_before = q.qsize() # Get approximate size before clearing
q_size_before = q.qsize() # Get approximate size before clearing
# Ensure thread safety during clear operation
with q.mutex:
q.queue.clear()
# Reset task tracking if used
if hasattr(q, 'unfinished_tasks'):
q.unfinished_tasks = 0
logging.debug(
f"{log_prefix} Cleared queue (approx size before: {q_size_before})."
)
@ -127,6 +162,7 @@ def clear_queue(q: queue.Queue):
# --- Coordinate Formatting ---
def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
"""
Converts decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string.
@ -163,15 +199,15 @@ def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
)
return "Invalid Lat"
direction = "N" if decimal_degrees >= 0 else "S"
deg_pad = 2 # Padding for degrees (e.g., 01°, 89°)
else: # Longitude
deg_pad = 2 # Padding for degrees (e.g., 01°, 89°)
else: # Longitude
if not (-180.0 <= decimal_degrees <= 180.0):
logging.warning(
f"{log_prefix} Longitude {decimal_degrees} out of range."
)
return "Invalid Lon"
direction = "E" if decimal_degrees >= 0 else "W"
deg_pad = 3 # Padding for degrees (e.g., 001°, 179°)
deg_pad = 3 # Padding for degrees (e.g., 001°, 179°)
# Calculations
dd_abs = abs(decimal_degrees)
@ -188,7 +224,6 @@ def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
minutes = 0
degrees += 1
# Recalculate absolute degrees if degrees incremented past range limit
# (e.g., 89° 59' 59.999" N should become 90° 00' 00.00" N)
if is_latitude and degrees > 90:
degrees = 90
minutes = 0
@ -202,7 +237,6 @@ def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
dms_string = (
f"{degrees:0{deg_pad}d}° {minutes:02d}' {seconds:05.2f}\" {direction}"
)
# logging.debug(f"{log_prefix} Converted {decimal_degrees:.6f} -> {dms_string}")
return dms_string
except Exception as e:
logging.exception(
@ -229,12 +263,11 @@ def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]:
return None
# Regex to capture degrees, minutes, seconds, and direction
# Allows for variable spacing and ° ' " or ” symbols.
pattern = re.compile(
r"^\s*(\d{1,3})\s*[°]\s*" # Degrees (1-3 digits) + ° symbol
r"(\d{1,2})\s*[']\s*" # Minutes (1-2 digits) + ' symbol
r"([\d.]+)\s*[\"”]\s*" # Seconds (digits, dot) + " or ” symbol
r"([NSEWnsew])\s*$", # Direction (N,S,E,W case-insensitive)
r"^\s*(\d{1,3})\s*[°]\s*" # Degrees (1-3 digits) + ° symbol
r"(\d{1,2})\s*[']\s*" # Minutes (1-2 digits) + ' symbol
r"([\d.]+)\s*[\"”]\s*" # Seconds (digits, dot) + " or ” symbol
r"([NSEWnsew])\s*$", # Direction (N,S,E,W case-insensitive)
re.IGNORECASE,
)
match = pattern.match(dms_str.strip())
@ -280,7 +313,7 @@ def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]:
logging.warning(
f"{log_prefix} Calculated latitude {decimal_degrees:.7f} out of range."
)
return None # Treat out of range as error
return None # Treat out of range as error
if not is_latitude and not (-180.0 <= decimal_degrees <= 180.0):
logging.warning(
f"{log_prefix} Calculated longitude {decimal_degrees:.7f} out of range."
@ -304,6 +337,7 @@ def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]:
# --- Metadata Formatting Function ---
def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0) -> str:
"""
Generates a formatted string representation of a ctypes Structure,
@ -316,7 +350,7 @@ def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0)
Returns:
A multi-line string representing the structure's content.
"""
indent = " " * indent_level # Define indentation string
indent = " " * indent_level # Define indentation string
result = ""
# Check if it's actually a structure with _fields_ attribute
@ -348,17 +382,17 @@ def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0)
if issubclass(elem_type, (ctypes.c_byte, ctypes.c_ubyte)):
try:
# Attempt conversion to bytes for hex preview
byte_value = bytes(value[:16]) # Limit preview length
byte_value = bytes(value[:16]) # Limit preview length
preview = byte_value.hex().upper()
if len(value) > 16:
preview += "..."
formatted_value = f"<{elem_name}[{array_len}] Data: {preview}>"
except: # Fallback if conversion to bytes fails
except: # Fallback if conversion to bytes fails
formatted_value = f"<{elem_name}[{array_len}] (Preview N/A)>"
# Handle simple numeric arrays (show first few elements)
elif isinstance(value, (list, tuple)):
preview = str(list(value[:8])) # Limit preview length
preview = str(list(value[:8])) # Limit preview length
if len(value) > 8:
# Indicate truncation
preview = preview[:-1] + ", ...]"
@ -371,8 +405,7 @@ def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0)
# --- Handle Basic Types ---
else:
# --- Special Formatting for Known Fields ---
# GeoData floats (convert radians to degrees for readability)
# Special Formatting for Known Fields
if field_name in (
"LATITUDE",
"LONGITUDE",
@ -387,7 +420,6 @@ def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0)
formatted_value = f"{value:.7f} rad ({deg_value:.{prec}f} deg)"
else:
formatted_value = f"{value} rad (Non-finite)"
# SFP Header special fields (show hex and char if printable)
elif field_name in (
"SFP_MARKER",
"SFP_PT_SPARE",
@ -409,23 +441,22 @@ def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0)
):
char_repr = chr(value) if 32 <= value <= 126 else "?"
formatted_value = f"0x{value:02X} ('{char_repr}')"
# Handle byte strings (show decoded + hex preview)
elif isinstance(value, bytes):
try:
decoded = value.decode("ascii", errors="replace")
hex_preview = value.hex().upper()[:32] # Limit hex preview
hex_preview = value.hex().upper()[:32] # Limit hex preview
suffix = "..." if len(value) > 16 else ""
formatted_value = f"'{decoded}' (hex: {hex_preview}{suffix})"
except:
formatted_value = repr(value) # Fallback if not decodable
# Default representation for other types
formatted_value = repr(value) # Fallback if not decodable
else:
# Default representation for other types
formatted_value = repr(value)
result += f"{indent}{field_name}: {formatted_value}\n"
except AttributeError:
# Handle cases where getattr might fail (should be rare)
# Handle cases where getattr might fail
result += f"{indent}{field_name}: <Attribute Error>\n"
except Exception as e:
# Catch any other formatting errors for a specific field
@ -435,6 +466,7 @@ def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0)
# --- Google Maps Launcher ---
def open_google_maps(latitude_deg: float, longitude_deg: float):
"""Opens the default web browser to Google Maps centered on the specified coordinates."""
log_prefix = "[Utils Gmaps]"
@ -472,11 +504,12 @@ def open_google_maps(latitude_deg: float, longitude_deg: float):
# --- KML Generation and Management ---
def _calculate_geo_corners_for_kml(
geo_info_radians: Dict[str, Any],
) -> Optional[List[Tuple[float, float]]]:
"""Internal helper to calculate geographic corners (degrees) from geo_info (radians). Requires pyproj."""
# Requires pyproj library
# Check if pyproj is available
if not _pyproj_available:
logging.error("[Utils KML Calc] pyproj library needed for corner calculation.")
return None
@ -489,10 +522,8 @@ def _calculate_geo_corners_for_kml(
# Extract necessary info from the dictionary
center_lat_rad = geo_info_radians["lat"]
center_lon_rad = geo_info_radians["lon"]
# Orientation: Use the *negative* for forward projection from North
# (Angle typically defines rotation FROM North axis)
orient_rad = geo_info_radians["orientation"]
calc_orient_rad = -orient_rad # Angle used for projection
calc_orient_rad = -orient_rad # Angle used for projection
ref_x = geo_info_radians["ref_x"]
ref_y = geo_info_radians["ref_y"]
@ -506,9 +537,7 @@ def _calculate_geo_corners_for_kml(
logging.error(f"{log_prefix} Invalid scale or dimensions in geo_info.")
return None
# Define corner pixel coordinates relative to reference pixel (ref_x, ref_y)
# Top-Left (0,0), Top-Right (w-1, 0), Bottom-Right (w-1, h-1), Bottom-Left (0, h-1)
# Note: Pixel Y increases downwards
# Define corner pixel coordinates relative to reference pixel
corners_pixel_delta = [
(0 - ref_x, ref_y - 0), # Top-Left Delta
(width - 1 - ref_x, ref_y - 0), # Top-Right Delta
@ -523,7 +552,7 @@ def _calculate_geo_corners_for_kml(
# Rotate meter deltas based on orientation if significant
corners_meters_rotated = corners_meters
if abs(calc_orient_rad) > 1e-6: # Only rotate if angle is non-zero
if abs(calc_orient_rad) > 1e-6: # Only rotate if angle is non-zero
cos_o = math.cos(calc_orient_rad)
sin_o = math.sin(calc_orient_rad)
corners_meters_rotated = [
@ -536,19 +565,18 @@ def _calculate_geo_corners_for_kml(
center_lon_deg = math.degrees(center_lon_rad)
center_lat_deg = math.degrees(center_lat_rad)
# Loop through rotated meter deltas
for dx_m, dy_m in corners_meters_rotated:
# Calculate distance and azimuth from center to corner
distance = math.hypot(dx_m, dy_m)
# Azimuth: angle from North clockwise (atan2 gives angle from East CCW)
azimuth = math.degrees(math.atan2(dx_m, dy_m)) % 360.0 # Ensure 0-360 range
azimuth = math.degrees(math.atan2(dx_m, dy_m)) % 360.0
# Use pyproj geod.fwd for accurate projection
lon, lat, _ = geod.fwd(center_lon_deg, center_lat_deg, azimuth, distance)
corners_geo_deg.append((lon, lat)) # Store (longitude, latitude)
corners_geo_deg.append((lon, lat)) # Store (longitude, latitude)
# Ensure we have exactly 4 corners
if len(corners_geo_deg) == 4:
# logging.debug(f"{log_prefix} Calculated corners (Lon, Lat): {corners_geo_deg}")
return corners_geo_deg
else:
logging.error(f"{log_prefix} Incorrect number of corners calculated.")
@ -588,7 +616,7 @@ def generate_sar_kml(geo_info_radians: Dict[str, Any], output_path: str) -> bool
corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
if corners_deg is None:
logging.error(f"{log_prefix} Failed to calculate KML corners.")
return False # Error already logged by helper
return False # Error already logged by helper
# Extract center and orientation for LookAt view
center_lon_deg = math.degrees(geo_info_radians["lon"])
@ -605,12 +633,10 @@ def generate_sar_kml(geo_info_radians: Dict[str, Any], output_path: str) -> bool
geo_info_radians.get("scale_y", 1.0)
* geo_info_radians.get("height_px", 1000)
) / 1000.0
# Simple heuristic for altitude (e.g., 2x the larger dimension in km * 1000)
view_altitude_m = max(width_km, height_km) * 2000.0
# Clamp altitude to a reasonable range
view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0))
except:
view_altitude_m = 10000.0 # Default altitude on error
view_altitude_m = 10000.0 # Default altitude on error
# Create KML object
kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}")
@ -619,26 +645,23 @@ def generate_sar_kml(geo_info_radians: Dict[str, Any], output_path: str) -> bool
kml.document.lookat.longitude = center_lon_deg
kml.document.lookat.latitude = center_lat_deg
kml.document.lookat.range = view_altitude_m
kml.document.lookat.tilt = 45.0 # Default tilt
kml.document.lookat.heading = orientation_deg % 360.0 # Ensure 0-360 heading
kml.document.lookat.tilt = 45.0
kml.document.lookat.heading = orientation_deg % 360.0
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
# Create the polygon footprint
# KML uses (longitude, latitude, altitude) tuples
# Close the polygon by repeating the first point at the end
outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [
(corners_deg[0][0], corners_deg[0][1], 0)
]
polygon = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary)
# Style the polygon (red outline, semi-transparent red fill)
# Style the polygon
polygon.style.linestyle.color = simplekml.Color.red
polygon.style.linestyle.width = 2
polygon.style.polystyle.color = simplekml.Color.changealphaint(
100, simplekml.Color.red # ~40% opacity red fill
100, simplekml.Color.red
)
polygon.style.polystyle.outline = 1 # Ensure outline is drawn
polygon.style.polystyle.outline = 1
# Save the KML file
kml.save(output_path)
@ -696,25 +719,23 @@ def generate_lookat_and_point_kml(
return None
try:
kml = simplekml.Kml(name=placemark_name) # Use name for KML document too
kml = simplekml.Kml(name=placemark_name)
# Set LookAt parameters
kml.document.lookat.longitude = longitude_deg
kml.document.lookat.latitude = latitude_deg
kml.document.lookat.range = max(100.0, altitude_m) # Min range 100m
kml.document.lookat.range = max(100.0, altitude_m)
kml.document.lookat.tilt = tilt
kml.document.lookat.heading = heading % 360.0
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
# Add the Placemark point
point = kml.newpoint(name=placemark_name)
# simplekml uses (lon, lat, optional_alt) order for coordinates
point.coords = [(longitude_deg, latitude_deg)]
point.coords = [(longitude_deg, latitude_deg)] # (lon, lat) order
if placemark_desc:
point.description = placemark_desc
# Create a temporary file (delete=False so GE can open it)
# Consider using a subfolder in the main app temp/cache dir?
# Create a temporary file
with tempfile.NamedTemporaryFile(
mode="w", suffix=".kml", delete=False, encoding="utf-8"
) as temp_kml:
@ -722,7 +743,7 @@ def generate_lookat_and_point_kml(
logging.debug(
f"{log_prefix} Saving temporary LookAt+Point KML to: {temp_kml_path}"
)
temp_kml.write(kml.kml()) # Write KML content
temp_kml.write(kml.kml())
logging.debug(f"{log_prefix} LookAt+Point KML file created successfully.")
return temp_kml_path
@ -732,41 +753,114 @@ def generate_lookat_and_point_kml(
return None
def generate_multi_point_kml(
def generate_composite_kml(
points_data: List[Tuple[float, float, str, Optional[str]]],
sar_image_processed: np.ndarray, # Assumed BGR uint8
geo_info_radians: Dict[str, Any],
) -> Optional[str]:
"""
Generates a temporary KML file containing multiple Placemarks (points)
and a LookAt view potentially encompassing them.
Generates a KML file including placemarks, SAR footprint polygon, and
a GroundOverlay using gx:LatLonQuad for accurate image placement.
Saves the KML and a temporary PNG overlay image to the configured KML output directory.
Args:
points_data (List[Tuple[float, float, str, Optional[str]]]):
List of (latitude_deg, longitude_deg, placemark_name, optional_desc).
sar_image_processed (np.ndarray): The SAR image processed for display
(uint8 BGR, after B/C and palette).
geo_info_radians (Dict[str, Any]): The SAR georeferencing info (radians).
Returns:
Optional[str]: Path to the temporary KML file, or None on error.
Optional[str]: Full path to the saved KML file, or None on error.
"""
log_prefix = "[Utils KML MultiPoint]"
if not _simplekml_available:
logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.")
log_prefix = "[Utils KML Composite]"
# Library checks (using flags defined globally)
if not _simplekml_available or not _pyproj_available:
logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.")
return None
if not points_data:
logging.warning(f"{log_prefix} No points provided to generate KML.")
if not _cv2_available: # Check OpenCV flag
logging.error(f"{log_prefix} Cannot generate KML overlay: OpenCV (cv2) missing.")
return None
# Input data checks
if sar_image_processed is None or sar_image_processed.size == 0:
logging.error(f"{log_prefix} Cannot generate KML: Invalid SAR image provided.")
return None
if not geo_info_radians or not geo_info_radians.get("valid", False):
logging.error(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.")
return None
png_output_path = None # Define variable before try block for cleanup
try:
kml = simplekml.Kml(name="Multiple Locations")
kml_dir_path = Path(config.KML_OUTPUT_DIRECTORY)
# Ensure output directory exists
try:
kml_dir_path.mkdir(parents=True, exist_ok=True)
except OSError as e:
logging.error(
f"{log_prefix} Failed to create KML directory '{kml_dir_path}': {e}"
)
return None
# --- Determine LookAt View (Look at first point) ---
first_lat, first_lon, _, _ = points_data[0]
kml.document.lookat.longitude = first_lon
kml.document.lookat.latitude = first_lat
kml.document.lookat.range = 20000 # Default range for multiple points
kml.document.lookat.tilt = 30
# --- 1. Save Processed SAR Image as Temporary PNG ---
ts_img = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
png_filename = f"sar_overlay_{ts_img}.png"
png_output_path = kml_dir_path / png_filename # Path defined here
kml_image_href = png_filename # Relative path for KML Icon href
try:
logging.debug(
f"{log_prefix} Saving processed SAR image to {png_output_path}..."
)
# Ensure uint8 and BGR before saving with OpenCV
img_to_save = sar_image_processed
if img_to_save.dtype != np.uint8:
img_to_save = np.clip(img_to_save, 0, 255).astype(np.uint8)
if img_to_save.ndim == 2:
img_to_save = cv2.cvtColor(img_to_save, cv2.COLOR_GRAY2BGR)
save_success = cv2.imwrite(str(png_output_path), img_to_save)
if not save_success:
raise IOError("cv2.imwrite failed to save PNG.")
logging.debug(f"{log_prefix} SAR overlay PNG saved successfully.")
except Exception as img_save_err:
logging.exception(f"{log_prefix} Failed to save SAR overlay PNG:")
return None # Cannot proceed without the image
# --- 2. Calculate Geographic Corners ---
corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
if corners_deg is None:
logging.error(f"{log_prefix} Failed to calculate SAR corners for KML.")
png_output_path.unlink(missing_ok=True) # Attempt cleanup
return None
# --- 3. Create KML Document ---
center_lon_deg = math.degrees(geo_info_radians["lon"])
center_lat_deg = math.degrees(geo_info_radians["lat"])
kml_doc_name = f"GE_All_{ts_img}"
kml = simplekml.Kml(name=kml_doc_name)
# --- 4. Set LookAt View (Center on SAR) ---
try:
width_km = (
geo_info_radians.get("scale_x", 1.0)
* geo_info_radians.get("width_px", 1000)
) / 1000.0
height_km = (
geo_info_radians.get("scale_y", 1.0)
* geo_info_radians.get("height_px", 1000)
) / 1000.0
view_altitude_m = max(width_km, height_km) * 2500.0
view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0))
except:
view_altitude_m = 25000.0
kml.document.lookat.longitude = center_lon_deg
kml.document.lookat.latitude = center_lat_deg
kml.document.lookat.range = view_altitude_m
kml.document.lookat.tilt = 0
kml.document.lookat.heading = 0
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
kml.document.lookat.altitudemode = simplekml.AltitudeMode.clamptoground
# --- Define styles ---
# --- 5. Add Placemarks (Points) ---
style_sar_center = simplekml.Style()
style_sar_center.iconstyle.icon.href = (
"http://maps.google.com/mapfiles/kml/paddle/red-stars.png"
@ -783,14 +877,13 @@ def generate_multi_point_kml(
)
style_mouse_on_map.iconstyle.scale = 1.1
# --- Add Placemarks ---
folder_points = kml.newfolder(name="Points")
logging.debug(f"{log_prefix} Adding {len(points_data)} placemarks to KML.")
for lat, lon, name, desc in points_data:
point = kml.newpoint(name=name)
point.coords = [(lon, lat)] # (lon, lat) order
point = folder_points.newpoint(name=name)
point.coords = [(lon, lat)]
if desc:
point.description = desc
# Apply style based on name
if name == "SAR Center":
point.style = style_sar_center
@ -798,23 +891,105 @@ def generate_multi_point_kml(
point.style = style_mouse_on_sar
elif name == "Mouse on Map":
point.style = style_mouse_on_map
# else: default style
# --- Save to Temporary File ---
with tempfile.NamedTemporaryFile(
mode="w", suffix=".kml", delete=False, encoding="utf-8"
) as temp_kml:
temp_kml_path = temp_kml.name
logging.debug(
f"{log_prefix} Saving temporary Multi-Point KML to: {temp_kml_path}"
)
temp_kml.write(kml.kml())
# --- 6. Add SAR Footprint Polygon ---
folder_footprint = kml.newfolder(name="SAR Footprint")
outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [
(corners_deg[0][0], corners_deg[0][1], 0)
]
polygon = folder_footprint.newpolygon(
name="SAR Outline",
outerboundaryis=outer_boundary
)
polygon.extrude = 0
polygon.altitudemode = simplekml.AltitudeMode.clamptoground
polygon.style.linestyle.color = simplekml.Color.red
polygon.style.linestyle.width = 2
polygon.style.polystyle.fill = 0 # No fill
logging.debug(f"{log_prefix} Multi-Point KML file created successfully.")
return temp_kml_path
# --- 7. Add SAR Ground Overlay using gx:LatLonQuad (Manual XML) ---
folder_overlay = kml.newfolder(name="SAR Overlay")
# Prepare coordinates string: BL, BR, TR, TL
bl_lon, bl_lat = corners_deg[3]
br_lon, br_lat = corners_deg[2]
tr_lon, tr_lat = corners_deg[1]
tl_lon, tl_lat = corners_deg[0]
coord_string = (
f"{bl_lon:.8f},{bl_lat:.8f},0 "
f"{br_lon:.8f},{br_lat:.8f},0 "
f"{tr_lon:.8f},{tr_lat:.8f},0 "
f"{tl_lon:.8f},{tl_lat:.8f},0"
)
kml_added_manually = False
if _lxml_available: # <<< Use global flag
try:
kml_folder_element = getattr(folder_overlay, '_elem', None)
if kml_folder_element is None:
raise AttributeError("Cannot access internal simplekml element (_elem).")
GX_NAMESPACE = "http://www.google.com/kml/ext/2.2"
GX = "{%s}" % GX_NAMESPACE
go_elem = etree.Element("GroundOverlay")
etree.SubElement(go_elem, "name").text = "SAR Image"
icon_elem = etree.SubElement(go_elem, "Icon")
etree.SubElement(icon_elem, "href").text = kml_image_href
etree.SubElement(go_elem, "altitudeMode").text = "clampToGround"
quad_elem = etree.SubElement(go_elem, GX + "LatLonQuad")
etree.SubElement(quad_elem, "coordinates").text = coord_string
kml_folder_element.append(go_elem)
kml_added_manually = True
logging.debug(f"{log_prefix} Manually added GroundOverlay with gx:LatLonQuad using lxml.")
except ImportError: # Should not happen if _lxml_available is True
logging.error(f"{log_prefix} lxml import failed despite initial check.")
# Do NOT assign to _lxml_available here (avoid UnboundLocalError)
except AttributeError as ae:
logging.error(f"{log_prefix} Error accessing simplekml internal element for manual KML addition: {ae}")
except Exception as xml_err:
logging.exception(f"{log_prefix} Error manually constructing KML overlay XML: {xml_err}")
# --- Fallback if manual addition failed or lxml not available ---
if not kml_added_manually:
logging.warning(f"{log_prefix} Failed to add gx:LatLonQuad. Falling back to standard GroundOverlay with LatLonBox/Rotation (may look distorted).")
ground = folder_overlay.newgroundoverlay(name="SAR Image (Fallback)")
ground.icon.href = kml_image_href
lons = [c[0] for c in corners_deg]
lats = [c[1] for c in corners_deg]
north = max(lats)
south = min(lats)
east = max(lons)
west = min(lons)
orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0))
ground.latlonbox.north = north
ground.latlonbox.south = south
ground.latlonbox.east = east
ground.latlonbox.west = west
ground.latlonbox.rotation = -orientation_deg
ground.altitudemode = simplekml.AltitudeMode.clamptoground
# --- 8. Save Composite KML File ---
ts_kml = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
kml_filename = f"composite_{ts_kml}.kml"
kml_output_path = kml_dir_path / kml_filename
logging.debug(f"{log_prefix} Saving Composite KML to: {kml_output_path}")
kml_output_path.parent.mkdir(parents=True, exist_ok=True) # Ensure again
kml.save(str(kml_output_path))
logging.debug(f"{log_prefix} Composite KML file created successfully.")
return str(kml_output_path) # Return the full path
except Exception as e:
logging.exception(f"{log_prefix} Error generating Multi-Point KML file:")
logging.exception(f"{log_prefix} Error generating composite KML file:")
# Attempt cleanup of the PNG if it was created
if png_output_path is not None and png_output_path.exists():
try:
png_output_path.unlink()
logging.debug(f"{log_prefix} Cleaned up temporary PNG: {png_output_path}")
except OSError:
logging.warning(f"{log_prefix} Failed to cleanup temporary PNG: {png_output_path}")
return None
@ -860,60 +1035,78 @@ def launch_google_earth(kml_path: Optional[str]):
logging.exception(f"{log_prefix} Error launching KML handler:")
def cleanup_old_kml_files(kml_directory: str, max_files_to_keep: int):
"""Removes the oldest KML files from the directory if count exceeds limit."""
log_prefix = "[Utils KML Cleanup]"
def cleanup_kml_output_directory(kml_directory: str, max_files_to_keep: int):
"""
Removes the oldest files (KML and associated PNGs) from the specified
directory if the total count exceeds the configured limit.
Relies on timestamps embedded in filenames (YYYYMMDD_HHMMSS_fff format).
Args:
kml_directory (str): The directory containing KML and PNG files.
max_files_to_keep (int): The maximum number of files (KML+PNG) to retain.
Cleanup is skipped if <= 0.
"""
log_prefix = "[Utils Cleanup KMLDir]"
# Skip if cleanup is disabled or directory invalid
if max_files_to_keep <= 0:
return
try:
kml_dir_path = Path(kml_directory)
if not kml_dir_path.is_dir():
logging.warning(
f"{log_prefix} KML directory not found: '{kml_directory}'. Cannot cleanup."
f"{log_prefix} Output directory not found: '{kml_directory}'. Cannot cleanup."
)
return
# Get list of KML files with modification times
kml_files = []
for item in kml_dir_path.glob("*.kml"):
if item.is_file():
try:
# Get modification time
mtime = item.stat().st_mtime
kml_files.append((item, mtime))
except Exception as stat_e:
# Log error getting stats but continue with others
logging.warning(
f"{log_prefix} Could not stat file '{item.name}': {stat_e}"
)
# --- 1. Identify Files and Extract Timestamps ---
all_files_with_ts: List[Tuple[Path, str]] = []
# Regex to capture the timestamp string from supported filename patterns
timestamp_pattern = re.compile(
r"^(?:sar_footprint_|composite_|multi_point_|sar_overlay_)"
r"(\d{8}_\d{6}_\d{3})\."
r"(?:kml|png)$"
)
current_count = len(kml_files)
# Only proceed if count exceeds the limit
if current_count <= max_files_to_keep:
logging.debug(f"{log_prefix} Scanning directory: {kml_dir_path}")
# Iterate through all files in the directory
for item in kml_dir_path.iterdir():
if item.is_file():
match = timestamp_pattern.match(item.name)
if match:
timestamp_str = match.group(1) # Extract timestamp string
all_files_with_ts.append((item, timestamp_str))
else:
pass # Skip files not matching the pattern
# --- 2. Check if Cleanup is Needed ---
current_total_count = len(all_files_with_ts)
logging.debug(f"{log_prefix} Found {current_total_count} potentially manageable files.")
if current_total_count <= max_files_to_keep:
logging.debug(f"{log_prefix} File count within limit ({max_files_to_keep}). No cleanup needed.")
return
# Sort files by modification time (oldest first)
kml_files.sort(key=lambda x: x[1])
# --- 3. Sort Files by Timestamp (Oldest First) ---
all_files_with_ts.sort(key=lambda x: x[1])
logging.debug(f"{log_prefix} Sorted files by timestamp.")
# Determine number of files to delete
num_to_delete = current_count - max_files_to_keep
# Get the list of files to delete
files_to_delete = kml_files[:num_to_delete]
# --- 4. Determine Files to Delete ---
num_to_delete = current_total_count - max_files_to_keep
files_to_delete = all_files_with_ts[:num_to_delete] # Get the oldest ones
logging.info(
f"{log_prefix} Found {current_count} KML files. "
f"Attempting to delete oldest {num_to_delete}..."
f"{log_prefix} Found {current_total_count} files. "
f"Attempting to delete oldest {num_to_delete} (KML and PNG)..."
)
# --- 5. Delete Oldest Files ---
deleted_count = 0
# Iterate and delete oldest files
for file_path, _ in files_to_delete:
for file_path, ts_str in files_to_delete:
try:
file_path.unlink() # Delete the file
logging.debug(f"{log_prefix} Deleted old KML: {file_path.name}")
file_path.unlink() # Delete the file
logging.debug(f"{log_prefix} Deleted old file: {file_path.name} (TS: {ts_str})")
deleted_count += 1
except Exception as delete_e:
# Log error deleting specific file but continue
logging.error(
f"{log_prefix} Failed to delete file '{file_path.name}': {delete_e}"
)
@ -922,7 +1115,7 @@ def cleanup_old_kml_files(kml_directory: str, max_files_to_keep: int):
)
except Exception as e:
logging.exception(f"{log_prefix} Error during KML cleanup process:")
logging.exception(f"{log_prefix} Error during KML/PNG cleanup process:")
# --- END OF FILE utils.py ---
# --- END OF FILE utils.py ---