SXXXXXXX_GitUtility/gitutility/async_tasks/async_result_handler.py
2025-07-07 15:05:51 +02:00

1597 lines
72 KiB
Python

# --- FILE: gitsync_tool/async_tasks/async_result_handler.py ---
import tkinter as tk
from tkinter import messagebox # Importa messagebox
import queue
import os
import logging # Per livelli
from typing import (
TYPE_CHECKING,
Dict,
Any,
List,
Tuple,
Callable,
Optional,
Set,
) # Aggiunto Set
# Usa percorsi assoluti per importare moduli dal pacchetto
from gitutility.logging_setup import log_handler
from gitutility.commands.git_commands import GitCommandError
# Importa async_workers (necessario per schedulare task interattivo)
from gitutility.async_tasks import async_workers
# Importa costanti di configurazione
from gitutility.config.config_manager import DEFAULT_REMOTE_NAME
# Forward Reference per Type Hinting (Aggiornato percorsi)
if TYPE_CHECKING:
from gitutility.app import GitSvnSyncApp
# Importa MainFrame dal nuovo percorso
from gitutility.gui.main_frame import MainFrame
class AsyncResultHandler:
"""
Handles processing the results received from asynchronous worker functions.
Updates the GUI based on the outcome of background tasks (e.g., populating
lists, showing messages) and triggers necessary follow-up actions like
GUI refreshes or subsequent asynchronous tasks.
Separates result processing logic from the main application controller.
"""
def __init__(self, app: "GitSvnSyncApp"):
    """
    Bind the result handler to the main application and its main frame.

    Args:
        app (GitSvnSyncApp): The main application instance. Its
            'main_frame' attribute must already be set, because the
            handler keeps a direct reference to it.

    Raises:
        ValueError: If 'app' is None, or if 'app' has no (truthy)
            'main_frame' attribute.
    """
    if app is None:
        raise ValueError("AsyncResultHandler requires a valid 'app' instance.")

    # A missing attribute and a falsy attribute are rejected the same way.
    frame = getattr(app, "main_frame", None)
    if not frame:
        raise ValueError(
            "AsyncResultHandler requires the app's main_frame to be initialized before instantiation."
        )

    # Keep references to both objects; string annotations are forward refs.
    self.app: "GitSvnSyncApp" = app
    self.main_frame: "MainFrame" = frame
    log_handler.log_debug("AsyncResultHandler initialized.", func_name="__init__")
def process(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> None:
    """
    Route a worker result to the handler registered for its context.

    The handler is looked up by the 'context' key of the context dict.
    Unknown contexts fall back to the generic handler. Each handler is
    expected to return a 2-tuple of refresh flags; when the first flag is
    set, the post-action refreshes are triggered with the second flag.

    Args:
        result_data (Dict[str, Any]): Dictionary returned by the worker.
            The 'status' key is read here (defaults to 'error').
        context (Dict[str, Any]): Context dictionary passed when the async
            operation was started. The 'context' key is read here
            (defaults to 'unknown').
    """
    task_context: str = context.get("context", "unknown")
    # A result without an explicit status is logged as an error.
    status: str = result_data.get("status", "error")
    func_name: str = f"process (ctx: {task_context})"
    log_handler.log_debug(
        f"Processing async result for context '{task_context}' with status '{status}'.",
        func_name=func_name,
    )

    # Context string -> bound handler method.
    handler_map: Dict[str, Callable] = {
        "compare_branches": self._handle_compare_branches_result,
        "refresh_tags": self._handle_refresh_tags_result,
        "refresh_branches": self._handle_refresh_local_branches_result,
        "refresh_history": self._handle_refresh_history_result,
        "refresh_changes": self._handle_refresh_changes_result,
        "refresh_remote_branches": self._handle_refresh_remote_branches_result,
        "get_ahead_behind": self._handle_get_ahead_behind_result,
        "prepare_repo": self._handle_prepare_repo_result,
        "create_bundle": self._handle_create_bundle_result,
        "fetch_bundle": self._handle_fetch_bundle_result,
        "manual_backup": self._handle_manual_backup_result,
        "commit": self._handle_commit_result,
        # Context used for the untrack step that follows a .gitignore save.
        "_handle_gitignore_save": self._handle_untrack_result,
        "add_file": self._handle_add_file_result,
        "create_tag": self._handle_create_tag_result,
        "checkout_tag": self._handle_checkout_tag_result,
        "create_branch": self._handle_create_branch_result,
        "checkout_branch": self._handle_checkout_branch_result,
        "delete_local_branch": self._handle_delete_local_branch_result,
        "merge_local_branch": self._handle_merge_local_branch_result,
        "check_connection": self._handle_check_connection_result,
        "interactive_auth": self._handle_interactive_auth_result,
        "apply_remote_config": self._handle_apply_remote_config_result,
        "fetch_remote": self._handle_fetch_remote_result,
        "pull_remote": self._handle_pull_remote_result,
        "push_remote": self._handle_push_remote_result,
        "push_tags_remote": self._handle_push_tags_remote_result,
        "clone_remote": self._handle_clone_remote_result,
        "checkout_tracking_branch": self._handle_checkout_tracking_branch_result,
        "get_commit_details": self._handle_get_commit_details_result,
        "update_wiki": self._handle_generic_result,
        "revert_to_tag": self._handle_revert_to_tag_result,
        "analyze_history": self._handle_analyze_history_result,
        "purge_history": self._handle_purge_history_result,
    }
    selected: Optional[Callable] = handler_map.get(task_context)

    if selected is None:
        # Unregistered context: let the generic handler deal with it.
        log_handler.log_warning(
            f"No specific handler method found for context: '{task_context}'. Using generic handler.",
            func_name=func_name,
        )
        wants_refresh, wants_sync_refresh = self._handle_generic_result(
            result_data, context
        )
    else:
        log_handler.log_debug(
            f"Dispatching to handler: {selected.__name__}",
            func_name=func_name,
        )
        flags: Any = selected(result_data, context)
        if isinstance(flags, tuple) and len(flags) == 2:
            wants_refresh, wants_sync_refresh = flags
        else:
            # Handler broke the (bool, bool) contract: refresh nothing.
            log_handler.log_warning(
                f"Handler '{selected.__name__}' for context '{task_context}' did not return the expected refresh flags tuple (bool, bool). Defaulting to no refresh.",
                func_name=func_name,
            )
            wants_refresh, wants_sync_refresh = False, False

    # Refreshes run only after the specific result has been processed.
    if not wants_refresh:
        log_handler.log_debug(
            f"No standard post-action refreshes required for context '{task_context}'.",
            func_name=func_name,
        )
        return
    log_handler.log_debug(
        f"Triggering post-action refreshes for context '{task_context}'. Sync refresh needed: {wants_sync_refresh}",
        func_name=func_name,
    )
    self._trigger_post_action_refreshes(wants_sync_refresh)
# --- Handler Methods for Refresh Tasks ---
# (The _handle_refresh_... and _handle_get_ahead_behind methods are unchanged from the previous version.)
# Make sure they call the correct methods on self.main_frame and self.app.
def _handle_refresh_tags_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Push the outcome of a tag refresh into the main frame's tag list.

    On success the 'result' value is shown if it is a list (an empty list
    otherwise); on error a single "(Error)" placeholder entry is shown.

    Returns:
        Tuple[bool, bool]: Always (False, False); no follow-up refresh.
    """
    func_name: str = "_handle_refresh_tags_result"
    if not hasattr(self.main_frame, "update_tag_list"):
        log_handler.log_error(
            "MainFrame missing 'update_tag_list'.", func_name=func_name
        )
        return False, False

    outcome: Optional[str] = result_data.get("status")
    if outcome == "error":
        log_handler.log_error(
            "Error reported during tag refresh.", func_name=func_name
        )
        self.main_frame.update_tag_list([("(Error)", "")])
    elif outcome == "success":
        payload: Any = result_data.get("result")
        # Anything that is not a list is treated as "no tags".
        tags: list = payload if isinstance(payload, list) else []
        self.main_frame.update_tag_list(tags)
        log_handler.log_debug(
            f"Tag list updated ({len(tags)} items).", func_name=func_name
        )
    return False, False
def _handle_refresh_local_branches_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Update the local branch list, the history branch filter and the app's
    current-branch attribute from a local-branches refresh result.

    A successful result is expected to be a (branches, current_branch)
    2-tuple; any other shape shows an "(Invalid Data)" placeholder. After
    the GUI update a remote status refresh is scheduled on the app, if the
    app exposes 'refresh_remote_status'.

    Returns:
        Tuple[bool, bool]: Always (False, False).
    """
    func_name: str = "_handle_refresh_local_branches_result"
    frame = self.main_frame
    if not (
        hasattr(frame, "update_branch_list")
        and hasattr(frame, "update_history_branch_filter")
    ):
        log_handler.log_error(
            "MainFrame missing update methods for local branches.",
            func_name=func_name,
        )
        return False, False

    outcome: Optional[str] = result_data.get("status")
    payload: Any = result_data.get("result")
    branch_names: List[str] = []
    active_branch: Optional[str] = None

    if outcome == "success":
        if isinstance(payload, tuple) and len(payload) == 2:
            branch_names, active_branch = payload
            self.app.current_local_branch = active_branch
            log_handler.log_debug(
                f"Local branches updated. Current: {active_branch}",
                func_name=func_name,
            )
        else:
            log_handler.log_warning(
                "Invalid result format for local branches refresh.",
                func_name=func_name,
            )
            branch_names = ["(Invalid Data)"]
            self.app.current_local_branch = None
    elif outcome == "error":
        log_handler.log_error(
            "Error reported during local branches refresh.", func_name=func_name
        )
        branch_names = ["(Error)"]
        self.app.current_local_branch = None

    # The GUI is updated for every status, including unrecognised ones.
    frame.update_branch_list(branch_names, active_branch)
    frame.update_history_branch_filter(branch_names)

    refresh_remote = getattr(self.app, "refresh_remote_status", None)
    if callable(refresh_remote):
        log_handler.log_debug(
            "Scheduling remote status refresh after local branches update.",
            func_name=func_name,
        )
        self.app.master.after(100, refresh_remote)
    else:
        log_handler.log_warning(
            "Could not schedule sync status refresh: method missing on app.",
            func_name=func_name,
        )
    return False, False
def _handle_refresh_history_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Push the outcome of a history refresh into the history display.

    On success the 'result' value is shown if it is a list (an empty list
    otherwise); on error a single placeholder line is shown.

    Returns:
        Tuple[bool, bool]: Always (False, False).
    """
    func_name: str = "_handle_refresh_history_result"
    if not hasattr(self.main_frame, "update_history_display"):
        log_handler.log_error(
            "MainFrame missing 'update_history_display'.", func_name=func_name
        )
        return False, False

    outcome: Optional[str] = result_data.get("status")
    if outcome == "error":
        log_handler.log_error(
            "Error reported during history refresh.", func_name=func_name
        )
        self.main_frame.update_history_display(["(Error retrieving history)"])
    elif outcome == "success":
        payload: Any = result_data.get("result")
        entries: list = payload if isinstance(payload, list) else []
        self.main_frame.update_history_display(entries)
        log_handler.log_debug(
            f"History display updated ({len(entries)} lines).",
            func_name=func_name,
        )
    return False, False
def _handle_refresh_changes_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Push the outcome of a changed-files refresh into the main frame.

    On success the 'result' value is shown if it is a list (an empty list
    otherwise); on error a single placeholder entry is shown.

    Returns:
        Tuple[bool, bool]: Always (False, False).
    """
    func_name: str = "_handle_refresh_changes_result"
    if not hasattr(self.main_frame, "update_changed_files_list"):
        log_handler.log_error(
            "MainFrame missing 'update_changed_files_list'.", func_name=func_name
        )
        return False, False

    outcome: Optional[str] = result_data.get("status")
    if outcome == "error":
        log_handler.log_error(
            "Error reported during changes refresh.", func_name=func_name
        )
        self.main_frame.update_changed_files_list(["(Error refreshing changes)"])
    elif outcome == "success":
        payload: Any = result_data.get("result")
        changed: list = payload if isinstance(payload, list) else []
        self.main_frame.update_changed_files_list(changed)
        log_handler.log_debug(
            f"Changed files list updated ({len(changed)} items).",
            func_name=func_name,
        )
    return False, False
def _handle_refresh_remote_branches_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
"""
Update the remote branches list from a remote-branches refresh result.

On success the 'result' value is shown if it is a list, otherwise an
"(Invalid Data)" placeholder; on error an "(Error)" placeholder is shown.

Returns:
Tuple[bool, bool]: Always (False, False).
"""
# NOTE(review): indentation was lost in this copy of the file, so the
# nesting of the final 'update_ahead_behind_status' block cannot be read
# from here. Presumably it belongs to the error branch - confirm against
# the original source before re-indenting.
func_name: str = "_handle_refresh_remote_branches_result"
status: Optional[str] = result_data.get("status")
result_value: Any = result_data.get("result")
# Without the GUI update method there is nothing this handler can do.
if not hasattr(self.main_frame, "update_remote_branches_list"):
log_handler.log_error(
"MainFrame missing 'update_remote_branches_list'.", func_name=func_name
)
return False, False
if status == "success":
# A non-list result is replaced by a visible placeholder entry.
branch_list: list = (
result_value if isinstance(result_value, list) else ["(Invalid Data)"]
)
self.main_frame.update_remote_branches_list(branch_list)
log_handler.log_debug(
f"Remote branches list updated ({len(branch_list)} items).",
func_name=func_name,
)
elif status == "error":
log_handler.log_error(
"Error reported during remote branches refresh.", func_name=func_name
)
self.main_frame.update_remote_branches_list(["(Error)"])
# Reset the sync status label to an explicit "unknown" text.
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
status_text="Sync Status: Unknown"
)
return False, False
def _handle_get_ahead_behind_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Show the ahead/behind counts (or an error text) for the local branch.

    Args:
        result_data (Dict[str, Any]): Worker result. 'status', 'result'
            and 'message' are read. On success 'result' is expected to be
            an (ahead, behind) 2-tuple; any other value is displayed as
            (None, None) instead of raising.
        context (Dict[str, Any]): Operation context; 'local_branch' names
            the branch the counts refer to.

    Returns:
        Tuple[bool, bool]: Always (False, False).
    """
    func_name: str = "_handle_get_ahead_behind_result"
    status: Optional[str] = result_data.get("status")
    result_value: Any = result_data.get("result")
    message: Optional[str] = result_data.get("message")
    local_branch_ctx: Optional[str] = context.get("local_branch")

    if not hasattr(self.main_frame, "update_ahead_behind_status"):
        log_handler.log_error(
            "MainFrame missing 'update_ahead_behind_status'.", func_name=func_name
        )
        return False, False

    if status == "success":
        # Only a 2-tuple can be unpacked into (ahead, behind); a tuple of
        # any other length would raise ValueError, so it is treated like
        # missing data.
        if isinstance(result_value, tuple) and len(result_value) == 2:
            ahead, behind = result_value
        else:
            ahead, behind = None, None
        log_handler.log_info(
            f"Ahead/Behind status for '{local_branch_ctx}': Ahead={ahead}, Behind={behind}",
            func_name=func_name,
        )
        self.main_frame.update_ahead_behind_status(
            current_branch=local_branch_ctx, ahead=ahead, behind=behind
        )
    elif status == "error":
        log_handler.log_error(
            f"Failed getting ahead/behind for '{local_branch_ctx}': {message}",
            func_name=func_name,
        )
        # Append the worker message only when there is one.
        error_text: str = (
            f"Sync Status: Error ({message})" if message else "Sync Status: Error"
        )
        self.main_frame.update_ahead_behind_status(
            current_branch=local_branch_ctx, status_text=error_text
        )
    return False, False
# --- Handler Methods for Local Git Actions ---
# (The _handle_... methods are unchanged from the previous version.)
# Make sure they call the correct methods on self.main_frame and self.app.
def _handle_prepare_repo_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
"""
Handle the outcome of a repository preparation task.

'success' and 'warning' both request a full refresh including the sync
status; 'warning' additionally shows the message in an info dialog.
'error' shows an error dialog.

Returns:
Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
"""
# NOTE(review): indentation was lost in this copy of the file, so it is
# not visible whether the SVN status indicator update at the end runs only
# in the error branch or for every status - confirm against the original
# source before re-indenting.
func_name: str = "_handle_prepare_repo_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
"Repository preparation successful.", func_name=func_name
)
trigger_refreshes = True
sync_refresh = True
elif status == "warning":
log_handler.log_warning(
f"Prepare repo warning: {message}", func_name=func_name
)
if hasattr(self.main_frame, "show_info"):
self.main_frame.show_info("Prepare Info", message)
trigger_refreshes = True
sync_refresh = True
elif status == "error":
log_handler.log_error(
f"Repository preparation failed: {message}", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Prepare Error", f"Failed:\n{message}")
# Re-evaluate the SVN status indicator from the current path entry,
# deferred by 50 ms through the Tk 'after' scheduler.
if hasattr(self.app, "update_svn_status_indicator") and hasattr(
self.app.main_frame, "svn_path_entry"
):
self.app.master.after(
50,
lambda: self.app.update_svn_status_indicator(
self.app.main_frame.svn_path_entry.get()
),
)
return trigger_refreshes, sync_refresh
def _handle_create_bundle_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a bundle creation task.

    Refreshes (including the sync status) are requested only when the
    result reports success and its 'committed' flag is truthy. An error
    result shows an error dialog.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_create_bundle_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(
            f"Create bundle finished: {detail}", func_name=func_name
        )
        if result_data.get("committed", False):
            return True, True
    elif outcome == "error":
        log_handler.log_error(
            f"Create bundle failed: {detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Create Bundle Error", f"Failed:\n{detail}")
    return False, False
def _handle_fetch_bundle_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
"""
Handle the outcome of a fetch-from-bundle task.

Success requests a full refresh including the sync status. An error that
carries both a truthy 'conflict' flag and a 'repo_path' is reported as a
merge conflict and still requests a refresh (without the sync status);
any other error shows a generic error dialog.

Returns:
Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
"""
# NOTE(review): indentation was lost in this copy of the file, so it is
# not visible whether the SVN status indicator update at the end belongs to
# the non-conflict error branch, the whole error branch, or runs for every
# status - confirm against the original source before re-indenting.
func_name: str = "_handle_fetch_bundle_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
# Conflict details are read from the worker result, not from the context.
is_conflict: bool = result_data.get("conflict", False)
repo_path_conflict: Optional[str] = result_data.get("repo_path")
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
f"Fetch from bundle successful: {message}", func_name=func_name
)
trigger_refreshes = True
sync_refresh = True
elif status == "error":
log_handler.log_error(
f"Fetch from bundle failed: {message}", func_name=func_name
)
if is_conflict and repo_path_conflict:
log_handler.log_error(
"Merge conflict detected during fetch from bundle.",
func_name=func_name,
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error(
"Merge Conflict",
f"Conflict occurred during bundle fetch.\nResolve in:\n{repo_path_conflict}\nThen commit.",
)
trigger_refreshes = True
sync_refresh = False
self._reenable_widgets_after_modal()
else:
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error(
"Fetch Bundle Error", f"Failed:\n{message}"
)
# Re-evaluate the SVN status indicator from the current path entry,
# deferred by 50 ms through the Tk 'after' scheduler.
if hasattr(self.app, "update_svn_status_indicator") and hasattr(
self.app.main_frame, "svn_path_entry"
):
self.app.master.after(
50,
lambda: self.app.update_svn_status_indicator(
self.app.main_frame.svn_path_entry.get()
),
)
return trigger_refreshes, sync_refresh
def _handle_manual_backup_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Log the outcome of a manual backup and show a dialog on failure.

    Returns:
        Tuple[bool, bool]: Always (False, False); a backup never requires
        a GUI refresh.
    """
    func_name: str = "_handle_manual_backup_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "error":
        log_handler.log_error(
            f"Manual backup failed: {detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Manual Backup Error", f"Failed:\n{detail}")
    elif outcome == "success":
        # Success is only logged; no confirmation dialog is shown.
        log_handler.log_info(
            f"Manual backup finished: {detail}", func_name=func_name
        )
    return False, False
def _handle_commit_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a commit task.

    When the result reports success and its 'committed' flag is truthy,
    the commit message field is cleared (if the main frame supports it)
    and a full refresh including the sync status is requested. An error
    result shows an error dialog.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_commit_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(
            f"Commit operation finished: {detail}", func_name=func_name
        )
        if result_data.get("committed", False):
            if hasattr(self.main_frame, "clear_commit_message"):
                self.main_frame.clear_commit_message()
            return True, True
    elif outcome == "error":
        log_handler.log_error(f"Commit failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Commit Error", f"Failed:\n{detail}")
    return False, False
def _handle_untrack_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of an untrack operation.

    Refreshes (including the sync status) are requested only when the
    result reports success and its 'committed' flag is truthy. An error
    result shows an error dialog.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_untrack_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(
            f"Untrack operation finished: {detail}", func_name=func_name
        )
        if result_data.get("committed", False):
            return True, True
    elif outcome == "error":
        log_handler.log_error(f"Untracking failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Untrack Error", f"Failed:\n{detail}")
    return False, False
def _handle_get_commit_details_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Hand retrieved commit details to the app for display.

    On success the 'result' value must be a dict and the app must expose a
    callable 'show_commit_details'; otherwise an error dialog is shown and
    the widgets are re-enabled. An error result only shows a dialog.

    Returns:
        Tuple[bool, bool]: Always (False, False).
    """
    func_name: str = "_handle_get_commit_details_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")
    details_payload: Optional[Dict[str, Any]] = result_data.get("result")

    if outcome == "error":
        log_handler.log_error(
            f"Failed to get commit details: {detail}", func_name=func_name
        )
        # No re-enable call is made on this path.
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Commit Details Error", f"Could not get details:\n{detail}"
            )
        return False, False
    if outcome != "success":
        return False, False

    log_handler.log_info(
        "Commit details retrieved successfully. Requesting detail window display...",
        func_name=func_name,
    )
    if not isinstance(details_payload, dict):
        log_handler.log_error(
            f"Received success for commit details, but result is not dict: {type(details_payload)}",
            func_name=func_name,
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Data Error", "Failed to process commit details."
            )
        self._reenable_widgets_after_modal()
    elif hasattr(self.app, "show_commit_details") and callable(
        self.app.show_commit_details
    ):
        self.app.show_commit_details(details_payload)
    else:
        log_handler.log_error(
            "Cannot display commit details: 'show_commit_details' method missing on app.",
            func_name=func_name,
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Internal Error", "Cannot display commit details window."
            )
        self._reenable_widgets_after_modal()
    return False, False
def _handle_add_file_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of adding a file to the index.

    Success requests a refresh without the sync status; an error result
    shows an error dialog.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); the second
        element is always False.
    """
    func_name: str = "_handle_add_file_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(f"Add file successful: {detail}", func_name=func_name)
        return True, False
    if outcome == "error":
        log_handler.log_error(f"Add file failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Add File Error", f"Failed:\n{detail}")
    return False, False
def _handle_create_tag_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a tag creation task.

    Success requests a refresh without the sync status; an error result
    shows an error dialog.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); the second
        element is always False.
    """
    func_name: str = "_handle_create_tag_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(
            f"Tag creation successful: {detail}", func_name=func_name
        )
        return True, False
    if outcome == "error":
        log_handler.log_error(f"Create tag failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Create Tag Error", f"Failed:\n{detail}")
    return False, False
def _handle_checkout_tag_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a tag checkout.

    Success requests a full refresh including the sync status. On error,
    a checkout blocked by uncommitted changes (detected from the exception
    text) is reported as a warning, anything else as an error, and the
    widgets are re-enabled afterwards.

    Args:
        result_data (Dict[str, Any]): Worker result; 'status', 'message'
            and 'exception' are read.
        context (Dict[str, Any]): Operation context (not used here).

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_checkout_tag_result"
    status: Optional[str] = result_data.get("status")
    message: Optional[str] = result_data.get("message")
    exception: Optional[Exception] = result_data.get("exception")
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    if status == "success":
        log_handler.log_info(
            f"Checkout tag successful: {message}", func_name=func_name
        )
        trigger_refreshes = True
        sync_refresh = True
    elif status == "error":
        log_handler.log_error(
            f"Checkout tag failed: {message}", func_name=func_name
        )
        if exception and "Uncommitted changes" in str(exception):
            # Guard the method that is actually called: checking for
            # 'show_error' does not prove that 'show_warning' exists.
            if hasattr(self.main_frame, "show_warning"):
                self.main_frame.show_warning(
                    "Checkout Blocked", f"{message}\nCommit or stash first."
                )
        elif hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Checkout Tag Error", f"Failed:\n{message}"
            )
        # NOTE(review): placed at error-branch level; the source
        # indentation was lost - confirm.
        self._reenable_widgets_after_modal()
    return trigger_refreshes, sync_refresh
def _handle_create_branch_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a branch creation task.

    After a successful creation the user is asked whether to switch to the
    new branch (when its name is in the context and the main frame can
    ask). If the user accepts and the app exposes 'checkout_branch', the
    checkout is started and no refresh is requested here; in every other
    success case a refresh without the sync status is requested. An error
    result shows an error dialog.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); the second
        element is always False.
    """
    func_name: str = "_handle_create_branch_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")
    created_branch: Optional[str] = context.get("new_branch_name")

    if outcome == "error":
        log_handler.log_error(
            f"Create branch failed: {detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Create Branch Error", f"Failed:\n{detail}")
        return False, False
    if outcome != "success":
        return False, False

    log_handler.log_info(
        f"Create branch successful: {detail}", func_name=func_name
    )
    # Without a branch name or a way to ask, just refresh.
    if not (created_branch and hasattr(self.main_frame, "ask_yes_no")):
        return True, False
    if not self.main_frame.ask_yes_no(
        "Checkout?", f"Switch to new branch '{created_branch}'?"
    ):
        return True, False

    do_checkout = getattr(self.app, "checkout_branch", None)
    if callable(do_checkout):
        # The checkout's own result handler requests the refreshes.
        do_checkout(branch_to_checkout=created_branch)
        return False, False

    log_handler.log_error(
        "Cannot checkout new branch: method missing on app.",
        func_name=func_name,
    )
    return True, False
def _handle_checkout_branch_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a branch checkout.

    Success requests a full refresh including the sync status. On error,
    a checkout blocked by uncommitted changes (detected from the exception
    text) is reported as a warning, anything else as an error, and the
    widgets are re-enabled afterwards.

    Args:
        result_data (Dict[str, Any]): Worker result; 'status', 'message'
            and 'exception' are read.
        context (Dict[str, Any]): Operation context (not used here).

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_checkout_branch_result"
    status: Optional[str] = result_data.get("status")
    message: Optional[str] = result_data.get("message")
    exception: Optional[Exception] = result_data.get("exception")
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    if status == "success":
        log_handler.log_info(
            f"Checkout branch successful: {message}", func_name=func_name
        )
        trigger_refreshes = True
        sync_refresh = True
    elif status == "error":
        log_handler.log_error(
            f"Checkout branch failed: {message}", func_name=func_name
        )
        if exception and "Uncommitted changes" in str(exception):
            # Guard the method that is actually called: checking for
            # 'show_error' does not prove that 'show_warning' exists.
            if hasattr(self.main_frame, "show_warning"):
                self.main_frame.show_warning(
                    "Checkout Blocked", f"{message}\nCommit or stash first."
                )
        elif hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Checkout Branch Error", f"Failed:\n{message}"
            )
        # NOTE(review): placed at error-branch level; the source
        # indentation was lost - confirm.
        self._reenable_widgets_after_modal()
    return trigger_refreshes, sync_refresh
def _handle_delete_local_branch_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of deleting a local branch.

    Success requests a refresh without the sync status. On error, a
    "not fully merged" failure (found in the message or in the stderr of
    a GitCommandError) is shown as a warning, anything else as an error.

    Args:
        result_data (Dict[str, Any]): Worker result; 'status', 'message'
            and 'exception' are read. 'message' may be missing.
        context (Dict[str, Any]): Operation context; 'branch_name' is the
            branch that was to be deleted.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh); the second
        element is always False.
    """
    func_name: str = "_handle_delete_local_branch_result"
    status: Optional[str] = result_data.get("status")
    message: Optional[str] = result_data.get("message")
    exception: Optional[Exception] = result_data.get("exception")
    deleted_branch_name: Optional[str] = context.get("branch_name")
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    if status == "success":
        log_handler.log_info(
            f"Local branch '{deleted_branch_name}' deleted successfully.",
            func_name=func_name,
        )
        trigger_refreshes = True
    elif status == "error":
        log_handler.log_error(
            f"Delete local branch '{deleted_branch_name}' failed: {message}",
            func_name=func_name,
        )
        stderr_text = (
            getattr(exception, "stderr", "")
            if isinstance(exception, GitCommandError)
            else ""
        )
        # Both texts may be None; a substring test against None would
        # raise TypeError and hide the real failure from the user.
        marker = "not fully merged"
        not_merged = marker in (message or "") or marker in (stderr_text or "")
        if not_merged:
            # Guard the method that is actually called.
            if hasattr(self.main_frame, "show_warning"):
                self.main_frame.show_warning("Delete Warning", f"{message}")
        elif hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Delete Error", f"{message}")
    return trigger_refreshes, sync_refresh
def _handle_merge_local_branch_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of merging one local branch into another.

    Success requests a full refresh including the sync status. A
    'conflict' status tells the user where to resolve the conflict and
    requests a refresh without the sync status. An 'error' status shows
    the message. The widgets are re-enabled after conflict and error.

    Args:
        result_data (Dict[str, Any]): Worker result; 'status' and
            'message' are read.
        context (Dict[str, Any]): Operation context; 'branch_merged_into',
            'branch_merged_from' and 'repo_path' are read.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
    """
    func_name: str = "_handle_merge_local_branch_result"
    status: Optional[str] = result_data.get("status")
    message: Optional[str] = result_data.get("message")
    merged_into: Optional[str] = context.get("branch_merged_into")
    merged_from: Optional[str] = context.get("branch_merged_from")
    repo_path_conflict: Optional[str] = context.get("repo_path")
    trigger_refreshes: bool = False
    sync_refresh: bool = False
    # The handler stores 'main_frame' itself, so the meaningful guard is
    # whether the frame can show a dialog - the same check used by the
    # other handlers in this class.
    can_show_error: bool = hasattr(self.main_frame, "show_error")

    if status == "success":
        log_handler.log_info(
            f"Successfully merged '{merged_from}' into '{merged_into}'.",
            func_name=func_name,
        )
        trigger_refreshes = True
        sync_refresh = True
    elif status == "conflict":
        log_handler.log_error(
            f"Merge conflict occurred merging '{merged_from}'. User needs to resolve manually.",
            func_name=func_name,
        )
        if can_show_error:
            self.main_frame.show_error(
                "Merge Conflict",
                f"Merge conflict occurred while merging '{merged_from}' into '{merged_into}'.\n\nPlease resolve the conflicts manually in:\n{repo_path_conflict}\n\nAfter resolving, stage the changes and commit them.",
            )
        # Refresh so the conflicted files become visible in the GUI.
        trigger_refreshes = True
        sync_refresh = False
        self._reenable_widgets_after_modal()
    elif status == "error":
        log_handler.log_error(
            f"Merge local branch failed: {message}", func_name=func_name
        )
        if can_show_error:
            self.main_frame.show_error("Merge Error", f"{message}")
        self._reenable_widgets_after_modal()
    return trigger_refreshes, sync_refresh
# --- Handlers for Authentication and Remote Config ---
# (The _handle_... methods are unchanged from the previous version.)
def _handle_check_connection_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a remote connection/authentication check.

    'success' marks the auth status as ok. 'auth_required' marks it as
    required and, if a repository path is known and the user agrees,
    starts an interactive authentication attempt as a new async task.
    'error' maps the result to one of the known error types, updates the
    status labels and shows an error dialog.

    Returns:
        Tuple[bool, bool]: (False, sync_refresh); the second element is
        True only after a successful check.
    """
    func_name: str = "_handle_check_connection_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")
    payload: Optional[str] = result_data.get("result")
    remote_name: str = context.get("remote_name_checked", "unknown remote")
    checked_path: Optional[str] = context.get("repo_path_checked")
    sync_refresh: bool = False

    if outcome == "success":
        log_handler.log_info(
            f"Connection check successful for '{remote_name}'.", func_name=func_name
        )
        self.app._update_gui_auth_status("ok")
        sync_refresh = True
    elif outcome == "auth_required":
        log_handler.log_warning(
            f"Authentication required for remote '{remote_name}'.",
            func_name=func_name,
        )
        self.app._update_gui_auth_status("required")

        # Ask only when the attempt could actually be started.
        user_agreed = False
        if checked_path and hasattr(self.main_frame, "ask_yes_no"):
            user_agreed = self.main_frame.ask_yes_no(
                "Authentication Required",
                f"Authentication is required to connect to remote '{remote_name}'.\n\nDo you want to attempt authentication now?\n(This may open a separate terminal window for credential input.)",
            )

        if not user_agreed:
            log_handler.log_info(
                "User declined interactive authentication attempt or required data missing.",
                func_name=func_name,
            )
            self._reenable_widgets_after_modal()
        else:
            log_handler.log_info(
                "User requested interactive authentication attempt.",
                func_name=func_name,
            )
            worker_args: tuple = (
                self.app.git_commands,
                checked_path,
                remote_name,
            )
            if not hasattr(self.app, "_start_async_operation"):
                log_handler.log_error(
                    "Cannot start interactive auth: _start_async_operation missing on app.",
                    func_name=func_name,
                )
                self._reenable_widgets_after_modal()
            else:
                # The original context travels along for the follow-up handler.
                self.app._start_async_operation(
                    worker_func=async_workers.run_interactive_auth_attempt_async,
                    args_tuple=worker_args,
                    context_dict={
                        "context": "interactive_auth",
                        "status_msg": f"Attempting interactive auth for '{remote_name}'",
                        "original_context": context,
                    },
                )
    elif outcome == "error":
        known_errors = ("connection_failed", "unknown_error", "worker_exception")
        error_type: str = payload if payload in known_errors else "unknown_error"
        self.app._update_gui_auth_status(error_type)
        if hasattr(self.main_frame, "update_ahead_behind_status"):
            self.main_frame.update_ahead_behind_status(
                status_text="Sync Status: Error"
            )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Connection Error", f"{detail}")
    return False, sync_refresh
def _handle_interactive_auth_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
# ... (Codice invariato)
func_name: str = "_handle_interactive_auth_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
result_value: Optional[str] = result_data.get("result")
original_context: dict = context.get("original_context", {})
remote_name: str = original_context.get("remote_name_checked", "unknown remote")
if status == "success" and result_value == "auth_attempt_success":
log_handler.log_info(
f"Interactive auth attempt for '{remote_name}' successful. Re-checking connection...",
func_name=func_name,
)
if hasattr(self.main_frame, "update_status_bar"):
self.main_frame.update_status_bar(
f"Authentication successful. Checking status..."
)
if hasattr(self.app, "check_connection_auth") and callable(
self.app.check_connection_auth
):
self.app.master.after(100, self.app.check_connection_auth)
else:
log_handler.log_error(
"Cannot re-check connection: check_connection_auth missing on app.",
func_name=func_name,
)
self._reenable_widgets_after_modal()
elif status == "error":
log_handler.log_warning(
f"Interactive auth attempt for '{remote_name}' failed: {message}",
func_name=func_name,
)
self.app._update_gui_auth_status("failed")
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
status_text="Sync Status: Auth Failed"
)
if hasattr(self.main_frame, "show_warning"):
self.main_frame.show_warning(
"Authentication Attempt Failed", f"{message}"
)
self._reenable_widgets_after_modal()
return False, False
def _handle_apply_remote_config_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
# ... (Codice invariato)
func_name: str = "_handle_apply_remote_config_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
f"Apply remote config successful: {message}", func_name=func_name
)
if hasattr(self.app, "check_connection_auth") and callable(
self.app.check_connection_auth
):
log_handler.log_debug(
"Scheduling connection check after applying config.",
func_name=func_name,
)
self.app.master.after(50, self.app.check_connection_auth)
else:
log_handler.log_warning(
"Cannot auto-check connection after apply config.",
func_name=func_name,
)
sync_refresh = True
elif status == "error":
log_handler.log_error(
f"Apply remote config failed: {message}", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Apply Config Error", f"Failed:\n{message}")
self.app._update_gui_auth_status("unknown")
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
status_text="Sync Status: Unknown"
)
return False, sync_refresh
# --- Handlers for Remote Git Actions ---
# (The _handle_... methods below are UNCHANGED from the previous revision)
def _handle_fetch_remote_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
# ... (Codice invariato)
func_name: str = "_handle_fetch_remote_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
exception: Optional[Exception] = result_data.get("exception")
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
f"Fetch remote successful: {message}", func_name=func_name
)
trigger_refreshes = True
sync_refresh = True
elif status == "error":
log_handler.log_error(
f"Fetch remote failed: {message}", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Fetch Error", f"Failed:\n{message}")
auth_status: str = self._determine_auth_status_from_error(exception)
self.app._update_gui_auth_status(auth_status)
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
status_text="Sync Status: Error"
)
return trigger_refreshes, sync_refresh
def _handle_pull_remote_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
# ... (Codice invariato)
func_name: str = "_handle_pull_remote_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
exception: Optional[Exception] = result_data.get("exception")
repo_path_conflict: Optional[str] = context.get("repo_path")
remote_name_context: Optional[str] = context.get("remote_name")
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
f"Pull remote successful: {message}", func_name=func_name
)
trigger_refreshes = True
sync_refresh = True
elif status == "conflict":
log_handler.log_error(
f"Pull resulted in conflict: {message}", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error(
"Merge Conflict",
f"Merge conflict occurred during pull from '{remote_name_context or 'remote'}'.\n\nPlease resolve conflicts manually in:\n{repo_path_conflict}\n\nAfter resolving, stage and commit.",
)
trigger_refreshes = True
sync_refresh = False
self._reenable_widgets_after_modal()
elif status == "error":
log_handler.log_error(f"Pull remote failed: {message}", func_name=func_name)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Pull Error", f"Failed:\n{message}")
auth_status: str = self._determine_auth_status_from_error(exception)
self.app._update_gui_auth_status(auth_status)
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
status_text="Sync Status: Error"
)
return trigger_refreshes, sync_refresh
def _handle_push_remote_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
# ... (Codice invariato)
func_name: str = "_handle_push_remote_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
exception: Optional[Exception] = result_data.get("exception")
rejected_branch: Optional[str] = result_data.get("branch_name")
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
f"Push remote successful: {message}", func_name=func_name
)
trigger_refreshes = False
sync_refresh = True
elif status == "rejected":
log_handler.log_error(
f"Push rejected for branch '{rejected_branch}': {message}",
func_name=func_name,
)
if hasattr(self.main_frame, "show_warning"):
self.main_frame.show_warning("Push Rejected", f"{message}")
if hasattr(self.app, "fetch_remote") and callable(self.app.fetch_remote):
log_handler.log_debug(
"Scheduling fetch after push rejection.", func_name=func_name
)
self.app.master.after(100, self.app.fetch_remote)
else:
trigger_refreshes = True
sync_refresh = True
elif status == "error":
log_handler.log_error(f"Push remote failed: {message}", func_name=func_name)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Push Error", f"Failed:\n{message}")
auth_status: str = self._determine_auth_status_from_error(exception)
self.app._update_gui_auth_status(auth_status)
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
status_text="Sync Status: Error"
)
return trigger_refreshes, sync_refresh
def _handle_push_tags_remote_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
# ... (Codice invariato)
func_name: str = "_handle_push_tags_remote_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
exception: Optional[Exception] = result_data.get("exception")
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
f"Push tags successful: {message}", func_name=func_name
)
trigger_refreshes = True
elif status == "error":
log_handler.log_error(f"Push tags failed: {message}", func_name=func_name)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Push Tags Error", f"Failed:\n{message}")
auth_status: str = self._determine_auth_status_from_error(exception)
self.app._update_gui_auth_status(auth_status)
return trigger_refreshes, sync_refresh
def _handle_clone_remote_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
# ... (Codice invariato)
func_name: str = "_handle_clone_remote_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
success_data: Optional[dict] = context.get("clone_success_data")
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
f"Clone successful. Creating profile...", func_name=func_name
)
if success_data and isinstance(success_data, dict):
new_profile_name: Optional[str] = success_data.get("profile_name")
cloned_repo_path: Optional[str] = success_data.get("cloned_path")
cloned_remote_url: Optional[str] = success_data.get("remote_url")
if new_profile_name and cloned_repo_path and cloned_remote_url:
try:
defaults: dict = (
self.app.config_manager._get_expected_keys_with_defaults()
)
defaults["svn_working_copy_path"] = cloned_repo_path
defaults["remote_url"] = cloned_remote_url
defaults["remote_name"] = DEFAULT_REMOTE_NAME
defaults["bundle_name"] = f"{new_profile_name}.bundle"
defaults["bundle_name_updated"] = (
f"{new_profile_name}_update.bundle"
)
defaults["autobackup"] = "False"
defaults["autocommit"] = "False"
defaults["commit_message"] = ""
self.app.config_manager.add_section(new_profile_name)
for key, value in defaults.items():
self.app.config_manager.set_profile_option(
new_profile_name, key, value
)
self.app.config_manager.save_config()
log_handler.log_info(
f"Profile '{new_profile_name}' created successfully.",
func_name=func_name,
)
sections: list = self.app.config_manager.get_profile_sections()
if hasattr(
self.main_frame, "update_profile_dropdown"
) and hasattr(self.main_frame, "profile_var"):
self.main_frame.update_profile_dropdown(sections)
self.main_frame.profile_var.set(new_profile_name)
except Exception as profile_e:
log_handler.log_exception(
f"Clone ok, but failed to create profile '{new_profile_name}': {profile_e}",
func_name=func_name,
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error(
"Profile Creation Error",
f"Clone ok, failed create profile:\n{profile_e}",
)
self._reenable_widgets_after_modal()
else:
log_handler.log_error(
"Clone ok, but missing data for profile.", func_name=func_name
)
if hasattr(self.main_frame, "update_status_bar"):
self.main_frame.update_status_bar(
"Clone done, profile data missing."
)
self._reenable_widgets_after_modal()
else:
log_handler.log_error(
"Clone ok, but success data missing/invalid.", func_name=func_name
)
if hasattr(self.main_frame, "update_status_bar"):
self.main_frame.update_status_bar("Clone done, profile data error.")
self._reenable_widgets_after_modal()
elif status == "error":
log_handler.log_error(
f"Clone operation failed: {message}", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Clone Error", f"{message}")
self._reenable_widgets_after_modal()
return trigger_refreshes, sync_refresh
def _handle_checkout_tracking_branch_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
# ... (Codice invariato)
func_name: str = "_handle_checkout_tracking_branch_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
f"Successfully checked out tracking branch: {message}",
func_name=func_name,
)
trigger_refreshes = True
sync_refresh = True
elif status == "error":
log_handler.log_error(
f"Checkout tracking branch failed: {message}", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Checkout Error", f"{message}")
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
status_text="Sync Status: Error"
)
self._reenable_widgets_after_modal()
return trigger_refreshes, sync_refresh
# --- Specific Handler, Relocated Here (Compare Branches) ---
def _handle_compare_branches_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
# ... (Codice invariato)
func_name: str = "_handle_compare_branches_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
result_value: Any = result_data.get("result")
if status == "success":
log_handler.log_info(
"Branch comparison successful. Requesting summary window display...",
func_name=func_name,
)
ref1_ctx: Optional[str] = context.get("ref1")
ref2_ctx: Optional[str] = context.get("ref2")
repo_path_ctx: Optional[str] = context.get("repo_path")
changed_files_list: list = (
result_value if isinstance(result_value, list) else []
)
if ref1_ctx and ref2_ctx and repo_path_ctx:
if hasattr(self.app, "show_comparison_summary") and callable(
self.app.show_comparison_summary
):
self.app.show_comparison_summary(
ref1=ref1_ctx,
ref2=ref2_ctx,
repo_path=repo_path_ctx,
changed_files=changed_files_list,
)
else:
log_handler.log_error(
"Cannot display comparison summary: 'show_comparison_summary' method missing on app.",
func_name=func_name,
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error(
"Internal Error", "Cannot display comparison results."
)
self._reenable_widgets_after_modal()
else:
log_handler.log_error(
"Missing context data (ref1, ref2, repo_path) for comparison summary.",
func_name=func_name,
)
if hasattr(self.main_frame, "update_status_bar"):
self.main_frame.update_status_bar(
"Error: Internal data missing for comparison."
)
self._reenable_widgets_after_modal()
elif status == "error":
log_handler.log_error(
f"Branch comparison failed: {message}", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error(
"Comparison Error", f"Could not compare branches:\n{message}"
) # Widgets already re-enabled
return False, False
# --- Generic Handler ---
def _handle_generic_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Fallback handler for tasks that have no dedicated result handler.

    Shows an error dialog for status "error" (including the exception type
    and text when one is supplied) and a warning dialog for status
    "warning"; every other status is only logged at debug level.

    Args:
        result_data: Worker result; the "status", "message" and "exception"
            keys are read. A missing, None or empty "message" falls back to
            "Operation finished.".
        context: Task context; "context" names the task and is used for the
            log text and the dialog title. A missing, None or empty value
            falls back to "unknown".

    Returns:
        Tuple[bool, bool]: Always (False, False).
    """
    func_name: str = "_handle_generic_result"
    status: Optional[str] = result_data.get("status")
    # dict.get() only applies its default when the key is absent; a key that
    # is present with a None value would otherwise leak None into the dialogs.
    message: str = result_data.get("message") or "Operation finished."
    exception: Optional[Exception] = result_data.get("exception")
    # str() keeps the .replace() below safe for non-string context values.
    task_context: str = str(context.get("context") or "unknown")
    log_handler.log_debug(
        f"Handling generic result for '{task_context}' with status '{status}'.",
        func_name=func_name,
    )
    if status == "error":
        log_handler.log_error(
            f"Error reported for task '{task_context}': {message}",
            func_name=func_name,
        )
        error_details: str = (
            f"{message}\n({type(exception).__name__}: {exception})"
            if exception
            else message
        )
        if hasattr(self.main_frame, "show_error"):
            error_title: str = f"Error: {task_context.replace('_',' ').title()}"
            self.main_frame.show_error(error_title, error_details)
    elif status == "warning":
        if hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning("Operation Info", message)
    return False, False
# --- Helper Methods ---
# (_determine_auth_status_from_error, _reenable_widgets_after_modal and
# _trigger_post_action_refreshes are UNCHANGED from the previous revision)
def _determine_auth_status_from_error(self, exception: Optional[Exception]) -> str:
    """
    Maps a failed git command to an auth-status keyword.

    Only a GitCommandError with non-empty stderr is inspected; its
    lower-cased stderr is searched for known substrings, authentication
    markers first.

    Args:
        exception: The exception reported by the worker, if any.

    Returns:
        str: "failed" for an authentication problem, "connection_failed"
        for a connectivity problem, "unknown_error" otherwise.
    """
    fallback: str = "unknown_error"
    if not isinstance(exception, GitCommandError):
        return fallback
    if not exception.stderr:
        return fallback

    haystack: str = exception.stderr.lower()
    # Order matters: authentication markers win over connection markers.
    verdicts: List[Tuple[str, List[str]]] = [
        (
            "failed",
            [
                "authentication failed",
                "permission denied",
                "could not read",
            ],
        ),
        (
            "connection_failed",
            [
                "repository not found",
                "could not resolve host",
                "failed to connect",
                "network is unreachable",
                "timed out",
                "unable to access",
            ],
        ),
    ]
    for verdict, markers in verdicts:
        for marker in markers:
            if marker in haystack:
                return verdict
    return fallback
def _reenable_widgets_after_modal(self):
# ... (Codice invariato)
if hasattr(self.app, "master") and self.app.master.winfo_exists():
self.app.master.after(
50, self.app._reenable_widgets_if_ready
) # Usa self.app._reenable_widgets_if_ready
def _trigger_post_action_refreshes(self, sync_refresh_needed: bool):
    """
    Schedules the standard GUI refreshes after an action has completed.

    The five refresh callbacks of the app (commit history, branches, tags,
    changed files, remote branches) are queued on the master window with
    staggered delays (starting at 50 ms, 75 ms apart), optionally followed
    by the remote status refresh, and finally by a callback that sets the
    action widgets back to tk.NORMAL.

    Nothing is scheduled when the master window is gone; when the repository
    path is unavailable or not ready only the widget re-enable is requested.

    Args:
        sync_refresh_needed: Whether `refresh_remote_status` should be
            scheduled after the standard refreshes.
    """
    func_name: str = "_trigger_post_action_refreshes"

    window_alive = hasattr(self.app, "master") and self.app.master.winfo_exists()
    if not window_alive:
        log_handler.log_warning(
            "Cannot trigger refreshes: Master window closed.", func_name=func_name
        )
        return

    repo_dir: Optional[str] = self.app._get_and_validate_svn_path(
        "Post-Action Refresh Trigger"
    )
    if not repo_dir or not self.app._is_repo_ready(repo_dir):
        log_handler.log_warning(
            "Cannot trigger refreshes: Repo path unavailable or not ready.",
            func_name=func_name,
        )
        self._reenable_widgets_after_modal()
        return

    pending: List[Callable] = [
        self.app.refresh_commit_history,
        self.app.refresh_branch_list,
        self.app.refresh_tag_list,
        self.app.refresh_changed_files_list,
        self.app.refresh_remote_branches,
    ]
    log_handler.log_debug(
        f"Scheduling {len(pending)} standard refreshes. Sync refresh needed: {sync_refresh_needed}",
        func_name=func_name,
    )

    root = self.app.master
    step_ms: int = 75
    delay_ms: int = 50
    for refresher in pending:
        try:
            # Bind the callback as a default argument so each scheduled call
            # runs its own refresher, not the last one of the loop.
            root.after(delay_ms, lambda target=refresher: target())
            log_handler.log_debug(
                f"Scheduled {getattr(refresher, '__name__', 'unknown_refresh')} with delay {delay_ms}ms.",
                func_name=func_name,
            )
            delay_ms += step_ms
        except Exception as ref_e:
            log_handler.log_error(
                f"Error scheduling {getattr(refresher, '__name__', 'refresh function')}: {ref_e}",
                func_name=func_name,
            )

    if sync_refresh_needed:
        remote_status = getattr(self.app, "refresh_remote_status", None)
        if callable(remote_status):
            log_handler.log_debug(
                f"Scheduling remote status refresh with delay {delay_ms}ms.",
                func_name=func_name,
            )
            root.after(delay_ms, remote_status)
        else:
            log_handler.log_warning(
                "Sync refresh needed but refresh_remote_status method not found on app.",
                func_name=func_name,
            )
        # NOTE(review): assumed to apply whenever a sync refresh was requested,
        # not only in the fallback above - confirm against the original layout.
        delay_ms += step_ms

    def _reenable_widgets_final():
        # Runs last; skipped when the main frame no longer exists.
        frame = self.main_frame
        if hasattr(frame, "winfo_exists") and frame.winfo_exists():
            log_handler.log_debug(
                "Re-enabling widgets after post-action refreshes scheduled.",
                func_name="_reenable_widgets_final",
            )
            frame.set_action_widgets_state(tk.NORMAL)

    root.after(delay_ms + 50, _reenable_widgets_final)
def _handle_revert_to_tag_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
"""
Handles the result of the 'revert to tag' operation.
Shows success/error message and triggers a full GUI refresh on success.
"""
func_name: str = "_handle_revert_to_tag_result"
status: Optional[str] = result_data.get("status")
message: Optional[str] = result_data.get("message")
tag_name_ctx: Optional[str] = context.get("tag_name")
trigger_refreshes: bool = False
sync_refresh: bool = False
if status == "success":
log_handler.log_info(
f"Successfully reverted repository to tag '{tag_name_ctx}'. Triggering full refresh.",
func_name=func_name,
)
# Mostra un messaggio di informazione all'utente
if hasattr(self.main_frame, "show_info"):
self.main_frame.show_info("Operation Successful", message)
# Segnala la necessità di un refresh completo della GUI
trigger_refreshes = True
sync_refresh = True
elif status == "error":
log_handler.log_error(
f"Revert to tag '{tag_name_ctx}' failed: {message}", func_name=func_name
)
# Mostra un messaggio di errore all'utente
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Revert to Tag Error", f"Failed:\n{message}")
# Non è necessario chiamare _reenable_widgets_after_modal() qui,
# perché o il refresh lo farà, o il gestore generico lo farà se non ci sono refresh.
return trigger_refreshes, sync_refresh
def _handle_analyze_history_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
"""
Handles the result of the history analysis. Opens a confirmation dialog if
purgeable files are found.
"""
func_name = "_handle_analyze_history_result"
status = result_data.get("status")
message = result_data.get("message")
purgeable_files = result_data.get("result", [])
repo_path = context.get("repo_path")
if status == "success":
if purgeable_files:
log_handler.log_info(
"Analysis successful. Found files to purge. Showing confirmation dialog.",
func_name=func_name
)
# Chiamata al metodo di app.py per mostrare il dialogo
# Questo metodo gestirà il flusso successivo (conferma/annulla)
# e riabiliterà i widget se necessario.
self.app.show_purge_confirmation_and_purge(repo_path, purgeable_files)
else:
log_handler.log_info(
"Analysis successful. No purgeable files found.",
func_name=func_name
)
self.main_frame.show_info("Analysis Complete", message)
# Riabilita i widget perché non c'è nessuna azione successiva
self._reenable_widgets_after_modal()
elif status == "error":
log_handler.log_error(
f"History analysis failed: {message}",
func_name=func_name
)
self.main_frame.show_error("Analysis Error", f"Failed to analyze repository history:\n{message}")
# Riabilita i widget perché l'operazione è fallita
self._reenable_widgets_after_modal()
# Questo handler non innesca un refresh automatico; il flusso è gestito dal dialogo.
return False, False
def _handle_purge_history_result(
self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
"""
Handles the result of the history purge operation. Shows a message and
triggers a full GUI refresh on success.
"""
func_name = "_handle_purge_history_result"
status = result_data.get("status")
message = result_data.get("message")
trigger_refreshes = False
sync_refresh = False
if status == "success":
log_handler.log_info(
f"History purge successful. Message: {message}",
func_name=func_name
)
self.main_frame.show_info("Purge Successful", message)
# È FONDAMENTALE fare un refresh completo dopo la riscrittura della storia
trigger_refreshes = True
sync_refresh = True
elif status == "error":
log_handler.log_error(
f"History purge failed: {message}",
func_name=func_name
)
self.main_frame.show_error("Purge Failed", f"The history cleaning process failed:\n\n{message}")
# Riabilita i widget dato che l'operazione è fallita
self._reenable_widgets_after_modal()
return trigger_refreshes, sync_refresh
# --- End of AsyncResultHandler Class ---