Compare commits

...

2 Commits

Author SHA1 Message Date
Senko-san e45e578f54 feat(library): remote browse status + save/materialize API (§Phase2-3)
Docker Build & Publish / build (push) Successful in 1m11s
Docker Build & Publish / push (push) Failing after 6s
Docker Build & Publish / Prune old image versions (push) Has been skipped
Search results now report whether a hit is already saved (in_library,
track_id, availability). New RemoteLibraryService backs POST
/tracks/remote (idempotent placeholder save) and POST
/tracks/{id}/materialize (on-demand fetch via a new materialize_track
arq task, reusing in-flight jobs).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 18:11:01 +03:00
Senko-san 58b98ab5ed feat(library): lazy materialization foundation for remote tracks (§Phase1)
Docker Build & Publish / build (push) Successful in 1m10s
Docker Build & Publish / push (push) Failing after 7s
Docker Build & Publish / Prune old image versions (push) Has been skipped
Adds nullable storage fields and an availability column on tracks, a remote
source/source_id identity on albums/artists, and the
TrackRepository.materialize() and get_or_create_remote() repository methods —
groundwork for an on-demand YTM library (placeholders are saved without audio
and materialized in place on first play).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 17:51:43 +03:00
34 changed files with 1215 additions and 42 deletions
@@ -0,0 +1,65 @@
"""remote placeholders: track availability, album/artist remote ids
Revision ID: dc126696f5a6
Revises: 20260614_dl_track_id
Create Date: 2026-06-14 11:25:30.643588
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = 'dc126696f5a6'
down_revision: str | None = '20260614_dl_track_id'
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
    """Add remote-placeholder support to the schema.

    * ``albums`` / ``artists`` gain a nullable ``(source, source_id)`` pair
      with a unique constraint over both columns.
    * ``tracks`` gains a NOT NULL ``availability`` column.
    * ``tracks.storage_uri`` / ``file_format`` / ``file_size`` become nullable.
    """
    # Remote identity columns are identical on both tables, so add them in
    # one pass (albums first, then artists).
    for table in ('albums', 'artists'):
        op.add_column(table, sa.Column('source', sa.String(length=32), nullable=True))
        op.add_column(table, sa.Column('source_id', sa.String(length=512), nullable=True))
        op.create_unique_constraint(
            f'uq_{table}_source_source_id', table, ['source', 'source_id']
        )

    # The server default exists only so the NOT NULL column can be added with
    # a value for every row; it is removed again immediately afterwards.
    availability = sa.Column(
        'availability', sa.String(length=16), nullable=False, server_default='local'
    )
    op.add_column('tracks', availability)
    op.alter_column('tracks', 'availability', server_default=None)

    # Storage fields are relaxed to nullable.
    relaxed_columns = (
        ('storage_uri', sa.VARCHAR(length=2048)),
        ('file_format', sa.VARCHAR(length=32)),
        ('file_size', sa.INTEGER()),
    )
    for column_name, column_type in relaxed_columns:
        op.alter_column('tracks', column_name, existing_type=column_type, nullable=True)
def downgrade() -> None:
    """Revert the remote-placeholder schema changes made by ``upgrade``.

    Rows with a NULL ``storage_uri`` / ``file_format`` / ``file_size`` cannot
    exist once those columns are NOT NULL again, so they are deleted before
    the constraints are restored. Without this step the ``ALTER COLUMN ...
    SET NOT NULL`` statements fail as soon as one such row exists.
    """
    # NOTE(review): this is destructive for rows lacking storage fields. If
    # other tables reference ``tracks`` without ON DELETE CASCADE, clear those
    # references first — confirm against the full schema.
    op.execute(
        sa.text(
            "DELETE FROM tracks "
            "WHERE storage_uri IS NULL OR file_format IS NULL OR file_size IS NULL"
        )
    )

    # Restore NOT NULL on the storage fields (reverse order of the upgrade).
    op.alter_column('tracks', 'file_size',
                    existing_type=sa.INTEGER(),
                    nullable=False)
    op.alter_column('tracks', 'file_format',
                    existing_type=sa.VARCHAR(length=32),
                    nullable=False)
    op.alter_column('tracks', 'storage_uri',
                    existing_type=sa.VARCHAR(length=2048),
                    nullable=False)
    op.drop_column('tracks', 'availability')

    # Drop each unique constraint before the columns it covers.
    op.drop_constraint('uq_artists_source_source_id', 'artists', type_='unique')
    op.drop_column('artists', 'source_id')
    op.drop_column('artists', 'source')
    op.drop_constraint('uq_albums_source_source_id', 'albums', type_='unique')
    op.drop_column('albums', 'source_id')
    op.drop_column('albums', 'source')
+12 -1
View File
@@ -17,6 +17,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.application.auth_service import AuthService
from app.application.download_service import DownloadService
from app.application.metadata_service import MetadataEnrichmentService
from app.application.remote_library_service import RemoteLibraryService
from app.application.streaming_service import StreamingService
from app.application.subsonic_auth_service import SubsonicAuthService
from app.application.upload_service import UploadService
@@ -43,7 +44,7 @@ from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
from app.infrastructure.metadata.tags import MutagenTagReader
from app.infrastructure.sources.registry import SourceRegistry, build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_download, enqueue_enrich
from app.workers.queue import enqueue_download, enqueue_enrich, enqueue_materialize
async def get_session() -> AsyncIterator[AsyncSession]:
@@ -172,10 +173,20 @@ def get_download_service(session: SessionDep, storage: FileStorageDep) -> Downlo
)
def get_remote_library_service(session: SessionDep) -> RemoteLibraryService:
    """Build a ``RemoteLibraryService`` whose repositories all share the
    request's database session, wired to the ``enqueue_materialize`` queue
    function."""
    track_repo = SqlAlchemyTrackRepository(session)
    artist_repo = SqlAlchemyArtistRepository(session)
    job_repo = SqlAlchemyDownloadJobRepository(session)
    return RemoteLibraryService(
        tracks=track_repo,
        artists=artist_repo,
        jobs=job_repo,
        enqueue_materialize=enqueue_materialize,
    )
UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)]
StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)]
MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)]
DownloadServiceDep = Annotated[DownloadService, Depends(get_download_service)]
RemoteLibraryServiceDep = Annotated[RemoteLibraryService, Depends(get_remote_library_service)]
# -- library repository deps ---------------------------------------------------
+1 -1
View File
@@ -65,7 +65,7 @@ async def download(
if track is None:
raise NotFoundError("Song not found.")
result = await service.open_stream(track_id, None)
filename = f"{track.title}.{track.file_format}"
filename = f"{track.title}.{track.file_format or 'bin'}"
headers = {
"Content-Length": str(result.content_length),
"Content-Disposition": f'attachment; filename="{filename}"',
+2 -2
View File
@@ -80,8 +80,8 @@ def song_dict(
"albumId": encode_album(track.album_id) if track.album_id is not None else None,
"artistId": encode_artist(track.artist_id),
"coverArt": cover,
"size": track.file_size,
"contentType": content_type_for(track.file_format),
"size": track.file_size or 0,
"contentType": content_type_for(track.file_format or ""),
"suffix": track.file_format,
"duration": track.duration_seconds,
"year": track.year,
+14 -1
View File
@@ -1,7 +1,10 @@
"""Schemas for searching external (fetch) sources — the §A4 discover screen."""
import uuid
from pydantic import BaseModel
from app.domain.entities.track import Track
from app.domain.sources import SearchResult
@@ -13,9 +16,16 @@ class ExternalSearchResultOut(BaseModel):
album: str | None
duration_seconds: int | None
thumbnail_url: str | None
# Remote browse (plan: Model C) — set when this hit is already saved in the
# library, so the UI can show "Play"/"Saved" instead of "Save to library".
in_library: bool
track_id: uuid.UUID | None
availability: str | None
@classmethod
def from_entity(cls, r: SearchResult) -> ExternalSearchResultOut:
def from_entity(
cls, r: SearchResult, *, existing: Track | None = None
) -> ExternalSearchResultOut:
return cls(
source=r.source,
source_id=r.source_id,
@@ -24,6 +34,9 @@ class ExternalSearchResultOut(BaseModel):
album=r.album,
duration_seconds=r.duration_seconds,
thumbnail_url=r.thumbnail_url,
in_library=existing is not None,
track_id=existing.id if existing is not None else None,
availability=existing.availability if existing is not None else None,
)
+27 -3
View File
@@ -3,7 +3,9 @@
import datetime as dt
import uuid
from pydantic import BaseModel
from pydantic import BaseModel, Field
from app.api.schemas.download import DownloadJobOut
class TrackOut(BaseModel):
@@ -14,14 +16,15 @@ class TrackOut(BaseModel):
album_id: uuid.UUID | None
album_title: str | None
duration_seconds: int | None
file_format: str
file_size: int
file_format: str | None
file_size: int | None
genre: str | None
year: int | None
track_number: int | None
metadata_status: str
metadata_error: str | None
enriched_at: dt.datetime | None
availability: str
source: str
has_cover: bool
created_at: dt.datetime
@@ -61,3 +64,24 @@ class MetadataApply(BaseModel):
year: int | None = None
genre: str | None = None
track_number: int | None = None
class RemoteTrackSave(BaseModel):
    """Request body for saving a remote browse hit (§A4 discover) as a
    library placeholder.

    The saved track has ``availability="remote"`` and carries no audio until
    it is first played (plan: Model C)."""

    # Name of the source backend the hit came from.
    source: str
    # Identifier of the hit within ``source``; must be non-empty.
    source_id: str = Field(min_length=1)
    title: str
    # Optional artist name for the hit.
    artist: str | None = None
class MaterializeResponse(BaseModel):
    """Response to a request that a placeholder track's audio be fetched.

    When the track is already ``local`` there is nothing to wait for, so
    ``job`` is ``None`` and the caller can stream right away. In every other
    case ``job`` is the new or already in-flight job; poll
    ``GET /downloads/{job.id}`` until it reports ``done``."""

    track: TrackOut
    job: DownloadJobOut | None
+7 -2
View File
@@ -18,6 +18,7 @@ router = APIRouter(prefix="/search", tags=["search"])
async def search(
_: CurrentUser,
registry: SourceRegistryDep,
track_repo: TrackRepoDep,
q: str = Query(min_length=1),
limit: int = Query(20, ge=1, le=50),
) -> ExternalSearchResponse:
@@ -25,7 +26,9 @@ async def search(
A source that is down contributes nothing rather than failing the whole
request (graceful degradation); only available sources are reported as
searched."""
searched. Each hit is checked against the library by ``(source,
source_id)`` so the UI can show "Saved"/"Play" instead of "Save to
library" without a separate round-trip (remote browse, plan: Model C)."""
results: list[ExternalSearchResultOut] = []
searched: list[str] = []
for backend in registry.searchables():
@@ -33,7 +36,9 @@ async def search(
continue
searched.append(backend.name)
hits = await backend.search(q, limit=limit)
results.extend(ExternalSearchResultOut.from_entity(h) for h in hits)
for h in hits:
existing = await track_repo.get_by_source(h.source, h.source_id)
results.append(ExternalSearchResultOut.from_entity(h, existing=existing))
return ExternalSearchResponse(results=results, searched_sources=searched)
+7 -5
View File
@@ -6,7 +6,7 @@ is an admin action and runs in a worker — the endpoint only enqueues it.
from fastapi import APIRouter, Query
from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser
from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser, TrackRepoDep
from app.api.schemas.external_search import ExternalSearchResponse, ExternalSearchResultOut
from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut
from app.domain.errors import DependencyUnavailableError
@@ -42,6 +42,7 @@ async def search_source(
source: str,
_: CurrentUser,
registry: SourceRegistryDep,
track_repo: TrackRepoDep,
q: str = Query(min_length=1),
limit: int = Query(20, ge=1, le=50),
) -> ExternalSearchResponse:
@@ -49,7 +50,8 @@ async def search_source(
if not backend.is_available():
raise DependencyUnavailableError(f"Source {source!r} is not available.")
results = await backend.search(q, limit=limit)
return ExternalSearchResponse(
results=[ExternalSearchResultOut.from_entity(r) for r in results],
searched_sources=[source],
)
out: list[ExternalSearchResultOut] = []
for r in results:
existing = await track_repo.get_by_source(r.source, r.source_id)
out.append(ExternalSearchResultOut.from_entity(r, existing=existing))
return ExternalSearchResponse(results=out, searched_sources=[source])
+58 -1
View File
@@ -13,14 +13,18 @@ from app.api.deps import (
CurrentUser,
FileStorageDep,
MetadataServiceDep,
RemoteLibraryServiceDep,
StreamUser,
TrackRepoDep,
)
from app.api.schemas.download import DownloadJobOut
from app.api.schemas.pagination import PagedResponse
from app.api.schemas.track import (
MaterializeResponse,
MetadataApply,
MetadataMatch,
MetadataMatchesOut,
RemoteTrackSave,
TrackOut,
TrackUpdate,
)
@@ -54,6 +58,7 @@ async def _build_track_out(
metadata_status=t.metadata_status,
metadata_error=t.metadata_error,
enriched_at=t.enriched_at,
availability=t.availability,
source=t.source,
has_cover=bool(t.album_id and albums.get(t.album_id) and albums[t.album_id].cover_path),
created_at=t.created_at,
@@ -98,6 +103,57 @@ async def list_tracks(
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.post("/remote", status_code=201)
async def save_remote_track(
    body: RemoteTrackSave,
    service: RemoteLibraryServiceDep,
    artist_repo: ArtistRepoDep,
    album_repo: AlbumRepoDep,
    user: CurrentUser,
) -> TrackOut:
    """Store a remote browse hit (§A4 discover) as a library placeholder.

    No audio is fetched at this point (plan: Model C). The operation is
    idempotent on ``(source, source_id)``: re-saving a hit that is already in
    the library returns the existing track."""
    saved = await service.save_remote(
        source=body.source,
        source_id=body.source_id,
        title=body.title,
        artist=body.artist,
        added_by=user.id,
    )

    # Load the related artist (and album, if any) needed to render TrackOut.
    artist_rows = await artist_repo.get_many([saved.artist_id])
    artists_by_id = {row.id: row for row in artist_rows}
    wanted_albums = [saved.album_id] if saved.album_id else []
    album_rows = await album_repo.get_many(wanted_albums)
    albums_by_id = {row.id: row for row in album_rows}

    rendered = await _build_track_out([saved], artists_by_id, albums_by_id)
    return rendered[0]
@router.post("/{track_id}/materialize")
async def materialize_track(
    track_id: uuid.UUID,
    service: RemoteLibraryServiceDep,
    artist_repo: ArtistRepoDep,
    album_repo: AlbumRepoDep,
    user: CurrentUser,
) -> MaterializeResponse:
    """Request on-demand download of a placeholder track's audio (plan:
    Model C lazy materialization).

    A track that is already local yields ``job=None`` because there is
    nothing to wait for. Otherwise the response carries a job: poll
    ``GET /downloads/{job.id}`` until it is ``done``, then stream as usual."""
    outcome = await service.request_materialize(track_id, requested_by=user.id)
    track = outcome.track

    # Load the related artist (and album, if any) needed to render TrackOut.
    artist_rows = await artist_repo.get_many([track.artist_id])
    artists_by_id = {row.id: row for row in artist_rows}
    wanted_albums = [track.album_id] if track.album_id else []
    album_rows = await album_repo.get_many(wanted_albums)
    albums_by_id = {row.id: row for row in album_rows}
    rendered = await _build_track_out([track], artists_by_id, albums_by_id)

    job_out = None
    if outcome.job is not None:
        job_out = DownloadJobOut.from_entity(outcome.job)
    return MaterializeResponse(track=rendered[0], job=job_out)
@router.get("/{track_id}")
async def get_track(
track_id: uuid.UUID,
@@ -155,7 +211,8 @@ async def delete_track(
if track is None:
raise NotFoundError(f"Track {track_id} not found.")
await track_repo.delete(track_id)
await storage.delete(track.storage_uri)
if track.storage_uri is not None:
await storage.delete(track.storage_uri)
return Response(status_code=204)
+9 -3
View File
@@ -79,9 +79,13 @@ class MetadataEnrichmentService:
if track.metadata_status == "manual":
log.info("enrich_skip_manual", track_id=str(track_id))
return EnrichmentResult(track_id=track_id, status="skipped")
storage_uri = track.storage_uri
if storage_uri is None:
log.info("enrich_skip_remote", track_id=str(track_id))
return EnrichmentResult(track_id=track_id, status="skipped")
tags = await self._read_local(track.storage_uri)
match = await self._identify(track.storage_uri)
tags = await self._read_local(storage_uri)
match = await self._identify(storage_uri)
# Merge order is tag-first by default — embedded tags fix the common
# well-tagged offline case. But a *high-confidence* AcoustID match is the
@@ -125,7 +129,7 @@ class MetadataEnrichmentService:
if album is not None:
await self._resolve_cover(
album,
storage_uri=track.storage_uri,
storage_uri=storage_uri,
release_group_mbid=match.release_group_mbid if match else None,
)
@@ -175,6 +179,8 @@ class MetadataEnrichmentService:
return []
if not self._acoustid.is_available() or not self._fingerprinter.is_available():
return []
if track.storage_uri is None:
return []
try:
async with self._storage.as_local_path(track.storage_uri) as path:
fingerprint = await self._fingerprinter.calculate(path)
+122
View File
@@ -0,0 +1,122 @@
"""RemoteLibraryService — save-to-library + materialize for remote browse hits
(plan: Model C, on-demand YTM library).
Two operations:
* ``save_remote`` persists a placeholder ``Track`` (``availability="remote"``,
``storage_uri=None``) for a remote browse hit. Idempotent on
``(source, source_id)`` — CLAUDE.md dedup.
* ``request_materialize`` lazily fills a placeholder's audio in place: it
creates (or reuses) a ``DownloadJob`` pointing at the existing track and
enqueues the materialize worker, which calls ``TrackRepository.materialize``
on completion. ``track.id`` never changes (CLAUDE.md), so likes/playlists/
queue entries referencing the placeholder keep working once it's filled in.
"""
import uuid
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from app.domain.entities.download import DownloadJob
from app.domain.entities.track import Track
from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import ArtistRepository, DownloadJobRepository, TrackRepository
_UNKNOWN_ARTIST = "Unknown Artist"
# (job_id) -> None — enqueue the materialize worker, same deferred pattern as
# download/enrich enqueuers.
MaterializeEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
@dataclass(frozen=True)
class MaterializeOutcome:
    """What a materialization request produced.

    ``track`` is the track the request was made for. ``job`` is ``None`` when
    that track is already ``local`` (the caller can stream immediately);
    otherwise it is the job — newly created or already in flight — that is
    filling the placeholder."""

    track: Track
    job: DownloadJob | None
class RemoteLibraryService:
    """Save remote browse hits as placeholder tracks and materialize them on
    demand.

    Collaborators are injected: track/artist/download-job repositories and an
    optional ``enqueue_materialize`` callback that hands a job id to the
    worker queue. When the callback is ``None`` jobs are still recorded but
    nothing is enqueued.
    """

    def __init__(
        self,
        *,
        tracks: TrackRepository,
        artists: ArtistRepository,
        jobs: DownloadJobRepository,
        enqueue_materialize: MaterializeEnqueuer | None = None,
    ) -> None:
        self._tracks = tracks
        self._artists = artists
        self._jobs = jobs
        self._enqueue_materialize = enqueue_materialize

    async def save_remote(
        self,
        *,
        source: str,
        source_id: str,
        title: str,
        artist: str | None,
        added_by: uuid.UUID | None,
    ) -> Track:
        """Persist a placeholder for a remote browse hit.

        Idempotent: a hit already saved under ``(source, source_id)`` is
        returned as-is, whatever ``title``/``artist`` were passed this time.

        Raises:
            ValidationError: ``source_id`` is blank, or a new placeholder
                would be created with a blank ``title``.
        """
        source_id = source_id.strip()
        if not source_id:
            raise ValidationError("A source_id is required to save.")

        # NOTE(review): lookup-then-insert is not atomic; concurrent saves of
        # the same hit presumably rely on a DB uniqueness rule — confirm.
        existing = await self._tracks.get_by_source(source, source_id)
        if existing is not None:
            return existing

        # Validate only when creating, so re-saving an existing hit stays a
        # pure no-op even with sloppy client input.
        title = title.strip()
        if not title:
            raise ValidationError("A title is required to save.")

        # A missing or whitespace-only artist falls back to the shared
        # placeholder artist rather than creating an artist named "  ".
        artist_name = (artist or "").strip() or _UNKNOWN_ARTIST
        artist_entity = await self._artists.get_or_create(artist_name)
        return await self._tracks.add(
            id=uuid.uuid4(),
            title=title,
            artist_id=artist_entity.id,
            storage_uri=None,
            file_format=None,
            file_size=None,
            source=source,
            source_id=source_id,
            metadata_status="pending",
            added_by=added_by,
            availability="remote",
        )

    async def request_materialize(
        self, track_id: uuid.UUID, *, requested_by: uuid.UUID | None
    ) -> MaterializeOutcome:
        """Kick off (or report on) materializing a placeholder track.

        Already-local tracks are a no-op (``job=None``). If a job is already
        active for the track's ``(source, source_id)`` it is reused; otherwise
        a new job is created, marked ``queued`` against this track and handed
        to the enqueue callback (when one was provided).

        Raises:
            NotFoundError: no track with ``track_id`` exists.
            ValidationError: the track has no ``source_id`` to fetch from.
        """
        track = await self._tracks.get_by_id(track_id)
        if track is None:
            raise NotFoundError(f"Track {track_id} not found.")
        if track.availability == "local":
            return MaterializeOutcome(track=track, job=None)
        # Treat an empty source_id like a missing one: there is nothing to
        # download from in either case.
        if not track.source_id:
            raise ValidationError("Track has no remote source to materialize from.")

        active = await self._jobs.get_active_for_source(track.source, track.source_id)
        if active is not None:
            return MaterializeOutcome(track=track, job=active)

        job = await self._jobs.add(
            source=track.source,
            source_id=track.source_id,
            query=None,
            requested_by=requested_by,
        )
        # Bind the job to the existing track so the worker fills it in place.
        await self._jobs.set_status(job.id, status="queued", track_id=track.id)
        if self._enqueue_materialize is not None:
            await self._enqueue_materialize(job.id)

        # Re-read so the returned job reflects the status/track_id just set.
        refreshed = await self._jobs.get_by_id(job.id)
        return MaterializeOutcome(track=track, job=refreshed if refreshed is not None else job)
+6 -3
View File
@@ -72,16 +72,19 @@ class StreamingService:
track = await self._tracks.get_by_id(track_id)
if track is None:
raise NotFoundError("Track not found.")
storage_uri = track.storage_uri
if storage_uri is None:
raise NotFoundError("Track is not yet downloaded.")
stat = await self._storage.stat(track.storage_uri)
stat = await self._storage.stat(storage_uri)
total_size = stat.size
content_type = stat.content_type or _FORMAT_CONTENT_TYPE.get(
track.file_format.lower(), "application/octet-stream"
(track.file_format or "").lower(), "application/octet-stream"
)
start, end, is_partial = _parse_range(range_header, total_size)
stream, _ = await self._storage.open_range(track.storage_uri, start, end)
stream, _ = await self._storage.open_range(storage_uri, start, end)
actual_end = end if end is not None else total_size - 1
content_length = actual_end - start + 1
+2
View File
@@ -13,5 +13,7 @@ class Album:
year: int | None
cover_path: str | None
musicbrainz_id: str | None
source: str | None
source_id: str | None
created_at: dt.datetime
updated_at: dt.datetime
+6 -3
View File
@@ -9,6 +9,8 @@ from dataclasses import dataclass
class Artist:
id: uuid.UUID
name: str
source: str | None
source_id: str | None
created_at: dt.datetime
updated_at: dt.datetime
@@ -19,9 +21,9 @@ class Track:
title: str
artist_id: uuid.UUID
album_id: uuid.UUID | None
storage_uri: str
file_format: str
file_size: int
storage_uri: str | None
file_format: str | None
file_size: int | None
source: str
source_id: str
duration_seconds: int | None
@@ -31,5 +33,6 @@ class Track:
metadata_status: str
metadata_error: str | None
enriched_at: dt.datetime | None
availability: str
created_at: dt.datetime
updated_at: dt.datetime
+36 -3
View File
@@ -114,6 +114,11 @@ class FileStorage(Protocol):
class ArtistRepository(Protocol):
async def get_or_create(self, name: str) -> Artist: ...
async def get_or_create_remote(self, *, name: str, source: str, source_id: str) -> Artist:
"""Resolve/create an artist bound to a remote ``(source, source_id)``
(lazy materialization save-to-library)."""
...
async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None: ...
async def get_many(self, ids: list[uuid.UUID]) -> list[Artist]: ...
async def list(self, *, q: str | None, limit: int, offset: int) -> list[Artist]: ...
@@ -131,14 +136,28 @@ class TrackRepository(Protocol):
id: uuid.UUID,
title: str,
artist_id: uuid.UUID,
storage_uri: str,
file_format: str,
file_size: int,
storage_uri: str | None,
file_format: str | None,
file_size: int | None,
source: str,
source_id: str,
metadata_status: str,
added_by: uuid.UUID | None,
availability: str = ...,
) -> Track: ...
async def materialize(
self,
track_id: uuid.UUID,
*,
storage_uri: str,
file_format: str,
file_size: int,
bitrate: int | None,
) -> Track:
"""Fill in a remote placeholder's audio fields after a download
(lazy materialization), flipping ``availability`` to ``local``."""
...
async def delete(self, track_id: uuid.UUID) -> None: ...
# genres / library_stats must come before ``list`` — the method named
# ``list`` shadows the builtin in later annotations (same pattern as
@@ -212,6 +231,20 @@ class AlbumRepository(Protocol):
year: int | None,
musicbrainz_id: str | None,
) -> Album: ...
async def get_or_create_remote(
self,
*,
title: str,
artist_id: uuid.UUID,
year: int | None,
musicbrainz_id: str | None,
source: str,
source_id: str,
) -> Album:
"""Resolve/create an album bound to a remote ``(source, source_id)``
(lazy materialization save-to-library)."""
...
async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None: ...
async def get_by_id(self, album_id: uuid.UUID) -> Album | None: ...
async def get_many(self, ids: list[uuid.UUID]) -> list[Album]: ...
+11 -1
View File
@@ -2,7 +2,7 @@
import uuid
from sqlalchemy import ForeignKey, Integer, String
from sqlalchemy import ForeignKey, Integer, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
@@ -11,6 +11,12 @@ from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMi
class AlbumModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "albums"
__table_args__ = (
# Binds a remote (browsable) album to its local row for re-browse/save
# dedup. Multiple NULLs are allowed by Postgres, so locally-created
# albums (source/source_id both NULL) never collide on this.
UniqueConstraint("source", "source_id", name="uq_albums_source_source_id"),
)
title: Mapped[str] = mapped_column(String(1024), index=True, nullable=False)
artist_id: Mapped[uuid.UUID] = mapped_column(
@@ -21,3 +27,7 @@ class AlbumModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
year: Mapped[int | None] = mapped_column(Integer, nullable=True)
cover_path: Mapped[str | None] = mapped_column(String(1024), nullable=True)
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
# -- remote identity (lazy materialization) --------------------------
source: Mapped[str | None] = mapped_column(String(32), nullable=True)
source_id: Mapped[str | None] = mapped_column(String(512), nullable=True)
+11 -1
View File
@@ -1,6 +1,6 @@
"""ORM model for artists."""
from sqlalchemy import String
from sqlalchemy import String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
@@ -9,6 +9,16 @@ from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMi
class ArtistModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "artists"
__table_args__ = (
# Binds a remote (browsable) artist to its local row for re-browse/save
# dedup. Multiple NULLs are allowed by Postgres, so locally-created
# artists (source/source_id both NULL) never collide on this.
UniqueConstraint("source", "source_id", name="uq_artists_source_source_id"),
)
name: Mapped[str] = mapped_column(String(512), index=True, nullable=False)
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
# -- remote identity (lazy materialization) --------------------------
source: Mapped[str | None] = mapped_column(String(32), nullable=True)
source_id: Mapped[str | None] = mapped_column(String(512), nullable=True)
+9
View File
@@ -64,3 +64,12 @@ class LyricsStatus(enum.StrEnum):
FOUND = "found"
NOT_FOUND = "not_found"
PENDING = "pending"
class TrackAvailability(enum.StrEnum):
"""Whether a track's audio is on local storage or still a remote placeholder
(plan: lazy materialization). ``remote`` tracks have ``storage_uri = NULL``
until ``TrackRepository.materialize`` fills it in."""
LOCAL = "local"
REMOTE = "remote"
+13 -4
View File
@@ -13,7 +13,7 @@ from sqlalchemy import DateTime, ForeignKey, Integer, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
from app.infrastructure.db.models.enums import MetadataStatus, StoragePolicy
from app.infrastructure.db.models.enums import MetadataStatus, StoragePolicy, TrackAvailability
from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMixin
@@ -41,11 +41,20 @@ class TrackModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
year: Mapped[int | None] = mapped_column(Integer, nullable=True)
# -- file (original, stored as-is) -----------------------------------
storage_uri: Mapped[str] = mapped_column(String(2048), nullable=False)
file_format: Mapped[str] = mapped_column(String(32), nullable=False)
file_size: Mapped[int] = mapped_column(Integer, nullable=False)
# NULL on a remote placeholder (not yet materialized) — see ``availability``.
storage_uri: Mapped[str | None] = mapped_column(String(2048), nullable=True)
file_format: Mapped[str | None] = mapped_column(String(32), nullable=True)
file_size: Mapped[int | None] = mapped_column(Integer, nullable=True)
bitrate: Mapped[int | None] = mapped_column(Integer, nullable=True)
# ``remote`` = placeholder with no local audio yet; materialize() flips this
# to ``local`` once the file is downloaded and ``storage_uri`` is filled in.
availability: Mapped[str] = mapped_column(
String(16),
nullable=False,
default=TrackAvailability.LOCAL.value,
)
# -- dedup / external ids --------------------------------------------
acoustid_fingerprint: Mapped[str | None] = mapped_column(String(64), index=True, nullable=True)
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
@@ -18,6 +18,8 @@ def _to_entity(row: AlbumModel) -> Album:
year=row.year,
cover_path=row.cover_path,
musicbrainz_id=row.musicbrainz_id,
source=row.source,
source_id=row.source_id,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -63,6 +65,58 @@ class SqlAlchemyAlbumRepository:
await self._session.refresh(row)
return _to_entity(row)
async def get_or_create_remote(
    self,
    *,
    title: str,
    artist_id: uuid.UUID,
    year: int | None,
    musicbrainz_id: str | None,
    source: str,
    source_id: str,
) -> Album:
    """Resolve or create the album bound to a remote ``(source, source_id)``.

    Lookup order:

    1. exact ``(source, source_id)`` match (re-browse/save dedup);
    2. otherwise the oldest album with the same ``(title, artist_id)``;
    3. otherwise a new row carrying the remote ids.

    An existing row is gap-filled: ``year`` and ``musicbrainz_id`` are set
    only when currently NULL, and the remote ids are attached only when the
    row has neither ``source`` nor ``source_id`` yet.
    """
    row = (
        await self._session.execute(
            select(AlbumModel).where(
                AlbumModel.source == source,
                AlbumModel.source_id == source_id,
            )
        )
    ).scalar_one_or_none()
    if row is None:
        # No unique constraint on (title, artist_id) is visible in the model,
        # so several rows may match. Take the oldest deterministically instead
        # of letting scalar_one_or_none() raise MultipleResultsFound.
        row = (
            (
                await self._session.execute(
                    select(AlbumModel)
                    .where(
                        AlbumModel.title == title,
                        AlbumModel.artist_id == artist_id,
                    )
                    .order_by(AlbumModel.created_at)
                    .limit(1)
                )
            )
            .scalars()
            .first()
        )
    if row is None:
        row = AlbumModel(
            title=title,
            artist_id=artist_id,
            year=year,
            musicbrainz_id=musicbrainz_id,
            source=source,
            source_id=source_id,
        )
        self._session.add(row)
    else:
        if row.year is None and year is not None:
            row.year = year
        if row.musicbrainz_id is None and musicbrainz_id is not None:
            row.musicbrainz_id = musicbrainz_id
        # Never overwrite an existing remote binding.
        if row.source is None and row.source_id is None:
            row.source = source
            row.source_id = source_id
    await self._session.flush()
    await self._session.refresh(row)
    return _to_entity(row)
async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None:
row = await self._session.get(AlbumModel, album_id)
if row is not None:
@@ -15,6 +15,8 @@ def _to_entity(row: ArtistModel) -> Artist:
return Artist(
id=row.id,
name=row.name,
source=row.source,
source_id=row.source_id,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -35,6 +37,32 @@ class SqlAlchemyArtistRepository:
await self._session.refresh(row)
return _to_entity(row)
async def get_or_create_remote(self, *, name: str, source: str, source_id: str) -> Artist:
    """Resolve or create the artist bound to a remote ``(source, source_id)``.

    Lookup order:

    1. exact ``(source, source_id)`` match (re-browse/save dedup);
    2. otherwise the oldest artist with the same ``name``;
    3. otherwise a new row carrying the remote ids.

    An existing row found by name gets the remote ids attached only when it
    has neither ``source`` nor ``source_id`` yet.
    """
    row = (
        await self._session.execute(
            select(ArtistModel).where(
                ArtistModel.source == source,
                ArtistModel.source_id == source_id,
            )
        )
    ).scalar_one_or_none()
    if row is None:
        # ``name`` is indexed but not declared unique in the model, so more
        # than one row may match. Take the oldest deterministically instead
        # of letting scalar_one_or_none() raise MultipleResultsFound.
        row = (
            (
                await self._session.execute(
                    select(ArtistModel)
                    .where(ArtistModel.name == name)
                    .order_by(ArtistModel.created_at)
                    .limit(1)
                )
            )
            .scalars()
            .first()
        )
    if row is None:
        row = ArtistModel(name=name, source=source, source_id=source_id)
        self._session.add(row)
    elif row.source is None and row.source_id is None:
        # Never overwrite an existing remote binding.
        row.source = source
        row.source_id = source_id
    await self._session.flush()
    await self._session.refresh(row)
    return _to_entity(row)
async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None:
row = await self._session.get(ArtistModel, artist_id)
return _to_entity(row) if row is not None else None
@@ -42,6 +42,7 @@ def _track_to_entity(row: TrackModel) -> Track:
metadata_status=row.metadata_status,
metadata_error=row.metadata_error,
enriched_at=row.enriched_at,
availability=row.availability,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -41,6 +41,7 @@ def _track_to_entity(row: TrackModel) -> Track:
metadata_status=row.metadata_status,
metadata_error=row.metadata_error,
enriched_at=row.enriched_at,
availability=row.availability,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -10,6 +10,7 @@ from app.domain.entities.storage import FormatBreakdown, LibraryStats
from app.domain.entities.track import Track
from app.domain.errors import NotFoundError
from app.infrastructure.db.models.artist import ArtistModel
from app.infrastructure.db.models.enums import TrackAvailability
from app.infrastructure.db.models.track import TrackModel
@@ -31,6 +32,7 @@ def _to_entity(row: TrackModel) -> Track:
metadata_status=row.metadata_status,
metadata_error=row.metadata_error,
enriched_at=row.enriched_at,
availability=row.availability,
created_at=row.created_at,
updated_at=row.updated_at,
)
@@ -61,13 +63,14 @@ class SqlAlchemyTrackRepository:
id: uuid.UUID,
title: str,
artist_id: uuid.UUID,
storage_uri: str,
file_format: str,
file_size: int,
storage_uri: str | None,
file_format: str | None,
file_size: int | None,
source: str,
source_id: str,
metadata_status: str,
added_by: uuid.UUID | None,
availability: str = TrackAvailability.LOCAL.value,
) -> Track:
row = TrackModel(
id=id,
@@ -80,12 +83,38 @@ class SqlAlchemyTrackRepository:
source_id=source_id,
metadata_status=metadata_status,
added_by=added_by,
availability=availability,
)
self._session.add(row)
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def materialize(
    self,
    track_id: uuid.UUID,
    *,
    storage_uri: str,
    file_format: str,
    file_size: int,
    bitrate: int | None,
) -> Track:
    """Fill in a placeholder track's audio fields and mark it as local.

    The existing row is updated in place, so the track keeps its id.
    ``bitrate`` is written only when a value is given; otherwise the stored
    value is left as it was.

    Raises:
        NotFoundError: no track row exists for ``track_id``.
    """
    row = await self._session.get(TrackModel, track_id)
    if row is None:
        raise NotFoundError(f"Track {track_id} not found.")

    updates: dict[str, object] = {
        "storage_uri": storage_uri,
        "file_format": file_format,
        "file_size": file_size,
    }
    if bitrate is not None:
        updates["bitrate"] = bitrate
    updates["availability"] = TrackAvailability.LOCAL.value
    for column, value in updates.items():
        setattr(row, column, value)

    await self._session.flush()
    await self._session.refresh(row)
    return _to_entity(row)
async def delete(self, track_id: uuid.UUID) -> None:
row = await self._session.get(TrackModel, track_id)
if row is not None:
@@ -130,6 +159,7 @@ class SqlAlchemyTrackRepository:
func.count(TrackModel.id),
func.coalesce(func.sum(TrackModel.file_size), 0),
)
.where(TrackModel.file_format.is_not(None))
.group_by(TrackModel.file_format)
.order_by(func.sum(TrackModel.file_size).desc())
)
+7 -1
View File
@@ -12,6 +12,7 @@ from app.core.logging import configure_logging, get_logger
from app.workers.tasks.download_task import download_track
from app.workers.tasks.enrich_task import enrich_track
from app.workers.tasks.import_task import scan_local_folder
from app.workers.tasks.materialize_task import materialize_track
log = get_logger("worker")
@@ -27,7 +28,12 @@ async def shutdown(_ctx: dict[str, Any]) -> None:
class WorkerSettings:
functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track, download_track]
functions: ClassVar[list[Any]] = [
scan_local_folder,
enrich_track,
download_track,
materialize_track,
]
on_startup = startup
on_shutdown = shutdown
max_jobs = get_settings().max_parallel_downloads
+11
View File
@@ -48,6 +48,17 @@ async def enqueue_download(job_id: uuid.UUID) -> None:
log.warning("download_enqueue_failed", job_id=str(job_id))
async def enqueue_materialize(job_id: uuid.UUID) -> None:
    """Best-effort hand-off of a materialize job to the worker queue.

    Submits ``materialize_track`` with a 3-unit ``_defer_by`` (same
    deferred-commit reasoning as :func:`enqueue_download`). When the queue
    is unavailable the failure is only logged, never raised — the job row
    stays ``queued`` and can be retried.
    """
    job_ref = str(job_id)
    try:
        await enqueue("materialize_track", job_id=job_ref, _defer_by=3)
    except DependencyUnavailableError:
        log.warning("materialize_enqueue_failed", job_id=job_ref)
async def enqueue_enrich(track_id: uuid.UUID) -> None:
"""Best-effort enqueue of metadata enrichment for a freshly stored track.
+101
View File
@@ -0,0 +1,101 @@
"""arq task: materialize a remote placeholder track (plan: Model C).
Counterpart to ``download_task`` for tracks that were *saved* from a remote
browse hit without audio (``availability="remote"``, ``storage_uri=NULL``).
The job's ``track_id`` already points at the existing placeholder row — on
success the file is stored and ``TrackRepository.materialize`` fills the row
in place (the track's ``id`` never changes), then enrichment is enqueued as
usual.
Shares its fetch/retry/failure machinery with ``download_task`` — only the
"what happens on success" step differs (fill in an existing row vs. create a
new one).
"""
import contextlib
import uuid
from typing import Any
import anyio
from app.core.config import get_settings
from app.core.logging import correlation_id, get_logger
from app.domain.errors import NotFoundError, ValidationError
from app.domain.sources import DownloadResult
from app.infrastructure.db import session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyDownloadJobRepository,
SqlAlchemyTrackRepository,
)
from app.infrastructure.sources.registry import build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich
from app.workers.tasks.download_task import _handle_failure, _load_job, _mark_failed, _run_fetch
log = get_logger("worker.materialize")
async def materialize_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]:
    """arq entry point: download the audio for a placeholder track and fill it in.

    Steps: load the job, validate it, resolve the fetch backend, mark the
    job ``downloading``, fetch, store + update the track row, then enqueue
    enrichment.

    Returns a small status dict: ``"missing"`` when the job row is gone,
    ``"failed"`` for invalid jobs / unavailable sources / finalize errors,
    ``"done"`` (with ``track_id``) on success. Fetch errors are delegated to
    ``_handle_failure`` and its return value is passed through.
    """
    correlation_id.set(f"mat:{job_id}")
    jid = uuid.UUID(job_id)
    settings = get_settings()

    job = await _load_job(jid)
    if job is None:
        # cancelled before pickup
        log.info("materialize_job_missing", job_id=job_id)
        return {"job_id": job_id, "status": "missing"}

    target_track = job.track_id
    remote_id = job.source_id
    if target_track is None or remote_id is None:
        await _mark_failed(jid, "Materialize job missing track_id/source_id.")
        return {"job_id": job_id, "status": "failed"}

    registry = build_source_registry(settings)
    try:
        backend = registry.fetchable(job.source)
    except (NotFoundError, ValidationError) as exc:
        await _mark_failed(jid, f"Source unavailable: {exc}")
        return {"job_id": job_id, "status": "failed"}

    await _set_status(jid, "downloading")
    try:
        fetched = await _run_fetch(backend, remote_id, jid)
    except Exception as exc:
        # Retry/failure bookkeeping is shared with the download task.
        return await _handle_failure(jid, exc, settings.download_max_retries, job_id)

    try:
        await _materialize_result(jid, target_track, fetched)
    except Exception as exc:
        log.exception("materialize_finalize_failed", job_id=job_id)
        await _mark_failed(jid, f"Materialize failed: {type(exc).__name__}: {exc}")
        return {"job_id": job_id, "status": "failed"}

    await enqueue_enrich(target_track)
    track_ref = str(target_track)
    log.info("materialize_complete", job_id=job_id, track_id=track_ref)
    return {"job_id": job_id, "status": "done", "track_id": track_ref}
async def _materialize_result(jid: uuid.UUID, track_id: uuid.UUID, result: DownloadResult) -> None:
    """Persist the fetched file and complete the placeholder track in place.

    The file is stored under ``tracks/<first two chars of id>/<id>.<format>``;
    then, inside one session scope, the job is moved to ``enriching``, the
    track row is materialized and the job is marked ``done``. The local
    download at ``result.path`` is removed afterwards whether or not the
    steps above succeeded (errors during removal are ignored).
    """
    track_ref = str(track_id)
    key = f"tracks/{track_ref[:2]}/{track_id}.{result.file_format}"
    storage = get_file_storage()
    try:
        await storage.save_file(key, result.path)
        # NOTE(review): if the DB step below fails, no cleanup of the object
        # already stored under ``key`` is visible here — confirm whether an
        # orphaned file is acceptable.
        async with session_scope() as session:
            jobs = SqlAlchemyDownloadJobRepository(session)
            await jobs.set_status(jid, status="enriching")
            track_repo = SqlAlchemyTrackRepository(session)
            await track_repo.materialize(
                track_id,
                storage_uri=key,
                file_format=result.file_format,
                file_size=result.file_size,
                bitrate=result.bitrate,
            )
            await jobs.set_status(jid, status="done", track_id=track_id)
    finally:
        # Best-effort removal of the temporary download.
        with contextlib.suppress(Exception):
            await anyio.Path(result.path).unlink(missing_ok=True)
async def _set_status(jid: uuid.UUID, status: str) -> None:
    """Write ``status`` onto the job row using a session scope of its own."""
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        await jobs.set_status(jid, status=status)
+10 -1
View File
@@ -16,7 +16,14 @@ pytestmark = pytest.mark.asyncio
class FakeArtistRepo:
async def get_or_create(self, name: str) -> Artist:
now = dt.datetime.now(dt.UTC)
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
return Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
class FakeTrackRepo:
@@ -46,6 +53,7 @@ class FakeTrackRepo:
metadata_status=str(kw["metadata_status"]),
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
@@ -140,6 +148,7 @@ def _track(source: str, source_id: str) -> Track:
metadata_status="pending",
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
+108
View File
@@ -164,6 +164,114 @@ async def test_search_aggregates_fetch_sources(api: AsyncClient) -> None:
assert hit["title"] == "queen song"
async def test_search_reports_library_status(api: AsyncClient) -> None:
    """A search hit flips from "not in library" to a ``remote`` library entry
    once it has been saved as a placeholder."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}
    query = {"q": "queen"}

    # Fresh hit: nothing saved yet.
    before = await api.get("/api/v1/search", params=query, headers=auth)
    hit = before.json()["results"][0]
    assert hit["in_library"] is False
    assert hit["track_id"] is None
    assert hit["availability"] is None

    # Save the hit as a placeholder (no audio).
    payload = {key: hit[key] for key in ("source", "source_id", "title", "artist")}
    save = await api.post("/api/v1/tracks/remote", json=payload, headers=auth)
    assert save.status_code == 201
    saved = save.json()
    assert saved["availability"] == "remote"
    assert saved["file_format"] is None

    # The same search now reports the saved placeholder.
    after = await api.get("/api/v1/search", params=query, headers=auth)
    hit2 = after.json()["results"][0]
    assert hit2["in_library"] is True
    assert hit2["track_id"] == saved["id"]
    assert hit2["availability"] == "remote"
async def test_save_remote_is_idempotent(api: AsyncClient) -> None:
    """Posting the same remote save twice yields 201 both times and one track id."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}
    payload = {"source": "youtube", "source_id": "vid-idem", "title": "A", "artist": "Artist"}

    # Two sequential, identical requests.
    responses = [
        await api.post("/api/v1/tracks/remote", json=payload, headers=auth) for _ in range(2)
    ]
    first, second = responses

    assert first.status_code == 201
    assert second.status_code == 201
    assert first.json()["id"] == second.json()["id"]
async def test_materialize_flow(
    api: AsyncClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """End-to-end lazy materialization: save a placeholder, request its audio,
    run the worker task by hand, then verify the track is local and streams."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}

    placeholder = {
        "source": "youtube",
        "source_id": "vid-mat-1",
        "title": "Materialize Me",
        "artist": "Artist",
    }
    save = await api.post("/api/v1/tracks/remote", json=placeholder, headers=auth)
    track_id = save.json()["id"]
    assert save.json()["availability"] == "remote"

    stream_url = f"/api/v1/stream/{track_id}"
    materialize_url = f"/api/v1/tracks/{track_id}/materialize"

    # No audio yet, so the placeholder cannot be streamed.
    stream_before = await api.get(stream_url, headers=auth)
    assert stream_before.status_code == 404

    materialize = await api.post(materialize_url, headers=auth)
    assert materialize.status_code == 200
    body = materialize.json()
    assert body["job"] is not None
    job_id = body["job"]["id"]
    assert body["job"]["track_id"] == track_id

    # Asking again while the job is in flight returns that same job.
    again = await api.post(materialize_url, headers=auth)
    assert again.json()["job"]["id"] == job_id

    # Drive the worker task directly (no Redis), fetching from the fake source.
    import app.workers.tasks.materialize_task as mat_task

    worker_dir = tmp_path / "worker-mat"
    worker_dir.mkdir()
    fake = SourceRegistry([FakeFetchSource(worker_dir)])  # type: ignore[list-item]
    monkeypatch.setattr(mat_task, "build_source_registry", lambda _settings: fake)

    result = await mat_task.materialize_track({}, job_id=job_id)
    assert result["status"] == "done"
    assert result["track_id"] == track_id

    got = await api.get(f"/api/v1/tracks/{track_id}", headers=auth)
    assert got.json()["availability"] == "local"
    assert got.json()["file_format"] == "webm"

    # The audio is there now, so streaming succeeds.
    stream_after = await api.get(stream_url, headers=auth)
    assert stream_after.status_code == 200

    # A track that is already local yields no new job.
    noop = await api.post(materialize_url, headers=auth)
    assert noop.json()["job"] is None
async def test_source_scoped_search(api: AsyncClient) -> None:
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
+10 -1
View File
@@ -20,7 +20,14 @@ class FakeArtistRepo:
async def get_or_create(self, name: str) -> Artist:
if name not in self._by_name:
now = dt.datetime.now(dt.UTC)
self._by_name[name] = Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
self._by_name[name] = Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
return self._by_name[name]
@@ -55,6 +62,7 @@ class FakeTrackRepo:
metadata_status=str(kw["metadata_status"]),
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
@@ -139,6 +147,7 @@ async def test_dedup_skips_already_imported() -> None:
metadata_status="pending",
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
+156
View File
@@ -0,0 +1,156 @@
"""Integration tests for the lazy-materialization foundation:
``TrackRepository.materialize`` and ``Album``/``ArtistRepository.get_or_create_remote``.
Requires a reachable Postgres; skips otherwise (same pattern as
``test_upload_stream_api.py``).
"""
import asyncio
import uuid
from collections.abc import AsyncIterator
import pytest
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.models.enums import TrackAvailability
from app.infrastructure.db.repositories import (
SqlAlchemyAlbumRepository,
SqlAlchemyArtistRepository,
SqlAlchemyTrackRepository,
)
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
    """Report whether the database answers ``SELECT 1`` within 3 seconds.

    The outcome of the first probe is memoized in ``_db_reachable_cache``;
    any exception during the probe counts as "not reachable".
    """
    global _db_reachable_cache
    if _db_reachable_cache is None:
        from sqlalchemy import text

        try:
            async with asyncio.timeout(3), get_engine().connect() as conn:
                await conn.execute(text("SELECT 1"))
        except Exception:
            _db_reachable_cache = False
        else:
            _db_reachable_cache = True
    return _db_reachable_cache
@pytest.fixture
async def db() -> AsyncIterator[None]:
    """Provide a freshly created schema for one test, dropping it afterwards.

    Skips the test when the database probe fails. After the test the tables
    are dropped again and the engine is disposed.
    """
    reachable = await _db_reachable()
    if not reachable:
        pytest.skip("Postgres not reachable — integration test skipped.")

    # Start from a clean slate: drop leftovers, then create all tables.
    async with get_engine().begin() as setup_conn:
        await setup_conn.run_sync(Base.metadata.drop_all)
        await setup_conn.run_sync(Base.metadata.create_all)

    yield None

    async with get_engine().begin() as teardown_conn:
        await teardown_conn.run_sync(Base.metadata.drop_all)
    await dispose_engine()
async def test_placeholder_track_materializes_in_place(db: None) -> None:
    """``materialize`` fills the audio fields of a storage-less placeholder
    while the track keeps the id it was created with."""
    async with session_scope() as session:
        artist_repo = SqlAlchemyArtistRepository(session)
        track_repo = SqlAlchemyTrackRepository(session)
        artist = await artist_repo.get_or_create("Some Artist")
        track_id = uuid.uuid4()

        # Placeholder: remote availability, no storage information at all.
        placeholder = await track_repo.add(
            id=track_id,
            title="Remote Track",
            artist_id=artist.id,
            storage_uri=None,
            file_format=None,
            file_size=None,
            source="youtube",
            source_id="abc123",
            metadata_status="pending",
            added_by=None,
            availability=TrackAvailability.REMOTE.value,
        )
        assert placeholder.availability == "remote"
        assert placeholder.storage_uri is None

        audio = {
            "storage_uri": "tracks/ab/abc123.m4a",
            "file_format": "m4a",
            "file_size": 12345,
        }
        materialized = await track_repo.materialize(track_id, bitrate=160, **audio)

        assert materialized.id == track_id
        assert materialized.availability == "local"
        assert materialized.storage_uri == audio["storage_uri"]
        assert materialized.file_format == audio["file_format"]
        assert materialized.file_size == audio["file_size"]
async def test_artist_get_or_create_remote_dedups_by_remote_id(db: None) -> None:
    """Two calls with the same ``(source, source_id)`` resolve to one artist,
    even when the display name differs."""
    remote_identity = {"source": "youtube", "source_id": "UCabc"}
    async with session_scope() as session:
        repo = SqlAlchemyArtistRepository(session)
        first = await repo.get_or_create_remote(name="Daft Punk", **remote_identity)
        again = await repo.get_or_create_remote(
            name="Daft Punk (different display name)", **remote_identity
        )

        assert first.id == again.id
        assert again.source == "youtube"
        assert again.source_id == "UCabc"
async def test_artist_get_or_create_remote_binds_existing_local_artist(db: None) -> None:
    """An artist created without remote ids is reused by name and receives them."""
    artist_name = "Pink Floyd"
    async with session_scope() as session:
        repo = SqlAlchemyArtistRepository(session)
        local = await repo.get_or_create(artist_name)
        remote = await repo.get_or_create_remote(
            name=artist_name, source="youtube", source_id="UCxyz"
        )

        assert remote.id == local.id
        assert remote.source == "youtube"
        assert remote.source_id == "UCxyz"
async def test_album_get_or_create_remote_dedups_by_remote_id(db: None) -> None:
    """The same remote album id resolves to one row; a later call with
    ``year=None`` does not wipe the year stored by the first call."""
    async with session_scope() as session:
        artist_repo = SqlAlchemyArtistRepository(session)
        album_repo = SqlAlchemyAlbumRepository(session)
        artist = await artist_repo.get_or_create("Daft Punk")

        # Everything except ``year`` is identical between the two calls.
        shared = {
            "title": "Discovery",
            "artist_id": artist.id,
            "musicbrainz_id": None,
            "source": "youtube",
            "source_id": "MPREb_abc",
        }
        first = await album_repo.get_or_create_remote(year=2001, **shared)
        again = await album_repo.get_or_create_remote(year=None, **shared)

        assert first.id == again.id
        assert again.source == "youtube"
        assert again.source_id == "MPREb_abc"
        assert again.year == 2001
+11 -1
View File
@@ -42,6 +42,7 @@ def _track(*, metadata_status: str = "pending", title: str = "raw-stem") -> Trac
metadata_status=metadata_status,
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
@@ -67,7 +68,14 @@ class FakeArtistRepo:
async def get_or_create(self, name: str) -> Artist:
self.created.append(name)
now = dt.datetime.now(dt.UTC)
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
return Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
class FakeAlbumRepo:
@@ -88,6 +96,8 @@ class FakeAlbumRepo:
year=year,
cover_path=self._existing_cover,
musicbrainz_id=musicbrainz_id,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
+11 -1
View File
@@ -149,6 +149,7 @@ def _pending_track() -> Track:
metadata_status="pending",
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
@@ -180,7 +181,14 @@ class _FakeArtistRepo:
async def get_or_create(self, name: str) -> Artist:
self.created.append(name)
now = datetime.now(UTC)
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
return Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
@dataclass
@@ -199,6 +207,8 @@ class _FakeAlbumRepo:
year=year,
cover_path=None,
musicbrainz_id=musicbrainz_id,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
+255
View File
@@ -0,0 +1,255 @@
"""Unit tests for RemoteLibraryService — DB-free, in-memory fakes (plan: Model C
remote browse + lazy materialization)."""
import datetime as dt
import uuid
import pytest
from app.application.remote_library_service import RemoteLibraryService
from app.domain.entities import Artist, DownloadJob, Track
from app.domain.errors import NotFoundError, ValidationError
pytestmark = pytest.mark.asyncio
class FakeArtistRepo:
    """In-memory artist repository double keyed by artist name."""

    def __init__(self) -> None:
        self._by_name: dict[str, Artist] = {}

    async def get_or_create(self, name: str) -> Artist:
        """Return the artist stored under ``name``, creating it (without
        remote ids) on first request."""
        known = self._by_name.get(name)
        if known is not None:
            return known
        stamp = dt.datetime.now(dt.UTC)
        artist = Artist(
            id=uuid.uuid4(),
            name=name,
            source=None,
            source_id=None,
            created_at=stamp,
            updated_at=stamp,
        )
        self._by_name[name] = artist
        return artist
class FakeTrackRepo:
    """In-memory track repository double.

    Tracks are indexed both by id (``by_id``) and by their
    ``(source, source_id)`` pair (``by_source``).
    """

    def __init__(self) -> None:
        self.by_id: dict[uuid.UUID, Track] = {}
        self.by_source: dict[tuple[str, str], Track] = {}

    async def get_by_id(self, track_id: uuid.UUID) -> Track | None:
        """Return the track with ``track_id``, or ``None`` when unknown."""
        return self.by_id.get(track_id)

    async def get_by_source(self, source: str, source_id: str) -> Track | None:
        """Return the track saved for ``(source, source_id)``, or ``None``."""
        return self.by_source.get((source, source_id))

    async def add(self, **kw: object) -> Track:
        """Build a ``Track`` from the keyword arguments and index it.

        ``availability`` is optional and defaults to ``"local"``, matching
        the default of the real repository's ``add``; previously omitting it
        raised ``KeyError`` in this fake only.
        """
        now = dt.datetime.now(dt.UTC)
        track = Track(
            id=kw["id"],  # type: ignore[arg-type]
            title=str(kw["title"]),
            artist_id=kw["artist_id"],  # type: ignore[arg-type]
            album_id=None,
            storage_uri=kw["storage_uri"],  # type: ignore[arg-type]
            file_format=kw["file_format"],  # type: ignore[arg-type]
            file_size=kw["file_size"],  # type: ignore[arg-type]
            source=str(kw["source"]),
            source_id=str(kw["source_id"]),
            duration_seconds=None,
            genre=None,
            year=None,
            track_number=None,
            metadata_status=str(kw["metadata_status"]),
            metadata_error=None,
            enriched_at=None,
            availability=str(kw.get("availability", "local")),
            created_at=now,
            updated_at=now,
        )
        self.by_id[track.id] = track
        self.by_source[(track.source, track.source_id)] = track
        return track
def _local_track(source: str = "youtube", source_id: str = "local-1") -> Track:
    """Build a track that already has audio (``availability="local"``)."""
    stamp = dt.datetime.now(dt.UTC)
    return Track(
        # identity
        id=uuid.uuid4(),
        title="Already Here",
        artist_id=uuid.uuid4(),
        album_id=None,
        source=source,
        source_id=source_id,
        # audio is present
        storage_uri="tracks/aa/aa.m4a",
        file_format="m4a",
        file_size=123,
        availability="local",
        # descriptive metadata left empty
        duration_seconds=None,
        genre=None,
        year=None,
        track_number=None,
        metadata_status="pending",
        metadata_error=None,
        enriched_at=None,
        created_at=stamp,
        updated_at=stamp,
    )
class FakeJobRepo:
    """In-memory download-job repository double.

    ``jobs`` holds every job by id; ``active`` is filled by tests to simulate
    an in-flight job for a ``(source, source_id)`` pair.
    """

    def __init__(self) -> None:
        self.jobs: dict[uuid.UUID, DownloadJob] = {}
        self.active: dict[tuple[str, str], DownloadJob] = {}

    async def add(self, **kw: object) -> DownloadJob:
        """Create a ``queued`` job with no track attached and store it."""
        now = dt.datetime.now(dt.UTC)
        job = DownloadJob(
            id=uuid.uuid4(),
            source=str(kw["source"]),
            source_id=kw.get("source_id"),  # type: ignore[arg-type]
            query=kw.get("query"),  # type: ignore[arg-type]
            requested_by=kw.get("requested_by"),  # type: ignore[arg-type]
            status="queued",
            progress=0.0,
            error_message=None,
            retry_count=0,
            track_id=None,
            created_at=now,
            updated_at=now,
        )
        self.jobs[job.id] = job
        return job

    async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
        """Return the stored job, or ``None`` when the id is unknown."""
        return self.jobs.get(job_id)

    async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
        """Return the job registered in ``active`` for the pair, if any."""
        return self.active.get((source, source_id))

    async def set_status(self, job_id: uuid.UUID, **kw: object) -> None:
        """Replace the stored job with a copy carrying the requested changes.

        ``status`` and ``track_id`` are each applied when given and kept
        otherwise. Status-only calls used to be dropped silently unless a
        ``track_id`` was passed too, which made the fake diverge from how
        the worker uses ``set_status(jid, status=...)``.

        Raises:
            KeyError: ``job_id`` is not a stored job.
        """
        job = self.jobs[job_id]
        new_status = kw.get("status")
        new_track_id = kw.get("track_id")
        self.jobs[job_id] = DownloadJob(
            id=job.id,
            source=job.source,
            source_id=job.source_id,
            query=job.query,
            requested_by=job.requested_by,
            status=job.status if new_status is None else str(new_status),
            progress=job.progress,
            error_message=job.error_message,
            retry_count=job.retry_count,
            track_id=job.track_id if new_track_id is None else new_track_id,  # type: ignore[arg-type]
            created_at=job.created_at,
            updated_at=job.updated_at,
        )
def _service(
    *,
    tracks: FakeTrackRepo,
    artists: FakeArtistRepo,
    jobs: FakeJobRepo,
    enqueued: list[uuid.UUID],
) -> RemoteLibraryService:
    """Wire a ``RemoteLibraryService`` to the in-memory fakes.

    Instead of talking to a queue, the injected enqueue callback appends
    each job id to ``enqueued`` so tests can inspect what was scheduled.
    """

    async def _record_enqueue(job_id: uuid.UUID) -> None:
        enqueued.append(job_id)

    dependencies = {
        "tracks": tracks,
        "artists": artists,
        "jobs": jobs,
        "enqueue_materialize": _record_enqueue,
    }
    return RemoteLibraryService(**dependencies)  # type: ignore[arg-type]
async def test_save_remote_creates_placeholder() -> None:
    """Saving a remote hit produces an audio-less ``remote`` track bound to
    the given source identity."""
    enqueued: list[uuid.UUID] = []
    service = _service(
        tracks=FakeTrackRepo(), artists=FakeArtistRepo(), jobs=FakeJobRepo(), enqueued=enqueued
    )

    saved = await service.save_remote(
        source="youtube", source_id="abc", title="Bohemian Rhapsody", artist="Queen", added_by=None
    )

    assert saved.availability == "remote"
    assert saved.storage_uri is None
    assert saved.file_format is None
    assert saved.source == "youtube"
    assert saved.source_id == "abc"
async def test_save_remote_is_idempotent() -> None:
    """A second save for the same source identity returns the first track
    unchanged, ignoring the new title/artist."""
    enqueued: list[uuid.UUID] = []
    service = _service(
        tracks=FakeTrackRepo(), artists=FakeArtistRepo(), jobs=FakeJobRepo(), enqueued=enqueued
    )
    identity = {"source": "youtube", "source_id": "abc"}

    first = await service.save_remote(title="A", artist="Queen", added_by=None, **identity)
    second = await service.save_remote(title="B", artist="Other", added_by=None, **identity)

    assert first.id == second.id
    assert second.title == "A"  # untouched by the second call
async def test_materialize_already_local_is_noop() -> None:
    """Requesting materialization of a local track creates no job and
    enqueues nothing."""
    track_repo = FakeTrackRepo()
    enqueued: list[uuid.UUID] = []
    local = _local_track()
    track_repo.by_id[local.id] = local
    service = _service(
        tracks=track_repo, artists=FakeArtistRepo(), jobs=FakeJobRepo(), enqueued=enqueued
    )

    outcome = await service.request_materialize(local.id, requested_by=None)

    assert outcome.job is None
    assert outcome.track.id == local.id
    assert enqueued == []
async def test_materialize_remote_creates_and_enqueues_job() -> None:
    """A placeholder gets a new job tied to its source identity and track id,
    and exactly that job is enqueued."""
    enqueued: list[uuid.UUID] = []
    service = _service(
        tracks=FakeTrackRepo(), artists=FakeArtistRepo(), jobs=FakeJobRepo(), enqueued=enqueued
    )
    placeholder = await service.save_remote(
        source="youtube", source_id="abc", title="A", artist="Queen", added_by=None
    )

    outcome = await service.request_materialize(placeholder.id, requested_by=None)

    job = outcome.job
    assert job is not None
    assert job.source == "youtube"
    assert job.source_id == "abc"
    assert job.track_id == placeholder.id
    assert enqueued == [job.id]
async def test_materialize_reuses_active_job() -> None:
    """When a job is already active for the source identity it is returned
    as is and nothing new is enqueued."""
    job_repo = FakeJobRepo()
    enqueued: list[uuid.UUID] = []
    service = _service(
        tracks=FakeTrackRepo(), artists=FakeArtistRepo(), jobs=job_repo, enqueued=enqueued
    )
    placeholder = await service.save_remote(
        source="youtube", source_id="abc", title="A", artist="Queen", added_by=None
    )
    # Simulate an in-flight job for the same source identity.
    in_flight = await job_repo.add(source="youtube", source_id="abc", query=None, requested_by=None)
    job_repo.active[("youtube", "abc")] = in_flight

    outcome = await service.request_materialize(placeholder.id, requested_by=None)

    assert outcome.job is not None
    assert outcome.job.id == in_flight.id
    assert enqueued == []  # not re-enqueued
async def test_materialize_missing_track_raises() -> None:
    """An unknown track id is reported as ``NotFoundError``."""
    enqueued: list[uuid.UUID] = []
    service = _service(
        tracks=FakeTrackRepo(), artists=FakeArtistRepo(), jobs=FakeJobRepo(), enqueued=enqueued
    )
    unknown_id = uuid.uuid4()

    with pytest.raises(NotFoundError):
        await service.request_materialize(unknown_id, requested_by=None)
async def test_save_remote_requires_source_id() -> None:
    """A whitespace-only ``source_id`` is rejected with ``ValidationError``."""
    enqueued: list[uuid.UUID] = []
    service = _service(
        tracks=FakeTrackRepo(), artists=FakeArtistRepo(), jobs=FakeJobRepo(), enqueued=enqueued
    )
    blank_source_id = "  "

    with pytest.raises(ValidationError):
        await service.save_remote(
            source="youtube", source_id=blank_source_id, title="A", artist=None, added_by=None
        )