SXXXXXXX_RadarDataReader/extract_dsphrdin.py
VALLONGOL 5de2650675 add
2025-11-12 13:43:30 +01:00

1026 lines
39 KiB
Python

import struct
import json
import argparse
from dataclasses import dataclass, asdict
# --- IDL Structure Definitions (Translated to Python dataclasses) ---
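# All struct formats below assume little-endian byte order ('<'), i.e. standard sizes and no padding.
# L: unsigned long (4 bytes with '<') - listed for reference; the formats below use 'I' instead
# I: unsigned int (4 bytes) - used for IDL long fields initialised with 0L
# f: float (4 bytes) - used for IDL float fields initialised with 0.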
@dataclass
class PACKETDESCR_t:
    """Packet descriptor made of five consecutive little-endian unsigned 32-bit words."""

    NPRI: int
    NRBIN: int
    NUMOFCHANNELS: int
    BLOCKDIMENSION: int
    SIGNALORDERING: int

    # struct layout of the five fields above, in declaration order
    _format = '<IIIII'

    @classmethod
    def from_bytes(cls, data: bytes):
        """Build an instance from the leading bytes of *data*.

        Raises:
            ValueError: if *data* is shorter than the packed structure.
        """
        needed = struct.calcsize(cls._format)
        if len(data) >= needed:
            return cls(*struct.unpack_from(cls._format, data))
        raise ValueError(f"Not enough data for PACKETDESCR_t. Expected {needed} bytes, got {len(data)}")
@dataclass
class signal_descr_t:
    """Signal descriptor: two leading words, an embedded PACKETDESCR_t, two trailing words."""

    batch_counter: int
    ttag: int
    PACKETDESCR: PACKETDESCR_t
    PACKETLENGTH: int
    NUMBEROFPACKETSIGNAL: int

    # The embedded PACKETDESCR_t sits between these two groups of words.
    _format_prefix = '<II'  # batch_counter, ttag
    _format_suffix = '<II'  # PACKETLENGTH, NUMBEROFPACKETSIGNAL

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the descriptor from the start of *data*.

        Raises:
            ValueError: if *data* is shorter than prefix + PACKETDESCR_t + suffix.
        """
        head_len = struct.calcsize(cls._format_prefix)
        tail_start = head_len + struct.calcsize(PACKETDESCR_t._format)
        total_size = tail_start + struct.calcsize(cls._format_suffix)
        if len(data) < total_size:
            raise ValueError(f"Not enough data for signal_descr_t. Expected {total_size} bytes, got {len(data)}")

        head = struct.unpack_from(cls._format_prefix, data, 0)
        descr = PACKETDESCR_t.from_bytes(data[head_len:tail_start])
        tail = struct.unpack_from(cls._format_suffix, data, tail_start)
        return cls(*head, descr, *tail)
@dataclass
class header_info_t:
    """Header info record: five unsigned 32-bit words followed by ten reserved words."""

    MARKER_DATI_1: int
    MARKER_DATI_2: int
    header_marker: int
    rev_id: int
    header_dim: int
    VUOTO: list[int]  # reserved words, lonarr(10) in the IDL definition

    _format = '<IIIII'  # the five scalar words that precede VUOTO
    _vuoto_len = 10

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*.

        Raises:
            ValueError: if *data* does not hold the scalars plus the reserved words.
        """
        scalars_len = struct.calcsize(cls._format)
        total_size = scalars_len + cls._vuoto_len * struct.calcsize('<I')
        if len(data) < total_size:
            raise ValueError(f"Not enough data for header_info_t. Expected {total_size} bytes, got {len(data)}")

        scalars = struct.unpack_from(cls._format, data, 0)
        reserved = struct.unpack_from(f'<{cls._vuoto_len}I', data, scalars_len)
        return cls(*scalars, list(reserved))
@dataclass
class mode_descr_t:
    """Mode descriptor: eight unsigned 32-bit words followed by eight reserved words."""

    master_mode: int
    operation_mode: int
    standby: int
    radiate: int
    waveform: int
    range_scale: int
    scan_rate: int
    dsp_mode_qualifier: int
    VUOTO: list[int]  # reserved words, lonarr(8) in the IDL definition

    _format = '<IIIIIIII'  # the eight scalar words that precede VUOTO
    _vuoto_len = 8

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the descriptor from the start of *data*.

        Raises:
            ValueError: if *data* is shorter than the scalars plus reserved words.
        """
        total_size = struct.calcsize(cls._format) + cls._vuoto_len * struct.calcsize('<I')
        if len(data) < total_size:
            raise ValueError(f"Not enough data for mode_descr_t. Expected {total_size} bytes, got {len(data)}")

        # One pass over the buffer: scalar words first, reserved words last.
        words = struct.unpack_from(f'{cls._format}{cls._vuoto_len}I', data)
        split = len(words) - cls._vuoto_len
        return cls(*words[:split], list(words[split:]))
@dataclass
class exp_settings_t:
    """Record of twelve consecutive little-endian unsigned 32-bit words."""

    exp_mof: int
    phase_rst_code: int
    pulse_id: int
    exp_f_base: int
    exp_f_delta: int
    pulse_len: int
    amp_peak: int
    pulse_id_2: int
    exp_f_base_2: int
    exp_f_delta_2: int
    pulse_len_2: int
    amp_peak_2: int

    _format = '<IIIIIIIIIIII'  # one 'I' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        needed = struct.calcsize(cls._format)
        if len(data) >= needed:
            return cls(*struct.unpack_from(cls._format, data))
        raise ValueError(f"Not enough data for exp_settings_t. Expected {needed} bytes, got {len(data)}")
@dataclass
class det_settings_t:
    """Record of eight consecutive little-endian unsigned 32-bit words."""

    det_mof: int
    lpf7_filter_id: int
    lpf8_filter_id: int
    det_f_base: int
    det_f_delta: int
    ph_vector: int  # treated as a long (0L) by the original author's assumption
    pc_lsn_start: int
    pc_lsn_len: int

    _format = '<IIIIIIII'  # one 'I' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        required = struct.calcsize(cls._format)
        have = len(data)
        if have < required:
            raise ValueError(f"Not enough data for det_settings_t. Expected {required} bytes, got {have}")
        fields = struct.unpack_from(cls._format, data)
        return cls(*fields)
@dataclass
class pc_settings_t:
    """Record of five consecutive little-endian unsigned 32-bit words."""

    pc_mof: int
    filt_id: int
    pc_lsn_start: int
    pc_lsn_len: int
    pc_out_offset: int

    _format = '<IIIII'  # one 'I' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        needed = struct.calcsize(cls._format)
        if len(data) >= needed:
            return cls(*struct.unpack_from(cls._format, data))
        raise ValueError(f"Not enough data for pc_settings_t. Expected {needed} bytes, got {len(data)}")
@dataclass
class agc_settings_t:
    """Record of fourteen 4-byte little-endian values mixing unsigned ints and floats."""

    n_rbin: int
    n_rbin_1: int
    mgc: float
    tgt_rgate: float
    unamb_tgt_range: int
    init_rgate: int
    app_wind_len: int
    inu_valid: int
    vmbc_inu: float
    vmbc_inu_der: float
    tx_chan: int
    prt: int
    prt_1: int
    stc_profile_modif: int

    # 2 ints, 2 floats, 4 ints, 2 floats, 4 ints - same order as the fields
    _format = '<IIffIIIIffIIII'

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        required = struct.calcsize(cls._format)
        have = len(data)
        if have < required:
            raise ValueError(f"Not enough data for agc_settings_t. Expected {required} bytes, got {have}")
        return cls(*struct.unpack_from(cls._format, data))
@dataclass
class sp_settings_t:
    """Four settings records stored back to back: exp, det, pc, agc."""

    exp_settings: exp_settings_t
    det_settings: det_settings_t
    pc_settings: pc_settings_t
    agc_settings: agc_settings_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the four sub-records in order, each starting where the previous one ends."""
        parsed = []
        cursor = 0
        for record_type in (exp_settings_t, det_settings_t, pc_settings_t, agc_settings_t):
            parsed.append(record_type.from_bytes(data[cursor:]))
            cursor += struct.calcsize(record_type._format)
        return cls(*parsed)
@dataclass
class mission_time_t:
    """Record of five consecutive little-endian unsigned 32-bit words."""

    utc_time_1: int
    utc_time_2: int
    utc_time_3: int
    utc_time_4: int
    timetag: int

    _format = '<IIIII'  # one 'I' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        needed = struct.calcsize(cls._format)
        if len(data) >= needed:
            return cls(*struct.unpack_from(cls._format, data))
        raise ValueError(f"Not enough data for mission_time_t. Expected {needed} bytes, got {len(data)}")
@dataclass
class nav_attitude_t:
    """Record of five consecutive little-endian 32-bit floats."""

    true_heading_rad: float
    pitch_rad: float
    roll_rad: float
    platform_azimuth_rad: float
    magnetic_heading_rad: float

    _format = '<fffff'  # one 'f' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        required = struct.calcsize(cls._format)
        have = len(data)
        if have < required:
            raise ValueError(f"Not enough data for nav_attitude_t. Expected {required} bytes, got {have}")
        return cls(*struct.unpack_from(cls._format, data))
@dataclass
class nav_attitude_rate_t:
    """Record of three consecutive little-endian 32-bit floats."""

    heading_rad_sec: float
    pitch_rad_sec: float
    roll_rad_sec: float

    _format = '<fff'  # one 'f' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        needed = struct.calcsize(cls._format)
        if len(data) >= needed:
            return cls(*struct.unpack_from(cls._format, data))
        raise ValueError(f"Not enough data for nav_attitude_rate_t. Expected {needed} bytes, got {len(data)}")
@dataclass
class nav_geo_pos_t:
    """Record of three consecutive little-endian 32-bit floats."""

    lat_rad: float
    lon_rad: float
    altitude_m: float

    _format = '<fff'  # one 'f' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        required = struct.calcsize(cls._format)
        have = len(data)
        if have < required:
            raise ValueError(f"Not enough data for nav_geo_pos_t. Expected {required} bytes, got {have}")
        return cls(*struct.unpack_from(cls._format, data))
@dataclass
class nav_speed_t:
    """Record of four consecutive little-endian 32-bit floats."""

    north_m_s: float
    east_m_s: float
    down_m_s: float
    vx_ground: float

    _format = '<ffff'  # one 'f' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        needed = struct.calcsize(cls._format)
        if len(data) >= needed:
            return cls(*struct.unpack_from(cls._format, data))
        raise ValueError(f"Not enough data for nav_speed_t. Expected {needed} bytes, got {len(data)}")
@dataclass
class nav_acceleration_t:
    """Record of three consecutive little-endian 32-bit floats."""

    north_m_s2: float
    east_m_s2: float
    down_m_s2: float

    _format = '<fff'  # one 'f' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        required = struct.calcsize(cls._format)
        have = len(data)
        if have < required:
            raise ValueError(f"Not enough data for nav_acceleration_t. Expected {required} bytes, got {have}")
        return cls(*struct.unpack_from(cls._format, data))
@dataclass
class navigation_t:
    """Navigation record: one unsigned 32-bit time tag followed by five float sub-records."""

    time_tag: int
    attitude: nav_attitude_t
    attitude_rate: nav_attitude_rate_t
    geo_pos: nav_geo_pos_t
    speed: nav_speed_t
    acceleration: nav_acceleration_t

    _format_prefix = '<I'  # time_tag

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*.

        The sub-records are laid out back to back after the time tag, in the
        order attitude, attitude_rate, geo_pos, speed, acceleration.

        Raises:
            ValueError: if *data* is shorter than the whole record. Checking up
                front keeps the error type consistent with the sibling parsers;
                without it a buffer shorter than the time tag would surface as
                ``struct.error`` from ``unpack_from``.
        """
        sub_types = (
            nav_attitude_t,
            nav_attitude_rate_t,
            nav_geo_pos_t,
            nav_speed_t,
            nav_acceleration_t,
        )
        prefix_len = struct.calcsize(cls._format_prefix)
        total_size = prefix_len + sum(struct.calcsize(t._format) for t in sub_types)
        if len(data) < total_size:
            raise ValueError(f"Not enough data for navigation_t. Expected {total_size} bytes, got {len(data)}")

        (time_tag,) = struct.unpack_from(cls._format_prefix, data, 0)

        parts = []
        offset = prefix_len
        for sub_type in sub_types:
            parts.append(sub_type.from_bytes(data[offset:]))
            offset += struct.calcsize(sub_type._format)
        return cls(time_tag, *parts)
@dataclass
class ant_pos_t:
    """Antenna position record: one unsigned 32-bit tag, twelve floats, four reserved words."""

    antttag: int
    navaz_rad: float
    navel_rad: float
    bodyaz_rad: float
    bodyel_rad: float
    antennaaz_rad: float
    antennael_rad: float
    utx_rad: float
    vtx_rad: float
    urx_rad: float
    vrx_rad: float
    uguardrx_rad: float
    vguardrx_rad: float
    VUOTO: list[int]  # reserved words, lonarr(4) in the IDL definition

    # One unsigned int followed by one float per declared float field (12).
    # The previous format decoded 13 floats, which yields one value more than
    # the class has fields and makes the constructor call fail.
    # NOTE(review): if the IDL structure really has a 13th float, add the
    # missing field here instead of shortening the format - TODO confirm.
    _format = '<I12f'
    _vuoto_len = 4

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*.

        Raises:
            ValueError: if *data* is shorter than the scalars plus reserved words.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')
        if len(data) < total_size:
            raise ValueError(f"Not enough data for ant_pos_t. Expected {total_size} bytes, got {len(data)}")
        unpacked_prefix = struct.unpack_from(cls._format, data, 0)
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)
        return cls(*unpacked_prefix, list(vuoto_data))
@dataclass
class ant_event_t:
    """Antenna event record: four unsigned 32-bit words followed by four reserved words."""

    frame_in_progress: int
    bar_in_progress: int
    number_of_bars: int
    current_bar_number: int
    VUOTO: list[int]  # reserved words, lonarr(4) in the IDL definition

    _format = '<IIII'  # the four scalar words that precede VUOTO
    _vuoto_len = 4

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*.

        Raises:
            ValueError: if *data* is shorter than the scalars plus reserved words.
        """
        total_size = struct.calcsize(cls._format) + cls._vuoto_len * struct.calcsize('<I')
        if len(data) < total_size:
            raise ValueError(f"Not enough data for ant_event_t. Expected {total_size} bytes, got {len(data)}")

        # One pass over the buffer: scalar words first, reserved words last.
        words = struct.unpack_from(f'{cls._format}{cls._vuoto_len}I', data)
        split = len(words) - cls._vuoto_len
        return cls(*words[:split], list(words[split:]))
@dataclass
class antenna_t:
    """Antenna record: an ant_pos_t immediately followed by an ant_event_t."""

    position: ant_pos_t
    event: ant_event_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the position record, then the event record that follows it."""
        position = ant_pos_t.from_bytes(data[0:])
        # The event record starts after the position scalars and its reserved words.
        event_start = struct.calcsize(ant_pos_t._format) + ant_pos_t._vuoto_len * struct.calcsize('<I')
        return cls(position, ant_event_t.from_bytes(data[event_start:]))
@dataclass
class waveform_descriptor_t:
    """Waveform descriptor: one unsigned 32-bit word followed by eight reserved words."""

    validity: int
    VUOTO: list[int]  # reserved words, lonarr(8) in the IDL definition

    _format = '<I'  # the single scalar word that precedes VUOTO
    _vuoto_len = 8

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the descriptor from the start of *data*.

        Raises:
            ValueError: if *data* is shorter than the scalar plus reserved words.
        """
        scalar_len = struct.calcsize(cls._format)
        total_size = scalar_len + cls._vuoto_len * struct.calcsize('<I')
        if len(data) < total_size:
            raise ValueError(f"Not enough data for waveform_descriptor_t. Expected {total_size} bytes, got {len(data)}")

        scalars = struct.unpack_from(cls._format, data, 0)
        reserved = struct.unpack_from(f'<{cls._vuoto_len}I', data, scalar_len)
        return cls(*scalars, list(reserved))
@dataclass
class general_settings_t:
    """General settings: mission time, navigation, antenna and waveform descriptor, back to back."""

    missiontime: mission_time_t
    navigation: navigation_t
    antenna: antenna_t
    waveform_descriptor: waveform_descriptor_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the four sub-records in order, advancing by each one's packed size."""
        word = struct.calcsize('<I')

        pos = 0
        missiontime = mission_time_t.from_bytes(data[pos:])
        pos += struct.calcsize(mission_time_t._format)

        navigation = navigation_t.from_bytes(data[pos:])
        # navigation_t = time tag + its five float sub-records
        pos += sum(
            struct.calcsize(fmt)
            for fmt in (
                navigation_t._format_prefix,
                nav_attitude_t._format,
                nav_attitude_rate_t._format,
                nav_geo_pos_t._format,
                nav_speed_t._format,
                nav_acceleration_t._format,
            )
        )

        antenna = antenna_t.from_bytes(data[pos:])
        # antenna_t = ant_pos_t + ant_event_t, each with its reserved words
        pos += sum(
            struct.calcsize(part._format) + part._vuoto_len * word
            for part in (ant_pos_t, ant_event_t)
        )

        waveform_descriptor = waveform_descriptor_t.from_bytes(data[pos:])
        return cls(missiontime, navigation, antenna, waveform_descriptor)
@dataclass
class nci_settings_t:
    """Record of three unsigned 32-bit words followed by one 32-bit float."""

    n_rbin: int
    n_pulses: int
    time_tag: int
    conv_fact: float

    _format = '<IIIf'  # same order as the fields above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        needed = struct.calcsize(cls._format)
        if len(data) >= needed:
            return cls(*struct.unpack_from(cls._format, data))
        raise ValueError(f"Not enough data for nci_settings_t. Expected {needed} bytes, got {len(data)}")
@dataclass
class mtifft_settings_t:
    """Record of eight unsigned 32-bit words followed by one 32-bit float."""

    TIMETAG: int
    N_RBIN: int
    N_PULSES: int
    N_INT: int
    NCLUTTER_REG: int
    PRT: int
    MTI_FILT_ID: int
    FFT_WINDOW_ID: int
    CONV_SCALE_FACTOR: float

    _format = '<IIIIIIIIf'  # same order as the fields above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        required = struct.calcsize(cls._format)
        have = len(data)
        if have < required:
            raise ValueError(f"Not enough data for mtifft_settings_t. Expected {required} bytes, got {have}")
        return cls(*struct.unpack_from(cls._format, data))
@dataclass
class cfar_settings_t:
    """Placeholder for the CFAR settings, whose real layout lives in another file (HEADER_CFAR).

    Until that definition is translated, a fixed-size run of unsigned 32-bit
    words is read and kept as a raw list.
    """

    placeholder_data: list[int]

    # Assumed placeholder size: 10 longs (40 bytes).
    _size_bytes = 10 * struct.calcsize('<I')

    @classmethod
    def from_bytes(cls, data: bytes):
        """Read the placeholder words from the start of *data*; raise ValueError if too short."""
        word_count = cls._size_bytes // struct.calcsize('<I')
        if len(data) < cls._size_bytes:
            raise ValueError(f"Not enough data for cfar_settings_t placeholder. Expected {cls._size_bytes} bytes, got {len(data)}")
        return cls(list(struct.unpack_from(f'<{word_count}I', data)))
@dataclass
class arf_settings_t:
    """Record of two consecutive little-endian unsigned 32-bit words."""

    N_RBIN: int
    N_FFT: int

    _format = '<II'  # one 'I' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        needed = struct.calcsize(cls._format)
        if len(data) >= needed:
            return cls(*struct.unpack_from(cls._format, data))
        raise ValueError(f"Not enough data for arf_settings_t. Expected {needed} bytes, got {len(data)}")
@dataclass
class slb_settings_t:
    """Record of twelve 4-byte little-endian values mixing unsigned ints and floats."""

    TIMETAG: int
    SLOBE_BLANKING: int
    SLBC: float
    N_RBIN: int
    N_FFT: int
    GRIFO_GAIN: float
    MAX_COLL_DET: int
    COLL_MAX_RNG: int
    COLL_MAX_DOP: int
    CFAR_START_FILT: int
    DOPPLER_NOTCH: float
    PRT: int

    # 2 ints, 1 float, 2 ints, 1 float, 4 ints, 1 float, 1 int - same order as the fields
    _format = '<II f II f IIII f I'

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        required = struct.calcsize(cls._format)
        have = len(data)
        if have < required:
            raise ValueError(f"Not enough data for slb_settings_t. Expected {required} bytes, got {have}")
        return cls(*struct.unpack_from(cls._format, data))
@dataclass
class pe_settings_t:
    """PE settings record: 68 four-byte scalar values followed by 16 reserved words.

    NOTE(review): the struct format below is derived from the declared field
    types (one 'I' per ``int`` field, one 'f' per ``float`` field, in
    declaration order). The binary layout should be confirmed against the IDL
    structure definition - in particular the BARO_ALT (int) /
    BARO_ALT_VALIDITY (float) pair looks like it may be swapped.
    """

    TIMETAG: int
    N_RBIN: int
    LRB: float
    PRT: float
    MLC_FREQ: float
    CLOSE_PLOT_FLAG: int
    TX_CHAN: int
    ANT_AZ: float
    ANT_EL: float
    NAV_AZ: float
    NAV_EL: float
    YAW: float
    PITCH: float
    ROLL: float
    PLAT_AZ: float
    PP_COAST_FLAG: int
    PP_MPRF_LOOKUP: int
    STT_RB_START: int
    STT_RB_STOP: int
    STT_FIL_START: int
    STT_FIL_STOP: int
    UNAMB_TGT_RANGE: float
    UNAMB_TGT_DOPPLER: float
    FM_SLOPE: int
    RB_START: int
    RB_STOP: int
    FIL_START: int
    FIL_STOP: int
    SDR_BEFORE_COAST: float
    PRF_SET: int
    PRF_NUMBER: int
    MTG: int
    SPUR_NUM: int
    DELSIGN: int
    JAM_THRESH: int
    CUT_OFF_VEL: int
    BATCH_END: int
    SPUR_FREQ: int
    FILTERED_ERR_VEL_CLUTT: int
    FLW_RANGE_START: int
    FLW_RANGE_STOP: int
    FLW_DOPPLER_START: int
    FLW_DOPPLER_STOP: int
    DESENSITISATION: int
    BATCH_AZIM_ANGLE: int
    BATCH_COUNT: int
    BATCH_DUR: int
    AGR_WINDOW_CENTER: int
    AGR_WINDOW_LENGTH: int
    NGT_ENABLE: int
    MTI_CODE: int
    MBC_DOP_INDEX: int
    STT_TRANSITION_STATUS: int
    TRACKED_TGT_AMPLITUDE: int
    ALE_BLANKING_ENABLE: int
    ALE_RANGE_ESTIMATE: int
    ALE_BLK_START: int
    ALE_BLK_STOP: int
    ALE_TRK_START: int
    ALE_TRK_STOP: int
    BARO_ALT: int
    BARO_ALT_VALIDITY: float
    Z_VELOCITY: int
    AGC_GAIN: int
    MGC: int
    EXPAND: int
    tx_chan_mhz: int
    lrb_meters: float
    VUOTO: list[int]  # reserved words, lonarr(16) in the IDL definition

    # Groups, in field order:
    #   2I  TIMETAG, N_RBIN
    #   3f  LRB, PRT, MLC_FREQ
    #   2I  CLOSE_PLOT_FLAG, TX_CHAN
    #   8f  ANT_AZ .. PLAT_AZ
    #   6I  PP_COAST_FLAG .. STT_FIL_STOP
    #   2f  UNAMB_TGT_RANGE, UNAMB_TGT_DOPPLER
    #   5I  FM_SLOPE .. FIL_STOP
    #   f   SDR_BEFORE_COAST
    #   32I PRF_SET .. BARO_ALT
    #   f   BARO_ALT_VALIDITY
    #   5I  Z_VELOCITY .. tx_chan_mhz
    #   f   lrb_meters
    _format = '<2I 3f 2I 8f 6I 2f 5I f 32I f 5I f'
    _vuoto_len = 16

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*.

        Raises:
            ValueError: if *data* is shorter than the scalars plus reserved words.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')
        if len(data) < total_size:
            raise ValueError(f"Not enough data for pe_settings_t. Expected {total_size} bytes, got {len(data)}")
        unpacked_prefix = struct.unpack_from(cls._format, data, 0)
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)
        return cls(*unpacked_prefix, list(vuoto_data))
# Marker word that identifies a DSPHDRIN block. The IDL reader
# (Grifo_E_data_reader.pro) compares against 1213223748L, which is 0x48505344:
# the first four ASCII characters of the block name, 'DSPH', read as a
# little-endian unsigned 32-bit word. (0x48445249 is 1212437065, a different
# value whose little-endian bytes are b'IRDH'.)
DSPHDRIN_MARKER = 0x48505344  # == 1213223748; .to_bytes(4, 'little') == b'DSPH'
@dataclass
class header_sw_part1:
    """First part of the software header: seventeen unsigned 32-bit words."""

    hdr_part1: int
    hdr_part2: int
    version: int
    vuoto_10: int
    block_header_size: int
    block_size: int
    skip_offset: int
    counter: int
    batch_counter: int
    signal_counter: int
    timetag: int
    reserved_0: int
    reserved_1: int
    reserved_2: int
    reserved_3: int
    chunk_info_size: int
    extra_info_size: int

    _format = '<IIIIIIIIIIIIIIIII'  # one 'I' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        needed = struct.calcsize(cls._format)
        if len(data) >= needed:
            return cls(*struct.unpack_from(cls._format, data))
        raise ValueError(f"Not enough data for header_sw_part1. Expected {needed} bytes, got {len(data)}")
@dataclass
class header_sw_chank:
    """Chunk part of the software header: seventeen unsigned 32-bit words."""

    NOME_1: int
    NOME_2: int
    NOME_3: int
    NOME_4: int
    RESERVED: int
    NUMERO_DI_CHUNKS: int
    MARKER_ASCII_1: int
    MARKER_ASCII_2: int
    SIZE_META_INFORMATION: int
    RESERVED_5: int
    RESERVED_6: int
    RESERVED_7: int
    RESERVED_8: int
    CHUNK_SIZE: int
    vuoto_2: int
    TIPO_: int
    vuoto_7: int

    _format = '<IIIIIIIIIIIIIIIII'  # one 'I' per field above

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*; raise ValueError if it is too short."""
        required = struct.calcsize(cls._format)
        have = len(data)
        if have < required:
            raise ValueError(f"Not enough data for header_sw_chank. Expected {required} bytes, got {have}")
        return cls(*struct.unpack_from(cls._format, data))
@dataclass
class header_sw:
    """Software header: header_sw_part1 immediately followed by header_sw_chank."""

    header_sw_part1: header_sw_part1
    header_sw_chank: header_sw_chank

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse both parts in order, the second starting where the first ends."""
        first = header_sw_part1.from_bytes(data[0:])
        second_start = struct.calcsize(header_sw_part1._format)
        second = header_sw_chank.from_bytes(data[second_start:])
        return cls(first, second)
@dataclass
class function_settings_t:
    """Function settings: eight processing-settings records stored back to back."""

    nci_settings: nci_settings_t
    mtifft_settings: mtifft_settings_t
    cfar_settings: cfar_settings_t
    arf_settings: arf_settings_t
    slb_settings: slb_settings_t
    pe_settings: pe_settings_t
    # The next two classes are defined further down the file, hence the string annotations.
    sc_settings: 'sc_settings_t'
    dbs_pe_settings: 'dbs_pe_settings_t'

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the eight sub-records in order, advancing by each one's packed size."""
        word = struct.calcsize('<I')
        pos = 0

        nci = nci_settings_t.from_bytes(data[pos:])
        pos += struct.calcsize(nci_settings_t._format)

        mtifft = mtifft_settings_t.from_bytes(data[pos:])
        pos += struct.calcsize(mtifft_settings_t._format)

        cfar = cfar_settings_t.from_bytes(data[pos:])
        pos += cfar_settings_t._size_bytes  # placeholder has a fixed byte size, not a format

        arf = arf_settings_t.from_bytes(data[pos:])
        pos += struct.calcsize(arf_settings_t._format)

        slb = slb_settings_t.from_bytes(data[pos:])
        pos += struct.calcsize(slb_settings_t._format)

        # pe and sc carry trailing reserved words in addition to their scalars.
        pe = pe_settings_t.from_bytes(data[pos:])
        pos += struct.calcsize(pe_settings_t._format) + pe_settings_t._vuoto_len * word

        sc = sc_settings_t.from_bytes(data[pos:])
        pos += struct.calcsize(sc_settings_t._format) + sc_settings_t._vuoto_len * word

        dbs_pe = dbs_pe_settings_t.from_bytes(data[pos:])
        return cls(nci, mtifft, cfar, arf, slb, pe, sc, dbs_pe)
@dataclass
class sc_settings_t:
    """SC settings record: eight unsigned 32-bit words followed by eight reserved words."""

    ant_nav_direction: int
    ant_current_bar_number: int
    ant_nav_scan_center: int
    ant_nav_left_scan_limit: int
    ant_nav_right_scan_limit: int
    rbm_white_level: int
    map_comp_vx: int
    map_comp_vy: int
    VUOTO: list[int]  # reserved words, lonarr(8) in the IDL definition

    _format = '<IIIIIIII'  # the eight scalar words that precede VUOTO
    _vuoto_len = 8

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*.

        Raises:
            ValueError: if *data* is shorter than the scalars plus reserved words.
        """
        scalars_len = struct.calcsize(cls._format)
        total_size = scalars_len + cls._vuoto_len * struct.calcsize('<I')
        if len(data) < total_size:
            raise ValueError(f"Not enough data for sc_settings_t. Expected {total_size} bytes, got {len(data)}")

        scalars = struct.unpack_from(cls._format, data, 0)
        reserved = struct.unpack_from(f'<{cls._vuoto_len}I', data, scalars_len)
        return cls(*scalars, list(reserved))
@dataclass
class dbs_pe_settings_t:
    """DBS PE settings record: three unsigned 32-bit words followed by eight reserved words."""

    dbs_flush_flag: int
    int_rbin_offset: int
    skip_processing: int
    VUOTO: list[int]  # reserved words, lonarr(8) in the IDL definition

    _format = '<III'  # the three scalar words that precede VUOTO
    _vuoto_len = 8

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the record from the start of *data*.

        Raises:
            ValueError: if *data* is shorter than the scalars plus reserved words.
        """
        total_size = struct.calcsize(cls._format) + cls._vuoto_len * struct.calcsize('<I')
        if len(data) < total_size:
            raise ValueError(f"Not enough data for dbs_pe_settings_t. Expected {total_size} bytes, got {len(data)}")

        # One pass over the buffer: scalar words first, reserved words last.
        words = struct.unpack_from(f'{cls._format}{cls._vuoto_len}I', data)
        split = len(words) - cls._vuoto_len
        return cls(*words[:split], list(words[split:]))
@dataclass
class ge_header_t:
    """Data header: six sub-structures stored back to back."""

    header_info: header_info_t
    signal_descr: signal_descr_t
    mode: mode_descr_t
    sp_settings: sp_settings_t
    general_settings: general_settings_t
    function_settings: function_settings_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the six sub-structures in order, advancing by each one's packed size."""
        word = struct.calcsize('<I')

        def scalar_bytes(*formats):
            # Total packed size of a run of struct formats.
            return sum(struct.calcsize(fmt) for fmt in formats)

        def padded_bytes(*record_types):
            # Total packed size of records that end with reserved (VUOTO) words.
            return sum(struct.calcsize(t._format) + t._vuoto_len * word for t in record_types)

        pos = 0
        h_info = header_info_t.from_bytes(data[pos:])
        pos += padded_bytes(header_info_t)

        s_descr = signal_descr_t.from_bytes(data[pos:])
        pos += scalar_bytes(
            signal_descr_t._format_prefix,
            PACKETDESCR_t._format,
            signal_descr_t._format_suffix,
        )

        mode = mode_descr_t.from_bytes(data[pos:])
        pos += padded_bytes(mode_descr_t)

        sp_settings = sp_settings_t.from_bytes(data[pos:])
        pos += scalar_bytes(
            exp_settings_t._format,
            det_settings_t._format,
            pc_settings_t._format,
            agc_settings_t._format,
        )

        general_settings = general_settings_t.from_bytes(data[pos:])
        pos += scalar_bytes(
            mission_time_t._format,
            navigation_t._format_prefix,
            nav_attitude_t._format,
            nav_attitude_rate_t._format,
            nav_geo_pos_t._format,
            nav_speed_t._format,
            nav_acceleration_t._format,
        ) + padded_bytes(ant_pos_t, ant_event_t, waveform_descriptor_t)

        func_settings = function_settings_t.from_bytes(data[pos:])
        return cls(h_info, s_descr, mode, sp_settings, general_settings, func_settings)
@dataclass
class Header:
    """Complete block header: the software header followed by the data header."""

    header_sw: header_sw
    header_data: ge_header_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse the software header, then the data header that starts right after it."""
        sw_part = header_sw.from_bytes(data[0:])
        data_start = struct.calcsize(header_sw_part1._format) + struct.calcsize(header_sw_chank._format)
        return cls(sw_part, ge_header_t.from_bytes(data[data_start:]))
def read_dsphrdin_block(file_handle, offset: int) -> "dict | None":
    """Read and parse one DSPHDRIN block starting at *offset*.

    The number of bytes read is the exact packed size of the ``Header``
    structure, obtained by summing the sizes of all of its leaf structures.

    Args:
        file_handle: Seekable binary file object; it is repositioned to
            *offset* before reading.
        offset: Absolute byte offset of the start of the block.

    Returns:
        The parsed header converted with ``dataclasses.asdict``, or ``None``
        when nothing could be read or the bytes could not be parsed.
    """
    file_handle.seek(offset)

    word = struct.calcsize('<I')

    # Formats of the leaf structures that consist of scalars only.
    scalar_formats = (
        header_sw_part1._format,
        header_sw_chank._format,
        signal_descr_t._format_prefix,
        PACKETDESCR_t._format,
        signal_descr_t._format_suffix,
        exp_settings_t._format,
        det_settings_t._format,
        pc_settings_t._format,
        agc_settings_t._format,
        mission_time_t._format,
        navigation_t._format_prefix,
        nav_attitude_t._format,
        nav_attitude_rate_t._format,
        nav_geo_pos_t._format,
        nav_speed_t._format,
        nav_acceleration_t._format,
        nci_settings_t._format,
        mtifft_settings_t._format,
        arf_settings_t._format,
        slb_settings_t._format,
    )
    # Leaf structures that end with reserved (VUOTO) words.
    padded_types = (
        header_info_t,
        mode_descr_t,
        ant_pos_t,
        ant_event_t,
        waveform_descriptor_t,
        pe_settings_t,
        sc_settings_t,
        dbs_pe_settings_t,
    )
    total_header_size = (
        sum(struct.calcsize(fmt) for fmt in scalar_formats)
        + sum(struct.calcsize(t._format) + t._vuoto_len * word for t in padded_types)
        + cfar_settings_t._size_bytes  # placeholder with a fixed byte size
    )

    header_bytes = file_handle.read(total_header_size)
    if not header_bytes:
        return None

    try:
        return asdict(Header.from_bytes(header_bytes))
    except (ValueError, struct.error) as e:
        # struct.error is caught as well so that a truncated block is reported
        # and skipped here instead of propagating to the caller.
        print(f"Error parsing DSPHDRIN block at offset {offset}: {e}")
        return None
def main():
    """Command-line entry point: scan a radar .out file for DSPHDRIN blocks and save them as JSON."""
    cli = argparse.ArgumentParser(description="Extract DSPHDRIN blocks from a radar .out file and save to JSON.")
    cli.add_argument("input_file", help="Path to the input .out file.")
    cli.add_argument("output_file", help="Path to the output JSON file.")
    args = cli.parse_args()

    extracted_data = []
    marker_bytes = DSPHDRIN_MARKER.to_bytes(4, 'little')  # marker as it appears on disk
    marker_len = len(marker_bytes)

    # Per the IDL reader (NAME_BLOCK_V = vector[ixBlocks+17]) the marker word is
    # word 17 of the block, i.e. the first word after the 17-word
    # header_sw_part1 (header_sw_chank.NOME_1). The block therefore starts
    # 17 * 4 = 68 bytes before the marker.
    marker_lead_in = 17 * 4

    try:
        with open(args.input_file, 'rb') as f:
            file_content = f.read()
            search_from = 0
            while True:
                marker_pos = file_content.find(marker_bytes, search_from)
                if marker_pos == -1:
                    break  # no further markers

                # Resume the next search just past this marker. A more accurate
                # approach would skip by the block_size stored in the header.
                search_from = marker_pos + marker_len

                header_start_offset = marker_pos - marker_lead_in
                if header_start_offset < 0:
                    print(f"Warning: DSPHDRIN marker found at {marker_pos}, but calculated header start offset {header_start_offset} is negative. Skipping.")
                    continue

                print(f"Found DSPHDRIN marker at byte offset: {marker_pos}. Attempting to parse header from offset: {header_start_offset}")
                # read_dsphrdin_block seeks to the offset itself and reads the
                # fixed header size it computes.
                header_data = read_dsphrdin_block(f, header_start_offset)
                if header_data:
                    extracted_data.append(header_data)
    except FileNotFoundError:
        print(f"Error: Input file '{args.input_file}' not found.")
        return
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return

    if not extracted_data:
        print("No DSPHDRIN blocks found or extracted.")
        return

    with open(args.output_file, 'w') as f_json:
        json.dump(extracted_data, f_json, indent=4)
    print(f"Successfully extracted {len(extracted_data)} DSPHDRIN blocks to '{args.output_file}'.")
if __name__ == "__main__":
main()