SXXXXXXX_GitUtility/git_commands.py
2025-04-07 15:01:25 +02:00

416 lines
24 KiB
Python

# git_commands.py
import os
import subprocess
import logging
import re
class GitCommandError(Exception):
    """Error raised when a Git command cannot be run or reports a failure.

    Attributes:
        command: The command (sequence of arguments) that failed, if known.
        stderr: The captured standard-error text of the command, if any.
    """

    def __init__(self, message, command=None, stderr=None):
        """Record the message together with the optional command and stderr."""
        super().__init__(message)
        self.command = command
        self.stderr = stderr

    def __str__(self):
        """Return the message, followed by command/stderr details when set."""
        text = super().__str__()
        extras = []
        if self.command:
            joined = " ".join(str(part) for part in self.command)
            extras.append(f"Cmd: '{joined}'")
        if self.stderr:
            extras.append(f"Stderr: {self.stderr.strip()}")
        if not extras:
            return text
        return f"{text} ({'; '.join(extras)})"
class GitCommands:
    """Run Git commands in a working directory and report their outcome.

    Every method shells out through :meth:`log_and_execute`, which logs the
    command line and its output and converts failures to ``GitCommandError``.
    """

    def __init__(self, logger):
        """Store the logger used for all command tracing.

        Args:
            logger (logging.Logger): Destination for log records.

        Raises:
            ValueError: If ``logger`` is not a ``logging.Logger`` instance.
        """
        if not isinstance(logger, logging.Logger):
            raise ValueError("Valid logger required.")
        self.logger = logger

    def log_and_execute(self, command, working_directory, check=True):
        """Run ``command`` inside ``working_directory`` and log the result.

        Args:
            command (list): Program and arguments; each item goes through ``str``.
            working_directory (str): Directory to run in; must exist.
            check (bool): When True, a non-zero exit status raises
                ``GitCommandError``. When False, the caller must inspect
                ``returncode`` on the returned object.

        Returns:
            subprocess.CompletedProcess: Result with text ``stdout``/``stderr``.

        Raises:
            ValueError: If ``working_directory`` is empty.
            GitCommandError: If the directory is invalid, the executable is
                missing, permission is denied, the command fails while
                ``check`` is True, or any other error occurs while running it.
        """
        safe_cmd = [str(part) for part in command]
        cmd_str = " ".join(safe_cmd)
        self.logger.debug(f"Executing: {cmd_str}")
        if not working_directory:
            raise ValueError("Working directory required.")
        cwd = os.path.abspath(working_directory)
        if not os.path.isdir(cwd):
            raise GitCommandError(f"Invalid WD: {cwd}", safe_cmd)
        self.logger.debug(f"WD: {cwd}")
        try:
            startupinfo = None
            if os.name == "nt":
                # Keep a console window from flashing up on Windows.
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = subprocess.SW_HIDE
            result = subprocess.run(
                safe_cmd,
                cwd=cwd,
                capture_output=True,
                text=True,
                check=check,
                encoding="utf-8",
                errors="replace",
                startupinfo=startupinfo,
            )
            out = (result.stdout or "").strip() or "<no stdout>"
            err = (result.stderr or "").strip() or "<no stderr>"
            if result.returncode == 0:
                self.logger.info(
                    f"Success. Output:\n--- stdout ---\n{out}\n--- stderr ---\n{err}\n---"
                )
            else:
                # Only reachable with check=False: do not report a failed
                # command as a success; the caller decides what it means.
                self.logger.info(
                    f"Exit code {result.returncode} (not checked). Output:\n"
                    f"--- stdout ---\n{out}\n--- stderr ---\n{err}\n---"
                )
            return result
        except subprocess.CalledProcessError as e:
            err = (e.stderr or "").strip() or "<no stderr>"
            out = (e.stdout or "").strip() or "<no stdout>"
            log_msg = (
                f"Cmd failed (code {e.returncode}) in '{cwd}'.\n"
                f"Cmd: {cmd_str}\nStderr: {err}\nStdout: {out}"
            )
            self.logger.error(log_msg)
            raise GitCommandError(f"Git cmd failed in '{cwd}'.", safe_cmd, e.stderr) from e
        except FileNotFoundError as e:
            self.logger.error(f"Cmd not found: '{safe_cmd[0]}'")
            raise GitCommandError(f"Cmd not found: {safe_cmd[0]}", safe_cmd) from e
        except PermissionError as e:
            self.logger.error(f"Permission denied in '{cwd}'.")
            raise GitCommandError("Permission denied", safe_cmd, str(e)) from e
        except Exception as e:
            self.logger.exception(f"Unexpected error in '{cwd}': {e}")
            raise GitCommandError(f"Unexpected error: {e}", safe_cmd) from e

    # --- Repository preparation, bundles, commits, status, tags ---

    def prepare_svn_for_git(self, working_directory):
        """Initialise a Git repo in the directory (if needed) and ignore ``.svn``.

        Args:
            working_directory (str): Existing directory to prepare.

        Raises:
            ValueError: If ``working_directory`` is empty.
            GitCommandError: If the directory does not exist, ``git init``
                fails, or ``.gitignore`` cannot be updated.
        """
        self.logger.info(f"Preparing directory for Git: '{working_directory}'")
        if not working_directory:
            raise ValueError("WD empty.")
        if not os.path.isdir(working_directory):
            raise GitCommandError(f"Dir not exists: {working_directory}")
        if os.path.exists(os.path.join(working_directory, ".git")):
            self.logger.info("Repo already exists.")
        else:
            self.logger.info("Initializing Git repo...")
            try:
                self.log_and_execute(["git", "init"], working_directory, check=True)
            except Exception as e:
                self.logger.error(f"Failed init: {e}")
                raise
            self.logger.info("Repo initialized.")
        self._ensure_svn_ignored(os.path.join(working_directory, ".gitignore"))
        self.logger.info("Preparation complete.")

    def _ensure_svn_ignored(self, gitignore):
        """Create or extend the ``.gitignore`` at ``gitignore`` so it lists ``.svn``."""
        entry = ".svn"
        self.logger.debug(f"Checking/updating gitignore: {gitignore}")
        try:
            if not os.path.exists(gitignore):
                self.logger.info("Creating .gitignore.")
                mode = "w"
                content = f"{entry}\n"
            else:
                try:
                    with open(gitignore, "r", encoding="utf-8") as f:
                        lines = f.readlines()
                except IOError as e:
                    # Best effort: an unreadable file is left untouched.
                    self.logger.warning(f"Cannot read gitignore: {e}")
                    return
                already = any(
                    ln.strip() == entry or ln.strip().startswith(entry + "/")
                    for ln in lines
                )
                if already:
                    self.logger.info(".svn already ignored.")
                    return
                self.logger.info("Appending .svn to .gitignore.")
                current = "".join(lines)
                # Only add a separating newline when the file has content that
                # does not already end with one (an empty file needs none).
                separator = "\n" if current and not current.endswith("\n") else ""
                mode = "a"
                content = f"{separator}{entry}\n"
            try:
                with open(gitignore, mode, encoding="utf-8", newline="\n") as f:
                    f.write(content)
            except IOError as e:
                self.logger.error(f"Write error gitignore: {e}")
                raise GitCommandError(f"Failed update gitignore: {e}") from e
            self.logger.info("Updated .gitignore.")
        except GitCommandError:
            # Already logged and descriptive; do not wrap it a second time.
            raise
        except Exception as e:
            self.logger.exception(f"Gitignore error: {e}")
            raise GitCommandError(f"Gitignore error: {e}") from e

    def create_git_bundle(self, wd, path):
        """Create a bundle of all refs of the repository in ``wd`` at ``path``.

        An empty repository is not an error: Git refuses to create an empty
        bundle and this is only logged as a warning.

        Args:
            wd (str): Path to the local Git repository.
            path (str): Destination bundle file.

        Raises:
            ValueError: If ``wd`` is empty.
            GitCommandError: If the bundle command fails for another reason.
        """
        norm_path = os.path.normpath(path).replace("\\", "/")
        cmd = ["git", "bundle", "create", norm_path, "--all"]
        self.logger.info(f"Creating bundle: {norm_path}")
        try:
            res = self.log_and_execute(cmd, wd, check=False)
            if res.returncode != 0:
                err = res.stderr.lower() if res.stderr else ""
                if "refusing to create empty bundle" in err:
                    self.logger.warning(f"Empty bundle skipped for '{wd}'.")
                    return
                raise GitCommandError(
                    f"Bundle cmd failed code {res.returncode}", cmd, res.stderr
                )
            # Git runs inside wd, so a relative bundle path is relative to wd,
            # not to the current directory of this Python process.
            if os.path.isabs(norm_path):
                bundle_file = norm_path
            else:
                bundle_file = os.path.join(os.path.abspath(wd), norm_path)
            if not os.path.exists(bundle_file) or os.path.getsize(bundle_file) == 0:
                self.logger.warning(f"Bundle file missing/empty: {norm_path}")
            else:
                self.logger.info(f"Bundle created: '{norm_path}'.")
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Failed create bundle for '{wd}': {e}")
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected bundle error for '{wd}': {e}")
            raise GitCommandError(f"Unexpected bundle error: {e}", cmd) from e

    def fetch_from_git_bundle(self, wd, path):
        """Fetch from the bundle at ``path`` and merge ``FETCH_HEAD`` (no fast-forward).

        Args:
            wd (str): Path to the local Git repository.
            path (str): Bundle file to fetch from.

        Raises:
            ValueError: If ``wd`` is empty.
            GitCommandError: If the fetch fails, or the merge fails or conflicts.
        """
        norm_path = os.path.normpath(path).replace("\\", "/")
        self.logger.info(f"Fetching from '{norm_path}' into '{wd}'")
        fetch_cmd = ["git", "fetch", norm_path]
        merge_cmd = ["git", "merge", "FETCH_HEAD", "--no-ff"]
        try:
            self.logger.debug("Executing fetch...")
            self.log_and_execute(fetch_cmd, wd, check=True)
            self.logger.info("Fetch successful.")
            self.logger.debug("Executing merge...")
            merge_res = self.log_and_execute(merge_cmd, wd, check=False)
            out = merge_res.stdout.strip() or ""
            err = merge_res.stderr.strip() or ""
            if merge_res.returncode == 0:
                if "already up to date" in out.lower():
                    self.logger.info("Already up-to-date.")
                else:
                    self.logger.info("Merge successful.")
                return
            if "conflict" in (err + out).lower():
                msg = f"Merge conflict fetching. Resolve in '{wd}' and commit."
            else:
                msg = f"Merge failed code {merge_res.returncode}."
            self.logger.error(msg)
            raise GitCommandError(msg, merge_cmd, merge_res.stderr)
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Fetch/merge error for '{wd}': {e}")
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected fetch/merge error for '{wd}': {e}")
            raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e

    def git_commit(self, wd, msg="Autocommit"):
        """Stage everything under ``wd`` and commit it.

        Args:
            wd (str): Path to the local Git repository.
            msg (str): Commit message.

        Returns:
            bool: True if a commit was created, False if there was nothing to commit.

        Raises:
            ValueError: If ``wd`` is empty.
            GitCommandError: If staging or committing fails.
        """
        self.logger.info(f"Attempting commit in '{wd}': '{msg}'")
        try:
            add_cmd = ["git", "add", "."]
            self.logger.debug("Staging...")
            self.log_and_execute(add_cmd, wd, check=True)
            self.logger.debug("Staged.")
            commit_cmd = ["git", "commit", "-m", msg]
            self.logger.debug("Committing...")
            res = self.log_and_execute(commit_cmd, wd, check=False)
            out_low = res.stdout.lower() if res.stdout else ""
            err_low = res.stderr.lower() if res.stderr else ""
            if res.returncode == 0:
                self.logger.info("Commit successful.")
                return True
            nothing_markers = ("nothing to commit", "no changes added", "nothing added")
            silent_exit_1 = res.returncode == 1 and not err_low and not out_low
            if silent_exit_1 or any(marker in out_low for marker in nothing_markers):
                self.logger.info("Nothing to commit.")
                return False
            raise GitCommandError(
                f"Commit failed code {res.returncode}.", commit_cmd, res.stderr
            )
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Commit process error: {e}")
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected commit error: {e}")
            raise GitCommandError(f"Unexpected commit error: {e}") from e

    def git_status_has_changes(self, wd):
        """Return True when ``git status --porcelain`` reports anything in ``wd``.

        Raises:
            ValueError: If ``wd`` is empty.
            GitCommandError: If the status command fails.
        """
        self.logger.debug(f"Checking status in '{wd}'...")
        try:
            res = self.log_and_execute(["git", "status", "--porcelain"], wd, check=True)
            changed = bool(res.stdout.strip())
            self.logger.debug(f"Has changes: {changed}")
            return changed
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Status check error: {e}")
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected status error: {e}")
            raise GitCommandError(f"Unexpected status error: {e}") from e

    def list_tags(self, wd):
        """List tags with their subject line, newest first.

        Args:
            wd (str): Path to the local Git repository.

        Returns:
            list: ``(name, subject)`` tuples; ``"(No subject)"`` is used when
            the output row has no subject column. Empty list on any error.
        """
        self.logger.info(f"Listing tags with subjects in '{wd}'...")
        fmt = "%(refname:short)%09%(contents:subject)"
        cmd = ["git", "tag", "--list", f"--format={fmt}", "--sort=-creatordate"]
        tags = []
        try:
            res = self.log_and_execute(cmd, wd, check=True)
            for line in res.stdout.splitlines():
                if not line.strip():
                    continue
                # Name and subject are separated by the tab emitted for %09.
                name, sep, subject = line.partition("\t")
                tags.append((name.strip(), subject.strip() if sep else "(No subject)"))
            self.logger.info(f"Found {len(tags)} tags.")
            self.logger.debug(f"Tags: {tags}")
            return tags
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Error listing tags: {e}")
            return []
        except Exception as e:
            self.logger.exception(f"Unexpected error listing tags: {e}")
            return []

    def create_tag(self, wd, name, message):
        """Create an annotated tag.

        Args:
            wd (str): Path to the local Git repository.
            name (str): Tag name; validated before Git is invoked.
            message (str): Annotation message.

        Raises:
            ValueError: If name/message is empty or the name is malformed.
            GitCommandError: If the tag already exists or the command fails.
        """
        self.logger.info(f"Creating tag '{name}' in '{wd}'")
        if not name or not message:
            raise ValueError("Tag name/message empty.")
        pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.)$"
        # fullmatch: a bare '$' would also accept a trailing newline.
        # A leading '-' would be parsed by Git as an option, not a name.
        if name.startswith("-") or not re.fullmatch(pattern, name):
            raise ValueError(f"Invalid tag name: '{name}'.")
        cmd = ["git", "tag", "-a", name, "-m", message]
        try:
            self.log_and_execute(cmd, wd, check=True)
            self.logger.info(f"Tag '{name}' created.")
        except GitCommandError as e:
            err = e.stderr.lower() if e.stderr else ""
            if "already exists" in err:
                msg = f"Tag '{name}' exists."
                self.logger.error(msg)
                raise GitCommandError(msg, cmd, e.stderr) from e
            self.logger.error(f"Failed create tag '{name}': {e}")
            raise
        except ValueError:
            # Keep argument errors as ValueError instead of wrapping them below.
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected tag error: {e}")
            raise GitCommandError(f"Unexpected tag error: {e}", cmd) from e

    def checkout_tag(self, wd, name):
        """Check out a tag, which normally leaves the repo in detached HEAD.

        Args:
            wd (str): Path to the local Git repository.
            name (str): Tag to check out.

        Returns:
            bool: True on success.

        Raises:
            ValueError: If ``name`` or ``wd`` is empty.
            GitCommandError: If the tag is not found or the checkout fails.
        """
        self.logger.info(f"Checking out tag '{name}' in '{wd}'...")
        if not name:
            raise ValueError("Tag name empty.")
        cmd = ["git", "checkout", name]
        try:
            res = self.log_and_execute(cmd, wd, check=True)
            self.logger.info(f"Checked out tag '{name}'.")
            output = (res.stderr + res.stdout).lower()
            if "detached head" in output:
                self.logger.warning("Repo in 'detached HEAD'.")
            return True
        except GitCommandError as e:
            err = e.stderr.lower() if e.stderr else ""
            patterns = ["did not match any file(s)", f"pathspec '{name.lower()}'"]
            if any(p in err for p in patterns):
                msg = f"Tag '{name}' not found."
                self.logger.error(msg)
                raise GitCommandError(msg, cmd, e.stderr) from e
            self.logger.error(f"Failed checkout tag '{name}': {e}")
            raise
        except ValueError:
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected checkout error: {e}")
            raise GitCommandError(f"Unexpected checkout error: {e}", cmd) from e

    # --- Branch management ---

    def get_current_branch(self, working_directory):
        """Return the current branch name, or a marker when there is none.

        Args:
            working_directory (str): Path to the local Git repository.

        Returns:
            str: The branch name, ``'(DETACHED HEAD)'`` when HEAD is not on a
            branch, or ``'<Error>'`` when the branch cannot be determined.
        """
        self.logger.debug(f"Getting current branch in '{working_directory}'...")
        try:
            # `git branch --show-current` is the simplest query (Git 2.22+).
            shown = self.log_and_execute(
                ["git", "branch", "--show-current"], working_directory, check=False
            )
            branch_name = shown.stdout.strip() if shown.returncode == 0 else ""
            if branch_name:
                self.logger.debug(f"Current branch (show-current): {branch_name}")
                return branch_name

            # Older Git, detached HEAD, or a failure: ask symbolic-ref.
            self.logger.debug("--show-current failed or empty, trying symbolic-ref.")
            ref = self.log_and_execute(
                ["git", "symbolic-ref", "--quiet", "--short", "HEAD"],
                working_directory,
                check=False,
            )
            branch_name = ref.stdout.strip() if ref.returncode == 0 else ""
            if branch_name:
                self.logger.debug(f"Current branch (symbolic-ref): {branch_name}")
                return branch_name

            # With --quiet, a detached HEAD is reported as a silent exit 1.
            # Exit 128 is Git's generic fatal error (e.g. not a repository),
            # so it must not be taken for a detached HEAD.
            if ref.returncode == 1:
                self.logger.warning("HEAD is not on a branch (detached HEAD).")
                return "(DETACHED HEAD)"
            self.logger.warning(
                "Could not determine current branch name "
                f"(symbolic-ref exit: {ref.returncode})."
            )
            return "<Error>"
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Error getting current branch: {e}")
            return "<Error>"
        except Exception as e:
            self.logger.exception(f"Unexpected error getting current branch: {e}")
            return "<Error>"

    def list_branches(self, working_directory):
        """List local branch names.

        Args:
            working_directory (str): Path to the local Git repository.

        Returns:
            list: Local branch names (str). Empty on error.
        """
        self.logger.info(f"Listing local branches in '{working_directory}'...")
        cmd = ["git", "branch", "--list", "--no-color"]
        branches = []
        try:
            result = self.log_and_execute(cmd, working_directory, check=True)
            for line in result.stdout.splitlines():
                entry = line.strip()
                # '*' marks the current branch, '+' one checked out in
                # another worktree; neither is part of the name.
                if entry[:2] in ("* ", "+ "):
                    entry = entry[2:].strip()
                # Branch names cannot contain spaces, so anything that does is
                # a pseudo-entry such as "(HEAD detached at ...)" or
                # "(no branch, rebasing ...)".
                if not entry or " " in entry:
                    continue
                branches.append(entry)
            self.logger.info(f"Found {len(branches)} local branches.")
            self.logger.debug(f"Branches: {branches}")
            return branches
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Error listing branches: {e}")
            return []
        except Exception as e:
            self.logger.exception(f"Unexpected error listing branches: {e}")
            return []

    def create_branch(self, working_directory, branch_name, start_point=None):
        """Create a new local branch.

        Args:
            working_directory (str): Path to the local Git repository.
            branch_name (str): Name for the new branch; validated first.
            start_point (str, optional): Commit, tag, or branch to start from.
                When omitted, Git starts the branch at the current HEAD.

        Raises:
            ValueError: If the branch name is empty or malformed.
            GitCommandError: If the branch exists or the command fails.
        """
        self.logger.info(f"Creating branch '{branch_name}' in '{working_directory}'...")
        if not branch_name:
            raise ValueError("Branch name cannot be empty.")
        pattern = r"^(?![./]|.*([./]{2,}|[.]$|[/]$|@\{))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.lock)$"
        # fullmatch: a bare '$' would also accept a trailing newline.
        # A leading '-' would be parsed by Git as an option, not a name.
        if branch_name.startswith("-") or not re.fullmatch(pattern, branch_name):
            raise ValueError(f"Invalid branch name format: '{branch_name}'.")
        cmd = ["git", "branch", branch_name]
        if start_point:
            cmd.append(start_point)
            self.logger.info(f"Starting branch from: {start_point}")
        try:
            self.log_and_execute(cmd, working_directory, check=True)
            self.logger.info(f"Branch '{branch_name}' created successfully.")
        except GitCommandError as e:
            err = e.stderr.lower() if e.stderr else ""
            if "already exists" in err:
                msg = f"Branch '{branch_name}' already exists."
                self.logger.error(msg)
                raise GitCommandError(msg, cmd, e.stderr) from e
            self.logger.error(f"Failed to create branch '{branch_name}': {e}")
            raise
        except ValueError:
            # Keep argument errors as ValueError instead of wrapping them below.
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected error creating branch: {e}")
            raise GitCommandError(f"Unexpected branch error: {e}", cmd) from e

    def checkout_branch(self, working_directory, branch_name):
        """Switch to an existing local branch using ``git switch``.

        Args:
            working_directory (str): Path to the local Git repository.
            branch_name (str): Branch to switch to.

        Returns:
            bool: True on success.

        Raises:
            ValueError: If ``branch_name`` or ``working_directory`` is empty.
            GitCommandError: If the branch is not found, local changes block
                the switch, or the command fails otherwise.
        """
        self.logger.info(f"Switching to branch '{branch_name}' in '{working_directory}'...")
        if not branch_name:
            raise ValueError("Branch name cannot be empty.")
        cmd = ["git", "switch", branch_name]
        try:
            self.log_and_execute(cmd, working_directory, check=True)
            self.logger.info(f"Switched successfully to branch '{branch_name}'.")
            return True
        except GitCommandError as e:
            err = e.stderr.lower() if e.stderr else ""
            if "invalid reference" in err or "did not match" in err:
                msg = f"Branch '{branch_name}' not found or invalid."
            elif "local changes" in err or "would be overwritten" in err:
                msg = (
                    "Checkout failed due to uncommitted changes conflicting "
                    f"with switch to '{branch_name}'."
                )
            else:
                self.logger.error(f"Failed switch to branch '{branch_name}': {e}")
                raise
            self.logger.error(msg)
            raise GitCommandError(msg, cmd, e.stderr) from e
        except ValueError:
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected error switching branch: {e}")
            raise GitCommandError(f"Unexpected switch error: {e}", cmd) from e

    def delete_branch(self, working_directory, branch_name, force=False):
        """Delete a local branch.

        Args:
            working_directory (str): Path to the local Git repository.
            branch_name (str): Branch to delete.
            force (bool): Use ``-D`` (force) instead of ``-d``. Defaults to False.

        Returns:
            bool: True on success.

        Raises:
            ValueError: If ``branch_name`` or ``working_directory`` is empty.
            GitCommandError: If the branch is not found, not fully merged
                (without ``force``), currently checked out, or deletion fails.
        """
        self.logger.info(f"Deleting local branch '{branch_name}' in '{working_directory}'...")
        if not branch_name:
            raise ValueError("Branch name cannot be empty.")
        delete_flag = "-D" if force else "-d"
        cmd = ["git", "branch", delete_flag, branch_name]
        self.logger.info(f"Using delete flag: {delete_flag}")
        try:
            self.log_and_execute(cmd, working_directory, check=True)
            self.logger.info(f"Branch '{branch_name}' deleted successfully.")
            return True
        except GitCommandError as e:
            err = e.stderr.lower() if e.stderr else ""
            if "not found" in err:
                msg = f"Branch '{branch_name}' not found for deletion."
                self.logger.error(msg)
            elif "not fully merged" in err and not force:
                # Only '-d' reports this; the caller may retry with force=True.
                msg = f"Branch '{branch_name}' not fully merged. Use force delete?"
                self.logger.warning(msg)
            elif "cannot delete branch" in err and "checked out" in err:
                msg = f"Cannot delete branch '{branch_name}' because it is currently checked out."
                self.logger.error(msg)
            else:
                self.logger.error(f"Failed to delete branch '{branch_name}': {e}")
                raise
            raise GitCommandError(msg, cmd, e.stderr) from e
        except ValueError:
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected error deleting branch: {e}")
            raise GitCommandError(f"Unexpected delete error: {e}", cmd) from e