Files
mcma-backend/app/domain/ports.py
T
Senko-san 0bb752f582
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
feat: cover-art pipeline (§1D)
Resolve, store and serve album cover art.

Sources (tag-first, mirroring enrichment): embedded artwork extracted
offline via mutagen (ID3 APIC / FLAC+OGG Picture / MP4 covr), then Cover
Art Archive by release-group MBID as a network fallback. Resolution runs
inside MetadataEnrichmentService after album resolution, only when the
album has no cover yet (idempotent, never overwrites), and is best-effort
so a cover failure never affects enrichment status.

- CoverArt value object + CoverArtExtractor/CoverArtProvider ports
- MutagenCoverExtractor + CoverArtArchiveClient adapters
- AcoustID parser now captures release_group_mbid
- Covers stored via FileStorage at covers/{album_id}.{ext} (local + S3)
- AlbumRepository.set_cover_path
- Serve real covers: GET /api/v1/albums|tracks/{id}/cover (StreamUser,
  ?token=), Subsonic getCoverArt (placeholder fallback)
- has_cover flag on AlbumOut/TrackOut
- coverart_enabled / coverart_base_url settings
- tests: cover resolution units + release_group parse + DB-backed
  test_cover_api.py (139 green via make test-api)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 12:10:05 +03:00

318 lines
12 KiB
Python

"""Ports — the contracts the application layer depends on.
These are Protocols, not implementations. Concrete adapters live in
``app.infrastructure`` (repositories) and ``app.core.security`` (crypto) and
are bound to these ports at the composition root (``app.api.deps``).
"""
import datetime as dt
import uuid
from collections.abc import AsyncIterator, Iterator
from contextlib import AbstractAsyncContextManager
from pathlib import Path
from typing import Protocol
from app.domain.entities import (
Album,
AudioTags,
CoverArt,
Credentials,
Fingerprint,
Like,
ObjectStat,
PlayHistoryEntry,
Playlist,
RecordingMatch,
SubsonicCredentials,
User,
)
from app.domain.entities.track import Artist, Track
from app.domain.sources import SourceFile, SourceInfo
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
class UserRepository(Protocol):
    """Persistence port for user accounts and their stored credentials."""

    async def get_by_id(self, user_id: uuid.UUID) -> User | None:
        ...

    async def get_credentials_by_username(self, username: str) -> Credentials | None:
        ...

    async def add(
        self,
        *,
        username: str,
        password_hash: str,
        is_superuser: bool,
    ) -> User:
        ...

    # NOTE: from here on the name ``list`` refers to this method inside the
    # class body, so no method declared below may use ``list[...]`` in its
    # annotations.
    async def list(self, *, limit: int, offset: int) -> list[User]:
        ...

    async def set_password_hash(self, user_id: uuid.UUID, password_hash: str) -> None:
        ...

    async def set_superuser(self, user_id: uuid.UUID, is_superuser: bool) -> User:
        ...

    async def set_active(self, user_id: uuid.UUID, is_active: bool) -> User:
        ...

    async def count(self) -> int:
        ...

    # Subsonic app-password: recoverable, stored encrypted at rest.
    async def get_subsonic_credentials_by_username(
        self,
        username: str,
    ) -> SubsonicCredentials | None:
        ...

    async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None:
        ...

    async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None:
        ...
class SubsonicCipher(Protocol):
    """Reversible (symmetric) cipher used for the recoverable Subsonic app-password."""

    def encrypt(self, plaintext: str) -> str:
        ...

    def decrypt(self, token: str) -> str:
        ...
class RefreshTokenRepository(Protocol):
    """Persistence port for issued refresh tokens, keyed by their ``jti``."""

    async def add(
        self, *, jti: uuid.UUID, user_id: uuid.UUID, token_hash: str, expires_at: dt.datetime
    ) -> None:
        ...

    async def is_valid(self, jti: uuid.UUID) -> bool:
        """Return ``True`` only when a row exists for ``jti`` that is neither
        revoked nor expired."""
        ...

    async def revoke(self, jti: uuid.UUID) -> None:
        ...

    async def revoke_all_for_user(self, user_id: uuid.UUID) -> None:
        ...
class PasswordHasher(Protocol):
def hash(self, password: str) -> str: ...
def verify_and_update(self, password: str, password_hash: str) -> tuple[bool, str | None]:
"""Verify ``password`` against ``password_hash``. Returns
``(is_valid, updated_hash)`` where ``updated_hash`` is a fresh hash to
persist when the stored one uses outdated parameters, else ``None``."""
...
class TokenService(Protocol):
    """Port for issuing encoded tokens and decoding them back into claims."""

    def issue(self, *, subject: uuid.UUID, token_type: TokenType) -> IssuedToken:
        ...

    def decode(self, encoded: str) -> TokenClaims:
        """Check signature and expiry, then return the claims.

        Raises :class:`~app.domain.errors.AuthenticationError` on any failure.
        """
        ...
class FileStorage(Protocol):
    """Port for a key-addressed object store."""

    async def save_file(self, key: str, src_path: Path) -> int:
        ...

    async def open_range(
        self,
        key: str,
        start: int,
        end: int | None,
    ) -> tuple[AsyncIterator[bytes], int]:
        ...

    async def stat(self, key: str) -> ObjectStat:
        ...

    async def exists(self, key: str) -> bool:
        ...

    async def delete(self, key: str) -> None:
        ...

    # Deliberately a plain ``def``: the call itself hands back an async context
    # manager that yields a local ``Path``.
    def as_local_path(self, key: str) -> AbstractAsyncContextManager[Path]:
        ...
class ArtistRepository(Protocol):
    """Persistence port for artists."""

    async def get_or_create(self, name: str) -> Artist:
        ...

    async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None:
        ...

    # Declared ahead of ``list``: its signature needs the builtin ``list``,
    # which the method of the same name shadows for the rest of the class body.
    async def get_many(self, ids: list[uuid.UUID]) -> list[Artist]:
        ...

    async def list(
        self,
        *,
        q: str | None,
        limit: int,
        offset: int,
    ) -> list[Artist]:
        ...

    async def count(self, *, q: str | None) -> int:
        ...

    async def album_count(self, artist_id: uuid.UUID) -> int:
        ...

    async def track_count(self, artist_id: uuid.UUID) -> int:
        ...
class TrackRepository(Protocol):
    """Persistence port for tracks: lookup, creation, listing and metadata updates."""

    async def get_by_id(self, track_id: uuid.UUID) -> Track | None:
        ...

    async def get_by_source(self, source: str, source_id: str) -> Track | None:
        ...

    async def add(
        self,
        *,
        id: uuid.UUID,
        title: str,
        artist_id: uuid.UUID,
        storage_uri: str,
        file_format: str,
        file_size: int,
        source: str,
        source_id: str,
        metadata_status: str,
        added_by: uuid.UUID | None,
    ) -> Track:
        ...

    async def delete(self, track_id: uuid.UUID) -> None:
        ...

    # ``genres`` has to be declared ahead of ``list``: once the method named
    # ``list`` exists it shadows the builtin in every later annotation (the
    # same ordering rule is followed by AlbumRepository).
    async def genres(self) -> list[tuple[str, int]]:
        ...

    async def list(
        self,
        *,
        artist_id: uuid.UUID | None,
        album_id: uuid.UUID | None,
        q: str | None,
        sort_by: str,
        order: str,
        limit: int,
        offset: int,
    ) -> list[Track]:
        ...

    async def count(
        self, *, artist_id: uuid.UUID | None, album_id: uuid.UUID | None, q: str | None
    ) -> int:
        ...

    async def update(
        self, track_id: uuid.UUID, *, title: str | None, genre: str | None, year: int | None
    ) -> Track:
        ...

    async def apply_enrichment(
        self,
        track_id: uuid.UUID,
        *,
        title: str,
        artist_id: uuid.UUID,
        album_id: uuid.UUID | None,
        genre: str | None,
        year: int | None,
        track_number: int | None,
        duration_seconds: int | None,
        bitrate: int | None,
        acoustid_fingerprint: str | None,
        musicbrainz_id: str | None,
        metadata_status: str,
    ) -> Track:
        """Store the results of auto-enrichment.

        Nullable fields are written only when a non-``None`` value is passed,
        so re-enriching never erases earlier data. ``title``, ``artist_id``
        and ``metadata_status`` are always written. Callers must not invoke
        this for tracks whose ``metadata_status`` is ``'manual'``.
        """
        ...
class AlbumRepository(Protocol):
    """Persistence port for albums, including their stored cover path."""

    async def get_or_create(
        self, *, title: str, artist_id: uuid.UUID, year: int | None, musicbrainz_id: str | None
    ) -> Album:
        ...

    async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None:
        ...

    async def get_by_id(self, album_id: uuid.UUID) -> Album | None:
        ...

    async def get_many(self, ids: list[uuid.UUID]) -> list[Album]:
        ...

    async def count(self, *, artist_id: uuid.UUID | None, q: str | None) -> int:
        ...

    async def track_count(self, album_id: uuid.UUID) -> int:
        ...

    async def track_count_many(
        self,
        album_ids: list[uuid.UUID],
    ) -> dict[uuid.UUID, int]:
        ...

    # Keep ``list`` after every method whose signature uses ``list[...]``: the
    # method name shadows the builtin for anything declared below it.
    async def list(
        self,
        *,
        artist_id: uuid.UUID | None,
        q: str | None,
        limit: int,
        offset: int,
        sort_by: str = "title",
        order: str = "asc",
    ) -> list[Album]:
        ...
class PlaylistRepository(Protocol):
    """Persistence port for playlists and the tracks placed in them."""

    async def get_by_id(self, playlist_id: uuid.UUID) -> Playlist | None:
        ...

    async def count(self, *, owner_id: uuid.UUID) -> int:
        ...

    async def add(
        self,
        *,
        name: str,
        description: str | None,
        owner_id: uuid.UUID,
    ) -> Playlist:
        ...

    async def update(
        self,
        playlist_id: uuid.UUID,
        *,
        name: str | None,
        description: str | None,
    ) -> Playlist:
        ...

    async def delete(self, playlist_id: uuid.UUID) -> None:
        ...

    async def track_count(self, playlist_id: uuid.UUID) -> int:
        ...

    async def track_count_many(
        self,
        playlist_ids: list[uuid.UUID],
    ) -> dict[uuid.UUID, int]:
        ...

    async def get_tracks(self, playlist_id: uuid.UUID, *, limit: int, offset: int) -> list[Track]:
        ...

    async def get_track_total(self, playlist_id: uuid.UUID) -> int:
        ...

    async def add_track(
        self,
        playlist_id: uuid.UUID,
        track_id: uuid.UUID,
        *,
        position: float,
    ) -> None:
        ...

    async def remove_track(self, playlist_id: uuid.UUID, track_id: uuid.UUID) -> None:
        ...

    async def max_position(self, playlist_id: uuid.UUID) -> float:
        ...

    # Keep ``list`` after every method whose signature uses ``list[...]``: the
    # method name shadows the builtin for anything declared below it.
    async def list(
        self,
        *,
        owner_id: uuid.UUID,
        limit: int,
        offset: int,
    ) -> list[Playlist]:
        ...
class LikeRepository(Protocol):
    """Persistence port for per-user like records on tracks."""

    async def add(
        self,
        *,
        user_id: uuid.UUID,
        track_id: uuid.UUID,
        value: str,
    ) -> Like:
        ...

    async def get_latest_state(self, *, user_id: uuid.UUID, track_ids: list[uuid.UUID]) -> list[Like]:
        ...

    async def list_liked_tracks(self, *, user_id: uuid.UUID, limit: int, offset: int) -> list[Track]:
        ...

    async def count_liked_tracks(self, *, user_id: uuid.UUID) -> int:
        ...
class HistoryRepository(Protocol):
    """Persistence port for per-user play-history entries."""

    async def add(
        self,
        *,
        user_id: uuid.UUID,
        track_id: uuid.UUID,
        played_at: dt.datetime,
        play_duration_seconds: int | None,
        completed: bool,
    ) -> PlayHistoryEntry:
        ...

    # NOTE: from here on the name ``list`` refers to this method inside the
    # class body, so no method declared below may use ``list[...]`` in its
    # annotations.
    async def list(
        self,
        *,
        user_id: uuid.UUID,
        limit: int,
        offset: int,
    ) -> list[PlayHistoryEntry]:
        ...

    async def count(self, *, user_id: uuid.UUID) -> int:
        ...
class SourceBackend(Protocol):
    """One registered source of tracks (mounted folder, YouTube, …).

    ``name`` is the stable identifier used in URLs and stored on
    ``track.source``.
    """

    # Stable identifier of this source.
    name: str

    def info(self) -> SourceInfo:
        ...

    def is_available(self) -> bool:
        ...
class IndexableSource(SourceBackend, Protocol):
    """A source able to enumerate files that are already on disk (e.g. the
    local folder)."""

    def scan(self) -> Iterator[SourceFile]:
        ...
# -- metadata enrichment (plan §6.2) -----------------------------------------
class AudioTagReader(Protocol):
    """Reader for the tags embedded in a local audio file.

    ``read`` returns ``None`` only when the file cannot be parsed at all; it
    never raises (graceful degradation).
    """

    async def read(self, path: Path) -> AudioTags | None:
        ...
class AudioFingerprinter(Protocol):
    """Wrapper around Chromaprint (fpcalc).

    ``is_available`` reports whether the binary is present; ``calculate``
    returns ``None`` on any failure.
    """

    def is_available(self) -> bool:
        ...

    async def calculate(self, path: Path) -> Fingerprint | None:
        ...
class AcoustIdClient(Protocol):
    """Client for AcoustID lookups.

    ``is_available`` is ``False`` when no API key is configured, in which case
    the whole fingerprint path is skipped. ``lookup`` returns the best match,
    or ``None`` (no result / service down), and never raises.
    """

    def is_available(self) -> bool:
        ...

    async def lookup(self, fingerprint: Fingerprint) -> RecordingMatch | None:
        ...
class CoverArtExtractor(Protocol):
    """Extractor for cover art embedded in a local audio file (offline, no
    network).

    ``extract`` returns ``None`` when the file has no picture or cannot be
    parsed; it never raises.
    """

    async def extract(self, path: Path) -> CoverArt | None:
        ...
class CoverArtProvider(Protocol):
    """Fetches cover art from an external service (Cover Art Archive) by
    MusicBrainz release-group id.

    ``is_available`` may gate the provider off. ``fetch_release_group``
    returns ``None`` (not found / service down) and never raises.
    """

    def is_available(self) -> bool:
        ...

    async def fetch_release_group(self, release_group_mbid: str) -> CoverArt | None:
        ...