refactor gui and add logger

This commit is contained in:
VALLONGOL 2025-09-30 10:58:10 +02:00
parent 5bad020362
commit 96279fb220
3 changed files with 256 additions and 71 deletions

View File

@ -7,7 +7,9 @@ widgets, and coordinating the simulation and plotting managers.
import tkinter as tk import tkinter as tk
from tkinter import ttk, messagebox, simpledialog from tkinter import ttk, messagebox, simpledialog
from tkinter.scrolledtext import ScrolledText
import queue import queue
import logging
import numpy as np import numpy as np
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
@ -17,9 +19,10 @@ from matplotlib.figure import Figure
from ..core.simulation_engine import RadarConfig, AntennaConfig, ScanConfig, Target from ..core.simulation_engine import RadarConfig, AntennaConfig, ScanConfig, Target
from ..utils import radar_math, config_manager from ..utils import radar_math, config_manager
# Import new manager classes # Import new manager classes and logger
from .plot_manager import PlotManager from .plot_manager import PlotManager
from .simulation_manager import SimulationManager from .simulation_manager import SimulationManager
from ..utils import logger
# --- Helper Dialog for Adding/Editing Targets --- # --- Helper Dialog for Adding/Editing Targets ---
class AddTargetDialog(tk.Toplevel): class AddTargetDialog(tk.Toplevel):
@ -79,16 +82,32 @@ class App(tk.Tk):
# --- Main Layout --- # --- Main Layout ---
main_paned_window = ttk.PanedWindow(self, orient=tk.HORIZONTAL) main_paned_window = ttk.PanedWindow(self, orient=tk.HORIZONTAL)
main_paned_window.pack(fill=tk.BOTH, expand=True) main_paned_window.pack(fill=tk.BOTH, expand=True)
# --- Left Column (Notebook and Logs) ---
left_column_frame = ttk.Frame(main_paned_window, width=600) left_column_frame = ttk.Frame(main_paned_window, width=600)
main_paned_window.add(left_column_frame, weight=1) main_paned_window.add(left_column_frame, weight=1)
right_frame = ttk.Frame(main_paned_window)
main_paned_window.add(right_frame, weight=2)
left_paned_window = ttk.PanedWindow(left_column_frame, orient=tk.VERTICAL) left_paned_window = ttk.PanedWindow(left_column_frame, orient=tk.VERTICAL)
left_paned_window.pack(fill=tk.BOTH, expand=True) left_paned_window.pack(fill=tk.BOTH, expand=True)
notebook_frame = ttk.Frame(left_paned_window, height=450)
left_paned_window.add(notebook_frame, weight=1) notebook_frame = ttk.Frame(left_paned_window, height=500)
bottom_left_frame = ttk.Frame(left_paned_window) left_paned_window.add(notebook_frame, weight=2)
left_paned_window.add(bottom_left_frame, weight=1)
log_frame = ttk.LabelFrame(left_paned_window, text="Log", height=200)
left_paned_window.add(log_frame, weight=1)
# --- Right Column (RD and PPI plots) ---
right_column_frame = ttk.Frame(main_paned_window)
main_paned_window.add(right_column_frame, weight=3)
right_paned_window = ttk.PanedWindow(right_column_frame, orient=tk.VERTICAL)
right_paned_window.pack(fill=tk.BOTH, expand=True)
rd_frame = ttk.LabelFrame(right_paned_window, text="Range-Doppler Map")
right_paned_window.add(rd_frame, weight=1)
ppi_frame = ttk.Frame(right_paned_window)
right_paned_window.add(ppi_frame, weight=1)
# --- Initialize UI Variables --- # --- Initialize UI Variables ---
self._init_vars() self._init_vars()
@ -107,15 +126,20 @@ class App(tk.Tk):
self._populate_target_tab(self.target_tab) self._populate_target_tab(self.target_tab)
self._populate_scenario_tab(self.scenario_tab) self._populate_scenario_tab(self.scenario_tab)
# --- Setup Logging Area ---
self.log_widget = ScrolledText(log_frame, state=tk.DISABLED, wrap=tk.WORD, font=("TkDefaultFont", 9))
self.log_widget.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self._init_logging()
# --- Setup Plotting Area --- # --- Setup Plotting Area ---
ppi_plot_frame = ttk.LabelFrame(bottom_left_frame, text="PPI Display") ppi_plot_frame = ttk.LabelFrame(ppi_frame, text="PPI Display")
ppi_plot_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(5, 0), pady=5) ppi_plot_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(5, 0), pady=5)
scan_controls_frame = ttk.LabelFrame(bottom_left_frame, text="Scan & PPI Control") scan_controls_frame = ttk.LabelFrame(ppi_frame, text="Scan & PPI Control")
scan_controls_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5), pady=5) scan_controls_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(5, 5), pady=5)
self._populate_scan_ppi_controls(scan_controls_frame) self._populate_scan_ppi_controls(scan_controls_frame)
self.rd_figure = Figure(figsize=(8, 6), dpi=100, facecolor='#3a3a3a') self.rd_figure = Figure(figsize=(8, 6), dpi=100, facecolor='#3a3a3a')
self.rd_canvas = FigureCanvasTkAgg(self.rd_figure, master=right_frame) self.rd_canvas = FigureCanvasTkAgg(self.rd_figure, master=rd_frame)
self.rd_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) self.rd_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
self.ppi_figure = Figure(figsize=(4, 4), dpi=100, facecolor='#3a3a3a') self.ppi_figure = Figure(figsize=(4, 4), dpi=100, facecolor='#3a3a3a')
@ -130,10 +154,31 @@ class App(tk.Tk):
self.toggle_amplitude_controls() self.toggle_amplitude_controls()
self.update_derived_parameters() self.update_derived_parameters()
self.protocol("WM_DELETE_WINDOW", self.on_closing) self.protocol("WM_DELETE_WINDOW", self.on_closing)
self.log.info("Application initialized successfully.")
def _init_logging(self):
logging_config = {
"default_root_level": logging.INFO,
"format": "%(asctime)s [%(levelname)-8s] %(name)-20s: %(message)s",
"date_format": "%H:%M:%S",
"enable_console": True,
"colors": {
logging.DEBUG: "gray",
logging.INFO: "black",
logging.WARNING: "orange",
logging.ERROR: "red",
logging.CRITICAL: "red",
}
}
logger.setup_basic_logging(self, logging_config)
logger.add_tkinter_handler(self.log_widget, logging_config)
self.log = logger.get_logger(__name__)
def on_closing(self): def on_closing(self):
self.log.info("Shutdown sequence initiated.")
if self.simulation_manager.is_running(): if self.simulation_manager.is_running():
self.stop_simulation() self.stop_simulation()
logger.shutdown_logging_system()
self.destroy() self.destroy()
def _init_vars(self): def _init_vars(self):
@ -288,11 +333,13 @@ class App(tk.Tk):
def start_simulation(self): def start_simulation(self):
if self.simulation_manager.is_running(): if self.simulation_manager.is_running():
return return
self.log.info("Starting simulation...")
self.generate_button.config(state=tk.DISABLED) self.generate_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL) self.stop_button.config(state=tk.NORMAL)
radar_cfg = self.get_radar_config_from_gui() radar_cfg = self.get_radar_config_from_gui()
targets = self.get_targets_from_gui() targets = self.get_targets_from_gui()
if not targets: if not targets:
self.log.warning("Simulation start requested but no targets are defined.")
messagebox.showwarning("No Targets", "Please add at least one target to simulate.", parent=self) messagebox.showwarning("No Targets", "Please add at least one target to simulate.", parent=self)
self.stop_simulation() self.stop_simulation()
return return
@ -305,6 +352,7 @@ class App(tk.Tk):
self.after(100, self._check_simulation_queue) self.after(100, self._check_simulation_queue)
def stop_simulation(self): def stop_simulation(self):
self.log.info("Stopping simulation...")
self.simulation_manager.stop() self.simulation_manager.stop()
self.generate_button.config(state=tk.NORMAL) self.generate_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED) self.stop_button.config(state=tk.DISABLED)
@ -318,6 +366,7 @@ class App(tk.Tk):
try: try:
frame_data = self.simulation_manager.data_queue.get_nowait() frame_data = self.simulation_manager.data_queue.get_nowait()
if frame_data is None: if frame_data is None:
self.log.info("Simulation thread finished.")
self.stop_simulation() self.stop_simulation()
return return
radar_cfg = self.get_radar_config_from_gui() radar_cfg = self.get_radar_config_from_gui()
@ -350,24 +399,9 @@ class App(tk.Tk):
self.plot_manager.redraw_ppi_targets(self.get_targets_from_gui()) self.plot_manager.redraw_ppi_targets(self.get_targets_from_gui())
def get_radar_config_from_gui(self) -> RadarConfig: def get_radar_config_from_gui(self) -> RadarConfig:
antenna_cfg = AntennaConfig( antenna_cfg = AntennaConfig(beamwidth_az_deg=self.vars["beamwidth_az_deg"].get(), beamwidth_el_deg=self.vars["beamwidth_el_deg"].get())
beamwidth_az_deg=self.vars["beamwidth_az_deg"].get(), scan_cfg = ScanConfig(mode=self.vars["scan_mode"].get(), min_az_deg=self.vars["min_az_deg"].get(), max_az_deg=self.vars["max_az_deg"].get(), scan_speed_deg_s=self.vars["scan_speed_deg_s"].get())
beamwidth_el_deg=self.vars["beamwidth_el_deg"].get() return RadarConfig(carrier_frequency=self.vars["carrier_frequency"].get(), prf=self.vars["prf"].get(), duty_cycle=self.vars["duty_cycle"].get(), sample_rate=self.vars["sample_rate"].get(), antenna_config=antenna_cfg, scan_config=scan_cfg)
)
scan_cfg = ScanConfig(
mode=self.vars["scan_mode"].get(),
min_az_deg=self.vars["min_az_deg"].get(),
max_az_deg=self.vars["max_az_deg"].get(),
scan_speed_deg_s=self.vars["scan_speed_deg_s"].get()
)
return RadarConfig(
carrier_frequency=self.vars["carrier_frequency"].get(),
prf=self.vars["prf"].get(),
duty_cycle=self.vars["duty_cycle"].get(),
sample_rate=self.vars["sample_rate"].get(),
antenna_config=antenna_cfg,
scan_config=scan_cfg
)
def get_targets_from_gui(self) -> list[Target]: def get_targets_from_gui(self) -> list[Target]:
targets = [] targets = []
@ -380,6 +414,7 @@ class App(tk.Tk):
rcs = float_values[6] rcs = float_values[6]
targets.append(Target(initial_position=pos, velocity=vel, rcs=rcs)) targets.append(Target(initial_position=pos, velocity=vel, rcs=rcs))
except (ValueError, IndexError) as e: except (ValueError, IndexError) as e:
self.log.error(f"Skipping invalid target data: {values}. Error: {e}")
messagebox.showwarning("Invalid Data", f"Skipping invalid target data: {values}. Error: {e}", parent=self) messagebox.showwarning("Invalid Data", f"Skipping invalid target data: {values}. Error: {e}", parent=self)
return targets return targets
@ -389,7 +424,7 @@ class App(tk.Tk):
max_vel = radar_math.calculate_max_unambiguous_velocity(radar_cfg.carrier_frequency, radar_cfg.prf) max_vel = radar_math.calculate_max_unambiguous_velocity(radar_cfg.carrier_frequency, radar_cfg.prf)
report.append(f"--- Radar Configuration Analysis ---") report.append(f"--- Radar Configuration Analysis ---")
report.append(f"Max Unambiguous Range: {max_range:,.0f} m") report.append(f"Max Unambiguous Range: {max_range:,.0f} m")
report.append(f"Max Unambiguous Velocity: \u00b1{max_vel:,.1f} m/s") report.append(f"Max Unambiguous Velocity: b1{max_vel:,.1f} m/s")
report.append(f"Current Antenna Azimuth: {current_az_deg:,.1f} deg") report.append(f"Current Antenna Azimuth: {current_az_deg:,.1f} deg")
if radar_cfg.scan_config.mode != 'staring' and radar_cfg.scan_config.scan_speed_deg_s > 0: if radar_cfg.scan_config.mode != 'staring' and radar_cfg.scan_config.scan_speed_deg_s > 0:
dwell_time = radar_math.calculate_dwell_time(radar_cfg.antenna_config.beamwidth_az_deg, radar_cfg.scan_config.scan_speed_deg_s) dwell_time = radar_math.calculate_dwell_time(radar_cfg.antenna_config.beamwidth_az_deg, radar_cfg.scan_config.scan_speed_deg_s)
@ -420,13 +455,12 @@ class App(tk.Tk):
scan_speed = self.vars["scan_speed_deg_s"].get() scan_speed = self.vars["scan_speed_deg_s"].get()
pri = 1.0 / prf pri = 1.0 / prf
pulse_width = pri * (duty_cycle / 100.0) pulse_width = pri * (duty_cycle / 100.0)
listening_time = pri
self.vars["pulse_width_text"].set(f"Pulse Width: {pulse_width * 1e6:, .2f} µs") self.vars["pulse_width_text"].set(f"Pulse Width: {pulse_width * 1e6:, .2f} µs")
self.vars["listening_time_text"].set(f"Listening Window: {listening_time * 1e6:,.2f} µs (Max Range Time)") self.vars["listening_time_text"].set(f"Listening Window: {pri * 1e6:, .2f} µs (Max Range Time)")
max_range = radar_math.calculate_max_unambiguous_range(prf) max_range = radar_math.calculate_max_unambiguous_range(prf)
max_vel = radar_math.calculate_max_unambiguous_velocity(carrier_freq, prf) max_vel = radar_math.calculate_max_unambiguous_velocity(carrier_freq, prf)
self.vars["max_range_text"].set(f"Max Unambiguous Range: {max_range:, .0f} m") self.vars["max_range_text"].set(f"Max Unambiguous Range: {max_range:, .0f} m")
self.vars["max_velocity_text"].set(f"Max Unambiguous Velocity: \u00b1{max_vel:,.1f} m/s") self.vars["max_velocity_text"].set(f"Max Unambiguous Velocity: b1{max_vel:, .1f} m/s")
if scan_mode == 'staring': if scan_mode == 'staring':
self.vars["dwell_time_text"].set("Dwell Time: N/A (Staring)") self.vars["dwell_time_text"].set("Dwell Time: N/A (Staring)")
self.vars["pulses_on_target_text"].set("Pulses on Target: N/A (Staring)") self.vars["pulses_on_target_text"].set("Pulses on Target: N/A (Staring)")
@ -437,32 +471,27 @@ class App(tk.Tk):
self.vars["pulses_on_target_text"].set(f"Pulses on Target: {pulses_on_target:,}") self.vars["pulses_on_target_text"].set(f"Pulses on Target: {pulses_on_target:,}")
self.check_target_warnings() self.check_target_warnings()
self._update_scan_info_text() self._update_scan_info_text()
except (tk.TclError, ValueError, ZeroDivisionError): except (tk.TclError, ValueError, ZeroDivisionError): pass
pass
def _update_scan_info_text(self): def _update_scan_info_text(self):
mode = self.vars['scan_mode'].get() mode = self.vars['scan_mode'].get()
if mode == 'staring': if mode == 'staring': self.vars['scan_info_text'].set("Mode: Staring")
self.vars['scan_info_text'].set("Mode: Staring")
else: else:
min_az = self.vars['min_az_deg'].get() min_az, max_az, speed = self.vars['min_az_deg'].get(), self.vars['max_az_deg'].get(), self.vars['scan_speed_deg_s'].get()
max_az = self.vars['max_az_deg'].get()
speed = self.vars['scan_speed_deg_s'].get()
self.vars['scan_info_text'].set(f"Mode: Sector Scan\nAz: [{min_az}°, {max_az}°] @ {speed}°/s") self.vars['scan_info_text'].set(f"Mode: Sector Scan\nAz: [{min_az}°, {max_az}°] @ {speed}°/s")
def check_target_warnings(self): def check_target_warnings(self):
if not hasattr(self, 'target_table') or not hasattr(self, 'max_range_label'): if not hasattr(self, 'target_table'): return
return
try: try:
prf = self.vars["prf"].get() prf, carrier_freq = self.vars["prf"].get(), self.vars["carrier_frequency"].get()
carrier_freq = self.vars["carrier_frequency"].get()
max_range = radar_math.calculate_max_unambiguous_range(prf) max_range = radar_math.calculate_max_unambiguous_range(prf)
max_vel = radar_math.calculate_max_unambiguous_velocity(carrier_freq, prf) max_vel = radar_math.calculate_max_unambiguous_velocity(carrier_freq, prf)
range_warning, vel_warning = False, False range_warning, vel_warning = False, False
for item in self.target_table.get_children(): for item in self.target_table.get_children():
values = [float(v) for v in self.target_table.item(item)['values']] values = [float(v) for v in self.target_table.item(item)['values']]
target_initial_range = np.linalg.norm(values[0:3]) target_initial_range = np.linalg.norm(values[0:3])
target_radial_vel = np.dot(values[3:6], values[0:3] / target_initial_range) if target_initial_range > 0 else 0 if target_initial_range > 0: target_radial_vel = np.dot(values[3:6], values[0:3] / target_initial_range)
else: target_radial_vel = 0
if target_initial_range > max_range: range_warning = True if target_initial_range > max_range: range_warning = True
if abs(target_radial_vel) > max_vel: vel_warning = True if abs(target_radial_vel) > max_vel: vel_warning = True
self.max_range_label.config(foreground='orange' if range_warning else 'black') self.max_range_label.config(foreground='orange' if range_warning else 'black')
@ -474,33 +503,26 @@ class App(tk.Tk):
def open_add_target_dialog(self): def open_add_target_dialog(self):
dialog = AddTargetDialog(self) dialog = AddTargetDialog(self)
result = dialog.show() result = dialog.show()
if result: if result: self.add_target_to_table(result)
self.add_target_to_table(result)
def on_target_double_click(self, event): def on_target_double_click(self, event):
selected_item = self.target_table.selection() selected_item = self.target_table.selection()
if not selected_item: return if not selected_item: return
item_data = self.target_table.item(selected_item[0])['values'] item_data = self.target_table.item(selected_item[0])['values']
target_data = { target_data = { "pos_x": float(item_data[0]), "pos_y": float(item_data[1]), "pos_z": float(item_data[2]), "vel_x": float(item_data[3]), "vel_y": float(item_data[4]), "vel_z": float(item_data[5]), "rcs": float(item_data[6]) }
"pos_x": float(item_data[0]), "pos_y": float(item_data[1]), "pos_z": float(item_data[2]),
"vel_x": float(item_data[3]), "vel_y": float(item_data[4]), "vel_z": float(item_data[5]),
"rcs": float(item_data[6])
}
dialog = AddTargetDialog(self, target_data=target_data) dialog = AddTargetDialog(self, target_data=target_data)
result = dialog.show() result = dialog.show()
if result: if result:
self.target_table.item(selected_item[0], values=[f"{v:.2f}" for v in result.values()]) self.target_table.item(selected_item[0], values=[f"{v:.2f}" for v in result.values()])
self.check_target_warnings() self.check_target_warnings()
if self.plot_manager: if self.plot_manager: self.plot_manager.redraw_ppi_targets(self.get_targets_from_gui())
self.plot_manager.redraw_ppi_targets(self.get_targets_from_gui())
def on_profile_select(self, event=None): def on_profile_select(self, event=None):
profile_name = self.selected_profile.get() profile_name = self.selected_profile.get()
if profile_name in self.profiles: if profile_name in self.profiles:
profile_data = self.profiles[profile_name] profile_data = self.profiles[profile_name]
for key, value in profile_data.items(): for key, value in profile_data.items():
if key in self.vars: if key in self.vars: self.vars[key].set(value)
self.vars[key].set(value)
self.update_scan_mode_controls() self.update_scan_mode_controls()
messagebox.showinfo("Profile Loaded", f"Profile '{profile_name}' has been loaded.", parent=self) messagebox.showinfo("Profile Loaded", f"Profile '{profile_name}' has been loaded.", parent=self)
@ -520,8 +542,7 @@ class App(tk.Tk):
def delete_profile(self): def delete_profile(self):
profile_name = self.selected_profile.get() profile_name = self.selected_profile.get()
if not profile_name: if not profile_name: messagebox.showwarning("No Profile Selected", "Please select a profile to delete.", parent=self); return
messagebox.showwarning("No Profile Selected", "Please select a profile to delete.", parent=self); return
if messagebox.askyesno("Delete Profile", f"Are you sure you want to delete the profile '{profile_name}'?", parent=self): if messagebox.askyesno("Delete Profile", f"Are you sure you want to delete the profile '{profile_name}'?", parent=self):
if profile_name in self.profiles: if profile_name in self.profiles:
del self.profiles[profile_name] del self.profiles[profile_name]
@ -536,19 +557,17 @@ class App(tk.Tk):
def toggle_amplitude_controls(self): def toggle_amplitude_controls(self):
state = tk.DISABLED if self.vars["auto_scale"].get() else tk.NORMAL state = tk.DISABLED if self.vars["auto_scale"].get() else tk.NORMAL
self.min_db_spinbox.config(state=state) if hasattr(self, 'min_db_spinbox'): self.min_db_spinbox.config(state=state)
self.max_db_spinbox.config(state=state) if hasattr(self, 'max_db_spinbox'): self.max_db_spinbox.config(state=state)
def update_scan_mode_controls(self, *args): def update_scan_mode_controls(self, *args):
mode = self.vars["scan_mode"].get() mode = self.vars["scan_mode"].get()
state = tk.NORMAL if mode != 'staring' else tk.DISABLED state = tk.NORMAL if mode != 'staring' else tk.DISABLED
if mode == 'staring': if mode == 'staring': self.vars["scan_speed_deg_s"].set(0.0001)
self.vars["scan_speed_deg_s"].set(0.0001) elif self.vars["scan_speed_deg_s"].get() <= 0.0001: self.vars["scan_speed_deg_s"].set(20.0)
elif self.vars["scan_speed_deg_s"].get() <= 0.0001: if hasattr(self, 'min_az_spinbox'): self.min_az_spinbox.config(state=state)
self.vars["scan_speed_deg_s"].set(20.0) if hasattr(self, 'max_az_spinbox'): self.max_az_spinbox.config(state=state)
self.min_az_spinbox.config(state=state) if hasattr(self, 'scan_speed_spinbox'): self.scan_speed_spinbox.config(state=state)
self.max_az_spinbox.config(state=state)
self.scan_speed_spinbox.config(state=state)
self.update_derived_parameters() self.update_derived_parameters()
def start_gui(): def start_gui():

View File

@ -119,6 +119,13 @@ class PlotManager:
self.im_rd.set_data(range_doppler_map_db) self.im_rd.set_data(range_doppler_map_db)
self.im_rd.set_clim(vmin=vmin, vmax=vmax) self.im_rd.set_clim(vmin=vmin, vmax=vmax)
# Re-create colorbar to prevent state issues
if self.cbar_rd:
self.cbar_rd.remove()
self.cbar_rd = self.rd_figure.colorbar(self.im_rd, ax=self.ax_rd)
self.cbar_rd.ax.tick_params(colors='white', labelsize=8)
self.cbar_rd.set_label('Amplitude (dB)', color='white', fontsize=8)
doppler_freq_axis = np.fft.fftshift(np.fft.fftfreq(iq_data_cpi.shape[0], d=1.0/radar_cfg.prf)) doppler_freq_axis = np.fft.fftshift(np.fft.fftfreq(iq_data_cpi.shape[0], d=1.0/radar_cfg.prf))
velocity_axis = doppler_freq_axis * (c / radar_cfg.carrier_frequency) / 2 velocity_axis = doppler_freq_axis * (c / radar_cfg.carrier_frequency) / 2
range_axis_samples = iq_data_cpi.shape[1] range_axis_samples = iq_data_cpi.shape[1]

View File

@ -0,0 +1,159 @@
# FlightMonitor/utils/logger.py
import logging
import logging.handlers # For RotatingFileHandler
import tkinter as tk
from tkinter.scrolledtext import ScrolledText
from queue import Queue, Empty as QueueEmpty
from typing import Optional, Dict, Any
# --- Module-level globals for the centralized logging queue system ---
_global_log_queue: Optional[Queue[logging.LogRecord]] = None
_actual_console_handler: Optional[logging.StreamHandler] = None
_actual_file_handler: Optional[logging.handlers.RotatingFileHandler] = None
_actual_tkinter_handler: Optional["TkinterTextHandler"] = None
_log_processor_after_id: Optional[str] = None
_logging_system_active: bool = False
_tk_root_instance_for_processing: Optional[tk.Tk] = None
_base_formatter: Optional[logging.Formatter] = None
GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS = 100
class TkinterTextHandler(logging.Handler):
"""
A logging handler that directs log messages to a Tkinter Text widget.
This handler is called directly from the GUI thread's processing loop.
"""
def __init__(self, text_widget: tk.Text, level_colors: Dict[int, str]):
super().__init__()
self.text_widget = text_widget
self.level_colors = level_colors
self._configure_tags()
def _configure_tags(self):
for level, color_value in self.level_colors.items():
level_name = logging.getLevelName(level)
if color_value:
try:
self.text_widget.tag_config(level_name, foreground=color_value)
except tk.TclError:
pass # Widget might not be ready
def emit(self, record: logging.LogRecord):
try:
if not self.text_widget.winfo_exists():
return
msg = self.format(record)
level_name = record.levelname
self.text_widget.configure(state=tk.NORMAL)
self.text_widget.insert(tk.END, msg + "\n", (level_name,))
self.text_widget.configure(state=tk.DISABLED)
self.text_widget.see(tk.END)
except Exception as e:
print(f"Error in TkinterTextHandler.emit: {e}", flush=True)
class QueuePuttingHandler(logging.Handler):
"""
A simple handler that puts any received LogRecord into a global queue.
"""
def __init__(self, handler_queue: Queue[logging.LogRecord]):
super().__init__()
self.handler_queue = handler_queue
def emit(self, record: logging.LogRecord):
self.handler_queue.put_nowait(record)
def _process_global_log_queue():
"""
GUI Thread: Periodically processes LogRecords from the _global_log_queue
and dispatches them to the actual configured handlers.
"""
global _logging_system_active, _log_processor_after_id
if not _logging_system_active or not _tk_root_instance_for_processing or not _tk_root_instance_for_processing.winfo_exists():
return
try:
while _global_log_queue and not _global_log_queue.empty():
record = _global_log_queue.get_nowait()
if _actual_console_handler:
_actual_console_handler.handle(record)
if _actual_file_handler:
_actual_file_handler.handle(record)
if _actual_tkinter_handler:
_actual_tkinter_handler.handle(record)
_global_log_queue.task_done()
except QueueEmpty:
pass
except Exception as e:
print(f"Error in log processing queue: {e}", flush=True)
finally:
if _logging_system_active:
_log_processor_after_id = _tk_root_instance_for_processing.after(
GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS, _process_global_log_queue
)
def setup_basic_logging(root_tk_instance_for_processor: tk.Tk, logging_config_dict: Optional[Dict[str, Any]] = None):
global _global_log_queue, _actual_console_handler, _actual_file_handler, _logging_system_active
global _tk_root_instance_for_processing, _log_processor_after_id, _base_formatter
if _logging_system_active: return
if logging_config_dict is None: logging_config_dict = {}
log_format_str = logging_config_dict.get("format", "% (asctime)s [%(levelname)-8s] %(name)-25s : %(message)s")
log_date_format_str = logging_config_dict.get("date_format", "%Y-%m-%d %H:%M:%S")
_base_formatter = logging.Formatter(log_format_str, datefmt=log_date_format_str)
_global_log_queue = Queue()
_tk_root_instance_for_processing = root_tk_instance_for_processor
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]: root_logger.removeHandler(handler)
root_logger.setLevel(logging_config_dict.get("default_root_level", logging.INFO))
if logging_config_dict.get("enable_console", True):
_actual_console_handler = logging.StreamHandler()
_actual_console_handler.setFormatter(_base_formatter)
_actual_console_handler.setLevel(logging.DEBUG)
queue_putter = QueuePuttingHandler(handler_queue=_global_log_queue)
queue_putter.setLevel(logging.DEBUG)
root_logger.addHandler(queue_putter)
_logging_system_active = True
_log_processor_after_id = _tk_root_instance_for_processing.after(GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS, _process_global_log_queue)
def add_tkinter_handler(gui_log_widget: tk.Text, logging_config_dict: Dict[str, Any]):
global _actual_tkinter_handler, _base_formatter
if not _logging_system_active or not _base_formatter: return
if _actual_tkinter_handler: _actual_tkinter_handler.close()
if isinstance(gui_log_widget, (tk.Text, ScrolledText)) and gui_log_widget.winfo_exists():
level_colors = logging_config_dict.get("colors", {})
_actual_tkinter_handler = TkinterTextHandler(text_widget=gui_log_widget, level_colors=level_colors)
_actual_tkinter_handler.setFormatter(_base_formatter)
_actual_tkinter_handler.setLevel(logging.DEBUG)
logging.getLogger(__name__).info("Tkinter log handler added successfully.")
else:
print("ERROR: GUI log widget invalid, cannot add TkinterTextHandler.", flush=True)
def get_logger(name: str) -> logging.Logger:
return logging.getLogger(name)
def shutdown_logging_system():
global _logging_system_active, _log_processor_after_id
if not _logging_system_active: return
_logging_system_active = False
if _log_processor_after_id and _tk_root_instance_for_processing and _tk_root_instance_for_processing.winfo_exists():
_tk_root_instance_for_processing.after_cancel(_log_processor_after_id)
# Final flush of the queue
_process_global_log_queue()
logging.shutdown()