diff --git a/radar_data_reader/core/app_controller.py b/radar_data_reader/core/app_controller.py index bb7c05c..e355bb2 100644 --- a/radar_data_reader/core/app_controller.py +++ b/radar_data_reader/core/app_controller.py @@ -1,15 +1,15 @@ """ Application Controller for the Radar Data Reader. -Orchestrates the interaction between the GUI and the core processing logic. +Orchestrates the interaction between the GUI and the core processing logic using multiprocessing. """ +import multiprocessing as mp import threading import csv from pathlib import Path from typing import List from ..utils.config_manager import ConfigManager -from ..core.file_reader import RadarFileReader -from ..core.data_structures import DataBatch +from ..core.file_reader import run_worker_process from ..utils import logger log = logger.get_logger(__name__) @@ -21,9 +21,11 @@ class AppController: def __init__(self, config_manager: ConfigManager): self.config_manager = config_manager self.view = None - self.worker_thread: threading.Thread | None = None - self.worker: RadarFileReader | None = None + self.worker_process: mp.Process | None = None self.is_processing = False + + self.command_queue = mp.Queue() + self.result_queue = mp.Queue() def bind_view(self, view): """Binds the GUI/View to this controller.""" @@ -31,101 +33,91 @@ class AppController: self._load_initial_config() def _load_initial_config(self): + """Loads initial settings from config and updates the view.""" if last_file := self.config_manager.get("last_opened_file"): if Path(last_file).is_file(): self.view.set_filepath(last_file) def select_file(self): + """Opens a file dialog to select a radar file.""" current_path = self.view.get_filepath() if filepath := self.view.ask_open_filename(current_path): self.view.set_filepath(filepath) def start_processing(self): - """Starts the file processing worker in a separate thread.""" - if self.is_processing: return + """Starts the file processing in a separate process.""" + if self.is_processing: + log.warning("Processing is already in progress.") + return + filepath_str = self.view.get_filepath() if not filepath_str or not Path(filepath_str).is_file(): - log.error("No valid file selected.") + log.error("No valid file selected to process.") return self.is_processing = True self.view.start_processing_ui() - self.worker = RadarFileReader(Path(filepath_str)) + self.config_manager.set("last_opened_file", filepath_str) + self.config_manager.save_config() - self.worker_thread = threading.Thread( - target=self._run_processing, + # Clear queues before starting a new process + while not self.command_queue.empty(): self.command_queue.get() + while not self.result_queue.empty(): self.result_queue.get() + + self.worker_process = mp.Process( + target=run_worker_process, + args=(Path(filepath_str), self.command_queue, self.result_queue), daemon=True ) - self.worker_thread.start() + self.worker_process.start() + + self.view.poll_result_queue() def stop_processing(self): - if not self.is_processing or not self.worker: return - log.info("Stop requested by user. Signaling worker...") - self.worker.stop() + """Sends a stop command to the worker process.""" + if not self.is_processing: + log.warning("Stop clicked, but no process is running.") + return + log.info("Stop requested by user. Sending STOP command to worker...") + self.command_queue.put("STOP") - def _run_processing(self): - """The actual processing logic that runs in the thread.""" - saved_batches: List[DataBatch] = [] - try: - filepath = self.worker.file_path - self.config_manager.set("last_opened_file", str(filepath)) - self.config_manager.save_config() - - if self.worker.load_and_find_blocks(): - if not self.worker.stop_event.is_set(): - total_blocks = len(self.worker.block_indices or []) - self.view.put_message_in_queue({"type": "start", "total": total_blocks}) - - def progress_callback(batch: DataBatch, batch_count: int, blocks_done: int): - self._on_batch_processed(batch, batch_count, blocks_done) - saved_batches.append(batch) - - # Pass the callback to the worker - self.worker.build_batches(callback=progress_callback) + def handle_worker_completion(self, results: List[dict], was_interrupted: bool): + """Handles the final results from the worker.""" + status = "Interrupted by user" if was_interrupted else "Processing Complete" + log.info(f"--- {status}. Found {len(results)} batches. ---") + + filepath = Path(self.view.get_filepath()) + if results and filepath: + self._save_results_to_csv(results, filepath) - interrupted = self.worker.stop_event.is_set() - log_msg = "Interrupted by user" if interrupted else "Processing Complete" - log.info(f"--- {log_msg}. Found {len(saved_batches)} batches. ---") - - if saved_batches: - self._save_results_to_csv(saved_batches, filepath) + self.is_processing = False + self.worker_process = None - except Exception as e: - log.error(f"An error occurred in worker thread: {e}", exc_info=True) - finally: - self.is_processing = False - self.worker = None - self.view.put_message_in_queue({"type": "done"}) - - def _on_batch_processed(self, batch: DataBatch, batch_count: int, blocks_done: int): - """Puts progress info into the view's queue.""" - update_info = { - "type": "progress", - "batch_id": batch_count, - "timetag": batch.header.header_data.signal_descr.ttag, - "blocks_done": blocks_done - } - self.view.put_message_in_queue(update_info) - - def _save_results_to_csv(self, batches: List[DataBatch], source_path: Path): + def _save_results_to_csv(self, batch_results: List[dict], source_path: Path): + """Saves the extracted data to a CSV file.""" output_path = source_path.with_suffix('.csv') - log.info(f"Saving {len(batches)} records to: {output_path}") + log.info(f"Saving {len(batch_results)} records to: {output_path}") header = ["batch_id", "TimeTag"] try: with open(output_path, 'w', encoding='utf-8', newline='') as f: writer = csv.writer(f) writer.writerow(header) - for i, batch in enumerate(batches): - writer.writerow([i + 1, batch.header.header_data.signal_descr.ttag]) + for record in batch_results: + writer.writerow([record["batch_id"], record["timetag"]]) log.info("Save complete.") except IOError as e: log.error(f"Failed to write CSV: {e}") def shutdown(self): + """Handles application shutdown logic.""" log.info("Controller shutting down.") self.stop_processing() - if self.worker_thread and self.worker_thread.is_alive(): - self.worker_thread.join(timeout=1.0) + if self.worker_process and self.worker_process.is_alive(): + log.info("Waiting for worker process to join...") + self.worker_process.join(timeout=1.0) + if self.worker_process.is_alive(): + log.warning("Worker process did not exit gracefully, terminating.") + self.worker_process.terminate() logger.shutdown_logging_system() \ No newline at end of file diff --git a/radar_data_reader/core/file_reader.py b/radar_data_reader/core/file_reader.py index a6cfaf2..49d38d9 100644 --- a/radar_data_reader/core/file_reader.py +++ b/radar_data_reader/core/file_reader.py @@ -1,10 +1,10 @@ """ -Worker class to read and process radar data from a binary file. +Worker process logic for reading and parsing radar data files. """ -import threading -import time +import multiprocessing as mp from pathlib import Path -from typing import List, Optional, Dict, Callable +from typing import List, Optional, Dict, Iterator +import queue import numpy as np @@ -20,63 +20,94 @@ BLOCK_NAME_OFFSET = 17 BLOCK_SIZE_OFFSET = 5 +def run_worker_process(filepath: Path, command_queue: mp.Queue, result_queue: mp.Queue): + """This function is the main target for the multiprocessing.Process.""" + log.info(f"[Worker-{mp.current_process().pid}] Started for file: {filepath.name}") + + try: + reader = RadarFileReader(filepath) + if not reader.load_and_find_blocks(): + result_queue.put({"type": "error", "message": "Failed to load or find blocks."}) + return + + total_blocks = len(reader.block_indices or []) + result_queue.put({"type": "start", "total": total_blocks}) + + final_results = [] + interrupted = False + + batch_generator = reader.build_batches_generator() + for i, batch in enumerate(batch_generator): + try: + if not command_queue.empty() and command_queue.get_nowait() == "STOP": + log.warning(f"[Worker-{mp.current_process().pid}] Stop command received. Halting.") + interrupted = True + break + except queue.Empty: + pass + + batch_count = i + 1 + progress_data = { + "type": "progress", + "batch_id": batch_count, + "timetag": batch.header.header_data.signal_descr.ttag, + "heading": batch.header.header_data.general_settings.navigation.attitude.true_heading_rad, + "blocks_done": batch.header.header_sw.header_sw_part1.counter # Use a real counter if available + } + result_queue.put(progress_data) + + final_results.append({"batch_id": batch_count, "timetag": progress_data["timetag"]}) + + result_queue.put({"type": "complete", "results": final_results, "interrupted": interrupted}) + log.info(f"[Worker-{mp.current_process().pid}] Processing finished.") + except Exception as e: + log.error(f"[Worker-{mp.current_process().pid}] Unhandled exception: {e}", exc_info=True) + result_queue.put({"type": "error", "message": f"Worker failed: {e}"}) + + class RadarFileReader: - """Worker class to read and process radar data from a binary file.""" + """Class containing the pure logic for file reading and parsing.""" def __init__(self, file_path: Path): - if not file_path.is_file(): - raise FileNotFoundError(f"File not found: {file_path}") - self.file_path: Path = file_path + self.file_path = file_path self.data_vector: Optional[np.ndarray] = None self.block_indices: Optional[List[int]] = None - self.stop_event = threading.Event() - - def stop(self): - if not self.stop_event.is_set(): - self.stop_event.set() def load_and_find_blocks(self) -> bool: - if self.stop_event.is_set(): return False log.info(f"Loading data from {self.file_path}...") - self.data_vector = np.fromfile(str(self.file_path), dtype=' Iterator[DataBatch]: + if self.block_indices is None or self.data_vector is None: + return current_header: Optional[MainHeader] = None current_signals: Dict[str, np.ndarray] = {} - batch_count = 0 - - for i, block_start_index in enumerate(self.block_indices): - if self.stop_event.is_set(): - log.info("Processing loop in worker interrupted.") - break - - if i > 0 and i % 500 == 0: - time.sleep(0.001) + for block_start_index in self.block_indices: try: block_name = self.data_vector[block_start_index + BLOCK_NAME_OFFSET] if block_name == HEADER_BLOCK_NAME: if current_header: - new_batch = DataBatch(header=current_header, signals=current_signals) - if callback: callback(new_batch, batch_count, i) + yield DataBatch(header=current_header, signals=current_signals) current_header, _ = parse_main_header(self.data_vector, block_start_index) current_signals = {} - batch_count += 1 elif block_name in SIGNAL_TYPE_MAP and current_header: block_size_words = self.data_vector[block_start_index + BLOCK_SIZE_OFFSET] // 4 @@ -87,7 +118,7 @@ class RadarFileReader: ) except (ValueError, IndexError) as e: log.warning(f"Parse error at offset {block_start_index}: {e}") + continue - if current_header and not self.stop_event.is_set(): - last_batch = DataBatch(header=current_header, signals=current_signals) - if callback: callback(last_batch, batch_count, len(self.block_indices)) \ No newline at end of file + if current_header: + yield DataBatch(header=current_header, signals=current_signals) \ No newline at end of file diff --git a/radar_data_reader/gui/main_window.py b/radar_data_reader/gui/main_window.py index 5a3cadb..5e504e4 100644 --- a/radar_data_reader/gui/main_window.py +++ b/radar_data_reader/gui/main_window.py @@ -20,7 +20,7 @@ class MainWindow(tk.Frame): self.master = master self.controller = controller - self.gui_update_queue = self.controller.result_queue + self.gui_update_queue = controller.result_queue self.total_items_for_progress = 0 self._init_vars() @@ -35,13 +35,16 @@ class MainWindow(tk.Frame): log.info("Main window View initialized.") def _init_vars(self): + """Initialize all Tkinter variables.""" self.filepath_var = tk.StringVar() self.batch_id_var = tk.StringVar(value="N/A") self.timetag_var = tk.StringVar(value="N/A") + self.heading_var = tk.StringVar(value="N/A") self.progress_var = tk.DoubleVar(value=0) - self.status_bar_var = tk.StringVar(value="Ready") + self.status_bar_var = tk.StringVar(value="Ready") # For status bar def _create_widgets(self): + """Create all the widgets for the main window.""" main_frame = tk.Frame(self) main_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True) main_frame.rowconfigure(2, weight=1) @@ -72,12 +75,15 @@ class MainWindow(tk.Frame): status_frame.grid(row=1, column=0, sticky="ew", pady=5) status_frame.columnconfigure(1, weight=1) status_frame.columnconfigure(3, weight=1) + status_frame.columnconfigure(5, weight=1) ttk.Label(status_frame, text="Batch ID:").grid(row=0, column=0, padx=5) ttk.Label(status_frame, textvariable=self.batch_id_var).grid(row=0, column=1, sticky="w") ttk.Label(status_frame, text="TimeTag:").grid(row=0, column=2, padx=5) ttk.Label(status_frame, textvariable=self.timetag_var).grid(row=0, column=3, sticky="w") + ttk.Label(status_frame, text="Heading:").grid(row=0, column=4, padx=5) + ttk.Label(status_frame, textvariable=self.heading_var).grid(row=0, column=5, sticky="w") self.progress_bar = ttk.Progressbar(status_frame, variable=self.progress_var, maximum=100) - self.progress_bar.grid(row=1, column=0, columnspan=4, sticky="ew", padx=5, pady=5) + self.progress_bar.grid(row=1, column=0, columnspan=6, sticky="ew", padx=5, pady=5) log_frame = ttk.LabelFrame(main_frame, text="Log Console") log_frame.grid(row=2, column=0, sticky="nsew", pady=(5, 0)) @@ -86,7 +92,8 @@ class MainWindow(tk.Frame): self.log_widget = scrolledtext.ScrolledText(log_frame, state=tk.DISABLED, wrap=tk.WORD) self.log_widget.grid(row=0, column=0, sticky="nsew", padx=5, pady=5) - self.status_bar = ttk.Label(self.master, textvariable=self.status_bar_var, relief=tk.SUNKEN, anchor=tk.W, padding=2) + # Status Bar at the very bottom + self.status_bar = ttk.Label(self, textvariable=self.status_bar_var, relief=tk.SUNKEN, anchor=tk.W, padding=2) self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) def _setup_gui_logging(self, logging_config): @@ -103,20 +110,24 @@ class MainWindow(tk.Frame): return filedialog.askopenfilename(initialdir=initial_dir, filetypes=[("Radar Output", "*.out"), ("All files", "*.*")]) def start_processing_ui(self): + """Prepares the UI for processing and starts the update loop.""" self.update_ui_for_processing_state(True) + self.batch_id_var.set("Starting...") + self.timetag_var.set("N/A") + self.heading_var.set("N/A") + self.progress_var.set(0) self.after(100, self.poll_result_queue) def update_ui_for_processing_state(self, is_processing: bool): + """Toggles the state of UI controls and status bar based on processing status.""" state = tk.DISABLED if is_processing else tk.NORMAL self.browse_button.config(state=state) self.process_button.config(state=state) self.stop_button.config(state=tk.NORMAL if is_processing else tk.DISABLED) if is_processing: - self.status_bar_var.set("Processing file... Please wait.") + self.status_bar_var.set("Processing... Please wait.") self.master.config(cursor="watch") - self.batch_id_var.set("Starting...") - self.progress_var.set(0) else: self.status_bar_var.set("Ready") self.progress_var.set(0) @@ -124,9 +135,9 @@ class MainWindow(tk.Frame): self.master.config(cursor="") def poll_result_queue(self): - # Process a limited number of messages per call to keep the GUI responsive - for _ in range(100): - try: + """Polls the result queue from the worker process for updates.""" + try: + while True: msg = self.gui_update_queue.get_nowait() msg_type = msg.get("type") @@ -136,11 +147,15 @@ class MainWindow(tk.Frame): elif msg_type == "progress": batch_id = msg.get("batch_id", "N/A") timetag = msg.get("timetag", "N/A") + heading = msg.get('heading', 0.0) + # Log the progress to the GUI log widget log.info(f"Processed Batch ID: {batch_id} (TimeTag: {timetag})") + # Update the live data labels self.batch_id_var.set(str(batch_id)) self.timetag_var.set(str(timetag)) + self.heading_var.set(f"{heading:.5f}") if self.total_items_for_progress > 0: progress = (msg.get("blocks_done", 0) / self.total_items_for_progress) * 100 @@ -152,16 +167,19 @@ class MainWindow(tk.Frame): was_interrupted=msg.get("interrupted", False) ) self.update_ui_for_processing_state(False) - return + return # Stop polling elif msg_type == "error": log.error(f"Received error from worker: {msg.get('message')}") self.update_ui_for_processing_state(False) - return - - except queue.Empty: - break # No more messages + self.batch_id_var.set("Error!") + return # Stop polling + except queue.Empty: + pass # Normal case, no new messages + except Exception as e: + log.error(f"Error in GUI polling loop: {e}") + if self.controller.is_processing: self.after(100, self.poll_result_queue)