# --- START OF FILE ControlPanel.py --- # ControlPanel.py (Previously app.py) """ THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. Main application module for the Control Panel application. Orchestrates UI, display, network reception, image processing pipeline, test mode management, map integration, image recording, and state management. Initializes all sub-modules and manages the main application lifecycle. """ # --- Standard library imports --- import threading import time import queue import os import logging import math import sys import socket from typing import Optional, Tuple, Any, Dict, TYPE_CHECKING, Union, List import datetime import tkinter.filedialog as fd from pathlib import Path import tkinter as tk from tkinter import ttk from tkinter.scrolledtext import ScrolledText # --- Third-party imports --- import numpy as np import cv2 import screeninfo # --- PIL Import and Type Definition --- try: from PIL import Image, ImageTk ImageType = Image.Image # type: ignore except ImportError: ImageType = Any # Fallback type hint Image = None ImageTk = None logging.critical( "[App Init] Pillow library not found. Map/Image functionality will fail." ) # --- Configuration Import --- from controlpanel import config # --- Logging Setup --- try: from controlpanel.logging_config import setup_logging setup_logging() except ImportError: print("ERROR: logging_config.py not found. Using basic logging.") logging.basicConfig( level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s", ) # --- Application Modules Import --- from controlpanel.gui.ui import ControlPanel as UIPanel, StatusBar, create_main_window from controlpanel.gui.display import DisplayManager from controlpanel.utils.utils import ( put_queue, clear_queue, decimal_to_dms, dms_string_to_decimal, generate_sar_kml, # Rimane invariato launch_google_earth, # cleanup_old_kml_files, # Rimuovere o commentare il vecchio import cleanup_kml_output_directory, # <<< NUOVO IMPORT open_google_maps, generate_lookat_and_point_kml, generate_composite_kml, # Modificato precedentemente per usare questa _simplekml_available, _pyproj_available, format_ctypes_structure, ) from controlpanel.utils.network import create_udp_socket, close_udp_socket from controlpanel.core.receiver import UdpReceiver from controlpanel.app_state import AppState from controlpanel.core.test_mode_manager import TestModeManager from controlpanel.core.image_pipeline import ImagePipeline from controlpanel.utils.image_processing import load_image, normalize_image, apply_color_palette from controlpanel.core.image_recorder import ImageRecorder # --- Map related imports (Conditional) --- map_libs_found = True try: import mercantile import pyproj if Image is None and ImageType is not Any: raise ImportError("Pillow failed import") except ImportError as map_lib_err: logging.warning( f"[App Init] Core map lib import failed ({map_lib_err}). Map disabled." ) map_libs_found = False BaseMapService = None MapTileManager = None MapDisplayWindow = None MapIntegrationManager = None MapCalculationError = Exception if map_libs_found: try: from controlpanel.map.map_services import get_map_service, BaseMapService from controlpanel.map.map_manager import MapTileManager from controlpanel.map.map_utils import MapCalculationError from controlpanel.map.map_display import MapDisplayWindow from controlpanel.map.map_integration import MapIntegrationManager MAP_MODULES_LOADED = True except ImportError as map_import_err: logging.warning( f"[App Init] Specific map module import failed ({map_import_err}). Map disabled." ) MAP_MODULES_LOADED = False BaseMapService = None MapTileManager = None MapDisplayWindow = None MapIntegrationManager = None MapCalculationError = Exception else: MAP_MODULES_LOADED = False # --- Import Version Info FOR THE WRAPPER ITSELF --- try: # Use absolute import based on package name from controlpanel import _version as wrapper_version WRAPPER_APP_VERSION_STRING = f"{wrapper_version.__version__} ({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})" WRAPPER_BUILD_INFO = f"Wrapper Built: {wrapper_version.BUILD_TIMESTAMP}" except ImportError: # This might happen if you run the wrapper directly from source # without generating its _version.py first (if you use that approach for the wrapper itself) WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)" WRAPPER_BUILD_INFO = "Wrapper build time unknown" # --- End Import Version Info --- # --- Constants for Version Generation --- DEFAULT_VERSION = "0.0.0+unknown" DEFAULT_COMMIT = "Unknown" DEFAULT_BRANCH = "Unknown" # --- End Constants --- # --- Main Application Class --- class ControlPanelApp: """ Main application class. Manages UI, display, processing, network, state, and orchestrates various managers. """ # --- Initialization Method --- def __init__(self, root: tk.Tk): """Initializes the main application components and state.""" log_prefix = "[App Init]" logging.debug(f"{log_prefix} Starting initialization...") self.root = root self.root.title(f"Control Panel - {WRAPPER_APP_VERSION_STRING}") try: # Determine script directory safely if getattr(sys, "frozen", False): # Running as compiled executable script_dir = os.path.dirname(sys.executable) elif "__file__" in locals() or "__file__" in globals(): # Running as script script_dir = os.path.dirname(os.path.abspath(__file__)) else: # Fallback (interactive mode?) script_dir = os.getcwd() icon_path = os.path.join(script_dir, "ControlPanel.ico") if os.path.exists(icon_path): self.root.iconbitmap(default=icon_path) logging.debug(f"{log_prefix} Icon set from: {icon_path}") else: logging.warning(f"{log_prefix} Icon file not found at: {icon_path}") except Exception as icon_e: logging.warning(f"{log_prefix} Icon error: {icon_e}") # Initialize State self.state = AppState() # Initialize Queues self.sar_queue = queue.Queue(config.DEFAULT_SAR_QUEUE) self.mouse_queue = queue.Queue(config.DEFAULT_MOUSE_QUEUE) self.tkinter_queue = queue.Queue(config.DEFAULT_TKINTER_QUEUE) self.mfd_queue = queue.Queue(config.DEFAULT_MFD_QUEUE) # --- Window Placement and Sizing --- screen_w, screen_h = self._get_screen_dimensions() initial_sar_w, initial_sar_h = self._calculate_initial_sar_size() self.tkinter_x, self.tkinter_y = self._calculate_tkinter_position(screen_h) # Store original/expanded widths for dynamic resizing self.original_window_width = config.TKINTER_MIN_WIDTH self.metadata_panel_width = 300 # Adjusted width example self.expanded_window_width = ( self.original_window_width + self.metadata_panel_width + 10 ) # Set initial geometry initial_height = max(config.TKINTER_MIN_HEIGHT, 650) self.root.geometry( f"{self.original_window_width}x{initial_height}+{self.tkinter_x}+{self.tkinter_y}" ) self.root.minsize(self.original_window_width, config.TKINTER_MIN_HEIGHT) # --- UI Structure with Grid --- # Status bar packed at bottom first self.statusbar = StatusBar(self.root) self.statusbar.pack(side=tk.BOTTOM, fill=tk.X) # Main container frame packed to fill remaining space self.container_frame = ttk.Frame(self.root) self.container_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True) # Configure grid columns for the container # Column 0 for the main control panel self.container_frame.columnconfigure(0, weight=1) # Column 1 for the optional metadata panel (initially no weight) self.container_frame.columnconfigure(1, weight=0) # Allow the row containing the panels to expand vertically self.container_frame.rowconfigure(0, weight=1) # Initialize ControlPanel UI (UIPanel class from ui.py) self.control_panel = UIPanel(self.container_frame, self) # Grid the control panel into the container's first column self.control_panel.grid(row=0, column=0, sticky="nsew") # --- Create Metadata Frame Structure (as attribute of self) --- # Create metadata frame as child of the container_frame self.metadata_frame = ttk.Labelframe( self.container_frame, text="Raw SAR Metadata", padding=5 ) # Create inner frame for Text + Scrollbar self.metadata_text_frame = ttk.Frame(self.metadata_frame) self.metadata_text_frame.pack(fill=tk.BOTH, expand=True) # Create Scrollbar self.metadata_scrollbar = ttk.Scrollbar( self.metadata_text_frame, orient=tk.VERTICAL ) # Create Text widget and assign to self attribute self.metadata_display_text = tk.Text( self.metadata_text_frame, wrap=tk.NONE, state="disabled", height=8, yscrollcommand=self.metadata_scrollbar.set, font=("Courier New", 8), ) # Configure scrollbar self.metadata_scrollbar.config(command=self.metadata_display_text.yview) # Pack scrollbar and text widget self.metadata_scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.metadata_display_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) # Set initial placeholder text using the method on self self.set_metadata_display("Enable 'Show SAR Metadata' checkbox to view data...") # NOTE: metadata_frame is CREATED but NOT GRIDDED here initially logging.debug( f"{log_prefix} Metadata Display frame structure created but not gridded." ) # --- Initialize Sub-systems --- # Calculate positions for external windows self.mfd_x, self.mfd_y = self._calculate_mfd_position() self.sar_x, self.sar_y = self._calculate_sar_position(screen_w, initial_sar_w) map_max_w = ( config.MAX_MAP_DISPLAY_WIDTH if not MapDisplayWindow else getattr( MapDisplayWindow, "MAX_DISPLAY_WIDTH", config.MAX_MAP_DISPLAY_WIDTH ) ) map_x, map_y = self._calculate_map_position(screen_w, initial_sar_w, map_max_w) # Initialize Display Manager self.display_manager = DisplayManager( self, self.sar_queue, self.mouse_queue, self.sar_x, self.sar_y, self.mfd_x, self.mfd_y, initial_sar_w, # Correct variable name initial_sar_h, # Correct variable name ) try: self.display_manager.initialize_display_windows() except Exception as e: self.set_status("Error: Display Init Failed") logging.critical(f"{log_prefix} Display init failed: {e}", exc_info=True) # Call LUT updates AFTER control_panel is initialized self.update_brightness_contrast_lut() self.update_mfd_lut() # Initialize Image Pipeline self.image_pipeline = ImagePipeline( self.state, self.sar_queue, self.mfd_queue, self ) # Initialize Test Mode Manager self.test_mode_manager = TestModeManager( self.state, self.root, self.sar_queue, self.mfd_queue, self ) # Initialize Map Manager (Conditional) self.map_integration_manager = None if config.ENABLE_MAP_OVERLAY: if MAP_MODULES_LOADED and MapIntegrationManager: try: self.map_integration_manager = MapIntegrationManager( self.state, self.tkinter_queue, self, map_x, map_y ) except Exception as map_e: logging.exception( f"{log_prefix} MapIntegrationManager init failed:" ) self.map_integration_manager = None self.set_status("Error: Map Init Failed") else: self.set_status("Error: Map Modules Missing") # Initialize Image Recorder (Conditional) self.image_recorder = None if ImageRecorder: try: self.image_recorder = ImageRecorder(self.state) except Exception as rec_e: logging.exception(f"{log_prefix} ImageRecorder init failed:") self.image_recorder = None # Update initial UI display (like coords, stats) self._update_initial_ui_display() # Network Setup self.local_ip = config.DEFAULT_SER_IP self.local_port = config.DEFAULT_SER_PORT self.udp_socket = None self.udp_receiver = None self.udp_thread = None if not config.USE_LOCAL_IMAGES: self._setup_network_receiver() else: # Load local/test images if network disabled self._start_initial_image_loader() # Start loops/timers self.process_sar_queue() self.process_mfd_queue() self.process_mouse_queue() self.process_tkinter_queue() self.schedule_periodic_updates() # Set initial mode (Test or Normal/Local) self.update_image_mode() logging.info(f"{log_prefix} Application initialization complete.") # --- Status Update Method --- def set_status(self, message: str): """Safely updates the main status message prefix in the status bar via after_idle.""" log_prefix = "[App Set Status]" if not hasattr(self, "state") or self.state.shutting_down: return new_status_prefix = f"Status: {message}" def _update(): # Inner function to perform the update in the main thread if not hasattr(self, "state") or self.state.shutting_down: return try: statusbar = getattr(self, "statusbar", None) if ( statusbar and isinstance(statusbar, tk.Widget) and statusbar.winfo_exists() ): current_text: str = statusbar.cget("text") # Preserve info after the first '|' parts = current_text.split("|", 1) # Split only once suffix = "" if len(parts) > 1: suffix = f" | {parts[1].strip()}" # Reconstruct suffix final_text = f"{new_status_prefix}{suffix}" # Call the specific method of the StatusBar class if available if hasattr(statusbar, "set_status_text"): statusbar.set_status_text(final_text) else: # Fallback statusbar.config(text=final_text) except Exception as e: logging.exception(f"{log_prefix} Error updating status bar text:") try: # Schedule the inner function using after_idle if hasattr(self, "root") and self.root and self.root.winfo_exists(): self.root.after_idle(_update) except Exception as e: logging.warning(f"{log_prefix} Error scheduling status update: {e}") # --- LUT Generation Methods --- def update_brightness_contrast_lut(self): """Recalculates the SAR B/C LUT based on AppState and stores it back.""" log_prefix = "[App Update SAR LUT]" if not hasattr(self, "state"): return try: contrast = max(0.01, self.state.sar_contrast) brightness = self.state.sar_brightness # Create linear values 0-255 lut_values = np.arange(256, dtype=np.float32) # Apply contrast and brightness adjusted = (lut_values * contrast) + brightness # Clip to 0-255 and convert to uint8 self.state.brightness_contrast_lut = np.clip( np.round(adjusted), 0, 255 ).astype(np.uint8) except Exception as e: logging.exception(f"{log_prefix} Error calculating SAR B/C LUT:") # Provide a default identity LUT on error self.state.brightness_contrast_lut = np.arange(256, dtype=np.uint8) def update_mfd_lut(self): """Recalculates the MFD LUT based on AppState parameters and stores it back.""" log_prefix = "[MFD LUT Update]" if not hasattr(self, "state"): return try: params = self.state.mfd_params raw_map_factor = params["raw_map_intensity"] / 255.0 pixel_map = params["pixel_to_category"] categories = params["categories"] # Initialize LUT with zeros (black) new_lut = np.zeros((256, 3), dtype=np.uint8) # Iterate through all possible pixel index values (0-255) for idx in range(256): cat_name = pixel_map.get(idx) # If index belongs to a defined category if cat_name: cat_data = categories[cat_name] bgr = cat_data["color"] intensity_factor = cat_data["intensity"] / 255.0 # Calculate color based on BGR and intensity factor new_lut[idx, 0] = np.clip( int(round(float(bgr[0]) * intensity_factor)), 0, 255 ) new_lut[idx, 1] = np.clip( int(round(float(bgr[1]) * intensity_factor)), 0, 255 ) new_lut[idx, 2] = np.clip( int(round(float(bgr[2]) * intensity_factor)), 0, 255 ) # Handle raw map pixel range (32-255) elif 32 <= idx <= 255: # Scale index linearly from 32-255 to 0-255 intensity raw_intensity = (float(idx) - 32.0) * (255.0 / 223.0) # Apply the raw map intensity factor and clip final_gray = int( round(np.clip(raw_intensity * raw_map_factor, 0, 255)) ) # Assign gray value to all BGR channels new_lut[idx, :] = final_gray # Update the LUT in the application state self.state.mfd_lut = new_lut except Exception as e: logging.critical(f"{log_prefix} CRITICAL MFD LUT error:", exc_info=True) self._apply_fallback_mfd_lut() def _apply_fallback_mfd_lut(self): """Applies a simple grayscale ramp as a fallback MFD LUT.""" if hasattr(self, "state"): try: # Create a single channel grayscale ramp gray_ramp = np.arange(256, dtype=np.uint8)[:, np.newaxis] # Convert grayscale ramp to 3-channel BGR self.state.mfd_lut = cv2.cvtColor(gray_ramp, cv2.COLOR_GRAY2BGR)[ :, 0, : ] # Remove added dimension except Exception as fb_e: logging.critical(f"[MFD LUT Update] Fallback LUT failed: {fb_e}") # Ultimate fallback: all black self.state.mfd_lut = np.zeros((256, 3), dtype=np.uint8) # --- UI Callbacks --- def update_image_mode(self): """Callback for the Test Image checkbox.""" log_prefix = "[App Mode Switch]" if ( not hasattr(self, "state") or not hasattr(self, "test_mode_manager") or self.state.shutting_down ): return try: cp = getattr(self, "control_panel", None) var = getattr(cp, "test_image_var", None) if cp else None # Determine the requested state from the checkbox variable is_test_req = ( var.get() == 1 if (var and isinstance(var, tk.Variable)) else self.state.test_mode_active ) # Only act if the state is actually changing if is_test_req != self.state.test_mode_active: self.state.test_mode_active = is_test_req # Update state first if is_test_req: # Activate test mode manager and related UI actions if self.test_mode_manager.activate(): self.activate_test_mode_ui_actions() else: # If activation fails, revert the UI checkbox self._revert_test_mode_ui() else: # Deactivate test mode manager and related UI actions self.test_mode_manager.deactivate() self.deactivate_test_mode_ui_actions() # Reset stats and update status bar on mode change self.state.reset_statistics() self.update_status() except Exception as e: logging.exception(f"{log_prefix} Error changing image mode:") def update_sar_size(self, event=None): """Callback when SAR display size combobox selection changes.""" log_prefix = "[App CB SAR Size]" if not hasattr(self, "state") or self.state.shutting_down: return try: cp = getattr(self, "control_panel", None) combo = getattr(cp, "sar_size_combo", None) if cp else None if not combo: return size_str = combo.get() factor = 1 # Parse the "1:N" string safely if size_str and ":" in size_str: try: factor_val = int(size_str.split(":")[1]) factor = max(1, factor_val) # Ensure factor is at least 1 except (ValueError, IndexError): factor = 1 # Default to 1 on parsing error # Calculate new dimensions w = max(1, config.SAR_WIDTH // factor) h = max(1, config.SAR_HEIGHT // factor) # Update state and trigger redraw only if size actually changed if w != self.state.sar_display_width or h != self.state.sar_display_height: self.state.update_sar_display_size(w, h) # Trigger reprocessing/redisplay of SAR image self._trigger_sar_update() except Exception as e: logging.exception(f"{log_prefix} Error updating SAR size:") def update_contrast(self, value_str: str): """Callback for SAR contrast slider changes.""" if self.state.shutting_down: return try: # Convert string value from slider to float contrast = float(value_str) # Update contrast in AppState self.state.update_sar_parameters(contrast=contrast) # Recalculate the LUT self.update_brightness_contrast_lut() # Trigger reprocessing/redisplay of SAR image self._trigger_sar_update() except ValueError: logging.warning( f"[App CB SAR Contrast] Invalid contrast value: {value_str}" ) except Exception as e: logging.exception(f"[App CB SAR Contrast] Error updating contrast: {e}") def update_brightness(self, value_str: str): """Callback for SAR brightness slider changes.""" if self.state.shutting_down: return try: # Convert string value from slider to int (via float for safety) brightness = int(float(value_str)) # Update brightness in AppState self.state.update_sar_parameters(brightness=brightness) # Recalculate the LUT self.update_brightness_contrast_lut() # Trigger reprocessing/redisplay of SAR image self._trigger_sar_update() except ValueError: logging.warning( f"[App CB SAR Brightness] Invalid brightness value: {value_str}" ) except Exception as e: logging.exception(f"[App CB SAR Brightness] Error updating brightness: {e}") def update_sar_palette(self, event=None): """Callback when SAR palette combobox selection changes.""" if self.state.shutting_down: return try: cp = getattr(self, "control_panel", None) combo = getattr(cp, "palette_combo", None) if cp else None if not combo: return palette = combo.get() # Validate against configured list if palette in config.COLOR_PALETTES: # Update palette in AppState self.state.update_sar_parameters(palette=palette) # Trigger reprocessing/redisplay of SAR image self._trigger_sar_update() else: # If selection is somehow invalid, revert to current state value logging.warning( f"[App CB SAR Palette] Invalid palette selected: {palette}. Reverting." ) combo.set(self.state.sar_palette) except Exception as e: logging.exception(f"[App CB SAR Palette] Error updating palette: {e}") def update_mfd_category_intensity(self, category_name: str, intensity_value: int): """Callback for MFD category intensity slider changes.""" if self.state.shutting_down: return try: if category_name in self.state.mfd_params["categories"]: # Clamp value to valid range 0-255 clamped_value = np.clip(intensity_value, 0, 255) # Update the intensity in the MFD parameters dictionary self.state.mfd_params["categories"][category_name][ "intensity" ] = clamped_value # Recalculate the MFD LUT based on the change self.update_mfd_lut() # Trigger reprocessing/redisplay of MFD image self._trigger_mfd_update() else: logging.warning( f"[App CB MFD Intensity] Unknown category: {category_name}" ) except Exception as e: logging.exception( f"[App CB MFD Intensity] Error updating intensity for '{category_name}': {e}" ) def choose_mfd_category_color(self, category_name: str): """Callback for MFD category color chooser button.""" # Import moved here to avoid potential issues from tkinter import colorchooser if self.state.shutting_down: return if category_name not in self.state.mfd_params["categories"]: logging.warning(f"[App CB MFD Color] Unknown category: {category_name}") return try: initial_bgr = self.state.mfd_params["categories"][category_name]["color"] initial_hex = ( f"#{initial_bgr[2]:02x}{initial_bgr[1]:02x}{initial_bgr[0]:02x}" ) color_code = colorchooser.askcolor( title=f"Select Color for {category_name}", initialcolor=initial_hex ) # Check if user selected a color if color_code and color_code[0]: rgb = color_code[0] # Convert RGB to BGR tuple, clamping values new_bgr = tuple( np.clip(int(c), 0, 255) for c in (rgb[2], rgb[1], rgb[0]) ) # Update state self.state.mfd_params["categories"][category_name]["color"] = new_bgr self.update_mfd_lut() # Schedule UI update for the color preview cp = getattr(self, "control_panel", None) if ( self.root and self.root.winfo_exists() and cp and hasattr(cp, "update_mfd_color_display") ): self.root.after_idle( cp.update_mfd_color_display, category_name, new_bgr ) self._trigger_mfd_update() except Exception as e: logging.exception( f"[App CB MFD Color] Error choosing color for '{category_name}': {e}" ) def update_mfd_raw_map_intensity(self, intensity_value: int): """Callback for the MFD Raw Map intensity slider.""" if self.state.shutting_down: return try: clamped_value = np.clip(intensity_value, 0, 255) self.state.mfd_params["raw_map_intensity"] = clamped_value self.update_mfd_lut() self._trigger_mfd_update() except Exception as e: logging.exception( f"[App CB MFD RawMap] Error updating raw map intensity: {e}" ) def update_map_size(self, event=None): """Callback when Map Display Size combobox selection changes.""" if not hasattr(self, "state") or self.state.shutting_down: return try: cp = getattr(self, "control_panel", None) combo = getattr(cp, "map_size_combo", None) if cp else None if not combo: return self.state.update_map_scale_factor(combo.get()) # Trigger map redraw to apply the new scale self.trigger_map_redraw() except Exception as e: logging.exception(f"[App CB Map Size] Error updating map size: {e}") def toggle_sar_overlay(self): """Callback for the Show SAR Overlay checkbox.""" if not hasattr(self, "state") or self.state.shutting_down: return try: cp = getattr(self, "control_panel", None) var = getattr(cp, "sar_overlay_var", None) if cp else None if not var or not isinstance(var, tk.Variable): return self.state.update_map_overlay_params(enabled=var.get()) # Trigger map redraw (likely can use recomposition) self.trigger_map_redraw(full_update=False) except Exception as e: logging.exception(f"[App CB Overlay Toggle] Error toggling overlay: {e}") def on_alpha_slider_release(self, event=None): """Callback for SAR Overlay Alpha slider release event.""" log_prefix = "[App CB SAR Alpha Release]" if not hasattr(self, "state") or self.state.shutting_down: return try: cp = getattr(self, "control_panel", None) var = getattr(cp, "sar_overlay_alpha_var", None) if cp else None if not var or not isinstance(var, tk.Variable): return final_alpha = var.get() logging.info(f"{log_prefix} Alpha value set to: {final_alpha:.3f}") self.state.update_map_overlay_params(alpha=final_alpha) # Trigger a map redraw using recomposition (faster) self.trigger_map_redraw(full_update=False) except Exception as e: logging.exception(f"{log_prefix} Error handling alpha slider release: {e}") def toggle_sar_recording(self): """Callback for the Record SAR checkbox.""" if not hasattr(self, "state") or self.state.shutting_down: return try: cp = getattr(self, "control_panel", None) var = getattr(cp, "record_sar_var", None) if cp else None if not var or not isinstance(var, tk.Variable): return self.state.update_sar_recording_enabled(enabled=var.get()) except Exception as e: logging.exception(f"[App CB Record Toggle] Error toggling recording: {e}") def apply_sar_overlay_shift(self): """Callback for the Apply Shift button.""" log_prefix = "[App CB Apply Shift]" if not hasattr(self, "state") or self.state.shutting_down: return cp = getattr(self, "control_panel", None) if not cp: return lat_str = cp.sar_lat_shift_var.get() lon_str = cp.sar_lon_shift_var.get() try: lat_shift = float(lat_str) lon_shift = float(lon_str) # Validate ranges if not (-90.0 <= lat_shift <= 90.0): raise ValueError("Latitude shift out of range (-90 to 90)") if not (-180.0 <= lon_shift <= 180.0): raise ValueError("Longitude shift out of range (-180 to 180)") self.state.update_sar_overlay_shift(lat_shift, lon_shift) # Trigger a FULL map update self.trigger_map_redraw(full_update=True) self.set_status("Applied SAR overlay shift.") except ValueError as ve: logging.error(f"{log_prefix} Invalid shift value: {ve}") self.set_status(f"Error: Invalid shift - {ve}") except Exception as e: logging.exception(f"{log_prefix} Error applying shift: {e}") self.set_status("Error applying shift.") def save_current_map_view(self): """Callback for the Save Map View button.""" log_prefix = "[App CB Save Map]" if not hasattr(self, "state") or self.state.shutting_down: return mgr = getattr(self, "map_integration_manager", None) if not mgr: self.set_status("Error: Map components not ready.") return if self.state.last_composed_map_pil is None: self.set_status("Error: No map view to save.") return try: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") init_fn = f"map_view_{ts}.png" file_path = fd.asksaveasfilename( title="Save Map View As", initialfile=init_fn, defaultextension=".png", filetypes=[("PNG files", "*.png"), ("All files", "*.*")], ) if file_path: save_dir = os.path.dirname(file_path) save_fn = Path(file_path).stem # Filename without extension mgr.save_map_view_to_file(directory=save_dir, filename=save_fn) else: self.set_status("Save map view cancelled.") except Exception as e: logging.exception(f"{log_prefix} Error during save map dialog/call: {e}") self.set_status("Error saving map view.") def go_to_google_maps(self, coord_source: str): """Callback for 'Go' buttons; opens Google Maps.""" log_prefix = "[App CB Go Gmaps]" if not hasattr(self, "state") or self.state.shutting_down: return cp = getattr(self, "control_panel", None) if not cp: return coords_text: Optional[str] = None source_desc: str = "Unknown" lat_deg: Optional[float] = None lon_deg: Optional[float] = None try: if coord_source == "sar_center": coords_text = cp.sar_center_coords_var.get() source_desc = "SAR Center" elif coord_source == "sar_mouse": coords_text = cp.mouse_coords_var.get() source_desc = "SAR Mouse" elif coord_source == "map_mouse": coords_text = cp.map_mouse_coords_var.get() source_desc = "Map Mouse" else: logging.warning( f"{log_prefix} Unknown coordinate source: {coord_source}" ) return if ( not coords_text or "N/A" in coords_text or "Error" in coords_text or "Invalid" in coords_text ): self.set_status(f"Error: No valid coordinates for {source_desc}.") return # Parse the "Lat=..., Lon=..." string lon_sep = ", Lon=" if lon_sep in coords_text: parts = coords_text.split(lon_sep, 1) lat_dms_str = None if "Lat=" in parts[0]: lat_dms_str = parts[0].split("=", 1)[1].strip() lon_dms_str = parts[1].strip() if lat_dms_str and lon_dms_str: lat_deg = dms_string_to_decimal(lat_dms_str, is_latitude=True) lon_deg = dms_string_to_decimal(lon_dms_str, is_latitude=False) else: raise ValueError("Could not extract Lat/Lon parts") else: raise ValueError("Separator ', Lon=' not found") if lat_deg is None or lon_deg is None: self.set_status(f"Error: Cannot parse coordinates for {source_desc}.") return # Call utility function open_google_maps(lat_deg, lon_deg) except ValueError as ve: logging.error( f"{log_prefix} Parsing error for {source_desc}: {ve} (Text: '{coords_text}')" ) self.set_status(f"Error parsing coords for {source_desc}.") except Exception as e: logging.exception( f"{log_prefix} Error opening Google Maps for {source_desc}:" ) self.set_status(f"Error opening map for {source_desc}.") def go_to_google_earth(self, coord_source: str): """Callback for 'GE' buttons; opens Google Earth with LookAt/Point KML.""" log_prefix = "[App CB Go GEarth]" if not hasattr(self, "state") or self.state.shutting_down: return if not _simplekml_available: logging.error(f"{log_prefix} Cannot proceed: simplekml missing.") self.set_status("Error: KML library missing.") return control_panel_ref = getattr(self, "control_panel", None) if not control_panel_ref: logging.error(f"{log_prefix} Control Panel UI reference not found.") return coords_text: Optional[str] = None source_desc: str = "Unknown" lat_deg: Optional[float] = None lon_deg: Optional[float] = None placemark_name: str = "Selected Location" try: # Get coordinates text and determine KML placemark name if coord_source == "sar_center": coords_text = control_panel_ref.sar_center_coords_var.get() source_desc = "SAR Center" placemark_name = "SAR Center" elif coord_source == "sar_mouse": coords_text = control_panel_ref.mouse_coords_var.get() source_desc = "SAR Mouse" placemark_name = "Mouse on SAR" elif coord_source == "map_mouse": coords_text = control_panel_ref.map_mouse_coords_var.get() source_desc = "Map Mouse" placemark_name = "Mouse on Map" else: logging.warning( f"{log_prefix} Unknown coordinate source: {coord_source}" ) return # Validate coordinates text if ( not coords_text or "N/A" in coords_text or "Error" in coords_text or "Invalid" in coords_text ): self.set_status(f"Error: No valid coordinates for {source_desc}.") return # Parse the DMS string to get decimal degrees lon_sep = ", Lon=" if lon_sep in coords_text: parts = coords_text.split(lon_sep, 1) lat_dms_str = None if "Lat=" in parts[0]: lat_dms_str = parts[0].split("=", 1)[1].strip() lon_dms_str = parts[1].strip() if lat_dms_str and lon_dms_str: lat_deg = dms_string_to_decimal(lat_dms_str, is_latitude=True) lon_deg = dms_string_to_decimal(lon_dms_str, is_latitude=False) else: raise ValueError("Could not split Lat/Lon parts") else: raise ValueError("Separator ', Lon=' not found") if lat_deg is None or lon_deg is None: self.set_status(f"Error: Cannot parse coordinates for {source_desc}.") return # Generate Temporary KML logging.debug( f"{log_prefix} Generating KML for '{placemark_name}' at Lat={lat_deg:.6f}, Lon={lon_deg:.6f}" ) temp_kml_path = generate_lookat_and_point_kml( latitude_deg=lat_deg, longitude_deg=lon_deg, placemark_name=placemark_name, placemark_desc=f"Source: {source_desc}\nCoords: {coords_text}", ) # Launch Google Earth if KML generated if temp_kml_path: logging.debug( f"{log_prefix} Launching Google Earth with KML: {temp_kml_path}" ) launch_google_earth(temp_kml_path) else: logging.error(f"{log_prefix} Failed to generate KML file.") self.set_status("Error: Failed to create KML.") except ValueError as ve: logging.error( f"{log_prefix} Parsing error for {source_desc}: {ve} (Text: '{coords_text}')" ) self.set_status(f"Error parsing coords for {source_desc}.") except Exception as e: logging.exception( f"{log_prefix} Error preparing/launching GE for {source_desc}:" ) self.set_status(f"Error launching GE for {source_desc}.") def go_to_all_gearth(self): """ Callback for 'GE All' button. Generates a composite KML including points, SAR footprint, and SAR ground overlay, then launches Google Earth. """ log_prefix = "[App CB GE All]" if not hasattr(self, "state") or self.state.shutting_down: return if not _simplekml_available or not _pyproj_available: logging.error(f"{log_prefix} Cannot proceed: simplekml or pyproj missing.") self.set_status("Error: KML library missing.") return control_panel_ref = getattr(self, "control_panel", None) if not control_panel_ref: logging.error(f"{log_prefix} Control Panel UI reference not found.") return points_to_plot: List[Tuple[float, float, str, Optional[str]]] = [] # --- 1. Collect Point Data (same as before) --- source_map = { "SAR Center": ("SAR Center", control_panel_ref.sar_center_coords_var), "SAR Mouse": ("Mouse on SAR", control_panel_ref.mouse_coords_var), "Map Mouse": ("Mouse on Map", control_panel_ref.map_mouse_coords_var), } for internal_name, (kml_name, tk_var) in source_map.items(): coords_text = tk_var.get() lat_deg, lon_deg = None, None if ( coords_text and "N/A" not in coords_text and "Error" not in coords_text and "Invalid" not in coords_text ): try: lon_sep = ", Lon=" if lon_sep in coords_text: parts = coords_text.split(lon_sep, 1) lat_dms_str = ( parts[0].split("=", 1)[1].strip() if "Lat=" in parts[0] else None ) lon_dms_str = parts[1].strip() if lat_dms_str and lon_dms_str: lat_deg = dms_string_to_decimal( lat_dms_str, is_latitude=True ) lon_deg = dms_string_to_decimal( lon_dms_str, is_latitude=False ) else: raise ValueError("Could not split Lat/Lon parts") else: raise ValueError("Separator ', Lon=' not found") if lat_deg is not None and lon_deg is not None: points_to_plot.append( ( lat_deg, lon_deg, kml_name, f"Source: {internal_name}\nCoords: {coords_text}", ) ) except Exception as parse_err: logging.error( f"{log_prefix} Error parsing coords for {internal_name}: {parse_err}" ) # --- 2. Get SAR Data and GeoInfo from AppState --- sar_normalized_uint8 = self.state.current_sar_normalized geo_info = self.state.current_sar_geo_info if sar_normalized_uint8 is None or sar_normalized_uint8.size == 0: logging.error(f"{log_prefix} No current SAR image data available.") self.set_status("Error: SAR image data missing.") return if not geo_info or not geo_info.get("valid", False): logging.error(f"{log_prefix} Invalid or missing SAR GeoInfo available.") self.set_status("Error: SAR GeoInfo missing.") return # --- 3. Process SAR Image for Overlay (B/C, Palette) --- # Retrieve current parameters from state bc_lut = self.state.brightness_contrast_lut palette = self.state.sar_palette if bc_lut is None: logging.error(f"{log_prefix} SAR B/C LUT is missing. Cannot process image.") self.set_status("Error: SAR LUT missing.") return logging.debug(f"{log_prefix} Processing current SAR image for KML overlay...") try: # Start with the normalized uint8 image from state img_for_kml = sar_normalized_uint8.copy() # Apply B/C LUT img_for_kml = cv2.LUT(img_for_kml, bc_lut) # Apply Color Palette (if not GRAY) if palette != "GRAY": img_for_kml = apply_color_palette(img_for_kml, palette) # Ensure BGR format even if GRAY palette was used elif img_for_kml.ndim == 2: img_for_kml = cv2.cvtColor(img_for_kml, cv2.COLOR_GRAY2BGR) if img_for_kml is None: raise ValueError("Image processing for KML resulted in None.") logging.debug(f"{log_prefix} SAR image processed for KML (shape: {img_for_kml.shape}).") except Exception as proc_err: logging.exception(f"{log_prefix} Error processing SAR image for KML:") self.set_status("Error processing SAR image.") return # --- 4. Generate Composite KML --- try: logging.debug(f"{log_prefix} Generating composite KML...") # Pass the collected points, the *processed* SAR image, and geo_info composite_kml_path = generate_composite_kml( points_to_plot, img_for_kml, geo_info ) if composite_kml_path: # --- 5. Launch Google Earth --- logging.debug( f"{log_prefix} Launching GE with composite KML: {composite_kml_path}" ) launch_google_earth(composite_kml_path) self.set_status("Launched Google Earth with composite view.") # Optionally: Clean up old KML files (including this one eventually) cleanup_kml_output_directory(config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES) else: logging.error(f"{log_prefix} Failed to generate composite KML file.") self.set_status("Error: Failed to create KML.") except Exception as e: logging.exception( f"{log_prefix} Error generating/launching composite KML:" ) self.set_status("Error during GE All generation.") def toggle_sar_metadata_display(self): """Callback for 'Show SAR Metadata' checkbox. Shows/Hides the metadata panel using grid.""" log_prefix = "[App CB MetaToggle]" if not hasattr(self, "state") or self.state.shutting_down: return # Get references needed cp = getattr(self, "control_panel", None) var = getattr(cp, "show_meta_var", None) if cp else None metadata_frame = getattr(self, "metadata_frame", None) container = getattr(self, "container_frame", None) if not var or not cp or not metadata_frame or not container: logging.error( f"{log_prefix} UI components missing. Cannot toggle metadata. " f"(Var: {var is not None}, CP: {cp is not None}, " f"MetaFrame: {metadata_frame is not None}, Container: {container is not None})" ) return try: is_enabled = var.get() # Only proceed if the state needs changing if bool(is_enabled) == self.state.display_sar_metadata: return self.state.display_sar_metadata = bool(is_enabled) logging.info(f"{log_prefix} SAR Metadata display changing to: {is_enabled}") current_height = self.root.winfo_height() if is_enabled: # --- SHOW METADATA PANEL --- # 1. Grid the metadata frame into the container (column 1) logging.debug(f"{log_prefix} Gridding metadata frame...") metadata_frame.grid( row=0, column=1, sticky="nsew", padx=(5, 5), pady=(0, 0) ) # 2. Configure column weight (give equal weight) container.columnconfigure(1, weight=1) # 3. Resize window new_width = self.expanded_window_width logging.debug( f"{log_prefix} Resizing window to expanded width: {new_width}" ) self.root.geometry(f"{new_width}x{current_height}") self.root.minsize(new_width, config.TKINTER_MIN_HEIGHT) # 4. Update content if self.state.last_sar_metadata_str: self.set_metadata_display(self.state.last_sar_metadata_str) else: self.set_metadata_display("") else: # --- HIDE METADATA PANEL --- # 1. Remove metadata frame from grid management logging.debug(f"{log_prefix} Removing metadata frame from grid...") metadata_frame.grid_remove() # 2. Reset column weight container.columnconfigure(1, weight=0) # 3. Resize window back to original new_width = self.original_window_width logging.debug( f"{log_prefix} Resizing window to original width: {new_width}" ) self.root.geometry(f"{new_width}x{current_height}") self.root.minsize(new_width, config.TKINTER_MIN_HEIGHT) # 4. Clear content self.set_metadata_display( "Enable 'Show SAR Metadata' checkbox to view data..." ) except Exception as e: logging.exception(f"{log_prefix} Error toggling metadata display: {e}") # --- Initialization Helper Methods --- def _get_screen_dimensions(self) -> Tuple[int, int]: """Gets primary screen dimensions using screeninfo.""" log_prefix = "[App Init]" try: monitors = screeninfo.get_monitors() if not monitors: raise screeninfo.ScreenInfoError("No monitors detected.") screen = monitors[0] logging.debug( f"{log_prefix} Detected Screen: {screen.width}x{screen.height}" ) return screen.width, screen.height except Exception as e: logging.warning( f"{log_prefix} Screen info error: {e}. Using default 1920x1080." ) return 1920, 1080 def _calculate_initial_sar_size( self, desired_factor_if_map: int = 4 ) -> Tuple[int, int]: """Calculates initial SAR display size based on config and map state.""" log_prefix = "[App Init]" initial_w = self.state.sar_display_width initial_h = self.state.sar_display_height map_enabled_and_loaded = config.ENABLE_MAP_OVERLAY and MAP_MODULES_LOADED if map_enabled_and_loaded: forced_factor = max(1, desired_factor_if_map) initial_w = config.SAR_WIDTH // forced_factor initial_h = config.SAR_HEIGHT // forced_factor # Update state immediately if map forces a different initial size if ( initial_w != self.state.sar_display_width or initial_h != self.state.sar_display_height ): self.state.update_sar_display_size(initial_w, initial_h) logging.info( f"{log_prefix} Map active, using SAR size 1:{forced_factor} ({initial_w}x{initial_h})." ) else: logging.debug( f"{log_prefix} Using initial SAR size from state: {initial_w}x{initial_h}." ) return initial_w, initial_h def _calculate_tkinter_position(self, screen_h: int) -> Tuple[int, int]: """Calculates the initial X, Y position for the Tkinter control panel window.""" x = 10 y = config.INITIAL_MFD_HEIGHT + 40 # Position below MFD placeholder area # Adjust if it goes off screen bottom if y + config.TKINTER_MIN_HEIGHT > screen_h: y = max(10, screen_h - config.TKINTER_MIN_HEIGHT - 10) return x, y def _calculate_mfd_position(self) -> Tuple[int, int]: """Calculates the initial X, Y position for the MFD display window.""" x = self.tkinter_x # Align with Tkinter window's left edge y = 10 # Near top return x, y def _calculate_sar_position( self, screen_w: int, initial_sar_w: int ) -> Tuple[int, int]: """Calculates the initial X, Y position for the SAR display window.""" # Position right of ORIGINAL Tkinter width x = self.tkinter_x + self.original_window_width + 20 y = 10 # Align with top # Adjust if it goes off screen right if x + initial_sar_w > screen_w: x = max(10, screen_w - initial_sar_w - 10) return x, y def _calculate_map_position( self, screen_w: int, current_sar_w: int, max_map_width: int ) -> Tuple[int, int]: """Calculates the initial X, Y position for the Map display window.""" # Position right of SAR window x = self.sar_x + current_sar_w + 20 y = 10 # Align with top # Adjust if it goes off screen right if x + max_map_width > screen_w: x = max(10, screen_w - max_map_width - 10) return x, y def _setup_network_receiver(self): """Creates and starts the UDP socket and receiver thread.""" log_prefix = "[App Init Network]" logging.info( f"{log_prefix} Attempting network receiver on {self.local_ip}:{self.local_port}" ) self.udp_socket = create_udp_socket(self.local_ip, self.local_port) if self.udp_socket: try: recorder_instance = getattr(self, "image_recorder", None) self.udp_receiver = UdpReceiver( app=self, udp_socket=self.udp_socket, set_new_sar_image_callback=self.handle_new_sar_data, set_new_mfd_indices_image_callback=self.handle_new_mfd_data, image_recorder=recorder_instance, ) self.udp_thread = threading.Thread( target=self.udp_receiver.receive_udp_data, name="UDPReceiverThread", daemon=True, ) self.udp_thread.start() self.set_status(f"Listening UDP {self.local_ip}:{self.local_port}") except Exception as receiver_init_e: logging.critical( f"{log_prefix} Failed to initialize UdpReceiver: {receiver_init_e}", exc_info=True, ) self.set_status("Error: Receiver Init Failed") close_udp_socket(self.udp_socket) self.udp_socket = None else: self.set_status("Error: UDP Socket Failed") def _start_initial_image_loader(self): """Starts a background thread to load local/test images if needed.""" log_prefix = "[App Init]" should_load = config.USE_LOCAL_IMAGES or config.ENABLE_TEST_MODE if should_load: logging.debug(f"{log_prefix} Starting initial image loading thread...") image_loading_thread = threading.Thread( target=self.load_initial_images, name="ImageLoaderThread", daemon=True ) image_loading_thread.start() elif self.root and self.root.winfo_exists(): # If not loading local/test, schedule initial display setup immediately self.root.after_idle(self._set_initial_display_from_loaded_data) def _update_initial_ui_display(self): """Sets the initial text for UI info Entry widgets based on default AppState.""" log_prefix = "[App Init]" logging.debug(f"{log_prefix} Setting initial UI info display...") control_panel_ref = getattr(self, "control_panel", None) if not control_panel_ref: return try: control_panel_ref.set_sar_center_coords("N/A", "N/A") control_panel_ref.set_sar_orientation("N/A") control_panel_ref.set_sar_size_km("N/A") control_panel_ref.set_mouse_coordinates("N/A", "N/A") control_panel_ref.set_map_mouse_coordinates("N/A", "N/A") # Get initial stats and display them initial_stats = self.state.get_statistics() drop_txt = ( f"Drop(Q): S={initial_stats['dropped_sar_q']}," f"M={initial_stats['dropped_mfd_q']}," f"Tk={initial_stats['dropped_tk_q']}," f"Mo={initial_stats['dropped_mouse_q']}" ) incmpl_txt = ( f"Incmpl(RX): S={initial_stats['incomplete_sar_rx']}," f"M={initial_stats['incomplete_mfd_rx']}" ) control_panel_ref.set_statistics_display(drop_txt, incmpl_txt) logging.debug(f"{log_prefix} Initial UI info display set.") except tk.TclError as e: # May happen if widgets are not fully ready during init sequence logging.warning( f"{log_prefix} Error setting initial UI display (TclError): {e}" ) except Exception as e: logging.exception( f"{log_prefix} Unexpected error setting initial UI display:" ) def load_initial_images(self): """(Runs in background thread) Loads initial local/test images into AppState.""" log_prefix = "[App Image Loader]" if self.state.shutting_down: return # Optionally update status bar if self.root and self.root.winfo_exists(): self.root.after_idle(self.set_status, "Loading initial images...") try: # Ensure test images are generated if needed if config.ENABLE_TEST_MODE or self.state.test_mode_active: if hasattr(self, "test_mode_manager") and self.test_mode_manager: self.test_mode_manager._ensure_test_images() # Load local images if configured if config.USE_LOCAL_IMAGES: self._load_local_mfd_image() self._load_local_sar_image() # Schedule the display update after loading if self.root and self.root.winfo_exists(): self.root.after_idle(self._set_initial_display_from_loaded_data) except Exception as e: logging.exception(f"{log_prefix} Error during initial image loading:") # Update status bar on error if self.root and self.root.winfo_exists(): self.root.after_idle(self.set_status, "Error Loading Images") finally: logging.info(f"{log_prefix} Initial image loading thread finished.") def _load_local_mfd_image(self): """Loads local MFD image data (indices) into AppState.""" log_prefix = "[App Image Loader]" # Default placeholder if loading fails default_indices = np.random.randint( 0, 256, (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8 ) try: # Placeholder: Implement actual loading from config.MFD_IMAGE_PATH mfd_path = getattr( config, "MFD_IMAGE_PATH", "local_mfd_indices.png" ) # Example logging.warning( f"{log_prefix} Local MFD image loading NYI ({mfd_path}). Using random." ) # loaded_indices = load_image(mfd_path, np.uint8) # Hypothetical load # self.state.local_mfd_image_data_indices = loaded_indices if loaded_indices ... else default_indices self.state.local_mfd_image_data_indices = default_indices except Exception as e: logging.exception(f"{log_prefix} Error loading local MFD image:") self.state.local_mfd_image_data_indices = default_indices def _load_local_sar_image(self): """Loads local SAR image data (raw) into AppState.""" log_prefix = "[App Image Loader]" # Default placeholder if loading fails default_raw_data = np.zeros( (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=config.SAR_DATA_TYPE ) try: sar_path = getattr(config, "SAR_IMAGE_PATH", "local_sar_image.tif") logging.info( f"{log_prefix} Attempting to load local SAR raw data from: {sar_path}" ) loaded_raw_data = load_image(sar_path, config.SAR_DATA_TYPE) # Check if loading was successful and data is not empty if loaded_raw_data is not None and loaded_raw_data.size > 0: self.state.local_sar_image_data_raw = loaded_raw_data else: logging.warning( f"{log_prefix} Failed to load local SAR raw data or empty image. Using zeros." ) self.state.local_sar_image_data_raw = default_raw_data except Exception as e: logging.exception(f"{log_prefix} Error loading local SAR raw data:") self.state.local_sar_image_data_raw = default_raw_data def _set_initial_display_from_loaded_data(self): """(Runs in main thread) Sets initial display based on loaded data/mode.""" log_prefix = "[App Init Display]" if self.state.shutting_down: return is_test = self.state.test_mode_active is_local = config.USE_LOCAL_IMAGES # If in local mode (and not test mode starting up) if not is_test and is_local: # Process MFD if data available if self.state.local_mfd_image_data_indices is not None: if hasattr(self, "image_pipeline") and self.image_pipeline: self.state.current_mfd_indices = ( self.state.local_mfd_image_data_indices.copy() ) self.image_pipeline.process_mfd_for_display() # Process SAR if data available if self.state.local_sar_image_data_raw is not None: self.set_initial_sar_image(self.state.local_sar_image_data_raw) else: # Show black SAR image if local data missing self.set_initial_sar_image(None) elif is_test: # Test mode display is handled by TestModeManager activation pass else: # Network mode (no local/test images loaded yet) # Show placeholders while waiting for network data self._show_network_placeholders() # Set final status (consider if map is still loading) map_loading = False mgr = getattr(self, "map_integration_manager", None) if mgr: thread_attr = getattr(mgr, "_map_initial_display_thread", None) if thread_attr and isinstance(thread_attr, threading.Thread): map_loading = thread_attr.is_alive() # Update status only if map is done loading (or no map) if not map_loading: status = "" if is_test: status = "Ready (Test Mode)" elif is_local: status = "Ready (Local Mode)" else: # Network mode status status = ( f"Listening UDP {self.local_ip}:{self.local_port}" if self.udp_socket else "Error: No Socket" ) self.set_status(status) def set_initial_sar_image(self, raw_image_data: Optional[np.ndarray]): """Processes raw SAR data (or None), updates state, resets UI, triggers display.""" log_prefix = "[App Init SAR Image]" normalized = None if self.state.shutting_down: return if raw_image_data is not None and raw_image_data.size > 0: try: # Normalize raw data to uint8 for display processing normalized = normalize_image(raw_image_data, target_type=np.uint8) except Exception as e: logging.exception(f"{log_prefix} Error normalizing initial SAR:") # Use black image if normalization failed or no raw data provided if normalized is None: # Create black image if state is None initially if self.state.current_sar_normalized is None: self.state.current_sar_normalized = np.zeros( (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8 ) # Fill existing array with black self.state.current_sar_normalized.fill(0) else: # Assign the successfully normalized image self.state.current_sar_normalized = normalized # Reset GeoInfo to default invalid state self.state.current_sar_geo_info = self.state._initialize_geo_info() # Reset related UI fields self._reset_ui_geo_info() # Trigger display processing pipeline if hasattr(self, "image_pipeline") and self.image_pipeline: self.image_pipeline.process_sar_for_display() # --- Mode Switching UI Actions --- def activate_test_mode_ui_actions(self): """Actions to perform when Test Mode is activated via UI.""" log_prefix = "[App Test Activate]" self.set_status("Activating Test Mode...") # Reset geo info display self._reset_ui_geo_info() # Clear display queues clear_queue(self.mfd_queue) clear_queue(self.sar_queue) # Set status (TestModeManager might update it again) self.set_status("Ready (Test Mode)") def deactivate_test_mode_ui_actions(self): """Actions to perform when Test Mode is deactivated via UI.""" log_prefix = "[App Test Deactivate]" self.set_status("Activating Normal Mode...") # Clear display queues clear_queue(self.mfd_queue) clear_queue(self.sar_queue) # Reset geo info display self._reset_ui_geo_info() # Restore display based on whether we are in Local or Network mode if config.USE_LOCAL_IMAGES: # Local Mode Restore # Display local MFD if available if self.state.local_mfd_image_data_indices is not None: self.state.current_mfd_indices = ( self.state.local_mfd_image_data_indices.copy() ) if hasattr(self, "image_pipeline"): self.image_pipeline.process_mfd_for_display() # Display local SAR if available if self.state.local_sar_image_data_raw is not None: self.set_initial_sar_image(self.state.local_sar_image_data_raw) else: self.set_initial_sar_image(None) # Show black if missing self.set_status("Ready (Local Mode)") else: # Network Mode Restore # Show placeholders until network data arrives self._show_network_placeholders() # Set status based on socket state status = ( f"Listening UDP {self.local_ip}:{self.local_port}" if self.udp_socket else "Error: No UDP Socket" ) self.set_status(status) def _reset_ui_geo_info(self): """Schedules UI reset for geo-related Entry widgets on the main thread.""" log_prefix = "[App UI Reset]" cp = getattr(self, "control_panel", None) if self.root and self.root.winfo_exists() and cp: # Use lambda with after_idle to ensure calls run in main thread self.root.after_idle(lambda: cp.set_sar_orientation("N/A")) self.root.after_idle(lambda: cp.set_mouse_coordinates("N/A", "N/A")) self.root.after_idle(lambda: cp.set_map_mouse_coordinates("N/A", "N/A")) self.root.after_idle(lambda: cp.set_sar_center_coords("N/A", "N/A")) self.root.after_idle(lambda: cp.set_sar_size_km("N/A")) def _revert_test_mode_ui(self): """Tries to uncheck the test mode checkbox in the UI and resets the state flag.""" log_prefix = "[App Mode Switch]" logging.warning(f"{log_prefix} Reverting Test Mode UI/state...") cp = getattr(self, "control_panel", None) var = getattr(cp, "test_image_var", None) if cp else None if self.root and self.root.winfo_exists() and var: try: # Schedule the UI update self.root.after_idle(var.set, 0) except Exception as e: logging.warning(f"{log_prefix} Failed to schedule uncheck: {e}") # Ensure state flag is also reset if hasattr(self, "state"): self.state.test_mode_active = False def _show_network_placeholders(self): """Queues placeholder images for MFD and SAR displays.""" log_prefix = "[App Placeholders]" try: # Create MFD placeholder (dark gray) ph_mfd = np.full( (config.INITIAL_MFD_HEIGHT, config.INITIAL_MFD_WIDTH, 3), 30, dtype=np.uint8, ) # Create SAR placeholder (lighter gray, uses current display size) ph_sar = np.full( (self.state.sar_display_height, self.state.sar_display_width, 3), 60, dtype=np.uint8, ) # Put placeholders onto respective display queues put_queue(self.mfd_queue, ph_mfd, "mfd", self) put_queue(self.sar_queue, ph_sar, "sar", self) except Exception as e: logging.exception(f"{log_prefix} Error creating/queueing placeholders:") # --- Trigger map redraw --- def trigger_map_redraw(self, full_update: bool = False): """ Requests a map redraw. Uses cached data for simple redraws or triggers a full update if needed or requested. """ log_prefix = "[App Trigger Map Redraw]" if self.state.shutting_down: return mgr = getattr(self, "map_integration_manager", None) # Only proceed if map is enabled and manager exists if not (config.ENABLE_MAP_OVERLAY and mgr): return if full_update: # Explicit request for full update (e.g., shift changed) logging.debug(f"{log_prefix} Triggering full map update (explicit)...") self._trigger_map_update_from_sar() else: # Simple redraw requested (e.g., alpha, toggle, marker change) # Check if data required for fast recomposition is available can_recompose = ( self.state.last_processed_sar_for_overlay is not None and self.state.last_sar_warp_matrix is not None and self.state.last_map_image_pil is not None ) if can_recompose: # Queue the fast redraw command logging.debug(f"{log_prefix} Queueing simple REDRAW_MAP command...") put_queue(self.tkinter_queue, ("REDRAW_MAP", None), "tkinter", self) else: # If cache is invalid, force a full update instead logging.warning( f"{log_prefix} Recomposition cache invalid. Triggering full update..." ) self._trigger_map_update_from_sar() # --- Network Data Handlers --- def handle_new_sar_data( self, normalized_image_uint8: np.ndarray, geo_info_radians: Dict[str, Any] ): """Callback executed by receiver when new SAR data is ready.""" if self.state.shutting_down: return # Update AppState with the new data self.state.set_sar_data(normalized_image_uint8, geo_info_radians) # Schedule the main thread processing using after_idle if self.root and self.root.winfo_exists(): self.root.after_idle(self._process_sar_update_on_main_thread) def handle_new_mfd_data(self, image_indices: np.ndarray): """Callback executed by receiver when new MFD data is ready.""" if self.state.shutting_down: return # Update AppState with the new MFD index image self.state.set_mfd_indices(image_indices) # Schedule the main thread processing using after_idle if self.root and self.root.winfo_exists(): self.root.after_idle(self._process_mfd_update_on_main_thread) # --- Main Thread Processing --- def _process_sar_update_on_main_thread(self): """Processes SAR updates in the main thread: UI, pipeline, map, KML, FPS.""" if self.state.shutting_down: return # Update UI labels (SAR Center, Orient, Size) self._update_sar_ui_labels() # Process image for display (LUT, palette, resize, marker) if hasattr(self, "image_pipeline") and self.image_pipeline: self.image_pipeline.process_sar_for_display() # Trigger map update if map is enabled self._trigger_map_update_from_sar() # Handle KML generation if enabled and data valid geo_info = self.state.current_sar_geo_info if geo_info and geo_info.get("valid") and config.ENABLE_KML_GENERATION: self._handle_kml_generation(geo_info) # Update FPS counter self._update_fps_stats("sar") def _handle_kml_generation(self, geo_info): """ Handles KML generation, cleanup, and optional launch. Checks the ENABLE_AUTO_SAR_KML_GENERATION flag before proceeding. """ log_prefix = "[App KML]" # Local log prefix # --- >>> START OF NEW CODE <<< --- # Check the configuration flag first if not config.ENABLE_AUTO_SAR_KML_GENERATION: # Log only if the general KML generation is enabled but auto is disabled if config.ENABLE_KML_GENERATION: logging.debug( f"{log_prefix} Automatic KML generation for SAR footprint is disabled via config flag. Skipping." ) return # Exit if automatic generation is disabled # --- >>> END OF NEW CODE <<< --- # Check if KML generation is globally disabled if not config.ENABLE_KML_GENERATION: # This condition might be redundant if the caller already checks, # but added for safety. return # Check if libraries needed for KML were loaded if not _simplekml_available or not _pyproj_available: # Log this only once maybe? Or check flag? For now, log each time. logging.warning( f"{log_prefix} Skipping KML generation: simplekml or pyproj missing." ) return try: kml_dir = config.KML_OUTPUT_DIRECTORY os.makedirs(kml_dir, exist_ok=True) # Use timestamp with milliseconds for unique filenames ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] fn = f"sar_footprint_{ts}.kml" fp = os.path.join(kml_dir, fn) # Call utility function to generate KML logging.debug(f"{log_prefix} Generating KML file: {fp}") success = generate_sar_kml(geo_info, fp) if success: logging.debug(f"{log_prefix} KML generation successful.") # Call utility function to clean up old KML files cleanup_kml_output_directory( config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES ) # Optionally launch Google Earth if config.AUTO_LAUNCH_GOOGLE_EARTH: launch_google_earth(fp) # Use utility function else: logging.error(f"{log_prefix} KML generation failed (utility returned False).") except Exception as e: logging.exception(f"{log_prefix} Error during KML handling: {e}") def _process_mfd_update_on_main_thread(self): """Processes MFD updates in the main thread: pipeline, FPS.""" if self.state.shutting_down: return # Process image for display (LUT application) if hasattr(self, "image_pipeline") and self.image_pipeline: self.image_pipeline.process_mfd_for_display() # Update FPS counter self._update_fps_stats("mfd") # --- UI Display Update Helpers --- def _update_sar_ui_labels(self): """Updates SAR related UI Entry widgets from AppState.""" cp = getattr(self, "control_panel", None) # Check if control panel exists and its window is valid if not cp or not cp.winfo_exists(): return geo = self.state.current_sar_geo_info lat_s, lon_s, orient_s, size_s = "N/A", "N/A", "N/A", "N/A" is_valid = geo and geo.get("valid") if is_valid: try: # Convert radians to degrees for display/formatting lat_d = math.degrees(geo["lat"]) lon_d = math.degrees(geo["lon"]) orient_d = math.degrees(geo.get("orientation", 0.0)) # Format using DMS utility lat_s = decimal_to_dms(lat_d, True) lon_s = decimal_to_dms(lon_d, False) orient_s = f"{orient_d:.2f}°" # Format orientation # Calculate size in km if possible scale_x = geo.get("scale_x", 0.0) width_px = geo.get("width_px", 0) scale_y = geo.get("scale_y", 0.0) height_px = geo.get("height_px", 0) if scale_x > 0 and width_px > 0 and scale_y > 0 and height_px > 0: size_w_km = (scale_x * width_px) / 1000.0 size_h_km = (scale_y * height_px) / 1000.0 size_s = f"W: {size_w_km:.1f} km, H: {size_h_km:.1f} km" except Exception as e: logging.warning(f"[App UI Update] Error formatting SAR geo labels: {e}") lat_s, lon_s, orient_s, size_s = "Error", "Error", "Error", "Error" is_valid = False # Mark as invalid if formatting fails # Update UI widgets safely using control panel methods try: cp.set_sar_center_coords(lat_s, lon_s) cp.set_sar_orientation(orient_s) cp.set_sar_size_km(size_s) except Exception as e: # Catch errors if UI elements don't exist or Tcl errors logging.exception(f"[App UI Update] Error setting SAR labels: {e}") # Clear mouse coords if GeoInfo becomes invalid if not is_valid: try: cp.set_mouse_coordinates("N/A", "N/A") cp.set_map_mouse_coordinates("N/A", "N/A") except Exception: pass # Ignore errors if UI closed def _update_fps_stats(self, img_type: str): """Updates FPS counters in AppState based on LOG_UPDATE_INTERVAL.""" now = time.time() try: if img_type == "sar": self.state.sar_frame_count += 1 elapsed = now - self.state.sar_update_time # Update FPS if interval passed if elapsed >= config.LOG_UPDATE_INTERVAL: self.state.sar_fps = self.state.sar_frame_count / elapsed # Reset timer and counter self.state.sar_update_time = now self.state.sar_frame_count = 0 elif img_type == "mfd": self.state.mfd_frame_count += 1 elapsed = now - self.state.mfd_start_time # Update FPS if interval passed if elapsed >= config.LOG_UPDATE_INTERVAL: self.state.mfd_fps = self.state.mfd_frame_count / elapsed # Reset timer and counter self.state.mfd_start_time = now self.state.mfd_frame_count = 0 except ZeroDivisionError: pass # Ignore if elapsed time is zero except Exception: pass # Ignore other potential calculation errors # --- Trigger Methods --- def _trigger_sar_update(self): """Triggers SAR reprocessing if not in test mode.""" if self.state.shutting_down or self.state.test_mode_active: return if hasattr(self, "image_pipeline") and self.image_pipeline: # Ask the pipeline to process the current SAR data in state self.image_pipeline.process_sar_for_display() def _trigger_mfd_update(self): """Triggers MFD reprocessing if not in test mode.""" if self.state.shutting_down or self.state.test_mode_active: return if hasattr(self, "image_pipeline") and self.image_pipeline: # Ask the pipeline to process the current MFD data in state self.image_pipeline.process_mfd_for_display() def _trigger_map_update_from_sar(self): """Triggers a full map update based on current SAR data.""" mgr = getattr(self, "map_integration_manager", None) # Check if map is enabled and manager exists if self.state.shutting_down or not config.ENABLE_MAP_OVERLAY or not mgr: return geo = self.state.current_sar_geo_info sar = self.state.current_sar_normalized # Check if data needed for update is valid if geo and geo.get("valid") and sar is not None and sar.size > 0: try: # Call manager method to update map and overlay mgr.update_map_overlay(sar, geo) except Exception as e: logging.exception( f"[App Trigger Map Update] Error calling manager: {e}" ) # --- Periodic Update Scheduling --- def schedule_periodic_updates(self): """Schedules the regular update of the status bar.""" if self.state.shutting_down: return # Update status text now self.update_status() # Calculate delay in milliseconds interval_ms = max(100, int(config.LOG_UPDATE_INTERVAL * 1000)) # Reschedule using root.after if window still exists if self.root and self.root.winfo_exists(): try: self.root.after(interval_ms, self.schedule_periodic_updates) except Exception: # Ignore errors during shutdown pass # --- Queue Processors --- def process_sar_queue(self): """Gets processed SAR image from queue and displays it.""" log_prefix = "[App QProc SAR]" image_to_display = None if self.state.shutting_down: return try: # Get item non-blockingly image_to_display = self.sar_queue.get(block=False) self.sar_queue.task_done() except queue.Empty: pass # Normal case if queue is empty except Exception as e: logging.exception(f"{log_prefix} Error getting from SAR display queue:") # If an image was retrieved, display it if image_to_display is not None: display_mgr = getattr(self, "display_manager", None) if display_mgr: try: display_mgr.show_sar_image(image_to_display) except Exception as display_e: logging.exception(f"{log_prefix} Error calling show_sar_image:") else: # This should not happen if init sequence is correct logging.error(f"{log_prefix} DisplayManager not available.") # Reschedule this processor if not shutting down if not self.state.shutting_down: self._reschedule_queue_processor(self.process_sar_queue) def process_mfd_queue(self): """Gets processed MFD image from queue and displays it.""" log_prefix = "[App QProc MFD]" image_to_display = None if self.state.shutting_down: return try: image_to_display = self.mfd_queue.get(block=False) self.mfd_queue.task_done() except queue.Empty: pass except Exception as e: logging.exception(f"{log_prefix} Error getting from MFD display queue:") if image_to_display is not None: display_mgr = getattr(self, "display_manager", None) if display_mgr: try: display_mgr.show_mfd_image(image_to_display) except Exception as display_e: logging.exception(f"{log_prefix} Error calling show_mfd_image:") else: logging.error(f"{log_prefix} DisplayManager not available.") # Reschedule this processor if not shutting down if not self.state.shutting_down: self._reschedule_queue_processor(self.process_mfd_queue) # --- Tkinter Queue Processor --- def process_tkinter_queue(self): """Processes commands from queue for UI/State updates in main thread.""" log_prefix = "[App QProc Tkinter]" if self.state.shutting_down: return item: Optional[Tuple[str, Any]] = None try: # Get item non-blockingly item = self.tkinter_queue.get(block=False) self.tkinter_queue.task_done() except queue.Empty: pass # Queue is empty, do nothing this cycle except Exception as e: logging.exception(f"{log_prefix} Error getting from queue:") item = None # Ensure item is None if error occurs # Process item if one was retrieved if item is not None: try: # Check if item is a tuple (command, payload) if isinstance(item, tuple) and len(item) == 2: command, payload = item # logging.debug(f"{log_prefix} Processing command: {command}") # Call the appropriate handler based on command string if command == "MOUSE_COORDS": self._handle_sar_mouse_coords_update(payload) elif command == "MAP_MOUSE_COORDS": self._handle_map_mouse_coords_update(payload) elif command == "SHOW_MAP": self._handle_show_map_update(payload) elif command == "REDRAW_MAP": self._handle_redraw_map_command() elif command == "SAR_CLICK_UPDATE": self._handle_sar_click_update(payload) elif command == "MAP_CLICK_UPDATE": self._handle_map_click_update(payload) elif command == "SAR_METADATA_UPDATE": self._handle_sar_metadata_update(payload) else: # Log if an unknown command is received logging.warning(f"{log_prefix} Unknown command: {command}") else: # Log if item format is unexpected logging.warning(f"{log_prefix} Invalid item type: {type(item)}") except Exception as e: logging.exception(f"{log_prefix} Error processing item:") # Reschedule this processor if not shutting down if not self.state.shutting_down: # Use a slightly faster delay for UI responsiveness? self._reschedule_queue_processor(self.process_tkinter_queue, delay=50) # --- Tkinter Queue Command Handlers --- def _handle_sar_mouse_coords_update(self, payload: Optional[Tuple[str, str]]): """Updates the SAR mouse coordinates UI Entry widget.""" cp = getattr(self, "control_panel", None) if not cp: return # Unpack payload or use default "N/A" lat_s, lon_s = ( payload if (payload and isinstance(payload, tuple) and len(payload) == 2) else ("N/A", "N/A") ) try: # Call method on UI object to update the text cp.set_mouse_coordinates(lat_s, lon_s) except Exception as e: logging.warning(f"[App UI Update] Error updating SAR mouse coords: {e}") def _handle_map_mouse_coords_update(self, payload: Optional[Tuple[int, int]]): """Handles MAP_MOUSE_COORDS command: converts pixels to geo and updates UI.""" lat_s, lon_s = ("N/A", "N/A") cp = getattr(self, "control_panel", None) if cp and payload and isinstance(payload, tuple) and len(payload) == 2: # Convert map pixel coords to geo coords using MapIntegrationManager mgr = getattr(self, "map_integration_manager", None) if mgr: geo_coords = mgr.get_geo_coords_from_map_pixel(payload[0], payload[1]) # If conversion successful, format to DMS if geo_coords: lat_s_calc = decimal_to_dms(geo_coords[0], True) lon_s_calc = decimal_to_dms(geo_coords[1], False) # Check for formatting errors if ( "Error" not in lat_s_calc and "Invalid" not in lat_s_calc and "Error" not in lon_s_calc and "Invalid" not in lon_s_calc ): lat_s, lon_s = lat_s_calc, lon_s_calc else: # Use error string if DMS formatting failed lat_s, lon_s = "Error DMS", "Error DMS" # Update the UI element try: cp.set_map_mouse_coordinates(lat_s, lon_s) except Exception as e: logging.warning(f"[App UI Update] Error updating map mouse coords: {e}") elif cp: # Clear UI if payload was invalid try: cp.set_map_mouse_coordinates("N/A", "N/A") except Exception: pass # Ignore errors if UI closed def _handle_show_map_update(self, payload: Optional[ImageType]): """Handles the SHOW_MAP command by delegating display to MapIntegrationManager.""" mgr = getattr(self, "map_integration_manager", None) if mgr: try: # Call the display method on the map manager mgr.display_map(payload) except Exception as e: logging.exception( "[App QProc Tkinter] Error calling map_integration_manager.display_map:" ) else: logging.warning( "[App QProc Tkinter] Received map display command but MapIntegrationManager not active." ) def _handle_redraw_map_command(self): """Handles the REDRAW_MAP command for simple map recomposition.""" mgr = getattr(self, "map_integration_manager", None) if mgr: try: # Call the recomposition method on the map manager mgr._recompose_map_overlay() except Exception as e: logging.exception( "[App QProc Tkinter] Error calling map_integration_manager._recompose_map_overlay:" ) else: logging.warning( "[App QProc Tkinter] REDRAW_MAP ignored: Map manager unavailable." ) def _handle_sar_click_update(self, payload: Optional[Tuple[int, int]]): """Updates the SAR click coordinates state and triggers a SAR redraw.""" log_prefix = "[App SAR Click State]" if self.state.shutting_down: return # Validate payload (should be (x, y) tuple) if payload and isinstance(payload, tuple) and len(payload) == 2: # Store the pixel coordinates in AppState self.state.last_sar_click_coords = payload logging.debug( f"{log_prefix} Updated state.last_sar_click_coords to {payload}" ) # Trigger SAR redraw pipeline to include the marker self._trigger_sar_update() else: logging.warning(f"{log_prefix} Received invalid payload: {payload}") def _handle_map_click_update(self, payload: Optional[Tuple[int, int]]): """Updates the Map click coordinates state and triggers a Map redraw.""" log_prefix = "[App Map Click State]" if self.state.shutting_down: return # Validate payload if payload and isinstance(payload, tuple) and len(payload) == 2: # Store the pixel coordinates in AppState self.state.last_map_click_coords = payload logging.debug( f"{log_prefix} Updated state.last_map_click_coords to {payload}" ) # Trigger map redraw (recomposition should pick up the new marker state) self.trigger_map_redraw(full_update=False) else: logging.warning(f"{log_prefix} Received invalid payload: {payload}") def _handle_sar_metadata_update(self, metadata_string: Optional[str]): """Updates the metadata display text widget.""" log_prefix = "[App Meta Display]" if self.state.shutting_down: return # Check if the text widget exists (attribute of ControlPanelApp) text_widget = getattr(self, "metadata_display_text", None) if text_widget and isinstance(metadata_string, str): # Cache the last received string in state self.state.last_sar_metadata_str = metadata_string # Update UI only if the display is currently enabled if self.state.display_sar_metadata: logging.debug(f"{log_prefix} Updating metadata display widget.") self.set_metadata_display(metadata_string) # Use method on self else: # Store it, but don't update UI if checkbox is off logging.debug( f"{log_prefix} Metadata received but display is disabled. Cached." ) elif text_widget and metadata_string is None: # Handle case where None might be sent (e.g., error during formatting) self.state.last_sar_metadata_str = None if self.state.display_sar_metadata: self.set_metadata_display("") elif not text_widget: # This shouldn't happen if init is correct logging.warning( f"{log_prefix} Metadata text widget not available to display metadata." ) def _reschedule_queue_processor(self, processor_func, delay: Optional[int] = None): """Helper method to reschedule a queue processor function using root.after.""" if self.state.shutting_down: return # Don't reschedule if shutting down if delay is None: # Determine default delay based on processor type if processor_func in [self.process_sar_queue, self.process_mfd_queue]: # Target slightly faster than FPS for image display queues target_fps = config.MFD_FPS calculated_delay = 1000 / (target_fps * 1.5) if target_fps > 0 else 20 delay = max(10, int(calculated_delay)) # Minimum delay 10ms else: # Default delay for other queues (tkinter, mouse) delay = 50 # Use slightly faster delay for UI responsiveness try: # Schedule only if root window exists if self.root and self.root.winfo_exists(): self.root.after(delay, processor_func) except Exception as e: # Log error only if not shutting down if not self.state.shutting_down: logging.warning( f"[App Rescheduler] Error rescheduling {processor_func.__name__}: {e}" ) # --- Mouse Coordinate Handling (SAR) --- def process_mouse_queue(self): """Processes raw SAR mouse coords from queue, calculates geo coords, queues result.""" log_prefix = "[App GeoCalc]" if self.state.shutting_down: return raw_coords = None try: # Get raw pixel coordinates from the dedicated mouse queue raw_coords = self.mouse_queue.get(block=False) self.mouse_queue.task_done() except queue.Empty: pass # Nothing to process except Exception as e: logging.exception(f"{log_prefix} Error getting from mouse queue:") # Process if valid coordinates received if isinstance(raw_coords, tuple) and len(raw_coords) == 2: x_disp, y_disp = raw_coords geo = self.state.current_sar_geo_info disp_w = self.state.sar_display_width disp_h = self.state.sar_display_height lat_s, lon_s = "N/A", "N/A" # Check if geo info is valid for calculation is_geo_valid_for_calc = ( geo and geo.get("valid") and disp_w > 0 and disp_h > 0 and geo.get("width_px", 0) > 0 and geo.get("height_px", 0) > 0 and geo.get("scale_x", 0.0) > 0 and geo.get("scale_y", 0.0) > 0 and all( k in geo for k in ["lat", "lon", "ref_x", "ref_y", "orientation"] ) ) if is_geo_valid_for_calc: try: # Extract values from geo info dictionary orig_w: int = geo["width_px"] orig_h: int = geo["height_px"] scale_x: float = geo["scale_x"] scale_y: float = geo["scale_y"] ref_x: int = geo["ref_x"] ref_y: int = geo["ref_y"] ref_lat_rad: float = geo["lat"] ref_lon_rad: float = geo["lon"] angle_rad: float = geo.get("orientation", 0.0) # --- Simplified Geo Calculation --- # TODO: Implement more accurate calculation accounting for rotation # Scale display coords to approximate original image pixel coords orig_x: float = (x_disp / disp_w) * orig_w orig_y: float = (y_disp / disp_h) * orig_h # Calculate distance from reference pixel pixel_delta_x: float = orig_x - ref_x pixel_delta_y: float = ref_y - orig_y # Y increases downwards # Convert pixel delta to meters meters_delta_x: float = pixel_delta_x * scale_x meters_delta_y: float = pixel_delta_y * scale_y # Convert meter offsets to degree offsets (approximate) M_PER_DLAT: float = 111132.954 M_PER_DLON_EQ: float = 111319.488 m_per_dlon: float = max( abs(M_PER_DLON_EQ * math.cos(ref_lat_rad)), 1e-3 ) lat_offset_deg: float = meters_delta_y / M_PER_DLAT lon_offset_deg: float = meters_delta_x / m_per_dlon # Calculate final coordinates final_lat_deg: float = math.degrees(ref_lat_rad) + lat_offset_deg final_lon_deg: float = math.degrees(ref_lon_rad) + lon_offset_deg # Validate calculated coordinates and format to DMS lat_valid = ( math.isfinite(final_lat_deg) and abs(final_lat_deg) <= 90.0 ) lon_valid = ( math.isfinite(final_lon_deg) and abs(final_lon_deg) <= 180.0 ) if lat_valid and lon_valid: lat_s_calc = decimal_to_dms(final_lat_deg, True) lon_s_calc = decimal_to_dms(final_lon_deg, False) # Check for formatting errors if ( "Error" not in lat_s_calc and "Invalid" not in lat_s_calc and "Error" not in lon_s_calc and "Invalid" not in lon_s_calc ): lat_s, lon_s = lat_s_calc, lon_s_calc else: lat_s, lon_s = "Error DMS", "Error DMS" else: lat_s, lon_s = "Invalid Calc", "Invalid Calc" except KeyError as ke: logging.error(f"{log_prefix} Missing key in geo_info: {ke}") lat_s, lon_s = "Error Key", "Error Key" except Exception as calc_e: logging.exception(f"{log_prefix} Geo calculation error:") lat_s, lon_s = "Calc Error", "Calc Error" # Queue Result (DMS strings or error strings) for UI update self.put_mouse_coordinates_queue(("MOUSE_COORDS", (lat_s, lon_s))) # Reschedule processor if not self.state.shutting_down: self._reschedule_queue_processor(self.process_mouse_queue, delay=100) def put_mouse_coordinates_queue( self, command_payload_tuple: Tuple[str, Tuple[str, str]] ): """Puts processed mouse coords tuple onto Tkinter queue.""" log_prefix = "[App Mouse Queue Put]" if self.state.shutting_down: return command, payload = command_payload_tuple # logging.debug(f"{log_prefix} Putting command '{command}' payload {payload} onto tkinter_queue.") put_queue( queue_obj=self.tkinter_queue, item=(command, payload), queue_name="tkinter", app_instance=self, ) # --- Metadata Display Update Method --- def set_metadata_display(self, text: str): """Safely updates the content of the metadata tk.Text widget.""" # Access the widget stored as an attribute of the app instance text_widget = getattr(self, "metadata_display_text", None) if not text_widget: logging.warning( "[App UI Update] set_metadata_display called but text widget not found on app." ) return try: # Ensure the widget still exists before configuring if text_widget.winfo_exists(): text_widget.config(state="normal") # Enable editing text_widget.delete("1.0", tk.END) # Clear existing content text_widget.insert("1.0", text) # Insert new text text_widget.config(state="disabled") # Disable editing except Exception as e: logging.warning(f"[App UI Update] Error setting metadata display text: {e}") # --- Status Update --- def update_status(self): """Updates status bar text and statistics display.""" if not hasattr(self, "state") or self.state.shutting_down: return # Avoid updating if map is still loading initially try: sb = getattr(self, "statusbar", None) if sb and sb.winfo_exists() and "Loading" in sb.cget("text"): return except Exception: pass # Ignore errors checking status bar text stats = self.state.get_statistics() try: # Determine current mode mode = ( "Test" if self.state.test_mode_active else ("Local" if config.USE_LOCAL_IMAGES else "Network") ) # Check if map is active map_on = ( " MapOn" if ( config.ENABLE_MAP_OVERLAY and hasattr(self, "map_integration_manager") and self.map_integration_manager ) else "" ) # Format FPS strings mfd_fps = f"MFD:{self.state.mfd_fps:.1f}fps" sar_fps = ( f"SAR:{self.state.sar_fps:.1f}fps" if self.state.sar_fps > 0 else "SAR:N/A" ) # Construct status prefix status_prefix = f"Status: {mode}{map_on} | {mfd_fps} | {sar_fps}" # Format statistics strings drop = ( f"Drop(Q): S={stats['dropped_sar_q']}," f"M={stats['dropped_mfd_q']}," f"Tk={stats['dropped_tk_q']}," f"Mo={stats['dropped_mouse_q']}" ) incmpl = ( f"Incmpl(RX): S={stats['incomplete_sar_rx']}," f"M={stats['incomplete_mfd_rx']}" ) # Schedule UI updates using after_idle for thread safety if self.root and self.root.winfo_exists(): # Update status bar if sb and sb.winfo_exists(): final_status = status_prefix # Keep status bar cleaner self.root.after_idle(sb.set_status_text, final_status) # Update statistics display in the control panel cp = getattr(self, "control_panel", None) if cp and cp.winfo_exists(): self.root.after_idle(cp.set_statistics_display, drop, incmpl) except Exception as e: # Log warning but don't crash the app for status update errors logging.warning(f"[App Status Update] Error during status update: {e}") # --- Cleanup --- def close_app(self): """Performs graceful shutdown of all components.""" # Prevent multiple shutdown attempts if hasattr(self, "state") and self.state.shutting_down: return # Should not happen, but safety check if not hasattr(self, "state"): sys.exit(1) logging.info("[App Shutdown] Starting shutdown sequence...") self.state.shutting_down = True # Attempt to update status bar try: self.set_status("Closing...") except Exception: pass # Shutdown components in reverse order of dependency if hasattr(self, "test_mode_manager"): self.test_mode_manager.stop_timers() if hasattr(self, "map_integration_manager") and self.map_integration_manager: self.map_integration_manager.shutdown() if hasattr(self, "image_recorder") and self.image_recorder: self.image_recorder.shutdown() # Network cleanup if self.udp_socket: close_udp_socket(self.udp_socket) self.udp_socket = None # Join receiver thread if self.udp_thread and self.udp_thread.is_alive(): logging.debug("[App Shutdown] Joining UDP receiver thread...") self.udp_thread.join(timeout=0.5) # Short timeout if self.udp_thread.is_alive(): logging.warning("[App Shutdown] UDP thread did not join cleanly.") # Shutdown worker pool pool = ( getattr(self.udp_receiver, "executor", None) if hasattr(self, "udp_receiver") else None ) if pool: logging.debug("[App Shutdown] Shutting down ThreadPoolExecutor...") pool.shutdown(wait=False, cancel_futures=True) # Display cleanup if hasattr(self, "display_manager"): self.display_manager.destroy_windows() # Short wait for OpenCV windows to close try: cv2.waitKey(5) except Exception: pass # Tkinter cleanup try: if self.root and self.root.winfo_exists(): logging.debug("[App Shutdown] Destroying Tkinter root window...") self.root.destroy() except Exception as e: logging.exception(f"[App Shutdown] Error destroying Tkinter window: {e}") logging.info("[App Shutdown] Application close sequence finished.") # Use sys.exit(0) for a clean exit code sys.exit(0) # --- Main Execution Block --- if __name__ == "__main__": root = None app_instance = None try: # Check for critical map library dependencies if map is enabled if config.ENABLE_MAP_OVERLAY and not MAP_MODULES_LOADED: msg = "Map enabled but required modules failed. Cannot start." logging.critical(f"[App Main] {msg}") print(f"ERROR: {msg} Check logs for missing libraries.") sys.exit(1) # Create main Tkinter window root = create_main_window( "Control Panel", config.TKINTER_MIN_WIDTH, config.TKINTER_MIN_HEIGHT, 10, 10, # Initial position, App constructor calculates final ) # Instantiate the main application class app_instance = ControlPanelApp(root) # Pass root window # Set the window close protocol handler root.protocol("WM_DELETE_WINDOW", app_instance.close_app) # Start the Tkinter main event loop root.mainloop() except SystemExit as exit_e: # Handle clean exits initiated by sys.exit() exit_code = exit_e.code if isinstance(exit_e.code, int) else 1 log_level = logging.INFO if exit_code == 0 else logging.WARNING # Log the exit code without re-raising SystemExit logging.log( log_level, f"[App Main] Application exited via sys.exit({exit_code})." ) except ImportError as imp_err: # Handle critical import errors during startup logging.critical( f"[App Main] CRITICAL IMPORT ERROR: {imp_err}. Application cannot start.", exc_info=True, ) print( f"\nCRITICAL ERROR: Missing required library - {imp_err}\n" "Please install the necessary libraries (check logs/readme) and try again.\n" ) sys.exit(1) except Exception as e: # Catch any other unhandled exceptions during startup or main loop logging.critical( "[App Main] UNHANDLED EXCEPTION during startup or main loop:", exc_info=True ) print( "\nFATAL ERROR: An unhandled exception occurred. Check logs for details.\n" ) sys.exit(1) finally: # Ensure logging is shut down properly on any exit path logging.info("=== App End ===") logging.shutdown() # --- END OF FILE ControlPanel.py ---