# --- FILE: gitsync_tool/app.py --- import os import datetime import tkinter as tk from tkinter import messagebox, filedialog # Import specifici da tkinter import logging # Mantenuto per usare i livelli (es. logging.DEBUG) import re import threading import queue import traceback import sys from typing import Callable, List, Dict, Any, Tuple, Optional # ---<<< MODIFICA IMPORT: Usa percorsi assoluti dal pacchetto >>>--- # Configuration management from gitutility.config.config_manager import ConfigManager from gitutility.config.config_manager import ( DEFAULT_PROFILE, DEFAULT_BACKUP_DIR, DEFAULT_REMOTE_NAME, ) # Core logic handlers from gitutility.core.action_handler import ActionHandler from gitutility.core.backup_handler import BackupHandler from gitutility.core.remote_actions import RemoteActionHandler # Command execution wrapper from gitutility.commands.git_commands import GitCommands, GitCommandError # Logging system from gitutility.logging_setup import log_handler # Funzioni per inviare log alla coda from gitutility.logging_setup.logger_config import ( setup_file_logging, ) # Funzione setup file log # GUI components (dai nuovi moduli specifici) from gitutility.gui.main_frame import MainFrame from gitutility.gui.editors import GitignoreEditorWindow from gitutility.gui.dialogs import ( CreateTagDialog, CreateBranchDialog, CloneFromRemoteDialog, ) from gitutility.gui.diff_summary_viewer import DiffSummaryWindow from gitutility.gui.commit_detail_window import CommitDetailWindow from gitutility.gui.diff_viewer import DiffViewerWindow # Asynchronous operations from gitutility.async_tasks import async_workers # Worker functions from gitutility.async_tasks.async_result_handler import ( AsyncResultHandler, ) # Result processor # ---<<< FINE MODIFICA IMPORT >>>--- class GitSvnSyncApp: """ Main application controller class for the Git Sync Tool. Orchestrates GUI (MainFrame) and backend actions (ActionHandler, RemoteActionHandler, GitCommands, etc.) using asynchronous operations via async_workers and processes results via AsyncResultHandler. Manages application state and configuration loading/saving. Uses a centralized logging queue (log_handler) processed here. """ # Constants for polling intervals (in milliseconds) LOG_QUEUE_CHECK_INTERVAL_MS: int = 100 ASYNC_QUEUE_CHECK_INTERVAL_MS: int = 100 def __init__(self, master: tk.Tk): """ Initializes the application components, GUI, logging, and starts polling loops. Args: master (tk.Tk): The main Tkinter root window. """ self.master: tk.Tk = master master.title("Git Utility (Bundle & Remote Manager)") # Define behavior on window close button press master.protocol("WM_DELETE_WINDOW", self.on_closing) # Initial logging (console only, file logging setup later) print("Initializing GitSvnSyncApp...") # Use log_handler early (it just puts in queue, processed later) log_handler.log_debug( "GitSvnSyncApp initialization started.", func_name="__init__" ) # --- Initialize Core Backend Components --- # Instantiate managers and handlers first try: # Config manager loads config on init self.config_manager: ConfigManager = ConfigManager() self.git_commands: GitCommands = GitCommands() self.backup_handler: BackupHandler = BackupHandler() # Action Handlers depend on GitCommands and BackupHandler self.action_handler: ActionHandler = ActionHandler( self.git_commands, self.backup_handler ) self.remote_action_handler: RemoteActionHandler = RemoteActionHandler( self.git_commands ) # Internal state variables self.remote_auth_status: str = ( "unknown" # ok, required, failed, connection_failed, etc. ) self.current_local_branch: Optional[str] = ( None # Store the currently checked-out branch name ) print("Core backend components initialized.") log_handler.log_debug( "Core backend components initialized successfully.", func_name="__init__", ) except Exception as e: print( f"FATAL: Failed to initialize core backend components: {e}", file=sys.stderr, ) log_handler.log_critical( f"Failed to initialize core backend components: {e}", func_name="__init__", ) # Show error graphically if possible before exiting self.show_fatal_error( f"Initialization Error:\n{e}\n\nApplication cannot start." ) # Ensure window closes even if init fails self.master.after(10, self.on_closing) # Schedule closing return # Stop initialization # --- Initialize Graphical User Interface (GUI) --- # Create the main application window frame try: print("Creating MainFrame GUI...") log_handler.log_debug("Creating MainFrame GUI.", func_name="__init__") # Instantiate the MainFrame, passing necessary controller methods as callbacks # Use keyword arguments for clarity and robustness self.main_frame: MainFrame = MainFrame( master=master, # Profile Callbacks load_profile_settings_cb=self.load_profile_settings, save_profile_cb=self.save_profile_settings, add_profile_cb=self.add_profile, remove_profile_cb=self.remove_profile, clone_remote_repo_cb=self.clone_remote_repo, # Path/Status Callbacks browse_folder_cb=self.browse_folder, update_svn_status_cb=self.update_svn_status_indicator, # Repo/Bundle Callbacks prepare_svn_for_git_cb=self.prepare_svn_for_git, create_git_bundle_cb=self.create_git_bundle, fetch_from_git_bundle_cb=self.fetch_from_git_bundle, open_gitignore_editor_cb=self.open_gitignore_editor, # Backup Callbacks manual_backup_cb=self.manual_backup, # Commit/Changes Callbacks commit_changes_cb=self.commit_changes, refresh_changed_files_cb=self.refresh_changed_files_list, open_diff_viewer_cb=self.open_diff_viewer, # Diff from changes list add_selected_file_cb=self.add_selected_file, # Tags Callbacks refresh_tags_cb=self.refresh_tag_list, create_tag_cb=self.create_tag, checkout_tag_cb=self.checkout_tag, # Branches Callbacks (Local) refresh_branches_cb=self.refresh_branch_list, create_branch_cb=self.create_branch, checkout_branch_cb=self.checkout_branch, delete_local_branch_cb=self.delete_local_branch, merge_local_branch_cb=self.merge_local_branch, # History/Compare Callbacks refresh_history_cb=self.refresh_commit_history, compare_branch_with_current_cb=self.compare_branch_with_current, view_commit_details_cb=self.view_commit_details, # View commit details # Remote Callbacks apply_remote_config_cb=self.apply_remote_config, check_connection_auth_cb=self.check_connection_auth, fetch_remote_cb=self.fetch_remote, pull_remote_cb=self.pull_remote, push_remote_cb=self.push_remote, push_tags_remote_cb=self.push_tags_remote, refresh_remote_status_cb=self.refresh_remote_status, refresh_remote_branches_cb=self.refresh_remote_branches, checkout_remote_branch_cb=self.checkout_remote_branch_as_local, # Dependencies config_manager_instance=self.config_manager, # Pass instance if needed by GUI profile_sections_list=self.config_manager.get_profile_sections(), # Pass initial profiles ) print("MainFrame GUI created.") log_handler.log_debug( "MainFrame GUI created successfully.", func_name="__init__" ) except Exception as e: print(f"FATAL: Failed to initialize MainFrame GUI: {e}", file=sys.stderr) log_handler.log_exception( "Failed to initialize MainFrame GUI.", func_name="__init__" ) self.show_fatal_error( f"GUI Initialization Error:\n{e}\n\nApplication cannot start." ) self.master.after(10, self.on_closing) return # Stop initialization # --- Setup Logging Processing --- # Configures file logging and starts the GUI log queue poller self._setup_logging_processing() # --- Log Application Start --- log_handler.log_info( "Git Utility Tool application starting up.", func_name="__init__" ) # --- Initial Profile Load --- # Loads the initially selected profile (default or first) into the GUI fields # and triggers initial data refreshes. self._perform_initial_load() # Log completion of initialization log_handler.log_info( "Git Utility Tool initialization complete and ready.", func_name="__init__" ) # --- Static Method Helper --- # (Metodo _extract_path_from_status_line INVARIATO) @staticmethod def _extract_path_from_status_line(file_status_line: str) -> Optional[str]: # ... (Codice invariato) func_name: str = "_extract_path_from_status_line" try: line: str = file_status_line.strip("\x00").strip() if not line or len(line) < 3: log_handler.log_warning( f"Invalid/short status line received: '{file_status_line}'", func_name=func_name, ) return None path_part: str = "" if "->" in line: path_part = line.split("->")[-1].strip() else: match = re.match(r"^[ MARCUD?!]{1,2}\s+(.*)", line) if match: path_part = match.group(1) else: log_handler.log_warning( f"Could not match expected status line format: '{line}'", func_name=func_name, ) first_space_index: int = line.find(" ") if first_space_index != -1: path_part = line[first_space_index:].strip() else: return None if not path_part: return None relative_path: str if ( len(path_part) >= 2 and path_part.startswith('"') and path_part.endswith('"') ): relative_path = path_part[1:-1] else: relative_path = path_part try: if "\\" in relative_path: decoded_bytes: bytes = ( bytes(relative_path, "utf-8") .decode("unicode_escape") .encode("latin-1") ) decoded_path: str = decoded_bytes.decode("utf-8") if decoded_path != relative_path and os.path.normpath(decoded_path): log_handler.log_debug( f"Path '{relative_path}' decoded to '{decoded_path}'", func_name=func_name, ) relative_path = decoded_path except Exception as decode_err: log_handler.log_warning( f"Could not decode potential escape sequences in path '{relative_path}': {decode_err}", func_name=func_name, ) if not relative_path: return None log_handler.log_debug( f"Cleaned path from status line: '{relative_path}'", func_name=func_name ) return relative_path except Exception as e: log_handler.log_exception( f"Error cleaning path from status line '{file_status_line}': {e}", func_name=func_name, ) return None # --- Logging Setup and Processing --- # (Metodi _setup_logging_processing, _process_log_queue INVARIATI) def _setup_logging_processing(self): # ... (Codice invariato) func_name: str = "_setup_logging_processing" try: setup_file_logging(level=logging.DEBUG) # Log DEBUG+ a file if hasattr(self, "main_frame") and hasattr(self.main_frame, "log_text"): log_handler.log_info( "Starting log queue processing for GUI.", func_name=func_name ) self.master.after( self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue ) else: print( "ERROR: Cannot start log queue processing - GUI log widget not found.", file=sys.stderr, ) log_handler.log_error( "Cannot start log queue processing - GUI log widget not found.", func_name=func_name, ) except Exception as e: print(f"ERROR during logging setup: {e}", file=sys.stderr) log_handler.log_exception( "Failed to setup logging processing.", func_name=func_name ) def _process_log_queue(self): # ... (Codice invariato) func_name: str = "_process_log_queue" log_widget: Optional[tk.scrolledtext.ScrolledText] = getattr( self.main_frame, "log_text", None ) if not log_widget or not log_widget.winfo_exists(): log_handler.log_warning( "Log widget not found, stopping queue processing.", func_name=func_name ) return processed_count: int = 0 max_proc_per_cycle: int = 50 while not log_handler.log_queue.empty(): if processed_count >= max_proc_per_cycle: log_handler.log_debug( f"Processed {max_proc_per_cycle} log entries, pausing.", func_name=func_name, ) break try: log_entry: dict = log_handler.log_queue.get_nowait() level: int = log_entry.get("level", logging.INFO) message: str = log_entry.get("message", "") level_name: str = log_handler.get_log_level_name(level) logging.getLogger().log(level, message) processed_count += 1 # Scrive su file handler if level >= logging.DEBUG: # Aggiorna GUI try: original_state: str = log_widget.cget("state") log_widget.config(state=tk.NORMAL) log_widget.insert(tk.END, message + "\n", (level_name,)) log_widget.see(tk.END) log_widget.config(state=original_state) except tk.TclError as e_gui: print( f"TclError updating log widget: {e_gui} - Message: {message}", file=sys.stderr, ) except Exception as e_gui: print( f"Error updating log widget: {e_gui} - Message: {message}", file=sys.stderr, ) except queue.Empty: break except Exception as e_proc: print(f"Error processing log queue item: {e_proc}", file=sys.stderr) try: logging.getLogger().error( f"Error processing log queue item: {e_proc}" ) except Exception: pass if self.master.winfo_exists(): self.master.after(self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue) # --- Initial Application Load --- # (Metodo _perform_initial_load INVARIATO) def _perform_initial_load(self): # ... (Codice invariato) func_name: str = "_perform_initial_load" log_handler.log_debug("Performing initial profile load.", func_name=func_name) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot perform initial load: MainFrame not ready.", func_name=func_name ) return initial_profile: str = self.main_frame.profile_var.get() if initial_profile: log_handler.log_debug( f"Loading initial profile: '{initial_profile}'", func_name=func_name ) self.load_profile_settings(initial_profile) else: log_handler.log_warning( "No initial profile set (no profiles found?).", func_name=func_name ) self._clear_and_disable_fields() self.main_frame.update_status_bar( "No profiles found. Please add a profile." ) # --- Application Closing Handler --- # (Metodo on_closing INVARIATO) def on_closing(self): # ... (Codice invariato) func_name: str = "on_closing" log_handler.log_info("Application closing initiated.", func_name=func_name) if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): try: self.main_frame.update_status_bar("Exiting...") except Exception: pass if self.master and self.master.winfo_exists(): self.master.destroy() log_handler.log_info("Application closed.", func_name=func_name) # --- Profile Management Callbacks --- # (Metodi load_profile_settings, save_profile_settings, add_profile, remove_profile INVARIATI rispetto a versione precedente con controllo URL) def load_profile_settings(self, profile_name: str): # ... (Codice Invariato - incluso controllo URL vuoto prima di check_connection) ... func_name: str = "load_profile_settings" log_handler.log_info( f"Loading settings for profile: '{profile_name}'", func_name=func_name ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot load profile: Main frame not available.", func_name=func_name ) return self.main_frame.update_status_bar( f"Processing: Loading profile '{profile_name}'..." ) if ( not profile_name or profile_name not in self.config_manager.get_profile_sections() ): log_handler.log_warning( f"Profile '{profile_name}' invalid/not found.", func_name=func_name ) self._clear_and_disable_fields() if profile_name: self.main_frame.show_error( "Profile Load Error", f"Profile '{profile_name}' not found." ) status_msg: str = ( f"Error: Profile '{profile_name}' not found." if profile_name else "No profile selected." ) self.main_frame.update_status_bar(status_msg) return cm: ConfigManager = self.config_manager keys_with_defaults: dict = cm._get_expected_keys_with_defaults() settings: dict = {} for key, default_value in keys_with_defaults.items(): settings[key] = cm.get_profile_option( profile_name, key, fallback=default_value ) mf: MainFrame = self.main_frame repo_path_for_refresh: str = "" remote_url_loaded: str = "" try: mf.svn_path_entry.delete(0, tk.END) svn_path_value: str = settings.get("svn_working_copy_path", "") mf.svn_path_entry.insert(0, svn_path_value) repo_path_for_refresh = svn_path_value mf.usb_path_entry.delete(0, tk.END) mf.usb_path_entry.insert(0, settings.get("usb_drive_path", "")) mf.bundle_name_entry.delete(0, tk.END) mf.bundle_name_entry.insert(0, settings.get("bundle_name", "")) mf.bundle_updated_name_entry.delete(0, tk.END) mf.bundle_updated_name_entry.insert( 0, settings.get("bundle_name_updated", "") ) mf.autobackup_var.set( str(settings.get("autobackup", "False")).lower() == "true" ) mf.backup_dir_var.set(settings.get("backup_dir", DEFAULT_BACKUP_DIR)) mf.backup_exclude_extensions_var.set( settings.get("backup_exclude_extensions", "") ) mf.backup_exclude_dirs_var.set(settings.get("backup_exclude_dirs", "")) mf.toggle_backup_dir() mf.autocommit_var.set( str(settings.get("autocommit", "False")).lower() == "true" ) mf.clear_commit_message() if mf.commit_message_text.winfo_exists(): current_state = mf.commit_message_text.cget("state") mf.commit_message_text.config(state=tk.NORMAL) mf.commit_message_text.insert("1.0", settings.get("commit_message", "")) mf.commit_message_text.config(state=current_state) if hasattr(mf, "remote_url_var") and hasattr(mf, "remote_name_var"): remote_url_loaded = settings.get("remote_url", "") mf.remote_url_var.set(remote_url_loaded) mf.remote_name_var.set(settings.get("remote_name", DEFAULT_REMOTE_NAME)) else: log_handler.log_warning( "Remote URL/Name widgets not found in GUI during load.", func_name=func_name, ) log_handler.log_info( f"Applied settings from '{profile_name}' to GUI fields.", func_name=func_name, ) repo_path_for_refresh = mf.svn_path_entry.get() # Rileggi per sicurezza self.update_svn_status_indicator(repo_path_for_refresh) is_ready: bool = self._is_repo_ready(repo_path_for_refresh) if is_ready: log_handler.log_info( "Repo ready, triggering async refreshes.", func_name=func_name ) self.refresh_tag_list() self.refresh_branch_list() self.refresh_commit_history() self.refresh_changed_files_list() if remote_url_loaded: # Controlla URL prima di check connessione log_handler.log_debug( "Remote URL found, initiating connection check.", func_name=func_name, ) self.check_connection_auth() self.refresh_remote_status() else: log_handler.log_info( "Remote URL is empty. Skipping connection check.", func_name=func_name, ) self._update_gui_auth_status("unknown") if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: (Remote not configured)" ) if hasattr(self.main_frame, "refresh_sync_status_button"): self.main_frame.refresh_sync_status_button.config( state=tk.DISABLED ) mf.update_status_bar( f"Profile '{profile_name}' loaded (Remote not configured)." ) else: log_handler.log_info( "Repo not ready, clearing dynamic lists.", func_name=func_name ) self._update_gui_for_not_ready_state() mf.update_status_bar( f"Profile '{profile_name}' loaded (Repo not ready)." ) except Exception as e: log_handler.log_exception( f"Error applying settings for '{profile_name}': {e}", func_name=func_name, ) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: Error" ) mf.show_error("Profile Load Error", f"Failed to apply settings:\n{e}") mf.update_status_bar(f"Error loading profile '{profile_name}'.") def save_profile_settings(self) -> bool: """ Saves current GUI field values to the selected profile in the config file. """ func_name: str = "save_profile_settings" profile_name_from_var: Optional[str] = None if hasattr(self, "main_frame") and hasattr(self.main_frame, "profile_var"): try: profile_name_from_var = self.main_frame.profile_var.get() print(f"DEBUG [save_profile_settings]: Value from self.main_frame.profile_var.get() is: {repr(profile_name_from_var)}") log_handler.log_debug(f"Value from self.main_frame.profile_var.get() is: {repr(profile_name_from_var)}", func_name=func_name) if hasattr(self.main_frame, "profile_dropdown"): current_combobox_value = self.main_frame.profile_dropdown.get() print(f"DEBUG [save_profile_settings]: Value from self.main_frame.profile_dropdown.get() is: {repr(current_combobox_value)}") log_handler.log_debug(f"Value from self.main_frame.profile_dropdown.get() is: {repr(current_combobox_value)}", func_name=func_name) except Exception as e_get: print(f"ERROR [save_profile_settings]: Failed to get profile_var value: {e_get}") log_handler.log_error(f"Failed to get profile_var value: {e_get}", func_name=func_name) profile_name_from_var = None else: print("DEBUG [save_profile_settings]: main_frame or profile_var not found.") log_handler.log_error("main_frame or profile_var not found during save.", func_name=func_name) # Usa il valore recuperato (o stringa vuota se None) profile_name: str = profile_name_from_var if profile_name_from_var is not None else "" # ---<<< NUOVO DEBUG: Controlla il tipo e il risultato del check >>>--- print(f"DEBUG [save_profile_settings]: Type of profile_name: {type(profile_name)}") print(f"DEBUG [save_profile_settings]: Result of 'not profile_name' check: {not profile_name}") log_handler.log_debug(f"Type of profile_name: {type(profile_name)}", func_name=func_name) log_handler.log_debug(f"Result of 'not profile_name' check: {not profile_name}", func_name=func_name) # ---<<< FINE NUOVO DEBUG >>>--- # Il controllo originale if not profile_name: log_handler.log_warning( "Save failed: No profile selected (profile_name is empty or evaluates to False).", func_name=func_name # Messaggio leggermente modificato ) if hasattr(self, "main_frame"): self.main_frame.update_status_bar("Save failed: No profile selected.") return False log_handler.log_info(f"Saving settings for profile: '{profile_name}'", ...) # OK mf: MainFrame = self.main_frame cm: ConfigManager = self.config_manager status_final: str = "STATUS_NOT_SET_YET" # Inizializza a un valore univoco success: bool = False exception_details: Optional[str] = None # Per memorizzare l'errore se c'è try: # ---<<< DEBUG: Dentro il TRY >>>--- print(f"DEBUG [save_profile_settings]: Entering TRY block.") settings_to_save: dict = { "svn_working_copy_path": mf.svn_path_entry.get(), "usb_drive_path": mf.usb_path_entry.get(), "bundle_name": mf.bundle_name_entry.get(), "bundle_name_updated": mf.bundle_updated_name_entry.get(), "autocommit": str(mf.autocommit_var.get()), "commit_message": mf.get_commit_message(), "autobackup": str(mf.autobackup_var.get()), "backup_dir": mf.backup_dir_var.get(), "backup_exclude_extensions": mf.backup_exclude_extensions_var.get(), "backup_exclude_dirs": mf.backup_exclude_dirs_var.get(), "remote_url": mf.remote_url_var.get(), "remote_name": mf.remote_name_var.get().strip() or DEFAULT_REMOTE_NAME, } print(f"DEBUG [save_profile_settings]: Settings gathered from GUI.") # OK log_handler.log_debug(f"Settings to save: {settings_to_save}", ...) # OK for key, value in settings_to_save.items(): cm.set_profile_option(profile_name, key, value) print(f"DEBUG [save_profile_settings]: Options set in ConfigManager object.") # OK cm.save_config() # <-- POTENZIALE PUNTO DI ERRORE # Se arriviamo qui, save_config non ha sollevato eccezioni print(f"DEBUG [save_profile_settings]: cm.save_config() completed successfully.") # OK log_handler.log_info(f"Settings saved successfully for '{profile_name}'.", ...) # OK status_final = f"Profile '{profile_name}' saved." # Imposta messaggio successo success = True print(f"DEBUG [save_profile_settings]: TRY block success. status_final = '{status_final}'") # OK except Exception as e: # ---<<< DEBUG: Dentro EXCEPT >>>--- print(f"ERROR [save_profile_settings]: EXCEPTION caught in TRY block: {type(e).__name__} - {e}") exception_details = f"{type(e).__name__}: {e}" log_handler.log_exception(f"Error saving profile '{profile_name}': {e}", ...) # OK status_final = f"Error saving profile '{profile_name}'." # Imposta messaggio errore print(f"DEBUG [save_profile_settings]: EXCEPT block executed. status_final = '{status_final}'") # OK mf.show_error("Save Error", f"Failed:\n{e}") # OK success = False finally: # ---<<< DEBUG: Dentro FINALLY >>>--- print(f"DEBUG [save_profile_settings]: Entering FINALLY block.") print(f"DEBUG [save_profile_settings]: Value of status_final BEFORE update_status_bar: '{status_final}'") # <-- CONTROLLA QUESTO mf.update_status_bar(status_final) print(f"DEBUG [save_profile_settings]: update_status_bar called with message: '{status_final}'") # OK if exception_details: print(f"DEBUG [save_profile_settings]: Exception recorded: {exception_details}") return success def add_profile(self): # ... (Codice invariato) func_name: str = "add_profile" log_handler.log_debug("'Add Profile' button clicked.", func_name=func_name) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return self.main_frame.update_status_bar("Adding new profile...") name: Optional[str] = self.main_frame.ask_new_profile_name() if name is None: log_handler.log_info("Add profile cancelled.", func_name=func_name) self.main_frame.update_status_bar("Add profile cancelled.") return name = name.strip() if not name: log_handler.log_warning("Add failed: Name empty.", func_name=func_name) self.main_frame.show_error("Input Error", "Profile name cannot be empty.") self.main_frame.update_status_bar("Add failed: Empty name.") return if name in self.config_manager.get_profile_sections(): log_handler.log_warning( f"Add failed: '{name}' exists.", func_name=func_name ) self.main_frame.show_error("Error", f"Profile '{name}' already exists.") self.main_frame.update_status_bar(f"Add failed: '{name}' exists.") return log_handler.log_info( f"Attempting to add new profile: '{name}'", func_name=func_name ) status_final: str = "Ready." try: defaults: dict = self.config_manager._get_expected_keys_with_defaults() defaults["bundle_name"] = f"{name}_repo.bundle" defaults["bundle_name_updated"] = f"{name}_update.bundle" defaults["svn_working_copy_path"] = "" defaults["usb_drive_path"] = "" defaults["remote_url"] = "" defaults["commit_message"] = f"Initial commit for profile {name}" self.config_manager.add_section(name) for key, value in defaults.items(): self.config_manager.set_profile_option(name, key, value) self.config_manager.save_config() log_handler.log_info( f"Profile '{name}' added successfully.", func_name=func_name ) sections: list = self.config_manager.get_profile_sections() self.main_frame.update_profile_dropdown(sections) self.main_frame.profile_var.set(name) except Exception as e: log_handler.log_exception( f"Error adding profile '{name}': {e}", func_name=func_name ) status_final = f"Error adding profile '{name}'." self.main_frame.show_error("Add Error", f"Failed:\n{e}") self.main_frame.update_status_bar(status_final) def remove_profile(self): # ... (Codice invariato) func_name: str = "remove_profile" log_handler.log_debug("'Remove Profile' button clicked.", func_name=func_name) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return profile: str = self.main_frame.profile_var.get() if not profile: log_handler.log_warning( "Remove failed: No profile selected.", func_name=func_name ) self.main_frame.show_error("Error", "No profile selected.") self.main_frame.update_status_bar("Remove failed: No profile.") return if profile == DEFAULT_PROFILE: log_handler.log_warning( "Attempt remove default denied.", func_name=func_name ) self.main_frame.show_error( "Denied", f"Cannot remove default profile ('{DEFAULT_PROFILE}')." ) self.main_frame.update_status_bar("Cannot remove default.") return if self.main_frame.ask_yes_no( title="Confirm Remove", message=f"Remove profile '{profile}'?\nThis cannot be undone.", ): log_handler.log_info( f"Attempting remove profile: '{profile}'", func_name=func_name ) self.main_frame.update_status_bar( f"Processing: Removing profile '{profile}'..." ) status_final: str = "Ready." try: removed: bool = self.config_manager.remove_profile_section(profile) if removed: self.config_manager.save_config() log_handler.log_info( f"Profile '{profile}' removed.", func_name=func_name ) status_final = f"Profile '{profile}' removed." sections: list = self.config_manager.get_profile_sections() self.main_frame.update_profile_dropdown( sections ) # Selecting new profile triggers load else: log_handler.log_error( f"Failed remove profile '{profile}' (ConfigManager returned False).", func_name=func_name, ) status_final = f"Error removing profile '{profile}'." self.main_frame.show_error( "Error", f"Could not remove '{profile}'." ) self.main_frame.update_status_bar(status_final) except Exception as e: log_handler.log_exception( f"Error removing profile '{profile}': {e}", func_name=func_name ) status_final = f"Error removing profile '{profile}'." self.main_frame.show_error("Error", f"Failed:\n{e}") self.main_frame.update_status_bar(status_final) else: log_handler.log_info("Profile removal cancelled.", func_name=func_name) self.main_frame.update_status_bar("Removal cancelled.") # --- GUI Interaction & Helper Methods --- # (Metodi browse_folder, update_svn_status_indicator, _calculate_fetch_button_state, _is_repo_ready, _parse_exclusions, # _get_and_validate_svn_path, _get_and_validate_usb_path, _clear_and_disable_fields, show_fatal_error, # show_comparison_summary, _update_gui_for_not_ready_state, _update_gui_for_detached_head, # _update_gui_for_no_upstream, _reenable_widgets_after_modal, _update_gui_for_status_error, # _update_gui_auth_status - INVARIATI) def browse_folder(self, entry_widget: tk.Entry): # Example of unchanged method # ... (Codice invariato) func_name: str = "browse_folder" current_path: str = entry_widget.get() initial_dir: str = os.path.expanduser("~") if current_path and os.path.isdir(current_path): initial_dir = current_path elif current_path and os.path.exists(os.path.dirname(current_path)): initial_dir = os.path.dirname(current_path) log_handler.log_debug( f"Opening folder browser. Initial: {initial_dir}", func_name=func_name ) directory: Optional[str] = filedialog.askdirectory( initialdir=initial_dir, title="Select Directory", parent=self.master ) if directory: log_handler.log_debug( f"Directory selected: {directory}", func_name=func_name ) entry_widget.delete(0, tk.END) entry_widget.insert(0, directory) if ( hasattr(self.main_frame, "svn_path_entry") and entry_widget == self.main_frame.svn_path_entry ): self.update_svn_status_indicator(directory) else: log_handler.log_debug("Folder browse cancelled.", func_name=func_name) def update_svn_status_indicator(self, svn_path: str): # Example of unchanged method # ... (Codice invariato) func_name: str = "update_svn_status_indicator" is_valid_dir: bool = bool(svn_path and os.path.isdir(svn_path)) is_repo_ready: bool = is_valid_dir and os.path.exists( os.path.join(svn_path, ".git") ) log_handler.log_debug( f"Updating repo status indicator. Path='{svn_path}', ValidDir={is_valid_dir}, Ready={is_repo_ready}", func_name=func_name, ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame mf.update_svn_indicator(is_repo_ready) repo_ready_state: str = tk.NORMAL if is_repo_ready else tk.DISABLED valid_dir_state: str = tk.NORMAL if is_valid_dir else tk.DISABLED prepare_state: str = ( tk.NORMAL if is_valid_dir and not is_repo_ready else tk.DISABLED ) fetch_button_state: str = self._calculate_fetch_button_state( mf, svn_path, is_repo_ready ) try: # Apply states if hasattr(mf, "prepare_svn_button"): mf.prepare_svn_button.config(state=prepare_state) if hasattr(mf, "create_bundle_button"): mf.create_bundle_button.config(state=repo_ready_state) if hasattr(mf, "fetch_bundle_button"): mf.fetch_bundle_button.config(state=fetch_button_state) if hasattr(mf, "edit_gitignore_button"): mf.edit_gitignore_button.config(state=repo_ready_state) if hasattr(mf, "manual_backup_button"): mf.manual_backup_button.config(state=valid_dir_state) if hasattr(mf, "autocommit_checkbox"): mf.autocommit_checkbox.config(state=repo_ready_state) if hasattr(mf, "commit_message_text"): mf.commit_message_text.config(state=repo_ready_state) if hasattr(mf, "refresh_changes_button"): mf.refresh_changes_button.config(state=repo_ready_state) if hasattr(mf, "commit_button"): mf.commit_button.config(state=repo_ready_state) if hasattr(mf, "refresh_tags_button"): mf.refresh_tags_button.config(state=repo_ready_state) if hasattr(mf, "create_tag_button"): mf.create_tag_button.config(state=repo_ready_state) if hasattr(mf, "checkout_tag_button"): mf.checkout_tag_button.config(state=repo_ready_state) if hasattr(mf, "tag_listbox"): mf.tag_listbox.config(state=repo_ready_state) if hasattr(mf, "refresh_branches_button"): mf.refresh_branches_button.config(state=repo_ready_state) if hasattr(mf, "create_branch_button"): mf.create_branch_button.config(state=repo_ready_state) if hasattr(mf, "checkout_branch_button"): mf.checkout_branch_button.config(state=repo_ready_state) if hasattr(mf, "branch_listbox"): mf.branch_listbox.config(state=repo_ready_state) if hasattr(mf, "refresh_history_button"): mf.refresh_history_button.config(state=repo_ready_state) if hasattr(mf, "history_branch_filter_combo"): combo_state: str = "readonly" if is_repo_ready else tk.DISABLED mf.history_branch_filter_combo.config(state=combo_state) # History treeview gestito in set_action_widgets_state if hasattr(mf, "apply_remote_config_button"): mf.apply_remote_config_button.config(state=repo_ready_state) if hasattr(mf, "check_auth_button"): mf.check_auth_button.config(state=repo_ready_state) if hasattr(mf, "fetch_button"): mf.fetch_button.config(state=repo_ready_state) if hasattr(mf, "pull_button"): mf.pull_button.config(state=repo_ready_state) if hasattr(mf, "push_button"): mf.push_button.config(state=repo_ready_state) if hasattr(mf, "push_tags_button"): mf.push_tags_button.config(state=repo_ready_state) if hasattr(mf, "refresh_sync_status_button"): mf.refresh_sync_status_button.config(state=repo_ready_state) if hasattr(mf, "refresh_remote_branches_button"): mf.refresh_remote_branches_button.config(state=repo_ready_state) if hasattr(mf, "remote_branches_listbox"): mf.remote_branches_listbox.config(state=repo_ready_state) if hasattr(mf, "local_branches_listbox_remote_tab"): mf.local_branches_listbox_remote_tab.config(state=repo_ready_state) if hasattr(mf, "refresh_local_branches_button_remote_tab"): mf.refresh_local_branches_button_remote_tab.config( state=repo_ready_state ) if hasattr(mf, "changed_files_listbox"): if not is_repo_ready: log_handler.log_debug( "Repo not ready, clearing changes list.", func_name=func_name ) mf.update_changed_files_list(["(Repository not ready)"]) except Exception as e: log_handler.log_error( f"Error updating widget states: {e}", func_name=func_name ) def _calculate_fetch_button_state( self, main_frame: "MainFrame", svn_path: str, is_repo_ready: bool ) -> str: # Example of unchanged method # ... (Codice invariato) func_name: str = "_calculate_fetch_button_state" try: can_use_svn_dir_for_clone: bool = False if svn_path: if os.path.isdir(svn_path): try: if not os.listdir(svn_path): can_use_svn_dir_for_clone = True except OSError: pass else: parent_dir: str = os.path.dirname(svn_path) if (parent_dir and os.path.isdir(parent_dir)) or not parent_dir: can_use_svn_dir_for_clone = True bundle_file_exists: bool = False usb_path_str: str = main_frame.usb_path_entry.get().strip() bundle_fetch_name: str = main_frame.bundle_updated_name_entry.get().strip() if usb_path_str and bundle_fetch_name and os.path.isdir(usb_path_str): bundle_full_path: str = os.path.join(usb_path_str, bundle_fetch_name) if os.path.isfile(bundle_full_path): bundle_file_exists = True if is_repo_ready or (can_use_svn_dir_for_clone and bundle_file_exists): return tk.NORMAL else: return tk.DISABLED except Exception as e: log_handler.log_error( f"Error checking fetch button state: {e}", func_name=func_name ) return tk.DISABLED def _is_repo_ready(self, repo_path: str) -> bool: # Example of unchanged method return bool( repo_path and os.path.isdir(repo_path) and os.path.exists(os.path.join(repo_path, ".git")) ) def _parse_exclusions( self, ) -> tuple[set[str], set[str]]: # Example of unchanged method excluded_extensions: set[str] = set() excluded_dirs: set[str] = {".git", ".svn"} if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return excluded_extensions, excluded_dirs mf: MainFrame = self.main_frame ext_string: str = mf.backup_exclude_extensions_var.get() if ext_string: for ext in ext_string.split(","): cleaned_ext: str = ext.strip().lower() if cleaned_ext: normalized_ext: str = "." + cleaned_ext.lstrip(".") excluded_extensions.add(normalized_ext) dir_string: str = mf.backup_exclude_dirs_var.get() if dir_string: for dirname in dir_string.split(","): cleaned_dir: str = dirname.strip().lower().strip(os.path.sep + "/") if ( cleaned_dir and cleaned_dir not in {".", ".."} and cleaned_dir not in excluded_dirs ): excluded_dirs.add(cleaned_dir) log_handler.log_debug( f"Parsed Exclusions - Exts: {excluded_extensions}, Dirs: {excluded_dirs}", func_name="_parse_exclusions", ) return excluded_extensions, excluded_dirs def _get_and_validate_svn_path( self, operation_name: str = "Operation" ) -> Optional[str]: # Example of unchanged method func_name: str = "_get_and_validate_svn_path" mf = getattr(self, "main_frame", None) if not mf or not hasattr(mf, "svn_path_entry"): log_handler.log_error( f"{operation_name} failed: SVN path entry missing.", func_name=func_name ) return None path_str: str = mf.svn_path_entry.get().strip() if not path_str: log_handler.log_warning( f"{operation_name} failed: Working Directory path is empty.", func_name=func_name, ) mf.show_error("Input Error", "Working Directory path cannot be empty.") mf.update_status_bar(f"{operation_name} failed: Path empty.") return None abs_path: str = os.path.abspath(path_str) if not os.path.isdir(abs_path): log_handler.log_warning( f"{operation_name} failed: Path is not a valid directory: {abs_path}", func_name=func_name, ) mf.show_error("Path Error", f"Path is not a valid directory:\n{abs_path}") mf.update_status_bar(f"{operation_name} failed: Not a directory.") return None log_handler.log_debug( f"{operation_name}: Using validated Working Directory path: {abs_path}", func_name=func_name, ) return abs_path def _get_and_validate_usb_path( self, operation_name: str = "Operation" ) -> Optional[str]: # Example of unchanged method func_name: str = "_get_and_validate_usb_path" mf = getattr(self, "main_frame", None) if not mf or not hasattr(mf, "usb_path_entry"): log_handler.log_error( f"{operation_name} failed: Bundle Target path entry missing.", func_name=func_name, ) return None path_str: str = mf.usb_path_entry.get().strip() if not path_str: log_handler.log_warning( f"{operation_name} failed: Bundle Target path is empty.", func_name=func_name, ) mf.show_error("Input Error", "Bundle Target path cannot be empty.") mf.update_status_bar(f"{operation_name} failed: Path empty.") return None abs_path: str = os.path.abspath(path_str) if not os.path.isdir(abs_path): log_handler.log_warning( f"{operation_name} failed: Path is not a valid directory: {abs_path}", func_name=func_name, ) mf.show_error("Path Error", f"Path is not a valid directory:\n{abs_path}") mf.update_status_bar(f"{operation_name} failed: Not a directory.") return None log_handler.log_debug( f"{operation_name}: Using validated Bundle Target path: {abs_path}", func_name=func_name, ) return abs_path def _clear_and_disable_fields(self): # Example of unchanged method if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Clearing and disabling fields.", func_name="_clear_and_disable_fields" ) if hasattr(mf, "svn_path_entry"): mf.svn_path_entry.delete(0, tk.END) if hasattr(mf, "usb_path_entry"): mf.usb_path_entry.delete(0, tk.END) if hasattr(mf, "bundle_name_entry"): mf.bundle_name_entry.delete(0, tk.END) if hasattr(mf, "bundle_updated_name_entry"): mf.bundle_updated_name_entry.delete(0, tk.END) if hasattr(mf, "clear_commit_message"): mf.clear_commit_message() if hasattr(mf, "backup_dir_var"): mf.backup_dir_var.set("") if hasattr(mf, "backup_exclude_extensions_var"): mf.backup_exclude_extensions_var.set("") if hasattr(mf, "backup_exclude_dirs_var"): mf.backup_exclude_dirs_var.set("") if hasattr(mf, "remote_url_var"): mf.remote_url_var.set("") if hasattr(mf, "remote_name_var"): mf.remote_name_var.set("") if hasattr(mf, "autobackup_var"): mf.autobackup_var.set(False) if hasattr(mf, "autocommit_var"): mf.autocommit_var.set(False) if hasattr(mf, "toggle_backup_dir"): mf.toggle_backup_dir() self._update_gui_for_not_ready_state() self.update_svn_status_indicator("") if hasattr(mf, "remove_profile_button"): mf.remove_profile_button.config(state=tk.DISABLED) if hasattr(mf, "save_settings_button"): mf.save_settings_button.config(state=tk.DISABLED) mf.update_status_bar("No profile selected or repository not ready.") def show_fatal_error(self, message: str): # Example of unchanged method log_handler.log_critical( f"FATAL ERROR: {message}", func_name="show_fatal_error" ) parent_window: Optional[tk.Tk] = None try: if hasattr(self, "master") and self.master and self.master.winfo_exists(): parent_window = self.master messagebox.showerror("Fatal Error", message, parent=parent_window) except Exception as e: print(f"FATAL ERROR (GUI message failed: {e}): {message}", file=sys.stderr) finally: self.on_closing() def show_comparison_summary( self, ref1: str, ref2: str, repo_path: str, changed_files: List[str] ): # Example of unchanged method func_name: str = "show_comparison_summary" log_handler.log_debug( f"Attempting to show comparison summary: {ref1} vs {ref2}", func_name=func_name, ) if not all([ref1, ref2, repo_path, isinstance(changed_files, list)]): log_handler.log_error( "Missing data required to show comparison summary.", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Display Error", "Internal error: Missing data for comparison." ) self._reenable_widgets_after_modal() return try: log_handler.log_debug( f"Opening DiffSummaryWindow with {len(changed_files)} files.", func_name=func_name, ) DiffSummaryWindow( master=self.master, git_commands=self.git_commands, repo_path=repo_path, ref1=ref1, ref2=ref2, changed_files_status=changed_files, ) log_handler.log_info( "Diff Summary window closed by user.", func_name=func_name ) if hasattr(self.main_frame, "update_status_bar"): self.main_frame.update_status_bar("Ready.") except Exception as e_summary: log_handler.log_exception( f"Error opening diff summary window: {e_summary}", func_name=func_name ) if hasattr(self.main_frame, "show_error") and hasattr( self.main_frame, "update_status_bar" ): self.main_frame.show_error( "Display Error", f"Could not display comparison results:\n{e_summary}", ) self.main_frame.update_status_bar("Error displaying comparison.") finally: self._reenable_widgets_after_modal() def _update_gui_for_not_ready_state(self): # Example of unchanged method if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Updating GUI for 'Repo Not Ready' state.", func_name="_update_gui_for_not_ready_state", ) if hasattr(mf, "update_tag_list"): mf.update_tag_list([("(Repo not ready)", "")]) if hasattr(mf, "update_branch_list"): mf.update_branch_list([], None) if hasattr(mf, "update_history_display"): mf.update_history_display(["(Repo not ready)"]) if hasattr(mf, "update_history_branch_filter"): mf.update_history_branch_filter([]) if hasattr(mf, "update_changed_files_list"): mf.update_changed_files_list(["(Repo not ready)"]) if hasattr(mf, "update_remote_branches_list"): mf.update_remote_branches_list(["(Repo not ready)"]) if hasattr(mf, "update_ahead_behind_status"): mf.update_ahead_behind_status(status_text="Sync Status: (Repo not ready)") def _update_gui_for_detached_head( self, current_branch_name: Optional[str] ): # Example of unchanged method if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Updating GUI for 'Detached HEAD' state.", func_name="_update_gui_for_detached_head", ) if hasattr(mf, "update_ahead_behind_status"): mf.update_ahead_behind_status( current_branch=current_branch_name, status_text="Sync Status: (Detached HEAD)", ) if hasattr(mf, "refresh_sync_status_button"): mf.refresh_sync_status_button.config(state=tk.DISABLED) def _update_gui_for_no_upstream( self, current_branch_name: Optional[str] ): # Example of unchanged method if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Updating GUI for 'No Upstream' state.", func_name="_update_gui_for_no_upstream", ) if hasattr(mf, "update_ahead_behind_status"): mf.update_ahead_behind_status( current_branch=current_branch_name, status_text=f"Sync Status: Upstream not set", ) if hasattr(mf, "refresh_sync_status_button"): mf.refresh_sync_status_button.config(state=tk.DISABLED) def _reenable_widgets_after_modal(self): # Example of unchanged method func_name: str = "_reenable_widgets_after_modal" if hasattr(self, "master") and self.master.winfo_exists(): self.master.after(50, self._reenable_widgets_if_ready) log_handler.log_debug("Scheduled widget re-enable.", func_name=func_name) else: log_handler.log_warning( "Cannot schedule widget re-enable: Master window destroyed.", func_name=func_name, ) def _update_gui_for_status_error(self): # Example of unchanged method if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf: MainFrame = self.main_frame log_handler.log_debug( "Updating GUI for 'Status Error' state.", func_name="_update_gui_for_status_error", ) if hasattr(mf, "update_ahead_behind_status"): mf.update_ahead_behind_status(status_text="Sync Status: Error getting info") if hasattr(mf, "refresh_sync_status_button"): mf.refresh_sync_status_button.config(state=tk.DISABLED) def _update_gui_auth_status(self, status: str): # Example of unchanged method self.remote_auth_status = status if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): self.main_frame._update_auth_status_indicator(status) if status != "ok" and hasattr( self.main_frame, "update_ahead_behind_status" ): sync_status_text: str = ( f"Sync Status: ({status.replace('_', ' ').title()})" ) self.main_frame.update_ahead_behind_status(status_text=sync_status_text) # --- Async Operation Starter/Handler --- # (Metodi _start_async_operation, _check_completion_queue e helper INVARIATI) def _start_async_operation( self, worker_func: Callable, args_tuple: tuple, context_dict: dict ): # ... (Codice invariato) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot start async op: Main frame missing.", func_name="_start_async_operation", ) return context_name: str = context_dict.get("context", "unknown_op") status_msg: str = context_dict.get("status_msg", context_name) log_handler.log_info( f"--- Action Triggered: {context_name} (Async Start) ---", func_name=context_name, ) self.main_frame.set_action_widgets_state(tk.DISABLED) self.main_frame.update_status_bar( message=f"Processing: {status_msg}...", bg_color=self.main_frame.STATUS_YELLOW, ) results_queue: queue.Queue = queue.Queue(maxsize=1) full_args: tuple = args_tuple + (results_queue,) log_handler.log_debug( f"Creating worker thread for {context_name}. Worker func: {worker_func.__name__}", func_name="_start_async_operation", ) try: worker_thread = threading.Thread( target=worker_func, args=full_args, daemon=True ) log_handler.log_debug( f"Starting worker thread for {context_name}.", func_name="_start_async_operation", ) worker_thread.start() except Exception as thread_e: log_handler.log_exception( f"Failed to start worker thread for {context_name}: {thread_e}", func_name="_start_async_operation", ) self.main_frame.show_error( "Threading Error", f"Could not start background task for {context_name}.", ) self.main_frame.update_status_bar( f"Error starting task: {context_name}", bg_color=self.main_frame.STATUS_RED, duration_ms=10000, ) self.main_frame.set_action_widgets_state(tk.NORMAL) return log_handler.log_debug( f"Scheduling completion check for {context_name}.", func_name="_start_async_operation", ) self.master.after( self.ASYNC_QUEUE_CHECK_INTERVAL_MS, self._check_completion_queue, results_queue, context_dict, ) def _check_completion_queue(self, results_queue: queue.Queue, context: dict): # ... (Codice invariato) task_context: str = context.get("context", "unknown") func_name: str = "_check_completion_queue" try: result_data: Dict[str, Any] = results_queue.get_nowait() log_handler.log_info( f"Result received for '{task_context}'. Status: {result_data.get('status')}", func_name=func_name, ) should_reenable_now: bool = self._should_reenable_widgets_now( task_context, result_data.get("status") ) if not should_reenable_now: log_handler.log_debug( f"Postponing widget re-enable for context: {task_context}", func_name=func_name, ) if should_reenable_now: self._reenable_widgets_if_ready() self._update_status_bar_from_result(task_context, result_data) self._process_result_with_handler(result_data, context) except queue.Empty: self._reschedule_queue_check(results_queue, context) except Exception as e: self._handle_queue_check_error(e, task_context) def _should_reenable_widgets_now( self, task_context: str, status: Optional[str] ) -> bool: # Example of unchanged method if task_context == "check_connection" and status == "auth_required": return False if task_context == "interactive_auth" and status == "success": return False if task_context == "clone_remote" and status == "success": return False if ( task_context in ["checkout_tracking_branch", "checkout_branch", "checkout_tag"] and status == "success" ): return False if ( task_context in ["pull_remote", "merge_local_branch"] and status == "conflict" ): return False if task_context == "compare_branches" and status == "success": return False if task_context == "get_commit_details" and status == "success": return False # Non riabilitare finché finestra dettagli non è chiusa return True def _reenable_widgets_if_ready(self): # Example of unchanged method if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): log_handler.log_debug( "Re-enabling widgets now.", func_name="_reenable_widgets_if_ready" ) self.main_frame.set_action_widgets_state(tk.NORMAL) else: log_handler.log_warning( "Cannot re-enable widgets, MainFrame missing.", func_name="_reenable_widgets_if_ready", ) def _update_status_bar_from_result( self, task_context: str, result_data: dict ): # Example of unchanged method status: Optional[str] = result_data.get("status") message: str = result_data.get("message", "Operation finished.") skip_update: bool = False if task_context == "clone_remote" and status == "success": skip_update = True if ( task_context in ["checkout_tracking_branch", "checkout_branch", "checkout_tag"] and status == "success" ): skip_update = True if task_context == "compare_branches" and status == "success": skip_update = True if task_context == "get_commit_details" and status == "success": skip_update = True # Non aggiornare finché finestra dettagli non chiusa if status in ["conflict", "rejected"]: skip_update = True # Messaggi specifici gestiti dall'handler if ( not skip_update and hasattr(self, "main_frame") and self.main_frame.winfo_exists() ): status_color: Optional[str] = None reset_duration: int = 5000 if status == "success": status_color = self.main_frame.STATUS_GREEN elif status == "warning": status_color = self.main_frame.STATUS_YELLOW reset_duration = 7000 elif status == "auth_required": status_color = self.main_frame.STATUS_YELLOW reset_duration = 15000 # Rimosso conflict/rejected da qui, gestiti da handler specifico elif status == "error": status_color = self.main_frame.STATUS_RED reset_duration = 10000 self.main_frame.update_status_bar( message, bg_color=status_color, duration_ms=reset_duration ) def _process_result_with_handler( self, result_data: dict, context: dict ): # Example of unchanged method task_context: str = context.get("context", "unknown") func_name: str = "_process_result_with_handler" try: result_handler = AsyncResultHandler(self) result_handler.process(result_data, context) log_handler.log_debug( f"Result processing delegated for context '{task_context}'.", func_name=func_name, ) except Exception as handler_e: log_handler.log_exception( f"Error during result processing by handler for {task_context}: {handler_e}", func_name=func_name, ) if hasattr(self, "main_frame"): self.main_frame.show_error( "Processing Error", f"Failed to handle task result:\n{handler_e}" ) if ( not self._should_reenable_widgets_now( task_context, result_data.get("status") ) and hasattr(self.main_frame, "winfo_exists") and self.main_frame.winfo_exists() ): log_handler.log_warning( "Re-enabling widgets after handler error.", func_name=func_name ) self.main_frame.set_action_widgets_state(tk.NORMAL) def _reschedule_queue_check( self, results_queue: queue.Queue, context: dict ): # Example of unchanged method if hasattr(self, "master") and self.master.winfo_exists(): self.master.after( self.ASYNC_QUEUE_CHECK_INTERVAL_MS, self._check_completion_queue, results_queue, context, ) def _handle_queue_check_error( self, error: Exception, task_context: str ): # Example of unchanged method func_name: str = "_handle_queue_check_error" log_handler.log_exception( f"Critical error checking completion queue for {task_context}: {error}", func_name=func_name, ) try: if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): self.main_frame.set_action_widgets_state(tk.NORMAL) self.main_frame.update_status_bar( "Error processing async result.", bg_color=self.main_frame.STATUS_RED, duration_ms=10000, ) self._update_gui_for_status_error() except Exception as recovery_e: log_handler.log_error( f"Failed to recover GUI after queue processing error: {recovery_e}", func_name=func_name, ) # --- Specific Action Launchers --- # (Metodi refresh_*, prepare_*, create_*, fetch_*, manual_backup, commit_changes, # open_gitignore_editor, _handle_gitignore_save, add_selected_file, # create_tag, checkout_tag, create_branch, checkout_branch, delete_local_branch, # merge_local_branch, compare_branch_with_current, view_commit_details, # apply_remote_config, check_connection_auth, fetch_remote, pull_remote, # push_remote, push_tags_remote, clone_remote_repo, refresh_remote_branches, # checkout_remote_branch_as_local - INVARIATI rispetto a versione precedente) # Example: refresh_tag_list def refresh_tag_list(self): func_name: str = "refresh_tag_list" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Tags") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Tags skipped: Repo not ready.", func_name=func_name ) self._update_gui_for_not_ready_state() return args: tuple = (self.git_commands, svn_path) self._start_async_operation( worker_func=async_workers.run_refresh_tags_async, args_tuple=args, context_dict={"context": "refresh_tags", "status_msg": "Refreshing tags"}, ) # ... (Implementa o copia TUTTI gli altri metodi launcher qui, sono invariati nella loro logica interna) ... # Example: push_remote def push_remote(self): func_name: str = "push_remote" log_handler.log_info( f"--- Action Triggered: Push Branch to Remote ---", func_name=func_name ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path: Optional[str] = self._get_and_validate_svn_path("Push Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Push Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path invalid or not prepared." ) self.main_frame.update_status_bar("Push failed: Repo not ready.") return remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) if self.remote_auth_status != "ok": auth_msg: str = f"Cannot Push to '{remote_name}':\n" # ... (build auth msg) ... if self.remote_auth_status == "required": auth_msg += "Authentication is required..." elif self.remote_auth_status == "failed": auth_msg += "Authentication previously failed..." elif self.remote_auth_status == "connection_failed": auth_msg += "Connection previously failed..." else: auth_msg += "Connection status is unknown or in error..." log_handler.log_warning( f"Push Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name, ) self.main_frame.show_warning("Action Blocked", auth_msg) self.main_frame.update_status_bar(f"Push failed: {self.remote_auth_status}") return try: if self.git_commands.git_status_has_changes(svn_path): if not self.main_frame.ask_yes_no( "Uncommitted Changes", "There are uncommitted changes.\nPush anyway? (Only committed changes will be pushed)", ): self.main_frame.update_status_bar( "Push cancelled (uncommitted changes)." ) return except GitCommandError as status_err: log_handler.log_error( f"Push aborted: Failed to check status: {status_err}", func_name=func_name, ) self.main_frame.show_error( "Status Error", f"Could not check repo status:\n{status_err}" ) return log_handler.log_info( f"Starting push for current branch to remote '{remote_name}'...", func_name=func_name, ) args: tuple = ( self.remote_action_handler, self.git_commands, svn_path, remote_name, ) self._start_async_operation( worker_func=async_workers.run_push_remote_async, args_tuple=args, context_dict={ "context": "push_remote", "status_msg": f"Pushing current branch to '{remote_name}'", "remote_name": remote_name, }, ) # ... Aggiungi TUTTI gli altri metodi launcher qui ... # --- Helper per Suggestion Tag --- # (Metodo _generate_next_tag_suggestion INVARIATO) def _generate_next_tag_suggestion(self, svn_path: str) -> str: # ... (Codice invariato) func_name: str = "_generate_next_tag_suggestion" log_handler.log_debug("Generating next tag suggestion...", func_name=func_name) default_suggestion: str = "v.0.0.0.1" latest_valid_tag: Optional[str] = None tag_pattern = re.compile(r"^v\.(\d+)\.(\d+)\.(\d+)\.(\d+)$") try: tags_data: list[tuple[str, str]] = self.git_commands.list_tags(svn_path) if not tags_data: log_handler.log_debug("No existing tags found.", func_name=func_name) return default_suggestion for tag_name, _ in tags_data: match = tag_pattern.match(tag_name) if match: latest_valid_tag = tag_name log_handler.log_debug( f"Found latest tag matching pattern: {latest_valid_tag}", func_name=func_name, ) break if not latest_valid_tag: log_handler.log_debug( "No tags matched pattern. Suggesting default.", func_name=func_name ) return default_suggestion match = tag_pattern.match(latest_valid_tag) if not match: log_handler.log_error( f"Internal error: Could not re-match tag {latest_valid_tag}", func_name=func_name, ) return default_suggestion v1, v2, v3, v4 = map(int, match.groups()) limit: int = 99 v4 += 1 if v4 > limit: v4 = 0 v3 += 1 if v3 > limit: v3 = 0 v2 += 1 if v2 > limit: v2 = 0 v1 += 1 next_tag: str = f"v.{v1}.{v2}.{v3}.{v4}" log_handler.log_debug( f"Generated suggestion: {next_tag}", func_name=func_name ) return next_tag except Exception as e: log_handler.log_exception( f"Error generating tag suggestion: {e}", func_name=func_name ) return default_suggestion def clone_remote_repo(self): """ Handles the 'Clone Remote Repository' action initiated from the GUI. Opens a dialog to get parameters and starts the asynchronous clone worker. """ func_name: str = "clone_remote_repo" log_handler.log_info( f"--- Action Triggered: Clone Remote Repository ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot start clone: Main frame not available.", func_name=func_name ) return # Show modal dialog using the imported class dialog = CloneFromRemoteDialog(self.master) # dialog.result will be None if cancelled, or (url, parent_dir, profile_name_input) dialog_result: Optional[Tuple[str, str, str]] = dialog.result if not dialog_result: log_handler.log_info( "Clone operation cancelled by user in dialog.", func_name=func_name ) self.main_frame.update_status_bar("Clone cancelled.") return # Extract data from dialog result remote_url, local_parent_dir, profile_name_input = dialog_result # --- Derive target directory and profile name, validate paths --- final_profile_name: str = "" target_clone_dir: str = "" repo_name_from_url: str = "" try: # Derive repo name from URL (remove .git suffix) repo_name_from_url = os.path.basename(remote_url) if repo_name_from_url.lower().endswith(".git"): repo_name_from_url = repo_name_from_url[:-4] if not repo_name_from_url: raise ValueError("Could not derive repository name from URL.") # Construct full target path for the clone target_clone_dir = os.path.join(local_parent_dir, repo_name_from_url) # Normalize the path target_clone_dir = os.path.abspath(target_clone_dir) # Determine final profile name (use input or derive from repo name) if profile_name_input: final_profile_name = profile_name_input # Check if proposed profile name already exists if final_profile_name in self.config_manager.get_profile_sections(): raise ValueError( f"Profile name '{final_profile_name}' already exists. " f"Please choose a different name." ) else: # Use repo name as base, add counter if it exists final_profile_name = repo_name_from_url counter: int = 1 # Check against existing sections from config manager while final_profile_name in self.config_manager.get_profile_sections(): final_profile_name = f"{repo_name_from_url}_{counter}" counter += 1 log_handler.log_debug( f"Derived target clone directory: {target_clone_dir}", func_name=func_name ) log_handler.log_debug( f"Determined profile name: {final_profile_name}", func_name=func_name ) # --- CRITICAL CHECK: Target directory must NOT exist for 'git clone' --- if os.path.exists(target_clone_dir): error_msg: str = ( f"Clone failed: Target directory already exists:\n{target_clone_dir}\n" f"Please choose a different parent directory or ensure the target location is empty." ) log_handler.log_error(error_msg, func_name=func_name) self.main_frame.show_error("Clone Path Error", error_msg) self.main_frame.update_status_bar("Clone failed: Target directory exists.") return # Stop the operation except ValueError as ve: # Handle errors deriving names or validating profile name log_handler.log_error( f"Clone configuration error: {ve}", func_name=func_name ) self.main_frame.show_error("Configuration Error", str(ve)) self.main_frame.update_status_bar("Clone failed: Configuration error.") return except Exception as e: # Handle unexpected errors during preparation log_handler.log_exception( f"Unexpected error preparing for clone: {e}", func_name=func_name ) self.main_frame.show_error( "Internal Error", f"An unexpected error occurred:\n{e}" ) self.main_frame.update_status_bar("Clone failed: Internal error.") return # --- Start Asynchronous Worker --- log_handler.log_info( f"Starting clone for '{remote_url}' into '{target_clone_dir}'...", func_name=func_name ) # Prepare arguments for the worker function args: tuple = (self.git_commands, remote_url, target_clone_dir, final_profile_name) # Start the async operation using the generic starter method self._start_async_operation( worker_func=async_workers.run_clone_remote_async, # Worker function for cloning args_tuple=args, context_dict={ "context": "clone_remote", # Context identifier for result handler "status_msg": f"Cloning '{repo_name_from_url}'...", # Message for status bar "clone_success_data": { # Pass data needed by result handler for profile creation "profile_name": final_profile_name, "cloned_path": target_clone_dir, "remote_url": remote_url, } } ) # --- Local Repo / Bundle / Backup Actions --- def prepare_svn_for_git(self): """ Starts async operation to prepare the repository (init, gitignore). """ func_name: str ="prepare_svn_for_git" svn_path: Optional[str] = self._get_and_validate_svn_path("Prepare Repository") # Check if path is valid before starting if not svn_path: # Error message shown by validation method self.main_frame.update_status_bar("Prepare failed: Invalid path.") return # Check if already prepared (avoid unnecessary work) if self._is_repo_ready(svn_path): log_handler.log_info( "Prepare skipped: Repository already prepared.", func_name=func_name ) self.main_frame.show_info("Info", "Repository is already prepared.") # Ensure GUI state reflects readiness self.update_svn_status_indicator(svn_path) return # Prepare arguments args: tuple = (self.action_handler, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_prepare_async, args_tuple=args, context_dict={ "context": "prepare_repo", "status_msg": "Preparing repository" } ) def create_git_bundle(self): """ Starts async operation to create a Git bundle file. """ func_name: str ="create_git_bundle" # Gather and validate inputs from GUI profile: str = self.main_frame.profile_var.get() svn_path: Optional[str] = self._get_and_validate_svn_path("Create Bundle") usb_path: Optional[str] = self._get_and_validate_usb_path("Create Bundle") bundle_name: str = self.main_frame.bundle_name_entry.get().strip() # Check if all required inputs are present if not profile or not svn_path or not usb_path or not bundle_name: log_handler.log_warning( "Create Bundle cancelled: Missing inputs.", func_name=func_name ) # Specific error messages shown by validation methods return # Check if repository is ready if not self._is_repo_ready(svn_path): log_handler.log_warning( "Create Bundle failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not prepared.") self.main_frame.update_status_bar("Create Bundle failed: Repo not ready.") return # Ensure bundle name has the correct extension if not bundle_name.lower().endswith(".bundle"): bundle_name += ".bundle" bundle_full_path: str = os.path.join(usb_path, bundle_name) # Save profile settings before starting the operation if not self.save_profile_settings(): # Ask user if they want to proceed even if saving failed if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue creating bundle anyway?" ): self.main_frame.update_status_bar( "Create Bundle cancelled (profile save failed)." ) return # Prepare parameters for the worker excluded_extensions: set[str] excluded_dirs: set[str] excluded_extensions, excluded_dirs = self._parse_exclusions() backup_enabled: bool = self.main_frame.autobackup_var.get() backup_dir: str = self.main_frame.backup_dir_var.get() commit_enabled: bool = self.main_frame.autocommit_var.get() commit_msg: str = self.main_frame.get_commit_message() # Prepare arguments tuple for the worker args: tuple = ( self.action_handler, svn_path, bundle_full_path, profile, backup_enabled, backup_dir, commit_enabled, commit_msg, excluded_extensions, excluded_dirs, ) # Start the async operation self._start_async_operation( worker_func=async_workers.run_create_bundle_async, args_tuple=args, context_dict={ "context": "create_bundle", "status_msg": f"Creating bundle '{bundle_name}'", "committed_flag_possible": True, # Context hint for result handler } ) def fetch_from_git_bundle(self): """ Starts async operation to fetch/clone from a Git bundle file. """ func_name: str ="fetch_from_git_bundle" # Gather and validate inputs profile: str = self.main_frame.profile_var.get() # svn_path_str can be a non-existent dir if cloning svn_path_str: str = self.main_frame.svn_path_entry.get().strip() usb_path: Optional[str] = self._get_and_validate_usb_path("Fetch Bundle") bundle_name: str = self.main_frame.bundle_updated_name_entry.get().strip() # Check for missing inputs if not profile or not svn_path_str or not usb_path or not bundle_name: log_handler.log_warning( "Fetch Bundle cancelled: Missing inputs.", func_name=func_name ) return # Construct full bundle path and check if it exists BEFORE starting async op bundle_full_path: str = os.path.join(usb_path, bundle_name) if not os.path.isfile(bundle_full_path): log_handler.log_error( f"Fetch Bundle failed: Bundle file not found at '{bundle_full_path}'", func_name=func_name ) self.main_frame.show_error( "File Not Found", f"Bundle file not found:\n{bundle_full_path}" ) self.main_frame.update_status_bar("Fetch failed: Bundle not found.") return # Save profile settings before action if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue fetching from bundle anyway?" ): self.main_frame.update_status_bar( "Fetch cancelled (profile save failed)." ) return # Prepare parameters for worker excluded_extensions: set[str] excluded_dirs: set[str] excluded_extensions, excluded_dirs = self._parse_exclusions() backup_enabled: bool = self.main_frame.autobackup_var.get() backup_dir: str = self.main_frame.backup_dir_var.get() # Prepare arguments tuple args: tuple = ( self.action_handler, svn_path_str, bundle_full_path, profile, backup_enabled, backup_dir, excluded_extensions, excluded_dirs, ) # Start async operation self._start_async_operation( worker_func=async_workers.run_fetch_bundle_async, args_tuple=args, context_dict={ "context": "fetch_bundle", "status_msg": f"Fetching from '{bundle_name}'", "repo_path": svn_path_str, # Pass path for potential conflict message } ) def show_commit_details(self, commit_details: Dict[str, Any]): """ Opens the CommitDetailWindow to display details of a specific commit. Called by the AsyncResultHandler after fetching commit data. Args: commit_details (Dict[str, Any]): A dictionary containing commit metadata (hash, author, date, subject, body) and a list of changed files. """ func_name: str = "show_commit_details" log_handler.log_debug( f"Attempting to show commit details for hash: {commit_details.get('hash_full', 'N/A')}", func_name=func_name ) # Validazione base dei dati ricevuti if not isinstance(commit_details, dict) or not commit_details.get('hash_full'): log_handler.log_error( "Invalid or incomplete commit details received.", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Display Error", "Internal error: Received invalid commit data." ) # Riabilita widget se i dati non sono validi self._reenable_widgets_after_modal() return try: # Crea e mostra la finestra modale CommitDetailWindow log_handler.log_debug( f"Opening CommitDetailWindow for commit {commit_details.get('hash_full')[:7]}...", func_name=func_name ) CommitDetailWindow( master=self.master, # Parent è la finestra root commit_data=commit_details, # Passa il dizionario dei dati # Passa il callback per aprire il diff di un file specifico open_diff_callback=self._open_commit_file_diff ) # Il codice attende qui finché la finestra CommitDetailWindow non viene chiusa log_handler.log_info("Commit Detail window closed by user.", func_name=func_name) # Ripristina la status bar dopo la chiusura della finestra if hasattr(self.main_frame, "update_status_bar"): self.main_frame.update_status_bar("Ready.") except Exception as e_detail: # Gestisci errori durante la creazione/visualizzazione della finestra log_handler.log_exception( f"Error opening commit detail window: {e_detail}", func_name=func_name ) if hasattr(self.main_frame, "show_error") and hasattr(self.main_frame, "update_status_bar"): self.main_frame.show_error( "Display Error", f"Could not display commit details:\n{e_detail}" ) self.main_frame.update_status_bar("Error displaying commit details.") finally: # Assicurati che i widget vengano riabilitati dopo che la finestra # (o il messaggio di errore) è stata chiusa. self._reenable_widgets_after_modal() def open_gitignore_editor(self): """ Opens the .gitignore editor window (Synchronous GUI action). """ # This action is synchronous as it opens a modal dialog func_name: str = "open_gitignore_editor" log_handler.log_info( f"--- Action Triggered: Edit .gitignore ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness self.main_frame.update_status_bar("Processing: Opening .gitignore editor...") svn_path: Optional[str] = self._get_and_validate_svn_path("Edit .gitignore") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Cannot edit .gitignore: Repo path invalid/not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Select a valid and prepared repository first." ) self.main_frame.update_status_bar("Edit failed: Repo not ready.") return # Construct path and open editor window gitignore_path: str = os.path.join(svn_path, ".gitignore") log_handler.log_debug( f"Target .gitignore path: {gitignore_path}", func_name=func_name ) status_after_edit: str = "Ready." # Default status after editor closes try: log_handler.log_debug("Opening GitignoreEditorWindow...", func_name=func_name) # Open the modal editor window, passing the callback for successful save GitignoreEditorWindow( master=self.master, # Parent window gitignore_path=gitignore_path, on_save_success_callback=self._handle_gitignore_save # Method to call on save ) # Code execution pauses here until the editor window is closed log_handler.log_debug("Gitignore editor window closed.", func_name=func_name) # Update status bar only if no async operation was started by the callback if not self.main_frame.status_bar_var.get().startswith("Processing"): self.main_frame.update_status_bar(status_after_edit) except Exception as e: # Handle errors opening the editor log_handler.log_exception( f"Error opening or running .gitignore editor: {e}", func_name=func_name ) status_after_edit = "Error opening .gitignore editor." self.main_frame.show_error("Editor Error", f"Could not open editor:\n{e}") self.main_frame.update_status_bar(status_after_edit) def manual_backup(self): """ Starts async operation for creating a manual backup ZIP. """ func_name: str ="manual_backup" # Gather and validate inputs profile: str = self.main_frame.profile_var.get() svn_path: Optional[str] = self._get_and_validate_svn_path(f"Manual Backup ({profile})") backup_dir_str: str = self.main_frame.backup_dir_var.get().strip() # Check required inputs if not profile or not svn_path: # Error already shown by validation method return if not backup_dir_str: log_handler.log_warning( "Manual backup failed: Backup directory is empty.", func_name=func_name ) self.main_frame.show_error( "Input Error", "Backup directory cannot be empty for manual backup." ) self.main_frame.update_status_bar("Manual backup failed: Backup dir empty.") return # Validate backup directory path backup_dir_abs: str = os.path.abspath(backup_dir_str) # Check if path exists and is not a directory (create_zip_backup handles creation) if os.path.exists(backup_dir_abs) and not os.path.isdir(backup_dir_abs): log_handler.log_error( f"Manual backup failed: Backup path exists but is not a directory: {backup_dir_abs}", func_name=func_name ) self.main_frame.show_error( "Path Error", f"Backup path exists but is not a directory:\n{backup_dir_abs}" ) self.main_frame.update_status_bar( "Manual backup failed: Invalid backup path." ) return # Save profile settings before backup if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue backup anyway?" ): self.main_frame.update_status_bar( "Backup cancelled (profile save failed)." ) return # Prepare parameters for worker excluded_extensions: set[str] excluded_dirs: set[str] excluded_extensions, excluded_dirs = self._parse_exclusions() # Prepare arguments tuple args: tuple = ( self.backup_handler, svn_path, backup_dir_abs, profile, excluded_extensions, excluded_dirs ) # Start async operation self._start_async_operation( worker_func=async_workers.run_manual_backup_async, args_tuple=args, context_dict={"context": "manual_backup", "status_msg": "Creating manual backup"} ) # --- Git Actions (Commit, Tag, Branch, etc.) --- def commit_changes(self): """ Starts async operation to commit staged changes with GUI message. """ func_name: str = "commit_changes" # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Commit") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Commit failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Commit failed: Repo not ready.") return # Validate commit message commit_msg: str = self.main_frame.get_commit_message() if not commit_msg: log_handler.log_warning( "Commit failed: Commit message is empty.", func_name=func_name ) self.main_frame.show_error("Input Error", "Commit message cannot be empty.") self.main_frame.update_status_bar("Commit failed: Empty message.") return # Prepare args and start async operation args: tuple = (self.action_handler, svn_path, commit_msg) self._start_async_operation( worker_func=async_workers.run_commit_async, args_tuple=args, context_dict={ "context": "commit", "status_msg": "Committing changes", "committed_flag_possible": True # Hint for result handler } ) def refresh_changed_files_list(self): """ Starts async operation to refresh the list of changed files. """ func_name: str ="refresh_changed_files_list" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Changed Files") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Changes skipped: Repo not ready.", func_name=func_name ) # GUI update handled by _update_gui_for_not_ready_state if needed return # Prepare arguments args: tuple = (self.git_commands, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_changes_async, args_tuple=args, context_dict={ "context": "refresh_changes", "status_msg": "Refreshing changed files" } ) def open_diff_viewer(self, file_status_line: str): """ Opens the Diff Viewer window for a file from the 'changed files' list. Compares Working Directory vs HEAD by default for changed files. Uses helper to extract path. This is a synchronous GUI action. """ func_name: str = "open_diff_viewer" log_handler.log_debug( f"Received file_status_line: {repr(file_status_line)}", func_name=func_name ) log_handler.log_info( f"--- Action Triggered: Open Diff Viewer for Changed File ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Open Diff Viewer") if not svn_path: # Error message shown by validation method self.main_frame.update_status_bar( "Error: Cannot open diff (invalid repo path)." ) return # Extract the relative path using the helper method relative_path: Optional[str] = self._extract_path_from_status_line( file_status_line ) log_handler.log_debug( f"Extracted relative_path via helper: {repr(relative_path)}", func_name=func_name ) # Check if path extraction was successful if not relative_path: log_handler.log_error( f"Could not extract valid path from status line: {file_status_line}", func_name=func_name ) self.main_frame.show_error( "Path Error", f"Could not parse file path from selected line:\n{file_status_line}" ) self.main_frame.update_status_bar("Error: Invalid selection for diff.") return # --- Check Status Code (e.g., prevent diff for deleted files vs WD) --- status_code: str = file_status_line.strip('\x00').strip()[:2].strip() # Prevent showing diff for files marked as Deleted (' D') against Working Dir if status_code == 'D': msg: str = ( f"Cannot show Working Dir vs HEAD diff for a deleted file:\n" f"{relative_path}" ) log_handler.log_info(msg, func_name=func_name) self.main_frame.show_info("Diff Not Applicable", msg) self.main_frame.update_status_bar( "Ready (Diff not applicable for deleted file)." ) return # Add checks for other non-diffable statuses if needed (e.g., '??', '!!') if status_code in ['??', '!!']: msg: str = ( f"Cannot show diff for file with status '{status_code}':\n" f"{relative_path}\n\n" f"(Untracked or Ignored files cannot be diffed against HEAD)." ) log_handler.log_info( f"Diff not applicable for status '{status_code}'.", func_name=func_name ) self.main_frame.show_info("Diff Not Applicable", msg) self.main_frame.update_status_bar("Ready (Diff not applicable).") return # --- Open DiffViewerWindow --- log_handler.log_debug( f"Opening DiffViewerWindow for '{relative_path}' (Working Dir vs HEAD)", func_name=func_name ) status_final: str = "Ready." # Default status after closing viewer try: # Instantiate and display the modal DiffViewerWindow DiffViewerWindow( master=self.master, # Parent window git_commands=self.git_commands, # Pass GitCommands instance repo_path=svn_path, relative_file_path=relative_path, # Use the cleaned path ref1='WORKING_DIR', # Compare working directory... ref2='HEAD' # ...against HEAD commit ) # Code execution pauses here until the DiffViewerWindow is closed log_handler.log_debug("Diff viewer window closed.", func_name=func_name) status_final = "Ready." except Exception as e: # Handle errors opening the diff viewer window log_handler.log_exception( f"Error opening or running diff viewer: {e}", func_name=func_name ) status_final = "Error: Failed to open diff viewer." self.main_frame.show_error( "Diff Viewer Error", f"Could not display diff:\n{e}" ) finally: # Update status bar after the window closes or an error occurs, # but only if another async operation hasn't started in the meantime. if hasattr(self, "main_frame") and not self.main_frame.status_bar_var.get().startswith("Processing"): self.main_frame.update_status_bar(status_final) def add_selected_file(self, file_status_line: str): """ Starts async operation to add a selected untracked file ('??') to staging. """ func_name: str = "add_selected_file" log_handler.log_info( f"--- Action Triggered: Add File '{file_status_line}' (Async) ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Add File") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Add file failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Add failed: Repo not ready.") return # Extract relative path from the status line (should start with '??') relative_path: Optional[str] = None try: line: str = file_status_line.strip("\x00").strip() # Only proceed if the status indicates an untracked file if line.startswith("??"): # Extract path after '?? ' handling potential quotes path_raw: str = line[2:].lstrip() if len(path_raw) >= 2 and path_raw.startswith('"') and path_raw.endswith('"'): relative_path = path_raw[1:-1] else: relative_path = path_raw else: # Show error if trying to add a non-untracked file log_handler.log_error( f"Cannot add non-untracked file: {line}", func_name=func_name ) self.main_frame.show_error( "Invalid Action", f"Cannot 'Add' file with status '{line[:2]}'.\nUse commit for modified/staged files." ) self.main_frame.update_status_bar("Add failed: Not an untracked file.") return # Check if path extraction failed if not relative_path: raise ValueError("Extracted relative path is empty.") except Exception as e: # Handle errors parsing the path log_handler.log_error( f"Error parsing path for add from line '{file_status_line}': {e}", func_name=func_name ) self.main_frame.show_error( "Parsing Error", f"Cannot parse file path from:\n{file_status_line}" ) self.main_frame.update_status_bar("Add failed: Parse error.") return # Prepare args and start the add worker args: tuple = (self.git_commands, svn_path, relative_path) base_filename: str = os.path.basename(relative_path) # For status message self._start_async_operation( worker_func=async_workers.run_add_file_async, args_tuple=args, context_dict={ "context": "add_file", "status_msg": f"Adding '{base_filename}'" } ) def create_tag(self): """ Handles tag creation: shows dialog, suggests name, starts async operation. """ func_name: str = "create_tag" log_handler.log_info( f"--- Action Triggered: Create Tag ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Create Tag") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Create Tag failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Create Tag failed: Repo not ready.") return # Generate suggested tag name and show dialog self.main_frame.update_status_bar("Processing: Generating tag suggestion...") suggested_name: str = self._generate_next_tag_suggestion(svn_path) self.main_frame.update_status_bar("Ready for tag input.") dialog = CreateTagDialog(self.master, suggested_tag_name=suggested_name) tag_info: Optional[tuple[str, str]] = dialog.result # (name, message) or None # If user provided input, start async operation if tag_info: tag_name, tag_message = tag_info log_handler.log_info(f"User provided tag: '{tag_name}'", func_name=func_name) # Prepare args args: tuple = (self.action_handler, svn_path, tag_name, tag_message) # Start async operation self._start_async_operation( worker_func=async_workers.run_create_tag_async, args_tuple=args, context_dict={ "context": "create_tag", "status_msg": f"Creating tag '{tag_name}'", "committed_flag_possible": True # Annotated tag creates commit object } ) else: # User cancelled the dialog log_handler.log_info("Tag creation cancelled.", func_name=func_name) self.main_frame.update_status_bar("Cancelled.") def checkout_tag(self): """ Handles tag checkout: confirms with user, starts async operation. """ func_name: str = "checkout_tag" log_handler.log_info( f"--- Action Triggered: Checkout Tag ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Checkout Tag") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Checkout Tag failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Checkout Tag failed: Repo not ready.") return # Get selected tag from GUI listbox tag_name: Optional[str] = self.main_frame.get_selected_tag() if not tag_name: # Show error if no tag is selected self.main_frame.show_error( "Selection Error", "No tag selected from the list." ) self.main_frame.update_status_bar("Checkout failed: No tag selected.") return # Confirm with user due to 'detached HEAD' state implications confirmation_message: str = ( f"Checkout tag '{tag_name}'?\n\n" f"Warning: This will put your repository in a 'detached HEAD' state. " f"You can look around, make experimental changes and commit them, " f"but they won't belong to any branch. " f"Use 'Checkout Branch' to return to a branch." ) if not self.main_frame.ask_yes_no("Confirm Checkout Tag", confirmation_message): # User cancelled log_handler.log_info("Tag checkout cancelled by user.", func_name=func_name) self.main_frame.update_status_bar("Cancelled.") return # Prepare args and start async operation args: tuple = (self.action_handler, svn_path, tag_name) self._start_async_operation( worker_func=async_workers.run_checkout_tag_async, args_tuple=args, context_dict={ "context": "checkout_tag", "status_msg": f"Checking out tag '{tag_name}'" } ) def create_branch(self): """ Handles branch creation: shows dialog, starts async operation. """ func_name: str = "create_branch" log_handler.log_info( f"--- Action Triggered: Create Branch ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Create Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Create Branch failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Create Branch failed: Repo not ready.") return # Show dialog to get new branch name self.main_frame.update_status_bar("Ready for branch name input.") dialog = CreateBranchDialog(self.master) branch_name: Optional[str] = dialog.result # Name or None if cancelled # If user provided a name, start async operation if branch_name: log_handler.log_info( f"User provided branch name: '{branch_name}'", func_name=func_name ) # Prepare args args: tuple = (self.action_handler, svn_path, branch_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_create_branch_async, args_tuple=args, context_dict={ "context": "create_branch", "status_msg": f"Creating branch '{branch_name}'", "new_branch_name": branch_name # Pass name for potential checkout later } ) else: # User cancelled the dialog log_handler.log_info("Branch creation cancelled.", func_name=func_name) self.main_frame.update_status_bar("Cancelled.") def checkout_branch( self, branch_to_checkout: Optional[str] = None, repo_path_override: Optional[str] = None ): """ Handles checkout of an existing local branch. Confirms if triggered by button, starts async operation. Can be called directly with a branch name (e.g., after creation). """ func_name: str = "checkout_branch" target_description: str = branch_to_checkout if branch_to_checkout else "Selected Branch" log_handler.log_info( f"--- Action Triggered: Checkout Branch (Target: {target_description}) ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Determine repository path (use override if provided) svn_path: Optional[str] = repo_path_override or self._get_and_validate_svn_path("Checkout Branch") # Validate repo path and readiness if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Checkout Branch failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Checkout Branch failed: Repo not ready.") return # Determine target branch and if confirmation is needed target_branch: Optional[str] = branch_to_checkout needs_confirmation: bool = False if not target_branch: # If no branch name passed, get selection from GUI and require confirmation target_branch = self.main_frame.get_selected_branch() needs_confirmation = True # Validate if a branch was determined if not target_branch: self.main_frame.show_error( "Selection Error", "No branch selected from the list." ) self.main_frame.update_status_bar("Checkout failed: No branch selected.") return # Ask for confirmation only if triggered by button press (not directly called) if needs_confirmation: if not self.main_frame.ask_yes_no( "Confirm Checkout Branch", f"Switch to branch '{target_branch}'?" ): log_handler.log_info( "Branch checkout cancelled by user.", func_name=func_name ) self.main_frame.update_status_bar("Cancelled.") return # Prepare args and start async operation args: tuple = (self.action_handler, svn_path, target_branch) self._start_async_operation( worker_func=async_workers.run_checkout_branch_async, args_tuple=args, context_dict={ "context": "checkout_branch", "status_msg": f"Checking out branch '{target_branch}'" } ) def delete_local_branch(self, branch_name: str, force: bool): """ Handles the request to delete a local branch (with confirmation). """ func_name: str = "delete_local_branch" action_description: str = "Force delete" if force else "Delete" log_handler.log_info( f"--- Action Triggered: {action_description} Local Branch '{branch_name}' ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path(f"{action_description} Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( f"{action_description} Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.update_status_bar(f"{action_description} failed: Repo not ready.") return # --- User Confirmation --- confirm_message: str = f"Are you sure you want to delete the local branch '{branch_name}'?" dialog_title: str = "Confirm Delete Branch" ask_function: Callable = self.main_frame.ask_yes_no # Default confirmation # Add warning for force delete if force: confirm_message += "\n\nWARNING: Force delete will discard any unmerged changes on this branch!" dialog_title = "Confirm Force Delete Branch" # Ask user for confirmation if not ask_function(dialog_title, confirm_message): log_handler.log_info( "Local branch deletion cancelled by user.", func_name=func_name ) self.main_frame.update_status_bar("Delete branch cancelled.") return # --- Start Async Worker --- log_handler.log_info( f"Starting {action_description.lower()} for local branch '{branch_name}'...", func_name=func_name ) # Prepare arguments args: tuple = (self.action_handler, svn_path, branch_name, force) # Start async operation self._start_async_operation( worker_func=async_workers.run_delete_local_branch_async, args_tuple=args, context_dict={ "context": "delete_local_branch", "status_msg": f"{action_description} branch '{branch_name}'", "branch_name": branch_name, # Pass name for result handling "force": force # Pass force flag } ) def merge_local_branch(self, branch_to_merge: str): """ Handles the request to merge a local branch into the current branch. """ func_name: str = "merge_local_branch" log_handler.log_info( f"--- Action Triggered: Merge Local Branch '{branch_to_merge}' ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path(f"Merge Branch '{branch_to_merge}'") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Merge Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.update_status_bar("Merge failed: Repo not ready.") return # Get current branch for validation and confirmation message current_branch: Optional[str] = None try: current_branch = self.git_commands.get_current_branch_name(svn_path) if not current_branch: raise ValueError("Could not determine the current branch (Detached HEAD?).") if current_branch == branch_to_merge: raise ValueError("Cannot merge a branch into itself.") except (GitCommandError, ValueError) as e: log_handler.log_error(f"Merge aborted during pre-check: {e}", func_name=func_name) self.main_frame.show_error("Merge Error", f"Cannot start merge:\n{e}") self.main_frame.update_status_bar("Merge failed: Pre-check error.") return # --- User Confirmation --- confirm_message: str = f"Merge branch '{branch_to_merge}' into current branch '{current_branch}'?" # Default: allow fast-forward merges without forcing a merge commit no_ff_option: bool = False if not self.main_frame.ask_yes_no("Confirm Merge", confirm_message): log_handler.log_info( "Local branch merge cancelled by user.", func_name=func_name ) self.main_frame.update_status_bar("Merge cancelled.") return # --- Start Async Worker --- log_handler.log_info( f"Starting merge of '{branch_to_merge}' into '{current_branch}'...", func_name=func_name ) # Prepare arguments (pass git_commands for worker's internal checks) args: tuple = ( self.action_handler, self.git_commands, svn_path, branch_to_merge, no_ff_option ) # Start async operation self._start_async_operation( worker_func=async_workers.run_merge_local_branch_async, args_tuple=args, context_dict={ "context": "merge_local_branch", "status_msg": f"Merging '{branch_to_merge}' into '{current_branch}'", "branch_merged_into": current_branch, "branch_merged_from": branch_to_merge, "repo_path": svn_path, # Needed for conflict message } ) def compare_branch_with_current(self, other_branch_ref: str): """ Handles comparing a selected branch (local or remote) with the current branch. """ func_name: str = "compare_branch_with_current" log_handler.log_info( f"--- Action Triggered: Compare Branch '{other_branch_ref}' with Current ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path(f"Compare Branch '{other_branch_ref}'") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Compare Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.update_status_bar("Compare failed: Repo not ready.") return # Get current branch for comparison current_branch: Optional[str] = None try: current_branch = self.git_commands.get_current_branch_name(svn_path) if not current_branch: raise ValueError("Cannot compare: Currently in detached HEAD state.") # Prevent comparing a branch with itself if current_branch == other_branch_ref: log_handler.log_warning( "Compare Branch skipped: Cannot compare a branch with itself.", func_name=func_name ) self.main_frame.show_info("Compare Info", "Cannot compare a branch with itself.") return except (GitCommandError, ValueError) as e: log_handler.log_error(f"Compare aborted during pre-check: {e}", func_name=func_name) self.main_frame.show_error("Compare Error", f"Cannot start compare:\n{e}") return # --- Start Async Worker --- log_handler.log_info( f"Starting comparison between '{current_branch}' and '{other_branch_ref}'...", func_name=func_name ) # Prepare arguments: ref1 is current, ref2 is the other branch args: tuple = (self.git_commands, svn_path, current_branch, other_branch_ref) # Start async operation self._start_async_operation( worker_func=async_workers.run_compare_branches_async, args_tuple=args, context_dict={ "context": "compare_branches", "status_msg": f"Comparing '{current_branch}' vs '{other_branch_ref}'", "ref1": current_branch, # Pass refs for summary window "ref2": other_branch_ref, "repo_path": svn_path, # Pass repo path for summary window } ) def view_commit_details(self, history_line: str): """ Callback triggered by double-clicking a line in the history view. Extracts the commit hash and starts an async worker to fetch details. """ func_name: str = "view_commit_details" log_handler.log_info( f"--- Action Triggered: View Commit Details for line: '{history_line}' ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("View Commit Details") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "View Commit Details skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) return # --- Estrarre l'Hash del Commit dalla Riga di Log --- # Assumiamo che il formato (%h) sia all'inizio della riga, seguito da spazio commit_hash_short: Optional[str] = None try: parts: List[str] = history_line.split(maxsplit=1) # Divide solo al primo spazio if parts and len(parts[0]) > 0: # Assumendo che l'hash sia la prima parte commit_hash_short = parts[0] # Potremmo validare che sia un hash valido (es. 7+ caratteri esadecimali) if not re.match(r"^[0-9a-fA-F]{7,}$", commit_hash_short): raise ValueError(f"Extracted part '{commit_hash_short}' doesn't look like a commit hash.") else: raise ValueError("Could not split history line to find hash.") log_handler.log_debug(f"Extracted commit hash: {commit_hash_short}", func_name=func_name) except Exception as e: log_handler.log_error( f"Could not extract commit hash from history line '{history_line}': {e}", func_name=func_name ) self.main_frame.show_error( "Parsing Error", f"Could not identify commit hash in selected line:\n{history_line}" ) return # --- Start Async Worker to Get Commit Details --- log_handler.log_info( f"Fetching details for commit '{commit_hash_short}'...", func_name=func_name ) # Prepare arguments for the worker args: tuple = (self.git_commands, svn_path, commit_hash_short) # Start the async operation self._start_async_operation( worker_func=async_workers.run_get_commit_details_async, # NUOVO WORKER args_tuple=args, context_dict={ "context": "get_commit_details", "status_msg": f"Loading details for commit {commit_hash_short}", "commit_hash": commit_hash_short # Passa l'hash nel contesto } ) def refresh_branch_list(self): """ Starts async operation to refresh the local branch list in the GUI. """ func_name: str ="refresh_branch_list" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Branches") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Branches skipped: Repo not ready.", func_name=func_name ) self._update_gui_for_not_ready_state() return # Prepare arguments args: tuple = (self.git_commands, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_branches_async, # Worker for local branches args_tuple=args, context_dict={"context": "refresh_branches", "status_msg": "Refreshing branches"} ) def refresh_commit_history(self): """ Starts async operation to refresh the commit history display. """ func_name: str ="refresh_commit_history" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh History") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh History skipped: Repo not ready.", func_name=func_name ) self._update_gui_for_not_ready_state() return # Determine branch filter from GUI branch_filter: Optional[str] = None log_scope: str = "All History" if hasattr(self.main_frame, "history_branch_filter_var"): filter_selection: str = self.main_frame.history_branch_filter_var.get() # Use filter only if a specific branch/tag is selected if filter_selection and filter_selection != "-- All History --": branch_filter = filter_selection log_scope = f"'{branch_filter}'" # Prepare arguments args: tuple = (self.git_commands, svn_path, branch_filter, log_scope) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_history_async, args_tuple=args, context_dict={ "context": "refresh_history", "status_msg": f"Refreshing history for {log_scope}" } ) def refresh_changed_files_list(self): """ Starts async operation to refresh the list of changed files. """ func_name: str ="refresh_changed_files_list" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Changed Files") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Changes skipped: Repo not ready.", func_name=func_name ) # GUI update handled by _update_gui_for_not_ready_state if needed return # Prepare arguments args: tuple = (self.git_commands, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_changes_async, args_tuple=args, context_dict={ "context": "refresh_changes", "status_msg": "Refreshing changed files" } ) # --- Remote Action Launchers --- def apply_remote_config(self): """ Callback for 'Apply Config' button. Starts async worker. """ func_name: str = "apply_remote_config" log_handler.log_info( f"--- Action Triggered: Apply Remote Config ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot apply config: Main frame missing.", func_name=func_name ) return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Apply Remote Config") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Apply config skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Apply config failed: Repo not ready.") return # Get remote URL and name from GUI remote_url: str = self.main_frame.remote_url_var.get().strip() remote_name: str = self.main_frame.remote_name_var.get().strip() # Validate remote URL if not remote_url: log_handler.log_warning( "Apply config failed: Remote URL is empty.", func_name=func_name ) self.main_frame.show_error("Input Error", "Remote URL cannot be empty.") self.main_frame.update_status_bar("Apply config failed: URL empty.") return # Use default remote name if empty if not remote_name: remote_name = DEFAULT_REMOTE_NAME log_handler.log_info( f"Remote name empty, using default: '{remote_name}'", func_name=func_name ) self.main_frame.remote_name_var.set(remote_name) # Save profile settings BEFORE applying to Git config if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue applying remote config anyway?" ): self.main_frame.update_status_bar( "Apply config cancelled (profile save failed)." ) return # Prepare args and start async operation args: tuple = (self.remote_action_handler, svn_path, remote_name, remote_url) self._start_async_operation( worker_func=async_workers.run_apply_remote_config_async, args_tuple=args, context_dict={ "context": "apply_remote_config", "status_msg": f"Applying config for remote '{remote_name}'" } ) def check_connection_auth(self): """ Callback for 'Check Connection & Auth' button. """ func_name: str = "check_connection_auth" log_handler.log_info( f"--- Action Triggered: Check Connection & Auth ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Check Connection") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Check Connection skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) # Reset auth status indicator if repo not ready self._update_gui_auth_status("unknown") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) log_handler.log_info( f"Checking connection/auth for remote '{remote_name}'...", func_name=func_name ) # Update GUI indicator to 'checking' state self._update_gui_auth_status("checking") # Prepare args and start async operation args: tuple = (self.git_commands, svn_path, remote_name) self._start_async_operation( worker_func=async_workers.run_check_connection_async, args_tuple=args, context_dict={ "context": "check_connection", "status_msg": f"Checking remote '{remote_name}'", "remote_name_checked": remote_name, # Pass context for result handler "repo_path_checked": svn_path, } ) def fetch_remote(self): """ Starts the asynchronous 'git fetch' operation. """ func_name: str = "fetch_remote" log_handler.log_info( f"--- Action Triggered: Fetch Remote ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Fetch Remote") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Fetch Remote skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Fetch failed: Repo not ready.") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) # Optional: Check auth status before fetching (consider if needed) # if self.remote_auth_status != 'ok': ... (warning/confirmation) ... log_handler.log_info( f"Starting fetch for remote '{remote_name}'...", func_name=func_name ) # Prepare args and start async operation args: tuple = (self.remote_action_handler, svn_path, remote_name) self._start_async_operation( worker_func=async_workers.run_fetch_remote_async, args_tuple=args, context_dict={ "context": "fetch_remote", "status_msg": f"Fetching from remote '{remote_name}'" } ) def pull_remote(self): """ Starts the asynchronous 'git pull' operation for the current branch. """ func_name: str = "pull_remote" log_handler.log_info( f"--- Action Triggered: Pull Remote ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Pull Remote") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Pull Remote skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Pull failed: Repo not ready.") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) # Check authentication/connection status before attempting pull if self.remote_auth_status != "ok": auth_msg: str = f"Cannot Pull from '{remote_name}':\n" if self.remote_auth_status == "required": auth_msg += "Authentication is required. Use 'Check Connection / Auth' first." elif self.remote_auth_status == "failed": auth_msg += "Authentication previously failed. Use 'Check Connection / Auth' to retry." elif self.remote_auth_status == "connection_failed": auth_msg += "Connection previously failed. Check URL and network." else: auth_msg += "Connection status is unknown or in error. Use 'Check Connection / Auth' first." log_handler.log_warning(f"Pull Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name) self.main_frame.show_warning("Action Blocked", auth_msg) self.main_frame.update_status_bar(f"Pull failed: {self.remote_auth_status}") return # Worker will get current branch name internally log_handler.log_info( f"Starting pull for remote '{remote_name}'...", func_name=func_name ) # Prepare args (pass GitCommands for the worker to get current branch) args: tuple = (self.remote_action_handler, self.git_commands, svn_path, remote_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_pull_remote_async, args_tuple=args, context_dict={ "context": "pull_remote", "status_msg": f"Pulling from remote '{remote_name}'", "repo_path": svn_path, # Pass context for conflict messages "remote_name": remote_name, } ) def push_remote(self): """ Starts the asynchronous 'git push' operation for the current branch. """ func_name: str = "push_remote" log_handler.log_info( f"--- Action Triggered: Push Branch to Remote ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Push Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Push Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Push failed: Repo not ready.") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) # Check authentication/connection status before attempting push if self.remote_auth_status != "ok": auth_msg: str = f"Cannot Push to '{remote_name}':\n" # ... (Build specific auth message as in pull_remote) ... if self.remote_auth_status == "required": auth_msg += "Authentication is required..." # Shortened for brevity elif self.remote_auth_status == "failed": auth_msg += "Authentication previously failed..." elif self.remote_auth_status == "connection_failed": auth_msg += "Connection previously failed..." else: auth_msg += "Connection status is unknown or in error..." log_handler.log_warning( f"Push Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name ) self.main_frame.show_warning("Action Blocked", auth_msg) self.main_frame.update_status_bar(f"Push failed: {self.remote_auth_status}") return # Optional: Check for uncommitted changes before push try: if self.git_commands.git_status_has_changes(svn_path): if not self.main_frame.ask_yes_no( "Uncommitted Changes", "There are uncommitted changes in your working directory.\n" "Push anyway? (Only committed changes will be pushed)" ): self.main_frame.update_status_bar( "Push cancelled by user (uncommitted changes)." ) return except GitCommandError as status_err: # Handle error during status check log_handler.log_error( f"Push aborted: Failed to check repository status before push: {status_err}", func_name=func_name ) self.main_frame.show_error( "Status Error", f"Could not check repo status:\n{status_err}" ) return log_handler.log_info( f"Starting push for current branch to remote '{remote_name}'...", func_name=func_name ) # Worker will get current branch name # Prepare args (pass GitCommands for the worker) args: tuple = (self.remote_action_handler, self.git_commands, svn_path, remote_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_push_remote_async, args_tuple=args, context_dict={ "context": "push_remote", "status_msg": f"Pushing current branch to remote '{remote_name}'", "remote_name": remote_name, # Pass context for result messages } ) def push_tags_remote(self): """ Starts the asynchronous 'git push --tags' operation. """ func_name: str = "push_tags_remote" log_handler.log_info( f"--- Action Triggered: Push Tags to Remote ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Push Tags") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Push Tags skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Push tags failed: Repo not ready.") return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) # Check authentication/connection status if self.remote_auth_status != "ok": auth_msg: str = f"Cannot Push Tags to '{remote_name}':\n" # ... (Build specific auth message as in push_remote) ... log_handler.log_warning( f"Push Tags skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name ) self.main_frame.show_warning("Action Blocked", auth_msg) self.main_frame.update_status_bar(f"Push tags failed: {self.remote_auth_status}") return # Confirm with user before pushing all local tags if not self.main_frame.ask_yes_no( "Confirm Push Tags", f"Push all local tags to remote '{remote_name}'?\n" f"(Existing tags on the remote with the same name will " f"NOT be overwritten unless forced, which this action does not do)." ): self.main_frame.update_status_bar("Push tags cancelled by user.") return log_handler.log_info( f"Starting push tags to remote '{remote_name}'...", func_name=func_name ) # Prepare args args: tuple = (self.remote_action_handler, svn_path, remote_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_push_tags_async, args_tuple=args, context_dict={ "context": "push_tags_remote", "status_msg": f"Pushing tags to remote '{remote_name}'", "remote_name": remote_name, # Pass context for messages } ) def clone_remote_repo(self): """ Handles the 'Clone from Remote...' action: shows dialog, validates, starts worker. """ func_name: str = "clone_remote_repo" log_handler.log_info( f"--- Action Triggered: Clone Remote Repository ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot start clone: Main frame not available.", func_name=func_name ) return # Show modal dialog to get clone parameters dialog = CloneFromRemoteDialog(self.master) # Result is None if cancelled, or (url, parent_dir, profile_name_input) dialog_result: Optional[tuple[str, str, str]] = dialog.result if not dialog_result: log_handler.log_info( "Clone operation cancelled by user in dialog.", func_name=func_name ) self.main_frame.update_status_bar("Clone cancelled.") return # Extract data from dialog result remote_url, local_parent_dir, profile_name_input = dialog_result # --- Derive target directory and profile name, validate paths --- final_profile_name: str = "" target_clone_dir: str = "" repo_name_from_url: str = "" try: # Derive repo name from URL (remove .git suffix) repo_name_from_url = os.path.basename(remote_url) if repo_name_from_url.endswith(".git"): repo_name_from_url = repo_name_from_url[:-4] if not repo_name_from_url: raise ValueError("Could not derive repository name from URL.") # Construct full target path for the clone target_clone_dir = os.path.join(local_parent_dir, repo_name_from_url) # Normalize the path target_clone_dir = os.path.abspath(target_clone_dir) # Determine final profile name (use input or derive from repo name) if profile_name_input: final_profile_name = profile_name_input # Check if proposed profile name already exists if final_profile_name in self.config_manager.get_profile_sections(): raise ValueError( f"Profile name '{final_profile_name}' already exists. " f"Please choose a different name." ) else: # Use repo name as base, add counter if it exists final_profile_name = repo_name_from_url counter: int = 1 while final_profile_name in self.config_manager.get_profile_sections(): final_profile_name = f"{repo_name_from_url}_{counter}" counter += 1 log_handler.log_debug( f"Derived target clone directory: {target_clone_dir}", func_name=func_name ) log_handler.log_debug( f"Determined profile name: {final_profile_name}", func_name=func_name ) # --- CRITICAL CHECK: Target directory must NOT exist --- if os.path.exists(target_clone_dir): error_msg: str = ( f"Clone failed: Target directory already exists:\n{target_clone_dir}\n" f"Please choose a different parent directory or ensure the target is clear." ) log_handler.log_error(error_msg, func_name=func_name) self.main_frame.show_error("Clone Path Error", error_msg) self.main_frame.update_status_bar("Clone failed: Target directory exists.") return # Stop the operation except ValueError as ve: # Handle errors deriving names or validating profile name log_handler.log_error( f"Clone configuration error: {ve}", func_name=func_name ) self.main_frame.show_error("Configuration Error", str(ve)) self.main_frame.update_status_bar("Clone failed: Configuration error.") return except Exception as e: # Handle unexpected errors during preparation log_handler.log_exception( f"Unexpected error preparing for clone: {e}", func_name=func_name ) self.main_frame.show_error( "Internal Error", f"An unexpected error occurred:\n{e}" ) self.main_frame.update_status_bar("Clone failed: Internal error.") return # --- Start Asynchronous Worker --- log_handler.log_info( f"Starting clone for '{remote_url}' into '{target_clone_dir}'...", func_name=func_name ) # Prepare arguments args: tuple = (self.git_commands, remote_url, target_clone_dir, final_profile_name) # Start async operation self._start_async_operation( worker_func=async_workers.run_clone_remote_async, args_tuple=args, context_dict={ "context": "clone_remote", "status_msg": f"Cloning '{repo_name_from_url}'...", # Use repo name "clone_success_data": { # Pass data needed for profile creation "profile_name": final_profile_name, "cloned_path": target_clone_dir, "remote_url": remote_url, } } ) def refresh_remote_status(self): """ Starts the asynchronous check for ahead/behind status of the current branch against its upstream counterpart. """ func_name: str = "refresh_remote_status" log_handler.log_info( f"--- Action Triggered: Refresh Remote Sync Status ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Sync Status") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Refresh Status skipped: Repo not ready.", func_name=func_name ) # Update GUI to show 'not ready' state self._update_gui_for_not_ready_state() return # --- Get Current Branch and Upstream --- current_branch: Optional[str] = None upstream_branch: Optional[str] = None try: # Get the name of the current local branch current_branch = self.git_commands.get_current_branch_name(svn_path) if current_branch: # If on a branch, get its configured upstream upstream_branch = self.git_commands.get_branch_upstream( svn_path, current_branch ) else: # Handle detached HEAD state log_handler.log_warning( "Refresh Status: Cannot get status, currently in detached HEAD state.", func_name=func_name ) self._update_gui_for_detached_head(current_branch) return # Cannot check sync status in detached HEAD # Handle case where upstream is not configured if not upstream_branch: log_handler.log_info( f"Refresh Status: No upstream configured for branch '{current_branch}'.", func_name=func_name ) self._update_gui_for_no_upstream(current_branch) return # Cannot check sync status without upstream # Enable refresh button if we have branch and upstream (might be disabled) if hasattr(self.main_frame, "refresh_sync_status_button"): self.main_frame.refresh_sync_status_button.config(state=tk.NORMAL) except Exception as e: # Handle errors getting branch/upstream info log_handler.log_exception( f"Error getting branch/upstream before status check: {e}", func_name=func_name ) self._update_gui_for_status_error() # Update GUI to show error state return # --- Start Async Worker --- log_handler.log_info( f"Checking ahead/behind status for '{current_branch}' vs '{upstream_branch}'...", func_name=func_name ) # Update GUI label to "Checking..." if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( current_branch=current_branch, status_text="Sync Status: Checking..." ) # Prepare arguments for the worker args: tuple = (self.git_commands, svn_path, current_branch, upstream_branch) # Start async operation self._start_async_operation( worker_func=async_workers.run_get_ahead_behind_async, # Worker function args_tuple=args, context_dict={ "context": "get_ahead_behind", "status_msg": f"Checking sync status for '{current_branch}'", "local_branch": current_branch, # Pass context for result handler "upstream_branch": upstream_branch, } ) def refresh_branch_list(self): """ Starts async operation to refresh the local branch list in the GUI. """ func_name: str ="refresh_branch_list" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Branches") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Branches skipped: Repo not ready.", func_name=func_name ) self._update_gui_for_not_ready_state() return # Prepare arguments args: tuple = (self.git_commands, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_branches_async, # Worker for local branches args_tuple=args, context_dict={"context": "refresh_branches", "status_msg": "Refreshing branches"} ) def refresh_commit_history(self): """ Starts async operation to refresh the commit history display. """ func_name: str ="refresh_commit_history" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh History") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh History skipped: Repo not ready.", func_name=func_name ) self._update_gui_for_not_ready_state() return # Determine branch filter from GUI branch_filter: Optional[str] = None log_scope: str = "All History" if hasattr(self.main_frame, "history_branch_filter_var"): filter_selection: str = self.main_frame.history_branch_filter_var.get() # Use filter only if a specific branch/tag is selected if filter_selection and filter_selection != "-- All History --": branch_filter = filter_selection log_scope = f"'{branch_filter}'" # Prepare arguments args: tuple = (self.git_commands, svn_path, branch_filter, log_scope) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_history_async, args_tuple=args, context_dict={ "context": "refresh_history", "status_msg": f"Refreshing history for {log_scope}" } ) def refresh_changed_files_list(self): """ Starts async operation to refresh the list of changed files. """ func_name: str ="refresh_changed_files_list" svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Changed Files") # Check if repo is ready if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Changes skipped: Repo not ready.", func_name=func_name ) # GUI update handled by _update_gui_for_not_ready_state if needed return # Prepare arguments args: tuple = (self.git_commands, svn_path) # Start the async operation self._start_async_operation( worker_func=async_workers.run_refresh_changes_async, args_tuple=args, context_dict={ "context": "refresh_changes", "status_msg": "Refreshing changed files" } ) def refresh_remote_branches(self): """ Starts the asynchronous refresh of the remote branches list. """ func_name: str = "refresh_remote_branches" log_handler.log_info( f"--- Action Triggered: Refresh Remote Branches List ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Remote Branches") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Refresh Remote Branches skipped: Repo not ready.", func_name=func_name ) self._update_gui_for_not_ready_state() # Update GUI lists return # Get remote name, using default if empty remote_name: str = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) log_handler.log_info( f"Starting refresh of remote branches list for '{remote_name}'...", func_name=func_name ) # Update GUI list to show "Loading..." if hasattr(self.main_frame, "update_remote_branches_list"): self.main_frame.update_remote_branches_list(["(Loading...)"]) # Prepare args and start async operation args: tuple = (self.git_commands, svn_path, remote_name) self._start_async_operation( worker_func=async_workers.run_refresh_remote_branches_async, args_tuple=args, context_dict={ "context": "refresh_remote_branches", "status_msg": f"Refreshing remote branches for '{remote_name}'" } ) def checkout_remote_branch_as_local( self, remote_branch_full: str, local_branch_suggestion: str ): """ Handles checkout of a remote branch as a new local tracking branch. Checks if local branch exists, confirms if needed, starts async worker. """ func_name: str = "checkout_remote_branch_as_local" log_handler.log_info( f"--- Action Triggered: Checkout Remote Branch '{remote_branch_full}' as Local '{local_branch_suggestion}' ---", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Checkout Remote Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Checkout Remote Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) # No status bar update here as it's usually from a menu click return try: # Check if a local branch with the suggested name already exists local_branches: list[str] current: Optional[str] local_branches, current = self.git_commands.list_branches(svn_path) if local_branch_suggestion in local_branches: # If it exists, ask user if they want to checkout the existing one instead log_handler.log_warning( f"Local branch '{local_branch_suggestion}' already exists.", func_name=func_name ) if self.main_frame.ask_yes_no( "Branch Exists", f"A local branch named '{local_branch_suggestion}' already exists.\n\n" f"Do you want to check out the existing local branch instead?" ): # User wants to checkout existing local branch log_handler.log_info( f"User chose to checkout existing local branch '{local_branch_suggestion}'.", func_name=func_name ) # Call the standard checkout function, passing the path override self.checkout_branch( branch_to_checkout=local_branch_suggestion, repo_path_override=svn_path ) else: # User cancelled log_handler.log_info( "Checkout cancelled because local branch exists.", func_name=func_name ) self.main_frame.update_status_bar("Checkout cancelled.") # Exit in either case (another async task started or cancelled) return # If local branch doesn't exist, proceed to create tracking branch log_handler.log_info( f"Starting checkout of '{remote_branch_full}' as new local branch '{local_branch_suggestion}'...", func_name=func_name ) # Prepare args and start async operation args: tuple = ( self.action_handler, svn_path, local_branch_suggestion, remote_branch_full, ) self._start_async_operation( worker_func=async_workers.run_checkout_tracking_branch_async, args_tuple=args, context_dict={ "context": "checkout_tracking_branch", "status_msg": f"Checking out '{local_branch_suggestion}' tracking '{remote_branch_full}'" } ) except Exception as e: # Handle errors during local branch check or starting the worker log_handler.log_exception( f"Error preparing for tracking branch checkout: {e}", func_name=func_name ) if hasattr(self, "main_frame"): self.main_frame.show_error( "Checkout Error", f"Could not start checkout operation:\n{e}" ) self.main_frame.update_status_bar("Checkout failed: Internal error.") # --- Simplified Queue Checking Method (Refactored) --- def _check_completion_queue(self, results_queue: queue.Queue, context: dict): """ Checks result queue, updates status bar briefly, delegates processing. """ task_context: str = context.get('context', 'unknown') func_name: str = "_check_completion_queue" try: # Get result without blocking result_data: Dict[str, Any] = results_queue.get_nowait() log_handler.log_info( f"Result received for '{task_context}'. Status: {result_data.get('status')}", func_name=func_name ) # Determine if widgets should be re-enabled immediately should_reenable_now: bool = self._should_reenable_widgets_now( task_context, result_data.get('status') ) if not should_reenable_now: log_handler.log_debug( f"Postponing widget re-enable for context: {task_context}", func_name=func_name ) # Re-enable widgets now if appropriate if should_reenable_now: self._reenable_widgets_if_ready() # Update status bar with generic message (may be overridden by handler) self._update_status_bar_from_result(task_context, result_data) # Process the result using the dedicated handler self._process_result_with_handler(result_data, context) except queue.Empty: # Queue empty, reschedule check self._reschedule_queue_check(results_queue, context) except Exception as e: # Critical error during queue check or basic handling self._handle_queue_check_error(e, task_context) # --- Helper methods for _check_completion_queue --- def _should_reenable_widgets_now(self, task_context: str, status: Optional[str]) -> bool: """ Determines if GUI widgets should be re-enabled immediately. """ # Don't re-enable immediately if waiting for user interaction or follow-up task if task_context == "check_connection" and status == 'auth_required': return False if task_context == "interactive_auth" and status == 'success': return False if task_context == 'clone_remote' and status == 'success': return False if task_context in ['checkout_tracking_branch', 'checkout_branch', 'checkout_tag'] and status == 'success': return False if task_context in ['pull_remote', 'merge_local_branch'] and status == 'conflict': return False if task_context == 'compare_branches' and status == 'success': return False # Default: Re-enable immediately return True def _reenable_widgets_if_ready(self): """ Safely re-enables action widgets if the main frame exists. """ if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): log_handler.log_debug( "Re-enabling widgets now.", func_name="_reenable_widgets_if_ready" ) self.main_frame.set_action_widgets_state(tk.NORMAL) else: log_handler.log_warning( "Cannot re-enable widgets, MainFrame missing.", func_name="_reenable_widgets_if_ready" ) def _update_status_bar_from_result(self, task_context: str, result_data: dict): """ Updates status bar based on result, unless specific conditions apply. """ status: Optional[str] = result_data.get('status') message: str = result_data.get('message', "Operation finished.") # Conditions where status bar update is skipped or handled differently skip_update: bool = False if (task_context == 'clone_remote' and status == 'success'): skip_update = True if (task_context in ['checkout_tracking_branch', 'checkout_branch', 'checkout_tag'] and status == 'success'): skip_update = True if (task_context == 'compare_branches' and status == 'success'): skip_update = True if status in ['conflict', 'rejected']: skip_update = True # Update status bar if not skipped and main frame exists if not skip_update and hasattr(self, "main_frame") and self.main_frame.winfo_exists(): status_color: Optional[str] = None reset_duration: int = 5000 # Default reset time # Determine color and duration based on status if status == 'success': status_color = self.main_frame.STATUS_GREEN elif status == 'warning': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 7000 elif status == 'auth_required': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 15000 elif status == 'conflict': status_color = self.main_frame.STATUS_RED; reset_duration = 15000 elif status == 'rejected': status_color = self.main_frame.STATUS_RED; reset_duration = 15000 elif status == 'error': status_color = self.main_frame.STATUS_RED; reset_duration = 10000 # Call the GUI update method self.main_frame.update_status_bar( message, bg_color=status_color, duration_ms=reset_duration ) def _process_result_with_handler(self, result_data: dict, context: dict): """ Instantiates and calls the AsyncResultHandler to process the result. """ task_context: str = context.get('context', 'unknown') func_name: str = "_process_result_with_handler" try: # Create handler instance, passing the current app instance result_handler = AsyncResultHandler(self) # Delegate detailed processing result_handler.process(result_data, context) log_handler.log_debug( f"Result processing delegated to handler for context '{task_context}'.", func_name=func_name ) except Exception as handler_e: # Handle errors occurring *within* the result handler log_handler.log_exception( f"Error during result processing by handler for {task_context}: {handler_e}", func_name=func_name ) # Show error to user if hasattr(self, "main_frame"): self.main_frame.show_error( "Processing Error", f"Failed to handle task result:\n{handler_e}" ) # Ensure widgets are re-enabled if handler fails unexpectedly if not self._should_reenable_widgets_now(task_context, result_data.get('status')) and \ hasattr(self.main_frame, "winfo_exists") and self.main_frame.winfo_exists(): log_handler.log_warning( "Re-enabling widgets after handler error.", func_name=func_name ) self.main_frame.set_action_widgets_state(tk.NORMAL) def _reschedule_queue_check(self, results_queue: queue.Queue, context: dict): """ Reschedules the check for the completion queue if the app is running. """ if hasattr(self, "master") and self.master.winfo_exists(): self.master.after( self.ASYNC_QUEUE_CHECK_INTERVAL_MS, self._check_completion_queue, results_queue, context ) def _handle_queue_check_error(self, error: Exception, task_context: str): """ Handles critical errors during the queue check process itself. """ func_name: str = "_handle_queue_check_error" log_handler.log_exception( f"Critical error checking completion queue for {task_context}: {error}", func_name=func_name ) try: # Attempt GUI recovery if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): # Re-enable widgets self.main_frame.set_action_widgets_state(tk.NORMAL) # Show error in status bar self.main_frame.update_status_bar( "Error processing async result.", bg_color=self.main_frame.STATUS_RED, duration_ms=10000 ) # Reset other relevant GUI states (e.g., sync status) self._update_gui_for_status_error() except Exception as recovery_e: # Log error during recovery attempt log_handler.log_error( f"Failed to recover GUI after queue processing error: {recovery_e}", func_name=func_name ) def _handle_gitignore_save(self): """ Callback executed after .gitignore is saved successfully by the editor. Starts an asynchronous task to check for and untrack files if necessary. """ func_name: str = "_handle_gitignore_save" log_handler.log_info( "Callback: .gitignore saved. Starting async untrack check.", func_name=func_name ) # Ensure main frame exists if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Validate repo path and readiness svn_path: Optional[str] = self._get_and_validate_svn_path("Untrack Check after Gitignore Save") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_error( "Cannot start untrack check: Invalid/Not ready path.", func_name=func_name ) self.main_frame.update_status_bar( "Error: Untrack check failed (invalid path)." ) return # Prepare args and start the untrack worker args: tuple = (self.action_handler, svn_path) self._start_async_operation( worker_func=async_workers.run_untrack_async, args_tuple=args, context_dict={ "context": "_handle_gitignore_save", # Context identifies origin "status_msg": "Checking files to untrack", "committed_flag_possible": True # Untracking involves a commit } ) def _open_commit_file_diff( self, commit_hash: str, file_status: str, file_path: str, old_file_path: Optional[str] = None ): """ Opens the DiffViewerWindow to show the changes for a specific file within a given commit compared to its parent. Args: commit_hash (str): The full hash of the commit being viewed. file_status (str): The status character (A, M, D, R, T...). file_path (str): The relative path of the file in the commit. old_file_path (Optional[str]): The old path (for Renamed files). """ func_name: str = "_open_commit_file_diff" log_handler.log_info( f"Requesting diff for file '{file_path}' in commit '{commit_hash[:7]}'", func_name=func_name ) # Validazione repo path corrente svn_path: Optional[str] = self._get_and_validate_svn_path("Open Commit File Diff") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_error( "Cannot open diff: Repository path invalid or not ready.", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Error", "Cannot open diff: Repository not available.") return # Determina i riferimenti per il diff viewer ref1: str = f"{commit_hash}^" # Genitore del commit ref2: str = commit_hash # Il commit stesso # --- Gestione Casi Speciali --- # 1. File Aggiunto (A) o Copiato (C): Confronta con "stato vuoto" (come fare?) # Git diff -- mostra l'intero file come aggiunto. # Possiamo passare i ref standard, DiffViewerWindow lo mostrerà correttamente. # 2. File Cancellato (D): Confronta il file nel parent con "stato vuoto". # DiffViewerWindow dovrebbe mostrare il file come cancellato. # Per Deleted files, il path da usare è quello *prima* della cancellazione. # 3. File Rinominato (R): Dobbiamo usare old_file_path per ref1 e file_path per ref2? # No, `git diff commit^ commit -- file` gestisce la rinomina. Usiamo il *nuovo* path. # 4. Commit Iniziale (Root Commit): Non ha genitore (`commit^` fallisce). # Dobbiamo rilevarlo. Possiamo provare `git rev-parse commit^`. Se fallisce, # è un root commit. In tal caso, mostriamo il file solo in ref2 (vs "empty"). # Per semplicità iniziale, potremmo anche solo mostrare un errore se `git show` # per `commit^:path` fallisce. path_to_diff: str = file_path if file_status == 'D' and old_file_path: # Per file cancellati, in realtà vogliamo vedere il contenuto *prima* # ma il diff tra parent e commit con il path nuovo non funziona. # È più corretto mostrare il file come esisteva nel parent. # Potremmo aprire DiffViewer in modo speciale solo con ref1? # O semplicemente non permettere il diff per file cancellati in questa vista? # Per ora, mostriamo un messaggio e non apriamo il diff per 'D'. log_handler.log_info( f"Diff view skipped for deleted file '{file_path}' in commit '{commit_hash[:7]}'.", func_name=func_name ) if hasattr(self.main_frame, "show_info"): self.main_frame.show_info( "Diff Not Applicable", f"Cannot show diff for file deleted in this commit:\n{file_path}" ) return # --- Apri DiffViewerWindow --- log_handler.log_debug( f"Opening DiffViewerWindow: Ref1='{ref1}', Ref2='{ref2}', Path='{path_to_diff}'", func_name=func_name ) try: # Istanzia e mostra la finestra di diff modale DiffViewerWindow( master=self.master, # Usa la finestra root come parent git_commands=self.git_commands, repo_path=svn_path, relative_file_path=path_to_diff, # Usa il path corretto ref1=ref1, # Commit genitore ref2=ref2 # Commit selezionato ) log_handler.log_debug("Commit file diff viewer closed.", func_name=func_name) # Non serve aggiornare status bar qui, la finestra CommitDetail è ancora aperta except Exception as e_diff: log_handler.log_exception( f"Error opening commit file diff viewer: {e_diff}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Diff Viewer Error", f"Could not display file changes:\n{e_diff}" ) # --- Application Entry Point --- # Questa parte verrà spostata in __main__.py # if __name__ == "__main__": # # Il codice per avviare l'app ora è in gitsync_tool/__main__.py # pass # --- END OF FILE gitsync_tool/app.py ---