SXXXXXXX_FlightMonitor/flightmonitor/data/opensky_live_adapter.py

413 lines
24 KiB
Python

# FlightMonitor/data/opensky_live_adapter.py
"""
Adapter for fetching live flight data from the OpenSky Network API.
This adapter polls the API periodically, transforms the raw data into
CanonicalFlightState objects, and puts structured messages into an output queue.
"""
import requests
import json
import time
import threading
from queue import Queue, Full as QueueFull # Import QueueFull for specific exception handling
from typing import List, Optional, Dict, Any, Union
import random # Aggiungi per il mock
# Relative imports
from . import config as app_config
from ..utils.logger import get_logger
from .common_models import CanonicalFlightState
module_logger = get_logger(__name__) # flightmonitor.data.opensky_live_adapter
# --- Adapter Specific Constants ---
PROVIDER_NAME = "OpenSkyNetwork"
INITIAL_BACKOFF_DELAY_SECONDS = 20.0 # Increased initial backoff slightly
MAX_BACKOFF_DELAY_SECONDS = 300.0 # Max 5 minutes
BACKOFF_FACTOR = 1.8
MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5
# --- Message Types and Status Codes for Output Queue ---
AdapterMessage = Dict[str, Any]
FlightDataPayload = List[CanonicalFlightState]
MSG_TYPE_FLIGHT_DATA = "flight_data"
MSG_TYPE_ADAPTER_STATUS = "adapter_status"
STATUS_STARTING = "STARTING" # Adapter thread is starting
STATUS_FETCHING = "FETCHING" # Actively fetching data now
STATUS_RECOVERED = "RECOVERED" # Recovered from a previous error state
STATUS_RATE_LIMITED = "RATE_LIMITED" # Hit API rate limit
STATUS_API_ERROR_TEMPORARY = "API_ERROR_TEMPORARY" # Other temporary API/network error
STATUS_PERMANENT_FAILURE = "PERMANENT_FAILURE" # Too many errors, giving up
STATUS_STOPPING = "STOPPING" # Adapter thread is stopping
STATUS_STOPPED = "STOPPED" # Adapter thread has stopped cleanly
class OpenSkyLiveAdapter(threading.Thread):
"""
Polls the OpenSky Network API, converts data to CanonicalFlightState,
and sends structured messages (flight data or status updates) to an output queue.
Implements exponential backoff for API errors.
"""
def __init__(self,
output_queue: Queue[AdapterMessage],
bounding_box: Dict[str, float],
polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS,
api_timeout: int = app_config.DEFAULT_API_TIMEOUT_SECONDS,
daemon: bool = True):
thread_name = f"OpenSkyLiveAdapter-bbox-{bounding_box.get('lat_min',0):.1f}"
super().__init__(daemon=daemon, name=thread_name)
if output_queue is None:
# This is a critical setup error.
module_logger.critical(f"{self.name}: Output queue cannot be None during initialization.")
raise ValueError("Output queue must be provided to OpenSkyLiveAdapter.")
self.output_queue = output_queue
self.bounding_box = bounding_box
self.base_polling_interval = float(polling_interval) # Ensure float for time calculations
self.api_timeout = float(api_timeout)
self._stop_event = threading.Event()
self._consecutive_api_errors: int = 0
self._current_backoff_delay: float = 0.0 # Current calculated delay for backoff
self._in_backoff_mode: bool = False
module_logger.debug(
f"{self.name} initialized. BBox: {self.bounding_box}, "
f"Base Interval: {self.base_polling_interval}s, API Timeout: {self.api_timeout}s"
)
def _generate_mock_flight_state(self, icao_suffix: int) -> CanonicalFlightState:
"""Helper to generate a single mock CanonicalFlightState."""
now = time.time()
lat_center = (self.bounding_box["lat_min"] + self.bounding_box["lat_max"]) / 2
lon_center = (self.bounding_box["lon_min"] + self.bounding_box["lon_max"]) / 2
lat_span = self.bounding_box["lat_max"] - self.bounding_box["lat_min"]
lon_span = self.bounding_box["lon_max"] - self.bounding_box["lon_min"]
return CanonicalFlightState(
icao24=f"mock{icao_suffix:02x}",
callsign=f"MOCK{icao_suffix:02X}",
origin_country="Mockland",
timestamp=now,
last_contact_timestamp=now,
latitude=round(lat_center + random.uniform(-lat_span / 2.1, lat_span / 2.1), 4),
longitude=round(lon_center + random.uniform(-lon_span / 2.1, lon_span / 2.1), 4),
baro_altitude_m=random.uniform(1000, 12000),
on_ground=random.choice([True, False]),
velocity_mps=random.uniform(50, 250),
true_track_deg=random.uniform(0, 360),
vertical_rate_mps=random.uniform(-10, 10),
squawk=str(random.randint(1000, 7777)),
spi=random.choice([True, False]),
position_source="MOCK_GENERATOR",
raw_data_provider=f"{PROVIDER_NAME}-Mock"
)
def _send_status_to_queue(self, status_code: str, message: str, details: Optional[Dict] = None):
"""Helper to put a status message into the output queue."""
try:
status_payload = {
"type": MSG_TYPE_ADAPTER_STATUS,
"status_code": status_code,
"message": f"{self.name}: {message}", # Prepend adapter name for clarity
}
if details:
status_payload["details"] = details
self.output_queue.put_nowait(status_payload)
module_logger.debug(f"{self.name}: Sent status '{status_code}' to queue. Msg: {message}")
except QueueFull:
module_logger.warning(f"{self.name}: Output queue is full. Could not send status: {status_code}")
except Exception as e:
module_logger.error(f"{self.name}: Error sending status '{status_code}' to queue: {e}", exc_info=True)
def stop(self):
"""Signals the thread to stop its execution loop and sends a STOPPING status."""
# Non inviare STATUS_STOPPING qui, perché potrebbe non essere processato
# se il controller è già in attesa o se la coda è bloccata.
# Invieremo STATUS_STOPPED alla fine di run(), se possibile.
# L'importante è settare l'evento.
module_logger.info(f"Stop signal received for {self.name}. Signaling stop event.")
self._stop_event.set()
# Rimuoviamo _send_status_to_queue(STATUS_STOPPING, ...) da qui
def _parse_state_vector(self, raw_sv: list) -> Optional[CanonicalFlightState]:
"""Parses a raw OpenSky state vector into a CanonicalFlightState object."""
try:
if not raw_sv or len(raw_sv) < 17:
module_logger.warning(f"Skipping incomplete state vector (length {len(raw_sv)}): {raw_sv}")
return None
icao24 = raw_sv[0]
if not isinstance(icao24, str) or not icao24.strip():
module_logger.warning(f"Skipping state vector due to invalid/missing ICAO24: '{icao24}' in {raw_sv}")
return None
api_time_position = float(raw_sv[3]) if raw_sv[3] is not None else None
api_last_contact = float(raw_sv[4]) if raw_sv[4] is not None else None
primary_timestamp: float
if api_last_contact is not None:
primary_timestamp = api_last_contact
elif api_time_position is not None:
primary_timestamp = api_time_position
else:
primary_timestamp = time.time()
module_logger.warning(f"ICAO {icao24}: Using current time as primary_timestamp due to missing API timestamps (last_contact={api_last_contact}, time_position={api_time_position}).")
last_contact_for_model = api_last_contact if api_last_contact is not None else primary_timestamp
state = CanonicalFlightState(
icao24=icao24, callsign=raw_sv[1], origin_country=raw_sv[2],
timestamp=primary_timestamp, last_contact_timestamp=last_contact_for_model,
latitude=float(raw_sv[5]) if raw_sv[5] is not None else None,
longitude=float(raw_sv[6]) if raw_sv[6] is not None else None,
baro_altitude_m=float(raw_sv[7]) if raw_sv[7] is not None else None,
on_ground=bool(raw_sv[8]),
velocity_mps=float(raw_sv[9]) if raw_sv[9] is not None else None,
true_track_deg=float(raw_sv[10]) if raw_sv[10] is not None else None,
vertical_rate_mps=float(raw_sv[11]) if raw_sv[11] is not None else None,
geo_altitude_m=float(raw_sv[13]) if raw_sv[13] is not None else None,
squawk=raw_sv[14], spi=bool(raw_sv[15]),
position_source=int(raw_sv[16]) if raw_sv[16] is not None else None,
raw_data_provider=PROVIDER_NAME
)
return state
except (TypeError, ValueError, IndexError) as e:
module_logger.error(f"Error parsing state vector for ICAO '{raw_sv[0] if raw_sv else 'UNKNOWN'}': {e}. Vector: {raw_sv}", exc_info=False) # Less verbose exc_info
return None
def _perform_api_request(self) -> Dict[str, Any]:
"""
Performs API request or generates mock data based on config.
Returns a structured result.
"""
# --- MODIFICA INIZIO: Logica Mock API ---
if app_config.USE_MOCK_OPENSKY_API:
module_logger.info(f"{self.name}: Using MOCK API data as per configuration.")
self._send_status_to_queue(STATUS_FETCHING, "Generating mock flight data...")
time.sleep(0.5) # Simula una piccola latenza di rete
if app_config.MOCK_API_ERROR_SIMULATION == "RATE_LIMITED":
self._consecutive_api_errors += 1
self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay("60") # Simula Retry-After 60s
err_msg = f"MOCK: Rate limited. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.warning(f"{self.name}: {err_msg}")
return {"error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors}
if app_config.MOCK_API_ERROR_SIMULATION == "HTTP_ERROR":
self._consecutive_api_errors += 1
self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
err_msg = f"MOCK: HTTP error 500. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": 500, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}
# Se non è un errore simulato, genera dati mock
mock_states: List[CanonicalFlightState] = []
for i in range(app_config.MOCK_API_FLIGHT_COUNT):
mock_states.append(self._generate_mock_flight_state(i + 1))
module_logger.info(f"{self.name}: Generated {len(mock_states)} mock flight states.")
self._reset_error_state() # Mock success, reset error state
return {"data": mock_states}
# --- MODIFICA FINE: Logica Mock API ---
# Codice originale per le chiamate API reali (come prima)
if not self.bounding_box:
return {"error_type": STATUS_API_ERROR_TEMPORARY, "message": "Bounding box not set."}
params = {
"lamin": self.bounding_box["lat_min"], "lomin": self.bounding_box["lon_min"],
"lamax": self.bounding_box["lat_max"], "lomax": self.bounding_box["lon_max"],
}
self._send_status_to_queue(STATUS_FETCHING, f"Requesting REAL data for bbox: {self.bounding_box}")
response: Optional[requests.Response] = None
try:
response = requests.get(app_config.OPENSKY_API_URL, params=params, timeout=self.api_timeout)
# ... (resto della logica API reale come prima, assicurati che NON ci sia 'return "test"') ...
# Assicurati che tutti i percorsi di ritorno qui restituiscano un dizionario corretto.
# Ad esempio, il 'return {"data": canonical_states}' è corretto.
# Anche i ritorni per errori come STATUS_RATE_LIMITED sono dizionari corretti.
module_logger.debug(f"{self.name}: API Response Status: {response.status_code} {response.reason}")
if response.status_code == 429: # Rate limit
self._consecutive_api_errors += 1
self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay(response.headers.get("Retry-After"))
err_msg = f"Rate limited (429). Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.warning(f"{self.name}: {err_msg}")
return {"error_type": STATUS_RATE_LIMITED, "delay": delay, "message": err_msg, "consecutive_errors": self._consecutive_api_errors}
response.raise_for_status()
self._reset_error_state()
response_data = response.json()
raw_states_list = response_data.get("states")
canonical_states: List[CanonicalFlightState] = []
if raw_states_list is not None:
for raw_sv in raw_states_list:
parsed_state = self._parse_state_vector(raw_sv)
if parsed_state: canonical_states.append(parsed_state)
module_logger.info(f"{self.name}: Fetched and parsed {len(canonical_states)} flight states.")
else:
module_logger.info(f"{self.name}: API returned no flight states ('states' is null or empty).")
return {"data": canonical_states}
except requests.exceptions.HTTPError as http_err:
self._consecutive_api_errors += 1; self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
status_code = http_err.response.status_code if http_err.response else 'N/A'
err_msg = f"HTTP error {status_code}: {http_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": status_code, "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}
except requests.exceptions.Timeout:
self._consecutive_api_errors += 1; self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
err_msg = f"API request timed out. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "TIMEOUT", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
self._consecutive_api_errors += 1; self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
err_msg = f"Request/JSON error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.error(f"{self.name}: {err_msg}", exc_info=True)
return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "REQUEST_JSON_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}
except Exception as e:
self._consecutive_api_errors += 1; self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
err_msg = f"Unexpected critical error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.critical(f"{self.name}: {err_msg}", exc_info=True)
return {"error_type": STATUS_API_ERROR_TEMPORARY, "status_code": "UNEXPECTED_ADAPTER_ERROR", "message": err_msg, "delay": delay, "consecutive_errors": self._consecutive_api_errors}
def _calculate_next_backoff_delay(self, provided_retry_after: Optional[str] = None) -> float:
"""Calculates exponential backoff delay, respecting Retry-After header if provided."""
calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * (BACKOFF_FACTOR ** (self._consecutive_api_errors - 1))
if provided_retry_after and provided_retry_after.isdigit():
api_delay = float(provided_retry_after)
self._current_backoff_delay = max(api_delay, calculated_delay) # Respect API if it asks for longer
else:
self._current_backoff_delay = calculated_delay
self._current_backoff_delay = min(self._current_backoff_delay, MAX_BACKOFF_DELAY_SECONDS)
self._current_backoff_delay = max(self._current_backoff_delay, 1.0) # Ensure at least 1s delay
return self._current_backoff_delay
def _reset_error_state(self):
"""Resets error counters and backoff mode, sends RECOVERED status if was in error."""
if self._in_backoff_mode or self._consecutive_api_errors > 0:
module_logger.info(f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff.")
self._send_status_to_queue(STATUS_RECOVERED, "API connection recovered.")
self._consecutive_api_errors = 0
self._current_backoff_delay = 0.0
self._in_backoff_mode = False
def run(self):
initial_settle_delay_seconds = 0.2
try:
if self._stop_event.wait(timeout=initial_settle_delay_seconds):
# module_logger.info(f"{self.name} thread received stop signal during initial settle delay. Terminating early.")
print(f"DEBUG_ADAPTER ({self.name}): Thread received stop signal during initial settle delay. Terminating early.", flush=True)
return
except Exception as e:
# module_logger.error(f"{self.name} unexpected error during initial settle delay: {e}", exc_info=True)
print(f"DEBUG_ADAPTER ({self.name}): ERROR - Unexpected error during initial settle delay: {e}", flush=True)
return
# module_logger.info(f"{self.name} thread started (after {initial_settle_delay_seconds}s delay). Base polling interval: {self.base_polling_interval:.1f}s.")
print(f"DEBUG_ADAPTER ({self.name}): Thread started (after {initial_settle_delay_seconds}s delay). Base polling interval: {self.base_polling_interval:.1f}s.", flush=True)
if not self._stop_event.is_set():
self._send_status_to_queue(STATUS_STARTING, "Adapter thread started, preparing initial fetch.")
else:
# module_logger.info(f"{self.name}: Stop event was set before sending initial STATUS_STARTING.")
print(f"DEBUG_ADAPTER ({self.name}): Stop event was set before sending initial STATUS_STARTING.", flush=True)
# --- Loop principale dell'adapter ---
while not self._stop_event.is_set():
if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD:
perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) consecutive API errors. Stopping live updates."
# module_logger.critical(f"{self.name}: {perm_fail_msg}")
print(f"DEBUG_ADAPTER ({self.name}): CRITICAL - {perm_fail_msg}", flush=True)
if not self._stop_event.is_set():
self._send_status_to_queue(STATUS_PERMANENT_FAILURE, perm_fail_msg)
break
api_result = self._perform_api_request()
if self._stop_event.is_set():
print(f"DEBUG_ADAPTER ({self.name}): Stop event detected after API request. Exiting loop.", flush=True)
break
if "data" in api_result:
flight_data_payload: List[CanonicalFlightState] = api_result["data"]
try:
if not self._stop_event.is_set():
self.output_queue.put_nowait({
"type": MSG_TYPE_FLIGHT_DATA,
"payload": flight_data_payload
})
print(f"DEBUG_ADAPTER ({self.name}): Sent {len(flight_data_payload)} flight states to queue.", flush=True)
# else:
# print(f"DEBUG_ADAPTER ({self.name}: Stop event set, not sending {len(flight_data_payload)} flight states to queue.", flush=True)
except QueueFull: # Dovrebbe essere importato 'from queue import Full as QueueFull'
module_logger.warning(f"{self.name}: Output queue full. Discarding {len(flight_data_payload)} flight states.") # Logger OK qui
except Exception as e:
module_logger.error(f"{self.name}: Error putting flight data into queue: {e}", exc_info=True) # Logger OK qui
elif "error_type" in api_result:
error_details_for_controller = api_result.copy()
error_details_for_controller["type"] = MSG_TYPE_ADAPTER_STATUS
error_details_for_controller["status_code"] = api_result["error_type"]
try:
if not self._stop_event.is_set():
self.output_queue.put_nowait(error_details_for_controller)
# else:
# print(f"DEBUG_ADAPTER ({self.name}: Stop event set, not sending error status to queue: {api_result['error_type']}", flush=True)
except QueueFull: # Dovrebbe essere importato 'from queue import Full as QueueFull'
module_logger.warning(f"{self.name}: Output queue full. Discarding error status: {api_result['error_type']}") # Logger OK qui
except Exception as e:
module_logger.error(f"{self.name}: Error putting error status into queue: {e}", exc_info=True) # Logger OK qui
else:
# Gestito dalla logica mock o un errore reale
print(f"DEBUG_ADAPTER ({self.name}): Unknown result structure from _perform_api_request: {api_result}", flush=True)
if self._stop_event.is_set():
print(f"DEBUG_ADAPTER ({self.name}): Stop event detected before waiting. Exiting loop.", flush=True)
break
time_to_wait_seconds: float
if self._in_backoff_mode:
time_to_wait_seconds = self._current_backoff_delay
print(f"DEBUG_ADAPTER ({self.name}): In backoff, next attempt in {time_to_wait_seconds:.1f}s.", flush=True)
else:
time_to_wait_seconds = self.base_polling_interval
print(f"DEBUG_ADAPTER ({self.name}): Next fetch cycle in {time_to_wait_seconds:.1f}s.", flush=True)
if self._stop_event.wait(timeout=time_to_wait_seconds):
print(f"DEBUG_ADAPTER ({self.name}): Stop event received during wait period. Exiting loop.", flush=True)
break
# --- Fine Loop principale dell'adapter ---
# --- INIZIO PARTE SUPER SEMPLIFICATA PER DEBUG ---
print(f"DEBUG_ADAPTER_FINAL ({self.name}): REACHED END OF WHILE LOOP. About to attempt final put.", flush=True)
try:
# Tentativo di inviare un messaggio molto semplice alla coda
print(f"DEBUG_ADAPTER_FINAL ({self.name}): Attempting very simple put to output_queue.", flush=True)
self.output_queue.put({"type": "ADAPTER_TERMINATING_DEBUG", "message": "Adapter run method ending"}, timeout=0.5)
print(f"DEBUG_ADAPTER_FINAL ({self.name}): Simple put to output_queue SUCCEEDED or TIMED OUT without blocking.", flush=True)
except Exception as e: # Cattura qualsiasi eccezione dal put, inclusa QueueFull se timeout scade su coda piena con maxsize
print(f"DEBUG_ADAPTER_FINAL ({self.name}): EXCEPTION during simple put: {type(e).__name__} - {e}", flush=True)
print(f"DEBUG_ADAPTER_FINAL ({self.name}): RUN METHOD IS TERMINATING NOW. THIS IS THE ABSOLUTE LAST PRINT.", flush=True)
# NESSUN'ALTRA OPERAZIONE QUI
# Il thread termina implicitamente qui quando il metodo run() esce.
# --- FINE PARTE SUPER SEMPLIFICATA PER DEBUG ---
# Il thread termina implicitamente qui quando il metodo run() esce.