# SXXXXXXX_ControlPanel/controlpanel/app_state.py
# 2025-05-06 10:32:03 +02:00
#
# 516 lines
# 23 KiB
# Python

# --- START OF FILE app_state.py ---
# app_state.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED.
Centralized state management for the Control Panel application.
Holds shared application state, including SAR/MFD parameters, georeferencing info,
current image data, LUTs, test mode status, statistics, SAR overlay shift,
map display state, and cached data for efficient map recomposition. Provides methods
for controlled updates where necessary.
"""
# Standard library imports
import logging
import threading
import time
import math
from typing import Optional, Dict, Any, Tuple, List
# Third-party imports
import numpy as np
# Pillow is optional here: it is only used to type-hint the image attributes.
try:
    from PIL import Image

    # Alias used in annotations when Pillow is available.
    ImageType = Image.Image
except ImportError:
    # Keep the name defined so annotations that mention ImageType still
    # resolve when Pillow is not installed.
    ImageType = None  # type: ignore
# Local application imports
import config # For default values
class AppState:
    """Hold and manage the application's shared state.

    The instance stores SAR/MFD display parameters, the current image data,
    look-up tables, test-mode data, statistic counters, the SAR overlay shift
    and the map display state.

    Only the drop / incomplete-reception counters (and the FPS fields reset by
    ``reset_statistics``) are accessed under ``_stats_lock``. The other
    ``update_*`` / ``set_*`` methods of this class do not take a lock.
    """

    # Queue name -> attribute holding the matching "dropped items" counter.
    _DROPPED_COUNTER_ATTRS = {
        "sar": "_dropped_sar_queue",
        "mfd": "_dropped_mfd_queue",
        "tkinter": "_dropped_tkinter_queue",
        "mouse": "_dropped_mouse_queue",
    }
    # Image type -> attribute holding the matching "incomplete reception" counter.
    _INCOMPLETE_RX_COUNTER_ATTRS = {
        "sar": "_incomplete_sar_rx",
        "mfd": "_incomplete_mfd_rx",
    }

    def __init__(self):
        """Initializes the application state with default values from ``config``."""
        log_prefix = "[AppState Init]"
        logging.debug(f"{log_prefix} Initializing application state...")

        # --- General Application State ---
        self.shutting_down: bool = False

        # --- SAR Display Parameters ---
        self.sar_contrast: float = config.DEFAULT_SAR_CONTRAST
        self.sar_brightness: int = config.DEFAULT_SAR_BRIGHTNESS
        self.sar_palette: str = config.DEFAULT_SAR_PALETTE
        self.sar_display_width: int = config.INITIAL_SAR_WIDTH
        self.sar_display_height: int = config.INITIAL_SAR_HEIGHT

        # --- SAR Recording State ---
        self.sar_recording_enabled: bool = config.DEFAULT_SAR_RECORDING_ENABLED

        # --- SAR Data & Geo-referencing ---
        # Note: current_sar_normalized is for DISPLAY. Recorder needs RAW data.
        self.current_sar_normalized: Optional[np.ndarray] = np.zeros(
            (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
        )
        # Stores the latest received GeoInfo (reference point, scale, orientation) in RADIANS
        self.current_sar_geo_info: Dict[str, Any] = self._initialize_geo_info()
        # Brightness/Contrast Look-Up Table for SAR display processing
        # (starts as the identity mapping 0..255).
        self.brightness_contrast_lut: Optional[np.ndarray] = np.arange(
            256, dtype=np.uint8
        )

        # --- SAR Overlay Shift State ---
        # User-defined shift (in degrees) to apply to SAR overlay position on the map
        self.sar_lat_shift_deg: float = 0.0  # Latitude shift
        self.sar_lon_shift_deg: float = 0.0  # Longitude shift

        # --- SAR Overlay Cache for Recomposition ---
        # Last SAR image after B/C, Palette (uint8 BGR or GRAY), before warping.
        # Used for fast recomposition when only alpha changes.
        self.last_processed_sar_for_overlay: Optional[np.ndarray] = None
        # Last calculated perspective transformation matrix for the SAR overlay
        self.last_sar_warp_matrix: Optional[np.ndarray] = None
        # Last calculated SAR corner pixel coordinates on the *stitched* map image
        self.last_sar_corners_pixels_map: Optional[List[Tuple[int, int]]] = None

        # --- MFD Display Parameters & Data ---
        # Dictionary holding MFD category definitions (colors, intensities, pixel indices)
        self.mfd_params: Dict[str, Any] = self._initialize_mfd_params()
        # MFD Look-Up Table (maps index values to BGR colors)
        self.mfd_lut: np.ndarray = np.zeros((256, 3), dtype=np.uint8)
        # Current MFD image represented as an array of index values (uint8)
        self.current_mfd_indices: Optional[np.ndarray] = np.zeros(
            (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
        )

        # --- Test Mode State ---
        # Flag indicating if test mode is active
        self.test_mode_active: bool = config.ENABLE_TEST_MODE
        # Pre-generated MFD index image used in test mode
        self.test_mfd_image_indices: Optional[np.ndarray] = None
        # Pre-generated raw SAR image data used in test mode
        self.test_sar_image_raw: Optional[np.ndarray] = None
        # MFD index data loaded for local mode (if USE_LOCAL_IMAGES is True)
        self.local_mfd_image_data_indices: Optional[np.ndarray] = None
        # Raw SAR data loaded for local mode (if USE_LOCAL_IMAGES is True)
        self.local_sar_image_data_raw: Optional[np.ndarray] = None

        # --- Network State ---
        # (Potentially add network status indicators here if needed)

        # --- Performance / Statistics ---
        self._stats_lock = threading.Lock()  # Lock for thread-safe access to statistics
        self.mfd_fps: float = 0.0  # Calculated MFD frames per second
        self.sar_fps: float = 0.0  # Calculated SAR frames per second
        self.mfd_frame_count: int = 0  # Frame counter for MFD FPS calculation
        self.sar_frame_count: int = 0  # Frame counter for SAR FPS calculation
        self.mfd_start_time: float = time.time()  # Start time for MFD FPS interval
        self.sar_update_time: float = (
            time.time()
        )  # Last update time for SAR FPS calculation
        self._dropped_sar_queue: int = 0  # Counter for dropped SAR display queue items
        self._dropped_mfd_queue: int = 0  # Counter for dropped MFD display queue items
        self._dropped_tkinter_queue: int = 0  # Counter for dropped Tkinter queue items
        self._dropped_mouse_queue: int = 0  # Counter for dropped raw mouse queue items
        self._incomplete_sar_rx: int = 0  # Counter for incomplete SAR image receptions
        self._incomplete_mfd_rx: int = 0  # Counter for incomplete MFD image receptions

        # --- Map Related State ---
        # Scaling factor applied to the displayed map window relative to stitched image size
        self.map_display_scale_factor: float = self._parse_map_scale_factor(
            config.DEFAULT_MAP_SIZE
        )
        # Last downloaded and stitched BASE map image (PIL format, before overlay/scalebar)
        self.last_map_image_pil: Optional[ImageType] = None
        # Flag indicating if SAR overlay should be drawn on the map
        self.map_sar_overlay_enabled: bool = config.DEFAULT_MAP_SAR_OVERLAY_ENABLED
        # Alpha transparency value for the SAR overlay (0.0=transparent, 1.0=opaque),
        # clamped to [0.0, 1.0].
        self.map_sar_overlay_alpha: float = max(
            0.0, min(1.0, config.DEFAULT_MAP_SAR_OVERLAY_ALPHA)
        )
        # Last displayed map image *after* composition (including overlay, scale bar, etc.)
        # Used for the "Save Map View" feature.
        self.last_composed_map_pil: Optional[ImageType] = None
        # Information about the currently displayed map, needed for geo coordinate conversion
        # Updated by MapIntegrationManager whenever the map view changes.
        self.map_current_bounds_deg: Optional[Tuple[float, float, float, float]] = (
            None  # (west, south, east, north)
        )
        self.map_current_zoom: Optional[int] = None  # Zoom level
        self.map_current_shape_px: Optional[Tuple[int, int]] = (
            None  # (height, width) of displayed map window
        )

        # --- UI Related State ---
        # Last time the SAR mouse coordinates were processed (for rate limiting)
        self.last_mouse_update_time: float = 0.0
        # Last time the Map mouse coordinates were processed (for rate limiting)
        self.last_map_mouse_update_time: float = 0.0
        # Last click pixel coordinates on the displayed SAR window (x, y)
        self.last_sar_click_coords: Optional[Tuple[int, int]] = None
        # Last click pixel coordinates on the displayed Map window (x, y)
        self.last_map_click_coords: Optional[Tuple[int, int]] = None
        # Flag to control SAR metadata display in UI
        self.display_sar_metadata: bool = False
        # Last raw SAR metadata string received for display
        self.last_sar_metadata_str: Optional[str] = None

        logging.debug(f"{log_prefix} Application state initialized.")

    # --- Helper Methods for Initialization ---
    def _parse_map_scale_factor(self, factor_str: str) -> float:
        """
        Parses a map scale factor string (e.g., "N:1", "1:N") into a float multiplier.

        "N:1" yields ``N``, "1:N" yields ``1/N`` and "1:1" yields ``1.0``. Both
        numbers must be positive integers and at least one of them must be 1.

        Args:
            factor_str: Scale factor in ``"A:B"`` form.

        Returns:
            The parsed multiplier, or 1.0 if the string is not valid (the error
            is logged, never raised).
        """
        log_prefix = "[AppState Helper]"
        try:
            if not isinstance(factor_str, str) or ":" not in factor_str:
                raise ValueError("Invalid format")
            parts = factor_str.split(":")
            # Check the number of fields before unpacking them.
            if len(parts) != 2:
                raise ValueError("Invalid parts")
            val1 = int(parts[0].strip())
            val2 = int(parts[1].strip())
            if val1 <= 0 or val2 <= 0:
                raise ValueError("Values must be positive")
            if val2 == 1:
                scale = float(val1)  # "N:1" (covers "1:1" as well)
            elif val1 == 1:
                scale = 1.0 / float(val2)  # "1:N"
            else:
                raise ValueError("Invalid format combination")
        except ValueError as e:
            logging.error(
                f"{log_prefix} Error parsing map scale factor '{factor_str}': {e}. Using 1.0."
            )
            return 1.0
        logging.debug(
            f"{log_prefix} Parsed map scale factor '{factor_str}' to {scale:.4f}"
        )
        return scale

    def _initialize_mfd_params(self) -> Dict[str, Any]:
        """Initializes the dictionary containing MFD category parameters based on config.

        Returns:
            A dict with three keys: ``"categories"`` (category name -> BGR
            ``color``, ``intensity`` and list of ``pixels`` index values),
            ``"raw_map_intensity"`` and ``"pixel_to_category"`` (index value ->
            category name).
        """
        log_prefix = "[AppState MFD Params]"
        logging.debug(f"{log_prefix} Initializing MFD parameter dictionary.")
        params = {
            "categories": {  # BGR colors
                "Occlusion": {
                    "color": (0, 0, 0),
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": [0, 1],
                },
                "Cat A": {
                    "color": (255, 255, 255),
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": [2],
                },
                "Cat B": {
                    "color": (255, 255, 255),
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": [3, 18],
                },
                "Cat C": {
                    "color": (255, 255, 255),
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": [4, 5, 6, 16],
                },
                "Cat C1": {
                    "color": (255, 255, 255),
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": [7, 8, 9],
                },
                "Cat C2": {
                    "color": (255, 255, 255),
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": [10, 11, 12],
                },
                "Cat C3": {
                    "color": (255, 255, 255),
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": [13, 14, 15],
                },
                "Reserved": {
                    "color": (0, 0, 0),
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": list(range(17, 32)),
                },
            },
            "raw_map_intensity": config.DEFAULT_MFD_RAW_MAP_INTENSITY,
        }
        # Create reverse mapping for LUT generation.
        # NOTE(review): index 18 is listed under both "Cat B" and "Reserved";
        # because later categories overwrite earlier ones here, 18 resolves to
        # "Reserved". Confirm which category is intended.
        params["pixel_to_category"] = {
            p: cat for cat, data in params["categories"].items() for p in data["pixels"]
        }
        logging.debug(f"{log_prefix} MFD parameters initialized.")
        return params

    def _initialize_geo_info(self) -> Dict[str, Any]:
        """Initializes the default SAR GeoInfo dictionary based on config values (angles in RADIANS).

        Returns:
            A GeoInfo dict marked ``"valid": False``. If reading the config
            values fails, the exception is logged and an all-zero structure
            with the same keys is returned instead.
        """
        log_prefix = "[AppState Geo Init]"
        logging.debug(f"{log_prefix} Initializing default SAR GeoInfo.")
        try:
            w, h = config.SAR_WIDTH, config.SAR_HEIGHT
            lat_deg, lon_deg = config.SAR_CENTER_LAT, config.SAR_CENTER_LON
            size_km = config.SAR_IMAGE_SIZE_KM
            # Guard against a zero/negative image size to avoid dividing by zero.
            scale_x = (size_km * 1000.0) / w if w > 0 else 0.0
            scale_y = (size_km * 1000.0) / h if h > 0 else 0.0
            geo_info = {
                "lat": math.radians(lat_deg),  # Latitude in RADIANS
                "lon": math.radians(lon_deg),  # Longitude in RADIANS
                "ref_x": w // 2 if w > 0 else 0,  # Reference pixel X coordinate
                "ref_y": h // 2 if h > 0 else 0,  # Reference pixel Y coordinate
                "scale_x": scale_x,  # Meters per pixel along X-axis
                "scale_y": scale_y,  # Meters per pixel along Y-axis
                "width_px": w,  # Full image width in pixels
                "height_px": h,  # Full image height in pixels
                "orientation": 0.0,  # Angle in RADIANS (0=North up)
                "valid": False,  # Initially invalid until real data arrives
            }
            logging.debug(f"{log_prefix} Default SAR GeoInfo initialized.")
            return geo_info
        except Exception:
            # Best effort: state construction must not fail because of bad config.
            logging.exception(f"{log_prefix} Error initializing default GeoInfo:")
            # Return a minimal invalid structure on error
            return {
                "lat": 0.0,
                "lon": 0.0,
                "ref_x": 0,
                "ref_y": 0,
                "scale_x": 0.0,
                "scale_y": 0.0,
                "width_px": 0,
                "height_px": 0,
                "orientation": 0.0,
                "valid": False,
            }

    # --- Methods for Safe Stat Updates/Reads ---
    def increment_dropped_count(self, queue_name: str):
        """Safely increments the drop counter for the specified queue name (thread-safe).

        Args:
            queue_name: One of "sar", "mfd", "tkinter" or "mouse". Any other
                name is logged as a warning and no counter is changed.
        """
        counter_attr = self._DROPPED_COUNTER_ATTRS.get(queue_name)
        if counter_attr is None:
            logging.warning(
                f"[AppState Stats] Unknown queue name for drop counter: {queue_name!r}"
            )
            return
        with self._stats_lock:
            setattr(self, counter_attr, getattr(self, counter_attr) + 1)

    def increment_incomplete_rx_count(self, image_type: str):
        """Safely increments the counter for incomplete image receptions (thread-safe).

        Args:
            image_type: Either "sar" or "mfd". Any other value is logged as a
                warning and no counter is changed.
        """
        counter_attr = self._INCOMPLETE_RX_COUNTER_ATTRS.get(image_type)
        if counter_attr is None:
            logging.warning(
                f"[AppState Stats] Unknown image type for incomplete RX counter: {image_type!r}"
            )
            return
        with self._stats_lock:
            setattr(self, counter_attr, getattr(self, counter_attr) + 1)

    def get_statistics(self) -> Dict[str, int]:
        """Returns a dictionary containing current statistic counters (thread-safe read).

        Returns:
            A new dict with the keys "dropped_sar_q", "dropped_mfd_q",
            "dropped_tk_q", "dropped_mouse_q", "incomplete_sar_rx" and
            "incomplete_mfd_rx".
        """
        with self._stats_lock:
            # Snapshot all counters under a single lock acquisition.
            return {
                "dropped_sar_q": self._dropped_sar_queue,
                "dropped_mfd_q": self._dropped_mfd_queue,
                "dropped_tk_q": self._dropped_tkinter_queue,
                "dropped_mouse_q": self._dropped_mouse_queue,
                "incomplete_sar_rx": self._incomplete_sar_rx,
                "incomplete_mfd_rx": self._incomplete_mfd_rx,
            }

    def reset_statistics(self):
        """Resets all statistic counters and FPS calculation variables (thread-safe)."""
        log_prefix = "[AppState Stats]"
        logging.info(f"{log_prefix} Resetting statistic counters.")
        with self._stats_lock:
            self._dropped_sar_queue = 0
            self._dropped_mfd_queue = 0
            self._dropped_tkinter_queue = 0
            self._dropped_mouse_queue = 0
            self._incomplete_sar_rx = 0
            self._incomplete_mfd_rx = 0
            self.mfd_fps = 0.0
            self.sar_fps = 0.0
            self.mfd_frame_count = 0
            self.sar_frame_count = 0
            self.mfd_start_time = time.time()
            self.sar_update_time = time.time()
        logging.debug(f"{log_prefix} Statistic counters reset.")

    # --- Methods for state updates ---
    def update_map_scale_factor(self, factor_str: str):
        """Updates the map display scale factor based on a string representation.

        The string is parsed with ``_parse_map_scale_factor``; since that
        helper returns 1.0 for an unparseable string, an invalid ``factor_str``
        sets the scale factor to 1.0.

        Args:
            factor_str: Scale factor in "N:1" or "1:N" form.
        """
        log_prefix = "[AppState Update]"
        new_scale = self._parse_map_scale_factor(factor_str)
        # Only store/log when the value really changed (tolerance for float noise).
        if abs(new_scale - self.map_display_scale_factor) > 1e-6:
            self.map_display_scale_factor = new_scale
            logging.info(
                f"{log_prefix} Map display scale factor updated to {new_scale:.4f}"
            )

    def update_map_overlay_params(
        self, enabled: Optional[bool] = None, alpha: Optional[float] = None
    ):
        """Updates the SAR overlay enabled state and/or alpha value. Clamps alpha [0.0, 1.0].

        Args:
            enabled: New overlay enabled flag, or None to leave it unchanged.
            alpha: New overlay alpha, or None to leave it unchanged. Values
                outside [0.0, 1.0] are clamped and a warning is logged; NaN is
                rejected with a warning and the current alpha is kept.
        """
        log_prefix = "[AppState Update]"
        if enabled is not None and enabled != self.map_sar_overlay_enabled:
            self.map_sar_overlay_enabled = enabled
            logging.info(
                f"{log_prefix} Map SAR overlay enabled state updated to: {enabled}"
            )
        if alpha is None:
            return
        if math.isnan(alpha):
            # max/min would silently turn NaN into 1.0 (fully opaque); reject it.
            logging.warning(f"{log_prefix} Invalid overlay alpha value: {alpha}.")
            return
        alpha_clamped = max(0.0, min(1.0, alpha))
        if alpha != alpha_clamped:
            # Report clamping whether or not the stored value ends up changing.
            logging.warning(
                f"{log_prefix} Provided alpha value {alpha:.3f} clamped to {alpha_clamped:.3f}"
            )
        if abs(alpha_clamped - self.map_sar_overlay_alpha) > 1e-6:
            self.map_sar_overlay_alpha = alpha_clamped
            logging.info(
                f"{log_prefix} Map SAR overlay alpha updated to: {alpha_clamped:.3f}"
            )

    def update_sar_overlay_shift(self, lat_shift_deg: float, lon_shift_deg: float):
        """Updates the latitude and longitude shift values for the SAR overlay.

        Each value is handled independently: a non-finite value (NaN/inf) is
        logged as a warning and ignored; a finite value is stored only if it
        differs from the current one by more than 1e-9.

        Args:
            lat_shift_deg: Latitude shift in degrees.
            lon_shift_deg: Longitude shift in degrees.
        """
        log_prefix = "[AppState Update]"
        if not math.isfinite(lat_shift_deg):
            logging.warning(f"{log_prefix} Invalid latitude shift: {lat_shift_deg}.")
        elif abs(lat_shift_deg - self.sar_lat_shift_deg) > 1e-9:
            self.sar_lat_shift_deg = lat_shift_deg
            logging.info(
                f"{log_prefix} SAR overlay Latitude shift updated to: {lat_shift_deg:.6f} degrees."
            )
        if not math.isfinite(lon_shift_deg):
            logging.warning(f"{log_prefix} Invalid longitude shift: {lon_shift_deg}.")
        elif abs(lon_shift_deg - self.sar_lon_shift_deg) > 1e-9:
            self.sar_lon_shift_deg = lon_shift_deg
            logging.info(
                f"{log_prefix} SAR overlay Longitude shift updated to: {lon_shift_deg:.6f} degrees."
            )

    def update_sar_recording_enabled(self, enabled: bool):
        """Updates the SAR recording enabled flag, logging only when it changes.

        Args:
            enabled: New value of the recording flag.
        """
        log_prefix = "[AppState Update]"
        if enabled != self.sar_recording_enabled:
            self.sar_recording_enabled = enabled
            logging.info(
                f"{log_prefix} SAR recording enabled state updated to: {enabled}"
            )

    def update_sar_parameters(
        self,
        contrast: Optional[float] = None,
        brightness: Optional[int] = None,
        palette: Optional[str] = None,
    ):
        """Updates SAR display parameters (contrast, brightness, palette) if changed.

        Args:
            contrast: New contrast, or None to leave it unchanged.
            brightness: New brightness, or None to leave it unchanged.
            palette: New palette name, or None to leave it unchanged.
        """
        if contrast is not None and abs(contrast - self.sar_contrast) > 1e-6:
            self.sar_contrast = contrast
        if brightness is not None and brightness != self.sar_brightness:
            self.sar_brightness = brightness
        if palette is not None and palette != self.sar_palette:
            self.sar_palette = palette

    def update_sar_display_size(self, width: int, height: int):
        """Updates the target display size for the SAR image. Validates input.

        Args:
            width: Target width in pixels; must be > 0.
            height: Target height in pixels; must be > 0.

        A non-positive width or height is logged as a warning and ignored.
        """
        log_prefix = "[AppState Update]"
        if width <= 0 or height <= 0:
            logging.warning(
                f"{log_prefix} Attempted invalid SAR display size: {width}x{height}"
            )
            return
        if width != self.sar_display_width or height != self.sar_display_height:
            self.sar_display_width = width
            self.sar_display_height = height
            logging.info(f"{log_prefix} SAR display size updated to {width}x{height}")

    def set_sar_data(
        self, normalized_image: Optional[np.ndarray], geo_info: Optional[Dict[str, Any]]
    ):
        """Updates the current normalized SAR image and its associated GeoInfo.

        Args:
            normalized_image: Non-empty ``np.ndarray`` to store as the current
                display image. Anything else is logged as a warning and the
                previous image is kept.
            geo_info: GeoInfo dict to store. If it is not a dict, a warning is
                logged and the GeoInfo is reset to the defaults from
                ``_initialize_geo_info``.
        """
        log_prefix = "[AppState Update]"
        # Update image if valid
        if isinstance(normalized_image, np.ndarray) and normalized_image.size > 0:
            self.current_sar_normalized = normalized_image
        else:
            logging.warning(f"{log_prefix} Invalid normalized SAR image provided.")
        # Update GeoInfo if valid
        if isinstance(geo_info, dict):
            self.current_sar_geo_info = geo_info
        else:
            logging.warning(f"{log_prefix} Invalid geo_info provided.")
            self.current_sar_geo_info = self._initialize_geo_info()  # Reset to default

    def set_mfd_indices(self, indices_image: Optional[np.ndarray]):
        """Updates the current MFD index image.

        Args:
            indices_image: Non-empty ``np.ndarray`` of MFD index values.
                Anything else is logged as a warning and the previous image is
                kept.
        """
        log_prefix = "[AppState Update]"
        if isinstance(indices_image, np.ndarray) and indices_image.size > 0:
            self.current_mfd_indices = indices_image
        else:
            logging.warning(f"{log_prefix} Invalid MFD indices image provided.")
# --- END OF FILE app_state.py ---