"""Security adapters: password hashing (argon2 via pwdlib) and JWT (pyjwt). These are the concrete implementations of the domain ports ``PasswordHasher`` and ``TokenService``. Vendor crypto libraries are confined to this module (CLAUDE.md: security is a cross-cutting concern in ``core``). Higher layers depend only on the Protocols, never on pwdlib/pyjwt directly. """ import base64 import datetime as dt import hashlib import secrets import uuid import jwt from cryptography.fernet import Fernet, InvalidToken from pwdlib import PasswordHash from app.core.config import Settings from app.domain.errors import AuthenticationError from app.domain.tokens import IssuedToken, TokenClaims, TokenType # Length (in bytes of entropy) of a generated Subsonic app-password. 18 bytes of # url-safe base64 → 24 characters, well above the Subsonic auth threat model. _SUBSONIC_PASSWORD_ENTROPY_BYTES = 18 def generate_subsonic_password() -> str: """A fresh, high-entropy Subsonic app-password (url-safe, ~24 chars).""" return secrets.token_urlsafe(_SUBSONIC_PASSWORD_ENTROPY_BYTES) class SubsonicPasswordCipher: """Symmetric encrypt/decrypt for the recoverable Subsonic app-password. Subsonic auth (``t=md5(password+salt)`` and legacy ``p=``) needs the plaintext password server-side, so — unlike the argon2-hashed login password — the app-password is stored *encrypted*, not hashed. A Fernet key (AES-128-CBC + HMAC) is derived from the configured secret; the plaintext key never touches the DB. Implements :class:`app.domain.ports.SubsonicCipher`. """ def __init__(self, secret_key: str) -> None: digest = hashlib.sha256(secret_key.encode("utf-8")).digest() self._fernet = Fernet(base64.urlsafe_b64encode(digest)) def encrypt(self, plaintext: str) -> str: return self._fernet.encrypt(plaintext.encode("utf-8")).decode("ascii") def decrypt(self, token: str) -> str: try: return self._fernet.decrypt(token.encode("ascii")).decode("utf-8") except InvalidToken as exc: # Wrong/rotated secret key, or corrupted ciphertext. raise AuthenticationError("Stored Subsonic password could not be decrypted.") from exc class Argon2PasswordHasher: """argon2id hasher with sensible defaults from pwdlib.""" def __init__(self) -> None: self._ph = PasswordHash.recommended() def hash(self, password: str) -> str: return self._ph.hash(password) def verify_and_update(self, password: str, password_hash: str) -> tuple[bool, str | None]: return self._ph.verify_and_update(password, password_hash) class JwtTokenService: """Issues and verifies HS256 JWTs for access + refresh tokens. TTLs come from settings (access: short; refresh: long, offline-first). Every token carries a unique ``jti`` so refresh tokens can be tracked and revoked server-side. """ def __init__(self, settings: Settings) -> None: self._secret = settings.jwt_secret.get_secret_value() self._algorithm = settings.jwt_algorithm self._ttl = { TokenType.ACCESS: settings.access_token_ttl_seconds, TokenType.REFRESH: settings.refresh_token_ttl_seconds, } def issue(self, *, subject: uuid.UUID, token_type: TokenType) -> IssuedToken: now = dt.datetime.now(dt.UTC) expires_at = now + dt.timedelta(seconds=self._ttl[token_type]) jti = uuid.uuid4() payload = { "sub": str(subject), "type": token_type.value, "jti": str(jti), "iat": int(now.timestamp()), "exp": int(expires_at.timestamp()), } encoded = jwt.encode(payload, self._secret, algorithm=self._algorithm) return IssuedToken(encoded=encoded, jti=jti, expires_at=expires_at) def decode(self, encoded: str) -> TokenClaims: try: payload = jwt.decode(encoded, self._secret, algorithms=[self._algorithm]) return TokenClaims( subject=uuid.UUID(payload["sub"]), token_type=TokenType(payload["type"]), jti=uuid.UUID(payload["jti"]), expires_at=dt.datetime.fromtimestamp(payload["exp"], tz=dt.UTC), ) except (jwt.InvalidTokenError, KeyError, ValueError) as exc: raise AuthenticationError("Invalid or expired token.") from exc