Files
mcma-backend/app/application/subsonic_auth_service.py
T
Senko-san 7a17e3babd feat(subsonic): per-user encrypted app-password foundation
Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret,
but login passwords are stored as a one-way argon2 hash. Add a separate,
per-user app-password: high-entropy, random, and encrypted at rest with a
Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB).

- SubsonicPasswordCipher + generate_subsonic_password in core.security
- users.subsonic_password_enc column (+ Alembic migration), repo + port methods
- SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle
- self-service GET/POST /users/me/subsonic-password + admin rotate endpoint
- domain SubsonicCredentials + SubsonicCipher port; deps wiring

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:23:19 +03:00

101 lines
4.0 KiB
Python

"""SubsonicAuthService — app-password lifecycle + Subsonic auth verification.
The Subsonic protocol authenticates with either ``t=md5(password+salt)`` (+``s``)
or the legacy ``p=`` (plaintext or ``enc:<hex>``). Both need a *recoverable*
secret server-side, so Subsonic clients authenticate against a dedicated,
high-entropy app-password — never the argon2 login password. That app-password
is encrypted at rest (:class:`~app.domain.ports.SubsonicCipher`) and decrypted
only here, to verify a request or to reveal it for copying into a client.
This is an adapter over the existing user store; it adds no business state of its
own beyond the app-password column (CLAUDE.md: Subsonic is an adapter, not a
reimplementation).
"""
import hashlib
import hmac
import uuid
from app.core.security import generate_subsonic_password
from app.domain.entities import User
from app.domain.errors import AuthenticationError, NotFoundError, ValidationError
from app.domain.ports import SubsonicCipher, UserRepository
def _md5_hex(value: str) -> str:
    """Return the hex MD5 digest of ``value`` encoded as UTF-8.

    MD5 is used because the Subsonic token scheme is ``md5(password+salt)``;
    ``usedforsecurity=False`` marks the call as a non-security use of the hash.
    """
    hasher = hashlib.md5(usedforsecurity=False)
    hasher.update(value.encode("utf-8"))
    return hasher.hexdigest()
def _decode_legacy_password(p: str) -> str:
    """Return the plaintext carried by a Subsonic ``p`` parameter.

    A value starting with ``enc:`` is treated as hex-encoded UTF-8 and decoded;
    any other value is returned unchanged as plaintext.

    Raises:
        AuthenticationError: the ``enc:`` payload is not valid hex, or the
            decoded bytes are not valid UTF-8.
    """
    if not p.startswith("enc:"):
        return p

    hex_payload = p[4:]
    try:
        raw = bytes.fromhex(hex_payload)
        # UnicodeDecodeError is a ValueError subclass, so it is caught below too.
        return raw.decode("utf-8")
    except ValueError as exc:
        raise AuthenticationError("Wrong username or password.") from exc
class SubsonicAuthService:
    """Verify Subsonic request credentials and manage per-user app-passwords.

    The encrypted app-password is read and written through the injected
    ``users`` repository and encrypted/decrypted with the injected ``cipher``.
    """

    def __init__(self, *, users: UserRepository, cipher: SubsonicCipher) -> None:
        self._users = users
        self._cipher = cipher

    @staticmethod
    def _secrets_match(left: str, right: str) -> bool:
        """Compare two strings in constant time, tolerating non-ASCII input.

        ``hmac.compare_digest`` raises ``TypeError`` when given a ``str`` that
        contains non-ASCII characters, and the values compared here come from
        caller-supplied request parameters. Comparing the UTF-8 bytes instead
        turns such input into an ordinary mismatch rather than an unhandled
        exception.
        """
        # ``surrogatepass`` keeps the encoding total, so a lone surrogate in the
        # input cannot raise ``UnicodeEncodeError`` here either.
        return hmac.compare_digest(
            left.encode("utf-8", "surrogatepass"),
            right.encode("utf-8", "surrogatepass"),
        )

    async def authenticate(
        self,
        *,
        username: str | None,
        token: str | None,
        salt: str | None,
        password: str | None,
    ) -> User:
        """Resolve Subsonic query auth params to a domain :class:`User`.

        A ``token`` + ``salt`` pair takes precedence; ``password`` is only
        consulted when either of them is missing or empty.

        Raises:
            ValidationError: (Subsonic code 10) ``username`` is missing, or
                neither a token/salt pair nor a password was supplied.
            AuthenticationError: (Subsonic code 40) any credential mismatch. An
                unknown user, an inactive user and a user without a stored
                app-password are reported identically to a wrong password
                (no enumeration).
        """
        if not username:
            raise ValidationError("Required parameter 'u' is missing.")
        if not ((token and salt) or password):
            raise ValidationError("Required authentication parameter is missing.")

        creds = await self._users.get_subsonic_credentials_by_username(username)
        if creds is None or not creds.user.is_active or creds.password_enc is None:
            raise AuthenticationError("Wrong username or password.")

        app_password = self._cipher.decrypt(creds.password_enc)
        if token and salt:
            expected = _md5_hex(app_password + salt)
            # Lower-case the supplied token so an upper-case hex digest matches.
            matches = self._secrets_match(expected, token.lower())
        elif password:
            supplied = _decode_legacy_password(password)
            matches = self._secrets_match(supplied, app_password)
        else:
            # Unreachable given the parameter check above; fail closed instead
            # of relying on an ``assert`` that is stripped under ``-O``.
            matches = False

        if not matches:
            raise AuthenticationError("Wrong username or password.")
        return creds.user

    async def rotate(self, user_id: uuid.UUID) -> str:
        """Generate a fresh app-password, store it encrypted, return the plaintext.

        Raises:
            NotFoundError: no user exists with ``user_id``.
        """
        await self._require_user(user_id)
        password = generate_subsonic_password()
        encrypted = self._cipher.encrypt(password)
        await self._users.set_subsonic_password_enc(user_id, encrypted)
        return password

    async def reveal(self, user_id: uuid.UUID) -> str:
        """Return the current app-password, generating one on first access.

        Raises:
            NotFoundError: no user exists with ``user_id``.
        """
        await self._require_user(user_id)
        enc = await self._users.get_subsonic_password_enc(user_id)
        if enc is None:
            # Nothing stored yet: create, persist and return a new password.
            return await self.rotate(user_id)
        return self._cipher.decrypt(enc)

    async def _require_user(self, user_id: uuid.UUID) -> User:
        """Return the user with ``user_id`` or raise :class:`NotFoundError`."""
        user = await self._users.get_by_id(user_id)
        if user is None:
            raise NotFoundError("User not found.")
        return user