SXXXXXXX_ControlPanel/controlpanel/core/test_mode_manager.py
2025-05-06 11:18:50 +02:00

543 lines
24 KiB
Python

# test_mode_manager.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
Manages the Test Mode functionality for the application.
Handles test image generation, scheduled updates for SAR and MFD test displays,
applies relevant parameters from AppState (LUTs, palette, size), performs
scrolling, and queues the resulting images for the DisplayManager.
"""
# Standard library imports
import logging
import time
import queue # For type hinting
from typing import Optional
# Third-party imports
import numpy as np
import cv2
import tkinter as tk # For root.after
# Local application imports
from controlpanel import config
from controlpanel.app_state import AppState
from controlpanel.utils.utils import put_queue
from controlpanel.utils.image_processing import (
normalize_image,
apply_color_palette,
resize_image,
)
class TestModeManager:
    """Manages application behavior when running in Test Mode.

    The manager keeps two independent, self-rescheduling Tkinter ``after``
    loops (one for the MFD test display, one for the SAR test display).
    Each loop iteration builds one frame from the test data stored in
    ``AppState`` and hands it to the matching display queue via ``put_queue``.

    A loop ends when test mode is switched off, when the application is
    shutting down, when the root window no longer exists, or when a frame
    update reports a critical error.
    """

    def __init__(
        self,
        app_state: "AppState",
        root: "tk.Tk",
        sar_queue: queue.Queue,
        mfd_queue: queue.Queue,
        app,  # Pass the app instance for put_queue context
    ):
        """
        Initializes the TestModeManager.

        Args:
            app_state (AppState): Reference to the shared application state.
            root (tk.Tk): The main Tkinter window instance (for scheduling).
            sar_queue (queue.Queue): Queue for sending processed SAR images for display.
            mfd_queue (queue.Queue): Queue for sending processed MFD images for display.
            app (App): The main application instance (for put_queue context).
        """
        self._log_prefix = "[TestModeManager]"
        logging.debug(f"{self._log_prefix} Initializing...")

        self._app_state: AppState = app_state
        self._root: tk.Tk = root
        self._sar_queue: queue.Queue = sar_queue
        self._mfd_queue: queue.Queue = mfd_queue
        self._app = app  # Store app reference for put_queue

        # Internal state for test mode operation
        self._mfd_test_timer_id: Optional[str] = None
        self._sar_test_timer_id: Optional[str] = None
        self._test_mfd_offset: int = 0
        self._test_sar_offset: int = 0

        logging.debug(f"{self._log_prefix} Initialization complete.")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def activate(self) -> bool:
        """Activates the test mode: ensures data exists and starts update timers.

        Returns:
            bool: False when the test images are still missing from AppState
            after the generation attempt, True once both update loops have
            been started.
        """
        log_prefix = f"{self._log_prefix} Activate"
        logging.info(f"{log_prefix} Activating Test Mode...")

        # Ensure test data exists in AppState (generate if needed)
        self._ensure_test_images()
        if (
            self._app_state.test_mfd_image_indices is None
            or self._app_state.test_sar_image_raw is None
        ):
            logging.error(
                f"{log_prefix} Test image data missing in AppState! Cannot activate."
            )
            # Reverting the state flag is left to the caller.
            return False  # Indicate activation failure

        # Stop any existing timers and restart scrolling from the origin.
        # (Queue clearing is expected to be handled by the App.)
        self.stop_timers()
        self._test_mfd_offset = 0
        self._test_sar_offset = 0
        logging.debug(f"{log_prefix} Timers stopped and offsets reset.")

        # Each scheduler renders one frame immediately and then re-arms itself.
        self._schedule_mfd_test_update()
        self._schedule_sar_test_update()
        logging.info(f"{log_prefix} Test Mode update loops scheduled.")
        return True  # Indicate success

    def deactivate(self) -> None:
        """Deactivates the test mode by stopping update timers."""
        log_prefix = f"{self._log_prefix} Deactivate"
        logging.info(f"{log_prefix} Deactivating Test Mode (stopping timers)...")
        self.stop_timers()
        logging.info(f"{log_prefix} Test Mode timers stopped.")

    def stop_timers(self) -> None:
        """Cancels any active test mode update timers and clears their IDs."""
        log_prefix = f"{self._log_prefix} Stop Timers"

        mfd_timer_id = self._mfd_test_timer_id
        try:
            self._cancel_timer(mfd_timer_id, "MFD", log_prefix)
        finally:
            self._mfd_test_timer_id = None  # Always clear the ID

        sar_timer_id = self._sar_test_timer_id
        try:
            self._cancel_timer(sar_timer_id, "SAR", log_prefix)
        finally:
            self._sar_test_timer_id = None  # Always clear the ID

    # ------------------------------------------------------------------
    # Test data generation
    # ------------------------------------------------------------------
    def _ensure_test_images(self) -> None:
        """
        Checks if test images exist in AppState and generates the missing ones.

        An image that is already present is left untouched. If generation
        raises, every image that is still missing is replaced by a
        zero-filled array so that later code does not find ``None``.
        """
        log_prefix = f"{self._log_prefix} Ensure Test Images"
        state = self._app_state

        if (
            state.test_mfd_image_indices is not None
            and state.test_sar_image_raw is not None
        ):
            logging.debug(f"{log_prefix} Test images already present in AppState.")
            return

        logging.info(f"{log_prefix} Generating test images into AppState...")
        try:
            if state.test_mfd_image_indices is None:
                mfd_shape = (config.MFD_HEIGHT, config.MFD_WIDTH)
                # 8-bit random indices; `high` is exclusive in randint.
                state.test_mfd_image_indices = np.random.randint(
                    low=0, high=256, size=mfd_shape, dtype=np.uint8
                )
                logging.debug(
                    f"{log_prefix} Generated random MFD indices (shape {mfd_shape})."
                )

            if state.test_sar_image_raw is None:
                sar_raw = self._generate_sar_test_data(log_prefix)
                state.test_sar_image_raw = sar_raw
                # Report what was actually produced (may be the uint16 fallback).
                logging.debug(
                    f"{log_prefix} Generated random SAR raw data (shape {sar_raw.shape}, dtype {sar_raw.dtype})."
                )

            logging.info(
                f"{log_prefix} Test images generated successfully into AppState."
            )
        except Exception:
            logging.exception(f"{log_prefix} Error generating test images:")
            # Set fallback state to avoid None values crashing later code
            if state.test_mfd_image_indices is None:
                state.test_mfd_image_indices = np.zeros(
                    (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
                )
            if state.test_sar_image_raw is None:
                sar_shape = (config.SAR_HEIGHT, config.SAR_WIDTH)
                try:
                    fallback = np.zeros(sar_shape, dtype=config.SAR_DATA_TYPE)
                except TypeError:
                    # The configured dtype itself may be what broke generation.
                    fallback = np.zeros(sar_shape, dtype=np.uint16)
                state.test_sar_image_raw = fallback
            logging.error(
                f"{log_prefix} Fallback test images (zeros) set in AppState due to generation error."
            )

    def _generate_sar_test_data(self, log_prefix: str) -> np.ndarray:
        """Returns random SAR raw data matching ``config.SAR_DATA_TYPE``.

        Integer types get values across the full range of the type, floating
        types get values in [0, 1), and any other type falls back to uint16.
        """
        sar_shape = (config.SAR_HEIGHT, config.SAR_WIDTH)
        sar_dtype = config.SAR_DATA_TYPE

        if np.issubdtype(sar_dtype, np.integer):
            dtype_info = np.iinfo(sar_dtype)
            return np.random.randint(
                low=dtype_info.min,
                high=dtype_info.max + 1,  # numpy randint is exclusive of high
                size=sar_shape,
                dtype=sar_dtype,
            )

        if np.issubdtype(sar_dtype, np.floating):
            logging.warning(
                f"{log_prefix} SAR_DATA_TYPE {config.SAR_DATA_TYPE} is not integer. Generating float test data [0,1)."
            )
            return np.random.rand(*sar_shape).astype(sar_dtype)

        # Neither integer nor float: use an explicit uint16 fallback.
        logging.warning(
            f"{log_prefix} Unexpected SAR_DATA_TYPE {config.SAR_DATA_TYPE}. Falling back to uint16 generation."
        )
        dtype_info = np.iinfo(np.uint16)
        return np.random.randint(
            low=dtype_info.min,
            high=dtype_info.max + 1,  # randint is exclusive of high
            size=sar_shape,
            dtype=np.uint16,
        )

    # ------------------------------------------------------------------
    # Scheduling
    # ------------------------------------------------------------------
    def _schedule_mfd_test_update(self) -> None:
        """Renders one MFD test frame and schedules the next one if still active."""
        log_prefix = f"{self._log_prefix} MFD Scheduler"

        # Check application state flags before proceeding
        if not self._app_state.test_mode_active or self._app_state.shutting_down:
            logging.debug(
                f"{log_prefix} Test mode inactive or shutting down. Stopping MFD updates."
            )
            self._mfd_test_timer_id = None  # Ensure timer ID is cleared
            return

        # A False result means the update already stopped the timers because
        # of a critical error; re-arming here would silently undo that.
        if not self._update_mfd_test_display():
            logging.warning(
                f"{log_prefix} MFD update reported a critical error. Not rescheduling."
            )
            self._mfd_test_timer_id = None
            return

        # Delay derived from the configured MFD FPS (default ~25fps).
        delay_ms = max(1, int(1000 / config.MFD_FPS)) if config.MFD_FPS > 0 else 40
        self._mfd_test_timer_id = self._schedule_next(
            delay_ms, self._schedule_mfd_test_update, "MFD", log_prefix
        )

    def _schedule_sar_test_update(self) -> None:
        """Renders one SAR test frame and schedules the next one if still active."""
        log_prefix = f"{self._log_prefix} SAR Scheduler"

        # Check application state flags before proceeding
        if not self._app_state.test_mode_active or self._app_state.shutting_down:
            logging.debug(
                f"{log_prefix} Test mode inactive or shutting down. Stopping SAR updates."
            )
            self._sar_test_timer_id = None  # Ensure timer ID is cleared
            return

        # See _schedule_mfd_test_update: never re-arm after a critical error.
        if not self._update_sar_test_display():
            logging.warning(
                f"{log_prefix} SAR update reported a critical error. Not rescheduling."
            )
            self._sar_test_timer_id = None
            return

        # Use a fixed delay for SAR test updates (1 second)
        delay_ms = 1000
        self._sar_test_timer_id = self._schedule_next(
            delay_ms, self._schedule_sar_test_update, "SAR", log_prefix
        )

    def _schedule_next(
        self, delay_ms: int, callback, label: str, log_prefix: str
    ) -> Optional[str]:
        """Arms ``root.after`` for ``callback``; returns the timer ID or None."""
        try:
            # Ensure root window still exists
            if self._root and self._root.winfo_exists():
                timer_id = self._root.after(delay_ms, callback)
                logging.debug(
                    f"{log_prefix} Scheduled next update in {delay_ms} ms (ID: {timer_id})."
                )
                return timer_id
            logging.warning(
                f"{log_prefix} Root window destroyed. Stopping {label} test updates."
            )
        except Exception as e:
            # Best effort: a failure to schedule just ends this loop.
            logging.warning(f"{log_prefix} Error scheduling next {label} update: {e}")
        return None

    def _cancel_timer(
        self, timer_id: Optional[str], label: str, log_prefix: str
    ) -> None:
        """Cancels one pending ``after`` callback, ignoring cancellation errors."""
        if not timer_id:
            return
        try:
            # Check if root window exists before cancelling
            if self._root and self._root.winfo_exists():
                self._root.after_cancel(timer_id)
                logging.debug(
                    f"{log_prefix} {label} test timer (ID: {timer_id}) cancelled."
                )
        except Exception as e:
            # Log warning if cancellation fails (e.g., ID invalid, window closed)
            logging.warning(
                f"{log_prefix} Ignoring error cancelling {label} timer (ID: {timer_id}): {e}"
            )

    # ------------------------------------------------------------------
    # Frame processing
    # ------------------------------------------------------------------
    def _update_mfd_test_display(self) -> bool:
        """
        Processes and queues one MFD test frame using data and parameters from AppState.

        Reads test_mfd_image_indices, scrolls it, applies mfd_lut and queues
        the result.

        Returns:
            bool: False if a critical error occurred and the timers were
            stopped; True otherwise (including frames that were skipped).
        """
        log_prefix = f"{self._log_prefix} MFD Update"

        # Check state flags before intensive processing
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Shutdown detected. Skipping update.")
            return True
        if not self._app_state.test_mode_active:
            # Should not happen if called from scheduler, but defensive check
            logging.debug(f"{log_prefix} Test mode not active. Skipping update.")
            return True

        # --- Get required data and parameters from AppState ---
        try:
            test_indices = self._app_state.test_mfd_image_indices
            mfd_lut = self._app_state.mfd_lut
        except AttributeError as ae:
            logging.error(
                f"{log_prefix} Missing required attribute in AppState: {ae}. Cannot process."
            )
            self.stop_timers()  # Stop updates if state is broken
            return False

        # Validate data existence (skip this frame, keep the loop alive)
        if test_indices is None:
            logging.warning(
                f"{log_prefix} Test MFD indices data missing in AppState. Cannot process."
            )
            return True
        if mfd_lut is None:
            logging.warning(
                f"{log_prefix} MFD LUT missing in AppState. Cannot process."
            )
            return True

        logging.debug(f"{log_prefix} Processing MFD test frame...")
        # Bound up front so the IndexError handler can inspect it even when
        # the failure happens before the roll completes.
        scrolled_indices = None
        try:
            # --- Scrolling --- (2 pixels per frame, wrapping at the MFD width)
            self._test_mfd_offset = (self._test_mfd_offset + 2) % config.MFD_WIDTH
            scrolled_indices = np.roll(test_indices, -self._test_mfd_offset, axis=1)
            logging.debug(
                f"{log_prefix} Applied scroll (offset: {self._test_mfd_offset})."
            )

            # --- Check shutdown again before LUT application ---
            if self._app_state.shutting_down:
                logging.debug(
                    f"{log_prefix} Shutdown detected after scroll. Skipping LUT."
                )
                return True

            # --- Apply MFD LUT from AppState ---
            logging.debug(
                f"{log_prefix} Applying MFD LUT (shape {mfd_lut.shape}) to indices (shape {scrolled_indices.shape})..."
            )
            # Perform LUT lookup using NumPy indexing
            mfd_bgr_image = mfd_lut[scrolled_indices]
            logging.debug(
                f"{log_prefix} MFD LUT applied. Result shape: {mfd_bgr_image.shape}."
            )

            # --- Check shutdown again before queueing ---
            if self._app_state.shutting_down:
                logging.debug(
                    f"{log_prefix} Shutdown detected after LUT. Skipping queue put."
                )
                return True

            # --- Queue the processed image ---
            # Use put_queue utility, passing the app instance for context
            put_queue(self._mfd_queue, mfd_bgr_image.copy(), "mfd", self._app)
            logging.debug(f"{log_prefix} Queued processed MFD test image.")
            # FPS stats are not updated here.
            return True
        except IndexError as idx_err:
            # Handle potential index errors if LUT/indices dimensions mismatch
            min_idx, max_idx = "-", "-"
            if scrolled_indices is not None:
                try:
                    min_idx = np.min(scrolled_indices)
                    max_idx = np.max(scrolled_indices)
                except ValueError:
                    pass  # Empty array: keep the placeholder values
            lut_shape_str = str(mfd_lut.shape) if mfd_lut is not None else "None"
            logging.error(
                f"{log_prefix} MFD LUT IndexError: {idx_err}. "
                f"Indices range maybe ({min_idx},{max_idx}). LUT shape {lut_shape_str}"
            )
        except Exception:
            logging.exception(f"{log_prefix} Error during MFD test display update:")

        # Only the two error handlers fall through to here.
        self.stop_timers()  # Stop test mode on critical error
        return False

    def _update_sar_test_display(self) -> bool:
        """
        Processes and queues one SAR test frame using data and parameters from AppState.

        Reads test_sar_image_raw, normalizes, applies B/C LUT, palette, resizes,
        scrolls, and queues the result.

        Returns:
            bool: False if a critical error occurred and the timers were
            stopped; True otherwise (including frames that were skipped).
        """
        log_prefix = f"{self._log_prefix} SAR Update"

        # Check state flags first
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Shutdown detected. Skipping update.")
            return True
        if not self._app_state.test_mode_active:
            logging.debug(f"{log_prefix} Test mode not active. Skipping update.")
            return True

        # --- Get required data and parameters from AppState ---
        try:
            test_raw_data = self._app_state.test_sar_image_raw
            bc_lut = self._app_state.brightness_contrast_lut
            palette = self._app_state.sar_palette
            display_width = self._app_state.sar_display_width
            display_height = self._app_state.sar_display_height
        except AttributeError as ae:
            logging.error(
                f"{log_prefix} Missing required attribute in AppState: {ae}. Cannot process."
            )
            self.stop_timers()
            return False

        # Validate data and parameters (skip this frame, keep the loop alive)
        if test_raw_data is None:
            logging.warning(
                f"{log_prefix} Test SAR raw data missing in AppState. Cannot process."
            )
            return True
        if bc_lut is None:
            logging.warning(
                f"{log_prefix} SAR Brightness/Contrast LUT missing in AppState. Cannot process."
            )
            return True
        if display_width <= 0 or display_height <= 0:
            logging.warning(
                f"{log_prefix} Invalid SAR display dimensions ({display_width}x{display_height}) in AppState. Cannot process."
            )
            return True

        logging.debug(f"{log_prefix} Processing SAR test frame...")
        try:
            # --- Processing Pipeline (using functions from image_processing) ---
            # 1. Normalize Raw Data to uint8
            logging.debug(
                f"{log_prefix} Normalizing raw test data (shape {test_raw_data.shape}, dtype {test_raw_data.dtype}) to uint8..."
            )
            img = normalize_image(test_raw_data, target_type=np.uint8)
            if img is None:
                logging.error(f"{log_prefix} Normalization failed.")
                return True
            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown after normalization.")
                return True
            logging.debug(
                f"{log_prefix} Normalization complete. Image shape: {img.shape}."
            )

            # 2. Apply Brightness/Contrast LUT
            logging.debug(f"{log_prefix} Applying B/C LUT (shape {bc_lut.shape})...")
            img = cv2.LUT(img, bc_lut)
            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown after B/C LUT.")
                return True
            logging.debug(f"{log_prefix} B/C LUT applied.")

            # 3. Apply Color Palette (if not GRAY)
            if palette != "GRAY":
                logging.debug(f"{log_prefix} Applying color palette: {palette}...")
                img = apply_color_palette(img, palette)
                if self._app_state.shutting_down:
                    logging.debug(f"{log_prefix} Shutdown after palette.")
                    return True
                logging.debug(
                    f"{log_prefix} Palette '{palette}' applied. Image shape: {img.shape}."
                )
            else:
                logging.debug(f"{log_prefix} Skipping color palette (GRAY selected).")

            # 4. Resize Image to display dimensions
            logging.debug(
                f"{log_prefix} Resizing image to {display_width}x{display_height}..."
            )
            img = resize_image(img, display_width, display_height)
            if img is None:
                logging.error(f"{log_prefix} Resize failed.")
                return True
            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown after resize.")
                return True
            logging.debug(f"{log_prefix} Resize complete. Image shape: {img.shape}.")

            # --- Scrolling --- (1 pixel per frame, wrapping at the display width)
            self._test_sar_offset = (self._test_sar_offset + 1) % display_width
            img = np.roll(img, -self._test_sar_offset, axis=1)
            logging.debug(
                f"{log_prefix} Applied scroll (offset: {self._test_sar_offset})."
            )

            # --- Check shutdown again before queueing ---
            if self._app_state.shutting_down:
                logging.debug(
                    f"{log_prefix} Shutdown detected after scroll. Skipping queue put."
                )
                return True

            # --- Queue the processed image ---
            # Use put_queue utility, passing the app instance for context
            put_queue(self._sar_queue, img.copy(), "sar", self._app)
            logging.debug(f"{log_prefix} Queued processed SAR test image.")
            # FPS stats are not updated here.
            return True
        except Exception:
            logging.exception(f"{log_prefix} Error during SAR test display update:")

        # Only the error handler falls through to here.
        self.stop_timers()  # Stop test mode on critical error
        return False