# --- START OF FILE utils.py ---

# utils.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.

Provides utility functions used throughout the application, including safe
queue management (put, clear), coordinate formatting (decimal degrees to DMS
and DMS string to decimal), KML generation and management, and opening web
maps. Uses standardized logging prefixes. Drop counts are managed within
AppState.
"""

# Standard library imports
import queue
import logging
import math
import os
import datetime
import sys
import subprocess
import shutil
from pathlib import Path
import webbrowser
import urllib.parse
import re  # For DMS parsing
from typing import Optional, Tuple, List, Any, Dict  # Added List, Any, Dict
import tempfile
import ctypes  # Needed for format_ctypes_structure

# Import KML and GEO libraries, handling ImportError
try:
    import simplekml

    _simplekml_available = True
except ImportError:
    simplekml = None
    _simplekml_available = False
    logging.warning(
        "[Utils KML] Library 'simplekml' not found. KML generation disabled. "
        "(pip install simplekml)"
    )

try:
    import pyproj

    _pyproj_available = True
except ImportError:
    pyproj = None
    _pyproj_available = False
    # Pyproj is needed for accurate geo calculations, log warning if missing
    logging.warning(
        "[Utils Geo] Library 'pyproj' not found. "
        "Some geometric calculations (KML corners, BBox) may be less accurate. "
        "(pip install pyproj)"
    )


# --- Queue Management Functions ---


def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None):
    """
    Safely puts an item into a queue using non-blocking put.

    Increments the drop counter held by ``app_instance.state`` if the queue is
    full. Discards the item if the application is shutting down.

    Args:
        queue_obj (queue.Queue): The queue instance.
        item (Any): The item to put into the queue.
        queue_name (str): Name of the queue (for logging/stats).
        app_instance (Optional[ControlPanelApp]): Reference to the main app
            instance (for shutdown check and drop count).
    """
    log_prefix = "[Utils Queue Put]"

    # Resolve the state object defensively: a missing/None state must not
    # raise here, because this lookup happens outside any try block.
    app_state = (
        getattr(app_instance, "state", None) if app_instance is not None else None
    )

    # Silently discard the item while the application is shutting down
    if app_state is not None and getattr(app_state, "shutting_down", False):
        return

    try:
        # Put item without blocking
        queue_obj.put(item, block=False)
    except queue.Full:
        # Log queue full warning (consider reducing level if too noisy)
        logging.warning(f"{log_prefix} Queue '{queue_name}' is full. Dropping item.")
        if app_state is None:
            # No state object available to record the drop
            logging.error(
                f"{log_prefix} Cannot increment drop count for '{queue_name}': "
                "App instance or state missing."
            )
            return
        try:
            app_state.increment_dropped_count(queue_name)
        except Exception as e:
            # Counting a drop must never propagate into the producer
            logging.error(
                f"{log_prefix} Failed to increment drop count for '{queue_name}': {e}"
            )
    except Exception as e:
        # Log any other unexpected errors during queue put
        logging.exception(
            f"{log_prefix} Unexpected error putting item onto queue '{queue_name}': {e}"
        )


def clear_queue(q: queue.Queue):
    """
    Removes all items currently in the specified queue.

    Errors are logged and never raised.

    Args:
        q (queue.Queue): The queue to empty.
    """
    log_prefix = "[Utils Queue Clear]"
    try:
        q_size_before = q.qsize()  # Approximate size before clearing
        # Hold the queue's own lock so no producer/consumer sees a partial clear
        with q.mutex:
            q.queue.clear()
            # Free slots just appeared: wake producers blocked in put().
            # not_full shares q.mutex, so notifying while holding it is valid.
            q.not_full.notify_all()
        logging.debug(
            f"{log_prefix} Cleared queue (approx size before: {q_size_before})."
        )
    except Exception as e:
        logging.exception(f"{log_prefix} Error clearing queue: {e}")


# --- Coordinate Formatting ---

# Number of hundredths of an arc-second in one minute / one degree.
_CENTISECONDS_PER_MINUTE = 60 * 100
_CENTISECONDS_PER_DEGREE = 60 * _CENTISECONDS_PER_MINUTE

# Regex to capture degrees, minutes, seconds, and direction.
# Allows for variable spacing and ° ' " or ” symbols.
_DMS_PATTERN = re.compile(
    r"^\s*(\d{1,3})\s*[°]\s*"  # Degrees (1-3 digits) + ° symbol
    r"(\d{1,2})\s*[']\s*"  # Minutes (1-2 digits) + ' symbol
    r"([\d.]+)\s*[\"”]\s*"  # Seconds (digits, dot) + " or ” symbol
    r"([NSEWnsew])\s*$",  # Direction (N,S,E,W case-insensitive)
    re.IGNORECASE,
)


def _is_valid_coordinate(value: Any, limit: float) -> bool:
    """Return True if value is a finite number within [-limit, +limit]."""
    return (
        isinstance(value, (int, float))
        and math.isfinite(value)
        and -limit <= value <= limit
    )


def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
    """
    Converts decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string.

    The value is rounded to the displayed precision (0.01 arc-second) before
    it is split into components, so the seconds field can never be rendered
    as "60.00"; the carry goes into minutes/degrees instead.

    Args:
        decimal_degrees (float): Coordinate value in decimal degrees.
        is_latitude (bool): True if latitude, False if longitude.

    Returns:
        str: Formatted DMS string (e.g. ``46° 09' 50.24" N``), or one of the
        error indicators "N/A" (not a finite number), "Invalid Lat" /
        "Invalid Lon" (out of range), "Error DMS" (unexpected failure).
    """
    log_prefix = "[Utils DMS Conv]"
    coord_type = "Latitude" if is_latitude else "Longitude"
    try:
        # Input validation
        if (
            not isinstance(decimal_degrees, (int, float))
            or math.isnan(decimal_degrees)
            or math.isinf(decimal_degrees)
        ):
            logging.warning(
                f"{log_prefix} Invalid input type or value ({decimal_degrees}) "
                f"for {coord_type}."
            )
            return "N/A"

        # Range check and direction determination
        if is_latitude:
            if not (-90.0 <= decimal_degrees <= 90.0):
                logging.warning(
                    f"{log_prefix} Latitude {decimal_degrees} out of range."
                )
                return "Invalid Lat"
            direction = "N" if decimal_degrees >= 0 else "S"
            deg_pad = 2  # Padding for degrees (e.g., 01°, 89°)
        else:  # Longitude
            if not (-180.0 <= decimal_degrees <= 180.0):
                logging.warning(
                    f"{log_prefix} Longitude {decimal_degrees} out of range."
                )
                return "Invalid Lon"
            direction = "E" if decimal_degrees >= 0 else "W"
            deg_pad = 3  # Padding for degrees (e.g., 001°, 179°)

        # Work in whole hundredths of an arc-second. Rounding first and then
        # splitting with integer arithmetic makes the carry exact
        # (e.g. 89° 59' 59.999" N becomes 90° 00' 00.00" N). Because the input
        # is already range-checked, the result cannot exceed 90° / 180°.
        total_centiseconds = round(abs(decimal_degrees) * _CENTISECONDS_PER_DEGREE)
        degrees, remainder = divmod(total_centiseconds, _CENTISECONDS_PER_DEGREE)
        minutes, centiseconds = divmod(remainder, _CENTISECONDS_PER_MINUTE)
        seconds = centiseconds / 100.0

        # Format the final string
        return (
            f"{degrees:0{deg_pad}d}° {minutes:02d}' {seconds:05.2f}\" {direction}"
        )
    except Exception as e:
        logging.exception(
            f"{log_prefix} Error converting {decimal_degrees} to DMS: {e}"
        )
        return "Error DMS"


def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]:
    """
    Converts a DMS string (e.g., "46° 09' 50.24\\" N", "009° 23' 14.11\\" E")
    to decimal degrees.

    Args:
        dms_str (str): The DMS string to parse.
        is_latitude (bool): True if the string represents latitude,
            False for longitude.

    Returns:
        Optional[float]: The coordinate in decimal degrees (negative for S/W),
        or None if the string does not match the expected format, a component
        is out of range, or the direction does not fit the coordinate type.
    """
    log_prefix = "[Utils DMS Parse]"
    if not dms_str or not isinstance(dms_str, str):
        logging.warning(f"{log_prefix} Invalid input string: {dms_str}")
        return None

    match = _DMS_PATTERN.match(dms_str.strip())
    if not match:
        logging.error(f"{log_prefix} Could not parse DMS string format: '{dms_str}'")
        return None

    try:
        # Extract components as floats
        degrees = float(match.group(1))
        minutes = float(match.group(2))
        seconds = float(match.group(3))
        direction = match.group(4).upper()

        # Validate component ranges
        if not (0 <= minutes < 60 and 0 <= seconds < 60):
            logging.error(
                f"{log_prefix} Invalid minutes ({minutes}) or seconds ({seconds}) "
                f"in DMS: '{dms_str}'"
            )
            return None

        # Validate direction based on coordinate type
        valid_dirs = ("N", "S") if is_latitude else ("E", "W")
        if direction not in valid_dirs:
            coord_type = "latitude" if is_latitude else "longitude"
            logging.error(
                f"{log_prefix} Invalid direction '{direction}' for {coord_type}: "
                f"'{dms_str}'"
            )
            return None

        # Calculate decimal degrees
        decimal_degrees = degrees + (minutes / 60.0) + (seconds / 3600.0)
        # Apply sign based on direction
        if direction in ("S", "W"):
            decimal_degrees *= -1.0

        # Final range check for the calculated decimal value
        if is_latitude and not (-90.0 <= decimal_degrees <= 90.0):
            logging.warning(
                f"{log_prefix} Calculated latitude {decimal_degrees:.7f} out of range."
            )
            return None  # Treat out of range as error
        if not is_latitude and not (-180.0 <= decimal_degrees <= 180.0):
            logging.warning(
                f"{log_prefix} Calculated longitude {decimal_degrees:.7f} out of range."
            )
            return None

        logging.debug(f"{log_prefix} Parsed '{dms_str}' -> {decimal_degrees:.7f}")
        return decimal_degrees
    except ValueError as ve:
        # The seconds group accepts any mix of digits and dots (e.g. "1.2.3"),
        # so float() can still fail after a successful regex match.
        logging.error(
            f"{log_prefix} Error converting components to float: {ve} in '{dms_str}'"
        )
        return None
    except Exception:
        logging.exception(
            f"{log_prefix} Unexpected error parsing DMS string '{dms_str}':"
        )
        return None


# --- Metadata Formatting Function ---

# Fields holding angles in radians; also shown converted to degrees.
_GEO_RADIAN_FIELDS = frozenset(
    {
        "LATITUDE",
        "LONGITUDE",
        "ORIENTATION",
        "POI_LATITUDE",
        "POI_LONGITUDE",
        "POI_ORIENTATION",
    }
)

# SFP header integer fields shown as hex plus decimal.
_SFP_HEX_FIELDS = frozenset(
    {
        "SFP_MARKER",
        "SFP_PT_SPARE",
        "SFP_TAG",
        "SFP_SRC",
        "SFP_TID",
        "SFP_FLAGS",
        "SFP_WIN",
        "SFP_ERR",
        "SFP_ERR_INFO",
        "SFP_RECTYPE",
        "SFP_RECSPARE",
        "SFP_PLDAP",
        "SFP_PLEXT",
    }
)

# SFP header integer fields shown as hex plus printable character.
_SFP_CHAR_FIELDS = frozenset({"SFP_DIRECTION", "SFP_FLOW"})


def _format_bytes(value: bytes) -> str:
    """Format a byte string as decoded ASCII plus a hex preview (max 16 bytes)."""
    decoded = value.decode("ascii", errors="replace")
    hex_preview = value.hex().upper()[:32]  # 32 hex digits == 16 bytes
    suffix = "..." if len(value) > 16 else ""
    return f"'{decoded}' (hex: {hex_preview}{suffix})"


def _format_ctypes_array(field_type: Any, value: Any) -> str:
    """Format the value of a ctypes array field as a short one-line preview."""
    array_len = getattr(field_type, "_length_", "N/A")
    elem_type = getattr(field_type, "_type_", None)
    elem_name = getattr(elem_type, "__name__", "?")
    elem_is_type = isinstance(elem_type, type)

    # ctypes hands back c_char arrays as plain bytes
    if isinstance(value, bytes):
        return f"<{elem_name}[{array_len}] Data: {_format_bytes(value)}>"

    # Byte arrays: hex preview of the first 16 elements
    if elem_is_type and issubclass(elem_type, (ctypes.c_byte, ctypes.c_ubyte)):
        try:
            # Mask to 0..255 so signed c_byte values do not break bytes()
            head = bytes(b & 0xFF for b in value[:16])
            preview = head.hex().upper()
            if len(value) > 16:
                preview += "..."
            return f"<{elem_name}[{array_len}] Data: {preview}>"
        except (TypeError, ValueError):
            return f"<{elem_name}[{array_len}] (Preview N/A)>"

    # Arrays of simple scalars: show the first few elements
    is_simple_array = elem_is_type and issubclass(elem_type, ctypes._SimpleCData)
    if is_simple_array or isinstance(value, (list, tuple)):
        preview = str(list(value[:8]))  # Limit preview length
        if len(value) > 8:  # Indicate truncation
            preview = preview[:-1] + ", ...]"
        return f"<{elem_name}[{array_len}] Data: {preview}>"

    # Other array types (structures, nested arrays): type and size only
    return f"<{elem_name}[{array_len}]>"


def _format_ctypes_scalar(field_name: str, value: Any) -> str:
    """Format a non-array, non-structure field, with special cases by name."""
    # GeoData floats (convert radians to degrees for readability)
    if field_name in _GEO_RADIAN_FIELDS and isinstance(value, float):
        if not math.isfinite(value):
            return f"{value} rad (Non-finite)"
        deg_value = math.degrees(value)
        prec = 2 if "ORIENTATION" in field_name else 6
        return f"{value:.7f} rad ({deg_value:.{prec}f} deg)"

    # SFP header special fields (show hex and decimal)
    if field_name in _SFP_HEX_FIELDS and isinstance(value, int):
        return f"0x{value:02X} ({value})"

    # SFP header character-like fields (show hex and char if printable)
    if field_name in _SFP_CHAR_FIELDS and isinstance(value, int):
        char_repr = chr(value) if 32 <= value <= 126 else "?"
        return f"0x{value:02X} ('{char_repr}')"

    # Byte strings (show decoded + hex preview)
    if isinstance(value, bytes):
        return _format_bytes(value)

    # Default representation for other types
    return repr(value)


def format_ctypes_structure(structure: ctypes.Structure, indent_level: int = 0) -> str:
    """
    Generates a formatted string representation of a ctypes Structure,
    handling nested structures, arrays, bit-fields and basic types recursively.

    A failure while formatting one field is reported inline for that field
    and does not abort the remaining fields.

    Args:
        structure: The ctypes.Structure instance to format.
        indent_level: The current indentation level for nested structures.

    Returns:
        A multi-line string representing the structure's content.
    """
    indent = " " * indent_level  # Define indentation string

    # Check if it's actually a structure with _fields_ attribute
    if not hasattr(structure, "_fields_"):
        # Fallback for non-structures or basic types passed accidentally
        return f"{indent}Value: {repr(structure)}\n"

    parts: List[str] = []
    for field in structure._fields_:
        # Bit-field entries are 3-tuples (name, type, bits); index instead of
        # unpacking so they do not raise.
        field_name, field_type = field[0], field[1]
        try:
            value = getattr(structure, field_name)

            if hasattr(field_type, "_fields_"):
                # Nested structure: header line, then recurse one level deeper
                parts.append(f"{indent}{field_name} ({field_type.__name__}):\n")
                parts.append(format_ctypes_structure(value, indent_level + 1))
            elif isinstance(field_type, type) and issubclass(field_type, ctypes.Array):
                formatted_value = _format_ctypes_array(field_type, value)
                parts.append(f"{indent}{field_name}: {formatted_value}\n")
            else:
                formatted_value = _format_ctypes_scalar(field_name, value)
                parts.append(f"{indent}{field_name}: {formatted_value}\n")
        except AttributeError as e:
            # Handle cases where getattr might fail (should be rare)
            parts.append(f"{indent}{field_name}: <Attribute Error: {e}>\n")
        except Exception as e:
            # Catch any other formatting errors for a specific field
            parts.append(f"{indent}{field_name}: <Error: {e}>\n")
    return "".join(parts)


# --- Google Maps Launcher ---


def open_google_maps(latitude_deg: float, longitude_deg: float):
    """
    Opens the default web browser to Google Maps centered on the specified
    coordinates.

    Invalid coordinates and browser failures are logged; nothing is raised.

    Args:
        latitude_deg (float): Latitude in decimal degrees (-90..90).
        longitude_deg (float): Longitude in decimal degrees (-180..180).
    """
    log_prefix = "[Utils Gmaps]"

    # Validate coordinates
    if not _is_valid_coordinate(latitude_deg, 90):
        logging.error(f"{log_prefix} Invalid latitude: {latitude_deg}.")
        return
    if not _is_valid_coordinate(longitude_deg, 180):
        logging.error(f"{log_prefix} Invalid longitude: {longitude_deg}.")
        return

    try:
        # Construct URL using the search API for better pinning
        query_str = f"{latitude_deg:.7f},{longitude_deg:.7f}"
        # URL encode the query string
        encoded_query = urllib.parse.quote(query_str)
        gmaps_url = f"https://www.google.com/maps/search/?api=1&query={encoded_query}"
        logging.info(f"{log_prefix} Opening Google Maps URL: {gmaps_url}")

        # Open in a new tab if possible
        opened_ok = webbrowser.open_new_tab(gmaps_url)
        if not opened_ok:
            logging.warning(f"{log_prefix} webbrowser.open_new_tab returned False.")
            # Fallback attempt if new tab fails
            webbrowser.open(gmaps_url)
    except Exception:
        logging.exception(f"{log_prefix} Failed to open Google Maps URL:")


# --- KML Generation and Management ---


def _calculate_geo_corners_for_kml(
    geo_info_radians: Dict[str, Any],
) -> Optional[List[Tuple[float, float]]]:
    """
    Internal helper to calculate geographic corners (degrees) from geo_info
    (radians). Requires pyproj.

    Args:
        geo_info_radians: Dictionary providing "lat", "lon", "orientation"
            (read as radians), "ref_x", "ref_y", "scale_x", "scale_y",
            "width_px" and "height_px".

    Returns:
        A list of four (longitude, latitude) tuples in degrees, ordered
        top-left, top-right, bottom-right, bottom-left, or None on error.
    """
    # Requires pyproj library
    if not _pyproj_available:
        logging.error("[Utils KML Calc] pyproj library needed for corner calculation.")
        return None

    log_prefix = "[Utils KML Calc]"
    try:
        # Define the geodetic calculator (WGS84 ellipsoid)
        geod = pyproj.Geod(ellps="WGS84")

        # Extract necessary info from the dictionary
        center_lat_rad = geo_info_radians["lat"]
        center_lon_rad = geo_info_radians["lon"]
        # Orientation: Use the *negative* for forward projection from North
        # (Angle typically defines rotation FROM North axis)
        orient_rad = geo_info_radians["orientation"]
        calc_orient_rad = -orient_rad  # Angle used for projection
        ref_x = geo_info_radians["ref_x"]
        ref_y = geo_info_radians["ref_y"]
        scale_x = geo_info_radians["scale_x"]
        scale_y = geo_info_radians["scale_y"]
        width = geo_info_radians["width_px"]
        height = geo_info_radians["height_px"]

        # Validate inputs
        if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
            logging.error(f"{log_prefix} Invalid scale or dimensions in geo_info.")
            return None

        # Define corner pixel coordinates relative to reference pixel (ref_x, ref_y)
        # Top-Left (0,0), Top-Right (w-1, 0), Bottom-Right (w-1, h-1), Bottom-Left (0, h-1)
        # Note: Pixel Y increases downwards, hence the flipped sign on the Y delta
        corners_pixel_delta = [
            (0 - ref_x, ref_y - 0),  # Top-Left Delta
            (width - 1 - ref_x, ref_y - 0),  # Top-Right Delta
            (width - 1 - ref_x, ref_y - (height - 1)),  # Bottom-Right Delta
            (0 - ref_x, ref_y - (height - 1)),  # Bottom-Left Delta
        ]

        # Convert pixel deltas to meter deltas (unrotated)
        corners_meters = [
            (dx * scale_x, dy * scale_y) for dx, dy in corners_pixel_delta
        ]

        # Rotate meter deltas based on orientation if significant
        corners_meters_rotated = corners_meters
        if abs(calc_orient_rad) > 1e-6:  # Only rotate if angle is non-zero
            cos_o = math.cos(calc_orient_rad)
            sin_o = math.sin(calc_orient_rad)
            corners_meters_rotated = [
                (dx * cos_o - dy * sin_o, dx * sin_o + dy * cos_o)
                for dx, dy in corners_meters
            ]

        # Calculate geographic coordinates for each corner using forward projection
        corners_geo_deg = []
        center_lon_deg = math.degrees(center_lon_rad)
        center_lat_deg = math.degrees(center_lat_rad)
        for dx_m, dy_m in corners_meters_rotated:
            # Calculate distance and azimuth from center to corner
            distance = math.hypot(dx_m, dy_m)
            # Azimuth: atan2(east, north) measures clockwise from North
            azimuth = math.degrees(math.atan2(dx_m, dy_m)) % 360.0  # Ensure 0-360 range
            # Use pyproj geod.fwd for accurate projection
            lon, lat, _ = geod.fwd(center_lon_deg, center_lat_deg, azimuth, distance)
            corners_geo_deg.append((lon, lat))  # Store (longitude, latitude)

        # Ensure we have exactly 4 corners
        if len(corners_geo_deg) == 4:
            return corners_geo_deg
        logging.error(f"{log_prefix} Incorrect number of corners calculated.")
        return None
    except KeyError as ke:
        logging.error(f"{log_prefix} Missing required key in geo_info_radians: {ke}")
        return None
    except Exception:
        logging.exception(f"{log_prefix} Error calculating geographic corners for KML:")
        return None


def generate_sar_kml(geo_info_radians: Dict[str, Any], output_path: str) -> bool:
    """
    Generates a KML file representing the SAR footprint area as a polygon.

    Args:
        geo_info_radians (Dict[str, Any]): Dictionary with SAR georeferencing.
            Must contain a truthy "valid" entry plus the keys needed for the
            corner calculation.
        output_path (str): The full path where the KML file should be saved.

    Returns:
        bool: True if KML generation and saving were successful, False otherwise.
    """
    log_prefix = "[Utils KML Gen]"

    # Check for required libraries
    if not _simplekml_available or not _pyproj_available:
        logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj missing.")
        return False

    # Check for valid input geo_info
    if not geo_info_radians or not geo_info_radians.get("valid", False):
        logging.warning(f"{log_prefix} Cannot generate KML: Invalid geo_info provided.")
        return False

    try:
        # Calculate geographic corner coordinates
        corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
        if corners_deg is None:
            logging.error(f"{log_prefix} Failed to calculate KML corners.")
            return False  # Details already logged by helper

        # Extract center and orientation for LookAt view
        center_lon_deg = math.degrees(geo_info_radians["lon"])
        center_lat_deg = math.degrees(geo_info_radians["lat"])
        orientation_deg = math.degrees(geo_info_radians.get("orientation", 0.0))

        # Estimate a reasonable viewing altitude based on image size
        try:
            width_km = (
                geo_info_radians.get("scale_x", 1.0)
                * geo_info_radians.get("width_px", 1000)
            ) / 1000.0
            height_km = (
                geo_info_radians.get("scale_y", 1.0)
                * geo_info_radians.get("height_px", 1000)
            ) / 1000.0
            # Simple heuristic for altitude (e.g., 2x the larger dimension in km * 1000)
            view_altitude_m = max(width_km, height_km) * 2000.0
            # Clamp altitude to a reasonable range
            view_altitude_m = max(1000.0, min(view_altitude_m, 500000.0))
        except Exception:
            # Best effort only: a bad size value must not block KML creation
            view_altitude_m = 10000.0  # Default altitude on error

        # Create KML object
        kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}")

        # Set LookAt view
        kml.document.lookat.longitude = center_lon_deg
        kml.document.lookat.latitude = center_lat_deg
        kml.document.lookat.range = view_altitude_m
        kml.document.lookat.tilt = 45.0  # Default tilt
        kml.document.lookat.heading = orientation_deg % 360.0  # Ensure 0-360 heading
        kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground

        # Create the polygon footprint
        # KML uses (longitude, latitude, altitude) tuples
        # Close the polygon by repeating the first point at the end
        outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] + [
            (corners_deg[0][0], corners_deg[0][1], 0)
        ]
        polygon = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary)

        # Style the polygon (red outline, semi-transparent red fill)
        polygon.style.linestyle.color = simplekml.Color.red
        polygon.style.linestyle.width = 2
        polygon.style.polystyle.color = simplekml.Color.changealphaint(
            100, simplekml.Color.red  # ~40% opacity red fill
        )
        polygon.style.polystyle.outline = 1  # Ensure outline is drawn

        # Save the KML file
        kml.save(output_path)
        logging.debug(f"{log_prefix} KML file saved successfully: {output_path}")
        return True
    except Exception:
        logging.exception(f"{log_prefix} Error generating or saving KML file:")
        return False


def _write_temp_kml(kml_text: str, log_prefix: str, label: str) -> str:
    """
    Write KML text to a new temporary ``.kml`` file and return its path.

    The file is created with delete=False so an external viewer can open it
    after this function returns. If writing fails, the partially written file
    is removed before the exception is re-raised.
    """
    temp_kml_path: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".kml", delete=False, encoding="utf-8"
        ) as temp_kml:
            temp_kml_path = temp_kml.name
            logging.debug(
                f"{log_prefix} Saving temporary {label} KML to: {temp_kml_path}"
            )
            temp_kml.write(kml_text)  # Write KML content
    except Exception:
        # The file handle is closed here; do not leave a stray temp file behind
        if temp_kml_path is not None:
            try:
                os.unlink(temp_kml_path)
            except OSError:
                logging.warning(
                    f"{log_prefix} Could not remove incomplete temporary KML: "
                    f"{temp_kml_path}"
                )
        raise
    logging.debug(f"{log_prefix} {label} KML file created successfully.")
    return temp_kml_path


def generate_lookat_and_point_kml(
    latitude_deg: float,
    longitude_deg: float,
    altitude_m: float = 10000.0,
    tilt: float = 45.0,
    heading: float = 0.0,
    placemark_name: str = "Selected Location",
    placemark_desc: Optional[str] = None,
) -> Optional[str]:
    """
    Generates a temporary KML file containing a LookAt element and a
    Placemark point.

    Args:
        latitude_deg (float): Target latitude in decimal degrees.
        longitude_deg (float): Target longitude in decimal degrees.
        altitude_m (float): Viewing altitude in meters for LookAt
            (values below 100 are raised to 100).
        tilt (float): Camera tilt angle for LookAt.
        heading (float): Camera heading/azimuth for LookAt.
        placemark_name (str): Name for the placemark.
        placemark_desc (Optional[str]): Optional description for the placemark.

    Returns:
        Optional[str]: Path to the temporary KML file, or None on error.
        The caller owns the returned file.
    """
    log_prefix = "[Utils KML LookAtPoint]"
    if not _simplekml_available:
        logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.")
        return None

    # Validate coordinates
    if not _is_valid_coordinate(latitude_deg, 90):
        logging.error(f"{log_prefix} Invalid latitude provided: {latitude_deg}.")
        return None
    if not _is_valid_coordinate(longitude_deg, 180):
        logging.error(f"{log_prefix} Invalid longitude provided: {longitude_deg}.")
        return None

    try:
        kml = simplekml.Kml(name=placemark_name)  # Use name for KML document too

        # Set LookAt parameters
        kml.document.lookat.longitude = longitude_deg
        kml.document.lookat.latitude = latitude_deg
        kml.document.lookat.range = max(100.0, altitude_m)  # Min range 100m
        kml.document.lookat.tilt = tilt
        kml.document.lookat.heading = heading % 360.0
        kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground

        # Add the Placemark point
        point = kml.newpoint(name=placemark_name)
        # simplekml uses (lon, lat, optional_alt) order for coordinates
        point.coords = [(longitude_deg, latitude_deg)]
        if placemark_desc:
            point.description = placemark_desc

        # Render before creating the file so a rendering failure leaves no file
        return _write_temp_kml(kml.kml(), log_prefix, "LookAt+Point")
    except Exception:
        logging.exception(f"{log_prefix} Error generating LookAt+Point KML file:")
        return None


def generate_multi_point_kml(
    points_data: List[Tuple[float, float, str, Optional[str]]],
) -> Optional[str]:
    """
    Generates a temporary KML file containing multiple Placemarks (points)
    and a LookAt view centered on the first point.

    Args:
        points_data (List[Tuple[float, float, str, Optional[str]]]):
            List of (latitude_deg, longitude_deg, placemark_name, optional_desc).

    Returns:
        Optional[str]: Path to the temporary KML file, or None on error.
        The caller owns the returned file.
    """
    log_prefix = "[Utils KML MultiPoint]"
    if not _simplekml_available:
        logging.error(f"{log_prefix} Cannot generate KML: simplekml missing.")
        return None
    if not points_data:
        logging.warning(f"{log_prefix} No points provided to generate KML.")
        return None

    try:
        kml = simplekml.Kml(name="Multiple Locations")

        # --- Determine LookAt View (Look at first point) ---
        first_lat, first_lon, _, _ = points_data[0]
        kml.document.lookat.longitude = first_lon
        kml.document.lookat.latitude = first_lat
        kml.document.lookat.range = 20000  # Default range for multiple points
        kml.document.lookat.tilt = 30
        kml.document.lookat.heading = 0
        kml.document.lookat.altitudemode = simplekml.AltitudeMode.relativetoground

        # --- Define styles (keyed by the placemark name they apply to) ---
        icon_by_name = {
            "SAR Center": "http://maps.google.com/mapfiles/kml/paddle/red-stars.png",
            "Mouse on SAR": "http://maps.google.com/mapfiles/kml/paddle/blu-circle.png",
            "Mouse on Map": "http://maps.google.com/mapfiles/kml/paddle/grn-diamond.png",
        }
        style_by_name = {}
        for style_name, icon_href in icon_by_name.items():
            style = simplekml.Style()
            style.iconstyle.icon.href = icon_href
            style.iconstyle.scale = 1.1
            style_by_name[style_name] = style

        # --- Add Placemarks ---
        logging.debug(f"{log_prefix} Adding {len(points_data)} placemarks to KML.")
        for lat, lon, name, desc in points_data:
            point = kml.newpoint(name=name)
            point.coords = [(lon, lat)]  # (lon, lat) order
            if desc:
                point.description = desc
            # Apply style based on name; unknown names keep the default style
            style = style_by_name.get(name)
            if style is not None:
                point.style = style

        # --- Save to Temporary File ---
        # Render before creating the file so a rendering failure leaves no file
        return _write_temp_kml(kml.kml(), log_prefix, "Multi-Point")
    except Exception:
        logging.exception(f"{log_prefix} Error generating Multi-Point KML file:")
        return None


def launch_google_earth(kml_path: Optional[str]):
    """
    Attempts to open a KML file using the system's default application
    or google-earth-pro directly.

    All failures are logged; nothing is raised.

    Args:
        kml_path (Optional[str]): The path to the KML file to launch.
    """
    log_prefix = "[Utils Launch GE]"
    if not kml_path or not os.path.exists(kml_path):
        logging.error(
            f"{log_prefix} Cannot launch: KML file path invalid or not found: {kml_path}"
        )
        return

    logging.info(f"{log_prefix} Attempting launch for KML: {kml_path}")
    try:
        if sys.platform == "win32":
            # Use os.startfile on Windows
            os.startfile(kml_path)
            return
        if sys.platform == "darwin":
            # Use 'open' command on macOS
            subprocess.run(["open", kml_path], check=True)
            return

        # Linux/Unix: prefer Google Earth Pro, fall back to the desktop handler
        ge_cmd = shutil.which("google-earth-pro")
        if ge_cmd:
            logging.debug(f"{log_prefix} Using command: {ge_cmd}")
            # Popen (no wait): Google Earth keeps running as a GUI application
            subprocess.Popen([ge_cmd, kml_path])
            return

        xdg_cmd = shutil.which("xdg-open")
        if xdg_cmd:
            logging.debug(f"{log_prefix} Using command: {xdg_cmd}")
            # xdg-open returns once it has handed off; check its exit status
            subprocess.run([xdg_cmd, kml_path], check=True)
            return

        # No suitable command found: log once and give up
        logging.error(
            f"{log_prefix} Neither google-earth-pro nor xdg-open found in PATH."
        )
    except Exception:
        logging.exception(f"{log_prefix} Error launching KML handler:")


def cleanup_old_kml_files(kml_directory: str, max_files_to_keep: int):
    """
    Removes the oldest KML files from the directory if count exceeds limit.

    Files are ranked by modification time. Errors on individual files are
    logged and skipped; nothing is raised.

    Args:
        kml_directory (str): Directory to scan for ``*.kml`` files
            (not recursive).
        max_files_to_keep (int): Number of newest files to keep. A value of
            zero or less disables the cleanup entirely.
    """
    log_prefix = "[Utils KML Cleanup]"

    # Skip if cleanup is disabled
    if max_files_to_keep <= 0:
        return

    try:
        kml_dir_path = Path(kml_directory)
        if not kml_dir_path.is_dir():
            logging.warning(
                f"{log_prefix} KML directory not found: '{kml_directory}'. Cannot cleanup."
            )
            return

        # Get list of KML files with modification times
        kml_files = []
        for item in kml_dir_path.glob("*.kml"):
            if not item.is_file():
                continue
            try:
                kml_files.append((item, item.stat().st_mtime))
            except Exception as stat_e:
                # Log error getting stats but continue with others
                logging.warning(
                    f"{log_prefix} Could not stat file '{item.name}': {stat_e}"
                )

        current_count = len(kml_files)
        # Only proceed if count exceeds the limit
        if current_count <= max_files_to_keep:
            return

        # Sort files by modification time (oldest first)
        kml_files.sort(key=lambda entry: entry[1])
        num_to_delete = current_count - max_files_to_keep
        files_to_delete = kml_files[:num_to_delete]
        logging.info(
            f"{log_prefix} Found {current_count} KML files. "
            f"Attempting to delete oldest {num_to_delete}..."
        )

        deleted_count = 0
        for file_path, _ in files_to_delete:
            try:
                file_path.unlink()  # Delete the file
                logging.debug(f"{log_prefix} Deleted old KML: {file_path.name}")
                deleted_count += 1
            except Exception as delete_e:
                # Log error deleting specific file but continue
                logging.error(
                    f"{log_prefix} Failed to delete file '{file_path.name}': {delete_e}"
                )

        logging.info(
            f"{log_prefix} Cleanup finished. Deleted {deleted_count}/{num_to_delete} files."
        )
    except Exception:
        logging.exception(f"{log_prefix} Error during KML cleanup process:")


# --- END OF FILE utils.py ---