"""SubsonicAuthService — app-password lifecycle + Subsonic auth verification. The Subsonic protocol authenticates with either ``t=md5(password+salt)`` (+``s``) or the legacy ``p=`` (plaintext or ``enc:``). Both need a *recoverable* secret server-side, so Subsonic clients authenticate against a dedicated, high-entropy app-password — never the argon2 login password. That app-password is encrypted at rest (:class:`~app.domain.ports.SubsonicCipher`) and decrypted only here, to verify a request or to reveal it for copying into a client. This is an adapter over the existing user store; it adds no business state of its own beyond the app-password column (CLAUDE.md: Subsonic is an adapter, not a reimplementation). """ import hashlib import hmac import uuid from app.core.security import generate_subsonic_password from app.domain.entities import User from app.domain.errors import AuthenticationError, NotFoundError, ValidationError from app.domain.ports import SubsonicCipher, UserRepository def _md5_hex(value: str) -> str: return hashlib.md5(value.encode("utf-8"), usedforsecurity=False).hexdigest() def _decode_legacy_password(p: str) -> str: """Decode a Subsonic ``p`` param: ``enc:`` (hex-encoded) or plaintext.""" if p.startswith("enc:"): try: return bytes.fromhex(p[4:]).decode("utf-8") except ValueError as exc: raise AuthenticationError("Wrong username or password.") from exc return p class SubsonicAuthService: def __init__(self, *, users: UserRepository, cipher: SubsonicCipher) -> None: self._users = users self._cipher = cipher async def authenticate( self, *, username: str | None, token: str | None, salt: str | None, password: str | None, ) -> User: """Resolve Subsonic query auth params to a domain :class:`User`. Raises :class:`ValidationError` (Subsonic code 10) for missing params and :class:`AuthenticationError` (code 40) for any credential mismatch — an unknown user is reported identically to a wrong password (no enumeration). """ if not username: raise ValidationError("Required parameter 'u' is missing.") if not ((token and salt) or password): raise ValidationError("Required authentication parameter is missing.") creds = await self._users.get_subsonic_credentials_by_username(username) if creds is None or not creds.user.is_active or creds.password_enc is None: raise AuthenticationError("Wrong username or password.") app_password = self._cipher.decrypt(creds.password_enc) if token and salt: expected = _md5_hex(app_password + salt) if not hmac.compare_digest(expected, token.lower()): raise AuthenticationError("Wrong username or password.") else: assert password is not None # guaranteed by the missing-param check above supplied = _decode_legacy_password(password) if not hmac.compare_digest(supplied, app_password): raise AuthenticationError("Wrong username or password.") return creds.user async def rotate(self, user_id: uuid.UUID) -> str: """Generate a fresh app-password, store it encrypted, return the plaintext.""" await self._require_user(user_id) password = generate_subsonic_password() await self._users.set_subsonic_password_enc(user_id, self._cipher.encrypt(password)) return password async def reveal(self, user_id: uuid.UUID) -> str: """Return the current app-password, generating one on first access.""" await self._require_user(user_id) enc = await self._users.get_subsonic_password_enc(user_id) if enc is None: return await self.rotate(user_id) return self._cipher.decrypt(enc) async def _require_user(self, user_id: uuid.UUID) -> User: user = await self._users.get_by_id(user_id) if user is None: raise NotFoundError("User not found.") return user