767 lines
35 KiB
Python
767 lines
35 KiB
Python
# git_commands.py
|
|
import os
|
|
import subprocess
|
|
import logging
|
|
import re # Ensure re is imported
|
|
|
|
class GitCommandError(Exception):
|
|
"""
|
|
Custom exception for handling Git command errors.
|
|
Includes the original command and error details if available.
|
|
"""
|
|
def __init__(self, message, command=None, stderr=None):
|
|
"""
|
|
Initialize the GitCommandError.
|
|
|
|
Args:
|
|
message (str): The error message.
|
|
command (list, optional): The command that caused the error. Defaults to None.
|
|
stderr (str, optional): The standard error output. Defaults to None.
|
|
"""
|
|
super().__init__(message)
|
|
self.command = command
|
|
self.stderr = stderr
|
|
|
|
def __str__(self):
|
|
"""Return a formatted string representation of the error."""
|
|
base_message = super().__str__()
|
|
details = []
|
|
if self.command:
|
|
# Ensure command is list of strings for join
|
|
safe_command = [str(part) for part in self.command]
|
|
command_str = ' '.join(safe_command)
|
|
details.append(f"Command: '{command_str}'")
|
|
if self.stderr:
|
|
stderr_str = self.stderr.strip()
|
|
details.append(f"Stderr: {stderr_str}")
|
|
|
|
if details:
|
|
details_str = '; '.join(details)
|
|
return f"{base_message} ({details_str})"
|
|
else:
|
|
return base_message
|
|
|
|
|
|
class GitCommands:
|
|
"""
|
|
Manages Git commands execution, logging, and error handling.
|
|
Includes tag and branch management functionalities.
|
|
"""
|
|
def __init__(self, logger):
|
|
"""
|
|
Initializes the GitCommands with a logger.
|
|
|
|
Args:
|
|
logger (logging.Logger): Logger instance for logging messages.
|
|
"""
|
|
if not isinstance(logger, logging.Logger):
|
|
# Raise error if logger is not a valid logger instance
|
|
raise ValueError("A valid logging.Logger instance is required.")
|
|
self.logger = logger
|
|
|
|
def log_and_execute(self, command, working_directory, check=True):
|
|
"""
|
|
Executes a command within a specific working directory, logs it,
|
|
and handles errors.
|
|
|
|
Args:
|
|
command (list): List representing the command and its arguments.
|
|
working_directory (str): Path to the directory for command execution.
|
|
check (bool, optional): Raise error on non-zero exit code if True.
|
|
Defaults to True.
|
|
|
|
Returns:
|
|
subprocess.CompletedProcess: The result object from subprocess.run.
|
|
|
|
Raises:
|
|
GitCommandError: If path invalid, command fails (and check=True), etc.
|
|
ValueError: If working_directory is None or empty.
|
|
"""
|
|
# Ensure all parts of command are strings for logging and execution
|
|
safe_command = [str(part) for part in command]
|
|
command_str = ' '.join(safe_command)
|
|
log_message = f"Executing: {command_str}"
|
|
self.logger.debug(log_message)
|
|
|
|
# Validate working directory
|
|
if not working_directory:
|
|
msg = "Working directory cannot be None or empty."
|
|
self.logger.error(msg)
|
|
raise ValueError(msg)
|
|
|
|
abs_path = os.path.abspath(working_directory)
|
|
if not os.path.isdir(abs_path):
|
|
msg = f"Working directory does not exist or is not a directory: {abs_path}"
|
|
self.logger.error(msg)
|
|
# Include command context in the error
|
|
raise GitCommandError(msg, command=safe_command)
|
|
|
|
cwd = abs_path
|
|
self.logger.debug(f"Working directory: {cwd}")
|
|
|
|
try:
|
|
# Platform-specific setup to hide console window on Windows
|
|
startupinfo = None
|
|
creationflags = 0 # Default creation flags
|
|
if os.name == 'nt':
|
|
startupinfo = subprocess.STARTUPINFO()
|
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
|
startupinfo.wShowWindow = subprocess.SW_HIDE
|
|
# Optional: Use CREATE_NO_WINDOW if std I/O redirection not needed
|
|
# creationflags = subprocess.CREATE_NO_WINDOW
|
|
|
|
# Execute the command
|
|
result = subprocess.run(
|
|
safe_command, # Use the validated command list
|
|
cwd=cwd,
|
|
capture_output=True,
|
|
text=True,
|
|
check=check,
|
|
encoding='utf-8',
|
|
errors='replace', # Handle potential decoding errors
|
|
startupinfo=startupinfo,
|
|
creationflags=creationflags
|
|
)
|
|
|
|
# Log stdout and stderr clearly
|
|
stdout_log = result.stdout.strip() if result.stdout else "<no stdout>"
|
|
stderr_log = result.stderr.strip() if result.stderr else "<no stderr>"
|
|
|
|
# Log success with output details
|
|
# Use DEBUG level for full output, INFO for summary? Let's use INFO for now.
|
|
self.logger.info(
|
|
f"Command successful. Output:\n"
|
|
f"--- stdout ---\n{stdout_log}\n"
|
|
f"--- stderr ---\n{stderr_log}\n---"
|
|
)
|
|
return result
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
# Log detailed error information from the failed process
|
|
stderr_err = e.stderr.strip() if e.stderr else "<no stderr>"
|
|
stdout_err = e.stdout.strip() if e.stdout else "<no stdout>"
|
|
error_log_msg = (
|
|
f"Command failed with return code {e.returncode} in '{cwd}'.\n"
|
|
f"--- command ---\n{command_str}\n"
|
|
f"--- stderr ---\n{stderr_err}\n"
|
|
f"--- stdout ---\n{stdout_err}\n"
|
|
f"---"
|
|
)
|
|
self.logger.error(error_log_msg)
|
|
# Wrap the original exception for consistent error handling
|
|
raise GitCommandError(
|
|
f"Git command failed in '{cwd}'.",
|
|
command=safe_command,
|
|
stderr=e.stderr
|
|
) from e
|
|
|
|
except FileNotFoundError as e:
|
|
# Log error if the command itself (e.g., 'git') is not found
|
|
error_msg = (
|
|
f"Command not found: '{safe_command[0]}'. Is Git installed "
|
|
f"and in system PATH? (WD: '{cwd}')"
|
|
)
|
|
self.logger.error(error_msg)
|
|
# Log exception details at debug level for more info if needed
|
|
self.logger.debug(f"FileNotFoundError details: {e}")
|
|
raise GitCommandError(error_msg, command=safe_command) from e
|
|
|
|
except PermissionError as e:
|
|
# Handle errors related to execution permissions
|
|
error_msg = f"Permission denied executing command in '{cwd}'."
|
|
self.logger.error(error_msg)
|
|
self.logger.debug(f"PermissionError details: {e}")
|
|
raise GitCommandError(error_msg, command=safe_command, stderr=str(e)) from e
|
|
|
|
except Exception as e:
|
|
# Catch any other unexpected errors during execution
|
|
self.logger.exception(f"Unexpected error executing command in '{cwd}': {e}")
|
|
raise GitCommandError(
|
|
f"Unexpected error during command execution: {e}",
|
|
command=safe_command
|
|
) from e
|
|
|
|
|
|
def create_git_bundle(self, working_directory, bundle_path):
|
|
"""
|
|
Creates a Git bundle file from the repository in working_directory.
|
|
|
|
Args:
|
|
working_directory (str): Path to the local Git repository.
|
|
bundle_path (str): Full path where the bundle file should be saved.
|
|
|
|
Raises:
|
|
GitCommandError: If command fails or path invalid.
|
|
ValueError: If working_directory is None or empty.
|
|
"""
|
|
normalized_bundle_path = os.path.normpath(bundle_path)
|
|
normalized_bundle_path = normalized_bundle_path.replace("\\", "/")
|
|
command = ["git", "bundle", "create", normalized_bundle_path, "--all"]
|
|
self.logger.info(f"Attempting to create Git bundle: {normalized_bundle_path}")
|
|
|
|
try:
|
|
result = self.log_and_execute(
|
|
command,
|
|
working_directory,
|
|
check=False # Check result manually
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
stderr_lower = result.stderr.lower() if result.stderr else ""
|
|
if "refusing to create empty bundle" in stderr_lower:
|
|
self.logger.warning(f"Bundle creation skipped: Empty repo at '{working_directory}'.")
|
|
# Not an error, do not raise
|
|
else:
|
|
error_msg = f"Git bundle command failed (code {result.returncode})."
|
|
raise GitCommandError(error_msg, command, result.stderr)
|
|
elif not os.path.exists(normalized_bundle_path) or \
|
|
os.path.getsize(normalized_bundle_path) == 0:
|
|
self.logger.warning(f"Bundle success, but file '{normalized_bundle_path}' missing/empty.")
|
|
# Consider if this is an error case
|
|
else:
|
|
self.logger.info(f"Git bundle created successfully: '{normalized_bundle_path}'.")
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Failed create bundle for '{working_directory}': {e}")
|
|
raise
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected bundle error for '{working_directory}': {e}")
|
|
raise GitCommandError(f"Unexpected bundle error: {e}", command) from e
|
|
|
|
|
|
def fetch_from_git_bundle(self, working_directory, bundle_path):
|
|
"""
|
|
Fetches changes from a Git bundle file and merges them.
|
|
|
|
Args:
|
|
working_directory (str): Path to the local Git repository.
|
|
bundle_path (str): Path to the Git bundle file.
|
|
|
|
Raises:
|
|
GitCommandError: If fetch or merge fails.
|
|
ValueError: If arguments invalid.
|
|
"""
|
|
normalized_bundle_path = os.path.normpath(bundle_path).replace("\\", "/")
|
|
self.logger.info(f"Fetching from '{normalized_bundle_path}' into '{working_directory}'")
|
|
|
|
fetch_command = ["git", "fetch", normalized_bundle_path]
|
|
merge_command = ["git", "merge", "FETCH_HEAD", "--no-ff"] # No fast-forward merge
|
|
|
|
try:
|
|
# 1. Fetch changes
|
|
self.logger.debug("Executing fetch command...")
|
|
self.log_and_execute(fetch_command, working_directory, check=True) # Error if fetch fails
|
|
self.logger.info("Successfully fetched from Git bundle.")
|
|
|
|
# 2. Merge fetched changes
|
|
self.logger.debug("Executing merge command...")
|
|
merge_result = self.log_and_execute(merge_command, working_directory, check=False) # Check manually
|
|
|
|
# Analyze merge result
|
|
stdout_log = merge_result.stdout.strip() if merge_result.stdout else ""
|
|
stderr_log = merge_result.stderr.strip() if merge_result.stderr else ""
|
|
|
|
if merge_result.returncode == 0:
|
|
if "already up to date" in stdout_log.lower():
|
|
self.logger.info("Repository already up-to-date.")
|
|
else:
|
|
self.logger.info("Successfully merged fetched changes.")
|
|
else:
|
|
# Merge failed, likely conflicts
|
|
output_lower = (stderr_log + stdout_log).lower()
|
|
if "conflict" in output_lower:
|
|
conflict_msg = (f"Merge conflict after fetching. Resolve manually "
|
|
f"in '{working_directory}' and commit.")
|
|
self.logger.error(conflict_msg)
|
|
# Raise specific error for caller
|
|
raise GitCommandError(conflict_msg, merge_command, merge_result.stderr)
|
|
else:
|
|
# Other merge error
|
|
error_msg = f"Merge command failed (code {merge_result.returncode})."
|
|
self.logger.error(error_msg)
|
|
raise GitCommandError(error_msg, merge_command, merge_result.stderr)
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Fetch/merge error for '{working_directory}': {e}")
|
|
raise
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected fetch/merge error for '{working_directory}': {e}")
|
|
raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e
|
|
|
|
|
|
def prepare_svn_for_git(self, working_directory):
|
|
"""
|
|
Prepares a directory for Git: initializes repo and ensures .gitignore.
|
|
|
|
Args:
|
|
working_directory (str): Path to the directory.
|
|
|
|
Raises:
|
|
GitCommandError/ValueError/IOError: On failure.
|
|
"""
|
|
self.logger.info(f"Preparing directory for Git: '{working_directory}'")
|
|
|
|
# Validate path
|
|
if not working_directory:
|
|
raise ValueError("Working directory cannot be empty.")
|
|
if not os.path.isdir(working_directory):
|
|
raise GitCommandError(f"Directory does not exist: {working_directory}")
|
|
|
|
# Define paths
|
|
gitignore_path = os.path.join(working_directory, ".gitignore")
|
|
git_dir_path = os.path.join(working_directory, ".git")
|
|
|
|
# 1. Initialize Git repository if needed
|
|
if not os.path.exists(git_dir_path):
|
|
self.logger.info("No existing Git repo found. Initializing...")
|
|
try:
|
|
init_command = ["git", "init"]
|
|
self.log_and_execute(init_command, working_directory, check=True)
|
|
self.logger.info("Git repository initialized successfully.")
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Failed to initialize Git repository: {e}")
|
|
raise # Re-raise to signal failure
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error initializing repo: {e}")
|
|
raise GitCommandError(f"Unexpected init error: {e}") from e
|
|
else:
|
|
self.logger.info("Git repository already exists. Skipping init.")
|
|
|
|
# 2. Ensure .gitignore exists and ignores .svn
|
|
self.logger.debug(f"Checking/updating .gitignore: {gitignore_path}")
|
|
try:
|
|
svn_ignore_entry = ".svn"
|
|
needs_write = False
|
|
content_to_write = ""
|
|
|
|
if not os.path.exists(gitignore_path):
|
|
# File doesn't exist, create it with the entry
|
|
self.logger.info("'.gitignore' not found. Creating with .svn entry.")
|
|
content_to_write = f"{svn_ignore_entry}\n"
|
|
needs_write = True
|
|
else:
|
|
# File exists, check content
|
|
try:
|
|
with open(gitignore_path, "r", encoding='utf-8') as f:
|
|
lines = f.readlines()
|
|
# Check if entry or entry/ exists
|
|
is_ignored = any(
|
|
line.strip() == svn_ignore_entry or \
|
|
line.strip().startswith(svn_ignore_entry + '/')
|
|
for line in lines
|
|
)
|
|
if not is_ignored:
|
|
# Entry not found, need to append
|
|
self.logger.info(f"'{svn_ignore_entry}' not found. Appending.")
|
|
current_content = "".join(lines)
|
|
# Add newline before entry if file doesn't end with one
|
|
if not current_content.endswith('\n'):
|
|
content_to_write = f"\n{svn_ignore_entry}\n"
|
|
else:
|
|
content_to_write = f"{svn_ignore_entry}\n"
|
|
needs_write = True
|
|
else:
|
|
self.logger.info(f"'{svn_ignore_entry}' already ignored.")
|
|
except IOError as e:
|
|
# Cannot read existing file, log warning, don't modify
|
|
self.logger.warning(f"Could not read existing '.gitignore': {e}.")
|
|
|
|
# Write to file only if necessary
|
|
if needs_write:
|
|
# Use 'a'ppend mode if appending, 'w'rite if creating new
|
|
mode = 'a' if os.path.exists(gitignore_path) and content_to_write.startswith("\n") else 'w'
|
|
# If creating new, content shouldn't start with newline
|
|
if mode == 'w':
|
|
content_to_write = content_to_write.lstrip()
|
|
|
|
try:
|
|
with open(gitignore_path, mode, encoding='utf-8', newline='\n') as f:
|
|
f.write(content_to_write)
|
|
self.logger.info("Updated '.gitignore' file.")
|
|
except IOError as e:
|
|
self.logger.error(f"Error writing to '.gitignore': {e}")
|
|
raise GitCommandError(f"Failed update .gitignore: {e}") from e
|
|
|
|
except IOError as e: # Catch errors from os.path.exists or final write
|
|
self.logger.error(f"Error accessing/writing '.gitignore': {e}")
|
|
raise GitCommandError(f"File I/O error for .gitignore: {e}") from e
|
|
except Exception as e: # Catch other unexpected errors
|
|
self.logger.exception(f"Unexpected error managing '.gitignore': {e}")
|
|
raise GitCommandError(f"Unexpected error with .gitignore: {e}") from e
|
|
|
|
self.logger.info(f"Directory preparation complete for '{working_directory}'.")
|
|
|
|
|
|
def git_commit(self, working_directory, message="Autocommit"):
|
|
""" Stages all changes and commits them. """
|
|
self.logger.info(f"Attempting commit in '{working_directory}': '{message}'")
|
|
try:
|
|
# 1. Stage all changes
|
|
add_command = ["git", "add", "."]
|
|
self.logger.debug("Staging all changes (git add .)...")
|
|
self.log_and_execute(add_command, working_directory, check=True)
|
|
self.logger.debug("Staging successful.")
|
|
|
|
# 2. Commit staged changes
|
|
commit_command = ["git", "commit", "-m", message]
|
|
self.logger.debug("Attempting commit...")
|
|
result = self.log_and_execute(commit_command, working_directory, check=False)
|
|
|
|
# Analyze result
|
|
stdout_lower = result.stdout.lower() if result.stdout else ""
|
|
stderr_lower = result.stderr.lower() if result.stderr else ""
|
|
nothing_to_commit = False
|
|
if "nothing to commit" in stdout_lower: nothing_to_commit = True
|
|
if "no changes added to commit" in stdout_lower: nothing_to_commit = True
|
|
if "nothing added to commit" in stdout_lower: nothing_to_commit = True
|
|
# Handle Git returning 1 with no output for no changes case
|
|
if result.returncode == 1 and not stderr_lower and not stdout_lower: nothing_to_commit = True
|
|
|
|
if result.returncode == 0:
|
|
self.logger.info("Commit successful.")
|
|
return True
|
|
elif nothing_to_commit:
|
|
self.logger.info("No changes to commit.")
|
|
return False
|
|
else:
|
|
# Unexpected error
|
|
error_msg = f"Commit command failed (code {result.returncode})."
|
|
raise GitCommandError(error_msg, commit_command, result.stderr)
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Commit process error: {e}")
|
|
raise
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected commit error: {e}")
|
|
raise GitCommandError(f"Unexpected commit error: {e}") from e
|
|
|
|
|
|
def git_status_has_changes(self, working_directory):
|
|
""" Checks if the Git repository has uncommitted changes. """
|
|
self.logger.debug(f"Checking Git status in '{working_directory}'...")
|
|
try:
|
|
status_command = ["git", "status", "--porcelain"]
|
|
result = self.log_and_execute(status_command, working_directory, check=True)
|
|
# Any output from porcelain means changes exist
|
|
has_changes = bool(result.stdout.strip())
|
|
self.logger.debug(f"Status check complete. Has changes: {has_changes}")
|
|
return has_changes
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Status check error: {e}")
|
|
raise
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected status error: {e}")
|
|
raise GitCommandError(f"Unexpected status error: {e}") from e
|
|
|
|
|
|
def list_tags(self, working_directory):
|
|
""" Lists tags with subjects, sorted newest first. """
|
|
self.logger.info(f"Listing tags with subjects in '{working_directory}'...")
|
|
format_string = "%(refname:short)%09%(contents:subject)"
|
|
command = ["git", "tag", "--list", f"--format={format_string}", "--sort=-creatordate"]
|
|
tags_with_subjects = []
|
|
try:
|
|
result = self.log_and_execute(command, working_directory, check=True)
|
|
output_lines = result.stdout.splitlines()
|
|
for line in output_lines:
|
|
line_stripped = line.strip()
|
|
if line_stripped:
|
|
parts = line_stripped.split('\t', 1) # Split only on first tab
|
|
tag_name = parts[0].strip()
|
|
tag_subject = parts[1].strip() if len(parts) > 1 else "(No subject)"
|
|
tags_with_subjects.append((tag_name, tag_subject))
|
|
|
|
count = len(tags_with_subjects)
|
|
self.logger.info(f"Found {count} tags with subjects.")
|
|
self.logger.debug(f"Tags found: {tags_with_subjects}")
|
|
return tags_with_subjects
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Error listing tags: {e}")
|
|
return [] # Return empty list on known errors
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error listing tags: {e}")
|
|
return []
|
|
|
|
|
|
def create_tag(self, working_directory, tag_name, message):
|
|
""" Creates an annotated tag. """
|
|
self.logger.info(f"Creating tag '{tag_name}' in '{working_directory}'")
|
|
# Validate inputs
|
|
if not tag_name: raise ValueError("Tag name cannot be empty.")
|
|
if not message: raise ValueError("Tag message cannot be empty.")
|
|
# Validate tag name format
|
|
pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.)$"
|
|
if not re.match(pattern, tag_name):
|
|
raise ValueError(f"Invalid tag name format: '{tag_name}'.")
|
|
|
|
command = ["git", "tag", "-a", tag_name, "-m", message]
|
|
try:
|
|
self.log_and_execute(command, working_directory, check=True)
|
|
self.logger.info(f"Tag '{tag_name}' created successfully.")
|
|
except GitCommandError as e:
|
|
err = e.stderr.lower() if e.stderr else ""
|
|
if "already exists" in err:
|
|
msg = f"Tag '{tag_name}' already exists."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, command, e.stderr) from e
|
|
else:
|
|
self.logger.error(f"Failed create tag '{tag_name}': {e}")
|
|
raise
|
|
except ValueError as ve: # Re-raise validation errors
|
|
raise ve
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected tag creation error: {e}")
|
|
raise GitCommandError(f"Unexpected tag error: {e}", command) from e
|
|
|
|
|
|
def checkout_tag(self, working_directory, tag_name):
|
|
""" Checks out a specific tag (detached HEAD state). """
|
|
self.logger.info(f"Checking out tag '{tag_name}' in '{working_directory}'...")
|
|
if not tag_name:
|
|
raise ValueError("Tag name cannot be empty.")
|
|
|
|
command = ["git", "checkout", tag_name]
|
|
try:
|
|
result = self.log_and_execute(command, working_directory, check=True)
|
|
self.logger.info(f"Successfully checked out tag '{tag_name}'.")
|
|
# Check output for detached HEAD warning
|
|
output_lower = (result.stderr + result.stdout).lower()
|
|
if "detached head" in output_lower:
|
|
self.logger.warning("Repository is now in a 'detached HEAD' state.")
|
|
return True # Indicate success
|
|
except GitCommandError as e:
|
|
err = e.stderr.lower() if e.stderr else ""
|
|
not_found_patterns = [
|
|
"did not match any file(s)",
|
|
f"pathspec '{tag_name.lower()}' did not match any file(s)"
|
|
]
|
|
if any(p in err for p in not_found_patterns):
|
|
msg = f"Tag '{tag_name}' not found or invalid."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, command, e.stderr) from e
|
|
else:
|
|
self.logger.error(f"Failed checkout tag '{tag_name}': {e}")
|
|
raise # Re-raise other Git errors
|
|
except ValueError as ve:
|
|
raise ve
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected checkout error: {e}")
|
|
raise GitCommandError(f"Unexpected checkout error: {e}", command) from e
|
|
|
|
# --- Branch Management Methods ---
|
|
def get_current_branch(self, working_directory):
|
|
""" Gets the current branch name or indicates detached HEAD. """
|
|
self.logger.debug(f"Getting current branch in '{working_directory}'...")
|
|
cmd_show = ["git", "branch", "--show-current"]
|
|
try:
|
|
res_show = self.log_and_execute(cmd_show, working_directory, check=False)
|
|
if res_show.returncode == 0 and res_show.stdout.strip():
|
|
name = res_show.stdout.strip()
|
|
self.logger.debug(f"Current branch (show-current): {name}")
|
|
return name
|
|
else:
|
|
# Fallback for older Git or detached head
|
|
self.logger.debug("Using fallback: symbolic-ref.")
|
|
cmd_ref = ["git", "symbolic-ref", "--short", "HEAD"]
|
|
res_ref = self.log_and_execute(cmd_ref, working_directory, check=False)
|
|
if res_ref.returncode == 0 and res_ref.stdout.strip():
|
|
name = res_ref.stdout.strip()
|
|
self.logger.debug(f"Current branch (symbolic-ref): {name}")
|
|
return name
|
|
else:
|
|
# Detached HEAD or error
|
|
msg = f"Could not get branch name (symbolic-ref exit: {res_ref.returncode})."
|
|
self.logger.warning(msg)
|
|
if res_ref.returncode == 128: # Common detached code
|
|
return "(DETACHED HEAD)"
|
|
else:
|
|
return "<Error>"
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Error getting current branch: {e}")
|
|
return "<Error>"
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error getting branch: {e}")
|
|
return "<Error>"
|
|
|
|
def list_branches(self, working_directory):
|
|
""" Lists local Git branches. """
|
|
self.logger.info(f"Listing local branches in '{working_directory}'...")
|
|
cmd = ["git", "branch", "--list", "--no-color"]
|
|
branches = []
|
|
try:
|
|
result = self.log_and_execute(cmd, working_directory, check=True)
|
|
for line in result.stdout.splitlines():
|
|
# Remove decoration ('*' ) and whitespace
|
|
name = line.lstrip('* ').strip()
|
|
# Filter out potential detached HEAD message if present
|
|
if name and "HEAD detached" not in name:
|
|
branches.append(name)
|
|
count = len(branches)
|
|
self.logger.info(f"Found {count} local branches.")
|
|
self.logger.debug(f"Branches: {branches}")
|
|
return branches
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Error listing branches: {e}")
|
|
return []
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error listing branches: {e}")
|
|
return []
|
|
|
|
def create_branch(self, working_directory, branch_name, start_point=None):
|
|
""" Creates a new local Git branch. """
|
|
self.logger.info(f"Creating branch '{branch_name}' in '{working_directory}'...")
|
|
if not branch_name:
|
|
raise ValueError("Branch name cannot be empty.")
|
|
# Branch name validation
|
|
pattern = r"^(?![./]|.*([./]{2,}|[.]$|[/]$|@\{))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.lock)$"
|
|
if not re.match(pattern, branch_name):
|
|
raise ValueError(f"Invalid branch name format: '{branch_name}'.")
|
|
|
|
cmd = ["git", "branch", branch_name]
|
|
if start_point:
|
|
cmd.append(start_point)
|
|
self.logger.info(f"Starting branch from: {start_point}")
|
|
|
|
try:
|
|
self.log_and_execute(cmd, working_directory, check=True)
|
|
self.logger.info(f"Branch '{branch_name}' created successfully.")
|
|
except GitCommandError as e:
|
|
err = e.stderr.lower() if e.stderr else ""
|
|
if "already exists" in err:
|
|
msg = f"Branch '{branch_name}' already exists."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, cmd, e.stderr) from e
|
|
else:
|
|
self.logger.error(f"Failed to create branch '{branch_name}': {e}")
|
|
raise
|
|
except ValueError as ve:
|
|
raise ve
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error creating branch: {e}")
|
|
raise GitCommandError(f"Unexpected branch error: {e}", cmd) from e
|
|
|
|
def checkout_branch(self, working_directory, branch_name):
|
|
""" Checks out an existing local branch. """
|
|
self.logger.info(f"Switching to branch '{branch_name}' in '{working_directory}'...")
|
|
if not branch_name:
|
|
raise ValueError("Branch name cannot be empty.")
|
|
|
|
# Use 'git switch' (preferred)
|
|
cmd = ["git", "switch", branch_name]
|
|
# Alternative fallback: cmd = ["git", "checkout", branch_name]
|
|
|
|
try:
|
|
self.log_and_execute(cmd, working_directory, check=True)
|
|
self.logger.info(f"Switched successfully to branch '{branch_name}'.")
|
|
return True
|
|
except GitCommandError as e:
|
|
err = e.stderr.lower() if e.stderr else ""
|
|
# Check specific errors
|
|
if "invalid reference" in err or "did not match" in err:
|
|
msg = f"Branch '{branch_name}' not found or invalid."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, cmd, e.stderr) from e
|
|
elif "local changes" in err or "would be overwritten" in err:
|
|
# This check should ideally happen before calling this method
|
|
msg = "Checkout failed: Uncommitted changes conflict."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, cmd, e.stderr) from e
|
|
else:
|
|
self.logger.error(f"Failed switch to branch '{branch_name}': {e}")
|
|
raise # Re-raise other Git errors
|
|
except ValueError as ve:
|
|
raise ve
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error switching branch: {e}")
|
|
raise GitCommandError(f"Unexpected switch error: {e}", cmd) from e
|
|
|
|
def delete_branch(self, working_directory, branch_name, force=False):
|
|
""" Deletes a local branch. """
|
|
self.logger.info(f"Deleting branch '{branch_name}' in '{working_directory}' (force={force})...")
|
|
if not branch_name:
|
|
raise ValueError("Branch name cannot be empty.")
|
|
# Prevent deleting common main branches
|
|
if branch_name in ['main', 'master']:
|
|
raise ValueError(f"Safety check: Cannot delete '{branch_name}'.")
|
|
|
|
delete_flag = "-D" if force else "-d"
|
|
cmd = ["git", "branch", delete_flag, branch_name]
|
|
self.logger.info(f"Using delete flag: {delete_flag}")
|
|
|
|
try:
|
|
self.log_and_execute(cmd, working_directory, check=True)
|
|
self.logger.info(f"Branch '{branch_name}' deleted successfully.")
|
|
return True
|
|
except GitCommandError as e:
|
|
err = e.stderr.lower() if e.stderr else ""
|
|
# Check for specific deletion errors
|
|
if "not found" in err:
|
|
msg = f"Branch '{branch_name}' not found for deletion."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, cmd, e.stderr) from e
|
|
elif "not fully merged" in err and not force:
|
|
# Only happens with '-d'
|
|
msg = f"Branch '{branch_name}' not fully merged. Use force delete?"
|
|
self.logger.warning(msg)
|
|
# Raise specific error for UI to potentially offer force option
|
|
raise GitCommandError(msg, cmd, e.stderr) from e
|
|
elif "cannot delete branch" in err and "checked out" in err:
|
|
msg = f"Cannot delete currently checked out branch '{branch_name}'."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, cmd, e.stderr) from e
|
|
else:
|
|
# Other delete errors
|
|
self.logger.error(f"Failed to delete branch '{branch_name}': {e}")
|
|
raise # Re-raise
|
|
except ValueError as ve:
|
|
raise ve
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error deleting branch: {e}")
|
|
raise GitCommandError(f"Unexpected delete error: {e}", cmd) from e
|
|
|
|
# --- ADDED: Delete Tag Method ---
|
|
def delete_tag(self, working_directory, tag_name):
|
|
"""
|
|
Deletes a local tag.
|
|
|
|
Args:
|
|
working_directory (str): Path to the local Git repository.
|
|
tag_name (str): The name of the tag to delete.
|
|
|
|
Returns:
|
|
bool: True on success.
|
|
Raises:
|
|
GitCommandError: If tag not found or delete fails.
|
|
ValueError: If tag_name is invalid.
|
|
"""
|
|
self.logger.info(f"Deleting tag '{tag_name}' in '{working_directory}'...")
|
|
if not tag_name:
|
|
raise ValueError("Tag name cannot be empty for deletion.")
|
|
|
|
# Optional: Basic tag name validation before attempting delete
|
|
pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.)$"
|
|
if not re.match(pattern, tag_name):
|
|
raise ValueError(f"Invalid tag name format for deletion: '{tag_name}'.")
|
|
|
|
cmd = ["git", "tag", "-d", tag_name]
|
|
try:
|
|
# Execute command, check=True raises error if tag doesn't exist
|
|
self.log_and_execute(cmd, working_directory, check=True)
|
|
self.logger.info(f"Tag '{tag_name}' deleted successfully.")
|
|
return True
|
|
except GitCommandError as e:
|
|
err = e.stderr.lower() if e.stderr else ""
|
|
if "not found" in err:
|
|
msg = f"Tag '{tag_name}' not found for deletion."
|
|
self.logger.error(msg)
|
|
raise GitCommandError(msg, cmd, e.stderr) from e
|
|
else:
|
|
# Other Git errors during delete
|
|
self.logger.error(f"Failed to delete tag '{tag_name}': {e}")
|
|
raise # Re-raise
|
|
except ValueError as ve:
|
|
# Re-raise validation errors
|
|
raise ve
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error deleting tag: {e}")
|
|
raise GitCommandError(f"Unexpected delete tag error: {e}", cmd) from e |