Solve race-condition problem in def _cleanup_lingering_transactions

This commit is contained in:
VALLONGOL 2025-04-09 16:00:02 +02:00
parent 801f5f208c
commit d6182ac1d2

View File

@ -550,65 +550,82 @@ class UdpReceiver:
)
def _cleanup_lingering_transactions(self, current_flow, current_tid):
"""Checks for and cleans up old transactions for the same flow."""
log_prefix = "[Receiver Cleanup]" # Specific prefix
keys_to_remove = []
current_flow_char = (
chr(current_flow) if 32 <= current_flow <= 126 else f"0x{current_flow:02X}"
)
"""
Checks for and cleans up old transactions for the same flow, handling potential race conditions.
This is called when a new fragment 0 arrives for a flow/tid pair, indicating
any previous transaction for the same flow should be considered finished or lost.
"""
log_prefix = "[Receiver Cleanup]"
# Format characters for logging
current_flow_char = chr(current_flow) if 32 <= current_flow <= 126 else f"0x{current_flow:02X}"
new_key_str = f"({current_flow_char},{current_tid})"
# Iterate over a copy of the keys to avoid RuntimeError if dict changes size during iteration
# Note: This protects against adding new keys, but not removal. Pop handles removal safely.
for existing_key in list(self.image_fragments.keys()):
existing_flow, existing_tid = existing_key
# Check if the existing transaction is for the *same flow* but a *different (older) transaction ID*
if existing_flow == current_flow and existing_tid != current_tid:
existing_flow_char = (
chr(existing_flow)
if 32 <= existing_flow <= 126
else f"0x{existing_flow:02X}"
)
existing_flow_char = chr(existing_flow) if 32 <= existing_flow <= 126 else f"0x{existing_flow:02X}"
lingering_key_str = f"({existing_flow_char},{existing_tid})"
# WARNING for detecting and cleaning up lingering transaction
logging.warning(
f"{log_prefix} Detected lingering transaction {lingering_key_str} while starting {new_key_str}. Cleaning up."
)
# Log that we found an older transaction for this flow
logging.warning(f"{log_prefix} Detected lingering transaction {lingering_key_str} while starting new transaction {new_key_str}. Attempting cleanup.")
# Check if incomplete (DEBUG level for details)
try:
fragments = self.image_fragments[existing_key]
received_count = len(fragments)
expected_count = -1
if fragments:
expected_count = next(iter(fragments.values()))
# --- Safely get and remove fragment data using pop() ---
# This prevents KeyError if the worker thread removes the key concurrently.
# fragments will be the dictionary of received fragments if the key existed, otherwise None.
fragments = self.image_fragments.pop(existing_key, None)
if expected_count > 0 and received_count < expected_count:
logging.debug(
f"{log_prefix} Lingering transaction {lingering_key_str} was incomplete ({received_count}/{expected_count}). Marking as lost."
)
with self._stats_lock:
if existing_flow == 0x53:
self.incomplete_sar_count += 1 # 'S'
elif existing_flow == 0x4D:
self.incomplete_mfd_count += 1 # 'M'
else:
logging.debug(
f"{log_prefix} Lingering transaction {lingering_key_str} state: {received_count}/{expected_count}. Proceeding with cleanup."
)
except Exception as e:
# Log exception during check but still remove
logging.exception(
f"{log_prefix} Error checking state of lingering transaction {lingering_key_str}: {e}"
)
# --- Check state of the lingering transaction only if we successfully retrieved fragment data ---
if fragments is not None:
try:
received_count = len(fragments)
expected_count = -1 # Default if no fragments had total count info
keys_to_remove.append(existing_key)
# Try to get the expected total from the first available fragment's value
if fragments:
# Get value from any fragment (they should all have the same total)
expected_count = next(iter(fragments.values()))
if keys_to_remove:
logging.debug(
f"{log_prefix} Cleaning up {len(keys_to_remove)} lingering keys: {keys_to_remove}"
)
for key_to_remove in keys_to_remove:
self.cleanup_transaction(key_to_remove) # Logs internally
# Check if the transaction was incomplete
if expected_count > 0 and received_count < expected_count:
logging.debug(f"{log_prefix} Lingering transaction {lingering_key_str} was incomplete ({received_count}/{expected_count}). Marking as lost.")
# Safely update the global incomplete count via AppState
img_type = "unknown"
if existing_flow == 0x53: # ASCII 'S' for SAR
img_type = "sar"
elif existing_flow == 0x4D: # ASCII 'M' for MFD
img_type = "mfd"
if img_type != "unknown" and self.app and hasattr(self.app, 'state'):
self.app.state.increment_incomplete_rx_count(img_type)
else:
logging.warning(f"{log_prefix} Could not increment incomplete count for {lingering_key_str} (type='{img_type}', app/state missing?).")
else:
# Logged if transaction was complete or expected count unknown
logging.debug(f"{log_prefix} Lingering transaction {lingering_key_str} state ({received_count}/{expected_count}) processed during cleanup.")
except Exception as e:
# Log error during the state check, but cleanup continues
logging.exception(f"{log_prefix} Error checking state of popped lingering transaction {lingering_key_str}: {e}")
else:
# This branch is hit if the worker thread already removed the fragment entry
logging.debug(f"{log_prefix} Fragment tracking for lingering transaction {lingering_key_str} was already removed (likely by worker).")
# --- Always attempt to remove the corresponding data buffer ---
# Use pop() here as well for safety, although it's less likely to race
# with buffer removal compared to fragment removal.
buffer_popped = self.image_data_buffers.pop(existing_key, None)
if buffer_popped is not None:
logging.debug(f"{log_prefix} Cleaned up data buffer for lingering key {lingering_key_str}.")
# Explicitly delete the large bytearray to potentially help GC sooner
del buffer_popped
# else: Buffer was already gone
# Log completion of cleanup for this specific lingering key
logging.debug(f"{log_prefix} Finished cleanup attempt for lingering key {lingering_key_str}.")
def _calculate_pixel_data_offset(self, image_leader):
"""Calculates the expected byte offset to the start of the pixel data."""