346 lines
14 KiB
Python
346 lines
14 KiB
Python
# --- START OF FILE app_state.py ---
|
|
|
|
# app_state.py
|
|
"""
|
|
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
|
|
Centralized state management for the Control Panel application.
|
|
|
|
Holds shared application state, including SAR/MFD parameters, georeferencing info,
|
|
current image data, LUTs, test mode status, and statistics. Provides methods
|
|
for controlled updates where necessary (e.g., thread safety for stats).
|
|
"""
|
|
|
|
# Standard library imports
|
|
import logging
|
|
import threading
|
|
import time
|
|
import math
|
|
from typing import Optional, Dict, Any, Tuple, List
|
|
|
|
# Third-party imports
|
|
import numpy as np
|
|
|
|
# Local application imports
|
|
import config # For default values
|
|
|
|
|
|
class AppState:
    """Hold and manage the application's shared state.

    The instance stores SAR display parameters, the latest SAR/MFD image
    arrays, the SAR geo-referencing dictionary, look-up tables, test-mode
    data and drop/incomplete statistics. The statistic counters are private
    and are only read or written while holding ``_stats_lock``; every other
    attribute is a plain public attribute.
    """

    # Queue names accepted by increment_dropped_count(), mapped to the private
    # counter attribute each one updates.
    _DROPPED_COUNTER_ATTRS = {
        "sar": "_dropped_sar_queue",
        "mfd": "_dropped_mfd_queue",
        "tkinter": "_dropped_tkinter_queue",
        "mouse": "_dropped_mouse_queue",
    }

    # Image types accepted by increment_incomplete_rx_count(), mapped to the
    # private counter attribute each one updates.
    _INCOMPLETE_COUNTER_ATTRS = {
        "sar": "_incomplete_sar_rx",
        "mfd": "_incomplete_mfd_rx",
    }

    def __init__(self):
        """Initialize every state attribute from the defaults in ``config``."""
        log_prefix = "[AppState Init]"
        logging.debug(f"{log_prefix} Initializing application state...")

        # --- General Application State ---
        self.shutting_down: bool = False

        # --- SAR Display Parameters ---
        self.sar_contrast: float = config.DEFAULT_SAR_CONTRAST
        self.sar_brightness: int = config.DEFAULT_SAR_BRIGHTNESS
        self.sar_palette: str = config.DEFAULT_SAR_PALETTE
        # Initial values; the App might override the width if the map is enabled.
        self.sar_display_width: int = config.INITIAL_SAR_WIDTH
        self.sar_display_height: int = config.INITIAL_SAR_HEIGHT

        # --- SAR Data & Geo-referencing ---
        # Raw SAR data (e.g., uint16) is deliberately not stored here yet;
        # the App holds it.

        # Normalized SAR data (uint8): the latest received/processed image.
        # Starts as an all-zero image of the configured SAR size.
        self.current_sar_normalized: Optional[np.ndarray] = np.zeros(
            (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=np.uint8
        )
        # SAR GeoInfo dictionary (contents defined in App/Receiver).
        self.current_sar_geo_info: Dict[str, Any] = self._initialize_geo_info()

        # SAR look-up table; starts as the identity LUT (0..255).
        self.brightness_contrast_lut: Optional[np.ndarray] = np.arange(
            256, dtype=np.uint8
        )

        # --- MFD Display Parameters & Data ---
        self.mfd_params: Dict[str, Any] = self._initialize_mfd_params()
        # Initial empty (all-zero) 256x3 LUT.
        self.mfd_lut: np.ndarray = np.zeros((256, 3), dtype=np.uint8)

        # MFD index data: the latest received MFD index map.
        self.current_mfd_indices: Optional[np.ndarray] = np.zeros(
            (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
        )

        # --- Test Mode State ---
        # Flag indicating if test mode *should be* active (set by UI handler/App).
        self.test_mode_active: bool = config.ENABLE_TEST_MODE
        # The MFD/SAR x offsets used in test mode are managed by
        # TestModeManager and are therefore not stored here.
        # Test image data (loaded once, potentially by TestModeManager or App init).
        self.test_mfd_image_indices: Optional[np.ndarray] = None
        self.test_sar_image_raw: Optional[np.ndarray] = None
        # Image data for local image mode.
        self.local_mfd_image_data_indices: Optional[np.ndarray] = None
        self.local_sar_image_data_raw: Optional[np.ndarray] = None

        # --- Network State ---
        # Placeholders considered for later use (not stored yet):
        #   network_listening: bool, udp_socket_status: str

        # --- Performance / Statistics ---
        # Lock guarding the private drop/incomplete counters below.
        self._stats_lock = threading.Lock()
        # FPS counters (updated by relevant processing loops/managers).
        self.mfd_fps: float = 0.0
        self.sar_fps: float = 0.0
        self.mfd_frame_count: int = 0
        self.sar_frame_count: int = 0
        self.mfd_start_time: float = time.time()
        self.sar_update_time: float = time.time()
        # Drop/incomplete counts; use the increment_*/get_statistics methods.
        self._dropped_sar_queue: int = 0
        self._dropped_mfd_queue: int = 0
        self._dropped_tkinter_queue: int = 0
        self._dropped_mouse_queue: int = 0
        self._incomplete_sar_rx: int = 0
        self._incomplete_mfd_rx: int = 0

        # --- UI Related State ---
        # Used for mouse event rate limiting.
        self.last_mouse_update_time: float = 0.0

        logging.debug(f"{log_prefix} Application state initialized.")

    def _initialize_mfd_params(self) -> Dict[str, Any]:
        """Build the dictionary holding MFD visualization parameters.

        Returns:
            A dict with three keys:

            * ``"categories"``: category name -> ``{"color", "intensity",
              "pixels"}``, where ``"pixels"`` lists the index values that
              belong to the category.
            * ``"raw_map_intensity"``: taken from
              ``config.DEFAULT_MFD_RAW_MAP_INTENSITY``.
            * ``"pixel_to_category"``: reverse lookup, pixel index ->
              category name, derived from ``"categories"``.
        """
        log_prefix = "[AppState MFD Params]"
        logging.debug(f"{log_prefix} Initializing MFD parameter dictionary.")

        default_intensity = config.DEFAULT_MFD_INTENSITY
        black = (0, 0, 0)
        white = (255, 255, 255)

        # (name, color, pixel indices) in the order the categories must be
        # inserted; the order matters for the reverse lookup below.
        category_specs = (
            ("Occlusion", black, [0, 1]),
            ("Cat A", white, [2]),
            ("Cat B", white, [3, 18]),
            ("Cat C", white, [4, 5, 6, 16]),
            ("Cat C1", white, [7, 8, 9]),
            ("Cat C2", white, [10, 11, 12]),
            ("Cat C3", white, [13, 14, 15]),
            ("Reserved", black, list(range(17, 32))),
        )

        params = {
            "categories": {
                name: {
                    "color": color,
                    "intensity": default_intensity,
                    "pixels": pixels,
                }
                for name, color, pixels in category_specs
            },
            "raw_map_intensity": config.DEFAULT_MFD_RAW_MAP_INTENSITY,
        }

        # Reverse lookup. Categories are visited in insertion order and a
        # later category overwrites an earlier one for the same index.
        # NOTE(review): index 18 is listed under both "Cat B" and "Reserved",
        # so it resolves to "Reserved" here - confirm this is intended.
        params["pixel_to_category"] = {
            pixel_index: category_name
            for category_name, data in params["categories"].items()
            for pixel_index in data["pixels"]
        }

        logging.debug(f"{log_prefix} MFD parameters initialized.")
        return params

    def _initialize_geo_info(self) -> Dict[str, Any]:
        """Build the default SAR geo-referencing dictionary from ``config``.

        Returns:
            A dict with the keys ``lat``/``lon`` (config center converted
            from degrees to radians), ``ref_x``/``ref_y`` (image center in
            pixels), ``scale_x``/``scale_y`` (image size in metres divided by
            the pixel count, or 0.0 when the pixel count is not positive),
            ``width_px``/``height_px``, ``orientation`` (0.0) and ``valid``
            (always ``False`` for these defaults).
        """
        log_prefix = "[AppState Geo Init]"
        logging.debug(f"{log_prefix} Initializing default SAR GeoInfo.")

        sar_width_px = config.SAR_WIDTH
        sar_height_px = config.SAR_HEIGHT
        sar_center_lat_deg = config.SAR_CENTER_LAT
        sar_center_lon_deg = config.SAR_CENTER_LON
        sar_size_km = config.SAR_IMAGE_SIZE_KM

        # Guard against a zero/negative dimension to avoid dividing by zero.
        scale_x_val = (sar_size_km * 1000.0) / sar_width_px if sar_width_px > 0 else 0.0
        scale_y_val = (
            (sar_size_km * 1000.0) / sar_height_px if sar_height_px > 0 else 0.0
        )

        geo_info = {
            "lat": math.radians(sar_center_lat_deg),  # RADIANS
            "lon": math.radians(sar_center_lon_deg),  # RADIANS
            "ref_x": sar_width_px // 2 if sar_width_px > 0 else 0,
            "ref_y": sar_height_px // 2 if sar_height_px > 0 else 0,
            "scale_x": scale_x_val,
            "scale_y": scale_y_val,
            "width_px": sar_width_px,
            "height_px": sar_height_px,
            "orientation": 0.0,  # RADIANS
            "valid": False,  # Default to False, set True only when valid data arrives
        }
        logging.debug(f"{log_prefix} Default SAR GeoInfo initialized.")
        return geo_info

    # --- Methods for Safe Stat Updates/Reads ---

    def _increment_counter(self, counter_attrs: Dict[str, str], key: Any) -> bool:
        """Add one to the counter selected by ``key``; return False if unknown.

        The lock is held only for the read-modify-write of the counter, so
        callers can log about unknown keys without holding ``_stats_lock``.
        """
        # Non-string keys can never match; checking first also keeps
        # unhashable arguments from raising inside dict.get().
        attr_name = counter_attrs.get(key) if isinstance(key, str) else None
        if attr_name is None:
            return False
        with self._stats_lock:
            setattr(self, attr_name, getattr(self, attr_name) + 1)
        return True

    def increment_dropped_count(self, queue_name: str) -> None:
        """Increment the drop counter of a queue while holding the stats lock.

        Args:
            queue_name: One of ``"sar"``, ``"mfd"``, ``"tkinter"`` or
                ``"mouse"``. Any other value only logs a warning.
        """
        if not self._increment_counter(self._DROPPED_COUNTER_ATTRS, queue_name):
            logging.warning(
                f"[AppState] Attempted to increment drop count for unknown queue: {queue_name}"
            )

    def increment_incomplete_rx_count(self, image_type: str) -> None:
        """Increment the incomplete-reception counter for SAR or MFD.

        Args:
            image_type: ``"sar"`` or ``"mfd"``. Any other value only logs a
                warning.
        """
        if not self._increment_counter(self._INCOMPLETE_COUNTER_ATTRS, image_type):
            logging.warning(
                f"[AppState] Attempted to increment incomplete count for unknown type: {image_type}"
            )

    def get_statistics(self) -> Dict[str, int]:
        """Return a snapshot of the drop/incomplete counters.

        The dictionary is built while holding the stats lock, so all values
        belong to the same instant.

        Returns:
            A new dict with the keys ``dropped_sar_q``, ``dropped_mfd_q``,
            ``dropped_tk_q``, ``dropped_mouse_q``, ``incomplete_sar_rx`` and
            ``incomplete_mfd_rx``.
        """
        with self._stats_lock:
            stats = {
                "dropped_sar_q": self._dropped_sar_queue,
                "dropped_mfd_q": self._dropped_mfd_queue,
                "dropped_tk_q": self._dropped_tkinter_queue,
                "dropped_mouse_q": self._dropped_mouse_queue,
                "incomplete_sar_rx": self._incomplete_sar_rx,
                "incomplete_mfd_rx": self._incomplete_mfd_rx,
            }
        return stats

    def reset_statistics(self) -> None:
        """Reset all statistic counters and the FPS bookkeeping.

        Drop/incomplete counters, FPS values and frame counts are set to
        zero, and both FPS reference timestamps are set to the current time,
        all while holding the stats lock.
        """
        log_prefix = "[AppState Stats]"
        logging.info(f"{log_prefix} Resetting statistic counters.")
        with self._stats_lock:
            self._dropped_sar_queue = 0
            self._dropped_mfd_queue = 0
            self._dropped_tkinter_queue = 0
            self._dropped_mouse_queue = 0
            self._incomplete_sar_rx = 0
            self._incomplete_mfd_rx = 0
            # FPS related values are reset here as well so a reset starts a
            # fresh measurement window.
            self.mfd_fps = 0.0
            self.sar_fps = 0.0
            self.mfd_frame_count = 0
            self.sar_frame_count = 0
            self.mfd_start_time = time.time()
            self.sar_update_time = time.time()
        logging.debug(f"{log_prefix} Statistic counters reset.")

    # --- Methods for state updates (Examples) ---
    # Add more methods as needed to update state in a controlled way,
    # especially if complex logic or thread-safety beyond simple assignment is needed.

    def update_sar_parameters(
        self,
        contrast: Optional[float] = None,
        brightness: Optional[int] = None,
        palette: Optional[str] = None,
    ) -> None:
        """Update the SAR display parameters that are provided.

        Args:
            contrast: New contrast value, or ``None`` to leave it unchanged.
            brightness: New brightness value, or ``None`` to leave it unchanged.
            palette: New palette name, or ``None`` to leave it unchanged.
        """
        log_prefix = "[AppState Update]"
        if contrast is not None:
            self.sar_contrast = contrast
            logging.debug(f"{log_prefix} SAR contrast updated to {contrast}")
        if brightness is not None:
            self.sar_brightness = brightness
            logging.debug(f"{log_prefix} SAR brightness updated to {brightness}")
        if palette is not None:
            self.sar_palette = palette
            logging.debug(f"{log_prefix} SAR palette updated to {palette}")

    def update_sar_display_size(self, width: int, height: int) -> None:
        """Update the SAR display dimensions.

        The stored size is only changed when both values are positive;
        otherwise a warning is logged and the previous size is kept.

        Args:
            width: New display width.
            height: New display height.
        """
        log_prefix = "[AppState Update]"
        if width > 0 and height > 0:
            self.sar_display_width = width
            self.sar_display_height = height
            logging.info(f"{log_prefix} SAR display size updated to {width}x{height}")
        else:
            logging.warning(
                f"{log_prefix} Attempted to set invalid SAR display size: {width}x{height}"
            )

    def set_sar_data(self, normalized_image: np.ndarray, geo_info: Dict[str, Any]) -> None:
        """Update the current normalized SAR image and its geo info.

        The two arguments are validated independently. Neither object is
        copied; the caller is expected to pass objects it no longer mutates.

        Args:
            normalized_image: New image. If it is not a ``numpy.ndarray`` a
                warning is logged and the stored image is left unchanged.
            geo_info: New geo-referencing dict. If it is not a ``dict`` a
                warning is logged and the stored geo info is replaced by the
                defaults with ``valid`` set to ``False``.
        """
        log_prefix = "[AppState Update]"
        # isinstance() is False for None, so it covers the None case too.
        if isinstance(normalized_image, np.ndarray):
            self.current_sar_normalized = normalized_image
            logging.debug(
                f"{log_prefix} Updated current_sar_normalized (Shape: {normalized_image.shape})"
            )
        else:
            logging.warning(
                f"{log_prefix} Invalid normalized SAR image provided for update."
            )

        if isinstance(geo_info, dict):
            self.current_sar_geo_info = geo_info
            logging.debug(
                f"{log_prefix} Updated current_sar_geo_info (Valid: {geo_info.get('valid', 'N/A')})"
            )
        else:
            logging.warning(f"{log_prefix} Invalid geo_info provided for update.")
            # Ensure geo_info is always a dict with a 'valid' key.
            self.current_sar_geo_info = self._initialize_geo_info()
            self.current_sar_geo_info["valid"] = False

    def set_mfd_indices(self, indices_image: np.ndarray) -> None:
        """Update the current MFD indices image.

        The array is stored without copying. If the argument is not a
        ``numpy.ndarray`` a warning is logged and the stored image is left
        unchanged.

        Args:
            indices_image: New MFD index map.
        """
        log_prefix = "[AppState Update]"
        # isinstance() is False for None, so it covers the None case too.
        if isinstance(indices_image, np.ndarray):
            self.current_mfd_indices = indices_image
            logging.debug(
                f"{log_prefix} Updated current_mfd_indices (Shape: {indices_image.shape})"
            )
        else:
            logging.warning(
                f"{log_prefix} Invalid indices image provided for MFD update."
            )
|
|
|
|
|
|
# --- END OF FILE app_state.py ---
|