def _cleanup_lingering_transactions(self, current_flow, current_tid):
    """
    Drop tracked transactions on ``current_flow`` whose id differs from ``current_tid``.

    For every key ``(flow, tid)`` in ``self.image_fragments`` with the same
    flow but a different transaction id, the fragment-tracking entry and the
    matching entry in ``self.image_data_buffers`` are removed. If the removed
    transaction had received fewer fragments than the total recorded in its
    fragment map, it is reported as lost through
    ``self.app.state.increment_incomplete_rx_count(img_type)``.

    Args:
        current_flow: Flow identifier of the transaction that is starting
            (an int; 0x53 is counted as "sar", 0x4D as "mfd").
        current_tid: Transaction id of the transaction that is starting;
            entries with this id are left untouched.

    Returns:
        None. Errors raised while inspecting or reporting a lingering
        transaction are logged and do not prevent buffer cleanup.
    """
    log_prefix = "[Receiver Cleanup]"

    def describe(flow, tid):
        """Render a (flow, tid) key for log output; printable ASCII flows are shown as characters."""
        flow_repr = chr(flow) if 32 <= flow <= 126 else f"0x{flow:02X}"
        return f"({flow_repr},{tid})"

    new_key_str = describe(current_flow, current_tid)

    # Flow byte -> image type name understood by increment_incomplete_rx_count.
    img_type_by_flow = {0x53: "sar", 0x4D: "mfd"}

    # Resolve the optional state holder once. getattr() keeps a receiver that
    # has no `app` (or an app without `state`) on the "could not increment"
    # warning path instead of raising AttributeError.
    app = getattr(self, "app", None)
    state = getattr(app, "state", None) if app else None

    # Iterate over a snapshot of the keys: entries are popped inside the loop,
    # and the dict may also be modified elsewhere while we run.
    for existing_key in list(self.image_fragments):
        existing_flow, existing_tid = existing_key

        # Only other transactions of the *same* flow are considered stale.
        if existing_flow != current_flow or existing_tid == current_tid:
            continue

        lingering_key_str = describe(existing_flow, existing_tid)
        logging.warning(f"{log_prefix} Detected lingering transaction {lingering_key_str} while starting new transaction {new_key_str}. Attempting cleanup.")

        # pop() with a default never raises KeyError, so a key that vanished
        # after the snapshot was taken is tolerated.
        fragments = self.image_fragments.pop(existing_key, None)

        if fragments is None:
            logging.debug(f"{log_prefix} Fragment tracking for lingering transaction {lingering_key_str} was already removed (likely by worker).")
        else:
            try:
                received_count = len(fragments)
                # Every stored value is taken to be the expected fragment
                # total, so any one of them will do; -1 means "unknown".
                expected_count = next(iter(fragments.values()), -1)

                if expected_count > 0 and received_count < expected_count:
                    logging.debug(f"{log_prefix} Lingering transaction {lingering_key_str} was incomplete ({received_count}/{expected_count}). Marking as lost.")
                    img_type = img_type_by_flow.get(existing_flow, "unknown")
                    if img_type != "unknown" and state is not None:
                        state.increment_incomplete_rx_count(img_type)
                    else:
                        logging.warning(f"{log_prefix} Could not increment incomplete count for {lingering_key_str} (type='{img_type}', app/state missing?).")
                else:
                    # Transaction was complete, or its expected total is unknown.
                    logging.debug(f"{log_prefix} Lingering transaction {lingering_key_str} state ({received_count}/{expected_count}) processed during cleanup.")
            except Exception as e:
                # Best effort: a failure while inspecting/reporting must not
                # stop the buffer below from being released.
                logging.exception(f"{log_prefix} Error checking state of popped lingering transaction {lingering_key_str}: {e}")

        # Always release the data buffer for this key, whether or not the
        # fragment entry was still present.
        if self.image_data_buffers.pop(existing_key, None) is not None:
            logging.debug(f"{log_prefix} Cleaned up data buffer for lingering key {lingering_key_str}.")

        logging.debug(f"{log_prefix} Finished cleanup attempt for lingering key {lingering_key_str}.")