7a17e3babd
Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret, but login passwords are stored as a one-way argon2 hash. Add a separate, per-user app-password: high-entropy, random, and encrypted at rest with a Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB). - SubsonicPasswordCipher + generate_subsonic_password in core.security - users.subsonic_password_enc column (+ Alembic migration), repo + port methods - SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle - self-service GET/POST /users/me/subsonic-password + admin rotate endpoint - domain SubsonicCredentials + SubsonicCipher port; deps wiring Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
112 lines
4.3 KiB
Python
112 lines
4.3 KiB
Python
"""Security adapters: password hashing (argon2 via pwdlib) and JWT (pyjwt).
|
|
|
|
These are the concrete implementations of the domain ports
|
|
``PasswordHasher`` and ``TokenService``. Vendor crypto libraries are confined
|
|
to this module (CLAUDE.md: security is a cross-cutting concern in ``core``).
|
|
Higher layers depend only on the Protocols, never on pwdlib/pyjwt directly.
|
|
"""
|
|
|
|
import base64
|
|
import datetime as dt
|
|
import hashlib
|
|
import secrets
|
|
import uuid
|
|
|
|
import jwt
|
|
from cryptography.fernet import Fernet, InvalidToken
|
|
from pwdlib import PasswordHash
|
|
|
|
from app.core.config import Settings
|
|
from app.domain.errors import AuthenticationError
|
|
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
|
|
|
|
# Length (in bytes of entropy) of a generated Subsonic app-password. 18 bytes of
|
|
# url-safe base64 → 24 characters, well above the Subsonic auth threat model.
|
|
_SUBSONIC_PASSWORD_ENTROPY_BYTES = 18
|
|
|
|
|
|
def generate_subsonic_password() -> str:
|
|
"""A fresh, high-entropy Subsonic app-password (url-safe, ~24 chars)."""
|
|
return secrets.token_urlsafe(_SUBSONIC_PASSWORD_ENTROPY_BYTES)
|
|
|
|
|
|
class SubsonicPasswordCipher:
|
|
"""Symmetric encrypt/decrypt for the recoverable Subsonic app-password.
|
|
|
|
Subsonic auth (``t=md5(password+salt)`` and legacy ``p=``) needs the plaintext
|
|
password server-side, so — unlike the argon2-hashed login password — the
|
|
app-password is stored *encrypted*, not hashed. A Fernet key (AES-128-CBC +
|
|
HMAC) is derived from the configured secret; the plaintext key never touches
|
|
the DB. Implements :class:`app.domain.ports.SubsonicCipher`.
|
|
"""
|
|
|
|
def __init__(self, secret_key: str) -> None:
|
|
digest = hashlib.sha256(secret_key.encode("utf-8")).digest()
|
|
self._fernet = Fernet(base64.urlsafe_b64encode(digest))
|
|
|
|
def encrypt(self, plaintext: str) -> str:
|
|
return self._fernet.encrypt(plaintext.encode("utf-8")).decode("ascii")
|
|
|
|
def decrypt(self, token: str) -> str:
|
|
try:
|
|
return self._fernet.decrypt(token.encode("ascii")).decode("utf-8")
|
|
except InvalidToken as exc:
|
|
# Wrong/rotated secret key, or corrupted ciphertext.
|
|
raise AuthenticationError("Stored Subsonic password could not be decrypted.") from exc
|
|
|
|
|
|
class Argon2PasswordHasher:
|
|
"""argon2id hasher with sensible defaults from pwdlib."""
|
|
|
|
def __init__(self) -> None:
|
|
self._ph = PasswordHash.recommended()
|
|
|
|
def hash(self, password: str) -> str:
|
|
return self._ph.hash(password)
|
|
|
|
def verify_and_update(self, password: str, password_hash: str) -> tuple[bool, str | None]:
|
|
return self._ph.verify_and_update(password, password_hash)
|
|
|
|
|
|
class JwtTokenService:
|
|
"""Issues and verifies HS256 JWTs for access + refresh tokens.
|
|
|
|
TTLs come from settings (access: short; refresh: long, offline-first).
|
|
Every token carries a unique ``jti`` so refresh tokens can be tracked and
|
|
revoked server-side.
|
|
"""
|
|
|
|
def __init__(self, settings: Settings) -> None:
|
|
self._secret = settings.jwt_secret.get_secret_value()
|
|
self._algorithm = settings.jwt_algorithm
|
|
self._ttl = {
|
|
TokenType.ACCESS: settings.access_token_ttl_seconds,
|
|
TokenType.REFRESH: settings.refresh_token_ttl_seconds,
|
|
}
|
|
|
|
def issue(self, *, subject: uuid.UUID, token_type: TokenType) -> IssuedToken:
|
|
now = dt.datetime.now(dt.UTC)
|
|
expires_at = now + dt.timedelta(seconds=self._ttl[token_type])
|
|
jti = uuid.uuid4()
|
|
payload = {
|
|
"sub": str(subject),
|
|
"type": token_type.value,
|
|
"jti": str(jti),
|
|
"iat": int(now.timestamp()),
|
|
"exp": int(expires_at.timestamp()),
|
|
}
|
|
encoded = jwt.encode(payload, self._secret, algorithm=self._algorithm)
|
|
return IssuedToken(encoded=encoded, jti=jti, expires_at=expires_at)
|
|
|
|
def decode(self, encoded: str) -> TokenClaims:
|
|
try:
|
|
payload = jwt.decode(encoded, self._secret, algorithms=[self._algorithm])
|
|
return TokenClaims(
|
|
subject=uuid.UUID(payload["sub"]),
|
|
token_type=TokenType(payload["type"]),
|
|
jti=uuid.UUID(payload["jti"]),
|
|
expires_at=dt.datetime.fromtimestamp(payload["exp"], tz=dt.UTC),
|
|
)
|
|
except (jwt.InvalidTokenError, KeyError, ValueError) as exc:
|
|
raise AuthenticationError("Invalid or expired token.") from exc
|