SXXXXXXX_FlightMonitor/flightmonitor/data/playback_adapter.py

178 lines
6.9 KiB
Python

# FlightMonitor/data/playback_adapter.py
"""
Adapter for "playing back" recorded flight data from a local database.
This adapter reads data in time chunks and simulates a real-time data flow.
"""
import time
import threading
from queue import Queue, Full as QueueFull
from typing import Dict, Any, List, Optional
from flightmonitor.utils.logger import get_logger
from flightmonitor.data.common_models import CanonicalFlightState
from flightmonitor.data.storage import DataStorage
from flightmonitor.data.base_live_data_adapter import BaseLiveDataAdapter, AdapterMessage
from flightmonitor.data.data_constants import (
MSG_TYPE_FLIGHT_DATA,
MSG_TYPE_ADAPTER_STATUS,
STATUS_STARTING,
STATUS_FETCHING,
STATUS_STOPPED,
STATUS_PERMANENT_FAILURE,
)
module_logger = get_logger(__name__)
class PlaybackAdapter(BaseLiveDataAdapter):
"""
Simulates a live data feed by reading historical data from the daily database
for a specific session and yielding it in time-ordered chunks.
"""
def __init__(
self,
output_queue: Queue[AdapterMessage],
data_storage: DataStorage,
session_date_str: str,
start_timestamp: float,
end_timestamp: float,
bounding_box: Dict[str, float],
playback_speed: float = 1.0,
tick_interval_ms: int = 1000,
daemon: bool = True,
):
# The polling_interval for the base class here represents the loop's tick rate
super().__init__(
output_queue,
bounding_box,
float(tick_interval_ms / 1000.0),
daemon,
name="PlaybackAdapter",
)
self.data_storage = data_storage
self.session_date_str = session_date_str
self.start_ts = start_timestamp
self.end_ts = end_timestamp
self._playback_speed = playback_speed
self._tick_interval_ms = tick_interval_ms
self._control_lock = threading.Lock()
self._is_paused = False
self._virtual_time = self.start_ts
def pause(self):
"""Pauses the playback."""
with self._control_lock:
self._is_paused = True
module_logger.info(f"{self.name}: Playback paused.")
def resume(self):
"""Resumes the playback."""
with self._control_lock:
self._is_paused = False
module_logger.info(f"{self.name}: Playback resumed.")
def set_speed(self, speed_multiplier: float):
"""Sets the playback speed."""
with self._control_lock:
self._playback_speed = max(0.1, speed_multiplier) # Prevent zero or negative speed
module_logger.info(f"{self.name}: Playback speed set to {self._playback_speed}x.")
def seek_to_time(self, timestamp: float):
"""Jumps the playback to a specific timestamp within the session."""
with self._control_lock:
self._virtual_time = max(self.start_ts, min(timestamp, self.end_ts))
module_logger.info(f"{self.name}: Seeking playback to timestamp {self._virtual_time:.0f}.")
def _send_status_to_queue(self, status_code: str, message: str):
"""Helper to send status messages to the output queue."""
status_payload = {
"type": MSG_TYPE_ADAPTER_STATUS,
"status_code": status_code,
"message": f"{self.name}: {message}",
}
try:
self.output_queue.put_nowait(status_payload)
except QueueFull:
module_logger.warning(
f"{self.name}: Output queue full. Could not send status: {status_code}"
)
def run(self):
"""Main thread loop for the playback adapter."""
module_logger.info(
f"{self.name}: Thread starting playback for session on {self.session_date_str} "
f"from {self.start_ts:.0f} to {self.end_ts:.0f}."
)
self._send_status_to_queue(STATUS_STARTING, "Playback session started.")
if not self.data_storage:
err_msg = "DataStorage not available. Cannot start playback."
module_logger.critical(f"{self.name}: {err_msg}")
self._send_status_to_queue(STATUS_PERMANENT_FAILURE, err_msg)
return
while not self.is_stopped() and self._virtual_time <= self.end_ts:
with self._control_lock:
is_paused_now = self._is_paused
current_speed = self._playback_speed
if is_paused_now:
time.sleep(self.polling_interval) # Sleep while paused
continue
time_to_fetch_sec = self.polling_interval * current_speed
query_start_ts = self._virtual_time
query_end_ts = self._virtual_time + time_to_fetch_sec
try:
# We query the DB for the next chunk of data
flight_states = self.data_storage.get_positions_in_range(
self.session_date_str, query_start_ts, query_end_ts
)
if flight_states:
# Create a payload structure similar to other adapters for consistency
payload = {"canonical": flight_states, "raw_json": "{}"}
message: AdapterMessage = {
"type": MSG_TYPE_FLIGHT_DATA,
"timestamp": query_end_ts, # The "current" time of the playback
"payload": payload,
}
self.output_queue.put(message)
module_logger.debug(
f"{self.name}: Sent {len(flight_states)} flight states for time "
f"~{int(query_end_ts)} to queue."
)
else:
# Even if no flights, we should send a message to update the clock
empty_message: AdapterMessage = {
"type": MSG_TYPE_FLIGHT_DATA,
"timestamp": query_end_ts,
"payload": {"canonical": [], "raw_json": "{}"},
}
self.output_queue.put(empty_message)
except Exception as e:
module_logger.error(
f"{self.name}: Error querying data for playback: {e}", exc_info=True
)
self._send_status_to_queue(
STATUS_PERMANENT_FAILURE, f"DB query failed: {e}"
)
break
# Update virtual time
with self._control_lock:
self._virtual_time += time_to_fetch_sec
# Wait for the tick interval to simulate real-time flow
time.sleep(self.polling_interval)
if not self.is_stopped():
module_logger.info(f"{self.name}: Playback session finished.")
self._send_status_to_queue(STATUS_STOPPED, "Playback finished.")
else:
module_logger.info(f"{self.name}: Playback stopped by user.")
self._send_status_to_queue(STATUS_STOPPED, "Playback stopped.")