107 lines
4.1 KiB
Python
107 lines
4.1 KiB
Python
"""
|
|
Manages the simulation thread and data communication for the GUI.
|
|
|
|
This module encapsulates the simulation loop in a separate thread to keep the GUI
|
|
responsive. It uses a queue to send data back to the main thread for plotting
|
|
and display.
|
|
"""
|
|
|
|
import threading
|
|
import queue
|
|
import time
|
|
import numpy as np
|
|
|
|
from ..core.simulation_engine import generate_iq_data
|
|
|
|
class SimulationManager:
|
|
"""Runs the simulation in a separate thread and provides data via a queue."""
|
|
def __init__(self):
|
|
self.simulation_thread = None
|
|
self.stop_event = threading.Event()
|
|
self.data_queue = queue.Queue()
|
|
|
|
def is_running(self):
|
|
"""Check if the simulation thread is currently running."""
|
|
return self.simulation_thread is not None and self.simulation_thread.is_alive()
|
|
|
|
def start(self, radar_cfg, targets, duration_s, num_pulses_cpi):
|
|
"""Starts the simulation thread."""
|
|
if self.is_running():
|
|
return
|
|
|
|
self.stop_event.clear()
|
|
# Clear the queue before starting a new simulation
|
|
while not self.data_queue.empty():
|
|
try:
|
|
self.data_queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
self.simulation_thread = threading.Thread(
|
|
target=self._simulation_loop,
|
|
args=(radar_cfg, targets, duration_s, num_pulses_cpi),
|
|
daemon=True
|
|
)
|
|
self.simulation_thread.start()
|
|
|
|
def stop(self):
|
|
"""Signals the simulation thread to stop and waits for it to join."""
|
|
if not self.is_running():
|
|
return
|
|
self.stop_event.set()
|
|
self.simulation_thread.join() # Wait for the thread to finish
|
|
self.simulation_thread = None
|
|
|
|
def _simulation_loop(self, radar_cfg, targets, total_duration_s, num_pulses_cpi):
|
|
"""The main loop for the simulation thread."""
|
|
prf = radar_cfg.prf
|
|
pri = 1.0 / prf
|
|
cpi_duration = num_pulses_cpi * pri
|
|
|
|
current_az_deg = radar_cfg.scan_config.min_az_deg if radar_cfg.scan_config.mode == 'sector' else 0.0
|
|
current_time_s = 0.0
|
|
frame_num = 0
|
|
|
|
start_time = time.time()
|
|
|
|
while current_time_s < total_duration_s:
|
|
if self.stop_event.is_set():
|
|
break
|
|
|
|
# Determine antenna pointing for this CPI
|
|
if radar_cfg.scan_config.mode == 'staring':
|
|
pass
|
|
elif radar_cfg.scan_config.mode == 'sector':
|
|
scan_range = radar_cfg.scan_config.max_az_deg - radar_cfg.scan_config.min_az_deg
|
|
if scan_range <= 0:
|
|
current_az_deg = radar_cfg.scan_config.min_az_deg
|
|
else:
|
|
# Simplified periodic sweep for real-time simulation
|
|
scan_speed_rad_s = np.deg2rad(radar_cfg.scan_config.scan_speed_deg_s)
|
|
scan_range_rad = np.deg2rad(scan_range)
|
|
min_az_rad = np.deg2rad(radar_cfg.scan_config.min_az_deg)
|
|
|
|
cycle_duration = 2 * scan_range_rad / scan_speed_rad_s
|
|
time_in_cycle = (time.time() - start_time) % cycle_duration
|
|
|
|
if time_in_cycle < cycle_duration / 2:
|
|
current_az_rad = min_az_rad + time_in_cycle * scan_speed_rad_s
|
|
else:
|
|
current_az_rad = np.deg2rad(radar_cfg.scan_config.max_az_deg) - (time_in_cycle - cycle_duration / 2) * scan_speed_rad_s
|
|
current_az_deg = np.rad2deg(current_az_rad)
|
|
|
|
# Generate IQ data for this CPI at the current antenna angle
|
|
iq_data_cpi = generate_iq_data(radar_cfg, targets, num_pulses_cpi, current_az_deg)
|
|
|
|
# Put data in the queue for the GUI thread
|
|
frame_data = (current_az_deg, iq_data_cpi, frame_num)
|
|
self.data_queue.put(frame_data)
|
|
|
|
current_time_s += cpi_duration
|
|
frame_num += 1
|
|
|
|
# Control the simulation rate
|
|
time.sleep(cpi_duration)
|
|
|
|
# Signal that the simulation is finished
|
|
self.data_queue.put(None) |