# --- FILE: gitsync_tool/logic/remote_handler.py --- import os from typing import TYPE_CHECKING, Optional from gitutility.async_tasks import async_workers from gitutility.config.config_manager import DEFAULT_REMOTE_NAME from gitutility.commands.git_commands import GitCommandError from gitutility.gui.dialogs import CloneFromRemoteDialog from gitutility.logging_setup import log_handler # Forward reference for type hinting if TYPE_CHECKING: from ..app import GitSvnSyncApp class RemoteHandler: """ Handles logic for remote repository operations like configuring, checking connection, fetching, pulling, pushing, and cloning. """ def __init__(self, app: "GitSvnSyncApp"): """ Initializes the RemoteHandler. Args: app: The main application instance. """ self.app = app self.main_frame = app.main_frame self.git_commands = app.git_commands self.remote_action_handler = app.remote_action_handler self.action_handler = app.action_handler def apply_remote_config(self): """Applies the remote configuration from the GUI to the local repository.""" func_name = "apply_remote_config" svn_path = self.app._get_and_validate_svn_path("Apply Remote Config") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error( "Action Failed", "Repository path is not valid or not prepared." ) return remote_url = self.main_frame.remote_tab.remote_url_var.get().strip() remote_name = self.main_frame.remote_tab.remote_name_var.get().strip() if not remote_url: self.main_frame.show_error("Input Error", "Remote URL cannot be empty.") return if not remote_name: remote_name = DEFAULT_REMOTE_NAME self.main_frame.remote_tab.remote_name_var.set(remote_name) if not self.app.profile_handler.save_profile_settings(): if not self.main_frame.ask_yes_no( "Warning", "Could not save profile settings.\nContinue applying config anyway?", ): return args = (self.remote_action_handler, svn_path, remote_name, remote_url) self.app._start_async_operation( worker_func=async_workers.run_apply_remote_config_async, args_tuple=args, context_dict={ "context": "apply_remote_config", "status_msg": f"Applying config for remote '{remote_name}'", }, ) def check_connection_auth(self): """Checks the connection and authentication status of the remote.""" func_name = "check_connection_auth" svn_path = self.app._get_and_validate_svn_path("Check Connection") if not svn_path or not self.app._is_repo_ready(svn_path): self.app._update_gui_auth_status("unknown") return remote_name = ( self.main_frame.remote_tab.remote_name_var.get().strip() or DEFAULT_REMOTE_NAME ) self.main_frame.remote_tab.remote_name_var.set(remote_name) self.app._update_gui_auth_status("checking") args = (self.git_commands, svn_path, remote_name) self.app._start_async_operation( worker_func=async_workers.run_check_connection_async, args_tuple=args, context_dict={ "context": "check_connection", "status_msg": f"Checking remote '{remote_name}'", "remote_name_checked": remote_name, "repo_path_checked": svn_path, }, ) def fetch_remote(self): """Starts the asynchronous 'git fetch' operation.""" func_name = "fetch_remote" svn_path = self.app._get_and_validate_svn_path("Fetch Remote") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not prepared.") return remote_name = ( self.main_frame.remote_tab.remote_name_var.get().strip() or DEFAULT_REMOTE_NAME ) args = (self.remote_action_handler, svn_path, remote_name) self.app._start_async_operation( worker_func=async_workers.run_fetch_remote_async, args_tuple=args, context_dict={ "context": "fetch_remote", "status_msg": f"Fetching from remote '{remote_name}'", }, ) def pull_remote(self): """Starts the asynchronous 'git pull' operation.""" func_name = "pull_remote" svn_path = self.app._get_and_validate_svn_path("Pull Remote") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not prepared.") return remote_name = ( self.main_frame.remote_tab.remote_name_var.get().strip() or DEFAULT_REMOTE_NAME ) if self.app.remote_auth_status != "ok": self.main_frame.show_warning( "Action Blocked", "Cannot Pull: Connection status is not 'Connected'. Please check connection first.", ) return args = (self.remote_action_handler, self.git_commands, svn_path, remote_name) self.app._start_async_operation( worker_func=async_workers.run_pull_remote_async, args_tuple=args, context_dict={ "context": "pull_remote", "status_msg": f"Pulling from remote '{remote_name}'", "repo_path": svn_path, "remote_name": remote_name, }, ) def push_remote(self): """Starts the asynchronous 'git push' operation.""" func_name = "push_remote" svn_path = self.app._get_and_validate_svn_path("Push Branch") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not prepared.") return remote_name = ( self.main_frame.remote_tab.remote_name_var.get().strip() or DEFAULT_REMOTE_NAME ) if self.app.remote_auth_status != "ok": self.main_frame.show_warning( "Action Blocked", "Cannot Push: Connection status is not 'Connected'. Please check connection first.", ) return try: if self.git_commands.git_status_has_changes(svn_path): if not self.main_frame.ask_yes_no( "Uncommitted Changes", "There are uncommitted changes. Push anyway?" ): return except GitCommandError as e: self.main_frame.show_error( "Status Error", f"Could not check repo status:\n{e}" ) return args = (self.remote_action_handler, self.git_commands, svn_path, remote_name) self.app._start_async_operation( worker_func=async_workers.run_push_remote_async, args_tuple=args, context_dict={ "context": "push_remote", "status_msg": f"Pushing current branch to '{remote_name}'", "remote_name": remote_name, }, ) def push_tags_remote(self): """Starts the asynchronous 'git push --tags' operation.""" func_name = "push_tags_remote" svn_path = self.app._get_and_validate_svn_path("Push Tags") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository is not prepared.") return remote_name = ( self.main_frame.remote_tab.remote_name_var.get().strip() or DEFAULT_REMOTE_NAME ) if self.app.remote_auth_status != "ok": self.main_frame.show_warning( "Action Blocked", "Cannot Push Tags: Connection status is not 'Connected'.", ) return if not self.main_frame.ask_yes_no( "Confirm Push Tags", f"Push all local tags to remote '{remote_name}'?" ): return args = (self.remote_action_handler, svn_path, remote_name) self.app._start_async_operation( worker_func=async_workers.run_push_tags_async, args_tuple=args, context_dict={ "context": "push_tags_remote", "status_msg": f"Pushing tags to '{remote_name}'", "remote_name": remote_name, }, ) def clone_remote_repo(self): """Handles the 'Clone from Remote...' action via a dialog.""" func_name = "clone_remote_repo" dialog = CloneFromRemoteDialog(self.main_frame.master) dialog_result = dialog.result if not dialog_result: self.main_frame.update_status_bar("Clone cancelled.") return remote_url, local_parent_dir, profile_name_input = dialog_result try: repo_name_from_url = os.path.basename(remote_url) if repo_name_from_url.lower().endswith(".git"): repo_name_from_url = repo_name_from_url[:-4] if not repo_name_from_url: raise ValueError("Could not derive repository name from URL.") target_clone_dir = os.path.join(local_parent_dir, repo_name_from_url) # Find a unique profile name final_profile_name = profile_name_input or repo_name_from_url counter = 1 while final_profile_name in self.app.config_manager.get_profile_sections(): final_profile_name = ( f"{profile_name_input or repo_name_from_url}_{counter}" ) counter += 1 if os.path.exists(target_clone_dir): raise ValueError( f"Clone failed: Target directory already exists:\n{target_clone_dir}" ) except ValueError as ve: self.main_frame.show_error("Configuration Error", str(ve)) return args = (self.git_commands, remote_url, target_clone_dir, final_profile_name) self.app._start_async_operation( worker_func=async_workers.run_clone_remote_async, args_tuple=args, context_dict={ "context": "clone_remote", "status_msg": f"Cloning '{repo_name_from_url}'...", "clone_success_data": { "profile_name": final_profile_name, "cloned_path": target_clone_dir, "remote_url": remote_url, }, }, ) def checkout_remote_branch_as_local( self, remote_branch_full: str, local_branch_suggestion: str ): """Handles checkout of a remote branch as a new local tracking branch.""" func_name = "checkout_remote_branch_as_local" svn_path = self.app._get_and_validate_svn_path("Checkout Remote Branch") if not svn_path or not self.app._is_repo_ready(svn_path): self.main_frame.show_error("Action Failed", "Repository path is not valid.") return try: local_branches, _ = self.git_commands.list_branches(svn_path) if local_branch_suggestion in local_branches: if self.main_frame.ask_yes_no( "Branch Exists", f"A local branch named '{local_branch_suggestion}' already exists.\n\nDo you want to check out the existing local branch instead?", ): self.app.repository_handler.checkout_branch( branch_to_checkout=local_branch_suggestion ) else: self.main_frame.update_status_bar("Checkout cancelled.") return args = ( self.action_handler, svn_path, local_branch_suggestion, remote_branch_full, ) self.app._start_async_operation( worker_func=async_workers.run_checkout_tracking_branch_async, args_tuple=args, context_dict={ "context": "checkout_tracking_branch", "status_msg": f"Checking out '{local_branch_suggestion}'", }, ) except Exception as e: log_handler.log_exception( f"Error preparing for tracking branch checkout: {e}", func_name=func_name, ) self.main_frame.show_error( "Checkout Error", f"Could not start checkout operation:\n{e}" ) def handle_compare_branch_with_current(self, other_branch_ref: str): """Handles comparing a selected branch with the current branch.""" func_name = "compare_branch_with_current" svn_path = self.app._get_and_validate_svn_path(f"Compare Branch") if not svn_path or not self.app._is_repo_ready(svn_path): return try: current_branch = self.git_commands.get_current_branch_name(svn_path) if not current_branch: raise ValueError("Cannot compare: Currently in detached HEAD state.") if current_branch == other_branch_ref: self.main_frame.show_info( "Compare Info", "Cannot compare a branch with itself." ) return except (GitCommandError, ValueError) as e: self.main_frame.show_error("Compare Error", f"Cannot start compare:\n{e}") return args = (self.git_commands, svn_path, current_branch, other_branch_ref) self.app._start_async_operation( worker_func=async_workers.run_compare_branches_async, args_tuple=args, context_dict={ "context": "compare_branches", "status_msg": f"Comparing '{current_branch}' vs '{other_branch_ref}'", "ref1": current_branch, "ref2": other_branch_ref, "repo_path": svn_path, }, )