# --- FILE: gitsync_tool/core/backup_handler.py ---

import os
import datetime
import zipfile
import sys  # For fallback printing in case of a severe error
from typing import Set, Optional, List, Any

# Import the shared logging facade using its absolute package path
from gitutility.logging_setup import log_handler


class BackupHandler:
    """
    Creates ZIP backups of a directory tree.

    Files can be excluded by extension and whole sub-trees can be excluded by
    directory name. All diagnostics go through the centralized ``log_handler``.
    """

    def __init__(
        self, logger_ignored: Optional[Any] = None
    ):  # The argument is accepted for compatibility but ignored
        """Initializes the BackupHandler.

        Args:
            logger_ignored: Unused. No per-instance logger is stored; logging
                goes through the module-level ``log_handler``.
        """
        log_handler.log_debug("BackupHandler initialized.", func_name="__init__")

    def _log_walk_error(self, os_error: OSError) -> None:
        """
        Logs an error reported by ``os.walk`` during directory traversal.

        This method is passed as the ``onerror`` callback of ``os.walk``. It
        only logs; because it returns normally (does not raise), the walk
        carries on with the remaining entries.

        Args:
            os_error: The error instance handed over by ``os.walk``.
        """
        func_name = "_log_walk_error"  # Context name used in log records

        # Most specific exception types first: PermissionError and
        # FileNotFoundError are both subclasses of OSError.
        if isinstance(os_error, PermissionError):
            log_handler.log_warning(
                f"OS error during directory walk: Cannot access '{os_error.filename}'. "
                f"Reason: Permission denied. Skipping item/directory.",
                func_name=func_name,
            )
        elif isinstance(os_error, FileNotFoundError):
            log_handler.log_warning(
                f"OS error during directory walk: File or directory not found: '{os_error.filename}'. "
                f"Reason: {os_error.strerror}. Skipping item/directory.",
                func_name=func_name,
            )
        elif isinstance(os_error, OSError):
            # Any other OSError: report the OS-provided reason
            log_handler.log_warning(
                f"OS error during directory walk: Cannot access '{os_error.filename}'. "
                f"Reason: {os_error.strerror}. Skipping item/directory.",
                func_name=func_name,
            )
        else:
            # Defensive branch for anything that is not an OSError
            log_handler.log_exception(
                f"Unexpected error during directory walk: {os_error}",
                func_name=func_name,
            )

    @staticmethod
    def _build_backup_path(backup_base_dir: str, profile_name: str) -> str:
        """
        Builds the full path ``<base>/<YYYYMMDD_HHMMSS>_backup_<profile>.zip``.

        Only alphanumeric characters, underscores and hyphens of the profile
        name are kept; if nothing is left, ``"profile"`` is used instead.

        Raises:
            ValueError: If the filename cannot be constructed (for example the
                profile name is not iterable).
        """
        try:
            timestamp: str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            safe_profile: str = "".join(
                ch for ch in profile_name if ch.isalnum() or ch in ("_", "-")
            )
            safe_profile = safe_profile or "profile"
            return os.path.join(
                backup_base_dir, f"{timestamp}_backup_{safe_profile}.zip"
            )
        except Exception as e:
            # Wrap filename construction errors for clarity
            raise ValueError(f"Could not construct a valid backup filename: {e}") from e

    def _discard_partial_archive(self, archive_path: str) -> None:
        """
        Removes an incomplete archive left behind by a failed backup run.

        Never raises: a missing file is ignored and any other removal problem
        is only logged, so the original failure stays the one that propagates.
        """
        func_name = "_discard_partial_archive"
        try:
            os.remove(archive_path)
        except FileNotFoundError:
            return  # Nothing was written, nothing to clean up
        except OSError as remove_error:
            log_handler.log_error(
                f"Failed to remove incomplete backup ZIP file '{archive_path}': {remove_error}",
                func_name=func_name,
            )
        else:
            log_handler.log_warning(
                f"Removed incomplete backup ZIP file: {archive_path}",
                func_name=func_name,
            )

    def create_zip_backup(
        self,
        source_repo_path: str,
        backup_base_dir: str,
        profile_name: str,
        excluded_extensions: Set[str],
        excluded_dirs_base: Set[str],
    ) -> Optional[str]:
        """
        Creates a timestamped ZIP backup of the source directory, respecting exclusions.

        Only files are stored; their archive names are relative to
        ``source_repo_path``. Files that cannot be read are skipped with a log
        entry and do not abort the backup. The archive being written is never
        added to itself, even when ``backup_base_dir`` lies inside
        ``source_repo_path``. If a critical error interrupts the run, the
        incomplete archive is deleted before the error is propagated.

        Args:
            source_repo_path (str): Path to the directory to back up.
            backup_base_dir (str): Directory that receives the ZIP file; it is
                created if missing.
            profile_name (str): Name of the current profile (used in the filename).
            excluded_extensions (Set[str]): Lowercase file extensions to
                exclude, including the dot (e.g., {'.log'}). File extensions
                are lowercased before the lookup.
            excluded_dirs_base (Set[str]): Lowercase directory base names to
                exclude (e.g., {'__pycache__'}). Directory names are lowercased
                before the lookup.

        Returns:
            Optional[str]: Full path of the created ZIP file, or None when no
            file was added (the empty archive is removed in that case).

        Raises:
            ValueError: If the source path is not an existing directory, the
                backup directory is empty, or the filename cannot be built.
            TypeError: If either exclusion argument is not a set.
            IOError: If the backup directory cannot be created or is not
                writable/traversable (permission problems are reported as
                IOError too), or if creating/writing the ZIP fails.
            Exception: Any other unexpected error during ZIP creation is
                logged and re-raised unchanged.
        """
        func_name: str = "create_zip_backup"  # Function name for logs
        log_handler.log_info(
            f"Starting ZIP backup creation for profile '{profile_name}'...",
            func_name=func_name,
        )
        log_handler.log_debug(
            f"  Source Path: '{source_repo_path}'", func_name=func_name
        )
        log_handler.log_debug(
            f"  Backup Base Directory: '{backup_base_dir}'", func_name=func_name
        )
        log_handler.log_debug(
            f"  Excluded Extensions: {excluded_extensions}", func_name=func_name
        )
        log_handler.log_debug(
            f"  Excluded Directory Names: {excluded_dirs_base}", func_name=func_name
        )

        # --- 1. Validate Inputs ---
        if not source_repo_path or not os.path.isdir(source_repo_path):
            raise ValueError(
                f"Invalid or non-existent source directory path: {source_repo_path}"
            )
        if not backup_base_dir:
            raise ValueError("Backup base directory cannot be empty.")
        if not isinstance(excluded_extensions, set):
            raise TypeError("excluded_extensions must be a set.")
        if not isinstance(excluded_dirs_base, set):
            raise TypeError("excluded_dirs_base must be a set.")

        # --- 2. Prepare Destination Directory ---
        try:
            if not os.path.isdir(backup_base_dir):
                log_handler.log_info(
                    f"Backup base directory does not exist. Creating: {backup_base_dir}",
                    func_name=func_name,
                )
                # exist_ok=True tolerates the directory appearing between the
                # check above and this call
                os.makedirs(backup_base_dir, exist_ok=True)
            # Writing needs write permission, entering needs execute permission
            if not os.access(backup_base_dir, os.W_OK | os.X_OK):
                raise PermissionError(
                    f"Write/traverse permission denied for backup directory: {backup_base_dir}"
                )
        except OSError as e:
            # PermissionError is an OSError, so it is wrapped here as well
            log_handler.log_error(
                f"Cannot create or access backup directory '{backup_base_dir}': {e}",
                func_name=func_name,
            )
            raise IOError(f"Could not prepare backup directory: {e}") from e

        # --- 3. Construct Backup Filename ---
        backup_full_path: str = self._build_backup_path(backup_base_dir, profile_name)
        log_handler.log_info(
            f"Target backup ZIP file path: {backup_full_path}", func_name=func_name
        )

        # Keys used to recognise the archive itself during the walk, in case
        # the backup directory is located inside the source tree.
        backup_dir_key: str = os.path.normcase(os.path.realpath(backup_base_dir))
        backup_name_key: str = os.path.normcase(os.path.basename(backup_full_path))

        # --- 4. Create ZIP Archive ---
        files_added_count: int = 0
        files_excluded_count: int = 0
        dirs_excluded_count: int = 0
        # True once this call has created the file, so cleanup on failure can
        # never delete something this call did not write.
        archive_opened: bool = False

        try:
            # 'with' guarantees the archive is closed, even on errors
            with zipfile.ZipFile(
                file=backup_full_path,
                mode="w",  # Write mode creates a new archive
                compression=zipfile.ZIP_DEFLATED,  # Standard DEFLATE compression
                allowZip64=True,  # Enable support for large ZIP files
            ) as archive:
                archive_opened = True
                log_handler.log_debug("Starting directory walk...", func_name=func_name)

                # topdown=True is required so that pruning 'dirs' below stops
                # os.walk from descending into excluded directories.
                for root, dirs, files in os.walk(
                    source_repo_path,
                    topdown=True,
                    onerror=self._log_walk_error,
                ):
                    # --- Directory Exclusion Logic ---
                    kept_dirs: List[str] = [
                        d for d in dirs if d.lower() not in excluded_dirs_base
                    ]
                    pruned_dirs: Set[str] = set(dirs) - set(kept_dirs)
                    dirs[:] = kept_dirs  # In-place update is what os.walk observes

                    if pruned_dirs:
                        dirs_excluded_count += len(pruned_dirs)
                        for pruned_name in pruned_dirs:
                            pruned_path: str = os.path.join(root, pruned_name)
                            log_handler.log_debug(
                                f"Excluding directory (and its contents): {pruned_path}",
                                func_name=func_name,
                            )

                    # Resolved once per directory rather than once per file
                    root_holds_archive: bool = (
                        os.path.normcase(os.path.realpath(root)) == backup_dir_key
                    )

                    # --- File Exclusion and Addition ---
                    for filename in files:
                        file_full_path: str = os.path.join(root, filename)

                        # Never add the archive that is currently being written
                        if (
                            root_holds_archive
                            and os.path.normcase(filename) == backup_name_key
                        ):
                            log_handler.log_debug(
                                f"Excluding the backup archive itself: {file_full_path}",
                                func_name=func_name,
                            )
                            files_excluded_count += 1
                            continue

                        file_ext_lower: str = os.path.splitext(filename)[1].lower()
                        if file_ext_lower in excluded_extensions:
                            log_handler.log_debug(
                                f"Excluding file by extension '{file_ext_lower}': {file_full_path}",
                                func_name=func_name,
                            )
                            files_excluded_count += 1
                            continue

                        # Store the file under its path relative to the source
                        # root so the directory structure is preserved.
                        try:
                            archive_name: str = os.path.relpath(
                                file_full_path, source_repo_path
                            )
                        except ValueError:
                            # relpath can fail (e.g., different drives on
                            # Windows); fall back to "<parent dir>/<file>"
                            archive_name = os.path.join(
                                os.path.basename(root), filename
                            )
                            log_handler.log_warning(
                                f"Could not get relative path for {file_full_path}. "
                                f"Using fallback arcname: {archive_name}",
                                func_name=func_name,
                            )

                        # A problem with a single file must not abort the backup
                        try:
                            archive.write(
                                filename=file_full_path, arcname=archive_name
                            )
                        except FileNotFoundError:
                            # The file vanished between os.walk and the write
                            log_handler.log_warning(
                                f"File not found during write attempt (skipped): {file_full_path}",
                                func_name=func_name,
                            )
                            files_excluded_count += 1
                        except PermissionError:
                            log_handler.log_warning(
                                f"Permission denied reading file (skipped): {file_full_path}",
                                func_name=func_name,
                            )
                            files_excluded_count += 1
                        except Exception as write_error:
                            log_handler.log_error(
                                f"Error writing file '{file_full_path}' to ZIP (skipped): {write_error}",
                                func_name=func_name,
                            )
                            files_excluded_count += 1
                        else:
                            files_added_count += 1
                            # Periodic progress message for very large backups
                            if files_added_count % 1000 == 0:
                                log_handler.log_debug(
                                    f"Added {files_added_count} files to backup...",
                                    func_name=func_name,
                                )

            # The 'with' block has ended, so the archive is closed here
            log_handler.log_info(
                "Finished walking directory tree and attempting to add files.",
                func_name=func_name,
            )
            log_handler.log_debug(
                f"ZIP file '{backup_full_path}' closed.", func_name=func_name
            )
            log_handler.log_info(
                f"Backup Summary - Files Added: {files_added_count}, "
                f"Files Skipped/Excluded: {files_excluded_count}, "
                f"Directories Excluded: {dirs_excluded_count}",
                func_name=func_name,
            )

        except (OSError, zipfile.BadZipFile, zipfile.LargeZipFile) as zip_error:
            # Critical OS errors (permissions, disk space) and ZIP format errors
            log_handler.log_error(
                f"Error creating/writing ZIP file '{backup_full_path}': {zip_error}",
                func_name=func_name,
            )
            if archive_opened:
                self._discard_partial_archive(backup_full_path)
            # Re-raise as IOError for the caller to handle
            raise IOError(
                f"Failed to create or write backup ZIP: {zip_error}"
            ) from zip_error
        except Exception as unexpected_zip_error:
            log_handler.log_exception(
                f"Unexpected error during ZIP backup creation: {unexpected_zip_error}",
                func_name=func_name,
            )
            if archive_opened:
                self._discard_partial_archive(backup_full_path)
            raise  # Re-raise the original exception

        # --- Final Check and Cleanup ---
        final_path_to_return: Optional[str] = None
        try:
            if os.path.exists(backup_full_path):
                if files_added_count > 0:
                    # Success: the file exists and has content
                    final_path_to_return = backup_full_path
                    log_handler.log_debug(
                        "Final check: Backup file exists and is not empty.",
                        func_name=func_name,
                    )
                else:
                    # Nothing was added (empty source or everything
                    # excluded/skipped): do not keep an empty archive
                    log_handler.log_warning(
                        f"Backup ZIP file is empty (0 files added): {backup_full_path}. "
                        f"Removing the empty file.",
                        func_name=func_name,
                    )
                    try:
                        os.remove(backup_full_path)
                        log_handler.log_info(
                            "Successfully removed empty backup ZIP file.",
                            func_name=func_name,
                        )
                    except OSError as remove_error:
                        # Removal failed; still report "no backup" to the caller
                        log_handler.log_error(
                            f"Failed to remove empty backup ZIP file '{backup_full_path}': {remove_error}",
                            func_name=func_name,
                        )
            elif files_added_count > 0:
                # Files were reported as added, yet the archive is gone
                log_handler.log_error(
                    f"Backup process reported adding files, but the final ZIP file is missing: {backup_full_path}",
                    func_name=func_name,
                )
            else:
                log_handler.log_info(
                    "Backup process completed without creating a file (source possibly empty or all items excluded/skipped).",
                    func_name=func_name,
                )
        except Exception as final_check_error:
            log_handler.log_error(
                f"Error during final check/cleanup of backup file '{backup_full_path}': {final_check_error}",
                func_name=func_name,
            )
            final_path_to_return = None  # Assume failure if the final check fails

        # Full path of the created ZIP file, or None if no backup was produced
        return final_path_to_return


# --- END OF FILE gitsync_tool/core/backup_handler.py ---