640 lines
30 KiB
Python
640 lines
30 KiB
Python
# action_handler.py
|
|
import logging
|
|
import os
|
|
import datetime # Required for default commit messages
|
|
|
|
# Import dependencies (assuming original imports are correct)
|
|
from git_commands import GitCommands, GitCommandError
|
|
from backup_handler import BackupHandler
|
|
|
|
|
|
class ActionHandler:
|
|
"""
|
|
Handles the execution logic for core application actions.
|
|
Orchestrates calls to GitCommands and BackupHandler.
|
|
"""
|
|
|
|
def __init__(
|
|
self, logger, git_commands: GitCommands, backup_handler: BackupHandler
|
|
):
|
|
"""
|
|
Initializes the ActionHandler.
|
|
|
|
Args:
|
|
logger (logging.Logger or logging.LoggerAdapter): Logger instance.
|
|
git_commands (GitCommands): Instance for executing Git commands.
|
|
backup_handler (BackupHandler): Instance for handling backups.
|
|
"""
|
|
if not isinstance(logger, (logging.Logger, logging.LoggerAdapter)):
|
|
raise TypeError("ActionHandler requires a valid Logger or LoggerAdapter.")
|
|
if not isinstance(git_commands, GitCommands):
|
|
raise TypeError("ActionHandler requires a GitCommands instance.")
|
|
if not isinstance(backup_handler, BackupHandler):
|
|
raise TypeError("ActionHandler requires a BackupHandler instance.")
|
|
self.logger = logger
|
|
self.git_commands = git_commands
|
|
self.backup_handler = backup_handler
|
|
|
|
def _perform_backup_if_enabled(
|
|
self,
|
|
svn_path, # Path to the repository source
|
|
profile_name, # Name of the profile for logging/naming
|
|
autobackup_enabled, # Flag from settings
|
|
backup_base_dir, # Destination directory for backups
|
|
excluded_extensions, # Set of file extensions to exclude
|
|
excluded_dirs, # <<< MODIFICA: Ora riceve il set completo
|
|
):
|
|
"""
|
|
Performs backup using BackupHandler if enabled. Passes excluded directories.
|
|
|
|
Args:
|
|
svn_path (str): Path to the repository.
|
|
profile_name (str): Name of the profile (for backup naming).
|
|
autobackup_enabled (bool): Flag from settings.
|
|
backup_base_dir (str): Destination directory for backups.
|
|
excluded_extensions (set): File extensions to exclude (lowercase).
|
|
excluded_dirs (set): Directory base names to exclude (lowercase). <<< Tipo e nome aggiornati
|
|
|
|
Returns:
|
|
str or None: Path to created backup file, or None if disabled/failed/empty.
|
|
|
|
Raises:
|
|
IOError: If the backup process fails critically (re-raised from BackupHandler).
|
|
ValueError: If input arguments are invalid.
|
|
"""
|
|
if not autobackup_enabled:
|
|
self.logger.debug("Autobackup disabled, skipping backup.")
|
|
return None # Backup not needed
|
|
|
|
self.logger.info(f"Autobackup enabled for '{profile_name}'. Starting backup...")
|
|
# Log details including the excluded directories being used
|
|
self.logger.debug(f" Source: {svn_path}")
|
|
self.logger.debug(f" Destination: {backup_base_dir}")
|
|
self.logger.debug(f" Exclude Exts: {excluded_extensions}")
|
|
# --- MODIFICA: Log del set di directory escluse ---
|
|
self.logger.debug(f" Exclude Dirs: {excluded_dirs}")
|
|
# --- FINE MODIFICA ---
|
|
|
|
# Basic validation before calling handler
|
|
if not svn_path or not os.path.isdir(svn_path):
|
|
raise ValueError("Invalid svn_path for backup.")
|
|
if not backup_base_dir:
|
|
raise ValueError("backup_base_dir cannot be empty.")
|
|
if not isinstance(excluded_extensions, set):
|
|
raise TypeError("excluded_extensions must be a set.")
|
|
if not isinstance(excluded_dirs, set):
|
|
raise TypeError("excluded_dirs must be a set.") # <<< Valida tipo
|
|
|
|
try:
|
|
# Delegate backup creation to the BackupHandler instance
|
|
# Pass the excluded_dirs set to the `excluded_dirs_base` parameter of the handler.
|
|
backup_path = self.backup_handler.create_zip_backup(
|
|
source_repo_path=svn_path,
|
|
backup_base_dir=backup_base_dir,
|
|
profile_name=profile_name,
|
|
excluded_extensions=excluded_extensions,
|
|
# --- MODIFICA: Passaggio del set corretto ---
|
|
excluded_dirs_base=excluded_dirs,
|
|
# --- FINE MODIFICA ---
|
|
)
|
|
# Log success if backup handler doesn't raise an error and returns a path
|
|
if backup_path:
|
|
self.logger.info(f"Backup completed successfully: {backup_path}")
|
|
else:
|
|
self.logger.warning(
|
|
"Backup process finished but no backup file was generated (might be empty)."
|
|
)
|
|
return backup_path # Return path or None
|
|
|
|
except (
|
|
IOError,
|
|
ValueError,
|
|
PermissionError,
|
|
) as backup_e: # Catch specific errors from handler
|
|
# Log error and re-raise as IOError to signal critical failure
|
|
self.logger.error(f"Backup failed: {backup_e}", exc_info=True)
|
|
raise IOError(
|
|
f"Autobackup failed: {backup_e}"
|
|
) from backup_e # Standardize on IOError
|
|
except Exception as unexpected_e:
|
|
self.logger.exception(f"Unexpected error during backup: {unexpected_e}")
|
|
raise IOError(
|
|
f"Unexpected autobackup failure: {unexpected_e}"
|
|
) from unexpected_e
|
|
|
|
def execute_prepare_repo(self, svn_path):
|
|
"""Executes the 'prepare repository' action."""
|
|
# (No changes needed here for directory exclusion)
|
|
self.logger.info(f"Executing preparation for: {svn_path}")
|
|
if not svn_path or not os.path.isdir(svn_path):
|
|
raise ValueError("Invalid directory for preparation.")
|
|
git_dir = os.path.join(svn_path, ".git")
|
|
if os.path.exists(git_dir):
|
|
self.logger.warning("Repository is already prepared.")
|
|
raise ValueError("Repository is already prepared.")
|
|
try:
|
|
self.git_commands.prepare_svn_for_git(svn_path)
|
|
self.logger.info("Repository prepared successfully.")
|
|
return True
|
|
except (GitCommandError, ValueError, IOError) as e:
|
|
self.logger.error(f"Failed to prepare repository: {e}", exc_info=True)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error preparing repository: {e}")
|
|
raise Exception("Unexpected preparation error") from e
|
|
|
|
def execute_create_bundle(
|
|
self,
|
|
svn_path,
|
|
bundle_full_path,
|
|
profile_name,
|
|
autobackup_enabled,
|
|
backup_base_dir,
|
|
autocommit_enabled,
|
|
commit_message,
|
|
excluded_extensions, # Set of excluded extensions
|
|
excluded_dirs, # <<< MODIFICA: Riceve il set completo di directory escluse
|
|
):
|
|
"""
|
|
Executes 'create bundle', including backup and commit logic.
|
|
Passes the correct set of excluded directories for backup.
|
|
|
|
Args:
|
|
svn_path (str): Validated path to repository.
|
|
bundle_full_path (str): Validated path for bundle file.
|
|
profile_name (str): Current profile name.
|
|
autobackup_enabled (bool): Whether to perform backup.
|
|
backup_base_dir (str): Validated base directory for backups.
|
|
autocommit_enabled (bool): Whether to perform autocommit.
|
|
commit_message (str): Commit message from GUI (for autocommit).
|
|
excluded_extensions (set): File extensions to exclude from backup.
|
|
excluded_dirs (set): Directory base names to exclude from backup. <<< Tipo aggiornato
|
|
|
|
Returns:
|
|
str or None: Path to created bundle on success, None if empty/not created.
|
|
Raises:
|
|
IOError: If backup fails critically.
|
|
GitCommandError/ValueError/Exception: If commit or bundle creation fails.
|
|
"""
|
|
self.logger.info(f"Executing 'Create Bundle' for profile '{profile_name}'...")
|
|
self.logger.debug(f" Repo: {svn_path}, Bundle: {bundle_full_path}")
|
|
self.logger.debug(
|
|
f" Autobackup: {autobackup_enabled}, Autocommit: {autocommit_enabled}"
|
|
)
|
|
# Log exclusion sets received
|
|
self.logger.debug(f" Backup Exclude Exts: {excluded_extensions}")
|
|
self.logger.debug(
|
|
f" Backup Exclude Dirs: {excluded_dirs}"
|
|
) # <<< Log del set ricevuto
|
|
|
|
# --- 1. Backup Step ---
|
|
# Call helper, passing all necessary info including the excluded_dirs set.
|
|
# Raises IOError on critical backup failure.
|
|
self._perform_backup_if_enabled(
|
|
svn_path,
|
|
profile_name,
|
|
autobackup_enabled,
|
|
backup_base_dir,
|
|
excluded_extensions,
|
|
excluded_dirs, # <<< Passa il set ricevuto
|
|
)
|
|
# Execution stops here if backup failed critically
|
|
|
|
# --- 2. Autocommit Step ---
|
|
if autocommit_enabled:
|
|
self.logger.info("Autocommit enabled. Checking for changes...")
|
|
try:
|
|
has_changes = self.git_commands.git_status_has_changes(svn_path)
|
|
if has_changes:
|
|
self.logger.info("Changes detected, performing autocommit...")
|
|
commit_msg_to_use = (
|
|
commit_message.strip()
|
|
if commit_message.strip()
|
|
else f"Autocommit '{profile_name}' before bundle - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M')}"
|
|
)
|
|
self.logger.debug(
|
|
f"Using autocommit message: '{commit_msg_to_use}'"
|
|
)
|
|
commit_made = self.git_commands.git_commit(
|
|
svn_path, commit_msg_to_use
|
|
)
|
|
if commit_made:
|
|
self.logger.info("Autocommit successful.")
|
|
else:
|
|
self.logger.warning(
|
|
"Autocommit attempted, but no effective changes committed."
|
|
)
|
|
else:
|
|
self.logger.info("No changes detected for autocommit.")
|
|
except (GitCommandError, ValueError) as commit_e:
|
|
self.logger.error(f"Autocommit failed: {commit_e}", exc_info=True)
|
|
raise commit_e # Stop the process
|
|
except Exception as commit_e:
|
|
self.logger.exception(f"Unexpected error during autocommit: {commit_e}")
|
|
raise Exception("Unexpected autocommit error") from commit_e
|
|
|
|
# --- 3. Create Bundle Step ---
|
|
self.logger.info(f"Creating bundle file: {bundle_full_path}")
|
|
try:
|
|
# Execute command (handles empty bundle warning internally)
|
|
self.git_commands.create_git_bundle(svn_path, bundle_full_path)
|
|
bundle_exists = os.path.exists(bundle_full_path)
|
|
bundle_not_empty = bundle_exists and os.path.getsize(bundle_full_path) > 0
|
|
if bundle_exists and bundle_not_empty:
|
|
self.logger.info("Git bundle created successfully.")
|
|
return bundle_full_path
|
|
else:
|
|
self.logger.warning(
|
|
"Bundle file not created or is empty (repo might have no commits)."
|
|
)
|
|
if bundle_exists and not bundle_not_empty:
|
|
try:
|
|
os.remove(bundle_full_path)
|
|
self.logger.info("Removed empty bundle file.")
|
|
except OSError:
|
|
self.logger.warning("Could not remove empty bundle file.")
|
|
return None # Indicate non-fatal issue (empty bundle)
|
|
except (GitCommandError, ValueError) as bundle_e:
|
|
self.logger.error(f"Bundle creation failed: {bundle_e}", exc_info=True)
|
|
raise bundle_e
|
|
except Exception as bundle_e:
|
|
self.logger.exception(
|
|
f"Unexpected error during bundle creation: {bundle_e}"
|
|
)
|
|
raise Exception("Unexpected bundle creation error") from bundle_e
|
|
|
|
def execute_fetch_bundle(
|
|
self,
|
|
svn_path, # Path to the target repository directory
|
|
bundle_full_path, # Path to the bundle file to fetch/clone from
|
|
profile_name,
|
|
autobackup_enabled,
|
|
backup_base_dir,
|
|
excluded_extensions, # Set
|
|
excluded_dirs, # Set
|
|
):
|
|
"""
|
|
Executes the 'fetch bundle' action. If the target directory is not a
|
|
Git repository, it attempts to clone from the bundle instead. Otherwise,
|
|
it performs a fetch and merge. Includes backup logic for existing repos.
|
|
|
|
Args: See execute_create_bundle args, excluding commit related ones.
|
|
|
|
Returns:
|
|
bool: True on successful fetch/merge or clone.
|
|
Raises:
|
|
IOError: If backup fails critically, or clone destination is invalid.
|
|
FileNotFoundError: If bundle file not found.
|
|
GitCommandError/Exception: If fetch, merge, or clone fails.
|
|
"""
|
|
self.logger.info(
|
|
f"Executing 'Fetch from Bundle' for profile '{profile_name}'..."
|
|
)
|
|
self.logger.debug(f" Target Repo Dir: {svn_path}, Bundle: {bundle_full_path}")
|
|
self.logger.debug(f" Autobackup: {autobackup_enabled}")
|
|
# Log exclusion sets received
|
|
self.logger.debug(f" Backup Exclude Exts: {excluded_extensions}")
|
|
self.logger.debug(f" Backup Exclude Dirs: {excluded_dirs}")
|
|
|
|
# --- 0. Validate Bundle File Existence ---
|
|
if not os.path.isfile(bundle_full_path):
|
|
msg = f"Bundle file not found: '{bundle_full_path}'"
|
|
self.logger.error(msg)
|
|
raise FileNotFoundError(msg) # Fail fast
|
|
|
|
# --- MODIFICA: Distinguere tra Clone e Fetch/Merge ---
|
|
git_dir_path = os.path.join(svn_path, ".git")
|
|
is_existing_repo = os.path.exists(git_dir_path)
|
|
self.logger.debug(f"Target directory '{svn_path}' is existing repo: {is_existing_repo}")
|
|
|
|
if is_existing_repo:
|
|
# --- Scenario: Fetch/Merge su Repository Esistente ---
|
|
self.logger.info("Target is an existing Git repository. Proceeding with Fetch and Merge.")
|
|
|
|
# --- 1. Backup Step (Solo per repo esistenti) ---
|
|
# Call helper, passing the excluded_dirs set. Raises IOError on critical failure.
|
|
self._perform_backup_if_enabled(
|
|
svn_path,
|
|
profile_name,
|
|
autobackup_enabled,
|
|
backup_base_dir,
|
|
excluded_extensions,
|
|
excluded_dirs,
|
|
)
|
|
# Execution stops here if backup failed critically
|
|
|
|
# --- 2. Fetch and Merge Step ---
|
|
self.logger.info(f"Fetching into '{svn_path}' from: {bundle_full_path}")
|
|
try:
|
|
# Delegate to GitCommands; uses --allow-unrelated-histories if modified earlier
|
|
self.git_commands.fetch_from_git_bundle(svn_path, bundle_full_path)
|
|
self.logger.info(
|
|
"Fetch/merge process completed successfully (or repo already up-to-date)."
|
|
)
|
|
return True # Indicate success
|
|
except GitCommandError as fetch_e:
|
|
# Log and re-raise Git command errors (like conflicts)
|
|
self.logger.error(f"Fetch/merge failed: {fetch_e}", exc_info=True)
|
|
raise fetch_e # Preserve original error for UI handling
|
|
except Exception as fetch_e:
|
|
# Handle other unexpected errors during fetch/merge
|
|
self.logger.exception(f"Unexpected error during fetch/merge: {fetch_e}")
|
|
raise Exception("Unexpected fetch/merge error") from fetch_e
|
|
|
|
else:
|
|
# --- Scenario: Clonazione da Bundle su Nuova Directory ---
|
|
self.logger.info("Target is not a Git repository. Attempting to clone from bundle.")
|
|
# No backup needed when cloning
|
|
|
|
try:
|
|
# Call the new clone function in GitCommands
|
|
success = self.git_commands.clone_from_bundle(bundle_full_path, svn_path)
|
|
if success:
|
|
self.logger.info(f"Successfully cloned repository into '{svn_path}'.")
|
|
return True
|
|
else:
|
|
# This case should ideally not happen if clone_from_bundle raises exceptions on failure
|
|
self.logger.error(f"Cloning process for '{svn_path}' reported failure unexpectedly.")
|
|
raise Exception(f"Cloning into '{svn_path}' failed.")
|
|
except (GitCommandError, ValueError, FileNotFoundError, IOError) as clone_e:
|
|
# Log and re-raise specific errors from clone function
|
|
self.logger.error(f"Cloning from bundle failed: {clone_e}", exc_info=True)
|
|
raise clone_e # Re-raise for UI handling
|
|
except Exception as clone_e:
|
|
# Handle other unexpected errors during clone
|
|
self.logger.exception(f"Unexpected error during clone from bundle: {clone_e}")
|
|
raise Exception("Unexpected clone error") from clone_e
|
|
|
|
# --- Other execute_... methods (Commit, Tag, Branch) ---
|
|
# No changes needed in these methods specifically for the directory exclusion feature,
|
|
# as they don't involve the backup process directly.
|
|
# Ensure they maintain consistent logging and error handling as updated previously.
|
|
|
|
def execute_manual_commit(self, svn_path, commit_message):
|
|
"""Executes a manual commit with the provided message."""
|
|
# (Keep original or improved version - no backup involved)
|
|
if not commit_message or commit_message.isspace():
|
|
raise ValueError("Commit message cannot be empty.")
|
|
self.logger.info(f"Executing manual commit for: {svn_path}")
|
|
try:
|
|
commit_made = self.git_commands.git_commit(svn_path, commit_message)
|
|
if commit_made:
|
|
self.logger.info("Manual commit successful.")
|
|
else:
|
|
self.logger.info("Manual commit: No changes were available to commit.")
|
|
return commit_made
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Manual commit failed: {e}", exc_info=True)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected manual commit error: {e}")
|
|
raise Exception("Unexpected manual commit error") from e
|
|
|
|
def execute_create_tag(self, svn_path, commit_message, tag_name, tag_message):
|
|
"""Executes tag creation, including pre-commit using commit_message if needed."""
|
|
# (Keep original or improved version - no backup involved)
|
|
if not tag_message or tag_message.isspace():
|
|
raise ValueError("Tag annotation message cannot be empty.")
|
|
self.logger.info(f"Executing create tag '{tag_name}' for: {svn_path}")
|
|
try: # Pre-commit
|
|
has_changes = self.git_commands.git_status_has_changes(svn_path)
|
|
if has_changes:
|
|
self.logger.info("Uncommitted changes detected before tagging.")
|
|
if not commit_message or commit_message.isspace():
|
|
raise ValueError(
|
|
"Changes exist. Commit message required before tagging."
|
|
)
|
|
self.logger.info(f"Performing pre-tag commit: '{commit_message}'")
|
|
self.git_commands.git_commit(svn_path, commit_message)
|
|
else:
|
|
self.logger.info("No uncommitted changes detected.")
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Pre-tag commit error: {e}", exc_info=True)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected pre-tag commit error: {e}")
|
|
raise Exception("Unexpected pre-tag commit error") from e
|
|
try: # Create Tag
|
|
self.logger.info(f"Proceeding to create tag '{tag_name}'...")
|
|
self.git_commands.create_tag(svn_path, tag_name, tag_message)
|
|
self.logger.info(f"Tag '{tag_name}' created successfully.")
|
|
return True
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Failed create tag '{tag_name}': {e}", exc_info=True)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected tag error: {e}")
|
|
raise Exception("Unexpected tag creation error") from e
|
|
|
|
def execute_checkout_tag(self, svn_path, tag_name):
|
|
"""Executes checkout for the specified tag after checking changes."""
|
|
# (Keep original or improved version - no backup involved)
|
|
if not tag_name:
|
|
raise ValueError("Tag name required for checkout.")
|
|
self.logger.info(f"Executing checkout tag '{tag_name}' in: {svn_path}")
|
|
try: # Check changes
|
|
if self.git_commands.git_status_has_changes(svn_path):
|
|
raise ValueError("Uncommitted changes exist. Commit or stash first.")
|
|
self.logger.debug("No uncommitted changes found.")
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Status check error before checkout: {e}", exc_info=True)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected status check error: {e}")
|
|
raise Exception("Unexpected status check error") from e
|
|
try: # Checkout
|
|
checkout_success = self.git_commands.checkout_tag(svn_path, tag_name)
|
|
if checkout_success:
|
|
self.logger.info(f"Tag '{tag_name}' checked out (Detached HEAD).")
|
|
return True
|
|
else:
|
|
raise GitCommandError(
|
|
"Checkout failed for unknown reason."
|
|
) # Should not happen
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Failed checkout tag '{tag_name}': {e}", exc_info=True)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected checkout error: {e}")
|
|
raise Exception("Unexpected tag checkout error") from e
|
|
|
|
def execute_create_branch(self, svn_path, branch_name):
|
|
"""Executes branch creation."""
|
|
# (Keep original or improved version - no backup involved)
|
|
self.logger.info(f"Executing create branch '{branch_name}' in: {svn_path}")
|
|
try:
|
|
success = self.git_commands.create_branch(svn_path, branch_name)
|
|
if success:
|
|
self.logger.info(f"Branch '{branch_name}' created successfully.")
|
|
return True
|
|
else:
|
|
raise GitCommandError(
|
|
f"Branch creation '{branch_name}' failed unexpectedly."
|
|
)
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Failed create branch: {e}", exc_info=True)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected create branch error: {e}")
|
|
raise Exception("Unexpected branch creation error") from e
|
|
|
|
def execute_switch_branch(self, svn_path, branch_name):
|
|
"""Executes branch switch after checking for changes."""
|
|
# (Keep original or improved version - no backup involved)
|
|
if not branch_name:
|
|
raise ValueError("Branch name required for switch.")
|
|
self.logger.info(f"Executing switch to branch '{branch_name}'.")
|
|
try: # Check changes
|
|
if self.git_commands.git_status_has_changes(svn_path):
|
|
raise ValueError("Uncommitted changes exist. Commit or stash first.")
|
|
self.logger.debug("No uncommitted changes found.")
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Status check error before switch: {e}", exc_info=True)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected status check error: {e}")
|
|
raise Exception("Unexpected status check error") from e
|
|
try: # Switch
|
|
success = self.git_commands.checkout_branch(svn_path, branch_name)
|
|
if success:
|
|
self.logger.info(f"Switched to branch '{branch_name}'.")
|
|
return True
|
|
else:
|
|
raise GitCommandError(
|
|
f"Switch to branch '{branch_name}' failed unexpectedly."
|
|
)
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(
|
|
f"Failed switch to branch '{branch_name}': {e}", exc_info=True
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected switch branch error: {e}")
|
|
raise Exception("Unexpected branch switch error") from e
|
|
|
|
def execute_delete_branch(self, svn_path, branch_name, force=False):
|
|
"""Executes branch deletion."""
|
|
# (Keep original placeholder - no backup involved)
|
|
if not branch_name:
|
|
raise ValueError("Branch name required for delete.")
|
|
self.logger.info(f"Executing delete branch '{branch_name}' (force={force}).")
|
|
try:
|
|
success = self.git_commands.delete_branch(svn_path, branch_name, force)
|
|
return success
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(
|
|
f"Failed delete branch '{branch_name}': {e}", exc_info=True
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Failed delete branch '{branch_name}': {e}")
|
|
raise Exception("Unexpected branch deletion error") from e
|
|
|
|
def execute_delete_tag(self, svn_path, tag_name):
|
|
"""Executes deletion for the specified tag."""
|
|
# (Keep original placeholder - no backup involved)
|
|
if not tag_name:
|
|
raise ValueError("Tag name required for deletion.")
|
|
self.logger.info(f"Executing delete tag '{tag_name}' in: {svn_path}")
|
|
try:
|
|
success = self.git_commands.delete_tag(svn_path, tag_name)
|
|
return success
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Failed to delete tag '{tag_name}': {e}", exc_info=True)
|
|
raise e
|
|
except Exception as e:
|
|
self.logger.exception(f"Failed to delete tag '{tag_name}': {e}")
|
|
raise Exception("Unexpected tag deletion error") from e
|
|
|
|
def execute_untrack_files_from_gitignore(self, svn_path):
|
|
"""
|
|
Checks tracked files against current .gitignore rules, untracks newly
|
|
ignored files, and creates a commit summarizing which rules caused
|
|
the untracking.
|
|
|
|
Args:
|
|
svn_path (str): Path to the repository.
|
|
|
|
Returns:
|
|
bool: True if files were untracked and committed, False otherwise.
|
|
|
|
Raises:
|
|
GitCommandError/ValueError/Exception: If any Git operation fails.
|
|
"""
|
|
self.logger.info(f"Checking for tracked files to untrack based on .gitignore in '{svn_path}'...")
|
|
|
|
# --- MODIFICA: Usa un dizionario per raggruppare file per regola ---
|
|
# Key: matching pattern (rule), Value: list of file paths matched by that rule
|
|
rules_to_files_map = {}
|
|
# Lista piatta per il comando git rm
|
|
all_files_to_untrack = []
|
|
# --- FINE MODIFICA ---
|
|
|
|
try:
|
|
# 1. Get all currently tracked files
|
|
tracked_files = self.git_commands.get_tracked_files(svn_path)
|
|
|
|
# 2. Check each tracked file for matching ignore rules
|
|
self.logger.debug(f"Checking {len(tracked_files)} tracked files against ignore rules...")
|
|
for file_path in tracked_files:
|
|
# Skip checks for the .gitignore file itself
|
|
norm_file_path = os.path.normpath(file_path)
|
|
if norm_file_path == '.gitignore':
|
|
continue
|
|
|
|
# --- MODIFICA: Ottieni la regola corrispondente ---
|
|
# Check if the file *would* be ignored and get the rule
|
|
matching_rule = self.git_commands.get_matching_gitignore_rule(svn_path, file_path)
|
|
# --- FINE MODIFICA ---
|
|
|
|
if matching_rule is not None:
|
|
self.logger.info(f"Tracked file '{file_path}' now matches ignore rule: '{matching_rule}'")
|
|
# --- MODIFICA: Popola il dizionario e la lista ---
|
|
if matching_rule not in rules_to_files_map:
|
|
rules_to_files_map[matching_rule] = []
|
|
rules_to_files_map[matching_rule].append(file_path)
|
|
all_files_to_untrack.append(file_path)
|
|
# --- FINE MODIFICA ---
|
|
|
|
# 3. If files need untracking, perform git rm --cached and commit
|
|
# --- MODIFICA: Controlla la lista piatta ---
|
|
if all_files_to_untrack:
|
|
# --- FINE MODIFICA ---
|
|
self.logger.info(f"Found {len(all_files_to_untrack)} tracked files that are now ignored.")
|
|
|
|
# 3a. Untrack the files (remove from index) using the flat list
|
|
self.git_commands.remove_from_tracking(svn_path, all_files_to_untrack)
|
|
|
|
# --- MODIFICA: Crea messaggio di commit riassuntivo basato sulle regole ---
|
|
# 3b. Create an automatic commit message summarizing by rule
|
|
commit_message = "Chore: Stop tracking files based on .gitignore update.\n\nSummary:\n"
|
|
# Ordina le regole per leggibilità (opzionale)
|
|
sorted_rules = sorted(rules_to_files_map.keys())
|
|
for rule in sorted_rules:
|
|
file_count = len(rules_to_files_map[rule])
|
|
commit_message += f"- Rule \"{rule}\" untracked {file_count} file(s).\n"
|
|
# --- FINE MODIFICA ---
|
|
|
|
self.logger.info("Creating automatic commit for untracking changes.")
|
|
self.logger.debug(f"Commit message:\n{commit_message}") # Log message multi-line
|
|
|
|
# 3c. Perform the commit
|
|
commit_made = self.git_commands.git_commit(svn_path, commit_message)
|
|
|
|
if commit_made:
|
|
self.logger.info("Automatic commit successful.")
|
|
return True
|
|
else:
|
|
self.logger.warning("Untracking performed, but automatic commit reported no changes.")
|
|
return False
|
|
else:
|
|
self.logger.info("No tracked files found matching current .gitignore rules. No action needed.")
|
|
return False
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
self.logger.error(f"Error during automatic untracking process: {e}", exc_info=True)
|
|
raise
|
|
except Exception as e:
|
|
self.logger.exception(f"Unexpected error during automatic untracking: {e}")
|
|
raise Exception("Unexpected untracking error") from e
|