aggiunta su ppi la visualizzazione dell'antenna che si muove

This commit is contained in:
VALLONGOL 2025-10-31 10:10:38 +01:00
parent 20b7fd41ad
commit 34a6737fcc
4 changed files with 395 additions and 0 deletions

View File

@ -56,6 +56,10 @@ class SimulationStateHub:
# Summary throttle to avoid flooding logs while still providing throughput info
self._last_real_summary_time = time.monotonic()
self._real_summary_interval_s = 1.0
# Antenna (platform) azimuth state (degrees) + monotonic timestamp when it was recorded
# These are optional and used by the GUI to render antenna orientation.
self._antenna_azimuth_deg = None
self._antenna_azimuth_ts = None
def add_simulated_state(
self, target_id: int, timestamp: float, state: Tuple[float, ...]
@ -241,6 +245,53 @@ class SimulationStateHub:
# On error, do nothing (silently ignore invalid heading)
pass
def set_antenna_azimuth(
self, azimuth_deg: float, timestamp: Optional[float] = None
):
"""
Store the latest antenna (platform) azimuth in degrees along with an
optional monotonic timestamp. The GUI can retrieve this to render the
antenna orientation and smoothly animate between updates.
Args:
azimuth_deg: Azimuth value in degrees (0 = North, positive CCW)
timestamp: Optional monotonic timestamp (defaults to time.monotonic())
"""
try:
ts = float(timestamp) if timestamp is not None else time.monotonic()
az = float(azimuth_deg) % 360
except Exception:
return
with self._lock:
self._antenna_azimuth_deg = az
self._antenna_azimuth_ts = ts
def get_antenna_azimuth(self) -> Tuple[Optional[float], Optional[float]]:
"""
Returns a tuple (azimuth_deg, timestamp) representing the last-known
antenna azimuth and the monotonic timestamp when it was recorded.
If not available, returns (None, None).
"""
with self._lock:
return (self._antenna_azimuth_deg, self._antenna_azimuth_ts)
# Backwards-compatible aliases for existing callers. These delegate to the
# new antenna-named methods so external code using the older names continues
# to work.
def set_platform_azimuth(
self, azimuth_deg: float, timestamp: Optional[float] = None
):
try:
self.set_antenna_azimuth(azimuth_deg, timestamp=timestamp)
except Exception:
pass
def get_platform_azimuth(self) -> Tuple[Optional[float], Optional[float]]:
try:
return self.get_antenna_azimuth()
except Exception:
return (None, None)
def get_real_heading(self, target_id: int) -> Optional[float]:
"""
Retrieve the last stored real heading for a target, or None if not set.

View File

@ -954,6 +954,39 @@ class MainView(tk.Tk):
# self.logger.debug("MainView: received hub refresh. Updating real targets.")
display_data = self._build_display_data_from_hub()
self.ppi_widget.update_real_targets(display_data.get("real", []))
# Also propagate platform/antenna azimuth (if available) so the PPI
# can render the antenna orientation. The hub stores a (az_deg, ts)
# tuple via set_platform_azimuth().
try:
if (
hasattr(self, "simulation_hub")
and self.simulation_hub is not None
and hasattr(self.ppi_widget, "update_antenna_azimuth")
):
try:
# Prefer the new API name, fall back to the legacy
if hasattr(self.simulation_hub, "get_antenna_azimuth"):
az_deg, az_ts = (
self.simulation_hub.get_antenna_azimuth()
)
else:
az_deg, az_ts = (
self.simulation_hub.get_platform_azimuth()
)
if az_deg is not None:
# pass the hub-provided timestamp if available
self.ppi_widget.update_antenna_azimuth(
az_deg, timestamp=az_ts
)
except Exception:
# don't allow GUI update failures to interrupt queue processing
self.logger.debug(
"Failed to propagate antenna azimuth to PPI",
exc_info=True,
)
except Exception:
pass
else:
# This is an update with simulated targets from the engine.

View File

@ -272,6 +272,57 @@ class DebugPayloadRouter:
struct = {"scenario": scenario_dict, "targets": targets_list}
json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
self._update_last_payload("RIS_STATUS_JSON", json_bytes)
# Propagate antenna azimuth into the hub so the GUI can render
# the antenna orientation. Prefer the RIS field `ant_nav_az` (this
# is the antenna navigation azimuth). For backward compatibility
# fall back to `platform_azimuth` if `ant_nav_az` is not present.
try:
plat = None
if "ant_nav_az" in scenario_dict:
plat = scenario_dict.get("ant_nav_az")
elif "platform_azimuth" in scenario_dict:
plat = scenario_dict.get("platform_azimuth")
if (
plat is not None
and self._hub
and hasattr(self._hub, "set_platform_azimuth")
):
try:
val = float(plat)
# If the value looks like radians (<= ~2*pi) convert to degrees
if abs(val) <= (2 * math.pi * 1.1):
deg = math.degrees(val)
else:
deg = val
recv_ts = (
reception_timestamp
if "reception_timestamp" in locals()
else time.monotonic()
)
try:
# New API: set_antenna_azimuth
if hasattr(self._hub, "set_antenna_azimuth"):
self._hub.set_antenna_azimuth(
deg, timestamp=recv_ts
)
else:
# Fallback to legacy name if present
self._hub.set_platform_azimuth(
deg, timestamp=recv_ts
)
except Exception:
self._logger.debug(
"Failed to set antenna/platform azimuth on hub",
exc_info=True,
)
except Exception:
pass
except Exception:
self._logger.debug(
"Error while extracting antenna azimuth from RIS payload",
exc_info=True,
)
except Exception:
self._logger.exception("Failed to generate text/JSON for RIS debug view.")

View File

@ -60,6 +60,8 @@ class PPIDisplay(ttk.Frame):
self.show_sim_trail_var = tk.BooleanVar(value=False)
# Default: do not show real trails unless the user enables them.
self.show_real_trail_var = tk.BooleanVar(value=False)
# Antenna animate toggle: when False the antenna is hidden; when True it is shown and animated
self.animate_antenna_var = tk.BooleanVar(value=True)
self.canvas = None
self._create_controls()
self._create_plot()
@ -67,6 +69,17 @@ class PPIDisplay(ttk.Frame):
self._real_update_timestamps = collections.deque(maxlen=10000)
self._last_update_summary_time = time.monotonic()
self._update_summary_interval_s = 1.0
# Antenna/Platform visualization state used to animate a moving
# dashed line indicating current antenna azimuth.
self._antenna_state = {
"last_az_deg": None,
"last_ts": None,
"next_az_deg": None,
"next_ts": None,
"animating": False,
"tick_ms": 33, # ~30 FPS animation cadence
}
self._antenna_line_artist = None
def _on_display_options_changed(self):
# A full redraw is needed, but we don't have the last data sets.
@ -127,6 +140,14 @@ class PPIDisplay(ttk.Frame):
command=self._on_display_options_changed,
)
cb_real_trail.grid(row=1, column=1, sticky="w", padx=5)
# Antenna animate toggle (single Checkbutton)
cb_antenna = ttk.Checkbutton(
options_frame,
text="Animate Antenna",
variable=self.animate_antenna_var,
command=self._on_antenna_animate_changed,
)
cb_antenna.grid(row=2, column=0, columnspan=2, sticky="w", padx=5, pady=(6, 0))
legend_frame = ttk.Frame(top_frame)
legend_frame.pack(side=tk.RIGHT, padx=(10, 5))
sim_sw = tk.Canvas(legend_frame, width=16, height=12, highlightthickness=0)
@ -165,6 +186,12 @@ class PPIDisplay(ttk.Frame):
(self._scan_line_2,) = self.ax.plot(
[-limit_rad, -limit_rad], [0, self.max_range], "y--", linewidth=1
)
# Antenna current azimuth (dashed light-gray line). It will be
# animated via update_antenna_azimuth() calls which interpolate
# between timestamped azimuth updates for smooth motion.
(self._antenna_line_artist,) = self.ax.plot(
[], [], color="lightgray", linestyle="--", linewidth=1.2, alpha=0.85
)
self.canvas = FigureCanvasTkAgg(fig, master=self)
self.canvas.draw()
self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)
@ -374,6 +401,17 @@ class PPIDisplay(ttk.Frame):
limit_rad = np.deg2rad(self.scan_limit_deg)
self._scan_line_1.set_data([limit_rad, limit_rad], [0, max_r])
self._scan_line_2.set_data([-limit_rad, -limit_rad], [0, max_r])
# Ensure antenna line extends to the updated range limit
try:
if self._antenna_line_artist is not None:
data = self._antenna_line_artist.get_data()
if data and len(data) == 2:
thetas, rs = data
if len(thetas) >= 1:
theta = thetas[0]
self._antenna_line_artist.set_data([theta, theta], [0, max_r])
except Exception:
pass
def _on_range_selected(self, event=None):
self.ax.set_ylim(0, self.range_var.get())
@ -381,12 +419,234 @@ class PPIDisplay(ttk.Frame):
if self.canvas:
self.canvas.draw()
def _on_antenna_animate_changed(self):
"""
Callback when the animate checkbox toggles. When unchecked the antenna
is hidden; when checked the antenna is shown and animation resumes if
a pending interpolation exists.
"""
try:
st = self._antenna_state
enabled = (
getattr(self, "animate_antenna_var", None)
and self.animate_antenna_var.get()
)
if not enabled:
# Hide the antenna and stop animating
st["animating"] = False
if self._antenna_line_artist is not None:
try:
self._antenna_line_artist.set_data([], [])
except Exception:
try:
self._antenna_line_artist.remove()
self._antenna_line_artist = None
except Exception:
pass
if self.canvas:
try:
self.canvas.draw_idle()
except Exception:
self.canvas.draw()
return
# If enabled: resume animation if there's a pending interval
last_ts = st.get("last_ts")
next_ts = st.get("next_ts")
if last_ts is not None and next_ts is not None and next_ts > last_ts:
if not st.get("animating"):
st["animating"] = True
try:
self.after(st.get("tick_ms", 33), self._antenna_animation_step)
except Exception:
st["animating"] = False
else:
# Nothing to interpolate: render the most recent azimuth immediately
cur = st.get("last_az_deg") or st.get("next_az_deg")
if cur is not None:
self._render_antenna_line(cur)
except Exception:
pass
def clear_previews(self):
for artist in self.preview_artists:
artist.set_data([], [])
if self.canvas:
self.canvas.draw()
# -------------------- Antenna visualization & interpolation --------------------
def update_antenna_azimuth(self, az_deg: float, timestamp: float = None):
"""
Receive a new platform/antenna azimuth (degrees) with an optional
monotonic timestamp. The display will interpolate between the last
known azimuth and this new azimuth over the time interval to provide
a smooth animation.
"""
try:
now = time.monotonic()
ts = float(timestamp) if timestamp is not None else now
az = float(az_deg) % 360
except Exception:
return
st = self._antenna_state
# If antenna animate is disabled, update stored azimuth but hide the antenna
try:
if (
getattr(self, "animate_antenna_var", None) is not None
and not self.animate_antenna_var.get()
):
st["last_az_deg"] = az
st["last_ts"] = ts
st["next_az_deg"] = az
st["next_ts"] = ts
st["animating"] = False
# Hide the visible antenna line if present
if self._antenna_line_artist is not None:
try:
self._antenna_line_artist.set_data([], [])
except Exception:
try:
self._antenna_line_artist.remove()
self._antenna_line_artist = None
except Exception:
pass
if self.canvas:
try:
self.canvas.draw_idle()
except Exception:
self.canvas.draw()
return
except Exception:
pass
# If no previous sample exists, initialize both last and next to this value
if st["last_az_deg"] is None or st["last_ts"] is None:
st["last_az_deg"] = az
st["last_ts"] = ts
st["next_az_deg"] = az
st["next_ts"] = ts
# Render immediately
self._render_antenna_line(az)
return
# Compute the current interpolated azimuth at 'now' and set as last
# so the new interpolation starts from the on-screen position.
cur_az = st["last_az_deg"]
cur_ts = st["last_ts"]
next_az = st.get("next_az_deg")
next_ts = st.get("next_ts")
# If there is an outstanding next sample in the future, compute
# current interpolated value to be the new last.
try:
if next_az is not None and next_ts is not None and next_ts > cur_ts:
now_t = now
frac = 0.0
if next_ts > cur_ts:
frac = max(
0.0, min(1.0, (now_t - cur_ts) / float(next_ts - cur_ts))
)
# Shortest-angle interpolation
a0 = cur_az
a1 = next_az
diff = ((a1 - a0 + 180) % 360) - 180
interp = (a0 + diff * frac) % 360
cur_az = interp
cur_ts = now_t
except Exception:
# If interpolation fails, fall back to last known
cur_az = st["last_az_deg"]
cur_ts = st["last_ts"]
# Set new last and next values for upcoming animation
st["last_az_deg"] = cur_az
st["last_ts"] = cur_ts
st["next_az_deg"] = az
st["next_ts"] = ts
# Start the animation loop if not already running
if not st["animating"]:
st["animating"] = True
try:
self.after(st["tick_ms"], self._antenna_animation_step)
except Exception:
st["animating"] = False
def _antenna_animation_step(self):
st = self._antenna_state
try:
# If next and last are the same timestamp, snap to next
last_ts = st.get("last_ts")
next_ts = st.get("next_ts")
last_az = st.get("last_az_deg")
next_az = st.get("next_az_deg")
now = time.monotonic()
if last_az is None or next_az is None or last_ts is None or next_ts is None:
st["animating"] = False
return
if next_ts <= last_ts or abs(next_ts - last_ts) < 1e-6:
# No interval: snap
cur = next_az % 360
st["last_az_deg"] = cur
st["last_ts"] = next_ts
st["animating"] = False
self._render_antenna_line(cur)
return
frac = max(0.0, min(1.0, (now - last_ts) / float(next_ts - last_ts)))
# Shortest-angle interpolation across 0/360 boundary
a0 = last_az
a1 = next_az
diff = ((a1 - a0 + 180) % 360) - 180
cur = (a0 + diff * frac) % 360
self._render_antenna_line(cur)
if frac >= 1.0:
# Reached the target
st["last_az_deg"] = next_az % 360
st["last_ts"] = next_ts
st["animating"] = False
return
except Exception:
st["animating"] = False
# Schedule next tick if still animating
try:
if st.get("animating"):
self.after(st.get("tick_ms", 33), self._antenna_animation_step)
except Exception:
st["animating"] = False
def _render_antenna_line(self, az_deg: float):
"""
Render the antenna (platform) azimuth line on the PPI using the
current display conventions (theta = deg -> radians) and current range limit.
"""
try:
theta = np.deg2rad(float(az_deg) % 360)
max_r = self.ax.get_ylim()[1]
if self._antenna_line_artist is None:
# Create artist lazily if missing
(self._antenna_line_artist,) = self.ax.plot(
[], [], color="lightgray", linestyle="--", linewidth=1.2, alpha=0.85
)
# Plot as a radial line from r=0 to r=max_r
self._antenna_line_artist.set_data([theta, theta], [0, max_r])
# Use draw_idle for better GUI responsiveness
if self.canvas:
try:
self.canvas.draw_idle()
except Exception:
# Fall back to immediate draw
self.canvas.draw()
except Exception:
pass
def draw_trajectory_preview(self, waypoints: List[Waypoint], use_spline: bool):
self.clear_previews()
self.clear_trails()