S1005403_RisCC/target_simulator/core/models.py

358 lines
16 KiB
Python

# target_simulator/core/models.py
"""
Defines the core data models for the application, including 3D dynamic Targets
with programmable trajectories and Scenarios.
"""
from __future__ import annotations
import math
from enum import Enum
from dataclasses import dataclass, field, fields
from typing import Dict, List, Optional
# --- Constants ---
MIN_TARGET_ID = 0
MAX_TARGET_ID = 15
KNOTS_TO_FPS = 1.68781
FPS_TO_KNOTS = 1 / KNOTS_TO_FPS
NM_TO_FT = 6076.12
class ManeuverType(Enum):
FLY_TO_POINT = "Fly to Point"
FLY_FOR_DURATION = "Fly for Duration"
@dataclass
class Waypoint:
"""Represents a segment of a target's trajectory, defining a maneuver."""
maneuver_type: ManeuverType = ManeuverType.FLY_FOR_DURATION
# Parameters used by FLY_FOR_DURATION and as final state for FLY_TO_POINT
target_velocity_fps: Optional[float] = None
target_heading_deg: Optional[float] = None
# Parameters used by both
duration_s: Optional[float] = 10.0
target_altitude_ft: Optional[float] = 10000.0
# Parameters used by FLY_TO_POINT
target_range_nm: Optional[float] = None
target_azimuth_deg: Optional[float] = None
# Internal state for interpolation, set when waypoint becomes active
_start_velocity_fps: float = field(init=False, repr=False, default=0.0)
_start_heading_deg: float = field(init=False, repr=False, default=0.0)
_start_altitude_ft: float = field(init=False, repr=False, default=0.0)
_calculated_duration_s: Optional[float] = field(init=False, default=None, repr=False)
def to_dict(self) -> Dict:
data = {"maneuver_type": self.maneuver_type.value}
# Use dataclasses.fields to iterate reliably
for f in fields(self):
if not f.name.startswith('_') and f.name != "maneuver_type":
val = getattr(self, f.name)
if val is not None:
data[f.name] = val
return data
@dataclass
class Target:
_spline_path: Optional[List[List[float]]] = field(init=False, default=None, repr=False)
"""Represents a dynamic 3D target with a programmable trajectory."""
target_id: int
trajectory: List[Waypoint] = field(default_factory=list)
active: bool = True
traceable: bool = True
use_spline: bool = False # Se True, la traiettoria viene interpretata come spline
# --- Current simulation state (dynamic) ---
current_range_nm: float = field(init=False, default=0.0)
current_azimuth_deg: float = field(init=False, default=0.0)
current_altitude_ft: float = field(init=False, default=0.0)
current_velocity_fps: float = field(init=False, default=0.0)
current_heading_deg: float = field(init=False, default=0.0)
current_pitch_deg: float = field(init=False, default=0.0)
_pos_x_ft: float = field(init=False, repr=False, default=0.0)
_pos_y_ft: float = field(init=False, repr=False, default=0.0)
_pos_z_ft: float = field(init=False, repr=False, default=0.0)
_sim_time_s: float = field(init=False, default=0.0)
_current_waypoint_index: int = field(init=False, default=-1)
_time_in_waypoint_s: float = field(init=False, default=0.0)
def __post_init__(self):
if not (MIN_TARGET_ID <= self.target_id <= MAX_TARGET_ID):
raise ValueError(f"Target ID must be between {MIN_TARGET_ID} and {MAX_TARGET_ID}.")
self.reset_simulation()
def reset_simulation(self):
self._sim_time_s = 0.0
self._time_in_waypoint_s = 0.0
self._current_waypoint_index = -1
if self.use_spline and self.trajectory:
from target_simulator.utils.spline import catmull_rom_spline
# Convert all waypoints to cartesian points in feet
pts = []
for wp in self.trajectory:
if wp.maneuver_type == ManeuverType.FLY_TO_POINT and wp.target_range_nm is not None and wp.target_azimuth_deg is not None:
range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT
az_rad = math.radians(wp.target_azimuth_deg or 0.0)
x = range_ft * math.sin(az_rad)
y = range_ft * math.cos(az_rad)
z = wp.target_altitude_ft or 0.0
pts.append([x, y, z])
if not pts:
self._pos_x_ft = self._pos_y_ft = self._pos_z_ft = 0.0
self.current_velocity_fps = 0.0
self.current_heading_deg = 0.0
self._spline_path = None
return
# Generate spline path from points in feet
self._spline_path = catmull_rom_spline(pts, num_points=200)
self._spline_index = 0
# Set initial position from the first point of the spline
initial_pos = self._spline_path[0]
self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = initial_pos[0], initial_pos[1], initial_pos[2]
# Calculate total duration from waypoints
total_duration = sum(wp.duration_s or 0.0 for wp in self.trajectory if wp.duration_s)
# Create a time mapping for each point in the spline path
if self._spline_path is not None and len(self._spline_path) > 1:
self._spline_point_times = [i * (total_duration / (len(self._spline_path) - 1)) for i in range(len(self._spline_path))]
else:
self._spline_point_times = [0.0]
self._update_current_polar_coords()
else:
# Waypoint-based simulation reset
if not self.trajectory or self.trajectory[0].maneuver_type != ManeuverType.FLY_TO_POINT:
self._pos_x_ft = self._pos_y_ft = self._pos_z_ft = 0.0
self.current_velocity_fps = 0.0
self.current_heading_deg = 0.0
self.current_pitch_deg = 0.0
else:
first_wp = self.trajectory[0]
self.current_range_nm = first_wp.target_range_nm or 0.0
self.current_azimuth_deg = first_wp.target_azimuth_deg or 0.0
self.current_altitude_ft = first_wp.target_altitude_ft or 0.0
self.current_velocity_fps = first_wp.target_velocity_fps or 0.0
self.current_heading_deg = first_wp.target_heading_deg or 0.0
self.current_pitch_deg = 0.0
range_ft = self.current_range_nm * NM_TO_FT
az_rad = math.radians(self.current_azimuth_deg)
self._pos_x_ft = range_ft * math.sin(az_rad)
self._pos_y_ft = range_ft * math.cos(az_rad)
self._pos_z_ft = self.current_altitude_ft
self._activate_next_waypoint()
self._update_current_polar_coords()
def _activate_next_waypoint(self):
"""Pre-calculates and activates the next waypoint in the trajectory."""
self._current_waypoint_index += 1
self._time_in_waypoint_s = 0.0
if self._current_waypoint_index >= len(self.trajectory):
self._current_waypoint_index = -1; return
wp = self.trajectory[self._current_waypoint_index]
wp._start_velocity_fps = self.current_velocity_fps
wp._start_heading_deg = self.current_heading_deg
wp._start_altitude_ft = self.current_altitude_ft
if wp.maneuver_type == ManeuverType.FLY_TO_POINT:
target_range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT
target_az_rad = math.radians(wp.target_azimuth_deg or 0.0)
target_x = target_range_ft * math.sin(target_az_rad)
target_y = target_range_ft * math.cos(target_az_rad)
target_z = wp.target_altitude_ft or self._pos_z_ft
dx, dy, dz = target_x - self._pos_x_ft, target_y - self._pos_y_ft, target_z - self._pos_z_ft
dist_3d = math.sqrt(dx**2 + dy**2 + dz**2)
dist_2d = math.sqrt(dx**2 + dy**2)
self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360
self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d)) if dist_2d > 1 else 90.0 if dz > 0 else -90.0
if wp.duration_s and wp.duration_s > 0:
self.current_velocity_fps = dist_3d / wp.duration_s
wp._calculated_duration_s = wp.duration_s
else:
self.current_velocity_fps = 0
wp._calculated_duration_s = 999999
else:
self.current_pitch_deg = 0
def update_state(self, delta_time_s: float):
if not self.active:
return
if self.use_spline and self._spline_path:
self._sim_time_s += delta_time_s
total_duration = self._spline_point_times[-1]
if total_duration <= 0 or self._sim_time_s >= total_duration:
# Simulation finished, stay at the last point
self._spline_index = len(self._spline_path) - 1
self.active = False
self.current_velocity_fps = 0.0
else:
# Find the current index in the spline path based on time
progress = self._sim_time_s / total_duration
self._spline_index = int(progress * (len(self._spline_path) - 1))
pt = self._spline_path[self._spline_index]
# Determine previous point for calculating heading/pitch
prev_pt = self._spline_path[self._spline_index - 1] if self._spline_index > 0 else pt
# Update position (path is already in feet)
self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = pt[0], pt[1], pt[2]
# Update kinematics
dx = self._pos_x_ft - prev_pt[0]
dy = self._pos_y_ft - prev_pt[1]
dz = self._pos_z_ft - prev_pt[2]
dist_moved_since_last_tick = math.sqrt(dx**2 + dy**2 + dz**2)
dist_2d = math.sqrt(dx**2 + dy**2)
if dist_2d > 0.1: # Avoid division by zero or noise
self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360
self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d))
if delta_time_s > 0:
self.current_velocity_fps = dist_moved_since_last_tick / delta_time_s
self._update_current_polar_coords()
# --- DEBUG LOG ---
#import logging
#logging.getLogger("target_simulator.spline_debug").info(
# f"SplinePos: t={self._sim_time_s:.2f}s idx={self._spline_index} x={self._pos_x_ft:.2f} y={self._pos_y_ft:.2f} z={self._pos_z_ft:.2f}")
# --- DEBUG LOG ---
#import logging
#logging.getLogger("target_simulator.spline_debug").info(
# f"SplinePos: t={self._sim_time_s:.2f}s idx={self._spline_index} x={self._pos_x_ft:.2f} y={self._pos_y_ft:.2f} z={self._pos_z_ft:.2f}")
else:
# Logica classica
self._sim_time_s += delta_time_s
self._time_in_waypoint_s += delta_time_s
if self._current_waypoint_index == -1:
# Traiettoria terminata: disattiva target
self.active = False
self.current_velocity_fps = 0.0
return
wp = self.trajectory[self._current_waypoint_index]
duration = wp._calculated_duration_s if wp._calculated_duration_s is not None else wp.duration_s or 0.0
if self._time_in_waypoint_s >= duration:
# Finalize state to match waypoint target
if wp.maneuver_type == ManeuverType.FLY_FOR_DURATION:
self.current_velocity_fps = wp.target_velocity_fps or self.current_velocity_fps
self.current_heading_deg = wp.target_heading_deg or self.current_heading_deg
self.current_altitude_ft = wp.target_altitude_ft or self.current_altitude_ft
self._activate_next_waypoint()
if self._current_waypoint_index == -1:
# Traiettoria terminata: disattiva target
self.active = False
self.current_velocity_fps = 0.0
return
wp = self.trajectory[self._current_waypoint_index] # Get new active waypoint
duration = wp._calculated_duration_s if wp._calculated_duration_s is not None else wp.duration_s or 0.0
progress = self._time_in_waypoint_s / duration if duration > 0 else 1.0
if wp.maneuver_type == ManeuverType.FLY_FOR_DURATION:
# Interpolate kinematics during the maneuver
self.current_velocity_fps = wp._start_velocity_fps + ((wp.target_velocity_fps or wp._start_velocity_fps) - wp._start_velocity_fps) * progress
self.current_altitude_ft = wp._start_altitude_ft + ((wp.target_altitude_ft or wp._start_altitude_ft) - wp._start_altitude_ft) * progress
delta_heading = ((wp.target_heading_deg or wp._start_heading_deg) - wp._start_heading_deg + 180) % 360 - 180
self.current_heading_deg = (wp._start_heading_deg + delta_heading * progress) % 360
# Update position using current 3D vector
heading_rad = math.radians(self.current_heading_deg)
pitch_rad = math.radians(self.current_pitch_deg)
dist_moved = self.current_velocity_fps * delta_time_s
dist_2d = dist_moved * math.cos(pitch_rad)
self._pos_x_ft += dist_2d * math.sin(heading_rad)
self._pos_y_ft += dist_2d * math.cos(heading_rad)
self._pos_z_ft += dist_moved * math.sin(pitch_rad)
self._update_current_polar_coords()
def _update_current_polar_coords(self):
"""Recalculates current polar coordinates from Cartesian ones."""
self.current_range_nm = math.sqrt(self._pos_x_ft**2 + self._pos_y_ft**2) / NM_TO_FT
self.current_azimuth_deg = math.degrees(math.atan2(self._pos_x_ft, self._pos_y_ft))
self.current_altitude_ft = self._pos_z_ft
def to_dict(self) -> Dict:
data = {
"target_id": self.target_id,
"active": self.active,
"traceable": self.traceable,
"trajectory": [wp.to_dict() for wp in self.trajectory],
"use_spline": self.use_spline
}
return data
class Scenario:
def __init__(self, name: str = "Untitled Scenario"):
self.name = name
self.targets: Dict[int, Target] = {}
def add_target(self, target: Target):
self.targets[target.target_id] = target
def get_target(self, target_id: int) -> Optional[Target]:
return self.targets.get(target_id)
def remove_target(self, target_id: int):
if target_id in self.targets:
del self.targets[target_id]
def get_all_targets(self) -> List[Target]:
return list(self.targets.values())
def reset_simulation(self):
"""Resets the simulation state for all targets in the scenario."""
for target in self.targets.values():
target.reset_simulation()
def update_state(self, delta_time_s: float):
"""Updates the state of all targets in the scenario."""
for target in self.targets.values():
target.update_state(delta_time_s)
def to_dict(self) -> Dict:
return {
"name": self.name,
"targets": [target.to_dict() for target in self.get_all_targets()]
}
@classmethod
def from_dict(cls, data: Dict) -> Scenario:
name = data.get("name", "Loaded Scenario")
scenario = cls(name=name)
for target_data in data.get("targets", []):
try:
waypoints = []
for wp_data in target_data.get("trajectory", []):
wp_data["maneuver_type"] = ManeuverType(wp_data["maneuver_type"])
waypoints.append(Waypoint(**wp_data))
target = Target(
target_id=target_data["target_id"],
active=target_data.get("active", True),
traceable=target_data.get("traceable", True),
trajectory=waypoints,
use_spline=target_data.get("use_spline", False)
)
scenario.add_target(target)
except Exception as e:
print(f"Skipping invalid target data: {target_data}. Error: {e}")
return scenario