# FlightMonitor/data/playback_adapter.py """ Adapter for "playing back" recorded flight data from a local database. This adapter reads data in time chunks and simulates a real-time data flow. """ import time import threading from queue import Queue, Full as QueueFull from typing import Dict, Any, List, Optional from flightmonitor.utils.logger import get_logger from flightmonitor.data.common_models import CanonicalFlightState from flightmonitor.data.storage import DataStorage from flightmonitor.data.base_live_data_adapter import BaseLiveDataAdapter, AdapterMessage from flightmonitor.data.data_constants import ( MSG_TYPE_FLIGHT_DATA, MSG_TYPE_ADAPTER_STATUS, STATUS_STARTING, STATUS_FETCHING, STATUS_STOPPED, STATUS_PERMANENT_FAILURE, ) module_logger = get_logger(__name__) class PlaybackAdapter(BaseLiveDataAdapter): """ Simulates a live data feed by reading historical data from the daily database for a specific session and yielding it in time-ordered chunks. """ def __init__( self, output_queue: Queue[AdapterMessage], data_storage: DataStorage, session_date_str: str, start_timestamp: float, end_timestamp: float, bounding_box: Dict[str, float], playback_speed: float = 1.0, tick_interval_ms: int = 1000, daemon: bool = True, ): # The polling_interval for the base class here represents the loop's tick rate super().__init__( output_queue, bounding_box, float(tick_interval_ms / 1000.0), daemon, name="PlaybackAdapter", ) self.data_storage = data_storage self.session_date_str = session_date_str self.start_ts = start_timestamp self.end_ts = end_timestamp self._playback_speed = playback_speed self._tick_interval_ms = tick_interval_ms self._control_lock = threading.Lock() self._is_paused = False self._virtual_time = self.start_ts def pause(self): """Pauses the playback.""" with self._control_lock: self._is_paused = True module_logger.info(f"{self.name}: Playback paused.") def resume(self): """Resumes the playback.""" with self._control_lock: self._is_paused = False module_logger.info(f"{self.name}: Playback resumed.") def set_speed(self, speed_multiplier: float): """Sets the playback speed.""" with self._control_lock: self._playback_speed = max(0.1, speed_multiplier) # Prevent zero or negative speed module_logger.info(f"{self.name}: Playback speed set to {self._playback_speed}x.") def seek_to_time(self, timestamp: float): """Jumps the playback to a specific timestamp within the session.""" with self._control_lock: self._virtual_time = max(self.start_ts, min(timestamp, self.end_ts)) module_logger.info(f"{self.name}: Seeking playback to timestamp {self._virtual_time:.0f}.") def _send_status_to_queue(self, status_code: str, message: str): """Helper to send status messages to the output queue.""" status_payload = { "type": MSG_TYPE_ADAPTER_STATUS, "status_code": status_code, "message": f"{self.name}: {message}", } try: self.output_queue.put_nowait(status_payload) except QueueFull: module_logger.warning( f"{self.name}: Output queue full. Could not send status: {status_code}" ) def run(self): """Main thread loop for the playback adapter.""" module_logger.info( f"{self.name}: Thread starting playback for session on {self.session_date_str} " f"from {self.start_ts:.0f} to {self.end_ts:.0f}." ) if self.start_ts >= self.end_ts: err_msg = f"Invalid time range: start_ts ({self.start_ts}) is not less than end_ts ({self.end_ts}). Stopping." module_logger.error(f"{self.name}: {err_msg}") self._send_status_to_queue(STATUS_PERMANENT_FAILURE, err_msg) # Invia anche un messaggio di stop normale per chiudere pulitamente la UI self._send_status_to_queue(STATUS_STOPPED, "Playback failed due to invalid time range.") return self._send_status_to_queue(STATUS_STARTING, "Playback session started.") if not self.data_storage: err_msg = "DataStorage not available. Cannot start playback." module_logger.critical(f"{self.name}: {err_msg}") self._send_status_to_queue(STATUS_PERMANENT_FAILURE, err_msg) return while not self.is_stopped() and self._virtual_time <= self.end_ts: with self._control_lock: is_paused_now = self._is_paused current_speed = self._playback_speed if is_paused_now: time.sleep(self.polling_interval) # Sleep while paused continue time_to_fetch_sec = self.polling_interval * current_speed query_start_ts = self._virtual_time query_end_ts = self._virtual_time + time_to_fetch_sec try: # We query the DB for the next chunk of data flight_states = self.data_storage.get_positions_in_range( self.session_date_str, query_start_ts, query_end_ts ) if flight_states: # Create a payload structure similar to other adapters for consistency payload = {"canonical": flight_states, "raw_json": "{}"} message: AdapterMessage = { "type": MSG_TYPE_FLIGHT_DATA, "timestamp": query_end_ts, # The "current" time of the playback "payload": payload, } self.output_queue.put(message) module_logger.debug( f"{self.name}: Sent {len(flight_states)} flight states for time " f"~{int(query_end_ts)} to queue." ) else: # Even if no flights, we should send a message to update the clock empty_message: AdapterMessage = { "type": MSG_TYPE_FLIGHT_DATA, "timestamp": query_end_ts, "payload": {"canonical": [], "raw_json": "{}"}, } self.output_queue.put(empty_message) except Exception as e: module_logger.error( f"{self.name}: Error querying data for playback: {e}", exc_info=True ) self._send_status_to_queue( STATUS_PERMANENT_FAILURE, f"DB query failed: {e}" ) break # Update virtual time with self._control_lock: self._virtual_time += time_to_fetch_sec # Wait for the tick interval to simulate real-time flow time.sleep(self.polling_interval) if not self.is_stopped(): module_logger.info(f"{self.name}: Playback session finished.") self._send_status_to_queue(STATUS_STOPPED, "Playback finished.") else: module_logger.info(f"{self.name}: Playback stopped by user.") self._send_status_to_queue(STATUS_STOPPED, "Playback stopped.") def seek_to_time(self, timestamp: float): """Jumps the playback to a specific timestamp within the session.""" with self._control_lock: # Clamp the timestamp to be within the valid session range self._virtual_time = max(self.start_ts, min(timestamp, self.end_ts)) module_logger.info(f"{self.name}: Seeking playback to timestamp {self._virtual_time:.0f}.") def reset(self): """Resets the playback to the beginning of the session.""" with self._control_lock: self._virtual_time = self.start_ts module_logger.info(f"{self.name}: Playback reset to the beginning.") def force_update_for_timestamp(self, timestamp: float): """ Queries all data from the session start up to the specified timestamp and puts it into the output queue as a single "frame". """ if not self.data_storage: module_logger.error(f"{self.name}: DataStorage not available. Cannot force update.") return module_logger.info(f"{self.name}: Forcing data update for all history up to timestamp {timestamp:.0f}.") try: # Query for ALL data from the beginning of the session up to the seek time. all_states_until_seek = self.data_storage.get_positions_in_range( self.session_date_str, self.start_ts, timestamp ) # Il "payload" ora contiene la storia completa fino a quel punto. # Il MapCanvasManager userĂ  questi dati per costruire le tracce e mostrare l'ultimo stato. payload = {"canonical": all_states_until_seek, "raw_json": "{}"} message: AdapterMessage = { "type": MSG_TYPE_FLIGHT_DATA, "timestamp": timestamp, # L'orologio si sincronizza con l'istante scelto "payload": payload, } self.output_queue.put(message) except Exception as e: module_logger.error(f"{self.name}: Error during forced data update: {e}", exc_info=True)