SXXXXXXX_GitUtility/async_result_handler.py
2025-04-28 08:22:56 +02:00

1518 lines
76 KiB
Python

# --- FILE: async_result_handler.py ---
import tkinter as tk
import queue
import os
from typing import TYPE_CHECKING
from typing import Dict
from typing import Any
from typing import List
from typing import Tuple
from typing import Callable
from typing import Optional # Use Optional for type hints
# Import application modules needed for handling results
import log_handler
from git_commands import GitCommandError
import async_workers # Needed for scheduling interactive auth task
# Import GUI elements that might be updated or interacted with
# Import only high-level GUI classes if needed, avoid specific windows if possible
# from gui import MainFrame # Example if MainFrame methods are needed directly
# Import specific windows ONLY if they are instantiated here
# from gui import DiffSummaryWindow # Example - Removed to break import cycle
from config_manager import DEFAULT_REMOTE_NAME # For clone_remote result
# --- Forward Reference for Type Hinting ---
# Avoids circular imports while enabling type checking for the app instance.
if TYPE_CHECKING:
# Import the main app class only for type hinting
from GitUtility import GitSvnSyncApp
# Import MainFrame for type hinting if needed directly (e.g., self.main_frame)
from gui import MainFrame
class AsyncResultHandler:
"""
Handles processing the results from asynchronous worker functions.
Updates the GUI based on the outcome of background tasks and triggers
necessary follow-up actions like GUI refreshes.
Separates result processing logic from the main application controller.
"""
def __init__(self, app: 'GitSvnSyncApp'):
    """
    Bind the result handler to the running application.

    Args:
        app (GitSvnSyncApp): The main application instance. Its
            ``main_frame`` attribute is read here and cached on the
            handler as ``self.main_frame``.

    Raises:
        ValueError: If ``app`` is None.
    """
    if app is None:
        # Fail fast: every handler below needs the application object.
        raise ValueError("AsyncResultHandler requires a valid 'app' instance.")
    # Keep the controller and a shortcut to its main GUI frame.
    # NOTE: assumes app.main_frame already exists at construction time.
    self.app: 'GitSvnSyncApp' = app
    self.main_frame: 'MainFrame' = self.app.main_frame
    log_handler.log_debug("AsyncResultHandler initialized.", func_name="__init__")
def process(self, result_data: Dict[str, Any], context: Dict[str, Any]):
    """
    Route a finished worker result to its context-specific handler.

    The task name is read from ``context['context']`` (default 'unknown')
    and looked up in a dispatch table. A registered handler is expected to
    return a 2-tuple ``(trigger_refreshes, sync_refresh)``; any other return
    value is logged as a warning and treated as "no refresh". Contexts with
    no registered handler go to ``_handle_generic_result`` and never trigger
    refreshes.

    Args:
        result_data (Dict[str, Any]): The dictionary returned by the worker.
            Its 'status' entry (default 'error') is only used for logging here.
        context (Dict[str, Any]): The context dictionary passed when the
            async operation was started.
    """
    task_context: str = context.get('context', 'unknown')
    status: str = result_data.get('status', 'error')
    func_name: str = f"process (ctx: {task_context})"
    log_handler.log_debug(
        f"Processing result for context '{task_context}' with status '{status}'.",
        func_name=func_name
    )

    # Context string -> bound handler method of this class.
    dispatch: Dict[str, Callable] = {
        'compare_branches': self._handle_compare_branches_result,
        'refresh_tags': self._handle_refresh_tags_result,
        'refresh_branches': self._handle_refresh_local_branches_result,
        'refresh_history': self._handle_refresh_history_result,
        'refresh_changes': self._handle_refresh_changes_result,
        'refresh_remote_branches': self._handle_refresh_remote_branches_result,
        'get_ahead_behind': self._handle_get_ahead_behind_result,
        'prepare_repo': self._handle_prepare_repo_result,
        'create_bundle': self._handle_create_bundle_result,
        'fetch_bundle': self._handle_fetch_bundle_result,
        'manual_backup': self._handle_manual_backup_result,
        'commit': self._handle_commit_result,
        # The untrack worker reports under the gitignore-save context name.
        '_handle_gitignore_save': self._handle_untrack_result,
        'add_file': self._handle_add_file_result,
        'create_tag': self._handle_create_tag_result,
        'checkout_tag': self._handle_checkout_tag_result,
        'create_branch': self._handle_create_branch_result,
        'checkout_branch': self._handle_checkout_branch_result,
        'delete_local_branch': self._handle_delete_local_branch_result,
        'merge_local_branch': self._handle_merge_local_branch_result,
        'check_connection': self._handle_check_connection_result,
        'interactive_auth': self._handle_interactive_auth_result,
        'apply_remote_config': self._handle_apply_remote_config_result,
        'fetch_remote': self._handle_fetch_remote_result,
        'pull_remote': self._handle_pull_remote_result,
        'push_remote': self._handle_push_remote_result,
        'push_tags_remote': self._handle_push_tags_remote_result,
        'clone_remote': self._handle_clone_remote_result,
        'checkout_tracking_branch': self._handle_checkout_tracking_branch_result,
        'get_commit_details': self._handle_get_commit_details_result,
    }

    # Both flags stay False unless a handler explicitly asks for refreshes.
    run_refreshes: bool = False
    refresh_sync_status: bool = False

    handler: Optional[Callable] = dispatch.get(task_context)
    if not (handler and callable(handler)):
        # Unknown context: generic fallback, never followed by refreshes.
        log_handler.log_warning(
            f"No specific handler method found for context: '{task_context}'. Performing generic handling.",
            func_name=func_name
        )
        self._handle_generic_result(result_data, context)
    else:
        flags: Any = handler(result_data, context)
        if isinstance(flags, tuple) and len(flags) == 2:
            run_refreshes, refresh_sync_status = flags
        else:
            # Handler broke the (bool, bool) contract; keep the defaults.
            log_handler.log_warning(
                f"Handler for '{task_context}' did not return expected refresh flags tuple.",
                func_name=func_name
            )

    # Refreshes are requested only after the specific result was handled.
    if run_refreshes:
        self._trigger_post_action_refreshes(refresh_sync_status)
# --- Handler Methods for Refresh Tasks ---
def _handle_refresh_tags_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Push the outcome of the 'refresh_tags' task into the tag list widget.

    On 'success' the 'result' entry is shown if it is a list (otherwise an
    empty list is shown); on 'error' a single "(Error)" placeholder row is
    shown. Always returns (False, False): a refresh never requests further
    refreshes.
    """
    func_name: str = "_handle_refresh_tags_result"
    no_refresh: Tuple[bool, bool] = (False, False)
    status: Optional[str] = result_data.get('status')
    payload: Any = result_data.get('result')

    # Without the GUI update method there is nothing useful to do.
    if not hasattr(self.main_frame, "update_tag_list"):
        log_handler.log_error(
            "Cannot update tags: MainFrame missing 'update_tag_list'.",
            func_name=func_name
        )
        return no_refresh

    if status == 'error':
        log_handler.log_error(
            "Error reported during tag refresh.", func_name=func_name
        )
        # Placeholder row so the list visibly reflects the failure.
        self.main_frame.update_tag_list([("(Error)", "")])
        return no_refresh

    if status == 'success':
        # Anything that is not a list is treated as "no tags".
        tags_list: list = []
        if isinstance(payload, list):
            tags_list = payload
        self.main_frame.update_tag_list(tags_list)
        log_handler.log_debug(
            f"Tag list updated in GUI with {len(tags_list)} items.",
            func_name=func_name
        )
    return no_refresh
def _handle_refresh_local_branches_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Apply the outcome of the 'refresh_branches' (local) task to the GUI.

    On 'success' the 'result' entry is expected to be a 2-tuple
    ``(branches, current_branch)``. The branch list and the history branch
    filter are updated, ``self.app.current_local_branch`` is kept in sync,
    and ``self.app.refresh_remote_status`` is scheduled 100 ms later when
    the app provides it. Always returns (False, False).
    """
    func_name: str = "_handle_refresh_local_branches_result"
    status: Optional[str] = result_data.get('status')
    payload: Any = result_data.get('result')
    frame = self.main_frame

    # Both GUI update methods are required; bail out if either is missing.
    if not (hasattr(frame, "update_branch_list") and hasattr(frame, "update_history_branch_filter")):
        log_handler.log_error(
            "Cannot update local branches: MainFrame missing update methods.",
            func_name=func_name
        )
        return False, False

    # Shown as-is when the status is neither 'success' nor 'error'.
    branches: List[str] = []
    current_branch: Optional[str] = None

    if status == 'success':
        if isinstance(payload, tuple) and len(payload) == 2:
            branches, current_branch = payload
            # Mirror the current branch on the app controller.
            self.app.current_local_branch = current_branch
            log_handler.log_debug(
                f"Local branches updated in GUI. Current: {current_branch}",
                func_name=func_name
            )
        else:
            log_handler.log_warning(
                "Received success status for local branches refresh, but result format is invalid.",
                func_name=func_name
            )
            branches = ["(Invalid Data)"]
            self.app.current_local_branch = None
    elif status == 'error':
        log_handler.log_error(
            "Error reported during local branches refresh.", func_name=func_name
        )
        branches = ["(Error)"]
        self.app.current_local_branch = None

    frame.update_branch_list(branches, current_branch)
    frame.update_history_branch_filter(branches)

    # Sync status depends on the branch info, so it is refreshed afterwards.
    remote_status_refresher = getattr(self.app, "refresh_remote_status", None)
    if callable(remote_status_refresher):
        log_handler.log_debug(
            "Scheduling remote status refresh after local branches update.",
            func_name=func_name
        )
        # Small delay lets the GUI process the list updates first.
        self.app.master.after(100, remote_status_refresher)
    else:
        log_handler.log_warning(
            "Could not schedule sync status refresh: method missing on app.",
            func_name=func_name
        )
    return False, False
def _handle_refresh_history_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Push the outcome of the 'refresh_history' task into the history view.

    On 'success' the 'result' entry is displayed if it is a list (otherwise
    an empty list); on 'error' a single error line is displayed. Always
    returns (False, False).
    """
    func_name: str = "_handle_refresh_history_result"
    no_refresh: Tuple[bool, bool] = (False, False)
    status: Optional[str] = result_data.get('status')
    payload: Any = result_data.get('result')

    if not hasattr(self.main_frame, "update_history_display"):
        log_handler.log_error(
            "Cannot update history: MainFrame missing 'update_history_display'.",
            func_name=func_name
        )
        return no_refresh

    if status == 'error':
        log_handler.log_error(
            "Error reported during history refresh.", func_name=func_name
        )
        self.main_frame.update_history_display(["(Error retrieving history)"])
        return no_refresh

    if status == 'success':
        # Non-list payloads are treated as an empty history.
        log_lines: list = []
        if isinstance(payload, list):
            log_lines = payload
        self.main_frame.update_history_display(log_lines)
        log_handler.log_debug(
            f"History display updated in GUI with {len(log_lines)} lines.",
            func_name=func_name
        )
    return no_refresh
def _handle_refresh_changes_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Push the outcome of the 'refresh_changes' task into the changed-files list.

    On 'success' the 'result' entry is displayed if it is a list (otherwise
    an empty list); on 'error' a single error line is displayed. Always
    returns (False, False).
    """
    func_name: str = "_handle_refresh_changes_result"
    no_refresh: Tuple[bool, bool] = (False, False)
    status: Optional[str] = result_data.get('status')
    payload: Any = result_data.get('result')

    if not hasattr(self.main_frame, "update_changed_files_list"):
        log_handler.log_error(
            "Cannot update changes: MainFrame missing 'update_changed_files_list'.",
            func_name=func_name
        )
        return no_refresh

    if status == 'error':
        log_handler.log_error(
            "Error reported during changes refresh.", func_name=func_name
        )
        self.main_frame.update_changed_files_list(["(Error refreshing changes)"])
        return no_refresh

    if status == 'success':
        # Non-list payloads are treated as "no changed files".
        files_list: list = []
        if isinstance(payload, list):
            files_list = payload
        self.main_frame.update_changed_files_list(files_list)
        log_handler.log_debug(
            f"Changed files list updated in GUI with {len(files_list)} items.",
            func_name=func_name
        )
    return no_refresh
def _handle_refresh_remote_branches_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Push the outcome of the 'refresh_remote_branches' task into the GUI.

    On 'success' the 'result' entry is displayed if it is a list, otherwise
    an "(Invalid Data)" placeholder. On 'error' an "(Error)" placeholder is
    displayed and, when available, the ahead/behind indicator is set to
    "Sync Status: Unknown". Always returns (False, False).
    """
    func_name: str = "_handle_refresh_remote_branches_result"
    no_refresh: Tuple[bool, bool] = (False, False)
    status: Optional[str] = result_data.get('status')
    payload: Any = result_data.get('result')

    if not hasattr(self.main_frame, "update_remote_branches_list"):
        log_handler.log_error(
            "Cannot update remote branches: MainFrame missing 'update_remote_branches_list'.",
            func_name=func_name
        )
        return no_refresh

    if status == 'error':
        log_handler.log_error(
            "Error reported during remote branches refresh.", func_name=func_name
        )
        self.main_frame.update_remote_branches_list(["(Error)"])
        # Sync status cannot be trusted when the remote list failed.
        if hasattr(self.main_frame, "update_ahead_behind_status"):
            self.main_frame.update_ahead_behind_status(status_text="Sync Status: Unknown")
        return no_refresh

    if status == 'success':
        branch_list: list = ["(Invalid Data)"]
        if isinstance(payload, list):
            branch_list = payload
        self.main_frame.update_remote_branches_list(branch_list)
        log_handler.log_debug(
            f"Remote branches list updated in GUI with {len(branch_list)} items.",
            func_name=func_name
        )
    return no_refresh
def _handle_get_ahead_behind_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Show the outcome of the 'get_ahead_behind' task in the sync indicator.

    Args:
        result_data (Dict[str, Any]): Worker result. On 'success' its
            'result' entry is expected to be a 2-tuple ``(ahead, behind)``;
            anything else is displayed as ``(None, None)``.
        context (Dict[str, Any]): Task context; 'local_branch' names the
            branch the counts belong to.

    Returns:
        Tuple[bool, bool]: Always (False, False); this task never requests
        further refreshes.
    """
    func_name: str = "_handle_get_ahead_behind_result"
    status: Optional[str] = result_data.get('status')
    result_value: Any = result_data.get('result')
    message: Optional[str] = result_data.get('message')
    local_branch_ctx: Optional[str] = context.get("local_branch")

    if not hasattr(self.main_frame, "update_ahead_behind_status"):
        log_handler.log_error(
            "Cannot update sync status: MainFrame missing 'update_ahead_behind_status'.",
            func_name=func_name
        )
        return False, False

    if status == 'success':
        # Require exactly two items: unpacking a tuple of any other length
        # would raise ValueError and abort result processing.
        if isinstance(result_value, tuple) and len(result_value) == 2:
            ahead, behind = result_value
        else:
            if result_value is not None:
                log_handler.log_warning(
                    f"Unexpected ahead/behind result format: {type(result_value)}",
                    func_name=func_name
                )
            ahead, behind = None, None
        log_handler.log_info(
            f"Ahead/Behind status updated for '{local_branch_ctx}': Ahead={ahead}, Behind={behind}",
            func_name=func_name
        )
        self.main_frame.update_ahead_behind_status(
            current_branch=local_branch_ctx, ahead=ahead, behind=behind
        )
    elif status == 'error':
        log_handler.log_error(
            f"Failed to get ahead/behind status for '{local_branch_ctx}': {message}",
            func_name=func_name
        )
        # Append the worker's message only when there is one.
        error_text: str = f"Sync Status: Error ({message})" if message else "Sync Status: Error"
        self.main_frame.update_ahead_behind_status(
            current_branch=local_branch_ctx, status_text=error_text
        )
    return False, False
# --- Handler Methods for Local Git Actions ---
def _handle_prepare_repo_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'prepare_repo' task.

    'success' and 'warning' both request a full refresh including sync
    status; 'warning' additionally shows the message in an info popup.
    'error' shows an error popup and requests nothing. Regardless of status,
    an update of the SVN status indicator is scheduled 50 ms later when the
    app and main frame expose the needed attributes.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_prepare_repo_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    flags: Tuple[bool, bool] = (False, False)

    if status == 'success':
        log_handler.log_info(
            "Repository preparation successful.", func_name=func_name
        )
        flags = (True, True)
    elif status == 'warning':
        # e.g. "already prepared": informational, but GUI state is refreshed anyway.
        log_handler.log_warning(
            f"Prepare repo warning: {message}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_info"):
            self.main_frame.show_info("Prepare Info", message)
        flags = (True, True)
    elif status == 'error':
        log_handler.log_error(
            f"Repository preparation failed: {message}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Prepare Error", f"Failed to prepare repository:\n{message}"
            )

    indicator_available: bool = (
        hasattr(self.app, 'update_svn_status_indicator')
        and hasattr(self.app.main_frame, 'svn_path_entry')
    )
    if indicator_available:
        def _update_indicator():
            # The path entry is read when the callback fires, not when scheduled.
            return self.app.update_svn_status_indicator(self.app.main_frame.svn_path_entry.get())

        self.app.master.after(50, _update_indicator)
    return flags
def _handle_create_bundle_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'create_bundle' task.

    Refreshes (including sync status) are requested only when the task
    succeeded and its 'committed' flag is truthy. On 'error' an error popup
    is shown.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_create_bundle_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    committed: bool = result_data.get('committed', False)

    if status == 'error':
        log_handler.log_error(
            f"Create bundle failed: {message}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Create Bundle Error", f"Failed:\n{message}")
        return False, False

    if status != 'success':
        return False, False

    log_handler.log_info(
        f"Create bundle finished: {message}", func_name=func_name
    )
    # Only a commit made as part of bundling changes what the GUI shows.
    if committed:
        return True, True
    return False, False
def _handle_fetch_bundle_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'fetch_bundle' task (fetch/clone from bundle).

    'success' requests a full refresh including sync status. An 'error'
    flagged as a conflict (with a 'repo_path') shows a merge-conflict popup,
    requests a refresh without sync status, and re-enables widgets; any
    other 'error' shows a generic error popup. Regardless of status, an
    update of the SVN status indicator is scheduled 50 ms later when the
    needed attributes exist.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_fetch_bundle_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    is_conflict: bool = result_data.get('conflict', False)
    repo_path_conflict: Optional[str] = result_data.get('repo_path')
    flags: Tuple[bool, bool] = (False, False)
    can_show_error: bool = hasattr(self.main_frame, "show_error")

    if status == 'success':
        log_handler.log_info(
            f"Fetch from bundle successful: {message}", func_name=func_name
        )
        flags = (True, True)
    elif status == 'error':
        log_handler.log_error(
            f"Fetch from bundle failed: {message}", func_name=func_name
        )
        if not (is_conflict and repo_path_conflict):
            # Ordinary failure: popup only, nothing to refresh.
            if can_show_error:
                self.main_frame.show_error("Fetch Bundle Error", f"Failed:\n{message}")
        else:
            log_handler.log_error(
                "Merge conflict detected during fetch from bundle.",
                func_name=func_name
            )
            if can_show_error:
                self.main_frame.show_error(
                    "Merge Conflict",
                    f"Conflict occurred during bundle fetch.\nResolve in:\n{repo_path_conflict}\nThen commit."
                )
            # Refresh so the conflicted files show up; sync status is not
            # meaningful during a conflict.
            flags = (True, False)
            self._reenable_widgets_after_modal()

    indicator_available: bool = (
        hasattr(self.app, 'update_svn_status_indicator')
        and hasattr(self.app.main_frame, 'svn_path_entry')
    )
    if indicator_available:
        def _update_indicator():
            # The path entry is read when the callback fires, not when scheduled.
            return self.app.update_svn_status_indicator(self.app.main_frame.svn_path_entry.get())

        self.app.master.after(50, _update_indicator)
    return flags
def _handle_manual_backup_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'manual_backup' task.

    'success' is only logged (no popup); 'error' is logged and shown in an
    error popup. Always returns (False, False): no refresh is requested.
    """
    func_name: str = "_handle_manual_backup_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')

    if status == 'error':
        log_handler.log_error(
            f"Manual backup failed: {message}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Manual Backup Error", f"Failed:\n{message}")
    elif status == 'success':
        # Success is intentionally silent in the GUI; it is only logged.
        log_handler.log_info(
            f"Manual backup finished: {message}", func_name=func_name
        )
    return False, False
def _handle_commit_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'commit' task.

    When the task succeeded and its 'committed' flag is truthy, the commit
    message field is cleared (if the frame supports it) and a full refresh
    including sync status is requested. On 'error' an error popup is shown.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_commit_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    committed: bool = result_data.get('committed', False)

    if status == 'error':
        log_handler.log_error(f"Commit failed: {message}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Commit Error", f"Failed:\n{message}")
        return False, False

    if status != 'success':
        return False, False

    log_handler.log_info(
        f"Commit operation finished: {message}", func_name=func_name
    )
    if not committed:
        # Success without an actual commit leaves the GUI untouched.
        return False, False

    if hasattr(self.main_frame, "clear_commit_message"):
        self.main_frame.clear_commit_message()
    return True, True
def _handle_untrack_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the untrack operation (context '_handle_gitignore_save').

    A full refresh including sync status is requested only when the task
    succeeded and its 'committed' flag is truthy. On 'error' an error popup
    is shown.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_untrack_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    committed: bool = result_data.get('committed', False)

    if status == 'error':
        log_handler.log_error(f"Untracking failed: {message}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Untrack Error", f"Failed:\n{message}")
        return False, False

    if status != 'success':
        return False, False

    log_handler.log_info(
        f"Untrack operation finished: {message}", func_name=func_name
    )
    # Lists only change if untracking actually produced a commit.
    if committed:
        return True, True
    return False, False
def _handle_get_commit_details_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'get_commit_details' task.

    On 'success' with a dict result, the dict is handed to
    ``self.app.show_commit_details``. If that method is missing, or the
    result is not a dict, an error popup is shown and widgets are re-enabled
    here. On 'error' only an error popup is shown. Always returns
    (False, False).
    """
    func_name: str = "_handle_get_commit_details_result"
    no_refresh: Tuple[bool, bool] = (False, False)
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    # 'result' carries the commit-details dictionary produced by the worker.
    commit_details: Optional[Dict[str, Any]] = result_data.get('result')
    can_show_error: bool = hasattr(self.main_frame, "show_error")

    if status == 'error':
        # e.g. an invalid commit hash.
        log_handler.log_error(f"Failed to get commit details: {message}", func_name=func_name)
        if can_show_error:
            self.main_frame.show_error("Commit Details Error", f"Could not get details:\n{message}")
        # NOTE(review): the original comment says the caller already
        # re-enables widgets for status 'error' - confirm against the caller.
        return no_refresh

    if status != 'success':
        return no_refresh

    log_handler.log_info(
        "Commit details retrieved successfully. Requesting detail window display...",
        func_name=func_name
    )

    if not isinstance(commit_details, dict):
        # Success status, but the payload is not in the expected format.
        log_handler.log_error(
            f"Received success status for commit details, but result is not a dict: {type(commit_details)}",
            func_name=func_name
        )
        if can_show_error:
            self.main_frame.show_error("Data Error", "Failed to process commit details.")
        self._reenable_widgets_after_modal()
        return no_refresh

    if hasattr(self.app, "show_commit_details") and callable(self.app.show_commit_details):
        # Hand the full details dict to the app.
        # NOTE(review): per the original comment, show_commit_details is
        # responsible for re-enabling widgets after its modal window closes.
        self.app.show_commit_details(commit_details)
        return no_refresh

    log_handler.log_error(
        "Cannot display commit details: 'show_commit_details' method missing on app.",
        func_name=func_name
    )
    if can_show_error:
        self.main_frame.show_error(
            "Internal Error", "Cannot display commit details window."
        )
    # Internal error: re-enable widgets right away.
    self._reenable_widgets_after_modal()
    return no_refresh
def _handle_add_file_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'add_file' task.

    'success' requests a refresh without sync status; 'error' shows an
    error popup and requests nothing.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); the second
        item is always False.
    """
    func_name: str = "_handle_add_file_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')

    if status == 'error':
        log_handler.log_error(f"Add file failed: {message}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Add File Error", f"Failed:\n{message}")
        return False, False

    if status == 'success':
        log_handler.log_info(
            f"Add file successful: {message}", func_name=func_name
        )
        # Staging a file changes the changes list but not the sync status.
        return True, False
    return False, False
def _handle_create_tag_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'create_tag' task.

    'success' requests a refresh without sync status; 'error' shows an
    error popup and requests nothing.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); the second
        item is always False.
    """
    func_name: str = "_handle_create_tag_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')

    if status == 'error':
        log_handler.log_error(f"Create tag failed: {message}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Create Tag Error", f"Failed:\n{message}")
        return False, False

    if status == 'success':
        log_handler.log_info(
            f"Tag creation successful: {message}", func_name=func_name
        )
        # A new local tag affects tag/history views but not the sync status.
        return True, False
    return False, False
def _handle_checkout_tag_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'checkout_tag' task.

    'success' requests a full refresh including sync status. On 'error' a
    popup is shown (a warning when the reported exception mentions
    "Uncommitted changes", an error otherwise) and widgets are re-enabled
    immediately.

    Args:
        result_data (Dict[str, Any]): Worker result; reads 'status',
            'message' and 'exception'.
        context (Dict[str, Any]): Task context (not read here).

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_checkout_tag_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    exception: Optional[Exception] = result_data.get('exception')
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    if status == 'success':
        log_handler.log_info(
            f"Checkout tag successful: {message}", func_name=func_name
        )
        trigger_refreshes = True
        sync_refresh = True
    elif status == 'error':
        log_handler.log_error(
            f"Checkout tag failed: {message}", func_name=func_name
        )
        blocked_by_changes = bool(exception) and "Uncommitted changes" in str(exception)
        # Guard each popup by the method actually called; previously
        # show_warning was invoked behind a check for show_error.
        if blocked_by_changes and hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning(
                "Checkout Blocked", f"{message}\nCommit or stash first."
            )
        elif hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Checkout Tag Error", f"Failed:\n{message}")
        # No refresh follows a failed checkout, so re-enable widgets here.
        self._reenable_widgets_after_modal()
    return trigger_refreshes, sync_refresh
def _handle_create_branch_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'create_branch' task.

    On 'success' the user is asked (when the new branch name is in the
    context and the frame offers ``ask_yes_no``) whether to switch to the
    new branch. If they accept and the app offers ``checkout_branch``, that
    method is called and no refresh is requested here; in every other
    success case a refresh without sync status is requested. On 'error' an
    error popup is shown.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); the second
        item is always False.
    """
    func_name: str = "_handle_create_branch_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    new_branch_context: Optional[str] = context.get('new_branch_name')

    if status == 'error':
        log_handler.log_error(f"Create branch failed: {message}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Create Branch Error", f"Failed:\n{message}")
        return False, False

    if status != 'success':
        return False, False

    log_handler.log_info(
        f"Create branch successful: {message}", func_name=func_name
    )

    # Without a branch name or a yes/no dialog, just refresh.
    if not (new_branch_context and hasattr(self.main_frame, "ask_yes_no")):
        return True, False

    wants_checkout = self.main_frame.ask_yes_no(
        "Checkout?", f"Switch to new branch '{new_branch_context}'?"
    )
    if not wants_checkout:
        # User stays on the current branch; refresh the lists.
        return True, False

    if hasattr(self.app, 'checkout_branch') and callable(self.app.checkout_branch):
        # checkout_branch is invoked with only the branch name; this handler
        # skips its own refresh and leaves it to the checkout result handling.
        self.app.checkout_branch(branch_to_checkout=new_branch_context)
        return False, False

    log_handler.log_error(
        "Cannot checkout new branch: checkout method missing on app.",
        func_name=func_name
    )
    return True, False
def _handle_checkout_branch_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'checkout_branch' (existing branch) task.

    'success' requests a full refresh including sync status. On 'error' a
    popup is shown (a warning when the reported exception mentions
    "Uncommitted changes", an error otherwise) and widgets are re-enabled
    immediately.

    Args:
        result_data (Dict[str, Any]): Worker result; reads 'status',
            'message' and 'exception'.
        context (Dict[str, Any]): Task context (not read here).

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_checkout_branch_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    exception: Optional[Exception] = result_data.get('exception')
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    if status == 'success':
        log_handler.log_info(
            f"Checkout branch successful: {message}", func_name=func_name
        )
        trigger_refreshes = True
        sync_refresh = True
    elif status == 'error':
        log_handler.log_error(
            f"Checkout branch failed: {message}", func_name=func_name
        )
        blocked_by_changes = bool(exception) and "Uncommitted changes" in str(exception)
        # Guard each popup by the method actually called; previously
        # show_warning was invoked behind a check for show_error.
        if blocked_by_changes and hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning(
                "Checkout Blocked", f"{message}\nCommit or stash first."
            )
        elif hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Checkout Branch Error", f"Failed:\n{message}")
        # No refresh follows a failed checkout, so re-enable widgets here.
        self._reenable_widgets_after_modal()
    return trigger_refreshes, sync_refresh
def _handle_delete_local_branch_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handle the outcome of the 'delete_local_branch' task.

    'success' requests a refresh without sync status. On 'error' a popup is
    shown: a warning when the message (or the stderr of a GitCommandError)
    contains "not fully merged", an error otherwise.

    Args:
        result_data (Dict[str, Any]): Worker result; reads 'status',
            'message' and 'exception'.
        context (Dict[str, Any]): Task context; 'branch_name' is used in
            log messages.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); the second
        item is always False.
    """
    func_name: str = "_handle_delete_local_branch_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    exception: Optional[Exception] = result_data.get('exception')
    deleted_branch_name: Optional[str] = context.get("branch_name")
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    if status == 'success':
        log_handler.log_info(
            f"Local branch '{deleted_branch_name}' deleted successfully.",
            func_name=func_name
        )
        trigger_refreshes = True
    elif status == 'error':
        log_handler.log_error(
            f"Delete local branch '{deleted_branch_name}' failed: {message}",
            func_name=func_name
        )
        # Normalise to strings first: 'message' and 'stderr' may be None,
        # and a substring test against None raises TypeError.
        message_text: str = message or ""
        stderr_text: str = ""
        if isinstance(exception, GitCommandError):
            stderr_text = str(getattr(exception, 'stderr', '') or '')
        not_merged: bool = (
            "not fully merged" in message_text or "not fully merged" in stderr_text
        )
        # Guard each popup by the method actually called.
        if not_merged and hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning("Delete Warning", f"{message}")
        elif hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Delete Error", f"{message}")
    return trigger_refreshes, sync_refresh
def _handle_merge_local_branch_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handles the result of the 'merge_local_branch' async task.

    Args:
        result_data: Worker result; 'status' and 'message' are read.
        context: Task context; 'branch_merged_into', 'branch_merged_from' and
            'repo_path' are read to build the log and popup texts.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh). (True, True) on
        success, (True, False) on conflict, (False, False) otherwise.
    """
    func_name: str = "_handle_merge_local_branch_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    # Get branch names and path from context for messages/popups
    merged_into: Optional[str] = context.get("branch_merged_into")
    merged_from: Optional[str] = context.get("branch_merged_from")
    repo_path_conflict: Optional[str] = context.get("repo_path")
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    # Guard popups on the method that is actually called (as the other
    # handlers do) instead of on the mere presence of 'main_frame'.
    main_frame = getattr(self, "main_frame", None)
    can_show_error: bool = hasattr(main_frame, "show_error")

    if status == 'success':
        log_handler.log_info(
            f"Successfully merged '{merged_from}' into '{merged_into}'.",
            func_name=func_name
        )
        trigger_refreshes = True  # Refresh history, changes
        sync_refresh = True  # Merge changes sync status
    elif status == 'conflict':
        log_handler.log_error(
            f"Merge conflict occurred merging '{merged_from}'. User needs to resolve manually.",
            func_name=func_name
        )
        if can_show_error:
            # Show detailed conflict message
            main_frame.show_error(
                "Merge Conflict",
                f"Merge conflict occurred while merging '{merged_from}' into '{merged_into}'.\n\n"
                f"Please resolve the conflicts manually in:\n{repo_path_conflict}\n\n"
                f"After resolving, stage the changes and commit them."
            )
        trigger_refreshes = True  # Refresh changes list
        sync_refresh = False  # Sync status invalid during conflict
        # Re-enable widgets after showing conflict popup
        self._reenable_widgets_after_modal()
    elif status == 'error':
        log_handler.log_error(f"Merge local branch failed: {message}", func_name=func_name)
        if can_show_error:
            main_frame.show_error("Merge Error", f"{message}")
        # Re-enable widgets immediately on merge error
        self._reenable_widgets_after_modal()
    return trigger_refreshes, sync_refresh
# --- Handlers for Authentication and Remote Config ---
def _handle_check_connection_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handles the result of the 'check_connection' async task.

    On 'auth_required' the user is asked whether to attempt interactive
    authentication; if accepted, the interactive worker is started through
    the app's '_start_async_operation'.

    Args:
        result_data: Worker result; 'status', 'message' and 'result' are read.
        context: Task context; 'remote_name_checked' and 'repo_path_checked'
            are read. The whole dict is forwarded as 'original_context' to
            the interactive authentication task.

    Returns:
        Tuple[bool, bool]: (False, sync_refresh). The second flag is True
        only when the connection check succeeded.
    """
    func_name: str = "_handle_check_connection_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    result_value: Optional[str] = result_data.get('result')  # e.g., 'connected', 'auth_needed', etc.
    remote_name: str = context.get("remote_name_checked", "unknown remote")
    repo_path_checked: Optional[str] = context.get("repo_path_checked")
    sync_refresh: bool = False  # Flag to trigger sync status check later

    if status == 'success':
        auth_status_to_set: str = 'ok'
        log_handler.log_info(
            f"Connection check successful for '{remote_name}'.", func_name=func_name
        )
        # Update internal state and GUI indicator
        self.app._update_gui_auth_status(auth_status_to_set)
        sync_refresh = True  # Sync status can be checked now
    elif status == 'auth_required':
        log_handler.log_warning(
            f"Authentication required for remote '{remote_name}'.", func_name=func_name
        )
        self.app._update_gui_auth_status('required')
        # Ask user if they want to attempt interactive authentication.
        # Short-circuits: the dialog is only shown when a repo path is known
        # and the frame can actually ask.
        user_accepted: bool = bool(
            repo_path_checked
            and hasattr(self.main_frame, "ask_yes_no")
            and self.main_frame.ask_yes_no(
                "Authentication Required",
                f"Authentication is required to connect to remote '{remote_name}'.\n\n"
                f"Do you want to attempt authentication now?\n"
                f"(This may open a separate terminal window for credential input.)"
            )
        )
        if user_accepted:
            log_handler.log_info(
                "User requested interactive authentication attempt.", func_name=func_name
            )
            # Prepare args for the interactive worker
            args_interactive: tuple = (self.app.git_commands, repo_path_checked, remote_name)
            # Use the app's method to start the async operation
            if hasattr(self.app, '_start_async_operation'):
                self.app._start_async_operation(
                    worker_func=async_workers.run_interactive_auth_attempt_async,
                    args_tuple=args_interactive,
                    context_dict={
                        "context": "interactive_auth",
                        "status_msg": f"Attempting interactive auth for '{remote_name}'",
                        "original_context": context  # Pass original context for reference
                    }
                )
            else:
                log_handler.log_error(
                    "Cannot start interactive auth: _start_async_operation missing on app.",
                    func_name=func_name
                )
                # Re-enable widgets if we cannot start the next task
                self._reenable_widgets_after_modal()
        else:  # User cancelled or repo_path missing
            log_handler.log_info(
                "User declined interactive authentication attempt or required data missing.",
                func_name=func_name
            )
            # Re-enable widgets if user declines
            self._reenable_widgets_after_modal()
    elif status == 'error':
        # Record the failure in the log, as every other handler does.
        log_handler.log_error(
            f"Connection check failed for '{remote_name}': {message}", func_name=func_name
        )
        # Determine the specific error type from the 'result' field
        known_error_types = ('connection_failed', 'unknown_error', 'worker_exception')
        error_type: str = result_value if result_value in known_error_types else 'unknown_error'
        # Update internal state and GUI indicator
        self.app._update_gui_auth_status(error_type)
        # Update sync status label
        if hasattr(self.main_frame, "update_ahead_behind_status"):
            self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error")
        # Show error popup
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Connection Error", f"{message}")
    # Check connection itself doesn't trigger standard refreshes, only sync status if OK
    return False, sync_refresh
def _handle_interactive_auth_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handles the result of the 'interactive_auth' async task.

    A successful attempt schedules a new connection check; a failed one
    updates the auth/sync indicators and warns the user. Any other outcome
    is logged and the widgets are re-enabled so the UI is not left disabled.

    Args:
        result_data: Worker result; 'status', 'message' and 'result' are read.
        context: Task context; 'original_context' (the context of the
            connection check that triggered this task) is read to recover
            'remote_name_checked'.

    Returns:
        Tuple[bool, bool]: always (False, False).
    """
    func_name: str = "_handle_interactive_auth_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    result_value: Optional[str] = result_data.get('result')  # e.g., 'auth_attempt_success'
    # Tolerate a missing or explicit-None original context.
    original_context: dict = context.get("original_context") or {}
    remote_name: str = original_context.get("remote_name_checked", "unknown remote")

    if status == 'success' and result_value == 'auth_attempt_success':
        log_handler.log_info(
            f"Interactive auth attempt for '{remote_name}' successful. Re-checking connection...",
            func_name=func_name
        )
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar(
                "Authentication successful. Checking status..."
            )
        # --- Trigger a new connection check ---
        if hasattr(self.app, 'check_connection_auth') and callable(self.app.check_connection_auth):
            # Schedule the check after a short delay
            self.app.master.after(100, self.app.check_connection_auth)
        else:
            log_handler.log_error(
                "Cannot re-check connection: check_connection_auth missing on app.",
                func_name=func_name
            )
            # Re-enable widgets if we cannot re-check
            self._reenable_widgets_after_modal()
    elif status == 'error':
        log_handler.log_warning(
            f"Interactive auth attempt for '{remote_name}' failed or error occurred: {message}",
            func_name=func_name
        )
        # Update internal state and GUI
        self.app._update_gui_auth_status('failed')
        if hasattr(self.main_frame, "update_ahead_behind_status"):
            self.main_frame.update_ahead_behind_status(status_text="Sync Status: Auth Failed")
        # Show warning popup
        if hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning("Authentication Attempt Failed", f"{message}")
        # Re-enable widgets after interactive attempt fails
        self._reenable_widgets_after_modal()
    else:
        # Neither a confirmed success nor a reported error: no follow-up
        # task will run, so log it and re-enable the widgets here.
        log_handler.log_warning(
            f"Unexpected interactive auth outcome for '{remote_name}' "
            f"(status={status!r}, result={result_value!r}).",
            func_name=func_name
        )
        self._reenable_widgets_after_modal()
    # This action itself doesn't trigger standard GUI refreshes
    return False, False
def _handle_apply_remote_config_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
"""
Handles the result of the 'apply_remote_config' async task.

On success a connection check is scheduled on the Tk master when the app
exposes a callable 'check_connection_auth'; otherwise a warning is logged.
On error an error popup is shown (when the frame supports it) and the
auth/sync indicators are reset to 'unknown'.

Args:
result_data: Worker result; 'status' and 'message' are read.
context: Task context; not read by this handler.

Returns:
Tuple[bool, bool]: (False, sync_refresh). 'sync_refresh' can only become
True on the success path.
"""
func_name: str = "_handle_apply_remote_config_result"
status: Optional[str] = result_data.get('status')
message: Optional[str] = result_data.get('message')
sync_refresh: bool = False
if status == 'success':
log_handler.log_info(
f"Apply remote config successful: {message}", func_name=func_name
)
# After applying config, trigger a connection check
if hasattr(self.app, 'check_connection_auth') and callable(self.app.check_connection_auth):
log_handler.log_debug(
"Scheduling connection check after applying config.", func_name=func_name
)
# Deferred by 50 ms through Tk's 'after' instead of being called directly
self.app.master.after(50, self.app.check_connection_auth)
else:
log_handler.log_warning(
"Cannot auto-check connection after apply config.", func_name=func_name
)
sync_refresh = True # At least trigger sync status update later
elif status == 'error':
log_handler.log_error(
f"Apply remote config failed: {message}", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Apply Config Error", f"Failed:\n{message}")
# Reset auth/sync status indicators on failure
self.app._update_gui_auth_status('unknown')
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(status_text="Sync Status: Unknown")
# Applying config doesn't require a full standard refresh list
return False, sync_refresh
# --- Handlers for Remote Git Actions ---
def _handle_fetch_remote_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Process the outcome of the 'fetch_remote' background task.

    Args:
        result_data: Worker result; 'status', 'message' and 'exception' are read.
        context: Task context (unused here).

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); both True on
        success, both False for any other status.
    """
    func_name: str = "_handle_fetch_remote_result"
    outcome: Optional[str] = result_data.get('status')
    detail: Optional[str] = result_data.get('message')
    failure: Optional[Exception] = result_data.get('exception')

    if outcome == 'success':
        log_handler.log_info(
            f"Fetch remote successful: {detail}", func_name=func_name
        )
        # A fetch can move remote branches, tags and history, and it always
        # affects the ahead/behind status.
        return True, True

    if outcome != 'error':
        # Unrecognised status: nothing to do, nothing to refresh.
        return False, False

    log_handler.log_error(f"Fetch remote failed: {detail}", func_name=func_name)
    if hasattr(self.main_frame, "show_error"):
        self.main_frame.show_error("Fetch Error", f"Failed:\n{detail}")
    # Derive the auth indicator from the kind of failure reported.
    derived_auth: str = self._determine_auth_status_from_error(failure)
    self.app._update_gui_auth_status(derived_auth)
    # Reflect the failure in the sync status label.
    if hasattr(self.main_frame, "update_ahead_behind_status"):
        self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error")
    return False, False
def _handle_pull_remote_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Process the outcome of the 'pull_remote' background task.

    Args:
        result_data: Worker result; 'status', 'message' and 'exception' are read.
        context: Task context; 'repo_path' and 'remote_name' are read for the
            conflict popup.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh): (True, True)
        on success, (True, False) on conflict, (False, False) otherwise.
    """
    func_name: str = "_handle_pull_remote_result"
    outcome: Optional[str] = result_data.get('status')
    detail: Optional[str] = result_data.get('message')
    failure: Optional[Exception] = result_data.get('exception')
    conflict_dir: Optional[str] = context.get('repo_path')
    pulled_remote: Optional[str] = context.get("remote_name")

    if outcome == 'success':
        log_handler.log_info(f"Pull remote successful: {detail}", func_name=func_name)
        # A pull may touch history, branches, tags and the working tree.
        return True, True

    if outcome == 'conflict':
        log_handler.log_error(f"Pull resulted in conflict: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            conflict_text: str = (
                f"Merge conflict occurred during pull from '{pulled_remote or 'remote'}'.\n\n"
                f"Please resolve the conflicts manually in:\n{conflict_dir}\n\n"
                f"After resolving, stage the changes and commit them."
            )
            self.main_frame.show_error("Merge Conflict", conflict_text)
        # Widgets come back once the popup is dismissed.
        self._reenable_widgets_after_modal()
        # Changed-files list must be refreshed; sync status is meaningless
        # while the conflict is unresolved.
        return True, False

    if outcome == 'error':
        log_handler.log_error(f"Pull remote failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Pull Error", f"Failed:\n{detail}")
        # Update the auth indicator and sync label according to the failure.
        derived_auth: str = self._determine_auth_status_from_error(failure)
        self.app._update_gui_auth_status(derived_auth)
        if hasattr(self.main_frame, "update_ahead_behind_status"):
            self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error")

    return False, False
def _handle_push_remote_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
"""
Handles the result of the 'push_remote' async task.

Three statuses are handled: 'success', 'rejected' and 'error'. A rejected
push shows a warning and, when the app exposes a callable 'fetch_remote',
schedules it on the Tk master; otherwise the refresh flags are used as a
fallback. An error shows a popup and updates the auth and sync indicators.

Args:
result_data: Worker result; 'status', 'message', 'exception' and
'branch_name' are read.
context: Task context; not read by this handler.

Returns:
Tuple[bool, bool]: (trigger_refreshes, sync_refresh). On success this is
(False, True); on error it is (False, False).
"""
func_name: str = "_handle_push_remote_result"
status: Optional[str] = result_data.get('status')
message: Optional[str] = result_data.get('message')
exception: Optional[Exception] = result_data.get('exception')
rejected_branch: Optional[str] = result_data.get('branch_name') # Branch name if rejected
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == 'success':
log_handler.log_info(
f"Push remote successful: {message}", func_name=func_name
)
# Refresh history? Remote branches might not change immediately.
# Refreshing sync status is most important.
trigger_refreshes = False # Avoid full refresh list
sync_refresh = True # Sync status definitely changes
elif status == 'rejected':
log_handler.log_error(
f"Push rejected for branch '{rejected_branch}': {message}",
func_name=func_name
)
if hasattr(self.main_frame, "show_warning"):
self.main_frame.show_warning("Push Rejected", f"{message}")
# After rejection, user needs to fetch/pull, so trigger fetch
if hasattr(self.app, 'fetch_remote') and callable(self.app.fetch_remote):
log_handler.log_debug(
"Scheduling fetch after push rejection.", func_name=func_name
)
# Deferred by 100 ms through Tk's 'after' instead of being called directly
self.app.master.after(100, self.app.fetch_remote)
else: # Fallback if fetch method not found
trigger_refreshes = True # Refresh remote branches at least
sync_refresh = True # Sync status needs update
elif status == 'error':
log_handler.log_error(f"Push remote failed: {message}", func_name=func_name)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Push Error", f"Failed:\n{message}")
# Update auth and sync status based on error
auth_status: str = self._determine_auth_status_from_error(exception)
self.app._update_gui_auth_status(auth_status)
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error")
return trigger_refreshes, sync_refresh
def _handle_push_tags_remote_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Process the outcome of the 'push_tags_remote' background task.

    Args:
        result_data: Worker result; 'status', 'message' and 'exception' are read.
        context: Task context (unused here).

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh). The first flag
        is True only on success; the second is always False because pushing
        tags does not directly change branch sync status.
    """
    func_name: str = "_handle_push_tags_remote_result"
    outcome: Optional[str] = result_data.get('status')
    detail: Optional[str] = result_data.get('message')
    failure: Optional[Exception] = result_data.get('exception')

    if outcome == 'success':
        log_handler.log_info(
            f"Push tags successful: {detail}", func_name=func_name
        )
        # Request the standard refreshes; the remote branch list might show
        # the new tags indirectly.
        return True, False

    if outcome == 'error':
        log_handler.log_error(f"Push tags failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Push Tags Error", f"Failed:\n{detail}")
        # Derive the auth indicator from the kind of failure reported.
        derived_auth: str = self._determine_auth_status_from_error(failure)
        self.app._update_gui_auth_status(derived_auth)

    return False, False
def _handle_clone_remote_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Handles the result of the 'clone_remote' async task.

    On success a new profile is created for the cloned repository from the
    data stored in the context, saved through the config manager, and then
    selected in the profile dropdown. Every path that does not end with the
    new profile being selected re-enables the widgets explicitly.

    Args:
        result_data: Worker result; 'status' and 'message' are read.
        context: Task context; 'clone_success_data' is expected to be a dict
            with 'profile_name', 'cloned_path' and 'remote_url'.

    Returns:
        Tuple[bool, bool]: always (False, False); cloning triggers profile
        creation and load, not the standard refreshes.
    """
    func_name: str = "_handle_clone_remote_result"
    status: Optional[str] = result_data.get('status')
    message: Optional[str] = result_data.get('message')
    success_data: Optional[dict] = context.get('clone_success_data')  # Retrieve data passed in context
    no_refresh: Tuple[bool, bool] = (False, False)

    if status == 'error':
        log_handler.log_error(f"Clone operation failed: {message}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Clone Error", f"{message}")
        # Re-enable widgets after clone error
        self._reenable_widgets_after_modal()
        return no_refresh

    if status != 'success':
        return no_refresh

    log_handler.log_info("Clone successful. Creating profile...", func_name=func_name)

    if not (success_data and isinstance(success_data, dict)):
        log_handler.log_error(
            "Clone successful, but success data is missing or invalid.",
            func_name=func_name
        )
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar("Clone done, profile data error.")
        self._reenable_widgets_after_modal()  # Re-enable
        return no_refresh

    new_profile_name: Optional[str] = success_data.get('profile_name')
    cloned_repo_path: Optional[str] = success_data.get('cloned_path')
    cloned_remote_url: Optional[str] = success_data.get('remote_url')

    if not (new_profile_name and cloned_repo_path and cloned_remote_url):
        log_handler.log_error(
            "Clone successful, but missing data to create profile.",
            func_name=func_name
        )
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar("Clone done, profile data missing.")
        self._reenable_widgets_after_modal()  # Re-enable
        return no_refresh

    try:
        # Create and save the new profile based on clone data
        defaults: dict = self.app.config_manager._get_expected_keys_with_defaults()
        defaults['svn_working_copy_path'] = cloned_repo_path
        defaults['remote_url'] = cloned_remote_url
        defaults['remote_name'] = DEFAULT_REMOTE_NAME  # Assume origin
        defaults['bundle_name'] = f"{new_profile_name}.bundle"
        defaults['bundle_name_updated'] = f"{new_profile_name}_update.bundle"
        # Set other defaults as needed
        defaults['autobackup'] = "False"
        defaults['autocommit'] = "False"
        defaults['commit_message'] = ""
        self.app.config_manager.add_section(new_profile_name)
        for key, value in defaults.items():
            self.app.config_manager.set_profile_option(new_profile_name, key, value)
        self.app.config_manager.save_config()
        log_handler.log_info(
            f"Profile '{new_profile_name}' created successfully for cloned repo.",
            func_name=func_name
        )
        # Update GUI dropdown and select the new profile (triggers load)
        sections: list = self.app.config_manager.get_profile_sections()
        if hasattr(self.main_frame, "update_profile_dropdown") and hasattr(self.main_frame, "profile_var"):
            self.main_frame.update_profile_dropdown(sections)
            # Setting var triggers load_profile_settings, which re-enables widgets
            self.main_frame.profile_var.set(new_profile_name)
        else:
            # Without the dropdown no profile load is triggered, so nothing
            # else would re-enable the widgets.
            log_handler.log_warning(
                f"Profile '{new_profile_name}' created, but the profile dropdown is unavailable.",
                func_name=func_name
            )
            self._reenable_widgets_after_modal()
    except Exception as profile_e:
        # Broad catch is deliberate: any failure while creating the profile
        # must be reported and must not leave the widgets disabled.
        log_handler.log_exception(
            f"Clone successful, but failed to create profile '{new_profile_name}': {profile_e}",
            func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Profile Creation Error", f"Clone succeeded, but failed to create profile:\n{profile_e}"
            )
        self._reenable_widgets_after_modal()  # Re-enable on profile error

    return no_refresh
def _handle_checkout_tracking_branch_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Process the outcome of the 'checkout_tracking_branch' background task.

    Args:
        result_data: Worker result; 'status' and 'message' are read.
        context: Task context (unused here).

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); both True on
        success, both False for any other status.
    """
    func_name: str = "_handle_checkout_tracking_branch_result"
    outcome: Optional[str] = result_data.get('status')
    detail: Optional[str] = result_data.get('message')

    if outcome == 'success':
        log_handler.log_info(
            f"Successfully checked out tracking branch: {detail}", func_name=func_name
        )
        # Everything is refreshed after a checkout; widget re-enabling is
        # left to the post-action refresh step.
        return True, True

    if outcome == 'error':
        log_handler.log_error(
            f"Checkout tracking branch failed: {detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Checkout Error", f"{detail}")
        # Mark the sync status label as errored.
        if hasattr(self.main_frame, "update_ahead_behind_status"):
            self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error")
        # No refresh will follow, so re-enable the widgets right away.
        self._reenable_widgets_after_modal()

    return False, False
# --- Relocated Specific Handler (Compare Branches) ---
def _handle_compare_branches_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Process the outcome of the 'compare_branches' background task.

    On success the main app is asked to display the comparison summary
    window; this handler never instantiates the window itself.

    Args:
        result_data: Worker result; 'status', 'message' and 'result' (the
            list of changed files) are read.
        context: Task context; 'ref1', 'ref2' and 'repo_path' are required
            to show the summary.

    Returns:
        Tuple[bool, bool]: always (False, False); comparing does not trigger
        the standard post-action refreshes.
    """
    func_name: str = "_handle_compare_branches_result"
    outcome: Optional[str] = result_data.get('status')
    detail: Optional[str] = result_data.get('message')
    payload: Any = result_data.get('result')  # Expected: List[str]
    no_refresh: Tuple[bool, bool] = (False, False)

    if outcome == 'error':
        # The compare operation itself failed.
        log_handler.log_error(f"Branch comparison failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Comparison Error", f"Could not compare branches:\n{detail}"
            )
        # Widget re-enabling for the error status is left to the caller.
        return no_refresh

    if outcome != 'success':
        return no_refresh

    log_handler.log_info(
        "Branch comparison successful. Requesting summary window display...",
        func_name=func_name
    )
    # Gather what the summary window needs.
    first_ref: Optional[str] = context.get("ref1")
    second_ref: Optional[str] = context.get("ref2")
    repo_dir: Optional[str] = context.get("repo_path")
    files_changed: list = payload if isinstance(payload, list) else []

    if not (first_ref and second_ref and repo_dir):
        log_handler.log_error(
            "Missing context data (ref1, ref2, repo_path) to show comparison summary.",
            func_name=func_name
        )
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar("Error: Internal data missing for comparison.")
        self._reenable_widgets_after_modal()  # Re-enable
        return no_refresh

    presenter = getattr(self.app, "show_comparison_summary", None)
    if callable(presenter):
        # The app method owns the window and re-enables widgets itself.
        presenter(
            ref1=first_ref,
            ref2=second_ref,
            repo_path=repo_dir,
            changed_files=files_changed
        )
        return no_refresh

    # The display method is missing on the app.
    log_handler.log_error(
        "Cannot display comparison summary: 'show_comparison_summary' method missing on app.",
        func_name=func_name
    )
    if hasattr(self.main_frame, "show_error"):
        self.main_frame.show_error(
            "Internal Error", "Cannot display comparison results."
        )
    self._reenable_widgets_after_modal()  # Re-enable on internal error
    return no_refresh
# --- Generic Handler ---
def _handle_generic_result(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, bool]:
    """
    Generic handler for results without a specific method yet.

    Shows an error popup for status 'error' and a warning popup for status
    'warning'; any other status is only logged at debug level.

    Args:
        result_data: Worker result; 'status', 'message' and 'exception' are read.
        context: Task context; 'context' (the task name) is used in log
            messages and in the popup title.

    Returns:
        Tuple[bool, bool]: always (False, False); no refreshes are triggered.
    """
    func_name: str = "_handle_generic_result"
    status: Optional[str] = result_data.get('status')
    # An explicit None would bypass the .get() default, so normalise both
    # values here before they are formatted or have str methods called.
    raw_message: Optional[str] = result_data.get('message')
    message: str = "Operation finished." if raw_message is None else raw_message
    exception: Optional[Exception] = result_data.get('exception')
    raw_task_context: Any = context.get('context')
    task_context: str = 'unknown' if raw_task_context is None else str(raw_task_context)

    log_handler.log_debug(
        f"Handling generic result for '{task_context}' with status '{status}'.",
        func_name=func_name
    )
    # Basic error/warning display for unhandled contexts
    if status == 'error':
        log_handler.log_error(
            f"Error reported for task '{task_context}': {message}",
            func_name=func_name
        )
        error_details: str = f"{message}\n({type(exception).__name__}: {exception})" if exception else message
        if hasattr(self.main_frame, "show_error"):
            # Show error popup using task context in title
            error_title: str = f"Error: {task_context.replace('_',' ').title()}"
            self.main_frame.show_error(error_title, error_details)
    elif status == 'warning':
        if hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning("Operation Info", message)
    # No refreshes triggered by default for generic handler
    return False, False
# --- Helper Methods ---
def _determine_auth_status_from_error(self, exception: Optional[Exception]) -> str:
    """
    Guess an auth/connection status string from a failed git command.

    Args:
        exception: The exception reported by the worker. Only a
            GitCommandError with non-empty 'stderr' is inspected.

    Returns:
        str: 'failed' when stderr matches an authentication pattern,
        'connection_failed' when it matches a connection pattern, and
        'unknown_error' in every other case.
    """
    fallback: str = 'unknown_error'
    if not isinstance(exception, GitCommandError):
        return fallback
    raw_stderr = exception.stderr
    if not raw_stderr:
        return fallback

    lowered: str = raw_stderr.lower()
    # Ordered: authentication patterns are checked before connection ones.
    classification: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
        ('failed', ("authentication failed", "permission denied", "could not read")),
        ('connection_failed', (
            "repository not found", "could not resolve host", "failed to connect",
            "network is unreachable", "timed out", "unable to access",
        )),
    )
    for verdict, needles in classification:
        for needle in needles:
            if needle in lowered:
                return verdict
    # No known pattern matched.
    return fallback
def _reenable_widgets_after_modal(self):
""" Schedules widget re-enabling after a short delay. """
# Use master.after to ensure this runs in the main GUI thread
if hasattr(self.app, "master") and self.app.master.winfo_exists():
# Schedule the re-enable function defined in the main app class
self.app.master.after(50, self.app._reenable_widgets_if_ready)
def _trigger_post_action_refreshes(self, sync_refresh_needed: bool):
"""
Determines and schedules necessary GUI refreshes after an action has
been successfully processed by its handler.

The standard refresh callbacks of the app are queued on the Tk master
with staggered delays, optionally followed by the remote (ahead/behind)
status refresh, and finally by a callback that sets the action widgets
back to tk.NORMAL.

Args:
sync_refresh_needed (bool): When True, the app's 'refresh_remote_status'
is scheduled after the standard refreshes (if the app provides it).

Returns:
None. Returns early without scheduling refreshes when the master window
is gone or the repository path is unavailable / not ready.
"""
func_name: str = "_trigger_post_action_refreshes"
# Ensure main window exists
if not hasattr(self.app, 'master') or not self.app.master.winfo_exists():
log_handler.log_warning(
"Cannot trigger refreshes: Master window closed.", func_name=func_name
)
return
# Get current repo path, exit if not valid/ready
current_repo_path: Optional[str] = self.app._get_and_validate_svn_path("Post-Action Refresh Trigger")
if not current_repo_path or not self.app._is_repo_ready(current_repo_path):
log_handler.log_warning(
"Cannot trigger refreshes: Repo path unavailable or not ready.",
func_name=func_name
)
# Ensure widgets are re-enabled if repo isn't ready after an action
self._reenable_widgets_after_modal()
return
# List of standard refresh functions to potentially call
# NOTE(review): these attributes are read unguarded, so a missing one raises AttributeError here
refresh_functions_to_call: List[Callable] = [
self.app.refresh_commit_history,
self.app.refresh_branch_list, # Refreshes local branches
self.app.refresh_tag_list,
self.app.refresh_changed_files_list,
self.app.refresh_remote_branches, # Also refresh remote branch list
]
log_handler.log_debug(
f"Scheduling {len(refresh_functions_to_call)} standard refreshes. Sync refresh needed: {sync_refresh_needed}",
func_name=func_name
)
# Schedule standard refreshes with increasing delays
delay_ms: int = 50 # Initial delay
for refresh_func in refresh_functions_to_call:
try:
# Use lambda to capture the current function in the loop correctly
# This ensures the correct function is called when the 'after' timer fires
schedule_func: Callable = lambda rf=refresh_func: rf()
self.app.master.after(delay_ms, schedule_func)
log_handler.log_debug(
f"Scheduled {getattr(refresh_func, '__name__', 'unknown_refresh')} with delay {delay_ms}ms.",
func_name=func_name
)
# Increment delay for the next refresh function
delay_ms += 75
except Exception as ref_e:
# A scheduling failure is logged and the loop continues with the next refresh
log_handler.log_error(
f"Error scheduling {getattr(refresh_func, '__name__', 'refresh function')}: {ref_e}",
func_name=func_name
)
# Schedule sync status refresh (ahead/behind) if needed, after others
if sync_refresh_needed:
if hasattr(self.app, "refresh_remote_status") and callable(self.app.refresh_remote_status):
log_handler.log_debug(
f"Scheduling remote status refresh with delay {delay_ms}ms.",
func_name=func_name
)
self.app.master.after(delay_ms, self.app.refresh_remote_status)
else:
# Log warning if the sync status refresh method is missing
log_handler.log_warning(
"Sync refresh needed but refresh_remote_status method not found on app.",
func_name=func_name
)
# Increment delay even if scheduling failed, before re-enabling widgets
delay_ms += 75
# --- Schedule Final Widget Re-enable ---
# This ensures widgets are re-enabled *after* all scheduled refreshes have had a chance to start.
def _reenable_widgets_final():
""" Re-enables widgets after all post-action refreshes are scheduled. """
# Skip silently if the main frame has been destroyed in the meantime
if hasattr(self.main_frame, "winfo_exists") and self.main_frame.winfo_exists():
log_handler.log_debug(
"Re-enabling widgets after post-action refreshes scheduled.",
func_name="_reenable_widgets_final"
)
self.main_frame.set_action_widgets_state(tk.NORMAL)
# Schedule the re-enable function after the last potential refresh delay
self.app.master.after(delay_ms + 50, _reenable_widgets_final)
# --- End of AsyncResultHandler Class ---