S1005403_RisCC/target_simulator/core/models.py

333 lines
13 KiB
Python

# target_simulator/core/models.py
"""
Defines the core data models for the application, including 3D dynamic Targets
with programmable trajectories and Scenarios.
"""
from __future__ import annotations
import math
from enum import Enum
from dataclasses import dataclass, field, fields
from typing import Dict, List, Optional, Tuple
# --- Constants ---
MIN_TARGET_ID = 0
MAX_TARGET_ID = 15
KNOTS_TO_FPS = 1.68781
FPS_TO_KNOTS = 1 / KNOTS_TO_FPS
NM_TO_FT = 6076.12
class ManeuverType(Enum):
FLY_TO_POINT = "Fly to Point"
FLY_FOR_DURATION = "Fly for Duration"
@dataclass
class Waypoint:
"""Represents a segment of a target's trajectory, defining a maneuver."""
maneuver_type: ManeuverType = ManeuverType.FLY_FOR_DURATION
# Parameters used by both maneuver types
duration_s: Optional[float] = 10.0
target_altitude_ft: Optional[float] = 10000.0
# Final state parameters for FLY_TO_POINT and dynamic for FLY_FOR_DURATION
target_velocity_fps: Optional[float] = None
target_heading_deg: Optional[float] = None
# Position parameters for FLY_TO_POINT
target_range_nm: Optional[float] = None
target_azimuth_deg: Optional[float] = None
def to_dict(self) -> Dict:
"""Serializes the waypoint to a dictionary."""
data = {"maneuver_type": self.maneuver_type.value}
for f in fields(self):
# Exclude private fields and the already-handled maneuver_type
if not f.name.startswith('_') and f.name != "maneuver_type":
val = getattr(self, f.name)
if val is not None:
data[f.name] = val
return data
@dataclass
class Target:
"""Represents a dynamic 3D target with a programmable trajectory."""
target_id: int
trajectory: List[Waypoint] = field(default_factory=list)
active: bool = True
traceable: bool = True
use_spline: bool = False
# --- Current simulation state (dynamic) ---
current_range_nm: float = field(init=False, default=0.0)
current_azimuth_deg: float = field(init=False, default=0.0)
current_altitude_ft: float = field(init=False, default=0.0)
current_velocity_fps: float = field(init=False, default=0.0)
current_heading_deg: float = field(init=False, default=0.0)
current_pitch_deg: float = field(init=False, default=0.0)
# Internal Cartesian position in feet
_pos_x_ft: float = field(init=False, repr=False, default=0.0)
_pos_y_ft: float = field(init=False, repr=False, default=0.0)
_pos_z_ft: float = field(init=False, repr=False, default=0.0)
# Simulation time and pre-calculated path
_sim_time_s: float = field(init=False, default=0.0)
_total_duration_s: float = field(init=False, default=0.0)
# Path format: List of (timestamp_s, x_ft, y_ft, z_ft)
_path: List[Tuple[float, float, float, float]] = field(init=False, repr=False, default_factory=list)
def __post_init__(self):
"""Validates ID and initializes the simulation state."""
if not (MIN_TARGET_ID <= self.target_id <= MAX_TARGET_ID):
raise ValueError(f"Target ID must be between {MIN_TARGET_ID} and {MAX_TARGET_ID}.")
self.reset_simulation()
def reset_simulation(self):
"""
Resets the simulation time and pre-calculates the entire trajectory path
based on the waypoints. This is the core of the refactored logic.
"""
self._sim_time_s = 0.0
self._generate_path()
if self._path:
# Set initial state from the first point of the generated path
initial_pos = self._path[0]
self._pos_x_ft = initial_pos[1]
self._pos_y_ft = initial_pos[2]
self._pos_z_ft = initial_pos[3]
# Calculate initial heading and velocity from the first segment
if len(self._path) > 1:
next_pos = self._path[1]
dx = next_pos[1] - self._pos_x_ft
dy = next_pos[2] - self._pos_y_ft
dz = next_pos[3] - self._pos_z_ft
dt = next_pos[0] - initial_pos[0]
dist_3d = math.sqrt(dx**2 + dy**2 + dz**2)
dist_2d = math.sqrt(dx**2 + dy**2)
self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360 if dist_2d > 0.1 else 0.0
self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d)) if dist_2d > 0.1 else 0.0
self.current_velocity_fps = dist_3d / dt if dt > 0 else 0.0
else:
self.current_heading_deg = 0.0
self.current_pitch_deg = 0.0
self.current_velocity_fps = 0.0
else:
# No trajectory, reset to origin
self._pos_x_ft = self._pos_y_ft = self._pos_z_ft = 0.0
self.current_velocity_fps = 0.0
self.current_heading_deg = 0.0
self.current_pitch_deg = 0.0
self._update_current_polar_coords()
self.active = bool(self.trajectory) # A target is active if it has a trajectory
def _generate_path(self):
"""Instance method that uses the static path generator."""
self._path, self._total_duration_s = Target.generate_path_from_waypoints(self.trajectory, self.use_spline)
@staticmethod
def generate_path_from_waypoints(waypoints: List[Waypoint], use_spline: bool) -> Tuple[List[Tuple[float, ...]], float]:
"""
Generates a time-stamped, Cartesian path from a list of waypoints.
This is a static method so it can be used for previews without a Target instance.
"""
path = []
total_duration_s = 0.0
if not waypoints:
return path, total_duration_s
first_wp = waypoints[0]
if first_wp.maneuver_type != ManeuverType.FLY_TO_POINT:
return path, total_duration_s
keyframes = []
current_time = 0.0
range_ft = (first_wp.target_range_nm or 0.0) * NM_TO_FT
az_rad = math.radians(first_wp.target_azimuth_deg or 0.0)
current_x = range_ft * math.sin(az_rad)
current_y = range_ft * math.cos(az_rad)
current_z = first_wp.target_altitude_ft or 0.0
current_heading = first_wp.target_heading_deg or 0.0
current_velocity = first_wp.target_velocity_fps or 0.0
keyframes.append((current_time, current_x, current_y, current_z))
for i, wp in enumerate(waypoints):
if i == 0:
continue
duration = wp.duration_s or 0.0
if wp.maneuver_type == ManeuverType.FLY_TO_POINT:
target_range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT
target_az_rad = math.radians(wp.target_azimuth_deg or 0.0)
current_x = target_range_ft * math.sin(target_az_rad)
current_y = target_range_ft * math.cos(target_az_rad)
current_z = wp.target_altitude_ft or current_z
elif wp.maneuver_type == ManeuverType.FLY_FOR_DURATION:
velocity_to_use = wp.target_velocity_fps if wp.target_velocity_fps is not None else current_velocity
heading_to_use = wp.target_heading_deg if wp.target_heading_deg is not None else current_heading
heading_rad = math.radians(heading_to_use)
dist_moved = velocity_to_use * duration
current_x += dist_moved * math.sin(heading_rad)
current_y += dist_moved * math.cos(heading_rad)
current_z = wp.target_altitude_ft or current_z
current_heading = heading_to_use
current_velocity = velocity_to_use
total_duration_s += duration
keyframes.append((total_duration_s, current_x, current_y, current_z))
if len(keyframes) < 2:
return keyframes, total_duration_s
if use_spline and len(keyframes) >= 4:
from target_simulator.utils.spline import catmull_rom_spline
points_to_spline = [kf[1:] for kf in keyframes]
num_spline_points = int(total_duration_s * 10)
splined_points = catmull_rom_spline(points_to_spline, num_points=max(num_spline_points, 100))
for i, point in enumerate(splined_points):
time = (i / (len(splined_points) - 1)) * total_duration_s if len(splined_points) > 1 else 0.0
path.append((time, point[0], point[1], point[2]))
else:
path = keyframes
return path, total_duration_s
def update_state(self, delta_time_s: float):
# ... (questo metodo rimane invariato)
if not self.active or not self._path:
return
self._sim_time_s += delta_time_s
current_sim_time = min(self._sim_time_s, self._total_duration_s)
if current_sim_time >= self._total_duration_s:
final_pos = self._path[-1]
self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = final_pos[1], final_pos[2], final_pos[3]
self.current_velocity_fps = 0.0
if self._sim_time_s >= self._total_duration_s:
self.active = False
else:
p1 = self._path[0]
p2 = self._path[-1]
for i in range(len(self._path) - 1):
if self._path[i][0] <= current_sim_time <= self._path[i+1][0]:
p1 = self._path[i]
p2 = self._path[i+1]
break
segment_duration = p2[0] - p1[0]
progress = (current_sim_time - p1[0]) / segment_duration if segment_duration > 0 else 1.0
prev_x, prev_y, prev_z = self._pos_x_ft, self._pos_y_ft, self._pos_z_ft
self._pos_x_ft = p1[1] + (p2[1] - p1[1]) * progress
self._pos_y_ft = p1[2] + (p2[2] - p1[2]) * progress
self._pos_z_ft = p1[3] + (p2[3] - p1[3]) * progress
dx = self._pos_x_ft - prev_x
dy = self._pos_y_ft - prev_y
dz = self._pos_z_ft - prev_z
dist_moved_3d = math.sqrt(dx**2 + dy**2 + dz**2)
dist_moved_2d = math.sqrt(dx**2 + dy**2)
if delta_time_s > 0:
self.current_velocity_fps = dist_moved_3d / delta_time_s
if dist_moved_2d > 0.1:
self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360
self.current_pitch_deg = math.degrees(math.atan2(dz, dist_moved_2d))
self._update_current_polar_coords()
def _update_current_polar_coords(self):
# ... (questo metodo rimane invariato)
self.current_range_nm = math.sqrt(self._pos_x_ft**2 + self._pos_y_ft**2) / NM_TO_FT
self.current_azimuth_deg = math.degrees(math.atan2(self._pos_x_ft, self._pos_y_ft)) % 360
self.current_altitude_ft = self._pos_z_ft
def to_dict(self) -> Dict:
# ... (questo metodo rimane invariato)
return {
"target_id": self.target_id,
"active": self.active,
"traceable": self.traceable,
"trajectory": [wp.to_dict() for wp in self.trajectory],
"use_spline": self.use_spline
}
class Scenario:
def __init__(self, name: str = "Untitled Scenario"):
self.name = name
self.targets: Dict[int, Target] = {}
def add_target(self, target: Target):
self.targets[target.target_id] = target
def get_target(self, target_id: int) -> Optional[Target]:
return self.targets.get(target_id)
def remove_target(self, target_id: int):
if target_id in self.targets:
del self.targets[target_id]
def get_all_targets(self) -> List[Target]:
return list(self.targets.values())
def reset_simulation(self):
"""Resets the simulation state for all targets in the scenario."""
for target in self.targets.values():
target.reset_simulation()
def update_state(self, delta_time_s: float):
"""Updates the state of all targets in the scenario."""
for target in self.targets.values():
target.update_state(delta_time_s)
def is_finished(self) -> bool:
"""Checks if all targets in the scenario have finished their trajectory."""
return all(not target.active for target in self.targets.values())
def to_dict(self) -> Dict:
return {
"name": self.name,
"targets": [target.to_dict() for target in self.get_all_targets()]
}
@classmethod
def from_dict(cls, data: Dict) -> Scenario:
name = data.get("name", "Loaded Scenario")
scenario = cls(name=name)
for target_data in data.get("targets", []):
try:
waypoints = []
for wp_data in target_data.get("trajectory", []):
wp_data["maneuver_type"] = ManeuverType(wp_data["maneuver_type"])
waypoints.append(Waypoint(**wp_data))
target = Target(
target_id=target_data["target_id"],
active=target_data.get("active", True),
traceable=target_data.get("traceable", True),
trajectory=waypoints,
use_spline=target_data.get("use_spline", False)
)
scenario.add_target(target)
except Exception as e:
print(f"Skipping invalid target data: {target_data}. Error: {e}")
return scenario