929 lines
36 KiB
Python
929 lines
36 KiB
Python
# --- START OF FILE utils.py ---
|
|
|
|
# utils.py
|
|
"""
|
|
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
|
|
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
|
PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
|
|
Provides utility functions used throughout the application, including
|
|
safe queue management (put, clear), coordinate formatting (decimal degrees to DMS
|
|
and DMS string to decimal), KML generation and management, and opening web maps.
|
|
Uses standardized logging prefixes. Drop counts are managed within AppState.
|
|
"""
|
|
|
|
# Standard library imports
|
|
import queue
|
|
import logging
|
|
import math
|
|
import os
|
|
import datetime
|
|
import sys
|
|
import subprocess
|
|
import shutil
|
|
from pathlib import Path
|
|
import webbrowser
|
|
import urllib.parse
|
|
import re # For DMS parsing
|
|
from typing import Optional, Tuple, List, Any, Dict # Added List, Any, Dict
|
|
import tempfile
|
|
import ctypes # Needed for format_ctypes_structure
|
|
|
|
|
|
# Import KML and GEO libraries, handling ImportError
|
|
try:
|
|
import simplekml
|
|
|
|
_simplekml_available = True
|
|
except ImportError:
|
|
simplekml = None
|
|
_simplekml_available = False
|
|
logging.warning(
|
|
"[Utils KML] Library 'simplekml' not found. KML generation disabled. "
|
|
"(pip install simplekml)"
|
|
)
|
|
try:
|
|
import pyproj
|
|
|
|
_pyproj_available = True
|
|
except ImportError:
|
|
pyproj = None
|
|
_pyproj_available = False
|
|
# Pyproj is needed for accurate geo calculations, log warning if missing
|
|
logging.warning(
|
|
"[Utils Geo] Library 'pyproj' not found. "
|
|
"Some geometric calculations (KML corners, BBox) may be less accurate. "
|
|
"(pip install pyproj)"
|
|
)
|
|
|
|
|
|
# --- Queue Management Functions ---
|
|
def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None):
|
|
"""
|
|
Safely puts an item into a queue using non-blocking put.
|
|
Increments specific drop counters in the AppState object if the queue is full.
|
|
Discards item if the application is shutting down.
|
|
|
|
Args:
|
|
queue_obj (queue.Queue): The queue instance.
|
|
item (Any): The item to put into the queue.
|
|
queue_name (str): Name of the queue (for logging/stats).
|
|
app_instance (Optional[ControlPanelApp]): Reference to the main app instance
|
|
(for shutdown check and drop count).
|
|
"""
|
|
log_prefix = "[Utils Queue Put]"
|
|
# Check shutdown flag via app_instance's state
|
|
if (
|
|
app_instance
|
|
and hasattr(app_instance, "state")
|
|
and app_instance.state.shutting_down
|
|
):
|
|
# Silently discard if shutting down (or log debug -1 if needed)
|
|
# logging.log(logging.DEBUG - 1, f"{log_prefix} Discarding item for queue '{queue_name}': Shutdown.")
|
|
return
|
|
try:
|
|
# Put item without blocking
|
|
queue_obj.put(item, block=False)
|
|
# Log successful put at very low debug level if needed
|
|
# logging.log(logging.DEBUG - 1, f"{log_prefix} Item put onto queue '{queue_name}'.")
|
|
except queue.Full:
|
|
# Log queue full warning (consider reducing level if too noisy)
|
|
logging.warning(f"{log_prefix} Queue '{queue_name}' is full. Dropping item.")
|
|
# Increment the specific drop counter via the AppState instance
|
|
if app_instance and hasattr(app_instance, "state"):
|
|
try:
|
|
app_instance.state.increment_dropped_count(queue_name)
|
|
except Exception as e:
|
|
# Log error only if incrementing fails (shouldn't happen)
|
|
logging.error(
|
|
f"{log_prefix} Failed to increment drop count for '{queue_name}': {e}"
|
|
)
|
|
else:
|
|
# Log error if app instance/state is not available to increment count
|
|
logging.error(
|
|
f"{log_prefix} Cannot increment drop count for '{queue_name}': "
|
|
"App instance or state missing."
|
|
)
|
|
except Exception as e:
|
|
# Log any other unexpected errors during queue put
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error putting item onto queue '{queue_name}': {e}"
|
|
)
|
|
|
|
|
|
def clear_queue(q: queue.Queue):
|
|
"""Removes all items currently in the specified queue."""
|
|
log_prefix = "[Utils Queue Clear]"
|
|
try:
|
|
q_size_before = q.qsize() # Get approximate size before clearing
|
|
# Ensure thread safety during clear operation
|
|
with q.mutex:
|
|
q.queue.clear()
|
|
logging.debug(
|
|
f"{log_prefix} Cleared queue (approx size before: {q_size_before})."
|
|
)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error clearing queue: {e}")
|
|
|
|
|
|
# --- Coordinate Formatting ---
|
|
def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
|
|
"""
|
|
Converts decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string.
|
|
Returns "N/A", "Invalid Lat/Lon", or "Error DMS" on error.
|
|
|
|
Args:
|
|
decimal_degrees (float): Coordinate value in decimal degrees.
|
|
is_latitude (bool): True if latitude, False if longitude.
|
|
|
|
Returns:
|
|
str: Formatted DMS string or an error indicator.
|
|
"""
|
|
log_prefix = "[Utils DMS Conv]"
|
|
coord_type = "Latitude" if is_latitude else "Longitude"
|
|
|
|
try:
|
|
# Input validation
|
|
if (
|
|
not isinstance(decimal_degrees, (int, float))
|
|
or math.isnan(decimal_degrees)
|
|
or math.isinf(decimal_degrees)
|
|
):
|
|
logging.warning(
|
|
f"{log_prefix} Invalid input type or value ({decimal_degrees}) "
|
|
f"for {coord_type}."
|
|
)
|
|
return "N/A"
|
|
|
|
# Range check and direction determination
|
|
if is_latitude:
|
|
if not (-90.0 <= decimal_degrees <= 90.0):
|
|
logging.warning(
|
|
f"{log_prefix} Latitude {decimal_degrees} out of range."
|
|
)
|
|
return "Invalid Lat"
|
|
direction = "N" if decimal_degrees >= 0 else "S"
|
|
deg_pad = 2 # Padding for degrees (e.g., 01°, 89°)
|
|
else: # Longitude
|
|
if not (-180.0 <= decimal_degrees <= 180.0):
|
|
logging.warning(
|
|
f"{log_prefix} Longitude {decimal_degrees} out of range."
|
|
)
|
|
return "Invalid Lon"
|
|
direction = "E" if decimal_degrees >= 0 else "W"
|
|
deg_pad = 3 # Padding for degrees (e.g., 001°, 179°)
|
|
|
|
# Calculations
|
|
dd_abs = abs(decimal_degrees)
|
|
degrees = math.floor(dd_abs)
|
|
minutes_dec = (dd_abs - degrees) * 60.0
|
|
minutes = math.floor(minutes_dec)
|
|
seconds = (minutes_dec - minutes) * 60.0
|
|
|
|
# Handle floating point precision near 60.0 for seconds/minutes
|
|
if abs(seconds - 60.0) < 1e-9:
|
|
seconds = 0.0
|
|
minutes += 1
|
|
if minutes == 60:
|
|
minutes = 0
|
|
degrees += 1
|
|
# Recalculate absolute degrees if degrees incremented past range limit
|
|
# (e.g., 89° 59' 59.999" N should become 90° 00' 00.00" N)
|
|
if is_latitude and degrees > 90:
|
|
degrees = 90
|
|
minutes = 0
|
|
seconds = 0.0
|
|
if not is_latitude and degrees > 180:
|
|
degrees = 180
|
|
minutes = 0
|
|
seconds = 0.0
|
|
|
|
# Format the final string
|
|
dms_string = (
|
|
f"{degrees:0{deg_pad}d}° {minutes:02d}' {seconds:05.2f}\" {direction}"
|
|
)
|
|
# logging.debug(f"{log_prefix} Converted {decimal_degrees:.6f} -> {dms_string}")
|
|
return dms_string
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error converting {decimal_degrees} to DMS: {e}"
|
|
)
|
|
return "Error DMS"
|
|
|
|
|
|
def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]:
|
|
"""
|
|
Converts a DMS string (e.g., "46° 09' 50.24\" N", "009° 23' 14.11\" E")
|
|
to decimal degrees.
|
|
|
|
Args:
|
|
dms_str (str): The DMS string to parse.
|
|
is_latitude (bool): True if the string represents latitude, False for longitude.
|
|
|
|
Returns:
|
|
Optional[float]: The coordinate in decimal degrees, or None on parsing error.
|
|
"""
|
|
log_prefix = "[Utils DMS Parse]"
|
|
if not dms_str or not isinstance(dms_str, str):
|
|
logging.warning(f"{log_prefix} Invalid input string: {dms_str}")
|
|
return None
|
|
|
|
# Regex to capture degrees, minutes, seconds, and direction
|
|
# Allows for variable spacing and ° ' " or ” symbols.
|
|
pattern = re.compile(
|
|
r"^\s*(\d{1,3})\s*[°]\s*" # Degrees (1-3 digits) + ° symbol
|
|
r"(\d{1,2})\s*[']\s*" # Minutes (1-2 digits) + ' symbol
|
|
r"([\d.]+)\s*[\"”]\s*" # Seconds (digits, dot) + " or ” symbol
|
|
r"([NSEWnsew])\s*$", # Direction (N,S,E,W case-insensitive)
|
|
re.IGNORECASE,
|
|
)
|
|
match = pattern.match(dms_str.strip())
|
|
|
|
if not match:
|
|
logging.error(f"{log_prefix} Could not parse DMS string format: '{dms_str}'")
|
|
return None
|
|
|
|
try:
|
|
# Extract components as floats
|
|
degrees = float(match.group(1))
|
|
minutes = float(match.group(2))
|
|
seconds = float(match.group(3))
|
|
direction = match.group(4).upper()
|
|
|
|
# Validate component ranges
|
|
if not (0 <= minutes < 60 and 0 <= seconds < 60):
|
|
logging.error(
|
|
f"{log_prefix} Invalid minutes ({minutes}) or seconds ({seconds}) "
|
|
f"in DMS: '{dms_str}'"
|
|
)
|
|
return None
|
|
|
|
# Validate direction based on coordinate type
|
|
valid_dirs = ("N", "S") if is_latitude else ("E", "W")
|
|
if direction not in valid_dirs:
|
|
coord_type = "latitude" if is_latitude else "longitude"
|
|
logging.error(
|
|
f"{log_prefix} Invalid direction '{direction}' for {coord_type}: "
|
|
f"'{dms_str}'"
|
|
)
|
|
return None
|
|
|
|
# Calculate decimal degrees
|
|
decimal_degrees = degrees + (minutes / 60.0) + (seconds / 3600.0)
|
|
|
|
# Apply sign based on direction
|
|
if direction in ("S", "W"):
|
|
decimal_degrees *= -1.0
|
|
|
|
# Final range check for the calculated decimal value
|
|
if is_latitude and not (-90.0 <= decimal_degrees <= 90.0):
|
|
logging.warning(
|
|
f"{log_prefix} Calculated latitude {decimal_degrees:.7f} out of range."
|
|
)
|
|
return None # Treat out of range as error
|
|
if not is_latitude and not (-180.0 <= decimal_degrees <= 180.0):
|
|
logging.warning(
|
|
f"{log_prefix} Calculated longitude {decimal_degrees:.7f} out of range."
|
|
)
|
|
return None
|
|
|
|
logging.debug(f"{log_prefix} Parsed '{dms_str}' -> {decimal_degrees:.7f}")
|
|
return decimal_degrees
|
|
|
|
except ValueError as ve:
|
|
# Handle errors during float conversion
|
|
logging.error(
|
|
f"{log_prefix} Error converting components to float: {ve} in '{dms_str}'"
|
|
)
|
|
return None
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error parsing DMS string '{dms_str}':"
|
|
)
|
|
return None
|
|
|
|
|
|
# --- Metadata Formatting Function ---
|
|
def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0) -> str:
|
|
"""
|
|
Generates a formatted string representation of a ctypes Structure,
|
|
handling nested structures, arrays, and basic types recursively.
|
|
|
|
Args:
|
|
structure: The ctypes.Structure instance to format.
|
|
indent_level: The current indentation level for nested structures.
|
|
|
|
Returns:
|
|
A multi-line string representing the structure's content.
|
|
"""
|
|
indent = " " * indent_level # Define indentation string
|
|
result = ""
|
|
|
|
# Check if it's actually a structure with _fields_ attribute
|
|
if not hasattr(structure, "_fields_"):
|
|
# Fallback for non-structures or basic types passed accidentally
|
|
return f"{indent}Value: {repr(structure)}\n"
|
|
|
|
# Iterate through the fields defined in the structure
|
|
for field_name, field_type in structure._fields_:
|
|
try:
|
|
# Get the value of the current field
|
|
value = getattr(structure, field_name)
|
|
formatted_value = ""
|
|
|
|
# --- Handle Nested Structures ---
|
|
if hasattr(field_type, "_fields_"):
|
|
# If the field type itself is another Structure
|
|
result += f"{indent}{field_name} ({field_type.__name__}):\n"
|
|
# Recursively call formatting for the nested structure
|
|
result += format_ctypes_structure(value, indent_level + 1)
|
|
|
|
# --- Handle Arrays ---
|
|
elif issubclass(field_type, ctypes.Array):
|
|
array_len = getattr(field_type, "_length_", "N/A")
|
|
elem_type = getattr(field_type, "_type_", "N/A")
|
|
elem_name = getattr(elem_type, "__name__", "?")
|
|
|
|
# Special handling for byte arrays (show hex preview)
|
|
if issubclass(elem_type, (ctypes.c_byte, ctypes.c_ubyte)):
|
|
try:
|
|
# Attempt conversion to bytes for hex preview
|
|
byte_value = bytes(value[:16]) # Limit preview length
|
|
preview = byte_value.hex().upper()
|
|
if len(value) > 16:
|
|
preview += "..."
|
|
formatted_value = f"<{elem_name}[{array_len}] Data: {preview}>"
|
|
except: # Fallback if conversion to bytes fails
|
|
formatted_value = f"<{elem_name}[{array_len}] (Preview N/A)>"
|
|
|
|
# Handle simple numeric arrays (show first few elements)
|
|
elif isinstance(value, (list, tuple)):
|
|
preview = str(list(value[:8])) # Limit preview length
|
|
if len(value) > 8:
|
|
# Indicate truncation
|
|
preview = preview[:-1] + ", ...]"
|
|
formatted_value = f"<{elem_name}[{array_len}] Data: {preview}>"
|
|
else:
|
|
# For other array types, just show type and size
|
|
formatted_value = f"<{elem_name}[{array_len}]>"
|
|
|
|
result += f"{indent}{field_name}: {formatted_value}\n"
|
|
|
|
# --- Handle Basic Types ---
|
|
else:
|
|
# --- Special Formatting for Known Fields ---
|
|
# GeoData floats (convert radians to degrees for readability)
|
|
if field_name in (
|
|
"LATITUDE",
|
|
"LONGITUDE",
|
|
"ORIENTATION",
|
|
"POI_LATITUDE",
|
|
"POI_LONGITUDE",
|
|
"POI_ORIENTATION",
|
|
) and isinstance(value, float):
|
|
if math.isfinite(value):
|
|
deg_value = math.degrees(value)
|
|
prec = 2 if "ORIENTATION" in field_name else 6
|
|
formatted_value = f"{value:.7f} rad ({deg_value:.{prec}f} deg)"
|
|
else:
|
|
formatted_value = f"{value} rad (Non-finite)"
|
|
# SFP Header special fields (show hex and char if printable)
|
|
elif field_name in (
|
|
"SFP_MARKER",
|
|
"SFP_PT_SPARE",
|
|
"SFP_TAG",
|
|
"SFP_SRC",
|
|
"SFP_TID",
|
|
"SFP_FLAGS",
|
|
"SFP_WIN",
|
|
"SFP_ERR",
|
|
"SFP_ERR_INFO",
|
|
"SFP_RECTYPE",
|
|
"SFP_RECSPARE",
|
|
"SFP_PLDAP",
|
|
"SFP_PLEXT",
|
|
) and isinstance(value, int):
|
|
formatted_value = f"0x{value:02X} ({value})"
|
|
elif field_name in ("SFP_DIRECTION", "SFP_FLOW") and isinstance(
|
|
value, int
|
|
):
|
|
char_repr = chr(value) if 32 <= value <= 126 else "?"
|
|
formatted_value = f"0x{value:02X} ('{char_repr}')"
|
|
# Handle byte strings (show decoded + hex preview)
|
|
elif isinstance(value, bytes):
|
|
try:
|
|
decoded = value.decode("ascii", errors="replace")
|
|
hex_preview = value.hex().upper()[:32] # Limit hex preview
|
|
suffix = "..." if len(value) > 16 else ""
|
|
formatted_value = f"'{decoded}' (hex: {hex_preview}{suffix})"
|
|
except:
|
|
formatted_value = repr(value) # Fallback if not decodable
|
|
# Default representation for other types
|
|
else:
|
|
formatted_value = repr(value)
|
|
|
|
result += f"{indent}{field_name}: {formatted_value}\n"
|
|
|
|
except AttributeError:
|
|
# Handle cases where getattr might fail (should be rare)
|
|
result += f"{indent}{field_name}: <Attribute Error>\n"
|
|
except Exception as e:
|
|
# Catch any other formatting errors for a specific field
|
|
result += f"{indent}{field_name}: <Error Formatting: {e}>\n"
|
|
|
|
return result
|
|
|
|
|
|
# --- Google Maps Launcher ---
|
|
def open_google_maps(latitude_deg: float, longitude_deg: float):
|
|
"""Opens the default web browser to Google Maps centered on the specified coordinates."""
|
|
log_prefix = "[Utils Gmaps]"
|
|
# Validate coordinates
|
|
if not (
|
|
isinstance(latitude_deg, (int, float))
|
|
and math.isfinite(latitude_deg)
|
|
and -90 <= latitude_deg <= 90
|
|
):
|
|
logging.error(f"{log_prefix} Invalid latitude: {latitude_deg}.")
|
|
return
|
|
if not (
|
|
isinstance(longitude_deg, (int, float))
|
|
and math.isfinite(longitude_deg)
|
|
and -180 <= longitude_deg <= 180
|
|
):
|
|
logging.error(f"{log_prefix} Invalid longitude: {longitude_deg}.")
|
|
return
|
|
try:
|
|
# Construct URL using the search API for better pinning
|
|
query_str = f"{latitude_deg:.7f},{longitude_deg:.7f}"
|
|
# URL encode the query string
|
|
encoded_query = urllib.parse.quote(query_str)
|
|
gmaps_url = f"https://www.google.com/maps/search/?api=1&query={encoded_query}"
|
|
|
|
logging.info(f"{log_prefix} Opening Google Maps URL: {gmaps_url}")
|
|
# Open in a new tab if possible
|
|
opened_ok = webbrowser.open_new_tab(gmaps_url)
|
|
if not opened_ok:
|
|
logging.warning(f"{log_prefix} webbrowser.open_new_tab returned False.")
|
|
# Fallback attempt if new tab fails
|
|
webbrowser.open(gmaps_url)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Failed to open Google Maps URL:")
|
|
|
|
|
|
# --- KML Generation and Management ---
|
|
def _calculate_geo_corners_for_kml(
|
|
geo_info_radians: Dict[str, Any],
|
|
) -> Optional[List[Tuple[float, float]]]:
|
|
"""Internal helper to calculate geographic corners (degrees) from geo_info (radians). Requires pyproj."""
|
|
# Requires pyproj library
|
|
if not _pyproj_available:
|
|
logging.error("[Utils KML Calc] pyproj library needed for corner calculation.")
|
|
return None
|
|
|
|
log_prefix = "[Utils KML Calc]"
|
|
try:
|
|
# Define the geodetic calculator (WGS84 ellipsoid)
|
|
geod = pyproj.Geod(ellps="WGS84")
|
|
|
|
# Extract necessary info from the dictionary
|
|
center_lat_rad = geo_info_radians["lat"]
|
|
center_lon_rad = geo_info_radians["lon"]
|
|
# Orientation: Use the *negative* for forward projection from North
|
|
# (Angle typically defines rotation FROM North axis)
|
|
orient_rad = geo_info_radians["orientation"]
|
|
calc_orient_rad = -orient_rad # Angle used for projection
|
|
|
|
ref_x = geo_info_radians["ref_x"]
|
|
ref_y = geo_info_radians["ref_y"]
|
|
scale_x = geo_info_radians["scale_x"]
|
|
scale_y = geo_info_radians["scale_y"]
|
|
width = geo_info_radians["width_px"]
|
|
height = geo_info_radians["height_px"]
|
|
|
|
# Validate inputs
|
|
if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
|
|
logging.error(f"{log_prefix} Invalid scale or dimensions in geo_info.")
|
|
return None
|
|
|
|
# Define corner pixel coordinates relative to reference pixel (ref_x, ref_y)
|
|
# Top-Left (0,0), Top-Right (w-1, 0), Bottom-Right (w-1, h-1), Bottom-Left (0, h-1)
|
|
# Note: Pixel Y increases downwards
|
|
corners_pixel_delta = [
|
|
(0 - ref_x, ref_y - 0), # Top-Left Delta
|
|
(width - 1 - ref_x, ref_y - 0), # Top-Right Delta
|
|
(width - 1 - ref_x, ref_y - (height - 1)), # Bottom-Right Delta
|
|
(0 - ref_x, ref_y - (height - 1)), # Bottom-Left Delta
|
|
]
|
|
|
|
# Convert pixel deltas to meter deltas (unrotated)
|
|
corners_meters = [
|
|
(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel_delta
|
|
]
|
|
|
|
# Rotate meter deltas based on orientation if significant
|
|
corners_meters_rotated = corners_meters
|
|
if abs(calc_orient_rad) > 1e-6: # Only rotate if angle is non-zero
|
|
cos_o = math.cos(calc_orient_rad)
|
|
sin_o = math.sin(calc_orient_rad)
|
|
corners_meters_rotated = [
|
|
(dx * cos_o - dy * sin_o, dx * sin_o + dy * cos_o)
|
|
for dx, dy in corners_meters
|
|
]
|
|
|
|
# Calculate geographic coordinates for each corner using forward projection
|
|
corners_geo_deg = []
|
|
center_lon_deg = math.degrees(center_lon_rad)
|
|
center_lat_deg = math.degrees(center_lat_rad)
|
|
|
|
for dx_m, dy_m in corners_meters_rotated:
|
|
# Calculate distance and azimuth from center to corner
|
|
distance = math.hypot(dx_m, dy_m)
|
|
# Azimuth: angle from North clockwise (atan2 gives angle from East CCW)
|
|
azimuth = math.degrees(math.atan2(dx_m, dy_m)) % 360.0 # Ensure 0-360 range
|
|
|
|
# Use pyproj geod.fwd for accurate projection
|
|
lon, lat, _ = geod.fwd(center_lon_deg, center_lat_deg, azimuth, distance)
|
|
corners_geo_deg.append((lon, lat)) # Store (longitude, latitude)
|
|
|
|
# Ensure we have exactly 4 corners
|
|
if len(corners_geo_deg) == 4:
|
|
# logging.debug(f"{log_prefix} Calculated corners (Lon, Lat): {corners_geo_deg}")
|
|
return corners_geo_deg
|
|
else:
|
|
logging.error(f"{log_prefix} Incorrect number of corners calculated.")
|
|
return None
|
|
|
|
except KeyError as ke:
|
|
logging.error(f"{log_prefix} Missing required key in geo_info_radians: {ke}")
|
|
return None
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error calculating geographic corners for KML:")
|
|
return None
|
|
|
|
|
|
def generate_sar_kml(geo_info_radians: Dict[str, Any], output_path: str) -> bool:
|
|
"""
|
|
Generates a KML file representing the SAR footprint area as a polygon.
|
|
|
|
Args:
|
|
geo_info_radians (Dict[str, Any]): Dictionary with SAR georeferencing.
|
|
output_path (str): The full path where the KML file should be saved.
|
|
|
|
Returns:
|
|
bool: True if KML generation and saving were successful, False otherwise.
|
|
"""
|
|
log_prefix = "[Utils KML Gen]"
|
|
# Check for required libraries
|
|
if not _simplekml_available or not _pyproj_available:
|
|
logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.")
|
|
return False
|
|
# Check for valid input geo_info
|
|
if not geo_info_radians or not geo_info_radians.get("valid", False):
|
|
logging.warning(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.")
|
|
return False
|
|
|
|
try:
|
|
# Calculate geographic corner coordinates
|
|
corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
|
|
if corners_deg is None:
|
|
logging.error(f"{log_prefix} Failed to calculate KML corners.")
|
|
return False # Error already logged by helper
|
|
|
|
# Extract center and orientation for LookAt view
|
|
center_lon_deg = math.degrees(geo_info_radians["lon"])
|
|
center_lat_deg = math.degrees(geo_info_radians["lat"])
|
|
orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0))
|
|
|
|
# Estimate a reasonable viewing altitude based on image size
|
|
try:
|
|
width_km = (
|
|
geo_info_radians.get("scale_x", 1.0)
|
|
* geo_info_radians.get("width_px", 1000)
|
|
) / 1000.0
|
|
height_km = (
|
|
geo_info_radians.get("scale_y", 1.0)
|
|
* geo_info_radians.get("height_px", 1000)
|
|
) / 1000.0
|
|
# Simple heuristic for altitude (e.g., 2x the larger dimension in km * 1000)
|
|
view_altitude_m = max(width_km, height_km) * 2000.0
|
|
# Clamp altitude to a reasonable range
|
|
view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0))
|
|
except:
|
|
view_altitude_m = 10000.0 # Default altitude on error
|
|
|
|
# Create KML object
|
|
kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}")
|
|
|
|
# Set LookAt view
|
|
kml.document.lookat.longitude = center_lon_deg
|
|
kml.document.lookat.latitude = center_lat_deg
|
|
kml.document.lookat.range = view_altitude_m
|
|
kml.document.lookat.tilt = 45.0 # Default tilt
|
|
kml.document.lookat.heading = orientation_deg % 360.0 # Ensure 0-360 heading
|
|
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
|
|
|
|
# Create the polygon footprint
|
|
# KML uses (longitude, latitude, altitude) tuples
|
|
# Close the polygon by repeating the first point at the end
|
|
outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [
|
|
(corners_deg[0][0], corners_deg[0][1], 0)
|
|
]
|
|
|
|
polygon = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary)
|
|
|
|
# Style the polygon (red outline, semi-transparent red fill)
|
|
polygon.style.linestyle.color = simplekml.Color.red
|
|
polygon.style.linestyle.width = 2
|
|
polygon.style.polystyle.color = simplekml.Color.changealphaint(
|
|
100, simplekml.Color.red # ~40% opacity red fill
|
|
)
|
|
polygon.style.polystyle.outline = 1 # Ensure outline is drawn
|
|
|
|
# Save the KML file
|
|
kml.save(output_path)
|
|
logging.debug(f"{log_prefix} KML file saved successfully: {output_path}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error generating or saving KML file:")
|
|
return False
|
|
|
|
|
|
def generate_lookat_and_point_kml(
|
|
latitude_deg: float,
|
|
longitude_deg: float,
|
|
altitude_m: float = 10000.0,
|
|
tilt: float = 45.0,
|
|
heading: float = 0.0,
|
|
placemark_name: str = "Selected Location",
|
|
placemark_desc: Optional[str] = None,
|
|
) -> Optional[str]:
|
|
"""
|
|
Generates a temporary KML file containing a LookAt element and a Placemark point.
|
|
|
|
Args:
|
|
latitude_deg (float): Target latitude in decimal degrees.
|
|
longitude_deg (float): Target longitude in decimal degrees.
|
|
altitude_m (float): Viewing altitude in meters for LookAt.
|
|
tilt (float): Camera tilt angle for LookAt.
|
|
heading (float): Camera heading/azimuth for LookAt.
|
|
placemark_name (str): Name for the placemark.
|
|
placemark_desc (Optional[str]): Optional description for the placemark.
|
|
|
|
Returns:
|
|
Optional[str]: Path to the temporary KML file, or None on error.
|
|
"""
|
|
log_prefix = "[Utils KML LookAtPoint]"
|
|
if not _simplekml_available:
|
|
logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.")
|
|
return None
|
|
|
|
# Validate coordinates
|
|
if not (
|
|
isinstance(latitude_deg, (int, float))
|
|
and math.isfinite(latitude_deg)
|
|
and -90 <= latitude_deg <= 90
|
|
):
|
|
logging.error(f"{log_prefix} Invalid latitude provided: {latitude_deg}.")
|
|
return None
|
|
if not (
|
|
isinstance(longitude_deg, (int, float))
|
|
and math.isfinite(longitude_deg)
|
|
and -180 <= longitude_deg <= 180
|
|
):
|
|
logging.error(f"{log_prefix} Invalid longitude provided: {longitude_deg}.")
|
|
return None
|
|
|
|
try:
|
|
kml = simplekml.Kml(name=placemark_name) # Use name for KML document too
|
|
|
|
# Set LookAt parameters
|
|
kml.document.lookat.longitude = longitude_deg
|
|
kml.document.lookat.latitude = latitude_deg
|
|
kml.document.lookat.range = max(100.0, altitude_m) # Min range 100m
|
|
kml.document.lookat.tilt = tilt
|
|
kml.document.lookat.heading = heading % 360.0
|
|
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
|
|
|
|
# Add the Placemark point
|
|
point = kml.newpoint(name=placemark_name)
|
|
# simplekml uses (lon, lat, optional_alt) order for coordinates
|
|
point.coords = [(longitude_deg, latitude_deg)]
|
|
if placemark_desc:
|
|
point.description = placemark_desc
|
|
|
|
# Create a temporary file (delete=False so GE can open it)
|
|
# Consider using a subfolder in the main app temp/cache dir?
|
|
with tempfile.NamedTemporaryFile(
|
|
mode="w", suffix=".kml", delete=False, encoding="utf-8"
|
|
) as temp_kml:
|
|
temp_kml_path = temp_kml.name
|
|
logging.debug(
|
|
f"{log_prefix} Saving temporary LookAt+Point KML to: {temp_kml_path}"
|
|
)
|
|
temp_kml.write(kml.kml()) # Write KML content
|
|
|
|
logging.debug(f"{log_prefix} LookAt+Point KML file created successfully.")
|
|
return temp_kml_path
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error generating LookAt+Point KML file:")
|
|
return None
|
|
|
|
|
|
def generate_multi_point_kml(
|
|
points_data: List[Tuple[float, float, str, Optional[str]]],
|
|
) -> Optional[str]:
|
|
"""
|
|
Generates a temporary KML file containing multiple Placemarks (points)
|
|
and a LookAt view potentially encompassing them.
|
|
|
|
Args:
|
|
points_data (List[Tuple[float, float, str, Optional[str]]]):
|
|
List of (latitude_deg, longitude_deg, placemark_name, optional_desc).
|
|
|
|
Returns:
|
|
Optional[str]: Path to the temporary KML file, or None on error.
|
|
"""
|
|
log_prefix = "[Utils KML MultiPoint]"
|
|
if not _simplekml_available:
|
|
logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.")
|
|
return None
|
|
if not points_data:
|
|
logging.warning(f"{log_prefix} No points provided to generate KML.")
|
|
return None
|
|
|
|
try:
|
|
kml = simplekml.Kml(name="Multiple Locations")
|
|
|
|
# --- Determine LookAt View (Look at first point) ---
|
|
first_lat, first_lon, _, _ = points_data[0]
|
|
kml.document.lookat.longitude = first_lon
|
|
kml.document.lookat.latitude = first_lat
|
|
kml.document.lookat.range = 20000 # Default range for multiple points
|
|
kml.document.lookat.tilt = 30
|
|
kml.document.lookat.heading = 0
|
|
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
|
|
|
|
# --- Define styles ---
|
|
style_sar_center = simplekml.Style()
|
|
style_sar_center.iconstyle.icon.href = (
|
|
"http://maps.google.com/mapfiles/kml/paddle/red-stars.png"
|
|
)
|
|
style_sar_center.iconstyle.scale = 1.1
|
|
style_mouse_on_sar = simplekml.Style()
|
|
style_mouse_on_sar.iconstyle.icon.href = (
|
|
"http://maps.google.com/mapfiles/kml/paddle/blu-circle.png"
|
|
)
|
|
style_mouse_on_sar.iconstyle.scale = 1.1
|
|
style_mouse_on_map = simplekml.Style()
|
|
style_mouse_on_map.iconstyle.icon.href = (
|
|
"http://maps.google.com/mapfiles/kml/paddle/grn-diamond.png"
|
|
)
|
|
style_mouse_on_map.iconstyle.scale = 1.1
|
|
|
|
# --- Add Placemarks ---
|
|
logging.debug(f"{log_prefix} Adding {len(points_data)} placemarks to KML.")
|
|
for lat, lon, name, desc in points_data:
|
|
point = kml.newpoint(name=name)
|
|
point.coords = [(lon, lat)] # (lon, lat) order
|
|
if desc:
|
|
point.description = desc
|
|
|
|
# Apply style based on name
|
|
if name == "SAR Center":
|
|
point.style = style_sar_center
|
|
elif name == "Mouse on SAR":
|
|
point.style = style_mouse_on_sar
|
|
elif name == "Mouse on Map":
|
|
point.style = style_mouse_on_map
|
|
# else: default style
|
|
|
|
# --- Save to Temporary File ---
|
|
with tempfile.NamedTemporaryFile(
|
|
mode="w", suffix=".kml", delete=False, encoding="utf-8"
|
|
) as temp_kml:
|
|
temp_kml_path = temp_kml.name
|
|
logging.debug(
|
|
f"{log_prefix} Saving temporary Multi-Point KML to: {temp_kml_path}"
|
|
)
|
|
temp_kml.write(kml.kml())
|
|
|
|
logging.debug(f"{log_prefix} Multi-Point KML file created successfully.")
|
|
return temp_kml_path
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error generating Multi-Point KML file:")
|
|
return None
|
|
|
|
|
|
def launch_google_earth(kml_path: Optional[str]):
|
|
"""
|
|
Attempts to open a KML file using the system's default application
|
|
or google-earth-pro directly.
|
|
|
|
Args:
|
|
kml_path (Optional[str]): The path to the KML file to launch.
|
|
"""
|
|
log_prefix = "[Utils Launch GE]"
|
|
if not kml_path or not os.path.exists(kml_path):
|
|
logging.error(
|
|
f"{log_prefix} Cannot launch: KML file path invalid or not found: {kml_path}"
|
|
)
|
|
return
|
|
logging.info(f"{log_prefix} Attempting launch for KML: {kml_path}")
|
|
try:
|
|
if sys.platform == "win32":
|
|
# Use os.startfile on Windows
|
|
os.startfile(kml_path)
|
|
elif sys.platform == "darwin":
|
|
# Use 'open' command on macOS
|
|
subprocess.run(["open", kml_path], check=True)
|
|
else:
|
|
# Use 'google-earth-pro' or 'xdg-open' on Linux/Unix
|
|
cmd = shutil.which("google-earth-pro") or shutil.which("xdg-open")
|
|
if cmd:
|
|
logging.debug(f"{log_prefix} Using command: {cmd}")
|
|
if cmd.endswith("pro"):
|
|
# Popen for GE Pro to run in background potentially
|
|
subprocess.Popen([cmd, kml_path])
|
|
else:
|
|
# run for xdg-open, wait for it?
|
|
subprocess.run([cmd, kml_path], check=True)
|
|
else:
|
|
# Log error if no suitable command found
|
|
err_msg = "Neither google-earth-pro nor xdg-open found in PATH."
|
|
logging.error(f"{log_prefix} {err_msg}")
|
|
raise FileNotFoundError(err_msg)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error launching KML handler:")
|
|
|
|
|
|
def cleanup_old_kml_files(kml_directory: str, max_files_to_keep: int):
|
|
"""Removes the oldest KML files from the directory if count exceeds limit."""
|
|
log_prefix = "[Utils KML Cleanup]"
|
|
# Skip if cleanup is disabled or directory invalid
|
|
if max_files_to_keep <= 0:
|
|
return
|
|
try:
|
|
kml_dir_path = Path(kml_directory)
|
|
if not kml_dir_path.is_dir():
|
|
logging.warning(
|
|
f"{log_prefix} KML directory not found: '{kml_directory}'. Cannot cleanup."
|
|
)
|
|
return
|
|
|
|
# Get list of KML files with modification times
|
|
kml_files = []
|
|
for item in kml_dir_path.glob("*.kml"):
|
|
if item.is_file():
|
|
try:
|
|
# Get modification time
|
|
mtime = item.stat().st_mtime
|
|
kml_files.append((item, mtime))
|
|
except Exception as stat_e:
|
|
# Log error getting stats but continue with others
|
|
logging.warning(
|
|
f"{log_prefix} Could not stat file '{item.name}': {stat_e}"
|
|
)
|
|
|
|
current_count = len(kml_files)
|
|
# Only proceed if count exceeds the limit
|
|
if current_count <= max_files_to_keep:
|
|
return
|
|
|
|
# Sort files by modification time (oldest first)
|
|
kml_files.sort(key=lambda x: x[1])
|
|
|
|
# Determine number of files to delete
|
|
num_to_delete = current_count - max_files_to_keep
|
|
# Get the list of files to delete
|
|
files_to_delete = kml_files[:num_to_delete]
|
|
|
|
logging.info(
|
|
f"{log_prefix} Found {current_count} KML files. "
|
|
f"Attempting to delete oldest {num_to_delete}..."
|
|
)
|
|
deleted_count = 0
|
|
# Iterate and delete oldest files
|
|
for file_path, _ in files_to_delete:
|
|
try:
|
|
file_path.unlink() # Delete the file
|
|
logging.debug(f"{log_prefix} Deleted old KML: {file_path.name}")
|
|
deleted_count += 1
|
|
except Exception as delete_e:
|
|
# Log error deleting specific file but continue
|
|
logging.error(
|
|
f"{log_prefix} Failed to delete file '{file_path.name}': {delete_e}"
|
|
)
|
|
logging.info(
|
|
f"{log_prefix} Cleanup finished. Deleted {deleted_count}/{num_to_delete} files."
|
|
)
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error during KML cleanup process:")
|
|
|
|
|
|
# --- END OF FILE utils.py ---
|