# utils.py """ THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. Provides utility functions used throughout the application, including safe queue management (put, clear) and coordinate formatting (decimal degrees to DMS). Uses standardized logging prefixes. Drop counts are now managed within AppState. """ # Standard library imports import queue import logging import math import os import datetime import sys import subprocess import shutil from pathlib import Path # Importa le librerie KML e GEO, gestendo l'ImportError try: import simplekml _simplekml_available = True except ImportError: simplekml = None _simplekml_available = False logging.warning( "[Utils KML] Library 'simplekml' not found. KML generation disabled. (pip install simplekml)" ) try: import pyproj _pyproj_available = True except ImportError: pyproj = None _pyproj_available = False logging.warning( "[Utils KML] Library 'pyproj' not found. KML generation requires it for corner calculation. (pip install pyproj)" ) # Removed: threading (Lock is now in AppState) # Local application imports # Removed: Global counters and lock # --- Queue Management Functions --- # --- MODIFIED FUNCTION: put_queue --- def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None): """ Safely puts an item into a queue using non-blocking put. Increments specific drop counters in the AppState object if the queue is full. Discards item if the application is shutting down. Args: queue_obj (queue.Queue): The queue instance. item (any): The item to put in the queue. queue_name (str): Identifier for the queue (for logging/counting: "sar", "mfd", "tkinter", "mouse"). app_instance (App, optional): Reference to the main app instance which holds the AppState. Required for shutdown check and incrementing drop counts. """ log_prefix = "[Utils Queue Put]" # Specific prefix # Check shutdown flag via app_instance if ( app_instance and hasattr(app_instance, "state") and app_instance.state.shutting_down ): logging.debug( f"{log_prefix} Queue '{queue_name}': Skipping put due to shutdown." ) return # Don't queue if shutting down try: # Attempt non-blocking put queue_obj.put(item, block=False) # DEBUG log for successful put logging.debug( f"{log_prefix} Item successfully put onto queue '{queue_name}' (Current approx size: {queue_obj.qsize()})." ) except queue.Full: # WARNING level is appropriate for dropped items due to full queue logging.debug( f"{log_prefix} Queue '{queue_name}' is full (maxsize={queue_obj.maxsize}). Dropping item." ) # Increment the counter via the AppState instance if app_instance and hasattr(app_instance, "state"): # Use the dedicated method in AppState for safe incrementing try: app_instance.state.increment_dropped_count(queue_name) except Exception as e: logging.error( f"{log_prefix} Failed to increment drop count for queue '{queue_name}': {e}" ) else: logging.error( f"{log_prefix} Cannot increment drop count for queue '{queue_name}': App instance or state missing." ) pass # Item is discarded except Exception as e: # Keep EXCEPTION for unexpected errors during put logging.exception( f"{log_prefix} Unexpected error putting item onto queue '{queue_name}': {e}" ) # --- REMOVED FUNCTION: get_dropped_counts --- # This is now handled by AppState.get_statistics() def clear_queue(q): """Removes all items currently in the specified queue.""" log_prefix = "[Utils Queue Clear]" # Specific prefix try: q_size_before = q.qsize() # Get approximate size before clearing # Clear the underlying deque while holding the mutex for safety with q.mutex: q.queue.clear() # DEBUG log confirming the action logging.debug( f"{log_prefix} Cleared queue (approx size before: {q_size_before})." ) except Exception as e: # Keep EXCEPTION for unexpected errors during clear logging.exception(f"{log_prefix} Error clearing queue: {e}") # --- Coordinate Formatting --- def decimal_to_dms(decimal_degrees, is_latitude): """ Converts decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string. Args: decimal_degrees (float): The coordinate value in decimal degrees. is_latitude (bool): True if the coordinate is latitude, False for longitude. Returns: str: Formatted DMS string (e.g., "40° 51' 54.33\" N") or status string on error. """ log_prefix = "[Utils DMS Conv]" # Specific prefix # DEBUG for function entry and input values coord_type = "Latitude" if is_latitude else "Longitude" logging.debug(f"{log_prefix} Converting {coord_type} {decimal_degrees} to DMS...") try: # Validate input type - Return "N/A" for invalid types if ( not isinstance(decimal_degrees, (int, float)) or math.isnan(decimal_degrees) or math.isinf(decimal_degrees) ): # WARNING if input is invalid type logging.warning( f"{log_prefix} Invalid input type or value ({decimal_degrees}). Returning 'N/A'." ) return "N/A" # Determine direction and padding if is_latitude: direction = "N" if decimal_degrees >= 0 else "S" degrees_padding = 2 # Validate range if ( abs(decimal_degrees) > 90.000001 ): # Allow slight tolerance for float issues # WARNING for out-of-range input logging.warning( f"{log_prefix} Latitude {decimal_degrees} out of range (-90 to 90). Returning 'Invalid Lat'." ) return "Invalid Lat" else: # Longitude direction = "E" if decimal_degrees >= 0 else "W" degrees_padding = 3 # Validate range if abs(decimal_degrees) > 180.000001: # Allow slight tolerance # WARNING for out-of-range input logging.warning( f"{log_prefix} Longitude {decimal_degrees} out of range (-180 to 180). Returning 'Invalid Lon'." ) return "Invalid Lon" # Use absolute value for calculations decimal_degrees = abs(decimal_degrees) # Calculate components - DEBUG for intermediate values degrees = math.floor(decimal_degrees) minutes_decimal = (decimal_degrees - degrees) * 60.0 minutes = math.floor(minutes_decimal) seconds = (minutes_decimal - minutes) * 60.0 # Handle potential floating point inaccuracies near 60 seconds if abs(seconds - 60.0) < 1e-9: seconds = 0.0 minutes += 1 if minutes == 60: minutes = 0 degrees += 1 logging.debug( f"{log_prefix} Calculated components: D={degrees}, M={minutes}, S={seconds:.4f}" ) # Format the string - Use f-strings for clarity # Ensure seconds formatting handles potential negative sign space if needed (though abs value used now) dms_string = f"{degrees:0{degrees_padding}d}° {minutes:02d}' {seconds:05.2f}\" {direction}" # DEBUG for successful formatting result logging.debug(f"{log_prefix} Formatting successful: '{dms_string}'") return dms_string except Exception as e: # Keep EXCEPTION for unexpected errors during conversion/formatting logging.exception( f"{log_prefix} Error converting {decimal_degrees} to DMS: {e}" ) return "Error DMS" # Return specific error string def _calculate_geo_corners_for_kml(geo_info_radians): """ Helper interno per calcolare i corner geografici (gradi) da geo_info (radianti). Basato sulla logica di MapIntegrationManager._calculate_sar_corners_geo. Richiede pyproj. Restituisce lista di tuple (lon, lat) in gradi o None. """ if not _pyproj_available: return None log_prefix = "[Utils KML Calc]" try: geod = pyproj.Geod(ellps="WGS84") # Estrai dati necessari (gestisci KeyError) center_lat_rad = geo_info_radians["lat"] center_lon_rad = geo_info_radians["lon"] orient_rad = geo_info_radians["orientation"] ref_x = geo_info_radians["ref_x"] ref_y = geo_info_radians["ref_y"] scale_x = geo_info_radians["scale_x"] scale_y = geo_info_radians["scale_y"] width = geo_info_radians["width_px"] height = geo_info_radians["height_px"] orient_rad = -orient_rad # inverse angle if not (scale_x > 0 and scale_y > 0 and width > 0 and height > 0): logging.error(f"{log_prefix} Invalid scale/dimensions in geo_info.") return None # Calcola offset pixel e metri corners_pixel = [ (0 - ref_x, ref_y - 0), (width - 1 - ref_x, ref_y - 0), (width - 1 - ref_x, ref_y - (height - 1)), (0 - ref_x, ref_y - (height - 1)), ] corners_meters = [(dx * scale_x, dy * scale_y) for dx, dy in corners_pixel] # Applica rotazione corners_meters_rotated = [] if abs(orient_rad) > 1e-6: cos_o = math.cos(orient_rad) sin_o = math.sin(orient_rad) for dx_m, dy_m in corners_meters: rot_dx = dx_m * cos_o - dy_m * sin_o rot_dy = dx_m * sin_o + dy_m * cos_o corners_meters_rotated.append((rot_dx, rot_dy)) else: corners_meters_rotated = corners_meters # Calcola coordinate geografiche finali sar_corners_geo_deg = [] center_lon_deg = math.degrees(center_lon_rad) center_lat_deg = math.degrees(center_lat_rad) for dx_m_rot, dy_m_rot in corners_meters_rotated: distance_m = math.sqrt(dx_m_rot**2 + dy_m_rot**2) azimuth_rad = math.atan2(dx_m_rot, dy_m_rot) azimuth_deg = math.degrees(azimuth_rad) endlon, endlat, _ = geod.fwd( center_lon_deg, center_lat_deg, azimuth_deg, distance_m ) sar_corners_geo_deg.append((endlon, endlat)) # (lon, lat) if len(sar_corners_geo_deg) == 4: return sar_corners_geo_deg else: logging.error(f"{log_prefix} Failed to calculate all 4 corner coordinates.") return None except KeyError as ke: logging.error(f"{log_prefix} Missing required key in geo_info_radians: {ke}") return None except Exception as e: logging.exception(f"{log_prefix} Error calculating geographic corners for KML:") return None def generate_sar_kml(geo_info_radians, output_path) -> bool: """ Genera un file KML rappresentante l'area SAR. Args: geo_info_radians (dict): Dizionario GeoInfo con valori in radianti. output_path (str): Percorso completo dove salvare il file .kml. Returns: bool: True se il KML è stato generato e salvato, False altrimenti. """ log_prefix = "[Utils KML Gen]" if not _simplekml_available or not _pyproj_available: logging.error( f"{log_prefix} Cannot generate KML: simplekml or pyproj library missing." ) return False if not geo_info_radians or not geo_info_radians.get("valid", False): logging.warning( f"{log_prefix} Cannot generate KML: Invalid or missing GeoInfo." ) return False try: # Calcola i corner in gradi corners_deg = _calculate_geo_corners_for_kml(geo_info_radians) if corners_deg is None: logging.error(f"{log_prefix} Failed to calculate SAR corners for KML.") return False # Errore già loggato nella funzione helper # Estrai centro e orientamento (converti in gradi per KML) center_lon_deg = math.degrees(geo_info_radians["lon"]) center_lat_deg = math.degrees(geo_info_radians["lat"]) orientation_deg = math.degrees(geo_info_radians["orientation"]) # KML usa gradi # Calcola dimensione approssimativa per l'altitudine della vista width_km = ( geo_info_radians.get("scale_x", 1) * geo_info_radians.get("width_px", 1) ) / 1000.0 height_km = ( geo_info_radians.get("scale_y", 1) * geo_info_radians.get("height_px", 1) ) / 1000.0 view_altitude_m = ( max(width_km, height_km) * 2000 ) # Altitudine vista = 2 * dimensione max in metri # Crea oggetto KML kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}") # Aggiungi LookAt per centrare la vista kml.document.lookat.longitude = center_lon_deg kml.document.lookat.latitude = center_lat_deg kml.document.lookat.range = ( view_altitude_m # Distanza in metri dalla coordinata ) kml.document.lookat.tilt = 45 # Angolo di vista (0=diretto verso il basso) kml.document.lookat.heading = ( orientation_deg # Orientamento della camera (0=Nord) ) # Aggiungi un segnaposto al centro # placemark = kml.newpoint(name="SAR Center", coords=[(center_lon_deg, center_lat_deg)]) # Aggiungi poligono per il footprint SAR # Nota: simplekml si aspetta [(lon,lat,alt), (lon,lat,alt), ...] # L'altitudine è opzionale, la mettiamo a 0 rispetto al suolo. outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg] pol = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary) pol.style.linestyle.color = simplekml.Color.red # Colore linea pol.style.linestyle.width = 2 # Spessore linea pol.style.polystyle.color = simplekml.Color.changealphaint( 100, simplekml.Color.red ) # Rosso semi-trasparente per riempimento # Salva il file KML logging.debug(f"{log_prefix} Saving KML to: {output_path}") kml.save(output_path) logging.debug(f"{log_prefix} KML file saved successfully: {output_path}") return True except Exception as e: logging.exception(f"{log_prefix} Error generating or saving KML file:") return False def launch_google_earth(kml_path): """ Tenta di aprire un file KML con l'applicazione predefinita del sistema (che dovrebbe essere Google Earth Pro se installato correttamente). Args: kml_path (str): Percorso del file KML da aprire. """ log_prefix = "[Utils Launch GE]" if not os.path.exists(kml_path): logging.error(f"{log_prefix} Cannot launch: KML file not found at {kml_path}") return logging.info( f"{log_prefix} Attempting to launch default KML handler for: {kml_path}" ) try: if sys.platform == "win32": os.startfile( kml_path ) # Metodo standard Windows per aprire un file con l'app associata elif sys.platform == "darwin": # macOS subprocess.run(["open", kml_path], check=True) else: # Linux e altri Unix-like # Tenta di trovare google-earth-pro nel PATH google_earth_cmd = shutil.which("google-earth-pro") if google_earth_cmd: subprocess.Popen([google_earth_cmd, kml_path]) logging.debug( f"{log_prefix} Launched using found command: {google_earth_cmd}" ) else: # Fallback: usa xdg-open che usa l'associazione MIME logging.debug( f"{log_prefix} 'google-earth-pro' not in PATH, using 'xdg-open'..." ) subprocess.run(["xdg-open", kml_path], check=True) logging.info(f"{log_prefix} Launch command issued for {kml_path}.") except FileNotFoundError: # Questo può accadere su Linux se né google-earth-pro né xdg-open sono trovati logging.error( f"{log_prefix} Launch command failed: Command not found (is Google Earth Pro installed and in PATH, or xdg-utils installed?)" ) except subprocess.CalledProcessError as e: # Errore da 'open' o 'xdg-open' logging.error(f"{log_prefix} Error launching KML handler: {e}") except Exception as e: logging.exception(f"{log_prefix} Unexpected error launching Google Earth:") def cleanup_old_kml_files(kml_directory: str, max_files_to_keep: int): """ Removes the oldest KML files from the specified directory if the total number of KML files exceeds max_files_to_keep. Args: kml_directory (str): The path to the directory containing KML files. max_files_to_keep (int): The maximum number of KML files to retain. If 0 or less, cleanup is disabled. """ log_prefix = "[Utils KML Cleanup]" if max_files_to_keep <= 0: logging.debug( f"{log_prefix} KML cleanup disabled (max_files_to_keep={max_files_to_keep})." ) return logging.debug( f"{log_prefix} Checking directory '{kml_directory}' for KML files older than the newest {max_files_to_keep}." ) try: kml_dir_path = Path(kml_directory) if not kml_dir_path.is_dir(): logging.warning( f"{log_prefix} Directory '{kml_directory}' not found. Cannot perform cleanup." ) return # 1. List all .kml files with their modification times kml_files = [] for item in kml_dir_path.glob("*.kml"): if item.is_file(): try: # Get modification time mtime = item.stat().st_mtime kml_files.append((item, mtime)) except OSError as stat_err: logging.warning( f"{log_prefix} Could not get modification time for '{item.name}': {stat_err}" ) except Exception as stat_e: logging.exception( f"{log_prefix} Unexpected error getting stat for '{item.name}':" ) # 2. Check if cleanup is needed current_file_count = len(kml_files) logging.debug( f"{log_prefix} Found {current_file_count} KML files. Max allowed: {max_files_to_keep}." ) if current_file_count <= max_files_to_keep: logging.debug( f"{log_prefix} File count is within limit. No cleanup needed." ) return # 3. Sort files by modification time (oldest first) kml_files.sort(key=lambda x: x[1]) # Sort by the second element (mtime) # 4. Determine files to delete num_to_delete = current_file_count - max_files_to_keep files_to_delete = kml_files[:num_to_delete] # Get the oldest ones logging.debug(f"{log_prefix} Need to delete {num_to_delete} oldest KML files.") # 5. Delete the oldest files deleted_count = 0 for file_path, _ in files_to_delete: try: file_path.unlink() # Use unlink() for Path objects (equivalent to os.remove) logging.debug(f"{log_prefix} Deleted old KML file: {file_path.name}") deleted_count += 1 except OSError as delete_err: logging.error( f"{log_prefix} Failed to delete file '{file_path.name}': {delete_err}" ) except Exception as delete_e: logging.exception( f"{log_prefix} Unexpected error deleting file '{file_path.name}':" ) logging.debug( f"{log_prefix} Cleanup finished. Deleted {deleted_count}/{num_to_delete} files." ) except Exception as e: logging.exception(f"{log_prefix} Error during KML cleanup process:")