# FlightMonitor/data/opensky_live_adapter.py
"""
Adapter for fetching live flight data from the OpenSky Network API using the official opensky-api library.
This adapter polls the API periodically, transforms the raw data into CanonicalFlightState objects,
and puts structured messages into an output queue.
"""
import random
import threading
import time
import traceback
from queue import Queue, Empty as QueueEmpty, Full as QueueFull  # QueueFull guards the output queue
from typing import Dict, Any, List, Tuple, Optional

# opensky-api is built on top of requests, so requests exceptions may surface
# from library calls and are handled explicitly below.
import requests

# Official client library used to talk to the OpenSky API. No library-specific
# exception types are imported: errors are expected to arrive as requests
# exceptions (NOTE(review): confirm against the installed library version).
from opensky_api import OpenSkyApi

from . import config as app_config
from ..utils.logger import get_logger
from .common_models import CanonicalFlightState

module_logger = get_logger(__name__)

# --- Adapter Specific Constants ---
PROVIDER_NAME = "OpenSkyNetwork"  # Identifies the data source in CanonicalFlightState
INITIAL_BACKOFF_DELAY_SECONDS = 20.0
MAX_BACKOFF_DELAY_SECONDS = 300.0
BACKOFF_FACTOR = 1.8
MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5

# --- Message Types and Status Codes for Output Queue ---
AdapterMessage = Dict[str, Any]
FlightDataPayload = List[CanonicalFlightState]

MSG_TYPE_FLIGHT_DATA = "flight_data"
MSG_TYPE_ADAPTER_STATUS = "adapter_status"

STATUS_STARTING = "STARTING"
STATUS_FETCHING = "FETCHING"
STATUS_RECOVERED = "RECOVERED"
STATUS_RATE_LIMITED = "RATE_LIMITED"  # May become unused if the library handles rate limiting itself
STATUS_API_ERROR_TEMPORARY = "API_ERROR_TEMPORARY"
STATUS_PERMANENT_FAILURE = "PERMANENT_FAILURE"
STATUS_STOPPING = "STOPPING"
STATUS_STOPPED = "STOPPED"


class OpenSkyLiveAdapter(threading.Thread):
    """
    Polls the OpenSky Network API using the opensky-api library,
    converts data to CanonicalFlightState, and sends structured messages
    (flight data or status updates) to an output queue.
    Implements exponential backoff for API errors not handled by the library.
    """

    def __init__(
        self,
        output_queue: Queue[AdapterMessage],
        bounding_box: Dict[str, float],  # Format: {"lat_min": ..., "lon_min": ..., "lat_max": ..., "lon_max": ...}
        polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS,
        # No api_timeout parameter: the HTTP call is made by the library.
        daemon: bool = True,
    ):
        """Create the adapter thread and its OpenSkyApi client.

        Args:
            output_queue: Queue that receives flight-data and status messages.
            bounding_box: Area of interest as a dict with the keys
                ``lat_min``, ``lat_max``, ``lon_min`` and ``lon_max``.
            polling_interval: Seconds between fetches when not backing off.
            daemon: Whether the thread is a daemon thread.

        Raises:
            ValueError: If ``output_queue`` is None.
            RuntimeError: If the OpenSkyApi client cannot be constructed.
        """
        thread_name = f"OpenSkyLibAdapter-bbox-{bounding_box.get('lat_min',0):.1f}"
        super().__init__(daemon=daemon, name=thread_name)

        if output_queue is None:
            module_logger.critical(
                f"{self.name}: Output queue cannot be None during initialization."
            )
            raise ValueError("Output queue must be provided to OpenSkyLiveAdapter.")

        self.output_queue = output_queue
        self.bounding_box_dict = bounding_box  # Kept in the application's own dict format
        self.base_polling_interval = float(polling_interval)
        self._stop_event = threading.Event()

        # Backoff bookkeeping, updated by the failure/recovery helpers.
        self._consecutive_api_errors: int = 0
        self._current_backoff_delay: float = 0.0
        self._in_backoff_mode: bool = False

        try:
            self.api_client = OpenSkyApi()
            module_logger.info(f"{self.name}: OpenSkyApi client initialized successfully.")
        except Exception as e:
            module_logger.critical(
                f"{self.name}: Failed to initialize OpenSkyApi client: {e}", exc_info=True
            )
            # Fail construction so a thread without a client can never be started.
            raise RuntimeError(f"Failed to initialize OpenSkyApi client: {e}") from e

        module_logger.debug(
            f"{self.name} initialized. BBox (dict): {self.bounding_box_dict}, "
            f"Base Interval: {self.base_polling_interval}s"
        )

    def _generate_mock_flight_state(self, icao_suffix: int) -> CanonicalFlightState:
        """Helper to generate a single mock CanonicalFlightState.

        The position is drawn at random around the centre of the configured
        bounding box; all other fields are random placeholder values.

        Args:
            icao_suffix: Number used to build the mock ICAO24 and callsign.
        """
        now = time.time()
        bbox = self.bounding_box_dict
        lat_center = (bbox["lat_min"] + bbox["lat_max"]) / 2
        lon_center = (bbox["lon_min"] + bbox["lon_max"]) / 2
        lat_span = bbox["lat_max"] - bbox["lat_min"]
        lon_span = bbox["lon_max"] - bbox["lon_min"]

        return CanonicalFlightState(
            icao24=f"mock{icao_suffix:02x}",
            callsign=f"MOCK{icao_suffix:02X}",
            origin_country="Mockland",
            timestamp=now,  # Primary (position) timestamp
            last_contact_timestamp=now,  # Last-contact timestamp
            # Dividing the span by 2.1 keeps the point strictly inside the box.
            latitude=round(
                lat_center + random.uniform(-lat_span / 2.1, lat_span / 2.1), 4
            ),
            longitude=round(
                lon_center + random.uniform(-lon_span / 2.1, lon_span / 2.1), 4
            ),
            baro_altitude_m=random.uniform(1000, 12000),
            geo_altitude_m=random.uniform(1000, 12000) + random.uniform(-200, 200),  # Fake geometric altitude
            on_ground=random.choice([True, False]),
            velocity_mps=random.uniform(50, 250),
            true_track_deg=random.uniform(0, 360),
            vertical_rate_mps=random.uniform(-10, 10),
            squawk=str(random.randint(1000, 7777)),
            spi=random.choice([True, False]),
            # NOTE(review): the real-data converter passes position_source as a
            # string while this mock passes an int - confirm which type
            # CanonicalFlightState expects.
            position_source=0,  # ADS-B (fake)
            raw_data_provider=f"{PROVIDER_NAME}-Mock",
        )

    def _send_status_to_queue(
        self, status_code: str, message: str, details: Optional[Dict] = None
    ):
        """Put an adapter-status message on the output queue without blocking.

        A full queue or any other queue error is logged and swallowed so a
        status update can never stop the polling thread.

        Args:
            status_code: One of the ``STATUS_*`` constants.
            message: Human-readable text; the thread name is prepended.
            details: Optional extra data attached under the ``"details"`` key.
        """
        status_payload = {
            "type": MSG_TYPE_ADAPTER_STATUS,
            "status_code": status_code,
            "message": f"{self.name}: {message}",
        }
        if details:
            status_payload["details"] = details

        try:
            self.output_queue.put_nowait(status_payload)
            module_logger.debug(
                f"{self.name}: Sent status '{status_code}' to queue. Msg: {message}"
            )
        except QueueFull:
            module_logger.warning(
                f"{self.name}: Output queue full. Could not send status: {status_code}"
            )
        except Exception as e:
            module_logger.error(
                f"{self.name}: Error sending status '{status_code}' to queue: {e}",
                exc_info=True,
            )

    def stop(self):
        """Ask the polling loop to terminate by setting the stop event."""
        module_logger.info(
            f"Stop signal received for {self.name}. Signaling stop event."
        )
        self._stop_event.set()

    def _convert_state_vector_to_canonical(self, sv: Any) -> Optional[CanonicalFlightState]:
        """Converts an OpenSkyApi StateVector object to a CanonicalFlightState object.

        Args:
            sv: State vector object as returned by the opensky-api library.

        Returns:
            The converted state, or None when ``icao24`` is missing or the
            conversion raises.
        """
        try:
            if sv.icao24 is None:  # icao24 is mandatory
                module_logger.warning(f"Skipping StateVector with None icao24: {sv}")
                return None

            # The canonical primary timestamp is the position/state time:
            # prefer time_position, fall back to last_contact.
            primary_ts = sv.time_position if sv.time_position is not None else sv.last_contact
            last_contact_ts = sv.last_contact

            if primary_ts is None:
                # Both API timestamps are missing: fall back to the current time.
                primary_ts = time.time()
                module_logger.warning(f"ICAO {sv.icao24}: Using current time as primary_timestamp due to missing API timestamps (last_contact={sv.last_contact}, time_position={sv.time_position}).")
            if last_contact_ts is None:
                last_contact_ts = primary_ts

            # NOTE(review): values are forwarded unchanged on the assumption
            # that the library already reports metres and m/s - confirm
            # against the library documentation.
            return CanonicalFlightState(
                icao24=sv.icao24,
                callsign=sv.callsign.strip() if sv.callsign else None,  # Remove padding whitespace
                origin_country=sv.origin_country,
                timestamp=float(primary_ts),
                last_contact_timestamp=float(last_contact_ts),
                latitude=sv.latitude,
                longitude=sv.longitude,
                baro_altitude_m=sv.baro_altitude,
                on_ground=sv.on_ground,
                velocity_mps=sv.velocity,
                true_track_deg=sv.true_track,
                vertical_rate_mps=sv.vertical_rate,
                geo_altitude_m=sv.geo_altitude,
                squawk=sv.squawk,
                spi=sv.spi,
                position_source=str(sv.position_source) if sv.position_source is not None else None,
                raw_data_provider=PROVIDER_NAME,
            )
        except AttributeError as e_attr:
            module_logger.error(f"AttributeError parsing StateVector: {e_attr}. Vector: {sv}", exc_info=False)
            return None
        except Exception as e:
            module_logger.error(f"Error converting StateVector for ICAO '{getattr(sv, 'icao24', 'UNKNOWN')}': {e}. Vector: {sv}", exc_info=True)
            return None

    def _register_api_failure(self, provided_retry_after: Optional[str] = None) -> float:
        """Record one more consecutive failure and return the next backoff delay."""
        self._consecutive_api_errors += 1
        self._in_backoff_mode = True
        return self._calculate_next_backoff_delay(provided_retry_after=provided_retry_after)

    def _perform_mock_api_request(self) -> Dict[str, Any]:
        """Produce mock flight data or a simulated error, driven by app_config."""
        module_logger.info(f"{self.name}: Using MOCK API data as per configuration.")
        self._send_status_to_queue(STATUS_FETCHING, "Generating mock flight data...")

        if app_config.MOCK_API_ERROR_SIMULATION == "RATE_LIMITED":  # Simulated rate limit
            mock_retry_after = str(getattr(app_config, "MOCK_RETRY_AFTER_SECONDS", 60))
            delay = self._register_api_failure(provided_retry_after=mock_retry_after)
            err_msg = f"MOCK: Rate limited. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.warning(f"{self.name}: {err_msg}")
            return {"error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors, "status_code": 429}

        if app_config.MOCK_API_ERROR_SIMULATION == "HTTP_ERROR":  # Simulated generic HTTP error
            delay = self._register_api_failure()
            mock_status_code = getattr(app_config, "MOCK_HTTP_ERROR_STATUS", 500)
            err_msg = f"MOCK: HTTP error {mock_status_code}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
            return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": mock_status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}

        mock_count = getattr(app_config, "MOCK_API_FLIGHT_COUNT", 5)
        mock_states: List[CanonicalFlightState] = [
            self._generate_mock_flight_state(suffix) for suffix in range(1, mock_count + 1)
        ]
        module_logger.info(f"{self.name}: Generated {len(mock_states)} mock flight states.")
        self._reset_error_state()  # A successful mock fetch clears the error state
        return {"data": mock_states}

    def _perform_api_request(self) -> Dict[str, Any]:
        """
        Performs API request using opensky-api library or generates mock data.
        Returns a structured result dictionary.

        Returns:
            ``{"data": [...]}`` on success, otherwise a dict carrying
            ``error_type``, ``message``, ``status_code``, ``delay`` and
            ``consecutive_errors``.
        """
        if app_config.USE_MOCK_OPENSKY_API:
            return self._perform_mock_api_request()

        # --- REAL API call via opensky-api ---
        if not self.bounding_box_dict:
            err_msg = "Bounding box not set for REAL API request."
            module_logger.error(f"{self.name}: {err_msg}")
            return {"error_type": STATUS_PERMANENT_FAILURE, "message": err_msg, "status_code": "NO_BBOX", "delay": 0.0, "consecutive_errors": self._consecutive_api_errors}

        # The library expects the bbox as a tuple (min_lat, max_lat, min_lon, max_lon).
        try:
            api_bbox_tuple = (
                self.bounding_box_dict["lat_min"],
                self.bounding_box_dict["lat_max"],
                self.bounding_box_dict["lon_min"],
                self.bounding_box_dict["lon_max"],
            )
        except KeyError as e_key:
            err_msg = f"Missing key in bounding_box_dict: {e_key}"
            module_logger.error(f"{self.name}: {err_msg}")
            return {"error_type": STATUS_PERMANENT_FAILURE, "message": err_msg, "status_code": "INVALID_BBOX_FORMAT", "delay": 0.0, "consecutive_errors": self._consecutive_api_errors}

        self._send_status_to_queue(
            STATUS_FETCHING, f"Requesting REAL data for bbox (tuple): {api_bbox_tuple}"
        )

        try:
            states_response = self.api_client.get_states(bbox=api_bbox_tuple)
            # If get_states() returned at all, the call is treated as a success
            # (any rate limiting is assumed to be handled inside the library).
            self._reset_error_state()

            canonical_states: List[CanonicalFlightState] = []
            if states_response and states_response.states:
                for sv in states_response.states:
                    canonical_sv = self._convert_state_vector_to_canonical(sv)
                    if canonical_sv:
                        canonical_states.append(canonical_sv)
                module_logger.info(
                    f"{self.name}: Fetched and parsed {len(canonical_states)} flight states via opensky-api."
                )
            else:
                module_logger.info(
                    f"{self.name}: API returned no flight states (states_response or .states is null/empty)."
                )
            return {"data": canonical_states}

        except requests.exceptions.HTTPError as http_err:
            delay = self._register_api_failure()  # No Retry-After header is available here
            # A requests.Response is falsy for every 4xx/5xx status, so the
            # response must be compared against None instead of truth-tested;
            # otherwise the status code of an error response is never read.
            status_code = http_err.response.status_code if http_err.response is not None else "N/A"
            if status_code == 429:
                # The library is expected to absorb rate limiting; seeing 429 here is unexpected.
                err_msg = f"UNEXPECTED Rate limit (429) propagated by library. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
                module_logger.error(f"{self.name}: {err_msg}")
                return {"error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors, "status_code": 429}
            err_msg = f"HTTP error {status_code} from opensky-api: {http_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.error(f"{self.name}: {err_msg}", exc_info=False)  # Skip the requests stack trace
            return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}

        except requests.exceptions.Timeout:
            delay = self._register_api_failure()
            err_msg = f"API request timed out (via opensky-api). Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
            return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "TIMEOUT", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}

        except requests.exceptions.RequestException as req_err:  # Other requests errors, e.g. ConnectionError
            delay = self._register_api_failure()
            err_msg = f"Request error via opensky-api: {req_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.error(f"{self.name}: {err_msg}", exc_info=True)
            return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "REQUEST_EXCEPTION", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}

        except Exception as e:  # Anything unforeseen still goes through backoff
            delay = self._register_api_failure()
            err_msg = f"Unexpected critical error using opensky-api: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.critical(f"{self.name}: {err_msg}", exc_info=True)
            return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "UNEXPECTED_LIBRARY_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}

    def _calculate_next_backoff_delay(
        self, provided_retry_after: Optional[str] = None  # Used by the mock path and for an unexpected 429
    ) -> float:
        """Compute, store and return the next backoff delay in seconds.

        The delay is the larger of the exponential delay derived from the
        consecutive-error count and the optional Retry-After value, capped at
        ``MAX_BACKOFF_DELAY_SECONDS`` and never below one second.

        Args:
            provided_retry_after: Retry-After value as a digit string, if any.
        """
        calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * (
            BACKOFF_FACTOR ** (self._consecutive_api_errors - 1)
        )
        api_delay_from_header = 0.0
        if provided_retry_after is not None and provided_retry_after.isdigit():
            try:
                api_delay_from_header = float(provided_retry_after)
                module_logger.debug(
                    f"{self.name}: Found Retry-After header value: {provided_retry_after}s"
                )
            except ValueError:
                module_logger.warning(
                    f"{self.name}: Could not parse Retry-After header value: '{provided_retry_after}'"
                )
                api_delay_from_header = 0.0

        self._current_backoff_delay = max(api_delay_from_header, calculated_delay)
        self._current_backoff_delay = min(
            self._current_backoff_delay, MAX_BACKOFF_DELAY_SECONDS
        )
        self._current_backoff_delay = max(self._current_backoff_delay, 1.0)  # Enforce a minimum delay

        module_logger.debug(
            f"{self.name}: Calculated next backoff delay: {self._current_backoff_delay:.1f}s"
        )
        return self._current_backoff_delay

    def _reset_error_state(self):
        """Clear the backoff state, emitting STATUS_RECOVERED if an error was pending."""
        if self._in_backoff_mode or self._consecutive_api_errors > 0:
            module_logger.info(
                f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff."
            )
            self._send_status_to_queue(STATUS_RECOVERED, "API connection recovered.")

        self._consecutive_api_errors = 0
        self._current_backoff_delay = 0.0
        self._in_backoff_mode = False

    def run(self):
        """Thread body: poll until stopped or until too many consecutive errors.

        Each cycle performs one request, forwards either the flight data or a
        status message to the output queue, then waits for the base polling
        interval (or the current backoff delay). A final STATUS_STOPPED message
        is sent once the loop exits.
        """
        initial_settle_delay_seconds = 0.1  # Short pause so the GUI can settle if needed
        module_logger.debug(f"{self.name} thread starting initial settle delay ({initial_settle_delay_seconds}s)...")
        if self._stop_event.wait(timeout=initial_settle_delay_seconds):
            module_logger.info(f"{self.name} thread received stop signal during initial settle. Terminating.")
            return

        module_logger.info(f"{self.name} thread is fully operational. Base polling interval: {self.base_polling_interval:.1f}s.")
        self._send_status_to_queue(STATUS_STARTING, "Adapter thread started, preparing initial fetch.")

        while not self._stop_event.is_set():
            if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD:
                perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) consecutive API errors. Stopping live updates."
                module_logger.critical(f"{self.name}: {perm_fail_msg}")
                self._send_status_to_queue(STATUS_PERMANENT_FAILURE, perm_fail_msg)
                break

            api_result = self._perform_api_request()

            # The request may have taken a while; re-check before publishing.
            if self._stop_event.is_set():
                module_logger.info(f"{self.name}: Stop event detected after API request. Exiting loop.")
                break

            if "data" in api_result:
                flight_data_payload: List[CanonicalFlightState] = api_result["data"]
                try:
                    self.output_queue.put_nowait(
                        {"type": MSG_TYPE_FLIGHT_DATA, "payload": flight_data_payload}
                    )
                    module_logger.debug(
                        f"{self.name}: Sent {len(flight_data_payload)} flight states to queue."
                    )
                except QueueFull:
                    module_logger.warning(
                        f"{self.name}: Output queue full. Discarding {len(flight_data_payload)} flight states."
                    )
                except Exception as e:
                    module_logger.error(f"{self.name}: Error putting flight data into queue: {e}", exc_info=True)

            elif "error_type" in api_result:
                # Forward the structured error; delay and consecutive_errors
                # travel in the details for the consumer.
                self._send_status_to_queue(
                    status_code=api_result["error_type"],  # e.g. STATUS_API_ERROR_TEMPORARY
                    message=api_result.get("message", "An API error occurred."),
                    details=api_result,
                )
            else:
                module_logger.error(f"{self.name}: Unknown result structure from _perform_api_request: {api_result}")

            time_to_wait_seconds: float
            if self._in_backoff_mode:
                time_to_wait_seconds = self._current_backoff_delay
                module_logger.debug(f"{self.name}: In backoff, next attempt in {time_to_wait_seconds:.1f}s.")
            else:
                time_to_wait_seconds = self.base_polling_interval
                module_logger.debug(f"{self.name}: Next fetch cycle in {time_to_wait_seconds:.1f}s.")

            if time_to_wait_seconds > 0:
                if self._stop_event.wait(timeout=time_to_wait_seconds):
                    print(f"{self.name}: Stop event received during wait period. Exiting loop.")
                    break
            else:
                # Nothing to wait for, but still honour a pending stop request.
                if self._stop_event.is_set():
                    print(f"{self.name}: Stop event detected before waiting (wait time <= 0). Exiting loop.")
                    break
        # --- End Main Adapter Loop ---

        print(f"{self.name}: Exited main loop. Sending final STOPPED status.")
        try:
            final_status_payload = {
                "type": MSG_TYPE_ADAPTER_STATUS,
                "status_code": STATUS_STOPPED,
                "message": f"{self.name}: Adapter thread stopped.",
            }
            self.output_queue.put_nowait(final_status_payload)
            print(f"{self.name}: Successfully sent final STATUS_STOPPED to queue.")
        except QueueFull:
            print(f"{self.name}: Output queue full. Could not send final STATUS_STOPPED message.")
        except Exception as e_final_put:
            # print() has no exc_info keyword (passing it raises TypeError),
            # so the traceback is written out separately.
            print(f"{self.name}: Error sending final STATUS_STOPPED message: {e_final_put}")
            traceback.print_exc()

        print(f"{self.name}: RUN method is terminating now.")