7a17e3babd
Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret, but login passwords are stored as a one-way argon2 hash. Add a separate, per-user app-password: high-entropy, random, and encrypted at rest with a Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB). - SubsonicPasswordCipher + generate_subsonic_password in core.security - users.subsonic_password_enc column (+ Alembic migration), repo + port methods - SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle - self-service GET/POST /users/me/subsonic-password + admin rotate endpoint - domain SubsonicCredentials + SubsonicCipher port; deps wiring Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
132 lines
4.7 KiB
Python
132 lines
4.7 KiB
Python
"""Unit tests for SubsonicAuthService — verification + app-password lifecycle.
|
|
|
|
DB-free: uses the in-memory user repository and a real cipher.
|
|
"""
|
|
|
|
import hashlib
|
|
|
|
import pytest
|
|
from app.application.subsonic_auth_service import SubsonicAuthService
|
|
from app.core.security import SubsonicPasswordCipher
|
|
from app.domain.errors import AuthenticationError, ValidationError
|
|
|
|
from tests.fakes import InMemoryUserRepository
|
|
|
|
pytestmark = pytest.mark.asyncio
|
|
|
|
_KNOWN_PASSWORD = "s3cret-app-password"
|
|
|
|
|
|
def _md5(value: str) -> str:
|
|
return hashlib.md5(value.encode(), usedforsecurity=False).hexdigest()
|
|
|
|
|
|
async def _service_with_user(*, password: str | None = _KNOWN_PASSWORD, active: bool = True):
|
|
users = InMemoryUserRepository()
|
|
cipher = SubsonicPasswordCipher("test-key")
|
|
user = await users.add(username="alice", password_hash="x", is_superuser=False)
|
|
if not active:
|
|
await users.set_active(user.id, False)
|
|
if password is not None:
|
|
await users.set_subsonic_password_enc(user.id, cipher.encrypt(password))
|
|
service = SubsonicAuthService(users=users, cipher=cipher)
|
|
return service, user
|
|
|
|
|
|
async def test_authenticate_token_salt_success() -> None:
|
|
service, user = await _service_with_user()
|
|
salt = "abcdef"
|
|
token = _md5(_KNOWN_PASSWORD + salt)
|
|
result = await service.authenticate(username="alice", token=token, salt=salt, password=None)
|
|
assert result.id == user.id
|
|
|
|
|
|
async def test_authenticate_plain_password_success() -> None:
|
|
service, user = await _service_with_user()
|
|
result = await service.authenticate(
|
|
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
|
|
)
|
|
assert result.id == user.id
|
|
|
|
|
|
async def test_authenticate_enc_password_success() -> None:
|
|
service, user = await _service_with_user()
|
|
enc = "enc:" + _KNOWN_PASSWORD.encode().hex()
|
|
result = await service.authenticate(username="alice", token=None, salt=None, password=enc)
|
|
assert result.id == user.id
|
|
|
|
|
|
async def test_authenticate_wrong_token_fails() -> None:
|
|
service, _ = await _service_with_user()
|
|
with pytest.raises(AuthenticationError):
|
|
await service.authenticate(
|
|
username="alice", token=_md5("wrong" + "abc"), salt="abc", password=None
|
|
)
|
|
|
|
|
|
async def test_authenticate_wrong_password_fails() -> None:
|
|
service, _ = await _service_with_user()
|
|
with pytest.raises(AuthenticationError):
|
|
await service.authenticate(username="alice", token=None, salt=None, password="nope")
|
|
|
|
|
|
async def test_authenticate_unknown_user_fails() -> None:
|
|
service, _ = await _service_with_user()
|
|
with pytest.raises(AuthenticationError):
|
|
await service.authenticate(
|
|
username="ghost", token=None, salt=None, password=_KNOWN_PASSWORD
|
|
)
|
|
|
|
|
|
async def test_authenticate_inactive_user_fails() -> None:
|
|
service, _ = await _service_with_user(active=False)
|
|
with pytest.raises(AuthenticationError):
|
|
await service.authenticate(
|
|
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
|
|
)
|
|
|
|
|
|
async def test_authenticate_no_password_set_fails() -> None:
|
|
service, _ = await _service_with_user(password=None)
|
|
with pytest.raises(AuthenticationError):
|
|
await service.authenticate(
|
|
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
|
|
)
|
|
|
|
|
|
async def test_authenticate_missing_username_is_validation_error() -> None:
|
|
service, _ = await _service_with_user()
|
|
with pytest.raises(ValidationError):
|
|
await service.authenticate(username=None, token=None, salt=None, password=_KNOWN_PASSWORD)
|
|
|
|
|
|
async def test_authenticate_missing_credentials_is_validation_error() -> None:
|
|
service, _ = await _service_with_user()
|
|
with pytest.raises(ValidationError):
|
|
await service.authenticate(username="alice", token=None, salt=None, password=None)
|
|
|
|
|
|
async def test_rotate_then_authenticate() -> None:
|
|
users = InMemoryUserRepository()
|
|
cipher = SubsonicPasswordCipher("test-key")
|
|
user = await users.add(username="bob", password_hash="x", is_superuser=False)
|
|
service = SubsonicAuthService(users=users, cipher=cipher)
|
|
|
|
password = await service.rotate(user.id)
|
|
result = await service.authenticate(username="bob", token=None, salt=None, password=password)
|
|
assert result.id == user.id
|
|
|
|
|
|
async def test_reveal_generates_then_is_stable() -> None:
|
|
users = InMemoryUserRepository()
|
|
cipher = SubsonicPasswordCipher("test-key")
|
|
user = await users.add(username="cara", password_hash="x", is_superuser=False)
|
|
service = SubsonicAuthService(users=users, cipher=cipher)
|
|
|
|
first = await service.reveal(user.id)
|
|
second = await service.reveal(user.id)
|
|
assert first == second # lazily generated once, then stable
|
|
|
|
rotated = await service.rotate(user.id)
|
|
assert rotated != first
|