SXXXXXXX_RepoSync/reposync/core/sync_manager.py
VALLONGOL dacc220eeb fix export wiki
add instruction for import wiki
update readme and manual
2025-07-11 07:33:07 +02:00

267 lines
14 KiB
Python

# RepoSync/core/sync_manager.py
import datetime
import json
import logging
import os
import shutil
import stat
import time
from enum import Enum, auto
from pathlib import Path
from typing import List, Dict, Any, Optional, Callable
from .base_vcs_client import BaseVCSClient
from .git_manager import GitManager, GitCommandError
def robust_rmtree(path, max_retries=3, delay=0.1):
    """Delete a directory tree, retrying when a file is temporarily locked.

    Read-only entries are handled by ``_remove_readonly_onerror``. A tree that
    does not exist (or disappears while being deleted) is treated as success.

    Args:
        path: Directory to remove.
        max_retries: Maximum number of deletion attempts. Values below 1 are
            treated as 1 so that the tree is never silently left in place.
        delay: Seconds to sleep between attempts.

    Raises:
        PermissionError: If the failure is not a sharing violation, or the
            tree is still locked after the last attempt.
    """
    # Windows ERROR_SHARING_VIOLATION. Checking the numeric code keeps the
    # retry working on non-English Windows installs, where the message text
    # is localized and the substring test below never matches.
    sharing_violation = 32
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            shutil.rmtree(path, onerror=_remove_readonly_onerror)
            return
        except FileNotFoundError:
            return
        except PermissionError as e:
            locked = (
                getattr(e, "winerror", None) == sharing_violation
                or "used by another process" in str(e)
            )
            if locked and attempt < attempts - 1:
                time.sleep(delay)
            else:
                raise
def _remove_readonly_onerror(func, path, exc_info):
    """Error handler for ``shutil.rmtree`` that clears a read-only flag.

    Args:
        func: The function that failed; it is called again with ``path`` once
            the path has been made writable.
        path: The path ``func`` failed on.
        exc_info: The ``(type, value, traceback)`` tuple of the failure.

    Raises:
        The original exception (``exc_info[1]``) when ``path`` is already
        writable, since the read-only flag cannot be the cause in that case.
    """
    if os.access(path, os.W_OK):
        raise exc_info[1]
    # Not writable: grant write permission and retry the failed operation.
    os.chmod(path, stat.S_IWRITE)
    func(path)
class WikiInitializationRequiredError(Exception):
    """Signal that a wiki has to be initialized manually before it can be synced.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, message, repo_name):
        # Name of the repository whose wiki needs manual initialization.
        self.repo_name = repo_name
        Exception.__init__(self, message)
class SyncState(Enum):
    """Outcome of comparing a repository on the server with its local bundle.

    The values are the same integers ``auto()`` would assign (1, 2, 3, ...).
    """

    IDENTICAL = 1        # branch heads match on both sides
    AHEAD = 2            # the server has changes the bundle/manifest lacks
    BEHIND = 3           # the bundle has branches the server lacks
    DIVERGED = 4         # both sides differ from each other
    NEW_REPO = 5         # present on one side only and has to be created/exported
    ORPHANED_BUNDLE = 6  # listed in the manifest but missing on the server
    ERROR = 7            # the comparison could not be completed
class SyncManager:
    """Drive offline repository synchronisation through git bundle files.

    Repositories are exported from a server into ``*.bundle`` files stored
    under ``sync_bundle_path`` next to a ``manifest.json`` describing each
    bundle. The same folder is later used to compare bundles against a server
    and to import them into it.
    """

    def __init__(self, git_manager: "GitManager", vcs_client: "BaseVCSClient", sync_bundle_path: Path, logger: logging.Logger):
        """Store the collaborators and derive the working paths.

        Args:
            git_manager: Wrapper used for every git operation.
            vcs_client: API client for the server being synchronised.
            sync_bundle_path: Folder that holds the bundles and the manifest.
            logger: Logger that receives progress and error messages.
        """
        self.git_manager = git_manager
        self.vcs_client = vcs_client
        self.sync_bundle_path = sync_bundle_path
        self.logger = logger
        # Scratch area for temporary clones; every clone is deleted after use.
        self.temp_clone_path = self.sync_bundle_path / "temp_clones"
        self.manifest_path = self.sync_bundle_path / "manifest.json"
        self.logger.debug("SyncManager for offline sync initialized.")

    def _load_manifest(self) -> Dict[str, Any]:
        """Read the manifest file.

        Returns:
            The manifest as a dict that always contains a ``"repositories"``
            dict. An empty manifest is returned when the file is missing,
            unreadable, not valid JSON, or not a JSON object.
        """
        if not self.manifest_path.exists():
            return {"repositories": {}}
        try:
            with open(self.manifest_path, "r", encoding="utf-8") as f:
                manifest = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            self.logger.error(f"Could not read or parse manifest file, starting fresh: {e}")
            return {"repositories": {}}
        if not isinstance(manifest, dict):
            self.logger.error("Manifest file does not contain a JSON object, starting fresh.")
            return {"repositories": {}}
        # Callers index manifest["repositories"] directly, so guarantee it.
        if not isinstance(manifest.get("repositories"), dict):
            manifest["repositories"] = {}
        return manifest

    def _write_manifest(self, manifest_data: Dict[str, Any]):
        """Serialise ``manifest_data`` to the manifest file as indented JSON.

        Raises:
            IOError: If the file cannot be written (logged, then re-raised).
        """
        try:
            with open(self.manifest_path, "w", encoding="utf-8") as f:
                json.dump(manifest_data, f, indent=4)
            self.logger.info(f"Manifest file written successfully to: {self.manifest_path}")
        except IOError as e:
            self.logger.error(f"Failed to write manifest file: {e}")
            raise

    def export_repositories_to_bundles(self, source_repos: List[Dict], progress_callback: Optional[Callable[[int, int, int], None]] = None):
        """Clone each repository, bundle it and record it in the manifest.

        A failure on one repository is logged and does not stop the export.
        The manifest is rewritten only if at least one repository succeeded.

        Args:
            source_repos: Repository dicts; ``"name"`` and ``"clone_url"`` are
                required, ``"is_wiki"``, ``"description"``, ``"private"`` and
                ``"owner"`` are optional.
            progress_callback: Called after every repository with
                ``(processed_count, total_count, bundle_size_bytes)``; the
                size is 0 when no bundle was produced.
        """
        self.logger.info(f"Starting export of {len(source_repos)} items...")
        self.sync_bundle_path.mkdir(parents=True, exist_ok=True)
        self.temp_clone_path.mkdir(parents=True, exist_ok=True)
        full_manifest = self._load_manifest()
        # setdefault keeps this dict attached to full_manifest, so entries
        # added below are part of what gets written.
        manifest_repos = full_manifest.setdefault("repositories", {})
        total_repos, successes = len(source_repos), 0
        for i, repo in enumerate(source_repos):
            repo_name = repo["name"]
            local_repo_path = self.temp_clone_path / repo_name
            bundle_file_name = repo_name.replace(" (Wiki)", ".wiki") + ".bundle"
            bundle_file_path = self.sync_bundle_path / bundle_file_name
            bundle_size_bytes = 0
            is_wiki = repo.get("is_wiki", False)
            self.logger.info(f"--- Processing '{repo_name}' ({i+1}/{total_repos}) ---")
            try:
                if local_repo_path.exists():
                    robust_rmtree(local_repo_path)
                self.git_manager.clone(remote_url=repo["clone_url"], local_path=str(local_repo_path), token=self.vcs_client.token)
                self.git_manager.create_git_bundle(str(local_repo_path), str(bundle_file_path))
                if bundle_file_path.exists():
                    bundle_size_bytes = bundle_file_path.stat().st_size
                manifest_repos[repo_name] = {
                    "bundle_file": bundle_file_path.name,
                    "last_update": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                    "branches": self.git_manager.get_branches_with_heads(str(local_repo_path)),
                    "description": repo.get("description", ""),
                    "private": repo.get("private", True),
                    "owner": repo.get("owner"),
                    "clone_url": repo.get("clone_url"),
                }
                successes += 1
            except GitCommandError as e:
                wiki_missing = "Could not read from remote repository" in str(e) or "does not appear to be a git repository" in str(e)
                if is_wiki and wiki_missing:
                    self.logger.warning(f"Skipping empty or uninitialized wiki for '{repo_name}'.")
                else:
                    self.logger.error(f"Failed to process repository '{repo_name}': {e}")
            except Exception as e:
                self.logger.error(f"An unexpected error occurred while processing '{repo_name}': {e}", exc_info=True)
            finally:
                if local_repo_path.exists():
                    robust_rmtree(local_repo_path)
            if progress_callback:
                progress_callback(i + 1, total_repos, bundle_size_bytes)
        if successes > 0:
            full_manifest["export_timestamp"] = datetime.datetime.now(datetime.timezone.utc).isoformat()
            full_manifest["source_server_url"] = self.vcs_client.api_url
            self._write_manifest(full_manifest)
        else:
            self.logger.warning("Export process finished with no successful exports. Manifest was not updated.")
        self.logger.info("Export process finished.")

    def compare_server_with_manifest(self, server_repos: List[Dict]) -> List[Dict[str, Any]]:
        """Classify every server repository against the manifest.

        Each dict in ``server_repos`` is updated in place with a ``"state"``
        key: ``NEW_REPO`` when it is not in the manifest, ``AHEAD`` when the
        server's branch heads differ from the recorded ones, ``IDENTICAL``
        otherwise. Manifest entries with no matching server repository are
        appended as ``ORPHANED_BUNDLE`` records.

        Args:
            server_repos: Repository dicts with at least ``"name"``; entries
                already in the manifest also need ``"owner"``.

        Returns:
            The annotated server repositories followed by orphaned entries.
        """
        self.logger.info("Comparing server state with local manifest...")
        manifest = self._load_manifest()
        manifest_repos, results = manifest.get("repositories", {}), []
        server_repo_names = {r['name'] for r in server_repos}
        manifest_repo_names = set(manifest_repos.keys())
        for repo in server_repos:
            repo_name = repo['name']
            state = SyncState.NEW_REPO
            if repo_name in manifest_repos:
                manifest_branches = manifest_repos[repo_name].get("branches", {})
                remote_branches = self.vcs_client.get_repository_branches(repo['owner'], repo_name)
                state = SyncState.AHEAD if manifest_branches != remote_branches else SyncState.IDENTICAL
            repo['state'] = state
            results.append(repo)
        for name in manifest_repo_names - server_repo_names:
            self.logger.warning(f"Found orphaned bundle: '{name}' is in the manifest but not on the server.")
            results.append({
                "name": name, "state": SyncState.ORPHANED_BUNDLE,
                "description": manifest_repos[name].get("description", "Orphaned bundle."),
                "is_wiki": " (Wiki)" in name, "size_kb": 0
            })
        return results

    @staticmethod
    def _classify_branches(bundle_branches: Dict[str, Any], remote_branches: Dict[str, Any]) -> "SyncState":
        """Derive a sync state from two ``{branch: head_hash}`` mappings.

        A branch only in the bundle marks the remote as behind, a branch only
        on the remote marks it as ahead, and a branch whose hashes differ
        counts as both.
        """
        is_identical, is_ahead, is_behind = True, False, False
        for branch in set(bundle_branches.keys()) | set(remote_branches.keys()):
            bundle_hash, remote_hash = bundle_branches.get(branch), remote_branches.get(branch)
            if bundle_hash == remote_hash:
                continue
            is_identical = False
            if bundle_hash and not remote_hash:
                is_behind = True
            elif not bundle_hash and remote_hash:
                is_ahead = True
            else:
                is_ahead, is_behind = True, True
        if is_identical:
            return SyncState.IDENTICAL
        if is_ahead and not is_behind:
            return SyncState.AHEAD
        if is_behind and not is_ahead:
            return SyncState.BEHIND
        return SyncState.DIVERGED

    def compare_bundles_with_remote(self) -> List[Dict[str, Any]]:
        """Compare every bundle listed in the manifest with the remote server.

        Returns:
            One dict per manifest entry with ``"name"``, ``"state"`` and
            ``"bundle_info"`` (the manifest entry extended with
            ``"bundle_size_bytes"``); ``ERROR`` results also carry
            ``"details"``. An empty list is returned for an empty manifest.
        """
        self.logger.info("Comparing local bundles with remote server state...")
        manifest = self._load_manifest()
        manifest_repos = manifest.get("repositories", {})
        if not manifest_repos:
            self.logger.warning("Manifest is empty. Cannot compare.")
            return []
        comparison_results = []
        for repo_name, bundle_info in manifest_repos.items():
            self.logger.debug(f"Comparing state for '{repo_name}'...")
            bundle_file = self.sync_bundle_path / (bundle_info.get("bundle_file") or "")
            # is_file(), not exists(): a missing "bundle_file" resolves to the
            # bundle directory itself, whose size must not be reported.
            bundle_info["bundle_size_bytes"] = bundle_file.stat().st_size if bundle_file.is_file() else 0
            remote_repo = self.vcs_client.get_repository(repo_name)
            # For wikis, a result without a clone URL is treated as non-existent
            # for sync state purposes, so it is reported as NEW_REPO.
            if remote_repo and " (Wiki)" in repo_name and not remote_repo.get("clone_url"):
                remote_repo = None
            if not remote_repo:
                comparison_results.append({"name": repo_name, "state": SyncState.NEW_REPO, "bundle_info": bundle_info})
                continue
            remote_owner = remote_repo.get("owner")
            if not remote_owner:
                comparison_results.append({"name": repo_name, "state": SyncState.ERROR, "details": "Could not determine remote owner.", "bundle_info": bundle_info})
                continue
            remote_branches = self.vcs_client.get_repository_branches(remote_owner, repo_name)
            state = self._classify_branches(bundle_info.get("branches", {}), remote_branches)
            comparison_results.append({"name": repo_name, "state": state, "bundle_info": bundle_info})
        return comparison_results

    def _resolve_wiki_clone_url(self, repo_name: str, base_repo_name: str, dest_repo: Dict[str, Any]) -> str:
        """Return the clone URL of an already initialized remote wiki.

        Raises:
            WikiInitializationRequiredError: If the wiki is unknown to the
                server, has no clone URL, or its git remote does not exist.
                Enabling the wiki through the API is attempted first.
        """
        wiki_repo = self.vcs_client.get_repository(repo_name)
        wiki_clone_url = wiki_repo.get("clone_url") if wiki_repo else None
        remote_wiki_exists = bool(wiki_clone_url) and self.git_manager.check_remote_exists(wiki_clone_url, self.vcs_client.token)
        if not remote_wiki_exists:
            # Attempt to enable the wiki via API, but prepare for failure.
            self.vcs_client.enable_wiki(dest_repo["owner"], base_repo_name)
            # Raise the specific error so the caller can ask for manual setup.
            raise WikiInitializationRequiredError(
                "The remote wiki repository must be initialized manually.",
                repo_name=repo_name
            )
        return wiki_clone_url

    def _push_bundle(self, bundle_file: Path, local_repo_path: Path, dest_clone_url: str):
        """Clone ``bundle_file`` locally and push all branches and tags to ``dest_clone_url``."""
        self.git_manager.clone_from_bundle(str(bundle_file), str(local_repo_path))
        auth_clone_url = self.git_manager.get_url_with_token(dest_clone_url, self.vcs_client.token)
        self.git_manager.set_remote_url(str(local_repo_path), "origin", auth_clone_url)
        self.git_manager.push(str(local_repo_path), "origin", all_branches=True, all_tags=True)

    def import_repositories_from_bundles(self, repos_to_import: List[Dict], progress_callback: Optional[Callable[[int, int], None]] = None):
        """Push the selected bundles to the destination server.

        Missing base repositories are created first. Entries whose bundle file
        is missing are logged and skipped. The first failure is logged and
        re-raised, which stops the import.

        Args:
            repos_to_import: Dicts with ``"name"`` and ``"bundle_info"`` (a
                manifest entry containing at least ``"bundle_file"``).
            progress_callback: Called exactly once per entry with
                ``(processed_count, total_count)``, also for skipped and
                failed entries.

        Raises:
            WikiInitializationRequiredError: If a wiki must be initialized
                manually before it can be pushed.
            Exception: Any other error raised while importing an entry.
        """
        self.logger.info(f"Starting import of {len(repos_to_import)} selected repositories...")
        self.temp_clone_path.mkdir(parents=True, exist_ok=True)
        total_repos = len(repos_to_import)
        for i, repo_info in enumerate(repos_to_import):
            repo_name, bundle_info = repo_info["name"], repo_info["bundle_info"]
            bundle_file = self.sync_bundle_path / bundle_info["bundle_file"]
            local_repo_path = self.temp_clone_path / repo_name
            is_wiki = " (Wiki)" in repo_name
            if not bundle_file.exists():
                self.logger.error(f"Bundle file {bundle_file} for '{repo_name}' not found. Skipping.")
                if progress_callback:
                    progress_callback(i + 1, total_repos)
                continue
            self.logger.info(f"--- Importing '{repo_name}' ({i+1}/{total_repos}) ---")
            try:
                if local_repo_path.exists():
                    robust_rmtree(local_repo_path)
                base_repo_name = repo_name.removesuffix(" (Wiki)").strip()
                dest_repo = self.vcs_client.get_repository(base_repo_name)
                is_new_repo = not dest_repo
                if is_new_repo:
                    self.logger.info(f"Base repository '{base_repo_name}' not found, creating...")
                    dest_repo = self.vcs_client.create_repository(name=base_repo_name, description=bundle_info.get("description", ""), private=bundle_info.get("private", True))
                if is_wiki:
                    dest_clone_url = self._resolve_wiki_clone_url(repo_name, base_repo_name, dest_repo)
                else:
                    dest_clone_url = dest_repo.get("clone_url")
                self._push_bundle(bundle_file, local_repo_path, dest_clone_url)
                if is_new_repo and not is_wiki:
                    bundle_branches = bundle_info.get("branches", {})
                    default_branch = "main" if "main" in bundle_branches else "master"
                    if default_branch in bundle_branches:
                        self.vcs_client.set_default_branch(dest_repo["owner"], base_repo_name, default_branch)
                    else:
                        self.logger.warning(f"Could not determine a default branch for '{base_repo_name}'. Please set it manually.")
            except Exception as e:
                self.logger.error(f"Failed to import repository '{repo_name}': {e}", exc_info=True)
                raise
            finally:
                if local_repo_path.exists():
                    robust_rmtree(local_repo_path)
                # Reported here only, so success and failure each notify once.
                if progress_callback:
                    progress_callback(i + 1, total_repos)
        self.logger.info("Import process finished.")