120 lines
4.2 KiB
Python
120 lines
4.2 KiB
Python
# target_simulator/analysis/simulation_archive.py
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any, Tuple, Optional
|
|
|
|
from target_simulator.core.models import Scenario
|
|
|
|
# Define the structure for a recorded state
|
|
RecordedState = Tuple[float, float, float, float]  # One recorded sample: (timestamp, x_ft, y_ft, z_ft)
|
|
|
|
|
|
class SimulationArchive:
    """
    Manages data collection for a single simulation run and saves it to a file.

    An instance accumulates, per target id, the "simulated" and "real" state
    samples, plus the ownship trajectory, and writes everything together with
    the scenario definition to one JSON file inside ``ARCHIVE_FOLDER``.
    """

    ARCHIVE_FOLDER = "archive_simulations"

    def __init__(self, scenario: "Scenario"):
        """
        Initializes a new archive session for a given scenario.

        Args:
            scenario: The scenario being run. Only its ``name`` attribute and
                its ``to_dict()`` method are used.
        """
        # Monotonic clock: used only to compute the run duration in save().
        self.start_time = time.monotonic()
        # Wall-clock start of the run, captured now so that the
        # "start_timestamp_utc" metadata really is the start of the session
        # and not the moment save() happens to be called.
        # NOTE: utcnow() is kept to preserve the naive ISO-8601 format already
        # used by the archive files (it is deprecated from Python 3.12).
        self.start_timestamp_utc = datetime.utcnow().isoformat()

        self.scenario_name = scenario.name
        self.scenario_data = scenario.to_dict()

        # Data structure to hold recorded events, indexed by target_id.
        # Each entry has the two lists "simulated" and "real".
        self.recorded_data: Dict[int, Dict[str, List[RecordedState]]] = {}

        # Data structure to hold the ownship's trajectory.
        self.ownship_trajectory: List[Dict[str, Any]] = []

        self._ensure_archive_directory()

    def _ensure_archive_directory(self):
        """Creates the main archive directory if it does not exist.

        Failures are reported on stdout and otherwise ignored; a later save()
        will then report its own error when the file cannot be written.
        """
        try:
            # exist_ok avoids the check-then-create race of a separate
            # os.path.exists() test.
            os.makedirs(self.ARCHIVE_FOLDER, exist_ok=True)
        except OSError as e:
            print(f"Error creating archive directory: {e}")

    def _record_state(
        self, target_id: int, source: str, timestamp: float, state: Tuple[float, ...]
    ):
        """Appends one (timestamp, x, y, z) sample to a target's ``source`` list.

        Args:
            target_id: Identifier of the target the sample belongs to.
            source: Which list to append to: "simulated" or "real".
            timestamp: Time associated with the sample.
            state: Sequence whose first three elements are stored as x, y, z;
                any further elements are ignored.
        """
        per_target = self.recorded_data.setdefault(
            target_id, {"simulated": [], "real": []}
        )
        full_state: RecordedState = (timestamp, state[0], state[1], state[2])
        per_target[source].append(full_state)

    def add_simulated_state(
        self, target_id: int, timestamp: float, state: Tuple[float, ...]
    ):
        """Adds a simulated state to the archive."""
        self._record_state(target_id, "simulated", timestamp, state)

    def add_real_state(
        self, target_id: int, timestamp: float, state: Tuple[float, ...]
    ):
        """Adds a real state (from the server) to the archive."""
        self._record_state(target_id, "real", timestamp, state)

    def add_ownship_state(self, state: Dict[str, Any]):
        """
        Adds an ownship state sample to the archive's trajectory.

        Args:
            state: A dictionary representing the ownship's state at a point in time.
        """
        self.ownship_trajectory.append(state)

    def save(self, extra_metadata: Optional[Dict[str, Any]] = None) -> str:
        """
        Saves the complete simulation archive to a JSON file.

        The filename is generated from the current local time and the
        scenario name (reduced to alphanumerics, spaces and underscores).

        Args:
            extra_metadata: An optional dictionary of metadata to add or
                overwrite in the final archive file.

        Returns:
            The path of the saved file, or an empty string if the file could
            not be written.

        Raises:
            TypeError: If the collected data contains values that are not
                JSON serializable. No file is created in that case.
        """
        duration_seconds = time.monotonic() - self.start_time

        metadata = {
            "scenario_name": self.scenario_name,
            "start_timestamp_utc": self.start_timestamp_utc,
            "duration_seconds": duration_seconds,
        }

        # Merge extra metadata if provided
        if extra_metadata:
            metadata.update(extra_metadata)

        archive_content = {
            "metadata": metadata,
            "scenario_definition": self.scenario_data,
            "ownship_trajectory": self.ownship_trajectory,
            "simulation_results": self.recorded_data,
        }

        ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_scenario_name = "".join(
            c for c in self.scenario_name if c.isalnum() or c in (" ", "_")
        ).rstrip()
        filename = f"{ts_str}_{safe_scenario_name}.json"
        filepath = os.path.join(self.ARCHIVE_FOLDER, filename)

        # Serialize before touching the filesystem: if the content is not
        # JSON serializable the error surfaces here and no truncated archive
        # file is left behind.
        payload = json.dumps(archive_content, indent=4)

        try:
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(payload)
        except IOError as e:
            print(f"Error saving simulation archive: {e}")
            return ""

        print(f"Simulation archive saved to: {filepath}")
        return filepath