# target_simulator/core/models.py """ Defines the core data models for the application, including 3D dynamic Targets with programmable trajectories and Scenarios. """ from __future__ import annotations import math from enum import Enum from dataclasses import dataclass, field, fields from typing import Dict, List, Optional # --- Constants --- MIN_TARGET_ID = 0 MAX_TARGET_ID = 15 KNOTS_TO_FPS = 1.68781 FPS_TO_KNOTS = 1 / KNOTS_TO_FPS NM_TO_FT = 6076.12 class ManeuverType(Enum): FLY_TO_POINT = "Fly to Point" FLY_FOR_DURATION = "Fly for Duration" @dataclass class Waypoint: """Represents a segment of a target's trajectory, defining a maneuver.""" maneuver_type: ManeuverType = ManeuverType.FLY_FOR_DURATION # Parameters used by FLY_FOR_DURATION and as final state for FLY_TO_POINT target_velocity_fps: Optional[float] = None target_heading_deg: Optional[float] = None # Parameters used by both duration_s: Optional[float] = 10.0 target_altitude_ft: Optional[float] = 10000.0 # Parameters used by FLY_TO_POINT target_range_nm: Optional[float] = None target_azimuth_deg: Optional[float] = None # Internal state for interpolation, set when waypoint becomes active _start_velocity_fps: float = field(init=False, repr=False, default=0.0) _start_heading_deg: float = field(init=False, repr=False, default=0.0) _start_altitude_ft: float = field(init=False, repr=False, default=0.0) _calculated_duration_s: Optional[float] = field(init=False, default=None, repr=False) def to_dict(self) -> Dict: data = {"maneuver_type": self.maneuver_type.value} # Use dataclasses.fields to iterate reliably for f in fields(self): if not f.name.startswith('_') and f.name != "maneuver_type": val = getattr(self, f.name) if val is not None: data[f.name] = val return data @dataclass class Target: _spline_path: Optional[List[List[float]]] = field(init=False, default=None, repr=False) """Represents a dynamic 3D target with a programmable trajectory.""" target_id: int trajectory: List[Waypoint] = field(default_factory=list) active: bool = True traceable: bool = True use_spline: bool = False # Se True, la traiettoria viene interpretata come spline # --- Current simulation state (dynamic) --- current_range_nm: float = field(init=False, default=0.0) current_azimuth_deg: float = field(init=False, default=0.0) current_altitude_ft: float = field(init=False, default=0.0) current_velocity_fps: float = field(init=False, default=0.0) current_heading_deg: float = field(init=False, default=0.0) current_pitch_deg: float = field(init=False, default=0.0) _pos_x_ft: float = field(init=False, repr=False, default=0.0) _pos_y_ft: float = field(init=False, repr=False, default=0.0) _pos_z_ft: float = field(init=False, repr=False, default=0.0) _sim_time_s: float = field(init=False, default=0.0) _current_waypoint_index: int = field(init=False, default=-1) _time_in_waypoint_s: float = field(init=False, default=0.0) def __post_init__(self): if not (MIN_TARGET_ID <= self.target_id <= MAX_TARGET_ID): raise ValueError(f"Target ID must be between {MIN_TARGET_ID} and {MAX_TARGET_ID}.") self.reset_simulation() def reset_simulation(self): self._sim_time_s = 0.0 self._time_in_waypoint_s = 0.0 self._current_waypoint_index = -1 if self.use_spline and self.trajectory: from target_simulator.utils.spline import catmull_rom_spline # Convert all waypoints to cartesian points in feet pts = [] for wp in self.trajectory: if wp.maneuver_type == ManeuverType.FLY_TO_POINT and wp.target_range_nm is not None and wp.target_azimuth_deg is not None: range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT az_rad = math.radians(wp.target_azimuth_deg or 0.0) x = range_ft * math.sin(az_rad) y = range_ft * math.cos(az_rad) z = wp.target_altitude_ft or 0.0 pts.append([x, y, z]) if not pts: self._pos_x_ft = self._pos_y_ft = self._pos_z_ft = 0.0 self.current_velocity_fps = 0.0 self.current_heading_deg = 0.0 self._spline_path = None return # Generate spline path from points in feet self._spline_path = catmull_rom_spline(pts, num_points=200) self._spline_index = 0 # Set initial position from the first point of the spline initial_pos = self._spline_path[0] self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = initial_pos[0], initial_pos[1], initial_pos[2] # Calculate total duration from waypoints total_duration = sum(wp.duration_s or 0.0 for wp in self.trajectory if wp.duration_s) # Create a time mapping for each point in the spline path self._spline_point_times = [i * (total_duration / (len(self._spline_path) - 1)) for i in range(len(self._spline_path))] if not self._spline_point_times: self._spline_point_times = [0.0] self._update_current_polar_coords() else: # Waypoint-based simulation reset if not self.trajectory or self.trajectory[0].maneuver_type != ManeuverType.FLY_TO_POINT: self._pos_x_ft = self._pos_y_ft = self._pos_z_ft = 0.0 self.current_velocity_fps = 0.0 self.current_heading_deg = 0.0 self.current_pitch_deg = 0.0 else: first_wp = self.trajectory[0] self.current_range_nm = first_wp.target_range_nm or 0.0 self.current_azimuth_deg = first_wp.target_azimuth_deg or 0.0 self.current_altitude_ft = first_wp.target_altitude_ft or 0.0 self.current_velocity_fps = first_wp.target_velocity_fps or 0.0 self.current_heading_deg = first_wp.target_heading_deg or 0.0 self.current_pitch_deg = 0.0 range_ft = self.current_range_nm * NM_TO_FT az_rad = math.radians(self.current_azimuth_deg) self._pos_x_ft = range_ft * math.sin(az_rad) self._pos_y_ft = range_ft * math.cos(az_rad) self._pos_z_ft = self.current_altitude_ft self._activate_next_waypoint() self._update_current_polar_coords() def _activate_next_waypoint(self): """Pre-calculates and activates the next waypoint in the trajectory.""" self._current_waypoint_index += 1 self._time_in_waypoint_s = 0.0 if self._current_waypoint_index >= len(self.trajectory): self._current_waypoint_index = -1; return wp = self.trajectory[self._current_waypoint_index] wp._start_velocity_fps = self.current_velocity_fps wp._start_heading_deg = self.current_heading_deg wp._start_altitude_ft = self.current_altitude_ft if wp.maneuver_type == ManeuverType.FLY_TO_POINT: target_range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT target_az_rad = math.radians(wp.target_azimuth_deg or 0.0) target_x = target_range_ft * math.sin(target_az_rad) target_y = target_range_ft * math.cos(target_az_rad) target_z = wp.target_altitude_ft or self._pos_z_ft dx, dy, dz = target_x - self._pos_x_ft, target_y - self._pos_y_ft, target_z - self._pos_z_ft dist_3d = math.sqrt(dx**2 + dy**2 + dz**2) dist_2d = math.sqrt(dx**2 + dy**2) self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360 self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d)) if dist_2d > 1 else 90.0 if dz > 0 else -90.0 if wp.duration_s and wp.duration_s > 0: self.current_velocity_fps = dist_3d / wp.duration_s wp._calculated_duration_s = wp.duration_s else: self.current_velocity_fps = 0 wp._calculated_duration_s = 999999 else: self.current_pitch_deg = 0 def update_state(self, delta_time_s: float): if not self.active: return if self.use_spline and self._spline_path: self._sim_time_s += delta_time_s total_duration = self._spline_point_times[-1] if total_duration <= 0 or self._sim_time_s >= total_duration: # Simulation finished, stay at the last point self._spline_index = len(self._spline_path) - 1 else: # Find the current index in the spline path based on time progress = self._sim_time_s / total_duration self._spline_index = int(progress * (len(self._spline_path) - 1)) pt = self._spline_path[self._spline_index] # Determine previous point for calculating heading/pitch prev_pt = self._spline_path[self._spline_index - 1] if self._spline_index > 0 else pt # Update position (path is already in feet) self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = pt[0], pt[1], pt[2] # Update kinematics dx = self._pos_x_ft - prev_pt[0] dy = self._pos_y_ft - prev_pt[1] dz = self._pos_z_ft - prev_pt[2] dist_moved_since_last_tick = math.sqrt(dx**2 + dy**2 + dz**2) dist_2d = math.sqrt(dx**2 + dy**2) if dist_2d > 0.1: # Avoid division by zero or noise self.current_heading_deg = math.degrees(math.atan2(dx, dy)) % 360 self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d)) if delta_time_s > 0: self.current_velocity_fps = dist_moved_since_last_tick / delta_time_s self._update_current_polar_coords() # --- DEBUG LOG --- #import logging #logging.getLogger("target_simulator.spline_debug").info( # f"SplinePos: t={self._sim_time_s:.2f}s idx={self._spline_index} x={self._pos_x_ft:.2f} y={self._pos_y_ft:.2f} z={self._pos_z_ft:.2f}") # --- DEBUG LOG --- #import logging #logging.getLogger("target_simulator.spline_debug").info( # f"SplinePos: t={self._sim_time_s:.2f}s idx={self._spline_index} x={self._pos_x_ft:.2f} y={self._pos_y_ft:.2f} z={self._pos_z_ft:.2f}") else: # Logica classica self._sim_time_s += delta_time_s self._time_in_waypoint_s += delta_time_s if self._current_waypoint_index == -1: return wp = self.trajectory[self._current_waypoint_index] duration = wp._calculated_duration_s if wp._calculated_duration_s is not None else wp.duration_s or 0.0 if self._time_in_waypoint_s >= duration: # Finalize state to match waypoint target if wp.maneuver_type == ManeuverType.FLY_FOR_DURATION: self.current_velocity_fps = wp.target_velocity_fps or self.current_velocity_fps self.current_heading_deg = wp.target_heading_deg or self.current_heading_deg self.current_altitude_ft = wp.target_altitude_ft or self.current_altitude_ft self._activate_next_waypoint() if self._current_waypoint_index == -1: return wp = self.trajectory[self._current_waypoint_index] # Get new active waypoint duration = wp._calculated_duration_s if wp._calculated_duration_s is not None else wp.duration_s or 0.0 progress = self._time_in_waypoint_s / duration if duration > 0 else 1.0 if wp.maneuver_type == ManeuverType.FLY_FOR_DURATION: # Interpolate kinematics during the maneuver self.current_velocity_fps = wp._start_velocity_fps + ((wp.target_velocity_fps or wp._start_velocity_fps) - wp._start_velocity_fps) * progress self.current_altitude_ft = wp._start_altitude_ft + ((wp.target_altitude_ft or wp._start_altitude_ft) - wp._start_altitude_ft) * progress delta_heading = ((wp.target_heading_deg or wp._start_heading_deg) - wp._start_heading_deg + 180) % 360 - 180 self.current_heading_deg = (wp._start_heading_deg + delta_heading * progress) % 360 # Update position using current 3D vector heading_rad = math.radians(self.current_heading_deg) pitch_rad = math.radians(self.current_pitch_deg) dist_moved = self.current_velocity_fps * delta_time_s dist_2d = dist_moved * math.cos(pitch_rad) self._pos_x_ft += dist_2d * math.sin(heading_rad) self._pos_y_ft += dist_2d * math.cos(heading_rad) self._pos_z_ft += dist_moved * math.sin(pitch_rad) self._update_current_polar_coords() def _update_current_polar_coords(self): """Recalculates current polar coordinates from Cartesian ones.""" self.current_range_nm = math.sqrt(self._pos_x_ft**2 + self._pos_y_ft**2) / NM_TO_FT self.current_azimuth_deg = math.degrees(math.atan2(self._pos_x_ft, self._pos_y_ft)) self.current_altitude_ft = self._pos_z_ft def to_dict(self) -> Dict: data = { "target_id": self.target_id, "active": self.active, "traceable": self.traceable, "trajectory": [wp.to_dict() for wp in self.trajectory], "use_spline": self.use_spline } return data class Scenario: def __init__(self, name: str = "Untitled Scenario"): self.name = name self.targets: Dict[int, Target] = {} def add_target(self, target: Target): self.targets[target.target_id] = target def get_target(self, target_id: int) -> Optional[Target]: return self.targets.get(target_id) def remove_target(self, target_id: int): if target_id in self.targets: del self.targets[target_id] def get_all_targets(self) -> List[Target]: return list(self.targets.values()) def reset_simulation(self): """Resets the simulation state for all targets in the scenario.""" for target in self.targets.values(): target.reset_simulation() def update_state(self, delta_time_s: float): """Updates the state of all targets in the scenario.""" for target in self.targets.values(): target.update_state(delta_time_s) def to_dict(self) -> Dict: return { "name": self.name, "targets": [target.to_dict() for target in self.get_all_targets()] } @classmethod def from_dict(cls, data: Dict) -> Scenario: name = data.get("name", "Loaded Scenario") scenario = cls(name=name) for target_data in data.get("targets", []): try: waypoints = [] for wp_data in target_data.get("trajectory", []): wp_data["maneuver_type"] = ManeuverType(wp_data["maneuver_type"]) waypoints.append(Waypoint(**wp_data)) target = Target( target_id=target_data["target_id"], active=target_data.get("active", True), traceable=target_data.get("traceable", True), trajectory=waypoints, use_spline=target_data.get("use_spline", False) ) scenario.add_target(target) except Exception as e: print(f"Skipping invalid target data: {target_data}. Error: {e}") return scenario