183 lines
6.0 KiB
Python
183 lines
6.0 KiB
Python
"""Tests for VaultLock."""
|
|
|
|
import multiprocessing
|
|
import os
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from src.core.lock import VaultLock, VaultLockError
|
|
|
|
|
|
class TestVaultLock:
    """Single-process tests for VaultLock.

    Every test works inside its own temporary directory. Whenever a lock is
    held across a statement that may fail, it is released in a ``finally``
    block so a failing assertion never leaves the lock held while the
    temporary directory is being cleaned up.
    """

    def test_acquire_and_release(self) -> None:
        """Test basic lock acquire and release."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / ".vault" / "lock"
            lock = VaultLock(lock_path)

            assert lock.acquire()
            try:
                assert lock_path.exists()
            finally:
                # Release even when the assertion above fails.
                lock.release()

    def test_lock_creates_directory(self) -> None:
        """Test that lock creates parent directory if needed."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "nested" / "dir" / "lock"
            lock = VaultLock(lock_path)

            assert lock.acquire()
            try:
                assert lock_path.parent.exists()
            finally:
                lock.release()

    def test_lock_writes_pid(self) -> None:
        """Test that lock file contains PID."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"
            lock = VaultLock(lock_path)

            lock.acquire()
            try:
                pid = lock.get_owner_pid()
            finally:
                # Release even if reading the owner PID raises.
                lock.release()

            assert pid == os.getpid()

    def test_release_removes_lock_file(self) -> None:
        """Test that release removes lock file."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"
            lock = VaultLock(lock_path)

            lock.acquire()
            lock.release()

            assert not lock_path.exists()

    def test_release_safe_when_not_locked(self) -> None:
        """Test that release is safe to call when not locked."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"
            lock = VaultLock(lock_path)

            # Should not raise
            lock.release()

    def test_is_locked_when_not_locked(self) -> None:
        """Test is_locked returns False when not locked."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"
            lock = VaultLock(lock_path)

            assert not lock.is_locked()

    def test_is_locked_when_locked(self) -> None:
        """Test is_locked returns True when locked."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"
            lock = VaultLock(lock_path)

            lock.acquire()
            try:
                # Check from different VaultLock instance
                other_lock = VaultLock(lock_path)
                assert other_lock.is_locked()
            finally:
                lock.release()

    def test_context_manager(self) -> None:
        """Test context manager usage."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"

            with VaultLock(lock_path):
                assert lock_path.exists()

            # Leaving the ``with`` block must remove the lock file again.
            assert not lock_path.exists()

    def test_context_manager_raises_when_locked(self) -> None:
        """Test context manager raises when already locked."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"
            lock1 = VaultLock(lock_path)
            lock1.acquire()

            try:
                with pytest.raises(VaultLockError):
                    with VaultLock(lock_path):
                        pass
            finally:
                lock1.release()

    def test_get_owner_pid_no_lock_file(self) -> None:
        """Test get_owner_pid returns None when no lock file."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"
            lock = VaultLock(lock_path)

            assert lock.get_owner_pid() is None
|
|
|
|
|
|
def _acquire_lock_in_subprocess(lock_path: str, result_queue: multiprocessing.Queue) -> None:
    """Try to acquire the lock at ``lock_path`` and report the outcome.

    Meant to be used as a ``multiprocessing.Process`` target. The value
    returned by ``VaultLock.acquire()`` is put on ``result_queue`` right away;
    when the lock was obtained it is held for half a second and then released.

    Args:
        lock_path: Lock file location, passed as a string and converted to a
            ``Path`` here.
        result_queue: Queue that receives the result of ``acquire()``.
    """
    vault_lock = VaultLock(Path(lock_path))
    got_lock = vault_lock.acquire()
    result_queue.put(got_lock)

    if not got_lock:
        return

    time.sleep(0.5)  # Hold lock briefly
    vault_lock.release()
|
|
|
|
|
|
class TestVaultLockMultiprocess:
    """Tests for VaultLock with multiple processes."""

    @staticmethod
    def _acquire_in_child(lock_path: Path) -> bool:
        """Run ``_acquire_lock_in_subprocess`` in a child process.

        Args:
            lock_path: Lock file the child should try to acquire.

        Returns:
            The value the child put on the result queue, i.e. the result of
            its ``VaultLock.acquire()`` call.

        Raises:
            queue.Empty: If the child reports nothing within the timeout.
        """
        result_queue: multiprocessing.Queue = multiprocessing.Queue()
        process = multiprocessing.Process(
            target=_acquire_lock_in_subprocess,
            args=(str(lock_path), result_queue),
        )
        process.start()
        try:
            # Read the result *before* joining: the multiprocessing guidelines
            # advise removing queued items before the producing process is
            # joined. The timeout leaves room for slow process start-up.
            return result_queue.get(timeout=3)
        finally:
            # Give the child time to release the lock and exit by itself, and
            # make sure it never outlives the test (and its temp directory).
            process.join(timeout=2)
            if process.is_alive():
                process.terminate()
                process.join()

    def test_second_process_cannot_acquire(self) -> None:
        """Test that second process cannot acquire lock."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"
            lock = VaultLock(lock_path)

            # Acquire lock in main process
            assert lock.acquire()

            try:
                # Try to acquire in subprocess
                acquired_in_subprocess = self._acquire_in_child(lock_path)

                # Subprocess should not have acquired the lock
                assert not acquired_in_subprocess
            finally:
                lock.release()

    def test_process_can_acquire_after_release(self) -> None:
        """Test that process can acquire lock after it's released."""
        with tempfile.TemporaryDirectory() as tmpdir:
            lock_path = Path(tmpdir) / "lock"
            lock = VaultLock(lock_path)

            # Acquire and release
            lock.acquire()
            lock.release()

            # Now subprocess should be able to acquire
            acquired_in_subprocess = self._acquire_in_child(lock_path)
            assert acquired_in_subprocess
|