# FlightMonitor/data/opensky_live_adapter.py """ Adapter for fetching live flight data from the OpenSky Network API. This adapter polls the API periodically, transforms the raw data into CanonicalFlightState objects, and puts structured messages into an output queue. """ import requests import json import time import threading # MODIFIED: Corrected the import of queue exceptions. (Attempt #4... hopefully the final one!) # WHY: The direct import of `Timeout` from `queue` causes an ImportError in some environments. # We need to import the `queue` module itself and refer to exceptions like `queue.Timeout` # and `queue.Full` by their module prefix. # HOW: Removed `Timeout` and `Full as QueueFull` from the direct import list. Kept `Empty as QueueEmpty` for consistency if used. # Removed `Full as QueueFull`. from queue import ( Queue, # Removed Full as QueueFull Empty as QueueEmpty, # Removed Timeout from here ) # MODIFIED: Import the queue module itself. # WHY: Needed to refer to exceptions like queue.Timeout and queue.Full by their module prefix. # HOW: Added import. import queue # Import the queue module itself from typing import List, Optional, Dict, Any, Union import random from . import config as app_config from ..utils.logger import get_logger from .common_models import CanonicalFlightState module_logger = get_logger(__name__) # --- Adapter Specific Constants --- PROVIDER_NAME = "OpenSkyNetwork" INITIAL_BACKOFF_DELAY_SECONDS = 20.0 MAX_BACKOFF_DELAY_SECONDS = 300.0 BACKOFF_FACTOR = 1.8 MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5 # --- Message Types and Status Codes for Output Queue --- AdapterMessage = Dict[str, Any] FlightDataPayload = List[CanonicalFlightState] MSG_TYPE_FLIGHT_DATA = "flight_data" MSG_TYPE_ADAPTER_STATUS = "adapter_status" STATUS_STARTING = "STARTING" STATUS_FETCHING = "FETCHING" STATUS_RECOVERED = "RECOVERED" STATUS_RATE_LIMITED = "RATE_LIMITED" STATUS_API_ERROR_TEMPORARY = "API_ERROR_TEMPORARY" STATUS_PERMANENT_FAILURE = "PERMANENT_FAILURE" STATUS_STOPPING = "STOPPING" STATUS_STOPPED = "STOPPED" class OpenSkyLiveAdapter(threading.Thread): """ Polls the OpenSky Network API, converts data to CanonicalFlightState, and sends structured messages (flight data or status updates) to an output queue. Implements exponential backoff for API errors. """ def __init__( self, output_queue: Queue[AdapterMessage], bounding_box: Dict[str, float], polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS, api_timeout: int = app_config.DEFAULT_API_TIMEOUT_SECONDS, daemon: bool = True, ): thread_name = f"OpenSkyLiveAdapter-bbox-{bounding_box.get('lat_min',0):.1f}" super().__init__(daemon=daemon, name=thread_name) if output_queue is None: module_logger.critical( f"{self.name}: Output queue cannot be None during initialization." ) raise ValueError("Output queue must be provided to OpenSkyLiveAdapter.") self.output_queue = output_queue self.bounding_box = bounding_box self.base_polling_interval = float(polling_interval) self.api_timeout = float(api_timeout) self._stop_event = threading.Event() self._consecutive_api_errors: int = 0 self._current_backoff_delay: float = 0.0 self._in_backoff_mode: bool = False module_logger.debug( f"{self.name} initialized. BBox: {self.bounding_box}, " f"Base Interval: {self.base_polling_interval}s, API Timeout: {self.api_timeout}s" ) def _generate_mock_flight_state(self, icao_suffix: int) -> CanonicalFlightState: """Helper to generate a single mock CanonicalFlightState.""" now = time.time() lat_center = (self.bounding_box["lat_min"] + self.bounding_box["lat_max"]) / 2 lon_center = (self.bounding_box["lon_min"] + self.bounding_box["lon_max"]) / 2 lat_span = self.bounding_box["lat_max"] - self.bounding_box["lat_min"] lon_span = self.bounding_box["lon_max"] - self.bounding_box["lon_min"] return CanonicalFlightState( icao24=f"mock{icao_suffix:02x}", callsign=f"MOCK{icao_suffix:02X}", origin_country="Mockland", timestamp=now, last_contact_timestamp=now, latitude=round( lat_center + random.uniform(-lat_span / 2.1, lat_span / 2.1), 4 ), longitude=round( lon_center + random.uniform(-lon_span / 2.1, lon_span / 2.1), 4 ), baro_altitude_m=random.uniform(1000, 12000), on_ground=random.choice([True, False]), velocity_mps=random.uniform(50, 250), true_track_deg=random.uniform(0, 360), vertical_rate_mps=random.uniform(-10, 10), squawk=str(random.randint(1000, 7777)), spi=random.choice([True, False]), position_source="MOCK_GENERATOR", raw_data_provider=f"{PROVIDER_NAME}-Mock", ) def _send_status_to_queue( self, status_code: str, message: str, details: Optional[Dict] = None ): """ Helper to put a status message into the output queue. Uses put_nowait and logs if the queue is full. """ status_payload = { "type": MSG_TYPE_ADAPTER_STATUS, "status_code": status_code, "message": f"{self.name}: {message}", } if details: status_payload["details"] = details # Use put_nowait for status messages. try: self.output_queue.put_nowait(status_payload) module_logger.debug( f"{self.name}: Sent status '{status_code}' to queue. Msg: {message}" ) # MODIFIED: Changed exception type to use queue.Full. # WHY: Refer to the exception using the module prefix. # HOW: Changed `QueueFull` to `queue.Full`. except queue.Full: module_logger.warning( f"{self.name}: Output queue full. Could not send status: {status_code}" ) except Exception as e: module_logger.error( f"{self.name}: Error sending status '{status_code}' to queue: {e}", exc_info=True, ) def stop(self): """Signals the thread to stop its execution loop.""" module_logger.info( f"Stop signal received for {self.name}. Signaling stop event." ) self._stop_event.set() def _parse_state_vector(self, raw_sv: list) -> Optional[CanonicalFlightState]: """Parses a raw OpenSky state vector into a CanonicalFlightState object.""" try: if not raw_sv or len(raw_sv) < 17: module_logger.warning( f"Skipping incomplete state vector (length {len(raw_sv)}): {raw_sv}" ) return None icao24 = raw_sv[0] if not isinstance(icao24, str) or not icao24.strip(): module_logger.warning( f"Skipping state vector due to invalid/missing ICAO24: '{icao24}' in {raw_sv}" ) return None api_time_position = float(raw_sv[3]) if raw_sv[3] is not None else None api_last_contact = float(raw_sv[4]) if raw_sv[4] is not None else None primary_timestamp: float if api_last_contact is not None and api_time_position is not None: primary_timestamp = max(api_last_contact, api_time_position) elif api_last_contact is not None: primary_timestamp = api_last_contact elif api_time_position is not None: primary_timestamp = api_time_position else: primary_timestamp = time.time() module_logger.warning( f"ICAO {icao24}: Using current time as primary_timestamp due to missing API timestamps (last_contact={api_last_contact}, time_position={api_time_position})." ) last_contact_for_model = ( api_last_contact if api_last_contact is not None else primary_timestamp ) state = CanonicalFlightState( icao24=icao24, callsign=raw_sv[1], origin_country=raw_sv[2], timestamp=primary_timestamp, last_contact_timestamp=last_contact_for_model, latitude=float(raw_sv[5]) if raw_sv[5] is not None else None, longitude=float(raw_sv[6]) if raw_sv[6] is not None else None, baro_altitude_m=float(raw_sv[7]) if raw_sv[7] is not None else None, on_ground=bool(raw_sv[8]), velocity_mps=float(raw_sv[9]) if raw_sv[9] is not None else None, true_track_deg=float(raw_sv[10]) if raw_sv[10] is not None else None, vertical_rate_mps=float(raw_sv[11]) if raw_sv[11] is not None else None, geo_altitude_m=float(raw_sv[13]) if raw_sv[13] is not None else None, squawk=raw_sv[14], spi=bool(raw_sv[15]), position_source=int(raw_sv[16]) if raw_sv[16] is not None else None, raw_data_provider=PROVIDER_NAME, ) return state except (TypeError, ValueError, IndexError) as e: module_logger.error( f"Error parsing state vector for ICAO '{raw_sv[0] if raw_sv else 'UNKNOWN'}': {e}. Vector: {raw_sv}", exc_info=False, ) return None def _perform_api_request(self) -> Dict[str, Any]: """ Performs API request or generates mock data based on config. Returns a structured result dictionary containing either 'data' or 'error_type' and details. """ # --- MOCK API Logic --- if app_config.USE_MOCK_OPENSKY_API: module_logger.info( f"{self.name}: Using MOCK API data as per configuration." ) self._send_status_to_queue( STATUS_FETCHING, "Generating mock flight data..." ) # Removed the time.sleep(0.5) as it was non-interruptible. # The delay between polling cycles is handled by wait(). if app_config.MOCK_API_ERROR_SIMULATION == "RATE_LIMITED": self._consecutive_api_errors += 1 self._in_backoff_mode = True mock_retry_after = str( getattr(app_config, "MOCK_RETRY_AFTER_SECONDS", 60) ) delay = self._calculate_next_backoff_delay( provided_retry_after=mock_retry_after ) err_msg = f"MOCK: Rate limited. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.warning(f"{self.name}: {err_msg}") return { "error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors, "status_code": 429, } if app_config.MOCK_API_ERROR_SIMULATION == "HTTP_ERROR": self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() mock_status_code = getattr(app_config, "MOCK_HTTP_ERROR_STATUS", 500) err_msg = f"MOCK: HTTP error {mock_status_code}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": mock_status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } mock_states: List[CanonicalFlightState] = [] mock_count = getattr(app_config, "MOCK_API_FLIGHT_COUNT", 5) for i in range(mock_count): mock_states.append(self._generate_mock_flight_state(i + 1)) module_logger.info( f"{self.name}: Generated {len(mock_states)} mock flight states." ) self._reset_error_state() return {"data": mock_states} # --- END MOCK API Logic --- # --- REAL API Call Logic --- if not self.bounding_box: err_msg = "Bounding box not set for REAL API request." module_logger.error(f"{self.name}: {err_msg}") return { "error_type": STATUS_PERMANENT_FAILURE, "message": err_msg, "status_code": "NO_BBOX", "delay": 0.0, "consecutive_errors": self._consecutive_api_errors, } params = { "lamin": self.bounding_box["lat_min"], "lomin": self.bounding_box["lon_min"], "lamax": self.bounding_box["lat_max"], "lomax": self.bounding_box["lon_max"], } self._send_status_to_queue( STATUS_FETCHING, f"Requesting REAL data for bbox: {self.bounding_box}" ) response: Optional[requests.Response] = None try: # The requests.get call includes a timeout and is the appropriate place for blocking network I/O. response = requests.get( app_config.OPENSKY_API_URL, params=params, timeout=self.api_timeout ) module_logger.debug( f"{self.name}: API Response Status: {response.status_code} {response.reason}" ) if response.status_code == 429: self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay( response.headers.get("Retry-After") ) err_msg = f"Rate limited (429). Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.warning(f"{self.name}: {err_msg}") return { "error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors, "status_code": 429, } response.raise_for_status() self._reset_error_state() response_data = response.json() raw_states_list = response_data.get("states") canonical_states: List[CanonicalFlightState] = [] if raw_states_list is not None: for raw_sv in raw_states_list: parsed_state = self._parse_state_vector(raw_sv) if parsed_state: canonical_states.append(parsed_state) module_logger.info( f"{self.name}: Fetched and parsed {len(canonical_states)} flight states." ) else: module_logger.info( f"{self.name}: API returned no flight states ('states' is null or empty)." ) return {"data": canonical_states} except requests.exceptions.HTTPError as http_err: self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() status_code = http_err.response.status_code if http_err.response else "N/A" err_msg = f"HTTP error {status_code}: {http_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } except requests.exceptions.Timeout: self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"API request timed out. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "TIMEOUT", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } except (requests.exceptions.RequestException, json.JSONDecodeError) as e: self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"Request/JSON error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=True) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "REQUEST_JSON_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } except Exception as e: self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"Unexpected critical error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.critical(f"{self.name}: {err_msg}", exc_info=True) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "UNEXPECTED_ADAPTER_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } def _calculate_next_backoff_delay( self, provided_retry_after: Optional[str] = None ) -> float: """Calculates exponential backoff delay, respecting Retry-After header if provided.""" calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * ( BACKOFF_FACTOR ** (self._consecutive_api_errors - 1) ) api_delay_from_header = 0.0 if provided_retry_after is not None and provided_retry_after.isdigit(): try: api_delay_from_header = float(provided_retry_after) module_logger.debug( f"{self.name}: Found Retry-After header: {provided_retry_after}s" ) except ValueError: logger.warning( f"{self.name}: Could not parse Retry-After header value: '{provided_retry_after}'" ) api_delay_from_header = 0.0 except Exception as e: logger.warning( f"{self.name}: Unexpected error parsing Retry-After header: {e}", exc_info=False, ) api_delay_from_header = 0.0 self._current_backoff_delay = max(api_delay_from_header, calculated_delay) self._current_backoff_delay = min( self._current_backoff_delay, MAX_BACKOFF_DELAY_SECONDS ) self._current_backoff_delay = max(self._current_backoff_delay, 1.0) module_logger.debug( f"{self.name}: Calculated next backoff delay: {self._current_backoff_delay:.1f}s" ) return self._current_backoff_delay def _reset_error_state(self): """Resets error counters and backoff mode, sends RECOVERED status if was in error.""" if self._in_backoff_mode or self._consecutive_api_errors > 0: module_logger.info( f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff." ) self._send_status_to_queue(STATUS_RECOVERED, "API connection recovered.") self._consecutive_api_errors = 0 self._current_backoff_delay = 0.0 self._in_backoff_mode = False def run(self): # Use wait() for the initial delay to make it interruptible. initial_settle_delay_seconds = 0.1 module_logger.debug( f"{self.name} thread starting initial settle delay ({initial_settle_delay_seconds}s)..." ) try: if self._stop_event.wait(timeout=initial_settle_delay_seconds): module_logger.info( f"{self.name} thread received stop signal during initial settle delay. Terminating early." ) return except Exception as e: module_logger.error( f"{self.name} unexpected error during initial settle delay: {e}", exc_info=True, ) return module_logger.info( f"{self.name} thread is fully operational. Base polling interval: {self.base_polling_interval:.1f}s." ) # Send STARTING status self._send_status_to_queue( STATUS_STARTING, "Adapter thread started, preparing initial fetch." ) # --- Main Adapter Loop --- while not self._stop_event.is_set(): if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD: perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) consecutive API errors. Stopping live updates." module_logger.critical(f"{self.name}: {perm_fail_msg}") self._send_status_to_queue(STATUS_PERMANENT_FAILURE, perm_fail_msg) break api_result = self._perform_api_request() if self._stop_event.is_set(): module_logger.info( f"{self.name}: Stop event detected after API request. Exiting loop." ) break if "data" in api_result: flight_data_payload: List[CanonicalFlightState] = api_result["data"] # Use put_nowait for data messages. try: self.output_queue.put_nowait( { "type": MSG_TYPE_FLIGHT_DATA, "payload": flight_data_payload, } ) module_logger.debug( f"{self.name}: Sent {len(flight_data_payload)} flight states to queue." ) # Catch QueueFull except queue.Full: module_logger.warning( f"{self.name}: Output queue full. Discarding {len(flight_data_payload)} flight states." ) except Exception as e: module_logger.error( f"{self.name}: Error putting flight data into queue: {e}", exc_info=True, ) elif "error_type" in api_result: error_details_for_controller = api_result.copy() error_details_for_controller["type"] = MSG_TYPE_ADAPTER_STATUS error_details_for_controller["status_code"] = api_result["error_type"] # Sending status already uses put_nowait() in _send_status_to_queue else: module_logger.error( f"{self.name}: Unknown result structure from _perform_api_request: {api_result}" ) time_to_wait_seconds: float if self._in_backoff_mode: time_to_wait_seconds = self._current_backoff_delay module_logger.debug( f"{self.name}: In backoff, next attempt in {time_to_wait_seconds:.1f}s." ) else: time_to_wait_seconds = self.base_polling_interval module_logger.debug( f"{self.name}: Next fetch cycle in {time_to_wait_seconds:.1f}s." ) if time_to_wait_seconds > 0: if self._stop_event.wait(timeout=time_to_wait_seconds): print( f"{self.name}: Stop event received during wait period. Exiting loop." ) break else: if self._stop_event.is_set(): print( f"{self.name}: Stop event detected before waiting (wait time <= 0). Exiting loop." ) break # --- End Main Adapter Loop --- # Added a log right after the loop exits, before the final cleanup block. print(f"{self.name}: Exited main loop. Attempting final actions.") # Send STATUS_STOPPED just before terminating # Use put_nowait for the final message and handle QueueFull explicitly. try: final_status_payload = { "type": MSG_TYPE_ADAPTER_STATUS, "status_code": STATUS_STOPPED, "message": f"{self.name}: Adapter thread stopped.", } # MODIFIED: Wrap the final put_nowait call in a try/except specifically for queue.Full. # WHY: Ensure that even if the queue is full and put_nowait raises QueueFull, # the thread still proceeds to termination without blocking. # HOW: Added the try/except queue.Full block around the put_nowait call. try: self.output_queue.put_nowait(final_status_payload) # Log success of sending final status - this log itself might block print(f"{self.name}: Successfully sent final STATUS_STOPPED to queue.") except queue.Full: # Log warning if queue full - this log itself might block print( f"{self.name}: Output queue full. Could not send final STATUS_STOPPED message." ) # except Exception as e: # Catch other errors during this final put # Log error - this log itself might block print( f"{self.name}: Error sending final STATUS_STOPPED message: {e}", exc_info=True, ) except Exception as e_outer_put: # Catch any exception *during* the attempt to send the final status message, # including issues with accessing the queue object itself. print( f"{self.name}: Unexpected error during final status message attempt: {e_outer_put}", exc_info=True, ) # Removed the intermediate debug log here. # This is the absolute last standard logger call. print(f"{self.name}: RUN method is terminating now.")