SXXXXXXX_ControlPanel/utils.py
2025-04-15 14:10:43 +02:00

929 lines
36 KiB
Python

# --- START OF FILE utils.py ---
# utils.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED.
Provides utility functions used throughout the application, including
safe queue management (put, clear), coordinate formatting (decimal degrees to DMS
and DMS string to decimal), KML generation and management, and opening web maps.
Uses standardized logging prefixes. Drop counts are managed within AppState.
"""
# Standard library imports
import queue
import logging
import math
import os
import datetime
import sys
import subprocess
import shutil
from pathlib import Path
import webbrowser
import urllib.parse
import re # For DMS parsing
from typing import Optional, Tuple, List, Any, Dict # Added List, Any, Dict
import tempfile
import ctypes # Needed for format_ctypes_structure
# Import KML and GEO libraries, handling ImportError
try:
import simplekml
_simplekml_available = True
except ImportError:
simplekml = None
_simplekml_available = False
logging.warning(
"[Utils KML] Library 'simplekml' not found. KML generation disabled. "
"(pip install simplekml)"
)
try:
import pyproj
_pyproj_available = True
except ImportError:
pyproj = None
_pyproj_available = False
# Pyproj is needed for accurate geo calculations, log warning if missing
logging.warning(
"[Utils Geo] Library 'pyproj' not found. "
"Some geometric calculations (KML corners, BBox) may be less accurate. "
"(pip install pyproj)"
)
# --- Queue Management Functions ---
def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None):
"""
Safely puts an item into a queue using non-blocking put.
Increments specific drop counters in the AppState object if the queue is full.
Discards item if the application is shutting down.
Args:
queue_obj (queue.Queue): The queue instance.
item (Any): The item to put into the queue.
queue_name (str): Name of the queue (for logging/stats).
app_instance (Optional[ControlPanelApp]): Reference to the main app instance
(for shutdown check and drop count).
"""
log_prefix = "[Utils Queue Put]"
# Check shutdown flag via app_instance's state
if (
app_instance
and hasattr(app_instance, "state")
and app_instance.state.shutting_down
):
# Silently discard if shutting down (or log debug -1 if needed)
# logging.log(logging.DEBUG - 1, f"{log_prefix} Discarding item for queue '{queue_name}': Shutdown.")
return
try:
# Put item without blocking
queue_obj.put(item, block=False)
# Log successful put at very low debug level if needed
# logging.log(logging.DEBUG - 1, f"{log_prefix} Item put onto queue '{queue_name}'.")
except queue.Full:
# Log queue full warning (consider reducing level if too noisy)
logging.warning(f"{log_prefix} Queue '{queue_name}' is full. Dropping item.")
# Increment the specific drop counter via the AppState instance
if app_instance and hasattr(app_instance, "state"):
try:
app_instance.state.increment_dropped_count(queue_name)
except Exception as e:
# Log error only if incrementing fails (shouldn't happen)
logging.error(
f"{log_prefix} Failed to increment drop count for '{queue_name}': {e}"
)
else:
# Log error if app instance/state is not available to increment count
logging.error(
f"{log_prefix} Cannot increment drop count for '{queue_name}': "
"App instance or state missing."
)
except Exception as e:
# Log any other unexpected errors during queue put
logging.exception(
f"{log_prefix} Unexpected error putting item onto queue '{queue_name}': {e}"
)
def clear_queue(q: queue.Queue):
"""Removes all items currently in the specified queue."""
log_prefix = "[Utils Queue Clear]"
try:
q_size_before = q.qsize() # Get approximate size before clearing
# Ensure thread safety during clear operation
with q.mutex:
q.queue.clear()
logging.debug(
f"{log_prefix} Cleared queue (approx size before: {q_size_before})."
)
except Exception as e:
logging.exception(f"{log_prefix} Error clearing queue: {e}")
# --- Coordinate Formatting ---
def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
"""
Converts decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string.
Returns "N/A", "Invalid Lat/Lon", or "Error DMS" on error.
Args:
decimal_degrees (float): Coordinate value in decimal degrees.
is_latitude (bool): True if latitude, False if longitude.
Returns:
str: Formatted DMS string or an error indicator.
"""
log_prefix = "[Utils DMS Conv]"
coord_type = "Latitude" if is_latitude else "Longitude"
try:
# Input validation
if (
not isinstance(decimal_degrees, (int, float))
or math.isnan(decimal_degrees)
or math.isinf(decimal_degrees)
):
logging.warning(
f"{log_prefix} Invalid input type or value ({decimal_degrees}) "
f"for {coord_type}."
)
return "N/A"
# Range check and direction determination
if is_latitude:
if not (-90.0 <= decimal_degrees <= 90.0):
logging.warning(
f"{log_prefix} Latitude {decimal_degrees} out of range."
)
return "Invalid Lat"
direction = "N" if decimal_degrees >= 0 else "S"
deg_pad = 2 # Padding for degrees (e.g., 01°, 89°)
else: # Longitude
if not (-180.0 <= decimal_degrees <= 180.0):
logging.warning(
f"{log_prefix} Longitude {decimal_degrees} out of range."
)
return "Invalid Lon"
direction = "E" if decimal_degrees >= 0 else "W"
deg_pad = 3 # Padding for degrees (e.g., 001°, 179°)
# Calculations
dd_abs = abs(decimal_degrees)
degrees = math.floor(dd_abs)
minutes_dec = (dd_abs - degrees) * 60.0
minutes = math.floor(minutes_dec)
seconds = (minutes_dec - minutes) * 60.0
# Handle floating point precision near 60.0 for seconds/minutes
if abs(seconds - 60.0) < 1e-9:
seconds = 0.0
minutes += 1
if minutes == 60:
minutes = 0
degrees += 1
# Recalculate absolute degrees if degrees incremented past range limit
# (e.g., 89° 59' 59.999" N should become 90° 00' 00.00" N)
if is_latitude and degrees > 90:
degrees = 90
minutes = 0
seconds = 0.0
if not is_latitude and degrees > 180:
degrees = 180
minutes = 0
seconds = 0.0
# Format the final string
dms_string = (
f"{degrees:0{deg_pad}d}° {minutes:02d}' {seconds:05.2f}\" {direction}"
)
# logging.debug(f"{log_prefix} Converted {decimal_degrees:.6f} -> {dms_string}")
return dms_string
except Exception as e:
logging.exception(
f"{log_prefix} Error converting {decimal_degrees} to DMS: {e}"
)
return "Error DMS"
def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]:
"""
Converts a DMS string (e.g., "46° 09' 50.24\" N", "009° 23' 14.11\" E")
to decimal degrees.
Args:
dms_str (str): The DMS string to parse.
is_latitude (bool): True if the string represents latitude, False for longitude.
Returns:
Optional[float]: The coordinate in decimal degrees, or None on parsing error.
"""
log_prefix = "[Utils DMS Parse]"
if not dms_str or not isinstance(dms_str, str):
logging.warning(f"{log_prefix} Invalid input string: {dms_str}")
return None
# Regex to capture degrees, minutes, seconds, and direction
# Allows for variable spacing and ° ' " or ” symbols.
pattern = re.compile(
r"^\s*(\d{1,3})\s*[°]\s*" # Degrees (1-3 digits) + ° symbol
r"(\d{1,2})\s*[']\s*" # Minutes (1-2 digits) + ' symbol
r"([\d.]+)\s*[\"”]\s*" # Seconds (digits, dot) + " or ” symbol
r"([NSEWnsew])\s*$", # Direction (N,S,E,W case-insensitive)
re.IGNORECASE,
)
match = pattern.match(dms_str.strip())
if not match:
logging.error(f"{log_prefix} Could not parse DMS string format: '{dms_str}'")
return None
try:
# Extract components as floats
degrees = float(match.group(1))
minutes = float(match.group(2))
seconds = float(match.group(3))
direction = match.group(4).upper()
# Validate component ranges
if not (0 <= minutes < 60 and 0 <= seconds < 60):
logging.error(
f"{log_prefix} Invalid minutes ({minutes}) or seconds ({seconds}) "
f"in DMS: '{dms_str}'"
)
return None
# Validate direction based on coordinate type
valid_dirs = ("N", "S") if is_latitude else ("E", "W")
if direction not in valid_dirs:
coord_type = "latitude" if is_latitude else "longitude"
logging.error(
f"{log_prefix} Invalid direction '{direction}' for {coord_type}: "
f"'{dms_str}'"
)
return None
# Calculate decimal degrees
decimal_degrees = degrees + (minutes / 60.0) + (seconds / 3600.0)
# Apply sign based on direction
if direction in ("S", "W"):
decimal_degrees *= -1.0
# Final range check for the calculated decimal value
if is_latitude and not (-90.0 <= decimal_degrees <= 90.0):
logging.warning(
f"{log_prefix} Calculated latitude {decimal_degrees:.7f} out of range."
)
return None # Treat out of range as error
if not is_latitude and not (-180.0 <= decimal_degrees <= 180.0):
logging.warning(
f"{log_prefix} Calculated longitude {decimal_degrees:.7f} out of range."
)
return None
logging.debug(f"{log_prefix} Parsed '{dms_str}' -> {decimal_degrees:.7f}")
return decimal_degrees
except ValueError as ve:
# Handle errors during float conversion
logging.error(
f"{log_prefix} Error converting components to float: {ve} in '{dms_str}'"
)
return None
except Exception as e:
logging.exception(
f"{log_prefix} Unexpected error parsing DMS string '{dms_str}':"
)
return None
# --- Metadata Formatting Function ---
def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0) -> str:
"""
Generates a formatted string representation of a ctypes Structure,
handling nested structures, arrays, and basic types recursively.
Args:
structure: The ctypes.Structure instance to format.
indent_level: The current indentation level for nested structures.
Returns:
A multi-line string representing the structure's content.
"""
indent = " " * indent_level # Define indentation string
result = ""
# Check if it's actually a structure with _fields_ attribute
if not hasattr(structure, "_fields_"):
# Fallback for non-structures or basic types passed accidentally
return f"{indent}Value: {repr(structure)}\n"
# Iterate through the fields defined in the structure
for field_name, field_type in structure._fields_:
try:
# Get the value of the current field
value = getattr(structure, field_name)
formatted_value = ""
# --- Handle Nested Structures ---
if hasattr(field_type, "_fields_"):
# If the field type itself is another Structure
result += f"{indent}{field_name} ({field_type.__name__}):\n"
# Recursively call formatting for the nested structure
result += format_ctypes_structure(value, indent_level + 1)
# --- Handle Arrays ---
elif issubclass(field_type, ctypes.Array):
array_len = getattr(field_type, "_length_", "N/A")
elem_type = getattr(field_type, "_type_", "N/A")
elem_name = getattr(elem_type, "__name__", "?")
# Special handling for byte arrays (show hex preview)
if issubclass(elem_type, (ctypes.c_byte, ctypes.c_ubyte)):
try:
# Attempt conversion to bytes for hex preview
byte_value = bytes(value[:16]) # Limit preview length
preview = byte_value.hex().upper()
if len(value) > 16:
preview += "..."
formatted_value = f"<{elem_name}[{array_len}] Data: {preview}>"
except: # Fallback if conversion to bytes fails
formatted_value = f"<{elem_name}[{array_len}] (Preview N/A)>"
# Handle simple numeric arrays (show first few elements)
elif isinstance(value, (list, tuple)):
preview = str(list(value[:8])) # Limit preview length
if len(value) > 8:
# Indicate truncation
preview = preview[:-1] + ", ...]"
formatted_value = f"<{elem_name}[{array_len}] Data: {preview}>"
else:
# For other array types, just show type and size
formatted_value = f"<{elem_name}[{array_len}]>"
result += f"{indent}{field_name}: {formatted_value}\n"
# --- Handle Basic Types ---
else:
# --- Special Formatting for Known Fields ---
# GeoData floats (convert radians to degrees for readability)
if field_name in (
"LATITUDE",
"LONGITUDE",
"ORIENTATION",
"POI_LATITUDE",
"POI_LONGITUDE",
"POI_ORIENTATION",
) and isinstance(value, float):
if math.isfinite(value):
deg_value = math.degrees(value)
prec = 2 if "ORIENTATION" in field_name else 6
formatted_value = f"{value:.7f} rad ({deg_value:.{prec}f} deg)"
else:
formatted_value = f"{value} rad (Non-finite)"
# SFP Header special fields (show hex and char if printable)
elif field_name in (
"SFP_MARKER",
"SFP_PT_SPARE",
"SFP_TAG",
"SFP_SRC",
"SFP_TID",
"SFP_FLAGS",
"SFP_WIN",
"SFP_ERR",
"SFP_ERR_INFO",
"SFP_RECTYPE",
"SFP_RECSPARE",
"SFP_PLDAP",
"SFP_PLEXT",
) and isinstance(value, int):
formatted_value = f"0x{value:02X} ({value})"
elif field_name in ("SFP_DIRECTION", "SFP_FLOW") and isinstance(
value, int
):
char_repr = chr(value) if 32 <= value <= 126 else "?"
formatted_value = f"0x{value:02X} ('{char_repr}')"
# Handle byte strings (show decoded + hex preview)
elif isinstance(value, bytes):
try:
decoded = value.decode("ascii", errors="replace")
hex_preview = value.hex().upper()[:32] # Limit hex preview
suffix = "..." if len(value) > 16 else ""
formatted_value = f"'{decoded}' (hex: {hex_preview}{suffix})"
except:
formatted_value = repr(value) # Fallback if not decodable
# Default representation for other types
else:
formatted_value = repr(value)
result += f"{indent}{field_name}: {formatted_value}\n"
except AttributeError:
# Handle cases where getattr might fail (should be rare)
result += f"{indent}{field_name}: <Attribute Error>\n"
except Exception as e:
# Catch any other formatting errors for a specific field
result += f"{indent}{field_name}: <Error Formatting: {e}>\n"
return result
# --- Google Maps Launcher ---
def open_google_maps(latitude_deg: float, longitude_deg: float):
"""Opens the default web browser to Google Maps centered on the specified coordinates."""
log_prefix = "[Utils Gmaps]"
# Validate coordinates
if not (
isinstance(latitude_deg, (int, float))
and math.isfinite(latitude_deg)
and -90 <= latitude_deg <= 90
):
logging.error(f"{log_prefix} Invalid latitude: {latitude_deg}.")
return
if not (
isinstance(longitude_deg, (int, float))
and math.isfinite(longitude_deg)
and -180 <= longitude_deg <= 180
):
logging.error(f"{log_prefix} Invalid longitude: {longitude_deg}.")
return
try:
# Construct URL using the search API for better pinning
query_str = f"{latitude_deg:.7f},{longitude_deg:.7f}"
# URL encode the query string
encoded_query = urllib.parse.quote(query_str)
gmaps_url = f"https://www.google.com/maps/search/?api=1&query={encoded_query}"
logging.info(f"{log_prefix} Opening Google Maps URL: {gmaps_url}")
# Open in a new tab if possible
opened_ok = webbrowser.open_new_tab(gmaps_url)
if not opened_ok:
logging.warning(f"{log_prefix} webbrowser.open_new_tab returned False.")
# Fallback attempt if new tab fails
webbrowser.open(gmaps_url)
except Exception as e:
logging.exception(f"{log_prefix} Failed to open Google Maps URL:")
# --- KML Generation and Management ---
def _calculate_geo_corners_for_kml(
geo_info_radians: Dict[str, Any],
) -> Optional[List[Tuple[float, float]]]:
"""Internal helper to calculate geographic corners (degrees) from geo_info (radians). Requires pyproj."""
# Requires pyproj library
if not _pyproj_available:
logging.error("[Utils KML Calc] pyproj library needed for corner calculation.")
return None
log_prefix = "[Utils KML Calc]"
try:
# Define the geodetic calculator (WGS84 ellipsoid)
geod = pyproj.Geod(ellps="WGS84")
# Extract necessary info from the dictionary
center_lat_rad = geo_info_radians["lat"]
center_lon_rad = geo_info_radians["lon"]
# Orientation: Use the *negative* for forward projection from North
# (Angle typically defines rotation FROM North axis)
orient_rad = geo_info_radians["orientation"]
calc_orient_rad = -orient_rad # Angle used for projection
ref_x = geo_info_radians["ref_x"]
ref_y = geo_info_radians["ref_y"]
scale_x = geo_info_radians["scale_x"]
scale_y = geo_info_radians["scale_y"]
width = geo_info_radians["width_px"]
height = geo_info_radians["height_px"]
# Validate inputs
if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
logging.error(f"{log_prefix} Invalid scale or dimensions in geo_info.")
return None
# Define corner pixel coordinates relative to reference pixel (ref_x, ref_y)
# Top-Left (0,0), Top-Right (w-1, 0), Bottom-Right (w-1, h-1), Bottom-Left (0, h-1)
# Note: Pixel Y increases downwards
corners_pixel_delta = [
(0 - ref_x, ref_y - 0), # Top-Left Delta
(width - 1 - ref_x, ref_y - 0), # Top-Right Delta
(width - 1 - ref_x, ref_y - (height - 1)), # Bottom-Right Delta
(0 - ref_x, ref_y - (height - 1)), # Bottom-Left Delta
]
# Convert pixel deltas to meter deltas (unrotated)
corners_meters = [
(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel_delta
]
# Rotate meter deltas based on orientation if significant
corners_meters_rotated = corners_meters
if abs(calc_orient_rad) > 1e-6: # Only rotate if angle is non-zero
cos_o = math.cos(calc_orient_rad)
sin_o = math.sin(calc_orient_rad)
corners_meters_rotated = [
(dx * cos_o - dy * sin_o, dx * sin_o + dy * cos_o)
for dx, dy in corners_meters
]
# Calculate geographic coordinates for each corner using forward projection
corners_geo_deg = []
center_lon_deg = math.degrees(center_lon_rad)
center_lat_deg = math.degrees(center_lat_rad)
for dx_m, dy_m in corners_meters_rotated:
# Calculate distance and azimuth from center to corner
distance = math.hypot(dx_m, dy_m)
# Azimuth: angle from North clockwise (atan2 gives angle from East CCW)
azimuth = math.degrees(math.atan2(dx_m, dy_m)) % 360.0 # Ensure 0-360 range
# Use pyproj geod.fwd for accurate projection
lon, lat, _ = geod.fwd(center_lon_deg, center_lat_deg, azimuth, distance)
corners_geo_deg.append((lon, lat)) # Store (longitude, latitude)
# Ensure we have exactly 4 corners
if len(corners_geo_deg) == 4:
# logging.debug(f"{log_prefix} Calculated corners (Lon, Lat): {corners_geo_deg}")
return corners_geo_deg
else:
logging.error(f"{log_prefix} Incorrect number of corners calculated.")
return None
except KeyError as ke:
logging.error(f"{log_prefix} Missing required key in geo_info_radians: {ke}")
return None
except Exception as e:
logging.exception(f"{log_prefix} Error calculating geographic corners for KML:")
return None
def generate_sar_kml(geo_info_radians: Dict[str, Any], output_path: str) -> bool:
"""
Generates a KML file representing the SAR footprint area as a polygon.
Args:
geo_info_radians (Dict[str, Any]): Dictionary with SAR georeferencing.
output_path (str): The full path where the KML file should be saved.
Returns:
bool: True if KML generation and saving were successful, False otherwise.
"""
log_prefix = "[Utils KML Gen]"
# Check for required libraries
if not _simplekml_available or not _pyproj_available:
logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.")
return False
# Check for valid input geo_info
if not geo_info_radians or not geo_info_radians.get("valid", False):
logging.warning(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.")
return False
try:
# Calculate geographic corner coordinates
corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
if corners_deg is None:
logging.error(f"{log_prefix} Failed to calculate KML corners.")
return False # Error already logged by helper
# Extract center and orientation for LookAt view
center_lon_deg = math.degrees(geo_info_radians["lon"])
center_lat_deg = math.degrees(geo_info_radians["lat"])
orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0))
# Estimate a reasonable viewing altitude based on image size
try:
width_km = (
geo_info_radians.get("scale_x", 1.0)
* geo_info_radians.get("width_px", 1000)
) / 1000.0
height_km = (
geo_info_radians.get("scale_y", 1.0)
* geo_info_radians.get("height_px", 1000)
) / 1000.0
# Simple heuristic for altitude (e.g., 2x the larger dimension in km * 1000)
view_altitude_m = max(width_km, height_km) * 2000.0
# Clamp altitude to a reasonable range
view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0))
except:
view_altitude_m = 10000.0 # Default altitude on error
# Create KML object
kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}")
# Set LookAt view
kml.document.lookat.longitude = center_lon_deg
kml.document.lookat.latitude = center_lat_deg
kml.document.lookat.range = view_altitude_m
kml.document.lookat.tilt = 45.0 # Default tilt
kml.document.lookat.heading = orientation_deg % 360.0 # Ensure 0-360 heading
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
# Create the polygon footprint
# KML uses (longitude, latitude, altitude) tuples
# Close the polygon by repeating the first point at the end
outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [
(corners_deg[0][0], corners_deg[0][1], 0)
]
polygon = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary)
# Style the polygon (red outline, semi-transparent red fill)
polygon.style.linestyle.color = simplekml.Color.red
polygon.style.linestyle.width = 2
polygon.style.polystyle.color = simplekml.Color.changealphaint(
100, simplekml.Color.red # ~40% opacity red fill
)
polygon.style.polystyle.outline = 1 # Ensure outline is drawn
# Save the KML file
kml.save(output_path)
logging.debug(f"{log_prefix} KML file saved successfully: {output_path}")
return True
except Exception as e:
logging.exception(f"{log_prefix} Error generating or saving KML file:")
return False
def generate_lookat_and_point_kml(
latitude_deg: float,
longitude_deg: float,
altitude_m: float = 10000.0,
tilt: float = 45.0,
heading: float = 0.0,
placemark_name: str = "Selected Location",
placemark_desc: Optional[str] = None,
) -> Optional[str]:
"""
Generates a temporary KML file containing a LookAt element and a Placemark point.
Args:
latitude_deg (float): Target latitude in decimal degrees.
longitude_deg (float): Target longitude in decimal degrees.
altitude_m (float): Viewing altitude in meters for LookAt.
tilt (float): Camera tilt angle for LookAt.
heading (float): Camera heading/azimuth for LookAt.
placemark_name (str): Name for the placemark.
placemark_desc (Optional[str]): Optional description for the placemark.
Returns:
Optional[str]: Path to the temporary KML file, or None on error.
"""
log_prefix = "[Utils KML LookAtPoint]"
if not _simplekml_available:
logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.")
return None
# Validate coordinates
if not (
isinstance(latitude_deg, (int, float))
and math.isfinite(latitude_deg)
and -90 <= latitude_deg <= 90
):
logging.error(f"{log_prefix} Invalid latitude provided: {latitude_deg}.")
return None
if not (
isinstance(longitude_deg, (int, float))
and math.isfinite(longitude_deg)
and -180 <= longitude_deg <= 180
):
logging.error(f"{log_prefix} Invalid longitude provided: {longitude_deg}.")
return None
try:
kml = simplekml.Kml(name=placemark_name) # Use name for KML document too
# Set LookAt parameters
kml.document.lookat.longitude = longitude_deg
kml.document.lookat.latitude = latitude_deg
kml.document.lookat.range = max(100.0, altitude_m) # Min range 100m
kml.document.lookat.tilt = tilt
kml.document.lookat.heading = heading % 360.0
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
# Add the Placemark point
point = kml.newpoint(name=placemark_name)
# simplekml uses (lon, lat, optional_alt) order for coordinates
point.coords = [(longitude_deg, latitude_deg)]
if placemark_desc:
point.description = placemark_desc
# Create a temporary file (delete=False so GE can open it)
# Consider using a subfolder in the main app temp/cache dir?
with tempfile.NamedTemporaryFile(
mode="w", suffix=".kml", delete=False, encoding="utf-8"
) as temp_kml:
temp_kml_path = temp_kml.name
logging.debug(
f"{log_prefix} Saving temporary LookAt+Point KML to: {temp_kml_path}"
)
temp_kml.write(kml.kml()) # Write KML content
logging.debug(f"{log_prefix} LookAt+Point KML file created successfully.")
return temp_kml_path
except Exception as e:
logging.exception(f"{log_prefix} Error generating LookAt+Point KML file:")
return None
def generate_multi_point_kml(
points_data: List[Tuple[float, float, str, Optional[str]]],
) -> Optional[str]:
"""
Generates a temporary KML file containing multiple Placemarks (points)
and a LookAt view potentially encompassing them.
Args:
points_data (List[Tuple[float, float, str, Optional[str]]]):
List of (latitude_deg, longitude_deg, placemark_name, optional_desc).
Returns:
Optional[str]: Path to the temporary KML file, or None on error.
"""
log_prefix = "[Utils KML MultiPoint]"
if not _simplekml_available:
logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.")
return None
if not points_data:
logging.warning(f"{log_prefix} No points provided to generate KML.")
return None
try:
kml = simplekml.Kml(name="Multiple Locations")
# --- Determine LookAt View (Look at first point) ---
first_lat, first_lon, _, _ = points_data[0]
kml.document.lookat.longitude = first_lon
kml.document.lookat.latitude = first_lat
kml.document.lookat.range = 20000 # Default range for multiple points
kml.document.lookat.tilt = 30
kml.document.lookat.heading = 0
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
# --- Define styles ---
style_sar_center = simplekml.Style()
style_sar_center.iconstyle.icon.href = (
"http://maps.google.com/mapfiles/kml/paddle/red-stars.png"
)
style_sar_center.iconstyle.scale = 1.1
style_mouse_on_sar = simplekml.Style()
style_mouse_on_sar.iconstyle.icon.href = (
"http://maps.google.com/mapfiles/kml/paddle/blu-circle.png"
)
style_mouse_on_sar.iconstyle.scale = 1.1
style_mouse_on_map = simplekml.Style()
style_mouse_on_map.iconstyle.icon.href = (
"http://maps.google.com/mapfiles/kml/paddle/grn-diamond.png"
)
style_mouse_on_map.iconstyle.scale = 1.1
# --- Add Placemarks ---
logging.debug(f"{log_prefix} Adding {len(points_data)} placemarks to KML.")
for lat, lon, name, desc in points_data:
point = kml.newpoint(name=name)
point.coords = [(lon, lat)] # (lon, lat) order
if desc:
point.description = desc
# Apply style based on name
if name == "SAR Center":
point.style = style_sar_center
elif name == "Mouse on SAR":
point.style = style_mouse_on_sar
elif name == "Mouse on Map":
point.style = style_mouse_on_map
# else: default style
# --- Save to Temporary File ---
with tempfile.NamedTemporaryFile(
mode="w", suffix=".kml", delete=False, encoding="utf-8"
) as temp_kml:
temp_kml_path = temp_kml.name
logging.debug(
f"{log_prefix} Saving temporary Multi-Point KML to: {temp_kml_path}"
)
temp_kml.write(kml.kml())
logging.debug(f"{log_prefix} Multi-Point KML file created successfully.")
return temp_kml_path
except Exception as e:
logging.exception(f"{log_prefix} Error generating Multi-Point KML file:")
return None
def launch_google_earth(kml_path: Optional[str]):
"""
Attempts to open a KML file using the system's default application
or google-earth-pro directly.
Args:
kml_path (Optional[str]): The path to the KML file to launch.
"""
log_prefix = "[Utils Launch GE]"
if not kml_path or not os.path.exists(kml_path):
logging.error(
f"{log_prefix} Cannot launch: KML file path invalid or not found: {kml_path}"
)
return
logging.info(f"{log_prefix} Attempting launch for KML: {kml_path}")
try:
if sys.platform == "win32":
# Use os.startfile on Windows
os.startfile(kml_path)
elif sys.platform == "darwin":
# Use 'open' command on macOS
subprocess.run(["open", kml_path], check=True)
else:
# Use 'google-earth-pro' or 'xdg-open' on Linux/Unix
cmd = shutil.which("google-earth-pro") or shutil.which("xdg-open")
if cmd:
logging.debug(f"{log_prefix} Using command: {cmd}")
if cmd.endswith("pro"):
# Popen for GE Pro to run in background potentially
subprocess.Popen([cmd, kml_path])
else:
# run for xdg-open, wait for it?
subprocess.run([cmd, kml_path], check=True)
else:
# Log error if no suitable command found
err_msg = "Neither google-earth-pro nor xdg-open found in PATH."
logging.error(f"{log_prefix} {err_msg}")
raise FileNotFoundError(err_msg)
except Exception as e:
logging.exception(f"{log_prefix} Error launching KML handler:")
def cleanup_old_kml_files(kml_directory: str, max_files_to_keep: int):
"""Removes the oldest KML files from the directory if count exceeds limit."""
log_prefix = "[Utils KML Cleanup]"
# Skip if cleanup is disabled or directory invalid
if max_files_to_keep <= 0:
return
try:
kml_dir_path = Path(kml_directory)
if not kml_dir_path.is_dir():
logging.warning(
f"{log_prefix} KML directory not found: '{kml_directory}'. Cannot cleanup."
)
return
# Get list of KML files with modification times
kml_files = []
for item in kml_dir_path.glob("*.kml"):
if item.is_file():
try:
# Get modification time
mtime = item.stat().st_mtime
kml_files.append((item, mtime))
except Exception as stat_e:
# Log error getting stats but continue with others
logging.warning(
f"{log_prefix} Could not stat file '{item.name}': {stat_e}"
)
current_count = len(kml_files)
# Only proceed if count exceeds the limit
if current_count <= max_files_to_keep:
return
# Sort files by modification time (oldest first)
kml_files.sort(key=lambda x: x[1])
# Determine number of files to delete
num_to_delete = current_count - max_files_to_keep
# Get the list of files to delete
files_to_delete = kml_files[:num_to_delete]
logging.info(
f"{log_prefix} Found {current_count} KML files. "
f"Attempting to delete oldest {num_to_delete}..."
)
deleted_count = 0
# Iterate and delete oldest files
for file_path, _ in files_to_delete:
try:
file_path.unlink() # Delete the file
logging.debug(f"{log_prefix} Deleted old KML: {file_path.name}")
deleted_count += 1
except Exception as delete_e:
# Log error deleting specific file but continue
logging.error(
f"{log_prefix} Failed to delete file '{file_path.name}': {delete_e}"
)
logging.info(
f"{log_prefix} Cleanup finished. Deleted {deleted_count}/{num_to_delete} files."
)
except Exception as e:
logging.exception(f"{log_prefix} Error during KML cleanup process:")
# --- END OF FILE utils.py ---