"""Happy-path integration tests for the Subsonic-compatible ``/rest`` layer.

One test (or small group) per endpoint family. A reachable Postgres is
required; when the probe fails every test is skipped (same approach as
test_upload_stream_api).

The real ASGI app is driven end to end: a user is created, a track is
uploaded and a Subsonic app-password is fetched through the native API, and
then ``/rest`` is called with real query-string authentication.
"""

import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
from xml.etree import ElementTree as ET

import pytest
from app.core.config import get_settings
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
    SqlAlchemyRefreshTokenRepository,
    SqlAlchemyUserRepository,
)
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient

pytestmark = pytest.mark.asyncio

# Result of the connectivity probe; None until the first probe has run.
_db_reachable_cache: bool | None = None

# Bytes uploaded by _seed_track and expected back verbatim from /rest/stream.
_AUDIO_PAYLOAD = b"audio bytes for subsonic" * 20


async def _db_reachable() -> bool:
    """Return whether the database answers ``SELECT 1`` within three seconds.

    The probe runs at most once per process; later calls return the cached
    outcome. Any exception raised while probing counts as "not reachable".
    """
    global _db_reachable_cache
    if _db_reachable_cache is None:
        from sqlalchemy import text

        try:
            async with asyncio.timeout(3), get_engine().connect() as connection:
                await connection.execute(text("SELECT 1"))
        except Exception:
            _db_reachable_cache = False
        else:
            _db_reachable_cache = True
    return _db_reachable_cache


@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
    """Yield an HTTP client bound to a freshly built app with a clean schema.

    Setup: point MEDIA_PATH at ``tmp_path``, clear the settings cache, reset
    the storage provider singleton, recreate all tables and create the
    ``testuser`` account. Teardown: drop the tables, dispose the engine and
    undo the environment/storage changes.
    """
    reachable = await _db_reachable()
    if not reachable:
        pytest.skip("Postgres not reachable — integration test skipped.")

    os.environ["MEDIA_PATH"] = str(tmp_path)
    get_settings.cache_clear()

    import app.infrastructure.storage.provider as _storage_provider

    _storage_provider._storage = None
    try:
        # Start from an empty schema regardless of what an earlier run left.
        async with get_engine().begin() as connection:
            await connection.run_sync(Base.metadata.drop_all)
            await connection.run_sync(Base.metadata.create_all)

        from app.application.user_service import UserService
        from app.core.security import Argon2PasswordHasher

        async with session_scope() as session:
            user_repo = SqlAlchemyUserRepository(session)
            token_repo = SqlAlchemyRefreshTokenRepository(session)
            service = UserService(
                users=user_repo,
                refresh_tokens=token_repo,
                hasher=Argon2PasswordHasher(),
            )
            await service.create_user(
                username="testuser", password="testpass1", is_superuser=False
            )

        from app.main import create_app

        asgi_app = create_app()
        async with (
            LifespanManager(asgi_app),
            AsyncClient(
                transport=ASGITransport(app=asgi_app), base_url="http://test"
            ) as client,
        ):
            yield client

        async with get_engine().begin() as connection:
            await connection.run_sync(Base.metadata.drop_all)
        await dispose_engine()
    finally:
        # Always restore process-wide state, even when setup failed part-way.
        _storage_provider._storage = None
        os.environ.pop("MEDIA_PATH", None)
        get_settings.cache_clear()


def _bearer(token: str) -> dict[str, str]:
    """Build the Authorization header used by the native API."""
    return {"Authorization": f"Bearer {token}"}


def _song_id(track_id: str) -> str:
    """Format a native track id the way these tests pass it to /rest."""
    return f"tr-{track_id}"


async def _login(api: AsyncClient) -> str:
    """Log in as the seeded user and return the access token."""
    credentials = {"username": "testuser", "password": "testpass1"}
    response = await api.post("/api/v1/auth/login", json=credentials)
    assert response.status_code == 200
    return str(response.json()["access_token"])


async def _subsonic_password(api: AsyncClient, token: str) -> str:
    """Fetch the Subsonic app-password for the authenticated user."""
    response = await api.get(
        "/api/v1/users/me/subsonic-password", headers=_bearer(token)
    )
    assert response.status_code == 200, response.text
    return str(response.json()["password"])


async def _seed_track(api: AsyncClient, token: str) -> str:
    """Upload a small fake MP3 and return the resulting track id."""
    upload = {"file": ("song.mp3", _AUDIO_PAYLOAD, "audio/mpeg")}
    response = await api.post(
        "/api/v1/upload", files=upload, headers=_bearer(token)
    )
    assert response.status_code == 200, response.text
    return str(response.json()["track_id"])


def _auth_params(password: str) -> dict[str, str]:
    """Return the common /rest query parameters for ``testuser``."""
    # Plaintext ``p=`` auth is used here for simplicity; the token+salt (t+s)
    # scheme is exercised by the auth-service unit tests instead.
    return dict(u="testuser", p=password, c="pytest", v="1.16.1", f="json")


async def _setup(api: AsyncClient) -> tuple[dict[str, str], str]:
    """Log in, obtain a Subsonic password, seed a track.

    Returns the /rest auth parameters and the id of the seeded track.
    """
    access_token = await _login(api)
    app_password = await _subsonic_password(api, access_token)
    seeded_id = await _seed_track(api, access_token)
    return _auth_params(app_password), seeded_id


async def test_ping_ok(api: AsyncClient) -> None:
    """ping with valid credentials reports status ok."""
    auth, _ = await _setup(api)
    response = await api.get("/rest/ping", params=auth)
    assert response.status_code == 200
    envelope = response.json()["subsonic-response"]
    assert envelope["status"] == "ok"


async def test_ping_bad_credentials_returns_code_40(api: AsyncClient) -> None:
    """ping with a wrong password yields a failed envelope with error code 40."""
    await _setup(api)
    response = await api.get("/rest/ping", params=_auth_params("wrong"))
    # The HTTP layer still answers 200; the failure lives inside the envelope.
    assert response.status_code == 200
    envelope = response.json()["subsonic-response"]
    assert envelope["status"] == "failed"
    assert envelope["error"]["code"] == 40


async def test_ping_xml_default(api: AsyncClient) -> None:
    """Without an ``f`` parameter the response is XML."""
    auth, _ = await _setup(api)
    without_format = dict(auth)
    without_format.pop("f", None)
    response = await api.get("/rest/ping", params=without_format)
    assert response.status_code == 200
    assert response.headers["content-type"].startswith("application/xml")
    document = ET.fromstring(response.content)
    assert document.attrib["status"] == "ok"


async def test_get_artists(api: AsyncClient) -> None:
    """getArtists returns an ok envelope containing ``artists``."""
    auth, _ = await _setup(api)
    response = await api.get("/rest/getArtists", params=auth)
    envelope = response.json()["subsonic-response"]
    assert envelope["status"] == "ok"
    assert "artists" in envelope


async def test_get_album_list2(api: AsyncClient) -> None:
    """getAlbumList2 (type=newest) returns an ok envelope with ``albumList2``."""
    auth, _ = await _setup(api)
    query = {**auth, "type": "newest"}
    response = await api.get("/rest/getAlbumList2", params=query)
    envelope = response.json()["subsonic-response"]
    assert envelope["status"] == "ok"
    assert "albumList2" in envelope


async def test_search3_finds_song(api: AsyncClient) -> None:
    """search3 for "song" lists the seeded track among the songs."""
    auth, track_id = await _setup(api)
    query = {**auth, "query": "song"}
    response = await api.get("/rest/search3", params=query)
    hits = response.json()["subsonic-response"]["searchResult3"]
    found_ids = [entry["id"] for entry in hits.get("song", [])]
    assert _song_id(track_id) in found_ids


async def test_get_song(api: AsyncClient) -> None:
    """getSong echoes back the requested song id."""
    auth, track_id = await _setup(api)
    wanted = _song_id(track_id)
    response = await api.get("/rest/getSong", params={**auth, "id": wanted})
    payload = response.json()["subsonic-response"]["song"]
    assert payload["id"] == wanted


async def test_stream_returns_audio(api: AsyncClient) -> None:
    """stream returns the uploaded bytes with an audio content type."""
    auth, track_id = await _setup(api)
    query = {**auth, "id": _song_id(track_id)}
    response = await api.get("/rest/stream", params=query)
    assert response.status_code == 200
    assert response.headers["content-type"].startswith("audio/")
    assert response.content == _AUDIO_PAYLOAD


async def test_get_cover_art_placeholder(api: AsyncClient) -> None:
    """getCoverArt for the seeded track answers with a PNG."""
    auth, track_id = await _setup(api)
    query = {**auth, "id": _song_id(track_id)}
    response = await api.get("/rest/getCoverArt", params=query)
    assert response.status_code == 200
    assert response.headers["content-type"] == "image/png"


async def test_playlist_lifecycle(api: AsyncClient) -> None:
    """A playlist can be created with a song, listed, and deleted."""
    auth, track_id = await _setup(api)

    create_query = {**auth, "name": "Roadtrip", "songId": _song_id(track_id)}
    create_resp = await api.get("/rest/createPlaylist", params=create_query)
    new_playlist = create_resp.json()["subsonic-response"]["playlist"]
    assert new_playlist["name"] == "Roadtrip"
    assert new_playlist["songCount"] == 1
    playlist_id = new_playlist["id"]

    list_resp = await api.get("/rest/getPlaylists", params=auth)
    entries = list_resp.json()["subsonic-response"]["playlists"]["playlist"]
    listed_names = [entry["name"] for entry in entries]
    assert "Roadtrip" in listed_names

    delete_resp = await api.get(
        "/rest/deletePlaylist", params={**auth, "id": playlist_id}
    )
    assert delete_resp.json()["subsonic-response"]["status"] == "ok"


async def test_star_and_scrobble(api: AsyncClient) -> None:
    """star and scrobble both succeed, and the star shows up as a native like."""
    auth, track_id = await _setup(api)
    song = _song_id(track_id)

    star_resp = await api.get("/rest/star", params={**auth, "id": song})
    assert star_resp.json()["subsonic-response"]["status"] == "ok"

    scrobble_query = {**auth, "id": song, "submission": "true"}
    scrobble_resp = await api.get("/rest/scrobble", params=scrobble_query)
    assert scrobble_resp.json()["subsonic-response"]["status"] == "ok"

    # The star must be visible through the native likes endpoint as well.
    access_token = await _login(api)
    likes_resp = await api.get("/api/v1/likes", headers=_bearer(access_token))
    assert any(item["id"] == track_id for item in likes_resp.json()["items"])