# SXXXXXXX_ScenarioSimulator/scenario_simulator/gui/simulation_manager.py
# 2025-09-30 10:21:49 +02:00
#
# 107 lines
# 4.1 KiB
# Python

"""
Manages the simulation thread and data communication for the GUI.
This module encapsulates the simulation loop in a separate thread to keep the GUI
responsive. It uses a queue to send data back to the main thread for plotting
and display.
"""
import threading
import queue
import time
import numpy as np
from ..core.simulation_engine import generate_iq_data
class SimulationManager:
    """Run the simulation on a worker thread and publish frames via a queue.

    Each frame is put on ``data_queue`` as a tuple
    ``(current_az_deg, iq_data_cpi, frame_num)``. A single ``None`` is put on
    the queue when the worker exits -- whether it ran to completion, was
    stopped, or failed with an exception -- so a consumer can always detect
    the end of a run.
    """

    def __init__(self):
        # Worker thread of the current run; None until start() is called and
        # again after stop() has joined the thread.
        self.simulation_thread = None
        # Set by stop() to ask the worker loop to exit.
        self.stop_event = threading.Event()
        # Unbounded queue carrying frames (and the final None) to the consumer.
        self.data_queue = queue.Queue()

    def is_running(self):
        """Return True while a simulation thread exists and is still alive."""
        thread = self.simulation_thread
        return thread is not None and thread.is_alive()

    def start(self, radar_cfg, targets, duration_s, num_pulses_cpi):
        """Start a new simulation run on a daemon thread.

        Does nothing if a run is already in progress. Any data left in
        ``data_queue`` from a previous run is discarded first.

        Args:
            radar_cfg: Radar configuration; the worker reads ``prf`` and
                ``scan_config`` from it and forwards it to
                ``generate_iq_data``.
            targets: Forwarded unchanged to ``generate_iq_data``.
            duration_s: Total simulated time to cover, in seconds.
            num_pulses_cpi: Number of pulses per generated frame (CPI).
        """
        if self.is_running():
            return
        self.stop_event.clear()
        self._drain_queue()
        self.simulation_thread = threading.Thread(
            target=self._simulation_loop,
            args=(radar_cfg, targets, duration_s, num_pulses_cpi),
            daemon=True,
        )
        self.simulation_thread.start()

    def stop(self):
        """Ask the simulation thread to stop and block until it has exited.

        Does nothing if no run is in progress.
        """
        if not self.is_running():
            return
        self.stop_event.set()
        self.simulation_thread.join()
        self.simulation_thread = None

    def _drain_queue(self):
        """Discard everything currently waiting in ``data_queue``."""
        while True:
            try:
                self.data_queue.get_nowait()
            except queue.Empty:
                break

    @staticmethod
    def _sector_azimuth_deg(scan_cfg, elapsed_s):
        """Return the azimuth (degrees) of a back-and-forth sector sweep.

        The antenna sweeps from ``min_az_deg`` up to ``max_az_deg`` at
        ``scan_speed_deg_s`` and then back down, repeating periodically.
        A non-positive sector width or scan speed holds the antenna at
        ``min_az_deg`` instead of dividing by zero or leaving the sector.

        Args:
            scan_cfg: Object providing ``min_az_deg``, ``max_az_deg`` and
                ``scan_speed_deg_s``.
            elapsed_s: Seconds elapsed since the sweep started.
        """
        min_az = scan_cfg.min_az_deg
        max_az = scan_cfg.max_az_deg
        span = max_az - min_az
        if span <= 0:
            return min_az
        speed = scan_cfg.scan_speed_deg_s
        if speed <= 0:
            return min_az
        half_cycle = span / speed  # duration of one min -> max pass
        time_in_cycle = elapsed_s % (2 * half_cycle)
        if time_in_cycle < half_cycle:
            return min_az + time_in_cycle * speed
        return max_az - (time_in_cycle - half_cycle) * speed

    def _simulation_loop(self, radar_cfg, targets, total_duration_s, num_pulses_cpi):
        """Worker-thread body: generate one frame per CPI until done or stopped.

        Simulated time advances by one CPI duration
        (``num_pulses_cpi / radar_cfg.prf``) per frame, and the loop waits the
        same amount of wall-clock time between frames. In ``'sector'`` mode the
        antenna azimuth follows a periodic sweep driven by elapsed wall-clock
        time; in ``'staring'`` mode the azimuth is left unchanged.

        ``None`` is always put on ``data_queue`` on exit, including when an
        exception escapes, so the consumer is never left waiting.
        """
        try:
            prf = radar_cfg.prf
            pri = 1.0 / prf
            cpi_duration = num_pulses_cpi * pri

            initial_scan = radar_cfg.scan_config
            current_az_deg = initial_scan.min_az_deg if initial_scan.mode == 'sector' else 0.0
            current_time_s = 0.0
            frame_num = 0
            # Monotonic clock: the sweep phase must not jump if the system
            # clock is adjusted while the simulation is running.
            start_time = time.monotonic()

            while current_time_s < total_duration_s and not self.stop_event.is_set():
                # Re-read the scan configuration every CPI so the pointing
                # reflects the configuration object's current state.
                scan_cfg = radar_cfg.scan_config
                if scan_cfg.mode == 'sector':
                    current_az_deg = self._sector_azimuth_deg(
                        scan_cfg, time.monotonic() - start_time
                    )

                # Generate IQ data for this CPI at the current antenna angle.
                iq_data_cpi = generate_iq_data(radar_cfg, targets, num_pulses_cpi, current_az_deg)
                self.data_queue.put((current_az_deg, iq_data_cpi, frame_num))

                current_time_s += cpi_duration
                frame_num += 1

                # Pace the loop, but wake immediately if stop() is called so
                # that stop() does not block for a whole CPI.
                if self.stop_event.wait(cpi_duration):
                    break
        finally:
            # Signal that the simulation is finished (or has failed).
            self.data_queue.put(None)