SXXXXXXX_ControlPanel/app_state.py
2025-04-08 07:53:55 +02:00

346 lines
14 KiB
Python

# --- START OF FILE app_state.py ---
# app_state.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
Centralized state management for the Control Panel application.
Holds shared application state, including SAR/MFD parameters, georeferencing info,
current image data, LUTs, test mode status, and statistics. Provides methods
for controlled updates where necessary (e.g., thread safety for stats).
"""
# Standard library imports
import logging
import threading
import time
import math
from typing import Optional, Dict, Any, Tuple, List
# Third-party imports
import numpy as np
# Local application imports
import config # For default values
class AppState:
"""Class to hold and manage the application's shared state."""
def __init__(self):
"""Initializes the application state with default values."""
log_prefix = "[AppState Init]"
logging.debug(f"{log_prefix} Initializing application state...")
# --- General Application State ---
self.shutting_down: bool = False
# --- SAR Display Parameters ---
self.sar_contrast: float = config.DEFAULT_SAR_CONTRAST
self.sar_brightness: int = config.DEFAULT_SAR_BRIGHTNESS
self.sar_palette: str = config.DEFAULT_SAR_PALETTE
self.sar_display_width: int = (
config.INITIAL_SAR_WIDTH
) # Initial value, App might override if map enabled
self.sar_display_height: int = config.INITIAL_SAR_HEIGHT # Initial value
# --- SAR Data & Geo-referencing ---
# Raw SAR data (e.g., uint16) - Holds the latest complete raw image if needed, or None
# self.current_sar_raw: Optional[np.ndarray] = None # Decided not to store raw here yet, App holds it
# Normalized SAR data (uint8) - Holds the latest received/processed uint8 image
self.current_sar_normalized: Optional[np.ndarray] = np.zeros(
(config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
)
# SAR GeoInfo dictionary (contents defined in App/Receiver)
self.current_sar_geo_info: Dict[str, Any] = self._initialize_geo_info()
# SAR Look-up Table
self.brightness_contrast_lut: Optional[np.ndarray] = np.arange(
256, dtype=np.uint8
) # Start with identity LUT
# --- MFD Display Parameters & Data ---
self.mfd_params: Dict[str, Any] = self._initialize_mfd_params()
self.mfd_lut: np.ndarray = np.zeros(
(256, 3), dtype=np.uint8
) # Initial empty LUT
# MFD Index Data - Holds the latest received MFD index map
self.current_mfd_indices: Optional[np.ndarray] = np.zeros(
(config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
)
# --- Test Mode State ---
# Flag indicating if test mode *should be* active (set by UI handler/App)
self.test_mode_active: bool = config.ENABLE_TEST_MODE
# Test mode specific data (offsets are managed by TestModeManager)
# self.mfd_x_offset: int = 0 # Managed by TestModeManager
# self.sar_x_offset: int = 0 # Managed by TestModeManager
# Test image data (loaded once, potentially by TestModeManager or App init)
self.test_mfd_image_indices: Optional[np.ndarray] = None
self.test_sar_image_raw: Optional[np.ndarray] = None
self.local_mfd_image_data_indices: Optional[np.ndarray] = (
None # For local image mode
)
self.local_sar_image_data_raw: Optional[np.ndarray] = (
None # For local image mode
)
# --- Network State ---
# self.network_listening: bool = False # Might be useful later
# self.udp_socket_status: str = "Inactive" # Could store status string
# --- Performance / Statistics ---
self._stats_lock = threading.Lock() # Lock for updating stats safely
# FPS counters (updated by relevant processing loops/managers)
self.mfd_fps: float = 0.0
self.sar_fps: float = 0.0
self.mfd_frame_count: int = 0
self.sar_frame_count: int = 0
self.mfd_start_time: float = time.time()
self.sar_update_time: float = time.time()
# Drop/Incomplete Counts (provide methods to update/read safely)
self._dropped_sar_queue: int = 0
self._dropped_mfd_queue: int = 0
self._dropped_tkinter_queue: int = 0
self._dropped_mouse_queue: int = 0
self._incomplete_sar_rx: int = 0
self._incomplete_mfd_rx: int = 0
# --- UI Related State ---
self.last_mouse_update_time: float = 0.0 # Used for mouse event rate limiting
logging.debug(f"{log_prefix} Application state initialized.")
def _initialize_mfd_params(self) -> Dict[str, Any]:
"""Initializes the dictionary holding MFD visualization parameters."""
# Copied from App._initialize_mfd_params
log_prefix = "[AppState MFD Params]"
logging.debug(f"{log_prefix} Initializing MFD parameter dictionary.")
params = {
"categories": {
"Occlusion": {
"color": (0, 0, 0),
"intensity": config.DEFAULT_MFD_INTENSITY,
"pixels": [0, 1],
},
"Cat A": {
"color": (255, 255, 255),
"intensity": config.DEFAULT_MFD_INTENSITY,
"pixels": [2],
},
"Cat B": {
"color": (255, 255, 255),
"intensity": config.DEFAULT_MFD_INTENSITY,
"pixels": [3, 18],
},
"Cat C": {
"color": (255, 255, 255),
"intensity": config.DEFAULT_MFD_INTENSITY,
"pixels": [4, 5, 6, 16],
},
"Cat C1": {
"color": (255, 255, 255),
"intensity": config.DEFAULT_MFD_INTENSITY,
"pixels": [7, 8, 9],
},
"Cat C2": {
"color": (255, 255, 255),
"intensity": config.DEFAULT_MFD_INTENSITY,
"pixels": [10, 11, 12],
},
"Cat C3": {
"color": (255, 255, 255),
"intensity": config.DEFAULT_MFD_INTENSITY,
"pixels": [13, 14, 15],
},
"Reserved": {
"color": (0, 0, 0),
"intensity": config.DEFAULT_MFD_INTENSITY,
"pixels": list(range(17, 32)),
},
},
"raw_map_intensity": config.DEFAULT_MFD_RAW_MAP_INTENSITY,
}
params["pixel_to_category"] = {
pixel_index: category_name
for category_name, data in params["categories"].items()
for pixel_index in data["pixels"]
}
logging.debug(f"{log_prefix} MFD parameters initialized.")
return params
def _initialize_geo_info(self) -> Dict[str, Any]:
"""Initializes the SAR geo-referencing dictionary with default values."""
# Copied from App.__init__ geo section
log_prefix = "[AppState Geo Init]"
logging.debug(f"{log_prefix} Initializing default SAR GeoInfo.")
sar_width_px = config.SAR_WIDTH
sar_height_px = config.SAR_HEIGHT
sar_center_lat_deg = config.SAR_CENTER_LAT
sar_center_lon_deg = config.SAR_CENTER_LON
sar_size_km = config.SAR_IMAGE_SIZE_KM
scale_x_val = (sar_size_km * 1000.0) / sar_width_px if sar_width_px > 0 else 0.0
scale_y_val = (
(sar_size_km * 1000.0) / sar_height_px if sar_height_px > 0 else 0.0
)
geo_info = {
"lat": math.radians(sar_center_lat_deg), # RADIANS
"lon": math.radians(sar_center_lon_deg), # RADIANS
"ref_x": sar_width_px // 2 if sar_width_px > 0 else 0,
"ref_y": sar_height_px // 2 if sar_height_px > 0 else 0,
"scale_x": scale_x_val,
"scale_y": scale_y_val,
"width_px": sar_width_px,
"height_px": sar_height_px,
"orientation": 0.0, # RADIANS
"valid": False, # Default to False, set True only when valid data arrives
}
logging.debug(f"{log_prefix} Default SAR GeoInfo initialized.")
return geo_info
# --- Methods for Safe Stat Updates/Reads ---
def increment_dropped_count(self, queue_name: str):
"""Safely increments the drop counter for the specified queue."""
with self._stats_lock:
if queue_name == "sar":
self._dropped_sar_queue += 1
elif queue_name == "mfd":
self._dropped_mfd_queue += 1
elif queue_name == "tkinter":
self._dropped_tkinter_queue += 1
elif queue_name == "mouse":
self._dropped_mouse_queue += 1
else:
logging.warning(
f"[AppState] Attempted to increment drop count for unknown queue: {queue_name}"
)
def increment_incomplete_rx_count(self, image_type: str):
"""Safely increments the incomplete counter for SAR or MFD."""
with self._stats_lock:
if image_type == "sar":
self._incomplete_sar_rx += 1
elif image_type == "mfd":
self._incomplete_mfd_rx += 1
else:
logging.warning(
f"[AppState] Attempted to increment incomplete count for unknown type: {image_type}"
)
def get_statistics(self) -> Dict[str, int]:
"""Returns a dictionary containing the current statistics (thread-safe read)."""
with self._stats_lock:
stats = {
"dropped_sar_q": self._dropped_sar_queue,
"dropped_mfd_q": self._dropped_mfd_queue,
"dropped_tk_q": self._dropped_tkinter_queue,
"dropped_mouse_q": self._dropped_mouse_queue,
"incomplete_sar_rx": self._incomplete_sar_rx,
"incomplete_mfd_rx": self._incomplete_mfd_rx,
}
return stats
def reset_statistics(self):
"""Resets all statistic counters to zero (thread-safe write)."""
log_prefix = "[AppState Stats]"
logging.info(f"{log_prefix} Resetting statistic counters.")
with self._stats_lock:
self._dropped_sar_queue = 0
self._dropped_mfd_queue = 0
self._dropped_tkinter_queue = 0
self._dropped_mouse_queue = 0
self._incomplete_sar_rx = 0
self._incomplete_mfd_rx = 0
# Reset FPS related counts as well? Or handle in their respective update logic.
self.mfd_fps = 0.0
self.sar_fps = 0.0
self.mfd_frame_count = 0
self.sar_frame_count = 0
self.mfd_start_time = time.time()
self.sar_update_time = time.time()
logging.debug(f"{log_prefix} Statistic counters reset.")
# --- Methods for state updates (Examples) ---
# Add more methods as needed to update state in a controlled way,
# especially if complex logic or thread-safety beyond simple assignment is needed.
def update_sar_parameters(
self,
contrast: Optional[float] = None,
brightness: Optional[int] = None,
palette: Optional[str] = None,
):
"""Updates SAR display parameters."""
log_prefix = "[AppState Update]"
if contrast is not None:
self.sar_contrast = contrast
logging.debug(f"{log_prefix} SAR contrast updated to {contrast}")
if brightness is not None:
self.sar_brightness = brightness
logging.debug(f"{log_prefix} SAR brightness updated to {brightness}")
if palette is not None:
self.sar_palette = palette
logging.debug(f"{log_prefix} SAR palette updated to {palette}")
def update_sar_display_size(self, width: int, height: int):
"""Updates SAR display dimensions."""
log_prefix = "[AppState Update]"
if width > 0 and height > 0:
self.sar_display_width = width
self.sar_display_height = height
logging.info(f"{log_prefix} SAR display size updated to {width}x{height}")
else:
logging.warning(
f"{log_prefix} Attempted to set invalid SAR display size: {width}x{height}"
)
def set_sar_data(self, normalized_image: np.ndarray, geo_info: Dict[str, Any]):
"""Updates the current normalized SAR image and geo info."""
log_prefix = "[AppState Update]"
if normalized_image is not None and isinstance(normalized_image, np.ndarray):
self.current_sar_normalized = (
normalized_image # Assume copy was made before calling
)
logging.debug(
f"{log_prefix} Updated current_sar_normalized (Shape: {normalized_image.shape})"
)
else:
logging.warning(
f"{log_prefix} Invalid normalized SAR image provided for update."
)
if geo_info is not None and isinstance(geo_info, dict):
self.current_sar_geo_info = geo_info # Assume copy was made before calling
logging.debug(
f"{log_prefix} Updated current_sar_geo_info (Valid: {geo_info.get('valid', 'N/A')})"
)
else:
logging.warning(f"{log_prefix} Invalid geo_info provided for update.")
# Ensure geo_info is always a dict with 'valid' key
self.current_sar_geo_info = self._initialize_geo_info()
self.current_sar_geo_info["valid"] = False
def set_mfd_indices(self, indices_image: np.ndarray):
"""Updates the current MFD indices image."""
log_prefix = "[AppState Update]"
if indices_image is not None and isinstance(indices_image, np.ndarray):
self.current_mfd_indices = (
indices_image # Assume copy was made before calling
)
logging.debug(
f"{log_prefix} Updated current_mfd_indices (Shape: {indices_image.shape})"
)
else:
logging.warning(
f"{log_prefix} Invalid indices image provided for MFD update."
)
# --- END OF FILE app_state.py ---