Files
mcma-backend/app/core/security.py
T
Senko-san 7a17e3babd feat(subsonic): per-user encrypted app-password foundation
Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret,
but login passwords are stored as a one-way argon2 hash. Add a separate,
per-user app-password: high-entropy, random, and encrypted at rest with a
Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB).

- SubsonicPasswordCipher + generate_subsonic_password in core.security
- users.subsonic_password_enc column (+ Alembic migration), repo + port methods
- SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle
- self-service GET/POST /users/me/subsonic-password + admin rotate endpoint
- domain SubsonicCredentials + SubsonicCipher port; deps wiring

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:23:19 +03:00

112 lines
4.3 KiB
Python

"""Security adapters: password hashing (argon2 via pwdlib) and JWT (pyjwt).
These are the concrete implementations of the domain ports
``PasswordHasher`` and ``TokenService``; the module also hosts
``SubsonicPasswordCipher`` (Fernet via cryptography), which implements the
``SubsonicCipher`` port. Vendor crypto libraries are confined
to this module (CLAUDE.md: security is a cross-cutting concern in ``core``).
Higher layers depend only on the Protocols, never on pwdlib/pyjwt/cryptography
directly.
"""
import base64
import datetime as dt
import hashlib
import secrets
import uuid
import jwt
from cryptography.fernet import Fernet, InvalidToken
from pwdlib import PasswordHash
from app.core.config import Settings
from app.domain.errors import AuthenticationError
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
# Number of random bytes behind every generated Subsonic app-password. Rendered
# as url-safe base64, 18 bytes come out as a 24-character string.
_SUBSONIC_PASSWORD_ENTROPY_BYTES = 18


def generate_subsonic_password() -> str:
    """Return a newly generated, random Subsonic app-password.

    The value comes from ``secrets.token_urlsafe`` fed with
    ``_SUBSONIC_PASSWORD_ENTROPY_BYTES`` random bytes, so it consists only of
    url-safe base64 characters (24 of them for the current setting).
    """
    app_password = secrets.token_urlsafe(nbytes=_SUBSONIC_PASSWORD_ENTROPY_BYTES)
    return app_password
class SubsonicPasswordCipher:
    """Symmetric encrypt/decrypt for the recoverable Subsonic app-password.

    Subsonic auth (``t=md5(password+salt)`` and legacy ``p=``) needs the plaintext
    password server-side, so — unlike the argon2-hashed login password — the
    app-password is stored *encrypted*, not hashed. A Fernet key (AES-128-CBC +
    HMAC) is derived from the configured secret; the plaintext key never touches
    the DB. Implements :class:`app.domain.ports.SubsonicCipher`.
    """

    def __init__(self, secret_key: str) -> None:
        """Derive the Fernet key from ``secret_key``.

        The secret is reduced to its 32-byte SHA-256 digest and that digest is
        url-safe base64 encoded, which is the key format ``Fernet`` accepts.
        """
        # NOTE(review): this is a single unsalted SHA-256, not a stretching KDF,
        # so the strength of the key is exactly the strength of ``secret_key``.
        # Changing the derivation would invalidate every stored ciphertext.
        digest = hashlib.sha256(secret_key.encode("utf-8")).digest()
        self._fernet = Fernet(base64.urlsafe_b64encode(digest))

    def encrypt(self, plaintext: str) -> str:
        """Encrypt ``plaintext`` and return the Fernet token as ASCII text."""
        return self._fernet.encrypt(plaintext.encode("utf-8")).decode("ascii")

    def decrypt(self, token: str) -> str:
        """Decrypt a token produced by :meth:`encrypt` back to the plaintext.

        Raises:
            AuthenticationError: if the token cannot be turned back into text —
                wrong/rotated secret key, corrupted ciphertext, or a token that
                is not even ASCII.
        """
        try:
            return self._fernet.decrypt(token.encode("ascii")).decode("utf-8")
        except (InvalidToken, UnicodeError) as exc:
            # InvalidToken: wrong/rotated secret key, or corrupted ciphertext.
            # UnicodeError: the stored value contains non-ASCII characters (so
            # it cannot be a Fernet token) or the decrypted bytes are not valid
            # UTF-8. Both are reported the same way so callers only ever have
            # to handle AuthenticationError.
            raise AuthenticationError("Stored Subsonic password could not be decrypted.") from exc
class Argon2PasswordHasher:
    """Login-password hasher (argon2id) using pwdlib's recommended settings."""

    def __init__(self) -> None:
        """Create the underlying hasher via ``PasswordHash.recommended()``."""
        self._ph = PasswordHash.recommended()

    def hash(self, password: str) -> str:
        """Return the hash pwdlib produces for ``password``."""
        hashed = self._ph.hash(password)
        return hashed

    def verify_and_update(self, password: str, password_hash: str) -> tuple[bool, str | None]:
        """Check ``password`` against ``password_hash``.

        The result is passed through unchanged from pwdlib's
        ``verify_and_update``: a ``(bool, str | None)`` pair.
        """
        outcome = self._ph.verify_and_update(password, password_hash)
        return outcome
class JwtTokenService:
    """Issues and verifies HS256 JWTs for access + refresh tokens.

    TTLs come from settings (access: short; refresh: long, offline-first).
    Every token carries a unique ``jti`` so refresh tokens can be tracked and
    revoked server-side.
    """

    def __init__(self, settings: Settings) -> None:
        """Read the signing secret, algorithm and per-type TTLs from ``settings``."""
        self._secret = settings.jwt_secret.get_secret_value()
        self._algorithm = settings.jwt_algorithm
        # Lifetime in seconds, keyed by token type; looked up in ``issue``.
        self._ttl = {
            TokenType.ACCESS: settings.access_token_ttl_seconds,
            TokenType.REFRESH: settings.refresh_token_ttl_seconds,
        }

    def issue(self, *, subject: uuid.UUID, token_type: TokenType) -> IssuedToken:
        """Create and sign a new token of ``token_type`` for ``subject``.

        Returns:
            An ``IssuedToken`` holding the encoded JWT, its fresh ``jti`` and its
            expiry. The expiry has whole-second resolution so that it is exactly
            the instant stored in the token's ``exp`` claim.
        """
        now = dt.datetime.now(dt.UTC)
        issued_ts = int(now.timestamp())
        expires_ts = int((now + dt.timedelta(seconds=self._ttl[token_type])).timestamp())
        # The claim only carries whole seconds; rebuild the datetime from it so
        # the returned expiry equals what ``decode`` later reports.
        expires_at = dt.datetime.fromtimestamp(expires_ts, tz=dt.UTC)
        jti = uuid.uuid4()
        payload = {
            "sub": str(subject),
            "type": token_type.value,
            "jti": str(jti),
            "iat": issued_ts,
            "exp": expires_ts,
        }
        encoded = jwt.encode(payload, self._secret, algorithm=self._algorithm)
        return IssuedToken(encoded=encoded, jti=jti, expires_at=expires_at)

    def decode(self, encoded: str) -> TokenClaims:
        """Verify ``encoded`` and return its claims.

        Raises:
            AuthenticationError: if the signature or expiry check fails, or if a
                required claim (``sub``, ``type``, ``jti``, ``exp``) is missing or
                malformed.
        """
        try:
            payload = jwt.decode(encoded, self._secret, algorithms=[self._algorithm])
            return TokenClaims(
                # str() makes a non-string claim fail as ValueError inside
                # uuid.UUID instead of escaping as AttributeError.
                subject=uuid.UUID(str(payload["sub"])),
                token_type=TokenType(payload["type"]),
                jti=uuid.UUID(str(payload["jti"])),
                expires_at=dt.datetime.fromtimestamp(payload["exp"], tz=dt.UTC),
            )
        except (
            jwt.InvalidTokenError,
            KeyError,
            TypeError,
            ValueError,
            # fromtimestamp raises these for out-of-range ``exp`` values.
            OverflowError,
            OSError,
        ) as exc:
            raise AuthenticationError("Invalid or expired token.") from exc