SXXXXXXX_RepoSync/reposync/core/git_manager.py
2025-07-10 15:38:26 +02:00

160 lines
8.7 KiB
Python

# RepoSync/core/git_manager.py
"""
A manager class for executing Git commands via subprocess.
"""
import logging
import os
import re
import subprocess
from typing import List, Optional, Tuple, Dict
from urllib.parse import urlparse, urlunparse
class GitCommandError(Exception):
    """Raised when a Git command cannot be run or finishes unsuccessfully.

    Attributes:
        command: The argument list associated with the failure, or ``None``.
            It is stored exactly as given.
        stderr: Standard error text of the command; ``""`` when none was
            supplied.
    """

    # Matches the userinfo part of a URL ("scheme://user:secret@host") so
    # that tokens embedded in remote URLs never end up in rendered messages.
    _CREDENTIALS_RE = re.compile(r"(://)[^/@\s]+@")

    def __init__(self, message: str, command: Optional[List[str]] = None, stderr: Optional[str] = None):
        """Create the error.

        Args:
            message: Human-readable description of the failure.
            command: The command (argument list) that failed, if known.
            stderr: Captured standard error. ``bytes`` are decoded as UTF-8
                (subprocess timeouts can hand back undecoded output) and
                ``None`` becomes ``""``.
        """
        super().__init__(message)
        self.command = command
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        self.stderr = stderr or ""

    def __str__(self) -> str:
        """Return the message, followed by the command when one is set.

        Credentials embedded in URLs inside the command are masked with
        ``***``; the ``command`` attribute itself is left untouched.
        """
        base_message = super().__str__()
        if not self.command:
            return base_message
        command_str = " ".join(map(str, self.command))
        command_str = self._CREDENTIALS_RE.sub(r"\1***@", command_str)
        return f"{base_message} (Command: '{command_str}')"
class GitManager:
    """Thin wrapper that runs Git commands through ``subprocess``.

    Every public method builds a ``git`` argument list and hands it to
    ``_execute``, which runs it, logs the outcome and raises
    ``GitCommandError`` on failure. Credentials embedded in URLs are masked
    with ``***`` in everything this class logs or attaches to an error.
    """

    # Matches the userinfo part of a URL ("scheme://user:secret@host").
    _CREDENTIALS_RE = re.compile(r"(://)[^/@\s]+@")

    def __init__(self, logger: logging.Logger):
        """Store the logger used for all diagnostics of this manager."""
        self.logger = logger
        self.logger.debug("GitManager initialized.")

    @classmethod
    def _redact(cls, text: str) -> str:
        """Return *text* with URL credentials replaced by ``***``."""
        return cls._CREDENTIALS_RE.sub(r"\1***@", text)

    @staticmethod
    def _as_text(value) -> str:
        """Coerce captured process output (``str``, ``bytes`` or ``None``) to ``str``."""
        if value is None:
            return ""
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="replace")
        return value

    def _execute(self, command: List[str], working_directory: str, check: bool = True, capture: bool = True, timeout: int = 120) -> subprocess.CompletedProcess:
        """Run *command* in *working_directory* and return the completed process.

        Args:
            command: Argument list to execute (no shell is involved).
            working_directory: Directory used as the process cwd. ``"."`` is
                accepted without an existence check.
            check: When true, a non-zero exit code raises ``GitCommandError``.
            capture: When true, stdout and stderr are captured as text.
            timeout: Seconds to wait before aborting the command.

        Returns:
            The ``subprocess.CompletedProcess`` of the run.

        Raises:
            GitCommandError: If the working directory is missing, the
                executable cannot be found, the command times out, an
                unexpected error occurs, or (with ``check``) the command
                exits with a non-zero status. The attached command and
                stderr have URL credentials masked.
        """
        safe_command = [self._redact(str(part)) for part in command]
        command_str = " ".join(safe_command)
        self.logger.debug(f"Executing in '{working_directory}': {command_str}")

        if working_directory != "." and not os.path.isdir(working_directory):
            msg = f"Working directory does not exist: {working_directory}"
            self.logger.error(msg)
            raise GitCommandError(msg, command=safe_command)

        # Only the process launch lives inside the try block, so that the
        # GitCommandError raised for a non-zero exit below is not swallowed
        # and re-wrapped by the generic handler.
        try:
            result = subprocess.run(
                command,
                cwd=working_directory,
                capture_output=capture,
                text=True,
                encoding="utf-8",
                errors="replace",
                check=False,
                timeout=timeout,
            )
        except FileNotFoundError as e:
            msg = f"Command '{safe_command[0]}' not found. Is Git installed and in your system's PATH?"
            self.logger.error(msg)
            raise GitCommandError(msg, command=safe_command) from e
        except subprocess.TimeoutExpired as e:
            msg = f"Command timed out after {timeout}s: {command_str}"
            self.logger.error(msg)
            # The partial output on a timeout may be bytes or None.
            partial_stderr = self._redact(self._as_text(e.stderr))
            raise GitCommandError(msg, command=safe_command, stderr=partial_stderr) from e
        except Exception as e:
            msg = f"An unexpected error occurred while executing git command: {e}"
            self.logger.exception(msg)
            raise GitCommandError(msg, command=safe_command) from e

        # stdout/stderr are None when capture is False.
        stdout_text = self._redact(self._as_text(result.stdout))
        stderr_text = self._redact(self._as_text(result.stderr))

        if check and result.returncode != 0:
            error_message = f"Git command failed with exit code {result.returncode}.\nStderr: {stderr_text.strip()}"
            self.logger.error(f"Command failed: {command_str}\n{error_message}")
            raise GitCommandError(error_message, command=safe_command, stderr=stderr_text)

        if stdout_text.strip():
            self.logger.debug(f"Command stdout: {stdout_text.strip()}")
        if stderr_text.strip():
            self.logger.debug(f"Command stderr: {stderr_text.strip()}")
        return result

    def clone(self, remote_url: str, local_path: str, token: Optional[str] = None):
        """Clone *remote_url* into *local_path*.

        Args:
            remote_url: URL of the repository to clone.
            local_path: Destination directory. It is made absolute first,
                because the command runs with the parent directory as cwd
                and a relative destination would be resolved against it.
            token: Optional access token injected into HTTP(S) URLs.

        Raises:
            GitCommandError: If the clone fails.
        """
        local_path = os.path.abspath(local_path)
        self.logger.info(f"Preparing to clone from '{self._redact(remote_url)}' into '{local_path}'...")

        parent_dir = os.path.dirname(local_path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir, exist_ok=True)

        url_to_clone = self.get_url_with_token(remote_url, token)
        if url_to_clone != remote_url:
            self.logger.debug("Using URL with injected token for cloning.")

        command = ["git", "clone", "--progress", url_to_clone, local_path]
        self._execute(command, working_directory=parent_dir, timeout=900)
        self.logger.info(f"Repository cloned successfully to '{local_path}'.")

    def create_git_bundle(self, repo_path: str, bundle_path: str):
        """Create a bundle of all refs of *repo_path* and verify the output file.

        Args:
            repo_path: Repository to bundle; used as the command's cwd.
            bundle_path: Output file. It is made absolute first so that Git
                (running inside *repo_path*) and the verification below
                refer to the same file.

        Raises:
            GitCommandError: If the command fails, or if it succeeds but the
                bundle file is missing or empty (an empty file is removed).
        """
        bundle_path = os.path.abspath(bundle_path)
        self.logger.info(f"Creating Git bundle file: {bundle_path}")
        command = ["git", "bundle", "create", bundle_path, "--all"]
        self._execute(command, working_directory=repo_path)

        if not os.path.exists(bundle_path) or os.path.getsize(bundle_path) == 0:
            error_msg = f"Bundle creation failed silently. Output file '{bundle_path}' is missing or empty."
            self.logger.error(error_msg)
            if os.path.exists(bundle_path):
                # Best-effort cleanup; the error raised below is what matters.
                try:
                    os.remove(bundle_path)
                except OSError:
                    pass
            raise GitCommandError(error_msg, command=command)
        self.logger.info("Bundle created and verified successfully.")

    def get_branches_with_heads(self, repo_path: str) -> Dict[str, str]:
        """Return a mapping of local branch name to the commit it points at.

        Output lines that do not consist of exactly two whitespace-separated
        fields are skipped. If the Git command fails, the error is logged
        and an empty dict is returned instead of raising.
        """
        self.logger.debug(f"Getting local branches and HEADs for '{repo_path}'...")
        command = ["git", "branch", "--format=%(refname:short) %(objectname)"]
        try:
            result = self._execute(command, working_directory=repo_path)
        except GitCommandError as e:
            self.logger.error(f"Failed to get branches and heads for '{repo_path}': {e}")
            return {}

        branches: Dict[str, str] = {}
        for line in (result.stdout or "").strip().splitlines():
            parts = line.split()
            if len(parts) == 2:
                branches[parts[0]] = parts[1]
        self.logger.debug(f"Found branches and heads: {branches}")
        return branches

    def fetch(self, repo_path: str, remote: str = "origin", prune: bool = True):
        """Fetch from *remote* inside *repo_path*.

        ``--prune`` is added when *prune* is true, unless *remote* names an
        existing file (presumably a bundle -- confirm with callers).

        Raises:
            GitCommandError: If the fetch fails.
        """
        shown_remote = self._redact(remote)
        self.logger.info(f"Fetching from '{shown_remote}'...")
        command = ["git", "fetch", remote]
        # Git runs with repo_path as cwd, so a relative file remote must be
        # resolved against repo_path too (join keeps absolute paths as-is).
        remote_is_file = os.path.isfile(os.path.join(repo_path, remote))
        if prune and not remote_is_file:
            command.append("--prune")
        self._execute(command, working_directory=repo_path)
        self.logger.info(f"Fetch from '{shown_remote}' complete.")

    def push(self, repo_path: str, remote: str, all_branches: bool = False, all_tags: bool = False):
        """Push all branches and/or all tags of *repo_path* to *remote*.

        Branches and tags are pushed with separate commands. When neither
        flag is set, a warning is logged and nothing is pushed.

        Raises:
            GitCommandError: If a push command fails.
        """
        shown_remote = self._redact(remote)
        self.logger.info(f"Preparing to push to remote '{shown_remote}'...")
        if all_branches:
            self.logger.info(f"Pushing all branches to '{shown_remote}'...")
            self._execute(["git", "push", remote, "--all"], working_directory=repo_path, timeout=300)
            self.logger.info("All branches pushed successfully.")
        if all_tags:
            self.logger.info(f"Pushing all tags to '{shown_remote}'...")
            self._execute(["git", "push", remote, "--tags"], working_directory=repo_path, timeout=300)
            self.logger.info("All tags pushed successfully.")
        if not all_branches and not all_tags:
            self.logger.warning("Push command called without specifying what to push. No action taken.")

    # --- Added methods ---

    def clone_from_bundle(self, bundle_path: str, destination_path: str):
        """Clone a repository directly from a local ``.bundle`` file.

        Both paths are made absolute before use, so the command's working
        directory is irrelevant and a bare relative destination has a valid
        parent directory.

        Raises:
            GitCommandError: If the clone fails.
        """
        bundle_path = os.path.abspath(bundle_path)
        destination_path = os.path.abspath(destination_path)
        self.logger.info(f"Cloning from bundle '{bundle_path}' into '{destination_path}'...")

        parent_dir = os.path.dirname(destination_path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir, exist_ok=True)

        command = ["git", "clone", bundle_path, destination_path]
        self._execute(command, working_directory=".", timeout=300)
        self.logger.info(f"Repository cloned successfully from bundle to '{destination_path}'.")

    def set_remote_url(self, repo_path: str, remote_name: str, url: str):
        """Set the URL of the existing remote *remote_name* in *repo_path*.

        The URL is logged with any embedded credentials masked.

        Raises:
            GitCommandError: If the command fails.
        """
        self.logger.info(f"Setting URL for remote '{remote_name}' to '{self._redact(url)}' in '{repo_path}'")
        command = ["git", "remote", "set-url", remote_name, url]
        self._execute(command, working_directory=repo_path)
        self.logger.info("Remote URL updated successfully.")

    def get_url_with_token(self, remote_url: str, token: Optional[str] = None) -> str:
        """Return *remote_url* with *token* injected as the URL's user info.

        Only ``http``/``https`` URLs that have a host are rewritten; any
        existing user info is replaced. The host and port are copied
        verbatim, so bracketed IPv6 literals stay intact. Without a token,
        for other URL forms, or when the URL cannot be parsed, the original
        URL is returned unchanged.
        """
        if not token:
            return remote_url
        try:
            parsed_url = urlparse(remote_url)
            if parsed_url.scheme not in ("http", "https"):
                return remote_url
            # Accessing .port validates it; a malformed port raises ValueError.
            _ = parsed_url.port
            host_and_port = parsed_url.netloc.rpartition("@")[2]
            if not host_and_port:
                return remote_url
            return urlunparse(parsed_url._replace(netloc=f"{token}@{host_and_port}"))
        except ValueError as e:
            self.logger.error(f"Failed to parse and inject token into URL: {e}. Using original URL.")
        return remote_url