diff --git a/config/config.json b/config/config.json index 5df5238..1914056 100644 --- a/config/config.json +++ b/config/config.json @@ -1,6 +1,6 @@ { - "last_opened_file": "C:/src/____GitProjects/radar_data_reader/_25-05-15-12-22-52_sata_354-n11.out", - "last_output_file": "C:\\src\\____GitProjects\\radar_data_reader\\_25-05-15-12-22-52_sata_354-n11.csv", + "last_opened_file": "C:/src/____GitProjects/radar_data_reader/_25-05-15-12-22-52_sata_568-n13.out", + "last_output_file": "C:\\src\\____GitProjects\\radar_data_reader\\_25-05-15-12-22-52_sata_568-n13.csv", "active_export_profile_name": "Default", "export_profiles": [ { diff --git a/radar_data_reader/core/file_reader.py b/radar_data_reader/core/file_reader.py index 6798200..ec3bbdf 100644 --- a/radar_data_reader/core/file_reader.py +++ b/radar_data_reader/core/file_reader.py @@ -1,12 +1,15 @@ +# radar_data_reader/core/file_reader.py + """ Worker process logic for reading and parsing radar data files. """ import multiprocessing as mp +import logging # <-- NUOVO IMPORT from pathlib import Path -from typing import List, Optional, Dict, Iterator, Tuple, Set +from typing import Set, Iterator, Tuple import queue -import cProfile -import pstats +import cProfile +import pstats import numpy as np @@ -22,6 +25,35 @@ HEADER_BLOCK_NAME = 1213223748 BLOCK_NAME_OFFSET = 17 BLOCK_SIZE_OFFSET = 5 +# --- NUOVA COSTANTE PER IL FILE DI LOG DEL WORKER --- +WORKER_LOG_FILE = "worker_process.log" + + +def _setup_worker_logging() -> logging.Handler: + """Configures a dedicated file logger for this worker process.""" + # Get the logger specific to this module + worker_log = logging.getLogger(__name__) + worker_log.setLevel(logging.DEBUG) # Capture all levels of messages + + # Create a file handler that overwrites the file each time (mode='w') + file_handler = logging.FileHandler(WORKER_LOG_FILE, mode='w', encoding='utf-8') + formatter = logging.Formatter( + '%(asctime)s [%(levelname)-8s] (Worker-PID:%(process)d): %(message)s', + '%H:%M:%S' + ) + file_handler.setFormatter(formatter) + + # Add the handler to this specific logger + worker_log.addHandler(file_handler) + + # --- THIS IS THE KEY STEP --- + # Stop log messages from propagating to the root logger, + # which would send them to the main process's queue. + worker_log.propagate = False + + worker_log.info("Dedicated worker logging configured. Output will be in " + WORKER_LOG_FILE) + return file_handler + def run_worker_process( filepath: Path, @@ -29,22 +61,23 @@ def run_worker_process( result_queue: mp.Queue, profile: ExportProfile ): + """This function is the main target for the multiprocessing.Process.""" + + # --- NUOVA GESTIONE DEL LOGGING CON TRY...FINALLY --- + worker_file_handler = _setup_worker_logging() + profiler = cProfile.Profile() profiler.enable() - - """This function is the main target for the multiprocessing.Process.""" - log.info(f"[Worker-{mp.current_process().pid}] Started for file: {filepath.name}") - - # --- Pre-computation of required data based on the profile --- - # Determine which signal types are needed by inspecting the data paths. - # A path like "signals.SUM.some_attribute" means we need to parse "SUM" blocks. - required_signal_types = { - f.data_path.split('.')[1] for f in profile.fields if f.data_path.startswith('signals.') - } - log.info(f"Optimization active. Required signal types for this run: {required_signal_types}") - # Note: Header data is always considered necessary for context (n_rbin, n_pri, etc.) - + try: + log.info(f"[Worker-{mp.current_process().pid}] Started for file: {filepath.name}") + + # Pre-computation of required data based on the profile + required_signal_types = { + f.data_path.split('.')[1] for f in profile.fields if f.data_path.startswith('signals.') + } + log.info(f"Optimization active. Required signal types for this run: {required_signal_types}") + reader = RadarFileReader(filepath) if not reader.load_and_find_blocks(): result_queue.put({"type": "error", "message": "Failed to load or find blocks."}) @@ -55,7 +88,6 @@ def run_worker_process( interrupted = False - # Pass the required signal types to the generator batch_generator = reader.build_batches_generator(required_signal_types) for i, (batch, blocks_done_count) in enumerate(batch_generator): @@ -82,16 +114,26 @@ def run_worker_process( result_queue.put({"type": "complete", "interrupted": interrupted}) log.info(f"[Worker-{mp.current_process().pid}] Processing finished.") + except Exception as e: log.error(f"[Worker-{mp.current_process().pid}] Unhandled exception: {e}", exc_info=True) result_queue.put({"type": "error", "message": f"Worker failed: {e}"}) finally: + # --- NUOVA LOGICA DI PULIZIA NEL BLOCCO FINALLY --- + log.info("Worker process shutting down. Cleaning up logging.") + profiler.disable() pid = mp.current_process().pid profile_filename = f"profile_output_{pid}.prof" profiler.dump_stats(profile_filename) log.info(f"Profiling data saved to {profile_filename}") + + # Restore logger to its original state and close the file handler + worker_log = logging.getLogger(__name__) + worker_log.propagate = True + worker_log.removeHandler(worker_file_handler) + worker_file_handler.close() class RadarFileReader: @@ -99,8 +141,8 @@ class RadarFileReader: def __init__(self, file_path: Path): self.file_path = file_path - self.data_vector: Optional[np.ndarray] = None - self.block_indices: Optional[List[int]] = None + self.data_vector: np.ndarray | None = None + self.block_indices: list[int] | None = None def load_and_find_blocks(self) -> bool: # This method remains unchanged @@ -129,8 +171,8 @@ class RadarFileReader: if self.block_indices is None or self.data_vector is None: return - current_header: Optional[MainHeader] = None - current_signals: Dict[str, np.ndarray] = {} + current_header: MainHeader | None = None + current_signals: dict[str, np.ndarray] = {} for block_num, block_start_index in enumerate(self.block_indices): try: