# test_mode_manager.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.

Manages the Test Mode functionality for the application.

Handles test image generation, scheduled updates for SAR and MFD test displays,
applies relevant parameters from AppState (LUTs, palette, size), performs scrolling,
and queues the resulting images for the DisplayManager.
"""

# Standard library imports
import logging
import time
import queue  # For type hinting
from typing import Callable, Optional

# Third-party imports
import numpy as np
import cv2
import tkinter as tk  # For root.after

# Local application imports
from controlpanel import config
from controlpanel.app_state import AppState
from controlpanel.utils.utils import put_queue
from controlpanel.utils.image_processing import (
    normalize_image,
    apply_color_palette,
    resize_image,
)


class TestModeManager:
    """Manages application behavior when running in Test Mode."""

    # Delay used for MFD updates when config.MFD_FPS is not positive (~25 fps).
    _DEFAULT_MFD_DELAY_MS = 40
    # Fixed delay between two SAR test frames.
    _SAR_DELAY_MS = 1000

    def __init__(
        self,
        app_state: AppState,
        root: tk.Tk,
        sar_queue: queue.Queue,
        mfd_queue: queue.Queue,
        app,  # Pass the app instance for put_queue context
    ):
        """
        Initializes the TestModeManager.

        Args:
            app_state (AppState): Reference to the shared application state.
            root (tk.Tk): The main Tkinter window instance (for scheduling).
            sar_queue (queue.Queue): Queue for sending processed SAR images for display.
            mfd_queue (queue.Queue): Queue for sending processed MFD images for display.
            app (App): The main application instance (for put_queue context).
        """
        self._log_prefix = "[TestModeManager]"
        logging.debug(f"{self._log_prefix} Initializing...")

        self._app_state: AppState = app_state
        self._root: tk.Tk = root
        self._sar_queue: queue.Queue = sar_queue
        self._mfd_queue: queue.Queue = mfd_queue
        self._app = app  # Store app reference for put_queue

        # Internal state for test mode operation
        self._mfd_test_timer_id: Optional[str] = None
        self._sar_test_timer_id: Optional[str] = None
        self._test_mfd_offset: int = 0
        self._test_sar_offset: int = 0
        # Set by _abort_updates() when a frame update hits a critical error.
        # The schedulers consult it so that the loop that is currently
        # executing does not re-arm itself after stop_timers() was called.
        self._updates_aborted: bool = False

        logging.debug(f"{self._log_prefix} Initialization complete.")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def activate(self) -> bool:
        """
        Activates the test mode: ensures data exists and starts update timers.

        Returns:
            bool: True if both update loops were started; False if the test
            images are missing or the very first frame hit a critical error.
        """
        log_prefix = f"{self._log_prefix} Activate"
        logging.info(f"{log_prefix} Activating Test Mode...")

        # Ensure test data exists in AppState (generate if needed)
        self._ensure_test_images()
        if (
            self._app_state.test_mfd_image_indices is None
            or self._app_state.test_sar_image_raw is None
        ):
            logging.error(
                f"{log_prefix} Test image data missing in AppState! Cannot activate."
            )
            # Reverting the state flag is left to the App handler.
            return False  # Indicate activation failure

        # Stop any existing timers (App should handle queue clearing),
        # clear a previous abort and reset the scroll offsets.
        self.stop_timers()
        self._updates_aborted = False
        self._test_mfd_offset = 0
        self._test_sar_offset = 0
        logging.debug(f"{log_prefix} Timers stopped and offsets reset.")

        # Each scheduler processes one frame immediately and then re-arms itself.
        self._schedule_mfd_test_update()
        self._schedule_sar_test_update()

        if self._updates_aborted:
            # The first frame already failed critically; no loop is running.
            logging.error(
                f"{log_prefix} Critical error on first test frame. Test Mode is not running."
            )
            return False

        logging.info(f"{log_prefix} Test Mode update loops scheduled.")
        return True  # Indicate success

    def deactivate(self) -> None:
        """Deactivates the test mode by stopping update timers."""
        log_prefix = f"{self._log_prefix} Deactivate"
        logging.info(f"{log_prefix} Deactivating Test Mode (stopping timers)...")
        self.stop_timers()
        logging.info(f"{log_prefix} Test Mode timers stopped.")

    def stop_timers(self) -> None:
        """Cancels any active test mode update timers."""
        self._cancel_timer("MFD", "_mfd_test_timer_id")
        self._cancel_timer("SAR", "_sar_test_timer_id")

    # ------------------------------------------------------------------
    # Test image generation
    # ------------------------------------------------------------------

    def _ensure_test_images(self) -> None:
        """
        Checks if test images exist in AppState and generates the missing ones.

        An image that is already present is left untouched. If generation
        raises, any image that is still missing is replaced by an all-zero
        array so later code does not have to deal with None.

        Moved from App.generate_test_images.
        """
        log_prefix = f"{self._log_prefix} Ensure Test Images"
        state = self._app_state

        need_mfd = state.test_mfd_image_indices is None
        need_sar = state.test_sar_image_raw is None
        if not (need_mfd or need_sar):
            logging.debug(f"{log_prefix} Test images already present in AppState.")
            return

        logging.info(f"{log_prefix} Generating test images into AppState...")
        try:
            if need_mfd:
                state.test_mfd_image_indices = self._generate_mfd_indices(log_prefix)
            if need_sar:
                state.test_sar_image_raw = self._generate_sar_raw(log_prefix)
            logging.info(
                f"{log_prefix} Test images generated successfully into AppState."
            )
        except Exception:
            logging.exception(f"{log_prefix} Error generating test images:")
            # Set fallback state to avoid None values crashing later code
            if state.test_mfd_image_indices is None:
                state.test_mfd_image_indices = np.zeros(
                    (config.MFD_HEIGHT, config.MFD_WIDTH), dtype=np.uint8
                )
            if state.test_sar_image_raw is None:
                state.test_sar_image_raw = np.zeros(
                    (config.SAR_HEIGHT, config.SAR_WIDTH), dtype=config.SAR_DATA_TYPE
                )
            logging.error(
                f"{log_prefix} Fallback test images (zeros) set in AppState due to generation error."
            )

    @staticmethod
    def _generate_mfd_indices(log_prefix: str) -> np.ndarray:
        """Returns random uint8 MFD indices of shape (MFD_HEIGHT, MFD_WIDTH)."""
        mfd_shape = (config.MFD_HEIGHT, config.MFD_WIDTH)
        indices = np.random.randint(low=0, high=256, size=mfd_shape, dtype=np.uint8)
        logging.debug(f"{log_prefix} Generated random MFD indices (shape {mfd_shape}).")
        return indices

    @staticmethod
    def _generate_sar_raw(log_prefix: str) -> np.ndarray:
        """
        Returns random SAR raw data of shape (SAR_HEIGHT, SAR_WIDTH).

        Integer SAR_DATA_TYPE: values span the full range of that type.
        Floating SAR_DATA_TYPE: values in [0, 1).
        Anything else: falls back to full-range uint16 data.
        """
        sar_shape = (config.SAR_HEIGHT, config.SAR_WIDTH)
        sar_dtype = config.SAR_DATA_TYPE

        if np.issubdtype(sar_dtype, np.integer):
            dtype_info = np.iinfo(sar_dtype)
            raw = np.random.randint(
                low=dtype_info.min,
                high=dtype_info.max + 1,  # numpy randint is exclusive of high
                size=sar_shape,
                dtype=sar_dtype,
            )
        elif np.issubdtype(sar_dtype, np.floating):
            logging.warning(
                f"{log_prefix} SAR_DATA_TYPE {config.SAR_DATA_TYPE} is not integer. Generating float test data [0,1)."
            )
            raw = np.random.rand(*sar_shape).astype(sar_dtype)
        else:
            # Fallback to uint16 if type is weird but not float
            logging.warning(
                f"{log_prefix} Unexpected SAR_DATA_TYPE {config.SAR_DATA_TYPE}. Falling back to uint16 generation."
            )
            dtype_info = np.iinfo(np.uint16)
            raw = np.random.randint(
                low=dtype_info.min,
                high=dtype_info.max + 1,  # randint is exclusive of high
                size=sar_shape,
                dtype=np.uint16,  # Explicit fallback type
            )

        # Report the dtype that was actually produced (differs from the
        # configured one on the uint16 fallback path).
        logging.debug(
            f"{log_prefix} Generated random SAR raw data (shape {sar_shape}, dtype {raw.dtype})."
        )
        return raw

    # ------------------------------------------------------------------
    # Scheduling
    # ------------------------------------------------------------------

    def _schedule_mfd_test_update(self) -> None:
        """Processes one MFD test frame and schedules the next one if still active."""
        log_prefix = f"{self._log_prefix} MFD Scheduler"

        if not self._may_run_updates(log_prefix, "MFD"):
            self._mfd_test_timer_id = None  # Ensure timer ID is cleared
            return

        # Call the update logic for one frame
        self._update_mfd_test_display()

        if self._updates_aborted:
            # The update called stop_timers(); re-arming here would restart
            # the loop and repeat the same failure on every frame.
            logging.warning(
                f"{log_prefix} Critical error during update. MFD updates will not be rescheduled."
            )
            self._mfd_test_timer_id = None
            return

        # Calculate delay based on configured MFD FPS
        fps = config.MFD_FPS
        delay_ms = max(1, int(1000 / fps)) if fps > 0 else self._DEFAULT_MFD_DELAY_MS
        self._mfd_test_timer_id = self._arm_timer(
            log_prefix, "MFD", delay_ms, self._schedule_mfd_test_update
        )

    def _schedule_sar_test_update(self) -> None:
        """Processes one SAR test frame and schedules the next one if still active."""
        log_prefix = f"{self._log_prefix} SAR Scheduler"

        if not self._may_run_updates(log_prefix, "SAR"):
            self._sar_test_timer_id = None  # Ensure timer ID is cleared
            return

        # Call the update logic for one frame
        self._update_sar_test_display()

        if self._updates_aborted:
            # See _schedule_mfd_test_update: do not restart a loop that was
            # deliberately stopped by the update itself.
            logging.warning(
                f"{log_prefix} Critical error during update. SAR updates will not be rescheduled."
            )
            self._sar_test_timer_id = None
            return

        self._sar_test_timer_id = self._arm_timer(
            log_prefix, "SAR", self._SAR_DELAY_MS, self._schedule_sar_test_update
        )

    def _may_run_updates(self, log_prefix: str, label: str) -> bool:
        """Returns True if a scheduler is allowed to process and re-arm a frame."""
        if self._updates_aborted:
            logging.debug(
                f"{log_prefix} Updates aborted after critical error. Stopping {label} updates."
            )
            return False
        if self._app_state.test_mode_active and not self._app_state.shutting_down:
            return True
        # Log if scheduling stops due to state change
        logging.debug(
            f"{log_prefix} Test mode inactive or shutting down. Stopping {label} updates."
        )
        return False

    def _arm_timer(
        self,
        log_prefix: str,
        label: str,
        delay_ms: int,
        callback: Callable[[], None],
    ) -> Optional[str]:
        """
        Schedules `callback` via Tkinter's after() and returns the timer ID.

        Returns None when the root window no longer exists or scheduling
        raised; both cases are logged as warnings and never propagate.
        """
        try:
            # Ensure root window still exists
            if self._root and self._root.winfo_exists():
                timer_id = self._root.after(delay_ms, callback)
                logging.debug(
                    f"{log_prefix} Scheduled next update in {delay_ms} ms (ID: {timer_id})."
                )
                return timer_id
            logging.warning(
                f"{log_prefix} Root window destroyed. Stopping {label} test updates."
            )
        except Exception as e:
            # Best effort: a failure to schedule must not crash the caller.
            logging.warning(f"{log_prefix} Error scheduling next {label} update: {e}")
        return None

    def _cancel_timer(self, label: str, attr_name: str) -> None:
        """
        Cancels the timer whose ID is stored in attribute `attr_name`.

        The stored ID is always cleared, even if cancellation fails.
        """
        log_prefix = f"{self._log_prefix} Stop Timers"
        timer_id = getattr(self, attr_name)
        if not timer_id:
            return
        try:
            # Check if root window exists before cancelling
            if self._root and self._root.winfo_exists():
                self._root.after_cancel(timer_id)
                logging.debug(
                    f"{log_prefix} {label} test timer (ID: {timer_id}) cancelled."
                )
        except Exception as e:
            # Log warning if cancellation fails (e.g., ID invalid, window closed)
            logging.warning(
                f"{log_prefix} Ignoring error cancelling {label} timer (ID: {timer_id}): {e}"
            )
        finally:
            setattr(self, attr_name, None)  # Always clear the ID

    def _abort_updates(self) -> None:
        """Stops both update loops after a critical error in a frame update."""
        # Set the flag first: stop_timers() can only cancel *pending* timers,
        # the scheduler that is executing right now needs the flag to know
        # that it must not re-arm itself.
        self._updates_aborted = True
        self.stop_timers()

    # ------------------------------------------------------------------
    # Frame processing
    # ------------------------------------------------------------------

    def _update_mfd_test_display(self) -> None:
        """
        Processes and queues one MFD test frame using data and parameters from AppState.

        Reads test_mfd_image_indices, scrolls it horizontally, applies mfd_lut
        and queues the result on the MFD queue. Missing data only skips the
        frame; an exception during processing aborts the update loops.
        """
        log_prefix = f"{self._log_prefix} MFD Update"

        # Check state flags before intensive processing
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Shutdown detected. Skipping update.")
            return
        if not self._app_state.test_mode_active:
            # Should not happen if called from scheduler, but defensive check
            logging.debug(f"{log_prefix} Test mode not active. Skipping update.")
            return

        # --- Get required data and parameters from AppState ---
        try:
            test_indices = self._app_state.test_mfd_image_indices
            mfd_lut = self._app_state.mfd_lut
        except AttributeError as ae:
            logging.error(
                f"{log_prefix} Missing required attribute in AppState: {ae}. Cannot process."
            )
            self._abort_updates()  # Stop updates if state is broken
            return

        # Validate data existence
        if test_indices is None:
            logging.warning(
                f"{log_prefix} Test MFD indices data missing in AppState. Cannot process."
            )
            return
        if mfd_lut is None:
            logging.warning(
                f"{log_prefix} MFD LUT missing in AppState. Cannot process."
            )
            return

        logging.debug(f"{log_prefix} Processing MFD test frame...")
        # Pre-bind so the IndexError handler below can tell whether the
        # failure happened before or after the scroll step.
        scrolled_indices = None
        try:
            # --- Scrolling ---
            self._test_mfd_offset = (
                self._test_mfd_offset + 2
            ) % config.MFD_WIDTH  # Scroll by 2 pixels
            scrolled_indices = np.roll(test_indices, -self._test_mfd_offset, axis=1)
            logging.debug(
                f"{log_prefix} Applied scroll (offset: {self._test_mfd_offset})."
            )

            # --- Check shutdown again before LUT application ---
            if self._app_state.shutting_down:
                logging.debug(
                    f"{log_prefix} Shutdown detected after scroll. Skipping LUT."
                )
                return

            # --- Apply MFD LUT from AppState ---
            logging.debug(
                f"{log_prefix} Applying MFD LUT (shape {mfd_lut.shape}) to indices (shape {scrolled_indices.shape})..."
            )
            # Perform LUT lookup using NumPy indexing
            mfd_bgr_image = mfd_lut[scrolled_indices]
            logging.debug(
                f"{log_prefix} MFD LUT applied. Result shape: {mfd_bgr_image.shape}."
            )

            # --- Check shutdown again before queueing ---
            if self._app_state.shutting_down:
                logging.debug(
                    f"{log_prefix} Shutdown detected after LUT. Skipping queue put."
                )
                return

            # --- Queue the processed image ---
            # Use put_queue utility, passing the app instance for context
            put_queue(self._mfd_queue, mfd_bgr_image.copy(), "mfd", self._app)
            logging.debug(f"{log_prefix} Queued processed MFD test image.")

        except IndexError as idx_err:
            # Handle potential index errors if LUT/indices dimensions mismatch
            min_idx, max_idx = "-", "-"
            if scrolled_indices is not None:
                try:
                    min_idx = np.min(scrolled_indices)
                    max_idx = np.max(scrolled_indices)
                except ValueError:
                    pass  # Handle empty array case
            lut_shape_str = str(getattr(mfd_lut, "shape", None))
            logging.error(
                f"{log_prefix} MFD LUT IndexError: {idx_err}. "
                f"Indices range maybe ({min_idx},{max_idx}). LUT shape {lut_shape_str}"
            )
            self._abort_updates()  # Stop test mode on critical error
        except Exception:
            logging.exception(f"{log_prefix} Error during MFD test display update:")
            self._abort_updates()  # Stop test mode on critical error

    def _update_sar_test_display(self) -> None:
        """
        Processes and queues one SAR test frame using data and parameters from AppState.

        Reads test_sar_image_raw, normalizes, applies B/C LUT, palette, resizes,
        scrolls, and queues the result. Missing or invalid parameters only skip
        the frame; an exception during processing aborts the update loops.
        """
        log_prefix = f"{self._log_prefix} SAR Update"

        # Check state flags first
        if self._app_state.shutting_down:
            logging.debug(f"{log_prefix} Shutdown detected. Skipping update.")
            return
        if not self._app_state.test_mode_active:
            logging.debug(f"{log_prefix} Test mode not active. Skipping update.")
            return

        # --- Get required data and parameters from AppState ---
        try:
            test_raw_data = self._app_state.test_sar_image_raw
            bc_lut = self._app_state.brightness_contrast_lut
            palette = self._app_state.sar_palette
            display_width = self._app_state.sar_display_width
            display_height = self._app_state.sar_display_height
        except AttributeError as ae:
            logging.error(
                f"{log_prefix} Missing required attribute in AppState: {ae}. Cannot process."
            )
            self._abort_updates()
            return

        # Validate data and parameters
        if test_raw_data is None:
            logging.warning(
                f"{log_prefix} Test SAR raw data missing in AppState. Cannot process."
            )
            return
        if bc_lut is None:
            logging.warning(
                f"{log_prefix} SAR Brightness/Contrast LUT missing in AppState. Cannot process."
            )
            return
        if display_width <= 0 or display_height <= 0:
            logging.warning(
                f"{log_prefix} Invalid SAR display dimensions ({display_width}x{display_height}) in AppState. Cannot process."
            )
            return

        logging.debug(f"{log_prefix} Processing SAR test frame...")
        try:
            # --- Processing Pipeline (using functions from image_processing) ---
            # 1. Normalize Raw Data to uint8
            logging.debug(
                f"{log_prefix} Normalizing raw test data (shape {test_raw_data.shape}, dtype {test_raw_data.dtype}) to uint8..."
            )
            img = normalize_image(test_raw_data, target_type=np.uint8)
            if img is None:
                logging.error(f"{log_prefix} Normalization failed.")
                return
            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown after normalization.")
                return
            logging.debug(
                f"{log_prefix} Normalization complete. Image shape: {img.shape}."
            )

            # 2. Apply Brightness/Contrast LUT
            logging.debug(f"{log_prefix} Applying B/C LUT (shape {bc_lut.shape})...")
            img = cv2.LUT(img, bc_lut)
            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown after B/C LUT.")
                return
            logging.debug(f"{log_prefix} B/C LUT applied.")

            # 3. Apply Color Palette (if not GRAY)
            if palette != "GRAY":
                logging.debug(f"{log_prefix} Applying color palette: {palette}...")
                img = apply_color_palette(img, palette)
                if self._app_state.shutting_down:
                    logging.debug(f"{log_prefix} Shutdown after palette.")
                    return
                logging.debug(
                    f"{log_prefix} Palette '{palette}' applied. Image shape: {img.shape}."
                )
            else:
                logging.debug(f"{log_prefix} Skipping color palette (GRAY selected).")

            # 4. Resize Image to display dimensions
            logging.debug(
                f"{log_prefix} Resizing image to {display_width}x{display_height}..."
            )
            img = resize_image(img, display_width, display_height)
            if img is None:
                logging.error(f"{log_prefix} Resize failed.")
                return
            if self._app_state.shutting_down:
                logging.debug(f"{log_prefix} Shutdown after resize.")
                return
            logging.debug(f"{log_prefix} Resize complete. Image shape: {img.shape}.")

            # --- Scrolling ---
            self._test_sar_offset = (
                self._test_sar_offset + 1
            ) % display_width  # Scroll by 1 pixel
            img = np.roll(img, -self._test_sar_offset, axis=1)
            logging.debug(
                f"{log_prefix} Applied scroll (offset: {self._test_sar_offset})."
            )

            # --- Check shutdown again before queueing ---
            if self._app_state.shutting_down:
                logging.debug(
                    f"{log_prefix} Shutdown detected after scroll. Skipping queue put."
                )
                return

            # --- Queue the processed image ---
            # Use put_queue utility, passing the app instance for context
            put_queue(self._sar_queue, img.copy(), "sar", self._app)
            logging.debug(f"{log_prefix} Queued processed SAR test image.")

        except Exception:
            logging.exception(f"{log_prefix} Error during SAR test display update:")
            self._abort_updates()  # Stop test mode on critical error