Files
mcma-backend/tests/test_auth_api.py
T
Senko-san 14c1bc16e0
Docker Build & Publish / build (push) Successful in 1m8s
Docker Build & Publish / push (push) Failing after 34s
Docker Build & Publish / Prune old image versions (push) Has been skipped
feat(auth): public self-service registration (ALLOW_REGISTRATION)
Add POST /auth/register: creates a non-superuser then auto-logs in,
returning the same TokenResponse as login. Gated by the new
allow_registration setting (env ALLOW_REGISTRATION, default true);
when disabled it raises PermissionDeniedError (403). Accounts remain
admin-only for superusers.

Tests cover create+login, duplicate (409), short password (422), and
the disabled (403) path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 14:06:52 +03:00

218 lines
7.8 KiB
Python

"""Integration tests for the auth + admin HTTP surface.
These require a reachable Postgres (the schema is created via metadata). When
no DB is available they *skip* — preserving the project rule that the test
suite never hard-requires a running database.
"""
import asyncio
from collections.abc import AsyncIterator
import pytest
from app.core.security import Argon2PasswordHasher
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from httpx import ASGITransport, AsyncClient
# Module-level marker: tags every test in this file with pytest.mark.asyncio.
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None


async def _db_reachable() -> bool:
    """Report whether the database answers a trivial query.

    The first call opens a connection from ``get_engine()`` and runs
    ``SELECT 1`` under a 3-second ``asyncio.timeout``. Any ``Exception``
    raised by that attempt (the timeout included) is recorded as "not
    reachable". The outcome is stored in the module-level
    ``_db_reachable_cache`` and handed back unchanged by every later call,
    so the probe is not repeated once a result has been cached.
    """
    global _db_reachable_cache

    if _db_reachable_cache is None:
        from sqlalchemy import text

        probe_ok = True
        try:
            # Bounded wait so the suite never hangs when nothing (or a
            # half-open socket) is listening on the DB port.
            async with asyncio.timeout(3), get_engine().connect() as conn:
                await conn.execute(text("SELECT 1"))
        except Exception:
            probe_ok = False
        _db_reachable_cache = probe_ok

    return _db_reachable_cache
@pytest.fixture
async def api() -> AsyncIterator[AsyncClient]:
    """Yield an HTTP client bound to a freshly built app and schema.

    Skips the requesting test when ``_db_reachable()`` reports no database.
    Otherwise the fixture drops and recreates every table in
    ``Base.metadata``, seeds a superuser ``admin`` / ``adminpass1`` through
    ``UserService``, starts the app inside ``LifespanManager`` and yields an
    ``AsyncClient`` that talks to it over ``ASGITransport``.

    Teardown drops the schema and disposes the engine. Both steps sit in
    ``finally`` clauses because pytest does not run the code after ``yield``
    when a fixture raises during setup; without them a failed seed or app
    start-up would leave tables and the engine behind for the next test.
    """
    if not await _db_reachable():
        pytest.skip("Postgres not reachable — integration test skipped.")

    try:
        # Fresh schema for the test run.
        async with get_engine().begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        try:
            # Seed an admin directly through the service layer.
            from app.application.user_service import UserService

            async with session_scope() as session:
                await UserService(
                    users=SqlAlchemyUserRepository(session),
                    refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
                    hasher=Argon2PasswordHasher(),
                ).create_user(username="admin", password="adminpass1", is_superuser=True)

            from app.main import create_app
            from asgi_lifespan import LifespanManager

            app = create_app()
            async with LifespanManager(app):
                transport = ASGITransport(app=app)
                async with AsyncClient(transport=transport, base_url="http://test") as client:
                    yield client
        finally:
            # Only reached once the schema was created, so there is
            # something to drop even if seeding or start-up failed.
            async with get_engine().begin() as conn:
                await conn.run_sync(Base.metadata.drop_all)
    finally:
        # Always release the engine, including when schema setup itself failed.
        await dispose_engine()
async def _login(api: AsyncClient, username: str, password: str) -> tuple[str, str]:
    """Log in through ``POST /api/v1/auth/login`` and return the token pair.

    Asserts that the response status is 200, using the response text as the
    failure message, then returns ``(access_token, refresh_token)`` taken
    from the JSON body.
    """
    credentials = {"username": username, "password": password}
    resp = await api.post("/api/v1/auth/login", json=credentials)
    assert resp.status_code == 200, resp.text
    tokens = resp.json()
    return tokens["access_token"], tokens["refresh_token"]
async def test_login_and_me(api: AsyncClient) -> None:
    """The seeded admin can log in and read its own profile from ``/auth/me``."""
    token, _ = await _login(api, "admin", "adminpass1")
    auth_headers = {"Authorization": f"Bearer {token}"}

    resp = await api.get("/api/v1/auth/me", headers=auth_headers)
    assert resp.status_code == 200

    profile = resp.json()
    assert profile["username"] == "admin"
    assert profile["is_superuser"] is True
async def test_login_bad_credentials(api: AsyncClient) -> None:
    """Logging in as ``admin`` with the wrong password is answered with 401."""
    bad_credentials = {"username": "admin", "password": "wrong"}
    rejected = await api.post("/api/v1/auth/login", json=bad_credentials)
    assert rejected.status_code == 401
async def test_me_requires_token(api: AsyncClient) -> None:
    """``/auth/me`` without an ``Authorization`` header is answered with 401."""
    anonymous = await api.get("/api/v1/auth/me")
    assert anonymous.status_code == 401
async def test_refresh_rotation(api: AsyncClient) -> None:
    """Refreshing issues a different refresh token and rejects the old one."""
    refresh_url = "/api/v1/auth/refresh"
    _, old_refresh = await _login(api, "admin", "adminpass1")

    rotated = await api.post(refresh_url, json={"refresh_token": old_refresh})
    assert rotated.status_code == 200
    assert rotated.json()["refresh_token"] != old_refresh

    # Replaying the superseded token must now fail.
    replay = await api.post(refresh_url, json={"refresh_token": old_refresh})
    assert replay.status_code == 401
async def test_logout_revokes(api: AsyncClient) -> None:
    """After logout (204) the same refresh token is refused with 401."""
    _, refresh = await _login(api, "admin", "adminpass1")
    token_payload = {"refresh_token": refresh}

    logged_out = await api.post("/api/v1/auth/logout", json=token_payload)
    assert logged_out.status_code == 204

    replay = await api.post("/api/v1/auth/refresh", json=token_payload)
    assert replay.status_code == 401
async def test_admin_creates_user_and_nonadmin_forbidden(api: AsyncClient) -> None:
    """An admin can create a regular user; that user gets 403 on admin routes."""
    admin_users_url = "/api/v1/admin/users"

    admin_token, _ = await _login(api, "admin", "adminpass1")
    new_user = {"username": "carol", "password": "carolpass1"}
    created = await api.post(
        admin_users_url,
        headers={"Authorization": f"Bearer {admin_token}"},
        json=new_user,
    )
    assert created.status_code == 201, created.text
    assert created.json()["is_superuser"] is False

    # The freshly created, non-superuser account is locked out of admin routes.
    carol_token, _ = await _login(api, "carol", "carolpass1")
    carol_headers = {"Authorization": f"Bearer {carol_token}"}
    denied = await api.get(admin_users_url, headers=carol_headers)
    assert denied.status_code == 403
async def test_admin_create_duplicate_conflicts(api: AsyncClient) -> None:
    """Creating the same username twice via the admin route: 201, then 409."""
    admin_token, _ = await _login(api, "admin", "adminpass1")
    request_kwargs = {
        "headers": {"Authorization": f"Bearer {admin_token}"},
        "json": {"username": "dave", "password": "davepass12"},
    }

    original = await api.post("/api/v1/admin/users", **request_kwargs)
    assert original.status_code == 201

    repeat = await api.post("/api/v1/admin/users", **request_kwargs)
    assert repeat.status_code == 409
async def test_register_creates_user_and_logs_in(api: AsyncClient) -> None:
    """Registering returns 201 with tokens that work for a non-superuser."""
    signup = {"username": "frank", "password": "frankpass1"}
    resp = await api.post("/api/v1/auth/register", json=signup)
    assert resp.status_code == 201, resp.text

    tokens = resp.json()
    access = tokens["access_token"]
    assert tokens["refresh_token"]

    # The access token works straight away and belongs to a regular account.
    auth_headers = {"Authorization": f"Bearer {access}"}
    me = await api.get("/api/v1/auth/me", headers=auth_headers)
    assert me.status_code == 200

    profile = me.json()
    assert profile["username"] == "frank"
    assert profile["is_superuser"] is False
async def test_register_duplicate_conflicts(api: AsyncClient) -> None:
    """Registering the same username twice: 201 first, 409 on the repeat."""
    register_url = "/api/v1/auth/register"
    signup = {"username": "grace", "password": "gracepass1"}

    original = await api.post(register_url, json=signup)
    assert original.status_code == 201

    repeat = await api.post(register_url, json=signup)
    assert repeat.status_code == 409
async def test_register_short_password_rejected(api: AsyncClient) -> None:
    """Registering with the password ``"short"`` is answered with 422."""
    signup = {"username": "heidi", "password": "short"}
    rejected = await api.post("/api/v1/auth/register", json=signup)
    assert rejected.status_code == 422
async def test_register_disabled_forbidden(api: AsyncClient) -> None:
    """Registration is answered with 403 while ``allow_registration`` is off.

    The flag is flipped on the object returned by ``get_settings()`` and put
    back to the value it had before the test, rather than forced to ``True``,
    so a run configured with registration disabled is not silently switched
    on for the tests that follow.
    """
    from app.core.config import get_settings

    settings = get_settings()
    previous = settings.allow_registration
    settings.allow_registration = False
    try:
        resp = await api.post(
            "/api/v1/auth/register",
            json={"username": "ivan", "password": "ivanpass12"},
        )
        assert resp.status_code == 403
    finally:
        # Restore the pre-test value, whatever it was.
        settings.allow_registration = previous
async def test_deactivated_user_cannot_login(api: AsyncClient) -> None:
    """A user deactivated through the admin route can no longer log in.

    The admin creates ``erin``, deactivates the account with ``DELETE`` on
    the admin user route (200, ``is_active`` false), and a subsequent login
    with erin's credentials is answered with 401.
    """
    admin_access, _ = await _login(api, "admin", "adminpass1")
    headers = {"Authorization": f"Bearer {admin_access}"}

    created = await api.post(
        "/api/v1/admin/users",
        headers=headers,
        json={"username": "erin", "password": "erinpass12"},
    )
    # Fail here with the server's message instead of a KeyError on "id" below.
    assert created.status_code == 201, created.text
    user_id = created.json()["id"]

    deactivate = await api.delete(f"/api/v1/admin/users/{user_id}", headers=headers)
    assert deactivate.status_code == 200
    assert deactivate.json()["is_active"] is False

    resp = await api.post("/api/v1/auth/login", json={"username": "erin", "password": "erinpass12"})
    assert resp.status_code == 401