# --- FILE: gitsync_tool/core/remote_actions.py --- import os from typing import Dict, Any, Optional # Aggiunto Optional # Importa usando il percorso assoluto dal pacchetto gitsync_tool from gitutility.logging_setup import log_handler # Usa import relativo per salire di livello e raggiungere 'commands' from ..commands.git_commands import GitCommands, GitCommandError class RemoteActionHandler: """ Handles the execution logic for actions involving remote Git repositories, such as configuring remotes, fetching, pulling, and pushing changes. Uses GitCommands for underlying Git operations and log_handler for logging. """ # ---<<< MODIFICA: Type Hint per costruttore >>>--- def __init__(self, git_commands: GitCommands): # ---<<< FINE MODIFICA >>>--- """ Initializes the RemoteActionHandler. Args: git_commands (GitCommands): Instance for executing Git commands. Raises: TypeError: If git_commands argument is not a GitCommands instance. """ # Input type validation using the imported class name if not isinstance(git_commands, GitCommands): raise TypeError("RemoteActionHandler requires a GitCommands instance.") self.git_commands: GitCommands = git_commands log_handler.log_debug("RemoteActionHandler initialized.", func_name="__init__") def apply_remote_config( self, repo_path: str, remote_name: str, remote_url: str ) -> bool: """ Ensures the specified remote configuration exists in the local repository. Adds the remote if it doesn't exist, or updates the URL if the name exists but the URL differs. Uses log_handler. Args: repo_path (str): Path to the local Git repository. remote_name (str): The desired local alias for the remote (e.g., 'origin'). remote_url (str): The URL of the remote repository. Returns: bool: True if the configuration was applied successfully or already correct, False otherwise (should raise exception on failure). Raises: ValueError: If input arguments are invalid or repo path is not a Git repo. GitCommandError: If underlying Git commands fail. Exception: For unexpected errors. """ func_name: str = "apply_remote_config" log_handler.log_info( f"Applying remote config in '{repo_path}': Name='{remote_name}', URL='{remote_url}'", func_name=func_name, ) # --- Input Validation --- if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repository path provided: '{repo_path}'") if not remote_name or remote_name.isspace(): raise ValueError("Remote name cannot be empty or whitespace.") if not remote_url or remote_url.isspace(): raise ValueError("Remote URL cannot be empty or whitespace.") if not os.path.exists(os.path.join(repo_path, ".git")): raise ValueError(f"Directory '{repo_path}' is not a valid Git repository.") try: # 1. Check existing remotes using GitCommands log_handler.log_debug( f"Checking existing remotes in '{repo_path}'...", func_name=func_name ) # get_remotes should return a dict like {'origin': 'url1', 'upstream': 'url2'} existing_remotes: Dict[str, str] = self.git_commands.get_remotes(repo_path) log_handler.log_debug( f"Found remotes: {existing_remotes}", func_name=func_name ) # 2. Decide whether to add, update, or do nothing if remote_name in existing_remotes: # Remote name already exists current_url: str = existing_remotes[remote_name] if current_url == remote_url: # URL is already correct, no action needed log_handler.log_info( f"Remote '{remote_name}' with URL '{remote_url}' already configured correctly. No changes needed.", func_name=func_name, ) return True # Indicate success (no change needed) else: # URL differs, need to update it log_handler.log_info( f"Remote '{remote_name}' exists but URL differs. Updating URL...", func_name=func_name, ) log_handler.log_debug( f" Current URL: {current_url}", func_name=func_name ) log_handler.log_debug( f" New URL : {remote_url}", func_name=func_name ) # Call GitCommands method to set the URL update_success: bool = self.git_commands.set_remote_url( repo_path, remote_name, remote_url ) if update_success: log_handler.log_info( f"URL for remote '{remote_name}' updated successfully.", func_name=func_name, ) return True # Indicate success else: # Should not happen if set_remote_url raises on error log_handler.log_error( f"set_remote_url command reported failure unexpectedly for '{remote_name}'.", func_name=func_name, ) return False # Indicate failure else: # Remote name does not exist, add it log_handler.log_info( f"Remote '{remote_name}' not found. Adding new remote with URL '{remote_url}'.", func_name=func_name, ) # Call GitCommands method to add the remote add_success: bool = self.git_commands.add_remote( repo_path, remote_name, remote_url ) if add_success: log_handler.log_info( f"Remote '{remote_name}' added successfully.", func_name=func_name, ) return True # Indicate success else: # Should not happen if add_remote raises on error log_handler.log_error( f"add_remote command reported failure unexpectedly for '{remote_name}'.", func_name=func_name, ) return False # Indicate failure except (GitCommandError, ValueError) as e: # Log specific known errors and re-raise for the worker log_handler.log_error( f"Failed to apply remote configuration for '{remote_name}': {e}", func_name=func_name, ) raise e except Exception as e: # Log unexpected errors and wrap them log_handler.log_exception( f"Unexpected error applying remote config for '{remote_name}': {e}", func_name=func_name, ) raise GitCommandError( f"Unexpected error applying remote config: {e}" ) from e def execute_remote_fetch(self, repo_path: str, remote_name: str) -> Dict[str, Any]: """ Executes 'git fetch' for the specified remote and analyzes the result. Args: repo_path (str): Path to the local repository. remote_name (str): The name of the remote to fetch (e.g., 'origin'). Returns: Dict[str, Any]: Dictionary containing 'status', 'message', and optional 'exception'. Status can be 'success' or 'error'. """ func_name: str = "execute_remote_fetch" log_handler.log_info( f"Executing fetch for remote '{remote_name}' in '{repo_path}'", func_name=func_name, ) # --- Input Validation --- if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repository path: '{repo_path}'") if not remote_name or remote_name.isspace(): raise ValueError("Remote name cannot be empty.") if not os.path.exists(os.path.join(repo_path, ".git")): raise ValueError(f"Directory '{repo_path}' is not a Git repository.") result_info: Dict[str, Any] = { "status": "unknown", "message": "Fetch operation not completed.", "exception": None, } try: # Call git_fetch from GitCommands (check=False) fetch_result = self.git_commands.git_fetch( working_directory=repo_path, remote_name=remote_name, prune=True, # Prune remote branches by default ) # --- Analyze Result --- if fetch_result.returncode == 0: # Success result_info["status"] = "success" result_info["message"] = ( f"Fetch from '{remote_name}' completed successfully." ) # Log stdout/stderr from fetch command if needed (at DEBUG level) if fetch_result.stdout: log_handler.log_debug( f"Fetch stdout: {fetch_result.stdout.strip()}", func_name=func_name, ) if fetch_result.stderr: log_handler.log_debug( f"Fetch stderr: {fetch_result.stderr.strip()}", func_name=func_name, ) log_handler.log_info( f"Fetch successful for '{remote_name}'.", func_name=func_name ) else: # Error during fetch result_info["status"] = "error" stderr_content: str = fetch_result.stderr if fetch_result.stderr else "" stderr_lower: str = stderr_content.lower() log_handler.log_error( f"Fetch command failed for '{remote_name}' (RC={fetch_result.returncode}). Stderr: {stderr_content.strip()}", func_name=func_name, ) # Check for specific known error patterns in stderr auth_errors = [ "authentication failed", "permission denied", "could not read username", "could not read password", ] conn_errors = [ "repository not found", "could not resolve host", "name or service not known", "network is unreachable", "failed to connect", "unable to access", ] if any(err in stderr_lower for err in auth_errors): result_info["message"] = ( f"Authentication required or failed for remote '{remote_name}' during fetch." ) elif any(err in stderr_lower for err in conn_errors): result_info["message"] = ( f"Connection failed for remote '{remote_name}': Repository or host not found/reachable." ) else: # Generic error message result_info["message"] = ( f"Fetch from '{remote_name}' failed (RC={fetch_result.returncode}). Check logs." ) # Include exception details in the result result_info["exception"] = GitCommandError( result_info["message"], stderr=stderr_content ) except (GitCommandError, ValueError) as e: # Catch validation errors or errors raised by git_fetch if check=True was used (unlikely here) log_handler.log_error( f"Error during fetch setup or execution for '{remote_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Fetch failed: {e}", "exception": e, } except Exception as e: # Catch unexpected errors log_handler.log_exception( f"Unexpected error during fetch for '{remote_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Unexpected fetch error: {type(e).__name__}", "exception": e, } return result_info def execute_remote_pull( self, repo_path: str, remote_name: str, current_branch_name: str ) -> Dict[str, Any]: """ Executes 'git pull' for the specified remote and current branch. Detects success, merge conflicts, and other errors. Uses log_handler. Args: repo_path (str): Path to the local repository. remote_name (str): The name of the remote to pull from (e.g., 'origin'). current_branch_name (str): The name of the currently checked-out local branch. Returns: Dict[str, Any]: Dictionary containing status ('success', 'conflict', 'error'), message, and optionally exception. """ func_name: str = "execute_remote_pull" log_handler.log_info( f"Executing pull from remote '{remote_name}' into branch '{current_branch_name}' in '{repo_path}'", func_name=func_name, ) # --- Input Validation --- if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repository path: '{repo_path}'") if not remote_name or remote_name.isspace(): raise ValueError("Remote name cannot be empty.") if not current_branch_name or current_branch_name.isspace(): raise ValueError("Current branch name cannot be empty.") if not os.path.exists(os.path.join(repo_path, ".git")): raise ValueError(f"Directory '{repo_path}' is not a Git repository.") # --- Pre-Pull Check: Uncommitted Changes --- try: if self.git_commands.git_status_has_changes(repo_path): msg: str = ( "Pull aborted: Uncommitted changes detected. Please commit or stash first." ) log_handler.log_warning(msg, func_name=func_name) return {"status": "error", "message": msg, "exception": ValueError(msg)} except GitCommandError as status_err: msg = f"Pull aborted: Failed to check repository status: {status_err}" log_handler.log_error(msg, func_name=func_name) return {"status": "error", "message": msg, "exception": status_err} # --- Esecuzione Git Pull --- result_info: Dict[str, Any] = { "status": "unknown", "message": "Pull not completed.", "exception": None, } try: # Chiama git_pull (check=False) pull_result = self.git_commands.git_pull( repo_path, remote_name, current_branch_name ) # --- Analyze Result --- stdout_full: str = pull_result.stdout if pull_result.stdout else "" stderr_full: str = pull_result.stderr if pull_result.stderr else "" combined_output_lower: str = (stdout_full + stderr_full).lower() if pull_result.returncode == 0: result_info["status"] = "success" if "already up to date" in combined_output_lower: result_info["message"] = ( f"Pull from '{remote_name}': Repository already up-to-date." ) log_handler.log_info( f"Pull successful (already up-to-date) for '{remote_name}'.", func_name=func_name, ) else: result_info["message"] = ( f"Pull from '{remote_name}' completed successfully." ) log_handler.log_info( f"Pull successful for '{remote_name}'. Output logged.", func_name=func_name, ) elif pull_result.returncode == 1 and ( "conflict" in combined_output_lower or "automatic merge failed" in combined_output_lower or "fix conflicts and then commit the result" in combined_output_lower ): result_info["status"] = "conflict" # Specific status for conflicts result_info["message"] = ( f"Pull from '{remote_name}' resulted in merge conflicts. Please resolve them manually in '{repo_path}' and commit." ) log_handler.log_error( f"Merge conflict detected during pull from '{remote_name}'.", func_name=func_name, ) else: # Other Error (RC != 0 and not a conflict) result_info["status"] = "error" stderr_lower: str = stderr_full.lower() log_handler.log_error( f"Pull command failed for '{remote_name}' (RC={pull_result.returncode}). Stderr: {stderr_full.strip()}", func_name=func_name, ) # Check specific error patterns auth_errors = [ "authentication failed", "permission denied", "could not read username", "could not read password", ] conn_errors = [ "repository not found", "could not resolve host", "name or service not known", "network is unreachable", "failed to connect", "unable to access", ] upstream_errors = [ "no tracking information", "no upstream branch", "refusing to merge unrelated histories", ] if any(err in stderr_lower for err in auth_errors): result_info["message"] = ( f"Authentication required or failed for remote '{remote_name}' during pull." ) elif any(err in stderr_lower for err in conn_errors): result_info["message"] = ( f"Connection failed for remote '{remote_name}' during pull." ) elif any(err in stderr_lower for err in upstream_errors): result_info["message"] = ( f"Pull failed for '{remote_name}': Check branch upstream config or history. Details: {stderr_full.strip()}" ) else: result_info["message"] = ( f"Pull from '{remote_name}' failed (RC={pull_result.returncode}). Check logs." ) result_info["exception"] = GitCommandError( result_info["message"], stderr=stderr_full ) except (GitCommandError, ValueError) as e: log_handler.log_error( f"Error during pull setup/execution for '{remote_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Pull failed: {e}", "exception": e, } except Exception as e: log_handler.log_exception( f"Unexpected error during pull for '{remote_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Unexpected pull error: {type(e).__name__}", "exception": e, } return result_info def execute_remote_push( self, repo_path: str, remote_name: str, current_branch_name: str, force: bool = False, ) -> Dict[str, Any]: """ Executes 'git push' for the current branch, handles upstream setup and errors. Args: repo_path (str): Path to the local repository. remote_name (str): Name of the remote to push to. current_branch_name (str): Name of the local branch to push. force (bool): Whether to force the push. Returns: Dict[str, Any]: Dictionary with 'status', 'message', optional 'exception'. Status can be 'success', 'rejected', 'error'. """ func_name: str = "execute_remote_push" action_desc: str = ( f"Push branch '{current_branch_name}' to remote '{remote_name}'" ) if force: action_desc += " (FORCE PUSH)" log_handler.log_info( f"Executing: {action_desc} in '{repo_path}'", func_name=func_name ) # --- Input Validation --- if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repository path: '{repo_path}'") if not remote_name or remote_name.isspace(): raise ValueError("Remote name cannot be empty.") if not current_branch_name or current_branch_name.isspace(): raise ValueError("Current branch name cannot be empty.") if not os.path.exists(os.path.join(repo_path, ".git")): raise ValueError(f"Directory '{repo_path}' is not a Git repository.") # --- Pre-Push Checks --- needs_set_upstream: bool = False try: upstream: Optional[str] = self.git_commands.get_branch_upstream( repo_path, current_branch_name ) if upstream is None: needs_set_upstream = True log_handler.log_info( f"Upstream not set for '{current_branch_name}'. Will use --set-upstream.", func_name=func_name, ) else: log_handler.log_debug( f"Branch '{current_branch_name}' tracks '{upstream}'.", func_name=func_name, ) except GitCommandError as e: msg: str = ( f"Push aborted: Failed checking upstream for '{current_branch_name}': {e}" ) log_handler.log_error(msg, func_name=func_name) return {"status": "error", "message": msg, "exception": e} # --- Esecuzione Git Push --- result_info: Dict[str, Any] = { "status": "unknown", "message": "Push not completed.", "exception": None, } try: # Chiama git_push (check=False) push_result = self.git_commands.git_push( working_directory=repo_path, remote_name=remote_name, branch_name=current_branch_name, set_upstream=needs_set_upstream, force=force, ) # --- Analyze Result --- stdout_full: str = push_result.stdout if push_result.stdout else "" stderr_full: str = push_result.stderr if push_result.stderr else "" stderr_lower: str = stderr_full.lower() combined_output_lower: str = (stdout_full + stderr_full).lower() if push_result.returncode == 0: result_info["status"] = "success" if "everything up-to-date" in combined_output_lower: result_info["message"] = ( f"Push to '{remote_name}': Branch '{current_branch_name}' already up-to-date." ) log_handler.log_info( f"Push successful (already up-to-date) for '{current_branch_name}'.", func_name=func_name, ) else: result_info["message"] = ( f"Push branch '{current_branch_name}' to '{remote_name}' completed successfully." ) log_handler.log_info( f"Push successful for '{current_branch_name}'. Output logged.", func_name=func_name, ) elif push_result.returncode == 1 and ( "rejected" in stderr_lower or "failed to push some refs" in stderr_lower ): is_non_fast_forward: bool = ( "non-fast-forward" in stderr_lower or "updates were rejected because the remote contains work" in stderr_lower ) if is_non_fast_forward and not force: result_info["status"] = "rejected" result_info["message"] = ( f"Push rejected: Remote has changes you don't have locally for '{current_branch_name}'. Try pulling first." ) log_handler.log_error( f"Push rejected (non-fast-forward) for '{current_branch_name}'. User needs to pull.", func_name=func_name, ) elif force and is_non_fast_forward: result_info["status"] = ( "error" # Force push rejected is still an error ) result_info["message"] = ( f"FORCE PUSH rejected for '{current_branch_name}'. Reason: {stderr_full.strip()}" ) log_handler.log_error( f"FORCE PUSH rejected for '{current_branch_name}'. Stderr: {stderr_full.strip()}", func_name=func_name, ) result_info["exception"] = GitCommandError( result_info["message"], stderr=push_result.stderr ) else: # Other rejection reason result_info["status"] = "rejected" result_info["message"] = ( f"Push rejected for '{current_branch_name}'. Reason: {stderr_full.strip()}" ) log_handler.log_error( f"Push rejected for '{current_branch_name}'. Stderr: {stderr_full.strip()}", func_name=func_name, ) result_info["exception"] = GitCommandError( result_info["message"], stderr=push_result.stderr ) else: # Other Error result_info["status"] = "error" log_handler.log_error( f"Push command failed for '{current_branch_name}' (RC={push_result.returncode}). Stderr: {stderr_lower}", func_name=func_name, ) auth_errors = [ "authentication failed", "permission denied", "could not read username", "could not read password", ] conn_errors = [ "repository not found", "could not resolve host", "name or service not known", "network is unreachable", "failed to connect", "unable to access", ] if any(err in stderr_lower for err in auth_errors): result_info["message"] = ( f"Authentication required or failed for remote '{remote_name}' during push." ) elif any(err in stderr_lower for err in conn_errors): result_info["message"] = ( f"Connection failed for remote '{remote_name}' during push." ) elif ( f"src refspec {current_branch_name} does not match any" in stderr_lower ): result_info["message"] = ( f"Push failed: Local branch '{current_branch_name}' not found." ) else: result_info["message"] = ( f"Push to '{remote_name}' failed (RC={push_result.returncode}). Check logs." ) result_info["exception"] = GitCommandError( result_info["message"], stderr=push_result.stderr ) except (GitCommandError, ValueError) as e: log_handler.log_error( f"Error during push setup/execution for '{current_branch_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Push failed: {e}", "exception": e, } except Exception as e: log_handler.log_exception( f"Unexpected error during push for '{current_branch_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Unexpected push error: {type(e).__name__}", "exception": e, } return result_info def execute_push_tags(self, repo_path: str, remote_name: str) -> Dict[str, Any]: """ Executes 'git push --tags' to the specified remote. Uses log_handler. Args: repo_path (str): Path to the local repository. remote_name (str): The name of the remote to push tags to. Returns: Dict[str, Any]: Dictionary with 'status', 'message', optional 'exception'. """ func_name: str = "execute_push_tags" log_handler.log_info( f"Executing push tags to remote '{remote_name}' in '{repo_path}'", func_name=func_name, ) # --- Input Validation --- if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repository path: '{repo_path}'") if not remote_name or remote_name.isspace(): raise ValueError("Remote name cannot be empty.") if not os.path.exists(os.path.join(repo_path, ".git")): raise ValueError(f"Directory '{repo_path}' is not a Git repository.") # --- Esecuzione Git Push Tags --- result_info: Dict[str, Any] = { "status": "unknown", "message": "Push tags not completed.", "exception": None, } try: # Chiama git_push_tags (check=False) push_result = self.git_commands.git_push_tags(repo_path, remote_name) # --- Analyze Result --- stderr_full: str = push_result.stderr if push_result.stderr else "" stderr_lower: str = stderr_full.lower() stdout_lower: str = (push_result.stdout or "").lower() if push_result.returncode == 0: result_info["status"] = "success" if ( "everything up-to-date" in stdout_lower or "everything up-to-date" in stderr_lower ): result_info["message"] = ( f"Push tags to '{remote_name}': All tags already up-to-date." ) log_handler.log_info( f"Push tags successful (already up-to-date) for '{remote_name}'.", func_name=func_name, ) else: result_info["message"] = ( f"Push tags to '{remote_name}' completed successfully." ) log_handler.log_info( f"Push tags successful for '{remote_name}'. Output logged.", func_name=func_name, ) else: # Error result_info["status"] = "error" log_handler.log_error( f"Push tags command failed for '{remote_name}' (RC={push_result.returncode}). Stderr: {stderr_lower}", func_name=func_name, ) auth_errors = [ "authentication failed", "permission denied", "could not read username", "could not read password", ] conn_errors = [ "repository not found", "could not resolve host", "name or service not known", "network is unreachable", "failed to connect", "unable to access", ] if any(err in stderr_lower for err in auth_errors): result_info["message"] = ( f"Authentication required or failed for remote '{remote_name}' during push tags." ) elif any(err in stderr_lower for err in conn_errors): result_info["message"] = ( f"Connection failed for remote '{remote_name}' during push tags." ) else: result_info["message"] = ( f"Push tags to '{remote_name}' failed (RC={push_result.returncode}). Check logs." ) result_info["exception"] = GitCommandError( result_info["message"], stderr=push_result.stderr ) except (GitCommandError, ValueError) as e: log_handler.log_error( f"Error during push tags execution for '{remote_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Push tags failed: {e}", "exception": e, } except Exception as e: log_handler.log_exception( f"Unexpected error during push tags for '{remote_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Unexpected push tags error: {type(e).__name__}", "exception": e, } return result_info # --- END OF FILE gitsync_tool/core/remote_actions.py ---