Files
2026-06-03 10:40:00 +03:00

105 lines
4.0 KiB
Python

"""Authentication use cases: login, token refresh (rotation), logout, and
access-token verification.
Depends only on domain ports. Wired with concrete adapters at the composition
root (``app.api.deps``).
"""
import hashlib
import uuid
from app.domain.entities import User
from app.domain.errors import AuthenticationError
from app.domain.ports import (
PasswordHasher,
RefreshTokenRepository,
TokenService,
UserRepository,
)
from app.domain.tokens import IssuedToken, TokenPair, TokenType
def _hash_token(encoded: str) -> str:
    """At-rest hash of a refresh token. A signed JWT is high-entropy, so a fast
    SHA-256 suffices (no slow KDF needed) — we never store the raw token.

    Args:
        encoded: The encoded token string.

    Returns:
        The lowercase hexadecimal SHA-256 digest of ``encoded`` encoded as
        UTF-8 (always 64 characters).
    """
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()
class AuthService:
    """Authentication use cases built on the injected domain ports.

    Covers login, refresh-token rotation, logout, and access-token
    verification. The only state held is the four collaborators passed to
    the constructor.
    """

    def __init__(
        self,
        *,
        users: UserRepository,
        refresh_tokens: RefreshTokenRepository,
        hasher: PasswordHasher,
        tokens: TokenService,
    ) -> None:
        """Store the collaborators. All arguments are keyword-only."""
        self._users = users
        self._refresh_tokens = refresh_tokens
        self._hasher = hasher
        self._tokens = tokens

    async def login(self, username: str, password: str) -> TokenPair:
        """Check a username/password and issue a new access/refresh pair.

        Raises:
            AuthenticationError: If no credentials exist for ``username``,
                the password does not verify, or the account is not active.
        """
        credentials = await self._users.get_credentials_by_username(username)
        # Same error whether the user is missing or the password is wrong —
        # don't leak which usernames exist.
        # NOTE(review): the missing-user path returns without running the
        # hasher, so response timing may still differ between the two cases —
        # confirm whether that matters for this deployment.
        if credentials is None:
            raise AuthenticationError("Invalid username or password.")
        valid, updated_hash = self._hasher.verify_and_update(
            password, credentials.password_hash
        )
        if not valid:
            raise AuthenticationError("Invalid username or password.")
        # Checked only after the password verified, so the distinct message
        # is shown only to a caller who already knows the password.
        if not credentials.user.is_active:
            raise AuthenticationError("Account is disabled.")
        # The hasher may hand back a replacement hash (presumably when the
        # stored one should be upgraded — confirm against the adapter);
        # persist it when it does.
        if updated_hash is not None:
            await self._users.set_password_hash(credentials.user.id, updated_hash)
        return await self._issue_pair(credentials.user.id)

    async def refresh(self, encoded_refresh: str) -> TokenPair:
        """Exchange a refresh token for a new pair, revoking the presented one.

        Errors raised by the token service while decoding propagate unchanged.

        Raises:
            AuthenticationError: If the token is not a refresh token, the
                refresh-token store does not consider its ``jti`` valid, or
                the subject is missing or inactive.
        """
        claims = self._tokens.decode(encoded_refresh)
        if claims.token_type is not TokenType.REFRESH:
            raise AuthenticationError("Not a refresh token.")
        # NOTE(review): is_valid() and revoke() are separate awaits; whether
        # two concurrent refreshes of the same token are serialised depends on
        # the repository implementation — confirm.
        if not await self._refresh_tokens.is_valid(claims.jti):
            raise AuthenticationError("Refresh token is revoked or expired.")
        user = await self._load_active_user(claims.subject)
        # Rotation: invalidate the presented token before issuing a new pair.
        await self._refresh_tokens.revoke(claims.jti)
        return await self._issue_pair(user.id)

    async def logout(self, encoded_refresh: str) -> None:
        """Revoke the given refresh token if it decodes as one.

        Never raises ``AuthenticationError`` for an undecodable token; a
        token of any other type is ignored.
        """
        # Best-effort: a malformed/expired token simply has nothing to revoke.
        try:
            claims = self._tokens.decode(encoded_refresh)
        except AuthenticationError:
            return
        if claims.token_type is TokenType.REFRESH:
            await self._refresh_tokens.revoke(claims.jti)

    async def authenticate_access(self, encoded_access: str) -> User:
        """Resolve an access token to its active user.

        Errors raised by the token service while decoding propagate unchanged.

        Raises:
            AuthenticationError: If the token is not an access token, or the
                subject is missing or inactive.
        """
        claims = self._tokens.decode(encoded_access)
        if claims.token_type is not TokenType.ACCESS:
            raise AuthenticationError("Not an access token.")
        return await self._load_active_user(claims.subject)

    async def _load_active_user(self, subject) -> User:
        """Fetch the user for a token subject, rejecting missing/inactive ones.

        ``subject`` is passed straight to ``UserRepository.get_by_id``.

        Raises:
            AuthenticationError: If no user is found or the user is inactive.
        """
        user = await self._users.get_by_id(subject)
        if user is None or not user.is_active:
            raise AuthenticationError("Account is unavailable.")
        return user

    async def _issue_pair(self, user_id: uuid.UUID) -> TokenPair:
        """Issue an access and a refresh token for ``user_id``.

        The refresh token is recorded in the refresh-token store before the
        pair is returned; the access token is not persisted here.
        """
        access = self._tokens.issue(subject=user_id, token_type=TokenType.ACCESS)
        refresh = self._tokens.issue(subject=user_id, token_type=TokenType.REFRESH)
        await self._persist_refresh(user_id, refresh)
        return TokenPair(access=access, refresh=refresh)

    async def _persist_refresh(self, user_id: uuid.UUID, refresh: IssuedToken) -> None:
        """Record an issued refresh token by ``jti``, owner, hash and expiry.

        Only the hash of the encoded token is passed to the store, never the
        encoded token itself.
        """
        await self._refresh_tokens.add(
            jti=refresh.jti,
            user_id=user_id,
            token_hash=_hash_token(refresh.encoded),
            expires_at=refresh.expires_at,
        )