fix gui, save csv and add profiles

This commit is contained in:
VALLONGOL 2025-06-18 15:04:13 +02:00
parent 7055c7ac27
commit 1b3c8a5f5e
9 changed files with 705 additions and 103 deletions

View File

@ -1,3 +1,28 @@
{
"last_opened_file": "C:\\src\\____GitProjects\\radar_data_reader\\_25-05-15-12-22-52_sata_354-n11.out"
"last_opened_file": "C:/src/____GitProjects/radar_data_reader/_25-05-15-12-22-52_sata_354-n11.out",
"last_output_file": "C:\\src\\____GitProjects\\radar_data_reader\\_25-05-15-12-22-52_sata_354-n11.csv",
"active_export_profile_name": "Default",
"export_profiles": [
{
"name": "Default",
"fields": [
{
"column_name": "batch_counter",
"data_path": "header.header_data.signal_descr.batch_counter"
},
{
"column_name": "ttag",
"data_path": "header.header_data.signal_descr.ttag"
},
{
"column_name": "master_mode",
"data_path": "header.header_data.mode.master_mode"
},
{
"column_name": "operation_mode",
"data_path": "header.header_data.mode.operation_mode"
}
]
}
]
}

View File

@ -1,48 +1,98 @@
# app_controller.py (File Completo)
"""
Application Controller for the Radar Data Reader.
Orchestrates the interaction between the GUI and the core processing logic using multiprocessing.
"""
import multiprocessing as mp
import threading
import csv
from pathlib import Path
from typing import List
from typing import List, Any
from functools import reduce
from ..utils.config_manager import ConfigManager
from ..core.file_reader import run_worker_process
from ..core.data_structures import DataBatch
from ..utils import logger
from ..gui.profile_editor_window import ProfileEditorWindow
from ..core.export_profiles import ExportProfile
log = logger.get_logger(__name__)
def _get_value_from_path(obj: Any, path: str) -> Any:
# ... (this helper function is unchanged)
try:
if path == 'batch_id':
return obj.batch_id
return reduce(getattr, path.split('.'), obj)
except AttributeError:
log.warning(f"Could not find attribute for path: {path}")
return "N/A"
class AppController:
"""The main controller of the application."""
def __init__(self, config_manager: ConfigManager):
# ... (this method is unchanged)
self.config_manager = config_manager
self.view = None
self.profile_editor_window: ProfileEditorWindow | None = None
self.worker_process: mp.Process | None = None
self.is_processing = False
self.command_queue = mp.Queue()
self.result_queue = mp.Queue()
self.active_export_profile: ExportProfile | None = None
self.csv_file_handle = None
self.csv_writer = None
def bind_view(self, view):
"""Binds the GUI/View to this controller."""
# ... (this method is unchanged)
self.view = view
self._load_initial_config()
def _load_initial_config(self):
"""Loads initial settings from config and updates the view."""
# Load input file
if last_file := self.config_manager.get("last_opened_file"):
if Path(last_file).is_file():
self.view.set_filepath(last_file)
# Load output file
if last_output_file := self.config_manager.get("last_output_file"):
if Path(last_output_file).parent.exists(): # Check if dir exists
self.view.set_output_filepath(last_output_file)
# If no last output file, propose one based on input file
if not self.view.get_output_filepath() and self.view.get_filepath():
self._propose_output_filepath(self.view.get_filepath())
# Load profiles
profiles = self.config_manager.get_export_profiles()
active_profile_name = self.config_manager.get("active_export_profile_name")
self.view.update_export_profiles(profiles, active_profile_name)
def _propose_output_filepath(self, input_path_str: str):
"""Proposes a default output path based on the input path."""
if not input_path_str:
return
proposed_path = Path(input_path_str).with_suffix('.csv')
self.view.set_output_filepath(str(proposed_path))
def select_file(self):
"""Opens a file dialog to select a radar file."""
current_path = self.view.get_filepath()
if filepath := self.view.ask_open_filename(current_path):
self.view.set_filepath(filepath)
# Propose a new output file when a new input is selected
self._propose_output_filepath(filepath)
def select_output_file(self):
"""Opens a 'save as' dialog to select the output CSV file path."""
current_path = self.view.get_output_filepath()
if new_path := self.view.ask_save_as_filename(current_path):
self.view.set_output_filepath(new_path)
def start_processing(self):
"""Starts the file processing in a separate process."""
@ -52,16 +102,41 @@ class AppController:
filepath_str = self.view.get_filepath()
if not filepath_str or not Path(filepath_str).is_file():
log.error("No valid file selected to process.")
log.error("No valid input file selected to process.")
return
output_filepath_str = self.view.get_output_filepath()
if not output_filepath_str:
log.error("No output file path specified.")
return
active_profile_name = self.view.get_active_profile_name()
profiles = self.config_manager.get_export_profiles()
self.active_export_profile = next((p for p in profiles if p.name == active_profile_name), None)
if not self.active_export_profile:
log.error(f"No valid export profile named '{active_profile_name}' found.")
return
try:
output_path = Path(output_filepath_str)
log.info(f"Opening {output_path} for row-by-row saving using profile '{self.active_export_profile.name}'.")
self.csv_file_handle = open(output_path, 'w', encoding='utf-8', newline='')
self.csv_writer = csv.writer(self.csv_file_handle)
header = [field.column_name for field in self.active_export_profile.fields]
self.csv_writer.writerow(header)
except IOError as e:
log.error(f"Failed to open CSV file for writing: {e}")
return
self.is_processing = True
self.view.start_processing_ui()
self.config_manager.set("last_opened_file", filepath_str)
self.config_manager.set("last_output_file", output_filepath_str) # Save the chosen output path
self.config_manager.set("active_export_profile_name", active_profile_name)
self.config_manager.save_config()
# Clear queues before starting a new process
while not self.command_queue.empty(): self.command_queue.get()
while not self.result_queue.empty(): self.result_queue.get()
@ -74,50 +149,66 @@ class AppController:
self.view.poll_result_queue()
def stop_processing(self):
"""Sends a stop command to the worker process."""
if not self.is_processing:
log.warning("Stop clicked, but no process is running.")
def handle_data_batch(self, batch: DataBatch):
# ... (this method is unchanged)
if not self.csv_writer or not self.active_export_profile or not self.csv_file_handle:
log.warning("Received a data batch but no CSV writer is configured. Skipping.")
return
log.info("Stop requested by user. Sending STOP command to worker...")
self.command_queue.put("STOP")
def handle_worker_completion(self, results: List[dict], was_interrupted: bool):
"""Handles the final results from the worker."""
try:
row_values = [_get_value_from_path(batch, field.data_path) for field in self.active_export_profile.fields]
self.csv_writer.writerow(row_values)
self.csv_file_handle.flush()
except Exception as e:
log.error(f"An unexpected error occurred during CSV row writing: {e}", exc_info=True)
def handle_worker_completion(self, was_interrupted: bool):
# ... (this method is unchanged)
status = "Interrupted by user" if was_interrupted else "Processing Complete"
log.info(f"--- {status}. Found {len(results)} batches. ---")
log.info(f"--- {status}. Finalizing export. ---")
filepath = Path(self.view.get_filepath())
if results and filepath:
self._save_results_to_csv(results, filepath)
if self.csv_file_handle:
try:
self.csv_file_handle.close()
log.info("CSV file saved and closed successfully.")
except IOError as e:
log.error(f"Error closing CSV file: {e}")
self.csv_file_handle = None
self.csv_writer = None
self.active_export_profile = None
self.is_processing = False
self.worker_process = None
def _save_results_to_csv(self, batch_results: List[dict], source_path: Path):
"""Saves the extracted data to a CSV file."""
output_path = source_path.with_suffix('.csv')
log.info(f"Saving {len(batch_results)} records to: {output_path}")
def open_profile_editor(self):
# ... (this method is unchanged)
if self.profile_editor_window and self.profile_editor_window.winfo_exists():
self.profile_editor_window.lift()
self.profile_editor_window.focus_force()
return
header = ["batch_id", "TimeTag"]
try:
with open(output_path, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(header)
for record in batch_results:
writer.writerow([record["batch_id"], record["timetag"]])
log.info("Save complete.")
except IOError as e:
log.error(f"Failed to write CSV: {e}")
profiles = self.config_manager.get_export_profiles()
self.profile_editor_window = ProfileEditorWindow(master=self.view, controller=self, profiles=profiles)
self.profile_editor_window.wait_window()
self._load_initial_config()
def save_export_profiles(self, profiles: List[ExportProfile]):
# ... (this method is unchanged)
self.config_manager.save_export_profiles(profiles)
def shutdown(self):
"""Handles application shutdown logic."""
# ... (this method is unchanged)
log.info("Controller shutting down.")
self.stop_processing()
if self.is_processing:
self.stop_processing()
if self.worker_process and self.worker_process.is_alive():
log.info("Waiting for worker process to join...")
self.worker_process.join(timeout=1.0)
if self.worker_process.is_alive():
log.warning("Worker process did not exit gracefully, terminating.")
self.worker_process.terminate()
if self.csv_file_handle:
self.csv_file_handle.close()
logger.shutdown_logging_system()

View File

@ -0,0 +1,54 @@
"""
Defines the data structures for managing export profiles.
"""
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
# Using slots for performance, similar to data_structures.py
DC_KWARGS = {'slots': True}
@dataclass(**DC_KWARGS)
class ExportField:
"""
Represents a single data field to be exported to a CSV column.
Attributes:
column_name (str): The name of the column in the CSV file header.
data_path (str): A dot-separated path to access the value within
the DataBatch object. E.g., "header.header_data.signal_descr.ttag".
"""
column_name: str
data_path: str
@dataclass(**DC_KWARGS)
class ExportProfile:
"""
Represents a full export profile, including its name and the list of fields.
Attributes:
name (str): The unique name of the profile (e.g., "Navigation Data").
fields (List[ExportField]): An ordered list of fields to export. The order
determines the column order in the CSV.
"""
name: str
fields: List[ExportField] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Converts the profile to a dictionary for JSON serialization."""
return {
"name": self.name,
"fields": [
{"column_name": f.column_name, "data_path": f.data_path}
for f in self.fields
],
}
@staticmethod
def from_dict(data: Dict[str, Any]) -> "ExportProfile":
"""Creates an ExportProfile instance from a dictionary."""
name = data.get("name", "Unnamed Profile")
fields_data = data.get("fields", [])
fields = [ExportField(**field_data) for field_data in fields_data]
return ExportProfile(name=name, fields=fields)

View File

@ -3,7 +3,7 @@ Worker process logic for reading and parsing radar data files.
"""
import multiprocessing as mp
from pathlib import Path
from typing import List, Optional, Dict, Iterator
from typing import List, Optional, Dict, Iterator, Tuple
import queue
import numpy as np
@ -20,6 +20,8 @@ BLOCK_NAME_OFFSET = 17
BLOCK_SIZE_OFFSET = 5
# in file_reader.py
def run_worker_process(filepath: Path, command_queue: mp.Queue, result_queue: mp.Queue):
"""This function is the main target for the multiprocessing.Process."""
log.info(f"[Worker-{mp.current_process().pid}] Started for file: {filepath.name}")
@ -33,11 +35,11 @@ def run_worker_process(filepath: Path, command_queue: mp.Queue, result_queue: mp
total_blocks = len(reader.block_indices or [])
result_queue.put({"type": "start", "total": total_blocks})
final_results = []
interrupted = False
batch_generator = reader.build_batches_generator()
for i, batch in enumerate(batch_generator):
for i, (batch, blocks_done_count) in enumerate(batch_generator):
try:
if not command_queue.empty() and command_queue.get_nowait() == "STOP":
log.warning(f"[Worker-{mp.current_process().pid}] Stop command received. Halting.")
@ -47,18 +49,21 @@ def run_worker_process(filepath: Path, command_queue: mp.Queue, result_queue: mp
pass
batch_count = i + 1
# --- The dictionary for the progress message is modified here ---
progress_data = {
"type": "progress",
"batch_id": batch_count,
"file_batch_counter": batch.header.header_data.signal_descr.batch_counter, # Added this field
"timetag": batch.header.header_data.signal_descr.ttag,
"heading": batch.header.header_data.general_settings.navigation.attitude.true_heading_rad,
"blocks_done": batch.header.header_sw.header_sw_part1.counter # Use a real counter if available
"blocks_done": blocks_done_count
# "heading" field has been removed
}
result_queue.put(progress_data)
final_results.append({"batch_id": batch_count, "timetag": progress_data["timetag"]})
batch.batch_id = batch_count
result_queue.put({"type": "data_batch", "data": batch})
result_queue.put({"type": "complete", "results": final_results, "interrupted": interrupted})
result_queue.put({"type": "complete", "interrupted": interrupted})
log.info(f"[Worker-{mp.current_process().pid}] Processing finished.")
except Exception as e:
log.error(f"[Worker-{mp.current_process().pid}] Unhandled exception: {e}", exc_info=True)
@ -92,19 +97,19 @@ class RadarFileReader:
log.error(f"Failed to load or find blocks: {e}")
return False
def build_batches_generator(self) -> Iterator[DataBatch]:
def build_batches_generator(self) -> Iterator[Tuple[DataBatch, int]]:
if self.block_indices is None or self.data_vector is None:
return
current_header: Optional[MainHeader] = None
current_signals: Dict[str, np.ndarray] = {}
for block_start_index in self.block_indices:
for block_num, block_start_index in enumerate(self.block_indices):
try:
block_name = self.data_vector[block_start_index + BLOCK_NAME_OFFSET]
if block_name == HEADER_BLOCK_NAME:
if current_header:
yield DataBatch(header=current_header, signals=current_signals)
yield DataBatch(header=current_header, signals=current_signals), block_num
current_header, _ = parse_main_header(self.data_vector, block_start_index)
current_signals = {}
@ -113,12 +118,21 @@ class RadarFileReader:
block_size_words = self.data_vector[block_start_index + BLOCK_SIZE_OFFSET] // 4
n_rbin = current_header.header_data.signal_descr.packet_descr.nrbin
n_pri = current_header.header_data.signal_descr.packet_descr.npri
current_signals[SIGNAL_TYPE_MAP[block_name]] = parse_signal_block(
# Call the parser and check the result
parsed_signal = parse_signal_block(
self.data_vector, block_start_index, block_size_words, n_rbin, n_pri
)
# Only add the signal if parsing was successful
if parsed_signal is not None:
current_signals[SIGNAL_TYPE_MAP[block_name]] = parsed_signal
except (ValueError, IndexError) as e:
log.warning(f"Parse error at offset {block_start_index}: {e}")
# This will now catch more critical errors, not the 'SGIN' one
log.warning(f"Critical parse error at offset {block_start_index}: {e}")
continue
if current_header:
yield DataBatch(header=current_header, signals=current_signals)
if current_header and self.block_indices:
last_block_num = len(self.block_indices)
yield DataBatch(header=current_header, signals=current_signals), last_block_num

View File

@ -402,17 +402,20 @@ def parse_signal_block(
block_size_words: int,
n_rbin: int,
n_pri: int,
) -> np.ndarray:
) -> np.ndarray | None: # Return type is now optional
"""Parses a block of I/Q signal data."""
log.debug(f"Parsing signal block at offset {block_offset_words} with size {n_rbin}x{n_pri}")
if n_rbin <= 0 or n_pri <= 0:
raise ValueError(f"Invalid signal dimensions: n_rbin={n_rbin}, n_pri={n_pri}")
log.warning(f"Invalid signal dimensions at offset {block_offset_words}: n_rbin={n_rbin}, n_pri={n_pri}")
return None
block_data = data[block_offset_words : block_offset_words + block_size_words]
marker_indices = np.where(block_data == ds.SIGNAL_DATA_MARKER)[0]
if len(marker_indices) == 0:
raise ValueError("Signal data marker 'SGIN' not found in block.")
# Instead of raising an error, log it and return None
log.debug(f"Signal data marker 'SGIN' not found in block at offset {block_offset_words}.")
return None
log.debug("Found 'SGIN' marker within signal block.")
signal_start_word = marker_indices[0] + 2
@ -421,6 +424,12 @@ def parse_signal_block(
num_words_to_read = (num_samples_iq + 1) // 2
raw_signal_words = block_data[signal_start_word : signal_start_word + num_words_to_read]
# Check if there's enough data after the marker
if raw_signal_words.size < num_words_to_read:
log.warning(f"Incomplete signal data at offset {block_offset_words}. Expected {num_words_to_read} words, found {raw_signal_words.size}.")
return None
iq_samples = raw_signal_words.view(np.int16)
i_data = iq_samples[::2]
@ -428,4 +437,9 @@ def parse_signal_block(
complex_signal = i_data.astype(np.float32) + 1j * q_data.astype(np.float32)
# Final check on dimensions before reshaping
if complex_signal.size != n_rbin * n_pri:
log.warning(f"Signal data size mismatch at offset {block_offset_words}. Expected {n_rbin * n_pri} samples, got {complex_signal.size}.")
return None
return complex_signal.reshape((n_rbin, n_pri))

View File

@ -4,10 +4,11 @@ Main View for the Radar Data Reader application.
import tkinter as tk
from tkinter import scrolledtext, filedialog, ttk
from pathlib import Path
from typing import Dict, Any
from typing import Dict, Any, List
import queue
from ..utils import logger
from ..core.export_profiles import ExportProfile
log = logger.get_logger(__name__)
@ -26,7 +27,7 @@ class MainWindow(tk.Frame):
self._init_vars()
self.pack(fill=tk.BOTH, expand=True)
self.master.title("Radar Data Reader")
self.master.geometry("800x700")
self.master.geometry("800x750") # Increased height slightly
self._create_widgets()
self._setup_gui_logging(logging_config)
@ -37,14 +38,28 @@ class MainWindow(tk.Frame):
def _init_vars(self):
"""Initialize all Tkinter variables."""
self.filepath_var = tk.StringVar()
self.output_filepath_var = tk.StringVar()
self.active_profile_var = tk.StringVar()
self.batch_id_var = tk.StringVar(value="N/A")
self.file_batch_counter_var = tk.StringVar(value="N/A")
self.timetag_var = tk.StringVar(value="N/A")
self.heading_var = tk.StringVar(value="N/A")
self.progress_var = tk.DoubleVar(value=0)
self.status_bar_var = tk.StringVar(value="Ready") # For status bar
self.status_bar_var = tk.StringVar(value="Ready")
def _create_widgets(self):
"""Create all the widgets for the main window."""
menu_bar = tk.Menu(self.master)
self.master.config(menu=menu_bar)
file_menu = tk.Menu(menu_bar, tearoff=0)
menu_bar.add_cascade(label="File", menu=file_menu)
file_menu.add_command(
label="Manage Export Profiles...",
command=self.controller.open_profile_editor
)
file_menu.add_separator()
file_menu.add_command(label="Exit", command=self.on_close)
main_frame = tk.Frame(self)
main_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)
main_frame.rowconfigure(2, weight=1)
@ -52,9 +67,20 @@ class MainWindow(tk.Frame):
controls_frame = ttk.LabelFrame(main_frame, text="Controls")
controls_frame.grid(row=0, column=0, sticky="ew", pady=(0, 5))
controls_frame.columnconfigure(1, weight=1)
# --- Row 0: Profile selection and main action buttons ---
ttk.Label(controls_frame, text="Export Profile:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.profile_combobox = ttk.Combobox(controls_frame, textvariable=self.active_profile_var, state="readonly")
self.profile_combobox.grid(row=0, column=1, padx=(0, 5), pady=5, sticky="ew")
self.process_button = ttk.Button(controls_frame, text="Process File", command=self.on_process_click)
self.process_button.grid(row=0, column=2, padx=(0, 5), pady=5, sticky="ew")
self.stop_button = ttk.Button(controls_frame, text="Stop", command=self.on_stop_click, state=tk.DISABLED)
self.stop_button.grid(row=0, column=3, padx=(5, 5), pady=5, sticky="ew")
# --- Row 1: Input File selection ---
file_select_frame = ttk.Frame(controls_frame)
file_select_frame.pack(fill=tk.X, padx=5, pady=5)
file_select_frame.grid(row=1, column=0, columnspan=4, sticky="ew", padx=5, pady=(0, 5))
file_select_frame.columnconfigure(1, weight=1)
ttk.Label(file_select_frame, text="Radar File:").grid(row=0, column=0, padx=(0, 5))
self.file_entry = ttk.Entry(file_select_frame, textvariable=self.filepath_var, state="readonly")
@ -62,26 +88,29 @@ class MainWindow(tk.Frame):
self.browse_button = ttk.Button(file_select_frame, text="Browse...", command=self.on_browse_click)
self.browse_button.grid(row=0, column=2, padx=(5, 0))
button_frame = ttk.Frame(controls_frame)
button_frame.pack(fill=tk.X, padx=5, pady=5)
button_frame.columnconfigure(0, weight=1)
button_frame.columnconfigure(1, weight=1)
self.process_button = ttk.Button(button_frame, text="Process File", command=self.on_process_click)
self.process_button.grid(row=0, column=0, sticky="ew", padx=2)
self.stop_button = ttk.Button(button_frame, text="Stop", command=self.on_stop_click, state=tk.DISABLED)
self.stop_button.grid(row=0, column=1, sticky="ew", padx=2)
# --- Row 2: Output File selection ---
output_file_frame = ttk.Frame(controls_frame)
output_file_frame.grid(row=2, column=0, columnspan=4, sticky="ew", padx=5, pady=(0, 5))
output_file_frame.columnconfigure(1, weight=1)
ttk.Label(output_file_frame, text="Output CSV File:").grid(row=0, column=0, padx=(0, 5))
self.output_file_entry = ttk.Entry(output_file_frame, textvariable=self.output_filepath_var)
self.output_file_entry.grid(row=0, column=1, sticky="ew")
self.save_as_button = ttk.Button(output_file_frame, text="Save As...", command=self.on_save_as_click)
self.save_as_button.grid(row=0, column=2, padx=(5, 0))
status_frame = ttk.LabelFrame(main_frame, text="Live Data & Progress")
status_frame.grid(row=1, column=0, sticky="ew", pady=5)
status_frame.columnconfigure(1, weight=1)
status_frame.columnconfigure(3, weight=1)
status_frame.columnconfigure(5, weight=1)
ttk.Label(status_frame, text="Batch ID:").grid(row=0, column=0, padx=5)
ttk.Label(status_frame, text="Batch ID:").grid(row=0, column=0, padx=5, sticky="w")
ttk.Label(status_frame, textvariable=self.batch_id_var).grid(row=0, column=1, sticky="w")
ttk.Label(status_frame, text="TimeTag:").grid(row=0, column=2, padx=5)
ttk.Label(status_frame, textvariable=self.timetag_var).grid(row=0, column=3, sticky="w")
ttk.Label(status_frame, text="Heading:").grid(row=0, column=4, padx=5)
ttk.Label(status_frame, textvariable=self.heading_var).grid(row=0, column=5, sticky="w")
ttk.Label(status_frame, text="File Batch Cntr:").grid(row=0, column=2, padx=5, sticky="w")
ttk.Label(status_frame, textvariable=self.file_batch_counter_var).grid(row=0, column=3, sticky="w")
ttk.Label(status_frame, text="TimeTag:").grid(row=0, column=4, padx=5, sticky="w")
ttk.Label(status_frame, textvariable=self.timetag_var).grid(row=0, column=5, sticky="w")
self.progress_bar = ttk.Progressbar(status_frame, variable=self.progress_var, maximum=100)
self.progress_bar.grid(row=1, column=0, columnspan=6, sticky="ew", padx=5, pady=5)
@ -92,7 +121,6 @@ class MainWindow(tk.Frame):
self.log_widget = scrolledtext.ScrolledText(log_frame, state=tk.DISABLED, wrap=tk.WORD)
self.log_widget.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
# Status Bar at the very bottom
self.status_bar = ttk.Label(self, textvariable=self.status_bar_var, relief=tk.SUNKEN, anchor=tk.W, padding=2)
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
@ -105,16 +133,48 @@ class MainWindow(tk.Frame):
def get_filepath(self) -> str:
return self.filepath_var.get()
def set_output_filepath(self, path: str):
self.output_filepath_var.set(path)
def get_output_filepath(self) -> str:
return self.output_filepath_var.get()
def ask_open_filename(self, current_path: str) -> str:
initial_dir = Path(current_path).parent if current_path and Path(current_path).exists() else Path.cwd()
return filedialog.askopenfilename(initialdir=initial_dir, filetypes=[("Radar Output", "*.out"), ("All files", "*.*")])
def ask_save_as_filename(self, current_path: str) -> str:
"""Opens a 'save as' dialog for the CSV file."""
initial_dir = Path(current_path).parent if current_path else Path.cwd()
initial_file = Path(current_path).name if current_path else ""
return filedialog.asksaveasfilename(
initialdir=initial_dir,
initialfile=initial_file,
defaultextension=".csv",
filetypes=[("CSV files", "*.csv"), ("All files", "*.*")]
)
def update_export_profiles(self, profiles: List[ExportProfile], active_profile_name: str):
"""Updates the export profile combobox."""
profile_names = [p.name for p in profiles]
self.profile_combobox['values'] = profile_names
if active_profile_name in profile_names:
self.active_profile_var.set(active_profile_name)
elif profile_names:
self.active_profile_var.set(profile_names[0])
else:
self.active_profile_var.set("")
def get_active_profile_name(self) -> str:
"""Returns the name of the currently selected export profile."""
return self.active_profile_var.get()
def start_processing_ui(self):
"""Prepares the UI for processing and starts the update loop."""
self.update_ui_for_processing_state(True)
self.batch_id_var.set("Starting...")
self.file_batch_counter_var.set("N/A")
self.timetag_var.set("N/A")
self.heading_var.set("N/A")
self.progress_var.set(0)
self.after(100, self.poll_result_queue)
@ -122,7 +182,9 @@ class MainWindow(tk.Frame):
"""Toggles the state of UI controls and status bar based on processing status."""
state = tk.DISABLED if is_processing else tk.NORMAL
self.browse_button.config(state=state)
self.save_as_button.config(state=state)
self.process_button.config(state=state)
self.profile_combobox.config(state=state)
self.stop_button.config(state=tk.NORMAL if is_processing else tk.DISABLED)
if is_processing:
@ -146,37 +208,38 @@ class MainWindow(tk.Frame):
elif msg_type == "progress":
batch_id = msg.get("batch_id", "N/A")
file_batch_counter = msg.get("file_batch_counter", "N/A")
timetag = msg.get("timetag", "N/A")
heading = msg.get('heading', 0.0)
# Log the progress to the GUI log widget
log.info(f"Processed Batch ID: {batch_id} (TimeTag: {timetag})")
log.info(f"Processed Batch ID: {batch_id} (File Counter: {file_batch_counter}, TimeTag: {timetag})")
# Update the live data labels
self.batch_id_var.set(str(batch_id))
self.file_batch_counter_var.set(str(file_batch_counter))
self.timetag_var.set(str(timetag))
self.heading_var.set(f"{heading:.5f}")
if self.total_items_for_progress > 0:
progress = (msg.get("blocks_done", 0) / self.total_items_for_progress) * 100
self.progress_var.set(progress)
elif msg_type == "data_batch":
self.controller.handle_data_batch(msg.get("data"))
elif msg_type == "complete":
self.controller.handle_worker_completion(
results=msg.get("results", []),
was_interrupted=msg.get("interrupted", False)
)
self.update_ui_for_processing_state(False)
return # Stop polling
return
elif msg_type == "error":
log.error(f"Received error from worker: {msg.get('message')}")
self.controller.handle_worker_completion(was_interrupted=True)
self.update_ui_for_processing_state(False)
self.batch_id_var.set("Error!")
return # Stop polling
return
except queue.Empty:
pass # Normal case, no new messages
pass
except Exception as e:
log.error(f"Error in GUI polling loop: {e}")
@ -195,3 +258,6 @@ class MainWindow(tk.Frame):
def on_close(self):
self.controller.shutdown()
self.master.destroy()
def on_save_as_click(self):
self.controller.select_output_file()

View File

@ -0,0 +1,290 @@
"""
GUI Window for creating, editing, and deleting export profiles.
"""
import tkinter as tk
from tkinter import ttk, simpledialog, messagebox
import dataclasses
import inspect
import copy
from typing import List, Type
from ..core import data_structures as ds
from ..core.export_profiles import ExportProfile, ExportField
from ..utils import logger
log = logger.get_logger(__name__)
# --- Constants ---
# Base classes from which to start introspection.
# We add DataBatch to make fields like 'batch_id' available at the top level.
ROOT_DATA_CLASSES = {
"DataBatch": ds.DataBatch,
}
# Primitive types that should be treated as leaves in the tree
LEAF_TYPES = {int, float, str, bool}
class ProfileEditorWindow(tk.Toplevel):
"""A Toplevel window for managing export profiles."""
def __init__(self, master, controller, profiles: List[ExportProfile]):
super().__init__(master)
self.master = master
self.controller = controller
# Work on a deep copy to easily detect unsaved changes
self.profiles = copy.deepcopy(profiles)
self._original_profiles_dict = {p.name: p.to_dict() for p in self.profiles}
self._init_window()
self._init_vars()
self._create_widgets()
self._populate_available_fields_tree()
self._load_profiles_to_combobox()
self.protocol("WM_DELETE_WINDOW", self._on_close)
def _init_window(self):
"""Initializes window properties."""
self.title("Export Profile Editor")
self.geometry("900x600")
self.transient(self.master) # Keep this window on top of the main one
self.grab_set() # Modal behavior
def _init_vars(self):
"""Initializes Tkinter variables."""
self.selected_profile_name = tk.StringVar()
self.selected_fields_var = tk.Variable(value=[])
def _create_widgets(self):
"""Creates the main layout and widgets for the editor."""
main_pane = ttk.PanedWindow(self, orient=tk.HORIZONTAL)
main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# --- Left Frame: Profile Management ---
profile_mgmt_frame = ttk.LabelFrame(main_pane, text="Profiles")
main_pane.add(profile_mgmt_frame, weight=1)
profile_mgmt_frame.columnconfigure(0, weight=1)
cb_frame = ttk.Frame(profile_mgmt_frame)
cb_frame.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
cb_frame.columnconfigure(0, weight=1)
self.profile_combobox = ttk.Combobox(cb_frame, textvariable=self.selected_profile_name, state="readonly")
self.profile_combobox.grid(row=0, column=0, sticky="ew")
self.profile_combobox.bind("<<ComboboxSelected>>", self._on_profile_selected)
btn_frame = ttk.Frame(profile_mgmt_frame)
btn_frame.grid(row=1, column=0, sticky="ew", padx=5)
btn_frame.columnconfigure((0, 1, 2), weight=1)
ttk.Button(btn_frame, text="New", command=self._on_new_profile).grid(row=0, column=0, sticky="ew", padx=2)
ttk.Button(btn_frame, text="Delete", command=self._on_delete_profile).grid(row=0, column=1, sticky="ew", padx=2)
# --- Middle Frame: Available Fields ---
fields_frame = ttk.LabelFrame(main_pane, text="Available Fields")
main_pane.add(fields_frame, weight=2)
fields_frame.rowconfigure(0, weight=1)
fields_frame.columnconfigure(0, weight=1)
self.fields_tree = ttk.Treeview(fields_frame, selectmode="browse")
self.fields_tree.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
ysb = ttk.Scrollbar(fields_frame, orient='vertical', command=self.fields_tree.yview)
self.fields_tree.configure(yscrollcommand=ysb.set)
ysb.grid(row=0, column=1, sticky='ns')
# --- Right Frame: Selected Fields and Actions ---
selected_frame_container = ttk.Frame(main_pane)
main_pane.add(selected_frame_container, weight=2)
selected_frame_container.rowconfigure(0, weight=1)
selected_frame_container.columnconfigure(1, weight=1)
action_btn_frame = ttk.Frame(selected_frame_container)
action_btn_frame.grid(row=0, column=0, sticky="ns", padx=5, pady=5)
ttk.Button(action_btn_frame, text=">>", command=self._add_field).grid(pady=5)
ttk.Button(action_btn_frame, text="<<", command=self._remove_field).grid(pady=5)
ttk.Button(action_btn_frame, text="Up", command=lambda: self._move_field(-1)).grid(pady=20)
ttk.Button(action_btn_frame, text="Down", command=lambda: self._move_field(1)).grid(pady=5)
selected_fields_frame = ttk.LabelFrame(selected_frame_container, text="Selected Fields for Profile")
selected_fields_frame.grid(row=0, column=1, sticky="nsew")
selected_fields_frame.rowconfigure(0, weight=1)
selected_fields_frame.columnconfigure(0, weight=1)
self.selected_listbox = tk.Listbox(selected_fields_frame, listvariable=self.selected_fields_var, selectmode=tk.SINGLE)
self.selected_listbox.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
ysb_list = ttk.Scrollbar(selected_fields_frame, orient='vertical', command=self.selected_listbox.yview)
self.selected_listbox.config(yscrollcommand=ysb_list.set)
ysb_list.grid(row=0, column=1, sticky='ns')
# --- Bottom Frame: Save/Cancel ---
bottom_frame = ttk.Frame(self)
bottom_frame.pack(fill=tk.X, padx=10, pady=(0, 10))
ttk.Button(bottom_frame, text="Save & Close", command=self._on_save_and_close).pack(side=tk.RIGHT)
ttk.Button(bottom_frame, text="Cancel", command=self._on_close).pack(side=tk.RIGHT, padx=5)
def _populate_available_fields_tree(self):
"""Introspects data structures and populates the Treeview."""
for name, class_obj in ROOT_DATA_CLASSES.items():
# Special handling for DataBatch to show its synthetic 'batch_id'
if name == "DataBatch":
self.fields_tree.insert("", "end", iid="batch_id", text="batch_id", values=("batch_id",))
# Now inspect its real fields, starting with an empty path
self._recursive_populate_tree(class_obj, "", "")
def _recursive_populate_tree(self, class_obj: Type, parent_id: str, current_path: str):
"""Recursively explores dataclasses to build the tree."""
if not dataclasses.is_dataclass(class_obj):
return
for field in dataclasses.fields(class_obj):
field_name = field.name
field_type = field.type
# --- THIS IS THE CORRECTED LINE ---
new_path = f"{current_path}.{field_name}" if current_path else field_name
node_id = new_path
# Check if field type is another dataclass (a branch)
if dataclasses.is_dataclass(field_type):
child_node = self.fields_tree.insert(parent_id, "end", iid=node_id, text=field_name)
self._recursive_populate_tree(field_type, child_node, new_path)
# Check if it's a primitive type (a leaf)
elif field_type in LEAF_TYPES:
self.fields_tree.insert(parent_id, "end", iid=node_id, text=field_name, values=(new_path,))
# Handle List[dataclass] or other complex types if needed in future
else:
self.fields_tree.insert(parent_id, "end", iid=node_id, text=f"{field_name} [{field_type.__name__}]")
def _load_profiles_to_combobox(self):
"""Updates the profile combobox with current profile names."""
profile_names = [p.name for p in self.profiles]
self.profile_combobox['values'] = profile_names
if profile_names:
self.selected_profile_name.set(profile_names[0])
self._load_profile_into_ui()
def _get_current_profile(self) -> ExportProfile | None:
"""Finds the profile object matching the current combobox selection."""
name = self.selected_profile_name.get()
return next((p for p in self.profiles if p.name == name), None)
def _on_profile_selected(self, event=None):
"""Handles changing the profile selection."""
# Here you could add logic to check for unsaved changes before switching
self._load_profile_into_ui()
def _load_profile_into_ui(self):
"""Loads the fields of the currently selected profile into the listbox."""
profile = self._get_current_profile()
if not profile:
self.selected_fields_var.set([])
return
display_list = [f.column_name for f in profile.fields]
self.selected_fields_var.set(display_list)
def _add_field(self):
"""Adds the selected field from the tree to the current profile."""
selected_item_id = self.fields_tree.focus()
if not selected_item_id:
return
# We only add leaf nodes which have a data path
data_path = self.fields_tree.item(selected_item_id, "values")
if not data_path:
messagebox.showinfo("Cannot Add Field", "Please select a specific data field (a leaf node), not a category.", parent=self)
return
data_path = data_path[0]
column_name = selected_item_id.split('.')[-1]
profile = self._get_current_profile()
if not profile: return
# Avoid duplicates
if any(f.data_path == data_path for f in profile.fields):
messagebox.showinfo("Duplicate Field", f"The field '{column_name}' is already in the profile.", parent=self)
return
profile.fields.append(ExportField(column_name=column_name, data_path=data_path))
self._load_profile_into_ui()
def _remove_field(self):
"""Removes the selected field from the current profile."""
selection = self.selected_listbox.curselection()
if not selection: return
index_to_remove = selection[0]
profile = self._get_current_profile()
if not profile: return
del profile.fields[index_to_remove]
self._load_profile_into_ui()
def _move_field(self, direction: int):
"""Moves the selected field up or down in the list."""
selection = self.selected_listbox.curselection()
if not selection: return
index = selection[0]
new_index = index + direction
profile = self._get_current_profile()
if not profile or not (0 <= new_index < len(profile.fields)):
return
fields = profile.fields
fields.insert(new_index, fields.pop(index))
self._load_profile_into_ui()
self.selected_listbox.selection_set(new_index)
def _on_new_profile(self):
"""Creates a new, empty profile."""
name = simpledialog.askstring("New Profile", "Enter a name for the new profile:", parent=self)
if not name or not name.strip():
return
if any(p.name == name for p in self.profiles):
messagebox.showerror("Error", f"A profile with the name '{name}' already exists.", parent=self)
return
new_profile = ExportProfile(name=name.strip())
self.profiles.append(new_profile)
self._load_profiles_to_combobox()
self.selected_profile_name.set(name)
self._load_profile_into_ui()
def _on_delete_profile(self):
"""Deletes the currently selected profile."""
profile = self._get_current_profile()
if not profile: return
if messagebox.askyesno("Confirm Delete", f"Are you sure you want to delete the profile '{profile.name}'?", parent=self):
self.profiles.remove(profile)
self._load_profiles_to_combobox()
def _check_unsaved_changes(self) -> bool:
"""Checks if any profiles have been modified since last save."""
current_profiles_dict = {p.name: p.to_dict() for p in self.profiles}
return current_profiles_dict != self._original_profiles_dict
def _on_save_and_close(self):
"""Saves all profiles and closes the window."""
log.info("Saving export profiles and closing editor.")
self.controller.save_export_profiles(self.profiles)
self.destroy()
def _on_close(self):
"""Handles the window close event."""
if self._check_unsaved_changes():
response = messagebox.askyesnocancel(
"Unsaved Changes",
"You have unsaved changes. Would you like to save them before closing?",
parent=self
)
if response is True: # Yes
self._on_save_and_close()
elif response is False: # No
self.destroy()
else: # Cancel
return
else:
self.destroy()

View File

@ -4,9 +4,10 @@ Handles loading and saving of application settings to a JSON file.
"""
import json
from pathlib import Path
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from . import logger
from ..core.export_profiles import ExportProfile, ExportField
log = logger.get_logger(__name__)
@ -23,42 +24,80 @@ class ConfigManager:
"""
self.config_path: Path = config_path
self.config: Dict[str, Any] = self._get_default_config()
# In-memory representation as objects
self.export_profiles: List[ExportProfile] = []
@staticmethod
def _get_default_config() -> Dict[str, Any]:
"""Provides the default configuration structure."""
default_profile = ExportProfile(
name="Default",
fields=[
ExportField(column_name="batch_id", data_path="batch_id"),
ExportField(column_name="timetag", data_path="header.header_data.signal_descr.ttag"),
]
)
return {
"last_opened_file": ""
"last_opened_file": "",
"last_output_file": "", # <-- NUOVA CHIAVE
"active_export_profile_name": "Default",
"export_profiles": [default_profile.to_dict()]
}
def load_config(self) -> None:
"""
Loads the configuration from the JSON file.
If the file does not exist, it starts with the default configuration.
"""
"""Loads the configuration from the JSON file."""
log.info(f"Attempting to load configuration from: {self.config_path}")
if self.config_path.is_file():
try:
with open(self.config_path, 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
# Merge loaded data with defaults to ensure all keys exist
self.config.update(loaded_data)
log.info("Configuration loaded successfully.")
except (json.JSONDecodeError, IOError) as e:
self.config['last_opened_file'] = loaded_data.get('last_opened_file', '')
self.config['last_output_file'] = loaded_data.get('last_output_file', '') # <-- NUOVA RIGA
self.config['active_export_profile_name'] = loaded_data.get('active_export_profile_name', 'Default')
profiles_data = loaded_data.get("export_profiles", [])
self.export_profiles = [ExportProfile.from_dict(p_data) for p_data in profiles_data]
if not self.export_profiles:
log.warning("No export profiles found in config, loading defaults.")
self._load_default_profiles()
log.info(f"Configuration loaded successfully. Found {len(self.export_profiles)} export profiles.")
except (json.JSONDecodeError, IOError, KeyError) as e:
log.error(f"Failed to load or parse config file: {e}. Using defaults.")
self.config = self._get_default_config()
self._load_defaults()
else:
log.warning("Configuration file not found. Using default settings.")
self.config = self._get_default_config()
self._load_defaults()
def _load_defaults(self):
"""Loads all default settings into the manager."""
self.config = self._get_default_config()
self._load_default_profiles()
def _load_default_profiles(self):
"""Loads just the default profiles from the default config."""
default_conf = self._get_default_config()
profiles_data = default_conf.get("export_profiles", [])
self.export_profiles = [ExportProfile.from_dict(p_data) for p_data in profiles_data]
def save_config(self) -> None:
"""Saves the current configuration to the JSON file."""
log.info(f"Saving configuration to: {self.config_path}")
data_to_save = {
"last_opened_file": self.get("last_opened_file", ""),
"last_output_file": self.get("last_output_file", ""), # <-- NUOVA RIGA
"active_export_profile_name": self.get("active_export_profile_name", "Default"),
"export_profiles": [profile.to_dict() for profile in self.export_profiles]
}
try:
# Ensure the parent directory exists
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=4)
json.dump(data_to_save, f, indent=4)
log.info("Configuration saved successfully.")
except IOError as e:
log.error(f"Failed to save configuration file: {e}")
@ -70,3 +109,12 @@ class ConfigManager:
def set(self, key: str, value: Any) -> None:
"""Sets a value in the configuration."""
self.config[key] = value
def get_export_profiles(self) -> List[ExportProfile]:
"""Returns the list of loaded export profiles."""
return self.export_profiles
def save_export_profiles(self, profiles: List[ExportProfile]):
"""Updates the list of export profiles and saves the config."""
self.export_profiles = profiles
self.save_config()