SXXXXXXX_RadarDataReader/extract_aesa_samples.py
2025-06-25 07:50:56 +02:00

100 lines
3.4 KiB
Python

# extract_aesa_samples.py
import numpy as np
import sys
from pathlib import Path
from collections import defaultdict
# --- Configuration ---
# Marker word; two consecutive occurrences are treated as the start of a
# legacy block.
LEGACY_BLOCK_MARKER = 0x5A5A5A5A
# Generic AESA ID. Numerically 0x41534541, i.e. the bytes b"AESA" when the
# word is read as little-endian.
AESA_BLOCK_ID = 1095976257
LEGACY_NAME_OFFSET_WORDS = 17  # Offset to name within legacy header
# Offset to payload size within legacy header (not read by the extractor below).
LEGACY_SIZE_OFFSET_WORDS = 5


def extract_aesa_samples_robust(
    input_file: Path,
    output_dir: Path,
    num_samples: int = 3,
    extract_size_words: int = 2500,
) -> None:
    """
    Find the first N AESA blocks and dump a fixed-size chunk of each to disk.

    The input is read as a flat vector of little-endian 32-bit words. A block
    start is any word index where two consecutive words equal
    ``LEGACY_BLOCK_MARKER``; the block counts as AESA when the word at
    ``start + LEGACY_NAME_OFFSET_WORDS`` equals ``AESA_BLOCK_ID``. For each
    such block, the words from the first marker onward are written to
    ``aesa_sample_<k>_at_word_<start>.out`` for manual inspection.

    Args:
        input_file: Binary file to scan.
        output_dir: Directory receiving the sample files; created if missing.
        num_samples: Maximum number of AESA blocks to extract.
        extract_size_words: Number of 32-bit words to copy per sample. The
            chunk is truncated when the block lies closer than this to EOF.

    Returns:
        None. Progress and errors are reported on stdout.
    """
    if not input_file.is_file():
        print(f"Error: Input file not found at '{input_file}'")
        return

    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"Loading data from '{input_file}'...")
    try:
        data_vector = np.fromfile(str(input_file), dtype="<u4")
    except Exception as e:
        # Best-effort inspection tool: report any load failure and stop.
        print(f"Error loading file: {e}")
        return
    print(f"Loaded {data_vector.size} 32-bit words.")

    print(f"Searching for legacy block marker (0x{LEGACY_BLOCK_MARKER:X})...")
    legacy_indices = np.where(data_vector == LEGACY_BLOCK_MARKER)[0]
    # Keep only markers that are immediately followed by a second marker.
    legacy_starts = legacy_indices[:-1][np.diff(legacy_indices) == 1]
    # Test the element count, not the truthiness of the values: a block that
    # starts at word 0 is a valid hit even though the index 0 is falsy.
    if legacy_starts.size == 0:
        print("No legacy blocks with double markers found.")
        return

    print(
        f"Found {len(legacy_starts)} potential legacy blocks. Searching for AESA blocks to extract..."
    )

    # Drop blocks whose name word would lie past EOF, then keep the AESA ones.
    name_indices = legacy_starts + LEGACY_NAME_OFFSET_WORDS
    has_name_word = name_indices < data_vector.size
    candidate_starts = legacy_starts[has_name_word]
    is_aesa = data_vector[name_indices[has_name_word]] == AESA_BLOCK_ID
    aesa_starts = candidate_starts[is_aesa]

    extracted_count = 0
    for start_index in aesa_starts:
        if extracted_count >= num_samples:
            break

        # The chunk starts at the first marker word, so it contains the 5A5A
        # header followed by as much payload as fits in extract_size_words.
        start = int(start_index)
        end_extract_index = min(start + extract_size_words, data_vector.size)
        block_data_chunk = data_vector[start:end_extract_index]

        output_file = (
            output_dir / f"aesa_sample_{extracted_count+1}_at_word_{start}.out"
        )
        print(
            f"Extracting sample {extracted_count+1} ({block_data_chunk.size * 4} bytes) from word {start} to '{output_file}'..."
        )
        with open(output_file, "wb") as f:
            f.write(block_data_chunk.tobytes())
        extracted_count += 1

    if extracted_count > 0:
        print(f"Successfully extracted {extracted_count} AESA block samples.")
    else:
        print("No AESA blocks found for extraction.")
if __name__ == "__main__":
    # Script entry point: exactly one positional argument, the file to scan.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: python extract_aesa_samples.py <path_to_your_file.out>")
        sys.exit(1)

    # Extract up to 5 samples of at most 2500 words (10 KB) each into a
    # fixed directory relative to the current working directory.
    extract_aesa_samples_robust(
        input_file=Path(cli_args[0]),
        output_dir=Path("aesa_samples_robust"),
        num_samples=5,
        extract_size_words=2500,
    )