Files
mcma-backend/tests/test_upload_stream_api.py
Senko-san ea880edd57
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
feat(tracks): filter track list by ingest source
Add an optional `source` filter to `GET /api/v1/tracks` (and the
`TrackRepository.list`/`count` port + SQLAlchemy adapter). Lets clients
query, e.g., only uploaded tracks (`?source=upload`) newest-first — the
backing for the webui's persistent "Recently uploaded" view.

- test: upload then list with `?source=upload` (hit) / `?source=youtube`
  (miss)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 01:35:51 +03:00

215 lines
6.9 KiB
Python

"""Integration tests for the upload, streaming, and track-listing endpoints.

Requires a reachable Postgres; every test is skipped otherwise.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
import pytest
from app.core.config import get_settings
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
# Module-level pytest marker: applies ``pytest.mark.asyncio`` to every test in this file.
pytestmark = pytest.mark.asyncio
# Memoised probe result: ``None`` until the first probe, then True/False.
_db_reachable_cache: bool | None = None


async def _db_reachable() -> bool:
    """Return whether the database answers a trivial query.

    The engine is probed at most once per process with ``SELECT 1`` under a
    3-second timeout; the outcome is stored in ``_db_reachable_cache`` and
    returned directly on every later call. Any exception raised while
    connecting or querying (including the timeout) counts as "not reachable".
    """
    global _db_reachable_cache

    if _db_reachable_cache is None:
        from sqlalchemy import text

        try:
            async with asyncio.timeout(3), get_engine().connect() as conn:
                await conn.execute(text("SELECT 1"))
        except Exception:
            # Best-effort probe: a failure here only means the tests get skipped.
            _db_reachable_cache = False
        else:
            _db_reachable_cache = True

    return _db_reachable_cache
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client backed by a fresh schema and media dir.

    Setup:
        * skips the requesting test when ``_db_reachable()`` is false;
        * points ``MEDIA_PATH`` at ``tmp_path`` and clears the cached settings
          and the storage singleton so both are rebuilt;
        * drops and recreates every table in ``Base.metadata``;
        * creates the non-superuser account ``testuser`` / ``testpass1``;
        * builds the app and serves it through ``ASGITransport`` inside a
          ``LifespanManager``.

    Teardown drops all tables, disposes the engine, clears the storage
    singleton again and restores ``MEDIA_PATH`` to its pre-test state.
    """
    if not await _db_reachable():
        pytest.skip("Postgres not reachable — integration test skipped.")

    # Remember any value that was set before the test so teardown can put it
    # back instead of unconditionally deleting the variable.
    previous_media_path = os.environ.get("MEDIA_PATH")
    os.environ["MEDIA_PATH"] = str(tmp_path)
    get_settings.cache_clear()
    # Also reset the file storage singleton so it picks up the new media_path
    import app.infrastructure.storage.provider as _storage_provider

    _storage_provider._storage = None
    try:
        # Start every test from an empty, freshly created schema.
        async with get_engine().begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        from app.application.user_service import UserService
        from app.core.security import Argon2PasswordHasher

        async with session_scope() as session:
            await UserService(
                users=SqlAlchemyUserRepository(session),
                refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
                hasher=Argon2PasswordHasher(),
            ).create_user(username="testuser", password="testpass1", is_superuser=False)

        from app.main import create_app

        app = create_app()
        async with LifespanManager(app):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                yield client

        async with get_engine().begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
    finally:
        try:
            # Dispose on every exit path, including a failed setup, so the
            # next test does not inherit this test's engine.
            await dispose_engine()
        finally:
            _storage_provider._storage = None
            if previous_media_path is None:
                os.environ.pop("MEDIA_PATH", None)
            else:
                os.environ["MEDIA_PATH"] = previous_media_path
            get_settings.cache_clear()
async def _login(api: AsyncClient) -> str:
    """Log in with the ``testuser`` credentials and return the access token.

    Asserts that the login endpoint answers 200 before reading the token.
    """
    credentials = {"username": "testuser", "password": "testpass1"}
    login_resp = await api.post("/api/v1/auth/login", json=credentials)
    assert login_resp.status_code == 200
    payload = login_resp.json()
    return str(payload["access_token"])
async def test_upload_creates_track(api: AsyncClient) -> None:
    """Uploading ``song.mp3`` succeeds and reports a new track titled ``song``."""
    access_token = await _login(api)
    payload = b"fake mp3 bytes" * 100
    upload_file = ("song.mp3", payload, "audio/mpeg")

    resp = await api.post(
        "/api/v1/upload",
        files={"file": upload_file},
        headers={"Authorization": f"Bearer {access_token}"},
    )

    # Include the response text so a failure shows the server's error.
    assert resp.status_code == 200, resp.text
    created = resp.json()
    assert created["already_exists"] is False
    assert created["title"] == "song"
    assert "track_id" in created
async def test_upload_dedup(api: AsyncClient) -> None:
    """Re-uploading identical bytes under another filename returns the same track."""
    token = await _login(api)
    content = b"same content" * 50

    async def upload_as(filename: str):
        # Same bytes and content type each time; only the filename varies.
        return await api.post(
            "/api/v1/upload",
            files={"file": (filename, content, "audio/mpeg")},
            headers={"Authorization": f"Bearer {token}"},
        )

    first = await upload_as("a.mp3")
    assert first.status_code == 200
    first_body = first.json()
    assert first_body["already_exists"] is False

    second = await upload_as("b.mp3")
    assert second.status_code == 200
    second_body = second.json()
    assert second_body["already_exists"] is True
    assert second_body["track_id"] == first_body["track_id"]
async def test_stream_full(api: AsyncClient) -> None:
    """Streaming without a ``Range`` header returns the whole uploaded file."""
    token = await _login(api)
    audio = b"audio data for streaming" * 10

    upload = await api.post(
        "/api/v1/upload",
        files={"file": ("track.mp3", audio, "audio/mpeg")},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert upload.status_code == 200
    uploaded_id = upload.json()["track_id"]

    # Browser audio elements cannot set request headers, so this request
    # authenticates through the ?token= query parameter instead.
    stream_url = f"/api/v1/stream/{uploaded_id}?token={token}"
    streamed = await api.get(stream_url)

    assert streamed.status_code == 200
    assert streamed.content == audio
    response_headers = streamed.headers
    assert response_headers["content-type"].startswith("audio/mpeg")
    assert "accept-ranges" in response_headers
async def test_stream_range(api: AsyncClient) -> None:
    """A ``bytes=0-9`` range request yields 206 and exactly the first ten bytes."""
    token = await _login(api)
    bearer = f"Bearer {token}"
    audio = b"0123456789" * 10

    upload = await api.post(
        "/api/v1/upload",
        files={"file": ("range.mp3", audio, "audio/mpeg")},
        headers={"Authorization": bearer},
    )
    assert upload.status_code == 200
    uploaded_id = upload.json()["track_id"]

    # This request authenticates with the Authorization header.
    range_headers = {"Range": "bytes=0-9", "Authorization": bearer}
    partial = await api.get(f"/api/v1/stream/{uploaded_id}", headers=range_headers)

    assert partial.status_code == 206
    assert partial.content == b"0123456789"
    assert partial.headers["content-range"] == f"bytes 0-9/{len(audio)}"
    assert partial.headers["content-length"] == "10"
async def test_stream_not_found(api: AsyncClient) -> None:
    """An authenticated stream request for the all-zero track id returns 404."""
    token = await _login(api)
    missing_id = "00000000-0000-0000-0000-000000000000"
    stream_url = f"/api/v1/stream/{missing_id}?token={token}"
    not_found = await api.get(stream_url)
    assert not_found.status_code == 404
async def test_stream_requires_auth(api: AsyncClient) -> None:
    """A stream request carrying no token and no Authorization header returns 401."""
    anonymous_url = "/api/v1/stream/00000000-0000-0000-0000-000000000000"
    unauthenticated = await api.get(anonymous_url)
    assert unauthenticated.status_code == 401
async def test_upload_requires_auth(api: AsyncClient) -> None:
    """An upload sent without an Authorization header returns 401."""
    upload_file = ("x.mp3", b"data", "audio/mpeg")
    unauthenticated = await api.post("/api/v1/upload", files={"file": upload_file})
    assert unauthenticated.status_code == 401
async def test_list_tracks_filters_by_source(api: AsyncClient) -> None:
    """``GET /api/v1/tracks?source=...`` lists only tracks from that source.

    Uploads one file, then checks that ``source=upload`` returns exactly that
    track and that ``source=youtube`` returns an empty list.
    """
    # Uploaded tracks carry source="upload"; `?source=` narrows the list (this
    # powers the webui's "Recently uploaded" view, which survives a refresh).
    token = await _login(api)
    headers = {"Authorization": f"Bearer {token}"}

    up = await api.post(
        "/api/v1/upload",
        files={"file": ("uploaded.mp3", b"upload bytes" * 80, "audio/mpeg")},
        headers=headers,
    )
    # Check the precondition explicitly: without this, a failed upload would
    # only show up later as a confusing `len(items) == 1` failure.
    assert up.status_code == 200, up.text

    hit = await api.get("/api/v1/tracks", params={"source": "upload"}, headers=headers)
    assert hit.status_code == 200, hit.text
    items = hit.json()["items"]
    assert len(items) == 1
    assert items[0]["source"] == "upload"

    miss = await api.get("/api/v1/tracks", params={"source": "youtube"}, headers=headers)
    assert miss.status_code == 200, miss.text
    assert miss.json()["items"] == []