# SXXXXXXX_FlightMonitor/flightmonitor/data/adsb_exchange_live_adapter.py
#
# 476 lines
# 25 KiB
# Python

# FlightMonitor/data/adsb_exchange_live_adapter.py
"""
Adapter for fetching live flight data from the ADS-B Exchange API.
This adapter polls the API periodically, transforms the raw data into
CanonicalFlightState objects, and puts structured messages into an output queue.
"""
import requests
import time
import threading
from queue import Queue, Empty as QueueEmpty, Full as QueueFull
import random # For mock data generation
from typing import Dict, Any, List, Optional, Tuple
from flightmonitor.utils.logger import get_logger
from flightmonitor.data.common_models import CanonicalFlightState
# MODIFIED: Import the base class and the message/status constants.
# WHY: To conform to the adapter interface and use the shared status constants.
# WHERE: Top of the file.
# HOW: Added the import.
from .base_live_data_adapter import BaseLiveDataAdapter, AdapterMessage
from .opensky_live_adapter import ( # Importa le costanti di stato dall'opensky_live_adapter per coerenza
MSG_TYPE_FLIGHT_DATA,
MSG_TYPE_ADAPTER_STATUS,
STATUS_STARTING,
STATUS_FETCHING,
STATUS_RECOVERED,
STATUS_RATE_LIMITED,
STATUS_API_ERROR_TEMPORARY,
STATUS_PERMANENT_FAILURE,
STATUS_STOPPED,
)
from . import config as app_config # Per accedere alle configurazioni ADSBx
# Logger for this module, obtained from the project's get_logger helper.
module_logger = get_logger(__name__)
# --- ADSB Exchange Specific Constants ---
# Provider label written into CanonicalFlightState.raw_data_provider by this adapter
# (the mock generator appends "-Mock" to it).
PROVIDER_NAME = "ADSBExchange"
# Timeout passed to requests.get(); read from config, falling back to 15 when the
# config module does not define DEFAULT_API_TIMEOUT_SECONDS.
ADSBEXCHANGE_API_TIMEOUT_SECONDS = getattr(app_config, "DEFAULT_API_TIMEOUT_SECONDS", 15)
# API key read from config; None when not configured, in which case the adapter
# constructor raises ValueError.
ADSBEXCHANGE_API_KEY = getattr(app_config, "ADSBEXCHANGE_API_KEY", None)
# URL template with {lat_min}/{lon_min}/{lat_max}/{lon_max} placeholders that are filled
# from the adapter's bounding box via str.format().
# NOTE(review): the fallback URL is a guess at the endpoint layout — confirm against the
# ADS-B Exchange API documentation for the subscription actually in use.
ADSBEXCHANGE_BASE_URL = getattr(app_config, "ADSBEXCHANGE_API_URL", "https://adsbexchange.com/api/aircraft/v2/lat/{lat_min}/lon/{lon_min}/latmax/{lat_max}/lonmax/{lon_max}/") # Default URL, check their docs
# Backoff constants (can be shared or specific)
# Delay after the n-th consecutive error is
# INITIAL_BACKOFF_DELAY_SECONDS * BACKOFF_FACTOR ** (n - 1), capped at
# MAX_BACKOFF_DELAY_SECONDS (see _calculate_next_backoff_delay).
INITIAL_BACKOFF_DELAY_SECONDS = 20.0
MAX_BACKOFF_DELAY_SECONDS = 300.0
BACKOFF_FACTOR = 1.8
# run() gives up and reports a permanent failure once this many consecutive API
# errors have accumulated.
MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5
# MODIFIED: ADSBExchangeLiveAdapter now inherits from BaseLiveDataAdapter.
# WHY: To conform to the common interface and reuse the base thread logic.
# WHERE: Class declaration.
# HOW: Inherits from BaseLiveDataAdapter.
class ADSBExchangeLiveAdapter(BaseLiveDataAdapter):
    """
    Adapter for fetching live flight data from the ADS-B Exchange API.

    Conforms to the BaseLiveDataAdapter interface. The thread's ``run`` loop
    polls the API (or a mock generator, when enabled in config), converts each
    raw aircraft record into a ``CanonicalFlightState`` and puts structured
    messages on the output queue:

    * ``{"type": MSG_TYPE_FLIGHT_DATA, "payload": [CanonicalFlightState, ...]}``
    * ``{"type": MSG_TYPE_ADAPTER_STATUS, "status_code": ..., "message": ...,
      "details": {...}}`` (``details`` only when available)

    Transient API errors trigger exponential backoff; the loop stops after
    ``MAX_CONSECUTIVE_ERRORS_THRESHOLD`` consecutive errors or on the first
    permanent (configuration) failure.
    """

    def __init__(
        self,
        output_queue: Queue[AdapterMessage],
        bounding_box: Dict[str, float],
        polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS,
        daemon: bool = True,
    ):
        """
        Create the adapter thread (it is not started here).

        Args:
            output_queue: Queue that receives flight-data and status messages.
            bounding_box: Dict with ``lat_min``, ``lat_max``, ``lon_min`` and
                ``lon_max`` keys describing the area to monitor.
            polling_interval: Seconds to wait between successful fetches.
            daemon: Passed through to the base class.

        Raises:
            ValueError: If no ADS-B Exchange API key is configured.
        """
        thread_name = f"ADSBXAdapter-bbox-{bounding_box.get('lat_min',0):.1f}"
        super().__init__(output_queue, bounding_box, float(polling_interval), daemon, name=thread_name)

        # The key check happens after super().__init__ so that self.name is
        # available for the log message.
        if not ADSBEXCHANGE_API_KEY:
            module_logger.critical(f"{self.name}: ADS-B Exchange API Key is missing in config. Cannot initialize adapter.")
            raise ValueError("ADS-B Exchange API Key is required for ADSBExchangeLiveAdapter.")

        self.api_key = ADSBEXCHANGE_API_KEY
        self.base_api_url = ADSBEXCHANGE_BASE_URL
        self.api_timeout = ADSBEXCHANGE_API_TIMEOUT_SECONDS

        # Backoff bookkeeping, updated by _register_api_error / _reset_error_state.
        self._consecutive_api_errors: int = 0
        self._current_backoff_delay: float = 0.0
        self._in_backoff_mode: bool = False

        module_logger.debug(
            f"{self.name} initialized. API Key: {'Present' if self.api_key else 'Missing'}, "
            f"BBox (dict): {self.bounding_box}, Base Interval: {self.polling_interval}s"
        )

    # ------------------------------------------------------------------
    # Queue helpers
    # ------------------------------------------------------------------
    def _send_status_to_queue(
        self, status_code: str, message: str, details: Optional[Dict] = None
    ):
        """
        Put a status message on the output queue without blocking.

        Args:
            status_code: One of the shared STATUS_* constants.
            message: Human-readable text; the thread name is prepended.
            details: Optional extra information attached under ``"details"``.

        A full queue or any other queue error is logged and swallowed so that
        status reporting can never take the polling thread down.
        """
        status_payload = {
            "type": MSG_TYPE_ADAPTER_STATUS,
            "status_code": status_code,
            "message": f"{self.name}: {message}",
        }
        if details:
            status_payload["details"] = details
        try:
            self.output_queue.put_nowait(status_payload)
            module_logger.debug(f"{self.name}: Sent status '{status_code}' to queue. Msg: {message}")
        except QueueFull:
            module_logger.warning(f"{self.name}: Output queue full. Could not send status: {status_code}")
        except Exception as e:
            module_logger.error(f"{self.name}: Error sending status '{status_code}' to queue: {e}", exc_info=True)

    # ------------------------------------------------------------------
    # Data conversion
    # ------------------------------------------------------------------
    def _generate_mock_flight_state(self, icao_suffix: int) -> CanonicalFlightState:
        """
        Build one randomised CanonicalFlightState inside the bounding box.

        Args:
            icao_suffix: Number used to derive a distinct mock ICAO24/callsign.

        Returns:
            A mock flight state tagged with provider ``"<PROVIDER_NAME>-Mock"``.
        """
        now = time.time()
        bbox = self.bounding_box
        lat_center = (bbox["lat_min"] + bbox["lat_max"]) / 2
        lon_center = (bbox["lon_min"] + bbox["lon_max"]) / 2
        lat_span = bbox["lat_max"] - bbox["lat_min"]
        lon_span = bbox["lon_max"] - bbox["lon_min"]
        return CanonicalFlightState(
            icao24=f"adsbx{icao_suffix:02x}",
            callsign=f"ADBX{icao_suffix:02X}",
            origin_country="ADSBXland",
            timestamp=now,
            last_contact_timestamp=now,
            # Dividing by 2.1 (not 2) keeps mock positions strictly inside the box.
            latitude=round(lat_center + random.uniform(-lat_span / 2.1, lat_span / 2.1), 4),
            longitude=round(lon_center + random.uniform(-lon_span / 2.1, lon_span / 2.1), 4),
            baro_altitude_m=random.uniform(1000, 12000),
            geo_altitude_m=random.uniform(1000, 12000) + random.uniform(-200, 200),
            on_ground=random.choice([True, False]),
            velocity_mps=random.uniform(50, 250),
            true_track_deg=random.uniform(0, 360),
            vertical_rate_mps=random.uniform(-10, 10),
            squawk=str(random.randint(1000, 7777)),
            spi=random.choice([True, False]),
            position_source=0,  # Assuming ADS-B
            raw_data_provider=f"{PROVIDER_NAME}-Mock",
        )

    @staticmethod
    def _scale_numeric(value: Any, factor: float) -> Optional[float]:
        """Return ``value * factor`` as a float, or None when value is not a real number."""
        # bool is a subclass of int; a True/False in a numeric field is not a measurement.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return None
        return float(value) * factor

    def _convert_adsbx_state_to_canonical(self, raw_state: Dict[str, Any]) -> Optional[CanonicalFlightState]:
        """
        Convert a raw ADS-B Exchange aircraft dict into a CanonicalFlightState.

        NOTE: The field mapping is based on commonly seen ADS-B Exchange
        response fields and may need adjusting for the exact API version or
        endpoint in use. Units are assumed to be feet (altitudes), knots
        (ground speed) and feet per minute (vertical rate).

        Args:
            raw_state: One element of the API's aircraft list.

        Returns:
            The converted state, or None when the record has no ICAO24 address,
            is not a dict, or cannot be converted (the problem is logged).
        """
        if not isinstance(raw_state, dict):
            module_logger.warning(f"{self.name}: Skipping ADSBx state that is not a dict: {raw_state!r}")
            return None
        try:
            icao24 = raw_state.get("icao") or raw_state.get("hex")
            if not icao24:
                module_logger.warning(f"{self.name}: Skipping ADSBx state with missing ICAO24: {raw_state}")
                return None

            baro_altitude_raw = raw_state.get("alt_baro")

            # Non-numeric or missing values become None instead of raising.
            baro_altitude_m = self._scale_numeric(baro_altitude_raw, 0.3048)  # ft -> m
            geo_altitude_m = self._scale_numeric(raw_state.get("alt_geom"), 0.3048)  # ft -> m
            velocity_mps = self._scale_numeric(raw_state.get("gs"), 0.514444)  # kt -> m/s
            vertical_rate_mps = self._scale_numeric(raw_state.get("vrt"), 0.00508)  # ft/min -> m/s

            # NOTE(review): some ADS-B Exchange feeds report alt_baro as the
            # string "ground" for aircraft on the ground — confirm for the
            # endpoint in use. 'gnd' == 1 remains the primary indicator.
            reported_ground_altitude = (
                isinstance(baro_altitude_raw, str)
                and baro_altitude_raw.strip().lower() == "ground"
            )
            on_ground = raw_state.get("gnd", 0) == 1 or reported_ground_altitude

            # Use the same instant for both timestamps; fall back to "now" when
            # the record carries no usable time field.
            observed_at = raw_state.get("time") or raw_state.get("now") or time.time()

            # 'flight' may be present but null, so normalise before stripping.
            callsign = str(raw_state.get("flight") or "").strip() or None

            squawk_raw = raw_state.get("squawk")

            return CanonicalFlightState(
                icao24=str(icao24).lower().strip(),
                callsign=callsign,
                origin_country=raw_state.get("reg_country", "Unknown"),
                timestamp=observed_at,
                last_contact_timestamp=observed_at,
                latitude=raw_state.get("lat"),
                longitude=raw_state.get("lon"),
                baro_altitude_m=baro_altitude_m,
                geo_altitude_m=geo_altitude_m,
                on_ground=on_ground,
                velocity_mps=velocity_mps,
                true_track_deg=raw_state.get("track"),
                vertical_rate_mps=vertical_rate_mps,
                squawk=str(squawk_raw) if squawk_raw else None,
                spi=raw_state.get("spi", 0) == 1,  # 'spi': 1 for SPI, 0 for normal
                position_source=raw_state.get("nic"),  # Or "rc" or "type" (e.g., "adsb_icao")
                raw_data_provider=PROVIDER_NAME,
            )
        except Exception as e:
            module_logger.error(f"{self.name}: Error converting ADSBx state to CanonicalFlightState for ICAO '{raw_state.get('icao', raw_state.get('hex', 'UNKNOWN'))}': {e}. Raw state: {raw_state}", exc_info=True)
            return None

    # ------------------------------------------------------------------
    # Error bookkeeping
    # ------------------------------------------------------------------
    def _register_api_error(
        self,
        error_type: str,
        status_code: Any,
        description: str,
        retry_after: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Record one more consecutive API error and build the error result dict.

        Increments the error counter, enters backoff mode, computes the next
        delay and returns the structure ``run`` forwards to the queue. Logging
        is left to the caller so each error kind keeps its own log level.
        """
        self._consecutive_api_errors += 1
        self._in_backoff_mode = True
        delay = self._calculate_next_backoff_delay(provided_retry_after=retry_after)
        message = f"{description}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
        return {
            "error_type": error_type,
            "status_code": status_code,
            "message": message,
            "delay": delay,
            "consecutive_errors": self._consecutive_api_errors,
        }

    def _permanent_failure_result(self, message: str, status_code: str) -> Dict[str, Any]:
        """Log a non-recoverable configuration problem and build its result dict."""
        module_logger.error(f"{self.name}: {message}")
        return {
            "error_type": STATUS_PERMANENT_FAILURE,
            "message": message,
            "status_code": status_code,
            "delay": 0.0,
            "consecutive_errors": self._consecutive_api_errors,
        }

    # ------------------------------------------------------------------
    # API access
    # ------------------------------------------------------------------
    def _perform_mock_api_request(self) -> Dict[str, Any]:
        """Produce mock data or a simulated error, as selected in config."""
        module_logger.info(f"{self.name}: Using MOCK API data as per configuration.")
        self._send_status_to_queue(STATUS_FETCHING, "Generating mock flight data...")

        simulated_error = app_config.MOCK_API_ERROR_SIMULATION
        if simulated_error == "RATE_LIMITED":
            mock_retry_after = str(getattr(app_config, "MOCK_RETRY_AFTER_SECONDS", 60))
            result = self._register_api_error(
                STATUS_RATE_LIMITED, 429, "MOCK: Rate limited", retry_after=mock_retry_after
            )
            module_logger.warning(f"{self.name}: {result['message']}")
            return result
        if simulated_error == "HTTP_ERROR":
            mock_status_code = getattr(app_config, "MOCK_HTTP_ERROR_STATUS", 500)
            result = self._register_api_error(
                STATUS_API_ERROR_TEMPORARY, mock_status_code, f"MOCK: HTTP error {mock_status_code}"
            )
            module_logger.error(f"{self.name}: {result['message']}", exc_info=False)
            return result

        mock_count = getattr(app_config, "MOCK_API_FLIGHT_COUNT", 5)
        mock_states: List[CanonicalFlightState] = [
            self._generate_mock_flight_state(index + 1) for index in range(mock_count)
        ]
        module_logger.info(f"{self.name}: Generated {len(mock_states)} mock flight states.")
        self._reset_error_state()
        return {"data": mock_states}

    def _perform_api_request(self) -> Dict[str, Any]:
        """
        Fetch one snapshot of flight data and return a structured result.

        Returns:
            ``{"data": [CanonicalFlightState, ...]}`` on success, otherwise a
            dict with ``error_type``, ``status_code``, ``message``, ``delay``
            and ``consecutive_errors``. Transient errors update the backoff
            state; configuration errors are reported as
            ``STATUS_PERMANENT_FAILURE`` with a zero delay.
        """
        # The OpenSky mock flag is deliberately re-used for the ADSBx mock.
        if app_config.USE_MOCK_OPENSKY_API:
            return self._perform_mock_api_request()

        if not self.bounding_box:
            return self._permanent_failure_result("Bounding box not set for REAL API request.", "NO_BBOX")

        try:
            api_url = self.base_api_url.format(
                lat_min=self.bounding_box["lat_min"],
                lon_min=self.bounding_box["lon_min"],
                lat_max=self.bounding_box["lat_max"],
                lon_max=self.bounding_box["lon_max"],
            )
        except KeyError as e_key:
            return self._permanent_failure_result(
                f"Missing key in bounding_box for URL formatting: {e_key}", "INVALID_BBOX_FORMAT"
            )
        except Exception as e_url_format:
            return self._permanent_failure_result(
                f"Error formatting ADSBx API URL: {e_url_format}", "URL_FORMAT_ERROR"
            )

        # NOTE(review): header name assumed; ADSBx may expect 'api-key' or
        # 'X-Auth-Token' depending on the plan — confirm against their docs.
        headers = {"api-key": self.api_key}

        self._send_status_to_queue(STATUS_FETCHING, f"Requesting REAL data from ADSBx: {api_url[:100]}...")
        try:
            response = requests.get(api_url, headers=headers, timeout=self.api_timeout)
            response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
            raw_adsbx_data = response.json()

            # NOTE(review): the list key is assumed to be 'aircraft', with 'ac'
            # as used by v2-style responses accepted as a fallback — confirm.
            aircraft_states_payload = raw_adsbx_data.get("aircraft") or raw_adsbx_data.get("ac") or []

            canonical_states: List[CanonicalFlightState] = []
            if aircraft_states_payload:
                for raw_state in aircraft_states_payload:
                    canonical_sv = self._convert_adsbx_state_to_canonical(raw_state)
                    if canonical_sv:
                        canonical_states.append(canonical_sv)
                module_logger.info(f"{self.name}: Fetched and parsed {len(canonical_states)} flight states via ADSBx API.")
            else:
                module_logger.info(f"{self.name}: ADSBx API returned no flight states (aircraft list is empty). Raw response: {raw_adsbx_data.keys()}")

            self._reset_error_state()  # Successful API call (or no data)
            return {"data": canonical_states}
        except requests.exceptions.HTTPError as http_err:
            failed_response = http_err.response
            # A requests.Response is falsy for 4xx/5xx statuses, so it must be
            # compared with None rather than tested for truthiness.
            if failed_response is not None:
                status_code = failed_response.status_code
                retry_after = failed_response.headers.get("Retry-After")
            else:
                status_code = "N/A"
                retry_after = None
            # 429 is reported as rate limiting, mirroring the mock path.
            error_type = STATUS_RATE_LIMITED if status_code == 429 else STATUS_API_ERROR_TEMPORARY
            result = self._register_api_error(
                error_type,
                status_code,
                f"HTTP error {status_code} from ADSBx API: {http_err}",
                retry_after=retry_after,
            )
            module_logger.error(f"{self.name}: {result['message']}", exc_info=False)
            return result
        except requests.exceptions.Timeout:
            result = self._register_api_error(
                STATUS_API_ERROR_TEMPORARY, "TIMEOUT", "ADSBx API request timed out"
            )
            module_logger.error(f"{self.name}: {result['message']}", exc_info=False)
            return result
        except requests.exceptions.RequestException as req_err:
            result = self._register_api_error(
                STATUS_API_ERROR_TEMPORARY, "REQUEST_EXCEPTION", f"ADSBx Request error: {req_err}"
            )
            module_logger.error(f"{self.name}: {result['message']}", exc_info=True)
            return result
        except Exception as e:
            # Last-resort boundary: the polling thread must survive bad payloads.
            result = self._register_api_error(
                STATUS_API_ERROR_TEMPORARY, "UNEXPECTED_ERROR", f"ADSBx Unexpected critical error: {e}"
            )
            module_logger.critical(f"{self.name}: {result['message']}", exc_info=True)
            return result

    def _calculate_next_backoff_delay(self, provided_retry_after: Optional[str] = None) -> float:
        """
        Compute and store the delay before the next attempt.

        The delay is the larger of the exponential backoff for the current
        error count and the server-provided Retry-After value (whole seconds
        only), clamped to ``[1.0, MAX_BACKOFF_DELAY_SECONDS]``.

        Args:
            provided_retry_after: Raw ``Retry-After`` header value, if any.

        Returns:
            The delay in seconds (also stored in ``_current_backoff_delay``).
        """
        # Clamp the exponent so a call with zero recorded errors cannot shrink
        # the delay below the initial value.
        exponent = max(self._consecutive_api_errors - 1, 0)
        calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * (BACKOFF_FACTOR ** exponent)

        api_delay_from_header = 0.0
        if provided_retry_after is not None:
            retry_after_text = str(provided_retry_after).strip()
            parsed_delay: Optional[float] = None
            # Only delta-seconds values are honoured; an HTTP-date form falls
            # through to the warning and the exponential delay is used.
            if retry_after_text.isdigit():
                try:
                    parsed_delay = float(retry_after_text)
                except ValueError:
                    parsed_delay = None
            if parsed_delay is None:
                module_logger.warning(f"{self.name}: Could not parse Retry-After header value: '{provided_retry_after}'")
            else:
                api_delay_from_header = parsed_delay
                module_logger.debug(f"{self.name}: Found Retry-After header value: {provided_retry_after}s")

        delay = max(api_delay_from_header, calculated_delay)
        delay = min(delay, MAX_BACKOFF_DELAY_SECONDS)
        delay = max(delay, 1.0)
        self._current_backoff_delay = delay
        module_logger.debug(f"{self.name}: Calculated next backoff delay: {self._current_backoff_delay:.1f}s")
        return self._current_backoff_delay

    def _reset_error_state(self):
        """Clear the error counter and backoff mode, announcing recovery if needed."""
        if self._in_backoff_mode or self._consecutive_api_errors > 0:
            module_logger.info(f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff.")
            self._send_status_to_queue(STATUS_RECOVERED, "API connection recovered.")
        self._consecutive_api_errors = 0
        self._current_backoff_delay = 0.0
        self._in_backoff_mode = False

    # ------------------------------------------------------------------
    # Thread entry point
    # ------------------------------------------------------------------
    def run(self):
        """
        Main thread loop for the ADSBx adapter.

        Repeats fetch -> publish -> wait until the stop event is set, the
        consecutive-error threshold is reached, or a permanent failure is
        reported. A final STATUS_STOPPED message is always attempted once the
        loop has been entered.
        """
        initial_settle_delay_seconds = 0.1
        module_logger.debug(f"{self.name} thread starting initial settle delay ({initial_settle_delay_seconds}s)...")
        if self._stop_event.wait(timeout=initial_settle_delay_seconds):
            module_logger.info(f"{self.name} thread received stop signal during initial settle. Terminating.")
            return

        module_logger.info(f"{self.name} thread is fully operational. Base polling interval: {self.polling_interval:.1f}s.")
        self._send_status_to_queue(STATUS_STARTING, "Adapter thread started, preparing initial fetch.")

        while not self._stop_event.is_set():
            if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD:
                perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) consecutive API errors. Stopping live updates."
                module_logger.critical(f"{self.name}: {perm_fail_msg}")
                self._send_status_to_queue(STATUS_PERMANENT_FAILURE, perm_fail_msg)
                break

            api_result = self._perform_api_request()

            if self._stop_event.is_set():
                module_logger.info(f"{self.name}: Stop event detected after API request. Exiting loop.")
                break

            if "data" in api_result:
                flight_data_payload: List[CanonicalFlightState] = api_result["data"]
                try:
                    self.output_queue.put_nowait({"type": MSG_TYPE_FLIGHT_DATA, "payload": flight_data_payload})
                    module_logger.debug(f"{self.name}: Sent {len(flight_data_payload)} flight states to queue.")
                except QueueFull:
                    module_logger.warning(f"{self.name}: Output queue full. Discarding {len(flight_data_payload)} flight states.")
                except Exception as e:
                    module_logger.error(f"{self.name}: Error putting flight data into queue: {e}", exc_info=True)
            elif "error_type" in api_result:
                self._send_status_to_queue(
                    status_code=api_result["error_type"],
                    message=api_result.get("message", "An API error occurred."),
                    details=api_result,
                )
                # Configuration errors cannot heal on their own; retrying would
                # only flood the queue with identical failure messages.
                if api_result["error_type"] == STATUS_PERMANENT_FAILURE:
                    module_logger.critical(f"{self.name}: Permanent failure reported by API request. Stopping live updates.")
                    break
            else:
                module_logger.error(f"{self.name}: Unknown result structure from _perform_api_request: {api_result}")

            time_to_wait_seconds: float
            if self._in_backoff_mode:
                time_to_wait_seconds = self._current_backoff_delay
                module_logger.debug(f"{self.name}: In backoff, next attempt in {time_to_wait_seconds:.1f}s.")
            else:
                time_to_wait_seconds = self.polling_interval
                module_logger.debug(f"{self.name}: Next fetch cycle in {time_to_wait_seconds:.1f}s.")

            if time_to_wait_seconds > 0:
                # Event.wait doubles as an interruptible sleep.
                if self._stop_event.wait(timeout=time_to_wait_seconds):
                    module_logger.info(f"{self.name}: Stop event received during wait period. Exiting loop.")
                    break
            elif self._stop_event.is_set():
                module_logger.info(f"{self.name}: Stop event detected before waiting (wait time <= 0). Exiting loop.")
                break

        module_logger.info(f"{self.name}: Exited main loop. Sending final STOPPED status.")
        try:
            final_status_payload = {
                "type": MSG_TYPE_ADAPTER_STATUS,
                "status_code": STATUS_STOPPED,
                "message": f"{self.name}: Adapter thread stopped.",
            }
            self.output_queue.put_nowait(final_status_payload)
            module_logger.info(f"{self.name}: Successfully sent final STATUS_STOPPED to queue.")
        except QueueFull:
            module_logger.warning(f"{self.name}: Output queue full. Could not send final STATUS_STOPPED message.")
        except Exception as e_final_put:
            module_logger.error(f"{self.name}: Error sending final STATUS_STOPPED message: {e_final_put}", exc_info=True)
        module_logger.info(f"{self.name}: RUN method is terminating now.")