# RepoSync/core/git_manager.py """ A manager class for executing Git commands via subprocess. """ import logging import os import re import subprocess from typing import List, Optional, Tuple, Dict from urllib.parse import urlparse, urlunparse class GitCommandError(Exception): def __init__(self, message: str, command: Optional[List[str]] = None, stderr: Optional[str] = None): super().__init__(message) self.command = command self.stderr = stderr or "" def __str__(self) -> str: base_message = super().__str__() if self.command: command_str = " ".join(map(str, self.command)) return f"{base_message} (Command: '{command_str}')" return base_message class GitManager: def __init__(self, logger: logging.Logger): self.logger = logger self.logger.debug("GitManager initialized.") def _execute(self, command: List[str], working_directory: str, check: bool = True, capture: bool = True, timeout: int = 120, suppress_errors: bool = False) -> subprocess.CompletedProcess: command_str = " ".join(command) if "ls-remote" not in command_str: self.logger.debug(f"Executing in '{working_directory}': {command_str}") if working_directory != "." and not os.path.isdir(working_directory): msg = f"Working directory does not exist: {working_directory}" if not suppress_errors: self.logger.error(msg) raise GitCommandError(msg, command=command) try: result = subprocess.run( command, cwd=working_directory, capture_output=capture, text=True, encoding="utf-8", errors="replace", check=False, timeout=timeout ) if check and result.returncode != 0: stderr_output = result.stderr.strip() if result.stderr else "No stderr output." error_message = f"Git command failed with exit code {result.returncode}.\nStderr: {stderr_output}" if not suppress_errors: self.logger.error(f"Command failed: {command_str}\n{error_message}") raise GitCommandError(error_message, command=command, stderr=result.stderr) if result.stdout and result.stdout.strip(): self.logger.debug(f"Command stdout: {result.stdout.strip()}") if result.stderr and result.stderr.strip() and "ls-remote" not in command_str: self.logger.debug(f"Command stderr: {result.stderr.strip()}") return result except FileNotFoundError as e: msg = f"Command '{command[0]}' not found. Is Git installed and in your system's PATH?" if not suppress_errors: self.logger.error(msg) raise GitCommandError(msg, command=command) from e except subprocess.TimeoutExpired as e: msg = f"Command timed out after {timeout}s: {command_str}" if not suppress_errors: self.logger.error(msg) raise GitCommandError(msg, command=command, stderr=e.stderr) from e except Exception as e: msg = f"An unexpected error occurred while executing git command: {e}" if not suppress_errors: self.logger.exception(msg) raise GitCommandError(msg, command=command) from e def init(self, repo_path: str): self.logger.info(f"Initializing empty Git repository at '{repo_path}'...") os.makedirs(repo_path, exist_ok=True) self._execute(["git", "init"], working_directory=repo_path) self.logger.info(f"Repository initialized at '{repo_path}'.") def checkout(self, repo_path: str, branch_name: str, create_new: bool = False): """Checks out a branch, optionally creating it.""" self.logger.info(f"Switching to branch '{branch_name}' in '{repo_path}'...") command = ["git", "checkout"] if create_new: command.extend(["-b", branch_name]) else: command.append(branch_name) self._execute(command, working_directory=repo_path) self.logger.info(f"Switched to branch '{branch_name}'.") def add(self, repo_path: str, file_pattern: str = "."): self.logger.debug(f"Staging files matching '{file_pattern}' in '{repo_path}'...") self._execute(["git", "add", file_pattern], working_directory=repo_path) def commit(self, repo_path: str, message: str): self.logger.info(f"Committing in '{repo_path}' with message: '{message}'") self._execute(["git", "commit", "-m", message], working_directory=repo_path) def clone(self, remote_url: str, local_path: str, token: Optional[str] = None): self.logger.info(f"Preparing to clone from '{remote_url}' into '{local_path}'...") parent_dir = os.path.dirname(local_path) if not os.path.exists(parent_dir): os.makedirs(parent_dir, exist_ok=True) url_to_clone = self.get_url_with_token(remote_url, token) if url_to_clone != remote_url: self.logger.debug("Using URL with injected token for cloning.") command = ["git", "clone", "--progress", url_to_clone, local_path] self._execute(command, working_directory=parent_dir, timeout=900) self.logger.info(f"Repository cloned successfully to '{local_path}'.") def create_git_bundle(self, repo_path: str, bundle_path: str): self.logger.info(f"Creating Git bundle file: {bundle_path}") command = ["git", "bundle", "create", bundle_path, "--all"] self._execute(command, working_directory=repo_path) if not os.path.exists(bundle_path) or os.path.getsize(bundle_path) == 0: error_msg = f"Bundle creation failed silently. Output file '{bundle_path}' is missing or empty." self.logger.error(error_msg) if os.path.exists(bundle_path): try: os.remove(bundle_path) except OSError: pass raise GitCommandError(error_msg, command=command) self.logger.info("Bundle created and verified successfully.") def get_branches_with_heads(self, repo_path: str) -> Dict[str, str]: self.logger.debug(f"Getting local branches and HEADs for '{repo_path}'...") command = ["git", "branch", "--format=%(refname:short) %(objectname)"] try: result = self._execute(command, working_directory=repo_path) branches = {} for line in result.stdout.strip().splitlines(): parts = line.split() if len(parts) == 2: branches[parts[0]] = parts[1] self.logger.debug(f"Found branches and heads: {branches}") return branches except GitCommandError as e: self.logger.error(f"Failed to get branches and heads for '{repo_path}': {e}") return {} def fetch(self, repo_path: str, remote: str = "origin", prune: bool = True): self.logger.info(f"Fetching from '{remote}'...") command = ["git", "fetch", remote] if prune and not os.path.isfile(remote): command.append("--prune") self._execute(command, working_directory=repo_path) self.logger.info(f"Fetch from '{remote}' complete.") def push(self, repo_path: str, remote: str, all_branches: bool = False, all_tags: bool = False, branch_to_push: Optional[str] = None): self.logger.info(f"Preparing to push to remote '{remote}'...") if branch_to_push: self.logger.info(f"Pushing branch '{branch_to_push}' to '{remote}'...") self._execute(["git", "push", remote, branch_to_push], working_directory=repo_path, timeout=300) self.logger.info(f"Branch '{branch_to_push}' pushed successfully.") if all_branches: self.logger.info(f"Pushing all branches to '{remote}'...") self._execute(["git", "push", remote, "--all"], working_directory=repo_path, timeout=300) self.logger.info("All branches pushed successfully.") if all_tags: self.logger.info(f"Pushing all tags to '{remote}'...") self._execute(["git", "push", remote, "--tags"], working_directory=repo_path, timeout=300) self.logger.info("All tags pushed successfully.") if not all_branches and not all_tags and not branch_to_push: self.logger.warning("Push command called without specifying what to push. No action taken.") def clone_from_bundle(self, bundle_path: str, destination_path: str): self.logger.info(f"Cloning from bundle '{bundle_path}' into '{destination_path}'...") parent_dir = os.path.dirname(destination_path) if not os.path.exists(parent_dir): os.makedirs(parent_dir, exist_ok=True) command = ["git", "clone", bundle_path, destination_path] self._execute(command, working_directory=".", timeout=300) self.logger.info(f"Repository cloned successfully from bundle to '{destination_path}'.") def add_remote(self, repo_path: str, name: str, url: str): self.logger.info(f"Adding remote '{name}' with URL '{url}' to '{repo_path}'") command = ["git", "remote", "add", name, url] self._execute(command, working_directory=repo_path) self.logger.info(f"Remote '{name}' added successfully.") def set_remote_url(self, repo_path: str, remote_name: str, url: str): self.logger.info(f"Setting URL for remote '{remote_name}' to '{url}' in '{repo_path}'") command = ["git", "remote", "set-url", remote_name, url] self._execute(command, working_directory=repo_path) self.logger.info("Remote URL updated successfully.") def get_url_with_token(self, remote_url: str, token: Optional[str] = None) -> str: if token and remote_url.startswith("http"): try: parsed_url = urlparse(remote_url) netloc_with_token = f"{token}@{parsed_url.hostname}" if parsed_url.port: netloc_with_token += f":{parsed_url.port}" return urlunparse(( parsed_url.scheme, netloc_with_token, parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment )) except Exception as e: self.logger.error(f"Failed to parse and inject token into URL: {e}. Using original URL.") return remote_url def check_remote_exists(self, remote_url: str, token: Optional[str] = None) -> bool: url_to_check = self.get_url_with_token(remote_url, token) command = ["git", "ls-remote", "--exit-code", "--quiet", url_to_check] try: self._execute( command, working_directory=".", capture=False, timeout=30, suppress_errors=True ) self.logger.debug(f"Remote '{remote_url}' exists.") return True except GitCommandError: self.logger.debug(f"Remote '{remote_url}' does not exist or is not accessible.") return False