# FlightMonitor/data/opensky_live_adapter.py """ Adapter for fetching live flight data from the OpenSky Network API. This adapter polls the API periodically, transforms the raw data into CanonicalFlightState objects, and puts structured messages into an output queue. """ import requests import json import time import threading from queue import Queue, Full as QueueFull # Import QueueFull for specific exception handling from typing import List, Optional, Dict, Any, Union # Relative imports from . import config as app_config from ..utils.logger import get_logger from .common_models import CanonicalFlightState module_logger = get_logger(__name__) # flightmonitor.data.opensky_live_adapter # --- Adapter Specific Constants --- PROVIDER_NAME = "OpenSkyNetwork" INITIAL_BACKOFF_DELAY_SECONDS = 20.0 # Increased initial backoff slightly MAX_BACKOFF_DELAY_SECONDS = 300.0 # Max 5 minutes BACKOFF_FACTOR = 1.8 MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5 # --- Message Types and Status Codes for Output Queue --- AdapterMessage = Dict[str, Any] FlightDataPayload = List[CanonicalFlightState] MSG_TYPE_FLIGHT_DATA = "flight_data" MSG_TYPE_ADAPTER_STATUS = "adapter_status" STATUS_STARTING = "STARTING" # Adapter thread is starting STATUS_FETCHING = "FETCHING" # Actively fetching data now STATUS_RECOVERED = "RECOVERED" # Recovered from a previous error state STATUS_RATE_LIMITED = "RATE_LIMITED" # Hit API rate limit STATUS_API_ERROR_TEMPORARY = "API_ERROR_TEMPORARY" # Other temporary API/network error STATUS_PERMANENT_FAILURE = "PERMANENT_FAILURE" # Too many errors, giving up STATUS_STOPPING = "STOPPING" # Adapter thread is stopping STATUS_STOPPED = "STOPPED" # Adapter thread has stopped cleanly class OpenSkyLiveAdapter(threading.Thread): """ Polls the OpenSky Network API, converts data to CanonicalFlightState, and sends structured messages (flight data or status updates) to an output queue. Implements exponential backoff for API errors. """ def __init__(self, output_queue: Queue[AdapterMessage], bounding_box: Dict[str, float], polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS, api_timeout: int = app_config.DEFAULT_API_TIMEOUT_SECONDS, daemon: bool = True): thread_name = f"OpenSkyLiveAdapter-bbox-{bounding_box.get('lat_min',0):.1f}" super().__init__(daemon=daemon, name=thread_name) if output_queue is None: # This is a critical setup error. module_logger.critical(f"{self.name}: Output queue cannot be None during initialization.") raise ValueError("Output queue must be provided to OpenSkyLiveAdapter.") self.output_queue = output_queue self.bounding_box = bounding_box self.base_polling_interval = float(polling_interval) # Ensure float for time calculations self.api_timeout = float(api_timeout) self._stop_event = threading.Event() self._consecutive_api_errors: int = 0 self._current_backoff_delay: float = 0.0 # Current calculated delay for backoff self._in_backoff_mode: bool = False module_logger.debug( f"{self.name} initialized. BBox: {self.bounding_box}, " f"Base Interval: {self.base_polling_interval}s, API Timeout: {self.api_timeout}s" ) def _send_status_to_queue(self, status_code: str, message: str, details: Optional[Dict] = None): """Helper to put a status message into the output queue.""" try: status_payload = { "type": MSG_TYPE_ADAPTER_STATUS, "status_code": status_code, "message": f"{self.name}: {message}", # Prepend adapter name for clarity } if details: status_payload["details"] = details self.output_queue.put_nowait(status_payload) module_logger.debug(f"{self.name}: Sent status '{status_code}' to queue. Msg: {message}") except QueueFull: module_logger.warning(f"{self.name}: Output queue is full. Could not send status: {status_code}") except Exception as e: module_logger.error(f"{self.name}: Error sending status '{status_code}' to queue: {e}", exc_info=True) def stop(self): """Signals the thread to stop its execution loop and sends a STOPPING status.""" module_logger.info(f"Stop signal received for {self.name}. Signaling stop event.") self._send_status_to_queue(STATUS_STOPPING, "Stop signal received, attempting to terminate.") self._stop_event.set() def _parse_state_vector(self, raw_sv: list) -> Optional[CanonicalFlightState]: """Parses a raw OpenSky state vector into a CanonicalFlightState object.""" try: if not raw_sv or len(raw_sv) < 17: module_logger.warning(f"Skipping incomplete state vector (length {len(raw_sv)}): {raw_sv}") return None icao24 = raw_sv[0] if not isinstance(icao24, str) or not icao24.strip(): module_logger.warning(f"Skipping state vector due to invalid/missing ICAO24: '{icao24}' in {raw_sv}") return None api_time_position = float(raw_sv[3]) if raw_sv[3] is not None else None api_last_contact = float(raw_sv[4]) if raw_sv[4] is not None else None primary_timestamp: float if api_last_contact is not None: primary_timestamp = api_last_contact elif api_time_position is not None: primary_timestamp = api_time_position else: primary_timestamp = time.time() module_logger.warning(f"ICAO {icao24}: Using current time as primary_timestamp due to missing API timestamps (last_contact={api_last_contact}, time_position={api_time_position}).") last_contact_for_model = api_last_contact if api_last_contact is not None else primary_timestamp state = CanonicalFlightState( icao24=icao24, callsign=raw_sv[1], origin_country=raw_sv[2], timestamp=primary_timestamp, last_contact_timestamp=last_contact_for_model, latitude=float(raw_sv[5]) if raw_sv[5] is not None else None, longitude=float(raw_sv[6]) if raw_sv[6] is not None else None, baro_altitude_m=float(raw_sv[7]) if raw_sv[7] is not None else None, on_ground=bool(raw_sv[8]), velocity_mps=float(raw_sv[9]) if raw_sv[9] is not None else None, true_track_deg=float(raw_sv[10]) if raw_sv[10] is not None else None, vertical_rate_mps=float(raw_sv[11]) if raw_sv[11] is not None else None, geo_altitude_m=float(raw_sv[13]) if raw_sv[13] is not None else None, squawk=raw_sv[14], spi=bool(raw_sv[15]), position_source=int(raw_sv[16]) if raw_sv[16] is not None else None, raw_data_provider=PROVIDER_NAME ) return state except (TypeError, ValueError, IndexError) as e: module_logger.error(f"Error parsing state vector for ICAO '{raw_sv[0] if raw_sv else 'UNKNOWN'}': {e}. Vector: {raw_sv}", exc_info=False) # Less verbose exc_info return None def _perform_api_request(self) -> Dict[str, Any]: """ Performs API request and returns a structured result. Result keys: 'data' (List[CanonicalFlightState]) on success, 'error_type' (str, e.g. STATUS_RATE_LIMITED) on failure, plus other error details. """ if not self.bounding_box: return {"error_type": STATUS_API_ERROR_TEMPORARY, "message": "Bounding box not set."} params = { "lamin": self.bounding_box["lat_min"], "lomin": self.bounding_box["lon_min"], "lamax": self.bounding_box["lat_max"], "lomax": self.bounding_box["lon_max"], } self._send_status_to_queue(STATUS_FETCHING, f"Requesting data for bbox: {self.bounding_box}") response: Optional[requests.Response] = None # Define response here for wider scope try: response = requests.get(app_config.OPENSKY_API_URL, params=params, timeout=self.api_timeout) module_logger.debug(f"{self.name}: API Response Status: {response.status_code} {response.reason}") if response.status_code == 429: # Rate limit self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay(response.headers.get("Retry-After")) err_msg = f"Rate limited (429). Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.warning(f"{self.name}: {err_msg}") return {"error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors} response.raise_for_status() # Other HTTP errors self._reset_error_state() # Success, reset error counters response_data = response.json() raw_states_list = response_data.get("states") canonical_states: List[CanonicalFlightState] = [] if raw_states_list is not None: for raw_sv in raw_states_list: parsed_state = self._parse_state_vector(raw_sv) if parsed_state: canonical_states.append(parsed_state) module_logger.info(f"{self.name}: Fetched and parsed {len(canonical_states)} flight states.") else: module_logger.info(f"{self.name}: API returned no flight states ('states' is null or empty).") return {"data": canonical_states} # Success except requests.exceptions.HTTPError as http_err: # Other 4xx/5xx self._consecutive_api_errors += 1; self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() status_code = http_err.response.status_code if http_err.response else 'N/A' err_msg = f"HTTP error {status_code}: {http_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} except requests.exceptions.Timeout: self._consecutive_api_errors += 1; self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"API request timed out. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "TIMEOUT", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} except (requests.exceptions.RequestException, json.JSONDecodeError) as e: self._consecutive_api_errors += 1; self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"Request/JSON error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=True) # exc_info for these unexpected ones return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "REQUEST_JSON_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} except Exception as e: # Catch-all self._consecutive_api_errors += 1; self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"Unexpected critical error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.critical(f"{self.name}: {err_msg}", exc_info=True) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "UNEXPECTED_ADAPTER_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} def _calculate_next_backoff_delay(self, provided_retry_after: Optional[str] = None) -> float: """Calculates exponential backoff delay, respecting Retry-After header if provided.""" calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * (BACKOFF_FACTOR ** (self._consecutive_api_errors - 1)) if provided_retry_after and provided_retry_after.isdigit(): api_delay = float(provided_retry_after) self._current_backoff_delay = max(api_delay, calculated_delay) # Respect API if it asks for longer else: self._current_backoff_delay = calculated_delay self._current_backoff_delay = min(self._current_backoff_delay, MAX_BACKOFF_DELAY_SECONDS) self._current_backoff_delay = max(self._current_backoff_delay, 1.0) # Ensure at least 1s delay return self._current_backoff_delay def _reset_error_state(self): """Resets error counters and backoff mode, sends RECOVERED status if was in error.""" if self._in_backoff_mode or self._consecutive_api_errors > 0: module_logger.info(f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff.") self._send_status_to_queue(STATUS_RECOVERED, "API connection recovered.") self._consecutive_api_errors = 0 self._current_backoff_delay = 0.0 self._in_backoff_mode = False def run(self): module_logger.info(f"{self.name} thread started. Base polling interval: {self.base_polling_interval:.1f}s.") self._send_status_to_queue(STATUS_STARTING, "Adapter thread started, preparing initial fetch.") while not self._stop_event.is_set(): if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD: perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) consecutive API errors. Stopping live updates." module_logger.critical(f"{self.name}: {perm_fail_msg}") self._send_status_to_queue(STATUS_PERMANENT_FAILURE, perm_fail_msg) self.stop() # This will trigger sending STATUS_STOPPING then STATUS_STOPPED break # Perform the API request and get a structured result api_result = self._perform_api_request() # This now sends its own STATUS_FETCHING # Process the result and send appropriate message to the queue if "data" in api_result: # Success flight_data_payload: List[CanonicalFlightState] = api_result["data"] try: self.output_queue.put_nowait({ "type": MSG_TYPE_FLIGHT_DATA, "payload": flight_data_payload }) module_logger.debug(f"{self.name}: Sent {len(flight_data_payload)} flight states to queue.") except QueueFull: module_logger.warning(f"{self.name}: Output queue full. Discarding {len(flight_data_payload)} flight states.") except Exception as e: module_logger.error(f"{self.name}: Error putting flight data into queue: {e}", exc_info=True) elif "error_type" in api_result: # An error occurred # Error details already logged by _perform_api_request # Send a status update to the controller error_details_for_controller = api_result.copy() # Make a copy to add type error_details_for_controller["type"] = MSG_TYPE_ADAPTER_STATUS error_details_for_controller["status_code"] = api_result["error_type"] # Standardize key # Message is already in api_result['message'] try: self.output_queue.put_nowait(error_details_for_controller) except QueueFull: module_logger.warning(f"{self.name}: Output queue full. Discarding error status: {api_result['error_type']}") except Exception as e: module_logger.error(f"{self.name}: Error putting error status into queue: {e}", exc_info=True) else: module_logger.error(f"{self.name}: Unknown result structure from _perform_api_request: {api_result}") # Determine wait time for the next cycle time_to_wait_seconds: float if self._in_backoff_mode: time_to_wait_seconds = self._current_backoff_delay module_logger.debug(f"{self.name}: In backoff, next attempt in {time_to_wait_seconds:.1f}s.") else: time_to_wait_seconds = self.base_polling_interval module_logger.debug(f"{self.name}: Next fetch cycle in {time_to_wait_seconds:.1f}s.") # Wait, checking for stop event periodically # Use _stop_event.wait(timeout) for a cleaner interruptible sleep if self._stop_event.wait(timeout=time_to_wait_seconds): module_logger.debug(f"{self.name}: Stop event received during wait period.") break # Exit while loop if stop event is set self._send_status_to_queue(STATUS_STOPPED, "Adapter thread terminated.") module_logger.info(f"{self.name} thread event loop finished.")