# target_simulator/core/models.py """ Defines the core data models for the application, including 3D dynamic Targets with programmable trajectories and Scenarios. """ from __future__ import annotations import math from enum import Enum from dataclasses import dataclass, field, fields from typing import Dict, List, Optional # --- Constants --- MIN_TARGET_ID = 0 MAX_TARGET_ID = 15 KNOTS_TO_FPS = 1.68781 FPS_TO_KNOTS = 1 / KNOTS_TO_FPS NM_TO_FT = 6076.12 class ManeuverType(Enum): FLY_TO_POINT = "Fly to Point" FLY_FOR_DURATION = "Fly for Duration" DYNAMIC_MANEUVER = "Dynamic Maneuver" @dataclass class Waypoint: """Represents a segment of a target's trajectory, defining a maneuver.""" maneuver_type: ManeuverType = ManeuverType.FLY_FOR_DURATION # Parameters for FLY_FOR_DURATION and FLY_TO_POINT target_velocity_fps: Optional[float] = None target_heading_deg: Optional[float] = None duration_s: Optional[float] = 10.0 target_altitude_ft: Optional[float] = 10000.0 target_range_nm: Optional[float] = None target_azimuth_deg: Optional[float] = None # Parameters for DYNAMIC_MANEUVER (all obbligatori, velocity non opzionale) dynamic_velocity_fps: Optional[float] = None # velocità durante la manovra lateral_g: Optional[float] = None # G laterale (virata) longitudinal_g: Optional[float] = None # G longitudinale (accelerazione) vertical_g: Optional[float] = None # G verticale (salita/discesa) # Internal state for interpolation, set when waypoint becomes active _start_velocity_fps: float = field(init=False, repr=False, default=0.0) _start_heading_deg: float = field(init=False, repr=False, default=0.0) _start_altitude_ft: float = field(init=False, repr=False, default=0.0) _calculated_duration_s: Optional[float] = field(init=False, default=None, repr=False) def to_dict(self) -> Dict: data = {"maneuver_type": self.maneuver_type.value} for f in fields(self): if not f.name.startswith('_') and f.name != "maneuver_type": val = getattr(self, f.name) if val is not None: data[f.name] = val return data @dataclass class Target: _spline_path: Optional[List[List[float]]] = field(init=False, default=None, repr=False) """Represents a dynamic 3D target with a programmable trajectory.""" target_id: int trajectory: List[Waypoint] = field(default_factory=list) active: bool = True traceable: bool = True use_spline: bool = False # Se True, la traiettoria viene interpretata come spline # --- Current simulation state (dynamic) --- current_range_nm: float = field(init=False, default=0.0) current_azimuth_deg: float = field(init=False, default=0.0) current_altitude_ft: float = field(init=False, default=0.0) current_velocity_fps: float = field(init=False, default=0.0) current_heading_deg: float = field(init=False, default=0.0) current_pitch_deg: float = field(init=False, default=0.0) _pos_x_ft: float = field(init=False, repr=False, default=0.0) _pos_y_ft: float = field(init=False, repr=False, default=0.0) _pos_z_ft: float = field(init=False, repr=False, default=0.0) _sim_time_s: float = field(init=False, default=0.0) _current_waypoint_index: int = field(init=False, default=-1) _time_in_waypoint_s: float = field(init=False, default=0.0) def __post_init__(self): if not (MIN_TARGET_ID <= self.target_id <= MAX_TARGET_ID): raise ValueError(f"Target ID must be between {MIN_TARGET_ID} and {MAX_TARGET_ID}.") self.reset_simulation() def reset_simulation(self): self._sim_time_s = 0.0 self._time_in_waypoint_s = 0.0 self._current_waypoint_index = -1 if self.use_spline and self.trajectory: from target_simulator.utils.spline import catmull_rom_spline # Convert all waypoints to cartesian points in feet pts = [] for wp in self.trajectory: if wp.maneuver_type == ManeuverType.FLY_TO_POINT and wp.target_range_nm is not None and wp.target_azimuth_deg is not None: range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT az_rad = math.radians(wp.target_azimuth_deg or 0.0) x = range_ft * math.sin(az_rad) y = range_ft * math.cos(az_rad) z = wp.target_altitude_ft or 0.0 pts.append([x, y, z]) if not pts: self._pos_x_ft = self._pos_y_ft = self._pos_z_ft = 0.0 self.current_velocity_fps = 0.0 self.current_heading_deg = 0.0 self._spline_path = None return # Generate spline path from points in feet self._spline_path = catmull_rom_spline(pts, num_points=200) self._spline_index = 0 # Set initial position from the first point of the spline initial_pos = self._spline_path[0] self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = initial_pos[0], initial_pos[1], initial_pos[2] # Calculate total duration from waypoints total_duration = sum(wp.duration_s or 0.0 for wp in self.trajectory if wp.duration_s) # Create a time mapping for each point in the spline path self._spline_point_times = [i * (total_duration / (len(self._spline_path) - 1)) for i in range(len(self._spline_path))] if not self._spline_point_times: self._spline_point_times = [0.0] self._update_current_polar_coords() else: # Waypoint-based simulation reset if not self.trajectory or self.trajectory[0].maneuver_type != ManeuverType.FLY_TO_POINT: self._pos_x_ft = self._pos_y_ft = self._pos_z_ft = 0.0 self.current_velocity_fps = 0.0 self.current_heading_deg = 0.0 self.current_pitch_deg = 0.0 else: first_wp = self.trajectory[0] self.current_range_nm = first_wp.target_range_nm or 0.0 self.current_azimuth_deg = first_wp.target_azimuth_deg or 0.0 self.current_altitude_ft = first_wp.target_altitude_ft or 0.0 self.current_velocity_fps = first_wp.target_velocity_fps or 0.0 self.current_heading_deg = first_wp.target_heading_deg or 0.0 self.current_pitch_deg = 0.0 range_ft = self.current_range_nm * NM_TO_FT az_rad = math.radians(self.current_azimuth_deg) self._pos_x_ft = range_ft * math.sin(az_rad) self._pos_y_ft = range_ft * math.cos(az_rad) self._pos_z_ft = self.current_altitude_ft self._activate_next_waypoint() self._update_current_polar_coords() def _activate_next_waypoint(self): """Pre-calculates and activates the next waypoint in the trajectory.""" self._current_waypoint_index += 1 self._time_in_waypoint_s = 0.0 if self._current_waypoint_index >= len(self.trajectory): self._current_waypoint_index = -1; return wp = self.trajectory[self._current_waypoint_index] wp._start_velocity_fps = self.current_velocity_fps wp._start_heading_deg = self.current_heading_deg wp._start_altitude_ft = self.current_altitude_ft if wp.maneuver_type == ManeuverType.FLY_TO_POINT: target_range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT target_az_rad = math.radians(wp.target_azimuth_deg or 0.0) target_x = target_range_ft * math.sin(target_az_rad) target_y = target_range_ft * math.cos(target_az_rad) target_z = wp.target_altitude_ft or self._pos_z_ft dx, dy, dz = target_x - self._pos_x_ft, target_y - self._pos_y_ft, target_z - self._pos_z_ft dist_3d = math.sqrt(dx**2 + dy**2 + dz**2) dist_2d = math.sqrt(dx**2 + dy**2) self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360 self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d)) if dist_2d > 1 else 90.0 if dz > 0 else -90.0 if wp.duration_s and wp.duration_s > 0: self.current_velocity_fps = dist_3d / wp.duration_s wp._calculated_duration_s = wp.duration_s else: self.current_velocity_fps = 0 wp._calculated_duration_s = 999999 else: self.current_pitch_deg = 0 def update_state(self, delta_time_s: float): if not self.active: return if self.use_spline and self._spline_path: # ...existing code... pass else: self._sim_time_s += delta_time_s self._time_in_waypoint_s += delta_time_s if self._current_waypoint_index == -1: return wp = self.trajectory[self._current_waypoint_index] duration = wp._calculated_duration_s if wp._calculated_duration_s is not None else wp.duration_s or 0.0 if self._time_in_waypoint_s >= duration: # Finalize state to match waypoint target if wp.maneuver_type == ManeuverType.FLY_FOR_DURATION: self.current_velocity_fps = wp.target_velocity_fps or self.current_velocity_fps self.current_heading_deg = wp.target_heading_deg or self.current_heading_deg elif wp.maneuver_type == ManeuverType.DYNAMIC_MANEUVER: self.current_velocity_fps = wp.dynamic_velocity_fps or self.current_velocity_fps self.current_altitude_ft = wp.target_altitude_ft or self.current_altitude_ft self._activate_next_waypoint() if self._current_waypoint_index == -1: return wp = self.trajectory[self._current_waypoint_index] duration = wp._calculated_duration_s if wp._calculated_duration_s is not None else wp.duration_s or 0.0 progress = self._time_in_waypoint_s / duration if duration > 0 else 1.0 if wp.maneuver_type == ManeuverType.FLY_FOR_DURATION: # ...existing code... self.current_velocity_fps = wp._start_velocity_fps + ((wp.target_velocity_fps or wp._start_velocity_fps) - wp._start_velocity_fps) * progress self.current_altitude_ft = wp._start_altitude_ft + ((wp.target_altitude_ft or wp._start_altitude_ft) - wp._start_altitude_ft) * progress delta_heading = ((wp.target_heading_deg or wp._start_heading_deg) - wp._start_heading_deg + 180) % 360 - 180 self.current_heading_deg = (wp._start_heading_deg + delta_heading * progress) % 360 elif wp.maneuver_type == ManeuverType.DYNAMIC_MANEUVER: # Logica dinamica: integra accelerazioni e heading # Parametri obbligatori: dynamic_velocity_fps, lateral_g, duration_s velocity = wp.dynamic_velocity_fps lateral_g = wp.lateral_g or 0.0 longitudinal_g = wp.longitudinal_g or 0.0 vertical_g = wp.vertical_g or 0.0 # Conversione G in ft/s^2 G_FT_S2 = 32.174 lateral_acc = lateral_g * G_FT_S2 long_acc = longitudinal_g * G_FT_S2 vert_acc = vertical_g * G_FT_S2 # Aggiorna heading (virata): rateo di virata = a_laterale / velocità # LV turn_rate_rad_s = lateral_acc / max(velocity, 1e-3) R = (velocity* velocity)/lateral_acc omega = velocity/R if R!=0 else 0 turn_rate_rad_s = omega self.current_heading_deg = (self.current_heading_deg + math.degrees(turn_rate_rad_s * delta_time_s)) % 360 # Aggiorna velocità (accelerazione longitudinale) self.current_velocity_fps = velocity + long_acc * delta_time_s # Aggiorna posizione heading_rad = math.radians(self.current_heading_deg) pitch_rad = math.radians(self.current_pitch_deg) dist_moved = self.current_velocity_fps * delta_time_s dist_2d = dist_moved * math.cos(pitch_rad) self._pos_x_ft += dist_2d * math.sin(heading_rad) self._pos_y_ft += dist_2d * math.cos(heading_rad) self._pos_z_ft += dist_moved * math.sin(pitch_rad) + vert_acc * delta_time_s # ...existing code... self._update_current_polar_coords() def _update_current_polar_coords(self): """Recalculates current polar coordinates from Cartesian ones.""" self.current_range_nm = math.sqrt(self._pos_x_ft**2 + self._pos_y_ft**2) / NM_TO_FT self.current_azimuth_deg = math.degrees(math.atan2(self._pos_x_ft, self._pos_y_ft)) self.current_altitude_ft = self._pos_z_ft def to_dict(self) -> Dict: data = { "target_id": self.target_id, "active": self.active, "traceable": self.traceable, "trajectory": [wp.to_dict() for wp in self.trajectory], "use_spline": self.use_spline } return data class Scenario: def __init__(self, name: str = "Untitled Scenario"): self.name = name self.targets: Dict[int, Target] = {} def add_target(self, target: Target): self.targets[target.target_id] = target def get_target(self, target_id: int) -> Optional[Target]: return self.targets.get(target_id) def remove_target(self, target_id: int): if target_id in self.targets: del self.targets[target_id] def get_all_targets(self) -> List[Target]: return list(self.targets.values()) def reset_simulation(self): """Resets the simulation state for all targets in the scenario.""" for target in self.targets.values(): target.reset_simulation() def update_state(self, delta_time_s: float): """Updates the state of all targets in the scenario.""" for target in self.targets.values(): target.update_state(delta_time_s) def to_dict(self) -> Dict: return { "name": self.name, "targets": [target.to_dict() for target in self.get_all_targets()] } @classmethod def from_dict(cls, data: Dict) -> Scenario: name = data.get("name", "Loaded Scenario") scenario = cls(name=name) for target_data in data.get("targets", []): try: waypoints = [] for wp_data in target_data.get("trajectory", []): wp_data["maneuver_type"] = ManeuverType(wp_data["maneuver_type"]) waypoints.append(Waypoint(**wp_data)) target = Target( target_id=target_data["target_id"], active=target_data.get("active", True), traceable=target_data.get("traceable", True), trajectory=waypoints, use_spline=target_data.get("use_spline", False) ) scenario.add_target(target) except Exception as e: print(f"Skipping invalid target data: {target_data}. Error: {e}") return scenario