Files
Vault/tests/test_sync_manager.py

392 lines
13 KiB
Python
Raw Permalink Normal View History

2026-01-30 07:07:11 +01:00
"""Tests for sync_manager module."""
import time
from pathlib import Path
from threading import Event
import pytest
from src.core.file_watcher import EventType
from src.core.manifest import Manifest
from src.core.sync_manager import (
ReplicaMount,
SyncEvent,
SyncManager,
SyncStatus,
)
class TestReplicaMount:
"""Tests for ReplicaMount dataclass."""
def test_get_file_path(self, tmp_path: Path) -> None:
replica = ReplicaMount(
mount_point=tmp_path / "mount",
image_path=tmp_path / "vault.vault",
is_primary=False,
)
assert replica.get_file_path("docs/file.txt") == tmp_path / "mount" / "docs" / "file.txt"
class TestSyncManager:
"""Tests for SyncManager class."""
def test_initial_state(self) -> None:
manager = SyncManager()
assert manager.status == SyncStatus.IDLE
assert manager.replica_count == 0
assert manager.primary_mount is None
def test_add_replica(self, tmp_path: Path) -> None:
manager = SyncManager()
mount = tmp_path / "mount"
mount.mkdir()
image = tmp_path / "vault.vault"
manager.add_replica(mount, image, is_primary=False)
assert manager.replica_count == 1
assert manager.primary_mount is None
def test_add_primary_replica(self, tmp_path: Path) -> None:
manager = SyncManager()
mount = tmp_path / "mount"
mount.mkdir()
image = tmp_path / "vault.vault"
manager.add_replica(mount, image, is_primary=True)
assert manager.replica_count == 1
assert manager.primary_mount == mount
def test_remove_replica(self, tmp_path: Path) -> None:
manager = SyncManager()
mount = tmp_path / "mount"
mount.mkdir()
image = tmp_path / "vault.vault"
manager.add_replica(mount, image)
assert manager.replica_count == 1
result = manager.remove_replica(mount)
assert result is True
assert manager.replica_count == 0
def test_remove_nonexistent_replica(self, tmp_path: Path) -> None:
manager = SyncManager()
result = manager.remove_replica(tmp_path / "nonexistent")
assert result is False
def test_start_watching_without_primary_raises(self) -> None:
manager = SyncManager()
with pytest.raises(ValueError, match="No primary replica"):
manager.start_watching()
def test_start_and_stop_watching(self, tmp_path: Path) -> None:
manager = SyncManager()
mount = tmp_path / "mount"
mount.mkdir()
image = tmp_path / "vault.vault"
manager.add_replica(mount, image, is_primary=True)
manager.start_watching()
manager.stop_watching()
def test_pause_and_resume_sync(self, tmp_path: Path) -> None:
events: list[SyncEvent] = []
manager = SyncManager(on_sync_event=events.append)
primary = tmp_path / "primary"
secondary = tmp_path / "secondary"
primary.mkdir()
secondary.mkdir()
manager.add_replica(primary, tmp_path / "primary.vault", is_primary=True)
manager.add_replica(secondary, tmp_path / "secondary.vault")
manager.start_watching()
# Pause sync
manager.pause_sync()
# Create file while paused
(primary / "paused.txt").write_text("created while paused")
time.sleep(0.3)
# No events should be recorded
assert len(events) == 0
assert not (secondary / "paused.txt").exists()
manager.stop_watching()
class TestSyncManagerPropagation:
"""Tests for file propagation in SyncManager."""
def test_propagate_file_creation(self, tmp_path: Path) -> None:
events: list[SyncEvent] = []
event_received = Event()
def on_event(event: SyncEvent) -> None:
events.append(event)
event_received.set()
manager = SyncManager(on_sync_event=on_event)
primary = tmp_path / "primary"
secondary = tmp_path / "secondary"
primary.mkdir()
secondary.mkdir()
manager.add_replica(primary, tmp_path / "primary.vault", is_primary=True)
manager.add_replica(secondary, tmp_path / "secondary.vault")
manager.start_watching()
# Create file in primary
(primary / "test.txt").write_text("hello")
# Wait for sync
event_received.wait(timeout=2.0)
manager.stop_watching()
# Check file was synced
assert (secondary / "test.txt").exists()
assert (secondary / "test.txt").read_text() == "hello"
# Check event
created_events = [e for e in events if e.event_type == EventType.CREATED]
assert len(created_events) >= 1
def test_propagate_file_deletion(self, tmp_path: Path) -> None:
events: list[SyncEvent] = []
event_received = Event()
def on_event(event: SyncEvent) -> None:
events.append(event)
if event.event_type == EventType.DELETED:
event_received.set()
manager = SyncManager(on_sync_event=on_event)
primary = tmp_path / "primary"
secondary = tmp_path / "secondary"
primary.mkdir()
secondary.mkdir()
# Create file in both
(primary / "delete.txt").write_text("to delete")
(secondary / "delete.txt").write_text("to delete")
manager.add_replica(primary, tmp_path / "primary.vault", is_primary=True)
manager.add_replica(secondary, tmp_path / "secondary.vault")
manager.start_watching()
# Delete file in primary
(primary / "delete.txt").unlink()
# Wait for sync
event_received.wait(timeout=2.0)
manager.stop_watching()
# Check file was deleted in secondary
assert not (secondary / "delete.txt").exists()
def test_propagate_file_move(self, tmp_path: Path) -> None:
events: list[SyncEvent] = []
event_received = Event()
def on_event(event: SyncEvent) -> None:
events.append(event)
if event.event_type == EventType.MOVED:
event_received.set()
manager = SyncManager(on_sync_event=on_event)
primary = tmp_path / "primary"
secondary = tmp_path / "secondary"
primary.mkdir()
secondary.mkdir()
# Create file in both
(primary / "old.txt").write_text("content")
(secondary / "old.txt").write_text("content")
manager.add_replica(primary, tmp_path / "primary.vault", is_primary=True)
manager.add_replica(secondary, tmp_path / "secondary.vault")
manager.start_watching()
# Move file in primary
(primary / "old.txt").rename(primary / "new.txt")
# Wait for sync
event_received.wait(timeout=2.0)
manager.stop_watching()
# Check file was moved in secondary
assert not (secondary / "old.txt").exists()
assert (secondary / "new.txt").exists()
def test_propagate_to_multiple_replicas(self, tmp_path: Path) -> None:
events: list[SyncEvent] = []
event_received = Event()
def on_event(event: SyncEvent) -> None:
events.append(event)
event_received.set()
manager = SyncManager(on_sync_event=on_event)
primary = tmp_path / "primary"
secondary1 = tmp_path / "secondary1"
secondary2 = tmp_path / "secondary2"
primary.mkdir()
secondary1.mkdir()
secondary2.mkdir()
manager.add_replica(primary, tmp_path / "primary.vault", is_primary=True)
manager.add_replica(secondary1, tmp_path / "secondary1.vault")
manager.add_replica(secondary2, tmp_path / "secondary2.vault")
manager.start_watching()
# Create file in primary
(primary / "multi.txt").write_text("multi content")
# Wait for sync
event_received.wait(timeout=2.0)
time.sleep(0.2) # Extra time for all replicas
manager.stop_watching()
# Check file was synced to all secondaries
assert (secondary1 / "multi.txt").exists()
assert (secondary2 / "multi.txt").exists()
class TestSyncManagerManifestSync:
"""Tests for manifest-based synchronization."""
def test_sync_from_manifest_new_file(self, tmp_path: Path) -> None:
manager = SyncManager()
primary = tmp_path / "primary"
secondary = tmp_path / "secondary"
primary.mkdir()
secondary.mkdir()
(primary / ".vault").mkdir()
(secondary / ".vault").mkdir()
# Create file in primary
(primary / "newfile.txt").write_text("new content")
manager.add_replica(primary, tmp_path / "primary.vault", is_primary=True)
manager.add_replica(secondary, tmp_path / "secondary.vault")
# Create manifests
source_manifest = Manifest.create_new("Test", 100, str(tmp_path / "primary.vault"))
source_manifest.add_file_from_path(primary, primary / "newfile.txt")
target_manifest = Manifest.create_new("Test", 100, str(tmp_path / "secondary.vault"))
# Sync
synced = manager.sync_from_manifest(source_manifest, secondary, target_manifest)
assert synced == 1
assert (secondary / "newfile.txt").exists()
assert (secondary / "newfile.txt").read_text() == "new content"
def test_sync_from_manifest_newer_source(self, tmp_path: Path) -> None:
manager = SyncManager()
primary = tmp_path / "primary"
secondary = tmp_path / "secondary"
primary.mkdir()
secondary.mkdir()
(primary / ".vault").mkdir()
(secondary / ".vault").mkdir()
# Create file in both with different content
(secondary / "update.txt").write_text("old content")
time.sleep(0.1)
(primary / "update.txt").write_text("new content")
manager.add_replica(primary, tmp_path / "primary.vault", is_primary=True)
manager.add_replica(secondary, tmp_path / "secondary.vault")
# Create manifests
source_manifest = Manifest.create_new("Test", 100, str(tmp_path / "primary.vault"))
source_manifest.add_file_from_path(primary, primary / "update.txt")
target_manifest = Manifest.create_new("Test", 100, str(tmp_path / "secondary.vault"))
target_manifest.add_file_from_path(secondary, secondary / "update.txt")
# Sync
synced = manager.sync_from_manifest(source_manifest, secondary, target_manifest)
assert synced == 1
assert (secondary / "update.txt").read_text() == "new content"
def test_sync_from_manifest_deleted_file(self, tmp_path: Path) -> None:
manager = SyncManager()
primary = tmp_path / "primary"
secondary = tmp_path / "secondary"
primary.mkdir()
secondary.mkdir()
(primary / ".vault").mkdir()
(secondary / ".vault").mkdir()
# Create file only in secondary (simulating deletion in primary)
(secondary / "deleted.txt").write_text("will be deleted")
manager.add_replica(primary, tmp_path / "primary.vault", is_primary=True)
manager.add_replica(secondary, tmp_path / "secondary.vault")
# Create manifests - source has no files, target has one
source_manifest = Manifest.create_new("Test", 100, str(tmp_path / "primary.vault"))
target_manifest = Manifest.create_new("Test", 100, str(tmp_path / "secondary.vault"))
target_manifest.add_file_from_path(secondary, secondary / "deleted.txt")
# Sync
synced = manager.sync_from_manifest(source_manifest, secondary, target_manifest)
assert synced == 1
assert not (secondary / "deleted.txt").exists()
def test_full_sync(self, tmp_path: Path) -> None:
manager = SyncManager()
primary = tmp_path / "primary"
secondary1 = tmp_path / "secondary1"
secondary2 = tmp_path / "secondary2"
primary.mkdir()
secondary1.mkdir()
secondary2.mkdir()
(primary / ".vault").mkdir()
(secondary1 / ".vault").mkdir()
(secondary2 / ".vault").mkdir()
# Create files in primary
(primary / "file1.txt").write_text("content1")
(primary / "file2.txt").write_text("content2")
manager.add_replica(primary, tmp_path / "primary.vault", is_primary=True)
manager.add_replica(secondary1, tmp_path / "secondary1.vault")
manager.add_replica(secondary2, tmp_path / "secondary2.vault")
# Create and save primary manifest
primary_manifest = Manifest.create_new("Test", 100, str(tmp_path / "primary.vault"))
primary_manifest.add_file_from_path(primary, primary / "file1.txt")
primary_manifest.add_file_from_path(primary, primary / "file2.txt")
primary_manifest.save(primary)
# Create empty manifests for secondaries
Manifest.create_new("Test", 100, str(tmp_path / "secondary1.vault")).save(secondary1)
Manifest.create_new("Test", 100, str(tmp_path / "secondary2.vault")).save(secondary2)
# Full sync
results = manager.full_sync()
assert results[secondary1] == 2
assert results[secondary2] == 2
assert (secondary1 / "file1.txt").read_text() == "content1"
assert (secondary2 / "file2.txt").read_text() == "content2"