From ad25970cc8bb2c8fae39f84187dd243ac1956ce3 Mon Sep 17 00:00:00 2001 From: VALLONGOL Date: Tue, 24 Jun 2025 13:52:45 +0200 Subject: [PATCH] add new block info --- radar_data_reader/core/data_structures.py | 5 ++ radar_data_reader/core/file_reader.py | 94 +++++++++++++---------- 2 files changed, 58 insertions(+), 41 deletions(-) diff --git a/radar_data_reader/core/data_structures.py b/radar_data_reader/core/data_structures.py index 98a1a65..b5b950e 100644 --- a/radar_data_reader/core/data_structures.py +++ b/radar_data_reader/core/data_structures.py @@ -55,6 +55,11 @@ BLOCK_TYPE_MAP = { 5265477: "EXP", 17232: "PC", 5522756: "DET", + 1413893971: "TIMER", + 1347635524: "SOFT", + 1162692948: "EXTLINK", + 1280463939: "CTRLINK", + 1280596037: "SOFTDFE", } SIGNAL_DATA_MARKER = 1313304915 diff --git a/radar_data_reader/core/file_reader.py b/radar_data_reader/core/file_reader.py index a997a22..73e0445 100644 --- a/radar_data_reader/core/file_reader.py +++ b/radar_data_reader/core/file_reader.py @@ -172,58 +172,70 @@ class RadarFileReader: [] ) # (start_offset_words, size_words, name) - def _parse_legacy_block_header( - self, start_index: int - ) -> Optional[Tuple[int, int, str]]: - """Parses a legacy block header (marker 0x5A5A5A5A).""" - try: - name_id = self.data_vector[start_index + LEGACY_NAME_OFFSET_WORDS] - block_name = ds.BLOCK_TYPE_MAP.get(name_id, f"UNKNOWN_{name_id}") - - # The size in the header is the total size of the block data - payload_size_bytes = self.data_vector[ - start_index + LEGACY_SIZE_OFFSET_WORDS - ] - if not (0 < payload_size_bytes < (20 * 1024 * 1024)): - self.log.debug( - f"Invalid legacy payload size {payload_size_bytes} at word {start_index}." - ) - return None - - payload_size_words = (payload_size_bytes + 3) // 4 - # For legacy blocks, the payload starts after the marker. - # The exact header size is complex, but we assume the data follows immediately. - payload_start_offset = start_index - return payload_start_offset, payload_size_words, block_name - except IndexError: - self.log.warning( - f"IndexError while parsing legacy block at word {start_index}." - ) - return None - - def _parse_fw_block_header( - self, start_index: int - ) -> Optional[Tuple[int, int, str]]: + def _parse_fw_block_header(self, start_index: int) -> Optional[Tuple[int, int, str]]: """Parses a firmware block header (marker 0x7A7A7A7A).""" try: name_id = self.data_vector[start_index + FW_NAME_OFFSET_WORDS] - block_name = ds.BLOCK_TYPE_MAP.get(name_id, f"UNKNOWN_{name_id}") + block_name = ds.BLOCK_TYPE_MAP.get(name_id) + + if not block_name: + self.log.debug(f"Found unknown Firmware block with ID: {name_id}") + block_name = f"UNKNOWN_{name_id}" payload_size_bytes = self.data_vector[start_index + FW_SIZE_OFFSET_WORDS] if not (0 < payload_size_bytes < (20 * 1024 * 1024)): - self.log.debug( - f"Invalid firmware payload size {payload_size_bytes} at word {start_index}." - ) + self.log.debug(f"Invalid firmware payload size {payload_size_bytes} at word {start_index}.") return None - + payload_size_words = (payload_size_bytes + 3) // 4 - # For firmware blocks, the payload starts after the 8-word header. payload_start_offset = start_index + FW_HEADER_SIZE_WORDS return payload_start_offset, payload_size_words, block_name except IndexError: - self.log.warning( - f"IndexError while parsing firmware block at word {start_index}." - ) + self.log.warning(f"IndexError while parsing firmware block at word {start_index}.") + return None + + def _parse_fw_block_header(self, start_index: int) -> Optional[Tuple[int, int, str]]: + """Parses a firmware block header (marker 0x7A7A7A7A).""" + try: + name_id = self.data_vector[start_index + FW_NAME_OFFSET_WORDS] + block_name = ds.BLOCK_TYPE_MAP.get(name_id) + + if not block_name: + self.log.debug(f"Found unknown Firmware block with ID: {name_id}") + block_name = f"UNKNOWN_{name_id}" + + payload_size_bytes = self.data_vector[start_index + FW_SIZE_OFFSET_WORDS] + if not (0 < payload_size_bytes < (20 * 1024 * 1024)): + self.log.debug(f"Invalid firmware payload size {payload_size_bytes} at word {start_index}.") + return None + + payload_size_words = (payload_size_bytes + 3) // 4 + payload_start_offset = start_index + FW_HEADER_SIZE_WORDS + return payload_start_offset, payload_size_words, block_name + except IndexError: + self.log.warning(f"IndexError while parsing firmware block at word {start_index}.") + return None + + def _parse_legacy_block_header(self, start_index: int) -> Optional[Tuple[int, int, str]]: + """Parses a legacy block header (marker 0x5A5A5A5A).""" + try: + name_id = self.data_vector[start_index + LEGACY_NAME_OFFSET_WORDS] + block_name = ds.BLOCK_TYPE_MAP.get(name_id) + + if not block_name: + self.log.debug(f"Found unknown Legacy block with ID: {name_id}") + block_name = f"UNKNOWN_{name_id}" + + payload_size_bytes = self.data_vector[start_index + LEGACY_SIZE_OFFSET_WORDS] + if not (0 < payload_size_bytes < (20 * 1024 * 1024)): + self.log.debug(f"Invalid legacy payload size {payload_size_bytes} at word {start_index}.") + return None + + payload_size_words = (payload_size_bytes + 3) // 4 + payload_start_offset = start_index + return payload_start_offset, payload_size_words, block_name + except IndexError: + self.log.warning(f"IndexError while parsing legacy block at word {start_index}.") return None def load_and_find_blocks(self) -> bool: