diff --git a/app/api/deps.py b/app/api/deps.py index d4a9296..d3dcf31 100644 --- a/app/api/deps.py +++ b/app/api/deps.py @@ -17,6 +17,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.application.auth_service import AuthService from app.application.download_service import DownloadService from app.application.metadata_service import MetadataEnrichmentService +from app.application.remote_library_service import RemoteLibraryService from app.application.streaming_service import StreamingService from app.application.subsonic_auth_service import SubsonicAuthService from app.application.upload_service import UploadService @@ -43,7 +44,7 @@ from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter from app.infrastructure.metadata.tags import MutagenTagReader from app.infrastructure.sources.registry import SourceRegistry, build_source_registry from app.infrastructure.storage.provider import get_file_storage -from app.workers.queue import enqueue_download, enqueue_enrich +from app.workers.queue import enqueue_download, enqueue_enrich, enqueue_materialize async def get_session() -> AsyncIterator[AsyncSession]: @@ -172,10 +173,20 @@ def get_download_service(session: SessionDep, storage: FileStorageDep) -> Downlo ) +def get_remote_library_service(session: SessionDep) -> RemoteLibraryService: + return RemoteLibraryService( + tracks=SqlAlchemyTrackRepository(session), + artists=SqlAlchemyArtistRepository(session), + jobs=SqlAlchemyDownloadJobRepository(session), + enqueue_materialize=enqueue_materialize, + ) + + UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)] StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)] MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)] DownloadServiceDep = Annotated[DownloadService, Depends(get_download_service)] +RemoteLibraryServiceDep = Annotated[RemoteLibraryService, Depends(get_remote_library_service)] # -- library repository deps --------------------------------------------------- diff --git a/app/api/schemas/external_search.py b/app/api/schemas/external_search.py index 8999f6f..4b7e944 100644 --- a/app/api/schemas/external_search.py +++ b/app/api/schemas/external_search.py @@ -1,7 +1,10 @@ """Schemas for searching external (fetch) sources — the §A4 discover screen.""" +import uuid + from pydantic import BaseModel +from app.domain.entities.track import Track from app.domain.sources import SearchResult @@ -13,9 +16,16 @@ class ExternalSearchResultOut(BaseModel): album: str | None duration_seconds: int | None thumbnail_url: str | None + # Remote browse (plan: Model C) — set when this hit is already saved in the + # library, so the UI can show "Play"/"Saved" instead of "Save to library". + in_library: bool + track_id: uuid.UUID | None + availability: str | None @classmethod - def from_entity(cls, r: SearchResult) -> ExternalSearchResultOut: + def from_entity( + cls, r: SearchResult, *, existing: Track | None = None + ) -> ExternalSearchResultOut: return cls( source=r.source, source_id=r.source_id, @@ -24,6 +34,9 @@ class ExternalSearchResultOut(BaseModel): album=r.album, duration_seconds=r.duration_seconds, thumbnail_url=r.thumbnail_url, + in_library=existing is not None, + track_id=existing.id if existing is not None else None, + availability=existing.availability if existing is not None else None, ) diff --git a/app/api/schemas/track.py b/app/api/schemas/track.py index b438af6..a0740f5 100644 --- a/app/api/schemas/track.py +++ b/app/api/schemas/track.py @@ -3,7 +3,9 @@ import datetime as dt import uuid -from pydantic import BaseModel +from pydantic import BaseModel, Field + +from app.api.schemas.download import DownloadJobOut class TrackOut(BaseModel): @@ -62,3 +64,24 @@ class MetadataApply(BaseModel): year: int | None = None genre: str | None = None track_number: int | None = None + + +class RemoteTrackSave(BaseModel): + """Save a remote browse hit (§A4 discover) as a library placeholder — + ``availability="remote"``, no audio until first play (plan: Model C).""" + + source: str + source_id: str = Field(min_length=1) + title: str + artist: str | None = None + + +class MaterializeResponse(BaseModel): + """Result of requesting that a placeholder track's audio be fetched. + + ``job`` is ``None`` when the track is already ``local`` — nothing to wait + for, the caller can stream immediately. Otherwise it's the (new or + already in-flight) job; poll ``GET /downloads/{job.id}`` until ``done``.""" + + track: TrackOut + job: DownloadJobOut | None diff --git a/app/api/v1/search.py b/app/api/v1/search.py index 219ea6a..8eb1401 100644 --- a/app/api/v1/search.py +++ b/app/api/v1/search.py @@ -18,6 +18,7 @@ router = APIRouter(prefix="/search", tags=["search"]) async def search( _: CurrentUser, registry: SourceRegistryDep, + track_repo: TrackRepoDep, q: str = Query(min_length=1), limit: int = Query(20, ge=1, le=50), ) -> ExternalSearchResponse: @@ -25,7 +26,9 @@ async def search( A source that is down contributes nothing rather than failing the whole request (graceful degradation); only available sources are reported as - searched.""" + searched. Each hit is checked against the library by ``(source, + source_id)`` so the UI can show "Saved"/"Play" instead of "Save to + library" without a separate round-trip (remote browse, plan: Model C).""" results: list[ExternalSearchResultOut] = [] searched: list[str] = [] for backend in registry.searchables(): @@ -33,7 +36,9 @@ async def search( continue searched.append(backend.name) hits = await backend.search(q, limit=limit) - results.extend(ExternalSearchResultOut.from_entity(h) for h in hits) + for h in hits: + existing = await track_repo.get_by_source(h.source, h.source_id) + results.append(ExternalSearchResultOut.from_entity(h, existing=existing)) return ExternalSearchResponse(results=results, searched_sources=searched) diff --git a/app/api/v1/sources.py b/app/api/v1/sources.py index e62e32e..4bbf0b6 100644 --- a/app/api/v1/sources.py +++ b/app/api/v1/sources.py @@ -6,7 +6,7 @@ is an admin action and runs in a worker — the endpoint only enqueues it. from fastapi import APIRouter, Query -from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser +from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser, TrackRepoDep from app.api.schemas.external_search import ExternalSearchResponse, ExternalSearchResultOut from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut from app.domain.errors import DependencyUnavailableError @@ -42,6 +42,7 @@ async def search_source( source: str, _: CurrentUser, registry: SourceRegistryDep, + track_repo: TrackRepoDep, q: str = Query(min_length=1), limit: int = Query(20, ge=1, le=50), ) -> ExternalSearchResponse: @@ -49,7 +50,8 @@ async def search_source( if not backend.is_available(): raise DependencyUnavailableError(f"Source {source!r} is not available.") results = await backend.search(q, limit=limit) - return ExternalSearchResponse( - results=[ExternalSearchResultOut.from_entity(r) for r in results], - searched_sources=[source], - ) + out: list[ExternalSearchResultOut] = [] + for r in results: + existing = await track_repo.get_by_source(r.source, r.source_id) + out.append(ExternalSearchResultOut.from_entity(r, existing=existing)) + return ExternalSearchResponse(results=out, searched_sources=[source]) diff --git a/app/api/v1/tracks.py b/app/api/v1/tracks.py index 427f64c..996af8f 100644 --- a/app/api/v1/tracks.py +++ b/app/api/v1/tracks.py @@ -13,14 +13,18 @@ from app.api.deps import ( CurrentUser, FileStorageDep, MetadataServiceDep, + RemoteLibraryServiceDep, StreamUser, TrackRepoDep, ) +from app.api.schemas.download import DownloadJobOut from app.api.schemas.pagination import PagedResponse from app.api.schemas.track import ( + MaterializeResponse, MetadataApply, MetadataMatch, MetadataMatchesOut, + RemoteTrackSave, TrackOut, TrackUpdate, ) @@ -99,6 +103,57 @@ async def list_tracks( return PagedResponse(items=items, total=total, limit=limit, offset=offset) +@router.post("/remote", status_code=201) +async def save_remote_track( + body: RemoteTrackSave, + service: RemoteLibraryServiceDep, + artist_repo: ArtistRepoDep, + album_repo: AlbumRepoDep, + user: CurrentUser, +) -> TrackOut: + """Save a remote browse hit (§A4 discover) as a library placeholder — + no audio is fetched yet (plan: Model C). Idempotent on ``(source, + source_id)``: saving an already-saved hit returns the existing track.""" + track = await service.save_remote( + source=body.source, + source_id=body.source_id, + title=body.title, + artist=body.artist, + added_by=user.id, + ) + + artists = {a.id: a for a in await artist_repo.get_many([track.artist_id])} + album_ids = [track.album_id] if track.album_id else [] + albums = {a.id: a for a in await album_repo.get_many(album_ids)} + items = await _build_track_out([track], artists, albums) + return items[0] + + +@router.post("/{track_id}/materialize") +async def materialize_track( + track_id: uuid.UUID, + service: RemoteLibraryServiceDep, + artist_repo: ArtistRepoDep, + album_repo: AlbumRepoDep, + user: CurrentUser, +) -> MaterializeResponse: + """Fetch a placeholder track's audio on demand (plan: Model C lazy + materialization). Already-local tracks return ``job=None`` — nothing to + wait for. Otherwise poll ``GET /downloads/{job.id}`` until ``done``, then + stream as usual.""" + outcome = await service.request_materialize(track_id, requested_by=user.id) + + artists = {a.id: a for a in await artist_repo.get_many([outcome.track.artist_id])} + album_ids = [outcome.track.album_id] if outcome.track.album_id else [] + albums = {a.id: a for a in await album_repo.get_many(album_ids)} + track_out = (await _build_track_out([outcome.track], artists, albums))[0] + + return MaterializeResponse( + track=track_out, + job=DownloadJobOut.from_entity(outcome.job) if outcome.job is not None else None, + ) + + @router.get("/{track_id}") async def get_track( track_id: uuid.UUID, diff --git a/app/application/remote_library_service.py b/app/application/remote_library_service.py new file mode 100644 index 0000000..59393a9 --- /dev/null +++ b/app/application/remote_library_service.py @@ -0,0 +1,122 @@ +"""RemoteLibraryService — save-to-library + materialize for remote browse hits +(plan: Model C, on-demand YTM library). + +Two operations: + +* ``save_remote`` persists a placeholder ``Track`` (``availability="remote"``, + ``storage_uri=None``) for a remote browse hit. Idempotent on + ``(source, source_id)`` — CLAUDE.md dedup. +* ``request_materialize`` lazily fills a placeholder's audio in place: it + creates (or reuses) a ``DownloadJob`` pointing at the existing track and + enqueues the materialize worker, which calls ``TrackRepository.materialize`` + on completion. ``track.id`` never changes (CLAUDE.md), so likes/playlists/ + queue entries referencing the placeholder keep working once it's filled in. +""" + +import uuid +from collections.abc import Awaitable, Callable +from dataclasses import dataclass + +from app.domain.entities.download import DownloadJob +from app.domain.entities.track import Track +from app.domain.errors import NotFoundError, ValidationError +from app.domain.ports import ArtistRepository, DownloadJobRepository, TrackRepository + +_UNKNOWN_ARTIST = "Unknown Artist" + +# (job_id) -> None — enqueue the materialize worker, same deferred pattern as +# download/enrich enqueuers. +MaterializeEnqueuer = Callable[[uuid.UUID], Awaitable[None]] + + +@dataclass(frozen=True) +class MaterializeOutcome: + """Result of requesting materialization. + + ``job`` is ``None`` when the track is already ``local`` — nothing to do, + the caller can stream immediately. Otherwise it's the (new or already + in-flight) job filling the placeholder.""" + + track: Track + job: DownloadJob | None + + +class RemoteLibraryService: + def __init__( + self, + *, + tracks: TrackRepository, + artists: ArtistRepository, + jobs: DownloadJobRepository, + enqueue_materialize: MaterializeEnqueuer | None = None, + ) -> None: + self._tracks = tracks + self._artists = artists + self._jobs = jobs + self._enqueue_materialize = enqueue_materialize + + async def save_remote( + self, + *, + source: str, + source_id: str, + title: str, + artist: str | None, + added_by: uuid.UUID | None, + ) -> Track: + """Persist a placeholder for a remote browse hit. Idempotent: a hit + already saved (by ``(source, source_id)``) is returned as-is.""" + source_id = source_id.strip() + if not source_id: + raise ValidationError("A source_id is required to save.") + + existing = await self._tracks.get_by_source(source, source_id) + if existing is not None: + return existing + + artist_entity = await self._artists.get_or_create(artist or _UNKNOWN_ARTIST) + return await self._tracks.add( + id=uuid.uuid4(), + title=title, + artist_id=artist_entity.id, + storage_uri=None, + file_format=None, + file_size=None, + source=source, + source_id=source_id, + metadata_status="pending", + added_by=added_by, + availability="remote", + ) + + async def request_materialize( + self, track_id: uuid.UUID, *, requested_by: uuid.UUID | None + ) -> MaterializeOutcome: + """Kick off (or report on) materializing a placeholder track. + + Already-local tracks are a no-op (``job=None``). A track with no + remote ``source_id`` (e.g. a deleted upload row reused for something + else) can't be materialized.""" + track = await self._tracks.get_by_id(track_id) + if track is None: + raise NotFoundError(f"Track {track_id} not found.") + if track.availability == "local": + return MaterializeOutcome(track=track, job=None) + if track.source_id is None: + raise ValidationError("Track has no remote source to materialize from.") + + active = await self._jobs.get_active_for_source(track.source, track.source_id) + if active is not None: + return MaterializeOutcome(track=track, job=active) + + job = await self._jobs.add( + source=track.source, + source_id=track.source_id, + query=None, + requested_by=requested_by, + ) + await self._jobs.set_status(job.id, status="queued", track_id=track.id) + if self._enqueue_materialize is not None: + await self._enqueue_materialize(job.id) + refreshed = await self._jobs.get_by_id(job.id) + return MaterializeOutcome(track=track, job=refreshed if refreshed is not None else job) diff --git a/app/workers/arq_worker.py b/app/workers/arq_worker.py index 7e6c785..682ac8d 100644 --- a/app/workers/arq_worker.py +++ b/app/workers/arq_worker.py @@ -12,6 +12,7 @@ from app.core.logging import configure_logging, get_logger from app.workers.tasks.download_task import download_track from app.workers.tasks.enrich_task import enrich_track from app.workers.tasks.import_task import scan_local_folder +from app.workers.tasks.materialize_task import materialize_track log = get_logger("worker") @@ -27,7 +28,12 @@ async def shutdown(_ctx: dict[str, Any]) -> None: class WorkerSettings: - functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track, download_track] + functions: ClassVar[list[Any]] = [ + scan_local_folder, + enrich_track, + download_track, + materialize_track, + ] on_startup = startup on_shutdown = shutdown max_jobs = get_settings().max_parallel_downloads diff --git a/app/workers/queue.py b/app/workers/queue.py index e6b59ac..75682e8 100644 --- a/app/workers/queue.py +++ b/app/workers/queue.py @@ -48,6 +48,17 @@ async def enqueue_download(job_id: uuid.UUID) -> None: log.warning("download_enqueue_failed", job_id=str(job_id)) +async def enqueue_materialize(job_id: uuid.UUID) -> None: + """Best-effort enqueue of a materialize job for the worker (plan: Model C + lazy materialization). Same deferred-commit reasoning as + :func:`enqueue_download` — the job row stays ``queued`` and can be retried + if the queue is unreachable.""" + try: + await enqueue("materialize_track", job_id=str(job_id), _defer_by=3) + except DependencyUnavailableError: + log.warning("materialize_enqueue_failed", job_id=str(job_id)) + + async def enqueue_enrich(track_id: uuid.UUID) -> None: """Best-effort enqueue of metadata enrichment for a freshly stored track. diff --git a/app/workers/tasks/materialize_task.py b/app/workers/tasks/materialize_task.py new file mode 100644 index 0000000..76c285c --- /dev/null +++ b/app/workers/tasks/materialize_task.py @@ -0,0 +1,101 @@ +"""arq task: materialize a remote placeholder track (plan: Model C). + +Counterpart to ``download_task`` for tracks that were *saved* from a remote +browse hit without audio (``availability="remote"``, ``storage_uri=NULL``). +The job's ``track_id`` already points at the existing placeholder row — on +success the file is stored and ``TrackRepository.materialize`` fills the row +in place (the track's ``id`` never changes), then enrichment is enqueued as +usual. + +Shares its fetch/retry/failure machinery with ``download_task`` — only the +"what happens on success" step differs (fill in an existing row vs. create a +new one). +""" + +import contextlib +import uuid +from typing import Any + +import anyio + +from app.core.config import get_settings +from app.core.logging import correlation_id, get_logger +from app.domain.errors import NotFoundError, ValidationError +from app.domain.sources import DownloadResult +from app.infrastructure.db import session_scope +from app.infrastructure.db.repositories import ( + SqlAlchemyDownloadJobRepository, + SqlAlchemyTrackRepository, +) +from app.infrastructure.sources.registry import build_source_registry +from app.infrastructure.storage.provider import get_file_storage +from app.workers.queue import enqueue_enrich +from app.workers.tasks.download_task import _handle_failure, _load_job, _mark_failed, _run_fetch + +log = get_logger("worker.materialize") + + +async def materialize_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]: + correlation_id.set(f"mat:{job_id}") + jid = uuid.UUID(job_id) + settings = get_settings() + + job = await _load_job(jid) + if job is None: + log.info("materialize_job_missing", job_id=job_id) # cancelled before pickup + return {"job_id": job_id, "status": "missing"} + if job.track_id is None or job.source_id is None: + await _mark_failed(jid, "Materialize job missing track_id/source_id.") + return {"job_id": job_id, "status": "failed"} + + registry = build_source_registry(settings) + try: + backend = registry.fetchable(job.source) + except (NotFoundError, ValidationError) as exc: + await _mark_failed(jid, f"Source unavailable: {exc}") + return {"job_id": job_id, "status": "failed"} + + await _set_status(jid, "downloading") + try: + result = await _run_fetch(backend, job.source_id, jid) + except Exception as exc: + return await _handle_failure(jid, exc, settings.download_max_retries, job_id) + + try: + await _materialize_result(jid, job.track_id, result) + except Exception as exc: + log.exception("materialize_finalize_failed", job_id=job_id) + await _mark_failed(jid, f"Materialize failed: {type(exc).__name__}: {exc}") + return {"job_id": job_id, "status": "failed"} + + await enqueue_enrich(job.track_id) + log.info("materialize_complete", job_id=job_id, track_id=str(job.track_id)) + return {"job_id": job_id, "status": "done", "track_id": str(job.track_id)} + + +async def _materialize_result(jid: uuid.UUID, track_id: uuid.UUID, result: DownloadResult) -> None: + """Store the downloaded file and fill in the placeholder track in place.""" + key = f"tracks/{str(track_id)[:2]}/{track_id}.{result.file_format}" + storage = get_file_storage() + try: + await storage.save_file(key, result.path) + async with session_scope() as session: + job_repo = SqlAlchemyDownloadJobRepository(session) + await job_repo.set_status(jid, status="enriching") + tracks = SqlAlchemyTrackRepository(session) + await tracks.materialize( + track_id, + storage_uri=key, + file_format=result.file_format, + file_size=result.file_size, + bitrate=result.bitrate, + ) + await job_repo.set_status(jid, status="done", track_id=track_id) + finally: + with contextlib.suppress(Exception): + await anyio.Path(result.path).unlink(missing_ok=True) + + +async def _set_status(jid: uuid.UUID, status: str) -> None: + async with session_scope() as session: + await SqlAlchemyDownloadJobRepository(session).set_status(jid, status=status) diff --git a/tests/test_downloads_api.py b/tests/test_downloads_api.py index dcffbc4..990d409 100644 --- a/tests/test_downloads_api.py +++ b/tests/test_downloads_api.py @@ -164,6 +164,114 @@ async def test_search_aggregates_fetch_sources(api: AsyncClient) -> None: assert hit["title"] == "queen song" +async def test_search_reports_library_status(api: AsyncClient) -> None: + """Remote browse (plan: Model C) — a fresh hit isn't in the library; after + saving it as a placeholder, the same search reports it as such.""" + token = await _login(api) + headers = {"Authorization": f"Bearer {token}"} + + resp = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers) + hit = resp.json()["results"][0] + assert hit["in_library"] is False + assert hit["track_id"] is None + assert hit["availability"] is None + + save = await api.post( + "/api/v1/tracks/remote", + json={ + "source": hit["source"], + "source_id": hit["source_id"], + "title": hit["title"], + "artist": hit["artist"], + }, + headers=headers, + ) + assert save.status_code == 201 + saved = save.json() + assert saved["availability"] == "remote" + assert saved["file_format"] is None + + resp2 = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers) + hit2 = resp2.json()["results"][0] + assert hit2["in_library"] is True + assert hit2["track_id"] == saved["id"] + assert hit2["availability"] == "remote" + + +async def test_save_remote_is_idempotent(api: AsyncClient) -> None: + token = await _login(api) + headers = {"Authorization": f"Bearer {token}"} + payload = {"source": "youtube", "source_id": "vid-idem", "title": "A", "artist": "Artist"} + + first = await api.post("/api/v1/tracks/remote", json=payload, headers=headers) + second = await api.post("/api/v1/tracks/remote", json=payload, headers=headers) + + assert first.status_code == 201 + assert second.status_code == 201 + assert first.json()["id"] == second.json()["id"] + + +async def test_materialize_flow( + api: AsyncClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Save a placeholder, materialize it on demand, and confirm it streams + afterwards (plan: Model C lazy materialization).""" + token = await _login(api) + headers = {"Authorization": f"Bearer {token}"} + + save = await api.post( + "/api/v1/tracks/remote", + json={ + "source": "youtube", + "source_id": "vid-mat-1", + "title": "Materialize Me", + "artist": "Artist", + }, + headers=headers, + ) + track_id = save.json()["id"] + assert save.json()["availability"] == "remote" + + # Streaming a placeholder before materialization fails (no audio yet). + stream_before = await api.get(f"/api/v1/stream/{track_id}", headers=headers) + assert stream_before.status_code == 404 + + materialize = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers) + assert materialize.status_code == 200 + body = materialize.json() + assert body["job"] is not None + job_id = body["job"]["id"] + assert body["job"]["track_id"] == track_id + + # A second materialize request reuses the same in-flight job. + again = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers) + assert again.json()["job"]["id"] == job_id + + # Run the worker task directly (bypasses Redis) with the fake fetch source. + import app.workers.tasks.materialize_task as mat_task + + worker_dir = tmp_path / "worker-mat" + worker_dir.mkdir() + fake = SourceRegistry([FakeFetchSource(worker_dir)]) # type: ignore[list-item] + monkeypatch.setattr(mat_task, "build_source_registry", lambda _settings: fake) + + result = await mat_task.materialize_track({}, job_id=job_id) + assert result["status"] == "done" + assert result["track_id"] == track_id + + got = await api.get(f"/api/v1/tracks/{track_id}", headers=headers) + assert got.json()["availability"] == "local" + assert got.json()["file_format"] == "webm" + + # Streaming now works. + stream_after = await api.get(f"/api/v1/stream/{track_id}", headers=headers) + assert stream_after.status_code == 200 + + # Materializing an already-local track is a no-op. + noop = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers) + assert noop.json()["job"] is None + + async def test_source_scoped_search(api: AsyncClient) -> None: token = await _login(api) headers = {"Authorization": f"Bearer {token}"} diff --git a/tests/test_remote_library_service.py b/tests/test_remote_library_service.py new file mode 100644 index 0000000..1735a25 --- /dev/null +++ b/tests/test_remote_library_service.py @@ -0,0 +1,255 @@ +"""Unit tests for RemoteLibraryService — DB-free, in-memory fakes (plan: Model C +remote browse + lazy materialization).""" + +import datetime as dt +import uuid + +import pytest +from app.application.remote_library_service import RemoteLibraryService +from app.domain.entities import Artist, DownloadJob, Track +from app.domain.errors import NotFoundError, ValidationError + +pytestmark = pytest.mark.asyncio + + +class FakeArtistRepo: + def __init__(self) -> None: + self._by_name: dict[str, Artist] = {} + + async def get_or_create(self, name: str) -> Artist: + if name not in self._by_name: + now = dt.datetime.now(dt.UTC) + self._by_name[name] = Artist( + id=uuid.uuid4(), + name=name, + source=None, + source_id=None, + created_at=now, + updated_at=now, + ) + return self._by_name[name] + + +class FakeTrackRepo: + def __init__(self) -> None: + self.by_id: dict[uuid.UUID, Track] = {} + self.by_source: dict[tuple[str, str], Track] = {} + + async def get_by_id(self, track_id: uuid.UUID) -> Track | None: + return self.by_id.get(track_id) + + async def get_by_source(self, source: str, source_id: str) -> Track | None: + return self.by_source.get((source, source_id)) + + async def add(self, **kw: object) -> Track: + now = dt.datetime.now(dt.UTC) + track = Track( + id=kw["id"], # type: ignore[arg-type] + title=str(kw["title"]), + artist_id=kw["artist_id"], # type: ignore[arg-type] + album_id=None, + storage_uri=kw["storage_uri"], # type: ignore[arg-type] + file_format=kw["file_format"], # type: ignore[arg-type] + file_size=kw["file_size"], # type: ignore[arg-type] + source=str(kw["source"]), + source_id=str(kw["source_id"]), + duration_seconds=None, + genre=None, + year=None, + track_number=None, + metadata_status=str(kw["metadata_status"]), + metadata_error=None, + enriched_at=None, + availability=str(kw["availability"]), + created_at=now, + updated_at=now, + ) + self.by_id[track.id] = track + self.by_source[(track.source, track.source_id)] = track + return track + + +def _local_track(source: str = "youtube", source_id: str = "local-1") -> Track: + now = dt.datetime.now(dt.UTC) + return Track( + id=uuid.uuid4(), + title="Already Here", + artist_id=uuid.uuid4(), + album_id=None, + storage_uri="tracks/aa/aa.m4a", + file_format="m4a", + file_size=123, + source=source, + source_id=source_id, + duration_seconds=None, + genre=None, + year=None, + track_number=None, + metadata_status="pending", + metadata_error=None, + enriched_at=None, + availability="local", + created_at=now, + updated_at=now, + ) + + +class FakeJobRepo: + def __init__(self) -> None: + self.jobs: dict[uuid.UUID, DownloadJob] = {} + self.active: dict[tuple[str, str], DownloadJob] = {} + + async def add(self, **kw: object) -> DownloadJob: + now = dt.datetime.now(dt.UTC) + job = DownloadJob( + id=uuid.uuid4(), + source=str(kw["source"]), + source_id=kw.get("source_id"), # type: ignore[arg-type] + query=kw.get("query"), # type: ignore[arg-type] + requested_by=kw.get("requested_by"), # type: ignore[arg-type] + status="queued", + progress=0.0, + error_message=None, + retry_count=0, + track_id=None, + created_at=now, + updated_at=now, + ) + self.jobs[job.id] = job + return job + + async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None: + return self.jobs.get(job_id) + + async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None: + return self.active.get((source, source_id)) + + async def set_status(self, job_id: uuid.UUID, **kw: object) -> None: + job = self.jobs[job_id] + track_id = kw.get("track_id") + if track_id is not None: + self.jobs[job_id] = DownloadJob( + id=job.id, + source=job.source, + source_id=job.source_id, + query=job.query, + requested_by=job.requested_by, + status=str(kw.get("status", job.status)), + progress=job.progress, + error_message=job.error_message, + retry_count=job.retry_count, + track_id=track_id, # type: ignore[arg-type] + created_at=job.created_at, + updated_at=job.updated_at, + ) + + +def _service( + *, + tracks: FakeTrackRepo, + artists: FakeArtistRepo, + jobs: FakeJobRepo, + enqueued: list[uuid.UUID], +) -> RemoteLibraryService: + async def enqueue_materialize(job_id: uuid.UUID) -> None: + enqueued.append(job_id) + + return RemoteLibraryService( + tracks=tracks, # type: ignore[arg-type] + artists=artists, # type: ignore[arg-type] + jobs=jobs, # type: ignore[arg-type] + enqueue_materialize=enqueue_materialize, + ) + + +async def test_save_remote_creates_placeholder() -> None: + tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), [] + service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq) + + track = await service.save_remote( + source="youtube", source_id="abc", title="Bohemian Rhapsody", artist="Queen", added_by=None + ) + + assert track.availability == "remote" + assert track.storage_uri is None + assert track.file_format is None + assert track.source == "youtube" + assert track.source_id == "abc" + + +async def test_save_remote_is_idempotent() -> None: + tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), [] + service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq) + + first = await service.save_remote( + source="youtube", source_id="abc", title="A", artist="Queen", added_by=None + ) + second = await service.save_remote( + source="youtube", source_id="abc", title="B", artist="Other", added_by=None + ) + + assert first.id == second.id + assert second.title == "A" # untouched by the second call + + +async def test_materialize_already_local_is_noop() -> None: + tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), [] + track = _local_track() + tracks.by_id[track.id] = track + service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq) + + outcome = await service.request_materialize(track.id, requested_by=None) + + assert outcome.job is None + assert outcome.track.id == track.id + assert enq == [] + + +async def test_materialize_remote_creates_and_enqueues_job() -> None: + tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), [] + service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq) + track = await service.save_remote( + source="youtube", source_id="abc", title="A", artist="Queen", added_by=None + ) + + outcome = await service.request_materialize(track.id, requested_by=None) + + assert outcome.job is not None + assert outcome.job.source == "youtube" + assert outcome.job.source_id == "abc" + assert outcome.job.track_id == track.id + assert enq == [outcome.job.id] + + +async def test_materialize_reuses_active_job() -> None: + tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), [] + service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq) + track = await service.save_remote( + source="youtube", source_id="abc", title="A", artist="Queen", added_by=None + ) + existing = await jobs.add(source="youtube", source_id="abc", query=None, requested_by=None) + jobs.active[("youtube", "abc")] = existing + + outcome = await service.request_materialize(track.id, requested_by=None) + + assert outcome.job is not None + assert outcome.job.id == existing.id + assert enq == [] # not re-enqueued + + +async def test_materialize_missing_track_raises() -> None: + tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), [] + service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq) + + with pytest.raises(NotFoundError): + await service.request_materialize(uuid.uuid4(), requested_by=None) + + +async def test_save_remote_requires_source_id() -> None: + tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), [] + service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq) + + with pytest.raises(ValidationError): + await service.save_remote( + source="youtube", source_id=" ", title="A", artist=None, added_by=None + )