# SXXXXXXX_GitUtility/gitutility/core/remote_actions.py
# 2025-05-05 10:28:19 +02:00
#
# 805 lines
# 34 KiB
# Python

# --- FILE: gitutility/core/remote_actions.py ---
import os
from typing import Dict, Any, Optional # Aggiunto Optional
# Import using the absolute path from the gitutility package
from gitutility.logging_setup import log_handler
# Use a relative import to go up one level and reach 'commands'
from ..commands.git_commands import GitCommands, GitCommandError
class RemoteActionHandler:
    """
    Handles the execution logic for actions involving remote Git repositories,
    such as configuring remotes, fetching, pulling, and pushing changes.

    Uses GitCommands for underlying Git operations and log_handler for logging.
    The ``execute_*`` methods report their outcome as a dictionary with the
    keys ``status``, ``message`` and ``exception``. Invalid input arguments
    are raised as ValueError before any Git command is run.
    """

    # Lower-cased stderr fragments that are reported as authentication failures.
    _AUTH_ERROR_MARKERS = (
        "authentication failed",
        "permission denied",
        "could not read username",
        "could not read password",
    )
    # Lower-cased stderr fragments that are reported as connection failures.
    _CONN_ERROR_MARKERS = (
        "repository not found",
        "could not resolve host",
        "name or service not known",
        "network is unreachable",
        "failed to connect",
        "unable to access",
    )

    def __init__(self, git_commands: GitCommands):
        """
        Initializes the RemoteActionHandler.

        Args:
            git_commands (GitCommands): Instance for executing Git commands.

        Raises:
            TypeError: If git_commands argument is not a GitCommands instance.
        """
        # Reject anything that is not a GitCommands instance up front.
        if not isinstance(git_commands, GitCommands):
            raise TypeError("RemoteActionHandler requires a GitCommands instance.")
        self.git_commands: GitCommands = git_commands
        log_handler.log_debug("RemoteActionHandler initialized.", func_name="__init__")

    @classmethod
    def _classify_transport_error(cls, stderr_lower: str) -> Optional[str]:
        """
        Classifies lower-cased Git stderr as an auth or connection failure.

        Args:
            stderr_lower (str): Git stderr output, already lower-cased.

        Returns:
            Optional[str]: "auth" if an authentication marker is present,
                "connection" if a connection marker is present, otherwise None.
                Authentication markers are checked first.
        """
        if any(marker in stderr_lower for marker in cls._AUTH_ERROR_MARKERS):
            return "auth"
        if any(marker in stderr_lower for marker in cls._CONN_ERROR_MARKERS):
            return "connection"
        return None

    def apply_remote_config(
        self, repo_path: str, remote_name: str, remote_url: str
    ) -> bool:
        """
        Ensures the specified remote configuration exists in the local repository.

        Adds the remote if it doesn't exist, or updates the URL if the name
        exists but the URL differs. Uses log_handler.

        Args:
            repo_path (str): Path to the local Git repository.
            remote_name (str): The desired local alias for the remote (e.g., 'origin').
            remote_url (str): The URL of the remote repository.

        Returns:
            bool: True if the configuration was applied successfully or already correct,
                False if add_remote/set_remote_url returned a falsy value.

        Raises:
            ValueError: If input arguments are invalid or repo path is not a Git repo.
            GitCommandError: If underlying Git commands fail; unexpected errors
                are wrapped in GitCommandError as well.
        """
        func_name: str = "apply_remote_config"
        log_handler.log_info(
            f"Applying remote config in '{repo_path}': Name='{remote_name}', URL='{remote_url}'",
            func_name=func_name,
        )

        # --- Input Validation ---
        if not repo_path or not os.path.isdir(repo_path):
            raise ValueError(f"Invalid repository path provided: '{repo_path}'")
        if not remote_name or remote_name.isspace():
            raise ValueError("Remote name cannot be empty or whitespace.")
        if not remote_url or remote_url.isspace():
            raise ValueError("Remote URL cannot be empty or whitespace.")
        if not os.path.exists(os.path.join(repo_path, ".git")):
            raise ValueError(f"Directory '{repo_path}' is not a valid Git repository.")

        try:
            # 1. Look up the remotes that are already configured.
            log_handler.log_debug(
                f"Checking existing remotes in '{repo_path}'...", func_name=func_name
            )
            # Used below as a mapping of remote name -> URL.
            existing_remotes: Dict[str, str] = self.git_commands.get_remotes(repo_path)
            log_handler.log_debug(
                f"Found remotes: {existing_remotes}", func_name=func_name
            )

            # 2. Unknown name: add the remote.
            if remote_name not in existing_remotes:
                log_handler.log_info(
                    f"Remote '{remote_name}' not found. Adding new remote with URL '{remote_url}'.",
                    func_name=func_name,
                )
                add_success: bool = self.git_commands.add_remote(
                    repo_path, remote_name, remote_url
                )
                if add_success:
                    log_handler.log_info(
                        f"Remote '{remote_name}' added successfully.",
                        func_name=func_name,
                    )
                    return True
                # Not expected if add_remote raises on error.
                log_handler.log_error(
                    f"add_remote command reported failure unexpectedly for '{remote_name}'.",
                    func_name=func_name,
                )
                return False

            # 3. Known name with the desired URL: nothing to do.
            current_url: str = existing_remotes[remote_name]
            if current_url == remote_url:
                log_handler.log_info(
                    f"Remote '{remote_name}' with URL '{remote_url}' already configured correctly. No changes needed.",
                    func_name=func_name,
                )
                return True

            # 4. Known name with a different URL: update it.
            log_handler.log_info(
                f"Remote '{remote_name}' exists but URL differs. Updating URL...",
                func_name=func_name,
            )
            log_handler.log_debug(
                f"  Current URL: {current_url}", func_name=func_name
            )
            log_handler.log_debug(
                f"  New URL    : {remote_url}", func_name=func_name
            )
            update_success: bool = self.git_commands.set_remote_url(
                repo_path, remote_name, remote_url
            )
            if update_success:
                log_handler.log_info(
                    f"URL for remote '{remote_name}' updated successfully.",
                    func_name=func_name,
                )
                return True
            # Not expected if set_remote_url raises on error.
            log_handler.log_error(
                f"set_remote_url command reported failure unexpectedly for '{remote_name}'.",
                func_name=func_name,
            )
            return False
        except (GitCommandError, ValueError) as e:
            # Known failure types are logged and propagated unchanged.
            log_handler.log_error(
                f"Failed to apply remote configuration for '{remote_name}': {e}",
                func_name=func_name,
            )
            raise
        except Exception as e:
            # Anything else is logged with traceback and wrapped.
            log_handler.log_exception(
                f"Unexpected error applying remote config for '{remote_name}': {e}",
                func_name=func_name,
            )
            raise GitCommandError(
                f"Unexpected error applying remote config: {e}"
            ) from e

    def execute_remote_fetch(self, repo_path: str, remote_name: str) -> Dict[str, Any]:
        """
        Executes 'git fetch' for the specified remote and analyzes the result.

        Args:
            repo_path (str): Path to the local repository.
            remote_name (str): The name of the remote to fetch (e.g., 'origin').

        Returns:
            Dict[str, Any]: Dictionary containing 'status', 'message', and 'exception'
                (None on success). Status can be 'success' or 'error'.

        Raises:
            ValueError: If the path, remote name or repository check fails.
        """
        func_name: str = "execute_remote_fetch"
        log_handler.log_info(
            f"Executing fetch for remote '{remote_name}' in '{repo_path}'",
            func_name=func_name,
        )

        # --- Input Validation ---
        if not repo_path or not os.path.isdir(repo_path):
            raise ValueError(f"Invalid repository path: '{repo_path}'")
        if not remote_name or remote_name.isspace():
            raise ValueError("Remote name cannot be empty.")
        if not os.path.exists(os.path.join(repo_path, ".git")):
            raise ValueError(f"Directory '{repo_path}' is not a Git repository.")

        result_info: Dict[str, Any] = {
            "status": "unknown",
            "message": "Fetch operation not completed.",
            "exception": None,
        }
        try:
            # NOTE(review): git_fetch is presumably run with check=False so that
            # a non-zero exit is returned here rather than raised - confirm.
            fetch_result = self.git_commands.git_fetch(
                working_directory=repo_path,
                remote_name=remote_name,
                prune=True,  # Prune remote branches by default
            )

            # --- Analyze Result ---
            if fetch_result.returncode == 0:
                result_info["status"] = "success"
                result_info["message"] = (
                    f"Fetch from '{remote_name}' completed successfully."
                )
                # Command output is only of interest at DEBUG level.
                if fetch_result.stdout:
                    log_handler.log_debug(
                        f"Fetch stdout: {fetch_result.stdout.strip()}",
                        func_name=func_name,
                    )
                if fetch_result.stderr:
                    log_handler.log_debug(
                        f"Fetch stderr: {fetch_result.stderr.strip()}",
                        func_name=func_name,
                    )
                log_handler.log_info(
                    f"Fetch successful for '{remote_name}'.", func_name=func_name
                )
            else:
                result_info["status"] = "error"
                stderr_content: str = fetch_result.stderr if fetch_result.stderr else ""
                stderr_lower: str = stderr_content.lower()
                log_handler.log_error(
                    f"Fetch command failed for '{remote_name}' (RC={fetch_result.returncode}). Stderr: {stderr_content.strip()}",
                    func_name=func_name,
                )
                # Map known stderr patterns to a friendlier message.
                failure_kind = self._classify_transport_error(stderr_lower)
                if failure_kind == "auth":
                    result_info["message"] = (
                        f"Authentication required or failed for remote '{remote_name}' during fetch."
                    )
                elif failure_kind == "connection":
                    result_info["message"] = (
                        f"Connection failed for remote '{remote_name}': Repository or host not found/reachable."
                    )
                else:
                    result_info["message"] = (
                        f"Fetch from '{remote_name}' failed (RC={fetch_result.returncode}). Check logs."
                    )
                result_info["exception"] = GitCommandError(
                    result_info["message"], stderr=stderr_content
                )
        except (GitCommandError, ValueError) as e:
            # Errors raised by git_fetch itself.
            log_handler.log_error(
                f"Error during fetch setup or execution for '{remote_name}': {e}",
                func_name=func_name,
            )
            result_info = {
                "status": "error",
                "message": f"Fetch failed: {e}",
                "exception": e,
            }
        except Exception as e:
            log_handler.log_exception(
                f"Unexpected error during fetch for '{remote_name}': {e}",
                func_name=func_name,
            )
            result_info = {
                "status": "error",
                "message": f"Unexpected fetch error: {type(e).__name__}",
                "exception": e,
            }
        return result_info

    def execute_remote_pull(
        self, repo_path: str, remote_name: str, current_branch_name: str
    ) -> Dict[str, Any]:
        """
        Executes 'git pull' for the specified remote and current branch.

        Detects success, merge conflicts, and other errors. Uses log_handler.
        The pull is not attempted when the working tree has uncommitted changes
        or when the status check itself fails.

        Args:
            repo_path (str): Path to the local repository.
            remote_name (str): The name of the remote to pull from (e.g., 'origin').
            current_branch_name (str): The name of the currently checked-out local branch.

        Returns:
            Dict[str, Any]: Dictionary containing status ('success', 'conflict', 'error'),
                message, and exception (None when not applicable).

        Raises:
            ValueError: If the path, remote name, branch name or repository check fails.
        """
        func_name: str = "execute_remote_pull"
        log_handler.log_info(
            f"Executing pull from remote '{remote_name}' into branch '{current_branch_name}' in '{repo_path}'",
            func_name=func_name,
        )

        # --- Input Validation ---
        if not repo_path or not os.path.isdir(repo_path):
            raise ValueError(f"Invalid repository path: '{repo_path}'")
        if not remote_name or remote_name.isspace():
            raise ValueError("Remote name cannot be empty.")
        if not current_branch_name or current_branch_name.isspace():
            raise ValueError("Current branch name cannot be empty.")
        if not os.path.exists(os.path.join(repo_path, ".git")):
            raise ValueError(f"Directory '{repo_path}' is not a Git repository.")

        # --- Pre-Pull Check: Uncommitted Changes ---
        try:
            if self.git_commands.git_status_has_changes(repo_path):
                msg: str = (
                    "Pull aborted: Uncommitted changes detected. Please commit or stash first."
                )
                log_handler.log_warning(msg, func_name=func_name)
                return {"status": "error", "message": msg, "exception": ValueError(msg)}
        except GitCommandError as status_err:
            msg = f"Pull aborted: Failed to check repository status: {status_err}"
            log_handler.log_error(msg, func_name=func_name)
            return {"status": "error", "message": msg, "exception": status_err}

        # --- Run Git Pull ---
        result_info: Dict[str, Any] = {
            "status": "unknown",
            "message": "Pull not completed.",
            "exception": None,
        }
        try:
            # NOTE(review): git_pull is presumably run with check=False - confirm.
            pull_result = self.git_commands.git_pull(
                repo_path, remote_name, current_branch_name
            )

            # --- Analyze Result ---
            stdout_full: str = pull_result.stdout if pull_result.stdout else ""
            stderr_full: str = pull_result.stderr if pull_result.stderr else ""
            combined_output_lower: str = (stdout_full + stderr_full).lower()

            if pull_result.returncode == 0:
                result_info["status"] = "success"
                if "already up to date" in combined_output_lower:
                    result_info["message"] = (
                        f"Pull from '{remote_name}': Repository already up-to-date."
                    )
                    log_handler.log_info(
                        f"Pull successful (already up-to-date) for '{remote_name}'.",
                        func_name=func_name,
                    )
                else:
                    result_info["message"] = (
                        f"Pull from '{remote_name}' completed successfully."
                    )
                    log_handler.log_info(
                        f"Pull successful for '{remote_name}'. Output logged.",
                        func_name=func_name,
                    )
            elif pull_result.returncode == 1 and (
                "conflict" in combined_output_lower
                or "automatic merge failed" in combined_output_lower
                or "fix conflicts and then commit the result" in combined_output_lower
            ):
                # Conflicts get their own status so callers can react to them.
                result_info["status"] = "conflict"
                result_info["message"] = (
                    f"Pull from '{remote_name}' resulted in merge conflicts. Please resolve them manually in '{repo_path}' and commit."
                )
                log_handler.log_error(
                    f"Merge conflict detected during pull from '{remote_name}'.",
                    func_name=func_name,
                )
            else:
                # Non-zero exit that is not a merge conflict.
                result_info["status"] = "error"
                stderr_lower: str = stderr_full.lower()
                log_handler.log_error(
                    f"Pull command failed for '{remote_name}' (RC={pull_result.returncode}). Stderr: {stderr_full.strip()}",
                    func_name=func_name,
                )
                upstream_errors = [
                    "no tracking information",
                    "no upstream branch",
                    "refusing to merge unrelated histories",
                ]
                failure_kind = self._classify_transport_error(stderr_lower)
                if failure_kind == "auth":
                    result_info["message"] = (
                        f"Authentication required or failed for remote '{remote_name}' during pull."
                    )
                elif failure_kind == "connection":
                    result_info["message"] = (
                        f"Connection failed for remote '{remote_name}' during pull."
                    )
                elif any(err in stderr_lower for err in upstream_errors):
                    result_info["message"] = (
                        f"Pull failed for '{remote_name}': Check branch upstream config or history. Details: {stderr_full.strip()}"
                    )
                else:
                    result_info["message"] = (
                        f"Pull from '{remote_name}' failed (RC={pull_result.returncode}). Check logs."
                    )
                result_info["exception"] = GitCommandError(
                    result_info["message"], stderr=stderr_full
                )
        except (GitCommandError, ValueError) as e:
            log_handler.log_error(
                f"Error during pull setup/execution for '{remote_name}': {e}",
                func_name=func_name,
            )
            result_info = {
                "status": "error",
                "message": f"Pull failed: {e}",
                "exception": e,
            }
        except Exception as e:
            log_handler.log_exception(
                f"Unexpected error during pull for '{remote_name}': {e}",
                func_name=func_name,
            )
            result_info = {
                "status": "error",
                "message": f"Unexpected pull error: {type(e).__name__}",
                "exception": e,
            }
        return result_info

    def execute_remote_push(
        self,
        repo_path: str,
        remote_name: str,
        current_branch_name: str,
        force: bool = False,
    ) -> Dict[str, Any]:
        """
        Executes 'git push' for the current branch, handles upstream setup and errors.

        If the branch has no upstream configured, the push is issued with
        set_upstream=True.

        Args:
            repo_path (str): Path to the local repository.
            remote_name (str): Name of the remote to push to.
            current_branch_name (str): Name of the local branch to push.
            force (bool): Whether to force the push.

        Returns:
            Dict[str, Any]: Dictionary with 'status', 'message', and 'exception'
                (None when not applicable). Status can be 'success', 'rejected', 'error'.

        Raises:
            ValueError: If the path, remote name, branch name or repository check fails.
        """
        func_name: str = "execute_remote_push"
        action_desc: str = (
            f"Push branch '{current_branch_name}' to remote '{remote_name}'"
        )
        if force:
            action_desc += " (FORCE PUSH)"
        log_handler.log_info(
            f"Executing: {action_desc} in '{repo_path}'", func_name=func_name
        )

        # --- Input Validation ---
        if not repo_path or not os.path.isdir(repo_path):
            raise ValueError(f"Invalid repository path: '{repo_path}'")
        if not remote_name or remote_name.isspace():
            raise ValueError("Remote name cannot be empty.")
        if not current_branch_name or current_branch_name.isspace():
            raise ValueError("Current branch name cannot be empty.")
        if not os.path.exists(os.path.join(repo_path, ".git")):
            raise ValueError(f"Directory '{repo_path}' is not a Git repository.")

        # --- Pre-Push Check: is an upstream configured? ---
        needs_set_upstream: bool = False
        try:
            upstream: Optional[str] = self.git_commands.get_branch_upstream(
                repo_path, current_branch_name
            )
            if upstream is None:
                needs_set_upstream = True
                log_handler.log_info(
                    f"Upstream not set for '{current_branch_name}'. Will use --set-upstream.",
                    func_name=func_name,
                )
            else:
                log_handler.log_debug(
                    f"Branch '{current_branch_name}' tracks '{upstream}'.",
                    func_name=func_name,
                )
        except GitCommandError as e:
            msg: str = (
                f"Push aborted: Failed checking upstream for '{current_branch_name}': {e}"
            )
            log_handler.log_error(msg, func_name=func_name)
            return {"status": "error", "message": msg, "exception": e}

        # --- Run Git Push ---
        result_info: Dict[str, Any] = {
            "status": "unknown",
            "message": "Push not completed.",
            "exception": None,
        }
        try:
            # NOTE(review): git_push is presumably run with check=False - confirm.
            push_result = self.git_commands.git_push(
                working_directory=repo_path,
                remote_name=remote_name,
                branch_name=current_branch_name,
                set_upstream=needs_set_upstream,
                force=force,
            )

            # --- Analyze Result ---
            stdout_full: str = push_result.stdout if push_result.stdout else ""
            stderr_full: str = push_result.stderr if push_result.stderr else ""
            combined_output_lower: str = (stdout_full + stderr_full).lower()
            # Must be defined before the failure branches below use it;
            # without it every failed push ended in a NameError.
            stderr_lower: str = stderr_full.lower()
            # The branch name is lower-cased because it is matched against
            # lower-cased stderr.
            missing_local_branch: bool = (
                f"src refspec {current_branch_name.lower()} does not match any"
                in stderr_lower
            )

            if push_result.returncode == 0:
                result_info["status"] = "success"
                if "everything up-to-date" in combined_output_lower:
                    result_info["message"] = (
                        f"Push to '{remote_name}': Branch '{current_branch_name}' already up-to-date."
                    )
                    log_handler.log_info(
                        f"Push successful (already up-to-date) for '{current_branch_name}'.",
                        func_name=func_name,
                    )
                else:
                    result_info["message"] = (
                        f"Push branch '{current_branch_name}' to '{remote_name}' completed successfully."
                    )
                    log_handler.log_info(
                        f"Push successful for '{current_branch_name}'. Output logged.",
                        func_name=func_name,
                    )
            elif (
                push_result.returncode == 1
                # A missing source branch also produces "failed to push some
                # refs"; it is reported as an error below, not as a rejection.
                and not missing_local_branch
                and (
                    "rejected" in stderr_lower
                    or "failed to push some refs" in stderr_lower
                )
            ):
                is_non_fast_forward: bool = (
                    "non-fast-forward" in stderr_lower
                    or "updates were rejected because the remote contains work"
                    in stderr_lower
                )
                if is_non_fast_forward and not force:
                    # Normal rejection: the user has to pull first.
                    result_info["status"] = "rejected"
                    result_info["message"] = (
                        f"Push rejected: Remote has changes you don't have locally for '{current_branch_name}'. Try pulling first."
                    )
                    log_handler.log_error(
                        f"Push rejected (non-fast-forward) for '{current_branch_name}'. User needs to pull.",
                        func_name=func_name,
                    )
                elif force and is_non_fast_forward:
                    # A rejected force push is reported as a plain error.
                    result_info["status"] = "error"
                    result_info["message"] = (
                        f"FORCE PUSH rejected for '{current_branch_name}'. Reason: {stderr_full.strip()}"
                    )
                    log_handler.log_error(
                        f"FORCE PUSH rejected for '{current_branch_name}'. Stderr: {stderr_full.strip()}",
                        func_name=func_name,
                    )
                    result_info["exception"] = GitCommandError(
                        result_info["message"], stderr=push_result.stderr
                    )
                else:
                    # Rejected for some other reason.
                    result_info["status"] = "rejected"
                    result_info["message"] = (
                        f"Push rejected for '{current_branch_name}'. Reason: {stderr_full.strip()}"
                    )
                    log_handler.log_error(
                        f"Push rejected for '{current_branch_name}'. Stderr: {stderr_full.strip()}",
                        func_name=func_name,
                    )
                    result_info["exception"] = GitCommandError(
                        result_info["message"], stderr=push_result.stderr
                    )
            else:
                # Any other failure.
                result_info["status"] = "error"
                log_handler.log_error(
                    f"Push command failed for '{current_branch_name}' (RC={push_result.returncode}). Stderr: {stderr_full.strip()}",
                    func_name=func_name,
                )
                failure_kind = self._classify_transport_error(stderr_lower)
                if failure_kind == "auth":
                    result_info["message"] = (
                        f"Authentication required or failed for remote '{remote_name}' during push."
                    )
                elif failure_kind == "connection":
                    result_info["message"] = (
                        f"Connection failed for remote '{remote_name}' during push."
                    )
                elif missing_local_branch:
                    result_info["message"] = (
                        f"Push failed: Local branch '{current_branch_name}' not found."
                    )
                else:
                    result_info["message"] = (
                        f"Push to '{remote_name}' failed (RC={push_result.returncode}). Check logs."
                    )
                result_info["exception"] = GitCommandError(
                    result_info["message"], stderr=push_result.stderr
                )
        except (GitCommandError, ValueError) as e:
            log_handler.log_error(
                f"Error during push setup/execution for '{current_branch_name}': {e}",
                func_name=func_name,
            )
            result_info = {
                "status": "error",
                "message": f"Push failed: {e}",
                "exception": e,
            }
        except Exception as e:
            log_handler.log_exception(
                f"Unexpected error during push for '{current_branch_name}': {e}",
                func_name=func_name,
            )
            result_info = {
                "status": "error",
                "message": f"Unexpected push error: {type(e).__name__}",
                "exception": e,
            }
        return result_info

    def execute_push_tags(self, repo_path: str, remote_name: str) -> Dict[str, Any]:
        """
        Executes 'git push --tags' to the specified remote. Uses log_handler.

        Args:
            repo_path (str): Path to the local repository.
            remote_name (str): The name of the remote to push tags to.

        Returns:
            Dict[str, Any]: Dictionary with 'status' ('success' or 'error'),
                'message', and 'exception' (None on success).

        Raises:
            ValueError: If the path, remote name or repository check fails.
        """
        func_name: str = "execute_push_tags"
        log_handler.log_info(
            f"Executing push tags to remote '{remote_name}' in '{repo_path}'",
            func_name=func_name,
        )

        # --- Input Validation ---
        if not repo_path or not os.path.isdir(repo_path):
            raise ValueError(f"Invalid repository path: '{repo_path}'")
        if not remote_name or remote_name.isspace():
            raise ValueError("Remote name cannot be empty.")
        if not os.path.exists(os.path.join(repo_path, ".git")):
            raise ValueError(f"Directory '{repo_path}' is not a Git repository.")

        # --- Run Git Push Tags ---
        result_info: Dict[str, Any] = {
            "status": "unknown",
            "message": "Push tags not completed.",
            "exception": None,
        }
        try:
            # NOTE(review): git_push_tags is presumably run with check=False - confirm.
            push_result = self.git_commands.git_push_tags(repo_path, remote_name)

            # --- Analyze Result ---
            stderr_full: str = push_result.stderr if push_result.stderr else ""
            stderr_lower: str = stderr_full.lower()
            stdout_lower: str = (push_result.stdout or "").lower()

            if push_result.returncode == 0:
                result_info["status"] = "success"
                if (
                    "everything up-to-date" in stdout_lower
                    or "everything up-to-date" in stderr_lower
                ):
                    result_info["message"] = (
                        f"Push tags to '{remote_name}': All tags already up-to-date."
                    )
                    log_handler.log_info(
                        f"Push tags successful (already up-to-date) for '{remote_name}'.",
                        func_name=func_name,
                    )
                else:
                    result_info["message"] = (
                        f"Push tags to '{remote_name}' completed successfully."
                    )
                    log_handler.log_info(
                        f"Push tags successful for '{remote_name}'. Output logged.",
                        func_name=func_name,
                    )
            else:
                result_info["status"] = "error"
                log_handler.log_error(
                    f"Push tags command failed for '{remote_name}' (RC={push_result.returncode}). Stderr: {stderr_full.strip()}",
                    func_name=func_name,
                )
                failure_kind = self._classify_transport_error(stderr_lower)
                if failure_kind == "auth":
                    result_info["message"] = (
                        f"Authentication required or failed for remote '{remote_name}' during push tags."
                    )
                elif failure_kind == "connection":
                    result_info["message"] = (
                        f"Connection failed for remote '{remote_name}' during push tags."
                    )
                else:
                    result_info["message"] = (
                        f"Push tags to '{remote_name}' failed (RC={push_result.returncode}). Check logs."
                    )
                result_info["exception"] = GitCommandError(
                    result_info["message"], stderr=push_result.stderr
                )
        except (GitCommandError, ValueError) as e:
            log_handler.log_error(
                f"Error during push tags execution for '{remote_name}': {e}",
                func_name=func_name,
            )
            result_info = {
                "status": "error",
                "message": f"Push tags failed: {e}",
                "exception": e,
            }
        except Exception as e:
            log_handler.log_exception(
                f"Unexpected error during push tags for '{remote_name}': {e}",
                func_name=func_name,
            )
            result_info = {
                "status": "error",
                "message": f"Unexpected push tags error: {type(e).__name__}",
                "exception": e,
            }
        return result_info
# --- END OF FILE gitutility/core/remote_actions.py ---