Compare commits
2 Commits
78007461e1
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| e45e578f54 | |||
| 58b98ab5ed |
+65
@@ -0,0 +1,65 @@
|
||||
"""remote placeholders: track availability, album/artist remote ids
|
||||
|
||||
Revision ID: dc126696f5a6
|
||||
Revises: 20260614_dl_track_id
|
||||
Create Date: 2026-06-14 11:25:30.643588
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'dc126696f5a6'
|
||||
down_revision: str | None = '20260614_dl_track_id'
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.add_column('albums', sa.Column('source', sa.String(length=32), nullable=True))
|
||||
op.add_column('albums', sa.Column('source_id', sa.String(length=512), nullable=True))
|
||||
op.create_unique_constraint('uq_albums_source_source_id', 'albums', ['source', 'source_id'])
|
||||
op.add_column('artists', sa.Column('source', sa.String(length=32), nullable=True))
|
||||
op.add_column('artists', sa.Column('source_id', sa.String(length=512), nullable=True))
|
||||
op.create_unique_constraint('uq_artists_source_source_id', 'artists', ['source', 'source_id'])
|
||||
op.add_column(
|
||||
'tracks',
|
||||
sa.Column('availability', sa.String(length=16), nullable=False, server_default='local'),
|
||||
)
|
||||
op.alter_column('tracks', 'availability', server_default=None)
|
||||
op.alter_column('tracks', 'storage_uri',
|
||||
existing_type=sa.VARCHAR(length=2048),
|
||||
nullable=True)
|
||||
op.alter_column('tracks', 'file_format',
|
||||
existing_type=sa.VARCHAR(length=32),
|
||||
nullable=True)
|
||||
op.alter_column('tracks', 'file_size',
|
||||
existing_type=sa.INTEGER(),
|
||||
nullable=True)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.alter_column('tracks', 'file_size',
|
||||
existing_type=sa.INTEGER(),
|
||||
nullable=False)
|
||||
op.alter_column('tracks', 'file_format',
|
||||
existing_type=sa.VARCHAR(length=32),
|
||||
nullable=False)
|
||||
op.alter_column('tracks', 'storage_uri',
|
||||
existing_type=sa.VARCHAR(length=2048),
|
||||
nullable=False)
|
||||
op.drop_column('tracks', 'availability')
|
||||
op.drop_constraint('uq_artists_source_source_id', 'artists', type_='unique')
|
||||
op.drop_column('artists', 'source_id')
|
||||
op.drop_column('artists', 'source')
|
||||
op.drop_constraint('uq_albums_source_source_id', 'albums', type_='unique')
|
||||
op.drop_column('albums', 'source_id')
|
||||
op.drop_column('albums', 'source')
|
||||
# ### end Alembic commands ###
|
||||
+12
-1
@@ -17,6 +17,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from app.application.auth_service import AuthService
|
||||
from app.application.download_service import DownloadService
|
||||
from app.application.metadata_service import MetadataEnrichmentService
|
||||
from app.application.remote_library_service import RemoteLibraryService
|
||||
from app.application.streaming_service import StreamingService
|
||||
from app.application.subsonic_auth_service import SubsonicAuthService
|
||||
from app.application.upload_service import UploadService
|
||||
@@ -43,7 +44,7 @@ from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
|
||||
from app.infrastructure.metadata.tags import MutagenTagReader
|
||||
from app.infrastructure.sources.registry import SourceRegistry, build_source_registry
|
||||
from app.infrastructure.storage.provider import get_file_storage
|
||||
from app.workers.queue import enqueue_download, enqueue_enrich
|
||||
from app.workers.queue import enqueue_download, enqueue_enrich, enqueue_materialize
|
||||
|
||||
|
||||
async def get_session() -> AsyncIterator[AsyncSession]:
|
||||
@@ -172,10 +173,20 @@ def get_download_service(session: SessionDep, storage: FileStorageDep) -> Downlo
|
||||
)
|
||||
|
||||
|
||||
def get_remote_library_service(session: SessionDep) -> RemoteLibraryService:
|
||||
return RemoteLibraryService(
|
||||
tracks=SqlAlchemyTrackRepository(session),
|
||||
artists=SqlAlchemyArtistRepository(session),
|
||||
jobs=SqlAlchemyDownloadJobRepository(session),
|
||||
enqueue_materialize=enqueue_materialize,
|
||||
)
|
||||
|
||||
|
||||
UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)]
|
||||
StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)]
|
||||
MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)]
|
||||
DownloadServiceDep = Annotated[DownloadService, Depends(get_download_service)]
|
||||
RemoteLibraryServiceDep = Annotated[RemoteLibraryService, Depends(get_remote_library_service)]
|
||||
|
||||
|
||||
# -- library repository deps ---------------------------------------------------
|
||||
|
||||
@@ -65,7 +65,7 @@ async def download(
|
||||
if track is None:
|
||||
raise NotFoundError("Song not found.")
|
||||
result = await service.open_stream(track_id, None)
|
||||
filename = f"{track.title}.{track.file_format}"
|
||||
filename = f"{track.title}.{track.file_format or 'bin'}"
|
||||
headers = {
|
||||
"Content-Length": str(result.content_length),
|
||||
"Content-Disposition": f'attachment; filename="{filename}"',
|
||||
|
||||
@@ -80,8 +80,8 @@ def song_dict(
|
||||
"albumId": encode_album(track.album_id) if track.album_id is not None else None,
|
||||
"artistId": encode_artist(track.artist_id),
|
||||
"coverArt": cover,
|
||||
"size": track.file_size,
|
||||
"contentType": content_type_for(track.file_format),
|
||||
"size": track.file_size or 0,
|
||||
"contentType": content_type_for(track.file_format or ""),
|
||||
"suffix": track.file_format,
|
||||
"duration": track.duration_seconds,
|
||||
"year": track.year,
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
"""Schemas for searching external (fetch) sources — the §A4 discover screen."""
|
||||
|
||||
import uuid
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.domain.entities.track import Track
|
||||
from app.domain.sources import SearchResult
|
||||
|
||||
|
||||
@@ -13,9 +16,16 @@ class ExternalSearchResultOut(BaseModel):
|
||||
album: str | None
|
||||
duration_seconds: int | None
|
||||
thumbnail_url: str | None
|
||||
# Remote browse (plan: Model C) — set when this hit is already saved in the
|
||||
# library, so the UI can show "Play"/"Saved" instead of "Save to library".
|
||||
in_library: bool
|
||||
track_id: uuid.UUID | None
|
||||
availability: str | None
|
||||
|
||||
@classmethod
|
||||
def from_entity(cls, r: SearchResult) -> ExternalSearchResultOut:
|
||||
def from_entity(
|
||||
cls, r: SearchResult, *, existing: Track | None = None
|
||||
) -> ExternalSearchResultOut:
|
||||
return cls(
|
||||
source=r.source,
|
||||
source_id=r.source_id,
|
||||
@@ -24,6 +34,9 @@ class ExternalSearchResultOut(BaseModel):
|
||||
album=r.album,
|
||||
duration_seconds=r.duration_seconds,
|
||||
thumbnail_url=r.thumbnail_url,
|
||||
in_library=existing is not None,
|
||||
track_id=existing.id if existing is not None else None,
|
||||
availability=existing.availability if existing is not None else None,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -3,7 +3,9 @@
|
||||
import datetime as dt
|
||||
import uuid
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from app.api.schemas.download import DownloadJobOut
|
||||
|
||||
|
||||
class TrackOut(BaseModel):
|
||||
@@ -14,14 +16,15 @@ class TrackOut(BaseModel):
|
||||
album_id: uuid.UUID | None
|
||||
album_title: str | None
|
||||
duration_seconds: int | None
|
||||
file_format: str
|
||||
file_size: int
|
||||
file_format: str | None
|
||||
file_size: int | None
|
||||
genre: str | None
|
||||
year: int | None
|
||||
track_number: int | None
|
||||
metadata_status: str
|
||||
metadata_error: str | None
|
||||
enriched_at: dt.datetime | None
|
||||
availability: str
|
||||
source: str
|
||||
has_cover: bool
|
||||
created_at: dt.datetime
|
||||
@@ -61,3 +64,24 @@ class MetadataApply(BaseModel):
|
||||
year: int | None = None
|
||||
genre: str | None = None
|
||||
track_number: int | None = None
|
||||
|
||||
|
||||
class RemoteTrackSave(BaseModel):
|
||||
"""Save a remote browse hit (§A4 discover) as a library placeholder —
|
||||
``availability="remote"``, no audio until first play (plan: Model C)."""
|
||||
|
||||
source: str
|
||||
source_id: str = Field(min_length=1)
|
||||
title: str
|
||||
artist: str | None = None
|
||||
|
||||
|
||||
class MaterializeResponse(BaseModel):
|
||||
"""Result of requesting that a placeholder track's audio be fetched.
|
||||
|
||||
``job`` is ``None`` when the track is already ``local`` — nothing to wait
|
||||
for, the caller can stream immediately. Otherwise it's the (new or
|
||||
already in-flight) job; poll ``GET /downloads/{job.id}`` until ``done``."""
|
||||
|
||||
track: TrackOut
|
||||
job: DownloadJobOut | None
|
||||
|
||||
@@ -18,6 +18,7 @@ router = APIRouter(prefix="/search", tags=["search"])
|
||||
async def search(
|
||||
_: CurrentUser,
|
||||
registry: SourceRegistryDep,
|
||||
track_repo: TrackRepoDep,
|
||||
q: str = Query(min_length=1),
|
||||
limit: int = Query(20, ge=1, le=50),
|
||||
) -> ExternalSearchResponse:
|
||||
@@ -25,7 +26,9 @@ async def search(
|
||||
|
||||
A source that is down contributes nothing rather than failing the whole
|
||||
request (graceful degradation); only available sources are reported as
|
||||
searched."""
|
||||
searched. Each hit is checked against the library by ``(source,
|
||||
source_id)`` so the UI can show "Saved"/"Play" instead of "Save to
|
||||
library" without a separate round-trip (remote browse, plan: Model C)."""
|
||||
results: list[ExternalSearchResultOut] = []
|
||||
searched: list[str] = []
|
||||
for backend in registry.searchables():
|
||||
@@ -33,7 +36,9 @@ async def search(
|
||||
continue
|
||||
searched.append(backend.name)
|
||||
hits = await backend.search(q, limit=limit)
|
||||
results.extend(ExternalSearchResultOut.from_entity(h) for h in hits)
|
||||
for h in hits:
|
||||
existing = await track_repo.get_by_source(h.source, h.source_id)
|
||||
results.append(ExternalSearchResultOut.from_entity(h, existing=existing))
|
||||
return ExternalSearchResponse(results=results, searched_sources=searched)
|
||||
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ is an admin action and runs in a worker — the endpoint only enqueues it.
|
||||
|
||||
from fastapi import APIRouter, Query
|
||||
|
||||
from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser
|
||||
from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser, TrackRepoDep
|
||||
from app.api.schemas.external_search import ExternalSearchResponse, ExternalSearchResultOut
|
||||
from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut
|
||||
from app.domain.errors import DependencyUnavailableError
|
||||
@@ -42,6 +42,7 @@ async def search_source(
|
||||
source: str,
|
||||
_: CurrentUser,
|
||||
registry: SourceRegistryDep,
|
||||
track_repo: TrackRepoDep,
|
||||
q: str = Query(min_length=1),
|
||||
limit: int = Query(20, ge=1, le=50),
|
||||
) -> ExternalSearchResponse:
|
||||
@@ -49,7 +50,8 @@ async def search_source(
|
||||
if not backend.is_available():
|
||||
raise DependencyUnavailableError(f"Source {source!r} is not available.")
|
||||
results = await backend.search(q, limit=limit)
|
||||
return ExternalSearchResponse(
|
||||
results=[ExternalSearchResultOut.from_entity(r) for r in results],
|
||||
searched_sources=[source],
|
||||
)
|
||||
out: list[ExternalSearchResultOut] = []
|
||||
for r in results:
|
||||
existing = await track_repo.get_by_source(r.source, r.source_id)
|
||||
out.append(ExternalSearchResultOut.from_entity(r, existing=existing))
|
||||
return ExternalSearchResponse(results=out, searched_sources=[source])
|
||||
|
||||
+58
-1
@@ -13,14 +13,18 @@ from app.api.deps import (
|
||||
CurrentUser,
|
||||
FileStorageDep,
|
||||
MetadataServiceDep,
|
||||
RemoteLibraryServiceDep,
|
||||
StreamUser,
|
||||
TrackRepoDep,
|
||||
)
|
||||
from app.api.schemas.download import DownloadJobOut
|
||||
from app.api.schemas.pagination import PagedResponse
|
||||
from app.api.schemas.track import (
|
||||
MaterializeResponse,
|
||||
MetadataApply,
|
||||
MetadataMatch,
|
||||
MetadataMatchesOut,
|
||||
RemoteTrackSave,
|
||||
TrackOut,
|
||||
TrackUpdate,
|
||||
)
|
||||
@@ -54,6 +58,7 @@ async def _build_track_out(
|
||||
metadata_status=t.metadata_status,
|
||||
metadata_error=t.metadata_error,
|
||||
enriched_at=t.enriched_at,
|
||||
availability=t.availability,
|
||||
source=t.source,
|
||||
has_cover=bool(t.album_id and albums.get(t.album_id) and albums[t.album_id].cover_path),
|
||||
created_at=t.created_at,
|
||||
@@ -98,6 +103,57 @@ async def list_tracks(
|
||||
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
|
||||
|
||||
|
||||
@router.post("/remote", status_code=201)
|
||||
async def save_remote_track(
|
||||
body: RemoteTrackSave,
|
||||
service: RemoteLibraryServiceDep,
|
||||
artist_repo: ArtistRepoDep,
|
||||
album_repo: AlbumRepoDep,
|
||||
user: CurrentUser,
|
||||
) -> TrackOut:
|
||||
"""Save a remote browse hit (§A4 discover) as a library placeholder —
|
||||
no audio is fetched yet (plan: Model C). Idempotent on ``(source,
|
||||
source_id)``: saving an already-saved hit returns the existing track."""
|
||||
track = await service.save_remote(
|
||||
source=body.source,
|
||||
source_id=body.source_id,
|
||||
title=body.title,
|
||||
artist=body.artist,
|
||||
added_by=user.id,
|
||||
)
|
||||
|
||||
artists = {a.id: a for a in await artist_repo.get_many([track.artist_id])}
|
||||
album_ids = [track.album_id] if track.album_id else []
|
||||
albums = {a.id: a for a in await album_repo.get_many(album_ids)}
|
||||
items = await _build_track_out([track], artists, albums)
|
||||
return items[0]
|
||||
|
||||
|
||||
@router.post("/{track_id}/materialize")
|
||||
async def materialize_track(
|
||||
track_id: uuid.UUID,
|
||||
service: RemoteLibraryServiceDep,
|
||||
artist_repo: ArtistRepoDep,
|
||||
album_repo: AlbumRepoDep,
|
||||
user: CurrentUser,
|
||||
) -> MaterializeResponse:
|
||||
"""Fetch a placeholder track's audio on demand (plan: Model C lazy
|
||||
materialization). Already-local tracks return ``job=None`` — nothing to
|
||||
wait for. Otherwise poll ``GET /downloads/{job.id}`` until ``done``, then
|
||||
stream as usual."""
|
||||
outcome = await service.request_materialize(track_id, requested_by=user.id)
|
||||
|
||||
artists = {a.id: a for a in await artist_repo.get_many([outcome.track.artist_id])}
|
||||
album_ids = [outcome.track.album_id] if outcome.track.album_id else []
|
||||
albums = {a.id: a for a in await album_repo.get_many(album_ids)}
|
||||
track_out = (await _build_track_out([outcome.track], artists, albums))[0]
|
||||
|
||||
return MaterializeResponse(
|
||||
track=track_out,
|
||||
job=DownloadJobOut.from_entity(outcome.job) if outcome.job is not None else None,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/{track_id}")
|
||||
async def get_track(
|
||||
track_id: uuid.UUID,
|
||||
@@ -155,7 +211,8 @@ async def delete_track(
|
||||
if track is None:
|
||||
raise NotFoundError(f"Track {track_id} not found.")
|
||||
await track_repo.delete(track_id)
|
||||
await storage.delete(track.storage_uri)
|
||||
if track.storage_uri is not None:
|
||||
await storage.delete(track.storage_uri)
|
||||
return Response(status_code=204)
|
||||
|
||||
|
||||
|
||||
@@ -79,9 +79,13 @@ class MetadataEnrichmentService:
|
||||
if track.metadata_status == "manual":
|
||||
log.info("enrich_skip_manual", track_id=str(track_id))
|
||||
return EnrichmentResult(track_id=track_id, status="skipped")
|
||||
storage_uri = track.storage_uri
|
||||
if storage_uri is None:
|
||||
log.info("enrich_skip_remote", track_id=str(track_id))
|
||||
return EnrichmentResult(track_id=track_id, status="skipped")
|
||||
|
||||
tags = await self._read_local(track.storage_uri)
|
||||
match = await self._identify(track.storage_uri)
|
||||
tags = await self._read_local(storage_uri)
|
||||
match = await self._identify(storage_uri)
|
||||
|
||||
# Merge order is tag-first by default — embedded tags fix the common
|
||||
# well-tagged offline case. But a *high-confidence* AcoustID match is the
|
||||
@@ -125,7 +129,7 @@ class MetadataEnrichmentService:
|
||||
if album is not None:
|
||||
await self._resolve_cover(
|
||||
album,
|
||||
storage_uri=track.storage_uri,
|
||||
storage_uri=storage_uri,
|
||||
release_group_mbid=match.release_group_mbid if match else None,
|
||||
)
|
||||
|
||||
@@ -175,6 +179,8 @@ class MetadataEnrichmentService:
|
||||
return []
|
||||
if not self._acoustid.is_available() or not self._fingerprinter.is_available():
|
||||
return []
|
||||
if track.storage_uri is None:
|
||||
return []
|
||||
try:
|
||||
async with self._storage.as_local_path(track.storage_uri) as path:
|
||||
fingerprint = await self._fingerprinter.calculate(path)
|
||||
|
||||
@@ -0,0 +1,122 @@
|
||||
"""RemoteLibraryService — save-to-library + materialize for remote browse hits
|
||||
(plan: Model C, on-demand YTM library).
|
||||
|
||||
Two operations:
|
||||
|
||||
* ``save_remote`` persists a placeholder ``Track`` (``availability="remote"``,
|
||||
``storage_uri=None``) for a remote browse hit. Idempotent on
|
||||
``(source, source_id)`` — CLAUDE.md dedup.
|
||||
* ``request_materialize`` lazily fills a placeholder's audio in place: it
|
||||
creates (or reuses) a ``DownloadJob`` pointing at the existing track and
|
||||
enqueues the materialize worker, which calls ``TrackRepository.materialize``
|
||||
on completion. ``track.id`` never changes (CLAUDE.md), so likes/playlists/
|
||||
queue entries referencing the placeholder keep working once it's filled in.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from collections.abc import Awaitable, Callable
|
||||
from dataclasses import dataclass
|
||||
|
||||
from app.domain.entities.download import DownloadJob
|
||||
from app.domain.entities.track import Track
|
||||
from app.domain.errors import NotFoundError, ValidationError
|
||||
from app.domain.ports import ArtistRepository, DownloadJobRepository, TrackRepository
|
||||
|
||||
_UNKNOWN_ARTIST = "Unknown Artist"
|
||||
|
||||
# (job_id) -> None — enqueue the materialize worker, same deferred pattern as
|
||||
# download/enrich enqueuers.
|
||||
MaterializeEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MaterializeOutcome:
|
||||
"""Result of requesting materialization.
|
||||
|
||||
``job`` is ``None`` when the track is already ``local`` — nothing to do,
|
||||
the caller can stream immediately. Otherwise it's the (new or already
|
||||
in-flight) job filling the placeholder."""
|
||||
|
||||
track: Track
|
||||
job: DownloadJob | None
|
||||
|
||||
|
||||
class RemoteLibraryService:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
tracks: TrackRepository,
|
||||
artists: ArtistRepository,
|
||||
jobs: DownloadJobRepository,
|
||||
enqueue_materialize: MaterializeEnqueuer | None = None,
|
||||
) -> None:
|
||||
self._tracks = tracks
|
||||
self._artists = artists
|
||||
self._jobs = jobs
|
||||
self._enqueue_materialize = enqueue_materialize
|
||||
|
||||
async def save_remote(
|
||||
self,
|
||||
*,
|
||||
source: str,
|
||||
source_id: str,
|
||||
title: str,
|
||||
artist: str | None,
|
||||
added_by: uuid.UUID | None,
|
||||
) -> Track:
|
||||
"""Persist a placeholder for a remote browse hit. Idempotent: a hit
|
||||
already saved (by ``(source, source_id)``) is returned as-is."""
|
||||
source_id = source_id.strip()
|
||||
if not source_id:
|
||||
raise ValidationError("A source_id is required to save.")
|
||||
|
||||
existing = await self._tracks.get_by_source(source, source_id)
|
||||
if existing is not None:
|
||||
return existing
|
||||
|
||||
artist_entity = await self._artists.get_or_create(artist or _UNKNOWN_ARTIST)
|
||||
return await self._tracks.add(
|
||||
id=uuid.uuid4(),
|
||||
title=title,
|
||||
artist_id=artist_entity.id,
|
||||
storage_uri=None,
|
||||
file_format=None,
|
||||
file_size=None,
|
||||
source=source,
|
||||
source_id=source_id,
|
||||
metadata_status="pending",
|
||||
added_by=added_by,
|
||||
availability="remote",
|
||||
)
|
||||
|
||||
async def request_materialize(
|
||||
self, track_id: uuid.UUID, *, requested_by: uuid.UUID | None
|
||||
) -> MaterializeOutcome:
|
||||
"""Kick off (or report on) materializing a placeholder track.
|
||||
|
||||
Already-local tracks are a no-op (``job=None``). A track with no
|
||||
remote ``source_id`` (e.g. a deleted upload row reused for something
|
||||
else) can't be materialized."""
|
||||
track = await self._tracks.get_by_id(track_id)
|
||||
if track is None:
|
||||
raise NotFoundError(f"Track {track_id} not found.")
|
||||
if track.availability == "local":
|
||||
return MaterializeOutcome(track=track, job=None)
|
||||
if track.source_id is None:
|
||||
raise ValidationError("Track has no remote source to materialize from.")
|
||||
|
||||
active = await self._jobs.get_active_for_source(track.source, track.source_id)
|
||||
if active is not None:
|
||||
return MaterializeOutcome(track=track, job=active)
|
||||
|
||||
job = await self._jobs.add(
|
||||
source=track.source,
|
||||
source_id=track.source_id,
|
||||
query=None,
|
||||
requested_by=requested_by,
|
||||
)
|
||||
await self._jobs.set_status(job.id, status="queued", track_id=track.id)
|
||||
if self._enqueue_materialize is not None:
|
||||
await self._enqueue_materialize(job.id)
|
||||
refreshed = await self._jobs.get_by_id(job.id)
|
||||
return MaterializeOutcome(track=track, job=refreshed if refreshed is not None else job)
|
||||
@@ -72,16 +72,19 @@ class StreamingService:
|
||||
track = await self._tracks.get_by_id(track_id)
|
||||
if track is None:
|
||||
raise NotFoundError("Track not found.")
|
||||
storage_uri = track.storage_uri
|
||||
if storage_uri is None:
|
||||
raise NotFoundError("Track is not yet downloaded.")
|
||||
|
||||
stat = await self._storage.stat(track.storage_uri)
|
||||
stat = await self._storage.stat(storage_uri)
|
||||
total_size = stat.size
|
||||
content_type = stat.content_type or _FORMAT_CONTENT_TYPE.get(
|
||||
track.file_format.lower(), "application/octet-stream"
|
||||
(track.file_format or "").lower(), "application/octet-stream"
|
||||
)
|
||||
|
||||
start, end, is_partial = _parse_range(range_header, total_size)
|
||||
|
||||
stream, _ = await self._storage.open_range(track.storage_uri, start, end)
|
||||
stream, _ = await self._storage.open_range(storage_uri, start, end)
|
||||
|
||||
actual_end = end if end is not None else total_size - 1
|
||||
content_length = actual_end - start + 1
|
||||
|
||||
@@ -13,5 +13,7 @@ class Album:
|
||||
year: int | None
|
||||
cover_path: str | None
|
||||
musicbrainz_id: str | None
|
||||
source: str | None
|
||||
source_id: str | None
|
||||
created_at: dt.datetime
|
||||
updated_at: dt.datetime
|
||||
|
||||
@@ -9,6 +9,8 @@ from dataclasses import dataclass
|
||||
class Artist:
|
||||
id: uuid.UUID
|
||||
name: str
|
||||
source: str | None
|
||||
source_id: str | None
|
||||
created_at: dt.datetime
|
||||
updated_at: dt.datetime
|
||||
|
||||
@@ -19,9 +21,9 @@ class Track:
|
||||
title: str
|
||||
artist_id: uuid.UUID
|
||||
album_id: uuid.UUID | None
|
||||
storage_uri: str
|
||||
file_format: str
|
||||
file_size: int
|
||||
storage_uri: str | None
|
||||
file_format: str | None
|
||||
file_size: int | None
|
||||
source: str
|
||||
source_id: str
|
||||
duration_seconds: int | None
|
||||
@@ -31,5 +33,6 @@ class Track:
|
||||
metadata_status: str
|
||||
metadata_error: str | None
|
||||
enriched_at: dt.datetime | None
|
||||
availability: str
|
||||
created_at: dt.datetime
|
||||
updated_at: dt.datetime
|
||||
|
||||
+36
-3
@@ -114,6 +114,11 @@ class FileStorage(Protocol):
|
||||
|
||||
class ArtistRepository(Protocol):
|
||||
async def get_or_create(self, name: str) -> Artist: ...
|
||||
async def get_or_create_remote(self, *, name: str, source: str, source_id: str) -> Artist:
|
||||
"""Resolve/create an artist bound to a remote ``(source, source_id)``
|
||||
(lazy materialization save-to-library)."""
|
||||
...
|
||||
|
||||
async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None: ...
|
||||
async def get_many(self, ids: list[uuid.UUID]) -> list[Artist]: ...
|
||||
async def list(self, *, q: str | None, limit: int, offset: int) -> list[Artist]: ...
|
||||
@@ -131,14 +136,28 @@ class TrackRepository(Protocol):
|
||||
id: uuid.UUID,
|
||||
title: str,
|
||||
artist_id: uuid.UUID,
|
||||
storage_uri: str,
|
||||
file_format: str,
|
||||
file_size: int,
|
||||
storage_uri: str | None,
|
||||
file_format: str | None,
|
||||
file_size: int | None,
|
||||
source: str,
|
||||
source_id: str,
|
||||
metadata_status: str,
|
||||
added_by: uuid.UUID | None,
|
||||
availability: str = ...,
|
||||
) -> Track: ...
|
||||
async def materialize(
|
||||
self,
|
||||
track_id: uuid.UUID,
|
||||
*,
|
||||
storage_uri: str,
|
||||
file_format: str,
|
||||
file_size: int,
|
||||
bitrate: int | None,
|
||||
) -> Track:
|
||||
"""Fill in a remote placeholder's audio fields after a download
|
||||
(lazy materialization), flipping ``availability`` to ``local``."""
|
||||
...
|
||||
|
||||
async def delete(self, track_id: uuid.UUID) -> None: ...
|
||||
# genres / library_stats must come before ``list`` — the method named
|
||||
# ``list`` shadows the builtin in later annotations (same pattern as
|
||||
@@ -212,6 +231,20 @@ class AlbumRepository(Protocol):
|
||||
year: int | None,
|
||||
musicbrainz_id: str | None,
|
||||
) -> Album: ...
|
||||
async def get_or_create_remote(
|
||||
self,
|
||||
*,
|
||||
title: str,
|
||||
artist_id: uuid.UUID,
|
||||
year: int | None,
|
||||
musicbrainz_id: str | None,
|
||||
source: str,
|
||||
source_id: str,
|
||||
) -> Album:
|
||||
"""Resolve/create an album bound to a remote ``(source, source_id)``
|
||||
(lazy materialization save-to-library)."""
|
||||
...
|
||||
|
||||
async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None: ...
|
||||
async def get_by_id(self, album_id: uuid.UUID) -> Album | None: ...
|
||||
async def get_many(self, ids: list[uuid.UUID]) -> list[Album]: ...
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import uuid
|
||||
|
||||
from sqlalchemy import ForeignKey, Integer, String
|
||||
from sqlalchemy import ForeignKey, Integer, String, UniqueConstraint
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from app.infrastructure.db.base import Base
|
||||
@@ -11,6 +11,12 @@ from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMi
|
||||
|
||||
class AlbumModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
|
||||
__tablename__ = "albums"
|
||||
__table_args__ = (
|
||||
# Binds a remote (browsable) album to its local row for re-browse/save
|
||||
# dedup. Multiple NULLs are allowed by Postgres, so locally-created
|
||||
# albums (source/source_id both NULL) never collide on this.
|
||||
UniqueConstraint("source", "source_id", name="uq_albums_source_source_id"),
|
||||
)
|
||||
|
||||
title: Mapped[str] = mapped_column(String(1024), index=True, nullable=False)
|
||||
artist_id: Mapped[uuid.UUID] = mapped_column(
|
||||
@@ -21,3 +27,7 @@ class AlbumModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
|
||||
year: Mapped[int | None] = mapped_column(Integer, nullable=True)
|
||||
cover_path: Mapped[str | None] = mapped_column(String(1024), nullable=True)
|
||||
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
|
||||
|
||||
# -- remote identity (lazy materialization) --------------------------
|
||||
source: Mapped[str | None] = mapped_column(String(32), nullable=True)
|
||||
source_id: Mapped[str | None] = mapped_column(String(512), nullable=True)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""ORM model for artists."""
|
||||
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy import String, UniqueConstraint
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from app.infrastructure.db.base import Base
|
||||
@@ -9,6 +9,16 @@ from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMi
|
||||
|
||||
class ArtistModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
|
||||
__tablename__ = "artists"
|
||||
__table_args__ = (
|
||||
# Binds a remote (browsable) artist to its local row for re-browse/save
|
||||
# dedup. Multiple NULLs are allowed by Postgres, so locally-created
|
||||
# artists (source/source_id both NULL) never collide on this.
|
||||
UniqueConstraint("source", "source_id", name="uq_artists_source_source_id"),
|
||||
)
|
||||
|
||||
name: Mapped[str] = mapped_column(String(512), index=True, nullable=False)
|
||||
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
|
||||
|
||||
# -- remote identity (lazy materialization) --------------------------
|
||||
source: Mapped[str | None] = mapped_column(String(32), nullable=True)
|
||||
source_id: Mapped[str | None] = mapped_column(String(512), nullable=True)
|
||||
|
||||
@@ -64,3 +64,12 @@ class LyricsStatus(enum.StrEnum):
|
||||
FOUND = "found"
|
||||
NOT_FOUND = "not_found"
|
||||
PENDING = "pending"
|
||||
|
||||
|
||||
class TrackAvailability(enum.StrEnum):
|
||||
"""Whether a track's audio is on local storage or still a remote placeholder
|
||||
(plan: lazy materialization). ``remote`` tracks have ``storage_uri = NULL``
|
||||
until ``TrackRepository.materialize`` fills it in."""
|
||||
|
||||
LOCAL = "local"
|
||||
REMOTE = "remote"
|
||||
|
||||
@@ -13,7 +13,7 @@ from sqlalchemy import DateTime, ForeignKey, Integer, String, UniqueConstraint
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from app.infrastructure.db.base import Base
|
||||
from app.infrastructure.db.models.enums import MetadataStatus, StoragePolicy
|
||||
from app.infrastructure.db.models.enums import MetadataStatus, StoragePolicy, TrackAvailability
|
||||
from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMixin
|
||||
|
||||
|
||||
@@ -41,11 +41,20 @@ class TrackModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
|
||||
year: Mapped[int | None] = mapped_column(Integer, nullable=True)
|
||||
|
||||
# -- file (original, stored as-is) -----------------------------------
|
||||
storage_uri: Mapped[str] = mapped_column(String(2048), nullable=False)
|
||||
file_format: Mapped[str] = mapped_column(String(32), nullable=False)
|
||||
file_size: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
# NULL on a remote placeholder (not yet materialized) — see ``availability``.
|
||||
storage_uri: Mapped[str | None] = mapped_column(String(2048), nullable=True)
|
||||
file_format: Mapped[str | None] = mapped_column(String(32), nullable=True)
|
||||
file_size: Mapped[int | None] = mapped_column(Integer, nullable=True)
|
||||
bitrate: Mapped[int | None] = mapped_column(Integer, nullable=True)
|
||||
|
||||
# ``remote`` = placeholder with no local audio yet; materialize() flips this
|
||||
# to ``local`` once the file is downloaded and ``storage_uri`` is filled in.
|
||||
availability: Mapped[str] = mapped_column(
|
||||
String(16),
|
||||
nullable=False,
|
||||
default=TrackAvailability.LOCAL.value,
|
||||
)
|
||||
|
||||
# -- dedup / external ids --------------------------------------------
|
||||
acoustid_fingerprint: Mapped[str | None] = mapped_column(String(64), index=True, nullable=True)
|
||||
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
|
||||
|
||||
@@ -18,6 +18,8 @@ def _to_entity(row: AlbumModel) -> Album:
|
||||
year=row.year,
|
||||
cover_path=row.cover_path,
|
||||
musicbrainz_id=row.musicbrainz_id,
|
||||
source=row.source,
|
||||
source_id=row.source_id,
|
||||
created_at=row.created_at,
|
||||
updated_at=row.updated_at,
|
||||
)
|
||||
@@ -63,6 +65,58 @@ class SqlAlchemyAlbumRepository:
|
||||
await self._session.refresh(row)
|
||||
return _to_entity(row)
|
||||
|
||||
async def get_or_create_remote(
|
||||
self,
|
||||
*,
|
||||
title: str,
|
||||
artist_id: uuid.UUID,
|
||||
year: int | None,
|
||||
musicbrainz_id: str | None,
|
||||
source: str,
|
||||
source_id: str,
|
||||
) -> Album:
|
||||
"""Resolve an album by ``(source, source_id)`` first (re-browse/save
|
||||
dedup), falling back to ``(title, artist_id)`` and gap-filling the
|
||||
remote ids onto an existing row, else creating a new remote-bound row."""
|
||||
row = (
|
||||
await self._session.execute(
|
||||
select(AlbumModel).where(
|
||||
AlbumModel.source == source,
|
||||
AlbumModel.source_id == source_id,
|
||||
)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if row is None:
|
||||
row = (
|
||||
await self._session.execute(
|
||||
select(AlbumModel).where(
|
||||
AlbumModel.title == title,
|
||||
AlbumModel.artist_id == artist_id,
|
||||
)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if row is None:
|
||||
row = AlbumModel(
|
||||
title=title,
|
||||
artist_id=artist_id,
|
||||
year=year,
|
||||
musicbrainz_id=musicbrainz_id,
|
||||
source=source,
|
||||
source_id=source_id,
|
||||
)
|
||||
self._session.add(row)
|
||||
else:
|
||||
if row.year is None and year is not None:
|
||||
row.year = year
|
||||
if row.musicbrainz_id is None and musicbrainz_id is not None:
|
||||
row.musicbrainz_id = musicbrainz_id
|
||||
if row.source is None and row.source_id is None:
|
||||
row.source = source
|
||||
row.source_id = source_id
|
||||
await self._session.flush()
|
||||
await self._session.refresh(row)
|
||||
return _to_entity(row)
|
||||
|
||||
async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None:
|
||||
row = await self._session.get(AlbumModel, album_id)
|
||||
if row is not None:
|
||||
|
||||
@@ -15,6 +15,8 @@ def _to_entity(row: ArtistModel) -> Artist:
|
||||
return Artist(
|
||||
id=row.id,
|
||||
name=row.name,
|
||||
source=row.source,
|
||||
source_id=row.source_id,
|
||||
created_at=row.created_at,
|
||||
updated_at=row.updated_at,
|
||||
)
|
||||
@@ -35,6 +37,32 @@ class SqlAlchemyArtistRepository:
|
||||
await self._session.refresh(row)
|
||||
return _to_entity(row)
|
||||
|
||||
async def get_or_create_remote(self, *, name: str, source: str, source_id: str) -> Artist:
|
||||
"""Resolve an artist by ``(source, source_id)`` first (re-browse/save
|
||||
dedup), falling back to ``name`` and gap-filling the remote ids onto an
|
||||
existing row, else creating a new remote-bound row."""
|
||||
row = (
|
||||
await self._session.execute(
|
||||
select(ArtistModel).where(
|
||||
ArtistModel.source == source,
|
||||
ArtistModel.source_id == source_id,
|
||||
)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if row is None:
|
||||
row = (
|
||||
await self._session.execute(select(ArtistModel).where(ArtistModel.name == name))
|
||||
).scalar_one_or_none()
|
||||
if row is None:
|
||||
row = ArtistModel(name=name, source=source, source_id=source_id)
|
||||
self._session.add(row)
|
||||
elif row.source is None and row.source_id is None:
|
||||
row.source = source
|
||||
row.source_id = source_id
|
||||
await self._session.flush()
|
||||
await self._session.refresh(row)
|
||||
return _to_entity(row)
|
||||
|
||||
async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None:
|
||||
row = await self._session.get(ArtistModel, artist_id)
|
||||
return _to_entity(row) if row is not None else None
|
||||
|
||||
@@ -42,6 +42,7 @@ def _track_to_entity(row: TrackModel) -> Track:
|
||||
metadata_status=row.metadata_status,
|
||||
metadata_error=row.metadata_error,
|
||||
enriched_at=row.enriched_at,
|
||||
availability=row.availability,
|
||||
created_at=row.created_at,
|
||||
updated_at=row.updated_at,
|
||||
)
|
||||
|
||||
@@ -41,6 +41,7 @@ def _track_to_entity(row: TrackModel) -> Track:
|
||||
metadata_status=row.metadata_status,
|
||||
metadata_error=row.metadata_error,
|
||||
enriched_at=row.enriched_at,
|
||||
availability=row.availability,
|
||||
created_at=row.created_at,
|
||||
updated_at=row.updated_at,
|
||||
)
|
||||
|
||||
@@ -10,6 +10,7 @@ from app.domain.entities.storage import FormatBreakdown, LibraryStats
|
||||
from app.domain.entities.track import Track
|
||||
from app.domain.errors import NotFoundError
|
||||
from app.infrastructure.db.models.artist import ArtistModel
|
||||
from app.infrastructure.db.models.enums import TrackAvailability
|
||||
from app.infrastructure.db.models.track import TrackModel
|
||||
|
||||
|
||||
@@ -31,6 +32,7 @@ def _to_entity(row: TrackModel) -> Track:
|
||||
metadata_status=row.metadata_status,
|
||||
metadata_error=row.metadata_error,
|
||||
enriched_at=row.enriched_at,
|
||||
availability=row.availability,
|
||||
created_at=row.created_at,
|
||||
updated_at=row.updated_at,
|
||||
)
|
||||
@@ -61,13 +63,14 @@ class SqlAlchemyTrackRepository:
|
||||
id: uuid.UUID,
|
||||
title: str,
|
||||
artist_id: uuid.UUID,
|
||||
storage_uri: str,
|
||||
file_format: str,
|
||||
file_size: int,
|
||||
storage_uri: str | None,
|
||||
file_format: str | None,
|
||||
file_size: int | None,
|
||||
source: str,
|
||||
source_id: str,
|
||||
metadata_status: str,
|
||||
added_by: uuid.UUID | None,
|
||||
availability: str = TrackAvailability.LOCAL.value,
|
||||
) -> Track:
|
||||
row = TrackModel(
|
||||
id=id,
|
||||
@@ -80,12 +83,38 @@ class SqlAlchemyTrackRepository:
|
||||
source_id=source_id,
|
||||
metadata_status=metadata_status,
|
||||
added_by=added_by,
|
||||
availability=availability,
|
||||
)
|
||||
self._session.add(row)
|
||||
await self._session.flush()
|
||||
await self._session.refresh(row)
|
||||
return _to_entity(row)
|
||||
|
||||
async def materialize(
|
||||
self,
|
||||
track_id: uuid.UUID,
|
||||
*,
|
||||
storage_uri: str,
|
||||
file_format: str,
|
||||
file_size: int,
|
||||
bitrate: int | None,
|
||||
) -> Track:
|
||||
"""Fill in a remote placeholder's audio fields after a download (lazy
|
||||
materialization). ``track.id`` is unchanged, so likes/playlists/queue
|
||||
entries that already reference it keep working."""
|
||||
row = await self._session.get(TrackModel, track_id)
|
||||
if row is None:
|
||||
raise NotFoundError(f"Track {track_id} not found.")
|
||||
row.storage_uri = storage_uri
|
||||
row.file_format = file_format
|
||||
row.file_size = file_size
|
||||
if bitrate is not None:
|
||||
row.bitrate = bitrate
|
||||
row.availability = TrackAvailability.LOCAL.value
|
||||
await self._session.flush()
|
||||
await self._session.refresh(row)
|
||||
return _to_entity(row)
|
||||
|
||||
async def delete(self, track_id: uuid.UUID) -> None:
|
||||
row = await self._session.get(TrackModel, track_id)
|
||||
if row is not None:
|
||||
@@ -130,6 +159,7 @@ class SqlAlchemyTrackRepository:
|
||||
func.count(TrackModel.id),
|
||||
func.coalesce(func.sum(TrackModel.file_size), 0),
|
||||
)
|
||||
.where(TrackModel.file_format.is_not(None))
|
||||
.group_by(TrackModel.file_format)
|
||||
.order_by(func.sum(TrackModel.file_size).desc())
|
||||
)
|
||||
|
||||
@@ -12,6 +12,7 @@ from app.core.logging import configure_logging, get_logger
|
||||
from app.workers.tasks.download_task import download_track
|
||||
from app.workers.tasks.enrich_task import enrich_track
|
||||
from app.workers.tasks.import_task import scan_local_folder
|
||||
from app.workers.tasks.materialize_task import materialize_track
|
||||
|
||||
log = get_logger("worker")
|
||||
|
||||
@@ -27,7 +28,12 @@ async def shutdown(_ctx: dict[str, Any]) -> None:
|
||||
|
||||
|
||||
class WorkerSettings:
|
||||
functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track, download_track]
|
||||
functions: ClassVar[list[Any]] = [
|
||||
scan_local_folder,
|
||||
enrich_track,
|
||||
download_track,
|
||||
materialize_track,
|
||||
]
|
||||
on_startup = startup
|
||||
on_shutdown = shutdown
|
||||
max_jobs = get_settings().max_parallel_downloads
|
||||
|
||||
@@ -48,6 +48,17 @@ async def enqueue_download(job_id: uuid.UUID) -> None:
|
||||
log.warning("download_enqueue_failed", job_id=str(job_id))
|
||||
|
||||
|
||||
async def enqueue_materialize(job_id: uuid.UUID) -> None:
|
||||
"""Best-effort enqueue of a materialize job for the worker (plan: Model C
|
||||
lazy materialization). Same deferred-commit reasoning as
|
||||
:func:`enqueue_download` — the job row stays ``queued`` and can be retried
|
||||
if the queue is unreachable."""
|
||||
try:
|
||||
await enqueue("materialize_track", job_id=str(job_id), _defer_by=3)
|
||||
except DependencyUnavailableError:
|
||||
log.warning("materialize_enqueue_failed", job_id=str(job_id))
|
||||
|
||||
|
||||
async def enqueue_enrich(track_id: uuid.UUID) -> None:
|
||||
"""Best-effort enqueue of metadata enrichment for a freshly stored track.
|
||||
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
"""arq task: materialize a remote placeholder track (plan: Model C).
|
||||
|
||||
Counterpart to ``download_task`` for tracks that were *saved* from a remote
|
||||
browse hit without audio (``availability="remote"``, ``storage_uri=NULL``).
|
||||
The job's ``track_id`` already points at the existing placeholder row — on
|
||||
success the file is stored and ``TrackRepository.materialize`` fills the row
|
||||
in place (the track's ``id`` never changes), then enrichment is enqueued as
|
||||
usual.
|
||||
|
||||
Shares its fetch/retry/failure machinery with ``download_task`` — only the
|
||||
"what happens on success" step differs (fill in an existing row vs. create a
|
||||
new one).
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
||||
import anyio
|
||||
|
||||
from app.core.config import get_settings
|
||||
from app.core.logging import correlation_id, get_logger
|
||||
from app.domain.errors import NotFoundError, ValidationError
|
||||
from app.domain.sources import DownloadResult
|
||||
from app.infrastructure.db import session_scope
|
||||
from app.infrastructure.db.repositories import (
|
||||
SqlAlchemyDownloadJobRepository,
|
||||
SqlAlchemyTrackRepository,
|
||||
)
|
||||
from app.infrastructure.sources.registry import build_source_registry
|
||||
from app.infrastructure.storage.provider import get_file_storage
|
||||
from app.workers.queue import enqueue_enrich
|
||||
from app.workers.tasks.download_task import _handle_failure, _load_job, _mark_failed, _run_fetch
|
||||
|
||||
log = get_logger("worker.materialize")
|
||||
|
||||
|
||||
async def materialize_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]:
|
||||
correlation_id.set(f"mat:{job_id}")
|
||||
jid = uuid.UUID(job_id)
|
||||
settings = get_settings()
|
||||
|
||||
job = await _load_job(jid)
|
||||
if job is None:
|
||||
log.info("materialize_job_missing", job_id=job_id) # cancelled before pickup
|
||||
return {"job_id": job_id, "status": "missing"}
|
||||
if job.track_id is None or job.source_id is None:
|
||||
await _mark_failed(jid, "Materialize job missing track_id/source_id.")
|
||||
return {"job_id": job_id, "status": "failed"}
|
||||
|
||||
registry = build_source_registry(settings)
|
||||
try:
|
||||
backend = registry.fetchable(job.source)
|
||||
except (NotFoundError, ValidationError) as exc:
|
||||
await _mark_failed(jid, f"Source unavailable: {exc}")
|
||||
return {"job_id": job_id, "status": "failed"}
|
||||
|
||||
await _set_status(jid, "downloading")
|
||||
try:
|
||||
result = await _run_fetch(backend, job.source_id, jid)
|
||||
except Exception as exc:
|
||||
return await _handle_failure(jid, exc, settings.download_max_retries, job_id)
|
||||
|
||||
try:
|
||||
await _materialize_result(jid, job.track_id, result)
|
||||
except Exception as exc:
|
||||
log.exception("materialize_finalize_failed", job_id=job_id)
|
||||
await _mark_failed(jid, f"Materialize failed: {type(exc).__name__}: {exc}")
|
||||
return {"job_id": job_id, "status": "failed"}
|
||||
|
||||
await enqueue_enrich(job.track_id)
|
||||
log.info("materialize_complete", job_id=job_id, track_id=str(job.track_id))
|
||||
return {"job_id": job_id, "status": "done", "track_id": str(job.track_id)}
|
||||
|
||||
|
||||
async def _materialize_result(jid: uuid.UUID, track_id: uuid.UUID, result: DownloadResult) -> None:
|
||||
"""Store the downloaded file and fill in the placeholder track in place."""
|
||||
key = f"tracks/{str(track_id)[:2]}/{track_id}.{result.file_format}"
|
||||
storage = get_file_storage()
|
||||
try:
|
||||
await storage.save_file(key, result.path)
|
||||
async with session_scope() as session:
|
||||
job_repo = SqlAlchemyDownloadJobRepository(session)
|
||||
await job_repo.set_status(jid, status="enriching")
|
||||
tracks = SqlAlchemyTrackRepository(session)
|
||||
await tracks.materialize(
|
||||
track_id,
|
||||
storage_uri=key,
|
||||
file_format=result.file_format,
|
||||
file_size=result.file_size,
|
||||
bitrate=result.bitrate,
|
||||
)
|
||||
await job_repo.set_status(jid, status="done", track_id=track_id)
|
||||
finally:
|
||||
with contextlib.suppress(Exception):
|
||||
await anyio.Path(result.path).unlink(missing_ok=True)
|
||||
|
||||
|
||||
async def _set_status(jid: uuid.UUID, status: str) -> None:
|
||||
async with session_scope() as session:
|
||||
await SqlAlchemyDownloadJobRepository(session).set_status(jid, status=status)
|
||||
@@ -16,7 +16,14 @@ pytestmark = pytest.mark.asyncio
|
||||
class FakeArtistRepo:
|
||||
async def get_or_create(self, name: str) -> Artist:
|
||||
now = dt.datetime.now(dt.UTC)
|
||||
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
|
||||
return Artist(
|
||||
id=uuid.uuid4(),
|
||||
name=name,
|
||||
source=None,
|
||||
source_id=None,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
|
||||
|
||||
class FakeTrackRepo:
|
||||
@@ -46,6 +53,7 @@ class FakeTrackRepo:
|
||||
metadata_status=str(kw["metadata_status"]),
|
||||
metadata_error=None,
|
||||
enriched_at=None,
|
||||
availability="local",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
@@ -140,6 +148,7 @@ def _track(source: str, source_id: str) -> Track:
|
||||
metadata_status="pending",
|
||||
metadata_error=None,
|
||||
enriched_at=None,
|
||||
availability="local",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
|
||||
@@ -164,6 +164,114 @@ async def test_search_aggregates_fetch_sources(api: AsyncClient) -> None:
|
||||
assert hit["title"] == "queen song"
|
||||
|
||||
|
||||
async def test_search_reports_library_status(api: AsyncClient) -> None:
|
||||
"""Remote browse (plan: Model C) — a fresh hit isn't in the library; after
|
||||
saving it as a placeholder, the same search reports it as such."""
|
||||
token = await _login(api)
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
resp = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers)
|
||||
hit = resp.json()["results"][0]
|
||||
assert hit["in_library"] is False
|
||||
assert hit["track_id"] is None
|
||||
assert hit["availability"] is None
|
||||
|
||||
save = await api.post(
|
||||
"/api/v1/tracks/remote",
|
||||
json={
|
||||
"source": hit["source"],
|
||||
"source_id": hit["source_id"],
|
||||
"title": hit["title"],
|
||||
"artist": hit["artist"],
|
||||
},
|
||||
headers=headers,
|
||||
)
|
||||
assert save.status_code == 201
|
||||
saved = save.json()
|
||||
assert saved["availability"] == "remote"
|
||||
assert saved["file_format"] is None
|
||||
|
||||
resp2 = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers)
|
||||
hit2 = resp2.json()["results"][0]
|
||||
assert hit2["in_library"] is True
|
||||
assert hit2["track_id"] == saved["id"]
|
||||
assert hit2["availability"] == "remote"
|
||||
|
||||
|
||||
async def test_save_remote_is_idempotent(api: AsyncClient) -> None:
|
||||
token = await _login(api)
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
payload = {"source": "youtube", "source_id": "vid-idem", "title": "A", "artist": "Artist"}
|
||||
|
||||
first = await api.post("/api/v1/tracks/remote", json=payload, headers=headers)
|
||||
second = await api.post("/api/v1/tracks/remote", json=payload, headers=headers)
|
||||
|
||||
assert first.status_code == 201
|
||||
assert second.status_code == 201
|
||||
assert first.json()["id"] == second.json()["id"]
|
||||
|
||||
|
||||
async def test_materialize_flow(
|
||||
api: AsyncClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Save a placeholder, materialize it on demand, and confirm it streams
|
||||
afterwards (plan: Model C lazy materialization)."""
|
||||
token = await _login(api)
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
save = await api.post(
|
||||
"/api/v1/tracks/remote",
|
||||
json={
|
||||
"source": "youtube",
|
||||
"source_id": "vid-mat-1",
|
||||
"title": "Materialize Me",
|
||||
"artist": "Artist",
|
||||
},
|
||||
headers=headers,
|
||||
)
|
||||
track_id = save.json()["id"]
|
||||
assert save.json()["availability"] == "remote"
|
||||
|
||||
# Streaming a placeholder before materialization fails (no audio yet).
|
||||
stream_before = await api.get(f"/api/v1/stream/{track_id}", headers=headers)
|
||||
assert stream_before.status_code == 404
|
||||
|
||||
materialize = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers)
|
||||
assert materialize.status_code == 200
|
||||
body = materialize.json()
|
||||
assert body["job"] is not None
|
||||
job_id = body["job"]["id"]
|
||||
assert body["job"]["track_id"] == track_id
|
||||
|
||||
# A second materialize request reuses the same in-flight job.
|
||||
again = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers)
|
||||
assert again.json()["job"]["id"] == job_id
|
||||
|
||||
# Run the worker task directly (bypasses Redis) with the fake fetch source.
|
||||
import app.workers.tasks.materialize_task as mat_task
|
||||
|
||||
worker_dir = tmp_path / "worker-mat"
|
||||
worker_dir.mkdir()
|
||||
fake = SourceRegistry([FakeFetchSource(worker_dir)]) # type: ignore[list-item]
|
||||
monkeypatch.setattr(mat_task, "build_source_registry", lambda _settings: fake)
|
||||
|
||||
result = await mat_task.materialize_track({}, job_id=job_id)
|
||||
assert result["status"] == "done"
|
||||
assert result["track_id"] == track_id
|
||||
|
||||
got = await api.get(f"/api/v1/tracks/{track_id}", headers=headers)
|
||||
assert got.json()["availability"] == "local"
|
||||
assert got.json()["file_format"] == "webm"
|
||||
|
||||
# Streaming now works.
|
||||
stream_after = await api.get(f"/api/v1/stream/{track_id}", headers=headers)
|
||||
assert stream_after.status_code == 200
|
||||
|
||||
# Materializing an already-local track is a no-op.
|
||||
noop = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers)
|
||||
assert noop.json()["job"] is None
|
||||
|
||||
|
||||
async def test_source_scoped_search(api: AsyncClient) -> None:
|
||||
token = await _login(api)
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
@@ -20,7 +20,14 @@ class FakeArtistRepo:
|
||||
async def get_or_create(self, name: str) -> Artist:
|
||||
if name not in self._by_name:
|
||||
now = dt.datetime.now(dt.UTC)
|
||||
self._by_name[name] = Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
|
||||
self._by_name[name] = Artist(
|
||||
id=uuid.uuid4(),
|
||||
name=name,
|
||||
source=None,
|
||||
source_id=None,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
return self._by_name[name]
|
||||
|
||||
|
||||
@@ -55,6 +62,7 @@ class FakeTrackRepo:
|
||||
metadata_status=str(kw["metadata_status"]),
|
||||
metadata_error=None,
|
||||
enriched_at=None,
|
||||
availability="local",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
@@ -139,6 +147,7 @@ async def test_dedup_skips_already_imported() -> None:
|
||||
metadata_status="pending",
|
||||
metadata_error=None,
|
||||
enriched_at=None,
|
||||
availability="local",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,156 @@
|
||||
"""Integration tests for the lazy-materialization foundation:
|
||||
``TrackRepository.materialize`` and ``Album``/``ArtistRepository.get_or_create_remote``.
|
||||
|
||||
Requires a reachable Postgres; skips otherwise (same pattern as
|
||||
``test_upload_stream_api.py``).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import uuid
|
||||
from collections.abc import AsyncIterator
|
||||
|
||||
import pytest
|
||||
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
|
||||
from app.infrastructure.db.models.enums import TrackAvailability
|
||||
from app.infrastructure.db.repositories import (
|
||||
SqlAlchemyAlbumRepository,
|
||||
SqlAlchemyArtistRepository,
|
||||
SqlAlchemyTrackRepository,
|
||||
)
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
_db_reachable_cache: bool | None = None
|
||||
|
||||
|
||||
async def _db_reachable() -> bool:
|
||||
global _db_reachable_cache
|
||||
if _db_reachable_cache is not None:
|
||||
return _db_reachable_cache
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
try:
|
||||
async with asyncio.timeout(3):
|
||||
async with get_engine().connect() as conn:
|
||||
await conn.execute(text("SELECT 1"))
|
||||
_db_reachable_cache = True
|
||||
except Exception:
|
||||
_db_reachable_cache = False
|
||||
return _db_reachable_cache
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def db() -> AsyncIterator[None]:
|
||||
if not await _db_reachable():
|
||||
pytest.skip("Postgres not reachable — integration test skipped.")
|
||||
|
||||
async with get_engine().begin() as conn:
|
||||
await conn.run_sync(Base.metadata.drop_all)
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
yield None
|
||||
|
||||
async with get_engine().begin() as conn:
|
||||
await conn.run_sync(Base.metadata.drop_all)
|
||||
await dispose_engine()
|
||||
|
||||
|
||||
async def test_placeholder_track_materializes_in_place(db: None) -> None:
|
||||
"""A remote placeholder (no storage) gets its audio fields filled in by
|
||||
``materialize`` without changing ``track.id`` — the stable content id that
|
||||
likes/playlists/queue may already reference."""
|
||||
async with session_scope() as session:
|
||||
artists = SqlAlchemyArtistRepository(session)
|
||||
tracks = SqlAlchemyTrackRepository(session)
|
||||
artist = await artists.get_or_create("Some Artist")
|
||||
|
||||
track_id = uuid.uuid4()
|
||||
placeholder = await tracks.add(
|
||||
id=track_id,
|
||||
title="Remote Track",
|
||||
artist_id=artist.id,
|
||||
storage_uri=None,
|
||||
file_format=None,
|
||||
file_size=None,
|
||||
source="youtube",
|
||||
source_id="abc123",
|
||||
metadata_status="pending",
|
||||
added_by=None,
|
||||
availability=TrackAvailability.REMOTE.value,
|
||||
)
|
||||
assert placeholder.availability == "remote"
|
||||
assert placeholder.storage_uri is None
|
||||
|
||||
materialized = await tracks.materialize(
|
||||
track_id,
|
||||
storage_uri="tracks/ab/abc123.m4a",
|
||||
file_format="m4a",
|
||||
file_size=12345,
|
||||
bitrate=160,
|
||||
)
|
||||
|
||||
assert materialized.id == track_id
|
||||
assert materialized.availability == "local"
|
||||
assert materialized.storage_uri == "tracks/ab/abc123.m4a"
|
||||
assert materialized.file_format == "m4a"
|
||||
assert materialized.file_size == 12345
|
||||
|
||||
|
||||
async def test_artist_get_or_create_remote_dedups_by_remote_id(db: None) -> None:
|
||||
async with session_scope() as session:
|
||||
artists = SqlAlchemyArtistRepository(session)
|
||||
|
||||
first = await artists.get_or_create_remote(
|
||||
name="Daft Punk", source="youtube", source_id="UCabc"
|
||||
)
|
||||
again = await artists.get_or_create_remote(
|
||||
name="Daft Punk (different display name)", source="youtube", source_id="UCabc"
|
||||
)
|
||||
|
||||
assert first.id == again.id
|
||||
assert again.source == "youtube"
|
||||
assert again.source_id == "UCabc"
|
||||
|
||||
|
||||
async def test_artist_get_or_create_remote_binds_existing_local_artist(db: None) -> None:
|
||||
async with session_scope() as session:
|
||||
artists = SqlAlchemyArtistRepository(session)
|
||||
|
||||
local = await artists.get_or_create("Pink Floyd")
|
||||
remote = await artists.get_or_create_remote(
|
||||
name="Pink Floyd", source="youtube", source_id="UCxyz"
|
||||
)
|
||||
|
||||
assert remote.id == local.id
|
||||
assert remote.source == "youtube"
|
||||
assert remote.source_id == "UCxyz"
|
||||
|
||||
|
||||
async def test_album_get_or_create_remote_dedups_by_remote_id(db: None) -> None:
|
||||
async with session_scope() as session:
|
||||
artists = SqlAlchemyArtistRepository(session)
|
||||
albums = SqlAlchemyAlbumRepository(session)
|
||||
artist = await artists.get_or_create("Daft Punk")
|
||||
|
||||
first = await albums.get_or_create_remote(
|
||||
title="Discovery",
|
||||
artist_id=artist.id,
|
||||
year=2001,
|
||||
musicbrainz_id=None,
|
||||
source="youtube",
|
||||
source_id="MPREb_abc",
|
||||
)
|
||||
again = await albums.get_or_create_remote(
|
||||
title="Discovery",
|
||||
artist_id=artist.id,
|
||||
year=None,
|
||||
musicbrainz_id=None,
|
||||
source="youtube",
|
||||
source_id="MPREb_abc",
|
||||
)
|
||||
|
||||
assert first.id == again.id
|
||||
assert again.source == "youtube"
|
||||
assert again.source_id == "MPREb_abc"
|
||||
assert again.year == 2001
|
||||
@@ -42,6 +42,7 @@ def _track(*, metadata_status: str = "pending", title: str = "raw-stem") -> Trac
|
||||
metadata_status=metadata_status,
|
||||
metadata_error=None,
|
||||
enriched_at=None,
|
||||
availability="local",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
@@ -67,7 +68,14 @@ class FakeArtistRepo:
|
||||
async def get_or_create(self, name: str) -> Artist:
|
||||
self.created.append(name)
|
||||
now = dt.datetime.now(dt.UTC)
|
||||
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
|
||||
return Artist(
|
||||
id=uuid.uuid4(),
|
||||
name=name,
|
||||
source=None,
|
||||
source_id=None,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
|
||||
|
||||
class FakeAlbumRepo:
|
||||
@@ -88,6 +96,8 @@ class FakeAlbumRepo:
|
||||
year=year,
|
||||
cover_path=self._existing_cover,
|
||||
musicbrainz_id=musicbrainz_id,
|
||||
source=None,
|
||||
source_id=None,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
|
||||
@@ -149,6 +149,7 @@ def _pending_track() -> Track:
|
||||
metadata_status="pending",
|
||||
metadata_error=None,
|
||||
enriched_at=None,
|
||||
availability="local",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
@@ -180,7 +181,14 @@ class _FakeArtistRepo:
|
||||
async def get_or_create(self, name: str) -> Artist:
|
||||
self.created.append(name)
|
||||
now = datetime.now(UTC)
|
||||
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
|
||||
return Artist(
|
||||
id=uuid.uuid4(),
|
||||
name=name,
|
||||
source=None,
|
||||
source_id=None,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -199,6 +207,8 @@ class _FakeAlbumRepo:
|
||||
year=year,
|
||||
cover_path=None,
|
||||
musicbrainz_id=musicbrainz_id,
|
||||
source=None,
|
||||
source_id=None,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,255 @@
|
||||
"""Unit tests for RemoteLibraryService — DB-free, in-memory fakes (plan: Model C
|
||||
remote browse + lazy materialization)."""
|
||||
|
||||
import datetime as dt
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from app.application.remote_library_service import RemoteLibraryService
|
||||
from app.domain.entities import Artist, DownloadJob, Track
|
||||
from app.domain.errors import NotFoundError, ValidationError
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
class FakeArtistRepo:
|
||||
def __init__(self) -> None:
|
||||
self._by_name: dict[str, Artist] = {}
|
||||
|
||||
async def get_or_create(self, name: str) -> Artist:
|
||||
if name not in self._by_name:
|
||||
now = dt.datetime.now(dt.UTC)
|
||||
self._by_name[name] = Artist(
|
||||
id=uuid.uuid4(),
|
||||
name=name,
|
||||
source=None,
|
||||
source_id=None,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
return self._by_name[name]
|
||||
|
||||
|
||||
class FakeTrackRepo:
|
||||
def __init__(self) -> None:
|
||||
self.by_id: dict[uuid.UUID, Track] = {}
|
||||
self.by_source: dict[tuple[str, str], Track] = {}
|
||||
|
||||
async def get_by_id(self, track_id: uuid.UUID) -> Track | None:
|
||||
return self.by_id.get(track_id)
|
||||
|
||||
async def get_by_source(self, source: str, source_id: str) -> Track | None:
|
||||
return self.by_source.get((source, source_id))
|
||||
|
||||
async def add(self, **kw: object) -> Track:
|
||||
now = dt.datetime.now(dt.UTC)
|
||||
track = Track(
|
||||
id=kw["id"], # type: ignore[arg-type]
|
||||
title=str(kw["title"]),
|
||||
artist_id=kw["artist_id"], # type: ignore[arg-type]
|
||||
album_id=None,
|
||||
storage_uri=kw["storage_uri"], # type: ignore[arg-type]
|
||||
file_format=kw["file_format"], # type: ignore[arg-type]
|
||||
file_size=kw["file_size"], # type: ignore[arg-type]
|
||||
source=str(kw["source"]),
|
||||
source_id=str(kw["source_id"]),
|
||||
duration_seconds=None,
|
||||
genre=None,
|
||||
year=None,
|
||||
track_number=None,
|
||||
metadata_status=str(kw["metadata_status"]),
|
||||
metadata_error=None,
|
||||
enriched_at=None,
|
||||
availability=str(kw["availability"]),
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
self.by_id[track.id] = track
|
||||
self.by_source[(track.source, track.source_id)] = track
|
||||
return track
|
||||
|
||||
|
||||
def _local_track(source: str = "youtube", source_id: str = "local-1") -> Track:
|
||||
now = dt.datetime.now(dt.UTC)
|
||||
return Track(
|
||||
id=uuid.uuid4(),
|
||||
title="Already Here",
|
||||
artist_id=uuid.uuid4(),
|
||||
album_id=None,
|
||||
storage_uri="tracks/aa/aa.m4a",
|
||||
file_format="m4a",
|
||||
file_size=123,
|
||||
source=source,
|
||||
source_id=source_id,
|
||||
duration_seconds=None,
|
||||
genre=None,
|
||||
year=None,
|
||||
track_number=None,
|
||||
metadata_status="pending",
|
||||
metadata_error=None,
|
||||
enriched_at=None,
|
||||
availability="local",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
|
||||
|
||||
class FakeJobRepo:
|
||||
def __init__(self) -> None:
|
||||
self.jobs: dict[uuid.UUID, DownloadJob] = {}
|
||||
self.active: dict[tuple[str, str], DownloadJob] = {}
|
||||
|
||||
async def add(self, **kw: object) -> DownloadJob:
|
||||
now = dt.datetime.now(dt.UTC)
|
||||
job = DownloadJob(
|
||||
id=uuid.uuid4(),
|
||||
source=str(kw["source"]),
|
||||
source_id=kw.get("source_id"), # type: ignore[arg-type]
|
||||
query=kw.get("query"), # type: ignore[arg-type]
|
||||
requested_by=kw.get("requested_by"), # type: ignore[arg-type]
|
||||
status="queued",
|
||||
progress=0.0,
|
||||
error_message=None,
|
||||
retry_count=0,
|
||||
track_id=None,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
self.jobs[job.id] = job
|
||||
return job
|
||||
|
||||
async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
|
||||
return self.jobs.get(job_id)
|
||||
|
||||
async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
|
||||
return self.active.get((source, source_id))
|
||||
|
||||
async def set_status(self, job_id: uuid.UUID, **kw: object) -> None:
|
||||
job = self.jobs[job_id]
|
||||
track_id = kw.get("track_id")
|
||||
if track_id is not None:
|
||||
self.jobs[job_id] = DownloadJob(
|
||||
id=job.id,
|
||||
source=job.source,
|
||||
source_id=job.source_id,
|
||||
query=job.query,
|
||||
requested_by=job.requested_by,
|
||||
status=str(kw.get("status", job.status)),
|
||||
progress=job.progress,
|
||||
error_message=job.error_message,
|
||||
retry_count=job.retry_count,
|
||||
track_id=track_id, # type: ignore[arg-type]
|
||||
created_at=job.created_at,
|
||||
updated_at=job.updated_at,
|
||||
)
|
||||
|
||||
|
||||
def _service(
|
||||
*,
|
||||
tracks: FakeTrackRepo,
|
||||
artists: FakeArtistRepo,
|
||||
jobs: FakeJobRepo,
|
||||
enqueued: list[uuid.UUID],
|
||||
) -> RemoteLibraryService:
|
||||
async def enqueue_materialize(job_id: uuid.UUID) -> None:
|
||||
enqueued.append(job_id)
|
||||
|
||||
return RemoteLibraryService(
|
||||
tracks=tracks, # type: ignore[arg-type]
|
||||
artists=artists, # type: ignore[arg-type]
|
||||
jobs=jobs, # type: ignore[arg-type]
|
||||
enqueue_materialize=enqueue_materialize,
|
||||
)
|
||||
|
||||
|
||||
async def test_save_remote_creates_placeholder() -> None:
|
||||
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
|
||||
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
|
||||
|
||||
track = await service.save_remote(
|
||||
source="youtube", source_id="abc", title="Bohemian Rhapsody", artist="Queen", added_by=None
|
||||
)
|
||||
|
||||
assert track.availability == "remote"
|
||||
assert track.storage_uri is None
|
||||
assert track.file_format is None
|
||||
assert track.source == "youtube"
|
||||
assert track.source_id == "abc"
|
||||
|
||||
|
||||
async def test_save_remote_is_idempotent() -> None:
|
||||
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
|
||||
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
|
||||
|
||||
first = await service.save_remote(
|
||||
source="youtube", source_id="abc", title="A", artist="Queen", added_by=None
|
||||
)
|
||||
second = await service.save_remote(
|
||||
source="youtube", source_id="abc", title="B", artist="Other", added_by=None
|
||||
)
|
||||
|
||||
assert first.id == second.id
|
||||
assert second.title == "A" # untouched by the second call
|
||||
|
||||
|
||||
async def test_materialize_already_local_is_noop() -> None:
|
||||
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
|
||||
track = _local_track()
|
||||
tracks.by_id[track.id] = track
|
||||
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
|
||||
|
||||
outcome = await service.request_materialize(track.id, requested_by=None)
|
||||
|
||||
assert outcome.job is None
|
||||
assert outcome.track.id == track.id
|
||||
assert enq == []
|
||||
|
||||
|
||||
async def test_materialize_remote_creates_and_enqueues_job() -> None:
|
||||
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
|
||||
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
|
||||
track = await service.save_remote(
|
||||
source="youtube", source_id="abc", title="A", artist="Queen", added_by=None
|
||||
)
|
||||
|
||||
outcome = await service.request_materialize(track.id, requested_by=None)
|
||||
|
||||
assert outcome.job is not None
|
||||
assert outcome.job.source == "youtube"
|
||||
assert outcome.job.source_id == "abc"
|
||||
assert outcome.job.track_id == track.id
|
||||
assert enq == [outcome.job.id]
|
||||
|
||||
|
||||
async def test_materialize_reuses_active_job() -> None:
|
||||
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
|
||||
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
|
||||
track = await service.save_remote(
|
||||
source="youtube", source_id="abc", title="A", artist="Queen", added_by=None
|
||||
)
|
||||
existing = await jobs.add(source="youtube", source_id="abc", query=None, requested_by=None)
|
||||
jobs.active[("youtube", "abc")] = existing
|
||||
|
||||
outcome = await service.request_materialize(track.id, requested_by=None)
|
||||
|
||||
assert outcome.job is not None
|
||||
assert outcome.job.id == existing.id
|
||||
assert enq == [] # not re-enqueued
|
||||
|
||||
|
||||
async def test_materialize_missing_track_raises() -> None:
|
||||
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
|
||||
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
|
||||
|
||||
with pytest.raises(NotFoundError):
|
||||
await service.request_materialize(uuid.uuid4(), requested_by=None)
|
||||
|
||||
|
||||
async def test_save_remote_requires_source_id() -> None:
|
||||
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
|
||||
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
await service.save_remote(
|
||||
source="youtube", source_id=" ", title="A", artist=None, added_by=None
|
||||
)
|
||||
Reference in New Issue
Block a user