1121 lines
44 KiB
Python
1121 lines
44 KiB
Python
# --- START OF FILE utils.py ---
|
|
|
|
# utils.py
|
|
"""
|
|
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
|
|
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
|
PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
|
|
Provides utility functions used throughout the application, including
|
|
safe queue management (put, clear), coordinate formatting (decimal degrees to DMS
|
|
and DMS string to decimal), KML generation and management, and opening web maps.
|
|
Uses standardized logging prefixes. Drop counts are managed within AppState.
|
|
"""
|
|
|
|
# Standard library imports
|
|
import queue
|
|
import logging
|
|
import math
|
|
import os
|
|
import datetime
|
|
import sys
|
|
import subprocess
|
|
import shutil
|
|
from pathlib import Path
|
|
import webbrowser
|
|
import urllib.parse
|
|
import re # For DMS parsing
|
|
from typing import Optional, Tuple, List, Any, Dict
|
|
import tempfile
|
|
import ctypes # Needed for format_ctypes_structure
|
|
|
|
# Third-party imports (ensure these are installed)
|
|
import numpy as np
|
|
|
|
# --- Availability Flags & Library Imports ---
|
|
# Define flags *before* use, handle ImportErrors
|
|
|
|
_simplekml_available = False
|
|
try:
|
|
import simplekml
|
|
_simplekml_available = True
|
|
except ImportError:
|
|
simplekml = None
|
|
logging.warning(
|
|
"[Utils Init] Library 'simplekml' not found. KML generation disabled. "
|
|
"(pip install simplekml)"
|
|
)
|
|
|
|
_pyproj_available = False
|
|
try:
|
|
import pyproj
|
|
_pyproj_available = True
|
|
except ImportError:
|
|
pyproj = None
|
|
logging.warning(
|
|
"[Utils Init] Library 'pyproj' not found. "
|
|
"Some geometric calculations (KML corners, BBox) may be less accurate. "
|
|
"(pip install pyproj)"
|
|
)
|
|
|
|
_cv2_available = False
|
|
try:
|
|
import cv2
|
|
_cv2_available = True
|
|
except ImportError:
|
|
cv2 = None
|
|
logging.warning(
|
|
"[Utils Init] Library 'opencv-python' not found. "
|
|
"Cannot save images for KML overlays."
|
|
)
|
|
|
|
_lxml_available = False
|
|
try:
|
|
from lxml import etree
|
|
_lxml_available = True
|
|
except ImportError:
|
|
etree = None
|
|
logging.warning("[Utils Init] Library 'lxml' not found. Cannot use gx:LatLonQuad fallback.")
|
|
|
|
# --- Local application imports (after third-party and flag definitions) ---
|
|
import config # Import config after potential library warnings
|
|
|
|
_apply_color_palette_available = False
|
|
try:
|
|
# Import only after cv2 availability check might be safer, but fine here for now
|
|
from image_processing import apply_color_palette
|
|
_apply_color_palette_available = True
|
|
except ImportError:
|
|
apply_color_palette = None
|
|
logging.warning(
|
|
"[Utils Init] Function 'apply_color_palette' not found. "
|
|
"Color palettes in KML overlay might not work."
|
|
)
|
|
|
|
|
|
# --- Queue Management Functions ---
|
|
|
|
def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None):
|
|
"""
|
|
Safely puts an item into a queue using non-blocking put.
|
|
Increments specific drop counters in the AppState object if the queue is full.
|
|
Discards item if the application is shutting down.
|
|
|
|
Args:
|
|
queue_obj (queue.Queue): The queue instance.
|
|
item (Any): The item to put into the queue.
|
|
queue_name (str): Name of the queue (for logging/stats).
|
|
app_instance (Optional[ControlPanelApp]): Reference to the main app instance
|
|
(for shutdown check and drop count).
|
|
"""
|
|
log_prefix = "[Utils Queue Put]"
|
|
# Check shutdown flag via app_instance's state
|
|
if (
|
|
app_instance
|
|
and hasattr(app_instance, "state")
|
|
and app_instance.state.shutting_down
|
|
):
|
|
return # Silently discard if shutting down
|
|
|
|
try:
|
|
# Put item without blocking
|
|
queue_obj.put(item, block=False)
|
|
except queue.Full:
|
|
# Log queue full warning
|
|
logging.warning(f"{log_prefix} Queue '{queue_name}' is full. Dropping item.")
|
|
# Increment the specific drop counter via the AppState instance
|
|
if app_instance and hasattr(app_instance, "state"):
|
|
try:
|
|
app_instance.state.increment_dropped_count(queue_name)
|
|
except Exception as e:
|
|
logging.error(
|
|
f"{log_prefix} Failed to increment drop count for '{queue_name}': {e}"
|
|
)
|
|
else:
|
|
logging.error(
|
|
f"{log_prefix} Cannot increment drop count for '{queue_name}': "
|
|
"App instance or state missing."
|
|
)
|
|
except Exception as e:
|
|
# Log any other unexpected errors during queue put
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error putting item onto queue '{queue_name}': {e}"
|
|
)
|
|
|
|
|
|
def clear_queue(q: queue.Queue):
|
|
"""Removes all items currently in the specified queue."""
|
|
log_prefix = "[Utils Queue Clear]"
|
|
try:
|
|
q_size_before = q.qsize() # Get approximate size before clearing
|
|
# Ensure thread safety during clear operation
|
|
with q.mutex:
|
|
q.queue.clear()
|
|
# Reset task tracking if used
|
|
if hasattr(q, 'unfinished_tasks'):
|
|
q.unfinished_tasks = 0
|
|
logging.debug(
|
|
f"{log_prefix} Cleared queue (approx size before: {q_size_before})."
|
|
)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error clearing queue: {e}")
|
|
|
|
|
|
# --- Coordinate Formatting ---
|
|
|
|
def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
|
|
"""
|
|
Converts decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string.
|
|
Returns "N/A", "Invalid Lat/Lon", or "Error DMS" on error.
|
|
|
|
Args:
|
|
decimal_degrees (float): Coordinate value in decimal degrees.
|
|
is_latitude (bool): True if latitude, False if longitude.
|
|
|
|
Returns:
|
|
str: Formatted DMS string or an error indicator.
|
|
"""
|
|
log_prefix = "[Utils DMS Conv]"
|
|
coord_type = "Latitude" if is_latitude else "Longitude"
|
|
|
|
try:
|
|
# Input validation
|
|
if (
|
|
not isinstance(decimal_degrees, (int, float))
|
|
or math.isnan(decimal_degrees)
|
|
or math.isinf(decimal_degrees)
|
|
):
|
|
logging.warning(
|
|
f"{log_prefix} Invalid input type or value ({decimal_degrees}) "
|
|
f"for {coord_type}."
|
|
)
|
|
return "N/A"
|
|
|
|
# Range check and direction determination
|
|
if is_latitude:
|
|
if not (-90.0 <= decimal_degrees <= 90.0):
|
|
logging.warning(
|
|
f"{log_prefix} Latitude {decimal_degrees} out of range."
|
|
)
|
|
return "Invalid Lat"
|
|
direction = "N" if decimal_degrees >= 0 else "S"
|
|
deg_pad = 2 # Padding for degrees (e.g., 01°, 89°)
|
|
else: # Longitude
|
|
if not (-180.0 <= decimal_degrees <= 180.0):
|
|
logging.warning(
|
|
f"{log_prefix} Longitude {decimal_degrees} out of range."
|
|
)
|
|
return "Invalid Lon"
|
|
direction = "E" if decimal_degrees >= 0 else "W"
|
|
deg_pad = 3 # Padding for degrees (e.g., 001°, 179°)
|
|
|
|
# Calculations
|
|
dd_abs = abs(decimal_degrees)
|
|
degrees = math.floor(dd_abs)
|
|
minutes_dec = (dd_abs - degrees) * 60.0
|
|
minutes = math.floor(minutes_dec)
|
|
seconds = (minutes_dec - minutes) * 60.0
|
|
|
|
# Handle floating point precision near 60.0 for seconds/minutes
|
|
if abs(seconds - 60.0) < 1e-9:
|
|
seconds = 0.0
|
|
minutes += 1
|
|
if minutes == 60:
|
|
minutes = 0
|
|
degrees += 1
|
|
# Recalculate absolute degrees if degrees incremented past range limit
|
|
if is_latitude and degrees > 90:
|
|
degrees = 90
|
|
minutes = 0
|
|
seconds = 0.0
|
|
if not is_latitude and degrees > 180:
|
|
degrees = 180
|
|
minutes = 0
|
|
seconds = 0.0
|
|
|
|
# Format the final string
|
|
dms_string = (
|
|
f"{degrees:0{deg_pad}d}° {minutes:02d}' {seconds:05.2f}\" {direction}"
|
|
)
|
|
return dms_string
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error converting {decimal_degrees} to DMS: {e}"
|
|
)
|
|
return "Error DMS"
|
|
|
|
|
|
def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]:
|
|
"""
|
|
Converts a DMS string (e.g., "46° 09' 50.24\" N", "009° 23' 14.11\" E")
|
|
to decimal degrees.
|
|
|
|
Args:
|
|
dms_str (str): The DMS string to parse.
|
|
is_latitude (bool): True if the string represents latitude, False for longitude.
|
|
|
|
Returns:
|
|
Optional[float]: The coordinate in decimal degrees, or None on parsing error.
|
|
"""
|
|
log_prefix = "[Utils DMS Parse]"
|
|
if not dms_str or not isinstance(dms_str, str):
|
|
logging.warning(f"{log_prefix} Invalid input string: {dms_str}")
|
|
return None
|
|
|
|
# Regex to capture degrees, minutes, seconds, and direction
|
|
pattern = re.compile(
|
|
r"^\s*(\d{1,3})\s*[°]\s*" # Degrees (1-3 digits) + ° symbol
|
|
r"(\d{1,2})\s*[']\s*" # Minutes (1-2 digits) + ' symbol
|
|
r"([\d.]+)\s*[\"”]\s*" # Seconds (digits, dot) + " or ” symbol
|
|
r"([NSEWnsew])\s*$", # Direction (N,S,E,W case-insensitive)
|
|
re.IGNORECASE,
|
|
)
|
|
match = pattern.match(dms_str.strip())
|
|
|
|
if not match:
|
|
logging.error(f"{log_prefix} Could not parse DMS string format: '{dms_str}'")
|
|
return None
|
|
|
|
try:
|
|
# Extract components as floats
|
|
degrees = float(match.group(1))
|
|
minutes = float(match.group(2))
|
|
seconds = float(match.group(3))
|
|
direction = match.group(4).upper()
|
|
|
|
# Validate component ranges
|
|
if not (0 <= minutes < 60 and 0 <= seconds < 60):
|
|
logging.error(
|
|
f"{log_prefix} Invalid minutes ({minutes}) or seconds ({seconds}) "
|
|
f"in DMS: '{dms_str}'"
|
|
)
|
|
return None
|
|
|
|
# Validate direction based on coordinate type
|
|
valid_dirs = ("N", "S") if is_latitude else ("E", "W")
|
|
if direction not in valid_dirs:
|
|
coord_type = "latitude" if is_latitude else "longitude"
|
|
logging.error(
|
|
f"{log_prefix} Invalid direction '{direction}' for {coord_type}: "
|
|
f"'{dms_str}'"
|
|
)
|
|
return None
|
|
|
|
# Calculate decimal degrees
|
|
decimal_degrees = degrees + (minutes / 60.0) + (seconds / 3600.0)
|
|
|
|
# Apply sign based on direction
|
|
if direction in ("S", "W"):
|
|
decimal_degrees *= -1.0
|
|
|
|
# Final range check for the calculated decimal value
|
|
if is_latitude and not (-90.0 <= decimal_degrees <= 90.0):
|
|
logging.warning(
|
|
f"{log_prefix} Calculated latitude {decimal_degrees:.7f} out of range."
|
|
)
|
|
return None # Treat out of range as error
|
|
if not is_latitude and not (-180.0 <= decimal_degrees <= 180.0):
|
|
logging.warning(
|
|
f"{log_prefix} Calculated longitude {decimal_degrees:.7f} out of range."
|
|
)
|
|
return None
|
|
|
|
logging.debug(f"{log_prefix} Parsed '{dms_str}' -> {decimal_degrees:.7f}")
|
|
return decimal_degrees
|
|
|
|
except ValueError as ve:
|
|
# Handle errors during float conversion
|
|
logging.error(
|
|
f"{log_prefix} Error converting components to float: {ve} in '{dms_str}'"
|
|
)
|
|
return None
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error parsing DMS string '{dms_str}':"
|
|
)
|
|
return None
|
|
|
|
|
|
# --- Metadata Formatting Function ---
|
|
|
|
def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0) -> str:
|
|
"""
|
|
Generates a formatted string representation of a ctypes Structure,
|
|
handling nested structures, arrays, and basic types recursively.
|
|
|
|
Args:
|
|
structure: The ctypes.Structure instance to format.
|
|
indent_level: The current indentation level for nested structures.
|
|
|
|
Returns:
|
|
A multi-line string representing the structure's content.
|
|
"""
|
|
indent = " " * indent_level # Define indentation string
|
|
result = ""
|
|
|
|
# Check if it's actually a structure with _fields_ attribute
|
|
if not hasattr(structure, "_fields_"):
|
|
# Fallback for non-structures or basic types passed accidentally
|
|
return f"{indent}Value: {repr(structure)}\n"
|
|
|
|
# Iterate through the fields defined in the structure
|
|
for field_name, field_type in structure._fields_:
|
|
try:
|
|
# Get the value of the current field
|
|
value = getattr(structure, field_name)
|
|
formatted_value = ""
|
|
|
|
# --- Handle Nested Structures ---
|
|
if hasattr(field_type, "_fields_"):
|
|
# If the field type itself is another Structure
|
|
result += f"{indent}{field_name} ({field_type.__name__}):\n"
|
|
# Recursively call formatting for the nested structure
|
|
result += format_ctypes_structure(value, indent_level + 1)
|
|
|
|
# --- Handle Arrays ---
|
|
elif issubclass(field_type, ctypes.Array):
|
|
array_len = getattr(field_type, "_length_", "N/A")
|
|
elem_type = getattr(field_type, "_type_", "N/A")
|
|
elem_name = getattr(elem_type, "__name__", "?")
|
|
|
|
# Special handling for byte arrays (show hex preview)
|
|
if issubclass(elem_type, (ctypes.c_byte, ctypes.c_ubyte)):
|
|
try:
|
|
# Attempt conversion to bytes for hex preview
|
|
byte_value = bytes(value[:16]) # Limit preview length
|
|
preview = byte_value.hex().upper()
|
|
if len(value) > 16:
|
|
preview += "..."
|
|
formatted_value = f"<{elem_name}[{array_len}] Data: {preview}>"
|
|
except: # Fallback if conversion to bytes fails
|
|
formatted_value = f"<{elem_name}[{array_len}] (Preview N/A)>"
|
|
|
|
# Handle simple numeric arrays (show first few elements)
|
|
elif isinstance(value, (list, tuple)):
|
|
preview = str(list(value[:8])) # Limit preview length
|
|
if len(value) > 8:
|
|
# Indicate truncation
|
|
preview = preview[:-1] + ", ...]"
|
|
formatted_value = f"<{elem_name}[{array_len}] Data: {preview}>"
|
|
else:
|
|
# For other array types, just show type and size
|
|
formatted_value = f"<{elem_name}[{array_len}]>"
|
|
|
|
result += f"{indent}{field_name}: {formatted_value}\n"
|
|
|
|
# --- Handle Basic Types ---
|
|
else:
|
|
# Special Formatting for Known Fields
|
|
if field_name in (
|
|
"LATITUDE",
|
|
"LONGITUDE",
|
|
"ORIENTATION",
|
|
"POI_LATITUDE",
|
|
"POI_LONGITUDE",
|
|
"POI_ORIENTATION",
|
|
) and isinstance(value, float):
|
|
if math.isfinite(value):
|
|
deg_value = math.degrees(value)
|
|
prec = 2 if "ORIENTATION" in field_name else 6
|
|
formatted_value = f"{value:.7f} rad ({deg_value:.{prec}f} deg)"
|
|
else:
|
|
formatted_value = f"{value} rad (Non-finite)"
|
|
elif field_name in (
|
|
"SFP_MARKER",
|
|
"SFP_PT_SPARE",
|
|
"SFP_TAG",
|
|
"SFP_SRC",
|
|
"SFP_TID",
|
|
"SFP_FLAGS",
|
|
"SFP_WIN",
|
|
"SFP_ERR",
|
|
"SFP_ERR_INFO",
|
|
"SFP_RECTYPE",
|
|
"SFP_RECSPARE",
|
|
"SFP_PLDAP",
|
|
"SFP_PLEXT",
|
|
) and isinstance(value, int):
|
|
formatted_value = f"0x{value:02X} ({value})"
|
|
elif field_name in ("SFP_DIRECTION", "SFP_FLOW") and isinstance(
|
|
value, int
|
|
):
|
|
char_repr = chr(value) if 32 <= value <= 126 else "?"
|
|
formatted_value = f"0x{value:02X} ('{char_repr}')"
|
|
elif isinstance(value, bytes):
|
|
try:
|
|
decoded = value.decode("ascii", errors="replace")
|
|
hex_preview = value.hex().upper()[:32] # Limit hex preview
|
|
suffix = "..." if len(value) > 16 else ""
|
|
formatted_value = f"'{decoded}' (hex: {hex_preview}{suffix})"
|
|
except:
|
|
formatted_value = repr(value) # Fallback if not decodable
|
|
else:
|
|
# Default representation for other types
|
|
formatted_value = repr(value)
|
|
|
|
result += f"{indent}{field_name}: {formatted_value}\n"
|
|
|
|
except AttributeError:
|
|
# Handle cases where getattr might fail
|
|
result += f"{indent}{field_name}: <Attribute Error>\n"
|
|
except Exception as e:
|
|
# Catch any other formatting errors for a specific field
|
|
result += f"{indent}{field_name}: <Error Formatting: {e}>\n"
|
|
|
|
return result
|
|
|
|
|
|
# --- Google Maps Launcher ---
|
|
|
|
def open_google_maps(latitude_deg: float, longitude_deg: float):
|
|
"""Opens the default web browser to Google Maps centered on the specified coordinates."""
|
|
log_prefix = "[Utils Gmaps]"
|
|
# Validate coordinates
|
|
if not (
|
|
isinstance(latitude_deg, (int, float))
|
|
and math.isfinite(latitude_deg)
|
|
and -90 <= latitude_deg <= 90
|
|
):
|
|
logging.error(f"{log_prefix} Invalid latitude: {latitude_deg}.")
|
|
return
|
|
if not (
|
|
isinstance(longitude_deg, (int, float))
|
|
and math.isfinite(longitude_deg)
|
|
and -180 <= longitude_deg <= 180
|
|
):
|
|
logging.error(f"{log_prefix} Invalid longitude: {longitude_deg}.")
|
|
return
|
|
try:
|
|
# Construct URL using the search API for better pinning
|
|
query_str = f"{latitude_deg:.7f},{longitude_deg:.7f}"
|
|
# URL encode the query string
|
|
encoded_query = urllib.parse.quote(query_str)
|
|
gmaps_url = f"https://www.google.com/maps/search/?api=1&query={encoded_query}"
|
|
|
|
logging.info(f"{log_prefix} Opening Google Maps URL: {gmaps_url}")
|
|
# Open in a new tab if possible
|
|
opened_ok = webbrowser.open_new_tab(gmaps_url)
|
|
if not opened_ok:
|
|
logging.warning(f"{log_prefix} webbrowser.open_new_tab returned False.")
|
|
# Fallback attempt if new tab fails
|
|
webbrowser.open(gmaps_url)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Failed to open Google Maps URL:")
|
|
|
|
|
|
# --- KML Generation and Management ---
|
|
|
|
def _calculate_geo_corners_for_kml(
|
|
geo_info_radians: Dict[str, Any],
|
|
) -> Optional[List[Tuple[float, float]]]:
|
|
"""Internal helper to calculate geographic corners (degrees) from geo_info (radians). Requires pyproj."""
|
|
# Check if pyproj is available
|
|
if not _pyproj_available:
|
|
logging.error("[Utils KML Calc] pyproj library needed for corner calculation.")
|
|
return None
|
|
|
|
log_prefix = "[Utils KML Calc]"
|
|
try:
|
|
# Define the geodetic calculator (WGS84 ellipsoid)
|
|
geod = pyproj.Geod(ellps="WGS84")
|
|
|
|
# Extract necessary info from the dictionary
|
|
center_lat_rad = geo_info_radians["lat"]
|
|
center_lon_rad = geo_info_radians["lon"]
|
|
orient_rad = geo_info_radians["orientation"]
|
|
calc_orient_rad = -orient_rad # Angle used for projection
|
|
|
|
ref_x = geo_info_radians["ref_x"]
|
|
ref_y = geo_info_radians["ref_y"]
|
|
scale_x = geo_info_radians["scale_x"]
|
|
scale_y = geo_info_radians["scale_y"]
|
|
width = geo_info_radians["width_px"]
|
|
height = geo_info_radians["height_px"]
|
|
|
|
# Validate inputs
|
|
if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
|
|
logging.error(f"{log_prefix} Invalid scale or dimensions in geo_info.")
|
|
return None
|
|
|
|
# Define corner pixel coordinates relative to reference pixel
|
|
corners_pixel_delta = [
|
|
(0 - ref_x, ref_y - 0), # Top-Left Delta
|
|
(width - 1 - ref_x, ref_y - 0), # Top-Right Delta
|
|
(width - 1 - ref_x, ref_y - (height - 1)), # Bottom-Right Delta
|
|
(0 - ref_x, ref_y - (height - 1)), # Bottom-Left Delta
|
|
]
|
|
|
|
# Convert pixel deltas to meter deltas (unrotated)
|
|
corners_meters = [
|
|
(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel_delta
|
|
]
|
|
|
|
# Rotate meter deltas based on orientation if significant
|
|
corners_meters_rotated = corners_meters
|
|
if abs(calc_orient_rad) > 1e-6: # Only rotate if angle is non-zero
|
|
cos_o = math.cos(calc_orient_rad)
|
|
sin_o = math.sin(calc_orient_rad)
|
|
corners_meters_rotated = [
|
|
(dx * cos_o - dy * sin_o, dx * sin_o + dy * cos_o)
|
|
for dx, dy in corners_meters
|
|
]
|
|
|
|
# Calculate geographic coordinates for each corner using forward projection
|
|
corners_geo_deg = []
|
|
center_lon_deg = math.degrees(center_lon_rad)
|
|
center_lat_deg = math.degrees(center_lat_rad)
|
|
|
|
# Loop through rotated meter deltas
|
|
for dx_m, dy_m in corners_meters_rotated:
|
|
# Calculate distance and azimuth from center to corner
|
|
distance = math.hypot(dx_m, dy_m)
|
|
azimuth = math.degrees(math.atan2(dx_m, dy_m)) % 360.0
|
|
|
|
# Use pyproj geod.fwd for accurate projection
|
|
lon, lat, _ = geod.fwd(center_lon_deg, center_lat_deg, azimuth, distance)
|
|
corners_geo_deg.append((lon, lat)) # Store (longitude, latitude)
|
|
|
|
# Ensure we have exactly 4 corners
|
|
if len(corners_geo_deg) == 4:
|
|
return corners_geo_deg
|
|
else:
|
|
logging.error(f"{log_prefix} Incorrect number of corners calculated.")
|
|
return None
|
|
|
|
except KeyError as ke:
|
|
logging.error(f"{log_prefix} Missing required key in geo_info_radians: {ke}")
|
|
return None
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error calculating geographic corners for KML:")
|
|
return None
|
|
|
|
|
|
def generate_sar_kml(geo_info_radians: Dict[str, Any], output_path: str) -> bool:
|
|
"""
|
|
Generates a KML file representing the SAR footprint area as a polygon.
|
|
|
|
Args:
|
|
geo_info_radians (Dict[str, Any]): Dictionary with SAR georeferencing.
|
|
output_path (str): The full path where the KML file should be saved.
|
|
|
|
Returns:
|
|
bool: True if KML generation and saving were successful, False otherwise.
|
|
"""
|
|
log_prefix = "[Utils KML Gen]"
|
|
# Check for required libraries
|
|
if not _simplekml_available or not _pyproj_available:
|
|
logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.")
|
|
return False
|
|
# Check for valid input geo_info
|
|
if not geo_info_radians or not geo_info_radians.get("valid", False):
|
|
logging.warning(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.")
|
|
return False
|
|
|
|
try:
|
|
# Calculate geographic corner coordinates
|
|
corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
|
|
if corners_deg is None:
|
|
logging.error(f"{log_prefix} Failed to calculate KML corners.")
|
|
return False # Error already logged by helper
|
|
|
|
# Extract center and orientation for LookAt view
|
|
center_lon_deg = math.degrees(geo_info_radians["lon"])
|
|
center_lat_deg = math.degrees(geo_info_radians["lat"])
|
|
orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0))
|
|
|
|
# Estimate a reasonable viewing altitude based on image size
|
|
try:
|
|
width_km = (
|
|
geo_info_radians.get("scale_x", 1.0)
|
|
* geo_info_radians.get("width_px", 1000)
|
|
) / 1000.0
|
|
height_km = (
|
|
geo_info_radians.get("scale_y", 1.0)
|
|
* geo_info_radians.get("height_px", 1000)
|
|
) / 1000.0
|
|
view_altitude_m = max(width_km, height_km) * 2000.0
|
|
view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0))
|
|
except:
|
|
view_altitude_m = 10000.0 # Default altitude on error
|
|
|
|
# Create KML object
|
|
kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}")
|
|
|
|
# Set LookAt view
|
|
kml.document.lookat.longitude = center_lon_deg
|
|
kml.document.lookat.latitude = center_lat_deg
|
|
kml.document.lookat.range = view_altitude_m
|
|
kml.document.lookat.tilt = 45.0
|
|
kml.document.lookat.heading = orientation_deg % 360.0
|
|
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
|
|
|
|
# Create the polygon footprint
|
|
outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [
|
|
(corners_deg[0][0], corners_deg[0][1], 0)
|
|
]
|
|
polygon = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary)
|
|
|
|
# Style the polygon
|
|
polygon.style.linestyle.color = simplekml.Color.red
|
|
polygon.style.linestyle.width = 2
|
|
polygon.style.polystyle.color = simplekml.Color.changealphaint(
|
|
100, simplekml.Color.red
|
|
)
|
|
polygon.style.polystyle.outline = 1
|
|
|
|
# Save the KML file
|
|
kml.save(output_path)
|
|
logging.debug(f"{log_prefix} KML file saved successfully: {output_path}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error generating or saving KML file:")
|
|
return False
|
|
|
|
|
|
def generate_lookat_and_point_kml(
|
|
latitude_deg: float,
|
|
longitude_deg: float,
|
|
altitude_m: float = 10000.0,
|
|
tilt: float = 45.0,
|
|
heading: float = 0.0,
|
|
placemark_name: str = "Selected Location",
|
|
placemark_desc: Optional[str] = None,
|
|
) -> Optional[str]:
|
|
"""
|
|
Generates a temporary KML file containing a LookAt element and a Placemark point.
|
|
|
|
Args:
|
|
latitude_deg (float): Target latitude in decimal degrees.
|
|
longitude_deg (float): Target longitude in decimal degrees.
|
|
altitude_m (float): Viewing altitude in meters for LookAt.
|
|
tilt (float): Camera tilt angle for LookAt.
|
|
heading (float): Camera heading/azimuth for LookAt.
|
|
placemark_name (str): Name for the placemark.
|
|
placemark_desc (Optional[str]): Optional description for the placemark.
|
|
|
|
Returns:
|
|
Optional[str]: Path to the temporary KML file, or None on error.
|
|
"""
|
|
log_prefix = "[Utils KML LookAtPoint]"
|
|
if not _simplekml_available:
|
|
logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.")
|
|
return None
|
|
|
|
# Validate coordinates
|
|
if not (
|
|
isinstance(latitude_deg, (int, float))
|
|
and math.isfinite(latitude_deg)
|
|
and -90 <= latitude_deg <= 90
|
|
):
|
|
logging.error(f"{log_prefix} Invalid latitude provided: {latitude_deg}.")
|
|
return None
|
|
if not (
|
|
isinstance(longitude_deg, (int, float))
|
|
and math.isfinite(longitude_deg)
|
|
and -180 <= longitude_deg <= 180
|
|
):
|
|
logging.error(f"{log_prefix} Invalid longitude provided: {longitude_deg}.")
|
|
return None
|
|
|
|
try:
|
|
kml = simplekml.Kml(name=placemark_name)
|
|
|
|
# Set LookAt parameters
|
|
kml.document.lookat.longitude = longitude_deg
|
|
kml.document.lookat.latitude = latitude_deg
|
|
kml.document.lookat.range = max(100.0, altitude_m)
|
|
kml.document.lookat.tilt = tilt
|
|
kml.document.lookat.heading = heading % 360.0
|
|
kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground
|
|
|
|
# Add the Placemark point
|
|
point = kml.newpoint(name=placemark_name)
|
|
point.coords = [(longitude_deg, latitude_deg)] # (lon, lat) order
|
|
if placemark_desc:
|
|
point.description = placemark_desc
|
|
|
|
# Create a temporary file
|
|
with tempfile.NamedTemporaryFile(
|
|
mode="w", suffix=".kml", delete=False, encoding="utf-8"
|
|
) as temp_kml:
|
|
temp_kml_path = temp_kml.name
|
|
logging.debug(
|
|
f"{log_prefix} Saving temporary LookAt+Point KML to: {temp_kml_path}"
|
|
)
|
|
temp_kml.write(kml.kml())
|
|
|
|
logging.debug(f"{log_prefix} LookAt+Point KML file created successfully.")
|
|
return temp_kml_path
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error generating LookAt+Point KML file:")
|
|
return None
|
|
|
|
|
|
def generate_composite_kml(
|
|
points_data: List[Tuple[float, float, str, Optional[str]]],
|
|
sar_image_processed: np.ndarray, # Assumed BGR uint8
|
|
geo_info_radians: Dict[str, Any],
|
|
) -> Optional[str]:
|
|
"""
|
|
Generates a KML file including placemarks, SAR footprint polygon, and
|
|
a GroundOverlay using gx:LatLonQuad for accurate image placement.
|
|
Saves the KML and a temporary PNG overlay image to the configured KML output directory.
|
|
|
|
Args:
|
|
points_data (List[Tuple[float, float, str, Optional[str]]]):
|
|
List of (latitude_deg, longitude_deg, placemark_name, optional_desc).
|
|
sar_image_processed (np.ndarray): The SAR image processed for display
|
|
(uint8 BGR, after B/C and palette).
|
|
geo_info_radians (Dict[str, Any]): The SAR georeferencing info (radians).
|
|
|
|
Returns:
|
|
Optional[str]: Full path to the saved KML file, or None on error.
|
|
"""
|
|
log_prefix = "[Utils KML Composite]"
|
|
# Library checks (using flags defined globally)
|
|
if not _simplekml_available or not _pyproj_available:
|
|
logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.")
|
|
return None
|
|
if not _cv2_available: # Check OpenCV flag
|
|
logging.error(f"{log_prefix} Cannot generate KML overlay: OpenCV (cv2) missing.")
|
|
return None
|
|
# Input data checks
|
|
if sar_image_processed is None or sar_image_processed.size == 0:
|
|
logging.error(f"{log_prefix} Cannot generate KML: Invalid SAR image provided.")
|
|
return None
|
|
if not geo_info_radians or not geo_info_radians.get("valid", False):
|
|
logging.error(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.")
|
|
return None
|
|
|
|
png_output_path = None # Define variable before try block for cleanup
|
|
try:
|
|
kml_dir_path = Path(config.KML_OUTPUT_DIRECTORY)
|
|
# Ensure output directory exists
|
|
try:
|
|
kml_dir_path.mkdir(parents=True, exist_ok=True)
|
|
except OSError as e:
|
|
logging.error(
|
|
f"{log_prefix} Failed to create KML directory '{kml_dir_path}': {e}"
|
|
)
|
|
return None
|
|
|
|
# --- 1. Save Processed SAR Image as Temporary PNG ---
|
|
ts_img = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
|
|
png_filename = f"sar_overlay_{ts_img}.png"
|
|
png_output_path = kml_dir_path / png_filename # Path defined here
|
|
kml_image_href = png_filename # Relative path for KML Icon href
|
|
try:
|
|
logging.debug(
|
|
f"{log_prefix} Saving processed SAR image to {png_output_path}..."
|
|
)
|
|
# Ensure uint8 and BGR before saving with OpenCV
|
|
img_to_save = sar_image_processed
|
|
if img_to_save.dtype != np.uint8:
|
|
img_to_save = np.clip(img_to_save, 0, 255).astype(np.uint8)
|
|
if img_to_save.ndim == 2:
|
|
img_to_save = cv2.cvtColor(img_to_save, cv2.COLOR_GRAY2BGR)
|
|
|
|
save_success = cv2.imwrite(str(png_output_path), img_to_save)
|
|
if not save_success:
|
|
raise IOError("cv2.imwrite failed to save PNG.")
|
|
logging.debug(f"{log_prefix} SAR overlay PNG saved successfully.")
|
|
except Exception as img_save_err:
|
|
logging.exception(f"{log_prefix} Failed to save SAR overlay PNG:")
|
|
return None # Cannot proceed without the image
|
|
|
|
# --- 2. Calculate Geographic Corners ---
|
|
corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
|
|
if corners_deg is None:
|
|
logging.error(f"{log_prefix} Failed to calculate SAR corners for KML.")
|
|
png_output_path.unlink(missing_ok=True) # Attempt cleanup
|
|
return None
|
|
|
|
# --- 3. Create KML Document ---
|
|
center_lon_deg = math.degrees(geo_info_radians["lon"])
|
|
center_lat_deg = math.degrees(geo_info_radians["lat"])
|
|
kml_doc_name = f"GE_All_{ts_img}"
|
|
kml = simplekml.Kml(name=kml_doc_name)
|
|
|
|
# --- 4. Set LookAt View (Center on SAR) ---
|
|
try:
|
|
width_km = (
|
|
geo_info_radians.get("scale_x", 1.0)
|
|
* geo_info_radians.get("width_px", 1000)
|
|
) / 1000.0
|
|
height_km = (
|
|
geo_info_radians.get("scale_y", 1.0)
|
|
* geo_info_radians.get("height_px", 1000)
|
|
) / 1000.0
|
|
view_altitude_m = max(width_km, height_km) * 2500.0
|
|
view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0))
|
|
except:
|
|
view_altitude_m = 25000.0
|
|
|
|
kml.document.lookat.longitude = center_lon_deg
|
|
kml.document.lookat.latitude = center_lat_deg
|
|
kml.document.lookat.range = view_altitude_m
|
|
kml.document.lookat.tilt = 0
|
|
kml.document.lookat.heading = 0
|
|
kml.document.lookat.altitudemode = simplekml.AltitudeMode.clamptoground
|
|
|
|
# --- 5. Add Placemarks (Points) ---
|
|
style_sar_center = simplekml.Style()
|
|
style_sar_center.iconstyle.icon.href = (
|
|
"http://maps.google.com/mapfiles/kml/paddle/red-stars.png"
|
|
)
|
|
style_sar_center.iconstyle.scale = 1.1
|
|
style_mouse_on_sar = simplekml.Style()
|
|
style_mouse_on_sar.iconstyle.icon.href = (
|
|
"http://maps.google.com/mapfiles/kml/paddle/blu-circle.png"
|
|
)
|
|
style_mouse_on_sar.iconstyle.scale = 1.1
|
|
style_mouse_on_map = simplekml.Style()
|
|
style_mouse_on_map.iconstyle.icon.href = (
|
|
"http://maps.google.com/mapfiles/kml/paddle/grn-diamond.png"
|
|
)
|
|
style_mouse_on_map.iconstyle.scale = 1.1
|
|
|
|
folder_points = kml.newfolder(name="Points")
|
|
logging.debug(f"{log_prefix} Adding {len(points_data)} placemarks to KML.")
|
|
for lat, lon, name, desc in points_data:
|
|
point = folder_points.newpoint(name=name)
|
|
point.coords = [(lon, lat)]
|
|
if desc:
|
|
point.description = desc
|
|
# Apply style based on name
|
|
if name == "SAR Center":
|
|
point.style = style_sar_center
|
|
elif name == "Mouse on SAR":
|
|
point.style = style_mouse_on_sar
|
|
elif name == "Mouse on Map":
|
|
point.style = style_mouse_on_map
|
|
|
|
# --- 6. Add SAR Footprint Polygon ---
|
|
folder_footprint = kml.newfolder(name="SAR Footprint")
|
|
outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [
|
|
(corners_deg[0][0], corners_deg[0][1], 0)
|
|
]
|
|
polygon = folder_footprint.newpolygon(
|
|
name="SAR Outline",
|
|
outerboundaryis=outer_boundary
|
|
)
|
|
polygon.extrude = 0
|
|
polygon.altitudemode = simplekml.AltitudeMode.clamptoground
|
|
polygon.style.linestyle.color = simplekml.Color.red
|
|
polygon.style.linestyle.width = 2
|
|
polygon.style.polystyle.fill = 0 # No fill
|
|
|
|
# --- 7. Add SAR Ground Overlay using gx:LatLonQuad (Manual XML) ---
|
|
folder_overlay = kml.newfolder(name="SAR Overlay")
|
|
# Prepare coordinates string: BL, BR, TR, TL
|
|
bl_lon, bl_lat = corners_deg[3]
|
|
br_lon, br_lat = corners_deg[2]
|
|
tr_lon, tr_lat = corners_deg[1]
|
|
tl_lon, tl_lat = corners_deg[0]
|
|
coord_string = (
|
|
f"{bl_lon:.8f},{bl_lat:.8f},0 "
|
|
f"{br_lon:.8f},{br_lat:.8f},0 "
|
|
f"{tr_lon:.8f},{tr_lat:.8f},0 "
|
|
f"{tl_lon:.8f},{tl_lat:.8f},0"
|
|
)
|
|
|
|
kml_added_manually = False
|
|
if _lxml_available: # <<< Use global flag
|
|
try:
|
|
kml_folder_element = getattr(folder_overlay, '_elem', None)
|
|
if kml_folder_element is None:
|
|
raise AttributeError("Cannot access internal simplekml element (_elem).")
|
|
|
|
GX_NAMESPACE = "http://www.google.com/kml/ext/2.2"
|
|
GX = "{%s}" % GX_NAMESPACE
|
|
|
|
go_elem = etree.Element("GroundOverlay")
|
|
etree.SubElement(go_elem, "name").text = "SAR Image"
|
|
icon_elem = etree.SubElement(go_elem, "Icon")
|
|
etree.SubElement(icon_elem, "href").text = kml_image_href
|
|
etree.SubElement(go_elem, "altitudeMode").text = "clampToGround"
|
|
quad_elem = etree.SubElement(go_elem, GX + "LatLonQuad")
|
|
etree.SubElement(quad_elem, "coordinates").text = coord_string
|
|
kml_folder_element.append(go_elem)
|
|
kml_added_manually = True
|
|
logging.debug(f"{log_prefix} Manually added GroundOverlay with gx:LatLonQuad using lxml.")
|
|
|
|
except ImportError: # Should not happen if _lxml_available is True
|
|
logging.error(f"{log_prefix} lxml import failed despite initial check.")
|
|
# Do NOT assign to _lxml_available here (avoid UnboundLocalError)
|
|
except AttributeError as ae:
|
|
logging.error(f"{log_prefix} Error accessing simplekml internal element for manual KML addition: {ae}")
|
|
except Exception as xml_err:
|
|
logging.exception(f"{log_prefix} Error manually constructing KML overlay XML: {xml_err}")
|
|
|
|
# --- Fallback if manual addition failed or lxml not available ---
|
|
if not kml_added_manually:
|
|
logging.warning(f"{log_prefix} Failed to add gx:LatLonQuad. Falling back to standard GroundOverlay with LatLonBox/Rotation (may look distorted).")
|
|
ground = folder_overlay.newgroundoverlay(name="SAR Image (Fallback)")
|
|
ground.icon.href = kml_image_href
|
|
lons = [c[0] for c in corners_deg]
|
|
lats = [c[1] for c in corners_deg]
|
|
north = max(lats)
|
|
south = min(lats)
|
|
east = max(lons)
|
|
west = min(lons)
|
|
orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0))
|
|
ground.latlonbox.north = north
|
|
ground.latlonbox.south = south
|
|
ground.latlonbox.east = east
|
|
ground.latlonbox.west = west
|
|
ground.latlonbox.rotation = -orientation_deg
|
|
ground.altitudemode = simplekml.AltitudeMode.clamptoground
|
|
|
|
# --- 8. Save Composite KML File ---
|
|
ts_kml = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
|
|
kml_filename = f"composite_{ts_kml}.kml"
|
|
kml_output_path = kml_dir_path / kml_filename
|
|
|
|
logging.debug(f"{log_prefix} Saving Composite KML to: {kml_output_path}")
|
|
kml_output_path.parent.mkdir(parents=True, exist_ok=True) # Ensure again
|
|
kml.save(str(kml_output_path))
|
|
|
|
logging.debug(f"{log_prefix} Composite KML file created successfully.")
|
|
return str(kml_output_path) # Return the full path
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error generating composite KML file:")
|
|
# Attempt cleanup of the PNG if it was created
|
|
if png_output_path is not None and png_output_path.exists():
|
|
try:
|
|
png_output_path.unlink()
|
|
logging.debug(f"{log_prefix} Cleaned up temporary PNG: {png_output_path}")
|
|
except OSError:
|
|
logging.warning(f"{log_prefix} Failed to cleanup temporary PNG: {png_output_path}")
|
|
return None
|
|
|
|
|
|
def launch_google_earth(kml_path: Optional[str]):
|
|
"""
|
|
Attempts to open a KML file using the system's default application
|
|
or google-earth-pro directly.
|
|
|
|
Args:
|
|
kml_path (Optional[str]): The path to the KML file to launch.
|
|
"""
|
|
log_prefix = "[Utils Launch GE]"
|
|
if not kml_path or not os.path.exists(kml_path):
|
|
logging.error(
|
|
f"{log_prefix} Cannot launch: KML file path invalid or not found: {kml_path}"
|
|
)
|
|
return
|
|
logging.info(f"{log_prefix} Attempting launch for KML: {kml_path}")
|
|
try:
|
|
if sys.platform == "win32":
|
|
# Use os.startfile on Windows
|
|
os.startfile(kml_path)
|
|
elif sys.platform == "darwin":
|
|
# Use 'open' command on macOS
|
|
subprocess.run(["open", kml_path], check=True)
|
|
else:
|
|
# Use 'google-earth-pro' or 'xdg-open' on Linux/Unix
|
|
cmd = shutil.which("google-earth-pro") or shutil.which("xdg-open")
|
|
if cmd:
|
|
logging.debug(f"{log_prefix} Using command: {cmd}")
|
|
if cmd.endswith("pro"):
|
|
# Popen for GE Pro to run in background potentially
|
|
subprocess.Popen([cmd, kml_path])
|
|
else:
|
|
# run for xdg-open, wait for it?
|
|
subprocess.run([cmd, kml_path], check=True)
|
|
else:
|
|
# Log error if no suitable command found
|
|
err_msg = "Neither google-earth-pro nor xdg-open found in PATH."
|
|
logging.error(f"{log_prefix} {err_msg}")
|
|
raise FileNotFoundError(err_msg)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error launching KML handler:")
|
|
|
|
|
|
def cleanup_kml_output_directory(kml_directory: str, max_files_to_keep: int):
|
|
"""
|
|
Removes the oldest files (KML and associated PNGs) from the specified
|
|
directory if the total count exceeds the configured limit.
|
|
Relies on timestamps embedded in filenames (YYYYMMDD_HHMMSS_fff format).
|
|
|
|
Args:
|
|
kml_directory (str): The directory containing KML and PNG files.
|
|
max_files_to_keep (int): The maximum number of files (KML+PNG) to retain.
|
|
Cleanup is skipped if <= 0.
|
|
"""
|
|
log_prefix = "[Utils Cleanup KMLDir]"
|
|
# Skip if cleanup is disabled or directory invalid
|
|
if max_files_to_keep <= 0:
|
|
return
|
|
|
|
try:
|
|
kml_dir_path = Path(kml_directory)
|
|
if not kml_dir_path.is_dir():
|
|
logging.warning(
|
|
f"{log_prefix} Output directory not found: '{kml_directory}'. Cannot cleanup."
|
|
)
|
|
return
|
|
|
|
# --- 1. Identify Files and Extract Timestamps ---
|
|
all_files_with_ts: List[Tuple[Path, str]] = []
|
|
# Regex to capture the timestamp string from supported filename patterns
|
|
timestamp_pattern = re.compile(
|
|
r"^(?:sar_footprint_|composite_|multi_point_|sar_overlay_)"
|
|
r"(\d{8}_\d{6}_\d{3})\."
|
|
r"(?:kml|png)$"
|
|
)
|
|
|
|
logging.debug(f"{log_prefix} Scanning directory: {kml_dir_path}")
|
|
# Iterate through all files in the directory
|
|
for item in kml_dir_path.iterdir():
|
|
if item.is_file():
|
|
match = timestamp_pattern.match(item.name)
|
|
if match:
|
|
timestamp_str = match.group(1) # Extract timestamp string
|
|
all_files_with_ts.append((item, timestamp_str))
|
|
else:
|
|
pass # Skip files not matching the pattern
|
|
|
|
# --- 2. Check if Cleanup is Needed ---
|
|
current_total_count = len(all_files_with_ts)
|
|
logging.debug(f"{log_prefix} Found {current_total_count} potentially manageable files.")
|
|
if current_total_count <= max_files_to_keep:
|
|
logging.debug(f"{log_prefix} File count within limit ({max_files_to_keep}). No cleanup needed.")
|
|
return
|
|
|
|
# --- 3. Sort Files by Timestamp (Oldest First) ---
|
|
all_files_with_ts.sort(key=lambda x: x[1])
|
|
logging.debug(f"{log_prefix} Sorted files by timestamp.")
|
|
|
|
# --- 4. Determine Files to Delete ---
|
|
num_to_delete = current_total_count - max_files_to_keep
|
|
files_to_delete = all_files_with_ts[:num_to_delete] # Get the oldest ones
|
|
|
|
logging.info(
|
|
f"{log_prefix} Found {current_total_count} files. "
|
|
f"Attempting to delete oldest {num_to_delete} (KML and PNG)..."
|
|
)
|
|
|
|
# --- 5. Delete Oldest Files ---
|
|
deleted_count = 0
|
|
for file_path, ts_str in files_to_delete:
|
|
try:
|
|
file_path.unlink() # Delete the file
|
|
logging.debug(f"{log_prefix} Deleted old file: {file_path.name} (TS: {ts_str})")
|
|
deleted_count += 1
|
|
except Exception as delete_e:
|
|
logging.error(
|
|
f"{log_prefix} Failed to delete file '{file_path.name}': {delete_e}"
|
|
)
|
|
logging.info(
|
|
f"{log_prefix} Cleanup finished. Deleted {deleted_count}/{num_to_delete} files."
|
|
)
|
|
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error during KML/PNG cleanup process:")
|
|
|
|
|
|
# --- END OF FILE utils.py --- |