178 lines
6.9 KiB
Python
178 lines
6.9 KiB
Python
# FlightMonitor/data/playback_adapter.py
|
|
"""
|
|
Adapter for "playing back" recorded flight data from a local database.
|
|
This adapter reads data in time chunks and simulates a real-time data flow.
|
|
"""
|
|
import time
|
|
import threading
|
|
from queue import Queue, Full as QueueFull
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
from flightmonitor.utils.logger import get_logger
|
|
from flightmonitor.data.common_models import CanonicalFlightState
|
|
from flightmonitor.data.storage import DataStorage
|
|
from flightmonitor.data.base_live_data_adapter import BaseLiveDataAdapter, AdapterMessage
|
|
from flightmonitor.data.data_constants import (
|
|
MSG_TYPE_FLIGHT_DATA,
|
|
MSG_TYPE_ADAPTER_STATUS,
|
|
STATUS_STARTING,
|
|
STATUS_FETCHING,
|
|
STATUS_STOPPED,
|
|
STATUS_PERMANENT_FAILURE,
|
|
)
|
|
|
|
module_logger = get_logger(__name__)
|
|
|
|
|
|
class PlaybackAdapter(BaseLiveDataAdapter):
|
|
"""
|
|
Simulates a live data feed by reading historical data from the daily database
|
|
for a specific session and yielding it in time-ordered chunks.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
output_queue: Queue[AdapterMessage],
|
|
data_storage: DataStorage,
|
|
session_date_str: str,
|
|
start_timestamp: float,
|
|
end_timestamp: float,
|
|
bounding_box: Dict[str, float],
|
|
playback_speed: float = 1.0,
|
|
tick_interval_ms: int = 1000,
|
|
daemon: bool = True,
|
|
):
|
|
# The polling_interval for the base class here represents the loop's tick rate
|
|
super().__init__(
|
|
output_queue,
|
|
bounding_box,
|
|
float(tick_interval_ms / 1000.0),
|
|
daemon,
|
|
name="PlaybackAdapter",
|
|
)
|
|
self.data_storage = data_storage
|
|
self.session_date_str = session_date_str
|
|
self.start_ts = start_timestamp
|
|
self.end_ts = end_timestamp
|
|
|
|
self._playback_speed = playback_speed
|
|
self._tick_interval_ms = tick_interval_ms
|
|
self._control_lock = threading.Lock()
|
|
self._is_paused = False
|
|
self._virtual_time = self.start_ts
|
|
|
|
def pause(self):
|
|
"""Pauses the playback."""
|
|
with self._control_lock:
|
|
self._is_paused = True
|
|
module_logger.info(f"{self.name}: Playback paused.")
|
|
|
|
def resume(self):
|
|
"""Resumes the playback."""
|
|
with self._control_lock:
|
|
self._is_paused = False
|
|
module_logger.info(f"{self.name}: Playback resumed.")
|
|
|
|
def set_speed(self, speed_multiplier: float):
|
|
"""Sets the playback speed."""
|
|
with self._control_lock:
|
|
self._playback_speed = max(0.1, speed_multiplier) # Prevent zero or negative speed
|
|
module_logger.info(f"{self.name}: Playback speed set to {self._playback_speed}x.")
|
|
|
|
def seek_to_time(self, timestamp: float):
|
|
"""Jumps the playback to a specific timestamp within the session."""
|
|
with self._control_lock:
|
|
self._virtual_time = max(self.start_ts, min(timestamp, self.end_ts))
|
|
module_logger.info(f"{self.name}: Seeking playback to timestamp {self._virtual_time:.0f}.")
|
|
|
|
def _send_status_to_queue(self, status_code: str, message: str):
|
|
"""Helper to send status messages to the output queue."""
|
|
status_payload = {
|
|
"type": MSG_TYPE_ADAPTER_STATUS,
|
|
"status_code": status_code,
|
|
"message": f"{self.name}: {message}",
|
|
}
|
|
try:
|
|
self.output_queue.put_nowait(status_payload)
|
|
except QueueFull:
|
|
module_logger.warning(
|
|
f"{self.name}: Output queue full. Could not send status: {status_code}"
|
|
)
|
|
|
|
def run(self):
|
|
"""Main thread loop for the playback adapter."""
|
|
module_logger.info(
|
|
f"{self.name}: Thread starting playback for session on {self.session_date_str} "
|
|
f"from {self.start_ts:.0f} to {self.end_ts:.0f}."
|
|
)
|
|
self._send_status_to_queue(STATUS_STARTING, "Playback session started.")
|
|
|
|
if not self.data_storage:
|
|
err_msg = "DataStorage not available. Cannot start playback."
|
|
module_logger.critical(f"{self.name}: {err_msg}")
|
|
self._send_status_to_queue(STATUS_PERMANENT_FAILURE, err_msg)
|
|
return
|
|
|
|
while not self.is_stopped() and self._virtual_time <= self.end_ts:
|
|
with self._control_lock:
|
|
is_paused_now = self._is_paused
|
|
current_speed = self._playback_speed
|
|
|
|
if is_paused_now:
|
|
time.sleep(self.polling_interval) # Sleep while paused
|
|
continue
|
|
|
|
time_to_fetch_sec = self.polling_interval * current_speed
|
|
query_start_ts = self._virtual_time
|
|
query_end_ts = self._virtual_time + time_to_fetch_sec
|
|
|
|
try:
|
|
# We query the DB for the next chunk of data
|
|
flight_states = self.data_storage.get_positions_in_range(
|
|
self.session_date_str, query_start_ts, query_end_ts
|
|
)
|
|
|
|
if flight_states:
|
|
# Create a payload structure similar to other adapters for consistency
|
|
payload = {"canonical": flight_states, "raw_json": "{}"}
|
|
message: AdapterMessage = {
|
|
"type": MSG_TYPE_FLIGHT_DATA,
|
|
"timestamp": query_end_ts, # The "current" time of the playback
|
|
"payload": payload,
|
|
}
|
|
self.output_queue.put(message)
|
|
module_logger.debug(
|
|
f"{self.name}: Sent {len(flight_states)} flight states for time "
|
|
f"~{int(query_end_ts)} to queue."
|
|
)
|
|
else:
|
|
# Even if no flights, we should send a message to update the clock
|
|
empty_message: AdapterMessage = {
|
|
"type": MSG_TYPE_FLIGHT_DATA,
|
|
"timestamp": query_end_ts,
|
|
"payload": {"canonical": [], "raw_json": "{}"},
|
|
}
|
|
self.output_queue.put(empty_message)
|
|
|
|
except Exception as e:
|
|
module_logger.error(
|
|
f"{self.name}: Error querying data for playback: {e}", exc_info=True
|
|
)
|
|
self._send_status_to_queue(
|
|
STATUS_PERMANENT_FAILURE, f"DB query failed: {e}"
|
|
)
|
|
break
|
|
|
|
# Update virtual time
|
|
with self._control_lock:
|
|
self._virtual_time += time_to_fetch_sec
|
|
|
|
# Wait for the tick interval to simulate real-time flow
|
|
time.sleep(self.polling_interval)
|
|
|
|
if not self.is_stopped():
|
|
module_logger.info(f"{self.name}: Playback session finished.")
|
|
self._send_status_to_queue(STATUS_STOPPED, "Playback finished.")
|
|
else:
|
|
module_logger.info(f"{self.name}: Playback stopped by user.")
|
|
self._send_status_to_queue(STATUS_STOPPED, "Playback stopped.") |