1026 lines
39 KiB
Python
1026 lines
39 KiB
Python
import struct
|
|
import json
|
|
import argparse
|
|
from dataclasses import dataclass, asdict
|
|
|
|
# --- IDL Structure Definitions (Translated to Python dataclasses) ---
|
|
# Assuming little-endian ('<')
|
|
# L: unsigned long (4 bytes)
|
|
# f: float (4 bytes)
|
|
# I: unsigned int (4 bytes) - used for 0L
|
|
# IDL float fields (declared as 0.) are read with the lowercase 'f' code above
|
|
|
|
@dataclass
class PACKETDESCR_t:
    """Packet descriptor: five consecutive little-endian unsigned 32-bit words."""

    NPRI: int
    NRBIN: int
    NUMOFCHANNELS: int
    BLOCKDIMENSION: int
    SIGNALORDERING: int

    # Not annotated on purpose: a plain class attribute, not a dataclass field.
    _format = '<IIIII'  # 5 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Args:
            data: buffer holding at least ``struct.calcsize(cls._format)``
                bytes; anything after that is ignored.

        Returns:
            A new ``PACKETDESCR_t``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for PACKETDESCR_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class signal_descr_t:
    """Signal descriptor: two words, an embedded PACKETDESCR_t, two more words."""

    batch_counter: int
    ttag: int
    PACKETDESCR: PACKETDESCR_t
    PACKETLENGTH: int
    NUMBEROFPACKETSIGNAL: int

    # Formats for the scalar words on either side of the embedded PACKETDESCR_t.
    _format_prefix = '<II'  # batch_counter, ttag
    _format_suffix = '<II'  # PACKETLENGTH, NUMBEROFPACKETSIGNAL

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Layout: prefix words, then a PACKETDESCR_t, then suffix words.

        Raises:
            ValueError: if ``data`` is shorter than the whole structure.
        """
        size_prefix = struct.calcsize(cls._format_prefix)
        size_packetdescr = struct.calcsize(PACKETDESCR_t._format)
        suffix_offset = size_prefix + size_packetdescr
        total_size = suffix_offset + struct.calcsize(cls._format_suffix)

        if len(data) < total_size:
            raise ValueError(f"Not enough data for signal_descr_t. Expected {total_size} bytes, got {len(data)}")

        batch_counter, ttag = struct.unpack_from(cls._format_prefix, data, 0)
        packet_descr = PACKETDESCR_t.from_bytes(data[size_prefix:suffix_offset])
        packet_length, number_of_packet_signal = struct.unpack_from(
            cls._format_suffix, data, suffix_offset
        )

        return cls(batch_counter, ttag, packet_descr, packet_length, number_of_packet_signal)
|
|
|
|
@dataclass
class header_info_t:
    """Header info: five unsigned 32-bit words followed by the VUOTO array."""

    MARKER_DATI_1: int
    MARKER_DATI_2: int
    header_marker: int
    rev_id: int
    header_dim: int
    VUOTO: list[int]  # lonarr(10)

    _format = '<IIIII'  # MARKER_DATI_1, MARKER_DATI_2, header_marker, rev_id, header_dim
    _vuoto_len = 10  # number of 32-bit words stored in VUOTO

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the scalar words plus
                the VUOTO array.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')

        if len(data) < total_size:
            raise ValueError(f"Not enough data for header_info_t. Expected {total_size} bytes, got {len(data)}")

        marker1, marker2, h_marker, rev_id, h_dim = struct.unpack_from(cls._format, data, 0)
        # VUOTO starts right after the scalar words.
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)

        return cls(marker1, marker2, h_marker, rev_id, h_dim, list(vuoto_data))
|
|
|
|
@dataclass
class mode_descr_t:
    """Mode descriptor: eight unsigned 32-bit words followed by the VUOTO array."""

    master_mode: int
    operation_mode: int
    standby: int
    radiate: int
    waveform: int
    range_scale: int
    scan_rate: int
    dsp_mode_qualifier: int
    VUOTO: list[int]  # lonarr(8)

    _format = '<IIIIIIII'  # 8 unsigned int
    _vuoto_len = 8  # number of 32-bit words stored in VUOTO

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the scalar words plus
                the VUOTO array.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')

        if len(data) < total_size:
            raise ValueError(f"Not enough data for mode_descr_t. Expected {total_size} bytes, got {len(data)}")

        unpacked_prefix = struct.unpack_from(cls._format, data, 0)
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)

        return cls(*unpacked_prefix, list(vuoto_data))
|
|
|
|
@dataclass
class exp_settings_t:
    """Expander settings: twelve consecutive unsigned 32-bit words."""

    exp_mof: int
    phase_rst_code: int
    pulse_id: int
    exp_f_base: int
    exp_f_delta: int
    pulse_len: int
    amp_peak: int
    pulse_id_2: int
    exp_f_base_2: int
    exp_f_delta_2: int
    pulse_len_2: int
    amp_peak_2: int

    _format = '<IIIIIIIIIIII'  # 12 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for exp_settings_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class det_settings_t:
    """Detector settings: eight consecutive unsigned 32-bit words."""

    det_mof: int
    lpf7_filter_id: int
    lpf8_filter_id: int
    det_f_base: int
    det_f_delta: int
    ph_vector: int  # Assuming 0L
    pc_lsn_start: int
    pc_lsn_len: int

    _format = '<IIIIIIII'  # 8 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for det_settings_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class pc_settings_t:
    """PC settings: five consecutive unsigned 32-bit words."""

    pc_mof: int
    filt_id: int
    pc_lsn_start: int
    pc_lsn_len: int
    pc_out_offset: int

    _format = '<IIIII'  # 5 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for pc_settings_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class agc_settings_t:
    """AGC settings: fourteen 32-bit fields mixing unsigned ints and floats."""

    n_rbin: int
    n_rbin_1: int
    mgc: float
    tgt_rgate: float
    unamb_tgt_range: int
    init_rgate: int
    app_wind_len: int
    inu_valid: int
    vmbc_inu: float
    vmbc_inu_der: float
    tx_chan: int
    prt: int
    prt_1: int
    stc_profile_modif: int

    _format = '<IIffIIIIffIIII'  # 2 unsigned int, 2 float, 4 unsigned int, 2 float, 4 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for agc_settings_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class sp_settings_t:
    """Container for the exp, det, pc and agc settings, stored back to back."""

    exp_settings: exp_settings_t
    det_settings: det_settings_t
    pc_settings: pc_settings_t
    agc_settings: agc_settings_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode the four sub-structures in order from the start of ``data``.

        Each sub-structure's own ``from_bytes`` validates the remaining
        length and raises ``ValueError`` when it is too short.
        """
        offset = 0
        exp_settings = exp_settings_t.from_bytes(data[offset:])
        offset += struct.calcsize(exp_settings_t._format)

        det_settings = det_settings_t.from_bytes(data[offset:])
        offset += struct.calcsize(det_settings_t._format)

        pc_settings = pc_settings_t.from_bytes(data[offset:])
        offset += struct.calcsize(pc_settings_t._format)

        # Last member: no need to advance the offset afterwards.
        agc_settings = agc_settings_t.from_bytes(data[offset:])

        return cls(exp_settings, det_settings, pc_settings, agc_settings)
|
|
|
|
@dataclass
class mission_time_t:
    """Mission time: four UTC words and a time tag, all unsigned 32-bit."""

    utc_time_1: int
    utc_time_2: int
    utc_time_3: int
    utc_time_4: int
    timetag: int

    _format = '<IIIII'  # 5 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for mission_time_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class nav_attitude_t:
    """Navigation attitude: five consecutive little-endian 32-bit floats."""

    true_heading_rad: float
    pitch_rad: float
    roll_rad: float
    platform_azimuth_rad: float
    magnetic_heading_rad: float

    _format = '<fffff'  # 5 float

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for nav_attitude_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class nav_attitude_rate_t:
    """Navigation attitude rates: three consecutive little-endian 32-bit floats."""

    heading_rad_sec: float
    pitch_rad_sec: float
    roll_rad_sec: float

    _format = '<fff'  # 3 float

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for nav_attitude_rate_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class nav_geo_pos_t:
    """Geographic position: three consecutive little-endian 32-bit floats."""

    lat_rad: float
    lon_rad: float
    altitude_m: float

    _format = '<fff'  # 3 float

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for nav_geo_pos_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class nav_speed_t:
    """Navigation speed: four consecutive little-endian 32-bit floats."""

    north_m_s: float
    east_m_s: float
    down_m_s: float
    vx_ground: float

    _format = '<ffff'  # 4 float

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for nav_speed_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class nav_acceleration_t:
    """Navigation acceleration: three consecutive little-endian 32-bit floats."""

    north_m_s2: float
    east_m_s2: float
    down_m_s2: float

    _format = '<fff'  # 3 float

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for nav_acceleration_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class navigation_t:
    """Navigation block: a time tag followed by five nested sub-structures."""

    time_tag: int
    attitude: nav_attitude_t
    attitude_rate: nav_attitude_rate_t
    geo_pos: nav_geo_pos_t
    speed: nav_speed_t
    acceleration: nav_acceleration_t

    _format_prefix = '<I'  # time_tag

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode the time tag and the nested structures from ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the whole structure.
                The length is checked up front so a short buffer fails the
                same way as in the other structures, rather than with
                ``struct.error`` from the first unpack.
        """
        parts = (
            nav_attitude_t,
            nav_attitude_rate_t,
            nav_geo_pos_t,
            nav_speed_t,
            nav_acceleration_t,
        )
        size_prefix = struct.calcsize(cls._format_prefix)
        total_size = size_prefix + sum(struct.calcsize(part._format) for part in parts)
        if len(data) < total_size:
            raise ValueError(f"Not enough data for navigation_t. Expected {total_size} bytes, got {len(data)}")

        (time_tag,) = struct.unpack_from(cls._format_prefix, data, 0)
        offset = size_prefix

        attitude = nav_attitude_t.from_bytes(data[offset:])
        offset += struct.calcsize(nav_attitude_t._format)

        attitude_rate = nav_attitude_rate_t.from_bytes(data[offset:])
        offset += struct.calcsize(nav_attitude_rate_t._format)

        geo_pos = nav_geo_pos_t.from_bytes(data[offset:])
        offset += struct.calcsize(nav_geo_pos_t._format)

        speed = nav_speed_t.from_bytes(data[offset:])
        offset += struct.calcsize(nav_speed_t._format)

        # Last member: no need to advance the offset afterwards.
        acceleration = nav_acceleration_t.from_bytes(data[offset:])

        return cls(time_tag, attitude, attitude_rate, geo_pos, speed, acceleration)
|
|
|
|
@dataclass
class ant_pos_t:
    """Antenna position: one unsigned word, twelve floats, then the VUOTO array."""

    antttag: int
    navaz_rad: float
    navel_rad: float
    bodyaz_rad: float
    bodyel_rad: float
    antennaaz_rad: float
    antennael_rad: float
    utx_rad: float
    vtx_rad: float
    urx_rad: float
    vrx_rad: float
    uguardrx_rad: float
    vguardrx_rad: float
    VUOTO: list[int]  # lonarr(4)

    # One code per scalar field above: 1 unsigned int + 12 float.
    # NOTE(review): the previous format declared 13 floats, one more than the
    # fields listed here, so from_bytes() passed too many arguments to the
    # constructor. The format now follows the declared fields; if the IDL
    # structure really has 13 floats, add the missing field instead.
    _format = '<I12f'
    _vuoto_len = 4  # number of 32-bit words stored in VUOTO

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the scalar fields plus
                the VUOTO array.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')

        if len(data) < total_size:
            raise ValueError(f"Not enough data for ant_pos_t. Expected {total_size} bytes, got {len(data)}")

        unpacked_prefix = struct.unpack_from(cls._format, data, 0)
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)

        return cls(*unpacked_prefix, list(vuoto_data))
|
|
|
|
@dataclass
class ant_event_t:
    """Antenna event: four unsigned 32-bit words followed by the VUOTO array."""

    frame_in_progress: int
    bar_in_progress: int
    number_of_bars: int
    current_bar_number: int
    VUOTO: list[int]  # lonarr(4)

    _format = '<IIII'  # 4 unsigned int
    _vuoto_len = 4  # number of 32-bit words stored in VUOTO

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the scalar words plus
                the VUOTO array.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')

        if len(data) < total_size:
            raise ValueError(f"Not enough data for ant_event_t. Expected {total_size} bytes, got {len(data)}")

        unpacked_prefix = struct.unpack_from(cls._format, data, 0)
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)

        return cls(*unpacked_prefix, list(vuoto_data))
|
|
|
|
@dataclass
class antenna_t:
    """Antenna block: an ant_pos_t immediately followed by an ant_event_t."""

    position: ant_pos_t
    event: ant_event_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode the position and event sub-structures from ``data``.

        Each sub-structure's own ``from_bytes`` validates the remaining
        length and raises ``ValueError`` when it is too short.
        """
        offset = 0
        position = ant_pos_t.from_bytes(data[offset:])
        # ant_pos_t occupies its scalar fields plus its VUOTO words.
        offset += (struct.calcsize(ant_pos_t._format) + ant_pos_t._vuoto_len * struct.calcsize('<I'))

        # Last member: no need to advance the offset afterwards.
        event = ant_event_t.from_bytes(data[offset:])

        return cls(position, event)
|
|
|
|
@dataclass
class waveform_descriptor_t:
    """Waveform descriptor: one unsigned 32-bit word followed by the VUOTO array."""

    validity: int
    VUOTO: list[int]  # lonarr(8)

    _format = '<I'  # 1 unsigned int
    _vuoto_len = 8  # number of 32-bit words stored in VUOTO

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the scalar word plus
                the VUOTO array.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')

        if len(data) < total_size:
            raise ValueError(f"Not enough data for waveform_descriptor_t. Expected {total_size} bytes, got {len(data)}")

        unpacked_prefix = struct.unpack_from(cls._format, data, 0)
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)

        return cls(*unpacked_prefix, list(vuoto_data))
|
|
|
|
@dataclass
class general_settings_t:
    """General settings: mission time, navigation, antenna, waveform descriptor."""

    missiontime: mission_time_t
    navigation: navigation_t
    antenna: antenna_t
    waveform_descriptor: waveform_descriptor_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode the four sub-structures in order from the start of ``data``.

        The offset is advanced by each sub-structure's packed size, computed
        from the format strings (and VUOTO lengths) of its members.
        """
        offset = 0
        missiontime = mission_time_t.from_bytes(data[offset:])
        offset += struct.calcsize(mission_time_t._format)

        navigation = navigation_t.from_bytes(data[offset:])
        # navigation_t = time tag + its five nested structures.
        offset += (struct.calcsize(navigation_t._format_prefix) +
                   struct.calcsize(nav_attitude_t._format) +
                   struct.calcsize(nav_attitude_rate_t._format) +
                   struct.calcsize(nav_geo_pos_t._format) +
                   struct.calcsize(nav_speed_t._format) +
                   struct.calcsize(nav_acceleration_t._format))

        antenna = antenna_t.from_bytes(data[offset:])
        # antenna_t = ant_pos_t + ant_event_t, each with its VUOTO words.
        offset += (struct.calcsize(ant_pos_t._format) + ant_pos_t._vuoto_len * struct.calcsize('<I') +
                   struct.calcsize(ant_event_t._format) + ant_event_t._vuoto_len * struct.calcsize('<I'))

        # Last member: no need to advance the offset afterwards.
        waveform_descriptor = waveform_descriptor_t.from_bytes(data[offset:])

        return cls(missiontime, navigation, antenna, waveform_descriptor)
|
|
|
|
@dataclass
class nci_settings_t:
    """NCI settings: three unsigned 32-bit words and one 32-bit float."""

    n_rbin: int
    n_pulses: int
    time_tag: int
    conv_fact: float

    _format = '<IIIf'  # 3 unsigned int, 1 float

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for nci_settings_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class mtifft_settings_t:
    """MTI/FFT settings: eight unsigned 32-bit words and one 32-bit float."""

    TIMETAG: int
    N_RBIN: int
    N_PULSES: int
    N_INT: int
    NCLUTTER_REG: int
    PRT: int
    MTI_FILT_ID: int
    FFT_WINDOW_ID: int
    CONV_SCALE_FACTOR: float

    _format = '<IIIIIIIIf'  # 8 unsigned int, 1 float

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for mtifft_settings_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class cfar_settings_t:  # Placeholder, as it's defined in another file (HEADER_CFAR)
    """Placeholder for the CFAR settings: a fixed-size run of raw 32-bit words.

    The real layout lives in HEADER_CFAR.pro and is not translated here, so
    the bytes are kept as an undecoded list of unsigned words.
    """

    placeholder_data: list[int]

    _size_bytes = 10 * struct.calcsize('<I')  # Assuming 10 L (40 bytes) as a placeholder

    @classmethod
    def from_bytes(cls, data: bytes):
        """Read ``_size_bytes`` bytes from the start of ``data`` as unsigned words.

        Raises:
            ValueError: if ``data`` is shorter than ``_size_bytes``.
        """
        if len(data) < cls._size_bytes:
            raise ValueError(f"Not enough data for cfar_settings_t placeholder. Expected {cls._size_bytes} bytes, got {len(data)}")
        num_longs = cls._size_bytes // struct.calcsize('<I')
        return cls(list(struct.unpack_from(f'<{num_longs}I', data)))
|
|
|
|
@dataclass
class arf_settings_t:
    """ARF settings: two consecutive unsigned 32-bit words."""

    N_RBIN: int
    N_FFT: int

    _format = '<II'  # 2 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for arf_settings_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class slb_settings_t:
    """SLB settings: twelve 32-bit fields mixing unsigned ints and floats."""

    TIMETAG: int
    SLOBE_BLANKING: int
    SLBC: float
    N_RBIN: int
    N_FFT: int
    GRIFO_GAIN: float
    MAX_COLL_DET: int
    COLL_MAX_RNG: int
    COLL_MAX_DOP: int
    CFAR_START_FILT: int
    DOPPLER_NOTCH: float
    PRT: int

    # Spaces between format codes only group them visually; struct ignores them.
    _format = '<II f II f IIII f I'  # 2 unsigned int, 1 float, 2 unsigned int, 1 float, 4 unsigned int, 1 float, 1 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for slb_settings_t. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class pe_settings_t:
    """PE settings: 68 scalar 32-bit fields followed by the VUOTO array.

    Int-annotated fields are read as unsigned 32-bit words and
    float-annotated fields as 32-bit floats, all little-endian.
    """

    TIMETAG: int
    N_RBIN: int
    LRB: float
    PRT: float
    MLC_FREQ: float
    CLOSE_PLOT_FLAG: int
    TX_CHAN: int
    ANT_AZ: float
    ANT_EL: float
    NAV_AZ: float
    NAV_EL: float
    YAW: float
    PITCH: float
    ROLL: float
    PLAT_AZ: float
    PP_COAST_FLAG: int
    PP_MPRF_LOOKUP: int
    STT_RB_START: int
    STT_RB_STOP: int
    STT_FIL_START: int
    STT_FIL_STOP: int
    UNAMB_TGT_RANGE: float
    UNAMB_TGT_DOPPLER: float
    FM_SLOPE: int
    RB_START: int
    RB_STOP: int
    FIL_START: int
    FIL_STOP: int
    SDR_BEFORE_COAST: float
    PRF_SET: int
    PRF_NUMBER: int
    MTG: int
    SPUR_NUM: int
    DELSIGN: int
    JAM_THRESH: int
    CUT_OFF_VEL: int
    BATCH_END: int
    SPUR_FREQ: int
    FILTERED_ERR_VEL_CLUTT: int
    FLW_RANGE_START: int
    FLW_RANGE_STOP: int
    FLW_DOPPLER_START: int
    FLW_DOPPLER_STOP: int
    DESENSITISATION: int
    BATCH_AZIM_ANGLE: int
    BATCH_COUNT: int
    BATCH_DUR: int
    AGR_WINDOW_CENTER: int
    AGR_WINDOW_LENGTH: int
    NGT_ENABLE: int
    MTI_CODE: int
    MBC_DOP_INDEX: int
    STT_TRANSITION_STATUS: int
    TRACKED_TGT_AMPLITUDE: int
    ALE_BLANKING_ENABLE: int
    ALE_RANGE_ESTIMATE: int
    ALE_BLK_START: int
    ALE_BLK_STOP: int
    ALE_TRK_START: int
    ALE_TRK_STOP: int
    BARO_ALT: int
    BARO_ALT_VALIDITY: float
    Z_VELOCITY: int
    AGC_GAIN: int
    MGC: int
    EXPAND: int
    tx_chan_mhz: int
    lrb_meters: float
    VUOTO: list[int]  # lonarr(16)

    # One format code per scalar field above, in declaration order.
    # NOTE(review): the previous format string was a runaway sequence of 'I'
    # codes that did not match the declared fields. This one is derived from
    # the annotations (int -> 'I', float -> 'f'); confirm the int/float split
    # of each field against the IDL definition.
    _format = (
        '<'
        '2I'   # TIMETAG, N_RBIN
        '3f'   # LRB, PRT, MLC_FREQ
        '2I'   # CLOSE_PLOT_FLAG, TX_CHAN
        '8f'   # ANT_AZ .. PLAT_AZ
        '6I'   # PP_COAST_FLAG .. STT_FIL_STOP
        '2f'   # UNAMB_TGT_RANGE, UNAMB_TGT_DOPPLER
        '5I'   # FM_SLOPE .. FIL_STOP
        'f'    # SDR_BEFORE_COAST
        '32I'  # PRF_SET .. BARO_ALT
        'f'    # BARO_ALT_VALIDITY
        '5I'   # Z_VELOCITY .. tx_chan_mhz
        'f'    # lrb_meters
    )
    _vuoto_len = 16  # number of 32-bit words stored in VUOTO

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the scalar fields plus
                the VUOTO array.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')

        if len(data) < total_size:
            raise ValueError(f"Not enough data for pe_settings_t. Expected {total_size} bytes, got {len(data)}")

        unpacked_prefix = struct.unpack_from(cls._format, data, 0)
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)

        return cls(*unpacked_prefix, list(vuoto_data))
|
|
# Marker word that opens a DSPHDRIN block. The IDL reader
# (Grifo_E_data_reader.pro) compares against 1213223748L, which is 0x48505344.
# Read as a little-endian 32-bit word, that is the ASCII bytes b'DSPH', the
# first four characters of "DSPHDRIN". The previous literal 0x48445249
# (1212437065) did not equal the IDL value it was documented to match.
DSPHDRIN_MARKER = 0x48505344  # == 1213223748
|
|
|
|
@dataclass
class header_sw_part1:
    """First part of the software header: seventeen unsigned 32-bit words."""

    hdr_part1: int
    hdr_part2: int
    version: int
    vuoto_10: int
    block_header_size: int
    block_size: int
    skip_offset: int
    counter: int
    batch_counter: int
    signal_counter: int
    timetag: int
    reserved_0: int
    reserved_1: int
    reserved_2: int
    reserved_3: int
    chunk_info_size: int
    extra_info_size: int

    _format = '<IIIIIIIIIIIIIIIII'  # 17 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for header_sw_part1. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class header_sw_chank:
    """Chunk part of the software header: seventeen unsigned 32-bit words."""

    NOME_1: int
    NOME_2: int
    NOME_3: int
    NOME_4: int
    RESERVED: int
    NUMERO_DI_CHUNKS: int
    MARKER_ASCII_1: int
    MARKER_ASCII_2: int
    SIZE_META_INFORMATION: int
    RESERVED_5: int
    RESERVED_6: int
    RESERVED_7: int
    RESERVED_8: int
    CHUNK_SIZE: int
    vuoto_2: int
    TIPO_: int
    vuoto_7: int

    _format = '<IIIIIIIIIIIIIIIII'  # 17 unsigned int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the packed structure.
        """
        size = struct.calcsize(cls._format)
        if len(data) < size:
            raise ValueError(f"Not enough data for header_sw_chank. Expected {size} bytes, got {len(data)}")
        return cls(*struct.unpack_from(cls._format, data))
|
|
|
|
@dataclass
class header_sw:
    """Software header: a header_sw_part1 immediately followed by a header_sw_chank."""

    # The field names deliberately match the module-level classes; the
    # annotations and the method body resolve to those classes.
    header_sw_part1: header_sw_part1
    header_sw_chank: header_sw_chank

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode both parts in order from the start of ``data``.

        Each part's own ``from_bytes`` validates the remaining length and
        raises ``ValueError`` when it is too short.
        """
        offset = 0
        part1 = header_sw_part1.from_bytes(data[offset:])
        offset += struct.calcsize(header_sw_part1._format)

        # Last member: no need to advance the offset afterwards.
        chank = header_sw_chank.from_bytes(data[offset:])

        return cls(part1, chank)
|
|
|
|
@dataclass
class function_settings_t:
    """Function settings: eight processing-function sub-structures back to back."""

    nci_settings: nci_settings_t
    mtifft_settings: mtifft_settings_t
    cfar_settings: cfar_settings_t
    arf_settings: arf_settings_t
    slb_settings: slb_settings_t
    pe_settings: pe_settings_t
    sc_settings: 'sc_settings_t'  # Forward reference
    dbs_pe_settings: 'dbs_pe_settings_t'  # Forward reference

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode the eight sub-structures in order from the start of ``data``.

        The offset is advanced by each sub-structure's packed size: its
        format size, plus its VUOTO words where it has a VUOTO array, or
        the fixed placeholder size for cfar_settings_t.
        """
        offset = 0
        nci = nci_settings_t.from_bytes(data[offset:])
        offset += struct.calcsize(nci_settings_t._format)

        mtifft = mtifft_settings_t.from_bytes(data[offset:])
        offset += struct.calcsize(mtifft_settings_t._format)

        cfar = cfar_settings_t.from_bytes(data[offset:])
        offset += cfar_settings_t._size_bytes  # Use defined size for placeholder

        arf = arf_settings_t.from_bytes(data[offset:])
        offset += struct.calcsize(arf_settings_t._format)

        slb = slb_settings_t.from_bytes(data[offset:])
        offset += struct.calcsize(slb_settings_t._format)

        pe = pe_settings_t.from_bytes(data[offset:])
        offset += (struct.calcsize(pe_settings_t._format) + pe_settings_t._vuoto_len * struct.calcsize('<I'))

        sc = sc_settings_t.from_bytes(data[offset:])
        offset += (struct.calcsize(sc_settings_t._format) + sc_settings_t._vuoto_len * struct.calcsize('<I'))

        # Last member: no need to advance the offset afterwards.
        dbs_pe = dbs_pe_settings_t.from_bytes(data[offset:])

        return cls(nci, mtifft, cfar, arf, slb, pe, sc, dbs_pe)
|
|
|
|
@dataclass
class sc_settings_t:
    """SC settings: eight unsigned 32-bit words followed by the VUOTO array."""

    ant_nav_direction: int
    ant_current_bar_number: int
    ant_nav_scan_center: int
    ant_nav_left_scan_limit: int
    ant_nav_right_scan_limit: int
    rbm_white_level: int
    map_comp_vx: int
    map_comp_vy: int
    VUOTO: list[int]  # lonarr(8)

    _format = '<IIIIIIII'  # 8 unsigned int
    _vuoto_len = 8  # number of 32-bit words stored in VUOTO

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the scalar words plus
                the VUOTO array.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')

        if len(data) < total_size:
            raise ValueError(f"Not enough data for sc_settings_t. Expected {total_size} bytes, got {len(data)}")

        unpacked_prefix = struct.unpack_from(cls._format, data, 0)
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)

        return cls(*unpacked_prefix, list(vuoto_data))
|
|
|
|
@dataclass
class dbs_pe_settings_t:
    """DBS PE settings: three unsigned 32-bit words followed by the VUOTO array."""

    dbs_flush_flag: int
    int_rbin_offset: int
    skip_processing: int
    VUOTO: list[int]  # lonarr(8)

    _format = '<III'  # 3 unsigned int
    _vuoto_len = 8  # number of 32-bit words stored in VUOTO

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode an instance from the start of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than the scalar words plus
                the VUOTO array.
        """
        size_prefix = struct.calcsize(cls._format)
        total_size = size_prefix + cls._vuoto_len * struct.calcsize('<I')

        if len(data) < total_size:
            raise ValueError(f"Not enough data for dbs_pe_settings_t. Expected {total_size} bytes, got {len(data)}")

        unpacked_prefix = struct.unpack_from(cls._format, data, 0)
        vuoto_data = struct.unpack_from(f'<{cls._vuoto_len}I', data, size_prefix)

        return cls(*unpacked_prefix, list(vuoto_data))
|
|
|
|
@dataclass
class ge_header_t:
    """Data header: six top-level sections stored back to back."""

    header_info: header_info_t
    signal_descr: signal_descr_t
    mode: mode_descr_t
    sp_settings: sp_settings_t
    general_settings: general_settings_t
    function_settings: function_settings_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode the six sections in order from the start of ``data``.

        After each section the offset is advanced by that section's packed
        size, computed from the format strings (and VUOTO lengths) of the
        structures it contains.
        """
        offset = 0
        h_info = header_info_t.from_bytes(data[offset:])
        offset += (struct.calcsize(header_info_t._format) + header_info_t._vuoto_len * struct.calcsize('<I'))

        s_descr = signal_descr_t.from_bytes(data[offset:])
        offset += (struct.calcsize(signal_descr_t._format_prefix) +
                   struct.calcsize(PACKETDESCR_t._format) +
                   struct.calcsize(signal_descr_t._format_suffix))

        mode = mode_descr_t.from_bytes(data[offset:])
        offset += (struct.calcsize(mode_descr_t._format) + mode_descr_t._vuoto_len * struct.calcsize('<I'))

        sp_settings = sp_settings_t.from_bytes(data[offset:])
        offset += (struct.calcsize(exp_settings_t._format) +
                   struct.calcsize(det_settings_t._format) +
                   struct.calcsize(pc_settings_t._format) +
                   struct.calcsize(agc_settings_t._format))

        general_settings = general_settings_t.from_bytes(data[offset:])
        # general_settings_t = mission time + navigation + antenna + waveform descriptor.
        offset += (struct.calcsize(mission_time_t._format) +
                   struct.calcsize(navigation_t._format_prefix) +
                   struct.calcsize(nav_attitude_t._format) +
                   struct.calcsize(nav_attitude_rate_t._format) +
                   struct.calcsize(nav_geo_pos_t._format) +
                   struct.calcsize(nav_speed_t._format) +
                   struct.calcsize(nav_acceleration_t._format) +
                   struct.calcsize(ant_pos_t._format) + ant_pos_t._vuoto_len * struct.calcsize('<I') +
                   struct.calcsize(ant_event_t._format) + ant_event_t._vuoto_len * struct.calcsize('<I') +
                   struct.calcsize(waveform_descriptor_t._format) + waveform_descriptor_t._vuoto_len * struct.calcsize('<I'))

        # Last section: no need to advance the offset afterwards.
        func_settings = function_settings_t.from_bytes(data[offset:])

        return cls(h_info, s_descr, mode, sp_settings, general_settings, func_settings)
|
|
|
|
@dataclass
class Header:
    """Top-level header: the software header followed by the GE header data."""

    header_sw: header_sw
    header_data: ge_header_t

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode a ``Header`` from the beginning of *data*.

        The software header is parsed first; the GE header is then parsed from
        the bytes that follow it. Exceptions raised by either parser propagate
        to the caller.
        """
        software_part = header_sw.from_bytes(data[0:])

        # The software header occupies its fixed part plus its chunk part.
        sw_size = (
            struct.calcsize(header_sw_part1._format)
            + struct.calcsize(header_sw_chank._format)
        )

        # Last field: nothing is read after it, so no further offset tracking.
        data_part = ge_header_t.from_bytes(data[sw_size:])

        return cls(software_part, data_part)
|
|
|
|
def _dsphdrin_header_size() -> int:
    """Return the exact packed size, in bytes, of one complete ``Header``.

    The size is the sum of every fixed-format sub-structure, plus the extra
    ``_vuoto_len`` 4-byte words declared by some structures (presumably
    reserved/padding words -- "vuoto" is Italian for "empty"), plus the
    placeholder byte size of ``cfar_settings_t``.
    """
    word_size = struct.calcsize('<I')

    # Fixed-size formats, in on-disk order.
    formats = (
        header_sw_part1._format,
        header_sw_chank._format,
        header_info_t._format,
        signal_descr_t._format_prefix,
        PACKETDESCR_t._format,
        signal_descr_t._format_suffix,
        mode_descr_t._format,
        exp_settings_t._format,
        det_settings_t._format,
        pc_settings_t._format,
        agc_settings_t._format,
        mission_time_t._format,
        navigation_t._format_prefix,
        nav_attitude_t._format,
        nav_attitude_rate_t._format,
        nav_geo_pos_t._format,
        nav_speed_t._format,
        nav_acceleration_t._format,
        ant_pos_t._format,
        ant_event_t._format,
        waveform_descriptor_t._format,
        nci_settings_t._format,
        mtifft_settings_t._format,
        arf_settings_t._format,
        slb_settings_t._format,
        pe_settings_t._format,
        sc_settings_t._format,
        dbs_pe_settings_t._format,
    )

    # Structures that carry additional trailing 4-byte words.
    padded = (
        header_info_t,
        mode_descr_t,
        ant_pos_t,
        ant_event_t,
        waveform_descriptor_t,
        pe_settings_t,
        sc_settings_t,
        dbs_pe_settings_t,
    )

    return (
        sum(struct.calcsize(fmt) for fmt in formats)
        + word_size * sum(struct_cls._vuoto_len for struct_cls in padded)
        + cfar_settings_t._size_bytes  # Placeholder size
    )


def read_dsphrdin_block(file_handle, offset: int) -> "dict | None":
    """Read and parse one DSPHDRIN block starting at *offset*.

    Args:
        file_handle: Seekable binary file object to read from.
        offset: Absolute byte position of the start of the ``Header``.

    Returns:
        The parsed header converted to a plain dictionary via ``asdict``, or
        ``None`` when nothing could be read at *offset* or the bytes could not
        be parsed (a message is printed in the latter case).
    """
    file_handle.seek(offset)

    header_bytes = file_handle.read(_dsphdrin_header_size())
    if not header_bytes:
        return None

    try:
        return asdict(Header.from_bytes(header_bytes))
    except (ValueError, struct.error) as e:
        # struct.error is not a ValueError subclass; catching it here keeps a
        # single truncated or corrupt block from aborting the caller's whole
        # extraction run.
        print(f"Error parsing DSPHDRIN block at offset {offset}: {e}")
        return None
|
|
|
|
def main():
    """Command-line entry point: extract every DSPHDRIN block to a JSON file.

    Scans the input file for the DSPHDRIN marker, parses the header that
    surrounds each occurrence, and writes the list of parsed headers to the
    output path as indented JSON. Problems are reported on stdout; the
    function always returns ``None``.
    """
    cli = argparse.ArgumentParser(description="Extract DSPHDRIN blocks from a radar .out file and save to JSON.")
    cli.add_argument("input_file", help="Path to the input .out file.")
    cli.add_argument("output_file", help="Path to the output JSON file.")
    args = cli.parse_args()

    extracted_data = []
    # The marker as it appears on disk: 4 bytes, little-endian.
    marker = DSPHDRIN_MARKER.to_bytes(4, 'little')

    try:
        with open(args.input_file, 'rb') as src:
            blob = src.read()

            marker_pos = blob.find(marker, 0)
            while marker_pos != -1:
                # Per Grifo_E_data_reader.pro, the marker is read from
                # NAME_BLOCK_V = vector[ixBlocks+17] while the block itself is
                # read from ixBlocks, i.e. the marker sits 17 four-byte words
                # (68 bytes) after the start of the Header structure.
                header_start_offset = marker_pos - (17 * 4)

                if header_start_offset >= 0:
                    print(f"Found DSPHDRIN marker at byte offset: {marker_pos}. Attempting to parse header from offset: {header_start_offset}")

                    # Position the handle at the block start; the reader
                    # relies on a fixed header size rather than the block_size
                    # stored in the header.
                    src.seek(header_start_offset)
                    parsed = read_dsphrdin_block(src, header_start_offset)
                    if parsed:
                        extracted_data.append(parsed)
                else:
                    print(f"Warning: DSPHDRIN marker found at {marker_pos}, but calculated header start offset {header_start_offset} is negative. Skipping.")

                # Resume the search just past this marker (not past the whole
                # block), whether or not the block was parsed.
                marker_pos = blob.find(marker, marker_pos + len(marker))

    except FileNotFoundError:
        print(f"Error: Input file '{args.input_file}' not found.")
        return
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return

    if not extracted_data:
        print("No DSPHDRIN blocks found or extracted.")
        return

    with open(args.output_file, 'w') as out:
        json.dump(extracted_data, out, indent=4)
    print(f"Successfully extracted {len(extracted_data)} DSPHDRIN blocks to '{args.output_file}'.")
|
|
|
|
# Run the extractor only when this file is executed as a script.
if __name__ == "__main__":
    main()