1835 lines
80 KiB
Python
1835 lines
80 KiB
Python
# --- START OF FILE git_commands.py ---
|
|
|
|
# git_commands.py
|
|
import os
|
|
import subprocess
|
|
import logging
|
|
|
|
# NOTE: 'logging' is still imported above; its level constants (logging.INFO / logging.DEBUG) are used below.
|
|
import re
|
|
|
|
# Import the new log queue handler
|
|
import log_handler
|
|
from typing import Tuple, Dict, List
|
|
|
|
|
|
# --- Custom Exception Definition (unchanged) ---
|
|
class GitCommandError(Exception):
    """
    Custom exception for handling Git command execution errors.

    Carries the command that was run and the captured stderr (if any) so
    that callers can inspect them and ``str(exc)`` gives a self-contained
    diagnostic message.

    Attributes:
        command: The command exactly as passed in (normally a list of
            arguments), or None.
        stderr: Captured standard error as text. Always a ``str``: None or
            empty input becomes "", bytes are decoded as UTF-8 with
            replacement characters.
    """

    def __init__(self, message, command=None, stderr=None):
        """
        Initialize the GitCommandError.

        Args:
            message: Description of the failure (converted with ``str``).
            command: The failing command; a sequence of parts or one string.
            stderr: Standard error of the command (str, bytes or None).
        """
        super().__init__(str(message))
        self.command = command
        self.stderr = self._to_text(stderr)

    @staticmethod
    def _to_text(value):
        """Coerce captured output to ``str`` so callers can use str methods."""
        if not value:
            return ""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors="replace")
        if isinstance(value, str):
            return value
        return str(value)

    def __str__(self):
        """Return the message followed by command and stderr details, if any."""
        base_message = super().__str__()
        details = []
        if self.command:
            if isinstance(self.command, str):
                # A bare string must not be iterated, otherwise it would be
                # rendered one character at a time ("g i t ...").
                command_str = self.command
            else:
                command_str = " ".join(str(part) for part in self.command)
            details.append(f"Command: '{command_str}'")
        # Re-normalise in case the attribute was reassigned after construction.
        stderr_str = self._to_text(self.stderr).strip()
        if stderr_str:
            details.append(f"Stderr: {stderr_str}")
        if not details:
            return base_message
        return f"{base_message} ({'; '.join(details)})"
|
|
|
|
|
|
# --- Main Git Commands Class ---
|
|
class GitCommands:
|
|
"""
|
|
Manages Git command execution, logging (via log_handler), and error handling.
|
|
"""
|
|
|
|
# Logger removed from __init__
|
|
def __init__(self, logger_ignored=None):
    """
    Set up a GitCommands instance.

    Args:
        logger_ignored: Accepted but ignored; no logger is stored on the
            instance and messages go through the log_handler module.
    """
    # There is no self.logger attribute any more.
    log_handler.log_debug(
        "GitCommands initialized.",
        func_name="__init__",
    )
|
|
|
|
def log_and_execute(
    self,
    command: list[str],
    working_directory: str,
    check: bool = True,
    log_output_level: int = logging.INFO,
    capture: bool = True,
    hide_console: bool = True,
    timeout_seconds: float | None = 60,
) -> subprocess.CompletedProcess:
    """
    Run a command in a subprocess, log the outcome and normalise errors.

    Args:
        command (list): Command and arguments; each part is converted to str.
        working_directory (str): Directory to execute the command in
            ("." means the current working directory of this process).
        check (bool): If True, a non-zero exit code raises GitCommandError.
        log_output_level (int): Logging level of the short completion notice.
            The full stdout/stderr dump is always logged at DEBUG.
        capture (bool): If True, capture stdout and stderr. If False they are
            not captured and are unavailable on the result and on errors.
        hide_console (bool): On Windows only. True hides the subprocess
            console window; False starts the command in a new console.
        timeout_seconds (float | None): Seconds before the command is
            aborted; None disables the timeout. Defaults to 60.

    Returns:
        subprocess.CompletedProcess: Result object from subprocess.run.

    Raises:
        ValueError: If working_directory is None or empty.
        GitCommandError: If the working directory is not a directory, the
            command times out, exits non-zero with check=True, cannot be
            found, is not permitted, or fails in any unexpected way
            (FileNotFoundError / PermissionError are wrapped).
    """
    func_name = "log_and_execute"
    safe_command_parts = [str(part) for part in command]
    command_str = " ".join(safe_command_parts)
    log_handler.log_debug(
        f"Executing in '{working_directory}': {command_str} "
        f"(Capture={capture}, HideConsole={hide_console})",
        func_name=func_name,
    )

    def _as_text(data) -> str:
        """Return captured output as str; TimeoutExpired may carry raw bytes."""
        if not data:
            return ""
        if isinstance(data, (bytes, bytearray)):
            return bytes(data).decode("utf-8", errors="replace")
        return data

    # --- Working directory validation ---
    if not working_directory:
        msg = "Working directory cannot be None or empty."
        log_handler.log_error(msg, func_name=func_name)
        raise ValueError(msg)
    if working_directory == ".":
        effective_cwd = os.getcwd()
    else:
        effective_cwd = os.path.abspath(working_directory)
    if not os.path.isdir(effective_cwd):
        msg = f"Working directory does not exist or is not a dir: {effective_cwd}"
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=safe_command_parts)

    # --- Command execution ---
    try:
        startupinfo = None
        creationflags = 0
        # Window handling is applied on Windows only; elsewhere the neutral
        # defaults (None / 0) are passed through.
        if os.name == "nt":
            if hide_console:
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = subprocess.SW_HIDE
                # CREATE_NO_WINDOW is deliberately not used: it might be too
                # aggressive and prevent prompts.
            else:
                # Not hiding: explicitly request a new console to isolate the
                # input/output of an interactive command.
                creationflags = subprocess.CREATE_NEW_CONSOLE

        result = subprocess.run(
            safe_command_parts,
            cwd=effective_cwd,
            capture_output=capture,
            text=True,  # decode output as text
            check=check,  # raise CalledProcessError on failure if True
            encoding="utf-8",
            errors="replace",  # never fail on undecodable output
            timeout=timeout_seconds,
            startupinfo=startupinfo,
            creationflags=creationflags,
        )
        log_handler.log_debug(
            f"Command '{command_str}' finished. RC={result.returncode}",
            func_name=func_name,
        )

        # --- Log output (only when it was captured) ---
        if capture and (result.returncode == 0 or not check):
            stdout_log_debug = result.stdout.strip() if result.stdout else "<no stdout>"
            stderr_log_debug = result.stderr.strip() if result.stderr else "<no stderr>"
            # With check=False a non-zero exit also reaches this point; do not
            # report it as a success.
            outcome = (
                "successful"
                if result.returncode == 0
                else "finished with non-zero exit"
            )
            # The full dump is always logged at DEBUG.
            log_handler.log_debug(
                f"Command {outcome} (RC={result.returncode}). Output:\n"
                f"--- stdout ---\n{stdout_log_debug}\n"
                f"--- stderr ---\n{stderr_log_debug}\n"
                f"--- End Output ---",
                func_name=func_name,
            )
            # A short notice at the requested level when it is above DEBUG.
            if log_output_level > logging.DEBUG:
                log_handler.log_message(
                    log_output_level,
                    f"Command {outcome}. Output logged at DEBUG level.",
                    func_name=func_name,
                )

        return result

    # --- Error handling ---
    except subprocess.TimeoutExpired as e:
        log_handler.log_error(
            f"Command timed out after {timeout_seconds}s: {command_str}",
            func_name=func_name,
        )
        # TimeoutExpired can carry bytes even in text mode; decode so that
        # GitCommandError.stderr is always text for callers.
        stderr_text = _as_text(e.stderr) if capture else ""
        stderr_out = stderr_text.strip() if stderr_text else "<stderr not captured>"
        log_handler.log_error(f"Timeout stderr: {stderr_out}", func_name=func_name)
        raise GitCommandError(
            f"Timeout after {timeout_seconds}s.",
            command=safe_command_parts,
            stderr=stderr_text or None,
        ) from e

    except subprocess.CalledProcessError as e:
        # When output was not captured, e.stdout / e.stderr are None.
        stderr_err = e.stderr.strip() if capture and e.stderr else "<stderr not captured>"
        stdout_err = e.stdout.strip() if capture and e.stdout else "<stdout not captured>"
        err_msg = (
            f"Command failed (RC {e.returncode}) in '{effective_cwd}'.\n"
            f"CMD: {command_str}\nSTDERR: {stderr_err}\nSTDOUT: {stdout_err}"
        )
        log_handler.log_error(err_msg, func_name=func_name)
        # Pass stderr to the exception only if it was captured.
        raise GitCommandError(
            f"Git command failed in '{effective_cwd}'. RC: {e.returncode}",
            command=safe_command_parts,
            stderr=e.stderr if capture else None,
        ) from e

    except FileNotFoundError as e:
        log_handler.log_error(
            f"FileNotFoundError for command: {safe_command_parts[0]}",
            func_name=func_name,
        )
        error_msg = f"Command not found: '{safe_command_parts[0]}'. Is Git installed/in PATH?"
        log_handler.log_error(error_msg, func_name=func_name)
        raise GitCommandError(error_msg, command=safe_command_parts) from e
    except PermissionError as e:
        log_handler.log_error(
            f"PermissionError executing in '{effective_cwd}': {e}",
            func_name=func_name,
        )
        error_msg = f"Permission denied executing command in '{effective_cwd}'."
        log_handler.log_error(error_msg, func_name=func_name)
        raise GitCommandError(
            error_msg, command=safe_command_parts, stderr=str(e)
        ) from e
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected Exception executing command {command_str}: {e}",
            func_name=func_name,
        )
        raise GitCommandError(
            f"Unexpected execution error: {e}", command=safe_command_parts
        ) from e
|
|
|
|
# --- Core Repo Operations (use log_handler) ---
|
|
def prepare_svn_for_git(self, working_directory: str):
    """
    Make sure a directory is a Git repository whose .gitignore lists '.svn'.

    Runs 'git init' when the directory has no '.git' entry, then ensures that
    '.gitignore' contains a '.svn' entry: the file is created when missing,
    otherwise the entry is appended. Existing .gitignore content is never
    overwritten.

    Args:
        working_directory (str): Directory to prepare.

    Raises:
        ValueError: If working_directory is None or empty.
        GitCommandError: If the directory does not exist, 'git init' fails,
            or a non-I/O error occurs while handling .gitignore.
        IOError: If .gitignore cannot be read or written.
    """
    func_name = "prepare_svn_for_git"
    log_handler.log_info(
        f"Preparing directory for Git: '{working_directory}'", func_name=func_name
    )
    if not working_directory:
        raise ValueError("Working directory cannot be None or empty.")
    if not os.path.isdir(working_directory):
        raise GitCommandError(
            f"Target directory does not exist: {working_directory}"
        )
    gitignore_path = os.path.join(working_directory, ".gitignore")
    git_dir_path = os.path.join(working_directory, ".git")
    svn_ignore = ".svn"

    if not os.path.exists(git_dir_path):
        log_handler.log_info(
            "No existing Git repo found. Initializing...", func_name=func_name
        )
        try:
            init_cmd = ["git", "init"]
            self.log_and_execute(init_cmd, working_directory, check=True)
            log_handler.log_info("Git repo initialized.", func_name=func_name)
        except (GitCommandError, ValueError) as e:
            log_handler.log_error(f"Failed init Git repo: {e}", func_name=func_name)
            raise
    else:
        log_handler.log_info(
            "Existing Git repo found. Skipping init.", func_name=func_name
        )

    log_handler.log_debug(
        f"Checking/updating .gitignore: {gitignore_path}", func_name=func_name
    )
    try:
        gitignore_exists = os.path.exists(gitignore_path)
        write_content = ""
        if not gitignore_exists:
            log_handler.log_info(
                "'.gitignore' not found. Creating with .svn entry.",
                func_name=func_name,
            )
            write_content = f"{svn_ignore}\n"
        else:
            try:
                with open(gitignore_path, "r", encoding="utf-8") as f:
                    lines = f.readlines()
                ignored = any(
                    entry.strip() == svn_ignore
                    or entry.strip().startswith(svn_ignore + "/")
                    for entry in lines
                )
                if not ignored:
                    log_handler.log_info(
                        "'.svn' not found in .gitignore. Appending...",
                        func_name=func_name,
                    )
                    content = "".join(lines)
                    # A separating newline is needed only when the existing
                    # text does not already end with one (an empty file needs
                    # none).
                    separator = "\n" if content and not content.endswith("\n") else ""
                    write_content = f"{separator}{svn_ignore}\n"
                else:
                    log_handler.log_info(
                        "'.svn' already in .gitignore.", func_name=func_name
                    )
            except IOError as r_err:
                log_handler.log_error(
                    f"Could not read .gitignore: {r_err}", func_name=func_name
                )
                raise IOError(f"Failed read: {r_err}") from r_err

        if write_content:
            # Always append to an existing file. Choosing "w" here would
            # replace the user's .gitignore with the single '.svn' line.
            mode = "a" if gitignore_exists else "w"
            log_handler.log_debug(
                f"Writing to {gitignore_path} (mode: {mode})", func_name=func_name
            )
            try:
                with open(
                    gitignore_path, mode, encoding="utf-8", newline="\n"
                ) as f:
                    f.write(write_content)
                log_handler.log_info(
                    f"Updated '{gitignore_path}'.", func_name=func_name
                )
            except IOError as w_err:
                log_handler.log_error(
                    f"Error writing .gitignore: {w_err}", func_name=func_name
                )
                raise IOError(f"Failed write: {w_err}") from w_err
    except IOError as io_err:
        log_handler.log_error(
            f"IO error for .gitignore: {io_err}", func_name=func_name
        )
        raise
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error managing .gitignore: {e}", func_name=func_name
        )
        raise GitCommandError(f"Unexpected .gitignore error: {e}") from e
    log_handler.log_info(
        f"Directory preparation complete for '{working_directory}'.",
        func_name=func_name,
    )
|
|
|
|
def create_git_bundle(self, working_directory: str, bundle_path: str):
    """
    Create a Git bundle containing all refs of a repository.

    Runs 'git bundle create <bundle_path> --all' in working_directory. An
    empty repository (git refuses to create an empty bundle) is logged as a
    warning and is not treated as an error.

    Args:
        working_directory (str): Repository to bundle.
        bundle_path (str): Destination bundle file. Back-slashes are
            converted to forward slashes before being passed to git.

    Raises:
        GitCommandError: If the bundle command fails for any other reason.
            The error raised for a failing git command keeps its original
            message and stderr; other unexpected exceptions are wrapped.
    """
    func_name = "create_git_bundle"
    norm_path = os.path.normpath(bundle_path)
    git_path = norm_path.replace("\\", "/")
    # git runs with cwd=working_directory, so a relative bundle path is
    # created relative to the repository; verify the file at that location.
    if os.path.isabs(norm_path) or not working_directory:
        created_path = norm_path
    else:
        created_path = os.path.join(working_directory, norm_path)
    log_handler.log_info(
        f"Attempting to create Git bundle: {git_path}", func_name=func_name
    )
    log_handler.log_debug(f"Source repo: {working_directory}", func_name=func_name)
    command = ["git", "bundle", "create", git_path, "--all"]
    try:
        result = self.log_and_execute(command, working_directory, check=False)
        if result.returncode != 0:
            stderr_text = result.stderr or ""
            stderr_low = stderr_text.lower()
            if (
                "refusing to create empty bundle" in stderr_low
                or "does not contain any references" in stderr_low
            ):
                log_handler.log_warning(
                    f"Bundle creation skipped: Repo '{working_directory}' empty.",
                    func_name=func_name,
                )
            else:
                err_msg = f"Git bundle command failed (RC {result.returncode})."
                log_handler.log_error(
                    f"{err_msg} Stderr: {stderr_text.strip()}",
                    func_name=func_name,
                )
                raise GitCommandError(
                    err_msg, command=command, stderr=result.stderr
                )
        else:
            if not os.path.exists(created_path) or os.path.getsize(created_path) == 0:
                log_handler.log_warning(
                    f"Bundle cmd OK, but file '{created_path}' missing/empty.",
                    func_name=func_name,
                )
            else:
                log_handler.log_info(
                    f"Bundle created successfully: '{created_path}'.",
                    func_name=func_name,
                )
    except GitCommandError:
        # Already logged where it was raised. Re-raise untouched so the
        # original message, command and stderr reach the caller instead of
        # being re-wrapped as an "unexpected" error.
        raise
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected bundle creation error: {e}", func_name=func_name
        )
        raise GitCommandError(
            f"Unexpected bundle error: {e}", command=command
        ) from e
|
|
|
|
def fetch_from_git_bundle(self, working_directory: str, bundle_path: str):
    """
    Fetch from a bundle file and merge FETCH_HEAD into the current branch.

    Runs 'git fetch <bundle> --verbose' followed by
    'git merge FETCH_HEAD --no-ff --no-edit' in working_directory.

    Args:
        working_directory (str): Repository that receives the changes.
        bundle_path (str): Bundle file to fetch from. A relative path is
            checked relative to working_directory, which is where git
            resolves it.

    Raises:
        FileNotFoundError: If the bundle file does not exist.
        GitCommandError: If fetch fails, the merge conflicts, the merge
            fails otherwise, or an unexpected error occurs.
        ValueError: Propagated from log_and_execute (empty working directory).
    """
    func_name = "fetch_from_git_bundle"
    # git is executed with cwd=working_directory, so validate the file that
    # git will actually read rather than one relative to the process CWD.
    if os.path.isabs(bundle_path) or not working_directory:
        bundle_on_disk = bundle_path
    else:
        bundle_on_disk = os.path.join(working_directory, bundle_path)
    if not os.path.isfile(bundle_on_disk):
        raise FileNotFoundError(f"Bundle file not found: {bundle_on_disk}")
    norm_path = os.path.normpath(bundle_path).replace("\\", "/")
    log_handler.log_info(
        f"Fetching from '{norm_path}' into '{working_directory}'",
        func_name=func_name,
    )
    fetch_cmd = ["git", "fetch", norm_path, "--verbose"]
    merge_cmd = ["git", "merge", "FETCH_HEAD", "--no-ff", "--no-edit"]
    try:
        log_handler.log_debug("Executing fetch...", func_name=func_name)
        self.log_and_execute(fetch_cmd, working_directory, check=True)
        log_handler.log_info("Fetch successful.", func_name=func_name)
        log_handler.log_debug("Executing merge...", func_name=func_name)
        # check=False: the merge result is classified below from RC and output.
        merge_res = self.log_and_execute(merge_cmd, working_directory, check=False)
        out_log = merge_res.stdout.strip() if merge_res.stdout else ""
        err_log = merge_res.stderr.strip() if merge_res.stderr else ""
        combined_low = (out_log + err_log).lower()
        if merge_res.returncode == 0:
            # Older git releases print the hyphenated "Already up-to-date.".
            if (
                "already up to date" in combined_low
                or "already up-to-date" in combined_low
            ):
                log_handler.log_info(
                    "Repo already up-to-date.", func_name=func_name
                )
            else:
                log_handler.log_info("Merge successful.", func_name=func_name)
                log_handler.log_debug(
                    f"Merge stdout:\n{out_log}", func_name=func_name
                )
        else:
            if (
                "conflict" in combined_low
                or "automatic merge failed" in combined_low
            ):
                msg = f"Merge conflict occurred. Resolve manually in '{working_directory}' and commit."
                log_handler.log_error(msg, func_name=func_name)
                raise GitCommandError(
                    msg, command=merge_cmd, stderr=merge_res.stderr
                )
            else:
                msg = f"Merge command failed (RC {merge_res.returncode})."
                log_handler.log_error(
                    f"{msg} Stderr: {err_log}", func_name=func_name
                )
                raise GitCommandError(
                    msg, command=merge_cmd, stderr=merge_res.stderr
                )
    except (GitCommandError, ValueError, FileNotFoundError) as e:
        log_handler.log_error(
            f"Fetch/merge failed for '{working_directory}': {e}",
            func_name=func_name,
        )
        raise
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected fetch/merge error for '{working_directory}': {e}",
            func_name=func_name,
        )
        raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e
|
|
|
|
# --- Commit and Status ---
|
|
def git_commit(self, working_directory: str, message: str):
    """
    Stage all changes ('git add .') and commit them with the given message.

    Args:
        working_directory (str): Repository to commit in.
        message (str): Commit message; must contain non-whitespace text.

    Returns:
        bool: True if a commit was created, False if git reported that there
        was nothing to commit.

    Raises:
        ValueError: If the message is None, empty or whitespace only.
        GitCommandError: If staging fails or the commit fails for a reason
            other than "nothing to commit".
    """
    func_name = "git_commit"
    # Validate before the message is sliced for logging: slicing a None
    # message would raise TypeError instead of the intended ValueError.
    if not message or message.isspace():
        raise ValueError("Commit message empty.")
    log_handler.log_info(
        f"Attempting commit in '{working_directory}' msg: '{message[:50]}...'",
        func_name=func_name,
    )
    try:
        add_cmd = ["git", "add", "."]
        log_handler.log_debug(
            "Staging all changes ('git add .')...", func_name=func_name
        )
        self.log_and_execute(add_cmd, working_directory, check=True)
        log_handler.log_debug("Staging successful.", func_name=func_name)
        commit_cmd = ["git", "commit", "-m", message]
        log_handler.log_debug("Attempting commit...", func_name=func_name)
        # check=False: "nothing to commit" exits non-zero but is not an error.
        result = self.log_and_execute(commit_cmd, working_directory, check=False)
        out_low = result.stdout.lower() if result.stdout else ""
        err_low = result.stderr.lower() if result.stderr else ""
        combined_low = out_low + err_low
        if result.returncode == 0:
            log_handler.log_info("Commit successful.", func_name=func_name)
            return True
        elif (
            "nothing to commit" in combined_low
            or "no changes added to commit" in combined_low
            or "nothing added to commit" in combined_low
        ):
            log_handler.log_info(
                "No changes available to commit.", func_name=func_name
            )
            return False
        else:
            msg = f"Commit command failed unexpectedly (RC {result.returncode})."
            stderr_text = result.stderr.strip() if result.stderr else ""
            log_handler.log_error(
                f"{msg} Stderr: {stderr_text}", func_name=func_name
            )
            raise GitCommandError(msg, command=commit_cmd, stderr=result.stderr)
    except (GitCommandError, ValueError) as e:
        log_handler.log_error(
            f"Error during staging/commit: {e}", func_name=func_name
        )
        raise
|
|
|
|
def git_status_has_changes(self, working_directory: str):
    """
    Tell whether 'git status --porcelain' reports anything for a repository.

    Args:
        working_directory (str): Repository to inspect.

    Returns:
        bool: True when the porcelain status output is non-empty.

    Raises:
        GitCommandError: If the status command fails (logged, then re-raised).
    """
    func_name = "git_status_has_changes"
    log_handler.log_debug(
        f"Checking status for changes in '{working_directory}'...",
        func_name=func_name,
    )
    status_cmd = ["git", "status", "--porcelain=v1", "-unormal"]
    try:
        status = self.log_and_execute(
            status_cmd,
            working_directory,
            check=True,
            log_output_level=logging.DEBUG,
        )
        # Any remaining text after stripping means at least one entry.
        dirty = status.stdout.strip() != ""
        log_handler.log_debug(
            f"Status check complete. Has changes: {dirty}",
            func_name=func_name,
        )
        return dirty
    except GitCommandError as status_err:
        log_handler.log_error(
            f"Git status check command failed: {status_err}", func_name=func_name
        )
        raise
|
|
|
|
# --- Tag Management ---
|
|
def list_tags(self, working_directory: str):
    """
    List the tags of a repository together with their subject line.

    Tags are requested sorted by descending creator date. Errors are logged
    and swallowed.

    Args:
        working_directory (str): Repository to query.

    Returns:
        list: (name, subject) tuples; the subject falls back to
        "(No subject/Lightweight)" when git prints none. An empty list is
        returned on any error.
    """
    func_name = "list_tags"
    log_handler.log_info(
        f"Listing tags in '{working_directory}'...", func_name=func_name
    )
    # Short ref name, a TAB (%09), then the subject of the tag contents.
    tag_cmd = [
        "git",
        "tag",
        "--list",
        "--format=%(refname:short)%09%(contents:subject)",
        "--sort=-creatordate",
    ]
    collected = []
    try:
        completed = self.log_and_execute(tag_cmd, working_directory, check=True)
        for raw_line in completed.stdout.splitlines():
            entry = raw_line.strip()
            if not entry:
                continue
            # Everything before the first TAB is the name, the rest the subject.
            tag_name, _sep, tag_subject = entry.partition("\t")
            tag_subject = tag_subject.strip()
            if not tag_subject:
                tag_subject = "(No subject/Lightweight)"
            collected.append((tag_name.strip(), tag_subject))
        log_handler.log_info(f"Found {len(collected)} tags.", func_name=func_name)
        log_handler.log_debug(f"Tags found: {collected}", func_name=func_name)
        return collected
    except GitCommandError as git_err:
        log_handler.log_error(f"Error listing tags: {git_err}", func_name=func_name)
        return []
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error listing tags: {unexpected}", func_name=func_name
        )
        return []
|
|
|
|
def create_tag(self, working_directory: str, tag_name: str, message: str):
    """
    Create an annotated tag ('git tag -a <name> -m <message>').

    Args:
        working_directory (str): Repository to tag.
        tag_name (str): Name of the new tag; checked against a name pattern.
        message (str): Annotation message; must not be blank.

    Raises:
        ValueError: If the name or message is blank, or the name does not
            match the allowed pattern.
        GitCommandError: If git fails; a dedicated message is used when
            stderr says the tag already exists.
    """
    func_name = "create_tag"
    log_handler.log_info(
        f"Creating tag '{tag_name}' in '{working_directory}'", func_name=func_name
    )
    if not tag_name or tag_name.isspace():
        raise ValueError("Tag name empty.")
    if not message or message.isspace():
        raise ValueError("Tag message empty.")
    name_rule = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$|@\{|\\\\))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.)$"
    if re.match(name_rule, tag_name) is None:
        raise ValueError(f"Invalid tag name format: '{tag_name}'.")
    tag_cmd = ["git", "tag", "-a", tag_name, "-m", message]
    try:
        self.log_and_execute(tag_cmd, working_directory, check=True)
        log_handler.log_info(
            f"Annotated tag '{tag_name}' created.", func_name=func_name
        )
    except GitCommandError as e:
        lowered = e.stderr.lower() if e.stderr else ""
        if "already exists" not in lowered:
            # Any other failure is logged and propagated unchanged.
            log_handler.log_error(
                f"Failed create tag '{tag_name}': {e}", func_name=func_name
            )
            raise
        msg = f"Tag '{tag_name}' already exists."
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=tag_cmd, stderr=e.stderr) from e
|
|
|
|
def checkout_tag(self, working_directory: str, tag_name: str):
    """
    Check out a tag ('git checkout <tag>').

    A warning is logged when git's output mentions a detached HEAD.

    Args:
        working_directory (str): Repository to operate on.
        tag_name (str): Tag to check out; must not be blank.

    Returns:
        bool: True when the checkout succeeded.

    Raises:
        ValueError: If tag_name is blank.
        GitCommandError: If the checkout fails; a dedicated message is used
            when stderr indicates the tag was not found.
    """
    func_name = "checkout_tag"
    log_handler.log_info(
        f"Checking out tag '{tag_name}' in '{working_directory}'...",
        func_name=func_name,
    )
    if not tag_name or tag_name.isspace():
        raise ValueError("Tag name empty.")
    checkout_cmd = ["git", "checkout", tag_name]
    try:
        completed = self.log_and_execute(checkout_cmd, working_directory, check=True)
        log_handler.log_info(
            f"Successfully checked out tag '{tag_name}'.", func_name=func_name
        )
        git_output = (completed.stderr + completed.stdout).lower()
        if "detached head" in git_output:
            log_handler.log_warning(
                "Repo is now in 'detached HEAD' state.", func_name=func_name
            )
        return True
    except GitCommandError as e:
        lowered = e.stderr.lower() if e.stderr else ""
        # stderr fragments taken to mean that the tag does not exist.
        missing_markers = (
            "did not match any file(s)",
            f"pathspec '{tag_name.lower()}' did not match",
            "reference is not a tree",
        )
        is_missing = False
        for marker in missing_markers:
            if marker in lowered:
                is_missing = True
                break
        if not is_missing:
            log_handler.log_error(
                f"Failed checkout tag '{tag_name}': {e}", func_name=func_name
            )
            raise
        msg = f"Tag '{tag_name}' not found or invalid."
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=checkout_cmd, stderr=e.stderr) from e
|
|
|
|
# --- Branch Management ---
|
|
def list_branches(self, working_directory: str):
    """
    List the local branches of a repository and identify the current one.

    Only real branches (refs under 'refs/heads/') are returned. The
    pseudo-entry that 'git branch' prints while HEAD is detached is skipped,
    so in that state the current branch is reported as None.

    Args:
        working_directory (str): Repository to query.

    Returns:
        tuple: (sorted list of branch names, current branch name or None).
        ([], None) is returned on any error; errors are logged, not raised.
    """
    func_name = "list_branches"
    log_handler.log_info(
        f"Listing local branches in '{working_directory}'...", func_name=func_name
    )
    # The full ref name is requested so real branches can be told apart from
    # the detached-HEAD description, which has no 'refs/heads/' prefix.
    heads_prefix = "refs/heads/"
    fmt = "%(HEAD)%(refname)"
    cmd = ["git", "branch", "--list", f"--format={fmt}"]
    branches = []
    current = None
    try:
        result = self.log_and_execute(cmd, working_directory, check=True)
        for line in result.stdout.splitlines():
            line_strip = line.strip()
            if not line_strip:
                continue
            # %(HEAD) prints '*' in front of the checked-out entry.
            is_curr = line_strip.startswith("*")
            ref = line_strip.lstrip("* ").strip()
            if not ref.startswith(heads_prefix):
                # e.g. "(HEAD detached at abc1234)" - not a branch.
                continue
            name = ref[len(heads_prefix):]
            branches.append(name)
            if is_curr:
                current = name
        branches.sort()
        curr_disp = current if current else "None (Detached?)"
        log_handler.log_info(
            f"Found {len(branches)} branches. Current: {curr_disp}",
            func_name=func_name,
        )
        log_handler.log_debug(f"Branches: {branches}", func_name=func_name)
        return branches, current
    except GitCommandError as e:
        log_handler.log_error(f"Error listing branches: {e}", func_name=func_name)
        return [], None
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error listing branches: {e}", func_name=func_name
        )
        return [], None
|
|
|
|
def checkout_branch(self, working_directory: str, branch_name: str):
    """
    Switch the repository to an existing branch ('git checkout <branch>').

    Args:
        working_directory (str): Repository to operate on.
        branch_name (str): Branch to check out; must not be blank.

    Returns:
        bool: True when the checkout succeeded.

    Raises:
        ValueError: If branch_name is blank.
        GitCommandError: If the checkout fails; a dedicated message is used
            when stderr indicates the branch was not found.
    """
    func_name = "checkout_branch"
    log_handler.log_info(
        f"Checking out branch '{branch_name}' in '{working_directory}'...",
        func_name=func_name,
    )
    if not branch_name or branch_name.isspace():
        raise ValueError("Branch name empty.")
    checkout_cmd = ["git", "checkout", branch_name]
    try:
        self.log_and_execute(checkout_cmd, working_directory, check=True)
        log_handler.log_info(
            f"Successfully checked out branch '{branch_name}'.", func_name=func_name
        )
        return True
    except GitCommandError as e:
        lowered = e.stderr.lower() if e.stderr else ""
        # stderr fragments taken to mean that the branch does not exist.
        missing_markers = (
            "did not match any file(s)",
            f"pathspec '{branch_name.lower()}' did not match",
            "is not a commit and a branch",
        )
        is_missing = False
        for marker in missing_markers:
            if marker in lowered:
                is_missing = True
                break
        if not is_missing:
            log_handler.log_error(
                f"Failed checkout branch '{branch_name}': {e}", func_name=func_name
            )
            raise
        msg = f"Branch '{branch_name}' not found or invalid."
        log_handler.log_error(msg, func_name=func_name)
        raise GitCommandError(msg, command=checkout_cmd, stderr=e.stderr) from e
|
|
|
|
def create_branch(self, working_directory: str, new_branch_name: str):
    """
    Create a new local branch ('git branch <name>') without switching to it.

    Args:
        working_directory (str): Repository to operate on.
        new_branch_name (str): Name of the branch to create.

    Returns:
        bool: True when the branch was created.

    Raises:
        ValueError: If the name is blank, does not match the allowed
            pattern, is "HEAD" (any case) or starts with '-'.
        GitCommandError: If git fails; a dedicated message is used when
            stderr says the branch already exists.
    """
    func_name = "create_branch"
    log_handler.log_info(
        f"Creating branch '{new_branch_name}' in '{working_directory}'...",
        func_name=func_name,
    )
    if not new_branch_name or new_branch_name.isspace():
        raise ValueError("Branch name empty.")
    pattern = (
        r"^(?!\.| |.*[/.]\.|\.|.*\\|.*@\{|.*[/]$|.*\.\.)[^ \t\n\r\f\v~^:?*[\\]+$"
    )
    # A leading '-' is rejected explicitly: git would parse such a "name" as
    # a command-line option (e.g. '--list' just lists branches and exits 0),
    # and this method would then wrongly report that a branch was created.
    if (
        new_branch_name.startswith("-")
        or not re.match(pattern, new_branch_name)
        or new_branch_name.lower() == "head"
    ):
        raise ValueError(f"Invalid branch name format: '{new_branch_name}'.")
    cmd = ["git", "branch", new_branch_name]
    try:
        self.log_and_execute(cmd, working_directory, check=True)
        log_handler.log_info(
            f"Branch '{new_branch_name}' created.", func_name=func_name
        )
        return True
    except GitCommandError as e:
        stderr_low = e.stderr.lower() if e.stderr else ""
        if "already exists" in stderr_low:
            msg = f"Branch '{new_branch_name}' already exists."
            log_handler.log_error(msg, func_name=func_name)
            raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
        else:
            log_handler.log_error(
                f"Failed create branch '{new_branch_name}': {e}",
                func_name=func_name,
            )
            raise
|
|
|
|
# --- History / Log ---
|
|
def get_commit_log(
    self, working_directory: str, max_count: int = 200, branch: str | None = None
):
    """
    Return formatted 'git log' lines for one branch or for all refs.

    Args:
        working_directory (str): Repository to query.
        max_count (int): Passed to git as --max-count.
        branch (str | None): Ref to show the log for; when falsy, '--all'
            is used instead.

    Returns:
        list: Non-empty, stripped output lines in the form
        "<hash> <date> | <author> |<decorations> <subject>". An empty list
        is returned on any error; errors are logged, not raised.
    """
    func_name = "get_commit_log"
    scope = f"'{branch}'" if branch else "all"
    log_handler.log_info(
        f"Retrieving commit log for {scope} in '{working_directory}' (max {max_count})...",
        func_name=func_name,
    )
    pretty = "%h %ad | %an |%d %s"
    date_spec = "format:%Y-%m-%d %H:%M"
    log_cmd = [
        "git",
        "log",
        f"--max-count={max_count}",
        f"--pretty=format:{pretty}",
        f"--date={date_spec}",
        "--decorate=short",
        "--source",
        # Last argument: the requested ref, or every ref.
        branch if branch else "--all",
    ]
    try:
        completed = self.log_and_execute(
            log_cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        entries = []
        for raw_line in completed.stdout.splitlines():
            cleaned = raw_line.strip()
            if cleaned:
                entries.append(cleaned)
        log_handler.log_info(
            f"Retrieved {len(entries)} log entries for {scope}.", func_name=func_name
        )
        return entries
    except GitCommandError as git_err:
        log_handler.log_error(
            f"Error retrieving commit log for {scope}: {git_err}", func_name=func_name
        )
        return []
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error retrieving commit log for {scope}: {unexpected}",
            func_name=func_name,
        )
        return []
|
|
|
|
# --- Clone from Bundle ---
|
|
def clone_from_bundle(self, bundle_path: str, destination_directory: str):
    """
    Clone a repository from a bundle file into a new or empty directory.

    The destination is created when it does not exist. After a failed clone
    the destination is removed again if it is empty.

    Args:
        bundle_path (str): Bundle file to clone from.
        destination_directory (str): Target directory; must not exist yet or
            must be an empty directory.

    Returns:
        bool: True when the clone succeeded.

    Raises:
        ValueError: If either argument is empty.
        FileNotFoundError: If the bundle file does not exist.
        IOError: If the destination exists but is not an empty directory, or
            cannot be created.
        GitCommandError: If the clone fails or an unexpected error occurs.
    """
    func_name = "clone_from_bundle"
    log_handler.log_info(
        f"Cloning from '{bundle_path}' into '{destination_directory}'",
        func_name=func_name,
    )
    if not bundle_path:
        raise ValueError("Bundle path empty.")
    if not destination_directory:
        raise ValueError("Destination empty.")
    abs_bundle = os.path.abspath(bundle_path)
    abs_dest = os.path.abspath(destination_directory)
    if not os.path.isfile(abs_bundle):
        msg = f"Bundle not found: {abs_bundle}"
        log_handler.log_error(msg, func_name=func_name)
        raise FileNotFoundError(msg)

    # --- Destination must be absent (then created) or an empty directory ---
    if not os.path.exists(abs_dest):
        log_handler.log_debug(
            f"Dest dir does not exist. Creating: {abs_dest}", func_name=func_name
        )
        try:
            os.makedirs(abs_dest)
            log_handler.log_info(
                f"Created dest dir: {abs_dest}", func_name=func_name
            )
        except OSError as mk_err:
            msg = f"Failed create dest dir '{abs_dest}': {mk_err}"
            log_handler.log_error(msg, func_name=func_name)
            raise IOError(msg) from mk_err
    elif not os.path.isdir(abs_dest):
        msg = f"Dest exists but not dir: {abs_dest}"
        log_handler.log_error(msg, func_name=func_name)
        raise IOError(msg)
    elif os.listdir(abs_dest):
        msg = f"Dest dir not empty: {abs_dest}"
        log_handler.log_error(msg, func_name=func_name)
        raise IOError(msg)
    else:
        log_handler.log_debug("Dest dir exists and empty.", func_name=func_name)

    clone_cmd = ["git", "clone", abs_bundle, abs_dest, "--progress"]
    try:
        # Both paths are absolute, so the process CWD is used as working dir.
        self.log_and_execute(
            clone_cmd,
            working_directory=".",
            check=True,
            log_output_level=logging.INFO,
        )
        log_handler.log_info(
            f"Cloned successfully into '{abs_dest}'.", func_name=func_name
        )
        return True
    except GitCommandError as clone_e:
        log_handler.log_error(
            f"Failed clone from bundle '{abs_bundle}': {clone_e}",
            func_name=func_name,
        )
        # Best-effort cleanup: only an empty destination is removed.
        if os.path.isdir(abs_dest):
            try:
                leftovers = os.listdir(abs_dest)
                if leftovers:
                    log_handler.log_warning(
                        f"Dest dir not empty after failed clone: {abs_dest}",
                        func_name=func_name,
                    )
                else:
                    os.rmdir(abs_dest)
                    log_handler.log_info(
                        f"Removed empty dest dir after failed clone: {abs_dest}",
                        func_name=func_name,
                    )
            except OSError as rm_e:
                log_handler.log_warning(
                    f"Could not remove dest dir '{abs_dest}': {rm_e}",
                    func_name=func_name,
                )
        raise clone_e
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error cloning from bundle: {unexpected}", func_name=func_name
        )
        raise GitCommandError(
            f"Unexpected clone error: {unexpected}", command=clone_cmd
        ) from unexpected
|
|
|
|
# --- Gitignore and File Tracking ---
|
|
def get_tracked_files(self, working_directory: str):
    """
    Return the paths printed by 'git ls-files -z' for a repository.

    Args:
        working_directory (str): Repository to query.

    Returns:
        list: The non-empty NUL-separated entries of the command output.

    Raises:
        GitCommandError: If the command fails (logged, then re-raised).
    """
    func_name = "get_tracked_files"
    log_handler.log_debug(
        f"Getting tracked files in '{working_directory}'", func_name=func_name
    )
    ls_cmd = ["git", "ls-files", "-z"]
    try:
        completed = self.log_and_execute(
            ls_cmd,
            working_directory,
            check=True,
            log_output_level=logging.DEBUG,
        )
        # -z separates entries with NUL; drop the empty trailing piece.
        tracked = list(filter(None, completed.stdout.split("\0")))
        log_handler.log_info(
            f"Found {len(tracked)} tracked files.", func_name=func_name
        )
        return tracked
    except GitCommandError as git_err:
        log_handler.log_error(
            f"Failed list tracked files: {git_err}", func_name=func_name
        )
        raise
|
|
|
|
def check_if_would_be_ignored(self, working_directory: str, path_to_check: str):
    """
    Report whether 'git check-ignore' considers a path ignored.

    Runs 'git check-ignore --quiet --no-index -- <path_to_check>' with
    check=False and maps the exit code: 0 -> True, 1 -> False.

    Args:
        working_directory (str): Directory in which the command is executed.
        path_to_check (str): Path handed to 'git check-ignore'.

    Returns:
        bool: True when the exit code is 0, False when it is 1.

    Raises:
        GitCommandError: When the exit code is neither 0 nor 1, or when
            log_and_execute itself raises GitCommandError.
    """
    func_name = "check_if_would_be_ignored"
    cmd = ["git", "check-ignore", "--quiet", "--no-index", "--", path_to_check]
    log_handler.log_debug(
        f"Checking ignore status for: '{path_to_check}'", func_name=func_name
    )

    # Only the execution is guarded: the unexpected-RC error raised below is
    # logged once at its origin rather than a second time by this handler.
    try:
        result = self.log_and_execute(cmd, working_directory, check=False)
    except GitCommandError as e:
        log_handler.log_error(
            f"Failed check_ignore for '{path_to_check}': {e}", func_name=func_name
        )
        raise

    if result.returncode == 0:
        log_handler.log_debug(
            f"Path '{path_to_check}' WOULD be ignored.", func_name=func_name
        )
        return True

    if result.returncode == 1:
        log_handler.log_debug(
            f"Path '{path_to_check}' would NOT be ignored.", func_name=func_name
        )
        return False

    # Any other exit code is treated as a command failure.
    msg = f"check-ignore failed (RC {result.returncode})"
    stderr_text = (result.stderr or "").strip()
    log_handler.log_error(f"{msg}. Stderr: {stderr_text}", func_name=func_name)
    raise GitCommandError(msg, command=cmd, stderr=result.stderr)
|
|
|
|
def remove_from_tracking(self, working_directory: str, files_to_untrack: list[str]):
    """
    Remove paths from the Git index in batches, leaving working files in place.

    Each batch runs 'git rm --cached -r --ignore-unmatch -- <paths...>'.

    Args:
        working_directory (str): Directory in which the commands are executed.
        files_to_untrack (list[str]): Paths to remove from the index.

    Returns:
        bool: True when there was nothing to do or every batch completed.

    Raises:
        GitCommandError: On the first failing batch; later batches are not run.
    """
    func_name = "remove_from_tracking"
    if not files_to_untrack:
        log_handler.log_debug("No files provided to untrack.", func_name=func_name)
        return True

    num = len(files_to_untrack)
    log_handler.log_info(
        f"Removing {num} items from tracking index...", func_name=func_name
    )

    # Paths are passed in slices of 100 so each command line stays bounded.
    batch_size = 100
    for batch_num, start in enumerate(range(0, num, batch_size), start=1):
        batch = files_to_untrack[start : start + batch_size]
        log_handler.log_info(
            f"Processing untrack batch {batch_num} ({len(batch)} items)...",
            func_name=func_name,
        )
        log_handler.log_debug(
            f"Batch {batch_num} items: {batch}", func_name=func_name
        )
        cmd = ["git", "rm", "--cached", "-r", "--ignore-unmatch", "--", *batch]
        try:
            self.log_and_execute(
                cmd, working_directory, check=True, log_output_level=logging.DEBUG
            )
        except GitCommandError as e:
            log_handler.log_error(
                f"Failed untrack batch {batch_num}: {e}", func_name=func_name
            )
            # A failure always propagates, so no success flag is tracked.
            raise GitCommandError(
                f"Failed batch {batch_num}. Error: {e}",
                command=cmd,
                stderr=getattr(e, "stderr", None),
            ) from e
        log_handler.log_info(
            f"Batch {batch_num} untracked successfully.", func_name=func_name
        )

    log_handler.log_info("All untracking batches completed.", func_name=func_name)
    return True
|
|
|
|
def git_ls_remote(
    self, working_directory: str, remote_name: str
) -> subprocess.CompletedProcess:
    """
    Run 'git ls-remote --exit-code <remote_name>' and return the raw result.

    Output is captured and the console is hidden. The command is executed with
    check=False, so this method does not inspect the return code itself; the
    caller is expected to analyze the returned CompletedProcess.

    Args:
        working_directory (str): Directory in which the command is executed.
        remote_name (str): Remote passed to 'git ls-remote'.

    Returns:
        subprocess.CompletedProcess: Whatever log_and_execute returns.
    """
    func_name = "git_ls_remote"
    log_handler.log_debug(
        f"Running ls-remote for '{remote_name}' in '{working_directory}'",
        func_name=func_name,
    )
    # NOTE(review): the original comment said --exit-code makes the command
    # fail when the remote is unreachable; git's documentation describes the
    # flag as exiting with status 2 when no matching refs are found - confirm
    # what the caller expects.
    ls_remote_cmd = ["git", "ls-remote", "--exit-code", remote_name]

    # Command output is logged only at DEBUG level.
    return self.log_and_execute(
        command=ls_remote_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.DEBUG,
    )
|
|
|
|
def git_fetch_interactive(
    self, working_directory: str, remote_name: str
) -> subprocess.CompletedProcess:
    """
    Run 'git fetch <remote_name>' in a way that permits user interaction.

    The command runs with capture=False and hide_console=False, so output is
    not collected and a console may be shown for prompts. check=False leaves
    the interpretation of the outcome to the caller.

    Args:
        working_directory (str): Directory in which the command is executed.
        remote_name (str): Remote to fetch from.

    Returns:
        subprocess.CompletedProcess: The result from log_and_execute. Because
        nothing is captured, its stdout/stderr are expected to be None.
    """
    func_name = "git_fetch_interactive"
    log_handler.log_info(
        f"Running interactive fetch for '{remote_name}' in '{working_directory}'. User may see a terminal.",
        func_name=func_name,
    )
    fetch_cmd = ["git", "fetch", remote_name]
    interactive_result = self.log_and_execute(
        command=fetch_cmd,
        working_directory=working_directory,
        check=False,
        capture=False,  # Leave stdout/stderr attached so prompts are visible.
        hide_console=False,  # Do not hide the console window.
    )
    return interactive_result
|
|
|
|
def get_matching_gitignore_rule(self, working_directory: str, path_to_check: str):
    """
    Return the ignore pattern that matches a path, if any.

    Runs 'git check-ignore -v --no-index -- <path_to_check>' with check=False
    and parses the verbose output, whose documented layout is
    '<source>:<linenum>:<pattern><TAB><pathname>'.

    Args:
        working_directory (str): Directory in which the command is executed.
        path_to_check (str): Path handed to 'git check-ignore'.

    Returns:
        str | None: The matching pattern, or None when the path is not ignored
        (exit code 1) or the output could not be parsed.

    Raises:
        GitCommandError: When the exit code is neither 0 nor 1, or when
            log_and_execute itself raises GitCommandError.
    """
    func_name = "get_matching_gitignore_rule"
    cmd = ["git", "check-ignore", "-v", "--no-index", "--", path_to_check]
    log_handler.log_debug(
        f"Getting matching rule for: '{path_to_check}'", func_name=func_name
    )

    # Only the execution is guarded, so the unexpected-RC error raised below
    # is logged once rather than a second time by this handler.
    try:
        result = self.log_and_execute(cmd, working_directory, check=False)
    except GitCommandError as e:
        log_handler.log_error(
            f"Failed get_matching_rule for '{path_to_check}': {e}",
            func_name=func_name,
        )
        raise

    if result.returncode == 1:
        log_handler.log_debug(
            f"Path '{path_to_check}' not ignored.", func_name=func_name
        )
        return None

    if result.returncode != 0:
        msg = f"check-ignore -v failed (RC {result.returncode})"
        stderr_text = (result.stderr or "").strip()
        log_handler.log_error(f"{msg}. Stderr: {stderr_text}", func_name=func_name)
        raise GitCommandError(msg, command=cmd, stderr=result.stderr)

    line = (result.stdout or "").strip()
    if not line or "\t" not in line:
        log_handler.log_warning(
            f"Unexpected output from check-ignore -v: {line}",
            func_name=func_name,
        )
        return None

    rule_part = line.split("\t", 1)[0]
    # The source path may itself contain ':' (e.g. a Windows drive letter in
    # an absolute excludes-file path) and so may the pattern, so a plain
    # split(":", 2) is not reliable. Anchor on the first ':<digits>:' group,
    # which is the line-number field, and take everything after it.
    parsed = re.match(r"^(.*?):(\d+):(.*)$", rule_part)
    if parsed is None:
        log_handler.log_warning(
            f"Could not parse pattern from: {rule_part}",
            func_name=func_name,
        )
        return None

    pattern = parsed.group(3)
    log_handler.log_debug(
        f"Path '{path_to_check}' matched rule: '{pattern}'",
        func_name=func_name,
    )
    return pattern
|
|
|
|
def get_status_short(self, working_directory: str):
    """
    Return the entries of 'git status --short -z --ignored=no'.

    The NUL-separated stdout is split and empty pieces are discarded. The raw
    output and the split result are logged at DEBUG level for diagnostics.

    NOTE(review): per git's documentation, with -z a rename/copy entry is
    followed by its original path as a separate NUL-terminated field, so such
    an entry may occupy two list items - confirm callers account for this.

    Args:
        working_directory (str): Directory in which the command is executed.

    Returns:
        list[str]: The non-empty status entries; an empty list when the command
        fails or any unexpected exception occurs (errors are logged, not raised).
    """
    func_name = "get_status_short"
    log_handler.log_debug(
        f"Getting short status for '{working_directory}' (-z)", func_name=func_name
    )
    status_cmd = ["git", "status", "--short", "-z", "--ignored=no"]
    try:
        completed = self.log_and_execute(
            status_cmd,
            working_directory,
            check=True,
            log_output_level=logging.DEBUG,
        )
        raw_output = completed.stdout

        # Diagnostics: length plus a repr that makes NUL bytes visible.
        log_handler.log_debug(
            f"Raw stdout length: {len(raw_output)}", func_name=func_name
        )
        log_handler.log_debug(
            f"Raw stdout repr: {raw_output!r}", func_name=func_name
        )

        # Split on NUL and keep only the non-empty pieces.
        status_lines = list(filter(None, raw_output.split("\0")))
        log_handler.log_debug(
            f"Split resulted in {len(status_lines)} non-empty lines.",
            func_name=func_name,
        )
        log_handler.log_debug(
            f"Split lines list: {status_lines}", func_name=func_name
        )

        log_handler.log_info(
            f"Status check returned {len(status_lines)} items.", func_name=func_name
        )
        return status_lines
    except GitCommandError as status_err:
        log_handler.log_error(
            f"Failed get status: {status_err}", func_name=func_name
        )
        return []
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error getting status: {unexpected}", func_name=func_name
        )
        return []
|
|
|
|
def get_file_content_from_ref(
    self, working_directory: str, file_path: str, ref: str = "HEAD"
):
    """
    Fetch a file's content at a given ref via 'git show <ref>:<path>'.

    OS path separators in file_path are converted to '/'. When ref is empty
    the argument becomes ':<path>'.

    Args:
        working_directory (str): Directory in which the command is executed.
        file_path (str): Path of the file inside the repository.
        ref (str): Ref prefix for the 'git show' argument. Defaults to "HEAD".

    Returns:
        The command's stdout when it exits with 0; None when the file is
        reported missing (exit code 128 with a recognised stderr message), on
        any other failure, or when log_and_execute raises GitCommandError.
    """
    func_name = "get_file_content_from_ref"
    git_path = file_path.replace(os.path.sep, "/")
    ref_path_arg = (f"{ref}:" if ref else ":") + git_path
    log_handler.log_debug(
        f"Getting content for '{ref_path_arg}' in '{working_directory}'",
        func_name=func_name,
    )
    show_cmd = ["git", "show", ref_path_arg]
    try:
        completed = self.log_and_execute(
            show_cmd, working_directory, check=False, log_output_level=logging.DEBUG
        )
    except GitCommandError as show_err:
        log_handler.log_error(
            f"Error executing git show for '{ref_path_arg}': {show_err}",
            func_name=func_name,
        )
        return None

    if completed.returncode == 0:
        return completed.stdout

    if completed.returncode == 128:
        err_text = completed.stderr
        # Same test as the original chained or/and expression, with the
        # implicit operator precedence written out.
        file_missing = (
            "exists on disk, but not in" in err_text
            or "does not exist in" in err_text
            or ("fatal: Path" in err_text and "does not exist" in err_text)
            or "did not match any file(s)" in err_text
        )
        if file_missing:
            log_handler.log_debug(
                f"File '{git_path}' not found in ref '{ref}'.", func_name=func_name
            )
            return None

    log_handler.log_error(
        f"git show failed for '{ref_path_arg}' (RC {completed.returncode}). Stderr: {completed.stderr.strip()}",
        func_name=func_name,
    )
    return None
|
|
|
|
def add_file(self, working_directory: str, file_path: str):
    """
    Stage a path with 'git add -- <file_path>'.

    Args:
        working_directory (str): Directory in which the command is executed.
        file_path (str): Path to stage.

    Returns:
        bool: True when the command completed without raising.

    Raises:
        ValueError: If file_path is empty or whitespace only.
        GitCommandError: If the command fails. When stderr reports that the
            pathspec did not match any files, a dedicated error is raised.
    """
    func_name = "add_file"
    path_is_blank = (not file_path) or file_path.isspace()
    if path_is_blank:
        raise ValueError("File path empty.")

    log_handler.log_info(
        f"Adding path to staging: '{file_path}' in '{working_directory}'",
        func_name=func_name,
    )
    add_cmd = ["git", "add", "--", file_path]
    try:
        self.log_and_execute(
            add_cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
    except GitCommandError as add_e:
        log_handler.log_error(
            f"Failed to add path '{file_path}': {add_e}", func_name=func_name
        )
        lowered_stderr = add_e.stderr.lower() if add_e.stderr else ""
        if "did not match any files" not in lowered_stderr:
            raise add_e
        # Surface the unmatched-pathspec case with a clearer message.
        raise GitCommandError(
            f"Pathspec '{file_path}' did not match any files.",
            command=add_cmd,
            stderr=add_e.stderr,
        ) from add_e

    log_handler.log_info(
        f"Path '{file_path}' added successfully.", func_name=func_name
    )
    return True
|
|
|
|
def get_remotes(self, working_directory: str) -> dict[str, str]:
    """
    Get a mapping of remote names to their fetch URLs.

    Parses the '(fetch)' lines of 'git remote -v'. URLs containing whitespace
    (for example local paths with spaces) are kept intact, and a trailing
    bracketed annotation after '(fetch)' is tolerated.

    Args:
        working_directory (str): Directory in which the command is executed.

    Returns:
        dict[str, str]: Remote name -> fetch URL; empty when none are found.

    Raises:
        GitCommandError: When the command fails with a 'fatal:' message in
            stderr, or when an unexpected exception occurs.
    """
    func_name = "get_remotes"
    log_handler.log_debug(
        f"Getting remotes for '{working_directory}'", func_name=func_name
    )
    cmd = ["git", "remote", "-v"]
    remotes = {}
    # <name><whitespace><url> (fetch), optionally followed by a "[...]" note.
    # The URL group is non-greedy but may contain spaces, so it extends up to
    # the "(fetch)" marker instead of stopping at the first blank.
    # NOTE(review): the optional "[...]" suffix is meant for the filter
    # annotation newer git versions print for partial-clone remotes - confirm.
    fetch_line = re.compile(r"^(\S+)\s+(.+?)\s+\(fetch\)(?:\s+\[[^\]]*\])?\s*$")
    try:
        # Uses the default options (capture=True, hide_console=True).
        result = self.log_and_execute(
            cmd, working_directory, check=True, log_output_level=logging.DEBUG
        )
        for line in result.stdout.strip().splitlines():
            parsed = fetch_line.match(line)
            if parsed:
                remotes[parsed.group(1)] = parsed.group(2)
        log_handler.log_info(
            f"Found {len(remotes)} remotes: {list(remotes.keys())}",
            func_name=func_name,
        )
        return remotes
    except GitCommandError as e:
        # A failure without "fatal:" in stderr is treated as "no remotes"
        # rather than as an error; only fatal errors are re-raised.
        is_error = bool(e.stderr) and "fatal:" in e.stderr.lower()
        if not is_error and not remotes:
            log_handler.log_info("No remotes found.", func_name=func_name)
            return {}
        log_handler.log_error(f"Failed to get remotes: {e}", func_name=func_name)
        raise
    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error getting remotes: {e}", func_name=func_name
        )
        # Attach the command, as the sibling remote helpers do.
        raise GitCommandError(
            f"Unexpected error getting remotes: {e}", command=cmd
        ) from e
|
|
|
|
def add_remote(
    self, working_directory: str, remote_name: str, remote_url: str
) -> bool:
    """
    Register a new remote with 'git remote add <remote_name> <remote_url>'.

    Args:
        working_directory (str): Directory in which the command is executed.
        remote_name (str): Name for the new remote.
        remote_url (str): URL the remote points to.

    Returns:
        bool: True when the command completed without raising.

    Raises:
        GitCommandError: If the command fails. An "already exists" failure is
            re-raised with a dedicated message (and not logged here) so the
            caller can handle it; unexpected exceptions are wrapped.
    """
    func_name = "add_remote"
    log_handler.log_info(
        f"Adding remote '{remote_name}' -> '{remote_url}'", func_name=func_name
    )
    add_cmd = ["git", "remote", "add", remote_name, remote_url]
    try:
        # Relies on the defaults for capture/hide_console; check=True.
        self.log_and_execute(add_cmd, working_directory, check=True)
        log_handler.log_info(
            f"Remote '{remote_name}' added successfully.", func_name=func_name
        )
        return True
    except GitCommandError as git_err:
        lowered_stderr = git_err.stderr.lower() if git_err.stderr else ""
        if "already exists" not in lowered_stderr:
            log_handler.log_error(
                f"Failed to add remote '{remote_name}': {git_err}",
                func_name=func_name,
            )
            raise  # Other Git errors propagate unchanged.
        # Specific case: re-raise so the caller can deal with the duplicate.
        raise GitCommandError(
            f"Remote '{remote_name}' already exists.",
            command=add_cmd,
            stderr=git_err.stderr,
        ) from git_err
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error adding remote '{remote_name}': {unexpected}",
            func_name=func_name,
        )
        raise GitCommandError(
            f"Unexpected error adding remote: {unexpected}", command=add_cmd
        ) from unexpected
|
|
|
|
def set_remote_url(
    self, working_directory: str, remote_name: str, remote_url: str
) -> bool:
    """
    Change a remote's URL with 'git remote set-url <remote_name> <remote_url>'.

    Args:
        working_directory (str): Directory in which the command is executed.
        remote_name (str): Name of the remote to modify.
        remote_url (str): New URL for the remote.

    Returns:
        bool: True when the command completed without raising.

    Raises:
        GitCommandError: If the command fails. A "no such remote" failure is
            re-raised with a dedicated message (and not logged here);
            unexpected exceptions are wrapped.
    """
    func_name = "set_remote_url"
    log_handler.log_info(
        f"Setting URL for remote '{remote_name}' to '{remote_url}'",
        func_name=func_name,
    )
    set_url_cmd = ["git", "remote", "set-url", remote_name, remote_url]
    try:
        # Relies on the defaults for capture/hide_console; check=True.
        self.log_and_execute(set_url_cmd, working_directory, check=True)
        log_handler.log_info(
            f"URL for remote '{remote_name}' set successfully.", func_name=func_name
        )
        return True
    except GitCommandError as git_err:
        lowered_stderr = git_err.stderr.lower() if git_err.stderr else ""
        if "no such remote" not in lowered_stderr:
            log_handler.log_error(
                f"Failed to set URL for remote '{remote_name}': {git_err}",
                func_name=func_name,
            )
            raise  # Other Git errors propagate unchanged.
        # Specific case: the remote to modify does not exist.
        raise GitCommandError(
            f"Remote '{remote_name}' does not exist, cannot set URL.",
            command=set_url_cmd,
            stderr=git_err.stderr,
        ) from git_err
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error setting remote URL for '{remote_name}': {unexpected}",
            func_name=func_name,
        )
        raise GitCommandError(
            f"Unexpected error setting remote URL: {unexpected}", command=set_url_cmd
        ) from unexpected
|
|
|
|
# --- Remote operations: fetch, pull, push and branch/upstream queries ---
|
|
|
|
def git_fetch(
    self, working_directory: str, remote_name: str, prune: bool = True
) -> subprocess.CompletedProcess:
    """
    Run 'git fetch <remote_name>', optionally with '--prune'.

    Output is captured and the console is hidden. The command runs with
    check=False so the caller can analyze the return code and stderr.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): The name of the remote to fetch from.
        prune (bool): If True, append '--prune' to the command to remove
            stale remote-tracking branches.

    Returns:
        subprocess.CompletedProcess: The result of the command execution.
    """
    func_name = "git_fetch"
    log_handler.log_info(
        f"Fetching from remote '{remote_name}' in '{working_directory}' (Prune={prune})",
        func_name=func_name,
    )
    prune_flag = ["--prune"] if prune else []
    fetch_cmd = ["git", "fetch", remote_name] + prune_flag

    # Fetch output (e.g. branch updates) is logged at INFO level.
    fetch_result = self.log_and_execute(
        command=fetch_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
    return fetch_result
|
|
|
|
def git_pull(
    self, working_directory: str, remote_name: str, branch_name: str
) -> subprocess.CompletedProcess:
    """
    Run 'git pull <remote_name>' for the currently checked-out branch.

    Only the remote is passed to git; branch_name is used solely in the log
    message. Output is captured and the console is hidden. The command runs
    with check=False so the caller can analyze the result for success,
    conflicts, or errors.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): The name of the remote to pull from.
        branch_name (str): Name of the local branch currently checked out
            (informational; not part of the executed command).

    Returns:
        subprocess.CompletedProcess: The result of the command execution.
    """
    func_name = "git_pull"
    log_handler.log_info(
        f"Pulling from remote '{remote_name}' into current branch ('{branch_name}') "
        f"in '{working_directory}'",
        func_name=func_name,
    )
    # The command is deliberately kept as plain `git pull <remote>`; per the
    # original author's note this is expected to fail when no upstream is
    # configured, leaving it to the user to set one.
    pull_cmd = ["git", "pull", remote_name]

    # check=False matters here: the original notes conflicts show up as RC=1.
    return self.log_and_execute(
        command=pull_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
|
|
|
|
def git_push(
    self,
    working_directory: str,
    remote_name: str,
    branch_name: str,
    set_upstream: bool = False,
    force: bool = False,
) -> subprocess.CompletedProcess:
    """
    Run 'git push [--set-upstream] [--force] <remote_name> <branch_name>'.

    Output is captured and the console is hidden. The command runs with
    check=False so the caller can analyze rejections and other failures.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): The name of the remote to push to.
        branch_name (str): The name of the local branch to push.
        set_upstream (bool): If True, add the '--set-upstream' flag.
        force (bool): If True, add the '--force' flag (use with extreme caution).

    Returns:
        subprocess.CompletedProcess: The result of the command execution.
    """
    func_name = "git_push"
    flags = []

    if set_upstream:
        flags.append("--set-upstream")
        announcement = (
            f"Pushing branch '{branch_name}' to '{remote_name}' and setting upstream."
        )
    else:
        announcement = f"Pushing branch '{branch_name}' to '{remote_name}'."
    log_handler.log_info(announcement, func_name=func_name)

    if force:
        flags.append("--force")
        log_handler.log_warning(
            f"Executing FORCE PUSH for branch '{branch_name}' to '{remote_name}'!",
            func_name=func_name,
        )

    # No explicit <local>:<remote> refspec is built; the branch is assumed to
    # have the same name on both sides.
    push_cmd = ["git", "push", *flags, remote_name, branch_name]

    # check=False matters here: the original notes rejected pushes are RC=1.
    return self.log_and_execute(
        command=push_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
|
|
|
|
def git_push_tags(
    self, working_directory: str, remote_name: str
) -> subprocess.CompletedProcess:
    """
    Run 'git push <remote_name> --tags'.

    Output is captured and the console is hidden. The command runs with
    check=False, leaving interpretation of the result to the caller.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): The name of the remote to push tags to.

    Returns:
        subprocess.CompletedProcess: The result of the command execution.
    """
    func_name = "git_push_tags"
    log_handler.log_info(
        f"Pushing all tags to remote '{remote_name}' from '{working_directory}'",
        func_name=func_name,
    )
    push_tags_cmd = ["git", "push", remote_name, "--tags"]
    tags_result = self.log_and_execute(
        command=push_tags_cmd,
        working_directory=working_directory,
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,
    )
    return tags_result
|
|
|
|
def get_current_branch_name(self, working_directory: str) -> str | None:
    """
    Return the name of the currently checked-out branch.

    Uses 'git symbolic-ref --short -q HEAD'. Exit code 1 is interpreted as a
    detached HEAD.

    Args:
        working_directory (str): Directory in which the command is executed.

    Returns:
        str | None: The stripped stdout when the command exits with 0 and
        prints something; None for detached HEAD, any other failure, or any
        unexpected exception (all of which are logged, not raised).
    """
    func_name = "get_current_branch_name"
    log_handler.log_debug(
        f"Getting current branch name in '{working_directory}'", func_name=func_name
    )
    # Original author's rationale: 'git branch --show-current' needs Git 2.22+
    # and 'rev-parse --abbrev-ref HEAD' can print 'HEAD' when detached, so
    # symbolic-ref is used; -q suppresses the error when HEAD is not a branch.
    symbolic_ref_cmd = ["git", "symbolic-ref", "--short", "-q", "HEAD"]
    try:
        # check=False: a detached HEAD legitimately yields RC=1.
        outcome = self.log_and_execute(
            symbolic_ref_cmd,
            working_directory,
            check=False,
            capture=True,
            hide_console=True,
            log_output_level=logging.DEBUG,
        )
        exit_code = outcome.returncode

        if exit_code == 0 and outcome.stdout:
            branch_name = outcome.stdout.strip()
            log_handler.log_info(
                f"Current branch is '{branch_name}'", func_name=func_name
            )
            return branch_name

        if exit_code == 1:
            log_handler.log_warning(
                "Currently in detached HEAD state.", func_name=func_name
            )
            return None

        # Anything else is an unexpected failure.
        stderr_text = outcome.stderr.strip() if outcome.stderr else "N/A"
        log_handler.log_error(
            f"Failed to get current branch (RC={exit_code}). Stderr: {stderr_text}",
            func_name=func_name,
        )
        return None
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error getting current branch: {unexpected}",
            func_name=func_name,
        )
        return None
|
|
|
|
def get_branch_upstream(
    self, working_directory: str, branch_name: str
) -> str | None:
    """
    Get the upstream configured for a local branch as '<remote>/<branch>'.

    'git rev-parse --abbrev-ref <branch>@{upstream}' is used as the check that
    an upstream exists; the returned name is then assembled from the
    'branch.<name>.remote' and 'branch.<name>.merge' configuration values.

    Args:
        working_directory (str): Path to the repository.
        branch_name (str): The name of the local branch.

    Returns:
        str | None: The upstream name (e.g. 'origin/main', or
        'origin/feature/foo' for branch names containing slashes), or None if
        no upstream is configured or on any error (errors are logged).
    """
    func_name = "get_branch_upstream"
    log_handler.log_debug(
        f"Getting upstream for branch '{branch_name}' in '{working_directory}'",
        func_name=func_name,
    )
    cmd = ["git", "rev-parse", "--abbrev-ref", f"{branch_name}@{{upstream}}"]
    try:
        # check=False: the command fails when no upstream is configured.
        result = self.log_and_execute(
            cmd,
            working_directory,
            check=False,
            capture=True,
            hide_console=True,
            log_output_level=logging.DEBUG,
        )

        if result.returncode == 0 and result.stdout:
            # rev-parse succeeded, so an upstream exists; its output is only
            # used as that existence check. The name is built from config.
            remote_cmd = ["git", "config", "--get", f"branch.{branch_name}.remote"]
            merge_ref_cmd = [
                "git",
                "config",
                "--get",
                f"branch.{branch_name}.merge",
            ]
            remote_result = self.log_and_execute(
                remote_cmd,
                working_directory,
                check=False,
                capture=True,
                hide_console=True,
            )
            merge_ref_result = self.log_and_execute(
                merge_ref_cmd,
                working_directory,
                check=False,
                capture=True,
                hide_console=True,
            )

            if remote_result.returncode == 0 and merge_ref_result.returncode == 0:
                remote = remote_result.stdout.strip()
                merge_ref = merge_ref_result.stdout.strip()  # e.g. 'refs/heads/main'
                # Strip only the 'refs/heads/' prefix so branch names that
                # contain slashes (e.g. 'feature/foo') are kept whole; taking
                # the last path segment would truncate them to 'foo'.
                heads_prefix = "refs/heads/"
                if merge_ref.startswith(heads_prefix):
                    remote_branch_name = merge_ref[len(heads_prefix):]
                else:
                    # Unrecognised ref layout: keep the previous behaviour.
                    remote_branch_name = merge_ref.split("/")[-1]
                full_upstream_name = f"{remote}/{remote_branch_name}"
                log_handler.log_info(
                    f"Upstream for '{branch_name}' is '{full_upstream_name}'",
                    func_name=func_name,
                )
                return full_upstream_name

            # remote/merge could not be read: treated as a config problem.
            log_handler.log_warning(
                f"Could not determine full upstream name for '{branch_name}' despite rev-parse success.",
                func_name=func_name,
            )
            return None

        if "no upstream configured" in (result.stderr or "").lower():
            log_handler.log_info(
                f"No upstream configured for branch '{branch_name}'.",
                func_name=func_name,
            )
            return None

        # Any other rev-parse failure.
        log_handler.log_error(
            f"Failed to get upstream for '{branch_name}' (RC={result.returncode}). Stderr: {result.stderr.strip() if result.stderr else 'N/A'}",
            func_name=func_name,
        )
        return None

    except Exception as e:
        log_handler.log_exception(
            f"Unexpected error getting upstream for '{branch_name}': {e}",
            func_name=func_name,
        )
        return None
|
|
|
|
def get_ahead_behind_count(self, working_directory: str, local_branch: str, upstream_branch: str) -> Tuple[int | None, int | None]:
    """
    Count how many commits a local branch is ahead of and behind its upstream.

    Two 'git rev-list --count' commands are run in sequence: first
    '<upstream>..<local>' (ahead), then '<local>..<upstream>' (behind). The
    second command is not run if the first one fails.

    Args:
        working_directory (str): Path to the repository.
        local_branch (str): The name of the local branch.
        upstream_branch (str): The full name of the upstream branch
            (e.g., 'origin/main').

    Returns:
        Tuple[int | None, int | None]: (ahead_count, behind_count), or
        (None, None) if either command fails, its output cannot be parsed as
        an integer, or an unexpected exception occurs.
    """
    func_name = "get_ahead_behind_count"
    log_handler.log_debug(f"Getting ahead/behind count for '{local_branch}' vs '{upstream_branch}' using separate counts.", func_name=func_name)

    counts = {}
    try:
        # (label, revision range) pairs, processed in this order.
        revision_ranges = (
            ("ahead", f"{upstream_branch}..{local_branch}"),
            ("behind", f"{local_branch}..{upstream_branch}"),
        )
        for label, revision_range in revision_ranges:
            title = label.capitalize()
            count_cmd = ["git", "rev-list", "--count", revision_range]
            log_handler.log_debug(f"Executing {label} count command: {' '.join(count_cmd)}", func_name=func_name)
            outcome = self.log_and_execute(
                count_cmd,
                working_directory,
                check=False,
                capture=True,
                hide_console=True,
                log_output_level=logging.DEBUG,
            )

            command_ok = outcome.returncode == 0 and outcome.stdout
            if not command_ok:
                stderr_text = outcome.stderr.strip() if outcome.stderr else "N/A"
                log_handler.log_warning(f"{title} count command failed (RC={outcome.returncode}). Stderr: {stderr_text}", func_name=func_name)
                return None, None  # Command failed

            try:
                counts[label] = int(outcome.stdout.strip())
            except ValueError:
                log_handler.log_error(f"Failed to parse {label} count output: '{outcome.stdout.strip()}'", func_name=func_name)
                return None, None  # Parsing error
            log_handler.log_debug(f"{title} count: {counts[label]}", func_name=func_name)

        ahead_count = counts["ahead"]
        behind_count = counts["behind"]
        log_handler.log_info(f"Ahead/Behind for '{local_branch}': Ahead={ahead_count}, Behind={behind_count}", func_name=func_name)
        # Tuple order is (ahead, behind).
        return ahead_count, behind_count

    except Exception as unexpected:
        log_handler.log_exception(f"Unexpected error getting ahead/behind count: {unexpected}", func_name=func_name)
        return None, None  # Generic failure
|
|
|
|
def git_clone(self, remote_url: str, local_directory_path: str) -> subprocess.CompletedProcess:
    """
    Run 'git clone --progress <remote_url> <local_directory_path>'.

    Output is captured and the console is hidden. The command runs with
    check=False so the caller can analyze specific failures.

    No timeout is passed from here: whatever timeout handling exists is that
    of log_and_execute.

    Args:
        remote_url (str): The URL of the remote repository to clone.
        local_directory_path (str): The full path to the new local directory
            where the repository will be cloned.

    Returns:
        subprocess.CompletedProcess: The result of the command execution.
    """
    func_name = "git_clone"
    log_handler.log_info(f"Cloning repository from '{remote_url}' into '{local_directory_path}'", func_name=func_name)

    # Per the original author's note, --progress forces status output even
    # when stderr is not a terminal, which is useful for logging.
    cmd = ["git", "clone", "--progress", remote_url, local_directory_path]

    # The clone is launched from the process's current directory (".")
    # because the destination directory is created by the command itself.
    result = self.log_and_execute(
        command=cmd,
        working_directory=".",
        check=False,
        capture=True,
        hide_console=True,
        log_output_level=logging.INFO,  # Progress and errors logged at INFO.
    )
    return result
|
|
|
|
def git_list_remote_branches(self, working_directory: str, remote_name: str) -> List[str]:
    """
    List the remote-tracking branches that belong to a single remote.

    Runs 'git branch -r --no-color' in the repository and keeps only the
    entries whose name starts with '<remote_name>/'.

    Args:
        working_directory (str): Path to the repository.
        remote_name (str): The name of the remote (e.g., 'origin').

    Returns:
        List[str]: Alphabetically sorted remote-tracking branch names for the
            specified remote (e.g., ['origin/develop', 'origin/main']).
            Symbolic entries such as 'origin/HEAD -> origin/main' are excluded.
            An empty list is returned when the command prints nothing, when
            no entry matches the remote, or when an error occurs (errors are
            logged here and never propagated to the caller).
    """
    func_name = "git_list_remote_branches"
    log_handler.log_debug(
        f"Listing remote branches for '{remote_name}' via 'git branch -r' in '{working_directory}'",
        func_name=func_name,
    )

    # 'git branch -r' prints the remote-tracking branches of *every* remote;
    # the per-remote filtering is done below on the captured text.
    # '--no-color' keeps ANSI escape sequences out of that text even when the
    # user's git config forces colored output (color.branch / color.ui set to
    # 'always'); otherwise the prefix match below would never succeed.
    cmd = ["git", "branch", "-r", "--no-color"]

    try:
        # check=True is requested so that a failing command is reported as an
        # error (handled by the GitCommandError branch below) instead of
        # being parsed as if it were a valid listing.
        result = self.log_and_execute(
            cmd,
            working_directory,
            check=True,
            capture=True,
            hide_console=True,
            log_output_level=logging.DEBUG,
        )

        if not result.stdout:
            # The command succeeded but printed nothing: no remote branches.
            log_handler.log_info(
                "No remote branches found in 'git branch -r' output.",
                func_name=func_name,
            )
            return []

        prefix_to_match = f"{remote_name}/"  # e.g. "origin/"
        candidates = (line.strip() for line in result.stdout.splitlines())
        # Drop symbolic-ref lines ("origin/HEAD -> origin/main") and any
        # branch that belongs to a different remote. The surviving names are
        # already in the desired '<remote>/<branch>' form.
        remote_branches = [
            name
            for name in candidates
            if "->" not in name and name.startswith(prefix_to_match)
        ]

        log_handler.log_info(
            f"Found {len(remote_branches)} remote branches for '{remote_name}'.",
            func_name=func_name,
        )
        log_handler.log_debug(
            f"Filtered remote branches for '{remote_name}': {remote_branches}",
            func_name=func_name,
        )
        return sorted(remote_branches)

    except GitCommandError as e:
        # A failure of 'git branch -r' itself is a generic repository/git
        # problem; report it and fall back to an empty result.
        log_handler.log_error(
            f"Failed to list remote branches using 'git branch -r' for '{remote_name}': {e}",
            func_name=func_name,
        )
        return []
    except Exception as e:
        # Boundary handler: callers always receive a list, never an exception.
        log_handler.log_exception(
            f"Unexpected error listing remote branches for '{remote_name}': {e}",
            func_name=func_name,
        )
        return []
|
|
|
|
|
|
# --- END OF FILE git_commands.py ---
|