Files
mcma-backend/app/core/security.py
T
2026-06-03 10:40:00 +03:00

74 lines
2.7 KiB
Python

"""Security adapters: password hashing (argon2 via pwdlib) and JWT (pyjwt).
These are the concrete implementations of the domain ports
``PasswordHasher`` and ``TokenService``. Vendor crypto libraries are confined
to this module (CLAUDE.md: security is a cross-cutting concern in ``core``).
Higher layers depend only on the Protocols, never on pwdlib/pyjwt directly.
"""
import datetime as dt
import uuid
import jwt
from pwdlib import PasswordHash
from app.core.config import Settings
from app.domain.errors import AuthenticationError
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
class Argon2PasswordHasher:
"""argon2id hasher with sensible defaults from pwdlib."""
def __init__(self) -> None:
self._ph = PasswordHash.recommended()
def hash(self, password: str) -> str:
return self._ph.hash(password)
def verify_and_update(self, password: str, password_hash: str) -> tuple[bool, str | None]:
return self._ph.verify_and_update(password, password_hash)
class JwtTokenService:
"""Issues and verifies HS256 JWTs for access + refresh tokens.
TTLs come from settings (access: short; refresh: long, offline-first).
Every token carries a unique ``jti`` so refresh tokens can be tracked and
revoked server-side.
"""
def __init__(self, settings: Settings) -> None:
self._secret = settings.jwt_secret.get_secret_value()
self._algorithm = settings.jwt_algorithm
self._ttl = {
TokenType.ACCESS: settings.access_token_ttl_seconds,
TokenType.REFRESH: settings.refresh_token_ttl_seconds,
}
def issue(self, *, subject: uuid.UUID, token_type: TokenType) -> IssuedToken:
now = dt.datetime.now(dt.UTC)
expires_at = now + dt.timedelta(seconds=self._ttl[token_type])
jti = uuid.uuid4()
payload = {
"sub": str(subject),
"type": token_type.value,
"jti": str(jti),
"iat": int(now.timestamp()),
"exp": int(expires_at.timestamp()),
}
encoded = jwt.encode(payload, self._secret, algorithm=self._algorithm)
return IssuedToken(encoded=encoded, jti=jti, expires_at=expires_at)
def decode(self, encoded: str) -> TokenClaims:
try:
payload = jwt.decode(encoded, self._secret, algorithms=[self._algorithm])
return TokenClaims(
subject=uuid.UUID(payload["sub"]),
token_type=TokenType(payload["type"]),
jti=uuid.UUID(payload["jti"]),
expires_at=dt.datetime.fromtimestamp(payload["exp"], tz=dt.UTC),
)
except (jwt.InvalidTokenError, KeyError, ValueError) as exc:
raise AuthenticationError("Invalid or expired token.") from exc