add separate log for worker

VALLONGOL 2025-06-18 15:50:55 +02:00
parent 23b3c37c88
commit 94d3ba00d5
2 changed files with 65 additions and 23 deletions


@@ -1,6 +1,6 @@
{
"last_opened_file": "C:/src/____GitProjects/radar_data_reader/_25-05-15-12-22-52_sata_354-n11.out",
"last_output_file": "C:\\src\\____GitProjects\\radar_data_reader\\_25-05-15-12-22-52_sata_354-n11.csv",
"last_opened_file": "C:/src/____GitProjects/radar_data_reader/_25-05-15-12-22-52_sata_568-n13.out",
"last_output_file": "C:\\src\\____GitProjects\\radar_data_reader\\_25-05-15-12-22-52_sata_568-n13.csv",
"active_export_profile_name": "Default",
"export_profiles": [
{

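The first changed file is the persisted session state. A minimal sketch of how such a file can be read and updated (the settings.json filename and the helper names are assumptions for illustration, not taken from this repo):

    import json
    from pathlib import Path

    SETTINGS_PATH = Path("settings.json")  # assumed filename

    def load_settings() -> dict:
        # First run: no settings file yet.
        if not SETTINGS_PATH.exists():
            return {}
        return json.loads(SETTINGS_PATH.read_text(encoding="utf-8"))

    def remember_last_files(opened: str, output: str) -> None:
        # Persist the most recently used input/output paths.
        settings = load_settings()
        settings["last_opened_file"] = opened
        settings["last_output_file"] = output
        SETTINGS_PATH.write_text(json.dumps(settings, indent=2), encoding="utf-8")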

@@ -1,12 +1,15 @@
# radar_data_reader/core/file_reader.py
"""
Worker process logic for reading and parsing radar data files.
"""
import multiprocessing as mp
+import logging  # <-- NEW IMPORT
from pathlib import Path
-from typing import List, Optional, Dict, Iterator, Tuple, Set
+from typing import Set, Iterator, Tuple
import queue
import cProfile
import pstats
import numpy as np
@@ -22,6 +25,35 @@ HEADER_BLOCK_NAME = 1213223748
BLOCK_NAME_OFFSET = 17
BLOCK_SIZE_OFFSET = 5
+# --- NEW CONSTANT FOR THE WORKER LOG FILE ---
+WORKER_LOG_FILE = "worker_process.log"
+def _setup_worker_logging() -> logging.Handler:
+    """Configures a dedicated file logger for this worker process."""
+    # Get the logger specific to this module
+    worker_log = logging.getLogger(__name__)
+    worker_log.setLevel(logging.DEBUG)  # Capture all levels of messages
+    # Create a file handler that overwrites the file each time (mode='w')
+    file_handler = logging.FileHandler(WORKER_LOG_FILE, mode='w', encoding='utf-8')
+    formatter = logging.Formatter(
+        '%(asctime)s [%(levelname)-8s] (Worker-PID:%(process)d): %(message)s',
+        '%H:%M:%S'
+    )
+    file_handler.setFormatter(formatter)
+    # Add the handler to this specific logger
+    worker_log.addHandler(file_handler)
+    # --- THIS IS THE KEY STEP ---
+    # Stop log messages from propagating to the root logger,
+    # which would send them to the main process's queue.
+    worker_log.propagate = False
+    worker_log.info(f"Dedicated worker logging configured. Output will be in {WORKER_LOG_FILE}")
+    return file_handler
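For context, propagate = False matters because in a typical multiprocessing logging setup the root logger of each worker forwards records to the main process through a queue. A minimal sketch of that pattern (function names are illustrative, not from this repo; the queue plumbing uses the stdlib logging.handlers API):

    import logging
    import logging.handlers
    import multiprocessing as mp

    def init_worker_logging(log_queue):
        # In each worker: send every record to the main process via the queue.
        root = logging.getLogger()
        root.addHandler(logging.handlers.QueueHandler(log_queue))
        root.setLevel(logging.DEBUG)

    def start_main_listener(log_queue):
        # In the main process: drain the queue into a real handler.
        listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
        listener.start()
        return listener

Separately, note that WORKER_LOG_FILE is a single fixed name opened with mode='w': two workers running at once would overwrite each other's file, so a per-process name such as f"worker_{os.getpid()}.log" (hypothetical) would be safer.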
def run_worker_process(
    filepath: Path,
@@ -29,22 +61,23 @@ def run_worker_process(
    result_queue: mp.Queue,
    profile: ExportProfile
):
"""This function is the main target for the multiprocessing.Process."""
# --- NUOVA GESTIONE DEL LOGGING CON TRY...FINALLY ---
worker_file_handler = _setup_worker_logging()
profiler = cProfile.Profile()
profiler.enable()
"""This function is the main target for the multiprocessing.Process."""
log.info(f"[Worker-{mp.current_process().pid}] Started for file: {filepath.name}")
# --- Pre-computation of required data based on the profile ---
# Determine which signal types are needed by inspecting the data paths.
# A path like "signals.SUM.some_attribute" means we need to parse "SUM" blocks.
required_signal_types = {
f.data_path.split('.')[1] for f in profile.fields if f.data_path.startswith('signals.')
}
log.info(f"Optimization active. Required signal types for this run: {required_signal_types}")
# Note: Header data is always considered necessary for context (n_rbin, n_pri, etc.)
try:
log.info(f"[Worker-{mp.current_process().pid}] Started for file: {filepath.name}")
# Pre-computation of required data based on the profile
required_signal_types = {
f.data_path.split('.')[1] for f in profile.fields if f.data_path.startswith('signals.')
}
log.info(f"Optimization active. Required signal types for this run: {required_signal_types}")
        reader = RadarFileReader(filepath)
        if not reader.load_and_find_blocks():
            result_queue.put({"type": "error", "message": "Failed to load or find blocks."})
@@ -55,7 +88,6 @@ def run_worker_process(
        interrupted = False
        # Pass the required signal types to the generator
        batch_generator = reader.build_batches_generator(required_signal_types)
        for i, (batch, blocks_done_count) in enumerate(batch_generator):
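The loop consumes (batch, blocks_done_count) pairs. build_batches_generator itself is not shown in this diff, so the following is only a minimal sketch of the contract it appears to satisfy (batch size and batching logic are assumptions):

    def build_batches_sketch(blocks, batch_size=64):
        # Yield (batch, done_count) pairs so the consumer can track progress.
        batch, done = [], 0
        for block in blocks:
            batch.append(block)
            done += 1
            if len(batch) == batch_size:
                yield batch, done
                batch = []
        if batch:  # flush the final partial batch
            yield batch, done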
@@ -82,16 +114,26 @@ def run_worker_process(
        result_queue.put({"type": "complete", "interrupted": interrupted})
        log.info(f"[Worker-{mp.current_process().pid}] Processing finished.")
    except Exception as e:
        log.error(f"[Worker-{mp.current_process().pid}] Unhandled exception: {e}", exc_info=True)
        result_queue.put({"type": "error", "message": f"Worker failed: {e}"})
    finally:
+        # --- NEW CLEANUP LOGIC IN THE FINALLY BLOCK ---
+        log.info("Worker process shutting down. Cleaning up logging.")
        profiler.disable()
        pid = mp.current_process().pid
        profile_filename = f"profile_output_{pid}.prof"
        profiler.dump_stats(profile_filename)
        log.info(f"Profiling data saved to {profile_filename}")
+        # Restore logger to its original state and close the file handler
+        worker_log = logging.getLogger(__name__)
+        worker_log.propagate = True
+        worker_log.removeHandler(worker_file_handler)
+        worker_file_handler.close()
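Since pstats is already imported in this module, the dump written by profiler.dump_stats() can be inspected afterwards; for example (the PID suffix in the filename varies per run, 12345 is illustrative):

    import pstats
    stats = pstats.Stats("profile_output_12345.prof")
    stats.sort_stats("cumulative").print_stats(10)  # ten hottest call paths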
class RadarFileReader:
@@ -99,8 +141,8 @@ class RadarFileReader:
    def __init__(self, file_path: Path):
        self.file_path = file_path
-        self.data_vector: Optional[np.ndarray] = None
-        self.block_indices: Optional[List[int]] = None
+        self.data_vector: np.ndarray | None = None
+        self.block_indices: list[int] | None = None

    def load_and_find_blocks(self) -> bool:
        # This method remains unchanged
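One caveat on the X | None annotations in these hunks: the PEP 604 union syntax is only evaluated successfully at runtime on Python 3.10+. On 3.9 and earlier the same annotations still work if their evaluation is deferred:

    # Only needed on Python < 3.10: keep annotations as lazy strings so
    # expressions like `np.ndarray | None` are never evaluated at runtime.
    from __future__ import annotations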
@@ -129,8 +171,8 @@
        if self.block_indices is None or self.data_vector is None:
            return
-        current_header: Optional[MainHeader] = None
-        current_signals: Dict[str, np.ndarray] = {}
+        current_header: MainHeader | None = None
+        current_signals: dict[str, np.ndarray] = {}
        for block_num, block_start_index in enumerate(self.block_indices):
            try: