S1005403_RisCC/target_simulator/core/models.py

238 lines
9.9 KiB
Python

# target_simulator/core/models.py
"""
Defines the core data models for the application, including dynamic Targets
with programmable trajectories and Scenarios.
"""
from __future__ import annotations
import json
import math
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional
MIN_TARGET_ID = 0
MAX_TARGET_ID = 15
KNOTS_TO_FPS = 1.68781
FPS_TO_KNOTS = 1 / KNOTS_TO_FPS
NM_TO_FT = 6076.12
@dataclass
class Waypoint:
"""Represents a segment of a target's trajectory."""
duration_s: float = 10.0
target_velocity_fps: float = 500.0
target_heading_deg: float = 0.0
# Internal state for interpolation, set when waypoint becomes active
_start_velocity_fps: float = field(init=False, repr=False)
_start_heading_deg: float = field(init=False, repr=False)
def to_dict(self) -> Dict:
return {
"duration_s": self.duration_s,
"target_velocity_fps": self.target_velocity_fps,
"target_heading_deg": self.target_heading_deg,
}
@dataclass
class Target:
"""Represents a dynamic target with a programmable trajectory."""
target_id: int
# Initial state for simulation reset
initial_range_nm: float
initial_azimuth_deg: float
initial_altitude_ft: float
trajectory: List[Waypoint] = field(default_factory=list)
active: bool = True
traceable: bool = True
# --- Current simulation state (dynamic) ---
current_range_nm: float = field(init=False)
current_azimuth_deg: float = field(init=False)
current_velocity_fps: float = field(init=False)
current_heading_deg: float = field(init=False)
current_altitude_ft: float = field(init=False)
# Internal Cartesian coordinates for physics simulation
_pos_x_ft: float = field(init=False, repr=False)
_pos_y_ft: float = field(init=False, repr=False)
# --- Simulation progression tracking ---
_sim_time_s: float = field(init=False, default=0.0)
_current_waypoint_index: int = field(init=False, default=-1)
_time_in_waypoint_s: float = field(init=False, default=0.0)
def __post_init__(self):
if not (MIN_TARGET_ID <= self.target_id <= MAX_TARGET_ID):
raise ValueError(f"Target ID must be between {MIN_TARGET_ID} and {MAX_TARGET_ID}.")
self.reset_simulation()
def reset_simulation(self):
"""Resets the target to its initial state to start a new simulation run."""
self.current_range_nm = self.initial_range_nm
self.current_azimuth_deg = self.initial_azimuth_deg
self.current_altitude_ft = self.initial_altitude_ft
# Convert initial polar coordinates to Cartesian for physics
initial_range_ft = self.initial_range_nm * NM_TO_FT
initial_azimuth_rad = math.radians(self.initial_azimuth_deg)
self._pos_x_ft = initial_range_ft * math.sin(initial_azimuth_rad)
self._pos_y_ft = initial_range_ft * math.cos(initial_azimuth_rad)
self._sim_time_s = 0.0
self._current_waypoint_index = -1
self._time_in_waypoint_s = 0.0
# Initialize velocity and heading from the first waypoint if it exists
if self.trajectory:
self.current_velocity_fps = self.trajectory[0].target_velocity_fps
self.current_heading_deg = self.trajectory[0].target_heading_deg
self._activate_next_waypoint()
else:
self.current_velocity_fps = 0.0
self.current_heading_deg = 0.0
def _activate_next_waypoint(self):
"""Activates the next waypoint in the trajectory list."""
self._current_waypoint_index += 1
if self._current_waypoint_index < len(self.trajectory):
wp = self.trajectory[self._current_waypoint_index]
wp._start_velocity_fps = self.current_velocity_fps
wp._start_heading_deg = self.current_heading_deg
self._time_in_waypoint_s = 0.0
else:
# No more waypoints, continue with last state
self._current_waypoint_index = -1 # Sentinel for "finished"
def update_state(self, delta_time_s: float):
"""Updates the target's state and position based on its trajectory."""
if not self.active or not self.trajectory or self._current_waypoint_index == -1:
return # Target is static if inactive or has no trajectory
self._sim_time_s += delta_time_s
self._time_in_waypoint_s += delta_time_s
wp = self.trajectory[self._current_waypoint_index]
# Check if current waypoint is complete
if self._time_in_waypoint_s >= wp.duration_s:
self.current_velocity_fps = wp.target_velocity_fps
self.current_heading_deg = wp.target_heading_deg
self._activate_next_waypoint()
# If there's a new waypoint, update based on its new state for the remainder of the time slice
if self._current_waypoint_index != -1:
wp = self.trajectory[self._current_waypoint_index]
else: # Trajectory finished
pass # The target will continue straight with its last velocity/heading
# --- Interpolate current heading and velocity ---
if self._current_waypoint_index != -1 and wp.duration_s > 0:
progress = self._time_in_waypoint_s / wp.duration_s
# Linear interpolation for velocity
self.current_velocity_fps = wp._start_velocity_fps + (wp.target_velocity_fps - wp._start_velocity_fps) * progress
# Interpolate heading (shortest path)
delta_heading = (wp.target_heading_deg - wp._start_heading_deg + 180) % 360 - 180
self.current_heading_deg = (wp._start_heading_deg + delta_heading * progress) % 360
# --- Update position based on current kinematics ---
heading_rad = math.radians(self.current_heading_deg)
distance_ft = self.current_velocity_fps * delta_time_s
self._pos_x_ft += distance_ft * math.sin(heading_rad)
self._pos_y_ft += distance_ft * math.cos(heading_rad)
# --- Convert Cartesian back to Polar for display/reporting ---
self.current_range_nm = math.sqrt(self._pos_x_ft**2 + self._pos_y_ft**2) / NM_TO_FT
self.current_azimuth_deg = math.degrees(math.atan2(self._pos_x_ft, self._pos_y_ft))
def to_dict(self) -> Dict:
"""Serializes the Target object to a dictionary for saving."""
return {
"target_id": self.target_id,
"initial_range_nm": self.initial_range_nm,
"initial_azimuth_deg": self.initial_azimuth_deg,
"initial_altitude_ft": self.initial_altitude_ft,
"active": self.active,
"traceable": self.traceable,
"trajectory": [wp.to_dict() for wp in self.trajectory]
}
class Scenario:
def __init__(self, name: str = "Untitled Scenario"):
self.name = name
self.targets: Dict[int, Target] = {}
def add_target(self, target: Target):
self.targets[target.target_id] = target
def get_target(self, target_id: int) -> Optional[Target]:
return self.targets.get(target_id)
def remove_target(self, target_id: int):
if target_id in self.targets:
del self.targets[target_id]
def get_all_targets(self) -> List[Target]:
return list(self.targets.values())
def reset_simulation(self):
"""Resets the simulation state for all targets in the scenario."""
for target in self.targets.values():
target.reset_simulation()
def update_state(self, delta_time_s: float):
"""Updates the state of all targets in the scenario."""
for target in self.targets.values():
target.update_state(delta_time_s)
def to_dict(self) -> Dict:
return {
"name": self.name,
"targets": [target.to_dict() for target in self.get_all_targets()]
}
@classmethod
def from_dict(cls, data: Dict) -> Scenario:
name = data.get("name", "Loaded Scenario")
scenario = cls(name=name)
for target_data in data.get("targets", []):
try:
# --- Compatibility for old format ---
if "initial_range_nm" not in target_data and "range_nm" in target_data:
target_data["initial_range_nm"] = target_data.pop("range_nm")
if "initial_azimuth_deg" not in target_data and "azimuth_deg" in target_data:
target_data["initial_azimuth_deg"] = target_data.pop("azimuth_deg")
if "initial_altitude_ft" not in target_data and "altitude_ft" in target_data:
target_data["initial_altitude_ft"] = target_data.pop("altitude_ft")
# Reconstruct Waypoints
waypoints = [Waypoint(**wp_data) for wp_data in target_data.get("trajectory", [])]
# --- Compatibility for old format (trajectory) ---
if not waypoints and "velocity_fps" in target_data:
waypoints.append(Waypoint(
duration_s=3600, # Long duration
target_velocity_fps=target_data.get("velocity_fps", 0),
target_heading_deg=target_data.get("heading_deg", 0)
))
# Create Target
target = Target(
target_id=target_data["target_id"],
initial_range_nm=target_data["initial_range_nm"],
initial_azimuth_deg=target_data["initial_azimuth_deg"],
initial_altitude_ft=target_data["initial_altitude_ft"],
active=target_data.get("active", True),
traceable=target_data.get("traceable", True),
trajectory=waypoints
)
scenario.add_target(target)
except (KeyError, TypeError) as e:
print(f"Skipping invalid target data: {target_data}. Error: {e}")
return scenario