# target_simulator/analysis/simulation_archive.py import os import json import time from datetime import datetime from typing import Dict, List, Any, Tuple, Optional from target_simulator.core.models import Scenario import math # Prefer pyproj for accurate geodesic calculations; fall back to a simple # equirectangular approximation when pyproj is not available. try: from pyproj import Geod _GEOD = Geod(ellps="WGS84") _HAS_PYPROJ = True except Exception: _GEOD = None _HAS_PYPROJ = False # Define the structure for a recorded state RecordedState = Tuple[float, float, float, float] # (timestamp, x_ft, y_ft, z_ft) class SimulationArchive: """ Manages data collection for a single simulation run and saves it to a file. """ ARCHIVE_FOLDER = "archive_simulations" def __init__(self, scenario: Scenario): """ Initializes a new archive session for a given scenario. """ self.start_time = time.monotonic() self.scenario_name = scenario.name self.scenario_data = scenario.to_dict() # Data structure to hold recorded events, indexed by target_id self.recorded_data: Dict[int, Dict[str, List[RecordedState]]] = {} # Data structure to hold the ownship's trajectory self.ownship_trajectory: List[Dict[str, Any]] = [] # Data structure to hold computed georeferenced positions for real targets # keyed by target_id -> list of {'timestamp': t, 'lat': ..., 'lon': ..., 'alt_ft': ...} self.recorded_geopos: Dict[int, List[Dict[str, Any]]] = {} self._ensure_archive_directory() def _ensure_archive_directory(self): """Creates the main archive directory if it does not exist.""" if not os.path.exists(self.ARCHIVE_FOLDER): try: os.makedirs(self.ARCHIVE_FOLDER) except OSError as e: print(f"Error creating archive directory: {e}") def add_simulated_state( self, target_id: int, timestamp: float, state: Tuple[float, ...] ): """Adds a simulated state to the archive.""" if target_id not in self.recorded_data: self.recorded_data[target_id] = {"simulated": [], "real": []} full_state: RecordedState = (timestamp, state[0], state[1], state[2]) self.recorded_data[target_id]["simulated"].append(full_state) def add_real_state( self, target_id: int, timestamp: float, state: Tuple[float, ...] ): """Adds a real state (from the server) to the archive.""" if target_id not in self.recorded_data: self.recorded_data[target_id] = {"simulated": [], "real": []} full_state: RecordedState = (timestamp, state[0], state[1], state[2]) self.recorded_data[target_id]["real"].append(full_state) # Attempt to compute and store geoposition for this real sample. try: self._compute_and_store_geopos(target_id, timestamp, state) except Exception: # Non-fatal: if geopositioning fails we simply skip it pass def _compute_and_store_geopos( self, target_id: int, timestamp: float, state: Tuple[float, ...] ): """Compute georeferenced lat/lon for a real state and store it in recorded_geopos. This method is separated for easier testing and clarity. """ if not self.ownship_trajectory: return # Find ownship state closest in time best = min( self.ownship_trajectory, key=lambda s: abs(s.get("timestamp", 0.0) - timestamp), ) own_lat = best.get("latitude") own_lon = best.get("longitude") own_pos = best.get("position_xy_ft") if own_lat is None or own_lon is None or not own_pos: return # target and ownship positions are in feet: (x_east_ft, y_north_ft) target_x_ft = float(state[0]) target_y_ft = float(state[1]) own_x_ft = float(own_pos[0]) own_y_ft = float(own_pos[1]) # Compute deltas in meters delta_east_m = (target_x_ft - own_x_ft) * 0.3048 delta_north_m = (target_y_ft - own_y_ft) * 0.3048 # Use pyproj.Geod when available for accurate forward geodesic target_lat = None target_lon = None if _HAS_PYPROJ and _GEOD is not None: distance_m = math.hypot(delta_east_m, delta_north_m) az_rad = math.atan2(delta_east_m, delta_north_m) az_deg = math.degrees(az_rad) try: lon2, lat2, _ = _GEOD.fwd( float(own_lon), float(own_lat), az_deg, distance_m ) target_lat = lat2 target_lon = lon2 except Exception: # fall back to equirectangular below target_lat = None target_lon = None if target_lat is None or target_lon is None: # Convert meters to degrees using a simple equirectangular approximation R = 6378137.0 # Earth radius in meters (WGS84 sphere approx) dlat = (delta_north_m / R) * (180.0 / math.pi) lat_rad = math.radians(float(own_lat)) dlon = (delta_east_m / (R * math.cos(lat_rad))) * (180.0 / math.pi) target_lat = float(own_lat) + dlat target_lon = float(own_lon) + dlon if target_id not in self.recorded_geopos: self.recorded_geopos[target_id] = [] self.recorded_geopos[target_id].append( { "timestamp": timestamp, "lat": round(target_lat, 7), "lon": round(target_lon, 7), "alt_ft": float(state[2]) if len(state) > 2 else None, } ) def add_ownship_state(self, state: Dict[str, Any]): """ Adds an ownship state sample to the archive's trajectory. Args: state: A dictionary representing the ownship's state at a point in time. """ self.ownship_trajectory.append(state) def save(self, extra_metadata: Optional[Dict[str, Any]] = None) -> str: """ Saves the complete simulation archive to a JSON file. The filename is generated from the timestamp and scenario name. Args: extra_metadata: An optional dictionary of metadata to add or overwrite in the final archive file. Returns: The path of the saved file. """ end_time = time.monotonic() metadata = { "scenario_name": self.scenario_name, "start_timestamp_utc": datetime.utcnow().isoformat(), "duration_seconds": end_time - self.start_time, } # Merge extra metadata if provided if extra_metadata: metadata.update(extra_metadata) archive_content = { "metadata": metadata, "scenario_definition": self.scenario_data, "ownship_trajectory": self.ownship_trajectory, "simulation_results": self.recorded_data, # Georeferenced positions per target (optional - may be empty) "simulation_geopos": self.recorded_geopos, } ts_str = datetime.now().strftime("%Y%m%d_%H%M%S") safe_scenario_name = "".join( c for c in self.scenario_name if c.isalnum() or c in (" ", "_") ).rstrip() filename = f"{ts_str}_{safe_scenario_name}.json" filepath = os.path.join(self.ARCHIVE_FOLDER, filename) try: with open(filepath, "w", encoding="utf-8") as f: json.dump(archive_content, f, indent=4) print(f"Simulation archive saved to: {filepath}") return filepath except IOError as e: print(f"Error saving simulation archive: {e}") return ""