SXXXXXXX_ControlPanel/controlpanel/app_main.py
2025-10-16 09:52:34 +02:00

1470 lines
77 KiB
Python

# app_main.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED.

Main application module for the Control Panel application.
Orchestrates UI, display, network reception, image processing pipeline,
test mode management, map integration, image recording, and state management.
Initializes all sub-modules and manages the main application lifecycle.
"""
# --- Standard library imports ---
import threading
import time
import queue
import os
import logging
import math
import sys
import socket
from typing import Optional, Tuple, Any, Dict, TYPE_CHECKING, Union, List, Callable
import datetime
import tkinter.filedialog as fd
from pathlib import Path
import tkinter as tk
from tkinter import ttk
from tkinter.scrolledtext import ScrolledText
# --- Third-party imports ---
import numpy as np
import cv2
import screeninfo
# --- PIL Import and Type Definition ---
try:
from PIL import Image, ImageTk
ImageType = Image.Image
except ImportError:
ImageType = Any
Image = None
ImageTk = None
logging.critical(
"[App Init] Pillow library not found. Map/Image functionality will fail."
)
# --- Absolute Imports for Application Modules ---
try:
from controlpanel import config
from controlpanel.logging_config import setup_logging
from controlpanel.app_state import AppState
# GUI related imports
from controlpanel.gui.ui import ControlPanel as UIPanel, StatusBar, create_main_window
from controlpanel.gui.display import DisplayManager
# Core processing imports
from controlpanel.core.sfp_transport import SfpTransport
from controlpanel.core.receiver import ImagePayloadProcessor
from controlpanel.core.test_mode_manager import TestModeManager
from controlpanel.core.image_pipeline import ImagePipeline
from controlpanel.core.image_recorder import ImageRecorder
# Utility imports
from controlpanel.utils.utils import (
put_queue, clear_queue, decimal_to_dms, dms_string_to_decimal,
generate_sar_kml, launch_google_earth, cleanup_kml_output_directory,
open_google_maps, generate_lookat_and_point_kml, generate_composite_kml,
_simplekml_available, _pyproj_available, format_ctypes_structure,
)
# Network utilities are now used by sfp_transport, but keep for now if needed elsewhere
from controlpanel.utils.network import create_udp_socket, close_udp_socket
from controlpanel.utils.image_processing import load_image, normalize_image, apply_color_palette
except ImportError as app_import_err:
print(f"FATAL ERROR: Failed to import core application modules: {app_import_err}")
logging.critical(f"FATAL ERROR: Failed to import core application modules: {app_import_err}", exc_info=True)
sys.exit(1)
# --- Map related imports (Conditional) ---
# Step 1: third-party libraries every map module depends on.
map_libs_found = True
try:
    import mercantile
    import pyproj
    # The PIL import guard earlier in this module leaves ``Image`` set to None
    # (and ``ImageType`` set to ``Any``) when Pillow is missing, so testing
    # ``Image`` alone is the condition that actually detects that case.
    if Image is None:
        raise ImportError("Pillow failed import")
except ImportError as map_lib_err:
    logging.warning(
        f"[App Init] Core map lib import failed ({map_lib_err}). Map disabled."
    )
    map_libs_found = False

# Step 2: the application's own map modules, only attempted when step 1 passed.
MAP_MODULES_LOADED = False
if map_libs_found:
    try:
        from controlpanel.map.map_services import get_map_service, BaseMapService
        from controlpanel.map.map_manager import MapTileManager
        from controlpanel.map.map_utils import MapCalculationError
        from controlpanel.map.map_display import MapDisplayWindow
        from controlpanel.map.map_integration import MapIntegrationManager
        MAP_MODULES_LOADED = True
    except ImportError as map_import_err:
        logging.warning(
            f"[App Init] Specific map module import failed ({map_import_err}). Map disabled."
        )

# Step 3: whichever step failed, publish the same placeholders so the rest of
# the module can test these names without risking a NameError. This also
# resets any name bound by a partially successful import sequence in step 2.
if not MAP_MODULES_LOADED:
    BaseMapService = None
    MapTileManager = None
    MapDisplayWindow = None
    MapIntegrationManager = None
    MapCalculationError = Exception
# --- Version Info Import ---
try:
    from controlpanel import _version as wrapper_version
    WRAPPER_APP_VERSION_STRING = f"{wrapper_version.__version__} ({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})"
    WRAPPER_BUILD_INFO = f"Wrapper Built: {wrapper_version.BUILD_TIMESTAMP}"
except (ImportError, AttributeError, TypeError):
    # ImportError: no _version module is present.
    # AttributeError / TypeError: the module exists but lacks one of the
    # fields read above, or GIT_COMMIT_HASH is not sliceable. In every case
    # the placeholder strings are used instead of failing at import time.
    WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"
    WRAPPER_BUILD_INFO = "Wrapper build time unknown"
# --- GeoElevation Integration ---
# Module-level flags read by ControlPanelApp: the elevation feature is enabled
# only when GEOELEVATION_AVAILABLE is True and get_elevation_function is set.
GEOELEVATION_AVAILABLE: bool = False
get_elevation_function: Optional[Callable] = None
_geoelevation_log_prefix = "[GeoElevation Integration]"
try:
    configured_path_str = getattr(config, "GEOELEVATION_PROJECT_ROOT_PATH", None)
    if configured_path_str and isinstance(configured_path_str, str) and configured_path_str.strip():
        configured_path = Path(configured_path_str.strip())
        logging.info(f"{_geoelevation_log_prefix} GEOELEVATION_PROJECT_ROOT_PATH configured to: '{configured_path_str}'")
        if not configured_path.is_absolute():
            # Relative paths are resolved against the parent of the directory
            # that contains this file.
            controlpanel_package_dir = Path(__file__).parent
            project_root_dir = controlpanel_package_dir.parent
            absolute_geoelevation_path = (project_root_dir / configured_path).resolve()
            logging.debug(f"{_geoelevation_log_prefix} Resolved relative path '{configured_path}' to '{absolute_geoelevation_path}'")
            configured_path = absolute_geoelevation_path
        potential_module_dir = configured_path / "geoelevation"
        if configured_path.is_dir() and potential_module_dir.is_dir() and (potential_module_dir / "__init__.py").is_file():
            path_to_add_to_sys = str(configured_path)
            path_added_to_sys = False
            if path_to_add_to_sys not in sys.path:
                sys.path.insert(0, path_to_add_to_sys)
                path_added_to_sys = True
                logging.info(f"{_geoelevation_log_prefix} Added '{path_to_add_to_sys}' to sys.path for GeoElevation import.")
            try:
                from geoelevation import get_point_elevation as ge_get_point_elevation
                GEOELEVATION_AVAILABLE = True
                get_elevation_function = ge_get_point_elevation
                logging.info(f"{_geoelevation_log_prefix} Module 'geoelevation.elevation_service.get_point_elevation' loaded successfully. Elevation feature ENABLED.")
            except ImportError as e_import:
                logging.warning(f"{_geoelevation_log_prefix} Failed to import 'get_point_elevation' from GeoElevation module at '{potential_module_dir}': {e_import}. Elevation feature DISABLED.")
                GEOELEVATION_AVAILABLE = False
                get_elevation_function = None
            except Exception as e_general_import:
                logging.error(f"{_geoelevation_log_prefix} An unexpected error occurred during GeoElevation import: {e_general_import}. Elevation feature DISABLED.", exc_info=True)
                GEOELEVATION_AVAILABLE = False
                get_elevation_function = None
            finally:
                # Undo the temporary sys.path entry on every exit path, but
                # only if this block was the one that inserted it.
                if path_added_to_sys and path_to_add_to_sys in sys.path:
                    try:
                        sys.path.remove(path_to_add_to_sys)
                    except ValueError:
                        pass
        else:
            logging.warning(f"{_geoelevation_log_prefix} Configured GeoElevation path '{configured_path}' or its 'geoelevation' subfolder (with __init__.py) is not valid. Elevation feature DISABLED.")
            GEOELEVATION_AVAILABLE = False
    else:
        logging.info(f"{_geoelevation_log_prefix} GEOELEVATION_PROJECT_ROOT_PATH not configured or empty. Elevation feature DISABLED.")
        GEOELEVATION_AVAILABLE = False
except Exception as e_outer_config:
    logging.error(f"{_geoelevation_log_prefix} Error processing GeoElevation configuration: {e_outer_config}. Elevation feature DISABLED.", exc_info=True)
    GEOELEVATION_AVAILABLE = False
    get_elevation_function = None
class ControlPanelApp:
"""
Main application class. Manages UI, display, processing, network, state,
and orchestrates various managers.
"""
def __init__(self, root: tk.Tk):
    """Build the UI, displays, managers and network layer, then start the loops.

    Args:
        root: The Tk root window that hosts the control panel.
    """
    log_prefix = "[App Init]"
    logging.debug(f"{log_prefix} Starting initialization...")
    self.root = root
    self.root.title(f"Control Panel - {WRAPPER_APP_VERSION_STRING}")

    # --- Window icon (best effort; a failure is only logged) ---
    try:
        if getattr(sys, "frozen", False):
            script_dir = os.path.dirname(sys.executable)
        elif "__file__" in locals() or "__file__" in globals():
            script_dir = os.path.dirname(os.path.abspath(__file__))
        else:
            script_dir = os.getcwd()
        icon_path = os.path.join(script_dir, "ControlPanel.ico")
        if os.path.exists(icon_path):
            self.root.iconbitmap(default=icon_path)
    except Exception as icon_e:
        logging.warning(f"{log_prefix} Icon error: {icon_e}")

    # --- Shared state and queues ---
    self.state = AppState()
    # These attributes are read by request_elevation_for_point() and
    # close_app(). They are created before the transport and the queue loops
    # are started so that those methods never find them missing.
    self.active_elevation_requests: Dict[str, threading.Thread] = {}
    self.sfp_transport: Optional[SfpTransport] = None
    self.payload_processor: Optional[ImagePayloadProcessor] = None
    self.sar_queue = queue.Queue(config.DEFAULT_SAR_QUEUE)
    self.mouse_queue = queue.Queue(config.DEFAULT_MOUSE_QUEUE)
    self.tkinter_queue = queue.Queue(config.DEFAULT_TKINTER_QUEUE)
    self.mfd_queue = queue.Queue(config.DEFAULT_MFD_QUEUE)

    # --- Main window geometry ---
    screen_w, screen_h = self._get_screen_dimensions()
    initial_sar_w, initial_sar_h = self._calculate_initial_sar_size()
    self.tkinter_x, self.tkinter_y = self._calculate_tkinter_position(screen_h)
    self.original_window_width = config.TKINTER_MIN_WIDTH
    self.metadata_panel_width = 300
    self.expanded_window_width = self.original_window_width + self.metadata_panel_width + 10
    initial_height = max(config.TKINTER_MIN_HEIGHT, 650)
    self.root.geometry(f"{self.original_window_width}x{initial_height}+{self.tkinter_x}+{self.tkinter_y}")
    self.root.minsize(self.original_window_width, config.TKINTER_MIN_HEIGHT)

    # --- Widgets: status bar, control panel, metadata panel ---
    self.statusbar = StatusBar(self.root)
    self.statusbar.pack(side=tk.BOTTOM, fill=tk.X)
    self.container_frame = ttk.Frame(self.root)
    self.container_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
    self.container_frame.columnconfigure(0, weight=1)
    self.container_frame.columnconfigure(1, weight=0)
    self.container_frame.rowconfigure(0, weight=1)
    self.control_panel = UIPanel(self.container_frame, self)
    self.control_panel.grid(row=0, column=0, sticky="nsew")
    # The metadata frame is built here but not placed in the grid by this
    # method.
    self.metadata_frame = ttk.Labelframe(self.container_frame, text="Raw SAR Metadata", padding=5)
    self.metadata_text_frame = ttk.Frame(self.metadata_frame)
    self.metadata_text_frame.pack(fill=tk.BOTH, expand=True)
    self.metadata_scrollbar = ttk.Scrollbar(self.metadata_text_frame, orient=tk.VERTICAL)
    self.metadata_display_text = tk.Text(
        self.metadata_text_frame, wrap=tk.NONE, state="disabled", height=8,
        yscrollcommand=self.metadata_scrollbar.set, font=("Courier New", 8)
    )
    self.metadata_scrollbar.config(command=self.metadata_display_text.yview)
    self.metadata_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
    self.metadata_display_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.set_metadata_display("Enable 'Show SAR Metadata' checkbox to view data...")

    # --- Display window positions ---
    self.mfd_x, self.mfd_y = self._calculate_mfd_position()
    self.sar_x, self.sar_y = self._calculate_sar_position(screen_w, initial_sar_w)
    map_max_w = (
        config.MAX_MAP_DISPLAY_WIDTH if not MapDisplayWindow
        else getattr(MapDisplayWindow, "MAX_DISPLAY_WIDTH", config.MAX_MAP_DISPLAY_WIDTH)
    )
    map_x, map_y = self._calculate_map_position(screen_w, initial_sar_w, map_max_w)
    self.display_manager = DisplayManager(
        self, self.sar_queue, self.mouse_queue, self.sar_x, self.sar_y, self.mfd_x,
        self.mfd_y, initial_sar_w, initial_sar_h
    )
    try:
        self.display_manager.initialize_display_windows()
    except Exception as e:
        self.set_status("Error: Display Init Failed")
        logging.critical(f"{log_prefix} Display init failed: {e}", exc_info=True)

    # --- Lookup tables and processing managers ---
    self.update_brightness_contrast_lut()
    self.update_mfd_lut()
    self.image_pipeline = ImagePipeline(self.state, self.sar_queue, self.mfd_queue, self)
    self.test_mode_manager = TestModeManager(self.state, self.root, self.sar_queue, self.mfd_queue, self)
    self.map_integration_manager = None
    if config.ENABLE_MAP_OVERLAY and MAP_MODULES_LOADED and MapIntegrationManager:
        try:
            self.map_integration_manager = MapIntegrationManager(self.state, self.tkinter_queue, self, map_x, map_y)
        except Exception:
            logging.exception(f"{log_prefix} MapIntegrationManager init failed:")
            self.map_integration_manager = None
            self.set_status("Error: Map Init Failed")
    self.image_recorder = None
    if ImageRecorder:
        try:
            self.image_recorder = ImageRecorder(self.state)
        except Exception:
            logging.exception(f"{log_prefix} ImageRecorder init failed:")
            self.image_recorder = None
    self._update_initial_ui_display()

    # --- Network setup using SfpTransport, or the local image loader ---
    self.local_ip = config.DEFAULT_SER_IP
    self.local_port = config.DEFAULT_SER_PORT
    if not config.USE_LOCAL_IMAGES:
        self._setup_network_receiver()
    else:
        self._start_initial_image_loader()

    # --- Start loops/timers ---
    self.process_sar_queue()
    self.process_mfd_queue()
    self.process_mouse_queue()
    self.process_tkinter_queue()
    self.schedule_periodic_updates()
    self.update_image_mode()
    logging.info(f"{log_prefix} Application initialization complete.")
    if GEOELEVATION_AVAILABLE:
        logging.info(f"{log_prefix} GeoElevation functionality is AVAILABLE.")
    else:
        logging.warning(f"{log_prefix} GeoElevation functionality is UNAVAILABLE.")
def _setup_network_receiver(self):
    """Create the payload processor and the SFP transport, then start the transport.

    On success the status bar shows the listening address. If the transport
    reports a failed start, ``self.sfp_transport`` is reset to None. If any
    step raises, both ``self.sfp_transport`` and ``self.payload_processor``
    are reset to None and the error is logged as critical.
    """
    log_prefix = "[App Init Network]"
    logging.info(f"{log_prefix} Setting up SFP transport layer and payload processor...")
    try:
        # Application-specific processor that receives the decoded payloads.
        processor = ImagePayloadProcessor(
            app=self,
            set_new_sar_image_callback=self.handle_new_sar_data,
            set_new_mfd_indices_image_callback=self.handle_new_mfd_data,
            image_recorder=self.image_recorder,
        )
        self.payload_processor = processor

        # Handlers handed to the transport, keyed by the code of 'S' and 'M'.
        handlers_by_code = {
            ord('S'): processor.process_sar_payload,
            ord('M'): processor.process_mfd_payload,
        }

        transport = SfpTransport(
            host=self.local_ip,
            port=self.local_port,
            payload_handlers=handlers_by_code,
        )
        self.sfp_transport = transport
        if not transport.start():
            self.set_status("Error: Transport Start Failed")
            self.sfp_transport = None
            return
        self.set_status(f"Listening UDP {self.local_ip}:{self.local_port}")
    except Exception as e:
        logging.critical(f"{log_prefix} Failed to initialize network components: {e}", exc_info=True)
        self.set_status("Error: Network Init Failed")
        self.sfp_transport = None
        self.payload_processor = None
def close_app(self):
"""Performs graceful shutdown of all components."""
if hasattr(self, "state") and self.state.shutting_down:
return
if not hasattr(self, "state"):
sys.exit(1)
logging.info("[App Shutdown] Starting shutdown sequence...")
self.state.shutting_down = True
try:
self.set_status("Closing...")
except Exception: pass
if self.sfp_transport:
self.sfp_transport.shutdown()
self.sfp_transport = None
if hasattr(self, "test_mode_manager"): self.test_mode_manager.stop_timers()
if hasattr(self, "map_integration_manager") and self.map_integration_manager: self.map_integration_manager.shutdown()
if hasattr(self, "image_recorder") and self.image_recorder: self.image_recorder.shutdown()
if hasattr(self, "display_manager"): self.display_manager.destroy_windows()
try: cv2.waitKey(5)
except Exception: pass
try:
if self.root and self.root.winfo_exists():
logging.debug("[App Shutdown] Destroying Tkinter root window...")
self.root.destroy()
except Exception as e:
logging.exception(f"[App Shutdown] Error destroying Tkinter window: {e}")
logging.info("[App Shutdown] Application close sequence finished.")
sys.exit(0)
def set_status(self, message: str):
log_prefix = "[App Set Status]"
if not hasattr(self, "state") or self.state.shutting_down: return
new_status_prefix = f"Status: {message}"
def _update():
if not hasattr(self, "state") or self.state.shutting_down: return
try:
statusbar = getattr(self, "statusbar", None)
if statusbar and isinstance(statusbar, tk.Widget) and statusbar.winfo_exists():
current_text: str = statusbar.cget("text")
parts = current_text.split("|", 1)
suffix = f" | {parts[1].strip()}" if len(parts) > 1 else ""
final_text = f"{new_status_prefix}{suffix}"
if hasattr(statusbar, "set_status_text"): statusbar.set_status_text(final_text)
else: statusbar.config(text=final_text)
except Exception as e: logging.exception(f"{log_prefix} Error updating status bar text: {e}")
try:
if hasattr(self, "root") and self.root and self.root.winfo_exists():
self.root.after_idle(_update)
except Exception as e:
logging.warning(f"{log_prefix} Error scheduling status update: {e}")
def request_elevation_for_point(self, source_description: str, latitude: float, longitude: float):
    """Start a background elevation lookup for one coordinate.

    The elevation display for ``source_description`` is updated right away
    with a placeholder, or with a reason when the request is not started.
    A request whose worker thread is still alive is not started again.

    Args:
        source_description: Identifier of the requesting source; also
            selects which elevation field is updated.
        latitude: Latitude passed to the elevation function.
        longitude: Longitude passed to the elevation function.
    """
    _log_prefix_req = "[App Elev Req]"
    if not GEOELEVATION_AVAILABLE or get_elevation_function is None:
        self._update_elevation_display(source_description, "GeoElevation N/A")
        return
    coordinates_finite = math.isfinite(latitude) and math.isfinite(longitude)
    if not coordinates_finite:
        self._update_elevation_display(source_description, "Invalid Coords")
        return

    request_id = f"{source_description}_{latitude:.6f}_{longitude:.6f}"
    pending = self.active_elevation_requests
    if request_id in pending and pending[request_id].is_alive():
        return  # identical request still running

    logging.info(f"{_log_prefix_req} Requesting elevation for {source_description} at ({latitude:.4f}, {longitude:.4f}). Request ID: {request_id}")
    self._update_elevation_display(source_description, "Elevation: Requesting...")
    worker = threading.Thread(
        target=self._get_elevation_worker,
        args=(request_id, source_description, latitude, longitude, get_elevation_function),
        name=f"ElevationWorker-{request_id}",
        daemon=True,
    )
    pending[request_id] = worker
    worker.start()
def _get_elevation_worker(self, request_id: str, source_description: str, latitude: float, longitude: float, elevation_func: Callable):
_log_prefix_worker = f"[App Elev Work] ({request_id})"
result_payload: Dict[str, Any] = {"request_id": request_id, "source_description": source_description, "latitude": latitude, "longitude": longitude, "elevation_value": None, "display_text": "Elev: Error", "error": None}
try:
elevation = elevation_func(latitude, longitude, show_progress_dialog=False)
if elevation is None:
result_payload["display_text"] = "Elev: N/A"
result_payload["elevation_value"] = None
elif elevation != elevation:
result_payload["display_text"] = "Elev: NoData"
result_payload["elevation_value"] = float('nan')
else:
numeric_elevation = float(elevation)
result_payload["elevation_value"] = numeric_elevation
result_payload["display_text"] = f"{numeric_elevation:.1f} m"
except RuntimeError as e_rt:
result_payload["display_text"] = "Elev: Err(RT)"
result_payload["error"] = str(e_rt)
except Exception as e_generic:
result_payload["display_text"] = "Elev: Err(Sys)"
result_payload["error"] = str(e_generic)
finally:
if hasattr(self, 'tkinter_queue') and self.tkinter_queue is not None:
put_queue(self.tkinter_queue, ("ELEVATION_RESULT", result_payload), "tkinter", self)
if request_id in self.active_elevation_requests:
del self.active_elevation_requests[request_id]
def _update_elevation_display(self, source_description: str, display_text_value: str):
control_panel_ui = getattr(self, "control_panel", None)
if not control_panel_ui: return
try:
if source_description == config.ELEVATION_SOURCE_SAR_CENTER:
if hasattr(control_panel_ui, 'set_sar_center_elevation'): control_panel_ui.set_sar_center_elevation(display_text_value)
elif source_description == config.ELEVATION_SOURCE_SAR_MOUSE:
if hasattr(control_panel_ui, 'set_sar_mouse_elevation'): control_panel_ui.set_sar_mouse_elevation(display_text_value)
elif source_description == config.ELEVATION_SOURCE_MAP_MOUSE:
if hasattr(control_panel_ui, 'set_map_mouse_elevation'): control_panel_ui.set_map_mouse_elevation(display_text_value)
except Exception: logging.exception(f"[App Elev Disp] Error updating elevation display for '{source_description}':")
def update_brightness_contrast_lut(self):
if not hasattr(self, "state"): return
try:
contrast = max(0.01, self.state.sar_contrast)
brightness = self.state.sar_brightness
lut_values = np.arange(256, dtype=np.float32)
adjusted = (lut_values * contrast) + brightness
self.state.brightness_contrast_lut = np.clip(np.round(adjusted), 0, 255).astype(np.uint8)
except Exception as e:
logging.exception(f"[App Update SAR LUT] Error calculating SAR B/C LUT: {e}")
self.state.brightness_contrast_lut = np.arange(256, dtype=np.uint8)
def update_mfd_lut(self):
if not hasattr(self, "state"): return
try:
params = self.state.mfd_params
raw_map_factor = params["raw_map_intensity"] / 255.0
pixel_map = params["pixel_to_category"]
categories = params["categories"]
new_lut = np.zeros((256, 3), dtype=np.uint8)
for idx in range(256):
cat_name = pixel_map.get(idx)
if cat_name:
cat_data = categories[cat_name]
bgr = cat_data["color"]
intensity_factor = cat_data["intensity"] / 255.0
new_lut[idx, 0] = np.clip(int(round(float(bgr[0]) * intensity_factor)), 0, 255)
new_lut[idx, 1] = np.clip(int(round(float(bgr[1]) * intensity_factor)), 0, 255)
new_lut[idx, 2] = np.clip(int(round(float(bgr[2]) * intensity_factor)), 0, 255)
elif 32 <= idx <= 255:
raw_intensity = (float(idx) - 32.0) * (255.0 / 223.0)
final_gray = int(round(np.clip(raw_intensity * raw_map_factor, 0, 255)))
new_lut[idx, :] = final_gray
self.state.mfd_lut = new_lut
except Exception as e:
logging.critical(f"[MFD LUT Update] CRITICAL MFD LUT error:", exc_info=True)
self._apply_fallback_mfd_lut()
def _apply_fallback_mfd_lut(self):
if hasattr(self, "state"):
try:
gray_ramp = np.arange(256, dtype=np.uint8)[:, np.newaxis]
self.state.mfd_lut = cv2.cvtColor(gray_ramp, cv2.COLOR_GRAY2BGR)[:, 0, :]
except Exception as fb_e:
logging.critical(f"[MFD LUT Update] Fallback LUT failed: {fb_e}")
self.state.mfd_lut = np.zeros((256, 3), dtype=np.uint8)
def update_image_mode(self):
log_prefix = "[App Mode Switch]"
if not hasattr(self, "state") or not hasattr(self, "test_mode_manager") or self.state.shutting_down: return
try:
cp = getattr(self, "control_panel", None)
var = getattr(cp, "test_image_var", None) if cp else None
is_test_req = var.get() == 1 if (var and isinstance(var, tk.Variable)) else self.state.test_mode_active
if is_test_req != self.state.test_mode_active:
self.state.test_mode_active = is_test_req
if is_test_req:
if self.test_mode_manager.activate(): self.activate_test_mode_ui_actions()
else: self._revert_test_mode_ui()
else:
self.test_mode_manager.deactivate()
self.deactivate_test_mode_ui_actions()
self.state.reset_statistics()
self.update_status()
except Exception as e:
logging.exception(f"{log_prefix} Error changing image mode: {e}")
def update_sar_size(self, event=None):
log_prefix = "[App CB SAR Size]"
if not hasattr(self, "state") or self.state.shutting_down: return
try:
cp = getattr(self, "control_panel", None)
combo = getattr(cp, "sar_size_combo", None) if cp else None
if not combo: return
size_str = combo.get()
factor = 1
if size_str and ":" in size_str:
try:
factor_val = int(size_str.split(":")[1])
factor = max(1, factor_val)
except (ValueError, IndexError): factor = 1
w = max(1, config.SAR_WIDTH // factor)
h = max(1, config.SAR_HEIGHT // factor)
if w != self.state.sar_display_width or h != self.state.sar_display_height:
self.state.update_sar_display_size(w, h)
self._trigger_sar_update()
except Exception as e:
logging.exception(f"{log_prefix} Error updating SAR size: {e}")
def update_contrast(self, value_str: str):
if self.state.shutting_down: return
try:
contrast = float(value_str)
self.state.update_sar_parameters(contrast=contrast)
self.update_brightness_contrast_lut()
self._trigger_sar_update()
except ValueError: logging.warning(f"[App CB SAR Contrast] Invalid contrast value: {value_str}")
except Exception as e: logging.exception(f"[App CB SAR Contrast] Error updating contrast: {e}")
def update_brightness(self, value_str: str):
if self.state.shutting_down: return
try:
brightness = int(float(value_str))
self.state.update_sar_parameters(brightness=brightness)
self.update_brightness_contrast_lut()
self._trigger_sar_update()
except ValueError: logging.warning(f"[App CB SAR Brightness] Invalid brightness value: {value_str}")
except Exception as e: logging.exception(f"[App CB SAR Brightness] Error updating brightness: {e}")
def update_sar_palette(self, event=None):
if self.state.shutting_down: return
try:
cp = getattr(self, "control_panel", None)
combo = getattr(cp, "palette_combo", None) if cp else None
if not combo: return
palette = combo.get()
if palette in config.COLOR_PALETTES:
self.state.update_sar_parameters(palette=palette)
self._trigger_sar_update()
else:
logging.warning(f"[App CB SAR Palette] Invalid palette selected: {palette}. Reverting.")
combo.set(self.state.sar_palette)
except Exception as e:
logging.exception(f"[App CB SAR Palette] Error updating palette: {e}")
def update_mfd_category_intensity(self, category_name: str, intensity_value: int):
if self.state.shutting_down: return
try:
if category_name in self.state.mfd_params["categories"]:
clamped_value = np.clip(intensity_value, 0, 255)
self.state.mfd_params["categories"][category_name]["intensity"] = clamped_value
self.update_mfd_lut()
self._trigger_mfd_update()
else:
logging.warning(f"[App CB MFD Intensity] Unknown category: {category_name}")
except Exception as e:
logging.exception(f"[App CB MFD Intensity] Error updating intensity for '{category_name}': {e}")
def choose_mfd_category_color(self, category_name: str):
from tkinter import colorchooser
if self.state.shutting_down: return
if category_name not in self.state.mfd_params["categories"]:
logging.warning(f"[App CB MFD Color] Unknown category: {category_name}")
return
try:
initial_bgr = self.state.mfd_params["categories"][category_name]["color"]
initial_hex = f"#{initial_bgr[2]:02x}{initial_bgr[1]:02x}{initial_bgr[0]:02x}"
color_code = colorchooser.askcolor(title=f"Select Color for {category_name}", initialcolor=initial_hex)
if color_code and color_code[0]:
rgb = color_code[0]
new_bgr = tuple(np.clip(int(c), 0, 255) for c in (rgb[2], rgb[1], rgb[0]))
self.state.mfd_params["categories"][category_name]["color"] = new_bgr
self.update_mfd_lut()
cp = getattr(self, "control_panel", None)
if self.root and self.root.winfo_exists() and cp and hasattr(cp, "update_mfd_color_display"):
self.root.after_idle(cp.update_mfd_color_display, category_name, new_bgr)
self._trigger_mfd_update()
except Exception as e:
logging.exception(f"[App CB MFD Color] Error choosing color for '{category_name}': {e}")
def update_mfd_raw_map_intensity(self, intensity_value: int):
if self.state.shutting_down: return
try:
clamped_value = np.clip(intensity_value, 0, 255)
self.state.mfd_params["raw_map_intensity"] = clamped_value
self.update_mfd_lut()
self._trigger_mfd_update()
except Exception as e:
logging.exception(f"[App CB MFD RawMap] Error updating raw map intensity: {e}")
def update_map_size(self, event=None):
if not hasattr(self, "state") or self.state.shutting_down: return
try:
cp = getattr(self, "control_panel", None)
combo = getattr(cp, "map_size_combo", None) if cp else None
if not combo: return
self.state.update_map_scale_factor(combo.get())
self.trigger_map_redraw()
except Exception as e:
logging.exception(f"[App CB Map Size] Error updating map size: {e}")
def toggle_sar_overlay(self):
if not hasattr(self, "state") or self.state.shutting_down: return
try:
cp = getattr(self, "control_panel", None)
var = getattr(cp, "sar_overlay_var", None) if cp else None
if not var or not isinstance(var, tk.Variable): return
self.state.update_map_overlay_params(enabled=var.get())
self.trigger_map_redraw(full_update=False)
except Exception as e:
logging.exception(f"[App CB Overlay Toggle] Error toggling overlay: {e}")
def on_alpha_slider_release(self, event=None):
log_prefix = "[App CB SAR Alpha Release]"
if not hasattr(self, "state") or self.state.shutting_down: return
try:
cp = getattr(self, "control_panel", None)
var = getattr(cp, "sar_overlay_alpha_var", None) if cp else None
if not var or not isinstance(var, tk.Variable): return
final_alpha = var.get()
self.state.update_map_overlay_params(alpha=final_alpha)
self.trigger_map_redraw(full_update=False)
except Exception as e:
logging.exception(f"{log_prefix} Error handling alpha slider release: {e}")
def toggle_sar_recording(self):
if not hasattr(self, "state") or self.state.shutting_down: return
try:
cp = getattr(self, "control_panel", None)
var = getattr(cp, "record_sar_var", None) if cp else None
if not var or not isinstance(var, tk.Variable): return
self.state.update_sar_recording_enabled(enabled=var.get())
except Exception as e:
logging.exception(f"[App CB Record Toggle] Error toggling recording: {e}")
def apply_sar_overlay_shift(self):
log_prefix = "[App CB Apply Shift]"
if not hasattr(self, "state") or self.state.shutting_down: return
cp = getattr(self, "control_panel", None)
if not cp: return
lat_str, lon_str = cp.sar_lat_shift_var.get(), cp.sar_lon_shift_var.get()
try:
lat_shift, lon_shift = float(lat_str), float(lon_str)
if not (-90.0 <= lat_shift <= 90.0): raise ValueError("Latitude shift out of range (-90 to 90)")
if not (-180.0 <= lon_shift <= 180.0): raise ValueError("Longitude shift out of range (-180 to 180)")
self.state.update_sar_overlay_shift(lat_shift, lon_shift)
self.trigger_map_redraw(full_update=True)
self.set_status("Applied SAR overlay shift.")
except ValueError as ve:
self.set_status(f"Error: Invalid shift - {ve}")
except Exception as e:
self.set_status("Error applying shift.")
def save_current_map_view(self):
log_prefix = "[App CB Save Map]"
if not hasattr(self, "state") or self.state.shutting_down: return
mgr = getattr(self, "map_integration_manager", None)
if not mgr:
self.set_status("Error: Map components not ready.")
return
if self.state.last_composed_map_pil is None:
self.set_status("Error: No map view to save.")
return
try:
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
init_fn = f"map_view_{ts}.png"
file_path = fd.asksaveasfilename(
title="Save Map View As", initialfile=init_fn, defaultextension=".png",
filetypes=[("PNG files", "*.png"), ("All files", "*.*")]
)
if file_path:
mgr.save_map_view_to_file(directory=os.path.dirname(file_path), filename=Path(file_path).stem)
else:
self.set_status("Save map view cancelled.")
except Exception as e:
self.set_status("Error saving map view.")
def go_to_google_maps(self, coord_source: str):
log_prefix = "[App CB Go Gmaps]"
if not hasattr(self, "state") or self.state.shutting_down: return
cp = getattr(self, "control_panel", None)
if not cp: return
coords_text, source_desc, lat_deg, lon_deg = None, "Unknown", None, None
try:
if coord_source == "sar_center":
coords_text, source_desc = cp.sar_center_coords_var.get(), "SAR Center"
elif coord_source == "sar_mouse":
coords_text, source_desc = cp.mouse_coords_var.get(), "SAR Mouse"
elif coord_source == "map_mouse":
coords_text, source_desc = cp.map_mouse_coords_var.get(), "Map Mouse"
else: return
if not coords_text or any(s in coords_text for s in ["N/A", "Error", "Invalid"]):
self.set_status(f"Error: No valid coordinates for {source_desc}.")
return
lon_sep = ", Lon="
if lon_sep in coords_text:
parts = coords_text.split(lon_sep, 1)
lat_dms_str = parts[0].split("=", 1)[1].strip() if "Lat=" in parts[0] else None
lon_dms_str = parts[1].strip()
if lat_dms_str and lon_dms_str:
lat_deg = dms_string_to_decimal(lat_dms_str, is_latitude=True)
lon_deg = dms_string_to_decimal(lon_dms_str, is_latitude=False)
else: raise ValueError("Could not extract Lat/Lon parts")
else: raise ValueError("Separator ', Lon=' not found")
if lat_deg is None or lon_deg is None:
self.set_status(f"Error: Cannot parse coordinates for {source_desc}.")
return
open_google_maps(lat_deg, lon_deg)
except ValueError as ve:
self.set_status(f"Error parsing coords for {source_desc}.")
except Exception:
self.set_status(f"Error opening map for {source_desc}.")
def go_to_google_earth(self, coord_source: str):
    """Launch Google Earth on a placemark at the coordinates of *coord_source*.

    Args:
        coord_source: ``"sar_center"``, ``"sar_mouse"`` or ``"map_mouse"``;
            any other value is ignored.

    The coordinate text (``"Lat=<dms>, Lon=<dms>"``) is read from the matching
    control-panel variable, converted to decimal degrees, written to a KML
    file and handed to Google Earth.  Failures are reported on the status bar
    and logged; nothing is raised to the caller.
    """
    log_prefix = "[App CB Go GEarth]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    if not _simplekml_available:
        self.set_status("Error: KML library missing.")
        return
    control_panel_ref = getattr(self, "control_panel", None)
    if not control_panel_ref:
        return
    # coord_source -> (control-panel variable name, description, placemark name)
    sources = {
        "sar_center": ("sar_center_coords_var", "SAR Center", "SAR Center"),
        "sar_mouse": ("mouse_coords_var", "SAR Mouse", "Mouse on SAR"),
        "map_mouse": ("map_mouse_coords_var", "Map Mouse", "Mouse on Map"),
    }
    source_desc = "Unknown"
    try:
        if coord_source not in sources:
            return
        var_name, description, placemark_name = sources[coord_source]
        coords_text = getattr(control_panel_ref, var_name).get()
        # Only name the source once its text was read, as before.
        source_desc = description
        if not coords_text or any(s in coords_text for s in ("N/A", "Error", "Invalid")):
            self.set_status(f"Error: No valid coordinates for {source_desc}.")
            return
        lon_sep = ", Lon="
        if lon_sep not in coords_text:
            raise ValueError("Separator ', Lon=' not found")
        lat_part, lon_part = coords_text.split(lon_sep, 1)
        lat_dms_str = lat_part.split("=", 1)[1].strip() if "Lat=" in lat_part else None
        lon_dms_str = lon_part.strip()
        if not (lat_dms_str and lon_dms_str):
            raise ValueError("Could not split Lat/Lon parts")
        lat_deg = dms_string_to_decimal(lat_dms_str, is_latitude=True)
        lon_deg = dms_string_to_decimal(lon_dms_str, is_latitude=False)
        if lat_deg is None or lon_deg is None:
            self.set_status(f"Error: Cannot parse coordinates for {source_desc}.")
            return
        temp_kml_path = generate_lookat_and_point_kml(
            latitude_deg=lat_deg,
            longitude_deg=lon_deg,
            placemark_name=placemark_name,
            placemark_desc=f"Source: {source_desc}\nCoords: {coords_text}",
        )
        if temp_kml_path:
            launch_google_earth(temp_kml_path)
        else:
            logging.error(f"{log_prefix} KML generation returned no file for {source_desc}.")
            self.set_status("Error: Failed to create KML.")
    except ValueError as ve:
        logging.error(f"{log_prefix} Cannot parse coordinates for {source_desc}: {ve}")
        self.set_status(f"Error parsing coords for {source_desc}.")
    except Exception:
        logging.exception(f"{log_prefix} Failed to launch Google Earth for {source_desc}.")
        self.set_status(f"Error launching GE for {source_desc}.")
def go_to_all_gearth(self):
    """Launch Google Earth with a composite KML: SAR image plus known points.

    Collects every parseable coordinate shown on the control panel (SAR
    centre, SAR mouse, map mouse), renders the current SAR image through the
    brightness/contrast LUT and palette held in the application state, and
    passes both to the composite KML generator.  A point whose text cannot be
    parsed is skipped (and logged) rather than aborting the whole operation.
    Failures are reported on the status bar and logged; nothing is raised.
    """
    log_prefix = "[App CB GE All]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    if not _simplekml_available or not _pyproj_available:
        self.set_status("Error: KML library missing.")
        return
    control_panel_ref = getattr(self, "control_panel", None)
    if not control_panel_ref:
        return

    points_to_plot: List[Tuple[float, float, str, Optional[str]]] = []
    source_map = {
        "SAR Center": ("SAR Center", control_panel_ref.sar_center_coords_var),
        "SAR Mouse": ("Mouse on SAR", control_panel_ref.mouse_coords_var),
        "Map Mouse": ("Mouse on Map", control_panel_ref.map_mouse_coords_var),
    }
    lon_sep = ", Lon="
    for internal_name, (kml_name, tk_var) in source_map.items():
        coords_text = tk_var.get()
        if not coords_text or any(s in coords_text for s in ("N/A", "Error", "Invalid")):
            continue
        try:
            if lon_sep not in coords_text:
                raise ValueError("Separator ', Lon=' not found")
            lat_part, lon_part = coords_text.split(lon_sep, 1)
            lat_dms_str = lat_part.split("=", 1)[1].strip() if "Lat=" in lat_part else None
            lon_dms_str = lon_part.strip()
            if not (lat_dms_str and lon_dms_str):
                raise ValueError("Could not split Lat/Lon parts")
            lat_deg = dms_string_to_decimal(lat_dms_str, is_latitude=True)
            lon_deg = dms_string_to_decimal(lon_dms_str, is_latitude=False)
            if lat_deg is not None and lon_deg is not None:
                points_to_plot.append(
                    (lat_deg, lon_deg, kml_name, f"Source: {internal_name}\nCoords: {coords_text}")
                )
        except Exception:
            # Best effort: one unparseable point must not block the others.
            logging.warning(
                f"{log_prefix} Skipping point '{internal_name}': cannot parse '{coords_text}'.",
                exc_info=True,
            )

    sar_normalized_uint8 = self.state.current_sar_normalized
    geo_info = self.state.current_sar_geo_info
    if sar_normalized_uint8 is None or sar_normalized_uint8.size == 0:
        self.set_status("Error: SAR image data missing.")
        return
    if not geo_info or not geo_info.get("valid", False):
        self.set_status("Error: SAR GeoInfo missing.")
        return
    bc_lut, palette = self.state.brightness_contrast_lut, self.state.sar_palette
    if bc_lut is None:
        self.set_status("Error: SAR LUT missing.")
        return

    try:
        img_for_kml = cv2.LUT(sar_normalized_uint8.copy(), bc_lut)
        if palette != "GRAY":
            img_for_kml = apply_color_palette(img_for_kml, palette)
        elif img_for_kml.ndim == 2:
            img_for_kml = cv2.cvtColor(img_for_kml, cv2.COLOR_GRAY2BGR)
        if img_for_kml is None:
            raise ValueError("Image processing for KML resulted in None.")
    except Exception:
        logging.exception(f"{log_prefix} Error preparing SAR image for KML.")
        self.set_status("Error processing SAR image.")
        return

    try:
        composite_kml_path = generate_composite_kml(points_to_plot, img_for_kml, geo_info)
        if composite_kml_path:
            launch_google_earth(composite_kml_path)
            self.set_status("Launched Google Earth with composite view.")
            cleanup_kml_output_directory(config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES)
        else:
            logging.error(f"{log_prefix} Composite KML generation returned no file.")
            self.set_status("Error: Failed to create KML.")
    except Exception:
        logging.exception(f"{log_prefix} Error during composite KML generation/launch.")
        self.set_status("Error during GE All generation.")
def toggle_sar_metadata_display(self):
    """Show or hide the SAR metadata pane to match its checkbox.

    Reads the ``show_meta_var`` checkbox variable, stores the new flag on the
    application state, grids or un-grids the metadata frame and resizes the
    root window between its original and expanded widths.  Returns without
    touching anything when the checkbox already agrees with the stored flag.
    Errors are logged, never raised.
    """
    log_prefix = "[App CB MetaToggle]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    cp = getattr(self, "control_panel", None)
    var = getattr(self.control_panel, "show_meta_var", None) if hasattr(self, "control_panel") else None
    metadata_frame = getattr(self, "metadata_frame", None)
    container = getattr(self, "container_frame", None)
    if not (var and cp and metadata_frame and container):
        return
    try:
        show = bool(var.get())
        if show == self.state.display_sar_metadata:
            return
        self.state.display_sar_metadata = show
        height = self.root.winfo_height()
        if show:
            metadata_frame.grid(row=0, column=1, sticky="nsew", padx=(5, 5), pady=(0, 0))
            container.columnconfigure(1, weight=1)
            width = self.expanded_window_width
        else:
            metadata_frame.grid_remove()
            container.columnconfigure(1, weight=0)
            width = self.original_window_width
        self.root.geometry(f"{width}x{height}")
        self.root.minsize(width, config.TKINTER_MIN_HEIGHT)
        if show:
            # Re-show the last metadata received, or a waiting notice.
            text = self.state.last_sar_metadata_str or "<Waiting for SAR metadata...>"
        else:
            text = "Enable 'Show SAR Metadata' checkbox to view data..."
        self.set_metadata_display(text)
    except Exception as e:
        logging.exception(f"{log_prefix} Error toggling metadata display: {e}")
def _get_screen_dimensions(self) -> Tuple[int, int]:
    """Return ``(width, height)`` of the first monitor listed by ``screeninfo``.

    Falls back to ``(1920, 1080)`` when the monitor query fails for any reason.
    """
    try:
        monitors = screeninfo.get_monitors()
        first = monitors[0]
        dimensions = (first.width, first.height)
    except Exception:
        dimensions = (1920, 1080)
    return dimensions
def _calculate_initial_sar_size(self, desired_factor_if_map: int = 4) -> Tuple[int, int]:
    """Return the initial SAR display ``(width, height)``.

    Without the map overlay the size already held in the state is returned.
    With the overlay enabled and its modules loaded, the configured SAR size
    is integer-divided by ``desired_factor_if_map`` (clamped to at least 1)
    and the state is updated if that differs from its current size.
    """
    state = self.state
    width, height = state.sar_display_width, state.sar_display_height
    if config.ENABLE_MAP_OVERLAY and MAP_MODULES_LOADED:
        divisor = max(1, desired_factor_if_map)
        width = config.SAR_WIDTH // divisor
        height = config.SAR_HEIGHT // divisor
    if (width, height) != (state.sar_display_width, state.sar_display_height):
        state.update_sar_display_size(width, height)
    return width, height
def _calculate_tkinter_position(self, screen_h: int) -> Tuple[int, int]:
    """Return the ``(x, y)`` position for the Tkinter control window.

    The window sits 40 px below the configured initial MFD height; if its
    minimum height would run past ``screen_h`` it is moved up, never above
    y = 10.
    """
    left = 10
    top = config.INITIAL_MFD_HEIGHT + 40
    bottom_edge = top + config.TKINTER_MIN_HEIGHT
    if bottom_edge > screen_h:
        top = max(10, screen_h - config.TKINTER_MIN_HEIGHT - 10)
    return left, top
def _calculate_mfd_position(self) -> Tuple[int, int]:
    """Return the MFD window position: aligned with ``tkinter_x``, 10 px from the top."""
    top_margin = 10
    return (self.tkinter_x, top_margin)
def _calculate_sar_position(self, screen_w: int, initial_sar_w: int) -> Tuple[int, int]:
    """Return the SAR window position, placed 20 px right of the control window.

    If the SAR window would run past ``screen_w`` it is shifted left, never
    further than x = 10.
    """
    left = self.tkinter_x + self.original_window_width + 20
    overflows = left + initial_sar_w > screen_w
    if overflows:
        left = max(10, screen_w - initial_sar_w - 10)
    return (left, 10)
def _calculate_map_position(self, screen_w: int, current_sar_w: int, max_map_width: int) -> Tuple[int, int]:
    """Return the map window position, placed 20 px right of the SAR window.

    If a map of ``max_map_width`` would run past ``screen_w`` it is shifted
    left, never further than x = 10.
    """
    left = self.sar_x + current_sar_w + 20
    overflows = left + max_map_width > screen_w
    if overflows:
        left = max(10, screen_w - max_map_width - 10)
    return (left, 10)
def _start_initial_image_loader(self):
    """Start initial image loading, or schedule the initial display directly.

    With local images or test mode configured, ``load_initial_images`` runs on
    a daemon thread.  Otherwise the initial display is scheduled on the Tk
    idle queue, provided the root window still exists.
    """
    needs_loader = config.USE_LOCAL_IMAGES or config.ENABLE_TEST_MODE
    if needs_loader:
        loader = threading.Thread(
            target=self.load_initial_images,
            name="ImageLoaderThread",
            daemon=True,
        )
        loader.start()
        return
    if self.root and self.root.winfo_exists():
        self.root.after_idle(self._set_initial_display_from_loaded_data)
def _update_initial_ui_display(self):
    """Fill the control panel with its start-up values.

    Every geographic field is set to "N/A" and the drop/incomplete counters
    are rendered from the current state statistics.  Any error (Tcl errors
    included) is ignored.
    """
    panel = getattr(self, "control_panel", None)
    if not panel:
        return
    try:
        panel.set_sar_center_coords("N/A", "N/A")
        panel.set_sar_orientation("N/A")
        panel.set_sar_size_km("N/A")
        panel.set_mouse_coordinates("N/A", "N/A")
        panel.set_map_mouse_coordinates("N/A", "N/A")

        stats = self.state.get_statistics()
        q_sar = stats['dropped_sar_q']
        q_mfd = stats['dropped_mfd_q']
        q_tk = stats['dropped_tk_q']
        q_mouse = stats['dropped_mouse_q']
        drop_txt = f"Drop(Q): S={q_sar},M={q_mfd},Tk={q_tk},Mo={q_mouse}"
        rx_sar = stats['incomplete_sar_rx']
        rx_mfd = stats['incomplete_mfd_rx']
        incmpl_txt = f"Incmpl(RX): S={rx_sar},M={rx_mfd}"
        panel.set_statistics_display(drop_txt, incmpl_txt)
    except Exception:
        # Also covers tk.TclError, which is an Exception subclass.
        pass
def load_initial_images(self):
    """Prepare test and/or local images, then schedule the initial display.

    Status updates and the final display step are posted to the Tk idle queue
    rather than called directly.  On any failure an "Error Loading Images"
    status is posted instead.
    """
    if self.state.shutting_down:
        return

    def post_to_ui(callback, *args):
        # Only schedule while the root window still exists.
        if self.root and self.root.winfo_exists():
            self.root.after_idle(callback, *args)

    post_to_ui(self.set_status, "Loading initial images...")
    try:
        test_requested = config.ENABLE_TEST_MODE or self.state.test_mode_active
        if test_requested and hasattr(self, "test_mode_manager") and self.test_mode_manager:
            self.test_mode_manager._ensure_test_images()
        if config.USE_LOCAL_IMAGES:
            self._load_local_mfd_image()
            self._load_local_sar_image()
        post_to_ui(self._set_initial_display_from_loaded_data)
    except Exception:
        post_to_ui(self.set_status, "Error Loading Images")
def _load_local_mfd_image(self):
    """Store the local MFD index image in the application state.

    NOTE(review): the configured ``MFD_IMAGE_PATH`` is looked up but nothing
    is read from it; the state always receives a random uint8 index array of
    the configured MFD size.  Confirm whether real loading is still pending.
    """
    mfd_shape = (config.MFD_HEIGHT, config.MFD_WIDTH)
    placeholder = np.random.randint(0, 256, mfd_shape, dtype=np.uint8)
    try:
        mfd_path = getattr(config, "MFD_IMAGE_PATH", "local_mfd_indices.png")
        self.state.local_mfd_image_data_indices = placeholder
    except Exception:
        self.state.local_mfd_image_data_indices = placeholder
def _load_local_sar_image(self):
    """Load the local raw SAR image into the application state.

    The image is read from ``config.SAR_IMAGE_PATH`` (default
    ``"local_sar_image.tif"``).  If loading fails or yields no data, an
    all-zero array of the configured SAR size and dtype is stored instead.
    """
    blank = np.zeros((config.SAR_HEIGHT, config.SAR_WIDTH), dtype=config.SAR_DATA_TYPE)
    try:
        sar_path = getattr(config, "SAR_IMAGE_PATH", "local_sar_image.tif")
        loaded = load_image(sar_path, config.SAR_DATA_TYPE)
        usable = loaded is not None and loaded.size > 0
        chosen = loaded if usable else blank
    except Exception:
        chosen = blank
    self.state.local_sar_image_data_raw = chosen
def _set_initial_display_from_loaded_data(self):
    """Show the first images for the active mode and set the ready status.

    Local mode pushes the locally loaded MFD/SAR data through the pipeline,
    network mode shows placeholders, and test mode displays nothing here.
    The status line is left alone while the map manager's initial display
    thread is still alive.
    """
    if self.state.shutting_down:
        return
    is_test = self.state.test_mode_active
    is_local = config.USE_LOCAL_IMAGES

    if not is_test:
        if is_local:
            local_mfd = self.state.local_mfd_image_data_indices
            if local_mfd is not None and hasattr(self, "image_pipeline") and self.image_pipeline:
                self.state.current_mfd_indices = local_mfd.copy()
                self.image_pipeline.process_mfd_for_display()
            # A missing local SAR image is passed through as None.
            self.set_initial_sar_image(self.state.local_sar_image_data_raw)
        else:
            self._show_network_placeholders()

    map_loading = False
    mgr = getattr(self, "map_integration_manager", None)
    if mgr:
        loader = getattr(mgr, "_map_initial_display_thread", None)
        if loader and isinstance(loader, threading.Thread):
            map_loading = loader.is_alive()
    if map_loading:
        return

    if is_test:
        status = "Ready (Test Mode)"
    elif is_local:
        status = "Ready (Local Mode)"
    elif hasattr(self, 'sfp_transport') and self.sfp_transport:
        status = f"Listening UDP {self.local_ip}:{self.local_port}"
    else:
        status = "Error: No Transport"
    self.set_status(status)
def set_initial_sar_image(self, raw_image_data: Optional[np.ndarray]):
    """Install *raw_image_data* as the current SAR image and redisplay it.

    The data is normalized to uint8.  With no usable data the current
    normalized image is zero-filled (and created at the configured SAR size
    if absent).  The geo information is then reset and the UI geo fields
    cleared before the pipeline renders the image.

    Args:
        raw_image_data: Raw SAR array, or ``None`` / empty for a blank image.
    """
    if self.state.shutting_down:
        return
    has_data = raw_image_data is not None and raw_image_data.size > 0
    normalized = normalize_image(raw_image_data, target_type=np.uint8) if has_data else None
    if normalized is not None:
        self.state.current_sar_normalized = normalized
    else:
        if self.state.current_sar_normalized is None:
            self.state.current_sar_normalized = np.zeros(
                (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
            )
        self.state.current_sar_normalized.fill(0)
    self.state.current_sar_geo_info = self.state._initialize_geo_info()
    self._reset_ui_geo_info()
    if hasattr(self, "image_pipeline") and self.image_pipeline:
        self.image_pipeline.process_sar_for_display()
def activate_test_mode_ui_actions(self):
    """Prepare the UI for test mode: clear geo fields and flush both image queues."""
    self.set_status("Activating Test Mode...")
    self._reset_ui_geo_info()
    for pending_queue in (self.mfd_queue, self.sar_queue):
        clear_queue(pending_queue)
    self.set_status("Ready (Test Mode)")
def deactivate_test_mode_ui_actions(self):
    """Return the UI to normal mode after test mode is switched off.

    Both image queues are flushed and the geo fields cleared.  Local mode
    then redisplays the locally loaded MFD/SAR images; network mode shows
    placeholders and reports the listening address, or a missing transport.
    """
    self.set_status("Activating Normal Mode...")
    for pending_queue in (self.mfd_queue, self.sar_queue):
        clear_queue(pending_queue)
    self._reset_ui_geo_info()

    if not config.USE_LOCAL_IMAGES:
        self._show_network_placeholders()
        if hasattr(self, 'sfp_transport') and self.sfp_transport:
            status = f"Listening UDP {self.local_ip}:{self.local_port}"
        else:
            status = "Error: No Transport"
        self.set_status(status)
        return

    local_mfd = self.state.local_mfd_image_data_indices
    if local_mfd is not None:
        self.state.current_mfd_indices = local_mfd.copy()
        if hasattr(self, "image_pipeline"):
            self.image_pipeline.process_mfd_for_display()
    # A missing local SAR image is passed through as None.
    self.set_initial_sar_image(self.state.local_sar_image_data_raw)
    self.set_status("Ready (Local Mode)")
def _reset_ui_geo_info(self):
    """Schedule every geographic control-panel field to be reset to "N/A".

    The updates are queued with ``after_idle``; nothing is scheduled when the
    root window or the control panel is missing.
    """
    panel = getattr(self, "control_panel", None)
    if not (self.root and self.root.winfo_exists() and panel):
        return
    resets = (
        ("set_sar_orientation", ("N/A",)),
        ("set_mouse_coordinates", ("N/A", "N/A")),
        ("set_map_mouse_coordinates", ("N/A", "N/A")),
        ("set_sar_center_coords", ("N/A", "N/A")),
        ("set_sar_size_km", ("N/A",)),
    )
    for setter_name, setter_args in resets:
        # Defaults bind the loop values now; the setter is looked up on the
        # panel only when the callback actually runs.
        self.root.after_idle(lambda n=setter_name, a=setter_args: getattr(panel, n)(*a))
def _revert_test_mode_ui(self):
    """Untick the test-mode checkbox (if the UI exists) and clear the state flag."""
    panel = getattr(self, "control_panel", None)
    checkbox_var = None
    if panel:
        checkbox_var = getattr(panel, "test_image_var", None)
    if self.root and self.root.winfo_exists() and checkbox_var:
        try:
            self.root.after_idle(checkbox_var.set, 0)
        except Exception:
            pass
    if hasattr(self, "state"):
        self.state.test_mode_active = False
def _show_network_placeholders(self):
    """Queue flat grey placeholder images for the MFD and SAR displays.

    The MFD placeholder uses the configured initial MFD size (value 30), the
    SAR placeholder the current SAR display size (value 60).  Errors are
    ignored.
    """
    try:
        mfd_shape = (config.INITIAL_MFD_HEIGHT, config.INITIAL_MFD_WIDTH, 3)
        sar_shape = (self.state.sar_display_height, self.state.sar_display_width, 3)
        mfd_placeholder = np.full(mfd_shape, 30, dtype=np.uint8)
        sar_placeholder = np.full(sar_shape, 60, dtype=np.uint8)
        deliveries = (
            (self.mfd_queue, mfd_placeholder, "mfd"),
            (self.sar_queue, sar_placeholder, "sar"),
        )
        for target_queue, image, label in deliveries:
            put_queue(target_queue, image, label, self)
    except Exception:
        pass
def trigger_map_redraw(self, full_update: bool = False):
    """Request a redraw of the map overlay.

    Args:
        full_update: When true, always rebuild the map from the current SAR
            data.  Otherwise a cheaper "REDRAW_MAP" recomposition is queued
            if the last processed SAR image, warp matrix and map image are
            all cached; if any is missing a full rebuild is done instead.
    """
    if self.state.shutting_down:
        return
    mgr = getattr(self, "map_integration_manager", None)
    if not (config.ENABLE_MAP_OVERLAY and mgr):
        return
    state = self.state
    if not full_update and (
        state.last_processed_sar_for_overlay is not None
        and state.last_sar_warp_matrix is not None
        and state.last_map_image_pil is not None
    ):
        put_queue(self.tkinter_queue, ("REDRAW_MAP", None), "tkinter", self)
    else:
        self._trigger_map_update_from_sar()
def handle_new_sar_data(self, normalized_image_uint8: np.ndarray, geo_info_radians: Dict[str, Any]):
    """Store newly received SAR data and schedule the UI-side update.

    Args:
        normalized_image_uint8: SAR image handed to ``state.set_sar_data``.
        geo_info_radians: Geo information dictionary stored with the image.
    """
    if self.state.shutting_down:
        return
    self.state.set_sar_data(normalized_image_uint8, geo_info_radians)
    root = self.root
    if root and root.winfo_exists():
        root.after_idle(self._process_sar_update_on_main_thread)
def handle_new_mfd_data(self, image_indices: np.ndarray):
    """Store newly received MFD indices and schedule the UI-side update.

    Args:
        image_indices: MFD index image handed to ``state.set_mfd_indices``.
    """
    if self.state.shutting_down:
        return
    self.state.set_mfd_indices(image_indices)
    root = self.root
    if root and root.winfo_exists():
        root.after_idle(self._process_mfd_update_on_main_thread)
def _process_sar_update_on_main_thread(self):
    """Run the UI-side steps that follow the arrival of new SAR data.

    Refreshes the SAR labels, re-renders the SAR image, updates the map
    overlay, triggers KML generation when the geo info is valid and KML
    generation is enabled, and finally updates the SAR frame-rate statistics.
    """
    if self.state.shutting_down:
        return
    self._update_sar_ui_labels()
    pipeline = getattr(self, "image_pipeline", None)
    if pipeline:
        pipeline.process_sar_for_display()
    self._trigger_map_update_from_sar()
    geo_info = self.state.current_sar_geo_info
    wants_kml = geo_info and geo_info.get("valid") and config.ENABLE_KML_GENERATION
    if wants_kml:
        self._handle_kml_generation(geo_info)
    self._update_fps_stats("sar")
def _handle_kml_generation(self, geo_info):
    """Write an automatic SAR-footprint KML file for *geo_info*.

    Does nothing unless both automatic and general KML generation are enabled
    and the KML/projection libraries are available.  After a successful write
    the output directory is pruned to ``config.MAX_KML_FILES`` and, if
    configured, Google Earth is launched on the new file.  Failures are
    logged and never raised.

    Args:
        geo_info: SAR geo information dictionary passed to the KML generator.
    """
    log_prefix = "[App KML Gen]"
    if not config.ENABLE_AUTO_SAR_KML_GENERATION:
        return
    if not config.ENABLE_KML_GENERATION:
        return
    if not _simplekml_available or not _pyproj_available:
        return
    try:
        kml_dir = config.KML_OUTPUT_DIRECTORY
        os.makedirs(kml_dir, exist_ok=True)
        # Millisecond-resolution timestamp: drop the last three microsecond digits.
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
        fp = os.path.join(kml_dir, f"sar_footprint_{ts}.kml")
        if generate_sar_kml(geo_info, fp):
            cleanup_kml_output_directory(config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES)
            if config.AUTO_LAUNCH_GOOGLE_EARTH:
                launch_google_earth(fp)
        else:
            logging.warning(f"{log_prefix} SAR footprint KML was not generated: {fp}")
    except Exception:
        # Best effort: this must never interrupt the SAR update, but leave a trace.
        logging.exception(f"{log_prefix} Error during automatic SAR KML generation.")
def _process_mfd_update_on_main_thread(self):
    """Re-render the MFD image after new data arrived and update its FPS statistics."""
    if self.state.shutting_down:
        return
    pipeline = getattr(self, "image_pipeline", None)
    if pipeline:
        pipeline.process_mfd_for_display()
    self._update_fps_stats("mfd")
# Refresh the control panel's SAR centre, orientation and size labels from
# state.current_sar_geo_info, then request (or clear) the elevation readout
# for the SAR centre. With invalid geo info the mouse-coordinate fields and
# their elevation readouts are reset as well.
# NOTE(review): this file's indentation has been lost, so the nesting of the
# last two _update_elevation_display calls (inside or after the
# `if cp and cp.winfo_exists()` check) cannot be read off the text — confirm
# against the original before re-indenting.
def _update_sar_ui_labels(self):
cp = getattr(self, "control_panel", None)
geo = self.state.current_sar_geo_info
# Defaults shown when no valid geo information is available.
lat_s, lon_s, orient_s, size_s = "N/A", "N/A", "N/A", "N/A"
is_valid_geo_for_ui = geo and geo.get("valid")
lat_deg_for_elevation, lon_deg_for_elevation = None, None
if is_valid_geo_for_ui:
try:
# lat/lon/orientation are converted with math.degrees before display.
lat_d, lon_d = math.degrees(geo["lat"]), math.degrees(geo["lon"])
lat_deg_for_elevation, lon_deg_for_elevation = lat_d, lon_d
orient_d = math.degrees(geo.get("orientation", 0.0))
lat_s, lon_s = decimal_to_dms(lat_d, True), decimal_to_dms(lon_d, False)
orient_s = f"{orient_d:.2f}°"
scale_x, width_px, scale_y, height_px = geo.get("scale_x", 0.0), geo.get("width_px", 0), geo.get("scale_y", 0.0), geo.get("height_px", 0)
# Size is only shown when all four values are positive.
if scale_x > 0 and width_px > 0 and scale_y > 0 and height_px > 0:
# scale * pixels / 1000 -> km; presumably scale is metres per pixel — TODO confirm.
size_w_km, size_h_km = (scale_x * width_px) / 1000.0, (scale_y * height_px) / 1000.0
size_s = f"W: {size_w_km:.1f} km, H: {size_h_km:.1f} km"
except Exception:
# Any conversion failure marks every label as "Error" and invalidates the geo info for the rest of this call.
lat_s, lon_s, orient_s, size_s = "Error", "Error", "Error", "Error"
is_valid_geo_for_ui, lat_deg_for_elevation, lon_deg_for_elevation = False, None, None
try:
if cp and cp.winfo_exists():
cp.set_sar_center_coords(lat_s, lon_s)
cp.set_sar_orientation(orient_s)
cp.set_sar_size_km(size_s)
except Exception: pass
# Elevation for the SAR centre: requested when coordinates are available, otherwise reported as unavailable.
if is_valid_geo_for_ui and lat_deg_for_elevation is not None and lon_deg_for_elevation is not None:
self.request_elevation_for_point(source_description=config.ELEVATION_SOURCE_SAR_CENTER, latitude=lat_deg_for_elevation, longitude=lon_deg_for_elevation)
else:
self._update_elevation_display(config.ELEVATION_SOURCE_SAR_CENTER, "GeoInfo N/A")
# Without valid geo info the mouse-derived readouts are reset too.
if not is_valid_geo_for_ui:
try:
if cp and cp.winfo_exists():
cp.set_mouse_coordinates("N/A", "N/A")
cp.set_map_mouse_coordinates("N/A", "N/A")
self._update_elevation_display(config.ELEVATION_SOURCE_SAR_MOUSE, "GeoInfo N/A")
self._update_elevation_display(config.ELEVATION_SOURCE_MAP_MOUSE, "GeoInfo N/A")
except Exception: pass
def _update_fps_stats(self, img_type: str):
    """Count one displayed frame and refresh the frame rate when due.

    Args:
        img_type: ``"sar"`` or ``"mfd"``; other values are ignored.

    Once at least ``config.LOG_UPDATE_INTERVAL`` seconds have passed since the
    stream's reference time, its FPS is recomputed and its counter and
    reference time are reset.  All errors are ignored.
    """
    now = time.time()
    try:
        if img_type == "sar":
            count_attr, since_attr, fps_attr = "sar_frame_count", "sar_update_time", "sar_fps"
        elif img_type == "mfd":
            count_attr, since_attr, fps_attr = "mfd_frame_count", "mfd_start_time", "mfd_fps"
        else:
            return
        state = self.state
        frames = getattr(state, count_attr) + 1
        setattr(state, count_attr, frames)
        elapsed = now - getattr(state, since_attr)
        if elapsed >= config.LOG_UPDATE_INTERVAL:
            setattr(state, fps_attr, frames / elapsed)
            setattr(state, since_attr, now)
            setattr(state, count_attr, 0)
    except Exception:
        # Covers ZeroDivisionError (zero elapsed time) as well.
        pass
def _trigger_sar_update(self):
    """Re-render the current SAR image unless shutting down or in test mode."""
    state = self.state
    if state.shutting_down or state.test_mode_active:
        return
    pipeline = getattr(self, "image_pipeline", None)
    if pipeline:
        pipeline.process_sar_for_display()
def _trigger_mfd_update(self):
    """Re-render the current MFD image unless shutting down or in test mode."""
    state = self.state
    if state.shutting_down or state.test_mode_active:
        return
    pipeline = getattr(self, "image_pipeline", None)
    if pipeline:
        pipeline.process_mfd_for_display()
def _trigger_map_update_from_sar(self):
    """Ask the map manager to rebuild the overlay from the current SAR data.

    Skipped while shutting down, when the overlay is disabled or no map
    manager exists, and when the geo info is invalid or the image is empty.
    Errors raised by the map manager are ignored.
    """
    mgr = getattr(self, "map_integration_manager", None)
    if self.state.shutting_down or not config.ENABLE_MAP_OVERLAY or not mgr:
        return
    geo = self.state.current_sar_geo_info
    sar = self.state.current_sar_normalized
    ready = geo and geo.get("valid") and sar is not None and sar.size > 0
    if not ready:
        return
    try:
        mgr.update_map_overlay(sar, geo)
    except Exception:
        pass
def schedule_periodic_updates(self):
    """Refresh the status line now and re-arm this method on a Tk timer.

    The period is ``config.LOG_UPDATE_INTERVAL`` seconds, never shorter than
    100 ms.  The chain ends once the application is shutting down or the
    root window is gone.
    """
    if self.state.shutting_down:
        return
    self.update_status()
    interval_ms = max(100, int(config.LOG_UPDATE_INTERVAL * 1000))
    if not (self.root and self.root.winfo_exists()):
        return
    try:
        self.root.after(interval_ms, self.schedule_periodic_updates)
    except Exception:
        pass
def process_sar_queue(self):
    """Display at most one pending SAR image, then reschedule this processor.

    The queue is polled without blocking; an empty queue or a display error
    is ignored so the polling loop keeps running until shutdown.
    """
    if self.state.shutting_down:
        return
    frame = None
    try:
        frame = self.sar_queue.get_nowait()
        self.sar_queue.task_done()
    except Exception:
        # queue.Empty (nothing pending) is handled like any other error.
        pass
    if frame is not None:
        display = getattr(self, "display_manager", None)
        if display:
            try:
                display.show_sar_image(frame)
            except Exception:
                pass
    if not self.state.shutting_down:
        self._reschedule_queue_processor(self.process_sar_queue)
def process_mfd_queue(self):
    """Display at most one pending MFD image, then reschedule this processor.

    The queue is polled without blocking; an empty queue or a display error
    is ignored so the polling loop keeps running until shutdown.
    """
    if self.state.shutting_down:
        return
    frame = None
    try:
        frame = self.mfd_queue.get_nowait()
        self.mfd_queue.task_done()
    except Exception:
        # queue.Empty (nothing pending) is handled like any other error.
        pass
    if frame is not None:
        display = getattr(self, "display_manager", None)
        if display:
            try:
                display.show_mfd_image(frame)
            except Exception:
                pass
    if not self.state.shutting_down:
        self._reschedule_queue_processor(self.process_mfd_queue)
def process_tkinter_queue(self):
    """Dispatch at most one pending UI command, then reschedule this processor.

    Queue items are ``(command, payload)`` tuples.  Known commands are routed
    to their ``_handle_*`` method; unknown commands and malformed items are
    ignored.  A failing handler is logged but never stops the polling loop,
    which re-arms itself every 50 ms until shutdown.
    """
    log_prefix = "[App Tk Queue]"
    if self.state.shutting_down:
        return
    item: Optional[Tuple[str, Any]] = None
    try:
        item = self.tkinter_queue.get(block=False)
        self.tkinter_queue.task_done()
    except queue.Empty:
        pass
    except Exception:
        logging.exception(f"{log_prefix} Error reading from the Tkinter queue.")
        item = None

    if isinstance(item, tuple) and len(item) == 2:
        command, payload = item
        # Commands whose handler receives the payload; the handler is looked
        # up by name so only the one actually needed has to exist.
        payload_handlers = {
            "MOUSE_COORDS": "_handle_sar_mouse_coords_update",
            "MAP_MOUSE_COORDS": "_handle_map_mouse_coords_update",
            "SHOW_MAP": "_handle_show_map_update",
            "SAR_CLICK_UPDATE": "_handle_sar_click_update",
            "MAP_CLICK_UPDATE": "_handle_map_click_update",
            "SAR_METADATA_UPDATE": "_handle_sar_metadata_update",
            "ELEVATION_RESULT": "_handle_elevation_result",
        }
        try:
            if command == "REDRAW_MAP":
                # The redraw command carries no payload.
                self._handle_redraw_map_command()
            elif command in payload_handlers:
                getattr(self, payload_handlers[command])(payload)
        except Exception:
            logging.exception(f"{log_prefix} Error while handling command {command!r}.")

    if not self.state.shutting_down:
        self._reschedule_queue_processor(self.process_tkinter_queue, delay=50)
def _handle_sar_mouse_coords_update(self, payload: Optional[Tuple[str, str]]):
    """Show the SAR mouse coordinates on the control panel.

    Args:
        payload: ``(lat_text, lon_text)``; anything else displays "N/A".
    """
    panel = getattr(self, "control_panel", None)
    if not panel:
        return
    well_formed = payload and isinstance(payload, tuple) and len(payload) == 2
    if well_formed:
        lat_s, lon_s = payload
    else:
        lat_s, lon_s = "N/A", "N/A"
    try:
        panel.set_mouse_coordinates(lat_s, lon_s)
    except Exception:
        pass
def _handle_map_mouse_coords_update(self, payload: Optional[Tuple[int, int]]):
    """Show the geographic position under the map mouse pointer.

    Args:
        payload: ``(x, y)`` map pixel; anything else displays "N/A".

    The pixel is converted through the map manager.  "N/A" is shown when no
    manager or no conversion result is available, and "Error DMS" when the
    DMS formatting reports an error.
    """
    cp = getattr(self, "control_panel", None)
    if not cp:
        return
    lat_s, lon_s = "N/A", "N/A"
    if payload and isinstance(payload, tuple) and len(payload) == 2:
        mgr = getattr(self, "map_integration_manager", None)
        if mgr:
            geo_coords = mgr.get_geo_coords_from_map_pixel(payload[0], payload[1])
            if geo_coords:
                lat_dms = decimal_to_dms(geo_coords[0], True)
                lon_dms = decimal_to_dms(geo_coords[1], False)
                failed = any(
                    marker in text
                    for text in (lat_dms, lon_dms)
                    for marker in ("Error", "Invalid")
                )
                if failed:
                    lat_s, lon_s = "Error DMS", "Error DMS"
                else:
                    lat_s, lon_s = lat_dms, lon_dms
    try:
        cp.set_map_mouse_coordinates(lat_s, lon_s)
    except Exception:
        pass
def _handle_elevation_result(self, payload: Dict[str, Any]):
    """Forward an elevation lookup result to the matching elevation readout.

    Args:
        payload: Mapping with optional ``"source_description"`` and
            ``"display_text"`` keys; the defaults are "UnknownPoint" and
            "Elev: Error".  Malformed payloads are ignored.
    """
    if self.state.shutting_down:
        return
    try:
        source = payload.get("source_description", "UnknownPoint")
        text = payload.get("display_text", "Elev: Error")
    except Exception:
        return
    try:
        self._update_elevation_display(source, text)
    except Exception:
        pass
def _handle_show_map_update(self, payload: Optional[ImageType]):
    """Hand *payload* to the map manager for display; errors are ignored."""
    manager = getattr(self, "map_integration_manager", None)
    if not manager:
        return
    try:
        manager.display_map(payload)
    except Exception:
        pass
def _handle_redraw_map_command(self):
    """Ask the map manager to recompose the overlay; errors are ignored."""
    manager = getattr(self, "map_integration_manager", None)
    if not manager:
        return
    try:
        manager._recompose_map_overlay()
    except Exception:
        pass
def _handle_sar_click_update(self, payload: Optional[Tuple[int, int]]):
    """Record a click on the SAR display and request elevation at that point.

    Args:
        payload: ``(x, y)`` in display pixels; anything else is ignored.

    The click is mapped to the original image, turned into metre offsets from
    the geo reference pixel and then into latitude/longitude using fixed
    metres-per-degree constants.  The elevation readout shows
    "Geo Data N/A", "Geo Calc Err" or "Invalid Geo" when the conversion is
    impossible, fails, or leaves the valid range.
    """
    log_prefix = "[App SAR Click State]"
    if not (payload and isinstance(payload, tuple) and len(payload) == 2):
        return
    self.state.last_sar_click_coords = payload
    self._trigger_sar_update()

    geo_info = self.state.current_sar_geo_info
    disp_w = self.state.sar_display_width
    disp_h = self.state.sar_display_height
    geo_usable = (
        geo_info
        and geo_info.get("valid")
        and disp_w > 0
        and disp_h > 0
        and geo_info.get("width_px", 0) > 0
        and geo_info.get("height_px", 0) > 0
    )
    if not geo_usable:
        self._update_elevation_display(config.ELEVATION_SOURCE_SAR_MOUSE, "Geo Data N/A")
        return

    try:
        x_disp, y_disp = payload
        orig_w, orig_h = geo_info["width_px"], geo_info["height_px"]
        scale_x, scale_y = geo_info["scale_x"], geo_info["scale_y"]
        ref_x, ref_y = geo_info["ref_x"], geo_info["ref_y"]
        ref_lat_rad, ref_lon_rad = geo_info["lat"], geo_info["lon"]

        # Display pixel -> original image pixel.
        orig_x = (x_disp / disp_w) * orig_w
        orig_y = (y_disp / disp_h) * orig_h
        # Offsets from the reference pixel; the y offset is measured from
        # the click up to the reference row.
        east_m = (orig_x - ref_x) * scale_x
        north_m = (ref_y - orig_y) * scale_y

        M_PER_DLAT, M_PER_DLON_EQ = 111132.954, 111319.488
        # Floor keeps the division finite where cos(lat) approaches zero.
        m_per_dlon = max(abs(M_PER_DLON_EQ * math.cos(ref_lat_rad)), 1e-3)
        click_lat = math.degrees(ref_lat_rad) + north_m / M_PER_DLAT
        click_lon = math.degrees(ref_lon_rad) + east_m / m_per_dlon

        lat_ok = math.isfinite(click_lat) and abs(click_lat) <= 90.0
        lon_ok = math.isfinite(click_lon) and abs(click_lon) <= 180.0
        if lat_ok and lon_ok:
            self.request_elevation_for_point(
                source_description=config.ELEVATION_SOURCE_SAR_MOUSE,
                latitude=click_lat,
                longitude=click_lon,
            )
        else:
            self._update_elevation_display(config.ELEVATION_SOURCE_SAR_MOUSE, "Invalid Geo")
    except Exception:
        self._update_elevation_display(config.ELEVATION_SOURCE_SAR_MOUSE, "Geo Calc Err")
def _handle_map_click_update(self, payload: Optional[Tuple[int, int]]):
    """Record a click on the map display and request elevation at that point.

    Args:
        payload: ``(x, y)`` map pixel; anything else is ignored.

    The map is redrawn (light recomposition) and the pixel converted through
    the map manager.  The elevation readout shows "Map Mgr N/A" without a
    manager and "Geo Conv Err" when the conversion yields nothing.
    """
    if not (payload and isinstance(payload, tuple) and len(payload) == 2):
        return
    self.state.last_map_click_coords = payload
    self.trigger_map_redraw(full_update=False)

    mgr = getattr(self, "map_integration_manager", None)
    if not mgr:
        self._update_elevation_display(config.ELEVATION_SOURCE_MAP_MOUSE, "Map Mgr N/A")
        return
    pixel_x, pixel_y = payload
    geo_coords = mgr.get_geo_coords_from_map_pixel(pixel_x, pixel_y)
    if not geo_coords:
        self._update_elevation_display(config.ELEVATION_SOURCE_MAP_MOUSE, "Geo Conv Err")
        return
    lat_deg, lon_deg = geo_coords
    self.request_elevation_for_point(
        source_description=config.ELEVATION_SOURCE_MAP_MOUSE,
        latitude=lat_deg,
        longitude=lon_deg,
    )
def _handle_sar_metadata_update(self, metadata_string: Optional[str]):
    """Remember the latest SAR metadata text and show it if the pane is on.

    Args:
        metadata_string: The metadata text, or ``None`` when retrieval
            failed (an error notice is then shown).  Any other type is
            ignored.

    Nothing happens without a metadata text widget.
    """
    if self.state.shutting_down:
        return
    text_widget = getattr(self, "metadata_display_text", None)
    if not text_widget:
        return
    if isinstance(metadata_string, str):
        shown_text = metadata_string
    elif metadata_string is None:
        shown_text = "<Error retrieving metadata>"
    else:
        return
    self.state.last_sar_metadata_str = metadata_string
    if self.state.display_sar_metadata:
        self.set_metadata_display(shown_text)
def _reschedule_queue_processor(self, processor_func, delay: Optional[int] = None):
    """Re-arm *processor_func* on the Tk timer.

    Args:
        processor_func: Callable to run again.
        delay: Delay in milliseconds.  When omitted, the SAR/MFD image queue
            processors poll at 1.5x ``config.MFD_FPS`` (never faster than
            every 10 ms, 20 ms if the FPS is not positive); every other
            processor uses 50 ms.

    Nothing is scheduled during shutdown or once the root window is gone.
    """
    if self.state.shutting_down:
        return
    if delay is None:
        image_processors = (self.process_sar_queue, self.process_mfd_queue)
        if processor_func in image_processors:
            target_fps = config.MFD_FPS
            raw_delay = 1000 / (target_fps * 1.5) if target_fps > 0 else 20
            delay = max(10, int(raw_delay))
        else:
            delay = 50
    try:
        if self.root and self.root.winfo_exists():
            self.root.after(delay, processor_func)
    except Exception:
        pass
def process_mouse_queue(self):
    """Convert one pending SAR mouse position to coordinates and queue it.

    A display pixel taken from the mouse queue is mapped to the original
    image, turned into metre offsets from the geo reference pixel and then
    into latitude/longitude using fixed metres-per-degree constants.  The
    DMS strings (or "N/A" / an error marker) are posted as a "MOUSE_COORDS"
    command.  The processor re-arms itself every 100 ms until shutdown.
    """
    if self.state.shutting_down:
        return
    raw_coords = None
    try:
        raw_coords = self.mouse_queue.get(block=False)
        self.mouse_queue.task_done()
    except Exception:
        # queue.Empty (nothing pending) is handled like any other error.
        pass

    if isinstance(raw_coords, tuple) and len(raw_coords) == 2:
        x_disp, y_disp = raw_coords
        geo = self.state.current_sar_geo_info
        disp_w = self.state.sar_display_width
        disp_h = self.state.sar_display_height
        lat_s, lon_s = "N/A", "N/A"
        required_keys = ["lat", "lon", "ref_x", "ref_y", "orientation"]
        geo_usable = (
            geo
            and geo.get("valid")
            and disp_w > 0
            and disp_h > 0
            and geo.get("width_px", 0) > 0
            and geo.get("height_px", 0) > 0
            and geo.get("scale_x", 0.0) > 0
            and geo.get("scale_y", 0.0) > 0
            and all(k in geo for k in required_keys)
        )
        if geo_usable:
            try:
                orig_w, orig_h = geo["width_px"], geo["height_px"]
                scale_x, scale_y = geo["scale_x"], geo["scale_y"]
                ref_x, ref_y = geo["ref_x"], geo["ref_y"]
                ref_lat_rad, ref_lon_rad = geo["lat"], geo["lon"]
                # Read as in the original; the calculation below does not use it.
                angle_rad = geo.get("orientation", 0.0)

                # Display pixel -> original image pixel.
                orig_x = (x_disp / disp_w) * orig_w
                orig_y = (y_disp / disp_h) * orig_h
                east_m = (orig_x - ref_x) * scale_x
                north_m = (ref_y - orig_y) * scale_y

                M_PER_DLAT, M_PER_DLON_EQ = 111132.954, 111319.488
                # Floor keeps the division finite where cos(lat) approaches zero.
                m_per_dlon = max(abs(M_PER_DLON_EQ * math.cos(ref_lat_rad)), 1e-3)
                final_lat_deg = math.degrees(ref_lat_rad) + north_m / M_PER_DLAT
                final_lon_deg = math.degrees(ref_lon_rad) + east_m / m_per_dlon

                lat_valid = math.isfinite(final_lat_deg) and abs(final_lat_deg) <= 90.0
                lon_valid = math.isfinite(final_lon_deg) and abs(final_lon_deg) <= 180.0
                if not (lat_valid and lon_valid):
                    lat_s, lon_s = "Invalid Calc", "Invalid Calc"
                else:
                    lat_dms = decimal_to_dms(final_lat_deg, True)
                    lon_dms = decimal_to_dms(final_lon_deg, False)
                    failed = any(
                        marker in text
                        for text in (lat_dms, lon_dms)
                        for marker in ("Error", "Invalid")
                    )
                    if failed:
                        lat_s, lon_s = "Error DMS", "Error DMS"
                    else:
                        lat_s, lon_s = lat_dms, lon_dms
            except KeyError:
                lat_s, lon_s = "Error Key", "Error Key"
            except Exception:
                lat_s, lon_s = "Calc Error", "Calc Error"
        self.put_mouse_coordinates_queue(("MOUSE_COORDS", (lat_s, lon_s)))

    if not self.state.shutting_down:
        self._reschedule_queue_processor(self.process_mouse_queue, delay=100)
def put_mouse_coordinates_queue(self, command_payload_tuple: Tuple[str, Tuple[str, str]]):
    """Post a ``(command, payload)`` pair on the Tkinter queue.

    Args:
        command_payload_tuple: Two-element tuple; it is unpacked and
            re-packed before being queued.

    Nothing is queued while shutting down.
    """
    if self.state.shutting_down:
        return
    command, payload = command_payload_tuple
    put_queue(
        queue_obj=self.tkinter_queue,
        item=(command, payload),
        queue_name="tkinter",
        app_instance=self,
    )
def set_metadata_display(self, text: str):
    """Replace the contents of the metadata text widget with *text*.

    The widget is switched to "normal" for the update and back to "disabled"
    afterwards.  Nothing happens when the widget is missing or destroyed;
    errors are ignored.
    """
    widget = getattr(self, "metadata_display_text", None)
    if not widget:
        return
    try:
        if not widget.winfo_exists():
            return
        widget.config(state="normal")
        widget.delete("1.0", tk.END)
        widget.insert("1.0", text)
        widget.config(state="disabled")
    except Exception:
        pass
def update_status(self):
    """Refresh the status bar summary and the control-panel statistics.

    Skipped while shutting down or while the status bar shows a "Loading"
    message.  The summary lists the mode, whether the map is on, and the
    MFD/SAR frame rates; the statistics show dropped-queue and incomplete-RX
    counters.  Widget updates are posted through ``after_idle``; errors are
    ignored.
    """
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    try:
        sb = getattr(self, "statusbar", None)
        if sb and sb.winfo_exists() and "Loading" in sb.cget("text"):
            return
    except Exception:
        pass
    stats = self.state.get_statistics()
    try:
        if self.state.test_mode_active:
            mode = "Test"
        elif config.USE_LOCAL_IMAGES:
            mode = "Local"
        else:
            mode = "Network"
        map_enabled = (
            config.ENABLE_MAP_OVERLAY
            and hasattr(self, "map_integration_manager")
            and self.map_integration_manager
        )
        map_on = " MapOn" if map_enabled else ""
        mfd_fps = f"MFD:{self.state.mfd_fps:.1f}fps"
        if self.state.sar_fps > 0:
            sar_fps = f"SAR:{self.state.sar_fps:.1f}fps"
        else:
            sar_fps = "SAR:N/A"
        status_prefix = f"Status: {mode}{map_on} | {mfd_fps} | {sar_fps}"
        drop = f"Drop(Q): S={stats['dropped_sar_q']},M={stats['dropped_mfd_q']},Tk={stats['dropped_tk_q']},Mo={stats['dropped_mouse_q']}"
        incmpl = f"Incmpl(RX): S={stats['incomplete_sar_rx']},M={stats['incomplete_mfd_rx']}"
        if self.root and self.root.winfo_exists():
            if sb and sb.winfo_exists():
                self.root.after_idle(sb.set_status_text, status_prefix)
            cp = getattr(self, "control_panel", None)
            if cp and cp.winfo_exists():
                self.root.after_idle(cp.set_statistics_display, drop, incmpl)
    except Exception:
        pass
if __name__ == "__main__":
    # Script entry point: build the root window, create the application and
    # run the Tk main loop. Every exit path is logged and the process exit
    # status reflects the outcome.
    root, app_instance = None, None
    try:
        if config.ENABLE_MAP_OVERLAY and not MAP_MODULES_LOADED:
            logging.critical(
                "[App Main] Map overlay is enabled but the map modules could not be loaded."
            )
            sys.exit(1)
        root = create_main_window("Control Panel", config.TKINTER_MIN_WIDTH, config.TKINTER_MIN_HEIGHT, 10, 10)
        app_instance = ControlPanelApp(root)
        root.protocol("WM_DELETE_WINDOW", app_instance.close_app)
        root.mainloop()
    except SystemExit as exit_e:
        # sys.exit() / sys.exit(None) mean success; a non-int code (e.g. a
        # message string) is reported as failure.
        if exit_e.code is None:
            exit_code = 0
        else:
            exit_code = exit_e.code if isinstance(exit_e.code, int) else 1
        log_level = logging.INFO if exit_code == 0 else logging.WARNING
        logging.log(log_level, f"[App Main] Application exited via sys.exit({exit_code}).")
        # Re-raise so the requested exit status reaches the operating system
        # instead of being replaced by a normal (0) termination.
        raise
    except ImportError as imp_err:
        logging.critical(f"[App Main] Import error during startup: {imp_err}", exc_info=True)
        sys.exit(1)
    except Exception as e:
        logging.critical(f"[App Main] Unhandled exception: {e}", exc_info=True)
        sys.exit(1)
    finally:
        logging.info("=== App End ===")
        logging.shutdown()