Compare commits

..

4 Commits

Author SHA1 Message Date
Senko-san 63c7d05eca feat(metadata): implement single-track metadata editor API (§A7/§1H)
Docker Build & Publish / Prune old image versions (push) Has been cancelled
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Adds inline AcoustID match-finding (multiple ranked candidates via
lookup_all) and PUT /tracks/{id}/metadata for manual edits, resolving
artist/album and setting metadata_status=manual. Extends TrackOut with
genre/year/track_number.
2026-06-13 14:34:43 +03:00
Senko-san 73d7da440f feat(enrichment): record status/errors and trust high-confidence AcoustID
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
Two related gaps surfaced from "uploaded a track, nothing changed / no status":

- A track could stay stuck on `pending` forever (an unexpected worker error
  rolled back the run without recording anything), and `failed` carried no
  reason. Add `tracks.metadata_error` + `tracks.enriched_at` (migration), stamp
  the outcome in apply_enrichment, add TrackRepository.mark_enrichment_failed,
  wrap enrich_task to persist crashes as `failed` in a fresh session, and emit a
  human-readable no-match reason. Expose metadata_error/enriched_at in TrackOut.

- The tag-first merge let junk embedded tags (e.g. "Music Track"/"Sound_13958")
  override even a 0.99-confidence AcoustID match. Add acoustid_trust_score
  (default 0.85): above it the acoustic identity wins for title/artist/album/
  year, tags are fallback; below it, tag-first as before.

Add a license-free real-file fixture (Scarlet Fire / Otis McDonald) whose junk
tags AcoustID overrides, with an always-on tag-reader test plus fpcalc/AcoustID/
network-gated identity + full-pipeline tests (skip on host, run in the container).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 13:29:08 +03:00
Senko-san 30cb8901f2 fix(tests): isolate suite to a dedicated *_test database
Integration fixtures call Base.metadata.drop_all/create_all on get_engine(),
whose DATABASE_URL points at the developer's real DB — localhost:5432/mcma for
host pytest, db:5432/mcma for `make test-api` (pytest runs inside the api
container). Every run silently wiped dev data: drop_all removes ORM tables but
leaves alembic_version (outside Base.metadata), the exact "tables keep
disappearing, version survives" symptom.

conftest now redirects the whole suite to a <db>_test database before settings
load and creates it on demand via asyncpg, so the dev DB is never opened.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 13:27:58 +03:00
Senko-san 0bb752f582 feat: cover-art pipeline (§1D)
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
Resolve, store and serve album cover art.

Sources (tag-first, mirroring enrichment): embedded artwork extracted
offline via mutagen (ID3 APIC / FLAC+OGG Picture / MP4 covr), then Cover
Art Archive by release-group MBID as a network fallback. Resolution runs
inside MetadataEnrichmentService after album resolution, only when the
album has no cover yet (idempotent, never overwrites), and is best-effort
so a cover failure never affects enrichment status.

- CoverArt value object + CoverArtExtractor/CoverArtProvider ports
- MutagenCoverExtractor + CoverArtArchiveClient adapters
- AcoustID parser now captures release_group_mbid
- Covers stored via FileStorage at covers/{album_id}.{ext} (local + S3)
- AlbumRepository.set_cover_path
- Serve real covers: GET /api/v1/albums|tracks/{id}/cover (StreamUser,
  ?token=), Subsonic getCoverArt (placeholder fallback)
- has_cover flag on AlbumOut/TrackOut
- coverart_enabled / coverart_base_url settings
- tests: cover resolution units + release_group parse + DB-backed
  test_cover_api.py (139 green via make test-api)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 12:10:05 +03:00
36 changed files with 1809 additions and 69 deletions
@@ -0,0 +1,39 @@
"""tracks: enrichment outcome (error reason + completion time)
Revision ID: 20260613_enrich_outcome
Revises: 20260608_subsonic_pw
Create Date: 2026-06-13 13:00:00.000000
Adds ``tracks.metadata_error`` and ``tracks.enriched_at`` so a finished
enrichment run records *why* it failed and *when* it completed. Lets the UI
distinguish a still-pending/running track from one that is done or failed, and
surface an actionable reason instead of a silent spinner (plan §6.2).
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# Alembic revision identifiers: this migration's id and the revision it follows.
revision: str = "20260613_enrich_outcome"
down_revision: str | None = "20260608_subsonic_pw"
# No branch label and no extra dependency are declared for this revision.
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
    """Add the two nullable enrichment-outcome columns to ``tracks``.

    ``metadata_error`` is a string of up to 2048 characters and ``enriched_at``
    is a timezone-aware timestamp; both are added in that order.
    """
    outcome_columns = (
        sa.Column("metadata_error", sa.String(length=2048), nullable=True),
        sa.Column("enriched_at", sa.DateTime(timezone=True), nullable=True),
    )
    for column in outcome_columns:
        op.add_column("tracks", column)
def downgrade() -> None:
    """Drop the enrichment-outcome columns from ``tracks`` in reverse order of
    their creation (``enriched_at`` first, then ``metadata_error``)."""
    for column_name in ("enriched_at", "metadata_error"):
        op.drop_column("tracks", column_name)
+57
View File
@@ -0,0 +1,57 @@
"""Shared cover-art serving helper (presentation).
Streams a stored cover image from the :class:`FileStorage` port. Used by the
native ``/api/v1`` cover endpoints and the Subsonic ``getCoverArt`` adapter so
the streaming/content-type logic lives in one place.
"""
import uuid
from fastapi.responses import StreamingResponse
from app.domain.entities.album import Album
from app.domain.errors import NotFoundError, StorageError
from app.domain.ports import AlbumRepository, FileStorage, TrackRepository
# Lowercase file extension (without the dot) -> image MIME type used as the
# response ``media_type``. Extensions not listed here are served as
# ``application/octet-stream`` by ``_content_type_for``.
_CONTENT_TYPE_BY_EXT: dict[str, str] = {
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"png": "image/png",
"webp": "image/webp",
"gif": "image/gif",
}
# Covers are immutable for a given album (a new cover means a new key), so let
# clients cache aggressively.
# NOTE(review): the value is ``public`` with a one-day max-age; confirm that
# shared caching is intended for responses served behind an authenticated route.
_CACHE_CONTROL = "public, max-age=86400"
def _content_type_for(key: str) -> str:
    """Return the MIME type for a storage key, chosen by its file extension.

    The extension is whatever follows the last ``.`` in ``key`` (compared
    case-insensitively). Keys without a dot, or with an extension that is not
    in ``_CONTENT_TYPE_BY_EXT``, yield ``application/octet-stream``.
    """
    _, separator, suffix = key.rpartition(".")
    extension = suffix.lower() if separator else ""
    return _CONTENT_TYPE_BY_EXT.get(extension, "application/octet-stream")
async def stream_cover(storage: FileStorage, cover_path: str) -> StreamingResponse:
    """Build a streaming response for the cover stored under ``cover_path``.

    The whole object is requested from ``storage`` (range start ``0``, no end).
    A ``StorageError`` while opening it is re-raised as ``NotFoundError`` so a
    dangling ``cover_path`` reads as "no cover". The response carries the
    object's total size as ``Content-Length`` and the shared cache policy.
    """
    try:
        body, size = await storage.open_range(cover_path, 0, None)
    except StorageError as exc:
        raise NotFoundError("Cover not found.") from exc

    response_headers = {
        "Content-Length": str(size),
        "Cache-Control": _CACHE_CONTROL,
    }
    return StreamingResponse(
        body,
        media_type=_content_type_for(cover_path),
        headers=response_headers,
    )
async def resolve_album_for_track(
    track_repo: TrackRepository,
    album_repo: AlbumRepository,
    track_id: uuid.UUID,
) -> Album | None:
    """Look up the album a track belongs to (the cover lives on the album).

    Returns ``None`` when the track does not exist or has no ``album_id``;
    otherwise returns whatever ``album_repo.get_by_id`` yields for that id.
    """
    track = await track_repo.get_by_id(track_id)
    owning_album_id = None if track is None else track.album_id
    if owning_album_id is None:
        return None
    return await album_repo.get_by_id(owning_album_id)
+30
View File
@@ -15,6 +15,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.auth_service import AuthService
from app.application.metadata_service import MetadataEnrichmentService
from app.application.streaming_service import StreamingService
from app.application.subsonic_auth_service import SubsonicAuthService
from app.application.upload_service import UploadService
@@ -35,6 +36,9 @@ from app.infrastructure.db.repositories import (
SqlAlchemyTrackRepository,
SqlAlchemyUserRepository,
)
from app.infrastructure.metadata.acoustid import AcoustIdHttpClient
from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
from app.infrastructure.metadata.tags import MutagenTagReader
from app.infrastructure.sources.registry import SourceRegistry, build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich
@@ -132,8 +136,34 @@ def get_streaming_service(session: SessionDep, storage: FileStorageDep) -> Strea
)
def get_metadata_service(
    session: SessionDep, storage: FileStorageDep
) -> MetadataEnrichmentService:
    """Assemble a :class:`MetadataEnrichmentService` for inline, read-only use
    (the metadata editor's "find matches" — §A7).

    Wires the tag reader, fingerprinter and AcoustID client from settings; no
    cover extractor/provider is passed here. The full pipeline (incl. cover
    art) stays in the worker (`tasks/enrich_task.py`).
    """
    settings = get_settings()

    # The API key is optional: without one the client is built with ``None``.
    secret = settings.acoustid_api_key
    api_key = secret.get_secret_value() if secret else None

    acoustid_client = AcoustIdHttpClient(
        api_key=api_key,
        user_agent=settings.musicbrainz_user_agent,
        api_url=settings.acoustid_api_url,
    )
    fingerprinter = FpcalcFingerprinter(settings.fpcalc_path)

    return MetadataEnrichmentService(
        tracks=SqlAlchemyTrackRepository(session),
        artists=SqlAlchemyArtistRepository(session),
        albums=SqlAlchemyAlbumRepository(session),
        storage=storage,
        tag_reader=MutagenTagReader(),
        fingerprinter=fingerprinter,
        acoustid=acoustid_client,
        acoustid_trust_score=settings.acoustid_trust_score,
    )
UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)]
StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)]
MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)]
# -- library repository deps ---------------------------------------------------
+32 -6
View File
@@ -13,9 +13,17 @@ from typing import Annotated
from fastapi import APIRouter, Header, Query
from fastapi.responses import Response, StreamingResponse
from app.api.deps import StreamingServiceDep, SubsonicUser, TrackRepoDep
from app.api.rest.ids import decode_track, parse
from app.domain.errors import NotFoundError
from app.api.covers import resolve_album_for_track, stream_cover
from app.api.deps import (
AlbumRepoDep,
FileStorageDep,
StreamingServiceDep,
SubsonicUser,
TrackRepoDep,
)
from app.api.rest.ids import IdKind, decode_track, parse
from app.domain.entities.album import Album
from app.domain.errors import NotFoundError, StorageError
router = APIRouter()
@@ -69,10 +77,28 @@ async def download(
@router.api_route("/getCoverArt.view", methods=["GET", "POST"])
async def get_cover_art(
    _user: SubsonicUser,
    album_repo: AlbumRepoDep,
    track_repo: TrackRepoDep,
    storage: FileStorageDep,
    id: Annotated[str, Query()],
    size: Annotated[int | None, Query()] = None,
) -> Response:
    """Subsonic ``getCoverArt``: stream an album's stored cover, or a placeholder.

    Cover ids reuse the entity id: ``al-<uuid>`` (album) or ``tr-<uuid>``
    (track → its album). Unlike the native API, Subsonic clients expect an
    image either way, so a missing cover falls back to a placeholder rather
    than 404. ``size`` is accepted but ignored (we serve the stored image).

    ``parse`` is called on ``id`` before anything else, so whatever it raises
    for a malformed id propagates to the caller.
    """
    kind, value = parse(id)

    album: Album | None
    if kind is IdKind.ALBUM:
        album = await album_repo.get_by_id(value)
    elif kind is IdKind.TRACK:
        album = await resolve_album_for_track(track_repo, album_repo, value)
    else:
        # Any other id kind has no cover of its own.
        album = None

    if album is not None and album.cover_path:
        try:
            return await stream_cover(storage, album.cover_path)
        except (NotFoundError, StorageError):
            # Parenthesised tuple: the bare ``except A, B:`` form only parses on
            # Python 3.14+. A dangling/unreadable cover degrades to the
            # placeholder below instead of failing the request.
            pass

    return Response(content=_PLACEHOLDER_PNG, media_type="image/png")
+1
View File
@@ -13,4 +13,5 @@ class AlbumOut(BaseModel):
artist_name: str
year: int | None
track_count: int
has_cover: bool
created_at: dt.datetime
+36
View File
@@ -16,8 +16,14 @@ class TrackOut(BaseModel):
duration_seconds: int | None
file_format: str
file_size: int
genre: str | None
year: int | None
track_number: int | None
metadata_status: str
metadata_error: str | None
enriched_at: dt.datetime | None
source: str
has_cover: bool
created_at: dt.datetime
@@ -25,3 +31,33 @@ class TrackUpdate(BaseModel):
title: str | None = None
genre: str | None = None
year: int | None = None
class MetadataMatch(BaseModel):
    """A single AcoustID candidate shown in the metadata editor's match picker (§A7)."""

    # AcoustID identifier of the candidate and its match score.
    acoustid: str
    score: float

    # MusicBrainz ids for the recording / release group, when known.
    recording_mbid: str | None
    release_group_mbid: str | None

    # Descriptive fields carried by the candidate; each may be absent.
    title: str | None
    artist: str | None
    album: str | None
    year: int | None
class MetadataMatchesOut(BaseModel):
    """Response envelope for the match-picker endpoint: the list of candidates."""

    items: list[MetadataMatch]
class MetadataApply(BaseModel):
    """Request body for ``PUT /tracks/{id}/metadata``: manual edits or an
    accepted match.

    Sets ``metadata_status = manual`` (never overwritten by auto-enrichment).
    Every field is optional and defaults to ``None``.
    """

    title: str | None = None
    # Artist and album are given by name/title rather than by id.
    artist_name: str | None = None
    album_title: str | None = None
    year: int | None = None
    genre: str | None = None
    track_number: int | None = None
+22 -3
View File
@@ -1,11 +1,19 @@
"""Album endpoints."""
import uuid
from typing import Any
from fastapi import APIRouter, Query
from fastapi.responses import StreamingResponse
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, TrackRepoDep
from app.api.covers import stream_cover
from app.api.deps import (
AlbumRepoDep,
ArtistRepoDep,
CurrentUser,
FileStorageDep,
StreamUser,
TrackRepoDep,
)
from app.api.schemas.album import AlbumOut
from app.api.schemas.pagination import PagedResponse
from app.api.schemas.track import TrackOut
@@ -30,6 +38,7 @@ async def _build_album_out(
artist_name=artists[a.artist_id].name if a.artist_id in artists else "Unknown Artist",
year=a.year,
track_count=track_counts.get(a.id, 0),
has_cover=bool(a.cover_path),
created_at=a.created_at,
)
for a in albums
@@ -109,4 +118,14 @@ async def get_album_tracks(
@router.get("/{album_id}/cover")
async def get_album_cover(
    album_id: uuid.UUID,
    album_repo: AlbumRepoDep,
    storage: FileStorageDep,
    _: StreamUser,
) -> StreamingResponse:
    """Stream the stored cover image of an album.

    ``<img>`` can't send a bearer header → StreamUser accepts ``?token=``.

    Raises ``NotFoundError`` when the album does not exist, has no
    ``cover_path``, or the stored object cannot be opened.
    """
    album = await album_repo.get_by_id(album_id)
    if album is None or not album.cover_path:
        raise NotFoundError("Cover not found.")
    return await stream_cover(storage, album.cover_path)
+114 -5
View File
@@ -4,10 +4,26 @@ import uuid
from typing import Any
from fastapi import APIRouter, Query, Response
from fastapi.responses import StreamingResponse
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, FileStorageDep, TrackRepoDep
from app.api.covers import resolve_album_for_track, stream_cover
from app.api.deps import (
AlbumRepoDep,
ArtistRepoDep,
CurrentUser,
FileStorageDep,
MetadataServiceDep,
StreamUser,
TrackRepoDep,
)
from app.api.schemas.pagination import PagedResponse
from app.api.schemas.track import TrackOut, TrackUpdate
from app.api.schemas.track import (
MetadataApply,
MetadataMatch,
MetadataMatchesOut,
TrackOut,
TrackUpdate,
)
from app.domain.entities.album import Album
from app.domain.entities.track import Artist, Track
from app.domain.errors import NotFoundError
@@ -32,8 +48,14 @@ async def _build_track_out(
duration_seconds=t.duration_seconds,
file_format=t.file_format,
file_size=t.file_size,
genre=t.genre,
year=t.year,
track_number=t.track_number,
metadata_status=t.metadata_status,
metadata_error=t.metadata_error,
enriched_at=t.enriched_at,
source=t.source,
has_cover=bool(t.album_id and albums.get(t.album_id) and albums[t.album_id].cover_path),
created_at=t.created_at,
)
for t in tracks
@@ -144,7 +166,19 @@ async def optimize_track(track_id: uuid.UUID, _: CurrentUser) -> Any: ...
@router.get("/{track_id}/cover")
async def get_track_cover(
    track_id: uuid.UUID,
    track_repo: TrackRepoDep,
    album_repo: AlbumRepoDep,
    storage: FileStorageDep,
    _: StreamUser,
) -> StreamingResponse:
    """Stream the cover for a track, which is the cover of its album.

    ``<img>`` can't send a bearer header → StreamUser accepts ``?token=``.

    Raises ``NotFoundError`` when the track has no resolvable album, the album
    has no ``cover_path``, or the stored object cannot be opened.
    """
    album = await resolve_album_for_track(track_repo, album_repo, track_id)
    if album is None or not album.cover_path:
        raise NotFoundError("Cover not found.")
    return await stream_cover(storage, album.cover_path)
@router.post("/{track_id}/metadata/enrich")
@@ -163,8 +197,83 @@ async def enrich_metadata(
@router.get("/{track_id}/metadata/matches")
async def get_metadata_matches(
    track_id: uuid.UUID,
    track_repo: TrackRepoDep,
    metadata_service: MetadataServiceDep,
    _: CurrentUser,
) -> MetadataMatchesOut:
    """AcoustID candidates for the metadata editor's match picker (§A7).

    Runs the fingerprint lookup inline (single track, user-triggered) and
    never mutates the track. Degrades to an empty list if fpcalc/AcoustID are
    unavailable or no match is found.

    Raises ``NotFoundError`` when the track does not exist.
    """
    track = await track_repo.get_by_id(track_id)
    if track is None:
        raise NotFoundError(f"Track {track_id} not found.")

    matches = await metadata_service.find_matches(track_id)
    # Copy each domain match field-by-field into the API schema.
    return MetadataMatchesOut(
        items=[
            MetadataMatch(
                acoustid=m.acoustid,
                score=m.score,
                recording_mbid=m.recording_mbid,
                release_group_mbid=m.release_group_mbid,
                title=m.title,
                artist=m.artist,
                album=m.album,
                year=m.year,
            )
            for m in matches
        ]
    )
@router.put("/{track_id}/metadata")
async def set_metadata(
    track_id: uuid.UUID,
    body: MetadataApply,
    track_repo: TrackRepoDep,
    artist_repo: ArtistRepoDep,
    album_repo: AlbumRepoDep,
    _: CurrentUser,
) -> TrackOut:
    """Apply manual edits or an accepted AcoustID match (§A7). Sets
    ``metadata_status = manual`` — never overwritten by auto-enrichment.

    A non-empty ``artist_name`` / ``album_title`` is resolved to an entity via
    ``get_or_create``; the album is attached to the newly resolved artist, or
    to the track's current artist when no artist name was sent.

    Raises ``NotFoundError`` when the track does not exist.
    """
    track = await track_repo.get_by_id(track_id)
    if track is None:
        raise NotFoundError(f"Track {track_id} not found.")

    artist_id: uuid.UUID | None = None
    if body.artist_name:
        artist = await artist_repo.get_or_create(body.artist_name)
        artist_id = artist.id

    album_id: uuid.UUID | None = None
    if body.album_title:
        album = await album_repo.get_or_create(
            title=body.album_title,
            artist_id=artist_id or track.artist_id,
            year=body.year,
            musicbrainz_id=None,
        )
        album_id = album.id

    # NOTE(review): no ``metadata_status`` is passed here — presumably
    # ``track_repo.update`` marks the track ``manual`` itself; confirm against
    # the repository implementation.
    track = await track_repo.update(
        track_id,
        title=body.title,
        genre=body.genre,
        year=body.year,
        artist_id=artist_id,
        album_id=album_id,
        track_number=body.track_number,
    )

    # Re-load the (possibly changed) artist/album so the response reflects them.
    artist_ids = [track.artist_id]
    album_ids = [track.album_id] if track.album_id else []
    artists = {a.id: a for a in await artist_repo.get_many(artist_ids)}
    albums = {a.id: a for a in await album_repo.get_many(album_ids)}
    items = await _build_track_out([track], artists, albums)
    return items[0]
+137 -11
View File
@@ -12,10 +12,14 @@ Invariants (plan §6.2, CLAUDE.md):
- **Idempotent** — re-running only fills gaps; ``apply_enrichment`` never erases.
"""
import tempfile
import uuid
from dataclasses import dataclass
from pathlib import Path
from app.core.logging import get_logger
from app.domain.entities.album import Album
from app.domain.entities.cover import CoverArt
from app.domain.entities.metadata import AudioTags, RecordingMatch
from app.domain.ports import (
AcoustIdClient,
@@ -23,6 +27,8 @@ from app.domain.ports import (
ArtistRepository,
AudioFingerprinter,
AudioTagReader,
CoverArtExtractor,
CoverArtProvider,
FileStorage,
TrackRepository,
)
@@ -50,6 +56,9 @@ class MetadataEnrichmentService:
tag_reader: AudioTagReader,
fingerprinter: AudioFingerprinter,
acoustid: AcoustIdClient,
cover_extractor: CoverArtExtractor | None = None,
cover_provider: CoverArtProvider | None = None,
acoustid_trust_score: float = 0.85,
) -> None:
self._tracks = tracks
self._artists = artists
@@ -58,6 +67,9 @@ class MetadataEnrichmentService:
self._tag_reader = tag_reader
self._fingerprinter = fingerprinter
self._acoustid = acoustid
self._cover_extractor = cover_extractor
self._cover_provider = cover_provider
self._acoustid_trust_score = acoustid_trust_score
async def enrich(self, track_id: uuid.UUID) -> EnrichmentResult:
track = await self._tracks.get_by_id(track_id)
@@ -71,16 +83,31 @@ class MetadataEnrichmentService:
tags = await self._read_local(track.storage_uri)
match = await self._identify(track.storage_uri)
# Merge sources: prefer embedded tags, fall back to the AcoustID match.
# ``title`` is guaranteed non-None by the existing track title; the rest
# stay None when neither source has them.
# Merge order is tag-first by default — embedded tags fix the common
# well-tagged offline case. But a *high-confidence* AcoustID match is the
# more trustworthy identity (downloaded files routinely carry junk tags
# like "Music Track"/"Sound_12345"), so above the trust threshold the
# acoustic match wins for the identity fields and tags become fallback.
tag_title = tags.title if tags else None
tag_artist = tags.artist if tags else None
tag_album = tags.album if tags else None
title = _opt_str(tag_title, match.title if match else None) or track.title
artist_name = _opt_str(tag_artist, match.artist if match else None)
album_title = _opt_str(tag_album, match.album if match else None)
year = _first_int(tags.year if tags else None, match.year if match else None)
match_title = match.title if match else None
match_artist = match.artist if match else None
match_album = match.album if match else None
match_year = match.year if match else None
tag_year = tags.year if tags else None
trust_match = match is not None and match.score >= self._acoustid_trust_score
if trust_match:
title = _opt_str(match_title, tag_title) or track.title
artist_name = _opt_str(match_artist, tag_artist)
album_title = _opt_str(match_album, tag_album)
year = _first_int(match_year, tag_year)
else:
title = _opt_str(tag_title, match_title) or track.title
artist_name = _opt_str(tag_artist, match_artist)
album_title = _opt_str(tag_album, match_album)
year = _first_int(tag_year, match_year)
genre = tags.genre if tags else None
track_number = tags.track_number if tags else None
duration = _first_int(
@@ -92,10 +119,21 @@ class MetadataEnrichmentService:
acoustid_id = match.acoustid if match else None
artist_id = await self._resolve_artist(artist_name, fallback=track.artist_id)
album_id = await self._resolve_album(album_title, artist_id=artist_id, year=year, mbid=mbid)
album = await self._resolve_album(album_title, artist_id=artist_id, year=year, mbid=mbid)
album_id = album.id if album is not None else None
if album is not None:
await self._resolve_cover(
album,
storage_uri=track.storage_uri,
release_group_mbid=match.release_group_mbid if match else None,
)
identified = bool(artist_name) or album_id is not None or mbid is not None
status = "enriched" if identified else "failed"
# On a clean "no identity" outcome, record *why* so the UI shows a reason
# rather than a bare "failed". A successful run clears any prior error.
metadata_error = None if identified else self._no_match_reason()
await self._tracks.apply_enrichment(
track_id,
@@ -110,10 +148,43 @@ class MetadataEnrichmentService:
acoustid_fingerprint=acoustid_id,
musicbrainz_id=mbid,
metadata_status=status,
metadata_error=metadata_error,
)
log.info("enrich_complete", track_id=str(track_id), status=status, mbid=mbid)
return EnrichmentResult(track_id=track_id, status=status, matched_mbid=mbid)
def _no_match_reason(self) -> str:
"""Explain a ``failed`` (no-identity) run in terms a user can act on:
which optional identification step was unavailable, if any."""
if not self._fingerprinter.is_available():
return "No metadata match: audio fingerprinting (fpcalc) is unavailable."
if not self._acoustid.is_available():
return "No metadata match: AcoustID lookup is unavailable (no API key)."
return "No metadata match found in tags or AcoustID."
async def find_matches(self, track_id: uuid.UUID) -> list[RecordingMatch]:
    """Return AcoustID candidates for the metadata editor's match picker (§A7).

    Read-only: only looks the track up, fingerprints its file and queries
    AcoustID — nothing is written back. Runs inline (single track,
    user-triggered) rather than via the worker.

    Yields ``[]`` when the track is missing, when AcoustID or the
    fingerprinter reports itself unavailable, when no fingerprint could be
    computed, or when any step raises (the failure is logged as a warning).
    """
    track = await self._tracks.get_by_id(track_id)
    if track is None:
        return []

    # Both optional steps must be usable; AcoustID is checked first.
    if not (self._acoustid.is_available() and self._fingerprinter.is_available()):
        return []

    try:
        async with self._storage.as_local_path(track.storage_uri) as local_path:
            fingerprint = await self._fingerprinter.calculate(local_path)
            if fingerprint is not None:
                return await self._acoustid.lookup_all(fingerprint)
            return []
    except Exception:
        log.warning("find_matches_failed", track_id=str(track_id))
        return []
async def _read_local(self, storage_uri: str) -> AudioTags | None:
try:
async with self._storage.as_local_path(storage_uri) as path:
@@ -148,16 +219,71 @@ class MetadataEnrichmentService:
artist_id: uuid.UUID,
year: int | None,
mbid: str | None,
) -> uuid.UUID | None:
) -> Album | None:
if not title:
return None
album = await self._albums.get_or_create(
return await self._albums.get_or_create(
title=title,
artist_id=artist_id,
year=year,
musicbrainz_id=mbid,
)
return album.id
async def _resolve_cover(
    self,
    album: Album,
    *,
    storage_uri: str,
    release_group_mbid: str | None,
) -> None:
    """Give an album a cover if it does not have one yet.

    Sources are tried in order: artwork embedded in the track's file, then a
    fetch by release-group id. An album that already has ``cover_path`` is left
    untouched. Saving is best-effort: a failure while storing the image or
    recording its path is logged and swallowed, so it never propagates to the
    enrichment run.
    """
    if album.cover_path:
        return  # already has one — never overwrite (idempotent)

    artwork = await self._extract_cover(storage_uri)
    if artwork is None:
        artwork = await self._fetch_cover(release_group_mbid)

    if artwork is not None:
        try:
            storage_key = await self._save_cover(album.id, artwork)
            await self._albums.set_cover_path(album.id, storage_key)
            log.info(
                "cover_resolved",
                album_id=str(album.id),
                content_type=artwork.content_type,
            )
        except Exception:
            log.warning("cover_save_failed", album_id=str(album.id))
async def _extract_cover(self, storage_uri: str) -> CoverArt | None:
    """Try to pull embedded artwork out of the stored audio file.

    Returns ``None`` when no extractor is configured, when the extractor finds
    nothing, or when materialising/reading the file raises (logged as a
    warning).
    """
    extractor = self._cover_extractor
    if extractor is None:
        return None

    try:
        async with self._storage.as_local_path(storage_uri) as local_path:
            return await extractor.extract(local_path)
    except Exception:
        log.warning("cover_extract_step_failed", storage_uri=storage_uri)
    return None
async def _fetch_cover(self, release_group_mbid: str | None) -> CoverArt | None:
    """Fetch a cover from the external provider by release-group id.

    Returns ``None`` without calling the provider when none is configured, no
    release-group id is known, or the provider reports itself unavailable.
    A raised error is logged as a warning and also yields ``None``.
    """
    provider = self._cover_provider
    if provider is None or not release_group_mbid or not provider.is_available():
        return None

    try:
        return await provider.fetch_release_group(release_group_mbid)
    except Exception:
        log.warning("cover_fetch_step_failed", release_group=release_group_mbid)
    return None
async def _save_cover(self, album_id: uuid.UUID, cover: CoverArt) -> str:
    """Store the cover bytes under ``covers/{album_id}.{ext}`` and return that key.

    The bytes are written to a named temporary file (flushed before the hand-off)
    and passed to the storage port by path; the temporary file is removed when
    the ``with`` block exits.
    """
    ext = cover.extension
    storage_key = f"covers/{album_id}.{ext}"
    with tempfile.NamedTemporaryFile(suffix=f".{ext}") as scratch:
        scratch.write(cover.data)
        scratch.flush()
        await self._storage.save_file(storage_key, Path(scratch.name))
    return storage_key
def _opt_str(*values: str | None) -> str | None:
+9
View File
@@ -90,6 +90,10 @@ class Settings(BaseSettings):
ml_service_url: str | None = None
acoustid_api_key: SecretStr | None = None
acoustid_api_url: str = "https://api.acoustid.org/v2/lookup"
# Above this AcoustID match score, trust the acoustic identification over
# embedded file tags (which are frequently junk on downloaded files —
# e.g. "Music Track" / "Sound_12345"). Below it, keep the tag-first merge.
acoustid_trust_score: float = 0.85
# MusicBrainz/AcoustID require a meaningful User-Agent identifying the
# application and a way to contact its maintainer (see
# https://musicbrainz.org/doc/XML_Web_Service/Rate_Limiting). Self-hosted
@@ -103,6 +107,11 @@ class Settings(BaseSettings):
# image installs it via libchromaprint-tools.
fpcalc_path: str = "fpcalc"
# Cover Art Archive — network fallback for album covers (after embedded art).
# Disable to keep enrichment fully offline; embedded artwork still works.
coverart_enabled: bool = True
coverart_base_url: str = "https://coverartarchive.org"
@field_validator("database_url")
@classmethod
def _require_async_driver(cls, v: str) -> str:
+2
View File
@@ -1,6 +1,7 @@
"""Domain entities and value objects — pure, framework-free."""
from app.domain.entities.album import Album
from app.domain.entities.cover import CoverArt
from app.domain.entities.history import PlayHistoryEntry
from app.domain.entities.like import Like
from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch
@@ -13,6 +14,7 @@ __all__ = [
"Album",
"Artist",
"AudioTags",
"CoverArt",
"Credentials",
"Fingerprint",
"Like",
+28
View File
@@ -0,0 +1,28 @@
"""Cover-art value object — raw image bytes plus their MIME type.
Crosses the domain boundary between the cover sources (embedded extractor,
Cover Art Archive) and the storage/serving layers. The bytes are the encoded
image as-is; we never decode/resize in Phase 1.
"""
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class CoverArt:
data: bytes
content_type: str # "image/jpeg" | "image/png" | …
@property
def extension(self) -> str:
"""File extension for the content type (no leading dot)."""
return _EXT_BY_TYPE.get(self.content_type.lower(), "jpg")
_EXT_BY_TYPE: dict[str, str] = {
"image/jpeg": "jpg",
"image/jpg": "jpg",
"image/png": "png",
"image/webp": "webp",
"image/gif": "gif",
}
+1
View File
@@ -47,6 +47,7 @@ class RecordingMatch:
acoustid: str
score: float
recording_mbid: str | None = None
release_group_mbid: str | None = None
title: str | None = None
artist: str | None = None
album: str | None = None
+3
View File
@@ -27,6 +27,9 @@ class Track:
duration_seconds: int | None
genre: str | None
year: int | None
track_number: int | None
metadata_status: str
metadata_error: str | None
enriched_at: dt.datetime | None
created_at: dt.datetime
updated_at: dt.datetime
+31 -3
View File
@@ -15,6 +15,7 @@ from typing import Protocol
from app.domain.entities import (
Album,
AudioTags,
CoverArt,
Credentials,
Fingerprint,
Like,
@@ -171,11 +172,18 @@ class TrackRepository(Protocol):
acoustid_fingerprint: str | None,
musicbrainz_id: str | None,
metadata_status: str,
metadata_error: str | None = None,
) -> Track:
"""Persist auto-enrichment results. Nullable fields are filled only when
a non-``None`` value is supplied (re-enrich never erases prior data);
``title``/``artist_id``/``metadata_status`` are always written. Callers
must not invoke this for ``metadata_status == 'manual'`` tracks."""
``title``/``artist_id``/``metadata_status`` are always written, and the
run's outcome (``metadata_error`` + completion time) is always stamped.
Callers must not invoke this for ``metadata_status == 'manual'`` tracks."""
...
async def mark_enrichment_failed(self, track_id: uuid.UUID, *, error: str) -> None:
"""Record that an enrichment run crashed unexpectedly: set ``failed`` +
the error reason. A no-op for ``manual`` or missing tracks."""
...
@@ -188,6 +196,7 @@ class AlbumRepository(Protocol):
year: int | None,
musicbrainz_id: str | None,
) -> Album: ...
async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None: ...
async def get_by_id(self, album_id: uuid.UUID) -> Album | None: ...
async def get_many(self, ids: list[uuid.UUID]) -> list[Album]: ...
async def count(self, *, artist_id: uuid.UUID | None, q: str | None) -> int: ...
@@ -293,7 +302,26 @@ class AudioFingerprinter(Protocol):
class AcoustIdClient(Protocol):
    """AcoustID lookup. ``is_available`` is False without an API key (the whole
    fingerprint path is then skipped). ``lookup`` returns the best match or
    ``None`` (no result / service down), never raising. ``lookup_all`` returns
    the same candidates ranked by confidence (``[]`` on no result / unavailable
    / error), for the metadata editor's match picker."""

    def is_available(self) -> bool: ...

    async def lookup(self, fingerprint: Fingerprint) -> RecordingMatch | None: ...

    async def lookup_all(self, fingerprint: Fingerprint) -> list[RecordingMatch]: ...
class CoverArtExtractor(Protocol):
    """Port for reading embedded cover art from a local audio file (offline,
    no network).

    ``extract`` yields ``None`` when the file carries no picture or cannot be
    parsed; implementations are expected never to raise.
    """

    async def extract(self, path: Path) -> CoverArt | None: ...
class CoverArtProvider(Protocol):
    """Port for fetching cover art from an external service (Cover Art Archive).

    Artwork is looked up by MusicBrainz release-group id. ``is_available``
    lets the provider be gated off. ``fetch_release_group`` never raises:
    "not found" and "service down" both come back as ``None``.
    """

    def is_available(self) -> bool: ...

    async def fetch_release_group(self, release_group_mbid: str) -> CoverArt | None: ...
+12 -1
View File
@@ -6,9 +6,10 @@
imports/downloads stay idempotent (plan §4, §6.1).
"""
import datetime as dt
import uuid
from sqlalchemy import ForeignKey, Integer, String, UniqueConstraint
from sqlalchemy import DateTime, ForeignKey, Integer, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
@@ -63,6 +64,16 @@ class TrackModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
nullable=False,
default=MetadataStatus.PENDING.value,
)
# Human-readable reason the last enrichment run set ``failed`` (no match, or
# an unexpected worker error). ``None`` once a run succeeds. Surfaced in the
# UI so a stuck/failed track is diagnosable, not silent.
metadata_error: Mapped[str | None] = mapped_column(String(2048), nullable=True)
# When the last enrichment run finished (success or failure). ``None`` while
# still ``pending`` — lets the UI distinguish "queued/running" from "done".
enriched_at: Mapped[dt.datetime | None] = mapped_column(
DateTime(timezone=True),
nullable=True,
)
added_by: Mapped[uuid.UUID | None] = mapped_column(
ForeignKey("users.id", ondelete="SET NULL"),
@@ -63,6 +63,12 @@ class SqlAlchemyAlbumRepository:
await self._session.refresh(row)
return _to_entity(row)
async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None:
    """Store ``cover_path`` on the album row and flush; do nothing if the album is missing."""
    album_row = await self._session.get(AlbumModel, album_id)
    if album_row is None:
        return
    album_row.cover_path = cover_path
    await self._session.flush()
async def get_by_id(self, album_id: uuid.UUID) -> Album | None:
    """Load one album by primary key and map it to an entity; ``None`` if no row exists."""
    found = await self._session.get(AlbumModel, album_id)
    if found is None:
        return None
    return _to_entity(found)
@@ -38,7 +38,10 @@ def _track_to_entity(row: TrackModel) -> Track:
duration_seconds=row.duration_seconds,
genre=row.genre,
year=row.year,
track_number=row.track_number,
metadata_status=row.metadata_status,
metadata_error=row.metadata_error,
enriched_at=row.enriched_at,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -37,7 +37,10 @@ def _track_to_entity(row: TrackModel) -> Track:
duration_seconds=row.duration_seconds,
genre=row.genre,
year=row.year,
track_number=row.track_number,
metadata_status=row.metadata_status,
metadata_error=row.metadata_error,
enriched_at=row.enriched_at,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -1,5 +1,6 @@
"""Track repository — adapter over ``AsyncSession``."""
import datetime as dt
import uuid
from sqlalchemy import func, select
@@ -25,7 +26,10 @@ def _to_entity(row: TrackModel) -> Track:
duration_seconds=row.duration_seconds,
genre=row.genre,
year=row.year,
track_number=row.track_number,
metadata_status=row.metadata_status,
metadata_error=row.metadata_error,
enriched_at=row.enriched_at,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -159,6 +163,9 @@ class SqlAlchemyTrackRepository:
title: str | None,
genre: str | None,
year: int | None,
artist_id: uuid.UUID | None = None,
album_id: uuid.UUID | None = None,
track_number: int | None = None,
) -> Track:
row = await self._session.get(TrackModel, track_id)
if row is None:
@@ -169,6 +176,12 @@ class SqlAlchemyTrackRepository:
row.genre = genre
if year is not None:
row.year = year
if artist_id is not None:
row.artist_id = artist_id
if album_id is not None:
row.album_id = album_id
if track_number is not None:
row.track_number = track_number
row.metadata_status = "manual"
await self._session.flush()
await self._session.refresh(row)
@@ -189,6 +202,7 @@ class SqlAlchemyTrackRepository:
acoustid_fingerprint: str | None,
musicbrainz_id: str | None,
metadata_status: str,
metadata_error: str | None = None,
) -> Track:
row = await self._session.get(TrackModel, track_id)
if row is None:
@@ -197,6 +211,10 @@ class SqlAlchemyTrackRepository:
row.title = title
row.artist_id = artist_id
row.metadata_status = metadata_status
# A finished run always stamps outcome: clear/set the reason and mark the
# completion time so the UI can tell "still pending" from "done/failed".
row.metadata_error = metadata_error
row.enriched_at = dt.datetime.now(dt.UTC)
# Nullable extras: fill gaps only — never erase data a prior run found.
if album_id is not None:
row.album_id = album_id
@@ -217,3 +235,16 @@ class SqlAlchemyTrackRepository:
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def mark_enrichment_failed(self, track_id: uuid.UUID, *, error: str) -> None:
"""Record that an enrichment run crashed (unexpected exception). Runs in
its own session so the failure is persisted even though the run's own
transaction rolled back. Never overwrites ``manual`` (a no-op then), and
a missing track is a clean no-op."""
row = await self._session.get(TrackModel, track_id)
if row is None or row.metadata_status == "manual":
return
row.metadata_status = "failed"
row.metadata_error = error
row.enriched_at = dt.datetime.now(dt.UTC)
await self._session.flush()
+46 -14
View File
@@ -46,6 +46,18 @@ class AcoustIdHttpClient:
return bool(self._api_key)
async def lookup(self, fingerprint: Fingerprint) -> RecordingMatch | None:
    """Return the best match for ``fingerprint``, or ``None`` when no payload could be fetched."""
    raw = await self._lookup_raw(fingerprint)
    return None if raw is None else _parse_best_match(raw)
async def lookup_all(self, fingerprint: Fingerprint) -> list[RecordingMatch]:
    """Return every parsed candidate for ``fingerprint``; ``[]`` when no payload could be fetched."""
    raw = await self._lookup_raw(fingerprint)
    return [] if raw is None else _parse_matches(raw)
async def _lookup_raw(self, fingerprint: Fingerprint) -> object | None:
if not self._api_key:
return None
try:
@@ -65,13 +77,11 @@ class AcoustIdHttpClient:
},
)
resp.raise_for_status()
payload = resp.json()
except (httpx.HTTPError, ValueError):
return resp.json() # type: ignore[no-any-return]
except httpx.HTTPError, ValueError:
log.warning("acoustid_lookup_failed")
return None
return _parse_best_match(payload)
@classmethod
async def _throttle(cls) -> None:
async with cls._throttle_lock:
@@ -82,29 +92,47 @@ class AcoustIdHttpClient:
cls._last_call_monotonic = time.monotonic()
_MAX_MATCHES = 5
def _parse_best_match(payload: object) -> RecordingMatch | None:
    """Return the first match ``_parse_matches`` yields for ``payload``, or ``None`` if it yields none."""
    ranked = _parse_matches(payload)
    if not ranked:
        return None
    return ranked[0]
def _parse_matches(payload: object) -> list[RecordingMatch]:
if not isinstance(payload, dict) or payload.get("status") != "ok":
return None
return []
results = payload.get("results")
if not isinstance(results, list) or not results:
return None
return []
# Results are returned best-score-first; take the top scoring one.
best = max(results, key=lambda r: r.get("score", 0.0) if isinstance(r, dict) else 0.0)
if not isinstance(best, dict):
return None
# Results are returned best-score-first, but sort defensively and cap the
# number of candidates surfaced to the editor.
candidates = [r for r in results if isinstance(r, dict)]
candidates.sort(key=lambda r: r.get("score", 0.0), reverse=True)
acoustid = best.get("id")
matches: list[RecordingMatch] = []
for result in candidates[:_MAX_MATCHES]:
match = _parse_one(result)
if match is not None:
matches.append(match)
return matches
def _parse_one(result: dict[str, object]) -> RecordingMatch | None:
acoustid = result.get("id")
if not isinstance(acoustid, str):
return None
score = float(best.get("score", 0.0))
score = float(result.get("score", 0.0)) # type: ignore[arg-type]
recording_mbid: str | None = None
release_group_mbid: str | None = None
title: str | None = None
artist: str | None = None
album: str | None = None
recordings = best.get("recordings")
recordings = result.get("recordings")
if isinstance(recordings, list) and recordings and isinstance(recordings[0], dict):
rec = recordings[0]
recording_mbid = rec.get("id") if isinstance(rec.get("id"), str) else None
@@ -115,13 +143,17 @@ def _parse_best_match(payload: object) -> RecordingMatch | None:
artist = name if isinstance(name, str) else None
groups = rec.get("releasegroups")
if isinstance(groups, list) and groups and isinstance(groups[0], dict):
gtitle = groups[0].get("title")
group = groups[0]
gtitle = group.get("title")
album = gtitle if isinstance(gtitle, str) else None
gid = group.get("id")
release_group_mbid = gid if isinstance(gid, str) else None
return RecordingMatch(
acoustid=acoustid,
score=score,
recording_mbid=recording_mbid,
release_group_mbid=release_group_mbid,
title=title,
artist=artist,
album=album,
@@ -0,0 +1,111 @@
"""MutagenCoverExtractor — pulls embedded cover art from a local audio file.
The offline-first cover source (mirrors the tag pre-pass): a well-tagged file
often already carries front-cover artwork (ID3 ``APIC``, FLAC/OGG picture
blocks, MP4 ``covr``). We read it without any network call. Parsing is blocking,
so it runs in a worker thread. Any failure degrades to ``None`` — never raises.
mutagen ships no type stubs, so its objects are handled as ``Any`` and accessed
defensively (``getattr``) — the format zoo doesn't fit one static shape anyway.
"""
import base64
from pathlib import Path
from typing import Any
import anyio
from mutagen import File as MutagenFile # type: ignore[attr-defined]
from mutagen.flac import Picture
from mutagen.mp4 import MP4Cover
from app.core.logging import get_logger
from app.domain.entities.cover import CoverArt
log = get_logger(__name__)
# MP4 cover format flag → MIME (mutagen exposes an int, not a content type).
_MP4_FORMATS: dict[int, str] = {
MP4Cover.FORMAT_JPEG: "image/jpeg",
MP4Cover.FORMAT_PNG: "image/png",
}
_FRONT_COVER = 3 # APIC/Picture "type" value for the front cover
class MutagenCoverExtractor:
    """Implements :class:`app.domain.ports.CoverArtExtractor`."""

    async def extract(self, path: Path) -> CoverArt | None:
        """Run the blocking parse in a worker thread; any exception degrades to ``None``."""
        try:
            cover = await anyio.to_thread.run_sync(self._extract_sync, path)
        except Exception:
            log.warning("cover_extract_failed", path=str(path))
            return None
        return cover

    def _extract_sync(self, path: Path) -> CoverArt | None:
        """Probe the known picture locations in a fixed order and return the first usable one.

        Order: FLAC-style ``pictures`` → ID3 ``APIC`` frames → MP4 ``covr`` →
        Vorbis ``metadata_block_picture``.
        """
        parsed: Any = MutagenFile(str(path))
        if parsed is None:
            return None

        # FLAC / OGG-FLAC expose typed picture blocks directly on the file object.
        flac_pictures = getattr(parsed, "pictures", None)
        if flac_pictures:
            found = _from_picture(_front_or_first(flac_pictures))
            if found is not None:
                return found

        tag_store = parsed.tags
        if tag_store is None:
            return None

        # ID3 (MP3 and friends): pick out the APIC frames by class name.
        apic_frames = [
            candidate
            for candidate in tag_store.values()
            if candidate.__class__.__name__ == "APIC"
        ]
        if apic_frames:
            found = _from_picture(_front_or_first(apic_frames))
            if found is not None:
                return found

        lookup = getattr(tag_store, "get", None)
        if lookup is None:
            return None

        # MP4 / M4A: the "covr" atom is a list of MP4Cover (a bytes subclass).
        mp4_covers = lookup("covr")
        if mp4_covers:
            first_cover = mp4_covers[0]
            format_flag = getattr(first_cover, "imageformat", -1)
            return CoverArt(
                data=bytes(first_cover),
                content_type=_MP4_FORMATS.get(format_flag, "image/jpeg"),
            )

        # OGG Vorbis: a base64-encoded picture block under METADATA_BLOCK_PICTURE.
        vorbis_blocks = lookup("metadata_block_picture")
        if vorbis_blocks:
            return _from_picture(_decode_vorbis_picture(vorbis_blocks[0]))

        return None
def _from_picture(picture: Any) -> CoverArt | None:
    """Convert a mutagen picture / APIC frame into a :class:`CoverArt`.

    Returns ``None`` when ``picture`` is ``None`` or carries no ``data``.
    A missing or empty ``mime`` falls back to ``image/jpeg``.
    """
    payload = None if picture is None else getattr(picture, "data", None)
    if not payload:
        return None
    content_type = getattr(picture, "mime", None) or "image/jpeg"
    return CoverArt(data=bytes(payload), content_type=str(content_type))
def _front_or_first(pictures: list[Any]) -> Any:
    """Pick the front-cover picture (type 3) if one exists, otherwise the first entry.

    An empty list gives ``None``.
    """
    if not pictures:
        return None
    front = next(
        (entry for entry in pictures if getattr(entry, "type", None) == _FRONT_COVER),
        None,
    )
    return pictures[0] if front is None else front
def _decode_vorbis_picture(encoded: str) -> Any:
    """Base64-decode ``encoded`` and parse it as a mutagen ``Picture``; ``None`` on any failure."""
    try:
        raw_block = base64.b64decode(encoded)
        picture = Picture(raw_block)  # type: ignore[no-untyped-call]
    except Exception:
        return None
    return picture
+83
View File
@@ -0,0 +1,83 @@
"""CoverArtArchiveClient — fetches front cover art from the Cover Art Archive.
The network fallback when a file carries no embedded artwork: given a
MusicBrainz **release-group** id (supplied by the AcoustID lookup), request the
front image from ``coverartarchive.org``. The CAA redirects to the Internet
Archive, so redirects are followed. ``thumbnail`` 500px keeps payloads small.
Graceful degradation (CLAUDE.md): no release-group id → never called; any
network/HTTP error (incl. 404 "no cover") → returns ``None``, never raises. A
small inter-call delay respects the shared MusicBrainz/CAA infrastructure.
"""
import asyncio
import time
import httpx
from app.core.logging import get_logger
from app.domain.entities.cover import CoverArt
log = get_logger(__name__)
_DEFAULT_BASE_URL = "https://coverartarchive.org"
_TIMEOUT_SECONDS = 15.0
_MIN_INTERVAL_SECONDS = 1.0 # CAA piggybacks on MusicBrainz infra; stay polite
_MAX_BYTES = 10 * 1024 * 1024 # ignore absurdly large images
class CoverArtArchiveClient:
    """Implements :class:`app.domain.ports.CoverArtProvider`."""

    # Class-level throttle state: shared by every instance of this client.
    _throttle_lock = asyncio.Lock()
    _last_call_monotonic = 0.0

    def __init__(
        self,
        *,
        user_agent: str,
        enabled: bool = True,
        base_url: str = _DEFAULT_BASE_URL,
    ) -> None:
        self._enabled = enabled
        self._user_agent = user_agent
        # Trailing slashes are dropped so URL assembly never doubles them.
        self._base_url = base_url.rstrip("/")

    def is_available(self) -> bool:
        """Report whether the provider is switched on."""
        return self._enabled

    async def fetch_release_group(self, release_group_mbid: str) -> CoverArt | None:
        """Download the 500px front cover for a release group.

        Returns ``None`` when the provider is disabled, the id is empty, the
        archive answers 404, an ``httpx.HTTPError`` occurs, the body is empty
        or larger than ``_MAX_BYTES``, or the response is not an image.
        """
        if not (self._enabled and release_group_mbid):
            return None

        url = f"{self._base_url}/release-group/{release_group_mbid}/front-500"
        request_headers = {"User-Agent": self._user_agent}
        try:
            await self._throttle()
            async with httpx.AsyncClient(
                timeout=_TIMEOUT_SECONDS,
                follow_redirects=True,
                headers=request_headers,
            ) as http:
                response = await http.get(url)
                if response.status_code == 404:
                    # No cover for this release group — normal, not an error.
                    return None
                response.raise_for_status()
        except httpx.HTTPError:
            log.warning("coverart_fetch_failed", release_group=release_group_mbid)
            return None

        body = response.content
        if not body or len(body) > _MAX_BYTES:
            return None

        # Keep only the media type; drop parameters such as "; charset=...".
        declared_type = response.headers.get("content-type", "image/jpeg")
        media_type = declared_type.split(";")[0].strip()
        if not media_type.startswith("image/"):
            return None
        return CoverArt(data=body, content_type=media_type)

    @classmethod
    async def _throttle(cls) -> None:
        """Sleep as needed so calls are at least ``_MIN_INTERVAL_SECONDS`` apart."""
        async with cls._throttle_lock:
            since_last = time.monotonic() - cls._last_call_monotonic
            remaining = _MIN_INTERVAL_SECONDS - since_last
            if remaining > 0:
                await asyncio.sleep(remaining)
            cls._last_call_monotonic = time.monotonic()
+2 -2
View File
@@ -41,7 +41,7 @@ class FpcalcFingerprinter:
)
async with asyncio.timeout(_TIMEOUT_SECONDS):
stdout, _stderr = await proc.communicate()
except (TimeoutError, OSError):
except TimeoutError, OSError:
log.warning("fpcalc_failed", path=str(path))
return None
@@ -53,7 +53,7 @@ class FpcalcFingerprinter:
data = json.loads(stdout)
fingerprint = str(data["fingerprint"])
duration = round(float(data["duration"]))
except (json.JSONDecodeError, KeyError, ValueError):
except json.JSONDecodeError, KeyError, ValueError:
log.warning("fpcalc_bad_output", path=str(path))
return None
+1 -3
View File
@@ -84,9 +84,7 @@ class S3FileStorage:
async def _stream() -> AsyncGenerator[bytes]:
async with self._client() as s3:
try:
resp = await s3.get_object(
Bucket=_bucket, Key=_key, Range=range_header
)
resp = await s3.get_object(Bucket=_bucket, Key=_key, Range=range_header)
except ClientError as exc:
raise StorageError(str(exc)) from exc
body = resp["Body"]
+34 -14
View File
@@ -19,6 +19,8 @@ from app.infrastructure.db.repositories import (
SqlAlchemyTrackRepository,
)
from app.infrastructure.metadata.acoustid import AcoustIdHttpClient
from app.infrastructure.metadata.cover_extractor import MutagenCoverExtractor
from app.infrastructure.metadata.coverart import CoverArtArchiveClient
from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
from app.infrastructure.metadata.tags import MutagenTagReader
from app.infrastructure.storage.provider import get_file_storage
@@ -28,26 +30,44 @@ log = get_logger("worker.enrich")
async def enrich_track(_ctx: dict[str, Any], *, track_id: str) -> dict[str, Any]:
settings = get_settings()
api_key = (
settings.acoustid_api_key.get_secret_value() if settings.acoustid_api_key else None
)
api_key = settings.acoustid_api_key.get_secret_value() if settings.acoustid_api_key else None
acoustid = AcoustIdHttpClient(
api_key=api_key,
user_agent=settings.musicbrainz_user_agent,
api_url=settings.acoustid_api_url,
)
cover_provider = CoverArtArchiveClient(
user_agent=settings.musicbrainz_user_agent,
enabled=settings.coverart_enabled,
base_url=settings.coverart_base_url,
)
async with session_scope() as session:
service = MetadataEnrichmentService(
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
albums=SqlAlchemyAlbumRepository(session),
storage=get_file_storage(),
tag_reader=MutagenTagReader(),
fingerprinter=FpcalcFingerprinter(settings.fpcalc_path),
acoustid=acoustid,
)
result = await service.enrich(uuid.UUID(track_id))
tid = uuid.UUID(track_id)
try:
async with session_scope() as session:
service = MetadataEnrichmentService(
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
albums=SqlAlchemyAlbumRepository(session),
storage=get_file_storage(),
tag_reader=MutagenTagReader(),
fingerprinter=FpcalcFingerprinter(settings.fpcalc_path),
acoustid=acoustid,
cover_extractor=MutagenCoverExtractor(),
cover_provider=cover_provider,
acoustid_trust_score=settings.acoustid_trust_score,
)
result = await service.enrich(tid)
except Exception as exc:
# The run's own transaction rolled back, leaving the track stuck at
# ``pending``. Record the failure in a fresh session so the UI shows a
# ``failed`` status with a reason instead of a silent, endless spinner.
log.exception("enrich_failed", track_id=track_id)
async with session_scope() as session:
await SqlAlchemyTrackRepository(session).mark_enrichment_failed(
tid, error=f"Enrichment crashed: {type(exc).__name__}: {exc}"
)
return {"track_id": track_id, "status": "failed", "mbid": None}
return {
"track_id": str(result.track_id),
+79
View File
@@ -3,10 +3,28 @@
The ASGI app is driven in-process via httpx + asgi-lifespan (no network, no
running server). DB/Redis-backed integration fixtures arrive with the data
layer (plan §11 step 2).
DB safety
---------
Integration fixtures call ``Base.metadata.drop_all`` / ``create_all`` on
``get_engine()``. That engine is built from ``DATABASE_URL``, which in normal
runs points at the *developer's* database — ``localhost:5432/mcma`` for host
``pytest`` and ``db:5432/mcma`` for ``make test-api`` (which execs ``pytest``
inside the api container). Running the suite there silently wipes real data:
``drop_all`` removes every ORM table while leaving Alembic's ``alembic_version``
(it lives outside ``Base.metadata``) — the exact "tables keep disappearing,
version survives" symptom.
To make that impossible, this module redirects every test to a dedicated
``<db>_test`` database *before settings/engine load*, and creates it on demand.
The real dev database is never opened by the test suite.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit
import pytest
from asgi_lifespan import LifespanManager
@@ -17,6 +35,67 @@ os.environ.setdefault("ENVIRONMENT", "test")
os.environ.setdefault("JWT_SECRET", "test-secret")
def _base_database_url() -> str:
    """Work out which DB URL the app *would* use.

    Lookup order: a non-empty ``DATABASE_URL`` environment variable, then the
    first ``DATABASE_URL=`` line of the ``.env`` file one directory above this
    module's directory, then the hard-coded default.
    """
    from_env = os.environ.get("DATABASE_URL")
    if from_env:
        return from_env

    env_file = Path(__file__).resolve().parents[1] / ".env"
    if env_file.exists():
        for entry in map(str.strip, env_file.read_text().splitlines()):
            # A stripped line that starts with the key can never be a "#" comment.
            if entry.startswith("DATABASE_URL="):
                _key, _sep, value = entry.partition("=")
                return value.strip().strip("\"'")

    return "postgresql+asyncpg://mcma:mcma@localhost:5432/mcma"
def _with_database(url: str, name: str) -> str:
    """Rebuild ``url`` with its path (the database name) replaced by ``/name``."""
    parts = urlsplit(url)
    return urlunsplit(parts._replace(path="/" + name))
_BASE_DB_URL = _base_database_url()
_BASE_DB_NAME = urlsplit(_BASE_DB_URL).path.lstrip("/")
# Idempotent: if we're already pointed at a *_test DB, keep it as-is.
_TEST_DB_NAME = _BASE_DB_NAME if _BASE_DB_NAME.endswith("_test") else f"{_BASE_DB_NAME}_test"
_TEST_DB_URL = _with_database(_BASE_DB_URL, _TEST_DB_NAME)
# Redirect the whole suite to the test DB before anything reads settings.
os.environ["DATABASE_URL"] = _TEST_DB_URL
async def _create_test_db_if_missing() -> None:
    """Create the ``<db>_test`` database when the server can be reached.

    Best-effort on the connection step only: if connecting to Postgres fails
    or takes longer than 5 seconds, return silently (the integration fixtures
    have their own reachability probe). Errors raised by the queries after a
    successful connect are not swallowed here.
    """
    import asyncpg  # type: ignore[import-untyped]  # driver behind postgresql+asyncpg

    # asyncpg takes a plain DSN (no SQLAlchemy "+asyncpg" suffix); connect to
    # the always-present ``postgres`` maintenance database.
    maintenance_url = _with_database(_TEST_DB_URL, "postgres")
    dsn = maintenance_url.replace("+asyncpg", "")

    try:
        async with asyncio.timeout(5):
            connection = await asyncpg.connect(dsn)
    except Exception:
        return

    try:
        already_present = await connection.fetchval(
            "SELECT 1 FROM pg_database WHERE datname = $1", _TEST_DB_NAME
        )
        if not already_present:
            # CREATE DATABASE cannot run inside a transaction block; a bare
            # asyncpg connection executes it in autocommit mode.
            await connection.execute(f'CREATE DATABASE "{_TEST_DB_NAME}"')
    finally:
        await connection.close()
@pytest.fixture(scope="session", autouse=True)
def _ensure_test_database() -> None:
    """Session-scoped, auto-used hook: run the test-database bootstrap once per test session."""
    bootstrap = _create_test_db_if_missing()
    asyncio.run(bootstrap)
@pytest.fixture
async def client() -> AsyncIterator[AsyncClient]:
from app.main import create_app
+20
View File
@@ -0,0 +1,20 @@
# Test fixtures
## `scarlet_fire_otis_mcdonald.mp3`
"Scarlet Fire" by **Otis McDonald** — a royalty-free / license-free track
(YouTube Audio Library; distributed via Pro-Sound.org). Used as a real-world
audio fixture for the enrichment pipeline.
What makes it a good fixture: its **embedded ID3 tags are junk**
(`title=Sound_13958`, `artist=Music Track`, `album=Музыка`, `genre=Hip Hop & Rap`)
while AcoustID identifies it with very high confidence as *Scarlet Fire /
Otis McDonald*. So it exercises both:
- the offline tag reader (deterministic, always runs), and
- the "trust a high-confidence AcoustID match over junk tags" path
(`acoustid_trust_score`), which only runs when `fpcalc` + an AcoustID API key
+ network are available — see `tests/test_real_file_enrichment.py`.
Because it's license-free, it may also seed a built-in demo track for fresh
instances.
Binary file not shown.
+1
View File
@@ -33,6 +33,7 @@ def test_parses_full_recording() -> None:
assert match.title == "One More Time"
assert match.artist == "Daft Punk"
assert match.album == "Discovery"
assert match.release_group_mbid == "rg1"
assert match.score == 0.97
+194
View File
@@ -0,0 +1,194 @@
"""Integration tests for the native cover-art endpoints.
Seeds an album with a stored cover, then exercises the ``/api/v1`` album/track
cover endpoints (token auth, 404 when absent). Requires a reachable Postgres;
skips otherwise.
"""
import asyncio
import os
import uuid
from collections.abc import AsyncIterator
from pathlib import Path
import pytest
from app.core.config import get_settings
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyAlbumRepository,
SqlAlchemyArtistRepository,
SqlAlchemyRefreshTokenRepository,
SqlAlchemyTrackRepository,
SqlAlchemyUserRepository,
)
from app.infrastructure.storage.provider import get_file_storage
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.asyncio
# A minimal valid 1x1 PNG.
_PNG_BYTES = bytes.fromhex(
"89504e470d0a1a0a0000000d4948445200000001000000010802000000907753"
"de0000000c4944415408d763f8cfc0f01f0005000155a2b4f60000000049454e44ae426082"
)
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
    """Tell whether Postgres answers ``SELECT 1`` within 3 seconds.

    The probe runs at most once per module: its verdict is stored in
    ``_db_reachable_cache`` and reused by later calls.
    """
    global _db_reachable_cache
    if _db_reachable_cache is None:
        from sqlalchemy import text

        try:
            async with asyncio.timeout(3), get_engine().connect() as conn:
                await conn.execute(text("SELECT 1"))
        except Exception:
            _db_reachable_cache = False
        else:
            _db_reachable_cache = True
    return _db_reachable_cache
async def _seed_album_with_cover(*, with_cover: bool) -> tuple[uuid.UUID, uuid.UUID]:
    """Seed one artist, one album and one track linked to that album.

    When ``with_cover`` is true, a 1x1 PNG is also written to storage under
    ``covers/<album id>.png`` and registered as the album's cover path.
    Returns ``(album_id, track_id)``.
    """
    async with session_scope() as session:
        artist = await SqlAlchemyArtistRepository(session).get_or_create("Coverart Artist")
        album = await SqlAlchemyAlbumRepository(session).get_or_create(
            title="Coverart Album",
            artist_id=artist.id,
            year=2020,
            musicbrainz_id=None,
        )

        new_track = {
            "id": uuid.uuid4(),
            "title": "Coverart Track",
            "artist_id": artist.id,
            "storage_uri": "tracks/zz/cover-track.mp3",
            "file_format": "mp3",
            "file_size": 10,
            "source": "upload",
            "source_id": "cover-src",
            "metadata_status": "enriched",
            "added_by": None,
        }
        track = await SqlAlchemyTrackRepository(session).add(**new_track)

        # add() has no album_id parameter, so the album link is made through
        # apply_enrichment instead.
        album_link = {
            "title": "Coverart Track",
            "artist_id": artist.id,
            "album_id": album.id,
            "genre": None,
            "year": 2020,
            "track_number": 1,
            "duration_seconds": 1,
            "bitrate": None,
            "acoustid_fingerprint": None,
            "musicbrainz_id": None,
            "metadata_status": "enriched",
        }
        await SqlAlchemyTrackRepository(session).apply_enrichment(track.id, **album_link)

        if with_cover:
            import tempfile

            storage_key = f"covers/{album.id}.png"
            with tempfile.NamedTemporaryFile(suffix=".png") as png_file:
                png_file.write(_PNG_BYTES)
                png_file.flush()
                await get_file_storage().save_file(storage_key, Path(png_file.name))
            await SqlAlchemyAlbumRepository(session).set_cover_path(album.id, storage_key)

    return album.id, track.id
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client over a freshly rebuilt schema.

    Skips when Postgres is unreachable. Points ``MEDIA_PATH`` at ``tmp_path``,
    clears the cached settings and the storage singleton, recreates all
    tables, seeds ``testuser`` and starts the app. After the test, tables are
    dropped and the engine disposed; environment, settings cache and storage
    singleton are reset in all cases.
    """
    if not await _db_reachable():
        pytest.skip("Postgres not reachable — integration test skipped.")

    os.environ["MEDIA_PATH"] = str(tmp_path)
    get_settings.cache_clear()

    import app.infrastructure.storage.provider as storage_provider

    storage_provider._storage = None

    try:
        async with get_engine().begin() as schema_conn:
            await schema_conn.run_sync(Base.metadata.drop_all)
            await schema_conn.run_sync(Base.metadata.create_all)

        from app.application.user_service import UserService
        from app.core.security import Argon2PasswordHasher

        async with session_scope() as session:
            user_service = UserService(
                users=SqlAlchemyUserRepository(session),
                refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
                hasher=Argon2PasswordHasher(),
            )
            await user_service.create_user(
                username="testuser", password="testpass1", is_superuser=False
            )

        from app.main import create_app

        asgi_app = create_app()
        async with LifespanManager(asgi_app):
            async with AsyncClient(
                transport=ASGITransport(app=asgi_app), base_url="http://test"
            ) as client:
                yield client

        async with get_engine().begin() as schema_conn:
            await schema_conn.run_sync(Base.metadata.drop_all)
        await dispose_engine()
    finally:
        storage_provider._storage = None
        os.environ.pop("MEDIA_PATH", None)
        get_settings.cache_clear()
async def _login(api: AsyncClient) -> str:
    """Log in as the seeded ``testuser`` and return the access token."""
    credentials = {"username": "testuser", "password": "testpass1"}
    login_resp = await api.post("/api/v1/auth/login", json=credentials)
    assert login_resp.status_code == 200
    return str(login_resp.json()["access_token"])
async def test_album_cover_served(api: AsyncClient) -> None:
    """A stored album cover comes back byte-for-byte with the PNG content type."""
    access_token = await _login(api)
    album_id, _track_id = await _seed_album_with_cover(with_cover=True)

    cover_resp = await api.get(f"/api/v1/albums/{album_id}/cover?token={access_token}")

    assert cover_resp.status_code == 200, cover_resp.text
    assert cover_resp.headers["content-type"] == "image/png"
    assert cover_resp.content == _PNG_BYTES
async def test_track_cover_served_from_album(api: AsyncClient) -> None:
    """The track cover endpoint serves the cover stored for the track's album."""
    access_token = await _login(api)
    _album_id, track_id = await _seed_album_with_cover(with_cover=True)

    cover_resp = await api.get(f"/api/v1/tracks/{track_id}/cover?token={access_token}")

    assert cover_resp.status_code == 200, cover_resp.text
    assert cover_resp.headers["content-type"] == "image/png"
    assert cover_resp.content == _PNG_BYTES
async def test_album_without_cover_is_404(api: AsyncClient) -> None:
    """Requesting the cover of an album that has none yields 404."""
    access_token = await _login(api)
    album_id, _track_id = await _seed_album_with_cover(with_cover=False)

    cover_resp = await api.get(f"/api/v1/albums/{album_id}/cover?token={access_token}")

    assert cover_resp.status_code == 404
async def test_cover_requires_auth(api: AsyncClient) -> None:
    """Without a token the cover endpoint answers 401."""
    album_id, _track_id = await _seed_album_with_cover(with_cover=True)

    anonymous_resp = await api.get(f"/api/v1/albums/{album_id}/cover")

    assert anonymous_resp.status_code == 401
async def test_album_appears_with_has_cover_flag(api: AsyncClient) -> None:
    """The album detail payload reports ``has_cover`` as true once a cover is stored."""
    access_token = await _login(api)
    album_id, _track_id = await _seed_album_with_cover(with_cover=True)
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    detail_resp = await api.get(f"/api/v1/albums/{album_id}", headers=auth_headers)

    assert detail_resp.status_code == 200
    assert detail_resp.json()["has_cover"] is True
+6
View File
@@ -51,7 +51,10 @@ class FakeTrackRepo:
duration_seconds=None,
genre=None,
year=None,
track_number=None,
metadata_status=str(kw["metadata_status"]),
metadata_error=None,
enriched_at=None,
created_at=now,
updated_at=now,
)
@@ -132,7 +135,10 @@ async def test_dedup_skips_already_imported() -> None:
duration_seconds=None,
genre=None,
year=None,
track_number=None,
metadata_status="pending",
metadata_error=None,
enriched_at=None,
created_at=now,
updated_at=now,
)
+200
View File
@@ -0,0 +1,200 @@
"""Integration tests for the metadata-editor endpoints (§A7, §1H).
Requires a reachable Postgres; skips otherwise.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
import pytest
from app.core.config import get_settings
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
    """Tell whether Postgres answers ``SELECT 1`` within 3 seconds.

    The probe runs at most once per module: its verdict is stored in
    ``_db_reachable_cache`` and reused by later calls.
    """
    global _db_reachable_cache
    if _db_reachable_cache is None:
        from sqlalchemy import text

        try:
            async with asyncio.timeout(3), get_engine().connect() as conn:
                await conn.execute(text("SELECT 1"))
        except Exception:
            _db_reachable_cache = False
        else:
            _db_reachable_cache = True
    return _db_reachable_cache
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client over a freshly rebuilt schema.

    Skips when Postgres is unreachable. Points ``MEDIA_PATH`` at ``tmp_path``,
    clears the cached settings and the storage singleton, recreates all
    tables, seeds ``testuser`` and starts the app. After the test, tables are
    dropped and the engine disposed; environment, settings cache and storage
    singleton are reset in all cases.
    """
    if not await _db_reachable():
        pytest.skip("Postgres not reachable — integration test skipped.")

    os.environ["MEDIA_PATH"] = str(tmp_path)
    get_settings.cache_clear()

    import app.infrastructure.storage.provider as storage_provider

    storage_provider._storage = None

    try:
        async with get_engine().begin() as schema_conn:
            await schema_conn.run_sync(Base.metadata.drop_all)
            await schema_conn.run_sync(Base.metadata.create_all)

        from app.application.user_service import UserService
        from app.core.security import Argon2PasswordHasher

        async with session_scope() as session:
            user_service = UserService(
                users=SqlAlchemyUserRepository(session),
                refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
                hasher=Argon2PasswordHasher(),
            )
            await user_service.create_user(
                username="testuser", password="testpass1", is_superuser=False
            )

        from app.main import create_app

        asgi_app = create_app()
        async with LifespanManager(asgi_app):
            async with AsyncClient(
                transport=ASGITransport(app=asgi_app), base_url="http://test"
            ) as client:
                yield client

        async with get_engine().begin() as schema_conn:
            await schema_conn.run_sync(Base.metadata.drop_all)
        await dispose_engine()
    finally:
        storage_provider._storage = None
        os.environ.pop("MEDIA_PATH", None)
        get_settings.cache_clear()
async def _login(api: AsyncClient) -> str:
    """Log in as the seeded ``testuser`` and return the access token."""
    credentials = {"username": "testuser", "password": "testpass1"}
    login_resp = await api.post("/api/v1/auth/login", json=credentials)
    assert login_resp.status_code == 200
    return str(login_resp.json()["access_token"])
async def _upload(api: AsyncClient, token: str, *, name: str = "song.mp3") -> str:
    """Upload a small fake MP3 payload as ``name`` and return the new track's id."""
    payload = b"fake audio bytes for metadata test" * 10
    auth_headers = {"Authorization": f"Bearer {token}"}
    upload_resp = await api.post(
        "/api/v1/upload",
        files={"file": (name, payload, "audio/mpeg")},
        headers=auth_headers,
    )
    assert upload_resp.status_code == 200, upload_resp.text
    return str(upload_resp.json()["track_id"])
async def test_track_out_includes_genre_year_track_number(api: AsyncClient) -> None:
    """The track payload exposes the genre, year and track_number keys."""
    token = await _login(api)
    track_id = await _upload(api, token)
    auth = {"Authorization": f"Bearer {token}"}

    response = await api.get(f"/api/v1/tracks/{track_id}", headers=auth)

    assert response.status_code == 200, response.text
    payload = response.json()
    for field_name in ("genre", "year", "track_number"):
        assert field_name in payload
async def test_metadata_matches_degrades_without_acoustid(api: AsyncClient) -> None:
    """Match lookup answers 200 with an empty item list when nothing is configured."""
    # The test environment has no ACOUSTID_API_KEY / fpcalc, so the endpoint
    # is expected to degrade gracefully rather than return an error.
    token = await _login(api)
    track_id = await _upload(api, token)
    auth = {"Authorization": f"Bearer {token}"}
    url = f"/api/v1/tracks/{track_id}/metadata/matches"

    response = await api.get(url, headers=auth)

    assert response.status_code == 200, response.text
    assert response.json() == {"items": []}
async def test_metadata_matches_not_found(api: AsyncClient) -> None:
    """Match lookup for the all-zero track id answers 404."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}
    url = "/api/v1/tracks/00000000-0000-0000-0000-000000000000/metadata/matches"

    response = await api.get(url, headers=auth)

    assert response.status_code == 404
async def test_apply_metadata_updates_fields_and_sets_manual(api: AsyncClient) -> None:
    """A full metadata PUT echoes every edited field, marks it manual and persists."""
    token = await _login(api)
    track_id = await _upload(api, token)
    headers = {"Authorization": f"Bearer {token}"}
    edits = {
        "title": "New Title",
        "artist_name": "New Artist",
        "album_title": "New Album",
        "year": 1999,
        "genre": "Rock",
        "track_number": 3,
    }

    response = await api.put(
        f"/api/v1/tracks/{track_id}/metadata", json=edits, headers=headers
    )

    assert response.status_code == 200, response.text
    body = response.json()
    # Every submitted field must come back unchanged in the response.
    for key, expected in edits.items():
        assert body[key] == expected
    assert body["metadata_status"] == "manual"

    # Fetch the track again to make sure the edit was stored, not just echoed.
    refetched = await api.get(f"/api/v1/tracks/{track_id}", headers=headers)
    assert refetched.status_code == 200
    stored = refetched.json()
    assert stored["title"] == "New Title"
    assert stored["metadata_status"] == "manual"
async def test_apply_metadata_partial_update(api: AsyncClient) -> None:
    """A PUT carrying only ``genre`` succeeds and marks the track manual."""
    token = await _login(api)
    track_id = await _upload(api, token)
    auth = {"Authorization": f"Bearer {token}"}
    url = f"/api/v1/tracks/{track_id}/metadata"

    response = await api.put(url, json={"genre": "Jazz"}, headers=auth)

    assert response.status_code == 200, response.text
    payload = response.json()
    assert payload["genre"] == "Jazz"
    assert payload["metadata_status"] == "manual"
async def test_apply_metadata_not_found(api: AsyncClient) -> None:
    """A metadata PUT against the all-zero track id answers 404."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}
    url = "/api/v1/tracks/00000000-0000-0000-0000-000000000000/metadata"

    response = await api.put(url, json={"title": "x"}, headers=auth)

    assert response.status_code == 404
+227 -4
View File
@@ -15,6 +15,7 @@ import pytest
from app.application.metadata_service import MetadataEnrichmentService
from app.domain.entities import Artist, Track
from app.domain.entities.album import Album
from app.domain.entities.cover import CoverArt
from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch
# Module-level pytest mark: applies the ``asyncio`` marker to every test here.
pytestmark = pytest.mark.asyncio
@@ -37,7 +38,10 @@ def _track(*, metadata_status: str = "pending", title: str = "raw-stem") -> Trac
duration_seconds=None,
genre=None,
year=None,
track_number=None,
metadata_status=metadata_status,
metadata_error=None,
enriched_at=None,
created_at=now,
updated_at=now,
)
@@ -67,8 +71,10 @@ class FakeArtistRepo:
class FakeAlbumRepo:
def __init__(self) -> None:
def __init__(self, *, cover_path: str | None = None) -> None:
self.created: list[tuple[str, uuid.UUID]] = []
self.covers: dict[uuid.UUID, str] = {}
self._existing_cover = cover_path
async def get_or_create(
self, *, title: str, artist_id: uuid.UUID, year: int | None, musicbrainz_id: str | None
@@ -80,18 +86,52 @@ class FakeAlbumRepo:
title=title,
artist_id=artist_id,
year=year,
cover_path=None,
cover_path=self._existing_cover,
musicbrainz_id=musicbrainz_id,
created_at=now,
updated_at=now,
)
async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None:
    """Record the cover path assigned to ``album_id`` so tests can inspect it."""
    self.covers.update({album_id: cover_path})
class FakeStorage:
    """In-memory storage double that remembers which keys were saved."""

    def __init__(self) -> None:
        # Keys passed to save_file, in call order.
        self.saved: list[str] = []

    @asynccontextmanager
    async def as_local_path(self, key: str) -> AsyncIterator[Path]:
        """Yield a path under ``/tmp`` for ``key`` without touching the disk."""
        local = Path("/tmp", key)
        yield local

    async def save_file(self, key: str, src_path: Path) -> int:
        """Note the saved key and report a fixed size of one byte."""
        self.saved += [key]
        return 1
class FakeCoverExtractor:
    """Cover-extractor double returning a preset cover and counting calls."""

    def __init__(self, cover: CoverArt | None) -> None:
        self.calls = 0
        self._cover = cover

    async def extract(self, path: Path) -> CoverArt | None:
        """Count the invocation and hand back the preset cover (or ``None``)."""
        self.calls = self.calls + 1
        return self._cover
class FakeCoverProvider:
    """Remote cover-provider double with a preset answer and a call counter."""

    def __init__(self, cover: CoverArt | None, *, available: bool = True) -> None:
        self.calls = 0
        self._available = available
        self._cover = cover

    def is_available(self) -> bool:
        """Report the availability flag given at construction."""
        return self._available

    async def fetch_release_group(self, release_group_mbid: str) -> CoverArt | None:
        """Count the fetch and return the preset cover regardless of the id."""
        self.calls = self.calls + 1
        return self._cover
class FakeTagReader:
def __init__(self, tags: AudioTags | None) -> None:
@@ -126,6 +166,10 @@ class FakeAcoustId:
self.calls += 1
return self._match
async def lookup_all(self, fingerprint: Fingerprint) -> list[RecordingMatch]:
    """Count the call and return the preset match as a 0- or 1-item list."""
    self.calls += 1
    if self._match is None:
        return []
    return [self._match]
def _service(
*,
@@ -214,6 +258,33 @@ async def test_nothing_found_marks_failed() -> None:
assert applied is not None
assert applied["artist_id"] == track.artist_id # fallback kept
assert applied["metadata_status"] == "failed"
# A failed run records a human-readable reason; here both id steps were
# available, so it's the generic "no match" message.
assert applied["metadata_error"] == "No metadata match found in tags or AcoustID."
async def test_failed_reason_names_unavailable_fingerprinter() -> None:
    """With no tags and no fingerprinter, the stored error mentions fingerprinting."""
    pending = _track()
    svc, track_repo, _artists, _albums, _acoustid = _service(
        track=pending, tags=None, fp=None, fp_available=False
    )

    outcome = await svc.enrich(pending.id)

    assert outcome.status == "failed"
    recorded = track_repo.applied
    assert recorded is not None
    assert "fingerprinting" in str(recorded["metadata_error"])
async def test_successful_enrich_clears_error() -> None:
    """An enriched run writes ``metadata_error`` as ``None``."""
    pending = _track()
    svc, track_repo, _artists, _albums, _acoustid = _service(
        track=pending, tags=AudioTags(artist="Pink Floyd")
    )

    outcome = await svc.enrich(pending.id)

    assert outcome.status == "enriched"
    recorded = track_repo.applied
    assert recorded is not None
    assert recorded["metadata_error"] is None
async def test_acoustid_path_fills_when_tags_absent() -> None:
@@ -244,13 +315,14 @@ async def test_acoustid_path_fills_when_tags_absent() -> None:
assert "Daft Punk" in artists.created
async def test_tags_win_over_acoustid_for_overlapping_fields() -> None:
async def test_tags_win_over_low_confidence_acoustid() -> None:
track = _track()
fp = Fingerprint(fingerprint="AQAA", duration_seconds=200)
tags = AudioTags(title="Tagged Title", artist="Tagged Artist")
# Below the 0.85 trust threshold → keep tag-first.
match = RecordingMatch(
acoustid="aid",
score=0.9,
score=0.5,
recording_mbid="mbid",
title="AcoustID Title",
artist="AcoustID Artist",
@@ -269,6 +341,36 @@ async def test_tags_win_over_acoustid_for_overlapping_fields() -> None:
assert applied["musicbrainz_id"] == "mbid"
async def test_high_confidence_acoustid_wins_over_junk_tags() -> None:
    """A 0.98-score match replaces junk embedded title/artist/album tags."""
    pending = _track()
    # Reproduces the reported bug: a downloaded file with meaningless embedded
    # tags competing against a near-certain acoustic identification.
    junk_tags = AudioTags(title="Sound_13958", artist="Music Track", album="Музыка")
    confident_match = RecordingMatch(
        acoustid="aid",
        score=0.98,
        recording_mbid="mbid",
        release_group_mbid="rg",
        title="Scarlet Fire",
        artist="Otis McDonald",
        album="Scarlet Fire",
    )
    fingerprint = Fingerprint(fingerprint="AQAA", duration_seconds=200)
    svc, track_repo, artist_repo, album_repo, _acoustid = _service(
        track=pending, tags=junk_tags, fp=fingerprint, match=confident_match
    )

    await svc.enrich(pending.id)

    recorded = track_repo.applied
    assert recorded is not None
    assert recorded["title"] == "Scarlet Fire"
    assert "Otis McDonald" in artist_repo.created
    assert "Music Track" not in artist_repo.created
    assert album_repo.created and album_repo.created[0][0] == "Scarlet Fire"
    assert recorded["metadata_status"] == "enriched"
async def test_fingerprint_skipped_when_acoustid_unavailable() -> None:
track = _track()
fp = Fingerprint(fingerprint="AQAA", duration_seconds=200)
@@ -281,3 +383,124 @@ async def test_fingerprint_skipped_when_acoustid_unavailable() -> None:
# tags still enrich, but no AcoustID call is attempted
assert acoustid.calls == 0
assert result.status == "enriched"
# -- cover-art resolution -----------------------------------------------------
# Minimal cover payloads for the cover-art tests. Each carries only the first
# bytes of the respective image signature (PNG / JPEG), not a decodable image.
_PNG = CoverArt(data=b"\x89PNG\r\n", content_type="image/png")
_JPG = CoverArt(data=b"\xff\xd8\xff", content_type="image/jpeg")
def _cover_service(
    *,
    track: Track,
    tags: AudioTags | None = None,
    match: RecordingMatch | None = None,
    fp: Fingerprint | None = None,
    extractor: FakeCoverExtractor | None = None,
    provider: FakeCoverProvider | None = None,
    existing_cover: str | None = None,
) -> tuple[MetadataEnrichmentService, FakeAlbumRepo, FakeStorage]:
    """Build an enrichment service wired to fakes for the cover-art tests.

    Returns the service together with the album repo and storage doubles so
    callers can assert on recorded covers and saved keys.
    """
    storage = FakeStorage()
    albums = FakeAlbumRepo(cover_path=existing_cover)
    track_repo = FakeTrackRepo(track)
    artist_repo = FakeArtistRepo()
    tag_reader = FakeTagReader(tags)
    fingerprinter = FakeFingerprinter(fp)
    acoustid = FakeAcoustId(match)
    service = MetadataEnrichmentService(
        tracks=track_repo,  # type: ignore[arg-type]
        artists=artist_repo,  # type: ignore[arg-type]
        albums=albums,  # type: ignore[arg-type]
        storage=storage,  # type: ignore[arg-type]
        tag_reader=tag_reader,  # type: ignore[arg-type]
        fingerprinter=fingerprinter,  # type: ignore[arg-type]
        acoustid=acoustid,  # type: ignore[arg-type]
        cover_extractor=extractor,  # type: ignore[arg-type]
        cover_provider=provider,  # type: ignore[arg-type]
    )
    return service, albums, storage
async def test_cover_extracted_from_embedded_art() -> None:
    """Embedded art is stored as the album cover and no remote fetch happens."""
    pending = _track()
    embedded = FakeCoverExtractor(_PNG)
    remote = FakeCoverProvider(_JPG)
    svc, album_repo, store = _cover_service(
        track=pending,
        tags=AudioTags(album="The Wall", artist="PF"),
        extractor=embedded,
        provider=remote,
    )

    await svc.enrich(pending.id)

    assert embedded.calls == 1
    # Embedded art takes precedence, so the network provider stays untouched.
    assert remote.calls == 0
    assert len(album_repo.covers) == 1
    (cover_key,) = list(album_repo.covers.values())[:1]
    assert cover_key.startswith("covers/") and cover_key.endswith(".png")
    assert store.saved == [cover_key]
async def test_cover_falls_back_to_archive() -> None:
    """Without embedded art, the cover comes from the release-group provider."""
    track = _track()
    extractor = FakeCoverExtractor(None)  # no embedded art
    provider = FakeCoverProvider(_JPG)
    match = RecordingMatch(acoustid="ac", score=1.0, release_group_mbid="rg-123", album="The Wall")
    fp = Fingerprint(fingerprint="AQAA", duration_seconds=200)
    service, albums, storage = _cover_service(
        track=track,
        tags=AudioTags(album="The Wall", artist="PF"),
        match=match,
        fp=fp,
        extractor=extractor,
        provider=provider,
    )

    await service.enrich(track.id)

    assert extractor.calls == 1
    assert provider.calls == 1
    # Guard before next(iter(...)): on an empty dict that raises StopIteration,
    # which inside a coroutine turns into an opaque RuntimeError instead of a
    # readable assertion failure.
    assert len(albums.covers) == 1
    key = next(iter(albums.covers.values()))
    assert key.endswith(".jpg")
    assert storage.saved == [key]
async def test_cover_not_fetched_without_release_group() -> None:
    """No match means no release-group id, so the provider is never queried."""
    pending = _track()
    remote = FakeCoverProvider(_JPG)
    no_embedded_art = FakeCoverExtractor(None)
    svc, album_repo, _storage = _cover_service(
        track=pending,
        tags=AudioTags(album="The Wall", artist="PF"),
        extractor=no_embedded_art,
        provider=remote,
    )

    await svc.enrich(pending.id)

    # Without a release-group mbid there is nothing to look up remotely.
    assert remote.calls == 0
    assert album_repo.covers == {}
async def test_existing_cover_is_not_overwritten() -> None:
    """An album that already has a cover triggers no extraction or save."""
    pending = _track()
    embedded = FakeCoverExtractor(_PNG)
    svc, album_repo, store = _cover_service(
        track=pending,
        tags=AudioTags(album="The Wall", artist="PF"),
        extractor=embedded,
        existing_cover="covers/old.jpg",
    )

    await svc.enrich(pending.id)

    # The pre-existing cover short-circuits all cover work.
    assert embedded.calls == 0
    assert album_repo.covers == {}
    assert store.saved == []
async def test_cover_skipped_when_no_album() -> None:
    """With neither album tag nor match, no cover extraction or save occurs."""
    pending = _track()
    embedded = FakeCoverExtractor(_PNG)
    # Nothing supplies an album here, so there is no album to attach art to.
    svc, _album_repo, store = _cover_service(track=pending, extractor=embedded)

    await svc.enrich(pending.id)

    assert embedded.calls == 0
    assert store.saved == []
+207
View File
@@ -0,0 +1,207 @@
"""Enrichment tests against a real audio file (``tests/fixtures/``).
The fixture "Scarlet Fire" by Otis McDonald carries *junk* embedded tags
(``Sound_13958`` / ``Music Track`` / ``Музыка``) yet is identified by AcoustID
with ~0.99 confidence. That makes it the real-world reproduction of the
"uploaded a track, got the wrong name/artist" bug: tag reading must be exact,
and a high-confidence AcoustID match must override the junk tags.
Two layers:
- The tag-reader test is offline and deterministic — it always runs.
- The AcoustID/identity tests need the ``fpcalc`` binary, an AcoustID API key,
and network. They *skip* (never fail) when those aren't present, honouring the
project rule that the suite never hard-requires network. They do run inside the
api/worker container (``make test-api``), which ships fpcalc + the key.
"""
import uuid
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
import pytest
from app.application.metadata_service import MetadataEnrichmentService
from app.core.config import get_settings
from app.domain.entities.album import Album
from app.domain.entities.track import Artist, Track
from app.infrastructure.metadata.acoustid import AcoustIdHttpClient
from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
from app.infrastructure.metadata.tags import MutagenTagReader
# Module-level pytest mark: applies the ``asyncio`` marker to every test here.
pytestmark = pytest.mark.asyncio
# Real audio fixture, resolved relative to this test module.
FIXTURE = Path(__file__).parent / "fixtures" / "scarlet_fire_otis_mcdonald.mp3"
# Settings and the fingerprinter are built once at import time; the skip
# marker below and the networked tests both read them.
_settings = get_settings()
_fpcalc = FpcalcFingerprinter(_settings.fpcalc_path)
# Gate for the network/identity tests — present in the container, absent in CI.
# The condition is evaluated when this module is imported.
requires_acoustid = pytest.mark.skipif(
    not (_fpcalc.is_available() and _settings.acoustid_api_key is not None),
    reason="needs the fpcalc binary + ACOUSTID_API_KEY (+ network)",
)
def _acoustid_client() -> AcoustIdHttpClient:
    """Create an AcoustID HTTP client from the module-level settings."""
    secret = _settings.acoustid_api_key
    # A falsy secret yields a client constructed with ``api_key=None``.
    api_key = secret.get_secret_value() if secret else None
    return AcoustIdHttpClient(
        api_key=api_key,
        user_agent=_settings.musicbrainz_user_agent,
        api_url=_settings.acoustid_api_url,
    )
# --- offline: tag reading on a real file -----------------------------------
async def test_real_file_embedded_tags_are_read() -> None:
    """The tag reader returns the fixture's embedded (junk) tags verbatim.

    Runs fully offline: only the fixture file and the tag reader are involved.
    """
    assert FIXTURE.exists(), "fixture mp3 missing"

    tags = await MutagenTagReader().read(FIXTURE)

    assert tags is not None
    expected = {
        "title": "Sound_13958",
        "artist": "Music Track",
        "album": "Музыка",
        "genre": "Hip Hop & Rap",
        "year": 2018,
        "duration_seconds": 143,
        "bitrate": 128,
    }
    for attribute, value in expected.items():
        assert getattr(tags, attribute) == value
# --- networked: AcoustID identifies the real recording ---------------------
@requires_acoustid
async def test_real_file_identified_by_acoustid() -> None:
    """fpcalc plus AcoustID identify the fixture as Scarlet Fire / Otis McDonald.

    Skips rather than fails when no fingerprint or no match is produced.
    """
    fp = await _fpcalc.calculate(FIXTURE)
    if fp is None:
        pytest.skip("fpcalc produced no fingerprint")

    best = await _acoustid_client().lookup(fp)
    if best is None:
        pytest.skip("AcoustID returned no match (network/rate limit?)")

    assert best.score >= _settings.acoustid_trust_score
    assert best.title == "Scarlet Fire"
    assert best.artist == "Otis McDonald"
    assert best.recording_mbid is not None
@requires_acoustid
async def test_real_file_enrichment_overrides_junk_tags() -> None:
    """Run the full pipeline on the real file with real adapters.

    Uses the real tag reader, fingerprinter and AcoustID client; only the
    repositories and storage are in-memory. The track must end up as
    Scarlet Fire / Otis McDonald rather than the junk embedded tags.
    """
    pending = _pending_track()
    track_repo = _FakeTrackRepo(pending)
    artist_repo = _FakeArtistRepo()
    album_repo = _FakeAlbumRepo()
    pipeline = MetadataEnrichmentService(
        tracks=track_repo,  # type: ignore[arg-type]
        artists=artist_repo,  # type: ignore[arg-type]
        albums=album_repo,  # type: ignore[arg-type]
        storage=_FixtureStorage(),  # type: ignore[arg-type]
        tag_reader=MutagenTagReader(),
        fingerprinter=_fpcalc,
        acoustid=_acoustid_client(),
        acoustid_trust_score=_settings.acoustid_trust_score,
    )

    outcome = await pipeline.enrich(pending.id)

    if outcome.status == "failed":
        pytest.skip("AcoustID unavailable at run time (network/rate limit?)")
    assert outcome.status == "enriched"
    recorded = track_repo.applied
    assert recorded is not None
    assert recorded["title"] == "Scarlet Fire"
    assert "Otis McDonald" in artist_repo.created
    assert "Music Track" not in artist_repo.created
    assert album_repo.created and album_repo.created[0][0] == "Scarlet Fire"
# --- minimal in-memory adapters --------------------------------------------
def _pending_track() -> Track:
    """Return a pending upload track whose file size matches the fixture."""
    stamp = datetime.now(UTC)
    size_on_disk = FIXTURE.stat().st_size
    return Track(
        id=uuid.uuid4(),
        # Title mirrors the filename stem an upload would start with.
        title="scarlet_fire_otis_mcdonald",
        artist_id=uuid.uuid4(),
        album_id=None,
        storage_uri="tracks/sf/scarlet.mp3",
        file_format="mp3",
        file_size=size_on_disk,
        source="upload",
        source_id="sha-real",
        duration_seconds=None,
        genre=None,
        year=None,
        track_number=None,
        metadata_status="pending",
        metadata_error=None,
        enriched_at=None,
        created_at=stamp,
        updated_at=stamp,
    )
class _FixtureStorage:
    """Storage double that resolves every key to the fixture file."""

    @asynccontextmanager
    async def as_local_path(self, _key: str) -> AsyncIterator[Path]:
        """Yield the fixture path, ignoring the requested key."""
        fixture_path = FIXTURE
        yield fixture_path
class _FakeTrackRepo:
def __init__(self, track: Track) -> None:
self._track = track
self.applied: dict[str, object] | None = None
async def get_by_id(self, _track_id: uuid.UUID) -> Track:
return self._track
async def apply_enrichment(self, _track_id: uuid.UUID, **kw: object) -> Track:
self.applied = kw
return self._track
@dataclass
class _FakeArtistRepo:
    """Artist repository double that logs every requested artist name."""

    # Names passed to get_or_create, in call order.
    created: list[str] = field(default_factory=list)

    async def get_or_create(self, name: str) -> Artist:
        """Log ``name`` and return a brand-new Artist entity for it."""
        self.created.append(name)
        stamp = datetime.now(UTC)
        artist = Artist(id=uuid.uuid4(), name=name, created_at=stamp, updated_at=stamp)
        return artist
@dataclass
class _FakeAlbumRepo:
    """Album repository double that logs created albums and ignores covers."""

    # (title, artist_id) pairs passed to get_or_create, in call order.
    created: list[tuple[str, uuid.UUID]] = field(default_factory=list)

    async def get_or_create(
        self, *, title: str, artist_id: uuid.UUID, year: int | None, musicbrainz_id: str | None
    ) -> Album:
        """Log the request and return a new Album with no cover path."""
        entry = (title, artist_id)
        self.created.append(entry)
        stamp = datetime.now(UTC)
        album = Album(
            id=uuid.uuid4(),
            title=title,
            artist_id=artist_id,
            year=year,
            cover_path=None,
            musicbrainz_id=musicbrainz_id,
            created_at=stamp,
            updated_at=stamp,
        )
        return album

    async def set_cover_path(self, _album_id: uuid.UUID, _cover_path: str) -> None:
        """Accept a cover path and discard it."""
        return None
+1 -3
View File
@@ -175,9 +175,7 @@ async def test_stream_range(api: AsyncClient) -> None:
async def test_stream_not_found(api: AsyncClient) -> None:
token = await _login(api)
resp = await api.get(
f"/api/v1/stream/00000000-0000-0000-0000-000000000000?token={token}"
)
resp = await api.get(f"/api/v1/stream/00000000-0000-0000-0000-000000000000?token={token}")
assert resp.status_code == 404