# FlightMonitor/data/common_models.py
"""
Defines the canonical data models used throughout the FlightMonitor application.
These models provide a standardized representation of flight data, independent
of the data provider (e.g., OpenSky, ADS-B Exchange, etc.).
"""

# The standard logging module is imported (rather than the application's
# get_logger helper) so this core data-model module stays minimal and free of
# application-specific logger setup.
import logging
import time  # Used as a fallback for timestamps if absolutely necessary
from typing import Any, Optional  # For type hints

# Module-level logger obtained through the standard library rather than the
# application's get_logger helper, so this module has no dependency on the
# application's logging setup.
logger = logging.getLogger(
    __name__
)  # flightmonitor.data.common_models (or just common_models if run standalone)


class CanonicalFlightState:
    """
    Represents the standardized state of an aircraft at a specific point in time.

    All timestamps are expected to be Unix epoch (seconds, UTC).
    All measurements (altitude, velocity, rate) are in metric units (meters, m/s).
    """

    def __init__(
        self,
        icao24: str,
        timestamp: float,  # Primary timestamp of the position/state data (UTC epoch)
        last_contact_timestamp: float,  # Timestamp of the last message from transponder (UTC epoch)
        latitude: Optional[float],
        longitude: Optional[float],
        on_ground: bool,
        callsign: Optional[str] = None,
        origin_country: Optional[str] = None,
        baro_altitude_m: Optional[float] = None,  # Barometric altitude in meters
        geo_altitude_m: Optional[float] = None,  # Geometric altitude in meters
        velocity_mps: Optional[float] = None,  # Ground speed in meters/second
        true_track_deg: Optional[float] = None,  # True track in degrees (0-360, North is 0)
        vertical_rate_mps: Optional[float] = None,  # Vertical rate in meters/second (positive=climbing)
        squawk: Optional[str] = None,
        spi: Optional[bool] = None,  # Special Purpose Indicator
        position_source: Optional[Any] = None,  # Source of position data (e.g., 0 for ADS-B, string name)
        raw_data_provider: Optional[str] = None,  # Identifier for the data provider (e.g., "OpenSky")
        # NOTE: a provider_specific_data: Optional[dict] parameter could be added
        # later for extra data that does not fit the canonical model.
    ):
        """
        Initializes a CanonicalFlightState object.

        Args:
            icao24 (str): Unique ICAO 24-bit address of the aircraft (hex string).
                Stored lowercased with surrounding whitespace removed.
            timestamp (float): The primary timestamp (UTC epoch seconds) for this state record,
                ideally from the data provider indicating when the state was valid.
            last_contact_timestamp (float): The timestamp (UTC epoch seconds) of the last message
                received from the aircraft's transponder.
            latitude (Optional[float]): Latitude in decimal degrees (WGS-84).
            longitude (Optional[float]): Longitude in decimal degrees (WGS-84).
            on_ground (bool): True if the aircraft is on the ground.
            callsign (Optional[str]): Callsign of the aircraft (e.g., "DAL123").
                Stored stripped; a blank callsign is stored as None.
            origin_country (Optional[str]): Origin country of the aircraft, often inferred from ICAO24.
            baro_altitude_m (Optional[float]): Barometric altitude in meters.
            geo_altitude_m (Optional[float]): Geometric (GNSS/WGS-84) altitude in meters.
            velocity_mps (Optional[float]): Ground speed in meters per second.
            true_track_deg (Optional[float]): True track/heading in degrees clockwise from North.
            vertical_rate_mps (Optional[float]): Vertical rate in meters per second (positive for climb).
            squawk (Optional[str]): Transponder squawk code.
            spi (Optional[bool]): Special Purpose Indicator flag.
            position_source (Optional[Any]): Identifier for the source of the position data.
            raw_data_provider (Optional[str]): Name of the provider that supplied this data.

        Raises:
            ValueError: If icao24 is None, empty, or contains only whitespace.
        """
        # Normalize BEFORE validating: a whitespace-only value is truthy, so
        # checking the raw argument would let it through and store "" as the
        # aircraft identifier.
        normalized_icao24 = icao24.lower().strip() if icao24 else ""
        if not normalized_icao24:
            # Log through the module logger so the failure is visible in the
            # application's logs in addition to the raised exception.
            logger.error(
                "icao24 cannot be None or empty during CanonicalFlightState initialization."
            )
            raise ValueError("icao24 cannot be None or empty.")

        self.icao24: str = normalized_icao24  # Lowercase, no surrounding whitespace
        self.callsign: Optional[str] = (
            callsign.strip() if callsign and callsign.strip() else None
        )
        self.origin_country: Optional[str] = origin_country

        self.timestamp: float = timestamp
        self.last_contact_timestamp: float = last_contact_timestamp

        self.latitude: Optional[float] = latitude
        self.longitude: Optional[float] = longitude

        self.baro_altitude_m: Optional[float] = baro_altitude_m
        self.geo_altitude_m: Optional[float] = geo_altitude_m

        self.on_ground: bool = on_ground
        self.velocity_mps: Optional[float] = velocity_mps
        self.true_track_deg: Optional[float] = true_track_deg
        self.vertical_rate_mps: Optional[float] = vertical_rate_mps

        self.squawk: Optional[str] = squawk
        self.spi: Optional[bool] = spi
        # Could be int or str based on provider
        self.position_source: Optional[Any] = position_source

        self.raw_data_provider: Optional[str] = raw_data_provider

    def __repr__(self) -> str:
        """Return a compact summary: identity, timestamp, position, altitude, ground flag."""
        # Barometric altitude is shown when available; otherwise fall back to
        # the geometric altitude (which may itself be None).
        altitude_m = (
            self.baro_altitude_m
            if self.baro_altitude_m is not None
            else self.geo_altitude_m
        )
        return (
            f"{self.__class__.__name__}("
            f"icao24='{self.icao24}', callsign='{self.callsign}', "
            f"timestamp={self.timestamp:.0f}, lat={self.latitude}, lon={self.longitude}, "
            f"alt_m={altitude_m}, "
            f"on_ground={self.on_ground}"
            ")"
        )

    def __str__(self) -> str:
        """Return a detailed, readable description listing every attribute that is not None."""
        # Each optional attribute contributes an entry only when it is set;
        # icao24, both timestamps and on_ground are always shown.
        candidates = [
            f"icao24='{self.icao24}'",
            f"callsign='{self.callsign}'" if self.callsign is not None else None,
            (
                f"origin_country='{self.origin_country}'"
                if self.origin_country is not None
                else None
            ),
            f"timestamp={self.timestamp:.0f}",
            f"last_contact={self.last_contact_timestamp:.0f}",
            f"lat={self.latitude}" if self.latitude is not None else None,
            f"lon={self.longitude}" if self.longitude is not None else None,
            (
                f"baro_alt_m={self.baro_altitude_m:.1f}"
                if self.baro_altitude_m is not None
                else None
            ),
            (
                f"geo_alt_m={self.geo_altitude_m:.1f}"
                if self.geo_altitude_m is not None
                else None
            ),
            f"on_ground={self.on_ground}",
            (
                f"vel_mps={self.velocity_mps:.1f}"
                if self.velocity_mps is not None
                else None
            ),
            (
                f"track_deg={self.true_track_deg:.1f}"
                if self.true_track_deg is not None
                else None
            ),
            (
                f"vert_rate_mps={self.vertical_rate_mps:.1f}"
                if self.vertical_rate_mps is not None
                else None
            ),
            f"squawk='{self.squawk}'" if self.squawk is not None else None,
            f"spi={self.spi}" if self.spi is not None else None,
            (
                f"pos_src={self.position_source}"
                if self.position_source is not None
                else None
            ),
            (
                f"provider='{self.raw_data_provider}'"
                if self.raw_data_provider is not None
                else None
            ),
        ]
        # Drop the placeholders left by unset attributes.
        shown = [entry for entry in candidates if entry is not None]

        return f"{self.__class__.__name__}({', '.join(shown)})"

    def to_dict(self) -> dict:
        """
        Converts the CanonicalFlightState object to a dictionary.

        Useful for serialization, or for code that still expects plain dicts.
        Every instance attribute is included, None values too. The result is a
        new dict, so mutating it does not affect this object.
        """
        return dict(vars(self))


# We could add other classes here in the future, such as:
# class CanonicalFlightTrack: ...
# class CanonicalAirportInfo: ...
# class CanonicalFlightRoute: ...


if __name__ == "__main__":
    # Standalone smoke test for CanonicalFlightState; runs only when this
    # module is executed directly.

    # Configure logging here so logger output is visible when running this
    # script on its own (no application logging setup is present).
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s [%(levelname)-8s] %(name)-20s : %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    logger.info("--- Testing CanonicalFlightState ---")
    try:
        # Test case 1: Minimal data
        ts = time.time()
        state1_data = {
            "icao24": "AABBCC",
            "timestamp": ts,
            "last_contact_timestamp": ts - 1,  # Last contact slightly before position
            "latitude": 50.123,
            "longitude": 10.456,
            "on_ground": False,
            "raw_data_provider": "TestProvider",
        }
        state1 = CanonicalFlightState(**state1_data)
        print(f"\nState 1 (repr): {repr(state1)}")
        print(f"State 1 (str): {str(state1)}")
        print(f"State 1 (dict): {state1.to_dict()}")
        assert state1.icao24 == "aabbcc"
        assert state1.timestamp == ts
        assert state1.vertical_rate_mps is None  # Check optional default to None

        # Test case 2: More complete data
        state2_data = {
            "icao24": "123456",
            "callsign": "FLIGHT01 ",  # With trailing spaces
            "origin_country": "Testland",
            "timestamp": ts + 10,
            "last_contact_timestamp": ts + 9,
            "latitude": -33.8688,
            "longitude": 151.2093,
            "baro_altitude_m": 10000.0,
            "geo_altitude_m": 10200.5,
            "on_ground": True,
            "velocity_mps": 150.75,
            "true_track_deg": 180.5,
            "vertical_rate_mps": -5.2,
            "squawk": "7000",
            "spi": True,
            "position_source": 0,  # ADS-B
            "raw_data_provider": "OpenSky-Test",
        }
        state2 = CanonicalFlightState(**state2_data)
        print(f"\nState 2 (repr): {repr(state2)}")
        print(f"State 2 (str): {str(state2)}")
        print(f"State 2 (dict): {state2.to_dict()}")
        assert state2.callsign == "FLIGHT01"  # Check stripping
        assert state2.on_ground is True
        assert state2.vertical_rate_mps == -5.2
        assert state2.squawk == "7000"

        # Test case 3: Missing optional data
        state3 = CanonicalFlightState(
            icao24="abcdef",
            timestamp=ts + 20,
            last_contact_timestamp=ts + 19,
            latitude=None,  # Missing lat
            longitude=None,  # Missing lon
            on_ground=False,
        )
        print(f"\nState 3 (repr): {repr(state3)}")
        print(f"State 3 (str): {str(state3)}")
        assert state3.latitude is None
        assert state3.callsign is None
        assert state3.squawk is None  # Check optional default to None

        # Test case 4: Invalid ICAO24 (should raise ValueError)
        print("\nTesting invalid ICAO24 (expect ValueError):")
        try:
            CanonicalFlightState(
                icao24="",
                timestamp=ts,
                last_contact_timestamp=ts,
                latitude=0,
                longitude=0,
                on_ground=False,
            )
        except ValueError as e:
            print(f"Caught expected error: {e}")
            assert "icao24 cannot be None or empty" in str(e)
        else:
            # Without this branch the case would pass silently if the
            # constructor stopped rejecting an empty icao24.
            raise AssertionError(
                "Expected ValueError for empty icao24, but none was raised."
            )

        print("\n--- CanonicalFlightState tests passed ---")

    except Exception as e:
        print(f"Error during CanonicalFlightState tests: {e}")
        import traceback

        traceback.print_exc()
        # Exit non-zero so a failed self-test is detectable by whatever ran
        # the script, instead of looking like a successful run.
        raise SystemExit(1)