"""Tests for vault module."""

import time
from contextlib import contextmanager
from pathlib import Path
from typing import Callable, Iterator

import pytest

from src.core.image_manager import create_sparse_image
from src.core.vault import Vault, VaultError, VaultState

# Upper bound on how long a test waits for background sync to land a file,
# and how often it re-checks while waiting.
_SYNC_TIMEOUT_SECONDS = 5.0
_SYNC_POLL_INTERVAL_SECONDS = 0.05


@pytest.fixture
def vault_image(tmp_path: Path) -> Path:
    """Create a test vault image."""
    image_path = tmp_path / "test.vault"
    create_sparse_image(image_path, 10)  # 10 MB
    return image_path


@contextmanager
def _opened_vault(image_path: Path) -> Iterator[Vault]:
    """Yield a fresh ``Vault`` opened on *image_path*; always close it on exit.

    Closing in ``finally`` keeps a failed assertion in one test from leaving
    the vault open for the tests that follow.
    """
    vault = Vault()
    vault.open(image_path)
    try:
        yield vault
    finally:
        vault.close()


def _primary_mount(vault: Vault) -> Path:
    """Return the vault's mount point, failing the test if it is ``None``."""
    mount = vault.mount_point
    assert mount is not None, "vault has no mount point"
    return mount


def _file_has_text(path: Path, expected: str) -> bool:
    """Return True when *path* can be read and its text equals *expected*."""
    try:
        return path.read_text() == expected
    except OSError:
        # Missing (or momentarily unreadable) file: not there yet.
        return False


def _wait_until(
    predicate: Callable[[], bool],
    timeout: float = _SYNC_TIMEOUT_SECONDS,
    interval: float = _SYNC_POLL_INTERVAL_SECONDS,
) -> bool:
    """Poll *predicate* until it returns True or *timeout* seconds elapse.

    Returns the final outcome: True if the predicate succeeded, False if the
    deadline passed first. The predicate is always evaluated at least once.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class TestVault:
    """Tests for Vault class."""

    def test_initial_state(self) -> None:
        """A newly constructed vault is closed, unmounted, and has no replicas."""
        vault = Vault()

        assert vault.state == VaultState.CLOSED
        assert vault.is_open is False
        assert vault.mount_point is None
        assert vault.replica_count == 0

    @pytest.mark.integration
    def test_open_and_close(self, vault_image: Path) -> None:
        """open() mounts the image and close() returns to the initial state."""
        vault = Vault()

        # Open
        mount = vault.open(vault_image)
        try:
            assert vault.is_open
            assert vault.state == VaultState.OPEN
            assert vault.mount_point == mount
            assert mount.exists()
        finally:
            # Close (also on assertion failure, so the vault is not left open)
            vault.close()

        assert vault.state == VaultState.CLOSED
        assert vault.is_open is False
        assert vault.mount_point is None

    @pytest.mark.integration
    def test_context_manager(self, vault_image: Path) -> None:
        """Leaving the ``with`` block leaves the vault in the CLOSED state."""
        with Vault() as vault:
            vault.open(vault_image)
            assert vault.is_open

        assert vault.state == VaultState.CLOSED

    @pytest.mark.integration
    def test_state_change_callback(self, vault_image: Path) -> None:
        """The on_state_change callback sees OPENING, OPEN, and CLOSED."""
        states: list[VaultState] = []

        def on_state_change(state: VaultState) -> None:
            states.append(state)

        vault = Vault(on_state_change=on_state_change)
        vault.open(vault_image)
        vault.close()

        assert VaultState.OPENING in states
        assert VaultState.OPEN in states
        assert VaultState.CLOSED in states

    @pytest.mark.integration
    def test_open_creates_manifest(self, vault_image: Path) -> None:
        """Opening a vault exposes a manifest and writes manifest.json."""
        vault = Vault()
        mount = vault.open(vault_image)
        try:
            assert vault.manifest is not None
            assert vault.manifest.vault_name == "test"  # from filename
            assert (mount / ".vault" / "manifest.json").exists()
        finally:
            vault.close()

    @pytest.mark.integration
    def test_open_already_open_raises(self, vault_image: Path) -> None:
        """Opening an already-open vault raises VaultError."""
        with _opened_vault(vault_image) as vault:
            with pytest.raises(VaultError, match="already open"):
                vault.open(vault_image)

    @pytest.mark.integration
    def test_get_replicas(self, vault_image: Path) -> None:
        """A freshly opened vault reports exactly one mounted primary replica."""
        with _opened_vault(vault_image) as vault:
            replicas = vault.get_replicas()

            assert len(replicas) == 1
            assert replicas[0].is_primary is True
            assert replicas[0].is_mounted is True
            assert replicas[0].image_path == vault_image


class TestVaultLocking:
    """Tests for vault locking."""

    @pytest.mark.integration
    def test_vault_is_locked_when_open(self, vault_image: Path) -> None:
        """The lock file exists while the vault is open and is gone after close."""
        lock_path = vault_image.parent / f".{vault_image.stem}.lock"

        with _opened_vault(vault_image):
            # Lock file should exist
            assert lock_path.exists()

        # Lock should be released
        assert not lock_path.exists()

    @pytest.mark.integration
    def test_second_vault_cannot_open_locked(self, vault_image: Path) -> None:
        """A second Vault cannot open an image another Vault already has open."""
        with _opened_vault(vault_image):
            vault2 = Vault()
            with pytest.raises(VaultError, match="locked"):
                vault2.open(vault_image)


class TestVaultReplicas:
    """Tests for vault replica management."""

    @pytest.mark.integration
    def test_add_replica(self, vault_image: Path, tmp_path: Path) -> None:
        """Adding a replica creates its image and copies existing files to it."""
        with _opened_vault(vault_image) as vault:
            # Create a file in primary
            (_primary_mount(vault) / "test.txt").write_text("hello")

            # Add replica
            replica_path = tmp_path / "replica.vault"
            replica_mount = vault.add_replica(replica_path)

            assert vault.replica_count == 2
            assert replica_path.exists()
            assert (replica_mount / "test.txt").exists()
            assert (replica_mount / "test.txt").read_text() == "hello"

    @pytest.mark.integration
    def test_remove_replica(self, vault_image: Path, tmp_path: Path) -> None:
        """Removing a replica drops the replica count back to one."""
        with _opened_vault(vault_image) as vault:
            # Add replica
            replica_path = tmp_path / "replica.vault"
            vault.add_replica(replica_path)
            assert vault.replica_count == 2

            # Remove replica
            vault.remove_replica(replica_path)
            assert vault.replica_count == 1

    @pytest.mark.integration
    def test_cannot_remove_primary(self, vault_image: Path) -> None:
        """Removing the primary image raises VaultError."""
        with _opened_vault(vault_image) as vault:
            with pytest.raises(VaultError, match="primary"):
                vault.remove_replica(vault_image)


class TestVaultSync:
    """Tests for vault synchronization."""

    @pytest.mark.integration
    def test_file_propagation(self, vault_image: Path, tmp_path: Path) -> None:
        """A file written to the primary shows up in the replica."""
        with _opened_vault(vault_image) as vault:
            # Add replica
            replica_path = tmp_path / "replica.vault"
            replica_mount = vault.add_replica(replica_path)

            # Create file in primary - should propagate to replica
            (_primary_mount(vault) / "sync_test.txt").write_text("synced content")

            # Wait for sync: poll with a deadline instead of a fixed sleep so
            # the test is fast when sync is quick and tolerant when it is slow.
            synced_file = replica_mount / "sync_test.txt"
            _wait_until(lambda: _file_has_text(synced_file, "synced content"))

            assert synced_file.exists()
            assert synced_file.read_text() == "synced content"

    @pytest.mark.integration
    def test_manual_sync(self, vault_image: Path, tmp_path: Path) -> None:
        """Pausing, resuming, and manually syncing completes without error."""
        with _opened_vault(vault_image) as vault:
            primary = _primary_mount(vault)

            # Create file before adding replica
            (primary / "existing.txt").write_text("existing")

            # Add replica (should sync during add)
            replica_path = tmp_path / "replica.vault"
            replica_mount = vault.add_replica(replica_path)
            assert (replica_mount / "existing.txt").exists()

            # Create another file
            (primary / "new.txt").write_text("new")

            # Pause sync and modify
            vault._sync_manager.pause_sync()  # type: ignore
            (primary / "paused.txt").write_text("paused")
            time.sleep(0.3)

            # File shouldn't be in replica yet
            # (might be there due to timing, so just trigger manual sync)

            # Resume and sync manually
            vault._sync_manager.resume_sync()  # type: ignore
            vault.sync()
            # NOTE(review): nothing is asserted after sync(); if sync() is
            # synchronous, consider asserting that "new.txt" and "paused.txt"
            # exist in replica_mount - confirm against Vault.sync's contract.