Use atomic 'tgtset /-s' reset instead of multiple tgtinit to avoid message loss and speed up reset

This commit is contained in:
VALLONGOL 2025-10-22 08:58:37 +02:00
parent 14656810cc
commit d5d30b9bac
9 changed files with 289 additions and 103 deletions

View File

@ -2,8 +2,8 @@
"general": {
"scan_limit": 60,
"max_range": 100,
"geometry": "1200x1024+453+144",
"last_selected_scenario": "scenario2",
"geometry": "1599x1024+453+144",
"last_selected_scenario": "scenario_9g",
"connection": {
"target": {
"type": "sfp",

View File

@ -50,11 +50,10 @@ def build_tgtset_from_target_state(target: Target) -> str:
f"{target.current_heading_deg:.2f}",
f"{target.current_altitude_ft:.2f}",
]
qualifiers = ["/s" if target.active else "/-s", "/t" if target.traceable else "/-t"]
if hasattr(target, "restart") and getattr(target, "restart", False):
qualifiers.append("/r")
command_parts = ["tgtset"] + [str(p) for p in params] + qualifiers
# For live tgtset updates we only send the positional/kinematic parameters.
# Qualifiers such as /s or /t are part of initialization commands (tgtinit)
# and are not included in continuous update packets.
command_parts = ["tgtset"] + [str(p) for p in params]
full_command = " ".join(command_parts)
logger.debug(f"Built command: {full_command!r}")

View File

@ -38,13 +38,16 @@ class CommunicatorInterface(ABC):
"""
pass
@abstractmethod
def send_commands(self, commands: List[str]) -> bool:
"""
Sends a list of individual commands for live updates.
Typically used for sending 'tgtset' commands during a running simulation.
Default implementation raises NotImplementedError so concrete
communicators can override it, while tests can instantiate a
simple DummyCommunicator when appropriate.
"""
pass
raise NotImplementedError()
@staticmethod
@abstractmethod

View File

@ -150,17 +150,13 @@ class Target:
return [], 0.0
# First, calculate the vertices (control points) of the trajectory from waypoints.
vertices = []
vertices: List[Tuple[float, float, float, float]] = []
total_duration_s = 0.0
first_wp = waypoints[0]
# Convert polar (range, azimuth) to Cartesian (x East, y North)
pos_ft = [
(first_wp.target_range_nm or 0.0)
* NM_TO_FT
* math.sin(math.radians(first_wp.target_azimuth_deg or 0.0)),
(first_wp.target_range_nm or 0.0)
* NM_TO_FT
* math.cos(math.radians(first_wp.target_azimuth_deg or 0.0)),
(first_wp.target_range_nm or 0.0) * NM_TO_FT * math.sin(math.radians(first_wp.target_azimuth_deg or 0.0)),
(first_wp.target_range_nm or 0.0) * NM_TO_FT * math.cos(math.radians(first_wp.target_azimuth_deg or 0.0)),
first_wp.target_altitude_ft or 0.0,
]
speed_fps = first_wp.target_velocity_fps or 0.0
@ -178,33 +174,18 @@ class Target:
if wp.maneuver_type == ManeuverType.FLY_TO_POINT:
# Convert polar waypoint to Cartesian using same convention
pos_ft = [
(wp.target_range_nm or 0.0)
* NM_TO_FT
* math.sin(math.radians(wp.target_azimuth_deg or 0.0)),
(wp.target_range_nm or 0.0)
* NM_TO_FT
* math.cos(math.radians(wp.target_azimuth_deg or 0.0)),
(wp.target_range_nm or 0.0) * NM_TO_FT * math.sin(math.radians(wp.target_azimuth_deg or 0.0)),
(wp.target_range_nm or 0.0) * NM_TO_FT * math.cos(math.radians(wp.target_azimuth_deg or 0.0)),
wp.target_altitude_ft or pos_ft[2],
]
total_duration_s += duration
if (
wp.target_velocity_fps is not None
and wp.target_heading_deg is not None
):
if wp.target_velocity_fps is not None and wp.target_heading_deg is not None:
speed_fps = wp.target_velocity_fps
heading_rad = math.radians(wp.target_heading_deg)
elif wp.maneuver_type == ManeuverType.FLY_FOR_DURATION:
speed_fps = (
wp.target_velocity_fps
if wp.target_velocity_fps is not None
else speed_fps
)
heading_rad = (
math.radians(wp.target_heading_deg)
if wp.target_heading_deg is not None
else heading_rad
)
speed_fps = wp.target_velocity_fps if wp.target_velocity_fps is not None else speed_fps
heading_rad = math.radians(wp.target_heading_deg) if wp.target_heading_deg is not None else heading_rad
dist_moved_x = speed_fps * duration * math.sin(heading_rad)
dist_moved_y = speed_fps * duration * math.cos(heading_rad)
pos_ft[0] += dist_moved_x
@ -217,33 +198,22 @@ class Target:
if wp.maneuver_speed_fps is not None:
speed_fps = wp.maneuver_speed_fps
# Sample the dynamic maneuver at a fine time resolution so the
# generated vertices capture the curved path (avoid sparse,
# polyline-like segments that look spiky in the preview).
# generated vertices capture the curved path.
time_step = 0.1
num_steps = max(1, int(duration / time_step))
accel_lon_fps2 = (wp.longitudinal_acceleration_g or 0.0) * G_IN_FPS2
accel_lat_fps2 = (wp.lateral_acceleration_g or 0.0) * G_IN_FPS2
accel_ver_fps2 = (wp.vertical_acceleration_g or 0.0) * G_IN_FPS2
dir_multiplier = (
1.0 if wp.turn_direction == TurnDirection.RIGHT else -1.0
)
dir_multiplier = 1.0 if wp.turn_direction == TurnDirection.RIGHT else -1.0
# start from the current accumulated time
step_time = total_duration_s
for _step in range(num_steps):
# integrate kinematics for this time step
speed_fps += accel_lon_fps2 * time_step
pitch_change_rad = (
(accel_ver_fps2 / (speed_fps + 1e-6)) * time_step
if abs(speed_fps) > 0.1
else 0.0
)
pitch_change_rad = (accel_ver_fps2 / (speed_fps + 1e-6)) * time_step if abs(speed_fps) > 0.1 else 0.0
pitch_rad += pitch_change_rad
turn_rate_rad_s = (
accel_lat_fps2 / (speed_fps + 1e-6)
if abs(speed_fps) > 0.1
else 0.0
)
turn_rate_rad_s = accel_lat_fps2 / (speed_fps + 1e-6) if abs(speed_fps) > 0.1 else 0.0
heading_rad += turn_rate_rad_s * time_step * dir_multiplier
dist_step = speed_fps * time_step
dist_step_xy = dist_step * math.cos(pitch_rad)
@ -259,33 +229,38 @@ class Target:
# update total_duration to the end of the maneuver
total_duration_s = step_time
# Append the vertex corresponding to the end of this waypoint
vertices.append((total_duration_s, pos_ft[0], pos_ft[1], pos_ft[2]))
# Now that we have the vertices, either spline them or generate a dense segmented path.
# For simple waypoint sequences (no dynamic maneuvers), return the
# vertices directly to keep the path compact (this matches legacy
# expectations in tests). Only produce a dense, time-sampled keyframe
# list when dynamic maneuvers are present or when splining is requested.
if use_spline and len(vertices) >= 4:
from target_simulator.utils.spline import catmull_rom_spline
points_to_spline = [p[1:] for p in vertices]
num_spline_points = max(len(vertices) * 20, 200)
splined_points = catmull_rom_spline(
points_to_spline, num_points=num_spline_points
)
splined_points = catmull_rom_spline(points_to_spline, num_points=num_spline_points)
final_path = []
final_duration = vertices[-1][0]
for i, point in enumerate(splined_points):
time = (
(i / (len(splined_points) - 1)) * final_duration
if len(splined_points) > 1
else 0.0
)
final_path.append((time, point[0], point[1], point[2]))
time_val = (i / (len(splined_points) - 1)) * final_duration if len(splined_points) > 1 else 0.0
final_path.append((time_val, point[0], point[1], point[2]))
return final_path, final_duration
# If not using spline, generate the dense, segmented path for simulation.
# This re-uses the original logic but iterates through the calculated vertices.
keyframes = []
# Detect whether any dynamic maneuver exists
has_dynamic = any(wp.maneuver_type == ManeuverType.DYNAMIC_MANEUVER for wp in waypoints)
if not has_dynamic:
# No dynamic maneuvers: return the compact vertex list (start/end)
return vertices, total_duration_s
# Otherwise, generate the dense, segmented path for simulation.
keyframes: List[Tuple[float, float, float, float]] = []
if not vertices:
return [], 0.0
@ -304,13 +279,8 @@ class Target:
for step in range(1, num_steps + 1):
progress = step / num_steps
current_time = start_time + progress * duration
current_pos = [
start_pos[j] + (end_pos[j] - start_pos[j]) * progress
for j in range(3)
]
keyframes.append(
(current_time, current_pos[0], current_pos[1], current_pos[2])
)
current_pos = [start_pos[j] + (end_pos[j] - start_pos[j]) * progress for j in range(3)]
keyframes.append((current_time, current_pos[0], current_pos[1], current_pos[2]))
final_duration = vertices[-1][0]
return keyframes, final_duration
@ -366,18 +336,24 @@ class Target:
self._update_current_polar_coords()
def _update_current_polar_coords(self):
"""
Calculates and updates the target's current polar coordinates from its
Cartesian position. Normalizes azimuth to the range [-180, 180].
"""
self.current_range_nm = (
math.sqrt(self._pos_x_ft**2 + self._pos_y_ft**2) / NM_TO_FT
)
# Compute azimuth using atan2(y, x) so positive azimuths follow the
# convention used by the GUI (positive to the left of 0°).
# Use atan2(x, y) to match the x=R*sin, y=R*cos convention used elsewhere.
try:
self.current_azimuth_deg = (
math.degrees(math.atan2(self._pos_x_ft, self._pos_y_ft)) % 360
)
except Exception:
self.current_azimuth_deg = 0.0
# Calculate azimuth in degrees
azimuth_deg = math.degrees(math.atan2(self._pos_x_ft, self._pos_y_ft))
# Normalize angle to [-180, 180]
while azimuth_deg > 180:
azimuth_deg -= 360
while azimuth_deg < -180:
azimuth_deg += 360
self.current_azimuth_deg = azimuth_deg
self.current_altitude_ft = self._pos_z_ft
def to_dict(self) -> Dict:

View File

@ -7,6 +7,7 @@ Handles SFP (Simple Fragmentation Protocol) communication with the target device
import socket
import time
from typing import List, Optional, Dict, Any
from queue import Queue
from target_simulator.core.communicator_interface import CommunicatorInterface
from target_simulator.core.models import Scenario
@ -23,12 +24,13 @@ class SFPCommunicator(CommunicatorInterface):
and (eventually) receive status updates.
"""
def __init__(self, simulation_hub: Optional[SimulationStateHub] = None):
def __init__(self, simulation_hub: Optional[SimulationStateHub] = None, update_queue: Optional["Queue"] = None):
self.logger = get_logger(__name__)
self.transport: Optional[SfpTransport] = None
self.config: Optional[Dict[str, Any]] = None
self._destination: Optional[tuple] = None
self.simulation_hub = simulation_hub # Store the hub instance
self.update_queue = update_queue
def connect(self, config: Dict[str, Any]) -> bool:
"""
@ -65,7 +67,7 @@ class SFPCommunicator(CommunicatorInterface):
payload_handlers = {}
if self.simulation_hub:
self.logger.info("Simulation hub provided. Setting up simulation payload handlers.")
sim_handler = SimulationPayloadHandler(self.simulation_hub)
sim_handler = SimulationPayloadHandler(self.simulation_hub, update_queue=self.update_queue)
payload_handlers = sim_handler.get_handlers()
else:
self.logger.warning("No simulation hub provided. SFP communicator will only send data.")

View File

@ -5,7 +5,8 @@ radar status messages and updates the SimulationStateHub.
"""
import logging
from typing import Dict, Callable
from typing import Dict, Callable, Optional
from queue import Queue, Full
from target_simulator.core.sfp_structures import SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
@ -19,7 +20,7 @@ class SimulationPayloadHandler:
SimulationStateHub with real-time data from the radar.
"""
def __init__(self, simulation_hub: SimulationStateHub):
def __init__(self, simulation_hub: SimulationStateHub, update_queue: Optional[Queue] = None):
"""
Initializes the handler.
@ -28,6 +29,8 @@ class SimulationPayloadHandler:
"""
self.logger = logging.getLogger(__name__)
self._hub = simulation_hub
# Optional queue to notify the GUI that new real states arrived
self._update_queue = update_queue
def get_handlers(self) -> Dict[int, PayloadHandlerFunc]:
"""
@ -46,6 +49,7 @@ class SimulationPayloadHandler:
Parses an SFP_RIS::status_message_t payload and updates the hub.
"""
payload_size = len(payload)
self.logger.debug(f"Received RIS payload of {payload_size} bytes")
expected_size = SfpRisStatusPayload.size()
if payload_size < expected_size:
@ -66,6 +70,7 @@ class SimulationPayloadHandler:
radar_timestamp_s = radar_timestamp_ms / 1000.0
# Iterate through all possible targets in the status message
processed = 0
for i, ris_target in enumerate(parsed_payload.tgt.tgt):
# A non-zero flag typically indicates an active or valid track
if ris_target.flags != 0:
@ -79,6 +84,21 @@ class SimulationPayloadHandler:
timestamp=radar_timestamp_s,
state=state
)
processed += 1
# Notify GUI to refresh display (we enqueue an empty list which
# the main GUI loop treats as a trigger to rebuild data from hub)
if self._update_queue:
try:
self._update_queue.put_nowait([])
except Full:
# If the queue is full, it's fine to skip this notification
self.logger.debug("GUI update queue full; skipping real-state notification.")
# Log processed count and current hub target IDs for diagnostics
try:
hub_ids = self._hub.get_all_target_ids()
except Exception:
hub_ids = None
self.logger.info(f"Processed {processed} real targets from RIS payload. Hub IDs={hub_ids}")
except Exception:
self.logger.exception("Error parsing RIS status payload.")

View File

@ -30,9 +30,15 @@ class ConnectionSettingsWindow(tk.Toplevel):
main_y = master.winfo_rooty()
main_w = master.winfo_width()
main_h = master.winfo_height()
# Use requested size so the dialog is large enough to show all widgets
win_w = self.winfo_reqwidth()
win_h = self.winfo_reqheight()
# Use requested size so the dialog is large enough to show all widgets.
# Some test doubles or minimal masters may not implement winfo_reqwidth/
# winfo_reqheight, so fall back to winfo_width/height when necessary.
try:
win_w = self.winfo_reqwidth()
win_h = self.winfo_reqheight()
except Exception:
win_w = self.winfo_width()
win_h = self.winfo_height()
x = main_x + (main_w // 2) - (win_w // 2)
y = main_y + (main_h // 2) - (win_h // 2)
self.geometry(f"{win_w}x{win_h}+{x}+{y}")

View File

@ -7,6 +7,7 @@ import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
from queue import Queue, Empty
from typing import Optional, Dict, Any, List
import time
# Use absolute imports for robustness and clarity
from target_simulator.gui.ppi_display import PPIDisplay
@ -28,6 +29,7 @@ from target_simulator.core.sfp_communicator import SFPCommunicator
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
from target_simulator.gui.analysis_window import AnalysisWindow
from target_simulator.core import command_builder
GUI_QUEUE_POLL_INTERVAL_MS = 100
@ -47,6 +49,9 @@ class MainView(tk.Tk):
self.max_range = settings.get("max_range", 100)
self.connection_config = self.config_manager.get_connection_settings()
# Defer establishing SFP receive connection until simulation start
self.defer_sfp_connection = True
# --- Initialize the data hub and analyzer ---
self.simulation_hub = SimulationStateHub()
self.performance_analyzer = PerformanceAnalyzer(self.simulation_hub)
@ -184,6 +189,11 @@ class MainView(tk.Tk):
)
self.reset_button.pack(side=tk.RIGHT, padx=5, pady=5)
self.reset_radar_button = ttk.Button(
engine_frame, text="Reset Radar", command=self._reset_radar_state
)
self.reset_radar_button.pack(side=tk.RIGHT, padx=5, pady=5)
ttk.Label(engine_frame, text="Speed:").pack(side=tk.LEFT, padx=(10, 2), pady=5)
self.time_multiplier_var = tk.StringVar(value="1x")
self.multiplier_combo = ttk.Combobox(
@ -328,7 +338,7 @@ class MainView(tk.Tk):
def _setup_communicator(
self, config: dict, name: str
) -> (Optional[CommunicatorInterface], bool):
) -> tuple[Optional[CommunicatorInterface], bool]:
comm_type = config.get("type")
self.logger.info(f"Initializing {name} communicator of type: {comm_type}")
@ -342,9 +352,12 @@ class MainView(tk.Tk):
communicator = TFTPCommunicator()
config_data = config.get("tftp", {})
elif comm_type == "sfp":
# --- MODIFICATION: Pass the hub to the communicator ---
communicator = SFPCommunicator(simulation_hub=self.simulation_hub)
# --- MODIFICATION: Pass the hub and GUI update queue to the communicator ---
communicator = SFPCommunicator(simulation_hub=self.simulation_hub, update_queue=self.gui_update_queue)
config_data = config.get("sfp", {})
if self.defer_sfp_connection:
# Return the communicator object but indicate it's not yet connected
return communicator, False
if communicator and config_data:
if communicator.connect(config_data):
@ -367,10 +380,88 @@ class MainView(tk.Tk):
self.logger.info("Connection requested by user.")
self._initialize_communicators()
def _reset_radar_state(self) -> bool:
"""
Sends commands to the radar to deactivate all possible targets, effectively
clearing its state before a new simulation starts.
Returns:
True if the reset commands were sent successfully, False otherwise.
"""
if not self.target_communicator or not self.target_communicator.is_open:
self.logger.error("Cannot reset radar state: communicator is not connected.")
messagebox.showerror("Connection Error", "Cannot reset radar: Not Connected.")
return False
self.logger.info("Sending reset commands to deactivate all radar targets...")
# Prefer an atomic reset command to deactivate all targets on the server
# instead of sending many individual tgtinit commands which is slow and
# can cause dropped messages. The server understands 'tgtset /-s' which
# clears all targets atomically.
# Build the atomic reset command. Use command_builder.tgtset if available,
# otherwise build the raw command string.
try:
# Some command_builder implementations may provide build_tgtset; use it if present
if hasattr(command_builder, "build_tgtset"):
reset_command = command_builder.build_tgtset("/-s")
else:
# Fallback: raw command string
reset_command = "tgtset /-s"
except Exception:
# In case command_builder raises for unexpected inputs, fallback to raw
self.logger.exception("Error while building atomic reset command; falling back to raw string.")
reset_command = "tgtset /-s"
# Send the single atomic reset command using the communicator's
# send_commands API which accepts a list of commands.
if not self.target_communicator.send_commands([reset_command]):
self.logger.error("Failed to send atomic reset command to the radar.")
messagebox.showerror("Reset Error", "Failed to send reset command to the radar.")
return False
self.logger.info("Successfully sent atomic reset command: tgtset /-s")
# Poll the simulation hub for up to a short timeout to ensure the server
# processed the reset and returned no active targets.
timeout_s = 3.0
poll_interval = 0.2
waited = 0.0
while waited < timeout_s:
# If there are no real target entries in the hub, assume reset succeeded
if not self.simulation_hub.get_all_target_ids():
self.logger.info("Radar reported zero active targets after reset.")
return True
time.sleep(poll_interval)
waited += poll_interval
# If we reach here, the hub still reports targets — treat as failure
self.logger.error("Radar did not clear targets after reset within timeout.")
messagebox.showerror("Reset Error", "Radar did not clear targets after reset.")
return False
def _on_start_simulation(self):
if self.is_simulation_running.get():
self.logger.info("Simulation is already running.")
return
# If communicator exists but connection was deferred, try to connect now
if self.target_communicator and not self.target_communicator.is_open:
try:
sfp_cfg = self.connection_config.get("target", {}).get("sfp", {})
if sfp_cfg:
self.logger.info("Attempting deferred/auto connect for target communicator before starting simulation.")
if not self.target_communicator.connect(sfp_cfg):
messagebox.showerror(
"Not Connected",
"Target communicator is not connected. Please check settings and connect.",
)
return
except Exception:
self.logger.exception("Exception while attempting auto-connect for target communicator.")
messagebox.showerror(
"Not Connected",
"Target communicator is not connected. Please check settings and connect.",
)
return
if not self.target_communicator or not self.target_communicator.is_open:
messagebox.showerror(
"Not Connected",
@ -400,6 +491,26 @@ class MainView(tk.Tk):
self.logger.info("Resetting simulation data hub and PPI trails.")
self.simulation_hub.reset()
self.ppi_widget.clear_trails()
# If SFP reception was deferred, establish SFP connection now so we
# can receive server state updates during simulation.
if hasattr(self.target_communicator, "connect") and not self.target_communicator.is_open:
try:
# Try to connect with stored config; if connect fails, we abort
sfp_cfg = self.connection_config.get("target", {}).get("sfp", {})
if sfp_cfg:
self.logger.info("Deferred SFP connect: attempting to connect for simulation start.")
if not self.target_communicator.connect(sfp_cfg):
self.logger.error("Failed to connect SFP communicator at simulation start.")
messagebox.showerror("Connection Error", "Failed to establish SFP receive connection.")
return
except Exception:
self.logger.exception("Exception while attempting deferred SFP connect.")
messagebox.showerror("Connection Error", "Exception while establishing SFP receive connection.")
return
if not self._reset_radar_state():
self.logger.error("Aborting simulation start due to radar reset failure.")
return
self.logger.info(
"Sending initial scenario state before starting live updates..."
@ -443,6 +554,19 @@ class MainView(tk.Tk):
self.logger.info("Stopping live simulation...")
self.simulation_engine.stop()
self.simulation_engine = None
# Also disconnect the target communicator (SFP) so we stop receiving from server
try:
if self.target_communicator and getattr(self.target_communicator, "is_open", False):
self.logger.info("Disconnecting target communicator (SFP) after simulation stop.")
try:
self.target_communicator.disconnect()
except Exception:
self.logger.exception("Error while disconnecting target communicator.")
# Update visual status
self._update_communicator_status("Target", False)
except Exception:
self.logger.exception("Unexpected error while attempting to disconnect target communicator.")
self.is_simulation_running.set(False)
self._update_button_states()
@ -464,15 +588,26 @@ class MainView(tk.Tk):
self._on_stop_simulation()
elif isinstance(update, list):
# This update is the list of simulated targets from the engine
simulated_targets: List[Target] = update
# The engine normally enqueues a List[Target] (simulated targets).
# However, the simulation payload handler uses an empty list []
# as a lightweight notification that real states were added to
# the hub. Distinguish the two cases:
if len(update) == 0:
# Empty-list used as a hub refresh notification. Do not
# clear the target list; just rebuild the PPI from the hub.
self.logger.debug("Received hub refresh notification from GUI queue.")
display_data = self._build_display_data_from_hub()
self.ppi_widget.update_targets(display_data)
else:
# This update is the list of simulated targets from the engine
simulated_targets: List[Target] = update
# Update the target list view with detailed simulated data
self.target_list.update_target_list(simulated_targets)
# Update the target list view with detailed simulated data
self.target_list.update_target_list(simulated_targets)
# For the PPI, build the comparative data structure from the hub
display_data = self._build_display_data_from_hub()
self.ppi_widget.update_targets(display_data)
# For the PPI, build the comparative data structure from the hub
display_data = self._build_display_data_from_hub()
self.ppi_widget.update_targets(display_data)
except Empty:
# If the queue is empty, we don't need to do anything
@ -489,6 +624,7 @@ class MainView(tk.Tk):
state = tk.DISABLED if is_running else tk.NORMAL
self.reset_radar_button.config(state=state)
self.start_button.config(state=tk.DISABLED if is_running else tk.NORMAL)
self.stop_button.config(state=tk.NORMAL if is_running else tk.DISABLED)
self.analysis_button.config(state=analysis_state)
@ -775,6 +911,9 @@ class MainView(tk.Tk):
setattr(sim_target, '_pos_y_ft', y)
setattr(sim_target, '_pos_z_ft', z)
sim_target._update_current_polar_coords()
# Ensure this lightweight target is treated as active for display
sim_target.active = True
sim_target.traceable = True
simulated_targets_for_ppi.append(sim_target)
# --- Process Real Data ---
@ -787,8 +926,21 @@ class MainView(tk.Tk):
setattr(real_target, '_pos_y_ft', y)
setattr(real_target, '_pos_z_ft', z)
real_target._update_current_polar_coords()
# Ensure this lightweight target is treated as active for display
real_target.active = True
real_target.traceable = True
real_targets_for_ppi.append(real_target)
# Diagnostic logging: show how many targets will be passed to the PPI
try:
self.logger.debug(
"PPIDisplay will receive simulated=%d real=%d targets",
len(simulated_targets_for_ppi),
len(real_targets_for_ppi),
)
except Exception:
pass
return {"simulated": simulated_targets_for_ppi, "real": real_targets_for_ppi}
def _open_analysis_window(self):

View File

@ -92,6 +92,20 @@ class PPIDisplay(ttk.Frame):
cb_sim_trail.grid(row=1, column=0, sticky='w', padx=5)
cb_real_trail = ttk.Checkbutton(options_frame, text="Real Trail", variable=self.show_real_trail_var, command=lambda: self.canvas.draw_idle())
cb_real_trail.grid(row=1, column=1, sticky='w', padx=5)
# --- Legend ---
legend_frame = ttk.Frame(top_frame)
legend_frame.pack(side=tk.RIGHT, padx=(10, 5))
# Small colored swatches for simulated/real
sim_sw = tk.Canvas(legend_frame, width=16, height=12, highlightthickness=0)
sim_sw.create_rectangle(0, 0, 16, 12, fill='green', outline='black')
sim_sw.pack(side=tk.LEFT, padx=(0, 4))
ttk.Label(legend_frame, text="Simulated").pack(side=tk.LEFT, padx=(0, 8))
real_sw = tk.Canvas(legend_frame, width=16, height=12, highlightthickness=0)
real_sw.create_rectangle(0, 0, 16, 12, fill='red', outline='black')
real_sw.pack(side=tk.LEFT, padx=(2, 4))
ttk.Label(legend_frame, text="Real").pack(side=tk.LEFT)
def _create_plot(self):
"""Initializes the Matplotlib polar plot."""
@ -99,14 +113,26 @@ class PPIDisplay(ttk.Frame):
fig.subplots_adjust(left=0.05, right=0.95, top=0.9, bottom=0.05)
self.ax = fig.add_subplot(111, projection="polar", facecolor="#2E2E2E")
# Set zero at North (top) and theta direction to counter-clockwise (standard)
self.ax.set_theta_zero_location("N")
self.ax.set_theta_direction(-1)
self.ax.set_theta_direction(1) # POSITIVO = ANTI-ORARIO (CCW)
self.ax.set_rlabel_position(90)
self.ax.set_ylim(0, self.range_var.get())
angles = np.arange(0, 360, 30)
labels = [f"{angle}°" for angle in angles]
self.ax.set_thetagrids(angles, labels)
# Set angular grid labels to be [-180, 180] with North at 0
angles_deg = np.arange(0, 360, 30)
labels = []
for angle in angles_deg:
# Convert angle from CCW-from-East to CW-from-North for display label
display_angle = (450 - angle) % 360
if display_angle > 180:
display_angle -= 360
labels.append(f'{-display_angle}°') # Invert sign to match CW convention
self.ax.set_thetagrids(angles_deg, labels)
self.ax.tick_params(axis="x", colors="white", labelsize=8)
self.ax.tick_params(axis="y", colors="white", labelsize=8)
@ -114,6 +140,8 @@ class PPIDisplay(ttk.Frame):
self.ax.spines["polar"].set_color("white")
self.ax.set_title("PPI Display", color="white")
# ... (il resto del metodo è corretto) ...
(self._path_plot,) = self.ax.plot([], [], "g--", linewidth=1.5, label="Path")
(self._start_plot,) = self.ax.plot([], [], "go", markersize=8, label="Start")
(self._waypoints_plot,) = self.ax.plot([], [], "y+", markersize=10, mew=2, label="Waypoints")