one analyze function for segment and single out file

This commit is contained in:
VALLONGOL 2025-08-06 15:56:23 +02:00
parent 17e2cabffb
commit 9ed9a87922
6 changed files with 314 additions and 349 deletions

View File

@ -1,10 +1,10 @@
{
"last_opened_out_file": "C:/src/____GitProjects/radar_data_reader/flight_workspace/RWS-stt_40NM_wf-2-mprf_223745-227624/RWS-stt_40NM_wf-2-mprf_223745-227624.out",
"last_opened_out_file": "C:\\src\\____GitProjects\\radar_data_reader\\flight_workspace\\250515_122252_Flight\\185864-186369_GM-dbs_20NM_wf-1-lprf\\185864-186369_GM-dbs_20NM_wf-1-lprf.out",
"last_opened_rec_file": "C:/src/____GitProjects/radar_data_reader/_rec/_25-05-15-12-22-52_sata_345.rec",
"last_out_output_dir": "C:/src/____GitProjects/radar_data_reader/flight_workspace/250515_122252_Flight",
"last_out_output_dir": "C:\\src\\____GitProjects\\radar_data_reader\\out_analisys",
"last_rec_output_dir": "C:\\src\\____GitProjects\\radar_data_reader\\_rec",
"last_flight_folder": "C:/__Voli/Volo_12_25maggio2025/rec",
"last_flight_workspace_parent_dir": "C:/src/____GitProjects/radar_data_reader/flight_workspace/250515_122252_Flight",
"last_flight_folder": "C:/__Voli/Volo_12_25maggio2025/rec/rec",
"last_flight_workspace_parent_dir": "C:/src/____GitProjects/radar_data_reader/flight_workspace/",
"active_out_export_profile_name": "trackingdata",
"export_profiles": [
{
@ -534,7 +534,8 @@
},
"segment_processor_config": {
"last_output_dir": "C:/src/____GitProjects/radar_data_reader/out_analisys",
"create_separate_folders": true
"create_separate_folders": true,
"last_flight_analysis_dir": "C:\\src\\____GitProjects\\radar_data_reader\\flight_workspace\\250515_122252_Flight"
},
"flight_analysis_options": {
"aggregate_by_scale": true,

View File

@ -18,7 +18,6 @@ import time
from datetime import datetime
import queue
from collections import OrderedDict
from .file_analyzer import FileStructureAnalyzer
try:
import pandas as pd
@ -35,6 +34,7 @@ from .export_manager import ExportManager, ExportJob
from .flight_analyzer import FlightAnalyzer
from .out_processor import OutProcessor
from .segment_processor import SegmentProcessor
from .file_analyzer import FileStructureAnalyzer
from ..utils import logger
from ..gui.profile_editor_window import ProfileEditorWindow
@ -47,6 +47,7 @@ log = logger.get_logger(__name__)
def _get_value_from_path(batch: DataBatch, field: ExportField) -> Any:
"""Helper function to safely retrieve a nested value from a DataBatch object."""
try:
path = field.data_path
if path == "batch_id": return batch.batch_id
@ -74,6 +75,7 @@ def _get_value_from_path(batch: DataBatch, field: ExportField) -> Any:
def _convert_ctypes_for_json(obj: Any) -> Any:
"""Recursively converts ctypes objects to JSON-serializable types."""
if isinstance(obj, (int, float, str, bool)) or obj is None: return obj
if isinstance(obj, (ctypes._SimpleCData)): return obj.value
if isinstance(obj, CtypesStructureBase):
@ -94,7 +96,6 @@ def run_analysis_worker(filepath: Path, config: Dict[str, Any], result_queue: mp
analyzer = FileStructureAnalyzer(filepath, config)
report = analyzer.analyze()
stats = analyzer.stats
# Invia un messaggio unico con il report completo e le statistiche
result_queue.put({"type": "analysis_report", "report": report, "stats": stats})
except Exception as e:
log.error(f"Analysis worker failed: {e}", exc_info=True)
@ -114,7 +115,7 @@ class AppController:
self.export_manager = ExportManager(self.result_queue)
self.flight_analyzer = FlightAnalyzer(config_manager, self.result_queue)
self.out_processor = OutProcessor(self.command_queue, self.result_queue)
# self.out_processor = OutProcessor(self.command_queue, self.result_queue) # --> ORA GESTITO DIRETTAMENTE
self.segment_processor: Optional[SegmentProcessor] = None
self.active_export_profiles: Dict[str, ExportProfile] = {}
@ -122,13 +123,11 @@ class AppController:
self.csv_writers: Dict[str, Any] = {}
self.json_data_buffer: List[Dict[str, Any]] = []
# --- Attributes for batch buffering ---
self.batch_buffer = OrderedDict()
self.last_written_batch_counter = -1
self.total_batches_found_count = 0
self.last_generated_out_file: Optional[Path] = None
self.last_flight_summary_df: Optional["pd.DataFrame"] = None
self.total_files_for_analysis: int = 0
@ -138,6 +137,9 @@ class AppController:
self.view = view
self._load_initial_config()
self.export_manager.start()
# --- MODIFICA: Inizializza il SegmentProcessor qui ---
self.segment_processor = SegmentProcessor(self)
def shutdown(self):
log.info("Controller shutting down.")
@ -179,6 +181,9 @@ class AppController:
self.view.segment_processor_tab.output_dir_var.set(sp_config.get("last_output_dir", ""))
self.view.segment_processor_tab.create_separate_folders_var.set(sp_config.get("create_separate_folders", True))
if last_analysis_dir := sp_config.get("last_flight_analysis_dir"):
self.view.segment_processor_tab.flight_analysis_dir_var.set(last_analysis_dir)
fa_options = self.config_manager.get("flight_analysis_options", {})
self.view.aggregate_by_scale_var.set(fa_options.get("aggregate_by_scale", True))
self.view.aggregate_by_waveform_var.set(fa_options.get("aggregate_by_waveform", True))
@ -218,7 +223,7 @@ class AppController:
def select_rec_file(self):
self._select_file(self.view.rec_filepath_var, [("Recorder Data", "*.rec"), ("All files", "*.*")])
def _prepare_out_processor_files(self) -> bool:
def _prepare_out_processor_files(self, output_dir: Path, basename: str, options: Dict[str, Any]) -> bool:
self.output_file_handles.clear()
self.csv_writers.clear()
self.active_export_profiles.clear()
@ -226,18 +231,20 @@ class AppController:
self.batch_buffer.clear()
self.last_written_batch_counter = -1
self.total_batches_found_count = 0
try:
output_dir, basename, profiles = Path(self.view.out_output_dir_var.get()), self.view.out_basename_var.get(), self.config_manager.get_export_profiles()
use_full_path = self.view.out_use_full_path_var.get()
profiles = self.config_manager.get_export_profiles()
use_full_path = options.get("use_full_path_headers", False)
if self.view.out_output_csv_var.get():
profile = next((p for p in profiles if p.name == self.view.out_csv_profile_var.get()), None)
if not profile: raise ValueError(f"CSV profile '{self.view.out_csv_profile_var.get()}' not found.")
if options.get("generate_csv", False):
profile_name = options.get("csv_profile_name")
profile = next((p for p in profiles if p.name == profile_name), None)
if not profile: raise ValueError(f"CSV profile '{profile_name}' not found.")
self.active_export_profiles["csv"] = profile
csv_filename = f"{basename}_{profile.name}.csv"
path = output_dir / csv_filename
delimiter = "\t" if self.view.out_csv_use_tab_var.get() else ","
delimiter = "\t" if options.get("csv_use_tab", False) else ","
fh = open(path, "w", encoding="utf-8", newline="")
header = [field.data_path if use_full_path else field.column_name for field in profile.fields]
@ -246,9 +253,10 @@ class AppController:
self.output_file_handles["csv"] = fh
self.csv_writers["csv"] = csv_writer
if self.view.out_output_json_var.get():
profile = next((p for p in profiles if p.name == self.view.out_json_profile_var.get()), None)
if not profile: raise ValueError(f"JSON profile '{self.view.out_json_profile_var.get()}' not found.")
if options.get("generate_json", False):
profile_name = options.get("json_profile_name")
profile = next((p for p in profiles if p.name == profile_name), None)
if not profile: raise ValueError(f"JSON profile '{profile_name}' not found.")
self.active_export_profiles["json"] = profile
return True
@ -263,53 +271,88 @@ class AppController:
return
filepath_str = self.view.out_filepath_var.get()
output_dir_str = self.view.out_output_dir_var.get()
basename = self.view.out_basename_var.get()
if not all([filepath_str, output_dir_str, basename]):
log.error("Please set input file, output directory, and base filename.")
if not filepath_str:
messagebox.showerror("Input Error", "Please select an input .out file.", parent=self.view)
return
# --- AVVIO DEL FLUSSO CENTRALIZZATO ---
thread = threading.Thread(target=self._run_single_out_processing_flow, daemon=True)
thread.start()
def _run_single_out_processing_flow(self):
"""Orchestrates the processing for the 'Single OUT Processor' tab."""
self.is_processing = True
self.view.start_processing_ui()
# --- NUOVA LOGICA PER SCEGLIERE LA MODALITÀ ---
if self.view.out_analysis_only_var.get():
log.info("Starting file structure analysis...")
analysis_config = self.config_manager.get("file_analyzer_config", {})
worker_args = (Path(filepath_str), analysis_config, self.result_queue)
self._launch_worker(run_analysis_worker, worker_args)
filepath = Path(self.view.out_filepath_var.get())
output_dir = Path(self.view.out_output_dir_var.get())
basename = self.view.out_basename_var.get()
# Raccoglie le opzioni dalla UI
options = {
"generate_csv": self.view.out_output_csv_var.get(),
"csv_profile_name": self.view.out_csv_profile_var.get(),
"csv_use_tab": self.view.out_csv_use_tab_var.get(),
"generate_json": self.view.out_output_json_var.get(),
"json_profile_name": self.view.out_json_profile_var.get(),
"use_full_path_headers": self.view.out_use_full_path_var.get(),
"analysis_only": self.view.out_analysis_only_var.get()
}
# Salva le configurazioni recenti
self.config_manager.set("last_opened_out_file", str(filepath))
self.config_manager.set("last_out_output_dir", str(output_dir))
self.config_manager.set("active_out_export_profile_name", options["csv_profile_name"])
self.config_manager.save_config()
# Chiama la funzione centralizzata
success = self.process_single_out_file(filepath, output_dir, basename, options)
if success:
log.info(f"Successfully processed {filepath.name}.")
else:
# Flusso di lavoro standard per l'esportazione dati
if not any([self.view.out_output_csv_var.get(), self.view.out_output_json_var.get()]):
log.error("Please select at least one output format (CSV or JSON).")
self.is_processing = False
self.view.update_ui_for_processing_state(False)
return
if not self._prepare_out_processor_files():
self.is_processing = False
self.view.update_ui_for_processing_state(False)
return
self.config_manager.set("last_opened_out_file", filepath_str)
self.config_manager.set("last_out_output_dir", output_dir_str)
self.config_manager.set("active_out_export_profile_name", self.view.out_csv_profile_var.get())
self.config_manager.save_config()
active_profile = self.active_export_profiles.get("csv") or self.active_export_profiles.get("json")
if not active_profile:
log.error("No active export profile found for processing.")
self.is_processing = False
self.view.update_ui_for_processing_state(False)
return
log.error(f"Failed to process {filepath.name}.")
cpp_config = self.config_manager.get_cpp_converter_config()
enable_profiling = cpp_config.get("enable_python_worker_profiling", False)
self.is_processing = False
# La UI viene aggiornata dal loop di polling che rileva la fine del processo
# self.view.update_ui_for_processing_state(False) # Gestito da handle_worker_completion
def process_single_out_file(self, filepath: Path, output_dir: Path, basename: str, options: Dict[str, Any]) -> bool:
"""
Core blocking function to process a single .out file.
This is now the central point for OUT processing logic.
"""
if options.get("analysis_only", False):
log.info(f"Starting file structure analysis for {filepath.name}...")
analysis_config = self.config_manager.get("file_analyzer_config", {})
worker_args = (filepath, analysis_config, self.result_queue)
self._launch_worker(run_analysis_worker, worker_args)
return True # L'esito finale sarà gestito dal messaggio di completamento
# Logica di esportazione standard
if not options.get("generate_csv") and not options.get("generate_json"):
log.error("No output format selected. Aborting.")
return False
self.out_processor.start_processing(Path(filepath_str), active_profile, enable_profiling)
# Il worker process viene recuperato all'interno di out_processor
self.worker_process = self.out_processor.get_worker()
self.view.poll_result_queue()
if not self._prepare_out_processor_files(output_dir, basename, options):
return False
active_profile = self.active_export_profiles.get("csv") or self.active_export_profiles.get("json")
if not active_profile:
log.error("No active export profile found for processing.")
return False
cpp_config = self.config_manager.get_cpp_converter_config()
enable_profiling = cpp_config.get("enable_python_worker_profiling", False)
worker_args = (filepath, self.command_queue, self.result_queue, active_profile, enable_profiling)
self._launch_worker(run_worker_process, worker_args)
# Questo è un loop bloccante che attende la fine del processo
while self.is_processing:
time.sleep(0.1) # Attende passivamente, la UI è gestita da poll_result_queue
return True # L'esito finale dipende da handle_worker_completion
def _build_cpp_command_list(self) -> List[str]:
config = self.config_manager.get_cpp_converter_config()
@ -342,9 +385,16 @@ class AppController:
def _launch_worker(self, target_func, args_tuple: Tuple):
while not self.command_queue.empty(): self.command_queue.get_nowait()
while not self.result_queue.empty(): self.result_queue.get_nowait()
# --- MODIFICA: Aggiorna lo stato prima di avviare il processo ---
self.is_processing = True
self.worker_process = mp.Process(target=target_func, args=args_tuple, daemon=True)
self.worker_process.start()
self.view.poll_result_queue()
# Inizia il polling solo se non è già attivo
if self.view:
self.view.after(100, self.view.poll_result_queue)
def stop_processing(self):
if not self.is_processing: return
@ -352,7 +402,7 @@ class AppController:
if self.worker_process and self.worker_process.is_alive():
self.command_queue.put("STOP")
if self.segment_processor:
pass
self.segment_processor.stop() # Aggiunta chiamata a stop
def _close_all_files(self):
self._flush_batch_buffer(force_write_all=True)
@ -363,7 +413,6 @@ class AppController:
self.csv_writers.clear()
def _write_single_batch_to_files(self, batch: DataBatch):
"""Writes a single complete DataBatch to output files."""
if not batch: return
self.total_batches_found_count += 1
@ -395,7 +444,6 @@ class AppController:
self.output_file_handles["csv"].flush()
def _flush_batch_buffer(self, new_batch_counter: Optional[int] = None, force_write_all: bool = False):
"""Writes completed batches from the buffer."""
if force_write_all:
for counter, batch_to_write in self.batch_buffer.items():
self._write_single_batch_to_files(batch_to_write)
@ -414,7 +462,6 @@ class AppController:
del self.batch_buffer[counter]
def handle_data_batch_fragment(self, fragment: DataBatch):
"""Receives a fragment, merges it into the buffer, and writes completed batches."""
header = fragment.main_header
if not header or not header.ge_header:
log.warning("Received a batch fragment without a valid header, skipping.")
@ -474,9 +521,9 @@ class AppController:
if self.view.out_output_json_var.get(): self._write_json_buffer_to_file()
self._close_all_files()
self.is_processing = False
self.is_processing = False # --> Stato globale di processamento
self.worker_process = None
self.segment_processor = None
# self.segment_processor = None # --> Non resettarlo qui
self.view.update_ui_for_processing_state(False)
is_cpp_success = "Conversion process completed successfully" in msg.get("message", "")
self.view.update_rec_tab_buttons_state(conversion_successful=is_cpp_success)
@ -670,11 +717,16 @@ class AppController:
def handle_analysis_summary_data(self, msg: Dict[str, Any]):
summary_df = msg.get("data")
flight_folder_path = msg.get("flight_folder_path")
flight_folder_path_str = str(msg.get("flight_folder_path"))
if self.view and summary_df is not None:
self.last_flight_summary_df = summary_df
self.view.populate_timeline_from_dataframe(summary_df)
log.info(f"Analysis summary received for flight folder: {flight_folder_path}")
if flight_folder_path_str:
log.info(f"Analysis summary received for flight folder: {flight_folder_path_str}")
log.info("Automatically setting flight analysis path in Segment Processor tab.")
self.view.segment_processor_tab.flight_analysis_dir_var.set(flight_folder_path_str)
def start_segment_export(self):
if self.is_processing:
@ -740,87 +792,14 @@ class AppController:
self.view.poll_result_queue()
def load_segments_for_processing(self):
log.info("Loading segments for batch processing...")
if self.is_processing:
messagebox.showwarning("Busy", "Cannot load segments while another process is running.", parent=self.view)
return
flight_folder = self.flight_analyzer.current_flight_folder_path
if not flight_folder or not flight_folder.is_dir():
messagebox.showerror("No Flight Loaded", "Please run a flight analysis first on the 'Flight Analyzer' tab.", parent=self.view)
return
if self.last_flight_summary_df is None:
messagebox.showerror("No Summary Data", "Flight summary data not found. Please re-run the analysis.", parent=self.view)
return
try:
export_config = self.config_manager.get("segment_export_config", {})
naming_template = export_config.get("naming_options", {}).get("folder_name_template", "{Segment}_{StartBatch}-{EndBatch}")
verified_segments = SegmentProcessor.scan_for_segments(flight_folder, self.last_flight_summary_df, naming_template)
if not verified_segments:
log.warning("No segments found or summary is empty.")
self.view.segment_processor_tab.populate_segments(verified_segments)
log.info(f"Found and verified {len(verified_segments)} segments.")
except Exception as e:
log.error(f"Failed to load and verify segments: {e}", exc_info=True)
messagebox.showerror("Error", f"An error occurred while loading segments:\n{e}", parent=self.view)
if self.segment_processor:
self.segment_processor.load_segments_for_processing()
def start_segment_batch_processing(self):
if self.is_processing:
messagebox.showwarning("Busy", "Another process is already running.", parent=self.view)
return
tab: SegmentProcessorTab = self.view.segment_processor_tab
selected_segments = tab.get_selected_segments_data()
if not selected_segments:
messagebox.showinfo("No Selection", "Please select at least one 'Ready' segment to process.", parent=self.view)
return
cpp_config = self.config_manager.get_cpp_converter_config()
enable_profiling = cpp_config.get("enable_python_worker_profiling", False)
config = {
"segments_to_process": selected_segments,
"output_dir": tab.output_dir_var.get(),
"generate_csv": tab.generate_csv_var.get(),
"generate_json": tab.generate_json_var.get(),
"create_separate_folders": tab.create_separate_folders_var.get(),
"csv_profile_name": tab.csv_profile_var.get(),
"json_profile_name": tab.json_profile_var.get(),
"csv_use_tab": tab.csv_use_tab_var.get(),
"use_full_path_headers": tab.use_full_path_var.get(),
"profiles": self.config_manager.get_export_profiles(),
"enable_profiling": enable_profiling,
}
if not config["output_dir"]:
messagebox.showerror("Configuration Error", "Please specify an output directory.", parent=self.view)
return
if not config["generate_csv"] and not config["generate_json"]:
messagebox.showerror("Configuration Error", "Please select at least one output format (CSV or JSON).", parent=self.view)
return
sp_config = self.config_manager.get("segment_processor_config", {})
sp_config["last_output_dir"] = config["output_dir"]
sp_config["create_separate_folders"] = config["create_separate_folders"]
self.config_manager.set("segment_processor_config", sp_config)
self.config_manager.save_config()
self.is_processing = True
self.view.start_processing_ui()
self.segment_processor = SegmentProcessor(config, self.result_queue, self.command_queue)
self.segment_processor.start()
self.view.poll_result_queue()
if self.segment_processor:
self.segment_processor.start_batch_processing()
def _save_analysis_report(self, report_data: Dict[str, Any]):
"""Saves the analysis report to a text file."""
report_list = report_data.get("report", [])
stats = report_data.get("stats", {})
block_counts = stats.get("block_type_counts", {})
@ -836,13 +815,11 @@ class AppController:
f.write(f"--- Analysis Report for file: {self.view.out_filepath_var.get()} ---\n\n")
f.write("--- Summary ---\n")
# Scrive le statistiche generali
for key, value in stats.items():
if key != "block_type_counts":
f.write(f"{key.replace('_', ' ').title()}: {value}\n")
f.write("\n")
# --- NUOVA SEZIONE PER IL RIASSUNTO DEI BLOCCHI ---
if block_counts:
f.write("--- Block Type Summary ---\n")
sorted_blocks = sorted(block_counts.items(), key=lambda item: item[1], reverse=True)
@ -857,7 +834,6 @@ class AppController:
if msg_type == "BLOCK":
name = entry.get('name', 'N/A')
# Legge la chiave corretta: declared_payload_bytes
size = entry.get('declared_payload_bytes', 'N/A')
f.write(f"[{offset}] {msg_type:<8} | Name: {name:<15} | Declared Payload: {size} bytes\n")
else:
@ -868,10 +844,21 @@ class AppController:
log.error(f"Failed to save analysis report: {e}")
def handle_analysis_report(self, msg: Dict[str, Any]):
"""Handles the completion of the analysis worker."""
log.info("--- File Structure Analysis Complete. ---")
self._save_analysis_report(msg)
self.is_processing = False
self.worker_process = None
self.view.update_ui_for_processing_state(False)
self.view.update_ui_for_processing_state(False)
def select_flight_analysis_dir_for_segment_processor(self):
initial_dir = self.view.segment_processor_tab.flight_analysis_dir_var.get()
if not initial_dir:
initial_dir = self.flight_analyzer.current_flight_folder_path
new_dir_str = filedialog.askdirectory(
initialdir=initial_dir,
title="Select Flight Analysis Directory"
)
if new_dir_str:
self.view.segment_processor_tab.flight_analysis_dir_var.set(new_dir_str)

View File

@ -204,7 +204,7 @@ class RadarFileReader:
self.log.info(f"Loading data from {self.file_path}...")
self.data_vector = np.fromfile(str(self.file_path), dtype="<u4")
self.log.info(f"Loaded {self.data_vector.size} 32-bit words.")
# ... (il resto della funzione rimane invariato)
self.log.info("Scanning for block markers (Legacy & FW)...")
legacy_indices = np.where(self.data_vector == LEGACY_BLOCK_MARKER)[0]
@ -259,10 +259,18 @@ class RadarFileReader:
block_data_slice = self.data_vector[
start_offset_words : start_offset_words + size_words
]
block_id = self.data_vector[start_offset_words + LEGACY_NAME_OFFSET_WORDS]
# The concept of a single block ID is complex due to multi-word names.
# We rely on the block name identified during the initial scan.
# We still need to parse an ID for the parse_block function.
# Let's assume a legacy-style offset for simplicity, as the name override is key.
try:
block_id_for_parser = self.data_vector[start_offset_words + LEGACY_NAME_OFFSET_WORDS]
except IndexError:
block_id_for_parser = 0 # Default if block is too short
parsed_block = parse_block(
block_id, block_data_slice, last_header, block_name_override=block_name
block_id_for_parser, block_data_slice, last_header, block_name_override=block_name
)
if parsed_block is None:
@ -271,7 +279,8 @@ class RadarFileReader:
stats[f"found_{parsed_block.block_name.split('_')[0]}"] += 1
# Simplified logic: if it's a header, start a new batch fragment.
# --- LOGICA DI AGGREGAZIONE SEMPLIFICATA ---
# Se è un header, finalizza il batch precedente e ne inizia uno nuovo.
if isinstance(parsed_block, ds.DspHeaderIn):
if current_batch:
yield current_batch, block_num
@ -287,7 +296,9 @@ class RadarFileReader:
elif isinstance(parsed_block, ds.AesaBlock): current_batch.aesa_data = parsed_block
elif isinstance(parsed_block, ds.D1553Block): current_batch.d1553_data = parsed_block
elif isinstance(parsed_block, ds.SttBlock): current_batch.stt_data = parsed_block
elif isinstance(parsed_block, ds.DspHeaderIn): current_batch.blocks[0] = parsed_block # ensure header is first
# ... (altri blocchi se necessario)
# Assicura che l'ultimo batch venga inviato
if current_batch:
yield current_batch, len(self.block_metadata)

View File

@ -2,17 +2,14 @@
"""
Contains the business logic for the Segment Processor feature.
This class handles scanning for exported segments and processing their .out
files in a batch operation to extract structured data.
This class now acts as an orchestrator, delegating the processing of
individual .out files to the AppController to ensure consistent logic.
"""
import threading
import queue
import re
import multiprocessing as mp
import json
import csv
from pathlib import Path
from typing import Dict, Any, List
from tkinter import messagebox
try:
import pandas as pd
@ -20,219 +17,171 @@ except ImportError:
pd = None
from ..utils import logger
from .file_reader import run_worker_process
from ..core.export_profiles import ExportProfile
log = logger.get_logger(__name__)
def _get_value_from_path(batch: Any, path: str) -> Any:
try:
if path == "batch_id":
return getattr(batch, "batch_id", "N/A")
parts = re.split(r"\.|\[", path)
current_obj = batch
for part in parts:
if current_obj is None:
return "N/A"
if part.endswith("]"):
index_str = part[:-1]
if not index_str.isdigit():
return "N/A"
try:
current_obj = current_obj[int(index_str)]
except (IndexError, TypeError):
return "N/A"
else:
current_obj = getattr(current_obj, part, None)
return current_obj if current_obj is not None else "N/A"
except Exception:
return "N/A"
def _convert_ctypes_for_json(obj: Any) -> Any:
if isinstance(obj, (int, float, str, bool)) or obj is None:
return obj
if hasattr(obj, "_length_"):
return [_convert_ctypes_for_json(item) for item in obj]
if hasattr(obj, "_fields_"):
return {
field: _convert_ctypes_for_json(getattr(obj, field))
for field, _ in obj._fields_
}
if isinstance(obj, bytes):
return obj.hex()
return obj
class SegmentProcessor:
"""Manages the batch processing of exported flight segments."""
"""Manages the batch processing of exported flight segments by orchestrating the AppController."""
def __init__(
self,
config: Dict[str, Any],
result_queue: queue.Queue,
command_queue: queue.Queue,
):
self.config = config
self.result_queue = result_queue
self.command_queue = command_queue
def __init__(self, controller: "AppController"):
self.controller = controller
self.config: Dict[str, Any] = {}
self._worker_thread: threading.Thread = None
self._stop_event = threading.Event()
def start(self):
"""Starts the batch processing in a background thread."""
if self._worker_thread and self._worker_thread.is_alive():
log.warning("Segment processing is already running.")
def load_segments_for_processing(self):
"""Loads and verifies segments from a flight analysis directory."""
log.info("Loading segments for batch processing...")
if self.controller.is_processing:
messagebox.showwarning("Busy", "Cannot load segments while another process is running.", parent=self.controller.view)
return
tab = self.controller.view.segment_processor_tab
flight_folder_str = tab.flight_analysis_dir_var.get()
if not flight_folder_str:
messagebox.showerror("No Directory Selected", "Please select a flight analysis directory first.", parent=self.controller.view)
return
flight_folder = Path(flight_folder_str)
summary_file = flight_folder / "flight_summary.csv"
if not flight_folder.is_dir() or not summary_file.is_file():
messagebox.showerror(
"Invalid Directory",
f"The selected directory is not a valid flight analysis directory.\nMissing: {summary_file.name}",
parent=self.controller.view
)
return
try:
if pd is None:
log.error("Pandas is not installed. Cannot load segments."); return
summary_df = pd.read_csv(summary_file)
export_config = self.controller.config_manager.get("segment_export_config", {})
naming_template = export_config.get("naming_options", {}).get("folder_name_template", "{Segment}_{StartBatch}-{EndBatch}")
verified_segments = self.scan_for_segments(flight_folder, summary_df, naming_template)
if not verified_segments:
log.warning("No segments found in the summary or none are valid.")
tab.populate_segments(verified_segments)
log.info(f"Found and verified {len(verified_segments)} segments from {flight_folder.name}.")
sp_config = self.controller.config_manager.get("segment_processor_config", {})
sp_config["last_flight_analysis_dir"] = flight_folder_str
self.controller.config_manager.set("segment_processor_config", sp_config)
self.controller.config_manager.save_config()
except Exception as e:
log.error(f"Failed to load and verify segments: {e}", exc_info=True)
messagebox.showerror("Error", f"An error occurred while loading segments:\n{e}", parent=self.controller.view)
def start_batch_processing(self):
"""Starts the batch processing in a background thread."""
if self.controller.is_processing:
messagebox.showwarning("Busy", "Another process is already running.", parent=self.controller.view)
return
tab = self.controller.view.segment_processor_tab
selected_segments = tab.get_selected_segments_data()
if not selected_segments:
messagebox.showinfo("No Selection", "Please select at least one 'Ready' segment to process.", parent=self.controller.view)
return
self.config = {
"segments_to_process": selected_segments,
"output_dir": tab.output_dir_var.get(),
"generate_csv": tab.generate_csv_var.get(),
"generate_json": tab.generate_json_var.get(),
"create_separate_folders": tab.create_separate_folders_var.get(),
"csv_profile_name": tab.csv_profile_var.get(),
"json_profile_name": tab.json_profile_var.get(),
"csv_use_tab": tab.csv_use_tab_var.get(),
"use_full_path_headers": tab.use_full_path_var.get(),
}
if not self.config["output_dir"]:
messagebox.showerror("Configuration Error", "Please specify an output directory.", parent=self.controller.view)
return
if not self.config["generate_csv"] and not self.config["generate_json"]:
messagebox.showerror("Configuration Error", "Please select at least one output format (CSV or JSON).", parent=self.controller.view)
return
sp_config = self.controller.config_manager.get("segment_processor_config", {})
sp_config["last_output_dir"] = self.config["output_dir"]
sp_config["create_separate_folders"] = self.config["create_separate_folders"]
self.controller.config_manager.set("segment_processor_config", sp_config)
self.controller.config_manager.save_config()
self._stop_event.clear()
self._worker_thread = threading.Thread(
target=self._run_batch_processing, daemon=True
)
self._worker_thread.start()
def stop(self):
"""Signals the worker thread to stop processing."""
self._stop_event.set()
def _run_batch_processing(self):
"""The main loop for processing segments, executed in a thread."""
segments_to_process = self.config.get("segments_to_process", [])
output_dir = Path(self.config.get("output_dir"))
create_folders = self.config.get("create_separate_folders", True)
"""The main loop that iterates through segments and calls the controller."""
self.controller.is_processing = True
self.controller.view.start_processing_ui()
profiles = self.config.get("profiles", [])
csv_profile = next(
(p for p in profiles if p.name == self.config.get("csv_profile_name")), None
)
json_profile = next(
(p for p in profiles if p.name == self.config.get("json_profile_name")),
None,
)
segments = self.config.get("segments_to_process", [])
total_segments = len(segments)
log.info(f"Starting batch processing for {len(segments_to_process)} segments.")
for i, segment_info in enumerate(segments_to_process):
for i, segment_info in enumerate(segments):
if self._stop_event.is_set():
log.info("Stop command received, halting batch processing.")
break
segment_name = segment_info.get("folder_name")
out_file_path = Path(segment_info.get("out_file_path"))
log.info(
f"Processing segment {i+1}/{len(segments_to_process)}: {segment_name}"
)
self.result_queue.put(
# Update UI progress
self.controller.result_queue.put(
{
"type": "batch_progress",
"current": i + 1,
"total": len(segments_to_process),
"total": total_segments,
"segment_name": segment_name,
}
)
log.info(f"Processing segment {i+1}/{total_segments}: {segment_name}")
output_dir_base = Path(self.config.get("output_dir"))
current_output_dir = (
output_dir / segment_name if create_folders else output_dir
output_dir_base / segment_name
if self.config.get("create_separate_folders")
else output_dir_base
)
current_output_dir.mkdir(parents=True, exist_ok=True)
self._process_single_segment(
out_file_path,
current_output_dir,
segment_name,
csv_profile,
json_profile,
# --- DELEGA AL CONTROLLER ---
self.controller.process_single_out_file(
filepath=out_file_path,
output_dir=current_output_dir,
basename=segment_name,
options=self.config # Passa le opzioni di output
)
log.info("Batch processing finished.")
self.result_queue.put(
self.controller.result_queue.put(
{"type": "complete", "message": "Segment batch processing complete."}
)
def _process_single_segment(
self,
in_path: Path,
out_dir: Path,
base_name: str,
csv_p: ExportProfile,
json_p: ExportProfile,
):
worker_cmd_q = mp.Queue()
worker_res_q = mp.Queue()
active_profile = csv_p if self.config.get("generate_csv") else json_p
if not active_profile:
return
# Get the profiling flag from the main config
enable_profiling = self.config.get("enable_profiling", False)
worker_args = (in_path, worker_cmd_q, worker_res_q, active_profile, enable_profiling) # <-- AGGIUNTO FLAG
process = mp.Process(target=run_worker_process, args=worker_args, daemon=True)
process.start()
csv_writer, json_data, csv_fh = None, [], None
try:
if self.config.get("generate_csv") and csv_p:
csv_filename = f"{base_name}_{csv_p.name}.csv"
csv_path = out_dir / csv_filename
csv_fh = open(csv_path, "w", encoding="utf-8", newline="")
delimiter = "\t" if self.config.get("csv_use_tab") else ","
csv_writer = csv.writer(csv_fh, delimiter=delimiter)
header = [
(
field.data_path
if self.config.get("use_full_path_headers")
else field.column_name
)
for field in csv_p.fields
]
csv_writer.writerow(header)
if self.config.get("generate_json") and json_p:
json_data = []
while True:
msg = worker_res_q.get()
if msg["type"] == "data_batch":
batch = msg["data"]
if csv_writer and csv_p:
row = [
_get_value_from_path(batch, field.data_path)
for field in csv_p.fields
]
csv_writer.writerow(row)
if json_data is not None and json_p:
row_dict = {}
for field in json_p.fields:
key = (
field.data_path
if self.config.get("use_full_path_headers")
else field.column_name
)
raw_value = _get_value_from_path(batch, field.data_path)
row_dict[key] = _convert_ctypes_for_json(raw_value)
json_data.append(row_dict)
elif msg["type"] == "complete":
break
elif msg["type"] == "error":
log.error(f"Worker for {in_path.name} failed: {msg['message']}")
break
finally:
if csv_fh:
csv_fh.close()
if self.config.get("generate_json") and json_p and json_data:
json_filename = f"{base_name}_{json_p.name}.json"
json_path = out_dir / json_filename
with open(json_path, "w", encoding="utf-8") as f:
json.dump(json_data, f, indent=4)
process.join(timeout=2)
if process.is_alive():
process.terminate()
# Il reset dello stato 'is_processing' è gestito da handle_worker_completion
# chiamato dall'ultimo messaggio "complete" nella coda.
@staticmethod
def scan_for_segments(
flight_path: Path, flight_summary_df: pd.DataFrame, naming_template: str
) -> List[Dict[str, Any]]:
"""Scans for exported segments based on a flight summary and verifies them."""
if flight_summary_df is None or flight_summary_df.empty:
return []
@ -247,6 +196,7 @@ class SegmentProcessor:
folder_name = naming_template
for key, value in placeholders.items():
folder_name = folder_name.replace(key, value)
safe_folder_name = re.sub(r'[\\/*?:"<>|]', "-", folder_name)
segment_path = flight_path / safe_folder_name
@ -268,4 +218,4 @@ class SegmentProcessor:
verified_segments.append(segment_info)
return verified_segments
return verified_segments

View File

@ -48,7 +48,7 @@ class MainWindow(tk.Frame):
self.master.title(
f"Radar Data Reader & Processor - {WRAPPER_APP_VERSION_STRING}"
)
self.master.geometry("1280x800")
self.master.geometry("1280x1024")
self._create_widgets()
self._setup_gui_logging(logging_config)

View File

@ -34,26 +34,42 @@ class SegmentProcessorTab(ttk.Frame):
self.progress_var = tk.DoubleVar(value=0)
self.progress_text_var = tk.StringVar(value="N/A")
self.flight_analysis_dir_var = tk.StringVar()
self._create_widgets()
def _create_widgets(self):
"""Creates and lays out all widgets for the tab."""
self.columnconfigure(0, weight=1)
self.rowconfigure(1, weight=1)
self.rowconfigure(2, weight=1)
source_frame = ttk.LabelFrame(self, text="Flight Analysis Source Directory")
source_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10))
source_frame.columnconfigure(1, weight=1)
ttk.Label(source_frame, text="Path:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
dir_entry = ttk.Entry(source_frame, textvariable=self.flight_analysis_dir_var)
dir_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
browse_button = ttk.Button(
source_frame,
text="Browse...",
command=self.controller.select_flight_analysis_dir_for_segment_processor
)
browse_button.grid(row=0, column=2, padx=5, pady=5)
action_frame = ttk.Frame(self)
action_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10))
action_frame.grid(row=1, column=0, sticky="ew", pady=(0, 10))
self.load_segments_button = ttk.Button(
action_frame,
text="Load Exported Segments from Current Flight",
text="Load Segments from Selected Directory",
command=self.controller.load_segments_for_processing,
)
self.load_segments_button.pack(side=tk.LEFT)
segments_frame = ttk.LabelFrame(self, text="Available Segments")
segments_frame.grid(row=1, column=0, sticky="nsew", pady=10)
segments_frame.grid(row=2, column=0, sticky="nsew", pady=10)
segments_frame.columnconfigure(0, weight=1)
segments_frame.rowconfigure(0, weight=1)
@ -98,19 +114,8 @@ class SegmentProcessorTab(ttk.Frame):
self.segments_tree.tag_configure("ready", foreground="green")
self.segments_tree.tag_configure("not_exported", foreground="gray")
help_label = ttk.Label(
segments_frame,
text="Tip: Segments in gray are not exported and cannot be selected. Use the 'Flight Analyzer' tab to export them.",
wraplength=600,
justify=tk.LEFT,
style="Italic.TLabel",
)
help_label.grid(row=1, column=0, columnspan=2, sticky="w", padx=5, pady=5)
self.master.style = ttk.Style()
self.master.style.configure("Italic.TLabel", font=("", 9, "italic"))
selection_frame = ttk.Frame(self)
selection_frame.grid(row=2, column=0, sticky="ew", pady=(5, 10))
selection_frame = ttk.Frame(segments_frame)
selection_frame.grid(row=1, column=0, columnspan=2, sticky="ew", pady=(5, 0))
ttk.Button(
selection_frame, text="Select All Ready", command=self._select_all_ready
@ -119,6 +124,17 @@ class SegmentProcessorTab(ttk.Frame):
side=tk.LEFT, padx=5
)
help_label = ttk.Label(
segments_frame,
text="Tip: Segments in gray are not exported and cannot be selected. Use the 'Flight Analyzer' tab to export them.",
wraplength=600,
justify=tk.LEFT,
style="Italic.TLabel",
)
help_label.grid(row=2, column=0, columnspan=2, sticky="w", padx=5, pady=5)
self.master.style = ttk.Style()
self.master.style.configure("Italic.TLabel", font=("", 9, "italic"))
output_config_frame = ttk.LabelFrame(
self, text="Batch Processing Output Configuration"
)