440 lines
18 KiB
Python
440 lines
18 KiB
Python
# target_simulator/gui/ppi_display.py
|
|
|
|
"""
|
|
A reusable Tkinter widget that displays a Plan Position Indicator (PPI)
|
|
using Matplotlib, capable of showing both live targets and trajectory previews,
|
|
and comparing simulated vs. real-time data.
|
|
"""
|
|
|
|
import tkinter as tk
|
|
from tkinter import ttk
|
|
import math
|
|
import time
|
|
import logging
|
|
import numpy as np
|
|
import collections
|
|
from matplotlib.figure import Figure
|
|
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
|
from typing import List, Dict, Union
|
|
|
|
from target_simulator.core.models import Target, Waypoint, ManeuverType, NM_TO_FT
|
|
|
|
# Module-level logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PPIDisplay(ttk.Frame):
|
|
"""
|
|
A custom widget for the PPI radar display.
|
|
"""
|
|
|
|
TRAIL_LENGTH = 100
|
|
|
|
def __init__(
|
|
self,
|
|
master,
|
|
max_range_nm: int = 100,
|
|
scan_limit_deg: int = 60,
|
|
trail_length: int = None,
|
|
):
|
|
super().__init__(master)
|
|
self.max_range = max_range_nm
|
|
self.scan_limit_deg = scan_limit_deg
|
|
self.sim_target_artists, self.real_target_artists = [], []
|
|
self.sim_trail_artists, self.real_trail_artists = [], []
|
|
# Keep label artists separated so we can update simulated labels
|
|
# without removing real labels when a simulated-only update happens.
|
|
self.sim_label_artists, self.real_label_artists = [], []
|
|
self.trail_length = trail_length or self.TRAIL_LENGTH
|
|
self._trails = {
|
|
"simulated": collections.defaultdict(
|
|
lambda: collections.deque(maxlen=self.trail_length)
|
|
),
|
|
"real": collections.defaultdict(
|
|
lambda: collections.deque(maxlen=self.trail_length)
|
|
),
|
|
}
|
|
self.preview_artists = []
|
|
self.show_sim_points_var = tk.BooleanVar(value=True)
|
|
self.show_real_points_var = tk.BooleanVar(value=True)
|
|
self.show_sim_trail_var = tk.BooleanVar(value=False)
|
|
self.show_real_trail_var = tk.BooleanVar(value=True)
|
|
self.canvas = None
|
|
self._create_controls()
|
|
self._create_plot()
|
|
# Track timestamps of real update calls to compute display update rate
|
|
self._real_update_timestamps = collections.deque(maxlen=10000)
|
|
self._last_update_summary_time = time.monotonic()
|
|
self._update_summary_interval_s = 1.0
|
|
|
|
def _on_display_options_changed(self):
|
|
# A full redraw is needed, but we don't have the last data sets.
|
|
# The best approach is to clear everything. The next update cycle from
|
|
# the simulation engine and/or the server communicator will repopulate
|
|
# the display with the correct visibility settings.
|
|
self.clear_all_targets()
|
|
if self.canvas:
|
|
self.canvas.draw()
|
|
|
|
def _create_controls(self):
|
|
top_frame = ttk.Frame(self)
|
|
top_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5)
|
|
self.controls_frame = ttk.Frame(top_frame)
|
|
self.controls_frame.pack(side=tk.LEFT, fill=tk.X, expand=True)
|
|
ttk.Label(self.controls_frame, text="Range (NM):").pack(side=tk.LEFT)
|
|
all_steps = [10, 20, 40, 80, 100, 160, 240, 320]
|
|
valid_steps = sorted([s for s in all_steps if s <= self.max_range])
|
|
if not valid_steps or self.max_range not in valid_steps:
|
|
valid_steps.append(self.max_range)
|
|
valid_steps.sort()
|
|
self.range_var = tk.IntVar(value=self.max_range)
|
|
self.range_selector = ttk.Combobox(
|
|
self.controls_frame,
|
|
textvariable=self.range_var,
|
|
values=valid_steps,
|
|
state="readonly",
|
|
width=5,
|
|
)
|
|
self.range_selector.pack(side=tk.LEFT, padx=5)
|
|
options_frame = ttk.LabelFrame(top_frame, text="Display Options")
|
|
options_frame.pack(side=tk.RIGHT, padx=(10, 0))
|
|
cb_sim_points = ttk.Checkbutton(
|
|
options_frame,
|
|
text="Sim Points",
|
|
variable=self.show_sim_points_var,
|
|
command=self._on_display_options_changed,
|
|
)
|
|
cb_sim_points.grid(row=0, column=0, sticky="w", padx=5)
|
|
cb_real_points = ttk.Checkbutton(
|
|
options_frame,
|
|
text="Real Points",
|
|
variable=self.show_real_points_var,
|
|
command=self._on_display_options_changed,
|
|
)
|
|
cb_real_points.grid(row=0, column=1, sticky="w", padx=5)
|
|
cb_sim_trail = ttk.Checkbutton(
|
|
options_frame,
|
|
text="Sim Trail",
|
|
variable=self.show_sim_trail_var,
|
|
command=self._on_display_options_changed,
|
|
)
|
|
cb_sim_trail.grid(row=1, column=0, sticky="w", padx=5)
|
|
cb_real_trail = ttk.Checkbutton(
|
|
options_frame,
|
|
text="Real Trail",
|
|
variable=self.show_real_trail_var,
|
|
command=self._on_display_options_changed,
|
|
)
|
|
cb_real_trail.grid(row=1, column=1, sticky="w", padx=5)
|
|
legend_frame = ttk.Frame(top_frame)
|
|
legend_frame.pack(side=tk.RIGHT, padx=(10, 5))
|
|
sim_sw = tk.Canvas(legend_frame, width=16, height=12, highlightthickness=0)
|
|
sim_sw.create_rectangle(0, 0, 16, 12, fill="green", outline="black")
|
|
sim_sw.pack(side=tk.LEFT, padx=(0, 4))
|
|
ttk.Label(legend_frame, text="Simulated").pack(side=tk.LEFT, padx=(0, 8))
|
|
real_sw = tk.Canvas(legend_frame, width=16, height=12, highlightthickness=0)
|
|
real_sw.create_rectangle(0, 0, 16, 12, fill="red", outline="black")
|
|
real_sw.pack(side=tk.LEFT, padx=(2, 4))
|
|
ttk.Label(legend_frame, text="Real").pack(side=tk.LEFT)
|
|
|
|
def _create_plot(self):
|
|
fig = Figure(figsize=(5, 5), dpi=100, facecolor="#3E3E3E")
|
|
fig.subplots_adjust(left=0.05, right=0.95, top=0.9, bottom=0.05)
|
|
self.ax = fig.add_subplot(111, projection="polar", facecolor="#2E2E2E")
|
|
self.ax.set_theta_zero_location("N")
|
|
self.ax.set_theta_direction(1)
|
|
self.ax.set_rlabel_position(90)
|
|
self.ax.set_ylim(0, self.range_var.get())
|
|
angles_deg = np.arange(0, 360, 30)
|
|
labels = [f"{(a - 360) if a > 180 else a}°" for a in angles_deg]
|
|
self.ax.set_thetagrids(angles_deg, labels)
|
|
self.ax.tick_params(axis="x", colors="white", labelsize=8)
|
|
self.ax.tick_params(axis="y", colors="white", labelsize=8)
|
|
self.ax.grid(color="white", linestyle="--", linewidth=0.5, alpha=0.5)
|
|
self.ax.spines["polar"].set_color("white")
|
|
self.ax.set_title("PPI Display", color="white")
|
|
(self._path_plot,) = self.ax.plot([], [], "g--", linewidth=1.5)
|
|
(self._start_plot,) = self.ax.plot([], [], "go", markersize=8)
|
|
(self._waypoints_plot,) = self.ax.plot([], [], "y+", markersize=10, mew=2)
|
|
self.preview_artists = [self._path_plot, self._start_plot, self._waypoints_plot]
|
|
limit_rad = np.deg2rad(self.scan_limit_deg)
|
|
(self._scan_line_1,) = self.ax.plot(
|
|
[limit_rad, limit_rad], [0, self.max_range], "y--", linewidth=1
|
|
)
|
|
(self._scan_line_2,) = self.ax.plot(
|
|
[-limit_rad, -limit_rad], [0, self.max_range], "y--", linewidth=1
|
|
)
|
|
self.canvas = FigureCanvasTkAgg(fig, master=self)
|
|
self.canvas.draw()
|
|
self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)
|
|
self.range_selector.bind("<<ComboboxSelected>>", self._on_range_selected)
|
|
self._update_scan_lines()
|
|
|
|
def clear_all_targets(self):
|
|
"""Clears all target artists from the display."""
|
|
all_artists = (
|
|
self.sim_target_artists
|
|
+ self.real_target_artists
|
|
+ self.sim_trail_artists
|
|
+ self.real_trail_artists
|
|
+ self.sim_label_artists
|
|
+ self.real_label_artists
|
|
)
|
|
for artist in all_artists:
|
|
artist.remove()
|
|
self.sim_target_artists.clear()
|
|
self.real_target_artists.clear()
|
|
self.sim_trail_artists.clear()
|
|
self.real_trail_artists.clear()
|
|
self.sim_label_artists.clear()
|
|
self.real_label_artists.clear()
|
|
|
|
def update_simulated_targets(self, targets: List[Target]):
|
|
"""Updates and redraws only the simulated targets."""
|
|
self._update_target_category(targets, "simulated")
|
|
if self.canvas:
|
|
self.canvas.draw()
|
|
|
|
def update_real_targets(self, targets: List[Target]):
|
|
"""Updates and redraws only the real targets."""
|
|
# Instrument update rate
|
|
try:
|
|
now = time.monotonic()
|
|
self._real_update_timestamps.append(now)
|
|
# Periodic (throttled) summary log of PPI update rate
|
|
if (now - self._last_update_summary_time) >= self._update_summary_interval_s:
|
|
# compute recent updates per second
|
|
cutoff = now - self._update_summary_interval_s
|
|
cnt = 0
|
|
for ts in reversed(self._real_update_timestamps):
|
|
if ts >= cutoff:
|
|
cnt += 1
|
|
else:
|
|
break
|
|
rate = cnt / float(self._update_summary_interval_s) if self._update_summary_interval_s > 0 else float(cnt)
|
|
try:
|
|
logger.info("[PPIDisplay] update_real_targets: recent_rate=%.1f updates/s display_targets=%d", rate, len(targets))
|
|
except Exception:
|
|
pass
|
|
self._last_update_summary_time = now
|
|
except Exception:
|
|
pass
|
|
|
|
self._update_target_category(targets, "real")
|
|
if self.canvas:
|
|
self.canvas.draw()
|
|
|
|
def get_real_update_rate(self, window_seconds: float = 1.0) -> float:
|
|
"""
|
|
Returns approximate PPI "real targets" update rate (updates/sec) over the
|
|
last `window_seconds` seconds.
|
|
"""
|
|
try:
|
|
now = time.monotonic()
|
|
cutoff = now - float(window_seconds)
|
|
count = 0
|
|
for ts in reversed(self._real_update_timestamps):
|
|
if ts >= cutoff:
|
|
count += 1
|
|
else:
|
|
break
|
|
return count / float(window_seconds) if window_seconds > 0 else float(count)
|
|
except Exception:
|
|
return 0.0
|
|
|
|
def _update_target_category(self, new_data: List[Target], category: str):
|
|
"""
|
|
Generic helper to update targets for a specific category ('simulated' or 'real').
|
|
"""
|
|
if category == "simulated":
|
|
target_artists = self.sim_target_artists
|
|
trail_artists = self.sim_trail_artists
|
|
label_artists = self.sim_label_artists
|
|
trail_data = self._trails["simulated"]
|
|
show_points = self.show_sim_points_var.get()
|
|
show_trail = self.show_sim_trail_var.get()
|
|
color = "green"
|
|
trail_color = "limegreen"
|
|
else: # "real"
|
|
target_artists = self.real_target_artists
|
|
trail_artists = self.real_trail_artists
|
|
label_artists = self.real_label_artists
|
|
trail_data = self._trails["real"]
|
|
show_points = self.show_real_points_var.get()
|
|
show_trail = self.show_real_trail_var.get()
|
|
color = "red"
|
|
trail_color = "tomato"
|
|
|
|
# 1. Clear existing artists for this category
|
|
for artist in target_artists + trail_artists + label_artists:
|
|
artist.remove()
|
|
target_artists.clear()
|
|
trail_artists.clear()
|
|
label_artists.clear()
|
|
|
|
# 2. Update trail data
|
|
if show_points or show_trail:
|
|
for t in new_data:
|
|
if t.active:
|
|
pos = (np.deg2rad(-t.current_azimuth_deg), t.current_range_nm)
|
|
trail_data[t.target_id].append(pos)
|
|
|
|
# 3. Draw new visuals
|
|
if show_points:
|
|
self._draw_target_visuals(
|
|
[t for t in new_data if t.active], color, target_artists, label_artists
|
|
)
|
|
if show_trail:
|
|
self._draw_trails(trail_data, trail_color, trail_artists)
|
|
|
|
def _draw_target_visuals(
|
|
self,
|
|
targets: List[Target],
|
|
color: str,
|
|
artist_list: List,
|
|
label_artist_list: List,
|
|
):
|
|
vector_len_nm = self.range_var.get() / 20.0
|
|
|
|
# Determine marker size based on the target type (color)
|
|
marker_size = (
|
|
6 if color == "red" else 8
|
|
) # Simulated targets (green) are smaller
|
|
|
|
for target in targets:
|
|
# Plotting position (theta, r)
|
|
r_nm = target.current_range_nm
|
|
# MODIFICATION: Removed negation. The azimuth from the model is now used directly.
|
|
theta_rad_plot = np.deg2rad(target.current_azimuth_deg)
|
|
(dot,) = self.ax.plot(
|
|
theta_rad_plot, r_nm, "o", markersize=marker_size, color=color
|
|
)
|
|
artist_list.append(dot)
|
|
|
|
# --- Robust Vector Calculation ---
|
|
az_rad_model = math.radians(target.current_azimuth_deg)
|
|
x_start_nm = r_nm * math.sin(az_rad_model)
|
|
y_start_nm = r_nm * math.cos(az_rad_model)
|
|
# MODIFICATION: Heading should also be consistent.
|
|
# A positive heading (e.g. 10 deg) means turning left (CCW), which matches
|
|
# the standard polar plot direction.
|
|
hdg_rad_plot = math.radians(target.current_heading_deg)
|
|
dx_nm = vector_len_nm * math.sin(hdg_rad_plot)
|
|
dy_nm = vector_len_nm * math.cos(hdg_rad_plot)
|
|
x_end_nm = x_start_nm + dx_nm
|
|
y_end_nm = y_start_nm + dy_nm
|
|
r_end_nm = math.hypot(x_end_nm, y_end_nm)
|
|
# MODIFICATION: Removed negation here as well for consistency.
|
|
theta_end_rad_plot = math.atan2(x_end_nm, y_end_nm)
|
|
|
|
(line,) = self.ax.plot(
|
|
[theta_rad_plot, theta_end_rad_plot],
|
|
[r_nm, r_end_nm],
|
|
color=color,
|
|
linewidth=1.2,
|
|
)
|
|
artist_list.append(line)
|
|
|
|
txt = self.ax.text(
|
|
theta_rad_plot,
|
|
r_nm + (vector_len_nm * 0.5),
|
|
str(target.target_id),
|
|
color="white",
|
|
fontsize=8,
|
|
ha="center",
|
|
va="bottom",
|
|
)
|
|
label_artist_list.append(txt)
|
|
|
|
def _draw_trails(self, trail_data: Dict, color: str, artist_list: List):
|
|
for trail in trail_data.values():
|
|
if len(trail) > 1:
|
|
thetas, rs = zip(*trail)
|
|
(line,) = self.ax.plot(
|
|
thetas, rs, color=color, linestyle="-", linewidth=0.8, alpha=0.7
|
|
)
|
|
artist_list.append(line)
|
|
|
|
def clear_trails(self):
|
|
self._trails["simulated"].clear()
|
|
self._trails["real"].clear()
|
|
self.clear_all_targets()
|
|
if self.canvas:
|
|
self.canvas.draw()
|
|
|
|
def _update_scan_lines(self):
|
|
max_r = self.ax.get_ylim()[1]
|
|
limit_rad = np.deg2rad(self.scan_limit_deg)
|
|
self._scan_line_1.set_data([limit_rad, limit_rad], [0, max_r])
|
|
self._scan_line_2.set_data([-limit_rad, -limit_rad], [0, max_r])
|
|
|
|
def _on_range_selected(self, event=None):
|
|
self.ax.set_ylim(0, self.range_var.get())
|
|
self._update_scan_lines()
|
|
if self.canvas:
|
|
self.canvas.draw()
|
|
|
|
def clear_previews(self):
|
|
for artist in self.preview_artists:
|
|
artist.set_data([], [])
|
|
if self.canvas:
|
|
self.canvas.draw()
|
|
|
|
def draw_trajectory_preview(self, waypoints: List[Waypoint], use_spline: bool):
|
|
self.clear_previews()
|
|
self.clear_trails()
|
|
if not waypoints or waypoints[0].maneuver_type != ManeuverType.FLY_TO_POINT:
|
|
return
|
|
path, _ = Target.generate_path_from_waypoints(waypoints, use_spline)
|
|
if not path:
|
|
return
|
|
path_thetas, path_rs = [], []
|
|
for point in path:
|
|
x_ft, y_ft = point[1], point[2]
|
|
r_ft = math.sqrt(x_ft**2 + y_ft**2)
|
|
# Use the same plotting convention used elsewhere: theta_plot = atan2(x, y).
|
|
# This convention is established in the _draw_target_visuals helper,
|
|
# which computes theta via -current_azimuth_deg.
|
|
az_rad_plot = math.atan2(x_ft, y_ft)
|
|
path_rs.append(r_ft / NM_TO_FT)
|
|
path_thetas.append(az_rad_plot)
|
|
self._path_plot.set_data(path_thetas, path_rs)
|
|
wp_thetas, wp_rs = [], []
|
|
for wp in waypoints:
|
|
if wp.maneuver_type == ManeuverType.FLY_TO_POINT:
|
|
r_nm = wp.target_range_nm or 0.0
|
|
# The path uses theta_plot = atan2(x, y). Waypoint azimuths
|
|
# provided in the waypoint are geometric azimuth degrees
|
|
# (0 = North, positive CCW). Convert directly to radians so
|
|
# plotted waypoint markers align with the generated path.
|
|
az_rad_plot = np.deg2rad(wp.target_azimuth_deg or 0.0)
|
|
wp_rs.append(r_nm)
|
|
wp_thetas.append(az_rad_plot)
|
|
self._waypoints_plot.set_data(wp_thetas, wp_rs)
|
|
start_wp = waypoints[0]
|
|
start_r = start_wp.target_range_nm or 0.0
|
|
start_theta = np.deg2rad(start_wp.target_azimuth_deg or 0.0)
|
|
self._start_plot.set_data([start_theta], [start_r])
|
|
if self.canvas:
|
|
self.canvas.draw()
|
|
|
|
def reconfigure_radar(self, max_range_nm: int, scan_limit_deg: int):
|
|
self.max_range, self.scan_limit_deg = max_range_nm, scan_limit_deg
|
|
steps = [10, 20, 40, 80, 100, 160, 240, 320]
|
|
valid_steps = sorted(
|
|
[s for s in steps if s <= max_range_nm]
|
|
+ ([max_range_nm] if max_range_nm not in steps else [])
|
|
)
|
|
self.range_selector["values"] = valid_steps
|
|
if self.range_var.get() not in valid_steps:
|
|
self.range_var.set(max_range_nm)
|
|
self._on_range_selected()
|
|
|
|
def set_connect_callback(self, cb):
|
|
self._connect_callback = cb
|
|
|
|
def update_connect_state(self, is_connected: bool):
|
|
# This method should only reflect state, not change UI elements.
|
|
# The parent window is responsible for enabling/disabling controls.
|
|
pass
|