SXXXXXXX_RadarDataReader/run_structure_analysis.py
VALLONGOL 5de2650675 add
2025-11-12 13:43:30 +01:00

87 lines
3.2 KiB
Python

import multiprocessing
from pathlib import Path
import logging
import sys
from radar_data_reader.core.file_analyzer import FileStructureAnalyzer
from radar_data_reader.utils.config_manager import ConfigManager
def run_structure_analysis():
    """
    Run the file structure analysis for the .out file named on the command line.

    The input path is taken from ``sys.argv[1]``. The analyzer settings are
    read from the ``file_analyzer_config`` section of ``config/config.json``
    (resolved against the current working directory). A plain-text report
    named ``<stem>_python_analysis_report.txt`` is written next to the input
    file.

    Returns:
        None. A missing argument, a missing input file, and any exception
        raised while analyzing or writing the report are logged, not raised.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)-8s] %(name)-20s: %(message)s",
        datefmt="%H:%M:%S",
    )
    log = logging.getLogger(__name__)

    if len(sys.argv) < 2:
        log.error("Usage: python run_structure_analysis.py <path_to_out_file>")
        return

    filepath = Path(sys.argv[1])
    if not filepath.is_file():
        log.error("Input file not found: %s", filepath)
        return

    # Only the analyzer's own section of the application config is used here.
    config_manager = ConfigManager(Path("config/config.json").resolve())
    config_manager.load_config()
    analysis_config = config_manager.get("file_analyzer_config", {})

    # analyze() takes a queue argument, so one is created for it; nothing in
    # this script reads from the queue afterwards.
    result_queue = multiprocessing.Queue()

    log.info("Starting structure analysis for file: %s", filepath)
    try:
        analyzer = FileStructureAnalyzer(filepath, analysis_config)
        report = analyzer.analyze(result_queue)
        stats = analyzer.stats

        report_path = filepath.parent / f"{filepath.stem}_python_analysis_report.txt"
        log.info("Saving analysis report to: %s", report_path)
        with open(report_path, "w", encoding="utf-8") as f:
            f.writelines(_iter_report_lines(filepath.name, stats, report))
        log.info("Analysis complete.")
    except Exception as e:
        # Top-level boundary of the script: log with traceback instead of crashing.
        log.error("An error occurred during analysis: %s", e, exc_info=True)


def _iter_report_lines(file_name, stats, report):
    """Yield the newline-terminated pieces of the text report, in output order."""
    yield f"--- Analysis Report for file: {file_name} ---\n\n"

    yield "--- Summary ---\n"
    for key, value in stats.items():
        # The per-block-type counts get their own section below.
        if key != "block_type_counts":
            yield f"{key.replace('_', ' ').title()}: {value}\n"
    yield "\n"

    block_counts = stats.get("block_type_counts")
    if block_counts:
        yield "--- Block Type Summary ---\n"
        # Most frequent block types first.
        ranked = sorted(block_counts.items(), key=lambda item: item[1], reverse=True)
        for name, count in ranked:
            # "!s" converts to str before aligning, so a non-string name
            # (e.g. None) cannot raise TypeError on the "<15" format spec.
            yield f"{name!s:<15}: {count}\n"
        yield "\n"

    yield "--- Block Sequence Log ---\n"
    for entry in report:
        yield _format_log_entry(entry)


def _format_log_entry(entry):
    """Format one entry of the analyzer's report as a single text line."""
    offset = entry.get("offset", "N/A")
    # A present-but-empty "type" falls back to INFO like a missing one; str()
    # keeps .upper() from failing on a non-string value.
    msg_type = str(entry.get("type") or "INFO").upper()
    if msg_type == "BLOCK":
        name = entry.get("name", "N/A")
        size = entry.get("size_bytes", "N/A")
        real_size = entry.get("real_block_size_bytes", "N/A")
        # "!s" before the width spec: alignment specs are rejected by types
        # such as NoneType, which would otherwise abort the report midway.
        return (
            f"[{offset}] {msg_type:<8} | Name: {name!s:<15} | "
            f"Declared: {size!s:<8} | Real: {real_size}\n"
        )
    message = entry.get("message", "")
    return f"[{offset}] {msg_type:<8} | {message}\n"
if __name__ == "__main__":
    # Supports multiprocessing when this script is packaged as a frozen
    # executable; it has no effect when run by a regular interpreter.
    multiprocessing.freeze_support()
    run_structure_analysis()