# S1005403_RisCC/target_simulator/analysis/simulation_archive.py
# 2025-11-14 15:47:48 +01:00

# 249 lines
# 9.3 KiB
# Python

# target_simulator/analysis/simulation_archive.py
import json
import math
import os
import time
from datetime import datetime, timezone
from typing import Dict, List, Any, Tuple, Optional

from target_simulator.core.models import Scenario
# Prefer pyproj for accurate geodesic calculations; fall back to a simple
# equirectangular approximation when pyproj is not available.
try:
    from pyproj import Geod

    _GEOD = Geod(ellps="WGS84")
    _HAS_PYPROJ = True
except Exception:
    # Deliberately broad: an optional native dependency can fail to load with
    # errors other than ImportError, and any such failure must simply select
    # the pure-Python fallback instead of breaking the module import.
    _GEOD = None
    _HAS_PYPROJ = False

# Define the structure for a recorded state
RecordedState = Tuple[float, float, float, float]  # (timestamp, x_ft, y_ft, z_ft)
class SimulationArchive:
    """
    Manages data collection for a single simulation run and saves it to a file.

    Samples are accumulated in memory through the ``add_*`` methods and are
    written by :meth:`save` as one JSON document (plus an optional
    ``.perf.csv`` companion file) inside ``ARCHIVE_FOLDER``.
    """

    ARCHIVE_FOLDER = "archive_simulations"

    def __init__(self, scenario: "Scenario"):
        """
        Initializes a new archive session for a given scenario.

        Args:
            scenario: Object exposing ``name`` and ``to_dict()``; both are read
                once here and kept for serialization in :meth:`save`.
        """
        self.start_time = time.monotonic()
        # Wall-clock start of the run, captured here (not in save()) so that
        # the archived "start_timestamp_utc" really is the start time. The
        # tzinfo is dropped to keep the naive ISO-8601 text format.
        self.start_timestamp_utc = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        self.scenario_name = scenario.name
        self.scenario_data = scenario.to_dict()
        # Data structure to hold recorded events, indexed by target_id
        self.recorded_data: Dict[int, Dict[str, List[RecordedState]]] = {}
        # Data structure to hold the ownship's trajectory
        self.ownship_trajectory: List[Dict[str, Any]] = []
        # Data structure to hold computed georeferenced positions for real targets
        # keyed by target_id -> list of {'timestamp': t, 'lat': ..., 'lon': ..., 'alt_ft': ...}
        self.recorded_geopos: Dict[int, List[Dict[str, Any]]] = {}
        # Set once the first geopositioning failure has been reported, so a
        # persistent problem does not print one line per incoming sample.
        self._geopos_failure_reported = False
        self._ensure_archive_directory()

    def _ensure_archive_directory(self):
        """Creates the main archive directory if it does not exist."""
        try:
            # exist_ok avoids the check-then-create race of a separate
            # os.path.exists() test.
            os.makedirs(self.ARCHIVE_FOLDER, exist_ok=True)
        except OSError as e:
            print(f"Error creating archive directory: {e}")

    def _append_state(
        self, target_id: int, kind: str, timestamp: float, state: Tuple[float, ...]
    ):
        """Appends ``(timestamp, x, y, z)`` to the ``kind`` list of a target.

        ``kind`` is ``"simulated"`` or ``"real"``. Only the first three
        components of ``state`` are stored; a shorter ``state`` raises
        ``IndexError``.
        """
        per_target = self.recorded_data.setdefault(
            target_id, {"simulated": [], "real": []}
        )
        full_state: RecordedState = (timestamp, state[0], state[1], state[2])
        per_target[kind].append(full_state)

    def add_simulated_state(
        self, target_id: int, timestamp: float, state: Tuple[float, ...]
    ):
        """Adds a simulated state to the archive."""
        self._append_state(target_id, "simulated", timestamp, state)

    def add_real_state(
        self, target_id: int, timestamp: float, state: Tuple[float, ...]
    ):
        """Adds a real state (from the server) to the archive.

        After recording the sample, a georeferenced position is computed on a
        best-effort basis; a failure there never prevents the sample from
        being recorded.
        """
        self._append_state(target_id, "real", timestamp, state)
        # Attempt to compute and store geoposition for this real sample.
        try:
            self._compute_and_store_geopos(target_id, timestamp, state)
        except Exception as e:
            # Non-fatal: if geopositioning fails we simply skip it, but the
            # first failure is reported so the problem is not invisible.
            if not getattr(self, "_geopos_failure_reported", False):
                self._geopos_failure_reported = True
                print(
                    f"Warning: geoposition computation failed for target "
                    f"{target_id}: {e}"
                )

    @staticmethod
    def _offset_to_latlon(
        own_lat: float,
        own_lon: float,
        delta_east_m: float,
        delta_north_m: float,
        geod: Any = None,
    ) -> Tuple[float, float]:
        """Returns ``(lat, lon)`` in degrees for an east/north offset in meters.

        Args:
            own_lat: Reference latitude in degrees.
            own_lon: Reference longitude in degrees.
            delta_east_m: Offset toward east, in meters.
            delta_north_m: Offset toward north, in meters.
            geod: Optional object with a pyproj-style
                ``fwd(lon, lat, azimuth_deg, distance_m)`` method. When it is
                ``None`` or its call raises, an equirectangular approximation
                is used instead.
        """
        if geod is not None:
            distance_m = math.hypot(delta_east_m, delta_north_m)
            # atan2(east, north): azimuth measured clockwise from north.
            az_deg = math.degrees(math.atan2(delta_east_m, delta_north_m))
            try:
                lon2, lat2, _ = geod.fwd(own_lon, own_lat, az_deg, distance_m)
                return lat2, lon2
            except Exception:
                # fall back to equirectangular below
                pass
        # Convert meters to degrees using a simple equirectangular approximation
        earth_radius_m = 6378137.0  # Earth radius in meters (WGS84 sphere approx)
        dlat = math.degrees(delta_north_m / earth_radius_m)
        # A degree of longitude shrinks with cos(latitude).
        dlon = math.degrees(
            delta_east_m / (earth_radius_m * math.cos(math.radians(own_lat)))
        )
        return own_lat + dlat, own_lon + dlon

    def _compute_and_store_geopos(
        self, target_id: int, timestamp: float, state: Tuple[float, ...]
    ):
        """Compute georeferenced lat/lon for a real state and store it in recorded_geopos.

        This method is separated for easier testing and clarity. It does
        nothing when no ownship sample has been recorded yet, or when the
        ownship sample closest in time lacks ``latitude``, ``longitude`` or
        ``position_xy_ft``.
        """
        if not self.ownship_trajectory:
            return
        # Find ownship state closest in time
        best = min(
            self.ownship_trajectory,
            key=lambda s: abs(s.get("timestamp", 0.0) - timestamp),
        )
        own_lat = best.get("latitude")
        own_lon = best.get("longitude")
        own_pos = best.get("position_xy_ft")
        if own_lat is None or own_lon is None or not own_pos:
            return
        # target and ownship positions are in feet: (x_east_ft, y_north_ft)
        feet_to_m = 0.3048
        delta_east_m = (float(state[0]) - float(own_pos[0])) * feet_to_m
        delta_north_m = (float(state[1]) - float(own_pos[1])) * feet_to_m
        # Use pyproj.Geod when available for accurate forward geodesic
        geod = _GEOD if _HAS_PYPROJ else None
        target_lat, target_lon = self._offset_to_latlon(
            float(own_lat), float(own_lon), delta_east_m, delta_north_m, geod
        )
        self.recorded_geopos.setdefault(target_id, []).append(
            {
                "timestamp": timestamp,
                "lat": round(target_lat, 7),
                "lon": round(target_lon, 7),
                "alt_ft": float(state[2]) if len(state) > 2 else None,
            }
        )

    def add_ownship_state(self, state: Dict[str, Any]):
        """
        Adds an ownship state sample to the archive's trajectory.

        Args:
            state: A dictionary representing the ownship's state at a point in time.
        """
        self.ownship_trajectory.append(state)

    def _write_performance_csv(
        self,
        base_filename: str,
        metadata: Dict[str, Any],
        performance_samples: List[Dict[str, Any]],
    ):
        """Writes the performance samples to ``<base_filename>.perf.csv``.

        The file starts with three ``#`` comment lines, then a header row and
        one row per sample. I/O errors are reported and swallowed so that the
        main archive can still be saved.
        """
        import csv

        perf_filepath = os.path.join(
            self.ARCHIVE_FOLDER, f"{base_filename}.perf.csv"
        )
        # Ordered union of the keys of every sample, so a key that first
        # appears in a later sample still gets a column.
        headers = list(
            dict.fromkeys(key for sample in performance_samples for key in sample)
        )
        comment_lines = [
            f"# Scenario Name: {metadata.get('scenario_name')}",
            f"# Start Timestamp (UTC): {metadata.get('start_timestamp_utc')}",
            f"# Source File: {base_filename}.json",
        ]
        try:
            with open(perf_filepath, "w", newline="", encoding="utf-8") as f:
                # Write metadata as commented header lines. They are written
                # raw rather than through csv.writer, which would wrap a line
                # containing a comma or quote in quotes and hide the leading
                # '#'. "\r\n" matches csv.writer's default line terminator.
                for line in comment_lines:
                    f.write(" ".join(line.splitlines()) + "\r\n")
                writer = csv.writer(f)
                # Write the actual header row
                writer.writerow(headers)
                # Write data rows
                for sample in performance_samples:
                    writer.writerow([sample.get(h, "") for h in headers])
            print(f"Performance data saved to: {perf_filepath}")
        except OSError as e:
            print(f"Error saving performance data CSV: {e}")

    def save(self, extra_metadata: Optional[Dict[str, Any]] = None) -> str:
        """
        Saves the complete simulation archive to a JSON file.

        The filename is generated from the timestamp and scenario name.
        Performance data is saved to a separate '.perf.csv' file.

        Args:
            extra_metadata: An optional dictionary of metadata to add or
                overwrite in the final archive file. A
                ``"performance_samples"`` entry (a list of dicts) is removed
                from the metadata and written to the CSV file instead.

        Returns:
            The path of the saved file, or an empty string when the file
            could not be written.

        Raises:
            TypeError: If the collected data is not JSON serializable; no
                JSON file is created in that case.
        """
        end_time = time.monotonic()
        metadata = {
            "scenario_name": self.scenario_name,
            "start_timestamp_utc": self.start_timestamp_utc,
            "duration_seconds": end_time - self.start_time,
        }
        # Merge extra metadata if provided
        if extra_metadata:
            metadata.update(extra_metadata)

        # Performance samples go to their own CSV file, not into the JSON.
        performance_samples = metadata.pop("performance_samples", None)

        ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_scenario_name = "".join(
            c for c in self.scenario_name if c.isalnum() or c in (" ", "_")
        ).rstrip()
        base_filename = f"{ts_str}_{safe_scenario_name}"

        # Retry here: creation may have failed in __init__, or the folder may
        # have been removed since.
        self._ensure_archive_directory()

        if performance_samples:
            self._write_performance_csv(base_filename, metadata, performance_samples)

        archive_content = {
            "metadata": metadata,
            "scenario_definition": self.scenario_data,
            "ownship_trajectory": self.ownship_trajectory,
            "simulation_results": self.recorded_data,
            "simulation_geopos": self.recorded_geopos,
        }
        filepath = os.path.join(self.ARCHIVE_FOLDER, f"{base_filename}.json")
        # Serialize before opening the file so that a serialization error
        # cannot leave a truncated archive on disk.
        payload = json.dumps(archive_content, indent=4)
        try:
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(payload)
        except OSError as e:
            print(f"Error saving simulation archive: {e}")
            return ""
        print(f"Simulation archive saved to: {filepath}")
        return filepath