# --- START OF FILE action_handler.py --- # action_handler.py # Rimosso import logging standard import os import datetime # Importa il nuovo gestore della coda log import log_handler # Importa le dipendenze necessarie from git_commands import GitCommands, GitCommandError from backup_handler import BackupHandler class ActionHandler: """ Handles the execution logic for core application actions. Orchestrates calls to GitCommands and BackupHandler. Methods are executed synchronously but are intended to be called from worker threads managed by the main application controller. Uses log_handler to send log messages to a central queue. """ # Rimosso logger dal costruttore def __init__(self, git_commands: GitCommands, backup_handler: BackupHandler): """ Initializes the ActionHandler. Args: git_commands (GitCommands): Instance for executing Git commands. backup_handler (BackupHandler): Instance for handling backups. Raises: TypeError: If provided arguments are not of the expected types. """ # Validazione tipi input if not isinstance(git_commands, GitCommands): raise TypeError("ActionHandler requires a GitCommands instance.") if not isinstance(backup_handler, BackupHandler): raise TypeError("ActionHandler requires a BackupHandler instance.") self.git_commands = git_commands self.backup_handler = backup_handler # Log tramite il nuovo handler log_handler.log_debug("ActionHandler initialized.", func_name="__init__") def _perform_backup_if_enabled( self, repo_path: str, profile_name: str, autobackup_enabled: bool, backup_base_dir: str, excluded_extensions: set, excluded_dirs: set, ): """ Performs backup using BackupHandler if enabled. Uses log_handler. Args: repo_path (str): Path to the repository. profile_name (str): Name of the profile (for backup naming). autobackup_enabled (bool): Flag from settings. backup_base_dir (str): Destination directory for backups. excluded_extensions (set): File extensions to exclude (lowercase). excluded_dirs (set): Directory base names to exclude (lowercase). Returns: str | None: Path to created backup file, or None if disabled/failed/empty. Raises: IOError: If the backup process fails critically. ValueError: If input arguments are invalid. """ func_name = "_perform_backup_if_enabled" # Nome funzione per log if not autobackup_enabled: log_handler.log_debug( f"Autobackup disabled for '{profile_name}', skipping.", func_name=func_name, ) return None log_handler.log_info( f"Autobackup enabled for '{profile_name}'. Starting backup...", func_name=func_name, ) log_handler.log_debug(f" Source: {repo_path}", func_name=func_name) log_handler.log_debug(f" Destination: {backup_base_dir}", func_name=func_name) log_handler.log_debug( f" Exclude Exts: {excluded_extensions}", func_name=func_name ) log_handler.log_debug(f" Exclude Dirs: {excluded_dirs}", func_name=func_name) # Validation if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repo_path for backup: '{repo_path}'") if not backup_base_dir: raise ValueError("backup_base_dir cannot be empty.") if not isinstance(excluded_extensions, set): raise TypeError("excluded_extensions must be a set.") if not isinstance(excluded_dirs, set): raise TypeError("excluded_dirs must be a set.") # Delegate to BackupHandler try: backup_path = self.backup_handler.create_zip_backup( source_repo_path=repo_path, backup_base_dir=backup_base_dir, profile_name=profile_name, excluded_extensions=excluded_extensions, excluded_dirs_base=excluded_dirs, ) if backup_path: log_handler.log_info( f"Backup completed successfully: {backup_path}", func_name=func_name ) else: log_handler.log_warning( "Backup finished but no file generated.", func_name=func_name ) return backup_path except (IOError, ValueError, PermissionError) as backup_e: # Usa log_error invece di logger.error log_handler.log_error( f"Backup failed for '{profile_name}': {backup_e}", func_name=func_name ) raise IOError( f"Autobackup failed for '{profile_name}': {backup_e}" ) from backup_e except Exception as unexpected_e: # Usa log_exception per errori imprevisti (include traceback su console) log_handler.log_exception( f"Unexpected error during backup for '{profile_name}': {unexpected_e}", func_name=func_name, ) raise IOError( f"Unexpected autobackup failure: {unexpected_e}" ) from unexpected_e def execute_prepare_repo(self, repo_path: str): """Executes 'prepare repository' action. Uses log_handler.""" func_name = "execute_prepare_repo" log_handler.log_info( f"Executing preparation for repository: {repo_path}", func_name=func_name ) if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid directory for preparation: '{repo_path}'") git_dir = os.path.join(repo_path, ".git") if os.path.exists(git_dir): log_handler.log_warning( f"Repo at '{repo_path}' already prepared.", func_name=func_name ) raise ValueError("Repository is already prepared.") try: self.git_commands.prepare_svn_for_git(repo_path) log_handler.log_info( f"Repository prepared successfully: {repo_path}", func_name=func_name ) return True except (GitCommandError, ValueError, IOError) as e: log_handler.log_error( f"Failed to prepare repo '{repo_path}': {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Unexpected error preparing repo '{repo_path}': {e}", func_name=func_name, ) raise Exception(f"Unexpected preparation error in '{repo_path}'") from e def execute_create_bundle( self, repo_path: str, bundle_full_path: str, profile_name: str, autobackup_enabled: bool, backup_base_dir: str, autocommit_enabled: bool, commit_message: str, excluded_extensions: set, excluded_dirs: set, ): """Executes 'create bundle'. Uses log_handler.""" func_name = "execute_create_bundle" log_handler.log_info( f"Executing 'Create Bundle' for profile '{profile_name}'...", func_name=func_name, ) log_handler.log_debug(f" Repo: {repo_path}", func_name=func_name) log_handler.log_debug(f" Bundle: {bundle_full_path}", func_name=func_name) log_handler.log_debug( f" Autobackup: {autobackup_enabled}, Backup Dir: {backup_base_dir}", func_name=func_name, ) log_handler.log_debug( f" Autocommit: {autocommit_enabled}", func_name=func_name ) log_handler.log_debug( f" Backup Exclude Exts: {excluded_extensions}", func_name=func_name ) log_handler.log_debug( f" Backup Exclude Dirs: {excluded_dirs}", func_name=func_name ) # --- 1. Backup Step --- # (Chiama _perform_backup_if_enabled che ora usa log_handler) self._perform_backup_if_enabled( repo_path, profile_name, autobackup_enabled, backup_base_dir, excluded_extensions, excluded_dirs, ) # --- 2. Autocommit Step --- if autocommit_enabled: log_handler.log_info( "Autocommit enabled. Checking for changes...", func_name=func_name ) try: has_changes = self.git_commands.git_status_has_changes(repo_path) if has_changes: log_handler.log_info( "Changes detected, performing autocommit...", func_name=func_name, ) commit_msg_to_use = commit_message.strip() if not commit_msg_to_use: now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") commit_msg_to_use = ( f"Autocommit '{profile_name}' before bundle - {now_str}" ) log_handler.log_debug( f"Using autocommit message: '{commit_msg_to_use}'", func_name=func_name, ) commit_made = self.git_commands.git_commit( repo_path, commit_msg_to_use ) if commit_made: log_handler.log_info( "Autocommit successful.", func_name=func_name ) else: log_handler.log_warning( "Autocommit attempted, but no effective changes committed.", func_name=func_name, ) else: log_handler.log_info( "No changes detected, skipping autocommit.", func_name=func_name ) except (GitCommandError, ValueError) as commit_e: log_handler.log_error( f"Autocommit failed for '{profile_name}': {commit_e}", func_name=func_name, ) raise commit_e except Exception as commit_e: log_handler.log_exception( f"Unexpected error during autocommit for '{profile_name}': {commit_e}", func_name=func_name, ) raise Exception("Unexpected autocommit error") from commit_e else: log_handler.log_debug( "Autocommit disabled, skipping commit step.", func_name=func_name ) # --- 3. Create Bundle Step --- log_handler.log_info( f"Creating bundle file: {bundle_full_path}", func_name=func_name ) try: self.git_commands.create_git_bundle(repo_path, bundle_full_path) bundle_exists = os.path.exists(bundle_full_path) bundle_not_empty = bundle_exists and os.path.getsize(bundle_full_path) > 0 if bundle_exists and bundle_not_empty: log_handler.log_info( "Git bundle created successfully.", func_name=func_name ) return bundle_full_path else: log_handler.log_warning( f"Bundle file '{bundle_full_path}' not created or empty.", func_name=func_name, ) if bundle_exists and not bundle_not_empty: try: os.remove(bundle_full_path) log_handler.log_info( f"Removed empty bundle: {bundle_full_path}", func_name=func_name, ) except OSError as rm_err: log_handler.log_warning( f"Could not remove empty bundle: {rm_err}", func_name=func_name, ) return None except (GitCommandError, ValueError) as bundle_e: log_handler.log_error( f"Bundle creation failed for '{profile_name}': {bundle_e}", func_name=func_name, ) raise bundle_e except Exception as bundle_e: log_handler.log_exception( f"Unexpected error during bundle creation for '{profile_name}': {bundle_e}", func_name=func_name, ) raise Exception("Unexpected bundle creation error") from bundle_e def execute_fetch_bundle( self, target_repo_path_str: str, bundle_full_path: str, profile_name: str, autobackup_enabled: bool, backup_base_dir: str, excluded_extensions: set, excluded_dirs: set, ): """Executes 'fetch bundle' (clone or fetch/merge). Uses log_handler.""" func_name = "execute_fetch_bundle" log_handler.log_info( f"Executing 'Fetch from Bundle' for profile '{profile_name}'...", func_name=func_name, ) log_handler.log_debug( f" Target Dir: {target_repo_path_str}", func_name=func_name ) log_handler.log_debug(f" Bundle File: {bundle_full_path}", func_name=func_name) log_handler.log_debug( f" Autobackup: {autobackup_enabled}", func_name=func_name ) # ... (altri log debug) ... if not os.path.isfile(bundle_full_path): error_msg = f"Bundle file not found: '{bundle_full_path}'" log_handler.log_error(error_msg, func_name=func_name) raise FileNotFoundError(error_msg) git_dir_path = os.path.join(target_repo_path_str, ".git") is_existing_repo = os.path.isdir(target_repo_path_str) and os.path.exists( git_dir_path ) log_handler.log_debug( f"Target is existing repo: {is_existing_repo}", func_name=func_name ) if is_existing_repo: log_handler.log_info( "Target is existing repo. Fetching/Merging...", func_name=func_name ) # --- 1. Backup Step --- self._perform_backup_if_enabled( target_repo_path_str, profile_name, autobackup_enabled, backup_base_dir, excluded_extensions, excluded_dirs, ) # --- 2. Fetch/Merge Step --- log_handler.log_info( f"Fetching into '{target_repo_path_str}'...", func_name=func_name ) try: self.git_commands.fetch_from_git_bundle( target_repo_path_str, bundle_full_path ) log_handler.log_info( "Fetch/merge process completed successfully.", func_name=func_name ) return True except GitCommandError as fetch_e: log_handler.log_error( f"Fetch/merge failed: {fetch_e}", func_name=func_name ) raise fetch_e # Re-raise original error except Exception as fetch_e: log_handler.log_exception( f"Unexpected fetch/merge error: {fetch_e}", func_name=func_name ) raise Exception("Unexpected fetch/merge error") from fetch_e else: log_handler.log_info( f"Target '{target_repo_path_str}' not repo. Cloning...", func_name=func_name, ) try: success = self.git_commands.clone_from_bundle( bundle_full_path, target_repo_path_str ) if success: log_handler.log_info( f"Cloned successfully into '{target_repo_path_str}'.", func_name=func_name, ) return True else: log_handler.log_error( "Cloning reported failure unexpectedly.", func_name=func_name ) raise Exception( f"Cloning into '{target_repo_path_str}' failed unexpectedly." ) except (GitCommandError, ValueError, FileNotFoundError, IOError) as clone_e: log_handler.log_error(f"Cloning failed: {clone_e}", func_name=func_name) raise clone_e # Re-raise original error except Exception as clone_e: log_handler.log_exception( f"Unexpected clone error: {clone_e}", func_name=func_name ) raise Exception("Unexpected clone error") from clone_e # --- Altri Metodi (commit, tag, branch, untrack) --- # (Modificare tutte le chiamate a self.logger.* in log_handler.log_* ) def execute_manual_commit(self, repo_path: str, commit_message: str): func_name = "execute_manual_commit" log_handler.log_info( f"Executing manual commit for: {repo_path}", func_name=func_name ) if not commit_message or commit_message.isspace(): raise ValueError("Commit message cannot be empty.") try: commit_made = self.git_commands.git_commit(repo_path, commit_message) if commit_made: log_handler.log_info("Manual commit successful.", func_name=func_name) else: log_handler.log_info( "Manual commit: No changes available.", func_name=func_name ) return commit_made except (GitCommandError, ValueError) as e: log_handler.log_error(f"Manual commit failed: {e}", func_name=func_name) raise e except Exception as e: log_handler.log_exception( f"Unexpected manual commit error: {e}", func_name=func_name ) raise Exception("Unexpected commit error") from e def execute_create_tag( self, repo_path: str, ignored: None, tag_name: str, tag_message: str ): func_name = "execute_create_tag" if not tag_name or tag_name.isspace(): raise ValueError("Tag name cannot be empty.") if not tag_message or tag_message.isspace(): raise ValueError("Tag message cannot be empty.") log_handler.log_info( f"Executing create tag '{tag_name}' for: {repo_path}", func_name=func_name ) log_handler.log_info( "Attempting pre-tag commit using tag message...", func_name=func_name ) try: commit_made = self.git_commands.git_commit(repo_path, tag_message) if commit_made: log_handler.log_info( f"Pre-tag commit successful: '{tag_message}'", func_name=func_name ) else: log_handler.log_info( "No changes for pre-tag commit.", func_name=func_name ) log_handler.log_info( f"Creating annotated tag '{tag_name}'...", func_name=func_name ) self.git_commands.create_tag(repo_path, tag_name, tag_message) log_handler.log_info( f"Tag '{tag_name}' created successfully.", func_name=func_name ) return True except (GitCommandError, ValueError) as e: log_handler.log_error( f"Failed commit or create tag '{tag_name}': {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Unexpected error creating tag '{tag_name}': {e}", func_name=func_name ) raise Exception(f"Unexpected error creating tag '{tag_name}'") from e def execute_checkout_tag(self, repo_path: str, tag_name: str): func_name = "execute_checkout_tag" if not tag_name: raise ValueError("Tag name required.") log_handler.log_info( f"Executing checkout tag '{tag_name}' in: {repo_path}", func_name=func_name ) try: # Check changes if self.git_commands.git_status_has_changes(repo_path): raise ValueError("Uncommitted changes exist. Commit or stash first.") log_handler.log_debug("No uncommitted changes found.", func_name=func_name) except (GitCommandError, ValueError) as e: log_handler.log_error( f"Status check error before checkout: {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Unexpected status check error: {e}", func_name=func_name ) raise Exception("Unexpected status check error") from e try: # Checkout success = self.git_commands.checkout_tag(repo_path, tag_name) if success: log_handler.log_info( f"Tag '{tag_name}' checked out (Detached HEAD).", func_name=func_name, ) return True else: raise GitCommandError("Checkout failed for unknown reason.") except (GitCommandError, ValueError) as e: log_handler.log_error( f"Failed checkout tag '{tag_name}': {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Unexpected checkout error: {e}", func_name=func_name ) raise Exception("Unexpected tag checkout error") from e def execute_create_branch(self, repo_path: str, branch_name: str): func_name = "execute_create_branch" log_handler.log_info( f"Executing create branch '{branch_name}' in: {repo_path}", func_name=func_name, ) if not branch_name: raise ValueError("Branch name cannot be empty.") try: success = self.git_commands.create_branch(repo_path, branch_name) if success: log_handler.log_info( f"Branch '{branch_name}' created successfully.", func_name=func_name ) return True else: raise GitCommandError(f"Branch creation '{branch_name}' failed.") except (GitCommandError, ValueError) as e: log_handler.log_error(f"Failed create branch: {e}", func_name=func_name) raise e except Exception as e: log_handler.log_exception( f"Unexpected create branch error: {e}", func_name=func_name ) raise Exception("Unexpected branch creation error") from e def execute_switch_branch(self, repo_path: str, branch_name: str): func_name = "execute_switch_branch" if not branch_name: raise ValueError("Branch name required.") log_handler.log_info( f"Executing switch to branch '{branch_name}'.", func_name=func_name ) try: # Check changes if self.git_commands.git_status_has_changes(repo_path): raise ValueError("Uncommitted changes exist. Commit or stash first.") log_handler.log_debug("No uncommitted changes found.", func_name=func_name) except (GitCommandError, ValueError) as e: log_handler.log_error( f"Status check error before switch: {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Unexpected status check error: {e}", func_name=func_name ) raise Exception("Unexpected status check error") from e try: # Switch success = self.git_commands.checkout_branch(repo_path, branch_name) if success: log_handler.log_info( f"Switched to branch '{branch_name}'.", func_name=func_name ) return True else: raise GitCommandError(f"Switch to branch '{branch_name}' failed.") except (GitCommandError, ValueError) as e: log_handler.log_error( f"Failed switch branch '{branch_name}': {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Unexpected switch branch error: {e}", func_name=func_name ) raise Exception("Unexpected branch switch error") from e def execute_delete_branch(self, repo_path: str, branch_name: str, force=False): func_name = "execute_delete_branch" if not branch_name: raise ValueError("Branch name required.") log_handler.log_info( f"Executing delete branch '{branch_name}' (force={force}).", func_name=func_name, ) try: # success = self.git_commands.delete_branch(repo_path, branch_name, force) # return success log_handler.log_warning( "execute_delete_branch not fully implemented.", func_name=func_name ) raise NotImplementedError("Branch deletion logic needs implementation.") except (GitCommandError, ValueError) as e: log_handler.log_error( f"Failed delete branch '{branch_name}': {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Failed delete branch '{branch_name}': {e}", func_name=func_name ) raise Exception("Unexpected branch deletion error") from e def execute_delete_tag(self, repo_path: str, tag_name: str): func_name = "execute_delete_tag" if not tag_name: raise ValueError("Tag name required.") log_handler.log_info( f"Executing delete tag '{tag_name}' in: {repo_path}", func_name=func_name ) try: # success = self.git_commands.delete_tag(repo_path, tag_name) # return success log_handler.log_warning( "execute_delete_tag not fully implemented.", func_name=func_name ) raise NotImplementedError("Tag deletion logic needs implementation.") except (GitCommandError, ValueError) as e: log_handler.log_error( f"Failed delete tag '{tag_name}': {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Failed delete tag '{tag_name}': {e}", func_name=func_name ) raise Exception("Unexpected tag deletion error") from e def execute_untrack_files_from_gitignore(self, repo_path: str): func_name = "execute_untrack_files_from_gitignore" log_handler.log_info( f"Checking for tracked files to untrack in '{repo_path}'...", func_name=func_name, ) rules_to_files_map = {} all_files_to_untrack = [] try: tracked_files = self.git_commands.get_tracked_files(repo_path) log_handler.log_debug( f"Checking {len(tracked_files)} tracked files...", func_name=func_name ) for file_path in tracked_files: norm_file_path = os.path.normpath(file_path) if norm_file_path == ".gitignore": continue matching_rule = self.git_commands.get_matching_gitignore_rule( repo_path, file_path ) if matching_rule is not None: log_handler.log_info( f"Tracked '{file_path}' matches rule: '{matching_rule}'", func_name=func_name, ) if matching_rule not in rules_to_files_map: rules_to_files_map[matching_rule] = [] rules_to_files_map[matching_rule].append(file_path) all_files_to_untrack.append(file_path) if all_files_to_untrack: num_untrack = len(all_files_to_untrack) log_handler.log_info( f"Found {num_untrack} tracked files to untrack.", func_name=func_name, ) self.git_commands.remove_from_tracking(repo_path, all_files_to_untrack) log_handler.log_info( f"Removed {num_untrack} items from tracking.", func_name=func_name ) commit_message = "Chore: Stop tracking files based on .gitignore update.\n\nSummary:\n" sorted_rules = sorted(rules_to_files_map.keys()) for rule in sorted_rules: count = len(rules_to_files_map[rule]) plural = "s" if count > 1 else "" commit_message += ( f'- Rule "{rule}" untracked {count} file{plural}.\n' ) log_handler.log_info( "Creating automatic commit for untracking changes.", func_name=func_name, ) log_handler.log_debug( f"Commit message:\n{commit_message}", func_name=func_name ) commit_made = self.git_commands.git_commit(repo_path, commit_message) if commit_made: log_handler.log_info( "Automatic untrack commit successful.", func_name=func_name ) return True else: log_handler.log_warning( "Untracking done, but commit reported no changes.", func_name=func_name, ) return False else: log_handler.log_info( "No tracked files match .gitignore rules. No action.", func_name=func_name, ) return False except (GitCommandError, ValueError) as e: log_handler.log_error(f"Error during untracking: {e}", func_name=func_name) raise except Exception as e: log_handler.log_exception( f"Unexpected untracking error: {e}", func_name=func_name ) raise Exception("Unexpected untracking error") from e # --- END OF FILE action_handler.py ---