"""Security adapters: password hashing (argon2 via pwdlib) and JWT (pyjwt). These are the concrete implementations of the domain ports ``PasswordHasher`` and ``TokenService``. Vendor crypto libraries are confined to this module (CLAUDE.md: security is a cross-cutting concern in ``core``). Higher layers depend only on the Protocols, never on pwdlib/pyjwt directly. """ import datetime as dt import uuid import jwt from pwdlib import PasswordHash from app.core.config import Settings from app.domain.errors import AuthenticationError from app.domain.tokens import IssuedToken, TokenClaims, TokenType class Argon2PasswordHasher: """argon2id hasher with sensible defaults from pwdlib.""" def __init__(self) -> None: self._ph = PasswordHash.recommended() def hash(self, password: str) -> str: return self._ph.hash(password) def verify_and_update(self, password: str, password_hash: str) -> tuple[bool, str | None]: return self._ph.verify_and_update(password, password_hash) class JwtTokenService: """Issues and verifies HS256 JWTs for access + refresh tokens. TTLs come from settings (access: short; refresh: long, offline-first). Every token carries a unique ``jti`` so refresh tokens can be tracked and revoked server-side. """ def __init__(self, settings: Settings) -> None: self._secret = settings.jwt_secret.get_secret_value() self._algorithm = settings.jwt_algorithm self._ttl = { TokenType.ACCESS: settings.access_token_ttl_seconds, TokenType.REFRESH: settings.refresh_token_ttl_seconds, } def issue(self, *, subject: uuid.UUID, token_type: TokenType) -> IssuedToken: now = dt.datetime.now(dt.UTC) expires_at = now + dt.timedelta(seconds=self._ttl[token_type]) jti = uuid.uuid4() payload = { "sub": str(subject), "type": token_type.value, "jti": str(jti), "iat": int(now.timestamp()), "exp": int(expires_at.timestamp()), } encoded = jwt.encode(payload, self._secret, algorithm=self._algorithm) return IssuedToken(encoded=encoded, jti=jti, expires_at=expires_at) def decode(self, encoded: str) -> TokenClaims: try: payload = jwt.decode(encoded, self._secret, algorithms=[self._algorithm]) return TokenClaims( subject=uuid.UUID(payload["sub"]), token_type=TokenType(payload["type"]), jti=uuid.UUID(payload["jti"]), expires_at=dt.datetime.fromtimestamp(payload["exp"], tz=dt.UTC), ) except (jwt.InvalidTokenError, KeyError, ValueError) as exc: raise AuthenticationError("Invalid or expired token.") from exc