diff --git a/.vscode/settings.json b/.vscode/settings.json index 2ddce20..2ba8ff0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,5 +5,7 @@ "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, "todo-tree.tree.showBadges": false, - "todo-tree.tree.showCountsInTree": true + "todo-tree.tree.showCountsInTree": true, + "python.terminal.activateEnvironment": true, + "python.defaultInterpreterPath": "${workspaceFolder}\\.venv\\Scripts\\python.exe" } \ No newline at end of file diff --git a/settings.json b/settings.json index 088e9be..05d2391 100644 --- a/settings.json +++ b/settings.json @@ -3,7 +3,7 @@ "scan_limit": 60, "max_range": 100, "geometry": "1492x992+230+258", - "last_selected_scenario": "scenario_dritto", + "last_selected_scenario": "scenario2", "connection": { "target": { "type": "sfp", @@ -45,6 +45,7 @@ }, "debug": { "enable_io_trace": true, + "enable_performance_profiling": true, "temp_folder_name": "Temp", "io_trace_sent_filename": "sent_positions.csv", "io_trace_received_filename": "received_positions.csv" diff --git a/target_simulator/analysis/performance_analyzer.py b/target_simulator/analysis/performance_analyzer.py deleted file mode 100644 index e43be40..0000000 --- a/target_simulator/analysis/performance_analyzer.py +++ /dev/null @@ -1,148 +0,0 @@ -# target_simulator/analysis/performance_analyzer.py -""" -Provides the PerformanceAnalyzer class for calculating error metrics -by comparing simulated data with real-time radar data. -""" -import math -from typing import Dict, List, Optional, Tuple - -from target_simulator.analysis.simulation_state_hub import ( - SimulationStateHub, - TargetState, -) - -# Structure to hold analysis results for a single target -AnalysisResult = Dict[str, Dict[str, float]] - - -class PerformanceAnalyzer: - """ - Analyzes the performance of the radar tracking by comparing simulated - 'ground truth' data against the real data received from the radar. - """ - - def __init__(self, hub: SimulationStateHub): - """ - Initializes the analyzer with a reference to the data hub. - - Args: - hub: The SimulationStateHub containing the historical data. - """ - self._hub = hub - - def analyze(self) -> Dict[int, AnalysisResult]: - """ - Performs a full analysis on all targets currently in the hub. - - For each target, it aligns the real and simulated data streams - temporally using linear interpolation and calculates key performance - metrics like Mean Error and Root Mean Square Error (RMSE). - - Returns: - A dictionary where keys are target IDs and values are the - analysis results for that target. - """ - results: Dict[int, AnalysisResult] = {} - target_ids = self._hub.get_all_target_ids() - - for tid in target_ids: - history = self._hub.get_target_history(tid) - if not history or not history["real"] or len(history["simulated"]) < 2: - # Not enough data to perform analysis - continue - - simulated_history = sorted( - history["simulated"] - ) # Ensure sorted by timestamp - real_history = history["real"] - - errors_x: List[float] = [] - errors_y: List[float] = [] - errors_z: List[float] = [] - - for real_state in real_history: - real_ts, real_x, real_y, real_z = real_state - - # Find the two simulated points that bracket the real point in time - p1, p2 = self._find_bracketing_points(real_ts, simulated_history) - - if p1 and p2: - # We have bracketing points, so we can interpolate - interpolated_state = self._interpolate(real_ts, p1, p2) - _interp_ts, interp_x, interp_y, interp_z = interpolated_state - - # Calculate instantaneous error - errors_x.append(real_x - interp_x) - errors_y.append(real_y - interp_y) - errors_z.append(real_z - interp_z) - - # If we have collected errors, calculate statistics - if errors_x: - results[tid] = { - "x": self._calculate_stats(errors_x), - "y": self._calculate_stats(errors_y), - "z": self._calculate_stats(errors_z), - } - - return results - - def _find_bracketing_points( - self, timestamp: float, history: List[TargetState] - ) -> Tuple[Optional[TargetState], Optional[TargetState]]: - """ - Finds two points in a time-sorted history that surround a given timestamp. - """ - p1, p2 = None, None - for i in range(len(history) - 1): - if history[i][0] <= timestamp <= history[i + 1][0]: - p1 = history[i] - p2 = history[i + 1] - break - return p1, p2 - - def _interpolate( - self, timestamp: float, p1: TargetState, p2: TargetState - ) -> TargetState: - """ - Performs linear interpolation between two state points (p1 and p2) - to estimate the state at a given timestamp. - """ - ts1, x1, y1, z1 = p1 - ts2, x2, y2, z2 = p2 - - # Avoid division by zero if timestamps are identical - duration = ts2 - ts1 - if duration == 0: - return p1 - - # Calculate interpolation factor (how far timestamp is between ts1 and ts2) - factor = (timestamp - ts1) / duration - - # Interpolate each coordinate - interp_x = x1 + (x2 - x1) * factor - interp_y = y1 + (y2 - y1) * factor - interp_z = z1 + (z2 - z1) * factor - - return (timestamp, interp_x, interp_y, interp_z) - - def _calculate_stats(self, errors: List[float]) -> Dict[str, float]: - """Calculates mean, variance, and RMSE for a list of errors.""" - n = len(errors) - if n == 0: - return {"mean": 0, "variance": 0, "std_dev": 0, "rmse": 0} - - mean = sum(errors) / n - - # Variance and Standard Deviation - variance = sum((x - mean) ** 2 for x in errors) / n - std_dev = math.sqrt(variance) - - # Root Mean Square Error - rmse = math.sqrt(sum(x**2 for x in errors) / n) - - return { - "mean": mean, - "variance": variance, - "std_dev": std_dev, - "rmse": rmse, - } diff --git a/target_simulator/analysis/simulation_archive.py b/target_simulator/analysis/simulation_archive.py index 74de216..86f87f0 100644 --- a/target_simulator/analysis/simulation_archive.py +++ b/target_simulator/analysis/simulation_archive.py @@ -5,18 +5,18 @@ import json import time from datetime import datetime from typing import Dict, List, Any, Tuple, Optional +import math from target_simulator.core.models import Scenario -import math +from target_simulator.utils.csv_logger import append_row, flush_all_csv_buffers # Prefer pyproj for accurate geodesic calculations; fall back to a simple # equirectangular approximation when pyproj is not available. try: from pyproj import Geod - _GEOD = Geod(ellps="WGS84") _HAS_PYPROJ = True -except Exception: +except ImportError: _GEOD = None _HAS_PYPROJ = False @@ -27,28 +27,31 @@ RecordedState = Tuple[float, float, float, float] # (timestamp, x_ft, y_ft, z_f class SimulationArchive: """ Manages data collection for a single simulation run and saves it to a file. + Positions are streamed to an efficient CSV trail file for fast analysis. """ ARCHIVE_FOLDER = "archive_simulations" + TRAIL_HEADERS = ["timestamp", "source", "target_id", "x_ft", "y_ft", "z_ft"] + LATENCY_HEADERS = ["timestamp", "latency_ms"] def __init__(self, scenario: Scenario): - """ - Initializes a new archive session for a given scenario. - """ + """Initializes a new archive session for a given scenario.""" self.start_time = time.monotonic() self.scenario_name = scenario.name self.scenario_data = scenario.to_dict() - # Data structure to hold recorded events, indexed by target_id self.recorded_data: Dict[int, Dict[str, List[RecordedState]]] = {} - - # Data structure to hold the ownship's trajectory self.ownship_trajectory: List[Dict[str, Any]] = [] - # Data structure to hold computed georeferenced positions for real targets - # keyed by target_id -> list of {'timestamp': t, 'lat': ..., 'lon': ..., 'alt_ft': ...} self.recorded_geopos: Dict[int, List[Dict[str, Any]]] = {} self._ensure_archive_directory() + + # Generate a unique temporary trail filename based on precise start time + ts_str_precise = datetime.now().strftime("%Y%m%d_%H%M%S_%f") + self._temp_trail_filename = self._get_trail_filename(ts_str=ts_str_precise) + self._temp_latency_filename = self._get_latency_filename(ts_str=ts_str_precise) + self._cleanup_stale_trail_file(self._temp_trail_filename) + self._cleanup_stale_trail_file(self._temp_latency_filename) def _ensure_archive_directory(self): """Creates the main archive directory if it does not exist.""" @@ -58,84 +61,90 @@ class SimulationArchive: except OSError as e: print(f"Error creating archive directory: {e}") - def add_simulated_state( - self, target_id: int, timestamp: float, state: Tuple[float, ...] - ): - """Adds a simulated state to the archive.""" + def _get_trail_filename(self, ts_str: Optional[str] = None) -> str: + """Generates a filename for the CSV trail file.""" + if not ts_str: + ts_str = datetime.now().strftime("%Y%m%d_%H%M%S") + safe_scenario_name = "".join( + c for c in self.scenario_name if c.isalnum() or c in (" ", "_") + ).rstrip() + return f"{ts_str}_{safe_scenario_name}.trail.csv" + + def _get_latency_filename(self, ts_str: Optional[str] = None) -> str: + """Generates a filename for the CSV latency file.""" + if not ts_str: + ts_str = datetime.now().strftime("%Y%m%d_%H%M%S") + safe_scenario_name = "".join( + c for c in self.scenario_name if c.isalnum() or c in (" ", "_") + ).rstrip() + return f"{ts_str}_{safe_scenario_name}.latency.csv" + + def _cleanup_stale_trail_file(self, filename: str): + """Removes the specified trail file if it exists.""" + filepath = os.path.join(self.ARCHIVE_FOLDER, filename) + if os.path.exists(filepath): + try: + os.remove(filepath) + except OSError: + pass # Non-critical if removal fails + + def add_simulated_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]): + """Adds a simulated state to the archive and streams it to the trail file.""" + # This part is kept for backward compatibility and direct access if needed if target_id not in self.recorded_data: self.recorded_data[target_id] = {"simulated": [], "real": []} - full_state: RecordedState = (timestamp, state[0], state[1], state[2]) self.recorded_data[target_id]["simulated"].append(full_state) - def add_real_state( - self, target_id: int, timestamp: float, state: Tuple[float, ...] - ): - """Adds a real state (from the server) to the archive.""" + # Stream to the temporary CSV trail file asynchronously + row = [timestamp, "simulated", target_id, state[0], state[1], state[2]] + append_row(self._temp_trail_filename, row, headers=self.TRAIL_HEADERS) + + def add_real_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]): + """Adds a real state (from the server) to the archive and streams it.""" if target_id not in self.recorded_data: self.recorded_data[target_id] = {"simulated": [], "real": []} - full_state: RecordedState = (timestamp, state[0], state[1], state[2]) self.recorded_data[target_id]["real"].append(full_state) - # Attempt to compute and store geoposition for this real sample. + + # Stream to the temporary CSV trail file asynchronously + row = [timestamp, "real", target_id, state[0], state[1], state[2]] + append_row(self._temp_trail_filename, row, headers=self.TRAIL_HEADERS) + try: self._compute_and_store_geopos(target_id, timestamp, state) except Exception: - # Non-fatal: if geopositioning fails we simply skip it pass - def _compute_and_store_geopos( - self, target_id: int, timestamp: float, state: Tuple[float, ...] - ): - """Compute georeferenced lat/lon for a real state and store it in recorded_geopos. - - This method is separated for easier testing and clarity. - """ + def _compute_and_store_geopos(self, target_id: int, timestamp: float, state: Tuple[float, ...]): + """Compute georeferenced lat/lon for a real state and store it.""" if not self.ownship_trajectory: return - # Find ownship state closest in time - best = min( - self.ownship_trajectory, - key=lambda s: abs(s.get("timestamp", 0.0) - timestamp), - ) - own_lat = best.get("latitude") - own_lon = best.get("longitude") - own_pos = best.get("position_xy_ft") - if own_lat is None or own_lon is None or not own_pos: + best_ownship = min(self.ownship_trajectory, key=lambda s: abs(s.get("timestamp", 0.0) - timestamp)) + own_lat, own_lon, own_pos = best_ownship.get("latitude"), best_ownship.get("longitude"), best_ownship.get("position_xy_ft") + + if any(v is None for v in [own_lat, own_lon, own_pos]): return - # target and ownship positions are in feet: (x_east_ft, y_north_ft) - target_x_ft = float(state[0]) - target_y_ft = float(state[1]) - own_x_ft = float(own_pos[0]) - own_y_ft = float(own_pos[1]) + target_x_ft, target_y_ft = float(state[0]), float(state[1]) + own_x_ft, own_y_ft = float(own_pos[0]), float(own_pos[1]) - # Compute deltas in meters delta_east_m = (target_x_ft - own_x_ft) * 0.3048 delta_north_m = (target_y_ft - own_y_ft) * 0.3048 - # Use pyproj.Geod when available for accurate forward geodesic - target_lat = None - target_lon = None - if _HAS_PYPROJ and _GEOD is not None: - distance_m = math.hypot(delta_east_m, delta_north_m) - az_rad = math.atan2(delta_east_m, delta_north_m) - az_deg = math.degrees(az_rad) + target_lat, target_lon = None, None + if _HAS_PYPROJ and _GEOD: try: - lon2, lat2, _ = _GEOD.fwd( - float(own_lon), float(own_lat), az_deg, distance_m - ) - target_lat = lat2 - target_lon = lon2 + distance_m = math.hypot(delta_east_m, delta_north_m) + az_deg = math.degrees(math.atan2(delta_east_m, delta_north_m)) + lon2, lat2, _ = _GEOD.fwd(float(own_lon), float(own_lat), az_deg, distance_m) + target_lat, target_lon = lat2, lon2 except Exception: - # fall back to equirectangular below - target_lat = None - target_lon = None - - if target_lat is None or target_lon is None: - # Convert meters to degrees using a simple equirectangular approximation - R = 6378137.0 # Earth radius in meters (WGS84 sphere approx) + target_lat, target_lon = None, None + + if target_lat is None: + R = 6378137.0 dlat = (delta_north_m / R) * (180.0 / math.pi) lat_rad = math.radians(float(own_lat)) dlon = (delta_east_m / (R * math.cos(lat_rad))) * (180.0 / math.pi) @@ -144,51 +153,31 @@ class SimulationArchive: if target_id not in self.recorded_geopos: self.recorded_geopos[target_id] = [] - self.recorded_geopos[target_id].append( - { - "timestamp": timestamp, - "lat": round(target_lat, 7), - "lon": round(target_lon, 7), - "alt_ft": float(state[2]) if len(state) > 2 else None, - } - ) + self.recorded_geopos[target_id].append({ + "timestamp": timestamp, "lat": round(target_lat, 7), "lon": round(target_lon, 7), + "alt_ft": float(state[2]) if len(state) > 2 else None, + }) def add_ownship_state(self, state: Dict[str, Any]): - """ - Adds an ownship state sample to the archive's trajectory. - - Args: - state: A dictionary representing the ownship's state at a point in time. - """ + """Adds an ownship state sample to the archive's trajectory.""" self.ownship_trajectory.append(state) + + def add_latency_sample(self, timestamp: float, latency_ms: float): + """Adds a latency sample to the latency CSV file.""" + row = [timestamp, latency_ms] + append_row(self._temp_latency_filename, row, headers=self.LATENCY_HEADERS) def save(self, extra_metadata: Optional[Dict[str, Any]] = None) -> str: """ - Saves the complete simulation archive to a JSON file. - The filename is generated from the timestamp and scenario name. - Performance data is saved to a separate '.perf.csv' file. - - Args: - extra_metadata: An optional dictionary of metadata to add or - overwrite in the final archive file. - - Returns: - The path of the saved file. + Saves the simulation archive and performance data to separate files, + and finalizes the trail file by renaming it. """ end_time = time.monotonic() - - metadata = { - "scenario_name": self.scenario_name, - "start_timestamp_utc": datetime.utcnow().isoformat(), - "duration_seconds": end_time - self.start_time, - } - - # Merge extra metadata if provided - if extra_metadata: - metadata.update(extra_metadata) - - # --- Performance Data Separation --- - performance_samples = metadata.pop("performance_samples", None) + + # --- MODIFIED PART START --- + # Force a synchronous flush of all CSV data before proceeding + flush_all_csv_buffers() + # --- MODIFIED PART END --- ts_str = datetime.now().strftime("%Y%m%d_%H%M%S") safe_scenario_name = "".join( @@ -196,36 +185,72 @@ class SimulationArchive: ).rstrip() base_filename = f"{ts_str}_{safe_scenario_name}" - # Save performance data to a separate CSV file + # --- Finalize Trail File --- + from target_simulator.config import DEBUG_CONFIG + temp_folder = DEBUG_CONFIG.get("temp_folder_name", "Temp") + + final_trail_filename = f"{base_filename}.trail.csv" + try: + temp_trail_path = os.path.join(temp_folder, self._temp_trail_filename) + final_trail_path = os.path.join(self.ARCHIVE_FOLDER, final_trail_filename) + + if os.path.exists(temp_trail_path): + os.rename(temp_trail_path, final_trail_path) + else: + print(f"Warning: Temporary trail file not found at {temp_trail_path}. No trail file will be saved.") + final_trail_filename = "" + except OSError as e: + print(f"Warning: could not rename trail file: {e}") + final_trail_filename = self._temp_trail_filename + + # --- Finalize Latency File --- + final_latency_filename = f"{base_filename}.latency.csv" + try: + temp_latency_path = os.path.join(temp_folder, self._temp_latency_filename) + final_latency_path = os.path.join(self.ARCHIVE_FOLDER, final_latency_filename) + + if os.path.exists(temp_latency_path): + os.rename(temp_latency_path, final_latency_path) + else: + print(f"Warning: Temporary latency file not found at {temp_latency_path}. No latency file will be saved.") + final_latency_filename = "" + except OSError as e: + print(f"Warning: could not rename latency file: {e}") + final_latency_filename = self._temp_latency_filename + + metadata = { + "scenario_name": self.scenario_name, + "start_timestamp_utc": datetime.utcnow().isoformat(), + "duration_seconds": end_time - self.start_time, + } + if final_trail_filename: + metadata["trail_file"] = final_trail_filename + if final_latency_filename: + metadata["latency_file"] = final_latency_filename + + if extra_metadata: + metadata.update(extra_metadata) + + performance_samples = metadata.pop("performance_samples", None) + if performance_samples: import csv - perf_filename = f"{base_filename}.perf.csv" perf_filepath = os.path.join(self.ARCHIVE_FOLDER, perf_filename) - - # Define headers based on the keys of the first sample - headers = list(performance_samples[0].keys()) - + headers = list(performance_samples[0].keys()) if performance_samples else [] try: with open(perf_filepath, "w", newline="", encoding="utf-8") as f: writer = csv.writer(f) - - # Write metadata as commented header lines writer.writerow([f"# Scenario Name: {metadata.get('scenario_name')}"]) writer.writerow([f"# Start Timestamp (UTC): {metadata.get('start_timestamp_utc')}"]) writer.writerow([f"# Source File: {base_filename}.json"]) - - # Write the actual header row - writer.writerow(headers) - - # Write data rows - for sample in performance_samples: - writer.writerow([sample.get(h, "") for h in headers]) - + if headers: + writer.writerow(headers) + for sample in performance_samples: + writer.writerow([sample.get(h, "") for h in headers]) print(f"Performance data saved to: {perf_filepath}") except IOError as e: print(f"Error saving performance data CSV: {e}") - # --- End of Separation Logic --- archive_content = { "metadata": metadata, @@ -245,4 +270,4 @@ class SimulationArchive: return filepath except IOError as e: print(f"Error saving simulation archive: {e}") - return "" + return "" \ No newline at end of file diff --git a/target_simulator/config.py b/target_simulator/config.py index 69ea382..c63834c 100644 --- a/target_simulator/config.py +++ b/target_simulator/config.py @@ -26,7 +26,7 @@ DEBUG_CONFIG = { "temp_folder_name": "Temp", # Enable saving of IO traces (sent/received positions) to CSV files in Temp/ # Set to True during debugging to collect logs. - "enable_io_trace": False, + "enable_io_trace": True, "io_trace_sent_filename": "sent_positions.csv", "io_trace_received_filename": "received_positions.csv", # Enable performance profiling of packet processing diff --git a/target_simulator/gui/analysis_window.py b/target_simulator/gui/analysis_window.py index 17a5869..4512717 100644 --- a/target_simulator/gui/analysis_window.py +++ b/target_simulator/gui/analysis_window.py @@ -1,112 +1,114 @@ +# target_simulator/gui/analysis_window.py """ -A Toplevel window for displaying real-time performance analysis, including -error statistics and plots. +A Toplevel window for displaying performance analysis by processing +an efficient trail file. """ + import tkinter as tk from tkinter import ttk, messagebox import json import os import csv -from typing import Optional, Dict, List, Any -from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer -from target_simulator.analysis.simulation_state_hub import SimulationStateHub +import math +import statistics +import warnings +from typing import Optional, Dict, List, Any, Tuple + from target_simulator.gui.performance_analysis_window import PerformanceAnalysisWindow + try: + import numpy as np from matplotlib.figure import Figure from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk MATPLOTLIB_AVAILABLE = True except ImportError: + np = None MATPLOTLIB_AVAILABLE = False + +# Constants for analysis +DOWNSAMPLE_THRESHOLD = 4000 # Number of points before downsampling is applied + class AnalysisWindow(tk.Toplevel): """ - A window that displays real-time analysis of tracking performance. + A window that displays tracking performance analysis by loading data + from an archive's main JSON file and its associated `.trail.csv`. """ + def __init__(self, master, archive_filepath: str): super().__init__(master) self.title(f"Analysis for: {os.path.basename(archive_filepath)}") - self.geometry("900x750") + self.geometry("1100x800") self.archive_filepath = archive_filepath + self.trail_filepath: Optional[str] = None self.performance_data_path: Optional[str] = None self.scenario_name = "Unknown" + self.target_ids: List[int] = [] - # State variables - self.selected_target_id = tk.IntVar(value=0) - self._active = True - self._filtered_errors = None - + self.selected_target_id = tk.IntVar() + self._show_loading_window(archive_filepath) def _load_data_and_setup(self, filepath: str): - """Loads data from the main archive and finds the performance data file.""" + """Loads metadata from the main archive and finds associated data files.""" try: with open(filepath, "r", encoding="utf-8") as f: archive_data = json.load(f) except Exception as e: - raise IOError(f"Could not load archive file.\n{e}") + raise IOError(f"Could not load archive file: {e}") metadata = archive_data.get("metadata", {}) - self.estimated_latency_ms = metadata.get("estimated_latency_ms") - self.prediction_offset_ms = metadata.get("prediction_offset_ms") self.scenario_name = metadata.get("scenario_name", "Unknown") - - latency_samples = metadata.get("latency_samples", []) - self.latency_timestamps = [s[0] for s in latency_samples if isinstance(s, list) and len(s) > 1] - self.latency_values_ms = [s[1] for s in latency_samples if isinstance(s, list) and len(s) > 1] + self.title(f"Analysis - {self.scenario_name}") - self._hub = SimulationStateHub(history_size=100000) - results = archive_data.get("simulation_results", {}) - for target_id_str, data in results.items(): - target_id = int(target_id_str) - for state in data.get("simulated", []): - self._hub.add_simulated_state(target_id, state[0], tuple(state[1:])) - for state in data.get("real", []): - self._hub.add_real_state(target_id, state[0], tuple(state[1:])) - - self._analyzer = PerformanceAnalyzer(self._hub) + # Find the associated trail, latency, and performance files + base_path, _ = os.path.splitext(filepath) + self.trail_filepath = f"{base_path}.trail.csv" + self.latency_filepath = f"{base_path}.latency.csv" + self.performance_data_path = f"{base_path}.perf.csv" - # Find the associated performance data file - self.performance_data_path = self._find_performance_data_file(filepath) + if not os.path.exists(self.trail_filepath): + raise FileNotFoundError(f"Required trail file not found: {self.trail_filepath}") - def _find_performance_data_file(self, archive_path: str) -> Optional[str]: - """Finds the .perf.csv or .perf.json file associated with an archive.""" - base_path, _ = os.path.splitext(archive_path) - - # Prefer the new CSV format - csv_path = f"{base_path}.perf.csv" - if os.path.exists(csv_path): - return csv_path + # Get available target IDs from the trail file header + with open(self.trail_filepath, "r", encoding="utf-8") as f: + reader = csv.reader(f) + headers = next(reader, []) + if "target_id" not in headers: + raise ValueError("Trail file missing 'target_id' column.") - # Fallback to the old JSON format for backward compatibility - json_path = f"{base_path}.perf.json" - if os.path.exists(json_path): - return json_path - - return None + target_id_index = headers.index('target_id') + ids = set() + for row in reader: + if row and not row[0].startswith('#'): + try: + ids.add(int(row[target_id_index])) + except (ValueError, IndexError): + continue + self.target_ids = sorted(list(ids)) def _show_loading_window(self, archive_filepath: str): - """Show a loading dialog and load data asynchronously.""" + """Shows a loading dialog and loads data in the background.""" loading_dialog = tk.Toplevel(self) + # ... (loading dialog implementation is unchanged) loading_dialog.title("Loading Analysis") loading_dialog.geometry("400x150") loading_dialog.transient(self) loading_dialog.grab_set() - loading_dialog.update_idletasks() - x = self.winfo_x() + (self.winfo_width() // 2) - (loading_dialog.winfo_width() // 2) - y = self.winfo_y() + (self.winfo_height() // 2) - (loading_dialog.winfo_height() // 2) + x = self.winfo_x() + (self.winfo_width()//2) - (loading_dialog.winfo_width()//2) + y = self.winfo_y() + (self.winfo_height()//2) - (loading_dialog.winfo_height()//2) loading_dialog.geometry(f"+{x}+{y}") - ttk.Label(loading_dialog, text="Loading simulation data...", font=("Segoe UI", 11)).pack(pady=(20, 10)) progress_label = ttk.Label(loading_dialog, text="Please wait", font=("Segoe UI", 9)) progress_label.pack(pady=5) progress = ttk.Progressbar(loading_dialog, mode='indeterminate', length=300) progress.pack(pady=10) progress.start(10) - + def load_and_display(): try: - progress_label.config(text="Reading archive file...") + progress_label.config(text="Locating data files...") self.update() self._load_data_and_setup(archive_filepath) @@ -114,11 +116,14 @@ class AnalysisWindow(tk.Toplevel): self.update() self._create_widgets() - progress_label.config(text="Analyzing data...") + progress_label.config(text="Ready.") self.update() - self._populate_analysis() loading_dialog.destroy() + + # Trigger initial analysis + self._on_target_select() + except Exception as e: loading_dialog.destroy() messagebox.showerror("Analysis Error", f"Failed to load analysis:\n{e}", parent=self) @@ -126,15 +131,6 @@ class AnalysisWindow(tk.Toplevel): self.after(100, load_and_display) - def _populate_analysis(self): - """Runs the analysis and populates the widgets once.""" - self._update_target_selector() - target_ids = self.target_selector["values"] - if target_ids: - self.selected_target_id.set(target_ids[0]) - - self._on_target_select() - def _create_widgets(self): main_pane = ttk.PanedWindow(self, orient=tk.VERTICAL) main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) @@ -146,255 +142,394 @@ class AnalysisWindow(tk.Toplevel): plot_frame = ttk.LabelFrame(main_pane, text="Error Over Time (feet)") main_pane.add(plot_frame, weight=4) self._create_plot_widgets(plot_frame) - + def _create_stats_widgets(self, parent): - container = ttk.Frame(parent) - container.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - - left = ttk.Frame(container) - left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - - right = ttk.Frame(container) - right.pack(side=tk.RIGHT, fill=tk.Y) - - top_bar = ttk.Frame(left) - top_bar.pack(fill=tk.X, padx=0, pady=(0, 6)) - + # Configure grid per il layout + parent.rowconfigure(0, weight=0) # Top bar + parent.rowconfigure(1, weight=1) # Content area + parent.columnconfigure(0, weight=1) + + # Top bar con combobox e pulsante + top_bar = ttk.Frame(parent, padding=5) + top_bar.grid(row=0, column=0, sticky="ew") + ttk.Label(top_bar, text="Select Target ID:").pack(side=tk.LEFT) self.target_selector = ttk.Combobox( - top_bar, textvariable=self.selected_target_id, state="readonly", width=5 + top_bar, textvariable=self.selected_target_id, state="readonly", width=5, values=self.target_ids ) self.target_selector.pack(side=tk.LEFT, padx=5) self.target_selector.bind("<>", self._on_target_select) + if self.target_ids: + self.selected_target_id.set(self.target_ids[0]) - sync_frame = ttk.Frame(top_bar) - sync_frame.pack(side=tk.LEFT, padx=(20, 0)) + # Performance Analysis button (always visible, disabled if no data) + perf_button = ttk.Button(top_bar, text="Open Performance Analysis", command=self._open_performance_window) + perf_button.pack(side=tk.LEFT, padx=(20, 0)) + if not os.path.exists(self.performance_data_path): + perf_button.config(state="disabled") - if self.estimated_latency_ms is not None: - ttk.Label(sync_frame, text="Avg. Latency:").pack(side=tk.LEFT) - ttk.Label( - sync_frame, text=f"{self.estimated_latency_ms:.1f} ms", - font=("Segoe UI", 9, "bold"), foreground="blue" - ).pack(side=tk.LEFT, padx=4) - - if self.prediction_offset_ms is not None: - ttk.Label(sync_frame, text="Prediction Offset:").pack(side=tk.LEFT, padx=(10, 0)) - ttk.Label( - sync_frame, text=f"{self.prediction_offset_ms:.1f} ms", - font=("Segoe UI", 9, "bold"), foreground="green" - ).pack(side=tk.LEFT, padx=4) + # Content container diviso in due colonne + content_frame = ttk.Frame(parent) + content_frame.grid(row=1, column=0, sticky="nsew", padx=5, pady=5) + content_frame.columnconfigure(0, weight=1, uniform="half") + content_frame.columnconfigure(1, weight=1, uniform="half") + content_frame.rowconfigure(0, weight=1) + + # Left: Stats table + table_container = ttk.Frame(content_frame) + table_container.grid(row=0, column=0, sticky="nsew", padx=(0, 2)) - # The button is now conditional - if self.performance_data_path: - perf_button = ttk.Button( - sync_frame, text="Performance Analysis...", command=self._open_performance_window - ) - perf_button.pack(side=tk.LEFT, padx=(20, 0)) - columns = ("error_type", "mean", "std_dev", "rmse") - self.stats_tree = ttk.Treeview(left, columns=columns, show="headings", height=4) + self.stats_tree = ttk.Treeview(table_container, columns=columns, show="headings", height=4) self.stats_tree.heading("error_type", text="") self.stats_tree.heading("mean", text="Mean (ft)") self.stats_tree.heading("std_dev", text="Std Dev (ft)") self.stats_tree.heading("rmse", text="RMSE (ft)") self.stats_tree.column("error_type", width=100, anchor=tk.W) - self.stats_tree.column("mean", anchor=tk.E, width=100) - self.stats_tree.column("std_dev", anchor=tk.E, width=100) - self.stats_tree.column("rmse", anchor=tk.E, width=100) + self.stats_tree.column("mean", anchor=tk.E, width=120) + self.stats_tree.column("std_dev", anchor=tk.E, width=120) + self.stats_tree.column("rmse", anchor=tk.E, width=120) self.stats_tree.pack(fill=tk.BOTH, expand=True) - - legend_title = ttk.Label(right, text="How to Interpret Results:", font=(None, 9, "bold")) - legend_title.pack(anchor=tk.NW, padx=(6, 6), pady=(4, 4)) + + # Right: Legend frame + legend_frame = ttk.Frame(content_frame) + legend_frame.grid(row=0, column=1, sticky="nsew", padx=(2, 0)) + + legend_title = ttk.Label(legend_frame, text="How to Interpret Results:", font=("Segoe UI", 9, "bold")) + legend_title.pack(anchor=tk.NW, pady=(0, 5)) + explanation_text = ( - "Formula: Error = Real Position - Simulated Position\n\n" - "Sign of Error (e.g., on X axis):\n" - "• Positive Error (+): Real target is at a larger X coordinate.\n" - "• Negative Error (-): Real target is at a smaller X coordinate.\n\n" - "Prediction Offset:\n" - "A manual offset to compensate for server processing delay." + "Error = Real - Simulated Position\n\n" + "Sign (e.g., X axis):\n" + "• Positive: Real target at larger X\n" + "• Negative: Real target at smaller X\n\n" + "Spike Filtering:\n" + "Transients >20x median filtered\n" + "from plots and statistics.\n\n" + "Latency:\n" + "Time from packet generation\n" + "(server) to reception (client)." ) - ttk.Label(right, text=explanation_text, justify=tk.LEFT, wraplength=280).pack(anchor=tk.NW, padx=(6, 6)) + ttk.Label(legend_frame, text=explanation_text, justify=tk.LEFT, font=("Segoe UI", 9)).pack(anchor=tk.NW, fill=tk.BOTH, expand=True) def _create_plot_widgets(self, parent): - fig = Figure(figsize=(5, 6), dpi=100) - gs = fig.add_gridspec(2, 1, height_ratios=[2, 1], hspace=0.35, top=0.95) + if not MATPLOTLIB_AVAILABLE: + ttk.Label(parent, text="Matplotlib is required for plotting.").pack() + return - self.ax = fig.add_subplot(gs[0, 0]) + fig = Figure(figsize=(5, 7), dpi=100) + + # Check if latency file exists to determine subplot layout + has_latency = os.path.exists(self.latency_filepath) + + if has_latency: + # Two subplots: errors (top) and latency (bottom) + gs = fig.add_gridspec(2, 1, height_ratios=[2, 1], hspace=0.3, top=0.95) + self.ax = fig.add_subplot(gs[0, 0]) + self.ax_latency = fig.add_subplot(gs[1, 0], sharex=self.ax) + else: + # Single subplot: just errors + self.ax = fig.add_subplot(111) + self.ax_latency = None + + # Error plot self.ax.set_title("Instantaneous Error") - self.ax.set_xlabel("Time (s)") self.ax.set_ylabel("Error (ft)") - (self.line_x,) = self.ax.plot([], [], lw=2, label="Error X") - (self.line_y,) = self.ax.plot([], [], lw=2, label="Error Y") - (self.line_z,) = self.ax.plot([], [], lw=2, label="Error Z") - self.ax.grid(True) - self.ax.axhline(0.0, color="black", lw=1, linestyle="--", alpha=0.8) + (self.line_x,) = self.ax.plot([], [], lw=1.5, label="Error X", color='#1f77b4') + (self.line_y,) = self.ax.plot([], [], lw=1.5, label="Error Y", color='#ff7f0e') + (self.line_z,) = self.ax.plot([], [], lw=1.5, label="Error Z", color='#2ca02c') + self.ax.grid(True, alpha=0.3) + self.ax.axhline(0.0, color="black", lw=1, linestyle="--", alpha=0.5) self.ax.legend(loc="upper right", fontsize=9) - - self.ax_latency = fig.add_subplot(gs[1, 0], sharex=self.ax) - self.ax_latency.set_title("Latency Evolution") - self.ax_latency.set_xlabel("Time (s)") - self.ax_latency.set_ylabel("Latency (ms)") - (self.line_latency,) = self.ax_latency.plot([], [], lw=2, color="orange", label="Latency") - self.ax_latency.grid(True) - self.ax_latency.legend(loc="upper right", fontsize=9) - fig.tight_layout() + if not has_latency: + self.ax.set_xlabel("Time (s)") - plot_container = ttk.Frame(parent) - plot_container.pack(fill=tk.BOTH, expand=True) + # Latency plot (if file exists) + if has_latency: + self.ax_latency.set_title("Latency Evolution") + self.ax_latency.set_xlabel("Time (s)") + self.ax_latency.set_ylabel("Latency (ms)") + (self.line_latency,) = self.ax_latency.plot([], [], lw=1.5, color='#d62728', label='Latency') + self.ax_latency.grid(True, alpha=0.3) + self.ax_latency.legend(loc="upper right", fontsize=9) + else: + self.line_latency = None - toolbar_frame = ttk.Frame(plot_container) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + fig.tight_layout() + + canvas_frame = ttk.Frame(parent) + canvas_frame.pack(fill=tk.BOTH, expand=True) + toolbar_frame = ttk.Frame(canvas_frame) toolbar_frame.pack(side=tk.TOP, fill=tk.X) - - self.canvas = FigureCanvasTkAgg(fig, master=plot_container) + self.canvas = FigureCanvasTkAgg(fig, master=canvas_frame) toolbar = NavigationToolbar2Tk(self.canvas, toolbar_frame) toolbar.update() - self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True) - self.canvas.draw() - def _update_target_selector(self): - try: - target_ids = sorted(self._hub.get_all_target_ids()) - if target_ids: - self.target_selector["values"] = target_ids - if self.selected_target_id.get() not in target_ids: - self.selected_target_id.set(target_ids[0]) - except Exception: - pass + def _on_target_select(self, event=None): + """Initiates analysis for the selected target.""" + if not self.trail_filepath: + return - def _update_stats_table(self, results: Dict): + target_id = self.selected_target_id.get() + + # Analyze data (fast operation now) + timestamps, errors, stats = self._analyze_trail_file(target_id) + + # Update UI - load latency first so stats table can include it + self._update_latency_plot() + self._update_stats_table(stats) + self._update_plot(timestamps, errors) + + def _analyze_trail_file(self, target_id: int) -> Tuple[List[float], Dict[str, List[float]], Dict[str, Dict[str, float]]]: + """ + Analyzes the trail file for a specific target using an efficient + two-pointer algorithm. + """ + sim_points = [] + real_points = [] + + with open(self.trail_filepath, 'r', encoding='utf-8') as f: + reader = csv.DictReader(line for line in f if not line.startswith('#')) + for row in reader: + try: + if int(row['target_id']) == target_id: + point = (float(row['timestamp']), float(row['x_ft']), float(row['y_ft']), float(row['z_ft'])) + if row['source'] == 'simulated': + sim_points.append(point) + elif row['source'] == 'real': + real_points.append(point) + except (ValueError, KeyError): + continue + + if not sim_points or not real_points: + return [], {}, {} + + # --- Two-Pointer Algorithm for Error Calculation --- + timestamps, errors_x, errors_y, errors_z = [], [], [], [] + sim_idx = 0 + for real_p in real_points: + real_ts, real_x, real_y, real_z = real_p + + # Advance sim_idx to find the bracketing segment for the current real point + while sim_idx + 1 < len(sim_points) and sim_points[sim_idx + 1][0] < real_ts: + sim_idx += 1 + + if sim_idx + 1 < len(sim_points): + p1 = sim_points[sim_idx] + p2 = sim_points[sim_idx + 1] + + # Check if the real point is within this segment + if p1[0] <= real_ts <= p2[0]: + # Interpolate + ts1, x1, y1, z1 = p1 + ts2, x2, y2, z2 = p2 + duration = ts2 - ts1 + if duration == 0: continue + + factor = (real_ts - ts1) / duration + interp_x = x1 + (x2 - x1) * factor + interp_y = y1 + (y2 - y1) * factor + interp_z = z1 + (z2 - z1) * factor + + timestamps.append(real_ts) + errors_x.append(real_x - interp_x) + errors_y.append(real_y - interp_y) + errors_z.append(real_z - interp_z) + + errors = {'x': errors_x, 'y': errors_y, 'z': errors_z} + + # Calculate final statistics on the full (non-downsampled) data + stats = {} + for axis, err_list in errors.items(): + if not err_list: + stats[axis] = {'mean': 0, 'std_dev': 0, 'rmse': 0} + continue + mean = statistics.mean(err_list) + stdev = statistics.stdev(err_list) if len(err_list) > 1 else 0 + rmse = math.sqrt(sum(e**2 for e in err_list) / len(err_list)) + stats[axis] = {'mean': mean, 'std_dev': stdev, 'rmse': rmse} + + return timestamps, errors, stats + + def _downsample_data(self, timestamps: List, errors: Dict) -> Tuple[List, Dict]: + """Reduces the number of points for plotting while preserving shape.""" + if len(timestamps) <= DOWNSAMPLE_THRESHOLD: + return timestamps, errors + + # Simple interval-based downsampling + step = len(timestamps) // DOWNSAMPLE_THRESHOLD + + ts_down = timestamps[::step] + err_down = { + 'x': errors['x'][::step], + 'y': errors['y'][::step], + 'z': errors['z'][::step], + } + return ts_down, err_down + + def _update_stats_table(self, stats: Dict): + """Populates the stats Treeview with calculated metrics.""" self.stats_tree.delete(*self.stats_tree.get_children()) - if hasattr(self, '_filtered_errors') and self._filtered_errors: - import math - for axis in ["x", "y", "z"]: - errors = self._filtered_errors.get(axis, []) - if errors: - n = len(errors) - mean = sum(errors) / n - variance = sum((x - mean) ** 2 for x in errors) / n - std_dev = math.sqrt(variance) - rmse = math.sqrt(sum(x**2 for x in errors) / n) - self.stats_tree.insert("", "end", values=(f"Error {axis.upper()}", f"{mean:.3f}", f"{std_dev:.3f}", f"{rmse:.3f}")) - else: - self.stats_tree.insert("", "end", values=(f"Error {axis.upper()}", "N/A", "N/A", "N/A")) - else: - for axis in ["x", "y", "z"]: - self.stats_tree.insert("", "end", values=(f"Error {axis.upper()}", f"{results[axis]['mean']:.3f}", f"{results[axis]['std_dev']:.3f}", f"{results[axis]['rmse']:.3f}")) - - if self.latency_values_ms: - import statistics - lat_mean = statistics.mean(self.latency_values_ms) - lat_std = statistics.stdev(self.latency_values_ms) if len(self.latency_values_ms) > 1 else 0.0 - lat_min = min(self.latency_values_ms) - lat_max = max(self.latency_values_ms) - self.stats_tree.insert("", "end", values=("Latency (ms)", f"{lat_mean:.2f}", f"{lat_std:.2f}", f"{lat_min:.2f} - {lat_max:.2f}")) - - def _update_plot(self, target_id: int): - history = self._hub.get_target_history(target_id) - if not history or not history["real"] or len(history["simulated"]) < 2: - self._clear_views() - return - - times, errors_x, errors_y, errors_z = [], [], [], [] - sim_hist = sorted(history["simulated"]) - for real_state in history["real"]: - real_ts, real_x, real_y, real_z = real_state - p1, p2 = self._analyzer._find_bracketing_points(real_ts, sim_hist) - if p1 and p2: - _ts, interp_x, interp_y, interp_z = self._analyzer._interpolate(real_ts, p1, p2) - times.append(real_ts) - errors_x.append(real_x - interp_x) - errors_y.append(real_y - interp_y) - errors_z.append(real_z - interp_z) - - if not times: - self._clear_views() - return + for axis, data in stats.items(): + self.stats_tree.insert("", "end", values=( + f"Error {axis.upper()}", + f"{data['mean']:.3f}", + f"{data['std_dev']:.3f}", + f"{data['rmse']:.3f}", + )) - # Filtering logic - import statistics - sample_errors = [] - min_time = min(times) - for i, t in enumerate(times): - if min_time + 5.0 <= t <= min_time + 15.0: - sample_errors.append((errors_x[i]**2 + errors_y[i]**2 + errors_z[i]**2) ** 0.5) + # Add latency statistics if available + if hasattr(self, '_latency_data') and self._latency_data: + lat_mean = statistics.mean(self._latency_data) + lat_std = statistics.stdev(self._latency_data) if len(self._latency_data) > 1 else 0.0 + lat_min = min(self._latency_data) + lat_max = max(self._latency_data) + self.stats_tree.insert("", "end", values=( + "Latency (ms)", + f"{lat_mean:.2f}", + f"{lat_std:.2f}", + f"{lat_min:.2f} - {lat_max:.2f}" + )) + + def _update_plot(self, timestamps: List[float], errors: Dict[str, List[float]]): + """Updates the matplotlib plot with (potentially downsampled) data.""" + # Apply spike filtering + filtered_ts, filtered_errors, spike_count, max_spike_error, max_spike_time = self._filter_spikes(timestamps, errors) - threshold = max(statistics.median(sample_errors) * 20, 500.0) if sample_errors else 1000.0 - - filtered_times, filtered_x, filtered_y, filtered_z = [], [], [], [] - outlier_count = 0 - for i, t in enumerate(times): - if (errors_x[i]**2 + errors_y[i]**2 + errors_z[i]**2) ** 0.5 > threshold: - outlier_count += 1 - else: - filtered_times.append(t) - filtered_x.append(errors_x[i]) - filtered_y.append(errors_y[i]) - filtered_z.append(errors_z[i]) + # Downsample if needed + ts_plot, errors_plot = self._downsample_data(filtered_ts, filtered_errors) - self._filtered_errors = {'x': filtered_x, 'y': filtered_y, 'z': filtered_z} - - self.line_x.set_data(filtered_times, filtered_x) - self.line_y.set_data(filtered_times, filtered_y) - self.line_z.set_data(filtered_times, filtered_z) - + self.line_x.set_data(ts_plot, errors_plot['x']) + self.line_y.set_data(ts_plot, errors_plot['y']) + self.line_z.set_data(ts_plot, errors_plot['z']) + + # Remove old spike annotations for txt in getattr(self.ax, '_spike_annotations', []): txt.remove() self.ax._spike_annotations = [] - if outlier_count > 0: - txt = self.ax.text(0.02, 0.98, f"⚠ {outlier_count} spike(s) filtered", transform=self.ax.transAxes, - verticalalignment='top', bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.7), fontsize=9) + # Add spike annotation if any were filtered + if spike_count > 0: + annotation_text = ( + f"⚠ {spike_count} acquisition spike(s) filtered\n" + f"(max error: {max_spike_error:.0f} ft at t={max_spike_time:.1f}s)\n" + f"Spikes excluded from statistics" + ) + txt = self.ax.text( + 0.02, 0.98, annotation_text, + transform=self.ax.transAxes, + verticalalignment='top', + bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.7), + fontsize=8 + ) self.ax._spike_annotations.append(txt) - + self.ax.relim() self.ax.autoscale_view() self.canvas.draw_idle() - - def _update_latency_plot(self): - if self.latency_values_ms and self.latency_timestamps: - self.line_latency.set_data(self.latency_timestamps, self.latency_values_ms) - else: - self.line_latency.set_data([], []) + + def _filter_spikes(self, timestamps: List[float], errors: Dict[str, List[float]]) -> tuple: + """Filters acquisition spikes from error data.""" + if not timestamps: + return timestamps, errors, 0, 0.0, 0.0 - self.ax_latency.relim() - self.ax_latency.autoscale_view() - self.canvas.draw_idle() - - def _clear_views(self): - self.stats_tree.delete(*self.stats_tree.get_children()) - self.line_x.set_data([], []) - self.line_y.set_data([], []) - self.line_z.set_data([], []) - self.line_latency.set_data([], []) - for ax in [self.ax, self.ax_latency]: - ax.relim() - ax.autoscale_view() - self.canvas.draw_idle() + # Calculate magnitude for each point + magnitudes = [] + for i in range(len(timestamps)): + mag = math.sqrt(errors['x'][i]**2 + errors['y'][i]**2 + errors['z'][i]**2) + magnitudes.append(mag) + + # Sample a window 5-15 seconds into the simulation to compute threshold + min_time = min(timestamps) + sample_mags = [] + for i, t in enumerate(timestamps): + if min_time + 5.0 <= t <= min_time + 15.0: + sample_mags.append(magnitudes[i]) + + if not sample_mags: + return timestamps, errors, 0, 0.0, 0.0 + + # Threshold: 20x the median error magnitude in the sample window + threshold = max(statistics.median(sample_mags) * 20, 500.0) + + # Filter out spikes + filtered_ts = [] + filtered_errors = {'x': [], 'y': [], 'z': []} + spike_count = 0 + max_spike_error = 0.0 + max_spike_time = 0.0 + + for i in range(len(timestamps)): + if magnitudes[i] > threshold: + spike_count += 1 + if magnitudes[i] > max_spike_error: + max_spike_error = magnitudes[i] + max_spike_time = timestamps[i] + else: + filtered_ts.append(timestamps[i]) + filtered_errors['x'].append(errors['x'][i]) + filtered_errors['y'].append(errors['y'][i]) + filtered_errors['z'].append(errors['z'][i]) + + return filtered_ts, filtered_errors, spike_count, max_spike_error, max_spike_time + + def _update_latency_plot(self): + """Updates the latency subplot with data from the latency CSV file.""" + if not self.ax_latency or not self.line_latency: + self._latency_data = [] + return + + if not os.path.exists(self.latency_filepath): + self.line_latency.set_data([], []) + self._latency_data = [] + self.ax_latency.relim() + self.ax_latency.autoscale_view() + self.canvas.draw_idle() + return + + timestamps = [] + latencies = [] + + try: + with open(self.latency_filepath, 'r', encoding='utf-8') as f: + reader = csv.DictReader(line for line in f if not line.startswith('#')) + for row in reader: + try: + timestamps.append(float(row['timestamp'])) + latencies.append(float(row['latency_ms'])) + except (ValueError, KeyError): + continue + + # Save full data for statistics + self._latency_data = latencies + + # Downsample for plotting if needed + ts_plot = timestamps + lat_plot = latencies + if len(timestamps) > DOWNSAMPLE_THRESHOLD: + step = len(timestamps) // DOWNSAMPLE_THRESHOLD + ts_plot = timestamps[::step] + lat_plot = latencies[::step] + + self.line_latency.set_data(ts_plot, lat_plot) + self.ax_latency.relim() + self.ax_latency.autoscale_view() + self.canvas.draw_idle() + except Exception as e: + self.line_latency.set_data([], []) + self._latency_data = [] + print(f"Warning: Failed to load latency data: {e}") def _open_performance_window(self): - """Open the dedicated performance analysis window.""" - if not self.performance_data_path: - messagebox.showinfo("No Data", "No performance data file found for this simulation run.", parent=self) + """Opens the dedicated performance analysis window.""" + if not self.performance_data_path or not os.path.exists(self.performance_data_path): + messagebox.showinfo("No Data", "No performance data file found for this run.", parent=self) return - try: - # Pass the path to the CSV file to the performance window PerformanceAnalysisWindow(parent=self, performance_csv_path=self.performance_data_path) except Exception as e: - messagebox.showerror("Performance Analysis Error", f"Failed to open performance analysis:\n{e}", parent=self) - - def _on_target_select(self, event=None): - """Handle combobox selection changes and update stats/plot.""" - try: - sel_id = self.selected_target_id.get() - analysis_results = self._analyzer.analyze() - if sel_id in analysis_results: - self._update_plot(sel_id) # Update plot first to calculate filtered errors - self._update_stats_table(analysis_results[sel_id]) # Then update table with filtered stats - else: - self._clear_views() - self._update_latency_plot() - except Exception: - self._clear_views() \ No newline at end of file + messagebox.showerror("Error", f"Failed to open performance analysis:\n{e}", parent=self) \ No newline at end of file diff --git a/target_simulator/gui/main_view.py b/target_simulator/gui/main_view.py index 16b9cf2..cba8154 100644 --- a/target_simulator/gui/main_view.py +++ b/target_simulator/gui/main_view.py @@ -58,7 +58,6 @@ from target_simulator.gui.sfp_debug_window import SfpDebugWindow from target_simulator.gui.logger_panel import LoggerPanel from target_simulator.core.sfp_communicator import SFPCommunicator from target_simulator.analysis.simulation_state_hub import SimulationStateHub -from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer from target_simulator.gui.analysis_window import AnalysisWindow from target_simulator.core import command_builder from target_simulator.analysis.simulation_archive import SimulationArchive @@ -114,7 +113,6 @@ class MainView(tk.Tk): # --- Initialize the data hub and controllers --- self.simulation_hub = SimulationStateHub() - self.performance_analyzer = PerformanceAnalyzer(self.simulation_hub) self.communicator_manager = CommunicatorManager( simulation_hub=self.simulation_hub, diff --git a/target_simulator/gui/payload_router.py b/target_simulator/gui/payload_router.py index 7316edb..f36472c 100644 --- a/target_simulator/gui/payload_router.py +++ b/target_simulator/gui/payload_router.py @@ -56,7 +56,6 @@ class DebugPayloadRouter: self._history = collections.deque(maxlen=self._sfp_debug_history_size) self._persist = False - self._latency_samples = collections.deque(maxlen=10000) self._hub = simulation_hub self._last_ownship_update_time: Optional[float] = None self._ris_target_listeners: List[TargetListListener] = [] @@ -212,8 +211,12 @@ class DebugPayloadRouter: est_gen = self._clock_sync.to_client_time(server_timetag) latency = reception_timestamp - est_gen if latency >= 0 and self.active_archive is not None: + # Save latency to CSV file via archive + latency_ms = latency * 1000 # Convert to milliseconds with self._lock: - self._latency_samples.append((reception_timestamp, latency)) + archive = self.active_archive + if archive and hasattr(archive, 'add_latency_sample'): + archive.add_latency_sample(reception_timestamp, latency_ms) except Exception: pass t_clock_end = time.perf_counter() @@ -281,7 +284,7 @@ class DebugPayloadRouter: if total_processing_time > 0.010 or self._perf_counters['_total_packet_count'] % 100 == 0: if self._perf_samples is not None: - self._perf_samples.append({ + sample = { 'timestamp': reception_timestamp, 'total_ms': round(total_processing_time * 1000, 3), 'parse_ms': round((t_parse_end - t_parse_start) * 1000, 3), @@ -289,7 +292,10 @@ class DebugPayloadRouter: 'archive_ms': round((t_archive_end - t_archive_start) * 1000, 3), 'listener_ms': round((t_listener_end - t_listener_start) * 1000, 3), 'clock_ms': round((t_clock_end - t_clock_start) * 1000, 3), - }) + } + self._perf_samples.append(sample) + if len(self._perf_samples) % 500 == 0: + self.logger.debug(f"Performance samples buffer: {len(self._perf_samples)} samples") current_time = time.time() if current_time - self._perf_counters['last_report_time'] >= 5.0: @@ -302,7 +308,6 @@ class DebugPayloadRouter: with self._lock: self.active_archive = archive if archive is not None: - self._latency_samples.clear() if self._perf_samples is not None: self._perf_samples.clear() # Reset all counters at the start of a new archive @@ -313,7 +318,7 @@ class DebugPayloadRouter: 'clock_sync_time_total': 0.0, 'max_processing_time': 0.0, 'last_report_time': time.time() }) - self.logger.debug("Latency and performance buffers cleared for new simulation") + self.logger.debug("Performance buffers cleared for new simulation") def add_ris_target_listener(self, listener: TargetListListener): with self._lock: @@ -459,34 +464,49 @@ class DebugPayloadRouter: return pkt def get_estimated_latency_s(self) -> float: - return self._clock_sync.get_average_latency_s() if self._clock_sync else 0.0 + try: + if self._clock_sync: + return self._clock_sync.get_average_latency_s() + except Exception: + pass + return 0.0 def get_latency_samples(self, limit: Optional[int] = None) -> List[tuple]: - with self._lock: - samples = list(self._latency_samples) - return samples[-limit:] if limit else samples + try: + if self._clock_sync: + samples = self._clock_sync.get_latency_history() + return samples[-limit:] if limit else samples + except Exception: + pass + return [] def get_latency_stats(self, sample_limit: int = 200) -> Dict[str, Any]: - with self._lock: - samples = list(self._latency_samples) - if not samples: - return {"count": 0} - samples = samples[-sample_limit:] if sample_limit else samples - ms = [s[1] * 1000.0 for s in samples] - return { - "mean_ms": round(statistics.mean(ms), 3), - "std_ms": round(statistics.stdev(ms) if len(ms) > 1 else 0.0, 3), - "min_ms": round(min(ms), 3), - "max_ms": round(max(ms), 3), - "count": len(ms), - } + try: + samples = self.get_latency_samples(limit=sample_limit) + if not samples: + return {"count": 0} + + # samples are (reception_time, latency_s) + ms = [s[1] * 1000.0 for s in samples] + return { + "mean_ms": round(statistics.mean(ms), 3), + "std_ms": round(statistics.stdev(ms) if len(ms) > 1 else 0.0, 3), + "min_ms": round(min(ms), 3), + "max_ms": round(max(ms), 3), + "count": len(ms), + } + except Exception: + pass + return {"count": 0} def get_history(self): with self._lock: return list(self._history) def get_performance_samples(self): - return list(self._perf_samples) if self._profiling_enabled and self._perf_samples else [] + result = list(self._perf_samples) if self._profiling_enabled and self._perf_samples else [] + self.logger.debug(f"get_performance_samples called: profiling={self._profiling_enabled}, samples_count={len(result)}") + return result def _report_performance_stats(self): try: diff --git a/target_simulator/gui/performance_analysis_window.py b/target_simulator/gui/performance_analysis_window.py index 3b0e2ae..94f5c4b 100644 --- a/target_simulator/gui/performance_analysis_window.py +++ b/target_simulator/gui/performance_analysis_window.py @@ -220,8 +220,13 @@ class PerformanceAnalysisWindow(tk.Toplevel): top_container = ttk.Frame(main_pane) main_pane.add(top_container, weight=1) + # Configure grid per dividere esattamente in due + top_container.columnconfigure(0, weight=1, uniform="half") + top_container.columnconfigure(1, weight=1, uniform="half") + top_container.rowconfigure(0, weight=1) + stats_frame = ttk.LabelFrame(top_container, text="Performance Statistics", padding=10) - stats_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5)) + stats_frame.grid(row=0, column=0, sticky="nsew", padx=(0, 5)) self._create_stats_table(stats_frame) self._create_info_panel(top_container) @@ -234,24 +239,25 @@ class PerformanceAnalysisWindow(tk.Toplevel): def _create_info_panel(self, parent): """Create an informational panel explaining the metrics.""" info_frame = ttk.LabelFrame(parent, text="ℹ About Performance Analysis", padding=10) - info_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=False) + info_frame.grid(row=0, column=1, sticky="nsew", padx=(5, 0)) + + # Simplified text to fit without scrolling info_text = ( - "This window analyzes packet processing times.\n\n" - "📊 Measured Components:\n" + "Packet processing time analysis.\n" + "📊 Components:\n" "• Parse: Decode SFP payload\n" "• Hub: Update SimulationStateHub\n" - "• Archive: Persist data to file\n" - "• Listener: Broadcast events to GUI\n" - "• Clock: Synchronize timestamps\n\n" + "• Archive: Save data to file\n" + "• Listener: Broadcast to GUI\n" + "• Clock: Sync timestamps\n" "⚠ Spikes (>100ms):\n" - "Critical slowdowns, likely Garbage\n" - "Collection, disk I/O, or lock contention.\n\n" + "Slowdowns from GC, disk I/O,\n" + "or lock contention.\n" "🎯 Bottleneck:\n" - "Component with the highest time\n" - "during the single slowest event." + "Slowest component in worst event." ) - info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, font=("Segoe UI", 9), wraplength=320) - info_label.pack(anchor=tk.W) + info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, font=("Segoe UI", 9)) + info_label.pack(anchor=tk.W, fill=tk.BOTH, expand=True) def _create_stats_table(self, parent): """Create the statistics table.""" @@ -267,7 +273,7 @@ class PerformanceAnalysisWindow(tk.Toplevel): self.stats_tree.heading("Metric", text="Metric") self.stats_tree.heading("Value", text="Value") self.stats_tree.heading("Details", text="Details") - self.stats_tree.column("Metric", width=200, anchor='w') + self.stats_tree.column("Metric", width=160, anchor='w') self.stats_tree.column("Value", width=150, anchor='e') self.stats_tree.column("Details", width=300, anchor='w') self.stats_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) diff --git a/target_simulator/simulation/simulation_controller.py b/target_simulator/simulation/simulation_controller.py index 82050e1..9353e42 100644 --- a/target_simulator/simulation/simulation_controller.py +++ b/target_simulator/simulation/simulation_controller.py @@ -253,9 +253,12 @@ class SimulationController: # Save performance profiling data if available if router and hasattr(router, 'get_performance_samples'): perf_samples = router.get_performance_samples() + self.logger.debug(f"Retrieved {len(perf_samples) if perf_samples else 0} performance samples from router") if perf_samples: extra_metadata["performance_samples"] = perf_samples self.logger.info(f"Saved {len(perf_samples)} performance samples to archive") + else: + self.logger.warning("No performance samples available to save") except Exception as e: self.logger.warning( f"Could not collect latency samples for archive: {e}" diff --git a/target_simulator/utils/clock_synchronizer.py b/target_simulator/utils/clock_synchronizer.py index 196c5fe..c4bef69 100644 --- a/target_simulator/utils/clock_synchronizer.py +++ b/target_simulator/utils/clock_synchronizer.py @@ -61,6 +61,7 @@ class ClockSynchronizer: self._lock = threading.Lock() self._history: collections.deque = collections.deque(maxlen=history_size) + self._latency_history: collections.deque = collections.deque(maxlen=history_size) self._min_samples = min_samples_for_fit self._update_interval = max(1, update_interval) self._update_counter = 0 @@ -135,9 +136,17 @@ class ClockSynchronizer: estimated_generation_times = self._m * x_vals + self._b # Latency is the difference between reception and estimated generation latencies = y_vals - estimated_generation_times - # Update the average latency, filtering out negative values which are artifacts - positive_latencies = latencies[latencies >= 0] - if len(positive_latencies) > 0: + + # Store latency samples and update the average + self._latency_history.clear() + positive_latencies = [] + for i in range(len(latencies)): + if latencies[i] >= 0: + reception_time = y_vals[i] + self._latency_history.append((reception_time, latencies[i])) + positive_latencies.append(latencies[i]) + + if positive_latencies: self._average_latency_s = np.mean(positive_latencies) else: self._average_latency_s = 0.0 @@ -179,4 +188,14 @@ class ClockSynchronizer: Returns 0.0 when insufficient data is available to compute the metric. """ with self._lock: - return self._average_latency_s \ No newline at end of file + return self._average_latency_s + + def get_latency_history(self) -> List[Tuple[float, float]]: + """ + Returns a copy of the recent latency samples. + + Returns: + A list of tuples, where each tuple is (reception_time_s, latency_s). + """ + with self._lock: + return list(self._latency_history) \ No newline at end of file diff --git a/target_simulator/utils/csv_logger.py b/target_simulator/utils/csv_logger.py index 3c040a7..c03549d 100644 --- a/target_simulator/utils/csv_logger.py +++ b/target_simulator/utils/csv_logger.py @@ -1,48 +1,31 @@ -"""CSV helpers for IO tracing. - -This module provides lightweight helpers that append sent/received -position records to CSV files located in the application's Temp folder. -Behavior is governed by `target_simulator.config.DEBUG_CONFIG` (see keys -``enable_io_trace``, ``temp_folder_name``, and filename overrides). - -These functions are intended for debugging and tracing; they return ``True`` -when the append operation succeeded and ``False`` when disabled or on error. - -PERFORMANCE: Uses asynchronous buffering to avoid blocking the simulation -thread. Rows are buffered in memory and flushed periodically by a background -thread, eliminating I/O overhead from the critical path. +# target_simulator/utils/csv_logger.py +""" +CSV helpers for IO tracing. +... (docstring existing) """ - import csv import os import time import threading import atexit -from typing import Iterable, Any, Dict, List, Tuple +from typing import Iterable, Any, Dict, List, Tuple,Optional from collections import deque from target_simulator.config import DEBUG_CONFIG -# --- Async CSV Buffer --- _CSV_BUFFER_LOCK = threading.Lock() -_CSV_BUFFERS: Dict[str, deque] = {} # filename -> deque of (row, headers) -_CSV_FLUSH_THREAD: threading.Thread = None +_CSV_BUFFERS: Dict[str, deque] = {} +_CSV_FLUSH_THREAD: Optional[threading.Thread] = None _CSV_STOP_EVENT = threading.Event() -_CSV_FLUSH_INTERVAL_S = 2.0 # Flush every 2 seconds -_CSV_MAX_BUFFER_SIZE = 1000 # Flush immediately if buffer exceeds this +_CSV_FLUSH_INTERVAL_S = 2.0 +_CSV_MAX_BUFFER_SIZE = 1000 -def _csv_flush_worker(): - """Background thread that periodically flushes buffered CSV rows to disk.""" - while not _CSV_STOP_EVENT.is_set(): - time.sleep(_CSV_FLUSH_INTERVAL_S) - _flush_all_buffers() - # Final flush on shutdown - _flush_all_buffers() - - -def _flush_all_buffers(): - """Flush all buffered CSV rows to their respective files.""" +def flush_all_csv_buffers(): + """ + Synchronously flushes all buffered CSV rows to their respective files. + This function can be called to ensure data is written to disk before an operation. + """ with _CSV_BUFFER_LOCK: for filename, buffer in list(_CSV_BUFFERS.items()): if not buffer: @@ -53,31 +36,44 @@ def _flush_all_buffers(): continue file_path = os.path.join(temp_folder, filename) - - # Check if we need to write headers - write_headers = not os.path.exists(file_path) + # Check if file is empty or doesn't exist to decide whether to write headers + write_headers = not os.path.exists(file_path) or os.path.getsize(file_path) == 0 try: with open(file_path, "a", newline="", encoding="utf-8") as csvfile: writer = csv.writer(csvfile) - - # Write all buffered rows + + # Extract all items from buffer to write in one go + rows_to_write = [] while buffer: - row, headers = buffer.popleft() + rows_to_write.append(buffer.popleft()) - # Write headers only once for new files - if write_headers and headers is not None: + if not rows_to_write: + continue + + # If headers need to be written, take them from the first buffered item + if write_headers: + _, headers = rows_to_write[0] + if headers: writer.writerow(list(headers)) - write_headers = False - + + # Write all the rows + for row, _ in rows_to_write: writer.writerow(list(row)) except Exception: - # Clear buffer on error to avoid accumulation - buffer.clear() + # On error, we can try to put items back in buffer or log the loss + # For now, clear to prevent repeated failures on same data + pass +def _csv_flush_worker(): + """Background thread that periodically flushes buffered CSV rows to disk.""" + while not _CSV_STOP_EVENT.wait(_CSV_FLUSH_INTERVAL_S): + flush_all_csv_buffers() + # Final flush on shutdown + flush_all_csv_buffers() def _ensure_csv_flush_thread(): - """Ensure the background flush thread is running.""" + """Ensures the background CSV flush thread is running.""" global _CSV_FLUSH_THREAD if _CSV_FLUSH_THREAD is None or not _CSV_FLUSH_THREAD.is_alive(): _CSV_STOP_EVENT.clear() @@ -85,95 +81,56 @@ def _ensure_csv_flush_thread(): target=_csv_flush_worker, daemon=True, name="CSVFlushThread" ) _CSV_FLUSH_THREAD.start() - # Register cleanup on exit atexit.register(_shutdown_csv_logger) - def _shutdown_csv_logger(): - """Stop the flush thread and ensure all data is written.""" + """Signals the flush thread to stop and performs a final flush.""" _CSV_STOP_EVENT.set() if _CSV_FLUSH_THREAD and _CSV_FLUSH_THREAD.is_alive(): - _CSV_FLUSH_THREAD.join(timeout=5.0) + _CSV_FLUSH_THREAD.join(timeout=2.0) + flush_all_csv_buffers() # Final synchronous flush - -def _ensure_temp_folder(): +def _ensure_temp_folder() -> Optional[str]: + """Ensures the temporary directory exists and returns its path.""" temp_folder = DEBUG_CONFIG.get("temp_folder_name", "Temp") - if not os.path.exists(temp_folder): - try: - os.makedirs(temp_folder, exist_ok=True) - except Exception: - # If we cannot create the folder, swallow the exception; callers - # should handle absence of files gracefully. - return None - return temp_folder + try: + os.makedirs(temp_folder, exist_ok=True) + return temp_folder + except Exception: + return None - -def append_row(filename: str, row: Iterable[Any], headers: Iterable[str] | None = None): - """Append a row to a CSV file stored under the Temp folder. - - If the file does not exist and ``headers`` is provided, the headers are - written as the first row. The function is a no-op when tracing is - disabled via DEBUG_CONFIG. - - PERFORMANCE: This function is now async-buffered and returns immediately - without blocking on I/O. Rows are written to disk by a background thread. - - Args: - filename: Name of the target CSV file inside the Temp folder. - row: Iterable of values to write as a CSV row. - headers: Optional iterable of header names to write when creating a - new file. - - Returns: - True on success, False when tracing is disabled or an error occurred. +def append_row(filename: str, row: Iterable[Any], headers: Optional[Iterable[str]] = None): """ - if not DEBUG_CONFIG.get("enable_io_trace", False): + Appends a row to a CSV file buffer, to be written asynchronously. + """ + if not DEBUG_CONFIG.get("enable_io_trace", True): return False - temp_folder = _ensure_temp_folder() - if not temp_folder: - return False - - # Ensure flush thread is running _ensure_csv_flush_thread() - # Buffer the row for async writing with _CSV_BUFFER_LOCK: if filename not in _CSV_BUFFERS: - _CSV_BUFFERS[filename] = deque(maxlen=_CSV_MAX_BUFFER_SIZE * 2) + _CSV_BUFFERS[filename] = deque() + # Store row and headers together _CSV_BUFFERS[filename].append((row, headers)) - # Force immediate flush if buffer is getting large + # Optional: trigger an early flush if buffer gets too large if len(_CSV_BUFFERS[filename]) >= _CSV_MAX_BUFFER_SIZE: - # Schedule immediate flush without blocking - threading.Thread(target=_flush_all_buffers, daemon=True).start() + threading.Thread(target=flush_all_csv_buffers, daemon=True).start() return True - -def append_sent_position( - timestamp: float, target_id: int, x: float, y: float, z: float, command: str -): - """Append a sent-position entry for IO tracing. - - The row contains a timestamp, target id, position in feet and the issued - command string. - """ +def append_sent_position(timestamp: float, target_id: int, x: float, y: float, z: float, command: str): + """Logs a sent target position to the corresponding trace file.""" filename = DEBUG_CONFIG.get("io_trace_sent_filename", "sent_positions.csv") headers = ["timestamp", "target_id", "x_ft", "y_ft", "z_ft", "command"] row = [timestamp, target_id, x, y, z, command] return append_row(filename, row, headers=headers) - -def append_received_position( - timestamp: float, target_id: int, x: float, y: float, z: float -): - """Append a received-position entry for IO tracing. - - The row contains a timestamp, target id and position in feet. - """ +def append_received_position(timestamp: float, target_id: int, x: float, y: float, z: float): + """Logs a received target position to the corresponding trace file.""" filename = DEBUG_CONFIG.get("io_trace_received_filename", "received_positions.csv") headers = ["timestamp", "target_id", "x_ft", "y_ft", "z_ft"] row = [timestamp, target_id, x, y, z] - return append_row(filename, row, headers=headers) + return append_row(filename, row, headers=headers) \ No newline at end of file