"""MetadataEnrichmentService — the §6.2 pipeline orchestrator.

Order (tag-first): embedded tags → Chromaprint fingerprint → AcoustID lookup.
Tags fix the common well-tagged case offline; AcoustID identifies the rest and
supplies a MusicBrainz id. The result updates the track and sets
``metadata_status`` to ``enriched`` (identity found) or ``failed`` (nothing).

Invariants (plan §6.2, CLAUDE.md):

- **Never touch ``manual``** — a user-edited track is returned untouched.
- **Graceful degradation** — every external step is wrapped; one failure
  (no fpcalc, no API key, service down) degrades the result, never crashes.
- **Idempotent** — re-running only fills gaps; ``apply_enrichment`` never
  erases.
"""

import uuid
from dataclasses import dataclass

from app.core.logging import get_logger
from app.domain.entities.metadata import AudioTags, RecordingMatch
from app.domain.ports import (
    AcoustIdClient,
    AlbumRepository,
    ArtistRepository,
    AudioFingerprinter,
    AudioTagReader,
    FileStorage,
    TrackRepository,
)

log = get_logger(__name__)

# Placeholder artist name; it never counts as an identification and is never
# resolved to an artist row by this service.
_UNKNOWN_ARTIST = "Unknown Artist"


@dataclass(frozen=True)
class EnrichmentResult:
    """Outcome of one ``MetadataEnrichmentService.enrich`` call."""

    track_id: uuid.UUID
    status: str  # "enriched" | "failed" | "skipped"
    # Recording MBID from the AcoustID match, when there was one.
    matched_mbid: str | None = None


class MetadataEnrichmentService:
    """Orchestrates tag reading and AcoustID identification for one track."""

    def __init__(
        self,
        *,
        tracks: TrackRepository,
        artists: ArtistRepository,
        albums: AlbumRepository,
        storage: FileStorage,
        tag_reader: AudioTagReader,
        fingerprinter: AudioFingerprinter,
        acoustid: AcoustIdClient,
    ) -> None:
        """Store the repositories and external-step adapters the pipeline uses."""
        self._tracks = tracks
        self._artists = artists
        self._albums = albums
        self._storage = storage
        self._tag_reader = tag_reader
        self._fingerprinter = fingerprinter
        self._acoustid = acoustid

    async def enrich(self, track_id: uuid.UUID) -> EnrichmentResult:
        """Enrich one track from its embedded tags and an AcoustID lookup.

        Returns a ``"skipped"`` result when the track does not exist or its
        ``metadata_status`` is ``"manual"``. Otherwise the merged metadata is
        written through ``apply_enrichment`` and the result carries
        ``"enriched"`` or ``"failed"``.
        """
        track = await self._tracks.get_by_id(track_id)
        if track is None:
            log.info("enrich_track_missing", track_id=str(track_id))
            return EnrichmentResult(track_id=track_id, status="skipped")
        if track.metadata_status == "manual":
            # Invariant: user-edited tracks are never modified.
            log.info("enrich_skip_manual", track_id=str(track_id))
            return EnrichmentResult(track_id=track_id, status="skipped")

        tags = await self._read_local(track.storage_uri)
        match = await self._identify(track.storage_uri)

        # Merge sources: prefer embedded tags, fall back to the AcoustID match.
        # ``title`` is guaranteed non-None by the existing track title; the rest
        # stay None when neither source has them.
        tag_title = tags.title if tags else None
        tag_artist = tags.artist if tags else None
        tag_album = tags.album if tags else None

        title = _opt_str(tag_title, match.title if match else None) or track.title
        # The placeholder artist is filtered out *before* merging so that it
        # neither hides a real artist from the AcoustID match nor counts as an
        # identification on its own.
        artist_name = _opt_str(
            _known_artist(tag_artist),
            _known_artist(match.artist if match else None),
        )
        album_title = _opt_str(tag_album, match.album if match else None)
        year = _first_int(tags.year if tags else None, match.year if match else None)
        genre = tags.genre if tags else None
        track_number = tags.track_number if tags else None
        duration = _first_int(
            tags.duration_seconds if tags else None,
            track.duration_seconds,
        )
        bitrate = tags.bitrate if tags else None
        mbid = match.recording_mbid if match else None
        acoustid_id = match.acoustid if match else None

        artist_id = await self._resolve_artist(artist_name, fallback=track.artist_id)
        # NOTE(review): ``mbid`` is the match's *recording* id and is passed on
        # as the album's ``musicbrainz_id`` — confirm the album repository
        # expects that rather than a release id.
        album_id = await self._resolve_album(
            album_title, artist_id=artist_id, year=year, mbid=mbid
        )

        identified = bool(artist_name) or album_id is not None or mbid is not None
        status = "enriched" if identified else "failed"

        # NOTE(review): the AcoustID id is stored under ``acoustid_fingerprint``
        # — confirm that field is meant to hold the id, not the raw fingerprint.
        await self._tracks.apply_enrichment(
            track_id,
            title=title,
            artist_id=artist_id,
            album_id=album_id,
            genre=genre,
            year=year,
            track_number=track_number,
            duration_seconds=duration,
            bitrate=bitrate,
            acoustid_fingerprint=acoustid_id,
            musicbrainz_id=mbid,
            metadata_status=status,
        )
        log.info("enrich_complete", track_id=str(track_id), status=status, mbid=mbid)
        return EnrichmentResult(track_id=track_id, status=status, matched_mbid=mbid)

    async def _read_local(self, storage_uri: str) -> AudioTags | None:
        """Read embedded tags from the stored file; ``None`` on any failure."""
        try:
            async with self._storage.as_local_path(storage_uri) as path:
                return await self._tag_reader.read(path)
        except Exception:
            # Deliberate best-effort: degrade to "no tags", but keep the
            # exception details in the log so the failure can be diagnosed.
            log.warning(
                "enrich_tag_step_failed", storage_uri=storage_uri, exc_info=True
            )
            return None

    async def _identify(self, storage_uri: str) -> RecordingMatch | None:
        """Fingerprint the file and look it up on AcoustID.

        Returns ``None`` when either adapter reports itself unavailable, when
        no fingerprint could be calculated, or when any step raises.
        """
        if not self._acoustid.is_available() or not self._fingerprinter.is_available():
            return None
        try:
            async with self._storage.as_local_path(storage_uri) as path:
                fingerprint = await self._fingerprinter.calculate(path)
                if fingerprint is None:
                    return None
                return await self._acoustid.lookup(fingerprint)
        except Exception:
            # Deliberate best-effort: degrade to "no match", but keep the
            # exception details in the log so the failure can be diagnosed.
            log.warning(
                "enrich_identify_step_failed", storage_uri=storage_uri, exc_info=True
            )
            return None

    async def _resolve_artist(self, name: str | None, *, fallback: uuid.UUID) -> uuid.UUID:
        """Return the id of the artist called *name*, creating it if needed.

        An empty name or the ``Unknown Artist`` placeholder yields *fallback*.
        """
        if _known_artist(name) is None:
            return fallback
        artist = await self._artists.get_or_create(name)
        return artist.id

    async def _resolve_album(
        self,
        title: str | None,
        *,
        artist_id: uuid.UUID,
        year: int | None,
        mbid: str | None,
    ) -> uuid.UUID | None:
        """Return the id of the album called *title*, or ``None`` without a title."""
        if not title:
            return None
        album = await self._albums.get_or_create(
            title=title,
            artist_id=artist_id,
            year=year,
            musicbrainz_id=mbid,
        )
        return album.id


def _known_artist(name: str | None) -> str | None:
    """Return *name* unless it is empty or the ``Unknown Artist`` placeholder."""
    if not name or name == _UNKNOWN_ARTIST:
        return None
    return name


def _opt_str(*values: str | None) -> str | None:
    """Return the first truthy (non-empty) string in *values*, else ``None``."""
    for value in values:
        if value:
            return value
    return None


def _first_int(*values: int | None) -> int | None:
    """Return the first value that is not ``None`` (``0`` counts), else ``None``."""
    for value in values:
        if value is not None:
            return value
    return None