# --- FILE: gitsync_tool/logic/repository_handler.py --- import os from typing import TYPE_CHECKING, Optional, List, Dict, Any, Tuple from gitutility.async_tasks import async_workers from gitutility.gui.dialogs import CreateTagDialog, CreateBranchDialog from gitutility.logging_setup import log_handler from gitutility.commands.git_commands import GitCommandError # Forward reference for type hinting to avoid circular import if TYPE_CHECKING: from ..app import GitSvnSyncApp class RepositoryHandler: """ Handles logic for local repository operations such as preparing the repo, managing bundles, committing changes, and managing local tags and branches. """ def __init__(self, app: "GitSvnSyncApp"): """ Initializes the RepositoryHandler. Args: app: The main application instance. """ self.app = app self.main_frame = app.main_frame self.action_handler = app.action_handler self.git_commands = app.git_commands self.backup_handler = app.backup_handler def prepare_svn_for_git(self): """Starts async operation to prepare the repository (init, gitignore).""" func_name = "prepare_svn_for_git" svn_path = self.app._get_and_validate_svn_path("Prepare Repository") if not svn_path: return if self.app._is_repo_ready(svn_path): log_handler.log_info( "Prepare skipped: Repository already prepared.", func_name=func_name ) self.main_frame.show_info("Info", "Repository is already prepared.") self.app.update_svn_status_indicator(svn_path) return args = (self.action_handler, svn_path) self.app._start_async_operation( worker_func=async_workers.run_prepare_async, args_tuple=args, context_dict={ "context": "prepare_repo", "status_msg": "Preparing repository", }, ) def create_git_bundle(self): """Starts async operation to create a Git bundle file.""" func_name = "create_git_bundle" profile = self.main_frame.profile_var.get() svn_path = self.app._get_and_validate_svn_path("Create Bundle") usb_path = self.app._get_and_validate_usb_path("Create Bundle") bundle_name = self.main_frame.repo_tab.bundle_name_entry.get().strip() if not all([profile, svn_path, usb_path, bundle_name]): log_handler.log_warning( "Create Bundle cancelled: Missing inputs.", func_name=func_name ) return if not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not prepared.") return if not bundle_name.lower().endswith(".bundle"): bundle_name += ".bundle" bundle_full_path = os.path.join(usb_path, bundle_name) if not self.app.profile_handler.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue creating bundle anyway?", ): return excluded_extensions, excluded_dirs = self.app._parse_exclusions() backup_enabled = self.main_frame.backup_tab.autobackup_var.get() backup_dir = self.main_frame.backup_tab.backup_dir_var.get() commit_enabled = self.main_frame.commit_tab.autocommit_var.get() commit_msg = self.main_frame.commit_tab.get_commit_message() args = ( self.action_handler, svn_path, bundle_full_path, profile, backup_enabled, backup_dir, commit_enabled, commit_msg, excluded_extensions, excluded_dirs, ) self.app._start_async_operation( worker_func=async_workers.run_create_bundle_async, args_tuple=args, context_dict={ "context": "create_bundle", "status_msg": f"Creating bundle '{bundle_name}'", "committed_flag_possible": True, }, ) def fetch_from_git_bundle(self): """Starts async operation to fetch/clone from a Git bundle file.""" func_name = "fetch_from_git_bundle" profile = self.main_frame.profile_var.get() svn_path_str = self.main_frame.repo_tab.svn_path_entry.get().strip() usb_path = self.app._get_and_validate_usb_path("Fetch Bundle") bundle_name = self.main_frame.repo_tab.bundle_updated_name_entry.get().strip() if not all([profile, svn_path_str, usb_path, bundle_name]): log_handler.log_warning( "Fetch Bundle cancelled: Missing inputs.", func_name=func_name ) return bundle_full_path = os.path.join(usb_path, bundle_name) if not os.path.isfile(bundle_full_path): self.main_frame.show_error( "File Not Found", f"Bundle file not found:\n{bundle_full_path}" ) return if not self.app.profile_handler.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue fetching anyway?" ): return excluded_extensions, excluded_dirs = self.app._parse_exclusions() backup_enabled = self.main_frame.backup_tab.main_frame.autobackup_var.get() backup_dir = self.main_frame.backup_tab.main_frame.backup_dir_var.get() args = ( self.action_handler, svn_path_str, bundle_full_path, profile, backup_enabled, backup_dir, excluded_extensions, excluded_dirs, ) self.app._start_async_operation( worker_func=async_workers.run_fetch_bundle_async, args_tuple=args, context_dict={ "context": "fetch_bundle", "status_msg": f"Fetching from '{bundle_name}'", "repo_path": svn_path_str, }, ) def commit_changes(self): """Starts async operation to commit staged changes.""" func_name = "commit_changes" svn_path = self.app._get_and_validate_svn_path("Commit") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not ready.") return commit_msg = self.main_frame.commit_tab.get_commit_message() if not commit_msg: self.main_frame.show_error("Input Error", "Commit message cannot be empty.") return args = (self.action_handler, svn_path, commit_msg) self.app._start_async_operation( worker_func=async_workers.run_commit_async, args_tuple=args, context_dict={ "context": "commit", "status_msg": "Committing changes", "committed_flag_possible": True, }, ) def create_tag(self): """Handles tag creation: shows dialog, suggests name, starts async operation.""" func_name = "create_tag" svn_path = self.app._get_and_validate_svn_path("Create Tag") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not ready.") return self.main_frame.update_status_bar("Processing: Generating tag suggestion...") suggested_name = self.app._generate_next_tag_suggestion(svn_path) dialog = CreateTagDialog( self.main_frame.master, suggested_tag_name=suggested_name ) tag_info = dialog.result if tag_info: tag_name, tag_message = tag_info args = (self.action_handler, svn_path, tag_name, tag_message) self.app._start_async_operation( worker_func=async_workers.run_create_tag_async, args_tuple=args, context_dict={ "context": "create_tag", "status_msg": f"Creating tag '{tag_name}'", "committed_flag_possible": True, }, ) else: self.main_frame.update_status_bar("Cancelled.") def checkout_tag(self): """Handles tag checkout after user confirmation.""" func_name = "checkout_tag" svn_path = self.app._get_and_validate_svn_path("Checkout Tag") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not ready.") return tag_name = self.main_frame.tags_tab.get_selected_tag() if not tag_name: self.main_frame.show_error( "Selection Error", "No tag selected from the list." ) return confirmation_message = ( f"Checkout tag '{tag_name}'?\n\n" f"Warning: This will put your repository in a 'detached HEAD' state." ) if not self.main_frame.ask_yes_no("Confirm Checkout Tag", confirmation_message): self.main_frame.update_status_bar("Cancelled.") return args = (self.action_handler, svn_path, tag_name) self.app._start_async_operation( worker_func=async_workers.run_checkout_tag_async, args_tuple=args, context_dict={ "context": "checkout_tag", "status_msg": f"Checking out tag '{tag_name}'", }, ) def revert_to_tag(self): """Handles the destructive 'Revert to Tag' action.""" func_name = "revert_to_tag" svn_path = self.app._get_and_validate_svn_path("Revert to Tag") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not ready.") return tag_name = self.main_frame.tags_tab.get_selected_tag() if not tag_name: self.main_frame.show_error( "Selection Error", "No tag selected from the list." ) return warning_message = ( f"WARNING: Destructive Operation\n\n" f"This will reset the repository to the state of tag '{tag_name}', " f"permanently deleting all uncommitted changes and later commits on this branch.\n\n" "Are you absolutely sure?" ) if not self.main_frame.ask_yes_no( "Confirm Destructive Revert", warning_message ): self.main_frame.update_status_bar("Revert cancelled.") return args = (self.action_handler, svn_path, tag_name) self.app._start_async_operation( worker_func=async_workers.run_revert_to_tag_async, args_tuple=args, context_dict={ "context": "revert_to_tag", "status_msg": f"Reverting to tag '{tag_name}'", "tag_name": tag_name, }, ) def create_branch(self): """Handles branch creation after getting name from a dialog.""" func_name = "create_branch" svn_path = self.app._get_and_validate_svn_path("Create Branch") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not ready.") return dialog = CreateBranchDialog(self.main_frame.master) branch_name = dialog.result if branch_name: args = (self.action_handler, svn_path, branch_name) self.app._start_async_operation( worker_func=async_workers.run_create_branch_async, args_tuple=args, context_dict={ "context": "create_branch", "status_msg": f"Creating branch '{branch_name}'", "new_branch_name": branch_name, }, ) else: self.main_frame.update_status_bar("Cancelled.") def checkout_branch(self, branch_to_checkout: Optional[str] = None): """Handles checkout of an existing local branch.""" func_name = "checkout_branch" svn_path = self.app._get_and_validate_svn_path("Checkout Branch") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not ready.") return target_branch = branch_to_checkout or self.main_frame.get_selected_branch() if not target_branch: self.main_frame.show_error( "Selection Error", "No branch selected from the list." ) return if not branch_to_checkout: # Only ask for confirmation if triggered by button if not self.main_frame.ask_yes_no( "Confirm Checkout", f"Switch to branch '{target_branch}'?" ): self.main_frame.update_status_bar("Cancelled.") return args = (self.action_handler, svn_path, target_branch) self.app._start_async_operation( worker_func=async_workers.run_checkout_branch_async, args_tuple=args, context_dict={ "context": "checkout_branch", "status_msg": f"Checking out branch '{target_branch}'", }, ) def get_gitignore_path(self, repo_path: str) -> str: """Helper to get the full path of the .gitignore file.""" return os.path.join(repo_path, ".gitignore") def handle_gitignore_save(self): """Callback executed after .gitignore is saved. Starts an async untrack check.""" func_name = "handle_gitignore_save" svn_path = self.app._get_and_validate_svn_path("Untrack Check") if not svn_path or not self.app._is_repo_ready(svn_path): log_handler.log_error( "Cannot start untrack check: Invalid/Not ready path.", func_name=func_name, ) return args = (self.action_handler, svn_path) self.app._start_async_operation( worker_func=async_workers.run_untrack_async, args_tuple=args, context_dict={ "context": "_handle_gitignore_save", "status_msg": "Checking files to untrack", "committed_flag_possible": True, }, ) def handle_manual_backup(self): """Starts a manual backup operation.""" func_name = "handle_manual_backup" profile = self.main_frame.profile_var.get() svn_path = self.app._get_and_validate_svn_path(f"Manual Backup ({profile})") backup_dir_str = self.main_frame.backup_tab.backup_dir_var.get().strip() if not profile or not svn_path: return if not backup_dir_str: self.main_frame.show_error( "Input Error", "Backup directory cannot be empty for manual backup." ) return if not self.app.profile_handler.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue backup anyway?" ): return excluded_extensions, excluded_dirs = self.app._parse_exclusions() args = ( self.backup_handler, svn_path, os.path.abspath(backup_dir_str), profile, excluded_extensions, excluded_dirs, ) self.app._start_async_operation( worker_func=async_workers.run_manual_backup_async, args_tuple=args, context_dict={ "context": "manual_backup", "status_msg": "Creating manual backup", }, ) def handle_add_selected_file(self, file_status_line: str): """Starts an async operation to add a selected untracked file.""" func_name = "handle_add_selected_file" svn_path = self.app._get_and_validate_svn_path("Add File") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not ready.") return relative_path = self.app._extract_path_from_status_line(file_status_line) if not relative_path: self.main_frame.show_error( "Parsing Error", f"Cannot parse file path from:\n{file_status_line}" ) return if not file_status_line.strip().startswith("??"): self.main_frame.show_error( "Invalid Action", f"Cannot 'Add' file with status '{file_status_line[:2]}'.", ) return args = (self.git_commands, svn_path, relative_path) self.app._start_async_operation( worker_func=async_workers.run_add_file_async, args_tuple=args, context_dict={ "context": "add_file", "status_msg": f"Adding '{os.path.basename(relative_path)}'", }, ) def handle_delete_local_branch(self, branch_name: str, force: bool): """Handles the request to delete a local branch.""" func_name = "handle_delete_local_branch" action_desc = "Force delete" if force else "Delete" svn_path = self.app._get_and_validate_svn_path(f"{action_desc} Branch") if not svn_path or not self.app._is_repo_ready(svn_path): return confirm_message = ( f"Are you sure you want to delete the local branch '{branch_name}'?" ) if force: confirm_message += ( "\n\nWARNING: Force delete will discard any unmerged changes!" ) if not self.main_frame.ask_yes_no(f"Confirm {action_desc}", confirm_message): return args = (self.action_handler, svn_path, branch_name, force) self.app._start_async_operation( worker_func=async_workers.run_delete_local_branch_async, args_tuple=args, context_dict={ "context": "delete_local_branch", "status_msg": f"{action_desc} branch '{branch_name}'", }, ) def handle_merge_local_branch(self, branch_to_merge: str): """Handles the request to merge a local branch.""" func_name = "handle_merge_local_branch" svn_path = self.app._get_and_validate_svn_path("Merge Branch") if not svn_path or not self.app._is_repo_ready(svn_path): return try: current_branch = self.git_commands.get_current_branch_name(svn_path) if not current_branch: raise ValueError("Cannot merge: Currently in detached HEAD state.") if current_branch == branch_to_merge: raise ValueError("Cannot merge a branch into itself.") except (GitCommandError, ValueError) as e: self.main_frame.show_error("Merge Error", f"Cannot start merge:\n{e}") return if not self.main_frame.ask_yes_no( "Confirm Merge", f"Merge branch '{branch_to_merge}' into '{current_branch}'?", ): return args = ( self.action_handler, self.git_commands, svn_path, branch_to_merge, False, ) # no_ff = False self.app._start_async_operation( worker_func=async_workers.run_merge_local_branch_async, args_tuple=args, context_dict={ "context": "merge_local_branch", "status_msg": f"Merging '{branch_to_merge}' into '{current_branch}'", "repo_path": svn_path, }, ) def handle_promote_branch_to_main(self, branch_to_promote: Optional[str] = None): """Promote a selected branch to become the new main. This creates an optional local backup of `main`, checks out `main`, resets it to the target branch's tip, and (optionally) pushes the updated main to remote with a force-with-lease or force push. """ func_name = "handle_promote_branch_to_main" svn_path = self.app._get_and_validate_svn_path("Promote Branch to main") if not svn_path or not self.app._is_repo_ready(svn_path): return # Determine the actual target. The branch list may contain special entries # like "(HEAD detached from ...)" which `get_selected_branch()` returns as None. target_branch = None if branch_to_promote: target_branch = branch_to_promote else: # Try the helper first (normal branch names) try: target_branch = self.main_frame.get_selected_branch() except Exception: target_branch = None # If still None, inspect the raw listbox selection to handle detached HEAD entries if not target_branch: try: lb = getattr(self.main_frame, "branch_tab", None) if lb and hasattr(lb, "branch_listbox"): sel = lb.branch_listbox.curselection() if sel: raw = lb.branch_listbox.get(sel[0]).strip() # If the entry is a parenthesized label like '(HEAD detached ...)', # we'll treat the promotion target as the current HEAD commit. if raw.startswith("("): target_branch = "HEAD" else: # Fallback to normal parsing target_branch = raw.lstrip("* ").strip() except Exception: target_branch = None if not target_branch: self.main_frame.show_error( "Selection Error", "No branch selected to promote." ) return # Ask which branch should become the target (default to 'master') target_main = self.main_frame.ask_branch_name( "Target Branch", "Enter the name of the branch to be replaced (target branch):", default="master", ) if not target_main: self.main_frame.show_error( "Input Error", "No target branch specified. Promotion cancelled." ) return # Prevent promoting into the same branch if target_branch == target_main: self.main_frame.show_error( "Invalid Action", "Selected branch is the same as the target branch." ) return # Confirmation + options confirm = self.main_frame.ask_yes_no( "Confirm Promote", f"This will make '{target_main}' point to the tip of '{target_branch}'.\n" f"This is potentially destructive for history on '{target_main}'.\n\n" "Proceed?", ) if not confirm: self.main_frame.update_status_bar("Cancelled.") return create_backup = self.main_frame.ask_yes_no( "Create Backup", f"Create a local backup branch for the current '{target_main}' before changing it?", ) push_to_remote = self.main_frame.ask_yes_no( "Push to Remote", f"After changing '{target_main}' locally, push the new '{target_main}' to remote? (This may require force push)", ) force_with_lease = True if push_to_remote: # Prefer --force-with-lease but ask if user explicitly wants plain force use_fwl = self.main_frame.ask_yes_no( "Use Force-With-Lease?", "Use '--force-with-lease' when pushing (safer)?\nSelect 'No' to use plain --force.", ) force_with_lease = bool(use_fwl) # Determine remote name (fallback to 'origin') remote_name = getattr(self.main_frame, "remote_tab", None) remote_val = "origin" try: if hasattr(self.main_frame, "remote_tab"): remote_val = ( self.main_frame.remote_tab.remote_name_var.get() or "origin" ) except Exception: remote_val = "origin" # If the repository doesn't have the target branch, ask the user whether # to create it before proceeding. We pass a flag to the worker that # instructs it to create the target branch at the target commit if the user agreed. create_main_if_missing = False try: branches, _ = self.git_commands.list_branches(svn_path) except Exception: branches = [] if target_main not in branches: want_create = self.main_frame.ask_yes_no( "Target Not Found", ( f"Branch '{target_main}' was not found in the repository.\n\n" f"Do you want to create '{target_main}' pointing to the tip of '{target_branch}' and continue?\n\n" "Selecting 'No' will cancel the promote action." ), ) if not want_create: self.main_frame.show_error( "Action Aborted", f"Promotion cancelled because '{target_main}' branch does not exist.", ) return create_main_if_missing = True args = ( self.git_commands, svn_path, target_branch, target_main, remote_val, create_main_if_missing, create_backup, push_to_remote, force_with_lease, ) # Resolve the worker function lazily to avoid AttributeError if the # module was partially imported earlier. Try direct attribute access # first, then fallback to importlib to re-import the module. try: worker_func = async_workers.run_promote_branch_to_main_async except Exception: try: import importlib aw_mod = importlib.import_module("gitutility.async_tasks.async_workers") worker_func = getattr(aw_mod, "run_promote_branch_to_main_async", None) except Exception: worker_func = None if not callable(worker_func): log_handler.log_error( "Promote worker is not available in async_workers module.", func_name=func_name, ) self.main_frame.show_error( "Internal Error", "Promote action is not available. Please restart the application.", ) return self.app._start_async_operation( worker_func=worker_func, args_tuple=args, context_dict={ "context": "promote_branch", "status_msg": f"Promoting '{target_branch}' to '{target_main}'", "repo_path": svn_path, }, ) def handle_view_commit_details(self, commit_hash_short: str): """Starts an async worker to fetch and display details for a commit.""" func_name = "handle_view_commit_details" svn_path = self.app._get_and_validate_svn_path("View Commit Details") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository path is not valid.") return args = (self.git_commands, svn_path, commit_hash_short) self.app._start_async_operation( worker_func=async_workers.run_get_commit_details_async, args_tuple=args, context_dict={ "context": "get_commit_details", "status_msg": f"Loading details for commit {commit_hash_short}", }, ) def handle_reset_to_commit(self, commit_hash: str): """Handles the destructive 'Reset to Commit' action.""" func_name = "handle_reset_to_commit" svn_path = self.app._get_and_validate_svn_path("Reset to Commit") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not ready.") return args = (self.action_handler, svn_path, commit_hash) self.app._start_async_operation( worker_func=async_workers.run_reset_to_commit_async, args_tuple=args, context_dict={ "context": "reset_to_commit", "status_msg": f"Resetting to commit '{commit_hash[:7]}'", }, )