3039 lines
138 KiB
Python
3039 lines
138 KiB
Python
# app.py
|
|
"""
|
|
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
|
|
Main application module for the Control Panel application.
|
|
|
|
Orchestrates UI, display, network reception, image processing pipeline,
|
|
test mode management, map integration, and state management.
|
|
Initializes all sub-modules and manages the main application lifecycle.
|
|
"""
|
|
|
|
# --- Standard library imports ---
|
|
import threading
|
|
import time
|
|
import queue
|
|
import os
|
|
import logging
|
|
import math
|
|
import sys
|
|
import socket # Required for network setup
|
|
from typing import Optional, Tuple, Any, Dict, TYPE_CHECKING
|
|
import datetime
|
|
import cv2
|
|
|
|
# --- Third-party imports ---
|
|
import tkinter as tk
|
|
from tkinter import ttk
|
|
from tkinter import colorchooser
|
|
import numpy as np
|
|
import screeninfo
|
|
|
|
# Conditional map imports are handled further down
|
|
|
|
# --- Configuration Import ---
|
|
import config
|
|
|
|
# --- Logging Setup ---
|
|
# Import and call the setup function from the dedicated module
|
|
try:
|
|
from logging_config import setup_logging
|
|
|
|
# Configure logging as early as possible
|
|
setup_logging()
|
|
except ImportError:
|
|
# Fallback basic configuration if logging_config fails
|
|
print("ERROR: logging_config.py not found. Using basic logging.")
|
|
logging.basicConfig(
|
|
level=logging.WARNING,
|
|
format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
|
|
)
|
|
|
|
# --- Application Modules Import ---
|
|
from ui import ControlPanel, StatusBar, create_main_window
|
|
|
|
# image_processing functions are used by other modules, App uses ImagePipeline now
|
|
# from image_processing import ...
|
|
from display import DisplayManager
|
|
from utils import (
|
|
put_queue,
|
|
clear_queue,
|
|
decimal_to_dms,
|
|
generate_sar_kml,
|
|
launch_google_earth,
|
|
cleanup_old_kml_files
|
|
)
|
|
from network import create_udp_socket, close_udp_socket
|
|
from receiver import UdpReceiver
|
|
from app_state import AppState # Centralized state
|
|
from test_mode_manager import TestModeManager # Manages test mode logic
|
|
from image_pipeline import ImagePipeline # Manages normal image processing
|
|
|
|
# --- Map related imports (Conditional) ---
|
|
# Check if map modules are present before attempting specific imports
|
|
map_libs_found = True
|
|
try:
|
|
import mercantile
|
|
import pyproj
|
|
from PIL import Image # Needed by map modules usually
|
|
except ImportError as map_lib_err:
|
|
logging.warning(
|
|
f"[App Init] Failed to import core map library ({map_lib_err}). Map functionality disabled."
|
|
)
|
|
map_libs_found = False
|
|
# Define placeholders for type hinting if core libs failed
|
|
BaseMapService = None # type: ignore
|
|
MapTileManager = None # type: ignore
|
|
MapDisplayWindow = None # type: ignore
|
|
MapIntegrationManager = None # type: ignore
|
|
MapCalculationError = Exception
|
|
|
|
if map_libs_found:
|
|
try:
|
|
from map_services import get_map_service, BaseMapService
|
|
from map_manager import MapTileManager
|
|
from map_utils import (
|
|
get_bounding_box_from_center_size,
|
|
get_tile_ranges_for_bbox,
|
|
MapCalculationError,
|
|
)
|
|
from map_display import MapDisplayWindow
|
|
|
|
# Import the integration manager
|
|
from map_integration import MapIntegrationManager
|
|
|
|
MAP_MODULES_LOADED = True
|
|
except ImportError as map_import_err:
|
|
logging.warning(
|
|
f"[App Init] Failed to import specific map modules ({map_import_err}). Map functionality disabled."
|
|
)
|
|
MAP_MODULES_LOADED = False
|
|
# Define placeholders if specific modules failed
|
|
BaseMapService = None # type: ignore
|
|
MapTileManager = None # type: ignore
|
|
MapDisplayWindow = None # type: ignore
|
|
MapIntegrationManager = None # type: ignore
|
|
MapCalculationError = Exception
|
|
else:
|
|
MAP_MODULES_LOADED = False
|
|
|
|
# Type checking block for App class reference in managers
|
|
if TYPE_CHECKING:
|
|
# This avoids circular imports at runtime but helps type checkers
|
|
pass # No direct import needed here as other modules import App
|
|
|
|
|
|
# --- Main Application Class ---
|
|
class App:
|
|
"""
|
|
Main application class. Manages UI, display, processing, network, state,
|
|
and orchestrates various managers (Test Mode, Image Pipeline, Map Integration).
|
|
"""
|
|
|
|
# --- Class Attributes ---
|
|
# Timers and offsets previously here are now managed by TestModeManager
|
|
# Map components previously here are now managed by MapIntegrationManager
|
|
|
|
# --- Methods DEFINED BEFORE __init__ to be available for bindings ---
|
|
|
|
# --- Status Update Method ---
|
|
def set_status(self, message: str):
|
|
"""
|
|
Safely updates the main status message prefix in the status bar.
|
|
Uses after_idle for thread safety.
|
|
"""
|
|
log_prefix = "[App Set Status]"
|
|
# Check state exists and flag before proceeding
|
|
# Use hasattr for robustness during init/shutdown
|
|
if not hasattr(self, "state") or self.state.shutting_down:
|
|
return
|
|
|
|
new_status_prefix = f"Status: {message}"
|
|
# Use INFO level for user-visible status changes
|
|
logging.info(f"{log_prefix} Request to set status message prefix: '{message}'")
|
|
|
|
def _update_status_text_on_main_thread():
|
|
"""Internal function to update status text, runs in main GUI thread."""
|
|
# Check state again inside the scheduled function
|
|
if not hasattr(self, "state") or self.state.shutting_down:
|
|
return
|
|
try:
|
|
# Check if statusbar exists and is valid Tkinter widget
|
|
if not (
|
|
hasattr(self, "statusbar")
|
|
and isinstance(self.statusbar, tk.Widget)
|
|
and self.statusbar.winfo_exists()
|
|
):
|
|
# Use WARNING if statusbar is gone unexpectedly
|
|
logging.warning(
|
|
f"{log_prefix} Statusbar widget not available, cannot update status."
|
|
)
|
|
return
|
|
|
|
# Get current text and parse existing parts (keep info after '|')
|
|
current_text: str = self.statusbar.cget("text")
|
|
parts = current_text.split("|")
|
|
suffix = ""
|
|
# Rebuild suffix from parts after the first one
|
|
if len(parts) > 1:
|
|
suffix_parts = [p.strip() for p in parts[1:] if p.strip()]
|
|
if suffix_parts:
|
|
suffix = " | " + " | ".join(suffix_parts)
|
|
|
|
# Combine new prefix and existing suffix
|
|
final_text = f"{new_status_prefix}{suffix}"
|
|
logging.debug(
|
|
f"{log_prefix} Updating status bar text to: '{final_text}'"
|
|
)
|
|
# Call StatusBar's method to update
|
|
self.statusbar.set_status_text(final_text)
|
|
|
|
except tk.TclError as e:
|
|
# Log TclError (widget destroyed) if not shutting down
|
|
if not self.state.shutting_down:
|
|
logging.warning(f"{log_prefix} TclError setting status text: {e}")
|
|
except Exception as e:
|
|
# Log other unexpected errors
|
|
logging.exception(f"{log_prefix} Error updating status bar text:")
|
|
|
|
# Schedule the update on the main GUI thread
|
|
try:
|
|
if hasattr(self, "root") and self.root and self.root.winfo_exists():
|
|
# Use after_idle to ensure it runs when Tkinter is idle
|
|
self.root.after_idle(_update_status_text_on_main_thread)
|
|
# else: Don't log if root is None or destroyed, expected during shutdown
|
|
except Exception as e:
|
|
# Log error only if not shutting down
|
|
if not hasattr(self, "state") or not self.state.shutting_down:
|
|
logging.warning(
|
|
f"{log_prefix} Error scheduling status update via after_idle: {e}"
|
|
)
|
|
|
|
# --- LUT Generation Methods ---
|
|
def update_brightness_contrast_lut(self):
|
|
"""Recalculates the SAR B/C LUT based on AppState and stores it back in AppState."""
|
|
log_prefix = "[App Update SAR LUT]"
|
|
logging.debug(f"{log_prefix} Updating SAR Brightness/Contrast LUT...")
|
|
|
|
# Check if state is initialized
|
|
if not hasattr(self, "state"):
|
|
logging.error(f"{log_prefix} AppState not ready for LUT update.")
|
|
return
|
|
|
|
try:
|
|
# Read parameters from AppState
|
|
# Use max(0.01, ...) for contrast to avoid zero or negative values
|
|
contrast_val = max(0.01, self.state.sar_contrast)
|
|
brightness_val = self.state.sar_brightness
|
|
except AttributeError:
|
|
# This case should be covered by the hasattr check above, but keep for safety
|
|
logging.error(
|
|
f"{log_prefix} Error accessing state for SAR LUT parameters (AttributeError)."
|
|
)
|
|
return
|
|
except Exception as e:
|
|
logging.error(
|
|
f"{log_prefix} Unexpected error accessing state for SAR LUT params: {e}"
|
|
)
|
|
return
|
|
|
|
# Calculate the LUT using numpy vectorized operations
|
|
try:
|
|
# Create an array representing pixel values 0-255
|
|
lut_values = np.arange(
|
|
256, dtype=np.float32
|
|
) # Use float for calculation accuracy
|
|
# Apply contrast (multiplication) and brightness (addition)
|
|
adjusted_values = (lut_values * contrast_val) + brightness_val
|
|
# Clip the results to the valid 0-255 range and convert to uint8
|
|
lut = np.clip(np.round(adjusted_values), 0, 255).astype(np.uint8)
|
|
except Exception as e:
|
|
# Log calculation errors and set a default identity LUT
|
|
logging.exception(f"{log_prefix} Error calculating SAR B/C LUT:")
|
|
# Create identity LUT as fallback
|
|
identity_lut = np.arange(256, dtype=np.uint8)
|
|
# Store fallback LUT in state
|
|
self.state.brightness_contrast_lut = identity_lut
|
|
logging.error(
|
|
f"{log_prefix} Using identity SAR LUT as fallback due to calculation error."
|
|
)
|
|
return
|
|
|
|
# Store the calculated LUT back into AppState
|
|
self.state.brightness_contrast_lut = lut
|
|
logging.debug(f"{log_prefix} SAR B/C LUT updated successfully in AppState.")
|
|
|
|
def update_mfd_lut(self):
|
|
"""Recalculates the MFD LUT based on AppState parameters and stores it back in AppState."""
|
|
log_prefix = "[MFD LUT Update]"
|
|
logging.debug(f"{log_prefix} Recalculating MFD Color LUT...")
|
|
|
|
# Check if state is initialized
|
|
if not hasattr(self, "state"):
|
|
logging.error(f"{log_prefix} AppState not ready for MFD LUT update.")
|
|
return
|
|
|
|
try:
|
|
# Read parameters from AppState safely
|
|
mfd_params = self.state.mfd_params
|
|
raw_map_intensity_factor = mfd_params["raw_map_intensity"] / 255.0
|
|
pixel_to_category = mfd_params["pixel_to_category"]
|
|
categories = mfd_params["categories"]
|
|
except AttributeError:
|
|
logging.error(
|
|
f"{log_prefix} Error accessing mfd_params state (AttributeError)."
|
|
)
|
|
return
|
|
except KeyError as ke:
|
|
logging.error(f"{log_prefix} Missing key in AppState mfd_params: {ke}")
|
|
return
|
|
except Exception as e:
|
|
logging.error(
|
|
f"{log_prefix} Unexpected error accessing state for MFD LUT params: {e}"
|
|
)
|
|
return
|
|
|
|
# Initialize a new LUT array (256 entries, 3 channels BGR) with zeros
|
|
new_lut = np.zeros((256, 3), dtype=np.uint8)
|
|
|
|
try:
|
|
# Iterate through all possible pixel index values (0-255)
|
|
for index_value in range(256):
|
|
# Find the category associated with this pixel index
|
|
category_name = pixel_to_category.get(index_value)
|
|
|
|
if category_name:
|
|
# --- Handle Categorized Pixels (0-31 typically) ---
|
|
cat_data = categories[
|
|
category_name
|
|
] # Get category data (color, intensity)
|
|
base_bgr = cat_data["color"] # Base BGR color tuple
|
|
intensity_factor = (
|
|
cat_data["intensity"] / 255.0
|
|
) # Intensity slider value (0-1)
|
|
|
|
# Calculate final color components applying intensity
|
|
final_b = float(base_bgr[0]) * intensity_factor
|
|
final_g = float(base_bgr[1]) * intensity_factor
|
|
final_r = float(base_bgr[2]) * intensity_factor
|
|
|
|
# Clip and convert to integer for the LUT entry
|
|
new_lut[index_value, 0] = np.clip(
|
|
int(round(final_b)), 0, 255
|
|
) # Blue
|
|
new_lut[index_value, 1] = np.clip(
|
|
int(round(final_g)), 0, 255
|
|
) # Green
|
|
new_lut[index_value, 2] = np.clip(
|
|
int(round(final_r)), 0, 255
|
|
) # Red
|
|
|
|
elif 32 <= index_value <= 255:
|
|
# --- Handle Raw Map Pixels (32-255 typically) ---
|
|
# Map index value (32-255) to a raw intensity (0-255)
|
|
raw_intensity = (float(index_value) - 32.0) * (255.0 / 223.0)
|
|
# Apply the raw map intensity slider factor
|
|
final_gray_float = raw_intensity * raw_map_intensity_factor
|
|
# Clip and convert to integer
|
|
final_gray_int = int(round(np.clip(final_gray_float, 0, 255)))
|
|
# Assign the gray value to all BGR channels
|
|
new_lut[index_value, :] = final_gray_int
|
|
else:
|
|
# Handle cases where index is < 32 but not found in pixel_to_category (e.g., Reserved range)
|
|
# This case might indicate an issue in config, but handle gracefully.
|
|
# We default these to black (as per initial new_lut value)
|
|
if category_name is None:
|
|
# Log unexpected unmapped indices
|
|
logging.warning(
|
|
f"{log_prefix} Index {index_value} has no assigned category. Defaulting to black."
|
|
)
|
|
# new_lut[index_value, :] is already [0, 0, 0]
|
|
|
|
# Store the completed LUT back into AppState
|
|
self.state.mfd_lut = new_lut
|
|
logging.info(
|
|
f"{log_prefix} MFD LUT update complete and stored in AppState."
|
|
)
|
|
|
|
except KeyError as ke:
|
|
logging.error(
|
|
f"{log_prefix} Missing category key '{ke}' during MFD LUT generation."
|
|
)
|
|
self._apply_fallback_mfd_lut() # Apply fallback if structure error occurs
|
|
except Exception as e:
|
|
logging.critical(
|
|
f"{log_prefix} CRITICAL error during MFD LUT generation:", exc_info=True
|
|
)
|
|
self._apply_fallback_mfd_lut() # Apply fallback on critical errors
|
|
|
|
def _apply_fallback_mfd_lut(self):
|
|
"""Applies a simple grayscale ramp as a fallback MFD LUT in case of errors."""
|
|
log_prefix = "[MFD LUT Update]"
|
|
logging.error(
|
|
f"{log_prefix} Applying fallback grayscale MFD LUT due to previous errors."
|
|
)
|
|
if hasattr(self, "state"):
|
|
try:
|
|
# Create a simple grayscale ramp (0-255)
|
|
gray_ramp = np.arange(256, dtype=np.uint8)
|
|
# Convert to BGR format for the LUT
|
|
fallback_lut = cv2.cvtColor(
|
|
gray_ramp[:, np.newaxis], cv2.COLOR_GRAY2BGR
|
|
)[:, 0, :]
|
|
self.state.mfd_lut = fallback_lut
|
|
except Exception as fallback_e:
|
|
logging.critical(
|
|
f"{log_prefix} Failed even to create fallback MFD LUT: {fallback_e}"
|
|
)
|
|
# Ensure state LUT is at least *something* to avoid None errors later
|
|
self.state.mfd_lut = np.zeros((256, 3), dtype=np.uint8)
|
|
|
|
# --- UI Callback Methods ---
|
|
def update_image_mode(self): # UI Callback
|
|
"""Handles switching between Test and Normal Mode based on UI checkbox."""
|
|
log_prefix = "[App Mode Switch]"
|
|
# Check essential components exist
|
|
if not hasattr(self, "state") or not hasattr(self, "test_mode_manager"):
|
|
logging.error(
|
|
f"{log_prefix} State or TestModeManager not initialized. Cannot switch mode."
|
|
)
|
|
return
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
|
|
try:
|
|
is_test_req = False
|
|
# Safely get checkbox state
|
|
if hasattr(self.control_panel, "test_image_var") and isinstance(
|
|
self.control_panel.test_image_var, tk.Variable
|
|
):
|
|
is_test_req = self.control_panel.test_image_var.get() == 1
|
|
else:
|
|
logging.warning(
|
|
f"{log_prefix} test_image_var not found or invalid in control_panel."
|
|
)
|
|
# Fallback: assume current state to avoid unintended switch
|
|
is_test_req = self.state.test_mode_active
|
|
|
|
# --- Perform switch only if requested mode differs from current state ---
|
|
if is_test_req != self.state.test_mode_active:
|
|
logging.info(
|
|
f"{log_prefix} Request to change Test Mode state to: {is_test_req}"
|
|
)
|
|
# Update the state flag first
|
|
self.state.test_mode_active = is_test_req
|
|
|
|
# Call appropriate activation/deactivation sequences
|
|
if self.state.test_mode_active:
|
|
# Attempt to activate the manager
|
|
if self.test_mode_manager.activate():
|
|
# If manager activated successfully, perform UI/State actions
|
|
self.activate_test_mode_ui_actions()
|
|
else:
|
|
# If manager activation failed (e.g., missing test data)
|
|
logging.error(
|
|
f"{log_prefix} TestModeManager activation failed. Reverting UI and state."
|
|
)
|
|
# Try to revert the UI checkbox and state flag
|
|
self._revert_test_mode_ui()
|
|
else:
|
|
# Deactivate the manager (stops timers)
|
|
self.test_mode_manager.deactivate()
|
|
# Perform UI/State actions for returning to normal mode
|
|
self.deactivate_test_mode_ui_actions()
|
|
|
|
# Reset statistics whenever the mode successfully changes
|
|
self.state.reset_statistics()
|
|
# Update the status bar display
|
|
self.update_status()
|
|
else:
|
|
# Log if no change is needed
|
|
logging.debug(
|
|
f"{log_prefix} Requested mode ({is_test_req}) is the same as current mode. No change."
|
|
)
|
|
|
|
except tk.TclError as e:
|
|
# Handle Tkinter errors (e.g., widget destroyed)
|
|
logging.warning(
|
|
f"{log_prefix} UI error accessing checkbox state (TclError): {e}"
|
|
)
|
|
except AttributeError as ae:
|
|
# Handle potential errors if managers aren't fully initialized
|
|
logging.error(
|
|
f"{log_prefix} Missing attribute during mode update (likely manager init issue): {ae}"
|
|
)
|
|
except Exception as e:
|
|
# Log any other unexpected errors during the mode switch process
|
|
logging.exception(f"{log_prefix} Unexpected error during mode update:")
|
|
|
|
def update_sar_size(self, event=None): # UI Callback
|
|
"""Callback for SAR size combobox change. Updates state and triggers processing."""
|
|
log_prefix = "[App CB SAR Size]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
|
|
# If map overlay is enabled, SAR size is fixed, prevent UI change
|
|
# Check map manager existence as a proxy for map being active
|
|
map_active = (
|
|
hasattr(self, "map_integration_manager")
|
|
and self.map_integration_manager is not None
|
|
)
|
|
#if config.ENABLE_MAP_OVERLAY and map_active:
|
|
# logging.debug(
|
|
# f"{log_prefix} Ignoring SAR size change request (Map Overlay active)."
|
|
# )
|
|
# # Try to force the UI combobox back to the map-enforced value
|
|
# try:
|
|
# # Calculate the fixed factor based on current state (set during init if map on)
|
|
# forced_factor = (
|
|
# config.SAR_WIDTH // self.state.sar_display_width
|
|
# if self.state.sar_display_width > 0
|
|
# else 1
|
|
# )
|
|
# forced_size_str = f"1:{forced_factor}"
|
|
# if hasattr(self.control_panel, "sar_size_combo"):
|
|
# # Only set if the current value differs to avoid unnecessary events
|
|
# if self.control_panel.sar_size_combo.get() != forced_size_str:
|
|
# self.control_panel.sar_size_combo.set(forced_size_str)
|
|
# except Exception as e:
|
|
# logging.warning(
|
|
# f"{log_prefix} Failed to reset SAR size combobox UI for map mode: {e}"
|
|
# )
|
|
# return # Exit without processing size change
|
|
|
|
# Proceed with size change if map overlay is not active
|
|
try:
|
|
selected_size_str = self.control_panel.sar_size_combo.get()
|
|
logging.debug(
|
|
f"{log_prefix} SAR display size selected: '{selected_size_str}'"
|
|
)
|
|
|
|
# Parse the factor from the string "1:N"
|
|
factor = 1 # Default factor
|
|
if selected_size_str != "1:1":
|
|
# Split by ':' and take the second part, convert to int
|
|
factor = int(selected_size_str.split(":")[1])
|
|
|
|
# Calculate new dimensions based on factor
|
|
new_width = max(1, config.SAR_WIDTH // factor)
|
|
new_height = max(1, config.SAR_HEIGHT // factor)
|
|
|
|
# Update the display size in AppState
|
|
self.state.update_sar_display_size(new_width, new_height)
|
|
# Trigger a SAR image update to reflect the new size
|
|
self._trigger_sar_update() # Calls ImagePipeline if not in test mode
|
|
|
|
except (ValueError, IndexError, TypeError) as e:
|
|
# Handle errors parsing the combobox value
|
|
logging.warning(
|
|
f"{log_prefix} Invalid SAR size format: '{selected_size_str}'. Error: {e}. Resetting UI."
|
|
)
|
|
# Reset UI to current state value as fallback
|
|
try:
|
|
current_factor = (
|
|
config.SAR_WIDTH // self.state.sar_display_width
|
|
if self.state.sar_display_width > 0
|
|
else 1
|
|
)
|
|
current_size_str = f"1:{current_factor}"
|
|
if hasattr(self.control_panel, "sar_size_combo"):
|
|
self.control_panel.sar_size_combo.set(current_size_str)
|
|
except Exception as reset_e:
|
|
logging.warning(
|
|
f"{log_prefix} Failed to reset SAR size combobox UI after error: {reset_e}"
|
|
)
|
|
except Exception as e:
|
|
# Log other unexpected errors
|
|
logging.exception(f"{log_prefix} Error processing SAR size update: {e}")
|
|
|
|
def update_contrast(self, value_str: str): # UI Callback
|
|
"""Callback for SAR contrast slider. Updates state, LUT, and triggers display update."""
|
|
log_prefix = "[App CB SAR Contrast]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
try:
|
|
# Convert slider value string to float
|
|
contrast = float(value_str)
|
|
# Update the contrast value in AppState
|
|
self.state.update_sar_parameters(contrast=contrast)
|
|
# Recalculate the SAR LUT based on the new contrast
|
|
self.update_brightness_contrast_lut()
|
|
# Trigger a display update to show the effect
|
|
self._trigger_sar_update() # Calls ImagePipeline if not in test mode
|
|
except ValueError:
|
|
# Log error if slider value is not a valid float
|
|
logging.warning(
|
|
f"{log_prefix} Invalid contrast value received from slider: {value_str}"
|
|
)
|
|
except Exception as e:
|
|
# Log other unexpected errors
|
|
logging.exception(f"{log_prefix} Error updating contrast: {e}")
|
|
|
|
def update_brightness(self, value_str: str): # UI Callback
|
|
"""Callback for SAR brightness slider. Updates state, LUT, and triggers display update."""
|
|
log_prefix = "[App CB SAR Brightness]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
try:
|
|
# Convert slider value string to integer (can be float then int)
|
|
brightness = int(float(value_str))
|
|
# Update the brightness value in AppState
|
|
self.state.update_sar_parameters(brightness=brightness)
|
|
# Recalculate the SAR LUT based on the new brightness
|
|
self.update_brightness_contrast_lut()
|
|
# Trigger a display update to show the effect
|
|
self._trigger_sar_update() # Calls ImagePipeline if not in test mode
|
|
except ValueError:
|
|
# Log error if slider value is not a valid number
|
|
logging.warning(
|
|
f"{log_prefix} Invalid brightness value received from slider: {value_str}"
|
|
)
|
|
except Exception as e:
|
|
# Log other unexpected errors
|
|
logging.exception(f"{log_prefix} Error updating brightness: {e}")
|
|
|
|
def update_sar_palette(self, event=None): # UI Callback
|
|
"""Callback for SAR palette combobox. Updates state and triggers display update."""
|
|
log_prefix = "[App CB SAR Palette]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
try:
|
|
# Get the selected palette name from the combobox
|
|
palette = self.control_panel.palette_combo.get()
|
|
logging.debug(f"{log_prefix} Palette changed to '{palette}'")
|
|
|
|
# Validate if the selected palette is known/supported
|
|
if palette in config.COLOR_PALETTES:
|
|
# Update the palette name in AppState
|
|
self.state.update_sar_parameters(palette=palette)
|
|
# Trigger a display update to apply the new palette
|
|
self._trigger_sar_update() # Calls ImagePipeline if not in test mode
|
|
else:
|
|
# Log warning and reset UI if palette is unknown
|
|
logging.warning(
|
|
f"{log_prefix} Unknown palette selected: '{palette}'. Ignoring change."
|
|
)
|
|
# Reset combobox to the current value stored in state
|
|
self.control_panel.palette_combo.set(self.state.sar_palette)
|
|
except Exception as e:
|
|
# Log other unexpected errors
|
|
logging.exception(f"{log_prefix} Error updating SAR palette: {e}")
|
|
|
|
def update_mfd_category_intensity(
|
|
self, category_name: str, intensity_value: int
|
|
): # UI Callback
|
|
"""Callback for MFD category intensity slider. Updates state, LUT, and triggers display."""
|
|
log_prefix = "[App CB MFD Param Intensity]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
|
|
logging.debug(
|
|
f"{log_prefix} Category='{category_name}', Intensity={intensity_value}"
|
|
)
|
|
try:
|
|
# Ensure intensity is within the valid range 0-255
|
|
intensity = np.clip(
|
|
intensity_value, 0, 255
|
|
) # Value is already int fromIntVar
|
|
|
|
# Check if category exists in state before updating
|
|
if category_name in self.state.mfd_params["categories"]:
|
|
# Update the intensity for the specific category in AppState
|
|
self.state.mfd_params["categories"][category_name][
|
|
"intensity"
|
|
] = intensity
|
|
logging.info(
|
|
f"{log_prefix} Intensity for '{category_name}' set to {intensity} in AppState."
|
|
)
|
|
# Recalculate the MFD LUT to reflect the change
|
|
self.update_mfd_lut()
|
|
# Trigger an MFD display update
|
|
self._trigger_mfd_update() # Calls ImagePipeline if not in test mode
|
|
else:
|
|
# Log warning if the category name is not found
|
|
logging.warning(
|
|
f"{log_prefix} Unknown MFD category received: '{category_name}'"
|
|
)
|
|
except KeyError as ke:
|
|
logging.error(
|
|
f"{log_prefix} KeyError accessing MFD params for '{category_name}': {ke}"
|
|
)
|
|
except Exception as e:
|
|
# Log other unexpected errors
|
|
logging.exception(
|
|
f"{log_prefix} Error updating MFD intensity for '{category_name}': {e}"
|
|
)
|
|
|
|
def choose_mfd_category_color(self, category_name: str): # UI Callback
|
|
"""Callback for MFD category color button. Opens chooser, updates state, LUT, UI, and triggers display."""
|
|
log_prefix = "[App CB MFD Param Color]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
|
|
logging.debug(
|
|
f"{log_prefix} Color chooser requested for Category='{category_name}'"
|
|
)
|
|
|
|
# Validate category name
|
|
if category_name not in self.state.mfd_params["categories"]:
|
|
logging.warning(
|
|
f"{log_prefix} Cannot choose color for unknown MFD category: '{category_name}'"
|
|
)
|
|
return
|
|
|
|
try:
|
|
# Get the current color (BGR tuple) from AppState to set initial color
|
|
initial_bgr = self.state.mfd_params["categories"][category_name]["color"]
|
|
# Convert BGR to HEX string for the color chooser
|
|
initial_hex = (
|
|
f"#{initial_bgr[2]:02x}{initial_bgr[1]:02x}{initial_bgr[0]:02x}"
|
|
)
|
|
logging.debug(
|
|
f"{log_prefix} Opening color chooser (initial BGR: {initial_bgr}, initial HEX: {initial_hex})"
|
|
)
|
|
|
|
# Open the Tkinter color chooser dialog
|
|
# Returns a tuple: ((R, G, B), "#RRGGBB") or (None, None) if cancelled
|
|
color_code = colorchooser.askcolor(
|
|
title=f"Select Color for {category_name}", initialcolor=initial_hex
|
|
)
|
|
|
|
# Check if a color was selected (result is not None and first element is not None)
|
|
if color_code and color_code[0]:
|
|
# Extract the chosen RGB tuple (float 0-255 or int 0-255 depending on Tk version)
|
|
rgb = color_code[0]
|
|
# Convert RGB tuple to integer BGR tuple, clipping values
|
|
new_bgr = tuple(
|
|
np.clip(int(c), 0, 255) for c in (rgb[2], rgb[1], rgb[0])
|
|
) # B=rgb[2], G=rgb[1], R=rgb[0]
|
|
|
|
logging.info(
|
|
f"{log_prefix} New color chosen for '{category_name}': RGB={rgb}, BGR={new_bgr}"
|
|
)
|
|
|
|
# Update the color in AppState
|
|
self.state.mfd_params["categories"][category_name]["color"] = new_bgr
|
|
# Recalculate the MFD LUT
|
|
self.update_mfd_lut()
|
|
# Update the color preview label in the UI (schedule on main thread)
|
|
if self.root and self.root.winfo_exists():
|
|
# Ensure control panel and method exist before scheduling
|
|
if hasattr(self.control_panel, "update_mfd_color_display"):
|
|
self.root.after_idle(
|
|
self.control_panel.update_mfd_color_display,
|
|
category_name,
|
|
new_bgr,
|
|
)
|
|
else:
|
|
logging.warning(
|
|
f"{log_prefix} control_panel.update_mfd_color_display method not found."
|
|
)
|
|
# Trigger an MFD display update
|
|
self._trigger_mfd_update() # Calls ImagePipeline if not in test mode
|
|
else:
|
|
# Log if the user cancelled the color chooser
|
|
logging.debug(f"{log_prefix} Color selection cancelled by user.")
|
|
|
|
except KeyError as ke:
|
|
logging.error(
|
|
f"{log_prefix} KeyError accessing MFD params for '{category_name}': {ke}"
|
|
)
|
|
except Exception as e:
|
|
# Log other unexpected errors during color selection process
|
|
logging.exception(
|
|
f"{log_prefix} Error during color selection for '{category_name}': {e}"
|
|
)
|
|
|
|
def update_mfd_raw_map_intensity(self, intensity_value: int): # UI Callback
|
|
"""Callback for Raw Map intensity slider. Updates state, LUT, and triggers display."""
|
|
log_prefix = "[App CB MFD Param RawMap]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
|
|
logging.debug(
|
|
f"{log_prefix} Raw Map intensity slider changed -> {intensity_value}"
|
|
)
|
|
try:
|
|
# Ensure intensity is within the valid range 0-255
|
|
intensity = np.clip(
|
|
intensity_value, 0, 255
|
|
) # Value is already int fromIntVar
|
|
# Update the raw map intensity value in AppState
|
|
self.state.mfd_params["raw_map_intensity"] = intensity
|
|
logging.info(
|
|
f"{log_prefix} Raw Map intensity set to {intensity} in AppState."
|
|
)
|
|
# Recalculate the MFD LUT
|
|
self.update_mfd_lut()
|
|
# Trigger an MFD display update
|
|
self._trigger_mfd_update() # Calls ImagePipeline if not in test mode
|
|
except KeyError as ke:
|
|
logging.error(
|
|
f"{log_prefix} KeyError accessing MFD params for raw_map_intensity: {ke}"
|
|
)
|
|
except Exception as e:
|
|
# Log other unexpected errors
|
|
logging.exception(f"{log_prefix} Error updating raw map intensity: {e}")
|
|
|
|
# --- Initialization ---
|
|
def __init__(self, root: tk.Tk):
|
|
"""
|
|
Initializes the main application components and state.
|
|
|
|
Args:
|
|
root (tk.Tk): The main Tkinter window instance.
|
|
"""
|
|
log_prefix = "[App Init]"
|
|
logging.debug(f"{log_prefix} Starting application initialization...")
|
|
|
|
self.root = root
|
|
self.root.title("Control Panel")
|
|
self.root.minsize(config.TKINTER_MIN_WIDTH, config.TKINTER_MIN_HEIGHT)
|
|
|
|
# --- Central State Initialization ---
|
|
self.state = AppState()
|
|
logging.debug(f"{log_prefix} AppState instance created.")
|
|
|
|
# --- Data Queues ---
|
|
self.sar_queue: queue.Queue = queue.Queue(maxsize=config.DEFAULT_SAR_QUEUE)
|
|
self.mouse_queue: queue.Queue = queue.Queue(maxsize=config.DEFAULT_MOUSE_QUEUE)
|
|
self.tkinter_queue: queue.Queue = queue.Queue(
|
|
maxsize=config.DEFAULT_TKINTER_QUEUE
|
|
)
|
|
self.mfd_queue: queue.Queue = queue.Queue(maxsize=config.DEFAULT_MFD_QUEUE)
|
|
logging.debug(f"{log_prefix} Data queues initialized.")
|
|
|
|
# --- Screen Info & Window Placement ---
|
|
screen_w, screen_h = self._get_screen_dimensions()
|
|
|
|
# --- Calculate Initial Window Positions and Sizes ---
|
|
# Calculate SAR display size first, as it might depend on map state
|
|
initial_sar_w, initial_sar_h = self._calculate_initial_sar_size(
|
|
desired_factor_if_map=5
|
|
) # Updates state if map active
|
|
# Calculate Tkinter and MFD positions
|
|
self.tkinter_x, self.tkinter_y = self._calculate_tkinter_position(screen_h)
|
|
self.mfd_x, self.mfd_y = self._calculate_mfd_position()
|
|
# Calculate SAR position using the initial size (potentially adjusted by map)
|
|
self.sar_x, self.sar_y = self._calculate_sar_position(screen_w, initial_sar_w)
|
|
# Calculate potential map position (used if manager is created)
|
|
map_x, map_y = self._calculate_map_position(
|
|
screen_w,
|
|
initial_sar_w,
|
|
# Pass the max map size used in MapDisplayWindow
|
|
max_map_width=MapDisplayWindow.MAX_DISPLAY_WIDTH,
|
|
)
|
|
|
|
# Set Tkinter window position
|
|
self.root.geometry(f"+{self.tkinter_x}+{self.tkinter_y}")
|
|
logging.debug(
|
|
f"{log_prefix} Initial Window positions: Tk({self.tkinter_x},{self.tkinter_y}), "
|
|
f"MFD({self.mfd_x},{self.mfd_y}), SAR({self.sar_x},{self.sar_y}), MapEst({map_x},{map_y})"
|
|
)
|
|
|
|
# --- Initialize Sub-systems ---
|
|
# 1. UI Components (need 'self' for callbacks)
|
|
self.statusbar = StatusBar(self.root)
|
|
self.control_panel = ControlPanel(self.root, self) # Pass App instance
|
|
logging.debug(f"{log_prefix} UI components created.")
|
|
|
|
# 2. LUTs (read initial state, store back in state)
|
|
self.update_brightness_contrast_lut()
|
|
self.update_mfd_lut()
|
|
logging.debug(f"{log_prefix} Initial LUTs generated.")
|
|
|
|
# 3. Display Manager (handles MFD/SAR OpenCV windows)
|
|
self.display_manager = DisplayManager(
|
|
app=self, # Pass self for AppState access
|
|
sar_queue=self.sar_queue,
|
|
mouse_queue=self.mouse_queue,
|
|
sar_x=self.sar_x,
|
|
sar_y=self.sar_y,
|
|
mfd_x=self.mfd_x,
|
|
mfd_y=self.mfd_y,
|
|
initial_sar_width=self.state.sar_display_width, # Use current state (potentially map-adjusted)
|
|
initial_sar_height=self.state.sar_display_height, # Use current state
|
|
)
|
|
logging.debug(f"{log_prefix} DisplayManager created.")
|
|
# Initialize display windows immediately (shows placeholders)
|
|
try:
|
|
self.display_manager.initialize_display_windows()
|
|
except Exception as e:
|
|
self.set_status("Error: Display Init Failed") # Use self.set_status
|
|
logging.critical(
|
|
f"{log_prefix} Display window initialization failed: {e}", exc_info=True
|
|
)
|
|
|
|
# 4. Image Processing Pipeline (handles normal mode image processing)
|
|
self.image_pipeline = ImagePipeline(
|
|
app_state=self.state,
|
|
sar_queue=self.sar_queue,
|
|
mfd_queue=self.mfd_queue,
|
|
app=self, # Pass self for put_queue context
|
|
)
|
|
logging.debug(f"{log_prefix} ImagePipeline created.")
|
|
|
|
# 5. Test Mode Manager (handles test mode logic)
|
|
self.test_mode_manager = TestModeManager(
|
|
app_state=self.state,
|
|
root=self.root,
|
|
sar_queue=self.sar_queue,
|
|
mfd_queue=self.mfd_queue,
|
|
app=self, # Pass self for put_queue context
|
|
)
|
|
logging.debug(f"{log_prefix} TestModeManager created.")
|
|
|
|
# 6. Map Integration Manager (conditional initialization)
|
|
self.map_integration_manager: Optional[MapIntegrationManager] = None
|
|
if config.ENABLE_MAP_OVERLAY:
|
|
# Check if necessary libraries and manager class were loaded
|
|
if MAP_MODULES_LOADED and MapIntegrationManager is not None:
|
|
logging.info(
|
|
f"{log_prefix} Map Overlay enabled. Initializing MapIntegrationManager..."
|
|
)
|
|
try:
|
|
# Create the manager instance
|
|
self.map_integration_manager = MapIntegrationManager(
|
|
app_state=self.state,
|
|
tkinter_queue=self.tkinter_queue,
|
|
app=self,
|
|
map_x=map_x, # Pass pre-calculated position
|
|
map_y=map_y,
|
|
)
|
|
logging.info(
|
|
f"{log_prefix} MapIntegrationManager initialized successfully."
|
|
)
|
|
except Exception as map_mgr_e:
|
|
# Log errors during map manager initialization
|
|
logging.exception(
|
|
f"{log_prefix} Failed to initialize MapIntegrationManager:"
|
|
)
|
|
self.map_integration_manager = (
|
|
None # Ensure manager is None on error
|
|
)
|
|
self.set_status("Error: Map Init Failed") # Update status bar
|
|
else:
|
|
# Log error if map is enabled but components are missing
|
|
logging.error(
|
|
f"{log_prefix} Map Overlay enabled but required modules/manager failed to load."
|
|
)
|
|
self.set_status("Error: Map Modules Missing")
|
|
else:
|
|
# Log if map overlay is disabled in config
|
|
logging.debug(f"{log_prefix} Map Overlay is disabled in configuration.")
|
|
|
|
# 7. Set initial UI state labels AFTER all components potentially needed exist
|
|
self._update_initial_ui_labels()
|
|
|
|
# 8. Network Setup
|
|
self.local_ip: str = config.DEFAULT_SER_IP
|
|
self.local_port: int = config.DEFAULT_SER_PORT
|
|
self.udp_socket: Optional[socket.socket] = None # Define attribute type
|
|
self.udp_receiver: Optional[UdpReceiver] = None # Define attribute type
|
|
self.udp_thread: Optional[threading.Thread] = None # Define attribute type
|
|
|
|
# Setup receiver only if not using local images
|
|
if not config.USE_LOCAL_IMAGES:
|
|
self._setup_network_receiver() # Calls set_status internally on success/failure
|
|
else:
|
|
logging.info(
|
|
f"{log_prefix} Network receiver disabled (USE_LOCAL_IMAGES=True)."
|
|
)
|
|
# Status will be set later by image loader completion
|
|
|
|
# 9. Initial Image Load Thread (runs in background)
|
|
self._start_initial_image_loader() # Calls _set_initial_display_from_loaded_data upon completion
|
|
|
|
# 10. Start Queue Processors (run periodically on main thread)
|
|
self.process_sar_queue()
|
|
self.process_mfd_queue()
|
|
self.process_mouse_queue()
|
|
self.process_tkinter_queue()
|
|
logging.debug(f"{log_prefix} Queue processors scheduled.")
|
|
|
|
# 11. Start Periodic Status Updates (runs periodically on main thread)
|
|
self.schedule_periodic_updates()
|
|
logging.debug(f"{log_prefix} Periodic updates scheduled.")
|
|
|
|
# 12. Set initial image mode based on config (calls TestModeManager activate/deactivate)
|
|
self.update_image_mode()
|
|
logging.debug(f"{log_prefix} Initial image mode set.")
|
|
|
|
# Final status is set by _set_initial_display_from_loaded_data or map manager after loading
|
|
|
|
logging.info(f"{log_prefix} Application initialization sequence complete.")
|
|
|
|
# --- Initialization Helper Methods ---
|
|
|
|
def _get_screen_dimensions(self) -> Tuple[int, int]:
|
|
"""Gets primary screen dimensions using screeninfo, returning defaults on error."""
|
|
log_prefix = "[App Init]" # Part of initialization
|
|
try:
|
|
# Get list of monitors
|
|
monitors = screeninfo.get_monitors()
|
|
if not monitors:
|
|
# Raise specific error if no monitors found
|
|
raise screeninfo.ScreenInfoError("No monitors detected by screeninfo.")
|
|
# Use the first monitor as primary
|
|
screen = monitors[0]
|
|
screen_w: int = screen.width
|
|
screen_h: int = screen.height
|
|
logging.debug(
|
|
f"{log_prefix} Detected Screen Dimensions: {screen_w}x{screen_h}"
|
|
)
|
|
return screen_w, screen_h
|
|
except Exception as e:
|
|
# Log warning and return default values on any error
|
|
logging.warning(
|
|
f"{log_prefix} Screen info error: {e}. Using default dimensions 1920x1080."
|
|
)
|
|
return 1920, 1080
|
|
|
|
def _calculate_initial_sar_size(
|
|
self, desired_factor_if_map: int = 4
|
|
) -> Tuple[int, int]:
|
|
"""
|
|
Calculates initial SAR display size based on config and map state.
|
|
Allows specifying the desired reduction factor if the map is active.
|
|
Updates AppState if map overlay forces a different size.
|
|
|
|
Args:
|
|
desired_factor_if_map (int): The reduction factor (e.g., 4 for 1:4, 5 for 1:5)
|
|
to use if the map overlay is active. Defaults to 4.
|
|
|
|
Returns:
|
|
Tuple[int, int]: (initial_width, initial_height) to use for window positioning.
|
|
"""
|
|
log_prefix = "[App Init]"
|
|
initial_w = self.state.sar_display_width
|
|
initial_h = self.state.sar_display_height
|
|
|
|
map_enabled_and_loaded = config.ENABLE_MAP_OVERLAY and MAP_MODULES_LOADED
|
|
if map_enabled_and_loaded:
|
|
# Use the specified factor for SAR size when map is active
|
|
forced_factor = max(1, desired_factor_if_map) # Ensure factor is at least 1
|
|
initial_w = config.SAR_WIDTH // forced_factor
|
|
initial_h = config.SAR_HEIGHT // forced_factor
|
|
# Update the AppState with the map-enforced size
|
|
self.state.update_sar_display_size(initial_w, initial_h)
|
|
logging.info(
|
|
f"{log_prefix} Map overlay active, forcing SAR display size to 1:{forced_factor} "
|
|
f"({initial_w}x{initial_h})."
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f"{log_prefix} Using initial SAR display size from state: {initial_w}x{initial_h}."
|
|
)
|
|
|
|
return initial_w, initial_h
|
|
|
|
def _calculate_tkinter_position(self, screen_h: int) -> Tuple[int, int]:
|
|
"""Calculates the initial X, Y position for the Tkinter control panel window."""
|
|
# Position near top-left, leaving space for MFD above if needed
|
|
x = 10
|
|
# Place below the default MFD window height + some padding
|
|
y = config.INITIAL_MFD_HEIGHT + 40
|
|
# Check if calculated Y position pushes window off-screen
|
|
if y + config.TKINTER_MIN_HEIGHT > screen_h:
|
|
# If off-screen, adjust Y upwards, ensuring some padding from top/bottom
|
|
y = max(10, screen_h - config.TKINTER_MIN_HEIGHT - 10)
|
|
return x, y
|
|
|
|
def _calculate_mfd_position(self) -> Tuple[int, int]:
|
|
"""Calculates the initial X, Y position for the MFD display window."""
|
|
# Align X with Tkinter window, place near the top
|
|
x = self.tkinter_x # Use the already calculated tkinter_x
|
|
y = 10
|
|
return x, y
|
|
|
|
def _calculate_sar_position(
|
|
self, screen_w: int, initial_sar_w: int
|
|
) -> Tuple[int, int]:
|
|
"""
|
|
Calculates the initial X, Y position for the SAR display window.
|
|
|
|
Args:
|
|
screen_w (int): Width of the screen.
|
|
initial_sar_w (int): Calculated initial width of the SAR window.
|
|
|
|
Returns:
|
|
Tuple[int, int]: (x_pos, y_pos) for the SAR window.
|
|
"""
|
|
# Place SAR window to the right of the Tkinter window + padding
|
|
x = self.tkinter_x + config.TKINTER_MIN_WIDTH + 20
|
|
# Align top with MFD window
|
|
y = 10
|
|
# Check if calculated X position pushes window off-screen
|
|
if x + initial_sar_w > screen_w:
|
|
# If off-screen, adjust X leftwards, ensuring padding
|
|
x = max(10, screen_w - initial_sar_w - 10)
|
|
return x, y
|
|
|
|
def _calculate_map_position(
|
|
self,
|
|
screen_w: int,
|
|
current_sar_w: int,
|
|
max_map_width: int = 512, # Add max width argument
|
|
) -> Tuple[int, int]:
|
|
"""
|
|
Calculates the initial X, Y position for the Map display window.
|
|
|
|
Args:
|
|
screen_w (int): Width of the screen.
|
|
current_sar_w (int): Current width of the SAR window (potentially resized).
|
|
max_map_width (int): The maximum expected width of the map window for bounds check.
|
|
|
|
Returns:
|
|
Tuple[int, int]: (x_pos, y_pos) for the Map window.
|
|
"""
|
|
# Place Map window to the right of the SAR window + padding
|
|
x = self.sar_x + current_sar_w + 20
|
|
# Align top with SAR/MFD window
|
|
y = 10
|
|
# Check if calculated X position pushes window off-screen, using max_map_width
|
|
if x + max_map_width > screen_w:
|
|
# If off-screen, adjust X leftwards, ensuring padding
|
|
x = max(10, screen_w - max_map_width - 10)
|
|
return x, y
|
|
|
|
def _setup_network_receiver(self):
|
|
"""Creates and starts the UDP socket and receiver thread."""
|
|
log_prefix = "[App Init Network]"
|
|
logging.info(
|
|
f"{log_prefix} Attempting to start network receiver on {self.local_ip}:{self.local_port}"
|
|
)
|
|
|
|
# Create the UDP socket using the network utility function
|
|
self.udp_socket = create_udp_socket(self.local_ip, self.local_port)
|
|
|
|
if self.udp_socket:
|
|
# If socket created successfully, initialize the receiver
|
|
try:
|
|
self.udp_receiver = UdpReceiver(
|
|
app=self, # Pass App instance for state/config access
|
|
udp_socket=self.udp_socket,
|
|
set_new_sar_image_callback=self.handle_new_sar_data, # Pass SAR handler method
|
|
set_new_mfd_indices_image_callback=self.handle_new_mfd_data, # Pass MFD handler method
|
|
)
|
|
logging.info(f"{log_prefix} UdpReceiver instance created.")
|
|
|
|
# Start the receiver loop in a separate daemon thread
|
|
self.udp_thread = threading.Thread(
|
|
target=self.udp_receiver.receive_udp_data,
|
|
name="UDPReceiverThread",
|
|
daemon=True, # Allows app to exit even if this thread hangs (though it checks shutdown flag)
|
|
)
|
|
self.udp_thread.start()
|
|
logging.info(f"{log_prefix} UDP Receiver thread started.")
|
|
# Set status to indicate listening only after successful setup
|
|
self.set_status(f"Listening UDP {self.local_ip}:{self.local_port}")
|
|
|
|
except Exception as receiver_init_e:
|
|
# Log critical error if UdpReceiver initialization fails
|
|
logging.critical(
|
|
f"{log_prefix} Failed to initialize UdpReceiver: {receiver_init_e}",
|
|
exc_info=True,
|
|
)
|
|
self.set_status("Error: Receiver Init Failed")
|
|
# Close the socket if receiver init failed
|
|
if self.udp_socket:
|
|
close_udp_socket(self.udp_socket)
|
|
self.udp_socket = None
|
|
else:
|
|
# Log error and set status if socket creation failed
|
|
logging.error(f"{log_prefix} UDP socket creation failed.")
|
|
self.set_status("Error: UDP Socket Failed")
|
|
|
|
def _start_initial_image_loader(self):
|
|
"""Starts a background thread to load local/test images into AppState if needed."""
|
|
log_prefix = "[App Init]"
|
|
# Determine if loading is needed based on config flags
|
|
# Load if using local images OR if test mode is enabled by default
|
|
should_load = config.USE_LOCAL_IMAGES or config.ENABLE_TEST_MODE
|
|
|
|
if should_load:
|
|
logging.debug(f"{log_prefix} Starting initial image loading thread...")
|
|
# Create and start the thread
|
|
image_loading_thread = threading.Thread(
|
|
target=self.load_initial_images, # Target function in App
|
|
name="ImageLoaderThread",
|
|
daemon=True, # Allow app to exit even if this thread hangs
|
|
)
|
|
image_loading_thread.start()
|
|
else:
|
|
logging.debug(
|
|
f"{log_prefix} Skipping initial image loading (USE_LOCAL_IMAGES=False, ENABLE_TEST_MODE=False)."
|
|
)
|
|
# If not loading images, set initial display immediately (placeholders or network wait)
|
|
# Need to ensure status is set correctly if not loading anything.
|
|
# Call the display setup directly, skipping the thread load.
|
|
if self.root and self.root.winfo_exists():
|
|
self.root.after_idle(self._set_initial_display_from_loaded_data)
|
|
|
|
def _update_initial_ui_labels(self):
|
|
"""Sets the initial text for UI info labels based on default AppState."""
|
|
log_prefix = "[App Init]"
|
|
logging.debug(f"{log_prefix} Setting initial UI info labels...")
|
|
if not hasattr(self, "control_panel") or not self.control_panel:
|
|
logging.warning(
|
|
f"{log_prefix} Control panel not ready, cannot set initial labels."
|
|
)
|
|
return
|
|
|
|
try:
|
|
# --- Set SAR Center Label ---
|
|
# Use default geo info from state for initial display
|
|
default_geo = self.state.current_sar_geo_info
|
|
center_txt = "Image Ref: Lat=N/A, Lon=N/A" # Default text
|
|
# Only format if initial state claims validity (unlikely, but possible)
|
|
if default_geo and default_geo.get("valid", False):
|
|
try:
|
|
# Convert default radians back to degrees for display formatting
|
|
lat_s = decimal_to_dms(math.degrees(default_geo["lat"]), True)
|
|
lon_s = decimal_to_dms(math.degrees(default_geo["lon"]), False)
|
|
center_txt = f"Image Ref: Lat={lat_s}, Lon={lon_s}"
|
|
except KeyError as ke:
|
|
logging.error(
|
|
f"{log_prefix} Missing key '{ke}' in initial default geo info."
|
|
)
|
|
center_txt = "Image Ref: Data Error"
|
|
except Exception as format_err:
|
|
logging.error(
|
|
f"{log_prefix} Error formatting initial geo label: {format_err}"
|
|
)
|
|
center_txt = "Image Ref: Format Error"
|
|
|
|
# Safely update the label widget, checking existence
|
|
if hasattr(self.control_panel, "sar_center_label"):
|
|
self.control_panel.sar_center_label.config(text=center_txt)
|
|
|
|
# --- Set SAR Orientation Label ---
|
|
# Use the dedicated method on ControlPanel if available
|
|
if hasattr(self.control_panel, "set_sar_orientation"):
|
|
self.control_panel.set_sar_orientation("N/A") # Initial value
|
|
|
|
# --- Set Mouse Coordinates Label ---
|
|
# Use the dedicated method on ControlPanel if available
|
|
if hasattr(self.control_panel, "set_mouse_coordinates"):
|
|
self.control_panel.set_mouse_coordinates("N/A", "N/A") # Initial value
|
|
|
|
# --- Set Statistics Labels ---
|
|
# Set initial text for drop/incomplete labels
|
|
initial_stats = self.state.get_statistics() # Get initial zeroed stats
|
|
drop_txt = (
|
|
f"Drop(Q): S={initial_stats['dropped_sar_q']},M={initial_stats['dropped_mfd_q']}, "
|
|
f"Tk={initial_stats['dropped_tk_q']},Mo={initial_stats['dropped_mouse_q']}"
|
|
)
|
|
incmpl_txt = (
|
|
f"Incmpl(RX): S={initial_stats['incomplete_sar_rx']},"
|
|
f"M={initial_stats['incomplete_mfd_rx']}"
|
|
)
|
|
|
|
if hasattr(self.control_panel, "dropped_label"):
|
|
self.control_panel.dropped_label.config(text=drop_txt)
|
|
if hasattr(self.control_panel, "incomplete_label"):
|
|
self.control_panel.incomplete_label.config(text=incmpl_txt)
|
|
|
|
logging.debug(f"{log_prefix} Initial UI state labels set.")
|
|
except tk.TclError as e:
|
|
# Catch potential errors if UI elements are destroyed prematurely
|
|
logging.warning(
|
|
f"{log_prefix} Error setting initial UI labels (TclError): {e}"
|
|
)
|
|
except Exception as e:
|
|
# Catch other unexpected errors
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error setting initial UI labels:"
|
|
)
|
|
|
|
# --- Network Data Handlers ---
|
|
def handle_new_sar_data(
|
|
self, normalized_image_uint8: np.ndarray, geo_info_radians: Dict[str, Any]
|
|
):
|
|
"""
|
|
Safely handles new SAR data received from the network receiver.
|
|
Updates AppState and schedules main thread processing.
|
|
This method is called by the UdpReceiver instance.
|
|
"""
|
|
log_prefix = "[App CB SAR]" # Callback prefix
|
|
logging.debug(
|
|
f"{log_prefix} Handling new SAR data (Shape: {normalized_image_uint8.shape}, Geo Valid: {geo_info_radians.get('valid', False)})..."
|
|
)
|
|
|
|
# Check shutdown flag before processing
|
|
if self.state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected. Ignoring new SAR data.")
|
|
return
|
|
|
|
# Update the shared application state with the received data
|
|
# Assume receiver already made copies if necessary before calling back
|
|
self.state.set_sar_data(normalized_image_uint8, geo_info_radians)
|
|
logging.debug(f"{log_prefix} SAR data and GeoInfo updated in AppState.")
|
|
|
|
# Schedule the main thread processing logic using after_idle
|
|
# This ensures UI updates and processing happen safely on the GUI thread
|
|
if self.root and self.root.winfo_exists():
|
|
logging.debug(
|
|
f"{log_prefix} Scheduling _process_sar_update_on_main_thread."
|
|
)
|
|
self.root.after_idle(self._process_sar_update_on_main_thread)
|
|
else:
|
|
# Log warning if root window is gone, cannot schedule update
|
|
logging.warning(
|
|
f"{log_prefix} Cannot schedule SAR update: Root window destroyed or not available."
|
|
)
|
|
|
|
def handle_new_mfd_data(self, image_indices: np.ndarray):
|
|
"""
|
|
Safely handles new MFD index data received from the network receiver.
|
|
Updates AppState and schedules main thread processing.
|
|
This method is called by the UdpReceiver instance.
|
|
"""
|
|
log_prefix = "[App CB MFD]" # Callback prefix
|
|
logging.debug(
|
|
f"{log_prefix} Handling new MFD index data (Shape: {image_indices.shape})..."
|
|
)
|
|
|
|
# Check shutdown flag
|
|
if self.state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected. Ignoring new MFD data.")
|
|
return
|
|
|
|
# Update the shared application state with the received indices
|
|
# Assume receiver already made a copy if necessary
|
|
self.state.set_mfd_indices(image_indices)
|
|
logging.debug(f"{log_prefix} MFD indices updated in AppState.")
|
|
|
|
# Schedule the main thread processing logic using after_idle
|
|
if self.root and self.root.winfo_exists():
|
|
logging.debug(
|
|
f"{log_prefix} Scheduling _process_mfd_update_on_main_thread."
|
|
)
|
|
self.root.after_idle(self._process_mfd_update_on_main_thread)
|
|
else:
|
|
logging.warning(
|
|
f"{log_prefix} Cannot schedule MFD update: Root window destroyed or not available."
|
|
)
|
|
|
|
# --- Main Thread Processing Triggers ---
|
|
def _process_sar_update_on_main_thread(self):
|
|
"""
|
|
Processes SAR updates scheduled to run on the main GUI thread.
|
|
Updates UI labels, triggers image pipeline, and triggers map update.
|
|
"""
|
|
log_prefix = "[App MainThread SAR Update]"
|
|
if self.state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected. Skipping.")
|
|
return
|
|
logging.debug(f"{log_prefix} Processing scheduled SAR update...")
|
|
|
|
# 1. Update UI Labels based on the latest state
|
|
self._update_sar_ui_labels() # Reads from self.state
|
|
|
|
# 2. Trigger Image Processing Pipeline (for SAR display queue)
|
|
# The pipeline checks the test_mode flag internally
|
|
logging.debug(f"{log_prefix} Calling image_pipeline.process_sar_for_display...")
|
|
try:
|
|
if hasattr(self, "image_pipeline") and self.image_pipeline:
|
|
self.image_pipeline.process_sar_for_display()
|
|
else:
|
|
logging.error(f"{log_prefix} ImagePipeline not available.")
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error calling ImagePipeline for SAR:")
|
|
|
|
# 3. Trigger Map Update (if map manager exists and geo is valid)
|
|
geo_info = self.state.current_sar_geo_info # Get current info
|
|
is_geo_valid = geo_info and geo_info.get("valid", False)
|
|
# Check if map manager was initialized successfully
|
|
map_manager_active = (
|
|
hasattr(self, "map_integration_manager")
|
|
and self.map_integration_manager is not None
|
|
)
|
|
if map_manager_active and geo_info and geo_info.get("valid", False):
|
|
logging.debug(
|
|
f"{log_prefix} Calling map_integration_manager.update_map_overlay..."
|
|
)
|
|
try:
|
|
# Pass current normalized image and geo info from state to the manager
|
|
self.map_integration_manager.update_map_overlay(
|
|
self.state.current_sar_normalized, geo_info
|
|
)
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error calling map manager update:")
|
|
elif (
|
|
config.ENABLE_MAP_OVERLAY
|
|
): # Log reason for skipping only if map is supposed to be on
|
|
if not map_manager_active:
|
|
logging.debug(
|
|
f"{log_prefix} Skipping map update: MapIntegrationManager not available."
|
|
)
|
|
elif not geo_info or not geo_info.get("valid", False):
|
|
logging.debug(f"{log_prefix} Skipping map update: GeoInfo not valid.")
|
|
|
|
if is_geo_valid and config.ENABLE_KML_GENERATION:
|
|
kml_log_prefix = "[App KML]"
|
|
logging.debug(f"{kml_log_prefix} KML generation enabled. Proceeding...")
|
|
try:
|
|
# Assicurati che la cartella di output esista
|
|
kml_dir = config.KML_OUTPUT_DIRECTORY
|
|
os.makedirs(kml_dir, exist_ok=True)
|
|
|
|
# Crea un nome file univoco (es. basato su timestamp)
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
|
kml_filename = f"sar_footprint_{timestamp}.kml"
|
|
kml_output_path = os.path.join(kml_dir, kml_filename)
|
|
|
|
# Genera il KML
|
|
logging.debug(
|
|
f"{kml_log_prefix} Calling generate_sar_kml for path: {kml_output_path}"
|
|
)
|
|
kml_success = generate_sar_kml(
|
|
geo_info, kml_output_path
|
|
) # Passa geo_info
|
|
|
|
if kml_success:
|
|
logging.debug(
|
|
f"{kml_log_prefix} KML file generated successfully: {kml_output_path}"
|
|
)
|
|
|
|
logging.debug(f"{kml_log_prefix} Calling KML cleanup utility...")
|
|
try:
|
|
cleanup_old_kml_files(config.KML_OUTPUT_DIRECTORY, config.MAX_KML_FILES)
|
|
except Exception as cleanup_e:
|
|
# Log error during cleanup but don't stop subsequent actions (like GE launch)
|
|
logging.exception(f"{kml_log_prefix} Error during KML cleanup call:")
|
|
|
|
# Lancia Google Earth se richiesto
|
|
if config.AUTO_LAUNCH_GOOGLE_EARTH:
|
|
logging.debug(
|
|
f"{kml_log_prefix} Auto-launch Google Earth enabled. Calling launch function..."
|
|
)
|
|
launch_google_earth(kml_output_path)
|
|
else:
|
|
logging.debug(
|
|
f"{kml_log_prefix} Auto-launch Google Earth disabled."
|
|
)
|
|
else:
|
|
logging.error(f"{kml_log_prefix} KML file generation failed.")
|
|
|
|
except ImportError as ie:
|
|
# Logga se manca una libreria necessaria per KML
|
|
logging.error(
|
|
f"{kml_log_prefix} Cannot generate KML due to missing library: {ie}"
|
|
)
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{kml_log_prefix} Error during KML generation/launch process:"
|
|
)
|
|
elif is_geo_valid and not config.ENABLE_KML_GENERATION:
|
|
logging.debug(f"{log_prefix} KML generation disabled in config.")
|
|
|
|
# 4. Update FPS Statistics for SAR
|
|
self._update_fps_stats("sar") # Updates self.state counters
|
|
|
|
logging.debug(f"{log_prefix} Finished processing SAR update.")
|
|
|
|
def _process_mfd_update_on_main_thread(self):
|
|
"""
|
|
Processes MFD updates scheduled to run on the main GUI thread.
|
|
Triggers image pipeline and updates FPS stats.
|
|
"""
|
|
log_prefix = "[App MainThread MFD Update]"
|
|
if self.state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected. Skipping.")
|
|
return
|
|
logging.debug(f"{log_prefix} Processing scheduled MFD update...")
|
|
|
|
# 1. Trigger Image Processing Pipeline for MFD display
|
|
# The pipeline checks the test_mode flag internally
|
|
logging.debug(f"{log_prefix} Calling image_pipeline.process_mfd_for_display...")
|
|
try:
|
|
if hasattr(self, "image_pipeline") and self.image_pipeline:
|
|
self.image_pipeline.process_mfd_for_display()
|
|
else:
|
|
logging.error(f"{log_prefix} ImagePipeline not available.")
|
|
except Exception as e:
|
|
logging.exception(f"{log_prefix} Error calling ImagePipeline for MFD:")
|
|
|
|
# 2. Update FPS Statistics for MFD
|
|
self._update_fps_stats("mfd") # Updates self.state counters
|
|
|
|
logging.debug(f"{log_prefix} Finished processing MFD update.")
|
|
|
|
def _update_sar_ui_labels(self):
|
|
"""Helper method to update SAR related UI labels from AppState (runs in main thread)."""
|
|
log_prefix = (
|
|
"[App MainThread SAR Update]" # Part of the main thread update cycle
|
|
)
|
|
# Check if control panel exists and is valid
|
|
if (
|
|
not hasattr(self, "control_panel")
|
|
or not self.control_panel
|
|
or not self.control_panel.winfo_exists()
|
|
):
|
|
# logging.debug(f"{log_prefix} Control panel not available for UI label update.") # Can be noisy
|
|
return
|
|
|
|
geo_info = self.state.current_sar_geo_info
|
|
center_txt = "Image Ref: N/A" # Default text
|
|
orient_txt = "N/A" # Default text
|
|
is_valid_geo = geo_info and geo_info.get("valid", False)
|
|
|
|
if is_valid_geo:
|
|
try:
|
|
# Convert radians back to degrees for display formatting
|
|
lat_d = math.degrees(geo_info["lat"])
|
|
lon_d = math.degrees(geo_info["lon"])
|
|
orient_d = math.degrees(geo_info["orientation"])
|
|
# Format using the utility function
|
|
lat_s = decimal_to_dms(lat_d, is_latitude=True)
|
|
lon_s = decimal_to_dms(lon_d, is_latitude=False)
|
|
# Format orientation string
|
|
orient_txt = f"{orient_d:.2f}°" # Show degrees with 2 decimal places
|
|
# Format center string
|
|
center_txt = f"Image Ref: Lat={lat_s}, Lon={lon_s}"
|
|
|
|
scale_x = geo_info.get("scale_x", 0.0)
|
|
scale_y = geo_info.get("scale_y", 0.0)
|
|
width_px = geo_info.get("width_px", 0)
|
|
height_px = geo_info.get("height_px", 0)
|
|
|
|
if scale_x > 0 and width_px > 0 and scale_y > 0 and height_px > 0:
|
|
width_km = (scale_x * width_px) / 1000.0
|
|
height_km = (scale_y * height_px) / 1000.0
|
|
# Formatta la stringa (es. W: 10.5 km, H: 9.8 km)
|
|
size_txt = f"W: {width_km:.1f} km, H: {height_km:.1f} km"
|
|
else:
|
|
logging.warning(
|
|
f"{log_prefix} Cannot calculate SAR size in km due to invalid scale/dimensions in GeoInfo."
|
|
)
|
|
size_txt = "Invalid Scale/Dims"
|
|
|
|
except KeyError as ke:
|
|
# Log error if expected keys are missing in geo_info
|
|
logging.error(
|
|
f"{log_prefix} Missing key '{ke}' in geo_info for UI update."
|
|
)
|
|
center_txt = "Ref: Data Error"
|
|
orient_txt = "Data Error"
|
|
is_valid_geo = False # Mark as invalid if data is incomplete
|
|
except Exception as e:
|
|
# Log other formatting errors
|
|
logging.error(f"{log_prefix} Error formatting geo info for UI: {e}")
|
|
center_txt = "Ref: Format Error"
|
|
orient_txt = "Format Error"
|
|
is_valid_geo = False # Mark as invalid on formatting error
|
|
|
|
# --- Safely update UI elements ---
|
|
try:
|
|
# Update Center Label
|
|
if hasattr(self.control_panel, "sar_center_label"):
|
|
self.control_panel.sar_center_label.config(text=center_txt)
|
|
# Update Orientation Label (using ControlPanel method)
|
|
if hasattr(self.control_panel, "set_sar_orientation"):
|
|
self.control_panel.set_sar_orientation(orient_txt)
|
|
if hasattr(self.control_panel, "set_sar_size_km"):
|
|
self.control_panel.set_sar_size_km(size_txt)
|
|
# Reset mouse coordinates display if geo becomes invalid
|
|
if not is_valid_geo and hasattr(
|
|
self.control_panel, "set_mouse_coordinates"
|
|
):
|
|
self.control_panel.set_mouse_coordinates("N/A", "N/A")
|
|
# Log success at debug level
|
|
# logging.debug(f"{log_prefix} SAR UI labels updated.") # Can be noisy
|
|
|
|
except tk.TclError as ui_err:
|
|
# Catch errors if UI widgets are destroyed prematurely (e.g., during shutdown)
|
|
if not self.state.shutting_down:
|
|
logging.warning(
|
|
f"{log_prefix} Error updating SAR UI labels (TclError): {ui_err}"
|
|
)
|
|
except Exception as gen_err:
|
|
# Catch other unexpected errors during UI update
|
|
logging.exception(f"{log_prefix} Unexpected error updating SAR UI labels:")
|
|
|
|
def _update_fps_stats(self, img_type: str):
|
|
"""Helper function to update FPS counters in AppState based on frame processing."""
|
|
now = time.time() # Get current time
|
|
log_prefix = "[App FPS Update]"
|
|
|
|
try:
|
|
if img_type == "sar":
|
|
# Increment SAR frame count in state
|
|
self.state.sar_frame_count += 1
|
|
# Check if enough time has passed to calculate FPS
|
|
elapsed = now - self.state.sar_update_time
|
|
if elapsed >= config.LOG_UPDATE_INTERVAL:
|
|
# Calculate FPS
|
|
self.state.sar_fps = self.state.sar_frame_count / elapsed
|
|
# Reset timer and counter for next interval
|
|
self.state.sar_update_time = now
|
|
self.state.sar_frame_count = 0
|
|
# Log calculated FPS at debug level
|
|
logging.debug(
|
|
f"{log_prefix} SAR FPS calculated: {self.state.sar_fps:.2f}"
|
|
)
|
|
elif img_type == "mfd":
|
|
# Increment MFD frame count in state
|
|
self.state.mfd_frame_count += 1
|
|
# Check if enough time has passed
|
|
elapsed = now - self.state.mfd_start_time
|
|
if elapsed >= config.LOG_UPDATE_INTERVAL:
|
|
# Calculate FPS
|
|
self.state.mfd_fps = self.state.mfd_frame_count / elapsed
|
|
# Reset timer and counter
|
|
self.state.mfd_start_time = now
|
|
self.state.mfd_frame_count = 0
|
|
# Log calculated FPS at debug level
|
|
logging.debug(
|
|
f"{log_prefix} MFD FPS calculated: {self.state.mfd_fps:.2f}"
|
|
)
|
|
except Exception as e:
|
|
# Prevent FPS calculation errors from stopping the application
|
|
logging.warning(
|
|
f"{log_prefix} Error updating FPS stats for '{img_type}': {e}"
|
|
)
|
|
|
|
# --- Trigger Methods ---
|
|
# These methods are typically called by UI callbacks after parameters change.
|
|
# They ensure the correct processing pipeline (or test mode update) is triggered.
|
|
|
|
def _trigger_sar_update(self):
|
|
"""Triggers a SAR image update processing via ImagePipeline if not in test mode."""
|
|
log_prefix = "[App Trigger SAR]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
|
|
# Check if test mode is active. Test mode updates are handled by TestModeManager's timer.
|
|
if not self.state.test_mode_active:
|
|
# If not in test mode, trigger the normal image processing pipeline.
|
|
logging.debug(
|
|
f"{log_prefix} Triggering SAR update processing via ImagePipeline."
|
|
)
|
|
try:
|
|
# Ensure image_pipeline exists before calling
|
|
if hasattr(self, "image_pipeline") and self.image_pipeline:
|
|
self.image_pipeline.process_sar_for_display()
|
|
else:
|
|
logging.error(
|
|
f"{log_prefix} 'image_pipeline' not found. Cannot trigger SAR update."
|
|
)
|
|
except Exception as e:
|
|
# Log exceptions during the pipeline call
|
|
logging.exception(
|
|
f"{log_prefix} Error calling image_pipeline.process_sar_for_display:"
|
|
)
|
|
else:
|
|
# Log that the trigger is skipped because test mode is active.
|
|
logging.debug(
|
|
f"{log_prefix} SAR update trigger skipped (Test Mode active, handled by TestModeManager timer)."
|
|
)
|
|
|
|
def _trigger_mfd_update(self):
|
|
"""Triggers an MFD image update processing via ImagePipeline if not in test mode."""
|
|
log_prefix = "[App Trigger MFD]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
|
|
# Check if test mode is active. Test mode updates are handled by TestModeManager's timer.
|
|
if not self.state.test_mode_active:
|
|
# If not in test mode, trigger the normal image processing pipeline.
|
|
logging.debug(
|
|
f"{log_prefix} Triggering MFD update processing via ImagePipeline."
|
|
)
|
|
try:
|
|
# Ensure image_pipeline exists before calling
|
|
if hasattr(self, "image_pipeline") and self.image_pipeline:
|
|
self.image_pipeline.process_mfd_for_display()
|
|
else:
|
|
logging.error(
|
|
f"{log_prefix} 'image_pipeline' not found. Cannot trigger MFD update."
|
|
)
|
|
except Exception as e:
|
|
# Log exceptions during the pipeline call
|
|
logging.exception(
|
|
f"{log_prefix} Error calling image_pipeline.process_mfd_for_display:"
|
|
)
|
|
else:
|
|
# Log that the trigger is skipped because test mode is active.
|
|
logging.debug(
|
|
f"{log_prefix} MFD update trigger skipped (Test Mode active, handled by TestModeManager timer)."
|
|
)
|
|
|
|
# --- Periodic Update Scheduling ---
|
|
def schedule_periodic_updates(self):
|
|
"""Schedules the regular update of the status bar information."""
|
|
log_prefix = "[App Status Scheduler]"
|
|
# Stop scheduling if application is shutting down
|
|
if self.state.shutting_down:
|
|
logging.debug(f"{log_prefix} Shutdown detected. Stopping periodic updates.")
|
|
return
|
|
|
|
# Call the update function first to update status immediately
|
|
try:
|
|
self.update_status()
|
|
except Exception as e:
|
|
# Log error during update but continue scheduling
|
|
logging.error(
|
|
f"{log_prefix} Error during periodic status update execution: {e}"
|
|
)
|
|
|
|
# Calculate interval in milliseconds from config (seconds)
|
|
# Ensure a minimum delay to prevent overly frequent updates
|
|
interval_ms = max(100, int(config.LOG_UPDATE_INTERVAL * 1000))
|
|
|
|
# Schedule the next call using root.after, checking root existence
|
|
try:
|
|
if self.root and self.root.winfo_exists():
|
|
self.root.after(interval_ms, self.schedule_periodic_updates)
|
|
# else: Don't log warning if root is gone, expected during shutdown
|
|
except Exception as e:
|
|
# Log error only if not shutting down
|
|
if not self.state.shutting_down:
|
|
logging.warning(f"{log_prefix} Error rescheduling periodic update: {e}")
|
|
|
|
# --- Initial Image Loading ---
|
|
def load_initial_images(self):
|
|
"""
|
|
(Runs in background thread) Loads initial local images into AppState
|
|
and ensures test images are generated if needed. Calls UI setup upon completion.
|
|
"""
|
|
log_prefix = "[App Image Loader]"
|
|
if self.state.shutting_down:
|
|
logging.debug(
|
|
f"{log_prefix} Shutdown detected. Aborting image loading thread."
|
|
)
|
|
return
|
|
logging.info(f"{log_prefix} Initial image loading thread started.")
|
|
|
|
# Schedule initial status update on main thread (thread-safe via after_idle)
|
|
if self.root and self.root.winfo_exists():
|
|
self.root.after_idle(self.set_status, "Loading initial images...")
|
|
|
|
try:
|
|
# Ensure test images are generated if test mode is enabled initially
|
|
# The manager handles the check if data already exists.
|
|
if config.ENABLE_TEST_MODE or self.state.test_mode_active:
|
|
if hasattr(self, "test_mode_manager") and self.test_mode_manager:
|
|
logging.debug(
|
|
f"{log_prefix} Ensuring test images are generated via TestModeManager..."
|
|
)
|
|
self.test_mode_manager._ensure_test_images() # Call manager's internal method
|
|
else:
|
|
logging.error(
|
|
f"{log_prefix} TestModeManager not available to generate test images."
|
|
)
|
|
|
|
# Load local images if configured
|
|
if config.USE_LOCAL_IMAGES:
|
|
logging.debug(f"{log_prefix} Loading local MFD image...")
|
|
self._load_local_mfd_image() # Loads into self.state.local_mfd_image_data_indices
|
|
logging.debug(f"{log_prefix} Loading local SAR image...")
|
|
self._load_local_sar_image() # Loads into self.state.local_sar_image_data_raw
|
|
|
|
# Schedule the final display setup on the main thread after loading is complete
|
|
if self.root and self.root.winfo_exists():
|
|
logging.debug(
|
|
f"{log_prefix} Scheduling _set_initial_display_from_loaded_data on main thread."
|
|
)
|
|
self.root.after_idle(self._set_initial_display_from_loaded_data)
|
|
|
|
except Exception as e:
|
|
# Log any exceptions during the loading process
|
|
logging.exception(f"{log_prefix} Error during initial image loading:")
|
|
# Schedule error status update on main thread
|
|
if self.root and self.root.winfo_exists():
|
|
self.root.after_idle(self.set_status, "Error Loading Images")
|
|
finally:
|
|
# Log thread completion
|
|
logging.info(f"{log_prefix} Initial image loading thread finished.")
|
|
|
|
def _load_local_mfd_image(self):
|
|
"""Loads local MFD image data (indices) into AppState."""
|
|
log_prefix = "[App Image Loader]"
|
|
# Default to random indices if loading fails
|
|
default_indices = np.random.randint(
|
|
0, 256, (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
|
|
)
|
|
loaded_indices = None
|
|
try:
|
|
mfd_path = config.MFD_IMAGE_PATH
|
|
if os.path.exists(mfd_path):
|
|
logging.warning(
|
|
f"{log_prefix} Local MFD loading from file is NYI. Using random data."
|
|
)
|
|
# Placeholder for actual loading logic (e.g., np.load)
|
|
# Example: loaded_indices = np.load(mfd_path).astype(np.uint8)
|
|
loaded_indices = default_indices # Use default for now
|
|
logging.debug(f"{log_prefix} Using placeholder random MFD indices.")
|
|
else:
|
|
logging.warning(
|
|
f"{log_prefix} Local MFD image file not found: {mfd_path}. Using random data."
|
|
)
|
|
loaded_indices = default_indices
|
|
|
|
# Store the result (loaded or default) in AppState
|
|
self.state.local_mfd_image_data_indices = loaded_indices
|
|
|
|
except Exception as e:
|
|
# Log error and ensure state has the default value
|
|
logging.exception(f"{log_prefix} Error loading local MFD image:")
|
|
self.state.local_mfd_image_data_indices = default_indices
|
|
|
|
def _load_local_sar_image(self):
|
|
"""Loads local SAR image data (raw) into AppState."""
|
|
log_prefix = "[App Image Loader]"
|
|
# Default to zeros if loading fails
|
|
default_raw_data = np.zeros(
|
|
(config.SAR_HEIGHT, config.SAR_WIDTH), dtype=config.SAR_DATA_TYPE
|
|
)
|
|
loaded_raw_data = None
|
|
try:
|
|
# Use the load_image utility from image_processing module
|
|
# This handles file existence check, loading, and type conversion
|
|
loaded_raw_data = load_image(
|
|
config.SAR_IMAGE_PATH, config.SAR_DATA_TYPE
|
|
) # load_image logs internally
|
|
|
|
if loaded_raw_data is None or loaded_raw_data.size == 0:
|
|
# load_image returns placeholder on error, check if it's different from default
|
|
logging.error(
|
|
f"{log_prefix} Failed to load local SAR raw data from {config.SAR_IMAGE_PATH}. Using zeros."
|
|
)
|
|
loaded_raw_data = default_raw_data # Ensure zeros on failure
|
|
else:
|
|
logging.info(
|
|
f"{log_prefix} Loaded local SAR raw data into AppState (shape: {loaded_raw_data.shape})."
|
|
)
|
|
|
|
# Store the result (loaded or default) in AppState
|
|
self.state.local_sar_image_data_raw = loaded_raw_data
|
|
|
|
except Exception as e:
|
|
# Log error and ensure state has the default value
|
|
logging.exception(f"{log_prefix} Error loading local SAR raw data:")
|
|
self.state.local_sar_image_data_raw = default_raw_data
|
|
|
|
def _set_initial_display_from_loaded_data(self):
|
|
"""
|
|
(Runs in main thread) Sets the initial display based on loaded image data
|
|
(if any) and the current application mode (Test, Local, Network).
|
|
Also sets the final initial status message if the map isn't loading.
|
|
"""
|
|
log_prefix = "[App Init Display]"
|
|
if self.state.shutting_down:
|
|
logging.debug(
|
|
f"{log_prefix} Shutdown detected. Skipping initial display setup."
|
|
)
|
|
return
|
|
|
|
is_test = self.state.test_mode_active
|
|
is_local = config.USE_LOCAL_IMAGES
|
|
|
|
# Determine initial display content based on mode
|
|
if not is_test and is_local:
|
|
# --- Local Image Mode ---
|
|
logging.info(
|
|
f"{log_prefix} Setting initial display based on loaded local images."
|
|
)
|
|
# Display MFD (if loaded)
|
|
if self.state.local_mfd_image_data_indices is not None:
|
|
# Update current state from the loaded local data
|
|
self.state.current_mfd_indices = (
|
|
self.state.local_mfd_image_data_indices.copy()
|
|
)
|
|
# Process the restored state via the pipeline
|
|
self.image_pipeline.process_mfd_for_display()
|
|
else:
|
|
logging.warning(
|
|
f"{log_prefix} Local MFD data not loaded. MFD display will be blank/stale."
|
|
)
|
|
|
|
# Display SAR (if loaded)
|
|
if self.state.local_sar_image_data_raw is not None:
|
|
# This method handles normalization, state update, UI reset, and pipeline call
|
|
self.set_initial_sar_image(self.state.local_sar_image_data_raw)
|
|
else:
|
|
# Handle case where SAR load failed but local mode is active
|
|
logging.warning(
|
|
f"{log_prefix} Local SAR data not loaded. Displaying black SAR image."
|
|
)
|
|
# Ensure normalized state buffer exists and is black
|
|
if self.state.current_sar_normalized is None:
|
|
self.state.current_sar_normalized = np.zeros(
|
|
(config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
|
|
)
|
|
self.state.current_sar_normalized.fill(0)
|
|
# Mark geo as invalid and reset UI
|
|
self.state.current_sar_geo_info["valid"] = False
|
|
self._reset_ui_geo_info()
|
|
# Process the black image via the pipeline
|
|
self.image_pipeline.process_sar_for_display()
|
|
|
|
elif is_test:
|
|
# --- Test Mode ---
|
|
logging.info(
|
|
f"{log_prefix} Test mode active. Display handled by TestModeManager timers."
|
|
)
|
|
# Activation of test mode (in update_image_mode) starts the timers.
|
|
|
|
else:
|
|
# --- Network Mode ---
|
|
logging.info(
|
|
f"{log_prefix} Network mode active. Displaying initial placeholders."
|
|
)
|
|
# Show placeholder images (these bypass the pipeline for simplicity)
|
|
self._show_network_placeholders()
|
|
|
|
# --- Set Final Initial Status ---
|
|
# Check if the map manager is active and if its initial loading thread is still running
|
|
map_manager_active = (
|
|
hasattr(self, "map_integration_manager")
|
|
and self.map_integration_manager is not None
|
|
)
|
|
map_thread_running = False
|
|
if map_manager_active:
|
|
# Check existence of thread attribute before accessing is_alive
|
|
map_thread_attr = getattr(
|
|
self.map_integration_manager, "_map_initial_display_thread", None
|
|
)
|
|
if map_thread_attr and isinstance(map_thread_attr, threading.Thread):
|
|
map_thread_running = map_thread_attr.is_alive()
|
|
|
|
map_is_loading = map_manager_active and map_thread_running
|
|
|
|
# Set the final status message *only if* the map isn't currently loading
|
|
# If map is loading, its completion callback will set the status.
|
|
if not map_is_loading:
|
|
# Determine status based on mode
|
|
final_status = (
|
|
"Ready (Test Mode)"
|
|
if is_test
|
|
else (
|
|
"Ready (Local Mode)"
|
|
if is_local
|
|
else (
|
|
f"Listening UDP {self.local_ip}:{self.local_port}"
|
|
if self.udp_socket
|
|
else "Error: No Socket"
|
|
)
|
|
)
|
|
)
|
|
self.set_status(final_status)
|
|
logging.debug(
|
|
f"{log_prefix} Set final initial status (map not loading): '{final_status}'"
|
|
)
|
|
else:
|
|
# Status 'Loading initial map...' was likely set by MapIntegrationManager init
|
|
logging.debug(
|
|
f"{log_prefix} Initial map display is still loading. Status will be updated upon completion."
|
|
)
|
|
|
|
def set_initial_sar_image(self, raw_image_data: np.ndarray):
|
|
"""
|
|
Processes provided raw SAR data (typically loaded locally), updates AppState,
|
|
resets UI Geo info, and triggers display via the ImagePipeline.
|
|
"""
|
|
log_prefix = "[App Init SAR Image]"
|
|
if self.state.shutting_down:
|
|
return # Ignore if shutting down
|
|
logging.debug(f"{log_prefix} Processing initial raw SAR image...")
|
|
|
|
normalized: Optional[np.ndarray] = None
|
|
# Validate input raw data
|
|
if raw_image_data is not None and raw_image_data.size > 0:
|
|
try:
|
|
# Normalize the raw data to uint8 using the utility function
|
|
normalized = normalize_image(
|
|
raw_image_data, target_type=np.uint8
|
|
) # Logs internally
|
|
if normalized is None:
|
|
# Log error if normalization itself failed
|
|
logging.error(f"{log_prefix} Normalization of raw SAR data failed.")
|
|
except Exception as e:
|
|
# Log any unexpected errors during normalization
|
|
logging.exception(
|
|
f"{log_prefix} Error during initial SAR normalization:"
|
|
)
|
|
else:
|
|
# Log error if raw data is invalid
|
|
logging.error(f"{log_prefix} Provided raw SAR data is invalid or empty.")
|
|
|
|
# Update AppState with the normalized image (or a black image on failure)
|
|
if normalized is not None:
|
|
self.state.current_sar_normalized = normalized
|
|
logging.debug(
|
|
f"{log_prefix} Stored normalized initial SAR image in AppState."
|
|
)
|
|
else:
|
|
# Ensure the state buffer exists before filling with zeros
|
|
logging.warning(
|
|
f"{log_prefix} Using black image fallback for initial SAR display."
|
|
)
|
|
if self.state.current_sar_normalized is None:
|
|
# Create buffer if it doesn't exist
|
|
self.state.current_sar_normalized = np.zeros(
|
|
(config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
|
|
)
|
|
# Fill existing buffer with zeros
|
|
self.state.current_sar_normalized.fill(0)
|
|
|
|
# Mark geo info as invalid for locally loaded images and reset UI display
|
|
self.state.current_sar_geo_info["valid"] = False
|
|
self._reset_ui_geo_info() # Schedule UI label reset
|
|
|
|
# Trigger the image pipeline to process and display the normalized (or black) image
|
|
logging.debug(
|
|
f"{log_prefix} Triggering display of initial SAR image via ImagePipeline."
|
|
)
|
|
if hasattr(self, "image_pipeline") and self.image_pipeline:
|
|
self.image_pipeline.process_sar_for_display() # Use the pipeline
|
|
else:
|
|
logging.error(
|
|
f"{log_prefix} ImagePipeline not available to display initial SAR."
|
|
)
|
|
|
|
logging.info(f"{log_prefix} Initial SAR image processed and queued.")
|
|
|
|
# --- Mode Switching UI Actions ---
|
|
# These handle UI/State changes NOT directly related to manager timers/data generation
|
|
|
|
def activate_test_mode_ui_actions(self):
|
|
"""Handles UI and state changes needed when activating test mode."""
|
|
log_prefix = "[App Test Activate]"
|
|
logging.info(
|
|
f"{log_prefix} Performing UI/State actions for Test Mode activation."
|
|
)
|
|
self.set_status("Activating Test Mode...") # Initial status
|
|
|
|
# Reset geo info display in UI as test mode doesn't use real geo
|
|
self._reset_ui_geo_info()
|
|
|
|
# Clear display queues immediately before test images start arriving
|
|
clear_queue(self.mfd_queue)
|
|
clear_queue(self.sar_queue)
|
|
logging.debug(f"{log_prefix} MFD and SAR display queues cleared.")
|
|
|
|
# Set final status after manager activation and queue clearing
|
|
self.set_status("Ready (Test Mode)")
|
|
logging.info(f"{log_prefix} Test Mode UI/State actions complete.")
|
|
|
|
def deactivate_test_mode_ui_actions(self):
|
|
"""Handles UI and state changes needed when deactivating test mode."""
|
|
log_prefix = "[App Test Deactivate]"
|
|
logging.info(
|
|
f"{log_prefix} Performing UI/State actions for Test Mode deactivation -> Normal Mode."
|
|
)
|
|
self.set_status("Activating Normal Mode...") # Initial status
|
|
|
|
# Test timers are stopped by TestModeManager.deactivate() before this is called
|
|
|
|
# Clear display queues of any lingering test images
|
|
clear_queue(self.mfd_queue)
|
|
clear_queue(self.sar_queue)
|
|
logging.debug(f"{log_prefix} MFD and SAR display queues cleared.")
|
|
|
|
# Reset geo info display in UI (will be updated by real data if network mode)
|
|
self._reset_ui_geo_info()
|
|
|
|
# --- Restore display based on configuration (Local or Network) ---
|
|
if config.USE_LOCAL_IMAGES:
|
|
# --- Local Image Mode Restoration ---
|
|
logging.info(
|
|
f"{log_prefix} Restoring display from local images stored in AppState."
|
|
)
|
|
# MFD restore
|
|
if self.state.local_mfd_image_data_indices is not None:
|
|
# Update current display state from the loaded local backup
|
|
self.state.current_mfd_indices = (
|
|
self.state.local_mfd_image_data_indices.copy()
|
|
)
|
|
# Process the restored state via the pipeline
|
|
self.image_pipeline.process_mfd_for_display()
|
|
else:
|
|
logging.warning(
|
|
f"{log_prefix} No local MFD data in AppState to restore."
|
|
)
|
|
# Optionally queue a black MFD image or clear display? For now, do nothing.
|
|
|
|
# SAR restore
|
|
if self.state.local_sar_image_data_raw is not None:
|
|
# This method handles processing, state update, UI reset, and pipeline call
|
|
self.set_initial_sar_image(self.state.local_sar_image_data_raw)
|
|
else:
|
|
# Handle case where SAR load failed but local mode is active
|
|
logging.warning(
|
|
f"{log_prefix} No local SAR data in AppState to restore. Displaying black."
|
|
)
|
|
# Ensure normalized state is black, mark geo invalid, reset UI, and process black image
|
|
if self.state.current_sar_normalized is None:
|
|
self.state.current_sar_normalized = np.zeros(
|
|
(config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
|
|
)
|
|
self.state.current_sar_normalized.fill(0)
|
|
self.state.current_sar_geo_info["valid"] = False
|
|
self._reset_ui_geo_info()
|
|
self.image_pipeline.process_sar_for_display()
|
|
|
|
# Set status for local mode
|
|
self.set_status("Ready (Local Mode)")
|
|
|
|
else:
|
|
# --- Network Mode Restoration ---
|
|
logging.info(
|
|
f"{log_prefix} Switched to Network mode. Displaying placeholders."
|
|
)
|
|
# Queue placeholder images (these bypass the pipeline)
|
|
self._show_network_placeholders()
|
|
# Determine status based on current socket state
|
|
socket_ok = self.udp_socket is not None and self.udp_socket.fileno() != -1
|
|
status = (
|
|
f"Listening UDP {self.local_ip}:{self.local_port}"
|
|
if socket_ok
|
|
else "Error: No UDP Socket"
|
|
)
|
|
self.set_status(status)
|
|
# Geo info will be updated when valid data arrives via network handler.
|
|
|
|
logging.info(f"{log_prefix} Normal Mode UI/State actions complete.")
|
|
|
|
def _reset_ui_geo_info(self):
|
|
"""Schedules UI reset for geo-related labels on the main thread."""
|
|
log_prefix = "[App UI Reset]"
|
|
# Check if root window exists before scheduling
|
|
if self.root and self.root.winfo_exists():
|
|
# Schedule updates using after_idle for thread safety
|
|
# Check if control_panel and specific methods/widgets exist before calling config/set
|
|
if hasattr(self.control_panel, "set_sar_orientation"):
|
|
self.root.after_idle(
|
|
lambda: self.control_panel.set_sar_orientation("N/A")
|
|
)
|
|
if hasattr(self.control_panel, "set_mouse_coordinates"):
|
|
self.root.after_idle(
|
|
lambda: self.control_panel.set_mouse_coordinates("N/A", "N/A")
|
|
)
|
|
if hasattr(self.control_panel, "sar_center_label"):
|
|
self.root.after_idle(
|
|
lambda: self.control_panel.sar_center_label.config(
|
|
text="Image Ref: Lat=N/A, Lon=N/A"
|
|
)
|
|
)
|
|
logging.debug(f"{log_prefix} Geo UI label reset scheduled.")
|
|
# else: # Don't log if root gone, expected during shutdown
|
|
|
|
def _revert_test_mode_ui(self):
|
|
"""Tries to uncheck the test mode checkbox in the UI and resets the state flag."""
|
|
log_prefix = "[App Mode Switch]" # Part of mode switching logic
|
|
logging.warning(
|
|
f"{log_prefix} Reverting Test Mode UI and state due to activation failure."
|
|
)
|
|
# Schedule UI update on main thread if possible
|
|
if (
|
|
self.root
|
|
and self.root.winfo_exists()
|
|
and hasattr(self.control_panel, "test_image_var")
|
|
):
|
|
try:
|
|
# Use after_idle to ensure UI update happens when Tkinter is ready
|
|
self.root.after_idle(
|
|
self.control_panel.test_image_var.set, 0
|
|
) # Set checkbox to unchecked
|
|
logging.debug(f"{log_prefix} Scheduled UI checkbox uncheck.")
|
|
except Exception as e:
|
|
logging.warning(
|
|
f"{log_prefix} Failed to schedule uncheck of test mode checkbox: {e}"
|
|
)
|
|
|
|
# Correct state flag regardless of UI success
|
|
if hasattr(self, "state"):
|
|
self.state.test_mode_active = False
|
|
logging.debug(f"{log_prefix} Test mode state flag reverted to False.")
|
|
|
|
def _show_network_placeholders(self):
|
|
"""Queues placeholder images for MFD and SAR displays when in Network mode without data."""
|
|
log_prefix = "[App Placeholders]"
|
|
logging.debug(f"{log_prefix} Queueing network placeholder images.")
|
|
try:
|
|
# Create MFD placeholder (dark gray)
|
|
ph_mfd = np.full(
|
|
(config.INITIAL_MFD_HEIGHT, config.INITIAL_MFD_WIDTH, 3),
|
|
30,
|
|
dtype=np.uint8,
|
|
)
|
|
# Create SAR placeholder (lighter gray) - Use current display size from state
|
|
ph_sar = np.full(
|
|
(self.state.sar_display_height, self.state.sar_display_width, 3),
|
|
60,
|
|
dtype=np.uint8,
|
|
)
|
|
# Queue the placeholders directly, bypassing the processing pipeline
|
|
put_queue(self.mfd_queue, ph_mfd, "mfd", self)
|
|
put_queue(self.sar_queue, ph_sar, "sar", self)
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Error creating or queueing placeholder images:"
|
|
)
|
|
|
|
# --- Mouse Coordinate Handling ---
|
|
def process_mouse_queue(self):
|
|
"""
|
|
Processes raw mouse coords from queue, calculates geo coords (using AppState),
|
|
and queues the result string tuple onto the Tkinter queue. Runs periodically.
|
|
"""
|
|
log_prefix = "[App GeoCalc]" # Calculation prefix
|
|
if self.state.shutting_down:
|
|
return # Stop processing on shutdown
|
|
|
|
raw_coords = None
|
|
try:
|
|
# Get raw (x, y) display coordinates non-blockingly
|
|
raw_coords = self.mouse_queue.get(block=False)
|
|
# Mark task done *after* getting item successfully
|
|
self.mouse_queue.task_done()
|
|
except queue.Empty:
|
|
pass # Normal case, queue is empty
|
|
except Exception as e:
|
|
# Log error getting from queue
|
|
logging.exception(f"{log_prefix} Error getting from mouse queue:")
|
|
# Don't reschedule immediately on error? Or reschedule anyway? Reschedule for now.
|
|
pass
|
|
# else: # Process if coords were retrieved
|
|
if (
|
|
raw_coords is not None
|
|
and isinstance(raw_coords, tuple)
|
|
and len(raw_coords) == 2
|
|
):
|
|
x_disp, y_disp = raw_coords
|
|
# Proceed with calculation
|
|
geo = self.state.current_sar_geo_info
|
|
disp_w = self.state.sar_display_width
|
|
disp_h = self.state.sar_display_height
|
|
lat_s: str = "N/A" # Default result string
|
|
lon_s: str = "N/A" # Default result string
|
|
|
|
# --- Check if geo info is valid for calculation ---
|
|
# Requires valid geo dict, positive display and original dimensions, positive scale factors
|
|
is_geo_valid_for_calc = (
|
|
geo
|
|
and geo.get("valid")
|
|
and disp_w > 0
|
|
and disp_h > 0
|
|
and geo.get("width_px", 0) > 0
|
|
and geo.get("height_px", 0) > 0
|
|
and geo.get("scale_x", 0.0) > 0
|
|
and geo.get("scale_y", 0.0) > 0
|
|
and
|
|
# Check essential lat/lon keys exist
|
|
"lat" in geo
|
|
and "lon" in geo
|
|
and "ref_x" in geo
|
|
and "ref_y" in geo
|
|
)
|
|
|
|
if is_geo_valid_for_calc:
|
|
logging.debug(
|
|
f"{log_prefix} Processing mouse coords: Display({x_disp},{y_disp}) with valid GeoInfo."
|
|
)
|
|
try:
|
|
# --- Extract necessary values from state ---
|
|
orig_w = geo["width_px"]
|
|
orig_h = geo["height_px"]
|
|
scale_x = geo["scale_x"] # meters per pixel
|
|
scale_y = geo["scale_y"] # meters per pixel
|
|
ref_x = geo["ref_x"] # pixel coord of ref lat/lon
|
|
ref_y = geo["ref_y"] # pixel coord of ref lat/lon
|
|
ref_lat_rad = geo["lat"] # radians
|
|
ref_lon_rad = geo["lon"] # radians
|
|
orient_rad = geo.get(
|
|
"orientation", 0.0
|
|
) # radians (default to 0 if missing)
|
|
|
|
# Read the original orientation angle from geo data (in radians)
|
|
original_orient_rad = geo.get("orientation", 0.0)
|
|
|
|
# The SAR image displayed to the user is rotated by -original_orient_rad.
|
|
# To find the geographic coordinate corresponding to the mouse click on the
|
|
# *displayed* image, we must apply the *inverse* of the display rotation
|
|
# to the normalized mouse coordinates *before* converting to geographic space.
|
|
# The inverse of a rotation by -original_orient_rad is a rotation by +original_orient_rad.
|
|
angle_for_inverse_rotation_rad = original_orient_rad
|
|
|
|
# --- Coordinate Transformation ---
|
|
# 1. Normalize display coordinates to [0, 1] range
|
|
# Avoid division by zero checked by is_geo_valid_for_calc
|
|
nx_disp = x_disp / disp_w
|
|
ny_disp = y_disp / disp_h
|
|
# 2. Apply Inverse Rotation if needed
|
|
nx_orig_norm, ny_orig_norm = (
|
|
nx_disp,
|
|
ny_disp,
|
|
) # Start with normalized display coords
|
|
if abs(angle_for_inverse_rotation_rad) > 1e-4: # Check if rotation is significant
|
|
logging.debug(
|
|
f"{log_prefix} Applying inverse rotation (angle: {math.degrees(angle_for_inverse_rotation_rad):.2f} deg)..."
|
|
)
|
|
# Use the angle needed to undo the display rotation
|
|
arad_inv = angle_for_inverse_rotation_rad # Use the *original* angle
|
|
cosa = math.cos(arad_inv)
|
|
sina = math.sin(arad_inv)
|
|
# Rotation center (normalized coordinates: 0.5, 0.5)
|
|
cx, cy = 0.5, 0.5
|
|
# Translate coordinates relative to center
|
|
tx = nx_disp - cx
|
|
ty = ny_disp - cy
|
|
# Perform 2D rotation
|
|
rtx = tx * cosa - ty * sina
|
|
rty = tx * sina + ty * cosa
|
|
# Translate back and store result
|
|
nx_orig_norm = rtx + cx
|
|
ny_orig_norm = rty + cy
|
|
logging.debug(
|
|
f"{log_prefix} Inverse rotation applied. Norm coords: ({nx_orig_norm:.4f}, {ny_orig_norm:.4f})"
|
|
)
|
|
|
|
# 3. Convert normalized original coordinates back to original pixel coordinates
|
|
# Clamp result to valid pixel range [0, width/height - 1]
|
|
orig_x = max(0.0, min(nx_orig_norm * orig_w, orig_w - 1.0))
|
|
orig_y = max(0.0, min(ny_orig_norm * orig_h, orig_h - 1.0))
|
|
logging.debug(
|
|
f"{log_prefix} Calculated original pixel coords: ({orig_x:.2f}, {orig_y:.2f})"
|
|
)
|
|
|
|
# --- Geodetic Calculation ---
|
|
# 4. Calculate pixel offset from reference point
|
|
# Note y-axis inversion: display Y increases downwards, geo latitude increases upwards
|
|
pixel_delta_x = orig_x - ref_x
|
|
pixel_delta_y = (
|
|
ref_y - orig_y
|
|
) # Positive delta_y means pointing North from ref
|
|
# 5. Convert pixel offset to meter offset using scale factors
|
|
meters_delta_x = pixel_delta_x * scale_x # Positive delta_x = East
|
|
meters_delta_y = pixel_delta_y * scale_y # Positive delta_y = North
|
|
logging.debug(
|
|
f"{log_prefix} Calculated offset from ref (pixels): dX={pixel_delta_x:.1f}, dY={pixel_delta_y:.1f}"
|
|
)
|
|
logging.debug(
|
|
f"{log_prefix} Calculated offset from ref (meters): dX={meters_delta_x:.1f} (E), dY={meters_delta_y:.1f} (N)"
|
|
)
|
|
|
|
# 6. Convert meter offsets to degree offsets (approximate using simple sphere/ellipse model)
|
|
# Constants for WGS84 approximation
|
|
M_PER_DLAT = (
|
|
111132.954 # Approx meters per degree latitude (constant)
|
|
)
|
|
M_PER_DLON_EQ = (
|
|
111319.488 # Approx meters per degree longitude at equator
|
|
)
|
|
# Adjust meters per degree longitude based on reference latitude
|
|
# Avoid division by zero near poles (cos(pi/2)=0)
|
|
m_per_dlon = max(
|
|
abs(M_PER_DLON_EQ * math.cos(ref_lat_rad)), 1e-3
|
|
) # Use small non-zero minimum
|
|
# Calculate degree offsets
|
|
lat_offset_deg = meters_delta_y / M_PER_DLAT
|
|
lon_offset_deg = meters_delta_x / m_per_dlon
|
|
logging.debug(
|
|
f"{log_prefix} Calculated offset (degrees): dLat={lat_offset_deg:.6f}, dLon={lon_offset_deg:.6f}"
|
|
)
|
|
|
|
# 7. Calculate final coordinates by adding offsets to reference coordinates
|
|
final_lat_deg = math.degrees(ref_lat_rad) + lat_offset_deg
|
|
final_lon_deg = math.degrees(ref_lon_rad) + lon_offset_deg
|
|
logging.debug(
|
|
f"{log_prefix} Calculated final coords (dec deg): Lat={final_lat_deg:.6f}, Lon={final_lon_deg:.6f}"
|
|
)
|
|
|
|
# --- Format Output ---
|
|
# 8. Validate calculated coordinates and format to DMS string
|
|
# Check for NaN/inf and reasonable lat/lon ranges
|
|
if (
|
|
math.isfinite(final_lat_deg)
|
|
and math.isfinite(final_lon_deg)
|
|
and abs(final_lat_deg) <= 90.0
|
|
and abs(final_lon_deg) <= 180.0
|
|
):
|
|
# Use utility function for DMS conversion
|
|
lat_s = decimal_to_dms(final_lat_deg, is_latitude=True)
|
|
lon_s = decimal_to_dms(final_lon_deg, is_latitude=False)
|
|
# Check if DMS conversion itself returned an error string
|
|
if (
|
|
"Error" in lat_s
|
|
or "Invalid" in lat_s
|
|
or "Error" in lon_s
|
|
or "Invalid" in lon_s
|
|
):
|
|
logging.warning(
|
|
f"{log_prefix} DMS conversion failed for valid decimal degrees."
|
|
)
|
|
lat_s, lon_s = "Error DMS", "Error DMS"
|
|
else:
|
|
# Log warning if calculated coordinates are out of valid range
|
|
logging.warning(
|
|
f"{log_prefix} Calculated coordinates out of valid range: Lat={final_lat_deg}, Lon={final_lon_deg}"
|
|
)
|
|
lat_s, lon_s = "Invalid Calc", "Invalid Calc"
|
|
except KeyError as ke:
|
|
# Log error if required keys are missing in state
|
|
logging.error(
|
|
f"{log_prefix} Missing key in geo_info state during calculation: {ke}"
|
|
)
|
|
lat_s, lon_s = "Error Key", "Error Key"
|
|
except Exception as calc_e:
|
|
# Log any other unexpected calculation errors
|
|
logging.exception(f"{log_prefix} Geo calculation error:")
|
|
lat_s, lon_s = "Calc Error", "Calc Error"
|
|
# else: Geo info invalid, keep default "N/A" strings
|
|
|
|
# --- Queue Result ---
|
|
# Create the payload tuple (lat_s, lon_s)
|
|
result_payload = (lat_s, lon_s)
|
|
# Queue the result (tuple of strings) for the Tkinter thread
|
|
self.put_mouse_coordinates_queue(result_payload)
|
|
|
|
# Reschedule the processor to run again after a short delay
|
|
if not self.state.shutting_down:
|
|
self._reschedule_queue_processor(
|
|
self.process_mouse_queue, delay=50
|
|
) # Check every 50ms
|
|
|
|
def put_mouse_coordinates_queue(self, coords_tuple: Tuple[str, str]):
|
|
"""Puts processed mouse coords tuple onto Tkinter queue using command structure."""
|
|
log_prefix = "[App Mouse Queue Put]" # Specific prefix for this action
|
|
if self.state.shutting_down:
|
|
return # Don't queue if shutting down
|
|
|
|
# Structure: command, payload
|
|
command = "MOUSE_COORDS"
|
|
payload = coords_tuple # Payload is the tuple (lat_s, lon_s)
|
|
logging.debug(
|
|
f"{log_prefix} Putting command '{command}' with payload {payload} onto tkinter_queue."
|
|
)
|
|
|
|
# Use the put_queue utility for safe queueing with drop counting
|
|
put_queue(
|
|
self.tkinter_queue,
|
|
(command, payload), # Send command and payload as a tuple
|
|
queue_name="tkinter", # Identify queue name for logging/stats
|
|
app_instance=self, # Pass app instance for context
|
|
)
|
|
|
|
# --- Queue Processors ---
|
|
def process_sar_queue(self):
|
|
"""Gets processed SAR image from queue and displays it using DisplayManager."""
|
|
log_prefix = "[App QProc SAR]"
|
|
if self.state.shutting_down:
|
|
return # Stop processing on shutdown
|
|
|
|
image_to_display = None
|
|
try:
|
|
# Get item non-blockingly from the queue
|
|
image_to_display = self.sar_queue.get(block=False)
|
|
# Mark task as done immediately after getting
|
|
self.sar_queue.task_done()
|
|
except queue.Empty:
|
|
pass # Normal case, queue is empty
|
|
except Exception as e:
|
|
# Log error getting from queue
|
|
logging.exception(f"{log_prefix} Error getting from SAR display queue:")
|
|
# Don't process item if get failed
|
|
|
|
# Process item if successfully retrieved
|
|
if image_to_display is not None:
|
|
logging.debug(
|
|
f"{log_prefix} Dequeued SAR image (Shape: {getattr(image_to_display, 'shape', 'N/A')}). Calling DisplayManager..."
|
|
)
|
|
# Check if display manager exists
|
|
if hasattr(self, "display_manager") and self.display_manager:
|
|
try:
|
|
# Call DisplayManager to show the image
|
|
self.display_manager.show_sar_image(image_to_display)
|
|
except Exception as display_e:
|
|
# Log errors during display call
|
|
logging.exception(
|
|
f"{log_prefix} Error calling DisplayManager.show_sar_image:"
|
|
)
|
|
else:
|
|
logging.error(
|
|
f"{log_prefix} DisplayManager not available to show SAR image."
|
|
)
|
|
# else: Item was None (error during get or queue empty)
|
|
|
|
# Reschedule next check always
|
|
if not self.state.shutting_down:
|
|
self._reschedule_queue_processor(
|
|
self.process_sar_queue
|
|
) # Use default delay
|
|
|
|
def process_mfd_queue(self):
|
|
"""Gets processed MFD image from queue and displays it using DisplayManager."""
|
|
log_prefix = "[App QProc MFD]"
|
|
if self.state.shutting_down:
|
|
return # Stop processing on shutdown
|
|
|
|
image_to_display = None
|
|
try:
|
|
# Get item non-blockingly
|
|
image_to_display = self.mfd_queue.get(block=False)
|
|
# Mark task done
|
|
self.mfd_queue.task_done()
|
|
except queue.Empty:
|
|
pass # Normal case
|
|
except Exception as e:
|
|
# Log error getting from queue
|
|
logging.exception(f"{log_prefix} Error getting from MFD display queue:")
|
|
|
|
# Process if item retrieved
|
|
if image_to_display is not None:
|
|
logging.debug(
|
|
f"{log_prefix} Dequeued MFD image (Shape: {getattr(image_to_display, 'shape', 'N/A')}). Calling DisplayManager..."
|
|
)
|
|
# Check if display manager exists
|
|
if hasattr(self, "display_manager") and self.display_manager:
|
|
try:
|
|
# Call DisplayManager to show the image
|
|
self.display_manager.show_mfd_image(image_to_display)
|
|
except Exception as display_e:
|
|
# Log errors during display call
|
|
logging.exception(
|
|
f"{log_prefix} Error calling DisplayManager.show_mfd_image:"
|
|
)
|
|
else:
|
|
logging.error(
|
|
f"{log_prefix} DisplayManager not available to show MFD image."
|
|
)
|
|
|
|
# Reschedule next check always
|
|
if not self.state.shutting_down:
|
|
self._reschedule_queue_processor(
|
|
self.process_mfd_queue
|
|
) # Use default delay
|
|
|
|
def process_tkinter_queue(self):
|
|
"""Processes commands (mouse coords, map updates) from queue to update UI."""
|
|
log_prefix = "[App QProc Tkinter]"
|
|
if self.state.shutting_down:
|
|
return # Stop processing on shutdown
|
|
|
|
item = None
|
|
try:
|
|
# Get item non-blockingly
|
|
item = self.tkinter_queue.get(block=False)
|
|
# Mark task done *after* getting item successfully
|
|
self.tkinter_queue.task_done()
|
|
except queue.Empty:
|
|
pass # Normal case
|
|
except Exception as e:
|
|
# Log error getting from queue
|
|
logging.exception(f"{log_prefix} Error getting from Tkinter queue:")
|
|
# Don't process item if get failed
|
|
|
|
# Process item if successfully retrieved
|
|
if item is not None:
|
|
try:
|
|
# Check if item is the expected command tuple structure
|
|
if isinstance(item, tuple) and len(item) == 2:
|
|
command, payload = item
|
|
logging.debug(
|
|
f"{log_prefix} Dequeued Command:'{command}', Payload Type:{type(payload)}"
|
|
)
|
|
|
|
# Handle different commands
|
|
if command == "MOUSE_COORDS":
|
|
# Update mouse coordinate display in UI
|
|
self._handle_mouse_coords_update(payload)
|
|
elif command == "SHOW_MAP":
|
|
# Update the map display window (delegated to manager)
|
|
self._handle_show_map_update(payload)
|
|
else:
|
|
# Log warning for unknown commands
|
|
logging.warning(
|
|
f"{log_prefix} Unknown command received: {command}"
|
|
)
|
|
# Handle legacy None for mouse coordinates (can be removed if put_mouse_coordinates_queue always sends tuple)
|
|
# elif item is None:
|
|
# self._handle_mouse_coords_update(None)
|
|
else:
|
|
# Log warning for unexpected item types
|
|
logging.warning(
|
|
f"{log_prefix} Dequeued unexpected item type: {type(item)}"
|
|
)
|
|
|
|
except Exception as e:
|
|
# Log error during processing of the dequeued item
|
|
logging.exception(
|
|
f"{log_prefix} Error processing dequeued Tkinter item:"
|
|
)
|
|
# Task already marked done, avoid potential infinite loops on bad data.
|
|
|
|
# Reschedule next check always
|
|
if not self.state.shutting_down:
|
|
# Check Tkinter queue less often than image queues
|
|
self._reschedule_queue_processor(self.process_tkinter_queue, delay=100)
|
|
|
|
def _handle_mouse_coords_update(self, payload: Optional[Tuple[str, str]]):
|
|
"""Updates the mouse coordinates UI label."""
|
|
log_prefix = "[App QProc Tkinter]" # Part of Tkinter queue processing
|
|
lat_s, lon_s = ("N/A", "N/A") # Default values
|
|
|
|
# Check if payload is the expected tuple of strings
|
|
if payload is not None:
|
|
if isinstance(payload, tuple) and len(payload) == 2:
|
|
lat_s, lon_s = payload # Unpack the strings
|
|
else:
|
|
# Log error if payload is not None or the expected tuple format
|
|
logging.warning(
|
|
f"{log_prefix} Invalid payload type/format for MOUSE_COORDS: {type(payload)}"
|
|
)
|
|
lat_s, lon_s = ("Error", "Error") # Indicate error in UI
|
|
|
|
# Safely update the UI label
|
|
try:
|
|
# Check if control panel and the specific method exist
|
|
if hasattr(self, "control_panel") and hasattr(
|
|
self.control_panel, "set_mouse_coordinates"
|
|
):
|
|
# Call the method on the ControlPanel instance
|
|
self.control_panel.set_mouse_coordinates(lat_s, lon_s)
|
|
# else: # Don't log if UI elements not ready, expected during init/shutdown
|
|
except tk.TclError as e:
|
|
if not self.state.shutting_down:
|
|
logging.warning(
|
|
f"{log_prefix} Error updating mouse coords UI (TclError): {e}"
|
|
)
|
|
except Exception as e:
|
|
# Log other unexpected errors during UI update
|
|
logging.warning(f"{log_prefix} Error updating mouse coords UI: {e}")
|
|
|
|
def _handle_show_map_update(self, payload: Optional[Image.Image]):
|
|
"""
|
|
Handles the SHOW_MAP command by delegating display to the MapIntegrationManager.
|
|
|
|
Args:
|
|
payload (Optional[Image.Image]): The PIL Image object (or None) to display.
|
|
"""
|
|
log_prefix = "[App QProc Tkinter]" # Part of Tkinter queue processing
|
|
logging.debug(f"{log_prefix} Processing SHOW_MAP command...")
|
|
# Check if the map manager is active
|
|
if hasattr(self, "map_integration_manager") and self.map_integration_manager:
|
|
try:
|
|
# Delegate the display task to the manager's method
|
|
self.map_integration_manager.display_map(payload)
|
|
except Exception as e:
|
|
# Log errors calling the manager's method
|
|
logging.exception(
|
|
f"{log_prefix} Error calling map_integration_manager.display_map:"
|
|
)
|
|
else:
|
|
# Log warning if command received but map manager is not active
|
|
logging.warning(
|
|
f"{log_prefix} Received SHOW_MAP command but MapIntegrationManager is not active."
|
|
)
|
|
|
|
def _reschedule_queue_processor(self, processor_func, delay: Optional[int] = None):
|
|
"""Helper method to reschedule a queue processor function using root.after."""
|
|
# Calculate default delay if none provided
|
|
if delay is None:
|
|
# Default delay based on target MFD FPS for image queues (run slightly faster than FPS)
|
|
if processor_func in [self.process_sar_queue, self.process_mfd_queue]:
|
|
target_fps = config.MFD_FPS
|
|
# Calculate delay in ms, ensure minimum delay (e.g., 10ms)
|
|
delay = (
|
|
max(10, int(1000 / (target_fps * 1.5))) if target_fps > 0 else 20
|
|
) # Default ~20ms if FPS is 0/invalid
|
|
else:
|
|
# Default delay for other queues (like tkinter, mouse) - check less often
|
|
delay = 100 # ms
|
|
|
|
# Schedule the next call using root.after, checking root existence
|
|
try:
|
|
if self.root and self.root.winfo_exists():
|
|
self.root.after(delay, processor_func)
|
|
# else: Don't warn if root is gone, expected during shutdown
|
|
except Exception as e:
|
|
# Log error only if not shutting down
|
|
if not self.state.shutting_down:
|
|
logging.warning(
|
|
f"[App Rescheduler] Error rescheduling {processor_func.__name__}: {e}"
|
|
)
|
|
|
|
# --- Status Update ---
|
|
def update_status(self):
|
|
"""Updates status bar text and statistics labels periodically."""
|
|
log_prefix = "[App Status Update]"
|
|
# Check shutdown flag and state initialization
|
|
if not hasattr(self, "state") or self.state.shutting_down:
|
|
return
|
|
|
|
# Avoid updating status if initial map load is in progress (status set by map manager)
|
|
map_loading = False
|
|
try:
|
|
# Check if status bar indicates map loading
|
|
if (
|
|
hasattr(self, "statusbar")
|
|
and self.statusbar.winfo_exists()
|
|
and "Loading initial map" in self.statusbar.cget("text")
|
|
):
|
|
map_loading = True
|
|
logging.debug(
|
|
f"{log_prefix} Skipping status update while initial map loads."
|
|
)
|
|
return
|
|
except Exception as status_check_e:
|
|
# Ignore errors checking status bar text
|
|
logging.debug(
|
|
f"{log_prefix} Error checking status bar for map loading message: {status_check_e}"
|
|
)
|
|
|
|
logging.debug(f"{log_prefix} Updating status bar and statistics labels...")
|
|
# Get latest statistics dictionary from AppState (thread-safe read)
|
|
stats = self.state.get_statistics()
|
|
|
|
try:
|
|
# --- Format Status String Components ---
|
|
# Mode
|
|
mode = (
|
|
"Test"
|
|
if self.state.test_mode_active
|
|
else ("Local" if config.USE_LOCAL_IMAGES else "Network")
|
|
)
|
|
# Map Status
|
|
map_active = (
|
|
hasattr(self, "map_integration_manager")
|
|
and self.map_integration_manager is not None
|
|
)
|
|
map_stat = " MapOn" if config.ENABLE_MAP_OVERLAY and map_active else ""
|
|
# FPS (use values directly from state)
|
|
mfd_fps = f"MFD:{self.state.mfd_fps:.1f}fps"
|
|
sar_fps = (
|
|
f"SAR:{self.state.sar_fps:.1f}fps"
|
|
if self.state.sar_fps > 0
|
|
else "SAR:N/A"
|
|
)
|
|
# Mouse Coordinates (get current text from UI label)
|
|
mouse_txt = "Mouse: N/A" # Default
|
|
if (
|
|
hasattr(self.control_panel, "mouse_latlon_label")
|
|
and self.control_panel.mouse_latlon_label.winfo_exists()
|
|
):
|
|
try:
|
|
# Get text directly, no need for full 'Mouse: ...' prefix if label holds it
|
|
mouse_label_text = self.control_panel.mouse_latlon_label.cget(
|
|
"text"
|
|
)
|
|
# Extract coords part if label format is consistent
|
|
if mouse_label_text.startswith("Mouse"):
|
|
mouse_txt = mouse_label_text # Keep full label text
|
|
else: # Assume label only contains coords
|
|
mouse_txt = f"Mouse: {mouse_label_text}"
|
|
except Exception:
|
|
mouse_txt = (
|
|
"Mouse: UI Error" # Fallback on error getting label text
|
|
)
|
|
|
|
# Assemble final status string (first part of status bar)
|
|
status_prefix = (
|
|
f"Status: {mode}{map_stat}" # Use the prefix set by set_status
|
|
)
|
|
status_info = f"{mfd_fps} | {sar_fps} | {mouse_txt}"
|
|
full_status = f"{status_prefix} | {status_info}" # Combine prefix and info
|
|
|
|
# --- Format Statistics Strings ---
|
|
drop_txt = (
|
|
f"Drop(Q): S={stats['dropped_sar_q']},M={stats['dropped_mfd_q']}, "
|
|
f"Tk={stats['dropped_tk_q']},Mo={stats['dropped_mouse_q']}"
|
|
)
|
|
incmpl_txt = (
|
|
f"Incmpl(RX): S={stats['incomplete_sar_rx']},"
|
|
f"M={stats['incomplete_mfd_rx']}"
|
|
)
|
|
|
|
logging.debug(
|
|
f"{log_prefix} Formatted status strings. Status: '{full_status}', Drop: '{drop_txt}', Incmpl: '{incmpl_txt}'"
|
|
)
|
|
|
|
# --- Schedule UI updates on main thread using after_idle ---
|
|
if self.root and self.root.winfo_exists():
|
|
# Update Status Bar
|
|
if hasattr(self, "statusbar") and self.statusbar.winfo_exists():
|
|
# Update the full text using status bar's method
|
|
self.root.after_idle(self.statusbar.set_status_text, full_status)
|
|
# Update Dropped Label
|
|
if (
|
|
hasattr(self.control_panel, "dropped_label")
|
|
and self.control_panel.dropped_label.winfo_exists()
|
|
):
|
|
self.root.after_idle(
|
|
self.control_panel.dropped_label.config, {"text": drop_txt}
|
|
)
|
|
# Update Incomplete Label
|
|
if (
|
|
hasattr(self.control_panel, "incomplete_label")
|
|
and self.control_panel.incomplete_label.winfo_exists()
|
|
):
|
|
self.root.after_idle(
|
|
self.control_panel.incomplete_label.config, {"text": incmpl_txt}
|
|
)
|
|
|
|
except tk.TclError as e:
|
|
# Log TclErrors (widget likely destroyed during update) only if not shutting down
|
|
if not self.state.shutting_down:
|
|
logging.warning(f"{log_prefix} TclError updating status UI: {e}")
|
|
except Exception as e:
|
|
# Log other unexpected errors during formatting/scheduling
|
|
if not self.state.shutting_down:
|
|
logging.exception(f"{log_prefix} Error formatting/updating status UI:")
|
|
|
|
# --- Cleanup ---
|
|
def close_app(self):
|
|
"""Performs graceful shutdown of the application and its components."""
|
|
log_prefix = "[App Shutdown]"
|
|
# Prevent double execution if called multiple times
|
|
if hasattr(self, "state") and self.state.shutting_down:
|
|
logging.warning(
|
|
f"{log_prefix} Close sequence already initiated. Ignoring request."
|
|
)
|
|
return
|
|
# Ensure state exists before setting flag
|
|
if not hasattr(self, "state"):
|
|
logging.error(f"{log_prefix} Cannot initiate shutdown: AppState not found.")
|
|
# Attempt basic exit?
|
|
sys.exit(1)
|
|
|
|
logging.info(
|
|
f"{log_prefix} Close application requested. Starting shutdown sequence..."
|
|
)
|
|
|
|
# 1. Set Shutdown Flag in Central State
|
|
logging.debug(f"{log_prefix} Setting shutdown flag to True in AppState.")
|
|
self.state.shutting_down = True
|
|
# Attempt to update status bar (might fail if UI closing)
|
|
try:
|
|
# Use set_status for thread safety if called from non-main thread
|
|
self.set_status("Closing...")
|
|
except Exception:
|
|
pass # Ignore errors setting status during shutdown
|
|
|
|
# 2. Stop Test Mode Timers (via manager)
|
|
logging.debug(f"{log_prefix} Stopping TestModeManager timers...")
|
|
if hasattr(self, "test_mode_manager") and self.test_mode_manager:
|
|
self.test_mode_manager.stop_timers()
|
|
else:
|
|
logging.debug(
|
|
f"{log_prefix} TestModeManager not found or already cleaned up."
|
|
)
|
|
|
|
# 3. Stop Map Integration (if active)
|
|
logging.debug(f"{log_prefix} Shutting down MapIntegrationManager...")
|
|
if hasattr(self, "map_integration_manager") and self.map_integration_manager:
|
|
self.map_integration_manager.shutdown() # Closes map window
|
|
else:
|
|
logging.debug(
|
|
f"{log_prefix} MapIntegrationManager not found or not active."
|
|
)
|
|
|
|
# 4. Stop Periodic UI Updates & Queue Processors
|
|
# These check the self.state.shutting_down flag internally, no explicit stop needed,
|
|
# but log the intent.
|
|
logging.debug(
|
|
f"{log_prefix} Signalling periodic status updates and queue processors to stop."
|
|
)
|
|
|
|
# 5. Close UDP Socket
|
|
logging.debug(f"{log_prefix} Closing UDP socket...")
|
|
if self.udp_socket:
|
|
close_udp_socket(self.udp_socket) # Use utility function
|
|
self.udp_socket = None # Clear reference
|
|
logging.debug(f"{log_prefix} UDP socket closed.")
|
|
else:
|
|
logging.debug(f"{log_prefix} UDP socket was not open or already closed.")
|
|
|
|
# 6. Wait briefly for UDP receiver thread to exit (it checks shutdown flag)
|
|
if self.udp_thread and self.udp_thread.is_alive():
|
|
logging.debug(f"{log_prefix} Waiting up to 0.5s for UDP receiver thread...")
|
|
self.udp_thread.join(timeout=0.5)
|
|
if self.udp_thread.is_alive():
|
|
logging.warning(
|
|
f"{log_prefix} UDP receiver thread did not exit cleanly after 0.5s."
|
|
)
|
|
else:
|
|
logging.debug(f"{log_prefix} UDP receiver thread exited.")
|
|
elif self.udp_thread:
|
|
logging.debug(
|
|
f"{log_prefix} UDP receiver thread reference exists but thread already finished."
|
|
)
|
|
else:
|
|
logging.debug(f"{log_prefix} No UDP receiver thread instance found.")
|
|
|
|
# 7. Shutdown Executor Pool from Receiver (if receiver exists)
|
|
worker_pool = None
|
|
if hasattr(self, "udp_receiver") and self.udp_receiver:
|
|
# Access executor attribute safely
|
|
worker_pool = getattr(self.udp_receiver, "executor", None)
|
|
|
|
if worker_pool:
|
|
logging.info(f"{log_prefix} Shutting down worker pool (non-blocking)...")
|
|
try:
|
|
# Shutdown gracefully, don't wait indefinitely, don't cancel running tasks
|
|
worker_pool.shutdown(wait=False, cancel_futures=False)
|
|
logging.debug(f"{log_prefix} Worker pool shutdown initiated.")
|
|
# Note: Tasks within the pool should ideally check the shutting_down flag too.
|
|
except Exception as e:
|
|
logging.exception(
|
|
f"{log_prefix} Exception during worker_pool shutdown: {e}"
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f"{log_prefix} Worker pool not found or not initialized in UdpReceiver."
|
|
)
|
|
|
|
# 8. Destroy Display Windows (MFD, SAR) via DisplayManager
|
|
logging.debug(
|
|
f"{log_prefix} Destroying MFD/SAR display windows via DisplayManager..."
|
|
)
|
|
if hasattr(self, "display_manager"):
|
|
self.display_manager.destroy_windows() # Handles internal logging and checks
|
|
else:
|
|
logging.debug(
|
|
f"{log_prefix} DisplayManager not found or already cleaned up."
|
|
)
|
|
|
|
# 9. Brief wait for OpenCV events processing (belt-and-braces)
|
|
try:
|
|
logging.debug(
|
|
f"{log_prefix} Calling cv2.waitKey(5) for final OpenCV cleanup..."
|
|
)
|
|
cv2.waitKey(5)
|
|
except Exception as e:
|
|
logging.warning(f"{log_prefix} Error during final cv2.waitKey: {e}")
|
|
|
|
# 10. Destroy Tkinter Root Window (last step)
|
|
logging.info(f"{log_prefix} Requesting Tkinter root window destruction...")
|
|
try:
|
|
if self.root and self.root.winfo_exists():
|
|
self.root.destroy()
|
|
logging.info(f"{log_prefix} Tkinter root window destroyed.")
|
|
except tk.TclError as e:
|
|
# Expected if window closed manually before app shutdown logic completed
|
|
logging.warning(
|
|
f"{log_prefix} Error destroying Tkinter window (likely closed already): {e}"
|
|
)
|
|
except Exception as e:
|
|
# Log other unexpected errors during root window destruction
|
|
logging.exception(
|
|
f"{log_prefix} Unexpected error destroying Tkinter window:"
|
|
)
|
|
|
|
# --- Final Log Messages ---
|
|
logging.info("-----------------------------------------")
|
|
logging.info(f"{log_prefix} Application close sequence finished.")
|
|
logging.info("-----------------------------------------")
|
|
|
|
# Exit the application cleanly
|
|
# Use sys.exit(0) for normal exit
|
|
sys.exit(0)
|
|
|
|
|
|
# --- Main Execution Block ---
|
|
if __name__ == "__main__":
|
|
main_log_prefix = "[App Main]"
|
|
root = None # Tkinter root window instance
|
|
app = None # App instance
|
|
|
|
try:
|
|
# --- Pre-Initialization Checks ---
|
|
# Check Map Module Dependencies *before* creating Tkinter window if map enabled
|
|
if config.ENABLE_MAP_OVERLAY and not MAP_MODULES_LOADED:
|
|
# Log critical error and exit if map enabled but components missing
|
|
logging.critical(
|
|
f"{main_log_prefix} Map Overlay is enabled in config "
|
|
"but required map modules failed to load. Cannot start."
|
|
)
|
|
sys.exit(1) # Exit with error code
|
|
|
|
# --- Application Setup ---
|
|
logging.debug(f"{main_log_prefix} Creating main Tkinter window...")
|
|
# Create the root window using the UI utility function
|
|
root = create_main_window(
|
|
title="Control Panel v1.1", # Updated version maybe?
|
|
min_width=config.TKINTER_MIN_WIDTH,
|
|
min_height=config.TKINTER_MIN_HEIGHT,
|
|
x_pos=10, # Initial position, App.__init__ recalculates and sets geometry
|
|
y_pos=10,
|
|
)
|
|
logging.debug(f"{main_log_prefix} Main Tkinter window created.")
|
|
|
|
logging.debug(f"{main_log_prefix} Initializing App class...")
|
|
# Initialize the main application logic, passing the root window
|
|
app = App(root)
|
|
logging.debug(f"{main_log_prefix} App class initialized.")
|
|
|
|
# Set the close button behaviour (calls app.close_app for graceful shutdown)
|
|
root.protocol("WM_DELETE_WINDOW", app.close_app)
|
|
logging.debug(
|
|
f"{main_log_prefix} WM_DELETE_WINDOW protocol set to call app.close_app."
|
|
)
|
|
|
|
# --- Start Event Loop ---
|
|
logging.info(
|
|
f"{main_log_prefix} Starting Tkinter main event loop (root.mainloop())..."
|
|
)
|
|
# This blocks until the root window is destroyed
|
|
root.mainloop()
|
|
# Code here is reached only after mainloop ends (typically via root.destroy() in close_app)
|
|
logging.info(f"{main_log_prefix} Tkinter main event loop finished.")
|
|
|
|
except SystemExit as exit_e:
|
|
# Catch sys.exit() calls for clean shutdown messages
|
|
if exit_e.code == 0:
|
|
# Normal exit initiated by close_app
|
|
logging.info(
|
|
f"{main_log_prefix} Application exited normally (sys.exit code 0)."
|
|
)
|
|
else:
|
|
# Exit due to error (e.g., failed pre-check)
|
|
logging.warning(
|
|
f"{main_log_prefix} Application exited with error code {exit_e.code}."
|
|
)
|
|
except Exception as e:
|
|
# Catch any other unhandled exceptions during setup or mainloop run
|
|
logging.critical(
|
|
f"{main_log_prefix} UNHANDLED EXCEPTION OCCURRED:", exc_info=True
|
|
)
|
|
# Attempt emergency cleanup only if app instance exists and wasn't already shutting down
|
|
if app and hasattr(app, "state") and not app.state.shutting_down:
|
|
logging.error(
|
|
f"{main_log_prefix} Attempting emergency cleanup due to unhandled exception..."
|
|
)
|
|
try:
|
|
# Call close directly, don't rely on mainloop anymore
|
|
app.close_app()
|
|
except SystemExit:
|
|
# close_app initiated the exit, this is expected here
|
|
pass
|
|
except Exception as cleanup_e:
|
|
# Log any error during the emergency cleanup attempt
|
|
logging.exception(
|
|
f"{main_log_prefix} Error during emergency cleanup: {cleanup_e}"
|
|
)
|
|
# Exit with error code after attempting cleanup or if app never initialized
|
|
sys.exit(1)
|
|
finally:
|
|
# This block executes after try/except, even after sys.exit() is called
|
|
logging.info(f"{main_log_prefix} Application finally block reached.")
|
|
# Final OpenCV cleanup attempt (belt-and-braces, may not be necessary if managers clean up)
|
|
logging.debug(
|
|
f"{main_log_prefix} Final check: Destroying any remaining OpenCV windows..."
|
|
)
|
|
try:
|
|
# This might destroy windows not managed by DisplayManager/MapIntegrationManager if any exist
|
|
cv2.destroyAllWindows()
|
|
except Exception as cv_err:
|
|
# Warning is sufficient here, main cleanup happened in close_app/managers
|
|
logging.warning(
|
|
f"{main_log_prefix} Exception during final cv2.destroyAllWindows(): {cv_err}"
|
|
)
|
|
|
|
logging.info("================ Application End ================")
|
|
# Ensure all log messages are flushed before exit
|
|
logging.shutdown()
|