SXXXXXXX_RepoSync/reposync/gui/main_window.py
2025-07-07 09:12:52 +02:00

430 lines
19 KiB
Python

# RepoSync/gui/main_window.py
"""
Main window and application logic for the RepoSync GUI.
Organizes the UI into separate tabs for different functionalities and
manages server profiles and settings dynamically.
"""
import logging
import threading
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from tkinter.scrolledtext import ScrolledText
from pathlib import Path
# Custom module imports
from ..config.profile_manager import ProfileManager
from .profile_dialog import ProfileDialog
from ..core.gitea_client import GiteaClient
from ..core.git_manager import GitManager
from ..core.sync_manager import SyncManager, SyncState
from ..utility import logger as logger_util
class BaseTab(ttk.Frame):
    """Base class for the application tabs, holding the shared profile logic.

    Provides the server-profile selector (Combobox plus status label), keeps
    the client built for the selected profile in ``active_client`` and
    mirrors the application's bundle folder in ``sync_bundle_path``.
    """

    def __init__(self, parent, app_instance, tab_name, **kwargs):
        """Initialize the shared tab state.

        Args:
            parent: Parent widget the frame is created in.
            app_instance: Owning application object; this class reads its
                ``profile_manager`` and ``bundle_path_var`` attributes.
            tab_name: Name used for this tab's logger.
            **kwargs: Extra options forwarded to ``ttk.Frame``.
        """
        super().__init__(parent, **kwargs)
        self.app = app_instance
        self.logger = logging.getLogger(tab_name)
        self.active_client = None
        self.selected_profile_name = tk.StringVar()
        self.sync_bundle_path = None  # Will be set by the main app window

    def create_profile_selector(self, parent_frame, label_text):
        """Create the profile selection Combobox and status label.

        Args:
            parent_frame: Widget that will contain the selector frame.
            label_text: Caption shown to the left of the Combobox.

        Returns:
            The frame holding the widgets. It is not placed here; the caller
            packs/grids it.
        """
        selector_frame = ttk.Frame(parent_frame)
        ttk.Label(selector_frame, text=label_text).pack(side="left", padx=(0, 5))
        self.profile_combo = ttk.Combobox(
            selector_frame,
            textvariable=self.selected_profile_name,
            state="readonly",
            width=25,
        )
        self.profile_combo.pack(side="left")
        self.profile_combo.bind("<<ComboboxSelected>>", self._on_profile_select)
        self.status_label = ttk.Label(
            selector_frame, text="Status: Not Connected", foreground="red"
        )
        self.status_label.pack(side="left", padx=10)
        return selector_frame

    def update_profile_list(self):
        """Refresh the Combobox values from the profile manager.

        When the currently selected profile is no longer in the list (for
        example it was deleted in the profile dialog), the selection is
        cleared and the selection handler is re-run so that the client built
        for the removed profile is dropped as well.
        """
        profile_names = sorted(self.app.profile_manager.get_profile_names())
        self.profile_combo['values'] = profile_names

        current = self.selected_profile_name.get()
        if current and current not in profile_names:
            self.selected_profile_name.set("")
            # Goes through the (possibly overridden) handler so subclasses
            # can reset their own widgets too.
            self._on_profile_select()

        if not profile_names:
            self.selected_profile_name.set("")
            self._update_status_label(False, "No profiles configured.")

    def update_bundle_path(self):
        """Copy the bundle path from the application's ``bundle_path_var``.

        An empty value results in ``sync_bundle_path`` being ``None``.
        """
        new_path_str = self.app.bundle_path_var.get()
        if new_path_str:
            self.sync_bundle_path = Path(new_path_str)
            self.logger.debug(f"Tab's bundle path set to {self.sync_bundle_path}")
        else:
            self.sync_bundle_path = None

    def _on_profile_select(self, event=None):
        """Handle a profile selection and build the matching client.

        ``active_client`` is reset before anything else, so every failure
        path (no selection, missing profile data, unsupported service type,
        client construction error) leaves the tab without a client instead
        of silently keeping the one from the previously selected profile.

        Args:
            event: Tk event passed by the ``<<ComboboxSelected>>`` binding;
                unused.
        """
        self.active_client = None

        profile_name = self.selected_profile_name.get()
        if not profile_name:
            self._update_status_label(False)
            return

        profile_data = self.app.profile_manager.get_profile(profile_name)
        if not profile_data:
            self.logger.error(f"Could not load data for profile '{profile_name}'.")
            self._update_status_label(False, "Profile data missing.")
            return

        service_type = profile_data.get("type", "gitea")
        if service_type != "gitea":
            self.logger.error(
                f"Unsupported service type '{service_type}' for profile '{profile_name}'."
            )
            self._update_status_label(False, f"Unsupported type: {service_type}")
            return

        try:
            # A missing "url"/"token" key raises KeyError and is reported
            # through the same handler as a constructor failure.
            self.active_client = GiteaClient(
                api_url=profile_data["url"],
                token=profile_data["token"],
                logger=logging.getLogger(f"GiteaClient({profile_name})"),
            )
            self._update_status_label(True, f"Selected: {profile_data['url']}")
        except Exception as e:
            self.active_client = None
            self.logger.error(f"Failed to initialize GiteaClient for '{profile_name}': {e}")
            self._update_status_label(False, "Error initializing client.")

    def _update_status_label(self, is_connected, message=""):
        """Update the status label text and color.

        Args:
            is_connected: ``True`` renders the label green, ``False`` red.
            message: Text to show; when empty a default status text is used.
        """
        if is_connected:
            self.status_label.config(text=message or "Status: Connected", foreground="green")
        else:
            self.status_label.config(text=message or "Status: Not Connected", foreground="red")
class ExportTab(BaseTab):
    """Tab for fetching repos from a server and exporting them to bundles.

    Network and git work runs in daemon worker threads. Workers never touch
    widgets or open dialogs themselves: they hand their outcome to the Tk
    event loop with ``root.after`` and the ``_on_*`` callbacks update the UI.
    """

    def __init__(self, parent, app_instance, **kwargs):
        """Build the tab widgets and load the profile list.

        Args:
            parent: Parent widget (the notebook).
            app_instance: Owning application object.
            **kwargs: Extra options forwarded to ``ttk.Frame``.
        """
        super().__init__(parent, app_instance, "ExportTab", **kwargs)
        self.repositories = []
        self._create_widgets()
        self.update_profile_list()

    def _create_widgets(self):
        """Create the toolbar (selector, buttons) and the repository tree."""
        top_frame = ttk.Frame(self, padding="5")
        top_frame.pack(side="top", fill="x")

        selector_frame = self.create_profile_selector(top_frame, "Export from Server:")
        selector_frame.pack(side="left")

        self.fetch_button = ttk.Button(
            top_frame, text="Fetch Repositories", command=self._start_repo_fetch
        )
        self.fetch_button.pack(side="left", padx=10)
        self.fetch_button.state(['disabled'])

        self.export_button = ttk.Button(
            top_frame, text="Export Selected", command=self._start_export
        )
        self.export_button.pack(side="left", padx=5)
        self.export_button.state(['disabled'])

        tree_frame = ttk.Frame(self)
        tree_frame.pack(expand=True, fill="both", pady=5)

        columns = ("name", "description", "private")
        self.repo_tree = ttk.Treeview(
            tree_frame, columns=columns, show="headings", selectmode="extended"
        )
        self.repo_tree.heading("name", text="Name")
        self.repo_tree.heading("description", text="Description")
        self.repo_tree.heading("private", text="Private")
        self.repo_tree.column("name", width=200, stretch=tk.NO)
        self.repo_tree.column("description", width=500)
        self.repo_tree.column("private", width=80, stretch=tk.NO, anchor="center")

        vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.repo_tree.yview)
        self.repo_tree.configure(yscrollcommand=vsb.set)
        vsb.pack(side="right", fill="y")
        self.repo_tree.pack(side="left", expand=True, fill="both")

    def _on_profile_select(self, event=None):
        """Re-create the client and reset the tab for the new profile."""
        super()._on_profile_select(event)
        self.fetch_button.state(['!disabled'] if self.active_client else ['disabled'])
        self.export_button.state(['disabled'])
        self.repo_tree.delete(*self.repo_tree.get_children())

    def _start_repo_fetch(self):
        """Start fetching the repository list in a worker thread."""
        client = self.active_client
        if not client:
            messagebox.showerror("Error", "Please select a valid server profile first.", parent=self)
            return
        self.logger.info(f"Fetching repository list from '{self.selected_profile_name.get()}'...")
        self.fetch_button.state(['disabled'])
        self.export_button.state(['disabled'])
        # The client is passed explicitly so the worker keeps using the one
        # that was selected when the fetch started.
        threading.Thread(
            target=self._load_repositories_thread, args=(client,), daemon=True
        ).start()

    def _load_repositories_thread(self, client=None):
        """Worker: fetch the repository list and report back to the UI thread.

        Args:
            client: Client to query. Defaults to the tab's current
                ``active_client`` when omitted.
        """
        try:
            if client is None:
                client = self.active_client
            repositories = client.get_repositories()
        except Exception as e:
            self.logger.error(f"Failed to fetch repositories: {e}", exc_info=True)
            # ``e`` is unbound once the except block ends, so it is handed
            # over as an argument instead of being captured by a lambda.
            self.app.root.after(10, self._on_fetch_failed, e)
        else:
            self.app.root.after(10, self._on_fetch_succeeded, client, repositories)

    def _on_fetch_succeeded(self, client, repositories):
        """UI thread: show fetched repositories unless they are outdated."""
        self._restore_fetch_button()
        if client is not self.active_client:
            # The profile changed while the request was in flight; showing
            # these rows would pair one server's repos with another client.
            self.logger.info("Discarding fetched repository list: the selected profile changed.")
            return
        self.repositories = repositories
        self._update_repo_treeview()

    def _on_fetch_failed(self, error):
        """UI thread: report a failed fetch."""
        self._restore_fetch_button()
        messagebox.showerror(
            "Fetch Error", f"Failed to fetch repositories:\n{error}", parent=self
        )

    def _restore_fetch_button(self):
        """Enable the fetch button only while a client is selected."""
        self.fetch_button.state(['!disabled'] if self.active_client else ['disabled'])

    def _update_repo_treeview(self):
        """Rebuild the tree rows from ``self.repositories``."""
        self.repo_tree.delete(*self.repo_tree.get_children())
        for repo in self.repositories:
            # NOTE(review): the repository name is used as the row id, so two
            # entries with the same name would make insert() fail - confirm
            # the client never returns duplicates.
            self.repo_tree.insert("", "end", values=(
                repo["name"],
                repo.get("description") or "",  # avoid rendering None as "None"
                "Yes" if repo.get("private") else "No",
            ), iid=repo["name"])
        self.logger.info(f"Displayed {len(self.repositories)} repositories.")
        self.export_button.state(['!disabled'] if self.repositories else ['disabled'])

    def _start_export(self):
        """Validate the selection and start the export worker thread."""
        selected_iids = set(self.repo_tree.selection())
        if not selected_iids:
            messagebox.showwarning("No Selection", "Please select at least one repository to export.", parent=self)
            return
        if not self.active_client:
            messagebox.showerror("Error", "Please select a valid server profile first.", parent=self)
            return
        if not self.sync_bundle_path:
            messagebox.showerror("Configuration Error", "Bundle path is not set.", parent=self)
            return

        repos_to_export = [repo for repo in self.repositories if repo["name"] in selected_iids]
        sync_manager = SyncManager(
            self.app.git_manager,
            self.active_client,
            self.sync_bundle_path,
            logging.getLogger("SyncManager"),
        )
        self.logger.info(f"Starting export for {len(repos_to_export)} repositories...")
        self.export_button.state(['disabled'])
        self.fetch_button.state(['disabled'])
        threading.Thread(
            target=self._export_thread, args=(sync_manager, repos_to_export), daemon=True
        ).start()

    def _export_thread(self, sync_manager, repos_to_export):
        """Worker: run the export and report the outcome to the UI thread.

        Args:
            sync_manager: ``SyncManager`` configured for this export.
            repos_to_export: Repository entries selected by the user.
        """
        error = None
        try:
            sync_manager.export_repositories_to_bundles(repos_to_export)
        except Exception as e:
            self.logger.error(f"Export failed: {e}", exc_info=True)
            error = e
        self.app.root.after(10, self._on_export_finished, error)

    def _on_export_finished(self, error=None):
        """UI thread: restore the buttons and show the export result."""
        self._restore_fetch_button()
        # The tree is emptied when the profile changes, so only offer export
        # while there is something to select.
        self.export_button.state(
            ['!disabled'] if self.repo_tree.get_children() else ['disabled']
        )
        if error is None:
            messagebox.showinfo("Success", "Export completed successfully.", parent=self)
        else:
            messagebox.showerror("Export Error", f"An error occurred: {error}", parent=self)
class ImportTab(BaseTab):
    """Tab for comparing bundles with a server and importing updates.

    Comparison and import run in daemon worker threads. Workers never touch
    widgets or open dialogs themselves: they hand their outcome to the Tk
    event loop with ``root.after`` and the ``_on_*`` callbacks update the UI.
    """

    def __init__(self, parent, app_instance, **kwargs):
        """Build the tab widgets and load the profile list.

        Args:
            parent: Parent widget (the notebook).
            app_instance: Owning application object.
            **kwargs: Extra options forwarded to ``ttk.Frame``.
        """
        super().__init__(parent, app_instance, "ImportTab", **kwargs)
        self.comparison_results = []
        self._create_widgets()
        self.update_profile_list()

    def _create_widgets(self):
        """Create the toolbar (selector, buttons) and the bundle status tree."""
        top_frame = ttk.Frame(self, padding="5")
        top_frame.pack(side="top", fill="x")

        selector_frame = self.create_profile_selector(top_frame, "Import to Server:")
        selector_frame.pack(side="left")

        self.check_status_button = ttk.Button(
            top_frame, text="Check Bundle Status", command=self._start_bundle_check
        )
        self.check_status_button.pack(side="left", padx=10)
        self.check_status_button.state(['disabled'])

        self.import_button = ttk.Button(
            top_frame, text="Import Selected", command=self._start_import
        )
        self.import_button.pack(side="left", padx=5)
        self.import_button.state(['disabled'])

        tree_frame = ttk.Frame(self)
        tree_frame.pack(expand=True, fill="both", pady=5)

        columns = ("name", "status")
        self.bundle_tree = ttk.Treeview(
            tree_frame, columns=columns, show="headings", selectmode="extended"
        )
        self.bundle_tree.heading("name", text="Repository Name")
        self.bundle_tree.heading("status", text="Sync Status")
        self.bundle_tree.column("name", width=300)
        self.bundle_tree.column("status", width=200, anchor="center")
        # One tag per sync state; rows are tagged with the state name so the
        # background color reflects the comparison result.
        for state in SyncState:
            self.bundle_tree.tag_configure(
                state.name, background=self._get_color_for_state(state)
            )

        vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.bundle_tree.yview)
        self.bundle_tree.configure(yscrollcommand=vsb.set)
        vsb.pack(side="right", fill="y")
        self.bundle_tree.pack(side="left", expand=True, fill="both")

    def _get_color_for_state(self, state):
        """Return the row background color for a sync state.

        Args:
            state: A ``SyncState`` member.

        Returns:
            A Tk color name; ``"white"`` for states without a mapping.
        """
        return {
            SyncState.AHEAD: "lightblue", SyncState.NEW_REPO: "lightgreen",
            SyncState.BEHIND: "khaki", SyncState.DIVERGED: "salmon",
            SyncState.ERROR: "lightgrey", SyncState.IDENTICAL: "white",
        }.get(state, "white")

    def _on_profile_select(self, event=None):
        """Re-create the client and reset the tab for the new profile."""
        super()._on_profile_select(event)
        self.check_status_button.state(['!disabled'] if self.active_client else ['disabled'])
        self.import_button.state(['disabled'])
        self.bundle_tree.delete(*self.bundle_tree.get_children())

    def _start_bundle_check(self):
        """Start comparing the local bundles with the server in a worker thread."""
        client = self.active_client
        if not client:
            messagebox.showerror("Error", "Please select a valid server profile first.", parent=self)
            return
        if not self.sync_bundle_path:
            messagebox.showerror("Configuration Error", "Bundle path is not set.", parent=self)
            return

        sync_manager = SyncManager(
            self.app.git_manager, client, self.sync_bundle_path, logging.getLogger("SyncManager")
        )
        self.logger.info("Checking bundle status...")
        self.check_status_button.state(['disabled'])
        threading.Thread(
            target=self._check_status_thread, args=(sync_manager, client), daemon=True
        ).start()

    def _check_status_thread(self, sync_manager, client=None):
        """Worker: compare bundles with the remote and report to the UI thread.

        Args:
            sync_manager: ``SyncManager`` configured for the comparison.
            client: Client the comparison was started with; used only to
                detect that the profile changed meanwhile. ``None`` skips
                that check.
        """
        try:
            results = sync_manager.compare_bundles_with_remote()
        except Exception as e:
            self.logger.error(f"Failed to compare bundle status: {e}", exc_info=True)
            # ``e`` is unbound once the except block ends, so it is handed
            # over as an argument instead of being captured by a lambda.
            self.app.root.after(10, self._on_check_failed, e)
        else:
            self.app.root.after(10, self._on_check_succeeded, results, client)

    def _on_check_succeeded(self, results, client=None):
        """UI thread: show comparison results unless they are outdated."""
        self._restore_check_button()
        if client is not None and client is not self.active_client:
            # The profile changed while the comparison ran; these results
            # describe a different server than the one now selected.
            self.logger.info("Discarding bundle comparison: the selected profile changed.")
            return
        self.comparison_results = results
        self._update_bundle_treeview()

    def _on_check_failed(self, error):
        """UI thread: report a failed comparison."""
        self._restore_check_button()
        messagebox.showerror(
            "Comparison Error", f"Failed to compare bundle status:\n{error}", parent=self
        )

    def _restore_check_button(self):
        """Enable the check button only while a client is selected."""
        self.check_status_button.state(['!disabled'] if self.active_client else ['disabled'])

    def _update_bundle_treeview(self):
        """Rebuild the tree rows from ``self.comparison_results``."""
        self.bundle_tree.delete(*self.bundle_tree.get_children())
        for result in self.comparison_results:
            state_name = result["state"].name
            self.bundle_tree.insert(
                "", "end",
                values=(result["name"], state_name),
                tags=(state_name,),
                iid=result["name"],
            )
        self.import_button.state(['!disabled'] if self.comparison_results else ['disabled'])

    def _start_import(self):
        """Validate the selection and start the import worker thread.

        Only rows whose state is ``AHEAD`` or ``NEW_REPO`` are imported.
        """
        selected_iids = set(self.bundle_tree.selection())
        if not selected_iids:
            messagebox.showwarning("No Selection", "Please select at least one repository to import.", parent=self)
            return

        importable_states = (SyncState.AHEAD, SyncState.NEW_REPO)
        repos_to_import = [
            r for r in self.comparison_results
            if r["name"] in selected_iids and r["state"] in importable_states
        ]
        if not repos_to_import:
            messagebox.showwarning("Invalid Selection", "Please select repositories that are 'AHEAD' or 'NEW_REPO' to import.", parent=self)
            return

        # Same preconditions as the bundle check, so a SyncManager is never
        # built without a client or a bundle folder.
        if not self.active_client:
            messagebox.showerror("Error", "Please select a valid server profile first.", parent=self)
            return
        if not self.sync_bundle_path:
            messagebox.showerror("Configuration Error", "Bundle path is not set.", parent=self)
            return

        sync_manager = SyncManager(
            self.app.git_manager,
            self.active_client,
            self.sync_bundle_path,
            logging.getLogger("SyncManager"),
        )
        self.logger.info(f"Starting import for {len(repos_to_import)} repositories...")
        self.import_button.state(['disabled'])
        self.check_status_button.state(['disabled'])
        threading.Thread(
            target=self._import_thread, args=(sync_manager, repos_to_import), daemon=True
        ).start()

    def _import_thread(self, sync_manager, repos_to_import):
        """Worker: run the import and report the outcome to the UI thread.

        Args:
            sync_manager: ``SyncManager`` configured for this import.
            repos_to_import: Comparison entries selected for import.
        """
        error = None
        try:
            sync_manager.import_repositories_from_bundles(repos_to_import)
        except Exception as e:
            self.logger.error(f"Import failed: {e}", exc_info=True)
            error = e
        self.app.root.after(10, self._on_import_finished, error)

    def _on_import_finished(self, error=None):
        """UI thread: restore the buttons, show the result and re-check status."""
        self._restore_check_button()
        # The tree is emptied when the profile changes, so only offer import
        # while there is something to select.
        self.import_button.state(
            ['!disabled'] if self.bundle_tree.get_children() else ['disabled']
        )
        if error is None:
            messagebox.showinfo("Success", "Import process completed.", parent=self)
        else:
            messagebox.showerror("Import Error", f"An error occurred: {error}", parent=self)
        # Refresh the comparison so the list reflects the state after import.
        self._start_bundle_check()
class Application:
    """The main class for the RepoSync graphical user interface.

    Owns the Tk root window, the profile manager, the git manager and the
    bundle-path variable shared with the export and import tabs.
    """

    def __init__(self):
        """Create the main window, its widgets, logging and backend objects."""
        self.root = tk.Tk()
        self.root.title("RepoSync - Offline Repository Synchronization")
        self.root.geometry("1024x768")

        self.profile_manager = ProfileManager()
        self.git_manager = None
        self.bundle_path_var = tk.StringVar()

        self._create_menu()
        self._create_main_widgets()
        # Logging is set up after the widgets because it is attached to the
        # log text widget created above.
        self._setup_logging()
        self._init_backend()

        self.root.protocol("WM_DELETE_WINDOW", self._on_closing)

    def _create_menu(self):
        """Create the menu bar with the File menu."""
        menubar = tk.Menu(self.root)
        self.root.config(menu=menubar)

        file_menu = tk.Menu(menubar, tearoff=0)
        file_menu.add_command(label="Configure Profiles...", command=self._open_profile_dialog)
        file_menu.add_separator()
        file_menu.add_command(label="Exit", command=self._on_closing)
        menubar.add_cascade(label="File", menu=file_menu)

    def _create_main_widgets(self):
        """Create the bundle path row, the tab notebook and the log pane."""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(expand=True, fill="both")

        bundle_path_frame = ttk.LabelFrame(main_frame, text="Bundle Folder Path", padding="5")
        bundle_path_frame.pack(side="top", fill="x", pady=(0, 10))
        self.bundle_path_entry = ttk.Entry(
            bundle_path_frame, textvariable=self.bundle_path_var, state="readonly", width=100
        )
        self.bundle_path_entry.pack(side="left", fill="x", expand=True, padx=(0, 5))
        browse_button = ttk.Button(
            bundle_path_frame, text="Browse...", command=self._browse_bundle_path
        )
        browse_button.pack(side="left")

        paned_window = ttk.PanedWindow(main_frame, orient=tk.VERTICAL)
        paned_window.pack(expand=True, fill="both")

        notebook = ttk.Notebook(paned_window)
        paned_window.add(notebook, weight=3)
        self.export_tab = ExportTab(notebook, self, padding="10")
        self.import_tab = ImportTab(notebook, self, padding="10")
        notebook.add(self.export_tab, text="Export from Server")
        notebook.add(self.import_tab, text="Import to Server")

        log_frame = ttk.LabelFrame(paned_window, text="Logs", padding="5")
        paned_window.add(log_frame, weight=1)
        self.log_widget = ScrolledText(log_frame, state=tk.DISABLED, wrap=tk.WORD, height=10)
        self.log_widget.pack(expand=True, fill="both")

    def _browse_bundle_path(self):
        """Open a dialog to select the bundle directory and save the setting.

        Nothing changes when the dialog is cancelled.
        """
        initial_dir = self.bundle_path_var.get() or str(Path.home())
        new_path = filedialog.askdirectory(title="Select Bundle Folder", initialdir=initial_dir)
        if not new_path:
            return
        self.bundle_path_var.set(new_path)
        self.profile_manager.set_setting("bundle_path", new_path)
        self.logger.info(f"Bundle path updated to: {new_path}")
        self.export_tab.update_bundle_path()
        self.import_tab.update_bundle_path()

    def _setup_logging(self):
        """Configure logging through the logger utility and create ``self.logger``."""
        log_config = {"level": logging.INFO, "enable_console": True}
        logger_util.setup_logging(self.log_widget, self.root, log_config)
        self.logger = logging.getLogger(__name__)

    def _init_backend(self):
        """Create the git manager and push the saved bundle path to the tabs."""
        self.logger.info("Initializing backend components...")
        self.git_manager = GitManager(logger=logging.getLogger("GitManager"))
        self._restore_bundle_path()
        self.export_tab.update_bundle_path()
        self.import_tab.update_bundle_path()
        self.logger.info("Backend components initialized.")

    def _restore_bundle_path(self):
        """Load the saved ``bundle_path`` setting into ``bundle_path_var``.

        A missing or empty setting is stored as an empty string. Passing
        ``None`` straight to ``StringVar.set`` would store the text "None",
        which the tabs would then treat as a real folder name.
        (Presumably ``get_setting`` returns ``None`` for an unknown key -
        confirm against ProfileManager.)
        """
        saved_path = self.profile_manager.get_setting("bundle_path")
        self.bundle_path_var.set(str(saved_path) if saved_path else "")

    def _open_profile_dialog(self):
        """Show the profile dialog and refresh the tabs once it is closed."""
        dialog = ProfileDialog(self.root, self.profile_manager)
        self.root.wait_window(dialog)
        self.update_all_profile_comboboxes()

    def update_all_profile_comboboxes(self):
        """Reload the profile list in both tabs."""
        self.export_tab.update_profile_list()
        self.import_tab.update_profile_list()

    def _on_closing(self):
        """Shut down logging and destroy the main window."""
        self.logger.info("Application is closing.")
        logger_util.shutdown_logging()
        self.root.destroy()

    def run(self):
        """Enter the Tk main loop; returns when the window is closed."""
        self.logger.info("Starting RepoSync application...")
        self.root.mainloop()