551afbab13
Thin adapters over the existing services/repositories (no business logic): - system: ping (auth check), getLicense - browsing: getArtists/getArtist/getAlbum, getAlbumList(2) (newest/alpha/random), getSong, getGenres, getMusicFolders/getIndexes/getMusicDirectory (one folder) - search: search3 (delegates to the library repos) - media: stream + download (reuse StreamingService, honor Range); getCoverArt returns a placeholder until the cover pipeline lands - playlists: get/create/update/delete over the playlist repo (owner-scoped) - annotation: star/unstar → append-only like log, scrobble → play history, setRating → clean no-op - all endpoints also accept the .view suffix and GET+POST for client compat Repo support: album list ordering (newest/random), track genre facets. README documents the mandatory-HTTPS requirement and app-password workflow. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
237 lines
8.4 KiB
Python
237 lines
8.4 KiB
Python
"""Integration tests for the Subsonic /rest layer (happy path per endpoint group).
|
|
|
|
Requires a reachable Postgres; skips otherwise (mirrors test_upload_stream_api).
|
|
Drives the real ASGI app: seeds a user + a track, mints a Subsonic app-password
|
|
via the native API, then exercises /rest with real query-string auth.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
from xml.etree import ElementTree as ET
|
|
|
|
import pytest
|
|
from app.core.config import get_settings
|
|
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
|
|
from app.infrastructure.db.repositories import (
|
|
SqlAlchemyRefreshTokenRepository,
|
|
SqlAlchemyUserRepository,
|
|
)
|
|
from asgi_lifespan import LifespanManager
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
pytestmark = pytest.mark.asyncio
|
|
|
|
_db_reachable_cache: bool | None = None
|
|
|
|
|
|
async def _db_reachable() -> bool:
|
|
global _db_reachable_cache
|
|
if _db_reachable_cache is not None:
|
|
return _db_reachable_cache
|
|
from sqlalchemy import text
|
|
|
|
try:
|
|
async with asyncio.timeout(3):
|
|
async with get_engine().connect() as conn:
|
|
await conn.execute(text("SELECT 1"))
|
|
_db_reachable_cache = True
|
|
except Exception:
|
|
_db_reachable_cache = False
|
|
return _db_reachable_cache
|
|
|
|
|
|
@pytest.fixture
|
|
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
|
|
if not await _db_reachable():
|
|
pytest.skip("Postgres not reachable — integration test skipped.")
|
|
|
|
os.environ["MEDIA_PATH"] = str(tmp_path)
|
|
get_settings.cache_clear()
|
|
|
|
import app.infrastructure.storage.provider as _storage_provider
|
|
|
|
_storage_provider._storage = None
|
|
|
|
try:
|
|
async with get_engine().begin() as conn:
|
|
await conn.run_sync(Base.metadata.drop_all)
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
|
|
from app.application.user_service import UserService
|
|
from app.core.security import Argon2PasswordHasher
|
|
|
|
async with session_scope() as session:
|
|
await UserService(
|
|
users=SqlAlchemyUserRepository(session),
|
|
refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
|
|
hasher=Argon2PasswordHasher(),
|
|
).create_user(username="testuser", password="testpass1", is_superuser=False)
|
|
|
|
from app.main import create_app
|
|
|
|
app = create_app()
|
|
async with LifespanManager(app):
|
|
transport = ASGITransport(app=app)
|
|
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
|
yield client
|
|
|
|
async with get_engine().begin() as conn:
|
|
await conn.run_sync(Base.metadata.drop_all)
|
|
await dispose_engine()
|
|
finally:
|
|
_storage_provider._storage = None
|
|
os.environ.pop("MEDIA_PATH", None)
|
|
get_settings.cache_clear()
|
|
|
|
|
|
async def _login(api: AsyncClient) -> str:
|
|
resp = await api.post(
|
|
"/api/v1/auth/login", json={"username": "testuser", "password": "testpass1"}
|
|
)
|
|
assert resp.status_code == 200
|
|
return str(resp.json()["access_token"])
|
|
|
|
|
|
async def _subsonic_password(api: AsyncClient, token: str) -> str:
|
|
resp = await api.get(
|
|
"/api/v1/users/me/subsonic-password", headers={"Authorization": f"Bearer {token}"}
|
|
)
|
|
assert resp.status_code == 200, resp.text
|
|
return str(resp.json()["password"])
|
|
|
|
|
|
async def _seed_track(api: AsyncClient, token: str) -> str:
|
|
resp = await api.post(
|
|
"/api/v1/upload",
|
|
files={"file": ("song.mp3", b"audio bytes for subsonic" * 20, "audio/mpeg")},
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
assert resp.status_code == 200, resp.text
|
|
return str(resp.json()["track_id"])
|
|
|
|
|
|
def _auth_params(password: str) -> dict[str, str]:
|
|
# Legacy plaintext password auth (p=) keeps the test simple; t+s is covered
|
|
# by the auth-service unit tests.
|
|
return {"u": "testuser", "p": password, "c": "pytest", "v": "1.16.1", "f": "json"}
|
|
|
|
|
|
async def _setup(api: AsyncClient) -> tuple[dict[str, str], str]:
|
|
token = await _login(api)
|
|
password = await _subsonic_password(api, token)
|
|
track_id = await _seed_track(api, token)
|
|
return _auth_params(password), track_id
|
|
|
|
|
|
async def test_ping_ok(api: AsyncClient) -> None:
|
|
params, _ = await _setup(api)
|
|
resp = await api.get("/rest/ping", params=params)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["subsonic-response"]["status"] == "ok"
|
|
|
|
|
|
async def test_ping_bad_credentials_returns_code_40(api: AsyncClient) -> None:
|
|
await _setup(api)
|
|
resp = await api.get(
|
|
"/rest/ping",
|
|
params={"u": "testuser", "p": "wrong", "c": "pytest", "v": "1.16.1", "f": "json"},
|
|
)
|
|
# Subsonic errors are HTTP 200 with the failure inside the envelope.
|
|
assert resp.status_code == 200
|
|
body = resp.json()["subsonic-response"]
|
|
assert body["status"] == "failed"
|
|
assert body["error"]["code"] == 40
|
|
|
|
|
|
async def test_ping_xml_default(api: AsyncClient) -> None:
|
|
params, _ = await _setup(api)
|
|
xml_params = {k: v for k, v in params.items() if k != "f"}
|
|
resp = await api.get("/rest/ping", params=xml_params)
|
|
assert resp.status_code == 200
|
|
assert resp.headers["content-type"].startswith("application/xml")
|
|
root = ET.fromstring(resp.content)
|
|
assert root.attrib["status"] == "ok"
|
|
|
|
|
|
async def test_get_artists(api: AsyncClient) -> None:
|
|
params, _ = await _setup(api)
|
|
resp = await api.get("/rest/getArtists", params=params)
|
|
body = resp.json()["subsonic-response"]
|
|
assert body["status"] == "ok"
|
|
assert "artists" in body
|
|
|
|
|
|
async def test_get_album_list2(api: AsyncClient) -> None:
|
|
params, _ = await _setup(api)
|
|
resp = await api.get("/rest/getAlbumList2", params={**params, "type": "newest"})
|
|
body = resp.json()["subsonic-response"]
|
|
assert body["status"] == "ok"
|
|
assert "albumList2" in body
|
|
|
|
|
|
async def test_search3_finds_song(api: AsyncClient) -> None:
|
|
params, track_id = await _setup(api)
|
|
resp = await api.get("/rest/search3", params={**params, "query": "song"})
|
|
result = resp.json()["subsonic-response"]["searchResult3"]
|
|
song_ids = [s["id"] for s in result.get("song", [])]
|
|
assert f"tr-{track_id}" in song_ids
|
|
|
|
|
|
async def test_get_song(api: AsyncClient) -> None:
|
|
params, track_id = await _setup(api)
|
|
resp = await api.get("/rest/getSong", params={**params, "id": f"tr-{track_id}"})
|
|
song = resp.json()["subsonic-response"]["song"]
|
|
assert song["id"] == f"tr-{track_id}"
|
|
|
|
|
|
async def test_stream_returns_audio(api: AsyncClient) -> None:
|
|
params, track_id = await _setup(api)
|
|
resp = await api.get("/rest/stream", params={**params, "id": f"tr-{track_id}"})
|
|
assert resp.status_code == 200
|
|
assert resp.headers["content-type"].startswith("audio/")
|
|
assert resp.content == b"audio bytes for subsonic" * 20
|
|
|
|
|
|
async def test_get_cover_art_placeholder(api: AsyncClient) -> None:
|
|
params, track_id = await _setup(api)
|
|
resp = await api.get("/rest/getCoverArt", params={**params, "id": f"tr-{track_id}"})
|
|
assert resp.status_code == 200
|
|
assert resp.headers["content-type"] == "image/png"
|
|
|
|
|
|
async def test_playlist_lifecycle(api: AsyncClient) -> None:
|
|
params, track_id = await _setup(api)
|
|
|
|
created = await api.get(
|
|
"/rest/createPlaylist", params={**params, "name": "Roadtrip", "songId": f"tr-{track_id}"}
|
|
)
|
|
playlist = created.json()["subsonic-response"]["playlist"]
|
|
assert playlist["name"] == "Roadtrip"
|
|
assert playlist["songCount"] == 1
|
|
playlist_id = playlist["id"]
|
|
|
|
listed = await api.get("/rest/getPlaylists", params=params)
|
|
names = [p["name"] for p in listed.json()["subsonic-response"]["playlists"]["playlist"]]
|
|
assert "Roadtrip" in names
|
|
|
|
deleted = await api.get("/rest/deletePlaylist", params={**params, "id": playlist_id})
|
|
assert deleted.json()["subsonic-response"]["status"] == "ok"
|
|
|
|
|
|
async def test_star_and_scrobble(api: AsyncClient) -> None:
|
|
params, track_id = await _setup(api)
|
|
star = await api.get("/rest/star", params={**params, "id": f"tr-{track_id}"})
|
|
assert star.json()["subsonic-response"]["status"] == "ok"
|
|
|
|
scrobble = await api.get(
|
|
"/rest/scrobble", params={**params, "id": f"tr-{track_id}", "submission": "true"}
|
|
)
|
|
assert scrobble.json()["subsonic-response"]["status"] == "ok"
|
|
|
|
# The like landed in the append-only log → it surfaces via the native API.
|
|
token = await _login(api)
|
|
likes = await api.get("/api/v1/likes", headers={"Authorization": f"Bearer {token}"})
|
|
assert any(item["id"] == track_id for item in likes.json()["items"])
|