# --- START OF FILE utils.py ---

# utils.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED.

Provides utility functions used throughout the application, including safe queue
management (put, clear), coordinate formatting (decimal degrees to DMS and DMS
string to decimal), KML generation and management, and opening web maps.
Uses standardized logging prefixes. Drop counts are managed within AppState.
"""

# Standard library imports
import queue
import logging
import math
import os
import datetime
import sys
import subprocess
import shutil
from pathlib import Path
import webbrowser
import urllib.parse
import re  # For DMS parsing
from typing import Optional, Tuple  # Import Optional and Tuple for type hints

# Import KML and GEO libraries, handling ImportError
try:
    import simplekml

    _simplekml_available = True
except ImportError:
    simplekml = None
    _simplekml_available = False
    logging.warning(
        "[Utils KML] Library 'simplekml' not found. KML generation disabled. (pip install simplekml)"
    )

try:
    import pyproj

    _pyproj_available = True
except ImportError:
    pyproj = None
    _pyproj_available = False
    logging.warning(
        "[Utils KML] Library 'pyproj' not found. KML generation requires it. (pip install pyproj)"
    )


# --- Queue Management Functions ---


def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None):
    """
    Put an item onto a queue without blocking.

    Args:
        queue_obj: Queue-like object exposing ``put(item, block=False)``.
        item: The object to enqueue.
        queue_name (str): Name passed to the drop counter and used in log messages.
        app_instance: Optional object exposing a ``state`` attribute. If
            ``state.shutting_down`` is truthy the item is silently discarded.
            When the queue is full, ``state.increment_dropped_count(queue_name)``
            is called.

    Returns:
        None. Errors are logged, never raised.
    """
    log_prefix = "[Utils Queue Put]"
    has_state = bool(app_instance) and hasattr(app_instance, "state")

    # Discard silently during shutdown. getattr keeps a state object without the
    # flag from raising outside the try block below.
    if has_state and getattr(app_instance.state, "shutting_down", False):
        return

    try:
        queue_obj.put(item, block=False)
    except queue.Full:
        # Queue is full: the item is dropped and the drop is counted via AppState.
        if has_state:
            try:
                app_instance.state.increment_dropped_count(queue_name)
            except Exception as e:
                logging.error(
                    f"{log_prefix} Failed to increment drop count for '{queue_name}': {e}"
                )
        else:
            logging.error(
                f"{log_prefix} Cannot increment drop count for '{queue_name}': App instance or state missing."
            )
    except Exception as e:
        logging.exception(
            f"{log_prefix} Unexpected error putting item onto queue '{queue_name}': {e}"
        )


def clear_queue(q):
    """
    Remove all items currently held in a ``queue.Queue``-style object.

    The queue's internal bookkeeping is kept consistent: every discarded item is
    subtracted from ``unfinished_tasks`` (so a later ``join()`` cannot wait for
    items that will never be processed) and producers blocked on a full queue
    are woken up.

    Args:
        q: Queue object exposing ``mutex``, ``queue``, ``unfinished_tasks``,
            ``all_tasks_done`` and ``not_full`` (as ``queue.Queue`` does).

    Returns:
        None. Any error (e.g. an object without ``mutex``) is logged, not raised.
    """
    log_prefix = "[Utils Queue Clear]"
    try:
        with q.mutex:
            q_size_before = len(q.queue)
            q.queue.clear()
            # put() counted each of these items; discount them, never below zero.
            q.unfinished_tasks = max(q.unfinished_tasks - q_size_before, 0)
            if q.unfinished_tasks == 0:
                q.all_tasks_done.notify_all()
            # Space was freed: wake any producer blocked in put().
            q.not_full.notify_all()
        logging.debug(
            f"{log_prefix} Cleared queue (approx size before: {q_size_before})."
        )
    except Exception as e:
        logging.exception(f"{log_prefix} Error clearing queue: {e}")


# --- Coordinate Formatting ---


def decimal_to_dms(decimal_degrees: float, is_latitude: bool) -> str:
    """
    Convert decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string.

    Seconds are rounded to two decimals *before* the carry into minutes/degrees,
    so the result never shows ``60.00"`` or ``60'``.

    Args:
        decimal_degrees (float): Coordinate in decimal degrees.
        is_latitude (bool): True for latitude (N/S, 2-digit degrees),
            False for longitude (E/W, 3-digit degrees).

    Returns:
        str: e.g. ``46° 09' 50.24" N``; ``"N/A"`` for non-numeric, NaN or infinite
        input; ``"Invalid Lat"`` / ``"Invalid Lon"`` when out of range;
        ``"Error DMS"`` on an unexpected error.
    """
    log_prefix = "[Utils DMS Conv]"
    coord_type = "Latitude" if is_latitude else "Longitude"
    try:
        if (
            not isinstance(decimal_degrees, (int, float))
            or math.isnan(decimal_degrees)
            or math.isinf(decimal_degrees)
        ):
            logging.warning(
                f"{log_prefix} Invalid input type or value ({decimal_degrees}) for {coord_type}."
            )
            return "N/A"

        # Determine direction and padding (small tolerance on the range limits)
        if is_latitude:
            if abs(decimal_degrees) > 90.000001:
                return "Invalid Lat"
            direction = "N" if decimal_degrees >= 0 else "S"
            deg_pad = 2
        else:  # Longitude
            if abs(decimal_degrees) > 180.000001:
                return "Invalid Lon"
            direction = "E" if decimal_degrees >= 0 else "W"
            deg_pad = 3

        dd_abs = abs(decimal_degrees)
        degrees = math.floor(dd_abs)
        minutes_dec = (dd_abs - degrees) * 60.0
        minutes = math.floor(minutes_dec)
        # Round to the displayed precision first; otherwise a value such as
        # 59.996 passes the carry check and is then printed as "60.00".
        seconds = round((minutes_dec - minutes) * 60.0, 2)

        if seconds >= 60.0:
            seconds = 0.0
            minutes += 1
        if minutes >= 60:
            minutes = 0
            degrees += 1

        dms_string = (
            f"{degrees:0{deg_pad}d}° {minutes:02d}' {seconds:05.2f}\" {direction}"
        )
        return dms_string
    except Exception as e:
        logging.exception(
            f"{log_prefix} Error converting {decimal_degrees} to DMS: {e}"
        )
        return "Error DMS"


# --- >>> START OF NEW FUNCTION <<< ---

# Degrees, minutes, seconds and direction, flexible with spaces.
# Compiled once at import time instead of on every call. Accepts " or ”.
_DMS_PATTERN = re.compile(
    r"^\s*(\d{1,3})\s*[°]\s*(\d{1,2})\s*[']\s*([\d.]+)\s*[\"”]\s*([NSEWnsew])\s*$",
    re.IGNORECASE,
)


def dms_string_to_decimal(dms_str: str, is_latitude: bool) -> Optional[float]:
    """
    Convert a DMS string (e.g., "46° 09' 50.24\" N", "009° 23' 14.11\" E")
    to decimal degrees.

    Args:
        dms_str (str): The DMS string to parse.
        is_latitude (bool): True if the string represents latitude (N/S expected),
            False for longitude (E/W expected).

    Returns:
        Optional[float]: The coordinate in decimal degrees (negative for S/W),
        or None when the string does not match the expected format, minutes or
        seconds are not in [0, 60), the direction does not fit the coordinate
        type, or the result is out of range.
    """
    log_prefix = "[Utils DMS Parse]"
    if not dms_str or not isinstance(dms_str, str):
        logging.warning(f"{log_prefix} Invalid input string: {dms_str}")
        return None

    match = _DMS_PATTERN.match(dms_str.strip())
    if not match:
        logging.error(f"{log_prefix} Could not parse DMS string format: '{dms_str}'")
        return None

    try:
        degrees = float(match.group(1))
        minutes = float(match.group(2))
        # The seconds group is "[\d.]+", so text like "1.2.3" reaches float()
        # and is rejected through the ValueError handler below.
        seconds = float(match.group(3))
        direction = match.group(4).upper()

        # Validate components
        if not (0 <= minutes < 60 and 0 <= seconds < 60):
            logging.error(
                f"{log_prefix} Invalid minutes or seconds in DMS: '{dms_str}'"
            )
            return None

        # Validate direction
        valid_dirs = ("N", "S") if is_latitude else ("E", "W")
        if direction not in valid_dirs:
            type_str = "latitude" if is_latitude else "longitude"
            logging.error(
                f"{log_prefix} Invalid direction '{direction}' for {type_str}: '{dms_str}'"
            )
            return None

        # Calculate decimal degrees
        decimal_degrees = degrees + (minutes / 60.0) + (seconds / 3600.0)

        # Apply sign based on direction
        if direction in ("S", "W"):
            decimal_degrees *= -1.0

        # Final range check: out-of-range values are treated as errors
        if is_latitude and not (-90.0 <= decimal_degrees <= 90.0):
            logging.warning(
                f"{log_prefix} Calculated latitude {decimal_degrees:.6f} out of range."
            )
            return None
        if not is_latitude and not (-180.0 <= decimal_degrees <= 180.0):
            logging.warning(
                f"{log_prefix} Calculated longitude {decimal_degrees:.6f} out of range."
            )
            return None

        logging.debug(f"{log_prefix} Parsed '{dms_str}' -> {decimal_degrees:.7f}")
        return decimal_degrees

    except ValueError as ve:
        logging.error(
            f"{log_prefix} Error converting components to float: {ve} in '{dms_str}'"
        )
        return None
    except Exception:
        logging.exception(
            f"{log_prefix} Unexpected error parsing DMS string '{dms_str}':"
        )
        return None


# --- >>> END OF NEW FUNCTION <<< ---


# --- Google Maps Launcher ---


def open_google_maps(latitude_deg: float, longitude_deg: float):
    """
    Open the default web browser on Google Maps at the given coordinates.

    Args:
        latitude_deg (float): Latitude in decimal degrees, within [-90, 90].
        longitude_deg (float): Longitude in decimal degrees, within [-180, 180].

    Returns:
        None. Invalid coordinates and browser errors are logged, not raised.
    """
    log_prefix = "[Utils Gmaps]"

    # Validate coordinates
    if not (
        isinstance(latitude_deg, (int, float))
        and math.isfinite(latitude_deg)
        and -90 <= latitude_deg <= 90
    ):
        logging.error(f"{log_prefix} Invalid latitude: {latitude_deg}.")
        return
    if not (
        isinstance(longitude_deg, (int, float))
        and math.isfinite(longitude_deg)
        and -180 <= longitude_deg <= 180
    ):
        logging.error(f"{log_prefix} Invalid longitude: {longitude_deg}.")
        return

    try:
        # Construct URL (search API usually shows a pin)
        query_str = f"{latitude_deg:.7f},{longitude_deg:.7f}"
        gmaps_url = f"https://www.google.com/maps/search/?api=1&query={query_str}"
        logging.info(f"{log_prefix} Opening Google Maps URL: {gmaps_url}")
        opened_ok = webbrowser.open_new_tab(gmaps_url)  # Open in new tab
        if not opened_ok:
            logging.warning(f"{log_prefix} webbrowser.open_new_tab returned False.")
    except Exception:
        logging.exception(f"{log_prefix} Failed to open Google Maps URL:")


# --- KML Generation and Management ---


def _calculate_geo_corners_for_kml(geo_info_radians):
    """
    Internal helper: compute the four geographic image corners in degrees.

    Requires pyproj. Pixel offsets from the reference pixel are scaled, rotated
    by the negated orientation and projected from the center point on the WGS84
    ellipsoid.

    Args:
        geo_info_radians (dict): Must provide "lat", "lon" and "orientation"
            (radians), "ref_x", "ref_y", "scale_x", "scale_y", "width_px" and
            "height_px". Scales are used as distance per pixel; presumably
            meters, since the result is passed to ``Geod.fwd`` as a distance.

    Returns:
        list[tuple[float, float]] | None: Four (lon, lat) tuples in degrees, in
        pixel order (0,0), (w-1,0), (w-1,h-1), (0,h-1); None if pyproj is
        unavailable, a key is missing, sizes/scales are not positive, or an
        error occurs.
    """
    if not _pyproj_available:
        return None
    log_prefix = "[Utils KML Calc]"
    try:
        geod = pyproj.Geod(ellps="WGS84")
        center_lat_rad = geo_info_radians["lat"]
        center_lon_rad = geo_info_radians["lon"]
        orient_rad = geo_info_radians["orientation"]
        # NOTE(review): the sign is flipped before rotating; presumably to match
        # the rotation convention of the source data. Confirm against producer.
        calc_orient_rad = -orient_rad
        ref_x = geo_info_radians["ref_x"]
        ref_y = geo_info_radians["ref_y"]
        scale_x = geo_info_radians["scale_x"]
        scale_y = geo_info_radians["scale_y"]
        width = geo_info_radians["width_px"]
        height = geo_info_radians["height_px"]

        if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0):
            return None

        # Offsets relative to the reference pixel; y is flipped (ref_y - row).
        corners_pixel = [
            (0 - ref_x, ref_y - 0),
            (width - 1 - ref_x, ref_y - 0),
            (width - 1 - ref_x, ref_y - (height - 1)),
            (0 - ref_x, ref_y - (height - 1)),
        ]
        corners_meters = [(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel]

        corners_meters_rotated = corners_meters
        if abs(calc_orient_rad) > 1e-6:  # Skip the rotation when negligible
            cos_o, sin_o = math.cos(calc_orient_rad), math.sin(calc_orient_rad)
            corners_meters_rotated = [
                (dx * cos_o - dy * sin_o, dx * sin_o + dy * cos_o)
                for dx, dy in corners_meters
            ]

        corners_geo_deg = []
        center_lon_deg = math.degrees(center_lon_rad)
        center_lat_deg = math.degrees(center_lat_rad)
        for dx_m, dy_m in corners_meters_rotated:
            dist = math.hypot(dx_m, dy_m)
            # atan2(dx, dy): angle measured from the +y axis towards +x
            az = math.degrees(math.atan2(dx_m, dy_m))
            lon, lat, _ = geod.fwd(center_lon_deg, center_lat_deg, az, dist)
            corners_geo_deg.append((lon, lat))

        return corners_geo_deg if len(corners_geo_deg) == 4 else None
    except KeyError as ke:
        logging.error(f"{log_prefix} Missing key in geo_info_radians: {ke}")
        return None
    except Exception:
        logging.exception(f"{log_prefix} Error calculating geographic corners for KML:")
        return None


def generate_sar_kml(geo_info_radians, output_path) -> bool:
    """
    Generate a KML file representing the SAR footprint area.

    The document contains one red polygon ("SAR Footprint") built from the four
    computed corners, and a LookAt centered on the image with a 45 degree tilt
    and a range of 2000 times the larger footprint dimension in km.

    Args:
        geo_info_radians (dict): Geo information; must have a truthy "valid"
            entry plus the keys required by ``_calculate_geo_corners_for_kml``.
        output_path: Destination path passed to ``simplekml.Kml.save``.

    Returns:
        bool: True if the file was saved; False if simplekml/pyproj are missing,
        the geo info is empty or not valid, the corners could not be computed,
        or saving failed.
    """
    log_prefix = "[Utils KML Gen]"
    if not _simplekml_available or not _pyproj_available:
        return False
    if not geo_info_radians or not geo_info_radians.get("valid", False):
        return False
    try:
        corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
        if corners_deg is None:
            return False

        center_lon = math.degrees(geo_info_radians["lon"])
        center_lat = math.degrees(geo_info_radians["lat"])
        orientation = math.degrees(geo_info_radians["orientation"])
        width_km = (
            geo_info_radians.get("scale_x", 1) * geo_info_radians.get("width_px", 1)
        ) / 1000.0
        height_km = (
            geo_info_radians.get("scale_y", 1) * geo_info_radians.get("height_px", 1)
        ) / 1000.0
        view_alt = max(width_km, height_km) * 2000

        kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}")
        kml.document.lookat.longitude = center_lon
        kml.document.lookat.latitude = center_lat
        kml.document.lookat.range = view_alt
        kml.document.lookat.tilt = 45
        kml.document.lookat.heading = orientation

        outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg]
        pol = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary)
        pol.style.linestyle.color = simplekml.Color.red
        pol.style.linestyle.width = 2
        pol.style.polystyle.color = simplekml.Color.changealphaint(
            100, simplekml.Color.red
        )

        kml.save(output_path)
        logging.debug(f"{log_prefix} KML file saved successfully: {output_path}")
        return True
    except Exception:
        logging.exception(f"{log_prefix} Error generating or saving KML file:")
        return False


def launch_google_earth(kml_path):
    """
    Attempt to open a KML file with the system's default application.

    Uses ``os.startfile`` on Windows and ``open`` on macOS. Elsewhere it uses
    ``google-earth-pro`` if found on PATH (started without waiting), otherwise
    ``xdg-open``.

    Args:
        kml_path: Path of the KML file to open.

    Returns:
        None. A missing file or launch failure is logged, not raised.
    """
    log_prefix = "[Utils Launch GE]"
    # The explicit emptiness check keeps os.path.exists(None) from raising.
    if not kml_path or not os.path.exists(kml_path):
        logging.error(f"{log_prefix} KML file not found: {kml_path}")
        return
    logging.info(f"{log_prefix} Attempting launch for: {kml_path}")
    try:
        if sys.platform == "win32":
            os.startfile(kml_path)
        elif sys.platform == "darwin":
            subprocess.run(["open", kml_path], check=True)
        else:  # Linux/Unix
            cmd = shutil.which("google-earth-pro") or shutil.which("xdg-open")
            if not cmd:
                raise FileNotFoundError("Neither google-earth-pro nor xdg-open found.")
            if cmd.endswith("pro"):
                # Google Earth itself: start it and do not wait for it to exit.
                subprocess.Popen([cmd, kml_path])
            else:
                subprocess.run([cmd, kml_path], check=True)
    except Exception:
        logging.exception(f"{log_prefix} Error launching Google Earth:")


def cleanup_old_kml_files(kml_directory: str, max_files_to_keep: int):
    """
    Remove the oldest ``*.kml`` files from a directory when the count exceeds a limit.

    Files are ordered by modification time; the oldest are deleted until at most
    ``max_files_to_keep`` remain.

    Args:
        kml_directory (str): Directory to scan (non-recursive).
        max_files_to_keep (int): Number of newest files to keep. A value <= 0
            disables the cleanup entirely.

    Returns:
        None. Stat/delete failures are logged per file and do not stop the cleanup.
    """
    log_prefix = "[Utils KML Cleanup]"
    if max_files_to_keep <= 0:
        return
    try:
        kml_dir_path = Path(kml_directory)
        if not kml_dir_path.is_dir():
            return

        kml_files = []
        for item in kml_dir_path.glob("*.kml"):
            if item.is_file():
                try:
                    kml_files.append((item, item.stat().st_mtime))
                except Exception as stat_e:
                    logging.warning(f"{log_prefix} Stat error '{item.name}': {stat_e}")

        current_count = len(kml_files)
        if current_count <= max_files_to_keep:
            return

        kml_files.sort(key=lambda x: x[1])  # Oldest first
        num_to_delete = current_count - max_files_to_keep
        deleted_count = 0
        for file_path, _ in kml_files[:num_to_delete]:
            try:
                file_path.unlink()
                deleted_count += 1
            except Exception as delete_e:
                logging.error(
                    f"{log_prefix} Delete error '{file_path.name}': {delete_e}"
                )
        logging.debug(f"{log_prefix} Deleted {deleted_count}/{num_to_delete} files.")
    except Exception:
        logging.exception(f"{log_prefix} Error during KML cleanup:")


# --- END OF FILE utils.py ---