"""Authentication use cases: login, token refresh (rotation), logout, and access-token verification. Depends only on domain ports. Wired with concrete adapters at the composition root (``app.api.deps``). """ import hashlib import uuid from app.domain.entities import User from app.domain.errors import AuthenticationError from app.domain.ports import ( PasswordHasher, RefreshTokenRepository, TokenService, UserRepository, ) from app.domain.tokens import IssuedToken, TokenPair, TokenType def _hash_token(encoded: str) -> str: """At-rest hash of a refresh token. A signed JWT is high-entropy, so a fast SHA-256 suffices (no slow KDF needed) — we never store the raw token.""" return hashlib.sha256(encoded.encode("utf-8")).hexdigest() class AuthService: def __init__( self, *, users: UserRepository, refresh_tokens: RefreshTokenRepository, hasher: PasswordHasher, tokens: TokenService, ) -> None: self._users = users self._refresh_tokens = refresh_tokens self._hasher = hasher self._tokens = tokens async def login(self, username: str, password: str) -> TokenPair: credentials = await self._users.get_credentials_by_username(username) # Same error whether the user is missing or the password is wrong — # don't leak which usernames exist. if credentials is None: raise AuthenticationError("Invalid username or password.") valid, updated_hash = self._hasher.verify_and_update(password, credentials.password_hash) if not valid: raise AuthenticationError("Invalid username or password.") if not credentials.user.is_active: raise AuthenticationError("Account is disabled.") if updated_hash is not None: await self._users.set_password_hash(credentials.user.id, updated_hash) return await self._issue_pair(credentials.user.id) async def refresh(self, encoded_refresh: str) -> TokenPair: claims = self._tokens.decode(encoded_refresh) if claims.token_type is not TokenType.REFRESH: raise AuthenticationError("Not a refresh token.") if not await self._refresh_tokens.is_valid(claims.jti): raise AuthenticationError("Refresh token is revoked or expired.") user = await self._users.get_by_id(claims.subject) if user is None or not user.is_active: raise AuthenticationError("Account is unavailable.") # Rotation: invalidate the presented token before issuing a new pair. await self._refresh_tokens.revoke(claims.jti) return await self._issue_pair(user.id) async def logout(self, encoded_refresh: str) -> None: # Best-effort: a malformed/expired token simply has nothing to revoke. try: claims = self._tokens.decode(encoded_refresh) except AuthenticationError: return if claims.token_type is TokenType.REFRESH: await self._refresh_tokens.revoke(claims.jti) async def authenticate_access(self, encoded_access: str) -> User: claims = self._tokens.decode(encoded_access) if claims.token_type is not TokenType.ACCESS: raise AuthenticationError("Not an access token.") user = await self._users.get_by_id(claims.subject) if user is None or not user.is_active: raise AuthenticationError("Account is unavailable.") return user async def _issue_pair(self, user_id: uuid.UUID) -> TokenPair: access = self._tokens.issue(subject=user_id, token_type=TokenType.ACCESS) refresh = self._tokens.issue(subject=user_id, token_type=TokenType.REFRESH) await self._persist_refresh(user_id, refresh) return TokenPair(access=access, refresh=refresh) async def _persist_refresh(self, user_id: uuid.UUID, refresh: IssuedToken) -> None: await self._refresh_tokens.add( jti=refresh.jti, user_id=user_id, token_hash=_hash_token(refresh.encoded), expires_at=refresh.expires_at, )