SXXXXXXX_GitUtility/git_commands.py

1097 lines
46 KiB
Python

# --- START OF FILE git_commands.py ---
# git_commands.py
import os
import subprocess
import logging
# logging is kept only for its level constants (e.g. logging.INFO); messages go through log_handler
import re
# Import the log queue handler
import log_handler
# --- Custom Exception Definition (unchanged) ---
class GitCommandError(Exception):
    """
    Custom exception for handling Git command execution errors.

    Carries the command that failed and the captured stderr, when available,
    and includes both in its string representation.

    Attributes:
        command: The command as passed in (list of parts, a string, or None).
        stderr: Captured stderr as text; always a ``str`` ("" when absent).
    """

    def __init__(self, message, command=None, stderr=None):
        """
        Initialize the GitCommandError.

        Args:
            message: Human-readable description; converted with str().
            command: Command that was executed, as a sequence of parts or a
                single string. Optional.
            stderr: Captured stderr. May be str, bytes or None; bytes are
                decoded as UTF-8 with replacement so that callers can always
                use string operations such as ``.lower()`` on the attribute.
        """
        super().__init__(str(message))
        self.command = command
        self.stderr = self._to_text(stderr)

    @staticmethod
    def _to_text(value):
        """Return *value* as text, mapping None/empty to "" and decoding bytes."""
        if not value:
            return ""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors="replace")
        return str(value)

    def __str__(self):
        """Return the message followed by command and stderr details, if any."""
        base_message = super().__str__()
        details = []
        if self.command:
            if isinstance(self.command, str):
                # A plain string must not be iterated character by character.
                command_str = self.command
            else:
                command_str = " ".join(str(part) for part in self.command)
            details.append(f"Command: '{command_str}'")
        stderr_str = self.stderr.strip()
        if stderr_str:
            details.append(f"Stderr: {stderr_str}")
        if not details:
            return base_message
        details_str = "; ".join(details)
        return f"{base_message} ({details_str})"
# --- Main Git Commands Class ---
class GitCommands:
"""
Manages Git command execution, logging (via log_handler), and error handling.
"""
# __init__ no longer stores a logger; all logging goes through log_handler
def __init__(self, logger_ignored=None):
    """
    Initialize the GitCommands instance.

    Args:
        logger_ignored: Accepted but never read or stored; the instance
            keeps no logger attribute.
    """
    init_message = "GitCommands initialized."
    log_handler.log_debug(init_message, func_name="__init__")
def log_and_execute(
    self,
    command: list,
    working_directory: str,
    check: bool = True,
    log_output_level: int = logging.INFO,
    timeout_seconds: float | None = 30,
):
    """
    Execute a command in a directory, log details via log_handler, wrap errors.

    Args:
        command (list): Command and arguments; every part is converted with str().
        working_directory (str): Directory to execute in. "." means the
            current working directory; any other value is made absolute.
        check (bool, optional): If True, a non-zero exit raises GitCommandError.
        log_output_level (int, optional): Standard logging level (e.g.
            logging.INFO, logging.DEBUG). stdout/stderr are always logged at
            DEBUG; when this level is above DEBUG they are logged a second
            time at this level.
        timeout_seconds (float | None, optional): Seconds to wait before the
            command is aborted. None disables the timeout. Defaults to 30.

    Returns:
        subprocess.CompletedProcess: Result object with text stdout/stderr.

    Raises:
        ValueError: If working_directory is None or empty.
        GitCommandError: If the directory does not exist, the command times
            out, exits non-zero with check=True, cannot be found, is not
            permitted, or fails in any other unexpected way.
    """
    func_name = "log_and_execute"

    def _decode(stream):
        """Return captured output as str; TimeoutExpired may carry bytes."""
        if not stream:
            return ""
        if isinstance(stream, (bytes, bytearray)):
            return bytes(stream).decode("utf-8", errors="replace")
        return stream

    safe_command_parts = [str(part) for part in command]
    command_str = " ".join(safe_command_parts)
    # Log the intent to execute before any validation.
    log_handler.log_debug(
        f"Executing in '{working_directory}': {command_str}", func_name=func_name
    )

    # --- Validate Working Directory ---
    if not working_directory:
        msg = "Working directory cannot be None or empty."
        log_handler.log_error(msg, func_name=func_name)
        raise ValueError(msg)
    if working_directory == ".":
        effective_cwd = os.getcwd()
    else:
        effective_cwd = os.path.abspath(working_directory)
    if not os.path.isdir(effective_cwd):
        msg = f"Working directory does not exist or is not a dir: {effective_cwd}"
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=safe_command_parts)
    log_handler.log_debug(f"Effective CWD: {effective_cwd}", func_name=func_name)

    # --- Execute Command ---
    try:
        # On Windows, hide the console window of the child process.
        startupinfo = None
        creationflags = 0
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = subprocess.SW_HIDE

        if timeout_seconds is None:
            log_handler.log_debug("No timeout set.", func_name=func_name)
        else:
            log_handler.log_debug(
                f"Setting timeout to {timeout_seconds} seconds.",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"Attempting subprocess.run for: {command_str}", func_name=func_name
        )
        result = subprocess.run(
            safe_command_parts,
            cwd=effective_cwd,
            capture_output=True,
            text=True,
            check=check,
            encoding="utf-8",
            errors="replace",
            timeout=timeout_seconds,
            startupinfo=startupinfo,
            creationflags=creationflags,
        )
        log_handler.log_debug(
            f"subprocess.run finished. RC={result.returncode}", func_name=func_name
        )

        # --- Log output ---
        # Reaching this point means RC == 0 or check is False (a non-zero RC
        # with check=True raises CalledProcessError above), so the output is
        # always logged here. Level filtering is left to log_handler.
        stdout_log_debug = result.stdout.strip() if result.stdout else "<no stdout>"
        stderr_log_debug = result.stderr.strip() if result.stderr else "<no stderr>"
        log_handler.log_debug(
            f"Command successful (RC={result.returncode}). Output:\n"
            f"--- stdout ---\n{stdout_log_debug}\n"
            f"--- stderr ---\n{stderr_log_debug}\n"
            f"--- End Output ---",
            func_name=func_name,
        )
        # Also log at the requested level (e.g. INFO) when it is above DEBUG.
        if log_output_level > logging.DEBUG:
            log_handler.log_message(
                log_output_level,
                f"Command successful. Output:\n"
                f"--- stdout ---\n{stdout_log_debug}\n"
                f"--- stderr ---\n{stderr_log_debug}\n"
                f"--- End Output ---",
                func_name=func_name,
            )
        return result

    # --- Error handling ---
    except subprocess.TimeoutExpired as e:
        log_handler.log_error(
            f"Command timed out after {timeout_seconds}s: {command_str}",
            func_name=func_name,
        )
        # Output attached to TimeoutExpired can be bytes even with text=True.
        stderr_text = _decode(e.stderr)
        stdout_text = _decode(e.stdout)
        stderr_out = stderr_text.strip() if stderr_text else "<no stderr>"
        stdout_out = stdout_text.strip() if stdout_text else "<no stdout>"
        log_handler.log_error(f"Timeout stderr: {stderr_out}", func_name=func_name)
        log_handler.log_error(f"Timeout stdout: {stdout_out}", func_name=func_name)
        raise GitCommandError(
            f"Timeout after {timeout_seconds}s.",
            command=safe_command_parts,
            stderr=stderr_text,
        ) from e
    except subprocess.CalledProcessError as e:
        log_handler.log_error(
            f"CalledProcessError caught. RC={e.returncode}", func_name=func_name
        )
        stderr_err = e.stderr.strip() if e.stderr else "<no stderr>"
        stdout_err = e.stdout.strip() if e.stdout else "<no stdout>"
        err_msg = (
            f"Command failed (RC {e.returncode}) in '{effective_cwd}'.\n"
            f"CMD: {command_str}\nSTDERR: {stderr_err}\nSTDOUT: {stdout_err}"
        )
        log_handler.log_error(err_msg, func_name=func_name)
        raise GitCommandError(
            f"Git command failed in '{effective_cwd}'.",
            command=safe_command_parts,
            stderr=e.stderr,
        ) from e
    except FileNotFoundError as e:
        log_handler.log_error(
            f"FileNotFoundError for command: {safe_command_parts[0]}",
            func_name=func_name,
        )
        error_msg = f"Command not found: '{safe_command_parts[0]}'. Is Git installed/in PATH?"
        log_handler.log_error(error_msg, func_name=func_name)
        raise GitCommandError(error_msg, command=safe_command_parts) from e
    except PermissionError as e:
        log_handler.log_error("PermissionError caught.", func_name=func_name)
        error_msg = f"Permission denied executing command in '{effective_cwd}'."
        log_handler.log_error(error_msg, func_name=func_name)
        raise GitCommandError(
            error_msg, command=safe_command_parts, stderr=str(e)
        ) from e
    except Exception as e:
        # log_exception is used here instead of log_error for the unexpected case.
        log_handler.log_exception(
            f"Unexpected Exception executing command: {e}", func_name=func_name
        )
        raise GitCommandError(
            f"Unexpected execution error: {e}", command=safe_command_parts
        ) from e
# --- Core Repo Operations (use log_handler) ---
def prepare_svn_for_git(self, working_directory: str):
    """
    Prepare a directory for Git: init a repo if needed and ignore ".svn".

    Runs "git init" when no ".git" entry exists in the directory, then makes
    sure ".gitignore" contains a ".svn" entry. An existing ".gitignore" is
    only ever appended to, never rewritten.

    Args:
        working_directory (str): Directory to prepare.

    Raises:
        ValueError: If working_directory is None or empty.
        GitCommandError: If the directory does not exist, "git init" fails,
            or an unexpected error occurs while handling ".gitignore".
        IOError: If ".gitignore" cannot be read or written.
    """
    func_name = "prepare_svn_for_git"
    log_handler.log_info(
        f"Preparing directory for Git: '{working_directory}'", func_name=func_name
    )
    if not working_directory:
        raise ValueError("Working directory cannot be None or empty.")
    if not os.path.isdir(working_directory):
        raise GitCommandError(
            f"Target directory does not exist: {working_directory}"
        )
    gitignore_path = os.path.join(working_directory, ".gitignore")
    git_dir_path = os.path.join(working_directory, ".git")
    svn_ignore = ".svn"

    if not os.path.exists(git_dir_path):
        log_handler.log_info(
            "No existing Git repo found. Initializing...", func_name=func_name
        )
        try:
            init_cmd = ["git", "init"]
            self.log_and_execute(init_cmd, working_directory, check=True)
            log_handler.log_info("Git repo initialized.", func_name=func_name)
        except (GitCommandError, ValueError) as e:
            log_handler.log_error(f"Failed init Git repo: {e}", func_name=func_name)
            raise
    else:
        log_handler.log_info(
            "Existing Git repo found. Skipping init.", func_name=func_name
        )

    log_handler.log_debug(
        f"Checking/updating .gitignore: {gitignore_path}", func_name=func_name
    )
    try:
        write_content = None  # None means nothing has to be written.
        mode = "w"
        if not os.path.exists(gitignore_path):
            log_handler.log_info(
                "'.gitignore' not found. Creating with .svn entry.",
                func_name=func_name,
            )
            write_content = f"{svn_ignore}\n"
        else:
            try:
                with open(gitignore_path, "r", encoding="utf-8") as f:
                    lines = f.readlines()
            except IOError as r_err:
                log_handler.log_error(
                    f"Could not read .gitignore: {r_err}", func_name=func_name
                )
                raise IOError(f"Failed read: {r_err}") from r_err
            entries = [line.strip() for line in lines]
            ignored = any(
                entry == svn_ignore or entry.startswith(svn_ignore + "/")
                for entry in entries
            )
            if ignored:
                log_handler.log_info(
                    "'.svn' already in .gitignore.", func_name=func_name
                )
            else:
                log_handler.log_info(
                    "'.svn' not found in .gitignore. Appending...",
                    func_name=func_name,
                )
                content = "".join(lines)
                # Only add a separator when the last line lacks a newline.
                needs_separator = bool(content) and not content.endswith("\n")
                write_content = ("\n" if needs_separator else "") + f"{svn_ignore}\n"
                # The file exists: always append so existing rules are kept.
                mode = "a"

        if write_content is not None:
            log_handler.log_debug(
                f"Writing to {gitignore_path} (mode: {mode})", func_name=func_name
            )
            try:
                with open(
                    gitignore_path, mode, encoding="utf-8", newline="\n"
                ) as f:
                    f.write(write_content)
                log_handler.log_info(
                    f"Updated '{gitignore_path}'.", func_name=func_name
                )
            except IOError as w_err:
                log_handler.log_error(
                    f"Error writing .gitignore: {w_err}", func_name=func_name
                )
                raise IOError(f"Failed write: {w_err}") from w_err
    except IOError as io_err:
        log_handler.log_error(
            f"IO error for .gitignore: {io_err}", func_name=func_name
        )
        raise
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error managing .gitignore: {e}", func_name=func_name
        )
        raise GitCommandError(f"Unexpected .gitignore error: {e}") from e

    log_handler.log_info(
        f"Directory preparation complete for '{working_directory}'.",
        func_name=func_name,
    )
def create_git_bundle(self, working_directory: str, bundle_path: str):
    """
    Create a Git bundle containing all refs of the repository.

    An empty repository (Git refuses to create an empty bundle) is logged as
    a warning and is not treated as an error.

    Args:
        working_directory (str): Repository to bundle.
        bundle_path (str): Destination bundle file; normalised and passed to
            Git with forward slashes.

    Raises:
        GitCommandError: If the bundle command fails for any reason other
            than an empty repository. Errors raised by the command runner are
            propagated unchanged so their stderr is preserved; any other
            exception is wrapped.
    """
    func_name = "create_git_bundle"
    norm_path = os.path.normpath(bundle_path)
    git_path = norm_path.replace("\\", "/")
    log_handler.log_info(
        f"Attempting to create Git bundle: {git_path}", func_name=func_name
    )
    log_handler.log_debug(f"Source repo: {working_directory}", func_name=func_name)
    command = ["git", "bundle", "create", git_path, "--all"]
    try:
        result = self.log_and_execute(command, working_directory, check=False)
        if result.returncode != 0:
            stderr_low = result.stderr.lower() if result.stderr else ""
            if (
                "refusing to create empty bundle" in stderr_low
                or "does not contain any references" in stderr_low
            ):
                log_handler.log_warning(
                    f"Bundle creation skipped: Repo '{working_directory}' empty.",
                    func_name=func_name,
                )
            else:
                err_msg = f"Git bundle command failed (RC {result.returncode})."
                log_handler.log_error(
                    f"{err_msg} Stderr: {result.stderr.strip()}",
                    func_name=func_name,
                )
                raise GitCommandError(
                    err_msg, command=command, stderr=result.stderr
                )
        elif not os.path.exists(norm_path) or os.path.getsize(norm_path) == 0:
            log_handler.log_warning(
                f"Bundle cmd OK, but file '{norm_path}' missing/empty.",
                func_name=func_name,
            )
        else:
            log_handler.log_info(
                f"Bundle created successfully: '{norm_path}'.",
                func_name=func_name,
            )
    except GitCommandError:
        # Already logged where it was raised; re-wrapping it as "unexpected"
        # would hide the original message and drop its stderr.
        raise
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected bundle creation error: {e}", func_name=func_name
        )
        raise GitCommandError(
            f"Unexpected bundle error: {e}", command=command
        ) from e
def fetch_from_git_bundle(self, working_directory: str, bundle_path: str):
    """
    Fetch from a bundle file and merge FETCH_HEAD into the current branch.

    Args:
        working_directory (str): Repository receiving the changes.
        bundle_path (str): Path of the bundle file to fetch from.

    Raises:
        FileNotFoundError: If bundle_path is not an existing file.
        GitCommandError: If the fetch fails, the merge reports a conflict or
            fails otherwise, or an unexpected error occurs.
        ValueError: Propagated from the command runner.
    """
    func_name = "fetch_from_git_bundle"
    if not os.path.isfile(bundle_path):
        raise FileNotFoundError(f"Bundle file not found: {bundle_path}")
    norm_path = os.path.normpath(bundle_path).replace("\\", "/")
    log_handler.log_info(
        f"Fetching from '{norm_path}' into '{working_directory}'",
        func_name=func_name,
    )
    fetch_cmd = ["git", "fetch", norm_path, "--verbose"]
    merge_cmd = ["git", "merge", "FETCH_HEAD", "--no-ff", "--no-edit"]
    try:
        log_handler.log_debug("Executing fetch...", func_name=func_name)
        self.log_and_execute(fetch_cmd, working_directory, check=True)
        log_handler.log_info("Fetch successful.", func_name=func_name)

        log_handler.log_debug("Executing merge...", func_name=func_name)
        merge_res = self.log_and_execute(merge_cmd, working_directory, check=False)
        out_log = merge_res.stdout.strip() if merge_res.stdout else ""
        err_log = merge_res.stderr.strip() if merge_res.stderr else ""
        combined_low = (out_log + err_log).lower()

        if merge_res.returncode != 0:
            is_conflict = (
                "conflict" in combined_low
                or "automatic merge failed" in combined_low
            )
            if is_conflict:
                msg = f"Merge conflict occurred. Resolve manually in '{working_directory}' and commit."
                log_handler.log_error(msg, func_name=func_name)
            else:
                msg = f"Merge command failed (RC {merge_res.returncode})."
                log_handler.log_error(
                    f"{msg} Stderr: {err_log}", func_name=func_name
                )
            raise GitCommandError(msg, command=merge_cmd, stderr=merge_res.stderr)

        if "already up to date" in combined_low:
            log_handler.log_info("Repo already up-to-date.", func_name=func_name)
        else:
            log_handler.log_info("Merge successful.", func_name=func_name)
            log_handler.log_debug(f"Merge stdout:\n{out_log}", func_name=func_name)
    except (GitCommandError, ValueError, FileNotFoundError) as e:
        # Known failure types are logged and passed on unchanged.
        log_handler.log_error(
            f"Fetch/merge failed for '{working_directory}': {e}",
            func_name=func_name,
        )
        raise
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected fetch/merge error for '{working_directory}': {e}",
            func_name=func_name,
        )
        raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e
# --- Commit and Status ---
def git_commit(self, working_directory: str, message: str):
    """
    Stage all changes ("git add .") and commit them with the given message.

    Args:
        working_directory (str): Repository to commit in.
        message (str): Commit message; must contain non-whitespace text.

    Returns:
        bool: True if a commit was created, False if there was nothing to
        commit.

    Raises:
        ValueError: If message is None, empty or only whitespace.
        GitCommandError: If staging fails or the commit fails for a reason
            other than "nothing to commit".
    """
    func_name = "git_commit"
    # Validate before touching the message: slicing None would raise
    # TypeError instead of the intended ValueError.
    if not message or message.isspace():
        raise ValueError("Commit message empty.")
    log_handler.log_info(
        f"Attempting commit in '{working_directory}' msg: '{message[:50]}...'",
        func_name=func_name,
    )
    try:
        add_cmd = ["git", "add", "."]
        log_handler.log_debug(
            "Staging all changes ('git add .')...", func_name=func_name
        )
        self.log_and_execute(add_cmd, working_directory, check=True)
        log_handler.log_debug("Staging successful.", func_name=func_name)

        commit_cmd = ["git", "commit", "-m", message]
        log_handler.log_debug("Attempting commit...", func_name=func_name)
        result = self.log_and_execute(commit_cmd, working_directory, check=False)
        out_low = result.stdout.lower() if result.stdout else ""
        err_low = result.stderr.lower() if result.stderr else ""
        combined_low = out_low + err_low

        if result.returncode == 0:
            log_handler.log_info("Commit successful.", func_name=func_name)
            return True
        # A non-zero exit with one of these messages only means that there
        # was nothing to commit.
        if (
            "nothing to commit" in combined_low
            or "no changes added to commit" in combined_low
            or "nothing added to commit" in combined_low
        ):
            log_handler.log_info(
                "No changes available to commit.", func_name=func_name
            )
            return False

        msg = f"Commit command failed unexpectedly (RC {result.returncode})."
        stderr_text = result.stderr.strip() if result.stderr else ""
        log_handler.log_error(f"{msg} Stderr: {stderr_text}", func_name=func_name)
        raise GitCommandError(msg, command=commit_cmd, stderr=result.stderr)
    except (GitCommandError, ValueError) as e:
        log_handler.log_error(
            f"Error during staging/commit: {e}", func_name=func_name
        )
        raise
def git_status_has_changes(self, working_directory: str):
    """
    Report whether "git status --porcelain" prints anything for the repo.

    Args:
        working_directory (str): Repository to inspect.

    Returns:
        bool: True if the porcelain status output is non-empty.

    Raises:
        GitCommandError: If the status command fails.
    """
    func_name = "git_status_has_changes"
    log_handler.log_debug(
        f"Checking status for changes in '{working_directory}'...",
        func_name=func_name,
    )
    status_cmd = ["git", "status", "--porcelain=v1", "-unormal"]
    try:
        status = self.log_and_execute(
            status_cmd,
            working_directory,
            check=True,
            log_output_level=logging.DEBUG,
        )
    except GitCommandError as e:
        log_handler.log_error(
            f"Git status check command failed: {e}", func_name=func_name
        )
        raise
    # Any non-whitespace output means at least one changed or untracked path.
    has_changes = bool(status.stdout.strip())
    log_handler.log_debug(
        f"Status check complete. Has changes: {has_changes}",
        func_name=func_name,
    )
    return has_changes
# --- Tag Management ---
def list_tags(self, working_directory: str):
    """
    List tags as (name, subject) tuples, sorted by "-creatordate".

    Args:
        working_directory (str): Repository to inspect.

    Returns:
        list[tuple[str, str]]: One tuple per tag. The subject is
        "(No subject/Lightweight)" when Git prints none. An empty list is
        returned on any error (errors are logged, not raised).
    """
    func_name = "list_tags"
    log_handler.log_info(
        f"Listing tags in '{working_directory}'...", func_name=func_name
    )
    # %09 is a tab, used to separate the tag name from its subject.
    fmt = "%(refname:short)%09%(contents:subject)"
    cmd = ["git", "tag", "--list", f"--format={fmt}", "--sort=-creatordate"]
    tags = []
    try:
        listing = self.log_and_execute(cmd, working_directory, check=True)
        for raw_line in listing.stdout.splitlines():
            entry = raw_line.strip()
            if not entry:
                continue
            raw_name, _, raw_subject = entry.partition("\t")
            subject = raw_subject.strip() or "(No subject/Lightweight)"
            tags.append((raw_name.strip(), subject))
        log_handler.log_info(f"Found {len(tags)} tags.", func_name=func_name)
        log_handler.log_debug(f"Tags found: {tags}", func_name=func_name)
        return tags
    except GitCommandError as e:
        log_handler.log_error(f"Error listing tags: {e}", func_name=func_name)
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error listing tags: {e}", func_name=func_name
        )
    return []
def create_tag(self, working_directory: str, tag_name: str, message: str):
    """
    Create an annotated tag ("git tag -a") with the given message.

    Args:
        working_directory (str): Repository to tag.
        tag_name (str): Name of the new tag; checked against a name pattern.
        message (str): Annotation message; must not be blank.

    Raises:
        ValueError: If the name or message is blank, or the name does not
            match the allowed pattern.
        GitCommandError: If the tag already exists or the command fails.
    """
    func_name = "create_tag"
    log_handler.log_info(
        f"Creating tag '{tag_name}' in '{working_directory}'", func_name=func_name
    )
    if not tag_name or tag_name.isspace():
        raise ValueError("Tag name empty.")
    if not message or message.isspace():
        raise ValueError("Tag message empty.")
    pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$|@\{|\\\\))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.)$"
    if re.match(pattern, tag_name) is None:
        raise ValueError(f"Invalid tag name format: '{tag_name}'.")

    cmd = ["git", "tag", "-a", tag_name, "-m", message]
    try:
        self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(
            f"Annotated tag '{tag_name}' created.", func_name=func_name
        )
    except GitCommandError as e:
        stderr_low = e.stderr.lower() if e.stderr else ""
        if "already exists" not in stderr_low:
            log_handler.log_error(
                f"Failed create tag '{tag_name}': {e}", func_name=func_name
            )
            raise
        # Replace the generic failure with a clearer message.
        msg = f"Tag '{tag_name}' already exists."
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
def checkout_tag(self, working_directory: str, tag_name: str):
    """
    Check out the given tag with "git checkout".

    Args:
        working_directory (str): Repository to operate on.
        tag_name (str): Tag to check out; must not be blank.

    Returns:
        bool: True on success. A warning is logged when Git's output
        mentions "detached HEAD".

    Raises:
        ValueError: If tag_name is blank.
        GitCommandError: If the tag is not found or the checkout fails.
    """
    func_name = "checkout_tag"
    log_handler.log_info(
        f"Checking out tag '{tag_name}' in '{working_directory}'...",
        func_name=func_name,
    )
    if not tag_name or tag_name.isspace():
        raise ValueError("Tag name empty.")
    cmd = ["git", "checkout", tag_name]
    try:
        result = self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(
            f"Successfully checked out tag '{tag_name}'.", func_name=func_name
        )
        combined_output = (result.stderr + result.stdout).lower()
        if "detached head" in combined_output:
            log_handler.log_warning(
                "Repo is now in 'detached HEAD' state.", func_name=func_name
            )
        return True
    except GitCommandError as e:
        stderr_low = e.stderr.lower() if e.stderr else ""
        # stderr fragments that mean the tag name could not be resolved.
        not_found_markers = (
            "did not match any file(s)",
            f"pathspec '{tag_name.lower()}' did not match",
            "reference is not a tree",
        )
        tag_missing = any(marker in stderr_low for marker in not_found_markers)
        if not tag_missing:
            log_handler.log_error(
                f"Failed checkout tag '{tag_name}': {e}", func_name=func_name
            )
            raise
        msg = f"Tag '{tag_name}' not found or invalid."
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
# --- Branch Management ---
def list_branches(self, working_directory: str):
    """
    List local branches and identify the current one.

    Args:
        working_directory (str): Repository to inspect.

    Returns:
        tuple[list[str], str | None]: Sorted branch names and the name of
        the entry Git marks with "*", or None when no entry is marked.
        ([], None) is returned on any error (errors are logged, not raised).
    """
    func_name = "list_branches"
    log_handler.log_info(
        f"Listing local branches in '{working_directory}'...", func_name=func_name
    )
    # %(HEAD) prints "*" in front of the current branch.
    fmt = "%(HEAD)%(refname:short)"
    cmd = ["git", "branch", "--list", f"--format={fmt}"]
    branches = []
    current = None
    try:
        listing = self.log_and_execute(cmd, working_directory, check=True)
        for raw_line in listing.stdout.splitlines():
            entry = raw_line.strip()
            if not entry:
                continue
            name = entry.lstrip("* ").strip()
            branches.append(name)
            if entry.startswith("*"):
                current = name
        branches.sort()
        curr_disp = current or "None (Detached?)"
        log_handler.log_info(
            f"Found {len(branches)} branches. Current: {curr_disp}",
            func_name=func_name,
        )
        log_handler.log_debug(f"Branches: {branches}", func_name=func_name)
        return branches, current
    except GitCommandError as e:
        log_handler.log_error(f"Error listing branches: {e}", func_name=func_name)
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error listing branches: {e}", func_name=func_name
        )
    return [], None
def checkout_branch(self, working_directory: str, branch_name: str):
    """
    Check out an existing branch with "git checkout".

    Args:
        working_directory (str): Repository to operate on.
        branch_name (str): Branch to check out; must not be blank.

    Returns:
        bool: True on success.

    Raises:
        ValueError: If branch_name is blank.
        GitCommandError: If the branch is not found or the checkout fails.
    """
    func_name = "checkout_branch"
    log_handler.log_info(
        f"Checking out branch '{branch_name}' in '{working_directory}'...",
        func_name=func_name,
    )
    if not branch_name or branch_name.isspace():
        raise ValueError("Branch name empty.")
    cmd = ["git", "checkout", branch_name]
    try:
        self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(
            f"Successfully checked out branch '{branch_name}'.", func_name=func_name
        )
        return True
    except GitCommandError as e:
        stderr_low = e.stderr.lower() if e.stderr else ""
        # stderr fragments that mean the branch name could not be resolved.
        not_found_markers = (
            "did not match any file(s)",
            f"pathspec '{branch_name.lower()}' did not match",
            "is not a commit and a branch",
        )
        branch_missing = any(marker in stderr_low for marker in not_found_markers)
        if not branch_missing:
            log_handler.log_error(
                f"Failed checkout branch '{branch_name}': {e}", func_name=func_name
            )
            raise
        msg = f"Branch '{branch_name}' not found or invalid."
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
def create_branch(self, working_directory: str, new_branch_name: str):
    """
    Create a new local branch with "git branch" (without checking it out).

    Args:
        working_directory (str): Repository to operate on.
        new_branch_name (str): Name of the branch; checked against a name
            pattern and must not be "HEAD" (case-insensitive).

    Returns:
        bool: True on success.

    Raises:
        ValueError: If the name is blank or fails validation.
        GitCommandError: If the branch already exists or the command fails.
    """
    func_name = "create_branch"
    log_handler.log_info(
        f"Creating branch '{new_branch_name}' in '{working_directory}'...",
        func_name=func_name,
    )
    if not new_branch_name or new_branch_name.isspace():
        raise ValueError("Branch name empty.")
    pattern = (
        r"^(?!\.| |.*[/.]\.|\.|.*\\|.*@\{|.*[/]$|.*\.\.)[^ \t\n\r\f\v~^:?*[\\]+$"
    )
    name_is_valid = (
        re.match(pattern, new_branch_name) is not None
        and new_branch_name.lower() != "head"
    )
    if not name_is_valid:
        raise ValueError(f"Invalid branch name format: '{new_branch_name}'.")

    cmd = ["git", "branch", new_branch_name]
    try:
        self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(
            f"Branch '{new_branch_name}' created.", func_name=func_name
        )
        return True
    except GitCommandError as e:
        stderr_low = e.stderr.lower() if e.stderr else ""
        if "already exists" not in stderr_low:
            log_handler.log_error(
                f"Failed create branch '{new_branch_name}': {e}",
                func_name=func_name,
            )
            raise
        msg = f"Branch '{new_branch_name}' already exists."
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
# --- History / Log ---
def get_commit_log(
    self, working_directory: str, max_count: int = 200, branch: str | None = None
):
    """
    Return formatted "git log" lines for one branch or for all refs.

    Args:
        working_directory (str): Repository to inspect.
        max_count (int, optional): Maximum number of commits requested.
        branch (str | None, optional): Revision to log; when falsy, "--all"
            is used instead.

    Returns:
        list[str]: Non-empty, stripped output lines. An empty list is
        returned on any error (errors are logged, not raised).
    """
    func_name = "get_commit_log"
    scope = f"'{branch}'" if branch else "all"
    log_handler.log_info(
        f"Retrieving commit log for {scope} in '{working_directory}' (max {max_count})...",
        func_name=func_name,
    )
    fmt = "%h %ad | %an |%d %s"
    date_fmt = "format:%Y-%m-%d %H:%M"
    # The last argument selects either the requested revision or every ref.
    cmd = [
        "git",
        "log",
        f"--max-count={max_count}",
        f"--pretty=format:{fmt}",
        f"--date={date_fmt}",
        "--decorate=short",
        "--source",
        branch if branch else "--all",
    ]
    try:
        result = self.log_and_execute(
            cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        entries = []
        for raw_line in result.stdout.splitlines():
            stripped = raw_line.strip()
            if stripped:
                entries.append(stripped)
        log_handler.log_info(
            f"Retrieved {len(entries)} log entries for {scope}.", func_name=func_name
        )
        return entries
    except GitCommandError as e:
        log_handler.log_error(
            f"Error retrieving commit log for {scope}: {e}", func_name=func_name
        )
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error retrieving commit log for {scope}: {e}",
            func_name=func_name,
        )
    return []
# --- Clone from Bundle ---
def clone_from_bundle(self, bundle_path: str, destination_directory: str):
    """
    Clone a repository from a bundle file into an empty or new directory.

    Args:
        bundle_path (str): Path of the bundle file.
        destination_directory (str): Target directory; created when missing
            and required to be empty when it already exists.

    Returns:
        bool: True on success.

    Raises:
        ValueError: If either argument is empty.
        FileNotFoundError: If the bundle file does not exist.
        IOError: If the destination is not a directory, is not empty, or
            cannot be created.
        GitCommandError: If the clone fails. An empty destination directory
            is removed afterwards on a best-effort basis.
    """
    func_name = "clone_from_bundle"
    log_handler.log_info(
        f"Cloning from '{bundle_path}' into '{destination_directory}'",
        func_name=func_name,
    )
    if not bundle_path:
        raise ValueError("Bundle path empty.")
    if not destination_directory:
        raise ValueError("Destination empty.")
    abs_bundle = os.path.abspath(bundle_path)
    abs_dest = os.path.abspath(destination_directory)

    if not os.path.isfile(abs_bundle):
        msg = f"Bundle not found: {abs_bundle}"
        log_handler.log_error(msg, func_name=func_name)
        raise FileNotFoundError(msg)

    if not os.path.exists(abs_dest):
        log_handler.log_debug(
            f"Dest dir does not exist. Creating: {abs_dest}", func_name=func_name
        )
        try:
            os.makedirs(abs_dest)
            log_handler.log_info(
                f"Created dest dir: {abs_dest}", func_name=func_name
            )
        except OSError as e:
            msg = f"Failed create dest dir '{abs_dest}': {e}"
            log_handler.log_error(msg, func_name=func_name)
            raise IOError(msg) from e
    else:
        problem = None
        if not os.path.isdir(abs_dest):
            problem = f"Dest exists but not dir: {abs_dest}"
        elif os.listdir(abs_dest):
            problem = f"Dest dir not empty: {abs_dest}"
        if problem is not None:
            log_handler.log_error(problem, func_name=func_name)
            raise IOError(problem)
        log_handler.log_debug("Dest dir exists and empty.", func_name=func_name)

    cmd = ["git", "clone", abs_bundle, abs_dest, "--progress"]
    try:
        # Both paths are absolute, so the clone runs from the current directory.
        self.log_and_execute(
            cmd, working_directory=".", check=True, log_output_level=logging.INFO
        )
        log_handler.log_info(
            f"Cloned successfully into '{abs_dest}'.", func_name=func_name
        )
        return True
    except GitCommandError as clone_e:
        log_handler.log_error(
            f"Failed clone from bundle '{abs_bundle}': {clone_e}",
            func_name=func_name,
        )
        # Best-effort cleanup: only an empty destination is removed.
        if os.path.isdir(abs_dest):
            try:
                if os.listdir(abs_dest):
                    log_handler.log_warning(
                        f"Dest dir not empty after failed clone: {abs_dest}",
                        func_name=func_name,
                    )
                else:
                    os.rmdir(abs_dest)
                    log_handler.log_info(
                        f"Removed empty dest dir after failed clone: {abs_dest}",
                        func_name=func_name,
                    )
            except OSError as rm_e:
                log_handler.log_warning(
                    f"Could not remove dest dir '{abs_dest}': {rm_e}",
                    func_name=func_name,
                )
        raise clone_e
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error cloning from bundle: {e}", func_name=func_name
        )
        raise GitCommandError(f"Unexpected clone error: {e}", command=cmd) from e
# --- Gitignore and File Tracking ---
def get_tracked_files(self, working_directory: str):
    """
    Return the paths printed by "git ls-files -z".

    Args:
        working_directory (str): Repository to inspect.

    Returns:
        list[str]: Non-empty NUL-separated entries of the command output.

    Raises:
        GitCommandError: If the command fails.
    """
    func_name = "get_tracked_files"
    log_handler.log_debug(
        f"Getting tracked files in '{working_directory}'", func_name=func_name
    )
    cmd = ["git", "ls-files", "-z"]
    try:
        result = self.log_and_execute(
            cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        # -z separates entries with NUL; drop the empty trailing piece.
        tracked = list(filter(None, result.stdout.split("\0")))
        log_handler.log_info(
            f"Found {len(tracked)} tracked files.", func_name=func_name
        )
        return tracked
    except GitCommandError as e:
        log_handler.log_error(
            f"Failed list tracked files: {e}", func_name=func_name
        )
        raise
def check_if_would_be_ignored(self, working_directory: str, path_to_check: str):
    """
    Ask "git check-ignore --no-index" whether a path matches an ignore rule.

    Args:
        working_directory (str): Repository to query.
        path_to_check (str): Path passed to check-ignore.

    Returns:
        bool: True when the command exits 0, False when it exits 1.

    Raises:
        GitCommandError: If the command exits with any other code or cannot
            be executed.
    """
    func_name = "check_if_would_be_ignored"
    cmd = ["git", "check-ignore", "--quiet", "--no-index", "--", path_to_check]
    log_handler.log_debug(
        f"Checking ignore status for: '{path_to_check}'", func_name=func_name
    )
    try:
        result = self.log_and_execute(cmd, working_directory, check=False)
        exit_code = result.returncode
        if exit_code not in (0, 1):
            msg = f"check-ignore failed (RC {exit_code})"
            log_handler.log_error(
                f"{msg}. Stderr: {result.stderr.strip()}", func_name=func_name
            )
            raise GitCommandError(msg, command=cmd, stderr=result.stderr)
        if exit_code == 0:
            log_handler.log_debug(
                f"Path '{path_to_check}' WOULD be ignored.", func_name=func_name
            )
            return True
        log_handler.log_debug(
            f"Path '{path_to_check}' would NOT be ignored.", func_name=func_name
        )
        return False
    except GitCommandError as e:
        log_handler.log_error(
            f"Failed check_ignore for '{path_to_check}': {e}", func_name=func_name
        )
        raise
def remove_from_tracking(self, working_directory: str, files_to_untrack: list[str]):
    """
    Remove paths from the index ("git rm --cached") in batches of 100.

    Args:
        working_directory (str): Repository to operate on.
        files_to_untrack (list[str]): Paths to remove from the index.

    Returns:
        bool: True when every batch succeeded or there was nothing to do.

    Raises:
        GitCommandError: On the first batch that fails; later batches are
            not attempted.
    """
    func_name = "remove_from_tracking"
    if not files_to_untrack:
        log_handler.log_debug("No files provided to untrack.", func_name=func_name)
        return True

    num = len(files_to_untrack)
    log_handler.log_info(
        f"Removing {num} items from tracking index...", func_name=func_name
    )
    batch_size = 100
    batch_starts = range(0, num, batch_size)
    for batch_num, start in enumerate(batch_starts, start=1):
        batch = files_to_untrack[start : start + batch_size]
        log_handler.log_info(
            f"Processing untrack batch {batch_num} ({len(batch)} items)...",
            func_name=func_name,
        )
        log_handler.log_debug(
            f"Batch {batch_num} items: {batch}", func_name=func_name
        )
        cmd = ["git", "rm", "--cached", "-r", "--ignore-unmatch", "--", *batch]
        try:
            self.log_and_execute(
                cmd, working_directory, check=True, log_output_level=logging.DEBUG
            )
            log_handler.log_info(
                f"Batch {batch_num} untracked successfully.", func_name=func_name
            )
        except GitCommandError as e:
            log_handler.log_error(
                f"Failed untrack batch {batch_num}: {e}", func_name=func_name
            )
            raise GitCommandError(
                f"Failed batch {batch_num}. Error: {e}",
                command=cmd,
                stderr=getattr(e, "stderr", None),
            ) from e

    # Reached only when no batch raised.
    log_handler.log_info("All untracking batches completed.", func_name=func_name)
    return True
def get_matching_gitignore_rule(self, working_directory: str, path_to_check: str):
    """
    Return the ignore pattern that matches a path, if any.

    Runs "git check-ignore -v --no-index" and parses the
    "<source>:<linenum>:<pattern><TAB><pathname>" line it prints.

    Args:
        working_directory (str): Repository to query.
        path_to_check (str): Path passed to check-ignore.

    Returns:
        str | None: The matching pattern, or None when the path is not
        ignored or the output cannot be parsed.

    Raises:
        GitCommandError: If the command exits with a code other than 0 or 1,
            or cannot be executed.
    """
    func_name = "get_matching_gitignore_rule"
    cmd = ["git", "check-ignore", "-v", "--no-index", "--", path_to_check]
    log_handler.log_debug(
        f"Getting matching rule for: '{path_to_check}'", func_name=func_name
    )
    try:
        result = self.log_and_execute(cmd, working_directory, check=False)
        if result.returncode == 0:
            line = result.stdout.strip()
            if not line or "\t" not in line:
                log_handler.log_warning(
                    f"Unexpected output from check-ignore -v: {line}",
                    func_name=func_name,
                )
                return None
            rule_part = line.split("\t", 1)[0]
            # Anchor on the first ":<digits>:" field instead of splitting on
            # the first two colons: the source path itself may contain a
            # colon (e.g. "C:/..." on Windows), and so may the pattern.
            parsed = re.match(r"(.*?):(\d+):(.*)", rule_part, re.DOTALL)
            if parsed is None:
                log_handler.log_warning(
                    f"Could not parse pattern from: {rule_part}",
                    func_name=func_name,
                )
                return None
            pattern = parsed.group(3)
            log_handler.log_debug(
                f"Path '{path_to_check}' matched rule: '{pattern}'",
                func_name=func_name,
            )
            return pattern
        if result.returncode == 1:
            log_handler.log_debug(
                f"Path '{path_to_check}' not ignored.", func_name=func_name
            )
            return None
        msg = f"check-ignore -v failed (RC {result.returncode})"
        log_handler.log_error(
            f"{msg}. Stderr: {result.stderr.strip()}", func_name=func_name
        )
        raise GitCommandError(msg, command=cmd, stderr=result.stderr)
    except GitCommandError as e:
        log_handler.log_error(
            f"Failed get_matching_rule for '{path_to_check}': {e}",
            func_name=func_name,
        )
        raise
def get_status_short(self, working_directory: str):
    """
    Return the entries printed by "git status --short -z --ignored=no".

    Args:
        working_directory (str): Repository to inspect.

    Returns:
        list[str]: Non-empty NUL-separated entries of the command output.
        An empty list is returned on any error (errors are logged, not
        raised).
    """
    func_name = "get_status_short"
    log_handler.log_debug(
        f"Getting short status for '{working_directory}'", func_name=func_name
    )
    cmd = ["git", "status", "--short", "-z", "--ignored=no"]
    try:
        result = self.log_and_execute(
            cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        # -z separates entries with NUL; drop empty pieces.
        entries = list(filter(None, result.stdout.split("\0")))
        log_handler.log_info(
            f"Status check returned {len(entries)} items.", func_name=func_name
        )
        return entries
    except GitCommandError as e:
        log_handler.log_error(f"Failed get status: {e}", func_name=func_name)
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error getting status: {e}", func_name=func_name
        )
    return []
def get_file_content_from_ref(
    self, working_directory: str, file_path: str, ref: str = "HEAD"
):
    """
    Return the content of a file at a given ref using "git show <ref>:<path>".

    Args:
        working_directory (str): Repository to query.
        file_path (str): Path of the file; OS separators are replaced by "/".
        ref (str, optional): Revision to read from. A falsy value produces
            the argument ":<path>". Defaults to "HEAD".

    Returns:
        str | None: The command's stdout on success; None when the file is
        not found at the ref or the command fails (failures are logged, not
        raised).
    """
    func_name = "get_file_content_from_ref"
    git_path = file_path.replace(os.path.sep, "/")
    ref_path_arg = (f"{ref}:" if ref else ":") + git_path
    log_handler.log_debug(
        f"Getting content for '{ref_path_arg}' in '{working_directory}'",
        func_name=func_name,
    )
    cmd = ["git", "show", ref_path_arg]

    def _reports_missing(stderr_text):
        """Return True when stderr carries one of the "no such file" messages."""
        single_markers = (
            "exists on disk, but not in",
            "does not exist in",
            "did not match any file(s)",
        )
        if any(marker in stderr_text for marker in single_markers):
            return True
        return "fatal: Path" in stderr_text and "does not exist" in stderr_text

    try:
        result = self.log_and_execute(
            cmd, working_directory, check=False, log_output_level=logging.DEBUG
        )
    except GitCommandError as e:
        log_handler.log_error(
            f"Error executing git show for '{ref_path_arg}': {e}",
            func_name=func_name,
        )
        return None

    if result.returncode == 0:
        return result.stdout
    if result.returncode == 128 and _reports_missing(result.stderr):
        log_handler.log_debug(
            f"File '{git_path}' not found in ref '{ref}'.", func_name=func_name
        )
        return None
    log_handler.log_error(
        f"git show failed for '{ref_path_arg}' (RC {result.returncode}). Stderr: {result.stderr.strip()}",
        func_name=func_name,
    )
    return None
def add_file(self, working_directory: str, file_path: str):
    """
    Stage a single path with "git add -- <path>".

    Args:
        working_directory (str): Repository to operate on.
        file_path (str): Path to stage; must not be blank.

    Returns:
        bool: True on success.

    Raises:
        ValueError: If file_path is blank.
        GitCommandError: If the path matches no files or the command fails.
    """
    func_name = "add_file"
    if not file_path or file_path.isspace():
        raise ValueError("File path empty.")
    log_handler.log_info(
        f"Adding path to staging: '{file_path}' in '{working_directory}'",
        func_name=func_name,
    )
    cmd = ["git", "add", "--", file_path]
    try:
        self.log_and_execute(
            cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        log_handler.log_info(
            f"Path '{file_path}' added successfully.", func_name=func_name
        )
        return True
    except GitCommandError as add_e:
        log_handler.log_error(
            f"Failed to add path '{file_path}': {add_e}", func_name=func_name
        )
        stderr_low = add_e.stderr.lower() if add_e.stderr else ""
        if "did not match any files" not in stderr_low:
            raise add_e
        # Replace the generic failure with a clearer pathspec message.
        raise GitCommandError(
            f"Pathspec '{file_path}' did not match any files.",
            command=cmd,
            stderr=add_e.stderr,
        ) from add_e
# --- END OF FILE git_commands.py ---