SXXXXXXX_GitUtility/gitutility/logic/remote_handler.py

368 lines
14 KiB
Python

# --- FILE: gitutility/logic/remote_handler.py ---
import os
from typing import TYPE_CHECKING, Optional
from gitutility.async_tasks import async_workers
from gitutility.config.config_manager import DEFAULT_REMOTE_NAME
from gitutility.commands.git_commands import GitCommandError
from gitutility.gui.dialogs import CloneFromRemoteDialog
from gitutility.logging_setup import log_handler
# Forward reference for type hinting
if TYPE_CHECKING:
from ..app import GitSvnSyncApp
class RemoteHandler:
    """
    Handles logic for remote repository operations like configuring, checking
    connection, fetching, pulling, pushing, and cloning.

    Every public method first validates the GUI input / repository state and
    then hands the actual work to ``self.app._start_async_operation`` together
    with a worker function from ``async_workers`` and a context dictionary.
    """

    def __init__(self, app: "GitSvnSyncApp"):
        """
        Initializes the RemoteHandler.

        Args:
            app: The main application instance. Its ``main_frame``,
                ``git_commands``, ``remote_action_handler`` and
                ``action_handler`` attributes are cached on this handler.
        """
        self.app = app
        self.main_frame = app.main_frame
        self.git_commands = app.git_commands
        self.remote_action_handler = app.remote_action_handler
        self.action_handler = app.action_handler

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _get_ready_repo_path(self, action_name: str) -> Optional[str]:
        """
        Returns the validated repository path if the repository is ready.

        Args:
            action_name: Label of the calling action, forwarded to
                ``app._get_and_validate_svn_path``.

        Returns:
            The path reported by the application (assumed to be a string
            path - confirm against ``GitSvnSyncApp``), or None when the path
            is missing/invalid or ``app._is_repo_ready`` rejects it.
        """
        svn_path = self.app._get_and_validate_svn_path(action_name)
        if svn_path and self.app._is_repo_ready(svn_path):
            return svn_path
        return None

    def _get_remote_name(self) -> str:
        """Returns the remote name from the GUI, or DEFAULT_REMOTE_NAME if blank."""
        entered_name = self.main_frame.remote_tab.remote_name_var.get().strip()
        return entered_name or DEFAULT_REMOTE_NAME

    @staticmethod
    def _derive_repo_name(remote_url: str) -> str:
        """
        Derives a repository (directory) name from a remote URL.

        Handles a trailing slash/backslash, surrounding whitespace, a
        case-insensitive ``.git`` suffix, and scp-like URLs that contain no
        slash (``user@host:repo.git``).

        Args:
            remote_url: The remote URL or path entered by the user.

        Returns:
            The derived name, or an empty string if none could be derived.
        """
        # A trailing separator is legal in a clone URL, but it makes
        # os.path.basename return "" - strip it first.
        trimmed_url = remote_url.strip().rstrip("/\\")
        repo_name = os.path.basename(trimmed_url)
        # scp-like syntax without any slash: keep only the part after ':'.
        if ":" in repo_name:
            repo_name = repo_name.rsplit(":", 1)[1]
        if repo_name.lower().endswith(".git"):
            repo_name = repo_name[:-4]
        return repo_name

    # ------------------------------------------------------------------
    # Public actions
    # ------------------------------------------------------------------
    def apply_remote_config(self) -> None:
        """
        Applies the remote configuration from the GUI to the local repository.

        Reads the remote URL and name from the remote tab, falls back to
        DEFAULT_REMOTE_NAME (and writes it back to the GUI) when the name is
        blank, saves the profile settings and starts the async worker.
        """
        svn_path = self._get_ready_repo_path("Apply Remote Config")
        if not svn_path:
            self.main_frame.show_error(
                "Action Failed", "Repository path is not valid or not prepared."
            )
            return

        remote_tab = self.main_frame.remote_tab
        remote_url = remote_tab.remote_url_var.get().strip()
        remote_name = remote_tab.remote_name_var.get().strip()

        if not remote_url:
            self.main_frame.show_error("Input Error", "Remote URL cannot be empty.")
            return

        if not remote_name:
            remote_name = DEFAULT_REMOTE_NAME
            remote_tab.remote_name_var.set(remote_name)

        # A failed profile save is not fatal: let the user decide.
        if not self.app.profile_handler.save_profile_settings():
            if not self.main_frame.ask_yes_no(
                "Warning",
                "Could not save profile settings.\nContinue applying config anyway?",
            ):
                return

        args = (self.remote_action_handler, svn_path, remote_name, remote_url)
        self.app._start_async_operation(
            worker_func=async_workers.run_apply_remote_config_async,
            args_tuple=args,
            context_dict={
                "context": "apply_remote_config",
                "status_msg": f"Applying config for remote '{remote_name}'",
            },
        )

    def check_connection_auth(self) -> None:
        """
        Checks the connection and authentication status of the remote.

        Sets the GUI auth status to "unknown" when the repository is not
        ready; otherwise sets it to "checking" and starts the async check.
        """
        svn_path = self._get_ready_repo_path("Check Connection")
        if not svn_path:
            self.app._update_gui_auth_status("unknown")
            return

        remote_name = self._get_remote_name()
        # Reflect the effective name (possibly the default) back in the GUI.
        self.main_frame.remote_tab.remote_name_var.set(remote_name)
        self.app._update_gui_auth_status("checking")

        args = (self.git_commands, svn_path, remote_name)
        self.app._start_async_operation(
            worker_func=async_workers.run_check_connection_async,
            args_tuple=args,
            context_dict={
                "context": "check_connection",
                "status_msg": f"Checking remote '{remote_name}'",
                "remote_name_checked": remote_name,
                "repo_path_checked": svn_path,
            },
        )

    def fetch_remote(self) -> None:
        """Starts the asynchronous 'git fetch' operation."""
        svn_path = self._get_ready_repo_path("Fetch Remote")
        if not svn_path:
            self.main_frame.show_error("Action Failed", "Repository is not prepared.")
            return

        remote_name = self._get_remote_name()

        args = (self.remote_action_handler, svn_path, remote_name)
        self.app._start_async_operation(
            worker_func=async_workers.run_fetch_remote_async,
            args_tuple=args,
            context_dict={
                "context": "fetch_remote",
                "status_msg": f"Fetching from remote '{remote_name}'",
            },
        )

    def pull_remote(self) -> None:
        """
        Starts the asynchronous 'git pull' operation.

        Blocked with a warning unless ``app.remote_auth_status`` is "ok".
        """
        svn_path = self._get_ready_repo_path("Pull Remote")
        if not svn_path:
            self.main_frame.show_error("Action Failed", "Repository is not prepared.")
            return

        remote_name = self._get_remote_name()

        if self.app.remote_auth_status != "ok":
            self.main_frame.show_warning(
                "Action Blocked",
                "Cannot Pull: Connection status is not 'Connected'. Please check connection first.",
            )
            return

        args = (self.remote_action_handler, self.git_commands, svn_path, remote_name)
        self.app._start_async_operation(
            worker_func=async_workers.run_pull_remote_async,
            args_tuple=args,
            context_dict={
                "context": "pull_remote",
                "status_msg": f"Pulling from remote '{remote_name}'",
                "repo_path": svn_path,
                "remote_name": remote_name,
            },
        )

    def push_remote(self) -> None:
        """
        Starts the asynchronous 'git push' operation.

        Blocked with a warning unless ``app.remote_auth_status`` is "ok".
        If the working tree has uncommitted changes the user is asked to
        confirm before pushing.
        """
        svn_path = self._get_ready_repo_path("Push Branch")
        if not svn_path:
            self.main_frame.show_error("Action Failed", "Repository is not prepared.")
            return

        remote_name = self._get_remote_name()

        if self.app.remote_auth_status != "ok":
            self.main_frame.show_warning(
                "Action Blocked",
                "Cannot Push: Connection status is not 'Connected'. Please check connection first.",
            )
            return

        # Only the status query can raise GitCommandError; keep the try narrow.
        try:
            has_changes = self.git_commands.git_status_has_changes(svn_path)
        except GitCommandError as e:
            self.main_frame.show_error(
                "Status Error", f"Could not check repo status:\n{e}"
            )
            return

        if has_changes and not self.main_frame.ask_yes_no(
            "Uncommitted Changes", "There are uncommitted changes. Push anyway?"
        ):
            return

        args = (self.remote_action_handler, self.git_commands, svn_path, remote_name)
        self.app._start_async_operation(
            worker_func=async_workers.run_push_remote_async,
            args_tuple=args,
            context_dict={
                "context": "push_remote",
                "status_msg": f"Pushing current branch to '{remote_name}'",
                "remote_name": remote_name,
            },
        )

    def push_tags_remote(self) -> None:
        """
        Starts the asynchronous 'git push --tags' operation.

        Blocked with a warning unless ``app.remote_auth_status`` is "ok";
        the user must confirm before the tags are pushed.
        """
        svn_path = self._get_ready_repo_path("Push Tags")
        if not svn_path:
            self.main_frame.show_error("Action Failed", "Repository is not prepared.")
            return

        remote_name = self._get_remote_name()

        if self.app.remote_auth_status != "ok":
            self.main_frame.show_warning(
                "Action Blocked",
                "Cannot Push Tags: Connection status is not 'Connected'.",
            )
            return

        if not self.main_frame.ask_yes_no(
            "Confirm Push Tags", f"Push all local tags to remote '{remote_name}'?"
        ):
            return

        args = (self.remote_action_handler, svn_path, remote_name)
        self.app._start_async_operation(
            worker_func=async_workers.run_push_tags_async,
            args_tuple=args,
            context_dict={
                "context": "push_tags_remote",
                "status_msg": f"Pushing tags to '{remote_name}'",
                "remote_name": remote_name,
            },
        )

    def clone_remote_repo(self) -> None:
        """
        Handles the 'Clone from Remote...' action via a dialog.

        The dialog result is unpacked as (remote URL, local parent directory,
        profile name). The target directory is the parent directory joined
        with the repository name derived from the URL; the profile name is
        made unique by appending ``_<counter>``. Cloning is refused when the
        target directory already exists.
        """
        dialog = CloneFromRemoteDialog(self.main_frame.master)
        dialog_result = dialog.result
        if not dialog_result:
            self.main_frame.update_status_bar("Clone cancelled.")
            return

        remote_url, local_parent_dir, profile_name_input = dialog_result

        try:
            repo_name_from_url = self._derive_repo_name(remote_url)
            if not repo_name_from_url:
                raise ValueError("Could not derive repository name from URL.")

            target_clone_dir = os.path.join(local_parent_dir, repo_name_from_url)

            # Find a unique profile name. The existing sections are queried
            # once instead of on every loop iteration.
            base_profile_name = profile_name_input or repo_name_from_url
            existing_profiles = self.app.config_manager.get_profile_sections()
            final_profile_name = base_profile_name
            counter = 1
            while final_profile_name in existing_profiles:
                final_profile_name = f"{base_profile_name}_{counter}"
                counter += 1

            if os.path.exists(target_clone_dir):
                raise ValueError(
                    f"Clone failed: Target directory already exists:\n{target_clone_dir}"
                )
        except ValueError as ve:
            self.main_frame.show_error("Configuration Error", str(ve))
            return

        args = (self.git_commands, remote_url, target_clone_dir, final_profile_name)
        self.app._start_async_operation(
            worker_func=async_workers.run_clone_remote_async,
            args_tuple=args,
            context_dict={
                "context": "clone_remote",
                "status_msg": f"Cloning '{repo_name_from_url}'...",
                "clone_success_data": {
                    "profile_name": final_profile_name,
                    "cloned_path": target_clone_dir,
                    "remote_url": remote_url,
                },
            },
        )

    def checkout_remote_branch_as_local(
        self, remote_branch_full: str, local_branch_suggestion: str
    ) -> None:
        """
        Handles checkout of a remote branch as a new local tracking branch.

        Args:
            remote_branch_full: Remote branch reference passed to the worker.
            local_branch_suggestion: Name for the new local branch.

        If a local branch with the suggested name already exists, the user
        may check out that existing branch instead; in either case no new
        tracking branch is created.
        """
        svn_path = self._get_ready_repo_path("Checkout Remote Branch")
        if not svn_path:
            self.main_frame.show_error("Action Failed", "Repository path is not valid.")
            return

        try:
            local_branches, _ = self.git_commands.list_branches(svn_path)

            if local_branch_suggestion in local_branches:
                if self.main_frame.ask_yes_no(
                    "Branch Exists",
                    f"A local branch named '{local_branch_suggestion}' already exists.\n\nDo you want to check out the existing local branch instead?",
                ):
                    self.app.repository_handler.checkout_branch(
                        branch_to_checkout=local_branch_suggestion
                    )
                else:
                    self.main_frame.update_status_bar("Checkout cancelled.")
                # Never fall through: a tracking branch with this name
                # cannot be created while the local branch exists.
                return

            args = (
                self.action_handler,
                svn_path,
                local_branch_suggestion,
                remote_branch_full,
            )
            self.app._start_async_operation(
                worker_func=async_workers.run_checkout_tracking_branch_async,
                args_tuple=args,
                context_dict={
                    "context": "checkout_tracking_branch",
                    "status_msg": f"Checking out '{local_branch_suggestion}'",
                },
            )
        except Exception as e:
            # Top-level GUI boundary: log with traceback and inform the user.
            log_handler.log_exception(
                f"Error preparing for tracking branch checkout: {e}",
                func_name="checkout_remote_branch_as_local",
            )
            self.main_frame.show_error(
                "Checkout Error", f"Could not start checkout operation:\n{e}"
            )

    def handle_compare_branch_with_current(self, other_branch_ref: str) -> None:
        """
        Handles comparing a selected branch with the current branch.

        Args:
            other_branch_ref: The branch reference to compare against the
                currently checked-out branch.

        Returns silently when the repository is not ready. Shows an error
        when no current branch name is available (reported as detached HEAD)
        and an info box when both references are the same.
        """
        svn_path = self._get_ready_repo_path("Compare Branch")
        if not svn_path:
            return

        try:
            current_branch = self.git_commands.get_current_branch_name(svn_path)
            if not current_branch:
                raise ValueError("Cannot compare: Currently in detached HEAD state.")
            if current_branch == other_branch_ref:
                self.main_frame.show_info(
                    "Compare Info", "Cannot compare a branch with itself."
                )
                return
        except (GitCommandError, ValueError) as e:
            self.main_frame.show_error("Compare Error", f"Cannot start compare:\n{e}")
            return

        args = (self.git_commands, svn_path, current_branch, other_branch_ref)
        self.app._start_async_operation(
            worker_func=async_workers.run_compare_branches_async,
            args_tuple=args,
            context_dict={
                "context": "compare_branches",
                "status_msg": f"Comparing '{current_branch}' vs '{other_branch_ref}'",
                "ref1": current_branch,
                "ref2": other_branch_ref,
                "repo_path": svn_path,
            },
        )