# --- FILE: GitUtility.py ---
import os
import datetime
import tkinter as tk
from tkinter import messagebox, filedialog

# Note: logging is no longer imported here for configuration,
# but we may still need it for the level constants (logging.INFO, etc.) if not already in log_handler
import logging  # Kept for the levels used in some checks/log calls
import re
import threading
import queue
import traceback  # For logging exceptions in main
import sys  # For fallback print

# Import application modules
try:
    from config_manager import (
        ConfigManager,
        DEFAULT_PROFILE,
        DEFAULT_BACKUP_DIR,
        DEFAULT_REMOTE_NAME,
    )
    from action_handler import ActionHandler
    from backup_handler import BackupHandler
    from git_commands import GitCommands, GitCommandError

    # Import the new queue-based log handling
    import log_handler

    # Import only the function that configures the file logger
    from logger_config import setup_file_logging

    # Import GUI
    from gui import (
        MainFrame,
        GitignoreEditorWindow,
        CreateTagDialog,
        CreateBranchDialog,
    )
    from diff_viewer import DiffViewerWindow

    # --- NEW IMPORTS ---
    from remote_actions import RemoteActionHandler  # Handler for remote actions
    import async_workers  # Module with the asynchronous worker functions
    # --- END NEW IMPORTS ---
except ImportError as e:
    critical_msg = f"Critical Error: Failed to import required application modules: {e}"
    print(f"FATAL IMPORT ERROR: {critical_msg}")
    try:
        # Attempt to show a graphical error as a fallback
        root = tk.Tk()
        root.withdraw()
        messagebox.showerror(
            "Startup Error",
            f"Failed to load components:\n{e}\n\nApplication cannot start.",
        )
        root.destroy()
    except Exception:
        pass  # Ignore errors in the graphical fallback
    sys.exit(1)  # Exit regardless


class GitSvnSyncApp:
    """
    Main application controller class for the Git Sync Tool.
    Orchestrates GUI and backend actions using asynchronous operations
    and a centralized logging queue. Initializes and connects components.
    """

    LOG_QUEUE_CHECK_INTERVAL_MS = 100  # Poll log queue every 100ms
    ASYNC_QUEUE_CHECK_INTERVAL_MS = 100  # Poll result queues every 100ms

    def __init__(self, master: tk.Tk):
        """Initializes the application components and GUI."""
        self.master = master
        master.title("Git Sync Tool (Bundle & Remote Manager)")  # Updated title
        master.protocol("WM_DELETE_WINDOW", self.on_closing)

        # --- Initial log (console) ---
        print("Initializing GitSvnSyncApp...")
        log_handler.log_debug(
            "GitSvnSyncApp initialization started.", func_name="__init__"
        )

        # --- Initialize Core Components ---
        # Core components are created here and passed where needed
        try:
            self.config_manager = ConfigManager(None)
            self.git_commands = GitCommands(None)
            self.backup_handler = BackupHandler(None)
            # ActionHandler for local, bundle and backup operations
            self.action_handler = ActionHandler(self.git_commands, self.backup_handler)
            # RemoteActionHandler for remote operations
            self.remote_action_handler = RemoteActionHandler(self.git_commands)
            self.remote_auth_status = "unknown"
            print("Core components initialized.")
            log_handler.log_debug(
                "Core components initialized successfully.", func_name="__init__"
            )
        except Exception as e:
            print(f"FATAL: Failed to initialize core components: {e}")
            log_handler.log_critical(
                f"Failed to initialize core components: {e}", func_name="__init__"
            )
            # Show the fatal error and terminate
            self.show_fatal_error(
                f"Initialization Error:\n{e}\n\nApplication cannot start."
            )
            # It is important to exit, or at least not proceed, if core components fail.
            # We could raise an exception here or call self.on_closing()
            self.master.after(10, self.on_closing)  # Attempt a clean shutdown
            return  # Abort __init__

        # --- Initialize GUI ---
        try:
            print("Creating MainFrame GUI...")
            log_handler.log_debug("Creating MainFrame GUI.", func_name="__init__")
            self.main_frame = MainFrame(
                master,
                # === Callbacks bound to methods of this class ===
                # Profile & Config
                load_profile_settings_cb=self.load_profile_settings,
                save_profile_cb=self.save_profile_settings,
                add_profile_cb=self.add_profile,
                remove_profile_cb=self.remove_profile,
                # Paths
                browse_folder_cb=self.browse_folder,
                update_svn_status_cb=self.update_svn_status_indicator,
                # Local repo / Bundle / Backup
                prepare_svn_for_git_cb=self.prepare_svn_for_git,
                create_git_bundle_cb=self.create_git_bundle,
                fetch_from_git_bundle_cb=self.fetch_from_git_bundle,
                manual_backup_cb=self.manual_backup,
                # Gitignore
                open_gitignore_editor_cb=self.open_gitignore_editor,
                # Commit / Changes
                commit_changes_cb=self.commit_changes,
                refresh_changed_files_cb=self.refresh_changed_files_list,
                open_diff_viewer_cb=self.open_diff_viewer,
                add_selected_file_cb=self.add_selected_file,
                # Tags
                refresh_tags_cb=self.refresh_tag_list,
                create_tag_cb=self.create_tag,
                checkout_tag_cb=self.checkout_tag,
                # Branches
                refresh_branches_cb=self.refresh_branch_list,
                create_branch_cb=self.create_branch,
                checkout_branch_cb=self.checkout_branch,
                # History
                refresh_history_cb=self.refresh_commit_history,
                # --- Remote Actions ---
                apply_remote_config_cb=self.apply_remote_config,
                check_connection_auth_cb=self.check_connection_auth,
                fetch_remote_cb=self.fetch_remote,
                pull_remote_cb=self.pull_remote,
                push_remote_cb=self.push_remote,
                push_tags_remote_cb=self.push_tags_remote,
                # === Data / instances for the GUI ===
                config_manager_instance=self.config_manager,
                profile_sections_list=self.config_manager.get_profile_sections(),
                refresh_remote_status_cb=self.refresh_remote_status,
                clone_remote_repo_cb=self.clone_remote_repo,
            )
            print("MainFrame GUI created.")
            log_handler.log_debug(
                "MainFrame GUI created successfully.", func_name="__init__"
            )
        except Exception as e:
            print(f"FATAL: Failed to initialize MainFrame GUI: {e}")
            log_handler.log_exception(
                "Failed to initialize MainFrame GUI.", func_name="__init__"
            )
            self.show_fatal_error(
                f"GUI Initialization Error:\n{e}\n\nApplication cannot start."
            )
            self.master.after(10, self.on_closing)  # Attempt a clean shutdown
            return  # Abort __init__

        # --- Setup Logging Processing (File + Queue Polling) ---
        self._setup_logging_processing()

        # --- Log Application Start (via Queue) ---
        log_handler.log_info(
            "Git Sync Tool application starting.", func_name="__init__"
        )

        # --- Initial Profile Load ---
        # Load the initially selected profile (default or the first one)
        self._perform_initial_load()

        log_handler.log_info(
            "Git Sync Tool initialization complete.", func_name="__init__"
        )

    def _setup_logging_processing(self):
        """Configures file logging and starts the log queue processing loop."""
        func_name = "_setup_logging_processing"
        try:
            # 1. Configure file logging only (INFO level by default).
            #    Change here if you want a different level for the file (e.g. logging.DEBUG)
            setup_file_logging(
                level=logging.DEBUG
            )  # Set to DEBUG for full tracing

            # 2. Start polling the log queue to update the GUI
            if hasattr(self, "main_frame") and hasattr(self.main_frame, "log_text"):
                log_handler.log_info(
                    "Starting log queue processing for GUI.", func_name=func_name
                )
                self.master.after(
                    self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue
                )
            else:
                # This should not happen if the GUI initialized successfully
                print(
                    "ERROR: Cannot start log queue processing - GUI log widget not found."
                )
                log_handler.log_error(
                    "Cannot start log queue processing - GUI log widget not found.",
                    func_name=func_name,
                )
        except Exception as e:
            print(f"ERROR during logging setup: {e}")
            log_handler.log_exception(
                "Failed to setup logging processing.", func_name=func_name
            )

    def _process_log_queue(self):
        """Processes messages from the log queue to update file and GUI log."""
        # (The internal logic of _process_log_queue is unchanged: it updates
        # the log_text widget in the GUI and uses the root logger to write
        # to file through the configured handler)
        log_widget = getattr(self.main_frame, "log_text", None)
        if not log_widget or not log_widget.winfo_exists():
            # If the GUI no longer exists, stop polling
            log_handler.log_warning(
                "Log widget not found, stopping queue processing.",
                func_name="_process_log_queue",
            )
            return

        processed_count = 0
        max_proc_per_cycle = 50  # Limit so the GUI does not block
        while not log_handler.log_queue.empty():
            if processed_count >= max_proc_per_cycle:
                log_handler.log_debug(
                    f"Processed {max_proc_per_cycle} log entries, pausing.",
                    func_name="_process_log_queue",
                )
                break  # Process in batches to keep the GUI responsive
            try:
                log_entry = log_handler.log_queue.get_nowait()
                level = log_entry.get("level", logging.INFO)
                message = log_entry.get("message", "")
                level_name = log_handler.get_log_level_name(level)

                # 1. Write to the root logger (which has the FileHandler attached)
                logging.getLogger().log(level, message)
                processed_count += 1

                # 2. Update the GUI widget if the level is appropriate
                if level >= logging.DEBUG:  # Show DEBUG and above in the GUI
                    try:
                        # Save state, enable, insert, disable again to prevent user edits
                        original_state = log_widget.cget("state")
                        log_widget.config(state=tk.NORMAL)
                        # Insert the message with the tag used for colouring
                        log_widget.insert(tk.END, message + "\n", (level_name,))
                        log_widget.see(tk.END)  # Auto-scroll
                        log_widget.config(state=original_state)  # Restore state
                    except tk.TclError as e_gui:
                        # Tkinter-specific error (e.g. widget destroyed in the meantime)
                        print(
                            f"TclError updating log widget: {e_gui} - Message: {message}",
                            file=sys.stderr,
                        )
                    except Exception as e_gui:
                        print(
                            f"Error updating log widget: {e_gui} - Message: {message}",
                            file=sys.stderr,
                        )
            except queue.Empty:
                break  # Queue empty
            except Exception as e_proc:
                print(f"Error processing log queue item: {e_proc}", file=sys.stderr)
                # Try to log the error itself, if possible
                try:
                    logging.getLogger().error(
                        f"Error processing log queue item: {e_proc}"
                    )
                except Exception:
                    pass

        # Reschedule the next queue check
        if self.master.winfo_exists():  # Make sure the root window still exists
            self.master.after(self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue)

    def _perform_initial_load(self):
        """Loads the initially selected profile settings into the GUI."""
        func_name = "_perform_initial_load"
        log_handler.log_debug("Performing initial profile load.", func_name=func_name)
        if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
            log_handler.log_error(
                "Cannot perform initial load: MainFrame not ready.", func_name=func_name
            )
            return

        # Get the profile selected in the dropdown (should be the default or the first one)
        initial_profile = self.main_frame.profile_var.get()
        if initial_profile:
            log_handler.log_debug(
                f"Loading initial profile: '{initial_profile}'", func_name=func_name
            )
            # Call the standard profile loading routine
            self.load_profile_settings(initial_profile)
        else:
            # Case where no profiles exist in the .ini file
            log_handler.log_warning(
                "No initial profile set (no profiles found?).", func_name=func_name
            )
            # Clear and disable the fields
            self._clear_and_disable_fields()
            self.main_frame.update_status_bar(
                "No profiles found. Please add a profile."
            )

    def on_closing(self):
        """Handles the window close event."""
        func_name = "on_closing"
        log_handler.log_info("Application closing initiated.", func_name=func_name)
        # Try to update the status bar (may fail if the GUI is already compromised)
        if hasattr(self, "main_frame") and self.main_frame.winfo_exists():
            try:
                self.main_frame.update_status_bar("Exiting...")
            except Exception:
                pass
        # Destroy the main Tkinter window
        if self.master and self.master.winfo_exists():
            self.master.destroy()
        log_handler.log_info("Application closed.", func_name=func_name)
        # Note: daemon threads should terminate automatically

    # --- Profile Management Callbacks ---
    # (load_profile_settings and save_profile_settings were updated earlier
    # to include the remote fields)

    def load_profile_settings(self, profile_name: str):
        """Loads settings for the selected profile into the GUI."""
        func_name = "load_profile_settings"
        log_handler.log_info(
            f"Loading settings for profile: '{profile_name}'", func_name=func_name
        )
        if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
            log_handler.log_error(
                "Cannot load profile: Main frame not available.", func_name=func_name
            )
            return
        self.main_frame.update_status_bar(
            f"Processing: Loading profile '{profile_name}'..."
        )

        # Profile validation
        if (
            not profile_name
            or profile_name not in self.config_manager.get_profile_sections()
        ):
            log_handler.log_warning(
                f"Profile '{profile_name}' invalid/not found.", func_name=func_name
            )
            self._clear_and_disable_fields()
            if profile_name:
                self.main_frame.show_error(
                    "Profile Load Error", f"Profile '{profile_name}' not found."
                )
            self.main_frame.update_status_bar(
                f"Error: Profile '{profile_name}' not found."
                if profile_name
                else "No profile selected."
            )
            return

        cm = self.config_manager
        # Get all expected keys and their defaults
        keys_with_defaults = cm._get_expected_keys_with_defaults()
        settings = {}
        for k, d in keys_with_defaults.items():
            settings[k] = cm.get_profile_option(profile_name, k, fallback=d)

        mf = self.main_frame
        repo_path_for_refresh = ""  # Keep the path for the subsequent refreshes
        try:
            # --- Load existing fields ---
            mf.svn_path_entry.delete(0, tk.END)
            svn_path_value = settings.get("svn_working_copy_path", "")
            mf.svn_path_entry.insert(0, svn_path_value)
            repo_path_for_refresh = svn_path_value  # Use this path for the checks

            mf.usb_path_entry.delete(0, tk.END)
            mf.usb_path_entry.insert(0, settings.get("usb_drive_path", ""))
            mf.bundle_name_entry.delete(0, tk.END)
            mf.bundle_name_entry.insert(0, settings.get("bundle_name", ""))
            mf.bundle_updated_name_entry.delete(0, tk.END)
            mf.bundle_updated_name_entry.insert(
                0, settings.get("bundle_name_updated", "")
            )

            mf.autobackup_var.set(
                str(settings.get("autobackup", "False")).lower() == "true"
            )
            mf.backup_dir_var.set(settings.get("backup_dir", DEFAULT_BACKUP_DIR))
            mf.backup_exclude_extensions_var.set(
                settings.get("backup_exclude_extensions", "")
            )
            mf.backup_exclude_dirs_var.set(settings.get("backup_exclude_dirs", ""))
            mf.toggle_backup_dir()  # Update the backup dir widget state

            mf.autocommit_var.set(
                str(settings.get("autocommit", "False")).lower() == "true"
            )
            mf.clear_commit_message()
            if mf.commit_message_text.winfo_exists():
                state = mf.commit_message_text.cget("state")
                if state == tk.DISABLED:
                    mf.commit_message_text.config(state=tk.NORMAL)
                mf.commit_message_text.insert("1.0", settings.get("commit_message", ""))
                if state == tk.DISABLED:
                    mf.commit_message_text.config(state=tk.DISABLED)

            # --- Load the NEW REMOTE fields ---
            if hasattr(mf, "remote_url_var") and hasattr(mf, "remote_name_var"):
                mf.remote_url_var.set(settings.get("remote_url", ""))
                mf.remote_name_var.set(settings.get("remote_name", DEFAULT_REMOTE_NAME))
            else:
                log_handler.log_warning(
                    "Remote URL/Name widgets not found in GUI during load.",
                    func_name=func_name,
                )

            log_handler.log_info(
                f"Applied settings from '{profile_name}' to GUI fields.",
                func_name=func_name,
            )

            # --- Update repository status and trigger refreshes if needed ---
            self.update_svn_status_indicator(
                repo_path_for_refresh
            )  # Update repo status (enables/disables widgets)

            # Check whether the repo is ready AFTER updating the indicator
            is_ready = self._is_repo_ready(repo_path_for_refresh)
            if is_ready:
                log_handler.log_info(
                    "Repo ready, triggering async refreshes.", func_name=func_name
                )
                # Start the asynchronous refreshes (non-blocking)
                self.refresh_tag_list()
                self.refresh_branch_list()
                self.refresh_commit_history()
                self.refresh_changed_files_list()
                # The status bar will be updated by the individual refreshes or their errors
            else:
                log_handler.log_info(
                    "Repo not ready, clearing dynamic lists.", func_name=func_name
                )
                # Clear the lists that depend on the repo
                if hasattr(mf, "update_tag_list"):
                    mf.update_tag_list([])
                if hasattr(mf, "update_branch_list"):
                    mf.update_branch_list([], None)
                if hasattr(mf, "update_history_display"):
                    mf.update_history_display([])
                if hasattr(mf, "update_history_branch_filter"):
                    mf.update_history_branch_filter([])
                if hasattr(self.main_frame, "update_ahead_behind_status"):
                    self.main_frame.update_ahead_behind_status(
                        status_text="Sync Status: (Repo not ready)"
                    )
                # The changed files list is already handled by update_svn_status_indicator
                # Set the final status bar text for this case
                mf.update_status_bar(
                    f"Profile '{profile_name}' loaded (Repo not ready)."
                )
        except Exception as e:
            log_handler.log_exception(
                f"Error applying settings for '{profile_name}': {e}",
                func_name=func_name,
            )
            if hasattr(self.main_frame, "update_ahead_behind_status"):
                self.main_frame.update_ahead_behind_status(
                    status_text="Sync Status: Error"
                )
            mf.show_error("Profile Load Error", f"Failed to apply settings:\n{e}")
            mf.update_status_bar(f"Error loading profile '{profile_name}'.")

    def save_profile_settings(self) -> bool:
        """Saves current GUI values to the selected profile config file."""
        func_name = "save_profile_settings"
        profile_name = self.main_frame.profile_var.get()
        if not profile_name:
            log_handler.log_warning(
                "Save failed: No profile selected.", func_name=func_name
            )
            if hasattr(self, "main_frame"):
                self.main_frame.update_status_bar("Save failed: No profile selected.")
            return False
        if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
            log_handler.log_error(
                "Cannot save profile: Main frame not available.", func_name=func_name
            )
            return False

        log_handler.log_info(
            f"Saving settings for profile: '{profile_name}'", func_name=func_name
        )
        mf = self.main_frame
        cm = self.config_manager
        status_final = "Ready."
        success = False
        try:
            # Collect all values from the GUI
            settings = {
                # Existing
                "svn_working_copy_path": mf.svn_path_entry.get(),
                "usb_drive_path": mf.usb_path_entry.get(),
                "bundle_name": mf.bundle_name_entry.get(),
                "bundle_name_updated": mf.bundle_updated_name_entry.get(),
                "autocommit": str(mf.autocommit_var.get()),
                "commit_message": mf.get_commit_message(),
                "autobackup": str(mf.autobackup_var.get()),
                "backup_dir": mf.backup_dir_var.get(),
                "backup_exclude_extensions": mf.backup_exclude_extensions_var.get(),
                "backup_exclude_dirs": mf.backup_exclude_dirs_var.get(),
                # New for Remote
                "remote_url": mf.remote_url_var.get(),
                "remote_name": mf.remote_name_var.get().strip()
                or DEFAULT_REMOTE_NAME,  # Use the default if empty
            }
            # Iterate and save each option via ConfigManager
            log_handler.log_debug(f"Settings to save: {settings}", func_name=func_name)
            for key, value in settings.items():
                # set_profile_option handles string conversion and creates the section if needed
                cm.set_profile_option(profile_name, key, value)
            # Write the changes to the .ini file
            cm.save_config()
            log_handler.log_info(
                f"Settings saved successfully for '{profile_name}'.",
                func_name=func_name,
            )
            status_final = f"Profile '{profile_name}' saved."
            success = True
        except Exception as e:
            log_handler.log_exception(
                f"Error saving profile '{profile_name}': {e}", func_name=func_name
            )
            status_final = f"Error saving profile '{profile_name}'."
            mf.show_error("Save Error", f"Failed:\n{e}")
            success = False
        finally:
            # Update the status bar in any case
            mf.update_status_bar(status_final)
        return success

    def add_profile(self):
        """Handles adding a new profile (Synchronous GUI part)."""
        func_name = "add_profile"
        log_handler.log_debug("'Add Profile' button clicked.", func_name=func_name)
        if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
            return
        self.main_frame.update_status_bar("Adding new profile...")

        # Ask the user for a name
        name = self.main_frame.ask_new_profile_name()
        if not name:
            log_handler.log_info("Add profile cancelled.", func_name=func_name)
            self.main_frame.update_status_bar("Add profile cancelled.")
            return
        name = name.strip()

        # Name validation
        if not name:
            log_handler.log_warning("Add failed: Name empty.", func_name=func_name)
            self.main_frame.show_error("Input Error", "Profile name cannot be empty.")
            self.main_frame.update_status_bar("Add failed: Empty name.")
            return
        if name in self.config_manager.get_profile_sections():
            log_handler.log_warning(
                f"Add failed: '{name}' exists.", func_name=func_name
            )
            self.main_frame.show_error("Error", f"Profile '{name}' already exists.")
            self.main_frame.update_status_bar(f"Add failed: '{name}' exists.")
            return

        log_handler.log_info(
            f"Attempting to add new profile: '{name}'", func_name=func_name
        )
        status_final = "Ready."
        try:
            # Get the defaults from ConfigManager (which now include the remote keys)
            defaults = self.config_manager._get_expected_keys_with_defaults()
            # Customize a few defaults for the new profile
            defaults["bundle_name"] = f"{name}_repo.bundle"  # Suggested bundle name
            defaults["bundle_name_updated"] = f"{name}_update.bundle"
            defaults["svn_working_copy_path"] = ""  # Paths start out empty
            defaults["usb_drive_path"] = ""
            defaults["remote_url"] = ""  # Empty remote URL
            defaults["commit_message"] = (
                f"Initial commit for profile {name}"  # Example commit message
            )

            # Add the section and set all the default options
            self.config_manager.add_section(name)  # Creates the section if missing
            for key, value in defaults.items():
                self.config_manager.set_profile_option(name, key, value)
            # Save the configuration file
            self.config_manager.save_config()
            log_handler.log_info(
                f"Profile '{name}' added successfully.", func_name=func_name
            )

            # Update the GUI: add the new profile to the dropdown and select it
            sections = self.config_manager.get_profile_sections()
            self.main_frame.update_profile_dropdown(sections)
            self.main_frame.profile_var.set(
                name
            )  # This will trigger load_profile_settings
        except Exception as e:
            log_handler.log_exception(
                f"Error adding profile '{name}': {e}", func_name=func_name
            )
            status_final = f"Error adding profile '{name}'."
self.main_frame.show_error("Add Error", f"Failed:\n{e}") self.main_frame.update_status_bar(status_final) def remove_profile(self): """Handles removing the selected profile (Synchronous GUI part).""" # (Logica invariata rispetto a prima, ma ora rimuoverà il profilo con tutti i suoi campi) func_name = "remove_profile" log_handler.log_debug("'Remove Profile' button clicked.", func_name=func_name) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return profile = self.main_frame.profile_var.get() # Validazioni if not profile: log_handler.log_warning( "Remove failed: No profile selected.", func_name=func_name ) self.main_frame.show_error("Error", "No profile selected.") self.main_frame.update_status_bar("Remove failed: No profile.") return if profile == DEFAULT_PROFILE: log_handler.log_warning( "Attempt remove default denied.", func_name=func_name ) self.main_frame.show_error( "Denied", f"Cannot remove default profile ('{DEFAULT_PROFILE}')." ) self.main_frame.update_status_bar("Cannot remove default.") return # Conferma utente if self.main_frame.ask_yes_no( "Confirm Remove", f"Remove profile '{profile}'?\nThis cannot be undone." ): log_handler.log_info( f"Attempting remove profile: '{profile}'", func_name=func_name ) self.main_frame.update_status_bar( f"Processing: Removing profile '{profile}'..." ) status_final = "Ready." try: # Chiama il metodo del config manager per rimuovere la sezione removed = self.config_manager.remove_profile_section(profile) if removed: # Salva la configurazione dopo la rimozione self.config_manager.save_config() log_handler.log_info( f"Profile '{profile}' removed.", func_name=func_name ) status_final = f"Profile '{profile}' removed." # Aggiorna il dropdown e seleziona un altro profilo (es. default) sections = self.config_manager.get_profile_sections() self.main_frame.update_profile_dropdown( sections ) # Questo dovrebbe selezionare il default o il primo else: # Caso strano: remove_profile_section ritorna False log_handler.log_error( f"Failed remove profile '{profile}' (ConfigManager returned False).", func_name=func_name, ) status_final = f"Error removing profile '{profile}'." self.main_frame.show_error( "Error", f"Could not remove '{profile}'. ConfigManager denied." ) self.main_frame.update_status_bar(status_final) except Exception as e: log_handler.log_exception( f"Error removing profile '{profile}': {e}", func_name=func_name ) status_final = f"Error removing profile '{profile}'." self.main_frame.show_error("Error", f"Failed:\n{e}") self.main_frame.update_status_bar(status_final) else: # Utente ha annullato log_handler.log_info("Profile removal cancelled.", func_name=func_name) self.main_frame.update_status_bar("Removal cancelled.") # --- GUI Interaction & Helpers --- def browse_folder(self, entry_widget): """Opens a directory chooser and updates the given entry widget.""" func_name = "browse_folder" current_path = entry_widget.get() # Determina directory iniziale più sensata initial_dir = os.path.expanduser("~") # Default alla home if current_path and os.path.isdir(current_path): initial_dir = current_path elif current_path and os.path.exists(os.path.dirname(current_path)): initial_dir = os.path.dirname(current_path) log_handler.log_debug( f"Opening folder browser. 
Initial: {initial_dir}", func_name=func_name ) directory = filedialog.askdirectory( initialdir=initial_dir, title="Select Directory", parent=self.master, # Assicura che sia modale rispetto alla finestra principale ) if directory: log_handler.log_debug( f"Directory selected: {directory}", func_name=func_name ) entry_widget.delete(0, tk.END) entry_widget.insert(0, directory) # Se è stato modificato il path SVN, aggiorna lo stato if ( hasattr(self.main_frame, "svn_path_entry") and entry_widget == self.main_frame.svn_path_entry ): self.update_svn_status_indicator(directory) else: log_handler.log_debug("Folder browse cancelled.", func_name=func_name) def update_svn_status_indicator(self, svn_path: str): """ Checks repo status, updates GUI indicator, and enables/disables relevant action widgets based on repo readiness. """ func_name = "update_svn_status_indicator" # Controlla se il path è una directory valida e se contiene .git is_valid_dir = bool(svn_path and os.path.isdir(svn_path)) is_repo_ready = is_valid_dir and os.path.exists(os.path.join(svn_path, ".git")) log_handler.log_debug( f"Updating repo status indicator. Path='{svn_path}', ValidDir={is_valid_dir}, Ready={is_repo_ready}", func_name=func_name, ) # Esci se la GUI non è pronta if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf = self.main_frame # Aggiorna l'indicatore colorato e il suo tooltip mf.update_svn_indicator(is_repo_ready) # --- Determina lo stato (NORMAL/DISABLED) dei vari widget --- # Stato basato sulla prontezza del repo (richiede .git) repo_ready_state = tk.NORMAL if is_repo_ready else tk.DISABLED # Stato basato solo sull'esistenza della directory (per 'Prepare') valid_dir_state = tk.NORMAL if is_valid_dir else tk.DISABLED # Stato per 'Prepare': normale solo se è una dir valida MA NON è già un repo pronto prepare_state = tk.NORMAL if is_valid_dir and not is_repo_ready else tk.DISABLED # Stato per 'Fetch from Bundle': complesso, dipende da stato repo o esistenza bundle fetch_button_state = self._calculate_fetch_button_state( mf, svn_path, is_repo_ready ) # --- Applica gli stati ai widget --- try: # Pulsanti Tab Repository if hasattr(mf, "prepare_svn_button"): mf.prepare_svn_button.config(state=prepare_state) if hasattr(mf, "create_bundle_button"): mf.create_bundle_button.config(state=repo_ready_state) if hasattr(mf, "fetch_bundle_button"): mf.fetch_bundle_button.config(state=fetch_button_state) if hasattr(mf, "edit_gitignore_button"): mf.edit_gitignore_button.config( state=repo_ready_state ) # Richiede repo pronto # Pulsanti/Widget Tab Backup if hasattr(mf, "manual_backup_button"): mf.manual_backup_button.config( state=valid_dir_state ) # Richiede solo dir valida # Pulsanti/Widget Tab Commit/Changes if hasattr(mf, "autocommit_checkbox"): mf.autocommit_checkbox.config(state=repo_ready_state) if hasattr(mf, "commit_message_text"): mf.commit_message_text.config(state=repo_ready_state) if hasattr(mf, "refresh_changes_button"): mf.refresh_changes_button.config(state=repo_ready_state) if hasattr(mf, "commit_button"): mf.commit_button.config(state=repo_ready_state) # Pulsanti/Widget Tab Tags if hasattr(mf, "refresh_tags_button"): mf.refresh_tags_button.config(state=repo_ready_state) if hasattr(mf, "create_tag_button"): mf.create_tag_button.config(state=repo_ready_state) if hasattr(mf, "checkout_tag_button"): mf.checkout_tag_button.config(state=repo_ready_state) if hasattr(mf, "tag_listbox"): mf.tag_listbox.config(state=repo_ready_state) # Pulsanti/Widget Tab Branches if hasattr(mf, 
"refresh_branches_button"): mf.refresh_branches_button.config(state=repo_ready_state) if hasattr(mf, "create_branch_button"): mf.create_branch_button.config(state=repo_ready_state) if hasattr(mf, "checkout_branch_button"): mf.checkout_branch_button.config(state=repo_ready_state) if hasattr(mf, "branch_listbox"): mf.branch_listbox.config(state=repo_ready_state) # Pulsanti/Widget Tab History if hasattr(mf, "refresh_history_button"): mf.refresh_history_button.config(state=repo_ready_state) if hasattr(mf, "history_branch_filter_combo"): combo_state = "readonly" if is_repo_ready else tk.DISABLED mf.history_branch_filter_combo.config(state=combo_state) if hasattr(mf, "history_text"): mf.history_text.config(state=repo_ready_state) # Pulsanti/Widget Tab Remote (per ora solo Apply Config) if hasattr(mf, "apply_remote_config_button"): mf.apply_remote_config_button.config(state=repo_ready_state) # Aggiungere qui gli altri pulsanti remote quando implementati # Gestione Lista Changed Files: Pulisci SOLO se repo NON è pronto if hasattr(mf, "changed_files_listbox"): if not is_repo_ready: log_handler.log_debug( "Repo not ready, clearing changes list via status update.", func_name=func_name, ) mf.update_changed_files_list(["(Repository not ready)"]) # else: Lascia che sia il refresh asincrono a popolarla se repo è pronto except Exception as e: # Logga errore se l'aggiornamento di stato dei widget fallisce log_handler.log_error( f"Error updating widget states based on repo status: {e}", func_name=func_name, ) def _calculate_fetch_button_state( self, main_frame: MainFrame, svn_path: str, is_repo_ready: bool ) -> str: """Determines the state (NORMAL/DISABLED) for the fetch button.""" # (Logica interna invariata) func_name = "_calculate_fetch_button_state" try: # Controlla se la directory SVN è utilizzabile per un clone can_use_svn_dir_for_clone = False if svn_path: if os.path.isdir(svn_path): # È utilizzabile se è una directory vuota try: if not os.listdir(svn_path): can_use_svn_dir_for_clone = True except OSError: pass # Ignora errori di listdir (es. permessi) else: # Se non è una dir, controlla se il parent esiste (per creare la dir) parent_dir = os.path.dirname(svn_path) if parent_dir and os.path.isdir(parent_dir): can_use_svn_dir_for_clone = True elif ( not parent_dir ): # Caso in cui il path è solo un nome file nella dir corrente can_use_svn_dir_for_clone = True # Controlla se il bundle specificato esiste bundle_file_exists = False usb_path_str = main_frame.usb_path_entry.get().strip() bundle_fetch_name = main_frame.bundle_updated_name_entry.get().strip() if usb_path_str and bundle_fetch_name and os.path.isdir(usb_path_str): bundle_full_path = os.path.join(usb_path_str, bundle_fetch_name) if os.path.isfile(bundle_full_path): bundle_file_exists = True # Abilita il fetch se: # 1. Il repo è già pronto (per fare fetch/merge) # 2. 
O se la directory di destinazione è utilizzabile E il bundle esiste (per fare clone) if is_repo_ready or (can_use_svn_dir_for_clone and bundle_file_exists): return tk.NORMAL else: return tk.DISABLED except Exception as e: log_handler.log_error( f"Error checking fetch button state: {e}", func_name=func_name ) return tk.DISABLED # Disabilita in caso di errore def _is_repo_ready(self, repo_path: str) -> bool: """Checks if the given path is a valid Git repository.""" # (Logica interna invariata) return bool( repo_path and os.path.isdir(repo_path) and os.path.exists(os.path.join(repo_path, ".git")) ) def _parse_exclusions(self) -> tuple[set[str], set[str]]: """Parses exclusion strings from GUI vars into sets.""" # (Logica interna invariata) exts = set() # Includi sempre .git e .svn nelle directory escluse per il backup dirs = {".git", ".svn"} if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return exts, dirs mf = self.main_frame # Estensioni da escludere ext_str = mf.backup_exclude_extensions_var.get() if ext_str: for ext in ext_str.split(","): clean = ext.strip().lower() if clean: # Assicura che inizi con un punto exts.add("." + clean.lstrip(".")) # Directory da escludere (solo nome base) dir_str = mf.backup_exclude_dirs_var.get() if dir_str: for dname in dir_str.split(","): # Pulisci spazi e separatori di directory clean = dname.strip().lower().strip(os.path.sep + "/") # Ignora nomi non validi o riferimenti relativi if clean and clean not in {".", ".."} and clean not in dirs: dirs.add(clean) log_handler.log_debug( f"Parsed Exclusions - Exts: {exts}, Dirs: {dirs}", func_name="_parse_exclusions", ) return exts, dirs def _get_and_validate_svn_path( self, operation_name: str = "Operation" ) -> str | None: """Gets and validates the SVN/Working Directory path from the GUI.""" # (Logica interna invariata) func_name = "_get_and_validate_svn_path" if not hasattr(self, "main_frame") or not hasattr( mf := self.main_frame, "svn_path_entry" ): log_handler.log_error( f"{operation_name} failed: SVN path entry widget missing.", func_name=func_name, ) return None path_str = mf.svn_path_entry.get().strip() if not path_str: log_handler.log_warning( f"{operation_name} failed: Working Directory path is empty.", func_name=func_name, ) mf.show_error("Input Error", "Working Directory path cannot be empty.") mf.update_status_bar(f"{operation_name} failed: Path empty.") return None abs_path = os.path.abspath(path_str) if not os.path.isdir(abs_path): log_handler.log_warning( f"{operation_name} failed: Path is not a valid directory: {abs_path}", func_name=func_name, ) mf.show_error( "Path Error", f"The specified path is not a valid directory:\n{abs_path}", ) mf.update_status_bar(f"{operation_name} failed: Not a directory.") return None log_handler.log_debug( f"{operation_name}: Using validated Working Directory path: {abs_path}", func_name=func_name, ) return abs_path def _get_and_validate_usb_path( self, operation_name: str = "Operation" ) -> str | None: """Gets and validates the USB/Bundle Target path from the GUI.""" # (Logica interna invariata) func_name = "_get_and_validate_usb_path" if not hasattr(self, "main_frame") or not hasattr( mf := self.main_frame, "usb_path_entry" ): log_handler.log_error( f"{operation_name} failed: Bundle Target path entry widget missing.", func_name=func_name, ) return None path_str = mf.usb_path_entry.get().strip() if not path_str: log_handler.log_warning( f"{operation_name} failed: Bundle Target path is empty.", func_name=func_name, ) mf.show_error("Input 
Error", "Bundle Target path cannot be empty.") mf.update_status_bar(f"{operation_name} failed: Path empty.") return None abs_path = os.path.abspath(path_str) if not os.path.isdir(abs_path): log_handler.log_warning( f"{operation_name} failed: Path is not a valid directory: {abs_path}", func_name=func_name, ) mf.show_error( "Path Error", f"The specified path is not a valid directory:\n{abs_path}", ) mf.update_status_bar(f"{operation_name} failed: Not a directory.") return None log_handler.log_debug( f"{operation_name}: Using validated Bundle Target path: {abs_path}", func_name=func_name, ) return abs_path def _clear_and_disable_fields(self): """Clears GUI fields and disables most action widgets.""" # (Logica interna invariata) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return mf = self.main_frame log_handler.log_debug( "Clearing and disabling fields.", func_name="_clear_and_disable_fields" ) # Pulisci campi di testo e variabili if hasattr(mf, "svn_path_entry"): mf.svn_path_entry.delete(0, tk.END) if hasattr(mf, "usb_path_entry"): mf.usb_path_entry.delete(0, tk.END) if hasattr(mf, "bundle_name_entry"): mf.bundle_name_entry.delete(0, tk.END) if hasattr(mf, "bundle_updated_name_entry"): mf.bundle_updated_name_entry.delete(0, tk.END) if hasattr(mf, "clear_commit_message"): mf.clear_commit_message() if hasattr(mf, "backup_dir_var"): mf.backup_dir_var.set("") if hasattr(mf, "backup_exclude_extensions_var"): mf.backup_exclude_extensions_var.set("") if hasattr(mf, "backup_exclude_dirs_var"): mf.backup_exclude_dirs_var.set("") if hasattr(mf, "remote_url_var"): mf.remote_url_var.set("") if hasattr(mf, "remote_name_var"): mf.remote_name_var.set("") if hasattr(mf, "autobackup_var"): mf.autobackup_var.set(False) if hasattr(mf, "autocommit_var"): mf.autocommit_var.set(False) # Aggiorna stato widget collegati a variabili (es. 
backup dir) if hasattr(mf, "toggle_backup_dir"): mf.toggle_backup_dir() # Pulisci liste dinamiche if hasattr(mf, "update_tag_list"): mf.update_tag_list([]) if hasattr(mf, "update_branch_list"): mf.update_branch_list([], None) if hasattr(mf, "update_history_display"): mf.update_history_display([]) if hasattr(mf, "update_history_branch_filter"): mf.update_history_branch_filter([]) if hasattr(mf, "update_changed_files_list"): mf.update_changed_files_list([]) # Disabilita tutti i widget di azione e aggiorna l'indicatore repo self.update_svn_status_indicator( "" ) # Passa path vuoto per forzare stato non pronto # Assicura che anche i bottoni del profilo (tranne Add) siano disabilitati if hasattr(mf, "remove_profile_button"): mf.remove_profile_button.config(state=tk.DISABLED) if hasattr(mf, "save_settings_button"): mf.save_settings_button.config(state=tk.DISABLED) mf.update_status_bar("No profile selected or repository not ready.") def show_fatal_error(self, message: str): """Displays a fatal error message box and attempts to close.""" # (Logica interna invariata) log_handler.log_critical( f"FATAL ERROR: {message}", func_name="show_fatal_error" ) try: # Tenta di usare la finestra master come parent, se esiste parent = ( self.master if hasattr(self, "master") and self.master and self.master.winfo_exists() else None ) messagebox.showerror("Fatal Error", message, parent=parent) except Exception as e: # Fallback: stampa su stderr se la GUI non è disponibile print(f"FATAL ERROR (GUI message failed: {e}): {message}", file=sys.stderr) finally: # Tenta comunque di chiudere l'applicazione self.on_closing() # --- ==== ASYNCHRONOUS ACTION LAUNCHERS ==== --- # (Questi metodi ora preparano gli args e chiamano _start_async_operation # passando la funzione worker corretta da async_workers.py) def _start_async_operation( self, worker_func, args_tuple: tuple, context_dict: dict ): """ Generic helper to start an async operation in a separate thread. Handles UI feedback (disabling widgets, status bar). """ # (Logica interna invariata: controlla GUI, aggiorna status, crea coda, # avvia thread con worker_func e args_tuple + coda, pianifica check) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot start async op: Main frame missing.", func_name="_start_async_operation", ) return context_name = context_dict.get("context", "unknown_op") status_msg = context_dict.get("status_msg", context_name) log_handler.log_info( f"--- Action Triggered: {context_name} (Async Start) ---", func_name=context_name, ) # Disabilita GUI e aggiorna status bar self.main_frame.set_action_widgets_state(tk.DISABLED) self.main_frame.update_status_bar( f"Processing: {status_msg}...", bg_color=self.main_frame.STATUS_YELLOW ) # Crea coda per il risultato (dimensione 1 è sufficiente) results_queue = queue.Queue(maxsize=1) # Aggiungi la coda come ultimo argomento per il worker full_args = args_tuple + (results_queue,) # Avvia il thread worker log_handler.log_debug( f"Creating worker thread for {context_name}. 
Worker func: {worker_func.__name__}", func_name="_start_async_operation", ) try: worker_thread = threading.Thread( target=worker_func, args=full_args, daemon=True ) log_handler.log_debug( f"Starting worker thread for {context_name}.", func_name="_start_async_operation", ) worker_thread.start() except Exception as thread_e: log_handler.log_exception( f"Failed to start worker thread for {context_name}: {thread_e}", func_name="_start_async_operation", ) self.main_frame.show_error( "Threading Error", f"Could not start background task for {context_name}.", ) self.main_frame.update_status_bar( f"Error starting task: {context_name}", bg_color=self.main_frame.STATUS_RED, duration_ms=10000, ) self.main_frame.set_action_widgets_state( tk.NORMAL ) # Riabilita subito se il thread non parte return # Pianifica il controllo del risultato log_handler.log_debug( f"Scheduling completion check for {context_name}.", func_name="_start_async_operation", ) self.master.after( self.ASYNC_QUEUE_CHECK_INTERVAL_MS, self._check_completion_queue, results_queue, context_dict, # Passa il contesto originale per il check ) # --- Specific Action Wrappers (Chiamano i worker esterni) --- def refresh_tag_list(self): """Starts async operation to refresh the tag list.""" func_name = "refresh_tag_list" svn_path = self._get_and_validate_svn_path("Refresh Tags") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Tags skipped: Repo not ready.", func_name=func_name ) # Aggiorna subito la GUI se il repo non è pronto if hasattr(self.main_frame, "update_tag_list"): self.main_frame.update_tag_list([("(Repo not ready)", "")]) self.main_frame.update_status_bar("Ready (Repo not ready).") return # Argomenti per il worker: dipendenze + parametri specifici args = (self.git_commands, svn_path) self._start_async_operation( async_workers.run_refresh_tags_async, # Funzione worker esterna args, {"context": "refresh_tags", "status_msg": "Refreshing tags"}, ) def refresh_branch_list(self): """Starts async operation to refresh the branch list.""" func_name = "refresh_branch_list" svn_path = self._get_and_validate_svn_path("Refresh Branches") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Branches skipped: Repo not ready.", func_name=func_name ) if hasattr(self.main_frame, "update_branch_list"): self.main_frame.update_branch_list([], None) if hasattr(self.main_frame, "update_history_branch_filter"): self.main_frame.update_history_branch_filter([]) self.main_frame.update_status_bar("Ready (Repo not ready).") return args = (self.git_commands, svn_path) self._start_async_operation( async_workers.run_refresh_branches_async, # Worker esterno args, {"context": "refresh_branches", "status_msg": "Refreshing branches"}, ) def refresh_commit_history(self): """Starts async operation to refresh the commit history.""" func_name = "refresh_commit_history" svn_path = self._get_and_validate_svn_path("Refresh History") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh History skipped: Repo not ready.", func_name=func_name ) if hasattr(self.main_frame, "update_history_display"): self.main_frame.update_history_display(["(Repo not ready)"]) self.main_frame.update_status_bar("Ready (Repo not ready).") return # Determina filtro branch/tag dalla GUI branch_filter = None log_scope = "All History" if hasattr(self.main_frame, "history_branch_filter_var"): filter_sel = self.main_frame.history_branch_filter_var.get() if filter_sel and filter_sel != "-- All History --": 
branch_filter = filter_sel log_scope = f"'{branch_filter}'" args = (self.git_commands, svn_path, branch_filter, log_scope) self._start_async_operation( async_workers.run_refresh_history_async, # Worker esterno args, { "context": "refresh_history", "status_msg": f"Refreshing history for {log_scope}", }, ) def refresh_changed_files_list(self): """Starts async operation to refresh the changed files list.""" func_name = "refresh_changed_files_list" svn_path = self._get_and_validate_svn_path("Refresh Changed Files") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_debug( "Refresh Changes skipped: Repo not ready.", func_name=func_name ) # La lista viene già pulita da update_svn_status_indicator se necessario self.main_frame.update_status_bar("Ready (Repo not ready).") return args = (self.git_commands, svn_path) self._start_async_operation( async_workers.run_refresh_changes_async, # Worker esterno args, {"context": "refresh_changes", "status_msg": "Refreshing changed files"}, ) def open_diff_viewer(self, file_status_line: str): """Opens the Diff Viewer window for the selected file (Synchronous GUI action).""" # (Logica invariata: validazioni, controllo stato file, apre finestra modale) func_name = "open_diff_viewer" log_handler.log_info( f"--- Action Triggered: Open Diff Viewer for '{file_status_line}' ---", func_name=func_name, ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return self.main_frame.update_status_bar("Processing: Opening diff viewer...") svn_path = self._get_and_validate_svn_path("Open Diff Viewer") if not svn_path: self.main_frame.update_status_bar( "Error: Cannot open diff (invalid repo path)." ) return cleaned_line = file_status_line.strip("\x00").strip() if not cleaned_line or len(cleaned_line) < 2: log_handler.log_warning( f"Invalid status line received for diff: '{file_status_line}'", func_name=func_name, ) self.main_frame.show_warning( "Diff Error", "Invalid file status line selected." ) self.main_frame.update_status_bar("Error: Invalid selection for diff.") return status_code = cleaned_line[:2].strip() # Impedisci diff per stati non appropriati if status_code in ["??", "!!", "D"]: display_path = "(Could not parse path)" try: # Tenta estrazione path per messaggio errore if "->" in cleaned_line: display_path = cleaned_line.split("->")[-1].strip().strip('"') else: display_path = cleaned_line[len(status_code) :].lstrip().strip('"') except Exception: pass msg = f"Cannot show diff for file with status '{status_code}':\n{display_path}\n\n(Untracked, Ignored, or Deleted files cannot be diffed against HEAD)." log_handler.log_info( f"Diff not applicable for status '{status_code}'.", func_name=func_name ) self.main_frame.show_info("Diff Not Applicable", msg) self.main_frame.update_status_bar("Ready (Diff not applicable).") return log_handler.log_debug( f"Opening DiffViewerWindow with status line: '{file_status_line}'", func_name=func_name, ) status_final = "Ready." try: # Passa git_commands necessario per l'operazione interna al DiffViewer DiffViewerWindow(self.master, self.git_commands, svn_path, file_status_line) log_handler.log_debug("Diff viewer window closed.", func_name=func_name) status_final = "Ready." except Exception as e: log_handler.log_exception( f"Error opening or running diff viewer: {e}", func_name=func_name ) status_final = "Error: Failed to open diff viewer." 
self.main_frame.show_error( "Diff Viewer Error", f"Could not display diff:\n{e}" ) finally: self.main_frame.update_status_bar(status_final) def prepare_svn_for_git(self): """Starts async operation to prepare the repository.""" func_name = "prepare_svn_for_git" svn_path = self._get_and_validate_svn_path("Prepare Repository") if not svn_path: self.main_frame.update_status_bar("Prepare failed: Invalid path.") return # Controlla se è già pronto PRIMA di avviare l'operazione if self._is_repo_ready(svn_path): log_handler.log_info( "Prepare skipped: Repository already prepared.", func_name=func_name ) self.main_frame.show_info("Info", "Repository is already prepared.") self.update_svn_status_indicator(svn_path) # Assicura stato GUI corretto return # Argomenti: dipendenza action_handler + path args = (self.action_handler, svn_path) self._start_async_operation( async_workers.run_prepare_async, # Worker esterno args, {"context": "prepare_repo", "status_msg": "Preparing repository"}, ) def create_git_bundle(self): """Starts async operation to create a Git bundle.""" func_name = "create_git_bundle" # Raccolta e validazione input dalla GUI profile = self.main_frame.profile_var.get() svn_path = self._get_and_validate_svn_path("Create Bundle") usb_path = self._get_and_validate_usb_path("Create Bundle") bundle_name = self.main_frame.bundle_name_entry.get().strip() if not profile or not svn_path or not usb_path or not bundle_name: log_handler.log_warning( "Create Bundle cancelled: Missing inputs.", func_name=func_name ) # Messaggi di errore specifici mostrati da get_and_validate return if not self._is_repo_ready(svn_path): log_handler.log_warning( "Create Bundle failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not prepared.") self.main_frame.update_status_bar("Create Bundle failed: Repo not ready.") return # Aggiusta nome bundle se necessario if not bundle_name.lower().endswith(".bundle"): bundle_name += ".bundle" bundle_full_path = os.path.join(usb_path, bundle_name) # Salva impostazioni profilo prima dell'azione if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue creating bundle anyway?", ): self.main_frame.update_status_bar( "Create Bundle cancelled (profile save failed)." 
) return # Prepara parametri per il worker exts, dirs = self._parse_exclusions() backup_enabled = self.main_frame.autobackup_var.get() backup_dir = self.main_frame.backup_dir_var.get() commit_enabled = self.main_frame.autocommit_var.get() commit_msg = self.main_frame.get_commit_message() # Argomenti per il worker: dipendenza + parametri args = ( self.action_handler, svn_path, bundle_full_path, profile, backup_enabled, backup_dir, commit_enabled, commit_msg, exts, dirs, ) self._start_async_operation( async_workers.run_create_bundle_async, # Worker esterno args, { "context": "create_bundle", "status_msg": f"Creating bundle '{bundle_name}'", "committed_flag_possible": True, # Segnala che un commit potrebbe avvenire }, ) def fetch_from_git_bundle(self): """Starts async operation to fetch/clone from a Git bundle.""" func_name = "fetch_from_git_bundle" # Raccolta e validazione input profile = self.main_frame.profile_var.get() # Nota: svn_path_str può essere una directory non ancora esistente se si fa clone svn_path_str = self.main_frame.svn_path_entry.get().strip() usb_path = self._get_and_validate_usb_path("Fetch Bundle") bundle_name = self.main_frame.bundle_updated_name_entry.get().strip() if not profile or not svn_path_str or not usb_path or not bundle_name: log_handler.log_warning( "Fetch Bundle cancelled: Missing inputs.", func_name=func_name ) return bundle_full_path = os.path.join(usb_path, bundle_name) # Controlla esistenza bundle PRIMA di iniziare l'async op if not os.path.isfile(bundle_full_path): log_handler.log_error( f"Fetch Bundle failed: Bundle file not found at '{bundle_full_path}'", func_name=func_name, ) self.main_frame.show_error( "File Not Found", f"Bundle file not found:\n{bundle_full_path}" ) self.main_frame.update_status_bar("Fetch failed: Bundle not found.") return # Salva profilo if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue fetching from bundle anyway?", ): self.main_frame.update_status_bar( "Fetch cancelled (profile save failed)." ) return # Prepara parametri exts, dirs = self._parse_exclusions() backup_enabled = self.main_frame.autobackup_var.get() backup_dir = self.main_frame.backup_dir_var.get() # Argomenti per il worker args = ( self.action_handler, svn_path_str, bundle_full_path, profile, backup_enabled, backup_dir, exts, dirs, ) self._start_async_operation( async_workers.run_fetch_bundle_async, # Worker esterno args, { "context": "fetch_bundle", "status_msg": f"Fetching from '{bundle_name}'", # Passa repo_path nel contesto per eventuale gestione errore conflitto "repo_path": svn_path_str, }, ) def manual_backup(self): """Starts async operation for manual backup.""" func_name = "manual_backup" # Raccolta e validazione input profile = self.main_frame.profile_var.get() svn_path = self._get_and_validate_svn_path(f"Manual Backup ({profile})") bk_dir_str = self.main_frame.backup_dir_var.get().strip() if not profile or not svn_path: return # Errore già mostrato if not bk_dir_str: log_handler.log_warning( "Manual backup failed: Backup directory is empty.", func_name=func_name ) self.main_frame.show_error( "Input Error", "Backup directory cannot be empty for manual backup." 
) self.main_frame.update_status_bar("Manual backup failed: Backup dir empty.") return # Validazione backup directory bk_dir = os.path.abspath(bk_dir_str) # Nota: create_zip_backup creerà la directory se non esiste, ma controlliamo permessi se esiste if os.path.exists(bk_dir) and not os.path.isdir(bk_dir): log_handler.log_error( f"Manual backup failed: Backup path exists but is not a directory: {bk_dir}", func_name=func_name, ) self.main_frame.show_error( "Path Error", f"Backup path exists but is not a directory:\n{bk_dir}" ) self.main_frame.update_status_bar( "Manual backup failed: Invalid backup path." ) return # Salva profilo if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue backup anyway?" ): self.main_frame.update_status_bar( "Backup cancelled (profile save failed)." ) return exts, dirs = self._parse_exclusions() # Argomenti: dipendenza backup_handler + parametri args = (self.backup_handler, svn_path, bk_dir, profile, exts, dirs) self._start_async_operation( async_workers.run_manual_backup_async, # Worker esterno args, {"context": "manual_backup", "status_msg": "Creating manual backup"}, ) def commit_changes(self): """Starts async operation to commit staged changes.""" func_name = "commit_changes" svn_path = self._get_and_validate_svn_path("Commit") commit_msg = self.main_frame.get_commit_message() if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Commit failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Commit failed: Repo not ready.") return if not commit_msg: log_handler.log_warning( "Commit failed: Commit message is empty.", func_name=func_name ) self.main_frame.show_error("Input Error", "Commit message cannot be empty.") self.main_frame.update_status_bar("Commit failed: Empty message.") return # Argomenti: dipendenza action_handler + parametri args = (self.action_handler, svn_path, commit_msg) self._start_async_operation( async_workers.run_commit_async, # Worker esterno args, { "context": "commit", "status_msg": "Committing changes", "committed_flag_possible": True, # Segnala possibile commit }, ) def open_gitignore_editor(self): """Opens the .gitignore editor window (Synchronous GUI action).""" # (Logica sync invariata: valida path, apre finestra modale) func_name = "open_gitignore_editor" log_handler.log_info( f"--- Action Triggered: Edit .gitignore ---", func_name=func_name ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return self.main_frame.update_status_bar("Processing: Opening .gitignore editor...") svn_path = self._get_and_validate_svn_path("Edit .gitignore") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Cannot edit .gitignore: Repo path invalid/not ready.", func_name=func_name, ) self.main_frame.show_error( "Action Failed", "Select a valid and prepared repository first." ) self.main_frame.update_status_bar("Edit failed: Repo not ready.") return gitignore_path = os.path.join(svn_path, ".gitignore") log_handler.log_debug( f"Target .gitignore path: {gitignore_path}", func_name=func_name ) status_after_edit = "Ready." 
# Status bar di default dopo chiusura editor try: log_handler.log_debug( "Opening GitignoreEditorWindow...", func_name=func_name ) # Passa il callback per l'azione post-salvataggio GitignoreEditorWindow( self.master, gitignore_path, None, # Logger non più passato on_save_success_callback=self._handle_gitignore_save, # Chiama il metodo di questa classe ) log_handler.log_debug( "Gitignore editor window closed.", func_name=func_name ) # Controlla se un'operazione asincrona è stata avviata dal callback # (verrà aggiornata da _check_completion_queue) if not self.main_frame.status_bar_var.get().startswith("Processing"): self.main_frame.update_status_bar(status_after_edit) except Exception as e: log_handler.log_exception( f"Error opening or running .gitignore editor: {e}", func_name=func_name ) status_after_edit = "Error opening .gitignore editor." self.main_frame.show_error("Editor Error", f"Could not open editor:\n{e}") self.main_frame.update_status_bar(status_after_edit) def _handle_gitignore_save(self): """Callback executed after .gitignore is saved successfully. Starts async untrack check.""" # (Questo ora avvia solo l'operazione asincrona) func_name = "_handle_gitignore_save" log_handler.log_info( "Callback: .gitignore saved. Starting async untrack check.", func_name=func_name, ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Untrack Check after Gitignore Save") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_error( "Cannot start untrack check: Invalid/Not ready path.", func_name=func_name, ) self.main_frame.update_status_bar( "Error: Untrack check failed (invalid path)." ) return # Argomenti: dipendenza action_handler + path args = ( self.action_handler, svn_path, ) self._start_async_operation( async_workers.run_untrack_async, # Worker esterno args, { "context": "_handle_gitignore_save", # Contesto per identificare l'origine "status_msg": "Checking files to untrack", "committed_flag_possible": True, # Untrack fa un commit }, ) def add_selected_file(self, file_status_line: str): """Starts async operation to add a selected untracked file.""" func_name = "add_selected_file" log_handler.log_info( f"--- Action Triggered: Add File '{file_status_line}' (Async) ---", func_name=func_name, ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Add File") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Add file failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Add failed: Repo not ready.") return relative_path = "" try: # Estrai path dalla riga di stato (solo per '??') line = file_status_line.strip("\x00").strip() if line.startswith("??"): # Gestisce path con spazi o virgolette (comune senza -z) rp_raw = line[2:].lstrip() if len(rp_raw) >= 2 and rp_raw.startswith('"') and rp_raw.endswith('"'): relative_path = rp_raw[1:-1] else: relative_path = rp_raw else: # Non è un file untracked, non si può aggiungere così log_handler.log_error( f"Cannot add non-untracked file: {line}", func_name=func_name ) self.main_frame.show_error( "Invalid Action", f"Cannot 'Add' file with status '{line[:2]}'.\nUse commit for modified/staged files.", ) self.main_frame.update_status_bar("Add failed: Not an untracked file.") return if not relative_path: raise ValueError("Extracted relative path is empty.") except Exception 
as e: log_handler.log_error( f"Error parsing path for add from line '{file_status_line}': {e}", func_name=func_name, ) self.main_frame.show_error( "Parsing Error", f"Cannot parse file path from:\n{file_status_line}" ) self.main_frame.update_status_bar("Add failed: Parse error.") return # Argomenti: dipendenza git_commands + parametri args = (self.git_commands, svn_path, relative_path) base_name = os.path.basename(relative_path) # Per messaggio status bar self._start_async_operation( async_workers.run_add_file_async, # Worker esterno args, {"context": "add_file", "status_msg": f"Adding '{base_name}'"}, ) def create_tag(self): """Handles tag creation: shows dialog then starts async operation.""" # (Logica sync per dialogo e chiamata a _generate_next_tag_suggestion è invariata) func_name = "create_tag" log_handler.log_info( f"--- Action Triggered: Create Tag ---", func_name=func_name ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Create Tag") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Create Tag failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Create Tag failed: Repo not ready.") return # Genera suggerimento e mostra dialogo self.main_frame.update_status_bar("Processing: Generating tag suggestion...") suggested = self._generate_next_tag_suggestion( svn_path ) # Chiama il metodo reintegrato self.main_frame.update_status_bar("Ready for tag input.") dialog = CreateTagDialog(self.master, suggested_tag_name=suggested) tag_info = dialog.result if tag_info: tag_name, tag_message = tag_info log_handler.log_info( f"User provided tag: '{tag_name}'", func_name=func_name ) # Argomenti: dipendenza action_handler + parametri args = (self.action_handler, svn_path, tag_name, tag_message) self._start_async_operation( async_workers.run_create_tag_async, # Worker esterno args, { "context": "create_tag", "status_msg": f"Creating tag '{tag_name}'", "committed_flag_possible": True, # Tag annotato fa commit }, ) else: log_handler.log_info("Tag creation cancelled.", func_name=func_name) self.main_frame.update_status_bar("Cancelled.") def checkout_tag(self): """Handles tag checkout: confirms then starts async operation.""" func_name = "checkout_tag" log_handler.log_info( f"--- Action Triggered: Checkout Tag ---", func_name=func_name ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Checkout Tag") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Checkout Tag failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Checkout Tag failed: Repo not ready.") return tag = self.main_frame.get_selected_tag() # Ottieni tag dalla listbox if not tag: self.main_frame.show_error( "Selection Error", "No tag selected from the list." ) self.main_frame.update_status_bar("Checkout failed: No tag selected.") return # Chiedi conferma all'utente (operazione potenzialmente confusa) msg = ( f"Checkout tag '{tag}'?\n\nWarning: This will put your repository in a 'detached HEAD' state. " "You can look around, make experimental changes and commit them, but they won't belong to any branch. " "Use 'Checkout Branch' to return to a branch." 
) if not self.main_frame.ask_yes_no("Confirm Checkout Tag", msg): log_handler.log_info("Tag checkout cancelled by user.", func_name=func_name) self.main_frame.update_status_bar("Cancelled.") return # Argomenti: dipendenza action_handler + parametri args = (self.action_handler, svn_path, tag) self._start_async_operation( async_workers.run_checkout_tag_async, # Worker esterno args, {"context": "checkout_tag", "status_msg": f"Checking out tag '{tag}'"}, ) def create_branch(self): """Handles branch creation: shows dialog then starts async operation.""" func_name = "create_branch" log_handler.log_info( f"--- Action Triggered: Create Branch ---", func_name=func_name ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Create Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Create Branch failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Create Branch failed: Repo not ready.") return # Mostra dialogo per ottenere nome branch self.main_frame.update_status_bar("Ready for branch name input.") dialog = CreateBranchDialog(self.master) branch_name = dialog.result if branch_name: log_handler.log_info( f"User provided branch name: '{branch_name}'", func_name=func_name ) # Argomenti: dipendenza action_handler + parametri args = (self.action_handler, svn_path, branch_name) self._start_async_operation( async_workers.run_create_branch_async, # Worker esterno args, { "context": "create_branch", "status_msg": f"Creating branch '{branch_name}'", "new_branch_name": branch_name, # Passa il nome per eventuale checkout post-creazione }, ) else: log_handler.log_info("Branch creation cancelled.", func_name=func_name) self.main_frame.update_status_bar("Cancelled.") def checkout_branch( self, branch_to_checkout: str | None = None, repo_path_override: str | None = None, ): """ Handles branch checkout: confirms (if needed) then starts async operation. Can be called directly with a branch name (e.g., after creation). """ func_name = "checkout_branch" target_branch = branch_to_checkout if branch_to_checkout else "Selected Branch" log_handler.log_info( f"--- Action Triggered: Checkout Branch (Target: {target_branch}) ---", func_name=func_name, ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return # Usa path override se fornito, altrimenti prendi dalla GUI svn_path = repo_path_override or self._get_and_validate_svn_path( "Checkout Branch" ) if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Checkout Branch failed: Repo not ready.", func_name=func_name ) self.main_frame.show_error("Action Failed", "Repository is not ready.") self.main_frame.update_status_bar("Checkout Branch failed: Repo not ready.") return branch = branch_to_checkout needs_confirmation = False if not branch: # Se non fornito, prendi dalla selezione GUI e chiedi conferma branch = self.main_frame.get_selected_branch() needs_confirmation = True if not branch: self.main_frame.show_error( "Selection Error", "No branch selected from the list." ) self.main_frame.update_status_bar("Checkout failed: No branch selected.") return # Chiedi conferma solo se non è stato passato esplicitamente un branch if needs_confirmation: if not self.main_frame.ask_yes_no( "Confirm Checkout Branch", f"Switch to branch '{branch}'?" 
): log_handler.log_info( "Branch checkout cancelled by user.", func_name=func_name ) self.main_frame.update_status_bar("Cancelled.") return # Argomenti: dipendenza action_handler + parametri args = (self.action_handler, svn_path, branch) self._start_async_operation( async_workers.run_checkout_branch_async, # Worker esterno args, { "context": "checkout_branch", "status_msg": f"Checking out branch '{branch}'", }, ) # --- NUOVO: Wrapper per Apply Remote Config --- def apply_remote_config(self): """Callback triggered by the 'Apply Config to Local Repo' button. Starts async worker.""" func_name = "apply_remote_config" log_handler.log_info( f"--- Action Triggered: Apply Remote Config ---", func_name=func_name ) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error( "Cannot apply config: Main frame missing.", func_name=func_name ) return svn_path = self._get_and_validate_svn_path("Apply Remote Config") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Apply config skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Apply config failed: Repo not ready.") return remote_url = self.main_frame.remote_url_var.get().strip() remote_name = self.main_frame.remote_name_var.get().strip() if not remote_url: log_handler.log_warning( "Apply config failed: Remote URL is empty.", func_name=func_name ) self.main_frame.show_error("Input Error", "Remote URL cannot be empty.") self.main_frame.update_status_bar("Apply config failed: URL empty.") return if not remote_name: remote_name = DEFAULT_REMOTE_NAME # Usa default log_handler.log_info( f"Remote name empty, using default: '{remote_name}'", func_name=func_name, ) self.main_frame.remote_name_var.set(remote_name) # Salva profilo PRIMA di applicare if not self.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue applying remote config anyway?", ): self.main_frame.update_status_bar( "Apply config cancelled (profile save failed)." ) return # Argomenti: dipendenza remote_action_handler + parametri args = (self.remote_action_handler, svn_path, remote_name, remote_url) self._start_async_operation( async_workers.run_apply_remote_config_async, # Worker esterno args, { "context": "apply_remote_config", "status_msg": f"Applying config for remote '{remote_name}'", }, ) def check_connection_auth(self): """Callback for 'Check Connection & Auth' button.""" func_name = "check_connection_auth" log_handler.log_info( f"--- Action Triggered: Check Connection & Auth ---", func_name=func_name ) # Validazioni if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Check Connection") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Check Connection skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." 
) self._update_gui_auth_status( "unknown" ) # Resetta indicatore se repo non pronto return remote_name = self.main_frame.remote_name_var.get().strip() if not remote_name: # Usa default se vuoto (coerente con apply_remote_config) remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) log_handler.log_info( f"Checking connection/auth for remote '{remote_name}'...", func_name=func_name, ) self._update_gui_auth_status("checking") # Stato visivo temporaneo (opzionale) # Argomenti per il worker di controllo args = (self.git_commands, svn_path, remote_name) self._start_async_operation( async_workers.run_check_connection_async, # Worker che esegue ls-remote args, { "context": "check_connection", # Contesto per il check iniziale "status_msg": f"Checking remote '{remote_name}'", # Passiamo il nome del remote nel contesto per usarlo dopo "remote_name_checked": remote_name, "repo_path_checked": svn_path, # Passiamo anche il path }, ) def fetch_remote(self): """Starts the asynchronous 'git fetch' operation.""" func_name = "fetch_remote" log_handler.log_info( f"--- Action Triggered: Fetch Remote ---", func_name=func_name ) # Validazioni if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Fetch Remote") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Fetch Remote skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Fetch failed: Repo not ready.") return remote_name = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME # Usa default se vuoto self.main_frame.remote_name_var.set( remote_name ) # Aggiorna GUI per coerenza # Verifica lo stato dell'autenticazione PRIMA di tentare il fetch # Se sappiamo già che serve auth, potremmo avvisare l'utente # if self.remote_auth_status == 'required' or self.remote_auth_status == 'failed': # if not self.main_frame.ask_yes_no("Authentication May Be Required", # f"Last check indicated authentication is needed or failed for remote '{remote_name}'.\n" # f"Attempt fetch anyway? (May open a terminal for credentials)"): # self.main_frame.update_status_bar("Fetch cancelled by user.") # return # Potremmo anche forzare un check prima: self.check_connection_auth() e aspettare il risultato? Complesso. # Per ora, tentiamo direttamente il fetch. Sarà il worker a gestire errori auth. log_handler.log_info( f"Starting fetch for remote '{remote_name}'...", func_name=func_name ) # Argomenti per il worker: dipendenza + parametri args = (self.remote_action_handler, svn_path, remote_name) self._start_async_operation( async_workers.run_fetch_remote_async, # Worker esterno per fetch args, { "context": "fetch_remote", # Contesto per il risultato "status_msg": f"Fetching from remote '{remote_name}'", }, ) def pull_remote(self): """Starts the asynchronous 'git pull' operation for the current branch.""" func_name = "pull_remote" log_handler.log_info( f"--- Action Triggered: Pull Remote ---", func_name=func_name ) # Validazioni if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Pull Remote") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Pull Remote skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." 
) self.main_frame.update_status_bar("Pull failed: Repo not ready.") return remote_name = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME # Usa default self.main_frame.remote_name_var.set(remote_name) # Verifica stato autenticazione (opzionale, ma consigliato) # Se non connesso o auth richiesta, potremmo avvisare o impedire if self.remote_auth_status != "ok": msg = f"Cannot Pull from '{remote_name}':\n" if self.remote_auth_status == "required": msg += ( "Authentication is required. Use 'Check Connection / Auth' first." ) elif self.remote_auth_status == "failed": msg += "Authentication previously failed. Use 'Check Connection / Auth' to retry." elif self.remote_auth_status == "connection_failed": msg += "Connection previously failed. Check URL and network." else: # unknown or unknown_error msg += "Connection status is unknown or in error. Use 'Check Connection / Auth' first." log_handler.log_warning( f"Pull Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name, ) self.main_frame.show_warning("Action Blocked", msg) self.main_frame.update_status_bar(f"Pull failed: {self.remote_auth_status}") return # Il worker `run_pull_remote_async` otterrà il nome del branch corrente internamente log_handler.log_info( f"Starting pull for remote '{remote_name}'...", func_name=func_name ) # Argomenti per il worker: dipendenze (remote handler + git commands) + parametri repo args = (self.remote_action_handler, self.git_commands, svn_path, remote_name) self._start_async_operation( async_workers.run_pull_remote_async, # Worker esterno per pull args, { "context": "pull_remote", # Contesto per il risultato "status_msg": f"Pulling from remote '{remote_name}'", # Passiamo il path nel contesto in caso di conflitto "repo_path": svn_path, }, ) def push_remote(self): """Starts the asynchronous 'git push' operation for the current branch.""" func_name = "push_remote" log_handler.log_info( f"--- Action Triggered: Push Branch to Remote ---", func_name=func_name ) # Validazioni if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Push Branch") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Push Branch skipped: Repo not ready.", func_name=func_name ) self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) self.main_frame.update_status_bar("Push failed: Repo not ready.") return remote_name = self.main_frame.remote_name_var.get().strip() if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_name_var.set(remote_name) # Verifica stato autenticazione if self.remote_auth_status != "ok": # (... Messaggio di avviso/blocco come in pull_remote ...) msg = f"Cannot Push to '{remote_name}':\n" if self.remote_auth_status == "required": msg += ( "Authentication is required. Use 'Check Connection / Auth' first." ) elif self.remote_auth_status == "failed": msg += "Authentication previously failed. Use 'Check Connection / Auth' to retry." elif self.remote_auth_status == "connection_failed": msg += "Connection previously failed. Check URL and network." else: msg += "Connection status is unknown or in error. Use 'Check Connection / Auth' first." 
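            # remote_auth_status values used in this class: "ok", "required", "failed",
            # "connection_failed", "checking", "unknown", "unknown_error"
            # (updated via _update_gui_auth_status after connection checks).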
            log_handler.log_warning(
                f"Push Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.",
                func_name=func_name,
            )
            self.main_frame.show_warning("Action Blocked", msg)
            self.main_frame.update_status_bar(f"Push failed: {self.remote_auth_status}")
            return
        # Check for uncommitted changes (optional, but good practice)
        try:
            if self.git_commands.git_status_has_changes(svn_path):
                if not self.main_frame.ask_yes_no(
                    "Uncommitted Changes",
                    "There are uncommitted changes in your working directory.\nPush anyway? (Only committed changes will be pushed)",
                ):
                    self.main_frame.update_status_bar(
                        "Push cancelled by user (uncommitted changes)."
                    )
                    return
        except GitCommandError as status_err:
            log_handler.log_error(
                f"Push aborted: Failed to check repository status before push: {status_err}",
                func_name=func_name,
            )
            self.main_frame.show_error(
                "Status Error", f"Could not check repo status:\n{status_err}"
            )
            return
        log_handler.log_info(
            f"Starting push for current branch to remote '{remote_name}'...",
            func_name=func_name,
        )
        # The `run_push_remote_async` worker will determine the current branch name itself
        # Worker arguments: dependencies + repo parameters
        args = (self.remote_action_handler, self.git_commands, svn_path, remote_name)
        self._start_async_operation(
            async_workers.run_push_remote_async,  # External worker for push
            args,
            {
                "context": "push_remote",  # Context for the result
                "status_msg": f"Pushing current branch to remote '{remote_name}'",
                # Pass the remote name in the context so it can be used in error/rejection messages
                "remote_name": remote_name,
            },
        )

    def push_tags_remote(self):
        """Starts the asynchronous 'git push --tags' operation."""
        func_name = "push_tags_remote"
        log_handler.log_info(
            f"--- Action Triggered: Push Tags to Remote ---", func_name=func_name
        )
        # Validation
        if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
            return
        svn_path = self._get_and_validate_svn_path("Push Tags")
        if not svn_path or not self._is_repo_ready(svn_path):
            log_handler.log_warning(
                "Push Tags skipped: Repo not ready.", func_name=func_name
            )
            self.main_frame.show_error(
                "Action Failed", "Repository path is not valid or not prepared."
            )
            self.main_frame.update_status_bar("Push tags failed: Repo not ready.")
            return
        remote_name = self.main_frame.remote_name_var.get().strip()
        if not remote_name:
            remote_name = DEFAULT_REMOTE_NAME
            self.main_frame.remote_name_var.set(remote_name)
        # Check authentication status
        if self.remote_auth_status != "ok":
            # (... warning/blocking message, as in push_remote ...)
            msg = f"Cannot Push Tags to '{remote_name}':\n"
            # (...) Message details based on self.remote_auth_status (...)
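            # The block below mirrors the handling in push_remote: log the skip, warn the
            # user, update the status bar, and abort the push-tags action.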
log_handler.log_warning( f"Push Tags skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name, ) self.main_frame.show_warning("Action Blocked", msg) self.main_frame.update_status_bar( f"Push tags failed: {self.remote_auth_status}" ) return # Chiedi conferma perché push --tags invia tutti i tag locali if not self.main_frame.ask_yes_no( "Confirm Push Tags", f"Push all local tags to remote '{remote_name}'?\n(Existing tags on the remote with the same name will NOT be overwritten unless forced, which this action does not do).", ): self.main_frame.update_status_bar("Push tags cancelled by user.") return log_handler.log_info( f"Starting push tags to remote '{remote_name}'...", func_name=func_name ) # Argomenti per il worker: dipendenza + parametri args = (self.remote_action_handler, svn_path, remote_name) self._start_async_operation( async_workers.run_push_tags_async, # Worker esterno per push tags args, { "context": "push_tags_remote", # Contesto per il risultato "status_msg": f"Pushing tags to remote '{remote_name}'", "remote_name": remote_name, # Passa nome remoto per messaggi }, ) def clone_remote_repo(self): """Handles the 'Clone from Remote...' action: shows dialog, validates, starts worker.""" func_name = "clone_remote_repo" log_handler.log_info(f"--- Action Triggered: Clone Remote Repository ---", func_name=func_name) if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): log_handler.log_error("Cannot start clone: Main frame not available.", func_name=func_name) return # Mostra il dialogo modale per ottenere URL, directory padre e nome profilo dialog = CloneFromRemoteDialog(self.master) # dialog.result conterrà None se premuto Cancel, o (url, parent_dir, profile_name) se OK dialog_result = dialog.result if not dialog_result: log_handler.log_info("Clone operation cancelled by user in dialog.", func_name=func_name) self.main_frame.update_status_bar("Clone cancelled.") return # Estrai i dati dal risultato del dialogo remote_url, local_parent_dir, profile_name_input = dialog_result # --- Logica per derivare nomi e validare percorso finale --- final_profile_name = "" target_clone_dir = "" try: # Deriva il nome della directory del repository dall'URL repo_name_from_url = os.path.basename(remote_url) if repo_name_from_url.endswith(".git"): repo_name_from_url = repo_name_from_url[:-4] if not repo_name_from_url: # Se l'URL termina con / o è strano raise ValueError("Could not derive repository name from URL.") # Costruisci il percorso completo dove verrà clonato il repository target_clone_dir = os.path.join(local_parent_dir, repo_name_from_url) target_clone_dir = os.path.abspath(target_clone_dir) # Normalizza il percorso # Determina il nome finale del profilo if profile_name_input: final_profile_name = profile_name_input # Validazione aggiuntiva: assicurati che il nome profilo non esista già if final_profile_name in self.config_manager.get_profile_sections(): raise ValueError(f"Profile name '{final_profile_name}' already exists. 
Please choose a different name.") else: # Usa il nome derivato dall'URL come nome profilo, verificando non esista final_profile_name = repo_name_from_url counter = 1 while final_profile_name in self.config_manager.get_profile_sections(): final_profile_name = f"{repo_name_from_url}_{counter}" counter += 1 log_handler.log_debug(f"Derived target clone directory: {target_clone_dir}", func_name=func_name) log_handler.log_debug(f"Determined profile name: {final_profile_name}", func_name=func_name) # --- CONTROLLO FONDAMENTALE: La directory di destinazione esiste già? --- if os.path.exists(target_clone_dir): # Non clonare se la directory esiste (git clone fallirebbe comunque) error_msg = f"Clone failed: Target directory already exists:\n{target_clone_dir}\nPlease choose a different parent directory or ensure the target is clear." log_handler.log_error(error_msg, func_name=func_name) self.main_frame.show_error("Clone Path Error", error_msg) self.main_frame.update_status_bar("Clone failed: Target directory exists.") return # Interrompe l'operazione except ValueError as ve: # Errore nella derivazione nomi o validazione profilo log_handler.log_error(f"Clone configuration error: {ve}", func_name=func_name) self.main_frame.show_error("Configuration Error", str(ve)) self.main_frame.update_status_bar("Clone failed: Configuration error.") return except Exception as e: # Errore imprevisto durante la preparazione log_handler.log_exception(f"Unexpected error preparing for clone: {e}", func_name=func_name) self.main_frame.show_error("Internal Error", f"An unexpected error occurred:\n{e}") self.main_frame.update_status_bar("Clone failed: Internal error.") return # --- Avvia Worker Asincrono --- log_handler.log_info(f"Starting clone for '{remote_url}' into '{target_clone_dir}'...", func_name=func_name) # Argomenti per il worker: dipendenza + parametri args = (self.git_commands, remote_url, target_clone_dir, final_profile_name) self._start_async_operation( async_workers.run_clone_remote_async, # Worker esterno per clone args, { "context": "clone_remote", # Contesto per il risultato "status_msg": f"Cloning '{repo_name_from_url}'...", # Usa nome repo per status # Passiamo i dati necessari per la creazione del profilo nel contesto, # così _check_completion_queue può accedervi facilmente in caso di successo. 
"clone_success_data": { 'profile_name': final_profile_name, 'cloned_path': target_clone_dir, 'remote_url': remote_url } }, ) def refresh_remote_status(self): """Starts the async check for ahead/behind status.""" func_name = "refresh_remote_status" log_handler.log_info( f"--- Action Triggered: Refresh Remote Sync Status ---", func_name=func_name ) # Validazioni if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return svn_path = self._get_and_validate_svn_path("Refresh Sync Status") if not svn_path or not self._is_repo_ready(svn_path): log_handler.log_warning( "Refresh Status skipped: Repo not ready.", func_name=func_name ) # Aggiorna label a stato neutro/sconosciuto se repo non pronto if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: (Repo not ready)" ) return # --- Ottieni branch corrente e upstream --- current_branch = None upstream_branch = None try: current_branch = self.git_commands.get_current_branch_name(svn_path) if current_branch: upstream_branch = self.git_commands.get_branch_upstream( svn_path, current_branch ) else: log_handler.log_warning("Refresh Status: Cannot get status, currently in detached HEAD state.", func_name=func_name) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(current_branch=None, status_text="Sync Status: (Detached HEAD)") return # Esce se detached if not upstream_branch: log_handler.log_info(f"Refresh Status: No upstream configured for branch '{current_branch}'.", func_name=func_name) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(current_branch=current_branch, status_text=f"Sync Status: Upstream not set") if hasattr(self.main_frame, "refresh_sync_status_button"): self.main_frame.refresh_sync_status_button.config(state=tk.DISABLED) # Se siamo qui, abbiamo branch e upstream, abilita il pulsante refresh (se era disabilitato) if hasattr(self.main_frame, "refresh_sync_status_button"): self.main_frame.refresh_sync_status_button.config(state=tk.NORMAL) except Exception as e: log_handler.log_exception( f"Error getting branch/upstream before status check: {e}", func_name=func_name, ) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error getting info") return # --- Avvia worker asincrono --- log_handler.log_info( f"Checking ahead/behind status for '{current_branch}' vs '{upstream_branch}'...", func_name=func_name, ) # Aggiorna label GUI a "checking..." 
if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(current_branch=current_branch, status_text="Sync Status: Checking...") args = (self.git_commands, svn_path, current_branch, upstream_branch) self._start_async_operation( async_workers.run_get_ahead_behind_async, # Worker esterno args, { "context": "get_ahead_behind", # Contesto per il risultato "status_msg": f"Checking sync status for '{current_branch}'", # Passa nomi branch nel contesto per riferimento nel risultato "local_branch": current_branch, "upstream_branch": upstream_branch, }, ) def _update_gui_auth_status(self, status: str): """Updates internal state and calls GUI update for auth indicator.""" self.remote_auth_status = status # Aggiorna stato interno if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): # Chiama il metodo della GUI per aggiornare il label self.main_frame._update_auth_status_indicator(status) if status != 'ok' and hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(status_text=f"Sync Status: ({status})") # --- ==== Gestione Coda Risultati ==== --- def _check_completion_queue(self, results_queue: queue.Queue, context: dict): """Checks result queue from async workers, updates GUI accordingly.""" task_context = context.get('context', 'unknown') # func_name per i log interni a questa funzione func_name = "_check_completion_queue" # log_handler.log_debug(f"Checking completion queue for context: {task_context}", func_name=func_name) # Log inizio check (opzionale) try: # Tenta di ottenere un risultato dalla coda senza bloccare result_data = results_queue.get_nowait() log_handler.log_info(f"Result received for '{task_context}'. Status: {result_data.get('status')}", func_name=func_name) # --- Determina se riabilitare subito i widget --- should_reenable_now = True # Default: riabilita subito status_from_result = result_data.get('status') # Ottieni lo stato dal risultato # Non riabilitare subito se l'utente deve interagire o se parte un'altra azione if task_context == "check_connection" and status_from_result == 'auth_required': should_reenable_now = False log_handler.log_debug("Delaying widget re-enable: waiting for auth prompt.", func_name=func_name) elif task_context == "interactive_auth" and status_from_result == 'success': should_reenable_now = False log_handler.log_debug("Delaying widget re-enable: re-checking connection after interactive auth.", func_name=func_name) elif task_context == 'clone_remote' and status_from_result == 'success': # Non riabilitare dopo clone successo, il caricamento profilo gestirà lo stato should_reenable_now = False log_handler.log_debug("Delaying widget re-enable: profile load will handle state after clone.", func_name=func_name) # Riabilita i widget se non è necessario attendere if should_reenable_now: if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): log_handler.log_debug("Re-enabling widgets.", func_name=func_name) self.main_frame.set_action_widgets_state(tk.NORMAL) else: # Se la GUI non c'è, non c'è nulla da fare o riabilitare log_handler.log_warning("Cannot re-enable widgets, MainFrame missing.", func_name=func_name) return # Esce dalla funzione # --- Estrai dettagli dal risultato --- status = status_from_result # Usa la variabile già ottenuta message = result_data.get('message', "Operation finished.") # Messaggio di default result_value = result_data.get('result') # Valore specifico del risultato exception = result_data.get('exception') # Eventuale eccezione 
catturata committed = result_data.get('committed', False) # Flag per operazioni che committano # Estrai flag conflitto specifico per pull o fetch_bundle is_conflict = False # Default repo_path_conflict = None # Default if task_context == 'pull_remote': is_conflict = (status == 'conflict') # Determinato dallo stato specifico repo_path_conflict = context.get('repo_path') # Path passato nel contesto originale elif task_context == 'fetch_bundle': is_conflict = result_data.get('conflict', False) # Dipende da flag nel risultato repo_path_conflict = result_data.get('repo_path') # Path nel risultato # Estrai flag rifiuto e nome branch per push is_rejected = False # Default rejected_branch = None # Default if task_context == 'push_remote': is_rejected = (status == 'rejected') # Determinato dallo stato specifico rejected_branch = result_data.get('branch_name') # Nome branch passato nel risultato # Altri dati dal contesto originale new_branch_context = context.get('new_branch_name') # Info se si crea branch remote_name_context = context.get("remote_name") # Nome remote dall'azione originale # --- Aggiorna Status Bar con colore e reset temporizzato --- status_color = None reset_duration = 5000 # Default reset 5 secondi if status == 'success': status_color = self.main_frame.STATUS_GREEN elif status == 'warning': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 7000 elif status == 'auth_required': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 15000 elif status == 'conflict': status_color = self.main_frame.STATUS_RED; reset_duration = 15000 elif status == 'rejected': status_color = self.main_frame.STATUS_RED; reset_duration = 15000 elif status == 'error': status_color = self.main_frame.STATUS_RED; reset_duration = 10000 # Aggiorna la status bar (usa la funzione helper della GUI) if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): # Non aggiornare la status bar immediatamente dopo un clone successo, # il caricamento del profilo lo farà. 
if not (task_context == 'clone_remote' and status == 'success'): self.main_frame.update_status_bar(message, bg_color=status_color, duration_ms=reset_duration) # --- Processa risultato specifico per task --- # Ottieni path corrente per eventuali refresh repo_path_for_refreshes = self._get_and_validate_svn_path("Post-Action Refresh Check") # Lista per raccogliere funzioni di refresh da chiamare alla fine refresh_list = [] # Flag per triggerare refresh stato sync post-azione post_action_sync_refresh_needed = False # --- Gestione specifica per check_connection e interactive_auth --- if task_context == "check_connection": remote_name = context.get("remote_name_checked", remote_name_context or "unknown remote") if status == 'success': auth_status = 'ok' log_handler.log_info(f"Connection check successful for '{remote_name}'.", func_name=func_name) self._update_gui_auth_status(auth_status) post_action_sync_refresh_needed = True # Aggiorna stato A/B dopo check OK elif status == 'auth_required': log_handler.log_warning(f"Authentication required for remote '{remote_name}'.", func_name=func_name) self._update_gui_auth_status('required') repo_path_checked = context.get("repo_path_checked") if repo_path_checked and hasattr(self,"main_frame") and self.main_frame.ask_yes_no( "Authentication Required", f"Authentication is required to connect to remote '{remote_name}'.\n\n" f"Do you want to attempt authentication now?\n" f"(This may open a separate terminal window for credential input.)" ): log_handler.log_info("User requested interactive authentication attempt.", func_name=func_name) args_interactive = (self.git_commands, repo_path_checked, remote_name) self._start_async_operation( async_workers.run_interactive_auth_attempt_async, args_interactive, { "context": "interactive_auth", "status_msg": f"Attempting interactive auth for '{remote_name}'", "original_context": context } ) # Non riabilitare widget qui else: log_handler.log_info("User declined interactive authentication attempt.", func_name=func_name) if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): self.main_frame.set_action_widgets_state(tk.NORMAL) elif status == 'error': error_type = result_value if result_value in ['connection_failed', 'unknown_error', 'worker_exception'] else 'unknown_error' self._update_gui_auth_status(error_type) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error") if hasattr(self, "main_frame"): self.main_frame.show_error("Connection Error", f"{message}") elif task_context == "interactive_auth": original_context = context.get("original_context", {}) remote_name = original_context.get("remote_name_checked", remote_name_context or "unknown remote") if status == 'success' and result_value == 'auth_attempt_success': log_handler.log_info(f"Interactive auth attempt for '{remote_name}' successful. Re-checking connection...", func_name=func_name) if hasattr(self, "main_frame"): self.main_frame.update_status_bar(f"Authentication successful. 
Checking status...") self.check_connection_auth() # Ri-avvia check silenzioso elif status == 'error': log_handler.log_warning(f"Interactive auth attempt for '{remote_name}' failed or error occurred: {message}", func_name=func_name) self._update_gui_auth_status('failed') if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(status_text="Sync Status: Auth Failed") if hasattr(self, "main_frame"): self.main_frame.show_warning("Authentication Attempt Failed", f"{message}") if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): self.main_frame.set_action_widgets_state(tk.NORMAL) # --- Gestione specifica per PULL CONFLICT --- elif task_context == 'pull_remote' and status == 'conflict': log_handler.log_error(f"Merge conflict occurred during pull. User needs to resolve manually in '{repo_path_conflict}'.", func_name=func_name) if hasattr(self, "main_frame"): self.main_frame.show_error( "Merge Conflict", f"Merge conflict occurred during pull from '{remote_name_context or 'remote'}'.\n\n" f"Please resolve the conflicts manually in:\n{repo_path_conflict}\n\n" f"After resolving, stage the changes and commit them." ) if self.refresh_changed_files_list not in refresh_list: refresh_list.append(self.refresh_changed_files_list) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(status_text="Sync Status: Conflict") # --- Gestione specifica per PUSH REJECTED --- elif task_context == 'push_remote' and status == 'rejected': log_handler.log_error(f"Push rejected for branch '{rejected_branch}'. User needs to pull.", func_name=func_name) if hasattr(self, "main_frame"): self.main_frame.show_warning("Push Rejected", f"{message}") if self.fetch_remote not in refresh_list: refresh_list.append(self.fetch_remote) # Fetch aggiornerà stato sync # --- Gestione specifica per GET_AHEAD_BEHIND --- elif task_context == 'get_ahead_behind': local_branch_ctx = context.get("local_branch") if status == 'success': ahead, behind = result_value if isinstance(result_value, tuple) else (None, None) log_handler.log_info(f"Ahead/Behind status updated for '{local_branch_ctx}': Ahead={ahead}, Behind={behind}", func_name=func_name) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(current_branch=local_branch_ctx, ahead=ahead, behind=behind) elif status == 'error': log_handler.log_error(f"Failed to get ahead/behind status for '{local_branch_ctx}': {message}", func_name=func_name) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(current_branch=local_branch_ctx, status_text=f"Sync Status: Error") # --- Gestione specifica per CLONE_REMOTE --- elif task_context == 'clone_remote': if status == 'success': log_handler.log_info(f"Clone successful. 
Creating profile...", func_name=func_name) success_data = context.get('clone_success_data') or result_value if success_data and isinstance(success_data, dict): new_profile_name = success_data.get('profile_name') cloned_repo_path = success_data.get('cloned_path') cloned_remote_url = success_data.get('remote_url') if new_profile_name and cloned_repo_path and cloned_remote_url: try: defaults = self.config_manager._get_expected_keys_with_defaults() defaults['svn_working_copy_path'] = cloned_repo_path defaults['remote_url'] = cloned_remote_url defaults['remote_name'] = DEFAULT_REMOTE_NAME defaults['bundle_name'] = f"{new_profile_name}.bundle" defaults['bundle_name_updated'] = f"{new_profile_name}_update.bundle" defaults['autobackup'] = "False"; defaults['autocommit'] = "False" defaults['commit_message'] = "Initial commit check" self.config_manager.add_section(new_profile_name) for key, value in defaults.items(): self.config_manager.set_profile_option(new_profile_name, key, value) self.config_manager.save_config() log_handler.log_info(f"Profile '{new_profile_name}' created successfully for cloned repo.", func_name=func_name) # Aggiorna GUI e seleziona nuovo profilo (triggera load) sections = self.config_manager.get_profile_sections() if hasattr(self, "main_frame"): self.main_frame.update_profile_dropdown(sections) self.main_frame.profile_var.set(new_profile_name) # Non aggiorniamo status bar qui, load_profile_settings lo farà except Exception as profile_e: log_handler.log_exception(f"Clone successful, but failed to create profile '{new_profile_name}': {profile_e}", func_name=func_name) if hasattr(self, "main_frame"): self.main_frame.show_error("Profile Creation Error", f"Repository cloned, but failed to save profile '{new_profile_name}'.\nPlease add it manually.") self.main_frame.update_status_bar("Clone successful, but profile creation failed.") # Riabilita widget se la creazione profilo fallisce if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): self.main_frame.set_action_widgets_state(tk.NORMAL) else: log_handler.log_error("Clone successful, but missing data to create profile.", func_name=func_name) if hasattr(self, "main_frame"): self.main_frame.update_status_bar("Clone successful, but failed to retrieve data for profile creation.") if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): self.main_frame.set_action_widgets_state(tk.NORMAL) else: log_handler.log_error("Clone successful, but success data is missing or invalid in result.", func_name=func_name) if hasattr(self, "main_frame"): self.main_frame.update_status_bar("Clone successful, but internal data error occurred.") if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): self.main_frame.set_action_widgets_state(tk.NORMAL) elif status == 'error': # Clone fallito log_handler.log_error(f"Clone operation failed: {message}", func_name=func_name) if hasattr(self, "main_frame"): self.main_frame.show_error("Clone Error", f"{message}") # Widget già riabilitati all'inizio # --- Gestione risultati altri task (successo) --- elif status == 'success': # Determina quali refresh avviare e se aggiornare lo stato sync if task_context in ['prepare_repo', 'fetch_bundle', 'commit', 'create_tag', 'checkout_tag', 'create_branch', 'checkout_branch', '_handle_gitignore_save', 'add_file', 'apply_remote_config', 'fetch_remote', 'pull_remote', # Pull non-conflict 'push_remote', 'push_tags_remote', # Push non-rejected 'refresh_branches']: # Refresh branches richiede aggiornamento stato sync # Logica per popolare 
refresh_list if task_context == 'push_remote': if self.refresh_commit_history not in refresh_list: refresh_list.append(self.refresh_commit_history) if self.refresh_branch_list not in refresh_list: refresh_list.append(self.refresh_branch_list) post_action_sync_refresh_needed = True elif task_context == 'push_tags_remote': if self.refresh_tag_list not in refresh_list: refresh_list.append(self.refresh_tag_list) post_action_sync_refresh_needed = True elif task_context == 'pull_remote': if self.refresh_commit_history not in refresh_list: refresh_list.append(self.refresh_commit_history) if self.refresh_branch_list not in refresh_list: refresh_list.append(self.refresh_branch_list) if self.refresh_tag_list not in refresh_list: refresh_list.append(self.refresh_tag_list) if self.refresh_changed_files_list not in refresh_list: refresh_list.append(self.refresh_changed_files_list) post_action_sync_refresh_needed = True elif task_context == 'fetch_remote': if self.refresh_commit_history not in refresh_list: refresh_list.append(self.refresh_commit_history) if self.refresh_branch_list not in refresh_list: refresh_list.append(self.refresh_branch_list) if self.refresh_tag_list not in refresh_list: refresh_list.append(self.refresh_tag_list) post_action_sync_refresh_needed = True elif task_context == 'apply_remote_config': refresh_list.append(self.check_connection_auth) if self.refresh_commit_history not in refresh_list: refresh_list.append(self.refresh_commit_history) if self.refresh_branch_list not in refresh_list: refresh_list.append(self.refresh_branch_list) post_action_sync_refresh_needed = True elif task_context == 'checkout_branch' or task_context == 'checkout_tag': post_action_sync_refresh_needed = True if self.refresh_commit_history not in refresh_list: refresh_list.append(self.refresh_commit_history) if self.refresh_branch_list not in refresh_list: refresh_list.append(self.refresh_branch_list) if self.refresh_tag_list not in refresh_list: refresh_list.append(self.refresh_tag_list) if self.refresh_changed_files_list not in refresh_list: refresh_list.append(self.refresh_changed_files_list) elif task_context == 'create_branch' and not new_branch_context: post_action_sync_refresh_needed = True if self.refresh_commit_history not in refresh_list: refresh_list.append(self.refresh_commit_history) if self.refresh_branch_list not in refresh_list: refresh_list.append(self.refresh_branch_list) elif task_context == 'refresh_branches': # Caso specifico refresh branches post_action_sync_refresh_needed = True # Serve aggiornare lo stato sync # Logica refresh per le altre azioni locali else: if committed or task_context in ['fetch_bundle','prepare_repo','create_tag','_handle_gitignore_save']: if self.refresh_commit_history not in refresh_list: refresh_list.append(self.refresh_commit_history) if task_context != 'refresh_changes': if self.refresh_changed_files_list not in refresh_list: refresh_list.append(self.refresh_changed_files_list) if task_context not in ['refresh_tags','checkout_tag'] or committed: if self.refresh_tag_list not in refresh_list: refresh_list.append(self.refresh_tag_list) if task_context not in ['refresh_branches', 'checkout_branch']: if self.refresh_branch_list not in refresh_list: refresh_list.append(self.refresh_branch_list) # --- Aggiornamenti diretti GUI (per i task di refresh stessi) --- elif task_context == 'refresh_tags': if hasattr(self, "main_frame"): self.main_frame.update_tag_list(result_value if isinstance(result_value, list) else []) elif task_context == 'refresh_branches': # Già 
gestito sopra per triggerare post_action_sync_refresh_needed branches, current = result_value if isinstance(result_value, tuple) and len(result_value) == 2 else ([], None) if hasattr(self, "main_frame"): self.main_frame.update_branch_list(branches, current) self.main_frame.update_history_branch_filter(branches) elif task_context == 'refresh_history': if hasattr(self, "main_frame"): self.main_frame.update_history_display(result_value if isinstance(result_value, list) else []) elif task_context == 'refresh_changes': if hasattr(self, "main_frame"): self.main_frame.update_changed_files_list(result_value if isinstance(result_value, list) else []) # --- Azioni post-successo specifiche --- if task_context == 'commit' and committed: if hasattr(self, "main_frame"): self.main_frame.clear_commit_message() if task_context == 'create_branch' and new_branch_context: if hasattr(self, "main_frame") and self.main_frame.ask_yes_no("Checkout?", f"Switch to new branch '{new_branch_context}'?"): self.checkout_branch(branch_to_checkout=new_branch_context, repo_path_override=repo_path_for_refreshes) post_action_sync_refresh_needed = False # Verrà fatto dopo il checkout # Se non fa checkout, i refresh sono già in lista e post_action_sync_refresh_needed è True elif status == 'warning': # Gestione warning generica: mostra popup if hasattr(self, "main_frame"): self.main_frame.show_warning("Operation Info", message) # Logica specifica per warning "already prepared" if "already prepared" in message: if self.refresh_changed_files_list not in refresh_list: refresh_list.append(self.refresh_changed_files_list) if self.refresh_branch_list not in refresh_list: refresh_list.append(self.refresh_branch_list) post_action_sync_refresh_needed = True elif status == 'error': # Gestione errori generica (esclusi contesti speciali gestiti sopra) log_handler.log_error(f"Error reported for task '{task_context}': {message}", func_name=func_name) error_details = f"{message}\n({type(exception).__name__}: {exception})" if exception else message # Gestione errore per fetch_remote, pull (non conflitto), push (non rifiuto), push_tags, apply_config if task_context in ['fetch_remote', 'pull_remote', 'push_remote', 'push_tags_remote', 'apply_remote_config']: auth_related_error = False; conn_related_error = False if isinstance(exception, GitCommandError) and exception.stderr: stderr_low = exception.stderr.lower(); if any(e in stderr_low for e in ["authentication failed", "permission denied", "could not read"]): auth_related_error = True; if any(e in stderr_low for e in ["repository not found", "could not resolve host"]): conn_related_error = True if auth_related_error: self._update_gui_auth_status('failed') elif conn_related_error: self._update_gui_auth_status('connection_failed') else: self._update_gui_auth_status('unknown_error') action_name = task_context.replace("_remote", "").replace("_", " ").title() if hasattr(self, "main_frame"): self.main_frame.show_error(f"{action_name} Error", f"{message}") if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error") # Gestione errori per altri task else: if is_conflict and repo_path_conflict and task_context == 'fetch_bundle': if hasattr(self, "main_frame"): self.main_frame.show_error("Merge Conflict", f"Conflict occurred during bundle fetch.\nResolve in:\n{repo_path_conflict}\nThen commit.") elif exception and "Uncommitted changes" in str(exception): if hasattr(self, "main_frame"): self.main_frame.show_warning("Action Blocked", 
f"{exception}\nCommit or stash first.") else: if hasattr(self, "main_frame"): self.main_frame.show_error("Error: Operation Failed", error_details) # Aggiorna liste GUI con stato errore if task_context == 'refresh_tags': if hasattr(self, "main_frame"): self.main_frame.update_tag_list([("(Error)", "")]) elif task_context == 'refresh_branches': if hasattr(self, "main_frame"): self.main_frame.update_branch_list([], None) self.main_frame.update_history_branch_filter([]) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error") elif task_context == 'refresh_history': if hasattr(self, "main_frame"): self.main_frame.update_history_display(["(Error retrieving history)"]) elif task_context == 'refresh_changes': if hasattr(self, "main_frame"): self.main_frame.update_changed_files_list(["(Error refreshing changes)"]) # Non serve aggiornare stato auth/sync per errori locali generici # --- Trigger finale dei refresh asincroni raccolti --- # (Spostato dopo tutta la logica if/elif/else sullo stato) if repo_path_for_refreshes and refresh_list: log_handler.log_debug(f"Triggering {len(refresh_list)} async refreshes after '{task_context}'", func_name=func_name) current_delay = 50 # Ritardo base for refresh_func in refresh_list: try: self.master.after(current_delay, refresh_func) current_delay += 50 # Scaletta leggermente i refresh except Exception as ref_e: log_handler.log_error(f"Error scheduling {getattr(refresh_func, '__name__', 'refresh function')}: {ref_e}", func_name=func_name) # Usa l'ultimo delay per il refresh dello stato sync delay_ms = current_delay elif refresh_list: log_handler.log_warning("Cannot trigger post-action UI refreshes: Repo path unavailable.", func_name=func_name) delay_ms = 50 # Resetta delay se non ci sono refresh standard else: delay_ms = 50 # Resetta delay se non ci sono refresh standard # Triggera refresh stato ahead/behind SE necessario e non già in refresh_list if post_action_sync_refresh_needed and self.refresh_remote_status not in refresh_list: current_repo_path_sync = self._get_and_validate_svn_path("Post-Action Sync Status Check") if current_repo_path_sync: log_handler.log_debug(f"Triggering remote sync status refresh after '{task_context}'.", func_name=func_name) self.master.after(delay_ms + 50, self.refresh_remote_status) # Aggiunge un ulteriore piccolo delay # Log finale solo se non è stata gestita una ricorsione/nuovo avvio if should_reenable_now: log_handler.log_debug(f"Finished processing result for context '{task_context}'.", func_name=func_name) except queue.Empty: # Coda vuota, riprogramma check se la finestra esiste ancora if hasattr(self, "master") and self.master.winfo_exists(): self.master.after(self.ASYNC_QUEUE_CHECK_INTERVAL_MS, self._check_completion_queue, results_queue, context) except Exception as e: # Errore critico nel processare la coda stessa log_handler.log_exception(f"Critical error processing completion queue for {task_context}: {e}", func_name=func_name) # Tenta recupero GUI try: if hasattr(self, "main_frame") and self.main_frame.winfo_exists(): self.main_frame.set_action_widgets_state(tk.NORMAL) # Tenta riabilitazione self.main_frame.update_status_bar("Error processing async result.", bg_color=self.main_frame.STATUS_RED, duration_ms=10000) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error") except Exception as recovery_e: log_handler.log_error(f"Failed to recover GUI after queue 
processing error: {recovery_e}", func_name=func_name) # --- Helper Methods (interni alla classe) --- def _generate_next_tag_suggestion(self, svn_path: str) -> str: """Generates a suggested tag name based on the latest v.X.X.X.X tag.""" # (Metodo reintegrato e adattato per log_handler nel passo precedente) func_name = "_generate_next_tag_suggestion" log_handler.log_debug("Generating next tag suggestion...", func_name=func_name) default_suggestion = "v.0.0.0.1" latest_valid_tag = None tag_pattern = re.compile(r"^v\.(\d+)\.(\d+)\.(\d+)\.(\d+)$") try: tags_data = self.git_commands.list_tags(svn_path) if not tags_data: log_handler.log_debug( "No existing tags found. Suggesting default.", func_name=func_name ) return default_suggestion for tag_name, _ in tags_data: match = tag_pattern.match(tag_name) if match: latest_valid_tag = tag_name log_handler.log_debug( f"Found latest tag matching pattern: {latest_valid_tag}", func_name=func_name, ) break # Trovato il più recente if not latest_valid_tag: log_handler.log_debug( "No tags matched the pattern v.X.X.X.X. Suggesting default.", func_name=func_name, ) return default_suggestion match = tag_pattern.match(latest_valid_tag) if not match: log_handler.log_error( f"Internal error: Could not re-match tag {latest_valid_tag}", func_name=func_name, ) return default_suggestion # Incrementa con riporto a 99 v1, v2, v3, v4 = map(int, match.groups()) limit = 99 v4 += 1 if v4 > limit: v4 = 0 v3 += 1 if v3 > limit: v3 = 0 v2 += 1 if v2 > limit: v2 = 0 v1 += 1 next_tag = f"v.{v1}.{v2}.{v3}.{v4}" log_handler.log_debug( f"Generated suggestion: {next_tag}", func_name=func_name ) return next_tag except Exception as e: log_handler.log_exception( f"Error generating tag suggestion: {e}", func_name=func_name ) return default_suggestion # --- Punto di Ingresso (main) --- def main(): """Main entry point for the application.""" # Non configuriamo il logging qui, ci pensa GitSvnSyncApp root = None app = None try: print("Creating Tkinter root window...") root = tk.Tk() root.minsize(850, 750) # Manteniamo dimensioni minime print("Tkinter root window created.") print("Initializing GitSvnSyncApp...") # L'init ora crea tutti gli handler necessari app = GitSvnSyncApp(root) print("GitSvnSyncApp initialization attempt complete.") # Avvia il main loop solo se l'inizializzazione della GUI è andata a buon fine if ( hasattr(app, "main_frame") and app.main_frame and app.main_frame.winfo_exists() ): print("Starting Tkinter main event loop.") root.mainloop() print("Tkinter main event loop finished.") else: # Se main_frame non esiste, l'init è fallito e ha già mostrato errore/chiuso print("CRITICAL: App init failed before mainloop could start. Exiting.") if root and root.winfo_exists(): # Assicura chiusura finestra root se ancora esiste try: root.destroy() except: pass except Exception as e: # Cattura eccezioni gravissime durante startup o mainloop print(f"FATAL error during application execution: {e}") traceback.print_exc() # Stampa traceback su console # Tenta di mostrare errore GUI come ultima risorsa try: parent = root if root and root.winfo_exists() else None messagebox.showerror( "Fatal Application Error", f"Application failed unexpectedly:\n{e}", parent=parent, ) except Exception as msg_e: print(f"FATAL (GUI error message failed: {msg_e}):\n{e}") finally: print("Application exiting.") if __name__ == "__main__": main() # --- END OF FILE GitUtility.py ---