# GitTool.py
# (Listing metadata from the original viewer: 751 lines, 39 KiB, Python.)
import datetime
import logging
import os
import shutil
import tkinter as tk
import zipfile  # Import zipfile module
from tkinter import filedialog
from tkinter import messagebox

# Import application modules
from config_manager import ConfigManager, DEFAULT_PROFILE, DEFAULT_BACKUP_DIR
from git_commands import GitCommands, GitCommandError
from gui import MainFrame
from logger_config import setup_logger


class GitSvnSyncApp:
    """
    Main application class for the Git SVN Sync Tool.

    Coordinates the GUI, configuration loading/saving, and Git command execution.
    Acts as the controller in a Model-View-Controller like pattern.
    """

    def __init__(self, master):
        """
        Initializes the GitSvnSyncApp.

        Builds, in order: a basic console logger, the ConfigManager, the
        MainFrame GUI, the GUI-aware logger and the GitCommands handler.
        If the ConfigManager or the MainFrame cannot be created, a fatal
        error is shown, the root window is destroyed and the constructor
        returns early (leaving `main_frame` unset).

        Args:
            master (tk.Tk): The main Tkinter root window.
        """
        self.master = master
        master.title("Git SVN Sync Tool")
        master.protocol("WM_DELETE_WINDOW", self.on_closing)

        # --- Early Logger Setup ---
        # A plain logger is needed before the GUI log widget exists.
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
        self.logger = logging.getLogger("GitSvnSyncApp")

        # --- Configuration Manager ---
        try:
            self.config_manager = ConfigManager(self.logger)
        except Exception as e:
            self.logger.critical(f"Failed to initialize ConfigManager: {e}", exc_info=True)
            self.show_fatal_error(f"Failed to load or create configuration file.\n{e}\nApplication cannot continue.")
            master.destroy()
            return

        # --- GUI Main Frame ---
        try:
            self.main_frame = MainFrame(
                master,
                load_profile_settings_cb=self.load_profile_settings,
                browse_folder_cb=self.browse_folder,
                update_svn_status_cb=self.update_svn_status_indicator,
                prepare_svn_for_git_cb=self.prepare_svn_for_git,
                create_git_bundle_cb=self.create_git_bundle,
                fetch_from_git_bundle_cb=self.fetch_from_git_bundle,
                config_manager_instance=self.config_manager,
                profile_sections_list=self.config_manager.get_profile_sections(),
                add_profile_cb=self.add_profile,
                remove_profile_cb=self.remove_profile,
                manual_backup_cb=self.manual_backup,  # Connect button to method
            )
        except Exception as e:
            self.logger.critical(f"Failed to initialize MainFrame GUI: {e}", exc_info=True)
            self.show_fatal_error(f"Failed to create the main application window.\n{e}\nApplication cannot continue.")
            master.destroy()
            return

        # --- Enhanced Logger Setup ---
        self.logger = setup_logger(self.main_frame.log_text)
        self.config_manager.logger = self.logger  # Ensure config manager uses the enhanced logger

        # --- Git Commands Handler ---
        self.git_commands = GitCommands(self.logger)

        # --- Initial Application State ---
        self.logger.info("Application initializing...")
        initial_profile = self.main_frame.profile_var.get()
        if initial_profile:
            self.logger.debug(f"Initial profile selected: '{initial_profile}'. Loading settings...")
            # The trace on profile_var in MainFrame should trigger load_profile_settings automatically.
            # NOTE(review): if MainFrame sets profile_var inside its own constructor, that trace fires
            # before self.main_frame is assigned here and the load is skipped - confirm against gui.py.
        else:
            self.logger.warning("No profile selected on startup. Disabling action buttons.")
            self._clear_and_disable_fields()

        self.logger.info("Application started successfully.")

    def on_closing(self):
        """Handles the event when the user tries to close the window."""
        self.logger.info("Close button clicked. Preparing to exit.")
        # Add checks for unsaved changes or running ops if necessary
        self.logger.info("Application closing.")
        self.master.destroy()

    # --- Small internal helpers ---

    @staticmethod
    def _set_entry_text(entry_widget, text):
        """Replaces the whole content of a Tkinter Entry widget with `text`."""
        entry_widget.delete(0, tk.END)
        entry_widget.insert(0, text)

    def _remove_partial_backup(self, backup_path):
        """Best-effort removal of an incomplete backup archive; never raises."""
        try:
            if os.path.exists(backup_path):
                os.remove(backup_path)
        except OSError as cleanup_error:
            # Cleanup must not mask the error that made the backup fail.
            self.logger.warning(f"Could not remove incomplete backup file '{backup_path}': {cleanup_error}")

    # --- Profile Management Callbacks/Methods ---

    def load_profile_settings(self, profile_name):
        """
        Loads settings for the specified profile into the GUI fields.

        Clears and disables the GUI when `profile_name` is empty or unknown
        to the configuration manager.

        Args:
            profile_name (str): Name of the profile (configuration section) to load.
        """
        self.logger.info(f"Loading settings for profile: '{profile_name}'")

        if not profile_name:
            self.logger.warning("Attempted to load settings with no profile selected.")
            self._clear_and_disable_fields()
            return

        if profile_name not in self.config_manager.get_profile_sections():
            self.logger.error(f"Profile '{profile_name}' not found in configuration. Cannot load settings.")
            self.main_frame.show_error("Profile Error", f"Profile '{profile_name}' not found.")
            self._clear_and_disable_fields()
            return

        get_option = self.config_manager.get_profile_option

        # Load Repository settings
        svn_path = get_option(profile_name, "svn_working_copy_path", fallback="")
        usb_path = get_option(profile_name, "usb_drive_path", fallback="")
        bundle_name = get_option(profile_name, "bundle_name", fallback="")
        bundle_updated_name = get_option(profile_name, "bundle_name_updated", fallback="")
        autocommit_str = get_option(profile_name, "autocommit", fallback="False")
        commit_message = get_option(profile_name, "commit_message", fallback="")  # Load commit message

        # Load Backup settings
        autobackup_str = get_option(profile_name, "autobackup", fallback="False")
        backup_dir = get_option(profile_name, "backup_dir", fallback=DEFAULT_BACKUP_DIR)
        exclude_extensions = get_option(profile_name, "backup_exclude_extensions", fallback=".log,.tmp")  # Load exclude extensions

        # Update GUI Elements
        if not hasattr(self, 'main_frame'):
            self.logger.error("Main frame not available, cannot load settings into GUI.")
            return

        frame = self.main_frame

        # Repository Frame
        self._set_entry_text(frame.svn_path_entry, svn_path)
        self._set_entry_text(frame.usb_path_entry, usb_path)
        self._set_entry_text(frame.bundle_name_entry, bundle_name)
        self._set_entry_text(frame.bundle_updated_name_entry, bundle_updated_name)
        # Booleans are stored as text; only the string "true" (any case) enables the option.
        frame.autocommit_var.set(autocommit_str.lower() == "true")
        frame.commit_message_var.set(commit_message)  # Update commit message entry

        # Backup Frame
        frame.autobackup_var.set(autobackup_str.lower() == "true")
        frame.backup_dir_var.set(backup_dir)
        frame.backup_exclude_extensions_var.set(exclude_extensions)  # Update exclude extensions entry
        frame.toggle_backup_dir()  # Update backup dir entry state

        # Update SVN status indicator based on the loaded path
        self.update_svn_status_indicator(svn_path)

        # Enable function buttons now that a valid profile is loaded
        self._enable_function_buttons()
        self.logger.info(f"Settings loaded successfully for profile '{profile_name}'.")

    def save_profile_settings(self):
        """
        Saves the current GUI field values to the currently selected profile.

        Returns:
            bool: True when the settings were stored and the configuration
            saved, False when no profile is selected or saving failed
            (in which case an error dialog is shown).
        """
        profile = self.main_frame.profile_var.get()
        if not profile:
            self.logger.warning("No profile selected. Cannot save settings.")
            return False

        self.logger.info(f"Saving settings for profile: '{profile}'")
        try:
            frame = self.main_frame
            settings = (
                # Repository settings
                ("svn_working_copy_path", frame.svn_path_entry.get()),
                ("usb_drive_path", frame.usb_path_entry.get()),
                ("bundle_name", frame.bundle_name_entry.get()),
                ("bundle_name_updated", frame.bundle_updated_name_entry.get()),
                ("autocommit", str(frame.autocommit_var.get())),
                ("commit_message", frame.commit_message_var.get()),
                # Backup settings
                ("autobackup", str(frame.autobackup_var.get())),
                ("backup_dir", frame.backup_dir_var.get()),
                ("backup_exclude_extensions", frame.backup_exclude_extensions_var.get()),
            )
            for key, value in settings:
                self.config_manager.set_profile_option(profile, key, value)

            # Persist changes to the configuration file
            self.config_manager.save_config()

            self.logger.info(f"Profile settings for '{profile}' saved successfully.")
            return True  # Indicate success

        except Exception as e:
            self.logger.error(f"Error saving settings for profile '{profile}': {e}", exc_info=True)
            self.main_frame.show_error("Save Error", f"Failed to save settings for profile '{profile}'.\n{e}")
            return False  # Indicate failure

    def add_profile(self):
        """
        Handles adding a new profile.

        Asks the user for a name, rejects empty or already existing names,
        writes the default options for the new profile, saves the
        configuration and selects the new profile in the dropdown.
        """
        self.logger.debug("'Add Profile' button clicked.")
        new_profile_name = self.main_frame.ask_new_profile_name()

        if not new_profile_name:
            self.logger.info("Profile addition cancelled by user.")
            return

        new_profile_name = new_profile_name.strip()
        if not new_profile_name:
            self.logger.warning("Attempted to add profile with empty name.")
            self.main_frame.show_error("Error", "Profile name cannot be empty.")
            return

        if new_profile_name in self.config_manager.get_profile_sections():
            self.logger.warning(f"Attempted to add existing profile name: '{new_profile_name}'")
            self.main_frame.show_error("Error", f"Profile name '{new_profile_name}' already exists.")
            return

        self.logger.info(f"Adding new profile: '{new_profile_name}'")
        try:
            # Work on a copy: the manager may hand out a shared defaults dict,
            # and the overrides below must not leak into it.
            defaults = dict(self.config_manager._get_expected_keys_with_defaults())
            # Override specific defaults for a new profile
            defaults["bundle_name"] = f"{new_profile_name}_repo.bundle"
            defaults["bundle_name_updated"] = f"{new_profile_name}_update.bundle"
            defaults["svn_working_copy_path"] = ""  # Start with empty paths
            defaults["usb_drive_path"] = ""

            for key, value in defaults.items():
                self.config_manager.set_profile_option(new_profile_name, key, value)

            self.config_manager.save_config()

            # Update GUI dropdown
            updated_sections = self.config_manager.get_profile_sections()
            self.main_frame.update_profile_dropdown(updated_sections)
            self.main_frame.profile_var.set(new_profile_name)  # Select the new profile
            # Loading settings triggered by trace

            self.logger.info(f"Profile '{new_profile_name}' added successfully.")

        except Exception as e:
            self.logger.error(f"Error adding profile '{new_profile_name}': {e}", exc_info=True)
            self.main_frame.show_error("Error", f"Failed to add profile '{new_profile_name}'.\n{e}")

    def remove_profile(self):
        """
        Handles removing the currently selected profile.

        The default profile cannot be removed, and the user must confirm
        the removal before the configuration is changed.
        """
        self.logger.debug("'Remove Profile' button clicked.")
        profile_to_remove = self.main_frame.profile_var.get()

        if not profile_to_remove:
            self.logger.warning("Attempted to remove profile when none is selected.")
            self.main_frame.show_error("Error", "No profile selected to remove.")
            return

        if profile_to_remove == DEFAULT_PROFILE:
            self.logger.warning("Attempted to remove the default profile.")
            self.main_frame.show_error("Error", f"Cannot remove the '{DEFAULT_PROFILE}' profile.")
            return

        confirmed = self.main_frame.ask_yes_no(
            "Remove Profile",
            f"Are you sure you want to permanently remove profile '{profile_to_remove}'?",
        )
        if not confirmed:
            self.logger.info("Profile removal cancelled by user.")
            return

        self.logger.info(f"Attempting to remove profile: '{profile_to_remove}'")
        try:
            success = self.config_manager.remove_profile_section(profile_to_remove)
            if success:
                self.config_manager.save_config()
                self.logger.info(f"Profile '{profile_to_remove}' removed successfully.")
                updated_sections = self.config_manager.get_profile_sections()
                self.main_frame.update_profile_dropdown(updated_sections)
            else:
                self.main_frame.show_error("Error", f"Failed to remove profile '{profile_to_remove}'. Check logs.")

        except Exception as e:
            self.logger.error(f"Unexpected error removing profile '{profile_to_remove}': {e}", exc_info=True)
            self.main_frame.show_error("Error", f"An unexpected error occurred while removing profile '{profile_to_remove}'.\n{e}")

    # --- GUI Interaction Callbacks ---

    def browse_folder(self, entry_widget):
        """
        Opens a folder selection dialog and updates the specified Tkinter Entry widget.

        The dialog starts in the directory currently shown in the widget,
        or in the user's home directory when that is empty or not a directory.

        Args:
            entry_widget: The Entry widget that receives the chosen directory.
        """
        self.logger.debug(f"Browse folder requested for widget: {entry_widget}")
        home_dir = os.path.expanduser("~")
        initial_dir = entry_widget.get() or home_dir
        if not os.path.isdir(initial_dir):
            initial_dir = home_dir

        # `filedialog` is imported at module level (from tkinter import filedialog).
        directory = filedialog.askdirectory(
            initialdir=initial_dir,
            title="Select Directory",
            parent=self.master,
        )

        if not directory:
            self.logger.debug("Folder browse dialog cancelled.")
            return

        self.logger.debug(f"Directory selected: {directory}")
        self._set_entry_text(entry_widget, directory)
        # Only the SVN path influences the "prepared" indicator.
        if entry_widget == self.main_frame.svn_path_entry:
            self.update_svn_status_indicator(directory)

    def update_svn_status_indicator(self, svn_path):
        """
        Checks if the given SVN path is prepared for Git and updates the GUI indicator/button.

        A path counts as prepared when it is an existing directory that
        contains an entry named `.git`.

        Args:
            svn_path (str): Path to check; may be empty.
        """
        is_prepared = False
        if svn_path and os.path.isdir(svn_path):
            git_dir_path = os.path.join(svn_path, ".git")
            is_prepared = os.path.exists(git_dir_path)
            self.logger.debug(f"Checking SVN status for path '{svn_path}'. '.git' found: {is_prepared}")
        else:
            self.logger.debug(f"SVN path '{svn_path}' is invalid or empty. Status set to unprepared.")

        if hasattr(self, 'main_frame'):
            self.main_frame.update_svn_indicator(is_prepared)

    # --- Core Functionality Methods ---

    def _get_validated_dir(self, entry_widget, operation_name, empty_log_name, prompt_name, kind_label):
        """
        Reads a directory path from an Entry widget and validates it.

        Args:
            entry_widget: Widget whose text holds the path.
            operation_name (str): Prefix used in log messages.
            empty_log_name (str): Field name used in the "is empty" log message.
            prompt_name (str): Field name used in the "Please specify" dialog.
            kind_label (str): Short label ("SVN", "Bundle Target") used in the other messages.

        Returns:
            str | None: The absolute path, or None (after showing an error
            dialog) when the field is empty or not an existing directory.
        """
        path_text = entry_widget.get().strip()
        if not path_text:
            self.logger.error(f"{operation_name}: {empty_log_name} is empty.")
            self.main_frame.show_error("Input Error", f"Please specify the {prompt_name}.")
            return None

        abs_path = os.path.abspath(path_text)
        if not os.path.isdir(abs_path):
            self.logger.error(f"{operation_name}: Specified {kind_label} path is not a valid directory: {abs_path}")
            self.main_frame.show_error("Input Error", f"The specified {kind_label} path is not a valid directory:\n{abs_path}")
            return None

        self.logger.debug(f"{operation_name}: Using validated {kind_label} path: {abs_path}")
        return abs_path

    def _get_and_validate_svn_path(self, operation_name="Operation"):
        """
        Retrieves and validates the SVN path from the GUI.

        Returns:
            str | None: Absolute path of the SVN working copy, or None if invalid.
        """
        return self._get_validated_dir(
            self.main_frame.svn_path_entry,
            operation_name,
            "SVN Working Copy Path",
            "SVN Working Copy Path",
            "SVN",
        )

    def _get_and_validate_usb_path(self, operation_name="Operation"):
        """
        Retrieves and validates the USB/Bundle Target path.

        Returns:
            str | None: Absolute path of the bundle target directory, or None if invalid.
        """
        return self._get_validated_dir(
            self.main_frame.usb_path_entry,
            operation_name,
            "Bundle Target Directory path",
            "Bundle Target Directory",
            "Bundle Target",
        )

    def prepare_svn_for_git(self):
        """
        Handles the 'Prepare SVN for Git' action.

        Saves the profile, skips the work when a `.git` entry already
        exists, otherwise delegates to GitCommands.prepare_svn_for_git and
        refreshes the status indicator whatever the outcome.
        """
        self.logger.info("--- Action: Prepare SVN Repo ---")
        svn_path = self._get_and_validate_svn_path("Prepare SVN")
        if not svn_path:
            return

        # A failed save is only a warning: the action uses the GUI values directly.
        if not self.save_profile_settings():
            self.logger.warning("Prepare SVN: Could not save profile settings before proceeding.")

        git_dir_path = os.path.join(svn_path, ".git")
        if os.path.exists(git_dir_path):
            self.logger.info(f"Repository at '{svn_path}' is already prepared.")
            self.main_frame.show_info("Already Prepared", f"The repository at:\n{svn_path}\nis already prepared.")
            self.update_svn_status_indicator(svn_path)
            return

        self.logger.info(f"Executing git preparation steps for: {svn_path}")
        try:
            self.git_commands.prepare_svn_for_git(svn_path)
            self.logger.info("SVN repository prepared successfully.")
            self.main_frame.show_info("Success", f"Repository prepared successfully:\n{svn_path}")
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Error preparing SVN repository: {e}")
            self.main_frame.show_error("Preparation Error", f"Failed to prepare repository:\n{svn_path}\n\nError: {e}")
        except Exception as e:
            self.logger.exception(f"Unexpected error during SVN preparation: {e}")
            self.main_frame.show_error("Unexpected Error", f"An unexpected error occurred during preparation:\n{e}")
        # Reflect the real on-disk state after success and after failure alike.
        self.update_svn_status_indicator(svn_path)

    def create_git_bundle(self):
        """
        Handles the 'Create Bundle' action.

        Steps: validate profile, paths and bundle name (appending
        ".bundle" when missing), save the profile, run the optional
        backup and autocommit, then create the bundle. A failed backup or
        autocommit aborts the action.
        """
        self.logger.info("--- Action: Create Git Bundle ---")
        profile = self.main_frame.profile_var.get()
        if not profile:
            self.logger.error("Create Bundle: No profile selected.")
            self.main_frame.show_error("Error", "Cannot create bundle without a selected profile.")
            return

        svn_path = self._get_and_validate_svn_path("Create Bundle")
        if not svn_path:
            return
        usb_path = self._get_and_validate_usb_path("Create Bundle")
        if not usb_path:
            return

        bundle_name = self.main_frame.bundle_name_entry.get().strip()
        if not bundle_name:
            self.logger.error("Create Bundle: Bundle name cannot be empty.")
            self.main_frame.show_error("Input Error", "Please enter a name for the bundle file.")
            return
        if not bundle_name.lower().endswith(".bundle"):
            self.logger.warning(f"Bundle name '{bundle_name}' does not end with '.bundle'. Adding extension.")
            bundle_name += ".bundle"
            self._set_entry_text(self.main_frame.bundle_name_entry, bundle_name)  # Update GUI

        bundle_full_path = os.path.join(usb_path, bundle_name)
        self.logger.debug(f"Target bundle file path: {bundle_full_path}")

        if not self.save_profile_settings():
            self.logger.warning("Create Bundle: Could not save profile settings before proceeding.")

        # --- Backup ---
        if self.main_frame.autobackup_var.get():
            self.logger.info("Autobackup enabled. Starting backup...")
            if not self.create_backup(svn_path, profile):  # Pass profile name for exclusions
                self.logger.error("Bundle creation aborted due to backup failure.")
                return

        # --- Autocommit ---
        if self.main_frame.autocommit_var.get():
            self.logger.info("Autocommit enabled. Checking for changes and committing if necessary...")
            try:
                if self.git_commands.git_status_has_changes(svn_path):
                    self.logger.info("Changes detected, performing autocommit...")
                    custom_message = self.main_frame.commit_message_var.get().strip()
                    # Fall back to a message naming the profile when none was entered.
                    commit_message_to_use = custom_message if custom_message else f"Autocommit profile '{profile}'"
                    self.logger.debug(f"Using commit message: '{commit_message_to_use}'")

                    commit_made = self.git_commands.git_commit(svn_path, message=commit_message_to_use)
                    if commit_made:
                        self.logger.info("Autocommit successful.")
                    else:
                        self.logger.warning("Status reported changes, but autocommit resulted in 'nothing to commit'.")
                else:
                    self.logger.info("No changes detected, skipping autocommit.")
            except (GitCommandError, ValueError) as e:
                self.logger.error(f"Error during autocommit process: {e}")
                self.main_frame.show_error("Autocommit Error", f"Failed to check status or commit changes:\n{e}")
                return
            except Exception as e:
                self.logger.exception(f"Unexpected error during autocommit: {e}")
                self.main_frame.show_error("Unexpected Error", f"An unexpected error occurred during autocommit:\n{e}")
                return

        # --- Create Bundle ---
        self.logger.info(f"Creating Git bundle at: {bundle_full_path}")
        try:
            self.git_commands.create_git_bundle(svn_path, bundle_full_path)
            # Check if file exists and is not empty
            if os.path.exists(bundle_full_path) and os.path.getsize(bundle_full_path) > 0:
                self.logger.info("Git bundle created successfully.")
                self.main_frame.show_info("Success", f"Git bundle created successfully:\n{bundle_full_path}")
            else:
                self.logger.warning("Bundle file was not created or is empty (likely no new commits).")
                self.main_frame.show_warning("Bundle Not Created", "Bundle file was not created or is empty.\nThis usually means the repository had no new commits.")
                # Attempt to remove empty file if it exists (best effort)
                if os.path.exists(bundle_full_path):
                    try:
                        os.remove(bundle_full_path)
                    except OSError as remove_error:
                        self.logger.debug(f"Could not remove empty bundle file '{bundle_full_path}': {remove_error}")

        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Error creating Git bundle: {e}")
            self.main_frame.show_error("Bundle Creation Error", f"Failed to create Git bundle:\n{e}")
        except Exception as e:
            self.logger.exception(f"Unexpected error during bundle creation: {e}")
            self.main_frame.show_error("Unexpected Error", f"An unexpected error occurred during bundle creation:\n{e}")

    def fetch_from_git_bundle(self):
        """
        Handles the 'Fetch from Bundle' action.

        Validates profile, paths and the bundle file, saves the profile,
        runs the optional backup (a failed backup aborts the action) and
        delegates to GitCommands.fetch_from_git_bundle. Errors whose text
        contains "merge conflict" get a dedicated dialog with manual
        resolution instructions.
        """
        self.logger.info("--- Action: Fetch from Git Bundle ---")
        profile = self.main_frame.profile_var.get()
        if not profile:
            self.logger.error("Fetch Bundle: No profile selected.")
            self.main_frame.show_error("Error", "Cannot fetch from bundle without a selected profile.")
            return

        svn_path = self._get_and_validate_svn_path("Fetch Bundle")
        if not svn_path:
            return
        usb_path = self._get_and_validate_usb_path("Fetch Bundle")
        if not usb_path:
            return

        bundle_updated_name = self.main_frame.bundle_updated_name_entry.get().strip()
        if not bundle_updated_name:
            self.logger.error("Fetch Bundle: Fetch bundle name cannot be empty.")
            self.main_frame.show_error("Input Error", "Please enter the name of the bundle file to fetch from.")
            return

        bundle_full_path = os.path.join(usb_path, bundle_updated_name)
        self.logger.debug(f"Source bundle file path: {bundle_full_path}")

        if not os.path.isfile(bundle_full_path):
            self.logger.error(f"Fetch Bundle: Bundle file does not exist or is not a file: '{bundle_full_path}'")
            self.main_frame.show_error("File Not Found", f"The specified bundle file does not exist:\n{bundle_full_path}")
            return

        if not self.save_profile_settings():
            self.logger.warning("Fetch Bundle: Could not save profile settings before proceeding.")

        # --- Backup ---
        if self.main_frame.autobackup_var.get():
            self.logger.info("Autobackup enabled. Starting backup...")
            if not self.create_backup(svn_path, profile):  # Pass profile name
                self.logger.error("Fetch/merge aborted due to backup failure.")
                return

        # --- Fetch and Merge ---
        self.logger.info(f"Fetching/merging into '{svn_path}' from bundle: {bundle_full_path}")
        try:
            self.git_commands.fetch_from_git_bundle(svn_path, bundle_full_path)
            self.logger.info("Changes fetched and potentially merged successfully.")
            self.main_frame.show_info("Success", f"Changes fetched successfully into:\n{svn_path}\nfrom bundle:\n{bundle_full_path}")

        except GitCommandError as e:
            self.logger.error(f"Error fetching/merging from Git bundle: {e}")
            if "merge conflict" in str(e).lower():
                self.main_frame.show_error(
                    "Merge Conflict",
                    "Merge conflict occurred while applying changes from the bundle.\n\n"
                    f"Please resolve the conflicts manually in the repository:\n{svn_path}\n\n"
                    "After resolving, run 'git add .' and 'git commit' in that directory.\n\n"
                    f"Original Error: {e}"
                )
            else:
                self.main_frame.show_error("Fetch/Merge Error", f"Failed to fetch or merge from bundle:\n{e}")
        except ValueError as e:
            self.logger.error(f"Validation error during fetch/merge: {e}")
            self.main_frame.show_error("Input Error", f"Invalid input during fetch/merge:\n{e}")
        except Exception as e:
            self.logger.exception(f"Unexpected error during fetch/merge: {e}")
            self.main_frame.show_error("Unexpected Error", f"An unexpected error occurred during fetch/merge:\n{e}")

    def _parse_exclusions(self, profile_name):
        """
        Parses exclusion string from config into sets of extensions and dirs.

        The profile option "backup_exclude_extensions" is a comma-separated
        list; entries are trimmed, lower-cased, given a leading dot when
        missing, and empty entries are ignored.

        Args:
            profile_name (str): Profile whose exclusion option is read.

        Returns:
            tuple[set[str], set[str]]: (excluded extensions, excluded
            directory names). The directory set is always {".git", ".svn"}.
        """
        exclude_str = self.config_manager.get_profile_option(profile_name, "backup_exclude_extensions", fallback="")
        excluded_extensions = set()
        if exclude_str:
            for raw_ext in exclude_str.split(','):
                clean_ext = raw_ext.strip().lower()
                if not clean_ext:
                    continue
                if not clean_ext.startswith('.'):
                    clean_ext = '.' + clean_ext
                excluded_extensions.add(clean_ext)
        # Always exclude these directories by default
        excluded_dirs = {".git", ".svn"}
        self.logger.debug(f"Parsed exclusions - Extensions: {excluded_extensions}, Dirs: {excluded_dirs}")
        return excluded_extensions, excluded_dirs

    def create_backup(self, source_repo_path, profile_name):
        """
        Creates a timestamped ZIP backup of the source repository directory,
        respecting exclusions defined in the profile. (ZIP Implementation)

        The archive is written to the backup directory taken from the GUI
        (created when missing) and is named
        "<YYYYmmdd_HHMMSS>_backup_<profile>.zip". Directories named in the
        exclusion set are not descended into; files are skipped when their
        extension or name is excluded. The archive being written is never
        added to itself, which matters when the backup directory lies
        inside the source tree. A file that cannot be added is logged and
        skipped; it does not fail the backup.

        Args:
            source_repo_path (str): The absolute path to the repository to back up.
            profile_name (str): The name of the current profile (for filename and exclusions).

        Returns:
            bool: True if backup was successful, False otherwise.
        """
        self.logger.info(f"Creating ZIP backup for profile '{profile_name}' from '{source_repo_path}'")

        backup_base_dir = self.main_frame.backup_dir_var.get().strip()
        if not backup_base_dir:
            self.logger.error("Backup failed: Backup directory not specified.")
            self.main_frame.show_error("Backup Error", "Backup directory is not specified.")
            return False
        if not os.path.isdir(backup_base_dir):
            self.logger.info(f"Backup directory '{backup_base_dir}' creating...")
            try:
                os.makedirs(backup_base_dir, exist_ok=True)
            except OSError as e:
                self.logger.error(f"Could not create backup directory '{backup_base_dir}': {e}")
                self.main_frame.show_error("Backup Error", f"Could not create backup directory:\n{backup_base_dir}\nError: {e}")
                return False

        # Parse exclusions
        excluded_extensions, excluded_dirs = self._parse_exclusions(profile_name)

        # Construct Backup Filename (profile name reduced to filename-safe characters)
        now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_profile_name = "".join(c for c in profile_name if c.isalnum() or c in ('_', '-')) or "profile"
        backup_filename = f"{now}_backup_{safe_profile_name}.zip"
        backup_full_path = os.path.join(backup_base_dir, backup_filename)
        backup_abs_path = os.path.abspath(backup_full_path)
        self.logger.info(f"Target backup ZIP file: {backup_full_path}")

        # Create ZIP Archive
        files_added_count = 0
        files_excluded_count = 0
        files_failed_count = 0
        dirs_excluded_count = 0
        try:
            with zipfile.ZipFile(backup_full_path, 'w', compression=zipfile.ZIP_DEFLATED, allowZip64=True) as zipf:
                for root, dirs, files in os.walk(source_repo_path, topdown=True):
                    # --- Directory Exclusion ---
                    # Pruning `dirs` in place stops os.walk from descending into them.
                    # Only names *below* the chosen source root are matched, so a source
                    # path that itself has a ".git"/".svn" ancestor is still backed up.
                    kept_dirs = [d for d in dirs if d not in excluded_dirs]
                    for excluded_dir in set(dirs) - set(kept_dirs):
                        dirs_excluded_count += 1
                        self.logger.debug(f"Excluding directory: {os.path.join(root, excluded_dir)}")
                    dirs[:] = kept_dirs

                    # --- File Exclusion and Addition ---
                    for filename in files:
                        file_full_path = os.path.join(root, filename)

                        # Never add the archive currently being written to itself.
                        if os.path.abspath(file_full_path) == backup_abs_path:
                            self.logger.debug(f"Skipping backup archive itself: {file_full_path}")
                            continue

                        _, ext = os.path.splitext(filename)
                        # `filename in excluded_dirs` covers a plain file named like an excluded dir.
                        if ext.lower() in excluded_extensions or filename in excluded_dirs:
                            self.logger.debug(f"Excluding file: {file_full_path}")
                            files_excluded_count += 1
                            continue  # Skip this file

                        archive_name = os.path.relpath(file_full_path, source_repo_path)
                        try:
                            zipf.write(file_full_path, arcname=archive_name)
                        except Exception as write_e:
                            # One unreadable file should not discard the whole backup.
                            files_failed_count += 1
                            self.logger.error(f"Error writing file '{file_full_path}' to ZIP: {write_e}", exc_info=True)
                            continue

                        files_added_count += 1
                        if files_added_count % 200 == 0:  # Log progress less frequently
                            self.logger.debug(f"Added {files_added_count} files to ZIP...")

            self.logger.info(f"Backup ZIP created: {backup_full_path}")
            self.logger.info(f"Files added: {files_added_count}, Files excluded: {files_excluded_count}, Dirs excluded: {dirs_excluded_count}")
            if files_failed_count:
                self.logger.warning(f"{files_failed_count} file(s) could not be added to the backup ZIP.")
            return True

        except OSError as e:
            self.logger.error(f"OS error creating backup ZIP: {e}", exc_info=True)
            self.main_frame.show_error("Backup Error", f"Error creating backup ZIP:\n{e}")
            self._remove_partial_backup(backup_full_path)
            return False
        except zipfile.BadZipFile as e:
            self.logger.error(f"Error related to ZIP file format during backup: {e}", exc_info=True)
            self.main_frame.show_error("Backup Error", f"ZIP file format error during backup:\n{e}")
            self._remove_partial_backup(backup_full_path)
            return False
        except Exception as e:
            self.logger.exception(f"Unexpected error during ZIP backup creation: {e}")
            self.main_frame.show_error("Backup Error", f"An unexpected error occurred during ZIP backup:\n{e}")
            self._remove_partial_backup(backup_full_path)
            return False

    def manual_backup(self):
        """
        Handles the 'Backup Now' button click.

        Saves the current settings first; when saving fails the user is
        asked whether to continue with the previously saved settings.
        """
        self.logger.info("--- Action: Manual Backup Now ---")
        profile = self.main_frame.profile_var.get()
        if not profile:
            self.logger.warning("Manual Backup: No profile selected.")
            self.main_frame.show_error("Backup Error", "No profile selected to perform backup.")
            return

        svn_path = self._get_and_validate_svn_path(f"Manual Backup ({profile})")
        if not svn_path:
            return

        self.logger.info("Saving current settings before manual backup...")
        if not self.save_profile_settings():
            self.logger.error("Manual Backup: Could not save profile settings. Backup may use outdated settings.")
            if not self.main_frame.ask_yes_no("Save Error", "Could not save current settings (e.g., exclusions).\nContinue backup with previously saved settings?"):
                self.logger.warning("Manual backup aborted by user due to save failure.")
                return

        self.logger.info(f"Starting manual backup for profile '{profile}'...")
        # Call the ZIP backup method
        success = self.create_backup(svn_path, profile)

        if success:
            self.main_frame.show_info("Backup Complete", f"Manual ZIP backup for profile '{profile}' completed successfully.")
        else:
            self.logger.error(f"Manual backup failed for profile '{profile}'.")
            # Error message already shown by create_backup

    # --- GUI State Utilities ---

    def _clear_and_disable_fields(self):
        """Clears repository config fields and disables action buttons."""
        if not hasattr(self, 'main_frame'):
            return
        frame = self.main_frame
        for entry in (
            frame.svn_path_entry,
            frame.usb_path_entry,
            frame.bundle_name_entry,
            frame.bundle_updated_name_entry,
        ):
            entry.delete(0, tk.END)
        frame.autocommit_var.set(False)
        frame.commit_message_var.set("")  # Clear commit message
        # Backup fields are intentionally left untouched.
        # Reset SVN indicator
        self.update_svn_status_indicator("")
        # Disable action buttons
        self._disable_function_buttons()
        self.logger.debug("GUI fields cleared/reset and action buttons disabled.")

    def _disable_function_buttons(self):
        """Disables the main action buttons."""
        if not hasattr(self, 'main_frame'):
            return
        button_names = (
            'prepare_svn_button',
            'create_bundle_button',
            'fetch_bundle_button',
            'manual_backup_button',  # Also disable manual backup
        )
        for name in button_names:
            # Buttons that the frame does not define are silently skipped.
            button = getattr(self.main_frame, name, None)
            if button:
                button.config(state=tk.DISABLED)
        self.logger.debug("Function buttons disabled.")

    def _enable_function_buttons(self):
        """Enables action buttons based on profile/path validity. Prepare button state depends on repo status."""
        if not hasattr(self, 'main_frame'):
            return
        # Enable Create, Fetch, and Manual Backup once a profile is loaded
        general_state = tk.NORMAL
        for name in ('create_bundle_button', 'fetch_bundle_button', 'manual_backup_button'):
            button = getattr(self.main_frame, name, None)
            if button:
                button.config(state=general_state)

        # Prepare button state is handled separately by update_svn_indicator
        self.update_svn_status_indicator(self.main_frame.svn_path_entry.get())
        self.logger.debug("Create/Fetch/Backup buttons enabled. Prepare button state updated.")

    def show_fatal_error(self, message):
        """
        Shows a fatal error message before the app potentially exits.

        Args:
            message (str): Text to display.
        """
        try:  # Try to show graphical error
            messagebox.showerror("Fatal Error", message)
        except tk.TclError:  # Fallback if GUI is not ready
            print(f"FATAL ERROR: {message}")


# --- Application Entry Point ---
def main():
    """
    Creates the Tkinter root window, builds the application and runs the event loop.

    The event loop is only entered when the application object ended up
    with a `main_frame` attribute; otherwise a message is printed and the
    function returns. Any exception raised while building or running the
    application is logged and reported in a message box, falling back to
    `print` if the message box itself fails.
    """
    root = tk.Tk()
    root.minsize(650, 550)  # Adjust min size slightly for new fields
    try:
        app = GitSvnSyncApp(root)
        # The constructor returns early on fatal setup errors, without a main_frame.
        gui_ready = hasattr(app, 'main_frame')
        if not gui_ready:
            print("Application initialization failed, exiting.")
        else:
            root.mainloop()
    except Exception as e:
        logging.exception("Fatal error during application startup or main loop.")
        # Prefer a dialog; fall back to the console when Tk can no longer show one.
        try:
            messagebox.showerror("Fatal Error", f"Application failed unexpectedly:\n{e}")
        except Exception:
            print(f"FATAL ERROR: Application failed unexpectedly:\n{e}")


# Start the GUI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()