SXXXXXXX_ControlPanel/app_state.py

444 lines
18 KiB
Python

# --- START OF FILE app_state.py ---
# app_state.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.

Centralized state management for the Control Panel application.
Holds shared application state, including SAR/MFD parameters, georeferencing info,
current image data, LUTs, test mode status, statistics, and map display state.
Provides methods for controlled updates where necessary.
"""
# Standard library imports
import logging
import threading
import time
import math
from typing import Optional, Dict, Any, Tuple, List
# Third-party imports
import numpy as np
try:
from PIL import Image # Needed for map image state typing
except ImportError:
Image = None # Define as None if Pillow not installed
# Local application imports
import config # For default values
class AppState:
    """Hold and manage the application's shared state.

    The instance stores SAR/MFD display parameters, the current image
    buffers and geo-referencing dictionary, test-mode data, statistic
    counters and map display settings.

    Only the statistic counters (dropped-queue / incomplete-RX counts and
    the FPS bookkeeping reset by ``reset_statistics``) are accessed under
    ``_stats_lock`` by this class; all other attributes are read and
    written without locking here.
    """

    def __init__(self):
        """Initialize every state attribute from the ``config`` defaults."""
        log_prefix = "[AppState Init]"
        logging.debug(f"{log_prefix} Initializing application state...")

        # --- General Application State ---
        self.shutting_down: bool = False

        # --- SAR Display Parameters ---
        self.sar_contrast: float = config.DEFAULT_SAR_CONTRAST
        self.sar_brightness: int = config.DEFAULT_SAR_BRIGHTNESS
        self.sar_palette: str = config.DEFAULT_SAR_PALETTE
        self.sar_display_width: int = config.INITIAL_SAR_WIDTH
        self.sar_display_height: int = config.INITIAL_SAR_HEIGHT

        # --- SAR Recording State ---
        self.sar_recording_enabled: bool = config.DEFAULT_SAR_RECORDING_ENABLED

        # --- SAR Data & Geo-referencing ---
        # Note: current_sar_normalized is for DISPLAY. Recorder needs RAW data
        # from receiver.
        self.current_sar_normalized: Optional[np.ndarray] = np.zeros(
            (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
        )
        self.current_sar_geo_info: Dict[str, Any] = self._initialize_geo_info()
        # Identity LUT (0..255) until a brightness/contrast LUT is computed.
        self.brightness_contrast_lut: Optional[np.ndarray] = np.arange(
            256, dtype=np.uint8
        )

        # --- MFD Display Parameters & Data ---
        self.mfd_params: Dict[str, Any] = self._initialize_mfd_params()
        self.mfd_lut: np.ndarray = np.zeros((256, 3), dtype=np.uint8)
        self.current_mfd_indices: Optional[np.ndarray] = np.zeros(
            (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
        )

        # --- Test Mode State ---
        self.test_mode_active: bool = config.ENABLE_TEST_MODE
        self.test_mfd_image_indices: Optional[np.ndarray] = None
        self.test_sar_image_raw: Optional[np.ndarray] = None  # Raw data for test mode
        self.local_mfd_image_data_indices: Optional[np.ndarray] = None
        self.local_sar_image_data_raw: Optional[np.ndarray] = None  # Raw data for local mode

        # --- Network State ---
        # (Potentially add network status indicators here if needed)

        # --- Performance / Statistics ---
        self._stats_lock = threading.Lock()
        self.mfd_fps: float = 0.0
        self.sar_fps: float = 0.0
        self.mfd_frame_count: int = 0
        self.sar_frame_count: int = 0
        self.mfd_start_time: float = time.time()
        self.sar_update_time: float = time.time()
        self._dropped_sar_queue: int = 0
        self._dropped_mfd_queue: int = 0
        self._dropped_tkinter_queue: int = 0
        self._dropped_mouse_queue: int = 0
        self._incomplete_sar_rx: int = 0
        self._incomplete_mfd_rx: int = 0

        # --- Map Related State ---
        self.map_display_scale_factor: float = self._parse_map_scale_factor(
            config.DEFAULT_MAP_SIZE
        )
        # Pillow Image or None. The annotation is quoted because the module
        # sets Image to None when Pillow is not installed.
        self.last_map_image_pil: Optional["Image.Image"] = None
        self.map_sar_overlay_enabled: bool = config.DEFAULT_MAP_SAR_OVERLAY_ENABLED
        self.map_sar_overlay_alpha: float = max(
            0.0, min(1.0, config.DEFAULT_MAP_SAR_OVERLAY_ALPHA)
        )  # Clamped 0.0-1.0

        # --- UI Related State ---
        self.last_mouse_update_time: float = 0.0

        logging.debug(f"{log_prefix} Application state initialized.")

    # --- Helper Methods for Initialization ---

    @staticmethod
    def _parse_map_scale_factor_strict(factor_str: str) -> float:
        """Convert an ``'N:1'`` or ``'1:N'`` string into a float scale factor.

        Args:
            factor_str: Scale description such as ``"2:1"`` (-> 2.0) or
                ``"1:4"`` (-> 0.25).

        Returns:
            The parsed scale factor.

        Raises:
            ValueError: If the input is not a string containing exactly one
                ``':'``, a part is not an integer, a value is not positive,
                or neither side equals 1.
        """
        if not isinstance(factor_str, str) or ":" not in factor_str:
            raise ValueError("Invalid format, must be string with ':'")
        parts = factor_str.split(":")
        if len(parts) != 2:
            raise ValueError("Invalid format, must have two parts separated by ':'")
        numerator = int(parts[0].strip())
        denominator = int(parts[1].strip())
        if numerator <= 0 or denominator <= 0:
            raise ValueError("Values must be positive integers")
        if denominator == 1:
            # Covers both "N:1" and the "1:1" identity case.
            return float(numerator)
        if numerator == 1:
            return 1.0 / float(denominator)
        raise ValueError("Invalid format combination, must be 'N:1' or '1:N'")

    def _parse_map_scale_factor(self, factor_str: str) -> float:
        """Parse a map scale string, falling back to 1.0 on invalid input.

        Used during initialization, where a usable default is required.

        Args:
            factor_str: Scale description in ``'N:1'`` or ``'1:N'`` form.

        Returns:
            The parsed scale factor, or 1.0 (with an error logged) when the
            string cannot be parsed.
        """
        log_prefix = "[AppState Helper]"
        try:
            scale = self._parse_map_scale_factor_strict(factor_str)
        except (ValueError, IndexError, TypeError) as e:
            logging.error(
                f"{log_prefix} Error parsing map scale factor '{factor_str}': {e}. Using 1.0."
            )
            return 1.0
        logging.debug(
            f"{log_prefix} Parsed map scale factor '{factor_str}' to {scale:.4f}"
        )
        return scale

    def _initialize_mfd_params(self) -> Dict[str, Any]:
        """Build the default MFD parameter dictionary.

        Returns:
            A dict with three keys: ``"categories"`` (category name ->
            ``{"color", "intensity", "pixels"}``), ``"raw_map_intensity"``
            and ``"pixel_to_category"`` (pixel index -> category name).
        """
        log_prefix = "[AppState MFD Params]"
        logging.debug(f"{log_prefix} Initializing MFD parameter dictionary.")

        # (category name, RGB color, pixel indices) in insertion order.
        # NOTE(review): index 18 appears under both "Cat B" and "Reserved"
        # (range 17..31). Because "Reserved" is inserted last,
        # pixel_to_category[18] resolves to "Reserved" -- confirm intended.
        category_table = (
            ("Occlusion", (0, 0, 0), [0, 1]),
            ("Cat A", (255, 255, 255), [2]),
            ("Cat B", (255, 255, 255), [3, 18]),
            ("Cat C", (255, 255, 255), [4, 5, 6, 16]),
            ("Cat C1", (255, 255, 255), [7, 8, 9]),
            ("Cat C2", (255, 255, 255), [10, 11, 12]),
            ("Cat C3", (255, 255, 255), [13, 14, 15]),
            ("Reserved", (0, 0, 0), list(range(17, 32))),
        )
        params: Dict[str, Any] = {
            "categories": {
                name: {
                    "color": color,
                    "intensity": config.DEFAULT_MFD_INTENSITY,
                    "pixels": pixels,
                }
                for name, color, pixels in category_table
            },
            "raw_map_intensity": config.DEFAULT_MFD_RAW_MAP_INTENSITY,
        }
        # Reverse lookup; later categories win when a pixel index repeats.
        params["pixel_to_category"] = {
            pixel_index: category_name
            for category_name, data in params["categories"].items()
            for pixel_index in data["pixels"]
        }
        logging.debug(f"{log_prefix} MFD parameters initialized.")
        return params

    def _initialize_geo_info(self) -> Dict[str, Any]:
        """Build the default SAR geo-info dictionary from ``config`` values.

        Returns:
            A dict with ``lat``/``lon`` (config degrees converted to
            radians), ``ref_x``/``ref_y`` (image center in pixels),
            ``scale_x``/``scale_y`` (image size in metres divided by the
            pixel dimension, 0.0 when that dimension is not positive),
            ``width_px``/``height_px``, ``orientation`` (0.0) and
            ``valid`` (False).
        """
        log_prefix = "[AppState Geo Init]"
        logging.debug(f"{log_prefix} Initializing default SAR GeoInfo.")

        width_px = config.SAR_WIDTH
        height_px = config.SAR_HEIGHT
        size_m = config.SAR_IMAGE_SIZE_KM * 1000.0

        geo_info = {
            "lat": math.radians(config.SAR_CENTER_LAT),
            "lon": math.radians(config.SAR_CENTER_LON),
            "ref_x": width_px // 2 if width_px > 0 else 0,
            "ref_y": height_px // 2 if height_px > 0 else 0,
            # Guard against division by zero for degenerate image sizes.
            "scale_x": size_m / width_px if width_px > 0 else 0.0,
            "scale_y": size_m / height_px if height_px > 0 else 0.0,
            "width_px": width_px,
            "height_px": height_px,
            "orientation": 0.0,
            "valid": False,
        }
        logging.debug(f"{log_prefix} Default SAR GeoInfo initialized.")
        return geo_info

    # --- Methods for Safe Stat Updates/Reads ---

    def increment_dropped_count(self, queue_name: str):
        """Increment the dropped-item counter for one queue under the stats lock.

        Args:
            queue_name: One of ``"sar"``, ``"mfd"``, ``"tkinter"`` or
                ``"mouse"``. Any other value only logs a warning.
        """
        with self._stats_lock:
            if queue_name == "sar":
                self._dropped_sar_queue += 1
            elif queue_name == "mfd":
                self._dropped_mfd_queue += 1
            elif queue_name == "tkinter":
                self._dropped_tkinter_queue += 1
            elif queue_name == "mouse":
                self._dropped_mouse_queue += 1
            else:
                logging.warning(
                    f"[AppState] Attempted to increment drop count for unknown queue: {queue_name}"
                )

    def increment_incomplete_rx_count(self, image_type: str):
        """Increment the incomplete-reception counter under the stats lock.

        Args:
            image_type: ``"sar"`` or ``"mfd"``. Any other value only logs a
                warning.
        """
        with self._stats_lock:
            if image_type == "sar":
                self._incomplete_sar_rx += 1
            elif image_type == "mfd":
                self._incomplete_mfd_rx += 1
            else:
                logging.warning(
                    f"[AppState] Attempted to increment incomplete count for unknown type: {image_type}"
                )

    def get_statistics(self) -> Dict[str, int]:
        """Return a snapshot of the drop / incomplete counters.

        Returns:
            A new dict with keys ``dropped_sar_q``, ``dropped_mfd_q``,
            ``dropped_tk_q``, ``dropped_mouse_q``, ``incomplete_sar_rx`` and
            ``incomplete_mfd_rx``, read while holding the stats lock.
        """
        with self._stats_lock:
            return {
                "dropped_sar_q": self._dropped_sar_queue,
                "dropped_mfd_q": self._dropped_mfd_queue,
                "dropped_tk_q": self._dropped_tkinter_queue,
                "dropped_mouse_q": self._dropped_mouse_queue,
                "incomplete_sar_rx": self._incomplete_sar_rx,
                "incomplete_mfd_rx": self._incomplete_mfd_rx,
            }

    def reset_statistics(self):
        """Zero all counters and FPS values and restart the timing references."""
        log_prefix = "[AppState Stats]"
        logging.info(f"{log_prefix} Resetting statistic counters.")
        with self._stats_lock:
            self._dropped_sar_queue = 0
            self._dropped_mfd_queue = 0
            self._dropped_tkinter_queue = 0
            self._dropped_mouse_queue = 0
            self._incomplete_sar_rx = 0
            self._incomplete_mfd_rx = 0
            self.mfd_fps = 0.0
            self.sar_fps = 0.0
            self.mfd_frame_count = 0
            self.sar_frame_count = 0
            self.mfd_start_time = time.time()
            self.sar_update_time = time.time()
        logging.debug(f"{log_prefix} Statistic counters reset.")

    # --- Methods for state updates ---

    def update_map_scale_factor(self, factor_str: str):
        """Update the map display scale factor from an ``'N:1'``/``'1:N'`` string.

        An unparsable string is rejected: an error is logged and the current
        factor is left unchanged, so bad input can no longer silently reset
        the scale to 1.0.

        Args:
            factor_str: Scale description such as ``"2:1"`` or ``"1:4"``.
        """
        log_prefix = "[AppState Update]"
        try:
            new_scale = self._parse_map_scale_factor_strict(factor_str)
        except (ValueError, TypeError) as e:
            logging.error(
                f"{log_prefix} Failed to update map scale factor from string '{factor_str}': {e}"
            )
            return

        if abs(new_scale - self.map_display_scale_factor) > 1e-6:
            self.map_display_scale_factor = new_scale
            logging.info(
                f"{log_prefix} Map display scale factor updated to {new_scale:.4f} (from '{factor_str}')"
            )
        else:
            logging.debug(
                f"{log_prefix} Map display scale factor already {new_scale:.4f}."
            )

    def update_map_overlay_params(
        self, enabled: Optional[bool] = None, alpha: Optional[float] = None
    ):
        """Update the map SAR overlay enabled flag and/or alpha.

        Args:
            enabled: New enabled state, or None to leave it unchanged.
            alpha: New overlay alpha, or None to leave it unchanged. Values
                outside 0.0-1.0 are clamped, and a warning is logged
                whenever clamping occurs.
        """
        log_prefix = "[AppState Update]"
        updated = False

        if enabled is not None and enabled != self.map_sar_overlay_enabled:
            self.map_sar_overlay_enabled = enabled
            logging.info(
                f"{log_prefix} Map SAR overlay enabled state updated to: {enabled}"
            )
            updated = True

        if alpha is not None:
            alpha_clamped = max(0.0, min(1.0, alpha))
            # Report clamping regardless of whether the stored value changes.
            if alpha != alpha_clamped:
                logging.warning(
                    f"{log_prefix} Provided alpha value {alpha:.3f} was clamped to {alpha_clamped:.3f}"
                )
            if abs(alpha_clamped - self.map_sar_overlay_alpha) > 1e-6:
                self.map_sar_overlay_alpha = alpha_clamped
                logging.info(
                    f"{log_prefix} Map SAR overlay alpha updated to: {alpha_clamped:.3f}"
                )
                updated = True

        if not updated:
            logging.debug(
                f"{log_prefix} Map overlay parameters update called but no values changed."
            )

    def update_sar_recording_enabled(self, enabled: bool):
        """Set the SAR recording enabled flag, logging whether it changed.

        Args:
            enabled: New value for ``sar_recording_enabled``.
        """
        log_prefix = "[AppState Update]"
        if enabled != self.sar_recording_enabled:
            self.sar_recording_enabled = enabled
            logging.info(
                f"{log_prefix} SAR recording enabled state updated to: {enabled}"
            )
        else:
            logging.debug(
                f"{log_prefix} SAR recording enabled state already {enabled}."
            )

    def update_sar_parameters(
        self,
        contrast: Optional[float] = None,
        brightness: Optional[int] = None,
        palette: Optional[str] = None,
    ):
        """Update SAR contrast, brightness and/or palette.

        Each argument left as None is ignored; a value equal to the stored
        one (within 1e-6 for contrast) is treated as no change.

        Args:
            contrast: New contrast value.
            brightness: New brightness value.
            palette: New palette name.
        """
        log_prefix = "[AppState Update]"
        updated = False

        if contrast is not None and abs(contrast - self.sar_contrast) > 1e-6:
            self.sar_contrast = contrast
            logging.debug(f"{log_prefix} SAR contrast updated to {contrast:.3f}")
            updated = True

        if brightness is not None and brightness != self.sar_brightness:
            self.sar_brightness = brightness
            logging.debug(f"{log_prefix} SAR brightness updated to {brightness}")
            updated = True

        if palette is not None and palette != self.sar_palette:
            self.sar_palette = palette
            logging.debug(f"{log_prefix} SAR palette updated to {palette}")
            updated = True

        if not updated:
            logging.debug(
                f"{log_prefix} SAR parameters update called but no values changed."
            )

    def update_sar_display_size(self, width: int, height: int):
        """Store a new SAR display size; non-positive dimensions are rejected.

        Args:
            width: New display width; must be > 0.
            height: New display height; must be > 0.
        """
        log_prefix = "[AppState Update]"
        if width <= 0 or height <= 0:
            logging.warning(
                f"{log_prefix} Attempted to set invalid SAR display size: {width}x{height}"
            )
            return

        if width != self.sar_display_width or height != self.sar_display_height:
            self.sar_display_width = width
            self.sar_display_height = height
            logging.info(
                f"{log_prefix} SAR display size updated to {width}x{height}"
            )
        else:
            logging.debug(
                f"{log_prefix} SAR display size already {width}x{height}."
            )

    def set_sar_data(
        self,
        normalized_image: Optional[np.ndarray],
        geo_info: Optional[Dict[str, Any]],
    ):
        """Store the current normalized SAR image and its geo-info.

        The two arguments are validated independently. An invalid image is
        ignored (the previous image is kept); an invalid ``geo_info`` resets
        the stored geo-info to the defaults, marked not valid.

        Args:
            normalized_image: Image array to store; must be an ``np.ndarray``.
            geo_info: Geo-referencing dictionary; must be a ``dict``.
        """
        log_prefix = "[AppState Update]"

        if isinstance(normalized_image, np.ndarray):
            self.current_sar_normalized = normalized_image
            logging.debug(
                f"{log_prefix} Updated current_sar_normalized (Shape: {normalized_image.shape})"
            )
        else:
            logging.warning(
                f"{log_prefix} Invalid normalized SAR image provided for update."
            )

        if isinstance(geo_info, dict):
            self.current_sar_geo_info = geo_info
            logging.debug(
                f"{log_prefix} Updated current_sar_geo_info (Valid: {geo_info.get('valid', 'N/A')})"
            )
        else:
            logging.warning(f"{log_prefix} Invalid geo_info provided for update.")
            self.current_sar_geo_info = self._initialize_geo_info()
            self.current_sar_geo_info["valid"] = False

    def set_mfd_indices(self, indices_image: Optional[np.ndarray]):
        """Store the current MFD index image.

        Args:
            indices_image: Array to store; anything that is not an
                ``np.ndarray`` is ignored with a warning.
        """
        log_prefix = "[AppState Update]"
        if isinstance(indices_image, np.ndarray):
            self.current_mfd_indices = indices_image
            logging.debug(
                f"{log_prefix} Updated current_mfd_indices (Shape: {indices_image.shape})"
            )
        else:
            logging.warning(
                f"{log_prefix} Invalid indices image provided for MFD update."
            )
# --- END OF FILE app_state.py ---