SXXXXXXX_FlightMonitor/flightmonitor/data/opensky_live_adapter.py
2025-06-04 13:25:59 +02:00

524 lines
24 KiB
Python

# FlightMonitor/data/opensky_live_adapter.py
"""
Adapter for fetching live flight data from the OpenSky Network API using the official opensky-api library.
This adapter polls the API periodically, transforms the raw data into
CanonicalFlightState objects, and puts structured messages into an output queue.
"""
import requests
import time
import threading
from queue import (
Queue,
Empty as QueueEmpty,
Full as QueueFull,
)
import random
from typing import Dict, Any, List, Tuple, Optional
# MODIFIED: Imports for the opensky-api library.
# WHY: The official library is used to interact with the OpenSky API.
# WHERE: Top of the file, import section.
# HOW: Added the required imports from the opensky_api library.
from opensky_api import OpenSkyApi
from . import config as app_config # Assicurati che config sia importato per accedere alle credenziali
from ..utils.logger import get_logger
from .common_models import CanonicalFlightState
# Module-wide logger obtained from the project's logging helper.
module_logger = get_logger(__name__)
# --- Adapter Specific Constants ---
# Value stored in CanonicalFlightState.raw_data_provider for states built here.
PROVIDER_NAME = "OpenSkyNetwork"
# Backoff delay after the first consecutive API error (seconds).
INITIAL_BACKOFF_DELAY_SECONDS = 20.0
# Upper bound applied to any computed backoff delay (seconds).
MAX_BACKOFF_DELAY_SECONDS = 300.0
# Multiplier applied to the backoff delay for each additional consecutive error.
BACKOFF_FACTOR = 1.8
# Number of consecutive API errors after which the polling loop gives up.
MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5
# --- Message Types and Status Codes for Output Queue ---
# Every message put on the output queue is a plain dictionary with a "type" key.
AdapterMessage = Dict[str, Any]
# Payload carried by a flight-data message.
FlightDataPayload = List[CanonicalFlightState]
# Values of the "type" key.
MSG_TYPE_FLIGHT_DATA = "flight_data"
MSG_TYPE_ADAPTER_STATUS = "adapter_status"
# Values of the "status_code" key of an adapter-status message.
STATUS_STARTING = "STARTING"  # sent once when the thread becomes operational
STATUS_FETCHING = "FETCHING"  # sent right before each (real or mock) data request
STATUS_RECOVERED = "RECOVERED"  # sent when a request succeeds after one or more errors
STATUS_RATE_LIMITED = "RATE_LIMITED"  # request rejected with HTTP 429
STATUS_API_ERROR_TEMPORARY = "API_ERROR_TEMPORARY"  # retryable error; a backoff delay applies
STATUS_PERMANENT_FAILURE = "PERMANENT_FAILURE"  # too many consecutive errors or unusable bounding box
STATUS_STOPPING = "STOPPING"  # not emitted by the code visible in this module
STATUS_STOPPED = "STOPPED"  # final message sent when the thread's run loop has exited
class OpenSkyLiveAdapter(threading.Thread):
    """
    Polls the OpenSky Network API using the opensky-api library,
    converts data to CanonicalFlightState, and sends structured messages
    (flight data or status updates) to an output queue.
    Implements exponential backoff for API errors not handled by the library.

    Messages placed on the output queue are dictionaries of two shapes:

    * flight data: ``{"type": MSG_TYPE_FLIGHT_DATA, "payload": [CanonicalFlightState, ...]}``
    * status: ``{"type": MSG_TYPE_ADAPTER_STATUS, "status_code": <STATUS_*>,
      "message": str}`` plus an optional ``"details"`` dictionary.
    """

    def __init__(
        self,
        output_queue: Queue[AdapterMessage],
        bounding_box: Dict[str, float],
        polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS,
        daemon: bool = True,
    ):
        """
        Create the adapter thread and its OpenSkyApi client.

        Args:
            output_queue: Queue receiving flight-data and status messages.
            bounding_box: Dictionary with the keys ``lat_min``, ``lat_max``,
                ``lon_min`` and ``lon_max`` describing the area to poll.
            polling_interval: Seconds to wait between two successful polls.
            daemon: Passed to ``threading.Thread``.

        Raises:
            ValueError: If ``output_queue`` is None.
            RuntimeError: If the OpenSkyApi client cannot be created.
        """
        thread_name = f"OpenSkyLibAdapter-bbox-{bounding_box.get('lat_min',0):.1f}"
        super().__init__(daemon=daemon, name=thread_name)
        if output_queue is None:
            module_logger.critical(
                f"{self.name}: Output queue cannot be None during initialization."
            )
            raise ValueError("Output queue must be provided to OpenSkyLiveAdapter.")

        self.output_queue = output_queue
        self.bounding_box_dict = bounding_box
        self.base_polling_interval = float(polling_interval)
        self._stop_event = threading.Event()

        # Backoff bookkeeping, reset by _reset_error_state() after a success.
        self._consecutive_api_errors: int = 0
        self._current_backoff_delay: float = 0.0
        self._in_backoff_mode: bool = False

        # Credentials are used only when the USE_OPENSKY_CREDENTIALS flag is set
        # AND both username and password are configured; otherwise the client
        # is created for anonymous access.
        try:
            username = app_config.OPENSKY_USERNAME
            password = app_config.OPENSKY_PASSWORD
            # Defaults to False for backward compatibility with older configs.
            use_credentials_flag = getattr(app_config, "USE_OPENSKY_CREDENTIALS", False)
            module_logger.info(f"{self.name}: DEBUG - Inizializzazione OpenSkyApi:")
            module_logger.info(f"{self.name}: DEBUG - USE_OPENSKY_CREDENTIALS flag: {use_credentials_flag}")
            module_logger.info(f"{self.name}: DEBUG - Username from config: {'Presente' if username else 'Assente'}")
            module_logger.info(f"{self.name}: DEBUG - Password from config: {'Presente' if password else 'Assente'}")
            if use_credentials_flag and username and password:
                self.api_client = OpenSkyApi(username=username, password=password)
                module_logger.info(f"{self.name}: OpenSkyApi client initialized with provided credentials.")
            else:
                self.api_client = OpenSkyApi()
                log_msg = f"{self.name}: OpenSkyApi client initialized without credentials (anonymous access). "
                if use_credentials_flag:
                    log_msg += "USE_OPENSKY_CREDENTIALS is True but credentials not found/provided."
                else:
                    log_msg += "USE_OPENSKY_CREDENTIALS is False."
                module_logger.warning(log_msg)
        except Exception as e:
            module_logger.critical(
                f"{self.name}: Failed to initialize OpenSkyApi client: {e}",
                exc_info=True,
            )
            raise RuntimeError(f"Failed to initialize OpenSkyApi client: {e}") from e

        module_logger.debug(
            f"{self.name} initialized. BBox (dict): {self.bounding_box_dict}, "
            f"Base Interval: {self.base_polling_interval}s"
        )

    def _generate_mock_flight_state(self, icao_suffix: int) -> CanonicalFlightState:
        """
        Helper to generate a single mock CanonicalFlightState.

        The position is drawn at random around the centre of the bounding box
        (slightly inside its edges); all other kinematic values are random.

        Args:
            icao_suffix: Number used to build the mock ICAO24 code and callsign.
        """
        now = time.time()
        bbox = self.bounding_box_dict
        lat_center = (bbox["lat_min"] + bbox["lat_max"]) / 2
        lon_center = (bbox["lon_min"] + bbox["lon_max"]) / 2
        lat_span = bbox["lat_max"] - bbox["lat_min"]
        lon_span = bbox["lon_max"] - bbox["lon_min"]
        return CanonicalFlightState(
            icao24=f"mock{icao_suffix:02x}",
            callsign=f"MOCK{icao_suffix:02X}",
            origin_country="Mockland",
            timestamp=now,
            last_contact_timestamp=now,
            # Dividing the span by 2.1 (not 2) keeps points just inside the box.
            latitude=round(
                lat_center + random.uniform(-lat_span / 2.1, lat_span / 2.1), 4
            ),
            longitude=round(
                lon_center + random.uniform(-lon_span / 2.1, lon_span / 2.1), 4
            ),
            baro_altitude_m=random.uniform(1000, 12000),
            geo_altitude_m=random.uniform(1000, 12000) + random.uniform(-200, 200),
            on_ground=random.choice([True, False]),
            velocity_mps=random.uniform(50, 250),
            true_track_deg=random.uniform(0, 360),
            vertical_rate_mps=random.uniform(-10, 10),
            squawk=str(random.randint(1000, 7777)),
            spi=random.choice([True, False]),
            position_source=0,
            raw_data_provider=f"{PROVIDER_NAME}-Mock",
        )

    def _send_status_to_queue(
        self, status_code: str, message: str, details: Optional[Dict] = None
    ) -> None:
        """
        Put an adapter-status message on the output queue without blocking.

        A full queue or any other queue error is logged and swallowed so that
        status reporting can never terminate the polling thread.

        Args:
            status_code: One of the STATUS_* constants.
            message: Human readable text; the thread name is prepended.
            details: Optional extra data, attached under "details" when non-empty.
        """
        status_payload = {
            "type": MSG_TYPE_ADAPTER_STATUS,
            "status_code": status_code,
            "message": f"{self.name}: {message}",
        }
        if details:
            status_payload["details"] = details
        try:
            self.output_queue.put_nowait(status_payload)
            module_logger.debug(
                f"{self.name}: Sent status '{status_code}' to queue. Msg: {message}"
            )
        except QueueFull:
            module_logger.warning(
                f"{self.name}: Output queue full. Could not send status: {status_code}"
            )
        except Exception as e:
            module_logger.error(
                f"{self.name}: Error sending status '{status_code}' to queue: {e}",
                exc_info=True,
            )

    def stop(self):
        """Ask the polling loop to terminate; returns immediately."""
        module_logger.info(
            f"Stop signal received for {self.name}. Signaling stop event."
        )
        self._stop_event.set()

    def _convert_state_vector_to_canonical(
        self, sv: Any
    ) -> Optional[CanonicalFlightState]:
        """
        Converts an OpenSkyApi StateVector object to a CanonicalFlightState object.

        Returns:
            The converted state, or None when the vector has no ICAO24 code or
            cannot be converted (the problem is logged).
        """
        try:
            if sv.icao24 is None:
                module_logger.warning(f"Skipping StateVector with None icao24: {sv}")
                return None

            # Prefer the position timestamp; fall back to the last contact time
            # and, as a last resort, to the current time.
            primary_ts = (
                sv.time_position if sv.time_position is not None else sv.last_contact
            )
            last_contact_ts = sv.last_contact
            if primary_ts is None:
                primary_ts = time.time()
                module_logger.warning(
                    f"ICAO {sv.icao24}: Using current time as primary_timestamp due to missing API timestamps (last_contact={sv.last_contact}, time_position={sv.time_position})."
                )
            if last_contact_ts is None:
                last_contact_ts = primary_ts

            return CanonicalFlightState(
                icao24=sv.icao24,
                callsign=(sv.callsign.strip() if sv.callsign else None),
                origin_country=sv.origin_country,
                timestamp=float(primary_ts),
                last_contact_timestamp=float(last_contact_ts),
                latitude=sv.latitude,
                longitude=sv.longitude,
                baro_altitude_m=sv.baro_altitude,
                on_ground=sv.on_ground,
                velocity_mps=sv.velocity,
                true_track_deg=sv.true_track,
                vertical_rate_mps=sv.vertical_rate,
                geo_altitude_m=sv.geo_altitude,
                squawk=sv.squawk,
                spi=sv.spi,
                position_source=(
                    str(sv.position_source) if sv.position_source is not None else None
                ),
                raw_data_provider=PROVIDER_NAME,
            )
        except AttributeError as e_attr:
            module_logger.error(
                f"AttributeError parsing StateVector: {e_attr}. Vector: {sv}",
                exc_info=False,
            )
            return None
        except Exception as e:
            module_logger.error(
                f"Error converting StateVector for ICAO '{getattr(sv, 'icao24', 'UNKNOWN')}': {e}. Vector: {sv}",
                exc_info=True,
            )
            return None

    def _register_api_failure(self, retry_after: Optional[str] = None) -> float:
        """Record one more consecutive API failure and return the next backoff delay."""
        self._consecutive_api_errors += 1
        self._in_backoff_mode = True
        return self._calculate_next_backoff_delay(provided_retry_after=retry_after)

    def _build_error_result(
        self, error_type: str, status_code: Any, message: str, delay: float
    ) -> Dict[str, Any]:
        """Build the error dictionary returned by _perform_api_request()."""
        return {
            "error_type": error_type,
            "status_code": status_code,
            "message": message,
            "delay": delay,
            "consecutive_errors": self._consecutive_api_errors,
        }

    def _perform_mock_request(self) -> Dict[str, Any]:
        """Produce mock flight data, or a simulated error, as selected in the configuration."""
        module_logger.info(f"{self.name}: Using MOCK API data as per configuration.")
        self._send_status_to_queue(STATUS_FETCHING, "Generating mock flight data...")

        if app_config.MOCK_API_ERROR_SIMULATION == "RATE_LIMITED":
            mock_retry_after = str(getattr(app_config, "MOCK_RETRY_AFTER_SECONDS", 60))
            delay = self._register_api_failure(mock_retry_after)
            err_msg = f"MOCK: Rate limited. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.warning(f"{self.name}: {err_msg}")
            return self._build_error_result(STATUS_RATE_LIMITED, 429, err_msg, delay)

        if app_config.MOCK_API_ERROR_SIMULATION == "HTTP_ERROR":
            delay = self._register_api_failure()
            mock_status_code = getattr(app_config, "MOCK_HTTP_ERROR_STATUS", 500)
            err_msg = f"MOCK: HTTP error {mock_status_code}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
            return self._build_error_result(
                STATUS_API_ERROR_TEMPORARY, mock_status_code, err_msg, delay
            )

        mock_count = getattr(app_config, "MOCK_API_FLIGHT_COUNT", 5)
        mock_states: List[CanonicalFlightState] = [
            self._generate_mock_flight_state(i + 1) for i in range(mock_count)
        ]
        module_logger.info(f"{self.name}: Generated {len(mock_states)} mock flight states.")
        self._reset_error_state()
        return {"data": mock_states}

    def _perform_api_request(self) -> Dict[str, Any]:
        """
        Performs API request using opensky-api library or generates mock data.
        Returns a structured result dictionary.

        Returns:
            ``{"data": [CanonicalFlightState, ...]}`` on success, otherwise a
            dictionary with the keys ``error_type`` (a STATUS_* constant),
            ``status_code``, ``message``, ``delay`` and ``consecutive_errors``.
        """
        if app_config.USE_MOCK_OPENSKY_API:
            return self._perform_mock_request()

        if not self.bounding_box_dict:
            err_msg = "Bounding box not set for REAL API request."
            module_logger.error(f"{self.name}: {err_msg}")
            return self._build_error_result(
                STATUS_PERMANENT_FAILURE, "NO_BBOX", err_msg, 0.0
            )

        try:
            # Order passed to get_states(): (lat_min, lat_max, lon_min, lon_max).
            api_bbox_tuple = (
                self.bounding_box_dict["lat_min"],
                self.bounding_box_dict["lat_max"],
                self.bounding_box_dict["lon_min"],
                self.bounding_box_dict["lon_max"],
            )
        except KeyError as e_key:
            err_msg = f"Missing key in bounding_box_dict: {e_key}"
            module_logger.error(f"{self.name}: {err_msg}")
            return self._build_error_result(
                STATUS_PERMANENT_FAILURE, "INVALID_BBOX_FORMAT", err_msg, 0.0
            )

        self._send_status_to_queue(STATUS_FETCHING, f"Requesting REAL data for bbox (tuple): {api_bbox_tuple}")
        try:
            states_response = self.api_client.get_states(bbox=api_bbox_tuple)
            self._reset_error_state()
            canonical_states: List[CanonicalFlightState] = []
            if states_response and states_response.states:
                for sv in states_response.states:
                    canonical_sv = self._convert_state_vector_to_canonical(sv)
                    if canonical_sv:
                        canonical_states.append(canonical_sv)
                module_logger.info(f"{self.name}: Fetched and parsed {len(canonical_states)} flight states via opensky-api.")
            else:
                module_logger.info(f"{self.name}: API returned no flight states (states_response or .states is null/empty).")
            return {"data": canonical_states}
        except requests.exceptions.HTTPError as http_err:
            response = http_err.response
            # A requests.Response is falsy for every 4xx/5xx status, so it must
            # be compared against None; a plain truth test would always hide
            # the status code of an error response.
            if response is not None:
                status_code = response.status_code
                retry_after = response.headers.get("Retry-After")
            else:
                status_code = "N/A"
                retry_after = None
            delay = self._register_api_failure(retry_after)
            if status_code == 429:
                err_msg = f"UNEXPECTED Rate limit (429) propagated by library. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
                module_logger.error(f"{self.name}: {err_msg}")
                return self._build_error_result(STATUS_RATE_LIMITED, 429, err_msg, delay)
            err_msg = f"HTTP error {status_code} from opensky-api: {http_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
            return self._build_error_result(
                STATUS_API_ERROR_TEMPORARY, status_code, err_msg, delay
            )
        except requests.exceptions.Timeout:
            delay = self._register_api_failure()
            err_msg = f"API request timed out (via opensky-api). Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
            return self._build_error_result(
                STATUS_API_ERROR_TEMPORARY, "TIMEOUT", err_msg, delay
            )
        except requests.exceptions.RequestException as req_err:
            delay = self._register_api_failure()
            err_msg = f"Request error via opensky-api: {req_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.error(f"{self.name}: {err_msg}", exc_info=True)
            return self._build_error_result(
                STATUS_API_ERROR_TEMPORARY, "REQUEST_EXCEPTION", err_msg, delay
            )
        except Exception as e:
            # Boundary handler: anything else must not kill the polling thread.
            delay = self._register_api_failure()
            err_msg = f"Unexpected critical error using opensky-api: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
            module_logger.critical(f"{self.name}: {err_msg}", exc_info=True)
            return self._build_error_result(
                STATUS_API_ERROR_TEMPORARY, "UNEXPECTED_LIBRARY_ERROR", err_msg, delay
            )

    def _calculate_next_backoff_delay(
        self,
        provided_retry_after: Optional[str] = None,
    ) -> float:
        """
        Compute, store and return the delay before the next request attempt.

        The exponential delay is
        ``INITIAL_BACKOFF_DELAY_SECONDS * BACKOFF_FACTOR ** (errors - 1)``.
        A numeric ``provided_retry_after`` replaces it when larger. The result
        is capped at MAX_BACKOFF_DELAY_SECONDS and never lower than 1 second.

        Args:
            provided_retry_after: Retry-After value in seconds as a string of
                digits; any other value is ignored.
        """
        calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * (
            BACKOFF_FACTOR ** (self._consecutive_api_errors - 1)
        )
        api_delay_from_header = 0.0
        if provided_retry_after is not None and provided_retry_after.isdigit():
            # str.isdigit() also accepts characters float() rejects (for
            # example superscript digits), hence the ValueError guard.
            try:
                api_delay_from_header = float(provided_retry_after)
                module_logger.debug(f"{self.name}: Found Retry-After header value: {provided_retry_after}s")
            except ValueError:
                module_logger.warning(f"{self.name}: Could not parse Retry-After header value: '{provided_retry_after}'")
                api_delay_from_header = 0.0
        self._current_backoff_delay = max(api_delay_from_header, calculated_delay)
        self._current_backoff_delay = min(self._current_backoff_delay, MAX_BACKOFF_DELAY_SECONDS)
        self._current_backoff_delay = max(self._current_backoff_delay, 1.0)
        module_logger.debug(f"{self.name}: Calculated next backoff delay: {self._current_backoff_delay:.1f}s")
        return self._current_backoff_delay

    def _reset_error_state(self):
        """Clear the backoff state; sends STATUS_RECOVERED if errors had been recorded."""
        if self._in_backoff_mode or self._consecutive_api_errors > 0:
            module_logger.info(f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff.")
            self._send_status_to_queue(STATUS_RECOVERED, "API connection recovered.")
        self._consecutive_api_errors = 0
        self._current_backoff_delay = 0.0
        self._in_backoff_mode = False

    def run(self):
        """
        Thread entry point: poll until stopped or until a permanent failure.

        Each cycle performs one request, forwards either the flight data or the
        error status to the output queue, then waits for the polling interval
        (or the current backoff delay). A final STATUS_STOPPED message is sent
        when the loop exits.
        """
        initial_settle_delay_seconds = 0.1
        module_logger.debug(f"{self.name} thread starting initial settle delay ({initial_settle_delay_seconds}s)...")
        if self._stop_event.wait(timeout=initial_settle_delay_seconds):
            module_logger.info(f"{self.name} thread received stop signal during initial settle. Terminating.")
            return

        module_logger.info(f"{self.name} thread is fully operational. Base polling interval: {self.base_polling_interval:.1f}s.")
        self._send_status_to_queue(STATUS_STARTING, "Adapter thread started, preparing initial fetch.")

        while not self._stop_event.is_set():
            if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD:
                perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) consecutive API errors. Stopping live updates."
                module_logger.critical(f"{self.name}: {perm_fail_msg}")
                self._send_status_to_queue(STATUS_PERMANENT_FAILURE, perm_fail_msg)
                break

            api_result = self._perform_api_request()
            if self._stop_event.is_set():
                module_logger.info(f"{self.name}: Stop event detected after API request. Exiting loop.")
                break

            if "data" in api_result:
                flight_data_payload: List[CanonicalFlightState] = api_result["data"]
                try:
                    self.output_queue.put_nowait({"type": MSG_TYPE_FLIGHT_DATA, "payload": flight_data_payload})
                    module_logger.debug(f"{self.name}: Sent {len(flight_data_payload)} flight states to queue.")
                except QueueFull:
                    module_logger.warning(f"{self.name}: Output queue full. Discarding {len(flight_data_payload)} flight states.")
                except Exception as e:
                    module_logger.error(f"{self.name}: Error putting flight data into queue: {e}", exc_info=True)
            elif "error_type" in api_result:
                self._send_status_to_queue(
                    status_code=api_result["error_type"],
                    message=api_result.get("message", "An API error occurred."),
                    details=api_result,
                )
                if api_result["error_type"] == STATUS_PERMANENT_FAILURE:
                    # The bounding box never changes after construction, so
                    # retrying would only repeat the same failure forever.
                    module_logger.critical(f"{self.name}: Permanent failure reported by API request. Stopping live updates.")
                    break
            else:
                module_logger.error(f"{self.name}: Unknown result structure from _perform_api_request: {api_result}")

            time_to_wait_seconds: float
            if self._in_backoff_mode:
                time_to_wait_seconds = self._current_backoff_delay
                module_logger.debug(f"{self.name}: In backoff, next attempt in {time_to_wait_seconds:.1f}s.")
            else:
                time_to_wait_seconds = self.base_polling_interval
                module_logger.debug(f"{self.name}: Next fetch cycle in {time_to_wait_seconds:.1f}s.")

            # NOTE(review): the shutdown path below reports through print()
            # rather than module_logger, as in the original code; presumably
            # deliberate for thread teardown - confirm before switching.
            if time_to_wait_seconds > 0:
                # Waiting on the event makes stop() interrupt the sleep at once.
                if self._stop_event.wait(timeout=time_to_wait_seconds):
                    print(f"{self.name}: Stop event received during wait period. Exiting loop.")
                    break
            else:
                if self._stop_event.is_set():
                    print(f"{self.name}: Stop event detected before waiting (wait time <= 0). Exiting loop.")
                    break

        print(f"{self.name}: Exited main loop. Sending final STOPPED status.")
        try:
            final_status_payload = {
                "type": MSG_TYPE_ADAPTER_STATUS,
                "status_code": STATUS_STOPPED,
                "message": f"{self.name}: Adapter thread stopped.",
            }
            self.output_queue.put_nowait(final_status_payload)
            print(f"{self.name}: Successfully sent final STATUS_STOPPED to queue.")
        except QueueFull:
            print(f"{self.name}: Output queue full. Could not send final STATUS_STOPPED message.")
        except Exception as e_final_put:
            # print() has no exc_info keyword (passing it raises TypeError),
            # so the traceback is emitted separately.
            import traceback

            print(f"{self.name}: Error sending final STATUS_STOPPED message: {e_final_put}")
            traceback.print_exc()
        print(f"{self.name}: RUN method is terminating now.")