SXXXXXXX_ControlPanel/utils.py
2025-04-09 09:06:21 +02:00

404 lines
16 KiB
Python

# --- START OF FILE utils.py ---
# utils.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
Provides utility functions used throughout the application, including
safe queue management (put, clear), coordinate formatting (decimal degrees to DMS),
KML generation for the SAR footprint, and launching the system's KML viewer
(e.g. Google Earth Pro).
Uses standardized logging prefixes. Drop counts are now managed within AppState.
"""
# Standard library imports
import queue
import logging
import math
import os # Aggiunto
import datetime # Aggiunto per timestamp
import sys # Aggiunto per platform check
import subprocess # Aggiunto per lanciare processi
import shutil # Aggiunto per trovare eseguibili (opzionale)
# Import the KML and GEO libraries, handling ImportError when they are missing
try:
import simplekml
_simplekml_available = True
except ImportError:
simplekml = None
_simplekml_available = False
logging.warning("[Utils KML] Library 'simplekml' not found. KML generation disabled. (pip install simplekml)")
try:
import pyproj
_pyproj_available = True
except ImportError:
pyproj = None
_pyproj_available = False
logging.warning("[Utils KML] Library 'pyproj' not found. KML generation requires it for corner calculation. (pip install pyproj)")
# Removed: threading (Lock is now in AppState)
# Local application imports
# Removed: Global counters and lock
# --- Queue Management Functions ---
# --- MODIFIED FUNCTION: put_queue ---
def put_queue(queue_obj, item, queue_name="Unknown", app_instance=None):
    """
    Enqueue ``item`` on ``queue_obj`` without ever blocking the caller.

    The item is silently skipped while the application is shutting down.
    If the queue is full the item is dropped, a warning is logged and the
    drop is reported to ``app_instance.state.increment_dropped_count``.

    Args:
        queue_obj (queue.Queue): Destination queue.
        item (any): Object to enqueue.
        queue_name (str): Label used in log messages and passed to the drop
            counter (e.g. "sar", "mfd", "tkinter", "mouse").
        app_instance (App, optional): Main application object exposing a
            ``state`` attribute. Needed for the shutdown check and for drop
            accounting; without it a drop is logged as an error.
    """
    tag = "[Utils Queue Put]"

    def _app_has_state():
        # Same truthiness + attribute test is needed for both the shutdown
        # check and the drop-count bookkeeping.
        return bool(app_instance and hasattr(app_instance, "state"))

    # Nothing is queued once shutdown has begun.
    if _app_has_state() and app_instance.state.shutting_down:
        logging.debug(
            f"{tag} Queue '{queue_name}': Skipping put due to shutdown."
        )
        return

    try:
        queue_obj.put(item, block=False)
        logging.debug(
            f"{tag} Item successfully put onto queue '{queue_name}' (Current approx size: {queue_obj.qsize()})."
        )
    except queue.Full:
        # A full queue is an expected overload condition: drop and count.
        logging.warning(
            f"{tag} Queue '{queue_name}' is full (maxsize={queue_obj.maxsize}). Dropping item."
        )
        if not _app_has_state():
            logging.error(
                f"{tag} Cannot increment drop count for queue '{queue_name}': App instance or state missing."
            )
            return
        try:
            # AppState owns the counters (and their locking).
            app_instance.state.increment_dropped_count(queue_name)
        except Exception as count_err:
            logging.error(
                f"{tag} Failed to increment drop count for queue '{queue_name}': {count_err}"
            )
    except Exception as put_err:
        # Anything other than queue.Full is unexpected: keep the traceback.
        logging.exception(
            f"{tag} Unexpected error putting item onto queue '{queue_name}': {put_err}"
        )
# --- REMOVED FUNCTION: get_dropped_counts ---
# This is now handled by AppState.get_statistics()
def clear_queue(q):
    """
    Remove all items currently in the specified queue.

    Intended for ``queue.Queue`` and its subclasses: the underlying container
    is emptied while holding the queue's mutex. In addition to discarding the
    items, the queue's bookkeeping is kept consistent:

    * producers blocked in ``put()`` on a bounded queue are woken up, since
      space is now available;
    * the discarded items are subtracted from ``unfinished_tasks`` so that a
      later ``join()`` does not wait for items no consumer will ever see.
      Items already fetched by a consumer but not yet ``task_done()``'d stay
      counted.

    Args:
        q (queue.Queue): The queue to empty.

    Errors (e.g. an object without ``mutex``/``queue`` attributes) are logged
    and swallowed; nothing is raised to the caller.
    """
    log_prefix = "[Utils Queue Clear]"  # Specific prefix
    try:
        # q.mutex is a non-reentrant lock shared by all of the queue's
        # conditions, so everything below must avoid calling q.qsize() etc.
        with q.mutex:
            q_size_before = len(q.queue)
            q.queue.clear()

            # Discarded items will never be task_done()'d; stop counting them.
            remaining = q.unfinished_tasks - q_size_before
            if remaining <= 0:
                remaining = 0
                q.all_tasks_done.notify_all()
            q.unfinished_tasks = remaining

            # Wake any producer waiting for free space on a bounded queue.
            q.not_full.notify_all()
        # DEBUG log confirming the action
        logging.debug(
            f"{log_prefix} Cleared queue (approx size before: {q_size_before})."
        )
    except Exception as e:
        # Keep EXCEPTION for unexpected errors during clear
        logging.exception(f"{log_prefix} Error clearing queue: {e}")
# --- Coordinate Formatting ---
def decimal_to_dms(decimal_degrees, is_latitude):
    """
    Converts decimal degrees to a formatted DMS (Degrees, Minutes, Seconds) string.

    Seconds are shown with two decimals. If the seconds value would *display*
    as 60.00 after rounding, it is carried into the minutes (and the minutes
    into the degrees) so the result never contains ``60.00"`` or ``60'``.

    Args:
        decimal_degrees (float): The coordinate value in decimal degrees.
        is_latitude (bool): True if the coordinate is latitude, False for longitude.

    Returns:
        str: Formatted DMS string (e.g., "40° 51' 54.33\" N"), or a status string:
            "N/A" for non-numeric / NaN / infinite input,
            "Invalid Lat" / "Invalid Lon" for out-of-range values,
            "Error DMS" on an unexpected error.
    """
    log_prefix = "[Utils DMS Conv]"  # Specific prefix
    coord_type = "Latitude" if is_latitude else "Longitude"
    logging.debug(f"{log_prefix} Converting {coord_type} {decimal_degrees} to DMS...")
    try:
        # Reject non-numeric input as well as NaN / infinity.
        if (
            not isinstance(decimal_degrees, (int, float))
            or math.isnan(decimal_degrees)
            or math.isinf(decimal_degrees)
        ):
            logging.warning(
                f"{log_prefix} Invalid input type or value ({decimal_degrees}). Returning 'N/A'."
            )
            return "N/A"

        # Hemisphere letter, zero-padding width and range check per axis.
        # The limits carry a small tolerance for floating point noise.
        if is_latitude:
            direction = "N" if decimal_degrees >= 0 else "S"
            degrees_padding = 2
            if abs(decimal_degrees) > 90.000001:
                logging.warning(
                    f"{log_prefix} Latitude {decimal_degrees} out of range (-90 to 90). Returning 'Invalid Lat'."
                )
                return "Invalid Lat"
        else:  # Longitude
            direction = "E" if decimal_degrees >= 0 else "W"
            degrees_padding = 3
            if abs(decimal_degrees) > 180.000001:
                logging.warning(
                    f"{log_prefix} Longitude {decimal_degrees} out of range (-180 to 180). Returning 'Invalid Lon'."
                )
                return "Invalid Lon"

        # Work on the magnitude; the sign is already encoded in `direction`.
        # A separate name keeps the original value available for error logs.
        magnitude = abs(decimal_degrees)
        degrees = math.floor(magnitude)
        minutes_decimal = (magnitude - degrees) * 60.0
        minutes = math.floor(minutes_decimal)
        seconds = (minutes_decimal - minutes) * 60.0

        # Carry on the value as it will be displayed: round(x, 2) matches the
        # ".2f" formatting below, so 59.995+ (shown as "60.00") rolls over.
        if round(seconds, 2) >= 60.0:
            seconds = 0.0
            minutes += 1
        # Minutes can reach 60 either from the carry above or because the
        # fractional degrees multiplied by 60 rounded up to exactly 60.0.
        if minutes >= 60:
            minutes -= 60
            degrees += 1

        logging.debug(
            f"{log_prefix} Calculated components: D={degrees}, M={minutes}, S={seconds:.4f}"
        )
        dms_string = f"{degrees:0{degrees_padding}d}° {minutes:02d}' {seconds:05.2f}\" {direction}"
        logging.debug(f"{log_prefix} Formatting successful: '{dms_string}'")
        return dms_string
    except Exception as e:
        # Keep EXCEPTION for unexpected errors during conversion/formatting
        logging.exception(
            f"{log_prefix} Error converting {decimal_degrees} to DMS: {e}"
        )
        return "Error DMS"  # Return specific error string
def _calculate_geo_corners_for_kml(geo_info_radians):
    """
    Internal helper computing the geographic corners (degrees) of the SAR
    image from a geo_info dictionary whose angles are in radians.

    Based on the logic of MapIntegrationManager._calculate_sar_corners_geo.
    Requires pyproj.

    Args:
        geo_info_radians (dict): Must provide 'lat', 'lon', 'orientation',
            'ref_x', 'ref_y', 'scale_x', 'scale_y', 'width_px', 'height_px'.

    Returns:
        list | None: Four (lon, lat) tuples in degrees, ordered
        (0, 0), (width-1, 0), (width-1, height-1), (0, height-1) in pixel
        coordinates, or None if pyproj is unavailable, a key is missing,
        the scale/dimensions are not positive, or any other error occurs.
    """
    if not _pyproj_available:
        return None
    tag = "[Utils KML Calc]"
    try:
        geodesic = pyproj.Geod(ellps="WGS84")

        # Pull every required field up front so a missing key is reported
        # before any arithmetic is attempted.
        lat0_rad = geo_info_radians['lat']
        lon0_rad = geo_info_radians['lon']
        rotation_rad = geo_info_radians['orientation']
        ref_col = geo_info_radians['ref_x']
        ref_row = geo_info_radians['ref_y']
        step_x = geo_info_radians['scale_x']
        step_y = geo_info_radians['scale_y']
        n_cols = geo_info_radians['width_px']
        n_rows = geo_info_radians['height_px']

        # The rotation is applied with the opposite sign of the stored angle.
        rotation_rad = -rotation_rad

        if not all(value > 0 for value in (step_x, step_y, n_cols, n_rows)):
            logging.error(f"{tag} Invalid scale/dimensions in geo_info.")
            return None

        # Pixel offsets of each corner relative to the reference pixel;
        # the vertical offset is flipped so that "up" in the image is positive.
        left = 0 - ref_col
        right = n_cols - 1 - ref_col
        top = ref_row - 0
        bottom = ref_row - (n_rows - 1)
        pixel_offsets = [(left, top), (right, top), (right, bottom), (left, bottom)]

        # Scale the pixel offsets by the per-axis scale (used as metres below).
        metric_offsets = [(px * step_x, py * step_y) for px, py in pixel_offsets]

        # Rotate only when the angle is not negligible.
        if abs(rotation_rad) > 1e-6:
            cos_r = math.cos(rotation_rad)
            sin_r = math.sin(rotation_rad)
            rotated_offsets = [
                (mx * cos_r - my * sin_r, mx * sin_r + my * cos_r)
                for mx, my in metric_offsets
            ]
        else:
            rotated_offsets = metric_offsets

        # Project each offset from the centre along a geodesic:
        # azimuth measured from the +y axis towards +x, distance = offset length.
        lon0_deg = math.degrees(lon0_rad)
        lat0_deg = math.degrees(lat0_rad)
        corners_deg = []
        for east_m, north_m in rotated_offsets:
            span_m = math.sqrt(east_m**2 + north_m**2)
            bearing_deg = math.degrees(math.atan2(east_m, north_m))
            corner_lon, corner_lat, _ = geodesic.fwd(lon0_deg, lat0_deg, bearing_deg, span_m)
            corners_deg.append((corner_lon, corner_lat))  # (lon, lat)

        if len(corners_deg) != 4:
            logging.error(f"{tag} Failed to calculate all 4 corner coordinates.")
            return None
        return corners_deg
    except KeyError as missing_key:
        logging.error(f"{tag} Missing required key in geo_info_radians: {missing_key}")
        return None
    except Exception:
        logging.exception(f"{tag} Error calculating geographic corners for KML:")
        return None
def generate_sar_kml(geo_info_radians, output_path) -> bool:
    """
    Generate a KML file representing the SAR area.

    The document contains a LookAt centred on the SAR image and a
    semi-transparent red polygon for the image footprint. The footprint ring
    is explicitly closed (first corner repeated at the end), as KML requires
    for a polygon boundary.

    Args:
        geo_info_radians (dict): GeoInfo dictionary with angles in radians.
            Must have a truthy 'valid' entry and the keys needed by
            _calculate_geo_corners_for_kml.
        output_path (str): Full path where the .kml file is saved.

    Returns:
        bool: True if the KML was generated and saved, False otherwise
        (missing libraries, invalid GeoInfo, corner calculation failure,
        or any error while building/saving the file).
    """
    log_prefix = "[Utils KML Gen]"
    if not _simplekml_available or not _pyproj_available:
        logging.error(f"{log_prefix} Cannot generate KML: simplekml or pyproj library missing.")
        return False
    if not geo_info_radians or not geo_info_radians.get("valid", False):
        logging.warning(f"{log_prefix} Cannot generate KML: Invalid or missing GeoInfo.")
        return False
    try:
        # Footprint corners in degrees; the helper logs its own failures.
        corners_deg = _calculate_geo_corners_for_kml(geo_info_radians)
        if corners_deg is None:
            logging.error(f"{log_prefix} Failed to calculate SAR corners for KML.")
            return False

        # Centre and orientation, converted to degrees because KML uses degrees.
        center_lon_deg = math.degrees(geo_info_radians['lon'])
        center_lat_deg = math.degrees(geo_info_radians['lat'])
        orientation_deg = math.degrees(geo_info_radians['orientation'])

        # Approximate image extent, used to pick a viewing distance of twice
        # the larger side (km * 2000 == 2 * metres).
        width_km = (geo_info_radians.get('scale_x', 1) * geo_info_radians.get('width_px', 1)) / 1000.0
        height_km = (geo_info_radians.get('scale_y', 1) * geo_info_radians.get('height_px', 1)) / 1000.0
        view_altitude_m = max(width_km, height_km) * 2000

        kml = simplekml.Kml(name=f"SAR Image {datetime.datetime.now():%Y%m%d_%H%M%S}")

        # LookAt centres the initial view on the SAR image.
        kml.document.lookat.longitude = center_lon_deg
        kml.document.lookat.latitude = center_lat_deg
        kml.document.lookat.range = view_altitude_m  # Distance in metres from the point
        kml.document.lookat.tilt = 45  # View angle (0 = straight down)
        kml.document.lookat.heading = orientation_deg  # Camera heading (0 = North)

        # Optional centre placemark (currently disabled):
        # placemark = kml.newpoint(name="SAR Center", coords=[(center_lon_deg, center_lat_deg)])

        # Footprint polygon as (lon, lat, alt) tuples; altitude 0 = on the ground.
        outer_boundary = [(lon, lat, 0) for lon, lat in corners_deg]
        # A KML LinearRing must end on its starting coordinate; the corner
        # helper returns the four distinct corners only, so close the ring here.
        if outer_boundary and outer_boundary[-1] != outer_boundary[0]:
            outer_boundary.append(outer_boundary[0])
        pol = kml.newpolygon(name="SAR Footprint", outerboundaryis=outer_boundary)
        pol.style.linestyle.color = simplekml.Color.red  # Outline colour
        pol.style.linestyle.width = 2  # Outline width
        # Semi-transparent red fill
        pol.style.polystyle.color = simplekml.Color.changealphaint(100, simplekml.Color.red)

        logging.debug(f"{log_prefix} Saving KML to: {output_path}")
        kml.save(output_path)
        logging.info(f"{log_prefix} KML file saved successfully: {output_path}")
        return True
    except Exception:
        logging.exception(f"{log_prefix} Error generating or saving KML file:")
        return False
def launch_google_earth(kml_path):
    """
    Try to open a KML file with the system's default application
    (expected to be Google Earth Pro when it is installed correctly).

    Platform handling:
        * Windows: ``os.startfile``.
        * macOS: the ``open`` command.
        * Other (Linux / Unix-like): ``google-earth-pro`` if found on PATH,
          otherwise ``xdg-open``.

    Failures are logged; nothing is raised and nothing is returned.

    Args:
        kml_path (str): Path of the KML file to open.
    """
    tag = "[Utils Launch GE]"

    if not os.path.exists(kml_path):
        logging.error(f"{tag} Cannot launch: KML file not found at {kml_path}")
        return

    logging.info(f"{tag} Attempting to launch default KML handler for: {kml_path}")
    current_platform = sys.platform
    try:
        if current_platform == "win32":
            # Standard Windows way to open a file with its associated app.
            os.startfile(kml_path)
        elif current_platform == "darwin":
            subprocess.run(['open', kml_path], check=True)
        else:
            # Linux and other Unix-like systems.
            ge_binary = shutil.which('google-earth-pro')
            if not ge_binary:
                # Fall back to the MIME association via xdg-open.
                logging.debug(f"{tag} 'google-earth-pro' not in PATH, using 'xdg-open'...")
                subprocess.run(['xdg-open', kml_path], check=True)
            else:
                # Started without waiting for the viewer to exit.
                subprocess.Popen([ge_binary, kml_path])
                logging.debug(f"{tag} Launched using found command: {ge_binary}")
        logging.info(f"{tag} Launch command issued for {kml_path}.")
    except FileNotFoundError:
        # Can happen on Linux when neither google-earth-pro nor xdg-open exists.
        logging.error(f"{tag} Launch command failed: Command not found (is Google Earth Pro installed and in PATH, or xdg-utils installed?)")
    except subprocess.CalledProcessError as launch_err:
        # Non-zero exit from 'open' or 'xdg-open'.
        logging.error(f"{tag} Error launching KML handler: {launch_err}")
    except Exception:
        logging.exception(f"{tag} Unexpected error launching Google Earth:")
# --- END OF FILE utils.py ---