update start, stop

This commit is contained in:
VALLONGOL 2025-06-18 14:04:01 +02:00
parent 71ada6d3b1
commit 7055c7ac27
3 changed files with 164 additions and 123 deletions

View File

@ -1,15 +1,15 @@
"""
Application Controller for the Radar Data Reader.
Orchestrates the interaction between the GUI and the core processing logic.
Orchestrates the interaction between the GUI and the core processing logic using multiprocessing.
"""
import multiprocessing as mp
import threading
import csv
from pathlib import Path
from typing import List
from ..utils.config_manager import ConfigManager
from ..core.file_reader import RadarFileReader
from ..core.data_structures import DataBatch
from ..core.file_reader import run_worker_process
from ..utils import logger
log = logger.get_logger(__name__)
@ -21,111 +21,103 @@ class AppController:
def __init__(self, config_manager: ConfigManager):
self.config_manager = config_manager
self.view = None
self.worker_thread: threading.Thread | None = None
self.worker: RadarFileReader | None = None
self.worker_process: mp.Process | None = None
self.is_processing = False
self.command_queue = mp.Queue()
self.result_queue = mp.Queue()
def bind_view(self, view):
"""Binds the GUI/View to this controller."""
self.view = view
self._load_initial_config()
def _load_initial_config(self):
"""Loads initial settings from config and updates the view."""
if last_file := self.config_manager.get("last_opened_file"):
if Path(last_file).is_file():
self.view.set_filepath(last_file)
def select_file(self):
"""Opens a file dialog to select a radar file."""
current_path = self.view.get_filepath()
if filepath := self.view.ask_open_filename(current_path):
self.view.set_filepath(filepath)
def start_processing(self):
"""Starts the file processing worker in a separate thread."""
if self.is_processing: return
"""Starts the file processing in a separate process."""
if self.is_processing:
log.warning("Processing is already in progress.")
return
filepath_str = self.view.get_filepath()
if not filepath_str or not Path(filepath_str).is_file():
log.error("No valid file selected.")
log.error("No valid file selected to process.")
return
self.is_processing = True
self.view.start_processing_ui()
self.worker = RadarFileReader(Path(filepath_str))
self.config_manager.set("last_opened_file", filepath_str)
self.config_manager.save_config()
self.worker_thread = threading.Thread(
target=self._run_processing,
# Clear queues before starting a new process
while not self.command_queue.empty(): self.command_queue.get()
while not self.result_queue.empty(): self.result_queue.get()
self.worker_process = mp.Process(
target=run_worker_process,
args=(Path(filepath_str), self.command_queue, self.result_queue),
daemon=True
)
self.worker_thread.start()
self.worker_process.start()
self.view.poll_result_queue()
def stop_processing(self):
if not self.is_processing or not self.worker: return
log.info("Stop requested by user. Signaling worker...")
self.worker.stop()
"""Sends a stop command to the worker process."""
if not self.is_processing:
log.warning("Stop clicked, but no process is running.")
return
log.info("Stop requested by user. Sending STOP command to worker...")
self.command_queue.put("STOP")
def _run_processing(self):
"""The actual processing logic that runs in the thread."""
saved_batches: List[DataBatch] = []
try:
filepath = self.worker.file_path
self.config_manager.set("last_opened_file", str(filepath))
self.config_manager.save_config()
def handle_worker_completion(self, results: List[dict], was_interrupted: bool):
"""Handles the final results from the worker."""
status = "Interrupted by user" if was_interrupted else "Processing Complete"
log.info(f"--- {status}. Found {len(results)} batches. ---")
if self.worker.load_and_find_blocks():
if not self.worker.stop_event.is_set():
total_blocks = len(self.worker.block_indices or [])
self.view.put_message_in_queue({"type": "start", "total": total_blocks})
filepath = Path(self.view.get_filepath())
if results and filepath:
self._save_results_to_csv(results, filepath)
def progress_callback(batch: DataBatch, batch_count: int, blocks_done: int):
self._on_batch_processed(batch, batch_count, blocks_done)
saved_batches.append(batch)
self.is_processing = False
self.worker_process = None
# Pass the callback to the worker
self.worker.build_batches(callback=progress_callback)
interrupted = self.worker.stop_event.is_set()
log_msg = "Interrupted by user" if interrupted else "Processing Complete"
log.info(f"--- {log_msg}. Found {len(saved_batches)} batches. ---")
if saved_batches:
self._save_results_to_csv(saved_batches, filepath)
except Exception as e:
log.error(f"An error occurred in worker thread: {e}", exc_info=True)
finally:
self.is_processing = False
self.worker = None
self.view.put_message_in_queue({"type": "done"})
def _on_batch_processed(self, batch: DataBatch, batch_count: int, blocks_done: int):
"""Puts progress info into the view's queue."""
update_info = {
"type": "progress",
"batch_id": batch_count,
"timetag": batch.header.header_data.signal_descr.ttag,
"blocks_done": blocks_done
}
self.view.put_message_in_queue(update_info)
def _save_results_to_csv(self, batches: List[DataBatch], source_path: Path):
def _save_results_to_csv(self, batch_results: List[dict], source_path: Path):
"""Saves the extracted data to a CSV file."""
output_path = source_path.with_suffix('.csv')
log.info(f"Saving {len(batches)} records to: {output_path}")
log.info(f"Saving {len(batch_results)} records to: {output_path}")
header = ["batch_id", "TimeTag"]
try:
with open(output_path, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(header)
for i, batch in enumerate(batches):
writer.writerow([i + 1, batch.header.header_data.signal_descr.ttag])
for record in batch_results:
writer.writerow([record["batch_id"], record["timetag"]])
log.info("Save complete.")
except IOError as e:
log.error(f"Failed to write CSV: {e}")
def shutdown(self):
"""Handles application shutdown logic."""
log.info("Controller shutting down.")
self.stop_processing()
if self.worker_thread and self.worker_thread.is_alive():
self.worker_thread.join(timeout=1.0)
if self.worker_process and self.worker_process.is_alive():
log.info("Waiting for worker process to join...")
self.worker_process.join(timeout=1.0)
if self.worker_process.is_alive():
log.warning("Worker process did not exit gracefully, terminating.")
self.worker_process.terminate()
logger.shutdown_logging_system()

View File

@ -1,10 +1,10 @@
"""
Worker class to read and process radar data from a binary file.
Worker process logic for reading and parsing radar data files.
"""
import threading
import time
import multiprocessing as mp
from pathlib import Path
from typing import List, Optional, Dict, Callable
from typing import List, Optional, Dict, Iterator
import queue
import numpy as np
@ -20,63 +20,94 @@ BLOCK_NAME_OFFSET = 17
BLOCK_SIZE_OFFSET = 5
def run_worker_process(filepath: Path, command_queue: mp.Queue, result_queue: mp.Queue):
"""This function is the main target for the multiprocessing.Process."""
log.info(f"[Worker-{mp.current_process().pid}] Started for file: {filepath.name}")
try:
reader = RadarFileReader(filepath)
if not reader.load_and_find_blocks():
result_queue.put({"type": "error", "message": "Failed to load or find blocks."})
return
total_blocks = len(reader.block_indices or [])
result_queue.put({"type": "start", "total": total_blocks})
final_results = []
interrupted = False
batch_generator = reader.build_batches_generator()
for i, batch in enumerate(batch_generator):
try:
if not command_queue.empty() and command_queue.get_nowait() == "STOP":
log.warning(f"[Worker-{mp.current_process().pid}] Stop command received. Halting.")
interrupted = True
break
except queue.Empty:
pass
batch_count = i + 1
progress_data = {
"type": "progress",
"batch_id": batch_count,
"timetag": batch.header.header_data.signal_descr.ttag,
"heading": batch.header.header_data.general_settings.navigation.attitude.true_heading_rad,
"blocks_done": batch.header.header_sw.header_sw_part1.counter # Use a real counter if available
}
result_queue.put(progress_data)
final_results.append({"batch_id": batch_count, "timetag": progress_data["timetag"]})
result_queue.put({"type": "complete", "results": final_results, "interrupted": interrupted})
log.info(f"[Worker-{mp.current_process().pid}] Processing finished.")
except Exception as e:
log.error(f"[Worker-{mp.current_process().pid}] Unhandled exception: {e}", exc_info=True)
result_queue.put({"type": "error", "message": f"Worker failed: {e}"})
class RadarFileReader:
"""Worker class to read and process radar data from a binary file."""
"""Class containing the pure logic for file reading and parsing."""
def __init__(self, file_path: Path):
if not file_path.is_file():
raise FileNotFoundError(f"File not found: {file_path}")
self.file_path: Path = file_path
self.file_path = file_path
self.data_vector: Optional[np.ndarray] = None
self.block_indices: Optional[List[int]] = None
self.stop_event = threading.Event()
def stop(self):
if not self.stop_event.is_set():
self.stop_event.set()
def load_and_find_blocks(self) -> bool:
if self.stop_event.is_set(): return False
log.info(f"Loading data from {self.file_path}...")
self.data_vector = np.fromfile(str(self.file_path), dtype='<u4')
log.info(f"Loaded {self.data_vector.size} 32-bit words.")
try:
self.data_vector = np.fromfile(str(self.file_path), dtype='<u4')
log.info(f"Loaded {self.data_vector.size} 32-bit words.")
if self.stop_event.is_set(): return False
log.info(f"Searching for data blocks...")
all_marker_indices = np.where(self.data_vector == BLOCK_MARKER)[0]
if all_marker_indices.size < 2:
self.block_indices = []
else:
consecutive_markers_mask = np.diff(all_marker_indices) == 1
self.block_indices = all_marker_indices[:-1][consecutive_markers_mask].tolist()
log.info(f"Found {len(self.block_indices)} potential data blocks.")
return True
log.info(f"Searching for data blocks...")
all_marker_indices = np.where(self.data_vector == BLOCK_MARKER)[0]
if all_marker_indices.size < 2:
self.block_indices = []
else:
consecutive_markers_mask = np.diff(all_marker_indices) == 1
self.block_indices = all_marker_indices[:-1][consecutive_markers_mask].tolist()
log.info(f"Found {len(self.block_indices)} potential data blocks.")
return True
except Exception as e:
log.error(f"Failed to load or find blocks: {e}")
return False
def build_batches(self, callback: Optional[Callable]):
if self.block_indices is None or self.data_vector is None: return
def build_batches_generator(self) -> Iterator[DataBatch]:
if self.block_indices is None or self.data_vector is None:
return
current_header: Optional[MainHeader] = None
current_signals: Dict[str, np.ndarray] = {}
batch_count = 0
for i, block_start_index in enumerate(self.block_indices):
if self.stop_event.is_set():
log.info("Processing loop in worker interrupted.")
break
if i > 0 and i % 500 == 0:
time.sleep(0.001)
for block_start_index in self.block_indices:
try:
block_name = self.data_vector[block_start_index + BLOCK_NAME_OFFSET]
if block_name == HEADER_BLOCK_NAME:
if current_header:
new_batch = DataBatch(header=current_header, signals=current_signals)
if callback: callback(new_batch, batch_count, i)
yield DataBatch(header=current_header, signals=current_signals)
current_header, _ = parse_main_header(self.data_vector, block_start_index)
current_signals = {}
batch_count += 1
elif block_name in SIGNAL_TYPE_MAP and current_header:
block_size_words = self.data_vector[block_start_index + BLOCK_SIZE_OFFSET] // 4
@ -87,7 +118,7 @@ class RadarFileReader:
)
except (ValueError, IndexError) as e:
log.warning(f"Parse error at offset {block_start_index}: {e}")
continue
if current_header and not self.stop_event.is_set():
last_batch = DataBatch(header=current_header, signals=current_signals)
if callback: callback(last_batch, batch_count, len(self.block_indices))
if current_header:
yield DataBatch(header=current_header, signals=current_signals)

View File

@ -20,7 +20,7 @@ class MainWindow(tk.Frame):
self.master = master
self.controller = controller
self.gui_update_queue = self.controller.result_queue
self.gui_update_queue = controller.result_queue
self.total_items_for_progress = 0
self._init_vars()
@ -35,13 +35,16 @@ class MainWindow(tk.Frame):
log.info("Main window View initialized.")
def _init_vars(self):
"""Initialize all Tkinter variables."""
self.filepath_var = tk.StringVar()
self.batch_id_var = tk.StringVar(value="N/A")
self.timetag_var = tk.StringVar(value="N/A")
self.heading_var = tk.StringVar(value="N/A")
self.progress_var = tk.DoubleVar(value=0)
self.status_bar_var = tk.StringVar(value="Ready")
self.status_bar_var = tk.StringVar(value="Ready") # For status bar
def _create_widgets(self):
"""Create all the widgets for the main window."""
main_frame = tk.Frame(self)
main_frame.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)
main_frame.rowconfigure(2, weight=1)
@ -72,12 +75,15 @@ class MainWindow(tk.Frame):
status_frame.grid(row=1, column=0, sticky="ew", pady=5)
status_frame.columnconfigure(1, weight=1)
status_frame.columnconfigure(3, weight=1)
status_frame.columnconfigure(5, weight=1)
ttk.Label(status_frame, text="Batch ID:").grid(row=0, column=0, padx=5)
ttk.Label(status_frame, textvariable=self.batch_id_var).grid(row=0, column=1, sticky="w")
ttk.Label(status_frame, text="TimeTag:").grid(row=0, column=2, padx=5)
ttk.Label(status_frame, textvariable=self.timetag_var).grid(row=0, column=3, sticky="w")
ttk.Label(status_frame, text="Heading:").grid(row=0, column=4, padx=5)
ttk.Label(status_frame, textvariable=self.heading_var).grid(row=0, column=5, sticky="w")
self.progress_bar = ttk.Progressbar(status_frame, variable=self.progress_var, maximum=100)
self.progress_bar.grid(row=1, column=0, columnspan=4, sticky="ew", padx=5, pady=5)
self.progress_bar.grid(row=1, column=0, columnspan=6, sticky="ew", padx=5, pady=5)
log_frame = ttk.LabelFrame(main_frame, text="Log Console")
log_frame.grid(row=2, column=0, sticky="nsew", pady=(5, 0))
@ -86,7 +92,8 @@ class MainWindow(tk.Frame):
self.log_widget = scrolledtext.ScrolledText(log_frame, state=tk.DISABLED, wrap=tk.WORD)
self.log_widget.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
self.status_bar = ttk.Label(self.master, textvariable=self.status_bar_var, relief=tk.SUNKEN, anchor=tk.W, padding=2)
# Status Bar at the very bottom
self.status_bar = ttk.Label(self, textvariable=self.status_bar_var, relief=tk.SUNKEN, anchor=tk.W, padding=2)
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
def _setup_gui_logging(self, logging_config):
@ -103,20 +110,24 @@ class MainWindow(tk.Frame):
return filedialog.askopenfilename(initialdir=initial_dir, filetypes=[("Radar Output", "*.out"), ("All files", "*.*")])
def start_processing_ui(self):
"""Prepares the UI for processing and starts the update loop."""
self.update_ui_for_processing_state(True)
self.batch_id_var.set("Starting...")
self.timetag_var.set("N/A")
self.heading_var.set("N/A")
self.progress_var.set(0)
self.after(100, self.poll_result_queue)
def update_ui_for_processing_state(self, is_processing: bool):
"""Toggles the state of UI controls and status bar based on processing status."""
state = tk.DISABLED if is_processing else tk.NORMAL
self.browse_button.config(state=state)
self.process_button.config(state=state)
self.stop_button.config(state=tk.NORMAL if is_processing else tk.DISABLED)
if is_processing:
self.status_bar_var.set("Processing file... Please wait.")
self.status_bar_var.set("Processing... Please wait.")
self.master.config(cursor="watch")
self.batch_id_var.set("Starting...")
self.progress_var.set(0)
else:
self.status_bar_var.set("Ready")
self.progress_var.set(0)
@ -124,9 +135,9 @@ class MainWindow(tk.Frame):
self.master.config(cursor="")
def poll_result_queue(self):
# Process a limited number of messages per call to keep the GUI responsive
for _ in range(100):
try:
"""Polls the result queue from the worker process for updates."""
try:
while True:
msg = self.gui_update_queue.get_nowait()
msg_type = msg.get("type")
@ -136,11 +147,15 @@ class MainWindow(tk.Frame):
elif msg_type == "progress":
batch_id = msg.get("batch_id", "N/A")
timetag = msg.get("timetag", "N/A")
heading = msg.get('heading', 0.0)
# Log the progress to the GUI log widget
log.info(f"Processed Batch ID: {batch_id} (TimeTag: {timetag})")
# Update the live data labels
self.batch_id_var.set(str(batch_id))
self.timetag_var.set(str(timetag))
self.heading_var.set(f"{heading:.5f}")
if self.total_items_for_progress > 0:
progress = (msg.get("blocks_done", 0) / self.total_items_for_progress) * 100
@ -152,15 +167,18 @@ class MainWindow(tk.Frame):
was_interrupted=msg.get("interrupted", False)
)
self.update_ui_for_processing_state(False)
return
return # Stop polling
elif msg_type == "error":
log.error(f"Received error from worker: {msg.get('message')}")
self.update_ui_for_processing_state(False)
return
self.batch_id_var.set("Error!")
return # Stop polling
except queue.Empty:
break # No more messages
except queue.Empty:
pass # Normal case, no new messages
except Exception as e:
log.error(f"Error in GUI polling loop: {e}")
if self.controller.is_processing:
self.after(100, self.poll_result_queue)