feat(subsonic): per-user encrypted app-password foundation

Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret,
but login passwords are stored as a one-way argon2 hash. Add a separate,
per-user app-password: high-entropy, random, and encrypted at rest with a
Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB).

- SubsonicPasswordCipher + generate_subsonic_password in core.security
- users.subsonic_password_enc column (+ Alembic migration), repo + port methods
- SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle
- self-service GET/POST /users/me/subsonic-password + admin rotate endpoint
- domain SubsonicCredentials + SubsonicCipher port; deps wiring

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Senko-san
2026-06-08 18:23:19 +03:00
parent 4ade6939b6
commit 7a17e3babd
17 changed files with 535 additions and 9 deletions
+131
View File
@@ -0,0 +1,131 @@
"""Unit tests for SubsonicAuthService — verification + app-password lifecycle.
DB-free: uses the in-memory user repository and a real cipher.
"""
import hashlib
import pytest
from app.application.subsonic_auth_service import SubsonicAuthService
from app.core.security import SubsonicPasswordCipher
from app.domain.errors import AuthenticationError, ValidationError
from tests.fakes import InMemoryUserRepository
pytestmark = pytest.mark.asyncio
_KNOWN_PASSWORD = "s3cret-app-password"
def _md5(value: str) -> str:
return hashlib.md5(value.encode(), usedforsecurity=False).hexdigest()
async def _service_with_user(*, password: str | None = _KNOWN_PASSWORD, active: bool = True):
users = InMemoryUserRepository()
cipher = SubsonicPasswordCipher("test-key")
user = await users.add(username="alice", password_hash="x", is_superuser=False)
if not active:
await users.set_active(user.id, False)
if password is not None:
await users.set_subsonic_password_enc(user.id, cipher.encrypt(password))
service = SubsonicAuthService(users=users, cipher=cipher)
return service, user
async def test_authenticate_token_salt_success() -> None:
service, user = await _service_with_user()
salt = "abcdef"
token = _md5(_KNOWN_PASSWORD + salt)
result = await service.authenticate(username="alice", token=token, salt=salt, password=None)
assert result.id == user.id
async def test_authenticate_plain_password_success() -> None:
service, user = await _service_with_user()
result = await service.authenticate(
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
)
assert result.id == user.id
async def test_authenticate_enc_password_success() -> None:
service, user = await _service_with_user()
enc = "enc:" + _KNOWN_PASSWORD.encode().hex()
result = await service.authenticate(username="alice", token=None, salt=None, password=enc)
assert result.id == user.id
async def test_authenticate_wrong_token_fails() -> None:
service, _ = await _service_with_user()
with pytest.raises(AuthenticationError):
await service.authenticate(
username="alice", token=_md5("wrong" + "abc"), salt="abc", password=None
)
async def test_authenticate_wrong_password_fails() -> None:
service, _ = await _service_with_user()
with pytest.raises(AuthenticationError):
await service.authenticate(username="alice", token=None, salt=None, password="nope")
async def test_authenticate_unknown_user_fails() -> None:
service, _ = await _service_with_user()
with pytest.raises(AuthenticationError):
await service.authenticate(
username="ghost", token=None, salt=None, password=_KNOWN_PASSWORD
)
async def test_authenticate_inactive_user_fails() -> None:
service, _ = await _service_with_user(active=False)
with pytest.raises(AuthenticationError):
await service.authenticate(
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
)
async def test_authenticate_no_password_set_fails() -> None:
service, _ = await _service_with_user(password=None)
with pytest.raises(AuthenticationError):
await service.authenticate(
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
)
async def test_authenticate_missing_username_is_validation_error() -> None:
service, _ = await _service_with_user()
with pytest.raises(ValidationError):
await service.authenticate(username=None, token=None, salt=None, password=_KNOWN_PASSWORD)
async def test_authenticate_missing_credentials_is_validation_error() -> None:
service, _ = await _service_with_user()
with pytest.raises(ValidationError):
await service.authenticate(username="alice", token=None, salt=None, password=None)
async def test_rotate_then_authenticate() -> None:
users = InMemoryUserRepository()
cipher = SubsonicPasswordCipher("test-key")
user = await users.add(username="bob", password_hash="x", is_superuser=False)
service = SubsonicAuthService(users=users, cipher=cipher)
password = await service.rotate(user.id)
result = await service.authenticate(username="bob", token=None, salt=None, password=password)
assert result.id == user.id
async def test_reveal_generates_then_is_stable() -> None:
users = InMemoryUserRepository()
cipher = SubsonicPasswordCipher("test-key")
user = await users.add(username="cara", password_hash="x", is_superuser=False)
service = SubsonicAuthService(users=users, cipher=cipher)
first = await service.reveal(user.id)
second = await service.reveal(user.id)
assert first == second # lazily generated once, then stable
rotated = await service.rotate(user.id)
assert rotated != first