# FlightMonitor/data/opensky_live_adapter.py """ Adapter for fetching live flight data from the OpenSky Network API. Supports both anonymous access (via opensky-api library) and authenticated access using OAuth2 Client Credentials Flow (via direct requests). """ import requests import time import threading from queue import Queue, Empty as QueueEmpty, Full as QueueFull import random import os # Import added for environment variable checks from typing import Dict, Any, List, Optional, Tuple import warnings # Import OpenSkyApi for anonymous access (fallback) from opensky_api import OpenSkyApi # type: ignore from . import config as app_config from ..utils.logger import get_logger from .common_models import CanonicalFlightState from .base_live_data_adapter import BaseLiveDataAdapter, AdapterMessage from urllib3.exceptions import InsecureRequestWarning warnings.simplefilter('ignore', InsecureRequestWarning) module_logger = get_logger(__name__) # Constants from opensky_live_adapter or a shared constants module MSG_TYPE_FLIGHT_DATA = "flight_data" MSG_TYPE_ADAPTER_STATUS = "adapter_status" STATUS_STARTING = "STARTING" STATUS_FETCHING = "FETCHING" STATUS_RECOVERED = "RECOVERED" STATUS_RATE_LIMITED = "RATE_LIMITED" # Can still occur with anonymous or if token is invalid/revoked STATUS_API_ERROR_TEMPORARY = "API_ERROR_TEMPORARY" STATUS_PERMANENT_FAILURE = "PERMANENT_FAILURE" STATUS_STOPPING = "STOPPING" STATUS_STOPPED = "STOPPED" PROVIDER_NAME = "OpenSkyNetwork" INITIAL_BACKOFF_DELAY_SECONDS = 20.0 MAX_BACKOFF_DELAY_SECONDS = 300.0 BACKOFF_FACTOR = 1.8 MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5 class OpenSkyLiveAdapter(BaseLiveDataAdapter): """ Polls the OpenSky Network API. Uses OAuth2 Client Credentials if configured, otherwise falls back to anonymous access via the opensky-api library. """ def __init__( self, output_queue: Queue[AdapterMessage], bounding_box: Dict[str, float], polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS, daemon: bool = True, ): thread_name = f"OpenSkyAdapter-bbox-{bounding_box.get('lat_min',0):.1f}" super().__init__(output_queue, bounding_box, float(polling_interval), daemon, name=thread_name) self._consecutive_api_errors: int = 0 self._current_backoff_delay: float = 0.0 self._in_backoff_mode: bool = False self.use_oauth: bool = False self.client_id: Optional[str] = None self.client_secret: Optional[str] = None self.token_url: Optional[str] = None self.access_token: Optional[str] = None self.token_expires_at: float = 0.0 self.api_client_anonymous: Optional[OpenSkyApi] = None self.api_timeout = getattr(app_config, "DEFAULT_API_TIMEOUT_SECONDS", 15) # This timeout is used for direct OAuth calls self.use_oauth = getattr(app_config, "USE_OPENSKY_CREDENTIALS", False) if self.use_oauth: self.client_id = getattr(app_config, "OPENSKY_CLIENT_ID", None) self.client_secret = getattr(app_config, "OPENSKY_CLIENT_SECRET", None) self.token_url = getattr(app_config, "OPENSKY_TOKEN_URL", None) if not (self.client_id and self.client_secret and self.token_url): module_logger.warning( f"{self.name}: OAuth2 credentials/token URL missing in config, " f"but USE_OPENSKY_CREDENTIALS is True. Falling back to anonymous access." ) self.use_oauth = False else: module_logger.info(f"{self.name}: Configured to use OpenSky OAuth2 Client Credentials.") if not self.use_oauth: try: # The opensky-api library does not accept a 'timeout' argument in its constructor. self.api_client_anonymous = OpenSkyApi() # Timeout is not a valid argument here module_logger.info( f"{self.name}: OpenSkyApi client (anonymous access) initialized (uses internal timeout)." ) except Exception as e: module_logger.critical( f"{self.name}: Failed to initialize anonymous OpenSkyApi client: {e}", exc_info=True, ) raise RuntimeError(f"Failed to initialize OpenSkyApi client (anonymous): {e}") from e module_logger.debug( f"{self.name} initialized. OAuth Mode: {self.use_oauth}. " f"BBox: {self.bounding_box}, Interval: {self.polling_interval}s" ) def _get_oauth_token(self) -> bool: """ Requests an OAuth2 access token from OpenSky Network. Returns True if successful, False otherwise. """ if not (self.client_id and self.client_secret and self.token_url): module_logger.error(f"{self.name}: Cannot get OAuth token, client ID/secret/URL missing.") return False # Manually URL-encode the payload to ensure correct formatting from urllib.parse import urlencode payload_dict = { "grant_type": "client_credentials", "client_id": self.client_id, "client_secret": self.client_secret, } # The urlencode function will handle converting this to the correct string format encoded_payload = urlencode(payload_dict) headers = { "Content-Type": "application/x-www-form-urlencoded", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0" } module_logger.info(f"{self.name}: Requesting OAuth2 access token from {self.token_url}...") try: # Pass the manually encoded string to the 'data' parameter response = requests.post(self.token_url, data=encoded_payload, headers=headers, timeout=self.api_timeout, verify=False) response.raise_for_status() token_data = response.json() self.access_token = token_data.get("access_token") expires_in = token_data.get("expires_in", 300) self.token_expires_at = time.time() + expires_in - 60 if self.access_token: module_logger.info(f"{self.name}: Successfully obtained OAuth2 access token. Expires in ~{expires_in // 60} minutes.") return True else: module_logger.error(f"{self.name}: Failed to obtain access token, 'access_token' field missing in response: {token_data}") return False except requests.exceptions.HTTPError as http_err: status_code = http_err.response.status_code if http_err.response else "N/A" try: error_details = http_err.response.json() error_text = f"JSON Response: {error_details}" except requests.exceptions.JSONDecodeError: error_text = f"Non-JSON Response: {http_err.response.text.strip()}" module_logger.error(f"{self.name}: HTTP error {status_code} obtaining OAuth token: {error_text}", exc_info=True) except Exception as e: module_logger.error(f"{self.name}: Unexpected error obtaining OAuth token: {e}", exc_info=True) self.access_token = None return False def _is_token_valid(self) -> bool: """Checks if the current access token is present and not expired (with buffer).""" return self.access_token is not None and time.time() < self.token_expires_at def _send_status_to_queue( self, status_code: str, message: str, details: Optional[Dict] = None ): status_payload = {"type": MSG_TYPE_ADAPTER_STATUS, "status_code": status_code, "message": f"{self.name}: {message}"} if details: status_payload["details"] = details try: self.output_queue.put_nowait(status_payload) except QueueFull: module_logger.warning(f"{self.name}: Output queue full. Could not send status: {status_code}") except Exception as e: module_logger.error(f"{self.name}: Error sending status '{status_code}' to queue: {e}", exc_info=True) def _convert_opensky_api_state_to_canonical(self, sv_opensky_lib: Any) -> Optional[CanonicalFlightState]: """Converts an OpenSkyApi library StateVector object to a CanonicalFlightState object.""" try: if sv_opensky_lib.icao24 is None: return None primary_ts = sv_opensky_lib.time_position if sv_opensky_lib.time_position is not None else sv_opensky_lib.last_contact last_contact_ts = sv_opensky_lib.last_contact if primary_ts is None: primary_ts = time.time() if last_contact_ts is None: last_contact_ts = primary_ts return CanonicalFlightState( icao24=sv_opensky_lib.icao24, callsign=(sv_opensky_lib.callsign.strip() if sv_opensky_lib.callsign else None), origin_country=sv_opensky_lib.origin_country, timestamp=float(primary_ts), last_contact_timestamp=float(last_contact_ts), latitude=sv_opensky_lib.latitude, longitude=sv_opensky_lib.longitude, baro_altitude_m=sv_opensky_lib.baro_altitude, on_ground=sv_opensky_lib.on_ground, velocity_mps=sv_opensky_lib.velocity, true_track_deg=sv_opensky_lib.true_track, vertical_rate_mps=sv_opensky_lib.vertical_rate, geo_altitude_m=sv_opensky_lib.geo_altitude, squawk=sv_opensky_lib.squawk, spi=sv_opensky_lib.spi, position_source=(str(sv_opensky_lib.position_source) if sv_opensky_lib.position_source is not None else None), raw_data_provider=PROVIDER_NAME) except Exception as e: module_logger.error(f"{self.name}: Error converting OpenSkyApi StateVector: {e}", exc_info=False); return None def _convert_direct_api_state_to_canonical(self, raw_state_list: List[Any]) -> Optional[CanonicalFlightState]: """ Converts a raw state vector list (from direct API call JSON response) to CanonicalFlightState. OpenSky direct API returns states as a list of values. Order: icao24, callsign, origin_country, time_position, last_contact, longitude, latitude, baro_altitude, on_ground, velocity, true_track, vertical_rate, sensors, geo_altitude, squawk, spi, position_source """ try: if not raw_state_list or len(raw_state_list) < 17: module_logger.warning(f"{self.name}: Received incomplete state vector list: {raw_state_list}") return None icao24 = raw_state_list[0] if not icao24: return None primary_ts = raw_state_list[3] if raw_state_list[3] is not None else raw_state_list[4] last_contact_ts = raw_state_list[4] if primary_ts is None: primary_ts = time.time() if last_contact_ts is None: last_contact_ts = primary_ts return CanonicalFlightState( icao24=str(icao24).lower().strip(), callsign=str(raw_state_list[1]).strip() if raw_state_list[1] else None, origin_country=str(raw_state_list[2]), timestamp=float(primary_ts), last_contact_timestamp=float(last_contact_ts), longitude=float(raw_state_list[5]) if raw_state_list[5] is not None else None, latitude=float(raw_state_list[6]) if raw_state_list[6] is not None else None, baro_altitude_m=float(raw_state_list[7]) if raw_state_list[7] is not None else None, on_ground=bool(raw_state_list[8]), velocity_mps=float(raw_state_list[9]) if raw_state_list[9] is not None else None, true_track_deg=float(raw_state_list[10]) if raw_state_list[10] is not None else None, vertical_rate_mps=float(raw_state_list[11]) if raw_state_list[11] is not None else None, # sensors = raw_state_list[12] # Typically not used directly in canonical model geo_altitude_m=float(raw_state_list[13]) if raw_state_list[13] is not None else None, squawk=str(raw_state_list[14]) if raw_state_list[14] else None, spi=bool(raw_state_list[15]), position_source=str(raw_state_list[16]), # Assuming position_source is an int/string raw_data_provider=PROVIDER_NAME ) except (IndexError, TypeError, ValueError) as e: module_logger.error(f"{self.name}: Error converting direct API state list to Canonical: {e}. List: {raw_state_list}", exc_info=False) return None except Exception as e_conv: module_logger.error(f"{self.name}: Unexpected error converting direct API state: {e_conv}. List: {raw_state_list}", exc_info=True) return None def _perform_api_request(self) -> Dict[str, Any]: if app_config.USE_MOCK_OPENSKY_API: module_logger.info(f"{self.name}: Using MOCK API data as per configuration.") self._send_status_to_queue(STATUS_FETCHING, "Generating mock flight data...") if app_config.MOCK_API_ERROR_SIMULATION == "RATE_LIMITED": self._consecutive_api_errors += 1 self._in_backoff_mode = True mock_retry_after = str(getattr(app_config, "MOCK_RETRY_AFTER_SECONDS", 60)) delay = self._calculate_next_backoff_delay(provided_retry_after=mock_retry_after) err_msg = f"MOCK: Rate limited. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.warning(f"{self.name}: {err_msg}") return {"error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors, "status_code": 429} if app_config.MOCK_API_ERROR_SIMULATION == "HTTP_ERROR": self._consecutive_api_errors += 1; self._in_backoff_mode = True; delay = self._calculate_next_backoff_delay() mock_status_code = getattr(app_config, "MOCK_HTTP_ERROR_STATUS", 500) err_msg = f"MOCK: HTTP error {mock_status_code}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": mock_status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} mock_states: List[CanonicalFlightState] = [] mock_count = getattr(app_config, "MOCK_API_FLIGHT_COUNT", 5) for i in range(mock_count): mock_states.append(self._generate_mock_flight_state(i + 1)) module_logger.info(f"{self.name}: Generated {len(mock_states)} mock flight states."); self._reset_error_state(); return {"data": mock_states} if not self.bounding_box: return {"error_type": STATUS_PERMANENT_FAILURE, "message": "Bounding box not set.", "status_code": "NO_BBOX", "delay": 0.0, "consecutive_errors": self._consecutive_api_errors} params = { "lamin": self.bounding_box["lat_min"], "lomin": self.bounding_box["lon_min"], "lamax": self.bounding_box["lat_max"], "lomax": self.bounding_box["lon_max"], } self._send_status_to_queue(STATUS_FETCHING, f"Requesting REAL data for bbox: {params}") try: canonical_states: List[CanonicalFlightState] = [] if self.use_oauth: # --- OAuth2 Authenticated Request --- if not self._is_token_valid(): if not self._get_oauth_token(): err_msg = "Failed to obtain/refresh OAuth2 token. Trying anonymous fallback if configured, or erroring." module_logger.error(f"{self.name}: {err_msg}") # If token fails, we could try anonymous as a fallback or just fail this attempt if not self.api_client_anonymous: # If anonymous is not even an option self._consecutive_api_errors += 1 return {"error_type": STATUS_API_ERROR_TEMPORARY, "message": err_msg, "status_code": "OAUTH_TOKEN_FAILURE", "delay": self._calculate_next_backoff_delay(), "consecutive_errors": self._consecutive_api_errors} else: # Fallback to anonymous for this attempt module_logger.warning(f"{self.name}: Attempting anonymous access due to OAuth token failure.") self.use_oauth = False # Temporarily disable for this cycle # Now the 'else' block below will handle it if self.use_oauth and self.access_token: # Double check token after potential refresh headers = {"Authorization": f"Bearer {self.access_token}"} response = requests.get(app_config.OPENSKY_API_URL, params=params, headers=headers, timeout=self.api_timeout) response.raise_for_status() json_response = response.json() # Direct API response structure: {"time": epoch, "states": [[...], [...]]} raw_states_list = json_response.get("states") if raw_states_list is not None: # Can be None if no aircraft for raw_state_vector in raw_states_list: cs = self._convert_direct_api_state_to_canonical(raw_state_vector) if cs: canonical_states.append(cs) module_logger.info(f"{self.name}: Fetched {len(canonical_states)} states via OAuth2.") else: # Anonymous access using opensky-api library if not self.api_client_anonymous: # This should not happen if __init__ logic is correct module_logger.critical(f"{self.name}: Anonymous API client not initialized! Cannot fetch data.") return {"error_type": STATUS_PERMANENT_FAILURE, "message": "Anonymous client not initialized.", "status_code": "CLIENT_INIT_ERROR", "delay": 0.0, "consecutive_errors": self._consecutive_api_errors} api_bbox_tuple = (self.bounding_box["lat_min"], self.bounding_box["lat_max"], self.bounding_box["lon_min"], self.bounding_box["lon_max"]) states_response_lib = self.api_client_anonymous.get_states(bbox=api_bbox_tuple) if states_response_lib and states_response_lib.states: for sv_lib in states_response_lib.states: cs = self._convert_opensky_api_state_to_canonical(sv_lib) if cs: canonical_states.append(cs) module_logger.info(f"{self.name}: Fetched {len(canonical_states)} states via anonymous access (opensky-api lib).") self._reset_error_state() return {"data": canonical_states} except requests.exceptions.HTTPError as http_err: # Covers both direct and library calls if they propagate self._consecutive_api_errors += 1; self._in_backoff_mode = True status_code = http_err.response.status_code if http_err.response else "N/A" err_msg = f"HTTP error {status_code}: {http_err}. Errors: {self._consecutive_api_errors}." delay = self._calculate_next_backoff_delay() if status_code == 401 or status_code == 403: # Unauthorized or Forbidden err_msg += " Check OAuth credentials/token or API key if used." # For OAuth, a 401 might mean token expired or invalid if self.use_oauth and (status_code == 401 or status_code == 403): self.access_token = None # Force token refresh on next attempt module_logger.warning(f"{self.name}: OAuth token might be invalid/expired (HTTP {status_code}). Forcing refresh.") module_logger.error(f"{self.name}: {err_msg} Retrying in {delay:.1f}s.", exc_info=False) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} except requests.exceptions.Timeout: self._consecutive_api_errors += 1; self._in_backoff_mode = True; delay = self._calculate_next_backoff_delay() err_msg = f"API request timed out. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "TIMEOUT", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} except requests.exceptions.RequestException as req_err: # Other network errors self._consecutive_api_errors += 1; self._in_backoff_mode = True; delay = self._calculate_next_backoff_delay() err_msg = f"Network request error: {req_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=True) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "REQUEST_EXCEPTION", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} except Exception as e: # Unexpected errors self._consecutive_api_errors += 1; self._in_backoff_mode = True; delay = self._calculate_next_backoff_delay() err_msg = f"Unexpected error during API request: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.critical(f"{self.name}: {err_msg}", exc_info=True) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "UNEXPECTED_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} def _calculate_next_backoff_delay(self, provided_retry_after: Optional[str] = None) -> float: calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * (BACKOFF_FACTOR ** (self._consecutive_api_errors - 1)) api_delay_from_header = 0.0 if provided_retry_after and provided_retry_after.isdigit(): try: api_delay_from_header = float(provided_retry_after) except ValueError: api_delay_from_header = 0.0 self._current_backoff_delay = max(api_delay_from_header, calculated_delay) self._current_backoff_delay = min(self._current_backoff_delay, MAX_BACKOFF_DELAY_SECONDS) self._current_backoff_delay = max(self._current_backoff_delay, 1.0) return self._current_backoff_delay def _reset_error_state(self): if self._in_backoff_mode or self._consecutive_api_errors > 0: module_logger.info(f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff.") self._send_status_to_queue(STATUS_RECOVERED, "API connection recovered.") self._consecutive_api_errors = 0 self._current_backoff_delay = 0.0 self._in_backoff_mode = False def run(self): initial_settle_delay_seconds = 0.1 if self._stop_event.wait(timeout=initial_settle_delay_seconds): module_logger.info(f"{self.name}: Stop signal during initial settle. Terminating."); return module_logger.info(f"{self.name}: Thread operational. Polling interval: {self.polling_interval:.1f}s. OAuth mode: {self.use_oauth}") self._send_status_to_queue(STATUS_STARTING, "Adapter thread started, preparing initial fetch.") # Initial token fetch if using OAuth if self.use_oauth and not self._get_oauth_token(): module_logger.error(f"{self.name}: Initial OAuth token fetch failed. Adapter may not function correctly or will use anonymous if fallback is enabled.") # Decide if we should stop or continue trying with anonymous (if use_oauth was forced False) while not self._stop_event.is_set(): if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD: perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) API errors. Stopping live updates." module_logger.critical(f"{self.name}: {perm_fail_msg}") self._send_status_to_queue(STATUS_PERMANENT_FAILURE, perm_fail_msg); break api_result = self._perform_api_request() if self._stop_event.is_set(): module_logger.info(f"{self.name}: Stop event after API request. Exiting."); break if "data" in api_result: try: self.output_queue.put_nowait({"type": MSG_TYPE_FLIGHT_DATA, "payload": api_result["data"]}) except QueueFull: module_logger.warning(f"{self.name}: Output queue full. Discarding data.") except Exception as e: module_logger.error(f"{self.name}: Error putting data to queue: {e}", exc_info=True) elif "error_type" in api_result: self._send_status_to_queue(api_result["error_type"], api_result.get("message", "API error."), api_result) else: module_logger.error(f"{self.name}: Unknown API result: {api_result}") wait_time = self._current_backoff_delay if self._in_backoff_mode else self.polling_interval if self._stop_event.wait(timeout=wait_time): module_logger.info(f"{self.name}: Stop event during wait. Exiting."); break module_logger.info(f"{self.name}: Exited main loop. Sending final STOPPED status.") self._send_status_to_queue(STATUS_STOPPED, "Adapter thread stopped.") module_logger.info(f"{self.name}: RUN method terminating.") def _generate_mock_flight_state(self, icao_suffix: int) -> CanonicalFlightState: # Kept for mock logic now = time.time() lat_center = (self.bounding_box["lat_min"] + self.bounding_box["lat_max"]) / 2 lon_center = (self.bounding_box["lon_min"] + self.bounding_box["lon_max"]) / 2 lat_span = self.bounding_box["lat_max"] - self.bounding_box["lat_min"] lon_span = self.bounding_box["lon_max"] - self.bounding_box["lon_min"] return CanonicalFlightState( icao24=f"mock{icao_suffix:02x}", callsign=f"MOCK{icao_suffix:02X}", origin_country="Mockland", timestamp=now, last_contact_timestamp=now, latitude=round(lat_center + random.uniform(-lat_span / 2.1, lat_span / 2.1), 4), longitude=round(lon_center + random.uniform(-lon_span / 2.1, lon_span / 2.1), 4), baro_altitude_m=random.uniform(1000, 12000), geo_altitude_m=random.uniform(1000, 12000) + random.uniform(-200, 200), on_ground=random.choice([True, False]), velocity_mps=random.uniform(50, 250), true_track_deg=random.uniform(0, 360), vertical_rate_mps=random.uniform(-10, 10), squawk=str(random.randint(1000, 7777)), spi=random.choice([True, False]), position_source=0, raw_data_provider=f"{PROVIDER_NAME}-Mock", )