SXXXXXXX_RepoSync/reposync/core/git_manager.py

176 lines
7.3 KiB
Python

# RepoSync/core/git_manager.py
"""
A manager class for executing Git commands via subprocess.
This class provides a high-level interface for common Git operations,
handling command execution, error wrapping, and output parsing.
It relies on a logger instance provided during initialization.
"""
import logging
import os
import re
import subprocess
from typing import List, Optional, Tuple, Dict
from urllib.parse import urlparse, urlunparse
# --- Custom Exception Definition ---
class GitCommandError(Exception):
"""Custom exception for Git command execution errors."""
def __init__(
self,
message: str,
command: Optional[List[str]] = None,
stderr: Optional[str] = None
):
super().__init__(message)
self.command = command
self.stderr = stderr or ""
def __str__(self) -> str:
base_message = super().__str__()
if self.command:
command_str = " ".join(map(str, self.command))
return f"{base_message} (Command: '{command_str}')"
return base_message
# --- Main Git Manager Class ---
class GitManager:
"""
Manages Git command execution, error handling, and output parsing.
"""
def __init__(self, logger: logging.Logger):
"""
Initializes the GitManager with a logger instance.
"""
self.logger = logger
self.logger.debug("GitManager initialized.")
def _execute(
self,
command: List[str],
working_directory: str,
check: bool = True,
capture: bool = True,
timeout: int = 120,
) -> subprocess.CompletedProcess:
"""
Executes a shell command, logs details, and handles errors.
"""
command_str = " ".join(command)
self.logger.debug(f"Executing in '{working_directory}': {command_str}")
if working_directory != "." and not os.path.isdir(working_directory):
msg = f"Working directory does not exist: {working_directory}"
self.logger.error(msg)
raise GitCommandError(msg, command=command)
try:
result = subprocess.run(
command,
cwd=working_directory,
capture_output=capture,
text=True,
encoding="utf-8",
errors="replace",
check=False,
timeout=timeout,
)
if check and result.returncode != 0:
error_message = f"Git command failed with exit code {result.returncode}.\nStderr: {result.stderr.strip()}"
self.logger.error(f"Command failed: {command_str}\n{error_message}")
raise GitCommandError(error_message, command=command, stderr=result.stderr)
if result.stdout and result.stdout.strip(): self.logger.debug(f"Command stdout: {result.stdout.strip()}")
if result.stderr and result.stderr.strip(): self.logger.debug(f"Command stderr: {result.stderr.strip()}")
return result
except FileNotFoundError as e:
msg = f"Command '{command[0]}' not found. Is Git installed and in your system's PATH?"
self.logger.error(msg)
raise GitCommandError(msg, command=command) from e
except subprocess.TimeoutExpired as e:
msg = f"Command timed out after {timeout}s: {command_str}"
self.logger.error(msg)
raise GitCommandError(msg, command=command, stderr=e.stderr) from e
except Exception as e:
msg = f"An unexpected error occurred while executing git command: {e}"
self.logger.exception(msg)
raise GitCommandError(msg, command=command) from e
def clone(self, remote_url: str, local_path: str, token: Optional[str] = None, timeout: int = 300):
"""
Clones a repository from a remote URL to a local path.
"""
self.logger.info(f"Preparing to clone from '{remote_url}' into '{local_path}'...")
parent_dir = os.path.dirname(local_path)
if not os.path.exists(parent_dir): os.makedirs(parent_dir, exist_ok=True)
url_to_clone = remote_url
if token and remote_url.startswith("http"):
try:
parsed_url = urlparse(remote_url)
netloc_with_token = f"{token}@{parsed_url.hostname}"
if parsed_url.port: netloc_with_token += f":{parsed_url.port}"
url_to_clone = urlunparse((parsed_url.scheme, netloc_with_token, parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment))
self.logger.debug("Using URL with injected token for cloning.")
except Exception as e:
self.logger.error(f"Failed to parse and inject token into URL: {e}. Using original URL.")
url_to_clone = remote_url
command = ["git", "clone", "--progress", url_to_clone, local_path]
self._execute(command, working_directory=parent_dir, timeout=timeout)
self.logger.info(f"Repository cloned successfully to '{local_path}'.")
def create_git_bundle(self, repo_path: str, bundle_path: str):
"""Creates a Git bundle file containing all refs."""
self.logger.info(f"Creating Git bundle file: {bundle_path}")
command = ["git", "bundle", "create", bundle_path, "--all"]
self._execute(command, working_directory=repo_path)
self.logger.info(f"Bundle created successfully: {bundle_path}")
def get_branches_with_heads(self, repo_path: str) -> Dict[str, str]:
"""Gets a dictionary of local branches and their corresponding HEAD commit hashes."""
self.logger.debug(f"Getting local branches and HEADs for '{repo_path}'...")
command = ["git", "branch", "--format=%(refname:short) %(objectname)"]
try:
result = self._execute(command, working_directory=repo_path)
branches = {}
for line in result.stdout.strip().splitlines():
parts = line.split()
if len(parts) == 2:
branch_name, commit_hash = parts
branches[branch_name] = commit_hash
self.logger.debug(f"Found branches and heads: {branches}")
return branches
except GitCommandError as e:
self.logger.error(f"Failed to get branches and heads for '{repo_path}': {e}")
return {}
def fetch(self, repo_path: str, remote: str = "origin", prune: bool = True):
"""Fetches updates from a remote or a bundle file."""
self.logger.info(f"Fetching from '{remote}'...")
command = ["git", "fetch", remote]
if prune and os.path.isdir(remote): command.append("--prune")
self._execute(command, working_directory=repo_path)
self.logger.info(f"Fetch from '{remote}' complete.")
def push(self, repo_path: str, remote: str, all_branches: bool = False, all_tags: bool = False):
"""Pushes changes to a remote."""
self.logger.info(f"Pushing to remote '{remote}'...")
command = ["git", "push", remote]
if all_branches: command.append("--all")
if all_tags: command.append("--tags")
if not all_branches and not all_tags:
self.logger.warning("Push command called without specifying what to push. Use all_branches or all_tags.")
return
self._execute(command, working_directory=repo_path, timeout=300)
self.logger.info("Push complete.")