# --- START OF FILE receiver.py --- # receiver.py """ THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. Handles UDP reception, SFP fragment reassembly, metadata extraction, ACK handling, and submission of completed images (SAR/MFD) to worker threads for processing. Tracks incomplete transactions and uses standardized logging with prefixes. Calls ImageRecorder for saving SAR images. """ # Standard library imports import socket import logging import struct import os import ctypes import queue # Only needed for type hinting if used, not directly used here import cProfile import pstats import threading import time import math from typing import Optional, Dict, Any, Tuple # Add necessary types # Third-party imports import numpy as np import cv2 # Keep for potential internal use # Local application imports import config from concurrent.futures import ThreadPoolExecutor # Import specific function needed by this module from image_processing import normalize_image from utils import format_ctypes_structure, put_queue # --- >>> START OF NEW CODE <<< --- # Import the recorder class (handle optional import) try: from image_recorder import ImageRecorder except ImportError: ImageRecorder = None logging.warning( "[Receiver Init] ImageRecorder class not found. SAR Recording will be disabled." ) # --- >>> END OF NEW CODE <<< --- # --- Data Structures Definition using ctypes --- class SFPHeader(ctypes.Structure): """Structure representing the SFP header (32 bytes).""" _pack_ = 1 _fields_ = [ ("SFP_MARKER", ctypes.c_uint8), ("SFP_DIRECTION", ctypes.c_uint8), ("SFP_PROT_VER", ctypes.c_uint8), ("SFP_PT_SPARE", ctypes.c_uint8), ("SFP_TAG", ctypes.c_uint8), ("SFP_SRC", ctypes.c_uint8), ("SFP_FLOW", ctypes.c_uint8), ("SFP_TID", ctypes.c_uint8), ("SFP_FLAGS", ctypes.c_uint8), ("SFP_WIN", ctypes.c_uint8), ("SFP_ERR", ctypes.c_uint8), ("SFP_ERR_INFO", ctypes.c_uint8), ("SFP_TOTFRGAS", ctypes.c_uint16), ("SFP_FRAG", ctypes.c_uint16), ("SFP_RECTYPE", ctypes.c_uint8), ("SFP_RECSPARE", ctypes.c_uint8), ("SFP_PLDAP", ctypes.c_uint8), ("SFP_PLEXT", ctypes.c_uint8), ("SFP_RECCOUNTER", ctypes.c_uint16), ("SFP_PLSIZE", ctypes.c_uint16), ("SFP_TOTSIZE", ctypes.c_uint32), ("SFP_PLOFFSET", ctypes.c_uint32), ] @classmethod def size(cls): return ctypes.sizeof(cls) @staticmethod def get_field_offset(field_name): try: return getattr(SFPHeader, field_name).offset except AttributeError: return -1 class DataTag(ctypes.Structure): """Structure representing a generic data tag (8 bytes).""" _pack_ = 1 _fields_ = [ ("ID", ctypes.c_uint8 * 2), ("VALID", ctypes.c_uint8), ("VERSION", ctypes.c_uint8), ("SIZE", ctypes.c_uint32), ] @classmethod def size(cls): return ctypes.sizeof(cls) class HeaderData(ctypes.Structure): """Structure representing the HEADER_DATA block (~48 bytes).""" _pack_ = 1 _fields_ = [ ("TYPE", ctypes.c_uint8), ("SUBTYPE", ctypes.c_uint8), ("LOGCOLORS", ctypes.c_uint8), ("IMG_RESERVED", ctypes.c_uint8), ("PRODINFO", ctypes.c_uint32 * 2), ("TOD", ctypes.c_uint16 * 2), ("RESERVED", ctypes.c_uint32), ("FCOUNTER", ctypes.c_uint32), ("TIMETAG", ctypes.c_uint32), ("NOMINALFRATE", ctypes.c_uint16), ("FRAMETAG", ctypes.c_uint16), ("OX", ctypes.c_uint16), ("OY", ctypes.c_uint16), ("DX", ctypes.c_uint16), ("DY", ctypes.c_uint16), ("STRIDE", ctypes.c_uint16), ("BPP", ctypes.c_uint8), ("COMP", ctypes.c_uint8), ("SPARE", ctypes.c_uint16), ("PALTYPE", ctypes.c_uint8), ("GAP", ctypes.c_uint8), ] @classmethod def size(cls): return ctypes.sizeof(cls) class GeoData(ctypes.Structure): """Structure representing the GEO_DATA block (~80 bytes). Assumes specific types.""" _pack_ = 1 _fields_ = [ ("INVMASK", ctypes.c_uint32), ("ORIENTATION", ctypes.c_float), # RADIANS ("LATITUDE", ctypes.c_float), # Assumed RADIANS based on spec/usage ("LONGITUDE", ctypes.c_float), # Assumed RADIANS based on spec/usage ("REF_X", ctypes.c_uint16), ("REF_Y", ctypes.c_uint16), ("SCALE_X", ctypes.c_float), ("SCALE_Y", ctypes.c_float), ("POI_ORIENTATION", ctypes.c_float), ("POI_LATITUDE", ctypes.c_float), ("POI_LONGITUDE", ctypes.c_float), ("POI_ALTITUDE", ctypes.c_float), ("POI_X", ctypes.c_uint16), ("POI_Y", ctypes.c_uint16), ("SPARE", ctypes.c_uint32 * 7), # 28 bytes ] @classmethod def size(cls): return ctypes.sizeof(cls) class ImageLeaderData(ctypes.Structure): """Represents the complete Image Leader data structure (~1320 bytes).""" _pack_ = 1 _fields_ = [ ("HEADER_TAG", DataTag), ("HEADER_DATA", HeaderData), ("GEO_TAG", DataTag), ("GEO_DATA", GeoData), ("RESERVED_TAG", DataTag), ("RESERVED_DATA", ctypes.c_uint8 * 128), ("CM_TAG", DataTag), ("COLOUR_MAP", ctypes.c_uint32 * 256), ("PIXEL_TAG", DataTag), ] @classmethod def size(cls): return ctypes.sizeof(cls) @staticmethod def get_reserved_data_size(): return 128 @staticmethod def get_colour_map_size(): return 1024 # --- UDP Receiver Class --- class UdpReceiver: """ Handles UDP reception, SFP fragment reassembly, metadata extraction, image callback invocation, ACK sending, and statistics tracking. Uses a ThreadPoolExecutor for processing completed images. """ # --- >>> START OF MODIFIED CODE <<< --- def __init__( self, app, # Reference to the main App instance (ControlPanelApp) udp_socket, set_new_sar_image_callback, set_new_mfd_indices_image_callback, image_recorder: Optional[ImageRecorder] = None, # Accept recorder instance ): """ Initializes the UDP receiver. Args: app (ControlPanelApp): The main application instance. udp_socket (socket.socket): The bound UDP socket to receive from. set_new_sar_image_callback (callable): Callback for SAR images (norm_uint8, geo_info_radians). set_new_mfd_indices_image_callback (callable): Callback for MFD images (indices_uint8). image_recorder (Optional[ImageRecorder]): Instance for saving images. """ log_prefix = "[Receiver Init]" logging.debug(f"{log_prefix} Initializing UdpReceiver...") self.app = app self.udp_socket = udp_socket self.set_new_sar_image_callback = set_new_sar_image_callback self.set_new_mfd_image_callback = set_new_mfd_indices_image_callback self.image_recorder = image_recorder # Store recorder instance if self.image_recorder: logging.info(f"{log_prefix} Image Recorder instance provided.") else: logging.warning( f"{log_prefix} Image Recorder not provided. Recording disabled." ) # Reassembly state dictionaries self.image_fragments = {} # { (flow, tid): {frag_num: total_frags_expected} } self.image_data_buffers = {} # { (flow, tid): bytearray } logging.debug(f"{log_prefix} Reassembly dictionaries initialized.") # --- >>> END OF MODIFIED CODE <<< --- # Binary dump configuration self.save_dumps = config.SAVE_BINARY_DUMPS self.dump_dir = config.DUMP_DIRECTORY if self.save_dumps: if not os.path.exists(self.dump_dir): try: os.makedirs(self.dump_dir) logging.info( f"{log_prefix} Created binary dump directory: {self.dump_dir}" ) except OSError as e: logging.error( f"{log_prefix} Failed to create dump directory " f"'{self.dump_dir}': {e}. Disabling dumps." ) self.save_dumps = False else: logging.debug( f"{log_prefix} Binary dump directory '{self.dump_dir}' already exists." ) else: logging.debug(f"{log_prefix} Binary dump saving is disabled.") # Thread pool for image processing self.executor = None # Initialize as None try: self.executor = ThreadPoolExecutor( max_workers=config.MAX_WORKERS, thread_name_prefix="ImageProcessor" ) logging.info( f"{log_prefix} ThreadPoolExecutor created with max_workers={config.MAX_WORKERS}" ) except Exception as pool_e: logging.critical( f"{log_prefix} Failed to create ThreadPoolExecutor: {pool_e}", exc_info=True, ) # App might not function correctly without the executor # Statistics self.incomplete_sar_count = 0 self.incomplete_mfd_count = 0 self._stats_lock = threading.Lock() logging.debug(f"{log_prefix} Statistics counters and lock initialized.") logging.info(f"{log_prefix} UdpReceiver initialization complete.") # --- get_incomplete_counts (Unchanged) --- def get_incomplete_counts(self): """Returns the current counts of incomplete/lost transactions (thread-safe).""" with self._stats_lock: sar_count = self.incomplete_sar_count mfd_count = self.incomplete_mfd_count return sar_count, mfd_count # --- receive_udp_data (Unchanged) --- def receive_udp_data(self): """Main loop to listen for UDP packets, parse headers, and reassemble fragments.""" log_prefix = "[Receiver Loop]" if not self.udp_socket or self.udp_socket.fileno() == -1: logging.critical( f"{log_prefix} Receiver thread cannot start: " "socket is invalid or closed." ) return try: socket_name = self.udp_socket.getsockname() logging.info( f"{log_prefix} Receiver thread started and listening on {socket_name}." ) except OSError as sock_err: logging.critical( f"{log_prefix} Cannot get socket name (already closed?): {sock_err}" ) return profiler = None if config.ENABLE_PROFILING: profiler = cProfile.Profile() profiler.enable() logging.info(f"{log_prefix} Receiver profiling enabled.") while True: if self.app.state.shutting_down: logging.info(f"{log_prefix} Shutdown detected. Exiting thread.") break try: # Receive data and sender address data, addr = self.udp_socket.recvfrom(65535) # Max UDP packet size if not data: logging.warning( f"{log_prefix} Received empty UDP packet from {addr}. Continuing." ) continue except socket.timeout: logging.debug(f"{log_prefix} UDP socket timeout.") continue except OSError as e: # Check if error is expected during shutdown if self.app.state.shutting_down: logging.info( f"{log_prefix} UDP socket error during recvfrom " f"(expected during shutdown): {e}. Exiting." ) else: logging.error( f"{log_prefix} UDP socket error during recvfrom: {e}. " "Exiting thread." ) break # Exit loop on socket error except Exception as e: logging.exception(f"{log_prefix} Unexpected error during UDP recvfrom:") time.sleep(0.01) # Prevent high CPU usage on continuous errors continue # --- Process received packet --- try: header_size = SFPHeader.size() if len(data) < header_size: logging.warning( f"{log_prefix} Rcvd packet too small ({len(data)} bytes) " f"for SFP header from {addr}. Ignoring." ) continue # Parse header safely try: header = SFPHeader.from_buffer_copy(data) except (ValueError, TypeError) as parse_err: logging.error( f"{log_prefix} Failed to parse SFP header from {addr}: {parse_err}. " "Ignoring packet." ) continue # Extract relevant header fields sfp_flow = header.SFP_FLOW sfp_tid = header.SFP_TID sfp_frag = header.SFP_FRAG sfp_totfrgas = header.SFP_TOTFRGAS sfp_plsize = header.SFP_PLSIZE sfp_ploffset = header.SFP_PLOFFSET sfp_flags = header.SFP_FLAGS sfp_totsize = header.SFP_TOTSIZE image_key = (sfp_flow, sfp_tid) # Format flow char safely for logging flow_char = ( chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}" ) key_str = f"({flow_char},{sfp_tid})" if config.DEBUG_RECEIVER_PACKETS: logging.debug( f"Rcvd: Key={key_str}, Frag={sfp_frag}/{sfp_totfrgas}, " f"Size={sfp_plsize}, Offset={sfp_ploffset}, Flags=0x{sfp_flags:02X}, " f"TotalSize={sfp_totsize}" ) # Handle ACK Request flag if sfp_flags & 0x01: # Bit 1 is ACK Request logging.debug( f"{log_prefix} ACK Request flag (0x02) detected in Flags=0x{sfp_flags:02X} " f"for Key={key_str}. Triggering ACK send." ) self.send_ack(addr, data[:header_size]) # Save binary dump if enabled if self.save_dumps: dump_filename = ( f"{self.dump_dir}/pkt_{flow_char}_{sfp_tid}_{sfp_frag}.bin" ) self.save_binary_dump(data, dump_filename) # Validate fragment count and total size if sfp_totfrgas == 0 or sfp_totfrgas > 60000: # Check total fragments logging.warning( f"{log_prefix} Key={key_str}: Invalid total fragments count " f"{sfp_totfrgas}. Ignoring packet." ) continue MAX_EXPECTED_IMG_SIZE = 100 * 1024 * 1024 # 100 MB limit MIN_EXPECTED_IMG_SIZE = ImageLeaderData.size() if ( sfp_totsize < MIN_EXPECTED_IMG_SIZE or sfp_totsize > MAX_EXPECTED_IMG_SIZE ): logging.error( f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Invalid total image size " f"{sfp_totsize} (Limits: {MIN_EXPECTED_IMG_SIZE}-{MAX_EXPECTED_IMG_SIZE}). " "Ignoring packet/transaction." ) # Don't start reassembly if total size is invalid continue # --- Start New Transaction (Fragment 0) --- if sfp_frag == 0: # Cleanup any lingering older transactions for this flow self._cleanup_lingering_transactions(sfp_flow, sfp_tid) log_msg = ( f"[Receiver Reassembly] New TX Start: Key={key_str}, " f"Total Frags={sfp_totfrgas}, Total Size={sfp_totsize}, From={addr}" ) logging.debug(log_msg) # Initialize tracking structures for this transaction self.image_fragments[image_key] = {} try: if config.DEBUG_RECEIVER_REASSEMBLY: logging.debug( f"{log_prefix} Allocating {sfp_totsize} byte buffer for Key={key_str}" ) self.image_data_buffers[image_key] = bytearray(sfp_totsize) if config.DEBUG_RECEIVER_REASSEMBLY: logging.debug( f"{log_prefix} Buffer allocated successfully for Key={key_str}." ) except (MemoryError, ValueError) as alloc_e: logging.error( f"{log_prefix} Failed to allocate buffer for Key={key_str} " f"(Size: {sfp_totsize}): {alloc_e}. Ignoring transaction." ) # Clean up fragment tracking if buffer allocation fails self.image_fragments.pop(image_key, None) continue # Ignore this fragment 0 # --- Process Subsequent Fragments --- # Check if transaction is still active (might have been cleaned up) if ( image_key not in self.image_fragments or image_key not in self.image_data_buffers ): logging.warning( f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Received fragment " "for inactive/cleaned transaction. Ignoring." ) continue # Store fragment info self.image_fragments[image_key][sfp_frag] = sfp_totfrgas # --- Copy Payload to Buffer --- payload = data[header_size:] payload_len = len(payload) # Use the smaller of declared payload size and actual received length bytes_to_copy = min(sfp_plsize, payload_len) if sfp_plsize > payload_len: logging.warning( f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Declared PLSIZE {sfp_plsize} > " f"Actual Payload {payload_len}. Using actual length {payload_len}." ) buff = self.image_data_buffers[image_key] buff_len = len(buff) start_offset = sfp_ploffset end_offset = start_offset + bytes_to_copy # Validate offsets and buffer boundaries if start_offset >= buff_len: logging.error( f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Payload offset {start_offset} " f"is out of buffer bounds ({buff_len}). Ignoring fragment data." ) continue if end_offset > buff_len: # Truncate copy if payload exceeds buffer end bytes_to_copy = buff_len - start_offset logging.warning( f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Payload write (end {end_offset}) " f"exceeds buffer length ({buff_len}). Truncating copy to {bytes_to_copy} bytes." ) # Copy data if valid size if bytes_to_copy > 0: try: if config.DEBUG_RECEIVER_REASSEMBLY: logging.debug( f"{log_prefix} Copying {bytes_to_copy} bytes to buffer for Key={key_str}, " f"Frag={sfp_frag}, Offset={start_offset}." ) # Slice assignment copies data efficiently buff[start_offset : start_offset + bytes_to_copy] = payload[ :bytes_to_copy ] except IndexError as copy_idx_e: logging.error( f"{log_prefix} Key={key_str}, Frag={sfp_frag}: IndexError copying payload " f"slice (Start:{start_offset}, End:{start_offset+bytes_to_copy}, " f"BufLen:{buff_len}): {copy_idx_e}" ) continue # Skip fragment if copy fails except Exception as copy_e: logging.error( f"{log_prefix} Key={key_str}, Frag={sfp_frag}: " f"Error copying payload to buffer: {copy_e}" ) continue # Skip fragment if copy fails # --- Check for Completion --- received_frags_count = len(self.image_fragments[image_key]) expected_total = sfp_totfrgas if received_frags_count == expected_total: log_msg = ( f"[Receiver Reassembly] Complete: Key={key_str}. " f"Rcvd {received_frags_count}/{expected_total} frags. " "Submitting for processing." ) logging.debug(log_msg) completed_buffer = self.image_data_buffers[image_key] # Submit to worker pool if available if self.executor: try: if config.DEBUG_RECEIVER_REASSEMBLY: logging.debug( f"{log_prefix} Submitting task to executor for Key={key_str}." ) self.executor.submit( self.process_completed_image, sfp_flow, sfp_tid, completed_buffer, # Pass the whole buffer ) # Remove buffer immediately after successful submission # Fragment tracking removed by worker later self.image_data_buffers.pop(image_key, None) if config.DEBUG_RECEIVER_REASSEMBLY: logging.debug( f"{log_prefix} Buffer removed from receiver tracking for Key={key_str}." ) except Exception as submit_e: logging.exception( f"{log_prefix} Failed to submit task to executor for Key={key_str}: {submit_e}" ) # Clean up if submission fails self.cleanup_transaction(image_key) else: logging.error( f"{log_prefix} Cannot submit task for Key={key_str}: " "ThreadPoolExecutor is not available." ) # Clean up if no executor self.cleanup_transaction(image_key) elif received_frags_count > expected_total: # This shouldn't happen with proper tracking but log if it does logging.warning( f"{log_prefix} Key={key_str}: Received more fragments ({received_frags_count}) " f"than expected ({expected_total})." ) except Exception as e: # Log errors processing a specific packet key_info_str = key_str if "key_str" in locals() else "Key=Unknown" logging.exception( f"{log_prefix} Error processing UDP packet data {key_info_str} from {addr}: {e}" ) # --- Loop End --- logging.info(f"{log_prefix} UDP data receiving thread stopped.") # --- Profiling Cleanup --- if config.ENABLE_PROFILING and profiler: profiler.disable() logging.info(f"{log_prefix} Receiver profiling disabled. Saving stats...") try: stats_filename = ( f"receiver_profile_{time.strftime('%Y%m%d_%H%M%S')}.prof" ) stats = pstats.Stats(profiler) stats.sort_stats(pstats.SortKey.CUMULATIVE) stats.dump_stats(stats_filename) logging.info( f"{log_prefix} Receiver profile stats saved to {stats_filename}" ) except Exception as profile_e: logging.exception( f"{log_prefix} Error saving/printing profiler stats: {profile_e}" ) # --- _cleanup_lingering_transactions (Unchanged) --- def _cleanup_lingering_transactions(self, current_flow, current_tid): """ Checks for and cleans up old transactions for the same flow, handling potential race conditions. This is called when a new fragment 0 arrives for a flow/tid pair, indicating any previous transaction for the same flow should be considered finished or lost. """ log_prefix = "[Receiver Cleanup]" # Format characters for logging current_flow_char = ( chr(current_flow) if 32 <= current_flow <= 126 else f"0x{current_flow:02X}" ) new_key_str = f"({current_flow_char},{current_tid})" # Iterate over a copy of the keys to avoid RuntimeError if dict changes size during iteration for existing_key in list(self.image_fragments.keys()): existing_flow, existing_tid = existing_key # Check if the existing transaction is for the *same flow* but a *different (older) transaction ID* if existing_flow == current_flow and existing_tid != current_tid: existing_flow_char = ( chr(existing_flow) if 32 <= existing_flow <= 126 else f"0x{existing_flow:02X}" ) lingering_key_str = f"({existing_flow_char},{existing_tid})" # Log that we found an older transaction for this flow logging.debug( f"{log_prefix} Detected lingering transaction {lingering_key_str} " f"while starting new transaction {new_key_str}. Attempting cleanup." ) # --- Safely get and remove fragment data using pop() --- fragments = self.image_fragments.pop(existing_key, None) # --- Check state of the lingering transaction only if we successfully retrieved fragment data --- if fragments is not None: try: received_count = len(fragments) expected_count = ( -1 ) # Default if no fragments had total count info # Try to get the expected total from the first available fragment's value if fragments: expected_count = next(iter(fragments.values())) # Check if the transaction was incomplete if expected_count > 0 and received_count < expected_count: logging.debug( f"{log_prefix} Lingering transaction {lingering_key_str} was incomplete " f"({received_count}/{expected_count}). Marking as lost." ) # Safely update the global incomplete count via AppState img_type = "unknown" if existing_flow == 0x53: # ASCII 'S' for SAR img_type = "sar" elif existing_flow == 0x4D: # ASCII 'M' for MFD img_type = "mfd" if ( img_type != "unknown" and self.app and hasattr(self.app, "state") ): self.app.state.increment_incomplete_rx_count(img_type) else: logging.warning( f"{log_prefix} Could not increment incomplete count for " f"{lingering_key_str} (type='{img_type}', app/state missing?)." ) else: # Logged if transaction was complete or expected count unknown logging.debug( f"{log_prefix} Lingering transaction {lingering_key_str} state " f"({received_count}/{expected_count}) processed during cleanup." ) except Exception as e: # Log error during the state check, but cleanup continues logging.exception( f"{log_prefix} Error checking state of popped lingering " f"transaction {lingering_key_str}: {e}" ) else: # This branch is hit if the worker thread already removed the fragment entry logging.debug( f"{log_prefix} Fragment tracking for lingering transaction " f"{lingering_key_str} was already removed (likely by worker)." ) # --- Always attempt to remove the corresponding data buffer --- buffer_popped = self.image_data_buffers.pop(existing_key, None) if buffer_popped is not None: logging.debug( f"{log_prefix} Cleaned up data buffer for lingering key {lingering_key_str}." ) # Explicitly delete the large bytearray to potentially help GC sooner del buffer_popped # else: Buffer was already gone # Log completion of cleanup for this specific lingering key logging.debug( f"{log_prefix} Finished cleanup attempt for lingering key {lingering_key_str}." ) # --- _calculate_pixel_data_offset (Unchanged) --- def _calculate_pixel_data_offset(self, image_leader): """Calculates the expected byte offset to the start of the pixel data.""" log_prefix = "[Receiver Pixel Offset]" try: offset = ( DataTag.size() + HeaderData.size() # Header Tag & Data + DataTag.size() + GeoData.size() # Geo Tag & Data + DataTag.size() + ImageLeaderData.get_reserved_data_size() # Reserved Tag & Data + DataTag.size() + ImageLeaderData.get_colour_map_size() # CM Tag & Data + DataTag.size() # Pixel Tag size ) logging.debug( f"{log_prefix} Calculated standard pixel data offset: {offset} bytes." ) return offset except Exception as e: logging.exception(f"{log_prefix} Error calculating pixel data offset:") fallback_offset = ImageLeaderData.size() logging.warning( f"{log_prefix} Using fallback pixel data offset estimate: {fallback_offset}" ) return fallback_offset # --- >>> START OF MODIFIED FUNCTION <<< --- def reassemble_sar_image( self, image_leader, image_data, log_prefix ) -> Optional[Tuple[np.ndarray, np.ndarray, Dict[str, Any]]]: """ Extracts SAR metadata and pixel data from buffer. Returns RAW data, normalized uint8 data, and GeoInfo dictionary. Args: image_leader (ImageLeaderData): Parsed leader structure. image_data (bytearray): The complete image data buffer. log_prefix (str): The logging prefix (e.g., "[Worker SAR/TID]"). Returns: Optional[Tuple[np.ndarray, np.ndarray, Dict[str, Any]]]: (raw_image_data, normalized_image_uint8, geo_info_radians_dict) or None on error. """ logging.debug(f"{log_prefix} Entering reassemble_sar_image.") fcounter = image_leader.HEADER_DATA.FCOUNTER image_key_log = f"SAR(FCNT={fcounter})" try: # 1. Extract and validate HeaderData hdr_d = image_leader.HEADER_DATA dx = int(hdr_d.DX) dy = int(hdr_d.DY) bpp = int(hdr_d.BPP) stride_pixels = int(hdr_d.STRIDE) pal_type = int(hdr_d.PALTYPE) logging.debug( f"{log_prefix} {image_key_log}: Extracted HeaderData: DX={dx}, DY={dy}, " f"BPP={bpp}, Stride(px)={stride_pixels}, PALTYPE={pal_type}" ) # Validate SAR specific metadata if ( dx <= 0 or dy <= 0 or bpp not in [1, 2] or stride_pixels < dx or pal_type != 0 ): logging.error( f"{log_prefix} {image_key_log}: Invalid SAR metadata. Cannot reassemble." ) return None pixel_dtype = np.uint8 if bpp == 1 else np.uint16 pixel_bytes = bpp # 2. Calculate pixel offset pixel_data_offset = self._calculate_pixel_data_offset(image_leader) logging.debug( f"{log_prefix} {image_key_log}: Using pixel data offset: {pixel_data_offset}" ) # 3. Validate offset and buffer size available_data_length = len(image_data) logging.debug( f"{log_prefix} {image_key_log}: Validating offset ({pixel_data_offset}) " f"vs buffer size ({available_data_length})." ) if pixel_data_offset >= available_data_length: logging.error( f"{log_prefix} {image_key_log}: Pixel offset >= buffer size. " "Cannot extract pixel data." ) return None minimum_required_core_bytes = dy * dx * pixel_bytes actual_pixel_bytes_available = available_data_length - pixel_data_offset if actual_pixel_bytes_available < minimum_required_core_bytes: logging.error( f"{log_prefix} {image_key_log}: Insufficient pixel data in buffer " f"(Need min {minimum_required_core_bytes}, " f"Found {actual_pixel_bytes_available})." ) return None logging.debug(f"{log_prefix} {image_key_log}: Buffer size validated.") # 4. Create NumPy view for RAW data raw_image_data = None # Initialize try: stride_bytes = stride_pixels * pixel_bytes logging.debug( f"{log_prefix} {image_key_log}: Creating NumPy view for RAW data: " f"Shape=({dy},{dx}), Dtype={pixel_dtype}, Offset={pixel_data_offset}, " f"Strides=({stride_bytes},{pixel_bytes})" ) raw_image_data = np.ndarray( shape=(dy, dx), dtype=pixel_dtype, buffer=image_data, offset=pixel_data_offset, strides=(stride_bytes, pixel_bytes), ) logging.debug( f"{log_prefix} {image_key_log}: NumPy view for RAW data created successfully." ) except ValueError as ve: logging.error( f"{log_prefix} {image_key_log}: Failed to create SAR RAW NumPy view " f"(Shape/stride/offset mismatch?): {ve}" ) return None # 5. Shutdown Check if self.app.state.shutting_down: logging.info( f"{log_prefix} Shutdown detected before normalization. Exiting reassemble." ) return None # 6. Normalize RAW view to uint8 for display/overlay logging.debug( f"{log_prefix} {image_key_log}: Normalizing SAR view to uint8..." ) normalized_image_uint8 = normalize_image( raw_image_data, target_type=np.uint8 # Use the view as source ) if normalized_image_uint8 is None: logging.error( f"{log_prefix} {image_key_log}: SAR normalization to uint8 failed." ) return None logging.debug( f"{log_prefix} {image_key_log}: Normalization complete " f"(Shape: {normalized_image_uint8.shape})." ) # 7. Extract and Validate Geo Info (RADIANS) geo_log_prefix = "[Geo extract]" geo_info_radians = {"valid": False} # Initialize as invalid try: geo_d = image_leader.GEO_DATA logging.debug( f"{geo_log_prefix} {image_key_log}: Extracting GeoData (interpreting angles as RADIANS)..." ) # Read angles directly as float (assumed radians) lat_rad = float(geo_d.LATITUDE) lon_rad = float(geo_d.LONGITUDE) orient_rad = float(geo_d.ORIENTATION) # Populate dictionary geo_info_radians["lat"] = lat_rad geo_info_radians["lon"] = lon_rad geo_info_radians["orientation"] = orient_rad geo_info_radians["ref_x"] = int(geo_d.REF_X) geo_info_radians["ref_y"] = int(geo_d.REF_Y) geo_info_radians["scale_x"] = float(geo_d.SCALE_X) geo_info_radians["scale_y"] = float(geo_d.SCALE_Y) geo_info_radians["width_px"] = dx geo_info_radians["height_px"] = dy # Validate values is_scale_valid = ( geo_info_radians["scale_x"] > 0 and geo_info_radians["scale_y"] > 0 ) is_lat_valid = -math.pi / 2 <= lat_rad <= math.pi / 2 is_lon_valid = -math.pi <= lon_rad <= math.pi is_orient_valid = math.isfinite(orient_rad) if is_scale_valid and is_lat_valid and is_lon_valid and is_orient_valid: geo_info_radians["valid"] = True # Log degrees for readability if needed lat_deg_log = math.degrees(lat_rad) lon_deg_log = math.degrees(lon_rad) orient_deg_log = math.degrees(orient_rad) logging.debug( f"{geo_log_prefix} {image_key_log}: GeoInfo Extracted: Valid={geo_info_radians['valid']}, " f"Lat={lat_deg_log:.4f}deg({lat_rad:.6f}rad), " f"Lon={lon_deg_log:.4f}deg({lon_rad:.6f}rad), " f"Orient={orient_deg_log:.2f}deg({orient_rad:.6f}rad), " f"Ref=({geo_info_radians['ref_x']},{geo_info_radians['ref_y']}), " f"Scale=({geo_info_radians['scale_x']:.3f},{geo_info_radians['scale_y']:.3f}), " f"Size=({dx},{dy})" ) else: logging.warning( f"{geo_log_prefix} {image_key_log}: Invalid geo values found " f"(ScaleValid={is_scale_valid}, LatValid={is_lat_valid}, " f"LonValid={is_lon_valid}, OrientValid={is_orient_valid}). " "GeoInfo marked invalid." ) geo_info_radians["valid"] = False except OverflowError as oe: logging.error( f"{geo_log_prefix} {image_key_log}: Math OverflowError during GeoData " f"conversion: {oe}. GeoInfo marked invalid." ) geo_info_radians = {"valid": False} except Exception as e: logging.exception( f"{geo_log_prefix} {image_key_log}: Failed during GeoData extraction/conversion: {e}" ) geo_info_radians = {"valid": False} # 8. Return results: RAW data, NORMALIZED data, GEO info logging.debug(f"{log_prefix} Exiting reassemble_sar_image successfully.") # Return COPIES to avoid buffer issues after function returns return ( raw_image_data.copy(), normalized_image_uint8.copy(), geo_info_radians, ) except Exception as e: logging.exception( f"{log_prefix} {image_key_log}: Unexpected error during SAR reassembly: {e}" ) return None # # --- reassemble_mfd_image (Unchanged) --- def reassemble_mfd_image(self, image_leader, image_data, log_prefix): """ Extracts MFD metadata and pixel indices (uint8) from buffer. Args: image_leader (ImageLeaderData): Parsed leader structure. image_data (bytearray): The complete image data buffer. log_prefix (str): The logging prefix (e.g., "[Worker MFD/TID]"). Returns: numpy.ndarray: Copy of the 2D uint8 index array, or None on error. """ logging.debug(f"{log_prefix} Entering reassemble_mfd_image.") fcounter = image_leader.HEADER_DATA.FCOUNTER image_key_log = f"MFD(FCNT={fcounter})" try: # 1. Extract and validate HeaderData hdr_d = image_leader.HEADER_DATA dx = int(hdr_d.DX) dy = int(hdr_d.DY) bpp = int(hdr_d.BPP) stride_pixels = int(hdr_d.STRIDE) pal_type = int(hdr_d.PALTYPE) logging.debug( f"{log_prefix} {image_key_log}: Extracted HeaderData: DX={dx}, DY={dy}, " f"BPP={bpp}, Stride(px)={stride_pixels}, PALTYPE={pal_type}" ) # MFD must be BPP=1, PALTYPE=1 if dx <= 0 or dy <= 0 or bpp != 1 or stride_pixels < dx or pal_type != 1: logging.error( f"{log_prefix} {image_key_log}: Invalid MFD metadata. Cannot reassemble." ) return None pixel_bytes = 1 # 2. Calculate pixel offset pixel_data_offset = self._calculate_pixel_data_offset(image_leader) logging.debug( f"{log_prefix} {image_key_log}: Using pixel data offset: {pixel_data_offset}" ) # 3. Validate offset and buffer size available_data_length = len(image_data) logging.debug( f"{log_prefix} {image_key_log}: Validating offset ({pixel_data_offset}) " f"vs buffer size ({available_data_length})." ) if pixel_data_offset >= available_data_length: logging.error( f"{log_prefix} {image_key_log}: Pixel offset >= buffer size. " "Cannot extract pixel data." ) return None minimum_required_core_bytes = dy * dx * pixel_bytes actual_pixel_bytes_available = available_data_length - pixel_data_offset if actual_pixel_bytes_available < minimum_required_core_bytes: logging.error( f"{log_prefix} {image_key_log}: Insufficient pixel data in buffer " f"(Need min {minimum_required_core_bytes}, " f"Found {actual_pixel_bytes_available})." ) return None logging.debug(f"{log_prefix} {image_key_log}: Buffer size validated.") # 4. Create NumPy view for indices (uint8) try: stride_bytes = stride_pixels * pixel_bytes logging.debug( f"{log_prefix} {image_key_log}: Creating NumPy view: Shape=({dy},{dx}), " f"Dtype=uint8, Offset={pixel_data_offset}, Strides=({stride_bytes},{pixel_bytes})" ) mfd_index_view = np.ndarray( shape=(dy, dx), dtype=np.uint8, buffer=image_data, offset=pixel_data_offset, strides=(stride_bytes, pixel_bytes), ) logging.debug( f"{log_prefix} {image_key_log}: NumPy view created successfully." ) except ValueError as ve: logging.error( f"{log_prefix} {image_key_log}: Failed to create MFD index NumPy view " f"(Shape/stride/offset mismatch?): {ve}" ) return None # 5. Shutdown Check if self.app.state.shutting_down: logging.info( f"{log_prefix} Shutdown detected after MFD view creation. Exiting reassemble." ) return None # 6. Return a COPY of the index view logging.debug( f"{log_prefix} Exiting reassemble_mfd_image successfully with index copy." ) return mfd_index_view.copy() except Exception as e: logging.exception( f"{log_prefix} {image_key_log}: Unexpected error during MFD index reassembly: {e}" ) return None # --- save_binary_dump (Unchanged) --- def save_binary_dump(self, data, filename): """Saves raw packet data to a binary file if enabled in config.""" log_prefix = "[Receiver Dump]" if not self.save_dumps: return try: with open(filename, "wb") as f: f.write(data) logging.debug( f"{log_prefix} Saved binary dump to {filename} ({len(data)} bytes)" ) except Exception as e: logging.error(f"{log_prefix} Error saving binary dump to {filename}: {e}") # --- send_ack (Unchanged) --- def send_ack(self, dest_addr, original_header_bytes): """Sends an SFP ACK packet back to the sender using the provided address.""" log_prefix = "[Receiver ACK]" try: header_size = SFPHeader.size() if len(original_header_bytes) < header_size: logging.error( f"{log_prefix} ACK fail to {dest_addr}: Original header too short " f"({len(original_header_bytes)} bytes)." ) return # Create a modifiable copy ack_header = bytearray(original_header_bytes[:header_size]) # Get offsets for fields to modify flow_offset = SFPHeader.get_field_offset("SFP_FLOW") win_offset = SFPHeader.get_field_offset("SFP_WIN") flags_offset = SFPHeader.get_field_offset("SFP_FLAGS") tid_offset = SFPHeader.get_field_offset("SFP_TID") # --- >>> START OF MODIFIED CODE <<< --- direction_offset = SFPHeader.get_field_offset("SFP_DIRECTION") # --- >>> END OF MODIFIED CODE <<< --- # Check if all offsets were retrieved successfully if ( flow_offset == -1 or win_offset == -1 or flags_offset == -1 or tid_offset == -1 or direction_offset == -1 ): # Added direction_offset check logging.error( f"{log_prefix} Failed to get field offsets for ACK construction. Cannot send." ) return sfp_flow = ack_header[flow_offset] flow_char = chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}" sfp_tid = ack_header[tid_offset] # For logging # Determine window size based on flow window_size = 0 flow_name = f"Unknown({flow_char})" if flow_char == "M": window_size = config.ACK_WINDOW_SIZE_MFD flow_name = "MFD" elif flow_char == "S": window_size = config.ACK_WINDOW_SIZE_SAR flow_name = "SAR" else: logging.warning( f"{log_prefix} ACK for unknown SFP_FLOW {flow_char}. Using Window=0." ) # Ensure window size is valid uint8 window_size = max(0, min(window_size, 255)) logging.debug( f"{log_prefix} Determined Flow={flow_name}, TID={sfp_tid}, Window Size={window_size}" ) # --- Modify Header Fields for ACK --- # 1. Set Window Size ack_header[win_offset] = window_size # 2. Set Flags (Set ACK=1 [bit 0], Clear ACK_REQ=0 [bit 1]) original_flags = ack_header[flags_offset] new_flags = (original_flags | 0x01) & ~0x02 ack_header[flags_offset] = new_flags logging.debug( f"{log_prefix} Original Flags=0x{original_flags:02X}, New Flags=0x{new_flags:02X}" ) # --- >>> START OF MODIFIED CODE <<< --- # 3. Set Direction (Invert original direction) # Assuming original direction was '>' (0x3E), set to '<' (0x3C) ack_header[direction_offset] = 0x3C # ASCII code for '<' logging.debug(f"{log_prefix} Set Direction to 0x3C ('<')") # --- >>> END OF MODIFIED CODE <<< --- # 4. TODO (Optional but recommended): Set SFP_SRC to receiver's ID if defined/needed # Send ACK if socket is valid if self.udp_socket and self.udp_socket.fileno() != -1: logging.debug( f"{log_prefix} Sending ACK ({len(ack_header)}b) to {dest_addr}..." ) bytes_sent = self.udp_socket.sendto(ack_header, dest_addr) logging.debug( f"{log_prefix} ACK ({bytes_sent}b) sent successfully to {dest_addr} " f"for TID={sfp_tid}, Flow={flow_name}, Win={window_size}" ) else: logging.warning( f"{log_prefix} Cannot send ACK to {dest_addr}: " "UDP socket is closed or invalid." ) except Exception as e: logging.exception( f"{log_prefix} Unexpected error sending ACK to {dest_addr}: {e}" ) # --- >>> START OF MODIFIED FUNCTION <<< --- def process_completed_image(self, sfp_flow, sfp_tid, image_data): """Processes a fully reassembled image buffer in a worker thread.""" image_key = (sfp_flow, sfp_tid) flow_char = chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}" log_prefix = f"[Worker {flow_char}/{sfp_tid}]" logging.debug( f"{log_prefix} Starting image processing task. " f"Buffer size: {len(image_data) if image_data else 'None'}" ) try: if self.app.state.shutting_down: logging.info(f"{log_prefix} Shutdown detected at start. Aborting.") return # Validate buffer basics if not image_data or len(image_data) < ImageLeaderData.size(): logging.error(f"{log_prefix} Invalid/incomplete data buffer. Aborting.") return # Parse leader data try: logging.debug(f"{log_prefix} Parsing ImageLeaderData...") image_leader = ImageLeaderData.from_buffer(image_data) # Access a field to force potential parse error early _ = image_leader.HEADER_TAG.VALID logging.debug(f"{log_prefix} ImageLeaderData parsed.") except Exception as parse_err: logging.error( f"{log_prefix} Failed to parse ImageLeaderData: {parse_err}. Aborting." ) return # Determine image type based on PALTYPE pal_type = image_leader.HEADER_DATA.PALTYPE hdr_type = image_leader.HEADER_DATA.TYPE bpp = image_leader.HEADER_DATA.BPP logging.debug( f"{log_prefix} Determining image type - PALTYPE={pal_type}, " f"Header TYPE={hdr_type}, BPP={bpp}, Flow='{flow_char}'" ) # --- MFD Branch --- if pal_type == 1: logging.debug( f"{log_prefix} Identified as MFD type based on PALTYPE=1." ) if bpp != 1: logging.warning( f"{log_prefix} MFD data has BPP={bpp} (expected 1)." ) if flow_char != "M": logging.warning( f"{log_prefix} MFD data received on unexpected flow '{flow_char}'." ) if self.app.state.shutting_down: logging.info( f"{log_prefix} Shutdown detected before MFD reassembly." ) return logging.debug(f"{log_prefix} CALLING reassemble_mfd_image...") mfd_indices = self.reassemble_mfd_image( image_leader, image_data, log_prefix ) logging.debug( f"{log_prefix} RETURNED from reassemble_mfd_image. " f"Result is None: {mfd_indices is None}" ) if self.app.state.shutting_down: logging.info( f"{log_prefix} Shutdown detected after MFD reassembly." ) # No need to return here, cleanup happens in finally if mfd_indices is not None: logging.debug( f"{log_prefix} MFD indices reassembled. " "Calling App callback (handle_new_mfd_data)..." ) # Call the main app's handler for MFD data self.set_new_mfd_image_callback(mfd_indices) logging.debug(f"{log_prefix} Returned from App MFD callback.") else: logging.error( f"{log_prefix} MFD reassembly failed (returned None). " "Callback NOT called." ) # Increment incomplete counter if reassembly failed self.app.state.increment_incomplete_rx_count("mfd") # --- SAR Branch --- elif pal_type == 0: logging.debug( f"{log_prefix} Identified as SAR type based on PALTYPE=0." ) if flow_char != "S": logging.warning( f"{log_prefix} SAR data received on unexpected flow '{flow_char}'." ) if self.app.state.shutting_down: logging.info( f"{log_prefix} Shutdown detected before SAR reassembly." ) return logging.debug(f"{log_prefix} CALLING reassemble_sar_image...") # reassemble_sar_image now returns (raw, normalized, geo_info) or None reassembly_result = self.reassemble_sar_image( image_leader, image_data, log_prefix ) logging.debug( f"{log_prefix} RETURNED from reassemble_sar_image. " f"Result is None: {reassembly_result is None}" ) if self.app.state.shutting_down: logging.info( f"{log_prefix} Shutdown detected after SAR reassembly." ) # No need to return here, cleanup happens in finally if reassembly_result is not None: # Unpack the results raw_sar_data, normalized_sar_uint8, geo_info_radians = ( reassembly_result ) # --- Callback for Display --- logging.debug( f"{log_prefix} SAR reassembled. Calling App callback for display " "(handle_new_sar_data)..." ) self.set_new_sar_image_callback( normalized_sar_uint8, # Pass normalized for display geo_info_radians, ) logging.debug( f"{log_prefix} Returned from App SAR display callback." ) # --- Callback for Recording (if recorder exists) --- if self.image_recorder: logging.debug(f"{log_prefix} Calling ImageRecorder...") # Pass RAW data for saving self.image_recorder.record_sar_image( raw_sar_data, geo_info_radians ) logging.debug(f"{log_prefix} Returned from ImageRecorder call.") else: logging.debug( f"{log_prefix} ImageRecorder not available, skipping recording." ) # Check if metadata display is enabled in AppState if self.app.state.display_sar_metadata: logging.debug(f"{log_prefix} Metadata display enabled. Formatting...") metadata_str = None # Initialize try: # Call the generic formatting utility metadata_str = format_ctypes_structure(image_leader) except Exception as fmt_err: logging.error(f"{log_prefix} Error formatting metadata: {fmt_err}") metadata_str = f"" # Queue the formatted string (or error message) if metadata_str is not None: meta_command = "SAR_METADATA_UPDATE" logging.debug(f"{log_prefix} Queuing '{meta_command}' for UI update.") put_queue(self.app.tkinter_queue, (meta_command, metadata_str), "tkinter", self.app) else: logging.debug(f"{log_prefix} Metadata display disabled. Skipping format/queue.") else: logging.error( f"{log_prefix} SAR reassembly failed (returned None). " "Callbacks NOT called." ) # Increment incomplete counter if reassembly failed self.app.state.increment_incomplete_rx_count("sar") # --- Unknown Type --- else: logging.error( f"{log_prefix} Unsupported PALTYPE={pal_type} encountered. " "Cannot process." ) # Consider incrementing a generic counter? except Exception as e: # Catch unexpected errors during processing if not self.app.state.shutting_down: logging.exception( f"{log_prefix} Unexpected error processing completed image:" ) # Increment incomplete counter on unexpected error img_type = ( "sar" if flow_char == "S" else "mfd" if flow_char == "M" else "unknown" ) if img_type != "unknown": self.app.state.increment_incomplete_rx_count(img_type) finally: # --- Cleanup --- # Always remove fragment tracking after processing attempt (success or fail) # Buffer was already removed upon submission to executor. logging.debug(f"{log_prefix} Entering finally block for fragment cleanup.") self.cleanup_transaction_fragments(image_key) logging.debug(f"{log_prefix} Exiting image processing task.") # def cleanup_transaction_fragments(self, image_key): """Removes only the fragment tracking data for a transaction.""" log_prefix = "[Receiver Cleanup]" # Use pop for thread-safe removal frags_popped = self.image_fragments.pop(image_key, None) if frags_popped is not None: flow_char = ( chr(image_key[0]) if 32 <= image_key[0] <= 126 else f"0x{image_key[0]:02X}" ) key_str = f"({flow_char},{image_key[1]})" logging.debug( f"{log_prefix} Cleaned up fragment tracking for Key={key_str}" ) # else: No need to log if key didn't exist def cleanup_transaction(self, image_key): """Removes all transaction data (fragments tracking, buffer) from memory.""" log_prefix = "[Receiver Cleanup]" # Use pop for thread-safe removal frags_popped = self.image_fragments.pop(image_key, None) buffer_popped = self.image_data_buffers.pop(image_key, None) flow_char = ( chr(image_key[0]) if 32 <= image_key[0] <= 126 else f"0x{image_key[0]:02X}" ) key_str = f"({flow_char},{image_key[1]})" if frags_popped is not None or buffer_popped is not None: logging.debug( f"{log_prefix} Cleaned up resources for Key={key_str} " f"(Fragments:{frags_popped is not None}, Buffer:{buffer_popped is not None})" ) # Explicitly delete buffer if popped to help GC if buffer_popped is not None: del buffer_popped # else: No need to log if key didn't exist # --- END OF FILE receiver.py ---