Initial commit for profile FlightMonitor

This commit is contained in:
VALLONGOL 2025-05-15 09:52:24 +02:00
parent 871f6b6906
commit 8b284ea109
7 changed files with 627 additions and 13 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 0 B

After

Width:  |  Height:  |  Size: 43 KiB

View File

@ -1,17 +1,32 @@
# flightmonitor/__main__.py
# Example import assuming your main logic is in a 'main' function
# within a 'app' module in your 'flightmonitor.core' package.
# from flightmonitor.core.app import main as start_application
#
# Or, if you have a function in flightmonitor.core.core:
# from flightmonitor.core.core import main_function
import tkinter as tk
from .gui.main_window import MainWindow
from .controller.app_controller import AppController
def main():
print(f"Running FlightMonitor...")
# Placeholder: Replace with your application's entry point
# Example: start_application()
print("To customize, edit 'flightmonitor/__main__.py' and your core modules.")
"""
Main function to launch the Flight Monitor application.
Initializes the Tkinter root, the application controller, and the main window.
"""
# Initialize the Tkinter root window
root = tk.Tk()
# Create the application controller
# The controller might need a reference to the window,
# and the window will need a reference to the controller.
# We can pass the controller to the window, and then set the window in the controller.
app_controller = AppController()
# Create the main application window
# Pass the root and the controller to the MainWindow
main_app_window = MainWindow(root, app_controller)
# Set the main window instance in the controller if it needs to call methods on the window
app_controller.set_main_window(main_app_window)
# Start the Tkinter event loop
root.mainloop()
if __name__ == "__main__":
main()
# This ensures that main() is called only when the script is executed directly
# (e.g., python -m FlightMonitor) and not when imported as a module.
main()

View File

@ -0,0 +1,110 @@
# FlightMonitor/controller/app_controller.py
# Import LiveFetcher from the data module
from .data.live_fetcher import LiveFetcher
# We will need to instantiate it, so no need to import the entire module if only class is used
class AppController:
"""
Coordinates operations between the GUI and the data/logic modules.
Manages the application's state and flow.
"""
def __init__(self, main_window=None): # main_window can be set later
"""
Initializes the AppController.
Args:
main_window: The main GUI window instance.
"""
self.main_window = main_window
self.live_fetcher = None # Will be an instance of LiveFetcher
self.is_live_monitoring_active = False
def set_main_window(self, main_window):
"""
Sets the main window instance after initialization, if needed.
Args:
main_window: The main GUI window instance.
"""
self.main_window = main_window
def start_live_monitoring(self, bounding_box: dict):
"""
Starts the live flight data monitoring process.
Args:
bounding_box (dict): The geographical bounding box for filtering flights.
"""
if not self.main_window:
print("Error: Main window not set in controller.")
return
if self.is_live_monitoring_active:
self.main_window.update_status("Live monitoring is already active.")
return
self.main_window.update_status(f"Attempting to fetch live flights for bbox: {bounding_box}...")
self.is_live_monitoring_active = True # Set flag before potential blocking call
if self.live_fetcher is None:
self.live_fetcher = LiveFetcher(timeout_seconds=15) # Initialize with a timeout
# --- This is a SYNCHRONOUS call for now ---
# In a later phase, this will be moved to a separate thread.
flights_data = self.live_fetcher.fetch_flights(bounding_box)
if flights_data is not None: # fetch_flights returns None on error, [] if no flights
self.main_window.update_status(f"Fetched {len(flights_data)} flights. Displaying...")
self.main_window.display_flights_on_canvas(flights_data, bounding_box)
if not flights_data: # If list is empty
self.main_window.update_status("No flights found in the selected area.")
else:
error_msg = "Failed to fetch live flight data. Check console for details."
self.main_window.update_status(error_msg)
self.main_window.show_error_message("API Error", error_msg)
# Since fetching failed, we should probably allow user to try again
# or revert the UI state to "stopped"
self.stop_live_monitoring(from_error=True) # Call stop to reset UI
# For now, live monitoring is a one-shot fetch.
# We'll implement periodic polling later.
# So, let's simulate it stopping after one fetch for now, or the UI will stay "stuck"
# self.stop_live_monitoring() # Commented out: let user stop it manually or we'll add timer
def stop_live_monitoring(self, from_error=False):
"""Stops the live flight data monitoring process."""
if not self.is_live_monitoring_active and not from_error:
# if called not due to an error, and not active, do nothing.
# if from_error is true, we might want to proceed to update UI regardless of this flag
if not from_error:
self.main_window.update_status("Live monitoring is not active.")
return
self.is_live_monitoring_active = False
# In the future, if there's a running thread/task, it would be stopped here.
if self.main_window and not from_error: # only update status if not already handled by an error message
self.main_window.update_status("Live monitoring stopped.")
# If called from an error in start_live_monitoring, main_window.stop_monitoring()
# will be called by main_window itself to reset buttons.
# If we want controller to explicitly tell window to reset UI:
if self.main_window and from_error:
self.main_window._stop_monitoring() # Call the UI method to reset buttons etc.
def start_history_monitoring(self):
"""Starts the historical flight data analysis process."""
if not self.main_window:
print("Error: Main window not set in controller.")
return
self.main_window.update_status("History monitoring started (placeholder).")
# Logic for loading and displaying historical data will go here.
# For now, main_window._start_monitoring handles the placeholder display.
def stop_history_monitoring(self):
"""Stops the historical flight data analysis process."""
if not self.main_window:
# print("Error: Main window not set in controller.") # Can be noisy if not active
return
self.main_window.update_status("History monitoring stopped.")
# Clean up historical data display, etc.

View File

@ -0,0 +1,140 @@
# FlightMonitor/data/live_fetcher.py
import requests # External library for making HTTP requests
import json # For parsing JSON responses
# It's good practice to define the API endpoint URL as a constant
OPENSKY_API_URL = "https://opensky-network.org/api/states/all"
class LiveFetcher:
"""
Fetches live flight data from an external API (e.g., OpenSky Network).
"""
def __init__(self, timeout_seconds=10):
"""
Initializes the LiveFetcher.
Args:
timeout_seconds (int): Timeout for the API request in seconds.
"""
self.timeout = timeout_seconds
def fetch_flights(self, bounding_box: dict):
"""
Fetches current flight states within a given geographical bounding box.
Args:
bounding_box (dict): A dictionary containing the keys
'lat_min', 'lon_min', 'lat_max', 'lon_max'.
Returns:
list: A list of dictionaries, where each dictionary represents a flight
and contains keys like 'icao24', 'callsign', 'longitude', 'latitude',
'altitude', 'velocity', 'heading'.
Returns None if an error occurs (e.g., network issue, API error).
"""
if not bounding_box:
print("Error: Bounding box not provided to fetch_flights.")
return None
params = {
"lamin": bounding_box["lat_min"],
"lomin": bounding_box["lon_min"],
"lamax": bounding_box["lat_max"],
"lomax": bounding_box["lon_max"],
}
try:
response = requests.get(OPENSKY_API_URL, params=params, timeout=self.timeout)
response.raise_for_status() # Raises an HTTPError for bad responses (4XX or 5XX)
data = response.json()
# print(f"Raw data from OpenSky: {json.dumps(data, indent=2)}") # For debugging
if data and "states" in data and data["states"] is not None:
flights_list = []
for state_vector in data["states"]:
# According to OpenSky API documentation for /states/all:
# 0: icao24
# 1: callsign
# 2: origin_country
# 5: longitude
# 6: latitude
# 7: baro_altitude (meters)
# 8: on_ground (boolean)
# 9: velocity (m/s over ground)
# 10: true_track (degrees, 0-360, clockwise from North)
# 13: geo_altitude (meters)
# Ensure vector has enough elements and longitude/latitude are not None
if len(state_vector) > 13 and state_vector[5] is not None and state_vector[6] is not None:
flight_info = {
"icao24": state_vector[0].strip() if state_vector[0] else "N/A",
"callsign": state_vector[1].strip() if state_vector[1] else "N/A",
"origin_country": state_vector[2],
"longitude": float(state_vector[5]),
"latitude": float(state_vector[6]),
"baro_altitude": float(state_vector[7]) if state_vector[7] is not None else None,
"on_ground": bool(state_vector[8]),
"velocity": float(state_vector[9]) if state_vector[9] is not None else None,
"true_track": float(state_vector[10]) if state_vector[10] is not None else None,
"geo_altitude": float(state_vector[13]) if state_vector[13] is not None else None,
}
# Use callsign if available and not empty, otherwise icao24 for display label
flight_info["display_label"] = flight_info["callsign"] if flight_info["callsign"] != "N/A" and flight_info["callsign"] else flight_info["icao24"]
flights_list.append(flight_info)
return flights_list
else:
# No states found or states is null (can happen if area is empty or over water with no traffic)
print("No flight states returned from API or 'states' field is null.")
return [] # Return empty list if no flights, not None
except requests.exceptions.Timeout:
print(f"Error: API request timed out after {self.timeout} seconds.")
return None
except requests.exceptions.HTTPError as http_err:
print(f"Error: HTTP error occurred: {http_err} - Status: {response.status_code}")
# You could inspect response.text or response.json() here for more details if needed
# print(f"Response content: {response.text}")
return None
except requests.exceptions.RequestException as req_err:
print(f"Error: An error occurred during API request: {req_err}")
return None
except json.JSONDecodeError:
print("Error: Could not decode JSON response from API.")
return None
except ValueError as val_err: # Handles potential float conversion errors
print(f"Error: Could not parse flight data value: {val_err}")
return None
if __name__ == '__main__':
# Example usage for testing the fetcher directly
print("Testing LiveFetcher...")
# Example bounding box (Switzerland)
# Note: OpenSky may return empty for very small or specific unloaded areas
# Larger areas like Central Europe are more likely to have data.
example_bbox = {
"lat_min": 45.8389, "lon_min": 5.9962,
"lat_max": 47.8229, "lon_max": 10.5226
}
# example_bbox = { # Broader Central Europe
# "lat_min": 45.0, "lon_min": 5.0,
# "lat_max": 55.0, "lon_max": 15.0
# }
fetcher = LiveFetcher(timeout_seconds=15)
flights = fetcher.fetch_flights(example_bbox)
if flights is not None:
if flights: # Check if list is not empty
print(f"Successfully fetched {len(flights)} flights.")
for i, flight in enumerate(flights):
if i < 5: # Print details of first 5 flights
print(
f" Flight {i+1}: {flight.get('display_label', 'N/A')} "
f"Lat={flight.get('latitude')}, Lon={flight.get('longitude')}, "
f"Alt={flight.get('baro_altitude')}m, Vel={flight.get('velocity')}m/s"
)
else:
print("No flights found in the specified area.")
else:
print("Failed to fetch flights.")

View File

@ -0,0 +1,349 @@
import tkinter as tk
from tkinter import ttk
from tkinter import messagebox # Importato per eventuali messaggi di errore
# Valori di default per il bounding box (Europa centrale circa)
DEFAULT_LAT_MIN = 45.0
DEFAULT_LON_MIN = 5.0
DEFAULT_LAT_MAX = 55.0
DEFAULT_LON_MAX = 15.0
DEFAULT_CANVAS_WIDTH = 800
DEFAULT_CANVAS_HEIGHT = 600
class MainWindow:
"""
Main window of the Flight Monitor application.
It handles the layout and basic user interactions.
"""
def __init__(self, root: tk.Tk, controller):
"""
Initializes the main window.
Args:
root (tk.Tk): The root Tkinter object.
controller: The application controller instance.
"""
self.root = root
self.controller = controller
self.root.title("Flight Monitor")
self.root.geometry("1000x750") # Aumentata un po' la dimensione per i nuovi widget
# Main frame
self.main_frame = ttk.Frame(self.root, padding="10")
self.main_frame.pack(fill=tk.BOTH, expand=True)
# --- Control Frame ---
self.control_frame = ttk.LabelFrame(self.main_frame, text="Controls", padding="10")
self.control_frame.pack(side=tk.TOP, fill=tk.X, pady=5)
# Mode selection
self.mode_var = tk.StringVar(value="Live") # Default to Live
self.live_radio = ttk.Radiobutton(
self.control_frame,
text="Live",
variable=self.mode_var,
value="Live",
command=self._on_mode_change
)
self.live_radio.pack(side=tk.LEFT, padx=5)
self.history_radio = ttk.Radiobutton(
self.control_frame,
text="History",
variable=self.mode_var,
value="History",
command=self._on_mode_change
)
self.history_radio.pack(side=tk.LEFT, padx=5)
# Start/Stop buttons
self.start_button = ttk.Button(
self.control_frame,
text="Start Monitoring",
command=self._start_monitoring
)
self.start_button.pack(side=tk.LEFT, padx=5)
self.stop_button = ttk.Button(
self.control_frame,
text="Stop Monitoring",
command=self._stop_monitoring,
state=tk.DISABLED
)
self.stop_button.pack(side=tk.LEFT, padx=5)
# --- Bounding Box Input Frame ---
self.bbox_frame = ttk.LabelFrame(self.main_frame, text="Geographic Area (Bounding Box)", padding="10")
self.bbox_frame.pack(side=tk.TOP, fill=tk.X, pady=5)
# Lat Min
ttk.Label(self.bbox_frame, text="Lat Min:").grid(row=0, column=0, padx=5, pady=2, sticky=tk.W)
self.lat_min_var = tk.StringVar(value=str(DEFAULT_LAT_MIN))
self.lat_min_entry = ttk.Entry(self.bbox_frame, textvariable=self.lat_min_var, width=10)
self.lat_min_entry.grid(row=0, column=1, padx=5, pady=2)
# Lon Min
ttk.Label(self.bbox_frame, text="Lon Min:").grid(row=0, column=2, padx=5, pady=2, sticky=tk.W)
self.lon_min_var = tk.StringVar(value=str(DEFAULT_LON_MIN))
self.lon_min_entry = ttk.Entry(self.bbox_frame, textvariable=self.lon_min_var, width=10)
self.lon_min_entry.grid(row=0, column=3, padx=5, pady=2)
# Lat Max
ttk.Label(self.bbox_frame, text="Lat Max:").grid(row=1, column=0, padx=5, pady=2, sticky=tk.W)
self.lat_max_var = tk.StringVar(value=str(DEFAULT_LAT_MAX))
self.lat_max_entry = ttk.Entry(self.bbox_frame, textvariable=self.lat_max_var, width=10)
self.lat_max_entry.grid(row=1, column=1, padx=5, pady=2)
# Lon Max
ttk.Label(self.bbox_frame, text="Lon Max:").grid(row=1, column=2, padx=5, pady=2, sticky=tk.W)
self.lon_max_var = tk.StringVar(value=str(DEFAULT_LON_MAX))
self.lon_max_entry = ttk.Entry(self.bbox_frame, textvariable=self.lon_max_var, width=10)
self.lon_max_entry.grid(row=1, column=3, padx=5, pady=2)
# --- Output Area (Canvas) ---
self.output_frame = ttk.LabelFrame(self.main_frame, text="Flight Map", padding="10")
self.output_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True, pady=5)
self.flight_canvas = tk.Canvas(
self.output_frame,
bg="lightgray", # Background color for visibility
width=DEFAULT_CANVAS_WIDTH,
height=DEFAULT_CANVAS_HEIGHT
)
self.flight_canvas.pack(fill=tk.BOTH, expand=True)
# Placeholder text on canvas (can be removed or updated dynamically)
self.flight_canvas.create_text(
DEFAULT_CANVAS_WIDTH / 2,
DEFAULT_CANVAS_HEIGHT / 2,
text="Map Area - Waiting for data...",
tags="placeholder_text"
)
# --- Status Bar ---
self.status_bar_frame = ttk.Frame(self.root, padding=(5, 2))
self.status_bar_frame.pack(side=tk.BOTTOM, fill=tk.X)
self.status_label = ttk.Label(self.status_bar_frame, text="Status: Idle")
self.status_label.pack(side=tk.LEFT)
self._on_mode_change() # Initialize view based on default mode
def _on_mode_change(self):
"""Handles UI changes when the monitoring mode (Live/History) changes."""
selected_mode = self.mode_var.get()
self.update_status(f"Mode changed to {selected_mode}")
if selected_mode == "Live":
self.bbox_frame.pack(side=tk.TOP, fill=tk.X, pady=5) # Show bbox for live
self.flight_canvas.itemconfig("placeholder_text", text="Map Area - Waiting for live data...")
# Enable/disable fields as needed
self._set_bbox_entries_state(tk.NORMAL)
else: # History mode
# self.bbox_frame.pack_forget() # Hide bbox for history (or adapt for history filters)
# For now, let's keep it visible but disabled for History
self._set_bbox_entries_state(tk.DISABLED)
self.flight_canvas.itemconfig("placeholder_text", text="Map Area - Waiting for historical data...")
self.clear_canvas() # Clear canvas on mode change
def _set_bbox_entries_state(self, state):
"""Enable or disable bounding box entry fields."""
self.lat_min_entry.config(state=state)
self.lon_min_entry.config(state=state)
self.lat_max_entry.config(state=state)
self.lon_max_entry.config(state=state)
def _start_monitoring(self):
"""Starts the monitoring process based on the selected mode."""
selected_mode = self.mode_var.get()
self.update_status(f"Starting {selected_mode} monitoring...")
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.live_radio.config(state=tk.DISABLED)
self.history_radio.config(state=tk.DISABLED)
self._set_bbox_entries_state(tk.DISABLED) # Disable bbox while running
if selected_mode == "Live":
try:
bounding_box = self.get_bounding_box()
if bounding_box:
self.controller.start_live_monitoring(bounding_box)
else:
# Error already shown by get_bounding_box
self._stop_monitoring() # Revert state
except ValueError as e:
messagebox.showerror("Input Error", str(e))
self._stop_monitoring() # Revert state if there was an error before calling controller
elif selected_mode == "History":
self.controller.start_history_monitoring()
# Placeholder for history:
self.update_status("History mode selected. Displaying sample historical data.")
self.flight_canvas.delete("placeholder_text") # Remove placeholder
# Example: Draw a dummy item for history
self.flight_canvas.create_oval(90, 90, 110, 110, fill="blue", outline="black", tags="flight_dot")
self.flight_canvas.create_text(100, 125, text="HIST_FLIGHT", tags="flight_label")
def _stop_monitoring(self):
"""Stops the monitoring process."""
self.update_status("Monitoring stopped. Status: Idle")
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self.live_radio.config(state=tk.NORMAL)
self.history_radio.config(state=tk.NORMAL)
self._set_bbox_entries_state(tk.NORMAL if self.mode_var.get() == "Live" else tk.DISABLED)
selected_mode = self.mode_var.get()
if selected_mode == "Live":
self.controller.stop_live_monitoring()
elif selected_mode == "History":
self.controller.stop_history_monitoring()
# self.clear_canvas() # Optionally clear canvas on stop
# self.flight_canvas.create_text(
# DEFAULT_CANVAS_WIDTH / 2,
# DEFAULT_CANVAS_HEIGHT / 2,
# text="Map Area - Monitoring stopped.",
# tags="placeholder_text"
# )
def get_bounding_box(self):
"""
Retrieves and validates the bounding box coordinates from the input fields.
Returns:
dict: A dictionary with 'lat_min', 'lon_min', 'lat_max', 'lon_max' if valid.
None: If validation fails.
"""
try:
lat_min = float(self.lat_min_var.get())
lon_min = float(self.lon_min_var.get())
lat_max = float(self.lat_max_var.get())
lon_max = float(self.lon_max_var.get())
if not (-90 <= lat_min <= 90 and -90 <= lat_max <= 90 and
-180 <= lon_min <= 180 and -180 <= lon_max <= 180):
messagebox.showerror("Input Error", "Invalid latitude or longitude range.")
return None
if lat_min >= lat_max:
messagebox.showerror("Input Error", "Lat Min must be less than Lat Max.")
return None
if lon_min >= lon_max:
messagebox.showerror("Input Error", "Lon Min must be less than Lon Max.")
return None
return {
"lat_min": lat_min, "lon_min": lon_min,
"lat_max": lat_max, "lon_max": lon_max
}
except ValueError:
messagebox.showerror("Input Error", "Bounding box coordinates must be valid numbers.")
return None
def display_flights_on_canvas(self, flights_data, bounding_box):
"""
Displays flight data on the canvas.
Args:
flights_data (list): A list of flight data dictionaries.
Each dictionary should have 'latitude', 'longitude',
and 'callsign' (or 'icao24').
bounding_box (dict): The bounding box used for fetching,
needed for coordinate scaling.
"""
self.clear_canvas()
if not flights_data:
self.flight_canvas.create_text(
self.flight_canvas.winfo_width() / 2,
self.flight_canvas.winfo_height() / 2,
text="No flights found in the selected area.",
tags="placeholder_text"
)
return
canvas_width = self.flight_canvas.winfo_width()
canvas_height = self.flight_canvas.winfo_height()
# If canvas hasn't been drawn yet, its dimensions might be 1. Use defaults.
if canvas_width <= 1 or canvas_height <= 1:
canvas_width = DEFAULT_CANVAS_WIDTH
canvas_height = DEFAULT_CANVAS_HEIGHT
# Schedule a redraw if we used defaults, as winfo_width/height might be updated later
self.root.after(100, lambda: self.display_flights_on_canvas(flights_data, bounding_box))
# return # Optional: wait for actual dimensions
# Unpack bounding box for easier access
lat_min = bounding_box["lat_min"]
lon_min = bounding_box["lon_min"]
lat_max = bounding_box["lat_max"]
lon_max = bounding_box["lon_max"]
# Ensure deltas are not zero to avoid division by zero
delta_lat = lat_max - lat_min
delta_lon = lon_max - lon_min
if delta_lat == 0: delta_lat = 1 # Avoid division by zero
if delta_lon == 0: delta_lon = 1 # Avoid division by zero
for flight in flights_data:
lat = flight.get("latitude")
lon = flight.get("longitude")
callsign = flight.get("callsign", flight.get("icao24", "N/A")) # Use callsign or icao24
if lat is None or lon is None:
# print(f"Skipping flight {callsign} due to missing coordinates.")
continue # Skip if no coordinates
# Basic scaling: map geographic coordinates to canvas coordinates
# X corresponds to longitude, Y corresponds to latitude
# Canvas Y is inverted (0 at top, height at bottom)
# Ensure longitude is within the bounding box to avoid extreme scaling issues if data is outside
# (though API should filter, this is a safeguard for display)
if not (lon_min <= lon <= lon_max and lat_min <= lat <= lat_max):
# print(f"Flight {callsign} ({lat}, {lon}) is outside bbox {bounding_box} - skipping display.")
continue
x_pixel = ((lon - lon_min) / delta_lon) * canvas_width
y_pixel = canvas_height - (((lat - lat_min) / delta_lat) * canvas_height)
# Draw a small circle for the aircraft
radius = 3
self.flight_canvas.create_oval(
x_pixel - radius, y_pixel - radius,
x_pixel + radius, y_pixel + radius,
fill="red", outline="black", tags="flight_dot"
)
# Optionally, display callsign
self.flight_canvas.create_text(
x_pixel, y_pixel - (radius + 5), # Position text above dot
text=callsign,
font=("Arial", 7),
tags="flight_label"
)
self.update_status(f"Displayed {len(flights_data)} flights on map.")
def clear_canvas(self):
"""Clears all items (flights, placeholder) from the canvas."""
self.flight_canvas.delete("flight_dot")
self.flight_canvas.delete("flight_label")
self.flight_canvas.delete("placeholder_text")
def update_status(self, message: str):
"""
Updates the status bar with a message.
Args:
message (str): The message to display.
"""
self.status_label.config(text=f"Status: {message}")
# print(f"Status Update: {message}") # For console logging during development
def show_error_message(self, title: str, message: str):
"""
Displays an error message box.
Args:
title (str): The title of the message box.
message (str): The error message.
"""
messagebox.showerror(title, message)
self.update_status(f"Error: {message[:50]}...") # Update status bar as well