first commit

This commit is contained in:
VALLONGOL 2025-09-15 10:49:26 +02:00
parent 6c26d0287a
commit 640d772dd1
8 changed files with 1038 additions and 12 deletions

15
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Module",
"type": "debugpy",
"request": "launch",
"module": "downloaderyoutube"
}
]
}

25
GEMINI.md Normal file
View File

@ -0,0 +1,25 @@
Sono un ingegnere informatico che sviluppa principalmente in python e c++.
Quando mi proponi del codice ricordati di indicarmi le modifiche con il codice precedente, perchè le hai fatte, dove e come sono state fatte.
Utilizzando tutte le regole per scrivere in maniera migliore possibile codice in python.
Non dare nulla per scontato e spiega tutti i passi che segui nei tuoi ragionamenti.
Con me parla in italiano.
Come deve essere scritto il codice:
1) come standard di scrittura del codice Python, lo standard PEP8.
2) una istruzione per ogni riga di codice
3) nomi di funzioni, variabili, commenti, doc_string devono essere in inglese
4) codice più chiaro, ordinato, riutilizzabile
5) i commenti nel codice devono essere essenziali, stringati e chiari, non devono essere prolissi. dobbiamo cercare di mantenere il codice più pulito possibile, senza troppi fronzoli
6) Non indicare le modifche che fai come commento al codice, ma solo in linea generale in chat. il codice lascialo più pulito possibile
Per semplificare l'operazione di aggiornamento del codice:
1) se le modifiche che proponi interessano solo poche funzioni del modulo, allora indicami il contenuto di tutte le funzioni dove ci sono le modifiche.
2) se le modifiche impattano la maggior parte delle funzioni dello stesso modulo, allora ripeti per intero il codice del modulo senza omissioni.
3) se le modifiche che proponi interessano meno di 5 righe di una funzione, indicami quali sono le linee che cambiano e come modificarle
4) passami sempre un modulo alla volta e ti dico io quando passarmi il successivo, sempre in maniera completa e senza omissioni
Se vedi che il codice di un singolo modulo è più lungo di 1000 righe, prendi in considerazione il fatto di creare un nuovo modulo spostando quelle funzioni che sono omogenee per argomento in questo nuovo modulo e rendere più leggere il file che sta crescendo troppo.
Quando ti passo del codice da analizzare, cerca sempre di capirne le funzionalità e se hai da proporre dei miglioramenti o delle modifiche prima ne discuti con me e poi decidiamo se applicarlo oppure no.
Se noti che nel codice c'è qualcosa da migliorare, ne parli con me e poi vediamo se applicarlo oppure no, per evitare di mettere mano a funzioni che sono già state ottimizzate e funzionano come io voglio, e concentrarsi sulla risoluzione di problemi o l'introduzione di nuove funzioni.

View File

@ -1,17 +1,67 @@
# downloaderyoutube/__main__.py
import tkinter as tk
import logging
import sys
# Example import assuming your main logic is in a 'main' function
# within a 'app' module in your 'downloaderyoutube.core' package.
# from downloaderyoutube.core.app import main as start_application
#
# Or, if you have a function in downloaderyoutube.core.core:
# from downloaderyoutube.core.core import main_function
# Import the necessary components from our application
from downloaderyoutube.gui.gui import App
from downloaderyoutube.utils.logger import setup_basic_logging, shutdown_logging_system
# --- Logging Configuration --
# This dictionary configures the logging system. It can be expanded later.
LOGGING_CONFIG = {
"default_root_level": logging.DEBUG,
"specific_levels": {
# You can set different levels for different modules if needed
# "pytube": logging.WARNING,
},
"format": "%(asctime)s [%(levelname)-8s] %(name)-25s : %(message)s",
"date_format": "%Y-%m-%d %H:%M:%S",
"enable_console": True,
"enable_file": False, # Set to True to enable file logging
# "file_path": "downloader_app.log", # Uncomment if file logging is enabled
}
def main():
print(f"Running DownloaderYouTube...")
# Placeholder: Replace with your application's entry point
# Example: start_application()
print("To customize, edit 'downloaderyoutube/__main__.py' and your core modules.")
"""
Application entry point.
Initializes logging, creates the GUI, and starts the main loop.
"""
# --- Diagnostic Prints ---
print("-" * 50)
print(f"--- PYTHON EXECUTABLE: {sys.executable}")
root = tk.Tk()
# --- Setup Logging ---
# This is a crucial step to initialize the queue-based logging system.
setup_basic_logging(
root_tk_instance_for_processor=root, logging_config_dict=LOGGING_CONFIG
)
# Get a logger for the main entry point
logger = logging.getLogger(__name__)
logger.info("Application starting...")
# --- Create and run the App ---
app = App(master=root)
def on_closing():
"""
Handles the window closing event to ensure a clean shutdown.
"""
logger.info("Shutdown sequence initiated.")
# Properly shut down the logging system before closing the app
shutdown_logging_system()
root.destroy()
# Assign the custom closing function to the window's close button
root.protocol("WM_DELETE_WINDOW", on_closing)
# Start the Tkinter event loop
root.mainloop()
print("Application has been closed.")
if __name__ == "__main__":
main()
main()

View File

@ -0,0 +1,131 @@
import logging
import traceback
from threading import Thread
from dataclasses import dataclass
import yt_dlp
from typing import Callable, Optional, List, Dict, Any
# Import the get_logger function from your custom logger module
from downloaderyoutube.utils.logger import get_logger
# Get a logger instance for this module
logger = get_logger(__name__)
@dataclass
class VideoFormat:
"""A simple class to hold structured information about a video format."""
format_id: str
resolution: str
ext: str
filesize: Optional[int]
note: str # e.g., "Video Only", "Audio Only", "Progressive"
def __str__(self) -> str:
"""User-friendly string representation for the combobox."""
size_mb = f"{self.filesize / (1024 * 1024):.2f} MB" if self.filesize else "N/A"
return f"{self.resolution} ({self.ext}) - {size_mb} - {self.note}"
class Downloader:
"""
Handles video operations using yt-dlp in separate threads to avoid
blocking the GUI.
"""
def __init__(self):
self.progress_callback: Optional[Callable[[int], None]] = None
self.completion_callback: Optional[Callable[[Optional[str]], None]] = None
self.formats_callback: Optional[Callable[[Optional[List[VideoFormat]]], None]] = None
def _progress_hook(self, d: Dict[str, Any]):
if d["status"] == "downloading":
if d.get("total_bytes") and d.get("downloaded_bytes"):
percentage = (d["downloaded_bytes"] / d["total_bytes"]) * 100
if self.progress_callback:
self.progress_callback(int(percentage))
elif d["status"] == "finished":
logger.info("yt-dlp finished downloading.")
def _get_formats_task(self, url: str):
"""Task to fetch video formats in a thread."""
try:
logger.info(f"Fetching formats for URL: {url}")
ydl_opts = {"noplaylist": True}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
formats = []
for f in info.get("formats", []):
# We are interested in mp4 files that have video
if f.get("vcodec", "none") != "none" and f.get("ext") == "mp4":
note = "Progressive" if f.get("acodec", "none") != "none" else "Video Only"
formats.append(VideoFormat(
format_id=f["format_id"],
resolution=f.get("format_note", f.get("resolution", "N/A")),
ext=f["ext"],
filesize=f.get("filesize"),
note=note
))
# Sort formats: progressive first, then by resolution
formats.sort(key=lambda x: (x.note != "Progressive", -int(x.resolution.replace("p", "")) if x.resolution.replace("p", "").isdigit() else 0))
logger.info(f"Found {len(formats)} suitable formats.")
if self.formats_callback:
self.formats_callback(formats)
except Exception as e:
error_message = f"Failed to fetch formats: {e}"
logger.error(error_message)
logger.debug(traceback.format_exc())
if self.formats_callback:
self.formats_callback(None) # Indicate failure
def get_video_formats(self, url: str, formats_callback: Callable[[Optional[List[VideoFormat]]], None]):
"""Starts the format fetching process in a new thread."""
self.formats_callback = formats_callback
thread = Thread(target=self._get_formats_task, args=(url,), daemon=True)
thread.start()
def _download_task(self, url: str, download_path: str, format_id: str):
"""The actual download logic that runs in a separate thread."""
try:
logger.info(f"Starting download for URL: {url} with format: {format_id}")
ydl_opts = {
"format": format_id,
"outtmpl": f"{download_path}/%(title)s.%(ext)s",
"progress_hooks": [self._progress_hook],
"logger": logger,
"noplaylist": True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
logger.info(f"Successfully downloaded video from URL: {url}")
if self.completion_callback:
self.completion_callback(None)
except Exception as e:
error_message = f"An unexpected error occurred: {e}"
logger.error(error_message)
logger.debug(traceback.format_exc())
if self.completion_callback:
self.completion_callback(error_message)
def download_video(
self,
url: str,
download_path: str,
format_id: str,
progress_callback: Callable[[int], None],
completion_callback: Callable[[Optional[str]], None],
):
"""Starts the video download in a new thread."""
self.progress_callback = progress_callback
self.completion_callback = completion_callback
thread = Thread(
target=self._download_task, args=(url, download_path, format_id), daemon=True
)
thread.start()

View File

@ -0,0 +1,183 @@
import logging
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from tkinter.scrolledtext import ScrolledText
from typing import Optional, List
from downloaderyoutube.core.core import Downloader, VideoFormat
from downloaderyoutube.utils.logger import add_tkinter_handler, get_logger
# Get a logger instance for this module
logger = get_logger(__name__)
# A basic logging config dictionary for the Tkinter handler
LOGGING_CONFIG = {
"colors": {
logging.DEBUG: "gray",
logging.INFO: "black",
logging.WARNING: "orange",
logging.ERROR: "red",
logging.CRITICAL: "red",
},
"queue_poll_interval_ms": 100,
}
class App(tk.Frame):
"""
The main graphical user interface for the Downloader application.
"""
def __init__(self, master: tk.Tk):
super().__init__(master)
self.master = master
self.downloader = Downloader()
self.download_path: Optional[str] = None
self.available_formats: List[VideoFormat] = []
self.master.title("YouTube Downloader")
self.master.geometry("800x600")
self.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
self._create_widgets()
self._setup_logging_handler()
def _create_widgets(self):
"""Creates and lays out the widgets in the application window."""
# --- Top Frame for URL Input ---
url_frame = ttk.Frame(self)
url_frame.pack(fill=tk.X, pady=(0, 5))
ttk.Label(url_frame, text="Video URL:").pack(side=tk.LEFT, padx=(0, 5))
self.url_entry = ttk.Entry(url_frame)
self.url_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
self.analyze_button = ttk.Button(
url_frame, text="Analizza URL", command=self._analyze_url
)
self.analyze_button.pack(side=tk.LEFT, padx=(5, 0))
# --- Path Frame ---
path_frame = ttk.Frame(self)
path_frame.pack(fill=tk.X, pady=5)
self.path_button = ttk.Button(
path_frame, text="Choose Folder...", command=self.select_download_path
)
self.path_button.pack(side=tk.LEFT)
self.path_label = ttk.Label(path_frame, text="No download folder selected.")
self.path_label.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=10)
# --- Format Selection Frame ---
format_frame = ttk.Frame(self)
format_frame.pack(fill=tk.X, pady=5)
ttk.Label(format_frame, text="Formato:").pack(side=tk.LEFT, padx=(0, 5))
self.format_combobox = ttk.Combobox(format_frame, state="readonly")
self.format_combobox.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
self.download_button = ttk.Button(
format_frame, text="Download", command=self.start_download, state="disabled"
)
self.download_button.pack(side=tk.LEFT, padx=(5, 0))
# --- Progress Bar ---
self.progress_bar = ttk.Progressbar(
self, orient="horizontal", length=100, mode="determinate"
)
self.progress_bar.pack(fill=tk.X, pady=(10, 5))
# --- Log Viewer ---
log_frame = ttk.LabelFrame(self, text="Log")
log_frame.pack(fill=tk.BOTH, expand=True)
self.log_widget = ScrolledText(
log_frame, state=tk.DISABLED, wrap=tk.WORD, font=("Courier New", 9)
)
self.log_widget.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
def _setup_logging_handler(self):
logger.info("Setting up GUI logging handler.")
add_tkinter_handler(
gui_log_widget=self.log_widget,
root_tk_instance_for_gui_handler=self.master,
logging_config_dict=LOGGING_CONFIG,
)
logger.info("GUI logging handler configured.")
def select_download_path(self):
path = filedialog.askdirectory()
if path:
self.download_path = path
self.path_label.config(text=f"Saving to: {self.download_path}")
logger.info(f"Download path set to: {self.download_path}")
def _analyze_url(self):
url = self.url_entry.get()
if not url:
messagebox.showerror("Error", "Please enter a YouTube URL.")
return
logger.info(f"Analyze button clicked. Fetching formats for {url}")
self.analyze_button.config(state="disabled")
self.download_button.config(state="disabled")
self.format_combobox.set("Analisi in corso...")
self.downloader.get_video_formats(url, self._on_formats_received)
def _on_formats_received(self, formats: Optional[List[VideoFormat]]):
# This is called from a different thread, so we schedule GUI updates
self.master.after(0, self._update_formats_combobox, formats)
def _update_formats_combobox(self, formats: Optional[List[VideoFormat]]):
self.analyze_button.config(state="normal")
if formats:
logger.info("Successfully fetched formats. Populating combobox.")
self.available_formats = formats
self.format_combobox["values"] = [str(f) for f in formats]
self.format_combobox.current(0)
self.download_button.config(state="normal")
else:
logger.error("Failed to fetch video formats.")
messagebox.showerror("Error", "Could not retrieve video formats. Check the URL and logs.")
self.format_combobox.set("Analisi fallita.")
self.format_combobox["values"] = []
def start_download(self):
url = self.url_entry.get()
if not self.download_path:
messagebox.showerror("Error", "Please select a download folder.")
return
selected_index = self.format_combobox.current()
if selected_index < 0:
messagebox.showerror("Error", "Please select a format to download.")
return
selected_format = self.available_formats[selected_index]
logger.info(f"Download button clicked for format: {selected_format.format_id}")
self.download_button.config(state="disabled")
self.analyze_button.config(state="disabled")
self.progress_bar["value"] = 0
self.downloader.download_video(
url=url,
download_path=self.download_path,
format_id=selected_format.format_id,
progress_callback=self._on_download_progress,
completion_callback=self._on_download_complete,
)
def _on_download_progress(self, percentage: int):
self.master.after(0, self.progress_bar.config, {"value": percentage})
def _on_download_complete(self, error_message: Optional[str]):
self.master.after(0, self._finalize_download, error_message)
def _finalize_download(self, error_message: Optional[str]):
self.progress_bar["value"] = 0
self.download_button.config(state="normal")
self.analyze_button.config(state="normal")
if error_message:
messagebox.showerror("Download Failed", error_message)
else:
messagebox.showinfo("Success", "Video downloaded successfully!")

View File

View File

@ -0,0 +1,622 @@
# FlightMonitor/utils/logger.py
import logging
import logging.handlers # For RotatingFileHandler
import tkinter as tk
from tkinter.scrolledtext import ScrolledText
from queue import Queue, Empty as QueueEmpty # Renamed for clarity
from typing import Optional, Dict, Any
# --- Module-level globals for the new centralized logging queue system ---
_global_log_queue: Optional[Queue[logging.LogRecord]] = None
_actual_console_handler: Optional[logging.StreamHandler] = None
_actual_file_handler: Optional[logging.handlers.RotatingFileHandler] = None
_actual_tkinter_handler: Optional["TkinterTextHandler"] = None # Forward declaration
_log_processor_after_id: Optional[str] = None
_logging_system_active: bool = False
_tk_root_instance_for_processing: Optional[tk.Tk] = None
_base_formatter: Optional[logging.Formatter] = None # Store the main formatter
# Interval for polling the global log queue by the GUI thread processor
GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS = 50
class TkinterTextHandler(logging.Handler):
"""
A logging handler that directs log messages to a Tkinter Text widget.
This handler itself is now called by _process_global_log_queue,
which runs in the GUI thread. Its internal queue is for batching updates
to the Text widget if needed, or could be simplified later.
"""
def __init__(
self,
text_widget: tk.Text, # ScrolledText is a tk.Text
level_colors: Dict[int, str],
# root_tk_instance and queue_poll_interval_ms for its own internal queue might be redundant
# if _process_global_log_queue handles batching, but kept for now.
# For now, this handler will still use its own internal queue and after loop for safety.
root_tk_instance_for_widget_update: tk.Tk,
internal_poll_interval_ms: int = 100,
):
super().__init__()
self.text_widget = text_widget
self.level_colors = level_colors
self._is_active = True # Internal active state for this specific handler
self._widget_update_queue = Queue()
self._root_for_widget_update = root_tk_instance_for_widget_update
self._internal_poll_interval_ms = internal_poll_interval_ms
self._internal_after_id: Optional[str] = None
if not (
self.text_widget
and hasattr(self.text_widget, "winfo_exists")
and self.text_widget.winfo_exists()
):
print(
"ERROR: TkinterTextHandler initialized with an invalid or non-existent text_widget.",
flush=True,
)
self._is_active = False
return
if not (
self._root_for_widget_update
and hasattr(self._root_for_widget_update, "winfo_exists")
and self._root_for_widget_update.winfo_exists()
):
print(
"ERROR: TkinterTextHandler initialized with an invalid root for widget update.",
flush=True,
)
self._is_active = False
return
if self._is_active:
self._configure_tags()
self._process_widget_update_queue() # Start its own update loop
def _configure_tags(self):
if not self._is_active or not self.text_widget.winfo_exists():
return
for level, color_value in self.level_colors.items():
level_name = logging.getLevelName(level)
if color_value:
try:
self.text_widget.tag_config(level_name, foreground=color_value)
except tk.TclError:
print(
f"Warning: Could not configure tag for {level_name} in TkinterTextHandler.",
flush=True,
)
def emit(self, record: logging.LogRecord):
# This emit is called by _process_global_log_queue (already in GUI thread)
if not self._is_active or not self.text_widget.winfo_exists():
# This case should ideally not happen if setup is correct
# print(
# f"DEBUG: TkinterTextHandler.emit called but inactive or widget gone. Record: {record.getMessage()}",
# flush=True,
# )
return
try:
msg = self.format(
record
) # Format the record using the formatter set on this handler
level_name = record.levelname
# Put in its own queue for batched widget updates
self._widget_update_queue.put_nowait((level_name, msg))
except Exception as e:
print(
f"Error in TkinterTextHandler.emit (to internal queue): {e}", flush=True
)
def _process_widget_update_queue(self):
if (
not self._is_active
or not self.text_widget.winfo_exists()
or not self._root_for_widget_update.winfo_exists()
):
self._is_active = False
if self._internal_after_id and self._root_for_widget_update.winfo_exists():
try:
self._root_for_widget_update.after_cancel(self._internal_after_id)
except Exception:
pass
self._internal_after_id = None
# print("DEBUG: TkinterTextHandler._process_widget_update_queue stopping (inactive/widget gone).", flush=True)
return
had_messages = False
try:
while not self._widget_update_queue.empty():
had_messages = True
level_name, msg = self._widget_update_queue.get_nowait()
if self.text_widget.winfo_exists():
self.text_widget.configure(state=tk.NORMAL)
self.text_widget.insert(tk.END, msg + "\n", (level_name,))
self.text_widget.configure(state=tk.DISABLED)
else: # Widget destroyed during processing
self._is_active = False
break
self._widget_update_queue.task_done()
if had_messages and self.text_widget.winfo_exists():
self.text_widget.see(tk.END)
except tk.TclError as e_tcl:
print(
f"TkinterTextHandler TclError during widget update: {e_tcl}. Handler stopping.",
flush=True,
)
self._is_active = False
except Exception as e_update:
print(
f"Unexpected error updating TkinterTextHandler widget: {e_update}. Handler stopping.",
flush=True,
)
self._is_active = False
if self._is_active and self._root_for_widget_update.winfo_exists():
self._internal_after_id = self._root_for_widget_update.after(
self._internal_poll_interval_ms, self._process_widget_update_queue
)
else: # Ensure it's cleaned up if we are stopping
if self._internal_after_id and self._root_for_widget_update.winfo_exists():
try:
self._root_for_widget_update.after_cancel(self._internal_after_id)
except Exception:
pass
self._internal_after_id = None
def close(self):
# print("INFO: TkinterTextHandler close called.", flush=True) # Reduced verbosity
self._is_active = False
if (
self._internal_after_id
and hasattr(self, "_root_for_widget_update")
and self._root_for_widget_update
and self._root_for_widget_update.winfo_exists()
):
try:
self._root_for_widget_update.after_cancel(self._internal_after_id)
except Exception:
pass
self._internal_after_id = None
# Drain its internal queue
while not self._widget_update_queue.empty():
try:
self._widget_update_queue.get_nowait()
self._widget_update_queue.task_done()
except Exception:
break
super().close()
class QueuePuttingHandler(logging.Handler):
"""
A simple handler that puts any received LogRecord into a global queue.
"""
def __init__(self, handler_queue: Queue[logging.LogRecord]):
super().__init__()
self.handler_queue = handler_queue
def emit(self, record: logging.LogRecord):
try:
self.handler_queue.put_nowait(record)
except queue.Full: # Standard library 'queue.Full'
print(
f"CRITICAL: Global log queue is full! Log record for '{record.name}' might be lost.",
flush=True,
)
except Exception as e:
print(
f"CRITICAL: Error putting log record into global queue: {e}", flush=True
)
def _process_global_log_queue():
"""
GUI Thread: Periodically processes LogRecords from the _global_log_queue
and dispatches them to the actual configured handlers.
"""
global _logging_system_active, _log_processor_after_id
# print(f"DEBUG: _process_global_log_queue called. Active: {_logging_system_active}", flush=True)
if not _logging_system_active:
if (
_log_processor_after_id
and _tk_root_instance_for_processing
and _tk_root_instance_for_processing.winfo_exists()
):
try:
_tk_root_instance_for_processing.after_cancel(_log_processor_after_id)
except Exception:
pass
_log_processor_after_id = None
return
if not (
_tk_root_instance_for_processing
and _tk_root_instance_for_processing.winfo_exists()
):
_logging_system_active = False
_log_processor_after_id = None
return
processed_count = 0
try:
while _global_log_queue and not _global_log_queue.empty():
if not _logging_system_active:
break
try:
record = _global_log_queue.get_nowait()
processed_count += 1
if _actual_console_handler:
try:
_actual_console_handler.handle(record)
except Exception as e_con:
print(f"Error in console_handler.handle: {e_con}", flush=True)
if _actual_file_handler:
try:
_actual_file_handler.handle(record)
except Exception as e_file:
print(f"Error in file_handler.handle: {e_file}", flush=True)
if _actual_tkinter_handler:
try:
_actual_tkinter_handler.handle(record)
except Exception as e_tk:
print(f"Error in tkinter_handler.handle: {e_tk}", flush=True)
_global_log_queue.task_done()
except QueueEmpty:
break
except Exception as e_proc_item:
print(
f"Error processing a log item from global queue: {e_proc_item}",
flush=True,
)
except Exception as e_outer_loop:
print(
f"Critical error in _process_global_log_queue outer loop: {e_outer_loop}",
flush=True,
)
if _logging_system_active and _tk_root_instance_for_processing.winfo_exists():
_log_processor_after_id = _tk_root_instance_for_processing.after(
GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS, _process_global_log_queue
)
def setup_basic_logging( # MODIFIED: Renamed from setup_logging
root_tk_instance_for_processor: Optional[
tk.Tk
], # MODIFIED: Now mandatory for processor
logging_config_dict: Optional[Dict[str, Any]] = None,
):
global _global_log_queue, _actual_console_handler, _actual_file_handler
global _logging_system_active, _tk_root_instance_for_processing
global _log_processor_after_id, _base_formatter, _actual_tkinter_handler # _actual_tkinter_handler should be None here
# This function should only be called ONCE.
if _logging_system_active:
print(
"WARNING: setup_basic_logging called but logging system is already active. Ignoring call.",
flush=True,
)
return
print("INFO: Configuring basic centralized queued logging system...", flush=True)
_actual_tkinter_handler = None # Ensure it's None at basic setup
if logging_config_dict is None:
print(
"Warning: No logging_config_dict. Using basic console-only defaults for queued logging.",
flush=True,
)
# Simplified default, as Tkinter parts are handled by add_tkinter_handler
logging_config_dict = {
"default_root_level": logging.INFO,
"specific_levels": {},
"format": "%(asctime)s [%(levelname)-8s] %(name)-25s : %(message)s",
"date_format": "%Y-%m-%d %H:%M:%S",
"enable_console": True,
"enable_file": False,
}
root_log_level = logging_config_dict.get("default_root_level", logging.INFO)
specific_levels = logging_config_dict.get("specific_levels", {})
log_format_str = logging_config_dict.get(
"format", "%(asctime)s [%(levelname)-8s] %(name)-25s : %(message)s"
)
log_date_format_str = logging_config_dict.get("date_format", "%Y-%m-%d %H:%M:%S")
enable_console = logging_config_dict.get("enable_console", True)
enable_file = logging_config_dict.get("enable_file", False)
_base_formatter = logging.Formatter(log_format_str, datefmt=log_date_format_str)
_global_log_queue = Queue()
_tk_root_instance_for_processing = root_tk_instance_for_processor
root_logger = logging.getLogger()
for handler in root_logger.handlers[
:
]: # Clear any pre-existing handlers (e.g. from basicConfig)
try:
handler.close()
root_logger.removeHandler(handler)
except Exception:
pass
root_logger.handlers = []
root_logger.setLevel(root_log_level)
print(
f"INFO: Root logger level set to {logging.getLevelName(root_logger.level)}.",
flush=True,
)
for logger_name, level in specific_levels.items():
named_logger = logging.getLogger(logger_name)
named_logger.setLevel(level)
named_logger.propagate = True
print(
f"INFO: Logger '{logger_name}' level set to {logging.getLevelName(named_logger.level)}.",
flush=True,
)
if enable_console:
try:
_actual_console_handler = logging.StreamHandler()
_actual_console_handler.setFormatter(_base_formatter)
_actual_console_handler.setLevel(logging.DEBUG)
print("INFO: Actual ConsoleHandler created.", flush=True)
except Exception as e:
print(f"ERROR: Failed to create actual ConsoleHandler: {e}", flush=True)
_actual_console_handler = None
if enable_file:
try:
file_path = logging_config_dict.get("file_path", "flight_monitor_app.log")
file_max_bytes = logging_config_dict.get("file_max_bytes", 5 * 1024 * 1024)
file_backup_count = logging_config_dict.get("file_backup_count", 3)
_actual_file_handler = logging.handlers.RotatingFileHandler(
file_path,
maxBytes=file_max_bytes,
backupCount=file_backup_count,
encoding="utf-8",
)
_actual_file_handler.setFormatter(_base_formatter)
_actual_file_handler.setLevel(logging.DEBUG)
print(
f"INFO: Actual RotatingFileHandler created for '{file_path}'.",
flush=True,
)
except Exception as e:
print(
f"ERROR: Failed to create actual FileHandler for '{file_path}': {e}",
flush=True,
)
_actual_file_handler = None
if _global_log_queue is not None:
queue_putter = QueuePuttingHandler(handler_queue=_global_log_queue)
queue_putter.setLevel(logging.DEBUG)
root_logger.addHandler(queue_putter)
print(f"INFO: QueuePuttingHandler added to root logger.", flush=True)
else:
print(
"CRITICAL ERROR: Global log queue not initialized in basic setup!",
flush=True,
)
_logging_system_active = True
if (
_tk_root_instance_for_processing
and _tk_root_instance_for_processing.winfo_exists()
):
if _log_processor_after_id: # Should be None here, but defensive
try:
_tk_root_instance_for_processing.after_cancel(_log_processor_after_id)
except Exception:
pass
_log_processor_after_id = _tk_root_instance_for_processing.after(
GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS, _process_global_log_queue
)
print(
f"INFO: Global log queue processor scheduled (ID: {_log_processor_after_id}).",
flush=True,
)
elif not _tk_root_instance_for_processing:
print(
"WARNING: No Tkinter root instance for processor. Logs will queue but not be dispatched by this setup function alone.",
flush=True,
)
print("INFO: Basic centralized queued logging system setup complete.", flush=True)
def add_tkinter_handler( # NEW FUNCTION # NEW FUNCTION
gui_log_widget: tk.Text,
root_tk_instance_for_gui_handler: tk.Tk,
logging_config_dict: Dict[str, Any],
):
global _actual_tkinter_handler, _base_formatter
if not _logging_system_active:
print(
"ERROR: Cannot add Tkinter handler, basic logging system not active.",
flush=True,
)
return
if not _base_formatter:
print("ERROR: Cannot add Tkinter handler, base formatter not set.", flush=True)
return
print("INFO: Attempting to add TkinterTextHandler...", flush=True)
if _actual_tkinter_handler: # If one already exists, close and prepare to replace
print(
"INFO: Existing TkinterTextHandler found, closing it before adding new one.",
flush=True,
)
try:
_actual_tkinter_handler.close()
except Exception as e_close_old_tk:
print(
f"Warning: Error closing old TkinterTextHandler: {e_close_old_tk}",
flush=True,
)
_actual_tkinter_handler = None
is_widget_valid = (
isinstance(gui_log_widget, (tk.Text, ScrolledText))
and hasattr(gui_log_widget, "winfo_exists")
and gui_log_widget.winfo_exists()
)
is_root_valid = (
root_tk_instance_for_gui_handler
and hasattr(root_tk_instance_for_gui_handler, "winfo_exists")
and root_tk_instance_for_gui_handler.winfo_exists()
)
if is_widget_valid and is_root_valid:
try:
level_colors = logging_config_dict.get("colors", {})
tkinter_handler_poll_ms = logging_config_dict.get(
"queue_poll_interval_ms", 100
)
_actual_tkinter_handler = TkinterTextHandler(
text_widget=gui_log_widget,
level_colors=level_colors,
root_tk_instance_for_widget_update=root_tk_instance_for_gui_handler,
internal_poll_interval_ms=tkinter_handler_poll_ms,
)
_actual_tkinter_handler.setFormatter(_base_formatter)
_actual_tkinter_handler.setLevel(logging.DEBUG)
print(
"INFO: TkinterTextHandler added and configured successfully.",
flush=True,
)
except Exception as e:
print(
f"ERROR: Failed to create and add TkinterTextHandler: {e}", flush=True
)
_actual_tkinter_handler = None
else:
if not is_widget_valid:
print(
"ERROR: GUI log widget invalid or non-existent, cannot add TkinterTextHandler.",
flush=True,
)
if not is_root_valid:
print(
"ERROR: Root Tk instance for GUI handler invalid or non-existent.",
flush=True,
)
def get_logger(name: str) -> logging.Logger:
"""
Retrieves a logger instance by name.
Module-level loggers should use `get_logger(__name__)`.
"""
return logging.getLogger(name)
def shutdown_logging_system():
"""
Shuts down the centralized logging system, processing remaining logs
and closing actual handlers.
"""
global _logging_system_active, _log_processor_after_id
global _actual_console_handler, _actual_file_handler, _actual_tkinter_handler
global _global_log_queue, _tk_root_instance_for_processing
if (
not _logging_system_active and not _global_log_queue
): # Extra check if it was never really active
print(
"INFO: Logging system shutdown called, but system was not fully active or already shut down.",
flush=True,
)
return
print("INFO: Initiating shutdown of centralized logging system...", flush=True)
_logging_system_active = False
if (
_log_processor_after_id
and _tk_root_instance_for_processing
and _tk_root_instance_for_processing.winfo_exists()
):
try:
_tk_root_instance_for_processing.after_cancel(_log_processor_after_id)
# print("INFO: Cancelled global log queue processor task.", flush=True) # Reduced verbosity
except Exception as e_cancel:
print(
f"Warning: Error cancelling log processor task: {e_cancel}", flush=True
)
_log_processor_after_id = None
# print("INFO: Processing any remaining logs in the global queue before full shutdown...", flush=True) # Reduced
if _global_log_queue:
final_processed_count = 0
while not _global_log_queue.empty():
try:
record = _global_log_queue.get_nowait()
final_processed_count += 1
if _actual_console_handler:
_actual_console_handler.handle(record)
if _actual_file_handler:
_actual_file_handler.handle(record)
if _actual_tkinter_handler:
_actual_tkinter_handler.handle(record)
_global_log_queue.task_done()
except QueueEmpty:
break
except Exception as e_final_proc:
print(f"Error during final log processing: {e_final_proc}", flush=True)
break
# if final_processed_count > 0: print(f"INFO: Processed {final_processed_count} remaining log messages.", flush=True)
# else: print("INFO: No remaining log messages in global queue to process.", flush=True)
if _actual_tkinter_handler:
try:
# print("INFO: Closing actual TkinterTextHandler.", flush=True) # Reduced
_actual_tkinter_handler.close()
except Exception as e:
print(f"Error closing TkinterTextHandler: {e}", flush=True)
_actual_tkinter_handler = None
if _actual_console_handler:
try:
# print("INFO: Closing actual ConsoleHandler.", flush=True) # Reduced
_actual_console_handler.close()
except Exception as e:
print(f"Error closing ConsoleHandler: {e}", flush=True)
_actual_console_handler = None
if _actual_file_handler:
try:
# print("INFO: Closing actual FileHandler.", flush=True) # Reduced
_actual_file_handler.close()
except Exception as e:
print(f"Error closing FileHandler: {e}", flush=True)
_actual_file_handler = None
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
try:
if isinstance(handler, QueuePuttingHandler):
handler.close()
root_logger.removeHandler(handler)
# print(f"INFO: Removed and closed QueuePuttingHandler: {handler}", flush=True) # Reduced
except Exception as e_rem_qph:
print(f"Error removing QueuePuttingHandler: {e_rem_qph}", flush=True)
_global_log_queue = None
_tk_root_instance_for_processing = None
_base_formatter = None
print("INFO: Centralized logging system shutdown complete.", flush=True)