2584 lines
106 KiB
Python
2584 lines
106 KiB
Python
# --- FILE: gitsync_tool/commands/git_commands.py ---
|
|
|
|
import os
|
|
import subprocess
|
|
import logging # Usato solo per i livelli di logging (es. logging.INFO)
|
|
import re
|
|
from typing import Tuple, Dict, List, Optional, Union, Any # Aggiunto Union
|
|
|
|
from gitutility.logging_setup import log_handler
|
|
|
|
|
|
|
|
# --- Custom Exception Definition ---
|
|
class GitCommandError(Exception):
    """
    Custom exception for handling Git command execution errors.

    Carries the command that was run and the captured stderr (when supplied)
    so callers can inspect them; both are also folded into ``str(exc)``.

    Attributes:
        command: The command argument list, or None when not supplied.
        stderr: Captured stderr; "" when not supplied. Bytes input is decoded
            to text so the attribute can always be handled as a string.
    """

    def __init__(
        self,
        message: str,
        command: Optional[List[str]] = None,
        stderr: Optional[str] = None,
    ):
        """
        Args:
            message: Human-readable description of the failure.
            command: The command (argument list) that failed, if known.
            stderr: Standard error output of the command, if captured.
        """
        super().__init__(str(message))
        self.command: Optional[List[str]] = command
        # subprocess.TimeoutExpired exposes captured output as bytes even in
        # text mode; decode it so __str__ never mixes bytes with str.
        if isinstance(stderr, (bytes, bytearray)):
            stderr = bytes(stderr).decode("utf-8", errors="replace")
        self.stderr: str = stderr if stderr is not None else ""

    def __str__(self) -> str:
        """Return the message followed by the command and truncated stderr."""
        base_message: str = super().__str__()
        details: List[str] = []

        if self.command:
            # Parts are stringified defensively; callers may pass non-str items.
            command_str: str = " ".join(str(part) for part in self.command)
            details.append(f"Command: '{command_str}'")

        stderr_str: str = self.stderr.strip()
        if stderr_str:
            # Keep the one-line representation short; full stderr stays
            # available on the attribute.
            max_stderr_len = 150
            if len(stderr_str) > max_stderr_len:
                stderr_str = stderr_str[:max_stderr_len] + "..."
            details.append(f"Stderr: {stderr_str}")

        if not details:
            return base_message
        details_str: str = "; ".join(details)
        return f"{base_message} ({details_str})"
|
|
|
|
|
|
# --- Main Git Commands Class ---
|
|
class GitCommands:
|
|
"""
|
|
Manages Git command execution, logging (via log_handler), error handling,
|
|
and parsing of command output.
|
|
"""
|
|
|
|
def __init__(self, logger_ignored: Optional[Any] = None):
    """
    Set up a GitCommands instance.

    Args:
        logger_ignored: Accepted but never used; this class logs through
            the module-level ``log_handler`` instead of a stored logger.
    """
    # No logger is stored on the instance; only the creation is recorded.
    log_handler.log_debug("GitCommands initialized.", func_name="__init__")
|
|
|
|
def log_and_execute(
    self,
    command: List[str],
    working_directory: str,
    check: bool = True,
    log_output_level: int = logging.INFO,
    capture: bool = True,
    hide_console: bool = True,
    timeout_seconds: int = 3600,
) -> subprocess.CompletedProcess:
    """
    Run a command in a directory, logging the invocation, result and failures.

    Args:
        command: Program and arguments; every element is converted with str().
        working_directory: Directory to run in. "." means the current working
            directory; any other value is made absolute.
        check: When True, a non-zero return code raises GitCommandError.
        log_output_level: Level used for the brief output summary; the summary
            is only emitted when this is above logging.DEBUG.
        capture: Whether stdout/stderr are captured.
        hide_console: When True, sets GIT_TERMINAL_PROMPT=0 and, on Windows,
            hides the console window. When False on Windows, the command is
            started in a new console.
        timeout_seconds: Seconds before the command is aborted.

    Returns:
        The subprocess.CompletedProcess of the finished command.

    Raises:
        ValueError: If working_directory is None or empty.
        GitCommandError: If the directory does not exist, the command times
            out, fails while check is True, cannot be found or executed, or
            any other error occurs while running it.
    """
    func_name: str = "log_and_execute"
    safe_command_parts: List[str] = [str(part) for part in command]
    command_str: str = " ".join(safe_command_parts)
    log_handler.log_debug(
        f"Executing in '{working_directory}': {command_str} "
        f"(Capture={capture}, HideConsole={hide_console}, Timeout={timeout_seconds})",
        func_name=func_name,
    )

    if not working_directory:
        msg: str = "Working directory cannot be None or empty."
        log_handler.log_error(msg, func_name=func_name)
        raise ValueError(msg)

    if working_directory == ".":
        effective_cwd: str = os.getcwd()
    else:
        effective_cwd = os.path.abspath(working_directory)
    if not os.path.isdir(effective_cwd):
        msg = f"Working directory does not exist or is not a directory: {effective_cwd}"
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=safe_command_parts)

    def _brief(text: Optional[str], placeholder: str) -> str:
        # Stripped output, cut to 200 chars when the raw text is longer.
        if not text:
            return placeholder
        stripped = text.strip()
        return stripped[:200] + "..." if len(text) > 200 else stripped

    try:
        # Windows-only process settings; left at neutral values elsewhere.
        startupinfo = None
        creationflags: int = 0

        # Prepare environment to prevent interactive prompts when console is hidden
        custom_env = os.environ.copy()
        if hide_console:
            custom_env["GIT_TERMINAL_PROMPT"] = "0"

        if hide_console and os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = subprocess.SW_HIDE
        elif not hide_console and os.name == "nt":
            creationflags = subprocess.CREATE_NEW_CONSOLE

        # check=False here: the return code is inspected below so that the
        # output can be logged before any failure is raised.
        result: subprocess.CompletedProcess = subprocess.run(
            safe_command_parts,
            cwd=effective_cwd,
            capture_output=capture,
            text=True,
            check=False,
            encoding="utf-8",
            errors="replace",
            timeout=timeout_seconds,
            startupinfo=startupinfo,
            creationflags=creationflags,
            env=custom_env,  # Use the custom environment
        )

        log_handler.log_debug(
            f"Command '{command_str}' finished. RC={result.returncode}",
            func_name=func_name,
        )

        # Failures under check=True are logged by the CalledProcessError
        # handler instead, so they are skipped here.
        if capture and (result.returncode == 0 or not check):
            stdout_repr = repr(result.stdout) if result.stdout else "<no stdout>"
            stderr_repr = repr(result.stderr) if result.stderr else "<no stderr>"
            log_handler.log_debug(
                f"Command RC={result.returncode}. RAW Output:\n"
                f"--- stdout (repr) ---\n{stdout_repr}\n"
                f"--- stderr (repr) ---\n{stderr_repr}\n"
                f"--- End RAW Output ---",
                func_name=func_name,
            )
            if log_output_level > logging.DEBUG:
                stdout_log_brief = _brief(result.stdout, "<no stdout>")
                stderr_log_brief = _brief(result.stderr, "<no stderr>")
                log_handler.log_message(
                    log_output_level,
                    f"Command RC={result.returncode}. Output(brief): stdout='{stdout_log_brief}', stderr='{stderr_log_brief}'",
                    func_name=func_name,
                )

        if check and result.returncode != 0:
            # Routed through the CalledProcessError handler below so failure
            # logging lives in one place.
            raise subprocess.CalledProcessError(
                returncode=result.returncode,
                cmd=safe_command_parts,
                output=result.stdout if capture else None,
                stderr=result.stderr if capture else None,
            )

        return result

    except subprocess.TimeoutExpired as e:
        log_handler.log_error(
            f"Command timed out after {timeout_seconds}s: {command_str}",
            func_name=func_name,
        )
        stderr_out = (
            repr(e.stderr) if capture and e.stderr else "<stderr not captured>"
        )
        stdout_out = (
            repr(e.stdout) if capture and e.stdout else "<stdout not captured>"
        )
        log_handler.log_error(f"Timeout stdout: {stdout_out}", func_name=func_name)
        log_handler.log_error(f"Timeout stderr: {stderr_out}", func_name=func_name)
        # TimeoutExpired carries captured output as bytes even with text=True,
        # so decode it before handing it to GitCommandError as text.
        timeout_stderr = e.stderr if capture else None
        if isinstance(timeout_stderr, bytes):
            timeout_stderr = timeout_stderr.decode("utf-8", errors="replace")
        raise GitCommandError(
            f"Timeout after {timeout_seconds}s.",
            command=safe_command_parts,
            stderr=timeout_stderr,
        ) from e

    except subprocess.CalledProcessError as e:
        stderr_repr = (
            repr(e.stderr) if capture and e.stderr else "<stderr not captured>"
        )
        stdout_repr = (
            repr(e.stdout) if capture and e.stdout else "<stdout not captured>"
        )
        err_msg: str = (
            f"Command failed (RC {e.returncode}) in '{effective_cwd}'.\n"
            f"CMD: {command_str}\nRAW_STDERR: {stderr_repr}\nRAW_STDOUT: {stdout_repr}"
        )
        log_handler.log_error(err_msg, func_name=func_name)
        raise GitCommandError(
            f"Git command failed in '{effective_cwd}'. RC: {e.returncode}",
            command=safe_command_parts,
            stderr=e.stderr if capture else None,
        ) from e

    except FileNotFoundError as e:
        error_msg: str = (
            f"Command '{safe_command_parts[0]}' not found. Is Git installed and in your system's PATH?"
        )
        log_handler.log_error(
            f"FileNotFoundError for command: {safe_command_parts[0]} - {e}",
            func_name=func_name,
        )
        raise GitCommandError(error_msg, command=safe_command_parts) from e

    except PermissionError as e:
        error_msg = f"Permission denied executing command in '{effective_cwd}'."
        log_handler.log_error(
            f"PermissionError executing in '{effective_cwd}': {e}",
            func_name=func_name,
        )
        raise GitCommandError(
            error_msg, command=safe_command_parts, stderr=str(e)
        ) from e

    except Exception as e:
        # Boundary handler: anything unforeseen is logged with traceback and
        # surfaced to callers as GitCommandError.
        log_handler.log_exception(
            f"Unexpected Exception executing command {command_str}: {e}",
            func_name=func_name,
        )
        raise GitCommandError(
            f"Unexpected execution error: {e}", command=safe_command_parts
        ) from e
|
|
|
|
# --- Core Repo Operations ---
|
|
def prepare_svn_for_git(self, working_directory: str) -> None:
    """
    Make a directory ready for Git use.

    Runs ``git init`` when the directory has no ``.git`` entry, then makes
    sure ``.gitignore`` contains a ``.svn`` entry (creating the file or
    appending to it as needed).

    Args:
        working_directory: Existing directory to prepare.

    Raises:
        ValueError: If working_directory is empty or not a directory.
        GitCommandError: If ``git init`` fails, or on an unexpected
            (non-I/O) error while handling ``.gitignore``.
        IOError: If ``.gitignore`` cannot be read or written.
    """
    func_name: str = "prepare_svn_for_git"
    log_handler.log_info(
        f"Preparing directory for Git: '{working_directory}'", func_name=func_name
    )
    if not (working_directory and os.path.isdir(working_directory)):
        raise ValueError(f"Invalid target directory: {working_directory}")

    ignore_entry: str = ".svn"
    ignore_file: str = os.path.join(working_directory, ".gitignore")
    repo_marker: str = os.path.join(working_directory, ".git")

    # Step 1: create the repository unless a .git entry is already there.
    if os.path.exists(repo_marker):
        log_handler.log_info(
            "Existing Git repository found. Skipping initialization.",
            func_name=func_name,
        )
    else:
        log_handler.log_info(
            "No existing Git repo found. Initializing...", func_name=func_name
        )
        try:
            # check=True: a failed init must stop the preparation.
            self.log_and_execute(["git", "init"], working_directory, check=True)
            log_handler.log_info(
                "Git repository initialized successfully.", func_name=func_name
            )
        except (GitCommandError, ValueError) as e:
            log_handler.log_error(
                f"Failed to initialize Git repository: {e}", func_name=func_name
            )
            raise

    # Step 2: ensure the ignore entry is present in .gitignore.
    log_handler.log_debug(
        f"Checking/updating .gitignore file: {ignore_file}", func_name=func_name
    )
    try:
        # Text still to be written; stays empty when nothing has to change.
        pending_text: str = ""

        if not os.path.exists(ignore_file):
            log_handler.log_info(
                "'.gitignore' not found. Creating file with '.svn' entry.",
                func_name=func_name,
            )
            pending_text = f"{ignore_entry}\n"
        else:
            try:
                with open(ignore_file, "r", encoding="utf-8") as handle:
                    existing_lines: List[str] = handle.readlines()
                existing_text: str = "".join(existing_lines)

                # The entry counts as present when a line equals it or
                # starts with it followed by "/".
                already_listed: bool = False
                for raw_line in existing_lines:
                    entry = raw_line.strip()
                    if entry == ignore_entry or entry.startswith(ignore_entry + "/"):
                        already_listed = True
                        break

                if already_listed:
                    log_handler.log_info(
                        f"Pattern '{ignore_entry}' already present in .gitignore.",
                        func_name=func_name,
                    )
                else:
                    log_handler.log_info(
                        f"Pattern '{ignore_entry}' not found in .gitignore. Appending...",
                        func_name=func_name,
                    )
                    # Start on a fresh line when the file lacks a final newline.
                    if existing_text and not existing_text.endswith("\n"):
                        pending_text = f"\n{ignore_entry}\n"
                    else:
                        pending_text = f"{ignore_entry}\n"
            except IOError as read_err:
                log_handler.log_error(
                    f"Could not read existing .gitignore: {read_err}",
                    func_name=func_name,
                )
                raise IOError(
                    f"Failed to read .gitignore: {read_err}"
                ) from read_err

        if pending_text:
            # Append to an existing file, otherwise create a new one.
            write_mode = "a" if os.path.exists(ignore_file) else "w"
            log_handler.log_debug(
                f"Writing to {ignore_file} (Mode: {write_mode})",
                func_name=func_name,
            )
            try:
                # newline="\n" keeps line endings the same on every platform.
                with open(
                    ignore_file, write_mode, encoding="utf-8", newline="\n"
                ) as handle:
                    handle.write(pending_text)
                log_handler.log_info(
                    f"Successfully updated '{ignore_file}'.", func_name=func_name
                )
            except IOError as write_err:
                log_handler.log_error(
                    f"Error writing to .gitignore: {write_err}", func_name=func_name
                )
                raise IOError(
                    f"Failed to write .gitignore: {write_err}"
                ) from write_err

    except IOError as io_err:
        # Read/write failures raised above are logged once more and passed on.
        log_handler.log_error(
            f"I/O error processing .gitignore: {io_err}", func_name=func_name
        )
        raise io_err
    except Exception as e:
        # Anything else is reported as a GitCommandError.
        log_handler.log_exception(
            f"Unexpected error managing .gitignore: {e}", func_name=func_name
        )
        raise GitCommandError(f"Unexpected .gitignore error: {e}") from e

    log_handler.log_info(
        f"Directory preparation completed successfully for '{working_directory}'.",
        func_name=func_name,
    )
|
|
|
|
def create_git_bundle(self, working_directory: str, bundle_path: str) -> None:
    """
    Run ``git bundle create <file> --all`` for a repository.

    When Git declines because the repository is empty or has no references,
    only a warning is logged and no exception is raised, so callers should
    check whether the bundle file exists afterwards.

    Args:
        working_directory: Repository to bundle.
        bundle_path: Destination file; passed to Git with forward slashes.

    Raises:
        GitCommandError: If the bundle command fails for any other reason,
            or an unexpected error occurs.
    """
    func_name: str = "create_git_bundle"
    # Git receives the path with forward slashes; the OS-style path is kept
    # for the file checks afterwards.
    local_path: str = os.path.normpath(bundle_path)
    path_for_git: str = local_path.replace("\\", "/")
    log_handler.log_info(
        f"Attempting to create Git bundle file: {path_for_git}", func_name=func_name
    )
    log_handler.log_debug(
        f" Source Repository: {working_directory}", func_name=func_name
    )

    command: List[str] = ["git", "bundle", "create", path_for_git, "--all"]
    try:
        # check=False: the outcome is classified by hand below.
        outcome: subprocess.CompletedProcess = self.log_and_execute(
            command, working_directory, check=False, log_output_level=logging.DEBUG
        )

        if outcome.returncode == 0:
            # A zero return code alone is not trusted; the file must exist
            # and be non-empty.
            if os.path.exists(local_path) and os.path.getsize(local_path) != 0:
                log_handler.log_info(
                    f"Bundle created successfully: '{local_path}'.",
                    func_name=func_name,
                )
            else:
                # Left to the caller to handle (e.g. remove the empty file).
                log_handler.log_warning(
                    f"Git bundle command succeeded, but the output file '{local_path}' is missing or empty.",
                    func_name=func_name,
                )
        else:
            lowered_stderr: str = outcome.stderr.lower() if outcome.stderr else ""
            empty_repo_markers = (
                "refusing to create empty bundle",
                "does not contain any references",
            )
            if any(marker in lowered_stderr for marker in empty_repo_markers):
                # Not treated as an error, but no bundle was produced.
                log_handler.log_warning(
                    f"Bundle creation skipped by Git: Repository '{os.path.basename(working_directory)}' is empty or has no references.",
                    func_name=func_name,
                )
            else:
                err_msg: str = (
                    f"Git bundle command failed (RC {outcome.returncode})."
                )
                log_handler.log_error(
                    f"{err_msg} Stderr: {outcome.stderr.strip()}",
                    func_name=func_name,
                )
                raise GitCommandError(
                    err_msg, command=command, stderr=outcome.stderr
                )

    except GitCommandError:
        # Already a domain error; pass through untouched.
        raise
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error during bundle creation: {e}", func_name=func_name
        )
        raise GitCommandError(
            f"Unexpected bundle error: {e}", command=command
        ) from e
|
|
|
|
def fetch_from_git_bundle(self, working_directory: str, bundle_path: str) -> None:
    """
    Fetch from a bundle file, then merge FETCH_HEAD into the current branch.

    Args:
        working_directory: Repository receiving the bundle content.
        bundle_path: Path to an existing bundle file.

    Raises:
        FileNotFoundError: If bundle_path is not an existing file.
        GitCommandError: If the fetch fails, the merge reports a conflict,
            the merge fails otherwise, or an unexpected error occurs.
        ValueError: Propagated from the fetch step.
    """
    func_name: str = "fetch_from_git_bundle"
    if not os.path.isfile(bundle_path):
        raise FileNotFoundError(f"Bundle file not found at path: {bundle_path}")

    # Forward slashes for the path handed to Git.
    bundle_for_git: str = os.path.normpath(bundle_path).replace("\\", "/")
    log_handler.log_info(
        f"Fetching from bundle '{bundle_for_git}' into repository '{working_directory}'",
        func_name=func_name,
    )

    # --- Step 1: fetch (verbose, for richer log output) ---
    fetch_cmd: List[str] = ["git", "fetch", bundle_for_git, "--verbose"]
    try:
        log_handler.log_debug(
            "Executing git fetch from bundle...", func_name=func_name
        )
        # check=True: without a successful fetch there is nothing to merge.
        self.log_and_execute(
            fetch_cmd, working_directory, check=True, log_output_level=logging.INFO
        )
        log_handler.log_info(
            "Fetch from bundle completed successfully.", func_name=func_name
        )
    except (GitCommandError, ValueError) as fetch_error:
        log_handler.log_error(
            f"Failed to fetch from bundle '{bundle_for_git}': {fetch_error}",
            func_name=func_name,
        )
        raise fetch_error

    # --- Step 2: merge what was fetched ---
    # --no-ff always records a merge commit; --no-edit avoids opening an editor.
    merge_cmd: List[str] = ["git", "merge", "FETCH_HEAD", "--no-ff", "--no-edit"]
    try:
        log_handler.log_debug(
            "Executing git merge FETCH_HEAD...", func_name=func_name
        )
        # check=False so a conflicting merge can be told apart from other failures.
        merge_res: subprocess.CompletedProcess = self.log_and_execute(
            merge_cmd, working_directory, check=False, log_output_level=logging.INFO
        )

        stdout_text: str = merge_res.stdout.strip() if merge_res.stdout else ""
        stderr_text: str = merge_res.stderr.strip() if merge_res.stderr else ""
        merged_output: str = stdout_text + stderr_text
        lowered: str = merged_output.lower()

        if merge_res.returncode != 0:
            is_conflict = (
                "conflict" in lowered or "automatic merge failed" in lowered
            )
            if is_conflict:
                failure: str = (
                    f"Merge conflict occurred after fetching from bundle. Please resolve conflicts manually in '{working_directory}' and then commit the result."
                )
                log_handler.log_error(failure, func_name=func_name)
            else:
                failure = f"Git merge command failed after fetch (RC {merge_res.returncode})."
                log_handler.log_error(
                    f"{failure} Stderr: {stderr_text}", func_name=func_name
                )
            raise GitCommandError(
                failure, command=merge_cmd, stderr=merge_res.stderr
            )

        if "already up to date" in lowered:
            log_handler.log_info(
                "Repository already up-to-date with fetched bundle content.",
                func_name=func_name,
            )
        else:
            log_handler.log_info(
                "Merge of fetched bundle content successful.",
                func_name=func_name,
            )
            # Full merge output is kept at DEBUG level only.
            log_handler.log_debug(
                f"Merge output:\n{merged_output}", func_name=func_name
            )

    except GitCommandError as merge_error:
        # Covers both conflicts and other merge failures raised above.
        log_handler.log_error(
            f"Error merging FETCH_HEAD from bundle: {merge_error}",
            func_name=func_name,
        )
        raise merge_error
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error merging FETCH_HEAD from bundle: {e}",
            func_name=func_name,
        )
        raise GitCommandError(
            f"Unexpected merge error: {e}", command=merge_cmd
        ) from e
|
|
|
|
# --- Commit and Status ---
|
|
def git_commit(self, working_directory: str, message: str, stage_all_first: bool = True) -> bool:
    """
    Commits changes in the repository.

    Args:
        working_directory (str): Path to the repository.
        message (str): The commit message.
        stage_all_first (bool): If True, runs 'git add .' before committing.
                                Set to False if changes are already staged.

    Returns:
        bool: True if a commit was made, False when Git reported that there
        was nothing to commit.

    Raises:
        ValueError: If the commit message is empty or whitespace only.
        GitCommandError: If staging fails or the commit fails for a reason
            other than "nothing to commit".
    """
    func_name = "git_commit"
    # Validate before touching the message so an empty/None message raises
    # the intended ValueError instead of failing while building the log line.
    if not message or message.isspace():
        raise ValueError("Commit message cannot be empty.")
    log_handler.log_info(
        f"Attempting commit in '{working_directory}' msg: '{message[:50]}...'",
        func_name=func_name,
    )
    try:
        if stage_all_first:
            add_cmd = ["git", "add", "."]
            log_handler.log_debug(
                "Staging all changes ('git add .')...", func_name=func_name
            )
            self.log_and_execute(add_cmd, working_directory, check=True)
            log_handler.log_debug("Staging successful.", func_name=func_name)
        else:
            log_handler.log_debug(
                "Skipping 'git add .', assuming changes are already staged.", func_name=func_name
            )

        commit_cmd = ["git", "commit", "-m", message]
        log_handler.log_debug("Attempting commit...", func_name=func_name)
        # check=False: "nothing to commit" is reported through a non-zero
        # return code and must not be treated as a failure.
        result = self.log_and_execute(commit_cmd, working_directory, check=False)

        out_low = (result.stdout or "").lower()
        err_low = (result.stderr or "").lower()
        combined_low = out_low + err_low

        if result.returncode == 0:
            # Success unless the output is a "nothing to commit" notice.
            # NOTE(review): Git's output may echo the commit subject, so a
            # message containing "nothing to commit" could be misreported
            # as "no commit" here — confirm and tighten if needed.
            if "nothing to commit" not in combined_low:
                log_handler.log_info("Commit successful.", func_name=func_name)
                return True
            else:
                # Can happen when stage_all_first is False and nothing was staged.
                log_handler.log_info(
                    "No changes were available to commit.", func_name=func_name
                )
                return False
        elif (
            "nothing to commit" in combined_low
            or "no changes added to commit" in combined_low
        ):
            log_handler.log_info(
                "No changes available to commit.", func_name=func_name
            )
            return False
        else:
            msg = f"Commit command failed unexpectedly (RC {result.returncode})."
            # stderr may be missing; never let the log line itself fail.
            log_handler.log_error(
                f"{msg} Stderr: {(result.stderr or '').strip()}", func_name=func_name
            )
            raise GitCommandError(msg, command=commit_cmd, stderr=result.stderr)
    except (GitCommandError, ValueError) as e:
        log_handler.log_error(
            f"Error during staging/commit: {e}", func_name=func_name
        )
        raise e
|
|
|
|
def git_status_has_changes(self, working_directory: str) -> bool:
    """
    Report whether ``git status --porcelain=v1 -unormal`` prints anything.

    Args:
        working_directory: Path to the repository.

    Returns:
        True when the status output is non-empty after stripping, else False.

    Raises:
        GitCommandError: If the status command fails.
    """
    func_name = "git_status_has_changes"
    log_handler.log_debug(
        f"Checking status for changes in '{working_directory}'...",
        func_name=func_name,
    )
    status_cmd = ["git", "status", "--porcelain=v1", "-unormal"]
    try:
        outcome = self.log_and_execute(
            status_cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        # Any remaining output means there is something to report.
        dirty = len(outcome.stdout.strip()) > 0
        log_handler.log_debug(
            f"Status check complete. Has changes: {dirty}",
            func_name=func_name,
        )
        return dirty
    except GitCommandError as e:
        log_handler.log_error(
            f"Git status check command failed: {e}", func_name=func_name
        )
        raise e
|
|
|
|
def git_rm(self, working_directory: str, path: str, force: bool = False) -> None:
    """
    Runs 'git rm' on a path.

    No '--cached' flag is passed, so this is a plain 'git rm' (not an
    index-only removal).
    NOTE(review): no '-r' flag is passed either, so Git is expected to
    refuse directories — confirm whether callers rely on directory removal.

    Args:
        working_directory (str): Path to the repository.
        path (str): The relative path to the item to remove.
        force (bool): If True, uses the '-f' flag to force removal.

    Raises:
        GitCommandError: If the 'git rm' command fails.
    """
    func_name = "git_rm"
    action = "Forcibly removing" if force else "Removing"
    log_handler.log_info(
        f"{action} path from index: '{path}' in '{working_directory}'",
        func_name=func_name
    )
    cmd = ["git", "rm"]
    if force:
        cmd.append("-f")
    # "--" ends option parsing so a path starting with "-" is never
    # interpreted as a command-line flag.
    cmd.extend(["--", path])

    try:
        self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(f"Path '{path}' removed from index successfully.", func_name=func_name)
    except GitCommandError as e:
        log_handler.log_error(f"Failed to 'git rm {path}': {e}", func_name=func_name)
        raise
|
|
|
|
# --- Tag Management ---
|
|
def list_tags(self, working_directory: str) -> List[Tuple[str, str]]:
    """
    List tags as (name, subject) pairs, sorted by ``-creatordate``.

    Args:
        working_directory: Path to the repository.

    Returns:
        One tuple per tag; the subject falls back to
        "(No subject/Lightweight)" when Git prints none. An empty list is
        returned when the command fails or any other error occurs.
    """
    func_name = "list_tags"
    log_handler.log_info(
        f"Listing tags in '{working_directory}'...", func_name=func_name
    )
    # Name and subject are separated by a tab (%09) for easy splitting.
    list_cmd = [
        "git",
        "tag",
        "--list",
        "--format=%(refname:short)%09%(contents:subject)",
        "--sort=-creatordate",
    ]
    found = []
    try:
        outcome = self.log_and_execute(list_cmd, working_directory, check=True)
        for raw_line in outcome.stdout.splitlines():
            entry = raw_line.strip()
            if not entry:
                continue
            tag_part, _, subject_part = entry.partition("\t")
            subject_text = subject_part.strip()
            if not subject_text:
                subject_text = "(No subject/Lightweight)"
            found.append((tag_part.strip(), subject_text))
        log_handler.log_info(f"Found {len(found)} tags.", func_name=func_name)
        log_handler.log_debug(f"Tags found: {found}", func_name=func_name)
        return found
    except GitCommandError as e:
        # Listing is best-effort: failures are logged and yield no tags.
        log_handler.log_error(f"Error listing tags: {e}", func_name=func_name)
        return []
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error listing tags: {e}", func_name=func_name
        )
        return []
|
|
|
|
def create_tag(self, working_directory: str, tag_name: str, message: str) -> None:
    """
    Create an annotated tag with ``git tag -a <name> -m <message>``.

    Args:
        working_directory: Path to the repository.
        tag_name: Name of the tag; must pass the name pattern below.
        message: Annotation message; must not be empty.

    Raises:
        ValueError: If the name or message is empty, or the name does not
            match the allowed pattern.
        GitCommandError: If the tag command fails; a dedicated message is
            used when stderr reports that the tag already exists.
    """
    func_name = "create_tag"
    log_handler.log_info(
        f"Creating tag '{tag_name}' in '{working_directory}'", func_name=func_name
    )

    # Input checks, in order: name present, message present, name format.
    if not tag_name or tag_name.isspace():
        raise ValueError("Tag name cannot be empty.")
    if not message or message.isspace():
        raise ValueError("Tag message cannot be empty for annotated tag.")
    name_pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$|@\{|\\\\))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.)$"
    if re.match(name_pattern, tag_name) is None:
        raise ValueError(f"Invalid tag name format: '{tag_name}'.")

    tag_cmd = ["git", "tag", "-a", tag_name, "-m", message]
    try:
        self.log_and_execute(tag_cmd, working_directory, check=True)
        log_handler.log_info(
            f"Annotated tag '{tag_name}' created.", func_name=func_name
        )
    except GitCommandError as e:
        duplicate = "already exists" in (e.stderr or "").lower()
        if not duplicate:
            log_handler.log_error(
                f"Failed create tag '{tag_name}': {e}", func_name=func_name
            )
            raise e
        # Replace the generic failure with a clearer "already exists" error.
        msg = f"Tag '{tag_name}' already exists."
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=tag_cmd, stderr=e.stderr) from e
|
|
|
|
def checkout_tag(self, working_directory: str, tag_name: str) -> bool:
    """
    Check out a tag with ``git checkout <tag>``.

    A warning is logged when Git's output mentions a detached HEAD.

    Args:
        working_directory: Path to the repository.
        tag_name: Name of the tag to check out.

    Returns:
        True when the checkout succeeded.

    Raises:
        ValueError: If tag_name is empty or whitespace only.
        GitCommandError: If the checkout fails; a dedicated "not found or
            invalid" message is used when stderr matches a known pattern.
    """
    func_name = "checkout_tag"
    log_handler.log_info(
        f"Checking out tag '{tag_name}' in '{working_directory}'...",
        func_name=func_name,
    )
    if not tag_name or tag_name.isspace():
        raise ValueError("Tag name cannot be empty.")
    cmd = ["git", "checkout", tag_name]
    try:
        result = self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(
            f"Successfully checked out tag '{tag_name}'.", func_name=func_name
        )
        # Each stream is defaulted separately: "+" binds tighter than "or",
        # so an unparenthesised form would skip stdout whenever stderr is
        # non-empty and fail on a None stdout.
        output = ((result.stderr or "") + (result.stdout or "")).lower()
        if "detached head" in output:
            log_handler.log_warning(
                "Repo is now in 'detached HEAD' state.", func_name=func_name
            )
        return True
    except GitCommandError as e:
        stderr_low = (e.stderr or "").lower()
        # stderr fragments treated as "the tag does not exist".
        not_found = [
            "did not match any file(s)",
            f"pathspec '{tag_name.lower()}' did not match",
            "reference is not a tree",
        ]
        if any(p in stderr_low for p in not_found):
            msg = f"Tag '{tag_name}' not found or invalid."
            log_handler.log_error(msg, func_name=func_name)
            raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
        else:
            log_handler.log_error(
                f"Failed checkout tag '{tag_name}': {e}", func_name=func_name
            )
            raise e
|
|
|
|
# --- Branch Management ---
|
|
def list_branches(self, working_directory: str) -> Tuple[List[str], Optional[str]]:
    """
    List local branches and identify the one marked as current.

    Args:
        working_directory: Path to the repository.

    Returns:
        A tuple of (sorted branch names, current branch name or None).
        ``([], None)`` is returned when the command fails or any other
        error occurs.
    """
    func_name = "list_branches"
    log_handler.log_info(
        f"Listing local branches in '{working_directory}'...", func_name=func_name
    )
    # %(HEAD) puts a "*" in front of the current branch.
    list_cmd = ["git", "branch", "--list", "--format=%(HEAD)%(refname:short)"]
    names = []
    active = None
    try:
        outcome = self.log_and_execute(list_cmd, working_directory, check=True)
        for raw_line in outcome.stdout.splitlines():
            entry = raw_line.strip()
            if not entry:
                continue
            # Drop the leading marker characters to get the bare name.
            branch = entry.lstrip("* ").strip()
            names.append(branch)
            if entry.startswith("*"):
                active = branch
        names.sort()
        shown_current = active or "None (Detached?)"
        log_handler.log_info(
            f"Found {len(names)} branches. Current: {shown_current}",
            func_name=func_name,
        )
        log_handler.log_debug(f"Branches: {names}", func_name=func_name)
        return names, active
    except GitCommandError as e:
        # Listing is best-effort: failures are logged and yield no branches.
        log_handler.log_error(f"Error listing branches: {e}", func_name=func_name)
        return [], None
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error listing branches: {e}", func_name=func_name
        )
        return [], None
|
|
|
|
def checkout_branch(self, working_directory: str, branch_name: str) -> bool:
    """
    Check out a branch with ``git checkout <branch>``.

    Args:
        working_directory: Path to the repository.
        branch_name: Name of the branch to check out.

    Returns:
        True when the checkout succeeded.

    Raises:
        ValueError: If branch_name is empty or whitespace only.
        GitCommandError: If the checkout fails; a dedicated "not found or
            invalid" message is used when stderr matches a known pattern.
    """
    func_name = "checkout_branch"
    log_handler.log_info(
        f"Checking out branch '{branch_name}' in '{working_directory}'...",
        func_name=func_name,
    )
    if not branch_name or branch_name.isspace():
        raise ValueError("Branch name cannot be empty.")

    checkout_cmd = ["git", "checkout", branch_name]
    try:
        self.log_and_execute(checkout_cmd, working_directory, check=True)
        log_handler.log_info(
            f"Successfully checked out branch '{branch_name}'.", func_name=func_name
        )
        return True
    except GitCommandError as e:
        lowered_stderr = (e.stderr or "").lower()
        # stderr fragments treated as "the branch does not exist".
        missing_markers = (
            "did not match any file(s)",
            f"pathspec '{branch_name.lower()}' did not match",
            "is not a commit and a branch",
        )
        for marker in missing_markers:
            if marker in lowered_stderr:
                msg = f"Branch '{branch_name}' not found or invalid."
                log_handler.log_error(msg, func_name=func_name)
                raise GitCommandError(msg, command=checkout_cmd, stderr=e.stderr) from e
        log_handler.log_error(
            f"Failed checkout branch '{branch_name}': {e}", func_name=func_name
        )
        raise e
|
|
|
|
def create_branch(self, working_directory: str, new_branch_name: str) -> bool:
    """
    Create a branch with ``git branch <name>`` (without checking it out).

    Args:
        working_directory: Path to the repository.
        new_branch_name: Name for the new branch; must pass the name
            pattern below and must not be "head" in any letter case.

    Returns:
        True when the branch was created.

    Raises:
        ValueError: If the name is empty or has an invalid format.
        GitCommandError: If the branch command fails; a dedicated message
            is used when stderr reports that the branch already exists.
    """
    func_name = "create_branch"
    log_handler.log_info(
        f"Creating branch '{new_branch_name}' in '{working_directory}'...",
        func_name=func_name,
    )
    if not new_branch_name or new_branch_name.isspace():
        raise ValueError("Branch name cannot be empty.")

    name_pattern = r"^(?!\.| |.*[/.]\.|\.|.*\\|.*@\{|.*[/]$|.*\.\.)[^ \t\n\r\f\v~^:?*[\\]+$"
    is_reserved = new_branch_name.lower() == "head"
    if is_reserved or re.match(name_pattern, new_branch_name) is None:
        raise ValueError(f"Invalid branch name format: '{new_branch_name}'.")

    branch_cmd = ["git", "branch", new_branch_name]
    try:
        self.log_and_execute(branch_cmd, working_directory, check=True)
        log_handler.log_info(
            f"Branch '{new_branch_name}' created.", func_name=func_name
        )
        return True
    except GitCommandError as e:
        duplicate = "already exists" in (e.stderr or "").lower()
        if not duplicate:
            log_handler.log_error(
                f"Failed create branch '{new_branch_name}': {e}",
                func_name=func_name,
            )
            raise e
        # Replace the generic failure with a clearer "already exists" error.
        msg = f"Branch '{new_branch_name}' already exists."
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=branch_cmd, stderr=e.stderr) from e
|
|
|
|
# --- History / Log ---
|
|
def reset_hard(self, working_directory: str, commit_hash: str) -> None:
    """
    Runs 'git reset --hard <commit_hash>' in the given repository.

    A warning is logged before the argument is validated, because the
    operation is destructive.

    Args:
        working_directory (str): Path to the repository.
        commit_hash (str): The commit hash to reset to.

    Raises:
        ValueError: If commit_hash is empty or whitespace only.
        GitCommandError: Re-raised unchanged when the git command fails.
    """
    func_name = "reset_hard"
    log_handler.log_warning(
        f"Performing a destructive 'git reset --hard' to '{commit_hash}' in '{working_directory}'",
        func_name=func_name,
    )
    hash_is_blank = (not commit_hash) or commit_hash.isspace()
    if hash_is_blank:
        raise ValueError("Commit hash cannot be empty.")

    reset_cmd = ["git", "reset", "--hard"]
    reset_cmd.append(commit_hash)
    try:
        self.log_and_execute(reset_cmd, working_directory, check=True)
        log_handler.log_info(
            f"Successfully reset to commit '{commit_hash}'.", func_name=func_name
        )
    except GitCommandError as reset_err:
        log_handler.log_error(
            f"Failed to 'git reset --hard': {reset_err}", func_name=func_name
        )
        raise
|
|
|
|
def get_commit_log(
    self, working_directory: str, max_count: int = 200, branch: Optional[str] = None
) -> List[str]:
    """
    Returns formatted 'git log' lines for one branch or for all refs.

    Args:
        working_directory (str): Path to the repository.
        max_count (int): Value passed to '--max-count'.
        branch (Optional[str]): Branch to log; when falsy, '--all' is used.

    Returns:
        List[str]: Stripped, non-empty output lines; an empty list if the
        command fails or any unexpected error occurs (errors are logged).
    """
    func_name = "get_commit_log"
    scope = f"'{branch}'" if branch else "all"
    log_handler.log_info(
        f"Retrieving commit log for {scope} in '{working_directory}' (max {max_count})...",
        func_name=func_name,
    )
    pretty_spec = "%h %ad | %an |%d %s"
    date_spec = "format:%Y-%m-%d %H:%M"
    log_cmd = [
        "git",
        "log",
        f"--max-count={max_count}",
        f"--pretty=format:{pretty_spec}",
        f"--date={date_spec}",
        "--decorate=short",
        "--source",
        # Last argument selects the scope: a single branch or every ref.
        branch if branch else "--all",
    ]
    try:
        completed = self.log_and_execute(
            log_cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        entries = []
        for raw_line in completed.stdout.splitlines():
            cleaned = raw_line.strip()
            if cleaned:
                entries.append(cleaned)
        log_handler.log_info(
            f"Retrieved {len(entries)} log entries for {scope}.", func_name=func_name
        )
        return entries
    except GitCommandError as git_err:
        log_handler.log_error(
            f"Error retrieving commit log for {scope}: {git_err}", func_name=func_name
        )
        return []
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error retrieving commit log for {scope}: {unexpected}",
            func_name=func_name,
        )
        return []
|
|
|
|
# --- Clone from Bundle ---
|
|
def clone_from_bundle(self, bundle_path: str, destination_directory: str) -> bool:
    """
    Clones a repository from a bundle file into a destination directory.

    The destination must either not exist (it is then created) or be an
    existing empty directory. After a failed clone, the destination is
    removed if it is still empty.

    Args:
        bundle_path (str): Path to the bundle file.
        destination_directory (str): Directory to clone into.

    Returns:
        bool: True when the clone succeeded.

    Raises:
        ValueError: If either argument is empty.
        FileNotFoundError: If the bundle file does not exist.
        IOError: If the destination is not a directory, is not empty, or
            cannot be created.
        GitCommandError: If the clone fails or an unexpected error occurs.
    """
    func_name = "clone_from_bundle"
    log_handler.log_info(
        f"Cloning from '{bundle_path}' into '{destination_directory}'",
        func_name=func_name,
    )
    if not bundle_path:
        raise ValueError("Bundle path cannot be empty.")
    if not destination_directory:
        raise ValueError("Destination directory cannot be empty.")

    bundle_abs = os.path.abspath(bundle_path)
    dest_abs = os.path.abspath(destination_directory)

    if not os.path.isfile(bundle_abs):
        problem = f"Bundle file not found: {bundle_abs}"
        log_handler.log_error(problem, func_name=func_name)
        raise FileNotFoundError(problem)

    if not os.path.exists(dest_abs):
        log_handler.log_debug(
            f"Destination directory does not exist. Creating: {dest_abs}",
            func_name=func_name,
        )
        try:
            os.makedirs(dest_abs)
            log_handler.log_info(
                f"Created destination directory: {dest_abs}", func_name=func_name
            )
        except OSError as mk_err:
            problem = f"Failed to create destination directory '{dest_abs}': {mk_err}"
            log_handler.log_error(problem, func_name=func_name)
            raise IOError(problem) from mk_err
    else:
        if not os.path.isdir(dest_abs):
            problem = f"Destination exists but is not a directory: {dest_abs}"
            log_handler.log_error(problem, func_name=func_name)
            raise IOError(problem)
        if os.listdir(dest_abs):
            problem = f"Destination directory is not empty: {dest_abs}"
            log_handler.log_error(problem, func_name=func_name)
            raise IOError(problem)
        log_handler.log_debug(
            "Destination directory exists and is empty.", func_name=func_name
        )

    def _cleanup_after_failure() -> None:
        # Best-effort: only an empty destination directory is removed.
        if not os.path.isdir(dest_abs):
            return
        try:
            if os.listdir(dest_abs):
                log_handler.log_warning(
                    f"Destination directory not empty after failed clone: {dest_abs}",
                    func_name=func_name,
                )
            else:
                os.rmdir(dest_abs)
                log_handler.log_info(
                    f"Removed empty destination directory after failed clone: {dest_abs}",
                    func_name=func_name,
                )
        except OSError as rm_err:
            log_handler.log_warning(
                f"Could not remove destination directory '{dest_abs}' after failed clone: {rm_err}",
                func_name=func_name,
            )

    cmd = ["git", "clone", bundle_abs, dest_abs, "--progress"]
    try:
        self.log_and_execute(
            cmd, working_directory=".", check=True, log_output_level=logging.INFO
        )
        log_handler.log_info(
            f"Cloned successfully from bundle into '{dest_abs}'.",
            func_name=func_name,
        )
        return True
    except GitCommandError as clone_err:
        log_handler.log_error(
            f"Failed to clone from bundle '{os.path.basename(bundle_abs)}': {clone_err}",
            func_name=func_name,
        )
        _cleanup_after_failure()
        raise clone_err
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error cloning from bundle: {unexpected}", func_name=func_name
        )
        raise GitCommandError(
            f"Unexpected clone error: {unexpected}", command=cmd
        ) from unexpected
|
|
|
|
# --- Gitignore and File Tracking ---
|
|
def get_tracked_files(self, working_directory: str) -> List[str]:
    """
    Lists tracked files using 'git ls-files -z'.

    Args:
        working_directory (str): Path to the repository.

    Returns:
        List[str]: Non-empty entries of the NUL-separated output.

    Raises:
        GitCommandError: Re-raised after logging when the command fails.
    """
    func_name = "get_tracked_files"
    log_handler.log_debug(
        f"Getting tracked files in '{working_directory}'", func_name=func_name
    )
    ls_cmd = ["git", "ls-files", "-z"]
    try:
        completed = self.log_and_execute(
            ls_cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        # '-z' separates entries with NUL; drop the empty trailing piece.
        tracked = list(filter(None, completed.stdout.split("\0")))
        log_handler.log_info(
            f"Found {len(tracked)} tracked files.", func_name=func_name
        )
        return tracked
    except GitCommandError as ls_err:
        log_handler.log_error(
            f"Failed list tracked files: {ls_err}", func_name=func_name
        )
        raise ls_err
|
|
|
|
def check_if_would_be_ignored(
    self, working_directory: str, path_to_check: str
) -> bool:
    """
    Reports whether a path matches an ignore rule, via
    'git check-ignore --quiet --no-index'.

    Args:
        working_directory (str): Path to the repository.
        path_to_check (str): Path to test.

    Returns:
        bool: True for return code 0 (ignored), False for return code 1.

    Raises:
        GitCommandError: For any other return code, or if execution fails.
    """
    func_name = "check_if_would_be_ignored"
    cmd = ["git", "check-ignore", "--quiet", "--no-index", "--", path_to_check]
    log_handler.log_debug(
        f"Checking ignore status for: '{path_to_check}'", func_name=func_name
    )
    try:
        outcome = self.log_and_execute(cmd, working_directory, check=False)
        rc = outcome.returncode
        if rc not in (0, 1):
            # Raised inside the try on purpose: the handler below logs it again.
            failure = f"check-ignore failed (RC {rc})"
            log_handler.log_error(
                f"{failure}. Stderr: {outcome.stderr.strip()}", func_name=func_name
            )
            raise GitCommandError(failure, command=cmd, stderr=outcome.stderr)
        if rc == 0:
            log_handler.log_debug(
                f"Path '{path_to_check}' WOULD be ignored.", func_name=func_name
            )
            return True
        log_handler.log_debug(
            f"Path '{path_to_check}' would NOT be ignored.", func_name=func_name
        )
        return False
    except GitCommandError as check_err:
        log_handler.log_error(
            f"Failed check_ignore for '{path_to_check}': {check_err}", func_name=func_name
        )
        raise check_err
|
|
|
|
def remove_from_tracking(
    self, working_directory: str, files_to_untrack: List[str]
) -> bool:
    """
    Removes paths from the index with 'git rm --cached', in batches of 100.

    Args:
        working_directory (str): Path to the repository.
        files_to_untrack (List[str]): Paths to remove from tracking.

    Returns:
        bool: True when there was nothing to do or every batch succeeded.

    Raises:
        GitCommandError: On the first failing batch; later batches are not run.
    """
    func_name = "remove_from_tracking"
    if not files_to_untrack:
        log_handler.log_debug("No files provided to untrack.", func_name=func_name)
        return True

    total = len(files_to_untrack)
    log_handler.log_info(
        f"Removing {total} items from tracking index...", func_name=func_name
    )
    batch_size = 100
    for batch_num, start in enumerate(range(0, total, batch_size), start=1):
        chunk = files_to_untrack[start : start + batch_size]
        log_handler.log_info(
            f"Processing untrack batch {batch_num} ({len(chunk)} items)...",
            func_name=func_name,
        )
        log_handler.log_debug(
            f"Batch {batch_num} items: {chunk}", func_name=func_name
        )
        # '--' ends option parsing so that paths are never read as flags.
        cmd = ["git", "rm", "--cached", "-r", "--ignore-unmatch", "--", *chunk]
        try:
            self.log_and_execute(
                cmd, working_directory, check=True, log_output_level=logging.DEBUG
            )
            log_handler.log_info(
                f"Batch {batch_num} untracked successfully.", func_name=func_name
            )
        except GitCommandError as batch_err:
            log_handler.log_error(
                f"Failed untrack batch {batch_num}: {batch_err}", func_name=func_name
            )
            raise GitCommandError(
                f"Failed batch {batch_num}. Error: {batch_err}",
                command=cmd,
                stderr=getattr(batch_err, "stderr", None),
            ) from batch_err

    # Reaching this point means no batch raised.
    log_handler.log_info("All untracking batches completed.", func_name=func_name)
    return True
|
|
|
|
def get_matching_gitignore_rule(
    self, working_directory: str, path_to_check: str
) -> Optional[str]:
    """
    Returns the ignore pattern that matches a path, using
    'git check-ignore -v --no-index'.

    The verbose output has the form '<source>:<linenum>:<pattern>\\t<path>'.
    The source may itself contain ':' (e.g. a Windows drive letter), so the
    numeric line-number field is used as the anchor for parsing.

    Args:
        working_directory (str): Path to the repository.
        path_to_check (str): Path to test.

    Returns:
        Optional[str]: The matching pattern, or None when the path is not
        ignored or the output cannot be parsed.

    Raises:
        GitCommandError: For return codes other than 0 and 1, or if
            execution fails.
    """
    func_name = "get_matching_gitignore_rule"
    cmd = ["git", "check-ignore", "-v", "--no-index", "--", path_to_check]
    log_handler.log_debug(
        f"Getting matching rule for: '{path_to_check}'", func_name=func_name
    )
    try:
        result = self.log_and_execute(cmd, working_directory, check=False)
        if result.returncode == 0:
            line = result.stdout.strip()
            if line and "\t" in line:
                rule_part = line.split("\t", 1)[0]
                # Anchor on the first ':<digits>:' so that a colon inside the
                # source path does not shift the fields.
                parsed = re.match(r"^(.+?):(\d+):(.*)$", rule_part, re.DOTALL)
                if parsed:
                    pattern = parsed.group(3)
                else:
                    # Fallback: plain three-way split, as before.
                    parts = rule_part.split(":", 2)
                    pattern = parts[2] if len(parts) == 3 else None
                if pattern is not None:
                    log_handler.log_debug(
                        f"Path '{path_to_check}' matched rule: '{pattern}'",
                        func_name=func_name,
                    )
                    return pattern
                log_handler.log_warning(
                    f"Could not parse pattern from: {rule_part}",
                    func_name=func_name,
                )
                return None
            log_handler.log_warning(
                f"Unexpected output from check-ignore -v: {line}",
                func_name=func_name,
            )
            return None
        elif result.returncode == 1:
            log_handler.log_debug(
                f"Path '{path_to_check}' not ignored.", func_name=func_name
            )
            return None
        else:
            msg = f"check-ignore -v failed (RC {result.returncode})"
            log_handler.log_error(
                f"{msg}. Stderr: {result.stderr.strip()}", func_name=func_name
            )
            raise GitCommandError(msg, command=cmd, stderr=result.stderr)
    except GitCommandError as e:
        log_handler.log_error(
            f"Failed get_matching_rule for '{path_to_check}': {e}",
            func_name=func_name,
        )
        raise
|
|
|
|
def get_status_short(self, working_directory: str) -> List[str]:
    """
    Returns the entries of 'git status --short -z --ignored=no'.

    Args:
        working_directory (str): Path to the repository.

    Returns:
        List[str]: Non-empty NUL-separated entries; an empty list if the
        command fails or an unexpected error occurs (errors are logged).
    """
    func_name = "get_status_short"
    log_handler.log_debug(
        f"Getting short status for '{working_directory}' (-z)", func_name=func_name
    )
    status_cmd = ["git", "status", "--short", "-z", "--ignored=no"]
    try:
        completed = self.log_and_execute(
            status_cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        stdout_text = completed.stdout
        log_handler.log_debug(
            f"Raw stdout length: {len(stdout_text)}", func_name=func_name
        )
        log_handler.log_debug(
            f"Raw stdout repr: {repr(stdout_text)}", func_name=func_name
        )
        entries = []
        for piece in stdout_text.split("\0"):
            if piece:
                entries.append(piece)
        log_handler.log_debug(
            f"Split resulted in {len(entries)} non-empty lines.",
            func_name=func_name,
        )
        log_handler.log_debug(
            f"Split lines list: {entries}", func_name=func_name
        )
        log_handler.log_info(
            f"Status check returned {len(entries)} items.", func_name=func_name
        )
        return entries
    except GitCommandError as status_err:
        log_handler.log_error(f"Failed get status: {status_err}", func_name=func_name)
        return []
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error getting status: {unexpected}", func_name=func_name
        )
        return []
|
|
|
|
# --- Remote Interaction Helpers ---
|
|
def get_file_content_from_ref(
    self, working_directory: str, file_path: str, ref: str
) -> Optional[str]:
    """
    Returns the content of a file at a given ref via 'git show <ref>:<path>'.

    Args:
        working_directory (str): Path to the repository.
        file_path (str): File path; OS separators are converted to '/'.
        ref (str): Ref to read from. A falsy value means 'HEAD'; ':' alone
            produces the argument ':<path>'.

    Returns:
        Optional[str]: The command's stdout on success; None when the file
        is not found in the ref, the command fails, or any exception occurs.
    """
    func_name = "get_file_content_from_ref"
    git_file_path = file_path.replace(os.path.sep, "/")
    git_ref = ref.strip() if ref else "HEAD"
    ref_path_arg = (
        f":{git_file_path}" if git_ref == ":" else f"{git_ref}:{git_file_path}"
    )
    log_handler.log_debug(
        f"Getting content for file='{git_file_path}' at ref='{git_ref}' (using '{ref_path_arg}') in '{working_directory}'",
        func_name=func_name,
    )
    show_cmd = ["git", "show", ref_path_arg]
    try:
        completed = self.log_and_execute(
            show_cmd,
            working_directory,
            check=False,
            capture=True,
            hide_console=True,
            log_output_level=logging.DEBUG,
        )
        if completed.returncode == 0:
            return completed.stdout

        err_text = completed.stderr or ""
        # Known stderr fragments meaning "path is not present in that ref".
        looks_missing = (
            "exists on disk, but not in" in err_text
            or "does not exist in" in err_text
            or ("fatal: Path" in err_text and "does not exist" in err_text)
            or "did not match any file(s)" in err_text
        )
        if completed.returncode == 128 and looks_missing:
            log_handler.log_warning(
                f"File '{git_file_path}' not found in ref '{git_ref}'.",
                func_name=func_name,
            )
            return None

        log_handler.log_error(
            f"git show failed for '{ref_path_arg}' (RC {completed.returncode}). Stderr: {(completed.stderr or 'N/A').strip()}",
            func_name=func_name,
        )
        return None
    except Exception as show_err:
        log_handler.log_exception(
            f"Error executing git show for '{ref_path_arg}': {show_err}",
            func_name=func_name,
        )
        return None
|
|
|
|
def add_file(self, working_directory: str, file_path: str, renormalize: bool = False) -> bool:
    """
    Stages a path with 'git add'.

    Args:
        working_directory (str): Path to the repository.
        file_path (str): Path to stage (e.g., '.', 'a_file.txt').
        renormalize (bool): If True, '--renormalize' is passed to git add.

    Returns:
        bool: True when the path was added.

    Raises:
        ValueError: If file_path is empty or whitespace only.
        GitCommandError: If git add fails; a dedicated message is used when
            stderr says the pathspec did not match any files.
    """
    func_name = "add_file"
    path_is_blank = (not file_path) or file_path.isspace()
    if path_is_blank:
        raise ValueError("File path cannot be empty.")
    log_handler.log_info(
        f"Adding path to staging: '{file_path}' in '{working_directory}' (renormalize={renormalize})",
        func_name=func_name,
    )
    flags = ["--renormalize"] if renormalize else []
    # '--' keeps the path from being parsed as an option.
    cmd = ["git", "add", *flags, "--", file_path]

    try:
        self.log_and_execute(
            cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        log_handler.log_info(
            f"Path '{file_path}' added successfully.", func_name=func_name
        )
        return True
    except GitCommandError as git_err:
        log_handler.log_error(
            f"Failed to add path '{file_path}': {git_err}", func_name=func_name
        )
        lowered = (git_err.stderr or "").lower()
        if "did not match any files" not in lowered:
            raise git_err
        raise GitCommandError(
            f"Pathspec '{file_path}' did not match any files.",
            command=cmd,
            stderr=git_err.stderr,
        ) from git_err
|
|
|
|
def get_remotes(self, working_directory: str) -> Dict[str, str]:
    """
    Returns the configured remotes and their fetch URLs ('git remote -v').

    Only '(fetch)' lines are considered. URLs containing spaces and lines
    with a trailing '[...]' annotation after '(fetch)' are handled.

    Args:
        working_directory (str): Path to the repository.

    Returns:
        Dict[str, str]: Mapping of remote name to fetch URL (possibly empty).

    Raises:
        GitCommandError: If the command fails with a fatal error, or if an
            unexpected error occurs.
    """
    func_name = "get_remotes"
    log_handler.log_debug(
        f"Getting remotes for '{working_directory}'", func_name=func_name
    )
    cmd = ["git", "remote", "-v"]
    remotes = {}
    # <name> <whitespace> <url, may contain spaces> (fetch) [optional annotation]
    fetch_line = re.compile(
        r"^\s*(\S+)\s+(.+?)\s+\(fetch\)(?:\s+\[[^\]]*\])?\s*$"
    )
    try:
        result = self.log_and_execute(
            cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        for line in result.stdout.strip().splitlines():
            found = fetch_line.match(line)
            if found:
                remotes[found.group(1)] = found.group(2)
        log_handler.log_info(
            f"Found {len(remotes)} remotes: {list(remotes.keys())}",
            func_name=func_name,
        )
        return remotes
    except GitCommandError as e:
        is_error = e.stderr and "fatal:" in e.stderr.lower()
        if not is_error and not remotes:
            # Non-fatal failure with nothing parsed: treat as "no remotes".
            log_handler.log_info("No remotes found.", func_name=func_name)
            return {}
        log_handler.log_error(
            f"Failed to get remotes: {e}", func_name=func_name
        )
        raise
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error getting remotes: {e}", func_name=func_name
        )
        raise GitCommandError(f"Unexpected error getting remotes: {e}") from e
|
|
|
|
def add_remote(
    self, working_directory: str, remote_name: str, remote_url: str
) -> bool:
    """
    Adds a remote with 'git remote add <name> <url>'.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Name of the new remote.
        remote_url (str): URL of the new remote.

    Returns:
        bool: True when the remote was added.

    Raises:
        GitCommandError: If the remote already exists, the command fails,
            or an unexpected error occurs.
    """
    func_name = "add_remote"
    log_handler.log_info(
        f"Adding remote '{remote_name}' -> '{remote_url}'", func_name=func_name
    )
    cmd = ["git", "remote", "add", remote_name, remote_url]
    try:
        self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(
            f"Remote '{remote_name}' added successfully.", func_name=func_name
        )
        return True
    except GitCommandError as git_err:
        lowered = (git_err.stderr or "").lower()
        if "already exists" not in lowered:
            log_handler.log_error(
                f"Failed to add remote '{remote_name}': {git_err}", func_name=func_name
            )
            raise git_err
        # Duplicate remote: re-raise with a clearer message (not logged here).
        raise GitCommandError(
            f"Remote '{remote_name}' already exists.",
            command=cmd,
            stderr=git_err.stderr,
        ) from git_err
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error adding remote '{remote_name}': {unexpected}",
            func_name=func_name,
        )
        raise GitCommandError(
            f"Unexpected error adding remote: {unexpected}", command=cmd
        ) from unexpected
|
|
|
|
def set_remote_url(
    self, working_directory: str, remote_name: str, remote_url: str
) -> bool:
    """
    Changes the URL of an existing remote ('git remote set-url').

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Name of the remote to update.
        remote_url (str): New URL.

    Returns:
        bool: True when the URL was set.

    Raises:
        GitCommandError: If the remote does not exist, the command fails,
            or an unexpected error occurs.
    """
    func_name = "set_remote_url"
    log_handler.log_info(
        f"Setting URL for remote '{remote_name}' to '{remote_url}'",
        func_name=func_name,
    )
    cmd = ["git", "remote", "set-url", remote_name, remote_url]
    try:
        self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(
            f"URL for remote '{remote_name}' set successfully.", func_name=func_name
        )
        return True
    except GitCommandError as git_err:
        lowered = (git_err.stderr or "").lower()
        if "no such remote" not in lowered:
            log_handler.log_error(
                f"Failed to set URL for remote '{remote_name}': {git_err}",
                func_name=func_name,
            )
            raise git_err
        # Unknown remote: re-raise with a clearer message (not logged here).
        raise GitCommandError(
            f"Remote '{remote_name}' does not exist, cannot set URL.",
            command=cmd,
            stderr=git_err.stderr,
        ) from git_err
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error setting remote URL for '{remote_name}': {unexpected}",
            func_name=func_name,
        )
        raise GitCommandError(
            f"Unexpected error setting remote URL: {unexpected}", command=cmd
        ) from unexpected
|
|
|
|
# --- Remote Operations (Fetch, Pull, Push, etc.) ---
|
|
def git_ls_remote(
    self, working_directory: str, remote_name: str
) -> subprocess.CompletedProcess:
    """
    Runs 'git ls-remote --exit-code <remote>' without raising on failure.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Remote to query.

    Returns:
        subprocess.CompletedProcess: Result returned by log_and_execute;
        the caller is expected to inspect the return code.
    """
    func_name = "git_ls_remote"
    log_handler.log_debug(
        f"Running ls-remote for '{remote_name}' in '{working_directory}'",
        func_name=func_name,
    )
    return self.log_and_execute(
        command=["git", "ls-remote", "--exit-code", remote_name],
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.DEBUG,
    )
|
|
|
|
def git_fetch_interactive(
    self, working_directory: str, remote_name: str
) -> subprocess.CompletedProcess:
    """
    Runs 'git fetch <remote>' with output capture off and the console shown.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Remote to fetch from.

    Returns:
        subprocess.CompletedProcess: Result returned by log_and_execute
        (called with check=False, so failures do not raise here).
    """
    func_name = "git_fetch_interactive"
    log_handler.log_info(
        f"Running interactive fetch for '{remote_name}' in '{working_directory}'. User may see a terminal.",
        func_name=func_name,
    )
    # capture=False / hide_console=False: the process is not hidden from the user.
    return self.log_and_execute(
        command=["git", "fetch", remote_name],
        working_directory=working_directory,
        check=False,
        capture=False,
        hide_console=False,
    )
|
|
|
|
def git_fetch(
    self, working_directory: str, remote_name: str, prune: bool = True
) -> subprocess.CompletedProcess:
    """
    Runs 'git fetch <remote>' (optionally with '--prune') without raising.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Remote to fetch from.
        prune (bool): If True, '--prune' is appended to the command.

    Returns:
        subprocess.CompletedProcess: Result returned by log_and_execute.
    """
    func_name = "git_fetch"
    log_handler.log_info(
        f"Fetching from remote '{remote_name}' in '{working_directory}' (Prune={prune})",
        func_name=func_name,
    )
    fetch_cmd = ["git", "fetch", remote_name] + (["--prune"] if prune else [])
    return self.log_and_execute(
        command=fetch_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
|
|
|
|
def git_pull(
    self, working_directory: str, remote_name: str, branch_name: str
) -> subprocess.CompletedProcess:
    """
    Pulls a specific branch from a remote into the current branch,
    allowing unrelated histories.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Remote to pull from.
        branch_name (str): Remote branch to pull.

    Returns:
        subprocess.CompletedProcess: Result returned by log_and_execute
        (called with check=False, so failures do not raise here).
    """
    func_name = "git_pull"
    log_handler.log_info(
        f"Pulling from remote '{remote_name}' branch '{branch_name}' into current branch in '{working_directory}'",
        func_name=func_name,
    )
    # Explicit form: git pull <remote> <branch>.
    # --allow-unrelated-histories is added to handle pulls after a repo reset.
    pull_cmd = ["git", "pull", "--allow-unrelated-histories"]
    pull_cmd += [remote_name, branch_name]
    return self.log_and_execute(
        command=pull_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
|
|
|
|
def git_push(
    self,
    working_directory: str,
    remote_name: str,
    branch_name: str,
    set_upstream: bool = False,
    force: bool = False,
) -> subprocess.CompletedProcess:
    """
    Pushes a branch to a remote, optionally setting upstream and/or forcing.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Remote to push to.
        branch_name (str): Branch to push.
        set_upstream (bool): If True, '--set-upstream' is passed.
        force (bool): If True, '--force' is passed (a warning is logged).

    Returns:
        subprocess.CompletedProcess: Result returned by log_and_execute
        (called with check=False, so failures do not raise here).
    """
    func_name = "git_push"
    if set_upstream:
        announcement = (
            f"Pushing branch '{branch_name}' to '{remote_name}' and setting upstream."
        )
    else:
        announcement = f"Pushing branch '{branch_name}' to '{remote_name}'."
    log_handler.log_info(announcement, func_name=func_name)
    if force:
        log_handler.log_warning(
            f"Executing FORCE PUSH for branch '{branch_name}' to '{remote_name}'!",
            func_name=func_name,
        )

    # Option order matches the checks above: --set-upstream, then --force.
    push_cmd = ["git", "push"]
    if set_upstream:
        push_cmd.append("--set-upstream")
    if force:
        push_cmd.append("--force")
    push_cmd.extend((remote_name, branch_name))

    return self.log_and_execute(
        command=push_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
|
|
|
|
def git_push_tags(
    self, working_directory: str, remote_name: str
) -> subprocess.CompletedProcess:
    """
    Pushes all tags to a remote ('git push <remote> --tags').

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Remote to push tags to.

    Returns:
        subprocess.CompletedProcess: Result returned by log_and_execute
        (called with check=False, so failures do not raise here).
    """
    func_name = "git_push_tags"
    log_handler.log_info(
        f"Pushing all tags to remote '{remote_name}' from '{working_directory}'",
        func_name=func_name,
    )
    return self.log_and_execute(
        command=["git", "push", remote_name, "--tags"],
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
|
|
|
|
# --- Branch Info and Status ---
|
|
def get_current_branch_name(self, working_directory: str) -> Optional[str]:
    """
    Returns the current branch name via 'git symbolic-ref --short -q HEAD'.

    Args:
        working_directory (str): Path to the repository.

    Returns:
        Optional[str]: The branch name; None when return code 1 is reported
        (logged as detached HEAD), on any other failure, or on exception.
    """
    func_name = "get_current_branch_name"
    log_handler.log_debug(
        f"Getting current branch name in '{working_directory}'", func_name=func_name
    )
    try:
        completed = self.log_and_execute(
            ["git", "symbolic-ref", "--short", "-q", "HEAD"],
            working_directory,
            check=False,
            capture=True,
            hide_console=True,
            log_output_level=logging.DEBUG,
        )
        rc = completed.returncode
        if rc == 0 and completed.stdout:
            current = completed.stdout.strip()
            log_handler.log_info(
                f"Current branch is '{current}'", func_name=func_name
            )
            return current
        if rc == 1:
            log_handler.log_warning(
                "Currently in detached HEAD state.", func_name=func_name
            )
            return None
        log_handler.log_error(
            f"Failed to get current branch (RC={rc}). Stderr: {(completed.stderr or 'N/A').strip()}",
            func_name=func_name,
        )
        return None
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error getting current branch: {unexpected}", func_name=func_name
        )
        return None
|
|
|
|
def get_branch_upstream(
    self, working_directory: str, branch_name: str
) -> Optional[str]:
    """
    Returns the upstream of a local branch as '<remote>/<branch>'.

    The existence of an upstream is first checked with
    'git rev-parse --abbrev-ref <branch>@{upstream}'; the name is then built
    from the 'branch.<name>.remote' and 'branch.<name>.merge' config values.
    The 'refs/heads/' prefix is stripped from the merge ref so that branch
    names containing '/' (e.g. 'feature/x') are preserved in full.

    Args:
        working_directory (str): Path to the repository.
        branch_name (str): Local branch to inspect.

    Returns:
        Optional[str]: '<remote>/<branch>', or None when no upstream is
        configured, it cannot be determined, or any error occurs.
    """
    func_name = "get_branch_upstream"
    log_handler.log_debug(
        f"Getting upstream for branch '{branch_name}' in '{working_directory}'",
        func_name=func_name,
    )
    cmd = ["git", "rev-parse", "--abbrev-ref", f"{branch_name}@{{upstream}}"]
    try:
        result = self.log_and_execute(
            cmd,
            working_directory,
            check=False,
            capture=True,
            hide_console=True,
            log_output_level=logging.DEBUG,
        )
        if result.returncode == 0 and result.stdout:
            remote_cmd = ["git", "config", "--get", f"branch.{branch_name}.remote"]
            merge_ref_cmd = [
                "git",
                "config",
                "--get",
                f"branch.{branch_name}.merge",
            ]
            remote_result = self.log_and_execute(
                remote_cmd,
                working_directory,
                check=False,
                capture=True,
                hide_console=True,
            )
            merge_ref_result = self.log_and_execute(
                merge_ref_cmd,
                working_directory,
                check=False,
                capture=True,
                hide_console=True,
            )
            if remote_result.returncode == 0 and merge_ref_result.returncode == 0:
                remote = remote_result.stdout.strip()
                merge_ref = merge_ref_result.stdout.strip()
                heads_prefix = "refs/heads/"
                if merge_ref.startswith(heads_prefix):
                    # Keep everything after the prefix, including any '/'.
                    remote_branch_name = merge_ref[len(heads_prefix):]
                else:
                    # Unexpected ref layout: fall back to the last component.
                    remote_branch_name = merge_ref.split("/")[-1]
                full_upstream_name = f"{remote}/{remote_branch_name}"
                log_handler.log_info(
                    f"Upstream for '{branch_name}' is '{full_upstream_name}'",
                    func_name=func_name,
                )
                return full_upstream_name
            log_handler.log_warning(
                f"Could not determine full upstream name for '{branch_name}'.",
                func_name=func_name,
            )
            return None
        elif "no upstream configured" in (result.stderr or "").lower():
            log_handler.log_info(
                f"No upstream configured for branch '{branch_name}'.",
                func_name=func_name,
            )
            return None
        else:
            log_handler.log_error(
                f"Failed to get upstream for '{branch_name}' (RC={result.returncode}). Stderr: {(result.stderr or 'N/A').strip()}",
                func_name=func_name,
            )
            return None
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error getting upstream for '{branch_name}': {e}",
            func_name=func_name,
        )
        return None
|
|
|
|
def get_ahead_behind_count(
    self, working_directory: str, local_branch: str, upstream_branch: str
) -> Tuple[Optional[int], Optional[int]]:
    """
    Counts commits ahead of and behind an upstream branch using two
    separate 'git rev-list --count' invocations.

    Args:
        working_directory (str): Path to the repository.
        local_branch (str): Local branch name.
        upstream_branch (str): Upstream branch name.

    Returns:
        Tuple[Optional[int], Optional[int]]: (ahead, behind), or (None, None)
        if either command fails, its output cannot be parsed, or an
        exception occurs. The behind count is not attempted when the ahead
        count fails.
    """
    func_name = "get_ahead_behind_count"
    log_handler.log_debug(
        f"Getting ahead/behind count for '{local_branch}' vs '{upstream_branch}' using separate counts.",
        func_name=func_name,
    )

    def _count(kind: str, rev_range: str) -> Optional[int]:
        # Runs one rev-list count; 'kind' ("ahead"/"behind") only shapes the logs.
        title = kind.capitalize()
        count_cmd = ["git", "rev-list", "--count", rev_range]
        log_handler.log_debug(
            f"Executing {kind} count command: {' '.join(count_cmd)}",
            func_name=func_name,
        )
        completed = self.log_and_execute(
            count_cmd,
            working_directory,
            check=False,
            capture=True,
            hide_console=True,
            log_output_level=logging.DEBUG,
        )
        if not (completed.returncode == 0 and completed.stdout):
            log_handler.log_warning(
                f"{title} count command failed (RC={completed.returncode}). Stderr: {(completed.stderr or 'N/A').strip()}",
                func_name=func_name,
            )
            return None
        try:
            value = int(completed.stdout.strip())
            log_handler.log_debug(f"{title} count: {value}", func_name=func_name)
            return value
        except ValueError:
            log_handler.log_error(
                f"Failed to parse {kind} count output: '{completed.stdout.strip()}'",
                func_name=func_name,
            )
            return None

    try:
        ahead_count = _count("ahead", f"{upstream_branch}..{local_branch}")
        if ahead_count is None:
            return None, None
        behind_count = _count("behind", f"{local_branch}..{upstream_branch}")
        if behind_count is None:
            return None, None
        log_handler.log_info(
            f"Ahead/Behind for '{local_branch}': Ahead={ahead_count}, Behind={behind_count}",
            func_name=func_name,
        )
        return ahead_count, behind_count
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error getting ahead/behind count: {unexpected}", func_name=func_name
        )
        return None, None
|
|
|
|
# --- Clone and Remote Branch Listing ---
|
|
def git_clone(
    self, remote_url: str, local_directory_path: str, timeout_seconds: int = 20
) -> subprocess.CompletedProcess:
    """
    Clones a remote repository with 'git clone --progress'.

    The timeout used to be a hard-coded 20-second debug value. It is now a
    parameter whose default keeps the previous behavior, so callers can
    pass a realistic value for large repositories.

    Args:
        remote_url (str): URL of the repository to clone.
        local_directory_path (str): Destination path for the clone.
        timeout_seconds (int): Timeout forwarded to log_and_execute.

    Returns:
        subprocess.CompletedProcess: Result returned by log_and_execute
        (called with check=False, so failures do not raise here).
    """
    func_name = "git_clone"
    log_handler.log_info(
        f"Cloning repository from '{remote_url}' into '{local_directory_path}'",
        func_name=func_name,
    )
    cmd = ["git", "clone", "--progress", remote_url, local_directory_path]

    # NOTE(review): 20s is the legacy debug default, chosen to force an error
    # instead of an indefinite hang; it is likely too short for real clones.
    short_timeout_threshold = 20
    if timeout_seconds <= short_timeout_threshold:
        log_handler.log_warning(
            f"DEBUG: Using short timeout ({timeout_seconds}s) for git clone.",
            func_name=func_name,
        )
    else:
        log_handler.log_debug(
            f"Using timeout of {timeout_seconds}s for git clone.",
            func_name=func_name,
        )

    result = self.log_and_execute(
        command=cmd,
        working_directory=".",
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
        timeout_seconds=timeout_seconds,
    )
    return result
|
|
|
|
def git_list_remote_branches(
    self, working_directory: str, remote_name: str
) -> List[str]:
    """
    Lists remote-tracking branches of one remote from 'git branch -r'.

    Lines containing '->' are skipped; only names starting with
    '<remote_name>/' are kept.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Remote whose branches are wanted.

    Returns:
        List[str]: Sorted names including the '<remote>/' prefix; an empty
        list when there is no output or any error occurs (errors are logged).
    """
    func_name = "git_list_remote_branches"
    log_handler.log_debug(
        f"Listing remote branches for '{remote_name}' via 'git branch -r' in '{working_directory}'",
        func_name=func_name,
    )
    try:
        completed = self.log_and_execute(
            ["git", "branch", "-r"],
            working_directory,
            check=True,
            capture=True,
            hide_console=True,
            log_output_level=logging.DEBUG,
        )
        if not completed.stdout:
            log_handler.log_info(
                "No remote branches found in 'git branch -r' output.",
                func_name=func_name,
            )
            return []

        wanted_prefix = f"{remote_name}/"
        stripped_names = (raw.strip() for raw in completed.stdout.splitlines())
        matches = [
            name
            for name in stripped_names
            if "->" not in name and name.startswith(wanted_prefix)
        ]
        log_handler.log_info(
            f"Found {len(matches)} remote branches for '{remote_name}'.",
            func_name=func_name,
        )
        log_handler.log_debug(
            f"Filtered remote branches for '{remote_name}': {matches}",
            func_name=func_name,
        )
        return sorted(matches)
    except GitCommandError as git_err:
        log_handler.log_error(
            f"Failed to list remote branches using 'git branch -r' for '{remote_name}': {git_err}",
            func_name=func_name,
        )
        return []
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error listing remote branches for '{remote_name}': {unexpected}",
            func_name=func_name,
        )
        return []
|
|
|
|
def checkout_new_branch_from_remote(
    self,
    working_directory: str,
    new_local_branch_name: str,
    remote_tracking_branch_full_name: str,
) -> subprocess.CompletedProcess:
    """
    Creates and checks out a local branch from a remote-tracking branch
    ('git checkout -b <local> <remote/branch>').

    Args:
        working_directory (str): Path to the repository.
        new_local_branch_name (str): Name of the local branch to create.
        remote_tracking_branch_full_name (str): Start point for the branch.

    Returns:
        subprocess.CompletedProcess: Result returned by log_and_execute
        (called with check=False, so failures do not raise here).
    """
    func_name = "checkout_new_branch_from_remote"
    log_handler.log_info(
        f"Checking out remote branch '{remote_tracking_branch_full_name}' as new local branch '{new_local_branch_name}' in '{working_directory}'",
        func_name=func_name,
    )
    checkout_cmd = ["git", "checkout", "-b"]
    checkout_cmd += [new_local_branch_name, remote_tracking_branch_full_name]
    return self.log_and_execute(
        command=checkout_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
|
|
|
|
# --- Local Branch/Tag Deletion and Merging ---
|
|
def delete_local_branch(
    self, working_directory: str, branch_name: str, force: bool = False
) -> subprocess.CompletedProcess:
    """
    Delete a local branch, optionally forcing the deletion.

    Args:
        working_directory (str): Path to the Git repository.
        branch_name (str): Name of the local branch to delete.
        force (bool): When true, '-D' is used instead of '-d'.

    Returns:
        subprocess.CompletedProcess: The result object produced by
            log_and_execute. The call passes check=False, so callers are
            expected to inspect the return code themselves.
    """
    func_name = "delete_local_branch"
    if force:
        delete_option, action_type = "-D", "Forcing delete"
    else:
        delete_option, action_type = "-d", "Deleting"

    log_handler.log_info(
        f"{action_type} local branch '{branch_name}' in '{working_directory}' (Option: {delete_option})",
        func_name=func_name,
    )
    return self.log_and_execute(
        command=["git", "branch", delete_option, branch_name],
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
|
|
|
|
def git_merge(
    self,
    working_directory: str,
    branch_to_merge: str,
    commit_msg: Optional[str] = None,
    no_ff: bool = False,
) -> subprocess.CompletedProcess:
    """
    Merge a branch into the currently checked-out branch.

    Args:
        working_directory (str): Path to the Git repository.
        branch_to_merge (str): Name of the branch to merge.
        commit_msg (Optional[str]): Custom merge commit message. When empty
            or None, '--no-edit' is passed instead of '-m'.
        no_ff (bool): When true, '--no-ff' is added to the command.

    Returns:
        subprocess.CompletedProcess: The result object produced by
            log_and_execute. The call passes check=False, so callers are
            expected to inspect the return code themselves.
    """
    func_name = "git_merge"
    log_handler.log_info(
        f"Merging branch '{branch_to_merge}' into current branch in '{working_directory}'",
        func_name=func_name,
    )

    merge_cmd = ["git", "merge"]
    if no_ff:
        merge_cmd += ["--no-ff"]
        log_handler.log_debug("Using --no-ff option for merge.", func_name=func_name)

    if not commit_msg:
        # No custom message: tell git not to open an editor for the message.
        merge_cmd += ["--no-edit"]
        log_handler.log_debug(
            "Using --no-edit option (default merge message).", func_name=func_name
        )
    else:
        merge_cmd += ["-m", commit_msg]
        # Only the first 50 characters of the message are written to the log.
        log_handler.log_debug(
            f"Using custom merge commit message: '{commit_msg[:50]}...'",
            func_name=func_name,
        )

    merge_cmd += [branch_to_merge]
    return self.log_and_execute(
        command=merge_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
|
|
|
|
# --- Diff Tree ---
|
|
def git_diff_tree(
    self, working_directory: str, ref1: str, ref2: str
) -> Tuple[int, List[str]]:
    """
    Compare the trees of two references with 'git diff-tree'.

    Args:
        working_directory (str): Path to the Git repository.
        ref1 (str): First reference to compare.
        ref2 (str): Second reference to compare.

    Returns:
        Tuple[int, List[str]]: The command's return code and the non-empty,
            stripped '--name-status' output lines. The list is empty when
            there are no differences or when the command failed. On an
            unexpected exception the result is (-1, []).
    """
    func_name = "git_diff_tree"
    log_handler.log_debug(
        f"Comparing trees: '{ref1}' vs '{ref2}' in '{working_directory}'",
        func_name=func_name,
    )
    diff_cmd = ["git", "diff-tree", "--no-commit-id", "--name-status", "-r", ref1, ref2]
    try:
        outcome = self.log_and_execute(
            diff_cmd,
            working_directory,
            check=False,
            capture=True,
            hide_console=True,
            log_output_level=logging.DEBUG,
        )
        exit_code = outcome.returncode
        changed_files: List[str] = []

        if exit_code != 0:
            log_handler.log_error(
                f"git diff-tree failed (RC={outcome.returncode}) between '{ref1}' and '{ref2}'. Stderr: {(outcome.stderr or 'N/A').strip()}",
                func_name=func_name,
            )
        elif outcome.stdout:
            stripped_lines = (raw.strip() for raw in outcome.stdout.strip().splitlines())
            changed_files = [entry for entry in stripped_lines if entry]
            log_handler.log_info(
                f"Found {len(changed_files)} differences between '{ref1}' and '{ref2}'.",
                func_name=func_name,
            )
        else:
            log_handler.log_info(
                f"No differences found between '{ref1}' and '{ref2}'.",
                func_name=func_name,
            )
        return exit_code, changed_files
    except Exception as e:
        # Best-effort API: any failure is logged and reported as (-1, []).
        log_handler.log_exception(
            f"Unexpected error running git diff-tree: {e}", func_name=func_name
        )
        return -1, []
|
|
|
|
def git_reset_hard(self, working_directory: str, reference: str) -> None:
    """
    Performs a hard reset to a given reference and cleans the working directory.
    WARNING: This is a destructive operation.

    Two commands are run in sequence: 'git reset --hard <reference>' and then
    'git clean -fd'. The clean step is only attempted if the reset succeeded.

    Args:
        working_directory (str): The path to the Git repository.
        reference (str): The tag, branch, or commit hash to reset to.

    Raises:
        ValueError: If the reference is empty.
        GitCommandError: If any of the Git commands fail.
    """
    func_name: str = "git_reset_hard"
    if not reference or reference.isspace():
        raise ValueError("Reference for reset cannot be empty.")

    log_handler.log_warning(
        f"Performing DESTRUCTIVE operation: git reset --hard {reference}",
        func_name=func_name,
    )

    # 1. Run git reset --hard
    reset_cmd: List[str] = ["git", "reset", "--hard", reference]
    try:
        self.log_and_execute(
            reset_cmd, working_directory, check=True, log_output_level=logging.INFO
        )
        log_handler.log_info(
            f"Hard reset to '{reference}' completed successfully.", func_name=func_name
        )
    except GitCommandError:
        log_handler.log_error(
            f"git reset --hard to '{reference}' FAILED.", func_name=func_name
        )
        # Re-raise (keeping the original traceback) to stop the operation.
        raise

    log_handler.log_warning(
        "Performing DESTRUCTIVE operation: git clean -fd", func_name=func_name
    )

    # 2. Run git clean -fd to remove untracked files and directories
    clean_cmd: List[str] = ["git", "clean", "-fd"]
    try:
        self.log_and_execute(
            clean_cmd, working_directory, check=True, log_output_level=logging.INFO
        )
        log_handler.log_info(
            "Clean of untracked files completed successfully.", func_name=func_name
        )
    except GitCommandError:
        # The message names the command that was actually run (-fd, not -fdx).
        log_handler.log_error("git clean -fd FAILED.", func_name=func_name)
        raise
|
|
|
|
# --- History Cleaner Methods ---
|
|
def list_all_historical_blobs(self, working_directory: str) -> List[Tuple[str, str]]:
    """
    List the objects recorded in the repository history together with
    the path each one was listed under.

    Uses 'git rev-list --all --objects'. Output lines that do not split into
    a hash and a path are dropped; no further filtering by object type is
    done here.

    Args:
        working_directory (str): Path to the repository.

    Returns:
        List[Tuple[str, str]]: A list of (object_hash, file_path) tuples.

    Raises:
        GitCommandError: Re-raised after logging if the command fails.
    """
    func_name = "list_all_historical_blobs"
    log_handler.log_debug(f"Listing all historical blobs in '{working_directory}'...", func_name=func_name)

    rev_list_cmd = ["git", "rev-list", "--all", "--objects"]
    try:
        listing = self.log_and_execute(
            rev_list_cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        split_rows = (row.split(maxsplit=1) for row in listing.stdout.strip().splitlines())
        # Keep only rows that carry both a hash and a path.
        blobs = [(fields[0], fields[1]) for fields in split_rows if len(fields) == 2]

        log_handler.log_info(f"Found {len(blobs)} total object entries. These will be filtered.", func_name=func_name)
        return blobs
    except GitCommandError as e:
        log_handler.log_error(f"Failed to list historical objects: {e}", func_name=func_name)
        raise
|
|
|
|
def get_blob_size(self, working_directory: str, blob_hash: str) -> int:
    """
    Return the size in bytes of a Git object, as reported by 'git cat-file -s'.

    Args:
        working_directory (str): Path to the repository.
        blob_hash (str): The hash of the blob.

    Returns:
        int: The size of the blob in bytes.

    Raises:
        GitCommandError: If the command fails or its output is not an integer.
    """
    size_cmd = ["git", "cat-file", "-s", blob_hash]
    try:
        completed = self.log_and_execute(
            size_cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        reported_size = completed.stdout.strip()
        return int(reported_size)
    except (GitCommandError, ValueError) as e:
        # Both failure modes are surfaced as a single GitCommandError.
        raise GitCommandError(f"Failed to get size for blob {blob_hash}: {e}", command=size_cmd) from e
|
|
|
|
def run_filter_repo(self, working_directory: str, paths_file: str) -> None:
    """
    Run 'git-filter-repo' to remove the paths listed in a file.

    The tool is invoked with '--invert-paths' and '--force', and the call is
    given a timeout of 7200 seconds.

    Args:
        working_directory (str): Path to the repository.
        paths_file (str): Path to a file containing one file path per line to remove.
    """
    func_name = "run_filter_repo"
    log_handler.log_warning(f"--- Running DESTRUCTIVE git-filter-repo in '{working_directory}' ---", func_name=func_name)

    # --invert-paths turns the listed paths into the set to drop.
    filter_options = ["--paths-from-file", paths_file, "--invert-paths", "--force"]
    filter_cmd = ["git-filter-repo"] + filter_options

    self.log_and_execute(
        command=filter_cmd,
        working_directory=working_directory,
        check=True,
        capture=True,
        hide_console=False,
        log_output_level=logging.INFO,
        timeout_seconds=7200,
    )
|
|
|
|
def force_push_all(self, working_directory: str, remote_name: str) -> None:
    """
    Force push all local branches to the specified remote.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Name of the remote to push to.
    """
    func_name = "force_push_all"
    warning_text = f"Force pushing all branches to remote '{remote_name}'..."
    log_handler.log_warning(warning_text, func_name=func_name)

    push_cmd = ["git", "push", remote_name] + ["--all", "--force"]
    self.log_and_execute(
        command=push_cmd,
        working_directory=working_directory,
        check=True,
        capture=True,
        hide_console=False,
    )
|
|
|
|
def force_push_tags(self, working_directory: str, remote_name: str) -> None:
    """
    Force push all local tags to the specified remote.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): Name of the remote to push to.
    """
    func_name = "force_push_tags"
    warning_text = f"Force pushing all tags to remote '{remote_name}'..."
    log_handler.log_warning(warning_text, func_name=func_name)

    push_cmd = ["git", "push", remote_name] + ["--tags", "--force"]
    self.log_and_execute(
        command=push_cmd,
        working_directory=working_directory,
        check=True,
        capture=True,
        hide_console=False,
    )
|
|
|
|
def set_branch_upstream(self, working_directory: str, branch_name: str, remote_name: str) -> None:
    """
    Make a local branch track the same-named branch on a remote.

    Runs 'git branch --set-upstream-to=<remote_name>/<branch_name> <branch_name>'.

    Args:
        working_directory (str): The path to the Git repository.
        branch_name (str): The local branch name.
        remote_name (str): The name of the remote (e.g., 'origin').

    Raises:
        GitCommandError: If the command fails.
    """
    func_name = "set_branch_upstream"
    upstream_ref = "/".join((remote_name, branch_name))
    log_handler.log_debug(
        f"Setting upstream for branch '{branch_name}' to '{upstream_ref}'",
        func_name=func_name,
    )

    upstream_option = f"--set-upstream-to={upstream_ref}"
    self.log_and_execute(
        command=["git", "branch", upstream_option, branch_name],
        working_directory=working_directory,
        check=True,
    )
|
|
|
|
# --- Submodule Management ---
|
|
|
|
def submodule_add(self, working_directory: str, repo_url: str, path: str, branch: Optional[str] = None) -> None:
    """
    Add a new submodule to the repository, optionally tracking a branch.

    Args:
        working_directory (str): Path to the repository.
        repo_url (str): URL of the repository to add as a submodule.
        path (str): Path inside the repository where the submodule is placed.
        branch (Optional[str]): Branch to track; passed to git via '-b' when given.
    """
    func_name = "submodule_add"
    log_handler.log_info(f"Adding submodule from '{repo_url}' into '{path}'", func_name=func_name)

    add_cmd = ["git", "submodule", "add"]
    if branch:
        # The -b option makes git record the branch configuration in .gitmodules.
        log_handler.log_info(f"Configuring submodule to track branch: '{branch}'", func_name=func_name)
        add_cmd += ["-b", branch]
    # '--' ends option parsing before the URL and path arguments.
    add_cmd += ["--", repo_url, path]

    self.log_and_execute(add_cmd, working_directory, check=True)
|
|
|
|
def submodule_status(self, working_directory: str) -> List[Dict[str, str]]:
    """
    Collect the status of all submodules, including details about local modifications.

    Parses the output of 'git submodule status --recursive'. For every parsed
    line it also looks up the tracked branch in .gitmodules and, for some
    status flags, checks the submodule directory for local changes.

    Args:
        working_directory (str): Path to the repository.

    Returns:
        List[Dict[str, str]]: One dict per parsed line with the keys
            'status_char', 'commit', 'path', 'description', 'dirty_details'
            and 'tracked_branch'. Lines that cannot be parsed are logged
            and skipped.
    """
    func_name = "submodule_status"
    log_handler.log_debug(f"Getting submodule status in '{working_directory}'", func_name=func_name)
    status_cmd = ["git", "submodule", "status", "--recursive"]
    status_result = self.log_and_execute(
        status_cmd, working_directory, check=True, log_output_level=logging.DEBUG
    )

    # Line layout: optional status flag, commit hash, path, optional "(description)".
    line_pattern = re.compile(
        r"^\s*([U +-])?"
        r"([0-9a-fA-F]+)\s"
        r"(.*?)"
        r"(?:\s\((.*?)\))?\s*$"
    )

    collected = []
    for raw_line in status_result.stdout.strip().splitlines():
        line = raw_line.strip()
        if not line:
            continue

        parsed = line_pattern.match(line)
        if not parsed:
            log_handler.log_warning(f"Could not parse submodule status line: '{line}'", func_name=func_name)
            continue

        status_char, commit, path, description = parsed.groups()
        final_status_char = status_char.strip() if status_char else ' '

        tracked_branch = "N/A"
        try:
            # Explicitly tell git config to read ONLY from the .gitmodules file.
            # NOTE(review): the lookup key is built from the path, so it
            # presumably assumes the submodule name equals its path.
            branch_lookup_cmd = [
                "git", "config",
                "--file", ".gitmodules",
                f"submodule.{path.strip()}.branch"
            ]
            branch_lookup = self.log_and_execute(branch_lookup_cmd, working_directory, check=False)

            # check=False, so the return code has to be inspected here.
            # A zero exit code with output means a branch is configured.
            if branch_lookup.returncode == 0 and branch_lookup.stdout:
                tracked_branch = branch_lookup.stdout.strip()
        except GitCommandError:
            # If the command itself fails for another reason, ignore it.
            pass

        # --- Data enrichment ---
        dirty_details = ""
        # Inspect the submodule when the status indicates a change ('+' for a
        # new commit, 'U' for a conflict) or when the flag is a space (it may
        # still contain uncommitted changes).
        if final_status_char in ('+', 'U', ' '):
            submodule_full_path = os.path.join(working_directory, path)
            if os.path.isdir(submodule_full_path):
                try:
                    # Check whether the submodule has local modifications.
                    has_local_changes = self.git_status_has_changes(submodule_full_path)
                    if has_local_changes:
                        dirty_details = "modified content"
                except GitCommandError:
                    # May fail if the folder is not a git repo (corrupted state).
                    dirty_details = "error checking status"

        entry = {
            "status_char": final_status_char,
            "commit": commit.strip(),
            "path": path.strip(),
            "description": (description or "N/A").strip(),
            "dirty_details": dirty_details,
            "tracked_branch": tracked_branch,
        }
        collected.append(entry)

    return collected
|
|
|
|
def submodule_update(
    self, working_directory: str, init: bool, remote: bool, recursive: bool, merge: bool
) -> None:
    """
    Update submodules, enabling the requested 'git submodule update' options.

    Args:
        working_directory (str): Path to the repository.
        init (bool): Add '--init' to the command.
        remote (bool): Add '--remote' to the command.
        recursive (bool): Add '--recursive' to the command.
        merge (bool): Add '--merge' to the command.
    """
    func_name = "submodule_update"
    log_handler.log_info(f"Updating submodules in '{working_directory}' (init={init}, remote={remote}, recursive={recursive})", func_name=func_name)

    # Option order on the command line follows this table.
    option_switches = (
        ("--init", init),
        ("--remote", remote),
        ("--recursive", recursive),
        ("--merge", merge),
    )
    update_cmd = ["git", "submodule", "update"]
    update_cmd += [option for option, enabled in option_switches if enabled]

    self.log_and_execute(update_cmd, working_directory, check=True)
|
|
|
|
def submodule_deinit(self, working_directory: str, path: str) -> None:
    """
    De-initialize a submodule, a step in removing it.

    Args:
        working_directory (str): Path to the repository.
        path (str): Path of the submodule to de-initialize.
    """
    func_name = "submodule_deinit"
    log_handler.log_info(f"De-initializing submodule at '{path}'", func_name=func_name)

    # -f forces the deinit even when the submodule has local changes.
    deinit_cmd = ["git", "submodule", "deinit"] + ["-f", "--", path]
    self.log_and_execute(deinit_cmd, working_directory, check=True)
|
|
|
|
def remove_from_index(self, working_directory: str, path: str) -> None:
    """
    Forcibly removes a path from the Git index (staging area) only.
    This does not delete the file from the working directory.

    Args:
        working_directory (str): Path to the repository.
        path (str): The relative path to the item to remove from the index.

    Raises:
        GitCommandError: If the 'git rm --cached' command fails.
    """
    func_name = "remove_from_index"
    log_handler.log_info(
        f"Forcibly removing path from index (cached): '{path}' in '{working_directory}'",
        func_name=func_name
    )
    # Use -f to force removal even if there are local modifications.
    # '--' ends option parsing so a path that starts with '-' is still treated
    # as a path (same convention as submodule_add / submodule_deinit).
    cmd = ["git", "rm", "--cached", "-f", "--", path]
    try:
        self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(f"Path '{path}' removed from index successfully.", func_name=func_name)
    except GitCommandError as e:
        log_handler.log_error(f"Failed to 'git rm --cached {path}': {e}", func_name=func_name)
        raise
|
|
|
|
def remove_config_section(self, working_directory: str, section_name: str) -> None:
    """
    Remove a section from the local .git/config file.

    A missing section is tolerated: a non-zero exit whose stderr contains
    "no such section" is treated as success.

    Args:
        working_directory (str): Path to the repository.
        section_name (str): The full name of the section to remove (e.g., 'submodule.libs/path').

    Raises:
        GitCommandError: If the command fails with an unexpected error.
    """
    func_name = "remove_config_section"
    log_handler.log_info(
        f"Removing config section '{section_name}' from local .git/config",
        func_name=func_name,
    )
    remove_cmd = ["git", "config", "-f", ".git/config", "--remove-section", section_name]
    try:
        outcome = self.log_and_execute(remove_cmd, working_directory, check=False)

        if outcome.returncode != 0:
            # Git exits non-zero when the section doesn't exist; for a clean-up
            # operation that is not an error. Anything else is.
            stderr_lowered = (outcome.stderr or "").lower()
            section_was_missing = "no such section" in stderr_lowered
            if not section_was_missing:
                raise GitCommandError(
                    f"Git config command failed with an unexpected error (RC {outcome.returncode})",
                    command=remove_cmd,
                    stderr=outcome.stderr,
                )

        log_handler.log_info(f"Config section '{section_name}' removed (if it existed).", func_name=func_name)
    except GitCommandError as e:
        log_handler.log_error(f"Failed to remove config section '{section_name}': {e}", func_name=func_name)
        raise
|
|
|
|
def get_registered_submodules(self, working_directory: str) -> Dict[str, str]:
    """
    Retrieves the list of registered submodules and their URLs from the .gitmodules file.
    This reads the configuration, not the on-disk status.

    Args:
        working_directory (str): Path to the repository.

    Returns:
        A dictionary mapping submodule paths to their URLs. An empty dict is
        returned when .gitmodules is missing/empty or when a git command
        fails while reading it.
    """
    func_name = "get_registered_submodules"
    log_handler.log_debug(
        f"Getting registered submodules from config in '{working_directory}'",
        func_name=func_name
    )
    key_prefix = "submodule."
    key_suffix = ".path"
    # 'git config --file .gitmodules --get-regexp' reads .gitmodules and finds
    # all entries that have a 'path' property.
    cmd = ["git", "config", "--file", ".gitmodules", "--get-regexp", r"submodule\..*\.path"]

    try:
        result = self.log_and_execute(cmd, working_directory, check=False)

        # If .gitmodules doesn't exist, the command will fail, which is okay.
        if result.returncode != 0 or not result.stdout:
            log_handler.log_info("No .gitmodules file found or it's empty. No registered submodules.", func_name=func_name)
            return {}

        submodules: Dict[str, str] = {}
        # Output is like: submodule.libs/log-handler-lib.path libs/log-handler-lib
        for line in result.stdout.strip().splitlines():
            if not line.strip():
                continue
            fields = line.split(maxsplit=1)
            key = fields[0]
            well_formed = (
                len(fields) == 2
                and key.startswith(key_prefix)
                and key.endswith(key_suffix)
                and len(key) > len(key_prefix) + len(key_suffix)
            )
            if not well_formed:
                # Skip instead of crashing on a line without a value or with
                # an unexpected key shape.
                log_handler.log_warning(f"Skipping unparsable .gitmodules entry: '{line}'", func_name=func_name)
                continue
            path = fields[1]

            # Strip only the leading 'submodule.' and the trailing '.path'.
            # str.replace would also remove these substrings from inside the
            # submodule name itself (e.g. 'libs/my.pathfinder').
            submodule_name = key[len(key_prefix):-len(key_suffix)]

            # Now get the URL for this submodule
            url_cmd = ["git", "config", "--file", ".gitmodules", f"submodule.{submodule_name}.url"]
            url_result = self.log_and_execute(url_cmd, working_directory, check=True)
            url = url_result.stdout.strip()

            submodules[path] = url

        log_handler.log_info(f"Found {len(submodules)} registered submodules: {list(submodules.keys())}", func_name=func_name)
        return submodules

    except GitCommandError as e:
        # This can happen if .gitmodules is malformed.
        log_handler.log_error(f"Failed to read registered submodules: {e}", func_name=func_name)
        return {}  # Return empty on error
|
|
|
|
def discover_submodules_in_any_state(self, working_directory: str) -> List[str]:
    """
    Discovers submodule paths by looking in ALL possible Git locations:
    .gitmodules (on disk and in index), .git/config, and the index itself
    for gitlink entries. This is the most robust way to find submodules
    in any consistent or inconsistent state.

    Each source is queried independently; a failure in one source is logged
    and does not prevent the others from being consulted.

    Args:
        working_directory (str): Path to the repository.

    Returns:
        A sorted list of unique submodule paths found.
    """
    func_name = "discover_submodules_in_any_state"
    log_handler.log_debug(
        f"Discovering submodules in any state in '{working_directory}'",
        func_name=func_name
    )

    def _values_from_config_listing(listing: str) -> List[str]:
        """Return the value column of 'git config --get-regexp' output lines."""
        values = []
        for entry in listing.strip().splitlines():
            fields = entry.split(maxsplit=1)
            # A key without a value has no path to report; skip it rather
            # than failing with an IndexError.
            if len(fields) == 2:
                values.append(fields[1])
        return values

    found_paths = set()

    # Method 1: Look in the on-disk .gitmodules file
    try:
        cmd_gitmodules = ["git", "config", "--file", ".gitmodules", "--get-regexp", r"submodule\..*\.path"]
        result_gitmodules = self.log_and_execute(cmd_gitmodules, working_directory, check=False)
        if result_gitmodules.returncode == 0 and result_gitmodules.stdout:
            found_paths.update(_values_from_config_listing(result_gitmodules.stdout))
    except GitCommandError as e:
        log_handler.log_warning(f"Could not read on-disk .gitmodules: {e}", func_name=func_name)

    # Method 2: Look in the version of .gitmodules in the Git index
    try:
        gitmodules_content_from_index = self.get_file_content_from_ref(
            working_directory, file_path=".gitmodules", ref=":"
        )
        if gitmodules_content_from_index:
            log_handler.log_info("Found .gitmodules in Git index. Parsing its content.", func_name=func_name)
            for match in re.finditer(r"^\s*path\s*=\s*(.*)$", gitmodules_content_from_index, re.MULTILINE):
                path = match.group(1).strip()
                if path:
                    found_paths.add(path)
    except Exception as e_index:
        # Best-effort source: log and continue with the remaining methods.
        log_handler.log_error(f"Error parsing .gitmodules content from index: {e_index}", func_name=func_name)

    # Method 3: Look in .git/config for submodule sections
    # NOTE(review): git usually records 'url'/'active' (not 'path') for
    # submodules in .git/config, so this query may rarely match - confirm.
    try:
        cmd_config = ["git", "config", "-f", ".git/config", "--get-regexp", r"submodule\..*\.path"]
        result_config = self.log_and_execute(cmd_config, working_directory, check=False)
        if result_config.returncode == 0 and result_config.stdout:
            found_paths.update(_values_from_config_listing(result_config.stdout))
    except GitCommandError as e:
        log_handler.log_warning(f"Could not read submodule paths from .git/config: {e}", func_name=func_name)

    # Method 4: Look in the index for gitlink entries (mode 160000)
    # This is the definitive way to find correctly registered submodules.
    try:
        cmd_index = ["git", "ls-files", "--stage"]
        result_index = self.log_and_execute(cmd_index, working_directory, check=True)
        for line in (result_index.stdout or "").splitlines():
            # Format: "<mode> <object> <stage>\t<path>". Split on the TAB so
            # that paths containing spaces are kept whole.
            # NOTE(review): paths with unusual characters may be C-quoted by
            # git (core.quotePath); they are not unquoted here.
            meta, tab, path = line.partition("\t")
            if not tab or not path:
                continue
            if meta.split()[:1] == ["160000"]:
                found_paths.add(path)
    except GitCommandError as e:
        log_handler.log_error(f"Failed to read submodules from git index: {e}", func_name=func_name)

    sorted_paths = sorted(found_paths)
    log_handler.log_info(f"Discovered {len(sorted_paths)} unique submodule paths: {sorted_paths}", func_name=func_name)
    return sorted_paths
|
|
|
|
def git_clean(self, working_directory: str) -> None:
    """
    Clean the working tree by removing untracked files and directories
    with 'git clean -fd'.

    Args:
        working_directory (str): The directory to clean.

    Raises:
        GitCommandError: If the 'git clean' command fails.
    """
    func_name = "git_clean"
    log_handler.log_warning(
        f"Performing 'git clean -fd' in '{working_directory}'. This will delete untracked files.",
        func_name=func_name,
    )
    # -f (force) is required. -d removes untracked directories.
    clean_cmd = ["git", "clean", "-fd"]
    try:
        self.log_and_execute(clean_cmd, working_directory, check=True)
    except GitCommandError as e:
        log_handler.log_error(f"Failed to clean directory '{working_directory}': {e}", func_name=func_name)
        raise
    else:
        log_handler.log_info(f"Successfully cleaned working directory: '{working_directory}'", func_name=func_name)
|
|
|
|
def get_diff_files(self, working_directory: str, ref1: str, ref2: Optional[str]) -> List[str]:
    """
    Return the 'git diff --name-status' lines between two references.

    When ref2 is empty or None only ref1 is passed to 'git diff', which
    compares ref1 to the working directory.

    Args:
        working_directory (str): Path to the repository.
        ref1 (str): First reference.
        ref2 (Optional[str]): Second reference, or None.

    Returns:
        List[str]: The stripped, non-empty output lines.
    """
    func_name = "get_diff_files"
    log_handler.log_debug(
        f"Getting diff files between '{ref1}' and '{ref2 or 'Working Dir'}'",
        func_name=func_name,
    )
    compared_refs = [ref1, ref2] if ref2 else [ref1]
    diff_cmd = ["git", "diff", "--name-status"] + compared_refs

    diff_result = self.log_and_execute(
        diff_cmd, working_directory, check=True, log_output_level=logging.DEBUG
    )
    changed_entries = []
    for raw_line in diff_result.stdout.strip().splitlines():
        entry = raw_line.strip()
        if entry:
            changed_entries.append(entry)
    return changed_entries
|
|
|
|
def fetch_in_directory(self, working_directory: str) -> None:
    """
    Run 'git fetch' in a specific directory. Used for submodules.

    Args:
        working_directory (str): Directory in which the fetch is executed.
    """
    func_name = "fetch_in_directory"
    info_text = f"Fetching remote updates in '{working_directory}'"
    log_handler.log_info(info_text, func_name=func_name)
    self.log_and_execute(["git", "fetch"], working_directory, check=True)
|
|
|
|
def get_commit_count_between(self, working_directory: str, ref1: str, ref2: str) -> int:
    """
    Count the commits in the range 'ref1..ref2' using 'git rev-list --count'.

    Typically used to find how many commits ahead/behind a branch is.
    Example: get_commit_count_between(path, "HEAD", "origin/master") -> commits behind.

    Args:
        working_directory (str): Path to the repository.
        ref1 (str): Start of the range.
        ref2 (str): End of the range.

    Returns:
        int: The commit count, or -1 if the command fails or its output is
            not an integer.
    """
    func_name = "get_commit_count_between"
    commit_range = f"{ref1}..{ref2}"
    count_cmd = ["git", "rev-list", "--count", commit_range]
    try:
        count_result = self.log_and_execute(count_cmd, working_directory, check=True)
        reported_count = count_result.stdout.strip()
        return int(reported_count)
    except (GitCommandError, ValueError) as e:
        log_handler.log_error(f"Could not count commits between '{ref1}' and '{ref2}': {e}", func_name=func_name)
        # -1 signals an error to the caller.
        return -1