234 lines
6.7 KiB
Python
234 lines
6.7 KiB
Python
|
|
"""Tests for vault module."""
|
||
|
|
|
||
|
|
import time
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from src.core.image_manager import create_sparse_image
|
||
|
|
from src.core.vault import Vault, VaultError, VaultState
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
def vault_image(tmp_path: Path) -> Path:
|
||
|
|
"""Create a test vault image."""
|
||
|
|
image_path = tmp_path / "test.vault"
|
||
|
|
create_sparse_image(image_path, 10) # 10 MB
|
||
|
|
return image_path
|
||
|
|
|
||
|
|
|
||
|
|
class TestVault:
|
||
|
|
"""Tests for Vault class."""
|
||
|
|
|
||
|
|
def test_initial_state(self) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
assert vault.state == VaultState.CLOSED
|
||
|
|
assert vault.is_open is False
|
||
|
|
assert vault.mount_point is None
|
||
|
|
assert vault.replica_count == 0
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_open_and_close(self, vault_image: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
|
||
|
|
# Open
|
||
|
|
mount = vault.open(vault_image)
|
||
|
|
assert vault.is_open
|
||
|
|
assert vault.state == VaultState.OPEN
|
||
|
|
assert vault.mount_point == mount
|
||
|
|
assert mount.exists()
|
||
|
|
|
||
|
|
# Close
|
||
|
|
vault.close()
|
||
|
|
assert vault.state == VaultState.CLOSED
|
||
|
|
assert vault.is_open is False
|
||
|
|
assert vault.mount_point is None
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_context_manager(self, vault_image: Path) -> None:
|
||
|
|
with Vault() as vault:
|
||
|
|
vault.open(vault_image)
|
||
|
|
assert vault.is_open
|
||
|
|
|
||
|
|
assert vault.state == VaultState.CLOSED
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_state_change_callback(self, vault_image: Path) -> None:
|
||
|
|
states: list[VaultState] = []
|
||
|
|
|
||
|
|
def on_state_change(state: VaultState) -> None:
|
||
|
|
states.append(state)
|
||
|
|
|
||
|
|
vault = Vault(on_state_change=on_state_change)
|
||
|
|
vault.open(vault_image)
|
||
|
|
vault.close()
|
||
|
|
|
||
|
|
assert VaultState.OPENING in states
|
||
|
|
assert VaultState.OPEN in states
|
||
|
|
assert VaultState.CLOSED in states
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_open_creates_manifest(self, vault_image: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
mount = vault.open(vault_image)
|
||
|
|
|
||
|
|
assert vault.manifest is not None
|
||
|
|
assert vault.manifest.vault_name == "test" # from filename
|
||
|
|
assert (mount / ".vault" / "manifest.json").exists()
|
||
|
|
|
||
|
|
vault.close()
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_open_already_open_raises(self, vault_image: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
vault.open(vault_image)
|
||
|
|
|
||
|
|
with pytest.raises(VaultError, match="already open"):
|
||
|
|
vault.open(vault_image)
|
||
|
|
|
||
|
|
vault.close()
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_get_replicas(self, vault_image: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
vault.open(vault_image)
|
||
|
|
|
||
|
|
replicas = vault.get_replicas()
|
||
|
|
assert len(replicas) == 1
|
||
|
|
assert replicas[0].is_primary is True
|
||
|
|
assert replicas[0].is_mounted is True
|
||
|
|
assert replicas[0].image_path == vault_image
|
||
|
|
|
||
|
|
vault.close()
|
||
|
|
|
||
|
|
|
||
|
|
class TestVaultLocking:
|
||
|
|
"""Tests for vault locking."""
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_vault_is_locked_when_open(self, vault_image: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
vault.open(vault_image)
|
||
|
|
|
||
|
|
# Lock file should exist
|
||
|
|
lock_path = vault_image.parent / f".{vault_image.stem}.lock"
|
||
|
|
assert lock_path.exists()
|
||
|
|
|
||
|
|
vault.close()
|
||
|
|
|
||
|
|
# Lock should be released
|
||
|
|
assert not lock_path.exists()
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_second_vault_cannot_open_locked(self, vault_image: Path) -> None:
|
||
|
|
vault1 = Vault()
|
||
|
|
vault1.open(vault_image)
|
||
|
|
|
||
|
|
vault2 = Vault()
|
||
|
|
with pytest.raises(VaultError, match="locked"):
|
||
|
|
vault2.open(vault_image)
|
||
|
|
|
||
|
|
vault1.close()
|
||
|
|
|
||
|
|
|
||
|
|
class TestVaultReplicas:
|
||
|
|
"""Tests for vault replica management."""
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_add_replica(self, vault_image: Path, tmp_path: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
vault.open(vault_image)
|
||
|
|
|
||
|
|
# Create a file in primary
|
||
|
|
(vault.mount_point / "test.txt").write_text("hello") # type: ignore
|
||
|
|
|
||
|
|
# Add replica
|
||
|
|
replica_path = tmp_path / "replica.vault"
|
||
|
|
replica_mount = vault.add_replica(replica_path)
|
||
|
|
|
||
|
|
assert vault.replica_count == 2
|
||
|
|
assert replica_path.exists()
|
||
|
|
assert (replica_mount / "test.txt").exists()
|
||
|
|
assert (replica_mount / "test.txt").read_text() == "hello"
|
||
|
|
|
||
|
|
vault.close()
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_remove_replica(self, vault_image: Path, tmp_path: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
vault.open(vault_image)
|
||
|
|
|
||
|
|
# Add replica
|
||
|
|
replica_path = tmp_path / "replica.vault"
|
||
|
|
vault.add_replica(replica_path)
|
||
|
|
assert vault.replica_count == 2
|
||
|
|
|
||
|
|
# Remove replica
|
||
|
|
vault.remove_replica(replica_path)
|
||
|
|
assert vault.replica_count == 1
|
||
|
|
|
||
|
|
vault.close()
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_cannot_remove_primary(self, vault_image: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
vault.open(vault_image)
|
||
|
|
|
||
|
|
with pytest.raises(VaultError, match="primary"):
|
||
|
|
vault.remove_replica(vault_image)
|
||
|
|
|
||
|
|
vault.close()
|
||
|
|
|
||
|
|
|
||
|
|
class TestVaultSync:
|
||
|
|
"""Tests for vault synchronization."""
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_file_propagation(self, vault_image: Path, tmp_path: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
vault.open(vault_image)
|
||
|
|
|
||
|
|
# Add replica
|
||
|
|
replica_path = tmp_path / "replica.vault"
|
||
|
|
replica_mount = vault.add_replica(replica_path)
|
||
|
|
|
||
|
|
# Create file in primary - should propagate to replica
|
||
|
|
(vault.mount_point / "sync_test.txt").write_text("synced content") # type: ignore
|
||
|
|
time.sleep(0.5) # Wait for sync
|
||
|
|
|
||
|
|
assert (replica_mount / "sync_test.txt").exists()
|
||
|
|
assert (replica_mount / "sync_test.txt").read_text() == "synced content"
|
||
|
|
|
||
|
|
vault.close()
|
||
|
|
|
||
|
|
@pytest.mark.integration
|
||
|
|
def test_manual_sync(self, vault_image: Path, tmp_path: Path) -> None:
|
||
|
|
vault = Vault()
|
||
|
|
vault.open(vault_image)
|
||
|
|
|
||
|
|
# Create file before adding replica
|
||
|
|
(vault.mount_point / "existing.txt").write_text("existing") # type: ignore
|
||
|
|
|
||
|
|
# Add replica (should sync during add)
|
||
|
|
replica_path = tmp_path / "replica.vault"
|
||
|
|
replica_mount = vault.add_replica(replica_path)
|
||
|
|
|
||
|
|
assert (replica_mount / "existing.txt").exists()
|
||
|
|
|
||
|
|
# Create another file
|
||
|
|
(vault.mount_point / "new.txt").write_text("new") # type: ignore
|
||
|
|
|
||
|
|
# Pause sync and modify
|
||
|
|
vault._sync_manager.pause_sync() # type: ignore
|
||
|
|
(vault.mount_point / "paused.txt").write_text("paused") # type: ignore
|
||
|
|
time.sleep(0.3)
|
||
|
|
|
||
|
|
# File shouldn't be in replica yet
|
||
|
|
# (might be there due to timing, so just trigger manual sync)
|
||
|
|
|
||
|
|
# Resume and sync manually
|
||
|
|
vault._sync_manager.resume_sync() # type: ignore
|
||
|
|
vault.sync()
|
||
|
|
|
||
|
|
vault.close()
|