# --- START OF FILE receiver.py ---
# receiver.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.

Handles UDP reception, SFP fragment reassembly, metadata extraction, ACK
handling, and submission of completed images (SAR/MFD) to worker threads for
processing. Tracks incomplete transactions and uses standardized logging with
prefixes.
"""

# Standard library imports
import socket
import logging
import struct
import os
import ctypes
import queue  # Only needed for type hinting if used; not used directly here
import cProfile
import pstats
import threading
import time
import math
from concurrent.futures import ThreadPoolExecutor

# Third-party imports
import numpy as np
import cv2  # Kept for potential internal use, though main processing has moved

# Local application imports
import config
from image_processing import normalize_image  # Only function needed by this module


# --- Data Structures Definition using ctypes ---


class SFPHeader(ctypes.Structure):
    """Structure representing the SFP header (32 bytes)."""

    _pack_ = 1
    _fields_ = [
        ("SFP_MARKER", ctypes.c_uint8),
        ("SFP_DIRECTION", ctypes.c_uint8),
        ("SFP_PROT_VER", ctypes.c_uint8),
        ("SFP_PT_SPARE", ctypes.c_uint8),
        ("SFP_TAG", ctypes.c_uint8),
        ("SFP_SRC", ctypes.c_uint8),
        ("SFP_FLOW", ctypes.c_uint8),
        ("SFP_TID", ctypes.c_uint8),
        ("SFP_FLAGS", ctypes.c_uint8),
        ("SFP_WIN", ctypes.c_uint8),
        ("SFP_ERR", ctypes.c_uint8),
        ("SFP_ERR_INFO", ctypes.c_uint8),
        ("SFP_TOTFRGAS", ctypes.c_uint16),
        ("SFP_FRAG", ctypes.c_uint16),
        ("SFP_RECTYPE", ctypes.c_uint8),
        ("SFP_RECSPARE", ctypes.c_uint8),
        ("SFP_PLDAP", ctypes.c_uint8),
        ("SFP_PLEXT", ctypes.c_uint8),
        ("SFP_RECCOUNTER", ctypes.c_uint16),
        ("SFP_PLSIZE", ctypes.c_uint16),
        ("SFP_TOTSIZE", ctypes.c_uint32),
        ("SFP_PLOFFSET", ctypes.c_uint32),
    ]

    @classmethod
    def size(cls):
        return ctypes.sizeof(cls)

    @staticmethod
    def get_field_offset(field_name):
        """Helper to get a field offset, handling a potential AttributeError."""
        try:
            return getattr(SFPHeader, field_name).offset
        except AttributeError:
            return -1  # Indicate error
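# Illustrative sketch (an addition, not part of the original receiver): how a
# received datagram maps onto SFPHeader. The zero-filled default buffer stands
# in for real traffic; from_buffer_copy reads the packed 32-byte layout above.
def _sketch_parse_sfp_header(raw: bytes = bytes(64)):
    if len(raw) < SFPHeader.size():
        return None  # a caller would ignore packets shorter than the header
    hdr = SFPHeader.from_buffer_copy(raw)
    # (flow, tid) is the reassembly key; FRAG/TOTFRGAS drive completion checks.
    return (hdr.SFP_FLOW, hdr.SFP_TID), hdr.SFP_FRAG, hdr.SFP_TOTFRGAS
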
class DataTag(ctypes.Structure):
    """Structure representing a generic data tag (8 bytes)."""

    _pack_ = 1
    _fields_ = [
        ("ID", ctypes.c_uint8 * 2),
        ("VALID", ctypes.c_uint8),
        ("VERSION", ctypes.c_uint8),
        ("SIZE", ctypes.c_uint32),
    ]

    @classmethod
    def size(cls):
        return ctypes.sizeof(cls)


class HeaderData(ctypes.Structure):
    """Structure representing the HEADER_DATA block (~48 bytes)."""

    _pack_ = 1
    _fields_ = [
        ("TYPE", ctypes.c_uint8),
        ("SUBTYPE", ctypes.c_uint8),
        ("LOGCOLORS", ctypes.c_uint8),
        ("IMG_RESERVED", ctypes.c_uint8),
        ("PRODINFO", ctypes.c_uint32 * 2),
        ("TOD", ctypes.c_uint16 * 2),
        ("RESERVED", ctypes.c_uint32),
        ("FCOUNTER", ctypes.c_uint32),
        ("TIMETAG", ctypes.c_uint32),
        ("NOMINALFRATE", ctypes.c_uint16),
        ("FRAMETAG", ctypes.c_uint16),
        ("OX", ctypes.c_uint16),
        ("OY", ctypes.c_uint16),
        ("DX", ctypes.c_uint16),
        ("DY", ctypes.c_uint16),
        ("STRIDE", ctypes.c_uint16),
        ("BPP", ctypes.c_uint8),
        ("COMP", ctypes.c_uint8),
        ("SPARE", ctypes.c_uint16),
        ("PALTYPE", ctypes.c_uint8),
        ("GAP", ctypes.c_uint8),
    ]

    @classmethod
    def size(cls):
        return ctypes.sizeof(cls)


class GeoData(ctypes.Structure):
    """Structure representing the GEO_DATA block (~80 bytes). Assumes specific types."""

    _pack_ = 1
    _fields_ = [
        ("INVMASK", ctypes.c_uint32),
        ("ORIENTATION", ctypes.c_float),  # RADIANS (as per correction)
        ("LATITUDE", ctypes.c_float),  # Degrees (as assumed by documentation/structure)
        ("LONGITUDE", ctypes.c_float),  # Degrees (as assumed by documentation/structure)
        ("REF_X", ctypes.c_uint16),
        ("REF_Y", ctypes.c_uint16),
        ("SCALE_X", ctypes.c_float),
        ("SCALE_Y", ctypes.c_float),
        ("POI_ORIENTATION", ctypes.c_float),
        ("POI_LATITUDE", ctypes.c_float),
        ("POI_LONGITUDE", ctypes.c_float),
        ("POI_ALTITUDE", ctypes.c_float),
        ("POI_X", ctypes.c_uint16),
        ("POI_Y", ctypes.c_uint16),
        ("SPARE", ctypes.c_uint32 * 7),  # 28 bytes
    ]

    @classmethod
    def size(cls):
        return ctypes.sizeof(cls)


class ImageLeaderData(ctypes.Structure):
    """Represents the complete Image Leader data structure (~1320 bytes)."""

    _pack_ = 1
    _fields_ = [
        ("HEADER_TAG", DataTag),
        ("HEADER_DATA", HeaderData),
        ("GEO_TAG", DataTag),
        ("GEO_DATA", GeoData),
        ("RESERVED_TAG", DataTag),
        ("RESERVED_DATA", ctypes.c_uint8 * 128),
        ("CM_TAG", DataTag),
        ("COLOUR_MAP", ctypes.c_uint32 * 256),
        ("PIXEL_TAG", DataTag),
    ]

    @classmethod
    def size(cls):
        return ctypes.sizeof(cls)

    @staticmethod
    def get_reserved_data_size():
        return 128  # ctypes.sizeof(ctypes.c_uint8 * 128)

    @staticmethod
    def get_colour_map_size():
        return 1024  # ctypes.sizeof(ctypes.c_uint32 * 256)
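# Illustrative sketch (an addition, not part of the original receiver): with
# _pack_ = 1 the pixel payload begins immediately after PIXEL_TAG, so the sum
# of tag and data-block sizes equals ImageLeaderData.size(). This mirrors the
# arithmetic in UdpReceiver._calculate_pixel_data_offset below.
def _sketch_pixel_data_offset():
    offset = (
        DataTag.size() + HeaderData.size()  # HEADER_TAG + HEADER_DATA
        + DataTag.size() + GeoData.size()  # GEO_TAG + GEO_DATA
        + DataTag.size() + ImageLeaderData.get_reserved_data_size()  # RESERVED
        + DataTag.size() + ImageLeaderData.get_colour_map_size()  # COLOUR_MAP
        + DataTag.size()  # PIXEL_TAG
    )
    assert offset == ImageLeaderData.size()
    return offset
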
# --- UDP Receiver Class ---


class UdpReceiver:
    """
    Handles UDP reception, SFP fragment reassembly, metadata extraction, image
    callback invocation, ACK sending, and statistics tracking. Uses a
    ThreadPoolExecutor for processing completed images.
    """

    def __init__(
        self,
        app,  # Reference to the main App instance
        udp_socket,
        set_new_sar_image_callback,
        set_new_mfd_indices_image_callback,
    ):
        """
        Initializes the UDP receiver.

        Args:
            app (App): The main application instance (provides access to config, state).
            udp_socket (socket.socket): The bound UDP socket to receive from.
            set_new_sar_image_callback (callable): Callback for SAR images
                (norm_uint8, geo_info_radians).
            set_new_mfd_indices_image_callback (callable): Callback for MFD images
                (indices_uint8).
        """
        log_prefix = "[Receiver Init]"  # Specific prefix
        logging.debug(f"{log_prefix} Initializing UdpReceiver...")
        self.app = app
        self.udp_socket = udp_socket
        self.set_new_sar_image_callback = set_new_sar_image_callback
        self.set_new_mfd_image_callback = set_new_mfd_indices_image_callback

        # Reassembly state dictionaries
        self.image_fragments = {}  # { (flow, tid): {frag_num: total_frags_expected} }
        self.image_data_buffers = {}  # { (flow, tid): bytearray }
        logging.debug(f"{log_prefix} Reassembly dictionaries initialized.")

        # Binary dump configuration
        self.save_dumps = config.SAVE_BINARY_DUMPS
        self.dump_dir = config.DUMP_DIRECTORY
        if self.save_dumps:
            if not os.path.exists(self.dump_dir):
                try:
                    os.makedirs(self.dump_dir)
                    # INFO for directory creation
                    logging.info(
                        f"{log_prefix} Created binary dump directory: {self.dump_dir}"
                    )
                except OSError as e:
                    # ERROR if creation fails
                    logging.error(
                        f"{log_prefix} Failed to create dump directory '{self.dump_dir}': {e}. Disabling dumps."
                    )
                    self.save_dumps = False
            else:
                logging.debug(
                    f"{log_prefix} Binary dump directory '{self.dump_dir}' already exists."
                )
        else:
            logging.debug(f"{log_prefix} Binary dump saving is disabled.")

        # Thread pool for image processing
        try:
            self.executor = ThreadPoolExecutor(
                max_workers=config.MAX_WORKERS, thread_name_prefix="ImageProcessor"
            )
            # INFO for successful pool creation
            logging.info(
                f"{log_prefix} ThreadPoolExecutor created with max_workers={config.MAX_WORKERS}"
            )
        except Exception as pool_e:
            # CRITICAL if pool creation fails; likely fatal for processing.
            # Potentially raise an exception or exit the app here? For now, log and continue.
            logging.critical(
                f"{log_prefix} Failed to create ThreadPoolExecutor: {pool_e}",
                exc_info=True,
            )
            self.executor = None  # Ensure the executor is None

        # Statistics
        self.incomplete_sar_count = 0
        self.incomplete_mfd_count = 0
        self._stats_lock = threading.Lock()
        logging.debug(f"{log_prefix} Statistics counters and lock initialized.")
        logging.info(f"{log_prefix} UdpReceiver initialization complete.")

    def get_incomplete_counts(self):
        """Returns the current counts of incomplete/lost transactions (thread-safe)."""
        # No logging needed for a simple getter
        with self._stats_lock:
            sar_count = self.incomplete_sar_count
            mfd_count = self.incomplete_mfd_count
        return sar_count, mfd_count
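    # Illustrative sketch (an addition, not part of the original class): the
    # (flow, tid) key formatting used throughout the receiver's log messages.
    # Printable flow bytes (e.g. 0x53 'S', 0x4D 'M') are shown as characters.
    @staticmethod
    def _sketch_key_string(sfp_flow, sfp_tid):
        flow_char = chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}"
        return f"({flow_char},{sfp_tid})"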
    def receive_udp_data(self):
        """Main loop to listen for UDP packets, parse headers, and reassemble fragments."""
        log_prefix = "[Receiver Loop]"  # Prefix for general loop messages
        if not self.udp_socket or self.udp_socket.fileno() == -1:
            logging.critical(
                f"{log_prefix} Receiver thread cannot start: socket is invalid or closed."
            )
            return

        logging.info(
            f"{log_prefix} Receiver thread started and listening on {self.udp_socket.getsockname()}."
        )

        profiler = None
        if config.ENABLE_PROFILING:
            profiler = cProfile.Profile()
            profiler.enable()
            logging.info(f"{log_prefix} Receiver profiling enabled.")

        while True:
            if self.app.state.shutting_down:
                logging.info(f"{log_prefix} Shutdown detected. Exiting thread.")
                break
            try:
                data, addr = self.udp_socket.recvfrom(65535)
                if not data:
                    logging.warning(
                        f"{log_prefix} Received empty UDP packet from {addr}. Continuing."
                    )
                    continue
            except socket.timeout:
                logging.debug(f"{log_prefix} UDP socket timeout.")
                continue
            except OSError as e:
                if self.app.state.shutting_down:
                    logging.info(
                        f"{log_prefix} UDP socket error during recvfrom (expected during shutdown): {e}. Exiting."
                    )
                else:
                    logging.error(
                        f"{log_prefix} UDP socket error during recvfrom: {e}. Exiting thread."
                    )
                break
            except Exception:
                logging.exception(f"{log_prefix} Unexpected error during UDP recvfrom:")
                time.sleep(0.01)
                continue

            try:
                header_size = SFPHeader.size()
                if len(data) < header_size:
                    logging.warning(
                        f"{log_prefix} Rcvd packet too small ({len(data)} bytes) for SFP header from {addr}. Ignoring."
                    )
                    continue
                try:
                    header = SFPHeader.from_buffer_copy(data)
                except (ValueError, TypeError) as parse_err:
                    logging.error(
                        f"{log_prefix} Failed to parse SFP header from {addr}: {parse_err}. Ignoring packet."
                    )
                    continue

                sfp_flow = header.SFP_FLOW
                sfp_tid = header.SFP_TID
                sfp_frag = header.SFP_FRAG
                sfp_totfrgas = header.SFP_TOTFRGAS
                sfp_plsize = header.SFP_PLSIZE
                sfp_ploffset = header.SFP_PLOFFSET
                sfp_flags = header.SFP_FLAGS
                sfp_totsize = header.SFP_TOTSIZE

                image_key = (sfp_flow, sfp_tid)
                flow_char = (
                    chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}"
                )
                key_str = f"({flow_char},{sfp_tid})"

                if config.DEBUG_RECEIVER_PACKETS:
                    logging.debug(
                        f"Rcvd: Key={key_str}, Frag={sfp_frag}/{sfp_totfrgas}, "
                        f"Size={sfp_plsize}, Offset={sfp_ploffset}, Flags=0x{sfp_flags:02X}, TotalSize={sfp_totsize}"
                    )

                if sfp_flags & 0x01:
                    logging.debug(
                        f"{log_prefix} ACK Request received for Key={key_str}. Triggering ACK send."
                    )
                    self.send_ack(addr, data[:header_size])

                if self.save_dumps:
                    dump_filename = (
                        f"{self.dump_dir}/pkt_{flow_char}_{sfp_tid}_{sfp_frag}.bin"
                    )
                    self.save_binary_dump(data, dump_filename)

                if sfp_totfrgas == 0 or sfp_totfrgas > 60000:
                    logging.warning(
                        f"{log_prefix} Key={key_str}: Invalid total fragments count {sfp_totfrgas}. Ignoring packet."
                    )
                    continue

                MAX_EXPECTED_IMG_SIZE = 100 * 1024 * 1024
                MIN_EXPECTED_IMG_SIZE = ImageLeaderData.size()
                if (
                    sfp_totsize < MIN_EXPECTED_IMG_SIZE
                    or sfp_totsize > MAX_EXPECTED_IMG_SIZE
                ):
                    logging.error(
                        f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Invalid total image size {sfp_totsize} "
                        f"(Limits: {MIN_EXPECTED_IMG_SIZE}-{MAX_EXPECTED_IMG_SIZE}). Ignoring packet/transaction."
                    )
                    continue

                if sfp_frag == 0:
                    self._cleanup_lingering_transactions(sfp_flow, sfp_tid)
                    # New transaction start is always logged at DEBUG level.
                    logging.debug(
                        f"[Receiver Reassembly] New TX Start: Key={key_str}, Total Frags={sfp_totfrgas}, "
                        f"Total Size={sfp_totsize}, From={addr}"
                    )
                    self.image_fragments[image_key] = {}
                    try:
                        if config.DEBUG_RECEIVER_REASSEMBLY:
                            logging.debug(
                                f"{log_prefix} Allocating {sfp_totsize} byte buffer for Key={key_str}"
                            )
                        self.image_data_buffers[image_key] = bytearray(sfp_totsize)
                        if config.DEBUG_RECEIVER_REASSEMBLY:
                            logging.debug(
                                f"{log_prefix} Buffer allocated successfully for Key={key_str}."
                            )
                    except (MemoryError, ValueError) as alloc_e:
                        logging.error(
                            f"{log_prefix} Failed to allocate buffer for Key={key_str} (Size: {sfp_totsize}): {alloc_e}. Ignoring transaction."
                        )
                        self.image_fragments.pop(image_key, None)
                        continue

                if (
                    image_key not in self.image_fragments
                    or image_key not in self.image_data_buffers
                ):
                    logging.warning(
                        f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Received fragment for inactive/cleaned transaction. Ignoring."
                    )
                    continue

                self.image_fragments[image_key][sfp_frag] = sfp_totfrgas

                payload = data[header_size:]
                payload_len = len(payload)
                bytes_to_copy = min(sfp_plsize, payload_len)
                if sfp_plsize > payload_len:
                    logging.warning(
                        f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Declared PLSIZE {sfp_plsize} > Actual Payload {payload_len}. Using actual length {payload_len}."
                    )

                buff = self.image_data_buffers[image_key]
                buff_len = len(buff)
                start_offset = sfp_ploffset
                end_offset = start_offset + bytes_to_copy
                if start_offset >= buff_len:
                    logging.error(
                        f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Payload offset {start_offset} is out of buffer bounds ({buff_len}). Ignoring fragment data."
                    )
                    continue
                if end_offset > buff_len:
                    bytes_to_copy = buff_len - start_offset
                    logging.warning(
                        f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Payload write (end {end_offset}) exceeds buffer length ({buff_len}). "
                        f"Truncating copy to {bytes_to_copy} bytes."
                    )
                if bytes_to_copy > 0:
                    try:
                        if config.DEBUG_RECEIVER_REASSEMBLY:
                            logging.debug(
                                f"{log_prefix} Copying {bytes_to_copy} bytes to buffer for Key={key_str}, Frag={sfp_frag}, Offset={start_offset}."
                            )
                        buff[start_offset : start_offset + bytes_to_copy] = payload[
                            :bytes_to_copy
                        ]
                    except IndexError as copy_idx_e:
                        logging.error(
                            f"{log_prefix} Key={key_str}, Frag={sfp_frag}: IndexError copying payload slice (Start:{start_offset}, End:{start_offset + bytes_to_copy}, BufLen:{buff_len}): {copy_idx_e}"
                        )
                        continue
                    except Exception as copy_e:
                        logging.error(
                            f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Error copying payload to buffer: {copy_e}"
                        )
                        continue

                received_frags_count = len(self.image_fragments[image_key])
                expected_total = sfp_totfrgas
                if received_frags_count == expected_total:
                    # Completion is always logged at DEBUG level.
                    logging.debug(
                        f"[Receiver Reassembly] Complete: Key={key_str}. Rcvd {received_frags_count}/{expected_total} frags. Submitting for processing."
                    )
                    completed_buffer = self.image_data_buffers[image_key]
                    if self.executor:
                        try:
                            if config.DEBUG_RECEIVER_REASSEMBLY:
                                logging.debug(
                                    f"{log_prefix} Submitting task to executor for Key={key_str}."
                                )
                            self.executor.submit(
                                self.process_completed_image,
                                sfp_flow,
                                sfp_tid,
                                completed_buffer,
                            )
                            self.image_data_buffers.pop(image_key, None)
                            if config.DEBUG_RECEIVER_REASSEMBLY:
                                logging.debug(
                                    f"{log_prefix} Buffer removed from receiver tracking for Key={key_str}."
                                )
                        except Exception as submit_e:
                            logging.exception(
                                f"{log_prefix} Failed to submit task to executor for Key={key_str}: {submit_e}"
                            )
                            self.cleanup_transaction(image_key)
                    else:
                        logging.error(
                            f"{log_prefix} Cannot submit task for Key={key_str}: ThreadPoolExecutor is not available."
                        )
                        self.cleanup_transaction(image_key)
                elif received_frags_count > expected_total:
                    logging.warning(
                        f"{log_prefix} Key={key_str}: Received more fragments ({received_frags_count}) than expected ({expected_total})."
                    )

            except Exception as e:
                key_info_str = key_str if "key_str" in locals() else "Key=Unknown"
                logging.exception(
                    f"{log_prefix} Error processing UDP packet data {key_info_str} from {addr}: {e}"
                )

        logging.info(f"{log_prefix} UDP data receiving thread stopped.")

        if config.ENABLE_PROFILING and profiler:
            profiler.disable()
            logging.info(f"{log_prefix} Receiver profiling disabled. Saving stats...")
Saving stats...") try: stats_filename = ( f"receiver_profile_{time.strftime('%Y%m%d_%H%M%S')}.prof" ) stats = pstats.Stats(profiler) stats.sort_stats(pstats.SortKey.CUMULATIVE) stats.dump_stats(stats_filename) logging.info( f"{log_prefix} Receiver profile stats saved to {stats_filename}" ) except Exception as profile_e: logging.exception( f"{log_prefix} Error saving/printing profiler stats: {profile_e}" ) def _cleanup_lingering_transactions(self, current_flow, current_tid): """Checks for and cleans up old transactions for the same flow.""" log_prefix = "[Receiver Cleanup]" # Specific prefix keys_to_remove = [] current_flow_char = ( chr(current_flow) if 32 <= current_flow <= 126 else f"0x{current_flow:02X}" ) new_key_str = f"({current_flow_char},{current_tid})" for existing_key in list(self.image_fragments.keys()): existing_flow, existing_tid = existing_key if existing_flow == current_flow and existing_tid != current_tid: existing_flow_char = ( chr(existing_flow) if 32 <= existing_flow <= 126 else f"0x{existing_flow:02X}" ) lingering_key_str = f"({existing_flow_char},{existing_tid})" # WARNING for detecting and cleaning up lingering transaction logging.warning( f"{log_prefix} Detected lingering transaction {lingering_key_str} while starting {new_key_str}. Cleaning up." ) # Check if incomplete (DEBUG level for details) try: fragments = self.image_fragments[existing_key] received_count = len(fragments) expected_count = -1 if fragments: expected_count = next(iter(fragments.values())) if expected_count > 0 and received_count < expected_count: logging.debug( f"{log_prefix} Lingering transaction {lingering_key_str} was incomplete ({received_count}/{expected_count}). Marking as lost." ) with self._stats_lock: if existing_flow == 0x53: self.incomplete_sar_count += 1 # 'S' elif existing_flow == 0x4D: self.incomplete_mfd_count += 1 # 'M' else: logging.debug( f"{log_prefix} Lingering transaction {lingering_key_str} state: {received_count}/{expected_count}. Proceeding with cleanup." ) except Exception as e: # Log exception during check but still remove logging.exception( f"{log_prefix} Error checking state of lingering transaction {lingering_key_str}: {e}" ) keys_to_remove.append(existing_key) if keys_to_remove: logging.debug( f"{log_prefix} Cleaning up {len(keys_to_remove)} lingering keys: {keys_to_remove}" ) for key_to_remove in keys_to_remove: self.cleanup_transaction(key_to_remove) # Logs internally def _calculate_pixel_data_offset(self, image_leader): """Calculates the expected byte offset to the start of the pixel data.""" log_prefix = "[Receiver Pixel Offset]" # Specific prefix try: # Calculate offset by summing known structure sizes offset = ( DataTag.size() + HeaderData.size() # Header Tag & Data + DataTag.size() + GeoData.size() # Geo Tag & Data + DataTag.size() + ImageLeaderData.get_reserved_data_size() # Reserved Tag & Data + DataTag.size() + ImageLeaderData.get_colour_map_size() # CM Tag & Data + DataTag.size() # Pixel Tag size ) # DEBUG log for calculated offset logging.debug( f"{log_prefix} Calculated standard pixel data offset: {offset} bytes." 
    def _cleanup_lingering_transactions(self, current_flow, current_tid):
        """Checks for and cleans up old transactions for the same flow."""
        log_prefix = "[Receiver Cleanup]"  # Specific prefix
        keys_to_remove = []
        current_flow_char = (
            chr(current_flow) if 32 <= current_flow <= 126 else f"0x{current_flow:02X}"
        )
        new_key_str = f"({current_flow_char},{current_tid})"
        for existing_key in list(self.image_fragments.keys()):
            existing_flow, existing_tid = existing_key
            if existing_flow == current_flow and existing_tid != current_tid:
                existing_flow_char = (
                    chr(existing_flow)
                    if 32 <= existing_flow <= 126
                    else f"0x{existing_flow:02X}"
                )
                lingering_key_str = f"({existing_flow_char},{existing_tid})"
                # WARNING for detecting and cleaning up a lingering transaction
                logging.warning(
                    f"{log_prefix} Detected lingering transaction {lingering_key_str} while starting {new_key_str}. Cleaning up."
                )
                # Check whether it was incomplete (details at DEBUG level)
                try:
                    fragments = self.image_fragments[existing_key]
                    received_count = len(fragments)
                    expected_count = -1
                    if fragments:
                        expected_count = next(iter(fragments.values()))
                    if expected_count > 0 and received_count < expected_count:
                        logging.debug(
                            f"{log_prefix} Lingering transaction {lingering_key_str} was incomplete ({received_count}/{expected_count}). Marking as lost."
                        )
                        with self._stats_lock:
                            if existing_flow == 0x53:  # 'S'
                                self.incomplete_sar_count += 1
                            elif existing_flow == 0x4D:  # 'M'
                                self.incomplete_mfd_count += 1
                    else:
                        logging.debug(
                            f"{log_prefix} Lingering transaction {lingering_key_str} state: {received_count}/{expected_count}. Proceeding with cleanup."
                        )
                except Exception as e:
                    # Log the exception during the check but still remove the key
                    logging.exception(
                        f"{log_prefix} Error checking state of lingering transaction {lingering_key_str}: {e}"
                    )
                keys_to_remove.append(existing_key)

        if keys_to_remove:
            logging.debug(
                f"{log_prefix} Cleaning up {len(keys_to_remove)} lingering keys: {keys_to_remove}"
            )
            for key_to_remove in keys_to_remove:
                self.cleanup_transaction(key_to_remove)  # Logs internally

    def _calculate_pixel_data_offset(self, image_leader):
        """Calculates the expected byte offset to the start of the pixel data."""
        log_prefix = "[Receiver Pixel Offset]"  # Specific prefix
        try:
            # Calculate the offset by summing known structure sizes
            offset = (
                DataTag.size() + HeaderData.size()  # Header Tag & Data
                + DataTag.size() + GeoData.size()  # Geo Tag & Data
                + DataTag.size() + ImageLeaderData.get_reserved_data_size()  # Reserved Tag & Data
                + DataTag.size() + ImageLeaderData.get_colour_map_size()  # CM Tag & Data
                + DataTag.size()  # Pixel Tag size
            )
            # DEBUG log for the calculated offset
            logging.debug(
                f"{log_prefix} Calculated standard pixel data offset: {offset} bytes."
            )
            return offset
        except Exception:
            # EXCEPTION for calculation errors (unlikely with static sizes)
            logging.exception(f"{log_prefix} Error calculating pixel data offset:")
            # Fallback using the estimated leader size - WARNING for using the fallback
            fallback_offset = ImageLeaderData.size()
            logging.warning(
                f"{log_prefix} Using fallback pixel data offset estimate: {fallback_offset}"
            )
            return fallback_offset
    def reassemble_sar_image(self, image_leader, image_data, log_prefix):
        """
        Extracts SAR metadata and pixel data (normalized uint8) from the buffer.
        Handles the corrected radian interpretation for orientation.

        Args:
            image_leader (ImageLeaderData): Parsed leader structure.
            image_data (bytearray): The complete image data buffer.
            log_prefix (str): The logging prefix (e.g., "[Worker SAR/TID]").

        Returns:
            tuple: (normalized_image_uint8, geo_info_radians_dict) or None on error.
        """
        # Inherits log_prefix (e.g., "[Worker SAR/TID]")
        logging.debug(f"{log_prefix} Entering reassemble_sar_image.")
        fcounter = image_leader.HEADER_DATA.FCOUNTER
        image_key_log = f"SAR(FCNT={fcounter})"  # For specific logs within this func
        try:
            # 1. Extract and validate HeaderData - DEBUG for details
            hdr_d = image_leader.HEADER_DATA
            dx, dy, bpp = int(hdr_d.DX), int(hdr_d.DY), int(hdr_d.BPP)
            stride_pixels, pal_type = int(hdr_d.STRIDE), int(hdr_d.PALTYPE)
            logging.debug(
                f"{log_prefix} {image_key_log}: Extracted HeaderData: DX={dx}, DY={dy}, BPP={bpp}, Stride(px)={stride_pixels}, PALTYPE={pal_type}"
            )
            if (
                dx <= 0
                or dy <= 0
                or bpp not in [1, 2]
                or stride_pixels < dx
                or pal_type != 0
            ):
                # ERROR for invalid metadata
                logging.error(
                    f"{log_prefix} {image_key_log}: Invalid SAR metadata. Cannot reassemble."
                )
                return None

            pixel_dtype = np.uint8 if bpp == 1 else np.uint16
            pixel_bytes = bpp

            # 2. Calculate pixel offset - DEBUG for offset calc
            pixel_data_offset = self._calculate_pixel_data_offset(
                image_leader
            )  # Logs internally
            logging.debug(
                f"{log_prefix} {image_key_log}: Using pixel data offset: {pixel_data_offset}"
            )

            # 3. Validate offset and buffer size - DEBUG for validation steps
            available_data_length = len(image_data)
            logging.debug(
                f"{log_prefix} {image_key_log}: Validating offset ({pixel_data_offset}) vs buffer size ({available_data_length})."
            )
            if pixel_data_offset >= available_data_length:
                # ERROR if the offset is invalid
                logging.error(
                    f"{log_prefix} {image_key_log}: Pixel offset >= buffer size. Cannot extract pixel data."
                )
                return None
            minimum_required_core_bytes = dy * dx * pixel_bytes
            actual_pixel_bytes_available = available_data_length - pixel_data_offset
            if actual_pixel_bytes_available < minimum_required_core_bytes:
                # ERROR if there is insufficient data
                logging.error(
                    f"{log_prefix} {image_key_log}: Insufficient pixel data in buffer (Need min {minimum_required_core_bytes}, Found {actual_pixel_bytes_available})."
                )
                return None
            logging.debug(f"{log_prefix} {image_key_log}: Buffer size validated.")

            # 4. Create NumPy view - DEBUG for the view creation attempt
            try:
                stride_bytes = stride_pixels * pixel_bytes
                logging.debug(
                    f"{log_prefix} {image_key_log}: Creating NumPy view: Shape=({dy},{dx}), Dtype={pixel_dtype}, Offset={pixel_data_offset}, Strides=({stride_bytes},{pixel_bytes})"
                )
                sar_image_view = np.ndarray(
                    shape=(dy, dx),
                    dtype=pixel_dtype,
                    buffer=image_data,
                    offset=pixel_data_offset,
                    strides=(stride_bytes, pixel_bytes),
                )
                logging.debug(
                    f"{log_prefix} {image_key_log}: NumPy view created successfully."
                )
            except ValueError as ve:
                # ERROR for view creation failure
                logging.error(
                    f"{log_prefix} {image_key_log}: Failed to create SAR NumPy view (Shape/stride/offset mismatch?): {ve}"
                )
                return None

            # 5. Shutdown check
            if self.app.state.shutting_down:
                logging.info(
                    f"{log_prefix} Shutdown detected before normalization. Exiting reassemble."
                )
                return None

            # 6. Normalize the image view to uint8 - DEBUG for the normalization step
            logging.debug(
                f"{log_prefix} {image_key_log}: Normalizing SAR view to uint8..."
            )
            normalized_image_uint8 = normalize_image(
                sar_image_view, target_type=np.uint8
            )  # Logs internally
            if normalized_image_uint8 is None:
                # ERROR for normalization failure
                logging.error(
                    f"{log_prefix} {image_key_log}: SAR normalization to uint8 failed."
                )
                return None
            logging.debug(
                f"{log_prefix} {image_key_log}: Normalization complete (Shape: {normalized_image_uint8.shape})."
            )

            # 7. Extract and convert geo info (RADIANS) - use a specific prefix
            geo_log_prefix = "[Geo extract]"
            geo_info_radians = {"valid": False}
            try:
                geo_d = image_leader.GEO_DATA
                logging.debug(
                    f"{geo_log_prefix} {image_key_log}: Extracting and interpreting GeoData (Orientation as RADIANS)..."
                )
                # Read orientation directly as RADIANS (corrected)
                orient_rad_raw = float(geo_d.ORIENTATION)
                # Read lat/lon as DEGREES (from the structure assumption) and convert
                lat_deg_raw = float(geo_d.LATITUDE)
                lon_deg_raw = float(geo_d.LONGITUDE)
                lat_rad = math.radians(lat_deg_raw)
                lon_rad = math.radians(lon_deg_raw)
                # Store RADIANS internally
                geo_info_radians["lat"] = lat_rad
                geo_info_radians["lon"] = lon_rad
                geo_info_radians["orientation"] = orient_rad_raw
                geo_info_radians["ref_x"] = int(geo_d.REF_X)
                geo_info_radians["ref_y"] = int(geo_d.REF_Y)
                geo_info_radians["scale_x"] = float(geo_d.SCALE_X)
                geo_info_radians["scale_y"] = float(geo_d.SCALE_Y)
                geo_info_radians["width_px"] = dx
                geo_info_radians["height_px"] = dy

                # Validate scale - DEBUG for the validation result
                if geo_info_radians["scale_x"] > 0 and geo_info_radians["scale_y"] > 0:
                    geo_info_radians["valid"] = True
                    # Log extracted values (DEBUG, controlled by DEBUG_RECEIVER_GEO)
                    orient_deg_for_log = math.degrees(orient_rad_raw)
                    logging.debug(
                        f"{geo_log_prefix} {image_key_log}: GeoInfo Extracted: Valid={geo_info_radians['valid']}, "
                        f"Lat={lat_deg_raw:.4f}deg({lat_rad:.6f}rad), Lon={lon_deg_raw:.4f}deg({lon_rad:.6f}rad), "
                        f"Orient={orient_deg_for_log:.2f}deg({orient_rad_raw:.6f}rad), "
                        f"Ref=({geo_info_radians['ref_x']},{geo_info_radians['ref_y']}), "
                        f"Scale=({geo_info_radians['scale_x']:.3f},{geo_info_radians['scale_y']:.3f}), "
                        f"Size=({dx},{dy})"
                    )
                else:
                    # WARNING for invalid scale marking Geo invalid
                    logging.warning(
                        f"{geo_log_prefix} {image_key_log}: Invalid scale values found (ScaleX={geo_info_radians['scale_x']}, ScaleY={geo_info_radians['scale_y']}). GeoInfo marked invalid."
                    )
                    geo_info_radians["valid"] = False
            except OverflowError as oe:
                # ERROR for math errors
                logging.error(
                    f"{geo_log_prefix} {image_key_log}: Math OverflowError during GeoData conversion: {oe}. GeoInfo marked invalid."
                )
                geo_info_radians = {"valid": False}
            except Exception as e:
                # Keep EXCEPTION for other geo errors
                logging.exception(
                    f"{geo_log_prefix} {image_key_log}: Failed during GeoData extraction/conversion: {e}"
                )
                geo_info_radians = {"valid": False}

            # 8. Return results - DEBUG for successful exit
            logging.debug(f"{log_prefix} Exiting reassemble_sar_image successfully.")
            return normalized_image_uint8.copy(), geo_info_radians

        except Exception as e:
            # Keep EXCEPTION for unexpected errors in reassembly
            logging.exception(
                f"{log_prefix} {image_key_log}: Unexpected error during SAR reassembly: {e}"
            )
            return None
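    # Illustrative sketch (an addition, not part of the original class): the
    # zero-copy strided-view technique used above. A (dy, dx) view is mapped
    # onto a padded row layout (stride_pixels >= dx) without copying pixels;
    # only the final .copy() detaches the result from the network buffer.
    @staticmethod
    def _sketch_strided_view(dy=2, dx=3, stride_pixels=4, offset=0):
        buf = bytearray(offset + dy * stride_pixels)  # 1 byte per pixel (BPP=1)
        view = np.ndarray(
            shape=(dy, dx),
            dtype=np.uint8,
            buffer=buf,
            offset=offset,
            strides=(stride_pixels, 1),  # (row stride in bytes, 1 byte per column)
        )
        return view.copy()  # safe to use after buf is released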
    def reassemble_mfd_image(self, image_leader, image_data, log_prefix):
        """
        Extracts MFD metadata and pixel indices (uint8) from the buffer.

        Args:
            image_leader (ImageLeaderData): Parsed leader structure.
            image_data (bytearray): The complete image data buffer.
            log_prefix (str): The logging prefix (e.g., "[Worker MFD/TID]").

        Returns:
            numpy.ndarray: Copy of the 2D uint8 index array, or None on error.
        """
        # Inherits log_prefix (e.g., "[Worker MFD/TID]")
        logging.debug(f"{log_prefix} Entering reassemble_mfd_image.")
        fcounter = image_leader.HEADER_DATA.FCOUNTER
        image_key_log = f"MFD(FCNT={fcounter})"  # For specific logs
        try:
            # 1. Extract and validate HeaderData - DEBUG for details
            hdr_d = image_leader.HEADER_DATA
            dx, dy, bpp = int(hdr_d.DX), int(hdr_d.DY), int(hdr_d.BPP)
            stride_pixels, pal_type = int(hdr_d.STRIDE), int(hdr_d.PALTYPE)
            logging.debug(
                f"{log_prefix} {image_key_log}: Extracted HeaderData: DX={dx}, DY={dy}, BPP={bpp}, Stride(px)={stride_pixels}, PALTYPE={pal_type}"
            )
            # MFD must have BPP=1 and PALTYPE=1
            if dx <= 0 or dy <= 0 or bpp != 1 or stride_pixels < dx or pal_type != 1:
                # ERROR for invalid metadata
                logging.error(
                    f"{log_prefix} {image_key_log}: Invalid MFD metadata. Cannot reassemble."
                )
                return None

            pixel_bytes = 1

            # 2. Calculate pixel offset - DEBUG for offset calc
            pixel_data_offset = self._calculate_pixel_data_offset(
                image_leader
            )  # Logs internally
            logging.debug(
                f"{log_prefix} {image_key_log}: Using pixel data offset: {pixel_data_offset}"
            )

            # 3. Validate offset and buffer size - DEBUG for validation steps
            available_data_length = len(image_data)
            logging.debug(
                f"{log_prefix} {image_key_log}: Validating offset ({pixel_data_offset}) vs buffer size ({available_data_length})."
            )
            if pixel_data_offset >= available_data_length:
                # ERROR if the offset is invalid
                logging.error(
                    f"{log_prefix} {image_key_log}: Pixel offset >= buffer size. Cannot extract pixel data."
                )
                return None
            minimum_required_core_bytes = dy * dx * pixel_bytes
            actual_pixel_bytes_available = available_data_length - pixel_data_offset
            if actual_pixel_bytes_available < minimum_required_core_bytes:
                # ERROR if there is insufficient data
                logging.error(
                    f"{log_prefix} {image_key_log}: Insufficient pixel data in buffer (Need min {minimum_required_core_bytes}, Found {actual_pixel_bytes_available})."
                )
                return None
            logging.debug(f"{log_prefix} {image_key_log}: Buffer size validated.")

            # 4. Create NumPy view for indices (uint8) - DEBUG for view creation
            try:
                stride_bytes = stride_pixels * pixel_bytes
                logging.debug(
                    f"{log_prefix} {image_key_log}: Creating NumPy view: Shape=({dy},{dx}), Dtype=uint8, Offset={pixel_data_offset}, Strides=({stride_bytes},{pixel_bytes})"
                )
                mfd_index_view = np.ndarray(
                    shape=(dy, dx),
                    dtype=np.uint8,
                    buffer=image_data,
                    offset=pixel_data_offset,
                    strides=(stride_bytes, pixel_bytes),
                )
                logging.debug(
                    f"{log_prefix} {image_key_log}: NumPy view created successfully."
                )
            except ValueError as ve:
                # ERROR for view creation failure
                logging.error(
                    f"{log_prefix} {image_key_log}: Failed to create MFD index NumPy view (Shape/stride/offset mismatch?): {ve}"
                )
                return None

            # 5. Shutdown check
            if self.app.state.shutting_down:
                logging.info(
                    f"{log_prefix} Shutdown detected after MFD view creation. Exiting reassemble."
                )
                return None

            # 6. Return a COPY of the index view - DEBUG for successful exit
            logging.debug(
                f"{log_prefix} Exiting reassemble_mfd_image successfully with index copy."
            )
            return mfd_index_view.copy()

        except Exception as e:
            # Keep EXCEPTION for unexpected errors
            logging.exception(
                f"{log_prefix} {image_key_log}: Unexpected error during MFD index reassembly: {e}"
            )
            return None
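    # Illustrative sketch (an addition, not part of the original class): why
    # the reassemble_* methods return view.copy(). A plain view aliases the
    # shared bytearray, so later writes to the buffer would silently alter the
    # image handed to the application callbacks.
    @staticmethod
    def _sketch_view_vs_copy():
        buf = bytearray(4)
        view = np.ndarray(shape=(4,), dtype=np.uint8, buffer=buf)
        snapshot = view.copy()
        buf[0] = 255  # mutate the underlying buffer
        return view[0], snapshot[0]  # -> (255, 0): only the copy is stable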
    def save_binary_dump(self, data, filename):
        """Saves raw packet data to a binary file if enabled in config."""
        log_prefix = "[Receiver Dump]"  # Specific prefix
        if not self.save_dumps:
            return  # Check again
        try:
            with open(filename, "wb") as f:
                f.write(data)
            # DEBUG for a successful save
            logging.debug(
                f"{log_prefix} Saved binary dump to {filename} ({len(data)} bytes)"
            )
        except Exception as e:
            # ERROR for save failure
            logging.error(f"{log_prefix} Error saving binary dump to {filename}: {e}")

    def send_ack(self, dest_addr, original_header_bytes):
        """Sends an SFP ACK packet back to the sender using the provided address."""
        log_prefix = "[Receiver ACK]"  # Specific prefix
        try:
            header_size = SFPHeader.size()
            if len(original_header_bytes) < header_size:
                logging.error(
                    f"{log_prefix} ACK fail to {dest_addr}: Original header too short ({len(original_header_bytes)} bytes)."
                )
                return

            ack_header = bytearray(original_header_bytes[:header_size])

            # Determine flow type and window size - DEBUG for internal steps
            flow_offset = SFPHeader.get_field_offset("SFP_FLOW")
            win_offset = SFPHeader.get_field_offset("SFP_WIN")
            flags_offset = SFPHeader.get_field_offset("SFP_FLAGS")
            tid_offset = SFPHeader.get_field_offset("SFP_TID")
            if (
                flow_offset == -1
                or win_offset == -1
                or flags_offset == -1
                or tid_offset == -1
            ):
                logging.error(
                    f"{log_prefix} Failed to get field offsets for ACK construction. Cannot send."
                )
                return

            sfp_flow = ack_header[flow_offset]
            flow_char = chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}"
            sfp_tid = ack_header[tid_offset]  # For logging

            window_size = 0
            flow_name = f"Unknown({flow_char})"
            if flow_char == "M":
                window_size = config.ACK_WINDOW_SIZE_MFD
                flow_name = "MFD"
            elif flow_char == "S":
                window_size = config.ACK_WINDOW_SIZE_SAR
                flow_name = "SAR"
            else:
                logging.warning(
                    f"{log_prefix} ACK for unknown SFP_FLOW {flow_char}. Using Window=0."
                )
            window_size = max(0, min(window_size, 255))
            logging.debug(
                f"{log_prefix} Determined Flow={flow_name}, TID={sfp_tid}, Window Size={window_size}"
            )

            # Modify SFP_WIN
            ack_header[win_offset] = window_size

            # Modify SFP_FLAGS (set ACK=1, clear ACK_REQ=0)
            original_flags = ack_header[flags_offset]
            new_flags = (original_flags | 0x01) & ~0x02  # Set bit 0, clear bit 1
            ack_header[flags_offset] = new_flags
            logging.debug(
                f"{log_prefix} Original Flags=0x{original_flags:02X}, New Flags=0x{new_flags:02X}"
            )

            # Send the ACK - DEBUG for the send action
            if self.udp_socket and self.udp_socket.fileno() != -1:
                logging.debug(
                    f"{log_prefix} Sending ACK ({len(ack_header)}b) to {dest_addr}..."
                )
                bytes_sent = self.udp_socket.sendto(ack_header, dest_addr)
                logging.debug(
                    f"{log_prefix} ACK ({bytes_sent}b) sent successfully to {dest_addr} for TID={sfp_tid}, Flow={flow_name}, Win={window_size}"
                )
            else:
                # WARNING if the socket was closed before sending
                logging.warning(
                    f"{log_prefix} Cannot send ACK to {dest_addr}: UDP socket is closed or invalid."
                )
        except Exception as e:
            # Keep EXCEPTION for unexpected errors during ACK
            logging.exception(
                f"{log_prefix} Unexpected error sending ACK to {dest_addr}: {e}"
            )
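    # Illustrative sketch (an addition, not part of the original class): the
    # flag arithmetic applied in send_ack, isolated for clarity. Bit 0 is
    # treated as the ACK bit and bit 1 as the ACK-request bit, as the comments
    # in send_ack assume.
    @staticmethod
    def _sketch_ack_flags(original_flags):
        return (original_flags | 0x01) & ~0x02 & 0xFF  # set ACK, clear ACK_REQ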
    def process_completed_image(self, sfp_flow, sfp_tid, image_data):
        """Processes a fully reassembled image buffer in a worker thread."""
        image_key = (sfp_flow, sfp_tid)
        flow_char = chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}"
        log_prefix = f"[Worker {flow_char}/{sfp_tid}]"
        logging.debug(
            f"{log_prefix} Starting image processing task. Buffer size: {len(image_data) if image_data else 'None'}"
        )
        try:
            if self.app.state.shutting_down:
                logging.info(f"{log_prefix} Shutdown detected at start. Aborting.")
                return
            if not image_data or len(image_data) < ImageLeaderData.size():
                logging.error(f"{log_prefix} Invalid/incomplete data buffer. Aborting.")
                return

            try:
                logging.debug(f"{log_prefix} Parsing ImageLeaderData...")
                image_leader = ImageLeaderData.from_buffer(image_data)
                # Access a field to force a potential error
                _ = image_leader.HEADER_TAG.VALID
                logging.debug(f"{log_prefix} ImageLeaderData parsed.")
            except Exception as parse_err:
                logging.error(
                    f"{log_prefix} Failed to parse ImageLeaderData: {parse_err}. Aborting."
                )
                return

            # Determine the image type (extra debug log for type verification)
            pal_type = image_leader.HEADER_DATA.PALTYPE
            hdr_type = image_leader.HEADER_DATA.TYPE
            bpp = image_leader.HEADER_DATA.BPP
            logging.debug(
                f"{log_prefix} Determining image type - PALTYPE={pal_type}, Header TYPE={hdr_type}, BPP={bpp}, Flow='{flow_char}'"
            )

            # --- MFD branch ---
            if pal_type == 1:
                logging.debug(
                    f"{log_prefix} Identified as MFD type based on PALTYPE=1."
                )
                if bpp != 1:
                    logging.warning(
                        f"{log_prefix} MFD data has BPP={bpp} (expected 1)."
                    )
                if flow_char != "M":
                    logging.warning(
                        f"{log_prefix} MFD data received on unexpected flow '{flow_char}'."
                    )
                if self.app.state.shutting_down:
                    logging.info(
                        f"{log_prefix} Shutdown detected before MFD reassembly."
                    )
                    return

                logging.debug(f"{log_prefix} CALLING reassemble_mfd_image...")
                mfd_indices = self.reassemble_mfd_image(
                    image_leader, image_data, log_prefix
                )
                logging.debug(
                    f"{log_prefix} RETURNED from reassemble_mfd_image. Result is None: {mfd_indices is None}"
                )
                if self.app.state.shutting_down:
                    logging.info(
                        f"{log_prefix} Shutdown detected after MFD reassembly."
                    )
                    return
                if mfd_indices is not None:
                    logging.debug(
                        f"{log_prefix} MFD indices reassembled. Calling App callback (handle_new_mfd_data)..."
                    )
                    # This is the call that was observed not to fire while debugging
                    self.set_new_mfd_image_callback(
                        mfd_indices
                    )  # Original name passed in __init__
                    logging.debug(f"{log_prefix} Returned from App MFD callback.")
                else:
                    logging.error(
                        f"{log_prefix} MFD reassembly failed (returned None). Callback NOT called."
                    )

            # --- SAR branch ---
            elif pal_type == 0:
                logging.debug(
                    f"{log_prefix} Identified as SAR type based on PALTYPE=0."
                )
                logging.debug(f"{log_prefix} CALLING reassemble_sar_image...")
                result = self.reassemble_sar_image(image_leader, image_data, log_prefix)
                logging.debug(
                    f"{log_prefix} RETURNED from reassemble_sar_image. Result is None: {result is None}"
                )
                if result is not None:
                    normalized_sar_uint8, geo_info_radians = result
                    logging.debug(
                        f"{log_prefix} SAR reassembled. Calling App callback (handle_new_sar_data)..."
                    )
                    # This is the SAR callback
                    self.set_new_sar_image_callback(
                        normalized_sar_uint8, geo_info_radians
                    )  # Original name passed in __init__
                    logging.debug(f"{log_prefix} Returned from App SAR callback.")
                else:
                    logging.error(
                        f"{log_prefix} SAR reassembly failed (returned None). Callback NOT called."
                    )

            # --- Unknown type ---
            else:
                logging.error(
                    f"{log_prefix} Unsupported PALTYPE={pal_type} encountered. Cannot process."
                )

        except Exception:
            if not self.app.state.shutting_down:
                logging.exception(
                    f"{log_prefix} Unexpected error processing completed image:"
                )
        finally:
            logging.debug(f"{log_prefix} Entering finally block for cleanup.")
            self.cleanup_transaction_fragments(image_key)
            logging.debug(f"{log_prefix} Exiting image processing task.")
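    # Illustrative sketch (an addition, not part of the original class): the
    # PALTYPE dispatch contract used above. PALTYPE=0 -> SAR (grey levels, BPP
    # 1 or 2), PALTYPE=1 -> MFD (palette indices, BPP=1); anything else is
    # unsupported.
    @staticmethod
    def _sketch_classify_paltype(pal_type):
        return {0: "SAR", 1: "MFD"}.get(pal_type)  # None means "unsupported"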
    def cleanup_transaction_fragments(self, image_key):
        """Removes only the fragment tracking data for a transaction."""
        log_prefix = "[Receiver Cleanup]"  # Use the cleanup prefix
        frags_popped = self.image_fragments.pop(image_key, None)
        if frags_popped is not None:
            flow_char = (
                chr(image_key[0])
                if 32 <= image_key[0] <= 126
                else f"0x{image_key[0]:02X}"
            )
            key_str = f"({flow_char},{image_key[1]})"
            # DEBUG for successful cleanup
            logging.debug(
                f"{log_prefix} Cleaned up fragment tracking for Key={key_str}"
            )
        # else:  # Optional: log if called for a non-existent key
        #     logging.debug(f"{log_prefix} Cleanup fragments called for non-existent Key={image_key}")

    def cleanup_transaction(self, image_key):
        """Removes all transaction data (fragment tracking, buffer) from memory."""
        log_prefix = "[Receiver Cleanup]"  # Use the cleanup prefix
        frags_popped = self.image_fragments.pop(image_key, None)
        buffer_popped = self.image_data_buffers.pop(image_key, None)
        flow_char = (
            chr(image_key[0]) if 32 <= image_key[0] <= 126 else f"0x{image_key[0]:02X}"
        )
        key_str = f"({flow_char},{image_key[1]})"
        if frags_popped is not None or buffer_popped is not None:
            # DEBUG for successful cleanup
            logging.debug(
                f"{log_prefix} Cleaned up resources for Key={key_str} "
                f"(Fragments:{frags_popped is not None}, Buffer:{buffer_popped is not None})"
            )
        # else:  # Optional: log if called for a non-existent key
        #     logging.debug(f"{log_prefix} Cleanup called for non-existent Key={key_str}")


# --- END OF FILE receiver.py ---
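
# Illustrative usage sketch (an addition, not part of the original module):
# how the application might wire up a UdpReceiver. 'app' must provide
# .state.shutting_down and the handle_new_* callbacks named in the worker
# logs above; the port and timeout are example values.
def _sketch_wire_receiver(app, port=5000):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", port))
    sock.settimeout(1.0)  # lets the receive loop notice shutdown promptly
    receiver = UdpReceiver(
        app,
        sock,
        app.handle_new_sar_data,
        app.handle_new_mfd_data,
    )
    thread = threading.Thread(
        target=receiver.receive_udp_data, name="UdpReceiver", daemon=True
    )
    thread.start()
    return receiver, thread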