416 lines
17 KiB
Python
416 lines
17 KiB
Python
# --- START OF FILE app_state.py ---
|
|
|
|
# app_state.py
|
|
"""
|
|
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
|
|
Centralized state management for the Control Panel application.
|
|
|
|
Holds shared application state, including SAR/MFD parameters, georeferencing info,
|
|
current image data, LUTs, test mode status, statistics, and map display state.
|
|
Provides methods for controlled updates where necessary.
|
|
"""
|
|
|
|
# Standard library imports
|
|
import logging
|
|
import threading
|
|
import time
|
|
import math
|
|
from typing import Optional, Dict, Any, Tuple, List
|
|
|
|
# Third-party imports
|
|
import numpy as np
|
|
|
|
try:
|
|
from PIL import Image # Needed for map image state typing
|
|
except ImportError:
|
|
Image = None # Define as None if Pillow not installed
|
|
|
|
# Local application imports
|
|
import config # For default values
|
|
|
|
|
|
class AppState:
    """Container for the Control Panel application's shared state.

    Groups the SAR/MFD display parameters, the current image arrays and their
    geo-referencing dictionary, test-mode buffers, map display state and the
    statistic counters in a single object.

    Only the statistic counters are accessed through ``_stats_lock`` (see
    ``increment_dropped_count``, ``increment_incomplete_rx_count``,
    ``get_statistics`` and ``reset_statistics``). Every other attribute is a
    plain attribute; this class performs no locking around it.
    """

    # Queue name accepted by increment_dropped_count() -> counter attribute.
    _DROP_COUNTER_ATTRS = {
        "sar": "_dropped_sar_queue",
        "mfd": "_dropped_mfd_queue",
        "tkinter": "_dropped_tkinter_queue",
        "mouse": "_dropped_mouse_queue",
    }

    # Image type accepted by increment_incomplete_rx_count() -> counter attribute.
    _INCOMPLETE_COUNTER_ATTRS = {
        "sar": "_incomplete_sar_rx",
        "mfd": "_incomplete_mfd_rx",
    }

    def __init__(self):
        """Initialize every state attribute from the defaults in ``config``."""
        log_prefix = "[AppState Init]"
        logging.debug(f"{log_prefix} Initializing application state...")

        # --- General Application State ---
        self.shutting_down: bool = False

        # --- SAR Display Parameters ---
        self.sar_contrast: float = config.DEFAULT_SAR_CONTRAST
        self.sar_brightness: int = config.DEFAULT_SAR_BRIGHTNESS
        self.sar_palette: str = config.DEFAULT_SAR_PALETTE
        self.sar_display_width: int = config.INITIAL_SAR_WIDTH
        self.sar_display_height: int = config.INITIAL_SAR_HEIGHT

        # --- SAR Data & Geo-referencing ---
        # Start with an all-zero uint8 image of the configured SAR size.
        self.current_sar_normalized: Optional[np.ndarray] = np.zeros(
            (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
        )
        self.current_sar_geo_info: Dict[str, Any] = self._initialize_geo_info()
        # Identity table (0..255) until a brightness/contrast LUT is stored.
        self.brightness_contrast_lut: Optional[np.ndarray] = np.arange(
            256, dtype=np.uint8
        )

        # --- MFD Display Parameters & Data ---
        self.mfd_params: Dict[str, Any] = self._initialize_mfd_params()
        self.mfd_lut: np.ndarray = np.zeros((256, 3), dtype=np.uint8)
        self.current_mfd_indices: Optional[np.ndarray] = np.zeros(
            (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
        )

        # --- Test Mode State ---
        self.test_mode_active: bool = config.ENABLE_TEST_MODE
        self.test_mfd_image_indices: Optional[np.ndarray] = None
        self.test_sar_image_raw: Optional[np.ndarray] = None
        self.local_mfd_image_data_indices: Optional[np.ndarray] = None
        self.local_sar_image_data_raw: Optional[np.ndarray] = None

        # --- Network State ---
        # (Potentially add network status indicators here if needed)

        # --- Performance / Statistics ---
        self._stats_lock = threading.Lock()
        self.mfd_fps: float = 0.0
        self.sar_fps: float = 0.0
        self.mfd_frame_count: int = 0
        self.sar_frame_count: int = 0
        self.mfd_start_time: float = time.time()
        self.sar_update_time: float = time.time()
        self._dropped_sar_queue: int = 0
        self._dropped_mfd_queue: int = 0
        self._dropped_tkinter_queue: int = 0
        self._dropped_mouse_queue: int = 0
        self._incomplete_sar_rx: int = 0
        self._incomplete_mfd_rx: int = 0

        # --- Map Related State ---
        # Float factor derived from the map size string,
        # e.g. 1.0 for "1:1", 2.0 for "2:1", 0.5 for "1:2".
        self.map_display_scale_factor: float = self._parse_map_scale_factor(
            config.DEFAULT_MAP_SIZE
        )
        # Last generated PIL map image (before final display scaling), kept
        # for quick redraws when only the scale factor changes.
        # `Image` is None when Pillow is not installed; this annotation is
        # inside a function body and is therefore never evaluated.
        self.last_map_image_pil: Optional[Image.Image] = None

        # --- UI Related State ---
        self.last_mouse_update_time: float = 0.0

        logging.debug(f"{log_prefix} Application state initialized.")

    @staticmethod
    def _parse_map_scale_factor_strict(factor_str: str) -> float:
        """Convert a map size string ("N:1" or "1:N") into a float factor.

        Args:
            factor_str: Text such as "2:1" (-> 2.0) or "1:4" (-> 0.25).
                Whitespace around each number is ignored.

        Returns:
            N for "N:1", 1/N for "1:N".

        Raises:
            ValueError: If the input is not a string, does not consist of
                exactly two positive integers separated by ':', or neither
                side equals 1 (e.g. "2:3").
        """
        if not isinstance(factor_str, str) or ":" not in factor_str:
            raise ValueError("Invalid format, must be string with ':'")

        parts = factor_str.split(":")
        if len(parts) != 2:
            raise ValueError("Invalid format, must have two parts separated by ':'")

        # int() raises ValueError itself for non-numeric text.
        numerator = int(parts[0].strip())
        denominator = int(parts[1].strip())
        if numerator <= 0 or denominator <= 0:
            raise ValueError("Values must be positive integers")

        if denominator == 1:  # "N:1" (including "1:1") -> multiply
            return float(numerator)
        if numerator == 1:  # "1:N" -> divide
            return 1.0 / float(denominator)
        # Ratios such as "2:3" are not supported.
        raise ValueError("Invalid format combination, must be 'N:1' or '1:N'")

    def _parse_map_scale_factor(self, factor_str: str) -> float:
        """Parse a map size string, falling back to 1.0 on invalid input.

        Args:
            factor_str: Text in the form "N:1" or "1:N".

        Returns:
            The parsed factor, or 1.0 (after logging an error) when the
            string cannot be parsed.
        """
        log_prefix = "[AppState Helper]"
        try:
            scale = self._parse_map_scale_factor_strict(factor_str)
        except ValueError as e:
            logging.error(
                f"{log_prefix} Error parsing map scale factor '{factor_str}': {e}. Using 1.0."
            )
            return 1.0  # Default fallback

        logging.debug(
            f"{log_prefix} Parsed map scale factor '{factor_str}' to {scale:.4f}"
        )
        return scale

    def _initialize_mfd_params(self) -> Dict[str, Any]:
        """Build the dictionary holding MFD visualization parameters.

        Returns:
            A dict with three keys:

            * ``"categories"``: category name -> dict with ``"color"`` (a
              3-tuple), ``"intensity"`` (``config.DEFAULT_MFD_INTENSITY``)
              and ``"pixels"`` (list of palette indices in the category).
            * ``"raw_map_intensity"``: ``config.DEFAULT_MFD_RAW_MAP_INTENSITY``.
            * ``"pixel_to_category"``: reverse lookup, palette index ->
              category name.
        """
        log_prefix = "[AppState MFD Params]"
        logging.debug(f"{log_prefix} Initializing MFD parameter dictionary.")

        black = (0, 0, 0)
        white = (255, 255, 255)
        # (category name, colour, palette indices) in definition order.
        # NOTE(review): index 18 is listed under both "Cat B" and "Reserved"
        # (range 17..31). Because "Reserved" comes last it owns index 18 in
        # pixel_to_category below. Confirm which category is intended.
        layout = (
            ("Occlusion", black, [0, 1]),
            ("Cat A", white, [2]),
            ("Cat B", white, [3, 18]),
            ("Cat C", white, [4, 5, 6, 16]),
            ("Cat C1", white, [7, 8, 9]),
            ("Cat C2", white, [10, 11, 12]),
            ("Cat C3", white, [13, 14, 15]),
            ("Reserved", black, list(range(17, 32))),
        )

        params: Dict[str, Any] = {
            "categories": {
                name: {
                    "color": color,
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": pixels,
                }
                for name, color, pixels in layout
            },
            "raw_map_intensity": config.DEFAULT_MFD_RAW_MAP_INTENSITY,
        }
        # When an index appears in several categories the last one wins.
        params["pixel_to_category"] = {
            pixel_index: category_name
            for category_name, data in params["categories"].items()
            for pixel_index in data["pixels"]
        }
        logging.debug(f"{log_prefix} MFD parameters initialized.")
        return params

    def _initialize_geo_info(self) -> Dict[str, Any]:
        """Build the default SAR geo-referencing dictionary from ``config``.

        Returns:
            A dict with the keys ``lat``/``lon`` (configured centre converted
            from degrees to radians), ``ref_x``/``ref_y`` (image centre in
            pixels), ``scale_x``/``scale_y`` (``SAR_IMAGE_SIZE_KM * 1000``
            divided by the pixel width/height, i.e. presumably metres per
            pixel; 0.0 when the dimension is not positive),
            ``width_px``/``height_px``, ``orientation`` (0.0) and ``valid``
            (always False here).
        """
        log_prefix = "[AppState Geo Init]"
        logging.debug(f"{log_prefix} Initializing default SAR GeoInfo.")

        width_px = config.SAR_WIDTH
        height_px = config.SAR_HEIGHT
        size_m = config.SAR_IMAGE_SIZE_KM * 1000.0

        geo_info = {
            "lat": math.radians(config.SAR_CENTER_LAT),  # RADIANS
            "lon": math.radians(config.SAR_CENTER_LON),  # RADIANS
            "ref_x": width_px // 2 if width_px > 0 else 0,
            "ref_y": height_px // 2 if height_px > 0 else 0,
            # Guard against division by zero for a non-positive dimension.
            "scale_x": size_m / width_px if width_px > 0 else 0.0,
            "scale_y": size_m / height_px if height_px > 0 else 0.0,
            "width_px": width_px,
            "height_px": height_px,
            "orientation": 0.0,  # RADIANS
            # Defaults are never marked valid; whoever stores real geo data
            # via set_sar_data() is responsible for the flag.
            "valid": False,
        }
        logging.debug(f"{log_prefix} Default SAR GeoInfo initialized.")
        return geo_info

    # --- Methods for Safe Stat Updates/Reads ---

    def increment_dropped_count(self, queue_name: str):
        """Increment the drop counter of one queue while holding the stats lock.

        Args:
            queue_name: One of "sar", "mfd", "tkinter" or "mouse". Any other
                value only produces a warning log.
        """
        attr = (
            self._DROP_COUNTER_ATTRS.get(queue_name)
            if isinstance(queue_name, str)
            else None
        )
        if attr is None:
            logging.warning(
                f"[AppState] Attempted to increment drop count for unknown queue: {queue_name}"
            )
            return
        with self._stats_lock:
            setattr(self, attr, getattr(self, attr) + 1)

    def increment_incomplete_rx_count(self, image_type: str):
        """Increment the incomplete-reception counter for SAR or MFD.

        Args:
            image_type: "sar" or "mfd". Any other value only produces a
                warning log.
        """
        attr = (
            self._INCOMPLETE_COUNTER_ATTRS.get(image_type)
            if isinstance(image_type, str)
            else None
        )
        if attr is None:
            logging.warning(
                f"[AppState] Attempted to increment incomplete count for unknown type: {image_type}"
            )
            return
        with self._stats_lock:
            setattr(self, attr, getattr(self, attr) + 1)

    def get_statistics(self) -> Dict[str, int]:
        """Return a snapshot of the drop/incomplete counters.

        The values are read while holding the stats lock, so the snapshot is
        consistent with respect to the ``increment_*`` and reset methods.

        Returns:
            Dict with the keys ``dropped_sar_q``, ``dropped_mfd_q``,
            ``dropped_tk_q``, ``dropped_mouse_q``, ``incomplete_sar_rx`` and
            ``incomplete_mfd_rx``.
        """
        with self._stats_lock:
            return {
                "dropped_sar_q": self._dropped_sar_queue,
                "dropped_mfd_q": self._dropped_mfd_queue,
                "dropped_tk_q": self._dropped_tkinter_queue,
                "dropped_mouse_q": self._dropped_mouse_queue,
                "incomplete_sar_rx": self._incomplete_sar_rx,
                "incomplete_mfd_rx": self._incomplete_mfd_rx,
            }

    def reset_statistics(self):
        """Zero all counters and FPS figures and restart the timing references.

        The whole reset happens while holding the stats lock.
        """
        log_prefix = "[AppState Stats]"
        logging.info(f"{log_prefix} Resetting statistic counters.")
        with self._stats_lock:
            self._dropped_sar_queue = 0
            self._dropped_mfd_queue = 0
            self._dropped_tkinter_queue = 0
            self._dropped_mouse_queue = 0
            self._incomplete_sar_rx = 0
            self._incomplete_mfd_rx = 0
            self.mfd_fps = 0.0
            self.sar_fps = 0.0
            self.mfd_frame_count = 0
            self.sar_frame_count = 0
            # One clock sample so both references start at the same instant.
            now = time.time()
            self.mfd_start_time = now
            self.sar_update_time = now
        logging.debug(f"{log_prefix} Statistic counters reset.")

    # --- Methods for state updates ---

    def update_map_scale_factor(self, factor_str: str):
        """Update the map display scale factor from a UI string.

        The string is parsed strictly. If it is not a valid "N:1" / "1:N"
        value an error is logged and the current factor is left untouched,
        rather than being silently reset to 1.0.

        Args:
            factor_str: Text in the form "N:1" or "1:N".
        """
        log_prefix = "[AppState Update]"
        try:
            new_scale = self._parse_map_scale_factor_strict(factor_str)
        except ValueError as e:
            logging.error(
                f"{log_prefix} Failed to update map scale factor from string '{factor_str}': {e}"
            )
            return

        # Tolerance-based comparison: only store values that really changed.
        if abs(new_scale - self.map_display_scale_factor) > 1e-6:
            self.map_display_scale_factor = new_scale
            logging.info(
                f"{log_prefix} Map display scale factor updated to {new_scale:.4f} (from '{factor_str}')"
            )
        else:
            logging.debug(
                f"{log_prefix} Map display scale factor already {new_scale:.4f}."
            )

    def update_sar_parameters(
        self,
        contrast: Optional[float] = None,
        brightness: Optional[int] = None,
        palette: Optional[str] = None,
    ):
        """Update any subset of the SAR display parameters.

        Args:
            contrast: New contrast; stored when it differs from the current
                value by more than 1e-6. ``None`` leaves it unchanged.
            brightness: New brightness; stored when different. ``None``
                leaves it unchanged.
            palette: New palette name; stored when different. ``None``
                leaves it unchanged.
        """
        log_prefix = "[AppState Update]"
        updated = False

        if contrast is not None and abs(contrast - self.sar_contrast) > 1e-6:
            self.sar_contrast = contrast
            logging.debug(f"{log_prefix} SAR contrast updated to {contrast:.3f}")
            updated = True

        if brightness is not None and brightness != self.sar_brightness:
            self.sar_brightness = brightness
            logging.debug(f"{log_prefix} SAR brightness updated to {brightness}")
            updated = True

        if palette is not None and palette != self.sar_palette:
            self.sar_palette = palette
            logging.debug(f"{log_prefix} SAR palette updated to {palette}")
            updated = True

        if not updated:
            logging.debug(
                f"{log_prefix} SAR parameters update called but no values changed."
            )

    def update_sar_display_size(self, width: int, height: int):
        """Update the SAR display dimensions.

        Args:
            width: New display width; must be > 0.
            height: New display height; must be > 0.

        Non-positive dimensions are rejected with a warning and the stored
        size is left unchanged.
        """
        log_prefix = "[AppState Update]"
        if width <= 0 or height <= 0:
            logging.warning(
                f"{log_prefix} Attempted to set invalid SAR display size: {width}x{height}"
            )
            return

        if width == self.sar_display_width and height == self.sar_display_height:
            logging.debug(f"{log_prefix} SAR display size already {width}x{height}.")
            return

        self.sar_display_width = width
        self.sar_display_height = height
        logging.info(f"{log_prefix} SAR display size updated to {width}x{height}")

    def set_sar_data(self, normalized_image: np.ndarray, geo_info: Dict[str, Any]):
        """Store the current normalized SAR image and its geo info.

        The two arguments are validated independently.

        Args:
            normalized_image: Stored as-is (no copy) when it is a NumPy
                array; otherwise a warning is logged and the previous image
                is kept.
            geo_info: Stored as-is (no copy) when it is a dict; otherwise a
                warning is logged and the geo info is replaced with the
                defaults from ``_initialize_geo_info`` marked invalid.
        """
        log_prefix = "[AppState Update]"

        # isinstance() is already False for None, so no separate None check.
        if isinstance(normalized_image, np.ndarray):
            self.current_sar_normalized = normalized_image
            logging.debug(
                f"{log_prefix} Updated current_sar_normalized (Shape: {normalized_image.shape})"
            )
        else:
            logging.warning(
                f"{log_prefix} Invalid normalized SAR image provided for update."
            )

        if isinstance(geo_info, dict):
            self.current_sar_geo_info = geo_info
            logging.debug(
                f"{log_prefix} Updated current_sar_geo_info (Valid: {geo_info.get('valid', 'N/A')})"
            )
        else:
            logging.warning(f"{log_prefix} Invalid geo_info provided for update.")
            fallback = self._initialize_geo_info()  # Re-init with defaults
            fallback["valid"] = False  # Ensure invalid
            self.current_sar_geo_info = fallback

    def set_mfd_indices(self, indices_image: np.ndarray):
        """Store the current MFD indices image.

        Args:
            indices_image: Stored as-is (no copy) when it is a NumPy array;
                otherwise a warning is logged and the previous image is kept.
        """
        log_prefix = "[AppState Update]"
        if isinstance(indices_image, np.ndarray):
            self.current_mfd_indices = indices_image
            logging.debug(
                f"{log_prefix} Updated current_mfd_indices (Shape: {indices_image.shape})"
            )
        else:
            logging.warning(
                f"{log_prefix} Invalid indices image provided for MFD update."
            )
|
|
|
|
|
|
# --- END OF FILE app_state.py ---
|