add new filed for timer

add edit path in profile window
add array function  to export
This commit is contained in:
VALLONGOL 2025-06-24 14:30:13 +02:00
parent c8c0823ff6
commit 90d09cd005
6 changed files with 247 additions and 119 deletions

View File

@ -59,8 +59,8 @@
"translate_with_enum": true
},
{
"column_name": "prt_num",
"data_path": "timer_data.blob.payload.prt_num",
"column_name": "tcr",
"data_path": "timer_data.blob.payload.tcr",
"translate_with_enum": false
},
{
@ -68,24 +68,19 @@
"data_path": "timer_data.blob.payload.diff_prt_num",
"translate_with_enum": false
},
{
"column_name": "tcr",
"data_path": "timer_data.blob.payload.tcr",
"translate_with_enum": false
},
{
"column_name": "tpr",
"data_path": "timer_data.blob.payload.tpr",
"translate_with_enum": false
},
{
"column_name": "B_Filter",
"data_path": "timer_data.blob.payload.shift.B_Filter",
"translate_with_enum": false
},
{
"column_name": "PT_DET",
"data_path": "timer_data.blob.payload.shift.PT_DET",
"column_name": "RX_SYNC",
"data_path": "timer_data.blob.payload.shift.RX_SYNC",
"translate_with_enum": false
},
{
"column_name": "exp_pulse1_delay",
"data_path": "timer_data.blob.payload.exp_pulse1_delay",
"translate_with_enum": false
}
]

View File

@ -10,6 +10,7 @@ import json
import os
import subprocess
import sys
import re
from pathlib import Path
from typing import List, Any, Dict, Tuple, Optional
from tkinter import filedialog, messagebox
@ -30,31 +31,51 @@ log = logger.get_logger(__name__)
def _get_value_from_path(batch: DataBatch, field: ExportField) -> Any:
path = field.data_path
"""
Retrieves a value from a nested object using a dot- and bracket-notation path.
Example paths: "main_header.ge_header.mode.master_mode", "timer_data.blob.payload.aesa_delay[0].fifo[3]"
"""
try:
parts = path.split(".")
if not parts:
return "N/A"
path = field.data_path
if path == "batch_id":
return batch.batch_id
# Split the path by dots and brackets to handle attributes and indices
parts = re.split(r'\.|\[' , path)
current_obj = batch
for part in parts:
if current_obj is None:
return "N/A"
# Questa linea funziona sia per oggetti Python normali che per ctypes.Structure
current_obj = getattr(current_obj, part, None)
if part.endswith(']'):
# This is an index access
index_str = part[:-1]
if not index_str.isdigit():
log.warning(f"Invalid index '{index_str}' in path: {path}")
return "N/A"
try:
current_obj = current_obj[int(index_str)]
except (IndexError, TypeError):
log.warning(f"Index out of bounds for '{index_str}' in path: {path}")
return "N/A"
else:
# This is an attribute access
current_obj = getattr(current_obj, part, None)
value = current_obj if current_obj is not None else "N/A"
# Handle translation for enums if the final value is an integer
if field.translate_with_enum and isinstance(value, int):
enum_class = ENUM_REGISTRY.get(path)
# For enum translation, we need the path without indices
enum_path = re.sub(r'\[\d+\]', '', path)
enum_class = ENUM_REGISTRY.get(enum_path)
if enum_class:
return get_enum_name(enum_class, value)
return value
except AttributeError:
log.warning(f"Could not find attribute for path: {path}")
except Exception as e:
log.warning(f"Could not resolve path '{field.data_path}': {e}")
return "N/A"

View File

@ -63,9 +63,9 @@ BLOCK_TYPE_MAP = {
SIGNAL_DATA_MARKER = 1313304915
# --- GE_HEADER Structure Definitions (used for DSPHDRIN) ---
# ... (le definizioni di GeHeader e le sue sotto-strutture restano invariate)
class HeaderInfo(CtypesStructureBase):
_fields_ = [
("marker_dati_1", ctypes.c_uint),
@ -305,7 +305,8 @@ class GeHeader(CtypesStructureBase):
# --- CDPSTS Block and Sub-structures (ctypes) ---
# ... (le definizioni di CdpStsPayload e le sue sotto-strutture restano invariate)
class SharedMemoryHeader(CtypesStructureBase):
_fields_ = [
("marker_low", ctypes.c_uint32),
@ -468,9 +469,40 @@ class ShiftRegisters(CtypesStructureBase):
]
class TimerRawIf(CtypesStructureBase):
"""Mirrors the C++ `timer_raw_if_t` struct (partial)."""
class UniquePrtFifo(CtypesStructureBase):
"""Mirrors the C++ `unique_prt_fifo_t` struct."""
_fields_ = [("fifo", ctypes.c_uint32 * 8)]
class PrtFifo(CtypesStructureBase):
"""Mirrors the C++ `prt_fifo_t` struct."""
_fields_ = [("fifo", ctypes.c_uint32 * 128)]
class DelayWithPeriod(CtypesStructureBase):
"""Mirrors the C++ `delay_with_period_t` struct."""
_fields_ = [
("delay", UniquePrtFifo),
("width", UniquePrtFifo),
("period", UniquePrtFifo),
("spare__", ctypes.c_byte * (0x280 - 0x25F - 1)),
]
class DelayWidth(CtypesStructureBase):
"""Mirrors the C++ `delay_width_t` struct."""
_fields_ = [("delay", UniquePrtFifo), ("width", UniquePrtFifo)]
class TimerRawIf(CtypesStructureBase):
"""
Mirrors the C++ `timer_raw_if_t` struct (Reduced initial version).
This maps only the first, most stable part of the structure.
"""
_fields_ = [
("tcr", ctypes.c_uint32),
("tpr", ctypes.c_uint32),
@ -480,7 +512,20 @@ class TimerRawIf(CtypesStructureBase):
("diff_prt_num", ctypes.c_uint16),
("spares__", ctypes.c_uint32 * 3),
("shift", ShiftRegisters),
# The rest of the struct is very large, omitted for now.
("spare_after_shift", ctypes.c_byte * (12)), # Padding fino a 0x50
# Offset in C++: 0x50
("aesa_delay", UniquePrtFifo * 2), # Dimensione: 64 bytes * 2 = 128 bytes. Va da 0x50 a 0xD0
("spare0__", ctypes.c_byte * 48), # Padding per raggiungere 0x100 (da 0xD0 a 0x100)
# Offset in C++: 0x100
("exp_pulse1_delay", UniquePrtFifo * 2), # da 0x100 a 0x180
("exp_pulse2_delay", UniquePrtFifo * 2), # da 0x180 a 0x200
# Offset in C++: 0x200
("pretrigger_det_delay", UniquePrtFifo * 2), # da 0x200 a 0x280
# Il resto è omesso per ora
]
@ -494,8 +539,6 @@ class GrifoTimerBlob(CtypesStructureBase):
# --- Top-Level Block Definitions (Python-side) ---
@dataclass
class DspHeaderIn(BaseBlock):
ge_header: GeHeader
@ -505,7 +548,30 @@ class DspHeaderIn(BaseBlock):
class CdpStsBlock(BaseBlock):
is_valid: bool
payload: Optional[CdpStsPayload] = None
# ... properties ...
@property
def timetag_batch_id(self) -> Optional[int]:
return (
self.payload.data.timetag_chunk.data.batch_id
if self.is_valid and self.payload
else None
)
@property
def timetag_time(self) -> Optional[int]:
return (
self.payload.data.timetag_chunk.data.time
if self.is_valid and self.payload
else None
)
@property
def status(self) -> Optional[ModeStatus]:
return (
self.payload.data.status_chunk.data
if self.is_valid and self.payload
else None
)
@dataclass
@ -530,7 +596,7 @@ class DataBatch:
batch_id: int
blocks: List[BaseBlock] = field(default_factory=list)
cdp_sts_results: Optional[CdpStsBlock] = None
timer_data: Optional[TimerBlock] = None # Added field for Timer data
timer_data: Optional[TimerBlock] = None
@property
def main_header(self) -> Optional[DspHeaderIn]:

View File

@ -321,7 +321,9 @@ class RadarFileReader:
total_file_words = self.data_vector.size
batch_counter = 0
for block_num, (start_offset_words, size_words, block_name) in enumerate(self.block_metadata):
for block_num, (start_offset_words, size_words, block_name) in enumerate(
self.block_metadata
):
blocks_processed_so_far = block_num + 1
if start_offset_words + size_words > total_file_words:
@ -331,10 +333,16 @@ class RadarFileReader:
stats["skipped_blocks"] += 1
continue
block_id = next((id for id, name in ds.BLOCK_TYPE_MAP.items() if name == block_name), 0)
block_data_slice = self.data_vector[start_offset_words : start_offset_words + size_words]
block_id = next(
(id for id, name in ds.BLOCK_TYPE_MAP.items() if name == block_name), 0
)
block_data_slice = self.data_vector[
start_offset_words : start_offset_words + size_words
]
parsed_block = parse_block(block_id, block_data_slice, last_header, block_name_override=block_name)
parsed_block = parse_block(
block_id, block_data_slice, last_header, block_name_override=block_name
)
if parsed_block is None:
stats["failed_to_parse_blocks"] += 1

View File

View File

@ -2,13 +2,15 @@
"""
GUI Window for creating, editing, and deleting export profiles.
Includes a custom dialog for editing complex data paths.
"""
import tkinter as tk
from tkinter import ttk, simpledialog, messagebox
import ctypes
import copy
from typing import List, Type, Dict, Any, Union, Optional
import re
from typing import List, Type, Dict, Any, Optional
from ..core import data_structures as ds
from ..core.data_enums import ENUM_REGISTRY
@ -18,6 +20,61 @@ from ..utils import logger
log = logger.get_logger(__name__)
class EditPathDialog(tk.Toplevel):
"""A custom dialog window to edit a data path string."""
def __init__(self, parent, initial_value=""):
super().__init__(parent)
self.transient(parent)
self.grab_set()
self.title("Edit Data Path")
self.geometry("800x150") # Wide dialog for long paths
self.result = None
main_frame = ttk.Frame(self, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
main_frame.columnconfigure(0, weight=1)
# Instructions Label
instructions = (
"Enter the full data path. Use '.' for attributes and '[index]' for arrays.\n"
"Example 1: main_header.ge_header.mode.master_mode\n"
"Example 2: timer_data.blob.payload.aesa_delay[0].fifo[3]"
)
ttk.Label(main_frame, text=instructions, justify=tk.LEFT).grid(row=0, column=0, sticky="w", pady=(0, 10))
# Entry Widget
self.path_var = tk.StringVar(value=initial_value)
self.path_entry = ttk.Entry(main_frame, textvariable=self.path_var, font=("Courier", 10))
self.path_entry.grid(row=1, column=0, sticky="ew")
self.path_entry.focus_set()
self.path_entry.selection_range(0, tk.END)
# Buttons Frame
button_frame = ttk.Frame(main_frame)
button_frame.grid(row=2, column=0, sticky="e", pady=(10, 0))
ttk.Button(button_frame, text="OK", command=self._on_ok, default=tk.ACTIVE).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="Cancel", command=self._on_cancel).pack(side=tk.LEFT)
self.protocol("WM_DELETE_WINDOW", self._on_cancel)
self.bind("<Return>", self._on_ok)
self.bind("<Escape>", self._on_cancel)
def _on_ok(self, event=None):
self.result = self.path_var.get().strip()
self.destroy()
def _on_cancel(self, event=None):
self.result = None
self.destroy()
def wait_for_input(self):
"""Waits for the dialog to close and returns the result."""
self.wait_window()
return self.result
class ProfileEditorWindow(tk.Toplevel):
"""A Toplevel window for managing export profiles."""
@ -38,32 +95,26 @@ class ProfileEditorWindow(tk.Toplevel):
self.protocol("WM_DELETE_WINDOW", self._on_close)
def _init_window(self):
"""Initializes window properties."""
self.title("Export Profile Editor")
self.geometry("1200x700") # Increased width for new column
self.geometry("1200x700")
self.transient(self.master)
self.grab_set()
def _init_vars(self):
"""Initializes Tkinter variables."""
self.selected_profile_name = tk.StringVar()
def _create_widgets(self):
"""Creates the main layout and widgets for the editor."""
main_pane = ttk.PanedWindow(self, orient=tk.HORIZONTAL)
main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# --- Left Frame: Profile Management ---
profile_mgmt_frame = ttk.LabelFrame(main_pane, text="Profiles")
main_pane.add(profile_mgmt_frame, weight=2) # Adjusted weight
main_pane.add(profile_mgmt_frame, weight=2)
profile_mgmt_frame.columnconfigure(0, weight=1)
cb_frame = ttk.Frame(profile_mgmt_frame)
cb_frame.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
cb_frame.columnconfigure(0, weight=1)
self.profile_combobox = ttk.Combobox(
cb_frame, textvariable=self.selected_profile_name, state="readonly"
)
self.profile_combobox = ttk.Combobox(cb_frame, textvariable=self.selected_profile_name, state="readonly")
self.profile_combobox.grid(row=0, column=0, sticky="ew")
self.profile_combobox.bind("<<ComboboxSelected>>", self._on_profile_selected)
@ -73,9 +124,8 @@ class ProfileEditorWindow(tk.Toplevel):
ttk.Button(btn_frame, text="New", command=self._on_new_profile).grid(row=0, column=0, sticky="ew", padx=2)
ttk.Button(btn_frame, text="Delete", command=self._on_delete_profile).grid(row=0, column=1, sticky="ew", padx=2)
# --- Middle Frame: Available Fields ---
fields_frame = ttk.LabelFrame(main_pane, text="Available Fields")
main_pane.add(fields_frame, weight=3) # Adjusted weight
main_pane.add(fields_frame, weight=3)
fields_frame.rowconfigure(0, weight=1)
fields_frame.columnconfigure(0, weight=1)
self.fields_tree = ttk.Treeview(fields_frame, selectmode="browse")
@ -84,9 +134,8 @@ class ProfileEditorWindow(tk.Toplevel):
self.fields_tree.configure(yscrollcommand=ysb.set)
ysb.grid(row=0, column=1, sticky="ns")
# --- Right Frame: Selected Fields and Actions ---
selected_frame_container = ttk.Frame(main_pane)
main_pane.add(selected_frame_container, weight=5) # Adjusted weight
main_pane.add(selected_frame_container, weight=5)
selected_frame_container.rowconfigure(0, weight=1)
selected_frame_container.columnconfigure(1, weight=1)
@ -94,98 +143,112 @@ class ProfileEditorWindow(tk.Toplevel):
action_btn_frame.grid(row=0, column=0, sticky="ns", padx=5, pady=5)
ttk.Button(action_btn_frame, text=">>", command=self._add_field).grid(pady=5)
ttk.Button(action_btn_frame, text="<<", command=self._remove_field).grid(pady=5)
ttk.Button(action_btn_frame, text="Up", command=lambda: self._move_field(-1)).grid(pady=20)
ttk.Button(action_btn_frame, text="Up", command=lambda: self._move_field(-1)).grid(pady=10)
ttk.Button(action_btn_frame, text="Down", command=lambda: self._move_field(1)).grid(pady=5)
ttk.Button(
action_btn_frame, text="Reset", command=self._clear_selected_fields
).grid(pady=20)
ttk.Button(action_btn_frame, text="Edit Path", command=self._edit_selected_field_path).grid(pady=10)
ttk.Button(action_btn_frame, text="Reset", command=self._clear_selected_fields).grid(pady=5)
selected_fields_frame = ttk.LabelFrame(selected_frame_container, text="Selected Fields for Profile")
selected_fields_frame.grid(row=0, column=1, sticky="nsew")
selected_fields_frame.rowconfigure(0, weight=1)
selected_fields_frame.columnconfigure(0, weight=1)
self.selected_tree = ttk.Treeview(
selected_fields_frame,
columns=("display_name", "data_path", "translate"), # Added data_path column
show="headings",
selectmode="browse",
)
self.selected_tree = ttk.Treeview(selected_fields_frame, columns=("display_name", "data_path", "translate"), show="headings", selectmode="browse")
self.selected_tree.heading("display_name", text="Field Name")
self.selected_tree.heading("data_path", text="Source Path") # New header
self.selected_tree.heading("data_path", text="Source Path")
self.selected_tree.heading("translate", text="Translate")
self.selected_tree.column("display_name", width=150, stretch=True)
self.selected_tree.column("data_path", width=250, stretch=True) # New column config
self.selected_tree.column("data_path", width=300, stretch=True) # Widened column
self.selected_tree.column("translate", width=80, anchor="center", stretch=False)
self.selected_tree.grid(row=0, column=0, sticky="nsew")
self.selected_tree.bind("<Button-1>", self._on_selected_tree_click)
self.selected_tree.bind("<Double-1>", self._on_selected_tree_double_click)
# --- Bottom Frame: Save/Cancel ---
bottom_frame = ttk.Frame(self)
bottom_frame.pack(fill=tk.X, padx=10, pady=(0, 10))
ttk.Button(bottom_frame, text="Save & Close", command=self._on_save_and_close).pack(side=tk.RIGHT)
ttk.Button(bottom_frame, text="Cancel", command=self._on_close).pack(side=tk.RIGHT, padx=5)
def _clear_selected_fields(self):
"""Clears all fields from the currently selected profile."""
def _on_selected_tree_double_click(self, event):
"""Handle double-click on the selected fields tree to edit path."""
self._edit_selected_field_path()
def _edit_selected_field_path(self):
"""Allows manual editing of the data_path for the selected field."""
selection = self.selected_tree.selection()
if not selection:
messagebox.showinfo("No Selection", "Please select a field to edit.", parent=self)
return
index = int(selection[0])
profile = self._get_current_profile()
if not profile:
return
if not profile: return
if not profile.fields: # Do nothing if already empty
return
field = profile.fields[index]
if messagebox.askyesno(
"Confirm Clear",
f"Are you sure you want to remove all fields from the profile '{profile.name}'?",
parent=self
):
profile.fields.clear()
dialog = EditPathDialog(self, initial_value=field.data_path)
new_path = dialog.wait_for_input()
if new_path is not None and new_path != field.data_path:
# Basic validation for the path format
if not re.match(r'^[\w\.\[\]]+$', new_path):
messagebox.showerror("Invalid Path", "The path contains invalid characters.", parent=self)
return
field.data_path = new_path
self._load_profile_into_ui()
def _on_selected_tree_click(self, event):
region = self.selected_tree.identify_region(event.x, event.y)
if region != "cell": return
column_id = self.selected_tree.identify_column(event.x)
if column_id != "#3": # Column is now the 3rd one
return
if column_id != "#3": return
item_id = self.selected_tree.identify_row(event.y)
if not item_id: return
profile = self._get_current_profile()
if not profile: return
field_index = int(item_id)
field = profile.fields[field_index]
if field.data_path in ENUM_REGISTRY:
if field.data_path in ENUM_REGISTRY or re.sub(r'\[\d+\]', '', field.data_path) in ENUM_REGISTRY:
field.translate_with_enum = not field.translate_with_enum
self._load_profile_into_ui()
def _load_profile_into_ui(self):
for i in self.selected_tree.get_children(): self.selected_tree.delete(i)
profile = self._get_current_profile()
if not profile: return
for index, field in enumerate(profile.fields):
base_path = re.sub(r'\[\d+\]', '', field.data_path)
is_translatable = base_path in ENUM_REGISTRY
checkbox_char = ""
if is_translatable: checkbox_char = "" if field.translate_with_enum else ""
self.selected_tree.insert("", "end", iid=str(index), values=(field.column_name, field.data_path, checkbox_char))
def _clear_selected_fields(self):
"""Clears all fields from the currently selected profile."""
profile = self._get_current_profile()
if not profile or not profile.fields: return
if messagebox.askyesno("Confirm Clear", f"Are you sure you want to remove all fields from the profile '{profile.name}'?", parent=self):
profile.fields.clear()
self._load_profile_into_ui()
# --- Other methods remain unchanged ---
def _populate_available_fields_tree(self):
self.fields_tree.delete(*self.fields_tree.get_children())
batch_root = self.fields_tree.insert("", "end", iid="batch_properties", text="Batch Properties")
self.fields_tree.insert(batch_root, "end", iid="batch_id", text="batch_id", values=("batch_id", "batch_id"))
header_root = self.fields_tree.insert("", "end", iid="header_data", text="Header Data (from DSPHDRIN)")
self._recursive_populate_tree_ctypes(ds.GeHeader, header_root, "main_header.ge_header")
cdpsts_root = self.fields_tree.insert("", "end", iid="cdpsts_data", text="CDP/STS Block Data")
self._recursive_populate_tree_ctypes(ds.CdpDataLayout, cdpsts_root, "cdp_sts_results.payload.data")
timer_root = self.fields_tree.insert("", "end", iid="timer_data", text="Timer Block Data")
self._recursive_populate_tree_ctypes(ds.GrifoTimerBlob, timer_root, "timer_data.blob")
def _recursive_populate_tree_ctypes(self, class_obj: Type[ctypes.Structure], parent_id: str, base_path: str):
if not hasattr(class_obj, '_fields_'): return
for field_name, field_type in class_obj._fields_:
current_path = f"{base_path}.{field_name}"
node_id = f"{parent_id}_{field_name}"
if hasattr(field_type, '_fields_'):
child_node = self.fields_tree.insert(parent_id, "end", iid=node_id, text=field_name)
self._recursive_populate_tree_ctypes(field_type, child_node, current_path)
@ -209,44 +272,19 @@ class ProfileEditorWindow(tk.Toplevel):
def _on_profile_selected(self, event=None):
self._load_profile_into_ui()
def _load_profile_into_ui(self):
for i in self.selected_tree.get_children():
self.selected_tree.delete(i)
profile = self._get_current_profile()
if not profile: return
for index, field in enumerate(profile.fields):
is_translatable = field.data_path in ENUM_REGISTRY
checkbox_char = ""
if is_translatable:
checkbox_char = "" if field.translate_with_enum else ""
# Show a more readable source path
source_display = '.'.join(field.data_path.split('.')[:2])
self.selected_tree.insert(
"", "end", iid=str(index),
values=(field.column_name, source_display, checkbox_char)
)
def _add_field(self):
selected_item_id = self.fields_tree.focus()
if not selected_item_id: return
item_values = self.fields_tree.item(selected_item_id, "values")
if not item_values or len(item_values) < 2:
messagebox.showinfo("Cannot Add Field", "Please select a specific data field.", parent=self)
return
column_name, data_path = item_values
profile = self._get_current_profile()
if not profile: return
if any(f.data_path == data_path for f in profile.fields):
messagebox.showinfo("Duplicate Field", "This field is already in the profile.", parent=self)
return
profile.fields.append(ExportField(column_name=column_name, data_path=data_path))
self._load_profile_into_ui()