"""Integration tests for the metadata-editor endpoints (§A7, §1H). Requires a reachable Postgres; skips otherwise. """ import asyncio import os from collections.abc import AsyncIterator from pathlib import Path import pytest from app.core.config import get_settings from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope from app.infrastructure.db.repositories import ( SqlAlchemyRefreshTokenRepository, SqlAlchemyUserRepository, ) from asgi_lifespan import LifespanManager from httpx import ASGITransport, AsyncClient pytestmark = pytest.mark.asyncio _db_reachable_cache: bool | None = None async def _db_reachable() -> bool: global _db_reachable_cache if _db_reachable_cache is not None: return _db_reachable_cache from sqlalchemy import text try: async with asyncio.timeout(3): async with get_engine().connect() as conn: await conn.execute(text("SELECT 1")) _db_reachable_cache = True except Exception: _db_reachable_cache = False return _db_reachable_cache @pytest.fixture async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]: if not await _db_reachable(): pytest.skip("Postgres not reachable — integration test skipped.") os.environ["MEDIA_PATH"] = str(tmp_path) get_settings.cache_clear() import app.infrastructure.storage.provider as _storage_provider _storage_provider._storage = None try: async with get_engine().begin() as conn: await conn.run_sync(Base.metadata.drop_all) await conn.run_sync(Base.metadata.create_all) from app.application.user_service import UserService from app.core.security import Argon2PasswordHasher async with session_scope() as session: await UserService( users=SqlAlchemyUserRepository(session), refresh_tokens=SqlAlchemyRefreshTokenRepository(session), hasher=Argon2PasswordHasher(), ).create_user(username="testuser", password="testpass1", is_superuser=False) from app.main import create_app app = create_app() async with LifespanManager(app): transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as client: yield client async with get_engine().begin() as conn: await conn.run_sync(Base.metadata.drop_all) await dispose_engine() finally: _storage_provider._storage = None os.environ.pop("MEDIA_PATH", None) get_settings.cache_clear() async def _login(api: AsyncClient) -> str: resp = await api.post( "/api/v1/auth/login", json={"username": "testuser", "password": "testpass1"} ) assert resp.status_code == 200 return str(resp.json()["access_token"]) async def _upload(api: AsyncClient, token: str, *, name: str = "song.mp3") -> str: audio = b"fake audio bytes for metadata test" * 10 resp = await api.post( "/api/v1/upload", files={"file": (name, audio, "audio/mpeg")}, headers={"Authorization": f"Bearer {token}"}, ) assert resp.status_code == 200, resp.text return str(resp.json()["track_id"]) async def test_track_out_includes_genre_year_track_number(api: AsyncClient) -> None: token = await _login(api) track_id = await _upload(api, token) resp = await api.get(f"/api/v1/tracks/{track_id}", headers={"Authorization": f"Bearer {token}"}) assert resp.status_code == 200, resp.text body = resp.json() assert "genre" in body assert "year" in body assert "track_number" in body async def test_metadata_matches_degrades_without_acoustid(api: AsyncClient) -> None: # No ACOUSTID_API_KEY / fpcalc configured in the test environment — the # endpoint must degrade to an empty list, not error. token = await _login(api) track_id = await _upload(api, token) resp = await api.get( f"/api/v1/tracks/{track_id}/metadata/matches", headers={"Authorization": f"Bearer {token}"}, ) assert resp.status_code == 200, resp.text assert resp.json() == {"items": []} async def test_metadata_matches_not_found(api: AsyncClient) -> None: token = await _login(api) resp = await api.get( "/api/v1/tracks/00000000-0000-0000-0000-000000000000/metadata/matches", headers={"Authorization": f"Bearer {token}"}, ) assert resp.status_code == 404 async def test_apply_metadata_updates_fields_and_sets_manual(api: AsyncClient) -> None: token = await _login(api) track_id = await _upload(api, token) headers = {"Authorization": f"Bearer {token}"} resp = await api.put( f"/api/v1/tracks/{track_id}/metadata", json={ "title": "New Title", "artist_name": "New Artist", "album_title": "New Album", "year": 1999, "genre": "Rock", "track_number": 3, }, headers=headers, ) assert resp.status_code == 200, resp.text body = resp.json() assert body["title"] == "New Title" assert body["artist_name"] == "New Artist" assert body["album_title"] == "New Album" assert body["year"] == 1999 assert body["genre"] == "Rock" assert body["track_number"] == 3 assert body["metadata_status"] == "manual" # Re-fetch to confirm persistence. again = await api.get(f"/api/v1/tracks/{track_id}", headers=headers) assert again.status_code == 200 assert again.json()["title"] == "New Title" assert again.json()["metadata_status"] == "manual" async def test_apply_metadata_partial_update(api: AsyncClient) -> None: token = await _login(api) track_id = await _upload(api, token) headers = {"Authorization": f"Bearer {token}"} resp = await api.put( f"/api/v1/tracks/{track_id}/metadata", json={"genre": "Jazz"}, headers=headers, ) assert resp.status_code == 200, resp.text body = resp.json() assert body["genre"] == "Jazz" assert body["metadata_status"] == "manual" async def test_apply_metadata_not_found(api: AsyncClient) -> None: token = await _login(api) resp = await api.put( "/api/v1/tracks/00000000-0000-0000-0000-000000000000/metadata", json={"title": "x"}, headers={"Authorization": f"Bearer {token}"}, ) assert resp.status_code == 404