add timer block

add path for struct fileds
add reset button to profile
This commit is contained in:
VALLONGOL 2025-06-24 14:05:00 +02:00
parent ad25970cc8
commit c8c0823ff6
5 changed files with 286 additions and 287 deletions

View File

@ -57,6 +57,36 @@
"column_name": "range_scale", "column_name": "range_scale",
"data_path": "cdp_sts_results.payload.data.status_chunk.data.range_scale", "data_path": "cdp_sts_results.payload.data.status_chunk.data.range_scale",
"translate_with_enum": true "translate_with_enum": true
},
{
"column_name": "prt_num",
"data_path": "timer_data.blob.payload.prt_num",
"translate_with_enum": false
},
{
"column_name": "diff_prt_num",
"data_path": "timer_data.blob.payload.diff_prt_num",
"translate_with_enum": false
},
{
"column_name": "tcr",
"data_path": "timer_data.blob.payload.tcr",
"translate_with_enum": false
},
{
"column_name": "tpr",
"data_path": "timer_data.blob.payload.tpr",
"translate_with_enum": false
},
{
"column_name": "B_Filter",
"data_path": "timer_data.blob.payload.shift.B_Filter",
"translate_with_enum": false
},
{
"column_name": "PT_DET",
"data_path": "timer_data.blob.payload.shift.PT_DET",
"translate_with_enum": false
} }
] ]
} }

View File

@ -30,7 +30,6 @@ class CtypesStructureBase(ctypes.Structure):
_pack_ = 1 _pack_ = 1
# C-style type aliases for clarity
c_boolean = ctypes.c_uint32 c_boolean = ctypes.c_uint32
c_radians_t = ctypes.c_float c_radians_t = ctypes.c_float
c_metres_t = ctypes.c_float c_metres_t = ctypes.c_float
@ -64,9 +63,9 @@ BLOCK_TYPE_MAP = {
SIGNAL_DATA_MARKER = 1313304915 SIGNAL_DATA_MARKER = 1313304915
# --- GE_HEADER Structure Definitions (used for DSPHDRIN) --- # --- GE_HEADER Structure Definitions (used for DSPHDRIN) ---
# ... (le definizioni di GeHeader e le sue sotto-strutture restano invariate)
class HeaderInfo(CtypesStructureBase): class HeaderInfo(CtypesStructureBase):
_fields_ = [ _fields_ = [
("marker_dati_1", ctypes.c_uint), ("marker_dati_1", ctypes.c_uint),
@ -291,18 +290,7 @@ class GeneralSettings(CtypesStructureBase):
class FunctionSettings(CtypesStructureBase): class FunctionSettings(CtypesStructureBase):
_fields_ = [] # Placeholder, to be expanded later _fields_ = []
class GeHeader(CtypesStructureBase):
_fields_ = [
("header_info", HeaderInfo),
("signal_descr", SignalDescriptor),
("mode", ModeDescriptor),
("sp_settings", SpSettings),
("general_settings", GeneralSettings),
("function_settings", FunctionSettings),
]
class GeHeader(CtypesStructureBase): class GeHeader(CtypesStructureBase):
@ -317,8 +305,7 @@ class GeHeader(CtypesStructureBase):
# --- CDPSTS Block and Sub-structures (ctypes) --- # --- CDPSTS Block and Sub-structures (ctypes) ---
# ... (le definizioni di CdpStsPayload e le sue sotto-strutture restano invariate)
class SharedMemoryHeader(CtypesStructureBase): class SharedMemoryHeader(CtypesStructureBase):
_fields_ = [ _fields_ = [
("marker_low", ctypes.c_uint32), ("marker_low", ctypes.c_uint32),
@ -423,10 +410,6 @@ class IdentifierChunk(CtypesStructureBase):
_fields_ = [("raw_data", ctypes.c_byte * 24)] _fields_ = [("raw_data", ctypes.c_byte * 24)]
class RawChunk(CtypesStructureBase):
_fields_ = [("header", ChunkHeader), ("data", ctypes.c_byte * 1)] # Placeholder
def create_chunk_type(data_type): def create_chunk_type(data_type):
class Chunk(CtypesStructureBase): class Chunk(CtypesStructureBase):
_fields_ = [("header", ChunkHeader), ("data", data_type)] _fields_ = [("header", ChunkHeader), ("data", data_type)]
@ -455,6 +438,61 @@ class CdpStsPayload(CtypesStructureBase):
_fields_ = [("mem_header", SharedMemoryHeader), ("data", CdpDataLayout)] _fields_ = [("mem_header", SharedMemoryHeader), ("data", CdpDataLayout)]
# --- TIMER Block and Sub-structures (ctypes) ---
class GrifoFwBlobHeader(CtypesStructureBase):
"""Mirrors the C++ `grifo_fw_blob_header_t` struct."""
_fields_ = [
("header_marker", ctypes.c_uint32),
("header_marker_bis", ctypes.c_uint32),
("size", ctypes.c_uint32),
("work_size", ctypes.c_uint32),
("sub_marker", ctypes.c_uint32),
("counter", ctypes.c_uint32),
("timetag", ctypes.c_uint32),
("sw_reserved", ctypes.c_uint32),
]
class ShiftRegisters(CtypesStructureBase):
"""Mirrors the C++ `shift_registers_t` struct."""
_fields_ = [
("B_Filter", ctypes.c_int32),
("B_RF", ctypes.c_int32),
("PT_DET", ctypes.c_int32),
("PC_Win", ctypes.c_int32),
("RX_SYNC", ctypes.c_int32),
]
class TimerRawIf(CtypesStructureBase):
"""Mirrors the C++ `timer_raw_if_t` struct (partial)."""
_fields_ = [
("tcr", ctypes.c_uint32),
("tpr", ctypes.c_uint32),
("tor", ctypes.c_uint32),
("stsr", ctypes.c_uint32),
("prt_num", ctypes.c_uint16),
("diff_prt_num", ctypes.c_uint16),
("spares__", ctypes.c_uint32 * 3),
("shift", ShiftRegisters),
# The rest of the struct is very large, omitted for now.
]
class GrifoTimerBlob(CtypesStructureBase):
"""Mirrors the C++ `grifo_timer_blob_t` struct."""
_fields_ = [
("hdr", GrifoFwBlobHeader),
("payload", TimerRawIf),
]
# --- Top-Level Block Definitions (Python-side) --- # --- Top-Level Block Definitions (Python-side) ---
@ -467,30 +505,13 @@ class DspHeaderIn(BaseBlock):
class CdpStsBlock(BaseBlock): class CdpStsBlock(BaseBlock):
is_valid: bool is_valid: bool
payload: Optional[CdpStsPayload] = None payload: Optional[CdpStsPayload] = None
# ... properties ...
@property
def timetag_batch_id(self) -> Optional[int]:
return (
self.payload.data.timetag_chunk.data.batch_id
if self.is_valid and self.payload
else None
)
@property @dataclass
def timetag_time(self) -> Optional[int]: class TimerBlock(BaseBlock):
return ( is_valid: bool
self.payload.data.timetag_chunk.data.time blob: Optional[GrifoTimerBlob] = None
if self.is_valid and self.payload
else None
)
@property
def status(self) -> Optional[ModeStatus]:
return (
self.payload.data.status_chunk.data
if self.is_valid and self.payload
else None
)
@dataclass @dataclass
@ -509,6 +530,7 @@ class DataBatch:
batch_id: int batch_id: int
blocks: List[BaseBlock] = field(default_factory=list) blocks: List[BaseBlock] = field(default_factory=list)
cdp_sts_results: Optional[CdpStsBlock] = None cdp_sts_results: Optional[CdpStsBlock] = None
timer_data: Optional[TimerBlock] = None # Added field for Timer data
@property @property
def main_header(self) -> Optional[DspHeaderIn]: def main_header(self) -> Optional[DspHeaderIn]:

View File

@ -172,7 +172,9 @@ class RadarFileReader:
[] []
) # (start_offset_words, size_words, name) ) # (start_offset_words, size_words, name)
def _parse_fw_block_header(self, start_index: int) -> Optional[Tuple[int, int, str]]: def _parse_fw_block_header(
self, start_index: int
) -> Optional[Tuple[int, int, str]]:
"""Parses a firmware block header (marker 0x7A7A7A7A).""" """Parses a firmware block header (marker 0x7A7A7A7A)."""
try: try:
name_id = self.data_vector[start_index + FW_NAME_OFFSET_WORDS] name_id = self.data_vector[start_index + FW_NAME_OFFSET_WORDS]
@ -184,17 +186,23 @@ class RadarFileReader:
payload_size_bytes = self.data_vector[start_index + FW_SIZE_OFFSET_WORDS] payload_size_bytes = self.data_vector[start_index + FW_SIZE_OFFSET_WORDS]
if not (0 < payload_size_bytes < (20 * 1024 * 1024)): if not (0 < payload_size_bytes < (20 * 1024 * 1024)):
self.log.debug(f"Invalid firmware payload size {payload_size_bytes} at word {start_index}.") self.log.debug(
f"Invalid firmware payload size {payload_size_bytes} at word {start_index}."
)
return None return None
payload_size_words = (payload_size_bytes + 3) // 4 payload_size_words = (payload_size_bytes + 3) // 4
payload_start_offset = start_index + FW_HEADER_SIZE_WORDS payload_start_offset = start_index + FW_HEADER_SIZE_WORDS
return payload_start_offset, payload_size_words, block_name return payload_start_offset, payload_size_words, block_name
except IndexError: except IndexError:
self.log.warning(f"IndexError while parsing firmware block at word {start_index}.") self.log.warning(
f"IndexError while parsing firmware block at word {start_index}."
)
return None return None
def _parse_fw_block_header(self, start_index: int) -> Optional[Tuple[int, int, str]]: def _parse_fw_block_header(
self, start_index: int
) -> Optional[Tuple[int, int, str]]:
"""Parses a firmware block header (marker 0x7A7A7A7A).""" """Parses a firmware block header (marker 0x7A7A7A7A)."""
try: try:
name_id = self.data_vector[start_index + FW_NAME_OFFSET_WORDS] name_id = self.data_vector[start_index + FW_NAME_OFFSET_WORDS]
@ -206,36 +214,48 @@ class RadarFileReader:
payload_size_bytes = self.data_vector[start_index + FW_SIZE_OFFSET_WORDS] payload_size_bytes = self.data_vector[start_index + FW_SIZE_OFFSET_WORDS]
if not (0 < payload_size_bytes < (20 * 1024 * 1024)): if not (0 < payload_size_bytes < (20 * 1024 * 1024)):
self.log.debug(f"Invalid firmware payload size {payload_size_bytes} at word {start_index}.") self.log.debug(
f"Invalid firmware payload size {payload_size_bytes} at word {start_index}."
)
return None return None
payload_size_words = (payload_size_bytes + 3) // 4 payload_size_words = (payload_size_bytes + 3) // 4
payload_start_offset = start_index + FW_HEADER_SIZE_WORDS payload_start_offset = start_index + FW_HEADER_SIZE_WORDS
return payload_start_offset, payload_size_words, block_name return payload_start_offset, payload_size_words, block_name
except IndexError: except IndexError:
self.log.warning(f"IndexError while parsing firmware block at word {start_index}.") self.log.warning(
f"IndexError while parsing firmware block at word {start_index}."
)
return None return None
def _parse_legacy_block_header(self, start_index: int) -> Optional[Tuple[int, int, str]]: def _parse_legacy_block_header(
self, start_index: int
) -> Optional[Tuple[int, int, str]]:
"""Parses a legacy block header (marker 0x5A5A5A5A).""" """Parses a legacy block header (marker 0x5A5A5A5A)."""
try: try:
name_id = self.data_vector[start_index + LEGACY_NAME_OFFSET_WORDS] name_id = self.data_vector[start_index + LEGACY_NAME_OFFSET_WORDS]
block_name = ds.BLOCK_TYPE_MAP.get(name_id) block_name = ds.BLOCK_TYPE_MAP.get(name_id)
if not block_name: if not block_name:
self.log.debug(f"Found unknown Legacy block with ID: {name_id}") self.log.debug(f"Found unknown Legacy block with ID: {name_id}")
block_name = f"UNKNOWN_{name_id}" block_name = f"UNKNOWN_{name_id}"
payload_size_bytes = self.data_vector[start_index + LEGACY_SIZE_OFFSET_WORDS] payload_size_bytes = self.data_vector[
start_index + LEGACY_SIZE_OFFSET_WORDS
]
if not (0 < payload_size_bytes < (20 * 1024 * 1024)): if not (0 < payload_size_bytes < (20 * 1024 * 1024)):
self.log.debug(f"Invalid legacy payload size {payload_size_bytes} at word {start_index}.") self.log.debug(
f"Invalid legacy payload size {payload_size_bytes} at word {start_index}."
)
return None return None
payload_size_words = (payload_size_bytes + 3) // 4 payload_size_words = (payload_size_bytes + 3) // 4
payload_start_offset = start_index payload_start_offset = start_index
return payload_start_offset, payload_size_words, block_name return payload_start_offset, payload_size_words, block_name
except IndexError: except IndexError:
self.log.warning(f"IndexError while parsing legacy block at word {start_index}.") self.log.warning(
f"IndexError while parsing legacy block at word {start_index}."
)
return None return None
def load_and_find_blocks(self) -> bool: def load_and_find_blocks(self) -> bool:
@ -301,9 +321,7 @@ class RadarFileReader:
total_file_words = self.data_vector.size total_file_words = self.data_vector.size
batch_counter = 0 batch_counter = 0
for block_num, (start_offset_words, size_words, block_name) in enumerate( for block_num, (start_offset_words, size_words, block_name) in enumerate(self.block_metadata):
self.block_metadata
):
blocks_processed_so_far = block_num + 1 blocks_processed_so_far = block_num + 1
if start_offset_words + size_words > total_file_words: if start_offset_words + size_words > total_file_words:
@ -313,17 +331,10 @@ class RadarFileReader:
stats["skipped_blocks"] += 1 stats["skipped_blocks"] += 1
continue continue
# The block ID from the name is not strictly necessary anymore but can be useful block_id = next((id for id, name in ds.BLOCK_TYPE_MAP.items() if name == block_name), 0)
block_id = next( block_data_slice = self.data_vector[start_offset_words : start_offset_words + size_words]
(id for id, name in ds.BLOCK_TYPE_MAP.items() if name == block_name), 0
)
block_data_slice = self.data_vector[
start_offset_words : start_offset_words + size_words
]
parsed_block = parse_block( parsed_block = parse_block(block_id, block_data_slice, last_header, block_name_override=block_name)
block_id, block_data_slice, last_header, block_name_override=block_name
)
if parsed_block is None: if parsed_block is None:
stats["failed_to_parse_blocks"] += 1 stats["failed_to_parse_blocks"] += 1
@ -342,6 +353,8 @@ class RadarFileReader:
current_batch.blocks.append(parsed_block) current_batch.blocks.append(parsed_block)
if isinstance(parsed_block, ds.CdpStsBlock): if isinstance(parsed_block, ds.CdpStsBlock):
current_batch.cdp_sts_results = parsed_block current_batch.cdp_sts_results = parsed_block
elif isinstance(parsed_block, ds.TimerBlock):
current_batch.timer_data = parsed_block
if current_batch: if current_batch:
yield current_batch, len(self.block_metadata) yield current_batch, len(self.block_metadata)

View File

@ -19,15 +19,12 @@ def _parse_ge_header_block(block_data_bytes: bytes) -> Optional[ds.DspHeaderIn]:
""" """
Parses the DSPHDRIN block by mapping it to the GeHeader ctypes structure. Parses the DSPHDRIN block by mapping it to the GeHeader ctypes structure.
""" """
# This header is embedded deep inside the block data GE_HEADER_START_OFFSET_BYTES = 136
GE_HEADER_START_OFFSET_BYTES = 34 * 4 # 136 bytes
GE_HEADER_SIZE = ctypes.sizeof(ds.GeHeader) GE_HEADER_SIZE = ctypes.sizeof(ds.GeHeader)
if len(block_data_bytes) < GE_HEADER_START_OFFSET_BYTES + GE_HEADER_SIZE: if len(block_data_bytes) < GE_HEADER_START_OFFSET_BYTES + GE_HEADER_SIZE:
log.warning( log.warning(
f"DSPHDRIN block is too small to contain a full GeHeader. " f"DSPHDRIN block too small for GeHeader. Size: {len(block_data_bytes)}, Required: {GE_HEADER_START_OFFSET_BYTES + GE_HEADER_SIZE}"
f"Size: {len(block_data_bytes)} bytes, Required minimum: "
f"{GE_HEADER_START_OFFSET_BYTES + GE_HEADER_SIZE} bytes."
) )
return None return None
@ -35,79 +32,97 @@ def _parse_ge_header_block(block_data_bytes: bytes) -> Optional[ds.DspHeaderIn]:
ge_header_struct = ds.GeHeader.from_buffer_copy( ge_header_struct = ds.GeHeader.from_buffer_copy(
block_data_bytes, GE_HEADER_START_OFFSET_BYTES block_data_bytes, GE_HEADER_START_OFFSET_BYTES
) )
return ds.DspHeaderIn(
parsed_block = ds.DspHeaderIn(
block_name="DSPHDRIN", block_name="DSPHDRIN",
block_size_words=len(block_data_bytes) // 4, block_size_words=len(block_data_bytes) // 4,
ge_header=ge_header_struct, ge_header=ge_header_struct,
) )
return parsed_block
except (ValueError, TypeError) as e: except (ValueError, TypeError) as e:
log.error( log.error(f"Failed to map data to GeHeader: {e}", exc_info=True)
f"Failed to map data to GeHeader ctypes structure: {e}", exc_info=True
)
return None return None
def _parse_cdpsts_block( def _parse_cdpsts_block(
block_data_bytes: bytes, block_name: str, block_size_words: int block_data_bytes: bytes, block_name: str, block_size_words: int
) -> Optional[ds.CdpStsBlock]: ) -> ds.CdpStsBlock:
""" """
Parses a CDPSTS block payload. It expects a CdpStsPayload structure Parses a CDPSTS block payload, expecting a CdpStsPayload structure
to be embedded at a fixed offset within the block data. to be embedded at a fixed offset.
""" """
# From hexdump analysis, the actual payload starts at a fixed offset.
PAYLOAD_START_OFFSET_BYTES = 144 PAYLOAD_START_OFFSET_BYTES = 144
required_size = PAYLOAD_START_OFFSET_BYTES + ctypes.sizeof(ds.CdpStsPayload) required_size = PAYLOAD_START_OFFSET_BYTES + ctypes.sizeof(ds.CdpStsPayload)
if len(block_data_bytes) < required_size: if len(block_data_bytes) < required_size:
log.warning( log.warning(
f"CDPSTS block is too small to contain embedded payload. " f"CDPSTS block too small for embedded payload. Size: {len(block_data_bytes)}, Required: {required_size}"
f"Size: {len(block_data_bytes)}, Required minimum: {required_size}"
) )
return ds.CdpStsBlock( return ds.CdpStsBlock(
block_name=block_name, block_size_words=block_size_words, is_valid=False block_name=block_name, block_size_words=block_size_words, is_valid=False
) )
try: try:
# We apply the ctypes structure starting from the known offset.
payload_struct = ds.CdpStsPayload.from_buffer_copy( payload_struct = ds.CdpStsPayload.from_buffer_copy(
block_data_bytes, PAYLOAD_START_OFFSET_BYTES block_data_bytes, PAYLOAD_START_OFFSET_BYTES
) )
is_valid = ( is_valid = (
payload_struct.mem_header.marker_low == 0x5A5AA5A5 payload_struct.mem_header.marker_low == 0x5A5AA5A5
and payload_struct.mem_header.marker_high == 0x12345678 and payload_struct.mem_header.marker_high == 0x12345678
) )
if not is_valid: if not is_valid:
# This case should now be much rarer, indicating a truly malformed block. log.debug(f"CDPSTS block at offset has invalid shared memory marker.")
log.warning(
f"CDPSTS block found, but its embedded shared memory header marker is invalid. "
f"Read low=0x{payload_struct.mem_header.marker_low:X}, high=0x{payload_struct.mem_header.marker_high:X}"
)
parsed_block = ds.CdpStsBlock( return ds.CdpStsBlock(
block_name=block_name, block_name=block_name,
block_size_words=block_size_words, block_size_words=block_size_words,
is_valid=is_valid, is_valid=is_valid,
payload=payload_struct, payload=payload_struct,
) )
except Exception as e:
log.error(f"Failed to map data to CdpStsPayload: {e}", exc_info=True)
return ds.CdpStsBlock(
block_name=block_name, block_size_words=block_size_words, is_valid=False
)
if is_valid:
def _parse_timer_block(
block_data_bytes: bytes, block_name: str, block_size_words: int
) -> ds.TimerBlock:
"""
Parses a TIMER block by mapping it to the GrifoTimerBlob ctypes structure.
"""
required_size = ctypes.sizeof(ds.GrifoTimerBlob)
if len(block_data_bytes) < required_size:
log.warning(
f"TIMER block is too small for GrifoTimerBlob. Size: {len(block_data_bytes)}, Required: {required_size}"
)
return ds.TimerBlock(
block_name=block_name, block_size_words=block_size_words, is_valid=False
)
try:
timer_blob = ds.GrifoTimerBlob.from_buffer_copy(block_data_bytes)
# Validate using the internal header marker
is_valid = (
timer_blob.hdr.header_marker == 0x12345678
and timer_blob.hdr.sub_marker == 0x54494D45
) # 'TIME'
if not is_valid:
log.debug( log.debug(
f"Successfully parsed a valid CDPSTS block. Batch ID: {parsed_block.timetag_batch_id}" "TIMER block has an invalid internal Grifo FW blob header marker."
) )
return parsed_block return ds.TimerBlock(
block_name=block_name,
except Exception as e: block_size_words=block_size_words,
log.error( is_valid=is_valid,
f"Failed to map data to CdpStsPayload ctypes structure: {e}", exc_info=True blob=timer_blob,
) )
return ds.CdpStsBlock( except Exception as e:
log.error(f"Failed to map data to GrifoTimerBlob: {e}", exc_info=True)
return ds.TimerBlock(
block_name=block_name, block_size_words=block_size_words, is_valid=False block_name=block_name, block_size_words=block_size_words, is_valid=False
) )
@ -120,15 +135,14 @@ def _parse_signal_block(
) -> ds.SignalBlock: ) -> ds.SignalBlock:
"""Parses a block of I/Q signal data (SUM, GUARD, etc.).""" """Parses a block of I/Q signal data (SUM, GUARD, etc.)."""
log.debug(f"Parsing signal block of type '{signal_type}'.") log.debug(f"Parsing signal block of type '{signal_type}'.")
if not last_header:
if last_header:
n_rbin = last_header.ge_header.signal_descr.packet_descr.nrbin
n_pri = last_header.ge_header.signal_descr.packet_descr.npri
else:
log.warning( log.warning(
f"Cannot parse signal block '{signal_type}' without a preceding DSPHDRIN." f"Cannot parse signal block '{signal_type}' without a preceding DSPHDRIN."
) )
n_rbin, n_pri = 0, 0 n_rbin, n_pri = 0, 0
else:
n_rbin = last_header.ge_header.signal_descr.packet_descr.nrbin
n_pri = last_header.ge_header.signal_descr.packet_descr.npri
empty_block = ds.SignalBlock( empty_block = ds.SignalBlock(
block_name=signal_type, block_name=signal_type,
@ -136,7 +150,6 @@ def _parse_signal_block(
signal_type=signal_type, signal_type=signal_type,
iq_data=np.array([]), iq_data=np.array([]),
) )
if n_rbin <= 0 or n_pri <= 0: if n_rbin <= 0 or n_pri <= 0:
return empty_block return empty_block
@ -146,7 +159,6 @@ def _parse_signal_block(
signal_start_word = marker_indices[0] + 2 signal_start_word = marker_indices[0] + 2
num_words_for_iq = ((n_rbin * n_pri * 2) + 1) // 2 num_words_for_iq = ((n_rbin * n_pri * 2) + 1) // 2
if signal_start_word + num_words_for_iq > len(block_data): if signal_start_word + num_words_for_iq > len(block_data):
return empty_block return empty_block
@ -154,14 +166,12 @@ def _parse_signal_block(
signal_start_word : signal_start_word + num_words_for_iq signal_start_word : signal_start_word + num_words_for_iq
] ]
iq_samples = raw_signal_words.view(np.int16) iq_samples = raw_signal_words.view(np.int16)
if iq_samples.size % 2 != 0: if iq_samples.size % 2 != 0:
iq_samples = iq_samples[:-1] iq_samples = iq_samples[:-1]
complex_signal = iq_samples[::2].astype(np.float32) + 1j * iq_samples[1::2].astype( complex_signal = iq_samples[::2].astype(np.float32) + 1j * iq_samples[1::2].astype(
np.float32 np.float32
) )
if complex_signal.size != n_rbin * n_pri: if complex_signal.size != n_rbin * n_pri:
return empty_block return empty_block
@ -173,6 +183,9 @@ def _parse_signal_block(
) )
# --- Main Dispatcher ---
def parse_block( def parse_block(
block_id: int, block_id: int,
block_data_numpy: np.ndarray, block_data_numpy: np.ndarray,
@ -192,22 +205,20 @@ def parse_block(
try: try:
if block_name == "DSPHDRIN": if block_name == "DSPHDRIN":
return _parse_ge_header_block(block_data_bytes) return _parse_ge_header_block(block_data_bytes)
elif block_name == "CDPSTS": elif block_name == "CDPSTS":
return _parse_cdpsts_block(block_data_bytes, block_name, block_size_words) return _parse_cdpsts_block(block_data_bytes, block_name, block_size_words)
elif block_name == "TIMER":
return _parse_timer_block(block_data_bytes, block_name, block_size_words)
elif block_name in ["SUM", "GUARD", "DAZ", "DEL", "MTIFFT"]: elif block_name in ["SUM", "GUARD", "DAZ", "DEL", "MTIFFT"]:
return _parse_signal_block( return _parse_signal_block(
block_data_numpy, block_size_words, block_name, last_header block_data_numpy, block_size_words, block_name, last_header
) )
else:
else: # Handles all other cases, known and unknown, as generic blocks
return ds.GenericBlock( return ds.GenericBlock(
block_name=block_name, block_size_words=block_size_words block_name=block_name, block_size_words=block_size_words
) )
except Exception as e: except Exception as e:
log.error( log.error(
f"Failed to parse block '{block_name}' (ID: {block_id}): {e}", exc_info=True f"Unhandled error in parse_block for '{block_name}': {e}", exc_info=True
) )
return None return None

View File

@ -40,7 +40,7 @@ class ProfileEditorWindow(tk.Toplevel):
def _init_window(self): def _init_window(self):
"""Initializes window properties.""" """Initializes window properties."""
self.title("Export Profile Editor") self.title("Export Profile Editor")
self.geometry("1000x600") self.geometry("1200x700") # Increased width for new column
self.transient(self.master) self.transient(self.master)
self.grab_set() self.grab_set()
@ -55,7 +55,7 @@ class ProfileEditorWindow(tk.Toplevel):
# --- Left Frame: Profile Management --- # --- Left Frame: Profile Management ---
profile_mgmt_frame = ttk.LabelFrame(main_pane, text="Profiles") profile_mgmt_frame = ttk.LabelFrame(main_pane, text="Profiles")
main_pane.add(profile_mgmt_frame, weight=1) main_pane.add(profile_mgmt_frame, weight=2) # Adjusted weight
profile_mgmt_frame.columnconfigure(0, weight=1) profile_mgmt_frame.columnconfigure(0, weight=1)
cb_frame = ttk.Frame(profile_mgmt_frame) cb_frame = ttk.Frame(profile_mgmt_frame)
@ -70,29 +70,23 @@ class ProfileEditorWindow(tk.Toplevel):
btn_frame = ttk.Frame(profile_mgmt_frame) btn_frame = ttk.Frame(profile_mgmt_frame)
btn_frame.grid(row=1, column=0, sticky="ew", padx=5) btn_frame.grid(row=1, column=0, sticky="ew", padx=5)
btn_frame.columnconfigure((0, 1), weight=1) btn_frame.columnconfigure((0, 1), weight=1)
ttk.Button(btn_frame, text="New", command=self._on_new_profile).grid( ttk.Button(btn_frame, text="New", command=self._on_new_profile).grid(row=0, column=0, sticky="ew", padx=2)
row=0, column=0, sticky="ew", padx=2 ttk.Button(btn_frame, text="Delete", command=self._on_delete_profile).grid(row=0, column=1, sticky="ew", padx=2)
)
ttk.Button(btn_frame, text="Delete", command=self._on_delete_profile).grid(
row=0, column=1, sticky="ew", padx=2
)
# --- Middle Frame: Available Fields --- # --- Middle Frame: Available Fields ---
fields_frame = ttk.LabelFrame(main_pane, text="Available Fields") fields_frame = ttk.LabelFrame(main_pane, text="Available Fields")
main_pane.add(fields_frame, weight=2) main_pane.add(fields_frame, weight=3) # Adjusted weight
fields_frame.rowconfigure(0, weight=1) fields_frame.rowconfigure(0, weight=1)
fields_frame.columnconfigure(0, weight=1) fields_frame.columnconfigure(0, weight=1)
self.fields_tree = ttk.Treeview(fields_frame, selectmode="browse") self.fields_tree = ttk.Treeview(fields_frame, selectmode="browse")
self.fields_tree.grid(row=0, column=0, sticky="nsew", padx=5, pady=5) self.fields_tree.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
ysb = ttk.Scrollbar( ysb = ttk.Scrollbar(fields_frame, orient="vertical", command=self.fields_tree.yview)
fields_frame, orient="vertical", command=self.fields_tree.yview
)
self.fields_tree.configure(yscrollcommand=ysb.set) self.fields_tree.configure(yscrollcommand=ysb.set)
ysb.grid(row=0, column=1, sticky="ns") ysb.grid(row=0, column=1, sticky="ns")
# --- Right Frame: Selected Fields and Actions --- # --- Right Frame: Selected Fields and Actions ---
selected_frame_container = ttk.Frame(main_pane) selected_frame_container = ttk.Frame(main_pane)
main_pane.add(selected_frame_container, weight=3) main_pane.add(selected_frame_container, weight=5) # Adjusted weight
selected_frame_container.rowconfigure(0, weight=1) selected_frame_container.rowconfigure(0, weight=1)
selected_frame_container.columnconfigure(1, weight=1) selected_frame_container.columnconfigure(1, weight=1)
@ -100,62 +94,69 @@ class ProfileEditorWindow(tk.Toplevel):
action_btn_frame.grid(row=0, column=0, sticky="ns", padx=5, pady=5) action_btn_frame.grid(row=0, column=0, sticky="ns", padx=5, pady=5)
ttk.Button(action_btn_frame, text=">>", command=self._add_field).grid(pady=5) ttk.Button(action_btn_frame, text=">>", command=self._add_field).grid(pady=5)
ttk.Button(action_btn_frame, text="<<", command=self._remove_field).grid(pady=5) ttk.Button(action_btn_frame, text="<<", command=self._remove_field).grid(pady=5)
ttk.Button(action_btn_frame, text="Up", command=lambda: self._move_field(-1)).grid(pady=20)
ttk.Button(action_btn_frame, text="Down", command=lambda: self._move_field(1)).grid(pady=5)
ttk.Button( ttk.Button(
action_btn_frame, text="Up", command=lambda: self._move_field(-1) action_btn_frame, text="Reset", command=self._clear_selected_fields
).grid(pady=20) ).grid(pady=20)
ttk.Button(
action_btn_frame, text="Down", command=lambda: self._move_field(1)
).grid(pady=5)
selected_fields_frame = ttk.LabelFrame( selected_fields_frame = ttk.LabelFrame(selected_frame_container, text="Selected Fields for Profile")
selected_frame_container, text="Selected Fields for Profile"
)
selected_fields_frame.grid(row=0, column=1, sticky="nsew") selected_fields_frame.grid(row=0, column=1, sticky="nsew")
selected_fields_frame.rowconfigure(0, weight=1) selected_fields_frame.rowconfigure(0, weight=1)
selected_fields_frame.columnconfigure(0, weight=1) selected_fields_frame.columnconfigure(0, weight=1)
self.selected_tree = ttk.Treeview( self.selected_tree = ttk.Treeview(
selected_fields_frame, selected_fields_frame,
columns=("display_name", "translate"), columns=("display_name", "data_path", "translate"), # Added data_path column
show="headings", show="headings",
selectmode="browse", selectmode="browse",
) )
self.selected_tree.heading("display_name", text="Field Name") self.selected_tree.heading("display_name", text="Field Name")
self.selected_tree.heading("data_path", text="Source Path") # New header
self.selected_tree.heading("translate", text="Translate") self.selected_tree.heading("translate", text="Translate")
self.selected_tree.column("display_name", width=200, stretch=True) self.selected_tree.column("display_name", width=150, stretch=True)
self.selected_tree.column( self.selected_tree.column("data_path", width=250, stretch=True) # New column config
"translate", width=100, anchor="center", stretch=False self.selected_tree.column("translate", width=80, anchor="center", stretch=False)
)
self.selected_tree.grid(row=0, column=0, sticky="nsew") self.selected_tree.grid(row=0, column=0, sticky="nsew")
self.selected_tree.bind("<Button-1>", self._on_selected_tree_click) self.selected_tree.bind("<Button-1>", self._on_selected_tree_click)
# --- Bottom Frame: Save/Cancel --- # --- Bottom Frame: Save/Cancel ---
bottom_frame = ttk.Frame(self) bottom_frame = ttk.Frame(self)
bottom_frame.pack(fill=tk.X, padx=10, pady=(0, 10)) bottom_frame.pack(fill=tk.X, padx=10, pady=(0, 10))
ttk.Button( ttk.Button(bottom_frame, text="Save & Close", command=self._on_save_and_close).pack(side=tk.RIGHT)
bottom_frame, text="Save & Close", command=self._on_save_and_close ttk.Button(bottom_frame, text="Cancel", command=self._on_close).pack(side=tk.RIGHT, padx=5)
).pack(side=tk.RIGHT)
ttk.Button(bottom_frame, text="Cancel", command=self._on_close).pack( def _clear_selected_fields(self):
side=tk.RIGHT, padx=5 """Clears all fields from the currently selected profile."""
)
def _on_selected_tree_click(self, event):
region = self.selected_tree.identify_region(event.x, event.y)
if region != "cell":
return
column_id = self.selected_tree.identify_column(event.x)
if column_id != "#2":
return
item_id = self.selected_tree.identify_row(event.y)
if not item_id:
return
profile = self._get_current_profile() profile = self._get_current_profile()
if not profile: if not profile:
return return
if not profile.fields: # Do nothing if already empty
return
if messagebox.askyesno(
"Confirm Clear",
f"Are you sure you want to remove all fields from the profile '{profile.name}'?",
parent=self
):
profile.fields.clear()
self._load_profile_into_ui()
def _on_selected_tree_click(self, event):
region = self.selected_tree.identify_region(event.x, event.y)
if region != "cell": return
column_id = self.selected_tree.identify_column(event.x)
if column_id != "#3": # Column is now the 3rd one
return
item_id = self.selected_tree.identify_row(event.y)
if not item_id: return
profile = self._get_current_profile()
if not profile: return
field_index = int(item_id) field_index = int(item_id)
field = profile.fields[field_index] field = profile.fields[field_index]
@ -165,85 +166,40 @@ class ProfileEditorWindow(tk.Toplevel):
def _populate_available_fields_tree(self): def _populate_available_fields_tree(self):
self.fields_tree.delete(*self.fields_tree.get_children()) self.fields_tree.delete(*self.fields_tree.get_children())
batch_root = self.fields_tree.insert("", "end", iid="batch_properties", text="Batch Properties")
self.fields_tree.insert(batch_root, "end", iid="batch_id", text="batch_id", values=("batch_id", "batch_id"))
header_root = self.fields_tree.insert("", "end", iid="header_data", text="Header Data (from DSPHDRIN)")
self._recursive_populate_tree_ctypes(ds.GeHeader, header_root, "main_header.ge_header")
# --- Batch Properties (standalone fields) --- cdpsts_root = self.fields_tree.insert("", "end", iid="cdpsts_data", text="CDP/STS Block Data")
batch_root = self.fields_tree.insert( self._recursive_populate_tree_ctypes(ds.CdpDataLayout, cdpsts_root, "cdp_sts_results.payload.data")
"", "end", iid="batch_properties", text="Batch Properties"
)
self.fields_tree.insert(
batch_root,
"end",
iid="batch_id",
text="batch_id",
values=("batch_id", "batch_id"),
)
# --- DSPHDRIN Data --- timer_root = self.fields_tree.insert("", "end", iid="timer_data", text="Timer Block Data")
header_root = self.fields_tree.insert( self._recursive_populate_tree_ctypes(ds.GrifoTimerBlob, timer_root, "timer_data.blob")
"", "end", iid="header_data", text="Header Data (from DSPHDRIN)"
)
self._recursive_populate_tree_ctypes(
ds.GeHeader, header_root, "main_header.ge_header"
)
# --- CDPSTS Data --- def _recursive_populate_tree_ctypes(self, class_obj: Type[ctypes.Structure], parent_id: str, base_path: str):
cdpsts_root = self.fields_tree.insert( if not hasattr(class_obj, '_fields_'): return
"", "end", iid="cdpsts_data", text="CDP/STS Block Data"
)
# Explore the CdpDataLayout, which is nested inside cdp_sts_results.payload
self._recursive_populate_tree_ctypes(
ds.CdpDataLayout, cdpsts_root, "cdp_sts_results.payload.data"
)
def _recursive_populate_tree_ctypes(
self, class_obj: Type[ctypes.Structure], parent_id: str, base_path: str
):
"""Recursively populates a Treeview with fields from a ctypes.Structure."""
if not hasattr(class_obj, "_fields_"):
return
for field_name, field_type in class_obj._fields_: for field_name, field_type in class_obj._fields_:
current_path = f"{base_path}.{field_name}" current_path = f"{base_path}.{field_name}"
node_id = f"{parent_id}_{field_name}" node_id = f"{parent_id}_{field_name}"
# Check if the field type is another ctypes.Structure if hasattr(field_type, '_fields_'):
if hasattr(field_type, "_fields_"): child_node = self.fields_tree.insert(parent_id, "end", iid=node_id, text=field_name)
child_node = self.fields_tree.insert( self._recursive_populate_tree_ctypes(field_type, child_node, current_path)
parent_id, "end", iid=node_id, text=field_name elif hasattr(field_type, '_length_'):
) self.fields_tree.insert(parent_id, "end", iid=node_id, text=f"{field_name} [Array]", values=(field_name, current_path))
self._recursive_populate_tree_ctypes(
field_type, child_node, current_path
)
# Check if it's a ctypes array
elif hasattr(field_type, "_length_"):
# For arrays, we don't expand further in the tree, just show it's an array
self.fields_tree.insert(
parent_id,
"end",
iid=node_id,
text=f"{field_name} [Array]",
values=(field_name, current_path),
)
# It's a primitive ctypes field (c_int, c_float, etc.)
else: else:
display_text = f"{field_name}" display_text = f"{field_name}"
if current_path in ENUM_REGISTRY: if current_path in ENUM_REGISTRY: display_text += " (Enum)"
display_text += " (Enum)" self.fields_tree.insert(parent_id, "end", iid=node_id, text=display_text, values=(field_name, current_path))
self.fields_tree.insert(
parent_id,
"end",
iid=node_id,
text=display_text,
values=(field_name, current_path),
)
def _load_profiles_to_combobox(self): def _load_profiles_to_combobox(self):
profile_names = [p.name for p in self.profiles] profile_names = [p.name for p in self.profiles]
self.profile_combobox["values"] = profile_names self.profile_combobox["values"] = profile_names
if profile_names: if profile_names: self.selected_profile_name.set(profile_names[0])
self.selected_profile_name.set(profile_names[0])
self._load_profile_into_ui() self._load_profile_into_ui()
def _get_current_profile(self) -> Optional[ExportProfile]: def _get_current_profile(self) -> Optional[ExportProfile]:
@ -258,40 +214,37 @@ class ProfileEditorWindow(tk.Toplevel):
self.selected_tree.delete(i) self.selected_tree.delete(i)
profile = self._get_current_profile() profile = self._get_current_profile()
if not profile: if not profile: return
return
for index, field in enumerate(profile.fields): for index, field in enumerate(profile.fields):
is_translatable = field.data_path in ENUM_REGISTRY is_translatable = field.data_path in ENUM_REGISTRY
checkbox_char = "" checkbox_char = ""
if is_translatable: if is_translatable:
checkbox_char = "" if field.translate_with_enum else "" checkbox_char = "" if field.translate_with_enum else ""
# Show a more readable source path
source_display = '.'.join(field.data_path.split('.')[:2])
self.selected_tree.insert( self.selected_tree.insert(
"", "end", iid=str(index), values=(field.column_name, checkbox_char) "", "end", iid=str(index),
values=(field.column_name, source_display, checkbox_char)
) )
def _add_field(self): def _add_field(self):
selected_item_id = self.fields_tree.focus() selected_item_id = self.fields_tree.focus()
if not selected_item_id: if not selected_item_id: return
return
item_values = self.fields_tree.item(selected_item_id, "values") item_values = self.fields_tree.item(selected_item_id, "values")
if not item_values or len(item_values) < 2: if not item_values or len(item_values) < 2:
messagebox.showinfo( messagebox.showinfo("Cannot Add Field", "Please select a specific data field.", parent=self)
"Cannot Add Field", "Please select a specific data field.", parent=self
)
return return
column_name, data_path = item_values column_name, data_path = item_values
profile = self._get_current_profile() profile = self._get_current_profile()
if not profile: if not profile: return
return
if any(f.data_path == data_path for f in profile.fields): if any(f.data_path == data_path for f in profile.fields):
messagebox.showinfo( messagebox.showinfo("Duplicate Field", "This field is already in the profile.", parent=self)
"Duplicate Field", "This field is already in the profile.", parent=self
)
return return
profile.fields.append(ExportField(column_name=column_name, data_path=data_path)) profile.fields.append(ExportField(column_name=column_name, data_path=data_path))
@ -299,49 +252,31 @@ class ProfileEditorWindow(tk.Toplevel):
def _remove_field(self): def _remove_field(self):
selection = self.selected_tree.selection() selection = self.selected_tree.selection()
if not selection: if not selection: return
return
index_to_remove = int(selection[0]) index_to_remove = int(selection[0])
profile = self._get_current_profile() profile = self._get_current_profile()
if not profile: if not profile: return
return
del profile.fields[index_to_remove] del profile.fields[index_to_remove]
self._load_profile_into_ui() self._load_profile_into_ui()
def _move_field(self, direction: int): def _move_field(self, direction: int):
selection = self.selected_tree.selection() selection = self.selected_tree.selection()
if not selection: if not selection: return
return
index = int(selection[0]) index = int(selection[0])
new_index = index + direction new_index = index + direction
profile = self._get_current_profile() profile = self._get_current_profile()
if not profile or not (0 <= new_index < len(profile.fields)): if not profile or not (0 <= new_index < len(profile.fields)): return
return
fields = profile.fields fields = profile.fields
fields.insert(new_index, fields.pop(index)) fields.insert(new_index, fields.pop(index))
self._load_profile_into_ui() self._load_profile_into_ui()
self.selected_tree.selection_set(str(new_index)) self.selected_tree.selection_set(str(new_index))
def _on_new_profile(self): def _on_new_profile(self):
name = simpledialog.askstring( name = simpledialog.askstring("New Profile", "Enter a name for the new profile:", parent=self)
"New Profile", "Enter a name for the new profile:", parent=self if not name or not name.strip(): return
)
if not name or not name.strip():
return
if any(p.name == name for p in self.profiles): if any(p.name == name for p in self.profiles):
messagebox.showerror( messagebox.showerror("Error", f"A profile with the name '{name}' already exists.", parent=self)
"Error",
f"A profile with the name '{name}' already exists.",
parent=self,
)
return return
new_profile = ExportProfile(name=name.strip()) new_profile = ExportProfile(name=name.strip())
self.profiles.append(new_profile) self.profiles.append(new_profile)
self._load_profiles_to_combobox() self._load_profiles_to_combobox()
@ -350,14 +285,8 @@ class ProfileEditorWindow(tk.Toplevel):
def _on_delete_profile(self): def _on_delete_profile(self):
profile = self._get_current_profile() profile = self._get_current_profile()
if not profile: if not profile: return
return if messagebox.askyesno("Confirm Delete", f"Are you sure you want to delete the profile '{profile.name}'?", parent=self):
if messagebox.askyesno(
"Confirm Delete",
f"Are you sure you want to delete the profile '{profile.name}'?",
parent=self,
):
self.profiles.remove(profile) self.profiles.remove(profile)
self._load_profiles_to_combobox() self._load_profiles_to_combobox()
@ -372,14 +301,8 @@ class ProfileEditorWindow(tk.Toplevel):
def _on_close(self): def _on_close(self):
if self._check_unsaved_changes(): if self._check_unsaved_changes():
response = messagebox.askyesnocancel( response = messagebox.askyesnocancel("Unsaved Changes", "You have unsaved changes. Would you like to save them?", parent=self)
"Unsaved Changes", if response is True: self._on_save_and_close()
"You have unsaved changes. Would you like to save them?", elif response is False: self.destroy()
parent=self,
)
if response is True:
self._on_save_and_close()
elif response is False:
self.destroy()
else: else:
self.destroy() self.destroy()