diff --git a/target_simulator/analysis/simulation_state_hub.py b/target_simulator/analysis/simulation_state_hub.py index 88282f0..eddd106 100644 --- a/target_simulator/analysis/simulation_state_hub.py +++ b/target_simulator/analysis/simulation_state_hub.py @@ -56,6 +56,10 @@ class SimulationStateHub: # Summary throttle to avoid flooding logs while still providing throughput info self._last_real_summary_time = time.monotonic() self._real_summary_interval_s = 1.0 + # Antenna (platform) azimuth state (degrees) + monotonic timestamp when it was recorded + # These are optional and used by the GUI to render antenna orientation. + self._antenna_azimuth_deg = None + self._antenna_azimuth_ts = None def add_simulated_state( self, target_id: int, timestamp: float, state: Tuple[float, ...] @@ -241,6 +245,53 @@ class SimulationStateHub: # On error, do nothing (silently ignore invalid heading) pass + def set_antenna_azimuth( + self, azimuth_deg: float, timestamp: Optional[float] = None + ): + """ + Store the latest antenna (platform) azimuth in degrees along with an + optional monotonic timestamp. The GUI can retrieve this to render the + antenna orientation and smoothly animate between updates. + + Args: + azimuth_deg: Azimuth value in degrees (0 = North, positive CCW) + timestamp: Optional monotonic timestamp (defaults to time.monotonic()) + """ + try: + ts = float(timestamp) if timestamp is not None else time.monotonic() + az = float(azimuth_deg) % 360 + except Exception: + return + with self._lock: + self._antenna_azimuth_deg = az + self._antenna_azimuth_ts = ts + + def get_antenna_azimuth(self) -> Tuple[Optional[float], Optional[float]]: + """ + Returns a tuple (azimuth_deg, timestamp) representing the last-known + antenna azimuth and the monotonic timestamp when it was recorded. + If not available, returns (None, None). + """ + with self._lock: + return (self._antenna_azimuth_deg, self._antenna_azimuth_ts) + + # Backwards-compatible aliases for existing callers. These delegate to the + # new antenna-named methods so external code using the older names continues + # to work. + def set_platform_azimuth( + self, azimuth_deg: float, timestamp: Optional[float] = None + ): + try: + self.set_antenna_azimuth(azimuth_deg, timestamp=timestamp) + except Exception: + pass + + def get_platform_azimuth(self) -> Tuple[Optional[float], Optional[float]]: + try: + return self.get_antenna_azimuth() + except Exception: + return (None, None) + def get_real_heading(self, target_id: int) -> Optional[float]: """ Retrieve the last stored real heading for a target, or None if not set. diff --git a/target_simulator/gui/main_view.py b/target_simulator/gui/main_view.py index 7e9147f..3e80273 100644 --- a/target_simulator/gui/main_view.py +++ b/target_simulator/gui/main_view.py @@ -954,6 +954,39 @@ class MainView(tk.Tk): # self.logger.debug("MainView: received hub refresh. Updating real targets.") display_data = self._build_display_data_from_hub() self.ppi_widget.update_real_targets(display_data.get("real", [])) + # Also propagate platform/antenna azimuth (if available) so the PPI + # can render the antenna orientation. The hub stores a (az_deg, ts) + # tuple via set_platform_azimuth(). + try: + if ( + hasattr(self, "simulation_hub") + and self.simulation_hub is not None + and hasattr(self.ppi_widget, "update_antenna_azimuth") + ): + try: + # Prefer the new API name, fall back to the legacy + if hasattr(self.simulation_hub, "get_antenna_azimuth"): + az_deg, az_ts = ( + self.simulation_hub.get_antenna_azimuth() + ) + else: + az_deg, az_ts = ( + self.simulation_hub.get_platform_azimuth() + ) + + if az_deg is not None: + # pass the hub-provided timestamp if available + self.ppi_widget.update_antenna_azimuth( + az_deg, timestamp=az_ts + ) + except Exception: + # don't allow GUI update failures to interrupt queue processing + self.logger.debug( + "Failed to propagate antenna azimuth to PPI", + exc_info=True, + ) + except Exception: + pass else: # This is an update with simulated targets from the engine. diff --git a/target_simulator/gui/payload_router.py b/target_simulator/gui/payload_router.py index 6e46c63..fb1e79c 100644 --- a/target_simulator/gui/payload_router.py +++ b/target_simulator/gui/payload_router.py @@ -272,6 +272,57 @@ class DebugPayloadRouter: struct = {"scenario": scenario_dict, "targets": targets_list} json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8")) self._update_last_payload("RIS_STATUS_JSON", json_bytes) + # Propagate antenna azimuth into the hub so the GUI can render + # the antenna orientation. Prefer the RIS field `ant_nav_az` (this + # is the antenna navigation azimuth). For backward compatibility + # fall back to `platform_azimuth` if `ant_nav_az` is not present. + try: + plat = None + if "ant_nav_az" in scenario_dict: + plat = scenario_dict.get("ant_nav_az") + elif "platform_azimuth" in scenario_dict: + plat = scenario_dict.get("platform_azimuth") + + if ( + plat is not None + and self._hub + and hasattr(self._hub, "set_platform_azimuth") + ): + try: + val = float(plat) + # If the value looks like radians (<= ~2*pi) convert to degrees + if abs(val) <= (2 * math.pi * 1.1): + deg = math.degrees(val) + else: + deg = val + recv_ts = ( + reception_timestamp + if "reception_timestamp" in locals() + else time.monotonic() + ) + try: + # New API: set_antenna_azimuth + if hasattr(self._hub, "set_antenna_azimuth"): + self._hub.set_antenna_azimuth( + deg, timestamp=recv_ts + ) + else: + # Fallback to legacy name if present + self._hub.set_platform_azimuth( + deg, timestamp=recv_ts + ) + except Exception: + self._logger.debug( + "Failed to set antenna/platform azimuth on hub", + exc_info=True, + ) + except Exception: + pass + except Exception: + self._logger.debug( + "Error while extracting antenna azimuth from RIS payload", + exc_info=True, + ) except Exception: self._logger.exception("Failed to generate text/JSON for RIS debug view.") diff --git a/target_simulator/gui/ppi_display.py b/target_simulator/gui/ppi_display.py index 26cf9d3..67f3c62 100644 --- a/target_simulator/gui/ppi_display.py +++ b/target_simulator/gui/ppi_display.py @@ -60,6 +60,8 @@ class PPIDisplay(ttk.Frame): self.show_sim_trail_var = tk.BooleanVar(value=False) # Default: do not show real trails unless the user enables them. self.show_real_trail_var = tk.BooleanVar(value=False) + # Antenna animate toggle: when False the antenna is hidden; when True it is shown and animated + self.animate_antenna_var = tk.BooleanVar(value=True) self.canvas = None self._create_controls() self._create_plot() @@ -67,6 +69,17 @@ class PPIDisplay(ttk.Frame): self._real_update_timestamps = collections.deque(maxlen=10000) self._last_update_summary_time = time.monotonic() self._update_summary_interval_s = 1.0 + # Antenna/Platform visualization state used to animate a moving + # dashed line indicating current antenna azimuth. + self._antenna_state = { + "last_az_deg": None, + "last_ts": None, + "next_az_deg": None, + "next_ts": None, + "animating": False, + "tick_ms": 33, # ~30 FPS animation cadence + } + self._antenna_line_artist = None def _on_display_options_changed(self): # A full redraw is needed, but we don't have the last data sets. @@ -127,6 +140,14 @@ class PPIDisplay(ttk.Frame): command=self._on_display_options_changed, ) cb_real_trail.grid(row=1, column=1, sticky="w", padx=5) + # Antenna animate toggle (single Checkbutton) + cb_antenna = ttk.Checkbutton( + options_frame, + text="Animate Antenna", + variable=self.animate_antenna_var, + command=self._on_antenna_animate_changed, + ) + cb_antenna.grid(row=2, column=0, columnspan=2, sticky="w", padx=5, pady=(6, 0)) legend_frame = ttk.Frame(top_frame) legend_frame.pack(side=tk.RIGHT, padx=(10, 5)) sim_sw = tk.Canvas(legend_frame, width=16, height=12, highlightthickness=0) @@ -165,6 +186,12 @@ class PPIDisplay(ttk.Frame): (self._scan_line_2,) = self.ax.plot( [-limit_rad, -limit_rad], [0, self.max_range], "y--", linewidth=1 ) + # Antenna current azimuth (dashed light-gray line). It will be + # animated via update_antenna_azimuth() calls which interpolate + # between timestamped azimuth updates for smooth motion. + (self._antenna_line_artist,) = self.ax.plot( + [], [], color="lightgray", linestyle="--", linewidth=1.2, alpha=0.85 + ) self.canvas = FigureCanvasTkAgg(fig, master=self) self.canvas.draw() self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True) @@ -374,6 +401,17 @@ class PPIDisplay(ttk.Frame): limit_rad = np.deg2rad(self.scan_limit_deg) self._scan_line_1.set_data([limit_rad, limit_rad], [0, max_r]) self._scan_line_2.set_data([-limit_rad, -limit_rad], [0, max_r]) + # Ensure antenna line extends to the updated range limit + try: + if self._antenna_line_artist is not None: + data = self._antenna_line_artist.get_data() + if data and len(data) == 2: + thetas, rs = data + if len(thetas) >= 1: + theta = thetas[0] + self._antenna_line_artist.set_data([theta, theta], [0, max_r]) + except Exception: + pass def _on_range_selected(self, event=None): self.ax.set_ylim(0, self.range_var.get()) @@ -381,12 +419,234 @@ class PPIDisplay(ttk.Frame): if self.canvas: self.canvas.draw() + def _on_antenna_animate_changed(self): + """ + Callback when the animate checkbox toggles. When unchecked the antenna + is hidden; when checked the antenna is shown and animation resumes if + a pending interpolation exists. + """ + try: + st = self._antenna_state + enabled = ( + getattr(self, "animate_antenna_var", None) + and self.animate_antenna_var.get() + ) + if not enabled: + # Hide the antenna and stop animating + st["animating"] = False + if self._antenna_line_artist is not None: + try: + self._antenna_line_artist.set_data([], []) + except Exception: + try: + self._antenna_line_artist.remove() + self._antenna_line_artist = None + except Exception: + pass + if self.canvas: + try: + self.canvas.draw_idle() + except Exception: + self.canvas.draw() + return + + # If enabled: resume animation if there's a pending interval + last_ts = st.get("last_ts") + next_ts = st.get("next_ts") + if last_ts is not None and next_ts is not None and next_ts > last_ts: + if not st.get("animating"): + st["animating"] = True + try: + self.after(st.get("tick_ms", 33), self._antenna_animation_step) + except Exception: + st["animating"] = False + else: + # Nothing to interpolate: render the most recent azimuth immediately + cur = st.get("last_az_deg") or st.get("next_az_deg") + if cur is not None: + self._render_antenna_line(cur) + except Exception: + pass + def clear_previews(self): for artist in self.preview_artists: artist.set_data([], []) if self.canvas: self.canvas.draw() + # -------------------- Antenna visualization & interpolation -------------------- + def update_antenna_azimuth(self, az_deg: float, timestamp: float = None): + """ + Receive a new platform/antenna azimuth (degrees) with an optional + monotonic timestamp. The display will interpolate between the last + known azimuth and this new azimuth over the time interval to provide + a smooth animation. + """ + try: + now = time.monotonic() + ts = float(timestamp) if timestamp is not None else now + az = float(az_deg) % 360 + except Exception: + return + + st = self._antenna_state + + # If antenna animate is disabled, update stored azimuth but hide the antenna + try: + if ( + getattr(self, "animate_antenna_var", None) is not None + and not self.animate_antenna_var.get() + ): + st["last_az_deg"] = az + st["last_ts"] = ts + st["next_az_deg"] = az + st["next_ts"] = ts + st["animating"] = False + # Hide the visible antenna line if present + if self._antenna_line_artist is not None: + try: + self._antenna_line_artist.set_data([], []) + except Exception: + try: + self._antenna_line_artist.remove() + self._antenna_line_artist = None + except Exception: + pass + if self.canvas: + try: + self.canvas.draw_idle() + except Exception: + self.canvas.draw() + return + except Exception: + pass + + # If no previous sample exists, initialize both last and next to this value + if st["last_az_deg"] is None or st["last_ts"] is None: + st["last_az_deg"] = az + st["last_ts"] = ts + st["next_az_deg"] = az + st["next_ts"] = ts + # Render immediately + self._render_antenna_line(az) + return + + # Compute the current interpolated azimuth at 'now' and set as last + # so the new interpolation starts from the on-screen position. + cur_az = st["last_az_deg"] + cur_ts = st["last_ts"] + next_az = st.get("next_az_deg") + next_ts = st.get("next_ts") + + # If there is an outstanding next sample in the future, compute + # current interpolated value to be the new last. + try: + if next_az is not None and next_ts is not None and next_ts > cur_ts: + now_t = now + frac = 0.0 + if next_ts > cur_ts: + frac = max( + 0.0, min(1.0, (now_t - cur_ts) / float(next_ts - cur_ts)) + ) + # Shortest-angle interpolation + a0 = cur_az + a1 = next_az + diff = ((a1 - a0 + 180) % 360) - 180 + interp = (a0 + diff * frac) % 360 + cur_az = interp + cur_ts = now_t + except Exception: + # If interpolation fails, fall back to last known + cur_az = st["last_az_deg"] + cur_ts = st["last_ts"] + + # Set new last and next values for upcoming animation + st["last_az_deg"] = cur_az + st["last_ts"] = cur_ts + st["next_az_deg"] = az + st["next_ts"] = ts + + # Start the animation loop if not already running + if not st["animating"]: + st["animating"] = True + try: + self.after(st["tick_ms"], self._antenna_animation_step) + except Exception: + st["animating"] = False + + def _antenna_animation_step(self): + st = self._antenna_state + try: + # If next and last are the same timestamp, snap to next + last_ts = st.get("last_ts") + next_ts = st.get("next_ts") + last_az = st.get("last_az_deg") + next_az = st.get("next_az_deg") + + now = time.monotonic() + + if last_az is None or next_az is None or last_ts is None or next_ts is None: + st["animating"] = False + return + + if next_ts <= last_ts or abs(next_ts - last_ts) < 1e-6: + # No interval: snap + cur = next_az % 360 + st["last_az_deg"] = cur + st["last_ts"] = next_ts + st["animating"] = False + self._render_antenna_line(cur) + return + + frac = max(0.0, min(1.0, (now - last_ts) / float(next_ts - last_ts))) + # Shortest-angle interpolation across 0/360 boundary + a0 = last_az + a1 = next_az + diff = ((a1 - a0 + 180) % 360) - 180 + cur = (a0 + diff * frac) % 360 + + self._render_antenna_line(cur) + + if frac >= 1.0: + # Reached the target + st["last_az_deg"] = next_az % 360 + st["last_ts"] = next_ts + st["animating"] = False + return + except Exception: + st["animating"] = False + # Schedule next tick if still animating + try: + if st.get("animating"): + self.after(st.get("tick_ms", 33), self._antenna_animation_step) + except Exception: + st["animating"] = False + + def _render_antenna_line(self, az_deg: float): + """ + Render the antenna (platform) azimuth line on the PPI using the + current display conventions (theta = deg -> radians) and current range limit. + """ + try: + theta = np.deg2rad(float(az_deg) % 360) + max_r = self.ax.get_ylim()[1] + if self._antenna_line_artist is None: + # Create artist lazily if missing + (self._antenna_line_artist,) = self.ax.plot( + [], [], color="lightgray", linestyle="--", linewidth=1.2, alpha=0.85 + ) + # Plot as a radial line from r=0 to r=max_r + self._antenna_line_artist.set_data([theta, theta], [0, max_r]) + # Use draw_idle for better GUI responsiveness + if self.canvas: + try: + self.canvas.draw_idle() + except Exception: + # Fall back to immediate draw + self.canvas.draw() + except Exception: + pass + def draw_trajectory_preview(self, waypoints: List[Waypoint], use_spline: bool): self.clear_previews() self.clear_trails()