267 lines
14 KiB
Python
267 lines
14 KiB
Python
# RepoSync/core/sync_manager.py
|
|
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import stat
|
|
import time
|
|
from enum import Enum, auto
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Callable
|
|
|
|
from .base_vcs_client import BaseVCSClient
|
|
from .git_manager import GitManager, GitCommandError
|
|
|
|
def robust_rmtree(path, max_retries=3, delay=0.1):
|
|
for i in range(max_retries):
|
|
try:
|
|
shutil.rmtree(path, onerror=_remove_readonly_onerror)
|
|
return
|
|
except PermissionError as e:
|
|
if "used by another process" in str(e) and i < max_retries - 1:
|
|
time.sleep(delay)
|
|
else:
|
|
raise
|
|
except FileNotFoundError:
|
|
return
|
|
|
|
def _remove_readonly_onerror(func, path, exc_info):
|
|
if not os.access(path, os.W_OK):
|
|
os.chmod(path, stat.S_IWRITE)
|
|
func(path)
|
|
else:
|
|
raise exc_info[1]
|
|
|
|
class WikiInitializationRequiredError(Exception):
|
|
"""Custom exception raised when a wiki requires manual initialization."""
|
|
def __init__(self, message, repo_name):
|
|
super().__init__(message)
|
|
self.repo_name = repo_name
|
|
|
|
class SyncState(Enum):
|
|
IDENTICAL = auto()
|
|
AHEAD = auto()
|
|
BEHIND = auto()
|
|
DIVERGED = auto()
|
|
NEW_REPO = auto()
|
|
ORPHANED_BUNDLE = auto()
|
|
ERROR = auto()
|
|
|
|
class SyncManager:
|
|
def __init__(self, git_manager: GitManager, vcs_client: BaseVCSClient, sync_bundle_path: Path, logger: logging.Logger):
|
|
self.git_manager, self.vcs_client, self.sync_bundle_path, self.logger = git_manager, vcs_client, sync_bundle_path, logger
|
|
self.temp_clone_path, self.manifest_path = self.sync_bundle_path / "temp_clones", self.sync_bundle_path / "manifest.json"
|
|
self.logger.debug("SyncManager for offline sync initialized.")
|
|
|
|
# ... (le altre funzioni rimangono invariate) ...
|
|
|
|
def _load_manifest(self) -> Dict[str, Any]:
|
|
if not self.manifest_path.exists(): return {"repositories": {}}
|
|
try:
|
|
with open(self.manifest_path, "r", encoding="utf-8") as f: return json.load(f)
|
|
except (json.JSONDecodeError, IOError) as e:
|
|
self.logger.error(f"Could not read or parse manifest file, starting fresh: {e}")
|
|
return {"repositories": {}}
|
|
|
|
def _write_manifest(self, manifest_data: Dict[str, Any]):
|
|
try:
|
|
with open(self.manifest_path, "w", encoding="utf-8") as f: json.dump(manifest_data, f, indent=4)
|
|
self.logger.info(f"Manifest file written successfully to: {self.manifest_path}")
|
|
except IOError as e:
|
|
self.logger.error(f"Failed to write manifest file: {e}")
|
|
raise
|
|
|
|
def export_repositories_to_bundles(self, source_repos: List[Dict], progress_callback: Optional[Callable[[int, int, int], None]] = None):
|
|
self.logger.info(f"Starting export of {len(source_repos)} items...")
|
|
self.sync_bundle_path.mkdir(exist_ok=True)
|
|
self.temp_clone_path.mkdir(exist_ok=True)
|
|
full_manifest = self._load_manifest()
|
|
manifest_repos = full_manifest.get("repositories", {})
|
|
total_repos, successes = len(source_repos), 0
|
|
|
|
for i, repo in enumerate(source_repos):
|
|
repo_name, local_repo_path = repo["name"], self.temp_clone_path / repo["name"]
|
|
bundle_file_name = repo["name"].replace(" (Wiki)", ".wiki") + ".bundle"
|
|
bundle_file_path = self.sync_bundle_path / bundle_file_name
|
|
bundle_size_bytes, is_wiki = 0, repo.get("is_wiki", False)
|
|
|
|
self.logger.info(f"--- Processing '{repo_name}' ({i+1}/{total_repos}) ---")
|
|
try:
|
|
if local_repo_path.exists(): robust_rmtree(local_repo_path)
|
|
self.git_manager.clone(remote_url=repo["clone_url"], local_path=str(local_repo_path), token=self.vcs_client.token)
|
|
self.git_manager.create_git_bundle(str(local_repo_path), str(bundle_file_path))
|
|
if bundle_file_path.exists(): bundle_size_bytes = bundle_file_path.stat().st_size
|
|
|
|
manifest_repos[repo_name] = {
|
|
"bundle_file": bundle_file_path.name,
|
|
"last_update": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
|
"branches": self.git_manager.get_branches_with_heads(str(local_repo_path)),
|
|
"description": repo.get("description", ""), "private": repo.get("private", True),
|
|
"owner": repo.get("owner"), "clone_url": repo.get("clone_url")
|
|
}
|
|
successes += 1
|
|
except GitCommandError as e:
|
|
if is_wiki and ("Could not read from remote repository" in str(e) or "does not appear to be a git repository" in str(e)):
|
|
self.logger.warning(f"Skipping empty or uninitialized wiki for '{repo_name}'.")
|
|
else: self.logger.error(f"Failed to process repository '{repo_name}': {e}")
|
|
except Exception as e:
|
|
self.logger.error(f"An unexpected error occurred while processing '{repo_name}': {e}", exc_info=True)
|
|
finally:
|
|
if local_repo_path.exists(): robust_rmtree(local_repo_path)
|
|
if progress_callback: progress_callback(i + 1, total_repos, bundle_size_bytes)
|
|
|
|
if successes > 0:
|
|
full_manifest["repositories"].update(manifest_repos)
|
|
full_manifest["export_timestamp"] = datetime.datetime.now(datetime.timezone.utc).isoformat()
|
|
full_manifest["source_server_url"] = self.vcs_client.api_url
|
|
self._write_manifest(full_manifest)
|
|
else:
|
|
self.logger.warning("Export process finished with no successful exports. Manifest was not updated.")
|
|
self.logger.info("Export process finished.")
|
|
|
|
def compare_server_with_manifest(self, server_repos: List[Dict]) -> List[Dict[str, Any]]:
|
|
self.logger.info("Comparing server state with local manifest...")
|
|
manifest = self._load_manifest()
|
|
manifest_repos, results = manifest.get("repositories", {}), []
|
|
server_repo_names, manifest_repo_names = {r['name'] for r in server_repos}, set(manifest_repos.keys())
|
|
|
|
for repo in server_repos:
|
|
repo_name = repo['name']
|
|
state = SyncState.NEW_REPO
|
|
if repo_name in manifest_repos:
|
|
manifest_branches = manifest_repos[repo_name].get("branches", {})
|
|
remote_branches = self.vcs_client.get_repository_branches(repo['owner'], repo_name)
|
|
state = SyncState.AHEAD if manifest_branches != remote_branches else SyncState.IDENTICAL
|
|
repo['state'] = state
|
|
results.append(repo)
|
|
|
|
for name in manifest_repo_names - server_repo_names:
|
|
self.logger.warning(f"Found orphaned bundle: '{name}' is in the manifest but not on the server.")
|
|
results.append({
|
|
"name": name, "state": SyncState.ORPHANED_BUNDLE,
|
|
"description": manifest_repos[name].get("description", "Orphaned bundle."),
|
|
"is_wiki": " (Wiki)" in name, "size_kb": 0
|
|
})
|
|
return results
|
|
|
|
def compare_bundles_with_remote(self) -> List[Dict[str, Any]]:
|
|
self.logger.info("Comparing local bundles with remote server state...")
|
|
manifest = self._load_manifest()
|
|
if not manifest.get("repositories"):
|
|
self.logger.warning("Manifest is empty. Cannot compare."); return []
|
|
comparison_results, manifest_repos = [], manifest.get("repositories", {})
|
|
|
|
for repo_name, bundle_info in manifest_repos.items():
|
|
self.logger.debug(f"Comparing state for '{repo_name}'...")
|
|
bundle_file = self.sync_bundle_path / bundle_info.get("bundle_file", "")
|
|
bundle_info["bundle_size_bytes"] = bundle_file.stat().st_size if bundle_file.exists() else 0
|
|
remote_repo = self.vcs_client.get_repository(repo_name)
|
|
|
|
# For wikis, if the base repo exists but has_wiki is false, it's effectively a NEW_REPO
|
|
if remote_repo and " (Wiki)" in repo_name and not remote_repo.get("clone_url"):
|
|
remote_repo = None # Treat as non-existent for sync state purposes
|
|
|
|
if not remote_repo:
|
|
comparison_results.append({"name": repo_name, "state": SyncState.NEW_REPO, "bundle_info": bundle_info})
|
|
continue
|
|
|
|
remote_owner = remote_repo.get("owner")
|
|
if not remote_owner:
|
|
comparison_results.append({"name": repo_name, "state": SyncState.ERROR, "details": "Could not determine remote owner.", "bundle_info": bundle_info})
|
|
continue
|
|
|
|
remote_branches = self.vcs_client.get_repository_branches(remote_owner, repo_name)
|
|
bundle_branches = bundle_info.get("branches", {})
|
|
all_branches = set(bundle_branches.keys()) | set(remote_branches.keys())
|
|
is_identical, is_ahead, is_behind = True, False, False
|
|
|
|
for branch in all_branches:
|
|
bundle_hash, remote_hash = bundle_branches.get(branch), remote_branches.get(branch)
|
|
if bundle_hash != remote_hash:
|
|
is_identical = False
|
|
if bundle_hash and not remote_hash: is_behind = True
|
|
elif not bundle_hash and remote_hash: is_ahead = True
|
|
else: is_ahead, is_behind = True, True
|
|
|
|
state = SyncState.IDENTICAL
|
|
if not is_identical:
|
|
if is_ahead and not is_behind: state = SyncState.AHEAD
|
|
elif not is_ahead and is_behind: state = SyncState.BEHIND
|
|
else: state = SyncState.DIVERGED
|
|
comparison_results.append({"name": repo_name, "state": state, "bundle_info": bundle_info})
|
|
return comparison_results
|
|
|
|
def import_repositories_from_bundles(self, repos_to_import: List[Dict], progress_callback: Optional[Callable[[int, int], None]] = None):
|
|
self.logger.info(f"Starting import of {len(repos_to_import)} selected repositories...")
|
|
self.temp_clone_path.mkdir(exist_ok=True)
|
|
total_repos = len(repos_to_import)
|
|
|
|
for i, repo_info in enumerate(repos_to_import):
|
|
repo_name, bundle_info = repo_info["name"], repo_info["bundle_info"]
|
|
bundle_file = self.sync_bundle_path / bundle_info["bundle_file"]
|
|
local_repo_path = self.temp_clone_path / repo_name
|
|
is_wiki = " (Wiki)" in repo_name
|
|
|
|
if not bundle_file.exists():
|
|
self.logger.error(f"Bundle file {bundle_file} for '{repo_name}' not found. Skipping.")
|
|
if progress_callback: progress_callback(i + 1, total_repos)
|
|
continue
|
|
|
|
self.logger.info(f"--- Importing '{repo_name}' ({i+1}/{total_repos}) ---")
|
|
try:
|
|
if local_repo_path.exists(): robust_rmtree(local_repo_path)
|
|
base_repo_name = repo_name.removesuffix(" (Wiki)").strip()
|
|
dest_repo = self.vcs_client.get_repository(base_repo_name)
|
|
|
|
is_new_repo = not dest_repo
|
|
if is_new_repo:
|
|
self.logger.info(f"Base repository '{base_repo_name}' not found, creating...")
|
|
dest_repo = self.vcs_client.create_repository(name=base_repo_name, description=bundle_info.get("description", ""), private=bundle_info.get("private", True))
|
|
|
|
if is_wiki:
|
|
# Check if the remote wiki is already initialized
|
|
remote_wiki_exists = self.git_manager.check_remote_exists(
|
|
self.vcs_client.get_repository(repo_name)["clone_url"],
|
|
self.vcs_client.token
|
|
)
|
|
|
|
if not remote_wiki_exists:
|
|
# Attempt to enable the wiki via API, but prepare for failure
|
|
self.vcs_client.enable_wiki(dest_repo["owner"], base_repo_name)
|
|
# Raise the specific error to be caught by the UI
|
|
raise WikiInitializationRequiredError(
|
|
"The remote wiki repository must be initialized manually.",
|
|
repo_name=repo_name
|
|
)
|
|
|
|
# If we get here, it means the wiki already exists, so proceed with normal sync
|
|
self.git_manager.clone_from_bundle(str(bundle_file), str(local_repo_path))
|
|
dest_clone_url = self.vcs_client.get_repository(repo_name)["clone_url"]
|
|
auth_clone_url = self.git_manager.get_url_with_token(dest_clone_url, self.vcs_client.token)
|
|
self.git_manager.set_remote_url(str(local_repo_path), "origin", auth_clone_url)
|
|
self.git_manager.push(str(local_repo_path), "origin", all_branches=True, all_tags=True)
|
|
else:
|
|
# Standard import for non-wiki repos
|
|
self.git_manager.clone_from_bundle(str(bundle_file), str(local_repo_path))
|
|
dest_clone_url = dest_repo.get("clone_url")
|
|
auth_clone_url = self.git_manager.get_url_with_token(dest_clone_url, self.vcs_client.token)
|
|
self.git_manager.set_remote_url(str(local_repo_path), "origin", auth_clone_url)
|
|
self.git_manager.push(str(local_repo_path), "origin", all_branches=True, all_tags=True)
|
|
|
|
if is_new_repo and not is_wiki:
|
|
bundle_branches = bundle_info.get("branches", {})
|
|
default_branch = "main" if "main" in bundle_branches else "master"
|
|
if default_branch in bundle_branches:
|
|
self.vcs_client.set_default_branch(dest_repo["owner"], base_repo_name, default_branch)
|
|
else: self.logger.warning(f"Could not determine a default branch for '{base_repo_name}'. Please set it manually.")
|
|
except (GitCommandError, WikiInitializationRequiredError, Exception) as e:
|
|
self.logger.error(f"Failed to import repository '{repo_name}': {e}", exc_info=True)
|
|
if progress_callback: progress_callback(i + 1, total_repos)
|
|
raise
|
|
finally:
|
|
if local_repo_path.exists(): robust_rmtree(local_repo_path)
|
|
if progress_callback: progress_callback(i + 1, total_repos)
|
|
|
|
self.logger.info("Import process finished.") |