SXXXXXXX_RepoSync/reposync/core/sync_manager.py
2025-07-07 10:11:56 +02:00

262 lines
11 KiB
Python

# RepoSync/core/sync_manager.py
"""
Orchestrates the offline synchronization process using Git bundles and a manifest file.
"""
import datetime
import json
import logging
import os
import shutil
import stat
from enum import Enum, auto
from pathlib import Path
from typing import List, Dict, Any, Optional, Callable
from .base_vcs_client import BaseVCSClient
from .git_manager import GitManager, GitCommandError
def _remove_readonly(func, path, exc_info):
    """
    Error handler for shutil.rmtree to handle read-only files on Windows.

    If ``path`` is not writable, the write bit is set and the failed operation
    is retried once. Any other failure is propagated to the caller.

    Args:
        func: The function that failed; it is retried as ``func(path)``.
        path: The path ``func`` was operating on.
        exc_info: The failure being reported. Either the ``sys.exc_info()``
            style tuple passed to ``onerror`` handlers, or a bare exception
            instance as passed to ``onexc`` handlers.

    Raises:
        BaseException: The original error from ``exc_info`` when ``path`` is
            already writable, i.e. the failure was not caused by a read-only flag.
    """
    if not os.access(path, os.W_OK):
        os.chmod(path, stat.S_IWRITE)
        func(path)
        return
    # Re-raise the reported error explicitly. A bare ``raise`` only works while
    # an exception is being handled and otherwise fails with RuntimeError,
    # which would mask the real cause.
    error = exc_info if isinstance(exc_info, BaseException) else exc_info[1]
    raise error
class SyncState(Enum):
    """Synchronization status assigned to a repository during comparison."""

    # Every branch hash in the bundle matches the remote.
    IDENTICAL = 1
    # The bundle has branches the remote does not have.
    AHEAD = 2
    # The remote has branches the bundle does not have.
    BEHIND = 3
    # Both sides have changes the other lacks, or a shared branch differs.
    DIVERGED = 4
    # The repository does not exist on the remote yet.
    NEW_REPO = 5
    # NOTE(review): not assigned anywhere in this module — presumably a bundle
    # file without a matching manifest entry; confirm against callers.
    ORPHANED_BUNDLE = 6
    # The comparison could not be performed (e.g. missing owner in the manifest).
    ERROR = 7
class SyncManager:
    """
    Manages two-phase synchronization using a manifest for state comparison.

    Export clones each source repository, packs it into a ``.bundle`` file and
    records its metadata in ``manifest.json``. Comparison and import read that
    manifest back and push the selected bundles to the destination server.
    """

    def __init__(
        self,
        git_manager: "GitManager",
        vcs_client: "BaseVCSClient",
        sync_bundle_path: Path,
        logger: logging.Logger,
    ):
        """
        Stores the collaborators and derives the working paths.

        Args:
            git_manager: Wrapper used for clone, bundle, fetch and push operations.
            vcs_client: Client for the VCS server (source on export, destination
                on compare/import).
            sync_bundle_path: Directory holding the bundle files and the manifest.
            logger: Logger used for all progress and error messages.
        """
        self.git_manager = git_manager
        self.vcs_client = vcs_client
        self.sync_bundle_path = sync_bundle_path
        self.logger = logger
        # Scratch area for temporary clones; lives inside the bundle directory.
        self.temp_clone_path = self.sync_bundle_path / "temp_clones"
        self.manifest_path = self.sync_bundle_path / "manifest.json"
        self.logger.debug("SyncManager for offline sync initialized.")

    def _write_manifest(self, manifest_data: Dict[str, Any]):
        """
        Writes the manifest data to a JSON file.

        Args:
            manifest_data: JSON-serializable manifest content.

        Raises:
            IOError: If the manifest file cannot be written (logged, then re-raised).
        """
        self.logger.info(f"Writing manifest file to: {self.manifest_path}")
        try:
            with open(self.manifest_path, "w", encoding="utf-8") as f:
                json.dump(manifest_data, f, indent=4)
            self.logger.info("Manifest file created successfully.")
        except IOError as e:
            self.logger.error(f"Failed to write manifest file: {e}")
            raise

    def export_repositories_to_bundles(
        self,
        source_repos: List[Dict],
        progress_callback: Optional[Callable[[int, int, int], None]] = None
    ):
        """
        Exports repositories to .bundle files and creates a manifest.json.

        A repository that fails to export is logged and left out of the
        manifest; the remaining repositories are still processed.

        Args:
            source_repos: A list of repository dictionaries from the source VCS.
                Each needs "name" and "clone_url"; "description", "private"
                and "owner" are copied into the manifest when present.
            progress_callback: A function to call after each repo is processed.
                It receives (current_index, total_repos, bundle_size_bytes).

        Raises:
            IOError: If the manifest file cannot be written.
        """
        self.logger.info(f"Starting export of {len(source_repos)} repositories...")
        # parents=True so a bundle directory below a not-yet-existing path works.
        self.sync_bundle_path.mkdir(parents=True, exist_ok=True)
        self.temp_clone_path.mkdir(parents=True, exist_ok=True)
        manifest_repos = {}
        total_repos = len(source_repos)
        for position, repo in enumerate(source_repos, start=1):
            repo_name = repo["name"]
            local_repo_path = self.temp_clone_path / repo_name
            bundle_file_path = self.sync_bundle_path / f"{repo_name}.bundle"
            # Stays 0 when the export fails before the bundle is created.
            bundle_size_bytes = 0
            self.logger.info(f"--- Exporting '{repo_name}' ({position}/{total_repos}) ---")
            try:
                # Start from a clean clone directory in case a previous run left one.
                if local_repo_path.exists():
                    shutil.rmtree(local_repo_path, onerror=_remove_readonly)
                self.git_manager.clone(
                    remote_url=repo["clone_url"],
                    local_path=str(local_repo_path),
                    token=self.vcs_client.token
                )
                self.git_manager.create_git_bundle(str(local_repo_path), str(bundle_file_path))
                if bundle_file_path.exists():
                    bundle_size_bytes = bundle_file_path.stat().st_size
                branches = self.git_manager.get_branches_with_heads(str(local_repo_path))
                manifest_repos[repo_name] = {
                    "bundle_file": bundle_file_path.name,
                    "last_update": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                    "branches": branches,
                    "description": repo.get("description", ""),
                    "private": repo.get("private", True),
                    "owner": repo.get("owner"),
                    "clone_url": repo.get("clone_url")
                }
            except (GitCommandError, Exception) as e:
                self.logger.error(f"Failed to export repository '{repo_name}': {e}", exc_info=True)
            finally:
                if local_repo_path.exists():
                    shutil.rmtree(local_repo_path, onerror=_remove_readonly)
                # Call the progress callback regardless of success or failure
                if progress_callback:
                    progress_callback(position, total_repos, bundle_size_bytes)
        full_manifest = {
            "export_timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "source_server_url": self.vcs_client.api_url,
            "repositories": manifest_repos,
        }
        self._write_manifest(full_manifest)
        self.logger.info("Export process finished.")

    def compare_bundles_with_remote(self) -> List[Dict[str, Any]]:
        """
        Compares bundles in the sync path with the remote VCS using the manifest.

        Branch heads recorded in the manifest are compared with those reported
        by the remote. A branch only in the bundle counts as "ahead", one only
        on the remote as "behind"; a branch present on both sides with
        different hashes counts as both, because ancestry cannot be derived
        from the hashes alone.

        Returns:
            One dict per manifest repository with "name" and "state"
            (a SyncState), plus "bundle_info" for comparable entries or
            "details" for entries that could not be compared. An empty list
            is returned when no manifest exists.
        """
        self.logger.info("Comparing local bundles with remote server state...")
        if not self.manifest_path.exists():
            self.logger.warning("manifest.json not found. Cannot compare state.")
            return []
        with open(self.manifest_path, "r", encoding="utf-8") as f:
            manifest = json.load(f)
        comparison_results = []
        manifest_repos = manifest.get("repositories", {})
        for repo_name, bundle_info in manifest_repos.items():
            self.logger.debug(f"Comparing state for '{repo_name}'...")
            owner = bundle_info.get("owner")
            if not owner:
                self.logger.error(f"Missing owner for '{repo_name}' in manifest. Skipping.")
                comparison_results.append({"name": repo_name, "state": SyncState.ERROR, "details": "Missing owner."})
                continue
            remote_repo = self.vcs_client.get_repository(repo_name)
            if not remote_repo:
                # Decide this before asking for branches: a repository that
                # does not exist remotely has no branches to query.
                comparison_results.append({"name": repo_name, "state": SyncState.NEW_REPO, "bundle_info": bundle_info})
                continue
            # Treat a missing/None branch mapping on either side as "no branches".
            remote_branches = self.vcs_client.get_repository_branches(owner, repo_name) or {}
            bundle_branches = bundle_info.get("branches") or {}
            is_ahead = False
            is_behind = False
            for branch in set(bundle_branches) | set(remote_branches):
                bundle_hash = bundle_branches.get(branch)
                remote_hash = remote_branches.get(branch)
                if bundle_hash == remote_hash:
                    continue
                if bundle_hash and not remote_hash:
                    is_ahead = True
                elif not bundle_hash and remote_hash:
                    is_behind = True
                else:
                    # Same branch, different heads: treat as changed on both sides.
                    is_ahead = True
                    is_behind = True
            if is_ahead and is_behind:
                state = SyncState.DIVERGED
            elif is_ahead:
                state = SyncState.AHEAD
            elif is_behind:
                state = SyncState.BEHIND
            else:
                state = SyncState.IDENTICAL
            comparison_results.append({"name": repo_name, "state": state, "bundle_info": bundle_info})
        return comparison_results

    def import_repositories_from_bundles(self, repos_to_import: List[Dict]):
        """
        Imports or updates selected repositories from their bundle files.

        For each entry the destination repository is created if it does not
        exist, cloned locally, updated from the bundle and pushed back.
        Entries without usable bundle information or without a bundle file on
        disk are logged and skipped; a failure in one repository does not stop
        the others.

        Args:
            repos_to_import: Entries shaped like the results of
                compare_bundles_with_remote, each with "name" and "bundle_info".
        """
        # This function could also be updated with a progress callback if needed.
        # For now, we focus on the export part.
        self.logger.info(f"Starting import of {len(repos_to_import)} selected repositories...")
        self.temp_clone_path.mkdir(parents=True, exist_ok=True)
        for repo_info in repos_to_import:
            repo_name = repo_info["name"]
            # Comparison entries in the ERROR state carry no "bundle_info";
            # skip them instead of aborting the whole import with a KeyError.
            bundle_info = repo_info.get("bundle_info")
            if not bundle_info or not bundle_info.get("bundle_file"):
                self.logger.error(f"No bundle information for '{repo_name}'. Skipping.")
                continue
            bundle_file = self.sync_bundle_path / bundle_info["bundle_file"]
            local_repo_path = self.temp_clone_path / repo_name
            if not bundle_file.exists():
                self.logger.error(f"Bundle file {bundle_file} for '{repo_name}' not found. Skipping.")
                continue
            self.logger.info(f"--- Importing '{repo_name}' from bundle ---")
            try:
                dest_repo = self.vcs_client.get_repository(repo_name)
                if not dest_repo:
                    self.logger.info(f"'{repo_name}' not found on destination, creating...")
                    dest_repo = self.vcs_client.create_repository(
                        name=repo_name,
                        description=bundle_info.get("description", ""),
                        private=bundle_info.get("private", True)
                    )
                if local_repo_path.exists():
                    shutil.rmtree(local_repo_path, onerror=_remove_readonly)
                clone_url = dest_repo.get("clone_url")
                if not clone_url:
                    # NOTE(review): api_url may point at an API endpoint rather
                    # than the web root — confirm this fallback yields a valid URL.
                    owner = dest_repo.get('owner') or bundle_info.get('owner')
                    clone_url = f"{self.vcs_client.api_url}/{owner}/{repo_name}.git"
                    self.logger.warning(f"Could not find clone_url, constructed manually: {clone_url}")
                self.logger.info(f"Cloning '{repo_name}' from destination to set up remote...")
                self.git_manager.clone(
                    remote_url=clone_url,
                    local_path=str(local_repo_path),
                    token=self.vcs_client.token
                )
                self.logger.info(f"Fetching updates from bundle for '{repo_name}'...")
                self.git_manager.fetch(repo_path=str(local_repo_path), remote=str(bundle_file))
                self.logger.info(f"Pushing updates to destination for '{repo_name}'...")
                self.git_manager.push(str(local_repo_path), "origin", all_branches=True, all_tags=True)
            except (GitCommandError, Exception) as e:
                self.logger.error(f"Failed to import repository '{repo_name}': {e}", exc_info=True)
            finally:
                if local_repo_path.exists():
                    shutil.rmtree(local_repo_path, onerror=_remove_readonly)
        self.logger.info("Import process finished.")