436 lines
18 KiB
Python
436 lines
18 KiB
Python
# target_simulator/core/models.py
|
|
"""
|
|
Defines the core data models for the application, including 3D dynamic Targets
|
|
with programmable trajectories and Scenarios.
|
|
"""
|
|
from __future__ import annotations
|
|
import math
|
|
from enum import Enum
|
|
from dataclasses import dataclass, field, fields
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
# --- Constants ---
|
|
MIN_TARGET_ID = 0
|
|
MAX_TARGET_ID = 15
|
|
KNOTS_TO_FPS = 1.68781
|
|
FPS_TO_KNOTS = 1 / KNOTS_TO_FPS
|
|
NM_TO_FT = 6076.12
|
|
G_IN_FPS2 = 32.174
|
|
|
|
|
|
class ManeuverType(Enum):
|
|
FLY_TO_POINT = "Fly to Point"
|
|
FLY_FOR_DURATION = "Fly for Duration"
|
|
DYNAMIC_MANEUVER = "Dynamic Maneuver"
|
|
|
|
|
|
class TurnDirection(Enum):
|
|
LEFT = "Left"
|
|
RIGHT = "Right"
|
|
|
|
|
|
@dataclass
|
|
class Waypoint:
|
|
maneuver_type: ManeuverType = ManeuverType.FLY_FOR_DURATION
|
|
duration_s: Optional[float] = 10.0
|
|
target_range_nm: Optional[float] = None
|
|
target_azimuth_deg: Optional[float] = None
|
|
target_altitude_ft: Optional[float] = None
|
|
target_velocity_fps: Optional[float] = None
|
|
target_heading_deg: Optional[float] = None
|
|
maneuver_speed_fps: Optional[float] = None
|
|
longitudinal_acceleration_g: Optional[float] = 0.0
|
|
lateral_acceleration_g: Optional[float] = 0.0
|
|
vertical_acceleration_g: Optional[float] = 0.0
|
|
turn_direction: Optional[TurnDirection] = TurnDirection.RIGHT
|
|
|
|
def to_dict(self) -> Dict:
|
|
data = {"maneuver_type": self.maneuver_type.value}
|
|
for f in fields(self):
|
|
if not f.name.startswith("_") and f.name != "maneuver_type":
|
|
val = getattr(self, f.name)
|
|
if isinstance(val, Enum):
|
|
data[f.name] = val.value
|
|
elif val is not None:
|
|
data[f.name] = val
|
|
return data
|
|
|
|
|
|
@dataclass
|
|
class Target:
|
|
target_id: int
|
|
trajectory: List[Waypoint] = field(default_factory=list)
|
|
active: bool = True
|
|
traceable: bool = True
|
|
restart: bool = False
|
|
use_spline: bool = False
|
|
current_range_nm: float = field(init=False, default=0.0)
|
|
current_azimuth_deg: float = field(init=False, default=0.0)
|
|
current_altitude_ft: float = field(init=False, default=0.0)
|
|
current_velocity_fps: float = field(init=False, default=0.0)
|
|
current_heading_deg: float = field(init=False, default=0.0)
|
|
current_pitch_deg: float = field(init=False, default=0.0)
|
|
_pos_x_ft: float = field(init=False, repr=False, default=0.0)
|
|
_pos_y_ft: float = field(init=False, repr=False, default=0.0)
|
|
_pos_z_ft: float = field(init=False, repr=False, default=0.0)
|
|
_sim_time_s: float = field(init=False, default=0.0)
|
|
_total_duration_s: float = field(init=False, default=0.0)
|
|
_path: List[Tuple[float, float, float, float]] = field(
|
|
init=False, repr=False, default_factory=list
|
|
)
|
|
|
|
def __post_init__(self):
|
|
if not (MIN_TARGET_ID <= self.target_id <= MAX_TARGET_ID):
|
|
raise ValueError(
|
|
f"Target ID must be between {MIN_TARGET_ID} and {MAX_TARGET_ID}."
|
|
)
|
|
self.reset_simulation()
|
|
|
|
def reset_simulation(self):
|
|
self._sim_time_s = 0.0
|
|
self._generate_path()
|
|
if self._path:
|
|
initial_pos = self._path[0]
|
|
self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = (
|
|
initial_pos[1],
|
|
initial_pos[2],
|
|
initial_pos[3],
|
|
)
|
|
if len(self._path) > 1:
|
|
next_pos = self._path[1]
|
|
dx, dy, dz = (
|
|
next_pos[1] - self._pos_x_ft,
|
|
next_pos[2] - self._pos_y_ft,
|
|
next_pos[3] - self._pos_z_ft,
|
|
)
|
|
dt = next_pos[0] - initial_pos[0]
|
|
dist_3d, dist_2d = math.sqrt(dx**2 + dy**2 + dz**2), math.sqrt(
|
|
dx**2 + dy**2
|
|
)
|
|
# Compute heading using atan2(dy, dx) so heading degrees correspond
|
|
# to azimuth convention where 0 is North and positive is left (CCW).
|
|
# Restore original heading calculation (atan2(dx, dy)) to match
|
|
# the path generation convention (x = R*sin(az), y = R*cos(az)).
|
|
try:
|
|
self.current_heading_deg = (
|
|
math.degrees(math.atan2(dx, dy)) % 360 if dist_2d > 0.1 else 0.0
|
|
)
|
|
except Exception:
|
|
self.current_heading_deg = 0.0
|
|
self.current_pitch_deg = (
|
|
math.degrees(math.atan2(dz, dist_2d)) if dist_2d > 0.1 else 0.0
|
|
)
|
|
self.current_velocity_fps = dist_3d / dt if dt > 0 else 0.0
|
|
else:
|
|
(
|
|
self.current_heading_deg,
|
|
self.current_pitch_deg,
|
|
self.current_velocity_fps,
|
|
) = (0.0, 0.0, 0.0)
|
|
else:
|
|
self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = 0.0, 0.0, 0.0
|
|
(
|
|
self.current_velocity_fps,
|
|
self.current_heading_deg,
|
|
self.current_pitch_deg,
|
|
) = (0.0, 0.0, 0.0)
|
|
self._update_current_polar_coords()
|
|
self.active = bool(self.trajectory)
|
|
|
|
def _generate_path(self):
|
|
self._path, self._total_duration_s = Target.generate_path_from_waypoints(
|
|
self.trajectory, self.use_spline
|
|
)
|
|
|
|
@staticmethod
|
|
def generate_path_from_waypoints(
|
|
waypoints: List[Waypoint], use_spline: bool
|
|
) -> Tuple[List[Tuple[float, ...]], float]:
|
|
if not waypoints or waypoints[0].maneuver_type != ManeuverType.FLY_TO_POINT:
|
|
return [], 0.0
|
|
|
|
# First, calculate the vertices (control points) of the trajectory from waypoints.
|
|
vertices: List[Tuple[float, float, float, float]] = []
|
|
total_duration_s = 0.0
|
|
first_wp = waypoints[0]
|
|
# Convert polar (range, azimuth) to Cartesian (x East, y North)
|
|
pos_ft = [
|
|
(first_wp.target_range_nm or 0.0) * NM_TO_FT * math.sin(math.radians(first_wp.target_azimuth_deg or 0.0)),
|
|
(first_wp.target_range_nm or 0.0) * NM_TO_FT * math.cos(math.radians(first_wp.target_azimuth_deg or 0.0)),
|
|
first_wp.target_altitude_ft or 0.0,
|
|
]
|
|
speed_fps = first_wp.target_velocity_fps or 0.0
|
|
heading_rad = math.radians(first_wp.target_heading_deg or 0.0)
|
|
pitch_rad = 0.0
|
|
|
|
vertices.append((total_duration_s, pos_ft[0], pos_ft[1], pos_ft[2]))
|
|
|
|
for i, wp in enumerate(waypoints):
|
|
if i == 0:
|
|
continue
|
|
|
|
duration = wp.duration_s or 0.0
|
|
|
|
if wp.maneuver_type == ManeuverType.FLY_TO_POINT:
|
|
# Convert polar waypoint to Cartesian using same convention
|
|
pos_ft = [
|
|
(wp.target_range_nm or 0.0) * NM_TO_FT * math.sin(math.radians(wp.target_azimuth_deg or 0.0)),
|
|
(wp.target_range_nm or 0.0) * NM_TO_FT * math.cos(math.radians(wp.target_azimuth_deg or 0.0)),
|
|
wp.target_altitude_ft or pos_ft[2],
|
|
]
|
|
total_duration_s += duration
|
|
if wp.target_velocity_fps is not None and wp.target_heading_deg is not None:
|
|
speed_fps = wp.target_velocity_fps
|
|
heading_rad = math.radians(wp.target_heading_deg)
|
|
|
|
elif wp.maneuver_type == ManeuverType.FLY_FOR_DURATION:
|
|
speed_fps = wp.target_velocity_fps if wp.target_velocity_fps is not None else speed_fps
|
|
heading_rad = math.radians(wp.target_heading_deg) if wp.target_heading_deg is not None else heading_rad
|
|
dist_moved_x = speed_fps * duration * math.sin(heading_rad)
|
|
dist_moved_y = speed_fps * duration * math.cos(heading_rad)
|
|
pos_ft[0] += dist_moved_x
|
|
pos_ft[1] += dist_moved_y
|
|
if wp.target_altitude_ft is not None:
|
|
pos_ft[2] = wp.target_altitude_ft
|
|
total_duration_s += duration
|
|
|
|
elif wp.maneuver_type == ManeuverType.DYNAMIC_MANEUVER:
|
|
if wp.maneuver_speed_fps is not None:
|
|
speed_fps = wp.maneuver_speed_fps
|
|
# Sample the dynamic maneuver at a fine time resolution so the
|
|
# generated vertices capture the curved path.
|
|
time_step = 0.1
|
|
num_steps = max(1, int(duration / time_step))
|
|
accel_lon_fps2 = (wp.longitudinal_acceleration_g or 0.0) * G_IN_FPS2
|
|
accel_lat_fps2 = (wp.lateral_acceleration_g or 0.0) * G_IN_FPS2
|
|
accel_ver_fps2 = (wp.vertical_acceleration_g or 0.0) * G_IN_FPS2
|
|
dir_multiplier = 1.0 if wp.turn_direction == TurnDirection.RIGHT else -1.0
|
|
|
|
# start from the current accumulated time
|
|
step_time = total_duration_s
|
|
for _step in range(num_steps):
|
|
# integrate kinematics for this time step
|
|
speed_fps += accel_lon_fps2 * time_step
|
|
pitch_change_rad = (accel_ver_fps2 / (speed_fps + 1e-6)) * time_step if abs(speed_fps) > 0.1 else 0.0
|
|
pitch_rad += pitch_change_rad
|
|
turn_rate_rad_s = accel_lat_fps2 / (speed_fps + 1e-6) if abs(speed_fps) > 0.1 else 0.0
|
|
heading_rad += turn_rate_rad_s * time_step * dir_multiplier
|
|
dist_step = speed_fps * time_step
|
|
dist_step_xy = dist_step * math.cos(pitch_rad)
|
|
dist_step_z = dist_step * math.sin(pitch_rad)
|
|
pos_ft[0] += dist_step_xy * math.sin(heading_rad)
|
|
pos_ft[1] += dist_step_xy * math.cos(heading_rad)
|
|
pos_ft[2] += dist_step_z
|
|
|
|
# advance time and append an intermediate vertex
|
|
step_time += time_step
|
|
vertices.append((step_time, pos_ft[0], pos_ft[1], pos_ft[2]))
|
|
|
|
# update total_duration to the end of the maneuver
|
|
total_duration_s = step_time
|
|
|
|
# Append the vertex corresponding to the end of this waypoint
|
|
vertices.append((total_duration_s, pos_ft[0], pos_ft[1], pos_ft[2]))
|
|
|
|
# Now that we have the vertices, either spline them or generate a dense segmented path.
|
|
# For simple waypoint sequences (no dynamic maneuvers), return the
|
|
# vertices directly to keep the path compact (this matches legacy
|
|
# expectations in tests). Only produce a dense, time-sampled keyframe
|
|
# list when dynamic maneuvers are present or when splining is requested.
|
|
if use_spline and len(vertices) >= 4:
|
|
from target_simulator.utils.spline import catmull_rom_spline
|
|
|
|
points_to_spline = [p[1:] for p in vertices]
|
|
num_spline_points = max(len(vertices) * 20, 200)
|
|
|
|
splined_points = catmull_rom_spline(points_to_spline, num_points=num_spline_points)
|
|
|
|
final_path = []
|
|
final_duration = vertices[-1][0]
|
|
for i, point in enumerate(splined_points):
|
|
time_val = (i / (len(splined_points) - 1)) * final_duration if len(splined_points) > 1 else 0.0
|
|
final_path.append((time_val, point[0], point[1], point[2]))
|
|
return final_path, final_duration
|
|
|
|
# Detect whether any dynamic maneuver exists
|
|
has_dynamic = any(wp.maneuver_type == ManeuverType.DYNAMIC_MANEUVER for wp in waypoints)
|
|
|
|
if not has_dynamic:
|
|
# No dynamic maneuvers: return the compact vertex list (start/end)
|
|
return vertices, total_duration_s
|
|
|
|
# Otherwise, generate the dense, segmented path for simulation.
|
|
keyframes: List[Tuple[float, float, float, float]] = []
|
|
if not vertices:
|
|
return [], 0.0
|
|
|
|
keyframes.append(vertices[0])
|
|
for i in range(len(vertices) - 1):
|
|
start_v = vertices[i]
|
|
end_v = vertices[i + 1]
|
|
start_time, start_pos = start_v[0], list(start_v[1:])
|
|
end_time, end_pos = end_v[0], list(end_v[1:])
|
|
|
|
duration = end_time - start_time
|
|
if duration <= 0:
|
|
continue
|
|
|
|
time_step, num_steps = 0.1, max(1, int(duration / 0.1))
|
|
for step in range(1, num_steps + 1):
|
|
progress = step / num_steps
|
|
current_time = start_time + progress * duration
|
|
current_pos = [start_pos[j] + (end_pos[j] - start_pos[j]) * progress for j in range(3)]
|
|
keyframes.append((current_time, current_pos[0], current_pos[1], current_pos[2]))
|
|
|
|
final_duration = vertices[-1][0]
|
|
return keyframes, final_duration
|
|
|
|
def update_state(self, delta_time_s: float):
|
|
if not self.active or not self._path:
|
|
return
|
|
self._sim_time_s += delta_time_s
|
|
current_sim_time = min(self._sim_time_s, self._total_duration_s)
|
|
if current_sim_time >= self._total_duration_s:
|
|
final_pos = self._path[-1]
|
|
self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = (
|
|
final_pos[1],
|
|
final_pos[2],
|
|
final_pos[3],
|
|
)
|
|
self.current_velocity_fps = 0.0
|
|
if self._sim_time_s >= self._total_duration_s:
|
|
self.active = False
|
|
else:
|
|
p1, p2 = self._path[0], self._path[-1]
|
|
for i in range(len(self._path) - 1):
|
|
if self._path[i][0] <= current_sim_time <= self._path[i + 1][0]:
|
|
p1, p2 = self._path[i], self._path[i + 1]
|
|
break
|
|
segment_duration = p2[0] - p1[0]
|
|
progress = (
|
|
(current_sim_time - p1[0]) / segment_duration
|
|
if segment_duration > 0
|
|
else 1.0
|
|
)
|
|
prev_x, prev_y, prev_z = self._pos_x_ft, self._pos_y_ft, self._pos_z_ft
|
|
self._pos_x_ft = p1[1] + (p2[1] - p1[1]) * progress
|
|
self._pos_y_ft = p1[2] + (p2[2] - p1[2]) * progress
|
|
self._pos_z_ft = p1[3] + (p2[3] - p1[3]) * progress
|
|
dx, dy, dz = (
|
|
self._pos_x_ft - prev_x,
|
|
self._pos_y_ft - prev_y,
|
|
self._pos_z_ft - prev_z,
|
|
)
|
|
dist_3d, dist_2d = math.sqrt(dx**2 + dy**2 + dz**2), math.sqrt(
|
|
dx**2 + dy**2
|
|
)
|
|
if delta_time_s > 0:
|
|
self.current_velocity_fps = dist_3d / delta_time_s
|
|
if dist_2d > 0.1:
|
|
# Restore original update_state heading computation using atan2(dx, dy).
|
|
try:
|
|
self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360
|
|
except Exception:
|
|
self.current_heading_deg = 0.0
|
|
self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d))
|
|
self._update_current_polar_coords()
|
|
|
|
def _update_current_polar_coords(self):
|
|
"""
|
|
Calculates and updates the target's current polar coordinates from its
|
|
Cartesian position. Normalizes azimuth to the range [-180, 180].
|
|
"""
|
|
self.current_range_nm = (
|
|
math.sqrt(self._pos_x_ft**2 + self._pos_y_ft**2) / NM_TO_FT
|
|
)
|
|
|
|
# Calculate azimuth in degrees
|
|
azimuth_deg = math.degrees(math.atan2(self._pos_x_ft, self._pos_y_ft))
|
|
|
|
# Normalize angle to [-180, 180]
|
|
while azimuth_deg > 180:
|
|
azimuth_deg -= 360
|
|
while azimuth_deg < -180:
|
|
azimuth_deg += 360
|
|
|
|
self.current_azimuth_deg = azimuth_deg
|
|
self.current_altitude_ft = self._pos_z_ft
|
|
|
|
def to_dict(self) -> Dict:
|
|
return {
|
|
"target_id": self.target_id,
|
|
"active": self.active,
|
|
"traceable": self.traceable,
|
|
"trajectory": [wp.to_dict() for wp in self.trajectory],
|
|
"use_spline": self.use_spline,
|
|
}
|
|
|
|
|
|
class Scenario:
|
|
def __init__(self, name: str = "Untitled Scenario"):
|
|
self.name = name
|
|
self.targets: Dict[int, Target] = {}
|
|
|
|
def add_target(self, target: Target):
|
|
self.targets[target.target_id] = target
|
|
|
|
def get_target(self, target_id: int) -> Optional[Target]:
|
|
return self.targets.get(target_id)
|
|
|
|
def remove_target(self, target_id: int):
|
|
if target_id in self.targets:
|
|
del self.targets[target_id]
|
|
|
|
def get_all_targets(self) -> List[Target]:
|
|
return list(self.targets.values())
|
|
|
|
def reset_simulation(self):
|
|
for target in self.targets.values():
|
|
target.reset_simulation()
|
|
|
|
def update_state(self, delta_time_s: float):
|
|
for target in self.targets.values():
|
|
target.update_state(delta_time_s)
|
|
|
|
def is_finished(self) -> bool:
|
|
return all(not target.active for target in self.targets.values())
|
|
|
|
def to_dict(self) -> Dict:
|
|
return {
|
|
"name": self.name,
|
|
"targets": [t.to_dict() for t in self.get_all_targets()],
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict) -> Scenario:
|
|
scenario = cls(name=data.get("name", "Loaded Scenario"))
|
|
for target_data in data.get("targets", []):
|
|
try:
|
|
waypoints = []
|
|
for wp_data in target_data.get("trajectory", []):
|
|
if wp_data.get("maneuver_type") == "Constant Turn":
|
|
wp_data["maneuver_type"] = "Dynamic Maneuver"
|
|
wp_data["longitudinal_acceleration_g"] = 0.0
|
|
wp_data["vertical_acceleration_g"] = 0.0
|
|
wp_data["maneuver_type"] = ManeuverType(wp_data["maneuver_type"])
|
|
if "turn_direction" in wp_data and wp_data["turn_direction"]:
|
|
wp_data["turn_direction"] = TurnDirection(
|
|
wp_data["turn_direction"]
|
|
)
|
|
valid_keys = {f.name for f in fields(Waypoint)}
|
|
filtered_wp_data = {
|
|
k: v for k, v in wp_data.items() if k in valid_keys
|
|
}
|
|
waypoints.append(Waypoint(**filtered_wp_data))
|
|
target = Target(
|
|
target_id=target_data["target_id"],
|
|
active=target_data.get("active", True),
|
|
traceable=target_data.get("traceable", True),
|
|
trajectory=waypoints,
|
|
use_spline=target_data.get("use_spline", False),
|
|
)
|
|
scenario.add_target(target)
|
|
except Exception as e:
|
|
print(f"Skipping invalid target data: {target_data}. Error: {e}")
|
|
return scenario
|