# --- START OF FILE git_commands.py --- # git_commands.py import os import subprocess import logging # Rimosso import logging import re # Importa il nuovo gestore della coda log import log_handler from typing import Tuple, Dict, List # --- Custom Exception Definition (invariata) --- class GitCommandError(Exception): """ Custom exception for handling Git command execution errors. Includes the original command and stderr details if available. """ def __init__(self, message, command=None, stderr=None): """Initialize the GitCommandError.""" super().__init__(str(message)) self.command = command self.stderr = stderr if stderr else "" def __str__(self): """Return a formatted string representation including command details.""" base_message = super().__str__() details = [] if self.command: safe_command_parts = [str(part) for part in self.command] command_str = " ".join(safe_command_parts) details.append(f"Command: '{command_str}'") stderr_str = self.stderr.strip() if stderr_str: details.append(f"Stderr: {stderr_str}") if details: details_str = "; ".join(details) return f"{base_message} ({details_str})" else: return base_message # --- Main Git Commands Class --- class GitCommands: """ Manages Git command execution, logging (via log_handler), and error handling. """ # Rimosso logger da __init__ def __init__(self, logger_ignored=None): # Accetta argomento ma lo ignora """Initializes the GitCommands class.""" # Non c'è più self.logger log_handler.log_debug("GitCommands initialized.", func_name="__init__") def log_and_execute( self, command: list[str], working_directory: str, check: bool = True, log_output_level: int = logging.INFO, # ---<<< NUOVI PARAMETRI >>>--- capture: bool = True, # Cattura stdout/stderr? hide_console: bool = True, # Nascondi finestra console (Windows)? # ---<<< FINE NUOVI PARAMETRI >>>--- ) -> subprocess.CompletedProcess: """ Executes a shell command, logs details, handles errors, with options for capturing output and hiding the console window. Args: command (list): Command and arguments as a list of strings. working_directory (str): The directory to execute the command in. check (bool): If True, raises GitCommandError on non-zero exit code. log_output_level (int): Logging level for stdout/stderr on success. capture (bool): If True, capture stdout and stderr. If False, they might go to the parent console/terminal. hide_console (bool): If True (and on Windows), try to hide the console window created for the subprocess. Returns: subprocess.CompletedProcess: Result object from subprocess.run. Raises: GitCommandError, ValueError, FileNotFoundError (wrapped), PermissionError (wrapped) """ func_name = "log_and_execute" safe_command_parts = [str(part) for part in command] command_str = " ".join(safe_command_parts) log_handler.log_debug( f"Executing in '{working_directory}': {command_str} " f"(Capture={capture}, HideConsole={hide_console})", # Log nuovi parametri func_name=func_name, ) # --- Validazione Working Directory (invariato) --- if not working_directory: msg = "Working directory cannot be None or empty." log_handler.log_error(msg, func_name=func_name) raise ValueError(msg) if working_directory == ".": effective_cwd = os.getcwd() else: effective_cwd = os.path.abspath(working_directory) if not os.path.isdir(effective_cwd): msg = ( f"Working directory does not exist or is not a dir: {effective_cwd}" ) log_handler.log_error(msg, func_name=func_name) raise GitCommandError(msg, command=safe_command_parts) # log_handler.log_debug(f"Effective CWD: {effective_cwd}", func_name=func_name) # Log meno verboso # --- Esecuzione Comando --- try: # ---<<< MODIFICA: Configurazione startupinfo/flags >>>--- startupinfo = None creationflags = 0 # Applica solo se richiesto E siamo su Windows if hide_console and os.name == "nt": startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = subprocess.SW_HIDE # CREATE_NO_WINDOW potrebbe essere troppo aggressivo e impedire prompt # creationflags = subprocess.CREATE_NO_WINDOW elif not hide_console and os.name == "nt": # Se NON vogliamo nascondere, potremmo esplicitamente chiedere una nuova console # per isolare l'input/output del comando interattivo creationflags = subprocess.CREATE_NEW_CONSOLE # Su Linux/macOS, non impostiamo nulla di speciale per la finestra # ---<<< FINE MODIFICA >>>--- # Timeout diagnostico (mantenuto) timeout_seconds = 60 # Aumentato leggermente per operazioni remote # log_handler.log_debug(f"Setting timeout to {timeout_seconds} seconds.", func_name=func_name) # log_handler.log_debug(f"Attempting subprocess.run for: {command_str}", func_name=func_name) # Esecuzione comando con le opzioni corrette result = subprocess.run( safe_command_parts, cwd=effective_cwd, capture_output=capture, # Usa il nuovo parametro text=True, # Decodifica output come testo check=check, # Solleva eccezione su errore se True encoding="utf-8", # Encoding standard errors="replace", # Gestione errori decodifica timeout=timeout_seconds, startupinfo=startupinfo, # Passa la configurazione (o None) creationflags=creationflags, # Passa i flag (o 0) ) log_handler.log_debug( f"Command '{command_str}' finished. RC={result.returncode}", func_name=func_name, ) # --- Log Output di Successo (solo se catturato) --- if capture and (result.returncode == 0 or not check): stdout_log_debug = ( result.stdout.strip() if result.stdout else "" ) stderr_log_debug = ( result.stderr.strip() if result.stderr else "" ) # Logga sempre a DEBUG log_handler.log_debug( f"Command successful (RC={result.returncode}). Output:\n" f"--- stdout ---\n{stdout_log_debug}\n" f"--- stderr ---\n{stderr_log_debug}\n" f"--- End Output ---", func_name=func_name, ) # Logga anche al livello richiesto se diverso da DEBUG if log_output_level > logging.DEBUG: log_handler.log_message( log_output_level, f"Command successful. Output logged at DEBUG level.", # Messaggio più conciso qui func_name=func_name, ) return result # --- Gestione Errori (modificata per CalledProcessError quando capture=False) --- except subprocess.TimeoutExpired as e: # (Gestione Timeout invariata) log_handler.log_error( f"Command timed out after {timeout_seconds}s: {command_str}", func_name=func_name, ) stderr_out = ( e.stderr.strip() if capture and e.stderr else "" ) stdout_out = ( e.stdout.strip() if capture and e.stdout else "" ) log_handler.log_error(f"Timeout stderr: {stderr_out}", func_name=func_name) raise GitCommandError( f"Timeout after {timeout_seconds}s.", command=safe_command_parts, stderr=e.stderr if capture else None, ) from e except subprocess.CalledProcessError as e: # (Gestione errore comando fallito) # Se l'output non è stato catturato, e.stdout/e.stderr saranno None stderr_err = ( e.stderr.strip() if capture and e.stderr else "" ) stdout_err = ( e.stdout.strip() if capture and e.stdout else "" ) err_msg = ( f"Command failed (RC {e.returncode}) in '{effective_cwd}'.\n" f"CMD: {command_str}\nSTDERR: {stderr_err}\nSTDOUT: {stdout_err}" ) log_handler.log_error(err_msg, func_name=func_name) # Passa stderr all'eccezione solo se è stato catturato raise GitCommandError( f"Git command failed in '{effective_cwd}'. RC: {e.returncode}", command=safe_command_parts, stderr=e.stderr if capture else None, ) from e # (Gestione FileNotFoundError, PermissionError, Exception generica invariata) except FileNotFoundError as e: log_handler.log_error( f"FileNotFoundError for command: {safe_command_parts[0]}", func_name=func_name, ) error_msg = f"Command not found: '{safe_command_parts[0]}'. Is Git installed/in PATH?" log_handler.log_error(error_msg, func_name=func_name) raise GitCommandError(error_msg, command=safe_command_parts) from e except PermissionError as e: log_handler.log_error( f"PermissionError executing in '{effective_cwd}': {e}", func_name=func_name, ) error_msg = f"Permission denied executing command in '{effective_cwd}'." log_handler.log_error(error_msg, func_name=func_name) raise GitCommandError( error_msg, command=safe_command_parts, stderr=str(e) ) from e except Exception as e: log_handler.log_exception( f"Unexpected Exception executing command {command_str}: {e}", func_name=func_name, ) raise GitCommandError( f"Unexpected execution error: {e}", command=safe_command_parts ) from e # --- Core Repo Operations (usano log_handler) --- def prepare_svn_for_git(self, working_directory: str): func_name = "prepare_svn_for_git" log_handler.log_info( f"Preparing directory for Git: '{working_directory}'", func_name=func_name ) if not working_directory: raise ValueError("Working directory cannot be None or empty.") if not os.path.isdir(working_directory): raise GitCommandError( f"Target directory does not exist: {working_directory}" ) gitignore_path = os.path.join(working_directory, ".gitignore") git_dir_path = os.path.join(working_directory, ".git") svn_ignore = ".svn" if not os.path.exists(git_dir_path): log_handler.log_info( "No existing Git repo found. Initializing...", func_name=func_name ) try: init_cmd = ["git", "init"] self.log_and_execute(init_cmd, working_directory, check=True) log_handler.log_info("Git repo initialized.", func_name=func_name) except (GitCommandError, ValueError) as e: log_handler.log_error(f"Failed init Git repo: {e}", func_name=func_name) raise else: log_handler.log_info( "Existing Git repo found. Skipping init.", func_name=func_name ) log_handler.log_debug( f"Checking/updating .gitignore: {gitignore_path}", func_name=func_name ) try: needs_write = False content = "" write_content = "" if not os.path.exists(gitignore_path): log_handler.log_info( "'.gitignore' not found. Creating with .svn entry.", func_name=func_name, ) write_content = f"{svn_ignore}\n" needs_write = True else: try: with open(gitignore_path, "r", encoding="utf-8") as f: lines = f.readlines() ignored = any( l.strip() == svn_ignore or l.strip().startswith(svn_ignore + "/") for l in lines ) if not ignored: log_handler.log_info( "'.svn' not found in .gitignore. Appending...", func_name=func_name, ) content = "".join(lines) write_content += ( "\n" if not content.endswith("\n") else "" ) + f"{svn_ignore}\n" needs_write = True else: log_handler.log_info( "'.svn' already in .gitignore.", func_name=func_name ) except IOError as r_err: log_handler.log_error( f"Could not read .gitignore: {r_err}", func_name=func_name ) raise IOError(f"Failed read: {r_err}") from r_err if needs_write: mode = ( "a" if os.path.exists(gitignore_path) and write_content.startswith("\n") else "w" ) log_handler.log_debug( f"Writing to {gitignore_path} (mode: {mode})", func_name=func_name ) try: with open( gitignore_path, mode, encoding="utf-8", newline="\n" ) as f: f.write(write_content) log_handler.log_info( f"Updated '{gitignore_path}'.", func_name=func_name ) except IOError as w_err: log_handler.log_error( f"Error writing .gitignore: {w_err}", func_name=func_name ) raise IOError(f"Failed write: {w_err}") from w_err except IOError as io_err: log_handler.log_error( f"IO error for .gitignore: {io_err}", func_name=func_name ) raise except Exception as e: log_handler.log_exception( f"Unexpected error managing .gitignore: {e}", func_name=func_name ) raise GitCommandError(f"Unexpected .gitignore error: {e}") from e log_handler.log_info( f"Directory preparation complete for '{working_directory}'.", func_name=func_name, ) def create_git_bundle(self, working_directory: str, bundle_path: str): func_name = "create_git_bundle" norm_path = os.path.normpath(bundle_path) git_path = norm_path.replace("\\", "/") log_handler.log_info( f"Attempting to create Git bundle: {git_path}", func_name=func_name ) log_handler.log_debug(f"Source repo: {working_directory}", func_name=func_name) command = ["git", "bundle", "create", git_path, "--all"] try: result = self.log_and_execute(command, working_directory, check=False) if result.returncode != 0: stderr_low = result.stderr.lower() if result.stderr else "" if ( "refusing to create empty bundle" in stderr_low or "does not contain any references" in stderr_low ): log_handler.log_warning( f"Bundle creation skipped: Repo '{working_directory}' empty.", func_name=func_name, ) else: err_msg = f"Git bundle command failed (RC {result.returncode})." log_handler.log_error( f"{err_msg} Stderr: {result.stderr.strip()}", func_name=func_name, ) raise GitCommandError( err_msg, command=command, stderr=result.stderr ) else: if not os.path.exists(norm_path) or os.path.getsize(norm_path) == 0: log_handler.log_warning( f"Bundle cmd OK, but file '{norm_path}' missing/empty.", func_name=func_name, ) else: log_handler.log_info( f"Bundle created successfully: '{norm_path}'.", func_name=func_name, ) except Exception as e: log_handler.log_exception( f"Unexpected bundle creation error: {e}", func_name=func_name ) raise GitCommandError( f"Unexpected bundle error: {e}", command=command ) from e def fetch_from_git_bundle(self, working_directory: str, bundle_path: str): func_name = "fetch_from_git_bundle" if not os.path.isfile(bundle_path): raise FileNotFoundError(f"Bundle file not found: {bundle_path}") norm_path = os.path.normpath(bundle_path).replace("\\", "/") log_handler.log_info( f"Fetching from '{norm_path}' into '{working_directory}'", func_name=func_name, ) fetch_cmd = ["git", "fetch", norm_path, "--verbose"] merge_cmd = ["git", "merge", "FETCH_HEAD", "--no-ff", "--no-edit"] try: log_handler.log_debug("Executing fetch...", func_name=func_name) self.log_and_execute(fetch_cmd, working_directory, check=True) log_handler.log_info("Fetch successful.", func_name=func_name) log_handler.log_debug("Executing merge...", func_name=func_name) merge_res = self.log_and_execute(merge_cmd, working_directory, check=False) out_log = merge_res.stdout.strip() if merge_res.stdout else "" err_log = merge_res.stderr.strip() if merge_res.stderr else "" combined_low = (out_log + err_log).lower() if merge_res.returncode == 0: if "already up to date" in combined_low: log_handler.log_info( "Repo already up-to-date.", func_name=func_name ) else: log_handler.log_info("Merge successful.", func_name=func_name) log_handler.log_debug( f"Merge stdout:\n{out_log}", func_name=func_name ) else: if ( "conflict" in combined_low or "automatic merge failed" in combined_low ): msg = f"Merge conflict occurred. Resolve manually in '{working_directory}' and commit." log_handler.log_error(msg, func_name=func_name) raise GitCommandError( msg, command=merge_cmd, stderr=merge_res.stderr ) else: msg = f"Merge command failed (RC {merge_res.returncode})." log_handler.log_error( f"{msg} Stderr: {err_log}", func_name=func_name ) raise GitCommandError( msg, command=merge_cmd, stderr=merge_res.stderr ) except (GitCommandError, ValueError, FileNotFoundError) as e: log_handler.log_error( f"Fetch/merge failed for '{working_directory}': {e}", func_name=func_name, ) raise except Exception as e: log_handler.log_exception( f"Unexpected fetch/merge error for '{working_directory}': {e}", func_name=func_name, ) raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e # --- Commit and Status --- def git_commit(self, working_directory: str, message: str): func_name = "git_commit" log_handler.log_info( f"Attempting commit in '{working_directory}' msg: '{message[:50]}...'", func_name=func_name, ) if not message or message.isspace(): raise ValueError("Commit message empty.") try: add_cmd = ["git", "add", "."] log_handler.log_debug( "Staging all changes ('git add .')...", func_name=func_name ) self.log_and_execute(add_cmd, working_directory, check=True) log_handler.log_debug("Staging successful.", func_name=func_name) commit_cmd = ["git", "commit", "-m", message] log_handler.log_debug("Attempting commit...", func_name=func_name) result = self.log_and_execute(commit_cmd, working_directory, check=False) out_low = result.stdout.lower() if result.stdout else "" err_low = result.stderr.lower() if result.stderr else "" combined_low = out_low + err_low if result.returncode == 0: log_handler.log_info("Commit successful.", func_name=func_name) return True elif ( "nothing to commit" in combined_low or "no changes added to commit" in combined_low or "nothing added to commit" in combined_low ): log_handler.log_info( "No changes available to commit.", func_name=func_name ) return False else: msg = f"Commit command failed unexpectedly (RC {result.returncode})." log_handler.log_error( f"{msg} Stderr: {result.stderr.strip()}", func_name=func_name ) raise GitCommandError(msg, command=commit_cmd, stderr=result.stderr) except (GitCommandError, ValueError) as e: log_handler.log_error( f"Error during staging/commit: {e}", func_name=func_name ) raise def git_status_has_changes(self, working_directory: str): func_name = "git_status_has_changes" log_handler.log_debug( f"Checking status for changes in '{working_directory}'...", func_name=func_name, ) try: cmd = ["git", "status", "--porcelain=v1", "-unormal"] result = self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) has_changes = bool(result.stdout.strip()) log_handler.log_debug( f"Status check complete. Has changes: {has_changes}", func_name=func_name, ) return has_changes except GitCommandError as e: log_handler.log_error( f"Git status check command failed: {e}", func_name=func_name ) raise # --- Tag Management --- def list_tags(self, working_directory: str): func_name = "list_tags" log_handler.log_info( f"Listing tags in '{working_directory}'...", func_name=func_name ) fmt = "%(refname:short)%09%(contents:subject)" cmd = ["git", "tag", "--list", f"--format={fmt}", "--sort=-creatordate"] tags = [] try: result = self.log_and_execute(cmd, working_directory, check=True) for line in result.stdout.splitlines(): line_strip = line.strip() if line_strip: parts = line_strip.split("\t", 1) name = parts[0].strip() subject = ( parts[1].strip() if len(parts) > 1 and parts[1].strip() else "(No subject/Lightweight)" ) tags.append((name, subject)) log_handler.log_info(f"Found {len(tags)} tags.", func_name=func_name) log_handler.log_debug(f"Tags found: {tags}", func_name=func_name) return tags except GitCommandError as e: log_handler.log_error(f"Error listing tags: {e}", func_name=func_name) return [] except Exception as e: log_handler.log_exception( f"Unexpected error listing tags: {e}", func_name=func_name ) return [] def create_tag(self, working_directory: str, tag_name: str, message: str): func_name = "create_tag" log_handler.log_info( f"Creating tag '{tag_name}' in '{working_directory}'", func_name=func_name ) if not tag_name or tag_name.isspace(): raise ValueError("Tag name empty.") if not message or message.isspace(): raise ValueError("Tag message empty.") pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$|@\{|\\\\))[^ \t\n\r\f\v~^:?*[\\]+(? subprocess.CompletedProcess: """ Executes 'git ls-remote ' to check connection and list refs. Captures output and hides console by default. Raises GitCommandError on failure. """ func_name = "git_ls_remote" log_handler.log_debug( f"Running ls-remote for '{remote_name}' in '{working_directory}'", func_name=func_name, ) cmd = [ "git", "ls-remote", "--exit-code", remote_name, ] # --exit-code fa fallire se remote non raggiungibile # Esegui catturando output e nascondendo console, solleva eccezione su errore # Non impostiamo check=True qui, analizzeremo il CompletedProcess nel chiamante result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, # Analizziamo noi il codice di ritorno e stderr capture=True, hide_console=True, log_output_level=logging.DEBUG, # Logga output solo a DEBUG ) return result def git_fetch_interactive( self, working_directory: str, remote_name: str ) -> subprocess.CompletedProcess: """ Executes 'git fetch ' allowing user interaction. Does NOT capture output and tries to show a console window for prompts. """ func_name = "git_fetch_interactive" log_handler.log_info( f"Running interactive fetch for '{remote_name}' in '{working_directory}'. User may see a terminal.", func_name=func_name, ) cmd = ["git", "fetch", remote_name] # Esegui SENZA catturare output e SENZA nascondere la console # check=False perché vogliamo analizzare noi l'esito nel worker result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=False, # Non catturare stdout/stderr hide_console=False, # Non nascondere la console ) # Nota: result.stdout e result.stderr saranno None qui return result def get_matching_gitignore_rule(self, working_directory: str, path_to_check: str): func_name = "get_matching_gitignore_rule" cmd = ["git", "check-ignore", "-v", "--no-index", "--", path_to_check] log_handler.log_debug( f"Getting matching rule for: '{path_to_check}'", func_name=func_name ) try: result = self.log_and_execute(cmd, working_directory, check=False) if result.returncode == 0: line = result.stdout.strip() if line and "\t" in line: rule_part = line.split("\t", 1)[0] parts = rule_part.split(":", 2) if len(parts) == 3: pattern = parts[2] log_handler.log_debug( f"Path '{path_to_check}' matched rule: '{pattern}'", func_name=func_name, ) return pattern else: log_handler.log_warning( f"Could not parse pattern from: {rule_part}", func_name=func_name, ) return None else: log_handler.log_warning( f"Unexpected output from check-ignore -v: {line}", func_name=func_name, ) return None elif result.returncode == 1: log_handler.log_debug( f"Path '{path_to_check}' not ignored.", func_name=func_name ) return None else: msg = f"check-ignore -v failed (RC {result.returncode})" log_handler.log_error( f"{msg}. Stderr: {result.stderr.strip()}", func_name=func_name ) raise GitCommandError(msg, command=cmd, stderr=result.stderr) except GitCommandError as e: log_handler.log_error( f"Failed get_matching_rule for '{path_to_check}': {e}", func_name=func_name, ) raise def get_status_short(self, working_directory: str): func_name = "get_status_short" log_handler.log_debug( f"Getting short status for '{working_directory}' (-z)", func_name=func_name ) cmd = ["git", "status", "--short", "-z", "--ignored=no"] try: result = self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) # <<< MODIFICA/VERIFICA >>> raw_output = result.stdout log_handler.log_debug( f"Raw stdout length: {len(raw_output)}", func_name=func_name ) # Logga la rappresentazione repr() che mostra caratteri speciali come \x00 log_handler.log_debug( f"Raw stdout repr: {repr(raw_output)}", func_name=func_name ) # Esegui lo split e verifica status_lines = [ line for line in raw_output.split("\0") if line ] # Filtra stringhe vuote log_handler.log_debug( f"Split resulted in {len(status_lines)} non-empty lines.", func_name=func_name, ) # Logga la lista risultante per conferma log_handler.log_debug( f"Split lines list: {status_lines}", func_name=func_name ) # <<< FINE MODIFICA/VERIFICA >>> log_handler.log_info( f"Status check returned {len(status_lines)} items.", func_name=func_name ) return status_lines except GitCommandError as e: log_handler.log_error(f"Failed get status: {e}", func_name=func_name) return [] except Exception as e: log_handler.log_exception( f"Unexpected error getting status: {e}", func_name=func_name ) return [] def get_file_content_from_ref( self, working_directory: str, file_path: str, ref: str = "HEAD" ): func_name = "get_file_content_from_ref" git_path = file_path.replace(os.path.sep, "/") ref_prefix = f"{ref}:" if ref else ":" ref_path_arg = f"{ref_prefix}{git_path}" log_handler.log_debug( f"Getting content for '{ref_path_arg}' in '{working_directory}'", func_name=func_name, ) cmd = ["git", "show", ref_path_arg] try: result = self.log_and_execute( cmd, working_directory, check=False, log_output_level=logging.DEBUG ) if result.returncode == 0: return result.stdout # Success elif result.returncode == 128 and ( "exists on disk, but not in" in result.stderr or "does not exist in" in result.stderr or "fatal: Path" in result.stderr and "does not exist" in result.stderr or "did not match any file(s)" in result.stderr ): log_handler.log_debug( f"File '{git_path}' not found in ref '{ref}'.", func_name=func_name ) return None else: log_handler.log_error( f"git show failed for '{ref_path_arg}' (RC {result.returncode}). Stderr: {result.stderr.strip()}", func_name=func_name, ) return None except GitCommandError as e: log_handler.log_error( f"Error executing git show for '{ref_path_arg}': {e}", func_name=func_name, ) return None def add_file(self, working_directory: str, file_path: str): func_name = "add_file" if not file_path or file_path.isspace(): raise ValueError("File path empty.") log_handler.log_info( f"Adding path to staging: '{file_path}' in '{working_directory}'", func_name=func_name, ) cmd = ["git", "add", "--", file_path] try: self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) log_handler.log_info( f"Path '{file_path}' added successfully.", func_name=func_name ) return True except GitCommandError as add_e: log_handler.log_error( f"Failed to add path '{file_path}': {add_e}", func_name=func_name ) stderr = add_e.stderr.lower() if add_e.stderr else "" if "did not match any files" in stderr: raise GitCommandError( f"Pathspec '{file_path}' did not match any files.", command=cmd, stderr=add_e.stderr, ) from add_e else: raise add_e def get_remotes(self, working_directory: str) -> dict[str, str]: """Gets a dictionary of remote names and their fetch URLs.""" # (Implementazione precedente invariata) func_name = "get_remotes" log_handler.log_debug( f"Getting remotes for '{working_directory}'", func_name=func_name ) cmd = ["git", "remote", "-v"] remotes = {} try: # Usa le opzioni di default (capture=True, hide_console=True) result = self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) lines = result.stdout.strip().splitlines() for line in lines: parts = line.split() if len(parts) == 3 and parts[2] == "(fetch)": name = parts[0] url = parts[1] remotes[name] = url log_handler.log_info( f"Found {len(remotes)} remotes: {list(remotes.keys())}", func_name=func_name, ) return remotes except GitCommandError as e: # Gestione caso nessun remote trovato (non è un errore) # Se check=True fallisce, è un errore; altrimenti controlla output is_error = ( e.stderr and "fatal:" in e.stderr.lower() ) # Un vero errore git di solito è fatal if not is_error and not remotes: log_handler.log_info("No remotes found.", func_name=func_name) return {} else: log_handler.log_error( f"Failed to get remotes: {e}", func_name=func_name ) raise # Rilancia veri errori except Exception as e: log_handler.log_exception( f"Unexpected error getting remotes: {e}", func_name=func_name ) raise GitCommandError(f"Unexpected error getting remotes: {e}") from e def add_remote( self, working_directory: str, remote_name: str, remote_url: str ) -> bool: """Adds a new remote repository reference.""" # (Implementazione precedente invariata) func_name = "add_remote" log_handler.log_info( f"Adding remote '{remote_name}' -> '{remote_url}'", func_name=func_name ) cmd = ["git", "remote", "add", remote_name, remote_url] try: # Usa le opzioni di default (capture=True, hide_console=True, check=True) self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info( f"Remote '{remote_name}' added successfully.", func_name=func_name ) return True except GitCommandError as e: # Gestisce errore specifico "already exists" stderr_low = e.stderr.lower() if e.stderr else "" if "already exists" in stderr_low: # Rilancia per farlo gestire al chiamante (RemoteActionHandler) raise GitCommandError( f"Remote '{remote_name}' already exists.", command=cmd, stderr=e.stderr, ) from e else: log_handler.log_error( f"Failed to add remote '{remote_name}': {e}", func_name=func_name ) raise # Rilancia altri errori Git except Exception as e: log_handler.log_exception( f"Unexpected error adding remote '{remote_name}': {e}", func_name=func_name, ) raise GitCommandError( f"Unexpected error adding remote: {e}", command=cmd ) from e def set_remote_url( self, working_directory: str, remote_name: str, remote_url: str ) -> bool: """Changes the URL of an existing remote.""" # (Implementazione precedente invariata) func_name = "set_remote_url" log_handler.log_info( f"Setting URL for remote '{remote_name}' to '{remote_url}'", func_name=func_name, ) cmd = ["git", "remote", "set-url", remote_name, remote_url] try: # Usa le opzioni di default (capture=True, hide_console=True, check=True) self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info( f"URL for remote '{remote_name}' set successfully.", func_name=func_name ) return True except GitCommandError as e: # Gestisce errore specifico "no such remote" stderr_low = e.stderr.lower() if e.stderr else "" if "no such remote" in stderr_low: raise GitCommandError( f"Remote '{remote_name}' does not exist, cannot set URL.", command=cmd, stderr=e.stderr, ) from e else: log_handler.log_error( f"Failed to set URL for remote '{remote_name}': {e}", func_name=func_name, ) raise # Rilancia altri errori Git except Exception as e: log_handler.log_exception( f"Unexpected error setting remote URL for '{remote_name}': {e}", func_name=func_name, ) raise GitCommandError( f"Unexpected error setting remote URL: {e}", command=cmd ) from e # --- Placeholder for future remote commands --- def git_fetch( self, working_directory: str, remote_name: str, prune: bool = True ) -> subprocess.CompletedProcess: """ Executes 'git fetch ' possibly with --prune. Captures output and hides console by default. Does NOT raise exception on non-zero exit code by default (check=False), allowing the caller to analyze the result. Args: working_directory (str): Path to the repository. remote_name (str): The name of the remote to fetch from. prune (bool): If True, add '--prune' to remove stale remote-tracking branches. Returns: subprocess.CompletedProcess: The result of the command execution. """ func_name = "git_fetch" log_handler.log_info( f"Fetching from remote '{remote_name}' in '{working_directory}' (Prune={prune})", func_name=func_name, ) cmd = ["git", "fetch", remote_name] if prune: cmd.append( "--prune" ) # Aggiunge opzione per pulire branch remoti non più esistenti # Esegui catturando output, nascondendo console, ma NON sollevare eccezione su errore (check=False) # Il worker analizzerà il codice di ritorno e stderr per capire l'esito. result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, # Importante: non sollevare eccezioni qui capture=True, hide_console=True, log_output_level=logging.INFO, # Logga output di fetch (es. aggiornamenti branch) a INFO ) return result def git_pull( self, working_directory: str, remote_name: str, branch_name: str ) -> subprocess.CompletedProcess: """ Executes 'git pull '. This performs a fetch and then merges the fetched branch into the current local branch. Captures output and hides console by default. Does NOT raise exception on non-zero exit code by default (check=False), allowing the caller to analyze the result for success, conflicts, or errors. Args: working_directory (str): Path to the repository. remote_name (str): The name of the remote to pull from. branch_name (str): The name of the local branch currently checked out, which corresponds to the remote branch to merge. Returns: subprocess.CompletedProcess: The result of the command execution. """ func_name = "git_pull" # Nota: Git pull implicitamente opera sul branch corrente se non specificato diversamente, # ma specificare remote e branch rende il comando più esplicito e meno dipendente # dalla configurazione upstream (anche se idealmente quella è impostata). # Il branch_name qui è più per riferimento nel log e potenziali future opzioni. # Il comando di base `git pull ` di solito basta se l'upstream è settato. # Per ora, manteniamo il comando semplice `git pull `. # Se l'upstream non è settato, il comando fallirà e l'utente dovrà impostarlo # (potremmo aggiungere una feature per questo in futuro). log_handler.log_info( f"Pulling from remote '{remote_name}' into current branch ('{branch_name}') " f"in '{working_directory}'", func_name=func_name, ) cmd = ["git", "pull", remote_name] # Esegui catturando output, nascondendo console, ma NON sollevare eccezione (check=False) # Il worker analizzerà il risultato per conflitti o altri errori. result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, # Fondamentale per rilevare conflitti (RC=1) capture=True, hide_console=True, log_output_level=logging.INFO, # Logga output (aggiornamenti, merge) a INFO ) return result def git_push( self, working_directory: str, remote_name: str, branch_name: str, set_upstream: bool = False, force: bool = False, # Aggiunto parametro opzionale per force push ) -> subprocess.CompletedProcess: """ Executes 'git push [] '. Handles setting upstream and optional force push. Does NOT raise exception on non-zero exit code (check=False). Args: working_directory (str): Path to the repository. remote_name (str): The name of the remote to push to. branch_name (str): The name of the local branch to push. set_upstream (bool): If True, add '-u' or '--set-upstream' flag. force (bool): If True, add '--force' flag (use with extreme caution!). Returns: subprocess.CompletedProcess: The result of the command execution. """ func_name = "git_push" push_options = [] if set_upstream: push_options.append("--set-upstream") log_handler.log_info( f"Pushing branch '{branch_name}' to '{remote_name}' and setting upstream.", func_name=func_name, ) else: log_handler.log_info( f"Pushing branch '{branch_name}' to '{remote_name}'.", func_name=func_name, ) if force: push_options.append("--force") log_handler.log_warning( f"Executing FORCE PUSH for branch '{branch_name}' to '{remote_name}'!", func_name=func_name, ) # Comando base: git push [options] [:] # Specificare il refspec completo (:) è più robusto, # specialmente se i nomi non coincidono. Per ora, assumiamo che coincidano. cmd = ["git", "push"] + push_options + [remote_name, branch_name] # Esegui catturando output, nascondendo console, ma NON sollevare eccezione (check=False) # Il worker analizzerà il risultato per errori specifici (rifiutato, auth, etc.) result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, # Importante per rilevare push rifiutati (RC=1) capture=True, hide_console=True, log_output_level=logging.INFO, # Logga output (es. riepilogo push) a INFO ) return result def git_push_tags( self, working_directory: str, remote_name: str ) -> subprocess.CompletedProcess: """ Executes 'git push --tags'. Does NOT raise exception on non-zero exit code (check=False). Args: working_directory (str): Path to the repository. remote_name (str): The name of the remote to push tags to. Returns: subprocess.CompletedProcess: The result of the command execution. """ func_name = "git_push_tags" log_handler.log_info( f"Pushing all tags to remote '{remote_name}' from '{working_directory}'", func_name=func_name, ) cmd = ["git", "push", remote_name, "--tags"] # Esegui catturando output, nascondendo console, ma NON sollevare eccezione (check=False) result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.INFO, ) return result def get_current_branch_name(self, working_directory: str) -> str | None: """ Gets the name of the currently checked-out branch. Returns None if in detached HEAD state or on error. """ func_name = "get_current_branch_name" log_handler.log_debug( f"Getting current branch name in '{working_directory}'", func_name=func_name ) # Usa 'git branch --show-current' (disponibile da Git 2.22+) # In alternativa 'git rev-parse --abbrev-ref HEAD', che funziona anche prima # ma può restituire 'HEAD' in detached state. 'symbolic-ref' è più robusto. cmd = [ "git", "symbolic-ref", "--short", "-q", "HEAD", ] # -q sopprime errori se non è un branch (detached) try: # check=False perché può fallire legittimamente in detached HEAD (RC=1) result = self.log_and_execute( cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) if result.returncode == 0 and result.stdout: branch_name = result.stdout.strip() log_handler.log_info( f"Current branch is '{branch_name}'", func_name=func_name ) return branch_name elif ( result.returncode == 1 ): # Codice atteso per detached HEAD con symbolic-ref -q log_handler.log_warning( "Currently in detached HEAD state.", func_name=func_name ) return None else: # Altro errore log_handler.log_error( f"Failed to get current branch (RC={result.returncode}). Stderr: {result.stderr.strip() if result.stderr else 'N/A'}", func_name=func_name, ) return None except Exception as e: log_handler.log_exception( f"Unexpected error getting current branch: {e}", func_name=func_name ) return None # Ritorna None anche per eccezioni impreviste def get_branch_upstream( self, working_directory: str, branch_name: str ) -> str | None: """ Gets the upstream remote branch configured for a local branch. Args: working_directory (str): Path to the repository. branch_name (str): The name of the local branch. Returns: str | None: The full name of the upstream branch (e.g., 'origin/main') or None if no upstream is configured or on error. """ func_name = "get_branch_upstream" log_handler.log_debug( f"Getting upstream for branch '{branch_name}' in '{working_directory}'", func_name=func_name, ) # Usa 'git rev-parse --abbrev-ref @{upstream}' # Questo comando restituisce il nome breve dell'upstream se esiste, altrimenti fallisce. # L'uso di @{upstream} è un modo standard per riferirsi all'upstream configurato. cmd = ["git", "rev-parse", "--abbrev-ref", f"{branch_name}@{{upstream}}"] try: # Esegui catturando output, nascondendo console. check=False perché fallisce se non c'è upstream. result = self.log_and_execute( cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) if result.returncode == 0 and result.stdout: upstream_name = result.stdout.strip() # Il comando sopra restituisce solo il nome del branch remoto (es. 'main'), # ma noi vogliamo il nome completo 'remote/branch'. Dobbiamo costruirlo. # Possiamo usare `git config branch..remote` e `branch..merge`. remote_cmd = ["git", "config", "--get", f"branch.{branch_name}.remote"] merge_ref_cmd = [ "git", "config", "--get", f"branch.{branch_name}.merge", ] remote_result = self.log_and_execute( remote_cmd, working_directory, check=False, capture=True, hide_console=True, ) merge_ref_result = self.log_and_execute( merge_ref_cmd, working_directory, check=False, capture=True, hide_console=True, ) if remote_result.returncode == 0 and merge_ref_result.returncode == 0: remote = remote_result.stdout.strip() merge_ref = merge_ref_result.stdout.strip() # Es. 'refs/heads/main' # Estrai il nome semplice del branch dal ref completo remote_branch_name = merge_ref.split("/")[-1] full_upstream_name = f"{remote}/{remote_branch_name}" log_handler.log_info( f"Upstream for '{branch_name}' is '{full_upstream_name}'", func_name=func_name, ) return full_upstream_name else: # Se non riusciamo a ottenere remote/merge, c'è un problema di configurazione log_handler.log_warning( f"Could not determine full upstream name for '{branch_name}' despite rev-parse success.", func_name=func_name, ) return None # O forse l'upstream_name da rev-parse? Meglio essere sicuri. elif "no upstream configured" in (result.stderr or "").lower(): log_handler.log_info( f"No upstream configured for branch '{branch_name}'.", func_name=func_name, ) return None else: # Altro errore da rev-parse log_handler.log_error( f"Failed to get upstream for '{branch_name}' (RC={result.returncode}). Stderr: {result.stderr.strip() if result.stderr else 'N/A'}", func_name=func_name, ) return None except Exception as e: log_handler.log_exception( f"Unexpected error getting upstream for '{branch_name}': {e}", func_name=func_name, ) return None def get_ahead_behind_count(self, working_directory: str, local_branch: str, upstream_branch: str) -> Tuple[int | None, int | None]: """ Gets the number of commits the local branch is ahead and behind its upstream counterpart using two separate 'git rev-list --count' commands. Args: working_directory (str): Path to the repository. local_branch (str): The name of the local branch (implicitly HEAD when checked out). upstream_branch (str): The full name of the upstream branch (e.g., 'origin/main'). Returns: Tuple[int | None, int | None]: A tuple containing (ahead_count, behind_count). Returns (None, None) if any command fails. """ func_name = "get_ahead_behind_count" log_handler.log_debug(f"Getting ahead/behind count for '{local_branch}' vs '{upstream_branch}' using separate counts.", func_name=func_name) ahead_count = None behind_count = None try: # --- Get Ahead Count --- # Conta i commit in locale non presenti nell'upstream ahead_cmd = ["git", "rev-list", "--count", f"{upstream_branch}..{local_branch}"] log_handler.log_debug(f"Executing ahead count command: {' '.join(ahead_cmd)}", func_name=func_name) ahead_result = self.log_and_execute( ahead_cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG ) if ahead_result.returncode == 0 and ahead_result.stdout: try: ahead_count = int(ahead_result.stdout.strip()) log_handler.log_debug(f"Ahead count: {ahead_count}", func_name=func_name) except ValueError: log_handler.log_error(f"Failed to parse ahead count output: '{ahead_result.stdout.strip()}'", func_name=func_name) return None, None # Errore parsing else: log_handler.log_warning(f"Ahead count command failed (RC={ahead_result.returncode}). Stderr: {ahead_result.stderr.strip() if ahead_result.stderr else 'N/A'}", func_name=func_name) return None, None # Comando fallito # --- Get Behind Count --- # Conta i commit nell'upstream non presenti in locale behind_cmd = ["git", "rev-list", "--count", f"{local_branch}..{upstream_branch}"] log_handler.log_debug(f"Executing behind count command: {' '.join(behind_cmd)}", func_name=func_name) behind_result = self.log_and_execute( behind_cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG ) if behind_result.returncode == 0 and behind_result.stdout: try: behind_count = int(behind_result.stdout.strip()) log_handler.log_debug(f"Behind count: {behind_count}", func_name=func_name) except ValueError: log_handler.log_error(f"Failed to parse behind count output: '{behind_result.stdout.strip()}'", func_name=func_name) return None, None # Errore parsing else: log_handler.log_warning(f"Behind count command failed (RC={behind_result.returncode}). Stderr: {behind_result.stderr.strip() if behind_result.stderr else 'N/A'}", func_name=func_name) return None, None # Comando fallito # --- Return Result --- log_handler.log_info(f"Ahead/Behind for '{local_branch}': Ahead={ahead_count}, Behind={behind_count}", func_name=func_name) # Restituisce la tupla nell'ordine (ahead, behind) return ahead_count, behind_count except Exception as e: log_handler.log_exception(f"Unexpected error getting ahead/behind count: {e}", func_name=func_name) return None, None # Segnala fallimento generico # Segnala fallimento # Segnala fallimento def git_clone(self, remote_url: str, local_directory_path: str) -> subprocess.CompletedProcess: """ Executes 'git clone --progress '. Captures output (including progress) and hides console by default. Does NOT raise exception on non-zero exit code (check=False). Args: remote_url (str): The URL of the remote repository to clone. local_directory_path (str): The full path to the new local directory where the repository will be cloned. Returns: subprocess.CompletedProcess: The result of the command execution. """ func_name = "git_clone" log_handler.log_info(f"Cloning repository from '{remote_url}' into '{local_directory_path}'", func_name=func_name) # Comando: git clone --progress # --progress forza l'output dello stato anche se stderr non è un terminale, # utile per il logging. cmd = ["git", "clone", "--progress", remote_url, local_directory_path] # Esegui catturando output, nascondendo console, ma NON sollevare eccezione (check=False) # Il worker analizzerà il risultato per errori specifici (auth, path, etc.) # Usiamo un timeout più lungo per clone, che può richiedere tempo clone_timeout = 300 # 5 minuti, da aggiustare se necessario # Nota: Eseguiamo il clone nella directory *corrente* del processo principale # perché la directory di destinazione viene creata dal comando stesso. # Non passiamo un working_directory specifico a log_and_execute. result = self.log_and_execute( command=cmd, working_directory=".", # Esegui da CWD, Git crea la dir specificata check=False, # Fondamentale per gestire errori specifici capture=True, hide_console=True, log_output_level=logging.INFO # Logga output (progresso, errori) a INFO # Timeout aumentato viene gestito internamente da log_and_execute se lo modifichiamo lì, # altrimenti possiamo passarlo come argomento extra se log_and_execute lo accetta. # Per ora, assumiamo che il timeout di log_and_execute sia sufficiente o lo aumentiamo lì. ) return result # --- END OF FILE git_commands.py ---