# --- FILE: GitUtility.py --- import os import datetime import tkinter as tk from tkinter import messagebox from tkinter import filedialog # Import moduli logging e utilità import logging # Mantenuto per i livelli import re import threading import queue import traceback import sys from typing import Callable from typing import List from typing import Dict from typing import Any from typing import Tuple from typing import Optional # PEP8 compliance for Optional type hints # Import application modules try: # Configuration management from config_manager import ConfigManager from config_manager import DEFAULT_PROFILE from config_manager import DEFAULT_BACKUP_DIR from config_manager import DEFAULT_REMOTE_NAME # Action handlers and backend logic from action_handler import ActionHandler from backup_handler import BackupHandler from git_commands import GitCommands from git_commands import GitCommandError from remote_actions import RemoteActionHandler # Logging system import log_handler from logger_config import setup_file_logging # For file logger setup # GUI components from gui import MainFrame from gui import GitignoreEditorWindow from gui import CreateTagDialog from gui import CreateBranchDialog from gui import CloneFromRemoteDialog from diff_viewer import DiffViewerWindow from diff_summary_viewer import DiffSummaryWindow # Asynchronous operations import async_workers from async_result_handler import AsyncResultHandler # For processing results except ImportError as e: # Critical error handling if imports fail critical_msg: str = f"Critical Error: Failed to import required application modules: {e}" print(f"FATAL IMPORT ERROR: {critical_msg}", file=sys.stderr) # Attempt to show a graphical error message as a fallback try: root_fallback = tk.Tk() root_fallback.withdraw() # Hide the empty root window messagebox.showerror( "Startup Error", f"Failed to load components:\n{e}\n\nApplication cannot start.", ) root_fallback.destroy() except Exception: # Ignore errors in the graphical fallback itself pass # Exit the application forcefully if core components are missing sys.exit(1) class GitSvnSyncApp: """ Main application controller class for the Git Sync Tool. Orchestrates GUI and backend actions using asynchronous operations and a centralized logging queue. Initializes and connects components. """ # Constants for polling intervals (in milliseconds) LOG_QUEUE_CHECK_INTERVAL_MS: int = 100 ASYNC_QUEUE_CHECK_INTERVAL_MS: int = 100 def __init__(self, master: tk.Tk): """ Initializes the application components and GUI. Args: master (tk.Tk): The main Tkinter root window. """ self.master: tk.Tk = master master.title("Git Sync Tool (Bundle & Remote Manager)") # Define behavior on window close button press master.protocol("WM_DELETE_WINDOW", self.on_closing) # Initial logging (console) print("Initializing GitSvnSyncApp...") log_handler.log_debug("GitSvnSyncApp initialization started.", func_name="__init__") # --- Initialize Core Backend Components --- try: # Instantiate core handlers and managers self.config_manager: ConfigManager = ConfigManager(None) self.git_commands: GitCommands = GitCommands(None) self.backup_handler: BackupHandler = BackupHandler(None) self.action_handler: ActionHandler = ActionHandler( self.git_commands, self.backup_handler ) self.remote_action_handler: RemoteActionHandler = RemoteActionHandler( self.git_commands ) # Internal state variables self.remote_auth_status: str = "unknown" self.current_local_branch: Optional[str] = None # Use Optional print("Core components initialized.") log_handler.log_debug("Core components initialized successfully.", func_name="__init__") except Exception as e: # Handle critical errors during backend initialization print(f"FATAL: Failed to initialize core components: {e}", file=sys.stderr) log_handler.log_critical(f"Failed to initialize core components: {e}", func_name="__init__") self.show_fatal_error( f"Initialization Error:\n{e}\n\nApplication cannot start." ) # Attempt to close gracefully if backend fails self.master.after(10, self.on_closing) return # Stop initialization # --- Initialize Graphical User Interface (GUI) --- try: print("Creating MainFrame GUI...") log_handler.log_debug("Creating MainFrame GUI.", func_name="__init__") # Instantiate the main GUI frame, passing necessary callbacks and data self.main_frame: MainFrame = MainFrame( master=master, # Callbacks mapping GUI actions to controller methods load_profile_settings_cb=self.load_profile_settings, save_profile_cb=self.save_profile_settings, add_profile_cb=self.add_profile, remove_profile_cb=self.remove_profile, browse_folder_cb=self.browse_folder, update_svn_status_cb=self.update_svn_status_indicator, prepare_svn_for_git_cb=self.prepare_svn_for_git, create_git_bundle_cb=self.create_git_bundle, fetch_from_git_bundle_cb=self.fetch_from_git_bundle, manual_backup_cb=self.manual_backup, open_gitignore_editor_cb=self.open_gitignore_editor, commit_changes_cb=self.commit_changes, refresh_changed_files_cb=self.refresh_changed_files_list, open_diff_viewer_cb=self.open_diff_viewer, add_selected_file_cb=self.add_selected_file, refresh_tags_cb=self.refresh_tag_list, create_tag_cb=self.create_tag, checkout_tag_cb=self.checkout_tag, refresh_branches_cb=self.refresh_branch_list, # For local branches create_branch_cb=self.create_branch, checkout_branch_cb=self.checkout_branch, # For local branches refresh_history_cb=self.refresh_commit_history, apply_remote_config_cb=self.apply_remote_config, check_connection_auth_cb=self.check_connection_auth, fetch_remote_cb=self.fetch_remote, pull_remote_cb=self.pull_remote, push_remote_cb=self.push_remote, push_tags_remote_cb=self.push_tags_remote, refresh_remote_status_cb=self.refresh_remote_status, clone_remote_repo_cb=self.clone_remote_repo, refresh_remote_branches_cb=self.refresh_remote_branches, # For remote branches checkout_remote_branch_cb=self.checkout_remote_branch_as_local, delete_local_branch_cb=self.delete_local_branch, merge_local_branch_cb=self.merge_local_branch, compare_branch_with_current_cb=self.compare_branch_with_current, # Instance and initial data for the GUI config_manager_instance=self.config_manager, profile_sections_list=self.config_manager.get_profile_sections(), ) print("MainFrame GUI created.") log_handler.log_debug("MainFrame GUI created successfully.", func_name="__init__") except Exception as e: # Handle critical errors during GUI initialization print(f"FATAL: Failed to initialize MainFrame GUI: {e}", file=sys.stderr) log_handler.log_exception("Failed to initialize MainFrame GUI.", func_name="__init__") self.show_fatal_error(f"GUI Initialization Error:\n{e}\n\nApplication cannot start.") self.master.after(10, self.on_closing) # Attempt to close return # Stop initialization # --- Setup Logging Processing --- self._setup_logging_processing() # --- Log Application Start --- log_handler.log_info("Git Sync Tool application starting.", func_name="__init__") # --- Initial Profile Load --- # Loads the initially selected profile (default or first) into the GUI self._perform_initial_load() log_handler.log_info("Git Sync Tool initialization complete.", func_name="__init__") # --- Static Method Helper --- @staticmethod def _extract_path_from_status_line(file_status_line: str) -> Optional[str]: """ Extracts a clean relative path from a git status line. Handles various statuses including renames ('R old -> new'). Returns the 'new' path for renames/copies, or the main path otherwise. Returns None if parsing fails. """ func_name: str = "_extract_path_from_status_line" try: # Clean NUL characters and leading/trailing whitespace line: str = file_status_line.strip('\x00').strip() # Basic validation for minimum length (e.g., " M a") if not line or len(line) < 3: log_handler.log_warning( f"Invalid/short status line received: '{file_status_line}'", func_name=func_name ) return None path_part: str = "" # Handle rename/copy format: "XY old -> new" if "->" in line: # Take the part after the last "->" path_part = line.split("->")[-1].strip() else: # Handle other statuses: "XY path" or "?? path" # Regex captures 1 or 2 status chars, space, then the rest match = re.match(r"^[ MARCUD?!]{1,2}\s+(.*)", line) if match: # Group 1 contains the path part path_part = match.group(1) else: # Fallback if regex doesn't match (unusual format?) log_handler.log_warning( f"Could not match expected status line format: '{line}'", func_name=func_name ) # Simple fallback: take everything after the first space first_space_index: int = line.find(' ') if first_space_index != -1: path_part = line[first_space_index:].strip() else: # No space found, cannot extract path return None # If no path part was extracted, return None if not path_part: return None # Remove potential surrounding quotes relative_path: str if len(path_part) >= 2 and path_part.startswith('"') and path_part.endswith('"'): relative_path = path_part[1:-1] else: relative_path = path_part # Attempt to decode octal escape sequences (common with core.quotepath=true) try: # Only attempt if escape characters seem present if '\\' in relative_path: # Decode assuming UTF-8 escaped bytes # Note: This encoding/decoding dance is necessary for Python's unicode_escape decoded_bytes: bytes = bytes(relative_path, "utf-8").decode("unicode_escape").encode("latin-1") decoded_path: str = decoded_bytes.decode("utf-8") # Use decoded path only if it changed and looks valid # (This is heuristic, might need refinement based on actual Git output) if decoded_path != relative_path and os.path.normpath(decoded_path): log_handler.log_debug( f"Path '{relative_path}' decoded to '{decoded_path}'", func_name=func_name ) relative_path = decoded_path except Exception as decode_err: # Log warning but proceed with the undecoded path if decoding fails log_handler.log_warning( f"Could not decode potential escape sequences in path '{relative_path}': {decode_err}", func_name=func_name ) # Final check if path is empty after processing if not relative_path: return None log_handler.log_debug( f"Cleaned path from status line: '{relative_path}'", func_name=func_name ) return relative_path except Exception as e: # Log any unexpected error during path extraction log_handler.log_exception( f"Error cleaning path from status line '{file_status_line}': {e}", func_name=func_name ) return None # Return None on error # --- Logging Setup and Processing --- def _setup_logging_processing(self): """ Configures file logging and starts the log queue processing loop to update the GUI log area. """ func_name: str = "_setup_logging_processing" try: # 1. Configure file logging (writes to LOG_FILE) # The level set here determines the minimum level for the file log. setup_file_logging(level=logging.DEBUG) # Log DEBUG+ to file # 2. Start polling the shared log queue to update the GUI if hasattr(self, "main_frame") and hasattr(self.main_frame, "log_text"): log_handler.log_info( "Starting log queue processing for GUI.", func_name=func_name ) # Schedule the first check of the log queue self.master.after( self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue ) else: # Log error if the GUI log widget isn't ready print( "ERROR: Cannot start log queue processing - GUI log widget not found.", file=sys.stderr ) log_handler.log_error( "Cannot start log queue processing - GUI log widget not found.", func_name=func_name ) except Exception as e: # Log error if logging setup fails print(f"ERROR during logging setup: {e}", file=sys.stderr) log_handler.log_exception( "Failed to setup logging processing.", func_name=func_name ) def _process_log_queue(self): """ Processes messages from the shared log queue, writes them to the root logger (which has the file handler), and updates the GUI log widget. """ func_name: str = "_process_log_queue" # Safely get the log widget reference log_widget: Optional[tk.scrolledtext.ScrolledText] = getattr(self.main_frame, "log_text", None) # Stop processing if the main window or log widget is destroyed if not log_widget or not log_widget.winfo_exists(): log_handler.log_warning( "Log widget not found, stopping queue processing.", func_name=func_name ) return processed_count: int = 0 # Limit processing per cycle to keep GUI responsive max_proc_per_cycle: int = 50 # Process messages currently in the queue while not log_handler.log_queue.empty(): # Pause processing if limit is reached for this cycle if processed_count >= max_proc_per_cycle: log_handler.log_debug( f"Processed {max_proc_per_cycle} log entries, pausing.", func_name=func_name ) break try: # Get log entry without blocking log_entry: dict = log_handler.log_queue.get_nowait() level: int = log_entry.get("level", logging.INFO) message: str = log_entry.get("message", "") level_name: str = log_handler.get_log_level_name(level) # 1. Write to root logger (handled by configured file handler) logging.getLogger().log(level, message) processed_count += 1 # 2. Update GUI log widget if level is appropriate (DEBUG+) if level >= logging.DEBUG: try: # Temporarily enable widget, insert text with tag, scroll, disable original_state: str = log_widget.cget("state") log_widget.config(state=tk.NORMAL) log_widget.insert(tk.END, message + "\n", (level_name,)) log_widget.see(tk.END) # Auto-scroll to the end log_widget.config(state=original_state) except tk.TclError as e_gui: # Handle specific Tkinter errors (e.g., widget destroyed) print( f"TclError updating log widget: {e_gui} - Message: {message}", file=sys.stderr ) except Exception as e_gui: # Handle other errors updating the GUI print( f"Error updating log widget: {e_gui} - Message: {message}", file=sys.stderr ) except queue.Empty: # Queue is empty, exit loop for this cycle break except Exception as e_proc: # Handle unexpected errors during queue processing print(f"Error processing log queue item: {e_proc}", file=sys.stderr) # Attempt to log the processing error itself try: logging.getLogger().error(f"Error processing log queue item: {e_proc}") except Exception: pass # Ignore errors during fallback logging # Reschedule the next check if the main window still exists if self.master.winfo_exists(): self.master.after(self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue) # --- Initial Application Load --- def _perform_initial_load(self): """ Loads the initially selected profile (default or first available) into the GUI when the application starts. """ func_name: str = "_perform_initial_load" log_handler.log_debug("Performing initial profile load.", func_name=func_name) # Ensure the main GUI frame is available if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot perform initial load: MainFrame not ready.", func_name=func_name ) return # Get the profile currently selected in the dropdown initial_profile: str = self.main_frame.profile_var.get() # If a profile is selected, load its settings if initial_profile: log_handler.log_debug( f"Loading initial profile: '{initial_profile}'", func_name=func_name ) # Call the standard method to load settings into the GUI self.load_profile_settings(initial_profile) else: # Handle the case where no profiles exist in the config file log_handler.log_warning( "No initial profile set (no profiles found?).", func_name=func_name ) # Clear GUI fields and disable actions self._clear_and_disable_fields() # Update status bar to inform the user self.main_frame.update_status_bar("No profiles found. Please add a profile.") # --- Application Closing Handler --- def on_closing(self): """ Handles the window close event (e.g., clicking the 'X' button). """ func_name: str = "on_closing" log_handler.log_info("Application closing initiated.", func_name=func_name) # Attempt to update status bar before closing (might fail if GUI is broken) if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): try: self.main_frame.update_status_bar("Exiting...") except Exception: pass # Ignore errors during shutdown status update # Destroy the main Tkinter window if self.master and self.master.winfo_exists(): self.master.destroy() log_handler.log_info("Application closed.", func_name=func_name) # Note: Daemon threads should terminate automatically upon main thread exit. # --- Profile Management Callbacks --- def load_profile_settings(self, profile_name: str): """ Loads settings for the selected profile name into the GUI fields and triggers necessary refreshes or updates based on the loaded path. """ func_name: str = "load_profile_settings" log_handler.log_info( f"Loading settings for profile: '{profile_name}'", func_name=func_name ) # Ensure main frame is available if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot load profile: Main frame not available.", func_name=func_name ) return # Update status bar to indicate loading self.main_frame.update_status_bar( f"Processing: Loading profile '{profile_name}'..." ) # Validate the profile name if not profile_name or profile_name not in self.config_manager.get_profile_sections(): log_handler.log_warning( f"Profile '{profile_name}' invalid/not found.", func_name=func_name ) # Clear fields and disable actions if profile is invalid self._clear_and_disable_fields() # Show error message if a specific (invalid) profile was requested if profile_name: self.main_frame.show_error( "Profile Load Error", f"Profile '{profile_name}' not found." ) # Update status bar status_msg: str = f"Error: Profile '{profile_name}' not found." if profile_name else "No profile selected." self.main_frame.update_status_bar(status_msg) return # Get settings from ConfigManager cm: ConfigManager = self.config_manager keys_with_defaults: dict = cm._get_expected_keys_with_defaults() settings: dict = {} for key, default_value in keys_with_defaults.items(): settings[key] = cm.get_profile_option( profile_name, key, fallback=default_value ) # Apply settings to GUI widgets mf: MainFrame = self.main_frame repo_path_for_refresh: str = "" try: # Load paths and bundle names mf.svn_path_entry.delete(0, tk.END) svn_path_value: str = settings.get("svn_working_copy_path", "") mf.svn_path_entry.insert(0, svn_path_value) repo_path_for_refresh = svn_path_value # Store path for later checks mf.usb_path_entry.delete(0, tk.END) mf.usb_path_entry.insert(0, settings.get("usb_drive_path", "")) mf.bundle_name_entry.delete(0, tk.END) mf.bundle_name_entry.insert(0, settings.get("bundle_name", "")) mf.bundle_updated_name_entry.delete(0, tk.END) mf.bundle_updated_name_entry.insert( 0, settings.get("bundle_name_updated", "") ) # Load backup settings mf.autobackup_var.set( str(settings.get("autobackup", "False")).lower() == "true" ) mf.backup_dir_var.set( settings.get("backup_dir", DEFAULT_BACKUP_DIR) ) mf.backup_exclude_extensions_var.set( settings.get("backup_exclude_extensions", "") ) mf.backup_exclude_dirs_var.set( settings.get("backup_exclude_dirs", "") ) mf.toggle_backup_dir() # Update state of backup dir entry # Load commit settings mf.autocommit_var.set( str(settings.get("autocommit", "False")).lower() == "true" ) mf.clear_commit_message() # Clear existing message if mf.commit_message_text.winfo_exists(): # Safely insert new message current_state = mf.commit_message_text.cget("state") mf.commit_message_text.config(state=tk.NORMAL) mf.commit_message_text.insert("1.0", settings.get("commit_message", "")) mf.commit_message_text.config(state=current_state) # Restore original state # Load remote repository settings if hasattr(mf, "remote_url_var") and hasattr(mf, "remote_name_var"): mf.remote_url_var.set(settings.get("remote_url", "")) mf.remote_name_var.set( settings.get("remote_name", DEFAULT_REMOTE_NAME) ) else: # Log if remote widgets are unexpectedly missing log_handler.log_warning( "Remote URL/Name widgets not found in GUI during load.", func_name=func_name ) log_handler.log_info( f"Applied settings from '{profile_name}' to GUI fields.", func_name=func_name ) # --- Update Repo Status and Trigger Refreshes --- # Update the SVN status indicator based on the loaded path self.update_svn_status_indicator(repo_path_for_refresh) # Check if the repository is ready (valid Git repo) is_ready: bool = self._is_repo_ready(repo_path_for_refresh) if is_ready: # If ready, trigger asynchronous refreshes for repo data log_handler.log_info( "Repo ready, triggering async refreshes.", func_name=func_name ) self.refresh_tag_list() self.refresh_branch_list() # Refreshes local branches self.refresh_commit_history() self.refresh_changed_files_list() # Also check remote status after loading a ready profile self.check_connection_auth() # Check auth/conn status self.refresh_remote_status() # Check ahead/behind status # Status bar will be updated by the results of these async operations else: # If not ready, clear dynamic GUI lists log_handler.log_info( "Repo not ready, clearing dynamic lists.", func_name=func_name ) self._update_gui_for_not_ready_state() # Use helper method # Set final status bar message for this case mf.update_status_bar( f"Profile '{profile_name}' loaded (Repo not ready)." ) except Exception as e: # Handle errors during settings application log_handler.log_exception( f"Error applying settings for '{profile_name}': {e}", func_name=func_name ) # Update sync status label to indicate error if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error") # Show error popup and update status bar mf.show_error("Profile Load Error", f"Failed to apply settings:\n{e}") mf.update_status_bar(f"Error loading profile '{profile_name}'.") def save_profile_settings(self) -> bool: """ Saves current GUI field values to the selected profile in the config file. """ func_name: str = "save_profile_settings" # Get currently selected profile name profile_name: str = self.main_frame.profile_var.get() # Validate profile selection if not profile_name: log_handler.log_warning( "Save failed: No profile selected.", func_name=func_name ) if hasattr(self, "main_frame"): self.main_frame.update_status_bar("Save failed: No profile selected.") return False # Indicate failure # Ensure main frame is available if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot save profile: Main frame not available.", func_name=func_name ) return False # Indicate failure log_handler.log_info( f"Saving settings for profile: '{profile_name}'", func_name=func_name ) mf: MainFrame = self.main_frame cm: ConfigManager = self.config_manager status_final: str = "Ready." # Default status message success: bool = False try: # Gather all settings from GUI widgets settings_to_save: dict = { "svn_working_copy_path": mf.svn_path_entry.get(), "usb_drive_path": mf.usb_path_entry.get(), "bundle_name": mf.bundle_name_entry.get(), "bundle_name_updated": mf.bundle_updated_name_entry.get(), "autocommit": str(mf.autocommit_var.get()), # Convert bool to string "commit_message": mf.get_commit_message(), "autobackup": str(mf.autobackup_var.get()), # Convert bool to string "backup_dir": mf.backup_dir_var.get(), "backup_exclude_extensions": mf.backup_exclude_extensions_var.get(), "backup_exclude_dirs": mf.backup_exclude_dirs_var.get(), "remote_url": mf.remote_url_var.get(), "remote_name": mf.remote_name_var.get().strip() or DEFAULT_REMOTE_NAME, # Use default if empty } # Log the settings being saved (optional, for debugging) log_handler.log_debug( f"Settings to save: {settings_to_save}", func_name=func_name ) # Save each setting using ConfigManager for key, value in settings_to_save.items(): # set_profile_option handles string conversion and section creation cm.set_profile_option(profile_name, key, value) # Write the updated configuration to the .ini file cm.save_config() log_handler.log_info( f"Settings saved successfully for '{profile_name}'.", func_name=func_name ) status_final = f"Profile '{profile_name}' saved." success = True # Indicate success except Exception as e: # Handle errors during saving process log_handler.log_exception( f"Error saving profile '{profile_name}': {e}", func_name=func_name ) status_final = f"Error saving profile '{profile_name}'." mf.show_error("Save Error", f"Failed:\n{e}") success = False # Indicate failure finally: # Update the status bar regardless of success/failure mf.update_status_bar(status_final) return success # Return success status def add_profile(self): """ Handles adding a new profile (dialog and config update). """ func_name: str = "add_profile" log_handler.log_debug("'Add Profile' button clicked.", func_name=func_name) # Ensure main frame is available if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Update status bar and prompt user for new profile name self.main_frame.update_status_bar("Adding new profile...") name: Optional[str] = self.main_frame.ask_new_profile_name() # Returns str or None # Handle cancellation if name is None: # Check for None explicitly log_handler.log_info("Add profile cancelled.", func_name=func_name) self.main_frame.update_status_bar("Add profile cancelled.") return # Validate the entered name name = name.strip() # Remove leading/trailing whitespace if not name: # Show error if name is empty after stripping log_handler.log_warning("Add failed: Name empty.", func_name=func_name) self.main_frame.show_error("Input Error", "Profile name cannot be empty.") self.main_frame.update_status_bar("Add failed: Empty name.") return if name in self.config_manager.get_profile_sections(): # Show error if profile name already exists log_handler.log_warning( f"Add failed: '{name}' exists.", func_name=func_name ) self.main_frame.show_error("Error", f"Profile '{name}' already exists.") self.main_frame.update_status_bar(f"Add failed: '{name}' exists.") return # Proceed with adding the new profile log_handler.log_info( f"Attempting to add new profile: '{name}'", func_name=func_name ) status_final: str = "Ready." # Default status message try: # Get default settings from ConfigManager defaults: dict = self.config_manager._get_expected_keys_with_defaults() # Customize some defaults for a new profile defaults["bundle_name"] = f"{name}_repo.bundle" defaults["bundle_name_updated"] = f"{name}_update.bundle" defaults["svn_working_copy_path"] = "" # Start with empty paths defaults["usb_drive_path"] = "" defaults["remote_url"] = "" # Start with empty remote URL defaults["commit_message"] = f"Initial commit for profile {name}" # Example commit message # Add the new section and set default options self.config_manager.add_section(name) # Creates section if needed for key, value in defaults.items(): # Use set_profile_option which handles string conversion self.config_manager.set_profile_option(name, key, value) # Save the configuration file self.config_manager.save_config() log_handler.log_info( f"Profile '{name}' added successfully.", func_name=func_name ) # Update the GUI: add profile to dropdown and select it sections: list = self.config_manager.get_profile_sections() self.main_frame.update_profile_dropdown(sections) # Setting the variable triggers load_profile_settings via trace self.main_frame.profile_var.set(name) # Status bar will be updated by load_profile_settings except Exception as e: # Handle errors during profile addition log_handler.log_exception( f"Error adding profile '{name}': {e}", func_name=func_name ) status_final = f"Error adding profile '{name}'." self.main_frame.show_error("Add Error", f"Failed:\n{e}") self.main_frame.update_status_bar(status_final) def remove_profile(self): """ Handles removing the selected profile after user confirmation. """ func_name: str = "remove_profile" log_handler.log_debug("'Remove Profile' button clicked.", func_name=func_name) # Ensure main frame is available if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Get the currently selected profile profile: str = self.main_frame.profile_var.get() # Validate profile selection if not profile: log_handler.log_warning( "Remove failed: No profile selected.", func_name=func_name ) self.main_frame.show_error("Error", "No profile selected.") self.main_frame.update_status_bar("Remove failed: No profile.") return # Prevent removing the default profile if profile == DEFAULT_PROFILE: log_handler.log_warning( "Attempt remove default denied.", func_name=func_name ) self.main_frame.show_error( "Denied", f"Cannot remove default profile ('{DEFAULT_PROFILE}')." ) self.main_frame.update_status_bar("Cannot remove default.") return # Confirm removal with the user if self.main_frame.ask_yes_no( title="Confirm Remove", message=f"Remove profile '{profile}'?\nThis cannot be undone." ): # User confirmed, proceed with removal log_handler.log_info( f"Attempting remove profile: '{profile}'", func_name=func_name ) self.main_frame.update_status_bar( f"Processing: Removing profile '{profile}'..." ) status_final: str = "Ready." # Default status message try: # Call ConfigManager to remove the profile section removed: bool = self.config_manager.remove_profile_section(profile) if removed: # Save the configuration file after successful removal self.config_manager.save_config() log_handler.log_info( f"Profile '{profile}' removed.", func_name=func_name ) status_final = f"Profile '{profile}' removed." # Update the profile dropdown and select another profile (e.g., default) sections: list = self.config_manager.get_profile_sections() self.main_frame.update_profile_dropdown(sections) # Selecting another profile will trigger load_profile_settings else: # Handle rare case where remove_profile_section returns False log_handler.log_error( f"Failed remove profile '{profile}' (ConfigManager returned False).", func_name=func_name ) status_final = f"Error removing profile '{profile}'." self.main_frame.show_error( "Error", f"Could not remove '{profile}'. ConfigManager denied." ) self.main_frame.update_status_bar(status_final) except Exception as e: # Handle errors during profile removal log_handler.log_exception( f"Error removing profile '{profile}': {e}", func_name=func_name ) status_final = f"Error removing profile '{profile}'." self.main_frame.show_error("Error", f"Failed:\n{e}") self.main_frame.update_status_bar(status_final) else: # User cancelled the removal log_handler.log_info("Profile removal cancelled.", func_name=func_name) self.main_frame.update_status_bar("Removal cancelled.") # --- GUI Interaction & Helper Methods --- def browse_folder(self, entry_widget: tk.Entry): """ Opens a directory chooser dialog and updates the given Entry widget. """ func_name: str = "browse_folder" current_path: str = entry_widget.get() # Determine a sensible initial directory for the dialog initial_dir: str = os.path.expanduser("~") # Default to user's home if current_path and os.path.isdir(current_path): # If current path is a valid directory, use it initial_dir = current_path elif current_path and os.path.exists(os.path.dirname(current_path)): # If current path is not a dir, but its parent exists, use the parent initial_dir = os.path.dirname(current_path) log_handler.log_debug( f"Opening folder browser. Initial: {initial_dir}", func_name=func_name ) # Show the directory selection dialog directory: Optional[str] = filedialog.askdirectory( # Returns str or None initialdir=initial_dir, title="Select Directory", parent=self.master, # Make dialog modal to the main window ) # If a directory was selected, update the entry widget if directory: log_handler.log_debug( f"Directory selected: {directory}", func_name=func_name ) entry_widget.delete(0, tk.END) # Clear current content entry_widget.insert(0, directory) # Insert selected path # If the SVN path was changed, trigger status update if hasattr(self.main_frame, "svn_path_entry") and entry_widget == self.main_frame.svn_path_entry: self.update_svn_status_indicator(directory) else: # User cancelled the dialog log_handler.log_debug("Folder browse cancelled.", func_name=func_name) def update_svn_status_indicator(self, svn_path: str): """ Checks the status of the directory specified by svn_path. Updates the GUI indicator (Green/Red circle) and enables/disables relevant action widgets based on whether it's a valid Git repository. """ func_name: str = "update_svn_status_indicator" # Check if path is a valid directory is_valid_dir: bool = bool(svn_path and os.path.isdir(svn_path)) # Check if it's a valid Git repo (contains a .git directory) is_repo_ready: bool = is_valid_dir and os.path.exists(os.path.join(svn_path, ".git")) log_handler.log_debug( f"Updating repo status indicator. Path='{svn_path}', ValidDir={is_valid_dir}, Ready={is_repo_ready}", func_name=func_name ) # Ensure main frame is available if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame # Update the colored indicator and its tooltip in the GUI mf.update_svn_indicator(is_repo_ready) # --- Determine State for Various Widgets --- # State based on whether it's a prepared Git repo repo_ready_state: str = tk.NORMAL if is_repo_ready else tk.DISABLED # State based only on whether it's a valid directory path valid_dir_state: str = tk.NORMAL if is_valid_dir else tk.DISABLED # State for 'Prepare' button: enabled only if it's a valid dir BUT NOT already a repo prepare_state: str = tk.NORMAL if is_valid_dir and not is_repo_ready else tk.DISABLED # State for 'Fetch from Bundle' button (depends on repo state or bundle existence) fetch_button_state: str = self._calculate_fetch_button_state(mf, svn_path, is_repo_ready) # --- Apply States to Widgets --- # Use try-except to prevent errors if a widget doesn't exist try: # Repository Tab if hasattr(mf, "prepare_svn_button"): mf.prepare_svn_button.config(state=prepare_state) if hasattr(mf, "create_bundle_button"): mf.create_bundle_button.config(state=repo_ready_state) if hasattr(mf, "fetch_bundle_button"): mf.fetch_bundle_button.config(state=fetch_button_state) if hasattr(mf, "edit_gitignore_button"): mf.edit_gitignore_button.config(state=repo_ready_state) # Backup Tab if hasattr(mf, "manual_backup_button"): mf.manual_backup_button.config(state=valid_dir_state) # Only needs valid dir # Commit/Changes Tab if hasattr(mf, "autocommit_checkbox"): mf.autocommit_checkbox.config(state=repo_ready_state) if hasattr(mf, "commit_message_text"): mf.commit_message_text.config(state=repo_ready_state) if hasattr(mf, "refresh_changes_button"): mf.refresh_changes_button.config(state=repo_ready_state) if hasattr(mf, "commit_button"): mf.commit_button.config(state=repo_ready_state) # Tags Tab if hasattr(mf, "refresh_tags_button"): mf.refresh_tags_button.config(state=repo_ready_state) if hasattr(mf, "create_tag_button"): mf.create_tag_button.config(state=repo_ready_state) if hasattr(mf, "checkout_tag_button"): mf.checkout_tag_button.config(state=repo_ready_state) if hasattr(mf, "tag_listbox"): mf.tag_listbox.config(state=repo_ready_state) # Branches (Local Ops) Tab if hasattr(mf, "refresh_branches_button"): mf.refresh_branches_button.config(state=repo_ready_state) if hasattr(mf, "create_branch_button"): mf.create_branch_button.config(state=repo_ready_state) if hasattr(mf, "checkout_branch_button"): mf.checkout_branch_button.config(state=repo_ready_state) if hasattr(mf, "branch_listbox"): mf.branch_listbox.config(state=repo_ready_state) # History Tab if hasattr(mf, "refresh_history_button"): mf.refresh_history_button.config(state=repo_ready_state) if hasattr(mf, "history_branch_filter_combo"): combo_state: str = "readonly" if is_repo_ready else tk.DISABLED mf.history_branch_filter_combo.config(state=combo_state) if hasattr(mf, "history_text"): mf.history_text.config(state=repo_ready_state) # Remote Tab Widgets if hasattr(mf, "apply_remote_config_button"): mf.apply_remote_config_button.config(state=repo_ready_state) if hasattr(mf, "check_auth_button"): mf.check_auth_button.config(state=repo_ready_state) if hasattr(mf, "fetch_button"): mf.fetch_button.config(state=repo_ready_state) if hasattr(mf, "pull_button"): mf.pull_button.config(state=repo_ready_state) if hasattr(mf, "push_button"): mf.push_button.config(state=repo_ready_state) if hasattr(mf, "push_tags_button"): mf.push_tags_button.config(state=repo_ready_state) if hasattr(mf, "refresh_sync_status_button"): mf.refresh_sync_status_button.config(state=repo_ready_state) if hasattr(mf, "refresh_remote_branches_button"): mf.refresh_remote_branches_button.config(state=repo_ready_state) if hasattr(mf, "remote_branches_listbox"): mf.remote_branches_listbox.config(state=repo_ready_state) if hasattr(mf, "local_branches_listbox_remote_tab"): mf.local_branches_listbox_remote_tab.config(state=repo_ready_state) if hasattr(mf, "refresh_local_branches_button_remote_tab"): mf.refresh_local_branches_button_remote_tab.config(state=repo_ready_state) # Clear Changed Files list only if repo is NOT ready if hasattr(mf, "changed_files_listbox"): if not is_repo_ready: log_handler.log_debug( "Repo not ready, clearing changes list via status update.", func_name=func_name ) mf.update_changed_files_list(["(Repository not ready)"]) # else: If repo is ready, let the async refresh handle the list content except Exception as e: # Log errors if updating widget states fails log_handler.log_error( f"Error updating widget states based on repo status: {e}", func_name=func_name ) def _calculate_fetch_button_state(self, main_frame: 'MainFrame', svn_path: str, is_repo_ready: bool) -> str: """ Determines the state (NORMAL/DISABLED) for the 'Fetch from Bundle' button. Enabled if the repo is ready (for fetch/merge) OR if the target directory is usable and the specified bundle file exists (for cloning). """ func_name: str = "_calculate_fetch_button_state" try: # Check if the svn_path directory can be used as a clone destination can_use_svn_dir_for_clone: bool = False if svn_path: if os.path.isdir(svn_path): # Usable if it's an empty directory try: if not os.listdir(svn_path): can_use_svn_dir_for_clone = True except OSError: # Ignore permission errors etc. when checking listdir pass else: # If not a directory, check if the parent directory exists # (so the target directory could potentially be created) parent_dir: str = os.path.dirname(svn_path) if parent_dir and os.path.isdir(parent_dir): can_use_svn_dir_for_clone = True elif not parent_dir: # Path is just a name in the current dir can_use_svn_dir_for_clone = True # Check if the specified bundle file exists in the USB/Target path bundle_file_exists: bool = False usb_path_str: str = main_frame.usb_path_entry.get().strip() bundle_fetch_name: str = main_frame.bundle_updated_name_entry.get().strip() # Check if USB path and bundle name are set and USB path is a directory if usb_path_str and bundle_fetch_name and os.path.isdir(usb_path_str): bundle_full_path: str = os.path.join(usb_path_str, bundle_fetch_name) # Check if the bundle file actually exists if os.path.isfile(bundle_full_path): bundle_file_exists = True # Enable the button if either condition is met if is_repo_ready or (can_use_svn_dir_for_clone and bundle_file_exists): return tk.NORMAL else: return tk.DISABLED except Exception as e: # Log errors during state calculation and default to disabled log_handler.log_error( f"Error checking fetch button state: {e}", func_name=func_name ) return tk.DISABLED def _is_repo_ready(self, repo_path: str) -> bool: """ Checks if the given path points to a valid Git repository (.git exists). """ # Check if path is non-empty, is a directory, and contains a '.git' subdirectory is_valid_git_repo: bool = bool( repo_path and os.path.isdir(repo_path) and os.path.exists(os.path.join(repo_path, ".git")) ) return is_valid_git_repo def _parse_exclusions(self) -> tuple[set[str], set[str]]: """ Parses comma-separated exclusion strings from the GUI variables into sets of lowercase extensions and directory base names. Always includes '.git' and '.svn' in directory exclusions. """ # Initialize sets excluded_extensions: set[str] = set() # Always exclude .git and .svn directories from backups excluded_dirs: set[str] = {".git", ".svn"} # Ensure main frame is available if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return excluded_extensions, excluded_dirs mf: MainFrame = self.main_frame # Parse excluded extensions ext_string: str = mf.backup_exclude_extensions_var.get() if ext_string: # Split by comma, strip whitespace, convert to lowercase for ext in ext_string.split(","): cleaned_ext: str = ext.strip().lower() if cleaned_ext: # Ensure extension starts with a dot normalized_ext: str = "." + cleaned_ext.lstrip(".") excluded_extensions.add(normalized_ext) # Parse excluded directory names dir_string: str = mf.backup_exclude_dirs_var.get() if dir_string: # Split by comma, strip whitespace/slashes, convert to lowercase for dirname in dir_string.split(","): # Remove leading/trailing whitespace and path separators cleaned_dir: str = dirname.strip().lower().strip(os.path.sep + "/") # Add if valid name and not already included if cleaned_dir and cleaned_dir not in {".", ".."} and cleaned_dir not in excluded_dirs: excluded_dirs.add(cleaned_dir) log_handler.log_debug( f"Parsed Exclusions - Exts: {excluded_extensions}, Dirs: {excluded_dirs}", func_name="_parse_exclusions", ) return excluded_extensions, excluded_dirs def _get_and_validate_svn_path(self, operation_name: str = "Operation") -> Optional[str]: """ Gets the Working Directory path from the GUI and validates that it's an existing directory. Returns the absolute path or None if invalid. """ func_name: str = "_get_and_validate_svn_path" # Ensure main frame and entry widget exist if not hasattr(self, "main_frame") or not hasattr(mf := self.main_frame, "svn_path_entry"): log_handler.log_error( f"{operation_name} failed: SVN path entry widget missing.", func_name=func_name ) return None # Get path from entry and remove leading/trailing whitespace path_str: str = mf.svn_path_entry.get().strip() # Check if the path string is empty if not path_str: log_handler.log_warning( f"{operation_name} failed: Working Directory path is empty.", func_name=func_name ) mf.show_error("Input Error", "Working Directory path cannot be empty.") mf.update_status_bar(f"{operation_name} failed: Path empty.") return None # Convert to absolute path and check if it's a valid directory abs_path: str = os.path.abspath(path_str) if not os.path.isdir(abs_path): log_handler.log_warning( f"{operation_name} failed: Path is not a valid directory: {abs_path}", func_name=func_name ) mf.show_error( "Path Error", f"The specified path is not a valid directory:\n{abs_path}" ) mf.update_status_bar(f"{operation_name} failed: Not a directory.") return None # Path is valid, log and return the absolute path log_handler.log_debug( f"{operation_name}: Using validated Working Directory path: {abs_path}", func_name=func_name ) return abs_path def _get_and_validate_usb_path(self, operation_name: str = "Operation") -> Optional[str]: """ Gets the Bundle Target Directory path from the GUI and validates that it's an existing directory. Returns the absolute path or None if invalid. """ func_name: str = "_get_and_validate_usb_path" # Ensure main frame and entry widget exist if not hasattr(self, "main_frame") or not hasattr(mf := self.main_frame, "usb_path_entry"): log_handler.log_error( f"{operation_name} failed: Bundle Target path entry widget missing.", func_name=func_name ) return None # Get path from entry and remove leading/trailing whitespace path_str: str = mf.usb_path_entry.get().strip() # Check if the path string is empty if not path_str: log_handler.log_warning( f"{operation_name} failed: Bundle Target path is empty.", func_name=func_name ) mf.show_error("Input Error", "Bundle Target path cannot be empty.") mf.update_status_bar(f"{operation_name} failed: Path empty.") return None # Convert to absolute path and check if it's a valid directory abs_path: str = os.path.abspath(path_str) if not os.path.isdir(abs_path): log_handler.log_warning( f"{operation_name} failed: Path is not a valid directory: {abs_path}", func_name=func_name ) mf.show_error( "Path Error", f"The specified path is not a valid directory:\n{abs_path}" ) mf.update_status_bar(f"{operation_name} failed: Not a directory.") return None # Path is valid, log and return the absolute path log_handler.log_debug( f"{operation_name}: Using validated Bundle Target path: {abs_path}", func_name=func_name ) return abs_path def _clear_and_disable_fields(self): """ Clears most GUI input fields and list displays, and disables action buttons. Typically used when no profile is selected or the repository is not ready. """ # Ensure main frame is available if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Clearing and disabling fields.", func_name="_clear_and_disable_fields" ) # --- Clear Text Entries and Variables --- if hasattr(mf, "svn_path_entry"): mf.svn_path_entry.delete(0, tk.END) if hasattr(mf, "usb_path_entry"): mf.usb_path_entry.delete(0, tk.END) if hasattr(mf, "bundle_name_entry"): mf.bundle_name_entry.delete(0, tk.END) if hasattr(mf, "bundle_updated_name_entry"): mf.bundle_updated_name_entry.delete(0, tk.END) if hasattr(mf, "clear_commit_message"): mf.clear_commit_message() if hasattr(mf, "backup_dir_var"): mf.backup_dir_var.set("") if hasattr(mf, "backup_exclude_extensions_var"): mf.backup_exclude_extensions_var.set("") if hasattr(mf, "backup_exclude_dirs_var"): mf.backup_exclude_dirs_var.set("") if hasattr(mf, "remote_url_var"): mf.remote_url_var.set("") if hasattr(mf, "remote_name_var"): mf.remote_name_var.set("") if hasattr(mf, "autobackup_var"): mf.autobackup_var.set(False) if hasattr(mf, "autocommit_var"): mf.autocommit_var.set(False) # Update widgets linked to variables (like backup dir entry state) if hasattr(mf, "toggle_backup_dir"): mf.toggle_backup_dir() # --- Clear Dynamic Lists using helper --- self._update_gui_for_not_ready_state() # --- Disable Action Widgets --- # Calling update_svn_status_indicator with empty path handles disabling most widgets self.update_svn_status_indicator("") # Ensure profile buttons (except Add/Clone) are also disabled if hasattr(mf, "remove_profile_button"): mf.remove_profile_button.config(state=tk.DISABLED) if hasattr(mf, "save_settings_button"): mf.save_settings_button.config(state=tk.DISABLED) # Set final status bar message mf.update_status_bar("No profile selected or repository not ready.") def show_fatal_error(self, message: str): """ Displays a fatal error message box and attempts to close the application. """ log_handler.log_critical( f"FATAL ERROR: {message}", func_name="show_fatal_error" ) try: # Try to use the main window as parent for the message box parent_window: Optional[tk.Tk] = None if hasattr(self, "master") and self.master and self.master.winfo_exists(): parent_window = self.master messagebox.showerror("Fatal Error", message, parent=parent_window) except Exception as e: # Fallback to printing error if GUI fails print(f"FATAL ERROR (GUI message failed: {e}): {message}", file=sys.stderr) finally: # Always attempt to close the application after a fatal error self.on_closing() def show_comparison_summary(self, ref1: str, ref2: str, repo_path: str, changed_files: List[str]): """ Opens the DiffSummaryWindow to display comparison results. Called by the AsyncResultHandler. """ func_name: str = "show_comparison_summary" log_handler.log_debug( f"Attempting to show comparison summary: {ref1} vs {ref2}", func_name=func_name ) # Verifica che i dati necessari siano presenti if not all([ref1, ref2, repo_path, isinstance(changed_files, list)]): log_handler.log_error( "Missing data required to show comparison summary.", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Display Error", "Internal error: Missing data for comparison." ) # Assicurati di riabilitare i widget anche in caso di errore qui self._reenable_widgets_after_modal() # Usa l'helper per riabilitare return try: # Crea e mostra la finestra modale log_handler.log_debug( f"Opening DiffSummaryWindow with {len(changed_files)} files.", func_name=func_name ) # Assicurati che DiffSummaryWindow sia importato correttamente all'inizio del file DiffSummaryWindow( master=self.master, # Parent è la finestra root git_commands=self.git_commands, # Passa l'istanza dei comandi repo_path=repo_path, ref1=ref1, ref2=ref2, changed_files_status=changed_files # Passa la lista ) # Il codice attende qui finché la finestra non viene chiusa log_handler.log_info("Diff Summary window closed by user.", func_name=func_name) # Ripristina la status bar (la finestra era modale) if hasattr(self.main_frame, "update_status_bar"): self.main_frame.update_status_bar("Ready.") except Exception as e_summary: # Gestisci errori durante la creazione/visualizzazione della finestra log_handler.log_exception( f"Error opening diff summary window: {e_summary}", func_name=func_name ) if hasattr(self.main_frame, "show_error") and hasattr(self.main_frame, "update_status_bar"): self.main_frame.show_error( "Display Error", f"Could not display comparison results:\n{e_summary}" ) self.main_frame.update_status_bar("Error displaying comparison.") finally: # Assicurati che i widget vengano riabilitati dopo che la finestra # (o il messaggio di errore) è stata chiusa. self._reenable_widgets_after_modal() # Usa l'helper per riabilitare # --- Helper Methods for Updating GUI in Specific States --- def _update_gui_for_not_ready_state(self): """ Centralizes GUI updates needed when the repository is not ready. """ if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Updating GUI for 'Repo Not Ready' state.", func_name="_update_gui_for_not_ready_state" ) # Clear all dynamic list displays if hasattr(mf, "update_tag_list"): mf.update_tag_list([("(Repo not ready)", "")]) if hasattr(mf, "update_branch_list"): mf.update_branch_list([], None) # Clears both local lists if hasattr(mf, "update_history_display"): mf.update_history_display(["(Repo not ready)"]) if hasattr(mf, "update_history_branch_filter"): mf.update_history_branch_filter([]) if hasattr(mf, "update_changed_files_list"): mf.update_changed_files_list(["(Repo not ready)"]) if hasattr(mf, "update_remote_branches_list"): mf.update_remote_branches_list(["(Repo not ready)"]) # Update sync status label if hasattr(mf, "update_ahead_behind_status"): mf.update_ahead_behind_status(status_text="Sync Status: (Repo not ready)") # Optional: Update status bar explicitly if needed # mf.update_status_bar("Ready (Repo not ready).") def _update_gui_for_detached_head(self, current_branch_name: Optional[str]): """ Centralizes GUI updates for detached HEAD state. """ if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Updating GUI for 'Detached HEAD' state.", func_name="_update_gui_for_detached_head" ) # Update sync status label if hasattr(mf, "update_ahead_behind_status"): mf.update_ahead_behind_status( current_branch=current_branch_name, # Pass None here status_text="Sync Status: (Detached HEAD)" ) # Disable sync status refresh button when detached if hasattr(mf, "refresh_sync_status_button"): mf.refresh_sync_status_button.config(state=tk.DISABLED) def _update_gui_for_no_upstream(self, current_branch_name: Optional[str]): """ Centralizes GUI updates when no upstream is set for the current branch. """ if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Updating GUI for 'No Upstream' state.", func_name="_update_gui_for_no_upstream" ) # Update sync status label if hasattr(mf, "update_ahead_behind_status"): mf.update_ahead_behind_status( current_branch=current_branch_name, status_text=f"Sync Status: Upstream not set" ) # Disable sync status refresh button if no upstream is configured if hasattr(mf, "refresh_sync_status_button"): mf.refresh_sync_status_button.config(state=tk.DISABLED) def _reenable_widgets_after_modal(self): """ Schedules widget re-enabling after a short delay. Useful after modal windows close or errors are handled. """ func_name: str = "_reenable_widgets_after_modal" # ---<<< CORREZIONE: Usa self.master e self._reenable_widgets_if_ready >>>--- # Verifica se la finestra master (attributo di GitSvnSyncApp) esiste if hasattr(self, "master") and self.master.winfo_exists(): # Usa self.master.after per schedulare la chiamata a self._reenable_widgets_if_ready self.master.after(50, self._reenable_widgets_if_ready) log_handler.log_debug("Scheduled widget re-enable.", func_name=func_name) # ---<<< FINE CORREZIONE >>>--- else: log_handler.log_warning( "Cannot schedule widget re-enable: Master window destroyed.", func_name=func_name ) def _update_gui_for_status_error(self): """ Centralizes GUI updates when there's an error getting status info. """ if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Updating GUI for 'Status Error' state.", func_name="_update_gui_for_status_error" ) # Update sync status label if hasattr(mf, "update_ahead_behind_status"): mf.update_ahead_behind_status(status_text="Sync Status: Error getting info") # Disable sync status refresh button on error if hasattr(mf, "refresh_sync_status_button"): mf.refresh_sync_status_button.config(state=tk.DISABLED) # --- Helper to update authentication status (internal state + GUI) --- def _update_gui_auth_status(self, status: str): """ Updates the internal authentication status state variable and calls the MainFrame method to update the visual indicator. Also updates the sync status label if auth/connection fails. """ # Update internal state tracking authentication status self.remote_auth_status = status # Update the GUI indicator if the main frame exists if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): # Call the method within MainFrame to update the indicator's text/color self.main_frame._update_auth_status_indicator(status) # If connection or authentication failed, update the sync status label too if status != "ok": if hasattr(self.main_frame, "update_ahead_behind_status"): # Provide a user-friendly status text based on the error type # Replace underscores and capitalize for display sync_status_text: str = f"Sync Status: ({status.replace('_', ' ').title()})" self.main_frame.update_ahead_behind_status(status_text=sync_status_text) def _start_async_operation( self, worker_func: Callable, # The worker function from async_workers.py args_tuple: tuple, # Arguments for the worker function (excluding queue) context_dict: dict # Context information for result handling ): """ Generic helper to start an async operation in a separate thread. Disables GUI action widgets and updates the status bar during execution. Args: worker_func (Callable): The target function to run in the thread. args_tuple (tuple): A tuple containing arguments for the worker_func. context_dict (dict): A dictionary containing context information (like 'context', 'status_msg') for the task and its result processing. """ # Ensure main frame is available if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot start async op: Main frame missing.", func_name="_start_async_operation" ) return # Extract context details for logging and status updates context_name: str = context_dict.get("context", "unknown_op") # Status message displayed to the user while processing status_msg: str = context_dict.get("status_msg", context_name) log_handler.log_info( f"--- Action Triggered: {context_name} (Async Start) ---", func_name=context_name ) # --- Prepare GUI for Async Operation --- # Disable main action widgets to prevent concurrent operations self.main_frame.set_action_widgets_state(tk.DISABLED) # Update status bar to indicate processing with a yellow background self.main_frame.update_status_bar( message=f"Processing: {status_msg}...", bg_color=self.main_frame.STATUS_YELLOW # Use yellow for processing ) # --- Setup Communication Queue --- # Create a queue for this specific operation to receive the result # Maxsize=1 is sufficient as we expect only one result per operation results_queue: queue.Queue = queue.Queue(maxsize=1) # --- Prepare Worker Thread --- # Combine the provided arguments with the results queue for the worker full_args: tuple = args_tuple + (results_queue,) # Create and start the worker thread log_handler.log_debug( f"Creating worker thread for {context_name}. Worker func: {worker_func.__name__}", func_name="_start_async_operation" ) try: worker_thread = threading.Thread( target=worker_func, args=full_args, daemon=True # Set as daemon so it exits if main app closes unexpectedly ) log_handler.log_debug( f"Starting worker thread for {context_name}.", func_name="_start_async_operation" ) worker_thread.start() except Exception as thread_e: # Handle errors during thread creation/start log_handler.log_exception( f"Failed to start worker thread for {context_name}: {thread_e}", func_name="_start_async_operation" ) # Show error to user and re-enable GUI immediately self.main_frame.show_error( "Threading Error", f"Could not start background task for {context_name}." ) self.main_frame.update_status_bar( f"Error starting task: {context_name}", bg_color=self.main_frame.STATUS_RED, duration_ms=10000 ) self.main_frame.set_action_widgets_state(tk.NORMAL) return # Stop if thread cannot be started # --- Schedule Result Check --- # Schedule the _check_completion_queue method to run after a short delay # to check for results from the worker thread. log_handler.log_debug( f"Scheduling completion check for {context_name}.", func_name="_start_async_operation" ) self.master.after( self.ASYNC_QUEUE_CHECK_INTERVAL_MS, self._check_completion_queue, # Method to call for checking results_queue, # Queue to check context_dict # Pass original context for handling ) # --- Specific Action Launchers (Call _start_async_operation) --- # --- Refresh Actions --- def refresh_tag_list(self): """ Starts async operation to refresh the tag list in the GUI. """ func_name: str ="refresh_tag_list" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Tags") # Check if repo is ready before starting async task if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Tags skipped: Repo not ready.", func_name=func_name ) # Update GUI immediately if repo not ready self._update_gui_for_not_ready_state() return # Prepare arguments for the worker function args: tuple = (self.git_commands, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_tags_async, args_tuple=args, context_dict={"context": "refresh_tags", "status_msg": "Refreshing tags"} ) def refresh_remote_status(self): """ Starts the asynchronous check for ahead/behind status of the current branch against its upstream counterpart. """ func_name: str = "refresh_remote_status" log_handler.log_info( f"--- Action Triggered: Refresh Remote Sync Status ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Sync Status") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Refresh Status skipped: Repo not ready.", func_name=func_name ) # Update GUI to show 'not ready' state self._update_gui_for_not_ready_state() return # --- Get Current Branch and Upstream --- current_branch: Optional[str] = None upstream_branch: Optional[str] = None try: # Get the name of the current local branch current_branch = self.git_commands.get_current_branch_name(svn_path) if current_branch: # If on a branch, get its configured upstream upstream_branch = self.git_commands.get_branch_upstream( svn_path, current_branch ) else: # Handle detached HEAD state log_handler.log_warning( "Refresh Status: Cannot get status, currently in detached HEAD state.", func_name=func_name ) self._update_gui_for_detached_head(current_branch) return # Cannot check sync status in detached HEAD # Handle case where upstream is not configured if not upstream_branch: log_handler.log_info( f"Refresh Status: No upstream configured for branch '{current_branch}'.", func_name=func_name ) self._update_gui_for_no_upstream(current_branch) return # Cannot check sync status without upstream # Enable refresh button if we have branch and upstream (might be disabled) if hasattr(self.main_frame, "refresh_sync_status_button"): self.main_frame.refresh_sync_status_button.config(state=tk.NORMAL) except Exception as e: # Handle errors getting branch/upstream info log_handler.log_exception( f"Error getting branch/upstream before status check: {e}", func_name=func_name ) self._update_gui_for_status_error() # Update GUI to show error state return # --- Start Async Worker --- log_handler.log_info( f"Checking ahead/behind status for '{current_branch}' vs '{upstream_branch}'...", func_name=func_name ) # Update GUI label to "Checking..." if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( current_branch=current_branch, status_text="Sync Status: Checking..." ) # Prepare arguments for the worker args: tuple = (self.git_commands, svn_path, current_branch, upstream_branch) # Start async operation self._start_async_operation( worker_func=async_workers.run_get_ahead_behind_async, # Worker function args_tuple=args, context_dict={ "context": "get_ahead_behind", "status_msg": f"Checking sync status for '{current_branch}'", "local_branch": current_branch, # Pass context for result handler "upstream_branch": upstream_branch, } ) def refresh_branch_list(self): """ Starts async operation to refresh the local branch list in the GUI. """ func_name: str ="refresh_branch_list" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Branches") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Branches skipped: Repo not ready.", func_name=func_name ) self._update_gui_for_not_ready_state() return # Prepare arguments args: tuple = (self.git_commands, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_branches_async, # Worker for local branches args_tuple=args, context_dict={"context": "refresh_branches", "status_msg": "Refreshing branches"} ) def refresh_commit_history(self): """ Starts async operation to refresh the commit history display. """ func_name: str ="refresh_commit_history" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh History") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh History skipped: Repo not ready.", func_name=func_name ) self._update_gui_for_not_ready_state() return # Determine branch filter from GUI branch_filter: Optional[str] = None log_scope: str = "All History" if hasattr(self.main_frame, "history_branch_filter_var"): filter_selection: str = self.main_frame.history_branch_filter_var.get() # Use filter only if a specific branch/tag is selected if filter_selection and filter_selection != "-- All History --": branch_filter = filter_selection log_scope = f"'{branch_filter}'" # Prepare arguments args: tuple = (self.git_commands, svn_path, branch_filter, log_scope) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_history_async, args_tuple=args, context_dict={ "context": "refresh_history", "status_msg": f"Refreshing history for {log_scope}" } ) def refresh_changed_files_list(self): """ Starts async operation to refresh the list of changed files. """ func_name: str ="refresh_changed_files_list" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Changed Files") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Changes skipped: Repo not ready.", func_name=func_name ) # GUI update handled by _update_gui_for_not_ready_state if needed return # Prepare arguments args: tuple = (self.git_commands, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_changes_async, args_tuple=args, context_dict={ "context": "refresh_changes", "status_msg": "Refreshing changed files" } ) # --- Local Repo / Bundle / Backup Actions --- def prepare_svn_for_git(self): """ Starts async operation to prepare the repository (init, gitignore). """ func_name: str ="prepare_svn_for_git" svn_path: Optional[str] = self._get_and_validate_svn_path("Prepare Repository") # Check if path is valid before starting if not svn_path: # Error message shown by validation method self.main_frame.update_status_bar("Prepare failed: Invalid path.") return # Check if already prepared (avoid unnecessary work) if self._is_repo_ready(svn_path): log_handler.log_info( "Prepare skipped: Repository already prepared.", func_name=func_name ) self.main_frame.show_info("Info", "Repository is already prepared.") # Ensure GUI state reflects readiness self.update_svn_status_indicator(svn_path) return # Prepare arguments args: tuple = (self.action_handler, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_prepare_async, args_tuple=args, context_dict={ "context": "prepare_repo", "status_msg": "Preparing repository" } ) def create_git_bundle(self): """ Starts async operation to create a Git bundle file. """ func_name: str ="create_git_bundle" # Gather and validate inputs from GUI profile: str = self.main_frame.profile_var.get() svn_path: Optional[str] = self._get_and_validate_svn_path("Create Bundle") usb_path: Optional[str] = self._get_and_validate_usb_path("Create Bundle") bundle_name: str = self.main_frame.bundle_name_entry.get().strip() # Check if all required inputs are present if not profile or not svn_path or not usb_path or not bundle_name: log_handler.log_warning( "Create Bundle cancelled: Missing inputs.", func_name=func_name ) # Specific error messages shown by validation methods return # Check if repository is ready if not self._is_repo_ready(svn_path): log_handler.log_warning( "Create Bundle failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not prepared.") self.main_frame.update_status_bar("Create Bundle failed: Repo not ready.") return # Ensure bundle name has the correct extension if not bundle_name.lower().endswith(".bundle"): bundle_name += ".bundle" bundle_full_path: str = os.path.join(usb_path, bundle_name) # Save profile settings before starting the operation if not self.save_profile_settings(): # Ask user if they want to proceed even if saving failed if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue creating bundle anyway?" ): self.main_frame.update_status_bar( "Create Bundle cancelled (profile save failed)." ) return # Prepare parameters for the worker excluded_extensions: set[str] excluded_dirs: set[str] excluded_extensions, excluded_dirs = self._parse_exclusions() backup_enabled: bool = self.main_frame.autobackup_var.get() backup_dir: str = self.main_frame.backup_dir_var.get() commit_enabled: bool = self.main_frame.autocommit_var.get() commit_msg: str = self.main_frame.get_commit_message() # Prepare arguments tuple for the worker args: tuple = ( self.action_handler, svn_path, bundle_full_path, profile, backup_enabled, backup_dir, commit_enabled, commit_msg, excluded_extensions, excluded_dirs, ) # Start the async operation self._start_async_operation( worker_func=async_workers.run_create_bundle_async, args_tuple=args, context_dict={ "context": "create_bundle", "status_msg": f"Creating bundle '{bundle_name}'", "committed_flag_possible": True, # Context hint for result handler } ) def fetch_from_git_bundle(self): """ Starts async operation to fetch/clone from a Git bundle file. """ func_name: str ="fetch_from_git_bundle" # Gather and validate inputs profile: str = self.main_frame.profile_var.get() # svn_path_str can be a non-existent dir if cloning svn_path_str: str = self.main_frame.svn_path_entry.get().strip() usb_path: Optional[str] = self._get_and_validate_usb_path("Fetch Bundle") bundle_name: str = self.main_frame.bundle_updated_name_entry.get().strip() # Check for missing inputs if not profile or not svn_path_str or not usb_path or not bundle_name: log_handler.log_warning( "Fetch Bundle cancelled: Missing inputs.", func_name=func_name ) return # Construct full bundle path and check if it exists BEFORE starting async op bundle_full_path: str = os.path.join(usb_path, bundle_name) if not os.path.isfile(bundle_full_path): log_handler.log_error( f"Fetch Bundle failed: Bundle file not found at '{bundle_full_path}'", func_name=func_name ) self.main_frame.show_error( "File Not Found", f"Bundle file not found:\n{bundle_full_path}" ) self.main_frame.update_status_bar("Fetch failed: Bundle not found.") return # Save profile settings before action if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue fetching from bundle anyway?" ): self.main_frame.update_status_bar( "Fetch cancelled (profile save failed)." ) return # Prepare parameters for worker excluded_extensions: set[str] excluded_dirs: set[str] excluded_extensions, excluded_dirs = self._parse_exclusions() backup_enabled: bool = self.main_frame.autobackup_var.get() backup_dir: str = self.main_frame.backup_dir_var.get() # Prepare arguments tuple args: tuple = ( self.action_handler, svn_path_str, bundle_full_path, profile, backup_enabled, backup_dir, excluded_extensions, excluded_dirs, ) # Start async operation self._start_async_operation( worker_func=async_workers.run_fetch_bundle_async, args_tuple=args, context_dict={ "context": "fetch_bundle", "status_msg": f"Fetching from '{bundle_name}'", "repo_path": svn_path_str, # Pass path for potential conflict message } ) def manual_backup(self): """ Starts async operation for creating a manual backup ZIP. """ func_name: str ="manual_backup" # Gather and validate inputs profile: str = self.main_frame.profile_var.get() svn_path: Optional[str] = self._get_and_validate_svn_path(f"Manual Backup ({profile})") backup_dir_str: str = self.main_frame.backup_dir_var.get().strip() # Check required inputs if not profile or not svn_path: # Error already shown by validation method return if not backup_dir_str: log_handler.log_warning( "Manual backup failed: Backup directory is empty.", func_name=func_name ) self.main_frame.show_error( "Input Error", "Backup directory cannot be empty for manual backup." ) self.main_frame.update_status_bar("Manual backup failed: Backup dir empty.") return # Validate backup directory path backup_dir_abs: str = os.path.abspath(backup_dir_str) # Check if path exists and is not a directory (create_zip_backup handles creation) if os.path.exists(backup_dir_abs) and not os.path.isdir(backup_dir_abs): log_handler.log_error( f"Manual backup failed: Backup path exists but is not a directory: {backup_dir_abs}", func_name=func_name ) self.main_frame.show_error( "Path Error", f"Backup path exists but is not a directory:\n{backup_dir_abs}" ) self.main_frame.update_status_bar( "Manual backup failed: Invalid backup path." ) return # Save profile settings before backup if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue backup anyway?" ): self.main_frame.update_status_bar( "Backup cancelled (profile save failed)." ) return # Prepare parameters for worker excluded_extensions: set[str] excluded_dirs: set[str] excluded_extensions, excluded_dirs = self._parse_exclusions() # Prepare arguments tuple args: tuple = ( self.backup_handler, svn_path, backup_dir_abs, profile, excluded_extensions, excluded_dirs ) # Start async operation self._start_async_operation( worker_func=async_workers.run_manual_backup_async, args_tuple=args, context_dict={"context": "manual_backup", "status_msg": "Creating manual backup"} ) # --- Git Actions (Commit, Tag, Branch, etc.) --- def commit_changes(self): """ Starts async operation to commit staged changes with GUI message. """ func_name: str = "commit_changes" # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Commit") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Commit failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Commit failed: Repo not ready.") return # Validate commit message commit_msg: str = self.main_frame.get_commit_message() if not commit_msg: log_handler.log_warning( "Commit failed: Commit message is empty.", func_name=func_name ) self.main_frame.show_error("Input Error", "Commit message cannot be empty.") self.main_frame.update_status_bar("Commit failed: Empty message.") return # Prepare args and start async operation args: tuple = (self.action_handler, svn_path, commit_msg) self._start_async_operation( worker_func=async_workers.run_commit_async, args_tuple=args, context_dict={ "context": "commit", "status_msg": "Committing changes", "committed_flag_possible": True # Hint for result handler } ) def open_gitignore_editor(self): """ Opens the .gitignore editor window (Synchronous GUI action). """ # This action is synchronous as it opens a modal dialog func_name: str = "open_gitignore_editor" log_handler.log_info( f"--- Action Triggered: Edit .gitignore ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness self.main_frame.update_status_bar("Processing: Opening .gitignore editor...") svn_path: Optional[str] = self._get_and_validate_svn_path("Edit .gitignore") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Cannot edit .gitignore: Repo path invalid/not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Select a valid and prepared repository first." ) self.main_frame.update_status_bar("Edit failed: Repo not ready.") return # Construct path and open editor window gitignore_path: str = os.path.join(svn_path, ".gitignore") log_handler.log_debug( f"Target .gitignore path: {gitignore_path}", func_name=func_name ) status_after_edit: str = "Ready." # Default status after editor closes try: log_handler.log_debug("Opening GitignoreEditorWindow...", func_name=func_name) # Open the modal editor window, passing the callback for successful save GitignoreEditorWindow( master=self.master, # Parent window gitignore_path=gitignore_path, logger_ignored=None, # Logger no longer passed on_save_success_callback=self._handle_gitignore_save # Method to call on save ) # Code execution pauses here until the editor window is closed log_handler.log_debug("Gitignore editor window closed.", func_name=func_name) # Update status bar only if no async operation was started by the callback if not self.main_frame.status_bar_var.get().startswith("Processing"): self.main_frame.update_status_bar(status_after_edit) except Exception as e: # Handle errors opening the editor log_handler.log_exception( f"Error opening or running .gitignore editor: {e}", func_name=func_name ) status_after_edit = "Error opening .gitignore editor." self.main_frame.show_error("Editor Error", f"Could not open editor:\n{e}") self.main_frame.update_status_bar(status_after_edit) def open_diff_viewer(self, file_status_line: str): """ Opens the Diff Viewer window for a file from the 'changed files' list. Compares Working Directory vs HEAD by default for changed files. Uses helper to extract path. This is a synchronous GUI action. """ func_name: str = "open_diff_viewer" log_handler.log_debug( f"Received file_status_line: {repr(file_status_line)}", func_name=func_name ) log_handler.log_info( f"--- Action Triggered: Open Diff Viewer for Changed File ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Open Diff Viewer") if not svn_path: # Error message shown by validation method self.main_frame.update_status_bar( "Error: Cannot open diff (invalid repo path)." ) return # Extract the relative path using the helper method relative_path: Optional[str] = self._extract_path_from_status_line( file_status_line ) log_handler.log_debug( f"Extracted relative_path via helper: {repr(relative_path)}", func_name=func_name ) # Check if path extraction was successful if not relative_path: log_handler.log_error( f"Could not extract valid path from status line: {file_status_line}", func_name=func_name ) self.main_frame.show_error( "Path Error", f"Could not parse file path from selected line:\n{file_status_line}" ) self.main_frame.update_status_bar("Error: Invalid selection for diff.") return # --- Check Status Code (e.g., prevent diff for deleted files vs WD) --- status_code: str = file_status_line.strip('\x00').strip()[:2].strip() # Prevent showing diff for files marked as Deleted (' D') against Working Dir if status_code == 'D': msg: str = ( f"Cannot show Working Dir vs HEAD diff for a deleted file:\n" f"{relative_path}" ) log_handler.log_info(msg, func_name=func_name) self.main_frame.show_info("Diff Not Applicable", msg) self.main_frame.update_status_bar( "Ready (Diff not applicable for deleted file)." ) return # Add checks for other non-diffable statuses if needed (e.g., '??', '!!') if status_code in ['??', '!!']: msg: str = ( f"Cannot show diff for file with status '{status_code}':\n" f"{relative_path}\n\n" f"(Untracked or Ignored files cannot be diffed against HEAD)." ) log_handler.log_info( f"Diff not applicable for status '{status_code}'.", func_name=func_name ) self.main_frame.show_info("Diff Not Applicable", msg) self.main_frame.update_status_bar("Ready (Diff not applicable).") return # --- Open DiffViewerWindow --- log_handler.log_debug( f"Opening DiffViewerWindow for '{relative_path}' (Working Dir vs HEAD)", func_name=func_name ) status_final: str = "Ready." # Default status after closing viewer try: # Instantiate and display the modal DiffViewerWindow DiffViewerWindow( master=self.master, # Parent window git_commands=self.git_commands, # Pass GitCommands instance repo_path=svn_path, relative_file_path=relative_path, # Use the cleaned path ref1='WORKING_DIR', # Compare working directory... ref2='HEAD' # ...against HEAD commit ) # Code execution pauses here until the DiffViewerWindow is closed log_handler.log_debug("Diff viewer window closed.", func_name=func_name) status_final = "Ready." except Exception as e: # Handle errors opening the diff viewer window log_handler.log_exception( f"Error opening or running diff viewer: {e}", func_name=func_name ) status_final = "Error: Failed to open diff viewer." self.main_frame.show_error( "Diff Viewer Error", f"Could not display diff:\n{e}" ) finally: # Update status bar after the window closes or an error occurs, # but only if another async operation hasn't started in the meantime. if hasattr(self, "main_frame") and not self.main_frame.status_bar_var.get().startswith("Processing"): self.main_frame.update_status_bar(status_final) def _handle_gitignore_save(self): """ Callback executed after .gitignore is saved successfully by the editor. Starts an asynchronous task to check for and untrack files if necessary. """ func_name: str = "_handle_gitignore_save" log_handler.log_info( "Callback: .gitignore saved. Starting async untrack check.", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Untrack Check after Gitignore Save") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_error( "Cannot start untrack check: Invalid/Not ready path.", func_name=func_name ) self.main_frame.update_status_bar( "Error: Untrack check failed (invalid path)." ) return # Prepare args and start the untrack worker args: tuple = (self.action_handler, svn_path) self._start_async_operation( worker_func=async_workers.run_untrack_async, args_tuple=args, context_dict={ "context": "_handle_gitignore_save", # Context identifies origin "status_msg": "Checking files to untrack", "committed_flag_possible": True # Untracking involves a commit } ) def add_selected_file(self, file_status_line: str): """ Starts async operation to add a selected untracked file ('??') to staging. """ func_name: str = "add_selected_file" log_handler.log_info( f"--- Action Triggered: Add File '{file_status_line}' (Async) ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Add File") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Add file failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Add failed: Repo not ready.") return # Extract relative path from the status line (should start with '??') relative_path: Optional[str] = None try: line: str = file_status_line.strip("\x00").strip() # Only proceed if the status indicates an untracked file if line.startswith("??"): # Extract path after '?? ' handling potential quotes path_raw: str = line[2:].lstrip() if len(path_raw) >= 2 and path_raw.startswith('"') and path_raw.endswith('"'): relative_path = path_raw[1:-1] else: relative_path = path_raw else: # Show error if trying to add a non-untracked file log_handler.log_error( f"Cannot add non-untracked file: {line}", func_name=func_name ) self.main_frame.show_error( "Invalid Action", f"Cannot 'Add' file with status '{line[:2]}'.\nUse commit for modified/staged files." ) self.main_frame.update_status_bar("Add failed: Not an untracked file.") return # Check if path extraction failed if not relative_path: raise ValueError("Extracted relative path is empty.") except Exception as e: # Handle errors parsing the path log_handler.log_error( f"Error parsing path for add from line '{file_status_line}': {e}", func_name=func_name ) self.main_frame.show_error( "Parsing Error", f"Cannot parse file path from:\n{file_status_line}" ) self.main_frame.update_status_bar("Add failed: Parse error.") return # Prepare args and start the add worker args: tuple = (self.git_commands, svn_path, relative_path) base_filename: str = os.path.basename(relative_path) # For status message self._start_async_operation( worker_func=async_workers.run_add_file_async, args_tuple=args, context_dict={ "context": "add_file", "status_msg": f"Adding '{base_filename}'" } ) def create_tag(self): """ Handles tag creation: shows dialog, suggests name, starts async operation. """ func_name: str = "create_tag" log_handler.log_info( f"--- Action Triggered: Create Tag ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Create Tag") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Create Tag failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Create Tag failed: Repo not ready.") return # Generate suggested tag name and show dialog self.main_frame.update_status_bar("Processing: Generating tag suggestion...") suggested_name: str = self._generate_next_tag_suggestion(svn_path) self.main_frame.update_status_bar("Ready for tag input.") dialog = CreateTagDialog(self.master, suggested_tag_name=suggested_name) tag_info: Optional[tuple[str, str]] = dialog.result # (name, message) or None # If user provided input, start async operation if tag_info: tag_name, tag_message = tag_info log_handler.log_info(f"User provided tag: '{tag_name}'", func_name=func_name) # Prepare args args: tuple = (self.action_handler, svn_path, tag_name, tag_message) # Start async operation self._start_async_operation( worker_func=async_workers.run_create_tag_async, args_tuple=args, context_dict={ "context": "create_tag", "status_msg": f"Creating tag '{tag_name}'", "committed_flag_possible": True # Annotated tag creates commit object } ) else: # User cancelled the dialog log_handler.log_info("Tag creation cancelled.", func_name=func_name) self.main_frame.update_status_bar("Cancelled.") def checkout_tag(self): """ Handles tag checkout: confirms with user, starts async operation. """ func_name: str = "checkout_tag" log_handler.log_info( f"--- Action Triggered: Checkout Tag ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Checkout Tag") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Checkout Tag failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Checkout Tag failed: Repo not ready.") return # Get selected tag from GUI listbox tag_name: Optional[str] = self.main_frame.get_selected_tag() if not tag_name: # Show error if no tag is selected self.main_frame.show_error( "Selection Error", "No tag selected from the list." ) self.main_frame.update_status_bar("Checkout failed: No tag selected.") return # Confirm with user due to 'detached HEAD' state implications confirmation_message: str = ( f"Checkout tag '{tag_name}'?\n\n" f"Warning: This will put your repository in a 'detached HEAD' state. " f"You can look around, make experimental changes and commit them, " f"but they won't belong to any branch. " f"Use 'Checkout Branch' to return to a branch." ) if not self.main_frame.ask_yes_no("Confirm Checkout Tag", confirmation_message): # User cancelled log_handler.log_info("Tag checkout cancelled by user.", func_name=func_name) self.main_frame.update_status_bar("Cancelled.") return # Prepare args and start async operation args: tuple = (self.action_handler, svn_path, tag_name) self._start_async_operation( worker_func=async_workers.run_checkout_tag_async, args_tuple=args, context_dict={ "context": "checkout_tag", "status_msg": f"Checking out tag '{tag_name}'" } ) def create_branch(self): """ Handles branch creation: shows dialog, starts async operation. """ func_name: str = "create_branch" log_handler.log_info( f"--- Action Triggered: Create Branch ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Create Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Create Branch failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Create Branch failed: Repo not ready.") return # Show dialog to get new branch name self.main_frame.update_status_bar("Ready for branch name input.") dialog = CreateBranchDialog(self.master) branch_name: Optional[str] = dialog.result # Name or None if cancelled # If user provided a name, start async operation if branch_name: log_handler.log_info( f"User provided branch name: '{branch_name}'", func_name=func_name ) # Prepare args args: tuple = (self.action_handler, svn_path, branch_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_create_branch_async, args_tuple=args, context_dict={ "context": "create_branch", "status_msg": f"Creating branch '{branch_name}'", "new_branch_name": branch_name # Pass name for potential checkout later } ) else: # User cancelled the dialog log_handler.log_info("Branch creation cancelled.", func_name=func_name) self.main_frame.update_status_bar("Cancelled.") def checkout_branch( self, branch_to_checkout: Optional[str] = None, repo_path_override: Optional[str] = None ): """ Handles checkout of an existing local branch. Confirms if triggered by button, starts async operation. Can be called directly with a branch name (e.g., after creation). """ func_name: str = "checkout_branch" target_description: str = branch_to_checkout if branch_to_checkout else "Selected Branch" log_handler.log_info( f"--- Action Triggered: Checkout Branch (Target: {target_description}) ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Determine repository path (use override if provided) svn_path: Optional[str] = repo_path_override or self._get_and_validate_svn_path("Checkout Branch") # Validate repo path and readiness if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Checkout Branch failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Checkout Branch failed: Repo not ready.") return # Determine target branch and if confirmation is needed target_branch: Optional[str] = branch_to_checkout needs_confirmation: bool = False if not target_branch: # If no branch name passed, get selection from GUI and require confirmation target_branch = self.main_frame.get_selected_branch() needs_confirmation = True # Validate if a branch was determined if not target_branch: self.main_frame.show_error( "Selection Error", "No branch selected from the list." ) self.main_frame.update_status_bar("Checkout failed: No branch selected.") return # Ask for confirmation only if triggered by button press (not directly called) if needs_confirmation: if not self.main_frame.ask_yes_no( "Confirm Checkout Branch", f"Switch to branch '{target_branch}'?" ): log_handler.log_info( "Branch checkout cancelled by user.", func_name=func_name ) self.main_frame.update_status_bar("Cancelled.") return # Prepare args and start async operation args: tuple = (self.action_handler, svn_path, target_branch) self._start_async_operation( worker_func=async_workers.run_checkout_branch_async, args_tuple=args, context_dict={ "context": "checkout_branch", "status_msg": f"Checking out branch '{target_branch}'" } ) def delete_local_branch(self, branch_name: str, force: bool): """ Handles the request to delete a local branch (with confirmation). """ func_name: str = "delete_local_branch" action_description: str = "Force delete" if force else "Delete" log_handler.log_info( f"--- Action Triggered: {action_description} Local Branch '{branch_name}' ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path(f"{action_description} Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( f"{action_description} Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.update_status_bar(f"{action_description} failed: Repo not ready.") return # --- User Confirmation --- confirm_message: str = f"Are you sure you want to delete the local branch '{branch_name}'?" dialog_title: str = "Confirm Delete Branch" ask_function: Callable = self.main_frame.ask_yes_no # Default confirmation # Add warning for force delete if force: confirm_message += "\n\nWARNING: Force delete will discard any unmerged changes on this branch!" dialog_title = "Confirm Force Delete Branch" # Ask user for confirmation if not ask_function(dialog_title, confirm_message): log_handler.log_info( "Local branch deletion cancelled by user.", func_name=func_name ) self.main_frame.update_status_bar("Delete branch cancelled.") return # --- Start Async Worker --- log_handler.log_info( f"Starting {action_description.lower()} for local branch '{branch_name}'...", func_name=func_name ) # Prepare arguments args: tuple = (self.action_handler, svn_path, branch_name, force) # Start async operation self._start_async_operation( worker_func=async_workers.run_delete_local_branch_async, args_tuple=args, context_dict={ "context": "delete_local_branch", "status_msg": f"{action_description} branch '{branch_name}'", "branch_name": branch_name, # Pass name for result handling "force": force # Pass force flag } ) def merge_local_branch(self, branch_to_merge: str): """ Handles the request to merge a local branch into the current branch. """ func_name: str = "merge_local_branch" log_handler.log_info( f"--- Action Triggered: Merge Local Branch '{branch_to_merge}' ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path(f"Merge Branch '{branch_to_merge}'") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Merge Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.update_status_bar("Merge failed: Repo not ready.") return # Get current branch for validation and confirmation message current_branch: Optional[str] = None try: current_branch = self.git_commands.get_current_branch_name(svn_path) if not current_branch: raise ValueError("Could not determine the current branch (Detached HEAD?).") if current_branch == branch_to_merge: raise ValueError("Cannot merge a branch into itself.") except (GitCommandError, ValueError) as e: log_handler.log_error(f"Merge aborted during pre-check: {e}", func_name=func_name) self.main_frame.show_error("Merge Error", f"Cannot start merge:\n{e}") self.main_frame.update_status_bar("Merge failed: Pre-check error.") return # --- User Confirmation --- confirm_message: str = f"Merge branch '{branch_to_merge}' into current branch '{current_branch}'?" # Default: allow fast-forward merges without forcing a merge commit no_ff_option: bool = False if not self.main_frame.ask_yes_no("Confirm Merge", confirm_message): log_handler.log_info( "Local branch merge cancelled by user.", func_name=func_name ) self.main_frame.update_status_bar("Merge cancelled.") return # --- Start Async Worker --- log_handler.log_info( f"Starting merge of '{branch_to_merge}' into '{current_branch}'...", func_name=func_name ) # Prepare arguments (pass git_commands for worker's internal checks) args: tuple = ( self.action_handler, self.git_commands, svn_path, branch_to_merge, no_ff_option ) # Start async operation self._start_async_operation( worker_func=async_workers.run_merge_local_branch_async, args_tuple=args, context_dict={ "context": "merge_local_branch", "status_msg": f"Merging '{branch_to_merge}' into '{current_branch}'", "branch_merged_into": current_branch, "branch_merged_from": branch_to_merge, "repo_path": svn_path, # Needed for conflict message } ) def compare_branch_with_current(self, other_branch_ref: str): """ Handles comparing a selected branch (local or remote) with the current branch. """ func_name: str = "compare_branch_with_current" log_handler.log_info( f"--- Action Triggered: Compare Branch '{other_branch_ref}' with Current ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path(f"Compare Branch '{other_branch_ref}'") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Compare Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.update_status_bar("Compare failed: Repo not ready.") return # Get current branch for comparison current_branch: Optional[str] = None try: current_branch = self.git_commands.get_current_branch_name(svn_path) if not current_branch: raise ValueError("Cannot compare: Currently in detached HEAD state.") # Prevent comparing a branch with itself if current_branch == other_branch_ref: log_handler.log_warning( "Compare Branch skipped: Cannot compare a branch with itself.", func_name=func_name ) self.main_frame.show_info("Compare Info", "Cannot compare a branch with itself.") return except (GitCommandError, ValueError) as e: log_handler.log_error(f"Compare aborted during pre-check: {e}", func_name=func_name) self.main_frame.show_error("Compare Error", f"Cannot start compare:\n{e}") return # --- Start Async Worker --- log_handler.log_info( f"Starting comparison between '{current_branch}' and '{other_branch_ref}'...", func_name=func_name ) # Prepare arguments: ref1 is current, ref2 is the other branch args: tuple = (self.git_commands, svn_path, current_branch, other_branch_ref) # Start async operation self._start_async_operation( worker_func=async_workers.run_compare_branches_async, args_tuple=args, context_dict={ "context": "compare_branches", "status_msg": f"Comparing '{current_branch}' vs '{other_branch_ref}'", "ref1": current_branch, # Pass refs for summary window "ref2": other_branch_ref, "repo_path": svn_path, # Pass repo path for summary window } ) # --- Remote Action Launchers --- def apply_remote_config(self): """ Callback for 'Apply Config' button. Starts async worker. """ func_name: str = "apply_remote_config" log_handler.log_info( f"--- Action Triggered: Apply Remote Config ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot apply config: Main frame missing.", func_name=func_name ) return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Apply Remote Config") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Apply config skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Apply config failed: Repo not ready.") return # Get remote URL and name from GUI remote_url: str = self.main_frame.remote_url_var.get().strip() remote_name: str = self.main_frame.remote_name_var.get().strip() # Validate remote URL if not remote_url: log_handler.log_warning( "Apply config failed: Remote URL is empty.", func_name=func_name ) self.main_frame.show_error("Input Error", "Remote URL cannot be empty.") self.main_frame.update_status_bar("Apply config failed: URL empty.") return # Use default remote name if empty if not remote_name: remote_name = DEFAULT_REMOTE_NAME log_handler.log_info( f"Remote name empty, using default: '{remote_name}'", func_name=func_name ) self.main_frame.remote_name_var.set(remote_name) # Save profile settings BEFORE applying to Git config if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue applying remote config anyway?" ): self.main_frame.update_status_bar( "Apply config cancelled (profile save failed)." ) return # Prepare args and start async operation args: tuple = (self.remote_action_handler, svn_path, remote_name, remote_url) self._start_async_operation( worker_func=async_workers.run_apply_remote_config_async, args_tuple=args, context_dict={ "context": "apply_remote_config", "status_msg": f"Applying config for remote '{remote_name}'" } ) def check_connection_auth(self): """ Callback for 'Check Connection & Auth' button. """ func_name: str = "check_connection_auth" log_handler.log_info( f"--- Action Triggered: Check Connection & Auth ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Check Connection") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Check Connection skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) # Reset auth status indicator if repo not ready self._update_gui_auth_status("unknown") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) log_handler.log_info( f"Checking connection/auth for remote '{remote_name}'...", func_name=func_name ) # Update GUI indicator to 'checking' state self._update_gui_auth_status("checking") # Prepare args and start async operation args: tuple = (self.git_commands, svn_path, remote_name) self._start_async_operation( worker_func=async_workers.run_check_connection_async, args_tuple=args, context_dict={ "context": "check_connection", "status_msg": f"Checking remote '{remote_name}'", "remote_name_checked": remote_name, # Pass context for result handler "repo_path_checked": svn_path, } ) def fetch_remote(self): """ Starts the asynchronous 'git fetch' operation. """ func_name: str = "fetch_remote" log_handler.log_info( f"--- Action Triggered: Fetch Remote ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Fetch Remote") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Fetch Remote skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Fetch failed: Repo not ready.") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) # Optional: Check auth status before fetching (consider if needed) # if self.remote_auth_status != 'ok': ... (warning/confirmation) ... log_handler.log_info( f"Starting fetch for remote '{remote_name}'...", func_name=func_name ) # Prepare args and start async operation args: tuple = (self.remote_action_handler, svn_path, remote_name) self._start_async_operation( worker_func=async_workers.run_fetch_remote_async, args_tuple=args, context_dict={ "context": "fetch_remote", "status_msg": f"Fetching from remote '{remote_name}'" } ) def pull_remote(self): """ Starts the asynchronous 'git pull' operation for the current branch. """ func_name: str = "pull_remote" log_handler.log_info( f"--- Action Triggered: Pull Remote ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Pull Remote") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Pull Remote skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Pull failed: Repo not ready.") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) # Check authentication/connection status before attempting pull if self.remote_auth_status != "ok": auth_msg: str = f"Cannot Pull from '{remote_name}':\n" if self.remote_auth_status == "required": auth_msg += "Authentication is required. Use 'Check Connection / Auth' first." elif self.remote_auth_status == "failed": auth_msg += "Authentication previously failed. Use 'Check Connection / Auth' to retry." elif self.remote_auth_status == "connection_failed": auth_msg += "Connection previously failed. Check URL and network." else: auth_msg += "Connection status is unknown or in error. Use 'Check Connection / Auth' first." log_handler.log_warning(f"Pull Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name) self.main_frame.show_warning("Action Blocked", auth_msg) self.main_frame.update_status_bar(f"Pull failed: {self.remote_auth_status}") return # Worker will get current branch name internally log_handler.log_info( f"Starting pull for remote '{remote_name}'...", func_name=func_name ) # Prepare args (pass GitCommands for the worker to get current branch) args: tuple = (self.remote_action_handler, self.git_commands, svn_path, remote_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_pull_remote_async, args_tuple=args, context_dict={ "context": "pull_remote", "status_msg": f"Pulling from remote '{remote_name}'", "repo_path": svn_path, # Pass context for conflict messages "remote_name": remote_name, } ) def push_remote(self): """ Starts the asynchronous 'git push' operation for the current branch. """ func_name: str = "push_remote" log_handler.log_info( f"--- Action Triggered: Push Branch to Remote ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Push Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Push Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Push failed: Repo not ready.") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) # Check authentication/connection status before attempting push if self.remote_auth_status != "ok": auth_msg: str = f"Cannot Push to '{remote_name}':\n" # ... (Build specific auth message as in pull_remote) ... if self.remote_auth_status == "required": auth_msg += "Authentication is required..." # Shortened for brevity elif self.remote_auth_status == "failed": auth_msg += "Authentication previously failed..." elif self.remote_auth_status == "connection_failed": auth_msg += "Connection previously failed..." else: auth_msg += "Connection status is unknown or in error..." log_handler.log_warning( f"Push Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name ) self.main_frame.show_warning("Action Blocked", auth_msg) self.main_frame.update_status_bar(f"Push failed: {self.remote_auth_status}") return # Optional: Check for uncommitted changes before push try: if self.git_commands.git_status_has_changes(svn_path): if not self.main_frame.ask_yes_no( "Uncommitted Changes", "There are uncommitted changes in your working directory.\n" "Push anyway? (Only committed changes will be pushed)" ): self.main_frame.update_status_bar( "Push cancelled by user (uncommitted changes)." ) return except GitCommandError as status_err: # Handle error during status check log_handler.log_error( f"Push aborted: Failed to check repository status before push: {status_err}", func_name=func_name ) self.main_frame.show_error( "Status Error", f"Could not check repo status:\n{status_err}" ) return log_handler.log_info( f"Starting push for current branch to remote '{remote_name}'...", func_name=func_name ) # Worker will get current branch name # Prepare args (pass GitCommands for the worker) args: tuple = (self.remote_action_handler, self.git_commands, svn_path, remote_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_push_remote_async, args_tuple=args, context_dict={ "context": "push_remote", "status_msg": f"Pushing current branch to remote '{remote_name}'", "remote_name": remote_name, # Pass context for result messages } ) def push_tags_remote(self): """ Starts the asynchronous 'git push --tags' operation. """ func_name: str = "push_tags_remote" log_handler.log_info( f"--- Action Triggered: Push Tags to Remote ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Push Tags") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Push Tags skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Push tags failed: Repo not ready.") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) # Check authentication/connection status if self.remote_auth_status != "ok": auth_msg: str = f"Cannot Push Tags to '{remote_name}':\n" # ... (Build specific auth message as in push_remote) ... log_handler.log_warning( f"Push Tags skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name ) self.main_frame.show_warning("Action Blocked", auth_msg) self.main_frame.update_status_bar(f"Push tags failed: {self.remote_auth_status}") return # Confirm with user before pushing all local tags if not self.main_frame.ask_yes_no( "Confirm Push Tags", f"Push all local tags to remote '{remote_name}'?\n" f"(Existing tags on the remote with the same name will " f"NOT be overwritten unless forced, which this action does not do)." ): self.main_frame.update_status_bar("Push tags cancelled by user.") return log_handler.log_info( f"Starting push tags to remote '{remote_name}'...", func_name=func_name ) # Prepare args args: tuple = (self.remote_action_handler, svn_path, remote_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_push_tags_async, args_tuple=args, context_dict={ "context": "push_tags_remote", "status_msg": f"Pushing tags to remote '{remote_name}'", "remote_name": remote_name, # Pass context for messages } ) def clone_remote_repo(self): """ Handles the 'Clone from Remote...' action: shows dialog, validates, starts worker. """ func_name: str = "clone_remote_repo" log_handler.log_info( f"--- Action Triggered: Clone Remote Repository ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot start clone: Main frame not available.", func_name=func_name ) return # Show modal dialog to get clone parameters dialog = CloneFromRemoteDialog(self.master) # Result is None if cancelled, or (url, parent_dir, profile_name_input) dialog_result: Optional[tuple[str, str, str]] = dialog.result if not dialog_result: log_handler.log_info( "Clone operation cancelled by user in dialog.", func_name=func_name ) self.main_frame.update_status_bar("Clone cancelled.") return # Extract data from dialog result remote_url, local_parent_dir, profile_name_input = dialog_result # --- Derive target directory and profile name, validate paths --- final_profile_name: str = "" target_clone_dir: str = "" repo_name_from_url: str = "" try: # Derive repo name from URL (remove .git suffix) repo_name_from_url = os.path.basename(remote_url) if repo_name_from_url.endswith(".git"): repo_name_from_url = repo_name_from_url[:-4] if not repo_name_from_url: raise ValueError("Could not derive repository name from URL.") # Construct full target path for the clone target_clone_dir = os.path.join(local_parent_dir, repo_name_from_url) # Normalize the path target_clone_dir = os.path.abspath(target_clone_dir) # Determine final profile name (use input or derive from repo name) if profile_name_input: final_profile_name = profile_name_input # Check if proposed profile name already exists if final_profile_name in self.config_manager.get_profile_sections(): raise ValueError( f"Profile name '{final_profile_name}' already exists. " f"Please choose a different name." ) else: # Use repo name as base, add counter if it exists final_profile_name = repo_name_from_url counter: int = 1 while final_profile_name in self.config_manager.get_profile_sections(): final_profile_name = f"{repo_name_from_url}_{counter}" counter += 1 log_handler.log_debug( f"Derived target clone directory: {target_clone_dir}", func_name=func_name ) log_handler.log_debug( f"Determined profile name: {final_profile_name}", func_name=func_name ) # --- CRITICAL CHECK: Target directory must NOT exist --- if os.path.exists(target_clone_dir): error_msg: str = ( f"Clone failed: Target directory already exists:\n{target_clone_dir}\n" f"Please choose a different parent directory or ensure the target is clear." ) log_handler.log_error(error_msg, func_name=func_name) self.main_frame.show_error("Clone Path Error", error_msg) self.main_frame.update_status_bar("Clone failed: Target directory exists.") return # Stop the operation except ValueError as ve: # Handle errors deriving names or validating profile name log_handler.log_error( f"Clone configuration error: {ve}", func_name=func_name ) self.main_frame.show_error("Configuration Error", str(ve)) self.main_frame.update_status_bar("Clone failed: Configuration error.") return except Exception as e: # Handle unexpected errors during preparation log_handler.log_exception( f"Unexpected error preparing for clone: {e}", func_name=func_name ) self.main_frame.show_error( "Internal Error", f"An unexpected error occurred:\n{e}" ) self.main_frame.update_status_bar("Clone failed: Internal error.") return # --- Start Asynchronous Worker --- log_handler.log_info( f"Starting clone for '{remote_url}' into '{target_clone_dir}'...", func_name=func_name ) # Prepare arguments args: tuple = (self.git_commands, remote_url, target_clone_dir, final_profile_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_clone_remote_async, args_tuple=args, context_dict={ "context": "clone_remote", "status_msg": f"Cloning '{repo_name_from_url}'...", # Use repo name "clone_success_data": { # Pass data needed for profile creation "profile_name": final_profile_name, "cloned_path": target_clone_dir, "remote_url": remote_url, } } ) def refresh_remote_branches(self): """ Starts the asynchronous refresh of the remote branches list. """ func_name: str = "refresh_remote_branches" log_handler.log_info( f"--- Action Triggered: Refresh Remote Branches List ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Remote Branches") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Refresh Remote Branches skipped: Repo not ready.", func_name=func_name ) self._update_gui_for_not_ready_state() # Update GUI lists return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) log_handler.log_info( f"Starting refresh of remote branches list for '{remote_name}'...", func_name=func_name ) # Update GUI list to show "Loading..." if hasattr(self.main_frame, "update_remote_branches_list"): self.main_frame.update_remote_branches_list(["(Loading...)"]) # Prepare args and start async operation args: tuple = (self.git_commands, svn_path, remote_name) self._start_async_operation( worker_func=async_workers.run_refresh_remote_branches_async, args_tuple=args, context_dict={ "context": "refresh_remote_branches", "status_msg": f"Refreshing remote branches for '{remote_name}'" } ) def checkout_remote_branch_as_local( self, remote_branch_full: str, local_branch_suggestion: str ): """ Handles checkout of a remote branch as a new local tracking branch. Checks if local branch exists, confirms if needed, starts async worker. """ func_name: str = "checkout_remote_branch_as_local" log_handler.log_info( f"--- Action Triggered: Checkout Remote Branch '{remote_branch_full}' as Local '{local_branch_suggestion}' ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Checkout Remote Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Checkout Remote Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) # No status bar update here as it's usually from a menu click return try: # Check if a local branch with the suggested name already exists local_branches: list[str] current: Optional[str] local_branches, current = self.git_commands.list_branches(svn_path) if local_branch_suggestion in local_branches: # If it exists, ask user if they want to checkout the existing one instead log_handler.log_warning( f"Local branch '{local_branch_suggestion}' already exists.", func_name=func_name ) if self.main_frame.ask_yes_no( "Branch Exists", f"A local branch named '{local_branch_suggestion}' already exists.\n\n" f"Do you want to check out the existing local branch instead?" ): # User wants to checkout existing local branch log_handler.log_info( f"User chose to checkout existing local branch '{local_branch_suggestion}'.", func_name=func_name ) # Call the standard checkout function, passing the path override self.checkout_branch( branch_to_checkout=local_branch_suggestion, repo_path_override=svn_path ) else: # User cancelled log_handler.log_info( "Checkout cancelled because local branch exists.", func_name=func_name ) self.main_frame.update_status_bar("Checkout cancelled.") # Exit in either case (another async task started or cancelled) return # If local branch doesn't exist, proceed to create tracking branch log_handler.log_info( f"Starting checkout of '{remote_branch_full}' as new local branch '{local_branch_suggestion}'...", func_name=func_name ) # Prepare args and start async operation args: tuple = ( self.action_handler, svn_path, local_branch_suggestion, remote_branch_full, ) self._start_async_operation( worker_func=async_workers.run_checkout_tracking_branch_async, args_tuple=args, context_dict={ "context": "checkout_tracking_branch", "status_msg": f"Checking out '{local_branch_suggestion}' tracking '{remote_branch_full}'" } ) except Exception as e: # Handle errors during local branch check or starting the worker log_handler.log_exception( f"Error preparing for tracking branch checkout: {e}", func_name=func_name ) if hasattr(self, "main_frame"): self.main_frame.show_error( "Checkout Error", f"Could not start checkout operation:\n{e}" ) self.main_frame.update_status_bar("Checkout failed: Internal error.") # --- Simplified Queue Checking Method (Refactored) --- def _check_completion_queue(self, results_queue: queue.Queue, context: dict): """ Checks result queue, updates status bar briefly, delegates processing. """ task_context: str = context.get('context', 'unknown') func_name: str = "_check_completion_queue" try: # Get result without blocking result_data: Dict[str, Any] = results_queue.get_nowait() log_handler.log_info( f"Result received for '{task_context}'. Status: {result_data.get('status')}", func_name=func_name ) # Determine if widgets should be re-enabled immediately should_reenable_now: bool = self._should_reenable_widgets_now( task_context, result_data.get('status') ) if not should_reenable_now: log_handler.log_debug( f"Postponing widget re-enable for context: {task_context}", func_name=func_name ) # Re-enable widgets now if appropriate if should_reenable_now: self._reenable_widgets_if_ready() # Update status bar with generic message (may be overridden by handler) self._update_status_bar_from_result(task_context, result_data) # Process the result using the dedicated handler self._process_result_with_handler(result_data, context) except queue.Empty: # Queue empty, reschedule check self._reschedule_queue_check(results_queue, context) except Exception as e: # Critical error during queue check or basic handling self._handle_queue_check_error(e, task_context) # --- Helper methods for _check_completion_queue --- def _should_reenable_widgets_now(self, task_context: str, status: Optional[str]) -> bool: """ Determines if GUI widgets should be re-enabled immediately. """ # Don't re-enable immediately if waiting for user interaction or follow-up task if task_context == "check_connection" and status == 'auth_required': return False if task_context == "interactive_auth" and status == 'success': return False if task_context == 'clone_remote' and status == 'success': return False if task_context in ['checkout_tracking_branch', 'checkout_branch', 'checkout_tag'] and status == 'success': return False if task_context in ['pull_remote', 'merge_local_branch'] and status == 'conflict': return False if task_context == 'compare_branches' and status == 'success': return False # Default: Re-enable immediately return True def _reenable_widgets_if_ready(self): """ Safely re-enables action widgets if the main frame exists. """ if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): log_handler.log_debug( "Re-enabling widgets now.", func_name="_reenable_widgets_if_ready" ) self.main_frame.set_action_widgets_state(tk.NORMAL) else: log_handler.log_warning( "Cannot re-enable widgets, MainFrame missing.", func_name="_reenable_widgets_if_ready" ) def _update_status_bar_from_result(self, task_context: str, result_data: dict): """ Updates status bar based on result, unless specific conditions apply. """ status: Optional[str] = result_data.get('status') message: str = result_data.get('message', "Operation finished.") # Conditions where status bar update is skipped or handled differently skip_update: bool = False if (task_context == 'clone_remote' and status == 'success'): skip_update = True if (task_context in ['checkout_tracking_branch', 'checkout_branch', 'checkout_tag'] and status == 'success'): skip_update = True if (task_context == 'compare_branches' and status == 'success'): skip_update = True if status in ['conflict', 'rejected']: skip_update = True # Update status bar if not skipped and main frame exists if not skip_update and hasattr(self, "main_frame") and self.main_frame.winfo_exists(): status_color: Optional[str] = None reset_duration: int = 5000 # Default reset time # Determine color and duration based on status if status == 'success': status_color = self.main_frame.STATUS_GREEN elif status == 'warning': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 7000 elif status == 'auth_required': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 15000 elif status == 'conflict': status_color = self.main_frame.STATUS_RED; reset_duration = 15000 elif status == 'rejected': status_color = self.main_frame.STATUS_RED; reset_duration = 15000 elif status == 'error': status_color = self.main_frame.STATUS_RED; reset_duration = 10000 # Call the GUI update method self.main_frame.update_status_bar( message, bg_color=status_color, duration_ms=reset_duration ) def _process_result_with_handler(self, result_data: dict, context: dict): """ Instantiates and calls the AsyncResultHandler to process the result. """ task_context: str = context.get('context', 'unknown') func_name: str = "_process_result_with_handler" try: # Create handler instance, passing the current app instance result_handler = AsyncResultHandler(self) # Delegate detailed processing result_handler.process(result_data, context) log_handler.log_debug( f"Result processing delegated to handler for context '{task_context}'.", func_name=func_name ) except Exception as handler_e: # Handle errors occurring *within* the result handler log_handler.log_exception( f"Error during result processing by handler for {task_context}: {handler_e}", func_name=func_name ) # Show error to user if hasattr(self, "main_frame"): self.main_frame.show_error( "Processing Error", f"Failed to handle task result:\n{handler_e}" ) # Ensure widgets are re-enabled if handler fails unexpectedly if not self._should_reenable_widgets_now(task_context, result_data.get('status')) and \ hasattr(self.main_frame, "winfo_exists") and self.main_frame.winfo_exists(): log_handler.log_warning( "Re-enabling widgets after handler error.", func_name=func_name ) self.main_frame.set_action_widgets_state(tk.NORMAL) def _reschedule_queue_check(self, results_queue: queue.Queue, context: dict): """ Reschedules the check for the completion queue if the app is running. """ if hasattr(self, "master") and self.master.winfo_exists(): self.master.after( self.ASYNC_QUEUE_CHECK_INTERVAL_MS, self._check_completion_queue, results_queue, context ) def _handle_queue_check_error(self, error: Exception, task_context: str): """ Handles critical errors during the queue check process itself. """ func_name: str = "_handle_queue_check_error" log_handler.log_exception( f"Critical error checking completion queue for {task_context}: {error}", func_name=func_name ) try: # Attempt GUI recovery if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): # Re-enable widgets self.main_frame.set_action_widgets_state(tk.NORMAL) # Show error in status bar self.main_frame.update_status_bar( "Error processing async result.", bg_color=self.main_frame.STATUS_RED, duration_ms=10000 ) # Reset other relevant GUI states (e.g., sync status) self._update_gui_for_status_error() except Exception as recovery_e: # Log error during recovery attempt log_handler.log_error( f"Failed to recover GUI after queue processing error: {recovery_e}", func_name=func_name ) # --- Helper per Suggestion Tag (Metodo interno) --- def _generate_next_tag_suggestion(self, svn_path: str) -> str: """ Generates a suggested tag name based on the latest v.X.X.X.X tag. """ func_name: str = "_generate_next_tag_suggestion" log_handler.log_debug("Generating next tag suggestion...", func_name=func_name) default_suggestion: str = "v.0.0.0.1" latest_valid_tag: Optional[str] = None # Regex to match tags like v.1.2.3.4 tag_pattern = re.compile(r"^v\.(\d+)\.(\d+)\.(\d+)\.(\d+)$") try: # Get existing tags sorted by creation date (newest first) tags_data: list[tuple[str, str]] = self.git_commands.list_tags(svn_path) if not tags_data: log_handler.log_debug( "No existing tags found. Suggesting default.", func_name=func_name ) return default_suggestion # Find the latest tag matching the pattern for tag_name, _ in tags_data: match = tag_pattern.match(tag_name) if match: latest_valid_tag = tag_name log_handler.log_debug( f"Found latest tag matching pattern: {latest_valid_tag}", func_name=func_name ) break # Found the newest matching tag # If no matching tag found, return default if not latest_valid_tag: log_handler.log_debug( "No tags matched the pattern v.X.X.X.X. Suggesting default.", func_name=func_name ) return default_suggestion # Extract version numbers from the latest tag match = tag_pattern.match(latest_valid_tag) # This check should ideally not fail if latest_valid_tag was set correctly if not match: log_handler.log_error( f"Internal error: Could not re-match tag {latest_valid_tag}", func_name=func_name ) return default_suggestion # Increment the version numbers (with carry-over logic) v1, v2, v3, v4 = map(int, match.groups()) limit: int = 99 # Assuming max 99 for each part v4 += 1 if v4 > limit: v4 = 0 v3 += 1 if v3 > limit: v3 = 0 v2 += 1 if v2 > limit: v2 = 0 v1 += 1 # Handle potential overflow of v1 if needed, or just let it increase # Format the next suggested tag name next_tag: str = f"v.{v1}.{v2}.{v3}.{v4}" log_handler.log_debug( f"Generated suggestion: {next_tag}", func_name=func_name ) return next_tag except Exception as e: # Log errors during suggestion generation and return default log_handler.log_exception( f"Error generating tag suggestion: {e}", func_name=func_name ) return default_suggestion # --- Application Entry Point --- def main(): """ Main function to initialize and run the Tkinter application. """ root: Optional[tk.Tk] = None app: Optional[GitSvnSyncApp] = None try: # Create the main Tkinter window print("Creating Tkinter root window...") root = tk.Tk() # Set minimum window size root.minsize(850, 750) print("Tkinter root window created.") # Initialize the main application controller print("Initializing GitSvnSyncApp...") app = GitSvnSyncApp(root) print("GitSvnSyncApp initialization attempt complete.") # Start the Tkinter event loop only if GUI initialization was successful if hasattr(app, "main_frame") and app.main_frame and app.main_frame.winfo_exists(): print("Starting Tkinter main event loop.") root.mainloop() print("Tkinter main event loop finished.") else: # If GUI init failed, an error was likely shown already print( "CRITICAL: App init failed before mainloop could start. Exiting.", file=sys.stderr ) # Ensure root window is destroyed if it still exists if root and root.winfo_exists(): try: root.destroy() except Exception: pass # Ignore errors during final cleanup except Exception as e: # Catch unexpected errors during startup or runtime print(f"FATAL error during application execution: {e}", file=sys.stderr) traceback.print_exc() # Print detailed traceback # Attempt to show a final error message in a popup try: parent_window = root if root and root.winfo_exists() else None messagebox.showerror( "Fatal Application Error", f"Application failed unexpectedly:\n{e}", parent=parent_window ) except Exception as msg_e: # Fallback if even the error message fails print(f"FATAL (GUI error message failed: {msg_e}):\n{e}", file=sys.stderr) finally: # Log application exit print("Application exiting.") # Standard Python entry point check if __name__ == "__main__": main() # --- END OF FILE GitUtility.py ---