SXXXXXXX_ControlPanel/receiver.py
2025-04-08 07:53:55 +02:00

1217 lines
52 KiB
Python

# --- START OF FILE receiver.py ---
# receiver.py
"""
THIS SOFTWARE IS PROVIDED “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
Handles UDP reception, SFP fragment reassembly, metadata extraction, ACK handling,
and submission of completed images (SAR/MFD) to worker threads for processing.
Tracks incomplete transactions and uses standardized logging with prefixes.
"""
# Standard library imports
import socket
import logging
import struct
import os
import ctypes
import queue # Only needed for type hinting if used, not directly used here
import cProfile
import pstats
import threading
import time
import math
# Third-party imports
import numpy as np
import cv2 # Keep for potential internal use, though main processing moved
# Local application imports
import config
from concurrent.futures import ThreadPoolExecutor
# Import specific function needed by this module
from image_processing import normalize_image
# --- Data Structures Definition using ctypes ---
class SFPHeader(ctypes.Structure):
"""Structure representing the SFP header (32 bytes)."""
_pack_ = 1
_fields_ = [
("SFP_MARKER", ctypes.c_uint8),
("SFP_DIRECTION", ctypes.c_uint8),
("SFP_PROT_VER", ctypes.c_uint8),
("SFP_PT_SPARE", ctypes.c_uint8),
("SFP_TAG", ctypes.c_uint8),
("SFP_SRC", ctypes.c_uint8),
("SFP_FLOW", ctypes.c_uint8),
("SFP_TID", ctypes.c_uint8),
("SFP_FLAGS", ctypes.c_uint8),
("SFP_WIN", ctypes.c_uint8),
("SFP_ERR", ctypes.c_uint8),
("SFP_ERR_INFO", ctypes.c_uint8),
("SFP_TOTFRGAS", ctypes.c_uint16),
("SFP_FRAG", ctypes.c_uint16),
("SFP_RECTYPE", ctypes.c_uint8),
("SFP_RECSPARE", ctypes.c_uint8),
("SFP_PLDAP", ctypes.c_uint8),
("SFP_PLEXT", ctypes.c_uint8),
("SFP_RECCOUNTER", ctypes.c_uint16),
("SFP_PLSIZE", ctypes.c_uint16),
("SFP_TOTSIZE", ctypes.c_uint32),
("SFP_PLOFFSET", ctypes.c_uint32),
]
@classmethod
def size(cls):
return ctypes.sizeof(cls)
# Helper to get field offset, handling potential AttributeError
@staticmethod
def get_field_offset(field_name):
try:
return getattr(SFPHeader, field_name).offset
except AttributeError:
return -1 # Indicate error
class DataTag(ctypes.Structure):
"""Structure representing a generic data tag (8 bytes)."""
_pack_ = 1
_fields_ = [
("ID", ctypes.c_uint8 * 2),
("VALID", ctypes.c_uint8),
("VERSION", ctypes.c_uint8),
("SIZE", ctypes.c_uint32),
]
@classmethod
def size(cls):
return ctypes.sizeof(cls)
class HeaderData(ctypes.Structure):
"""Structure representing the HEADER_DATA block (~48 bytes)."""
_pack_ = 1
_fields_ = [
("TYPE", ctypes.c_uint8),
("SUBTYPE", ctypes.c_uint8),
("LOGCOLORS", ctypes.c_uint8),
("IMG_RESERVED", ctypes.c_uint8),
("PRODINFO", ctypes.c_uint32 * 2),
("TOD", ctypes.c_uint16 * 2),
("RESERVED", ctypes.c_uint32),
("FCOUNTER", ctypes.c_uint32),
("TIMETAG", ctypes.c_uint32),
("NOMINALFRATE", ctypes.c_uint16),
("FRAMETAG", ctypes.c_uint16),
("OX", ctypes.c_uint16),
("OY", ctypes.c_uint16),
("DX", ctypes.c_uint16),
("DY", ctypes.c_uint16),
("STRIDE", ctypes.c_uint16),
("BPP", ctypes.c_uint8),
("COMP", ctypes.c_uint8),
("SPARE", ctypes.c_uint16),
("PALTYPE", ctypes.c_uint8),
("GAP", ctypes.c_uint8),
]
@classmethod
def size(cls):
return ctypes.sizeof(cls)
class GeoData(ctypes.Structure):
"""Structure representing the GEO_DATA block (~80 bytes). Assumes specific types."""
_pack_ = 1
_fields_ = [
("INVMASK", ctypes.c_uint32),
("ORIENTATION", ctypes.c_float), # RADIANS (as per correction)
("LATITUDE", ctypes.c_float), # Degrees (as assumed by documentation/structure)
(
"LONGITUDE",
ctypes.c_float,
), # Degrees (as assumed by documentation/structure)
("REF_X", ctypes.c_uint16),
("REF_Y", ctypes.c_uint16),
("SCALE_X", ctypes.c_float),
("SCALE_Y", ctypes.c_float),
("POI_ORIENTATION", ctypes.c_float),
("POI_LATITUDE", ctypes.c_float),
("POI_LONGITUDE", ctypes.c_float),
("POI_ALTITUDE", ctypes.c_float),
("POI_X", ctypes.c_uint16),
("POI_Y", ctypes.c_uint16),
("SPARE", ctypes.c_uint32 * 7), # 28 bytes
]
@classmethod
def size(cls):
return ctypes.sizeof(cls)
class ImageLeaderData(ctypes.Structure):
"""Represents the complete Image Leader data structure (~1320 bytes)."""
_pack_ = 1
_fields_ = [
("HEADER_TAG", DataTag),
("HEADER_DATA", HeaderData),
("GEO_TAG", DataTag),
("GEO_DATA", GeoData),
("RESERVED_TAG", DataTag),
("RESERVED_DATA", ctypes.c_uint8 * 128),
("CM_TAG", DataTag),
("COLOUR_MAP", ctypes.c_uint32 * 256),
("PIXEL_TAG", DataTag),
]
@classmethod
def size(cls):
return ctypes.sizeof(cls)
@staticmethod
def get_reserved_data_size():
return 128 # ctypes.sizeof(ctypes.c_uint8 * 128)
@staticmethod
def get_colour_map_size():
return 1024 # ctypes.sizeof(ctypes.c_uint32 * 256)
# --- UDP Receiver Class ---
class UdpReceiver:
"""
Handles UDP reception, SFP fragment reassembly, metadata extraction,
image callback invocation, ACK sending, and statistics tracking.
Uses a ThreadPoolExecutor for processing completed images.
"""
def __init__(
self,
app, # Reference to the main App instance
udp_socket,
set_new_sar_image_callback,
set_new_mfd_indices_image_callback,
):
"""
Initializes the UDP receiver.
Args:
app (App): The main application instance (provides access to config, state).
udp_socket (socket.socket): The bound UDP socket to receive from.
set_new_sar_image_callback (callable): Callback for SAR images (norm_uint8, geo_info_radians).
set_new_mfd_indices_image_callback (callable): Callback for MFD images (indices_uint8).
"""
log_prefix = "[Receiver Init]" # Specific prefix
logging.debug(f"{log_prefix} Initializing UdpReceiver...")
self.app = app
self.udp_socket = udp_socket
self.set_new_sar_image_callback = set_new_sar_image_callback
self.set_new_mfd_image_callback = set_new_mfd_indices_image_callback
# Reassembly state dictionaries
self.image_fragments = {} # { (flow, tid): {frag_num: total_frags_expected} }
self.image_data_buffers = {} # { (flow, tid): bytearray }
logging.debug(f"{log_prefix} Reassembly dictionaries initialized.")
# Binary dump configuration
self.save_dumps = config.SAVE_BINARY_DUMPS
self.dump_dir = config.DUMP_DIRECTORY
if self.save_dumps:
if not os.path.exists(self.dump_dir):
try:
os.makedirs(self.dump_dir)
# INFO for directory creation
logging.info(
f"{log_prefix} Created binary dump directory: {self.dump_dir}"
)
except OSError as e:
# ERROR if creation fails
logging.error(
f"{log_prefix} Failed to create dump directory '{self.dump_dir}': {e}. Disabling dumps."
)
self.save_dumps = False
else:
logging.debug(
f"{log_prefix} Binary dump directory '{self.dump_dir}' already exists."
)
else:
logging.debug(f"{log_prefix} Binary dump saving is disabled.")
# Thread pool for image processing
try:
self.executor = ThreadPoolExecutor(
max_workers=config.MAX_WORKERS, thread_name_prefix="ImageProcessor"
)
# INFO for successful pool creation
logging.info(
f"{log_prefix} ThreadPoolExecutor created with max_workers={config.MAX_WORKERS}"
)
except Exception as pool_e:
# CRITICAL if pool creation fails, likely fatal for processing
logging.critical(
f"{log_prefix} Failed to create ThreadPoolExecutor: {pool_e}",
exc_info=True,
)
# Potentially raise exception or exit app here? For now, log and continue.
self.executor = None # Ensure executor is None
# Statistics
self.incomplete_sar_count = 0
self.incomplete_mfd_count = 0
self._stats_lock = threading.Lock()
logging.debug(f"{log_prefix} Statistics counters and lock initialized.")
logging.info(f"{log_prefix} UdpReceiver initialization complete.")
def get_incomplete_counts(self):
"""Returns the current counts of incomplete/lost transactions (thread-safe)."""
# No logging needed for simple getter typically
with self._stats_lock:
sar_count = self.incomplete_sar_count
mfd_count = self.incomplete_mfd_count
return sar_count, mfd_count
def receive_udp_data(self):
"""Main loop to listen for UDP packets, parse headers, and reassemble fragments."""
log_prefix = "[Receiver Loop]" # Prefix for general loop messages
if not self.udp_socket or self.udp_socket.fileno() == -1:
logging.critical(
f"{log_prefix} Receiver thread cannot start: socket is invalid or closed."
)
return
logging.info(
f"{log_prefix} Receiver thread started and listening on {self.udp_socket.getsockname()}."
)
profiler = None
if config.ENABLE_PROFILING:
profiler = cProfile.Profile()
profiler.enable()
logging.info(f"{log_prefix} Receiver profiling enabled.")
while True:
if self.app.state.shutting_down:
logging.info(f"{log_prefix} Shutdown detected. Exiting thread.")
break
try:
data, addr = self.udp_socket.recvfrom(65535)
if not data:
logging.warning(
f"{log_prefix} Received empty UDP packet from {addr}. Continuing."
)
continue
except socket.timeout:
logging.debug(f"{log_prefix} UDP socket timeout.")
continue
except OSError as e:
if self.app.state.shutting_down:
logging.info(
f"{log_prefix} UDP socket error during recvfrom (expected during shutdown): {e}. Exiting."
)
else:
logging.error(
f"{log_prefix} UDP socket error during recvfrom: {e}. Exiting thread."
)
break
except Exception as e:
logging.exception(f"{log_prefix} Unexpected error during UDP recvfrom:")
time.sleep(0.01)
continue
try:
header_size = SFPHeader.size()
if len(data) < header_size:
logging.warning(
f"{log_prefix} Rcvd packet too small ({len(data)} bytes) for SFP header from {addr}. Ignoring."
)
continue
try:
header = SFPHeader.from_buffer_copy(data)
except (ValueError, TypeError) as parse_err:
logging.error(
f"{log_prefix} Failed to parse SFP header from {addr}: {parse_err}. Ignoring packet."
)
continue
sfp_flow = header.SFP_FLOW
sfp_tid = header.SFP_TID
sfp_frag = header.SFP_FRAG
sfp_totfrgas = header.SFP_TOTFRGAS
sfp_plsize = header.SFP_PLSIZE
sfp_ploffset = header.SFP_PLOFFSET
sfp_flags = header.SFP_FLAGS
sfp_totsize = header.SFP_TOTSIZE
image_key = (sfp_flow, sfp_tid)
flow_char = (
chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}"
)
key_str = f"({flow_char},{sfp_tid})"
if config.DEBUG_RECEIVER_PACKETS:
logging.debug(
f"Rcvd: Key={key_str}, Frag={sfp_frag}/{sfp_totfrgas}, "
f"Size={sfp_plsize}, Offset={sfp_ploffset}, Flags=0x{sfp_flags:02X}, TotalSize={sfp_totsize}"
)
if sfp_flags & 0x01:
logging.debug(
f"{log_prefix} ACK Request received for Key={key_str}. Triggering ACK send."
)
self.send_ack(addr, data[:header_size])
if self.save_dumps:
dump_filename = (
f"{self.dump_dir}/pkt_{flow_char}_{sfp_tid}_{sfp_frag}.bin"
)
self.save_binary_dump(data, dump_filename)
if sfp_totfrgas == 0 or sfp_totfrgas > 60000:
logging.warning(
f"{log_prefix} Key={key_str}: Invalid total fragments count {sfp_totfrgas}. Ignoring packet."
)
continue
MAX_EXPECTED_IMG_SIZE = 100 * 1024 * 1024
MIN_EXPECTED_IMG_SIZE = ImageLeaderData.size()
if (
sfp_totsize < MIN_EXPECTED_IMG_SIZE
or sfp_totsize > MAX_EXPECTED_IMG_SIZE
):
logging.error(
f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Invalid total image size {sfp_totsize} "
f"(Limits: {MIN_EXPECTED_IMG_SIZE}-{MAX_EXPECTED_IMG_SIZE}). Ignoring packet/transaction."
)
continue
if sfp_frag == 0:
self._cleanup_lingering_transactions(sfp_flow, sfp_tid)
# --- MODIFICATION START: Simplified Logging for New TX ---
# Always log as DEBUG, controlled by DEBUG_RECEIVER_REASSEMBLY
log_msg = (
f"[Receiver Reassembly] New TX Start: Key={key_str}, Total Frags={sfp_totfrgas}, "
f"Total Size={sfp_totsize}, From={addr}"
)
logging.debug(log_msg) # Now always DEBUG level
# --- MODIFICATION END ---
self.image_fragments[image_key] = {}
try:
if config.DEBUG_RECEIVER_REASSEMBLY:
logging.debug(
f"{log_prefix} Allocating {sfp_totsize} byte buffer for Key={key_str}"
)
self.image_data_buffers[image_key] = bytearray(sfp_totsize)
if config.DEBUG_RECEIVER_REASSEMBLY:
logging.debug(
f"{log_prefix} Buffer allocated successfully for Key={key_str}."
)
except (MemoryError, ValueError) as alloc_e:
logging.error(
f"{log_prefix} Failed to allocate buffer for Key={key_str} (Size: {sfp_totsize}): {alloc_e}. Ignoring transaction."
)
self.image_fragments.pop(image_key, None)
continue
if (
image_key not in self.image_fragments
or image_key not in self.image_data_buffers
):
logging.warning(
f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Received fragment for inactive/cleaned transaction. Ignoring."
)
continue
self.image_fragments[image_key][sfp_frag] = sfp_totfrgas
payload = data[header_size:]
payload_len = len(payload)
bytes_to_copy = min(sfp_plsize, payload_len)
if sfp_plsize > payload_len:
logging.warning(
f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Declared PLSIZE {sfp_plsize} > Actual Payload {payload_len}. Using actual length {payload_len}."
)
buff = self.image_data_buffers[image_key]
buff_len = len(buff)
start_offset = sfp_ploffset
end_offset = start_offset + bytes_to_copy
if start_offset >= buff_len:
logging.error(
f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Payload offset {start_offset} is out of buffer bounds ({buff_len}). Ignoring fragment data."
)
continue
if end_offset > buff_len:
bytes_to_copy = buff_len - start_offset
logging.warning(
f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Payload write (end {end_offset}) exceeds buffer length ({buff_len}). Truncating copy to {bytes_to_copy} bytes."
)
if bytes_to_copy > 0:
try:
if config.DEBUG_RECEIVER_REASSEMBLY:
logging.debug(
f"{log_prefix} Copying {bytes_to_copy} bytes to buffer for Key={key_str}, Frag={sfp_frag}, Offset={start_offset}."
)
buff[start_offset : start_offset + bytes_to_copy] = payload[
:bytes_to_copy
]
except IndexError as copy_idx_e:
logging.error(
f"{log_prefix} Key={key_str}, Frag={sfp_frag}: IndexError copying payload slice (Start:{start_offset}, End:{start_offset+bytes_to_copy}, BufLen:{buff_len}): {copy_idx_e}"
)
continue
except Exception as copy_e:
logging.error(
f"{log_prefix} Key={key_str}, Frag={sfp_frag}: Error copying payload to buffer: {copy_e}"
)
continue
received_frags_count = len(self.image_fragments[image_key])
expected_total = sfp_totfrgas
if received_frags_count == expected_total:
# --- MODIFICATION START: Simplified Logging for Completion ---
# Always log as DEBUG, controlled by DEBUG_RECEIVER_REASSEMBLY
log_msg = f"[Receiver Reassembly] Complete: Key={key_str}. Rcvd {received_frags_count}/{expected_total} frags. Submitting for processing."
logging.debug(log_msg) # Now always DEBUG level
# --- MODIFICATION END ---
completed_buffer = self.image_data_buffers[image_key]
if self.executor:
try:
if config.DEBUG_RECEIVER_REASSEMBLY:
logging.debug(
f"{log_prefix} Submitting task to executor for Key={key_str}."
)
self.executor.submit(
self.process_completed_image,
sfp_flow,
sfp_tid,
completed_buffer,
)
self.image_data_buffers.pop(image_key, None)
if config.DEBUG_RECEIVER_REASSEMBLY:
logging.debug(
f"{log_prefix} Buffer removed from receiver tracking for Key={key_str}."
)
except Exception as submit_e:
logging.exception(
f"{log_prefix} Failed to submit task to executor for Key={key_str}: {submit_e}"
)
self.cleanup_transaction(image_key)
else:
logging.error(
f"{log_prefix} Cannot submit task for Key={key_str}: ThreadPoolExecutor is not available."
)
self.cleanup_transaction(image_key)
elif received_frags_count > expected_total:
logging.warning(
f"{log_prefix} Key={key_str}: Received more fragments ({received_frags_count}) than expected ({expected_total})."
)
except Exception as e:
key_info_str = key_str if "key_str" in locals() else "Key=Unknown"
logging.exception(
f"{log_prefix} Error processing UDP packet data {key_info_str} from {addr}: {e}"
)
logging.info(f"{log_prefix} UDP data receiving thread stopped.")
if config.ENABLE_PROFILING and profiler:
profiler.disable()
logging.info(f"{log_prefix} Receiver profiling disabled. Saving stats...")
try:
stats_filename = (
f"receiver_profile_{time.strftime('%Y%m%d_%H%M%S')}.prof"
)
stats = pstats.Stats(profiler)
stats.sort_stats(pstats.SortKey.CUMULATIVE)
stats.dump_stats(stats_filename)
logging.info(
f"{log_prefix} Receiver profile stats saved to {stats_filename}"
)
except Exception as profile_e:
logging.exception(
f"{log_prefix} Error saving/printing profiler stats: {profile_e}"
)
def _cleanup_lingering_transactions(self, current_flow, current_tid):
"""Checks for and cleans up old transactions for the same flow."""
log_prefix = "[Receiver Cleanup]" # Specific prefix
keys_to_remove = []
current_flow_char = (
chr(current_flow) if 32 <= current_flow <= 126 else f"0x{current_flow:02X}"
)
new_key_str = f"({current_flow_char},{current_tid})"
for existing_key in list(self.image_fragments.keys()):
existing_flow, existing_tid = existing_key
if existing_flow == current_flow and existing_tid != current_tid:
existing_flow_char = (
chr(existing_flow)
if 32 <= existing_flow <= 126
else f"0x{existing_flow:02X}"
)
lingering_key_str = f"({existing_flow_char},{existing_tid})"
# WARNING for detecting and cleaning up lingering transaction
logging.warning(
f"{log_prefix} Detected lingering transaction {lingering_key_str} while starting {new_key_str}. Cleaning up."
)
# Check if incomplete (DEBUG level for details)
try:
fragments = self.image_fragments[existing_key]
received_count = len(fragments)
expected_count = -1
if fragments:
expected_count = next(iter(fragments.values()))
if expected_count > 0 and received_count < expected_count:
logging.debug(
f"{log_prefix} Lingering transaction {lingering_key_str} was incomplete ({received_count}/{expected_count}). Marking as lost."
)
with self._stats_lock:
if existing_flow == 0x53:
self.incomplete_sar_count += 1 # 'S'
elif existing_flow == 0x4D:
self.incomplete_mfd_count += 1 # 'M'
else:
logging.debug(
f"{log_prefix} Lingering transaction {lingering_key_str} state: {received_count}/{expected_count}. Proceeding with cleanup."
)
except Exception as e:
# Log exception during check but still remove
logging.exception(
f"{log_prefix} Error checking state of lingering transaction {lingering_key_str}: {e}"
)
keys_to_remove.append(existing_key)
if keys_to_remove:
logging.debug(
f"{log_prefix} Cleaning up {len(keys_to_remove)} lingering keys: {keys_to_remove}"
)
for key_to_remove in keys_to_remove:
self.cleanup_transaction(key_to_remove) # Logs internally
def _calculate_pixel_data_offset(self, image_leader):
"""Calculates the expected byte offset to the start of the pixel data."""
log_prefix = "[Receiver Pixel Offset]" # Specific prefix
try:
# Calculate offset by summing known structure sizes
offset = (
DataTag.size()
+ HeaderData.size() # Header Tag & Data
+ DataTag.size()
+ GeoData.size() # Geo Tag & Data
+ DataTag.size()
+ ImageLeaderData.get_reserved_data_size() # Reserved Tag & Data
+ DataTag.size()
+ ImageLeaderData.get_colour_map_size() # CM Tag & Data
+ DataTag.size() # Pixel Tag size
)
# DEBUG log for calculated offset
logging.debug(
f"{log_prefix} Calculated standard pixel data offset: {offset} bytes."
)
return offset
except Exception as e:
# EXCEPTION for calculation errors (should be unlikely with static sizes)
logging.exception(f"{log_prefix} Error calculating pixel data offset:")
# Fallback using estimated size - WARNING for using fallback
fallback_offset = ImageLeaderData.size()
logging.warning(
f"{log_prefix} Using fallback pixel data offset estimate: {fallback_offset}"
)
return fallback_offset
def reassemble_sar_image(self, image_leader, image_data, log_prefix):
"""
Extracts SAR metadata and pixel data (normalized uint8) from buffer.
Handles corrected radian interpretation for orientation.
Args:
image_leader (ImageLeaderData): Parsed leader structure.
image_data (bytearray): The complete image data buffer.
log_prefix (str): The logging prefix (e.g., "[Worker SAR/TID]").
Returns:
tuple: (normalized_image_uint8, geo_info_radians_dict) or None on error.
"""
# Inherits log_prefix (e.g., "[Worker SAR/TID]")
logging.debug(f"{log_prefix} Entering reassemble_sar_image.")
fcounter = image_leader.HEADER_DATA.FCOUNTER
image_key_log = f"SAR(FCNT={fcounter})" # For specific logs within this func
try:
# 1. Extract and validate HeaderData - DEBUG for details
hdr_d = image_leader.HEADER_DATA
dx, dy, bpp = int(hdr_d.DX), int(hdr_d.DY), int(hdr_d.BPP)
stride_pixels, pal_type = int(hdr_d.STRIDE), int(hdr_d.PALTYPE)
logging.debug(
f"{log_prefix} {image_key_log}: Extracted HeaderData: DX={dx}, DY={dy}, BPP={bpp}, Stride(px)={stride_pixels}, PALTYPE={pal_type}"
)
if (
dx <= 0
or dy <= 0
or bpp not in [1, 2]
or stride_pixels < dx
or pal_type != 0
):
# ERROR for invalid metadata
logging.error(
f"{log_prefix} {image_key_log}: Invalid SAR metadata. Cannot reassemble."
)
return None
pixel_dtype = np.uint8 if bpp == 1 else np.uint16
pixel_bytes = bpp
# 2. Calculate pixel offset - DEBUG for offset calc
pixel_data_offset = self._calculate_pixel_data_offset(
image_leader
) # Logs internally
logging.debug(
f"{log_prefix} {image_key_log}: Using pixel data offset: {pixel_data_offset}"
)
# 3. Validate offset and buffer size - DEBUG for validation steps
available_data_length = len(image_data)
logging.debug(
f"{log_prefix} {image_key_log}: Validating offset ({pixel_data_offset}) vs buffer size ({available_data_length})."
)
if pixel_data_offset >= available_data_length:
# ERROR if offset invalid
logging.error(
f"{log_prefix} {image_key_log}: Pixel offset >= buffer size. Cannot extract pixel data."
)
return None
minimum_required_core_bytes = dy * dx * pixel_bytes
actual_pixel_bytes_available = available_data_length - pixel_data_offset
if actual_pixel_bytes_available < minimum_required_core_bytes:
# ERROR if insufficient data
logging.error(
f"{log_prefix} {image_key_log}: Insufficient pixel data in buffer (Need min {minimum_required_core_bytes}, Found {actual_pixel_bytes_available})."
)
return None
logging.debug(f"{log_prefix} {image_key_log}: Buffer size validated.")
# 4. Create NumPy view - DEBUG for view creation attempt
try:
stride_bytes = stride_pixels * pixel_bytes
logging.debug(
f"{log_prefix} {image_key_log}: Creating NumPy view: Shape=({dy},{dx}), Dtype={pixel_dtype}, Offset={pixel_data_offset}, Strides=({stride_bytes},{pixel_bytes})"
)
sar_image_view = np.ndarray(
shape=(dy, dx),
dtype=pixel_dtype,
buffer=image_data,
offset=pixel_data_offset,
strides=(stride_bytes, pixel_bytes),
)
logging.debug(
f"{log_prefix} {image_key_log}: NumPy view created successfully."
)
except ValueError as ve:
# ERROR for view creation failure
logging.error(
f"{log_prefix} {image_key_log}: Failed to create SAR NumPy view (Shape/stride/offset mismatch?): {ve}"
)
return None
# 5. Shutdown Check
if self.app.state.shutting_down:
logging.info(
f"{log_prefix} Shutdown detected before normalization. Exiting reassemble."
)
return None
# 6. Normalize image view to uint8 - DEBUG for normalization step
logging.debug(
f"{log_prefix} {image_key_log}: Normalizing SAR view to uint8..."
)
normalized_image_uint8 = normalize_image(
sar_image_view, target_type=np.uint8
) # Logs internally
if normalized_image_uint8 is None:
# ERROR for normalization failure
logging.error(
f"{log_prefix} {image_key_log}: SAR normalization to uint8 failed."
)
return None
logging.debug(
f"{log_prefix} {image_key_log}: Normalization complete (Shape: {normalized_image_uint8.shape})."
)
# 7. Extract and Convert Geo Info (RADIANS) - Use specific prefix
geo_log_prefix = "[Geo extract]"
geo_info_radians = {"valid": False}
try:
geo_d = image_leader.GEO_DATA
logging.debug(
f"{geo_log_prefix} {image_key_log}: Extracting and interpreting GeoData (Orientation as RADIANS)..."
)
# Read orientation directly as RADIANS (corrected)
orient_rad_raw = float(geo_d.ORIENTATION)
# Read lat/lon as DEGREES (from structure assumption) and convert
lat_deg_raw = float(geo_d.LATITUDE)
lon_deg_raw = float(geo_d.LONGITUDE)
lat_rad = math.radians(lat_deg_raw)
lon_rad = math.radians(lon_deg_raw)
# Store RADIANS internally
geo_info_radians["lat"] = lat_rad
geo_info_radians["lon"] = lon_rad
geo_info_radians["orientation"] = orient_rad_raw
geo_info_radians["ref_x"] = int(geo_d.REF_X)
geo_info_radians["ref_y"] = int(geo_d.REF_Y)
geo_info_radians["scale_x"] = float(geo_d.SCALE_X)
geo_info_radians["scale_y"] = float(geo_d.SCALE_Y)
geo_info_radians["width_px"] = dx
geo_info_radians["height_px"] = dy
# Validate scale - DEBUG for validation result
if geo_info_radians["scale_x"] > 0 and geo_info_radians["scale_y"] > 0:
geo_info_radians["valid"] = True
# Log extracted values (DEBUG controlled by DEBUG_RECEIVER_GEO)
orient_deg_for_log = math.degrees(orient_rad_raw)
logging.debug(
f"{geo_log_prefix} {image_key_log}: GeoInfo Extracted: Valid={geo_info_radians['valid']}, "
f"Lat={lat_deg_raw:.4f}deg({lat_rad:.6f}rad), Lon={lon_deg_raw:.4f}deg({lon_rad:.6f}rad), "
f"Orient={orient_deg_for_log:.2f}deg({orient_rad_raw:.6f}rad), "
f"Ref=({geo_info_radians['ref_x']},{geo_info_radians['ref_y']}), "
f"Scale=({geo_info_radians['scale_x']:.3f},{geo_info_radians['scale_y']:.3f}), "
f"Size=({dx},{dy})"
)
else:
# WARNING for invalid scale marking Geo invalid
logging.warning(
f"{geo_log_prefix} {image_key_log}: Invalid scale values found (ScaleX={geo_info_radians['scale_x']}, ScaleY={geo_info_radians['scale_y']}). GeoInfo marked invalid."
)
geo_info_radians["valid"] = False
except OverflowError as oe:
# ERROR for math errors
logging.error(
f"{geo_log_prefix} {image_key_log}: Math OverflowError during GeoData conversion: {oe}. GeoInfo marked invalid."
)
geo_info_radians = {"valid": False}
except Exception as e:
# Keep EXCEPTION for other geo errors
logging.exception(
f"{geo_log_prefix} {image_key_log}: Failed during GeoData extraction/conversion: {e}"
)
geo_info_radians = {"valid": False}
# 8. Return results - DEBUG for successful exit
logging.debug(f"{log_prefix} Exiting reassemble_sar_image successfully.")
return normalized_image_uint8.copy(), geo_info_radians
except Exception as e:
# Keep EXCEPTION for unexpected errors in reassembly
logging.exception(
f"{log_prefix} {image_key_log}: Unexpected error during SAR reassembly: {e}"
)
return None
def reassemble_mfd_image(self, image_leader, image_data, log_prefix):
"""
Extracts MFD metadata and pixel indices (uint8) from buffer.
Args:
image_leader (ImageLeaderData): Parsed leader structure.
image_data (bytearray): The complete image data buffer.
log_prefix (str): The logging prefix (e.g., "[Worker MFD/TID]").
Returns:
numpy.ndarray: Copy of the 2D uint8 index array, or None on error.
"""
# Inherits log_prefix (e.g., "[Worker MFD/TID]")
logging.debug(f"{log_prefix} Entering reassemble_mfd_image.")
fcounter = image_leader.HEADER_DATA.FCOUNTER
image_key_log = f"MFD(FCNT={fcounter})" # For specific logs
try:
# 1. Extract and validate HeaderData - DEBUG for details
hdr_d = image_leader.HEADER_DATA
dx, dy, bpp = int(hdr_d.DX), int(hdr_d.DY), int(hdr_d.BPP)
stride_pixels, pal_type = int(hdr_d.STRIDE), int(hdr_d.PALTYPE)
logging.debug(
f"{log_prefix} {image_key_log}: Extracted HeaderData: DX={dx}, DY={dy}, BPP={bpp}, Stride(px)={stride_pixels}, PALTYPE={pal_type}"
)
# MFD must be BPP=1, PALTYPE=1
if dx <= 0 or dy <= 0 or bpp != 1 or stride_pixels < dx or pal_type != 1:
# ERROR for invalid metadata
logging.error(
f"{log_prefix} {image_key_log}: Invalid MFD metadata. Cannot reassemble."
)
return None
pixel_bytes = 1
# 2. Calculate pixel offset - DEBUG for offset calc
pixel_data_offset = self._calculate_pixel_data_offset(
image_leader
) # Logs internally
logging.debug(
f"{log_prefix} {image_key_log}: Using pixel data offset: {pixel_data_offset}"
)
# 3. Validate offset and buffer size - DEBUG for validation steps
available_data_length = len(image_data)
logging.debug(
f"{log_prefix} {image_key_log}: Validating offset ({pixel_data_offset}) vs buffer size ({available_data_length})."
)
if pixel_data_offset >= available_data_length:
# ERROR if offset invalid
logging.error(
f"{log_prefix} {image_key_log}: Pixel offset >= buffer size. Cannot extract pixel data."
)
return None
minimum_required_core_bytes = dy * dx * pixel_bytes
actual_pixel_bytes_available = available_data_length - pixel_data_offset
if actual_pixel_bytes_available < minimum_required_core_bytes:
# ERROR if insufficient data
logging.error(
f"{log_prefix} {image_key_log}: Insufficient pixel data in buffer (Need min {minimum_required_core_bytes}, Found {actual_pixel_bytes_available})."
)
return None
logging.debug(f"{log_prefix} {image_key_log}: Buffer size validated.")
# 4. Create NumPy view for indices (uint8) - DEBUG for view creation
try:
stride_bytes = stride_pixels * pixel_bytes
logging.debug(
f"{log_prefix} {image_key_log}: Creating NumPy view: Shape=({dy},{dx}), Dtype=uint8, Offset={pixel_data_offset}, Strides=({stride_bytes},{pixel_bytes})"
)
mfd_index_view = np.ndarray(
shape=(dy, dx),
dtype=np.uint8,
buffer=image_data,
offset=pixel_data_offset,
strides=(stride_bytes, pixel_bytes),
)
logging.debug(
f"{log_prefix} {image_key_log}: NumPy view created successfully."
)
except ValueError as ve:
# ERROR for view creation failure
logging.error(
f"{log_prefix} {image_key_log}: Failed to create MFD index NumPy view (Shape/stride/offset mismatch?): {ve}"
)
return None
# 5. Shutdown Check
if self.app.state.shutting_down:
logging.info(
f"{log_prefix} Shutdown detected after MFD view creation. Exiting reassemble."
)
return None
# 6. Return a COPY of the index view - DEBUG for successful exit
logging.debug(
f"{log_prefix} Exiting reassemble_mfd_image successfully with index copy."
)
return mfd_index_view.copy()
except Exception as e:
# Keep EXCEPTION for unexpected errors
logging.exception(
f"{log_prefix} {image_key_log}: Unexpected error during MFD index reassembly: {e}"
)
return None
def save_binary_dump(self, data, filename):
"""Saves raw packet data to a binary file if enabled in config."""
log_prefix = "[Receiver Dump]" # Specific prefix
if not self.save_dumps:
return # Check again
try:
with open(filename, "wb") as f:
f.write(data)
# DEBUG for successful save
logging.debug(
f"{log_prefix} Saved binary dump to {filename} ({len(data)} bytes)"
)
except Exception as e:
# ERROR for save failure
logging.error(f"{log_prefix} Error saving binary dump to {filename}: {e}")
def send_ack(self, dest_addr, original_header_bytes):
"""Sends an SFP ACK packet back to the sender using the provided address."""
log_prefix = "[Receiver ACK]" # Specific prefix
try:
header_size = SFPHeader.size()
if len(original_header_bytes) < header_size:
logging.error(
f"{log_prefix} ACK fail to {dest_addr}: Original header too short ({len(original_header_bytes)} bytes)."
)
return
ack_header = bytearray(original_header_bytes[:header_size])
# Determine Flow Type and Window Size - DEBUG for internal steps
flow_offset = SFPHeader.get_field_offset("SFP_FLOW")
win_offset = SFPHeader.get_field_offset("SFP_WIN")
flags_offset = SFPHeader.get_field_offset("SFP_FLAGS")
tid_offset = SFPHeader.get_field_offset("SFP_TID")
if (
flow_offset == -1
or win_offset == -1
or flags_offset == -1
or tid_offset == -1
):
logging.error(
f"{log_prefix} Failed to get field offsets for ACK construction. Cannot send."
)
return
sfp_flow = ack_header[flow_offset]
flow_char = chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}"
sfp_tid = ack_header[tid_offset] # For logging
window_size = 0
flow_name = f"Unknown({flow_char})"
if flow_char == "M":
window_size = config.ACK_WINDOW_SIZE_MFD
flow_name = "MFD"
elif flow_char == "S":
window_size = config.ACK_WINDOW_SIZE_SAR
flow_name = "SAR"
else:
logging.warning(
f"{log_prefix} ACK for unknown SFP_FLOW {flow_char}. Using Window=0."
)
window_size = max(0, min(window_size, 255))
logging.debug(
f"{log_prefix} Determined Flow={flow_name}, TID={sfp_tid}, Window Size={window_size}"
)
# Modify SFP_WIN
ack_header[win_offset] = window_size
# Modify SFP_FLAGS (Set ACK=1, Clear ACK_REQ=0)
original_flags = ack_header[flags_offset]
new_flags = (original_flags | 0x01) & ~0x02 # Set bit 0, clear bit 1
ack_header[flags_offset] = new_flags
logging.debug(
f"{log_prefix} Original Flags=0x{original_flags:02X}, New Flags=0x{new_flags:02X}"
)
# Send ACK - DEBUG for send action
if self.udp_socket and self.udp_socket.fileno() != -1:
logging.debug(
f"{log_prefix} Sending ACK ({len(ack_header)}b) to {dest_addr}..."
)
bytes_sent = self.udp_socket.sendto(ack_header, dest_addr)
logging.debug(
f"{log_prefix} ACK ({bytes_sent}b) sent successfully to {dest_addr} for TID={sfp_tid}, Flow={flow_name}, Win={window_size}"
)
else:
# WARNING if socket closed before sending
logging.warning(
f"{log_prefix} Cannot send ACK to {dest_addr}: UDP socket is closed or invalid."
)
except Exception as e:
# Keep EXCEPTION for unexpected errors during ACK
logging.exception(
f"{log_prefix} Unexpected error sending ACK to {dest_addr}: {e}"
)
# Within UdpReceiver class in receiver.py
def process_completed_image(self, sfp_flow, sfp_tid, image_data):
"""Processes a fully reassembled image buffer in a worker thread."""
image_key = (sfp_flow, sfp_tid)
flow_char = chr(sfp_flow) if 32 <= sfp_flow <= 126 else f"0x{sfp_flow:02X}"
log_prefix = f"[Worker {flow_char}/{sfp_tid}]"
logging.debug(
f"{log_prefix} Starting image processing task. Buffer size: {len(image_data) if image_data else 'None'}"
)
try:
if self.app.state.shutting_down:
logging.info(f"{log_prefix} Shutdown detected at start. Aborting.")
return
if not image_data or len(image_data) < ImageLeaderData.size():
logging.error(f"{log_prefix} Invalid/incomplete data buffer. Aborting.")
return
try:
logging.debug(f"{log_prefix} Parsing ImageLeaderData...")
image_leader = ImageLeaderData.from_buffer(image_data)
# Access a field to force potential error
_ = image_leader.HEADER_TAG.VALID
logging.debug(f"{log_prefix} ImageLeaderData parsed.")
except Exception as parse_err:
logging.error(
f"{log_prefix} Failed to parse ImageLeaderData: {parse_err}. Aborting."
)
return
# --- LOG AGGIUNTO: Verifica tipo ---
pal_type = image_leader.HEADER_DATA.PALTYPE
hdr_type = image_leader.HEADER_DATA.TYPE
bpp = image_leader.HEADER_DATA.BPP
logging.debug(
f"{log_prefix} Determining image type - PALTYPE={pal_type}, Header TYPE={hdr_type}, BPP={bpp}, Flow='{flow_char}'"
)
# --- FINE LOG AGGIUNTO ---
# --- MFD Branch ---
if pal_type == 1:
logging.debug(
f"{log_prefix} Identified as MFD type based on PALTYPE=1."
)
if bpp != 1:
logging.warning(
f"{log_prefix} MFD data has BPP={bpp} (expected 1)."
)
if flow_char != "M":
logging.warning(
f"{log_prefix} MFD data received on unexpected flow '{flow_char}'."
)
if self.app.state.shutting_down:
logging.info(
f"{log_prefix} Shutdown detected before MFD reassembly."
)
return
# --- LOG AGGIUNTO: Prima della chiamata ---
logging.debug(f"{log_prefix} CALLING reassemble_mfd_image...")
mfd_indices = self.reassemble_mfd_image(
image_leader, image_data, log_prefix
)
logging.debug(
f"{log_prefix} RETURNED from reassemble_mfd_image. Result is None: {mfd_indices is None}"
)
# --- FINE LOG AGGIUNTO ---
if self.app.state.shutting_down:
logging.info(
f"{log_prefix} Shutdown detected after MFD reassembly."
)
return
if mfd_indices is not None:
logging.debug(
f"{log_prefix} MFD indices reassembled. Calling App callback (handle_new_mfd_data)..."
)
# --- Questa è la chiamata che non avviene ---
self.set_new_mfd_image_callback(
mfd_indices
) # This was the original name passed in __init__
logging.debug(f"{log_prefix} Returned from App MFD callback.")
else:
logging.error(
f"{log_prefix} MFD reassembly failed (returned None). Callback NOT called."
)
# --- SAR Branch ---
elif pal_type == 0:
# (Codice SAR esistente...)
logging.debug(
f"{log_prefix} Identified as SAR type based on PALTYPE=0."
)
# ... (resto del codice SAR) ...
# --- LOG AGGIUNTO: Prima della chiamata ---
logging.debug(f"{log_prefix} CALLING reassemble_sar_image...")
result = self.reassemble_sar_image(image_leader, image_data, log_prefix)
logging.debug(
f"{log_prefix} RETURNED from reassemble_sar_image. Result is None: {result is None}"
)
# --- FINE LOG AGGIUNTO ---
if result is not None:
normalized_sar_uint8, geo_info_radians = result
logging.debug(
f"{log_prefix} SAR reassembled. Calling App callback (handle_new_sar_data)..."
)
# --- Questa è la chiamata SAR ---
self.set_new_sar_image_callback(
normalized_sar_uint8, geo_info_radians
) # Original name passed in init
logging.debug(f"{log_prefix} Returned from App SAR callback.")
else:
logging.error(
f"{log_prefix} SAR reassembly failed (returned None). Callback NOT called."
)
# --- Unknown Type ---
else:
logging.error(
f"{log_prefix} Unsupported PALTYPE={pal_type} encountered. Cannot process."
)
except Exception as e:
if not self.app.state.shutting_down:
logging.exception(
f"{log_prefix} Unexpected error processing completed image:"
)
finally:
logging.debug(f"{log_prefix} Entering finally block for cleanup.")
self.cleanup_transaction_fragments(image_key)
logging.debug(f"{log_prefix} Exiting image processing task.")
def cleanup_transaction_fragments(self, image_key):
"""Removes only the fragment tracking data for a transaction."""
log_prefix = "[Receiver Cleanup]" # Use cleanup prefix
frags_popped = self.image_fragments.pop(image_key, None)
if frags_popped is not None:
flow_char = (
chr(image_key[0])
if 32 <= image_key[0] <= 126
else f"0x{image_key[0]:02X}"
)
key_str = f"({flow_char},{image_key[1]})"
# DEBUG for successful cleanup
logging.debug(
f"{log_prefix} Cleaned up fragment tracking for Key={key_str}"
)
# else: # Optional: Log if called for non-existent key
# logging.debug(f"{log_prefix} Cleanup fragments called for non-existent Key={image_key}")
def cleanup_transaction(self, image_key):
"""Removes all transaction data (fragments tracking, buffer) from memory."""
log_prefix = "[Receiver Cleanup]" # Use cleanup prefix
frags_popped = self.image_fragments.pop(image_key, None)
buffer_popped = self.image_data_buffers.pop(image_key, None)
flow_char = (
chr(image_key[0]) if 32 <= image_key[0] <= 126 else f"0x{image_key[0]:02X}"
)
key_str = f"({flow_char},{image_key[1]})"
if frags_popped is not None or buffer_popped is not None:
# DEBUG for successful cleanup
logging.debug(
f"{log_prefix} Cleaned up resources for Key={key_str} "
f"(Fragments:{frags_popped is not None}, Buffer:{buffer_popped is not None})"
)
# else: # Optional: Log if called for non-existent key
# logging.debug(f"{log_prefix} Cleanup called for non-existent Key={key_str}")
# --- END OF FILE receiver.py ---