160 lines
8.7 KiB
Python
160 lines
8.7 KiB
Python
# RepoSync/core/git_manager.py
|
|
|
|
"""
|
|
A manager class for executing Git commands via subprocess.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from typing import List, Optional, Tuple, Dict
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
class GitCommandError(Exception):
    """Raised when a Git command cannot be started or does not succeed.

    Attributes:
        command: The argument list that was executed, or ``None`` if it was
            not supplied. It is stored verbatim (credentials included) so
            callers can still inspect it; only the string form is masked.
        stderr: Standard-error text associated with the failure. Always a
            ``str``; empty when nothing was supplied.
    """

    # Matches "scheme://userinfo@" so credentials embedded in a URL can be
    # masked before the text reaches logs or tracebacks.
    _CREDENTIALS_RE = re.compile(r"(?P<scheme>[A-Za-z][A-Za-z0-9+.\-]*://)[^/\s]+@")

    def __init__(self, message: str, command: Optional[List[str]] = None, stderr: Optional[str] = None):
        """Store the message, the failing command and its stderr output.

        Args:
            message: Human-readable description of the failure.
            command: The argument list that was run, if any.
            stderr: Captured standard error. ``bytes`` are accepted as well
                and decoded as UTF-8 with replacement characters.
        """
        super().__init__(message)
        self.command = command
        # subprocess.TimeoutExpired may hand over raw bytes; normalise them so
        # the attribute is always text.
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        self.stderr = stderr or ""

    def __str__(self) -> str:
        """Return the message, plus the command when known, with URL credentials masked."""
        text = super().__str__()
        if self.command:
            command_str = " ".join(map(str, self.command))
            text = f"{text} (Command: '{command_str}')"
        # This string routinely ends up in log files, so tokens injected into
        # clone/push URLs must never appear in it.
        return self._CREDENTIALS_RE.sub(r"\g<scheme>***@", text)
|
|
|
|
class GitManager:
    """Runs Git commands through ``subprocess`` and reports progress to a logger.

    Every public method builds an argument list and delegates to
    :meth:`_execute`, which raises ``GitCommandError`` on failure.
    """

    # Matches "scheme://userinfo@" so credentials embedded in a URL can be
    # masked before anything is written to the log.
    _CREDENTIALS_RE = re.compile(r"(?P<scheme>[A-Za-z][A-Za-z0-9+.\-]*://)[^/\s]+@")

    def __init__(self, logger: logging.Logger):
        """Remember the logger used for all diagnostic output.

        Args:
            logger: Logger that receives debug/info/error messages.
        """
        self.logger = logger
        self.logger.debug("GitManager initialized.")

    @classmethod
    def _redact(cls, text: str) -> str:
        """Return *text* with the userinfo part of any URL replaced by ``***``."""
        return cls._CREDENTIALS_RE.sub(r"\g<scheme>***@", text)

    def _execute(self, command: List[str], working_directory: str, check: bool = True, capture: bool = True, timeout: int = 120) -> subprocess.CompletedProcess:
        """Run *command* in *working_directory* and return the completed process.

        Args:
            command: Argument list passed to ``subprocess.run`` (no shell).
            working_directory: Directory to run in. Must exist unless it is ``"."``.
            check: When true, a non-zero exit code raises ``GitCommandError``.
            capture: When true, stdout and stderr are captured as text.
            timeout: Seconds to wait before giving up.

        Returns:
            The ``subprocess.CompletedProcess`` of the finished command.

        Raises:
            GitCommandError: If the working directory is missing, the
                executable cannot be found, the command times out, fails
                unexpectedly, or (with ``check``) exits with a non-zero code.
        """
        # Only the redacted form is ever logged; the real command is what runs.
        command_str = self._redact(" ".join(command))
        self.logger.debug(f"Executing in '{working_directory}': {command_str}")

        if working_directory != "." and not os.path.isdir(working_directory):
            msg = f"Working directory does not exist: {working_directory}"
            self.logger.error(msg)
            raise GitCommandError(msg, command=command)

        # Keep the try body limited to the call that can actually raise, so the
        # GitCommandError for a non-zero exit code (below) is not swallowed and
        # re-wrapped by the generic handler.
        try:
            result = subprocess.run(
                command,
                cwd=working_directory,
                capture_output=capture,
                text=True,
                encoding="utf-8",
                errors="replace",
                check=False,
                timeout=timeout,
            )
        except FileNotFoundError as e:
            msg = f"Command '{command[0]}' not found. Is Git installed and in your system's PATH?"
            self.logger.error(msg)
            raise GitCommandError(msg, command=command) from e
        except subprocess.TimeoutExpired as e:
            msg = f"Command timed out after {timeout}s: {command_str}"
            self.logger.error(msg)
            partial_stderr = e.stderr
            # The partial output attached to a timeout may be raw bytes.
            if isinstance(partial_stderr, bytes):
                partial_stderr = partial_stderr.decode("utf-8", errors="replace")
            raise GitCommandError(msg, command=command, stderr=partial_stderr) from e
        except Exception as e:
            msg = f"An unexpected error occurred while executing git command: {e}"
            self.logger.exception(msg)
            raise GitCommandError(msg, command=command) from e

        # stdout/stderr are None when capture is False.
        stdout_text = (result.stdout or "").strip()
        stderr_text = (result.stderr or "").strip()

        if check and result.returncode != 0:
            error_message = (
                f"Git command failed with exit code {result.returncode}.\n"
                f"Stderr: {self._redact(stderr_text)}"
            )
            self.logger.error(f"Command failed: {command_str}\n{error_message}")
            raise GitCommandError(error_message, command=command, stderr=result.stderr)

        if stdout_text:
            self.logger.debug(f"Command stdout: {self._redact(stdout_text)}")
        if stderr_text:
            self.logger.debug(f"Command stderr: {self._redact(stderr_text)}")
        return result

    def clone(self, remote_url: str, local_path: str, token: Optional[str] = None):
        """Clone *remote_url* into *local_path*, optionally authenticating with *token*.

        Args:
            remote_url: URL of the repository to clone.
            local_path: Destination directory. Relative paths are resolved
                against the current working directory of the process.
            token: Optional token injected into HTTP(S) URLs.

        Raises:
            GitCommandError: If the clone command fails.
        """
        self.logger.info(f"Preparing to clone from '{self._redact(remote_url)}' into '{local_path}'...")

        # Resolve to an absolute path first: the command runs with cwd set to
        # the parent directory, so a relative destination would otherwise be
        # interpreted relative to that parent (and a bare name has no parent).
        target_path = os.path.abspath(local_path)
        parent_dir = os.path.dirname(target_path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir, exist_ok=True)

        url_to_clone = self.get_url_with_token(remote_url, token)
        if url_to_clone != remote_url:
            self.logger.debug("Using URL with injected token for cloning.")

        command = ["git", "clone", "--progress", url_to_clone, target_path]
        self._execute(command, working_directory=parent_dir, timeout=300)
        self.logger.info(f"Repository cloned successfully to '{local_path}'.")

    def create_git_bundle(self, repo_path: str, bundle_path: str):
        """Create a Git bundle of all refs and verify the output file is non-empty.

        Args:
            repo_path: Repository to bundle; used as the working directory.
            bundle_path: Output file. Relative paths are resolved against the
                current working directory of the process.

        Raises:
            GitCommandError: If the command fails or the bundle file is
                missing or empty afterwards.
        """
        self.logger.info(f"Creating Git bundle file: {bundle_path}")

        # Git runs inside repo_path while the verification below runs in this
        # process; an absolute path makes both refer to the same file.
        bundle_file = os.path.abspath(bundle_path)
        command = ["git", "bundle", "create", bundle_file, "--all"]
        self._execute(command, working_directory=repo_path)

        if not os.path.exists(bundle_file) or os.path.getsize(bundle_file) == 0:
            error_msg = f"Bundle creation failed silently. Output file '{bundle_path}' is missing or empty."
            self.logger.error(error_msg)
            if os.path.exists(bundle_file):
                # Best-effort cleanup of the empty file; the error raised below
                # is what matters to the caller.
                try:
                    os.remove(bundle_file)
                except OSError as cleanup_error:
                    self.logger.debug(f"Could not remove empty bundle file: {cleanup_error}")
            raise GitCommandError(error_msg, command=command)

        self.logger.info("Bundle created and verified successfully.")

    def get_branches_with_heads(self, repo_path: str) -> Dict[str, str]:
        """Return a mapping of local branch name to the object name it points at.

        Args:
            repo_path: Repository to inspect.

        Returns:
            ``{branch: object_name}``; an empty dict if the Git command fails
            (the failure is logged, not raised).
        """
        self.logger.debug(f"Getting local branches and HEADs for '{repo_path}'...")
        command = ["git", "branch", "--format=%(refname:short) %(objectname)"]
        try:
            result = self._execute(command, working_directory=repo_path)
        except GitCommandError as e:
            self.logger.error(f"Failed to get branches and heads for '{repo_path}': {e}")
            return {}

        branches: Dict[str, str] = {}
        for line in (result.stdout or "").strip().splitlines():
            parts = line.split()
            # Lines that are not exactly "<name> <object>" are skipped.
            if len(parts) == 2:
                branches[parts[0]] = parts[1]
        self.logger.debug(f"Found branches and heads: {branches}")
        return branches

    def fetch(self, repo_path: str, remote: str = "origin", prune: bool = True):
        """Fetch from *remote* inside *repo_path*.

        Args:
            repo_path: Repository to fetch into.
            remote: Remote name, URL or file path to fetch from.
            prune: Add ``--prune`` unless *remote* is an existing file.

        Raises:
            GitCommandError: If the fetch command fails.
        """
        remote_for_log = self._redact(remote)
        self.logger.info(f"Fetching from '{remote_for_log}'...")
        command = ["git", "fetch", remote]
        if prune and not os.path.isfile(remote):
            command.append("--prune")
        self._execute(command, working_directory=repo_path)
        self.logger.info(f"Fetch from '{remote_for_log}' complete.")

    def push(self, repo_path: str, remote: str, all_branches: bool = False, all_tags: bool = False):
        """Push all branches and/or all tags of *repo_path* to *remote*.

        Args:
            repo_path: Repository to push from.
            remote: Remote name or URL to push to.
            all_branches: Run ``git push <remote> --all``.
            all_tags: Run ``git push <remote> --tags``.

        Raises:
            GitCommandError: If a push command fails.
        """
        remote_for_log = self._redact(remote)
        self.logger.info(f"Preparing to push to remote '{remote_for_log}'...")

        if not all_branches and not all_tags:
            self.logger.warning("Push command called without specifying what to push. No action taken.")
            return

        if all_branches:
            self.logger.info(f"Pushing all branches to '{remote_for_log}'...")
            self._execute(["git", "push", remote, "--all"], working_directory=repo_path, timeout=300)
            self.logger.info("All branches pushed successfully.")

        if all_tags:
            self.logger.info(f"Pushing all tags to '{remote_for_log}'...")
            self._execute(["git", "push", remote, "--tags"], working_directory=repo_path, timeout=300)
            self.logger.info("All tags pushed successfully.")

    # --- Added methods ---

    def clone_from_bundle(self, bundle_path: str, destination_path: str):
        """Clone a repository directly from a local ``.bundle`` file.

        Args:
            bundle_path: Bundle file to clone from.
            destination_path: Directory to create the clone in.

        Both paths are resolved against the current working directory of the
        process before being handed to Git.

        Raises:
            GitCommandError: If the clone command fails.
        """
        self.logger.info(f"Cloning from bundle '{bundle_path}' into '{destination_path}'...")

        # Absolute paths make the command independent of its working directory
        # and give a bare destination name a real parent directory.
        source_bundle = os.path.abspath(bundle_path)
        target_path = os.path.abspath(destination_path)
        parent_dir = os.path.dirname(target_path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir, exist_ok=True)

        command = ["git", "clone", source_bundle, target_path]
        self._execute(command, working_directory=".", timeout=300)
        self.logger.info(f"Repository cloned successfully from bundle to '{destination_path}'.")

    def set_remote_url(self, repo_path: str, remote_name: str, url: str):
        """Set the URL of an existing remote.

        Args:
            repo_path: Repository whose remote is updated.
            remote_name: Name of the remote.
            url: New URL; credentials in it are masked in the log output.

        Raises:
            GitCommandError: If the command fails.
        """
        self.logger.info(f"Setting URL for remote '{remote_name}' to '{self._redact(url)}' in '{repo_path}'")
        command = ["git", "remote", "set-url", remote_name, url]
        self._execute(command, working_directory=repo_path)
        self.logger.info("Remote URL updated successfully.")

    def get_url_with_token(self, remote_url: str, token: Optional[str] = None) -> str:
        """Return *remote_url* with *token* injected as the userinfo component.

        Args:
            remote_url: Repository URL.
            token: Token to inject. Nothing is changed when it is empty or
                when the URL does not start with ``http``.

        Returns:
            The URL with ``token@`` placed in front of the host (any existing
            userinfo is replaced), or the original URL if it cannot be parsed.
        """
        if not token or not remote_url.startswith("http"):
            return remote_url

        try:
            parsed_url = urlparse(remote_url)
            host = parsed_url.hostname
            port = parsed_url.port  # raises ValueError for a malformed port
        except ValueError as e:
            self.logger.error(f"Failed to parse and inject token into URL: {e}. Using original URL.")
            return remote_url

        if not host:
            # Without a host there is nothing to attach the token to.
            self.logger.error("Failed to parse and inject token into URL: no host found. Using original URL.")
            return remote_url

        # ``hostname`` strips the brackets of an IPv6 literal; restore them so
        # the rebuilt network location stays valid.
        if ":" in host:
            host = f"[{host}]"

        netloc_with_token = f"{token}@{host}"
        if port:
            netloc_with_token += f":{port}"

        return urlunparse((
            parsed_url.scheme, netloc_with_token, parsed_url.path,
            parsed_url.params, parsed_url.query, parsed_url.fragment,
        ))