SXXXXXXX_ControlPanel/controlpanel/app_main.py
2025-05-06 11:18:50 +02:00

2569 lines
110 KiB
Python

# --- START OF FILE ControlPanel.py ---
# ControlPanel.py (Previously app.py)
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED.
Main application module for the Control Panel application.
Orchestrates UI, display, network reception, image processing pipeline,
test mode management, map integration, image recording, and state management.
Initializes all sub-modules and manages the main application lifecycle.
"""
# --- Standard library imports ---
import threading
import time
import queue
import os
import logging
import math
import sys
import socket
from typing import Optional, Tuple, Any, Dict, TYPE_CHECKING, Union, List
import datetime
import tkinter.filedialog as fd
from pathlib import Path
import tkinter as tk
from tkinter import ttk
from tkinter.scrolledtext import ScrolledText
# --- Third-party imports ---
import numpy as np
import cv2
import screeninfo
# --- PIL Import and Type Definition ---
try:
from PIL import Image, ImageTk
ImageType = Image.Image # type: ignore
except ImportError:
ImageType = Any # Fallback type hint
Image = None
ImageTk = None
logging.critical(
"[App Init] Pillow library not found. Map/Image functionality will fail."
)
# --- Configuration Import ---
from controlpanel import config
# --- Logging Setup ---
try:
from controlpanel.logging_config import setup_logging
setup_logging()
except ImportError:
print("ERROR: logging_config.py not found. Using basic logging.")
logging.basicConfig(
level=logging.WARNING,
format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
)
# --- Application Modules Import ---
from controlpanel.gui.ui import ControlPanel as UIPanel, StatusBar, create_main_window
from controlpanel.gui.display import DisplayManager
from controlpanel.utils.utils import (
put_queue,
clear_queue,
decimal_to_dms,
dms_string_to_decimal,
generate_sar_kml, # Rimane invariato
launch_google_earth,
# cleanup_old_kml_files, # Rimuovere o commentare il vecchio import
cleanup_kml_output_directory, # <<< NUOVO IMPORT
open_google_maps,
generate_lookat_and_point_kml,
generate_composite_kml, # Modificato precedentemente per usare questa
_simplekml_available,
_pyproj_available,
format_ctypes_structure,
)
from controlpanel.utils.network import create_udp_socket, close_udp_socket
from controlpanel.core.receiver import UdpReceiver
from controlpanel.app_state import AppState
from controlpanel.core.test_mode_manager import TestModeManager
from controlpanel.core.image_pipeline import ImagePipeline
from controlpanel.utils.image_processing import load_image, normalize_image, apply_color_palette
from controlpanel.core.image_recorder import ImageRecorder
# --- Map related imports (Conditional) ---
# Two-stage probe. Stage 1 checks the third-party libraries the map code
# relies on (mercantile, pyproj, and Pillow, whose import is attempted earlier
# in this module). Stage 2 imports the project's own map modules. If either
# stage fails, the map class names are bound to inert placeholders so the rest
# of the module can test them for truthiness.
map_libs_found = True
try:
    import mercantile
    import pyproj

    # `Image` is None when the earlier Pillow import failed. The previous
    # guard also required `ImageType is not Any`, which is never true in that
    # case (ImageType falls back to Any), so the check could not fire.
    if Image is None:
        raise ImportError("Pillow failed import")
except ImportError as map_lib_err:
    logging.warning(
        f"[App Init] Core map lib import failed ({map_lib_err}). Map disabled."
    )
    map_libs_found = False

MAP_MODULES_LOADED = False
if map_libs_found:
    try:
        from controlpanel.map.map_services import get_map_service, BaseMapService
        from controlpanel.map.map_manager import MapTileManager
        from controlpanel.map.map_utils import MapCalculationError
        from controlpanel.map.map_display import MapDisplayWindow
        from controlpanel.map.map_integration import MapIntegrationManager

        # Only reached when every map module imported successfully.
        MAP_MODULES_LOADED = True
    except ImportError as map_import_err:
        logging.warning(
            f"[App Init] Specific map module import failed ({map_import_err}). Map disabled."
        )

if not MAP_MODULES_LOADED:
    # Placeholders for both failure paths (missing libraries or a failed
    # project-module import, including a partially completed one).
    BaseMapService = None
    MapTileManager = None
    MapDisplayWindow = None
    MapIntegrationManager = None
    MapCalculationError = Exception
# --- Main Application Class ---
class ControlPanelApp:
"""
Main application class. Manages UI, display, processing, network, state,
and orchestrates various managers.
"""
# --- Initialization Method ---
def __init__(self, root: tk.Tk):
    """Initializes the main application components and state.

    In order, this configures the root window (title, icon, geometry),
    creates the AppState and the four work queues, builds the status bar,
    control panel and the metadata panel (created but not gridded), then
    creates the display manager, image pipeline, test mode manager,
    optional map integration manager and image recorder. It then either
    sets up the network receiver or starts the local image loader
    (depending on ``config.USE_LOCAL_IMAGES``) and finally starts the
    queue-processing and periodic-update loops.

    Args:
        root: The Tk root window that hosts the control panel UI.
    """
    log_prefix = "[App Init]"
    logging.debug(f"{log_prefix} Starting initialization...")
    self.root = root
    self.root.title("Control Panel")
    # Icon setup is best-effort: any failure is logged as a warning only.
    try:
        # Determine script directory safely
        if getattr(sys, "frozen", False):  # Running as compiled executable
            script_dir = os.path.dirname(sys.executable)
        elif "__file__" in locals() or "__file__" in globals():  # Running as script
            script_dir = os.path.dirname(os.path.abspath(__file__))
        else:  # Fallback (interactive mode?)
            script_dir = os.getcwd()
        icon_path = os.path.join(script_dir, "ControlPanel.ico")
        if os.path.exists(icon_path):
            self.root.iconbitmap(default=icon_path)
            logging.debug(f"{log_prefix} Icon set from: {icon_path}")
        else:
            logging.warning(f"{log_prefix} Icon file not found at: {icon_path}")
    except Exception as icon_e:
        logging.warning(f"{log_prefix} Icon error: {icon_e}")
    # Initialize State
    self.state = AppState()
    # Initialize Queues (the config value is passed as each queue's maxsize)
    self.sar_queue = queue.Queue(config.DEFAULT_SAR_QUEUE)
    self.mouse_queue = queue.Queue(config.DEFAULT_MOUSE_QUEUE)
    self.tkinter_queue = queue.Queue(config.DEFAULT_TKINTER_QUEUE)
    self.mfd_queue = queue.Queue(config.DEFAULT_MFD_QUEUE)
    # --- Window Placement and Sizing ---
    screen_w, screen_h = self._get_screen_dimensions()
    initial_sar_w, initial_sar_h = self._calculate_initial_sar_size()
    self.tkinter_x, self.tkinter_y = self._calculate_tkinter_position(screen_h)
    # Store original/expanded widths for dynamic resizing
    self.original_window_width = config.TKINTER_MIN_WIDTH
    self.metadata_panel_width = 300  # Adjusted width example
    # Expanded width = base width + metadata panel width + 10 extra pixels
    self.expanded_window_width = (
        self.original_window_width + self.metadata_panel_width + 10
    )
    # Set initial geometry (height is at least 650 px)
    initial_height = max(config.TKINTER_MIN_HEIGHT, 650)
    self.root.geometry(
        f"{self.original_window_width}x{initial_height}+{self.tkinter_x}+{self.tkinter_y}"
    )
    self.root.minsize(self.original_window_width, config.TKINTER_MIN_HEIGHT)
    # --- UI Structure ---
    # The root uses pack (status bar + container); the container uses grid.
    # Status bar packed at bottom first
    self.statusbar = StatusBar(self.root)
    self.statusbar.pack(side=tk.BOTTOM, fill=tk.X)
    # Main container frame packed to fill remaining space
    self.container_frame = ttk.Frame(self.root)
    self.container_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
    # Configure grid columns for the container
    # Column 0 for the main control panel
    self.container_frame.columnconfigure(0, weight=1)
    # Column 1 for the optional metadata panel (initially no weight)
    self.container_frame.columnconfigure(1, weight=0)
    # Allow the row containing the panels to expand vertically
    self.container_frame.rowconfigure(0, weight=1)
    # Initialize ControlPanel UI (UIPanel class from ui.py)
    self.control_panel = UIPanel(self.container_frame, self)
    # Grid the control panel into the container's first column
    self.control_panel.grid(row=0, column=0, sticky="nsew")
    # --- Create Metadata Frame Structure (as attribute of self) ---
    # Create metadata frame as child of the container_frame
    self.metadata_frame = ttk.Labelframe(
        self.container_frame, text="Raw SAR Metadata", padding=5
    )
    # Create inner frame for Text + Scrollbar
    self.metadata_text_frame = ttk.Frame(self.metadata_frame)
    self.metadata_text_frame.pack(fill=tk.BOTH, expand=True)
    # Create Scrollbar
    self.metadata_scrollbar = ttk.Scrollbar(
        self.metadata_text_frame, orient=tk.VERTICAL
    )
    # Create Text widget and assign to self attribute
    self.metadata_display_text = tk.Text(
        self.metadata_text_frame,
        wrap=tk.NONE,
        state="disabled",
        height=8,
        yscrollcommand=self.metadata_scrollbar.set,
        font=("Courier New", 8),
    )
    # Configure scrollbar
    self.metadata_scrollbar.config(command=self.metadata_display_text.yview)
    # Pack scrollbar and text widget
    self.metadata_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
    self.metadata_display_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    # Set initial placeholder text using the method on self
    self.set_metadata_display("Enable 'Show SAR Metadata' checkbox to view data...")
    # NOTE: metadata_frame is CREATED but NOT GRIDDED here initially
    logging.debug(
        f"{log_prefix} Metadata Display frame structure created but not gridded."
    )
    # --- Initialize Sub-systems ---
    # Calculate positions for external windows
    self.mfd_x, self.mfd_y = self._calculate_mfd_position()
    self.sar_x, self.sar_y = self._calculate_sar_position(screen_w, initial_sar_w)
    # Use the map window class's own MAX_DISPLAY_WIDTH when that class is
    # available and defines it; otherwise fall back to the config value.
    map_max_w = (
        config.MAX_MAP_DISPLAY_WIDTH
        if not MapDisplayWindow
        else getattr(
            MapDisplayWindow, "MAX_DISPLAY_WIDTH", config.MAX_MAP_DISPLAY_WIDTH
        )
    )
    map_x, map_y = self._calculate_map_position(screen_w, initial_sar_w, map_max_w)
    # Initialize Display Manager
    self.display_manager = DisplayManager(
        self,
        self.sar_queue,
        self.mouse_queue,
        self.sar_x,
        self.sar_y,
        self.mfd_x,
        self.mfd_y,
        initial_sar_w,  # Correct variable name
        initial_sar_h,  # Correct variable name
    )
    # A display init failure is reported and logged; initialization continues.
    try:
        self.display_manager.initialize_display_windows()
    except Exception as e:
        self.set_status("Error: Display Init Failed")
        logging.critical(f"{log_prefix} Display init failed: {e}", exc_info=True)
    # Call LUT updates AFTER control_panel is initialized
    self.update_brightness_contrast_lut()
    self.update_mfd_lut()
    # Initialize Image Pipeline
    self.image_pipeline = ImagePipeline(
        self.state, self.sar_queue, self.mfd_queue, self
    )
    # Initialize Test Mode Manager
    self.test_mode_manager = TestModeManager(
        self.state, self.root, self.sar_queue, self.mfd_queue, self
    )
    # Initialize Map Manager (Conditional)
    # Stays None unless the overlay is enabled in config, the map modules
    # loaded, and the manager constructor succeeds.
    self.map_integration_manager = None
    if config.ENABLE_MAP_OVERLAY:
        if MAP_MODULES_LOADED and MapIntegrationManager:
            try:
                self.map_integration_manager = MapIntegrationManager(
                    self.state, self.tkinter_queue, self, map_x, map_y
                )
            except Exception as map_e:
                logging.exception(
                    f"{log_prefix} MapIntegrationManager init failed:"
                )
                self.map_integration_manager = None
                self.set_status("Error: Map Init Failed")
        else:
            self.set_status("Error: Map Modules Missing")
    # Initialize Image Recorder (Conditional)
    self.image_recorder = None
    if ImageRecorder:
        try:
            self.image_recorder = ImageRecorder(self.state)
        except Exception as rec_e:
            logging.exception(f"{log_prefix} ImageRecorder init failed:")
            self.image_recorder = None
    # Update initial UI display (like coords, stats)
    self._update_initial_ui_display()
    # Network Setup
    self.local_ip = config.DEFAULT_SER_IP
    self.local_port = config.DEFAULT_SER_PORT
    self.udp_socket = None
    self.udp_receiver = None
    self.udp_thread = None
    if not config.USE_LOCAL_IMAGES:
        self._setup_network_receiver()
    else:
        # Load local/test images if network disabled
        self._start_initial_image_loader()
    # Start loops/timers
    self.process_sar_queue()
    self.process_mfd_queue()
    self.process_mouse_queue()
    self.process_tkinter_queue()
    self.schedule_periodic_updates()
    # Set initial mode (Test or Normal/Local)
    self.update_image_mode()
    logging.info(f"{log_prefix} Application initialization complete.")
# --- Status Update Method ---
def set_status(self, message: str):
    """Schedules a status bar update that replaces the leading status message.

    The text before the first ``|`` in the status bar becomes
    ``"Status: <message>"``; whatever followed that first ``|`` is kept.
    The widget is only touched from a callback queued with
    ``root.after_idle``. Nothing is scheduled when the app state is missing
    or flagged as shutting down.
    """
    log_prefix = "[App Set Status]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    new_status_prefix = f"Status: {message}"

    def _apply_status_text():
        # Re-check: the shutdown flag may have changed since scheduling.
        if not hasattr(self, "state") or self.state.shutting_down:
            return
        try:
            bar = getattr(self, "statusbar", None)
            if not bar or not isinstance(bar, tk.Widget) or not bar.winfo_exists():
                return
            current_text: str = bar.cget("text")
            # Keep everything after the first '|' as the suffix.
            _, separator, remainder = current_text.partition("|")
            suffix = f" | {remainder.strip()}" if separator else ""
            final_text = f"{new_status_prefix}{suffix}"
            # Prefer the StatusBar's own setter; otherwise configure directly.
            if hasattr(bar, "set_status_text"):
                bar.set_status_text(final_text)
            else:
                bar.config(text=final_text)
        except Exception as e:
            logging.exception(f"{log_prefix} Error updating status bar text:")

    try:
        root_ready = hasattr(self, "root") and self.root and self.root.winfo_exists()
        if root_ready:
            self.root.after_idle(_apply_status_text)
    except Exception as e:
        logging.warning(f"{log_prefix} Error scheduling status update: {e}")
# --- LUT Generation Methods ---
def update_brightness_contrast_lut(self):
    """Rebuilds the SAR brightness/contrast LUT and stores it in AppState.

    Each level ``v`` in 0..255 maps to ``round(v * contrast + brightness)``
    clipped to 0..255 (uint8), with the contrast floored at 0.01. If the
    calculation raises, an identity LUT is stored instead.
    """
    log_prefix = "[App Update SAR LUT]"
    if not hasattr(self, "state"):
        return
    try:
        gain = max(0.01, self.state.sar_contrast)
        offset = self.state.sar_brightness
        # Linear ramp 0..255, scaled by the gain and shifted by the offset.
        levels = np.arange(256, dtype=np.float32)
        rounded = np.round((levels * gain) + offset)
        self.state.brightness_contrast_lut = np.clip(rounded, 0, 255).astype(
            np.uint8
        )
    except Exception as e:
        logging.exception(f"{log_prefix} Error calculating SAR B/C LUT:")
        # Identity mapping as the safe default.
        self.state.brightness_contrast_lut = np.arange(256, dtype=np.uint8)
def update_mfd_lut(self):
    """Rebuilds the 256x3 MFD LUT from AppState parameters and stores it.

    Indices mapped to a category get that category's BGR color scaled by
    ``intensity / 255``. Unmapped indices 32..255 get a gray level stretched
    from 32..255 to 0..255 and scaled by ``raw_map_intensity / 255``.
    Remaining indices stay black. On any error the fallback LUT is applied.
    """
    log_prefix = "[MFD LUT Update]"
    if not hasattr(self, "state"):
        return
    try:
        mfd_cfg = self.state.mfd_params
        raw_gain = mfd_cfg["raw_map_intensity"] / 255.0
        index_to_category = mfd_cfg["pixel_to_category"]
        category_table = mfd_cfg["categories"]
        # Start from an all-black table.
        table = np.zeros((256, 3), dtype=np.uint8)
        for level in range(256):
            name = index_to_category.get(level)
            if name:
                # Category pixel: scale each BGR channel by the intensity.
                entry = category_table[name]
                color_bgr = entry["color"]
                scale = entry["intensity"] / 255.0
                for channel in range(3):
                    table[level, channel] = np.clip(
                        int(round(float(color_bgr[channel]) * scale)), 0, 255
                    )
                continue
            if not (32 <= level <= 255):
                continue
            # Raw map pixel: stretch 32..255 onto 0..255, then apply gain.
            stretched = (float(level) - 32.0) * (255.0 / 223.0)
            gray = int(round(np.clip(stretched * raw_gain, 0, 255)))
            table[level, :] = gray
        # Publish the finished table only after the whole loop succeeded.
        self.state.mfd_lut = table
    except Exception as e:
        logging.critical(f"{log_prefix} CRITICAL MFD LUT error:", exc_info=True)
        self._apply_fallback_mfd_lut()
def _apply_fallback_mfd_lut(self):
"""Applies a simple grayscale ramp as a fallback MFD LUT."""
if hasattr(self, "state"):
try:
# Create a single channel grayscale ramp
gray_ramp = np.arange(256, dtype=np.uint8)[:, np.newaxis]
# Convert grayscale ramp to 3-channel BGR
self.state.mfd_lut = cv2.cvtColor(gray_ramp, cv2.COLOR_GRAY2BGR)[
:, 0, :
] # Remove added dimension
except Exception as fb_e:
logging.critical(f"[MFD LUT Update] Fallback LUT failed: {fb_e}")
# Ultimate fallback: all black
self.state.mfd_lut = np.zeros((256, 3), dtype=np.uint8)
# --- UI Callbacks ---
def update_image_mode(self):
    """Callback for the Test Image checkbox; switches test mode on or off.

    The requested mode is read from the control panel's ``test_image_var``
    when that is a Tk variable; otherwise the current state is kept. Work is
    done only when the requested mode differs from the current one.
    """
    log_prefix = "[App Mode Switch]"
    if not (hasattr(self, "state") and hasattr(self, "test_mode_manager")):
        return
    if self.state.shutting_down:
        return
    try:
        panel = getattr(self, "control_panel", None)
        checkbox_var = getattr(panel, "test_image_var", None) if panel else None
        if checkbox_var and isinstance(checkbox_var, tk.Variable):
            requested = checkbox_var.get() == 1
        else:
            requested = self.state.test_mode_active
        # No transition requested: nothing to do.
        if requested == self.state.test_mode_active:
            return
        # Record the new mode before (de)activating the manager.
        self.state.test_mode_active = requested
        if not requested:
            self.test_mode_manager.deactivate()
            self.deactivate_test_mode_ui_actions()
        elif self.test_mode_manager.activate():
            self.activate_test_mode_ui_actions()
        else:
            # Activation failed: put the checkbox back.
            self._revert_test_mode_ui()
        # A mode change invalidates the running statistics.
        self.state.reset_statistics()
        self.update_status()
    except Exception as e:
        logging.exception(f"{log_prefix} Error changing image mode:")
def update_sar_size(self, event=None):
    """Callback when the SAR display size combobox selection changes.

    The selection is expected in the form ``"1:N"``; ``N`` (at least 1)
    divides the configured SAR width and height. State is updated and a SAR
    redraw triggered only when the resulting size differs from the current
    one.
    """
    log_prefix = "[App CB SAR Size]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    try:
        panel = getattr(self, "control_panel", None)
        size_combo = getattr(panel, "sar_size_combo", None) if panel else None
        if not size_combo:
            return
        selection = size_combo.get()
        divisor = 1
        if selection and ":" in selection:
            try:
                # Anything unparsable, or below 1, collapses to 1.
                divisor = max(1, int(selection.split(":")[1]))
            except (ValueError, IndexError):
                divisor = 1
        new_w = max(1, config.SAR_WIDTH // divisor)
        new_h = max(1, config.SAR_HEIGHT // divisor)
        same_size = (
            new_w == self.state.sar_display_width
            and new_h == self.state.sar_display_height
        )
        if same_size:
            return
        self.state.update_sar_display_size(new_w, new_h)
        # Reprocess/redisplay the SAR image at the new size.
        self._trigger_sar_update()
    except Exception as e:
        logging.exception(f"{log_prefix} Error updating SAR size:")
def update_contrast(self, value_str: str):
    """Callback for SAR contrast slider changes.

    Parses ``value_str`` as a float, stores it in AppState, rebuilds the
    brightness/contrast LUT and triggers a SAR redraw. A ValueError is
    logged as a warning; any other error is logged with its traceback.
    """
    if self.state.shutting_down:
        return
    try:
        self.state.update_sar_parameters(contrast=float(value_str))
        # The LUT depends on contrast, so rebuild it before redrawing.
        self.update_brightness_contrast_lut()
        self._trigger_sar_update()
    except ValueError:
        logging.warning(
            f"[App CB SAR Contrast] Invalid contrast value: {value_str}"
        )
    except Exception as e:
        logging.exception(f"[App CB SAR Contrast] Error updating contrast: {e}")
def update_brightness(self, value_str: str):
    """Callback for SAR brightness slider changes.

    Parses ``value_str`` as a float and truncates it to an int, stores it in
    AppState, rebuilds the brightness/contrast LUT and triggers a SAR
    redraw. A ValueError is logged as a warning; any other error is logged
    with its traceback.
    """
    if self.state.shutting_down:
        return
    try:
        # Go through float first so strings such as "12.0" are accepted.
        self.state.update_sar_parameters(brightness=int(float(value_str)))
        self.update_brightness_contrast_lut()
        self._trigger_sar_update()
    except ValueError:
        logging.warning(
            f"[App CB SAR Brightness] Invalid brightness value: {value_str}"
        )
    except Exception as e:
        logging.exception(f"[App CB SAR Brightness] Error updating brightness: {e}")
def update_sar_palette(self, event=None):
    """Callback when the SAR palette combobox selection changes.

    A selection listed in ``config.COLOR_PALETTES`` is stored in AppState and
    a SAR redraw is triggered. Any other selection is logged and the
    combobox is reset to the palette currently held in AppState.
    """
    if self.state.shutting_down:
        return
    try:
        panel = getattr(self, "control_panel", None)
        palette_combo = getattr(panel, "palette_combo", None) if panel else None
        if not palette_combo:
            return
        palette = palette_combo.get()
        if palette not in config.COLOR_PALETTES:
            # Unknown entry: restore what the state says is active.
            logging.warning(
                f"[App CB SAR Palette] Invalid palette selected: {palette}. Reverting."
            )
            palette_combo.set(self.state.sar_palette)
            return
        self.state.update_sar_parameters(palette=palette)
        self._trigger_sar_update()
    except Exception as e:
        logging.exception(f"[App CB SAR Palette] Error updating palette: {e}")
def update_mfd_category_intensity(self, category_name: str, intensity_value: int):
    """Callback for MFD category intensity slider changes.

    For a known category, stores ``intensity_value`` clipped to 0..255 in
    the MFD parameters, rebuilds the MFD LUT and triggers an MFD redraw.
    Unknown categories are logged and ignored.
    """
    if self.state.shutting_down:
        return
    try:
        if category_name not in self.state.mfd_params["categories"]:
            logging.warning(
                f"[App CB MFD Intensity] Unknown category: {category_name}"
            )
            return
        limited = np.clip(intensity_value, 0, 255)
        self.state.mfd_params["categories"][category_name]["intensity"] = limited
        # The LUT embeds the intensity, so rebuild it before redrawing.
        self.update_mfd_lut()
        self._trigger_mfd_update()
    except Exception as e:
        logging.exception(
            f"[App CB MFD Intensity] Error updating intensity for '{category_name}': {e}"
        )
def choose_mfd_category_color(self, category_name: str):
    """Callback for the MFD category color chooser button.

    Opens a color dialog preset to the category's current color (stored as
    BGR). When a color is picked it is stored back as a BGR tuple, the MFD
    LUT is rebuilt, the control panel's color preview update is scheduled
    (when available) and an MFD redraw is triggered.
    """
    # Imported locally, as in the original implementation.
    from tkinter import colorchooser

    if self.state.shutting_down:
        return
    if category_name not in self.state.mfd_params["categories"]:
        logging.warning(f"[App CB MFD Color] Unknown category: {category_name}")
        return
    try:
        current_bgr = self.state.mfd_params["categories"][category_name]["color"]
        # The dialog wants "#rrggbb", so read the BGR triple back to front.
        initial_hex = "#" + "".join(f"{current_bgr[i]:02x}" for i in (2, 1, 0))
        picked = colorchooser.askcolor(
            title=f"Select Color for {category_name}", initialcolor=initial_hex
        )
        # Stop here when no color came back from the dialog.
        if not (picked and picked[0]):
            return
        rgb = picked[0]
        new_bgr = tuple(
            np.clip(int(component), 0, 255)
            for component in (rgb[2], rgb[1], rgb[0])
        )
        self.state.mfd_params["categories"][category_name]["color"] = new_bgr
        self.update_mfd_lut()
        # Refresh the preview swatch via the Tk idle queue when possible.
        panel = getattr(self, "control_panel", None)
        can_update_preview = (
            self.root
            and self.root.winfo_exists()
            and panel
            and hasattr(panel, "update_mfd_color_display")
        )
        if can_update_preview:
            self.root.after_idle(
                panel.update_mfd_color_display, category_name, new_bgr
            )
        self._trigger_mfd_update()
    except Exception as e:
        logging.exception(
            f"[App CB MFD Color] Error choosing color for '{category_name}': {e}"
        )
def update_mfd_raw_map_intensity(self, intensity_value: int):
    """Callback for the MFD Raw Map intensity slider.

    Stores ``intensity_value`` clipped to 0..255 as ``raw_map_intensity``,
    rebuilds the MFD LUT and triggers an MFD redraw.
    """
    if self.state.shutting_down:
        return
    try:
        self.state.mfd_params["raw_map_intensity"] = np.clip(
            intensity_value, 0, 255
        )
        self.update_mfd_lut()
        self._trigger_mfd_update()
    except Exception as e:
        logging.exception(
            f"[App CB MFD RawMap] Error updating raw map intensity: {e}"
        )
def update_map_size(self, event=None):
    """Callback when the Map Display Size combobox selection changes.

    Passes the combobox text to ``state.update_map_scale_factor`` and then
    requests a map redraw.
    """
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    try:
        panel = getattr(self, "control_panel", None)
        size_combo = getattr(panel, "map_size_combo", None) if panel else None
        if not size_combo:
            return
        selection = size_combo.get()
        self.state.update_map_scale_factor(selection)
        # Redraw so the new scale takes effect.
        self.trigger_map_redraw()
    except Exception as e:
        logging.exception(f"[App CB Map Size] Error updating map size: {e}")
def toggle_sar_overlay(self):
    """Callback for the Show SAR Overlay checkbox.

    Copies the checkbox variable's value into the map overlay parameters
    and requests a map redraw with ``full_update=False``.
    """
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    try:
        panel = getattr(self, "control_panel", None)
        overlay_var = getattr(panel, "sar_overlay_var", None) if panel else None
        if not overlay_var or not isinstance(overlay_var, tk.Variable):
            return
        is_enabled = overlay_var.get()
        self.state.update_map_overlay_params(enabled=is_enabled)
        self.trigger_map_redraw(full_update=False)
    except Exception as e:
        logging.exception(f"[App CB Overlay Toggle] Error toggling overlay: {e}")
def on_alpha_slider_release(self, event=None):
    """Callback for the SAR Overlay Alpha slider release event.

    Reads the slider variable, logs the value, stores it as the overlay
    alpha and requests a map redraw with ``full_update=False``.
    """
    log_prefix = "[App CB SAR Alpha Release]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    try:
        panel = getattr(self, "control_panel", None)
        alpha_var = getattr(panel, "sar_overlay_alpha_var", None) if panel else None
        if not alpha_var or not isinstance(alpha_var, tk.Variable):
            return
        final_alpha = alpha_var.get()
        logging.info(f"{log_prefix} Alpha value set to: {final_alpha:.3f}")
        self.state.update_map_overlay_params(alpha=final_alpha)
        self.trigger_map_redraw(full_update=False)
    except Exception as e:
        logging.exception(f"{log_prefix} Error handling alpha slider release: {e}")
def toggle_sar_recording(self):
    """Callback for the Record SAR checkbox.

    Copies the checkbox variable's value into AppState through
    ``update_sar_recording_enabled``.
    """
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    try:
        panel = getattr(self, "control_panel", None)
        record_var = getattr(panel, "record_sar_var", None) if panel else None
        if not record_var or not isinstance(record_var, tk.Variable):
            return
        wants_recording = record_var.get()
        self.state.update_sar_recording_enabled(enabled=wants_recording)
    except Exception as e:
        logging.exception(f"[App CB Record Toggle] Error toggling recording: {e}")
def apply_sar_overlay_shift(self):
    """Callback for the Apply Shift button.

    Reads the latitude/longitude shift entries, requires them to parse as
    floats within [-90, 90] and [-180, 180] respectively, stores them in
    AppState and requests a full map redraw. Problems are reported through
    the status bar.
    """
    log_prefix = "[App CB Apply Shift]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    panel = getattr(self, "control_panel", None)
    if not panel:
        return
    lat_text = panel.sar_lat_shift_var.get()
    lon_text = panel.sar_lon_shift_var.get()
    try:
        lat_shift = float(lat_text)
        lon_shift = float(lon_text)
        # (value, symmetric bound, message) for each axis, latitude first.
        range_checks = (
            (lat_shift, 90.0, "Latitude shift out of range (-90 to 90)"),
            (lon_shift, 180.0, "Longitude shift out of range (-180 to 180)"),
        )
        for value, bound, complaint in range_checks:
            if not (-bound <= value <= bound):
                raise ValueError(complaint)
        self.state.update_sar_overlay_shift(lat_shift, lon_shift)
        # A shift moves the overlay, so request a full update.
        self.trigger_map_redraw(full_update=True)
        self.set_status("Applied SAR overlay shift.")
    except ValueError as ve:
        logging.error(f"{log_prefix} Invalid shift value: {ve}")
        self.set_status(f"Error: Invalid shift - {ve}")
    except Exception as e:
        logging.exception(f"{log_prefix} Error applying shift: {e}")
        self.set_status("Error applying shift.")
def save_current_map_view(self):
    """Callback for the Save Map View button.

    Requires a map integration manager and a composed map image in
    AppState. Asks the user for a destination through a save dialog and
    hands the chosen directory and filename stem to the manager.
    """
    log_prefix = "[App CB Save Map]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    map_manager = getattr(self, "map_integration_manager", None)
    if not map_manager:
        self.set_status("Error: Map components not ready.")
        return
    if self.state.last_composed_map_pil is None:
        self.set_status("Error: No map view to save.")
        return
    try:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        chosen_path = fd.asksaveasfilename(
            title="Save Map View As",
            initialfile=f"map_view_{timestamp}.png",
            defaultextension=".png",
            filetypes=[("PNG files", "*.png"), ("All files", "*.*")],
        )
        if not chosen_path:
            self.set_status("Save map view cancelled.")
            return
        # The manager receives the directory and the extension-less name.
        map_manager.save_map_view_to_file(
            directory=os.path.dirname(chosen_path),
            filename=Path(chosen_path).stem,
        )
    except Exception as e:
        logging.exception(f"{log_prefix} Error during save map dialog/call: {e}")
        self.set_status("Error saving map view.")
def go_to_google_maps(self, coord_source: str):
    """Callback for 'Go' buttons; opens Google Maps at the selected coordinates.

    ``coord_source`` selects which control panel coordinate text to use:
    ``"sar_center"``, ``"sar_mouse"`` or ``"map_mouse"``. The text is
    expected in the form ``"Lat=<dms>, Lon=<dms>"``; both parts are
    converted to decimal degrees and passed to ``open_google_maps``.
    Problems are reported through the status bar.
    """
    log_prefix = "[App CB Go Gmaps]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    panel = getattr(self, "control_panel", None)
    if not panel:
        return
    # coord_source -> (control panel variable name, description for messages)
    known_sources = {
        "sar_center": ("sar_center_coords_var", "SAR Center"),
        "sar_mouse": ("mouse_coords_var", "SAR Mouse"),
        "map_mouse": ("map_mouse_coords_var", "Map Mouse"),
    }
    coords_text: Optional[str] = None
    source_desc: str = "Unknown"
    try:
        selected = known_sources.get(coord_source)
        if selected is None:
            logging.warning(
                f"{log_prefix} Unknown coordinate source: {coord_source}"
            )
            return
        var_name, description = selected
        coords_text = getattr(panel, var_name).get()
        source_desc = description
        # Placeholder or error texts mean there is nothing to open.
        unusable = not coords_text or any(
            marker in coords_text for marker in ("N/A", "Error", "Invalid")
        )
        if unusable:
            self.set_status(f"Error: No valid coordinates for {source_desc}.")
            return
        # Split "Lat=..., Lon=..." at the longitude separator.
        lat_part, separator, lon_part = coords_text.partition(", Lon=")
        if not separator:
            raise ValueError("Separator ', Lon=' not found")
        lat_dms_str = None
        if "Lat=" in lat_part:
            lat_dms_str = lat_part.split("=", 1)[1].strip()
        lon_dms_str = lon_part.strip()
        if not (lat_dms_str and lon_dms_str):
            raise ValueError("Could not extract Lat/Lon parts")
        lat_deg = dms_string_to_decimal(lat_dms_str, is_latitude=True)
        lon_deg = dms_string_to_decimal(lon_dms_str, is_latitude=False)
        if lat_deg is None or lon_deg is None:
            self.set_status(f"Error: Cannot parse coordinates for {source_desc}.")
            return
        open_google_maps(lat_deg, lon_deg)
    except ValueError as ve:
        logging.error(
            f"{log_prefix} Parsing error for {source_desc}: {ve} (Text: '{coords_text}')"
        )
        self.set_status(f"Error parsing coords for {source_desc}.")
    except Exception as e:
        logging.exception(
            f"{log_prefix} Error opening Google Maps for {source_desc}:"
        )
        self.set_status(f"Error opening map for {source_desc}.")
def go_to_google_earth(self, coord_source: str):
    """Callback for 'GE' buttons; opens Google Earth with a LookAt/Point KML.

    ``coord_source`` selects which control panel coordinate text to use:
    ``"sar_center"``, ``"sar_mouse"`` or ``"map_mouse"``. The text is
    expected in the form ``"Lat=<dms>, Lon=<dms>"``. A KML file is generated
    for the parsed position and handed to ``launch_google_earth``. Problems
    are reported through the status bar.
    """
    log_prefix = "[App CB Go GEarth]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    if not _simplekml_available:
        logging.error(f"{log_prefix} Cannot proceed: simplekml missing.")
        self.set_status("Error: KML library missing.")
        return
    control_panel_ref = getattr(self, "control_panel", None)
    if not control_panel_ref:
        logging.error(f"{log_prefix} Control Panel UI reference not found.")
        return
    # coord_source -> (variable name, message description, KML placemark name)
    known_sources = {
        "sar_center": ("sar_center_coords_var", "SAR Center", "SAR Center"),
        "sar_mouse": ("mouse_coords_var", "SAR Mouse", "Mouse on SAR"),
        "map_mouse": ("map_mouse_coords_var", "Map Mouse", "Mouse on Map"),
    }
    coords_text: Optional[str] = None
    source_desc: str = "Unknown"
    try:
        selected = known_sources.get(coord_source)
        if selected is None:
            logging.warning(
                f"{log_prefix} Unknown coordinate source: {coord_source}"
            )
            return
        var_name, description, placemark_name = selected
        coords_text = getattr(control_panel_ref, var_name).get()
        source_desc = description
        # Placeholder or error texts mean there is nothing to show.
        unusable = not coords_text or any(
            marker in coords_text for marker in ("N/A", "Error", "Invalid")
        )
        if unusable:
            self.set_status(f"Error: No valid coordinates for {source_desc}.")
            return
        # Split "Lat=..., Lon=..." at the longitude separator.
        lat_part, separator, lon_part = coords_text.partition(", Lon=")
        if not separator:
            raise ValueError("Separator ', Lon=' not found")
        lat_dms_str = None
        if "Lat=" in lat_part:
            lat_dms_str = lat_part.split("=", 1)[1].strip()
        lon_dms_str = lon_part.strip()
        if not (lat_dms_str and lon_dms_str):
            raise ValueError("Could not split Lat/Lon parts")
        lat_deg = dms_string_to_decimal(lat_dms_str, is_latitude=True)
        lon_deg = dms_string_to_decimal(lon_dms_str, is_latitude=False)
        if lat_deg is None or lon_deg is None:
            self.set_status(f"Error: Cannot parse coordinates for {source_desc}.")
            return
        logging.debug(
            f"{log_prefix} Generating KML for '{placemark_name}' at Lat={lat_deg:.6f}, Lon={lon_deg:.6f}"
        )
        temp_kml_path = generate_lookat_and_point_kml(
            latitude_deg=lat_deg,
            longitude_deg=lon_deg,
            placemark_name=placemark_name,
            placemark_desc=f"Source: {source_desc}\nCoords: {coords_text}",
        )
        # Without a KML file there is nothing to launch.
        if not temp_kml_path:
            logging.error(f"{log_prefix} Failed to generate KML file.")
            self.set_status("Error: Failed to create KML.")
            return
        logging.debug(
            f"{log_prefix} Launching Google Earth with KML: {temp_kml_path}"
        )
        launch_google_earth(temp_kml_path)
    except ValueError as ve:
        logging.error(
            f"{log_prefix} Parsing error for {source_desc}: {ve} (Text: '{coords_text}')"
        )
        self.set_status(f"Error parsing coords for {source_desc}.")
    except Exception as e:
        logging.exception(
            f"{log_prefix} Error preparing/launching GE for {source_desc}:"
        )
        self.set_status(f"Error launching GE for {source_desc}.")
def go_to_all_gearth(self):
    """
    Callback for the 'GE All' button.

    Builds one composite KML and opens it in Google Earth. The KML is fed with:

    * every control-panel coordinate field (SAR center, mouse on SAR, mouse on
      map) whose text can be parsed from the "Lat=..., Lon=..." form;
    * the current normalized SAR image after the brightness/contrast LUT and
      the selected palette have been applied;
    * the current SAR geo-info dictionary.

    Problems are logged and, for the blocking ones, reported on the status
    bar. The method always returns None.
    """
    log_prefix = "[App CB GE All]"

    # --- 0. Preconditions: app alive, KML libraries present, UI available ---
    if not hasattr(self, "state") or self.state.shutting_down:
        return
    if not (_simplekml_available and _pyproj_available):
        logging.error(f"{log_prefix} Cannot proceed: simplekml or pyproj missing.")
        self.set_status("Error: KML library missing.")
        return
    panel = getattr(self, "control_panel", None)
    if not panel:
        logging.error(f"{log_prefix} Control Panel UI reference not found.")
        return

    # --- 1. Collect the points that currently carry usable coordinates ---
    placemarks: List[Tuple[float, float, str, Optional[str]]] = []
    # (internal name, placemark name shown in the KML, Tk variable with the text)
    sources = (
        ("SAR Center", "SAR Center", panel.sar_center_coords_var),
        ("SAR Mouse", "Mouse on SAR", panel.mouse_coords_var),
        ("Map Mouse", "Mouse on Map", panel.map_mouse_coords_var),
    )
    lon_sep = ", Lon="
    unusable_markers = ("N/A", "Error", "Invalid")
    for internal_name, kml_name, tk_var in sources:
        coords_text = tk_var.get()
        # Empty text or a placeholder/error marker means "no point for this source".
        if not coords_text or any(tag in coords_text for tag in unusable_markers):
            continue
        try:
            if lon_sep not in coords_text:
                raise ValueError("Separator ', Lon=' not found")
            lat_part, lon_part = coords_text.split(lon_sep, 1)
            lat_dms_str = (
                lat_part.split("=", 1)[1].strip() if "Lat=" in lat_part else None
            )
            lon_dms_str = lon_part.strip()
            if not (lat_dms_str and lon_dms_str):
                raise ValueError("Could not split Lat/Lon parts")
            lat_deg = dms_string_to_decimal(lat_dms_str, is_latitude=True)
            lon_deg = dms_string_to_decimal(lon_dms_str, is_latitude=False)
            # A None result from the converter silently drops the point.
            if lat_deg is not None and lon_deg is not None:
                placemarks.append(
                    (
                        lat_deg,
                        lon_deg,
                        kml_name,
                        f"Source: {internal_name}\nCoords: {coords_text}",
                    )
                )
        except Exception as parse_err:
            # One bad field must not prevent the other points from being plotted.
            logging.error(
                f"{log_prefix} Error parsing coords for {internal_name}: {parse_err}"
            )

    # --- 2. Fetch the SAR image and its geo-info from the application state ---
    sar_normalized_uint8 = self.state.current_sar_normalized
    geo_info = self.state.current_sar_geo_info
    if sar_normalized_uint8 is None or sar_normalized_uint8.size == 0:
        logging.error(f"{log_prefix} No current SAR image data available.")
        self.set_status("Error: SAR image data missing.")
        return
    if not (geo_info and geo_info.get("valid", False)):
        logging.error(f"{log_prefix} Invalid or missing SAR GeoInfo available.")
        self.set_status("Error: SAR GeoInfo missing.")
        return

    # --- 3. Render the SAR image the way it is displayed (B/C LUT + palette) ---
    bc_lut = self.state.brightness_contrast_lut
    palette = self.state.sar_palette
    if bc_lut is None:
        logging.error(f"{log_prefix} SAR B/C LUT is missing. Cannot process image.")
        self.set_status("Error: SAR LUT missing.")
        return
    logging.debug(f"{log_prefix} Processing current SAR image for KML overlay...")
    try:
        # Work on a copy so the image held in the state is never modified.
        img_for_kml = cv2.LUT(sar_normalized_uint8.copy(), bc_lut)
        if palette != "GRAY":
            img_for_kml = apply_color_palette(img_for_kml, palette)
        elif img_for_kml.ndim == 2:
            # GRAY palette: still hand a 3-channel BGR image to the KML writer.
            img_for_kml = cv2.cvtColor(img_for_kml, cv2.COLOR_GRAY2BGR)
        if img_for_kml is None:
            raise ValueError("Image processing for KML resulted in None.")
        logging.debug(f"{log_prefix} SAR image processed for KML (shape: {img_for_kml.shape}).")
    except Exception as proc_err:
        logging.exception(f"{log_prefix} Error processing SAR image for KML:")
        self.set_status("Error processing SAR image.")
        return

    # --- 4. Write the composite KML and 5. launch Google Earth on it ---
    try:
        logging.debug(f"{log_prefix} Generating composite KML...")
        composite_kml_path = generate_composite_kml(
            placemarks, img_for_kml, geo_info
        )
        if not composite_kml_path:
            logging.error(f"{log_prefix} Failed to generate composite KML file.")
            self.set_status("Error: Failed to create KML.")
        else:
            logging.debug(
                f"{log_prefix} Launching GE with composite KML: {composite_kml_path}"
            )
            launch_google_earth(composite_kml_path)
            self.set_status("Launched Google Earth with composite view.")
            # Keep the KML output directory bounded to the configured file count.
            cleanup_kml_output_directory(config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES)
    except Exception as e:
        logging.exception(
            f"{log_prefix} Error generating/launching composite KML:"
        )
        self.set_status("Error during GE All generation.")
def toggle_sar_metadata_display(self):
    """
    Callback for the 'Show SAR Metadata' checkbox.

    Reads the checkbox variable and, only when it differs from
    ``state.display_sar_metadata``, updates that flag and then either grids
    the metadata frame into column 1 of the container and widens the window
    to ``expanded_window_width``, or removes the frame from the grid and
    shrinks the window back to ``original_window_width``. The window height
    is left as it currently is. Any exception is logged, never propagated.
    """
    log_prefix = "[App CB MetaToggle]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return

    # Everything below needs these four UI objects; bail out if any is missing.
    cp = getattr(self, "control_panel", None)
    var = getattr(cp, "show_meta_var", None) if cp else None
    metadata_frame = getattr(self, "metadata_frame", None)
    container = getattr(self, "container_frame", None)
    if not (var and cp and metadata_frame and container):
        logging.error(
            f"{log_prefix} UI components missing. Cannot toggle metadata. "
            f"(Var: {var is not None}, CP: {cp is not None}, "
            f"MetaFrame: {metadata_frame is not None}, Container: {container is not None})"
        )
        return

    try:
        is_enabled = var.get()
        show_panel = bool(is_enabled)
        # Nothing to do when the checkbox already matches the recorded state.
        if show_panel == self.state.display_sar_metadata:
            return
        self.state.display_sar_metadata = show_panel
        logging.info(f"{log_prefix} SAR Metadata display changing to: {is_enabled}")
        current_height = self.root.winfo_height()

        if not show_panel:
            # --- HIDE: un-grid the frame, release its column, shrink the window ---
            logging.debug(f"{log_prefix} Removing metadata frame from grid...")
            metadata_frame.grid_remove()
            container.columnconfigure(1, weight=0)
            new_width = self.original_window_width
            logging.debug(
                f"{log_prefix} Resizing window to original width: {new_width}"
            )
            self.root.geometry(f"{new_width}x{current_height}")
            self.root.minsize(new_width, config.TKINTER_MIN_HEIGHT)
            self.set_metadata_display(
                "Enable 'Show SAR Metadata' checkbox to view data..."
            )
            return

        # --- SHOW: grid the frame in column 1, let it stretch, widen the window ---
        logging.debug(f"{log_prefix} Gridding metadata frame...")
        metadata_frame.grid(
            row=0, column=1, sticky="nsew", padx=(5, 5), pady=(0, 0)
        )
        container.columnconfigure(1, weight=1)
        new_width = self.expanded_window_width
        logging.debug(
            f"{log_prefix} Resizing window to expanded width: {new_width}"
        )
        self.root.geometry(f"{new_width}x{current_height}")
        self.root.minsize(new_width, config.TKINTER_MIN_HEIGHT)
        # Show the last metadata text if there is one, otherwise a waiting notice.
        self.set_metadata_display(
            self.state.last_sar_metadata_str or "<Waiting for SAR metadata...>"
        )
    except Exception as e:
        logging.exception(f"{log_prefix} Error toggling metadata display: {e}")
# --- Initialization Helper Methods ---
def _get_screen_dimensions(self) -> Tuple[int, int]:
"""Gets primary screen dimensions using screeninfo."""
log_prefix = "[App Init]"
try:
monitors = screeninfo.get_monitors()
if not monitors:
raise screeninfo.ScreenInfoError("No monitors detected.")
screen = monitors[0]
logging.debug(
f"{log_prefix} Detected Screen: {screen.width}x{screen.height}"
)
return screen.width, screen.height
except Exception as e:
logging.warning(
f"{log_prefix} Screen info error: {e}. Using default 1920x1080."
)
return 1920, 1080
def _calculate_initial_sar_size(
    self, desired_factor_if_map: int = 4
) -> Tuple[int, int]:
    """
    Work out the initial SAR display size.

    Without the map overlay (disabled in config or map modules not loaded)
    the size already stored in the application state is returned unchanged.
    With the map active, the configured full SAR size is integer-divided by
    ``desired_factor_if_map`` (clamped to at least 1) and the state is updated
    when that differs from what it currently holds.

    Args:
        desired_factor_if_map: Reduction factor applied when the map is active.

    Returns:
        ``(width, height)`` to use for the SAR display.
    """
    log_prefix = "[App Init]"
    initial_w = self.state.sar_display_width
    initial_h = self.state.sar_display_height

    if not (config.ENABLE_MAP_OVERLAY and MAP_MODULES_LOADED):
        logging.debug(
            f"{log_prefix} Using initial SAR size from state: {initial_w}x{initial_h}."
        )
        return initial_w, initial_h

    forced_factor = max(1, desired_factor_if_map)
    initial_w = config.SAR_WIDTH // forced_factor
    initial_h = config.SAR_HEIGHT // forced_factor
    # Push the forced size into the state right away if it is a change.
    size_in_state = (self.state.sar_display_width, self.state.sar_display_height)
    if (initial_w, initial_h) != size_in_state:
        self.state.update_sar_display_size(initial_w, initial_h)
    logging.info(
        f"{log_prefix} Map active, using SAR size 1:{forced_factor} ({initial_w}x{initial_h})."
    )
    return initial_w, initial_h
def _calculate_tkinter_position(self, screen_h: int) -> Tuple[int, int]:
    """
    Compute the initial top-left corner of the Tkinter control panel window.

    The window is placed 10 px from the left and 40 px below the configured
    initial MFD height. If a window of the minimum Tkinter height would then
    extend past ``screen_h``, it is moved up so that 10 px remain below it,
    but never higher than y = 10.

    Args:
        screen_h: Screen height in pixels.

    Returns:
        ``(x, y)`` position.
    """
    min_height = config.TKINTER_MIN_HEIGHT
    top = config.INITIAL_MFD_HEIGHT + 40
    if top + min_height > screen_h:
        top = max(10, screen_h - min_height - 10)
    return 10, top
def _calculate_mfd_position(self) -> Tuple[int, int]:
"""Calculates the initial X, Y position for the MFD display window."""
x = self.tkinter_x # Align with Tkinter window's left edge
y = 10 # Near top
return x, y
def _calculate_sar_position(
self, screen_w: int, initial_sar_w: int
) -> Tuple[int, int]:
"""Calculates the initial X, Y position for the SAR display window."""
# Position right of ORIGINAL Tkinter width
x = self.tkinter_x + self.original_window_width + 20
y = 10 # Align with top
# Adjust if it goes off screen right
if x + initial_sar_w > screen_w:
x = max(10, screen_w - initial_sar_w - 10)
return x, y
def _calculate_map_position(
self, screen_w: int, current_sar_w: int, max_map_width: int
) -> Tuple[int, int]:
"""Calculates the initial X, Y position for the Map display window."""
# Position right of SAR window
x = self.sar_x + current_sar_w + 20
y = 10 # Align with top
# Adjust if it goes off screen right
if x + max_map_width > screen_w:
x = max(10, screen_w - max_map_width - 10)
return x, y
def _setup_network_receiver(self):
    """
    Open the UDP socket and start the receiver thread.

    On success ``self.udp_socket``, ``self.udp_receiver`` and
    ``self.udp_thread`` are set and the status bar shows the listening
    address. If the socket cannot be created the status bar reports it. If
    building the receiver or starting its thread fails, the error is logged,
    the socket is closed and ``self.udp_socket`` is reset to None.
    """
    log_prefix = "[App Init Network]"
    logging.info(
        f"{log_prefix} Attempting network receiver on {self.local_ip}:{self.local_port}"
    )
    self.udp_socket = create_udp_socket(self.local_ip, self.local_port)
    if not self.udp_socket:
        self.set_status("Error: UDP Socket Failed")
        return

    try:
        # The recorder is optional; the receiver is given None when absent.
        receiver_kwargs = dict(
            app=self,
            udp_socket=self.udp_socket,
            set_new_sar_image_callback=self.handle_new_sar_data,
            set_new_mfd_indices_image_callback=self.handle_new_mfd_data,
            image_recorder=getattr(self, "image_recorder", None),
        )
        self.udp_receiver = UdpReceiver(**receiver_kwargs)
        # Daemon thread so it does not keep the process alive on exit.
        self.udp_thread = threading.Thread(
            target=self.udp_receiver.receive_udp_data,
            name="UDPReceiverThread",
            daemon=True,
        )
        self.udp_thread.start()
        self.set_status(f"Listening UDP {self.local_ip}:{self.local_port}")
    except Exception as receiver_init_e:
        logging.critical(
            f"{log_prefix} Failed to initialize UdpReceiver: {receiver_init_e}",
            exc_info=True,
        )
        self.set_status("Error: Receiver Init Failed")
        # Do not leave a bound socket behind when nothing is reading from it.
        close_udp_socket(self.udp_socket)
        self.udp_socket = None
def _start_initial_image_loader(self):
    """
    Kick off initial image loading, or go straight to the initial display.

    When local images or test mode are enabled in config, ``load_initial_images``
    is started on a daemon thread. Otherwise, if the root window exists, the
    initial display setup is scheduled with ``after_idle``.
    """
    log_prefix = "[App Init]"
    if config.USE_LOCAL_IMAGES or config.ENABLE_TEST_MODE:
        logging.debug(f"{log_prefix} Starting initial image loading thread...")
        loader = threading.Thread(
            target=self.load_initial_images, name="ImageLoaderThread", daemon=True
        )
        loader.start()
        return
    # Nothing to load: let Tk set up the initial display as soon as it is idle.
    if self.root and self.root.winfo_exists():
        self.root.after_idle(self._set_initial_display_from_loaded_data)
def _update_initial_ui_display(self):
"""Sets the initial text for UI info Entry widgets based on default AppState."""
log_prefix = "[App Init]"
logging.debug(f"{log_prefix} Setting initial UI info display...")
control_panel_ref = getattr(self, "control_panel", None)
if not control_panel_ref:
return
try:
control_panel_ref.set_sar_center_coords("N/A", "N/A")
control_panel_ref.set_sar_orientation("N/A")
control_panel_ref.set_sar_size_km("N/A")
control_panel_ref.set_mouse_coordinates("N/A", "N/A")
control_panel_ref.set_map_mouse_coordinates("N/A", "N/A")
# Get initial stats and display them
initial_stats = self.state.get_statistics()
drop_txt = (
f"Drop(Q): S={initial_stats['dropped_sar_q']},"
f"M={initial_stats['dropped_mfd_q']},"
f"Tk={initial_stats['dropped_tk_q']},"
f"Mo={initial_stats['dropped_mouse_q']}"
)
incmpl_txt = (
f"Incmpl(RX): S={initial_stats['incomplete_sar_rx']},"
f"M={initial_stats['incomplete_mfd_rx']}"
)
control_panel_ref.set_statistics_display(drop_txt, incmpl_txt)
logging.debug(f"{log_prefix} Initial UI info display set.")
except tk.TclError as e:
# May happen if widgets are not fully ready during init sequence
logging.warning(
f"{log_prefix} Error setting initial UI display (TclError): {e}"
)
except Exception as e:
logging.exception(
f"{log_prefix} Unexpected error setting initial UI display:"
)
def load_initial_images(self):
    """
    Load the initial local/test images into the application state.

    Started as the target of the "ImageLoaderThread" by
    ``_start_initial_image_loader``. Test images are ensured through the test
    mode manager when test mode is configured or active; local MFD and SAR
    images are loaded when ``config.USE_LOCAL_IMAGES`` is set. Afterwards the
    initial display setup is scheduled on the Tk root with ``after_idle``.
    Errors are logged and shown on the status bar; nothing is raised.
    """
    log_prefix = "[App Image Loader]"
    if self.state.shutting_down:
        return
    # Progress hint for the user while loading runs.
    if self.root and self.root.winfo_exists():
        self.root.after_idle(self.set_status, "Loading initial images...")
    try:
        test_requested = config.ENABLE_TEST_MODE or self.state.test_mode_active
        if (
            test_requested
            and hasattr(self, "test_mode_manager")
            and self.test_mode_manager
        ):
            self.test_mode_manager._ensure_test_images()

        if config.USE_LOCAL_IMAGES:
            self._load_local_mfd_image()
            self._load_local_sar_image()

        # Hand over to Tk for the actual display update.
        if self.root and self.root.winfo_exists():
            self.root.after_idle(self._set_initial_display_from_loaded_data)
    except Exception as e:
        logging.exception(f"{log_prefix} Error during initial image loading:")
        if self.root and self.root.winfo_exists():
            self.root.after_idle(self.set_status, "Error Loading Images")
    finally:
        logging.info(f"{log_prefix} Initial image loading thread finished.")
def _load_local_mfd_image(self):
    """
    Store local MFD index data in ``state.local_mfd_image_data_indices``.

    Actual loading from disk is not implemented yet: a warning naming the
    configured path is logged and a random uint8 index image of the configured
    MFD size is stored instead. The same random image is stored if anything
    in the attempt raises.
    """
    log_prefix = "[App Image Loader]"
    # Random indices double as the only data source and as the error fallback.
    fallback_indices = np.random.randint(
        0, 256, (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
    )
    try:
        mfd_path = getattr(config, "MFD_IMAGE_PATH", "local_mfd_indices.png")
        logging.warning(
            f"{log_prefix} Local MFD image loading NYI ({mfd_path}). Using random."
        )
        # TODO: load the index image from mfd_path and fall back only on failure.
        self.state.local_mfd_image_data_indices = fallback_indices
    except Exception as e:
        logging.exception(f"{log_prefix} Error loading local MFD image:")
        self.state.local_mfd_image_data_indices = fallback_indices
def _load_local_sar_image(self):
    """
    Load the local raw SAR image into ``state.local_sar_image_data_raw``.

    The path is ``config.SAR_IMAGE_PATH`` (default "local_sar_image.tif") and
    the data type is ``config.SAR_DATA_TYPE``. If the loader returns None or
    an empty array, or if it raises, an all-zero array of the configured SAR
    size is stored instead and the problem is logged.
    """
    log_prefix = "[App Image Loader]"
    zeros_fallback = np.zeros(
        (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=config.SAR_DATA_TYPE
    )
    try:
        sar_path = getattr(config, "SAR_IMAGE_PATH", "local_sar_image.tif")
        logging.info(
            f"{log_prefix} Attempting to load local SAR raw data from: {sar_path}"
        )
        raw = load_image(sar_path, config.SAR_DATA_TYPE)
        if raw is None or raw.size <= 0:
            logging.warning(
                f"{log_prefix} Failed to load local SAR raw data or empty image. Using zeros."
            )
            raw = zeros_fallback
        self.state.local_sar_image_data_raw = raw
    except Exception as e:
        logging.exception(f"{log_prefix} Error loading local SAR raw data:")
        self.state.local_sar_image_data_raw = zeros_fallback
def _set_initial_display_from_loaded_data(self):
"""(Runs in main thread) Sets initial display based on loaded data/mode."""
log_prefix = "[App Init Display]"
if self.state.shutting_down:
return
is_test = self.state.test_mode_active
is_local = config.USE_LOCAL_IMAGES
# If in local mode (and not test mode starting up)
if not is_test and is_local:
# Process MFD if data available
if self.state.local_mfd_image_data_indices is not None:
if hasattr(self, "image_pipeline") and self.image_pipeline:
self.state.current_mfd_indices = (
self.state.local_mfd_image_data_indices.copy()
)
self.image_pipeline.process_mfd_for_display()
# Process SAR if data available
if self.state.local_sar_image_data_raw is not None:
self.set_initial_sar_image(self.state.local_sar_image_data_raw)
else:
# Show black SAR image if local data missing
self.set_initial_sar_image(None)
elif is_test:
# Test mode display is handled by TestModeManager activation
pass
else: # Network mode (no local/test images loaded yet)
# Show placeholders while waiting for network data
self._show_network_placeholders()
# Set final status (consider if map is still loading)
map_loading = False
mgr = getattr(self, "map_integration_manager", None)
if mgr:
thread_attr = getattr(mgr, "_map_initial_display_thread", None)
if thread_attr and isinstance(thread_attr, threading.Thread):
map_loading = thread_attr.is_alive()
# Update status only if map is done loading (or no map)
if not map_loading:
status = ""
if is_test:
status = "Ready (Test Mode)"
elif is_local:
status = "Ready (Local Mode)"
else:
# Network mode status
status = (
f"Listening UDP {self.local_ip}:{self.local_port}"
if self.udp_socket
else "Error: No Socket"
)
self.set_status(status)
def set_initial_sar_image(self, raw_image_data: Optional[np.ndarray]):
    """
    Install a SAR image as the current one and refresh everything tied to it.

    The raw data is normalized to uint8. If no data is given, the array is
    empty, or normalization fails, the current image becomes all black: a new
    zero array of the configured SAR size is created when the state holds no
    image, otherwise the existing array is zero-filled in place. Geo-info is
    then reset to the state's initial value, the geo UI fields are reset, and
    the SAR display pipeline is run if one is available.

    Args:
        raw_image_data: Raw SAR image, or None to show a black image.
    """
    log_prefix = "[App Init SAR Image]"
    if self.state.shutting_down:
        return

    normalized = None
    has_data = raw_image_data is not None and raw_image_data.size > 0
    if has_data:
        try:
            normalized = normalize_image(raw_image_data, target_type=np.uint8)
        except Exception as e:
            logging.exception(f"{log_prefix} Error normalizing initial SAR:")

    if normalized is not None:
        self.state.current_sar_normalized = normalized
    else:
        # Black image: allocate one if the state has none, then zero it.
        if self.state.current_sar_normalized is None:
            self.state.current_sar_normalized = np.zeros(
                (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
            )
        self.state.current_sar_normalized.fill(0)

    # The new image has no geo reference yet.
    self.state.current_sar_geo_info = self.state._initialize_geo_info()
    self._reset_ui_geo_info()

    if hasattr(self, "image_pipeline") and self.image_pipeline:
        self.image_pipeline.process_sar_for_display()
# --- Mode Switching UI Actions ---
def activate_test_mode_ui_actions(self):
    """
    Prepare the UI when Test Mode is switched on.

    Resets the geo-related fields, empties the MFD and SAR display queues and
    leaves the status bar at "Ready (Test Mode)".
    """
    self.set_status("Activating Test Mode...")
    self._reset_ui_geo_info()
    # Drop frames queued before the switch so they are not shown afterwards.
    for display_queue in (self.mfd_queue, self.sar_queue):
        clear_queue(display_queue)
    # TestModeManager may overwrite this status later.
    self.set_status("Ready (Test Mode)")
def deactivate_test_mode_ui_actions(self):
    """
    Restore the normal display when Test Mode is switched off.

    Empties the MFD and SAR display queues and resets the geo-related fields,
    then restores the display for the configured mode:

    * Local mode (``config.USE_LOCAL_IMAGES``): the local MFD indices, if any,
      become the current MFD image and are processed for display; the local
      raw SAR image (or a black image when there is none) is installed via
      ``set_initial_sar_image``. Status becomes "Ready (Local Mode)".
    * Network mode: placeholders are queued and the status shows either the
      listening address or that no UDP socket is available.
    """
    self.set_status("Activating Normal Mode...")
    # Drop frames produced by test mode before restoring real content.
    clear_queue(self.mfd_queue)
    clear_queue(self.sar_queue)
    self._reset_ui_geo_info()

    if config.USE_LOCAL_IMAGES:  # Local Mode Restore
        if self.state.local_mfd_image_data_indices is not None:
            self.state.current_mfd_indices = (
                self.state.local_mfd_image_data_indices.copy()
            )
            # The attribute may exist but still be None, so test the value,
            # not just its presence (same check the other callers use).
            pipeline = getattr(self, "image_pipeline", None)
            if pipeline:
                pipeline.process_mfd_for_display()
        if self.state.local_sar_image_data_raw is not None:
            self.set_initial_sar_image(self.state.local_sar_image_data_raw)
        else:
            self.set_initial_sar_image(None)  # Show black if missing
        self.set_status("Ready (Local Mode)")
    else:  # Network Mode Restore
        # Show placeholders until network data arrives
        self._show_network_placeholders()
        status = (
            f"Listening UDP {self.local_ip}:{self.local_port}"
            if self.udp_socket
            else "Error: No UDP Socket"
        )
        self.set_status(status)
def _reset_ui_geo_info(self):
"""Schedules UI reset for geo-related Entry widgets on the main thread."""
log_prefix = "[App UI Reset]"
cp = getattr(self, "control_panel", None)
if self.root and self.root.winfo_exists() and cp:
# Use lambda with after_idle to ensure calls run in main thread
self.root.after_idle(lambda: cp.set_sar_orientation("N/A"))
self.root.after_idle(lambda: cp.set_mouse_coordinates("N/A", "N/A"))
self.root.after_idle(lambda: cp.set_map_mouse_coordinates("N/A", "N/A"))
self.root.after_idle(lambda: cp.set_sar_center_coords("N/A", "N/A"))
self.root.after_idle(lambda: cp.set_sar_size_km("N/A"))
def _revert_test_mode_ui(self):
"""Tries to uncheck the test mode checkbox in the UI and resets the state flag."""
log_prefix = "[App Mode Switch]"
logging.warning(f"{log_prefix} Reverting Test Mode UI/state...")
cp = getattr(self, "control_panel", None)
var = getattr(cp, "test_image_var", None) if cp else None
if self.root and self.root.winfo_exists() and var:
try:
# Schedule the UI update
self.root.after_idle(var.set, 0)
except Exception as e:
logging.warning(f"{log_prefix} Failed to schedule uncheck: {e}")
# Ensure state flag is also reset
if hasattr(self, "state"):
self.state.test_mode_active = False
def _show_network_placeholders(self):
    """
    Queue flat gray placeholder images on the MFD and SAR display queues.

    The MFD placeholder uses the configured initial MFD size and value 30;
    the SAR placeholder uses the current SAR display size from the state and
    value 60. Both are 3-channel uint8 images. Any error is logged only.
    """
    log_prefix = "[App Placeholders]"
    try:
        mfd_shape = (config.INITIAL_MFD_HEIGHT, config.INITIAL_MFD_WIDTH, 3)
        ph_mfd = np.full(mfd_shape, 30, dtype=np.uint8)

        sar_shape = (
            self.state.sar_display_height,
            self.state.sar_display_width,
            3,
        )
        ph_sar = np.full(sar_shape, 60, dtype=np.uint8)

        put_queue(self.mfd_queue, ph_mfd, "mfd", self)
        put_queue(self.sar_queue, ph_sar, "sar", self)
    except Exception as e:
        logging.exception(f"{log_prefix} Error creating/queueing placeholders:")
# --- Trigger map redraw ---
def trigger_map_redraw(self, full_update: bool = False):
    """
    Request a redraw of the map window.

    Does nothing while shutting down, when the map overlay is disabled in
    config, or when no map integration manager exists.

    Args:
        full_update: When True, always run a full map update from the current
            SAR data. When False, a ("REDRAW_MAP", None) command is queued on
            the Tkinter queue if the cached overlay image, warp matrix and map
            image are all present; otherwise a full update is run instead.
    """
    log_prefix = "[App Trigger Map Redraw]"
    if self.state.shutting_down:
        return
    mgr = getattr(self, "map_integration_manager", None)
    if not config.ENABLE_MAP_OVERLAY or not mgr:
        return

    if full_update:
        logging.debug(f"{log_prefix} Triggering full map update (explicit)...")
        self._trigger_map_update_from_sar()
        return

    # A fast recomposition needs all three cached artefacts.
    cache_incomplete = (
        self.state.last_processed_sar_for_overlay is None
        or self.state.last_sar_warp_matrix is None
        or self.state.last_map_image_pil is None
    )
    if cache_incomplete:
        logging.warning(
            f"{log_prefix} Recomposition cache invalid. Triggering full update..."
        )
        self._trigger_map_update_from_sar()
    else:
        logging.debug(f"{log_prefix} Queueing simple REDRAW_MAP command...")
        put_queue(self.tkinter_queue, ("REDRAW_MAP", None), "tkinter", self)
# --- Network Data Handlers ---
def handle_new_sar_data(
    self, normalized_image_uint8: np.ndarray, geo_info_radians: Dict[str, Any]
):
    """
    Receiver callback: accept a newly received SAR image with its geo-info.

    The data is stored in the application state and, if the root window
    exists, ``_process_sar_update_on_main_thread`` is scheduled with
    ``after_idle``. Ignored while shutting down.

    Args:
        normalized_image_uint8: The SAR image handed over by the receiver.
        geo_info_radians: Geo-info dictionary belonging to that image.
    """
    if self.state.shutting_down:
        return
    self.state.set_sar_data(normalized_image_uint8, geo_info_radians)
    root = self.root
    if root and root.winfo_exists():
        root.after_idle(self._process_sar_update_on_main_thread)
def handle_new_mfd_data(self, image_indices: np.ndarray):
    """
    Receiver callback: accept a newly received MFD index image.

    The indices are stored in the application state and, if the root window
    exists, ``_process_mfd_update_on_main_thread`` is scheduled with
    ``after_idle``. Ignored while shutting down.

    Args:
        image_indices: The MFD index image handed over by the receiver.
    """
    if self.state.shutting_down:
        return
    self.state.set_mfd_indices(image_indices)
    root = self.root
    if root and root.winfo_exists():
        root.after_idle(self._process_mfd_update_on_main_thread)
# --- Main Thread Processing ---
def _process_sar_update_on_main_thread(self):
"""Processes SAR updates in the main thread: UI, pipeline, map, KML, FPS."""
if self.state.shutting_down:
return
# Update UI labels (SAR Center, Orient, Size)
self._update_sar_ui_labels()
# Process image for display (LUT, palette, resize, marker)
if hasattr(self, "image_pipeline") and self.image_pipeline:
self.image_pipeline.process_sar_for_display()
# Trigger map update if map is enabled
self._trigger_map_update_from_sar()
# Handle KML generation if enabled and data valid
geo_info = self.state.current_sar_geo_info
if geo_info and geo_info.get("valid") and config.ENABLE_KML_GENERATION:
self._handle_kml_generation(geo_info)
# Update FPS counter
self._update_fps_stats("sar")
def _handle_kml_generation(self, geo_info):
    """
    Write a SAR footprint KML file for the given geo-info.

    Nothing is generated unless both ``config.ENABLE_AUTO_SAR_KML_GENERATION``
    and ``config.ENABLE_KML_GENERATION`` are set and the simplekml/pyproj
    libraries are available. The file is written to the configured KML output
    directory under a millisecond-timestamped name; afterwards old files are
    cleaned up and, if configured, Google Earth is launched on the new file.
    Errors are logged and never propagated.

    Args:
        geo_info: Geo-info dictionary passed to the KML generator.
    """
    log_prefix = "[App KML]"

    # Automatic per-image generation has its own switch.
    if not config.ENABLE_AUTO_SAR_KML_GENERATION:
        # Only worth a log line when KML is otherwise enabled.
        if config.ENABLE_KML_GENERATION:
            logging.debug(
                f"{log_prefix} Automatic KML generation for SAR footprint is disabled via config flag. Skipping."
            )
        return
    # Global switch; callers may already check it, kept here for safety.
    if not config.ENABLE_KML_GENERATION:
        return
    if not (_simplekml_available and _pyproj_available):
        logging.warning(
            f"{log_prefix} Skipping KML generation: simplekml or pyproj missing."
        )
        return

    try:
        kml_dir = config.KML_OUTPUT_DIRECTORY
        os.makedirs(kml_dir, exist_ok=True)
        # %f yields microseconds; dropping the last three digits leaves milliseconds.
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
        fp = os.path.join(kml_dir, f"sar_footprint_{ts}.kml")
        logging.debug(f"{log_prefix} Generating KML file: {fp}")
        if not generate_sar_kml(geo_info, fp):
            logging.error(f"{log_prefix} KML generation failed (utility returned False).")
        else:
            logging.debug(f"{log_prefix} KML generation successful.")
            cleanup_kml_output_directory(
                config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES
            )
            if config.AUTO_LAUNCH_GOOGLE_EARTH:
                launch_google_earth(fp)
    except Exception as e:
        logging.exception(f"{log_prefix} Error during KML handling: {e}")
def _process_mfd_update_on_main_thread(self):
"""Processes MFD updates in the main thread: pipeline, FPS."""
if self.state.shutting_down:
return
# Process image for display (LUT application)
if hasattr(self, "image_pipeline") and self.image_pipeline:
self.image_pipeline.process_mfd_for_display()
# Update FPS counter
self._update_fps_stats("mfd")
# --- UI Display Update Helpers ---
def _update_sar_ui_labels(self):
"""Updates SAR related UI Entry widgets from AppState."""
cp = getattr(self, "control_panel", None)
# Check if control panel exists and its window is valid
if not cp or not cp.winfo_exists():
return
geo = self.state.current_sar_geo_info
lat_s, lon_s, orient_s, size_s = "N/A", "N/A", "N/A", "N/A"
is_valid = geo and geo.get("valid")
if is_valid:
try:
# Convert radians to degrees for display/formatting
lat_d = math.degrees(geo["lat"])
lon_d = math.degrees(geo["lon"])
orient_d = math.degrees(geo.get("orientation", 0.0))
# Format using DMS utility
lat_s = decimal_to_dms(lat_d, True)
lon_s = decimal_to_dms(lon_d, False)
orient_s = f"{orient_d:.2f}°" # Format orientation
# Calculate size in km if possible
scale_x = geo.get("scale_x", 0.0)
width_px = geo.get("width_px", 0)
scale_y = geo.get("scale_y", 0.0)
height_px = geo.get("height_px", 0)
if scale_x > 0 and width_px > 0 and scale_y > 0 and height_px > 0:
size_w_km = (scale_x * width_px) / 1000.0
size_h_km = (scale_y * height_px) / 1000.0
size_s = f"W: {size_w_km:.1f} km, H: {size_h_km:.1f} km"
except Exception as e:
logging.warning(f"[App UI Update] Error formatting SAR geo labels: {e}")
lat_s, lon_s, orient_s, size_s = "Error", "Error", "Error", "Error"
is_valid = False # Mark as invalid if formatting fails
# Update UI widgets safely using control panel methods
try:
cp.set_sar_center_coords(lat_s, lon_s)
cp.set_sar_orientation(orient_s)
cp.set_sar_size_km(size_s)
except Exception as e:
# Catch errors if UI elements don't exist or Tcl errors
logging.exception(f"[App UI Update] Error setting SAR labels: {e}")
# Clear mouse coords if GeoInfo becomes invalid
if not is_valid:
try:
cp.set_mouse_coordinates("N/A", "N/A")
cp.set_map_mouse_coordinates("N/A", "N/A")
except Exception:
pass # Ignore errors if UI closed
def _update_fps_stats(self, img_type: str):
"""Updates FPS counters in AppState based on LOG_UPDATE_INTERVAL."""
now = time.time()
try:
if img_type == "sar":
self.state.sar_frame_count += 1
elapsed = now - self.state.sar_update_time
# Update FPS if interval passed
if elapsed >= config.LOG_UPDATE_INTERVAL:
self.state.sar_fps = self.state.sar_frame_count / elapsed
# Reset timer and counter
self.state.sar_update_time = now
self.state.sar_frame_count = 0
elif img_type == "mfd":
self.state.mfd_frame_count += 1
elapsed = now - self.state.mfd_start_time
# Update FPS if interval passed
if elapsed >= config.LOG_UPDATE_INTERVAL:
self.state.mfd_fps = self.state.mfd_frame_count / elapsed
# Reset timer and counter
self.state.mfd_start_time = now
self.state.mfd_frame_count = 0
except ZeroDivisionError:
pass # Ignore if elapsed time is zero
except Exception:
pass # Ignore other potential calculation errors
# --- Trigger Methods ---
def _trigger_sar_update(self):
"""Triggers SAR reprocessing if not in test mode."""
if self.state.shutting_down or self.state.test_mode_active:
return
if hasattr(self, "image_pipeline") and self.image_pipeline:
# Ask the pipeline to process the current SAR data in state
self.image_pipeline.process_sar_for_display()
def _trigger_mfd_update(self):
"""Triggers MFD reprocessing if not in test mode."""
if self.state.shutting_down or self.state.test_mode_active:
return
if hasattr(self, "image_pipeline") and self.image_pipeline:
# Ask the pipeline to process the current MFD data in state
self.image_pipeline.process_mfd_for_display()
def _trigger_map_update_from_sar(self):
"""Triggers a full map update based on current SAR data."""
mgr = getattr(self, "map_integration_manager", None)
# Check if map is enabled and manager exists
if self.state.shutting_down or not config.ENABLE_MAP_OVERLAY or not mgr:
return
geo = self.state.current_sar_geo_info
sar = self.state.current_sar_normalized
# Check if data needed for update is valid
if geo and geo.get("valid") and sar is not None and sar.size > 0:
try:
# Call manager method to update map and overlay
mgr.update_map_overlay(sar, geo)
except Exception as e:
logging.exception(
f"[App Trigger Map Update] Error calling manager: {e}"
)
# --- Periodic Update Scheduling ---
def schedule_periodic_updates(self):
    """Refresh the status display now and re-arm this method via root.after.

    The repeat interval is config.LOG_UPDATE_INTERVAL converted to
    milliseconds, with a lower bound of 100 ms. Nothing is scheduled once
    shutdown has started or the root window no longer exists.
    """
    if self.state.shutting_down:
        return

    # Update status text now
    self.update_status()

    delay_ms = int(config.LOG_UPDATE_INTERVAL * 1000)
    if delay_ms < 100:
        delay_ms = 100

    window = self.root
    if not window or not window.winfo_exists():
        return
    try:
        window.after(delay_ms, self.schedule_periodic_updates)
    except Exception:
        # Ignore errors during shutdown
        pass
# --- Queue Processors ---
def process_sar_queue(self):
    """Show at most one pending SAR image, then re-arm this processor.

    One item is taken from the SAR display queue without blocking and, if
    present, handed to the display manager. Failures are logged rather than
    raised. The processor is rescheduled unless shutdown has started.
    """
    tag = "[App QProc SAR]"
    if self.state.shutting_down:
        return

    frame = None
    try:
        # Non-blocking fetch; an empty queue is the normal idle case
        frame = self.sar_queue.get(block=False)
        self.sar_queue.task_done()
    except queue.Empty:
        pass
    except Exception:
        logging.exception(f"{tag} Error getting from SAR display queue:")

    if frame is not None:
        viewer = getattr(self, "display_manager", None)
        if not viewer:
            # This should not happen if init sequence is correct
            logging.error(f"{tag} DisplayManager not available.")
        else:
            try:
                viewer.show_sar_image(frame)
            except Exception:
                logging.exception(f"{tag} Error calling show_sar_image:")

    # Shutdown may have begun while the image was being displayed
    if self.state.shutting_down:
        return
    self._reschedule_queue_processor(self.process_sar_queue)
def process_mfd_queue(self):
    """Show at most one pending MFD image, then re-arm this processor.

    One item is taken from the MFD display queue without blocking and, if
    present, handed to the display manager. Failures are logged rather than
    raised. The processor is rescheduled unless shutdown has started.
    """
    tag = "[App QProc MFD]"
    if self.state.shutting_down:
        return

    frame = None
    try:
        # Non-blocking fetch; an empty queue is the normal idle case
        frame = self.mfd_queue.get(block=False)
        self.mfd_queue.task_done()
    except queue.Empty:
        pass
    except Exception:
        logging.exception(f"{tag} Error getting from MFD display queue:")

    if frame is not None:
        viewer = getattr(self, "display_manager", None)
        if not viewer:
            logging.error(f"{tag} DisplayManager not available.")
        else:
            try:
                viewer.show_mfd_image(frame)
            except Exception:
                logging.exception(f"{tag} Error calling show_mfd_image:")

    # Shutdown may have begun while the image was being displayed
    if self.state.shutting_down:
        return
    self._reschedule_queue_processor(self.process_mfd_queue)
# --- Tkinter Queue Processor ---
def process_tkinter_queue(self):
    """Dispatch at most one (command, payload) item from the Tkinter queue.

    The item is fetched without blocking and routed to the handler matching
    its command string. Unknown commands and malformed items are logged as
    warnings; handler exceptions are logged and not propagated. The
    processor is rescheduled with a 50 ms delay unless shutdown has started.
    """
    tag = "[App QProc Tkinter]"
    if self.state.shutting_down:
        return

    # (command string, handler method name, handler takes the payload)
    routes = (
        ("MOUSE_COORDS", "_handle_sar_mouse_coords_update", True),
        ("MAP_MOUSE_COORDS", "_handle_map_mouse_coords_update", True),
        ("SHOW_MAP", "_handle_show_map_update", True),
        ("REDRAW_MAP", "_handle_redraw_map_command", False),
        ("SAR_CLICK_UPDATE", "_handle_sar_click_update", True),
        ("MAP_CLICK_UPDATE", "_handle_map_click_update", True),
        ("SAR_METADATA_UPDATE", "_handle_sar_metadata_update", True),
    )

    entry: Optional[Tuple[str, Any]] = None
    try:
        # Non-blocking fetch; an empty queue means nothing to do this cycle
        entry = self.tkinter_queue.get(block=False)
        self.tkinter_queue.task_done()
    except queue.Empty:
        pass
    except Exception:
        logging.exception(f"{tag} Error getting from queue:")
        entry = None  # Never process a partially retrieved item

    if entry is not None:
        try:
            if not (isinstance(entry, tuple) and len(entry) == 2):
                logging.warning(f"{tag} Invalid item type: {type(entry)}")
            else:
                command, payload = entry
                for name, handler_attr, wants_payload in routes:
                    if command == name:
                        handler = getattr(self, handler_attr)
                        if wants_payload:
                            handler(payload)
                        else:
                            handler()
                        break
                else:
                    # No route matched the command string
                    logging.warning(f"{tag} Unknown command: {command}")
        except Exception:
            logging.exception(f"{tag} Error processing item:")

    if self.state.shutting_down:
        return
    # Short delay keeps UI commands responsive
    self._reschedule_queue_processor(self.process_tkinter_queue, delay=50)
# --- Tkinter Queue Command Handlers ---
def _handle_sar_mouse_coords_update(self, payload: Optional[Tuple[str, str]]):
    """Push SAR mouse coordinates to the control panel.

    Args:
        payload: A (latitude, longitude) pair of display strings. Anything
            that is not a non-empty 2-tuple is shown as "N/A" / "N/A".
    """
    panel = getattr(self, "control_panel", None)
    if not panel:
        return

    lat_text = lon_text = "N/A"
    if payload and isinstance(payload, tuple) and len(payload) == 2:
        lat_text, lon_text = payload

    try:
        panel.set_mouse_coordinates(lat_text, lon_text)
    except Exception as e:
        logging.warning(f"[App UI Update] Error updating SAR mouse coords: {e}")
def _handle_map_mouse_coords_update(self, payload: Optional[Tuple[int, int]]):
    """Handle MAP_MOUSE_COORDS: convert a map pixel to DMS text and show it.

    Args:
        payload: (x, y) map pixel position. Anything that is not a non-empty
            2-tuple clears the map coordinate display to "N/A".

    The pixel is converted through the map integration manager. When no
    manager exists or the conversion yields nothing the display shows
    "N/A"; when DMS formatting reports a problem it shows "Error DMS".
    """
    panel = getattr(self, "control_panel", None)
    if not panel:
        return

    if not (payload and isinstance(payload, tuple) and len(payload) == 2):
        # Clear UI if payload was invalid
        try:
            panel.set_map_mouse_coordinates("N/A", "N/A")
        except Exception:
            pass  # Ignore errors if UI closed
        return

    lat_text, lon_text = "N/A", "N/A"
    manager = getattr(self, "map_integration_manager", None)
    if manager:
        position = manager.get_geo_coords_from_map_pixel(payload[0], payload[1])
        if position:
            lat_dms = decimal_to_dms(position[0], True)
            lon_dms = decimal_to_dms(position[1], False)
            # The formatter signals failure through marker words in its output
            dms_failed = any(
                marker in text
                for text in (lat_dms, lon_dms)
                for marker in ("Error", "Invalid")
            )
            if dms_failed:
                lat_text, lon_text = "Error DMS", "Error DMS"
            else:
                lat_text, lon_text = lat_dms, lon_dms

    try:
        panel.set_map_mouse_coordinates(lat_text, lon_text)
    except Exception as e:
        logging.warning(f"[App UI Update] Error updating map mouse coords: {e}")
def _handle_show_map_update(self, payload: Optional[ImageType]):
    """Handle SHOW_MAP by passing the payload to the map manager's display_map.

    A warning is logged when no map integration manager is active; errors
    raised by the manager are logged and not propagated.
    """
    manager = getattr(self, "map_integration_manager", None)
    if not manager:
        logging.warning(
            "[App QProc Tkinter] Received map display command but MapIntegrationManager not active."
        )
        return
    try:
        manager.display_map(payload)
    except Exception:
        logging.exception(
            "[App QProc Tkinter] Error calling map_integration_manager.display_map:"
        )
def _handle_redraw_map_command(self):
    """Handle REDRAW_MAP by asking the map manager to recompose its overlay.

    A warning is logged when no map manager is available; errors raised by
    the manager are logged and not propagated.
    """
    manager = getattr(self, "map_integration_manager", None)
    if not manager:
        logging.warning(
            "[App QProc Tkinter] REDRAW_MAP ignored: Map manager unavailable."
        )
        return
    try:
        manager._recompose_map_overlay()
    except Exception:
        logging.exception(
            "[App QProc Tkinter] Error calling map_integration_manager._recompose_map_overlay:"
        )
def _handle_sar_click_update(self, payload: Optional[Tuple[int, int]]):
    """Record the latest SAR click position and request a SAR redraw.

    Args:
        payload: (x, y) pixel coordinates of the click. Anything that is not
            a non-empty 2-tuple is rejected with a warning.
    """
    tag = "[App SAR Click State]"
    if self.state.shutting_down:
        return

    if not (payload and isinstance(payload, tuple) and len(payload) == 2):
        logging.warning(f"{tag} Received invalid payload: {payload}")
        return

    self.state.last_sar_click_coords = payload
    logging.debug(f"{tag} Updated state.last_sar_click_coords to {payload}")
    # Re-run the SAR display path so the click marker is included
    self._trigger_sar_update()
def _handle_map_click_update(self, payload: Optional[Tuple[int, int]]):
    """Record the latest map click position and request a map recomposition.

    Args:
        payload: (x, y) pixel coordinates of the click. Anything that is not
            a non-empty 2-tuple is rejected with a warning.
    """
    tag = "[App Map Click State]"
    if self.state.shutting_down:
        return

    if not (payload and isinstance(payload, tuple) and len(payload) == 2):
        logging.warning(f"{tag} Received invalid payload: {payload}")
        return

    self.state.last_map_click_coords = payload
    logging.debug(f"{tag} Updated state.last_map_click_coords to {payload}")
    # A recomposition (not a full update) is requested to show the new marker
    self.trigger_map_redraw(full_update=False)
def _handle_sar_metadata_update(self, metadata_string: Optional[str]):
    """Cache the latest SAR metadata text and show it when display is enabled.

    Args:
        metadata_string: Formatted metadata text, or None to signal that no
            metadata could be produced. Values of any other type are ignored.

    The text is always cached in state; the widget is only refreshed while
    state.display_sar_metadata is set. A warning is logged when the metadata
    text widget does not exist.
    """
    tag = "[App Meta Display]"
    if self.state.shutting_down:
        return

    widget = getattr(self, "metadata_display_text", None)
    if not widget:
        # This shouldn't happen if init is correct
        logging.warning(
            f"{tag} Metadata text widget not available to display metadata."
        )
        return

    if isinstance(metadata_string, str):
        self.state.last_sar_metadata_str = metadata_string
        if self.state.display_sar_metadata:
            logging.debug(f"{tag} Updating metadata display widget.")
            self.set_metadata_display(metadata_string)
        else:
            # Keep the cached value only; the display checkbox is off
            logging.debug(
                f"{tag} Metadata received but display is disabled. Cached."
            )
    elif metadata_string is None:
        # None may be sent, e.g. after an error during formatting
        self.state.last_sar_metadata_str = None
        if self.state.display_sar_metadata:
            self.set_metadata_display("<Error retrieving metadata>")
def _reschedule_queue_processor(self, processor_func, delay: Optional[int] = None):
    """Re-arm a queue processor through root.after.

    Args:
        processor_func: The callable to run again.
        delay: Delay in milliseconds. When omitted, the SAR and MFD image
            processors get a delay derived from config.MFD_FPS (1.5x the
            frame rate, at least 10 ms, 20 ms if the rate is not positive)
            and every other processor gets 50 ms.

    Nothing is scheduled during shutdown or when the root window is gone.
    Scheduling errors are logged as warnings unless shutdown has started.
    """
    if self.state.shutting_down:
        return

    if delay is None:
        image_processors = (self.process_sar_queue, self.process_mfd_queue)
        if processor_func in image_processors:
            # Poll somewhat faster than frames are expected to arrive
            target_fps = config.MFD_FPS
            if target_fps > 0:
                calculated_delay = 1000 / (target_fps * 1.5)
            else:
                calculated_delay = 20
            delay = max(10, int(calculated_delay))
        else:
            # Other queues (tkinter, mouse) use a fixed UI-friendly delay
            delay = 50

    try:
        window = self.root
        if window and window.winfo_exists():
            window.after(delay, processor_func)
    except Exception as e:
        if self.state.shutting_down:
            return  # Errors are expected while the UI is being torn down
        logging.warning(
            f"[App Rescheduler] Error rescheduling {processor_func.__name__}: {e}"
        )
# --- Mouse Coordinate Handling (SAR) ---
def process_mouse_queue(self):
    """Turn one raw SAR mouse position into DMS text and queue it for the UI.

    One (x, y) display-pixel pair is taken from the mouse queue without
    blocking. If the stored SAR geo info is usable, an approximate
    latitude/longitude is computed and formatted with decimal_to_dms.
    Otherwise "N/A" or a short error tag is used. The result is queued as a
    "MOUSE_COORDS" command, and the processor re-arms itself with a 100 ms
    delay unless shutdown has started.
    """
    tag = "[App GeoCalc]"
    if self.state.shutting_down:
        return

    pointer = None
    try:
        # Non-blocking fetch from the dedicated mouse queue
        pointer = self.mouse_queue.get(block=False)
        self.mouse_queue.task_done()
    except queue.Empty:
        pass
    except Exception:
        logging.exception(f"{tag} Error getting from mouse queue:")

    if isinstance(pointer, tuple) and len(pointer) == 2:
        x_disp, y_disp = pointer
        geo = self.state.current_sar_geo_info
        view_w = self.state.sar_display_width
        view_h = self.state.sar_display_height
        lat_text, lon_text = "N/A", "N/A"

        # Every input of the calculation must be present and positive
        geo_usable = (
            geo
            and geo.get("valid")
            and view_w > 0
            and view_h > 0
            and geo.get("width_px", 0) > 0
            and geo.get("height_px", 0) > 0
            and geo.get("scale_x", 0.0) > 0
            and geo.get("scale_y", 0.0) > 0
            and all(
                k in geo for k in ["lat", "lon", "ref_x", "ref_y", "orientation"]
            )
        )

        if geo_usable:
            try:
                src_w: int = geo["width_px"]
                src_h: int = geo["height_px"]
                scale_x: float = geo["scale_x"]
                scale_y: float = geo["scale_y"]
                ref_x: int = geo["ref_x"]
                ref_y: int = geo["ref_y"]
                ref_lat_rad: float = geo["lat"]
                ref_lon_rad: float = geo["lon"]

                # --- Simplified Geo Calculation ---
                # TODO: Implement more accurate calculation accounting for
                # rotation; geo["orientation"] is required above but unused.
                # Map the display pixel back onto the original image grid
                src_x: float = (x_disp / view_w) * src_w
                src_y: float = (y_disp / view_h) * src_h

                # Pixel offset from the reference pixel (image Y grows downwards)
                dx_px: float = src_x - ref_x
                dy_px: float = ref_y - src_y

                # Pixel offset -> metres
                dx_m: float = dx_px * scale_x
                dy_m: float = dy_px * scale_y

                # Metres -> degrees (approximate); the floor avoids a
                # division by ~0 at the poles
                M_PER_DLAT: float = 111132.954
                M_PER_DLON_EQ: float = 111319.488
                m_per_dlon: float = max(
                    abs(M_PER_DLON_EQ * math.cos(ref_lat_rad)), 1e-3
                )
                lat_offset_deg: float = dy_m / M_PER_DLAT
                lon_offset_deg: float = dx_m / m_per_dlon

                lat_deg: float = math.degrees(ref_lat_rad) + lat_offset_deg
                lon_deg: float = math.degrees(ref_lon_rad) + lon_offset_deg

                lat_ok = math.isfinite(lat_deg) and abs(lat_deg) <= 90.0
                lon_ok = math.isfinite(lon_deg) and abs(lon_deg) <= 180.0

                if not (lat_ok and lon_ok):
                    lat_text, lon_text = "Invalid Calc", "Invalid Calc"
                else:
                    lat_dms = decimal_to_dms(lat_deg, True)
                    lon_dms = decimal_to_dms(lon_deg, False)
                    # The formatter signals failure through marker words
                    dms_failed = any(
                        marker in text
                        for text in (lat_dms, lon_dms)
                        for marker in ("Error", "Invalid")
                    )
                    if dms_failed:
                        lat_text, lon_text = "Error DMS", "Error DMS"
                    else:
                        lat_text, lon_text = lat_dms, lon_dms
            except KeyError as ke:
                logging.error(f"{tag} Missing key in geo_info: {ke}")
                lat_text, lon_text = "Error Key", "Error Key"
            except Exception:
                logging.exception(f"{tag} Geo calculation error:")
                lat_text, lon_text = "Calc Error", "Calc Error"

        # Queue the result (DMS strings or an error tag) for the UI update
        self.put_mouse_coordinates_queue(("MOUSE_COORDS", (lat_text, lon_text)))

    if self.state.shutting_down:
        return
    self._reschedule_queue_processor(self.process_mouse_queue, delay=100)
def put_mouse_coordinates_queue(
    self, command_payload_tuple: Tuple[str, Tuple[str, str]]
):
    """Forward a (command, payload) pair of mouse coordinates to the Tkinter queue.

    Args:
        command_payload_tuple: Two-element tuple of the command string and
            its (latitude, longitude) text payload.

    Nothing is queued once shutdown has started.
    """
    if self.state.shutting_down:
        return
    # Unpacking enforces the two-element shape before anything is queued
    command, payload = command_payload_tuple
    queued_item = (command, payload)
    put_queue(
        queue_obj=self.tkinter_queue,
        item=queued_item,
        queue_name="tkinter",
        app_instance=self,
    )
# --- Metadata Display Update Method ---
def set_metadata_display(self, text: str):
    """Replace the content of the read-only metadata text widget.

    Args:
        text: The text to show.

    A warning is logged when the app has no metadata widget. A widget that
    no longer exists is left alone, and widget errors are logged as
    warnings rather than raised.
    """
    widget = getattr(self, "metadata_display_text", None)
    if not widget:
        logging.warning(
            "[App UI Update] set_metadata_display called but text widget not found on app."
        )
        return

    try:
        if not widget.winfo_exists():
            return
        # The widget is kept disabled; unlock it only for the rewrite
        widget.config(state="normal")
        widget.delete("1.0", tk.END)
        widget.insert("1.0", text)
        widget.config(state="disabled")
    except Exception as e:
        logging.warning(f"[App UI Update] Error setting metadata display text: {e}")
# --- Status Update ---
def update_status(self):
    """Refresh the status bar text and the statistics display.

    The status line shows the operating mode (Test / Local / Network),
    whether the map is active, and the MFD and SAR frame rates. The
    control panel receives the dropped-queue and incomplete-reception
    counters. Nothing happens during shutdown or while the status bar text
    contains "Loading".

    Every failure, including one raised while collecting the statistics,
    is logged as a warning and never propagated, so a periodic caller is
    not interrupted by a transient error here.
    """
    if not hasattr(self, "state") or self.state.shutting_down:
        return

    sb = getattr(self, "statusbar", None)
    # Avoid updating if map is still loading initially
    try:
        if sb and sb.winfo_exists() and "Loading" in sb.cget("text"):
            return
    except Exception:
        pass  # An unreadable status bar must not block the refresh

    try:
        # Collected inside the guard: an error here must not escape
        stats = self.state.get_statistics()

        if self.state.test_mode_active:
            mode = "Test"
        else:
            mode = "Local" if config.USE_LOCAL_IMAGES else "Network"

        map_active = (
            config.ENABLE_MAP_OVERLAY
            and hasattr(self, "map_integration_manager")
            and self.map_integration_manager
        )
        map_on = " MapOn" if map_active else ""

        mfd_fps = f"MFD:{self.state.mfd_fps:.1f}fps"
        sar_fps = (
            f"SAR:{self.state.sar_fps:.1f}fps"
            if self.state.sar_fps > 0
            else "SAR:N/A"
        )
        status_text = f"Status: {mode}{map_on} | {mfd_fps} | {sar_fps}"

        drop = (
            f"Drop(Q): S={stats['dropped_sar_q']},"
            f"M={stats['dropped_mfd_q']},"
            f"Tk={stats['dropped_tk_q']},"
            f"Mo={stats['dropped_mouse_q']}"
        )
        incmpl = (
            f"Incmpl(RX): S={stats['incomplete_sar_rx']},"
            f"M={stats['incomplete_mfd_rx']}"
        )

        # Widget updates are deferred until the Tk event loop is idle
        if self.root and self.root.winfo_exists():
            if sb and sb.winfo_exists():
                # Counters go to the control panel to keep the status bar short
                self.root.after_idle(sb.set_status_text, status_text)
            cp = getattr(self, "control_panel", None)
            if cp and cp.winfo_exists():
                self.root.after_idle(cp.set_statistics_display, drop, incmpl)
    except Exception as e:
        # Log warning but don't crash the app for status update errors
        logging.warning(f"[App Status Update] Error during status update: {e}")
# --- Cleanup ---
def close_app(self):
    """Shut down all components and terminate the process.

    The sequence is: status message, test mode timers, map integration,
    image recorder, UDP socket, UDP receiver thread, receiver worker pool,
    display windows, Tk root window, then sys.exit(0).

    state.shutting_down is set before anything is stopped and makes every
    later call return immediately. Each stage therefore runs in isolation:
    a failing component is logged and the remaining stages still execute,
    so the window is always destroyed and the process always exits.

    Raises:
        SystemExit: With code 0 after the sequence, or code 1 when the app
            has no state object at all.
    """
    if not hasattr(self, "state"):
        # Should not happen, but safety check
        sys.exit(1)
    # Prevent multiple shutdown attempts
    if self.state.shutting_down:
        return

    logging.info("[App Shutdown] Starting shutdown sequence...")
    self.state.shutting_down = True

    def run_step(description, action):
        """Run one shutdown stage; log a failure instead of aborting."""
        try:
            action()
        except Exception:
            logging.exception(
                f"[App Shutdown] Error while {description}; continuing shutdown."
            )

    # Attempt to update status bar (purely cosmetic, so failures stay silent)
    try:
        self.set_status("Closing...")
    except Exception:
        pass

    # Shutdown components in reverse order of dependency
    test_mgr = getattr(self, "test_mode_manager", None)
    if test_mgr:
        run_step("stopping test mode timers", lambda: test_mgr.stop_timers())

    map_mgr = getattr(self, "map_integration_manager", None)
    if map_mgr:
        run_step("shutting down map integration", lambda: map_mgr.shutdown())

    recorder = getattr(self, "image_recorder", None)
    if recorder:
        run_step("shutting down image recorder", lambda: recorder.shutdown())

    # Network cleanup
    sock = getattr(self, "udp_socket", None)
    if sock:
        run_step("closing UDP socket", lambda: close_udp_socket(sock))
        self.udp_socket = None

    def join_receiver():
        thread = getattr(self, "udp_thread", None)
        if thread and thread.is_alive():
            logging.debug("[App Shutdown] Joining UDP receiver thread...")
            thread.join(timeout=0.5)  # Short timeout keeps the UI from hanging
            if thread.is_alive():
                logging.warning("[App Shutdown] UDP thread did not join cleanly.")

    run_step("joining UDP receiver thread", join_receiver)

    # Shutdown worker pool
    pool = getattr(getattr(self, "udp_receiver", None), "executor", None)
    if pool:
        logging.debug("[App Shutdown] Shutting down ThreadPoolExecutor...")
        run_step(
            "shutting down worker pool",
            lambda: pool.shutdown(wait=False, cancel_futures=True),
        )

    # Display cleanup
    display_mgr = getattr(self, "display_manager", None)
    if display_mgr:
        run_step("destroying display windows", lambda: display_mgr.destroy_windows())
    # Short wait for OpenCV windows to close
    try:
        cv2.waitKey(5)
    except Exception:
        pass

    # Tkinter cleanup
    try:
        root = getattr(self, "root", None)
        if root and root.winfo_exists():
            logging.debug("[App Shutdown] Destroying Tkinter root window...")
            root.destroy()
    except Exception as e:
        logging.exception(f"[App Shutdown] Error destroying Tkinter window: {e}")

    logging.info("[App Shutdown] Application close sequence finished.")
    # Use sys.exit(0) for a clean exit code
    sys.exit(0)
# --- Main Execution Block ---
if __name__ == "__main__":
    # Script entry point: build the Tk window, run the app, and report how
    # the process ended. Non-zero exits requested through sys.exit() are
    # logged and then propagated so the process exit status reflects them.
    root = None
    app_instance = None
    try:
        # Check for critical map library dependencies if map is enabled
        if config.ENABLE_MAP_OVERLAY and not MAP_MODULES_LOADED:
            msg = "Map enabled but required modules failed. Cannot start."
            logging.critical(f"[App Main] {msg}")
            print(f"ERROR: {msg} Check logs for missing libraries.")
            sys.exit(1)

        # Create main Tkinter window
        root = create_main_window(
            "Control Panel",
            config.TKINTER_MIN_WIDTH,
            config.TKINTER_MIN_HEIGHT,
            10,
            10,  # Initial position, App constructor calculates final
        )
        # Instantiate the main application class
        app_instance = ControlPanelApp(root)  # Pass root window
        # Set the window close protocol handler
        root.protocol("WM_DELETE_WINDOW", app_instance.close_app)
        # Start the Tkinter main event loop
        root.mainloop()

    except SystemExit as exit_e:
        # Exits initiated by sys.exit(): a missing code means success, an
        # integer is used as-is, anything else counts as failure.
        raw_code = exit_e.code
        if raw_code is None:
            exit_code = 0
        elif isinstance(raw_code, int):
            exit_code = raw_code
        else:
            exit_code = 1
        log_level = logging.INFO if exit_code == 0 else logging.WARNING
        logging.log(
            log_level, f"[App Main] Application exited via sys.exit({exit_code})."
        )
        if exit_code != 0:
            # Keep the failure status visible to whoever launched the process
            raise
    except ImportError as imp_err:
        # Handle critical import errors during startup
        logging.critical(
            f"[App Main] CRITICAL IMPORT ERROR: {imp_err}. Application cannot start.",
            exc_info=True,
        )
        print(
            f"\nCRITICAL ERROR: Missing required library - {imp_err}\n"
            "Please install the necessary libraries (check logs/readme) and try again.\n"
        )
        sys.exit(1)
    except Exception as e:
        # Catch any other unhandled exceptions during startup or main loop
        logging.critical(
            "[App Main] UNHANDLED EXCEPTION during startup or main loop:", exc_info=True
        )
        print(
            "\nFATAL ERROR: An unhandled exception occurred. Check logs for details.\n"
        )
        sys.exit(1)
    finally:
        # Ensure logging is shut down properly on any exit path
        logging.info("=== App End ===")
        logging.shutdown()
# --- END OF FILE ControlPanel.py ---