# target_simulator/core/models.py """ Defines the core data models for the application, including 3D dynamic Targets with programmable trajectories and Scenarios. """ from __future__ import annotations import math from enum import Enum from dataclasses import dataclass, field, fields from typing import Dict, List, Optional, Tuple # --- Constants --- MIN_TARGET_ID = 0 # Increase allowed max target id to 32 (inclusive). Historically this was 15 # but RIS/firmware can address up to 32 targets in current deployments. MAX_TARGET_ID = 32 KNOTS_TO_FPS = 1.68781 FPS_TO_KNOTS = 1 / KNOTS_TO_FPS NM_TO_FT = 6076.12 G_IN_FPS2 = 32.174 class ManeuverType(Enum): FLY_TO_POINT = "Fly to Point" FLY_FOR_DURATION = "Fly for Duration" DYNAMIC_MANEUVER = "Dynamic Maneuver" class TurnDirection(Enum): LEFT = "Left" RIGHT = "Right" @dataclass class Waypoint: maneuver_type: ManeuverType = ManeuverType.FLY_FOR_DURATION duration_s: Optional[float] = 10.0 target_range_nm: Optional[float] = None target_azimuth_deg: Optional[float] = None target_altitude_ft: Optional[float] = None target_velocity_fps: Optional[float] = None target_heading_deg: Optional[float] = None # --- MODIFIED PART START --- target_vertical_velocity_fps: Optional[float] = None # --- MODIFIED PART END --- maneuver_speed_fps: Optional[float] = None longitudinal_acceleration_g: Optional[float] = 0.0 lateral_acceleration_g: Optional[float] = 0.0 vertical_acceleration_g: Optional[float] = 0.0 turn_direction: Optional[TurnDirection] = TurnDirection.RIGHT def to_dict(self) -> Dict: data = {"maneuver_type": self.maneuver_type.value} for f in fields(self): if not f.name.startswith("_") and f.name != "maneuver_type": val = getattr(self, f.name) if isinstance(val, Enum): data[f.name] = val.value elif val is not None: data[f.name] = val return data @dataclass class Target: target_id: int trajectory: List[Waypoint] = field(default_factory=list) active: bool = True traceable: bool = True restart: bool = False use_spline: bool = False # --- MODIFIED PART START --- target_type_name: Optional[str] = None rcs: float = 0.0 amplitude: float = 0.0 # --- MODIFIED PART END --- current_range_nm: float = field(init=False, default=0.0) current_azimuth_deg: float = field(init=False, default=0.0) current_altitude_ft: float = field(init=False, default=0.0) current_velocity_fps: float = field(init=False, default=0.0) current_vertical_velocity_fps: float = field(init=False, default=0.0) current_heading_deg: float = field(init=False, default=0.0) current_pitch_deg: float = field(init=False, default=0.0) _pos_x_ft: float = field(init=False, repr=False, default=0.0) _pos_y_ft: float = field(init=False, repr=False, default=0.0) _pos_z_ft: float = field(init=False, repr=False, default=0.0) _sim_time_s: float = field(init=False, default=0.0) _total_duration_s: float = field(init=False, default=0.0) _path: List[Tuple[float, float, float, float]] = field( init=False, repr=False, default_factory=list ) def __post_init__(self): if not (MIN_TARGET_ID <= self.target_id <= MAX_TARGET_ID): raise ValueError( f"Target ID must be between {MIN_TARGET_ID} and {MAX_TARGET_ID}." ) self.reset_simulation() def reset_simulation(self): self._sim_time_s = 0.0 self._generate_path() if self._path: initial_pos = self._path[0] self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = ( initial_pos[1], initial_pos[2], initial_pos[3], ) if len(self._path) > 1: next_pos = self._path[1] dx, dy, dz = ( next_pos[1] - self._pos_x_ft, next_pos[2] - self._pos_y_ft, next_pos[3] - self._pos_z_ft, ) dt = next_pos[0] - initial_pos[0] dist_3d, dist_2d = math.sqrt(dx**2 + dy**2 + dz**2), math.sqrt( dx**2 + dy**2 ) # Compute heading using atan2(dy, dx) where x is North and y is West. # This aligns with the PPI convention (0=North, positive=CCW/West). try: self.current_heading_deg = ( math.degrees(math.atan2(dy, dx)) % 360 if dist_2d > 0.1 else 0.0 ) except Exception: self.current_heading_deg = 0.0 self.current_pitch_deg = ( math.degrees(math.atan2(dz, dist_2d)) if dist_2d > 0.1 else 0.0 ) self.current_velocity_fps = dist_3d / dt if dt > 0 else 0.0 self.current_vertical_velocity_fps = dz / dt if dt > 0 else 0.0 else: ( self.current_heading_deg, self.current_pitch_deg, self.current_velocity_fps, self.current_vertical_velocity_fps, ) = (0.0, 0.0, 0.0, 0.0) else: self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = 0.0, 0.0, 0.0 ( self.current_velocity_fps, self.current_vertical_velocity_fps, self.current_heading_deg, self.current_pitch_deg, ) = (0.0, 0.0, 0.0, 0.0) self._update_current_polar_coords() self.active = bool(self.trajectory) def _generate_path(self): self._path, self._total_duration_s = Target.generate_path_from_waypoints( self.trajectory, self.use_spline ) @staticmethod def generate_path_from_waypoints( waypoints: List[Waypoint], use_spline: bool ) -> Tuple[List[Tuple[float, ...]], float]: """Generate a time-ordered path from a list of Waypoints. Returns a tuple (path, total_duration). The path is a list of tuples (time_s, x_ft, y_ft, z_ft). For simple waypoint sequences the returned path is compact (start/end vertices). When dynamic maneuvers are present or when `use_spline` is True a denser sampled or splined path is returned suitable for simulation. """ if not waypoints or waypoints[0].maneuver_type != ManeuverType.FLY_TO_POINT: return [], 0.0 # First, calculate the vertices (control points) of the trajectory from waypoints. vertices: List[Tuple[float, float, float, float]] = [] total_duration_s = 0.0 first_wp = waypoints[0] # Convert polar (range, azimuth CCW) to Cartesian (x North, y West) az_rad = math.radians(first_wp.target_azimuth_deg or 0.0) range_ft = (first_wp.target_range_nm or 0.0) * NM_TO_FT pos_ft = [ range_ft * math.cos(az_rad), range_ft * math.sin(az_rad), first_wp.target_altitude_ft or 0.0, ] speed_fps = first_wp.target_velocity_fps or 0.0 # --- MODIFIED PART START --- vert_speed_fps = first_wp.target_vertical_velocity_fps or 0.0 # --- MODIFIED PART END --- heading_rad = math.radians(first_wp.target_heading_deg or 0.0) pitch_rad = 0.0 vertices.append((total_duration_s, pos_ft[0], pos_ft[1], pos_ft[2])) for i, wp in enumerate(waypoints): if i == 0: continue duration = wp.duration_s or 0.0 if wp.maneuver_type == ManeuverType.FLY_TO_POINT: # Convert polar waypoint to Cartesian using same convention az_rad = math.radians(wp.target_azimuth_deg or 0.0) range_ft = (wp.target_range_nm or 0.0) * NM_TO_FT # --- MODIFIED PART START --- # Calcola lo spostamento richiesto per raggiungere il punto dx = range_ft * math.cos(az_rad) - pos_ft[0] dy = range_ft * math.sin(az_rad) - pos_ft[1] dz = (wp.target_altitude_ft or pos_ft[2]) - pos_ft[2] dist_3d = math.sqrt(dx**2 + dy**2 + dz**2) if duration > 0: # Calcola le velocità necessarie speed_fps = dist_3d / duration vert_speed_fps = dz / duration heading_rad = math.atan2(dy, dx) else: speed_fps = 0 vert_speed_fps = 0 pos_ft = [ range_ft * math.cos(az_rad), range_ft * math.sin(az_rad), wp.target_altitude_ft or pos_ft[2], ] total_duration_s += duration # --- MODIFIED PART END --- elif wp.maneuver_type == ManeuverType.FLY_FOR_DURATION: speed_fps = ( wp.target_velocity_fps if wp.target_velocity_fps is not None else speed_fps ) heading_rad = ( math.radians(wp.target_heading_deg) if wp.target_heading_deg is not None else heading_rad ) # --- MODIFIED PART START --- vert_speed_fps = ( wp.target_vertical_velocity_fps if wp.target_vertical_velocity_fps is not None else vert_speed_fps ) # Convention: heading is 0=N, +=CCW. dist_moved_x = speed_fps * duration * math.cos(heading_rad) # North dist_moved_y = speed_fps * duration * math.sin(heading_rad) # West dist_moved_z = vert_speed_fps * duration pos_ft[0] += dist_moved_x pos_ft[1] += dist_moved_y pos_ft[2] += dist_moved_z # --- MODIFIED PART END --- total_duration_s += duration elif wp.maneuver_type == ManeuverType.DYNAMIC_MANEUVER: if wp.maneuver_speed_fps is not None: speed_fps = wp.maneuver_speed_fps # --- MODIFIED PART START --- if wp.target_vertical_velocity_fps is not None: vert_speed_fps = wp.target_vertical_velocity_fps # --- MODIFIED PART END --- time_step = 0.1 num_steps = max(1, int(duration / time_step)) accel_lon_fps2 = (wp.longitudinal_acceleration_g or 0.0) * G_IN_FPS2 accel_lat_fps2 = (wp.lateral_acceleration_g or 0.0) * G_IN_FPS2 accel_ver_fps2 = (wp.vertical_acceleration_g or 0.0) * G_IN_FPS2 dir_multiplier = ( -1.0 if wp.turn_direction == TurnDirection.RIGHT else 1.0 ) step_time = total_duration_s for _step in range(num_steps): speed_fps += accel_lon_fps2 * time_step vert_speed_fps += accel_ver_fps2 * time_step turn_rate_rad_s = ( accel_lat_fps2 / (speed_fps + 1e-6) if abs(speed_fps) > 0.1 else 0.0 ) heading_rad += turn_rate_rad_s * time_step * dir_multiplier dist_step_xy = speed_fps * time_step dist_step_z = vert_speed_fps * time_step pos_ft[0] += dist_step_xy * math.cos(heading_rad) pos_ft[1] += dist_step_xy * math.sin(heading_rad) pos_ft[2] += dist_step_z step_time += time_step vertices.append((step_time, pos_ft[0], pos_ft[1], pos_ft[2])) total_duration_s = step_time vertices.append((total_duration_s, pos_ft[0], pos_ft[1], pos_ft[2])) # ... (il resto del metodo per spline e keyframe rimane invariato) # Now that we have the vertices, either spline them or generate a dense segmented path. if use_spline and len(vertices) >= 4: from target_simulator.utils.spline import catmull_rom_spline points_to_spline = [p[1:] for p in vertices] num_spline_points = max(len(vertices) * 20, 200) splined_points = catmull_rom_spline( points_to_spline, num_points=num_spline_points ) final_path = [] final_duration = vertices[-1][0] for i, point in enumerate(splined_points): time_val = ( (i / (len(splined_points) - 1)) * final_duration if len(splined_points) > 1 else 0.0 ) final_path.append((time_val, point[0], point[1], point[2])) return final_path, final_duration has_dynamic = any( wp.maneuver_type == ManeuverType.DYNAMIC_MANEUVER for wp in waypoints ) if not has_dynamic: return vertices, total_duration_s keyframes: List[Tuple[float, float, float, float]] = [] if not vertices: return [], 0.0 keyframes.append(vertices[0]) for i in range(len(vertices) - 1): start_v = vertices[i] end_v = vertices[i + 1] start_time, start_pos = start_v[0], list(start_v[1:]) end_time, end_pos = end_v[0], list(end_v[1:]) duration = end_time - start_time if duration <= 0: continue num_steps = max(1, int(duration / 0.1)) for step in range(1, num_steps + 1): progress = step / num_steps current_time = start_time + progress * duration current_pos = [ start_pos[j] + (end_pos[j] - start_pos[j]) * progress for j in range(3) ] keyframes.append( (current_time, current_pos[0], current_pos[1], current_pos[2]) ) final_duration = vertices[-1][0] return keyframes, final_duration def update_state(self, delta_time_s: float): """Advance the target forward by delta_time_s seconds.""" if not self.active or not self._path: return self._sim_time_s += delta_time_s current_sim_time = min(self._sim_time_s, self._total_duration_s) if current_sim_time >= self._total_duration_s: final_pos = self._path[-1] self._pos_x_ft, self._pos_y_ft, self._pos_z_ft = ( final_pos[1], final_pos[2], final_pos[3], ) self.current_vertical_velocity_fps = 0.0 if self._sim_time_s >= self._total_duration_s: self.active = False else: p1, p2 = self._path[0], self._path[-1] for i in range(len(self._path) - 1): if self._path[i][0] <= current_sim_time <= self._path[i + 1][0]: p1, p2 = self._path[i], self._path[i + 1] break segment_duration = p2[0] - p1[0] progress = ( (current_sim_time - p1[0]) / segment_duration if segment_duration > 0 else 1.0 ) prev_x, prev_y, prev_z = self._pos_x_ft, self._pos_y_ft, self._pos_z_ft self._pos_x_ft = p1[1] + (p2[1] - p1[1]) * progress self._pos_y_ft = p1[2] + (p2[2] - p1[2]) * progress self._pos_z_ft = p1[3] + (p2[3] - p1[3]) * progress dx, dy, dz = ( self._pos_x_ft - prev_x, self._pos_y_ft - prev_y, self._pos_z_ft - prev_z, ) dist_3d, dist_2d = math.sqrt(dx**2 + dy**2 + dz**2), math.sqrt( dx**2 + dy**2 ) if delta_time_s > 0: self.current_velocity_fps = dist_3d / delta_time_s self.current_vertical_velocity_fps = dz / delta_time_s if dist_2d > 0.1: try: self.current_heading_deg = math.degrees(math.atan2(dy, dx)) % 360 except Exception: self.current_heading_deg = 0.0 self.current_pitch_deg = math.degrees(math.atan2(dz, dist_2d)) self._update_current_polar_coords() def _update_current_polar_coords(self): """ Calculates and updates the target's current polar coordinates from its Cartesian position. Normalizes azimuth to the range [-180, 180]. Internal convention: _pos_x_ft is North, _pos_y_ft is West. """ self.current_range_nm = ( math.sqrt(self._pos_x_ft**2 + self._pos_y_ft**2) / NM_TO_FT ) # Calculate azimuth in degrees. # Convention: 0° = North, positive angles = counter-clockwise (West) # Standard atan2(y, x) gives angle from positive x-axis. # Here, x=North, y=West. So atan2(y,x) gives azimuth from North (CCW positive). azimuth_deg = math.degrees(math.atan2(self._pos_y_ft, self._pos_x_ft)) # Normalize angle to [-180, 180] while azimuth_deg > 180: azimuth_deg -= 360 while azimuth_deg < -180: azimuth_deg += 360 self.current_azimuth_deg = azimuth_deg self.current_altitude_ft = self._pos_z_ft def to_dict(self) -> Dict: # --- MODIFIED PART START --- return { "target_id": self.target_id, "active": self.active, "traceable": self.traceable, "trajectory": [wp.to_dict() for wp in self.trajectory], "use_spline": self.use_spline, "target_type_name": self.target_type_name, "rcs": self.rcs, "amplitude": self.amplitude, } # --- MODIFIED PART END --- class Scenario: def __init__(self, name: str = "Untitled Scenario"): self.name = name self.targets: Dict[int, Target] = {} def add_target(self, target: Target): self.targets[target.target_id] = target def get_target(self, target_id: int) -> Optional[Target]: return self.targets.get(target_id) def remove_target(self, target_id: int): if target_id in self.targets: del self.targets[target_id] def get_all_targets(self) -> List[Target]: return list(self.targets.values()) def reset_simulation(self): for target in self.targets.values(): target.reset_simulation() def update_state(self, delta_time_s: float): """Advance simulation state for all targets by delta_time_s seconds.""" for target in self.targets.values(): target.update_state(delta_time_s) def is_finished(self) -> bool: """Return True when all targets in the scenario are inactive.""" return all(not target.active for target in self.targets.values()) def to_dict(self) -> Dict: """Serialize the scenario to a plain dict suitable for JSON encoding.""" return { "name": self.name, "targets": [t.to_dict() for t in self.get_all_targets()], } @classmethod def from_dict(cls, data: Dict) -> "Scenario": """Construct a Scenario instance from a dictionary (e.g., loaded JSON). The method performs basic validation and will skip invalid target entries while continuing to load the rest. """ scenario = cls(name=data.get("name", "Loaded Scenario")) for target_data in data.get("targets", []): try: waypoints = [] for wp_data in target_data.get("trajectory", []): # Normalize legacy 'Constant Turn' representation if present if wp_data.get("maneuver_type") == "Constant Turn": wp_data["maneuver_type"] = "Dynamic Maneuver" wp_data.setdefault("longitudinal_acceleration_g", 0.0) wp_data.setdefault("vertical_acceleration_g", 0.0) wp_data["maneuver_type"] = ManeuverType(wp_data["maneuver_type"]) if "turn_direction" in wp_data and wp_data["turn_direction"]: wp_data["turn_direction"] = TurnDirection( wp_data["turn_direction"] ) valid_keys = {f.name for f in fields(Waypoint)} filtered_wp_data = { k: v for k, v in wp_data.items() if k in valid_keys } waypoints.append(Waypoint(**filtered_wp_data)) # --- MODIFIED PART START --- target = Target( target_id=target_data["target_id"], active=target_data.get("active", True), traceable=target_data.get("traceable", True), trajectory=waypoints, use_spline=target_data.get("use_spline", False), target_type_name=target_data.get("target_type_name"), rcs=target_data.get("rcs", 0.0), amplitude=target_data.get("amplitude", 0.0), ) # --- MODIFIED PART END --- scenario.add_target(target) except Exception as e: # Keep loading remaining targets; emit lightweight diagnostic. print(f"Skipping invalid target data: {target_data}. Error: {e}") return scenario