14c1bc16e0
Add POST /auth/register: creates a non-superuser then auto-logs in, returning the same TokenResponse as login. Gated by the new allow_registration setting (env ALLOW_REGISTRATION, default true); when disabled it raises PermissionDeniedError (403). Accounts remain admin-only for superusers. Tests cover create+login, duplicate (409), short password (422), and the disabled (403) path. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
218 lines
7.8 KiB
Python
218 lines
7.8 KiB
Python
"""Integration tests for the auth + admin HTTP surface.
|
|
|
|
These require a reachable Postgres (the schema is created via metadata). When
|
|
no DB is available they *skip* — preserving the project rule that the test
|
|
suite never hard-requires a running database.
|
|
"""
|
|
|
|
import asyncio
|
|
from collections.abc import AsyncIterator
|
|
|
|
import pytest
|
|
from app.core.security import Argon2PasswordHasher
|
|
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
|
|
from app.infrastructure.db.repositories import (
|
|
SqlAlchemyRefreshTokenRepository,
|
|
SqlAlchemyUserRepository,
|
|
)
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
pytestmark = pytest.mark.asyncio
|
|
|
|
_db_reachable_cache: bool | None = None
|
|
|
|
|
|
async def _db_reachable() -> bool:
|
|
# Probe once per session (cached): bounded so the suite never hangs when
|
|
# nothing (or a half-open socket) is on the DB port — mirrors the
|
|
# readiness-probe rule (never hang).
|
|
global _db_reachable_cache
|
|
if _db_reachable_cache is not None:
|
|
return _db_reachable_cache
|
|
|
|
from sqlalchemy import text
|
|
|
|
try:
|
|
async with asyncio.timeout(3):
|
|
async with get_engine().connect() as conn:
|
|
await conn.execute(text("SELECT 1"))
|
|
_db_reachable_cache = True
|
|
except Exception:
|
|
_db_reachable_cache = False
|
|
return _db_reachable_cache
|
|
|
|
|
|
@pytest.fixture
|
|
async def api() -> AsyncIterator[AsyncClient]:
|
|
if not await _db_reachable():
|
|
pytest.skip("Postgres not reachable — integration test skipped.")
|
|
|
|
# Fresh schema for the test run.
|
|
async with get_engine().begin() as conn:
|
|
await conn.run_sync(Base.metadata.drop_all)
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
|
|
# Seed an admin directly through the service layer.
|
|
from app.application.user_service import UserService
|
|
|
|
async with session_scope() as session:
|
|
await UserService(
|
|
users=SqlAlchemyUserRepository(session),
|
|
refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
|
|
hasher=Argon2PasswordHasher(),
|
|
).create_user(username="admin", password="adminpass1", is_superuser=True)
|
|
|
|
from app.main import create_app
|
|
from asgi_lifespan import LifespanManager
|
|
|
|
app = create_app()
|
|
async with LifespanManager(app):
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
yield client
|
|
|
|
async with get_engine().begin() as conn:
|
|
await conn.run_sync(Base.metadata.drop_all)
|
|
await dispose_engine()
|
|
|
|
|
|
async def _login(api: AsyncClient, username: str, password: str) -> tuple[str, str]:
|
|
resp = await api.post("/api/v1/auth/login", json={"username": username, "password": password})
|
|
assert resp.status_code == 200, resp.text
|
|
body = resp.json()
|
|
return body["access_token"], body["refresh_token"]
|
|
|
|
|
|
async def test_login_and_me(api: AsyncClient) -> None:
|
|
access, _ = await _login(api, "admin", "adminpass1")
|
|
resp = await api.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {access}"})
|
|
assert resp.status_code == 200
|
|
assert resp.json()["username"] == "admin"
|
|
assert resp.json()["is_superuser"] is True
|
|
|
|
|
|
async def test_login_bad_credentials(api: AsyncClient) -> None:
|
|
resp = await api.post("/api/v1/auth/login", json={"username": "admin", "password": "wrong"})
|
|
assert resp.status_code == 401
|
|
|
|
|
|
async def test_me_requires_token(api: AsyncClient) -> None:
|
|
resp = await api.get("/api/v1/auth/me")
|
|
assert resp.status_code == 401
|
|
|
|
|
|
async def test_refresh_rotation(api: AsyncClient) -> None:
|
|
_, refresh = await _login(api, "admin", "adminpass1")
|
|
resp = await api.post("/api/v1/auth/refresh", json={"refresh_token": refresh})
|
|
assert resp.status_code == 200
|
|
new_refresh = resp.json()["refresh_token"]
|
|
assert new_refresh != refresh
|
|
|
|
# Old refresh is revoked after rotation.
|
|
reuse = await api.post("/api/v1/auth/refresh", json={"refresh_token": refresh})
|
|
assert reuse.status_code == 401
|
|
|
|
|
|
async def test_logout_revokes(api: AsyncClient) -> None:
|
|
_, refresh = await _login(api, "admin", "adminpass1")
|
|
out = await api.post("/api/v1/auth/logout", json={"refresh_token": refresh})
|
|
assert out.status_code == 204
|
|
reuse = await api.post("/api/v1/auth/refresh", json={"refresh_token": refresh})
|
|
assert reuse.status_code == 401
|
|
|
|
|
|
async def test_admin_creates_user_and_nonadmin_forbidden(api: AsyncClient) -> None:
|
|
admin_access, _ = await _login(api, "admin", "adminpass1")
|
|
admin_headers = {"Authorization": f"Bearer {admin_access}"}
|
|
|
|
created = await api.post(
|
|
"/api/v1/admin/users",
|
|
headers=admin_headers,
|
|
json={"username": "carol", "password": "carolpass1"},
|
|
)
|
|
assert created.status_code == 201, created.text
|
|
assert created.json()["is_superuser"] is False
|
|
|
|
# Non-admin cannot reach admin routes.
|
|
user_access, _ = await _login(api, "carol", "carolpass1")
|
|
forbidden = await api.get(
|
|
"/api/v1/admin/users", headers={"Authorization": f"Bearer {user_access}"}
|
|
)
|
|
assert forbidden.status_code == 403
|
|
|
|
|
|
async def test_admin_create_duplicate_conflicts(api: AsyncClient) -> None:
|
|
admin_access, _ = await _login(api, "admin", "adminpass1")
|
|
headers = {"Authorization": f"Bearer {admin_access}"}
|
|
payload = {"username": "dave", "password": "davepass12"}
|
|
|
|
first = await api.post("/api/v1/admin/users", headers=headers, json=payload)
|
|
assert first.status_code == 201
|
|
dup = await api.post("/api/v1/admin/users", headers=headers, json=payload)
|
|
assert dup.status_code == 409
|
|
|
|
|
|
async def test_register_creates_user_and_logs_in(api: AsyncClient) -> None:
|
|
resp = await api.post(
|
|
"/api/v1/auth/register",
|
|
json={"username": "frank", "password": "frankpass1"},
|
|
)
|
|
assert resp.status_code == 201, resp.text
|
|
access = resp.json()["access_token"]
|
|
assert resp.json()["refresh_token"]
|
|
|
|
# The returned access token is immediately usable; account is a regular user.
|
|
me = await api.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {access}"})
|
|
assert me.status_code == 200
|
|
assert me.json()["username"] == "frank"
|
|
assert me.json()["is_superuser"] is False
|
|
|
|
|
|
async def test_register_duplicate_conflicts(api: AsyncClient) -> None:
|
|
payload = {"username": "grace", "password": "gracepass1"}
|
|
first = await api.post("/api/v1/auth/register", json=payload)
|
|
assert first.status_code == 201
|
|
dup = await api.post("/api/v1/auth/register", json=payload)
|
|
assert dup.status_code == 409
|
|
|
|
|
|
async def test_register_short_password_rejected(api: AsyncClient) -> None:
|
|
resp = await api.post(
|
|
"/api/v1/auth/register",
|
|
json={"username": "heidi", "password": "short"},
|
|
)
|
|
assert resp.status_code == 422
|
|
|
|
|
|
async def test_register_disabled_forbidden(api: AsyncClient) -> None:
|
|
from app.core.config import get_settings
|
|
|
|
settings = get_settings()
|
|
settings.allow_registration = False
|
|
try:
|
|
resp = await api.post(
|
|
"/api/v1/auth/register",
|
|
json={"username": "ivan", "password": "ivanpass12"},
|
|
)
|
|
assert resp.status_code == 403
|
|
finally:
|
|
settings.allow_registration = True
|
|
|
|
|
|
async def test_deactivated_user_cannot_login(api: AsyncClient) -> None:
|
|
admin_access, _ = await _login(api, "admin", "adminpass1")
|
|
headers = {"Authorization": f"Bearer {admin_access}"}
|
|
created = await api.post(
|
|
"/api/v1/admin/users",
|
|
headers=headers,
|
|
json={"username": "erin", "password": "erinpass12"},
|
|
)
|
|
user_id = created.json()["id"]
|
|
|
|
deactivate = await api.delete(f"/api/v1/admin/users/{user_id}", headers=headers)
|
|
assert deactivate.status_code == 200
|
|
assert deactivate.json()["is_active"] is False
|
|
|
|
resp = await api.post("/api/v1/auth/login", json={"username": "erin", "password": "erinpass12"})
|
|
assert resp.status_code == 401
|