# FlightMonitor/data/opensky_live_adapter.py """ Adapter for fetching live flight data from the OpenSky Network API. This adapter polls the API periodically, transforms the raw data into CanonicalFlightState objects, and puts structured messages into an output queue. """ import requests import json import time import threading from queue import Queue, Full as QueueFull # Import QueueFull for specific exception handling from typing import List, Optional, Dict, Any, Union import random # Aggiungi per il mock # Relative imports from . import config as app_config from ..utils.logger import get_logger from .common_models import CanonicalFlightState module_logger = get_logger(__name__) # flightmonitor.data.opensky_live_adapter # --- Adapter Specific Constants --- PROVIDER_NAME = "OpenSkyNetwork" INITIAL_BACKOFF_DELAY_SECONDS = 20.0 # Increased initial backoff slightly MAX_BACKOFF_DELAY_SECONDS = 300.0 # Max 5 minutes BACKOFF_FACTOR = 1.8 MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5 # --- Message Types and Status Codes for Output Queue --- AdapterMessage = Dict[str, Any] FlightDataPayload = List[CanonicalFlightState] MSG_TYPE_FLIGHT_DATA = "flight_data" MSG_TYPE_ADAPTER_STATUS = "adapter_status" STATUS_STARTING = "STARTING" # Adapter thread is starting STATUS_FETCHING = "FETCHING" # Actively fetching data now STATUS_RECOVERED = "RECOVERED" # Recovered from a previous error state STATUS_RATE_LIMITED = "RATE_LIMITED" # Hit API rate limit STATUS_API_ERROR_TEMPORARY = "API_ERROR_TEMPORARY" # Other temporary API/network error STATUS_PERMANENT_FAILURE = "PERMANENT_FAILURE" # Too many errors, giving up STATUS_STOPPING = "STOPPING" # Adapter thread is stopping STATUS_STOPPED = "STOPPED" # Adapter thread has stopped cleanly class OpenSkyLiveAdapter(threading.Thread): """ Polls the OpenSky Network API, converts data to CanonicalFlightState, and sends structured messages (flight data or status updates) to an output queue. Implements exponential backoff for API errors. """ def __init__(self, output_queue: Queue[AdapterMessage], bounding_box: Dict[str, float], polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS, api_timeout: int = app_config.DEFAULT_API_TIMEOUT_SECONDS, daemon: bool = True): thread_name = f"OpenSkyLiveAdapter-bbox-{bounding_box.get('lat_min',0):.1f}" super().__init__(daemon=daemon, name=thread_name) if output_queue is None: # This is a critical setup error. module_logger.critical(f"{self.name}: Output queue cannot be None during initialization.") raise ValueError("Output queue must be provided to OpenSkyLiveAdapter.") self.output_queue = output_queue self.bounding_box = bounding_box self.base_polling_interval = float(polling_interval) # Ensure float for time calculations self.api_timeout = float(api_timeout) self._stop_event = threading.Event() self._consecutive_api_errors: int = 0 self._current_backoff_delay: float = 0.0 # Current calculated delay for backoff self._in_backoff_mode: bool = False module_logger.debug( f"{self.name} initialized. BBox: {self.bounding_box}, " f"Base Interval: {self.base_polling_interval}s, API Timeout: {self.api_timeout}s" ) def _generate_mock_flight_state(self, icao_suffix: int) -> CanonicalFlightState: """Helper to generate a single mock CanonicalFlightState.""" now = time.time() lat_center = (self.bounding_box["lat_min"] + self.bounding_box["lat_max"]) / 2 lon_center = (self.bounding_box["lon_min"] + self.bounding_box["lon_max"]) / 2 lat_span = self.bounding_box["lat_max"] - self.bounding_box["lat_min"] lon_span = self.bounding_box["lon_max"] - self.bounding_box["lon_min"] return CanonicalFlightState( icao24=f"mock{icao_suffix:02x}", callsign=f"MOCK{icao_suffix:02X}", origin_country="Mockland", timestamp=now, last_contact_timestamp=now, latitude=round(lat_center + random.uniform(-lat_span / 2.1, lat_span / 2.1), 4), longitude=round(lon_center + random.uniform(-lon_span / 2.1, lon_span / 2.1), 4), baro_altitude_m=random.uniform(1000, 12000), on_ground=random.choice([True, False]), velocity_mps=random.uniform(50, 250), true_track_deg=random.uniform(0, 360), vertical_rate_mps=random.uniform(-10, 10), squawk=str(random.randint(1000, 7777)), spi=random.choice([True, False]), position_source="MOCK_GENERATOR", raw_data_provider=f"{PROVIDER_NAME}-Mock" ) def _send_status_to_queue(self, status_code: str, message: str, details: Optional[Dict] = None): """Helper to put a status message into the output queue.""" try: status_payload = { "type": MSG_TYPE_ADAPTER_STATUS, "status_code": status_code, "message": f"{self.name}: {message}", # Prepend adapter name for clarity } if details: status_payload["details"] = details self.output_queue.put_nowait(status_payload) module_logger.debug(f"{self.name}: Sent status '{status_code}' to queue. Msg: {message}") except QueueFull: module_logger.warning(f"{self.name}: Output queue is full. Could not send status: {status_code}") except Exception as e: module_logger.error(f"{self.name}: Error sending status '{status_code}' to queue: {e}", exc_info=True) def stop(self): """Signals the thread to stop its execution loop and sends a STOPPING status.""" # Non inviare STATUS_STOPPING qui, perché potrebbe non essere processato # se il controller è già in attesa o se la coda è bloccata. # Invieremo STATUS_STOPPED alla fine di run(), se possibile. # L'importante è settare l'evento. module_logger.info(f"Stop signal received for {self.name}. Signaling stop event.") self._stop_event.set() # Rimuoviamo _send_status_to_queue(STATUS_STOPPING, ...) da qui def _parse_state_vector(self, raw_sv: list) -> Optional[CanonicalFlightState]: """Parses a raw OpenSky state vector into a CanonicalFlightState object.""" try: if not raw_sv or len(raw_sv) < 17: module_logger.warning(f"Skipping incomplete state vector (length {len(raw_sv)}): {raw_sv}") return None icao24 = raw_sv[0] if not isinstance(icao24, str) or not icao24.strip(): module_logger.warning(f"Skipping state vector due to invalid/missing ICAO24: '{icao24}' in {raw_sv}") return None api_time_position = float(raw_sv[3]) if raw_sv[3] is not None else None api_last_contact = float(raw_sv[4]) if raw_sv[4] is not None else None primary_timestamp: float if api_last_contact is not None: primary_timestamp = api_last_contact elif api_time_position is not None: primary_timestamp = api_time_position else: primary_timestamp = time.time() module_logger.warning(f"ICAO {icao24}: Using current time as primary_timestamp due to missing API timestamps (last_contact={api_last_contact}, time_position={api_time_position}).") last_contact_for_model = api_last_contact if api_last_contact is not None else primary_timestamp state = CanonicalFlightState( icao24=icao24, callsign=raw_sv[1], origin_country=raw_sv[2], timestamp=primary_timestamp, last_contact_timestamp=last_contact_for_model, latitude=float(raw_sv[5]) if raw_sv[5] is not None else None, longitude=float(raw_sv[6]) if raw_sv[6] is not None else None, baro_altitude_m=float(raw_sv[7]) if raw_sv[7] is not None else None, on_ground=bool(raw_sv[8]), velocity_mps=float(raw_sv[9]) if raw_sv[9] is not None else None, true_track_deg=float(raw_sv[10]) if raw_sv[10] is not None else None, vertical_rate_mps=float(raw_sv[11]) if raw_sv[11] is not None else None, geo_altitude_m=float(raw_sv[13]) if raw_sv[13] is not None else None, squawk=raw_sv[14], spi=bool(raw_sv[15]), position_source=int(raw_sv[16]) if raw_sv[16] is not None else None, raw_data_provider=PROVIDER_NAME ) return state except (TypeError, ValueError, IndexError) as e: module_logger.error(f"Error parsing state vector for ICAO '{raw_sv[0] if raw_sv else 'UNKNOWN'}': {e}. Vector: {raw_sv}", exc_info=False) # Less verbose exc_info return None def _perform_api_request(self) -> Dict[str, Any]: """ Performs API request or generates mock data based on config. Returns a structured result. """ # --- MODIFICA INIZIO: Logica Mock API --- if app_config.USE_MOCK_OPENSKY_API: module_logger.info(f"{self.name}: Using MOCK API data as per configuration.") self._send_status_to_queue(STATUS_FETCHING, "Generating mock flight data...") time.sleep(0.5) # Simula una piccola latenza di rete if app_config.MOCK_API_ERROR_SIMULATION == "RATE_LIMITED": self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay("60") # Simula Retry-After 60s err_msg = f"MOCK: Rate limited. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.warning(f"{self.name}: {err_msg}") return {"error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors} if app_config.MOCK_API_ERROR_SIMULATION == "HTTP_ERROR": self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"MOCK: HTTP error 500. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": 500, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} # Se non è un errore simulato, genera dati mock mock_states: List[CanonicalFlightState] = [] for i in range(app_config.MOCK_API_FLIGHT_COUNT): mock_states.append(self._generate_mock_flight_state(i + 1)) module_logger.info(f"{self.name}: Generated {len(mock_states)} mock flight states.") self._reset_error_state() # Mock success, reset error state return {"data": mock_states} # --- MODIFICA FINE: Logica Mock API --- # Codice originale per le chiamate API reali (come prima) if not self.bounding_box: return {"error_type": STATUS_API_ERROR_TEMPORARY, "message": "Bounding box not set."} params = { "lamin": self.bounding_box["lat_min"], "lomin": self.bounding_box["lon_min"], "lamax": self.bounding_box["lat_max"], "lomax": self.bounding_box["lon_max"], } self._send_status_to_queue(STATUS_FETCHING, f"Requesting REAL data for bbox: {self.bounding_box}") response: Optional[requests.Response] = None try: response = requests.get(app_config.OPENSKY_API_URL, params=params, timeout=self.api_timeout) # ... (resto della logica API reale come prima, assicurati che NON ci sia 'return "test"') ... # Assicurati che tutti i percorsi di ritorno qui restituiscano un dizionario corretto. # Ad esempio, il 'return {"data": canonical_states}' è corretto. # Anche i ritorni per errori come STATUS_RATE_LIMITED sono dizionari corretti. module_logger.debug(f"{self.name}: API Response Status: {response.status_code} {response.reason}") if response.status_code == 429: # Rate limit self._consecutive_api_errors += 1 self._in_backoff_mode = True delay = self._calculate_next_backoff_delay(response.headers.get("Retry-After")) err_msg = f"Rate limited (429). Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.warning(f"{self.name}: {err_msg}") return {"error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors} response.raise_for_status() self._reset_error_state() response_data = response.json() raw_states_list = response_data.get("states") canonical_states: List[CanonicalFlightState] = [] if raw_states_list is not None: for raw_sv in raw_states_list: parsed_state = self._parse_state_vector(raw_sv) if parsed_state: canonical_states.append(parsed_state) module_logger.info(f"{self.name}: Fetched and parsed {len(canonical_states)} flight states.") else: module_logger.info(f"{self.name}: API returned no flight states ('states' is null or empty).") return {"data": canonical_states} except requests.exceptions.HTTPError as http_err: self._consecutive_api_errors += 1; self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() status_code = http_err.response.status_code if http_err.response else 'N/A' err_msg = f"HTTP error {status_code}: {http_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} except requests.exceptions.Timeout: self._consecutive_api_errors += 1; self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"API request timed out. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=False) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "TIMEOUT", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} except (requests.exceptions.RequestException, json.JSONDecodeError) as e: self._consecutive_api_errors += 1; self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"Request/JSON error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.error(f"{self.name}: {err_msg}", exc_info=True) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "REQUEST_JSON_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} except Exception as e: self._consecutive_api_errors += 1; self._in_backoff_mode = True delay = self._calculate_next_backoff_delay() err_msg = f"Unexpected critical error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s." module_logger.critical(f"{self.name}: {err_msg}", exc_info=True) return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "UNEXPECTED_ADAPTER_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors} def _calculate_next_backoff_delay(self, provided_retry_after: Optional[str] = None) -> float: """Calculates exponential backoff delay, respecting Retry-After header if provided.""" calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * (BACKOFF_FACTOR ** (self._consecutive_api_errors - 1)) if provided_retry_after and provided_retry_after.isdigit(): api_delay = float(provided_retry_after) self._current_backoff_delay = max(api_delay, calculated_delay) # Respect API if it asks for longer else: self._current_backoff_delay = calculated_delay self._current_backoff_delay = min(self._current_backoff_delay, MAX_BACKOFF_DELAY_SECONDS) self._current_backoff_delay = max(self._current_backoff_delay, 1.0) # Ensure at least 1s delay return self._current_backoff_delay def _reset_error_state(self): """Resets error counters and backoff mode, sends RECOVERED status if was in error.""" if self._in_backoff_mode or self._consecutive_api_errors > 0: module_logger.info(f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff.") self._send_status_to_queue(STATUS_RECOVERED, "API connection recovered.") self._consecutive_api_errors = 0 self._current_backoff_delay = 0.0 self._in_backoff_mode = False def run(self): initial_settle_delay_seconds = 0.2 try: if self._stop_event.wait(timeout=initial_settle_delay_seconds): # module_logger.info(f"{self.name} thread received stop signal during initial settle delay. Terminating early.") print(f"DEBUG_ADAPTER ({self.name}): Thread received stop signal during initial settle delay. Terminating early.", flush=True) return except Exception as e: # module_logger.error(f"{self.name} unexpected error during initial settle delay: {e}", exc_info=True) print(f"DEBUG_ADAPTER ({self.name}): ERROR - Unexpected error during initial settle delay: {e}", flush=True) return # module_logger.info(f"{self.name} thread started (after {initial_settle_delay_seconds}s delay). Base polling interval: {self.base_polling_interval:.1f}s.") print(f"DEBUG_ADAPTER ({self.name}): Thread started (after {initial_settle_delay_seconds}s delay). Base polling interval: {self.base_polling_interval:.1f}s.", flush=True) if not self._stop_event.is_set(): self._send_status_to_queue(STATUS_STARTING, "Adapter thread started, preparing initial fetch.") else: # module_logger.info(f"{self.name}: Stop event was set before sending initial STATUS_STARTING.") print(f"DEBUG_ADAPTER ({self.name}): Stop event was set before sending initial STATUS_STARTING.", flush=True) # --- Loop principale dell'adapter --- while not self._stop_event.is_set(): if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD: perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) consecutive API errors. Stopping live updates." # module_logger.critical(f"{self.name}: {perm_fail_msg}") print(f"DEBUG_ADAPTER ({self.name}): CRITICAL - {perm_fail_msg}", flush=True) if not self._stop_event.is_set(): self._send_status_to_queue(STATUS_PERMANENT_FAILURE, perm_fail_msg) break api_result = self._perform_api_request() if self._stop_event.is_set(): print(f"DEBUG_ADAPTER ({self.name}): Stop event detected after API request. Exiting loop.", flush=True) break if "data" in api_result: flight_data_payload: List[CanonicalFlightState] = api_result["data"] try: if not self._stop_event.is_set(): self.output_queue.put_nowait({ "type": MSG_TYPE_FLIGHT_DATA, "payload": flight_data_payload }) print(f"DEBUG_ADAPTER ({self.name}): Sent {len(flight_data_payload)} flight states to queue.", flush=True) # else: # print(f"DEBUG_ADAPTER ({self.name}: Stop event set, not sending {len(flight_data_payload)} flight states to queue.", flush=True) except QueueFull: # Dovrebbe essere importato 'from queue import Full as QueueFull' module_logger.warning(f"{self.name}: Output queue full. Discarding {len(flight_data_payload)} flight states.") # Logger OK qui except Exception as e: module_logger.error(f"{self.name}: Error putting flight data into queue: {e}", exc_info=True) # Logger OK qui elif "error_type" in api_result: error_details_for_controller = api_result.copy() error_details_for_controller["type"] = MSG_TYPE_ADAPTER_STATUS error_details_for_controller["status_code"] = api_result["error_type"] try: if not self._stop_event.is_set(): self.output_queue.put_nowait(error_details_for_controller) # else: # print(f"DEBUG_ADAPTER ({self.name}: Stop event set, not sending error status to queue: {api_result['error_type']}", flush=True) except QueueFull: # Dovrebbe essere importato 'from queue import Full as QueueFull' module_logger.warning(f"{self.name}: Output queue full. Discarding error status: {api_result['error_type']}") # Logger OK qui except Exception as e: module_logger.error(f"{self.name}: Error putting error status into queue: {e}", exc_info=True) # Logger OK qui else: # Gestito dalla logica mock o un errore reale print(f"DEBUG_ADAPTER ({self.name}): Unknown result structure from _perform_api_request: {api_result}", flush=True) if self._stop_event.is_set(): print(f"DEBUG_ADAPTER ({self.name}): Stop event detected before waiting. Exiting loop.", flush=True) break time_to_wait_seconds: float if self._in_backoff_mode: time_to_wait_seconds = self._current_backoff_delay print(f"DEBUG_ADAPTER ({self.name}): In backoff, next attempt in {time_to_wait_seconds:.1f}s.", flush=True) else: time_to_wait_seconds = self.base_polling_interval print(f"DEBUG_ADAPTER ({self.name}): Next fetch cycle in {time_to_wait_seconds:.1f}s.", flush=True) if self._stop_event.wait(timeout=time_to_wait_seconds): print(f"DEBUG_ADAPTER ({self.name}): Stop event received during wait period. Exiting loop.", flush=True) break # --- Fine Loop principale dell'adapter --- # --- INIZIO PARTE SUPER SEMPLIFICATA PER DEBUG --- print(f"DEBUG_ADAPTER_FINAL ({self.name}): REACHED END OF WHILE LOOP. About to attempt final put.", flush=True) try: # Tentativo di inviare un messaggio molto semplice alla coda print(f"DEBUG_ADAPTER_FINAL ({self.name}): Attempting very simple put to output_queue.", flush=True) self.output_queue.put({"type": "ADAPTER_TERMINATING_DEBUG", "message": "Adapter run method ending"}, timeout=0.5) print(f"DEBUG_ADAPTER_FINAL ({self.name}): Simple put to output_queue SUCCEEDED or TIMED OUT without blocking.", flush=True) except Exception as e: # Cattura qualsiasi eccezione dal put, inclusa QueueFull se timeout scade su coda piena con maxsize print(f"DEBUG_ADAPTER_FINAL ({self.name}): EXCEPTION during simple put: {type(e).__name__} - {e}", flush=True) print(f"DEBUG_ADAPTER_FINAL ({self.name}): RUN METHOD IS TERMINATING NOW. THIS IS THE ABSOLUTE LAST PRINT.", flush=True) # NESSUN'ALTRA OPERAZIONE QUI # Il thread termina implicitamente qui quando il metodo run() esce. # --- FINE PARTE SUPER SEMPLIFICATA PER DEBUG --- # Il thread termina implicitamente qui quando il metodo run() esce.