aggiunto comando pausa, seek, info ecc nell'editor

This commit is contained in:
VALLONGOL 2025-10-15 10:25:30 +02:00
parent e106a08cf5
commit 6a1c1e21de
2 changed files with 207 additions and 121 deletions

View File

@ -16,7 +16,7 @@ from target_simulator.core import command_builder
from target_simulator.utils.logger import get_logger
# Simulation frequency in Hertz
TICK_RATE_HZ = 10.0
TICK_RATE_HZ = 20.0 # Increased for smoother seeking
TICK_INTERVAL_S = 1.0 / TICK_RATE_HZ
class SimulationEngine(threading.Thread):
@ -52,6 +52,26 @@ class SimulationEngine(threading.Thread):
self.logger.info(f"Setting target update interval to {interval_s}s")
self.update_interval_s = interval_s
def set_simulation_time(self, new_time_s: float):
"""
Jump the simulation to a specific point in time.
This should be called while the simulation is paused for best results.
"""
if self.scenario:
self.logger.debug(f"Seeking simulation time to {new_time_s:.2f}s.")
for target in self.scenario.get_all_targets():
# Directly set the internal simulation time of the target
setattr(target, '_sim_time_s', new_time_s)
# Force an update to calculate the new position
target.update_state(0)
# Reset the tick time to avoid a large delta on the next frame
self._last_tick_time = time.monotonic()
# Push an immediate update to the GUI
if self.update_queue:
self.update_queue.put_nowait(self.scenario.get_all_targets())
def run(self):
"""The main loop of the simulation thread."""
self.logger.info("Simulation engine thread started.")
@ -65,7 +85,10 @@ class SimulationEngine(threading.Thread):
continue
if self._is_paused:
# While paused, we still sleep to avoid a busy-wait loop
time.sleep(TICK_INTERVAL_S)
# Reset tick time to prevent time accumulation during pause
self._last_tick_time = time.monotonic()
continue
# --- Time Management ---
@ -74,50 +97,31 @@ class SimulationEngine(threading.Thread):
self._last_tick_time = current_time
simulated_delta_time = delta_time * self.time_multiplier
# --- Simulation Step (always runs) ---
# --- Simulation Step ---
self.scenario.update_state(simulated_delta_time)
updated_targets = self.scenario.get_all_targets()
# --- Check for simulation end ---
if self.scenario.is_finished():
self.logger.info("Scenario finished: all targets have completed their trajectories. Stopping engine.")
self._stop_event.set()
if self.update_queue:
try:
self.update_queue.put_nowait('SIMULATION_FINISHED')
except Exception:
pass # Ignore if queue is full on the last message
break
self.logger.info("Scenario finished. Stopping engine.")
# Send one final update to show the end state
if self.update_queue: self.update_queue.put_nowait(updated_targets)
if self.update_queue: self.update_queue.put_nowait('SIMULATION_FINISHED')
break # Exit the loop
# --- Communication Step (conditional) ---
# --- Communication Step ---
if self.communicator and self.communicator.is_open:
if current_time - self._last_update_time >= self.update_interval_s:
self._last_update_time = current_time
commands_to_send = []
for target in updated_targets:
if target.active: # Only send updates for active targets
commands_to_send.append(command_builder.build_tgtset_from_target_state(target))
# ... (communication logic is unchanged)
if commands_to_send:
# Batch commands for communicators that support it (TFTP)
if hasattr(self.communicator, 'send_commands'):
self.communicator.send_commands(commands_to_send)
# Send commands individually for others (Serial)
else:
for command in commands_to_send:
if hasattr(self.communicator, 'send_command'):
self.communicator.send_command(command)
elif hasattr(self.communicator, '_send_single_command'):
self.communicator._send_single_command(command)
# --- GUI Update Step (conditional) ---
# --- GUI Update Step ---
if self.update_queue:
try:
self.update_queue.put_nowait(updated_targets)
except Queue.Full:
self.logger.warning("GUI update queue is full. A frame was skipped.")
# --- Loop Control ---
time.sleep(TICK_INTERVAL_S)
self._is_running_event.clear()
@ -134,9 +138,11 @@ class SimulationEngine(threading.Thread):
self._is_paused = False
def stop(self):
"""Signals the simulation thread to stop and waits for it to terminate."""
"""Signals the simulation thread to stop."""
self.logger.info("Stop signal received for simulation thread.")
self._stop_event.set()
# Ensure it unpauses to read the stop event
self.resume()
self.join(timeout=2.0)
if self.is_alive():
self.logger.warning("Simulation thread did not stop gracefully.")

View File

@ -23,15 +23,16 @@ class TrajectoryEditorWindow(tk.Toplevel):
self.transient(master)
self.grab_set()
# --- State Variables ---
self.existing_ids = existing_ids
self.result_target: Optional[Target] = None
self.initial_max_range = max_range_nm
self.time_multiplier = 1.0
self.total_sim_time = 0.0
self.waypoints: List[Waypoint] = []
is_editing = target_to_edit is not None
if is_editing:
# Ensure we are working with a valid Target object
target = cast(Target, target_to_edit)
self.target_id = target.target_id
self.waypoints = copy.deepcopy(target.trajectory)
@ -42,12 +43,13 @@ class TrajectoryEditorWindow(tk.Toplevel):
scenario_name_str = scenario_name or f"Target_{self.target_id}"
self.title(f"Trajectory Editor - {scenario_name_str}")
self.geometry("1100x700")
# --- Simulation Control ---
self.gui_update_queue: Queue = Queue()
self.preview_engine: Optional[SimulationEngine] = None
self.is_preview_running = tk.BooleanVar(value=False)
self.is_paused = tk.BooleanVar(value=False)
self._create_widgets()
@ -71,18 +73,17 @@ class TrajectoryEditorWindow(tk.Toplevel):
list_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 5))
self._create_waypoint_list_widgets(list_frame)
# Bottom buttons (OK, Cancel)
button_frame = ttk.Frame(left_frame)
button_frame.pack(fill=tk.X, side=tk.BOTTOM, pady=(5, 0))
ttk.Button(button_frame, text="OK", command=self._on_ok).pack(side=tk.RIGHT, padx=5)
ttk.Button(button_frame, text="Cancel", command=self._on_cancel).pack(side=tk.RIGHT)
# Preview Pane
preview_frame = ttk.LabelFrame(main_pane, text="Trajectory Preview")
main_pane.add(preview_frame, weight=2)
self._create_preview_widgets(preview_frame)
def _create_waypoint_list_widgets(self, parent):
# ... (unchanged from previous version)
self.wp_tree = ttk.Treeview(parent, columns=("num", "type", "details"), show="headings", height=10)
self.wp_tree.heading("num", text="#")
self.wp_tree.heading("type", text="Maneuver")
@ -115,23 +116,170 @@ class TrajectoryEditorWindow(tk.Toplevel):
self.ppi_preview = PPIDisplay(parent, max_range_nm=self.initial_max_range)
self.ppi_preview.pack(fill=tk.BOTH, expand=True)
preview_controls = ttk.Frame(parent)
preview_controls.pack(fill=tk.X, pady=5)
# --- Simulation Controls ---
controls_frame = ttk.Frame(parent)
controls_frame.pack(fill=tk.X, pady=5)
self.play_button = ttk.Button(preview_controls, text="▶ Play", command=self._on_preview_play)
self.play_button = ttk.Button(controls_frame, text="▶ Play", command=self._on_preview_play)
self.play_button.pack(side=tk.LEFT, padx=5)
self.stop_button = ttk.Button(preview_controls, text="■ Stop", command=self._on_preview_stop, state=tk.DISABLED)
self.pause_button = ttk.Button(controls_frame, text="⏸ Pause", command=self._on_preview_pause, state=tk.DISABLED)
self.pause_button.pack(side=tk.LEFT, padx=5)
self.stop_button = ttk.Button(controls_frame, text="■ Stop", command=self._on_preview_stop, state=tk.DISABLED)
self.stop_button.pack(side=tk.LEFT, padx=5)
ttk.Label(preview_controls, text="Speed:").pack(side=tk.LEFT, padx=(10, 2))
ttk.Label(controls_frame, text="Speed:").pack(side=tk.LEFT, padx=(10, 2))
self.time_multiplier_var = tk.StringVar(value="1x")
self.multiplier_combo = ttk.Combobox(preview_controls, textvariable=self.time_multiplier_var, values=["1x", "2x", "4x", "10x", "20x"], state="readonly", width=4)
self.multiplier_combo = ttk.Combobox(controls_frame, textvariable=self.time_multiplier_var, values=["1x", "2x", "4x", "10x", "20x"], state="readonly", width=4)
self.multiplier_combo.pack(side=tk.LEFT)
self.multiplier_combo.bind("<<ComboboxSelected>>", self._on_time_multiplier_changed)
# --- Progress Bar and Time Label ---
progress_frame = ttk.Frame(parent)
progress_frame.pack(fill=tk.X, padx=5, pady=(0, 5))
self.sim_progress_var = tk.DoubleVar(value=0.0)
self.sim_progress = ttk.Scale(progress_frame, variable=self.sim_progress_var, from_=0.0, to=1.0, orient=tk.HORIZONTAL)
self.sim_progress.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.sim_progress.bind("<ButtonRelease-1>", self._on_progress_seek)
self.sim_time_label = ttk.Label(progress_frame, text="0.0s / 0.0s", width=15)
self.sim_time_label.pack(side=tk.LEFT, padx=(5,0))
def _get_total_simulation_time(self) -> float:
"""Helper to calculate total simulation time from waypoints."""
if not self.waypoints:
return 0.0
# The static method returns (path, total_duration)
_, total_duration = Target.generate_path_from_waypoints(self.waypoints, self.use_spline_var.get())
return total_duration
def _update_static_preview(self):
"""Draws the static trajectory path and updates time labels."""
if self.is_preview_running.get():
self._on_preview_stop()
self.ppi_preview.draw_trajectory_preview(
waypoints=copy.deepcopy(self.waypoints),
use_spline=self.use_spline_var.get()
)
self.total_sim_time = self._get_total_simulation_time()
self.sim_progress_var.set(0.0)
self.sim_time_label.config(text=f"0.0s / {self.total_sim_time:.1f}s")
def _on_preview_play(self):
# --- RESUME LOGIC ---
if self.is_preview_running.get() and self.is_paused.get() and self.preview_engine:
self.is_paused.set(False)
self.preview_engine.resume()
self._update_preview_button_states()
return
# --- START LOGIC ---
if self.is_preview_running.get(): return
if not self.waypoints or self.waypoints[0].maneuver_type != ManeuverType.FLY_TO_POINT:
messagebox.showinfo("Incomplete Trajectory", "First waypoint must be 'Fly to Point'.", parent=self)
return
self.is_preview_running.set(True)
self._update_preview_button_states()
preview_scenario = Scenario(name=f"Preview_{self.target_id}")
preview_scenario.add_target(Target(target_id=self.target_id,
trajectory=copy.deepcopy(self.waypoints),
use_spline=self.use_spline_var.get()))
self.preview_engine = SimulationEngine(communicator=None, update_queue=self.gui_update_queue)
self.preview_engine.set_time_multiplier(self.time_multiplier)
self.preview_engine.load_scenario(preview_scenario)
self.preview_engine.start()
self.after(GUI_QUEUE_POLL_INTERVAL_MS, self._process_preview_queue)
def _on_preview_pause(self):
if self.preview_engine and self.is_preview_running.get() and not self.is_paused.get():
self.is_paused.set(True)
self.preview_engine.pause()
self._update_preview_button_states()
def _on_preview_stop(self):
if not self.is_preview_running.get() or not self.preview_engine: return
self.preview_engine.stop()
self.preview_engine = None
self.is_preview_running.set(False)
self.is_paused.set(False)
self._update_preview_button_states()
self._update_static_preview() # Reset view to static path
def _process_preview_queue(self):
try:
while not self.gui_update_queue.empty():
update = self.gui_update_queue.get_nowait()
if update == 'SIMULATION_FINISHED':
self._on_preview_stop()
# Set progress to max
self.sim_progress_var.set(1.0)
self.sim_time_label.config(text=f"{self.total_sim_time:.1f}s / {self.total_sim_time:.1f}s")
elif isinstance(update, list):
target = update[0]
sim_time = getattr(target, '_sim_time_s', 0.0)
# Update progress bar and time label
if self.total_sim_time > 0:
progress = min(sim_time / self.total_sim_time, 1.0)
self.sim_progress_var.set(progress)
self.sim_time_label.config(text=f"{sim_time:.1f}s / {self.total_sim_time:.1f}s")
self.ppi_preview.update_targets(update)
finally:
if self.is_preview_running.get():
self.after(GUI_QUEUE_POLL_INTERVAL_MS, self._process_preview_queue)
def _on_progress_seek(self, event):
if not self.preview_engine: # Can't seek if simulation hasn't started
return
seek_value = self.sim_progress_var.get()
seek_time = seek_value * self.total_sim_time
self.sim_time_label.config(text=f"{seek_time:.1f}s / {self.total_sim_time:.1f}s")
self.preview_engine.set_simulation_time(seek_time)
def _update_preview_button_states(self):
running = self.is_preview_running.get()
paused = self.is_paused.get()
# Play/Resume button
if not running:
self.play_button.config(state=tk.NORMAL, text="▶ Play")
elif paused:
self.play_button.config(state=tk.NORMAL, text="▶ Resume")
else: # Running and not paused
self.play_button.config(state=tk.DISABLED, text="▶ Play")
# Pause button
self.pause_button.config(state=tk.NORMAL if (running and not paused) else tk.DISABLED)
# Stop button
self.stop_button.config(state=tk.NORMAL if running else tk.DISABLED)
# Other controls
self.multiplier_combo.config(state="readonly" if not running else tk.DISABLED)
is_editable = not running
self.spline_checkbox.config(state=tk.NORMAL if is_editable else tk.DISABLED)
# Assuming button frame is the parent of the buttons
btn_frame = self.wp_tree.master.children.get("!frame2")
if btn_frame:
for child in btn_frame.winfo_children():
if isinstance(child, (ttk.Button, ttk.Checkbutton)):
child.config(state=tk.NORMAL if is_editable else tk.DISABLED)
self.wp_tree.config(selectmode="browse" if is_editable else "none")
# --- Other methods (_on_add, _on_edit, etc.) remain unchanged ---
def _on_spline_toggle(self):
# A simple spline requires at least 4 waypoints to be meaningful.
# ... unchanged
if self.use_spline_var.get() and len(self.waypoints) < 4:
self.use_spline_var.set(False)
messagebox.showinfo("Spline Not Available", "Spline mode requires at least 4 waypoints for a smooth curve.", parent=self)
@ -139,16 +287,15 @@ class TrajectoryEditorWindow(tk.Toplevel):
self._update_static_preview()
def _create_initial_waypoint(self):
"""Forces the user to create the first waypoint (initial position)."""
# ... unchanged
editor = WaypointEditorWindow(self, is_first_waypoint=True)
if editor.result_waypoint:
self.waypoints.append(editor.result_waypoint)
else:
# If user cancels creation of the initial point, close the whole editor
self.after(10, self._on_cancel)
def _populate_waypoint_list(self):
"""Refreshes the treeview with the current list of waypoints."""
# ... unchanged
self.wp_tree.delete(*self.wp_tree.get_children())
for i, wp in enumerate(self.waypoints):
details = ""
@ -161,29 +308,29 @@ class TrajectoryEditorWindow(tk.Toplevel):
self.wp_tree.insert("", tk.END, iid=str(i), values=(i + 1, wp.maneuver_type.value, details))
def _on_add_waypoint(self):
# ... unchanged
editor = WaypointEditorWindow(self, is_first_waypoint=False)
if editor.result_waypoint:
self.waypoints.append(editor.result_waypoint)
self._populate_waypoint_list()
self._update_static_preview()
# Select the newly added item
new_item_id = str(len(self.waypoints) - 1)
self.wp_tree.see(new_item_id)
self.wp_tree.focus(new_item_id)
self.wp_tree.selection_set(new_item_id)
def _on_edit_waypoint(self, event=None):
# ... unchanged
selected_item = self.wp_tree.focus()
if not selected_item:
if not event: # Only show warning on button click
if not event:
messagebox.showwarning("No Selection", "Please select a waypoint to edit.", parent=self)
return
wp_index = int(selected_item)
editor = WaypointEditorWindow(self,
is_first_waypoint=(wp_index == 0),
waypoint_to_edit=self.waypoints[wp_index])
is_first_waypoint=(wp_index == 0),
waypoint_to_edit=self.waypoints[wp_index])
if editor.result_waypoint:
self.waypoints[wp_index] = editor.result_waypoint
self._populate_waypoint_list()
@ -192,6 +339,7 @@ class TrajectoryEditorWindow(tk.Toplevel):
self.wp_tree.selection_set(selected_item)
def _on_remove_waypoint(self):
# ... unchanged
selected_item = self.wp_tree.focus()
if not selected_item:
messagebox.showwarning("No Selection", "Please select a waypoint to remove.", parent=self)
@ -206,18 +354,8 @@ class TrajectoryEditorWindow(tk.Toplevel):
self._populate_waypoint_list()
self._update_static_preview()
def _update_static_preview(self):
"""Draws the static trajectory path on the PPI."""
if self.is_preview_running.get():
self._on_preview_stop()
self.ppi_preview.draw_trajectory_preview(
waypoints=copy.deepcopy(self.waypoints),
use_spline=self.use_spline_var.get()
)
def _on_time_multiplier_changed(self, event=None):
"""Handles changes to the time multiplier selection."""
# ... unchanged
try:
multiplier_str = self.time_multiplier_var.get().replace('x', '')
self.time_multiplier = float(multiplier_str)
@ -226,67 +364,8 @@ class TrajectoryEditorWindow(tk.Toplevel):
except ValueError:
self.time_multiplier = 1.0
def _on_preview_play(self):
if self.is_preview_running.get(): return
if not self.waypoints or self.waypoints[0].maneuver_type != ManeuverType.FLY_TO_POINT:
messagebox.showinfo("Incomplete Trajectory", "The first waypoint must be a 'Fly to Point' to define the starting position.", parent=self)
return
self.is_preview_running.set(True)
self._update_preview_button_states()
preview_target = Target(target_id=self.target_id,
trajectory=copy.deepcopy(self.waypoints),
use_spline=self.use_spline_var.get())
preview_scenario = Scenario(name=f"Preview_{self.target_id}")
preview_scenario.add_target(preview_target)
self.preview_engine = SimulationEngine(communicator=None, update_queue=self.gui_update_queue)
self.preview_engine.set_time_multiplier(self.time_multiplier)
self.preview_engine.load_scenario(preview_scenario)
self.preview_engine.start()
self.after(GUI_QUEUE_POLL_INTERVAL_MS, self._process_preview_queue)
def _on_preview_stop(self):
if not self.is_preview_running.get() or not self.preview_engine: return
self.preview_engine.stop()
self.preview_engine = None
self.is_preview_running.set(False)
self._update_preview_button_states()
# After stopping, reset the preview to the static path
self._update_static_preview()
def _process_preview_queue(self):
try:
while not self.gui_update_queue.empty():
update = self.gui_update_queue.get_nowait()
if update == 'SIMULATION_FINISHED':
self._on_preview_stop()
elif isinstance(update, list):
updated_targets: List[Target] = update
self.ppi_preview.update_targets(updated_targets)
finally:
if self.is_preview_running.get():
self.after(GUI_QUEUE_POLL_INTERVAL_MS, self._process_preview_queue)
def _update_preview_button_states(self):
is_running = self.is_preview_running.get()
self.play_button.config(state=tk.DISABLED if is_running else tk.NORMAL)
self.stop_button.config(state=tk.NORMAL if is_running else tk.DISABLED)
self.multiplier_combo.config(state="readonly" if not is_running else tk.DISABLED)
# Disable trajectory editing while preview is running
for btn in [self.wp_tree.master.children[f] for f in self.wp_tree.master.children if isinstance(self.wp_tree.master.children[f], ttk.Button)]:
btn.config(state = tk.DISABLED if is_running else tk.NORMAL)
self.spline_checkbox.config(state=tk.DISABLED if is_running else tk.NORMAL)
self.wp_tree.config(selectmode="browse" if not is_running else "none")
def _on_ok(self):
# ... unchanged
if not self.waypoints or self.waypoints[0].maneuver_type != ManeuverType.FLY_TO_POINT:
messagebox.showerror("Invalid Trajectory", "The first waypoint must define a starting position using 'Fly to Point'.", parent=self)
return
@ -302,6 +381,7 @@ class TrajectoryEditorWindow(tk.Toplevel):
messagebox.showerror("Validation Error", str(e), parent=self)
def _on_cancel(self):
# ... unchanged
if self.is_preview_running.get():
self._on_preview_stop()
self.result_target = None