350 lines
16 KiB
Python
350 lines
16 KiB
Python
# target_simulator/core/models.py
|
|
"""
|
|
Defines the core data models for the application, including 3D dynamic Targets
|
|
with programmable trajectories and Scenarios.
|
|
"""
|
|
from __future__ import annotations
|
|
import math
|
|
from enum import Enum
|
|
from dataclasses import dataclass, field, fields
|
|
from typing import Dict, List, Optional
|
|
|
|
# --- Constants ---
|
|
MIN_TARGET_ID = 0
|
|
MAX_TARGET_ID = 15
|
|
KNOTS_TO_FPS = 1.68781
|
|
FPS_TO_KNOTS = 1 / KNOTS_TO_FPS
|
|
NM_TO_FT = 6076.12
|
|
|
|
class ManeuverType(Enum):
|
|
FLY_TO_POINT = "Fly to Point"
|
|
FLY_FOR_DURATION = "Fly for Duration"
|
|
|
|
@dataclass
|
|
class Waypoint:
|
|
"""Represents a segment of a target's trajectory, defining a maneuver."""
|
|
maneuver_type: ManeuverType = ManeuverType.FLY_FOR_DURATION
|
|
|
|
# Parameters used by FLY_FOR_DURATION and as final state for FLY_TO_POINT
|
|
target_velocity_fps: Optional[float] = None
|
|
target_heading_deg: Optional[float] = None
|
|
|
|
# Parameters used by both
|
|
duration_s: Optional[float] = 10.0
|
|
target_altitude_ft: Optional[float] = 10000.0
|
|
|
|
# Parameters used by FLY_TO_POINT
|
|
target_range_nm: Optional[float] = None
|
|
target_azimuth_deg: Optional[float] = None
|
|
|
|
# Internal state for interpolation, set when waypoint becomes active
|
|
_start_velocity_fps: float = field(init=False, repr=False, default=0.0)
|
|
_start_heading_deg: float = field(init=False, repr=False, default=0.0)
|
|
_start_altitude_ft: float = field(init=False, repr=False, default=0.0)
|
|
_calculated_duration_s: Optional[float] = field(init=False, default=None, repr=False)
|
|
|
|
def to_dict(self) -> Dict:
|
|
data = {"maneuver_type": self.maneuver_type.value}
|
|
# Use dataclasses.fields to iterate reliably
|
|
for f in fields(self):
|
|
if not f.name.startswith('_') and f.name != "maneuver_type":
|
|
val = getattr(self, f.name)
|
|
if val is not None:
|
|
data[f.name] = val
|
|
return data
|
|
|
|
@dataclass
|
|
class Target:
|
|
_spline_path: Optional[List[List[float]]] = field(init=False, default=None, repr=False)
|
|
"""Represents a dynamic 3D target with a programmable trajectory."""
|
|
target_id: int
|
|
trajectory: List[Waypoint] = field(default_factory=list)
|
|
active: bool = True
|
|
traceable: bool = True
|
|
use_spline: bool = False # Se True, la traiettoria viene interpretata come spline
|
|
|
|
# --- Current simulation state (dynamic) ---
|
|
current_range_nm: float = field(init=False, default=0.0)
|
|
current_azimuth_deg: float = field(init=False, default=0.0)
|
|
current_altitude_ft: float = field(init=False, default=0.0)
|
|
current_velocity_fps: float = field(init=False, default=0.0)
|
|
current_heading_deg: float = field(init=False, default=0.0)
|
|
current_pitch_deg: float = field(init=False, default=0.0)
|
|
|
|
_pos_x_ft: float = field(init=False, repr=False, default=0.0)
|
|
_pos_y_ft: float = field(init=False, repr=False, default=0.0)
|
|
_pos_z_ft: float = field(init=False, repr=False, default=0.0)
|
|
|
|
_sim_time_s: float = field(init=False, default=0.0)
|
|
_current_waypoint_index: int = field(init=False, default=-1)
|
|
_time_in_waypoint_s: float = field(init=False, default=0.0)
|
|
|
|
def __post_init__(self):
|
|
if not (MIN_TARGET_ID <= self.target_id <= MAX_TARGET_ID):
|
|
raise ValueError(f"Target ID must be between {MIN_TARGET_ID} and {MAX_TARGET_ID}.")
|
|
self.reset_simulation()
|
|
|
|
def reset_simulation(self):
|
|
self._sim_time_s = 0.0
|
|
self._time_in_waypoint_s = 0.0
|
|
self._current_waypoint_index = -1
|
|
|
|
if self.use_spline and self.trajectory:
|
|
from target_simulator.utils.spline import catmull_rom_spline
|
|
|
|
# Convert all waypoints to cartesian points in feet
|
|
pts = []
|
|
for wp in self.trajectory:
|
|
if wp.maneuver_type == ManeuverType.FLY_TO_POINT and wp.target_range_nm is not None and wp.target_azimuth_deg is not None:
|
|
range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT
|
|
az_rad = math.radians(wp.target_azimuth_deg or 0.0)
|
|
x = range_ft * math.sin(az_rad)
|
|
y = range_ft * math.cos(az_rad)
|
|
z = wp.target_altitude_ft or 0.0
|
|
pts.append([x, y, z])
|
|
|
|
if not pts:
|
|
self._pos_x_ft = self._pos_y_ft = self._pos_z_ft = 0.0
|
|
self.current_velocity_fps = 0.0
|
|
self.current_heading_deg = 0.0
|
|
self._spline_path = None
|
|
return
|
|
|
|
# Generate spline path from points in feet
|
|
self._spline_path = catmull_rom_spline(pts, num_points=200)
|
|
self._spline_index = 0
|
|
|
|
# Set initial position from the first point of the spline
|
|
initial_pos = self._spline_path[0]
|
|
self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = initial_pos[0], initial_pos[1], initial_pos[2]
|
|
|
|
# Calculate total duration from waypoints
|
|
total_duration = sum(wp.duration_s or 0.0 for wp in self.trajectory if wp.duration_s)
|
|
|
|
# Create a time mapping for each point in the spline path
|
|
self._spline_point_times = [i * (total_duration / (len(self._spline_path) - 1)) for i in range(len(self._spline_path))]
|
|
if not self._spline_point_times: self._spline_point_times = [0.0]
|
|
|
|
self._update_current_polar_coords()
|
|
|
|
else:
|
|
# Waypoint-based simulation reset
|
|
if not self.trajectory or self.trajectory[0].maneuver_type != ManeuverType.FLY_TO_POINT:
|
|
self._pos_x_ft = self._pos_y_ft = self._pos_z_ft = 0.0
|
|
self.current_velocity_fps = 0.0
|
|
self.current_heading_deg = 0.0
|
|
self.current_pitch_deg = 0.0
|
|
else:
|
|
first_wp = self.trajectory[0]
|
|
self.current_range_nm = first_wp.target_range_nm or 0.0
|
|
self.current_azimuth_deg = first_wp.target_azimuth_deg or 0.0
|
|
self.current_altitude_ft = first_wp.target_altitude_ft or 0.0
|
|
self.current_velocity_fps = first_wp.target_velocity_fps or 0.0
|
|
self.current_heading_deg = first_wp.target_heading_deg or 0.0
|
|
self.current_pitch_deg = 0.0
|
|
|
|
range_ft = self.current_range_nm * NM_TO_FT
|
|
az_rad = math.radians(self.current_azimuth_deg)
|
|
self._pos_x_ft = range_ft * math.sin(az_rad)
|
|
self._pos_y_ft = range_ft * math.cos(az_rad)
|
|
self._pos_z_ft = self.current_altitude_ft
|
|
|
|
self._activate_next_waypoint()
|
|
|
|
self._update_current_polar_coords()
|
|
|
|
def _activate_next_waypoint(self):
|
|
"""Pre-calculates and activates the next waypoint in the trajectory."""
|
|
self._current_waypoint_index += 1
|
|
self._time_in_waypoint_s = 0.0
|
|
|
|
if self._current_waypoint_index >= len(self.trajectory):
|
|
self._current_waypoint_index = -1; return
|
|
|
|
wp = self.trajectory[self._current_waypoint_index]
|
|
wp._start_velocity_fps = self.current_velocity_fps
|
|
wp._start_heading_deg = self.current_heading_deg
|
|
wp._start_altitude_ft = self.current_altitude_ft
|
|
|
|
if wp.maneuver_type == ManeuverType.FLY_TO_POINT:
|
|
target_range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT
|
|
target_az_rad = math.radians(wp.target_azimuth_deg or 0.0)
|
|
target_x = target_range_ft * math.sin(target_az_rad)
|
|
target_y = target_range_ft * math.cos(target_az_rad)
|
|
target_z = wp.target_altitude_ft or self._pos_z_ft
|
|
|
|
dx, dy, dz = target_x - self._pos_x_ft, target_y - self._pos_y_ft, target_z - self._pos_z_ft
|
|
dist_3d = math.sqrt(dx**2 + dy**2 + dz**2)
|
|
dist_2d = math.sqrt(dx**2 + dy**2)
|
|
|
|
self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360
|
|
self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d)) if dist_2d > 1 else 90.0 if dz > 0 else -90.0
|
|
|
|
if wp.duration_s and wp.duration_s > 0:
|
|
self.current_velocity_fps = dist_3d / wp.duration_s
|
|
wp._calculated_duration_s = wp.duration_s
|
|
else:
|
|
self.current_velocity_fps = 0
|
|
wp._calculated_duration_s = 999999
|
|
else:
|
|
self.current_pitch_deg = 0
|
|
|
|
def update_state(self, delta_time_s: float):
|
|
if not self.active:
|
|
return
|
|
|
|
if self.use_spline and self._spline_path:
|
|
self._sim_time_s += delta_time_s
|
|
|
|
total_duration = self._spline_point_times[-1]
|
|
if total_duration <= 0 or self._sim_time_s >= total_duration:
|
|
# Simulation finished, stay at the last point
|
|
self._spline_index = len(self._spline_path) - 1
|
|
self.active = False
|
|
self.current_velocity_fps = 0.0
|
|
else:
|
|
# Find the current index in the spline path based on time
|
|
progress = self._sim_time_s / total_duration
|
|
self._spline_index = int(progress * (len(self._spline_path) - 1))
|
|
|
|
pt = self._spline_path[self._spline_index]
|
|
|
|
# Determine previous point for calculating heading/pitch
|
|
prev_pt = self._spline_path[self._spline_index - 1] if self._spline_index > 0 else pt
|
|
|
|
# Update position (path is already in feet)
|
|
self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = pt[0], pt[1], pt[2]
|
|
|
|
# Update kinematics
|
|
dx = self._pos_x_ft - prev_pt[0]
|
|
dy = self._pos_y_ft - prev_pt[1]
|
|
dz = self._pos_z_ft - prev_pt[2]
|
|
|
|
dist_moved_since_last_tick = math.sqrt(dx**2 + dy**2 + dz**2)
|
|
dist_2d = math.sqrt(dx**2 + dy**2)
|
|
|
|
if dist_2d > 0.1: # Avoid division by zero or noise
|
|
self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360
|
|
self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d))
|
|
|
|
if delta_time_s > 0:
|
|
self.current_velocity_fps = dist_moved_since_last_tick / delta_time_s
|
|
|
|
self._update_current_polar_coords()
|
|
|
|
# --- DEBUG LOG ---
|
|
#import logging
|
|
#logging.getLogger("target_simulator.spline_debug").info(
|
|
# f"SplinePos: t={self._sim_time_s:.2f}s idx={self._spline_index} x={self._pos_x_ft:.2f} y={self._pos_y_ft:.2f} z={self._pos_z_ft:.2f}")
|
|
# --- DEBUG LOG ---
|
|
#import logging
|
|
#logging.getLogger("target_simulator.spline_debug").info(
|
|
# f"SplinePos: t={self._sim_time_s:.2f}s idx={self._spline_index} x={self._pos_x_ft:.2f} y={self._pos_y_ft:.2f} z={self._pos_z_ft:.2f}")
|
|
else:
|
|
# Logica classica
|
|
self._sim_time_s += delta_time_s
|
|
self._time_in_waypoint_s += delta_time_s
|
|
if self._current_waypoint_index == -1:
|
|
return
|
|
wp = self.trajectory[self._current_waypoint_index]
|
|
duration = wp._calculated_duration_s if wp._calculated_duration_s is not None else wp.duration_s or 0.0
|
|
if self._time_in_waypoint_s >= duration:
|
|
# Finalize state to match waypoint target
|
|
if wp.maneuver_type == ManeuverType.FLY_FOR_DURATION:
|
|
self.current_velocity_fps = wp.target_velocity_fps or self.current_velocity_fps
|
|
self.current_heading_deg = wp.target_heading_deg or self.current_heading_deg
|
|
self.current_altitude_ft = wp.target_altitude_ft or self.current_altitude_ft
|
|
self._activate_next_waypoint()
|
|
if self._current_waypoint_index == -1:
|
|
return
|
|
wp = self.trajectory[self._current_waypoint_index] # Get new active waypoint
|
|
duration = wp._calculated_duration_s if wp._calculated_duration_s is not None else wp.duration_s or 0.0
|
|
progress = self._time_in_waypoint_s / duration if duration > 0 else 1.0
|
|
if wp.maneuver_type == ManeuverType.FLY_FOR_DURATION:
|
|
# Interpolate kinematics during the maneuver
|
|
self.current_velocity_fps = wp._start_velocity_fps + ((wp.target_velocity_fps or wp._start_velocity_fps) - wp._start_velocity_fps) * progress
|
|
self.current_altitude_ft = wp._start_altitude_ft + ((wp.target_altitude_ft or wp._start_altitude_ft) - wp._start_altitude_ft) * progress
|
|
delta_heading = ((wp.target_heading_deg or wp._start_heading_deg) - wp._start_heading_deg + 180) % 360 - 180
|
|
self.current_heading_deg = (wp._start_heading_deg + delta_heading * progress) % 360
|
|
# Update position using current 3D vector
|
|
heading_rad = math.radians(self.current_heading_deg)
|
|
pitch_rad = math.radians(self.current_pitch_deg)
|
|
dist_moved = self.current_velocity_fps * delta_time_s
|
|
dist_2d = dist_moved * math.cos(pitch_rad)
|
|
self._pos_x_ft += dist_2d * math.sin(heading_rad)
|
|
self._pos_y_ft += dist_2d * math.cos(heading_rad)
|
|
self._pos_z_ft += dist_moved * math.sin(pitch_rad)
|
|
self._update_current_polar_coords()
|
|
|
|
def _update_current_polar_coords(self):
|
|
"""Recalculates current polar coordinates from Cartesian ones."""
|
|
self.current_range_nm = math.sqrt(self._pos_x_ft**2 + self._pos_y_ft**2) / NM_TO_FT
|
|
self.current_azimuth_deg = math.degrees(math.atan2(self._pos_x_ft, self._pos_y_ft))
|
|
self.current_altitude_ft = self._pos_z_ft
|
|
|
|
def to_dict(self) -> Dict:
|
|
data = {
|
|
"target_id": self.target_id,
|
|
"active": self.active,
|
|
"traceable": self.traceable,
|
|
"trajectory": [wp.to_dict() for wp in self.trajectory],
|
|
"use_spline": self.use_spline
|
|
}
|
|
return data
|
|
|
|
class Scenario:
|
|
def __init__(self, name: str = "Untitled Scenario"):
|
|
self.name = name
|
|
self.targets: Dict[int, Target] = {}
|
|
|
|
def add_target(self, target: Target):
|
|
self.targets[target.target_id] = target
|
|
|
|
def get_target(self, target_id: int) -> Optional[Target]:
|
|
return self.targets.get(target_id)
|
|
|
|
def remove_target(self, target_id: int):
|
|
if target_id in self.targets:
|
|
del self.targets[target_id]
|
|
|
|
def get_all_targets(self) -> List[Target]:
|
|
return list(self.targets.values())
|
|
|
|
def reset_simulation(self):
|
|
"""Resets the simulation state for all targets in the scenario."""
|
|
for target in self.targets.values():
|
|
target.reset_simulation()
|
|
|
|
def update_state(self, delta_time_s: float):
|
|
"""Updates the state of all targets in the scenario."""
|
|
for target in self.targets.values():
|
|
target.update_state(delta_time_s)
|
|
|
|
def to_dict(self) -> Dict:
|
|
return {
|
|
"name": self.name,
|
|
"targets": [target.to_dict() for target in self.get_all_targets()]
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict) -> Scenario:
|
|
name = data.get("name", "Loaded Scenario")
|
|
scenario = cls(name=name)
|
|
for target_data in data.get("targets", []):
|
|
try:
|
|
waypoints = []
|
|
for wp_data in target_data.get("trajectory", []):
|
|
wp_data["maneuver_type"] = ManeuverType(wp_data["maneuver_type"])
|
|
waypoints.append(Waypoint(**wp_data))
|
|
|
|
target = Target(
|
|
target_id=target_data["target_id"],
|
|
active=target_data.get("active", True),
|
|
traceable=target_data.get("traceable", True),
|
|
trajectory=waypoints,
|
|
use_spline=target_data.get("use_spline", False)
|
|
)
|
|
scenario.add_target(target)
|
|
except Exception as e:
|
|
print(f"Skipping invalid target data: {target_data}. Error: {e}")
|
|
return scenario |