# FlightMonitor/data/opensky_live_adapter.py """ Adapter for fetching live flight data from the OpenSky Network API using the official opensky-api library. This adapter polls the API periodically, transforms the raw data into CanonicalFlightState objects, and puts structured messages into an output queue. """ import requests import time import threading # Still needed for threading.Event and general thread management if not using super().init directly from queue import ( Queue, Empty as QueueEmpty, Full as QueueFull, ) import random from typing import Dict, Any, List, Tuple, Optional from opensky_api import OpenSkyApi from . import config as app_config from ..utils.logger import get_logger from .common_models import CanonicalFlightState # MODIFICATO: Importa la nuova classe base # PERCHÉ: OpenSkyLiveAdapter deve ereditare da questa classe per conformarsi all'interfaccia. # DOVE: Inizio del file, sezione importazioni. # COME: Aggiunta l'importazione. from .base_live_data_adapter import BaseLiveDataAdapter, AdapterMessage # Import AdapterMessage to match Base class signature module_logger = get_logger(__name__) # --- Adapter Specific Constants --- PROVIDER_NAME = "OpenSkyNetwork" INITIAL_BACKOFF_DELAY_SECONDS = 20.0 MAX_BACKOFF_DELAY_SECONDS = 300.0 BACKOFF_FACTOR = 1.8 MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5 # --- Message Types and Status Codes for Output Queue --- # MODIFICATO: Rimosso AdapterMessage, FlightDataPayload, MSG_TYPE_* qui. # PERCHÉ: AdapterMessage è ora importato dalla base class, e le costanti MSG_TYPE_* e STATUS_* # dovrebbero idealmente essere centralizzate in un modulo comune per i messaggi dell'adapter, # ma per ora le abbiamo in LiveDataProcessor e opensky_live_adapter.py. # Qui le abbiamo solo se _send_status_to_queue ha bisogno di queste costanti. # Mantenute per _send_status_to_queue. # AdapterMessage = Dict[str, Any] # Già importato da BaseLiveDataAdapter # FlightDataPayload = List[CanonicalFlightState] # Non è una costante, ma un tipo MSG_TYPE_FLIGHT_DATA = "flight_data" MSG_TYPE_ADAPTER_STATUS = "adapter_status" STATUS_STARTING = "STARTING" STATUS_FETCHING = "FETCHING" STATUS_RECOVERED = "RECOVERED" STATUS_RATE_LIMITED = "RATE_LIMITED" STATUS_API_ERROR_TEMPORARY = "API_ERROR_TEMPORARY" STATUS_PERMANENT_FAILURE = "PERMANENT_FAILURE" STATUS_STOPPING = "STOPPING" STATUS_STOPPED = "STOPPED" # MODIFICATO: OpenSkyLiveAdapter ora eredita da BaseLiveDataAdapter # PERCHÉ: Per conformarsi all'interfaccia comune e riutilizzare la logica base del thread. # DOVE: Dichiarazione della classe. # COME: Cambiato `threading.Thread` a `BaseLiveDataAdapter`. class OpenSkyLiveAdapter(BaseLiveDataAdapter): """ Polls the OpenSky Network API using the opensky-api library, converts data to CanonicalFlightState, and sends structured messages (flight data or status updates) to an output queue. Implements exponential backoff for API errors not handled by the library. """ # MODIFICATO: Aggiornato la firma del costruttore per chiamare super().__init__ correttamente # PERCHÉ: Il costruttore della classe base (BaseLiveDataAdapter) ora gestisce l'inizializzazione # degli attributi comuni (output_queue, bounding_box, polling_interval, daemon, name). # DOVE: Inizio del metodo __init__. # COME: Aggiunto `super().__init__(output_queue, bounding_box, polling_interval, daemon, name=thread_name)`. def __init__( self, output_queue: Queue[AdapterMessage], bounding_box: Dict[str, float], polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS, daemon: bool = True, ): thread_name = f"OpenSkyLibAdapter-bbox-{bounding_box.get('lat_min',0):.1f}" super().__init__(output_queue, bounding_box, float(polling_interval), daemon, name=thread_name) # Call base class __init__ # MODIFICATO: Attributi comuni ora inizializzati nella base class # PERCHÉ: Evita la duplicazione e mantiene la coerenza. # DOVE: Rimozione delle seguenti righe. # self.output_queue = output_queue # self.bounding_box_dict = bounding_box # self.base_polling_interval = float(polling_interval) # self._stop_event = threading.Event() # Managed by BaseLiveDataAdapter self._consecutive_api_errors: int = 0 self._current_backoff_delay: float = 0.0 self._in_backoff_mode: bool = False try: username = app_config.OPENSKY_USERNAME password = app_config.OPENSKY_PASSWORD use_credentials_flag = getattr(app_config, "USE_OPENSKY_CREDENTIALS", False) api_timeout = getattr(app_config, "DEFAULT_API_TIMEOUT_SECONDS", 15) module_logger.info(f"{self.name}: OpenSkyApi client will use a timeout of {api_timeout} seconds.") if use_credentials_flag and username and password: self.api_client = OpenSkyApi(username=username, password=password)#, timeout=api_timeout) module_logger.info(f"{self.name}: OpenSkyApi client initialized with provided credentials and timeout.") else: self.api_client = OpenSkyApi()#timeout=api_timeout) log_msg = f"{self.name}: OpenSkyApi client initialized without credentials (anonymous access) and timeout. " if use_credentials_flag: log_msg += "USE_OPENSKY_CREDENTIALS is True but credentials not found/provided." else: log_msg += "USE_OPENSKY_CREDENTIALS is False." module_logger.warning(log_msg) except Exception as e: module_logger.critical( f"{self.name}: Failed to initialize OpenSkyApi client: {e}", exc_info=True, ) raise RuntimeError(f"Failed to initialize OpenSkyApi client: {e}") from e module_logger.debug( f"{self.name} initialized. BBox (dict): {self.bounding_box}, " # MODIFIED: Use self.bounding_box from base class f"Base Interval: {self.polling_interval}s" # MODIFIED: Use self.polling_interval from base class ) def _generate_mock_flight_state(self, icao_suffix: int) -> CanonicalFlightState: """Helper to generate a single mock CanonicalFlightState.""" now = time.time() # MODIFICATO: Usa self.bounding_box dalla base class lat_center = ( self.bounding_box["lat_min"] + self.bounding_box["lat_max"] ) / 2 lon_center = ( self.bounding_box["lon_min"] + self.bounding_box["lon_max"] ) / 2 lat_span = self.bounding_box["lat_max"] - self.bounding_box["lat_min"] lon_span = self.bounding_box["lon_max"] - self.bounding_box["lon_min"] return CanonicalFlightState( icao24=f"mock{icao_suffix:02x}", callsign=f"MOCK{icao_suffix:02X}", origin_country="Mockland", timestamp=now, last_contact_timestamp=now, latitude=round( lat_center + random.uniform(-lat_span / 2.1, lat_span / 2.1), 4 ), longitude=round( lon_center + random.uniform(-lon_span / 2.1, lon_span / 2.1), 4 ), baro_altitude_m=random.uniform(1000, 12000), geo_altitude_m=random.uniform(1000, 12000) + random.uniform(-200, 200), on_ground=random.choice([True, False]), velocity_mps=random.uniform(50, 250), true_track_deg=random.uniform(0, 360), vertical_rate_mps=random.uniform(-10, 10), squawk=str(random.randint(1000, 7777)), spi=random.choice([True, False]), position_source=0, raw_data_provider=f"{PROVIDER_NAME}-Mock", ) def _send_status_to_queue( self, status_code: str, message: str, details: Optional[Dict] = None ): status_payload = { "type": MSG_TYPE_ADAPTER_STATUS, "status_code": status_code, "message": f"{self.name}: {message}", } if details: status_payload["details"] = details try: self.output_queue.put_nowait(status_payload) module_logger.debug( f"{self.name}: Sent status '{status_code}' to queue. Msg: {message}" ) except QueueFull: module_logger.warning( f"{self.name}: Output queue full. Could not send status: {status_code}" ) except Exception as e: module_logger.error( f"{self.name}: Error sending status '{status_code}' to queue: {e}", exc_info=True, ) # MODIFICATO: Rimosso il metodo stop. # PERCHÉ: Ora è implementato nella BaseLiveDataAdapter e può essere chiamato tramite super().stop() # DOVE: L'intero metodo è stato eliminato. # def stop(self): # module_logger.info( # f"Stop signal received for {self.name}. Signaling stop event." # ) # self._stop_event.set() def _convert_state_vector_to_canonical( self, sv: Any ) -> Optional[CanonicalFlightState]: """Converts an OpenSkyApi StateVector object to a CanonicalFlightState object.""" try: if sv.icao24 is None: module_logger.warning(f"Skipping StateVector with None icao24: {sv}") return None primary_ts = ( sv.time_position if sv.time_position is not None else sv.last_contact ) last_contact_ts = sv.last_contact if primary_ts is None: primary_ts = time.time() module_logger.warning( f"ICAO {sv.icao24}: Using current time as primary_timestamp due to missing API timestamps (last_contact={sv.last_contact}, time_position={sv.time_position})." ) if last_contact_ts is None: last_contact_ts = primary_ts return CanonicalFlightState( icao24=sv.icao24, callsign=( sv.callsign.strip() if sv.callsign else None ), origin_country=sv.origin_country, timestamp=float(primary_ts), last_contact_timestamp=float(last_contact_ts), latitude=sv.latitude, longitude=sv.longitude, baro_altitude_m=sv.baro_altitude, on_ground=sv.on_ground, velocity_mps=sv.velocity, true_track_deg=sv.true_track, vertical_rate_mps=sv.vertical_rate, geo_altitude_m=sv.geo_altitude, squawk=sv.squawk, spi=sv.spi, position_source=( str(sv.position_source) if sv.position_source is not None else None ), raw_data_provider=PROVIDER_NAME, ) except AttributeError as e_attr: module_logger.error( f"AttributeError parsing StateVector: {e_attr}. Vector: {sv}", exc_info=False, ) return None except Exception as e: module_logger.error( f"Error converting StateVector for ICAO '{getattr(sv, 'icao24', 'UNKNOWN')}': {e}. Vector: {sv}", exc_info=True, ) return None def _perform_api_request(self) -> Dict[str, Any]: """ Performs API request using opensky-api library or generates mock data. Returns a structured result dictionary. """ if app_config.USE_MOCK_OPENSKY_API: module_logger.info(f"{self.name}: Using MOCK API data as per configuration.") self._send_status_to_queue(STATUS_FETCHING, "Generating mock flight data...") if app_config.MOCK_API_ERROR_SIMULATION == "RATE_LIMITED": self._consecutive_api_errors += 1 self._in_backoff_mode = True mock_retry_after = str(getattr(app_config, "MOCK_RETRY_AFTER_SECONDS", 60)) delay = self._calculate_next_backoff_delay(provided_retry_after=mock_retry_after) err_msg = f"MOCK: Rate limited. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.warning(f"{self.name}: {err_msg}") return { "error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors, "status_code": 429, } if app_config.MOCK_API_ERROR_SIMULATION == "HTTP_ERROR": self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() mock_status_code = getattr(app_config, "MOCK_HTTP_ERROR_STATUS", 500) err_msg = f"MOCK: HTTP error {mock_status_code}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": mock_status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } mock_states: List[CanonicalFlightState] = [] mock_count = getattr(app_config, "MOCK_API_FLIGHT_COUNT", 5) for i in range(mock_count): mock_states.append(self._generate_mock_flight_state(i + 1)) module_logger.info(f"{self.name}: Generated {len(mock_states)} mock flight states.") self._reset_error_state() return {"data": mock_states} # MODIFICATO: Usa self.bounding_box dalla base class if not self.bounding_box: err_msg = "Bounding box not set for REAL API request." module_logger.error(f"{self.name}: {err_msg}") return { "error_type": STATUS_PERMANENT_FAILURE, "message": err_msg, "status_code": "NO_BBOX", "delay": 0.0, "consecutive_errors": self._consecutive_api_errors, } try: api_bbox_tuple = ( self.bounding_box["lat_min"], self.bounding_box["lat_max"], self.bounding_box["lon_min"], self.bounding_box["lon_max"], ) except KeyError as e_key: err_msg = f"Missing key in bounding_box_dict: {e_key}" module_logger.error(f"{self.name}: {err_msg}") return { "error_type": STATUS_PERMANENT_FAILURE, "message": err_msg, "status_code": "INVALID_BBOX_FORMAT", "delay": 0.0, "consecutive_errors": self._consecutive_api_errors, } self._send_status_to_queue(STATUS_FETCHING, f"Requesting REAL data for bbox (tuple): {api_bbox_tuple}") try: states_response = self.api_client.get_states(bbox=api_bbox_tuple) self._reset_error_state() canonical_states: List[CanonicalFlightState] = [] if states_response and states_response.states: for sv in states_response.states: canonical_sv = self._convert_state_vector_to_canonical(sv) if canonical_sv: canonical_states.append(canonical_sv) module_logger.info(f"{self.name}: Fetched and parsed {len(canonical_states)} flight states via opensky-api.") else: module_logger.info(f"{self.name}: API returned no flight states (states_response or .states is null/empty).") return {"data": canonical_states} except ( requests.exceptions.HTTPError ) as http_err: self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() status_code = http_err.response.status_code if http_err.response else "N/A" if status_code == 429: err_msg = f"UNEXPECTED Rate limit (429) propagated by library. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}") return { "error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors, "status_code": 429, } else: err_msg = f"HTTP error {status_code} from opensky-api: {http_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } except requests.exceptions.Timeout: self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"API request timed out (via opensky-api). Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "TIMEOUT", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } except ( requests.exceptions.RequestException ) as req_err: self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"Request error via opensky-api: {req_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=True) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "REQUEST_EXCEPTION", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } except Exception as e: self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"Unexpected critical error using opensky-api: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.critical(f"{self.name}: {err_msg}", exc_info=True) return { "error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "UNEXPECTED_LIBRARY_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors, } def _calculate_next_backoff_delay( self, provided_retry_after: Optional[str] = None, ) -> float: calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * ( BACKOFF_FACTOR ** (self._consecutive_api_errors - 1) ) api_delay_from_header = 0.0 if provided_retry_after is not None and provided_retry_after.isdigit(): try: api_delay_from_header = float(provided_retry_after) module_logger.debug(f"{self.name}: Found Retry-After header value: {provided_retry_after}s") except ValueError: module_logger.warning(f"{self.name}: Could not parse Retry-After header value: '{provided_retry_after}'") api_delay_from_header = 0.0 self._current_backoff_delay = max(api_delay_from_header, calculated_delay) self._current_backoff_delay = min(self._current_backoff_delay, MAX_BACKOFF_DELAY_SECONDS) self._current_backoff_delay = max(self._current_backoff_delay, 1.0) module_logger.debug(f"{self.name}: Calculated next backoff delay: {self._current_backoff_delay:.1f}s") return self._current_backoff_delay def _reset_error_state(self): if self._in_backoff_mode or self._consecutive_api_errors > 0: module_logger.info(f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff.") self._send_status_to_queue(STATUS_RECOVERED, "API connection recovered.") self._consecutive_api_errors = 0 self._current_backoff_delay = 0.0 self._in_backoff_mode = False # MODIFICATO: Il metodo run ora gestisce il loop di fetching e invia i messaggi alla coda di output. # PERCHÉ: OpenSkyLiveAdapter, in quanto Thread, deve implementare il suo ciclo di vita qui. # DOVE: Questo è il ciclo principale del thread adapter. # COME: Implementa il polling e l'invio alla coda. def run(self): initial_settle_delay_seconds = 0.1 module_logger.debug(f"{self.name} thread starting initial settle delay ({initial_settle_delay_seconds}s)...") if self._stop_event.wait(timeout=initial_settle_delay_seconds): module_logger.info(f"{self.name} thread received stop signal during initial settle. Terminating.") return module_logger.info(f"{self.name} thread is fully operational. Base polling interval: {self.polling_interval:.1f}s.") self._send_status_to_queue(STATUS_STARTING, "Adapter thread started, preparing initial fetch.") while not self._stop_event.is_set(): # Loop continua finché non viene segnalato l'arresto if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD: perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) consecutive API errors. Stopping live updates." module_logger.critical(f"{self.name}: {perm_fail_msg}") self._send_status_to_queue(STATUS_PERMANENT_FAILURE, perm_fail_msg) break # Esce dal loop principale del thread api_result = self._perform_api_request() # Esegue la richiesta API o genera mock if self._stop_event.is_set(): # Controlla di nuovo dopo la chiamata API (potrebbe essere lunga) module_logger.info(f"{self.name}: Stop event detected after API request. Exiting loop.") break if "data" in api_result: flight_data_payload: List[CanonicalFlightState] = api_result["data"] try: self.output_queue.put_nowait({"type": MSG_TYPE_FLIGHT_DATA, "payload": flight_data_payload}) module_logger.debug(f"{self.name}: Sent {len(flight_data_payload)} flight states to queue.") except QueueFull: module_logger.warning(f"{self.name}: Output queue full. Discarding {len(flight_data_payload)} flight states.") except Exception as e: module_logger.error(f"{self.name}: Error putting flight data into queue: {e}", exc_info=True) elif ("error_type" in api_result): # Se _perform_api_request ha restituito un errore strutturato error_details_for_controller = api_result.copy() error_details_for_controller["type"] = MSG_TYPE_ADAPTER_STATUS self._send_status_to_queue( status_code=api_result["error_type"], message=api_result.get("message", "An API error occurred."), details=api_result, ) else: # Struttura risultato sconosciuta module_logger.error(f"{self.name}: Unknown result structure from _perform_api_request: {api_result}") time_to_wait_seconds: float if self._in_backoff_mode: time_to_wait_seconds = self._current_backoff_delay module_logger.debug(f"{self.name}: In backoff, next attempt in {time_to_wait_seconds:.1f}s.") else: time_to_wait_seconds = self.polling_interval # MODIFICATO: Usa self.polling_interval dalla base class module_logger.debug(f"{self.name}: Next fetch cycle in {time_to_wait_seconds:.1f}s.") if time_to_wait_seconds > 0: if self._stop_event.wait(timeout=time_to_wait_seconds): module_logger.info(f"{self.name}: Stop event received during wait period. Exiting loop.") break else: # Se il tempo di attesa è zero o negativo, controlla comunque lo stop event if self._stop_event.is_set(): module_logger.info(f"{self.name}: Stop event detected before waiting (wait time <= 0). Exiting loop.") break # --- End Main Adapter Loop --- module_logger.info(f"{self.name}: Exited main loop. Sending final STOPPED status.") try: final_status_payload = { "type": MSG_TYPE_ADAPTER_STATUS, "status_code": STATUS_STOPPED, "message": f"{self.name}: Adapter thread stopped.", } self.output_queue.put_nowait(final_status_payload) module_logger.info(f"{self.name}: Successfully sent final STATUS_STOPPED to queue.") except QueueFull: module_logger.warning(f"{self.name}: Output queue full. Could not send final STATUS_STOPPED message.") except Exception as e_final_put: module_logger.error(f"{self.name}: Error sending final STATUS_STOPPED message: {e_final_put}", exc_info=True) module_logger.info(f"{self.name}: RUN method is terminating now.")