e45e578f54
Search results now report whether a hit is already saved (in_library,
track_id, availability). New RemoteLibraryService backs POST
/tracks/remote (idempotent placeholder save) and POST
/tracks/{id}/materialize (on-demand fetch via a new materialize_track
arq task, reusing in-flight jobs).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
361 lines
13 KiB
Python
361 lines
13 KiB
Python
"""Integration tests for downloads + external search.
|
|
|
|
Requires a reachable Postgres; skips otherwise. The download worker task is
|
|
invoked directly (no Redis needed) against a fake fetch source, so the full
|
|
DB + storage import path is covered without touching the network.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
from app.core.config import get_settings
|
|
from app.domain.sources import KIND_FETCH, DownloadResult, SearchResult, SourceInfo
|
|
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
|
|
from app.infrastructure.db.repositories import (
|
|
SqlAlchemyRefreshTokenRepository,
|
|
SqlAlchemyUserRepository,
|
|
)
|
|
from app.infrastructure.sources.registry import SourceRegistry
|
|
from asgi_lifespan import LifespanManager
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
# Apply pytest-asyncio's marker to every test in this module so the
# coroutine tests below are run on an event loop.
pytestmark = pytest.mark.asyncio

# Memoised outcome of the Postgres probe in `_db_reachable`: None means "not
# probed yet"; afterwards it holds the first probe's verdict so later tests
# don't pay the connection timeout again.
_db_reachable_cache: bool | None = None
|
|
|
|
|
|
async def _db_reachable() -> bool:
    """Report whether Postgres answers a trivial query, probing at most once.

    The first call runs ``SELECT 1`` through ``get_engine()`` under a 3-second
    timeout and stores the verdict in ``_db_reachable_cache``; every later
    call simply returns that stored verdict.
    """
    global _db_reachable_cache
    if _db_reachable_cache is None:
        from sqlalchemy import text

        try:
            async with asyncio.timeout(3):
                async with get_engine().connect() as conn:
                    await conn.execute(text("SELECT 1"))
        except Exception:
            # Any exception while connecting or querying -- including the
            # 3 s timeout firing -- is recorded as "unreachable".
            _db_reachable_cache = False
        else:
            _db_reachable_cache = True
    return _db_reachable_cache
|
|
|
|
|
|
class FakeFetchSource:
    """A searchable + fetchable source that writes a local file (no network).

    It impersonates the ``youtube`` source: ``search`` returns one canned hit
    (``source_id="vid-1"``) built from the query, and ``fetch`` writes a small
    dummy audio file into ``tmp_dir`` and reports it as a finished download.
    """

    name = "youtube"

    def __init__(self, tmp_dir: Path) -> None:
        # Directory that fetched files are written into; created on demand.
        self._tmp_dir = tmp_dir

    def info(self) -> SourceInfo:
        """Describe this source as an available fetch-kind source."""
        return SourceInfo(name=self.name, label="YouTube Music", kind=KIND_FETCH, available=True)

    def is_available(self) -> bool:
        """Always available -- there is nothing external to check."""
        return True

    async def search(self, query: str, *, limit: int) -> list[SearchResult]:
        """Return a single canned hit titled ``"<query> song"``.

        ``limit`` is accepted for interface compatibility but not applied.
        """
        return [
            SearchResult(
                source=self.name,
                source_id="vid-1",
                title=f"{query} song",
                artist="Some Artist",
                album="Some Album",
                duration_seconds=200,
                thumbnail_url="http://img/large.jpg",
            )
        ]

    async def fetch(self, source_id: str, *, on_progress: Any = None) -> DownloadResult:
        """Write a dummy audio file for *source_id* and report it as downloaded.

        If *on_progress* is given it is awaited once with ``0.5`` after the
        file has been written.
        """
        # Keep the on-disk suffix and the reported format in agreement; the
        # previous version wrote "<id>.m4a" while claiming "webm".
        file_format = "webm"
        # Don't rely on the caller having created the directory beforehand.
        self._tmp_dir.mkdir(parents=True, exist_ok=True)
        path = self._tmp_dir / f"{source_id}.{file_format}"
        path.write_bytes(b"downloaded audio bytes" * 8)
        if on_progress is not None:
            await on_progress(0.5)
        return DownloadResult(
            source_id=source_id,
            path=path,
            file_format=file_format,
            file_size=path.stat().st_size,
            bitrate=160,
            suggested_title=f"Title for {source_id}",
        )

    async def get_metadata(self, source_id: str) -> None:
        """No metadata lookup is provided; always returns ``None``."""
        return None
|
|
|
|
|
|
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
    """Yield an HTTP client bound to a freshly built app on a clean schema.

    Setup:
    - skips when Postgres is unreachable;
    - points ``MEDIA_PATH`` at a per-test directory;
    - recreates all tables;
    - seeds an ``admin`` superuser;
    - swaps the source registry for a ``FakeFetchSource`` so nothing touches
      the network.

    Teardown:
    - drops the schema;
    - always disposes the engine;
    - restores the previous ``MEDIA_PATH`` (or removes it if it was unset).
    """
    if not await _db_reachable():
        pytest.skip("Postgres not reachable — integration test skipped.")

    media = tmp_path / "media"
    media.mkdir()
    # Remember any pre-existing value so teardown restores it instead of
    # unconditionally deleting it.
    previous_media_path = os.environ.get("MEDIA_PATH")
    os.environ["MEDIA_PATH"] = str(media)
    # Settings are cached; clear so the new MEDIA_PATH is picked up.
    get_settings.cache_clear()

    import app.infrastructure.storage.provider as _storage_provider

    # Drop the cached storage instance so it is rebuilt for the new media path.
    _storage_provider._storage = None

    try:
        async with get_engine().begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        from app.application.user_service import UserService
        from app.core.security import Argon2PasswordHasher

        async with session_scope() as session:
            await UserService(
                users=SqlAlchemyUserRepository(session),
                refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
                hasher=Argon2PasswordHasher(),
            ).create_user(username="admin", password="adminpass1", is_superuser=True)

        from app.api.deps import get_source_registry
        from app.main import create_app

        app = create_app()
        # Inject a fake fetch source so search/download never hit the network.
        fetch_dir = tmp_path / "dl"
        fetch_dir.mkdir()
        fake_registry = SourceRegistry([FakeFetchSource(fetch_dir)])  # type: ignore[list-item]
        app.dependency_overrides[get_source_registry] = lambda: fake_registry

        async with LifespanManager(app):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                yield client

        async with get_engine().begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
    finally:
        # Dispose unconditionally: if setup failed part-way, a still-open
        # engine would otherwise leak into subsequent tests.
        await dispose_engine()
        _storage_provider._storage = None
        if previous_media_path is None:
            os.environ.pop("MEDIA_PATH", None)
        else:
            os.environ["MEDIA_PATH"] = previous_media_path
        get_settings.cache_clear()
|
|
|
|
|
|
async def _login(api: AsyncClient) -> str:
    """Log in as the seeded ``admin`` user and return its access token.

    Raises AssertionError (carrying the response body) if login is not 200.
    """
    resp = await api.post(
        "/api/v1/auth/login", json={"username": "admin", "password": "adminpass1"}
    )
    # Include the body so a failed login explains itself in the test report.
    assert resp.status_code == 200, resp.text
    return str(resp.json()["access_token"])
|
|
|
|
|
|
async def test_search_aggregates_fetch_sources(api: AsyncClient) -> None:
    """Aggregate search queries the fake youtube source and returns its one hit."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}

    response = await api.get("/api/v1/search", params={"q": "queen"}, headers=auth)
    assert response.status_code == 200

    payload = response.json()
    assert payload["searched_sources"] == ["youtube"]
    results = payload["results"]
    assert len(results) == 1

    first = results[0]
    assert (first["source"], first["source_id"], first["title"]) == (
        "youtube",
        "vid-1",
        "queen song",
    )
|
|
|
|
|
|
async def test_search_reports_library_status(api: AsyncClient) -> None:
    """Remote browse (plan: Model C): a fresh hit is not in the library; after
    it is saved as a placeholder, the same search reports it as in the library
    with ``availability == "remote"``."""
    token = await _login(api)
    headers = {"Authorization": f"Bearer {token}"}

    resp = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers)
    # Check status first so a failing search isn't reported as a KeyError/IndexError.
    assert resp.status_code == 200, resp.text
    hit = resp.json()["results"][0]
    assert hit["in_library"] is False
    assert hit["track_id"] is None
    assert hit["availability"] is None

    save = await api.post(
        "/api/v1/tracks/remote",
        json={
            "source": hit["source"],
            "source_id": hit["source_id"],
            "title": hit["title"],
            "artist": hit["artist"],
        },
        headers=headers,
    )
    assert save.status_code == 201, save.text
    saved = save.json()
    assert saved["availability"] == "remote"
    assert saved["file_format"] is None

    resp2 = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers)
    assert resp2.status_code == 200, resp2.text
    hit2 = resp2.json()["results"][0]
    assert hit2["in_library"] is True
    assert hit2["track_id"] == saved["id"]
    assert hit2["availability"] == "remote"
|
|
|
|
|
|
async def test_save_remote_is_idempotent(api: AsyncClient) -> None:
    """Saving the same remote hit twice returns the same placeholder track."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}
    body = {"source": "youtube", "source_id": "vid-idem", "title": "A", "artist": "Artist"}

    # Issue both saves (sequentially) before checking either response.
    first, second = [
        await api.post("/api/v1/tracks/remote", json=body, headers=auth) for _ in range(2)
    ]

    assert first.status_code == 201
    assert second.status_code == 201
    assert first.json()["id"] == second.json()["id"]
|
|
|
|
|
|
async def test_materialize_flow(
    api: AsyncClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Save a placeholder, materialize it on demand, and confirm it streams
    afterwards (plan: Model C lazy materialization)."""
    token = await _login(api)
    headers = {"Authorization": f"Bearer {token}"}

    save = await api.post(
        "/api/v1/tracks/remote",
        json={
            "source": "youtube",
            "source_id": "vid-mat-1",
            "title": "Materialize Me",
            "artist": "Artist",
        },
        headers=headers,
    )
    # Verify the save before indexing into its body.
    assert save.status_code == 201, save.text
    saved = save.json()
    track_id = saved["id"]
    assert saved["availability"] == "remote"

    # Streaming a placeholder before materialization fails (no audio yet).
    stream_before = await api.get(f"/api/v1/stream/{track_id}", headers=headers)
    assert stream_before.status_code == 404

    materialize = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers)
    assert materialize.status_code == 200, materialize.text
    body = materialize.json()
    assert body["job"] is not None
    job_id = body["job"]["id"]
    assert body["job"]["track_id"] == track_id

    # A second materialize request reuses the same in-flight job.
    again = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers)
    assert again.status_code == 200, again.text
    assert again.json()["job"]["id"] == job_id

    # Run the worker task directly (bypasses Redis) with the fake fetch source.
    import app.workers.tasks.materialize_task as mat_task

    worker_dir = tmp_path / "worker-mat"
    worker_dir.mkdir()
    fake = SourceRegistry([FakeFetchSource(worker_dir)])  # type: ignore[list-item]
    monkeypatch.setattr(mat_task, "build_source_registry", lambda _settings: fake)

    result = await mat_task.materialize_track({}, job_id=job_id)
    assert result["status"] == "done"
    assert result["track_id"] == track_id

    got = await api.get(f"/api/v1/tracks/{track_id}", headers=headers)
    assert got.status_code == 200, got.text
    track = got.json()
    assert track["availability"] == "local"
    assert track["file_format"] == "webm"

    # Streaming now works.
    stream_after = await api.get(f"/api/v1/stream/{track_id}", headers=headers)
    assert stream_after.status_code == 200

    # Materializing an already-local track is a no-op.
    noop = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers)
    assert noop.json()["job"] is None
|
|
|
|
|
|
async def test_source_scoped_search(api: AsyncClient) -> None:
    """Searching one named source returns that source's hits."""
    token = await _login(api)
    response = await api.get(
        "/api/v1/sources/youtube/search",
        params={"q": "abba"},
        headers={"Authorization": f"Bearer {token}"},
    )
    assert response.status_code == 200

    top_hit = response.json()["results"][0]
    assert top_hit["title"] == "abba song"
|
|
|
|
|
|
async def test_download_create_list_and_complete(
    api: AsyncClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """End-to-end download lifecycle.

    The test queues a job, checks it is listed and deduplicated, runs the
    worker in-process, streams the imported track, and finally checks that
    a repeat request dedupes against the library.
    """
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}
    downloads_url = "/api/v1/downloads"

    # Redis is absent, so enqueueing degrades, but the job row still persists.
    created = await api.post(
        downloads_url,
        json={"source": "youtube", "source_id": "vid-1", "query": "queen"},
        headers=auth,
    )
    assert created.status_code == 202
    created_body = created.json()
    assert created_body["already_in_library"] is False
    queued_job = created_body["job"]
    job_id = queued_job["id"]
    assert queued_job["status"] == "queued"

    # The new job appears in the listing.
    listed = await api.get(downloads_url, headers=auth)
    assert listed.status_code == 200
    assert any(item["id"] == job_id for item in listed.json()["items"])

    # Re-requesting the same item joins the in-flight job instead of adding one.
    duplicate = await api.post(
        downloads_url,
        json={"source": "youtube", "source_id": "vid-1"},
        headers=auth,
    )
    assert duplicate.json()["job"]["id"] == job_id

    # Execute the worker task in-process (no Redis) against a fake fetch source.
    import app.workers.tasks.download_task as download_task_module

    fetch_dir = tmp_path / "worker-dl"
    fetch_dir.mkdir()
    registry = SourceRegistry([FakeFetchSource(fetch_dir)])  # type: ignore[list-item]
    monkeypatch.setattr(
        download_task_module, "build_source_registry", lambda _settings: registry
    )

    outcome = await download_task_module.download_track({}, job_id=job_id)
    assert outcome["status"] == "done"
    track_id = outcome["track_id"]

    # The finished job now points at the imported track.
    job_view = (await api.get(f"{downloads_url}/{job_id}", headers=auth)).json()
    assert job_view["status"] == "done"
    assert job_view["track_id"] == track_id

    # The imported track streams back with a non-empty body.
    streamed = await api.get(f"/api/v1/stream/{track_id}", headers=auth)
    assert streamed.status_code == 200
    assert streamed.content

    # Asking again dedupes against the library rather than queuing more work.
    repeat = (
        await api.post(
            downloads_url,
            json={"source": "youtube", "source_id": "vid-1"},
            headers=auth,
        )
    ).json()
    assert repeat["already_in_library"] is True
    assert repeat["track_id"] == track_id
|
|
|
|
|
|
async def test_cancel_download(api: AsyncClient) -> None:
    """Cancelling a queued download returns 204, after which a GET of the job 404s."""
    token = await _login(api)
    headers = {"Authorization": f"Bearer {token}"}
    create = await api.post(
        "/api/v1/downloads",
        json={"source": "youtube", "source_id": "vid-cancel"},
        headers=headers,
    )
    # A new download is accepted with 202 (see the create/list test); check
    # it before indexing so a failed create isn't reported as a KeyError.
    assert create.status_code == 202, create.text
    job_id = create.json()["job"]["id"]

    cancel = await api.delete(f"/api/v1/downloads/{job_id}", headers=headers)
    assert cancel.status_code == 204

    got = await api.get(f"/api/v1/downloads/{job_id}", headers=headers)
    assert got.status_code == 404
|