Files
mcma-backend/tests/test_auth_api.py
T
2026-06-07 15:34:06 +03:00

171 lines
6.2 KiB
Python

"""Integration tests for the auth + admin HTTP surface.
These require a reachable Postgres (the schema is created via metadata). When
no DB is available they *skip* — preserving the project rule that the test
suite never hard-requires a running database.
"""
import asyncio
from collections.abc import AsyncIterator
import pytest
from app.core.security import Argon2PasswordHasher
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
# Probe once per session (cached): bounded so the suite never hangs when
# nothing (or a half-open socket) is on the DB port — mirrors the
# readiness-probe rule (never hang).
global _db_reachable_cache
if _db_reachable_cache is not None:
return _db_reachable_cache
from sqlalchemy import text
try:
async with asyncio.timeout(3):
async with get_engine().connect() as conn:
await conn.execute(text("SELECT 1"))
_db_reachable_cache = True
except Exception:
_db_reachable_cache = False
return _db_reachable_cache
@pytest.fixture
async def api() -> AsyncIterator[AsyncClient]:
if not await _db_reachable():
pytest.skip("Postgres not reachable — integration test skipped.")
# Fresh schema for the test run.
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
# Seed an admin directly through the service layer.
from app.application.user_service import UserService
async with session_scope() as session:
await UserService(
users=SqlAlchemyUserRepository(session),
refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
hasher=Argon2PasswordHasher(),
).create_user(username="admin", password="adminpass1", is_superuser=True)
from app.main import create_app
from asgi_lifespan import LifespanManager
app = create_app()
async with LifespanManager(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await dispose_engine()
async def _login(api: AsyncClient, username: str, password: str) -> tuple[str, str]:
resp = await api.post("/api/v1/auth/login", json={"username": username, "password": password})
assert resp.status_code == 200, resp.text
body = resp.json()
return body["access_token"], body["refresh_token"]
async def test_login_and_me(api: AsyncClient) -> None:
access, _ = await _login(api, "admin", "adminpass1")
resp = await api.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {access}"})
assert resp.status_code == 200
assert resp.json()["username"] == "admin"
assert resp.json()["is_superuser"] is True
async def test_login_bad_credentials(api: AsyncClient) -> None:
resp = await api.post("/api/v1/auth/login", json={"username": "admin", "password": "wrong"})
assert resp.status_code == 401
async def test_me_requires_token(api: AsyncClient) -> None:
resp = await api.get("/api/v1/auth/me")
assert resp.status_code == 401
async def test_refresh_rotation(api: AsyncClient) -> None:
_, refresh = await _login(api, "admin", "adminpass1")
resp = await api.post("/api/v1/auth/refresh", json={"refresh_token": refresh})
assert resp.status_code == 200
new_refresh = resp.json()["refresh_token"]
assert new_refresh != refresh
# Old refresh is revoked after rotation.
reuse = await api.post("/api/v1/auth/refresh", json={"refresh_token": refresh})
assert reuse.status_code == 401
async def test_logout_revokes(api: AsyncClient) -> None:
_, refresh = await _login(api, "admin", "adminpass1")
out = await api.post("/api/v1/auth/logout", json={"refresh_token": refresh})
assert out.status_code == 204
reuse = await api.post("/api/v1/auth/refresh", json={"refresh_token": refresh})
assert reuse.status_code == 401
async def test_admin_creates_user_and_nonadmin_forbidden(api: AsyncClient) -> None:
admin_access, _ = await _login(api, "admin", "adminpass1")
admin_headers = {"Authorization": f"Bearer {admin_access}"}
created = await api.post(
"/api/v1/admin/users",
headers=admin_headers,
json={"username": "carol", "password": "carolpass1"},
)
assert created.status_code == 201, created.text
assert created.json()["is_superuser"] is False
# Non-admin cannot reach admin routes.
user_access, _ = await _login(api, "carol", "carolpass1")
forbidden = await api.get(
"/api/v1/admin/users", headers={"Authorization": f"Bearer {user_access}"}
)
assert forbidden.status_code == 403
async def test_admin_create_duplicate_conflicts(api: AsyncClient) -> None:
admin_access, _ = await _login(api, "admin", "adminpass1")
headers = {"Authorization": f"Bearer {admin_access}"}
payload = {"username": "dave", "password": "davepass12"}
first = await api.post("/api/v1/admin/users", headers=headers, json=payload)
assert first.status_code == 201
dup = await api.post("/api/v1/admin/users", headers=headers, json=payload)
assert dup.status_code == 409
async def test_deactivated_user_cannot_login(api: AsyncClient) -> None:
admin_access, _ = await _login(api, "admin", "adminpass1")
headers = {"Authorization": f"Bearer {admin_access}"}
created = await api.post(
"/api/v1/admin/users",
headers=headers,
json={"username": "erin", "password": "erinpass12"},
)
user_id = created.json()["id"]
deactivate = await api.delete(f"/api/v1/admin/users/{user_id}", headers=headers)
assert deactivate.status_code == 200
assert deactivate.json()["is_active"] is False
resp = await api.post("/api/v1/auth/login", json={"username": "erin", "password": "erinpass12"})
assert resp.status_code == 401