Files
Senko-san 58b98ab5ed
Docker Build & Publish / build (push) Successful in 1m10s
Docker Build & Publish / push (push) Failing after 7s
Docker Build & Publish / Prune old image versions (push) Has been skipped
feat(library): lazy materialization foundation for remote tracks (§Phase1)
Adds nullable storage fields + availability column on tracks, remote
source/source_id identity on albums/artists, TrackRepository.materialize()
and get_or_create_remote() repos — groundwork for on-demand YTM library
(placeholders saved without audio, materialized in-place on first play).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 17:51:43 +03:00

448 lines
16 KiB
Python

"""Ports — the contracts the application layer depends on.
These are Protocols, not implementations. Concrete adapters live in
``app.infrastructure`` (repositories) and ``app.core.security`` (crypto) and
are bound to these ports at the composition root (``app.api.deps``).
"""
import datetime as dt
import uuid
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator
from contextlib import AbstractAsyncContextManager
from pathlib import Path
from typing import Protocol
from app.domain.entities import (
Album,
AudioTags,
CoverArt,
Credentials,
DiskUsage,
DownloadJob,
Fingerprint,
LibraryStats,
Like,
ObjectStat,
PlayHistoryEntry,
Playlist,
RecordingMatch,
SubsonicCredentials,
User,
)
from app.domain.entities.track import Artist, Track
from app.domain.sources import DownloadResult, RawMetadata, SearchResult, SourceFile, SourceInfo
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
# Progress hook for fetch sources: an awaitable callback that receives the
# download progress as a fraction in [0.0, 1.0]. It is deliberately a plain
# callable rather than a port, because it is an inversion of control supplied
# per call by the worker, which persists the value to the download job.
ProgressCallback = Callable[[float], Awaitable[None]]
class UserRepository(Protocol):
    """Port for user persistence: lookup, creation, listing, flag and password
    updates, plus storage of the encrypted Subsonic app-password."""

    async def get_by_id(self, user_id: uuid.UUID) -> User | None:
        ...

    async def get_credentials_by_username(self, username: str) -> Credentials | None:
        ...

    async def add(
        self,
        *,
        username: str,
        password_hash: str,
        is_superuser: bool,
    ) -> User:
        ...

    # NOTE: once this method is defined, ``list`` inside the class body names
    # the method, so (with eagerly evaluated annotations) nothing below may
    # subscript the builtin as ``list[...]``.
    async def list(self, *, limit: int, offset: int) -> list[User]:
        ...

    async def set_password_hash(self, user_id: uuid.UUID, password_hash: str) -> None:
        ...

    async def set_superuser(self, user_id: uuid.UUID, is_superuser: bool) -> User:
        ...

    async def set_active(self, user_id: uuid.UUID, is_active: bool) -> User:
        ...

    async def count(self) -> int:
        ...

    # -- Subsonic app-password (recoverable, encrypted at rest) --------------

    async def get_subsonic_credentials_by_username(
        self,
        username: str,
    ) -> SubsonicCredentials | None:
        ...

    async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None:
        ...

    async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None:
        ...
class SubsonicCipher(Protocol):
    """Port for the symmetric cipher behind the recoverable Subsonic
    app-password; ``encrypt`` and ``decrypt`` are its two directions."""

    def encrypt(self, plaintext: str) -> str:
        ...

    def decrypt(self, token: str) -> str:
        ...
class RefreshTokenRepository(Protocol):
    """Port for refresh-token rows: record, validate and revoke them."""

    async def add(
        self, *, jti: uuid.UUID, user_id: uuid.UUID, token_hash: str, expires_at: dt.datetime
    ) -> None:
        ...

    async def is_valid(self, jti: uuid.UUID) -> bool:
        """Return ``True`` only when a row for ``jti`` exists and is neither
        revoked nor expired."""

    async def revoke(self, jti: uuid.UUID) -> None:
        ...

    async def revoke_all_for_user(self, user_id: uuid.UUID) -> None:
        ...
class PasswordHasher(Protocol):
def hash(self, password: str) -> str: ...
def verify_and_update(self, password: str, password_hash: str) -> tuple[bool, str | None]:
"""Verify ``password`` against ``password_hash``. Returns
``(is_valid, updated_hash)`` where ``updated_hash`` is a fresh hash to
persist when the stored one uses outdated parameters, else ``None``."""
...
class TokenService(Protocol):
    """Port for issuing tokens and decoding them back into claims."""

    def issue(self, *, subject: uuid.UUID, token_type: TokenType) -> IssuedToken:
        ...

    def decode(self, encoded: str) -> TokenClaims:
        """Verify the signature and expiry of ``encoded``, then return its claims.

        Raises :class:`~app.domain.errors.AuthenticationError` on any failure.
        """
class FileStorage(Protocol):
    """Port for the key-addressed file store."""

    async def save_file(self, key: str, src_path: Path) -> int:
        ...

    async def open_range(
        self,
        key: str,
        start: int,
        end: int | None,
    ) -> tuple[AsyncIterator[bytes], int]:
        ...

    async def stat(self, key: str) -> ObjectStat:
        ...

    async def exists(self, key: str) -> bool:
        ...

    async def delete(self, key: str) -> None:
        ...

    def as_local_path(self, key: str) -> AbstractAsyncContextManager[Path]:
        ...

    async def disk_usage(self) -> DiskUsage | None:
        """Report the capacity of the volume backing the store.

        Returns ``None`` when the backend has no addressable disk (for
        example, an object store).
        """
class ArtistRepository(Protocol):
    """Port for artist persistence and per-artist counts."""

    async def get_or_create(self, name: str) -> Artist:
        ...

    async def get_or_create_remote(self, *, name: str, source: str, source_id: str) -> Artist:
        """Look up, or create, the artist tied to the remote identity
        ``(source, source_id)`` (lazy materialization save-to-library)."""

    async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None:
        ...

    async def get_many(self, ids: list[uuid.UUID]) -> list[Artist]:
        ...

    # ``list`` is declared after ``get_many``: once the method exists it
    # shadows the builtin for any later ``list[...]`` annotation in this body.
    async def list(self, *, q: str | None, limit: int, offset: int) -> list[Artist]:
        ...

    async def count(self, *, q: str | None) -> int:
        ...

    async def album_count(self, artist_id: uuid.UUID) -> int:
        ...

    async def track_count(self, artist_id: uuid.UUID) -> int:
        ...
class TrackRepository(Protocol):
    """Port for track persistence: creation, lazy materialization, querying,
    manual edits and recording of auto-enrichment outcomes."""

    async def get_by_id(self, track_id: uuid.UUID) -> Track | None:
        ...

    async def get_by_source(self, source: str, source_id: str) -> Track | None:
        ...

    async def add(
        self,
        *,
        id: uuid.UUID,
        title: str,
        artist_id: uuid.UUID,
        storage_uri: str | None,
        file_format: str | None,
        file_size: int | None,
        source: str,
        source_id: str,
        metadata_status: str,
        added_by: uuid.UUID | None,
        availability: str = ...,
    ) -> Track:
        ...

    async def materialize(
        self,
        track_id: uuid.UUID,
        *,
        storage_uri: str,
        file_format: str,
        file_size: int,
        bitrate: int | None,
    ) -> Track:
        """Populate the audio fields of a remote placeholder once its download
        has finished (lazy materialization) and switch ``availability`` to
        ``local``."""

    async def delete(self, track_id: uuid.UUID) -> None:
        ...

    # Ordering matters: ``genres`` annotates with ``list[...]`` and therefore
    # has to be declared before the method named ``list``, which shadows the
    # builtin for every later annotation in this class body. AlbumRepository
    # follows the same rule.
    async def genres(self) -> list[tuple[str, int]]:
        ...

    async def library_stats(self) -> LibraryStats:
        ...

    async def list(
        self,
        *,
        artist_id: uuid.UUID | None,
        album_id: uuid.UUID | None,
        q: str | None,
        source: str | None = None,
        sort_by: str,
        order: str,
        limit: int,
        offset: int,
    ) -> list[Track]:
        ...

    async def count(
        self,
        *,
        artist_id: uuid.UUID | None,
        album_id: uuid.UUID | None,
        q: str | None,
        source: str | None = None,
    ) -> int:
        ...

    async def update(
        self, track_id: uuid.UUID, *, title: str | None, genre: str | None, year: int | None
    ) -> Track:
        ...

    async def apply_enrichment(
        self,
        track_id: uuid.UUID,
        *,
        title: str,
        artist_id: uuid.UUID,
        album_id: uuid.UUID | None,
        genre: str | None,
        year: int | None,
        track_number: int | None,
        duration_seconds: int | None,
        bitrate: int | None,
        acoustid_fingerprint: str | None,
        musicbrainz_id: str | None,
        metadata_status: str,
        metadata_error: str | None = None,
    ) -> Track:
        """Store the results of an auto-enrichment run.

        Nullable fields are only filled when a non-``None`` value is passed, so
        re-enriching never erases earlier data. ``title``, ``artist_id`` and
        ``metadata_status`` are written every time, and the outcome of the run
        (``metadata_error`` plus completion time) is always stamped.

        Callers must not invoke this for tracks whose ``metadata_status`` is
        ``'manual'``.
        """

    async def mark_enrichment_failed(self, track_id: uuid.UUID, *, error: str) -> None:
        """Record an unexpected crash of an enrichment run by setting ``failed``
        together with the error reason. Does nothing for ``manual`` or missing
        tracks."""
class AlbumRepository(Protocol):
    """Port for album persistence, cover paths and per-album track counts."""

    async def get_or_create(
        self, *, title: str, artist_id: uuid.UUID, year: int | None, musicbrainz_id: str | None
    ) -> Album:
        ...

    async def get_or_create_remote(
        self,
        *,
        title: str,
        artist_id: uuid.UUID,
        year: int | None,
        musicbrainz_id: str | None,
        source: str,
        source_id: str,
    ) -> Album:
        """Look up, or create, the album tied to the remote identity
        ``(source, source_id)`` (lazy materialization save-to-library)."""

    async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None:
        ...

    async def get_by_id(self, album_id: uuid.UUID) -> Album | None:
        ...

    async def get_many(self, ids: list[uuid.UUID]) -> list[Album]:
        ...

    async def count(self, *, artist_id: uuid.UUID | None, q: str | None) -> int:
        ...

    async def track_count(self, album_id: uuid.UUID) -> int:
        ...

    async def track_count_many(self, album_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]:
        ...

    # Keep ``list`` last: the method name shadows the builtin, so every
    # signature that annotates with ``list[...]`` has to be declared above it.
    async def list(
        self,
        *,
        artist_id: uuid.UUID | None,
        q: str | None,
        limit: int,
        offset: int,
        sort_by: str = "title",
        order: str = "asc",
    ) -> list[Album]:
        ...
class PlaylistRepository(Protocol):
    """Port for playlist persistence and playlist/track membership."""

    async def get_by_id(self, playlist_id: uuid.UUID) -> Playlist | None:
        ...

    async def count(self, *, owner_id: uuid.UUID) -> int:
        ...

    async def add(
        self,
        *,
        name: str,
        description: str | None,
        owner_id: uuid.UUID,
    ) -> Playlist:
        ...

    async def update(
        self,
        playlist_id: uuid.UUID,
        *,
        name: str | None,
        description: str | None,
    ) -> Playlist:
        ...

    async def delete(self, playlist_id: uuid.UUID) -> None:
        ...

    async def track_count(self, playlist_id: uuid.UUID) -> int:
        ...

    async def track_count_many(self, playlist_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]:
        ...

    async def get_tracks(self, playlist_id: uuid.UUID, *, limit: int, offset: int) -> list[Track]:
        ...

    async def get_track_total(self, playlist_id: uuid.UUID) -> int:
        ...

    async def add_track(
        self,
        playlist_id: uuid.UUID,
        track_id: uuid.UUID,
        *,
        position: float,
    ) -> None:
        ...

    async def remove_track(self, playlist_id: uuid.UUID, track_id: uuid.UUID) -> None:
        ...

    async def max_position(self, playlist_id: uuid.UUID) -> float:
        ...

    # Keep ``list`` last: the method name shadows the builtin, so every
    # signature that annotates with ``list[...]`` has to be declared above it.
    async def list(self, *, owner_id: uuid.UUID, limit: int, offset: int) -> list[Playlist]:
        ...
class LikeRepository(Protocol):
    """Port for per-user like records on tracks."""

    async def add(self, *, user_id: uuid.UUID, track_id: uuid.UUID, value: str) -> Like:
        ...

    async def get_latest_state(self, *, user_id: uuid.UUID, track_ids: list[uuid.UUID]) -> list[Like]:
        ...

    async def list_liked_tracks(self, *, user_id: uuid.UUID, limit: int, offset: int) -> list[Track]:
        ...

    async def count_liked_tracks(self, *, user_id: uuid.UUID) -> int:
        ...
class HistoryRepository(Protocol):
    """Port for per-user play-history entries."""

    async def add(
        self,
        *,
        user_id: uuid.UUID,
        track_id: uuid.UUID,
        played_at: dt.datetime,
        play_duration_seconds: int | None,
        completed: bool,
    ) -> PlayHistoryEntry:
        ...

    # ``list`` shadows the builtin for later annotations in this class body;
    # nothing declared after it may annotate with ``list[...]``.
    async def list(self, *, user_id: uuid.UUID, limit: int, offset: int) -> list[PlayHistoryEntry]:
        ...

    async def count(self, *, user_id: uuid.UUID) -> int:
        ...
class DownloadJobRepository(Protocol):
    """Port for download-job persistence (plan §6.1).

    It backs the §A5 download manager and the retry/backoff loop of the worker.
    """

    async def add(
        self, *, source: str, source_id: str | None, query: str | None, requested_by: uuid.UUID | None
    ) -> DownloadJob:
        ...

    async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
        ...

    async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
        """Return an unfinished (queued/downloading/enriching) job for the same
        item, if there is one.

        Used to deduplicate before enqueuing, so that a double-click cannot
        queue the item twice.
        """

    # ``list`` shadows the builtin for later annotations in this class body;
    # nothing declared after it may annotate with ``list[...]``.
    async def list(
        self,
        *,
        requested_by: uuid.UUID | None,
        status: str | None,
        limit: int,
        offset: int,
    ) -> list[DownloadJob]:
        ...

    async def count(self, *, requested_by: uuid.UUID | None, status: str | None) -> int:
        ...

    async def set_status(
        self,
        job_id: uuid.UUID,
        *,
        status: str,
        error_message: str | None = None,
        track_id: uuid.UUID | None = None,
    ) -> None:
        ...

    async def set_progress(self, job_id: uuid.UUID, progress: float) -> None:
        ...

    async def increment_retry(self, job_id: uuid.UUID) -> int:
        """Increase ``retry_count`` by one and return the updated value."""

    async def delete(self, job_id: uuid.UUID) -> None:
        ...

    async def failure_rate(self, source: str, *, since: dt.datetime) -> float:
        """Return the fraction of ``source`` jobs created since ``since`` that
        ended ``failed``; ``0.0`` when there are no such jobs.

        This value drives the §A5 "source unhealthy" banner.
        """
class SourceBackend(Protocol):
    """Base port for a registered source of tracks (mounted folder, YouTube, …)."""

    # Stable identifier of the source; it is used in URLs and stored on
    # ``track.source``.
    name: str

    def info(self) -> SourceInfo:
        ...

    def is_available(self) -> bool:
        ...
class IndexableSource(SourceBackend, Protocol):
    """Source port for backends that enumerate files which are already on
    disk (the local folder, for instance)."""

    def scan(self) -> Iterator[SourceFile]:
        ...
class SearchableSource(SourceBackend, Protocol):
    """Source port for backends that support free-text search (YouTube Music,
    for instance)."""

    async def search(self, query: str, *, limit: int) -> list[SearchResult]:
        """Search the source for ``query``.

        Returns ``[]`` and never raises when nothing is found or the service
        is down, so the discover screen degrades to "nothing found" instead of
        erroring.
        """
class FetchableSource(SourceBackend, Protocol):
    """Source port for backends that can download a previously discovered item
    to local disk."""

    async def fetch(
        self,
        source_id: str,
        *,
        on_progress: ProgressCallback | None = None,
    ) -> DownloadResult:
        """Turn a ``source_id`` (taken from a :class:`SearchResult`) into a
        file, reporting progress through ``on_progress``.

        Runs only in a worker because of the heavy I/O, and raises on failure
        so that the download task can retry with backoff.
        """

    async def get_metadata(self, source_id: str) -> RawMetadata | None:
        ...
# -- metadata enrichment (plan §6.2) -----------------------------------------
class AudioTagReader(Protocol):
    """Port for reading the tags embedded in a local audio file."""

    async def read(self, path: Path) -> AudioTags | None:
        """Read the embedded tags of the file at ``path``.

        ``None`` is returned only when the file cannot be parsed at all; the
        call never raises (graceful degradation).
        """
class AudioFingerprinter(Protocol):
    """Port wrapping Chromaprint (fpcalc)."""

    def is_available(self) -> bool:
        """Tell whether the binary is present."""

    async def calculate(self, path: Path) -> Fingerprint | None:
        """Fingerprint the file at ``path``; ``None`` on any failure."""
class AcoustIdClient(Protocol):
    """Port for AcoustID lookups."""

    def is_available(self) -> bool:
        """``False`` when no API key is configured, in which case the whole
        fingerprint path is skipped."""

    async def lookup(self, fingerprint: Fingerprint) -> RecordingMatch | None:
        """Return the best match, or ``None`` when there is no result or the
        service is down. Never raises."""

    async def lookup_all(self, fingerprint: Fingerprint) -> list[RecordingMatch]:
        """Return the same candidates as ``lookup``, ranked by confidence.

        Yields ``[]`` on no result, unavailability or error. Intended for the
        match picker of the metadata editor.
        """
class CoverArtExtractor(Protocol):
    """Port for extracting embedded cover art from a local audio file
    (offline, no network)."""

    async def extract(self, path: Path) -> CoverArt | None:
        """Return the embedded picture of the file at ``path``.

        ``None`` means the file has no picture or cannot be parsed; the call
        never raises.
        """
class CoverArtProvider(Protocol):
    """Port for fetching cover art from an external service (Cover Art
    Archive), keyed by MusicBrainz release-group id."""

    def is_available(self) -> bool:
        """May gate the provider off."""

    async def fetch_release_group(self, release_group_mbid: str) -> CoverArt | None:
        """Fetch the cover for ``release_group_mbid``.

        Returns ``None`` when it is not found or the service is down; never
        raises.
        """