SXXXXXXX_ControlPanel/utils.py
2025-04-28 12:39:47 +02:00

1121 lines
44 KiB
Python

# --- START OF FILE utils.py ---
# utils.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED.
Provides utility functions used throughout the application, including
safe queue management (put, clear), coordinate formatting (decimal degrees to DMS
and DMS string to decimal), KML generation and management, and opening web maps.
Uses standardized logging prefixes. Drop counts are managed within AppState.
"""
# Standard library imports
import queue
import logging
import math
import os
import datetime
import sys
import subprocess
import shutil
from pathlib import Path
import webbrowser
import urllib.parse
import re # For DMS parsing
from typing import Optional, Tuple, List, Any, Dict
import tempfile
import ctypes # Needed for format_ctypes_structure
# Third-party imports (ensure these are installed)
import numpy as np
# --- Availability Flags & Library Imports ---
# Define flags *before* use, handle ImportErrors
_simplekml_available = False
try:
import simplekml
_simplekml_available = True
except ImportError:
simplekml = None
logging.warning(
"[Utils Init] Library 'simplekml' not found. KML generation disabled. "
"(pip install simplekml)"
)
_pyproj_available = False
try:
import pyproj
_pyproj_available = True
except ImportError:
pyproj = None
logging.warning(
"[Utils Init] Library 'pyproj' not found. "
"Some geometric calculations (KML corners, BBox) may be less accurate. "
"(pip install pyproj)"
)
_cv2_available = False
try:
import cv2
_cv2_available = True
except ImportError:
cv2 = None
logging.warning(
"[Utils Init] Library 'opencv-python' not found. "
"Cannot save images for KML overlays."
)
_lxml_available = False
try:
from lxml import etree
_lxml_available = True
except ImportError:
etree = None
logging.warning("[Utils Init] Library 'lxml' not found. Cannot use gx:LatLonQuad fallback.")
# --- Local application imports (after third-party and flag definitions) ---
import config # Import config after potential library warnings
_apply_color_palette_available = False
try:
# Import only after cv2 availability check might be safer, but fine here for now
from image_processing import apply_color_palette
_apply_color_palette_available = True
except ImportError:
apply_color_palette = None
logging.warning(
"[Utils Init] Function 'apply_color_palette' not found. "
"Color palettes in KML overlay might not work."
)
# --- Queue Management Functions ---
def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None):
    """
    Enqueue an item without blocking, dropping it when the queue is full.

    Nothing is enqueued while the application reports that it is shutting
    down. When the queue is full the drop is logged and recorded through
    ``app_instance.state.increment_dropped_count(queue_name)``.

    Args:
        queue_obj (queue.Queue): Destination queue.
        item (Any): Object to enqueue.
        queue_name (str): Label used in log messages and drop statistics.
        app_instance (Optional[ControlPanelApp]): Application object whose
            ``state`` attribute supplies the shutdown flag and drop counters.
    """
    log_prefix = "[Utils Queue Put]"
    # The state object is only usable when an app instance was supplied
    # and it actually carries a ``state`` attribute.
    has_state = bool(app_instance) and hasattr(app_instance, "state")

    # During shutdown items are discarded silently.
    if has_state and app_instance.state.shutting_down:
        return

    try:
        queue_obj.put(item, block=False)
    except queue.Full:
        logging.warning(f"{log_prefix} Queue '{queue_name}' is full. Dropping item.")
        if not has_state:
            logging.error(
                f"{log_prefix} Cannot increment drop count for '{queue_name}': "
                "App instance or state missing."
            )
            return
        # Record the drop; a failing counter must not propagate to the caller.
        try:
            app_instance.state.increment_dropped_count(queue_name)
        except Exception as count_err:
            logging.error(
                f"{log_prefix} Failed to increment drop count for '{queue_name}': {count_err}"
            )
    except Exception as put_err:
        # Anything other than a full queue is unexpected; log with traceback.
        logging.exception(
            f"{log_prefix} Unexpected error putting item onto queue '{queue_name}': {put_err}"
        )
def clear_queue(q: queue.Queue):
    """
    Remove every item currently held in the queue and reset task tracking.

    The queue's internal container is emptied while holding the queue mutex.
    Because the queue's state changes behind the back of ``get()`` and
    ``task_done()``, the condition variables that those methods would normally
    signal are notified here: consumers of ``join()`` are released once
    ``unfinished_tasks`` is zero, and producers blocked in ``put()`` on a
    bounded queue are told that space is available again.

    Args:
        q (queue.Queue): The queue to empty.
    """
    log_prefix = "[Utils Queue Clear]"
    try:
        q_size_before = q.qsize()  # Approximate: taken before the lock is held
        with q.mutex:
            q.queue.clear()
            # Reset task tracking if the queue implementation uses it
            if hasattr(q, "unfinished_tasks"):
                q.unfinished_tasks = 0
            # Both conditions share q.mutex, so notifying inside the lock is valid.
            # Without these notifications, threads waiting in join() or in a
            # blocking put() would never learn that the queue was emptied.
            all_tasks_done = getattr(q, "all_tasks_done", None)
            if all_tasks_done is not None:
                all_tasks_done.notify_all()
            not_full = getattr(q, "not_full", None)
            if not_full is not None:
                not_full.notify_all()
        logging.debug(
            f"{log_prefix} Cleared queue (approx size before: {q_size_before})."
        )
    except Exception as e:
        logging.exception(f"{log_prefix} Error clearing queue: {e}")
# --- Coordinate Formatting ---
def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
    """
    Converts decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string.

    Seconds are rounded to the two decimals that are displayed *before* the
    carry into minutes/degrees is evaluated, so the result never shows
    ``60.00"`` seconds or ``60'`` minutes.

    Args:
        decimal_degrees (float): Coordinate value in decimal degrees.
        is_latitude (bool): True if latitude, False if longitude.

    Returns:
        str: Formatted DMS string (e.g. ``46° 09' 50.24" N``), or one of the
        error indicators "N/A" (non-numeric / NaN / infinite input),
        "Invalid Lat" / "Invalid Lon" (out of range) or "Error DMS"
        (unexpected failure).
    """
    log_prefix = "[Utils DMS Conv]"
    coord_type = "Latitude" if is_latitude else "Longitude"
    try:
        # Input validation
        if (
            not isinstance(decimal_degrees, (int, float))
            or math.isnan(decimal_degrees)
            or math.isinf(decimal_degrees)
        ):
            logging.warning(
                f"{log_prefix} Invalid input type or value ({decimal_degrees}) "
                f"for {coord_type}."
            )
            return "N/A"

        # Range check and direction determination
        if is_latitude:
            if not (-90.0 <= decimal_degrees <= 90.0):
                logging.warning(
                    f"{log_prefix} Latitude {decimal_degrees} out of range."
                )
                return "Invalid Lat"
            direction = "N" if decimal_degrees >= 0 else "S"
            deg_pad = 2  # Padding for degrees (e.g., 01°, 89°)
            max_degrees = 90
        else:  # Longitude
            if not (-180.0 <= decimal_degrees <= 180.0):
                logging.warning(
                    f"{log_prefix} Longitude {decimal_degrees} out of range."
                )
                return "Invalid Lon"
            direction = "E" if decimal_degrees >= 0 else "W"
            deg_pad = 3  # Padding for degrees (e.g., 001°, 179°)
            max_degrees = 180

        # Split into whole degrees, whole minutes and fractional seconds
        dd_abs = abs(decimal_degrees)
        degrees = math.floor(dd_abs)
        minutes_dec = (dd_abs - degrees) * 60.0
        minutes = math.floor(minutes_dec)
        # Round to the displayed precision first: a value such as 59.9964
        # would otherwise be printed as "60.00" without carrying.
        seconds = round((minutes_dec - minutes) * 60.0, 2)

        # Carry seconds -> minutes -> degrees
        if seconds >= 60.0:
            seconds = 0.0
            minutes += 1
        if minutes >= 60:
            minutes = 0
            degrees += 1

        # Clamp if the carry pushed degrees past the valid limit
        if degrees > max_degrees:
            degrees = max_degrees
            minutes = 0
            seconds = 0.0

        # Format the final string
        return f"{degrees:0{deg_pad}d}° {minutes:02d}' {seconds:05.2f}\" {direction}"
    except Exception as e:
        logging.exception(
            f"{log_prefix} Error converting {decimal_degrees} to DMS: {e}"
        )
        return "Error DMS"
def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]:
    """
    Parse a DMS coordinate string into signed decimal degrees.

    Accepted form: degrees, minutes and seconds followed by a hemisphere
    letter, e.g. "46° 09' 50.24\" N" or "009° 23' 14.11\" E".

    Args:
        dms_str (str): Text to parse.
        is_latitude (bool): True to require N/S and the latitude range,
            False to require E/W and the longitude range.

    Returns:
        Optional[float]: Decimal degrees (negative for S and W), or None when
        the text cannot be parsed or describes an invalid coordinate.
    """
    log_prefix = "[Utils DMS Parse]"
    if not dms_str or not isinstance(dms_str, str):
        logging.warning(f"{log_prefix} Invalid input string: {dms_str}")
        return None

    # Degrees (1-3 digits) °, minutes (1-2 digits) ', seconds (digits/dot)
    # followed by " or ”, then the hemisphere letter.
    dms_regex = (
        r"^\s*(\d{1,3})\s*[°]\s*"
        r"(\d{1,2})\s*[']\s*"
        r"([\d.]+)\s*[\"”]\s*"
        r"([NSEWnsew])\s*$"
    )
    parsed = re.match(dms_regex, dms_str.strip(), re.IGNORECASE)
    if parsed is None:
        logging.error(f"{log_prefix} Could not parse DMS string format: '{dms_str}'")
        return None

    try:
        deg_txt, min_txt, sec_txt, hemi_txt = parsed.groups()
        degrees = float(deg_txt)
        minutes = float(min_txt)
        seconds = float(sec_txt)  # May raise ValueError, e.g. for "1.2.3"
        hemisphere = hemi_txt.upper()

        # Minutes and seconds must each lie in [0, 60)
        if not (0 <= minutes < 60 and 0 <= seconds < 60):
            logging.error(
                f"{log_prefix} Invalid minutes ({minutes}) or seconds ({seconds}) "
                f"in DMS: '{dms_str}'"
            )
            return None

        # The hemisphere letter has to match the requested coordinate type
        coord_type = "latitude" if is_latitude else "longitude"
        allowed_hemispheres = ("N", "S") if is_latitude else ("E", "W")
        if hemisphere not in allowed_hemispheres:
            logging.error(
                f"{log_prefix} Invalid direction '{hemisphere}' for {coord_type}: "
                f"'{dms_str}'"
            )
            return None

        magnitude = degrees + (minutes / 60.0) + (seconds / 3600.0)
        # Southern and western hemispheres are negative
        decimal_degrees = -magnitude if hemisphere in ("S", "W") else magnitude

        # Reject values outside the valid range for the coordinate type
        limit = 90.0 if is_latitude else 180.0
        if not (-limit <= decimal_degrees <= limit):
            logging.warning(
                f"{log_prefix} Calculated {coord_type} {decimal_degrees:.7f} out of range."
            )
            return None

        logging.debug(f"{log_prefix} Parsed '{dms_str}' -> {decimal_degrees:.7f}")
        return decimal_degrees
    except ValueError as conv_err:
        logging.error(
            f"{log_prefix} Error converting components to float: {conv_err} in '{dms_str}'"
        )
        return None
    except Exception:
        logging.exception(
            f"{log_prefix} Unexpected error parsing DMS string '{dms_str}':"
        )
        return None
# --- Metadata Formatting Function ---
# Field names whose float value is an angle in radians (shown with degrees too)
_RADIAN_FIELD_NAMES = frozenset(
    (
        "LATITUDE",
        "LONGITUDE",
        "ORIENTATION",
        "POI_LATITUDE",
        "POI_LONGITUDE",
        "POI_ORIENTATION",
    )
)
# Field names whose integer value is shown as hex plus decimal
_HEX_FIELD_NAMES = frozenset(
    (
        "SFP_MARKER",
        "SFP_PT_SPARE",
        "SFP_TAG",
        "SFP_SRC",
        "SFP_TID",
        "SFP_FLAGS",
        "SFP_WIN",
        "SFP_ERR",
        "SFP_ERR_INFO",
        "SFP_RECTYPE",
        "SFP_RECSPARE",
        "SFP_PLDAP",
        "SFP_PLEXT",
    )
)
# Field names whose integer value is shown as hex plus printable character
_CHAR_CODE_FIELD_NAMES = frozenset(("SFP_DIRECTION", "SFP_FLOW"))


def _format_ctypes_array_field(field_type, value) -> str:
    """Return the one-line summary used for a ctypes array field."""
    array_len = getattr(field_type, "_length_", "N/A")
    elem_type = getattr(field_type, "_type_", "N/A")
    elem_name = getattr(elem_type, "__name__", "?")

    # Byte arrays: hex preview of the first 16 bytes
    if issubclass(elem_type, (ctypes.c_byte, ctypes.c_ubyte)):
        try:
            # Mask to 0..255 so signed c_byte values (e.g. -1) can be shown
            preview = bytes(b & 0xFF for b in value[:16]).hex().upper()
            if len(value) > 16:
                preview += "..."
            return f"<{elem_name}[{array_len}] Data: {preview}>"
        except (TypeError, ValueError):
            return f"<{elem_name}[{array_len}] (Preview N/A)>"

    # Numeric arrays: show the first 8 elements. A ctypes array instance is
    # never a list/tuple, so it has to be recognised as ctypes.Array here.
    if isinstance(value, (list, tuple, ctypes.Array)):
        head = list(value[:8])
        # Only preview plain numbers; arrays of structures or nested arrays
        # fall through to the type/size summary below.
        if all(isinstance(elem, (int, float)) for elem in head):
            preview = str(head)
            if len(value) > 8:
                preview = preview[:-1] + ", ...]"  # Indicate truncation
            return f"<{elem_name}[{array_len}] Data: {preview}>"

    # Anything else (e.g. c_char arrays, which are read back as bytes)
    return f"<{elem_name}[{array_len}]>"


def _format_ctypes_scalar_field(field_name, value) -> str:
    """Return the display text for a non-array, non-structure field value."""
    if field_name in _RADIAN_FIELD_NAMES and isinstance(value, float):
        if not math.isfinite(value):
            return f"{value} rad (Non-finite)"
        prec = 2 if "ORIENTATION" in field_name else 6
        return f"{value:.7f} rad ({math.degrees(value):.{prec}f} deg)"
    if field_name in _HEX_FIELD_NAMES and isinstance(value, int):
        return f"0x{value:02X} ({value})"
    if field_name in _CHAR_CODE_FIELD_NAMES and isinstance(value, int):
        char_repr = chr(value) if 32 <= value <= 126 else "?"
        return f"0x{value:02X} ('{char_repr}')"
    if isinstance(value, bytes):
        # errors="replace" means decoding cannot fail for bytes input
        decoded = value.decode("ascii", errors="replace")
        hex_preview = value.hex().upper()[:32]  # Limit hex preview
        suffix = "..." if len(value) > 16 else ""
        return f"'{decoded}' (hex: {hex_preview}{suffix})"
    return repr(value)


def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0) -> str:
    """
    Generates a formatted string representation of a ctypes Structure,
    handling nested structures, arrays, and basic types recursively.

    Each field produces one line ``<indent><name>: <value>``; a nested
    structure produces a ``<name> (<TypeName>):`` header followed by its own
    fields one indentation level deeper. A failure while formatting one field
    is reported on that field's line and does not abort the other fields.
    Bit-field entries (three-element ``_fields_`` tuples) are supported.

    Args:
        structure: The ctypes.Structure instance to format.
        indent_level: The current indentation level for nested structures.

    Returns:
        A multi-line string representing the structure's content.
    """
    indent = " " * indent_level  # Define indentation string
    # Fallback for non-structures or basic types passed accidentally
    if not hasattr(structure, "_fields_"):
        return f"{indent}Value: {repr(structure)}\n"

    result = ""
    for field_def in structure._fields_:
        # Entries are (name, type) or (name, type, bit_width) for bit fields
        field_name, field_type = field_def[0], field_def[1]
        try:
            value = getattr(structure, field_name)
            if hasattr(field_type, "_fields_"):
                # Nested structure: header line, then recurse one level deeper
                result += f"{indent}{field_name} ({field_type.__name__}):\n"
                result += format_ctypes_structure(value, indent_level + 1)
            elif issubclass(field_type, ctypes.Array):
                formatted_value = _format_ctypes_array_field(field_type, value)
                result += f"{indent}{field_name}: {formatted_value}\n"
            else:
                formatted_value = _format_ctypes_scalar_field(field_name, value)
                result += f"{indent}{field_name}: {formatted_value}\n"
        except AttributeError:
            # Handle cases where getattr might fail
            result += f"{indent}{field_name}: <Attribute Error>\n"
        except Exception as e:
            # Catch any other formatting errors for a specific field
            result += f"{indent}{field_name}: <Error Formatting: {e}>\n"
    return result
# --- Google Maps Launcher ---
def open_google_maps(latitude_deg: float, longitude_deg: float):
    """
    Open Google Maps in the default web browser at the given position.

    The coordinates are validated first; invalid input is logged and the
    browser is not opened.

    Args:
        latitude_deg (float): Latitude in decimal degrees, within [-90, 90].
        longitude_deg (float): Longitude in decimal degrees, within [-180, 180].
    """
    log_prefix = "[Utils Gmaps]"

    # Reject non-numeric, non-finite or out-of-range coordinates
    if (
        not isinstance(latitude_deg, (int, float))
        or not math.isfinite(latitude_deg)
        or not (-90 <= latitude_deg <= 90)
    ):
        logging.error(f"{log_prefix} Invalid latitude: {latitude_deg}.")
        return
    if (
        not isinstance(longitude_deg, (int, float))
        or not math.isfinite(longitude_deg)
        or not (-180 <= longitude_deg <= 180)
    ):
        logging.error(f"{log_prefix} Invalid longitude: {longitude_deg}.")
        return

    try:
        # "lat,lon" goes, URL-encoded, into the query parameter of the search URL
        coords_text = f"{latitude_deg:.7f},{longitude_deg:.7f}"
        gmaps_url = (
            f"https://www.google.com/maps/search/?api=1&query={urllib.parse.quote(coords_text)}"
        )
        logging.info(f"{log_prefix} Opening Google Maps URL: {gmaps_url}")
        # Try a new tab first and fall back to a plain open() if that is refused
        if not webbrowser.open_new_tab(gmaps_url):
            logging.warning(f"{log_prefix} webbrowser.open_new_tab returned False.")
            webbrowser.open(gmaps_url)
    except Exception:
        logging.exception(f"{log_prefix} Failed to open Google Maps URL:")
# --- KML Generation and Management ---
def _calculate_geo_corners_for_kml(
    geo_info_radians: Dict[str, Any],
) -> Optional[List[Tuple[float, float]]]:
    """
    Compute the four geographic corners of the image described by geo_info.

    Each corner's pixel offset from the reference pixel is scaled by
    ``scale_x`` / ``scale_y``, rotated by the negated ``orientation`` and
    projected from the centre position with ``pyproj.Geod.fwd`` on WGS84.

    Args:
        geo_info_radians (Dict[str, Any]): Must provide "lat", "lon" and
            "orientation" (converted from radians here), "ref_x", "ref_y",
            "scale_x", "scale_y", "width_px" and "height_px".

    Returns:
        Optional[List[Tuple[float, float]]]: (longitude, latitude) pairs in
        degrees, ordered top-left, top-right, bottom-right, bottom-left, or
        None when pyproj is unavailable, a key is missing, the scale or
        dimensions are not positive, or the calculation fails.
    """
    if not _pyproj_available:
        logging.error("[Utils KML Calc] pyproj library needed for corner calculation.")
        return None
    log_prefix = "[Utils KML Calc]"
    try:
        # Geodetic calculator on the WGS84 ellipsoid
        geod = pyproj.Geod(ellps="WGS84")

        # Pull the georeferencing values out of the dictionary
        center_lat_rad = geo_info_radians["lat"]
        center_lon_rad = geo_info_radians["lon"]
        rotation_rad = -geo_info_radians["orientation"]  # Angle used for projection
        ref_x = geo_info_radians["ref_x"]
        ref_y = geo_info_radians["ref_y"]
        scale_x = geo_info_radians["scale_x"]
        scale_y = geo_info_radians["scale_y"]
        width = geo_info_radians["width_px"]
        height = geo_info_radians["height_px"]

        # Scale factors and image size must all be strictly positive
        if not all(v > 0 for v in (scale_x, scale_y, width, height)):
            logging.error(f"{log_prefix} Invalid scale or dimensions in geo_info.")
            return None

        # Corner pixels in the order TL, TR, BR, BL
        last_col = width - 1
        last_row = height - 1
        corner_pixels = [(0, 0), (last_col, 0), (last_col, last_row), (0, last_row)]

        # Offsets from the reference pixel, scaled by scale_x / scale_y.
        # The row offset is inverted (ref_y - row).
        offsets = [
            ((col - ref_x) * scale_x, (ref_y - row) * scale_y)
            for col, row in corner_pixels
        ]

        # Apply the rotation only when the angle is not negligible
        if abs(rotation_rad) > 1e-6:
            cos_r = math.cos(rotation_rad)
            sin_r = math.sin(rotation_rad)
            offsets = [
                (ox * cos_r - oy * sin_r, ox * sin_r + oy * cos_r)
                for ox, oy in offsets
            ]

        # Project every offset from the centre position
        origin_lon_deg = math.degrees(center_lon_rad)
        origin_lat_deg = math.degrees(center_lat_rad)
        corners_geo_deg = []
        for east_m, north_m in offsets:
            dist_m = math.hypot(east_m, north_m)
            bearing_deg = math.degrees(math.atan2(east_m, north_m)) % 360.0
            lon, lat, _ = geod.fwd(origin_lon_deg, origin_lat_deg, bearing_deg, dist_m)
            corners_geo_deg.append((lon, lat))  # Stored as (longitude, latitude)

        # Sanity check: exactly four corners are expected
        if len(corners_geo_deg) != 4:
            logging.error(f"{log_prefix} Incorrect number of corners calculated.")
            return None
        return corners_geo_deg
    except KeyError as ke:
        logging.error(f"{log_prefix} Missing required key in geo_info_radians: {ke}")
        return None
    except Exception:
        logging.exception(f"{log_prefix} Error calculating geographic corners for KML:")
        return None
def generate_sar_kml(geo_info_radians: Dict[str, Any], output_path: str) -> bool:
    """
    Generates a KML file representing the SAR footprint area as a polygon.

    The document contains a LookAt view centred on the image (heading taken
    from the image orientation, range derived from the image extent) and a
    red, semi-transparent polygon through the four computed corners.

    Args:
        geo_info_radians (Dict[str, Any]): Dictionary with SAR georeferencing.
            Must contain a truthy "valid" entry plus the keys read by
            ``_calculate_geo_corners_for_kml``.
        output_path (str): The full path where the KML file should be saved.

    Returns:
        bool: True if KML generation and saving were successful, False otherwise.
    """
    log_prefix = "[Utils KML Gen]"
    # Check for required libraries
    if not _simplekml_available or not _pyproj_available:
        logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.")
        return False
    # Check for valid input geo_info
    if not geo_info_radians or not geo_info_radians.get("valid", False):
        logging.warning(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.")
        return False
    try:
        # Calculate geographic corner coordinates
        corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
        if corners_deg is None:
            logging.error(f"{log_prefix} Failed to calculate KML corners.")
            return False  # Error already logged by helper

        # Extract center and orientation for LookAt view
        center_lon_deg = math.degrees(geo_info_radians["lon"])
        center_lat_deg = math.degrees(geo_info_radians["lat"])
        orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0))

        # Estimate a reasonable viewing altitude based on image size
        try:
            width_km = (
                geo_info_radians.get("scale_x", 1.0)
                * geo_info_radians.get("width_px", 1000)
            ) / 1000.0
            height_km = (
                geo_info_radians.get("scale_y", 1.0)
                * geo_info_radians.get("height_px", 1000)
            ) / 1000.0
            view_altitude_m = max(width_km, height_km) * 2000.0
            # Keep the range between 1 km and 500 km
            view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0))
        except Exception:
            # Was a bare "except:", which also swallowed KeyboardInterrupt
            # and SystemExit; only ordinary errors should fall back here.
            view_altitude_m = 10000.0  # Default altitude on error

        # Create KML object
        kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}")

        # Set LookAt view
        kml.document.lookat.longitude = center_lon_deg
        kml.document.lookat.latitude = center_lat_deg
        kml.document.lookat.range = view_altitude_m
        kml.document.lookat.tilt = 45.0
        kml.document.lookat.heading = orientation_deg % 360.0
        kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground

        # Create the polygon footprint; the first corner is repeated to close the ring
        outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [
            (corners_deg[0][0], corners_deg[0][1], 0)
        ]
        polygon = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary)

        # Style the polygon: red outline, red fill with alpha 100
        polygon.style.linestyle.color = simplekml.Color.red
        polygon.style.linestyle.width = 2
        polygon.style.polystyle.color = simplekml.Color.changealphaint(
            100, simplekml.Color.red
        )
        polygon.style.polystyle.outline = 1

        # Save the KML file
        kml.save(output_path)
        logging.debug(f"{log_prefix} KML file saved successfully: {output_path}")
        return True
    except Exception:
        logging.exception(f"{log_prefix} Error generating or saving KML file:")
        return False
def generate_lookat_and_point_kml(
    latitude_deg: float,
    longitude_deg: float,
    altitude_m: float = 10000.0,
    tilt: float = 45.0,
    heading: float = 0.0,
    placemark_name: str = "Selected Location",
    placemark_desc: Optional[str] = None,
) -> Optional[str]:
    """
    Write a temporary KML file holding a LookAt view and a single placemark.

    Args:
        latitude_deg (float): Target latitude in decimal degrees.
        longitude_deg (float): Target longitude in decimal degrees.
        altitude_m (float): LookAt range in meters (values below 100 are
            raised to 100).
        tilt (float): Camera tilt angle for the LookAt.
        heading (float): Camera heading for the LookAt, wrapped into [0, 360).
        placemark_name (str): Name of the document and of the placemark.
        placemark_desc (Optional[str]): Optional placemark description.

    Returns:
        Optional[str]: Path of the temporary ".kml" file (it is not deleted
        automatically), or None when simplekml is missing, the coordinates
        are invalid, or writing fails.
    """
    log_prefix = "[Utils KML LookAtPoint]"
    if not _simplekml_available:
        logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.")
        return None

    # Reject non-numeric, non-finite or out-of-range coordinates
    if (
        not isinstance(latitude_deg, (int, float))
        or not math.isfinite(latitude_deg)
        or not (-90 <= latitude_deg <= 90)
    ):
        logging.error(f"{log_prefix} Invalid latitude provided: {latitude_deg}.")
        return None
    if (
        not isinstance(longitude_deg, (int, float))
        or not math.isfinite(longitude_deg)
        or not (-180 <= longitude_deg <= 180)
    ):
        logging.error(f"{log_prefix} Invalid longitude provided: {longitude_deg}.")
        return None

    try:
        kml = simplekml.Kml(name=placemark_name)

        # Camera view
        view = kml.document.lookat
        view.longitude = longitude_deg
        view.latitude = latitude_deg
        view.range = max(100.0, altitude_m)
        view.tilt = tilt
        view.heading = heading % 360.0
        view.altitudemode = simplekml.AltitudeMode.relativetoground

        # Placemark at the target position; KML order is (lon, lat)
        marker = kml.newpoint(name=placemark_name)
        marker.coords = [(longitude_deg, latitude_deg)]
        if placemark_desc:
            marker.description = placemark_desc

        # delete=False keeps the file on disk after the handle is closed
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".kml", delete=False, encoding="utf-8"
        ) as kml_file:
            temp_kml_path = kml_file.name
            logging.debug(
                f"{log_prefix} Saving temporary LookAt+Point KML to: {temp_kml_path}"
            )
            kml_file.write(kml.kml())
        logging.debug(f"{log_prefix} LookAt+Point KML file created successfully.")
        return temp_kml_path
    except Exception:
        logging.exception(f"{log_prefix} Error generating LookAt+Point KML file:")
        return None
def generate_composite_kml(
    points_data: List[Tuple[float, float, str, Optional[str]]],
    sar_image_processed: np.ndarray,  # Assumed BGR uint8
    geo_info_radians: Dict[str, Any],
) -> Optional[str]:
    """
    Generates a KML file including placemarks, SAR footprint polygon, and
    a GroundOverlay for the SAR image.

    The overlay is first attempted as a manually built gx:LatLonQuad element
    (requires lxml); if that is not possible, a standard GroundOverlay with
    LatLonBox and rotation is used instead. The KML and a PNG copy of the
    image are written to ``config.KML_OUTPUT_DIRECTORY``; the KML references
    the PNG by its bare file name. On failure the PNG is removed again.

    Args:
        points_data (List[Tuple[float, float, str, Optional[str]]]):
            List of (latitude_deg, longitude_deg, placemark_name, optional_desc).
            The names "SAR Center", "Mouse on SAR" and "Mouse on Map" select
            dedicated icon styles.
        sar_image_processed (np.ndarray): The SAR image processed for display
            (uint8 BGR, after B/C and palette). Other dtypes are clipped to
            0..255 and converted; 2-D images are expanded to BGR.
        geo_info_radians (Dict[str, Any]): The SAR georeferencing info (radians).

    Returns:
        Optional[str]: Full path to the saved KML file, or None on error.
    """
    log_prefix = "[Utils KML Composite]"
    # Library checks (using flags defined globally)
    if not _simplekml_available or not _pyproj_available:
        logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.")
        return None
    if not _cv2_available:  # Check OpenCV flag
        logging.error(f"{log_prefix} Cannot generate KML overlay: OpenCV (cv2) missing.")
        return None
    # Input data checks
    if sar_image_processed is None or sar_image_processed.size == 0:
        logging.error(f"{log_prefix} Cannot generate KML: Invalid SAR image provided.")
        return None
    if not geo_info_radians or not geo_info_radians.get("valid", False):
        logging.error(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.")
        return None

    png_output_path = None  # Define variable before try block for cleanup
    try:
        kml_dir_path = Path(config.KML_OUTPUT_DIRECTORY)
        # Ensure output directory exists
        try:
            kml_dir_path.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            logging.error(
                f"{log_prefix} Failed to create KML directory '{kml_dir_path}': {e}"
            )
            return None

        # --- 1. Save Processed SAR Image as Temporary PNG ---
        # Timestamp with millisecond resolution (microseconds truncated)
        ts_img = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
        png_filename = f"sar_overlay_{ts_img}.png"
        png_output_path = kml_dir_path / png_filename  # Path defined here
        kml_image_href = png_filename  # Relative path for KML Icon href
        try:
            logging.debug(
                f"{log_prefix} Saving processed SAR image to {png_output_path}..."
            )
            # Ensure uint8 and BGR before saving with OpenCV
            img_to_save = sar_image_processed
            if img_to_save.dtype != np.uint8:
                img_to_save = np.clip(img_to_save, 0, 255).astype(np.uint8)
            if img_to_save.ndim == 2:
                img_to_save = cv2.cvtColor(img_to_save, cv2.COLOR_GRAY2BGR)
            save_success = cv2.imwrite(str(png_output_path), img_to_save)
            if not save_success:
                raise IOError("cv2.imwrite failed to save PNG.")
            logging.debug(f"{log_prefix} SAR overlay PNG saved successfully.")
        except Exception:
            logging.exception(f"{log_prefix} Failed to save SAR overlay PNG:")
            return None  # Cannot proceed without the image

        # --- 2. Calculate Geographic Corners ---
        corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
        if corners_deg is None:
            logging.error(f"{log_prefix} Failed to calculate SAR corners for KML.")
            png_output_path.unlink(missing_ok=True)  # Attempt cleanup
            return None

        # --- 3. Create KML Document ---
        center_lon_deg = math.degrees(geo_info_radians["lon"])
        center_lat_deg = math.degrees(geo_info_radians["lat"])
        kml_doc_name = f"GE_All_{ts_img}"
        kml = simplekml.Kml(name=kml_doc_name)

        # --- 4. Set LookAt View (Center on SAR) ---
        try:
            width_km = (
                geo_info_radians.get("scale_x", 1.0)
                * geo_info_radians.get("width_px", 1000)
            ) / 1000.0
            height_km = (
                geo_info_radians.get("scale_y", 1.0)
                * geo_info_radians.get("height_px", 1000)
            ) / 1000.0
            view_altitude_m = max(width_km, height_km) * 2500.0
            # Keep the range between 1 km and 500 km
            view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0))
        except Exception:
            # Was a bare "except:", which also swallowed KeyboardInterrupt
            # and SystemExit; only ordinary errors should fall back here.
            view_altitude_m = 25000.0
        kml.document.lookat.longitude = center_lon_deg
        kml.document.lookat.latitude = center_lat_deg
        kml.document.lookat.range = view_altitude_m
        kml.document.lookat.tilt = 0
        kml.document.lookat.heading = 0
        kml.document.lookat.altitudemode = simplekml.AltitudeMode.clamptoground

        # --- 5. Add Placemarks (Points) ---
        style_sar_center = simplekml.Style()
        style_sar_center.iconstyle.icon.href = (
            "http://maps.google.com/mapfiles/kml/paddle/red-stars.png"
        )
        style_sar_center.iconstyle.scale = 1.1
        style_mouse_on_sar = simplekml.Style()
        style_mouse_on_sar.iconstyle.icon.href = (
            "http://maps.google.com/mapfiles/kml/paddle/blu-circle.png"
        )
        style_mouse_on_sar.iconstyle.scale = 1.1
        style_mouse_on_map = simplekml.Style()
        style_mouse_on_map.iconstyle.icon.href = (
            "http://maps.google.com/mapfiles/kml/paddle/grn-diamond.png"
        )
        style_mouse_on_map.iconstyle.scale = 1.1
        folder_points = kml.newfolder(name="Points")
        logging.debug(f"{log_prefix} Adding {len(points_data)} placemarks to KML.")
        for lat, lon, name, desc in points_data:
            point = folder_points.newpoint(name=name)
            point.coords = [(lon, lat)]  # KML order is (lon, lat)
            if desc:
                point.description = desc
            # Apply style based on name
            if name == "SAR Center":
                point.style = style_sar_center
            elif name == "Mouse on SAR":
                point.style = style_mouse_on_sar
            elif name == "Mouse on Map":
                point.style = style_mouse_on_map

        # --- 6. Add SAR Footprint Polygon ---
        folder_footprint = kml.newfolder(name="SAR Footprint")
        # The first corner is repeated to close the ring
        outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [
            (corners_deg[0][0], corners_deg[0][1], 0)
        ]
        polygon = folder_footprint.newpolygon(
            name="SAR Outline",
            outerboundaryis=outer_boundary
        )
        polygon.extrude = 0
        polygon.altitudemode = simplekml.AltitudeMode.clamptoground
        polygon.style.linestyle.color = simplekml.Color.red
        polygon.style.linestyle.width = 2
        polygon.style.polystyle.fill = 0  # No fill

        # --- 7. Add SAR Ground Overlay using gx:LatLonQuad (Manual XML) ---
        folder_overlay = kml.newfolder(name="SAR Overlay")
        # Prepare coordinates string: BL, BR, TR, TL
        # (corners_deg is ordered TL, TR, BR, BL)
        bl_lon, bl_lat = corners_deg[3]
        br_lon, br_lat = corners_deg[2]
        tr_lon, tr_lat = corners_deg[1]
        tl_lon, tl_lat = corners_deg[0]
        coord_string = (
            f"{bl_lon:.8f},{bl_lat:.8f},0 "
            f"{br_lon:.8f},{br_lat:.8f},0 "
            f"{tr_lon:.8f},{tr_lat:.8f},0 "
            f"{tl_lon:.8f},{tl_lat:.8f},0"
        )
        kml_added_manually = False
        if _lxml_available:  # Use global flag
            try:
                # NOTE(review): this relies on a private `_elem` attribute of the
                # simplekml folder object; if simplekml does not expose it, the
                # fallback below is always taken. simplekml also appears to offer
                # a native `gxlatlonquad` on GroundOverlay - confirm and consider.
                kml_folder_element = getattr(folder_overlay, '_elem', None)
                if kml_folder_element is None:
                    raise AttributeError("Cannot access internal simplekml element (_elem).")
                GX_NAMESPACE = "http://www.google.com/kml/ext/2.2"
                GX = "{%s}" % GX_NAMESPACE
                go_elem = etree.Element("GroundOverlay")
                etree.SubElement(go_elem, "name").text = "SAR Image"
                icon_elem = etree.SubElement(go_elem, "Icon")
                etree.SubElement(icon_elem, "href").text = kml_image_href
                etree.SubElement(go_elem, "altitudeMode").text = "clampToGround"
                quad_elem = etree.SubElement(go_elem, GX + "LatLonQuad")
                etree.SubElement(quad_elem, "coordinates").text = coord_string
                kml_folder_element.append(go_elem)
                kml_added_manually = True
                logging.debug(f"{log_prefix} Manually added GroundOverlay with gx:LatLonQuad using lxml.")
            except ImportError:  # Should not happen if _lxml_available is True
                logging.error(f"{log_prefix} lxml import failed despite initial check.")
                # Do NOT assign to _lxml_available here (avoid UnboundLocalError)
            except AttributeError as ae:
                logging.error(f"{log_prefix} Error accessing simplekml internal element for manual KML addition: {ae}")
            except Exception as xml_err:
                logging.exception(f"{log_prefix} Error manually constructing KML overlay XML: {xml_err}")

        # --- Fallback if manual addition failed or lxml not available ---
        if not kml_added_manually:
            logging.warning(f"{log_prefix} Failed to add gx:LatLonQuad. Falling back to standard GroundOverlay with LatLonBox/Rotation (may look distorted).")
            ground = folder_overlay.newgroundoverlay(name="SAR Image (Fallback)")
            ground.icon.href = kml_image_href
            # Axis-aligned bounding box of the four corners
            lons = [c[0] for c in corners_deg]
            lats = [c[1] for c in corners_deg]
            north = max(lats)
            south = min(lats)
            east = max(lons)
            west = min(lons)
            orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0))
            ground.latlonbox.north = north
            ground.latlonbox.south = south
            ground.latlonbox.east = east
            ground.latlonbox.west = west
            ground.latlonbox.rotation = -orientation_deg
            ground.altitudemode = simplekml.AltitudeMode.clamptoground

        # --- 8. Save Composite KML File ---
        ts_kml = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
        kml_filename = f"composite_{ts_kml}.kml"
        kml_output_path = kml_dir_path / kml_filename
        logging.debug(f"{log_prefix} Saving Composite KML to: {kml_output_path}")
        kml_output_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure again
        kml.save(str(kml_output_path))
        logging.debug(f"{log_prefix} Composite KML file created successfully.")
        return str(kml_output_path)  # Return the full path
    except Exception:
        logging.exception(f"{log_prefix} Error generating composite KML file:")
        # Attempt cleanup of the PNG if it was created
        if png_output_path is not None and png_output_path.exists():
            try:
                png_output_path.unlink()
                logging.debug(f"{log_prefix} Cleaned up temporary PNG: {png_output_path}")
            except OSError:
                logging.warning(f"{log_prefix} Failed to cleanup temporary PNG: {png_output_path}")
        return None
def launch_google_earth(kml_path: Optional[str]):
    """
    Open a KML file with the viewer available on the host platform.

    Windows goes through the file association (os.startfile), macOS through
    the 'open' command, and every other platform through 'google-earth-pro'
    when it is on PATH, otherwise 'xdg-open'. Exceptions raised while
    launching are logged and not propagated.

    Args:
        kml_path (Optional[str]): Path of the KML file to open. A None, empty
            or non-existent path is logged as an error and ignored.
    """
    tag = "[Utils Launch GE]"

    # Guard clause: nothing to do without an existing file on disk.
    if not (kml_path and os.path.exists(kml_path)):
        logging.error(
            f"{tag} Cannot launch: KML file path invalid or not found: {kml_path}"
        )
        return

    logging.info(f"{tag} Attempting launch for KML: {kml_path}")
    try:
        if sys.platform == "win32":
            # Delegate to the Windows shell file association.
            os.startfile(kml_path)
            return

        if sys.platform == "darwin":
            # macOS: hand the file to the 'open' command.
            subprocess.run(["open", kml_path], check=True)
            return

        # Linux/Unix: prefer Google Earth Pro, otherwise the desktop opener.
        launcher = shutil.which("google-earth-pro")
        if not launcher:
            launcher = shutil.which("xdg-open")
        if not launcher:
            missing_msg = "Neither google-earth-pro nor xdg-open found in PATH."
            logging.error(f"{tag} {missing_msg}")
            raise FileNotFoundError(missing_msg)

        logging.debug(f"{tag} Using command: {launcher}")
        argv = [launcher, kml_path]
        if launcher.endswith("pro"):
            # Google Earth Pro is started without waiting for it to exit.
            subprocess.Popen(argv)
        else:
            # xdg-open is run to completion; a non-zero exit status raises.
            subprocess.run(argv, check=True)
    except Exception:
        logging.exception(f"{tag} Error launching KML handler:")
def cleanup_kml_output_directory(kml_directory: str, max_files_to_keep: int):
    """
    Remove the oldest generated KML/PNG files once a directory exceeds a limit.

    Only files whose names consist of one of the prefixes 'sar_footprint_',
    'composite_', 'multi_point_' or 'sar_overlay_', followed by a
    YYYYMMDD_HHMMSS_fff timestamp and a '.kml' or '.png' extension, are
    counted and eligible for deletion. Any other entry in the directory is
    left untouched. Age is taken from the timestamp embedded in the filename,
    not from filesystem metadata. Files sharing the same timestamp are
    ordered by filename so the selection does not depend on the order in
    which the operating system lists the directory.

    Exceptions raised while scanning or deleting are logged and not
    propagated. A failure to delete one file does not stop the remaining
    deletions.

    Args:
        kml_directory (str): The directory containing KML and PNG files.
        max_files_to_keep (int): The maximum number of files (KML+PNG) to retain.
                                 Cleanup is skipped if <= 0.
    """
    log_prefix = "[Utils Cleanup KMLDir]"

    # A non-positive limit disables cleanup entirely.
    if max_files_to_keep <= 0:
        return

    try:
        kml_dir_path = Path(kml_directory)
        if not kml_dir_path.is_dir():
            logging.warning(
                f"{log_prefix} Output directory not found: '{kml_directory}'. Cannot cleanup."
            )
            return

        # --- 1. Identify Files and Extract Timestamps ---
        # Group 1 captures the timestamp string of supported filename patterns.
        timestamp_pattern = re.compile(
            r"^(?:sar_footprint_|composite_|multi_point_|sar_overlay_)"
            r"(\d{8}_\d{6}_\d{3})\."
            r"(?:kml|png)$"
        )
        logging.debug(f"{log_prefix} Scanning directory: {kml_dir_path}")
        all_files_with_ts: List[Tuple[Path, str]] = []
        for item in kml_dir_path.iterdir():
            if not item.is_file():
                continue
            match = timestamp_pattern.match(item.name)
            if match:
                all_files_with_ts.append((item, match.group(1)))

        # --- 2. Check if Cleanup is Needed ---
        current_total_count = len(all_files_with_ts)
        logging.debug(f"{log_prefix} Found {current_total_count} potentially manageable files.")
        if current_total_count <= max_files_to_keep:
            logging.debug(f"{log_prefix} File count within limit ({max_files_to_keep}). No cleanup needed.")
            return

        # --- 3. Sort Files by Timestamp (Oldest First) ---
        # The fixed-width timestamp sorts chronologically as a plain string.
        # The filename is a secondary key because iterdir() yields entries in
        # arbitrary order, which would otherwise decide ties.
        all_files_with_ts.sort(key=lambda entry: (entry[1], entry[0].name))
        logging.debug(f"{log_prefix} Sorted files by timestamp.")

        # --- 4. Determine Files to Delete ---
        num_to_delete = current_total_count - max_files_to_keep
        files_to_delete = all_files_with_ts[:num_to_delete]  # The oldest ones
        logging.info(
            f"{log_prefix} Found {current_total_count} files. "
            f"Attempting to delete oldest {num_to_delete} (KML and PNG)..."
        )

        # --- 5. Delete Oldest Files ---
        deleted_count = 0
        for file_path, ts_str in files_to_delete:
            try:
                file_path.unlink()
            except OSError as delete_err:
                # Best effort: report the failure and continue with the rest.
                logging.error(
                    f"{log_prefix} Failed to delete file '{file_path.name}': {delete_err}"
                )
            else:
                logging.debug(f"{log_prefix} Deleted old file: {file_path.name} (TS: {ts_str})")
                deleted_count += 1

        logging.info(
            f"{log_prefix} Cleanup finished. Deleted {deleted_count}/{num_to_delete} files."
        )
    except Exception:
        logging.exception(f"{log_prefix} Error during KML/PNG cleanup process:")
# --- END OF FILE utils.py ---