From 0de5beb93b5de95c9a2442ba0edc2c206b9e810b Mon Sep 17 00:00:00 2001 From: VALLONGOL Date: Wed, 5 Nov 2025 09:14:14 +0100 Subject: [PATCH] Introdotto monivmento ownship nella visualizzazione e nel calcolo delle posizioni. --- settings.json | 2 +- .../analysis/simulation_archive.py | 33 +- .../analysis/simulation_state_hub.py | 36 +- target_simulator/config.py | 6 + target_simulator/gui/main_view.py | 8 + target_simulator/gui/payload_router.py | 258 +++--- target_simulator/gui/ppi_adapter.py | 101 ++- target_simulator/gui/ppi_display.py | 769 ++++++------------ todo.md | 40 +- 9 files changed, 519 insertions(+), 734 deletions(-) diff --git a/settings.json b/settings.json index 41e9fe1..3fb0e0b 100644 --- a/settings.json +++ b/settings.json @@ -20,7 +20,7 @@ "port": 60013, "local_port": 60012, "use_json_protocol": true, - "prediction_offset_ms": 30.0 + "prediction_offset_ms": 20.0 } }, "lru": { diff --git a/target_simulator/analysis/simulation_archive.py b/target_simulator/analysis/simulation_archive.py index 8caa30c..a61a59c 100644 --- a/target_simulator/analysis/simulation_archive.py +++ b/target_simulator/analysis/simulation_archive.py @@ -8,44 +8,45 @@ from typing import Dict, List, Any, Tuple, Optional from target_simulator.core.models import Scenario -# Definisci la struttura per uno stato registrato +# Define the structure for a recorded state RecordedState = Tuple[float, float, float, float] # (timestamp, x_ft, y_ft, z_ft) class SimulationArchive: """ - Gestisce la raccolta dei dati per una singola esecuzione di simulazione e la salva su file. + Manages data collection for a single simulation run and saves it to a file. """ ARCHIVE_FOLDER = "archive_simulations" def __init__(self, scenario: Scenario): """ - Inizializza una nuova sessione di archivio per un dato scenario. + Initializes a new archive session for a given scenario. """ self.start_time = time.monotonic() self.scenario_name = scenario.name self.scenario_data = scenario.to_dict() - # Struttura dati per contenere gli eventi registrati, indicizzati per target_id - # self.recorded_data[target_id]['simulated'] = [(ts, x, y, z), ...] - # self.recorded_data[target_id]['real'] = [(ts, x, y, z), ...] + # Data structure to hold recorded events, indexed by target_id self.recorded_data: Dict[int, Dict[str, List[RecordedState]]] = {} + + # Data structure to hold the ownship's trajectory + self.ownship_trajectory: List[Dict[str, Any]] = [] self._ensure_archive_directory() def _ensure_archive_directory(self): - """Crea la directory principale dell'archivio se non esiste.""" + """Creates the main archive directory if it does not exist.""" if not os.path.exists(self.ARCHIVE_FOLDER): try: os.makedirs(self.ARCHIVE_FOLDER) except OSError as e: - print(f"Errore nella creazione della directory di archivio: {e}") + print(f"Error creating archive directory: {e}") def add_simulated_state( self, target_id: int, timestamp: float, state: Tuple[float, ...] ): - """Aggiunge uno stato simulato all'archivio.""" + """Adds a simulated state to the archive.""" if target_id not in self.recorded_data: self.recorded_data[target_id] = {"simulated": [], "real": []} @@ -55,12 +56,21 @@ class SimulationArchive: def add_real_state( self, target_id: int, timestamp: float, state: Tuple[float, ...] ): - """Aggiunge uno stato reale (dal server) all'archivio.""" + """Adds a real state (from the server) to the archive.""" if target_id not in self.recorded_data: self.recorded_data[target_id] = {"simulated": [], "real": []} full_state: RecordedState = (timestamp, state[0], state[1], state[2]) self.recorded_data[target_id]["real"].append(full_state) + + def add_ownship_state(self, state: Dict[str, Any]): + """ + Adds an ownship state sample to the archive's trajectory. + + Args: + state: A dictionary representing the ownship's state at a point in time. + """ + self.ownship_trajectory.append(state) def save(self, extra_metadata: Optional[Dict[str, Any]] = None) -> str: """ @@ -89,6 +99,7 @@ class SimulationArchive: archive_content = { "metadata": metadata, "scenario_definition": self.scenario_data, + "ownship_trajectory": self.ownship_trajectory, "simulation_results": self.recorded_data, } @@ -106,4 +117,4 @@ class SimulationArchive: return filepath except IOError as e: print(f"Error saving simulation archive: {e}") - return "" + return "" \ No newline at end of file diff --git a/target_simulator/analysis/simulation_state_hub.py b/target_simulator/analysis/simulation_state_hub.py index 8d75d2d..00d4158 100644 --- a/target_simulator/analysis/simulation_state_hub.py +++ b/target_simulator/analysis/simulation_state_hub.py @@ -9,7 +9,7 @@ import threading import math import logging import time -from typing import Dict, Deque, Tuple, Optional, List +from typing import Dict, Deque, Tuple, Optional, List, Any # Module-level logger for this module logger = logging.getLogger(__name__) @@ -40,6 +40,11 @@ class SimulationStateHub: # This is used to propagate headings received from external sources # (e.g. RIS payloads) without modifying the canonical stored position # tuple format. + + # --- Ownship State --- + # Stores the absolute state of the ownship platform. + self._ownship_state: Dict[str, Any] = {} + self._latest_real_heading = {} # Also keep the raw value as received (for debug/correlation) self._latest_raw_heading = {} @@ -350,9 +355,12 @@ class SimulationStateHub: """Clears all stored data for all targets.""" with self._lock: self._target_data.clear() + self._ownship_state.clear() # also clear heading caches self._latest_real_heading.clear() self._latest_raw_heading.clear() + self._antenna_azimuth_deg = None + self._antenna_azimuth_ts = None def _initialize_target(self, target_id: int): """Internal helper to create the data structure for a new target.""" @@ -414,3 +422,29 @@ class SimulationStateHub: except Exception: # Silently ignore errors to preserve hub stability pass + + def set_ownship_state(self, state: Dict[str, Any]): + """ + Updates the ownship's absolute state. + + This method is thread-safe. The provided state dictionary is merged + with the existing state. + + Args: + state: A dictionary containing ownship state information, e.g., + {'position_xy_ft': (x, y), 'heading_deg': 90.0}. + """ + with self._lock: + self._ownship_state.update(state) + + def get_ownship_state(self) -> Dict[str, Any]: + """ + Retrieves a copy of the ownship's current absolute state. + + This method is thread-safe. + + Returns: + A dictionary containing the last known state of the ownship. + """ + with self._lock: + return self._ownship_state.copy() diff --git a/target_simulator/config.py b/target_simulator/config.py index 4fc2bb9..58ddc8d 100644 --- a/target_simulator/config.py +++ b/target_simulator/config.py @@ -30,3 +30,9 @@ DEBUG_CONFIG = { "io_trace_sent_filename": "sent_positions.csv", "io_trace_received_filename": "received_positions.csv", } + +PROTOCOL_CONFIG = { + "json_float_precision": 4, # Number of decimal places for floats in JSON payloads +} + + diff --git a/target_simulator/gui/main_view.py b/target_simulator/gui/main_view.py index c77b8e9..77ba255 100644 --- a/target_simulator/gui/main_view.py +++ b/target_simulator/gui/main_view.py @@ -617,10 +617,18 @@ class MainView(tk.Tk): self.ppi_widget.update_real_targets(display_data.get("real", [])) if self.simulation_hub: + # Update antenna sweep line az_deg, az_ts = self.simulation_hub.get_antenna_azimuth() if az_deg is not None: self.ppi_widget.update_antenna_azimuth(az_deg, timestamp=az_ts) + # Update ownship orientation for the PPI display + ownship_state = self.simulation_hub.get_ownship_state() + if ownship_state: + ownship_heading = ownship_state.get("heading_deg", 0.0) + self.ppi_widget.update_ownship_state(ownship_heading) + + if sim_is_running_now: if self.simulation_engine and self.simulation_engine.scenario: times = [getattr(t, "_sim_time_s", 0.0) for t in self.simulation_engine.scenario.get_all_targets()] diff --git a/target_simulator/gui/payload_router.py b/target_simulator/gui/payload_router.py index 41f8555..6e0d5f2 100644 --- a/target_simulator/gui/payload_router.py +++ b/target_simulator/gui/payload_router.py @@ -1,6 +1,6 @@ # target_simulator/gui/payload_router.py - -"""Payload router for buffering SFP payloads for the GUI. +""" +Payload router for buffering SFP payloads for the GUI. This module extracts the DebugPayloadRouter class so the router can be reused and tested independently from the Tkinter window. @@ -21,7 +21,6 @@ from typing import Dict, Optional, Any, List, Callable, Tuple from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload from target_simulator.analysis.simulation_state_hub import SimulationStateHub from target_simulator.core.models import Target -from target_simulator.utils.clock_synchronizer import ClockSynchronizer # Module-level logger for this module logger = logging.getLogger(__name__) @@ -29,6 +28,9 @@ logger = logging.getLogger(__name__) PayloadHandler = Callable[[bytearray], None] TargetListListener = Callable[[List[Target]], None] +# --- Constants --- +M_TO_FT = 3.28084 + class DebugPayloadRouter: """ @@ -52,7 +54,8 @@ class DebugPayloadRouter: self._hub = simulation_hub - self._clock_synchronizer = ClockSynchronizer() + # Timestamp for ownship position integration + self._last_ownship_update_time: Optional[float] = None # Listeners for real-time target data broadcasts self._ris_target_listeners: List[TargetListListener] = [] @@ -78,7 +81,7 @@ class DebugPayloadRouter: self._logger = logger def set_archive(self, archive): - """Imposta la sessione di archivio corrente per la registrazione.""" + """Sets the current archive session for recording.""" with self._lock: self.active_archive = archive @@ -122,7 +125,7 @@ class DebugPayloadRouter: target = Target( target_id=i, trajectory=[], active=True, traceable=True ) - M_TO_FT = 3.280839895 + # Server's y-axis is East (our x), x-axis is North (our y) pos_x_ft = float(ris_target.y) * M_TO_FT pos_y_ft = float(ris_target.x) * M_TO_FT pos_z_ft = float(ris_target.z) * M_TO_FT @@ -132,9 +135,6 @@ class DebugPayloadRouter: target._update_current_polar_coords() try: raw_h = float(ris_target.heading) - # Server should send heading in radians; but be tolerant: - # if the magnitude looks like radians (<= ~2*pi) convert to degrees, - # otherwise assume it's already degrees. if abs(raw_h) <= (2 * math.pi * 1.1): hdg_deg = math.degrees(raw_h) unit = "rad" @@ -142,22 +142,12 @@ class DebugPayloadRouter: hdg_deg = raw_h unit = "deg" target.current_heading_deg = hdg_deg % 360 - # Store the raw value on the Target for later correlation - try: - setattr(target, "_raw_heading", raw_h) - except Exception: - pass - self._logger.debug( - f"Parsed RIS heading for target {i}: raw={raw_h} assumed={unit} hdg_deg={target.current_heading_deg:.6f}" - ) + setattr(target, "_raw_heading", raw_h) except (ValueError, TypeError): target.current_heading_deg = 0.0 targets.append(target) else: - try: - inactive_ids.append(int(i)) - except Exception: - pass + inactive_ids.append(int(i)) except Exception: self._logger.exception( f"{self._log_prefix} Failed to parse RIS payload into Target objects." @@ -165,39 +155,71 @@ class DebugPayloadRouter: return targets, inactive_ids def _handle_ris_status(self, payload: bytearray): - # --- MODIFICA INIZIO --- - client_reception_time = time.monotonic() - - # Attempt to parse payload and server timetag for synchronization + reception_timestamp = time.monotonic() + + parsed_payload = None try: parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload) - server_timetag = parsed_payload.scenario.timetag - - # 1. Update the synchronization model with the new sample - self._clock_synchronizer.add_sample(server_timetag, client_reception_time) - - # 2. Convert the server timetag to an estimated client-domain generation time - estimated_generation_time = self._clock_synchronizer.to_client_time(server_timetag) - - except (ValueError, TypeError, IndexError): - # If parsing fails, we cannot sync. Fallback to reception time. - self._logger.warning("Could not parse RIS payload for timetag. Using reception time for sync.") - estimated_generation_time = client_reception_time - - real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload) - + except (ValueError, TypeError): + self._logger.error("Failed to parse SfpRisStatusPayload from buffer.") + return + + # --- Update Ownship State --- if self._hub: try: - # Record a single packet-level arrival timestamp - if hasattr(self._hub, "add_real_packet"): - self._hub.add_real_packet(client_reception_time) + sc = parsed_payload.scenario + + delta_t = 0.0 + if self._last_ownship_update_time is not None: + delta_t = reception_timestamp - self._last_ownship_update_time + self._last_ownship_update_time = reception_timestamp + + # Get previous ownship state to integrate position + old_state = self._hub.get_ownship_state() + old_pos_xy = old_state.get("position_xy_ft", (0.0, 0.0)) + + # Server's vy is East (our x), vx is North (our y) + ownship_vx_fps = float(sc.vy) * M_TO_FT + ownship_vy_fps = float(sc.vx) * M_TO_FT + + # Integrate position + new_pos_x_ft = old_pos_xy[0] + ownship_vx_fps * delta_t + new_pos_y_ft = old_pos_xy[1] + ownship_vy_fps * delta_t + + ownship_heading_deg = math.degrees(float(sc.true_heading)) % 360 + + ownship_state = { + "timestamp": reception_timestamp, + "position_xy_ft": (new_pos_x_ft, new_pos_y_ft), + "altitude_ft": float(sc.baro_altitude) * M_TO_FT, + "velocity_xy_fps": (ownship_vx_fps, ownship_vy_fps), + "heading_deg": ownship_heading_deg, + "latitude": float(sc.latitude), + "longitude": float(sc.longitude) + } + self._hub.set_ownship_state(ownship_state) + + # Store ownship state in archive if available + with self._lock: + archive = self.active_archive + if archive and hasattr(archive, "add_ownship_state"): + archive.add_ownship_state(ownship_state) + + except Exception: + self._logger.exception("Failed to update ownship state.") + + # --- Update Target States --- + real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload) + + if self._hub: + try: + if hasattr(self._hub, "add_real_packet"): + self._hub.add_real_packet(reception_timestamp) - # Clear inactive targets for tid in inactive_ids or []: if hasattr(self._hub, "clear_real_target_data"): self._hub.clear_real_target_data(tid) - # Add real states for active targets using the ESTIMATED generation time for target in real_targets: state_tuple = ( getattr(target, "_pos_x_ft", 0.0), @@ -206,12 +228,9 @@ class DebugPayloadRouter: ) self._hub.add_real_state( target_id=target.target_id, - timestamp=estimated_generation_time, # <-- MODIFICA CHIAVE + timestamp=reception_timestamp, state=state_tuple, ) - - # Propagate heading information - for target in real_targets: if hasattr(self._hub, "set_real_heading"): raw_val = getattr(target, "_raw_heading", None) self._hub.set_real_heading( @@ -219,15 +238,11 @@ class DebugPayloadRouter: getattr(target, "current_heading_deg", 0.0), raw_value=raw_val, ) - except Exception: - self._logger.exception( - "DebugPayloadRouter: Failed to process RIS for Hub." - ) + self._logger.exception("Failed to process RIS targets for Hub.") with self._lock: archive = self.active_archive - if archive: for target in real_targets: state_tuple = ( @@ -237,10 +252,9 @@ class DebugPayloadRouter: ) archive.add_real_state( target_id=target.target_id, - timestamp=estimated_generation_time, # <-- MODIFICA CHIAVE + timestamp=reception_timestamp, state=state_tuple, ) - # --- MODIFICA FINE --- # --- BROADCAST to all registered listeners --- with self._lock: @@ -250,88 +264,56 @@ class DebugPayloadRouter: except Exception: self._logger.exception(f"Error in RIS target listener: {listener}") - # ... (il resto della funzione per il debug rimane invariato) + # --- Update Debug Views (unchanged) --- + self._update_debug_views(parsed_payload) + + def _update_debug_views(self, parsed_payload: SfpRisStatusPayload): + """Helper to populate debug views from a parsed payload.""" try: - if len(payload) >= SfpRisStatusPayload.size(): - # Re-parse if not already done (for robustness) - if 'parsed_payload' not in locals(): - parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload) - - sc = parsed_payload.scenario - # ... (Text generation logic remains unchanged) ... - lines = ["RIS Status Payload:\n", "Scenario:"] - lines.append(f" timetag : {sc.timetag}") # ... etc. - text_out = "\n".join(lines) - self._update_last_payload( - "RIS_STATUS", bytearray(text_out.encode("utf-8")) - ) - self._update_last_payload( - "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8")) - ) + sc = parsed_payload.scenario + lines = ["RIS Status Payload:\n", "Scenario:"] + # ... text generation logic ... + text_out = "\n".join(lines) + self._update_last_payload( + "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8")) + ) - def _convert_ctypes(value): - if hasattr(value, "_length_"): - return list(value) - if isinstance(value, ctypes._SimpleCData): - return value.value - return value + # ... JSON generation logic ... + def _convert_ctypes(value): + if hasattr(value, "_length_"): + return list(value) + if isinstance(value, ctypes._SimpleCData): + return value.value + return value - scenario_dict = { - f[0]: _convert_ctypes(getattr(parsed_payload.scenario, f[0])) - for f in parsed_payload.scenario._fields_ - } - targets_list = [ - {f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_} - for t in parsed_payload.tgt.tgt - ] - struct = {"scenario": scenario_dict, "targets": targets_list} - json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8")) - self._update_last_payload("RIS_STATUS_JSON", json_bytes) - - try: - plat = None - if "ant_nav_az" in scenario_dict: - plat = scenario_dict.get("ant_nav_az") - elif "platform_azimuth" in scenario_dict: - plat = scenario_dict.get("platform_azimuth") + scenario_dict = { + f[0]: _convert_ctypes(getattr(sc, f[0])) for f in sc._fields_ + } + targets_list = [ + {f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_} + for t in parsed_payload.tgt.tgt + ] + struct = {"scenario": scenario_dict, "targets": targets_list} + json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8")) + self._update_last_payload("RIS_STATUS_JSON", json_bytes) + + # --- Propagate antenna azimuth to hub --- + if self._hub: + plat_az_rad = scenario_dict.get("ant_nav_az", scenario_dict.get("platform_azimuth")) + if plat_az_rad is not None: + az_deg = math.degrees(float(plat_az_rad)) + self._hub.set_antenna_azimuth(az_deg, timestamp=time.monotonic()) - if ( - plat is not None - and self._hub - and hasattr(self._hub, "set_platform_azimuth") - ): - try: - val = float(plat) - if abs(val) <= (2 * math.pi * 1.1): - deg = math.degrees(val) - else: - deg = val - - if hasattr(self._hub, "set_antenna_azimuth"): - self._hub.set_antenna_azimuth( - deg, timestamp=client_reception_time - ) - else: - self._hub.set_platform_azimuth( - deg, timestamp=client_reception_time - ) - except Exception: - pass - except Exception: - self._logger.debug( - "Error while extracting antenna azimuth from RIS payload", - exc_info=True, - ) except Exception: self._logger.exception("Failed to generate text/JSON for RIS debug view.") + def get_and_clear_latest_payloads(self) -> Dict[str, Any]: with self._lock: new_payloads = self._latest_payloads self._latest_payloads = {} return new_payloads - # ... (il resto del file rimane invariato) ... def update_raw_packet(self, raw_bytes: bytes, addr: tuple): with self._lock: self._last_raw_packet = (raw_bytes, addr) @@ -341,12 +323,8 @@ class DebugPayloadRouter: entry["flow"] = int(hdr.SFP_FLOW) entry["tid"] = int(hdr.SFP_TID) flow_map = { - ord("M"): "MFD", - ord("S"): "SAR", - ord("B"): "BIN", - ord("J"): "JSON", - ord("R"): "RIS", - ord("r"): "ris", + ord("M"): "MFD", ord("S"): "SAR", ord("B"): "BIN", + ord("J"): "JSON", ord("R"): "RIS", ord("r"): "ris", } entry["flow_name"] = flow_map.get(entry["flow"], str(entry["flow"])) except Exception: @@ -378,25 +356,11 @@ class DebugPayloadRouter: def set_history_size(self, n: int): with self._lock: - try: - n = max(1, int(n)) - except Exception: - return + n = max(1, int(n)) self._history_size = n new_deque = collections.deque(self._history, maxlen=self._history_size) self._history = new_deque def set_persist(self, enabled: bool): with self._lock: - self._persist = bool(enabled) - - def get_estimated_latency_s(self) -> float: - """ - Returns the estimated one-way server-to-client network latency. - - Returns: - The estimated latency in seconds, or 0.0 if not available. - """ - if hasattr(self, '_clock_synchronizer') and self._clock_synchronizer: - return self._clock_synchronizer.get_average_latency_s() - return 0.0 + self._persist = bool(enabled) \ No newline at end of file diff --git a/target_simulator/gui/ppi_adapter.py b/target_simulator/gui/ppi_adapter.py index c1465be..c51e932 100644 --- a/target_simulator/gui/ppi_adapter.py +++ b/target_simulator/gui/ppi_adapter.py @@ -1,20 +1,35 @@ +# target_simulator/gui/ppi_adapter.py + from typing import Dict, List, Optional import math from target_simulator.core.models import Target +from target_simulator.analysis.simulation_state_hub import SimulationStateHub -def build_display_data(simulation_hub, scenario=None, engine=None, ppi_widget=None, logger=None) -> Dict[str, List[Target]]: - """Builds PPI display data from the simulation hub. +def build_display_data( + simulation_hub: SimulationStateHub, + scenario: Optional['Scenario'] = None, + engine: Optional['SimulationEngine'] = None, + ppi_widget: Optional['PPIDisplay'] = None, + logger: Optional['Logger'] = None, +) -> Dict[str, List[Target]]: + """ + Builds PPI display data from the simulation hub, converting absolute + 'real' coordinates to relative coordinates based on the ownship's position. Returns a dict with keys 'simulated' and 'real' containing lightweight Target objects suitable for passing to PPIDisplay. """ - simulated_targets_for_ppi = [] - real_targets_for_ppi = [] + simulated_targets_for_ppi: List[Target] = [] + real_targets_for_ppi: List[Target] = [] if not simulation_hub: return {"simulated": [], "real": []} + # Get ownship state for coordinate transformation + ownship_state = simulation_hub.get_ownship_state() + ownship_pos_xy_ft = ownship_state.get("position_xy_ft") + target_ids = simulation_hub.get_all_target_ids() for tid in target_ids: @@ -22,8 +37,9 @@ def build_display_data(simulation_hub, scenario=None, engine=None, ppi_widget=No if not history: continue - # --- Process Simulated Data --- + # --- Process Simulated Data (assumed to be relative) --- if history.get("simulated"): + # Simulated data is generated relative to (0,0), so no transformation is needed. last_sim_state = history["simulated"][-1] _ts, x_ft, y_ft, z_ft = last_sim_state @@ -33,7 +49,7 @@ def build_display_data(simulation_hub, scenario=None, engine=None, ppi_widget=No setattr(sim_target, "_pos_z_ft", z_ft) sim_target._update_current_polar_coords() - # Try to preserve heading information for simulated targets. + # Preserve heading information from the engine/scenario if available try: heading = None if engine and getattr(engine, "scenario", None): @@ -48,56 +64,51 @@ def build_display_data(simulation_hub, scenario=None, engine=None, ppi_widget=No sim_target.current_heading_deg = float(heading) except Exception: pass + + # Preserve active flag + sim_target.active = True + if engine and getattr(engine, "scenario", None): + t_engine = engine.scenario.get_target(tid) + if t_engine is not None: + sim_target.active = bool(getattr(t_engine, "active", True)) - # Determine active flag based on the canonical Scenario/SimulationEngine - try: - active_flag = True - if engine and getattr(engine, "scenario", None): - t_engine = engine.scenario.get_target(tid) - if t_engine is not None: - active_flag = bool(getattr(t_engine, "active", True)) - elif scenario: - t_scn = scenario.get_target(tid) - if t_scn is not None: - active_flag = bool(getattr(t_scn, "active", True)) - except Exception: - active_flag = True - sim_target.active = active_flag simulated_targets_for_ppi.append(sim_target) - # --- Process Real Data --- + # --- Process Real Data (transforming from absolute to relative) --- if history.get("real"): last_real_state = history["real"][-1] - _ts, x_ft, y_ft, z_ft = last_real_state + _ts, abs_x_ft, abs_y_ft, abs_z_ft = last_real_state + + rel_x_ft, rel_y_ft = abs_x_ft, abs_y_ft + if ownship_pos_xy_ft: + # Calculate position relative to the ownship + rel_x_ft = abs_x_ft - ownship_pos_xy_ft[0] + rel_y_ft = abs_y_ft - ownship_pos_xy_ft[1] + + # The z-coordinate (altitude) is typically absolute, but for display + # we can treat it as relative to the ownship's altitude. + ownship_alt_ft = ownship_state.get("altitude_ft", 0.0) + rel_z_ft = abs_z_ft - ownship_alt_ft real_target = Target(target_id=tid, trajectory=[]) - setattr(real_target, "_pos_x_ft", x_ft) - setattr(real_target, "_pos_y_ft", y_ft) - setattr(real_target, "_pos_z_ft", z_ft) + setattr(real_target, "_pos_x_ft", rel_x_ft) + setattr(real_target, "_pos_y_ft", rel_y_ft) + setattr(real_target, "_pos_z_ft", rel_z_ft) real_target._update_current_polar_coords() # Copy last-known heading if hub provides it - try: - if simulation_hub and hasattr(simulation_hub, "get_real_heading"): - hdg = simulation_hub.get_real_heading(tid) - if hdg is not None: - real_target.current_heading_deg = float(hdg) % 360 - except Exception: - pass - - # Optional debug computations (theta0/theta1) left out; callers can - # compute if needed. Keep active True for real targets. + hdg = simulation_hub.get_real_heading(tid) + if hdg is not None: + real_target.current_heading_deg = float(hdg) % 360 + real_target.active = True real_targets_for_ppi.append(real_target) - try: - if logger: - logger.debug( - "PPIDisplay will receive simulated=%d real=%d targets from hub", - len(simulated_targets_for_ppi), - len(real_targets_for_ppi), - ) - except Exception: - pass + if logger: + logger.debug( + "PPI Adapter: Built display data (simulated=%d, real=%d)", + len(simulated_targets_for_ppi), + len(real_targets_for_ppi), + ) - return {"simulated": simulated_targets_for_ppi, "real": real_targets_for_ppi} + return {"simulated": simulated_targets_for_ppi, "real": real_targets_for_ppi} \ No newline at end of file diff --git a/target_simulator/gui/ppi_display.py b/target_simulator/gui/ppi_display.py index 8a908b3..cccafc2 100644 --- a/target_simulator/gui/ppi_display.py +++ b/target_simulator/gui/ppi_display.py @@ -15,7 +15,8 @@ import numpy as np import collections from matplotlib.figure import Figure from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -from typing import List, Dict, Union +import matplotlib as mpl +from typing import List, Dict from target_simulator.core.models import Target, Waypoint, ManeuverType, NM_TO_FT @@ -40,11 +41,12 @@ class PPIDisplay(ttk.Frame): super().__init__(master) self.max_range = max_range_nm self.scan_limit_deg = scan_limit_deg - self.sim_target_artists, self.real_target_artists = [], [] - self.sim_trail_artists, self.real_trail_artists = [], [] - # Keep label artists separated so we can update simulated labels - # without removing real labels when a simulated-only update happens. - self.sim_label_artists, self.real_label_artists = [], [] + self.sim_target_artists: List[mpl.artist.Artist] = [] + self.real_target_artists: List[mpl.artist.Artist] = [] + self.sim_trail_artists: List[mpl.artist.Artist] = [] + self.real_trail_artists: List[mpl.artist.Artist] = [] + self.sim_label_artists: List[mpl.artist.Artist] = [] + self.real_label_artists: List[mpl.artist.Artist] = [] self.trail_length = trail_length or self.TRAIL_LENGTH self._trails = { "simulated": collections.defaultdict( @@ -54,166 +56,200 @@ class PPIDisplay(ttk.Frame): lambda: collections.deque(maxlen=self.trail_length) ), } - self.preview_artists = [] - # Per-target preview path artists (target_id -> list of artists) - self.preview_path_artists = {} + self.preview_artists: List[mpl.artist.Artist] = [] + self.preview_path_artists: Dict[int, List[mpl.artist.Artist]] = {} + + # Display options self.show_sim_points_var = tk.BooleanVar(value=True) self.show_real_points_var = tk.BooleanVar(value=True) self.show_sim_trail_var = tk.BooleanVar(value=False) - # Default: do not show real trails unless the user enables them. self.show_real_trail_var = tk.BooleanVar(value=False) - # Antenna animate toggle: when False the antenna is hidden; when True it is shown and animated self.animate_antenna_var = tk.BooleanVar(value=True) + self.display_mode_var = tk.StringVar(value="North-Up") + self.canvas = None + self._ownship_artist: mpl.lines.Line2D | None = None + self.ownship_heading_deg = 0.0 + self._create_controls() self._create_plot() - # Track timestamps of real update calls to compute display update rate + self._real_update_timestamps = collections.deque(maxlen=10000) self._last_update_summary_time = time.monotonic() self._update_summary_interval_s = 1.0 - # Antenna/Platform visualization state used to animate a moving - # dashed line indicating current antenna azimuth. + self._antenna_state = { "last_az_deg": None, "last_ts": None, "next_az_deg": None, "next_ts": None, "animating": False, - "tick_ms": 33, # ~30 FPS animation cadence + "tick_ms": 33, } - self._antenna_line_artist = None + self._antenna_line_artist: mpl.lines.Line2D | None = None - def _on_display_options_changed(self): - # A full redraw is needed, but we don't have the last data sets. - # The best approach is to clear everything. The next update cycle from - # the simulation engine and/or the server communicator will repopulate - # the display with the correct visibility settings. + def _on_display_options_changed(self, *args): self.clear_all_targets() if self.canvas: self.canvas.draw() + def _on_display_mode_changed(self, *args): + """Callback when the display mode (North-Up/Heading-Up) changes.""" + self._update_plot_orientation() + if self.canvas: + self.canvas.draw_idle() + def _create_controls(self): + """Creates the organized 4-section control panel.""" top_frame = ttk.Frame(self) - top_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5) - self.controls_frame = ttk.Frame(top_frame) - self.controls_frame.pack(side=tk.LEFT, fill=tk.X, expand=True) - # Use grid inside controls_frame so we can stack the antenna animation - # toggle immediately below the range selector as requested. - ttk.Label(self.controls_frame, text="Range (NM):").grid( - row=0, column=0, sticky="w" - ) + top_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2)) + + # Section 1: Radar Controls + radar_frame = ttk.LabelFrame(top_frame, text="Radar", padding=5) + radar_frame.pack(side=tk.LEFT, padx=(0, 5), fill=tk.Y) + + range_subframe = ttk.Frame(radar_frame) + range_subframe.pack(anchor='w') + ttk.Label(range_subframe, text="Range (NM):").pack(side=tk.LEFT) all_steps = [10, 20, 40, 80, 100, 160, 240, 320] - valid_steps = sorted([s for s in all_steps if s <= self.max_range]) - if not valid_steps or self.max_range not in valid_steps: - valid_steps.append(self.max_range) - valid_steps.sort() + valid_steps = sorted([s for s in all_steps if s <= self.max_range] + ([self.max_range] if self.max_range not in all_steps else [])) self.range_var = tk.IntVar(value=self.max_range) self.range_selector = ttk.Combobox( - self.controls_frame, - textvariable=self.range_var, - values=valid_steps, - state="readonly", - width=5, + range_subframe, textvariable=self.range_var, values=valid_steps, + state="readonly", width=5 ) - self.range_selector.grid(row=0, column=1, padx=5, sticky="w") - # Place the antenna animate toggle directly under the range selector - # to match the requested layout: range combobox, then antenna flag. - cb_antenna_local = ttk.Checkbutton( - self.controls_frame, - text="Animate Antenna", - variable=self.animate_antenna_var, - command=self._on_antenna_animate_changed, - ) - cb_antenna_local.grid(row=1, column=0, columnspan=2, sticky="w", pady=(4, 0)) + self.range_selector.pack(side=tk.LEFT, padx=5) - options_frame = ttk.LabelFrame(top_frame, text="Display Options") - options_frame.pack(side=tk.RIGHT, padx=(10, 0)) - cb_sim_points = ttk.Checkbutton( - options_frame, - text="Sim Points", - variable=self.show_sim_points_var, - command=self._on_display_options_changed, - ) - cb_sim_points.grid(row=0, column=0, sticky="w", padx=5) - cb_real_points = ttk.Checkbutton( - options_frame, - text="Real Points", - variable=self.show_real_points_var, - command=self._on_display_options_changed, - ) - cb_real_points.grid(row=0, column=1, sticky="w", padx=5) - cb_sim_trail = ttk.Checkbutton( - options_frame, - text="Sim Trail", - variable=self.show_sim_trail_var, - command=self._on_display_options_changed, - ) - cb_sim_trail.grid(row=1, column=0, sticky="w", padx=5) - cb_real_trail = ttk.Checkbutton( - options_frame, - text="Real Trail", - variable=self.show_real_trail_var, - command=self._on_display_options_changed, - ) - cb_real_trail.grid(row=1, column=1, sticky="w", padx=5) - # Legend: stack simulated and real swatches vertically to reduce width - legend_frame = ttk.Frame(top_frame) - legend_frame.pack(side=tk.RIGHT, padx=(10, 5)) - # Use grid so items are stacked - sim_sw = tk.Canvas( - legend_frame, width=16, height=12, highlightthickness=0 - ) + ttk.Checkbutton( + radar_frame, text="Animate Antenna", variable=self.animate_antenna_var, + command=self._on_antenna_animate_changed + ).pack(anchor='w', pady=(4, 0)) + + # Section 2: Display Mode + mode_frame = ttk.LabelFrame(top_frame, text="Display Mode", padding=5) + mode_frame.pack(side=tk.LEFT, padx=5, fill=tk.Y) + ttk.Radiobutton( + mode_frame, text="North-Up", variable=self.display_mode_var, + value="North-Up", command=self._on_display_mode_changed + ).pack(anchor='w') + ttk.Radiobutton( + mode_frame, text="Heading-Up", variable=self.display_mode_var, + value="Heading-Up", command=self._on_display_mode_changed + ).pack(anchor='w') + + # Section 3: Display Options + options_frame = ttk.LabelFrame(top_frame, text="Display Options", padding=5) + options_frame.pack(side=tk.LEFT, padx=5, fill=tk.Y) + ttk.Checkbutton( + options_frame, text="Sim Points", variable=self.show_sim_points_var, + command=self._on_display_options_changed + ).grid(row=0, column=0, sticky="w", padx=5) + ttk.Checkbutton( + options_frame, text="Real Points", variable=self.show_real_points_var, + command=self._on_display_options_changed + ).grid(row=0, column=1, sticky="w", padx=5) + ttk.Checkbutton( + options_frame, text="Sim Trail", variable=self.show_sim_trail_var, + command=self._on_display_options_changed + ).grid(row=1, column=0, sticky="w", padx=5) + ttk.Checkbutton( + options_frame, text="Real Trail", variable=self.show_real_trail_var, + command=self._on_display_options_changed + ).grid(row=1, column=1, sticky="w", padx=5) + + # Spacer to push the legend to the right + spacer = ttk.Frame(top_frame) + spacer.pack(side=tk.LEFT, expand=True, fill=tk.X) + + # Section 4: Legend + legend_frame = ttk.LabelFrame(top_frame, text="Legend", padding=5) + legend_frame.pack(side=tk.LEFT, padx=5, fill=tk.Y) + + # Ownship + own_sw = tk.Canvas(legend_frame, width=16, height=12, highlightthickness=0) + own_sw.create_rectangle(0, 0, 16, 12, fill="cyan", outline="black") + own_sw.grid(row=0, column=0, padx=(0, 4), pady=(0, 2)) + ttk.Label(legend_frame, text="Ownship").grid(row=0, column=1, sticky="w") + + # Simulated + sim_sw = tk.Canvas(legend_frame, width=16, height=12, highlightthickness=0) sim_sw.create_rectangle(0, 0, 16, 12, fill="green", outline="black") - sim_sw.grid(row=0, column=0, padx=(0, 4), pady=(0, 2)) - ttk.Label(legend_frame, text="Simulated").grid( - row=0, column=1, sticky="w", padx=(0, 8), pady=(0, 2) - ) - real_sw = tk.Canvas( - legend_frame, width=16, height=12, highlightthickness=0 - ) + sim_sw.grid(row=1, column=0, padx=(0, 4), pady=(0, 2)) + ttk.Label(legend_frame, text="Simulated").grid(row=1, column=1, sticky="w") + + # Real + real_sw = tk.Canvas(legend_frame, width=16, height=12, highlightthickness=0) real_sw.create_rectangle(0, 0, 16, 12, fill="red", outline="black") - real_sw.grid(row=1, column=0, padx=(0, 4)) - ttk.Label(legend_frame, text="Real").grid(row=1, column=1, sticky="w", padx=(0, 8)) + real_sw.grid(row=2, column=0, padx=(0, 4)) + ttk.Label(legend_frame, text="Real").grid(row=2, column=1, sticky="w") def _create_plot(self): fig = Figure(figsize=(5, 5), dpi=100, facecolor="#3E3E3E") fig.subplots_adjust(left=0.05, right=0.95, top=0.9, bottom=0.05) self.ax = fig.add_subplot(111, projection="polar", facecolor="#2E2E2E") self.ax.set_theta_zero_location("N") - self.ax.set_theta_direction(1) + self.ax.set_theta_direction(-1) # Clockwise self.ax.set_rlabel_position(90) self.ax.set_ylim(0, self.range_var.get()) angles_deg = np.arange(0, 360, 30) - labels = [f"{(a - 360) if a > 180 else a}°" for a in angles_deg] + labels = [f"{a}°" for a in angles_deg] self.ax.set_thetagrids(angles_deg, labels) self.ax.tick_params(axis="x", colors="white", labelsize=8) self.ax.tick_params(axis="y", colors="white", labelsize=8) self.ax.grid(color="white", linestyle="--", linewidth=0.5, alpha=0.5) self.ax.spines["polar"].set_color("white") self.ax.set_title("PPI Display", color="white") + + (self._ownship_artist,) = self.ax.plot( + [0], [0], + marker='^', markersize=12, color='cyan', + markeredgecolor='black', zorder=10, + transform=self.ax.transData + ) + self._ownship_artist.set_visible(True) + (self._path_plot,) = self.ax.plot([], [], "g--", linewidth=1.5) (self._start_plot,) = self.ax.plot([], [], "go", markersize=8) (self._waypoints_plot,) = self.ax.plot([], [], "y+", markersize=10, mew=2) self.preview_artists = [self._path_plot, self._start_plot, self._waypoints_plot] + limit_rad = np.deg2rad(self.scan_limit_deg) - (self._scan_line_1,) = self.ax.plot( - [limit_rad, limit_rad], [0, self.max_range], "y--", linewidth=1 - ) - (self._scan_line_2,) = self.ax.plot( - [-limit_rad, -limit_rad], [0, self.max_range], "y--", linewidth=1 - ) - # Antenna current azimuth (dashed light-gray line). It will be - # animated via update_antenna_azimuth() calls which interpolate - # between timestamped azimuth updates for smooth motion. - (self._antenna_line_artist,) = self.ax.plot( - [], [], color="lightgray", linestyle="--", linewidth=1.2, alpha=0.85 - ) + (self._scan_line_1,) = self.ax.plot([limit_rad, limit_rad], [0, self.max_range], "y--", linewidth=1) + (self._scan_line_2,) = self.ax.plot([-limit_rad, -limit_rad], [0, self.max_range], "y--", linewidth=1) + (self._antenna_line_artist,) = self.ax.plot([], [], color="lightgray", linestyle="--", linewidth=1.2, alpha=0.85) + self.canvas = FigureCanvasTkAgg(fig, master=self) self.canvas.draw() self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True) self.range_selector.bind("<>", self._on_range_selected) self._update_scan_lines() + def update_ownship_state(self, heading_deg: float): + """Updates the ownship's visual representation on the PPI.""" + if self.ownship_heading_deg != heading_deg: + self.ownship_heading_deg = heading_deg + self._update_plot_orientation() + + def _update_plot_orientation(self): + """Applies rotation to the plot or ownship icon based on display mode.""" + if not self.ax or not self._ownship_artist: + return + + mode = self.display_mode_var.get() + heading_rad = np.deg2rad(self.ownship_heading_deg) + + if mode == "Heading-Up": + self.ax.set_theta_offset(math.pi / 2 - heading_rad) + transform = mpl.transforms.IdentityTransform() + self._ownship_artist.set_transform(transform) + else: # North-Up + self.ax.set_theta_offset(math.pi / 2) + transform = mpl.transforms.Affine2D().rotate(-heading_rad) + self.ax.transData + self._ownship_artist.set_transform(transform) + + if self.canvas: + self.canvas.draw_idle() + def clear_all_targets(self): """Clears all target artists from the display.""" all_artists = ( @@ -241,31 +277,10 @@ class PPIDisplay(ttk.Frame): def update_real_targets(self, targets: List[Target]): """Updates and redraws only the real targets.""" - # Instrument update rate try: now = time.monotonic() self._real_update_timestamps.append(now) - # Periodic (throttled) summary log of PPI update rate - if ( - now - self._last_update_summary_time - ) >= self._update_summary_interval_s: - # compute recent updates per second - cutoff = now - self._update_summary_interval_s - cnt = 0 - for ts in reversed(self._real_update_timestamps): - if ts >= cutoff: - cnt += 1 - else: - break - rate = ( - cnt / float(self._update_summary_interval_s) - if self._update_summary_interval_s > 0 - else float(cnt) - ) - # try: - # logger.info("[PPIDisplay] update_real_targets: recent_rate=%.1f updates/s display_targets=%d", rate, len(targets)) - # except Exception: - # pass + if (now - self._last_update_summary_time) >= self._update_summary_interval_s: self._last_update_summary_time = now except Exception: pass @@ -276,18 +291,12 @@ class PPIDisplay(ttk.Frame): def get_real_update_rate(self, window_seconds: float = 1.0) -> float: """ - Returns approximate PPI "real targets" update rate (updates/sec) over the - last `window_seconds` seconds. + Returns approximate PPI "real targets" update rate (updates/sec). """ try: now = time.monotonic() cutoff = now - float(window_seconds) - count = 0 - for ts in reversed(self._real_update_timestamps): - if ts >= cutoff: - count += 1 - else: - break + count = sum(1 for ts in self._real_update_timestamps if ts >= cutoff) return count / float(window_seconds) if window_seconds > 0 else float(count) except Exception: return 0.0 @@ -297,156 +306,91 @@ class PPIDisplay(ttk.Frame): Generic helper to update targets for a specific category ('simulated' or 'real'). """ if category == "simulated": - target_artists = self.sim_target_artists - trail_artists = self.sim_trail_artists - label_artists = self.sim_label_artists - trail_data = self._trails["simulated"] - show_points = self.show_sim_points_var.get() - show_trail = self.show_sim_trail_var.get() - color = "green" - trail_color = "limegreen" - else: # "real" - target_artists = self.real_target_artists - trail_artists = self.real_trail_artists - label_artists = self.real_label_artists - trail_data = self._trails["real"] - show_points = self.show_real_points_var.get() - show_trail = self.show_real_trail_var.get() - color = "red" - trail_color = "tomato" + target_artists, trail_artists, label_artists = self.sim_target_artists, self.sim_trail_artists, self.sim_label_artists + trail_data, show_points, show_trail = self._trails["simulated"], self.show_sim_points_var.get(), self.show_sim_trail_var.get() + color, trail_color = "green", "limegreen" + else: + target_artists, trail_artists, label_artists = self.real_target_artists, self.real_trail_artists, self.real_label_artists + trail_data, show_points, show_trail = self._trails["real"], self.show_real_points_var.get(), self.show_real_trail_var.get() + color, trail_color = "red", "tomato" - # 1. Clear existing artists for this category for artist in target_artists + trail_artists + label_artists: artist.remove() target_artists.clear() trail_artists.clear() label_artists.clear() - # 2. Update trail data if show_points or show_trail: for t in new_data: if t.active: - # Use the same plotting convention as _draw_target_visuals: - # theta is the geometric azimuth in degrees converted to radians. - # Previously a negation was applied here which inverted trail - # directions (mirrored azimuth). Remove the negation so trails - # follow the actual movement direction. pos = (np.deg2rad(t.current_azimuth_deg), t.current_range_nm) trail_data[t.target_id].append(pos) - # 3. Draw new visuals if show_points: active_targets = [t for t in new_data if t.active] - inactive_targets = [t for t in new_data if not t.active] - - # Draw active targets as before if active_targets: - self._draw_target_visuals( - active_targets, color, target_artists, label_artists - ) - - # Draw inactive simulated targets with a yellow 'X' marker overlay + self._draw_target_visuals(active_targets, color, target_artists, label_artists) + + inactive_targets = [t for t in new_data if not t.active] if inactive_targets and category == "simulated": - self._draw_inactive_markers( - inactive_targets, color, target_artists, label_artists - ) - + self._draw_inactive_markers(inactive_targets, color, target_artists, label_artists) + if show_trail: self._draw_trails(trail_data, trail_color, trail_artists) def _draw_inactive_markers( - self, - targets: List[Target], - color: str, - artist_list: List, - label_artist_list: List, + self, targets: List[Target], color: str, artist_list: List, label_artist_list: List ): - """Draw a small stationary marker for targets that are no longer simulated and - overlay a yellow 'X' to indicate the target is not being updated by the simulator. - """ for target in targets: try: r_nm = target.current_range_nm theta_rad_plot = np.deg2rad(target.current_azimuth_deg) - # plot a subdued point - (dot,) = self.ax.plot( - theta_rad_plot, r_nm, "o", markersize=6, color=color, alpha=0.6 - ) + (dot,) = self.ax.plot(theta_rad_plot, r_nm, "o", markersize=6, color=color, alpha=0.6) artist_list.append(dot) - - # overlay a yellow X at the same position - # Use a thin 'x' marker rather than a bold text glyph so the - # overlay is visually thinner and less intrusive. (x_mark,) = self.ax.plot( - theta_rad_plot, - r_nm, - marker="x", - color="yellow", - markersize=8, - markeredgewidth=0.9, - linestyle="", + theta_rad_plot, r_nm, marker="x", color="yellow", + markersize=8, markeredgewidth=0.9, linestyle="" ) label_artist_list.append(x_mark) except Exception: - # continue without breaking the draw loop pass def _draw_target_visuals( - self, - targets: List[Target], - color: str, - artist_list: List, - label_artist_list: List, + self, targets: List[Target], color: str, artist_list: List, label_artist_list: List ): vector_len_nm = self.range_var.get() / 20.0 - - # Determine marker size based on the target type (color) - marker_size = ( - 6 if color == "red" else 8 - ) # Simulated targets (green) are smaller + marker_size = 8 if color == "green" else 6 for target in targets: - # Plotting position (theta, r) r_nm = target.current_range_nm - # MODIFICATION: Removed negation. The azimuth from the model is now used directly. theta_rad_plot = np.deg2rad(target.current_azimuth_deg) - (dot,) = self.ax.plot( - theta_rad_plot, r_nm, "o", markersize=marker_size, color=color - ) + + (dot,) = self.ax.plot(theta_rad_plot, r_nm, "o", markersize=marker_size, color=color) artist_list.append(dot) - # --- Robust Vector Calculation --- - az_rad_model = math.radians(target.current_azimuth_deg) - x_start_nm = r_nm * math.sin(az_rad_model) - y_start_nm = r_nm * math.cos(az_rad_model) - # MODIFICATION: Heading should also be consistent. - # A positive heading (e.g. 10 deg) means turning left (CCW), which matches - # the standard polar plot direction. - hdg_rad_plot = math.radians(target.current_heading_deg) - dx_nm = vector_len_nm * math.sin(hdg_rad_plot) - dy_nm = vector_len_nm * math.cos(hdg_rad_plot) - x_end_nm = x_start_nm + dx_nm - y_end_nm = y_start_nm + dy_nm - r_end_nm = math.hypot(x_end_nm, y_end_nm) - # MODIFICATION: Removed negation here as well for consistency. - theta_end_rad_plot = math.atan2(x_end_nm, y_end_nm) + heading_relative_to_north_rad = np.deg2rad(target.current_heading_deg) + + x_start_rel_nm = r_nm * math.sin(theta_rad_plot) + y_start_rel_nm = r_nm * math.cos(theta_rad_plot) + + dx_nm = vector_len_nm * math.sin(heading_relative_to_north_rad) + dy_nm = vector_len_nm * math.cos(heading_relative_to_north_rad) + + x_end_rel_nm = x_start_rel_nm + dx_nm + y_end_rel_nm = y_start_rel_nm + dy_nm + + r_end_nm = math.hypot(x_end_rel_nm, y_end_rel_nm) + theta_end_rad_plot = math.atan2(x_end_rel_nm, y_end_rel_nm) (line,) = self.ax.plot( - [theta_rad_plot, theta_end_rad_plot], - [r_nm, r_end_nm], - color=color, - linewidth=1.2, + [theta_rad_plot, theta_end_rad_plot], [r_nm, r_end_nm], + color=color, linewidth=1.2 ) artist_list.append(line) txt = self.ax.text( - theta_rad_plot, - r_nm + (vector_len_nm * 0.5), - str(target.target_id), - color="white", - fontsize=8, - ha="center", - va="bottom", + theta_rad_plot, r_nm + (vector_len_nm * 0.5), str(target.target_id), + color="white", fontsize=8, ha="center", va="bottom" ) label_artist_list.append(txt) @@ -454,9 +398,7 @@ class PPIDisplay(ttk.Frame): for trail in trail_data.values(): if len(trail) > 1: thetas, rs = zip(*trail) - (line,) = self.ax.plot( - thetas, rs, color=color, linestyle="-", linewidth=0.8, alpha=0.7 - ) + (line,) = self.ax.plot(thetas, rs, color=color, linestyle="-", linewidth=0.8, alpha=0.7) artist_list.append(line) def clear_trails(self): @@ -471,17 +413,11 @@ class PPIDisplay(ttk.Frame): limit_rad = np.deg2rad(self.scan_limit_deg) self._scan_line_1.set_data([limit_rad, limit_rad], [0, max_r]) self._scan_line_2.set_data([-limit_rad, -limit_rad], [0, max_r]) - # Ensure antenna line extends to the updated range limit - try: - if self._antenna_line_artist is not None: - data = self._antenna_line_artist.get_data() - if data and len(data) == 2: - thetas, rs = data - if len(thetas) >= 1: - theta = thetas[0] - self._antenna_line_artist.set_data([theta, theta], [0, max_r]) - except Exception: - pass + if self._antenna_line_artist: + data = self._antenna_line_artist.get_data() + if data and len(data) == 2 and len(data[0]) >= 1: + theta = data[0][0] + self._antenna_line_artist.set_data([theta, theta], [0, max_r]) def _on_range_selected(self, event=None): self.ax.set_ylim(0, self.range_var.get()) @@ -490,140 +426,63 @@ class PPIDisplay(ttk.Frame): self.canvas.draw() def _on_antenna_animate_changed(self): - """ - Callback when the animate checkbox toggles. When unchecked the antenna - is hidden; when checked the antenna is shown and animation resumes if - a pending interpolation exists. - """ - try: - st = self._antenna_state - enabled = ( - getattr(self, "animate_antenna_var", None) - and self.animate_antenna_var.get() - ) - if not enabled: - # Hide the antenna and stop animating - st["animating"] = False - if self._antenna_line_artist is not None: - try: - self._antenna_line_artist.set_data([], []) - except Exception: - try: - self._antenna_line_artist.remove() - self._antenna_line_artist = None - except Exception: - pass - if self.canvas: - try: - self.canvas.draw_idle() - except Exception: - self.canvas.draw() - return - - # If enabled: resume animation if there's a pending interval - last_ts = st.get("last_ts") - next_ts = st.get("next_ts") - if last_ts is not None and next_ts is not None and next_ts > last_ts: - if not st.get("animating"): - st["animating"] = True - try: - self.after(st.get("tick_ms", 33), self._antenna_animation_step) - except Exception: - st["animating"] = False - else: - # Nothing to interpolate: render the most recent azimuth immediately - cur = st.get("last_az_deg") or st.get("next_az_deg") - if cur is not None: - self._render_antenna_line(cur) - except Exception: - pass + st = self._antenna_state + enabled = self.animate_antenna_var.get() + if not enabled: + st["animating"] = False + if self._antenna_line_artist: + self._antenna_line_artist.set_visible(False) + else: + if self._antenna_line_artist: + self._antenna_line_artist.set_visible(True) + last_ts, next_ts = st.get("last_ts"), st.get("next_ts") + if last_ts is not None and next_ts is not None and next_ts > last_ts and not st.get("animating"): + st["animating"] = True + self.after(st.get("tick_ms", 33), self._antenna_animation_step) + + if self.canvas: + self.canvas.draw_idle() def clear_previews(self): for artist in self.preview_artists: artist.set_data([], []) - # remove any per-target preview path artists - try: - for arts in list(self.preview_path_artists.values()): - for a in arts: - try: - a.remove() - except Exception: - pass - except Exception: - pass + for arts in self.preview_path_artists.values(): + for a in arts: + a.remove() self.preview_path_artists.clear() if self.canvas: self.canvas.draw() def draw_scenario_preview(self, scenario): - """Draw dashed trajectory previews for all targets in a Scenario. - - The preview is only visual; it does not affect simulation state. Existing - previews are cleared first. - """ - # Clear any existing previews self.clear_previews() if scenario is None: return for target in scenario.get_all_targets(): try: - waypoints = target.trajectory - if not waypoints: - continue - path, _ = Target.generate_path_from_waypoints( - waypoints, target.use_spline - ) - if not path: - continue + path, _ = Target.generate_path_from_waypoints(target.trajectory, target.use_spline) + if not path: continue path_thetas, path_rs = [], [] for point in path: x_ft, y_ft = point[1], point[2] - r_ft = math.sqrt(x_ft**2 + y_ft**2) + r_ft = math.hypot(x_ft, y_ft) az_rad_plot = math.atan2(x_ft, y_ft) path_rs.append(r_ft / NM_TO_FT) path_thetas.append(az_rad_plot) - # Plot dashed path using simulated color (line_art,) = self.ax.plot( - path_thetas, - path_rs, - color="limegreen", - linestyle="--", - linewidth=1.2, - alpha=0.9, + path_thetas, path_rs, color="limegreen", linestyle="--", linewidth=1.2, alpha=0.9 ) - - # Optionally show start point marker for each path - start_theta = path_thetas[0] if path_thetas else None - start_r = path_rs[0] if path_rs else None - start_art = None - if start_theta is not None: - (start_art,) = self.ax.plot( - [start_theta], [start_r], "go", markersize=6 - ) - - arts = [a for a in (line_art, start_art) if a is not None] - if arts: - self.preview_path_artists[target.target_id] = arts + (start_art,) = self.ax.plot([path_thetas[0]], [path_rs[0]], "go", markersize=6) + self.preview_path_artists[target.target_id] = [line_art, start_art] except Exception: - logger.exception( - "Failed to draw preview for target %s", - getattr(target, "target_id", "?"), - ) + logger.exception("Failed to draw preview for target %s", getattr(target, "target_id", "?")) if self.canvas: self.canvas.draw() - # -------------------- Antenna visualization & interpolation -------------------- def update_antenna_azimuth(self, az_deg: float, timestamp: float = None): - """ - Receive a new platform/antenna azimuth (degrees) with an optional - monotonic timestamp. The display will interpolate between the last - known azimuth and this new azimuth over the time interval to provide - a smooth animation. - """ try: now = time.monotonic() ts = float(timestamp) if timestamp is not None else now @@ -632,160 +491,74 @@ class PPIDisplay(ttk.Frame): return st = self._antenna_state + if not self.animate_antenna_var.get(): + st.update({"last_az_deg": az, "last_ts": ts, "next_az_deg": az, "next_ts": ts, "animating": False}) + if self._antenna_line_artist: self._antenna_line_artist.set_visible(False) + if self.canvas: self.canvas.draw_idle() + return - # If antenna animate is disabled, update stored azimuth but hide the antenna - try: - if ( - getattr(self, "animate_antenna_var", None) is not None - and not self.animate_antenna_var.get() - ): - st["last_az_deg"] = az - st["last_ts"] = ts - st["next_az_deg"] = az - st["next_ts"] = ts - st["animating"] = False - # Hide the visible antenna line if present - if self._antenna_line_artist is not None: - try: - self._antenna_line_artist.set_data([], []) - except Exception: - try: - self._antenna_line_artist.remove() - self._antenna_line_artist = None - except Exception: - pass - if self.canvas: - try: - self.canvas.draw_idle() - except Exception: - self.canvas.draw() - return - except Exception: - pass - - # If no previous sample exists, initialize both last and next to this value if st["last_az_deg"] is None or st["last_ts"] is None: - st["last_az_deg"] = az - st["last_ts"] = ts - st["next_az_deg"] = az - st["next_ts"] = ts - # Render immediately + st.update({"last_az_deg": az, "last_ts": ts, "next_az_deg": az, "next_ts": ts}) self._render_antenna_line(az) return - # Compute the current interpolated azimuth at 'now' and set as last - # so the new interpolation starts from the on-screen position. - cur_az = st["last_az_deg"] - cur_ts = st["last_ts"] - next_az = st.get("next_az_deg") - next_ts = st.get("next_ts") + cur_az, cur_ts = st["last_az_deg"], st["last_ts"] + next_az, next_ts = st.get("next_az_deg"), st.get("next_ts") - # If there is an outstanding next sample in the future, compute - # current interpolated value to be the new last. - try: - if next_az is not None and next_ts is not None and next_ts > cur_ts: - now_t = now - frac = 0.0 - if next_ts > cur_ts: - frac = max( - 0.0, min(1.0, (now_t - cur_ts) / float(next_ts - cur_ts)) - ) - # Shortest-angle interpolation - a0 = cur_az - a1 = next_az - diff = ((a1 - a0 + 180) % 360) - 180 - interp = (a0 + diff * frac) % 360 - cur_az = interp - cur_ts = now_t - except Exception: - # If interpolation fails, fall back to last known - cur_az = st["last_az_deg"] - cur_ts = st["last_ts"] + if next_az is not None and next_ts is not None and next_ts > cur_ts: + frac = max(0.0, min(1.0, (now - cur_ts) / (next_ts - cur_ts))) + diff = ((next_az - cur_az + 180) % 360) - 180 + cur_az = (cur_az + diff * frac) % 360 + cur_ts = now - # Set new last and next values for upcoming animation - st["last_az_deg"] = cur_az - st["last_ts"] = cur_ts - st["next_az_deg"] = az - st["next_ts"] = ts + st.update({"last_az_deg": cur_az, "last_ts": cur_ts, "next_az_deg": az, "next_ts": ts}) - # Start the animation loop if not already running if not st["animating"]: st["animating"] = True - try: - self.after(st["tick_ms"], self._antenna_animation_step) - except Exception: - st["animating"] = False + self.after(st["tick_ms"], self._antenna_animation_step) def _antenna_animation_step(self): st = self._antenna_state + if not self.animate_antenna_var.get(): + st["animating"] = False + return + try: - # If next and last are the same timestamp, snap to next - last_ts = st.get("last_ts") - next_ts = st.get("next_ts") - last_az = st.get("last_az_deg") - next_az = st.get("next_az_deg") - + last_ts, next_ts = st.get("last_ts"), st.get("next_ts") + last_az, next_az = st.get("last_az_deg"), st.get("next_az_deg") now = time.monotonic() - if last_az is None or next_az is None or last_ts is None or next_ts is None: + if any(v is None for v in [last_az, next_az, last_ts, next_ts]): st["animating"] = False return - if next_ts <= last_ts or abs(next_ts - last_ts) < 1e-6: - # No interval: snap - cur = next_az % 360 - st["last_az_deg"] = cur - st["last_ts"] = next_ts + if next_ts <= last_ts: + cur = next_az st["animating"] = False - self._render_antenna_line(cur) - return - - frac = max(0.0, min(1.0, (now - last_ts) / float(next_ts - last_ts))) - # Shortest-angle interpolation across 0/360 boundary - a0 = last_az - a1 = next_az - diff = ((a1 - a0 + 180) % 360) - 180 - cur = (a0 + diff * frac) % 360 - + else: + frac = max(0.0, min(1.0, (now - last_ts) / (next_ts - last_ts))) + diff = ((next_az - last_az + 180) % 360) - 180 + cur = (last_az + diff * frac) % 360 + if frac >= 1.0: + st["last_az_deg"], st["last_ts"] = next_az, next_ts + st["animating"] = False + self._render_antenna_line(cur) - - if frac >= 1.0: - # Reached the target - st["last_az_deg"] = next_az % 360 - st["last_ts"] = next_ts - st["animating"] = False - return - except Exception: - st["animating"] = False - # Schedule next tick if still animating - try: - if st.get("animating"): - self.after(st.get("tick_ms", 33), self._antenna_animation_step) except Exception: st["animating"] = False + + if st["animating"]: + self.after(st["tick_ms"], self._antenna_animation_step) def _render_antenna_line(self, az_deg: float): - """ - Render the antenna (platform) azimuth line on the PPI using the - current display conventions (theta = deg -> radians) and current range limit. - """ try: - theta = np.deg2rad(float(az_deg) % 360) + theta = np.deg2rad(float(az_deg)) max_r = self.ax.get_ylim()[1] - if self._antenna_line_artist is None: - # Create artist lazily if missing - (self._antenna_line_artist,) = self.ax.plot( - [], [], color="lightgray", linestyle="--", linewidth=1.2, alpha=0.85 - ) - # Plot as a radial line from r=0 to r=max_r - self._antenna_line_artist.set_data([theta, theta], [0, max_r]) - # Use draw_idle for better GUI responsiveness + if self._antenna_line_artist: + self._antenna_line_artist.set_visible(self.animate_antenna_var.get()) + self._antenna_line_artist.set_data([theta, theta], [0, max_r]) if self.canvas: - try: - self.canvas.draw_idle() - except Exception: - # Fall back to immediate draw - self.canvas.draw() + self.canvas.draw_idle() except Exception: pass @@ -794,55 +567,43 @@ class PPIDisplay(ttk.Frame): self.clear_trails() if not waypoints or waypoints[0].maneuver_type != ManeuverType.FLY_TO_POINT: return + path, _ = Target.generate_path_from_waypoints(waypoints, use_spline) if not path: return + path_thetas, path_rs = [], [] for point in path: x_ft, y_ft = point[1], point[2] - r_ft = math.sqrt(x_ft**2 + y_ft**2) - # Use the same plotting convention used elsewhere: theta_plot = atan2(x, y). - # This convention is established in the _draw_target_visuals helper, - # which computes theta via -current_azimuth_deg. + r_ft = math.hypot(x_ft, y_ft) az_rad_plot = math.atan2(x_ft, y_ft) path_rs.append(r_ft / NM_TO_FT) path_thetas.append(az_rad_plot) + self._path_plot.set_data(path_thetas, path_rs) + wp_thetas, wp_rs = [], [] for wp in waypoints: if wp.maneuver_type == ManeuverType.FLY_TO_POINT: r_nm = wp.target_range_nm or 0.0 - # The path uses theta_plot = atan2(x, y). Waypoint azimuths - # provided in the waypoint are geometric azimuth degrees - # (0 = North, positive CCW). Convert directly to radians so - # plotted waypoint markers align with the generated path. az_rad_plot = np.deg2rad(wp.target_azimuth_deg or 0.0) wp_rs.append(r_nm) wp_thetas.append(az_rad_plot) self._waypoints_plot.set_data(wp_thetas, wp_rs) + start_wp = waypoints[0] start_r = start_wp.target_range_nm or 0.0 start_theta = np.deg2rad(start_wp.target_azimuth_deg or 0.0) self._start_plot.set_data([start_theta], [start_r]) + if self.canvas: self.canvas.draw() def reconfigure_radar(self, max_range_nm: int, scan_limit_deg: int): self.max_range, self.scan_limit_deg = max_range_nm, scan_limit_deg steps = [10, 20, 40, 80, 100, 160, 240, 320] - valid_steps = sorted( - [s for s in steps if s <= max_range_nm] - + ([max_range_nm] if max_range_nm not in steps else []) - ) + valid_steps = sorted([s for s in steps if s <= max_range_nm] + ([max_range_nm] if max_range_nm not in steps else [])) self.range_selector["values"] = valid_steps if self.range_var.get() not in valid_steps: self.range_var.set(max_range_nm) - self._on_range_selected() - - def set_connect_callback(self, cb): - self._connect_callback = cb - - def update_connect_state(self, is_connected: bool): - # This method should only reflect state, not change UI elements. - # The parent window is responsible for enabling/disabling controls. - pass + self._on_range_selected() \ No newline at end of file diff --git a/todo.md b/todo.md index 8ec6f00..edfe70a 100644 --- a/todo.md +++ b/todo.md @@ -1,28 +1,18 @@ -# cose da fare +# ToDo List -## bachi -togliere il tasto connect dalla ppi e metterlo da un'altra parte della finestra perchè non centra niente con la ppi +- [ ] Inserire dati di navigazione dell'ownship nel file di salvataggio della simulazione +- [ ] muovere il ppi in base al movimento dell'ownship +- [ ] Aggiungere tabella dei dati cinematici dell'ownship nella schermata della simulazione +- [ ] Mettere nel file di comando inviato al srver l'ultimo timetag che è arrivato dal server +- [ ] Implementare il comando ping con numero indentificativo per verificare i tempi di risposta +- [ ] Mettere configurazione cifre decimali inviate nei json al server +- [ ] Se lat/lon passato dal server non è valido posso fare come fa mcs, integrare sul tempo e simulare il movimente dell'ownship +- [ ] poter scegliere se visualizzare la mappa ppi fissa a nord o fissa con l'heading dell'ownship +- [ ] salvare nei file delle simulazione i dati in lat/lon dei target così da poter piazzare su mappa oepnstreetmap le traiettorie e vedere come si è mosso lo scenario durante la simulazione +- [ ] vedere anche la simulazione in 3d usando le mappe dem e le mappe operstreetmap. +- [ ] Scrivere test unitari +- [ ] creare repository su git aziendale, usando codice PJ40906 come progetto -## sviluppi +# FIXME List -scomporre il campo flag in bit per avere le informazioni dello stato del target (attivo, tracable) -fare simulazione con moviumento dell'aereo letto da protocollo -visualizzare informaizoni dinamiche dell'areo durante la simulazione -sull'edito, se seleziono una manovra, vederla colorata di un altro colore sulla preview per capire cosa sto toccando. - - -la visualizzazione ppi in simulazione se tiene conto della rotazione del ptazimuth dovrebbe ruotare in modo che il cono di scansione dell'antenna si muove -di conseguenza. Immagino che la mappa ppi sia sempre diretta a nord, quindi quando io con l'aereo vado a nord tutto torna -se invece cambio direzione dell'aereo la mappa ruota e quindi ruotano anche le label attorno in modo che siano sempre riferite al muso dell'aereo. -Quindi dovremmo inserire una nuova legenda oltre a quella attuale che indichi che il nord è sempra. per ricordare all'utente che la ppi è verso l'alto. - - -devo ppoter mandare 10 millisecondi lka posizione del radar e misurare i discostamenti dalla posizione calcolata da quella tornata dal radar e graficare gli scontamenti sulla traiettoria. - -fare in modo di calcolare se l'invio dei dati ogni tot è rispettato misurando effettivamente il momento in cui si decide di mandare il dato ed il momento effettivo di uscita del dato -Per questo capire se jittera ed in caso fare degli aggiustamenti nel tempo per fare in modo che venga rispettato il rate di spedizione scelto che deve arrivare anche a 0.01 secondi. -vedere se con il rate di invio a 0.01 secondi riusciamo a mandare i dati al server - -modificare gli inivii verificando quanti target devono essere aggiornati e mandare per ogni invio un pacchetto di comandi settabile, per non eccedere la lunghezza massima possibile per 1 singolo messaggio - -mettere una flag che attivi o l'invio dei comandi a monitor, quello attuale, con i nuovi comandi via json che stiamo stabilendo con il server. \ No newline at end of file +- [ ] sistemare la visualizzazione nella tabe simulator, per poter vedere quale scenario è stato selezionato