2372 lines
101 KiB
Python
2372 lines
101 KiB
Python
# --- START OF FILE app.py ---
|
|
|
|
# app.py

"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.

Main application module for the Control Panel application.

Orchestrates UI, display, network reception, image processing pipeline,
test mode management, map integration, and state management.
Initializes all sub-modules and manages the main application lifecycle.
"""
|
|
|
|
# --- Standard library imports ---
|
|
import threading
|
|
import time
|
|
import queue
|
|
import os
|
|
import logging
|
|
import math
|
|
import sys
|
|
import socket # Required for network setup
|
|
from typing import Optional, Tuple, Any, Dict, TYPE_CHECKING
|
|
import datetime
|
|
|
|
# --- Third-party imports ---
|
|
import tkinter as tk
|
|
from tkinter import ttk
|
|
from tkinter import colorchooser
|
|
import numpy as np
|
|
import cv2 # Keep for potential utility/fallback use
|
|
import screeninfo
|
|
|
|
try:
|
|
from PIL import Image # Needed for map image state and type hinting
|
|
except ImportError:
|
|
Image = None # Define as None if Pillow not installed
|
|
logging.critical(
|
|
"[App Init] Pillow library not found. Map/Image functionality will fail."
|
|
)
|
|
|
|
|
|
# --- Configuration Import ---
|
|
import config
|
|
|
|
# --- Logging Setup ---
|
|
# Import and call the setup function from the dedicated module
|
|
try:
|
|
from logging_config import setup_logging
|
|
|
|
# Configure logging as early as possible
|
|
setup_logging()
|
|
except ImportError:
|
|
# Fallback basic configuration if logging_config fails
|
|
print("ERROR: logging_config.py not found. Using basic logging.")
|
|
logging.basicConfig(
|
|
level=logging.WARNING,
|
|
format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
|
|
)
|
|
|
|
# --- Application Modules Import ---
|
|
from ui import ControlPanel, StatusBar, create_main_window
|
|
from display import DisplayManager
|
|
|
|
# Import specific utils as needed
|
|
from utils import (
|
|
put_queue,
|
|
clear_queue,
|
|
decimal_to_dms,
|
|
generate_sar_kml,
|
|
launch_google_earth,
|
|
cleanup_old_kml_files,
|
|
) # Added cleanup
|
|
from network import create_udp_socket, close_udp_socket
|
|
from receiver import UdpReceiver
|
|
from app_state import AppState # Centralized state
|
|
from test_mode_manager import TestModeManager # Manages test mode logic
|
|
from image_pipeline import ImagePipeline # Manages normal image processing
|
|
|
|
# Import image processing functions only if needed directly in App
|
|
from image_processing import (
|
|
load_image,
|
|
normalize_image,
|
|
) # normalize_image needed for set_initial_sar_image
|
|
|
|
|
|
# --- Map related imports (Conditional) ---
|
|
# Check if map modules are present before attempting specific imports
|
|
map_libs_found = True
|
|
try:
|
|
# Core map libs needed by manager/utils
|
|
import mercantile
|
|
import pyproj
|
|
|
|
# Pillow already checked above
|
|
if Image is None:
|
|
raise ImportError("Pillow library failed to import earlier.")
|
|
except ImportError as map_lib_err:
|
|
logging.warning(
|
|
f"[App Init] Failed to import core map library ({map_lib_err}). Map functionality disabled."
|
|
)
|
|
map_libs_found = False
|
|
# Define placeholders for type hinting if core libs failed
|
|
BaseMapService = None # type: ignore
|
|
MapTileManager = None # type: ignore
|
|
MapDisplayWindow = None # type: ignore
|
|
MapIntegrationManager = None # type: ignore
|
|
MapCalculationError = Exception
|
|
|
|
if map_libs_found:
|
|
try:
|
|
from map_services import get_map_service, BaseMapService
|
|
from map_manager import MapTileManager
|
|
from map_utils import (
|
|
get_bounding_box_from_center_size,
|
|
get_tile_ranges_for_bbox,
|
|
MapCalculationError,
|
|
)
|
|
from map_display import MapDisplayWindow
|
|
|
|
# Import the integration manager
|
|
from map_integration import MapIntegrationManager
|
|
|
|
MAP_MODULES_LOADED = True
|
|
except ImportError as map_import_err:
|
|
logging.warning(
|
|
f"[App Init] Failed to import specific map modules ({map_import_err}). Map functionality disabled."
|
|
)
|
|
MAP_MODULES_LOADED = False
|
|
# Define placeholders if specific modules failed
|
|
BaseMapService = None # type: ignore
|
|
MapTileManager = None # type: ignore
|
|
MapDisplayWindow = None # type: ignore
|
|
MapIntegrationManager = None # type: ignore
|
|
MapCalculationError = Exception
|
|
else:
|
|
MAP_MODULES_LOADED = False
|
|
|
|
# Type checking block for App class reference in managers
|
|
if TYPE_CHECKING:
|
|
# This avoids circular imports at runtime but helps type checkers
|
|
pass # No direct import needed here as other modules import App
|
|
|
|
|
|
# --- Main Application Class ---
|
|
class App:
|
|
"""
|
|
Main application class. Manages UI, display, processing, network, state,
|
|
and orchestrates various managers (Test Mode, Image Pipeline, Map Integration).
|
|
"""
|
|
|
|
# --- Methods DEFINED BEFORE __init__ to be available for bindings ---
|
|
|
|
# --- Status Update Method ---
|
|
def set_status(self, message: str):
    """
    Request a new main status message for the status bar.

    The status bar text is rebuilt as ``Status: <message>`` followed by the
    ' | '-separated segments the bar already carried after its first '|'.
    The widget update is not performed here: it is handed to
    ``root.after_idle`` so that it runs from the Tk event loop. The request
    is dropped silently when the application state is missing or flagged as
    shutting down.

    Args:
        message: Text shown after the "Status: " label.
    """
    log_prefix = "[App Set Status]"
    # Nothing to do before AppState exists or once shutdown has started.
    if not hasattr(self, "state") or self.state.shutting_down:
        return

    status_head = f"Status: {message}"
    # INFO level: this is a user-visible status change.
    logging.info(f"{log_prefix} Request to set status message prefix: '{message}'")

    def _apply_on_gui_thread():
        """Rewrite the status bar text; executed through after_idle."""
        # The state may have changed between scheduling and execution.
        if not hasattr(self, "state") or self.state.shutting_down:
            return
        try:
            bar_usable = (
                hasattr(self, "statusbar")
                and isinstance(self.statusbar, tk.Widget)
                and self.statusbar.winfo_exists()
            )
            if not bar_usable:
                logging.warning(
                    f"{log_prefix} Statusbar widget not available, cannot update status."
                )
                return

            # Keep every non-empty segment that followed the first '|'.
            existing_text: str = self.statusbar.cget("text")
            kept_segments = [
                seg.strip() for seg in existing_text.split("|")[1:] if seg.strip()
            ]
            tail = " | " + " | ".join(kept_segments) if kept_segments else ""

            final_text = f"{status_head}{tail}"
            logging.debug(
                f"{log_prefix} Updating status bar text to: '{final_text}'"
            )
            self.statusbar.set_status_text(final_text)

        except tk.TclError as e:
            # A TclError here usually means the widget is gone; only worth a
            # warning when we are not shutting down.
            if not self.state.shutting_down:
                logging.warning(f"{log_prefix} TclError setting status text: {e}")
        except Exception:
            logging.exception(f"{log_prefix} Error updating status bar text:")

    # Hand the update over to the Tk event loop.
    try:
        if hasattr(self, "root") and self.root and self.root.winfo_exists():
            self.root.after_idle(_apply_on_gui_thread)
        # A missing/destroyed root is expected during shutdown: stay quiet.
    except Exception as e:
        if not hasattr(self, "state") or not self.state.shutting_down:
            logging.warning(
                f"{log_prefix} Error scheduling status update via after_idle: {e}"
            )
|
|
|
|
# --- LUT Generation Methods ---
|
|
def update_brightness_contrast_lut(self):
    """
    Rebuild the SAR brightness/contrast lookup table and store it in AppState.

    Reads ``state.sar_contrast`` (floored at 0.01) and
    ``state.sar_brightness``, maps every input level ``v`` in 0..255 to
    ``round(v * contrast + brightness)`` clipped to 0..255, and writes the
    resulting 256-entry uint8 array to ``state.brightness_contrast_lut``.
    If the calculation itself fails, an identity table is stored instead.
    """
    log_prefix = "[App Update SAR LUT]"
    logging.debug(f"{log_prefix} Updating SAR Brightness/Contrast LUT...")

    if not hasattr(self, "state"):
        logging.error(f"{log_prefix} AppState not ready for LUT update.")
        return

    # Fetch the two parameters; bail out without touching the stored LUT
    # when they cannot be read.
    try:
        gain = max(0.01, self.state.sar_contrast)  # never let contrast reach 0
        offset = self.state.sar_brightness
    except AttributeError:
        logging.error(
            f"{log_prefix} Error accessing state for SAR LUT parameters (AttributeError)."
        )
        return
    except Exception as e:
        logging.error(
            f"{log_prefix} Unexpected error accessing state for SAR LUT params: {e}"
        )
        return

    # Vectorised evaluation over all 256 input levels.
    try:
        levels = np.arange(256, dtype=np.float32)
        table = np.clip(np.round((levels * gain) + offset), 0, 255).astype(np.uint8)
    except Exception:
        logging.exception(f"{log_prefix} Error calculating SAR B/C LUT:")
        # Identity mapping keeps the display usable.
        self.state.brightness_contrast_lut = np.arange(256, dtype=np.uint8)
        logging.error(
            f"{log_prefix} Using identity SAR LUT as fallback due to calculation error."
        )
        return

    self.state.brightness_contrast_lut = table
    logging.debug(f"{log_prefix} SAR B/C LUT updated successfully in AppState.")
|
|
|
|
def update_mfd_lut(self):
    """
    Rebuild the MFD colour lookup table and store it in AppState.

    Reads ``state.mfd_params`` ("raw_map_intensity", "pixel_to_category",
    "categories") and produces a (256, 3) uint8 table:

    * an index mapped to a category gets that category's "color" scaled by
      its "intensity" / 255;
    * any other index in 32..255 gets a gray level stretched from 32..255 to
      0..255 and scaled by "raw_map_intensity" / 255;
    * remaining indices stay black (a warning is logged when no category is
      assigned).

    The table is written to ``state.mfd_lut``. If building it fails,
    ``_apply_fallback_mfd_lut`` is called instead.
    """
    log_prefix = "[MFD LUT Update]"
    logging.debug(f"{log_prefix} Recalculating MFD Color LUT...")

    if not hasattr(self, "state"):
        logging.error(f"{log_prefix} AppState not ready for MFD LUT update.")
        return

    # Pull the parameters out of AppState; give up if any piece is missing.
    try:
        params = self.state.mfd_params
        raw_scale = params["raw_map_intensity"] / 255.0
        index_to_category = params["pixel_to_category"]
        category_table = params["categories"]
    except AttributeError:
        logging.error(
            f"{log_prefix} Error accessing mfd_params state (AttributeError)."
        )
        return
    except KeyError as ke:
        logging.error(f"{log_prefix} Missing key in AppState mfd_params: {ke}")
        return
    except Exception as e:
        logging.error(
            f"{log_prefix} Unexpected error accessing state for MFD LUT params: {e}"
        )
        return

    # Start from an all-black table; rows are written channel by channel in
    # the order the category colour is stored (BGR per the original notes).
    table = np.zeros((256, 3), dtype=np.uint8)

    try:
        for pixel in range(256):
            label = index_to_category.get(pixel)

            if label:
                # Categorised pixel: category colour scaled by its intensity.
                entry = category_table[label]
                colour = entry["color"]
                scale = entry["intensity"] / 255.0
                for channel in range(3):
                    scaled = float(colour[channel]) * scale
                    table[pixel, channel] = np.clip(int(round(scaled)), 0, 255)
            elif pixel >= 32:
                # Raw map pixel: stretch 32..255 onto 0..255, then scale.
                stretched = (float(pixel) - 32.0) * (255.0 / 223.0)
                gray = stretched * raw_scale
                table[pixel, :] = int(round(np.clip(gray, 0, 255)))
            elif label is None:
                # Index below 32 without a category: row stays [0, 0, 0].
                logging.warning(
                    f"{log_prefix} Index {pixel} has no assigned category. Defaulting to black."
                )

        self.state.mfd_lut = table
        logging.debug(
            f"{log_prefix} MFD LUT update complete and stored in AppState."
        )

    except KeyError as ke:
        logging.error(
            f"{log_prefix} Missing category key '{ke}' during MFD LUT generation."
        )
        self._apply_fallback_mfd_lut()
    except Exception:
        logging.critical(
            f"{log_prefix} CRITICAL error during MFD LUT generation:", exc_info=True
        )
        self._apply_fallback_mfd_lut()
|
|
|
|
def _apply_fallback_mfd_lut(self):
    """
    Store a plain grayscale ramp as the MFD LUT after a LUT-generation error.

    Row ``i`` of the (256, 3) uint8 table written to ``state.mfd_lut`` is
    ``[i, i, i]``. The ramp is built with NumPy only, so this last-resort
    path does not depend on OpenCV's colour conversion being available or
    working. If even that fails, an all-black table is stored so that
    ``state.mfd_lut`` is always a valid array. Does nothing (beyond logging)
    when AppState does not exist yet.
    """
    log_prefix = "[MFD LUT Update]"
    logging.error(
        f"{log_prefix} Applying fallback grayscale MFD LUT due to previous errors."
    )
    if not hasattr(self, "state"):
        return

    try:
        gray_ramp = np.arange(256, dtype=np.uint8)
        # Replicate the single gray column into three identical channels.
        self.state.mfd_lut = np.repeat(gray_ramp[:, np.newaxis], 3, axis=1)
    except Exception as fallback_e:
        logging.critical(
            f"{log_prefix} Failed even to create fallback MFD LUT: {fallback_e}"
        )
        # Ensure the state LUT is still a valid array.
        self.state.mfd_lut = np.zeros((256, 3), dtype=np.uint8)
|
|
|
|
# --- UI Callback Methods ---
|
|
def update_image_mode(self):  # UI Callback
    """
    Switch between Test Mode and Normal Mode according to the UI checkbox.

    The requested mode is read from ``control_panel.test_image_var`` (value 1
    means test mode); when that variable is missing or not a Tk variable the
    current ``state.test_mode_active`` is used, which results in no change.
    On an actual change the state flag is updated first, the TestModeManager
    is activated or deactivated, the matching UI actions run, statistics are
    reset and the status bar is refreshed.
    """
    log_prefix = "[App Mode Switch]"
    if not (hasattr(self, "state") and hasattr(self, "test_mode_manager")):
        logging.error(
            f"{log_prefix} State or TestModeManager not initialized. Cannot switch mode."
        )
        return
    if self.state.shutting_down:
        return  # callbacks are ignored during shutdown

    try:
        panel = self.control_panel
        if hasattr(panel, "test_image_var") and isinstance(
            panel.test_image_var, tk.Variable
        ):
            wants_test = panel.test_image_var.get() == 1
        else:
            logging.warning(
                f"{log_prefix} test_image_var not found or invalid in control_panel."
            )
            wants_test = self.state.test_mode_active  # fall back to current mode

        if wants_test == self.state.test_mode_active:
            logging.debug(
                f"{log_prefix} Requested mode ({wants_test}) is the same as current mode. No change."
            )
            return

        logging.info(
            f"{log_prefix} Request to change Test Mode state to: {wants_test}"
        )
        # The flag is flipped before the manager is touched.
        self.state.test_mode_active = wants_test

        if self.state.test_mode_active:
            if self.test_mode_manager.activate():
                self.activate_test_mode_ui_actions()
            else:
                logging.error(
                    f"{log_prefix} TestModeManager activation failed. Reverting UI and state."
                )
                self._revert_test_mode_ui()
        else:
            self.test_mode_manager.deactivate()
            self.deactivate_test_mode_ui_actions()

        # NOTE(review): these two calls also run after a failed activation,
        # although the original comment says statistics are reset when the
        # mode "successfully changes" - confirm which is intended.
        self.state.reset_statistics()
        self.update_status()

    except tk.TclError as e:
        logging.warning(
            f"{log_prefix} UI error accessing checkbox state (TclError): {e}"
        )
    except AttributeError as ae:
        logging.error(
            f"{log_prefix} Missing attribute during mode update (manager init issue?): {ae}"
        )
    except Exception:
        logging.exception(f"{log_prefix} Unexpected error during mode update:")
|
|
|
|
def update_sar_size(self, event=None):  # UI Callback
    """
    React to a change of the SAR size combobox.

    The selected entry is parsed as "1:N". The SAR display size becomes
    ``config.SAR_WIDTH // N`` by ``config.SAR_HEIGHT // N`` (at least one
    pixel each); when that differs from the size held in AppState, the state
    is updated and a SAR refresh is triggered. The size may be changed even
    while the map overlay is active.

    Args:
        event: Optional event object from the combobox binding (not used).
    """
    log_prefix = "[App CB SAR Size]"
    if not hasattr(self, "state") or self.state.shutting_down:
        logging.debug(f"{log_prefix} Shutdown or state not ready. Ignoring.")
        return

    try:
        combo_missing = not hasattr(self, "control_panel") or not hasattr(
            self.control_panel, "sar_size_combo"
        )
        if combo_missing:
            logging.warning(f"{log_prefix} SAR size combobox not found.")
            return

        combo = self.control_panel.sar_size_combo
        selected_size_str = combo.get()
        logging.debug(
            f"{log_prefix} SAR display size selected: '{selected_size_str}'"
        )

        factor = 1  # used when the text carries no "1:N" factor
        if selected_size_str and ":" in selected_size_str:
            try:
                factor = int(selected_size_str.split(":")[1])
                if factor <= 0:
                    raise ValueError("Factor must be positive")
            except (IndexError, ValueError) as parse_err:
                logging.warning(
                    f"{log_prefix} Invalid SAR size factor: '{selected_size_str}'. Error: {parse_err}. Using factor 1."
                )
                factor = 1
                # Best effort: show the size currently held in AppState.
                # NOTE(review): factor 1 is still applied below, so the
                # combobox and the state can disagree afterwards - confirm
                # whether that is intended.
                try:
                    shown_width = self.state.sar_display_width
                    current_factor = (
                        config.SAR_WIDTH // shown_width if shown_width > 0 else 1
                    )
                    current_size_str = f"1:{current_factor}"
                    if current_size_str not in config.SAR_SIZE_FACTORS:
                        current_size_str = config.DEFAULT_SAR_SIZE
                    combo.set(current_size_str)
                except Exception as reset_e:
                    logging.warning(
                        f"{log_prefix} Failed to reset SAR size combobox UI: {reset_e}"
                    )

        new_width = max(1, config.SAR_WIDTH // factor)
        new_height = max(1, config.SAR_HEIGHT // factor)

        size_unchanged = (
            new_width == self.state.sar_display_width
            and new_height == self.state.sar_display_height
        )
        if size_unchanged:
            logging.debug(
                f"{log_prefix} Selected size {new_width}x{new_height} is the same as current."
            )
        else:
            logging.info(
                f"{log_prefix} Requesting SAR display size update to {new_width}x{new_height} (Factor 1:{factor})"
            )
            self.state.update_sar_display_size(new_width, new_height)
            self._trigger_sar_update()

    except tk.TclError as e:
        if not self.state.shutting_down:
            logging.warning(
                f"{log_prefix} TclError accessing SAR size combobox: {e}"
            )
    except Exception:
        if not self.state.shutting_down:
            logging.exception(f"{log_prefix} Error processing SAR size update:")
|
|
|
|
def update_contrast(self, value_str: str):  # UI Callback
    """
    Handle a move of the SAR contrast slider.

    Converts the slider text to float, stores it through
    ``state.update_sar_parameters(contrast=...)``, rebuilds the
    brightness/contrast LUT and requests a SAR display refresh. Ignored
    during shutdown; a value that cannot be converted is logged and dropped.

    Args:
        value_str: Slider value as delivered by the widget.
    """
    log_prefix = "[App CB SAR Contrast]"
    if self.state.shutting_down:
        return
    try:
        self.state.update_sar_parameters(contrast=float(value_str))
        self.update_brightness_contrast_lut()
        self._trigger_sar_update()
    except ValueError:
        logging.warning(
            f"{log_prefix} Invalid contrast value received: {value_str}"
        )
    except Exception as e:
        logging.exception(f"{log_prefix} Error updating contrast: {e}")
|
|
|
|
def update_brightness(self, value_str: str):  # UI Callback
    """
    Handle a move of the SAR brightness slider.

    Converts the slider text to an integer (via float, so "12.7" becomes
    12), stores it through ``state.update_sar_parameters(brightness=...)``,
    rebuilds the brightness/contrast LUT and requests a SAR display refresh.
    Ignored during shutdown; a value that cannot be converted is logged and
    dropped.

    Args:
        value_str: Slider value as delivered by the widget.
    """
    log_prefix = "[App CB SAR Brightness]"
    if self.state.shutting_down:
        return
    try:
        # float() first: the widget may deliver text such as "12.0".
        self.state.update_sar_parameters(brightness=int(float(value_str)))
        self.update_brightness_contrast_lut()
        self._trigger_sar_update()
    except ValueError:
        logging.warning(
            f"{log_prefix} Invalid brightness value received: {value_str}"
        )
    except Exception as e:
        logging.exception(f"{log_prefix} Error updating brightness: {e}")
|
|
|
|
def update_sar_palette(self, event=None):  # UI Callback
    """
    Handle a selection in the SAR palette combobox.

    A palette name listed in ``config.COLOR_PALETTES`` is stored through
    ``state.update_sar_parameters(palette=...)`` and a SAR refresh is
    requested. Any other name is rejected and the combobox is set back to
    ``state.sar_palette``. Ignored during shutdown.

    Args:
        event: Optional event object from the combobox binding (not used).
    """
    log_prefix = "[App CB SAR Palette]"
    if self.state.shutting_down:
        return
    try:
        combo = self.control_panel.palette_combo
        palette = combo.get()
        logging.debug(f"{log_prefix} Palette changed to '{palette}'")
        if palette not in config.COLOR_PALETTES:
            logging.warning(
                f"{log_prefix} Unknown palette selected: '{palette}'. Ignoring."
            )
            combo.set(self.state.sar_palette)  # put the UI back in sync
            return
        self.state.update_sar_parameters(palette=palette)
        self._trigger_sar_update()
    except Exception as e:
        logging.exception(f"{log_prefix} Error updating SAR palette: {e}")
|
|
|
|
def update_mfd_category_intensity(
    self, category_name: str, intensity_value: int
):  # UI Callback
    """
    Handle a move of an MFD category intensity slider.

    The value is clipped to 0..255 and written to
    ``state.mfd_params["categories"][category_name]["intensity"]``; the MFD
    LUT is then rebuilt and an MFD refresh requested. Unknown categories are
    logged and ignored. Ignored during shutdown.

    Args:
        category_name: Key of the category inside mfd_params["categories"].
        intensity_value: Requested intensity (an int coming from an IntVar).
    """
    log_prefix = "[App CB MFD Param Intensity]"
    if self.state.shutting_down:
        return

    logging.debug(
        f"{log_prefix} Category='{category_name}', Intensity={intensity_value}"
    )
    try:
        intensity = np.clip(intensity_value, 0, 255)
        categories = self.state.mfd_params["categories"]
        if category_name not in categories:
            logging.warning(
                f"{log_prefix} Unknown MFD category received: '{category_name}'"
            )
            return

        categories[category_name]["intensity"] = intensity
        logging.debug(
            f"{log_prefix} Intensity for '{category_name}' set to {intensity} in AppState."
        )
        self.update_mfd_lut()
        self._trigger_mfd_update()
    except KeyError as ke:
        logging.error(
            f"{log_prefix} KeyError accessing MFD params for '{category_name}': {ke}"
        )
    except Exception as e:
        logging.exception(
            f"{log_prefix} Error updating MFD intensity for '{category_name}': {e}"
        )
|
|
|
|
def choose_mfd_category_color(self, category_name: str):  # UI Callback
    """
    Let the user pick a new colour for an MFD category.

    Opens the Tk colour chooser pre-set to the category's current colour
    (stored in B, G, R order). When a colour is chosen it is stored back in
    ``state.mfd_params["categories"][category_name]["color"]`` as a BGR
    tuple, the MFD LUT is rebuilt, the control panel's colour preview is
    scheduled for update (if it offers ``update_mfd_color_display``) and an
    MFD refresh is requested. Unknown categories and a cancelled dialog
    change nothing. Ignored during shutdown.

    Args:
        category_name: Key of the category inside mfd_params["categories"].
    """
    log_prefix = "[App CB MFD Param Color]"
    if self.state.shutting_down:
        return

    logging.debug(
        f"{log_prefix} Color chooser requested for Category='{category_name}'"
    )
    if category_name not in self.state.mfd_params["categories"]:
        logging.warning(
            f"{log_prefix} Cannot choose color for unknown MFD category: '{category_name}'"
        )
        return

    try:
        initial_bgr = self.state.mfd_params["categories"][category_name]["color"]
        # The chooser expects "#rrggbb", so the stored order is reversed.
        initial_hex = "#{:02x}{:02x}{:02x}".format(
            initial_bgr[2], initial_bgr[1], initial_bgr[0]
        )
        logging.debug(
            f"{log_prefix} Opening color chooser (initial BGR: {initial_bgr}, initial HEX: {initial_hex})"
        )

        color_code = colorchooser.askcolor(
            title=f"Select Color for {category_name}", initialcolor=initial_hex
        )

        picked = bool(color_code and color_code[0])
        if not picked:
            logging.debug(f"{log_prefix} Color selection cancelled.")
            return

        rgb = color_code[0]
        # RGB from the dialog -> BGR for the state, each channel in 0..255.
        new_bgr = tuple(np.clip(int(rgb[pos]), 0, 255) for pos in (2, 1, 0))
        logging.info(
            f"{log_prefix} New color chosen for '{category_name}': RGB={rgb}, BGR={new_bgr}"
        )

        self.state.mfd_params["categories"][category_name]["color"] = new_bgr
        self.update_mfd_lut()

        # Refresh the colour preview from the Tk event loop.
        can_refresh_preview = (
            self.root
            and self.root.winfo_exists()
            and hasattr(self.control_panel, "update_mfd_color_display")
        )
        if can_refresh_preview:
            self.root.after_idle(
                self.control_panel.update_mfd_color_display,
                category_name,
                new_bgr,
            )

        self._trigger_mfd_update()

    except KeyError as ke:
        logging.error(
            f"{log_prefix} KeyError accessing MFD params for '{category_name}': {ke}"
        )
    except Exception as e:
        logging.exception(
            f"{log_prefix} Error during color selection for '{category_name}': {e}"
        )
|
|
|
|
def update_mfd_raw_map_intensity(self, intensity_value: int):  # UI Callback
    """
    Handle a move of the Raw Map intensity slider.

    The value is clipped to 0..255 and written to
    ``state.mfd_params["raw_map_intensity"]``; the MFD LUT is then rebuilt
    and an MFD refresh requested. Ignored during shutdown.

    Args:
        intensity_value: Requested raw-map intensity.
    """
    log_prefix = "[App CB MFD Param RawMap]"
    if self.state.shutting_down:
        return

    logging.debug(
        f"{log_prefix} Raw Map intensity slider changed -> {intensity_value}"
    )
    try:
        clipped = np.clip(intensity_value, 0, 255)
        self.state.mfd_params["raw_map_intensity"] = clipped
        logging.info(
            f"{log_prefix} Raw Map intensity set to {clipped} in AppState."
        )
        self.update_mfd_lut()
        self._trigger_mfd_update()
    except KeyError as ke:
        logging.error(
            f"{log_prefix} KeyError accessing MFD params for raw_map_intensity: {ke}"
        )
    except Exception as e:
        logging.exception(f"{log_prefix} Error updating raw map intensity: {e}")
|
|
|
|
# --- NEW UI Callback Method ---
|
|
def update_map_size(self, event=None):  # UI Callback for Map Size
    """
    Handle a selection in the Map size combobox.

    Passes the selected text to ``state.update_map_scale_factor`` and then
    queues a ``("REDRAW_MAP", None)`` command on the tkinter queue so the
    map is redrawn. Ignored when AppState is missing or during shutdown.

    Args:
        event: Optional event object from the combobox binding (not used).
    """
    log_prefix = "[App CB Map Size]"
    if not hasattr(self, "state") or self.state.shutting_down:
        logging.debug(f"{log_prefix} Shutdown or state not ready. Ignoring.")
        return

    try:
        combo_missing = not hasattr(self, "control_panel") or not hasattr(
            self.control_panel, "map_size_combo"
        )
        if combo_missing:
            logging.warning(
                f"{log_prefix} Map size combobox not found in control panel."
            )
            return

        selected_size_str = self.control_panel.map_size_combo.get()
        logging.debug(
            f"{log_prefix} Map display size selected: '{selected_size_str}'"
        )

        # AppState owns the parsing of the size string.
        self.state.update_map_scale_factor(selected_size_str)

        # Ask for a redraw; the payload slot is a placeholder.
        logging.debug(f"{log_prefix} Queueing REDRAW_MAP command.")
        redraw_command = ("REDRAW_MAP", None)
        put_queue(
            queue_obj=self.tkinter_queue,
            item=redraw_command,
            queue_name="tkinter",
            app_instance=self,
        )

    except tk.TclError as e:
        if not self.state.shutting_down:
            logging.warning(
                f"{log_prefix} TclError accessing Map size combobox: {e}"
            )
    except Exception as e:
        if not self.state.shutting_down:
            logging.exception(f"{log_prefix} Error processing Map size update: {e}")
|
|
|
|
# --- Initialization ---
|
|
def __init__(self, root: tk.Tk):
    """
    Initializes the main application components and state.

    Builds, in this order: the central AppState, the data queues, the
    initial window geometry, the UI widgets, the LUTs, the DisplayManager,
    the ImagePipeline, the TestModeManager, the optional
    MapIntegrationManager, the network receiver (unless local images are
    used), the initial image loader, the queue processors, the periodic
    status updates and finally the initial image mode.

    Args:
        root: The Tk root window that hosts the control panel.
    """
    log_prefix = "[App Init]"
    logging.debug(f"{log_prefix} Starting application initialization...")

    self.root = root
    self.root.title("Control Panel")
    self.root.minsize(config.TKINTER_MIN_WIDTH, config.TKINTER_MIN_HEIGHT)

    # --- Central State Initialization ---
    # Created first: the helper methods called below read and write self.state.
    self.state = AppState()
    logging.debug(f"{log_prefix} AppState instance created.")

    # --- Data Queues ---
    # Bounded queues; sizes come from config.
    self.sar_queue: queue.Queue = queue.Queue(maxsize=config.DEFAULT_SAR_QUEUE)
    self.mouse_queue: queue.Queue = queue.Queue(maxsize=config.DEFAULT_MOUSE_QUEUE)
    self.tkinter_queue: queue.Queue = queue.Queue(
        maxsize=config.DEFAULT_TKINTER_QUEUE
    )
    self.mfd_queue: queue.Queue = queue.Queue(maxsize=config.DEFAULT_MFD_QUEUE)
    logging.debug(f"{log_prefix} Data queues initialized.")

    # --- Screen Info & Window Placement ---
    screen_w, screen_h = self._get_screen_dimensions()

    # --- Calculate Initial Window Positions and Sizes ---
    # _calculate_initial_sar_size may also update the SAR size held in AppState.
    initial_sar_w, initial_sar_h = self._calculate_initial_sar_size()
    # Tk position first: _calculate_mfd_position reads self.tkinter_x.
    self.tkinter_x, self.tkinter_y = self._calculate_tkinter_position(screen_h)
    self.mfd_x, self.mfd_y = self._calculate_mfd_position()
    self.sar_x, self.sar_y = self._calculate_sar_position(screen_w, initial_sar_w)
    # Calculate map position using base max size
    # Assuming MapDisplayWindow has MAX_DISPLAY_WIDTH defined or use config
    # (MapDisplayWindow is None when the map modules failed to import; getattr
    # then returns the config default.)
    map_max_w = getattr(
        MapDisplayWindow, "MAX_DISPLAY_WIDTH", config.MAX_MAP_DISPLAY_WIDTH
    )
    map_x, map_y = self._calculate_map_position(screen_w, initial_sar_w, map_max_w)

    # Set Tkinter window position
    self.root.geometry(f"+{self.tkinter_x}+{self.tkinter_y}")
    logging.debug(
        f"{log_prefix} Initial Window positions: Tk({self.tkinter_x},{self.tkinter_y}), "
        f"MFD({self.mfd_x},{self.mfd_y}), SAR({self.sar_x},{self.sar_y}), MapEst({map_x},{map_y})"
    )

    # --- Initialize Sub-systems ---
    # 1. UI Components
    self.statusbar = StatusBar(self.root)
    self.control_panel = ControlPanel(self.root, self)  # Pass App instance
    logging.debug(f"{log_prefix} UI components created.")

    # 2. LUTs
    # Both methods store their result in AppState.
    self.update_brightness_contrast_lut()
    self.update_mfd_lut()
    logging.debug(f"{log_prefix} Initial LUTs generated.")

    # 3. Display Manager (Pass App instance)
    self.display_manager = DisplayManager(
        app=self,  # Pass self for state access
        sar_queue=self.sar_queue,
        mouse_queue=self.mouse_queue,
        sar_x=self.sar_x,
        sar_y=self.sar_y,
        mfd_x=self.mfd_x,
        mfd_y=self.mfd_y,
        initial_sar_width=self.state.sar_display_width,
        initial_sar_height=self.state.sar_display_height,
    )
    logging.debug(f"{log_prefix} DisplayManager created.")
    try:
        self.display_manager.initialize_display_windows()  # Initialize placeholders
    except Exception as e:
        # A display failure is reported but does not abort initialization.
        self.set_status("Error: Display Init Failed")
        logging.critical(
            f"{log_prefix} Display window initialization failed: {e}", exc_info=True
        )

    # 4. Image Processing Pipeline
    self.image_pipeline = ImagePipeline(
        app_state=self.state,
        sar_queue=self.sar_queue,
        mfd_queue=self.mfd_queue,
        app=self,  # Pass app
    )
    logging.debug(f"{log_prefix} ImagePipeline created.")

    # 5. Test Mode Manager
    self.test_mode_manager = TestModeManager(
        app_state=self.state,
        root=self.root,
        sar_queue=self.sar_queue,
        mfd_queue=self.mfd_queue,
        app=self,  # Pass app
    )
    logging.debug(f"{log_prefix} TestModeManager created.")

    # 6. Map Integration Manager (Pass App instance)
    # Stays None unless the overlay is enabled, the map modules loaded and
    # the manager was constructed without error.
    self.map_integration_manager: Optional[MapIntegrationManager] = None
    if config.ENABLE_MAP_OVERLAY:
        if MAP_MODULES_LOADED and MapIntegrationManager is not None:
            logging.info(f"{log_prefix} Initializing MapIntegrationManager...")
            try:
                self.map_integration_manager = MapIntegrationManager(
                    app_state=self.state,
                    tkinter_queue=self.tkinter_queue,
                    app=self,  # Pass app
                    map_x=map_x,
                    map_y=map_y,
                )
                logging.info(
                    f"{log_prefix} MapIntegrationManager initialized successfully."
                )
            except Exception as map_mgr_e:
                logging.exception(
                    f"{log_prefix} Failed to initialize MapIntegrationManager:"
                )
                self.map_integration_manager = None
                self.set_status("Error: Map Init Failed")
        else:
            logging.error(
                f"{log_prefix} Map Overlay enabled but required modules/manager failed to load."
            )
            self.set_status("Error: Map Modules Missing")
    else:
        logging.debug(f"{log_prefix} Map Overlay is disabled in configuration.")

    # 7. Set initial UI state labels
    self._update_initial_ui_labels()

    # 8. Network Setup
    # Attributes are always defined; the receiver is only set up when images
    # are not taken from local files.
    self.local_ip: str = config.DEFAULT_SER_IP
    self.local_port: int = config.DEFAULT_SER_PORT
    self.udp_socket: Optional[socket.socket] = None
    self.udp_receiver: Optional[UdpReceiver] = None
    self.udp_thread: Optional[threading.Thread] = None
    if not config.USE_LOCAL_IMAGES:
        self._setup_network_receiver()
    else:
        logging.info(
            f"{log_prefix} Network receiver disabled (USE_LOCAL_IMAGES=True)."
        )

    # 9. Initial Image Load Thread
    self._start_initial_image_loader()

    # 10. Start Queue Processors
    self.process_sar_queue()
    self.process_mfd_queue()
    self.process_mouse_queue()
    self.process_tkinter_queue()
    logging.debug(f"{log_prefix} Queue processors scheduled.")

    # 11. Start Periodic Status Updates
    self.schedule_periodic_updates()
    logging.debug(f"{log_prefix} Periodic updates scheduled.")

    # 12. Set initial image mode
    # Runs last: update_image_mode needs state, control_panel and
    # test_mode_manager to exist.
    self.update_image_mode()
    logging.debug(f"{log_prefix} Initial image mode set.")

    logging.info(f"{log_prefix} Application initialization sequence complete.")
|
|
|
|
# --- Initialization Helper Methods ---
|
|
def _get_screen_dimensions(self) -> Tuple[int, int]:
    """Return the (width, height) of the first monitor reported by screeninfo.

    Any failure while querying the monitors, including an empty monitor
    list, is logged as a warning and the fixed fallback 1920x1080 is used.
    """
    log_prefix = "[App Init]"
    fallback_size = (1920, 1080)
    try:
        detected = screeninfo.get_monitors()
        if not detected:
            raise screeninfo.ScreenInfoError("No monitors detected.")
        # The first entry is used as the reference screen.
        primary = detected[0]
        logging.debug(
            f"{log_prefix} Detected Screen Dimensions: {primary.width}x{primary.height}"
        )
        return primary.width, primary.height
    except Exception as e:
        logging.warning(
            f"{log_prefix} Screen info error: {e}. Using default 1920x1080."
        )
        return fallback_size
|
|
|
|
def _calculate_initial_sar_size(
    self, desired_factor_if_map: int = 4
) -> Tuple[int, int]:
    """Return the initial SAR display size as (width, height).

    Without an active map (overlay enabled in config AND map modules
    loaded) the size currently stored in AppState is returned unchanged.
    With an active map the configured SAR size is integer-divided by
    ``desired_factor_if_map`` (clamped to at least 1), written back to
    AppState, and returned.
    """
    log_prefix = "[App Init]"
    width = self.state.sar_display_width
    height = self.state.sar_display_height

    if not (config.ENABLE_MAP_OVERLAY and MAP_MODULES_LOADED):
        logging.debug(
            f"{log_prefix} Using initial SAR size from state: {width}x{height}."
        )
        return width, height

    # Map is active: shrink the SAR window by the requested factor.
    factor = max(1, desired_factor_if_map)
    width = config.SAR_WIDTH // factor
    height = config.SAR_HEIGHT // factor
    self.state.update_sar_display_size(width, height)
    logging.info(
        f"{log_prefix} Map active, forcing SAR size to 1:{factor} ({width}x{height})."
    )
    return width, height
|
|
|
|
def _calculate_tkinter_position(self, screen_h: int) -> Tuple[int, int]:
    """Return the initial (x, y) of the Tkinter control panel window.

    The panel starts 40 px below the configured initial MFD height. If its
    minimum height would not fit on a screen of height ``screen_h``, it is
    moved up so it ends 10 px above the bottom, never higher than y=10.
    """
    left = 10
    top = config.INITIAL_MFD_HEIGHT + 40
    min_height = config.TKINTER_MIN_HEIGHT
    overflows_screen = top + min_height > screen_h
    if overflows_screen:
        top = max(10, screen_h - min_height - 10)
    return left, top
|
|
|
|
def _calculate_mfd_position(self) -> Tuple[int, int]:
|
|
"""Calculates the initial X, Y position for the MFD display window."""
|
|
x = self.tkinter_x
|
|
y = 10
|
|
return x, y
|
|
|
|
def _calculate_sar_position(
    self, screen_w: int, initial_sar_w: int
) -> Tuple[int, int]:
    """Return the initial (x, y) of the SAR display window.

    The window is placed 20 px to the right of the control panel's minimum
    width. If it would extend past ``screen_w`` it is pulled back to end
    10 px before the right edge, never further left than x=10.
    """
    top = 10
    left = self.tkinter_x + config.TKINTER_MIN_WIDTH + 20
    fits_on_screen = left + initial_sar_w <= screen_w
    if not fits_on_screen:
        left = max(10, screen_w - initial_sar_w - 10)
    return left, top
|
|
|
|
def _calculate_map_position(
|
|
self, screen_w: int, current_sar_w: int, max_map_width: int
|
|
) -> Tuple[int, int]:
|
|
"""Calculates the initial X, Y position for the Map display window."""
|
|
x = self.sar_x + current_sar_w + 20
|
|
y = 10
|
|
if x + max_map_width > screen_w:
|
|
x = max(10, screen_w - max_map_width - 10)
|
|
return x, y
|
|
|
|
def _setup_network_receiver(self):
    """Create the UDP socket, then build and start the receiver thread.

    On socket failure an error status is set and nothing else happens.
    If creating the receiver or starting its thread raises, the error is
    logged, an error status is set and the socket is closed and cleared.
    """
    log_prefix = "[App Init Network]"
    logging.info(
        f"{log_prefix} Attempting to start network receiver on {self.local_ip}:{self.local_port}"
    )
    self.udp_socket = create_udp_socket(self.local_ip, self.local_port)

    if not self.udp_socket:
        logging.error(f"{log_prefix} UDP socket creation failed.")
        self.set_status("Error: UDP Socket Failed")
        return

    try:
        self.udp_receiver = UdpReceiver(
            app=self,
            udp_socket=self.udp_socket,
            set_new_sar_image_callback=self.handle_new_sar_data,
            set_new_mfd_indices_image_callback=self.handle_new_mfd_data,
        )
        logging.info(f"{log_prefix} UdpReceiver instance created.")

        # Daemon thread running the receiver's receive loop.
        receiver_thread = threading.Thread(
            target=self.udp_receiver.receive_udp_data,
            name="UDPReceiverThread",
            daemon=True,
        )
        self.udp_thread = receiver_thread
        receiver_thread.start()
        logging.info(f"{log_prefix} UDP Receiver thread started.")
        self.set_status(f"Listening UDP {self.local_ip}:{self.local_port}")
    except Exception as receiver_init_e:
        logging.critical(
            f"{log_prefix} Failed to initialize UdpReceiver: {receiver_init_e}",
            exc_info=True,
        )
        self.set_status("Error: Receiver Init Failed")
        # Release the socket that was opened above.
        if self.udp_socket:
            close_udp_socket(self.udp_socket)
            self.udp_socket = None
|
|
|
|
def _start_initial_image_loader(self):
    """Start the background image loader, or schedule the initial display directly.

    When local images or test mode are enabled in config, a daemon thread
    running ``load_initial_images`` is started. Otherwise loading is
    skipped and ``_set_initial_display_from_loaded_data`` is scheduled via
    ``after_idle`` if the root window still exists.
    """
    log_prefix = "[App Init]"
    if config.USE_LOCAL_IMAGES or config.ENABLE_TEST_MODE:
        logging.debug(f"{log_prefix} Starting initial image loading thread...")
        loader = threading.Thread(
            target=self.load_initial_images, name="ImageLoaderThread", daemon=True
        )
        loader.start()
        return

    logging.debug(f"{log_prefix} Skipping initial image loading.")
    if self.root and self.root.winfo_exists():
        self.root.after_idle(self._set_initial_display_from_loaded_data)
|
|
|
|
def _update_initial_ui_labels(self):
|
|
"""Sets the initial text for UI info labels based on default AppState."""
|
|
log_prefix = "[App Init]"
|
|
logging.debug(f"{log_prefix} Setting initial UI info labels...")
|
|
if not hasattr(self, "control_panel") or not self.control_panel:
|
|
logging.warning(f"{log_prefix} Control panel not ready for initial labels.")
|
|
return
|
|
try:
|
|
# SAR Center Label
|
|
default_geo = self.state.current_sar_geo_info
|
|
center_txt = "Image Ref: Lat=N/A, Lon=N/A"
|
|
if default_geo and default_geo.get("valid", False):
|
|
try:
|
|
lat_s = decimal_to_dms(math.degrees(default_geo["lat"]), True)
|
|
lon_s = decimal_to_dms(math.degrees(default_geo["lon"]), False)
|
|
center_txt = f"Image Ref: Lat={lat_s}, Lon={lon_s}"
|
|
except Exception as format_err:
|
|
logging.error(
|
|
f"{log_prefix} Error formatting initial geo label: {format_err}"
|
|
)
|
|
center_txt = "Image Ref: Format Error"
|
|
if hasattr(self.control_panel, "sar_center_label"):
|
|
self.control_panel.sar_center_label.config(text=center_txt)
|
|
# SAR Orientation Label
|
|
if hasattr(self.control_panel, "set_sar_orientation"):
|
|
self.control_panel.set_sar_orientation("N/A")
|
|
# SAR Size Km Label (Ensure method exists before calling)
|
|
if hasattr(self.control_panel, "set_sar_size_km"):
|
|
self.control_panel.set_sar_size_km("N/A")
|
|
# Mouse Coordinates Label
|
|
if hasattr(self.control_panel, "set_mouse_coordinates"):
|
|
self.control_panel.set_mouse_coordinates("N/A", "N/A")
|
|
# Statistics Labels
|
|
initial_stats = self.state.get_statistics()
|
|
drop_txt = f"Drop(Q): S={initial_stats['dropped_sar_q']},M={initial_stats['dropped_mfd_q']}, Tk={initial_stats['dropped_tk_q']},Mo={initial_stats['dropped_mouse_q']}"
|
|
incmpl_txt = f"Incmpl(RX): S={initial_stats['incomplete_sar_rx']},M={initial_stats['incomplete_mfd_rx']}"
|
|
if hasattr(self.control_panel, "dropped_label"):
|
|
self.control_panel.dropped_label.config(text=drop_txt)
|
|
if hasattr(self.control_panel, "incomplete_label"):
|
|
self.control_panel.incomplete_label.config(text=incmpl_txt)
|
|
logging.debug(f"{log_prefix} Initial UI state labels set.")
|
|
except tk.TclError as e:
|
|
logging.warning(
|
|
f"{log_prefix} Error setting initial UI labels (TclError): {e}"
|
|
)
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error setting initial UI labels:"
|
|
)
|
|
|
|
# --- Network Data Handlers ---
|
|
def handle_new_sar_data(
    self, normalized_image_uint8: np.ndarray, geo_info_radians: Dict[str, Any]
):
    """Store newly received SAR data in AppState and schedule its processing.

    Passed to UdpReceiver as its SAR callback. The image and geo info are
    handed to ``state.set_sar_data``; ``_process_sar_update_on_main_thread``
    is then scheduled with ``root.after_idle`` if the root window still
    exists. Nothing is stored or scheduled once shutdown has begun.
    """
    log_prefix = "[App CB SAR]"
    logging.debug(
        f"{log_prefix} Handling new SAR data (Shape: {normalized_image_uint8.shape}, Geo Valid: {geo_info_radians.get('valid', False)})..."
    )
    if self.state.shutting_down:
        logging.debug(f"{log_prefix} Shutdown detected. Ignoring.")
        return

    # Update the shared state first, then ask the GUI loop to react.
    self.state.set_sar_data(normalized_image_uint8, geo_info_radians)
    logging.debug(f"{log_prefix} SAR data/GeoInfo updated in AppState.")

    root_alive = self.root and self.root.winfo_exists()
    if not root_alive:
        logging.warning(
            f"{log_prefix} Cannot schedule SAR update: Root window gone."
        )
        return

    logging.debug(
        f"{log_prefix} Scheduling _process_sar_update_on_main_thread."
    )
    self.root.after_idle(self._process_sar_update_on_main_thread)
|
|
|
|
def handle_new_mfd_data(self, image_indices: np.ndarray):
    """Store newly received MFD index data in AppState and schedule its processing.

    Passed to UdpReceiver as its MFD callback. The indices are handed to
    ``state.set_mfd_indices``; ``_process_mfd_update_on_main_thread`` is
    then scheduled with ``root.after_idle`` if the root window still
    exists. Nothing is stored or scheduled once shutdown has begun.
    """
    log_prefix = "[App CB MFD]"
    logging.debug(
        f"{log_prefix} Handling new MFD index data (Shape: {image_indices.shape})..."
    )
    if self.state.shutting_down:
        logging.debug(f"{log_prefix} Shutdown detected. Ignoring.")
        return

    # Update the shared state first, then ask the GUI loop to react.
    self.state.set_mfd_indices(image_indices)
    logging.debug(f"{log_prefix} MFD indices updated in AppState.")

    root_alive = self.root and self.root.winfo_exists()
    if not root_alive:
        logging.warning(
            f"{log_prefix} Cannot schedule MFD update: Root window gone."
        )
        return

    logging.debug(
        f"{log_prefix} Scheduling _process_mfd_update_on_main_thread."
    )
    self.root.after_idle(self._process_mfd_update_on_main_thread)
|
|
|
|
# --- Main Thread Processing Triggers ---
|
|
def _process_sar_update_on_main_thread(self):
    """Run the full reaction to a new SAR frame (scheduled via ``after_idle``).

    Steps, in order:
      1. refresh the SAR-related UI labels;
      2. ask the image pipeline to process the SAR image for display;
      3. update the map overlay when a map manager exists and the geo info
         is valid;
      4. when KML generation is enabled and the geo info is valid, write a
         timestamped KML file, prune old KML files and optionally launch
         Google Earth;
      5. update the SAR FPS counters.

    Errors in steps 2-4 are logged and do not stop the later steps.
    Nothing is done once shutdown has begun.
    """
    log_prefix = "[App MainThread SAR Update]"
    if self.state.shutting_down:
        logging.debug(f"{log_prefix} Shutdown detected. Skipping.")
        return
    logging.debug(f"{log_prefix} Processing scheduled SAR update...")

    # 1. UI labels
    self._update_sar_ui_labels()

    # 2. Image pipeline
    logging.debug(f"{log_prefix} Calling image_pipeline.process_sar_for_display...")
    try:
        pipeline = getattr(self, "image_pipeline", None)
        if pipeline:
            pipeline.process_sar_for_display()
        else:
            logging.error(f"{log_prefix} ImagePipeline not available.")
    except Exception:
        logging.exception(f"{log_prefix} Error calling ImagePipeline for SAR:")

    # 3. Map overlay
    geo_info = self.state.current_sar_geo_info
    is_geo_valid = geo_info and geo_info.get("valid", False)
    map_manager = getattr(self, "map_integration_manager", None)
    map_manager_active = map_manager is not None

    if map_manager_active and is_geo_valid:
        logging.debug(
            f"{log_prefix} Calling map_integration_manager.update_map_overlay..."
        )
        try:
            map_manager.update_map_overlay(
                self.state.current_sar_normalized, geo_info
            )
        except Exception:
            logging.exception(f"{log_prefix} Error calling map manager update:")
    elif config.ENABLE_MAP_OVERLAY:
        # Only explain the skip when the map feature is actually enabled.
        if not map_manager_active:
            logging.debug(
                f"{log_prefix} Skipping map update: MapIntegrationManager not available."
            )
        else:
            logging.debug(f"{log_prefix} Skipping map update: GeoInfo not valid.")

    # 4. KML generation, cleanup and optional Google Earth launch
    if is_geo_valid:
        if config.ENABLE_KML_GENERATION:
            kml_log_prefix = "[App KML]"
            logging.debug(f"{kml_log_prefix} KML generation enabled.")
            try:
                kml_dir = config.KML_OUTPUT_DIRECTORY
                os.makedirs(kml_dir, exist_ok=True)
                # Microsecond-resolution timestamp in the file name.
                stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
                kml_output_path = os.path.join(
                    kml_dir, f"sar_footprint_{stamp}.kml"
                )
                logging.debug(
                    f"{kml_log_prefix} Calling generate_sar_kml for path: {kml_output_path}"
                )
                kml_written = generate_sar_kml(geo_info, kml_output_path)

                if not kml_written:
                    logging.error(f"{kml_log_prefix} KML file generation failed.")
                else:
                    logging.info(
                        f"{kml_log_prefix} KML file generated successfully: {kml_output_path}"
                    )
                    # Prune old files; a cleanup failure must not block the launch.
                    logging.debug(f"{kml_log_prefix} Calling KML cleanup utility...")
                    try:
                        cleanup_old_kml_files(
                            config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES
                        )
                    except Exception:
                        logging.exception(
                            f"{kml_log_prefix} Error during KML cleanup call:"
                        )

                    if config.AUTO_LAUNCH_GOOGLE_EARTH:
                        logging.debug(
                            f"{kml_log_prefix} Auto-launch Google Earth enabled."
                        )
                        launch_google_earth(kml_output_path)
                    else:
                        logging.debug(
                            f"{kml_log_prefix} Auto-launch Google Earth disabled."
                        )
            except ImportError as ie:
                logging.error(
                    f"{kml_log_prefix} Cannot generate/cleanup KML due to missing library: {ie}"
                )
            except Exception:
                logging.exception(
                    f"{kml_log_prefix} Error during KML generation/launch/cleanup process:"
                )
        else:
            logging.debug(f"{log_prefix} KML generation disabled in config.")

    # 5. FPS statistics
    self._update_fps_stats("sar")

    logging.debug(f"{log_prefix} Finished processing SAR update.")
|
|
|
|
def _process_mfd_update_on_main_thread(self):
|
|
"""Processes MFD updates scheduled to run on the main GUI thread."""
|
|
log_prefix = "[App MainThread MFD Update]"
|
|
if self.state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected. Skipping.")
|
|
return
|
|
logging.debug(f"{log_prefix} Processing scheduled MFD update...")
|
|
|
|
# 1. Trigger Image Processing Pipeline
|
|
logging.debug(f"{log_prefix} Calling image_pipeline.process_mfd_for_display...")
|
|
try:
|
|
if hasattr(self, "image_pipeline") and self.image_pipeline:
|
|
self.image_pipeline.process_mfd_for_display()
|
|
else:
|
|
logging.error(f"{log_prefix} ImagePipeline not available.")
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error calling ImagePipeline for MFD:")
|
|
|
|
# 2. Update FPS Statistics for MFD
|
|
self._update_fps_stats("mfd")
|
|
|
|
logging.debug(f"{log_prefix} Finished processing MFD update.")
|
|
|
|
def _update_sar_ui_labels(self):
|
|
"""Helper method to update SAR related UI labels from AppState."""
|
|
log_prefix = "[App MainThread SAR Update]"
|
|
if (
|
|
not hasattr(self, "control_panel")
|
|
or not self.control_panel
|
|
or not self.control_panel.winfo_exists()
|
|
):
|
|
return
|
|
|
|
geo_info = self.state.current_sar_geo_info
|
|
center_txt = "Image Ref: N/A"
|
|
orient_txt = "N/A"
|
|
size_txt = "N/A"
|
|
is_valid_geo = geo_info and geo_info.get("valid", False)
|
|
|
|
if is_valid_geo:
|
|
try:
|
|
lat_d = math.degrees(geo_info["lat"])
|
|
lon_d = math.degrees(geo_info["lon"])
|
|
orient_d = math.degrees(geo_info["orientation"])
|
|
lat_s = decimal_to_dms(lat_d, is_latitude=True)
|
|
lon_s = decimal_to_dms(lon_d, is_latitude=False)
|
|
orient_txt = f"{orient_d:.2f}°"
|
|
center_txt = f"Image Ref: Lat={lat_s}, Lon={lon_s}"
|
|
|
|
# Calculate size in Km
|
|
scale_x = geo_info.get("scale_x", 0.0)
|
|
scale_y = geo_info.get("scale_y", 0.0)
|
|
width_px = geo_info.get("width_px", 0)
|
|
height_px = geo_info.get("height_px", 0)
|
|
if scale_x > 0 and width_px > 0 and scale_y > 0 and height_px > 0:
|
|
width_km = (scale_x * width_px) / 1000.0
|
|
height_km = (scale_y * height_px) / 1000.0
|
|
size_txt = f"W: {width_km:.1f} km, H: {height_km:.1f} km"
|
|
else:
|
|
size_txt = "Invalid Scale/Dims"
|
|
except KeyError as ke:
|
|
logging.error(
|
|
f"{log_prefix} Missing key '{ke}' in geo_info for UI update."
|
|
)
|
|
center_txt = "Ref: Data Error"
|
|
orient_txt = "Data Error"
|
|
size_txt = "Data Error"
|
|
is_valid_geo = False
|
|
except Exception as e:
|
|
logging.error(f"{log_prefix} Error formatting geo info for UI: {e}")
|
|
center_txt = "Ref: Format Error"
|
|
orient_txt = "Format Error"
|
|
size_txt = "Format Error"
|
|
is_valid_geo = False
|
|
|
|
# Safely update UI elements
|
|
try:
|
|
if hasattr(self.control_panel, "sar_center_label"):
|
|
self.control_panel.sar_center_label.config(text=center_txt)
|
|
if hasattr(self.control_panel, "set_sar_orientation"):
|
|
self.control_panel.set_sar_orientation(orient_txt)
|
|
if hasattr(self.control_panel, "set_sar_size_km"):
|
|
self.control_panel.set_sar_size_km(size_txt)
|
|
if not is_valid_geo and hasattr(
|
|
self.control_panel, "set_mouse_coordinates"
|
|
):
|
|
self.control_panel.set_mouse_coordinates("N/A", "N/A")
|
|
except tk.TclError as ui_err:
|
|
if not self.state.shutting_down:
|
|
logging.warning(
|
|
f"{log_prefix} Error updating SAR UI labels (TclError): {ui_err}"
|
|
)
|
|
except Exception as gen_err:
|
|
logging.exception(f"{log_prefix} Unexpected error updating SAR UI labels:")
|
|
|
|
def _update_fps_stats(self, img_type: str):
|
|
"""Helper function to update FPS counters in AppState."""
|
|
now = time.time()
|
|
log_prefix = "[App FPS Update]"
|
|
try:
|
|
if img_type == "sar":
|
|
self.state.sar_frame_count += 1
|
|
elapsed = now - self.state.sar_update_time
|
|
if elapsed >= config.LOG_UPDATE_INTERVAL:
|
|
self.state.sar_fps = self.state.sar_frame_count / elapsed
|
|
self.state.sar_update_time = now
|
|
self.state.sar_frame_count = 0
|
|
logging.debug(
|
|
f"{log_prefix} SAR FPS calculated: {self.state.sar_fps:.2f}"
|
|
)
|
|
elif img_type == "mfd":
|
|
self.state.mfd_frame_count += 1
|
|
elapsed = now - self.state.mfd_start_time
|
|
if elapsed >= config.LOG_UPDATE_INTERVAL:
|
|
self.state.mfd_fps = self.state.mfd_frame_count / elapsed
|
|
self.state.mfd_start_time = now
|
|
self.state.mfd_frame_count = 0
|
|
logging.debug(
|
|
f"{log_prefix} MFD FPS calculated: {self.state.mfd_fps:.2f}"
|
|
)
|
|
except Exception as e:
|
|
logging.warning(
|
|
f"{log_prefix} Error updating FPS stats for '{img_type}': {e}"
|
|
)
|
|
|
|
# --- Trigger Methods ---
|
|
def _trigger_sar_update(self):
|
|
"""Triggers a SAR image update processing via ImagePipeline if not in test mode."""
|
|
log_prefix = "[App Trigger SAR]"
|
|
if self.state.shutting_down:
|
|
return
|
|
if not self.state.test_mode_active:
|
|
logging.debug(f"{log_prefix} Triggering SAR update via ImagePipeline.")
|
|
try:
|
|
if hasattr(self, "image_pipeline") and self.image_pipeline:
|
|
self.image_pipeline.process_sar_for_display()
|
|
else:
|
|
logging.error(f"{log_prefix} 'image_pipeline' not found.")
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error calling image_pipeline.process_sar_for_display:"
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f"{log_prefix} SAR update trigger skipped (Test Mode active)."
|
|
)
|
|
|
|
def _trigger_mfd_update(self):
|
|
"""Triggers an MFD image update processing via ImagePipeline if not in test mode."""
|
|
log_prefix = "[App Trigger MFD]"
|
|
if self.state.shutting_down:
|
|
return
|
|
if not self.state.test_mode_active:
|
|
logging.debug(f"{log_prefix} Triggering MFD update via ImagePipeline.")
|
|
try:
|
|
if hasattr(self, "image_pipeline") and self.image_pipeline:
|
|
self.image_pipeline.process_mfd_for_display()
|
|
else:
|
|
logging.error(f"{log_prefix} 'image_pipeline' not found.")
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error calling image_pipeline.process_mfd_for_display:"
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f"{log_prefix} MFD update trigger skipped (Test Mode active)."
|
|
)
|
|
|
|
# --- Periodic Update Scheduling ---
|
|
def schedule_periodic_updates(self):
    """Run one status update and re-arm itself on the Tk timer.

    Calls ``update_status`` (errors are logged, not raised) and then
    schedules itself again with ``root.after`` using
    ``config.LOG_UPDATE_INTERVAL`` converted to milliseconds, with a floor
    of 100 ms. The chain stops once shutdown has begun or the root window
    no longer exists.
    """
    log_prefix = "[App Status Scheduler]"
    if self.state.shutting_down:
        logging.debug(f"{log_prefix} Shutdown detected. Stopping periodic updates.")
        return

    try:
        self.update_status()
    except Exception as e:
        logging.error(
            f"{log_prefix} Error during periodic status update execution: {e}"
        )

    # Never reschedule faster than every 100 ms.
    delay_ms = max(100, int(config.LOG_UPDATE_INTERVAL * 1000))
    try:
        root = self.root
        if root and root.winfo_exists():
            root.after(delay_ms, self.schedule_periodic_updates)
    except Exception as e:
        if not self.state.shutting_down:
            logging.warning(f"{log_prefix} Error rescheduling periodic update: {e}")
|
|
|
|
# --- Initial Image Loading ---
|
|
def load_initial_images(self):
    """Load the initial local/test images into AppState (image loader thread target).

    Generates test images through the TestModeManager when test mode is
    enabled or active, loads the local MFD and SAR images when local
    images are enabled, and finally schedules
    ``_set_initial_display_from_loaded_data`` via ``root.after_idle``.
    Status messages are likewise posted with ``after_idle``. Any error is
    logged and reported through the status bar.
    """
    log_prefix = "[App Image Loader]"
    if self.state.shutting_down:
        logging.debug(f"{log_prefix} Shutdown detected. Aborting.")
        return

    logging.info(f"{log_prefix} Initial image loading thread started.")
    if self.root and self.root.winfo_exists():
        self.root.after_idle(self.set_status, "Loading initial images...")

    try:
        if config.ENABLE_TEST_MODE or self.state.test_mode_active:
            test_manager = getattr(self, "test_mode_manager", None)
            if test_manager:
                logging.debug(f"{log_prefix} Ensuring test images are generated...")
                test_manager._ensure_test_images()
            else:
                logging.error(
                    f"{log_prefix} TestModeManager not available for test image generation."
                )

        if config.USE_LOCAL_IMAGES:
            logging.debug(f"{log_prefix} Loading local MFD image...")
            self._load_local_mfd_image()
            logging.debug(f"{log_prefix} Loading local SAR image...")
            self._load_local_sar_image()

        # Hand over to the GUI loop to show whatever was loaded.
        if self.root and self.root.winfo_exists():
            logging.debug(
                f"{log_prefix} Scheduling _set_initial_display_from_loaded_data."
            )
            self.root.after_idle(self._set_initial_display_from_loaded_data)
    except Exception:
        logging.exception(f"{log_prefix} Error during initial image loading:")
        if self.root and self.root.winfo_exists():
            self.root.after_idle(self.set_status, "Error Loading Images")
    finally:
        logging.info(f"{log_prefix} Initial image loading thread finished.")
|
|
|
|
def _load_local_mfd_image(self):
    """Store the local MFD index image in AppState.

    Reading the MFD file is not implemented yet: whether or not
    ``config.MFD_IMAGE_PATH`` exists, a random uint8 index array of the
    configured MFD size is stored (only the warning text differs).
    """
    log_prefix = "[App Image Loader]"
    random_indices = np.random.randint(
        0, 256, (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
    )
    try:
        mfd_path = config.MFD_IMAGE_PATH
        if not os.path.exists(mfd_path):
            logging.warning(
                f"{log_prefix} Local MFD image file not found: {mfd_path}. Using random data."
            )
        else:
            # Placeholder until real file loading is implemented.
            logging.warning(
                f"{log_prefix} Local MFD loading from file NYI. Using random data."
            )
        self.state.local_mfd_image_data_indices = random_indices
    except Exception:
        logging.exception(f"{log_prefix} Error loading local MFD image:")
        self.state.local_mfd_image_data_indices = random_indices
|
|
|
|
def _load_local_sar_image(self):
    """Load the local raw SAR image into AppState.

    Uses ``load_image`` on ``config.SAR_IMAGE_PATH``. If nothing usable
    comes back (None or empty) or an exception occurs, an all-zero array of
    the configured SAR size and data type is stored instead.
    """
    log_prefix = "[App Image Loader]"
    zero_image = np.zeros(
        (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=config.SAR_DATA_TYPE
    )
    try:
        raw = load_image(config.SAR_IMAGE_PATH, config.SAR_DATA_TYPE)
        if raw is not None and raw.size != 0:
            logging.info(
                f"{log_prefix} Loaded local SAR raw data (shape: {raw.shape})."
            )
        else:
            logging.error(
                f"{log_prefix} Failed to load local SAR raw data. Using zeros."
            )
            raw = zero_image
        self.state.local_sar_image_data_raw = raw
    except Exception:
        logging.exception(f"{log_prefix} Error loading local SAR raw data:")
        self.state.local_sar_image_data_raw = zero_image
|
|
|
|
def _set_initial_display_from_loaded_data(self):
|
|
"""(Runs in main thread) Sets the initial display based on loaded data and mode."""
|
|
log_prefix = "[App Init Display]"
|
|
if self.state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected. Skipping.")
|
|
return
|
|
|
|
is_test = self.state.test_mode_active
|
|
is_local = config.USE_LOCAL_IMAGES
|
|
|
|
if not is_test and is_local: # Local Image Mode
|
|
logging.info(f"{log_prefix} Setting initial display from local images.")
|
|
if self.state.local_mfd_image_data_indices is not None:
|
|
self.state.current_mfd_indices = (
|
|
self.state.local_mfd_image_data_indices.copy()
|
|
)
|
|
self.image_pipeline.process_mfd_for_display()
|
|
else:
|
|
logging.warning(f"{log_prefix} Local MFD data not loaded.")
|
|
if self.state.local_sar_image_data_raw is not None:
|
|
self.set_initial_sar_image(
|
|
self.state.local_sar_image_data_raw
|
|
) # Handles normalization, state update, UI reset, pipeline call
|
|
else:
|
|
logging.warning(
|
|
f"{log_prefix} Local SAR data not loaded. Displaying black."
|
|
)
|
|
self.set_initial_sar_image(None) # Display black
|
|
|
|
elif is_test: # Test Mode
|
|
logging.info(
|
|
f"{log_prefix} Test mode active. Display handled by TestModeManager timers."
|
|
)
|
|
|
|
else: # Network Mode
|
|
logging.info(
|
|
f"{log_prefix} Network mode active. Displaying initial placeholders."
|
|
)
|
|
self._show_network_placeholders()
|
|
|
|
# Set Final Initial Status (only if map isn't loading)
|
|
map_manager_active = (
|
|
hasattr(self, "map_integration_manager")
|
|
and self.map_integration_manager is not None
|
|
)
|
|
map_thread_running = False
|
|
if map_manager_active:
|
|
map_thread_attr = getattr(
|
|
self.map_integration_manager, "_map_initial_display_thread", None
|
|
)
|
|
if map_thread_attr and isinstance(map_thread_attr, threading.Thread):
|
|
map_thread_running = map_thread_attr.is_alive()
|
|
map_is_loading = map_manager_active and map_thread_running
|
|
|
|
if not map_is_loading:
|
|
final_status = (
|
|
"Ready (Test Mode)"
|
|
if is_test
|
|
else (
|
|
"Ready (Local Mode)"
|
|
if is_local
|
|
else (
|
|
f"Listening UDP {self.local_ip}:{self.local_port}"
|
|
if self.udp_socket
|
|
else "Error: No Socket"
|
|
)
|
|
)
|
|
)
|
|
self.set_status(final_status)
|
|
logging.debug(
|
|
f"{log_prefix} Set final initial status (map not loading): '{final_status}'"
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f"{log_prefix} Initial map display is still loading. Status update deferred."
|
|
)
|
|
|
|
def set_initial_sar_image(self, raw_image_data: Optional[np.ndarray]):
    """Normalize an initial raw SAR image, store it in AppState and display it.

    Args:
        raw_image_data: Raw SAR image, or None to request a black image.
            An empty array is treated like None.

    Behavior:
        * The raw data is normalized to uint8 with ``normalize_image``; the
          result becomes ``state.current_sar_normalized``.
        * If there is no usable data or normalization fails, the current
          normalized image is zero-filled in place (it is allocated with
          the configured SAR size first if it does not exist yet).
        * The geo info is marked invalid, the geo UI labels are reset and
          the image pipeline is asked to display the SAR image.

    Does nothing once shutdown has begun.
    """
    log_prefix = "[App Init SAR Image]"
    if self.state.shutting_down:
        return
    logging.debug(f"{log_prefix} Processing initial raw SAR image...")

    normalized: Optional[np.ndarray] = None
    if raw_image_data is not None and raw_image_data.size > 0:
        try:
            normalized = normalize_image(raw_image_data, target_type=np.uint8)
        except Exception:
            logging.exception(
                f"{log_prefix} Error during initial SAR normalization:"
            )
    else:
        logging.warning(f"{log_prefix} Provided raw SAR data is invalid or empty.")

    if normalized is not None:
        self.state.current_sar_normalized = normalized
        logging.debug(
            f"{log_prefix} Stored normalized initial SAR image in AppState."
        )
    else:
        logging.warning(
            f"{log_prefix} Using black image fallback for initial SAR display."
        )
        if self.state.current_sar_normalized is None:
            self.state.current_sar_normalized = np.zeros(
                (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
            )
        # Blank the existing buffer in place.
        self.state.current_sar_normalized.fill(0)

    # Mark geo invalid for the local/black image. Other readers treat the
    # geo info as possibly empty, so tolerate None here instead of raising
    # TypeError and aborting before the image is displayed.
    geo_info = self.state.current_sar_geo_info
    if geo_info is None:
        self.state.current_sar_geo_info = {"valid": False}
    else:
        geo_info["valid"] = False
    self._reset_ui_geo_info()  # Schedule UI label reset

    logging.debug(
        f"{log_prefix} Triggering display of initial SAR image via ImagePipeline."
    )
    if hasattr(self, "image_pipeline") and self.image_pipeline:
        self.image_pipeline.process_sar_for_display()
    else:
        logging.error(f"{log_prefix} ImagePipeline not available for initial SAR.")
    logging.info(f"{log_prefix} Initial SAR image processed and queued.")
|
|
|
|
# --- Mode Switching UI Actions ---
|
|
def activate_test_mode_ui_actions(self):
    """Apply the UI/state changes that go with switching test mode on.

    Resets the geo-related labels, empties the MFD and SAR display queues
    and updates the status bar before and after.
    """
    log_prefix = "[App Test Activate]"
    logging.info(
        f"{log_prefix} Performing UI/State actions for Test Mode activation."
    )
    self.set_status("Activating Test Mode...")
    self._reset_ui_geo_info()

    # Drop anything still waiting to be displayed from the previous mode.
    for display_queue in (self.mfd_queue, self.sar_queue):
        clear_queue(display_queue)
    logging.debug(f"{log_prefix} Display queues cleared.")

    self.set_status("Ready (Test Mode)")
    logging.info(f"{log_prefix} Test Mode UI/State actions complete.")
|
|
|
|
def deactivate_test_mode_ui_actions(self):
    """Apply the UI/state changes that go with switching test mode off.

    Empties the display queues and resets the geo labels. In local-image
    mode the stored local MFD/SAR images are displayed again (black SAR if
    none is stored). In network mode placeholders are queued and the
    status reflects whether the UDP socket is still open.
    """
    log_prefix = "[App Test Deactivate]"
    logging.info(
        f"{log_prefix} Performing UI/State actions for Test Mode deactivation -> Normal Mode."
    )
    self.set_status("Activating Normal Mode...")
    for display_queue in (self.mfd_queue, self.sar_queue):
        clear_queue(display_queue)
    logging.debug(f"{log_prefix} Display queues cleared.")
    self._reset_ui_geo_info()

    if not config.USE_LOCAL_IMAGES:
        # Network mode: show placeholders until real data arrives.
        logging.info(
            f"{log_prefix} Switched to Network mode. Displaying placeholders."
        )
        self._show_network_placeholders()
        socket_ok = self.udp_socket is not None and self.udp_socket.fileno() != -1
        if socket_ok:
            status = f"Listening UDP {self.local_ip}:{self.local_port}"
        else:
            status = "Error: No UDP Socket"
        self.set_status(status)
    else:
        # Local mode: bring back the images kept in AppState.
        logging.info(
            f"{log_prefix} Restoring display from local images stored in AppState."
        )
        local_mfd = self.state.local_mfd_image_data_indices
        if local_mfd is None:
            logging.warning(f"{log_prefix} No local MFD data to restore.")
        else:
            self.state.current_mfd_indices = local_mfd.copy()
            self.image_pipeline.process_mfd_for_display()

        local_sar = self.state.local_sar_image_data_raw
        if local_sar is None:
            logging.warning(
                f"{log_prefix} No local SAR data to restore. Displaying black."
            )
            self.set_initial_sar_image(None)
        else:
            self.set_initial_sar_image(local_sar)
        self.set_status("Ready (Local Mode)")

    logging.info(f"{log_prefix} Normal Mode UI/State actions complete.")
|
|
|
|
def _reset_ui_geo_info(self):
|
|
"""Schedules UI reset for geo-related labels on the main thread."""
|
|
log_prefix = "[App UI Reset]"
|
|
if self.root and self.root.winfo_exists():
|
|
if hasattr(self.control_panel, "set_sar_orientation"):
|
|
self.root.after_idle(
|
|
lambda: self.control_panel.set_sar_orientation("N/A")
|
|
)
|
|
if hasattr(self.control_panel, "set_mouse_coordinates"):
|
|
self.root.after_idle(
|
|
lambda: self.control_panel.set_mouse_coordinates("N/A", "N/A")
|
|
)
|
|
if hasattr(self.control_panel, "sar_center_label"):
|
|
self.root.after_idle(
|
|
lambda: self.control_panel.sar_center_label.config(
|
|
text="Image Ref: Lat=N/A, Lon=N/A"
|
|
)
|
|
)
|
|
# Ensure sar_size_label is reset too if it exists
|
|
if hasattr(self.control_panel, "set_sar_size_km"):
|
|
self.root.after_idle(lambda: self.control_panel.set_sar_size_km("N/A"))
|
|
logging.debug(f"{log_prefix} Geo UI label reset scheduled.")
|
|
|
|
def _revert_test_mode_ui(self):
|
|
"""Tries to uncheck the test mode checkbox in the UI and resets the state flag."""
|
|
log_prefix = "[App Mode Switch]"
|
|
logging.warning(
|
|
f"{log_prefix} Reverting Test Mode UI and state due to activation failure."
|
|
)
|
|
if (
|
|
self.root
|
|
and self.root.winfo_exists()
|
|
and hasattr(self.control_panel, "test_image_var")
|
|
):
|
|
try:
|
|
self.root.after_idle(
|
|
self.control_panel.test_image_var.set, 0
|
|
) # Uncheck
|
|
except Exception as e:
|
|
logging.warning(
|
|
f"{log_prefix} Failed to schedule uncheck of test mode checkbox: {e}"
|
|
)
|
|
if hasattr(self, "state"):
|
|
self.state.test_mode_active = False
|
|
logging.debug(f"{log_prefix} Test mode state flag reverted to False.")
|
|
|
|
def _show_network_placeholders(self):
    """Queue flat-grey placeholder frames for the MFD and SAR displays.

    The MFD frame uses the initial MFD dimensions from ``config`` (fill
    value 30); the SAR frame uses the current SAR display size from the app
    state (fill value 60). Any failure is logged and otherwise ignored.
    """
    log_prefix = "[App Placeholders]"
    logging.debug(f"{log_prefix} Queueing network placeholder images.")
    try:
        mfd_shape = (config.INITIAL_MFD_HEIGHT, config.INITIAL_MFD_WIDTH, 3)
        sar_shape = (
            self.state.sar_display_height,
            self.state.sar_display_width,
            3,
        )
        mfd_placeholder = np.full(mfd_shape, 30, dtype=np.uint8)
        sar_placeholder = np.full(sar_shape, 60, dtype=np.uint8)

        # Both frames are built before anything is queued.
        for target_queue, frame, queue_label in (
            (self.mfd_queue, mfd_placeholder, "mfd"),
            (self.sar_queue, sar_placeholder, "sar"),
        ):
            put_queue(target_queue, frame, queue_label, self)
    except Exception:
        logging.exception(
            f"{log_prefix} Error creating/queueing placeholder images:"
        )
|
|
|
|
# --- Mouse Coordinate Handling ---
|
|
def process_mouse_queue(self):
    """Convert one queued SAR-display mouse position into geographic text.

    Takes at most one ``(x, y)`` display coordinate from ``self.mouse_queue``,
    maps it back to the original image pixel grid (undoing the display
    rotation taken from the geo info), offsets the reference latitude and
    longitude by the pixel distance times the per-pixel scale, and hands the
    resulting ``(lat_text, lon_text)`` pair to
    ``put_mouse_coordinates_queue``. When the current geo info is missing or
    incomplete the pair is ``("N/A", "N/A")``.

    The whole calculation, including the geo-info validity check, runs inside
    one guarded section so that malformed geo values (e.g. ``None`` where a
    number is expected) produce an error label instead of an exception that
    would stop this processor from being rescheduled.
    """
    log_prefix = "[App GeoCalc]"
    if self.state.shutting_down:
        return

    raw_coords = None
    try:
        raw_coords = self.mouse_queue.get(block=False)
        self.mouse_queue.task_done()
    except queue.Empty:
        pass
    except Exception:
        logging.exception(f"{log_prefix} Error getting from mouse queue:")

    if isinstance(raw_coords, tuple) and len(raw_coords) == 2:
        x_disp, y_disp = raw_coords
        lat_s: str = "N/A"
        lon_s: str = "N/A"

        try:
            geo = self.state.current_sar_geo_info
            disp_w = self.state.sar_display_width
            disp_h = self.state.sar_display_height

            is_geo_valid_for_calc = (
                geo
                and geo.get("valid")
                and disp_w > 0
                and disp_h > 0
                and geo.get("width_px", 0) > 0
                and geo.get("height_px", 0) > 0
                and geo.get("scale_x", 0.0) > 0
                and geo.get("scale_y", 0.0) > 0
                and "lat" in geo
                and "lon" in geo
                and "ref_x" in geo
                and "ref_y" in geo
                and "orientation" in geo
            )

            if is_geo_valid_for_calc:
                logging.debug(
                    f"{log_prefix} Processing mouse coords: Display({x_disp},{y_disp}) with valid GeoInfo."
                )
                orig_w = geo["width_px"]
                orig_h = geo["height_px"]
                scale_x = geo["scale_x"]
                scale_y = geo["scale_y"]
                ref_x = geo["ref_x"]
                ref_y = geo["ref_y"]
                # Reference position is stored in radians (converted below).
                ref_lat_rad = geo["lat"]
                ref_lon_rad = geo["lon"]
                # Angle from geo data, used as-is to undo the display rotation.
                angle_for_inverse_rotation_rad = geo.get("orientation", 0.0)

                # Normalize display coords to the 0..1 range.
                nx_disp = x_disp / disp_w
                ny_disp = y_disp / disp_h
                nx_orig_norm, ny_orig_norm = nx_disp, ny_disp

                # Apply inverse rotation around the image centre.
                if abs(angle_for_inverse_rotation_rad) > 1e-4:
                    logging.debug(
                        f"{log_prefix} Applying inverse rotation (angle: {math.degrees(angle_for_inverse_rotation_rad):.2f} deg)..."
                    )
                    cosa = math.cos(angle_for_inverse_rotation_rad)
                    sina = math.sin(angle_for_inverse_rotation_rad)
                    cx, cy = 0.5, 0.5
                    tx = nx_disp - cx
                    ty = ny_disp - cy
                    nx_orig_norm = (tx * cosa - ty * sina) + cx
                    ny_orig_norm = (tx * sina + ty * cosa) + cy
                    logging.debug(
                        f"{log_prefix} Inverse rotation applied. Norm coords: ({nx_orig_norm:.4f}, {ny_orig_norm:.4f})"
                    )

                # Convert back to original pixel coordinates, clamped to the image.
                orig_x = max(0.0, min(nx_orig_norm * orig_w, orig_w - 1.0))
                orig_y = max(0.0, min(ny_orig_norm * orig_h, orig_h - 1.0))
                logging.debug(
                    f"{log_prefix} Calculated original pixel coords: ({orig_x:.2f}, {orig_y:.2f})"
                )

                # Geodetic calculation: pixel offset -> metres -> degrees.
                pixel_delta_x = orig_x - ref_x
                pixel_delta_y = ref_y - orig_y  # image y grows downwards
                meters_delta_x = pixel_delta_x * scale_x
                meters_delta_y = pixel_delta_y * scale_y
                logging.debug(
                    f"{log_prefix} Offset (meters): dX={meters_delta_x:.1f} (E), dY={meters_delta_y:.1f} (N)"
                )
                M_PER_DLAT = 111132.954
                M_PER_DLON_EQ = 111319.488
                # Lower bound avoids dividing by ~0 close to the poles.
                m_per_dlon = max(abs(M_PER_DLON_EQ * math.cos(ref_lat_rad)), 1e-3)
                lat_offset_deg = meters_delta_y / M_PER_DLAT
                lon_offset_deg = meters_delta_x / m_per_dlon
                logging.debug(
                    f"{log_prefix} Offset (degrees): dLat={lat_offset_deg:.6f}, dLon={lon_offset_deg:.6f}"
                )
                final_lat_deg = math.degrees(ref_lat_rad) + lat_offset_deg
                final_lon_deg = math.degrees(ref_lon_rad) + lon_offset_deg
                logging.debug(
                    f"{log_prefix} Final coords (dec deg): Lat={final_lat_deg:.6f}, Lon={final_lon_deg:.6f}"
                )

                # Format output.
                if (
                    math.isfinite(final_lat_deg)
                    and math.isfinite(final_lon_deg)
                    and abs(final_lat_deg) <= 90.0
                    and abs(final_lon_deg) <= 180.0
                ):
                    lat_s = decimal_to_dms(final_lat_deg, is_latitude=True)
                    lon_s = decimal_to_dms(final_lon_deg, is_latitude=False)
                    if (
                        "Error" in lat_s
                        or "Invalid" in lat_s
                        or "Error" in lon_s
                        or "Invalid" in lon_s
                    ):
                        logging.warning(f"{log_prefix} DMS conversion failed.")
                        lat_s, lon_s = "Error DMS", "Error DMS"
                else:
                    logging.warning(
                        f"{log_prefix} Calculated coordinates out of range."
                    )
                    lat_s, lon_s = "Invalid Calc", "Invalid Calc"

        except KeyError as ke:
            logging.error(f"{log_prefix} Missing key in geo_info: {ke}")
            lat_s, lon_s = "Error Key", "Error Key"
        except Exception:
            logging.exception(f"{log_prefix} Geo calculation error:")
            lat_s, lon_s = "Calc Error", "Calc Error"

        # Queue result; a failure here must not prevent rescheduling below.
        try:
            self.put_mouse_coordinates_queue((lat_s, lon_s))
        except Exception:
            logging.exception(f"{log_prefix} Error queueing mouse coordinates:")

    # Reschedule
    if not self.state.shutting_down:
        self._reschedule_queue_processor(self.process_mouse_queue, delay=50)
|
|
|
|
def put_mouse_coordinates_queue(self, coords_tuple: Tuple[str, str]):
    """Post a ``("MOUSE_COORDS", coords_tuple)`` message on the Tkinter queue.

    Does nothing once the application is shutting down.
    """
    if self.state.shutting_down:
        return

    log_prefix = "[App Mouse Queue Put]"
    command = "MOUSE_COORDS"
    payload = coords_tuple
    message = (command, payload)
    logging.debug(
        f"{log_prefix} Putting command '{command}' with payload {payload} onto tkinter_queue."
    )
    put_queue(self.tkinter_queue, message, queue_name="tkinter", app_instance=self)
|
|
|
|
# --- Queue Processors ---
|
|
def process_sar_queue(self):
    """Show at most one pending SAR frame, then schedule the next poll.

    Takes a frame from ``self.sar_queue`` without blocking and passes it to
    ``display_manager.show_sar_image``. Display errors are logged. The
    processor reschedules itself unless the app is shutting down.
    """
    log_prefix = "[App QProc SAR]"
    if self.state.shutting_down:
        return

    frame = None
    try:
        frame = self.sar_queue.get(block=False)
        self.sar_queue.task_done()
    except queue.Empty:
        pass  # nothing to show on this tick
    except Exception:
        logging.exception(f"{log_prefix} Error getting from SAR display queue:")

    if frame is not None:
        logging.debug(f"{log_prefix} Dequeued SAR image. Calling DisplayManager...")
        manager = getattr(self, "display_manager", None)
        if not manager:
            logging.error(f"{log_prefix} DisplayManager not available.")
        else:
            try:
                manager.show_sar_image(frame)
            except Exception:
                logging.exception(
                    f"{log_prefix} Error calling DisplayManager.show_sar_image:"
                )

    if not self.state.shutting_down:
        self._reschedule_queue_processor(self.process_sar_queue)
|
|
|
|
def process_mfd_queue(self):
    """Show at most one pending MFD frame, then schedule the next poll.

    Takes a frame from ``self.mfd_queue`` without blocking and passes it to
    ``display_manager.show_mfd_image``. Display errors are logged. The
    processor reschedules itself unless the app is shutting down.
    """
    log_prefix = "[App QProc MFD]"
    if self.state.shutting_down:
        return

    frame = None
    try:
        frame = self.mfd_queue.get(block=False)
        self.mfd_queue.task_done()
    except queue.Empty:
        pass  # nothing to show on this tick
    except Exception:
        logging.exception(f"{log_prefix} Error getting from MFD display queue:")

    if frame is not None:
        logging.debug(f"{log_prefix} Dequeued MFD image. Calling DisplayManager...")
        manager = getattr(self, "display_manager", None)
        if not manager:
            logging.error(f"{log_prefix} DisplayManager not available.")
        else:
            try:
                manager.show_mfd_image(frame)
            except Exception:
                logging.exception(
                    f"{log_prefix} Error calling DisplayManager.show_mfd_image:"
                )

    if not self.state.shutting_down:
        self._reschedule_queue_processor(self.process_mfd_queue)
|
|
|
|
# --- MODIFIED FUNCTION: process_tkinter_queue ---
|
|
def process_tkinter_queue(self):
    """Drain pending UI commands from the Tkinter queue and apply them.

    Each queue item is expected to be a ``(command, payload)`` tuple:

    * ``"MOUSE_COORDS"`` - payload is forwarded to
      ``_handle_mouse_coords_update``.
    * ``"SHOW_MAP"`` - a PIL image payload is copied into
      ``state.last_map_image_pil`` (a ``None`` payload clears it) and the
      payload is forwarded to ``_handle_show_map_update``.
    * ``"REDRAW_MAP"`` - the last stored map image, if any, is shown again.

    Several items are handled per invocation (up to a fixed cap) rather than
    one: this processor runs every 100 ms while the mouse processor can
    enqueue a result every 50 ms, so handling a single item per tick would
    let the queue fall behind. The cap bounds the time spent in one tick.
    """
    log_prefix = "[App QProc Tkinter]"
    if self.state.shutting_down:
        return

    max_items_per_tick = 50
    for _ in range(max_items_per_tick):
        if self.state.shutting_down:
            break

        try:
            item = self.tkinter_queue.get(block=False)
            self.tkinter_queue.task_done()
        except queue.Empty:
            break
        except Exception:
            logging.exception(f"{log_prefix} Error getting from Tkinter queue:")
            break

        if item is None:
            continue

        try:
            if isinstance(item, tuple) and len(item) == 2:
                command, payload = item
                logging.debug(
                    f"{log_prefix} Dequeued Command:'{command}', Payload Type:{type(payload)}"
                )

                if command == "MOUSE_COORDS":
                    self._handle_mouse_coords_update(payload)

                elif command == "SHOW_MAP":
                    # Store last map PIL image in AppState before display.
                    if Image is not None and isinstance(payload, Image.Image):
                        self.state.last_map_image_pil = payload.copy()
                        logging.debug(
                            f"{log_prefix} Stored last map image (PIL) in AppState."
                        )
                    elif payload is None:
                        self.state.last_map_image_pil = None
                        logging.debug(
                            f"{log_prefix} Cleared last map image in AppState (payload was None)."
                        )

                    # Delegate display to handler.
                    self._handle_show_map_update(payload)

                elif command == "REDRAW_MAP":
                    logging.debug(f"{log_prefix} Handling REDRAW_MAP command.")
                    if self.state.last_map_image_pil:
                        logging.debug(
                            f"{log_prefix} Re-displaying last stored map image."
                        )
                        self._handle_show_map_update(self.state.last_map_image_pil)
                    else:
                        logging.warning(
                            f"{log_prefix} REDRAW_MAP requested but no previous map image found."
                        )

                else:  # Unknown command
                    logging.warning(
                        f"{log_prefix} Unknown command received: {command}"
                    )
            else:  # Unexpected item type
                logging.warning(
                    f"{log_prefix} Dequeued unexpected item type: {type(item)}"
                )
        except Exception:
            logging.exception(
                f"{log_prefix} Error processing dequeued Tkinter item:"
            )

    # Reschedule next check
    if not self.state.shutting_down:
        self._reschedule_queue_processor(self.process_tkinter_queue, delay=100)
|
|
|
|
def _handle_mouse_coords_update(self, payload: Optional[Tuple[str, str]]):
    """Push a latitude/longitude text pair to the control panel.

    A ``None`` payload shows ``"N/A"`` for both values; anything that is not
    a 2-tuple is logged and shown as ``"Error"``. UI errors are logged and
    swallowed (Tcl errors are not logged while shutting down).
    """
    log_prefix = "[App QProc Tkinter]"

    if payload is None:
        lat_s, lon_s = "N/A", "N/A"
    elif isinstance(payload, tuple) and len(payload) == 2:
        lat_s, lon_s = payload
    else:
        logging.warning(
            f"{log_prefix} Invalid payload for MOUSE_COORDS: {type(payload)}"
        )
        lat_s, lon_s = "Error", "Error"

    try:
        panel_can_show = hasattr(self, "control_panel") and hasattr(
            self.control_panel, "set_mouse_coordinates"
        )
        if panel_can_show:
            self.control_panel.set_mouse_coordinates(lat_s, lon_s)
    except tk.TclError as tcl_err:
        if not self.state.shutting_down:
            logging.warning(
                f"{log_prefix} Error updating mouse coords UI (TclError): {tcl_err}"
            )
    except Exception as ui_err:
        logging.warning(f"{log_prefix} Error updating mouse coords UI: {ui_err}")
|
|
|
|
def _handle_show_map_update(self, payload: Optional["Image.Image"]):
    """Forward a map image to ``MapIntegrationManager.display_map``.

    Used for both SHOW_MAP and REDRAW_MAP commands. Errors raised by the
    manager are logged; when no manager is active a warning is logged and
    the payload is ignored.

    The annotation is a string forward reference on purpose: the module sets
    ``Image = None`` when Pillow cannot be imported, and an eagerly evaluated
    ``Image.Image`` annotation would then raise ``AttributeError`` while the
    class is being defined.

    Args:
        payload: PIL image to display, or ``None``; passed through unchanged.
    """
    log_prefix = "[App QProc Tkinter]"
    logging.debug(f"{log_prefix} Processing SHOW_MAP/REDRAW_MAP...")
    if hasattr(self, "map_integration_manager") and self.map_integration_manager:
        try:
            self.map_integration_manager.display_map(payload)  # Delegate to manager
        except Exception:
            logging.exception(
                f"{log_prefix} Error calling map_integration_manager.display_map:"
            )
    else:
        logging.warning(
            f"{log_prefix} Received map display command but MapIntegrationManager not active."
        )
|
|
|
|
def _reschedule_queue_processor(self, processor_func, delay: Optional[int] = None):
    """Schedule ``processor_func`` to run again via ``root.after``.

    When ``delay`` (milliseconds) is not given, the SAR and MFD display
    processors get a delay derived from ``config.MFD_FPS`` (at least 10 ms,
    or 20 ms when the FPS value is not positive); every other processor
    gets 100 ms. Scheduling errors are logged unless the app is shutting
    down.
    """
    if delay is None:
        display_processors = (self.process_sar_queue, self.process_mfd_queue)
        if processor_func not in display_processors:
            delay = 100  # Default for other queues
        else:
            target_fps = config.MFD_FPS
            if target_fps > 0:
                delay = max(10, int(1000 / (target_fps * 1.5)))
            else:
                delay = 20

    try:
        root = self.root
        if root and root.winfo_exists():
            root.after(delay, processor_func)
    except Exception as schedule_err:
        if self.state.shutting_down:
            return
        logging.warning(
            f"[App Rescheduler] Error rescheduling {processor_func.__name__}: {schedule_err}"
        )
|
|
|
|
# --- Status Update ---
|
|
def update_status(self):
    """Refresh the status bar text and the drop/incomplete statistics labels.

    Skipped entirely while shutting down or while the status bar still shows
    the "Loading initial map" message. The new texts are applied through
    ``root.after_idle``.
    """
    log_prefix = "[App Status Update]"
    if not hasattr(self, "state") or self.state.shutting_down:
        return

    # Leave the status bar alone while the initial map is still loading.
    try:
        initial_map_loading = (
            hasattr(self, "statusbar")
            and self.statusbar.winfo_exists()
            and "Loading initial map" in self.statusbar.cget("text")
        )
    except Exception:
        initial_map_loading = False  # Ignore errors checking status bar
    if initial_map_loading:
        logging.debug(
            f"{log_prefix} Skipping status update while initial map loads."
        )
        return

    logging.debug(f"{log_prefix} Updating status bar and statistics labels...")
    stats = self.state.get_statistics()

    try:
        # --- Status bar text ---
        if self.state.test_mode_active:
            mode = "Test"
        elif config.USE_LOCAL_IMAGES:
            mode = "Local"
        else:
            mode = "Network"

        map_active = (
            hasattr(self, "map_integration_manager")
            and self.map_integration_manager is not None
        )
        map_stat = " MapOn" if config.ENABLE_MAP_OVERLAY and map_active else ""

        mfd_fps = f"MFD:{self.state.mfd_fps:.1f}fps"
        if self.state.sar_fps > 0:
            sar_fps = f"SAR:{self.state.sar_fps:.1f}fps"
        else:
            sar_fps = "SAR:N/A"

        # NOTE: mouse coordinates are intentionally not part of this text.
        full_status = f"Status: {mode}{map_stat} | {mfd_fps} | {sar_fps}"

        # --- Statistics label texts ---
        drop_txt = f"Drop(Q): S={stats['dropped_sar_q']},M={stats['dropped_mfd_q']}, Tk={stats['dropped_tk_q']},Mo={stats['dropped_mouse_q']}"
        incmpl_txt = f"Incmpl(RX): S={stats['incomplete_sar_rx']},M={stats['incomplete_mfd_rx']}"

        logging.debug(f"{log_prefix} Formatted status strings.")

        # --- Schedule UI updates ---
        if self.root and self.root.winfo_exists():
            if hasattr(self, "statusbar") and self.statusbar.winfo_exists():
                self.root.after_idle(self.statusbar.set_status_text, full_status)

            for label_name, label_text in (
                ("dropped_label", drop_txt),
                ("incomplete_label", incmpl_txt),
            ):
                if (
                    hasattr(self.control_panel, label_name)
                    and getattr(self.control_panel, label_name).winfo_exists()
                ):
                    # The options dict is passed positionally to the label's config().
                    self.root.after_idle(
                        getattr(self.control_panel, label_name).config,
                        {"text": label_text},
                    )

    except tk.TclError as tcl_err:
        if not self.state.shutting_down:
            logging.warning(f"{log_prefix} TclError updating status UI: {tcl_err}")
    except Exception:
        if not self.state.shutting_down:
            logging.exception(f"{log_prefix} Error formatting/updating status UI:")
|
|
|
|
# --- Cleanup ---
|
|
def close_app(self):
    """Shut the application down and exit the process.

    Sets ``state.shutting_down``, then stops the test-mode timers, shuts
    down the map integration manager, closes the UDP socket, waits briefly
    for the UDP receiver thread, shuts down the receiver's worker pool,
    destroys the display windows and finally the Tk root window.

    Every step is isolated: a failure is logged and the sequence continues.
    This matters because ``shutting_down`` is set first and a second call
    returns immediately, so an exception escaping half-way would leave the
    window open with no way to close it.

    Raises:
        SystemExit: always - code 0 after the sequence has run, code 1 when
            the instance has no ``state`` attribute.
    """
    log_prefix = "[App Shutdown]"
    if not hasattr(self, "state"):
        logging.error(f"{log_prefix} Cannot shutdown: AppState not found.")
        sys.exit(1)
    if self.state.shutting_down:
        logging.warning(f"{log_prefix} Close already initiated.")
        return

    logging.info(f"{log_prefix} Starting shutdown sequence...")
    self.state.shutting_down = True

    def run_step(description, action):
        """Run one shutdown action; log a failure instead of aborting."""
        try:
            action()
        except Exception:
            logging.exception(f"{log_prefix} Error while {description}:")

    try:
        self.set_status("Closing...")  # Attempt final status update
    except Exception as status_err:
        logging.debug(f"{log_prefix} Final status update failed: {status_err}")

    logging.debug(f"{log_prefix} Stopping TestModeManager timers...")
    test_mode_manager = getattr(self, "test_mode_manager", None)
    if test_mode_manager:
        run_step("stopping TestModeManager timers", test_mode_manager.stop_timers)

    logging.debug(f"{log_prefix} Shutting down MapIntegrationManager...")
    map_manager = getattr(self, "map_integration_manager", None)
    if map_manager:
        run_step("shutting down MapIntegrationManager", map_manager.shutdown)

    logging.debug(f"{log_prefix} Signalling periodic updates/processors to stop.")
    logging.debug(f"{log_prefix} Closing UDP socket...")
    udp_socket = getattr(self, "udp_socket", None)
    if udp_socket:
        run_step("closing UDP socket", lambda: close_udp_socket(udp_socket))
        self.udp_socket = None

    udp_thread = getattr(self, "udp_thread", None)
    if udp_thread and udp_thread.is_alive():
        logging.debug(f"{log_prefix} Waiting for UDP receiver thread...")
        run_step(
            "joining UDP receiver thread", lambda: udp_thread.join(timeout=0.5)
        )
        if udp_thread.is_alive():
            logging.warning(
                f"{log_prefix} UDP receiver thread did not exit cleanly."
            )
        else:
            logging.debug(f"{log_prefix} UDP receiver thread exited.")

    worker_pool = None
    udp_receiver = getattr(self, "udp_receiver", None)
    if udp_receiver:
        worker_pool = getattr(udp_receiver, "executor", None)
    if worker_pool:
        logging.info(f"{log_prefix} Shutting down worker pool...")
        try:
            # Do not block the UI thread waiting for in-flight work.
            worker_pool.shutdown(wait=False, cancel_futures=False)
        except Exception as e:
            logging.exception(
                f"{log_prefix} Exception during worker_pool shutdown: {e}"
            )

    logging.debug(f"{log_prefix} Destroying display windows via DisplayManager...")
    display_manager = getattr(self, "display_manager", None)
    if display_manager:
        run_step("destroying display windows", display_manager.destroy_windows)

    try:
        logging.debug(f"{log_prefix} Final cv2.waitKey(5)...")
        cv2.waitKey(5)
    except Exception as e:
        logging.warning(f"{log_prefix} Error during final cv2.waitKey: {e}")

    logging.info(f"{log_prefix} Requesting Tkinter root window destruction...")
    try:
        if self.root and self.root.winfo_exists():
            self.root.destroy()
            logging.info(f"{log_prefix} Tkinter root window destroyed.")
    except Exception:
        logging.exception(f"{log_prefix} Error destroying Tkinter window:")

    logging.info("-----------------------------------------")
    logging.info(f"{log_prefix} Application close sequence finished.")
    logging.info("-----------------------------------------")
    sys.exit(0)
|
|
|
|
|
|
# --- Main Execution Block ---
|
|
if __name__ == "__main__":
    # Script entry point: build the Tk root window, create the App, run the
    # event loop, and make sure OpenCV windows and logging are cleaned up on
    # every exit path.
    main_log_prefix = "[App Main]"
    root = None
    app = None

    try:
        if config.ENABLE_MAP_OVERLAY and not MAP_MODULES_LOADED:
            logging.critical(
                f"{main_log_prefix} Map Overlay enabled but modules failed to load. Cannot start."
            )
            sys.exit(1)

        logging.debug(f"{main_log_prefix} Creating main Tkinter window...")
        root = create_main_window(
            title="Control Panel",
            min_width=config.TKINTER_MIN_WIDTH,
            min_height=config.TKINTER_MIN_HEIGHT,
            x_pos=10,
            y_pos=10,
        )
        logging.debug(f"{main_log_prefix} Main Tkinter window created.")

        logging.debug(f"{main_log_prefix} Initializing App class...")
        app = App(root)
        logging.debug(f"{main_log_prefix} App class initialized.")

        # Closing the window runs the application's shutdown sequence.
        root.protocol("WM_DELETE_WINDOW", app.close_app)
        logging.debug(f"{main_log_prefix} WM_DELETE_WINDOW protocol set.")

        logging.info(f"{main_log_prefix} Starting Tkinter main event loop...")
        root.mainloop()
        logging.info(f"{main_log_prefix} Tkinter main event loop finished.")

    except SystemExit as exit_e:
        if exit_e.code == 0:
            logging.info(f"{main_log_prefix} Application exited normally.")
        else:
            logging.warning(
                f"{main_log_prefix} Application exited with error code {exit_e.code}."
            )
        # Re-raise so the process keeps the requested exit status; swallowing
        # it here would make every sys.exit(n) end with status 0.
        raise
    except Exception:
        logging.critical(
            f"{main_log_prefix} UNHANDLED EXCEPTION OCCURRED:", exc_info=True
        )
        if app and hasattr(app, "state") and not app.state.shutting_down:
            logging.error(f"{main_log_prefix} Attempting emergency cleanup...")
            try:
                app.close_app()
            except SystemExit:
                # close_app always exits; the failure status is set below.
                pass
            except Exception as cleanup_e:
                logging.exception(
                    f"{main_log_prefix} Error during emergency cleanup: {cleanup_e}"
                )
        sys.exit(1)
    finally:
        logging.info(f"{main_log_prefix} Application finally block reached.")
        logging.debug(
            f"{main_log_prefix} Final check: Destroying any remaining OpenCV windows..."
        )
        try:
            cv2.destroyAllWindows()
        except Exception as cv_err:
            logging.warning(
                f"{main_log_prefix} Exception during final cv2.destroyAllWindows(): {cv_err}"
            )
        logging.info("================ Application End ================")
        logging.shutdown()
|
|
|
|
# --- END OF FILE app.py ---
|