1645 lines
71 KiB
Python
1645 lines
71 KiB
Python
# --- FILE: gitsync_tool/core/action_handler.py ---
|
|
|
|
import os
|
|
import datetime
|
|
from typing import Tuple, Dict, List, Optional, Set, Any # Aggiunto Set
|
|
|
|
# ---<<< MODIFICA IMPORT >>>---
|
|
# Importa usando il percorso assoluto dal pacchetto gitsync_tool
|
|
from gitutility.logging_setup import log_handler
|
|
|
|
# Usa import relativi per salire di livello e raggiungere altri moduli
|
|
from ..commands.git_commands import GitCommands, GitCommandError
|
|
|
|
# Usa import relativo '.' per moduli nella stessa directory (core)
|
|
from .backup_handler import BackupHandler
|
|
|
|
# ---<<< FINE MODIFICA IMPORT >>>---
|
|
|
|
|
|
class ActionHandler:
|
|
"""
|
|
Handles the execution logic for core application actions that primarily
|
|
operate on the local repository or involve file system operations like
|
|
bundling and backups. Orchestrates calls to GitCommands and BackupHandler.
|
|
Methods are intended to be called from asynchronous workers.
|
|
Uses log_handler for centralized logging.
|
|
"""
|
|
|
|
# ---<<< MODIFICA: Type Hint per costruttore >>>---
|
|
# Usa i nomi delle classi importate
|
|
def __init__(self, git_commands: GitCommands, backup_handler: BackupHandler):
|
|
# ---<<< FINE MODIFICA >>>---
|
|
"""
|
|
Initializes the ActionHandler.
|
|
|
|
Args:
|
|
git_commands (GitCommands): Instance for executing Git commands.
|
|
backup_handler (BackupHandler): Instance for handling backups.
|
|
|
|
Raises:
|
|
TypeError: If provided arguments are not of the expected types.
|
|
"""
|
|
# Input type validation
|
|
if not isinstance(git_commands, GitCommands):
|
|
# Usa il nome corretto della classe nell'errore
|
|
raise TypeError("ActionHandler requires a GitCommands instance.")
|
|
if not isinstance(backup_handler, BackupHandler):
|
|
raise TypeError("ActionHandler requires a BackupHandler instance.")
|
|
|
|
self.git_commands: GitCommands = git_commands
|
|
self.backup_handler: BackupHandler = backup_handler
|
|
log_handler.log_debug("ActionHandler initialized.", func_name="__init__")
|
|
|
|
def _perform_backup_if_enabled(
|
|
self,
|
|
repo_path: str,
|
|
profile_name: str,
|
|
autobackup_enabled: bool,
|
|
backup_base_dir: str,
|
|
excluded_extensions: Set[str], # Usa Set importato
|
|
excluded_dirs: Set[str], # Usa Set importato
|
|
) -> Optional[str]: # Ritorna str o None
|
|
"""
|
|
Performs backup using BackupHandler if autobackup is enabled.
|
|
|
|
Uses the centralized log_handler for logging messages.
|
|
|
|
Args:
|
|
repo_path (str): Path to the repository to back up.
|
|
profile_name (str): Name of the profile (used for backup naming).
|
|
autobackup_enabled (bool): Flag indicating if autobackup is enabled.
|
|
backup_base_dir (str): Base destination directory for backups.
|
|
excluded_extensions (Set[str]): Set of lowercase file extensions to exclude.
|
|
excluded_dirs (Set[str]): Set of lowercase directory base names to exclude.
|
|
|
|
Returns:
|
|
Optional[str]: Path to the created backup ZIP file if successful and enabled,
|
|
otherwise None.
|
|
|
|
Raises:
|
|
IOError: If the backup process fails critically (propagated from BackupHandler).
|
|
ValueError: If input arguments are invalid.
|
|
"""
|
|
func_name = "_perform_backup_if_enabled" # Function name for logs
|
|
|
|
if not autobackup_enabled:
|
|
log_handler.log_debug(
|
|
f"Autobackup is disabled for profile '{profile_name}', skipping backup.",
|
|
func_name=func_name,
|
|
)
|
|
return None # Return None if autobackup is disabled
|
|
|
|
log_handler.log_info(
|
|
f"Autobackup enabled for '{profile_name}'. Starting backup process...",
|
|
func_name=func_name,
|
|
)
|
|
log_handler.log_debug(f" Source Repository: {repo_path}", func_name=func_name)
|
|
log_handler.log_debug(
|
|
f" Backup Destination Base: {backup_base_dir}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(
|
|
f" Excluded Extensions: {excluded_extensions}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(
|
|
f" Excluded Directory Names: {excluded_dirs}", func_name=func_name
|
|
)
|
|
|
|
# Input Validation
|
|
if not repo_path or not os.path.isdir(repo_path):
|
|
raise ValueError(
|
|
f"Invalid repository path provided for backup: '{repo_path}'"
|
|
)
|
|
if not backup_base_dir:
|
|
raise ValueError("Backup base directory cannot be empty.")
|
|
# Check types using Set from typing
|
|
if not isinstance(excluded_extensions, set):
|
|
raise TypeError("excluded_extensions parameter must be a set.")
|
|
if not isinstance(excluded_dirs, set):
|
|
raise TypeError("excluded_dirs parameter must be a set.")
|
|
|
|
# Delegate the backup creation task to BackupHandler
|
|
try:
|
|
backup_path: Optional[str] = self.backup_handler.create_zip_backup(
|
|
source_repo_path=repo_path,
|
|
backup_base_dir=backup_base_dir,
|
|
profile_name=profile_name,
|
|
excluded_extensions=excluded_extensions,
|
|
excluded_dirs_base=excluded_dirs, # Passa il nome corretto
|
|
)
|
|
|
|
# Log result based on backup_path
|
|
if backup_path:
|
|
log_handler.log_info(
|
|
f"Backup completed successfully. Archive created at: {backup_path}",
|
|
func_name=func_name,
|
|
)
|
|
else:
|
|
# This case occurs if create_zip_backup ran but returned None
|
|
# (e.g., source was empty, all files excluded, or empty zip removed)
|
|
log_handler.log_warning(
|
|
"Backup process finished, but no backup file was generated (source empty or fully excluded?).",
|
|
func_name=func_name,
|
|
)
|
|
return backup_path # Return path or None
|
|
|
|
except (IOError, ValueError, PermissionError) as backup_error:
|
|
# Log specific errors from BackupHandler
|
|
log_handler.log_error(
|
|
f"Backup failed for profile '{profile_name}': {backup_error}",
|
|
func_name=func_name,
|
|
)
|
|
# Re-raise as IOError for consistency or let original exception propagate
|
|
raise IOError(
|
|
f"Autobackup failed for profile '{profile_name}'. Reason: {backup_error}"
|
|
) from backup_error
|
|
except Exception as unexpected_error:
|
|
# Log unexpected errors during backup
|
|
log_handler.log_exception(
|
|
f"Unexpected error during backup for profile '{profile_name}': {unexpected_error}",
|
|
func_name=func_name,
|
|
)
|
|
# Re-raise wrapped in a standard error type if desired
|
|
raise IOError(
|
|
f"Unexpected autobackup failure for '{profile_name}'. Reason: {unexpected_error}"
|
|
) from unexpected_error
|
|
|
|
def execute_prepare_repo(self, repo_path: str) -> bool:
|
|
"""
|
|
Executes the 'prepare repository' action: initializes Git if needed
|
|
and ensures .svn is ignored in .gitignore. Uses log_handler.
|
|
|
|
Args:
|
|
repo_path (str): The path to the working directory.
|
|
|
|
Returns:
|
|
bool: True if preparation was successful or already prepared correctly.
|
|
|
|
Raises:
|
|
ValueError: If the repo_path is invalid or already prepared incorrectly.
|
|
GitCommandError: If git init fails.
|
|
IOError: If .gitignore manipulation fails.
|
|
"""
|
|
func_name: str = "execute_prepare_repo"
|
|
log_handler.log_info(
|
|
f"Executing repository preparation for directory: {repo_path}",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Validate input path
|
|
if not repo_path or not os.path.isdir(repo_path):
|
|
raise ValueError(
|
|
f"Invalid directory provided for preparation: '{repo_path}'"
|
|
)
|
|
|
|
git_dir: str = os.path.join(repo_path, ".git")
|
|
|
|
# Check if Git repository already exists
|
|
if os.path.exists(git_dir):
|
|
# If it exists, we just need to ensure .svn is ignored, but signal warning
|
|
log_handler.log_warning(
|
|
f"Repository at '{repo_path}' is already initialized. Checking .gitignore only.",
|
|
func_name=func_name,
|
|
)
|
|
# Proceed to .gitignore check, but don't raise ValueError here unless .gitignore fails
|
|
else:
|
|
# Repository does not exist, initialize it
|
|
try:
|
|
log_handler.log_info(
|
|
"No existing Git repo found. Initializing...", func_name=func_name
|
|
)
|
|
self.git_commands.prepare_svn_for_git(
|
|
repo_path
|
|
) # This should handle init + gitignore
|
|
log_handler.log_info(
|
|
f"Repository initialized and prepared successfully: {repo_path}",
|
|
func_name=func_name,
|
|
)
|
|
return True # Initialization successful
|
|
except (GitCommandError, ValueError, IOError) as e:
|
|
log_handler.log_error(
|
|
f"Failed to prepare (initialize) repo '{repo_path}': {e}",
|
|
func_name=func_name,
|
|
)
|
|
raise e # Re-raise the original exception
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error preparing repo '{repo_path}': {e}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception(f"Unexpected preparation error in '{repo_path}'") from e
|
|
|
|
# If repo already existed, we still need to check/update .gitignore via prepare_svn_for_git
|
|
try:
|
|
# Call prepare_svn_for_git again, it should handle the ignore part even if init is skipped
|
|
self.git_commands.prepare_svn_for_git(repo_path)
|
|
log_handler.log_info(
|
|
f"Ensured .gitignore is configured for existing repo: {repo_path}",
|
|
func_name=func_name,
|
|
)
|
|
return True
|
|
except (
|
|
GitCommandError,
|
|
ValueError,
|
|
IOError,
|
|
) as e: # Catch errors during .gitignore update
|
|
log_handler.log_error(
|
|
f"Failed to update .gitignore for existing repo '{repo_path}': {e}",
|
|
func_name=func_name,
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error updating .gitignore for existing repo '{repo_path}': {e}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception(
|
|
f"Unexpected .gitignore update error in '{repo_path}'"
|
|
) from e
|
|
|
|
def execute_create_bundle(
|
|
self,
|
|
repo_path: str,
|
|
bundle_full_path: str,
|
|
profile_name: str,
|
|
autobackup_enabled: bool,
|
|
backup_base_dir: str,
|
|
autocommit_enabled: bool,
|
|
commit_message: str,
|
|
excluded_extensions: Set[str], # Use Set
|
|
excluded_dirs: Set[str], # Use Set
|
|
) -> Optional[str]: # Return path or None
|
|
"""
|
|
Executes the 'create bundle' action: performs backup (if enabled),
|
|
autocommit (if enabled and changes exist), and creates the Git bundle file.
|
|
|
|
Args:
|
|
repo_path (str): Path to the local repository.
|
|
bundle_full_path (str): Full path where the bundle file should be created.
|
|
profile_name (str): Name of the profile for logging/backup naming.
|
|
autobackup_enabled (bool): Whether to perform backup before bundling.
|
|
backup_base_dir (str): Directory for backups.
|
|
autocommit_enabled (bool): Whether to commit changes before bundling.
|
|
commit_message (str): Message to use for autocommit.
|
|
excluded_extensions (Set[str]): Extensions to exclude from backup.
|
|
excluded_dirs (Set[str]): Directory names to exclude from backup.
|
|
|
|
Returns:
|
|
Optional[str]: Full path to the created bundle file, or None if not created/empty.
|
|
|
|
Raises:
|
|
IOError: If backup fails critically.
|
|
GitCommandError: If git status, commit, or bundle commands fail.
|
|
ValueError: For invalid inputs.
|
|
Exception: For unexpected errors.
|
|
"""
|
|
func_name: str = "execute_create_bundle"
|
|
log_handler.log_info(
|
|
f"Executing 'Create Bundle' for profile '{profile_name}'...",
|
|
func_name=func_name,
|
|
)
|
|
# Log input parameters for debugging
|
|
log_handler.log_debug(f" Repository Path: {repo_path}", func_name=func_name)
|
|
log_handler.log_debug(
|
|
f" Target Bundle Path: {bundle_full_path}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(
|
|
f" Autobackup Enabled: {autobackup_enabled}, Backup Dir: {backup_base_dir}",
|
|
func_name=func_name,
|
|
)
|
|
log_handler.log_debug(
|
|
f" Autocommit Enabled: {autocommit_enabled}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(
|
|
f" Backup Excluded Extensions: {excluded_extensions}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(
|
|
f" Backup Excluded Dirs: {excluded_dirs}", func_name=func_name
|
|
)
|
|
|
|
# --- 1. Backup Step (if enabled) ---
|
|
# Calls _perform_backup_if_enabled which now uses log_handler
|
|
# It raises IOError on critical failure.
|
|
self._perform_backup_if_enabled(
|
|
repo_path=repo_path,
|
|
profile_name=profile_name,
|
|
autobackup_enabled=autobackup_enabled,
|
|
backup_base_dir=backup_base_dir,
|
|
excluded_extensions=excluded_extensions,
|
|
excluded_dirs=excluded_dirs,
|
|
)
|
|
log_handler.log_debug(
|
|
"Backup step completed (if enabled).", func_name=func_name
|
|
)
|
|
|
|
# --- 2. Autocommit Step (if enabled) ---
|
|
commit_made_in_this_step: bool = False
|
|
if autocommit_enabled:
|
|
log_handler.log_info(
|
|
"Autocommit is enabled. Checking for changes...", func_name=func_name
|
|
)
|
|
try:
|
|
has_changes: bool = self.git_commands.git_status_has_changes(repo_path)
|
|
if has_changes:
|
|
log_handler.log_info(
|
|
"Changes detected, performing autocommit...",
|
|
func_name=func_name,
|
|
)
|
|
# Use provided commit message or generate a default one
|
|
commit_msg_to_use: str = commit_message.strip()
|
|
if not commit_msg_to_use:
|
|
now_str: str = datetime.datetime.now().strftime(
|
|
"%Y-%m-%d %H:%M"
|
|
)
|
|
commit_msg_to_use = (
|
|
f"Autocommit '{profile_name}' before bundle - {now_str}"
|
|
)
|
|
log_handler.log_debug(
|
|
f"Using default autocommit message: '{commit_msg_to_use}'",
|
|
func_name=func_name,
|
|
)
|
|
else:
|
|
log_handler.log_debug(
|
|
f"Using provided autocommit message: '{commit_msg_to_use}'",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Execute the commit
|
|
commit_made_in_this_step = self.git_commands.git_commit(
|
|
repo_path, commit_msg_to_use
|
|
)
|
|
|
|
if commit_made_in_this_step:
|
|
log_handler.log_info(
|
|
"Autocommit successful.", func_name=func_name
|
|
)
|
|
else:
|
|
# git_commit returns False if there was nothing to commit after staging
|
|
log_handler.log_warning(
|
|
"Autocommit attempted, but no effective changes were committed (changes might be unstaged or only whitespace?).",
|
|
func_name=func_name,
|
|
)
|
|
else:
|
|
log_handler.log_info(
|
|
"No changes detected, skipping autocommit.", func_name=func_name
|
|
)
|
|
|
|
except (GitCommandError, ValueError) as commit_error:
|
|
# Log specific autocommit errors
|
|
log_handler.log_error(
|
|
f"Autocommit failed for profile '{profile_name}': {commit_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise commit_error # Re-raise to stop the process
|
|
except Exception as unexpected_commit_error:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during autocommit for profile '{profile_name}': {unexpected_commit_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception(
|
|
"Unexpected autocommit error"
|
|
) from unexpected_commit_error
|
|
else:
|
|
log_handler.log_debug(
|
|
"Autocommit is disabled, skipping commit step.", func_name=func_name
|
|
)
|
|
|
|
# --- 3. Create Bundle Step ---
|
|
log_handler.log_info(
|
|
f"Proceeding to create bundle file: {bundle_full_path}", func_name=func_name
|
|
)
|
|
try:
|
|
# Execute the bundle creation command
|
|
self.git_commands.create_git_bundle(repo_path, bundle_full_path)
|
|
|
|
# Verify bundle creation and size
|
|
bundle_exists: bool = os.path.exists(bundle_full_path)
|
|
bundle_not_empty: bool = (
|
|
bundle_exists and os.path.getsize(bundle_full_path) > 0
|
|
)
|
|
|
|
if bundle_exists and bundle_not_empty:
|
|
log_handler.log_info(
|
|
"Git bundle created successfully.", func_name=func_name
|
|
)
|
|
return bundle_full_path # Success, return path
|
|
else:
|
|
# Handle cases where bundle wasn't created or is empty
|
|
log_handler.log_warning(
|
|
f"Bundle file '{os.path.basename(bundle_full_path)}' was not created or is empty.",
|
|
func_name=func_name,
|
|
)
|
|
# Try to remove empty bundle file if it exists
|
|
if bundle_exists and not bundle_not_empty:
|
|
try:
|
|
os.remove(bundle_full_path)
|
|
log_handler.log_info(
|
|
f"Removed empty bundle file: {bundle_full_path}",
|
|
func_name=func_name,
|
|
)
|
|
except OSError as remove_error:
|
|
log_handler.log_warning(
|
|
f"Could not remove empty bundle file: {remove_error}",
|
|
func_name=func_name,
|
|
)
|
|
return None # Indicate bundle was not created successfully
|
|
except (GitCommandError, ValueError) as bundle_error:
|
|
log_handler.log_error(
|
|
f"Bundle creation failed for profile '{profile_name}': {bundle_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise bundle_error # Re-raise
|
|
except Exception as unexpected_bundle_error:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during bundle creation for profile '{profile_name}': {unexpected_bundle_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception(
|
|
"Unexpected bundle creation error"
|
|
) from unexpected_bundle_error
|
|
|
|
def execute_fetch_bundle(
|
|
self,
|
|
target_repo_path_str: str,
|
|
bundle_full_path: str,
|
|
profile_name: str,
|
|
autobackup_enabled: bool,
|
|
backup_base_dir: str,
|
|
excluded_extensions: Set[str], # Use Set
|
|
excluded_dirs: Set[str], # Use Set
|
|
) -> bool: # Returns True on success, raises error otherwise
|
|
"""
|
|
Executes the 'fetch from bundle' action. Clones if target is not a repo,
|
|
otherwise fetches and merges into the existing repo.
|
|
|
|
Args:
|
|
target_repo_path_str (str): Path to the target directory (may not exist).
|
|
bundle_full_path (str): Full path to the source bundle file.
|
|
profile_name (str): Profile name for logging/backup.
|
|
autobackup_enabled (bool): Whether to backup before fetching into existing repo.
|
|
backup_base_dir (str): Directory for backups.
|
|
excluded_extensions (Set[str]): Extensions to exclude from backup.
|
|
excluded_dirs (Set[str]): Dirs to exclude from backup.
|
|
|
|
Returns:
|
|
bool: True if the fetch/clone and merge (if applicable) succeeded.
|
|
|
|
Raises:
|
|
FileNotFoundError: If the bundle file does not exist.
|
|
IOError: If backup fails critically.
|
|
GitCommandError: If git clone, fetch, or merge commands fail.
|
|
ValueError: For invalid inputs.
|
|
Exception: For unexpected errors.
|
|
"""
|
|
func_name: str = "execute_fetch_bundle"
|
|
log_handler.log_info(
|
|
f"Executing 'Fetch from Bundle' for profile '{profile_name}'...",
|
|
func_name=func_name,
|
|
)
|
|
# Log parameters
|
|
log_handler.log_debug(
|
|
f" Target Directory: {target_repo_path_str}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(
|
|
f" Source Bundle File: {bundle_full_path}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(
|
|
f" Autobackup Enabled: {autobackup_enabled}", func_name=func_name
|
|
)
|
|
# Add logs for other params if needed
|
|
|
|
# --- Validate Bundle File ---
|
|
if not os.path.isfile(bundle_full_path):
|
|
error_msg: str = f"Bundle file not found at path: '{bundle_full_path}'"
|
|
log_handler.log_error(error_msg, func_name=func_name)
|
|
# Raise specific error for clear handling
|
|
raise FileNotFoundError(error_msg)
|
|
|
|
# --- Determine if Target is Existing Repo ---
|
|
# Note: target_repo_path_str might not exist yet if we need to clone
|
|
git_dir_path: str = os.path.join(target_repo_path_str, ".git")
|
|
is_existing_repo: bool = os.path.isdir(target_repo_path_str) and os.path.exists(
|
|
git_dir_path
|
|
)
|
|
log_handler.log_debug(
|
|
f"Target path is an existing Git repository: {is_existing_repo}",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# --- Perform Action: Fetch/Merge or Clone ---
|
|
if is_existing_repo:
|
|
# --- Fetch/Merge into Existing Repo ---
|
|
log_handler.log_info(
|
|
"Target is an existing repository. Performing Fetch and Merge...",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# 1. Backup Step (if enabled)
|
|
self._perform_backup_if_enabled(
|
|
repo_path=target_repo_path_str, # Backup the target repo
|
|
profile_name=profile_name,
|
|
autobackup_enabled=autobackup_enabled,
|
|
backup_base_dir=backup_base_dir,
|
|
excluded_extensions=excluded_extensions,
|
|
excluded_dirs=excluded_dirs,
|
|
)
|
|
log_handler.log_debug(
|
|
"Backup step completed (if enabled).", func_name=func_name
|
|
)
|
|
|
|
# 2. Fetch/Merge Step
|
|
log_handler.log_info(
|
|
f"Fetching updates from bundle into '{target_repo_path_str}'...",
|
|
func_name=func_name,
|
|
)
|
|
try:
|
|
# Use the dedicated method in GitCommands
|
|
self.git_commands.fetch_from_git_bundle(
|
|
target_repo_path_str, bundle_full_path
|
|
)
|
|
# If fetch_from_git_bundle completes without error, log success
|
|
log_handler.log_info(
|
|
"Fetch and merge process completed successfully.",
|
|
func_name=func_name,
|
|
)
|
|
return True # Indicate overall success
|
|
except GitCommandError as fetch_merge_error:
|
|
# Log the specific error (already logged by GitCommands method, but good to confirm here)
|
|
log_handler.log_error(
|
|
f"Fetch/Merge from bundle failed: {fetch_merge_error}",
|
|
func_name=func_name,
|
|
)
|
|
# Re-raise the original error to be handled by the worker
|
|
raise fetch_merge_error
|
|
except Exception as unexpected_fetch_error:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during fetch/merge from bundle: {unexpected_fetch_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception(
|
|
"Unexpected fetch/merge error"
|
|
) from unexpected_fetch_error
|
|
|
|
else:
|
|
# --- Clone from Bundle ---
|
|
log_handler.log_info(
|
|
f"Target path '{target_repo_path_str}' is not an existing repository. Attempting to clone from bundle...",
|
|
func_name=func_name,
|
|
)
|
|
try:
|
|
# Use the dedicated method in GitCommands
|
|
success: bool = self.git_commands.clone_from_bundle(
|
|
bundle_full_path, target_repo_path_str
|
|
)
|
|
# clone_from_bundle should raise on error, so reaching here means success
|
|
if success:
|
|
log_handler.log_info(
|
|
f"Repository cloned successfully from bundle into '{target_repo_path_str}'.",
|
|
func_name=func_name,
|
|
)
|
|
return True # Indicate success
|
|
else:
|
|
# This case should ideally not be reached if clone_from_bundle raises exceptions
|
|
log_handler.log_error(
|
|
"Cloning reported failure unexpectedly.", func_name=func_name
|
|
)
|
|
raise Exception(
|
|
f"Cloning into '{target_repo_path_str}' failed unexpectedly."
|
|
)
|
|
except (
|
|
GitCommandError,
|
|
ValueError,
|
|
FileNotFoundError,
|
|
IOError,
|
|
) as clone_error:
|
|
# Log specific clone errors (already logged by GitCommands)
|
|
log_handler.log_error(
|
|
f"Cloning from bundle failed: {clone_error}", func_name=func_name
|
|
)
|
|
raise clone_error # Re-raise original error
|
|
except Exception as unexpected_clone_error:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during cloning from bundle: {unexpected_clone_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception("Unexpected clone error") from unexpected_clone_error
|
|
|
|
# --- Other Local Git Actions ---
|
|
|
|
def execute_manual_commit(self, repo_path: str, commit_message: str) -> bool:
|
|
"""
|
|
Executes a manual commit action: stages all changes and commits.
|
|
|
|
Args:
|
|
repo_path (str): Path to the repository.
|
|
commit_message (str): The commit message.
|
|
|
|
Returns:
|
|
bool: True if a commit was successfully made, False if no changes were committed.
|
|
|
|
Raises:
|
|
ValueError: If commit message is empty.
|
|
GitCommandError: If staging or commit fails.
|
|
"""
|
|
func_name: str = "execute_manual_commit"
|
|
log_handler.log_info(
|
|
f"Executing manual commit for repository: {repo_path}", func_name=func_name
|
|
)
|
|
# Validate commit message
|
|
if not commit_message or commit_message.isspace():
|
|
raise ValueError(
|
|
"Commit message cannot be empty or contain only whitespace."
|
|
)
|
|
|
|
try:
|
|
# Call the git_commit method which handles staging and committing
|
|
commit_made: bool = self.git_commands.git_commit(repo_path, commit_message)
|
|
# Log based on whether a commit actually happened
|
|
if commit_made:
|
|
log_handler.log_info("Manual commit successful.", func_name=func_name)
|
|
else:
|
|
log_handler.log_info(
|
|
"Manual commit: No changes available to commit.",
|
|
func_name=func_name,
|
|
)
|
|
return commit_made # Return the status
|
|
except (GitCommandError, ValueError) as e:
|
|
# Log and re-raise errors from git_commit
|
|
log_handler.log_error(f"Manual commit failed: {e}", func_name=func_name)
|
|
raise e
|
|
except Exception as e:
|
|
# Log and wrap unexpected errors
|
|
log_handler.log_exception(
|
|
f"Unexpected error during manual commit: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected commit error") from e
|
|
|
|
def execute_create_tag(
|
|
self, repo_path: str, ignored: None, tag_name: str, tag_message: str
|
|
) -> bool:
|
|
"""
|
|
Executes the create annotated tag action. Performs a pre-tag commit if changes exist.
|
|
(Parameter 'ignored' is unused, kept for signature compatibility if needed).
|
|
|
|
Args:
|
|
repo_path (str): Path to the repository.
|
|
ignored (None): Unused parameter.
|
|
tag_name (str): The name for the new tag.
|
|
tag_message (str): The annotation message for the tag (also used for pre-tag commit).
|
|
|
|
Returns:
|
|
bool: True if the tag was created successfully.
|
|
|
|
Raises:
|
|
ValueError: If tag name or message is invalid.
|
|
GitCommandError: If the pre-tag commit or tag creation fails.
|
|
"""
|
|
func_name: str = "execute_create_tag"
|
|
log_handler.log_info(
|
|
f"Executing create tag '{tag_name}' for repository: {repo_path}",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Validate tag name and message
|
|
if not tag_name or tag_name.isspace():
|
|
raise ValueError("Tag name cannot be empty.")
|
|
if not tag_message or tag_message.isspace():
|
|
raise ValueError(
|
|
"Tag message cannot be empty (required for annotated tag)."
|
|
)
|
|
# GitCommands.create_tag handles detailed name validation
|
|
|
|
# --- Attempt Pre-Tag Commit ---
|
|
# Commit any outstanding changes using the tag message before creating the tag
|
|
log_handler.log_info(
|
|
"Attempting pre-tag commit using the tag message...", func_name=func_name
|
|
)
|
|
try:
|
|
commit_made: bool = self.git_commands.git_commit(repo_path, tag_message)
|
|
if commit_made:
|
|
log_handler.log_info(
|
|
f"Pre-tag commit successful with message: '{tag_message}'",
|
|
func_name=func_name,
|
|
)
|
|
else:
|
|
log_handler.log_info(
|
|
"No changes detected for pre-tag commit.", func_name=func_name
|
|
)
|
|
except (GitCommandError, ValueError) as commit_error:
|
|
# If pre-tag commit fails, stop and raise the error
|
|
log_handler.log_error(
|
|
f"Failed during pre-tag commit for tag '{tag_name}': {commit_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise commit_error # Re-raise
|
|
|
|
# --- Create Annotated Tag ---
|
|
log_handler.log_info(
|
|
f"Creating annotated tag '{tag_name}'...", func_name=func_name
|
|
)
|
|
try:
|
|
# Call the Git command to create the tag
|
|
self.git_commands.create_tag(repo_path, tag_name, tag_message)
|
|
log_handler.log_info(
|
|
f"Tag '{tag_name}' created successfully.", func_name=func_name
|
|
)
|
|
return True # Tag creation successful
|
|
except (GitCommandError, ValueError) as tag_error:
|
|
# Handle errors during tag creation (e.g., tag already exists)
|
|
log_handler.log_error(
|
|
f"Failed to create tag '{tag_name}': {tag_error}", func_name=func_name
|
|
)
|
|
raise tag_error # Re-raise
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error creating tag '{tag_name}': {e}", func_name=func_name
|
|
)
|
|
raise Exception(f"Unexpected error creating tag '{tag_name}'") from e
|
|
|
|
def execute_checkout_tag(self, repo_path: str, tag_name: str) -> bool:
|
|
"""
|
|
Executes checkout of a specific tag, entering detached HEAD state.
|
|
Checks for uncommitted changes before proceeding.
|
|
|
|
Args:
|
|
repo_path (str): Path to the repository.
|
|
tag_name (str): The name of the tag to checkout.
|
|
|
|
Returns:
|
|
bool: True if checkout was successful.
|
|
|
|
Raises:
|
|
ValueError: If tag name is empty or uncommitted changes exist.
|
|
GitCommandError: If status check or checkout fails.
|
|
"""
|
|
func_name: str = "execute_checkout_tag"
|
|
log_handler.log_info(
|
|
f"Executing checkout for tag '{tag_name}' in repository: {repo_path}",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Validate tag name
|
|
if not tag_name or tag_name.isspace():
|
|
raise ValueError("Tag name cannot be empty.")
|
|
|
|
# --- Pre-Checkout Check: Uncommitted Changes ---
|
|
try:
|
|
log_handler.log_debug(
|
|
"Checking for uncommitted changes before tag checkout...",
|
|
func_name=func_name,
|
|
)
|
|
if self.git_commands.git_status_has_changes(repo_path):
|
|
# Raise specific error if changes exist
|
|
raise ValueError(
|
|
"Checkout aborted: Uncommitted changes exist. Please commit or stash first."
|
|
)
|
|
log_handler.log_debug(
|
|
"No uncommitted changes found. Proceeding with checkout.",
|
|
func_name=func_name,
|
|
)
|
|
except (GitCommandError, ValueError) as status_check_error:
|
|
# Handle errors during the status check itself
|
|
log_handler.log_error(
|
|
f"Status check failed before tag checkout: {status_check_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise status_check_error # Re-raise
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during status check: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected error checking repository status") from e
|
|
|
|
# --- Execute Checkout ---
|
|
try:
|
|
success: bool = self.git_commands.checkout_tag(repo_path, tag_name)
|
|
if success:
|
|
log_handler.log_info(
|
|
f"Successfully checked out tag '{tag_name}' (Detached HEAD state).",
|
|
func_name=func_name,
|
|
)
|
|
return True
|
|
else:
|
|
# This case might occur if checkout_tag doesn't raise on failure
|
|
log_handler.log_error(
|
|
f"Tag checkout command for '{tag_name}' reported failure without exception.",
|
|
func_name=func_name,
|
|
)
|
|
raise GitCommandError(
|
|
f"Checkout tag '{tag_name}' failed for an unknown reason."
|
|
)
|
|
except (GitCommandError, ValueError) as checkout_error:
|
|
# Handle specific checkout errors (e.g., tag not found)
|
|
log_handler.log_error(
|
|
f"Failed to checkout tag '{tag_name}': {checkout_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise checkout_error # Re-raise
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during tag checkout: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected tag checkout error") from e
|
|
|
|
def execute_create_branch(self, repo_path: str, branch_name: str) -> bool:
|
|
"""
|
|
Executes creation of a new local branch.
|
|
|
|
Args:
|
|
repo_path (str): Path to the repository.
|
|
branch_name (str): The name for the new branch.
|
|
|
|
Returns:
|
|
bool: True if branch creation was successful.
|
|
|
|
Raises:
|
|
ValueError: If branch name is invalid.
|
|
GitCommandError: If branch creation fails (e.g., already exists).
|
|
"""
|
|
func_name: str = "execute_create_branch"
|
|
log_handler.log_info(
|
|
f"Executing create branch '{branch_name}' in repository: {repo_path}",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Validate branch name (basic check, detailed check in GitCommands)
|
|
if not branch_name or branch_name.isspace():
|
|
raise ValueError("Branch name cannot be empty.")
|
|
|
|
try:
|
|
success: bool = self.git_commands.create_branch(repo_path, branch_name)
|
|
if success:
|
|
log_handler.log_info(
|
|
f"Branch '{branch_name}' created successfully.", func_name=func_name
|
|
)
|
|
return True
|
|
else:
|
|
# Should not happen if create_branch raises errors correctly
|
|
log_handler.log_error(
|
|
f"Branch creation for '{branch_name}' reported failure without exception.",
|
|
func_name=func_name,
|
|
)
|
|
raise GitCommandError(
|
|
f"Branch creation '{branch_name}' failed unexpectedly."
|
|
)
|
|
except (GitCommandError, ValueError) as create_error:
|
|
# Handle errors like "already exists" or invalid name format
|
|
log_handler.log_error(
|
|
f"Failed to create branch '{branch_name}': {create_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise create_error # Re-raise
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error creating branch '{branch_name}': {e}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception("Unexpected branch creation error") from e
|
|
|
|
def execute_switch_branch(self, repo_path: str, branch_name: str) -> bool:
|
|
"""
|
|
Executes checkout (switch) to an existing local branch.
|
|
Checks for uncommitted changes before proceeding.
|
|
|
|
Args:
|
|
repo_path (str): Path to the repository.
|
|
branch_name (str): The name of the local branch to switch to.
|
|
|
|
Returns:
|
|
bool: True if switch was successful.
|
|
|
|
Raises:
|
|
ValueError: If branch name is empty or uncommitted changes exist.
|
|
GitCommandError: If status check or checkout fails.
|
|
"""
|
|
func_name: str = "execute_switch_branch"
|
|
log_handler.log_info(
|
|
f"Executing switch to branch '{branch_name}' in repository: {repo_path}",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Validate branch name
|
|
if not branch_name or branch_name.isspace():
|
|
raise ValueError("Branch name cannot be empty.")
|
|
|
|
# --- Pre-Checkout Check: Uncommitted Changes ---
|
|
try:
|
|
log_handler.log_debug(
|
|
"Checking for uncommitted changes before branch switch...",
|
|
func_name=func_name,
|
|
)
|
|
if self.git_commands.git_status_has_changes(repo_path):
|
|
raise ValueError(
|
|
"Switch aborted: Uncommitted changes exist. Please commit or stash first."
|
|
)
|
|
log_handler.log_debug(
|
|
"No uncommitted changes found. Proceeding with switch.",
|
|
func_name=func_name,
|
|
)
|
|
except (GitCommandError, ValueError) as status_check_error:
|
|
log_handler.log_error(
|
|
f"Status check failed before branch switch: {status_check_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise status_check_error
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during status check: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected error checking repository status") from e
|
|
|
|
# --- Execute Checkout ---
|
|
try:
|
|
success: bool = self.git_commands.checkout_branch(repo_path, branch_name)
|
|
if success:
|
|
log_handler.log_info(
|
|
f"Switched successfully to branch '{branch_name}'.",
|
|
func_name=func_name,
|
|
)
|
|
return True
|
|
else:
|
|
log_handler.log_error(
|
|
f"Branch switch command for '{branch_name}' reported failure without exception.",
|
|
func_name=func_name,
|
|
)
|
|
raise GitCommandError(
|
|
f"Switch to branch '{branch_name}' failed unexpectedly."
|
|
)
|
|
except (GitCommandError, ValueError) as checkout_error:
|
|
# Handle errors like "branch not found"
|
|
log_handler.log_error(
|
|
f"Failed to switch to branch '{branch_name}': {checkout_error}",
|
|
func_name=func_name,
|
|
)
|
|
raise checkout_error
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during branch switch: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected branch switch error") from e
|
|
|
|
def execute_delete_branch(
|
|
self, repo_path: str, branch_name: str, force: bool = False
|
|
):
|
|
# ---<<< NOT IMPLEMENTED - Mantenuto come prima >>>---
|
|
# (Il codice precedente sollevava NotImplementedError, manteniamo quello)
|
|
func_name = "execute_delete_branch"
|
|
if not branch_name:
|
|
raise ValueError("Branch name required.")
|
|
log_handler.log_info(
|
|
f"Executing delete branch '{branch_name}' (force={force}).",
|
|
func_name=func_name,
|
|
)
|
|
try:
|
|
# success = self.git_commands.delete_branch(repo_path, branch_name, force) # Metodo da implementare in GitCommands
|
|
# return success
|
|
log_handler.log_warning(
|
|
"execute_delete_branch not fully implemented.", func_name=func_name
|
|
)
|
|
raise NotImplementedError("Branch deletion logic needs implementation.")
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(
|
|
f"Failed delete branch '{branch_name}': {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Failed delete branch '{branch_name}': {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected branch deletion error") from e
|
|
|
|
def execute_delete_tag(self, repo_path: str, tag_name: str):
|
|
# ---<<< NOT IMPLEMENTED - Mantenuto come prima >>>---
|
|
func_name = "execute_delete_tag"
|
|
if not tag_name:
|
|
raise ValueError("Tag name required.")
|
|
log_handler.log_info(
|
|
f"Executing delete tag '{tag_name}' in: {repo_path}", func_name=func_name
|
|
)
|
|
try:
|
|
# success = self.git_commands.delete_tag(repo_path, tag_name) # Metodo da implementare in GitCommands
|
|
# return success
|
|
log_handler.log_warning(
|
|
"execute_delete_tag not fully implemented.", func_name=func_name
|
|
)
|
|
raise NotImplementedError("Tag deletion logic needs implementation.")
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(
|
|
f"Failed delete tag '{tag_name}': {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Failed delete tag '{tag_name}': {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected tag deletion error") from e
|
|
|
|
def execute_untrack_files_from_gitignore(self, repo_path: str) -> bool:
|
|
"""
|
|
Checks tracked files against .gitignore rules and untracks matching files.
|
|
Creates a commit if files were untracked.
|
|
|
|
Args:
|
|
repo_path (str): Path to the repository.
|
|
|
|
Returns:
|
|
bool: True if files were untracked and committed, False otherwise.
|
|
|
|
Raises:
|
|
GitCommandError: If git commands fail.
|
|
Exception: For unexpected errors.
|
|
"""
|
|
func_name: str = "execute_untrack_files_from_gitignore"
|
|
log_handler.log_info(
|
|
f"Checking for tracked files matching .gitignore rules in '{repo_path}'...",
|
|
func_name=func_name,
|
|
)
|
|
# Map to store rule -> [list of files matching that rule]
|
|
rules_to_files_map: Dict[str, List[str]] = {}
|
|
# List to store all file paths that need untracking
|
|
all_files_to_untrack: List[str] = []
|
|
|
|
try:
|
|
# Get list of all currently tracked files
|
|
tracked_files: List[str] = self.git_commands.get_tracked_files(repo_path)
|
|
log_handler.log_debug(
|
|
f"Checking {len(tracked_files)} tracked files against .gitignore...",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Iterate through tracked files
|
|
for file_path in tracked_files:
|
|
# Normalize path for consistency (optional but good practice)
|
|
norm_file_path: str = os.path.normpath(file_path)
|
|
# Skip the .gitignore file itself
|
|
if norm_file_path == ".gitignore":
|
|
continue
|
|
|
|
# Check if this file matches any rule in .gitignore
|
|
# get_matching_gitignore_rule returns the rule string or None
|
|
matching_rule: Optional[str] = (
|
|
self.git_commands.get_matching_gitignore_rule(repo_path, file_path)
|
|
)
|
|
|
|
if matching_rule is not None:
|
|
# File is tracked but matches a rule, should be untracked
|
|
log_handler.log_info(
|
|
f"Tracked file '{file_path}' matches gitignore rule: '{matching_rule}'. Queued for untracking.",
|
|
func_name=func_name,
|
|
)
|
|
# Add to map for commit message summary
|
|
if matching_rule not in rules_to_files_map:
|
|
rules_to_files_map[matching_rule] = []
|
|
rules_to_files_map[matching_rule].append(file_path)
|
|
# Add to the main list for the 'git rm --cached' command
|
|
all_files_to_untrack.append(file_path)
|
|
|
|
# --- Perform Untracking and Commit (if needed) ---
|
|
if all_files_to_untrack:
|
|
num_untrack: int = len(all_files_to_untrack)
|
|
log_handler.log_info(
|
|
f"Found {num_untrack} tracked file(s) matching .gitignore rules. Proceeding to untrack.",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Execute 'git rm --cached' for the files found
|
|
self.git_commands.remove_from_tracking(repo_path, all_files_to_untrack)
|
|
log_handler.log_info(
|
|
f"Successfully removed {num_untrack} item(s) from Git tracking index.",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Create a meaningful commit message summarizing the untracking
|
|
commit_message: str = (
|
|
"Chore: Stop tracking files based on .gitignore update.\n\n"
|
|
)
|
|
commit_message += "Untracked files matching the following rules:\n"
|
|
# Sort rules alphabetically for consistent commit messages
|
|
sorted_rules: List[str] = sorted(rules_to_files_map.keys())
|
|
for rule in sorted_rules:
|
|
count: int = len(rules_to_files_map[rule])
|
|
plural: str = "s" if count > 1 else ""
|
|
# Add bullet point for each rule and count
|
|
commit_message += f'- Rule "{rule}": {count} file{plural}\n'
|
|
# Optional: List actual filenames (can make message very long)
|
|
# for f_path in rules_to_files_map[rule][:5]: # Limit listed files
|
|
# commit_message += f" - {f_path}\n"
|
|
# if count > 5: commit_message += " - ... and more\n"
|
|
|
|
log_handler.log_info(
|
|
"Creating automatic commit for untracking changes.",
|
|
func_name=func_name,
|
|
)
|
|
log_handler.log_debug(
|
|
f"Commit message:\n{commit_message}", func_name=func_name
|
|
)
|
|
|
|
# Execute the commit
|
|
commit_made: bool = self.git_commands.git_commit(
|
|
repo_path, commit_message
|
|
)
|
|
|
|
if commit_made:
|
|
log_handler.log_info(
|
|
"Automatic untrack commit successful.", func_name=func_name
|
|
)
|
|
return True # Untracking and commit succeeded
|
|
else:
|
|
# Should not happen if 'git rm --cached' ran, but log warning
|
|
log_handler.log_warning(
|
|
"Untracking performed, but subsequent commit reported no changes to commit.",
|
|
func_name=func_name,
|
|
)
|
|
# Return False because the expected commit didn't happen
|
|
return False
|
|
else:
|
|
# No tracked files matched .gitignore rules
|
|
log_handler.log_info(
|
|
"No tracked files found matching .gitignore rules. No untracking action needed.",
|
|
func_name=func_name,
|
|
)
|
|
return False # No action was taken
|
|
|
|
except (GitCommandError, ValueError) as git_error:
|
|
log_handler.log_error(
|
|
f"Error during untracking operation: {git_error}", func_name=func_name
|
|
)
|
|
raise git_error # Re-raise Git-related errors
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during untracking: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected untracking error") from e
|
|
|
|
def execute_checkout_tracking_branch(
|
|
self,
|
|
repo_path: str,
|
|
new_local_branch_name: str,
|
|
remote_tracking_branch_full_name: str,
|
|
) -> Dict[str, Any]: # Ritorna dizionario risultato
|
|
"""
|
|
Checks out a remote branch as a new local tracking branch.
|
|
Handles checks for uncommitted changes and potential errors.
|
|
|
|
Args:
|
|
repo_path (str): Path to the local repository.
|
|
new_local_branch_name (str): The name for the new local branch.
|
|
remote_tracking_branch_full_name (str): Full name ('remote/branch').
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing status ('success', 'error'),
|
|
message, and optionally an exception.
|
|
"""
|
|
func_name: str = "execute_checkout_tracking_branch"
|
|
log_handler.log_info(
|
|
f"Executing checkout tracking branch: Local='{new_local_branch_name}', "
|
|
f"Remote='{remote_tracking_branch_full_name}' in '{repo_path}'",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# --- Input Validation ---
|
|
if not repo_path or not os.path.isdir(repo_path):
|
|
raise ValueError(f"Invalid repository path: '{repo_path}'")
|
|
if not new_local_branch_name or new_local_branch_name.isspace():
|
|
raise ValueError("New local branch name cannot be empty.")
|
|
if (
|
|
not remote_tracking_branch_full_name
|
|
or "/" not in remote_tracking_branch_full_name
|
|
):
|
|
raise ValueError(
|
|
f"Invalid remote tracking branch name format: '{remote_tracking_branch_full_name}' (expecting 'remote/branch')"
|
|
)
|
|
if not os.path.exists(os.path.join(repo_path, ".git")):
|
|
raise ValueError(f"Directory '{repo_path}' is not a Git repository.")
|
|
|
|
# --- Pre-check: Uncommitted changes? ---
|
|
try:
|
|
if self.git_commands.git_status_has_changes(repo_path):
|
|
msg: str = (
|
|
f"Checkout aborted: Uncommitted changes detected. Please commit or stash first before checking out '{new_local_branch_name}'."
|
|
)
|
|
log_handler.log_warning(msg, func_name=func_name)
|
|
# Return error dictionary for the worker
|
|
return {"status": "error", "message": msg, "exception": ValueError(msg)}
|
|
except GitCommandError as status_err:
|
|
msg = f"Checkout aborted: Failed to check repository status: {status_err}"
|
|
log_handler.log_error(msg, func_name=func_name)
|
|
return {"status": "error", "message": msg, "exception": status_err}
|
|
|
|
# --- Esecuzione Comando Git ---
|
|
result_info: Dict[str, Any] = {
|
|
"status": "unknown",
|
|
"message": "Checkout not completed.",
|
|
}
|
|
try:
|
|
# Chiama il comando specifico in GitCommands (che ha check=False)
|
|
checkout_result = self.git_commands.checkout_new_branch_from_remote(
|
|
working_directory=repo_path,
|
|
new_local_branch_name=new_local_branch_name,
|
|
remote_tracking_branch_full_name=remote_tracking_branch_full_name,
|
|
)
|
|
|
|
# Analizza il risultato
|
|
if checkout_result.returncode == 0:
|
|
result_info["status"] = "success"
|
|
result_info["message"] = (
|
|
f"Successfully checked out '{remote_tracking_branch_full_name}' as new local branch '{new_local_branch_name}'."
|
|
)
|
|
log_handler.log_info(result_info["message"], func_name=func_name)
|
|
else:
|
|
# Errore
|
|
result_info["status"] = "error"
|
|
stderr_full: str = (
|
|
checkout_result.stderr if checkout_result.stderr else ""
|
|
)
|
|
stderr_lower: str = stderr_full.lower()
|
|
log_handler.log_error(
|
|
f"Checkout tracking branch command failed (RC={checkout_result.returncode}). Stderr: {stderr_lower}",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Controlla errori specifici
|
|
if (
|
|
f"a branch named '{new_local_branch_name.lower()}' already exists"
|
|
in stderr_lower
|
|
):
|
|
result_info["message"] = (
|
|
f"Checkout failed: A local branch named '{new_local_branch_name}' already exists."
|
|
)
|
|
elif (
|
|
"invalid object name" in stderr_lower
|
|
or "not a valid object name" in stderr_lower
|
|
):
|
|
result_info["message"] = (
|
|
f"Checkout failed: Remote branch '{remote_tracking_branch_full_name}' not found or invalid."
|
|
)
|
|
else:
|
|
result_info["message"] = (
|
|
f"Checkout tracking branch failed (RC={checkout_result.returncode}). Check logs."
|
|
)
|
|
|
|
result_info["exception"] = GitCommandError(
|
|
result_info["message"], stderr=checkout_result.stderr
|
|
)
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
# Errore dalla validazione o dal controllo stato
|
|
log_handler.log_error(
|
|
f"Error during checkout tracking branch setup: {e}", func_name=func_name
|
|
)
|
|
result_info = {
|
|
"status": "error",
|
|
"message": f"Checkout failed: {e}",
|
|
"exception": e,
|
|
}
|
|
except Exception as e:
|
|
# Errore imprevisto
|
|
log_handler.log_exception(
|
|
f"Unexpected error during checkout tracking branch: {e}",
|
|
func_name=func_name,
|
|
)
|
|
result_info = {
|
|
"status": "error",
|
|
"message": f"Unexpected checkout error: {type(e).__name__}",
|
|
"exception": e,
|
|
}
|
|
|
|
return result_info
|
|
|
|
def execute_delete_local_branch(
|
|
self, repo_path: str, branch_name: str, force: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Deletes a local branch, performing safety checks.
|
|
|
|
Args:
|
|
repo_path (str): Path to the local repository.
|
|
branch_name (str): The name of the local branch to delete.
|
|
force (bool): If True, force delete even if unmerged (-D).
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing status, message, and optionally exception.
|
|
|
|
Raises:
|
|
ValueError: If input args are invalid or attempting to delete current branch.
|
|
"""
|
|
func_name: str = "execute_delete_local_branch"
|
|
action_type: str = "Force deleting" if force else "Deleting"
|
|
log_handler.log_info(
|
|
f"Executing: {action_type} local branch '{branch_name}' in '{repo_path}'",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# --- Input Validation & Safety Checks ---
|
|
if not repo_path or not os.path.isdir(repo_path):
|
|
raise ValueError(f"Invalid repository path: '{repo_path}'")
|
|
if not branch_name or branch_name.isspace():
|
|
raise ValueError("Branch name cannot be empty.")
|
|
if not os.path.exists(os.path.join(repo_path, ".git")):
|
|
raise ValueError(f"Directory '{repo_path}' is not a Git repository.")
|
|
|
|
# **Safety Check 1: Non cancellare il branch corrente**
|
|
try:
|
|
current_branch: Optional[str] = self.git_commands.get_current_branch_name(
|
|
repo_path
|
|
)
|
|
if current_branch == branch_name:
|
|
msg: str = (
|
|
f"Cannot delete the currently checked-out branch ('{branch_name}'). Switch to another branch first."
|
|
)
|
|
log_handler.log_error(msg, func_name=func_name)
|
|
# Solleva ValueError qui perché è errore logico, gestito nel worker
|
|
raise ValueError(msg)
|
|
log_handler.log_debug(
|
|
f"Current branch is '{current_branch}', safe to delete '{branch_name}'.",
|
|
func_name=func_name,
|
|
)
|
|
except GitCommandError as e:
|
|
msg = f"Delete aborted: Could not determine current branch: {e}"
|
|
log_handler.log_error(msg, func_name=func_name)
|
|
return {"status": "error", "message": msg, "exception": e}
|
|
except ValueError as e: # Cattura l'errore sollevato sopra
|
|
raise e # Rilancia il ValueError per il worker
|
|
|
|
# --- Esecuzione Comando Git ---
|
|
result_info: Dict[str, Any] = {
|
|
"status": "unknown",
|
|
"message": "Delete branch not completed.",
|
|
}
|
|
try:
|
|
# Chiama il comando specifico in GitCommands (check=False)
|
|
delete_result = self.git_commands.delete_local_branch(
|
|
working_directory=repo_path, branch_name=branch_name, force=force
|
|
)
|
|
|
|
# Analizza il risultato
|
|
if delete_result.returncode == 0:
|
|
result_info["status"] = "success"
|
|
result_info["message"] = (
|
|
f"Local branch '{branch_name}' deleted successfully."
|
|
)
|
|
log_handler.log_info(result_info["message"], func_name=func_name)
|
|
if delete_result.stderr:
|
|
log_handler.log_info(
|
|
f"Delete output: {delete_result.stderr.strip()}",
|
|
func_name=func_name,
|
|
)
|
|
else:
|
|
# Errore
|
|
result_info["status"] = "error"
|
|
stderr_full: str = delete_result.stderr if delete_result.stderr else ""
|
|
stderr_lower: str = stderr_full.lower()
|
|
log_handler.log_error(
|
|
f"Delete branch command failed (RC={delete_result.returncode}). Stderr: {stderr_lower}",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# Controlla errori specifici
|
|
if f"branch '{branch_name.lower()}' not found" in stderr_lower:
|
|
result_info["message"] = (
|
|
f"Delete failed: Branch '{branch_name}' not found."
|
|
)
|
|
elif "not fully merged" in stderr_lower:
|
|
result_info["message"] = (
|
|
f"Delete failed: Branch '{branch_name}' has unmerged changes. Use 'Force Delete' to discard them."
|
|
)
|
|
else:
|
|
result_info["message"] = (
|
|
f"Delete branch '{branch_name}' failed (RC={delete_result.returncode}). Check logs."
|
|
)
|
|
|
|
result_info["exception"] = GitCommandError(
|
|
result_info["message"], stderr=delete_result.stderr
|
|
)
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
# Errore dalla validazione o GitCommand
|
|
log_handler.log_error(
|
|
f"Error during delete branch setup/execution for '{branch_name}': {e}",
|
|
func_name=func_name,
|
|
)
|
|
if isinstance(e, ValueError):
|
|
raise e # Rilancia ValueError per worker
|
|
result_info = {
|
|
"status": "error",
|
|
"message": f"Delete failed: {e}",
|
|
"exception": e,
|
|
}
|
|
except Exception as e:
|
|
# Errore imprevisto
|
|
log_handler.log_exception(
|
|
f"Unexpected error deleting local branch '{branch_name}': {e}",
|
|
func_name=func_name,
|
|
)
|
|
result_info = {
|
|
"status": "error",
|
|
"message": f"Unexpected delete error: {type(e).__name__}",
|
|
"exception": e,
|
|
}
|
|
|
|
return result_info
|
|
|
|
def execute_merge_local_branch(
|
|
self,
|
|
repo_path: str,
|
|
branch_to_merge: str,
|
|
current_branch: str,
|
|
no_ff: bool = False,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Merges a specified local branch into the current local branch.
|
|
|
|
Args:
|
|
repo_path (str): Path to the local repository.
|
|
branch_to_merge (str): The name of the branch to merge FROM.
|
|
current_branch (str): The name of the branch being merged INTO (current).
|
|
no_ff (bool): If True, force a merge commit even if fast-forward is possible.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing status ('success', 'conflict', 'error'),
|
|
message, and optionally exception.
|
|
|
|
Raises:
|
|
ValueError: If input arguments are invalid or logical errors found.
|
|
"""
|
|
func_name: str = "execute_merge_local_branch"
|
|
log_handler.log_info(
|
|
f"Executing: Merge branch '{branch_to_merge}' into '{current_branch}' in '{repo_path}' (no_ff={no_ff})",
|
|
func_name=func_name,
|
|
)
|
|
|
|
# --- Input Validation & Safety Checks ---
|
|
if not repo_path or not os.path.isdir(repo_path):
|
|
raise ValueError(f"Invalid repository path: '{repo_path}'")
|
|
if not branch_to_merge or branch_to_merge.isspace():
|
|
raise ValueError("Branch to merge name cannot be empty.")
|
|
if not current_branch or current_branch.isspace():
|
|
raise ValueError("Current branch name cannot be empty.")
|
|
if branch_to_merge == current_branch:
|
|
raise ValueError("Cannot merge a branch into itself.")
|
|
if not os.path.exists(os.path.join(repo_path, ".git")):
|
|
raise ValueError(f"Directory '{repo_path}' is not a Git repository.")
|
|
|
|
# **Safety Check 1: Siamo sul branch corretto?**
|
|
try:
|
|
actual_current: Optional[str] = self.git_commands.get_current_branch_name(
|
|
repo_path
|
|
)
|
|
if actual_current != current_branch:
|
|
msg: str = (
|
|
f"Merge aborted: The current active branch is '{actual_current}', but expected '{current_branch}'. Please checkout the correct branch first."
|
|
)
|
|
log_handler.log_error(msg, func_name=func_name)
|
|
raise ValueError(msg)
|
|
log_handler.log_debug("Current branch check passed.", func_name=func_name)
|
|
except GitCommandError as e:
|
|
msg = f"Merge aborted: Could not verify the current branch: {e}"
|
|
log_handler.log_error(msg, func_name=func_name)
|
|
return {"status": "error", "message": msg, "exception": e}
|
|
except ValueError as e:
|
|
raise e # Rilancia per worker
|
|
|
|
# **Safety Check 2: Branch da mergiare esiste?**
|
|
try:
|
|
local_branches: List[str]
|
|
_
|
|
local_branches, _ = self.git_commands.list_branches(repo_path)
|
|
if branch_to_merge not in local_branches:
|
|
msg = f"Merge failed: Branch '{branch_to_merge}' not found in the local repository."
|
|
log_handler.log_error(msg, func_name=func_name)
|
|
raise ValueError(msg)
|
|
log_handler.log_debug(
|
|
f"Branch to merge '{branch_to_merge}' exists.", func_name=func_name
|
|
)
|
|
except GitCommandError as e:
|
|
msg = f"Merge aborted: Could not verify if branch '{branch_to_merge}' exists: {e}"
|
|
log_handler.log_error(msg, func_name=func_name)
|
|
return {"status": "error", "message": msg, "exception": e}
|
|
except ValueError as e:
|
|
raise e # Rilancia per worker
|
|
|
|
# **Safety Check 3: Modifiche non committate?**
|
|
try:
|
|
if self.git_commands.git_status_has_changes(repo_path):
|
|
msg = f"Merge aborted: Uncommitted changes detected in the working directory. Please commit or stash first before merging '{branch_to_merge}'."
|
|
log_handler.log_warning(msg, func_name=func_name)
|
|
# Restituisce errore qui, non sollevare ValueError
|
|
return {"status": "error", "message": msg, "exception": ValueError(msg)}
|
|
log_handler.log_debug(
|
|
"No uncommitted changes detected.", func_name=func_name
|
|
)
|
|
except GitCommandError as status_err:
|
|
msg = f"Merge aborted: Failed to check repository status before merge: {status_err}"
|
|
log_handler.log_error(msg, func_name=func_name)
|
|
return {"status": "error", "message": msg, "exception": status_err}
|
|
|
|
# --- Esecuzione Comando Git Merge ---
|
|
result_info: Dict[str, Any] = {
|
|
"status": "unknown",
|
|
"message": "Merge not completed.",
|
|
}
|
|
try:
|
|
# Chiama git_merge (check=False)
|
|
merge_result = self.git_commands.git_merge(
|
|
working_directory=repo_path,
|
|
branch_to_merge=branch_to_merge,
|
|
commit_msg=None, # Usa default message
|
|
no_ff=no_ff,
|
|
)
|
|
|
|
# Analizza il risultato
|
|
stdout_full: str = merge_result.stdout if merge_result.stdout else ""
|
|
stderr_full: str = merge_result.stderr if merge_result.stderr else ""
|
|
combined_output_lower: str = (stdout_full + stderr_full).lower()
|
|
|
|
if merge_result.returncode == 0:
|
|
result_info["status"] = "success"
|
|
if "already up to date" in combined_output_lower:
|
|
result_info["message"] = (
|
|
f"Merge: Branch '{current_branch}' already up-to-date with '{branch_to_merge}'."
|
|
)
|
|
log_handler.log_info(
|
|
f"Merge successful (already up-to-date): '{branch_to_merge}' -> '{current_branch}'.",
|
|
func_name=func_name,
|
|
)
|
|
elif "fast-forward" in combined_output_lower:
|
|
result_info["message"] = (
|
|
f"Merge successful (Fast-forward): '{branch_to_merge}' into '{current_branch}'."
|
|
)
|
|
log_handler.log_info(result_info["message"], func_name=func_name)
|
|
else:
|
|
result_info["message"] = (
|
|
f"Merge successful: '{branch_to_merge}' merged into '{current_branch}'."
|
|
)
|
|
log_handler.log_info(
|
|
result_info["message"] + " Output logged.", func_name=func_name
|
|
)
|
|
|
|
elif merge_result.returncode == 1 and (
|
|
"conflict" in combined_output_lower
|
|
or "automatic merge failed" in combined_output_lower
|
|
or "fix conflicts and then commit the result" in combined_output_lower
|
|
):
|
|
result_info["status"] = "conflict" # Stato specifico
|
|
result_info["message"] = (
|
|
f"Merge conflict occurred while merging '{branch_to_merge}' into '{current_branch}'. Please resolve conflicts manually and commit."
|
|
)
|
|
log_handler.log_error(
|
|
f"Merge conflict detected: '{branch_to_merge}' -> '{current_branch}'.",
|
|
func_name=func_name,
|
|
)
|
|
else:
|
|
# Altro Errore
|
|
result_info["status"] = "error"
|
|
stderr_lower = stderr_full.lower()
|
|
log_handler.log_error(
|
|
f"Merge command failed (RC={merge_result.returncode}). Stderr: {stderr_lower}",
|
|
func_name=func_name,
|
|
)
|
|
if (
|
|
"not something we can merge" in stderr_lower
|
|
or f"'{branch_to_merge}' does not point to a commit" in stderr_lower
|
|
):
|
|
result_info["message"] = (
|
|
f"Merge failed: Branch '{branch_to_merge}' not found or invalid."
|
|
)
|
|
elif "is not possible to fast-forward" in stderr_lower:
|
|
result_info["message"] = (
|
|
f"Merge failed: Not a fast-forward (and --no-ff not used or failed)."
|
|
)
|
|
else:
|
|
result_info["message"] = (
|
|
f"Merge of '{branch_to_merge}' failed (RC={merge_result.returncode}). Check logs."
|
|
)
|
|
result_info["exception"] = GitCommandError(
|
|
result_info["message"], stderr=merge_result.stderr
|
|
)
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
# Errore dalla validazione o GitCommand
|
|
log_handler.log_error(
|
|
f"Error during merge setup/execution: {e}", func_name=func_name
|
|
)
|
|
if isinstance(e, ValueError):
|
|
raise e # Rilancia per worker
|
|
result_info = {
|
|
"status": "error",
|
|
"message": f"Merge failed: {e}",
|
|
"exception": e,
|
|
}
|
|
except Exception as e:
|
|
# Errore imprevisto
|
|
log_handler.log_exception(
|
|
f"Unexpected error during merge: {e}", func_name=func_name
|
|
)
|
|
result_info = {
|
|
"status": "error",
|
|
"message": f"Unexpected merge error: {type(e).__name__}",
|
|
"exception": e,
|
|
}
|
|
|
|
return result_info
|
|
|
|
|
|
# --- END OF FILE gitsync_tool/core/action_handler.py ---
|