add d1553 block

This commit is contained in:
VALLONGOL 2025-06-25 08:49:04 +02:00
parent c7d78cf343
commit ac5a29ded4
5 changed files with 396 additions and 208 deletions

View File

@ -18,31 +18,6 @@
"data_path": "main_header.ge_header.signal_descr.ttag",
"translate_with_enum": false
},
{
"column_name": "master_mode",
"data_path": "main_header.ge_header.mode.master_mode",
"translate_with_enum": true
},
{
"column_name": "operation_mode",
"data_path": "main_header.ge_header.mode.operation_mode",
"translate_with_enum": true
},
{
"column_name": "range_scale",
"data_path": "main_header.ge_header.mode.range_scale",
"translate_with_enum": false
},
{
"column_name": "batch_id",
"data_path": "cdp_sts_results.payload.data.timetag_chunk.data.batch_id",
"translate_with_enum": false
},
{
"column_name": "time",
"data_path": "cdp_sts_results.payload.data.timetag_chunk.data.time",
"translate_with_enum": false
},
{
"column_name": "mode",
"data_path": "cdp_sts_results.payload.data.status_chunk.data.mode",
@ -59,48 +34,23 @@
"translate_with_enum": true
},
{
"column_name": "tcr",
"data_path": "timer_data.blob.payload.tcr",
"column_name": "baro_altitude_m",
"data_path": "d1553_data.baro_altitude_m",
"translate_with_enum": false
},
{
"column_name": "diff_prt_num",
"data_path": "timer_data.blob.payload.diff_prt_num",
"column_name": "latitude_deg",
"data_path": "d1553_data.latitude_deg",
"translate_with_enum": false
},
{
"column_name": "B_Filter",
"data_path": "timer_data.blob.payload.shift.B_Filter",
"column_name": "longitude_deg",
"data_path": "d1553_data.longitude_deg",
"translate_with_enum": false
},
{
"column_name": "RX_SYNC",
"data_path": "timer_data.blob.payload.shift.RX_SYNC",
"translate_with_enum": false
},
{
"column_name": "exp_pulse1_delay",
"data_path": "timer_data.blob.payload.exp_pulse1_delay[0].fifo[0]",
"translate_with_enum": false
},
{
"column_name": "updates",
"data_path": "aesa_data.payload.updates",
"translate_with_enum": false
},
{
"column_name": "ignore_aesa_status",
"data_path": "aesa_data.payload.ignore_aesa_status",
"translate_with_enum": false
},
{
"column_name": "cbite_mode",
"data_path": "aesa_data.payload.cbite_mode",
"translate_with_enum": false
},
{
"column_name": "download_map_executed",
"data_path": "aesa_data.payload.download_map_executed",
"column_name": "true_heading_deg",
"data_path": "d1553_data.true_heading_deg",
"translate_with_enum": false
}
]

View File

@ -42,7 +42,7 @@ BLOCK_TYPE_MAP = {
1181316173: "MTIFFT",
892678468: "D1553",
1397769283: "CDPSTS",
1095976257: "AESA", # ID Generico per AESA
1095976257: "AESA",
1397773124: "DSPS",
5265477: "EXP",
17232: "PC",
@ -510,86 +510,52 @@ class GrifoTimerBlob(CtypesStructureBase):
# --- AESA Block and Sub-structures (ctypes) ---
# Le costanti AESA_TX_MESSAGE_MAX_SIZE e AESA_RX_MESSAGE_MAX_SIZE
# riflettono la dimensione massima dei *messaggi* definiti in AESA_IF_TYPES_H.
# La dimensione reale dei *buffer* all'interno di AntennaCmdBuffer/AntennaReplyBuffer
# potrebbe differire per via di allineamenti o altri dati non esposti.
# Le dimensioni reali viste nel file sono: TX (8424), RX (4840), STATUS (256)
# Le nostre struct AESA_CMD_BUFFER e AESA_REPLY_BUFFER devono riflettere
# i loro usi specifici all'interno dei blocchi.
# Ricalcoliamo le dimensioni delle struct basandoci sui log reali del tuo file,
# considerando che i buffer interni potrebbero essere più piccoli o che ci siano header/footer aggiuntivi
# che non sono parte della struct C++ originale ma sono presenti nel file.
# Supponiamo che il blocco AESA sia un wrapper attorno a queste struct.
# AESA_TX_MESSAGE_MAX_SIZE = 8192 (da AesaStream.h)
# AESA_RX_MESSAGE_MAX_SIZE = 2048 (da AESA_IF_TYPES_H.h)
# Per i blocchi AESA, ci basiamo sulla dimensione totale del blocco nel file.
# La definizione di AntennaCmdBuffer e AntennaReplyBuffer sono *giuste* per le struct C++ pure.
# Il problema è che il blocco AESA nel file contiene *più* di queste struct pure.
# AESA_TX_BLOCK_SIZE = 8424
# AESA_RX_BLOCK_SIZE = 4840
# AESA_SYNTH_REPORT_BLOCK_SIZE = 256
# Visto che i dati AESA sono messaggi complessi, li gestiamo con raw data per ora.
# Le definizioni di queste struct rimangono qui per riferimento, ma non saranno usate direttamente per il parsing del blocco AESA.
# Serviranno quando decideremo di implementare un parser dettagliato per il contenuto dei messaggi.
# Lasciamo le costanti per chiarezza, ma le classi AntennaCmdBuffer e AntennaReplyBuffer
# verranno rimosse dall'uso diretto nel parsing del blocco AESA.
class AesaSyntheticReport(CtypesStructureBase):
_fields_ = [("aesa_fail_mask", ctypes.c_uint), ("comm", ctypes.c_uint)]
# Per il blocco AESA, ci aspettiamo solo AesaSyntheticReport (se la dimensione è 256).
# Gli altri tipi AESA (TX e RX) non sono ancora gestiti in dettaglio.
class AntennaCmdBuffer(CtypesStructureBase):
# --- D1553 Block and Sub-structures (ctypes) ---
ICD1553_GEOPOS_DEG_LSB = (4.65661e-10) * 180.0
ICD1553_SEMICIRCLE_DEG_LSB = (3.05176e-05) * 180.0
ICD1553_BARO_ALT_METERS_LSB = 4.0 * (1.0 / 3.280839895)
class AvionicsMessage(CtypesStructureBase):
_fields_ = [
("h", ctypes.c_uint32 * 8),
("data", ctypes.c_uint8 * 8192),
] # Corrisponde a 8224 byte
class AntennaErrCounters(CtypesStructureBase):
_fields_ = [
("rfif_sts_received", ctypes.c_uint),
("rfif_bite_received", ctypes.c_uint),
("rfif_rxn", ctypes.c_uint),
("rfif_duty", ctypes.c_uint),
("rfif_overpulse", ctypes.c_uint),
("spare", ctypes.c_uint * 16),
("tag", ctypes.c_uint32),
("att", ctypes.c_uint32 * 9),
("a1", ctypes.c_uint16 * 32),
("a2", ctypes.c_uint16 * 32),
("a3", ctypes.c_uint16 * 32),
("a4", ctypes.c_uint16 * 32),
("a5", ctypes.c_uint16 * 32),
("a7", ctypes.c_uint16 * 32),
("a8", ctypes.c_uint16 * 32),
("x1", ctypes.c_uint16 * 32),
("btt", ctypes.c_uint32 * 32),
("b1", ctypes.c_uint16 * 32),
("b2", ctypes.c_uint16 * 32),
("b3", ctypes.c_uint16 * 32),
("b4", ctypes.c_uint16 * 32),
("b5", ctypes.c_uint16 * 32),
("b6", ctypes.c_uint16 * 32),
("b7", ctypes.c_uint16 * 32),
("b8", ctypes.c_uint16 * 32),
("b9", ctypes.c_uint16 * 32),
("b10", ctypes.c_uint16 * 32),
("b11", ctypes.c_uint16 * 32),
("b12", ctypes.c_uint16 * 32),
("b13", ctypes.c_uint16 * 32),
("b14", ctypes.c_uint16 * 32),
("b15", ctypes.c_uint16 * 32),
("b16", ctypes.c_uint16 * 32),
("b17", ctypes.c_uint16 * 32),
("b18", ctypes.c_uint16 * 32),
]
class AntennaReplyBuffer(CtypesStructureBase):
_fields_ = [
("updates", ctypes.c_uint),
("hstate", ctypes.c_uint),
("response_decoded", ctypes.c_uint),
("msg_count_errors", ctypes.c_uint),
("timeouterr", ctypes.c_uint),
("crcerr", ctypes.c_uint),
("lenerr", ctypes.c_uint),
("rxerr", ctypes.c_uint),
("pri_err", ctypes.c_int),
("ignore_aesa_status", ctypes.c_int),
("download_map_executed", ctypes.c_uint),
("spare", ctypes.c_uint * 6),
("data_updates", ctypes.c_uint),
("data_size", ctypes.c_uint),
("data", ctypes.c_uint8 * 2048),
("cbite_mode", ctypes.c_uint),
("errs", AntennaErrCounters),
] # Corrisponde a 2240 byte
class AesaSyntheticReport(CtypesStructureBase):
_fields_ = [
("aesa_fail_mask", ctypes.c_uint),
("comm", ctypes.c_uint),
] # Dimensione 8 byte
class D1553Payload(CtypesStructureBase):
_fields_ = [("d", AvionicsMessage)]
# --- Top-Level Block Definitions (Python-side) ---
@ -612,15 +578,44 @@ class TimerBlock(BaseBlock):
@dataclass
class AesaBlock(BaseBlock):
block_subtype: str # "SYNTHETIC", "TX_RAW", "RX_RAW", "UNKNOWN"
block_subtype: str
is_valid: bool
# Per AESA, payload sarà un'istanza della struct appropriata,
# o un buffer raw per i tipi non parsati in dettaglio.
payload: Optional[Union[AesaSyntheticReport, bytes]] = None
# For AESA, we will also store the raw bytes to allow for later custom parsing
raw_data_bytes: Optional[bytes] = None
@dataclass
class D1553Block(BaseBlock):
is_valid: bool
payload: Optional[D1553Payload] = None
@property
def latitude_deg(self) -> Optional[float]:
if not (self.is_valid and self.payload):
return None
raw_val = (self.payload.d.a4[23] << 16) | self.payload.d.a4[24]
return ctypes.c_int32(raw_val).value * ICD1553_GEOPOS_DEG_LSB
@property
def longitude_deg(self) -> Optional[float]:
if not (self.is_valid and self.payload):
return None
raw_val = (self.payload.d.a4[25] << 16) | self.payload.d.a4[26]
return ctypes.c_int32(raw_val).value * ICD1553_GEOPOS_DEG_LSB
@property
def baro_altitude_m(self) -> Optional[float]:
if not (self.is_valid and self.payload):
return None
return self.payload.d.a4[9] * ICD1553_BARO_ALT_METERS_LSB
@property
def true_heading_deg(self) -> Optional[float]:
if not (self.is_valid and self.payload):
return None
return ctypes.c_int16(self.payload.d.a4[2]).value * ICD1553_SEMICIRCLE_DEG_LSB
@dataclass
class SignalBlock(BaseBlock):
signal_type: str
@ -639,6 +634,7 @@ class DataBatch:
cdp_sts_results: Optional[CdpStsBlock] = None
timer_data: Optional[TimerBlock] = None
aesa_data: Optional[AesaBlock] = None
d1553_data: Optional[D1553Block] = None
@property
def main_header(self) -> Optional[DspHeaderIn]:

View File

@ -298,6 +298,8 @@ class RadarFileReader:
current_batch.timer_data = parsed_block
elif isinstance(parsed_block, ds.AesaBlock):
current_batch.aesa_data = parsed_block
elif isinstance(parsed_block, ds.D1553Block): # New block type
current_batch.d1553_data = parsed_block
if current_batch:
yield current_batch, len(self.block_metadata)

View File

@ -22,15 +22,26 @@ def _parse_ge_header_block(block_data_bytes: bytes) -> Optional[ds.DspHeaderIn]:
GE_HEADER_START_OFFSET_BYTES = 136
GE_HEADER_SIZE = ctypes.sizeof(ds.GeHeader)
if len(block_data_bytes) < GE_HEADER_START_OFFSET_BYTES + GE_HEADER_SIZE:
log.warning(f"DSPHDRIN block too small for GeHeader. Size: {len(block_data_bytes)}, Required: {GE_HEADER_START_OFFSET_BYTES + GE_HEADER_SIZE}")
log.warning(
f"DSPHDRIN block too small for GeHeader. Size: {len(block_data_bytes)}, Required: {GE_HEADER_START_OFFSET_BYTES + GE_HEADER_SIZE}"
)
return None
try:
return ds.DspHeaderIn(block_name="DSPHDRIN", block_size_words=len(block_data_bytes) // 4, ge_header=ds.GeHeader.from_buffer_copy(block_data_bytes, GE_HEADER_START_OFFSET_BYTES))
return ds.DspHeaderIn(
block_name="DSPHDRIN",
block_size_words=len(block_data_bytes) // 4,
ge_header=ds.GeHeader.from_buffer_copy(
block_data_bytes, GE_HEADER_START_OFFSET_BYTES
),
)
except (ValueError, TypeError) as e:
log.error(f"Failed to map data to GeHeader: {e}", exc_info=True)
return None
def _parse_cdpsts_block(block_data_bytes: bytes, block_name: str, block_size_words: int) -> ds.CdpStsBlock:
def _parse_cdpsts_block(
block_data_bytes: bytes, block_name: str, block_size_words: int
) -> ds.CdpStsBlock:
"""
Parses a CDPSTS block payload, expecting a CdpStsPayload structure
to be embedded at a fixed offset.
@ -38,18 +49,38 @@ def _parse_cdpsts_block(block_data_bytes: bytes, block_name: str, block_size_wor
PAYLOAD_START_OFFSET_BYTES = 144
required_size = PAYLOAD_START_OFFSET_BYTES + ctypes.sizeof(ds.CdpStsPayload)
if len(block_data_bytes) < required_size:
log.warning(f"CDPSTS block too small for embedded payload. Size: {len(block_data_bytes)}, Required: {required_size}")
return ds.CdpStsBlock(block_name=block_name, block_size_words=block_size_words, is_valid=False)
log.warning(
f"CDPSTS block too small for embedded payload. Size: {len(block_data_bytes)}, Required: {required_size}"
)
return ds.CdpStsBlock(
block_name=block_name, block_size_words=block_size_words, is_valid=False
)
try:
payload_struct = ds.CdpStsPayload.from_buffer_copy(block_data_bytes, PAYLOAD_START_OFFSET_BYTES)
is_valid = (payload_struct.mem_header.marker_low == 0x5A5AA5A5 and payload_struct.mem_header.marker_high == 0x12345678)
if not is_valid: log.debug(f"CDPSTS block at offset has invalid shared memory marker.")
return ds.CdpStsBlock(block_name=block_name, block_size_words=block_size_words, is_valid=is_valid, payload=payload_struct)
payload_struct = ds.CdpStsPayload.from_buffer_copy(
block_data_bytes, PAYLOAD_START_OFFSET_BYTES
)
is_valid = (
payload_struct.mem_header.marker_low == 0x5A5AA5A5
and payload_struct.mem_header.marker_high == 0x12345678
)
if not is_valid:
log.debug(f"CDPSTS block at offset has invalid shared memory marker.")
return ds.CdpStsBlock(
block_name=block_name,
block_size_words=block_size_words,
is_valid=is_valid,
payload=payload_struct,
)
except Exception as e:
log.error(f"Failed to map data to CdpStsPayload: {e}", exc_info=True)
return ds.CdpStsBlock(block_name=block_name, block_size_words=block_size_words, is_valid=False)
return ds.CdpStsBlock(
block_name=block_name, block_size_words=block_size_words, is_valid=False
)
def _parse_timer_block(block_data_bytes: bytes, block_name: str, block_size_words: int) -> ds.TimerBlock:
def _parse_timer_block(
block_data_bytes: bytes, block_name: str, block_size_words: int
) -> ds.TimerBlock:
"""
Parses a TIMER block by mapping it to the GrifoTimerBlob ctypes structure.
This version flexibly handles blocks that may be smaller than the full struct size.
@ -59,97 +90,239 @@ def _parse_timer_block(block_data_bytes: bytes, block_name: str, block_size_word
mappable_size = min(full_struct_size, actual_block_size)
header_size = ctypes.sizeof(ds.GrifoFwBlobHeader)
if actual_block_size < header_size:
log.warning(f"TIMER block is too small to even contain a header. Size: {actual_block_size}")
return ds.TimerBlock(block_name=block_name, block_size_words=block_size_words, is_valid=False)
log.warning(
f"TIMER block is too small to even contain a header. Size: {actual_block_size}"
)
return ds.TimerBlock(
block_name=block_name, block_size_words=block_size_words, is_valid=False
)
try:
timer_blob = ds.GrifoTimerBlob(); ctypes.memmove(ctypes.addressof(timer_blob), block_data_bytes, mappable_size)
is_valid = (timer_blob.hdr.header_marker == 0x12345678 and timer_blob.hdr.sub_marker == 0x54494D45) # 'TIME'
if not is_valid: log.debug("TIMER block has an invalid internal Grifo FW blob header marker.")
return ds.TimerBlock(block_name=block_name, block_size_words=block_size_words, is_valid=is_valid, blob=timer_blob)
timer_blob = ds.GrifoTimerBlob()
ctypes.memmove(ctypes.addressof(timer_blob), block_data_bytes, mappable_size)
is_valid = (
timer_blob.hdr.header_marker == 0x12345678
and timer_blob.hdr.sub_marker == 0x54494D45
) # 'TIME'
if not is_valid:
log.debug(
"TIMER block has an invalid internal Grifo FW blob header marker."
)
return ds.TimerBlock(
block_name=block_name,
block_size_words=block_size_words,
is_valid=is_valid,
blob=timer_blob,
)
except Exception as e:
log.error(f"Failed to map data to GrifoTimerBlob: {e}", exc_info=True)
return ds.TimerBlock(block_name=block_name, block_size_words=block_size_words, is_valid=False)
return ds.TimerBlock(
block_name=block_name, block_size_words=block_size_words, is_valid=False
)
# --- Funzione di parsing AESA ---
def _parse_aesa_block(block_data_bytes: bytes, block_name: str, block_size_words: int) -> ds.AesaBlock:
def _parse_aesa_block(
block_data_bytes: bytes, block_name: str, block_size_words: int
) -> ds.AesaBlock:
"""
Parses a generic AESA block, identifying its subtype based on size and storing raw data.
"""
actual_size = len(block_data_bytes)
# Observed AESA block sizes from previous logs:
SYNTH_REPORT_SIZE = 256
TX_RAW_BLOCK_SIZE = 8424
RX_RAW_BLOCK_SIZE = 4840
UNKNOWN_AESA_2816_SIZE = 2816
UNKNOWN_AESA_1792_SIZE = 1792 # Newly observed size
UNKNOWN_AESA_1792_SIZE = 1792
# AESA Synthetic Report (e.g., from aesa_report.h)
if actual_size == SYNTH_REPORT_SIZE:
synth_payload_size = ctypes.sizeof(ds.AesaSyntheticReport)
if actual_size >= synth_payload_size:
try:
synth_report = ds.AesaSyntheticReport.from_buffer_copy(block_data_bytes[:synth_payload_size])
log.debug(f"AESA block (size {actual_size}) identified as SYNTHETIC Report.")
return ds.AesaBlock(block_name=block_name, block_subtype="SYNTHETIC", is_valid=True, payload=synth_report, raw_data_bytes=block_data_bytes, block_size_words=block_size_words)
synth_report = ds.AesaSyntheticReport.from_buffer_copy(
block_data_bytes[:synth_payload_size]
)
log.debug(
f"AESA block (size {actual_size}) identified as SYNTHETIC Report."
)
return ds.AesaBlock(
block_name=block_name,
block_subtype="SYNTHETIC",
is_valid=True,
payload=synth_report,
raw_data_bytes=block_data_bytes,
block_size_words=block_size_words,
)
except Exception as e:
log.error(f"Failed to map AesaSyntheticReport (size {actual_size}): {e}", exc_info=True)
return ds.AesaBlock(block_name=block_name, block_subtype="SYNTHETIC", is_valid=False, raw_data_bytes=block_data_bytes, block_size_words=block_size_words)
log.error(
f"Failed to map AesaSyntheticReport (size {actual_size}): {e}",
exc_info=True,
)
return ds.AesaBlock(
block_name=block_name,
block_subtype="SYNTHETIC",
is_valid=False,
raw_data_bytes=block_data_bytes,
block_size_words=block_size_words,
)
else:
log.warning(f"AESA block (size {actual_size}) too small for SYNTHETIC Report payload ({synth_payload_size}).")
return ds.AesaBlock(block_name=block_name, block_subtype="SYNTHETIC", is_valid=False, raw_data_bytes=block_data_bytes, block_size_words=block_size_words)
log.warning(
f"AESA block (size {actual_size}) too small for SYNTHETIC Report payload ({synth_payload_size})."
)
return ds.AesaBlock(
block_name=block_name,
block_subtype="SYNTHETIC",
is_valid=False,
raw_data_bytes=block_data_bytes,
block_size_words=block_size_words,
)
# AESA TX Raw Block (Observed size 8424)
elif actual_size == TX_RAW_BLOCK_SIZE:
log.debug(f"AESA block (size {actual_size}) identified as TX Raw Data.")
return ds.AesaBlock(block_name=block_name, block_subtype="TX_RAW", is_valid=True, raw_data_bytes=block_data_bytes, block_size_words=block_size_words)
# AESA RX Raw Block (Observed size 4840)
return ds.AesaBlock(
block_name=block_name,
block_subtype="TX_RAW",
is_valid=True,
raw_data_bytes=block_data_bytes,
block_size_words=block_size_words,
)
elif actual_size == RX_RAW_BLOCK_SIZE:
log.debug(f"AESA block (size {actual_size}) identified as RX Raw Data.")
return ds.AesaBlock(block_name=block_name, block_subtype="RX_RAW", is_valid=True, raw_data_bytes=block_data_bytes, block_size_words=block_size_words)
return ds.AesaBlock(
block_name=block_name,
block_subtype="RX_RAW",
is_valid=True,
raw_data_bytes=block_data_bytes,
block_size_words=block_size_words,
)
# AESA UNKNOWN (2816)
elif actual_size == UNKNOWN_AESA_2816_SIZE:
log.debug(f"AESA block (size {actual_size}) identified as UNKNOWN AESA (2816).")
return ds.AesaBlock(block_name=block_name, block_subtype="UNKNOWN_AESA_2816_RAW", is_valid=True, raw_data_bytes=block_data_bytes, block_size_words=block_size_words)
return ds.AesaBlock(
block_name=block_name,
block_subtype="UNKNOWN_AESA_2816_RAW",
is_valid=True,
raw_data_bytes=block_data_bytes,
block_size_words=block_size_words,
)
# AESA UNKNOWN (1792)
elif actual_size == UNKNOWN_AESA_1792_SIZE:
log.debug(f"AESA block (size {actual_size}) identified as UNKNOWN AESA (1792).")
return ds.AesaBlock(block_name=block_name, block_subtype="UNKNOWN_AESA_1792_RAW", is_valid=True, raw_data_bytes=block_data_bytes, block_size_words=block_size_words)
return ds.AesaBlock(
block_name=block_name,
block_subtype="UNKNOWN_AESA_1792_RAW",
is_valid=True,
raw_data_bytes=block_data_bytes,
block_size_words=block_size_words,
)
# Fallback for any other size
else:
log.warning(f"AESA block size ({actual_size}) does not match any known AESA subtypes (SYNTHETIC: {SYNTH_REPORT_SIZE}, TX_RAW: {TX_RAW_BLOCK_SIZE}, RX_RAW: {RX_RAW_BLOCK_SIZE}, UNKNOWN_2816: {UNKNOWN_AESA_2816_SIZE}, UNKNOWN_1792: {UNKNOWN_AESA_1792_SIZE}).")
return ds.AesaBlock(block_name=block_name, block_subtype="UNKNOWN_RAW", is_valid=False, raw_data_bytes=block_data_bytes, block_size_words=block_size_words)
log.warning(
f"AESA block size ({actual_size}) does not match any known AESA subtypes (SYNTHETIC: {SYNTH_REPORT_SIZE}, TX_RAW: {TX_RAW_BLOCK_SIZE}, RX_RAW: {RX_RAW_BLOCK_SIZE}, UNKNOWN_2816: {UNKNOWN_AESA_2816_SIZE}, UNKNOWN_1792: {UNKNOWN_AESA_1792_SIZE})."
)
return ds.AesaBlock(
block_name=block_name,
block_subtype="UNKNOWN_RAW",
is_valid=False,
raw_data_bytes=block_data_bytes,
block_size_words=block_size_words,
)
def _parse_signal_block(block_data: np.ndarray, block_size_words: int, signal_type: str, last_header: Optional[ds.DspHeaderIn]) -> ds.SignalBlock:
"""Parses a block of I/Q signal data (SUM, GUARD, etc.)."""
log.debug(f"Parsing signal block of type '{signal_type}'.")
if not last_header: n_rbin, n_pri = 0, 0
else: n_rbin = last_header.ge_header.signal_descr.packet_descr.nrbin; n_pri = last_header.ge_header.signal_descr.packet_descr.npri
empty_block = ds.SignalBlock(block_name=signal_type, block_size_words=block_size_words, signal_type=signal_type, iq_data=np.array([]))
if n_rbin <= 0 or n_pri <= 0: return empty_block
# --- Nuova funzione di parsing D1553 ---
def _parse_d1553_block(
block_data_bytes: bytes, block_name: str, block_size_words: int
) -> ds.D1553Block:
"""
Parses a D1553 block by mapping its payload to the D1553Payload structure.
"""
required_size = ctypes.sizeof(ds.D1553Payload)
actual_size = len(block_data_bytes)
if actual_size < required_size:
log.warning(
f"D1553 block is too small for D1553Payload. Size: {actual_size}, Required: {required_size}."
)
return ds.D1553Block(
block_name=block_name, block_size_words=block_size_words, is_valid=False
)
try:
# We can directly map the data since the D1553Payload should be the full content
payload = ds.D1553Payload.from_buffer_copy(block_data_bytes)
return ds.D1553Block(
block_name=block_name,
block_size_words=block_size_words,
is_valid=True,
payload=payload,
)
except Exception as e:
log.error(f"Failed to map data to D1553Payload: {e}", exc_info=True)
return ds.D1553Block(
block_name=block_name, block_size_words=block_size_words, is_valid=False
)
def _parse_signal_block(
block_data: np.ndarray,
block_size_words: int,
signal_type: str,
last_header: Optional[ds.DspHeaderIn],
) -> ds.SignalBlock:
if not last_header:
n_rbin, n_pri = 0, 0
else:
n_rbin = last_header.ge_header.signal_descr.packet_descr.nrbin
n_pri = last_header.ge_header.signal_descr.packet_descr.npri
empty_block = ds.SignalBlock(
block_name=signal_type,
block_size_words=block_size_words,
signal_type=signal_type,
iq_data=np.array([]),
)
if n_rbin <= 0 or n_pri <= 0:
return empty_block
marker_indices = np.where(block_data == ds.SIGNAL_DATA_MARKER)[0]
if not marker_indices.size: return empty_block
signal_start_word = marker_indices[0] + 2; num_words_for_iq = ((n_rbin * n_pri * 2) + 1) // 2
if signal_start_word + num_words_for_iq > len(block_data): return empty_block
raw_signal_words = block_data[signal_start_word : signal_start_word + num_words_for_iq]
if not marker_indices.size:
return empty_block
signal_start_word = marker_indices[0] + 2
num_words_for_iq = ((n_rbin * n_pri * 2) + 1) // 2
if signal_start_word + num_words_for_iq > len(block_data):
return empty_block
raw_signal_words = block_data[
signal_start_word : signal_start_word + num_words_for_iq
]
iq_samples = raw_signal_words.view(np.int16)
if iq_samples.size % 2 != 0: iq_samples = iq_samples[:-1]
complex_signal = iq_samples[::2].astype(np.float32) + 1j * iq_samples[1::2].astype(np.float32)
if complex_signal.size != n_rbin * n_pri: return empty_block
return ds.SignalBlock(block_name=signal_type, block_size_words=block_size_words, signal_type=signal_type, iq_data=complex_signal.reshape((n_rbin, n_pri)))
if iq_samples.size % 2 != 0:
iq_samples = iq_samples[:-1]
complex_signal = iq_samples[::2].astype(np.float32) + 1j * iq_samples[1::2].astype(
np.float32
)
if complex_signal.size != n_rbin * n_pri:
return empty_block
return ds.SignalBlock(
block_name=signal_type,
block_size_words=block_size_words,
signal_type=signal_type,
iq_data=complex_signal.reshape((n_rbin, n_pri)),
)
# --- Main Dispatcher ---
def parse_block(block_id: int, block_data_numpy: np.ndarray, last_header: Optional[ds.DspHeaderIn], block_name_override: Optional[str] = None) -> Optional[ds.BaseBlock]:
def parse_block(
block_id: int,
block_data_numpy: np.ndarray,
last_header: Optional[ds.DspHeaderIn],
block_name_override: Optional[str] = None,
) -> Optional[ds.BaseBlock]:
"""
Dispatcher function. Parses a block of data based on its name/ID
by mapping it to the appropriate ctypes structure.
"""
block_name = block_name_override or ds.BLOCK_TYPE_MAP.get(block_id, f"UNKNOWN_{block_id}")
block_name = block_name_override or ds.BLOCK_TYPE_MAP.get(
block_id, f"UNKNOWN_{block_id}"
)
block_size_words = len(block_data_numpy)
block_data_bytes = block_data_numpy.tobytes()
try:
@ -161,10 +334,18 @@ def parse_block(block_id: int, block_data_numpy: np.ndarray, last_header: Option
return _parse_timer_block(block_data_bytes, block_name, block_size_words)
elif block_name == "AESA":
return _parse_aesa_block(block_data_bytes, block_name, block_size_words)
elif block_name == "D1553": # New block type
return _parse_d1553_block(block_data_bytes, block_name, block_size_words)
elif block_name in ["SUM", "GUARD", "DAZ", "DEL", "MTIFFT"]:
return _parse_signal_block(block_data_numpy, block_size_words, block_name, last_header)
return _parse_signal_block(
block_data_numpy, block_size_words, block_name, last_header
)
else:
return ds.GenericBlock(block_name=block_name, block_size_words=block_size_words)
return ds.GenericBlock(
block_name=block_name, block_size_words=block_size_words
)
except Exception as e:
log.error(f"Unhandled error in parse_block for '{block_name}': {e}", exc_info=True)
return None
log.error(
f"Unhandled error in parse_block for '{block_name}': {e}", exc_info=True
)
return None

View File

@ -5,6 +5,7 @@ from tkinter import ttk, simpledialog, messagebox
import ctypes
import copy
import re
import inspect # Import necessary for inspecting properties
from typing import List, Type, Dict, Any, Optional
from .gui_utils import center_window
@ -27,7 +28,7 @@ class EditPathDialog(tk.Toplevel):
main_frame = ttk.Frame(self, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
main_frame.columnconfigure(0, weight=1)
instructions = "Enter the full data path. Use '.' for attributes and '[index]' for arrays.\nExample 1: main_header.ge_header.mode.master_mode\nExample 2: timer_data.blob.payload.aesa_delay[0].fifo[3]"
instructions = "Enter the full data path. Use '.' for attributes and '[index]' for arrays.\nExample 1: main_header.ge_header.mode.master_mode\nExample 2: timer_data.blob.payload.aesa_delay[0].fifo[3]\nExample 3: d1553_data.latitude_deg (for calculated properties)"
ttk.Label(main_frame, text=instructions, justify=tk.LEFT).grid(
row=0, column=0, sticky="w", pady=(0, 10)
)
@ -263,7 +264,7 @@ class ProfileEditorWindow(tk.Toplevel):
values=("batch_id", "batch_id"),
)
header_root = self.fields_tree.insert(
"", "end", iid="header_data", text="Header Data (DSPHDRIN)"
"", "end", iid="header_data", text="Header Data (from DSPHDRIN)"
)
self._recursive_populate_tree_ctypes(
ds.GeHeader, header_root, "main_header.ge_header"
@ -283,19 +284,77 @@ class ProfileEditorWindow(tk.Toplevel):
aesa_root = self.fields_tree.insert(
"", "end", iid="aesa_data", text="AESA Block Data"
)
aesa_tx_root = self.fields_tree.insert(
aesa_root, "end", iid="aesa_tx", text="TX (Command)"
aesa_raw_data_root = self.fields_tree.insert(
aesa_root, "end", iid="aesa_raw_data", text="Raw Data By Subtype"
)
# Add a node for the raw_data_bytes field itself
self.fields_tree.insert(
aesa_raw_data_root,
"end",
iid="aesa_raw_bytes",
text="raw_data_bytes",
values=("raw_data_bytes", "aesa_data.raw_data_bytes"),
)
# Add a node for AesaSyntheticReport if applicable
aesa_synth_root = self.fields_tree.insert(
aesa_root, "end", iid="aesa_synth", text="Synthetic Report (256 bytes)"
)
self._recursive_populate_tree_ctypes(
ds.AntennaCmdBuffer, aesa_tx_root, "aesa_data.payload"
ds.AesaSyntheticReport, aesa_synth_root, "aesa_data.payload"
)
aesa_rx_root = self.fields_tree.insert(
aesa_root, "end", iid="aesa_rx", text="RX (Reply)"
d1553_root = self.fields_tree.insert(
"", "end", iid="d1553_data", text="D1553 Block Data"
)
d1553_raw_root = self.fields_tree.insert(d1553_root, "end", iid="d1553_payload", text="Raw Payload (D1553Payload)")
self._recursive_populate_tree_ctypes(
ds.AntennaReplyBuffer, aesa_rx_root, "aesa_data.payload"
ds.D1553Payload, d1553_raw_root, "d1553_data.payload"
)
# Now add the calculated properties
d1553_calc_root = self.fields_tree.insert(
d1553_root, "end", iid="d1553_calculated", text="Calculated Properties"
)
# To get properties, we need an instance to inspect. We use a dummy instance.
dummy_d1553_block = ds.D1553Block(
block_name="D1553", block_size_words=0, is_valid=False
)
self._recursive_populate_properties(
type(dummy_d1553_block), d1553_calc_root, "d1553_data"
)
def _recursive_populate_properties(self, class_obj: Type[Any], parent_id: str, base_path: str):
# We need an actual instance to safely call properties,
# but creating a dummy one for every class might be complex.
# Instead, we just list properties by name from the class.
for name in dir(class_obj):
if name.startswith('_'): continue
attr = getattr(class_obj, name)
# Check if it's a property (getter method)
if isinstance(attr, property):
# Optionally, we can check if it's a function, though `property` implies it.
# if inspect.isfunction(attr.fget): # Check if it has a getter
# We can add a hint to the display name or values tuple if this property
# is known to derive from specific raw fields.
# For D1553, this would be: (name, f"{base_path}.{name} (from payload.d.a4[XX])")
# For now, let's keep it generic, just indicating it's a property.
display_text = f"{name} (Property)"
# A common convention for properties is to put relevant raw data paths in their docstring.
# We could try to read it here:
# if attr.__doc__:
# # Example docstring: "Calculates latitude from payload.d.a4[23] and a4[24]"
# # You could parse this docstring for the raw data path.
# pass
self.fields_tree.insert(
parent_id, "end", iid=f"{parent_id}_{name}",
text=display_text,
values=(name, f"{base_path}.{name}") # The path is directly to the property
)
def _recursive_populate_tree_ctypes(
self, class_obj: Type[ctypes.Structure], parent_id: str, base_path: str
):