Files
Vault/tests/test_lock.py

183 lines
6.0 KiB
Python

"""Tests for VaultLock."""
import multiprocessing
import os
import tempfile
import time
from pathlib import Path
import pytest
from src.core.lock import VaultLock, VaultLockError
class TestVaultLock:
"""Tests for VaultLock class."""
def test_acquire_and_release(self) -> None:
"""Test basic lock acquire and release."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / ".vault" / "lock"
lock = VaultLock(lock_path)
assert lock.acquire()
assert lock_path.exists()
lock.release()
def test_lock_creates_directory(self) -> None:
"""Test that lock creates parent directory if needed."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "nested" / "dir" / "lock"
lock = VaultLock(lock_path)
assert lock.acquire()
assert lock_path.parent.exists()
lock.release()
def test_lock_writes_pid(self) -> None:
"""Test that lock file contains PID."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
lock = VaultLock(lock_path)
lock.acquire()
pid = lock.get_owner_pid()
lock.release()
assert pid == os.getpid()
def test_release_removes_lock_file(self) -> None:
"""Test that release removes lock file."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
lock = VaultLock(lock_path)
lock.acquire()
lock.release()
assert not lock_path.exists()
def test_release_safe_when_not_locked(self) -> None:
"""Test that release is safe to call when not locked."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
lock = VaultLock(lock_path)
# Should not raise
lock.release()
def test_is_locked_when_not_locked(self) -> None:
"""Test is_locked returns False when not locked."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
lock = VaultLock(lock_path)
assert not lock.is_locked()
def test_is_locked_when_locked(self) -> None:
"""Test is_locked returns True when locked."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
lock = VaultLock(lock_path)
lock.acquire()
try:
# Check from different VaultLock instance
other_lock = VaultLock(lock_path)
assert other_lock.is_locked()
finally:
lock.release()
def test_context_manager(self) -> None:
"""Test context manager usage."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
with VaultLock(lock_path):
assert lock_path.exists()
assert not lock_path.exists()
def test_context_manager_raises_when_locked(self) -> None:
"""Test context manager raises when already locked."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
lock1 = VaultLock(lock_path)
lock1.acquire()
try:
with pytest.raises(VaultLockError):
with VaultLock(lock_path):
pass
finally:
lock1.release()
def test_get_owner_pid_no_lock_file(self) -> None:
"""Test get_owner_pid returns None when no lock file."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
lock = VaultLock(lock_path)
assert lock.get_owner_pid() is None
def _acquire_lock_in_subprocess(lock_path: str, result_queue: multiprocessing.Queue) -> None:
"""Helper function to acquire lock in subprocess."""
lock = VaultLock(Path(lock_path))
acquired = lock.acquire()
result_queue.put(acquired)
if acquired:
time.sleep(0.5) # Hold lock briefly
lock.release()
class TestVaultLockMultiprocess:
"""Tests for VaultLock with multiple processes."""
def test_second_process_cannot_acquire(self) -> None:
"""Test that second process cannot acquire lock."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
lock = VaultLock(lock_path)
# Acquire lock in main process
assert lock.acquire()
try:
# Try to acquire in subprocess
result_queue: multiprocessing.Queue = multiprocessing.Queue()
process = multiprocessing.Process(
target=_acquire_lock_in_subprocess,
args=(str(lock_path), result_queue),
)
process.start()
process.join(timeout=2)
# Subprocess should not have acquired the lock
acquired_in_subprocess = result_queue.get(timeout=1)
assert not acquired_in_subprocess
finally:
lock.release()
def test_process_can_acquire_after_release(self) -> None:
"""Test that process can acquire lock after it's released."""
with tempfile.TemporaryDirectory() as tmpdir:
lock_path = Path(tmpdir) / "lock"
lock = VaultLock(lock_path)
# Acquire and release
lock.acquire()
lock.release()
# Now subprocess should be able to acquire
result_queue: multiprocessing.Queue = multiprocessing.Queue()
process = multiprocessing.Process(
target=_acquire_lock_in_subprocess,
args=(str(lock_path), result_queue),
)
process.start()
process.join(timeout=2)
acquired_in_subprocess = result_queue.get(timeout=1)
assert acquired_in_subprocess