# --- START OF FILE utils.py --- # utils.py """ THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. Provides utility functions used throughout the application, including safe queue management (put, clear), coordinate formatting (decimal degrees to DMS and DMS string to decimal), KML generation and management, and opening web maps. Uses standardized logging prefixes. Drop counts are managed within AppState. """ # Standard library imports import queue import logging import math import os import datetime import sys import subprocess import shutil from pathlib import Path import webbrowser import urllib.parse import re # For DMS parsing from typing import Optional, Tuple, List, Any, Dict import tempfile import ctypes # Needed for format_ctypes_structure # Third-party imports (ensure these are installed) import numpy as np # --- Availability Flags & Library Imports --- # Define flags *before* use, handle ImportErrors _simplekml_available = False try: import simplekml _simplekml_available = True except ImportError: simplekml = None logging.warning( "[Utils Init] Library 'simplekml' not found. KML generation disabled. " "(pip install simplekml)" ) _pyproj_available = False try: import pyproj _pyproj_available = True except ImportError: pyproj = None logging.warning( "[Utils Init] Library 'pyproj' not found. " "Some geometric calculations (KML corners, BBox) may be less accurate. " "(pip install pyproj)" ) _cv2_available = False try: import cv2 _cv2_available = True except ImportError: cv2 = None logging.warning( "[Utils Init] Library 'opencv-python' not found. " "Cannot save images for KML overlays." ) _lxml_available = False try: from lxml import etree _lxml_available = True except ImportError: etree = None logging.warning("[Utils Init] Library 'lxml' not found. Cannot use gx:LatLonQuad fallback.") # --- Local application imports (after third-party and flag definitions) --- import config # Import config after potential library warnings _apply_color_palette_available = False try: # Import only after cv2 availability check might be safer, but fine here for now from image_processing import apply_color_palette _apply_color_palette_available = True except ImportError: apply_color_palette = None logging.warning( "[Utils Init] Function 'apply_color_palette' not found. " "Color palettes in KML overlay might not work." ) # --- Queue Management Functions --- def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None): """ Safely puts an item into a queue using non-blocking put. Increments specific drop counters in the AppState object if the queue is full. Discards item if the application is shutting down. Args: queue_obj (queue.Queue): The queue instance. item (Any): The item to put into the queue. queue_name (str): Name of the queue (for logging/stats). app_instance (Optional[ControlPanelApp]): Reference to the main app instance (for shutdown check and drop count). """ log_prefix = "[Utils Queue Put]" # Check shutdown flag via app_instance's state if ( app_instance and hasattr(app_instance, "state") and app_instance.state.shutting_down ): return # Silently discard if shutting down try: # Put item without blocking queue_obj.put(item, block=False) except queue.Full: # Log queue full warning logging.warning(f"{log_prefix} Queue '{queue_name}' is full. Dropping item.") # Increment the specific drop counter via the AppState instance if app_instance and hasattr(app_instance, "state"): try: app_instance.state.increment_dropped_count(queue_name) except Exception as e: logging.error( f"{log_prefix} Failed to increment drop count for '{queue_name}': {e}" ) else: logging.error( f"{log_prefix} Cannot increment drop count for '{queue_name}': " "App instance or state missing." ) except Exception as e: # Log any other unexpected errors during queue put logging.exception( f"{log_prefix} Unexpected error putting item onto queue '{queue_name}': {e}" ) def clear_queue(q: queue.Queue): """Removes all items currently in the specified queue.""" log_prefix = "[Utils Queue Clear]" try: q_size_before = q.qsize() # Get approximate size before clearing # Ensure thread safety during clear operation with q.mutex: q.queue.clear() # Reset task tracking if used if hasattr(q, 'unfinished_tasks'): q.unfinished_tasks = 0 logging.debug( f"{log_prefix} Cleared queue (approx size before: {q_size_before})." ) except Exception as e: logging.exception(f"{log_prefix} Error clearing queue: {e}") # --- Coordinate Formatting --- def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str: """ Converts decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string. Returns "N/A", "Invalid Lat/Lon", or "Error DMS" on error. Args: decimal_degrees (float): Coordinate value in decimal degrees. is_latitude (bool): True if latitude, False if longitude. Returns: str: Formatted DMS string or an error indicator. """ log_prefix = "[Utils DMS Conv]" coord_type = "Latitude" if is_latitude else "Longitude" try: # Input validation if ( not isinstance(decimal_degrees, (int, float)) or math.isnan(decimal_degrees) or math.isinf(decimal_degrees) ): logging.warning( f"{log_prefix} Invalid input type or value ({decimal_degrees}) " f"for {coord_type}." ) return "N/A" # Range check and direction determination if is_latitude: if not (-90.0 <= decimal_degrees <= 90.0): logging.warning( f"{log_prefix} Latitude {decimal_degrees} out of range." ) return "Invalid Lat" direction = "N" if decimal_degrees >= 0 else "S" deg_pad = 2 # Padding for degrees (e.g., 01°, 89°) else: # Longitude if not (-180.0 <= decimal_degrees <= 180.0): logging.warning( f"{log_prefix} Longitude {decimal_degrees} out of range." ) return "Invalid Lon" direction = "E" if decimal_degrees >= 0 else "W" deg_pad = 3 # Padding for degrees (e.g., 001°, 179°) # Calculations dd_abs = abs(decimal_degrees) degrees = math.floor(dd_abs) minutes_dec = (dd_abs - degrees) * 60.0 minutes = math.floor(minutes_dec) seconds = (minutes_dec - minutes) * 60.0 # Handle floating point precision near 60.0 for seconds/minutes if abs(seconds - 60.0) < 1e-9: seconds = 0.0 minutes += 1 if minutes == 60: minutes = 0 degrees += 1 # Recalculate absolute degrees if degrees incremented past range limit if is_latitude and degrees > 90: degrees = 90 minutes = 0 seconds = 0.0 if not is_latitude and degrees > 180: degrees = 180 minutes = 0 seconds = 0.0 # Format the final string dms_string = ( f"{degrees:0{deg_pad}d}° {minutes:02d}' {seconds:05.2f}\" {direction}" ) return dms_string except Exception as e: logging.exception( f"{log_prefix} Error converting {decimal_degrees} to DMS: {e}" ) return "Error DMS" def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]: """ Converts a DMS string (e.g., "46° 09' 50.24\" N", "009° 23' 14.11\" E") to decimal degrees. Args: dms_str (str): The DMS string to parse. is_latitude (bool): True if the string represents latitude, False for longitude. Returns: Optional[float]: The coordinate in decimal degrees, or None on parsing error. """ log_prefix = "[Utils DMS Parse]" if not dms_str or not isinstance(dms_str, str): logging.warning(f"{log_prefix} Invalid input string: {dms_str}") return None # Regex to capture degrees, minutes, seconds, and direction pattern = re.compile( r"^\s*(\d{1,3})\s*[°]\s*" # Degrees (1-3 digits) + ° symbol r"(\d{1,2})\s*[']\s*" # Minutes (1-2 digits) + ' symbol r"([\d.]+)\s*[\"”]\s*" # Seconds (digits, dot) + " or ” symbol r"([NSEWnsew])\s*$", # Direction (N,S,E,W case-insensitive) re.IGNORECASE, ) match = pattern.match(dms_str.strip()) if not match: logging.error(f"{log_prefix} Could not parse DMS string format: '{dms_str}'") return None try: # Extract components as floats degrees = float(match.group(1)) minutes = float(match.group(2)) seconds = float(match.group(3)) direction = match.group(4).upper() # Validate component ranges if not (0 <= minutes < 60 and 0 <= seconds < 60): logging.error( f"{log_prefix} Invalid minutes ({minutes}) or seconds ({seconds}) " f"in DMS: '{dms_str}'" ) return None # Validate direction based on coordinate type valid_dirs = ("N", "S") if is_latitude else ("E", "W") if direction not in valid_dirs: coord_type = "latitude" if is_latitude else "longitude" logging.error( f"{log_prefix} Invalid direction '{direction}' for {coord_type}: " f"'{dms_str}'" ) return None # Calculate decimal degrees decimal_degrees = degrees + (minutes / 60.0) + (seconds / 3600.0) # Apply sign based on direction if direction in ("S", "W"): decimal_degrees *= -1.0 # Final range check for the calculated decimal value if is_latitude and not (-90.0 <= decimal_degrees <= 90.0): logging.warning( f"{log_prefix} Calculated latitude {decimal_degrees:.7f} out of range." ) return None # Treat out of range as error if not is_latitude and not (-180.0 <= decimal_degrees <= 180.0): logging.warning( f"{log_prefix} Calculated longitude {decimal_degrees:.7f} out of range." ) return None logging.debug(f"{log_prefix} Parsed '{dms_str}' -> {decimal_degrees:.7f}") return decimal_degrees except ValueError as ve: # Handle errors during float conversion logging.error( f"{log_prefix} Error converting components to float: {ve} in '{dms_str}'" ) return None except Exception as e: logging.exception( f"{log_prefix} Unexpected error parsing DMS string '{dms_str}':" ) return None # --- Metadata Formatting Function --- def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0) -> str: """ Generates a formatted string representation of a ctypes Structure, handling nested structures, arrays, and basic types recursively. Args: structure: The ctypes.Structure instance to format. indent_level: The current indentation level for nested structures. Returns: A multi-line string representing the structure's content. """ indent = " " * indent_level # Define indentation string result = "" # Check if it's actually a structure with _fields_ attribute if not hasattr(structure, "_fields_"): # Fallback for non-structures or basic types passed accidentally return f"{indent}Value: {repr(structure)}\n" # Iterate through the fields defined in the structure for field_name, field_type in structure._fields_: try: # Get the value of the current field value = getattr(structure, field_name) formatted_value = "" # --- Handle Nested Structures --- if hasattr(field_type, "_fields_"): # If the field type itself is another Structure result += f"{indent}{field_name} ({field_type.__name__}):\n" # Recursively call formatting for the nested structure result += format_ctypes_structure(value, indent_level + 1) # --- Handle Arrays --- elif issubclass(field_type, ctypes.Array): array_len = getattr(field_type, "_length_", "N/A") elem_type = getattr(field_type, "_type_", "N/A") elem_name = getattr(elem_type, "__name__", "?") # Special handling for byte arrays (show hex preview) if issubclass(elem_type, (ctypes.c_byte, ctypes.c_ubyte)): try: # Attempt conversion to bytes for hex preview byte_value = bytes(value[:16]) # Limit preview length preview = byte_value.hex().upper() if len(value) > 16: preview += "..." formatted_value = f"<{elem_name}[{array_len}] Data: {preview}>" except: # Fallback if conversion to bytes fails formatted_value = f"<{elem_name}[{array_len}] (Preview N/A)>" # Handle simple numeric arrays (show first few elements) elif isinstance(value, (list, tuple)): preview = str(list(value[:8])) # Limit preview length if len(value) > 8: # Indicate truncation preview = preview[:-1] + ", ...]" formatted_value = f"<{elem_name}[{array_len}] Data: {preview}>" else: # For other array types, just show type and size formatted_value = f"<{elem_name}[{array_len}]>" result += f"{indent}{field_name}: {formatted_value}\n" # --- Handle Basic Types --- else: # Special Formatting for Known Fields if field_name in ( "LATITUDE", "LONGITUDE", "ORIENTATION", "POI_LATITUDE", "POI_LONGITUDE", "POI_ORIENTATION", ) and isinstance(value, float): if math.isfinite(value): deg_value = math.degrees(value) prec = 2 if "ORIENTATION" in field_name else 6 formatted_value = f"{value:.7f} rad ({deg_value:.{prec}f} deg)" else: formatted_value = f"{value} rad (Non-finite)" elif field_name in ( "SFP_MARKER", "SFP_PT_SPARE", "SFP_TAG", "SFP_SRC", "SFP_TID", "SFP_FLAGS", "SFP_WIN", "SFP_ERR", "SFP_ERR_INFO", "SFP_RECTYPE", "SFP_RECSPARE", "SFP_PLDAP", "SFP_PLEXT", ) and isinstance(value, int): formatted_value = f"0x{value:02X} ({value})" elif field_name in ("SFP_DIRECTION", "SFP_FLOW") and isinstance( value, int ): char_repr = chr(value) if 32 <= value <= 126 else "?" formatted_value = f"0x{value:02X} ('{char_repr}')" elif isinstance(value, bytes): try: decoded = value.decode("ascii", errors="replace") hex_preview = value.hex().upper()[:32] # Limit hex preview suffix = "..." if len(value) > 16 else "" formatted_value = f"'{decoded}' (hex: {hex_preview}{suffix})" except: formatted_value = repr(value) # Fallback if not decodable else: # Default representation for other types formatted_value = repr(value) result += f"{indent}{field_name}: {formatted_value}\n" except AttributeError: # Handle cases where getattr might fail result += f"{indent}{field_name}: \n" except Exception as e: # Catch any other formatting errors for a specific field result += f"{indent}{field_name}: \n" return result # --- Google Maps Launcher --- def open_google_maps(latitude_deg: float, longitude_deg: float): """Opens the default web browser to Google Maps centered on the specified coordinates.""" log_prefix = "[Utils Gmaps]" # Validate coordinates if not ( isinstance(latitude_deg, (int, float)) and math.isfinite(latitude_deg) and -90 <= latitude_deg <= 90 ): logging.error(f"{log_prefix} Invalid latitude: {latitude_deg}.") return if not ( isinstance(longitude_deg, (int, float)) and math.isfinite(longitude_deg) and -180 <= longitude_deg <= 180 ): logging.error(f"{log_prefix} Invalid longitude: {longitude_deg}.") return try: # Construct URL using the search API for better pinning query_str = f"{latitude_deg:.7f},{longitude_deg:.7f}" # URL encode the query string encoded_query = urllib.parse.quote(query_str) gmaps_url = f"https://www.google.com/maps/search/?api=1&query={encoded_query}" logging.info(f"{log_prefix} Opening Google Maps URL: {gmaps_url}") # Open in a new tab if possible opened_ok = webbrowser.open_new_tab(gmaps_url) if not opened_ok: logging.warning(f"{log_prefix} webbrowser.open_new_tab returned False.") # Fallback attempt if new tab fails webbrowser.open(gmaps_url) except Exception as e: logging.exception(f"{log_prefix} Failed to open Google Maps URL:") # --- KML Generation and Management --- def _calculate_geo_corners_for_kml( geo_info_radians: Dict[str, Any], ) -> Optional[List[Tuple[float, float]]]: """Internal helper to calculate geographic corners (degrees) from geo_info (radians). Requires pyproj.""" # Check if pyproj is available if not _pyproj_available: logging.error("[Utils KML Calc] pyproj library needed for corner calculation.") return None log_prefix = "[Utils KML Calc]" try: # Define the geodetic calculator (WGS84 ellipsoid) geod = pyproj.Geod(ellps="WGS84") # Extract necessary info from the dictionary center_lat_rad = geo_info_radians["lat"] center_lon_rad = geo_info_radians["lon"] orient_rad = geo_info_radians["orientation"] calc_orient_rad = -orient_rad # Angle used for projection ref_x = geo_info_radians["ref_x"] ref_y = geo_info_radians["ref_y"] scale_x = geo_info_radians["scale_x"] scale_y = geo_info_radians["scale_y"] width = geo_info_radians["width_px"] height = geo_info_radians["height_px"] # Validate inputs if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0): logging.error(f"{log_prefix} Invalid scale or dimensions in geo_info.") return None # Define corner pixel coordinates relative to reference pixel corners_pixel_delta = [ (0 - ref_x, ref_y - 0), # Top-Left Delta (width - 1 - ref_x, ref_y - 0), # Top-Right Delta (width - 1 - ref_x, ref_y - (height - 1)), # Bottom-Right Delta (0 - ref_x, ref_y - (height - 1)), # Bottom-Left Delta ] # Convert pixel deltas to meter deltas (unrotated) corners_meters = [ (dx * scale_x, dy * scale_y) for dx, dy in corners_pixel_delta ] # Rotate meter deltas based on orientation if significant corners_meters_rotated = corners_meters if abs(calc_orient_rad) > 1e-6: # Only rotate if angle is non-zero cos_o = math.cos(calc_orient_rad) sin_o = math.sin(calc_orient_rad) corners_meters_rotated = [ (dx * cos_o - dy * sin_o, dx * sin_o + dy * cos_o) for dx, dy in corners_meters ] # Calculate geographic coordinates for each corner using forward projection corners_geo_deg = [] center_lon_deg = math.degrees(center_lon_rad) center_lat_deg = math.degrees(center_lat_rad) # Loop through rotated meter deltas for dx_m, dy_m in corners_meters_rotated: # Calculate distance and azimuth from center to corner distance = math.hypot(dx_m, dy_m) azimuth = math.degrees(math.atan2(dx_m, dy_m)) % 360.0 # Use pyproj geod.fwd for accurate projection lon, lat, _ = geod.fwd(center_lon_deg, center_lat_deg, azimuth, distance) corners_geo_deg.append((lon, lat)) # Store (longitude, latitude) # Ensure we have exactly 4 corners if len(corners_geo_deg) == 4: return corners_geo_deg else: logging.error(f"{log_prefix} Incorrect number of corners calculated.") return None except KeyError as ke: logging.error(f"{log_prefix} Missing required key in geo_info_radians: {ke}") return None except Exception as e: logging.exception(f"{log_prefix} Error calculating geographic corners for KML:") return None def generate_sar_kml(geo_info_radians: Dict[str, Any], output_path: str) -> bool: """ Generates a KML file representing the SAR footprint area as a polygon. Args: geo_info_radians (Dict[str, Any]): Dictionary with SAR georeferencing. output_path (str): The full path where the KML file should be saved. Returns: bool: True if KML generation and saving were successful, False otherwise. """ log_prefix = "[Utils KML Gen]" # Check for required libraries if not _simplekml_available or not _pyproj_available: logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.") return False # Check for valid input geo_info if not geo_info_radians or not geo_info_radians.get("valid", False): logging.warning(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.") return False try: # Calculate geographic corner coordinates corners_deg = _calculate_geo_corners_for_kml(geo_info_radians) if corners_deg is None: logging.error(f"{log_prefix} Failed to calculate KML corners.") return False # Error already logged by helper # Extract center and orientation for LookAt view center_lon_deg = math.degrees(geo_info_radians["lon"]) center_lat_deg = math.degrees(geo_info_radians["lat"]) orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0)) # Estimate a reasonable viewing altitude based on image size try: width_km = ( geo_info_radians.get("scale_x", 1.0) * geo_info_radians.get("width_px", 1000) ) / 1000.0 height_km = ( geo_info_radians.get("scale_y", 1.0) * geo_info_radians.get("height_px", 1000) ) / 1000.0 view_altitude_m = max(width_km, height_km) * 2000.0 view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0)) except: view_altitude_m = 10000.0 # Default altitude on error # Create KML object kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}") # Set LookAt view kml.document.lookat.longitude = center_lon_deg kml.document.lookat.latitude = center_lat_deg kml.document.lookat.range = view_altitude_m kml.document.lookat.tilt = 45.0 kml.document.lookat.heading = orientation_deg % 360.0 kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground # Create the polygon footprint outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [ (corners_deg[0][0], corners_deg[0][1], 0) ] polygon = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary) # Style the polygon polygon.style.linestyle.color = simplekml.Color.red polygon.style.linestyle.width = 2 polygon.style.polystyle.color = simplekml.Color.changealphaint( 100, simplekml.Color.red ) polygon.style.polystyle.outline = 1 # Save the KML file kml.save(output_path) logging.debug(f"{log_prefix} KML file saved successfully: {output_path}") return True except Exception as e: logging.exception(f"{log_prefix} Error generating or saving KML file:") return False def generate_lookat_and_point_kml( latitude_deg: float, longitude_deg: float, altitude_m: float = 10000.0, tilt: float = 45.0, heading: float = 0.0, placemark_name: str = "Selected Location", placemark_desc: Optional[str] = None, ) -> Optional[str]: """ Generates a temporary KML file containing a LookAt element and a Placemark point. Args: latitude_deg (float): Target latitude in decimal degrees. longitude_deg (float): Target longitude in decimal degrees. altitude_m (float): Viewing altitude in meters for LookAt. tilt (float): Camera tilt angle for LookAt. heading (float): Camera heading/azimuth for LookAt. placemark_name (str): Name for the placemark. placemark_desc (Optional[str]): Optional description for the placemark. Returns: Optional[str]: Path to the temporary KML file, or None on error. """ log_prefix = "[Utils KML LookAtPoint]" if not _simplekml_available: logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.") return None # Validate coordinates if not ( isinstance(latitude_deg, (int, float)) and math.isfinite(latitude_deg) and -90 <= latitude_deg <= 90 ): logging.error(f"{log_prefix} Invalid latitude provided: {latitude_deg}.") return None if not ( isinstance(longitude_deg, (int, float)) and math.isfinite(longitude_deg) and -180 <= longitude_deg <= 180 ): logging.error(f"{log_prefix} Invalid longitude provided: {longitude_deg}.") return None try: kml = simplekml.Kml(name=placemark_name) # Set LookAt parameters kml.document.lookat.longitude = longitude_deg kml.document.lookat.latitude = latitude_deg kml.document.lookat.range = max(100.0, altitude_m) kml.document.lookat.tilt = tilt kml.document.lookat.heading = heading % 360.0 kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground # Add the Placemark point point = kml.newpoint(name=placemark_name) point.coords = [(longitude_deg, latitude_deg)] # (lon, lat) order if placemark_desc: point.description = placemark_desc # Create a temporary file with tempfile.NamedTemporaryFile( mode="w", suffix=".kml", delete=False, encoding="utf-8" ) as temp_kml: temp_kml_path = temp_kml.name logging.debug( f"{log_prefix} Saving temporary LookAt+Point KML to: {temp_kml_path}" ) temp_kml.write(kml.kml()) logging.debug(f"{log_prefix} LookAt+Point KML file created successfully.") return temp_kml_path except Exception as e: logging.exception(f"{log_prefix} Error generating LookAt+Point KML file:") return None def generate_composite_kml( points_data: List[Tuple[float, float, str, Optional[str]]], sar_image_processed: np.ndarray, # Assumed BGR uint8 geo_info_radians: Dict[str, Any], ) -> Optional[str]: """ Generates a KML file including placemarks, SAR footprint polygon, and a GroundOverlay using gx:LatLonQuad for accurate image placement. Saves the KML and a temporary PNG overlay image to the configured KML output directory. Uses lxml to insert the gx:LatLonQuad tag after initial KML generation. """ log_prefix = "[Utils KML Composite]" # Library checks (using flags defined globally) if not _simplekml_available or not _pyproj_available: logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.") return None if not _cv2_available: # Check OpenCV flag logging.error(f"{log_prefix} Cannot generate KML overlay: OpenCV (cv2) missing.") return None # --- >>> NEW LXML Check <<< --- if not _lxml_available: logging.error(f"{log_prefix} Cannot generate KML with gx:LatLonQuad: lxml missing.") # Decide if you want to proceed with fallback or stop logging.warning(f"{log_prefix} Falling back to standard GroundOverlay (may look distorted).") # If you want to completely block fallback, uncomment the next line: # return None # --- >>> END NEW LXML Check <<< --- # Input data checks if sar_image_processed is None or sar_image_processed.size == 0: logging.error(f"{log_prefix} Cannot generate KML: Invalid SAR image provided.") return None if not geo_info_radians or not geo_info_radians.get("valid", False): logging.error(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.") return None png_output_path = None # Define variable before try block for cleanup kml_output_path = None # Define variable for final KML path try: kml_dir_path = Path(config.KML_OUTPUT_DIRECTORY) # Ensure output directory exists try: kml_dir_path.mkdir(parents=True, exist_ok=True) except OSError as e: logging.error( f"{log_prefix} Failed to create KML directory '{kml_dir_path}': {e}" ) return None # --- 1. Save Processed SAR Image as Temporary PNG --- ts_img = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] png_filename = f"sar_overlay_{ts_img}.png" png_output_path = kml_dir_path / png_filename # Path defined here kml_image_href = png_filename # Relative path for KML Icon href try: logging.debug( f"{log_prefix} Saving processed SAR image to {png_output_path}..." ) img_to_save = sar_image_processed if img_to_save.dtype != np.uint8: img_to_save = np.clip(img_to_save, 0, 255).astype(np.uint8) if img_to_save.ndim == 2: img_to_save = cv2.cvtColor(img_to_save, cv2.COLOR_GRAY2BGR) save_success = cv2.imwrite(str(png_output_path), img_to_save) if not save_success: raise IOError("cv2.imwrite failed to save PNG.") logging.debug(f"{log_prefix} SAR overlay PNG saved successfully.") except Exception as img_save_err: logging.exception(f"{log_prefix} Failed to save SAR overlay PNG:") return None # Cannot proceed without the image # --- 2. Calculate Geographic Corners --- corners_deg = _calculate_geo_corners_for_kml(geo_info_radians) if corners_deg is None: logging.error(f"{log_prefix} Failed to calculate SAR corners for KML.") png_output_path.unlink(missing_ok=True) # Attempt cleanup return None # --- 3. Create KML Document using simplekml --- center_lon_deg = math.degrees(geo_info_radians["lon"]) center_lat_deg = math.degrees(geo_info_radians["lat"]) kml_doc_name = f"GE_All_{ts_img}" kml = simplekml.Kml(name=kml_doc_name) # --- 4. Set LookAt View (Center on SAR) --- # (Code for LookAt generation is unchanged) try: width_km = ( geo_info_radians.get("scale_x", 1.0) * geo_info_radians.get("width_px", 1000) ) / 1000.0 height_km = ( geo_info_radians.get("scale_y", 1.0) * geo_info_radians.get("height_px", 1000) ) / 1000.0 view_altitude_m = max(width_km, height_km) * 2500.0 view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0)) except: view_altitude_m = 25000.0 kml.document.lookat.longitude = center_lon_deg kml.document.lookat.latitude = center_lat_deg kml.document.lookat.range = view_altitude_m kml.document.lookat.tilt = 0 kml.document.lookat.heading = 0 kml.document.lookat.altitudemode = simplekml.AltitudeMode.clamptoground # --- 5. Add Placemarks (Points) --- # (Code for Placemarks and styles is unchanged) style_sar_center = simplekml.Style() style_sar_center.iconstyle.icon.href = ( "http://maps.google.com/mapfiles/kml/paddle/red-stars.png" ) style_sar_center.iconstyle.scale = 1.1 style_mouse_on_sar = simplekml.Style() style_mouse_on_sar.iconstyle.icon.href = ( "http://maps.google.com/mapfiles/kml/paddle/blu-circle.png" ) style_mouse_on_sar.iconstyle.scale = 1.1 style_mouse_on_map = simplekml.Style() style_mouse_on_map.iconstyle.icon.href = ( "http://maps.google.com/mapfiles/kml/paddle/grn-diamond.png" ) style_mouse_on_map.iconstyle.scale = 1.1 folder_points = kml.newfolder(name="Points") logging.debug(f"{log_prefix} Adding {len(points_data)} placemarks to KML.") for lat, lon, name, desc in points_data: point = folder_points.newpoint(name=name) point.coords = [(lon, lat)] if desc: point.description = desc # Apply style based on name if name == "SAR Center": point.style = style_sar_center elif name == "Mouse on SAR": point.style = style_mouse_on_sar elif name == "Mouse on Map": point.style = style_mouse_on_map # --- 6. Add SAR Footprint Polygon --- # (Code for Polygon is unchanged) folder_footprint = kml.newfolder(name="SAR Footprint") outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [ (corners_deg[0][0], corners_deg[0][1], 0) ] polygon = folder_footprint.newpolygon( name="SAR Outline", outerboundaryis=outer_boundary ) polygon.extrude = 0 polygon.altitudemode = simplekml.AltitudeMode.clamptoground polygon.style.linestyle.color = simplekml.Color.red polygon.style.linestyle.width = 2 polygon.style.polystyle.fill = 0 # No fill # --- >>> START OF MODIFIED OVERLAY SECTION <<< --- # --- 7. Add SAR Ground Overlay --- folder_overlay = kml.newfolder(name="SAR Overlay") # Create folder first if _lxml_available: # Add a simple, empty GroundOverlay using simplekml as a placeholder # We will modify its content later using lxml placeholder_overlay = folder_overlay.newgroundoverlay(name="SAR Image") placeholder_overlay.icon.href = kml_image_href placeholder_overlay.altitudemode = simplekml.AltitudeMode.clamptoground # Add a dummy LatLonBox which we will remove later placeholder_overlay.latlonbox.north = 0 placeholder_overlay.latlonbox.south = 0 placeholder_overlay.latlonbox.east = 0 placeholder_overlay.latlonbox.west = 0 placeholder_overlay.latlonbox.rotation = 0 logging.debug(f"{log_prefix} Added placeholder GroundOverlay using simplekml.") else: # --- Fallback if lxml not available --- logging.warning(f"{log_prefix} lxml not available. Falling back to standard GroundOverlay (may look distorted).") ground = folder_overlay.newgroundoverlay(name="SAR Image (Fallback)") ground.icon.href = kml_image_href lons = [c[0] for c in corners_deg] lats = [c[1] for c in corners_deg] north = max(lats) south = min(lats) east = max(lons) west = min(lons) orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0)) ground.latlonbox.north = north ground.latlonbox.south = south ground.latlonbox.east = east ground.latlonbox.west = west ground.latlonbox.rotation = -orientation_deg # KML rotation is often counter-clockwise ground.altitudemode = simplekml.AltitudeMode.clamptoground # --- 8. Save INITIAL KML File (potentially with placeholder overlay) --- ts_kml = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] kml_filename = f"composite_{ts_kml}.kml" kml_output_path = kml_dir_path / kml_filename logging.debug(f"{log_prefix} Saving Initial KML to: {kml_output_path}") kml_output_path.parent.mkdir(parents=True, exist_ok=True) # Ensure again kml.save(str(kml_output_path)) # --- 9. Post-process KML with LXML to add gx:LatLonQuad (if available) --- if _lxml_available: logging.debug(f"{log_prefix} Post-processing KML with lxml to insert gx:LatLonQuad...") try: # Define namespaces NS_KML = "http://www.opengis.net/kml/2.2" NS_GX = "http://www.google.com/kml/ext/2.2" nsmap = {'gx': NS_GX, None: NS_KML} # None for default KML ns # Parse the saved KML file parser = etree.XMLParser(remove_blank_text=True) tree = etree.parse(str(kml_output_path), parser) root = tree.getroot() # Find the placeholder GroundOverlay element using XPath # Search within the "SAR Overlay" folder for a GroundOverlay named "SAR Image" xpath_expr = f'.//kml:Folder[kml:name="SAR Overlay"]/kml:GroundOverlay[kml:name="SAR Image"]' found_overlays = root.xpath(xpath_expr, namespaces={'kml': NS_KML}) if not found_overlays: logging.error(f"{log_prefix} lxml: Could not find the placeholder GroundOverlay element in KML.") # Proceed without modification? Or raise error? For now, log and continue. else: overlay_element = found_overlays[0] # Assume only one logging.debug(f"{log_prefix} lxml: Found placeholder GroundOverlay.") # Remove the dummy LatLonBox element latlonbox_elements = overlay_element.xpath('./kml:LatLonBox', namespaces={'kml': NS_KML}) for box in latlonbox_elements: logging.debug(f"{log_prefix} lxml: Removing dummy LatLonBox.") overlay_element.remove(box) # Prepare coordinates string: BL, BR, TR, TL (lon,lat,alt) bl_lon, bl_lat = corners_deg[3] br_lon, br_lat = corners_deg[2] tr_lon, tr_lat = corners_deg[1] tl_lon, tl_lat = corners_deg[0] coord_string = ( f"{bl_lon:.8f},{bl_lat:.8f},0 " f"{br_lon:.8f},{br_lat:.8f},0 " f"{tr_lon:.8f},{tr_lat:.8f},0 " f"{tl_lon:.8f},{tl_lat:.8f},0" ) # Create and append the gx:LatLonQuad element quad_elem = etree.Element(etree.QName(NS_GX, "LatLonQuad")) etree.SubElement(quad_elem, "coordinates").text = coord_string overlay_element.append(quad_elem) logging.debug(f"{log_prefix} lxml: Successfully inserted gx:LatLonQuad element.") # Overwrite the KML file with the modified tree logging.debug(f"{log_prefix} lxml: Saving modified KML back to {kml_output_path}...") tree.write(str(kml_output_path), pretty_print=True, xml_declaration=True, encoding='UTF-8') logging.info(f"{log_prefix} KML successfully updated with gx:LatLonQuad using lxml.") except Exception as lxml_err: logging.exception(f"{log_prefix} Error during lxml post-processing:") # KML file might be left in intermediate state (with placeholder overlay) # --- >>> END OF MODIFIED OVERLAY SECTION <<< --- logging.debug(f"{log_prefix} Composite KML generation process complete.") return str(kml_output_path) # Return the full path except Exception as e: logging.exception(f"{log_prefix} Error generating composite KML file:") # Attempt cleanup of the PNG if it was created if png_output_path is not None and png_output_path.exists(): try: png_output_path.unlink() logging.debug(f"{log_prefix} Cleaned up temporary PNG: {png_output_path}") except OSError: logging.warning(f"{log_prefix} Failed to cleanup temporary PNG: {png_output_path}") return None def launch_google_earth(kml_path: Optional[str]): """ Attempts to open a KML file using the system's default application or google-earth-pro directly. Args: kml_path (Optional[str]): The path to the KML file to launch. """ log_prefix = "[Utils Launch GE]" if not kml_path or not os.path.exists(kml_path): logging.error( f"{log_prefix} Cannot launch: KML file path invalid or not found: {kml_path}" ) return logging.info(f"{log_prefix} Attempting launch for KML: {kml_path}") try: if sys.platform == "win32": # Use os.startfile on Windows os.startfile(kml_path) elif sys.platform == "darwin": # Use 'open' command on macOS subprocess.run(["open", kml_path], check=True) else: # Use 'google-earth-pro' or 'xdg-open' on Linux/Unix cmd = shutil.which("google-earth-pro") or shutil.which("xdg-open") if cmd: logging.debug(f"{log_prefix} Using command: {cmd}") if cmd.endswith("pro"): # Popen for GE Pro to run in background potentially subprocess.Popen([cmd, kml_path]) else: # run for xdg-open, wait for it? subprocess.run([cmd, kml_path], check=True) else: # Log error if no suitable command found err_msg = "Neither google-earth-pro nor xdg-open found in PATH." logging.error(f"{log_prefix} {err_msg}") raise FileNotFoundError(err_msg) except Exception as e: logging.exception(f"{log_prefix} Error launching KML handler:") def cleanup_kml_output_directory(kml_directory: str, max_files_to_keep: int): """ Removes the oldest files (KML and associated PNGs) from the specified directory if the total count exceeds the configured limit. Relies on timestamps embedded in filenames (YYYYMMDD_HHMMSS_fff format). Args: kml_directory (str): The directory containing KML and PNG files. max_files_to_keep (int): The maximum number of files (KML+PNG) to retain. Cleanup is skipped if <= 0. """ log_prefix = "[Utils Cleanup KMLDir]" # Skip if cleanup is disabled or directory invalid if max_files_to_keep <= 0: return try: kml_dir_path = Path(kml_directory) if not kml_dir_path.is_dir(): logging.warning( f"{log_prefix} Output directory not found: '{kml_directory}'. Cannot cleanup." ) return # --- 1. Identify Files and Extract Timestamps --- all_files_with_ts: List[Tuple[Path, str]] = [] # Regex to capture the timestamp string from supported filename patterns timestamp_pattern = re.compile( r"^(?:sar_footprint_|composite_|multi_point_|sar_overlay_)" r"(\d{8}_\d{6}_\d{3})\." r"(?:kml|png)$" ) logging.debug(f"{log_prefix} Scanning directory: {kml_dir_path}") # Iterate through all files in the directory for item in kml_dir_path.iterdir(): if item.is_file(): match = timestamp_pattern.match(item.name) if match: timestamp_str = match.group(1) # Extract timestamp string all_files_with_ts.append((item, timestamp_str)) else: pass # Skip files not matching the pattern # --- 2. Check if Cleanup is Needed --- current_total_count = len(all_files_with_ts) logging.debug(f"{log_prefix} Found {current_total_count} potentially manageable files.") if current_total_count <= max_files_to_keep: logging.debug(f"{log_prefix} File count within limit ({max_files_to_keep}). No cleanup needed.") return # --- 3. Sort Files by Timestamp (Oldest First) --- all_files_with_ts.sort(key=lambda x: x[1]) logging.debug(f"{log_prefix} Sorted files by timestamp.") # --- 4. Determine Files to Delete --- num_to_delete = current_total_count - max_files_to_keep files_to_delete = all_files_with_ts[:num_to_delete] # Get the oldest ones logging.info( f"{log_prefix} Found {current_total_count} files. " f"Attempting to delete oldest {num_to_delete} (KML and PNG)..." ) # --- 5. Delete Oldest Files --- deleted_count = 0 for file_path, ts_str in files_to_delete: try: file_path.unlink() # Delete the file logging.debug(f"{log_prefix} Deleted old file: {file_path.name} (TS: {ts_str})") deleted_count += 1 except Exception as delete_e: logging.error( f"{log_prefix} Failed to delete file '{file_path.name}': {delete_e}" ) logging.info( f"{log_prefix} Cleanup finished. Deleted {deleted_count}/{num_to_delete} files." ) except Exception as e: logging.exception(f"{log_prefix} Error during KML/PNG cleanup process:") # --- END OF FILE utils.py ---