# --- FILE: gitsync_tool/async_tasks/async_result_handler.py --- import tkinter as tk from tkinter import messagebox # Importa messagebox import queue import os import logging # Per livelli from typing import ( TYPE_CHECKING, Dict, Any, List, Tuple, Callable, Optional, Set, ) # Aggiunto Set # ---<<< MODIFICA IMPORT >>>--- # Usa percorsi assoluti per importare moduli dal pacchetto from gitutility.logging_setup import log_handler from gitutility.commands.git_commands import GitCommandError # Importa async_workers (necessario per schedulare task interattivo) from gitutility.async_tasks import async_workers # Importa costanti di configurazione from gitutility.config.config_manager import DEFAULT_REMOTE_NAME # Forward Reference per Type Hinting (Aggiornato percorsi) if TYPE_CHECKING: from gitutility.app import GitSvnSyncApp # Importa MainFrame dal nuovo percorso from gitutility.gui.main_frame import MainFrame # ---<<< FINE MODIFICA IMPORT >>>--- class AsyncResultHandler: """ Handles processing the results received from asynchronous worker functions. Updates the GUI based on the outcome of background tasks (e.g., populating lists, showing messages) and triggers necessary follow-up actions like GUI refreshes or subsequent asynchronous tasks. Separates result processing logic from the main application controller. """ def __init__(self, app: "GitSvnSyncApp"): """ Initializes the result handler. Args: app (GitSvnSyncApp): The main application instance, used to access GUI elements and other application methods. Raises: ValueError: If the provided 'app' instance is invalid or missing main_frame. """ # Validate the main application instance passed if app is None: raise ValueError("AsyncResultHandler requires a valid 'app' instance.") # Ensure the main_frame is already initialized on the app instance if not hasattr(app, "main_frame") or not app.main_frame: raise ValueError( "AsyncResultHandler requires the app's main_frame to be initialized before instantiation." ) # Store references to the app and its main frame self.app: "GitSvnSyncApp" = app # ---<<< MODIFICA: Assicurati che il type hint per main_frame sia corretto >>>--- self.main_frame: "MainFrame" = app.main_frame # Usa il forward reference # ---<<< FINE MODIFICA >>>--- log_handler.log_debug("AsyncResultHandler initialized.", func_name="__init__") def process(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> None: """ Main entry point for processing results from asynchronous workers. Dispatches the result to the appropriate handler method based on the provided context. Schedules subsequent GUI refreshes if needed. Args: result_data (Dict[str, Any]): The dictionary returned by the worker thread. Expected keys: 'status', 'message', 'result', 'exception'. context (Dict[str, Any]): The context dictionary passed when the async operation was initially started. Expected keys: 'context', 'status_msg', etc. """ # Extract task context and status from the received data task_context: str = context.get("context", "unknown") status: str = result_data.get( "status", "error" ) # Default to 'error' if missing func_name: str = f"process (ctx: {task_context})" # For logging log_handler.log_debug( f"Processing async result for context '{task_context}' with status '{status}'.", func_name=func_name, ) # --- Determine Refresh Needs --- # Flags set by specific handler methods, default to no refresh should_trigger_refreshes: bool = False post_action_sync_refresh_needed: bool = False # For ahead/behind status # --- Dispatch to Appropriate Handler Method --- # Map context strings to their corresponding handler methods handler_map: Dict[str, Callable] = { "compare_branches": self._handle_compare_branches_result, "refresh_tags": self._handle_refresh_tags_result, "refresh_branches": self._handle_refresh_local_branches_result, "refresh_history": self._handle_refresh_history_result, "refresh_changes": self._handle_refresh_changes_result, "refresh_remote_branches": self._handle_refresh_remote_branches_result, "get_ahead_behind": self._handle_get_ahead_behind_result, "prepare_repo": self._handle_prepare_repo_result, "create_bundle": self._handle_create_bundle_result, "fetch_bundle": self._handle_fetch_bundle_result, "manual_backup": self._handle_manual_backup_result, "commit": self._handle_commit_result, "_handle_gitignore_save": self._handle_untrack_result, # Context for untrack after save "add_file": self._handle_add_file_result, "create_tag": self._handle_create_tag_result, "checkout_tag": self._handle_checkout_tag_result, "create_branch": self._handle_create_branch_result, "checkout_branch": self._handle_checkout_branch_result, "delete_local_branch": self._handle_delete_local_branch_result, "merge_local_branch": self._handle_merge_local_branch_result, "check_connection": self._handle_check_connection_result, "interactive_auth": self._handle_interactive_auth_result, "apply_remote_config": self._handle_apply_remote_config_result, "fetch_remote": self._handle_fetch_remote_result, "pull_remote": self._handle_pull_remote_result, "push_remote": self._handle_push_remote_result, "push_tags_remote": self._handle_push_tags_remote_result, "clone_remote": self._handle_clone_remote_result, "checkout_tracking_branch": self._handle_checkout_tracking_branch_result, "get_commit_details": self._handle_get_commit_details_result, "update_wiki": self._handle_generic_result, "revert_to_tag": self._handle_revert_to_tag_result, } # Get the handler method from the map handler_method: Optional[Callable] = handler_map.get(task_context) # --- Execute the Handler --- if handler_method and callable(handler_method): log_handler.log_debug( f"Dispatching to handler: {handler_method.__name__}", func_name=func_name, ) # Call the specific handler method # It's expected to return a tuple: (should_trigger_refreshes, post_action_sync_refresh_needed) refresh_flags: Any = handler_method(result_data, context) # Unpack the refresh flags if isinstance(refresh_flags, tuple) and len(refresh_flags) == 2: should_trigger_refreshes, post_action_sync_refresh_needed = ( refresh_flags ) else: # Log warning if handler didn't return expected flags log_handler.log_warning( f"Handler '{handler_method.__name__}' for context '{task_context}' did not return the expected refresh flags tuple (bool, bool). Defaulting to no refresh.", func_name=func_name, ) should_trigger_refreshes = False post_action_sync_refresh_needed = False else: # Fallback for unhandled contexts log_handler.log_warning( f"No specific handler method found for context: '{task_context}'. Using generic handler.", func_name=func_name, ) # Call generic handler (returns default False, False for refreshes) should_trigger_refreshes, post_action_sync_refresh_needed = ( self._handle_generic_result(result_data, context) ) # --- Trigger Necessary GUI Refreshes (if needed) --- # This happens *after* the specific result has been processed if should_trigger_refreshes: log_handler.log_debug( f"Triggering post-action refreshes for context '{task_context}'. Sync refresh needed: {post_action_sync_refresh_needed}", func_name=func_name, ) self._trigger_post_action_refreshes(post_action_sync_refresh_needed) else: log_handler.log_debug( f"No standard post-action refreshes required for context '{task_context}'.", func_name=func_name, ) # --- Handler Methods for Refresh Tasks --- # (Metodi _handle_refresh_... , _handle_get_ahead_behind - INVARIATI rispetto a prima) # Assicurati che chiamino i metodi corretti su self.main_frame e self.app def _handle_refresh_tags_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_refresh_tags_result" status: Optional[str] = result_data.get("status") result_value: Any = result_data.get("result") if not hasattr(self.main_frame, "update_tag_list"): log_handler.log_error( "MainFrame missing 'update_tag_list'.", func_name=func_name ) return False, False if status == "success": tags_list: list = result_value if isinstance(result_value, list) else [] self.main_frame.update_tag_list(tags_list) log_handler.log_debug( f"Tag list updated ({len(tags_list)} items).", func_name=func_name ) elif status == "error": log_handler.log_error( "Error reported during tag refresh.", func_name=func_name ) self.main_frame.update_tag_list([("(Error)", "")]) return False, False def _handle_refresh_local_branches_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato, usa self.app.current_local_branch) func_name: str = "_handle_refresh_local_branches_result" status: Optional[str] = result_data.get("status") result_value: Any = result_data.get("result") gui_methods_exist: bool = hasattr( self.main_frame, "update_branch_list" ) and hasattr(self.main_frame, "update_history_branch_filter") if not gui_methods_exist: log_handler.log_error( "MainFrame missing update methods for local branches.", func_name=func_name, ) return False, False branches: List[str] = [] current_branch: Optional[str] = None if status == "success": if isinstance(result_value, tuple) and len(result_value) == 2: branches, current_branch = result_value self.app.current_local_branch = current_branch log_handler.log_debug( f"Local branches updated. Current: {current_branch}", func_name=func_name, ) else: log_handler.log_warning( "Invalid result format for local branches refresh.", func_name=func_name, ) branches = ["(Invalid Data)"] current_branch = None self.app.current_local_branch = None elif status == "error": log_handler.log_error( "Error reported during local branches refresh.", func_name=func_name ) branches = ["(Error)"] current_branch = None self.app.current_local_branch = None self.main_frame.update_branch_list(branches, current_branch) self.main_frame.update_history_branch_filter(branches) if hasattr(self.app, "refresh_remote_status") and callable( self.app.refresh_remote_status ): log_handler.log_debug( "Scheduling remote status refresh after local branches update.", func_name=func_name, ) self.app.master.after(100, self.app.refresh_remote_status) else: log_handler.log_warning( "Could not schedule sync status refresh: method missing on app.", func_name=func_name, ) return False, False def _handle_refresh_history_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato, usa Treeview ora) func_name: str = "_handle_refresh_history_result" status: Optional[str] = result_data.get("status") result_value: Any = result_data.get("result") if not hasattr(self.main_frame, "update_history_display"): log_handler.log_error( "MainFrame missing 'update_history_display'.", func_name=func_name ) return False, False if status == "success": log_lines: list = result_value if isinstance(result_value, list) else [] self.main_frame.update_history_display(log_lines) log_handler.log_debug( f"History display updated ({len(log_lines)} lines).", func_name=func_name, ) elif status == "error": log_handler.log_error( "Error reported during history refresh.", func_name=func_name ) self.main_frame.update_history_display(["(Error retrieving history)"]) return False, False def _handle_refresh_changes_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_refresh_changes_result" status: Optional[str] = result_data.get("status") result_value: Any = result_data.get("result") if not hasattr(self.main_frame, "update_changed_files_list"): log_handler.log_error( "MainFrame missing 'update_changed_files_list'.", func_name=func_name ) return False, False if status == "success": files_list: list = result_value if isinstance(result_value, list) else [] self.main_frame.update_changed_files_list(files_list) log_handler.log_debug( f"Changed files list updated ({len(files_list)} items).", func_name=func_name, ) elif status == "error": log_handler.log_error( "Error reported during changes refresh.", func_name=func_name ) self.main_frame.update_changed_files_list(["(Error refreshing changes)"]) return False, False def _handle_refresh_remote_branches_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_refresh_remote_branches_result" status: Optional[str] = result_data.get("status") result_value: Any = result_data.get("result") if not hasattr(self.main_frame, "update_remote_branches_list"): log_handler.log_error( "MainFrame missing 'update_remote_branches_list'.", func_name=func_name ) return False, False if status == "success": branch_list: list = ( result_value if isinstance(result_value, list) else ["(Invalid Data)"] ) self.main_frame.update_remote_branches_list(branch_list) log_handler.log_debug( f"Remote branches list updated ({len(branch_list)} items).", func_name=func_name, ) elif status == "error": log_handler.log_error( "Error reported during remote branches refresh.", func_name=func_name ) self.main_frame.update_remote_branches_list(["(Error)"]) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: Unknown" ) return False, False def _handle_get_ahead_behind_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_get_ahead_behind_result" status: Optional[str] = result_data.get("status") result_value: Any = result_data.get("result") message: Optional[str] = result_data.get("message") local_branch_ctx: Optional[str] = context.get("local_branch") if not hasattr(self.main_frame, "update_ahead_behind_status"): log_handler.log_error( "MainFrame missing 'update_ahead_behind_status'.", func_name=func_name ) return False, False if status == "success": ahead, behind = ( result_value if isinstance(result_value, tuple) else (None, None) ) log_handler.log_info( f"Ahead/Behind status for '{local_branch_ctx}': Ahead={ahead}, Behind={behind}", func_name=func_name, ) self.main_frame.update_ahead_behind_status( current_branch=local_branch_ctx, ahead=ahead, behind=behind ) elif status == "error": log_handler.log_error( f"Failed getting ahead/behind for '{local_branch_ctx}': {message}", func_name=func_name, ) error_text: str = ( f"Sync Status: Error" if not message else f"Sync Status: Error ({message})" ) self.main_frame.update_ahead_behind_status( current_branch=local_branch_ctx, status_text=error_text ) return False, False # --- Handler Methods for Local Git Actions --- # (Metodi _handle_... INVARIATI rispetto a prima) # Assicurati che chiamino i metodi corretti su self.main_frame e self.app def _handle_prepare_repo_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_prepare_repo_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( "Repository preparation successful.", func_name=func_name ) trigger_refreshes = True sync_refresh = True elif status == "warning": log_handler.log_warning( f"Prepare repo warning: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_info"): self.main_frame.show_info("Prepare Info", message) trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error( f"Repository preparation failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Prepare Error", f"Failed:\n{message}") if hasattr(self.app, "update_svn_status_indicator") and hasattr( self.app.main_frame, "svn_path_entry" ): self.app.master.after( 50, lambda: self.app.update_svn_status_indicator( self.app.main_frame.svn_path_entry.get() ), ) return trigger_refreshes, sync_refresh def _handle_create_bundle_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_create_bundle_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") committed: bool = result_data.get("committed", False) trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Create bundle finished: {message}", func_name=func_name ) if committed: trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error( f"Create bundle failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Create Bundle Error", f"Failed:\n{message}") return trigger_refreshes, sync_refresh def _handle_fetch_bundle_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_fetch_bundle_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") is_conflict: bool = result_data.get("conflict", False) repo_path_conflict: Optional[str] = result_data.get("repo_path") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Fetch from bundle successful: {message}", func_name=func_name ) trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error( f"Fetch from bundle failed: {message}", func_name=func_name ) if is_conflict and repo_path_conflict: log_handler.log_error( "Merge conflict detected during fetch from bundle.", func_name=func_name, ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Merge Conflict", f"Conflict occurred during bundle fetch.\nResolve in:\n{repo_path_conflict}\nThen commit.", ) trigger_refreshes = True sync_refresh = False self._reenable_widgets_after_modal() else: if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Fetch Bundle Error", f"Failed:\n{message}" ) if hasattr(self.app, "update_svn_status_indicator") and hasattr( self.app.main_frame, "svn_path_entry" ): self.app.master.after( 50, lambda: self.app.update_svn_status_indicator( self.app.main_frame.svn_path_entry.get() ), ) return trigger_refreshes, sync_refresh def _handle_manual_backup_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_manual_backup_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") if status == "success": log_handler.log_info( f"Manual backup finished: {message}", func_name=func_name ) # Optional: self.main_frame.show_info("Backup Complete", message) elif status == "error": log_handler.log_error( f"Manual backup failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Manual Backup Error", f"Failed:\n{message}") return False, False def _handle_commit_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_commit_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") committed: bool = result_data.get("committed", False) trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Commit operation finished: {message}", func_name=func_name ) if committed: if hasattr(self.main_frame, "clear_commit_message"): self.main_frame.clear_commit_message() trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error(f"Commit failed: {message}", func_name=func_name) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Commit Error", f"Failed:\n{message}") return trigger_refreshes, sync_refresh def _handle_untrack_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_untrack_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") committed: bool = result_data.get("committed", False) trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Untrack operation finished: {message}", func_name=func_name ) if committed: trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error(f"Untracking failed: {message}", func_name=func_name) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Untrack Error", f"Failed:\n{message}") return trigger_refreshes, sync_refresh def _handle_get_commit_details_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_get_commit_details_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") commit_details: Optional[Dict[str, Any]] = result_data.get("result") if status == "success": log_handler.log_info( "Commit details retrieved successfully. Requesting detail window display...", func_name=func_name, ) if isinstance(commit_details, dict): if hasattr(self.app, "show_commit_details") and callable( self.app.show_commit_details ): self.app.show_commit_details(commit_details) else: log_handler.log_error( "Cannot display commit details: 'show_commit_details' method missing on app.", func_name=func_name, ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Internal Error", "Cannot display commit details window." ) self._reenable_widgets_after_modal() else: log_handler.log_error( f"Received success for commit details, but result is not dict: {type(commit_details)}", func_name=func_name, ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Data Error", "Failed to process commit details." ) self._reenable_widgets_after_modal() elif status == "error": log_handler.log_error( f"Failed to get commit details: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Commit Details Error", f"Could not get details:\n{message}" ) # Widgets already re-enabled return False, False def _handle_add_file_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_add_file_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info(f"Add file successful: {message}", func_name=func_name) trigger_refreshes = True elif status == "error": log_handler.log_error(f"Add file failed: {message}", func_name=func_name) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Add File Error", f"Failed:\n{message}") return trigger_refreshes, sync_refresh def _handle_create_tag_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_create_tag_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Tag creation successful: {message}", func_name=func_name ) trigger_refreshes = True elif status == "error": log_handler.log_error(f"Create tag failed: {message}", func_name=func_name) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Create Tag Error", f"Failed:\n{message}") return trigger_refreshes, sync_refresh def _handle_checkout_tag_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_checkout_tag_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") exception: Optional[Exception] = result_data.get("exception") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Checkout tag successful: {message}", func_name=func_name ) trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error( f"Checkout tag failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): if exception and "Uncommitted changes" in str(exception): self.main_frame.show_warning( "Checkout Blocked", f"{message}\nCommit or stash first." ) else: self.main_frame.show_error( "Checkout Tag Error", f"Failed:\n{message}" ) self._reenable_widgets_after_modal() return trigger_refreshes, sync_refresh def _handle_create_branch_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_create_branch_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") new_branch_context: Optional[str] = context.get("new_branch_name") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Create branch successful: {message}", func_name=func_name ) if new_branch_context and hasattr(self.main_frame, "ask_yes_no"): if self.main_frame.ask_yes_no( "Checkout?", f"Switch to new branch '{new_branch_context}'?" ): if hasattr(self.app, "checkout_branch") and callable( self.app.checkout_branch ): self.app.checkout_branch(branch_to_checkout=new_branch_context) trigger_refreshes = False sync_refresh = False # Let checkout handle refreshes else: log_handler.log_error( "Cannot checkout new branch: method missing on app.", func_name=func_name, ) trigger_refreshes = True else: trigger_refreshes = True else: trigger_refreshes = True elif status == "error": log_handler.log_error( f"Create branch failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Create Branch Error", f"Failed:\n{message}") return trigger_refreshes, sync_refresh def _handle_checkout_branch_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_checkout_branch_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") exception: Optional[Exception] = result_data.get("exception") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Checkout branch successful: {message}", func_name=func_name ) trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error( f"Checkout branch failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): if exception and "Uncommitted changes" in str(exception): self.main_frame.show_warning( "Checkout Blocked", f"{message}\nCommit or stash first." ) else: self.main_frame.show_error( "Checkout Branch Error", f"Failed:\n{message}" ) self._reenable_widgets_after_modal() return trigger_refreshes, sync_refresh def _handle_delete_local_branch_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_delete_local_branch_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") exception: Optional[Exception] = result_data.get("exception") deleted_branch_name: Optional[str] = context.get("branch_name") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Local branch '{deleted_branch_name}' deleted successfully.", func_name=func_name, ) trigger_refreshes = True elif status == "error": log_handler.log_error( f"Delete local branch '{deleted_branch_name}' failed: {message}", func_name=func_name, ) if hasattr(self.main_frame, "show_error"): stderr_text = ( getattr(exception, "stderr", "") if isinstance(exception, GitCommandError) else "" ) if "not fully merged" in message or "not fully merged" in stderr_text: self.main_frame.show_warning("Delete Warning", f"{message}") else: self.main_frame.show_error("Delete Error", f"{message}") return trigger_refreshes, sync_refresh def _handle_merge_local_branch_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_merge_local_branch_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") merged_into: Optional[str] = context.get("branch_merged_into") merged_from: Optional[str] = context.get("branch_merged_from") repo_path_conflict: Optional[str] = context.get("repo_path") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Successfully merged '{merged_from}' into '{merged_into}'.", func_name=func_name, ) trigger_refreshes = True sync_refresh = True elif status == "conflict": log_handler.log_error( f"Merge conflict occurred merging '{merged_from}'. User needs to resolve manually.", func_name=func_name, ) if hasattr(self, "main_frame"): self.main_frame.show_error( "Merge Conflict", f"Merge conflict occurred while merging '{merged_from}' into '{merged_into}'.\n\nPlease resolve the conflicts manually in:\n{repo_path_conflict}\n\nAfter resolving, stage the changes and commit them.", ) trigger_refreshes = True sync_refresh = False self._reenable_widgets_after_modal() elif status == "error": log_handler.log_error( f"Merge local branch failed: {message}", func_name=func_name ) if hasattr(self, "main_frame"): self.main_frame.show_error("Merge Error", f"{message}") self._reenable_widgets_after_modal() return trigger_refreshes, sync_refresh # --- Handlers for Authentication and Remote Config --- # (Metodi _handle_... INVARIATI rispetto a prima) def _handle_check_connection_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_check_connection_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") result_value: Optional[str] = result_data.get("result") remote_name: str = context.get("remote_name_checked", "unknown remote") repo_path_checked: Optional[str] = context.get("repo_path_checked") sync_refresh: bool = False if status == "success": auth_status_to_set: str = "ok" log_handler.log_info( f"Connection check successful for '{remote_name}'.", func_name=func_name ) self.app._update_gui_auth_status(auth_status_to_set) sync_refresh = True elif status == "auth_required": log_handler.log_warning( f"Authentication required for remote '{remote_name}'.", func_name=func_name, ) self.app._update_gui_auth_status("required") if ( repo_path_checked and hasattr(self.main_frame, "ask_yes_no") and self.main_frame.ask_yes_no( "Authentication Required", f"Authentication is required to connect to remote '{remote_name}'.\n\nDo you want to attempt authentication now?\n(This may open a separate terminal window for credential input.)", ) ): log_handler.log_info( "User requested interactive authentication attempt.", func_name=func_name, ) args_interactive: tuple = ( self.app.git_commands, repo_path_checked, remote_name, ) if hasattr(self.app, "_start_async_operation"): self.app._start_async_operation( worker_func=async_workers.run_interactive_auth_attempt_async, args_tuple=args_interactive, context_dict={ "context": "interactive_auth", "status_msg": f"Attempting interactive auth for '{remote_name}'", "original_context": context, }, ) else: log_handler.log_error( "Cannot start interactive auth: _start_async_operation missing on app.", func_name=func_name, ) self._reenable_widgets_after_modal() else: log_handler.log_info( "User declined interactive authentication attempt or required data missing.", func_name=func_name, ) self._reenable_widgets_after_modal() elif status == "error": error_type: str = ( result_value if result_value in ["connection_failed", "unknown_error", "worker_exception"] else "unknown_error" ) self.app._update_gui_auth_status(error_type) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: Error" ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Connection Error", f"{message}") return False, sync_refresh def _handle_interactive_auth_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_interactive_auth_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") result_value: Optional[str] = result_data.get("result") original_context: dict = context.get("original_context", {}) remote_name: str = original_context.get("remote_name_checked", "unknown remote") if status == "success" and result_value == "auth_attempt_success": log_handler.log_info( f"Interactive auth attempt for '{remote_name}' successful. Re-checking connection...", func_name=func_name, ) if hasattr(self.main_frame, "update_status_bar"): self.main_frame.update_status_bar( f"Authentication successful. Checking status..." ) if hasattr(self.app, "check_connection_auth") and callable( self.app.check_connection_auth ): self.app.master.after(100, self.app.check_connection_auth) else: log_handler.log_error( "Cannot re-check connection: check_connection_auth missing on app.", func_name=func_name, ) self._reenable_widgets_after_modal() elif status == "error": log_handler.log_warning( f"Interactive auth attempt for '{remote_name}' failed: {message}", func_name=func_name, ) self.app._update_gui_auth_status("failed") if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: Auth Failed" ) if hasattr(self.main_frame, "show_warning"): self.main_frame.show_warning( "Authentication Attempt Failed", f"{message}" ) self._reenable_widgets_after_modal() return False, False def _handle_apply_remote_config_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_apply_remote_config_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") sync_refresh: bool = False if status == "success": log_handler.log_info( f"Apply remote config successful: {message}", func_name=func_name ) if hasattr(self.app, "check_connection_auth") and callable( self.app.check_connection_auth ): log_handler.log_debug( "Scheduling connection check after applying config.", func_name=func_name, ) self.app.master.after(50, self.app.check_connection_auth) else: log_handler.log_warning( "Cannot auto-check connection after apply config.", func_name=func_name, ) sync_refresh = True elif status == "error": log_handler.log_error( f"Apply remote config failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Apply Config Error", f"Failed:\n{message}") self.app._update_gui_auth_status("unknown") if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: Unknown" ) return False, sync_refresh # --- Handlers for Remote Git Actions --- # (Metodi _handle_... INVARIATI rispetto a prima) def _handle_fetch_remote_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_fetch_remote_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") exception: Optional[Exception] = result_data.get("exception") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Fetch remote successful: {message}", func_name=func_name ) trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error( f"Fetch remote failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Fetch Error", f"Failed:\n{message}") auth_status: str = self._determine_auth_status_from_error(exception) self.app._update_gui_auth_status(auth_status) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: Error" ) return trigger_refreshes, sync_refresh def _handle_pull_remote_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_pull_remote_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") exception: Optional[Exception] = result_data.get("exception") repo_path_conflict: Optional[str] = context.get("repo_path") remote_name_context: Optional[str] = context.get("remote_name") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Pull remote successful: {message}", func_name=func_name ) trigger_refreshes = True sync_refresh = True elif status == "conflict": log_handler.log_error( f"Pull resulted in conflict: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Merge Conflict", f"Merge conflict occurred during pull from '{remote_name_context or 'remote'}'.\n\nPlease resolve conflicts manually in:\n{repo_path_conflict}\n\nAfter resolving, stage and commit.", ) trigger_refreshes = True sync_refresh = False self._reenable_widgets_after_modal() elif status == "error": log_handler.log_error(f"Pull remote failed: {message}", func_name=func_name) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Pull Error", f"Failed:\n{message}") auth_status: str = self._determine_auth_status_from_error(exception) self.app._update_gui_auth_status(auth_status) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: Error" ) return trigger_refreshes, sync_refresh def _handle_push_remote_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_push_remote_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") exception: Optional[Exception] = result_data.get("exception") rejected_branch: Optional[str] = result_data.get("branch_name") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Push remote successful: {message}", func_name=func_name ) trigger_refreshes = False sync_refresh = True elif status == "rejected": log_handler.log_error( f"Push rejected for branch '{rejected_branch}': {message}", func_name=func_name, ) if hasattr(self.main_frame, "show_warning"): self.main_frame.show_warning("Push Rejected", f"{message}") if hasattr(self.app, "fetch_remote") and callable(self.app.fetch_remote): log_handler.log_debug( "Scheduling fetch after push rejection.", func_name=func_name ) self.app.master.after(100, self.app.fetch_remote) else: trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error(f"Push remote failed: {message}", func_name=func_name) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Push Error", f"Failed:\n{message}") auth_status: str = self._determine_auth_status_from_error(exception) self.app._update_gui_auth_status(auth_status) if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: Error" ) return trigger_refreshes, sync_refresh def _handle_push_tags_remote_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_push_tags_remote_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") exception: Optional[Exception] = result_data.get("exception") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Push tags successful: {message}", func_name=func_name ) trigger_refreshes = True elif status == "error": log_handler.log_error(f"Push tags failed: {message}", func_name=func_name) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Push Tags Error", f"Failed:\n{message}") auth_status: str = self._determine_auth_status_from_error(exception) self.app._update_gui_auth_status(auth_status) return trigger_refreshes, sync_refresh def _handle_clone_remote_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_clone_remote_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") success_data: Optional[dict] = context.get("clone_success_data") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Clone successful. Creating profile...", func_name=func_name ) if success_data and isinstance(success_data, dict): new_profile_name: Optional[str] = success_data.get("profile_name") cloned_repo_path: Optional[str] = success_data.get("cloned_path") cloned_remote_url: Optional[str] = success_data.get("remote_url") if new_profile_name and cloned_repo_path and cloned_remote_url: try: defaults: dict = ( self.app.config_manager._get_expected_keys_with_defaults() ) defaults["svn_working_copy_path"] = cloned_repo_path defaults["remote_url"] = cloned_remote_url defaults["remote_name"] = DEFAULT_REMOTE_NAME defaults["bundle_name"] = f"{new_profile_name}.bundle" defaults["bundle_name_updated"] = ( f"{new_profile_name}_update.bundle" ) defaults["autobackup"] = "False" defaults["autocommit"] = "False" defaults["commit_message"] = "" self.app.config_manager.add_section(new_profile_name) for key, value in defaults.items(): self.app.config_manager.set_profile_option( new_profile_name, key, value ) self.app.config_manager.save_config() log_handler.log_info( f"Profile '{new_profile_name}' created successfully.", func_name=func_name, ) sections: list = self.app.config_manager.get_profile_sections() if hasattr( self.main_frame, "update_profile_dropdown" ) and hasattr(self.main_frame, "profile_var"): self.main_frame.update_profile_dropdown(sections) self.main_frame.profile_var.set(new_profile_name) except Exception as profile_e: log_handler.log_exception( f"Clone ok, but failed to create profile '{new_profile_name}': {profile_e}", func_name=func_name, ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Profile Creation Error", f"Clone ok, failed create profile:\n{profile_e}", ) self._reenable_widgets_after_modal() else: log_handler.log_error( "Clone ok, but missing data for profile.", func_name=func_name ) if hasattr(self.main_frame, "update_status_bar"): self.main_frame.update_status_bar( "Clone done, profile data missing." ) self._reenable_widgets_after_modal() else: log_handler.log_error( "Clone ok, but success data missing/invalid.", func_name=func_name ) if hasattr(self.main_frame, "update_status_bar"): self.main_frame.update_status_bar("Clone done, profile data error.") self._reenable_widgets_after_modal() elif status == "error": log_handler.log_error( f"Clone operation failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Clone Error", f"{message}") self._reenable_widgets_after_modal() return trigger_refreshes, sync_refresh def _handle_checkout_tracking_branch_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_checkout_tracking_branch_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Successfully checked out tracking branch: {message}", func_name=func_name, ) trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error( f"Checkout tracking branch failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Checkout Error", f"{message}") if hasattr(self.main_frame, "update_ahead_behind_status"): self.main_frame.update_ahead_behind_status( status_text="Sync Status: Error" ) self._reenable_widgets_after_modal() return trigger_refreshes, sync_refresh # --- Handler Specifico Spostato (Compare Branches) --- def _handle_compare_branches_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_compare_branches_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") result_value: Any = result_data.get("result") if status == "success": log_handler.log_info( "Branch comparison successful. Requesting summary window display...", func_name=func_name, ) ref1_ctx: Optional[str] = context.get("ref1") ref2_ctx: Optional[str] = context.get("ref2") repo_path_ctx: Optional[str] = context.get("repo_path") changed_files_list: list = ( result_value if isinstance(result_value, list) else [] ) if ref1_ctx and ref2_ctx and repo_path_ctx: if hasattr(self.app, "show_comparison_summary") and callable( self.app.show_comparison_summary ): self.app.show_comparison_summary( ref1=ref1_ctx, ref2=ref2_ctx, repo_path=repo_path_ctx, changed_files=changed_files_list, ) else: log_handler.log_error( "Cannot display comparison summary: 'show_comparison_summary' method missing on app.", func_name=func_name, ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Internal Error", "Cannot display comparison results." ) self._reenable_widgets_after_modal() else: log_handler.log_error( "Missing context data (ref1, ref2, repo_path) for comparison summary.", func_name=func_name, ) if hasattr(self.main_frame, "update_status_bar"): self.main_frame.update_status_bar( "Error: Internal data missing for comparison." ) self._reenable_widgets_after_modal() elif status == "error": log_handler.log_error( f"Branch comparison failed: {message}", func_name=func_name ) if hasattr(self.main_frame, "show_error"): self.main_frame.show_error( "Comparison Error", f"Could not compare branches:\n{message}" ) # Widgets already re-enabled return False, False # --- Gestore Generico --- def _handle_generic_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: # ... (Codice invariato) func_name: str = "_handle_generic_result" status: Optional[str] = result_data.get("status") message: str = result_data.get("message", "Operation finished.") exception: Optional[Exception] = result_data.get("exception") task_context: str = context.get("context", "unknown") log_handler.log_debug( f"Handling generic result for '{task_context}' with status '{status}'.", func_name=func_name, ) if status == "error": log_handler.log_error( f"Error reported for task '{task_context}': {message}", func_name=func_name, ) error_details: str = ( f"{message}\n({type(exception).__name__}: {exception})" if exception else message ) if hasattr(self.main_frame, "show_error"): error_title: str = f"Error: {task_context.replace('_',' ').title()}" self.main_frame.show_error(error_title, error_details) elif status == "warning": if hasattr(self.main_frame, "show_warning"): self.main_frame.show_warning("Operation Info", message) return False, False # --- Helper Methods --- # (Metodi _determine_auth_status_from_error, _reenable_widgets_after_modal, # _trigger_post_action_refreshes - INVARIATI rispetto a prima) def _determine_auth_status_from_error(self, exception: Optional[Exception]) -> str: # ... (Codice invariato) if isinstance(exception, GitCommandError) and exception.stderr: stderr_low: str = exception.stderr.lower() auth_errors: List[str] = [ "authentication failed", "permission denied", "could not read", ] conn_errors: List[str] = [ "repository not found", "could not resolve host", "failed to connect", "network is unreachable", "timed out", "unable to access", ] if any(e in stderr_low for e in auth_errors): return "failed" if any(e in stderr_low for e in conn_errors): return "connection_failed" return "unknown_error" def _reenable_widgets_after_modal(self): # ... (Codice invariato) if hasattr(self.app, "master") and self.app.master.winfo_exists(): self.app.master.after( 50, self.app._reenable_widgets_if_ready ) # Usa self.app._reenable_widgets_if_ready def _trigger_post_action_refreshes(self, sync_refresh_needed: bool): # ... (Codice invariato) func_name: str = "_trigger_post_action_refreshes" if not hasattr(self.app, "master") or not self.app.master.winfo_exists(): log_handler.log_warning( "Cannot trigger refreshes: Master window closed.", func_name=func_name ) return current_repo_path: Optional[str] = self.app._get_and_validate_svn_path( "Post-Action Refresh Trigger" ) if not current_repo_path or not self.app._is_repo_ready(current_repo_path): log_handler.log_warning( "Cannot trigger refreshes: Repo path unavailable or not ready.", func_name=func_name, ) self._reenable_widgets_after_modal() return refresh_functions_to_call: List[Callable] = [ self.app.refresh_commit_history, self.app.refresh_branch_list, self.app.refresh_tag_list, self.app.refresh_changed_files_list, self.app.refresh_remote_branches, ] log_handler.log_debug( f"Scheduling {len(refresh_functions_to_call)} standard refreshes. Sync refresh needed: {sync_refresh_needed}", func_name=func_name, ) delay_ms: int = 50 for refresh_func in refresh_functions_to_call: try: schedule_func: Callable = lambda rf=refresh_func: rf() self.app.master.after(delay_ms, schedule_func) log_handler.log_debug( f"Scheduled {getattr(refresh_func, '__name__', 'unknown_refresh')} with delay {delay_ms}ms.", func_name=func_name, ) delay_ms += 75 except Exception as ref_e: log_handler.log_error( f"Error scheduling {getattr(refresh_func, '__name__', 'refresh function')}: {ref_e}", func_name=func_name, ) if sync_refresh_needed: if hasattr(self.app, "refresh_remote_status") and callable( self.app.refresh_remote_status ): log_handler.log_debug( f"Scheduling remote status refresh with delay {delay_ms}ms.", func_name=func_name, ) self.app.master.after(delay_ms, self.app.refresh_remote_status) else: log_handler.log_warning( "Sync refresh needed but refresh_remote_status method not found on app.", func_name=func_name, ) delay_ms += 75 def _reenable_widgets_final(): if ( hasattr(self.main_frame, "winfo_exists") and self.main_frame.winfo_exists() ): log_handler.log_debug( "Re-enabling widgets after post-action refreshes scheduled.", func_name="_reenable_widgets_final", ) self.main_frame.set_action_widgets_state(tk.NORMAL) self.app.master.after(delay_ms + 50, _reenable_widgets_final) def _handle_revert_to_tag_result( self, result_data: Dict[str, Any], context: Dict[str, Any] ) -> Tuple[bool, bool]: """ Handles the result of the 'revert to tag' operation. Shows success/error message and triggers a full GUI refresh on success. """ func_name: str = "_handle_revert_to_tag_result" status: Optional[str] = result_data.get("status") message: Optional[str] = result_data.get("message") tag_name_ctx: Optional[str] = context.get("tag_name") trigger_refreshes: bool = False sync_refresh: bool = False if status == "success": log_handler.log_info( f"Successfully reverted repository to tag '{tag_name_ctx}'. Triggering full refresh.", func_name=func_name, ) # Mostra un messaggio di informazione all'utente if hasattr(self.main_frame, "show_info"): self.main_frame.show_info("Operation Successful", message) # Segnala la necessità di un refresh completo della GUI trigger_refreshes = True sync_refresh = True elif status == "error": log_handler.log_error( f"Revert to tag '{tag_name_ctx}' failed: {message}", func_name=func_name ) # Mostra un messaggio di errore all'utente if hasattr(self.main_frame, "show_error"): self.main_frame.show_error("Revert to Tag Error", f"Failed:\n{message}") # Non è necessario chiamare _reenable_widgets_after_modal() qui, # perché o il refresh lo farà, o il gestore generico lo farà se non ci sono refresh. return trigger_refreshes, sync_refresh # --- End of AsyncResultHandler Class ---