# RepoSync/core/git_manager.py
"""
A manager class for executing Git commands via subprocess.
"""
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from typing import List, Optional, Tuple, Dict
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
class GitCommandError(Exception):
    """Raised when a Git command cannot be run or finishes unsuccessfully.

    Attributes:
        command: The argument list that was executed, exactly as supplied
            (``None`` when the caller did not provide one).
        stderr: Standard-error text associated with the failure; always a
            ``str`` and empty when nothing was supplied.
    """

    # Userinfo part of an http(s) URL ("https://<secret>@host/..."). It is
    # masked in __str__ so access tokens embedded in remote URLs do not end up
    # in logs that print the exception.
    _URL_CREDENTIALS = re.compile(r"(?i)\b(https?://)[^/@\s]+@")

    def __init__(self, message: str, command: Optional[List[str]] = None, stderr: Optional[str] = None):
        """Store the failure description together with its context.

        Args:
            message: Human-readable description of the failure.
            command: Argument list of the command that failed, if known.
            stderr: Captured standard error, if any. ``bytes`` are tolerated
                and decoded as UTF-8 (with replacement) because
                ``subprocess.TimeoutExpired.stderr`` can be bytes even when
                the process was started in text mode.
        """
        super().__init__(message)
        self.command = command
        if isinstance(stderr, (bytes, bytearray)):
            stderr = bytes(stderr).decode("utf-8", errors="replace")
        self.stderr = stderr or ""

    def __str__(self) -> str:
        """Return the message, followed by the command line when one is known.

        Credentials embedded in http(s) URLs are replaced by ``***`` in the
        rendered command; ``self.command`` itself is left untouched.
        """
        base_message = super().__str__()
        if not self.command:
            return base_message
        command_str = " ".join(map(str, self.command))
        command_str = self._URL_CREDENTIALS.sub(r"\1***@", command_str)
        return f"{base_message} (Command: '{command_str}')"
|
|
|
|
class GitManager:
    """Runs ``git`` commands through :mod:`subprocess` and logs the outcome.

    Every public method builds an argument list and delegates to
    :meth:`_execute`, which runs the command, logs it and converts failures
    into :class:`GitCommandError`.
    """

    # Userinfo part of an http(s) URL ("https://<secret>@host/..."). Used to
    # mask injected access tokens before anything is written to the log or
    # placed in an error message.
    _URL_CREDENTIALS = re.compile(r"(?i)\b(https?://)[^/@\s]+@")

    def __init__(self, logger: logging.Logger):
        """Keep a reference to the logger used for all diagnostics."""
        self.logger = logger
        self.logger.debug("GitManager initialized.")

    @classmethod
    def _redact(cls, text: str) -> str:
        """Return *text* with credentials inside http(s) URLs replaced by ``***``."""
        return cls._URL_CREDENTIALS.sub(r"\1***@", text)

    def _execute(self, command: List[str], working_directory: str, check: bool = True, capture: bool = True, timeout: int = 120, suppress_errors: bool = False) -> subprocess.CompletedProcess:
        """Run *command* in *working_directory* and return the completed process.

        Args:
            command: Argument list passed to ``subprocess.run`` (no shell).
            working_directory: Directory to run in; must exist unless it is ".".
            check: When true, a non-zero exit status raises GitCommandError.
            capture: When true, stdout/stderr are captured as text.
            timeout: Seconds to wait before the command is abandoned.
            suppress_errors: When true, failures are raised but not logged.

        Returns:
            The ``subprocess.CompletedProcess`` of the finished command.

        Raises:
            GitCommandError: If the working directory is missing, the
                executable cannot be found, the command times out, fails
                unexpectedly, or (with ``check``) exits with a non-zero status.
                The exception carries the original argument list and the raw
                stderr; only log lines and messages are credential-masked.
        """
        command_str = " ".join(map(str, command))
        safe_command_str = self._redact(command_str)
        # ls-remote probes are deliberately kept out of the debug log.
        is_ls_remote = "ls-remote" in command_str
        if not is_ls_remote:
            self.logger.debug(f"Executing in '{working_directory}': {safe_command_str}")

        if working_directory != "." and not os.path.isdir(working_directory):
            msg = f"Working directory does not exist: {working_directory}"
            if not suppress_errors:
                self.logger.error(msg)
            raise GitCommandError(msg, command=command)

        # Only the process launch lives inside the try block. The non-zero
        # exit handling below raises GitCommandError itself and must not be
        # caught and re-wrapped as an "unexpected error" (which would also
        # drop the captured stderr and log the failure twice).
        try:
            result = subprocess.run(
                command, cwd=working_directory, capture_output=capture, text=True,
                encoding="utf-8", errors="replace", check=False, timeout=timeout
            )
        except FileNotFoundError as e:
            msg = f"Command '{command[0]}' not found. Is Git installed and in your system's PATH?"
            if not suppress_errors:
                self.logger.error(msg)
            raise GitCommandError(msg, command=command) from e
        except subprocess.TimeoutExpired as e:
            msg = f"Command timed out after {timeout}s: {safe_command_str}"
            if not suppress_errors:
                self.logger.error(msg)
            partial_stderr = e.stderr
            # TimeoutExpired may hand back bytes even in text mode.
            if isinstance(partial_stderr, (bytes, bytearray)):
                partial_stderr = bytes(partial_stderr).decode("utf-8", errors="replace")
            raise GitCommandError(msg, command=command, stderr=partial_stderr) from e
        except Exception as e:
            msg = f"An unexpected error occurred while executing git command: {e}"
            if not suppress_errors:
                self.logger.exception(msg)
            raise GitCommandError(msg, command=command) from e

        if check and result.returncode != 0:
            stderr_output = result.stderr.strip() if result.stderr else "No stderr output."
            error_message = (
                f"Git command failed with exit code {result.returncode}.\n"
                f"Stderr: {self._redact(stderr_output)}"
            )
            if not suppress_errors:
                self.logger.error(f"Command failed: {safe_command_str}\n{error_message}")
            raise GitCommandError(error_message, command=command, stderr=result.stderr)

        if result.stdout and result.stdout.strip():
            self.logger.debug(f"Command stdout: {self._redact(result.stdout.strip())}")
        if result.stderr and result.stderr.strip() and not is_ls_remote:
            self.logger.debug(f"Command stderr: {self._redact(result.stderr.strip())}")

        return result

    def init(self, repo_path: str):
        """Create *repo_path* if needed and run ``git init`` inside it."""
        self.logger.info(f"Initializing empty Git repository at '{repo_path}'...")
        os.makedirs(repo_path, exist_ok=True)
        self._execute(["git", "init"], working_directory=repo_path)
        self.logger.info(f"Repository initialized at '{repo_path}'.")

    def checkout(self, repo_path: str, branch_name: str, create_new: bool = False):
        """Checks out a branch, optionally creating it."""
        self.logger.info(f"Switching to branch '{branch_name}' in '{repo_path}'...")
        command = ["git", "checkout"]
        if create_new:
            command.extend(["-b", branch_name])
        else:
            command.append(branch_name)
        self._execute(command, working_directory=repo_path)
        self.logger.info(f"Switched to branch '{branch_name}'.")

    def add(self, repo_path: str, file_pattern: str = "."):
        """Stage the files matching *file_pattern* (everything by default)."""
        self.logger.debug(f"Staging files matching '{file_pattern}' in '{repo_path}'...")
        self._execute(["git", "add", file_pattern], working_directory=repo_path)

    def commit(self, repo_path: str, message: str):
        """Commit the staged changes in *repo_path* with *message*."""
        self.logger.info(f"Committing in '{repo_path}' with message: '{message}'")
        self._execute(["git", "commit", "-m", message], working_directory=repo_path)

    def clone(self, remote_url: str, local_path: str, token: Optional[str] = None):
        """Clone *remote_url* into *local_path*, optionally authenticating with *token*.

        The destination is resolved to an absolute path first: git runs with
        the destination's parent as its working directory, so a relative path
        would otherwise be resolved a second time against that parent (and a
        bare directory name would yield an empty parent directory).

        Raises:
            GitCommandError: If the clone fails or times out (900 seconds).
        """
        self.logger.info(f"Preparing to clone from '{self._redact(remote_url)}' into '{local_path}'...")
        target_path = os.path.abspath(local_path)
        parent_dir = os.path.dirname(target_path)
        os.makedirs(parent_dir, exist_ok=True)
        url_to_clone = self.get_url_with_token(remote_url, token)
        if url_to_clone != remote_url:
            self.logger.debug("Using URL with injected token for cloning.")
        command = ["git", "clone", "--progress", url_to_clone, target_path]
        self._execute(command, working_directory=parent_dir, timeout=900)
        self.logger.info(f"Repository cloned successfully to '{local_path}'.")

    def create_git_bundle(self, repo_path: str, bundle_path: str):
        """Bundle all refs of *repo_path* into *bundle_path* and verify the file.

        The bundle path is made absolute so that git (which runs inside
        *repo_path*) and the verification below look at the same file.

        Raises:
            GitCommandError: If git fails, or the resulting file is missing
                or empty (an empty file is removed before raising).
        """
        self.logger.info(f"Creating Git bundle file: {bundle_path}")
        bundle_file = os.path.abspath(bundle_path)
        command = ["git", "bundle", "create", bundle_file, "--all"]
        self._execute(command, working_directory=repo_path)
        if not os.path.exists(bundle_file) or os.path.getsize(bundle_file) == 0:
            error_msg = f"Bundle creation failed silently. Output file '{bundle_path}' is missing or empty."
            self.logger.error(error_msg)
            if os.path.exists(bundle_file):
                try:
                    os.remove(bundle_file)
                except OSError as cleanup_error:
                    # Best effort only: the bundle failure is what gets reported.
                    self.logger.debug(f"Could not remove empty bundle '{bundle_path}': {cleanup_error}")
            raise GitCommandError(error_msg, command=command)
        self.logger.info("Bundle created and verified successfully.")

    def get_branches_with_heads(self, repo_path: str) -> Dict[str, str]:
        """Return a mapping of local branch name to the commit hash it points at.

        Lines of ``git branch`` output that do not consist of exactly two
        whitespace-separated fields are ignored. An empty dict is returned
        (and the error logged) when the git command fails.
        """
        self.logger.debug(f"Getting local branches and HEADs for '{repo_path}'...")
        command = ["git", "branch", "--format=%(refname:short) %(objectname)"]
        try:
            result = self._execute(command, working_directory=repo_path)
        except GitCommandError as e:
            self.logger.error(f"Failed to get branches and heads for '{repo_path}': {e}")
            return {}
        branches = {}
        for line in result.stdout.strip().splitlines():
            parts = line.split()
            if len(parts) == 2:
                branches[parts[0]] = parts[1]
        self.logger.debug(f"Found branches and heads: {branches}")
        return branches

    def fetch(self, repo_path: str, remote: str = "origin", prune: bool = True):
        """Fetch from *remote*; ``--prune`` is added unless *remote* is an existing file."""
        self.logger.info(f"Fetching from '{remote}'...")
        command = ["git", "fetch", remote]
        # NOTE(review): the isfile() check is resolved against the process
        # working directory, not repo_path - presumably meant to skip pruning
        # when fetching from a bundle file; confirm against callers.
        if prune and not os.path.isfile(remote):
            command.append("--prune")
        self._execute(command, working_directory=repo_path)
        self.logger.info(f"Fetch from '{remote}' complete.")

    def push(self, repo_path: str, remote: str, all_branches: bool = False, all_tags: bool = False, branch_to_push: Optional[str] = None):
        """Push to *remote* whatever the flags select.

        The single branch, ``--all`` and ``--tags`` pushes are independent and
        run in that order, each with a 300 second timeout. When nothing is
        selected a warning is logged and no command is run.
        """
        self.logger.info(f"Preparing to push to remote '{remote}'...")
        if branch_to_push:
            self.logger.info(f"Pushing branch '{branch_to_push}' to '{remote}'...")
            self._execute(["git", "push", remote, branch_to_push], working_directory=repo_path, timeout=300)
            self.logger.info(f"Branch '{branch_to_push}' pushed successfully.")
        if all_branches:
            self.logger.info(f"Pushing all branches to '{remote}'...")
            self._execute(["git", "push", remote, "--all"], working_directory=repo_path, timeout=300)
            self.logger.info("All branches pushed successfully.")
        if all_tags:
            self.logger.info(f"Pushing all tags to '{remote}'...")
            self._execute(["git", "push", remote, "--tags"], working_directory=repo_path, timeout=300)
            self.logger.info("All tags pushed successfully.")
        if not all_branches and not all_tags and not branch_to_push:
            self.logger.warning("Push command called without specifying what to push. No action taken.")

    def clone_from_bundle(self, bundle_path: str, destination_path: str):
        """Clone the bundle file *bundle_path* into *destination_path*.

        Git runs in the current working directory, so relative paths keep
        their usual meaning. The destination's parent is created when the
        path has one (a bare directory name has no parent to create).
        """
        self.logger.info(f"Cloning from bundle '{bundle_path}' into '{destination_path}'...")
        parent_dir = os.path.dirname(destination_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        command = ["git", "clone", bundle_path, destination_path]
        self._execute(command, working_directory=".", timeout=300)
        self.logger.info(f"Repository cloned successfully from bundle to '{destination_path}'.")

    def add_remote(self, repo_path: str, name: str, url: str):
        """Register remote *name* pointing at *url* (credentials masked in the log)."""
        self.logger.info(f"Adding remote '{name}' with URL '{self._redact(url)}' to '{repo_path}'")
        command = ["git", "remote", "add", name, url]
        self._execute(command, working_directory=repo_path)
        self.logger.info(f"Remote '{name}' added successfully.")

    def set_remote_url(self, repo_path: str, remote_name: str, url: str):
        """Point the existing remote *remote_name* at *url* (credentials masked in the log)."""
        self.logger.info(f"Setting URL for remote '{remote_name}' to '{self._redact(url)}' in '{repo_path}'")
        command = ["git", "remote", "set-url", remote_name, url]
        self._execute(command, working_directory=repo_path)
        self.logger.info("Remote URL updated successfully.")

    def get_url_with_token(self, remote_url: str, token: Optional[str] = None) -> str:
        """Return *remote_url* with *token* injected as the URL's userinfo.

        Only URLs starting with "http" are rewritten, and only when a token is
        given; any userinfo already present is replaced. The original URL is
        returned unchanged when it has no host or cannot be parsed.
        """
        if not (token and remote_url.startswith("http")):
            return remote_url
        try:
            parsed_url = urlparse(remote_url)
            host = parsed_url.hostname
            if not host:
                self.logger.error("Failed to parse and inject token into URL: no host found. Using original URL.")
                return remote_url
            if ":" in host:
                # urlparse strips the brackets of IPv6 literals; restore them
                # so the rebuilt netloc stays valid.
                host = f"[{host}]"
            netloc_with_token = f"{token}@{host}"
            if parsed_url.port:
                netloc_with_token += f":{parsed_url.port}"
            return urlunparse((
                parsed_url.scheme, netloc_with_token, parsed_url.path,
                parsed_url.params, parsed_url.query, parsed_url.fragment
            ))
        except Exception as e:
            self.logger.error(f"Failed to parse and inject token into URL: {e}. Using original URL.")
            return remote_url

    def check_remote_exists(self, remote_url: str, token: Optional[str] = None) -> bool:
        """Return True when ``git ls-remote`` can reach *remote_url*.

        The probe runs with a 30 second timeout and without error logging; any
        GitCommandError is interpreted as "missing or not accessible".
        """
        url_to_check = self.get_url_with_token(remote_url, token)
        command = ["git", "ls-remote", "--exit-code", "--quiet", url_to_check]
        safe_url = self._redact(remote_url)
        try:
            self._execute(
                command, working_directory=".", capture=False,
                timeout=30, suppress_errors=True
            )
        except GitCommandError:
            self.logger.debug(f"Remote '{safe_url}' does not exist or is not accessible.")
            return False
        self.logger.debug(f"Remote '{safe_url}' exists.")
        return True