# FlightMonitor/data/opensky_historical_adapter.py
"""
Adapter for fetching historical flight data from the OpenSky Network API.

This adapter iterates over a specified time range, fetching data at discrete
intervals.
"""
import time
import threading
from queue import Queue, Full as QueueFull
from typing import Dict, Any, List, Optional

from opensky_api import OpenSkyApi  # type: ignore

from . import config as app_config
from ..utils.logger import get_logger
from .common_models import CanonicalFlightState
from .base_live_data_adapter import BaseLiveDataAdapter, AdapterMessage
from .data_constants import (
    MSG_TYPE_FLIGHT_DATA,
    MSG_TYPE_ADAPTER_STATUS,
    STATUS_STARTING,
    STATUS_FETCHING,
    STATUS_STOPPED,
    PROVIDER_NAME_OPENSKY,
)

module_logger = get_logger(__name__)


class OpenSkyHistoricalAdapter(BaseLiveDataAdapter):
    """
    Fetches historical flight data from OpenSky for a given time range and
    bounding box.

    One ``get_states`` request is issued per sample time in
    ``[start_timestamp, end_timestamp]`` (inclusive, stepping by
    ``sampling_interval_sec``). Each successful result is put on the output
    queue as a ``MSG_TYPE_FLIGHT_DATA`` message. Status messages are sent at
    start, before each fetch, and (normally) a ``STATUS_STOPPED`` at the end.
    """

    def __init__(
        self,
        output_queue: Queue[AdapterMessage],
        bounding_box: Dict[str, float],
        start_timestamp: int,
        end_timestamp: int,
        sampling_interval_sec: int,
        scan_rate_sec: int,
        daemon: bool = True,
    ):
        """
        Initializes the OpenSkyHistoricalAdapter.

        Args:
            output_queue: The queue to send data messages to.
            bounding_box: The geographical area to scan; must contain the keys
                "lat_min", "lat_max", "lon_min" and "lon_max".
            start_timestamp: The beginning of the time range (Unix epoch).
            end_timestamp: The end of the time range (Unix epoch), inclusive.
            sampling_interval_sec: The time step in seconds between each
                historical data fetch. Must be > 0 (checked when the thread runs).
            scan_rate_sec: The real-world time in seconds to wait between API
                calls to avoid rate limiting.
            daemon: Whether the thread should be a daemon.

        Raises:
            RuntimeError: If the anonymous OpenSkyApi client cannot be created.
        """
        # We reuse polling_interval for scan_rate_sec to fit the base class
        super().__init__(
            output_queue,
            bounding_box,
            float(scan_rate_sec),
            daemon,
            name="OpenSkyHistoricalAdapter",
        )
        self.start_timestamp = int(start_timestamp)
        self.end_timestamp = int(end_timestamp)
        self.sampling_interval = int(sampling_interval_sec)

        self.api_client_anonymous: Optional[OpenSkyApi] = None
        try:
            self.api_client_anonymous = OpenSkyApi()
            module_logger.info(
                f"{self.name}: OpenSkyApi client (anonymous access) initialized."
            )
        except Exception as e:
            module_logger.critical(
                f"{self.name}: Failed to initialize anonymous OpenSkyApi client: {e}",
                exc_info=True,
            )
            raise RuntimeError(f"Failed to initialize OpenSkyApi client: {e}") from e

    def _send_status_to_queue(self, status_code: str, message: str):
        """Helper to send status messages to the output queue.

        Non-blocking: if the queue is full the status is dropped and a
        warning is logged.
        """
        status_payload = {
            "type": MSG_TYPE_ADAPTER_STATUS,
            "status_code": status_code,
            "message": f"{self.name}: {message}",
        }
        try:
            self.output_queue.put_nowait(status_payload)
        except QueueFull:
            module_logger.warning(
                f"{self.name}: Output queue full. Could not send status: {status_code}"
            )

    def _put_message_until_stopped(
        self, message: AdapterMessage, poll_timeout_sec: float = 0.5
    ) -> bool:
        """Put *message* on the output queue, giving up once a stop is requested.

        A plain blocking ``put()`` would hang forever on a full queue whose
        consumer has gone away, making the thread impossible to stop. Instead,
        retry with a short timeout and re-check the stop flag between attempts.

        Returns:
            True if the message was queued, False if a stop was requested first.
        """
        while not self.is_stopped():
            try:
                self.output_queue.put(message, timeout=poll_timeout_sec)
                return True
            except QueueFull:
                continue
        return False

    def _convert_opensky_state_to_canonical(
        self, sv_opensky_lib: Any, fallback_timestamp: Optional[float] = None
    ) -> Optional[CanonicalFlightState]:
        """Converts an OpenSky StateVector object to a CanonicalFlightState.

        Args:
            sv_opensky_lib: A state vector as returned by ``OpenSkyApi.get_states``.
            fallback_timestamp: Timestamp to use when the state carries neither
                ``time_position`` nor ``last_contact``. For historical queries
                this should be the requested sample time. If None, the current
                wall-clock time is used (the original behaviour).

        Returns:
            The converted state, or None if the state has no ``icao24`` or the
            conversion fails (the error is logged).
        """
        try:
            if sv_opensky_lib.icao24 is None:
                return None

            primary_ts = (
                sv_opensky_lib.time_position
                if sv_opensky_lib.time_position is not None
                else sv_opensky_lib.last_contact
            )
            last_contact_ts = sv_opensky_lib.last_contact

            if primary_ts is None:
                # For historical data, "now" would be badly out of range, so
                # prefer the caller-supplied sample time when available.
                primary_ts = (
                    fallback_timestamp
                    if fallback_timestamp is not None
                    else time.time()
                )
            if last_contact_ts is None:
                last_contact_ts = primary_ts

            return CanonicalFlightState(
                icao24=sv_opensky_lib.icao24,
                callsign=(
                    sv_opensky_lib.callsign.strip() if sv_opensky_lib.callsign else None
                ),
                origin_country=sv_opensky_lib.origin_country,
                timestamp=float(primary_ts),
                last_contact_timestamp=float(last_contact_ts),
                latitude=sv_opensky_lib.latitude,
                longitude=sv_opensky_lib.longitude,
                baro_altitude_m=sv_opensky_lib.baro_altitude,
                on_ground=sv_opensky_lib.on_ground,
                velocity_mps=sv_opensky_lib.velocity,
                true_track_deg=sv_opensky_lib.true_track,
                vertical_rate_mps=sv_opensky_lib.vertical_rate,
                geo_altitude_m=sv_opensky_lib.geo_altitude,
                squawk=sv_opensky_lib.squawk,
                spi=sv_opensky_lib.spi,
                position_source=(
                    str(sv_opensky_lib.position_source)
                    if sv_opensky_lib.position_source is not None
                    else None
                ),
                raw_data_provider=PROVIDER_NAME_OPENSKY,
            )
        except Exception as e:
            module_logger.error(
                f"{self.name}: Error converting OpenSkyApi StateVector: {e}",
                exc_info=False,
            )
            return None

    def _fetch_canonical_states(
        self, current_time: int, api_bbox_tuple: tuple
    ) -> List[CanonicalFlightState]:
        """Fetch states for one sample time and convert them to canonical form.

        States that fail conversion are skipped. Exceptions raised by the API
        client propagate to the caller.
        """
        states_response = self.api_client_anonymous.get_states(
            time_sec=current_time, bbox=api_bbox_tuple
        )
        if not states_response or not states_response.states:
            return []
        converted = (
            self._convert_opensky_state_to_canonical(
                sv, fallback_timestamp=float(current_time)
            )
            for sv in states_response.states
        )
        return [state for state in converted if state is not None]

    def run(self):
        """Main thread loop for the historical adapter.

        Walks the time range from ``start_timestamp`` to ``end_timestamp``
        (inclusive) in steps of ``sampling_interval``. For each sample time it
        fetches, converts and enqueues the states, then waits ``polling_interval``
        seconds (the scan rate) unless a stop is requested. Fetch errors are
        logged and that sample is skipped.
        """
        module_logger.info(
            f"{self.name}: Thread starting historical download from {self.start_timestamp} "
            f"to {self.end_timestamp} with sampling interval {self.sampling_interval}s."
        )
        self._send_status_to_queue(
            STATUS_STARTING, "Historical download thread started."
        )

        if not self.api_client_anonymous:
            module_logger.critical(
                f"{self.name}: Anonymous client not available. Cannot fetch historical data."
            )
            self._send_status_to_queue(STATUS_STOPPED, "Adapter failed to initialize.")
            return

        if self.sampling_interval <= 0:
            # range() would raise (step 0) and kill the thread without ever
            # reporting STATUS_STOPPED, leaving consumers waiting.
            module_logger.critical(
                f"{self.name}: Invalid sampling interval {self.sampling_interval}s; must be > 0."
            )
            self._send_status_to_queue(STATUS_STOPPED, "Adapter failed to initialize.")
            return

        api_bbox_tuple = (
            self.bounding_box["lat_min"],
            self.bounding_box["lat_max"],
            self.bounding_box["lon_min"],
            self.bounding_box["lon_max"],
        )

        stopped_early = False
        for current_time in range(
            self.start_timestamp, self.end_timestamp + 1, self.sampling_interval
        ):
            if self.is_stopped():
                module_logger.info(f"{self.name}: Stop signal received. Terminating.")
                stopped_early = True
                break

            self._send_status_to_queue(
                STATUS_FETCHING, f"Fetching data for timestamp {current_time}..."
            )
            try:
                canonical_states = self._fetch_canonical_states(
                    current_time, api_bbox_tuple
                )
            except Exception as e:
                # Best effort: log and move on to the next sample time.
                module_logger.error(
                    f"{self.name}: Error fetching historical data for timestamp {current_time}: {e}",
                    exc_info=True,
                )
            else:
                data_message: AdapterMessage = {
                    "type": MSG_TYPE_FLIGHT_DATA,
                    "timestamp": current_time,
                    "payload": canonical_states,
                }
                if not self._put_message_until_stopped(data_message):
                    module_logger.info(
                        f"{self.name}: Stop signal received while output queue was full. Terminating."
                    )
                    stopped_early = True
                    break
                module_logger.debug(
                    f"{self.name}: Sent {len(canonical_states)} states for timestamp {current_time}."
                )

            # No need to throttle after the final sample; report completion now.
            is_last_sample = current_time + self.sampling_interval > self.end_timestamp
            if self.polling_interval > 0 and not is_last_sample:
                if self._stop_event.wait(timeout=self.polling_interval):
                    module_logger.info(
                        f"{self.name}: Stop signal received during wait period. Exiting loop."
                    )
                    stopped_early = True
                    break

        # Send a final 'stopped' message to signal completion (or cancellation).
        if stopped_early:
            self._send_status_to_queue(
                STATUS_STOPPED, "Historical download stopped before completion."
            )
        else:
            self._send_status_to_queue(STATUS_STOPPED, "Historical download finished.")
        module_logger.info(f"{self.name}: Historical download loop finished.")