Compare commits

...

2 Commits

Author SHA1 Message Date
Senko-san 73d7da440f feat(enrichment): record status/errors and trust high-confidence AcoustID
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
Two related gaps surfaced from "uploaded a track, nothing changed / no status":

- A track could stay stuck on `pending` forever (an unexpected worker error
  rolled back the run without recording anything), and `failed` carried no
  reason. Add `tracks.metadata_error` + `tracks.enriched_at` (migration), stamp
  the outcome in apply_enrichment, add TrackRepository.mark_enrichment_failed,
  wrap enrich_task to persist crashes as `failed` in a fresh session, and emit a
  human-readable no-match reason. Expose metadata_error/enriched_at in TrackOut.

- The tag-first merge let junk embedded tags (e.g. "Music Track"/"Sound_13958")
  override even a 0.99-confidence AcoustID match. Add acoustid_trust_score
  (default 0.85): above it the acoustic identity wins for title/artist/album/
  year, tags are fallback; below it, tag-first as before.

Add a license-free real-file fixture (Scarlet Fire / Otis McDonald) whose junk
tags AcoustID overrides, with an always-on tag-reader test plus fpcalc/AcoustID/
network-gated identity + full-pipeline tests (skip on host, run in the container).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 13:29:08 +03:00
Senko-san 30cb8901f2 fix(tests): isolate suite to a dedicated *_test database
Integration fixtures call Base.metadata.drop_all/create_all on get_engine(),
whose DATABASE_URL points at the developer's real DB — localhost:5432/mcma for
host pytest, db:5432/mcma for `make test-api` (pytest runs inside the api
container). Every run silently wiped dev data: drop_all removes ORM tables but
leaves alembic_version (outside Base.metadata), the exact "tables keep
disappearing, version survives" symptom.

conftest now redirects the whole suite to a <db>_test database before settings
load and creates it on demand via asyncpg, so the dev DB is never opened.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 13:27:58 +03:00
18 changed files with 547 additions and 33 deletions
@@ -0,0 +1,39 @@
"""tracks: enrichment outcome (error reason + completion time)
Revision ID: 20260613_enrich_outcome
Revises: 20260608_subsonic_pw
Create Date: 2026-06-13 13:00:00.000000
Adds ``tracks.metadata_error`` and ``tracks.enriched_at`` so a finished
enrichment run records *why* it failed and *when* it completed. Lets the UI
distinguish a still-pending/running track from one that is done or failed, and
surface an actionable reason instead of a silent spinner (plan §6.2).
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "20260613_enrich_outcome"
down_revision: str | None = "20260608_subsonic_pw"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.add_column(
"tracks",
sa.Column("metadata_error", sa.String(length=2048), nullable=True),
)
op.add_column(
"tracks",
sa.Column("enriched_at", sa.DateTime(timezone=True), nullable=True),
)
def downgrade() -> None:
op.drop_column("tracks", "enriched_at")
op.drop_column("tracks", "metadata_error")
+2
View File
@@ -17,6 +17,8 @@ class TrackOut(BaseModel):
file_format: str
file_size: int
metadata_status: str
metadata_error: str | None
enriched_at: dt.datetime | None
source: str
has_cover: bool
created_at: dt.datetime
+2
View File
@@ -42,6 +42,8 @@ async def _build_track_out(
file_format=t.file_format,
file_size=t.file_size,
metadata_status=t.metadata_status,
metadata_error=t.metadata_error,
enriched_at=t.enriched_at,
source=t.source,
has_cover=bool(t.album_id and albums.get(t.album_id) and albums[t.album_id].cover_path),
created_at=t.created_at,
+37 -7
View File
@@ -58,6 +58,7 @@ class MetadataEnrichmentService:
acoustid: AcoustIdClient,
cover_extractor: CoverArtExtractor | None = None,
cover_provider: CoverArtProvider | None = None,
acoustid_trust_score: float = 0.85,
) -> None:
self._tracks = tracks
self._artists = artists
@@ -68,6 +69,7 @@ class MetadataEnrichmentService:
self._acoustid = acoustid
self._cover_extractor = cover_extractor
self._cover_provider = cover_provider
self._acoustid_trust_score = acoustid_trust_score
async def enrich(self, track_id: uuid.UUID) -> EnrichmentResult:
track = await self._tracks.get_by_id(track_id)
@@ -81,16 +83,31 @@ class MetadataEnrichmentService:
tags = await self._read_local(track.storage_uri)
match = await self._identify(track.storage_uri)
# Merge sources: prefer embedded tags, fall back to the AcoustID match.
# ``title`` is guaranteed non-None by the existing track title; the rest
# stay None when neither source has them.
# Merge order is tag-first by default — embedded tags fix the common
# well-tagged offline case. But a *high-confidence* AcoustID match is the
# more trustworthy identity (downloaded files routinely carry junk tags
# like "Music Track"/"Sound_12345"), so above the trust threshold the
# acoustic match wins for the identity fields and tags become fallback.
tag_title = tags.title if tags else None
tag_artist = tags.artist if tags else None
tag_album = tags.album if tags else None
title = _opt_str(tag_title, match.title if match else None) or track.title
artist_name = _opt_str(tag_artist, match.artist if match else None)
album_title = _opt_str(tag_album, match.album if match else None)
year = _first_int(tags.year if tags else None, match.year if match else None)
match_title = match.title if match else None
match_artist = match.artist if match else None
match_album = match.album if match else None
match_year = match.year if match else None
tag_year = tags.year if tags else None
trust_match = match is not None and match.score >= self._acoustid_trust_score
if trust_match:
title = _opt_str(match_title, tag_title) or track.title
artist_name = _opt_str(match_artist, tag_artist)
album_title = _opt_str(match_album, tag_album)
year = _first_int(match_year, tag_year)
else:
title = _opt_str(tag_title, match_title) or track.title
artist_name = _opt_str(tag_artist, match_artist)
album_title = _opt_str(tag_album, match_album)
year = _first_int(tag_year, match_year)
genre = tags.genre if tags else None
track_number = tags.track_number if tags else None
duration = _first_int(
@@ -114,6 +131,9 @@ class MetadataEnrichmentService:
identified = bool(artist_name) or album_id is not None or mbid is not None
status = "enriched" if identified else "failed"
# On a clean "no identity" outcome, record *why* so the UI shows a reason
# rather than a bare "failed". A successful run clears any prior error.
metadata_error = None if identified else self._no_match_reason()
await self._tracks.apply_enrichment(
track_id,
@@ -128,10 +148,20 @@ class MetadataEnrichmentService:
acoustid_fingerprint=acoustid_id,
musicbrainz_id=mbid,
metadata_status=status,
metadata_error=metadata_error,
)
log.info("enrich_complete", track_id=str(track_id), status=status, mbid=mbid)
return EnrichmentResult(track_id=track_id, status=status, matched_mbid=mbid)
def _no_match_reason(self) -> str:
"""Explain a ``failed`` (no-identity) run in terms a user can act on:
which optional identification step was unavailable, if any."""
if not self._fingerprinter.is_available():
return "No metadata match: audio fingerprinting (fpcalc) is unavailable."
if not self._acoustid.is_available():
return "No metadata match: AcoustID lookup is unavailable (no API key)."
return "No metadata match found in tags or AcoustID."
async def _read_local(self, storage_uri: str) -> AudioTags | None:
try:
async with self._storage.as_local_path(storage_uri) as path:
+4
View File
@@ -90,6 +90,10 @@ class Settings(BaseSettings):
ml_service_url: str | None = None
acoustid_api_key: SecretStr | None = None
acoustid_api_url: str = "https://api.acoustid.org/v2/lookup"
# Above this AcoustID match score, trust the acoustic identification over
# embedded file tags (which are frequently junk on downloaded files —
# e.g. "Music Track" / "Sound_12345"). Below it, keep the tag-first merge.
acoustid_trust_score: float = 0.85
# MusicBrainz/AcoustID require a meaningful User-Agent identifying the
# application and a way to contact its maintainer (see
# https://musicbrainz.org/doc/XML_Web_Service/Rate_Limiting). Self-hosted
+2
View File
@@ -28,5 +28,7 @@ class Track:
genre: str | None
year: int | None
metadata_status: str
metadata_error: str | None
enriched_at: dt.datetime | None
created_at: dt.datetime
updated_at: dt.datetime
+9 -2
View File
@@ -172,11 +172,18 @@ class TrackRepository(Protocol):
acoustid_fingerprint: str | None,
musicbrainz_id: str | None,
metadata_status: str,
metadata_error: str | None = None,
) -> Track:
"""Persist auto-enrichment results. Nullable fields are filled only when
a non-``None`` value is supplied (re-enrich never erases prior data);
``title``/``artist_id``/``metadata_status`` are always written. Callers
must not invoke this for ``metadata_status == 'manual'`` tracks."""
``title``/``artist_id``/``metadata_status`` are always written, and the
run's outcome (``metadata_error`` + completion time) is always stamped.
Callers must not invoke this for ``metadata_status == 'manual'`` tracks."""
...
async def mark_enrichment_failed(self, track_id: uuid.UUID, *, error: str) -> None:
"""Record that an enrichment run crashed unexpectedly: set ``failed`` +
the error reason. A no-op for ``manual`` or missing tracks."""
...
+12 -1
View File
@@ -6,9 +6,10 @@
imports/downloads stay idempotent (plan §4, §6.1).
"""
import datetime as dt
import uuid
from sqlalchemy import ForeignKey, Integer, String, UniqueConstraint
from sqlalchemy import DateTime, ForeignKey, Integer, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
@@ -63,6 +64,16 @@ class TrackModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
nullable=False,
default=MetadataStatus.PENDING.value,
)
# Human-readable reason the last enrichment run set ``failed`` (no match, or
# an unexpected worker error). ``None`` once a run succeeds. Surfaced in the
# UI so a stuck/failed track is diagnosable, not silent.
metadata_error: Mapped[str | None] = mapped_column(String(2048), nullable=True)
# When the last enrichment run finished (success or failure). ``None`` while
# still ``pending`` — lets the UI distinguish "queued/running" from "done".
enriched_at: Mapped[dt.datetime | None] = mapped_column(
DateTime(timezone=True),
nullable=True,
)
added_by: Mapped[uuid.UUID | None] = mapped_column(
ForeignKey("users.id", ondelete="SET NULL"),
@@ -39,6 +39,8 @@ def _track_to_entity(row: TrackModel) -> Track:
genre=row.genre,
year=row.year,
metadata_status=row.metadata_status,
metadata_error=row.metadata_error,
enriched_at=row.enriched_at,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -38,6 +38,8 @@ def _track_to_entity(row: TrackModel) -> Track:
genre=row.genre,
year=row.year,
metadata_status=row.metadata_status,
metadata_error=row.metadata_error,
enriched_at=row.enriched_at,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -1,5 +1,6 @@
"""Track repository — adapter over ``AsyncSession``."""
import datetime as dt
import uuid
from sqlalchemy import func, select
@@ -26,6 +27,8 @@ def _to_entity(row: TrackModel) -> Track:
genre=row.genre,
year=row.year,
metadata_status=row.metadata_status,
metadata_error=row.metadata_error,
enriched_at=row.enriched_at,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -189,6 +192,7 @@ class SqlAlchemyTrackRepository:
acoustid_fingerprint: str | None,
musicbrainz_id: str | None,
metadata_status: str,
metadata_error: str | None = None,
) -> Track:
row = await self._session.get(TrackModel, track_id)
if row is None:
@@ -197,6 +201,10 @@ class SqlAlchemyTrackRepository:
row.title = title
row.artist_id = artist_id
row.metadata_status = metadata_status
# A finished run always stamps outcome: clear/set the reason and mark the
# completion time so the UI can tell "still pending" from "done/failed".
row.metadata_error = metadata_error
row.enriched_at = dt.datetime.now(dt.UTC)
# Nullable extras: fill gaps only — never erase data a prior run found.
if album_id is not None:
row.album_id = album_id
@@ -217,3 +225,16 @@ class SqlAlchemyTrackRepository:
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def mark_enrichment_failed(self, track_id: uuid.UUID, *, error: str) -> None:
"""Record that an enrichment run crashed (unexpected exception). Runs in
its own session so the failure is persisted even though the run's own
transaction rolled back. Never overwrites ``manual`` (a no-op then), and
a missing track is a clean no-op."""
row = await self._session.get(TrackModel, track_id)
if row is None or row.metadata_status == "manual":
return
row.metadata_status = "failed"
row.metadata_error = error
row.enriched_at = dt.datetime.now(dt.UTC)
await self._session.flush()
+26 -13
View File
@@ -42,19 +42,32 @@ async def enrich_track(_ctx: dict[str, Any], *, track_id: str) -> dict[str, Any]
base_url=settings.coverart_base_url,
)
async with session_scope() as session:
service = MetadataEnrichmentService(
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
albums=SqlAlchemyAlbumRepository(session),
storage=get_file_storage(),
tag_reader=MutagenTagReader(),
fingerprinter=FpcalcFingerprinter(settings.fpcalc_path),
acoustid=acoustid,
cover_extractor=MutagenCoverExtractor(),
cover_provider=cover_provider,
)
result = await service.enrich(uuid.UUID(track_id))
tid = uuid.UUID(track_id)
try:
async with session_scope() as session:
service = MetadataEnrichmentService(
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
albums=SqlAlchemyAlbumRepository(session),
storage=get_file_storage(),
tag_reader=MutagenTagReader(),
fingerprinter=FpcalcFingerprinter(settings.fpcalc_path),
acoustid=acoustid,
cover_extractor=MutagenCoverExtractor(),
cover_provider=cover_provider,
acoustid_trust_score=settings.acoustid_trust_score,
)
result = await service.enrich(tid)
except Exception as exc:
# The run's own transaction rolled back, leaving the track stuck at
# ``pending``. Record the failure in a fresh session so the UI shows a
# ``failed`` status with a reason instead of a silent, endless spinner.
log.exception("enrich_failed", track_id=track_id)
async with session_scope() as session:
await SqlAlchemyTrackRepository(session).mark_enrichment_failed(
tid, error=f"Enrichment crashed: {type(exc).__name__}: {exc}"
)
return {"track_id": track_id, "status": "failed", "mbid": None}
return {
"track_id": str(result.track_id),
+79
View File
@@ -3,10 +3,28 @@
The ASGI app is driven in-process via httpx + asgi-lifespan (no network, no
running server). DB/Redis-backed integration fixtures arrive with the data
layer (plan §11 step 2).
DB safety
---------
Integration fixtures call ``Base.metadata.drop_all`` / ``create_all`` on
``get_engine()``. That engine is built from ``DATABASE_URL``, which in normal
runs points at the *developer's* database — ``localhost:5432/mcma`` for host
``pytest`` and ``db:5432/mcma`` for ``make test-api`` (which execs ``pytest``
inside the api container). Running the suite there silently wipes real data:
``drop_all`` removes every ORM table while leaving Alembic's ``alembic_version``
(it lives outside ``Base.metadata``) — the exact "tables keep disappearing,
version survives" symptom.
To make that impossible, this module redirects every test to a dedicated
``<db>_test`` database *before settings/engine load*, and creates it on demand.
The real dev database is never opened by the test suite.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit
import pytest
from asgi_lifespan import LifespanManager
@@ -17,6 +35,67 @@ os.environ.setdefault("ENVIRONMENT", "test")
os.environ.setdefault("JWT_SECRET", "test-secret")
def _base_database_url() -> str:
"""Resolve the DB URL the app *would* use, mirroring pydantic-settings
precedence: real env var → ``.env`` file → the app's compiled-in default."""
if env := os.environ.get("DATABASE_URL"):
return env
dotenv = Path(__file__).resolve().parents[1] / ".env"
if dotenv.exists():
for raw in dotenv.read_text().splitlines():
line = raw.strip()
if line.startswith("DATABASE_URL=") and not line.startswith("#"):
return line.split("=", 1)[1].strip().strip("\"'")
return "postgresql+asyncpg://mcma:mcma@localhost:5432/mcma"
def _with_database(url: str, name: str) -> str:
"""Return ``url`` with its database name swapped for ``name``."""
return urlunsplit(urlsplit(url)._replace(path=f"/{name}"))
_BASE_DB_URL = _base_database_url()
_BASE_DB_NAME = urlsplit(_BASE_DB_URL).path.lstrip("/")
# Idempotent: if we're already pointed at a *_test DB, keep it as-is.
_TEST_DB_NAME = _BASE_DB_NAME if _BASE_DB_NAME.endswith("_test") else f"{_BASE_DB_NAME}_test"
_TEST_DB_URL = _with_database(_BASE_DB_URL, _TEST_DB_NAME)
# Redirect the whole suite to the test DB before anything reads settings.
os.environ["DATABASE_URL"] = _TEST_DB_URL
async def _create_test_db_if_missing() -> None:
"""Create ``<db>_test`` if the server is reachable. Best-effort: if Postgres
is down the integration fixtures skip on their own reachability probe, so a
failure here must stay silent rather than break unit-only runs."""
import asyncpg # type: ignore[import-untyped] # driver behind postgresql+asyncpg
# asyncpg wants a plain libpq DSN (no SQLAlchemy "+asyncpg" suffix), against
# the always-present ``postgres`` maintenance database.
dsn = _with_database(_TEST_DB_URL, "postgres").replace("+asyncpg", "")
try:
async with asyncio.timeout(5):
conn = await asyncpg.connect(dsn)
except Exception:
return
try:
exists = await conn.fetchval(
"SELECT 1 FROM pg_database WHERE datname = $1", _TEST_DB_NAME
)
if not exists:
# CREATE DATABASE can't run inside a transaction; asyncpg's implicit
# autocommit on a bare connection handles that.
await conn.execute(f'CREATE DATABASE "{_TEST_DB_NAME}"')
finally:
await conn.close()
@pytest.fixture(scope="session", autouse=True)
def _ensure_test_database() -> None:
"""Guarantee the dedicated test database exists once per session."""
asyncio.run(_create_test_db_if_missing())
@pytest.fixture
async def client() -> AsyncIterator[AsyncClient]:
from app.main import create_app
+20
View File
@@ -0,0 +1,20 @@
# Test fixtures
## `scarlet_fire_otis_mcdonald.mp3`
"Scarlet Fire" by **Otis McDonald** — a royalty-free / license-free track
(YouTube Audio Library; distributed via Pro-Sound.org). Used as a real-world
audio fixture for the enrichment pipeline.
What makes it a good fixture: its **embedded ID3 tags are junk**
(`title=Sound_13958`, `artist=Music Track`, `album=Музыка`, `genre=Hip Hop & Rap`)
while AcoustID identifies it with very high confidence as *Scarlet Fire /
Otis McDonald*. So it exercises both:
- the offline tag reader (deterministic, always runs), and
- the "trust a high-confidence AcoustID match over junk tags" path
(`acoustid_trust_score`), which only runs when `fpcalc` + an AcoustID API key
+ network are available — see `tests/test_real_file_enrichment.py`.
Because it's license-free, it may also seed a built-in demo track for fresh
instances.
Binary file not shown.
+4
View File
@@ -52,6 +52,8 @@ class FakeTrackRepo:
genre=None,
year=None,
metadata_status=str(kw["metadata_status"]),
metadata_error=None,
enriched_at=None,
created_at=now,
updated_at=now,
)
@@ -133,6 +135,8 @@ async def test_dedup_skips_already_imported() -> None:
genre=None,
year=None,
metadata_status="pending",
metadata_error=None,
enriched_at=None,
created_at=now,
updated_at=now,
)
+80 -10
View File
@@ -39,6 +39,8 @@ def _track(*, metadata_status: str = "pending", title: str = "raw-stem") -> Trac
genre=None,
year=None,
metadata_status=metadata_status,
metadata_error=None,
enriched_at=None,
created_at=now,
updated_at=now,
)
@@ -251,6 +253,33 @@ async def test_nothing_found_marks_failed() -> None:
assert applied is not None
assert applied["artist_id"] == track.artist_id # fallback kept
assert applied["metadata_status"] == "failed"
# A failed run records a human-readable reason; here both id steps were
# available, so it's the generic "no match" message.
assert applied["metadata_error"] == "No metadata match found in tags or AcoustID."
async def test_failed_reason_names_unavailable_fingerprinter() -> None:
track = _track()
service, tracks, _, _, _ = _service(track=track, tags=None, fp=None, fp_available=False)
result = await service.enrich(track.id)
assert result.status == "failed"
applied = tracks.applied
assert applied is not None
assert "fingerprinting" in str(applied["metadata_error"])
async def test_successful_enrich_clears_error() -> None:
track = _track()
service, tracks, _, _, _ = _service(track=track, tags=AudioTags(artist="Pink Floyd"))
result = await service.enrich(track.id)
assert result.status == "enriched"
applied = tracks.applied
assert applied is not None
assert applied["metadata_error"] is None
async def test_acoustid_path_fills_when_tags_absent() -> None:
@@ -281,13 +310,14 @@ async def test_acoustid_path_fills_when_tags_absent() -> None:
assert "Daft Punk" in artists.created
async def test_tags_win_over_acoustid_for_overlapping_fields() -> None:
async def test_tags_win_over_low_confidence_acoustid() -> None:
track = _track()
fp = Fingerprint(fingerprint="AQAA", duration_seconds=200)
tags = AudioTags(title="Tagged Title", artist="Tagged Artist")
# Below the 0.85 trust threshold → keep tag-first.
match = RecordingMatch(
acoustid="aid",
score=0.9,
score=0.5,
recording_mbid="mbid",
title="AcoustID Title",
artist="AcoustID Artist",
@@ -306,6 +336,36 @@ async def test_tags_win_over_acoustid_for_overlapping_fields() -> None:
assert applied["musicbrainz_id"] == "mbid"
async def test_high_confidence_acoustid_wins_over_junk_tags() -> None:
track = _track()
fp = Fingerprint(fingerprint="AQAA", duration_seconds=200)
# The real-world bug: junk embedded tags on a downloaded file vs a near-
# certain acoustic identification. The match must win for the identity.
tags = AudioTags(title="Sound_13958", artist="Music Track", album="Музыка")
match = RecordingMatch(
acoustid="aid",
score=0.98,
recording_mbid="mbid",
release_group_mbid="rg",
title="Scarlet Fire",
artist="Otis McDonald",
album="Scarlet Fire",
)
service, tracks, artists, albums, _acoustid = _service(
track=track, tags=tags, fp=fp, match=match
)
await service.enrich(track.id)
applied = tracks.applied
assert applied is not None
assert applied["title"] == "Scarlet Fire"
assert "Otis McDonald" in artists.created
assert "Music Track" not in artists.created
assert albums.created and albums.created[0][0] == "Scarlet Fire"
assert applied["metadata_status"] == "enriched"
async def test_fingerprint_skipped_when_acoustid_unavailable() -> None:
track = _track()
fp = Fingerprint(fingerprint="AQAA", duration_seconds=200)
@@ -356,8 +416,10 @@ async def test_cover_extracted_from_embedded_art() -> None:
extractor = FakeCoverExtractor(_PNG)
provider = FakeCoverProvider(_JPG)
service, albums, storage = _cover_service(
track=track, tags=AudioTags(album="The Wall", artist="PF"),
extractor=extractor, provider=provider,
track=track,
tags=AudioTags(album="The Wall", artist="PF"),
extractor=extractor,
provider=provider,
)
await service.enrich(track.id)
@@ -377,8 +439,12 @@ async def test_cover_falls_back_to_archive() -> None:
match = RecordingMatch(acoustid="ac", score=1.0, release_group_mbid="rg-123", album="The Wall")
fp = Fingerprint(fingerprint="AQAA", duration_seconds=200)
service, albums, storage = _cover_service(
track=track, tags=AudioTags(album="The Wall", artist="PF"),
match=match, fp=fp, extractor=extractor, provider=provider,
track=track,
tags=AudioTags(album="The Wall", artist="PF"),
match=match,
fp=fp,
extractor=extractor,
provider=provider,
)
await service.enrich(track.id)
@@ -394,8 +460,10 @@ async def test_cover_not_fetched_without_release_group() -> None:
track = _track()
provider = FakeCoverProvider(_JPG)
service, albums, _ = _cover_service(
track=track, tags=AudioTags(album="The Wall", artist="PF"),
extractor=FakeCoverExtractor(None), provider=provider,
track=track,
tags=AudioTags(album="The Wall", artist="PF"),
extractor=FakeCoverExtractor(None),
provider=provider,
)
await service.enrich(track.id)
@@ -408,8 +476,10 @@ async def test_existing_cover_is_not_overwritten() -> None:
track = _track()
extractor = FakeCoverExtractor(_PNG)
service, albums, storage = _cover_service(
track=track, tags=AudioTags(album="The Wall", artist="PF"),
extractor=extractor, existing_cover="covers/old.jpg",
track=track,
tags=AudioTags(album="The Wall", artist="PF"),
extractor=extractor,
existing_cover="covers/old.jpg",
)
await service.enrich(track.id)
+206
View File
@@ -0,0 +1,206 @@
"""Enrichment tests against a real audio file (``tests/fixtures/``).
The fixture "Scarlet Fire" by Otis McDonald carries *junk* embedded tags
(``Sound_13958`` / ``Music Track`` / ``Музыка``) yet is identified by AcoustID
with ~0.99 confidence. That makes it the real-world reproduction of the
"uploaded a track, got the wrong name/artist" bug: tag reading must be exact,
and a high-confidence AcoustID match must override the junk tags.
Two layers:
- The tag-reader test is offline and deterministic — it always runs.
- The AcoustID/identity tests need the ``fpcalc`` binary, an AcoustID API key,
and network. They *skip* (never fail) when those aren't present, honouring the
project rule that the suite never hard-requires network. They do run inside the
api/worker container (``make test-api``), which ships fpcalc + the key.
"""
import uuid
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
import pytest
from app.application.metadata_service import MetadataEnrichmentService
from app.core.config import get_settings
from app.domain.entities.album import Album
from app.domain.entities.track import Artist, Track
from app.infrastructure.metadata.acoustid import AcoustIdHttpClient
from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
from app.infrastructure.metadata.tags import MutagenTagReader
pytestmark = pytest.mark.asyncio
FIXTURE = Path(__file__).parent / "fixtures" / "scarlet_fire_otis_mcdonald.mp3"
_settings = get_settings()
_fpcalc = FpcalcFingerprinter(_settings.fpcalc_path)
# Gate for the network/identity tests — present in the container, absent in CI.
requires_acoustid = pytest.mark.skipif(
not (_fpcalc.is_available() and _settings.acoustid_api_key is not None),
reason="needs the fpcalc binary + ACOUSTID_API_KEY (+ network)",
)
def _acoustid_client() -> AcoustIdHttpClient:
key = _settings.acoustid_api_key
return AcoustIdHttpClient(
api_key=key.get_secret_value() if key else None,
user_agent=_settings.musicbrainz_user_agent,
api_url=_settings.acoustid_api_url,
)
# --- offline: tag reading on a real file -----------------------------------
async def test_real_file_embedded_tags_are_read() -> None:
"""The reader extracts the file's actual (junk) embedded tags verbatim —
proving real-file tag parsing works end to end, no network involved."""
assert FIXTURE.exists(), "fixture mp3 missing"
tags = await MutagenTagReader().read(FIXTURE)
assert tags is not None
assert tags.title == "Sound_13958"
assert tags.artist == "Music Track"
assert tags.album == "Музыка"
assert tags.genre == "Hip Hop & Rap"
assert tags.year == 2018
assert tags.duration_seconds == 143
assert tags.bitrate == 128
# --- networked: AcoustID identifies the real recording ---------------------
@requires_acoustid
async def test_real_file_identified_by_acoustid() -> None:
"""fpcalc → AcoustID identifies the real audio as Scarlet Fire / Otis
McDonald with high confidence (despite the junk tags)."""
fingerprint = await _fpcalc.calculate(FIXTURE)
if fingerprint is None:
pytest.skip("fpcalc produced no fingerprint")
match = await _acoustid_client().lookup(fingerprint)
if match is None:
pytest.skip("AcoustID returned no match (network/rate limit?)")
assert match.score >= _settings.acoustid_trust_score
assert match.title == "Scarlet Fire"
assert match.artist == "Otis McDonald"
assert match.recording_mbid is not None
@requires_acoustid
async def test_real_file_enrichment_overrides_junk_tags() -> None:
"""Full pipeline on the real file with the real tag-reader, fingerprinter
and AcoustID client: the high-confidence match wins over the junk embedded
tags, so the track is stored as Scarlet Fire / Otis McDonald."""
track = _pending_track()
tracks = _FakeTrackRepo(track)
artists = _FakeArtistRepo()
albums = _FakeAlbumRepo()
service = MetadataEnrichmentService(
tracks=tracks, # type: ignore[arg-type]
artists=artists, # type: ignore[arg-type]
albums=albums, # type: ignore[arg-type]
storage=_FixtureStorage(), # type: ignore[arg-type]
tag_reader=MutagenTagReader(),
fingerprinter=_fpcalc,
acoustid=_acoustid_client(),
acoustid_trust_score=_settings.acoustid_trust_score,
)
result = await service.enrich(track.id)
if result.status == "failed":
pytest.skip("AcoustID unavailable at run time (network/rate limit?)")
assert result.status == "enriched"
applied = tracks.applied
assert applied is not None
assert applied["title"] == "Scarlet Fire"
assert "Otis McDonald" in artists.created
assert "Music Track" not in artists.created
assert albums.created and albums.created[0][0] == "Scarlet Fire"
# --- minimal in-memory adapters --------------------------------------------
def _pending_track() -> Track:
now = datetime.now(UTC)
return Track(
id=uuid.uuid4(),
title="scarlet_fire_otis_mcdonald", # the upload-time filename stem
artist_id=uuid.uuid4(),
album_id=None,
storage_uri="tracks/sf/scarlet.mp3",
file_format="mp3",
file_size=FIXTURE.stat().st_size,
source="upload",
source_id="sha-real",
duration_seconds=None,
genre=None,
year=None,
metadata_status="pending",
metadata_error=None,
enriched_at=None,
created_at=now,
updated_at=now,
)
class _FixtureStorage:
@asynccontextmanager
async def as_local_path(self, _key: str) -> AsyncIterator[Path]:
yield FIXTURE
class _FakeTrackRepo:
def __init__(self, track: Track) -> None:
self._track = track
self.applied: dict[str, object] | None = None
async def get_by_id(self, _track_id: uuid.UUID) -> Track:
return self._track
async def apply_enrichment(self, _track_id: uuid.UUID, **kw: object) -> Track:
self.applied = kw
return self._track
@dataclass
class _FakeArtistRepo:
created: list[str] = field(default_factory=list)
async def get_or_create(self, name: str) -> Artist:
self.created.append(name)
now = datetime.now(UTC)
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
@dataclass
class _FakeAlbumRepo:
created: list[tuple[str, uuid.UUID]] = field(default_factory=list)
async def get_or_create(
self, *, title: str, artist_id: uuid.UUID, year: int | None, musicbrainz_id: str | None
) -> Album:
self.created.append((title, artist_id))
now = datetime.now(UTC)
return Album(
id=uuid.uuid4(),
title=title,
artist_id=artist_id,
year=year,
cover_path=None,
musicbrainz_id=musicbrainz_id,
created_at=now,
updated_at=now,
)
async def set_cover_path(self, _album_id: uuid.UUID, _cover_path: str) -> None:
return None