# target_simulator/analysis/simulation_archive.py import os import json import time from datetime import datetime from typing import Dict, List, Any, Tuple, Optional from target_simulator.core.models import Scenario # Define the structure for a recorded state RecordedState = Tuple[float, float, float, float] # (timestamp, x_ft, y_ft, z_ft) class SimulationArchive: """ Manages data collection for a single simulation run and saves it to a file. """ ARCHIVE_FOLDER = "archive_simulations" def __init__(self, scenario: Scenario): """ Initializes a new archive session for a given scenario. """ self.start_time = time.monotonic() self.scenario_name = scenario.name self.scenario_data = scenario.to_dict() # Data structure to hold recorded events, indexed by target_id self.recorded_data: Dict[int, Dict[str, List[RecordedState]]] = {} # Data structure to hold the ownship's trajectory self.ownship_trajectory: List[Dict[str, Any]] = [] self._ensure_archive_directory() def _ensure_archive_directory(self): """Creates the main archive directory if it does not exist.""" if not os.path.exists(self.ARCHIVE_FOLDER): try: os.makedirs(self.ARCHIVE_FOLDER) except OSError as e: print(f"Error creating archive directory: {e}") def add_simulated_state( self, target_id: int, timestamp: float, state: Tuple[float, ...] ): """Adds a simulated state to the archive.""" if target_id not in self.recorded_data: self.recorded_data[target_id] = {"simulated": [], "real": []} full_state: RecordedState = (timestamp, state[0], state[1], state[2]) self.recorded_data[target_id]["simulated"].append(full_state) def add_real_state( self, target_id: int, timestamp: float, state: Tuple[float, ...] ): """Adds a real state (from the server) to the archive.""" if target_id not in self.recorded_data: self.recorded_data[target_id] = {"simulated": [], "real": []} full_state: RecordedState = (timestamp, state[0], state[1], state[2]) self.recorded_data[target_id]["real"].append(full_state) def add_ownship_state(self, state: Dict[str, Any]): """ Adds an ownship state sample to the archive's trajectory. Args: state: A dictionary representing the ownship's state at a point in time. """ self.ownship_trajectory.append(state) def save(self, extra_metadata: Optional[Dict[str, Any]] = None) -> str: """ Saves the complete simulation archive to a JSON file. The filename is generated from the timestamp and scenario name. Args: extra_metadata: An optional dictionary of metadata to add or overwrite in the final archive file. Returns: The path of the saved file. """ end_time = time.monotonic() metadata = { "scenario_name": self.scenario_name, "start_timestamp_utc": datetime.utcnow().isoformat(), "duration_seconds": end_time - self.start_time, } # Merge extra metadata if provided if extra_metadata: metadata.update(extra_metadata) archive_content = { "metadata": metadata, "scenario_definition": self.scenario_data, "ownship_trajectory": self.ownship_trajectory, "simulation_results": self.recorded_data, } ts_str = datetime.now().strftime("%Y%m%d_%H%M%S") safe_scenario_name = "".join( c for c in self.scenario_name if c.isalnum() or c in (" ", "_") ).rstrip() filename = f"{ts_str}_{safe_scenario_name}.json" filepath = os.path.join(self.ARCHIVE_FOLDER, filename) try: with open(filepath, "w", encoding="utf-8") as f: json.dump(archive_content, f, indent=4) print(f"Simulation archive saved to: {filepath}") return filepath except IOError as e: print(f"Error saving simulation archive: {e}") return ""