# --- FILE: remote_actions.py --- import os import log_handler from git_commands import GitCommands, GitCommandError class RemoteActionHandler: """ Handles the execution logic for remote Git repository actions. """ def __init__(self, git_commands: GitCommands): """ Initializes the RemoteActionHandler. Args: git_commands (GitCommands): Instance for executing Git commands. Raises: TypeError: If git_commands argument is not a GitCommands instance. """ if not isinstance(git_commands, GitCommands): raise TypeError("RemoteActionHandler requires a GitCommands instance.") self.git_commands = git_commands log_handler.log_debug("RemoteActionHandler initialized.", func_name="__init__") def apply_remote_config(self, repo_path: str, remote_name: str, remote_url: str): """ Ensures the specified remote configuration exists in the local repository. Adds the remote if it doesn't exist, or updates the URL if the name exists but the URL is different. Args: repo_path (str): Path to the local repository. remote_name (str): The desired name for the remote (e.g., 'origin'). remote_url (str): The URL of the remote repository. Returns: bool: True if the configuration was applied successfully or already existed, False otherwise. Raises: ValueError: If input arguments are invalid. GitCommandError: If Git commands fail. """ func_name = "apply_remote_config" log_handler.log_info( f"Applying remote config in '{repo_path}': Name='{remote_name}', URL='{remote_url}'", func_name=func_name, ) # --- Input Validation --- if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repository path: '{repo_path}'") if not remote_name or remote_name.isspace(): raise ValueError("Remote name cannot be empty.") if not remote_url or remote_url.isspace(): # Allow removing remote by setting empty URL? Maybe not here. Assume URL is required. raise ValueError("Remote URL cannot be empty.") if not os.path.exists(os.path.join(repo_path, ".git")): raise ValueError(f"Directory '{repo_path}' is not a Git repository.") try: # Check existing remotes existing_remotes = self.git_commands.get_remotes( repo_path ) # Need to implement this in GitCommands if remote_name in existing_remotes: # Remote name exists, check if URL matches current_url = existing_remotes[remote_name] if current_url == remote_url: log_handler.log_info( f"Remote '{remote_name}' already configured correctly.", func_name=func_name, ) return True # Nothing to do else: # URL differs, update it log_handler.log_info( f"Updating URL for existing remote '{remote_name}'.", func_name=func_name, ) log_handler.log_debug( f" Old URL: {current_url}", func_name=func_name ) log_handler.log_debug( f" New URL: {remote_url}", func_name=func_name ) success = self.git_commands.set_remote_url( repo_path, remote_name, remote_url ) if success: log_handler.log_info( f"URL for remote '{remote_name}' updated successfully.", func_name=func_name, ) return True else: # set_remote_url should raise on error, but check anyway log_handler.log_error( f"Failed to update URL for remote '{remote_name}'.", func_name=func_name, ) return False else: # Remote name does not exist, add it log_handler.log_info( f"Adding new remote '{remote_name}' with URL '{remote_url}'.", func_name=func_name, ) success = self.git_commands.add_remote( repo_path, remote_name, remote_url ) if success: log_handler.log_info( f"Remote '{remote_name}' added successfully.", func_name=func_name, ) return True else: # add_remote should raise on error log_handler.log_error( f"Failed to add remote '{remote_name}'.", func_name=func_name ) return False except (GitCommandError, ValueError) as e: log_handler.log_error( f"Failed to apply remote config for '{remote_name}': {e}", func_name=func_name, ) raise # Re-raise the specific error for the caller (worker) to handle except Exception as e: log_handler.log_exception( f"Unexpected error applying remote config for '{remote_name}': {e}", func_name=func_name, ) # Wrap unexpected errors in GitCommandError or a custom one if needed raise GitCommandError(f"Unexpected error: {e}") from e # --- Placeholder for future remote methods --- def execute_remote_fetch(self, repo_path: str, remote_name: str) -> dict: """ Executes 'git fetch' for the specified remote. Args: repo_path (str): Path to the local repository. remote_name (str): The name of the remote to fetch (e.g., 'origin'). Returns: dict: A dictionary containing the status and potential error details. Example success: {'status': 'success', 'message': 'Fetch successful.'} Example error: {'status': 'error', 'message': 'Auth failed', 'exception': GitCommandError} Raises: ValueError: If input arguments are invalid. GitCommandError: Propagated from git_commands if fetch fails critically (though git_fetch is called with check=False). """ func_name = "execute_remote_fetch" log_handler.log_info( f"Executing fetch for remote '{remote_name}' in '{repo_path}'", func_name=func_name, ) # --- Input Validation --- if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repository path: '{repo_path}'") if not remote_name or remote_name.isspace(): raise ValueError("Remote name cannot be empty.") if not os.path.exists(os.path.join(repo_path, ".git")): raise ValueError(f"Directory '{repo_path}' is not a Git repository.") result_info = { "status": "unknown", "message": "Fetch not completed.", } # Default result try: # Chiama il metodo git_fetch (che ha check=False) fetch_result = self.git_commands.git_fetch( repo_path, remote_name, prune=True ) # Analizza il risultato del comando fetch if fetch_result.returncode == 0: # Successo result_info["status"] = "success" result_info["message"] = ( f"Fetch from '{remote_name}' completed successfully." ) # Potremmo analizzare stdout per dettagli, ma per ora basta il successo log_handler.log_info( f"Fetch successful for '{remote_name}'.", func_name=func_name ) else: # Errore durante il fetch: analizza stderr result_info["status"] = "error" stderr_lower = ( fetch_result.stderr.lower() if fetch_result.stderr else "" ) log_handler.log_error( f"Fetch command failed for '{remote_name}' (RC={fetch_result.returncode}). Stderr: {stderr_lower}", func_name=func_name, ) # Controlla errori specifici noti auth_errors = [ "authentication failed", "permission denied", "could not read username", "fatal: could not read password", ] connection_errors = [ "repository not found", "could not resolve host", "name or service not known", "network is unreachable", ] if any(err in stderr_lower for err in auth_errors): result_info["message"] = ( f"Authentication required or failed for remote '{remote_name}'." ) # Crea un'eccezione fittizia o riusa quella originale se possibile result_info["exception"] = GitCommandError( result_info["message"], stderr=fetch_result.stderr ) elif any(err in stderr_lower for err in connection_errors): result_info["message"] = ( f"Failed to connect to remote '{remote_name}': Repository or host not found/reachable." ) result_info["exception"] = GitCommandError( result_info["message"], stderr=fetch_result.stderr ) else: # Errore generico di Git result_info["message"] = ( f"Fetch from '{remote_name}' failed (RC={fetch_result.returncode}). Check logs." ) result_info["exception"] = GitCommandError( result_info["message"], stderr=fetch_result.stderr ) except (GitCommandError, ValueError) as e: # Cattura errori sollevati dalla validazione o da git_commands (se check=True fosse usato) log_handler.log_error( f"Error during fetch execution for '{remote_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Fetch failed: {e}", "exception": e, } # Non rilanciamo qui, il worker gestirà l'errore tramite il dizionario restituito except Exception as e: # Cattura errori imprevisti log_handler.log_exception( f"Unexpected error during fetch for '{remote_name}': {e}", func_name=func_name, ) result_info = { "status": "error", "message": f"Unexpected fetch error: {type(e).__name__}", "exception": e, } return result_info def execute_remote_pull(self, repo_path: str, remote_name: str, current_branch_name: str) -> dict: """ Executes 'git pull' for the specified remote and current branch. Detects success, merge conflicts, and other errors. Args: repo_path (str): Path to the local repository. remote_name (str): The name of the remote to pull from (e.g., 'origin'). current_branch_name (str): The name of the currently checked-out local branch. Returns: dict: A dictionary containing the status ('success', 'conflict', 'error'), a message, and optionally an exception. Example success: {'status': 'success', 'message': 'Pull successful.'} Example conflict: {'status': 'conflict', 'message': 'Merge conflict occurred.'} Example error: {'status': 'error', 'message': 'Auth failed', 'exception': GitCommandError} Raises: ValueError: If input arguments are invalid. """ func_name = "execute_remote_pull" log_handler.log_info( f"Executing pull from remote '{remote_name}' into branch '{current_branch_name}' in '{repo_path}'", func_name=func_name ) # --- Input Validation --- if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repository path: '{repo_path}'") if not remote_name or remote_name.isspace(): raise ValueError("Remote name cannot be empty.") if not current_branch_name or current_branch_name.isspace(): # Potremmo provare a ottenerlo se non fornito, ma è meglio che il chiamante lo sappia raise ValueError("Current branch name cannot be empty.") if not os.path.exists(os.path.join(repo_path, ".git")): raise ValueError(f"Directory '{repo_path}' is not a Git repository.") # --- Controllo Modifiche Locali (Pre-Pull Check) --- # È buona pratica non fare pull se ci sono modifiche non committate try: if self.git_commands.git_status_has_changes(repo_path): msg = "Pull aborted: Uncommitted changes detected in the working directory. Please commit or stash first." log_handler.log_warning(msg, func_name=func_name) # Restituisce un errore specifico per questo caso return {'status': 'error', 'message': msg, 'exception': ValueError(msg)} except GitCommandError as status_err: # Se il controllo dello stato fallisce, non procedere msg = f"Pull aborted: Failed to check repository status before pull: {status_err}" log_handler.log_error(msg, func_name=func_name) return {'status': 'error', 'message': msg, 'exception': status_err} # --- Esecuzione Git Pull --- result_info = {'status': 'unknown', 'message': 'Pull not completed.'} # Default try: # Chiama il metodo git_pull (che ha check=False) pull_result = self.git_commands.git_pull(repo_path, remote_name, current_branch_name) # Analizza il risultato del comando pull stdout_full = pull_result.stdout if pull_result.stdout else "" stderr_full = pull_result.stderr if pull_result.stderr else "" combined_output_lower = (stdout_full + stderr_full).lower() if pull_result.returncode == 0: # Successo result_info['status'] = 'success' if "already up to date" in combined_output_lower: result_info['message'] = f"Pull from '{remote_name}': Repository already up-to-date." log_handler.log_info(f"Pull successful (already up-to-date) for '{remote_name}'.", func_name=func_name) else: result_info['message'] = f"Pull from '{remote_name}' completed successfully." log_handler.log_info(f"Pull successful for '{remote_name}'. Output logged.", func_name=func_name) # L'output dettagliato è già loggato da log_and_execute a livello INFO # --- Rilevamento Conflitti (RC=1 e messaggi specifici) --- elif pull_result.returncode == 1 and ( "conflict" in combined_output_lower or "automatic merge failed" in combined_output_lower or "fix conflicts and then commit the result" in combined_output_lower ): result_info['status'] = 'conflict' # Stato specifico per conflitti result_info['message'] = f"Pull from '{remote_name}' resulted in merge conflicts. Please resolve them manually in '{repo_path}' and commit." log_handler.log_error(f"Merge conflict detected during pull from '{remote_name}'.", func_name=func_name) # Non impostiamo 'exception' qui, lo stato 'conflict' è l'informazione chiave else: # Altro Errore (RC != 0 e non è conflitto) result_info['status'] = 'error' stderr_lower = stderr_full.lower() # Usa solo stderr per errori specifici log_handler.log_error(f"Pull command failed for '{remote_name}' (RC={pull_result.returncode}). Stderr: {stderr_lower}", func_name=func_name) # Controlla errori specifici noti (simili a fetch) auth_errors = ["authentication failed", "permission denied", "could not read username", "fatal: could not read password"] connection_errors = ["repository not found", "could not resolve host", "name or service not known", "network is unreachable"] upstream_errors = ["no tracking information", "no upstream branch", "refusing to merge unrelated histories"] # Errori legati a config/history if any(err in stderr_lower for err in auth_errors): result_info['message'] = f"Authentication required or failed for remote '{remote_name}' during pull." result_info['exception'] = GitCommandError(result_info['message'], stderr=pull_result.stderr) elif any(err in stderr_lower for err in connection_errors): result_info['message'] = f"Failed to connect to remote '{remote_name}' during pull: Repository or host not found/reachable." result_info['exception'] = GitCommandError(result_info['message'], stderr=pull_result.stderr) elif any(err in stderr_lower for err in upstream_errors): result_info['message'] = f"Pull failed for '{remote_name}': Check branch upstream configuration or related history. Error: {pull_result.stderr.strip()}" result_info['exception'] = GitCommandError(result_info['message'], stderr=pull_result.stderr) else: # Errore generico di Git result_info['message'] = f"Pull from '{remote_name}' failed (RC={pull_result.returncode}). Check logs." result_info['exception'] = GitCommandError(result_info['message'], stderr=pull_result.stderr) except (GitCommandError, ValueError) as e: # Cattura errori dalla validazione iniziale o dal check dello stato log_handler.log_error(f"Error during pull execution for '{remote_name}': {e}", func_name=func_name) result_info = {'status': 'error', 'message': f"Pull failed: {e}", 'exception': e} except Exception as e: # Cattura errori imprevisti log_handler.log_exception(f"Unexpected error during pull for '{remote_name}': {e}", func_name=func_name) result_info = {'status': 'error', 'message': f"Unexpected pull error: {type(e).__name__}", 'exception': e} return result_info def execute_remote_push(self, repo_path: str, remote_name: str, branch_name: str): # To be implemented: Calls git_commands.git_push, handles upstream/errors pass def execute_push_tags(self, repo_path: str, remote_name: str): # To be implemented: Calls git_commands.git_push_tags pass # --- END OF FILE remote_actions.py ---