1180 lines
56 KiB
Python
1180 lines
56 KiB
Python
# git_commands.py
|
|
import os
|
|
import subprocess
|
|
import logging
|
|
import re # Modulo per espressioni regolari (usato per validazione nomi)
|
|
|
|
|
|
# --- Custom Exception Definition ---
|
|
class GitCommandError(Exception):
    """
    Custom exception for handling Git command errors.

    Carries the command that was attempted and the stderr text it produced
    (when available) and folds both into the string representation.
    """

    def __init__(self, message, command=None, stderr=None):
        """
        Initialize the GitCommandError.

        Args:
            message (str): The main error message.
            command (list | str, optional): The command that caused the error,
                as an argument list or as a single pre-joined string.
            stderr (str | bytes, optional): The standard error output from
                the command.
        """
        super().__init__(message)
        self.command = command
        self.stderr = stderr

    def _command_text(self):
        """Return the command as one display string, or None when unset."""
        if not self.command:
            return None
        if isinstance(self.command, str):
            # A plain string must not be iterated: joining its characters
            # would render 'git status' as 'g i t   s t a t u s'.
            return self.command
        return " ".join(str(part) for part in self.command)

    def _stderr_text(self):
        """Return stderr as stripped text ('' when there is nothing to show)."""
        if not self.stderr:
            return ""
        if isinstance(self.stderr, bytes):
            # Callers that captured raw bytes still get readable output.
            return self.stderr.decode("utf-8", errors="replace").strip()
        return str(self.stderr).strip()

    def __str__(self):
        """Return the message followed by command/stderr details, if any."""
        base_message = super().__str__()
        details = []

        command_text = self._command_text()
        if command_text is not None:
            details.append(f"Command: '{command_text}'")

        stderr_text = self._stderr_text()
        if stderr_text:  # Whitespace-only stderr adds no information
            details.append(f"Stderr: {stderr_text}")

        if not details:
            return base_message
        return f"{base_message} ({'; '.join(details)})"
|
|
|
|
|
|
# --- Main Class for Git Commands ---
|
|
class GitCommands:
|
|
"""
|
|
Manages Git command execution, logging, and error handling.
|
|
Decoupled from the GUI, operates on provided directory paths.
|
|
Includes functionalities for core operations, tags, and branches.
|
|
"""
|
|
|
|
def __init__(self, logger):
|
|
"""
|
|
Initializes the GitCommands class with a logger instance.
|
|
|
|
Args:
|
|
logger (logging.Logger): Instance for logging messages.
|
|
|
|
Raises:
|
|
ValueError: If the provided logger is not a valid logging.Logger.
|
|
"""
|
|
if not isinstance(logger, logging.Logger):
|
|
raise ValueError("A valid logging.Logger instance is required.")
|
|
self.logger = logger
|
|
|
|
def log_and_execute(self, command, working_directory, check=True, log_output_level=logging.INFO):
|
|
"""
|
|
Executes a shell command, logs details, handles errors, and controls output logging level.
|
|
|
|
Args:
|
|
command (list): Command and arguments as a list of strings.
|
|
working_directory (str): The directory to execute the command in.
|
|
check (bool, optional): If True, raises GitCommandError on non-zero exit. Defaults to True.
|
|
log_output_level (int, optional): Logging level for stdout/stderr on success.
|
|
Defaults to logging.INFO. Use logging.DEBUG
|
|
to hide noisy command output from INFO logs.
|
|
|
|
Returns:
|
|
subprocess.CompletedProcess: Result object.
|
|
|
|
Raises:
|
|
GitCommandError, ValueError, FileNotFoundError, PermissionError.
|
|
"""
|
|
# --- FINE MODIFICA ---
|
|
safe_command = [str(part) for part in command]
|
|
command_str = " ".join(safe_command)
|
|
# Log command execution always at DEBUG level
|
|
self.logger.debug(f"Executing in '{working_directory}': {command_str}")
|
|
|
|
# --- Validate Working Directory ---
|
|
# (Logica validazione working_directory invariata)
|
|
if not working_directory:
|
|
msg = "Working directory cannot be None or empty."
|
|
self.logger.error(msg)
|
|
raise ValueError(msg)
|
|
# Use '.' as a special case for current working directory if needed by caller
|
|
if working_directory == ".":
|
|
cwd = os.getcwd()
|
|
else:
|
|
abs_path = os.path.abspath(working_directory)
|
|
if not os.path.isdir(abs_path):
|
|
msg = f"Working directory does not exist or is not a directory: {abs_path}"
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, command=safe_command)
|
|
cwd = abs_path
|
|
# self.logger.debug(f"Effective Working Directory: {cwd}") # Already logged above
|
|
|
|
# --- Execute Command ---
|
|
try:
|
|
# (Logica startupinfo/creationflags invariata)
|
|
startupinfo = None
|
|
creationflags = 0
|
|
if os.name == "nt":
|
|
startupinfo = subprocess.STARTUPINFO()
|
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
|
startupinfo.wShowWindow = subprocess.SW_HIDE
|
|
|
|
result = subprocess.run(
|
|
safe_command,
|
|
cwd=cwd,
|
|
capture_output=True,
|
|
text=True,
|
|
check=check,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
startupinfo=startupinfo,
|
|
creationflags=creationflags,
|
|
)
|
|
|
|
# Log command output based on specified level for success
|
|
# Error output is always logged at ERROR level below
|
|
if check or result.returncode == 0:
|
|
# --- MODIFICA: Usa log_output_level per l'output di successo ---
|
|
# Only log stdout/stderr if the requested level is met by the logger config
|
|
if self.logger.isEnabledFor(log_output_level):
|
|
stdout_log = result.stdout.strip() if result.stdout else "<no stdout>"
|
|
stderr_log = result.stderr.strip() if result.stderr else "<no stderr>"
|
|
# Use the passed level for logging the output
|
|
self.logger.log(
|
|
log_output_level,
|
|
f"Command successful. Output:\n"
|
|
f"--- stdout ---\n{stdout_log}\n"
|
|
f"--- stderr ---\n{stderr_log}\n---"
|
|
)
|
|
else:
|
|
# Log minimal success message if output level is suppressed
|
|
self.logger.debug(f"Command successful (output logging suppressed by level).")
|
|
# --- FINE MODIFICA ---
|
|
|
|
return result
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
# (Gestione CalledProcessError invariata - logga sempre a ERROR)
|
|
stderr_err = e.stderr.strip() if e.stderr else "<no stderr>"
|
|
stdout_err = e.stdout.strip() if e.stdout else "<no stdout>"
|
|
error_log_msg = (
|
|
f"Command failed with return code {e.returncode} in '{cwd}'.\n"
|
|
f"--- command ---\n{command_str}\n"
|
|
f"--- stderr ---\n{stderr_err}\n"
|
|
f"--- stdout ---\n{stdout_err}\n---"
|
|
)
|
|
self.logger.error(error_log_msg)
|
|
raise GitCommandError(
|
|
f"Git command failed in '{cwd}'.", command=safe_command, stderr=e.stderr
|
|
) from e
|
|
|
|
except FileNotFoundError as e:
|
|
# (Gestione FileNotFoundError invariata)
|
|
error_msg = f"Command not found: '{safe_command[0]}'. Is Git installed and in system PATH? (Working directory: '{cwd}')"
|
|
self.logger.error(error_msg)
|
|
self.logger.debug(f"FileNotFoundError details: {e}")
|
|
raise GitCommandError(error_msg, command=safe_command) from e
|
|
|
|
except PermissionError as e:
|
|
# (Gestione PermissionError invariata)
|
|
error_msg = f"Permission denied executing command in '{cwd}'."
|
|
self.logger.error(error_msg)
|
|
self.logger.debug(f"PermissionError details: {e}")
|
|
raise GitCommandError(error_msg, command=safe_command, stderr=str(e)) from e
|
|
|
|
except Exception as e:
|
|
# (Gestione Exception generica invariata)
|
|
self.logger.exception(f"Unexpected error executing command in '{cwd}': {e}")
|
|
raise GitCommandError(
|
|
f"Unexpected command execution error: {e}", command=safe_command
|
|
) from e
|
|
|
|
# --- Core Repo Operations ---
|
|
def prepare_svn_for_git(self, working_directory):
|
|
"""
|
|
Prepares a directory for Git use: initializes if needed, ensures
|
|
.gitignore ignores '.svn'.
|
|
|
|
Args:
|
|
working_directory (str): Path to the target directory.
|
|
|
|
Raises:
|
|
GitCommandError, ValueError: If operations fail.
|
|
"""
|
|
self.logger.info(f"Preparing directory for Git: '{working_directory}'")
|
|
# Basic validation
|
|
if not working_directory:
|
|
raise ValueError("Working directory cannot be None or empty.")
|
|
if not os.path.isdir(working_directory):
|
|
raise GitCommandError(f"Directory does not exist: {working_directory}")
|
|
|
|
# Define relevant paths
|
|
gitignore_path = os.path.join(working_directory, ".gitignore")
|
|
git_dir_path = os.path.join(working_directory, ".git")
|
|
|
|
# 1. Initialize Git repository if '.git' directory doesn't exist
|
|
if not os.path.exists(git_dir_path):
|
|
self.logger.info("No existing Git repository found. Initializing...")
|
|
try:
|
|
init_command = ["git", "init"]
|
|
# Use check=True to ensure init succeeds
|
|
self.log_and_execute(init_command, working_directory, check=True)
|
|
self.logger.info("Git repository initialized successfully.")
|
|
except (GitCommandError, ValueError) as e:
|
|
# Handle specific errors from git init
|
|
self.logger.error(f"Failed to initialize Git repository: {e}")
|
|
raise # Re-raise to signal preparation failure
|
|
except Exception as e:
|
|
# Handle unexpected errors during init
|
|
self.logger.exception(f"Unexpected error initializing repository: {e}")
|
|
raise GitCommandError(f"Unexpected init error: {e}") from e
|
|
else:
|
|
# Repository already exists
|
|
self.logger.info("Git repository already exists. Skipping initialization.")
|
|
|
|
# 2. Ensure .gitignore handles '.svn'
|
|
self.logger.debug(f"Checking/updating .gitignore file: {gitignore_path}")
|
|
try:
|
|
svn_ignore_entry = ".svn" # Entry to ensure is ignored
|
|
needs_write = False # Flag to track if file needs modification
|
|
content_to_write = "" # Content to add if needed
|
|
|
|
if not os.path.exists(gitignore_path):
|
|
# .gitignore doesn't exist, create it with the entry
|
|
self.logger.info(
|
|
"'.gitignore' file not found. Creating with .svn entry."
|
|
)
|
|
content_to_write = f"{svn_ignore_entry}\n"
|
|
needs_write = True
|
|
else:
|
|
# .gitignore exists, check if '.svn' is already ignored
|
|
try:
|
|
with open(gitignore_path, "r", encoding="utf-8") as f:
|
|
lines = f.readlines()
|
|
# Check ignores .svn directory or files within it
|
|
is_ignored = any(
|
|
line.strip() == svn_ignore_entry
|
|
or line.strip().startswith(svn_ignore_entry + "/")
|
|
for line in lines
|
|
)
|
|
if not is_ignored:
|
|
# Not ignored, prepare to append the entry
|
|
self.logger.info(
|
|
f"'{svn_ignore_entry}' not found. Appending..."
|
|
)
|
|
current_content = "".join(lines)
|
|
# Ensure it's added on a new line
|
|
if not current_content.endswith("\n"):
|
|
content_to_write = f"\n{svn_ignore_entry}\n"
|
|
else:
|
|
content_to_write = f"{svn_ignore_entry}\n"
|
|
needs_write = True # Mark file for update
|
|
else:
|
|
# Already ignored, no action needed
|
|
self.logger.info(f"'{svn_ignore_entry}' entry already present.")
|
|
except IOError as e:
|
|
# Handle error reading the existing file
|
|
self.logger.warning(
|
|
f"Could not read existing '.gitignore': {e}. "
|
|
f"Cannot verify {svn_ignore_entry} entry."
|
|
)
|
|
# Decide on recovery: maybe try appending anyway? Risky.
|
|
# For safety, let's not modify if we can't read it.
|
|
needs_write = False
|
|
|
|
# Write to file only if modification is necessary
|
|
if needs_write:
|
|
# Use 'a'ppend mode if file exists, 'w'rite mode if new
|
|
mode = "a" if os.path.exists(gitignore_path) else "w"
|
|
try:
|
|
# Write with consistent newline handling
|
|
with open(
|
|
gitignore_path, mode, encoding="utf-8", newline="\n"
|
|
) as f:
|
|
f.write(content_to_write)
|
|
self.logger.info("Updated '.gitignore' file successfully.")
|
|
except IOError as e:
|
|
# Handle error during write/append
|
|
self.logger.error(f"Error writing to '.gitignore': {e}")
|
|
raise GitCommandError(f"Failed to update .gitignore: {e}") from e
|
|
|
|
except IOError as e: # Catch errors from os.path.exists or final write attempt
|
|
self.logger.error(f"Error accessing or writing '.gitignore': {e}")
|
|
raise GitCommandError(f"File I/O error for .gitignore: {e}") from e
|
|
except Exception as e: # Catch other unexpected errors
|
|
self.logger.exception(f"Unexpected error managing '.gitignore': {e}")
|
|
raise GitCommandError(f"Unexpected error with .gitignore: {e}") from e
|
|
|
|
self.logger.info(f"Directory preparation complete for '{working_directory}'.")
|
|
|
|
def create_git_bundle(self, working_directory, bundle_path):
|
|
"""Creates a Git bundle file containing all refs."""
|
|
# Normalize path for cross-OS compatibility and command usage
|
|
normalized_bundle_path = os.path.normpath(bundle_path)
|
|
normalized_bundle_path = normalized_bundle_path.replace("\\", "/")
|
|
command = ["git", "bundle", "create", normalized_bundle_path, "--all"]
|
|
self.logger.info(f"Attempting to create bundle: {normalized_bundle_path}")
|
|
|
|
try:
|
|
# Use check=False to manually handle 'empty bundle' warning
|
|
result = self.log_and_execute(command, working_directory, check=False)
|
|
|
|
if result.returncode != 0:
|
|
# Command failed, check stderr for specific non-fatal cases
|
|
stderr_lower = result.stderr.lower() if result.stderr else ""
|
|
if "refusing to create empty bundle" in stderr_lower:
|
|
# This is a warning, not an error for our purposes
|
|
self.logger.warning(
|
|
f"Bundle creation skipped: No commits in '{working_directory}'."
|
|
)
|
|
# Do not raise error, let caller handle info
|
|
else:
|
|
# An actual error occurred
|
|
error_msg = f"Bundle command failed (code {result.returncode})."
|
|
raise GitCommandError(
|
|
error_msg, command=command, stderr=result.stderr
|
|
)
|
|
# Check file existence and size even on success (paranoid check)
|
|
elif (
|
|
not os.path.exists(normalized_bundle_path)
|
|
or os.path.getsize(normalized_bundle_path) == 0
|
|
):
|
|
self.logger.warning(f"Bundle cmd success, but file missing/empty.")
|
|
else:
|
|
self.logger.info(
|
|
f"Bundle created successfully: '{normalized_bundle_path}'."
|
|
)
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Failed create bundle for '{working_directory}': {e}")
|
|
raise
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected bundle creation error: {e}")
|
|
raise GitCommandError(
|
|
f"Unexpected bundle error: {e}", command=command
|
|
) from e
|
|
|
|
def fetch_from_git_bundle(self, working_directory, bundle_path):
|
|
"""Fetches from a bundle file and merges fetched refs."""
|
|
normalized_bundle_path = os.path.normpath(bundle_path).replace("\\", "/")
|
|
self.logger.info(
|
|
f"Fetching from '{normalized_bundle_path}' into '{working_directory}'"
|
|
)
|
|
|
|
fetch_command = ["git", "fetch", normalized_bundle_path]
|
|
# Merge FETCH_HEAD, creating merge commit if necessary (--no-ff)
|
|
merge_command = ["git", "merge", "FETCH_HEAD", "--no-ff"]
|
|
|
|
try:
|
|
# 1. Fetch
|
|
self.logger.debug("Executing fetch...")
|
|
self.log_and_execute(fetch_command, working_directory, check=True)
|
|
self.logger.info("Fetch successful.")
|
|
|
|
# 2. Merge
|
|
self.logger.debug("Executing merge...")
|
|
# Use check=False as conflicts are expected failures
|
|
merge_result = self.log_and_execute(
|
|
merge_command, working_directory, check=False
|
|
)
|
|
|
|
# Analyze merge result
|
|
stdout_log = merge_result.stdout.strip() if merge_result.stdout else ""
|
|
stderr_log = merge_result.stderr.strip() if merge_result.stderr else ""
|
|
|
|
if merge_result.returncode == 0:
|
|
# Success or already up-to-date
|
|
if "already up to date" in stdout_log.lower():
|
|
self.logger.info("Repository already up-to-date.")
|
|
else:
|
|
self.logger.info("Merge successful.")
|
|
else:
|
|
# Merge failed, likely conflicts
|
|
output_lower = (stderr_log + stdout_log).lower()
|
|
if "conflict" in output_lower:
|
|
conflict_msg = (
|
|
f"Merge conflict occurred. Resolve manually in "
|
|
f"'{working_directory}' and commit."
|
|
)
|
|
self.logger.error(conflict_msg)
|
|
# Raise specific error for caller to handle
|
|
raise GitCommandError(
|
|
conflict_msg, command=merge_command, stderr=merge_result.stderr
|
|
)
|
|
else:
|
|
# Other merge error
|
|
error_msg = f"Merge failed (code {merge_result.returncode})."
|
|
self.logger.error(error_msg)
|
|
raise GitCommandError(
|
|
error_msg, command=merge_command, stderr=merge_result.stderr
|
|
)
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Fetch/merge error for '{working_directory}': {e}")
|
|
raise
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected fetch/merge error: {e}")
|
|
raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e
|
|
|
|
# --- Commit and Status ---
|
|
def git_commit(self, working_directory, message):
|
|
"""Stages all changes and commits them with the given message."""
|
|
self.logger.info(
|
|
f"Attempting commit in '{working_directory}' with msg: '{message}'"
|
|
)
|
|
if not message:
|
|
# Disallow empty commit messages programmatically
|
|
raise ValueError("Commit message cannot be empty.")
|
|
|
|
try:
|
|
# 1. Stage all changes using 'git add .'
|
|
add_command = ["git", "add", "."]
|
|
self.logger.debug("Staging all changes...")
|
|
# Use check=True to ensure staging succeeds
|
|
self.log_and_execute(add_command, working_directory, check=True)
|
|
self.logger.debug("Staging successful.")
|
|
|
|
# 2. Commit staged changes
|
|
commit_command = ["git", "commit", "-m", message]
|
|
self.logger.debug("Attempting commit...")
|
|
# Use check=False to handle 'nothing to commit' case gracefully
|
|
result = self.log_and_execute(
|
|
commit_command, working_directory, check=False
|
|
)
|
|
|
|
# Analyze commit result
|
|
stdout_lower = result.stdout.lower() if result.stdout else ""
|
|
stderr_lower = result.stderr.lower() if result.stderr else ""
|
|
|
|
if result.returncode == 0:
|
|
# Standard successful commit
|
|
self.logger.info("Commit successful.")
|
|
return True
|
|
# Check for various 'nothing to commit' messages
|
|
elif (
|
|
"nothing to commit" in stdout_lower
|
|
or "no changes added to commit" in stdout_lower
|
|
or "nothing added to commit" in stdout_lower
|
|
or (result.returncode == 1 and not stderr_lower and not stdout_lower)
|
|
):
|
|
# Common non-error case: repo was clean or add didn't stage anything new
|
|
self.logger.info("No changes were available to commit.")
|
|
return False # Indicate no commit was made
|
|
else:
|
|
# An unexpected error occurred during the commit
|
|
error_msg = (
|
|
f"Commit cmd failed unexpectedly (code {result.returncode})."
|
|
)
|
|
# Details likely logged by log_and_execute already
|
|
raise GitCommandError(
|
|
error_msg, command=commit_command, stderr=result.stderr
|
|
)
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
# Catch errors from staging or commit steps, or validation
|
|
self.logger.error(f"Error during staging or commit: {e}")
|
|
raise # Re-raise the specific error
|
|
except Exception as e:
|
|
# Catch any other unexpected error
|
|
self.logger.exception(f"Unexpected error during commit process: {e}")
|
|
raise GitCommandError(f"Unexpected commit error: {e}") from e
|
|
|
|
def git_status_has_changes(self, working_directory):
|
|
"""Checks if the Git repository has uncommitted changes."""
|
|
self.logger.debug(f"Checking Git status in '{working_directory}'...")
|
|
try:
|
|
# Use '--porcelain' for reliable script parsing
|
|
status_command = ["git", "status", "--porcelain"]
|
|
# Use check=True to ensure the status command itself runs correctly
|
|
result = self.log_and_execute(status_command, working_directory, check=True)
|
|
# If porcelain output is non-empty, there are changes
|
|
has_changes = bool(result.stdout.strip())
|
|
self.logger.debug(f"Status check complete. Has changes: {has_changes}")
|
|
return has_changes
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Status check error: {e}")
|
|
raise # Re-raise specific error
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected status check error: {e}")
|
|
raise GitCommandError(f"Unexpected status error: {e}") from e
|
|
|
|
# --- Tag Management ---
|
|
def list_tags(self, working_directory):
|
|
"""Lists tags with subjects, sorted by creation date (desc)."""
|
|
self.logger.info(f"Listing tags with subjects in '{working_directory}'...")
|
|
# Format: Tagname<TAB>SubjectLine
|
|
fmt = "%(refname:short)%09%(contents:subject)"
|
|
cmd = ["git", "tag", "--list", f"--format={fmt}", "--sort=-creatordate"]
|
|
tags_data = [] # List to store (name, subject) tuples
|
|
try:
|
|
result = self.log_and_execute(cmd, working_directory, check=True)
|
|
for line in result.stdout.splitlines():
|
|
line_strip = line.strip()
|
|
if line_strip:
|
|
# Split only on the first tab to handle subjects with tabs
|
|
parts = line_strip.split("\t", 1)
|
|
name = parts[0].strip()
|
|
# Provide default if subject is missing
|
|
subject = parts[1].strip() if len(parts) > 1 else "(No subject)"
|
|
tags_data.append((name, subject))
|
|
self.logger.info(f"Found {len(tags_data)} tags.")
|
|
self.logger.debug(f"Tags found: {tags_data}")
|
|
return tags_data
|
|
except (GitCommandError, ValueError) as e:
|
|
# Log error but return empty list for graceful GUI handling
|
|
self.logger.error(f"Error listing tags: {e}")
|
|
return []
|
|
except Exception as e:
|
|
# Log unexpected errors
|
|
self.logger.exception(f"Unexpected error listing tags: {e}")
|
|
return []
|
|
|
|
def create_tag(self, working_directory, tag_name, message):
|
|
"""Creates a new annotated Git tag."""
|
|
self.logger.info(f"Creating tag '{tag_name}' in '{working_directory}'")
|
|
# Validate inputs
|
|
if not tag_name:
|
|
raise ValueError("Tag name cannot be empty.")
|
|
if not message:
|
|
raise ValueError("Tag message cannot be empty.")
|
|
# Validate tag name format using regex
|
|
pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.)$"
|
|
if not re.match(pattern, tag_name):
|
|
raise ValueError(f"Invalid tag name format: '{tag_name}'.")
|
|
|
|
# Build and execute command
|
|
cmd = ["git", "tag", "-a", tag_name, "-m", message]
|
|
try:
|
|
# Use check=True to catch errors like 'tag already exists'
|
|
self.log_and_execute(cmd, working_directory, check=True)
|
|
self.logger.info(f"Tag '{tag_name}' created successfully.")
|
|
except GitCommandError as e:
|
|
# Check stderr for specific common errors
|
|
stderr = e.stderr.lower() if e.stderr else ""
|
|
if "already exists" in stderr:
|
|
msg = f"Tag '{tag_name}' already exists."
|
|
self.logger.error(msg)
|
|
# Re-raise with specific message
|
|
raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
|
|
else:
|
|
# Re-raise other Git command errors
|
|
self.logger.error(f"Failed create tag '{tag_name}': {e}")
|
|
raise
|
|
except ValueError as ve:
|
|
# Re-raise validation errors
|
|
self.logger.error(f"Validation error creating tag: {ve}")
|
|
raise
|
|
except Exception as e:
|
|
# Handle unexpected errors
|
|
self.logger.exception(f"Unexpected error creating tag: {e}")
|
|
raise GitCommandError(f"Unexpected tag error: {e}", command=cmd) from e
|
|
|
|
def checkout_tag(self, working_directory, tag_name):
|
|
"""Checks out a specific Git tag (enters detached HEAD state)."""
|
|
self.logger.info(f"Checking out tag '{tag_name}' in '{working_directory}'...")
|
|
if not tag_name:
|
|
raise ValueError("Tag name cannot be empty for checkout.")
|
|
|
|
cmd = ["git", "checkout", tag_name]
|
|
try:
|
|
# Use check=True; will raise error if tag not found
|
|
result = self.log_and_execute(cmd, working_directory, check=True)
|
|
self.logger.info(f"Successfully checked out tag '{tag_name}'.")
|
|
# Check output for detached HEAD warning
|
|
output = (result.stderr + result.stdout).lower()
|
|
if "detached head" in output:
|
|
self.logger.warning("Repository is now in a 'detached HEAD' state.")
|
|
return True # Indicate success
|
|
except GitCommandError as e:
|
|
# Check stderr for specific 'tag not found' errors
|
|
stderr = e.stderr.lower() if e.stderr else ""
|
|
not_found_patterns = [
|
|
"did not match any file(s)",
|
|
f"pathspec '{tag_name.lower()}' did not match", # Check lowercase
|
|
]
|
|
if any(p in stderr for p in not_found_patterns):
|
|
msg = f"Tag '{tag_name}' not found or invalid."
|
|
self.logger.error(msg)
|
|
# Re-raise with specific message
|
|
raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
|
|
else:
|
|
# Re-raise other Git errors
|
|
self.logger.error(f"Failed checkout tag '{tag_name}': {e}")
|
|
raise
|
|
except ValueError as ve:
|
|
# Re-raise validation errors
|
|
self.logger.error(f"Validation error checking out tag: {ve}")
|
|
raise
|
|
except Exception as e:
|
|
# Handle unexpected errors
|
|
self.logger.exception(f"Unexpected error checking out tag: {e}")
|
|
raise GitCommandError(f"Unexpected checkout error: {e}", command=cmd) from e
|
|
|
|
# --- Branch Management ---
|
|
def list_branches(self, working_directory):
|
|
"""
|
|
Lists local branches, identifying the current one.
|
|
|
|
Returns:
|
|
tuple: (list of all branch names, current branch name or None).
|
|
Returns ([], None) on error.
|
|
"""
|
|
self.logger.info(f"Listing local branches in '{working_directory}'...")
|
|
# Format: '*' if current, space otherwise, then branch name
|
|
fmt = "%(HEAD) %(refname:short)"
|
|
command = ["git", "branch", "--list", f"--format={fmt}"]
|
|
branches = []
|
|
current_branch = None
|
|
try:
|
|
result = self.log_and_execute(command, working_directory, check=True)
|
|
for line in result.stdout.splitlines():
|
|
line_strip = line.strip()
|
|
if line_strip:
|
|
is_current = line_strip.startswith("*")
|
|
# Remove indicator and leading/trailing spaces
|
|
branch_name = line_strip.lstrip("* ").strip()
|
|
branches.append(branch_name)
|
|
if is_current:
|
|
current_branch = branch_name
|
|
# Sort branches alphabetically for consistent display order
|
|
branches.sort()
|
|
self.logger.info(
|
|
f"Found {len(branches)} branches. Current: {current_branch}"
|
|
)
|
|
self.logger.debug(f"Branches: {branches}")
|
|
return branches, current_branch
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Error listing branches: {e}")
|
|
return [], None # Return empty data on error
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error listing branches: {e}")
|
|
return [], None
|
|
|
|
def checkout_branch(self, working_directory, branch_name):
|
|
"""Checks out an existing local branch."""
|
|
self.logger.info(
|
|
f"Checking out branch '{branch_name}' in '{working_directory}'..."
|
|
)
|
|
if not branch_name:
|
|
raise ValueError("Branch name cannot be empty for checkout.")
|
|
|
|
# Use 'git switch' if available? Might provide cleaner output/errors.
|
|
# Sticking with 'checkout' for broader compatibility for now.
|
|
command = ["git", "checkout", branch_name]
|
|
try:
|
|
# Use check=True to catch errors like branch not found
|
|
self.log_and_execute(command, working_directory, check=True)
|
|
self.logger.info(f"Successfully checked out branch '{branch_name}'.")
|
|
return True # Indicate success
|
|
except GitCommandError as e:
|
|
# Check for specific 'branch not found' errors
|
|
stderr = e.stderr.lower() if e.stderr else ""
|
|
if (
|
|
"did not match any file(s)" in stderr
|
|
or f"pathspec '{branch_name.lower()}' did not match" in stderr
|
|
):
|
|
msg = f"Branch '{branch_name}' not found or invalid."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, command=command, stderr=e.stderr) from e
|
|
else:
|
|
# Re-raise other Git errors
|
|
self.logger.error(f"Failed checkout branch '{branch_name}': {e}")
|
|
raise
|
|
except ValueError as ve:
|
|
self.logger.error(f"Validation error checking out branch: {ve}")
|
|
raise
|
|
except Exception as e:
|
|
# Handle unexpected errors
|
|
self.logger.exception(f"Unexpected error checking out branch: {e}")
|
|
raise GitCommandError(
|
|
f"Unexpected checkout error: {e}", command=command
|
|
) from e
|
|
|
|
def create_branch(self, working_directory, new_branch_name):
|
|
"""Creates a new local branch from the current HEAD position."""
|
|
self.logger.info(
|
|
f"Creating new branch '{new_branch_name}' in '{working_directory}'..."
|
|
)
|
|
if not new_branch_name:
|
|
raise ValueError("New branch name cannot be empty.")
|
|
|
|
# Validate branch name format (simplified check for common issues)
|
|
# Disallows spaces, consecutive/trailing dots/slashes, control chars, etc.
|
|
# Also prevents creating a branch named 'HEAD'.
|
|
pattern = (
|
|
r"^(?![./]|.*([./]{2,}|[.]$|@\{|\\\\))[^ \t\n\r\f\v~^:?*[\\]+(?<![/.])$"
|
|
)
|
|
if not re.match(pattern, new_branch_name) or new_branch_name.lower() == "head":
|
|
raise ValueError(f"Invalid branch name format: '{new_branch_name}'.")
|
|
|
|
command = ["git", "branch", new_branch_name]
|
|
try:
|
|
# Use check=True to catch errors like 'branch already exists'
|
|
self.log_and_execute(command, working_directory, check=True)
|
|
self.logger.info(f"Branch '{new_branch_name}' created successfully.")
|
|
return True # Indicate success
|
|
except GitCommandError as e:
|
|
# Check for specific 'already exists' error
|
|
stderr = e.stderr.lower() if e.stderr else ""
|
|
if "already exists" in stderr:
|
|
msg = f"Branch '{new_branch_name}' already exists."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, command=command, stderr=e.stderr) from e
|
|
else:
|
|
# Re-raise other Git errors
|
|
self.logger.error(f"Failed create branch '{new_branch_name}': {e}")
|
|
raise
|
|
except ValueError as ve:
|
|
# Re-raise validation errors
|
|
self.logger.error(f"Validation error creating branch: {ve}")
|
|
raise
|
|
except Exception as e:
|
|
# Handle unexpected errors
|
|
self.logger.exception(f"Unexpected error creating branch: {e}")
|
|
raise GitCommandError(
|
|
f"Unexpected create branch error: {e}", command=command
|
|
) from e
|
|
|
|
# --- History / Log ---
|
|
def get_commit_log(self, working_directory, max_count=100, branch=None):
|
|
"""
|
|
Retrieves formatted commit log, optionally filtered by branch.
|
|
|
|
Args:
|
|
working_directory (str): Path to the Git repository.
|
|
max_count (int): Maximum number of log entries to return.
|
|
branch (str, optional): Filter history to only this branch/ref.
|
|
If None, shows history from all refs.
|
|
|
|
Returns:
|
|
list: List of formatted log strings, newest first. Empty on error.
|
|
"""
|
|
log_scope = f"branch '{branch}'" if branch else "all branches/tags"
|
|
self.logger.info(
|
|
f"Retrieving commit log for {log_scope} in "
|
|
f"'{working_directory}' (max {max_count})..."
|
|
)
|
|
|
|
# Define log format: AbbrevHash Date | Author | Refs Subject
|
|
log_format = "%h %ad | %an |%d %s"
|
|
# Define date format (e.g., 'iso-strict', 'relative', 'short')
|
|
date_format = "iso-strict"
|
|
|
|
# Build the base git log command
|
|
command = [
|
|
"git",
|
|
"log",
|
|
f"--max-count={max_count}", # Limit number of entries
|
|
f"--pretty=format:{log_format}", # Apply custom format
|
|
f"--date={date_format}", # Format commit date
|
|
"--decorate", # Show ref names (branches, tags)
|
|
]
|
|
|
|
# Add branch filter or --all
|
|
if branch:
|
|
# Add the specific branch name to filter the log
|
|
command.append(branch)
|
|
else:
|
|
# Show history from all reachable commits if no branch specified
|
|
command.append("--all")
|
|
|
|
try:
|
|
# Execute the log command
|
|
result = self.log_and_execute(
|
|
command,
|
|
working_directory,
|
|
check=True, # Ensure log command itself succeeds
|
|
)
|
|
# Process the output lines
|
|
log_lines = [
|
|
line.strip() for line in result.stdout.splitlines() if line.strip()
|
|
] # Remove empty lines
|
|
self.logger.info(f"Retrieved {len(log_lines)} log entries for {log_scope}.")
|
|
return log_lines
|
|
except (GitCommandError, ValueError) as e:
|
|
# Log known errors and return empty list
|
|
self.logger.error(f"Error retrieving commit log: {e}")
|
|
return []
|
|
except Exception as e:
|
|
# Log unexpected errors and return empty list
|
|
self.logger.exception(f"Unexpected error retrieving commit log: {e}")
|
|
return []
|
|
|
|
def clone_from_bundle(self, bundle_path, destination_directory):
    """
    Clone a repository from a Git bundle file into the given directory.

    The destination must either not exist yet (it is created here) or be
    an existing, empty directory.

    Args:
        bundle_path (str): Path to the .bundle file. Relative paths are
                           resolved with os.path.abspath.
        destination_directory (str): Path to the directory where the repo
                                     will be cloned.

    Returns:
        bool: True on successful clone.

    Raises:
        GitCommandError: If git clone fails, or if any unexpected error
                         occurs while running it.
        ValueError: If bundle_path or destination_directory is empty.
        FileNotFoundError: If the bundle file does not exist.
        IOError: If the destination exists and is not a directory, is not
                 empty, or cannot be created.
    """
    self.logger.info(
        f"Attempting to clone from bundle '{bundle_path}' into '{destination_directory}'"
    )

    # --- Validate inputs ---
    if not bundle_path:
        raise ValueError("Bundle path cannot be empty.")
    if not destination_directory:
        raise ValueError("Destination directory cannot be empty.")

    abs_bundle_path = os.path.abspath(bundle_path)
    if not os.path.isfile(abs_bundle_path):
        msg = f"Bundle file not found: {abs_bundle_path}"
        self.logger.error(msg)
        raise FileNotFoundError(msg)

    abs_dest_path = os.path.abspath(destination_directory)

    # Remember whether this call created the destination, so that cleanup
    # after a failed clone never deletes a directory the caller supplied.
    created_destination = False

    if os.path.exists(abs_dest_path):
        if not os.path.isdir(abs_dest_path):
            msg = f"Destination path exists but is not a directory: {abs_dest_path}"
            self.logger.error(msg)
            raise IOError(msg)
        if os.listdir(abs_dest_path):
            msg = f"Destination directory is not empty: {abs_dest_path}. Cannot clone here."
            self.logger.error(msg)
            raise IOError(msg)
        self.logger.debug("Destination directory exists and is empty. Proceeding.")
    else:
        try:
            os.makedirs(abs_dest_path, exist_ok=True)
        except OSError as e:
            msg = f"Failed to create destination directory '{abs_dest_path}': {e}"
            self.logger.error(msg, exc_info=True)
            raise IOError(msg) from e
        created_destination = True
        self.logger.info(f"Created destination directory: {abs_dest_path}")

    # --- Prepare and execute the clone command ---
    # A bundle file is passed to `git clone` like any other repository
    # source: `git clone <bundle> <destination>`.
    command = ["git", "clone", abs_bundle_path, abs_dest_path]

    try:
        # Both paths are absolute, so the directory the command runs from
        # does not matter; "." is passed as a neutral working directory.
        self.log_and_execute(command, working_directory=".", check=True)
    except GitCommandError as clone_error:
        self.logger.error(
            f"Failed to clone from bundle '{abs_bundle_path}': {clone_error}",
            exc_info=True,
        )
        # Best-effort cleanup: only remove a directory created by this call,
        # and only if the failed clone left it empty.
        if (
            created_destination
            and os.path.isdir(abs_dest_path)
            and not os.listdir(abs_dest_path)
        ):
            try:
                os.rmdir(abs_dest_path)
                self.logger.info(f"Removed empty destination directory after failed clone: {abs_dest_path}")
            except OSError as rm_e:
                self.logger.warning(f"Could not remove destination directory '{abs_dest_path}' after failed clone: {rm_e}")
        raise  # Re-raise the original error with its traceback intact
    except Exception as e:
        # Anything else (including ValueError from the executor) is wrapped
        # so callers only need to handle GitCommandError for the clone step.
        self.logger.exception(f"Unexpected error cloning from bundle: {e}")
        raise GitCommandError(f"Unexpected clone error: {e}", command=command) from e

    self.logger.info(f"Successfully cloned repository into '{abs_dest_path}' from bundle.")
    return True
|
|
|
|
def get_tracked_files(self, working_directory):
    """
    Retrieve the list of all files currently tracked by Git.

    Runs ``git ls-files -z`` and splits its NUL-separated output, so paths
    containing spaces or special characters are returned unmodified.
    The command output is requested at DEBUG log level only.

    Args:
        working_directory (str): Path to the Git repository.

    Returns:
        list: Tracked file paths as printed by ``git ls-files``.

    Raises:
        GitCommandError: If the git command fails, or on any unexpected error.
        ValueError: Propagated unchanged if raised by the command executor.
    """
    self.logger.debug(f"Getting tracked files for '{working_directory}'")
    command = ["git", "ls-files", "-z"]
    try:
        result = self.log_and_execute(
            command,
            working_directory,
            check=True,
            log_output_level=logging.DEBUG,  # keep the (possibly long) output out of INFO
        )
        # The output ends with a NUL, so the final split element is empty.
        tracked_files = [entry for entry in result.stdout.split("\0") if entry]
        # Only the count goes to INFO; the full list is DEBUG-only.
        self.logger.info(f"Found {len(tracked_files)} tracked files.")
        # Lazy %-formatting: the list is only rendered if DEBUG is enabled.
        self.logger.debug("Tracked file list: %s", tracked_files)
        return tracked_files
    except (GitCommandError, ValueError) as e:
        self.logger.error(f"Failed to list tracked files: {e}")
        raise
    except Exception as e:
        self.logger.exception(f"Unexpected error listing tracked files: {e}")
        raise GitCommandError(f"Unexpected error listing files: {e}", command=command) from e
|
|
|
|
def check_if_would_be_ignored(self, working_directory, path_to_check):
    """
    Check whether a path would be ignored by the current .gitignore rules,
    regardless of whether it is currently tracked.

    Args:
        working_directory (str): Path to the Git repository.
        path_to_check (str): The relative path within the repo to check.

    Returns:
        bool: True if the path matches an ignore rule, False otherwise.

    Raises:
        GitCommandError: If git check-ignore exits with a code other than
                         0 or 1, or on any unexpected error.
        ValueError: Propagated unchanged if raised by the command executor.
    """
    # --no-index: evaluate the ignore rules themselves, not the index
    #             (so tracked files are checked too).
    # --quiet:    no output; the answer is carried by the exit code.
    command = ["git", "check-ignore", "--quiet", "--no-index", "--", path_to_check]
    self.logger.debug(f"Checking ignore status for path: '{path_to_check}'")

    try:
        # check=False because exit code 1 (not ignored) is not an error here.
        result = self.log_and_execute(command, working_directory, check=False)
    except (GitCommandError, ValueError) as e:
        self.logger.error(f"Failed check_if_would_be_ignored for '{path_to_check}': {e}")
        raise
    except Exception as e:
        self.logger.exception(f"Unexpected error in check_if_would_be_ignored: {e}")
        raise GitCommandError(f"Unexpected check-ignore error: {e}", command=command) from e

    # The exit code is interpreted outside the try block so that the error
    # raised below is logged exactly once.
    if result.returncode == 0:
        self.logger.debug(f"Path '{path_to_check}' WOULD be ignored.")
        return True
    if result.returncode == 1:
        self.logger.debug(f"Path '{path_to_check}' would NOT be ignored.")
        return False

    # Any other exit code (e.g. 128) indicates a real failure.
    error_msg = f"git check-ignore failed with exit code {result.returncode}"
    self.logger.error(f"{error_msg} for '{path_to_check}'. Stderr: {result.stderr}")
    raise GitCommandError(error_msg, command=command, stderr=result.stderr)
|
|
|
|
def remove_from_tracking(self, working_directory, files_to_untrack, batch_size=100):
    """
    Remove files/directories from Git tracking (the index) without
    deleting them from the working directory.

    Paths are processed in batches to stay below command-line length
    limits. Processing stops at the first failing batch; batches that
    already ran are not rolled back.

    Args:
        working_directory (str): Path to the Git repository.
        files_to_untrack (iterable): Relative paths to untrack. An empty
                                     or None value is a no-op.
        batch_size (int): Maximum number of paths per `git rm` invocation.
                          Defaults to 100.

    Returns:
        bool: True if there was nothing to do or all batches succeeded.

    Raises:
        GitCommandError: If any git rm batch command fails (errors from
                         the executor, including ValueError, are wrapped).
        ValueError: If batch_size is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1.")

    # Materialise once so any iterable (not just a list) can be sliced.
    paths = list(files_to_untrack) if files_to_untrack else []
    if not paths:
        # An empty list is not an error: there is simply nothing to do.
        self.logger.debug("No files provided to remove_from_tracking.")
        return True

    self.logger.info(f"Removing {len(paths)} items from Git tracking in batches...")

    for batch_number, start in enumerate(range(0, len(paths), batch_size), start=1):
        batch = paths[start : start + batch_size]

        self.logger.info(f"Processing untrack batch {batch_number}: {len(batch)} items...")
        self.logger.debug("Batch items: %s", batch)

        # --cached:          only touch the index, keep the files on disk.
        # -r:                needed when an entry is a directory.
        # --ignore-unmatch:  do not fail if a path is already untracked.
        command = ["git", "rm", "--cached", "-r", "--ignore-unmatch", "--", *batch]

        try:
            # Command output goes to DEBUG so the INFO log is not flooded.
            self.log_and_execute(command, working_directory, check=True, log_output_level=logging.DEBUG)
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Failed to process untrack batch {batch_number}: {e}")
            # Stop at the first failure rather than continuing with a
            # partially updated index.
            raise GitCommandError(
                f"Failed untracking batch {batch_number}. Error: {e}",
                command=command,
                stderr=getattr(e, 'stderr', None),
            ) from e
        except Exception as e:
            self.logger.exception(f"Unexpected error processing untrack batch {batch_number}: {e}")
            raise GitCommandError(f"Unexpected untrack batch error: {e}", command=command) from e

        self.logger.info(f"Batch {batch_number} untracked successfully.")

    # Reaching this point means no batch raised.
    self.logger.info("All untracking batches completed successfully.")
    return True
|
|
|
|
def get_matching_gitignore_rule(self, working_directory, path_to_check):
    """
    Return the ignore pattern that causes a path to be ignored, if any.

    Args:
        working_directory (str): Path to the Git repository.
        path_to_check (str): The relative path within the repo to check.

    Returns:
        str or None: The pattern that matched the path, or None if the
                     path is not ignored (including when the matching
                     rule is a negated "!pattern") or the output of
                     git check-ignore could not be parsed.

    Raises:
        GitCommandError: If git check-ignore exits with a code other than
                         0 or 1, or on any unexpected error.
        ValueError: Propagated unchanged if raised by the command executor.
    """
    # -v (--verbose): print the matching rule instead of just the path.
    # --no-index:     evaluate the ignore rules themselves, not the index.
    command = ["git", "check-ignore", "-v", "--no-index", "--", path_to_check]
    self.logger.debug(f"Getting matching ignore rule for path: '{path_to_check}'")

    try:
        # check=False because exit code 1 (not ignored) is not an error here.
        result = self.log_and_execute(command, working_directory, check=False)
    except (GitCommandError, ValueError) as e:
        self.logger.error(f"Failed get_matching_gitignore_rule for '{path_to_check}': {e}")
        raise
    except Exception as e:
        self.logger.exception(f"Unexpected error in get_matching_gitignore_rule: {e}")
        raise GitCommandError(f"Unexpected check-ignore -v error: {e}", command=command) from e

    # The exit code is interpreted outside the try block so that the error
    # raised below is logged exactly once.
    if result.returncode == 1:
        self.logger.debug(f"Path '{path_to_check}' is not ignored by any rule.")
        return None
    if result.returncode != 0:
        # Any other exit code (e.g. 128) indicates a real failure.
        error_msg = f"git check-ignore -v failed with exit code {result.returncode}"
        self.logger.error(f"{error_msg} for '{path_to_check}'. Stderr: {result.stderr}")
        raise GitCommandError(error_msg, command=command, stderr=result.stderr)

    # Output format: <source>:<line_num>:<pattern>\t<pathname>
    # Example:       .gitignore:5:*.log<TAB>logs/latest.log
    output_line = result.stdout.strip()
    rule_part, tab, _pathname = output_line.partition("\t")
    if not tab:
        self.logger.warning(f"Unexpected output format from check-ignore -v: {output_line}")
        return None

    # Anchor on the numeric line-number field rather than on the first two
    # colons: the source path may itself contain a colon (e.g. a Windows
    # "C:/..." excludes file) and so may the pattern.
    rule_match = re.match(r"^(.*?):(\d+):(.*)$", rule_part)
    if rule_match is None:
        self.logger.warning(f"Could not parse pattern from check-ignore output: {output_line}")
        return None

    pattern = rule_match.group(3)
    if pattern.startswith("!"):
        # In verbose mode git also reports negated patterns; matching one
        # means the path is explicitly NOT ignored.
        self.logger.debug(
            f"Path '{path_to_check}' matched negated rule '{pattern}'; it is not ignored."
        )
        return None

    self.logger.debug(f"Path '{path_to_check}' matched rule: '{pattern}'")
    return pattern
|
|
|
|
def get_status_short(self, working_directory):
    """
    Get the repository status in short format (-z for NUL termination).

    Args:
        working_directory (str): Path to the Git repository.

    Returns:
        list: A list of strings, each representing a changed file
              and its status (e.g., " M filename", "?? newfile").
              For renamed/copied entries only the "XY new_path" entry is
              returned; the original path that git emits as a separate
              NUL-terminated field is dropped.
              Returns an empty list on error.
    """
    self.logger.debug(f"Getting short status for '{working_directory}'")
    # -z:           NUL-terminated entries, safe for paths with spaces or
    #               special characters.
    # --ignored=no: do not list ignored files (explicit, although this is
    #               normally the default).
    command = ["git", "status", "--short", "-z", "--ignored=no"]
    try:
        # Status output is useful but not needed at INFO; log it at DEBUG.
        result = self.log_and_execute(command, working_directory, check=True, log_output_level=logging.DEBUG)

        entries = iter(field for field in result.stdout.split("\0") if field)
        status_lines = []
        for entry in entries:
            status_lines.append(entry)
            status_code = entry[:2]
            if "R" in status_code or "C" in status_code:
                # With -z a rename/copy is written as "XY <to>\0<from>\0".
                # Consume the <from> field so it is not mistaken for a
                # status entry of its own.
                original_path = next(entries, None)
                self.logger.debug("Entry %r originates from path %r", entry, original_path)

        self.logger.info(f"Git status check returned {len(status_lines)} changed/untracked items.")
        return status_lines
    except (GitCommandError, ValueError) as e:
        self.logger.error(f"Failed to get git status: {e}")
        return []  # Empty list signals failure to the caller
    except Exception as e:
        self.logger.exception(f"Unexpected error getting git status: {e}")
        return []
|
|
|
|
def get_file_content_from_ref(self, working_directory, file_path, ref="HEAD"):
    """
    Retrieve the content of a file from a specific Git reference.

    Args:
        working_directory (str): Path to the Git repository.
        file_path (str): Relative path to the file within the repo.
        ref (str): The Git reference (e.g., "HEAD", "master", a commit
                   hash). Pass ":", "" or None to read from the index.

    Returns:
        str or None: The file content as a string, or None if the file
                     doesn't exist in that ref or an error occurs.
    """
    # Git always uses '/' as the path separator inside object names.
    git_style_path = file_path.replace(os.path.sep, '/')

    # "<ref>:<path>" names a file in a commit/tree; ":<path>" names the
    # index entry. A ref of ":" must not be prefixed again, otherwise the
    # argument would become "::<path>", which git cannot resolve.
    if not ref or ref == ":":
        ref_prefix = ":"
    else:
        ref_prefix = f"{ref}:"
    ref_path_arg = f"{ref_prefix}{git_style_path}"

    self.logger.debug(f"Getting file content for '{ref_path_arg}' in '{working_directory}'")
    command = ["git", "show", ref_path_arg]

    try:
        # check=False because a file missing from HEAD/index is not fatal:
        # it just means the file is new or deleted. Output goes to DEBUG.
        result = self.log_and_execute(command, working_directory, check=False, log_output_level=logging.DEBUG)

        if result.returncode == 0:
            return result.stdout

        stderr_text = result.stderr or ""
        not_found_markers = (
            "exists on disk, but not in",
            "does not exist in",
            "did not match any file",
        )
        if result.returncode == 128 and any(marker in stderr_text for marker in not_found_markers):
            # Exit code 128 with one of these messages: file absent in ref.
            self.logger.debug(f"File '{git_style_path}' not found in ref '{ref}'.")
            return None

        # Any other git show failure.
        self.logger.error(f"git show command failed for '{ref_path_arg}' with code {result.returncode}. Stderr: {result.stderr}")
        return None

    except (GitCommandError, ValueError) as e:
        # Failure to run the command itself (rare with check=False).
        self.logger.error(f"Error executing git show for '{ref_path_arg}': {e}")
        return None
    except Exception as e:
        self.logger.exception(f"Unexpected error in get_file_content_from_ref for '{ref_path_arg}': {e}")
        return None