1476 lines
67 KiB
Python
1476 lines
67 KiB
Python
# --- FILE: gitsync_tool/async_tasks/async_result_handler.py ---
|
|
|
|
import tkinter as tk
|
|
from tkinter import messagebox # Importa messagebox
|
|
import queue
|
|
import os
|
|
import logging # Per livelli
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Dict,
|
|
Any,
|
|
List,
|
|
Tuple,
|
|
Callable,
|
|
Optional,
|
|
Set,
|
|
) # Aggiunto Set
|
|
|
|
# ---<<< MODIFICA IMPORT >>>---
|
|
# Use absolute paths to import modules from the package
|
|
from gitutility.logging_setup import log_handler
|
|
from gitutility.commands.git_commands import GitCommandError
|
|
|
|
# Import async_workers (needed to schedule the interactive task)
|
|
from gitutility.async_tasks import async_workers
|
|
|
|
# Import configuration constants
|
|
from gitutility.config.config_manager import DEFAULT_REMOTE_NAME
|
|
|
|
# Forward references for type hinting (paths updated)
|
|
if TYPE_CHECKING:
|
|
from gitutility.app import GitSvnSyncApp
|
|
|
|
# Import MainFrame from its new path
|
|
from gitutility.gui.main_frame import MainFrame
|
|
# ---<<< FINE MODIFICA IMPORT >>>---
|
|
|
|
|
|
class AsyncResultHandler:
|
|
"""
|
|
Handles processing the results received from asynchronous worker functions.
|
|
Updates the GUI based on the outcome of background tasks (e.g., populating
|
|
lists, showing messages) and triggers necessary follow-up actions like
|
|
GUI refreshes or subsequent asynchronous tasks.
|
|
Separates result processing logic from the main application controller.
|
|
"""
|
|
|
|
def __init__(self, app: "GitSvnSyncApp"):
    """
    Bind the handler to the application and to its main GUI frame.

    Args:
        app (GitSvnSyncApp): The main application instance. Its
            ``main_frame`` attribute must already be set and truthy.

    Raises:
        ValueError: If ``app`` is None, or if ``app`` has no truthy
            ``main_frame`` attribute.
    """
    if app is None:
        raise ValueError("AsyncResultHandler requires a valid 'app' instance.")

    # A missing attribute and a falsy attribute are rejected the same way.
    frame = getattr(app, "main_frame", None)
    if not frame:
        raise ValueError(
            "AsyncResultHandler requires the app's main_frame to be initialized before instantiation."
        )

    # Keep both references so handlers can reach the app and the GUI.
    self.app: "GitSvnSyncApp" = app
    self.main_frame: "MainFrame" = frame

    log_handler.log_debug("AsyncResultHandler initialized.", func_name="__init__")
|
|
|
|
def process(self, result_data: Dict[str, Any], context: Dict[str, Any]) -> None:
    """
    Dispatch an asynchronous worker result to its handler method.

    The handler is selected by ``context["context"]``. A context without
    a mapped handler is passed to ``_handle_generic_result``. After the
    handler returns, ``_trigger_post_action_refreshes`` is called when
    the handler asked for a refresh.

    Args:
        result_data (Dict[str, Any]): Dictionary returned by the worker.
            Only ``'status'`` is read here (defaulting to ``'error'``);
            the whole dictionary is forwarded to the handler.
        context (Dict[str, Any]): Context dictionary supplied when the
            asynchronous operation was started. ``'context'`` selects
            the handler (defaulting to ``'unknown'``).
    """
    task_context: str = context.get("context", "unknown")
    status: str = result_data.get("status", "error")
    func_name: str = f"process (ctx: {task_context})"  # Used in log records

    log_handler.log_debug(
        f"Processing async result for context '{task_context}' with status '{status}'.",
        func_name=func_name,
    )

    # Context string -> bound handler. Each handler is expected to return
    # (should_trigger_refreshes, post_action_sync_refresh_needed).
    dispatch_table: Dict[str, Callable] = {
        "compare_branches": self._handle_compare_branches_result,
        "refresh_tags": self._handle_refresh_tags_result,
        "refresh_branches": self._handle_refresh_local_branches_result,
        "refresh_history": self._handle_refresh_history_result,
        "refresh_changes": self._handle_refresh_changes_result,
        "refresh_remote_branches": self._handle_refresh_remote_branches_result,
        "get_ahead_behind": self._handle_get_ahead_behind_result,
        "prepare_repo": self._handle_prepare_repo_result,
        "create_bundle": self._handle_create_bundle_result,
        "fetch_bundle": self._handle_fetch_bundle_result,
        "manual_backup": self._handle_manual_backup_result,
        "commit": self._handle_commit_result,
        # Context used for the untrack step that follows a .gitignore save
        "_handle_gitignore_save": self._handle_untrack_result,
        "add_file": self._handle_add_file_result,
        "create_tag": self._handle_create_tag_result,
        "checkout_tag": self._handle_checkout_tag_result,
        "create_branch": self._handle_create_branch_result,
        "checkout_branch": self._handle_checkout_branch_result,
        "delete_local_branch": self._handle_delete_local_branch_result,
        "merge_local_branch": self._handle_merge_local_branch_result,
        "check_connection": self._handle_check_connection_result,
        "interactive_auth": self._handle_interactive_auth_result,
        "apply_remote_config": self._handle_apply_remote_config_result,
        "fetch_remote": self._handle_fetch_remote_result,
        "pull_remote": self._handle_pull_remote_result,
        "push_remote": self._handle_push_remote_result,
        "push_tags_remote": self._handle_push_tags_remote_result,
        "clone_remote": self._handle_clone_remote_result,
        "checkout_tracking_branch": self._handle_checkout_tracking_branch_result,
        "get_commit_details": self._handle_get_commit_details_result,
        "update_wiki": self._handle_generic_result,
    }
    handler: Optional[Callable] = dispatch_table.get(task_context)

    # Both flags stay False unless a handler explicitly asks for a refresh.
    wants_refresh: bool = False
    wants_sync_refresh: bool = False

    if not (handler and callable(handler)):
        log_handler.log_warning(
            f"No specific handler method found for context: '{task_context}'. Using generic handler.",
            func_name=func_name,
        )
        wants_refresh, wants_sync_refresh = self._handle_generic_result(
            result_data, context
        )
    else:
        log_handler.log_debug(
            f"Dispatching to handler: {handler.__name__}",
            func_name=func_name,
        )
        outcome: Any = handler(result_data, context)
        if isinstance(outcome, tuple) and len(outcome) == 2:
            wants_refresh, wants_sync_refresh = outcome
        else:
            # Malformed return value: keep the "no refresh" defaults.
            log_handler.log_warning(
                f"Handler '{handler.__name__}' for context '{task_context}' did not return the expected refresh flags tuple (bool, bool). Defaulting to no refresh.",
                func_name=func_name,
            )

    # Refreshes run only after the specific result has been processed.
    if not wants_refresh:
        log_handler.log_debug(
            f"No standard post-action refreshes required for context '{task_context}'.",
            func_name=func_name,
        )
        return

    log_handler.log_debug(
        f"Triggering post-action refreshes for context '{task_context}'. Sync refresh needed: {wants_sync_refresh}",
        func_name=func_name,
    )
    self._trigger_post_action_refreshes(wants_sync_refresh)
|
|
|
|
# --- Handler Methods for Refresh Tasks ---
|
|
# (_handle_refresh_... and _handle_get_ahead_behind methods - UNCHANGED from the previous version)
|
|
# Make sure they call the correct methods on self.main_frame and self.app
|
|
def _handle_refresh_tags_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Push the result of a tag refresh into the GUI tag list.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'result'`` are read. A non-list ``'result'`` is shown as
            an empty list.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: Always ``(False, False)``; this handler never
        requests follow-up refreshes.
    """
    func_name: str = "_handle_refresh_tags_result"
    frame = self.main_frame

    if not hasattr(frame, "update_tag_list"):
        log_handler.log_error(
            "MainFrame missing 'update_tag_list'.", func_name=func_name
        )
        return False, False

    outcome: Optional[str] = result_data.get("status")
    if outcome == "success":
        payload: Any = result_data.get("result")
        tags: list = payload if isinstance(payload, list) else []
        frame.update_tag_list(tags)
        log_handler.log_debug(
            f"Tag list updated ({len(tags)} items).", func_name=func_name
        )
    elif outcome == "error":
        log_handler.log_error(
            "Error reported during tag refresh.", func_name=func_name
        )
        # Placeholder entry so the list visibly reflects the failure.
        frame.update_tag_list([("(Error)", "")])

    return False, False
|
|
|
|
def _handle_refresh_local_branches_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Update the local branch widgets and schedule a remote status refresh.

    On success the ``'result'`` is expected to be a 2-tuple
    ``(branches, current_branch)``; the current branch is also stored
    on ``self.app.current_local_branch``. Invalid data or an error
    status shows a placeholder entry and clears the stored branch.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'result'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: Always ``(False, False)``.
    """
    func_name: str = "_handle_refresh_local_branches_result"
    frame = self.main_frame

    if not (
        hasattr(frame, "update_branch_list")
        and hasattr(frame, "update_history_branch_filter")
    ):
        log_handler.log_error(
            "MainFrame missing update methods for local branches.",
            func_name=func_name,
        )
        return False, False

    outcome: Optional[str] = result_data.get("status")
    payload: Any = result_data.get("result")

    branch_names: List[str] = []
    active_branch: Optional[str] = None

    if outcome == "success":
        if isinstance(payload, tuple) and len(payload) == 2:
            branch_names, active_branch = payload
            self.app.current_local_branch = active_branch
            log_handler.log_debug(
                f"Local branches updated. Current: {active_branch}",
                func_name=func_name,
            )
        else:
            log_handler.log_warning(
                "Invalid result format for local branches refresh.",
                func_name=func_name,
            )
            branch_names = ["(Invalid Data)"]
            self.app.current_local_branch = None
    elif outcome == "error":
        log_handler.log_error(
            "Error reported during local branches refresh.", func_name=func_name
        )
        branch_names = ["(Error)"]
        self.app.current_local_branch = None

    # The widgets are refreshed for every status, including unknown ones.
    frame.update_branch_list(branch_names, active_branch)
    frame.update_history_branch_filter(branch_names)

    refresh_remote = getattr(self.app, "refresh_remote_status", None)
    if callable(refresh_remote):
        log_handler.log_debug(
            "Scheduling remote status refresh after local branches update.",
            func_name=func_name,
        )
        self.app.master.after(100, refresh_remote)
    else:
        log_handler.log_warning(
            "Could not schedule sync status refresh: method missing on app.",
            func_name=func_name,
        )

    return False, False
|
|
|
|
def _handle_refresh_history_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Push the result of a history refresh into the GUI history display.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'result'`` are read. A non-list ``'result'`` is shown as
            an empty list.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: Always ``(False, False)``.
    """
    func_name: str = "_handle_refresh_history_result"
    frame = self.main_frame

    if not hasattr(frame, "update_history_display"):
        log_handler.log_error(
            "MainFrame missing 'update_history_display'.", func_name=func_name
        )
        return False, False

    outcome: Optional[str] = result_data.get("status")
    if outcome == "success":
        payload: Any = result_data.get("result")
        history_lines: list = payload if isinstance(payload, list) else []
        frame.update_history_display(history_lines)
        log_handler.log_debug(
            f"History display updated ({len(history_lines)} lines).",
            func_name=func_name,
        )
    elif outcome == "error":
        log_handler.log_error(
            "Error reported during history refresh.", func_name=func_name
        )
        frame.update_history_display(["(Error retrieving history)"])

    return False, False
|
|
|
|
def _handle_refresh_changes_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Push the result of a changed-files refresh into the GUI list.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'result'`` are read. A non-list ``'result'`` is shown as
            an empty list.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: Always ``(False, False)``.
    """
    func_name: str = "_handle_refresh_changes_result"
    frame = self.main_frame

    if not hasattr(frame, "update_changed_files_list"):
        log_handler.log_error(
            "MainFrame missing 'update_changed_files_list'.", func_name=func_name
        )
        return False, False

    outcome: Optional[str] = result_data.get("status")
    if outcome == "success":
        payload: Any = result_data.get("result")
        changed_files: list = payload if isinstance(payload, list) else []
        frame.update_changed_files_list(changed_files)
        log_handler.log_debug(
            f"Changed files list updated ({len(changed_files)} items).",
            func_name=func_name,
        )
    elif outcome == "error":
        log_handler.log_error(
            "Error reported during changes refresh.", func_name=func_name
        )
        frame.update_changed_files_list(["(Error refreshing changes)"])

    return False, False
|
|
|
|
def _handle_refresh_remote_branches_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Push the result of a remote-branches refresh into the GUI list.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'result'`` are read. On success a non-list ``'result'``
            is shown as ``["(Invalid Data)"]``.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: Always ``(False, False)``.
    """
    func_name: str = "_handle_refresh_remote_branches_result"
    frame = self.main_frame

    if not hasattr(frame, "update_remote_branches_list"):
        log_handler.log_error(
            "MainFrame missing 'update_remote_branches_list'.", func_name=func_name
        )
        return False, False

    outcome: Optional[str] = result_data.get("status")
    if outcome == "success":
        payload: Any = result_data.get("result")
        remote_branches: list = (
            payload if isinstance(payload, list) else ["(Invalid Data)"]
        )
        frame.update_remote_branches_list(remote_branches)
        log_handler.log_debug(
            f"Remote branches list updated ({len(remote_branches)} items).",
            func_name=func_name,
        )
    elif outcome == "error":
        log_handler.log_error(
            "Error reported during remote branches refresh.", func_name=func_name
        )
        frame.update_remote_branches_list(["(Error)"])
        # NOTE(review): nesting rebuilt from a flattened source; the sync
        # status reset is assumed to belong to the error branch - confirm.
        if hasattr(frame, "update_ahead_behind_status"):
            frame.update_ahead_behind_status(status_text="Sync Status: Unknown")

    return False, False
|
|
|
|
def _handle_get_ahead_behind_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Show the ahead/behind counts for a local branch in the GUI.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'``,
            ``'result'`` and ``'message'`` are read. On success
            ``'result'`` is expected to be a 2-tuple ``(ahead, behind)``;
            any other value is reported as ``(None, None)``.
        context (Dict[str, Any]): Task context; ``'local_branch'`` names
            the branch the counts refer to.

    Returns:
        Tuple[bool, bool]: Always ``(False, False)``.
    """
    func_name: str = "_handle_get_ahead_behind_result"
    status: Optional[str] = result_data.get("status")
    result_value: Any = result_data.get("result")
    message: Optional[str] = result_data.get("message")
    local_branch_ctx: Optional[str] = context.get("local_branch")

    if not hasattr(self.main_frame, "update_ahead_behind_status"):
        log_handler.log_error(
            "MainFrame missing 'update_ahead_behind_status'.", func_name=func_name
        )
        return False, False

    if status == "success":
        # Only a 2-tuple can be unpacked safely; a tuple of any other
        # length would raise ValueError, so it is treated as "unknown".
        if isinstance(result_value, tuple) and len(result_value) == 2:
            ahead, behind = result_value
        else:
            ahead, behind = None, None
        log_handler.log_info(
            f"Ahead/Behind status for '{local_branch_ctx}': Ahead={ahead}, Behind={behind}",
            func_name=func_name,
        )
        self.main_frame.update_ahead_behind_status(
            current_branch=local_branch_ctx, ahead=ahead, behind=behind
        )
    elif status == "error":
        log_handler.log_error(
            f"Failed getting ahead/behind for '{local_branch_ctx}': {message}",
            func_name=func_name,
        )
        error_text: str = (
            "Sync Status: Error"
            if not message
            else f"Sync Status: Error ({message})"
        )
        self.main_frame.update_ahead_behind_status(
            current_branch=local_branch_ctx, status_text=error_text
        )

    return False, False
|
|
|
|
# --- Handler Methods for Local Git Actions ---
|
|
# (_handle_... methods UNCHANGED from the previous version)
|
|
# Make sure they call the correct methods on self.main_frame and self.app
|
|
def _handle_prepare_repo_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of the repository preparation task.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'message'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: ``(True, True)`` for a ``'success'`` or
        ``'warning'`` status, ``(False, False)`` otherwise.
    """
    func_name: str = "_handle_prepare_repo_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(
            "Repository preparation successful.", func_name=func_name
        )
        return True, True

    if outcome == "warning":
        log_handler.log_warning(
            f"Prepare repo warning: {detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_info"):
            self.main_frame.show_info("Prepare Info", detail)
        return True, True

    if outcome == "error":
        log_handler.log_error(
            f"Repository preparation failed: {detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Prepare Error", f"Failed:\n{detail}")
        # NOTE(review): nesting rebuilt from a flattened source; the status
        # indicator update is assumed to belong to the error branch - confirm.
        if hasattr(self.app, "update_svn_status_indicator") and hasattr(
            self.app.main_frame, "svn_path_entry"
        ):
            # The entry is read when the callback fires, not when scheduled.
            self.app.master.after(
                50,
                lambda: self.app.update_svn_status_indicator(
                    self.app.main_frame.svn_path_entry.get()
                ),
            )

    return False, False
|
|
|
|
def _handle_create_bundle_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a bundle creation task.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'``,
            ``'message'`` and ``'committed'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: ``(True, True)`` when the status is
        ``'success'`` and ``'committed'`` is truthy, ``(False, False)``
        otherwise.
    """
    func_name: str = "_handle_create_bundle_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")
    refresh_needed: bool = False

    if outcome == "success":
        log_handler.log_info(
            f"Create bundle finished: {detail}", func_name=func_name
        )
        # A refresh is requested only when 'committed' is truthy.
        refresh_needed = bool(result_data.get("committed", False))
    elif outcome == "error":
        log_handler.log_error(
            f"Create bundle failed: {detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Create Bundle Error", f"Failed:\n{detail}")

    return refresh_needed, refresh_needed
|
|
|
|
def _handle_fetch_bundle_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a fetch-from-bundle task.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'``,
            ``'message'``, ``'conflict'`` and ``'repo_path'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: ``(True, True)`` on success, ``(True, False)``
        when an error carries a conflict flag together with a repository
        path, ``(False, False)`` otherwise.
    """
    func_name: str = "_handle_fetch_bundle_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")
    conflict_flag: bool = result_data.get("conflict", False)
    conflict_repo: Optional[str] = result_data.get("repo_path")

    if outcome == "success":
        log_handler.log_info(
            f"Fetch from bundle successful: {detail}", func_name=func_name
        )
        return True, True

    if outcome != "error":
        return False, False

    log_handler.log_error(
        f"Fetch from bundle failed: {detail}", func_name=func_name
    )

    if conflict_flag and conflict_repo:
        log_handler.log_error(
            "Merge conflict detected during fetch from bundle.",
            func_name=func_name,
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Merge Conflict",
                f"Conflict occurred during bundle fetch.\nResolve in:\n{conflict_repo}\nThen commit.",
            )
        # NOTE(review): nesting rebuilt from a flattened source; the
        # re-enable call is assumed to sit at conflict-branch level - confirm.
        self._reenable_widgets_after_modal()
        return True, False

    if hasattr(self.main_frame, "show_error"):
        self.main_frame.show_error("Fetch Bundle Error", f"Failed:\n{detail}")
    # NOTE(review): assumed to run only for non-conflict errors - confirm.
    if hasattr(self.app, "update_svn_status_indicator") and hasattr(
        self.app.main_frame, "svn_path_entry"
    ):
        # The entry is read when the callback fires, not when scheduled.
        self.app.master.after(
            50,
            lambda: self.app.update_svn_status_indicator(
                self.app.main_frame.svn_path_entry.get()
            ),
        )
    return False, False
|
|
|
|
def _handle_manual_backup_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Log the outcome of a manual backup and report failures to the user.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'message'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: Always ``(False, False)``.
    """
    func_name: str = "_handle_manual_backup_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "error":
        log_handler.log_error(
            f"Manual backup failed: {detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Manual Backup Error", f"Failed:\n{detail}")
    elif outcome == "success":
        # Success is only logged; no dialog is shown.
        log_handler.log_info(
            f"Manual backup finished: {detail}", func_name=func_name
        )

    return False, False
|
|
|
|
def _handle_commit_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a commit task.

    When a commit was actually made, the commit message widget is
    cleared (if the frame offers ``clear_commit_message``) and a full
    refresh is requested.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'``,
            ``'message'`` and ``'committed'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: ``(True, True)`` when the status is
        ``'success'`` and ``'committed'`` is truthy, ``(False, False)``
        otherwise.
    """
    func_name: str = "_handle_commit_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "error":
        log_handler.log_error(f"Commit failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Commit Error", f"Failed:\n{detail}")
        return False, False

    if outcome != "success":
        return False, False

    log_handler.log_info(
        f"Commit operation finished: {detail}", func_name=func_name
    )
    if not result_data.get("committed", False):
        return False, False

    # NOTE(review): nesting rebuilt from a flattened source; the refresh
    # flags are assumed independent of the clear_commit_message guard.
    if hasattr(self.main_frame, "clear_commit_message"):
        self.main_frame.clear_commit_message()
    return True, True
|
|
|
|
def _handle_untrack_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of an untrack operation.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'``,
            ``'message'`` and ``'committed'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: ``(True, True)`` when the status is
        ``'success'`` and ``'committed'`` is truthy, ``(False, False)``
        otherwise.
    """
    func_name: str = "_handle_untrack_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")
    refresh_needed: bool = False

    if outcome == "success":
        log_handler.log_info(
            f"Untrack operation finished: {detail}", func_name=func_name
        )
        # A refresh is requested only when 'committed' is truthy.
        refresh_needed = bool(result_data.get("committed", False))
    elif outcome == "error":
        log_handler.log_error(f"Untracking failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Untrack Error", f"Failed:\n{detail}")

    return refresh_needed, refresh_needed
|
|
|
|
def _handle_get_commit_details_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Hand retrieved commit details to the app for display.

    On success with a dict result, ``self.app.show_commit_details`` is
    called with it. A missing method or a non-dict result is reported
    as an error and the widgets are re-enabled.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'``,
            ``'message'`` and ``'result'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: Always ``(False, False)``.
    """
    func_name: str = "_handle_get_commit_details_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")
    details: Optional[Dict[str, Any]] = result_data.get("result")
    can_show_error: bool = hasattr(self.main_frame, "show_error")

    if outcome == "error":
        log_handler.log_error(
            f"Failed to get commit details: {detail}", func_name=func_name
        )
        if can_show_error:
            self.main_frame.show_error(
                "Commit Details Error", f"Could not get details:\n{detail}"
            )
        return False, False

    if outcome != "success":
        return False, False

    log_handler.log_info(
        "Commit details retrieved successfully. Requesting detail window display...",
        func_name=func_name,
    )

    if not isinstance(details, dict):
        log_handler.log_error(
            f"Received success for commit details, but result is not dict: {type(details)}",
            func_name=func_name,
        )
        if can_show_error:
            self.main_frame.show_error(
                "Data Error", "Failed to process commit details."
            )
        # NOTE(review): nesting rebuilt from a flattened source; the
        # re-enable call is assumed to sit outside the show_error guard.
        self._reenable_widgets_after_modal()
        return False, False

    show_details = getattr(self.app, "show_commit_details", None)
    if callable(show_details):
        show_details(details)
    else:
        log_handler.log_error(
            "Cannot display commit details: 'show_commit_details' method missing on app.",
            func_name=func_name,
        )
        if can_show_error:
            self.main_frame.show_error(
                "Internal Error", "Cannot display commit details window."
            )
        # NOTE(review): same nesting assumption as above - confirm.
        self._reenable_widgets_after_modal()

    return False, False
|
|
|
|
def _handle_add_file_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of an add-file task.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'message'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: ``(True, False)`` on success,
        ``(False, False)`` otherwise.
    """
    func_name: str = "_handle_add_file_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(f"Add file successful: {detail}", func_name=func_name)
        return True, False

    if outcome == "error":
        log_handler.log_error(f"Add file failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Add File Error", f"Failed:\n{detail}")

    return False, False
|
|
|
|
def _handle_create_tag_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a tag creation task.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'message'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: ``(True, False)`` on success,
        ``(False, False)`` otherwise.
    """
    func_name: str = "_handle_create_tag_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(
            f"Tag creation successful: {detail}", func_name=func_name
        )
        return True, False

    if outcome == "error":
        log_handler.log_error(f"Create tag failed: {detail}", func_name=func_name)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Create Tag Error", f"Failed:\n{detail}")

    return False, False
|
|
|
|
def _handle_checkout_tag_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a tag checkout task.

    A failure whose exception text contains ``"Uncommitted changes"``
    is shown as a warning; any other failure is shown as an error.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'``,
            ``'message'`` and ``'exception'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: ``(True, True)`` on success,
        ``(False, False)`` otherwise.
    """
    func_name: str = "_handle_checkout_tag_result"
    status: Optional[str] = result_data.get("status")
    message: Optional[str] = result_data.get("message")
    exception: Optional[Exception] = result_data.get("exception")
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    if status == "success":
        log_handler.log_info(
            f"Checkout tag successful: {message}", func_name=func_name
        )
        trigger_refreshes = True
        sync_refresh = True
    elif status == "error":
        log_handler.log_error(
            f"Checkout tag failed: {message}", func_name=func_name
        )
        blocked_by_changes = bool(
            exception and "Uncommitted changes" in str(exception)
        )
        # Each dialog is guarded by the method actually being called; the
        # previous guard tested 'show_error' before calling 'show_warning'.
        if blocked_by_changes and hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning(
                "Checkout Blocked", f"{message}\nCommit or stash first."
            )
        elif hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Checkout Tag Error", f"Failed:\n{message}"
            )
        # NOTE(review): nesting rebuilt from a flattened source; the
        # re-enable call is assumed to run for every error - confirm.
        self._reenable_widgets_after_modal()

    return trigger_refreshes, sync_refresh
|
|
|
|
def _handle_create_branch_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a branch creation task.

    On success the user is asked whether to switch to the new branch
    (when the branch name is known and the frame offers ``ask_yes_no``).
    If the user accepts and the app offers ``checkout_branch``, the
    checkout is started and no refresh is requested here.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'`` and
            ``'message'`` are read.
        context (Dict[str, Any]): Task context; ``'new_branch_name'``
            names the created branch.

    Returns:
        Tuple[bool, bool]: ``(True, False)`` on success unless a checkout
        was started, ``(False, False)`` otherwise.
    """
    func_name: str = "_handle_create_branch_result"
    outcome: Optional[str] = result_data.get("status")
    detail: Optional[str] = result_data.get("message")
    created_branch: Optional[str] = context.get("new_branch_name")
    refresh_needed: bool = False

    if outcome == "success":
        log_handler.log_info(
            f"Create branch successful: {detail}", func_name=func_name
        )
        # Refresh by default; only a started checkout suppresses it.
        refresh_needed = True
        can_prompt = created_branch and hasattr(self.main_frame, "ask_yes_no")
        if can_prompt and self.main_frame.ask_yes_no(
            "Checkout?", f"Switch to new branch '{created_branch}'?"
        ):
            checkout = getattr(self.app, "checkout_branch", None)
            if callable(checkout):
                checkout(branch_to_checkout=created_branch)
                # The checkout result handler takes care of refreshes.
                refresh_needed = False
            else:
                log_handler.log_error(
                    "Cannot checkout new branch: method missing on app.",
                    func_name=func_name,
                )
    elif outcome == "error":
        log_handler.log_error(
            f"Create branch failed: {detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Create Branch Error", f"Failed:\n{detail}")

    return refresh_needed, False
|
|
|
|
def _handle_checkout_branch_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a branch checkout task.

    A failure whose exception text contains ``"Uncommitted changes"``
    is shown as a warning; any other failure is shown as an error.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'``,
            ``'message'`` and ``'exception'`` are read.
        context (Dict[str, Any]): Task context (not used here).

    Returns:
        Tuple[bool, bool]: ``(True, True)`` on success,
        ``(False, False)`` otherwise.
    """
    func_name: str = "_handle_checkout_branch_result"
    status: Optional[str] = result_data.get("status")
    message: Optional[str] = result_data.get("message")
    exception: Optional[Exception] = result_data.get("exception")
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    if status == "success":
        log_handler.log_info(
            f"Checkout branch successful: {message}", func_name=func_name
        )
        trigger_refreshes = True
        sync_refresh = True
    elif status == "error":
        log_handler.log_error(
            f"Checkout branch failed: {message}", func_name=func_name
        )
        blocked_by_changes = bool(
            exception and "Uncommitted changes" in str(exception)
        )
        # Each dialog is guarded by the method actually being called; the
        # previous guard tested 'show_error' before calling 'show_warning'.
        if blocked_by_changes and hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning(
                "Checkout Blocked", f"{message}\nCommit or stash first."
            )
        elif hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Checkout Branch Error", f"Failed:\n{message}"
            )
        # NOTE(review): nesting rebuilt from a flattened source; the
        # re-enable call is assumed to run for every error - confirm.
        self._reenable_widgets_after_modal()

    return trigger_refreshes, sync_refresh
|
|
|
|
def _handle_delete_local_branch_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Handle the outcome of a local branch deletion task.

    A failure mentioning ``"not fully merged"`` (in the message or in
    the stderr of a ``GitCommandError``) is shown as a warning; any
    other failure is shown as an error.

    Args:
        result_data (Dict[str, Any]): Worker result; ``'status'``,
            ``'message'`` and ``'exception'`` are read.
        context (Dict[str, Any]): Task context; ``'branch_name'`` names
            the branch that was to be deleted.

    Returns:
        Tuple[bool, bool]: ``(True, False)`` on success,
        ``(False, False)`` otherwise.
    """
    func_name: str = "_handle_delete_local_branch_result"
    status: Optional[str] = result_data.get("status")
    message: Optional[str] = result_data.get("message")
    exception: Optional[Exception] = result_data.get("exception")
    deleted_branch_name: Optional[str] = context.get("branch_name")
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    if status == "success":
        log_handler.log_info(
            f"Local branch '{deleted_branch_name}' deleted successfully.",
            func_name=func_name,
        )
        trigger_refreshes = True
    elif status == "error":
        log_handler.log_error(
            f"Delete local branch '{deleted_branch_name}' failed: {message}",
            func_name=func_name,
        )
        # 'message' and 'stderr' may both be None; coerce them to text so
        # the substring tests below cannot raise TypeError.
        message_text = message if isinstance(message, str) else ""
        raw_stderr = (
            getattr(exception, "stderr", "")
            if isinstance(exception, GitCommandError)
            else ""
        )
        stderr_text = str(raw_stderr) if raw_stderr else ""
        not_merged = (
            "not fully merged" in message_text
            or "not fully merged" in stderr_text
        )
        # Each dialog is guarded by the method actually being called.
        if not_merged and hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning("Delete Warning", f"{message}")
        elif hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Delete Error", f"{message}")

    return trigger_refreshes, sync_refresh
|
|
|
|
def _handle_merge_local_branch_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of merging one local branch into another.

    Args:
        result_data: Worker result; "status" and "message" are read.
        context: Task context; "branch_merged_into", "branch_merged_from"
            and "repo_path" are read for log and dialog text.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh).
        "success" yields (True, True), "conflict" yields (True, False),
        every other status yields (False, False).
    """
    func_name: str = "_handle_merge_local_branch_result"
    status: Optional[str] = result_data.get("status")
    message: Optional[str] = result_data.get("message")
    merged_into: Optional[str] = context.get("branch_merged_into")
    merged_from: Optional[str] = context.get("branch_merged_from")
    repo_path_conflict: Optional[str] = context.get("repo_path")
    trigger_refreshes: bool = False
    sync_refresh: bool = False

    # Guard on the dialog method itself (as the sibling handlers do) rather
    # than on the mere presence of the main_frame attribute.
    can_show_error: bool = hasattr(
        getattr(self, "main_frame", None), "show_error"
    )

    if status == "success":
        log_handler.log_info(
            f"Successfully merged '{merged_from}' into '{merged_into}'.",
            func_name=func_name,
        )
        trigger_refreshes = True
        sync_refresh = True
    elif status == "conflict":
        log_handler.log_error(
            f"Merge conflict occurred merging '{merged_from}'. User needs to resolve manually.",
            func_name=func_name,
        )
        if can_show_error:
            self.main_frame.show_error(
                "Merge Conflict",
                f"Merge conflict occurred while merging '{merged_from}' into '{merged_into}'.\n\nPlease resolve the conflicts manually in:\n{repo_path_conflict}\n\nAfter resolving, stage the changes and commit them.",
            )
        # The local lists are still refreshed after a conflict; the remote
        # sync status is not.
        trigger_refreshes = True
        sync_refresh = False
        self._reenable_widgets_after_modal()
    elif status == "error":
        log_handler.log_error(
            f"Merge local branch failed: {message}", func_name=func_name
        )
        if can_show_error:
            self.main_frame.show_error("Merge Error", f"{message}")
        self._reenable_widgets_after_modal()

    return trigger_refreshes, sync_refresh
|
|
|
|
# --- Handlers for Authentication and Remote Config ---
|
|
# (The _handle_... methods below are UNCHANGED from the previous version)
|
|
def _handle_check_connection_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of a remote connection/authentication check.

    On "auth_required" the user is asked whether to attempt interactive
    authentication; if accepted, a follow-up async task is started through
    the application's ``_start_async_operation``.

    Args:
        result_data: Worker result; "status", "message" and "result" are read.
        context: Task context; "remote_name_checked" and "repo_path_checked"
            are read. The whole dict is forwarded as "original_context" to
            the interactive-auth task.

    Returns:
        Tuple[bool, bool]: (trigger_refreshes, sync_refresh). The first flag
        is always False; the second is True only on "success".
    """
    func_name: str = "_handle_check_connection_result"
    status: Optional[str] = result_data.get("status")
    message: Optional[str] = result_data.get("message")
    result_value: Optional[str] = result_data.get("result")
    remote_name: str = context.get("remote_name_checked", "unknown remote")
    repo_path_checked: Optional[str] = context.get("repo_path_checked")
    sync_refresh: bool = False

    if status == "success":
        log_handler.log_info(
            f"Connection check successful for '{remote_name}'.", func_name=func_name
        )
        self.app._update_gui_auth_status("ok")
        sync_refresh = True

    elif status == "auth_required":
        log_handler.log_warning(
            f"Authentication required for remote '{remote_name}'.",
            func_name=func_name,
        )
        self.app._update_gui_auth_status("required")

        # Short-circuits: the dialog is shown only when a repo path is known
        # and the frame can ask the question.
        user_accepted: bool = bool(
            repo_path_checked
            and hasattr(self.main_frame, "ask_yes_no")
            and self.main_frame.ask_yes_no(
                "Authentication Required",
                f"Authentication is required to connect to remote '{remote_name}'.\n\nDo you want to attempt authentication now?\n(This may open a separate terminal window for credential input.)",
            )
        )

        if user_accepted:
            log_handler.log_info(
                "User requested interactive authentication attempt.",
                func_name=func_name,
            )
            args_interactive: tuple = (
                self.app.git_commands,
                repo_path_checked,
                remote_name,
            )
            if hasattr(self.app, "_start_async_operation"):
                self.app._start_async_operation(
                    worker_func=async_workers.run_interactive_auth_attempt_async,
                    args_tuple=args_interactive,
                    context_dict={
                        "context": "interactive_auth",
                        "status_msg": f"Attempting interactive auth for '{remote_name}'",
                        "original_context": context,
                    },
                )
            else:
                log_handler.log_error(
                    "Cannot start interactive auth: _start_async_operation missing on app.",
                    func_name=func_name,
                )
                self._reenable_widgets_after_modal()
        else:
            log_handler.log_info(
                "User declined interactive authentication attempt or required data missing.",
                func_name=func_name,
            )
            self._reenable_widgets_after_modal()

    elif status == "error":
        # Log the failure like every other handler does on its error path.
        log_handler.log_error(
            f"Connection check failed for '{remote_name}': {message}",
            func_name=func_name,
        )
        # Only a known set of error codes is forwarded to the GUI; anything
        # else is reported as "unknown_error".
        known_error_types = ["connection_failed", "unknown_error", "worker_exception"]
        error_type: str = (
            result_value if result_value in known_error_types else "unknown_error"
        )
        self.app._update_gui_auth_status(error_type)
        if hasattr(self.main_frame, "update_ahead_behind_status"):
            self.main_frame.update_ahead_behind_status(
                status_text="Sync Status: Error"
            )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Connection Error", f"{message}")

    return False, sync_refresh
|
|
|
|
def _handle_interactive_auth_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of an interactive authentication attempt.

    A successful attempt schedules a new connection check on the Tk master;
    a failed attempt updates the GUI status indicators and warns the user.

    Args:
        result_data: Worker result; "status", "message" and "result" are read.
        context: Task context; the nested "original_context" dict supplies
            "remote_name_checked" for messages.

    Returns:
        Tuple[bool, bool]: always (False, False).
    """
    caller = "_handle_interactive_auth_result"
    no_refresh = (False, False)
    outcome = result_data.get("status")
    detail = result_data.get("message")
    outcome_code = result_data.get("result")
    remote = context.get("original_context", {}).get(
        "remote_name_checked", "unknown remote"
    )

    if outcome == "success" and outcome_code == "auth_attempt_success":
        log_handler.log_info(
            f"Interactive auth attempt for '{remote}' successful. Re-checking connection...",
            func_name=caller,
        )
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar(
                "Authentication successful. Checking status..."
            )
        recheck = getattr(self.app, "check_connection_auth", None)
        if callable(recheck):
            # Re-run the connection check shortly after this handler returns.
            self.app.master.after(100, recheck)
            return no_refresh
        log_handler.log_error(
            "Cannot re-check connection: check_connection_auth missing on app.",
            func_name=caller,
        )
        self._reenable_widgets_after_modal()
        return no_refresh

    if outcome == "error":
        log_handler.log_warning(
            f"Interactive auth attempt for '{remote}' failed: {detail}",
            func_name=caller,
        )
        self.app._update_gui_auth_status("failed")
        frame = self.main_frame
        if hasattr(frame, "update_ahead_behind_status"):
            frame.update_ahead_behind_status(status_text="Sync Status: Auth Failed")
        if hasattr(frame, "show_warning"):
            frame.show_warning("Authentication Attempt Failed", f"{detail}")
        self._reenable_widgets_after_modal()

    return no_refresh
|
|
|
|
def _handle_apply_remote_config_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of applying the remote configuration.

    On success a connection check is scheduled on the Tk master when the
    application exposes ``check_connection_auth``. On error the user is
    informed and the auth / sync indicators are reset to "unknown".

    Args:
        result_data: Worker result; "status" and "message" are read.
        context: Task context (not read by this handler).

    Returns:
        Tuple[bool, bool]: (False, True) on success, otherwise (False, False).
    """
    caller = "_handle_apply_remote_config_result"
    outcome = result_data.get("status")
    detail = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(
            f"Apply remote config successful: {detail}", func_name=caller
        )
        recheck = getattr(self.app, "check_connection_auth", None)
        if not callable(recheck):
            log_handler.log_warning(
                "Cannot auto-check connection after apply config.",
                func_name=caller,
            )
        else:
            log_handler.log_debug(
                "Scheduling connection check after applying config.",
                func_name=caller,
            )
            self.app.master.after(50, recheck)
        return False, True

    if outcome == "error":
        log_handler.log_error(
            f"Apply remote config failed: {detail}", func_name=caller
        )
        frame = self.main_frame
        if hasattr(frame, "show_error"):
            frame.show_error("Apply Config Error", f"Failed:\n{detail}")
        self.app._update_gui_auth_status("unknown")
        if hasattr(frame, "update_ahead_behind_status"):
            frame.update_ahead_behind_status(status_text="Sync Status: Unknown")

    return False, False
|
|
|
|
# --- Handlers for Remote Git Actions ---
|
|
# (The _handle_... methods below are UNCHANGED from the previous version)
|
|
def _handle_fetch_remote_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of a remote fetch.

    Args:
        result_data: Worker result; "status", "message" and "exception"
            are read.
        context: Task context (not read by this handler).

    Returns:
        Tuple[bool, bool]: (True, True) on success, otherwise (False, False).
    """
    caller = "_handle_fetch_remote_result"
    outcome = result_data.get("status")
    detail = result_data.get("message")
    failure = result_data.get("exception")

    if outcome == "success":
        log_handler.log_info(
            f"Fetch remote successful: {detail}", func_name=caller
        )
        return True, True

    if outcome == "error":
        log_handler.log_error(f"Fetch remote failed: {detail}", func_name=caller)
        frame = self.main_frame
        if hasattr(frame, "show_error"):
            frame.show_error("Fetch Error", f"Failed:\n{detail}")
        # Map the failure onto an auth-status code for the GUI indicator.
        derived_status = self._determine_auth_status_from_error(failure)
        self.app._update_gui_auth_status(derived_status)
        if hasattr(frame, "update_ahead_behind_status"):
            frame.update_ahead_behind_status(status_text="Sync Status: Error")

    return False, False
|
|
|
|
def _handle_pull_remote_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of a remote pull.

    Args:
        result_data: Worker result; "status", "message" and "exception"
            are read.
        context: Task context; "repo_path" and "remote_name" are used in the
            conflict dialog text.

    Returns:
        Tuple[bool, bool]: (True, True) on success, (True, False) on
        conflict, otherwise (False, False).
    """
    caller = "_handle_pull_remote_result"
    outcome = result_data.get("status")
    detail = result_data.get("message")
    failure = result_data.get("exception")
    conflict_dir = context.get("repo_path")
    remote_label = context.get("remote_name")

    if outcome == "success":
        log_handler.log_info(f"Pull remote successful: {detail}", func_name=caller)
        return True, True

    if outcome == "conflict":
        log_handler.log_error(
            f"Pull resulted in conflict: {detail}", func_name=caller
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Merge Conflict",
                f"Merge conflict occurred during pull from '{remote_label or 'remote'}'.\n\nPlease resolve conflicts manually in:\n{conflict_dir}\n\nAfter resolving, stage and commit.",
            )
        self._reenable_widgets_after_modal()
        # Local lists are refreshed after a conflict; remote sync status is not.
        return True, False

    if outcome == "error":
        log_handler.log_error(f"Pull remote failed: {detail}", func_name=caller)
        frame = self.main_frame
        if hasattr(frame, "show_error"):
            frame.show_error("Pull Error", f"Failed:\n{detail}")
        derived_status = self._determine_auth_status_from_error(failure)
        self.app._update_gui_auth_status(derived_status)
        if hasattr(frame, "update_ahead_behind_status"):
            frame.update_ahead_behind_status(status_text="Sync Status: Error")

    return False, False
|
|
|
|
def _handle_push_remote_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of a push to the remote.

    A rejected push warns the user and, when the application exposes
    ``fetch_remote``, schedules a fetch on the Tk master; without it the
    standard and sync refreshes are requested instead.

    Args:
        result_data: Worker result; "status", "message", "exception" and
            "branch_name" are read.
        context: Task context (not read by this handler).

    Returns:
        Tuple[bool, bool]: (False, True) on success; on rejection
        (False, False) if a fetch was scheduled, else (True, True);
        otherwise (False, False).
    """
    caller = "_handle_push_remote_result"
    outcome = result_data.get("status")
    detail = result_data.get("message")
    failure = result_data.get("exception")
    branch = result_data.get("branch_name")

    if outcome == "success":
        log_handler.log_info(f"Push remote successful: {detail}", func_name=caller)
        return False, True

    if outcome == "rejected":
        log_handler.log_error(
            f"Push rejected for branch '{branch}': {detail}",
            func_name=caller,
        )
        if hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning("Push Rejected", f"{detail}")
        fetch = getattr(self.app, "fetch_remote", None)
        if not callable(fetch):
            return True, True
        log_handler.log_debug(
            "Scheduling fetch after push rejection.", func_name=caller
        )
        self.app.master.after(100, fetch)
        return False, False

    if outcome == "error":
        log_handler.log_error(f"Push remote failed: {detail}", func_name=caller)
        frame = self.main_frame
        if hasattr(frame, "show_error"):
            frame.show_error("Push Error", f"Failed:\n{detail}")
        derived_status = self._determine_auth_status_from_error(failure)
        self.app._update_gui_auth_status(derived_status)
        if hasattr(frame, "update_ahead_behind_status"):
            frame.update_ahead_behind_status(status_text="Sync Status: Error")

    return False, False
|
|
|
|
def _handle_push_tags_remote_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of pushing tags to the remote.

    Args:
        result_data: Worker result; "status", "message" and "exception"
            are read.
        context: Task context (not read by this handler).

    Returns:
        Tuple[bool, bool]: (True, False) on success, otherwise (False, False).
    """
    caller = "_handle_push_tags_remote_result"
    outcome = result_data.get("status")
    detail = result_data.get("message")
    failure = result_data.get("exception")

    if outcome == "success":
        log_handler.log_info(f"Push tags successful: {detail}", func_name=caller)
        return True, False

    if outcome == "error":
        log_handler.log_error(f"Push tags failed: {detail}", func_name=caller)
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Push Tags Error", f"Failed:\n{detail}")
        derived_status = self._determine_auth_status_from_error(failure)
        self.app._update_gui_auth_status(derived_status)

    return False, False
|
|
|
|
def _handle_clone_remote_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of a clone and create a profile for the new repo.

    After a successful clone, a new profile section is written through the
    application's config manager, the configuration is saved, and the
    profile dropdown is updated and pointed at the new profile. Any failure
    along the way is reported and the widgets are scheduled for re-enabling.

    Args:
        result_data: Worker result; "status" and "message" are read.
        context: Task context; "clone_success_data" must be a dict holding
            "profile_name", "cloned_path" and "remote_url".

    Returns:
        Tuple[bool, bool]: always (False, False).
    """
    caller = "_handle_clone_remote_result"
    unchanged = (False, False)
    outcome = result_data.get("status")
    detail = result_data.get("message")
    clone_info = context.get("clone_success_data")

    if outcome == "error":
        log_handler.log_error(
            f"Clone operation failed: {detail}", func_name=caller
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Clone Error", f"{detail}")
        self._reenable_widgets_after_modal()
        return unchanged

    if outcome != "success":
        return unchanged

    log_handler.log_info("Clone successful. Creating profile...", func_name=caller)

    # Guard 1: the success payload must be a non-empty dict.
    if not (clone_info and isinstance(clone_info, dict)):
        log_handler.log_error(
            "Clone ok, but success data missing/invalid.", func_name=caller
        )
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar("Clone done, profile data error.")
        self._reenable_widgets_after_modal()
        return unchanged

    profile = clone_info.get("profile_name")
    repo_dir = clone_info.get("cloned_path")
    origin_url = clone_info.get("remote_url")

    # Guard 2: all three values are needed to build the profile.
    if not (profile and repo_dir and origin_url):
        log_handler.log_error(
            "Clone ok, but missing data for profile.", func_name=caller
        )
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar("Clone done, profile data missing.")
        self._reenable_widgets_after_modal()
        return unchanged

    try:
        cfg = self.app.config_manager
        options = cfg._get_expected_keys_with_defaults()
        overrides = (
            ("svn_working_copy_path", repo_dir),
            ("remote_url", origin_url),
            ("remote_name", DEFAULT_REMOTE_NAME),
            ("bundle_name", f"{profile}.bundle"),
            ("bundle_name_updated", f"{profile}_update.bundle"),
            ("autobackup", "False"),
            ("autocommit", "False"),
            ("commit_message", ""),
        )
        for option, value in overrides:
            options[option] = value

        cfg.add_section(profile)
        for option, value in options.items():
            cfg.set_profile_option(profile, option, value)
        cfg.save_config()
        log_handler.log_info(
            f"Profile '{profile}' created successfully.",
            func_name=caller,
        )

        sections = cfg.get_profile_sections()
        frame = self.main_frame
        if hasattr(frame, "update_profile_dropdown") and hasattr(
            frame, "profile_var"
        ):
            frame.update_profile_dropdown(sections)
            frame.profile_var.set(profile)
    except Exception as profile_error:
        log_handler.log_exception(
            f"Clone ok, but failed to create profile '{profile}': {profile_error}",
            func_name=caller,
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Profile Creation Error",
                f"Clone ok, failed create profile:\n{profile_error}",
            )
        self._reenable_widgets_after_modal()

    return unchanged
|
|
|
|
def _handle_checkout_tracking_branch_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of checking out a remote branch as a tracking branch.

    Args:
        result_data: Worker result; "status" and "message" are read.
        context: Task context (not read by this handler).

    Returns:
        Tuple[bool, bool]: (True, True) on success, otherwise (False, False).
    """
    caller = "_handle_checkout_tracking_branch_result"
    outcome = result_data.get("status")
    detail = result_data.get("message")

    if outcome == "success":
        log_handler.log_info(
            f"Successfully checked out tracking branch: {detail}",
            func_name=caller,
        )
        return True, True

    if outcome == "error":
        log_handler.log_error(
            f"Checkout tracking branch failed: {detail}", func_name=caller
        )
        frame = self.main_frame
        if hasattr(frame, "show_error"):
            frame.show_error("Checkout Error", f"{detail}")
        if hasattr(frame, "update_ahead_behind_status"):
            frame.update_ahead_behind_status(status_text="Sync Status: Error")
        self._reenable_widgets_after_modal()

    return False, False
|
|
|
|
# --- Relocated Dedicated Handler (Compare Branches) ---
|
|
def _handle_compare_branches_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Process the outcome of a branch comparison.

    On success the changed-file list is handed to the application's
    ``show_comparison_summary``; missing context data or a missing method is
    logged and the widgets are scheduled for re-enabling.

    Args:
        result_data: Worker result; "status", "message" and "result" are
            read. A "result" that is not a list is treated as an empty list.
        context: Task context; "ref1", "ref2" and "repo_path" are required.

    Returns:
        Tuple[bool, bool]: always (False, False).
    """
    caller = "_handle_compare_branches_result"
    no_refresh = (False, False)
    outcome = result_data.get("status")
    detail = result_data.get("message")
    payload = result_data.get("result")

    if outcome == "error":
        log_handler.log_error(
            f"Branch comparison failed: {detail}", func_name=caller
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Comparison Error", f"Could not compare branches:\n{detail}"
            )
        return no_refresh

    if outcome != "success":
        return no_refresh

    log_handler.log_info(
        "Branch comparison successful. Requesting summary window display...",
        func_name=caller,
    )
    first_ref = context.get("ref1")
    second_ref = context.get("ref2")
    repo_dir = context.get("repo_path")
    changed = payload if isinstance(payload, list) else []

    if not (first_ref and second_ref and repo_dir):
        log_handler.log_error(
            "Missing context data (ref1, ref2, repo_path) for comparison summary.",
            func_name=caller,
        )
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar(
                "Error: Internal data missing for comparison."
            )
        self._reenable_widgets_after_modal()
        return no_refresh

    presenter = getattr(self.app, "show_comparison_summary", None)
    if callable(presenter):
        presenter(
            ref1=first_ref,
            ref2=second_ref,
            repo_path=repo_dir,
            changed_files=changed,
        )
        return no_refresh

    log_handler.log_error(
        "Cannot display comparison summary: 'show_comparison_summary' method missing on app.",
        func_name=caller,
    )
    if hasattr(self.main_frame, "show_error"):
        self.main_frame.show_error(
            "Internal Error", "Cannot display comparison results."
        )
    self._reenable_widgets_after_modal()
    return no_refresh
|
|
|
|
# --- Generic Handler ---
|
|
def _handle_generic_result(
    self, result_data: Dict[str, Any], context: Dict[str, Any]
) -> Tuple[bool, bool]:
    """
    Fallback handler for task results.

    Errors are logged and shown in an error dialog titled after the task
    context; warnings are shown in a warning dialog. Other statuses only
    produce the initial debug log entry.

    Args:
        result_data: Worker result; "status", "message" (default
            "Operation finished.") and "exception" are read.
        context: Task context; "context" (default "unknown") names the task.

    Returns:
        Tuple[bool, bool]: always (False, False).
    """
    caller = "_handle_generic_result"
    outcome = result_data.get("status")
    detail = result_data.get("message", "Operation finished.")
    failure = result_data.get("exception")
    task_name = context.get("context", "unknown")

    log_handler.log_debug(
        f"Handling generic result for '{task_name}' with status '{outcome}'.",
        func_name=caller,
    )

    if outcome == "warning":
        if hasattr(self.main_frame, "show_warning"):
            self.main_frame.show_warning("Operation Info", detail)
        return False, False

    if outcome == "error":
        log_handler.log_error(
            f"Error reported for task '{task_name}': {detail}",
            func_name=caller,
        )
        # Append the exception type and text when one was supplied.
        if failure:
            dialog_text = f"{detail}\n({type(failure).__name__}: {failure})"
        else:
            dialog_text = detail
        if hasattr(self.main_frame, "show_error"):
            readable_task = task_name.replace("_", " ").title()
            self.main_frame.show_error(f"Error: {readable_task}", dialog_text)

    return False, False
|
|
|
|
# --- Helper Methods ---
|
|
# (Methods _determine_auth_status_from_error, _reenable_widgets_after_modal,
# _trigger_post_action_refreshes are UNCHANGED from the previous version)
|
|
def _determine_auth_status_from_error(self, exception: Optional[Exception]) -> str:
    """
    Classify a failure into an auth-status code from its stderr text.

    Only a ``GitCommandError`` with non-empty ``stderr`` is inspected; the
    text is lower-cased and searched for known phrases. Authentication
    phrases are checked before connection phrases.

    Args:
        exception: The exception reported by the worker, or None.

    Returns:
        str: "failed" for an authentication phrase, "connection_failed" for
        a connection phrase, otherwise "unknown_error".
    """
    if not isinstance(exception, GitCommandError) or not exception.stderr:
        return "unknown_error"

    lowered = exception.stderr.lower()
    # Ordered (verdict, phrases) pairs; the first category with a hit wins.
    categories = (
        (
            "failed",
            (
                "authentication failed",
                "permission denied",
                "could not read",
            ),
        ),
        (
            "connection_failed",
            (
                "repository not found",
                "could not resolve host",
                "failed to connect",
                "network is unreachable",
                "timed out",
                "unable to access",
            ),
        ),
    )
    for verdict, phrases in categories:
        for phrase in phrases:
            if phrase in lowered:
                return verdict
    return "unknown_error"
|
|
|
|
def _reenable_widgets_after_modal(self):
    """
    Schedule the application's widget re-enable check on the Tk master.

    The application's ``_reenable_widgets_if_ready`` is scheduled with a
    50 ms delay, but only while the master window still exists. Nothing is
    scheduled when the application has no master or the window is gone.
    """
    master = getattr(self.app, "master", None)
    if master is None:
        return
    try:
        if master.winfo_exists():
            master.after(50, self.app._reenable_widgets_if_ready)
    except tk.TclError:
        # winfo_exists()/after() raise TclError once the Tk application has
        # been destroyed; at that point there is nothing left to re-enable.
        log_handler.log_debug(
            "Skipping widget re-enable: master window already destroyed.",
            func_name="_reenable_widgets_after_modal",
        )
|
|
|
|
def _trigger_post_action_refreshes(self, sync_refresh_needed: bool):
    """
    Schedule the standard GUI refreshes that follow a completed action.

    The refresh callbacks are queued on the Tk master starting at 50 ms and
    spaced 75 ms apart. When requested (and available on the application),
    a remote status refresh is queued after them. A final callback, 50 ms
    after the last computed delay, sets the action widgets back to
    ``tk.NORMAL`` if the main frame still exists.

    Args:
        sync_refresh_needed: Whether ``refresh_remote_status`` should be
            scheduled in addition to the standard refreshes.
    """
    func_name: str = "_trigger_post_action_refreshes"

    # winfo_exists() raises TclError once the Tk application is destroyed,
    # so treat that exactly like a closed window.
    master = getattr(self.app, "master", None)
    try:
        master_alive: bool = master is not None and bool(master.winfo_exists())
    except tk.TclError:
        master_alive = False
    if not master_alive:
        log_handler.log_warning(
            "Cannot trigger refreshes: Master window closed.", func_name=func_name
        )
        return

    current_repo_path: Optional[str] = self.app._get_and_validate_svn_path(
        "Post-Action Refresh Trigger"
    )
    if not current_repo_path or not self.app._is_repo_ready(current_repo_path):
        log_handler.log_warning(
            "Cannot trigger refreshes: Repo path unavailable or not ready.",
            func_name=func_name,
        )
        self._reenable_widgets_after_modal()
        return

    refresh_functions_to_call: List[Callable] = [
        self.app.refresh_commit_history,
        self.app.refresh_branch_list,
        self.app.refresh_tag_list,
        self.app.refresh_changed_files_list,
        self.app.refresh_remote_branches,
    ]
    log_handler.log_debug(
        f"Scheduling {len(refresh_functions_to_call)} standard refreshes. Sync refresh needed: {sync_refresh_needed}",
        func_name=func_name,
    )

    delay_ms: int = 50
    for refresh_func in refresh_functions_to_call:
        try:
            master.after(delay_ms, refresh_func)
            log_handler.log_debug(
                f"Scheduled {getattr(refresh_func, '__name__', 'unknown_refresh')} with delay {delay_ms}ms.",
                func_name=func_name,
            )
            # Stagger the callbacks; the delay only advances when the
            # scheduling call succeeded.
            delay_ms += 75
        except Exception as ref_e:
            log_handler.log_error(
                f"Error scheduling {getattr(refresh_func, '__name__', 'refresh function')}: {ref_e}",
                func_name=func_name,
            )

    if sync_refresh_needed:
        if hasattr(self.app, "refresh_remote_status") and callable(
            self.app.refresh_remote_status
        ):
            log_handler.log_debug(
                f"Scheduling remote status refresh with delay {delay_ms}ms.",
                func_name=func_name,
            )
            master.after(delay_ms, self.app.refresh_remote_status)
        else:
            log_handler.log_warning(
                "Sync refresh needed but refresh_remote_status method not found on app.",
                func_name=func_name,
            )
        delay_ms += 75

    def _reenable_widgets_final():
        # Runs after every refresh has been scheduled; skipped when the
        # main frame no longer exists.
        if (
            hasattr(self.main_frame, "winfo_exists")
            and self.main_frame.winfo_exists()
        ):
            log_handler.log_debug(
                "Re-enabling widgets after post-action refreshes scheduled.",
                func_name="_reenable_widgets_final",
            )
            self.main_frame.set_action_widgets_state(tk.NORMAL)

    master.after(delay_ms + 50, _reenable_widgets_final)
|
|
|
|
|
|
# --- End of AsyncResultHandler Class ---
|