SXXXXXXX_ControlPanel/ControlPanel.py

2782 lines
119 KiB
Python

# --- START OF FILE ControlPanel.py - PART 1 ---
# ControlPanel.py (Previously app.py)
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
Main application module for the Control Panel application.
Orchestrates UI, display, network reception, image processing pipeline,
test mode management, map integration, and state management.
Initializes all sub-modules and manages the main application lifecycle.
"""
# --- Standard library imports ---
import threading
import time
import queue
import os
import logging
import math
import sys
import socket # Required for network setup
from typing import Optional, Tuple, Any, Dict, TYPE_CHECKING
import datetime
# --- Third-party imports ---
import tkinter as tk
from tkinter import ttk
from tkinter import colorchooser
import numpy as np
import cv2 # Keep for potential utility/fallback use
import screeninfo
try:
from PIL import Image, ImageTk # ImageTk might be needed for icon
except ImportError:
Image = None # Define as None if Pillow not installed
ImageTk = None
logging.critical(
"[App Init] Pillow library not found. Map/Image functionality will fail."
)
# --- Configuration Import ---
import config
# --- Logging Setup ---
try:
from logging_config import setup_logging
setup_logging()
except ImportError:
# Fallback basic configuration if logging_config fails
print("ERROR: logging_config.py not found. Using basic logging.")
logging.basicConfig(
level=logging.WARNING,
format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
)
# --- Application Modules Import ---
# Use UIPanel alias to avoid name clash with the main class if named ControlPanel
from ui import ControlPanel as UIPanel, StatusBar, create_main_window
from display import DisplayManager
from utils import (
put_queue,
clear_queue,
decimal_to_dms,
generate_sar_kml,
launch_google_earth,
cleanup_old_kml_files,
)
from network import create_udp_socket, close_udp_socket
from receiver import UdpReceiver
from app_state import AppState
from test_mode_manager import TestModeManager
from image_pipeline import ImagePipeline
from image_processing import (
load_image,
normalize_image,
apply_color_palette, # Needed by MapIntegrationManager
)
# --- Map related imports (Conditional) ---
map_libs_found = True
try:
# Core map libs needed by manager/utils
import mercantile
import pyproj
# Pillow already checked above
if Image is None:
raise ImportError("Pillow library failed to import earlier.")
except ImportError as map_lib_err:
logging.warning(
f"[App Init] Failed to import core map library ({map_lib_err}). "
"Map functionality disabled."
)
map_libs_found = False
# Define placeholders for type hinting if core libs failed
BaseMapService = None
MapTileManager = None
MapDisplayWindow = None
MapIntegrationManager = None
MapCalculationError = Exception
if map_libs_found:
try:
from map_services import get_map_service, BaseMapService
from map_manager import MapTileManager
from map_utils import (
get_bounding_box_from_center_size,
get_tile_ranges_for_bbox,
MapCalculationError,
)
from map_display import MapDisplayWindow
# Import the integration manager
from map_integration import MapIntegrationManager
MAP_MODULES_LOADED = True
except ImportError as map_import_err:
logging.warning(
f"[App Init] Failed to import specific map modules ({map_import_err}). "
"Map functionality disabled."
)
MAP_MODULES_LOADED = False
# Define placeholders if specific modules failed
BaseMapService = None
MapTileManager = None
MapDisplayWindow = None
MapIntegrationManager = None
MapCalculationError = Exception
else:
MAP_MODULES_LOADED = False
# Type checking block for App class reference in managers
if TYPE_CHECKING:
# This avoids circular imports at runtime but helps type checkers
pass
# --- Main Application Class ---
# Renamed from App to ControlPanelApp to avoid clash with ui.ControlPanel
class ControlPanelApp:
"""
Main application class. Manages UI, display, processing, network, state,
and orchestrates various managers (Test Mode, Image Pipeline, Map Integration).
"""
# --- Methods DEFINED BEFORE __init__ to be available for bindings ---
# --- Status Update Method ---
def set_status(self, message: str):
"""
Safely updates the main status message prefix in the status bar.
Uses after_idle for thread safety.
"""
log_prefix = "[App Set Status]"
# Check state exists and flag before proceeding
if not hasattr(self, "state") or self.state.shutting_down:
return
new_status_prefix = f"Status: {message}"
# Use INFO level for user-visible status changes
logging.info(f"{log_prefix} Request to set status message prefix: '{message}'")
def _update_status_text_on_main_thread():
"""Internal function to update status text, runs in main GUI thread."""
# Check state again inside the scheduled function
if not hasattr(self, "state") or self.state.shutting_down:
return
try:
# Check if statusbar exists and is valid Tkinter widget
statusbar_valid = (
hasattr(self, "statusbar")
and isinstance(self.statusbar, tk.Widget)
and self.statusbar.winfo_exists()
)
if not statusbar_valid:
logging.warning(
f"{log_prefix} Statusbar widget not available, "
"cannot update status."
)
return
# Get current text and parse existing parts (keep info after '|')
current_text: str = self.statusbar.cget("text")
parts = current_text.split("|")
suffix = ""
# Rebuild suffix from parts after the first one
if len(parts) > 1:
suffix_parts = [p.strip() for p in parts[1:] if p.strip()]
if suffix_parts:
suffix = " | " + " | ".join(suffix_parts)
# Combine new prefix and existing suffix
final_text = f"{new_status_prefix}{suffix}"
logging.debug(
f"{log_prefix} Updating status bar text to: '{final_text}'"
)
# Call StatusBar's method to update
self.statusbar.set_status_text(final_text)
except tk.TclError as e:
# Log TclError (widget destroyed) if not shutting down
if not self.state.shutting_down:
logging.warning(f"{log_prefix} TclError setting status text: {e}")
except Exception as e:
# Log other unexpected errors
logging.exception(f"{log_prefix} Error updating status bar text:")
# Schedule the update on the main GUI thread
try:
root_valid = (
hasattr(self, "root") and self.root and self.root.winfo_exists()
)
if root_valid:
# Use after_idle to ensure it runs when Tkinter is idle
self.root.after_idle(_update_status_text_on_main_thread)
# else: Don't log if root is None or destroyed, expected during shutdown
except Exception as e:
# Log error only if not shutting down
if not hasattr(self, "state") or not self.state.shutting_down:
logging.warning(
f"{log_prefix} Error scheduling status update via after_idle: {e}"
)
# --- LUT Generation Methods ---
def update_brightness_contrast_lut(self):
"""Recalculates the SAR B/C LUT based on AppState and stores it back in AppState."""
log_prefix = "[App Update SAR LUT]"
logging.debug(f"{log_prefix} Updating SAR Brightness/Contrast LUT...")
# Check if state is initialized
if not hasattr(self, "state"):
logging.error(f"{log_prefix} AppState not ready for LUT update.")
return
try:
# Ensure contrast has a minimum positive value
contrast_val = max(0.01, self.state.sar_contrast)
brightness_val = self.state.sar_brightness
except AttributeError:
logging.error(
f"{log_prefix} Error accessing state for SAR LUT parameters "
f"(AttributeError)."
)
return
except Exception as e:
logging.error(
f"{log_prefix} Unexpected error accessing state for SAR LUT params: {e}"
)
return
# Calculate the LUT using numpy vectorized operations
try:
lut_values = np.arange(256, dtype=np.float32)
adjusted_values = (lut_values * contrast_val) + brightness_val
# Clip and round values to uint8 range
lut = np.clip(np.round(adjusted_values), 0, 255).astype(np.uint8)
except Exception as e:
logging.exception(f"{log_prefix} Error calculating SAR B/C LUT:")
# Create identity LUT as fallback
identity_lut = np.arange(256, dtype=np.uint8)
self.state.brightness_contrast_lut = identity_lut
logging.error(
f"{log_prefix} Using identity SAR LUT as fallback "
"due to calculation error."
)
return
# Store the calculated LUT back into AppState
self.state.brightness_contrast_lut = lut
logging.debug(f"{log_prefix} SAR B/C LUT updated successfully in AppState.")
def update_mfd_lut(self):
"""Recalculates the MFD LUT based on AppState parameters and stores it back in AppState."""
log_prefix = "[MFD LUT Update]"
logging.debug(f"{log_prefix} Recalculating MFD Color LUT...")
if not hasattr(self, "state"):
logging.error(f"{log_prefix} AppState not ready for MFD LUT update.")
return
try:
# Read parameters from AppState safely
mfd_params = self.state.mfd_params
raw_map_intensity_factor = mfd_params["raw_map_intensity"] / 255.0
pixel_to_category = mfd_params["pixel_to_category"]
categories = mfd_params["categories"]
except AttributeError:
logging.error(
f"{log_prefix} Error accessing mfd_params state (AttributeError)."
)
return
except KeyError as ke:
logging.error(f"{log_prefix} Missing key in AppState mfd_params: {ke}")
return
except Exception as e:
logging.error(
f"{log_prefix} Unexpected error accessing state for MFD LUT params: {e}"
)
return
# Initialize a new LUT array (256 entries, 3 channels BGR) with zeros
new_lut = np.zeros((256, 3), dtype=np.uint8)
try:
# Iterate through all possible pixel index values (0-255)
for index_value in range(256):
category_name = pixel_to_category.get(index_value)
if category_name: # Categorized Pixels (0-31 typically)
cat_data = categories[category_name]
base_bgr = cat_data["color"]
intensity_factor = cat_data["intensity"] / 255.0
# Calculate final color components
final_b = float(base_bgr[0]) * intensity_factor
final_g = float(base_bgr[1]) * intensity_factor
final_r = float(base_bgr[2]) * intensity_factor
# Assign clamped values to LUT
new_lut[index_value, 0] = np.clip(int(round(final_b)), 0, 255)
new_lut[index_value, 1] = np.clip(int(round(final_g)), 0, 255)
new_lut[index_value, 2] = np.clip(int(round(final_r)), 0, 255)
elif 32 <= index_value <= 255: # Raw Map Pixels (32-255 typically)
# Remap index range [32, 255] to intensity [0, 255]
raw_intensity = (float(index_value) - 32.0) * (255.0 / 223.0)
# Apply raw map intensity factor
final_gray_float = raw_intensity * raw_map_intensity_factor
# Clip and convert to integer
final_gray_int = int(round(np.clip(final_gray_float, 0, 255)))
# Assign gray value to all BGR channels
new_lut[index_value, :] = final_gray_int
else: # Handle cases where index is < 32 but not found
if category_name is None:
logging.warning(
f"{log_prefix} Index {index_value} has no assigned category. "
"Defaulting to black."
)
# new_lut[index_value, :] is already [0, 0, 0] by initialization
# Store the completed LUT back into AppState
self.state.mfd_lut = new_lut
logging.debug(
f"{log_prefix} MFD LUT update complete and stored in AppState."
)
except KeyError as ke:
logging.error(
f"{log_prefix} Missing category key '{ke}' during MFD LUT generation."
)
self._apply_fallback_mfd_lut() # Apply fallback
except Exception as e:
logging.critical(
f"{log_prefix} CRITICAL error during MFD LUT generation:", exc_info=True
)
self._apply_fallback_mfd_lut() # Apply fallback
def _apply_fallback_mfd_lut(self):
"""Applies a simple grayscale ramp as a fallback MFD LUT in case of errors."""
log_prefix = "[MFD LUT Update]"
logging.error(
f"{log_prefix} Applying fallback grayscale MFD LUT due to previous errors."
)
if hasattr(self, "state"):
try:
gray_ramp = np.arange(256, dtype=np.uint8)
# Use OpenCV to efficiently convert grayscale ramp to BGR LUT format
fallback_lut = cv2.cvtColor(
gray_ramp[:, np.newaxis], cv2.COLOR_GRAY2BGR
)[
:, 0, :
] # Reshape result correctly
self.state.mfd_lut = fallback_lut
except Exception as fallback_e:
logging.critical(
f"{log_prefix} Failed even to create fallback MFD LUT: {fallback_e}"
)
# Ensure state LUT is a valid array even if fallback fails
self.state.mfd_lut = np.zeros((256, 3), dtype=np.uint8)
# --- UI Callback Methods ---
def update_image_mode(self): # UI Callback
"""Handles switching between Test and Normal Mode based on UI checkbox."""
log_prefix = "[App Mode Switch]"
if not hasattr(self, "state") or not hasattr(self, "test_mode_manager"):
logging.error(
f"{log_prefix} State or TestModeManager not initialized. "
"Cannot switch mode."
)
return
if self.state.shutting_down:
return # Ignore if shutting down
try:
is_test_req = False
# Safely access control panel and its variable
control_panel_ref = getattr(self, "control_panel", None)
test_var_ref = (
getattr(control_panel_ref, "test_image_var", None)
if control_panel_ref
else None
)
if test_var_ref and isinstance(test_var_ref, tk.Variable):
is_test_req = test_var_ref.get() == 1
else:
logging.warning(
f"{log_prefix} test_image_var not found or invalid in control_panel."
)
# Fallback to current state if UI var is missing
is_test_req = self.state.test_mode_active
# Perform switch only if requested mode differs from current state
if is_test_req != self.state.test_mode_active:
logging.info(
f"{log_prefix} Request to change Test Mode state to: {is_test_req}"
)
self.state.test_mode_active = is_test_req # Update state flag first
if self.state.test_mode_active:
if self.test_mode_manager.activate(): # Attempt activation
self.activate_test_mode_ui_actions()
else: # Manager activation failed
logging.error(
f"{log_prefix} TestModeManager activation failed. "
"Reverting UI and state."
)
self._revert_test_mode_ui() # Revert checkbox and state flag
else: # Deactivating test mode
self.test_mode_manager.deactivate() # Stop timers
self.deactivate_test_mode_ui_actions()
# Reset statistics whenever the mode successfully changes
self.state.reset_statistics()
self.update_status() # Update the status bar display
else:
logging.debug(
f"{log_prefix} Requested mode ({is_test_req}) is the same as current mode. "
"No change."
)
except tk.TclError as e:
logging.warning(
f"{log_prefix} UI error accessing checkbox state (TclError): {e}"
)
except AttributeError as ae:
logging.error(
f"{log_prefix} Missing attribute during mode update "
f"(manager init issue?): {ae}"
)
except Exception as e:
logging.exception(f"{log_prefix} Unexpected error during mode update:")
def update_sar_size(self, event=None): # UI Callback
"""
Callback for SAR size combobox change. Updates state and triggers processing.
Allows changing size even when map overlay is active.
"""
log_prefix = "[App CB SAR Size]"
if not hasattr(self, "state") or self.state.shutting_down:
logging.debug(f"{log_prefix} Shutdown or state not ready. Ignoring.")
return
try:
# Safely access control panel and combobox
control_panel_ref = getattr(self, "control_panel", None)
combo_ref = (
getattr(control_panel_ref, "sar_size_combo", None)
if control_panel_ref
else None
)
if not combo_ref:
logging.warning(f"{log_prefix} SAR size combobox not found.")
return
selected_size_str = combo_ref.get()
logging.debug(
f"{log_prefix} SAR display size selected: '{selected_size_str}'"
)
factor = 1 # Default factor
if selected_size_str and ":" in selected_size_str:
try:
factor_str = selected_size_str.split(":")[1]
factor = int(factor_str)
if factor <= 0:
raise ValueError("Factor must be positive")
except (IndexError, ValueError) as parse_err:
logging.warning(
f"{log_prefix} Invalid SAR size factor: '{selected_size_str}'. "
f"Error: {parse_err}. Using factor 1."
)
factor = 1
# Attempt to reset UI combobox to a valid value
try:
current_factor = 1
if self.state.sar_display_width > 0:
current_factor = (
config.SAR_WIDTH // self.state.sar_display_width
)
current_size_str = f"1:{current_factor}"
# Ensure the reset value is actually in the list
if current_size_str not in config.SAR_SIZE_FACTORS:
current_size_str = config.DEFAULT_SAR_SIZE
combo_ref.set(current_size_str)
except Exception as reset_e:
logging.warning(
f"{log_prefix} Failed to reset SAR size combobox UI: {reset_e}"
)
# Calculate new dimensions, ensuring minimum 1x1
new_width = max(1, config.SAR_WIDTH // factor)
new_height = max(1, config.SAR_HEIGHT // factor)
# Update state and trigger processing only if size actually changed
if (
new_width != self.state.sar_display_width
or new_height != self.state.sar_display_height
):
logging.info(
f"{log_prefix} Requesting SAR display size update to "
f"{new_width}x{new_height} (Factor 1:{factor})"
)
self.state.update_sar_display_size(new_width, new_height)
self._trigger_sar_update() # Trigger SAR reprocessing
else:
logging.debug(
f"{log_prefix} Selected size {new_width}x{new_height} is the same as current."
)
except tk.TclError as e:
if not self.state.shutting_down:
logging.warning(
f"{log_prefix} TclError accessing SAR size combobox: {e}"
)
except Exception as e:
if not self.state.shutting_down:
logging.exception(f"{log_prefix} Error processing SAR size update:")
def update_contrast(self, value_str: str): # UI Callback
"""Callback for SAR contrast slider. Updates state, LUT, and triggers display update."""
log_prefix = "[App CB SAR Contrast]"
if self.state.shutting_down:
return
try:
contrast = float(value_str)
self.state.update_sar_parameters(contrast=contrast)
self.update_brightness_contrast_lut()
self._trigger_sar_update()
except ValueError:
logging.warning(
f"{log_prefix} Invalid contrast value received: {value_str}"
)
except Exception as e:
logging.exception(f"{log_prefix} Error updating contrast: {e}")
def update_brightness(self, value_str: str): # UI Callback
"""Callback for SAR brightness slider. Updates state, LUT, and triggers display update."""
log_prefix = "[App CB SAR Brightness]"
if self.state.shutting_down:
return
try:
# Slider value might be float, convert carefully
brightness = int(float(value_str))
self.state.update_sar_parameters(brightness=brightness)
self.update_brightness_contrast_lut()
self._trigger_sar_update()
except ValueError:
logging.warning(
f"{log_prefix} Invalid brightness value received: {value_str}"
)
except Exception as e:
logging.exception(f"{log_prefix} Error updating brightness: {e}")
def update_sar_palette(self, event=None): # UI Callback
"""Callback for SAR palette combobox. Updates state and triggers display update."""
log_prefix = "[App CB SAR Palette]"
if self.state.shutting_down:
return
try:
# Safely access control panel and combobox
control_panel_ref = getattr(self, "control_panel", None)
combo_ref = (
getattr(control_panel_ref, "palette_combo", None)
if control_panel_ref
else None
)
if not combo_ref:
logging.warning(f"{log_prefix} Palette combobox not found.")
return
palette = combo_ref.get()
logging.debug(f"{log_prefix} Palette changed to '{palette}'")
if palette in config.COLOR_PALETTES:
self.state.update_sar_parameters(palette=palette)
self._trigger_sar_update()
else:
logging.warning(
f"{log_prefix} Unknown palette selected: '{palette}'. Ignoring."
)
# Reset UI combobox to the actual current state value
combo_ref.set(self.state.sar_palette)
except Exception as e:
logging.exception(f"{log_prefix} Error updating SAR palette: {e}")
def update_mfd_category_intensity(
self, category_name: str, intensity_value: int
): # UI Callback
"""Callback for MFD category intensity slider. Updates state, LUT, and triggers display."""
log_prefix = "[App CB MFD Param Intensity]"
if self.state.shutting_down:
return
logging.debug(
f"{log_prefix} Category='{category_name}', Intensity={intensity_value}"
)
try:
# Value is already int from IntVar, clip just in case
intensity = np.clip(intensity_value, 0, 255)
if category_name in self.state.mfd_params["categories"]:
self.state.mfd_params["categories"][category_name][
"intensity"
] = intensity
logging.debug(
f"{log_prefix} Intensity for '{category_name}' set to {intensity} in AppState."
)
self.update_mfd_lut()
self._trigger_mfd_update()
else:
logging.warning(
f"{log_prefix} Unknown MFD category received: '{category_name}'"
)
except KeyError as ke:
logging.error(
f"{log_prefix} KeyError accessing MFD params for '{category_name}': {ke}"
)
except Exception as e:
logging.exception(
f"{log_prefix} Error updating MFD intensity for '{category_name}': {e}"
)
def choose_mfd_category_color(self, category_name: str): # UI Callback
"""Callback for MFD category color button. Opens chooser, updates state, LUT, UI, and triggers display."""
log_prefix = "[App CB MFD Param Color]"
if self.state.shutting_down:
return
logging.debug(
f"{log_prefix} Color chooser requested for Category='{category_name}'"
)
if category_name not in self.state.mfd_params["categories"]:
logging.warning(
f"{log_prefix} Cannot choose color for unknown MFD category: '{category_name}'"
)
return
try:
initial_bgr = self.state.mfd_params["categories"][category_name]["color"]
# Format BGR to HEX for color chooser initial color
initial_hex = (
f"#{initial_bgr[2]:02x}{initial_bgr[1]:02x}{initial_bgr[0]:02x}"
)
logging.debug(
f"{log_prefix} Opening color chooser (initial BGR: {initial_bgr}, initial HEX: {initial_hex})"
)
# Open the Tkinter color chooser dialog
color_code = colorchooser.askcolor(
title=f"Select Color for {category_name}", initialcolor=initial_hex
)
# Check if a color was selected (color_code is None if cancelled)
if color_code and color_code[0]:
rgb = color_code[0] # Returns tuple (R, G, B) as floats or ints
# Convert selected RGB to BGR tuple, ensuring ints and clamping
new_bgr = tuple(
np.clip(int(c), 0, 255) for c in (rgb[2], rgb[1], rgb[0])
)
logging.info(
f"{log_prefix} New color chosen for '{category_name}': RGB={rgb}, BGR={new_bgr}"
)
# Update state
self.state.mfd_params["categories"][category_name]["color"] = new_bgr
# Recalculate LUT
self.update_mfd_lut()
# Schedule UI update for color preview label
control_panel_ref = getattr(self, "control_panel", None)
if (
self.root
and self.root.winfo_exists()
and control_panel_ref
and hasattr(control_panel_ref, "update_mfd_color_display")
):
self.root.after_idle(
control_panel_ref.update_mfd_color_display,
category_name,
new_bgr,
)
# Trigger MFD display update
self._trigger_mfd_update()
else:
logging.debug(f"{log_prefix} Color selection cancelled.")
except KeyError as ke:
logging.error(
f"{log_prefix} KeyError accessing MFD params for '{category_name}': {ke}"
)
except Exception as e:
logging.exception(
f"{log_prefix} Error during color selection for '{category_name}': {e}"
)
def update_mfd_raw_map_intensity(self, intensity_value: int): # UI Callback
"""Callback for Raw Map intensity slider. Updates state, LUT, and triggers display."""
log_prefix = "[App CB MFD Param RawMap]"
if self.state.shutting_down:
return
logging.debug(
f"{log_prefix} Raw Map intensity slider changed -> {intensity_value}"
)
try:
# Value is already int from IntVar, clip just in case
intensity = np.clip(intensity_value, 0, 255)
self.state.mfd_params["raw_map_intensity"] = intensity
logging.info(
f"{log_prefix} Raw Map intensity set to {intensity} in AppState."
)
self.update_mfd_lut()
self._trigger_mfd_update()
except KeyError as ke:
logging.error(
f"{log_prefix} KeyError accessing MFD params for raw_map_intensity: {ke}"
)
except Exception as e:
logging.exception(f"{log_prefix} Error updating raw map intensity: {e}")
def update_map_size(self, event=None): # UI Callback for Map Size
"""Callback for Map size combobox change. Updates state and triggers map redraw."""
log_prefix = "[App CB Map Size]"
if not hasattr(self, "state") or self.state.shutting_down:
logging.debug(f"{log_prefix} Shutdown or state not ready. Ignoring.")
return
try:
# Safely access control panel and combobox
control_panel_ref = getattr(self, "control_panel", None)
combo_ref = (
getattr(control_panel_ref, "map_size_combo", None)
if control_panel_ref
else None
)
if not combo_ref:
logging.warning(
f"{log_prefix} Map size combobox not found in control panel."
)
return
selected_size_str = combo_ref.get()
logging.debug(
f"{log_prefix} Map display size selected: '{selected_size_str}'"
)
# Update the scale factor in AppState using its dedicated method
self.state.update_map_scale_factor(selected_size_str)
# Trigger a map redraw using the last known map image
# The map display logic will use the newly set scale factor
self.trigger_map_redraw() # Use helper function
except tk.TclError as e:
if not self.state.shutting_down:
logging.warning(
f"{log_prefix} TclError accessing Map size combobox: {e}"
)
except Exception as e:
if not self.state.shutting_down:
logging.exception(f"{log_prefix} Error processing Map size update: {e}")
# --- START OF FILE ControlPanel.py - PART 2 ---
# --- >>> START OF NEW CODE (Callbacks and Helper) <<< ---
# --- NEW UI Callback Methods ---
def toggle_sar_overlay(self): # UI Callback for SAR Overlay Checkbox
"""Handles SAR overlay checkbox state change."""
log_prefix = "[App CB SAR Overlay Toggle]"
if not hasattr(self, "state") or self.state.shutting_down:
return
try:
# Safely access control panel and checkbox variable
control_panel_ref = getattr(self, "control_panel", None)
overlay_var_ref = (
getattr(control_panel_ref, "sar_overlay_var", None)
if control_panel_ref
else None
)
if not overlay_var_ref or not isinstance(overlay_var_ref, tk.Variable):
logging.warning(
f"{log_prefix} SAR overlay checkbox var not found or invalid."
)
return
new_enabled_state = overlay_var_ref.get() # Get boolean value
logging.debug(
f"{log_prefix} SAR overlay checkbox toggled to: {new_enabled_state}"
)
# Update the state
self.state.update_map_overlay_params(enabled=new_enabled_state)
# Trigger a map content redraw (if map manager exists)
self.trigger_map_redraw() # Call helper
except tk.TclError as e:
if not self.state.shutting_down:
logging.warning(
f"{log_prefix} TclError accessing SAR overlay checkbox: {e}"
)
except Exception as e:
if not self.state.shutting_down:
logging.exception(
f"{log_prefix} Error processing SAR overlay toggle: {e}"
)
def update_sar_overlay_alpha(
self, value_str: str
): # UI Callback for SAR Overlay Alpha Slider
"""Handles SAR overlay alpha slider value change."""
log_prefix = "[App CB SAR Overlay Alpha]"
if not hasattr(self, "state") or self.state.shutting_down:
return
try:
# Slider passes value as string, convert to float
new_alpha = float(value_str)
logging.debug(
f"{log_prefix} SAR overlay alpha slider changed to: {new_alpha:.3f}"
)
# Update the state (method clamps value 0.0-1.0)
self.state.update_map_overlay_params(alpha=new_alpha)
# Trigger a map content redraw (if map manager exists)
self.trigger_map_redraw() # Call helper
except ValueError:
logging.warning(
f"{log_prefix} Invalid alpha value received from slider: {value_str}"
)
except Exception as e:
if not self.state.shutting_down:
logging.exception(
f"{log_prefix} Error processing SAR overlay alpha update: {e}"
)
def toggle_sar_recording(self): # UI Callback for Record SAR Checkbox
"""Handles SAR recording checkbox state change."""
log_prefix = "[App CB SAR Record Toggle]"
if not hasattr(self, "state") or self.state.shutting_down:
return
try:
# Safely access control panel and checkbox variable
control_panel_ref = getattr(self, "control_panel", None)
record_var_ref = (
getattr(control_panel_ref, "record_sar_var", None)
if control_panel_ref
else None
)
if not record_var_ref or not isinstance(record_var_ref, tk.Variable):
logging.warning(
f"{log_prefix} Record SAR checkbox var not found or invalid."
)
return
new_enabled_state = record_var_ref.get() # Get boolean value
logging.debug(
f"{log_prefix} Record SAR checkbox toggled to: {new_enabled_state}"
)
# Update the state using the dedicated method in AppState
self.state.update_sar_recording_enabled(enabled=new_enabled_state)
# No immediate action needed here, recording happens in receiver/recorder
except tk.TclError as e:
if not self.state.shutting_down:
logging.warning(
f"{log_prefix} TclError accessing Record SAR checkbox: {e}"
)
except Exception as e:
if not self.state.shutting_down:
logging.exception(
f"{log_prefix} Error processing Record SAR toggle: {e}"
)
# --- Helper to trigger map redraw ---
def trigger_map_redraw(self):
"""Requests a map redraw by putting a command on the Tkinter queue."""
log_prefix = "[App Trigger Map Redraw]"
if self.state.shutting_down:
return
# Check if map is actually enabled and manager exists
map_manager_exists = (
hasattr(self, "map_integration_manager")
and self.map_integration_manager is not None
)
if config.ENABLE_MAP_OVERLAY and map_manager_exists:
logging.debug(
f"{log_prefix} Queueing REDRAW_MAP command due to overlay parameter change."
)
# Use the existing REDRAW_MAP command. The map manager's display logic
# should inherently use the latest state (including alpha/enabled flags).
put_queue(
queue_obj=self.tkinter_queue,
item=("REDRAW_MAP", None), # Command, payload is ignored for redraw
queue_name="tkinter",
app_instance=self, # Pass self (ControlPanelApp instance) for context
)
else:
logging.debug(
f"{log_prefix} Map overlay not active, skipping redraw trigger."
)
# --- >>> END OF NEW CODE (Callbacks and Helper) <<< ---
# --- Initialization ---
def __init__(self, root: tk.Tk):
"""Initializes the main application components and state."""
log_prefix = "[App Init]"
logging.debug(f"{log_prefix} Starting application initialization...")
self.root = root
self.root.title("Control Panel")
# --- Set Icon ---
try:
# Define path relative to script location
script_dir = os.path.dirname(__file__)
icon_filename = "ControlPanel.ico"
icon_path = os.path.join(script_dir, icon_filename)
if os.path.exists(icon_path):
# iconbitmap is generally preferred for .ico on Windows
self.root.iconbitmap(default=icon_path)
logging.info(f"{log_prefix} Application icon set from: {icon_path}")
else:
logging.warning(
f"{log_prefix} Icon file not found: {icon_path}. Using default icon."
)
except tk.TclError as icon_err:
logging.warning(
f"{log_prefix} Failed to set application icon (TclError): {icon_err}"
)
except Exception as icon_e:
logging.exception(
f"{log_prefix} Unexpected error setting application icon:"
)
self.root.minsize(config.TKINTER_MIN_WIDTH, config.TKINTER_MIN_HEIGHT)
# --- Central State Initialization ---
self.state = AppState()
logging.debug(f"{log_prefix} AppState instance created.")
# --- Data Queues ---
self.sar_queue: queue.Queue = queue.Queue(maxsize=config.DEFAULT_SAR_QUEUE)
self.mouse_queue: queue.Queue = queue.Queue(maxsize=config.DEFAULT_MOUSE_QUEUE)
self.tkinter_queue: queue.Queue = queue.Queue(
maxsize=config.DEFAULT_TKINTER_QUEUE
)
self.mfd_queue: queue.Queue = queue.Queue(maxsize=config.DEFAULT_MFD_QUEUE)
logging.debug(f"{log_prefix} Data queues initialized.")
# --- Screen Info & Window Placement ---
screen_w, screen_h = self._get_screen_dimensions()
initial_sar_w, initial_sar_h = self._calculate_initial_sar_size()
self.tkinter_x, self.tkinter_y = self._calculate_tkinter_position(screen_h)
self.mfd_x, self.mfd_y = self._calculate_mfd_position()
self.sar_x, self.sar_y = self._calculate_sar_position(screen_w, initial_sar_w)
map_max_w = config.MAX_MAP_DISPLAY_WIDTH
if MapDisplayWindow:
map_max_w = getattr(
MapDisplayWindow, "MAX_DISPLAY_WIDTH", config.MAX_MAP_DISPLAY_WIDTH
)
map_x, map_y = self._calculate_map_position(screen_w, initial_sar_w, map_max_w)
self.root.geometry(
f"+{self.tkinter_x}+{self.tkinter_y}"
) # Position main window
logging.debug(
f"{log_prefix} Initial Window positions: Tk({self.tkinter_x},{self.tkinter_y}), "
f"MFD({self.mfd_x},{self.mfd_y}), SAR({self.sar_x},{self.sar_y}), "
f"MapEst({map_x},{map_y})"
)
# --- Initialize Sub-systems ---
# 1. UI Components
self.statusbar = StatusBar(self.root)
# Instantiate UI Frame, passing self (ControlPanelApp instance)
self.control_panel = UIPanel(self.root, self)
logging.debug(f"{log_prefix} UI components created.")
# 2. LUTs
self.update_brightness_contrast_lut()
self.update_mfd_lut()
logging.debug(f"{log_prefix} Initial LUTs generated.")
# 3. Display Manager
self.display_manager = DisplayManager(
app=self,
sar_queue=self.sar_queue,
mouse_queue=self.mouse_queue,
sar_x=self.sar_x,
sar_y=self.sar_y,
mfd_x=self.mfd_x,
mfd_y=self.mfd_y,
initial_sar_width=self.state.sar_display_width,
initial_sar_height=self.state.sar_display_height,
)
logging.debug(f"{log_prefix} DisplayManager created.")
try:
self.display_manager.initialize_display_windows()
except Exception as e:
self.set_status("Error: Display Init Failed")
logging.critical(
f"{log_prefix} Display window initialization failed: {e}", exc_info=True
)
# 4. Image Processing Pipeline
self.image_pipeline = ImagePipeline(
app_state=self.state,
sar_queue=self.sar_queue,
mfd_queue=self.mfd_queue,
app=self, # Pass app instance
)
logging.debug(f"{log_prefix} ImagePipeline created.")
# 5. Test Mode Manager
self.test_mode_manager = TestModeManager(
app_state=self.state,
root=self.root,
sar_queue=self.sar_queue,
mfd_queue=self.mfd_queue,
app=self, # Pass app instance
)
logging.debug(f"{log_prefix} TestModeManager created.")
# 6. Map Integration Manager
self.map_integration_manager: Optional[MapIntegrationManager] = None
if config.ENABLE_MAP_OVERLAY:
if MAP_MODULES_LOADED and MapIntegrationManager is not None:
logging.info(f"{log_prefix} Initializing MapIntegrationManager...")
try:
self.map_integration_manager = MapIntegrationManager(
app_state=self.state,
tkinter_queue=self.tkinter_queue,
app=self, # Pass app instance
map_x=map_x,
map_y=map_y,
)
logging.info(
f"{log_prefix} MapIntegrationManager initialized successfully."
)
except Exception as map_mgr_e:
logging.exception(
f"{log_prefix} Failed to initialize MapIntegrationManager:"
)
self.map_integration_manager = None
self.set_status("Error: Map Init Failed")
else:
logging.error(
f"{log_prefix} Map Overlay enabled but required modules/manager "
"failed to load."
)
self.set_status("Error: Map Modules Missing")
else:
logging.debug(f"{log_prefix} Map Overlay is disabled in configuration.")
# --- >>> START OF NEW CODE (Recorder Init) <<< ---
# 7. Image Recorder (Initialize after AppState)
self.image_recorder = None # Initialize as None
try:
# Attempt to import and initialize only if needed later
from image_recorder import (
ImageRecorder,
) # Import here or globally if preferred
self.image_recorder = ImageRecorder(app_state=self.state)
logging.info(f"{log_prefix} ImageRecorder initialized.")
except ImportError:
logging.warning(
f"{log_prefix} image_recorder.py not found or ImageRecorder class missing. "
"SAR recording disabled."
)
except Exception as rec_init_e:
logging.exception(f"{log_prefix} Failed to initialize ImageRecorder:")
self.image_recorder = None # Ensure it's None on error
# --- >>> END OF NEW CODE (Recorder Init) <<< ---
# 8. Set initial UI state labels
self._update_initial_ui_labels()
# 9. Network Setup
self.local_ip: str = config.DEFAULT_SER_IP
self.local_port: int = config.DEFAULT_SER_PORT
self.udp_socket: Optional[socket.socket] = None
self.udp_receiver: Optional[UdpReceiver] = None
self.udp_thread: Optional[threading.Thread] = None
if not config.USE_LOCAL_IMAGES:
self._setup_network_receiver()
else:
logging.info(
f"{log_prefix} Network receiver disabled (USE_LOCAL_IMAGES=True)."
)
# 10. Initial Image Load Thread
self._start_initial_image_loader()
# 11. Start Queue Processors
self.process_sar_queue()
self.process_mfd_queue()
self.process_mouse_queue()
self.process_tkinter_queue()
logging.debug(f"{log_prefix} Queue processors scheduled.")
# 12. Start Periodic Status Updates
self.schedule_periodic_updates()
logging.debug(f"{log_prefix} Periodic updates scheduled.")
# 13. Set initial image mode based on checkbox/config
self.update_image_mode()
logging.debug(f"{log_prefix} Initial image mode set.")
logging.info(f"{log_prefix} Application initialization sequence complete.")
# --- Initialization Helper Methods ---
def _get_screen_dimensions(self) -> Tuple[int, int]:
"""Gets primary screen dimensions."""
log_prefix = "[App Init]"
try:
monitors = screeninfo.get_monitors()
if not monitors:
raise screeninfo.ScreenInfoError("No monitors detected.")
# Assuming primary monitor is the first one
screen = monitors[0]
logging.debug(
f"{log_prefix} Detected Screen Dimensions: {screen.width}x{screen.height}"
)
return screen.width, screen.height
except Exception as e:
logging.warning(
f"{log_prefix} Screen info error: {e}. Using default 1920x1080."
)
return 1920, 1080
def _calculate_initial_sar_size(
self, desired_factor_if_map: int = 4
) -> Tuple[int, int]:
"""Calculates initial SAR display size based on config and map state."""
log_prefix = "[App Init]"
initial_w = self.state.sar_display_width
initial_h = self.state.sar_display_height
map_enabled_and_loaded = config.ENABLE_MAP_OVERLAY and MAP_MODULES_LOADED
if map_enabled_and_loaded:
forced_factor = max(1, desired_factor_if_map)
initial_w = config.SAR_WIDTH // forced_factor
initial_h = config.SAR_HEIGHT // forced_factor
self.state.update_sar_display_size(initial_w, initial_h) # Update state
logging.info(
f"{log_prefix} Map active, forcing SAR size to 1:{forced_factor} "
f"({initial_w}x{initial_h})."
)
else:
logging.debug(
f"{log_prefix} Using initial SAR size from state: {initial_w}x{initial_h}."
)
return initial_w, initial_h
def _calculate_tkinter_position(self, screen_h: int) -> Tuple[int, int]:
"""Calculates the initial X, Y position for the Tkinter control panel window."""
x = 10
# Position below MFD window + padding
y = config.INITIAL_MFD_HEIGHT + 40
# Adjust if it goes off screen
if y + config.TKINTER_MIN_HEIGHT > screen_h:
y = max(10, screen_h - config.TKINTER_MIN_HEIGHT - 10)
return x, y
def _calculate_mfd_position(self) -> Tuple[int, int]:
"""Calculates the initial X, Y position for the MFD display window."""
# Align with Tkinter window's left edge, near the top
x = self.tkinter_x
y = 10
return x, y
def _calculate_sar_position(
self, screen_w: int, initial_sar_w: int
) -> Tuple[int, int]:
"""Calculates the initial X, Y position for the SAR display window."""
# Position to the right of Tkinter window + padding
x = self.tkinter_x + config.TKINTER_MIN_WIDTH + 20
y = 10 # Align with top
# Adjust if it goes off screen
if x + initial_sar_w > screen_w:
x = max(10, screen_w - initial_sar_w - 10)
return x, y
def _calculate_map_position(
self, screen_w: int, current_sar_w: int, max_map_width: int
) -> Tuple[int, int]:
"""Calculates the initial X, Y position for the Map display window."""
# Position to the right of SAR window + padding
x = self.sar_x + current_sar_w + 20
y = 10 # Align with top
# Adjust if it goes off screen
if x + max_map_width > screen_w:
x = max(10, screen_w - max_map_width - 10)
return x, y
def _setup_network_receiver(self):
"""Creates and starts the UDP socket and receiver thread."""
log_prefix = "[App Init Network]"
logging.info(
f"{log_prefix} Attempting to start network receiver on "
f"{self.local_ip}:{self.local_port}"
)
self.udp_socket = create_udp_socket(self.local_ip, self.local_port)
if self.udp_socket:
try:
# Pass the image_recorder instance if it was created successfully
recorder_instance = getattr(self, "image_recorder", None)
self.udp_receiver = UdpReceiver(
app=self, # Pass self (ControlPanelApp)
udp_socket=self.udp_socket,
set_new_sar_image_callback=self.handle_new_sar_data,
set_new_mfd_indices_image_callback=self.handle_new_mfd_data,
image_recorder=recorder_instance, # Pass recorder instance
)
logging.info(f"{log_prefix} UdpReceiver instance created.")
self.udp_thread = threading.Thread(
target=self.udp_receiver.receive_udp_data,
name="UDPReceiverThread",
daemon=True,
)
self.udp_thread.start()
logging.info(f"{log_prefix} UDP Receiver thread started.")
self.set_status(f"Listening UDP {self.local_ip}:{self.local_port}")
except Exception as receiver_init_e:
logging.critical(
f"{log_prefix} Failed to initialize UdpReceiver: {receiver_init_e}",
exc_info=True,
)
self.set_status("Error: Receiver Init Failed")
# Clean up socket if receiver init failed
close_udp_socket(self.udp_socket)
self.udp_socket = None
else:
logging.error(f"{log_prefix} UDP socket creation failed.")
self.set_status("Error: UDP Socket Failed")
def _start_initial_image_loader(self):
"""Starts a background thread to load local/test images if needed."""
log_prefix = "[App Init]"
should_load = config.USE_LOCAL_IMAGES or config.ENABLE_TEST_MODE
if should_load:
logging.debug(f"{log_prefix} Starting initial image loading thread...")
image_loading_thread = threading.Thread(
target=self.load_initial_images, name="ImageLoaderThread", daemon=True
)
image_loading_thread.start()
else:
logging.debug(f"{log_prefix} Skipping initial image loading.")
# If not loading, schedule initial display setup immediately
if self.root and self.root.winfo_exists():
self.root.after_idle(self._set_initial_display_from_loaded_data)
def _update_initial_ui_labels(self):
"""Sets the initial text for UI info labels based on default AppState."""
log_prefix = "[App Init]"
logging.debug(f"{log_prefix} Setting initial UI info labels...")
# Safely access control_panel instance
control_panel_ref = getattr(self, "control_panel", None)
if not control_panel_ref:
logging.warning(f"{log_prefix} Control panel not ready for initial labels.")
return
try:
# SAR Center Label
default_geo = self.state.current_sar_geo_info
center_txt = "Image Ref: Lat=N/A, Lon=N/A"
if default_geo and default_geo.get("valid", False):
try:
lat_s = decimal_to_dms(math.degrees(default_geo["lat"]), True)
lon_s = decimal_to_dms(math.degrees(default_geo["lon"]), False)
center_txt = f"Image Ref: Lat={lat_s}, Lon={lon_s}"
except Exception as format_err:
logging.error(
f"{log_prefix} Error formatting initial geo label: {format_err}"
)
center_txt = "Image Ref: Format Error"
if hasattr(control_panel_ref, "sar_center_label"):
control_panel_ref.sar_center_label.config(text=center_txt)
# SAR Orientation Label
if hasattr(control_panel_ref, "set_sar_orientation"):
control_panel_ref.set_sar_orientation("N/A")
# SAR Size Km Label
if hasattr(control_panel_ref, "set_sar_size_km"):
control_panel_ref.set_sar_size_km("N/A")
# Mouse Coordinates Label
if hasattr(control_panel_ref, "set_mouse_coordinates"):
control_panel_ref.set_mouse_coordinates("N/A", "N/A")
# Statistics Labels
initial_stats = self.state.get_statistics()
drop_txt = (
f"Drop(Q): S={initial_stats['dropped_sar_q']},"
f"M={initial_stats['dropped_mfd_q']},"
f"Tk={initial_stats['dropped_tk_q']},"
f"Mo={initial_stats['dropped_mouse_q']}"
)
incmpl_txt = (
f"Incmpl(RX): S={initial_stats['incomplete_sar_rx']},"
f"M={initial_stats['incomplete_mfd_rx']}"
)
if hasattr(control_panel_ref, "dropped_label"):
control_panel_ref.dropped_label.config(text=drop_txt)
if hasattr(control_panel_ref, "incomplete_label"):
control_panel_ref.incomplete_label.config(text=incmpl_txt)
logging.debug(f"{log_prefix} Initial UI state labels set.")
except tk.TclError as e:
# Ignore Tcl errors if window is destroyed during init
logging.warning(
f"{log_prefix} Error setting initial UI labels (TclError): {e}"
)
except Exception as e:
logging.exception(
f"{log_prefix} Unexpected error setting initial UI labels:"
)
# --- Network Data Handlers ---
def handle_new_sar_data(
self, normalized_image_uint8: np.ndarray, geo_info_radians: Dict[str, Any]
):
"""Safely handles new SAR data received from the network receiver."""
log_prefix = "[App CB SAR]"
geo_valid = geo_info_radians.get("valid", False)
logging.debug(
f"{log_prefix} Handling new SAR data (Shape: {normalized_image_uint8.shape}, "
f"Geo Valid: {geo_valid})..."
)
if self.state.shutting_down:
logging.debug(f"{log_prefix} Shutdown detected. Ignoring.")
return
# Update state first
self.state.set_sar_data(normalized_image_uint8, geo_info_radians)
logging.debug(f"{log_prefix} SAR data/GeoInfo updated in AppState.")
# Schedule main thread processing
if self.root and self.root.winfo_exists():
logging.debug(
f"{log_prefix} Scheduling _process_sar_update_on_main_thread."
)
self.root.after_idle(self._process_sar_update_on_main_thread)
else:
logging.warning(
f"{log_prefix} Cannot schedule SAR update: Root window gone."
)
def handle_new_mfd_data(self, image_indices: np.ndarray):
"""Safely handles new MFD index data received from the network receiver."""
log_prefix = "[App CB MFD]"
logging.debug(
f"{log_prefix} Handling new MFD index data (Shape: {image_indices.shape})..."
)
if self.state.shutting_down:
logging.debug(f"{log_prefix} Shutdown detected. Ignoring.")
return
# Update state first
self.state.set_mfd_indices(image_indices)
logging.debug(f"{log_prefix} MFD indices updated in AppState.")
# Schedule main thread processing
if self.root and self.root.winfo_exists():
logging.debug(
f"{log_prefix} Scheduling _process_mfd_update_on_main_thread."
)
self.root.after_idle(self._process_mfd_update_on_main_thread)
else:
logging.warning(
f"{log_prefix} Cannot schedule MFD update: Root window gone."
)
# --- Main Thread Processing Triggers ---
def _process_sar_update_on_main_thread(self):
"""
Processes SAR updates scheduled to run on the main GUI thread.
Updates UI labels, triggers image pipeline, triggers map update,
and handles KML generation/cleanup.
"""
log_prefix = "[App MainThread SAR Update]"
if self.state.shutting_down:
logging.debug(f"{log_prefix} Shutdown detected. Skipping.")
return
logging.debug(f"{log_prefix} Processing scheduled SAR update...")
# 1. Update UI Labels
self._update_sar_ui_labels()
# 2. Trigger Image Pipeline for Display
logging.debug(f"{log_prefix} Calling image_pipeline.process_sar_for_display...")
try:
if hasattr(self, "image_pipeline") and self.image_pipeline:
self.image_pipeline.process_sar_for_display()
else:
logging.error(f"{log_prefix} ImagePipeline not available.")
except Exception as e:
logging.exception(f"{log_prefix} Error calling ImagePipeline for SAR:")
# 3. Trigger Map Update
geo_info = self.state.current_sar_geo_info
is_geo_valid = geo_info and geo_info.get("valid", False)
map_manager_active = (
hasattr(self, "map_integration_manager")
and self.map_integration_manager is not None
)
if map_manager_active and is_geo_valid:
logging.debug(
f"{log_prefix} Calling map_integration_manager.update_map_overlay..."
)
try:
# Pass current normalized SAR data and geo info
# Recorder uses raw data, overlay uses normalized+processed data
self.map_integration_manager.update_map_overlay(
self.state.current_sar_normalized, geo_info
)
except Exception as e:
logging.exception(f"{log_prefix} Error calling map manager update:")
elif config.ENABLE_MAP_OVERLAY: # Log reason for skipping only if map enabled
if not map_manager_active:
logging.debug(
f"{log_prefix} Skipping map update: MapIntegrationManager not available."
)
elif not is_geo_valid:
logging.debug(f"{log_prefix} Skipping map update: GeoInfo not valid.")
# 4. KML Generation and Cleanup Logic (if geo valid)
if is_geo_valid and config.ENABLE_KML_GENERATION:
self._handle_kml_generation(geo_info) # Use helper method
# 5. Update FPS Statistics for SAR
self._update_fps_stats("sar")
logging.debug(f"{log_prefix} Finished processing SAR update.")
def _handle_kml_generation(self, geo_info):
"""Handles KML generation, cleanup, and optional launch."""
kml_log_prefix = "[App KML]"
logging.debug(f"{kml_log_prefix} KML generation enabled.")
try:
kml_dir = config.KML_OUTPUT_DIRECTORY
os.makedirs(kml_dir, exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
kml_filename = f"sar_footprint_{timestamp}.kml"
kml_output_path = os.path.join(kml_dir, kml_filename)
logging.debug(
f"{kml_log_prefix} Calling generate_sar_kml for path: {kml_output_path}"
)
# Generate KML using utility function
kml_success = generate_sar_kml(geo_info, kml_output_path)
if kml_success:
logging.info(
f"{kml_log_prefix} KML file generated successfully: {kml_output_path}"
)
# --- Call KML Cleanup ---
logging.debug(f"{kml_log_prefix} Calling KML cleanup utility...")
try:
# Use utility function for cleanup
cleanup_old_kml_files(
config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES
)
except Exception as cleanup_e:
logging.exception(
f"{kml_log_prefix} Error during KML cleanup call:"
)
# --- End Cleanup Call ---
# --- Optional Launch ---
if config.AUTO_LAUNCH_GOOGLE_EARTH:
logging.debug(f"{kml_log_prefix} Auto-launch Google Earth enabled.")
launch_google_earth(kml_output_path) # Use utility function
else:
logging.debug(
f"{kml_log_prefix} Auto-launch Google Earth disabled."
)
else:
logging.error(f"{kml_log_prefix} KML file generation failed.")
except ImportError as ie:
# Log error if required libraries for KML/Utils are missing
logging.error(
f"{kml_log_prefix} Cannot generate/cleanup KML due to missing library: {ie}"
)
except Exception as e:
logging.exception(
f"{kml_log_prefix} Error during KML generation/launch/cleanup process:"
)
def _process_mfd_update_on_main_thread(self):
"""Processes MFD updates scheduled to run on the main GUI thread."""
log_prefix = "[App MainThread MFD Update]"
if self.state.shutting_down:
logging.debug(f"{log_prefix} Shutdown detected. Skipping.")
return
logging.debug(f"{log_prefix} Processing scheduled MFD update...")
# 1. Trigger Image Processing Pipeline
logging.debug(f"{log_prefix} Calling image_pipeline.process_mfd_for_display...")
try:
if hasattr(self, "image_pipeline") and self.image_pipeline:
self.image_pipeline.process_mfd_for_display()
else:
logging.error(f"{log_prefix} ImagePipeline not available.")
except Exception as e:
logging.exception(f"{log_prefix} Error calling ImagePipeline for MFD:")
# 2. Update FPS Statistics for MFD
self._update_fps_stats("mfd")
logging.debug(f"{log_prefix} Finished processing MFD update.")
def _update_sar_ui_labels(self):
"""Helper method to update SAR related UI labels from AppState."""
log_prefix = "[App MainThread SAR Update]"
# Safely access control_panel instance
control_panel_ref = getattr(self, "control_panel", None)
if not control_panel_ref or not control_panel_ref.winfo_exists():
return # Skip if UI not ready
geo_info = self.state.current_sar_geo_info
center_txt = "Image Ref: N/A"
orient_txt = "N/A"
size_txt = "N/A"
is_valid_geo = geo_info and geo_info.get("valid", False)
if is_valid_geo:
try:
lat_d = math.degrees(geo_info["lat"])
lon_d = math.degrees(geo_info["lon"])
orient_d = math.degrees(geo_info["orientation"])
# Format using utility function
lat_s = decimal_to_dms(lat_d, is_latitude=True)
lon_s = decimal_to_dms(lon_d, is_latitude=False)
orient_txt = f"{orient_d:.2f}°"
center_txt = f"Image Ref: Lat={lat_s}, Lon={lon_s}"
# Calculate size in Km
scale_x = geo_info.get("scale_x", 0.0)
scale_y = geo_info.get("scale_y", 0.0)
width_px = geo_info.get("width_px", 0)
height_px = geo_info.get("height_px", 0)
if scale_x > 0 and width_px > 0 and scale_y > 0 and height_px > 0:
width_km = (scale_x * width_px) / 1000.0
height_km = (scale_y * height_px) / 1000.0
size_txt = f"W: {width_km:.1f} km, H: {height_km:.1f} km"
else:
size_txt = "Invalid Scale/Dims"
except KeyError as ke:
logging.error(
f"{log_prefix} Missing key '{ke}' in geo_info for UI update."
)
center_txt = "Ref: Data Error"
orient_txt = "Data Error"
size_txt = "Data Error"
is_valid_geo = False
except Exception as e:
logging.error(f"{log_prefix} Error formatting geo info for UI: {e}")
center_txt = "Ref: Format Error"
orient_txt = "Format Error"
size_txt = "Format Error"
is_valid_geo = False
# Safely update UI elements, checking existence first
try:
if hasattr(control_panel_ref, "sar_center_label"):
control_panel_ref.sar_center_label.config(text=center_txt)
if hasattr(control_panel_ref, "set_sar_orientation"):
control_panel_ref.set_sar_orientation(orient_txt)
if hasattr(control_panel_ref, "set_sar_size_km"):
control_panel_ref.set_sar_size_km(size_txt)
# Clear mouse coords if geo becomes invalid
if not is_valid_geo and hasattr(control_panel_ref, "set_mouse_coordinates"):
control_panel_ref.set_mouse_coordinates("N/A", "N/A")
except tk.TclError as ui_err:
if not self.state.shutting_down:
logging.warning(
f"{log_prefix} Error updating SAR UI labels (TclError): {ui_err}"
)
except Exception as gen_err:
logging.exception(f"{log_prefix} Unexpected error updating SAR UI labels:")
def _update_fps_stats(self, img_type: str):
"""Helper function to update FPS counters in AppState."""
now = time.time()
log_prefix = "[App FPS Update]"
try:
if img_type == "sar":
self.state.sar_frame_count += 1
elapsed = now - self.state.sar_update_time
# Update FPS only after a certain interval
if elapsed >= config.LOG_UPDATE_INTERVAL:
self.state.sar_fps = self.state.sar_frame_count / elapsed
self.state.sar_update_time = now
self.state.sar_frame_count = 0
logging.debug(
f"{log_prefix} SAR FPS calculated: {self.state.sar_fps:.2f}"
)
elif img_type == "mfd":
self.state.mfd_frame_count += 1
elapsed = now - self.state.mfd_start_time
if elapsed >= config.LOG_UPDATE_INTERVAL:
self.state.mfd_fps = self.state.mfd_frame_count / elapsed
self.state.mfd_start_time = now
self.state.mfd_frame_count = 0
logging.debug(
f"{log_prefix} MFD FPS calculated: {self.state.mfd_fps:.2f}"
)
except Exception as e:
logging.warning(
f"{log_prefix} Error updating FPS stats for '{img_type}': {e}"
)
# --- Trigger Methods ---
def _trigger_sar_update(self):
"""Triggers a SAR image update processing via ImagePipeline if not in test mode."""
log_prefix = "[App Trigger SAR]"
if self.state.shutting_down:
return
if not self.state.test_mode_active:
logging.debug(f"{log_prefix} Triggering SAR update via ImagePipeline.")
try:
if hasattr(self, "image_pipeline") and self.image_pipeline:
self.image_pipeline.process_sar_for_display()
else:
logging.error(f"{log_prefix} 'image_pipeline' not found.")
except Exception as e:
logging.exception(
f"{log_prefix} Error calling image_pipeline.process_sar_for_display:"
)
else:
logging.debug(
f"{log_prefix} SAR update trigger skipped (Test Mode active)."
)
def _trigger_mfd_update(self):
"""Triggers an MFD image update processing via ImagePipeline if not in test mode."""
log_prefix = "[App Trigger MFD]"
if self.state.shutting_down:
return
if not self.state.test_mode_active:
logging.debug(f"{log_prefix} Triggering MFD update via ImagePipeline.")
try:
if hasattr(self, "image_pipeline") and self.image_pipeline:
self.image_pipeline.process_mfd_for_display()
else:
logging.error(f"{log_prefix} 'image_pipeline' not found.")
except Exception as e:
logging.exception(
f"{log_prefix} Error calling image_pipeline.process_mfd_for_display:"
)
else:
logging.debug(
f"{log_prefix} MFD update trigger skipped (Test Mode active)."
)
# --- Periodic Update Scheduling ---
def schedule_periodic_updates(self):
"""Schedules the regular update of the status bar information."""
log_prefix = "[App Status Scheduler]"
if self.state.shutting_down:
logging.debug(f"{log_prefix} Shutdown detected. Stopping periodic updates.")
return
try:
# Call the status update logic
self.update_status()
except Exception as e:
logging.error(
f"{log_prefix} Error during periodic status update execution: {e}"
)
# Schedule the next call
interval_ms = max(100, int(config.LOG_UPDATE_INTERVAL * 1000))
try:
if self.root and self.root.winfo_exists():
self.root.after(interval_ms, self.schedule_periodic_updates)
except Exception as e:
# Log error only if not shutting down
if not self.state.shutting_down:
logging.warning(f"{log_prefix} Error rescheduling periodic update: {e}")
# --- Initial Image Loading ---
def load_initial_images(self):
"""(Runs in background thread) Loads initial local/test images into AppState."""
log_prefix = "[App Image Loader]"
if self.state.shutting_down:
logging.debug(f"{log_prefix} Shutdown detected. Aborting.")
return
logging.info(f"{log_prefix} Initial image loading thread started.")
# Schedule status update on main thread
if self.root and self.root.winfo_exists():
self.root.after_idle(self.set_status, "Loading initial images...")
try:
# Ensure test images are generated if test mode enabled
if config.ENABLE_TEST_MODE or self.state.test_mode_active:
if hasattr(self, "test_mode_manager") and self.test_mode_manager:
logging.debug(f"{log_prefix} Ensuring test images are generated...")
self.test_mode_manager._ensure_test_images()
else:
logging.error(
f"{log_prefix} TestModeManager not available for test image generation."
)
# Load local images if configured
if config.USE_LOCAL_IMAGES:
logging.debug(f"{log_prefix} Loading local MFD image...")
self._load_local_mfd_image()
logging.debug(f"{log_prefix} Loading local SAR image...")
self._load_local_sar_image()
# Schedule setting initial display after loading completes
if self.root and self.root.winfo_exists():
logging.debug(
f"{log_prefix} Scheduling _set_initial_display_from_loaded_data."
)
self.root.after_idle(self._set_initial_display_from_loaded_data)
except Exception as e:
logging.exception(f"{log_prefix} Error during initial image loading:")
if self.root and self.root.winfo_exists():
self.root.after_idle(self.set_status, "Error Loading Images")
finally:
logging.info(f"{log_prefix} Initial image loading thread finished.")
def _load_local_mfd_image(self):
"""Loads local MFD image data (indices) into AppState."""
log_prefix = "[App Image Loader]"
# Default to random data if file not found or loading fails
default_indices = np.random.randint(
0, 256, (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
)
loaded_indices = None
try:
mfd_path = getattr(config, "MFD_IMAGE_PATH", "") # Safely get path
if mfd_path and os.path.exists(mfd_path):
# Actual loading logic would go here if supported
logging.warning(
f"{log_prefix} Local MFD loading from file NYI. Using random data."
)
loaded_indices = default_indices # Placeholder
else:
logging.warning(
f"{log_prefix} Local MFD image file not found or path not set: "
f"{mfd_path}. Using random data."
)
loaded_indices = default_indices
self.state.local_mfd_image_data_indices = loaded_indices
except Exception as e:
logging.exception(f"{log_prefix} Error loading local MFD image:")
self.state.local_mfd_image_data_indices = default_indices
def _load_local_sar_image(self):
"""Loads local SAR image data (raw) into AppState."""
log_prefix = "[App Image Loader]"
# Default to zeros if loading fails
default_raw_data = np.zeros(
(config.SAR_HEIGHT, config.SAR_WIDTH), dtype=config.SAR_DATA_TYPE
)
loaded_raw_data = None
try:
# Use the load_image utility from image_processing
loaded_raw_data = load_image(
config.SAR_IMAGE_PATH, config.SAR_DATA_TYPE
) # Logs internally
if loaded_raw_data is None or loaded_raw_data.size == 0:
logging.error(
f"{log_prefix} Failed to load local SAR raw data. Using zeros."
)
loaded_raw_data = default_raw_data
else:
logging.info(
f"{log_prefix} Loaded local SAR raw data "
f"(shape: {loaded_raw_data.shape})."
)
self.state.local_sar_image_data_raw = loaded_raw_data
except Exception as e:
logging.exception(f"{log_prefix} Error loading local SAR raw data:")
self.state.local_sar_image_data_raw = default_raw_data
def _set_initial_display_from_loaded_data(self):
"""(Runs in main thread) Sets the initial display based on loaded data and mode."""
log_prefix = "[App Init Display]"
if self.state.shutting_down:
logging.debug(f"{log_prefix} Shutdown detected. Skipping.")
return
is_test = self.state.test_mode_active
is_local = config.USE_LOCAL_IMAGES
if not is_test and is_local: # Local Image Mode
logging.info(f"{log_prefix} Setting initial display from local images.")
# Set MFD display
if self.state.local_mfd_image_data_indices is not None:
self.state.current_mfd_indices = (
self.state.local_mfd_image_data_indices.copy()
)
self.image_pipeline.process_mfd_for_display()
else:
logging.warning(f"{log_prefix} Local MFD data not loaded.")
# Set SAR display
if self.state.local_sar_image_data_raw is not None:
# Handles normalization, state update, UI reset, pipeline call
self.set_initial_sar_image(self.state.local_sar_image_data_raw)
else:
logging.warning(
f"{log_prefix} Local SAR data not loaded. Displaying black."
)
self.set_initial_sar_image(None) # Display black
elif is_test: # Test Mode
logging.info(
f"{log_prefix} Test mode active. Display handled by TestModeManager timers."
)
# Initial display is triggered by TestModeManager.activate() starting timers
else: # Network Mode
logging.info(
f"{log_prefix} Network mode active. Displaying initial placeholders."
)
self._show_network_placeholders()
# Set Final Initial Status (only if map isn't loading)
map_manager_active = (
hasattr(self, "map_integration_manager")
and self.map_integration_manager is not None
)
map_thread_running = False
if map_manager_active:
map_thread_attr = getattr(
self.map_integration_manager, "_map_initial_display_thread", None
)
if map_thread_attr and isinstance(map_thread_attr, threading.Thread):
map_thread_running = map_thread_attr.is_alive()
map_is_loading = map_manager_active and map_thread_running
if not map_is_loading:
final_status = ""
if is_test:
final_status = "Ready (Test Mode)"
elif is_local:
final_status = "Ready (Local Mode)"
else: # Network mode
if self.udp_socket:
final_status = f"Listening UDP {self.local_ip}:{self.local_port}"
else:
final_status = "Error: No Socket"
self.set_status(final_status)
logging.debug(
f"{log_prefix} Set final initial status (map not loading): '{final_status}'"
)
else:
logging.debug(
f"{log_prefix} Initial map display is still loading. Status update deferred."
)
def set_initial_sar_image(self, raw_image_data: Optional[np.ndarray]):
"""Processes provided raw SAR data (or None for black image), updates AppState, resets UI, triggers display."""
log_prefix = "[App Init SAR Image]"
if self.state.shutting_down:
return
logging.debug(f"{log_prefix} Processing initial raw SAR image...")
normalized: Optional[np.ndarray] = None
if raw_image_data is not None and raw_image_data.size > 0:
try:
# Use utility function for normalization
normalized = normalize_image(raw_image_data, target_type=np.uint8)
except Exception as e:
logging.exception(
f"{log_prefix} Error during initial SAR normalization:"
)
else:
logging.warning(f"{log_prefix} Provided raw SAR data is invalid or empty.")
# Update state with normalized image or fallback
if normalized is not None:
self.state.current_sar_normalized = normalized
logging.debug(
f"{log_prefix} Stored normalized initial SAR image in AppState."
)
else:
logging.warning(
f"{log_prefix} Using black image fallback for initial SAR display."
)
# Ensure state has a valid array, fill with 0 if needed
if self.state.current_sar_normalized is None:
self.state.current_sar_normalized = np.zeros(
(config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
)
self.state.current_sar_normalized.fill(0)
# Mark geo invalid for local/black image and reset UI labels
self.state.current_sar_geo_info["valid"] = False
self._reset_ui_geo_info()
# Trigger display via pipeline
logging.debug(
f"{log_prefix} Triggering display of initial SAR image via ImagePipeline."
)
if hasattr(self, "image_pipeline") and self.image_pipeline:
self.image_pipeline.process_sar_for_display()
else:
logging.error(f"{log_prefix} ImagePipeline not available for initial SAR.")
logging.info(f"{log_prefix} Initial SAR image processed and queued.")
# --- Mode Switching UI Actions ---
def activate_test_mode_ui_actions(self):
"""Handles UI and state changes needed when activating test mode."""
log_prefix = "[App Test Activate]"
logging.info(
f"{log_prefix} Performing UI/State actions for Test Mode activation."
)
self.set_status("Activating Test Mode...")
# Reset geo display as test mode doesn't use real geo data
self._reset_ui_geo_info()
# Clear display queues to avoid showing old network/local images
clear_queue(self.mfd_queue)
clear_queue(self.sar_queue)
logging.debug(f"{log_prefix} Display queues cleared.")
# Set final status
self.set_status("Ready (Test Mode)")
logging.info(f"{log_prefix} Test Mode UI/State actions complete.")
def deactivate_test_mode_ui_actions(self):
"""Handles UI and state changes needed when deactivating test mode."""
log_prefix = "[App Test Deactivate]"
logging.info(
f"{log_prefix} Performing UI/State actions for Test Mode deactivation -> Normal Mode."
)
self.set_status("Activating Normal Mode...")
# Clear queues to remove any lingering test images
clear_queue(self.mfd_queue)
clear_queue(self.sar_queue)
logging.debug(f"{log_prefix} Display queues cleared.")
# Reset geo display
self._reset_ui_geo_info()
if config.USE_LOCAL_IMAGES: # Local Image Mode Restoration
logging.info(
f"{log_prefix} Restoring display from local images stored in AppState."
)
# Restore MFD
if self.state.local_mfd_image_data_indices is not None:
self.state.current_mfd_indices = (
self.state.local_mfd_image_data_indices.copy()
)
self.image_pipeline.process_mfd_for_display()
else:
logging.warning(f"{log_prefix} No local MFD data to restore.")
# Restore SAR
if self.state.local_sar_image_data_raw is not None:
self.set_initial_sar_image(self.state.local_sar_image_data_raw)
else:
logging.warning(
f"{log_prefix} No local SAR data to restore. Displaying black."
)
self.set_initial_sar_image(None)
self.set_status("Ready (Local Mode)")
else: # Network Mode Restoration
logging.info(
f"{log_prefix} Switched to Network mode. Displaying placeholders."
)
self._show_network_placeholders()
# Set status based on socket state
socket_ok = self.udp_socket is not None and self.udp_socket.fileno() != -1
status = (
f"Listening UDP {self.local_ip}:{self.local_port}"
if socket_ok
else "Error: No UDP Socket"
)
self.set_status(status)
logging.info(f"{log_prefix} Normal Mode UI/State actions complete.")
def _reset_ui_geo_info(self):
"""Schedules UI reset for geo-related labels on the main thread."""
log_prefix = "[App UI Reset]"
# Safely access control_panel instance
control_panel_ref = getattr(self, "control_panel", None)
if self.root and self.root.winfo_exists() and control_panel_ref:
# Schedule individual label updates via after_idle
if hasattr(control_panel_ref, "set_sar_orientation"):
self.root.after_idle(
lambda: control_panel_ref.set_sar_orientation("N/A")
)
if hasattr(control_panel_ref, "set_mouse_coordinates"):
self.root.after_idle(
lambda: control_panel_ref.set_mouse_coordinates("N/A", "N/A")
)
if hasattr(control_panel_ref, "sar_center_label"):
self.root.after_idle(
lambda: control_panel_ref.sar_center_label.config(
text="Image Ref: Lat=N/A, Lon=N/A"
)
)
if hasattr(control_panel_ref, "set_sar_size_km"):
self.root.after_idle(lambda: control_panel_ref.set_sar_size_km("N/A"))
logging.debug(f"{log_prefix} Geo UI label reset scheduled.")
def _revert_test_mode_ui(self):
"""Tries to uncheck the test mode checkbox in the UI and resets the state flag."""
log_prefix = "[App Mode Switch]"
logging.warning(
f"{log_prefix} Reverting Test Mode UI and state due to activation failure."
)
# Safely access control panel and variable
control_panel_ref = getattr(self, "control_panel", None)
test_var_ref = (
getattr(control_panel_ref, "test_image_var", None)
if control_panel_ref
else None
)
if self.root and self.root.winfo_exists() and test_var_ref:
try:
# Schedule the uncheck operation on the main thread
self.root.after_idle(test_var_ref.set, 0)
except Exception as e:
logging.warning(
f"{log_prefix} Failed to schedule uncheck of test mode checkbox: {e}"
)
# Reset state flag regardless of UI success
if hasattr(self, "state"):
self.state.test_mode_active = False
logging.debug(f"{log_prefix} Test mode state flag reverted to False.")
def _show_network_placeholders(self):
"""Queues placeholder images for MFD and SAR displays."""
log_prefix = "[App Placeholders]"
logging.debug(f"{log_prefix} Queueing network placeholder images.")
try:
# Create dark gray placeholder for MFD
ph_mfd = np.full(
(config.INITIAL_MFD_HEIGHT, config.INITIAL_MFD_WIDTH, 3),
30, # Dark gray
dtype=np.uint8,
)
# Create medium gray placeholder for SAR (using current display size)
ph_sar = np.full(
(self.state.sar_display_height, self.state.sar_display_width, 3),
60, # Medium gray
dtype=np.uint8,
)
# Put placeholders onto respective display queues
put_queue(self.mfd_queue, ph_mfd, "mfd", self)
put_queue(self.sar_queue, ph_sar, "sar", self)
except Exception as e:
logging.exception(
f"{log_prefix} Error creating/queueing placeholder images:"
)
# --- Mouse Coordinate Handling ---
def process_mouse_queue(self):
"""Processes raw mouse coords from queue, calculates geo coords, queues result."""
log_prefix = "[App GeoCalc]"
if self.state.shutting_down:
return
raw_coords = None
try:
# Non-blocking get from queue
raw_coords = self.mouse_queue.get(block=False)
self.mouse_queue.task_done()
except queue.Empty:
pass # Normal case, queue is empty
except Exception as e:
logging.exception(f"{log_prefix} Error getting from mouse queue:")
pass # Continue processing loop
if (
raw_coords is not None
and isinstance(raw_coords, tuple)
and len(raw_coords) == 2
):
x_disp, y_disp = raw_coords
geo = self.state.current_sar_geo_info
disp_w = self.state.sar_display_width
disp_h = self.state.sar_display_height
lat_s: str = "N/A"
lon_s: str = "N/A"
# Check if all required geo info is present and valid
is_geo_valid_for_calc = (
geo
and geo.get("valid")
and disp_w > 0
and disp_h > 0
and geo.get("width_px", 0) > 0
and geo.get("height_px", 0) > 0
and geo.get("scale_x", 0.0) > 0
and geo.get("scale_y", 0.0) > 0
and "lat" in geo
and "lon" in geo
and "ref_x" in geo
and "ref_y" in geo
and "orientation" in geo
)
if is_geo_valid_for_calc:
logging.debug(
f"{log_prefix} Processing mouse coords: Display({x_disp},{y_disp}) "
"with valid GeoInfo."
)
try:
# Extract necessary values
orig_w = geo["width_px"]
orig_h = geo["height_px"]
scale_x = geo["scale_x"]
scale_y = geo["scale_y"]
ref_x = geo["ref_x"]
ref_y = geo["ref_y"]
ref_lat_rad = geo["lat"]
ref_lon_rad = geo["lon"]
original_orient_rad = geo.get("orientation", 0.0)
# Use original orientation for inverse rotation calculation
angle_for_inverse_rotation_rad = original_orient_rad
# Normalize display coordinates (0.0 to 1.0)
nx_disp = x_disp / disp_w
ny_disp = y_disp / disp_h
nx_orig_norm, ny_orig_norm = nx_disp, ny_disp
# Apply Inverse Rotation if needed
if abs(angle_for_inverse_rotation_rad) > 1e-4:
logging.debug(
f"{log_prefix} Applying inverse rotation "
f"(angle: {math.degrees(angle_for_inverse_rotation_rad):.2f} deg)..."
)
arad_inv = angle_for_inverse_rotation_rad
cosa = math.cos(arad_inv)
sina = math.sin(arad_inv)
# Rotate around center (0.5, 0.5) of normalized coords
cx, cy = 0.5, 0.5
tx = nx_disp - cx
ty = ny_disp - cy
rtx = tx * cosa - ty * sina
rty = tx * sina + ty * cosa
nx_orig_norm = rtx + cx
ny_orig_norm = rty + cy
logging.debug(
f"{log_prefix} Inverse rotation applied. "
f"Norm coords: ({nx_orig_norm:.4f}, {ny_orig_norm:.4f})"
)
# Convert normalized, un-rotated coords back to original pixel space
orig_x = max(0.0, min(nx_orig_norm * orig_w, orig_w - 1.0))
orig_y = max(0.0, min(ny_orig_norm * orig_h, orig_h - 1.0))
logging.debug(
f"{log_prefix} Calculated original pixel coords: ({orig_x:.2f}, {orig_y:.2f})"
)
# Geodetic Calculation (simplified approach)
# Pixel offset from reference point
pixel_delta_x = orig_x - ref_x
pixel_delta_y = ref_y - orig_y # y inverted in pixel space
# Convert pixel offset to meter offset
meters_delta_x = pixel_delta_x * scale_x # Easting offset
meters_delta_y = pixel_delta_y * scale_y # Northing offset
logging.debug(
f"{log_prefix} Offset (meters): dX={meters_delta_x:.1f} (E), "
f"dY={meters_delta_y:.1f} (N)"
)
# Approximate conversion from meters to degrees
M_PER_DLAT = 111132.954 # Approx meters per degree latitude
M_PER_DLON_EQ = (
111319.488 # Approx meters per degree longitude at equator
)
# Adjust longitude conversion based on latitude
m_per_dlon = max(abs(M_PER_DLON_EQ * math.cos(ref_lat_rad)), 1e-3)
# Calculate degree offsets
lat_offset_deg = meters_delta_y / M_PER_DLAT
lon_offset_deg = meters_delta_x / m_per_dlon
logging.debug(
f"{log_prefix} Offset (degrees): dLat={lat_offset_deg:.6f}, "
f"dLon={lon_offset_deg:.6f}"
)
# Calculate final coordinates
final_lat_deg = math.degrees(ref_lat_rad) + lat_offset_deg
final_lon_deg = math.degrees(ref_lon_rad) + lon_offset_deg
logging.debug(
f"{log_prefix} Final coords (dec deg): Lat={final_lat_deg:.6f}, "
f"Lon={final_lon_deg:.6f}"
)
# Format Output to DMS string if valid
lat_valid = (
math.isfinite(final_lat_deg) and abs(final_lat_deg) <= 90.0
)
lon_valid = (
math.isfinite(final_lon_deg) and abs(final_lon_deg) <= 180.0
)
if lat_valid and lon_valid:
lat_s = decimal_to_dms(final_lat_deg, is_latitude=True)
lon_s = decimal_to_dms(final_lon_deg, is_latitude=False)
# Check if DMS conversion itself failed
dms_failed = (
"Error" in lat_s
or "Invalid" in lat_s
or "Error" in lon_s
or "Invalid" in lon_s
)
if dms_failed:
logging.warning(f"{log_prefix} DMS conversion failed.")
lat_s, lon_s = "Error DMS", "Error DMS"
else:
logging.warning(
f"{log_prefix} Calculated coordinates out of range."
)
lat_s, lon_s = "Invalid Calc", "Invalid Calc"
except KeyError as ke:
logging.error(f"{log_prefix} Missing key in geo_info: {ke}")
lat_s, lon_s = "Error Key", "Error Key"
except Exception as calc_e:
logging.exception(f"{log_prefix} Geo calculation error:")
lat_s, lon_s = "Calc Error", "Calc Error"
# Queue Result (even if "N/A" or error strings)
result_payload = (lat_s, lon_s)
self.put_mouse_coordinates_queue(result_payload)
# Reschedule processor
if not self.state.shutting_down:
self._reschedule_queue_processor(self.process_mouse_queue, delay=50)
def put_mouse_coordinates_queue(self, coords_tuple: Tuple[str, str]):
"""Puts processed mouse coords tuple (DMS strings or status) onto Tkinter queue."""
log_prefix = "[App Mouse Queue Put]"
if self.state.shutting_down:
return
command = "MOUSE_COORDS"
payload = coords_tuple
logging.debug(
f"{log_prefix} Putting command '{command}' with payload {payload} onto tkinter_queue."
)
put_queue(
self.tkinter_queue,
(command, payload),
queue_name="tkinter",
app_instance=self, # Pass app instance for context
)
# --- Queue Processors ---
def process_sar_queue(self):
"""Gets processed SAR image from queue and displays it."""
log_prefix = "[App QProc SAR]"
if self.state.shutting_down:
return
image_to_display = None
try:
image_to_display = self.sar_queue.get(block=False)
self.sar_queue.task_done()
except queue.Empty:
pass # Normal case
except Exception as e:
logging.exception(f"{log_prefix} Error getting from SAR display queue:")
if image_to_display is not None:
logging.debug(f"{log_prefix} Dequeued SAR image. Calling DisplayManager...")
if hasattr(self, "display_manager") and self.display_manager:
try:
self.display_manager.show_sar_image(image_to_display)
except Exception as display_e:
logging.exception(
f"{log_prefix} Error calling DisplayManager.show_sar_image:"
)
else:
logging.error(f"{log_prefix} DisplayManager not available.")
# Reschedule processor
if not self.state.shutting_down:
self._reschedule_queue_processor(self.process_sar_queue)
def process_mfd_queue(self):
"""Gets processed MFD image from queue and displays it."""
log_prefix = "[App QProc MFD]"
if self.state.shutting_down:
return
image_to_display = None
try:
image_to_display = self.mfd_queue.get(block=False)
self.mfd_queue.task_done()
except queue.Empty:
pass # Normal case
except Exception as e:
logging.exception(f"{log_prefix} Error getting from MFD display queue:")
if image_to_display is not None:
logging.debug(f"{log_prefix} Dequeued MFD image. Calling DisplayManager...")
if hasattr(self, "display_manager") and self.display_manager:
try:
self.display_manager.show_mfd_image(image_to_display)
except Exception as display_e:
logging.exception(
f"{log_prefix} Error calling DisplayManager.show_mfd_image:"
)
else:
logging.error(f"{log_prefix} DisplayManager not available.")
# Reschedule processor
if not self.state.shutting_down:
self._reschedule_queue_processor(self.process_mfd_queue)
def process_tkinter_queue(self):
"""Processes commands (mouse coords, map updates, map redraw) from queue to update UI."""
log_prefix = "[App QProc Tkinter]"
if self.state.shutting_down:
return
item = None
try:
# Non-blocking get from queue
item = self.tkinter_queue.get(block=False)
self.tkinter_queue.task_done()
except queue.Empty:
pass # Normal case
except Exception as e:
logging.exception(f"{log_prefix} Error getting from Tkinter queue:")
item = None # Ensure item is None on error
if item is not None:
try:
# Expect items to be (command, payload) tuples
if isinstance(item, tuple) and len(item) == 2:
command, payload = item
logging.debug(
f"{log_prefix} Dequeued Command:'{command}', "
f"Payload Type:{type(payload)}"
)
if command == "MOUSE_COORDS":
self._handle_mouse_coords_update(payload)
elif command == "SHOW_MAP":
# Store last map PIL image in AppState before display
if Image is not None and isinstance(payload, Image.Image):
self.state.last_map_image_pil = payload.copy()
logging.debug(
f"{log_prefix} Stored last map image (PIL) in AppState."
)
elif payload is None:
# Clear state if None payload (e.g., initial placeholder)
self.state.last_map_image_pil = None
logging.debug(
f"{log_prefix} Cleared last map image in AppState "
"(payload was None)."
)
# Delegate display to handler
self._handle_show_map_update(payload)
elif command == "REDRAW_MAP":
logging.debug(f"{log_prefix} Handling REDRAW_MAP command.")
# Trigger display using the last known PIL image.
# MapIntegrationManager.display_map will handle applying current settings.
if self.state.last_map_image_pil:
logging.debug(
f"{log_prefix} Re-displaying using last stored map image as base."
)
self._handle_show_map_update(self.state.last_map_image_pil)
else:
# If no image is cached, try triggering a full update
logging.warning(
f"{log_prefix} REDRAW_MAP requested but no previous map image found. "
"Attempting full map update based on current SAR data."
)
self._trigger_map_update_from_sar()
else: # Unknown command
logging.warning(
f"{log_prefix} Unknown command received: {command}"
)
else: # Unexpected item type
logging.warning(
f"{log_prefix} Dequeued unexpected item type: {type(item)}"
)
except Exception as e:
logging.exception(
f"{log_prefix} Error processing dequeued Tkinter item:"
)
# Reschedule next check
if not self.state.shutting_down:
self._reschedule_queue_processor(self.process_tkinter_queue, delay=100)
def _trigger_map_update_from_sar(self):
"""Helper to trigger a full map update based on current SAR data."""
log_prefix = "[App Trigger Map Update]"
map_manager_exists = (
hasattr(self, "map_integration_manager")
and self.map_integration_manager is not None
)
# Check prerequisites
if (
self.state.shutting_down
or not config.ENABLE_MAP_OVERLAY
or not map_manager_exists
):
logging.debug(f"{log_prefix} Skipping map update trigger.")
return
geo_info = self.state.current_sar_geo_info
sar_data = self.state.current_sar_normalized # Use normalized for overlay
is_geo_valid = geo_info and geo_info.get("valid", False)
is_sar_valid = sar_data is not None and sar_data.size > 0
if is_geo_valid and is_sar_valid:
logging.debug(
f"{log_prefix} Triggering full map update using current SAR data..."
)
try:
# Directly call update_map_overlay. Assumes this process_tkinter_queue
# runs on the main thread, so direct call is safe.
self.map_integration_manager.update_map_overlay(sar_data, geo_info)
except Exception as e:
logging.exception(f"{log_prefix} Error during triggered map update:")
else:
logging.warning(
f"{log_prefix} Cannot trigger map update: Geo valid={is_geo_valid}, "
f"SAR valid={is_sar_valid}."
)
# Optionally queue a command to show a placeholder map
put_queue(
self.tkinter_queue,
("SHOW_MAP", None), # Command, None payload for placeholder
"tkinter",
self,
)
def _handle_mouse_coords_update(self, payload: Optional[Tuple[str, str]]):
"""Updates the mouse coordinates UI label."""
log_prefix = "[App QProc Tkinter]"
lat_s, lon_s = ("N/A", "N/A") # Default values
if payload is not None:
if isinstance(payload, tuple) and len(payload) == 2:
lat_s, lon_s = payload
else:
logging.warning(
f"{log_prefix} Invalid payload for MOUSE_COORDS: {type(payload)}"
)
lat_s, lon_s = ("Error", "Error")
try:
# Safely access control panel and method
control_panel_ref = getattr(self, "control_panel", None)
if control_panel_ref and hasattr(
control_panel_ref, "set_mouse_coordinates"
):
control_panel_ref.set_mouse_coordinates(lat_s, lon_s)
except tk.TclError as e:
if not self.state.shutting_down:
logging.warning(
f"{log_prefix} Error updating mouse coords UI (TclError): {e}"
)
except Exception as e:
logging.warning(f"{log_prefix} Error updating mouse coords UI: {e}")
def _handle_show_map_update(self, payload: Optional[Image.Image]):
"""Handles the SHOW_MAP/REDRAW_MAP command by delegating display to MapIntegrationManager."""
log_prefix = "[App QProc Tkinter]"
logging.debug(f"{log_prefix} Processing SHOW_MAP/REDRAW_MAP...")
if hasattr(self, "map_integration_manager") and self.map_integration_manager:
try:
# Delegate display call to the map manager
self.map_integration_manager.display_map(payload)
except Exception as e:
logging.exception(
f"{log_prefix} Error calling map_integration_manager.display_map:"
)
else:
logging.warning(
f"{log_prefix} Received map display command but MapIntegrationManager "
"not active."
)
def _reschedule_queue_processor(self, processor_func, delay: Optional[int] = None):
"""Helper method to reschedule a queue processor function using root.after."""
if delay is None:
# Determine default delay based on processor type
if processor_func in [self.process_sar_queue, self.process_mfd_queue]:
target_fps = config.MFD_FPS # Use MFD FPS for display queues?
calculated_delay = 1000 / (target_fps * 1.5) if target_fps > 0 else 20
delay = max(10, int(calculated_delay))
else:
delay = 100 # Default delay for other queues (e.g., tkinter, mouse)
try:
# Schedule only if root window exists
if self.root and self.root.winfo_exists():
self.root.after(delay, processor_func)
except Exception as e:
# Log error only if not shutting down
if not self.state.shutting_down:
logging.warning(
f"[App Rescheduler] Error rescheduling {processor_func.__name__}: {e}"
)
# --- Status Update ---
def update_status(self):
"""Updates status bar text and statistics labels periodically."""
log_prefix = "[App Status Update]"
if not hasattr(self, "state") or self.state.shutting_down:
return
# Check if map is still loading initially to avoid overwriting status
map_loading = False
try:
statusbar_ref = getattr(self, "statusbar", None)
if (
statusbar_ref
and statusbar_ref.winfo_exists()
and "Loading initial map" in statusbar_ref.cget("text")
):
map_loading = True
logging.debug(
f"{log_prefix} Skipping status update while initial map loads."
)
return
except Exception:
pass # Ignore errors checking status bar text
logging.debug(f"{log_prefix} Updating status bar and statistics labels...")
stats = self.state.get_statistics()
try:
# --- Format Status String Components ---
mode = (
"Test"
if self.state.test_mode_active
else ("Local" if config.USE_LOCAL_IMAGES else "Network")
)
map_active = (
hasattr(self, "map_integration_manager")
and self.map_integration_manager is not None
)
map_stat = " MapOn" if config.ENABLE_MAP_OVERLAY and map_active else ""
mfd_fps_str = f"MFD:{self.state.mfd_fps:.1f}fps"
sar_fps_str = (
f"SAR:{self.state.sar_fps:.1f}fps"
if self.state.sar_fps > 0
else "SAR:N/A"
)
status_prefix = f"Status: {mode}{map_stat}"
status_info = f"{mfd_fps_str} | {sar_fps_str}"
full_status = f"{status_prefix} | {status_info}"
# --- Format Statistics Strings ---
drop_txt = (
f"Drop(Q): S={stats['dropped_sar_q']},M={stats['dropped_mfd_q']},"
f"Tk={stats['dropped_tk_q']},Mo={stats['dropped_mouse_q']}"
)
incmpl_txt = f"Incmpl(RX): S={stats['incomplete_sar_rx']},M={stats['incomplete_mfd_rx']}"
logging.debug(f"{log_prefix} Formatted status strings.")
# --- Schedule UI updates via after_idle ---
if self.root and self.root.winfo_exists():
# Update status bar
statusbar_ref = getattr(self, "statusbar", None)
if statusbar_ref and statusbar_ref.winfo_exists():
self.root.after_idle(statusbar_ref.set_status_text, full_status)
# Update statistics labels in control panel
control_panel_ref = getattr(self, "control_panel", None)
if control_panel_ref:
# Update dropped label
dropped_label_ref = getattr(
control_panel_ref, "dropped_label", None
)
if dropped_label_ref and dropped_label_ref.winfo_exists():
self.root.after_idle(
dropped_label_ref.config, {"text": drop_txt}
)
# Update incomplete label
incomplete_label_ref = getattr(
control_panel_ref, "incomplete_label", None
)
if incomplete_label_ref and incomplete_label_ref.winfo_exists():
self.root.after_idle(
incomplete_label_ref.config, {"text": incmpl_txt}
)
except tk.TclError as e:
if not self.state.shutting_down:
logging.warning(f"{log_prefix} TclError updating status UI: {e}")
except Exception as e:
if not self.state.shutting_down:
logging.exception(f"{log_prefix} Error formatting/updating status UI:")
# --- Cleanup ---
def close_app(self):
"""Performs graceful shutdown of the application and its components."""
log_prefix = "[App Shutdown]"
if hasattr(self, "state") and self.state.shutting_down:
logging.warning(f"{log_prefix} Close already initiated.")
return
if not hasattr(self, "state"):
# Log critical error and exit if state missing during shutdown attempt
logging.error(f"{log_prefix} Cannot shutdown: AppState not found.")
sys.exit(1)
logging.info(f"{log_prefix} Starting shutdown sequence...")
self.state.shutting_down = True
try:
# Attempt final status update
self.set_status("Closing...")
except Exception:
pass # Ignore errors during final status update
# Stop test mode timers
logging.debug(f"{log_prefix} Stopping TestModeManager timers...")
if hasattr(self, "test_mode_manager") and self.test_mode_manager:
self.test_mode_manager.stop_timers()
# Shutdown map integration
logging.debug(f"{log_prefix} Shutting down MapIntegrationManager...")
if hasattr(self, "map_integration_manager") and self.map_integration_manager:
self.map_integration_manager.shutdown()
logging.debug(f"{log_prefix} Signalling periodic updates/processors to stop.")
# Close network resources
logging.debug(f"{log_prefix} Closing UDP socket...")
if self.udp_socket:
close_udp_socket(self.udp_socket)
self.udp_socket = None
# Wait for receiver thread
if self.udp_thread and self.udp_thread.is_alive():
logging.debug(f"{log_prefix} Waiting for UDP receiver thread...")
self.udp_thread.join(timeout=0.5) # Wait briefly
if self.udp_thread.is_alive():
logging.warning(
f"{log_prefix} UDP receiver thread did not exit cleanly."
)
else:
logging.debug(f"{log_prefix} UDP receiver thread exited.")
# Shutdown worker pool
worker_pool = None
if hasattr(self, "udp_receiver") and self.udp_receiver:
worker_pool = getattr(self.udp_receiver, "executor", None)
if worker_pool:
logging.info(f"{log_prefix} Shutting down worker pool...")
try:
# Cancel pending futures and don't wait for running ones
worker_pool.shutdown(wait=False, cancel_futures=True)
except Exception as e:
logging.exception(
f"{log_prefix} Exception during worker_pool shutdown: {e}"
)
# Destroy display windows
logging.debug(f"{log_prefix} Destroying display windows via DisplayManager...")
if hasattr(self, "display_manager"):
self.display_manager.destroy_windows()
# Small waitKey for OpenCV window handling
try:
logging.debug(f"{log_prefix} Final cv2.waitKey(5)...")
cv2.waitKey(5)
except Exception as e:
logging.warning(f"{log_prefix} Error during final cv2.waitKey: {e}")
# Destroy Tkinter window
logging.info(f"{log_prefix} Requesting Tkinter root window destruction...")
try:
if self.root and self.root.winfo_exists():
self.root.destroy()
logging.info(f"{log_prefix} Tkinter root window destroyed.")
except Exception as e:
logging.exception(f"{log_prefix} Error destroying Tkinter window:")
logging.info("-----------------------------------------")
logging.info(f"{log_prefix} Application close sequence finished.")
logging.info("-----------------------------------------")
sys.exit(0) # Ensure clean exit
# --- Main Execution Block ---
if __name__ == "__main__":
main_log_prefix = "[App Main]"
root = None
# Use a distinct name for the application instance variable
app_instance = None
try:
# Check map dependencies early
if config.ENABLE_MAP_OVERLAY and not MAP_MODULES_LOADED:
logging.critical(
f"{main_log_prefix} Map Overlay enabled but modules failed to load. "
"Cannot start."
)
sys.exit(1)
logging.debug(f"{main_log_prefix} Creating main Tkinter window...")
# Create window (icon setting is handled inside ControlPanelApp init)
root = create_main_window(
title="Control Panel",
min_width=config.TKINTER_MIN_WIDTH,
min_height=config.TKINTER_MIN_HEIGHT,
x_pos=10, # Initial position
y_pos=10,
)
logging.debug(f"{main_log_prefix} Main Tkinter window created.")
logging.debug(f"{main_log_prefix} Initializing App class (ControlPanelApp)...")
# Instantiate the main application class
app_instance = ControlPanelApp(root)
logging.debug(f"{main_log_prefix} App class initialized.")
# Set close protocol to call the instance's close method
root.protocol("WM_DELETE_WINDOW", app_instance.close_app)
logging.debug(f"{main_log_prefix} WM_DELETE_WINDOW protocol set.")
logging.info(f"{main_log_prefix} Starting Tkinter main event loop...")
root.mainloop()
logging.info(f"{main_log_prefix} Tkinter main event loop finished.")
except SystemExit as exit_e:
# Handle clean exit vs error exit
if exit_e.code == 0:
logging.info(f"{main_log_prefix} Application exited normally.")
else:
logging.warning(
f"{main_log_prefix} Application exited with error code {exit_e.code}."
)
except Exception as e:
# Log critical unhandled exceptions
logging.critical(
f"{main_log_prefix} UNHANDLED EXCEPTION OCCURRED:", exc_info=True
)
# Attempt emergency cleanup if app was partially initialized
if (
app_instance
and hasattr(app_instance, "state")
and not app_instance.state.shutting_down
):
logging.error(f"{main_log_prefix} Attempting emergency cleanup...")
try:
app_instance.close_app()
except SystemExit:
pass # Ignore SystemExit during cleanup
except Exception as cleanup_e:
logging.exception(
f"{main_log_prefix} Error during emergency cleanup: {cleanup_e}"
)
sys.exit(1) # Exit with error code after logging
finally:
# Final cleanup actions regardless of success or failure
logging.info(f"{main_log_prefix} Application finally block reached.")
logging.debug(
f"{main_log_prefix} Final check: Destroying any remaining OpenCV windows..."
)
try:
cv2.destroyAllWindows()
except Exception as cv_err:
logging.warning(
f"{main_log_prefix} Exception during final cv2.destroyAllWindows(): {cv_err}"
)
logging.info("================ Application End ================")
logging.shutdown() # Ensure logging resources are released