"""Tests for VaultLock.""" import multiprocessing import os import tempfile import time from pathlib import Path import pytest from src.core.lock import VaultLock, VaultLockError class TestVaultLock: """Tests for VaultLock class.""" def test_acquire_and_release(self) -> None: """Test basic lock acquire and release.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / ".vault" / "lock" lock = VaultLock(lock_path) assert lock.acquire() assert lock_path.exists() lock.release() def test_lock_creates_directory(self) -> None: """Test that lock creates parent directory if needed.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "nested" / "dir" / "lock" lock = VaultLock(lock_path) assert lock.acquire() assert lock_path.parent.exists() lock.release() def test_lock_writes_pid(self) -> None: """Test that lock file contains PID.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" lock = VaultLock(lock_path) lock.acquire() pid = lock.get_owner_pid() lock.release() assert pid == os.getpid() def test_release_removes_lock_file(self) -> None: """Test that release removes lock file.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" lock = VaultLock(lock_path) lock.acquire() lock.release() assert not lock_path.exists() def test_release_safe_when_not_locked(self) -> None: """Test that release is safe to call when not locked.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" lock = VaultLock(lock_path) # Should not raise lock.release() def test_is_locked_when_not_locked(self) -> None: """Test is_locked returns False when not locked.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" lock = VaultLock(lock_path) assert not lock.is_locked() def test_is_locked_when_locked(self) -> None: """Test is_locked returns True when locked.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" lock = VaultLock(lock_path) lock.acquire() try: # Check from different VaultLock instance other_lock = VaultLock(lock_path) assert other_lock.is_locked() finally: lock.release() def test_context_manager(self) -> None: """Test context manager usage.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" with VaultLock(lock_path): assert lock_path.exists() assert not lock_path.exists() def test_context_manager_raises_when_locked(self) -> None: """Test context manager raises when already locked.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" lock1 = VaultLock(lock_path) lock1.acquire() try: with pytest.raises(VaultLockError): with VaultLock(lock_path): pass finally: lock1.release() def test_get_owner_pid_no_lock_file(self) -> None: """Test get_owner_pid returns None when no lock file.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" lock = VaultLock(lock_path) assert lock.get_owner_pid() is None def _acquire_lock_in_subprocess(lock_path: str, result_queue: multiprocessing.Queue) -> None: """Helper function to acquire lock in subprocess.""" lock = VaultLock(Path(lock_path)) acquired = lock.acquire() result_queue.put(acquired) if acquired: time.sleep(0.5) # Hold lock briefly lock.release() class TestVaultLockMultiprocess: """Tests for VaultLock with multiple processes.""" def test_second_process_cannot_acquire(self) -> None: """Test that second process cannot acquire lock.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" lock = VaultLock(lock_path) # Acquire lock in main process assert lock.acquire() try: # Try to acquire in subprocess result_queue: multiprocessing.Queue = multiprocessing.Queue() process = multiprocessing.Process( target=_acquire_lock_in_subprocess, args=(str(lock_path), result_queue), ) process.start() process.join(timeout=2) # Subprocess should not have acquired the lock acquired_in_subprocess = result_queue.get(timeout=1) assert not acquired_in_subprocess finally: lock.release() def test_process_can_acquire_after_release(self) -> None: """Test that process can acquire lock after it's released.""" with tempfile.TemporaryDirectory() as tmpdir: lock_path = Path(tmpdir) / "lock" lock = VaultLock(lock_path) # Acquire and release lock.acquire() lock.release() # Now subprocess should be able to acquire result_queue: multiprocessing.Queue = multiprocessing.Queue() process = multiprocessing.Process( target=_acquire_lock_in_subprocess, args=(str(lock_path), result_queue), ) process.start() process.join(timeout=2) acquired_in_subprocess = result_queue.get(timeout=1) assert acquired_in_subprocess