Compare commits

..

2 Commits

Author SHA1 Message Date
Senko-san 78007461e1 feat(sources): YouTube Music search + download pipeline (§1C/§1E)
Docker Build & Publish / build (push) Successful in 2m39s
Docker Build & Publish / push (push) Failing after 36s
Docker Build & Publish / Prune old image versions (push) Has been skipped
Pluggable fetch source: ytmusicapi search + yt-dlp download (cookies-file guard), DownloadJob entity/repo + DownloadService, download_task worker with exponential-backoff retries, and wired /search, /sources/{source}/search, and /downloads endpoints. Adds youtube_enabled/cookies config, yt-dlp+ytmusicapi deps, and the download_jobs.track_id migration. Snapshot also bundles in-progress storage/tracks/acoustid edits.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 14:04:33 +03:00
Senko-san ea880edd57 feat(tracks): filter track list by ingest source
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
Add an optional `source` filter to `GET /api/v1/tracks` (and the
`TrackRepository.list`/`count` port + SQLAlchemy adapter). Lets clients
query, e.g., only uploaded tracks (`?source=upload`) newest-first — the
backing for the webui's persistent "Recently uploaded" view.

- test: upload then list with `?source=upload` (hit) / `?source=youtube`
  (miss)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 01:35:51 +03:00
34 changed files with 2677 additions and 817 deletions
@@ -0,0 +1,47 @@
"""download_jobs: link finished job to its imported track
Revision ID: 20260614_dl_track_id
Revises: 20260613_enrich_outcome
Create Date: 2026-06-14 10:00:00.000000
Adds ``download_jobs.track_id`` (nullable FK → ``tracks.id``) so a completed
download can point at the library track it produced — the §A5 download manager
links a "done" job to the track, and re-runs can tell a job already imported
(plan §6.1).
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "20260614_dl_track_id"
down_revision: str | None = "20260613_enrich_outcome"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
    """Add the nullable ``download_jobs.track_id`` column and its FK to ``tracks.id``."""
    jobs_table = "download_jobs"
    link_column = sa.Column("track_id", sa.Uuid(), nullable=True)
    op.add_column(jobs_table, link_column)

    # ON DELETE SET NULL: the foreign key nulls the link when the referenced
    # track row is deleted, instead of blocking the delete.
    fk_name = op.f("fk_download_jobs_track_id_tracks")
    op.create_foreign_key(
        fk_name,
        jobs_table,
        "tracks",
        ["track_id"],
        ["id"],
        ondelete="SET NULL",
    )
def downgrade() -> None:
    """Undo the upgrade: drop the foreign key first, then the column itself."""
    jobs_table = "download_jobs"
    fk_name = op.f("fk_download_jobs_track_id_tracks")
    op.drop_constraint(fk_name, jobs_table, type_="foreignkey")
    op.drop_column(jobs_table, "track_id")
+16 -4
View File
@@ -15,6 +15,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.application.auth_service import AuthService from app.application.auth_service import AuthService
from app.application.download_service import DownloadService
from app.application.metadata_service import MetadataEnrichmentService from app.application.metadata_service import MetadataEnrichmentService
from app.application.streaming_service import StreamingService from app.application.streaming_service import StreamingService
from app.application.subsonic_auth_service import SubsonicAuthService from app.application.subsonic_auth_service import SubsonicAuthService
@@ -29,6 +30,7 @@ from app.infrastructure.db import get_sessionmaker
from app.infrastructure.db.repositories import ( from app.infrastructure.db.repositories import (
SqlAlchemyAlbumRepository, SqlAlchemyAlbumRepository,
SqlAlchemyArtistRepository, SqlAlchemyArtistRepository,
SqlAlchemyDownloadJobRepository,
SqlAlchemyHistoryRepository, SqlAlchemyHistoryRepository,
SqlAlchemyLikeRepository, SqlAlchemyLikeRepository,
SqlAlchemyPlaylistRepository, SqlAlchemyPlaylistRepository,
@@ -41,7 +43,7 @@ from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
from app.infrastructure.metadata.tags import MutagenTagReader from app.infrastructure.metadata.tags import MutagenTagReader
from app.infrastructure.sources.registry import SourceRegistry, build_source_registry from app.infrastructure.sources.registry import SourceRegistry, build_source_registry
from app.infrastructure.storage.provider import get_file_storage from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich from app.workers.queue import enqueue_download, enqueue_enrich
async def get_session() -> AsyncIterator[AsyncSession]: async def get_session() -> AsyncIterator[AsyncSession]:
@@ -136,9 +138,7 @@ def get_streaming_service(session: SessionDep, storage: FileStorageDep) -> Strea
) )
def get_metadata_service( def get_metadata_service(session: SessionDep, storage: FileStorageDep) -> MetadataEnrichmentService:
session: SessionDep, storage: FileStorageDep
) -> MetadataEnrichmentService:
"""Wires the §6.2 fingerprint/AcoustID adapters for read-only, inline use """Wires the §6.2 fingerprint/AcoustID adapters for read-only, inline use
(the metadata editor's "find matches" — §A7). The full pipeline (incl. (the metadata editor's "find matches" — §A7). The full pipeline (incl.
cover art) stays in the worker (`tasks/enrich_task.py`).""" cover art) stays in the worker (`tasks/enrich_task.py`)."""
@@ -161,9 +161,21 @@ def get_metadata_service(
) )
def get_download_service(session: SessionDep, storage: FileStorageDep) -> DownloadService:
    """Assemble a :class:`DownloadService` on top of the injected session and storage.

    All three repositories share the same ``session``; the two enqueue
    callables are the worker-queue functions imported by this module.
    """
    job_repo = SqlAlchemyDownloadJobRepository(session)
    track_repo = SqlAlchemyTrackRepository(session)
    artist_repo = SqlAlchemyArtistRepository(session)
    return DownloadService(
        jobs=job_repo,
        tracks=track_repo,
        artists=artist_repo,
        storage=storage,
        enqueue_download=enqueue_download,
        enqueue_enrich=enqueue_enrich,
    )
UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)] UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)]
StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)] StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)]
MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)] MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)]
DownloadServiceDep = Annotated[DownloadService, Depends(get_download_service)]
# -- library repository deps --------------------------------------------------- # -- library repository deps ---------------------------------------------------
+59
View File
@@ -0,0 +1,59 @@
"""Schemas for the download job endpoints (§A5 download manager)."""
import datetime as dt
import uuid
from pydantic import BaseModel, Field
from app.domain.entities.download import DownloadJob
class DownloadCreate(BaseModel):
    """Body of a download request for an item found on a fetch source."""

    # Name of the source the item was discovered on.
    source: str
    # The source's own handle for the item; an empty string is rejected.
    source_id: str = Field(min_length=1)
    # Free-text search the hit came from, if any. Kept for display only.
    query: str | None = None
class DownloadJobOut(BaseModel):
    """API representation of a download job."""

    # -- identity / origin
    id: uuid.UUID
    source: str
    source_id: str | None
    query: str | None
    # -- lifecycle
    status: str
    progress: float
    error_message: str | None
    retry_count: int
    # -- link to a library track, when the job has one
    track_id: uuid.UUID | None
    # -- timestamps
    created_at: dt.datetime
    updated_at: dt.datetime

    @classmethod
    def from_entity(cls, job: DownloadJob) -> DownloadJobOut:
        """Copy the exposed attributes of ``job`` one-to-one into the schema."""
        exposed = (
            "id",
            "source",
            "source_id",
            "query",
            "status",
            "progress",
            "error_message",
            "retry_count",
            "track_id",
            "created_at",
            "updated_at",
        )
        return cls(**{attr: getattr(job, attr) for attr in exposed})
class DownloadCreateResponse(BaseModel):
    """What the client gets back after asking for a download.

    Two shapes are possible:

    * ``already_in_library`` is true — the item was imported earlier;
      ``track_id`` is set and there is no ``job``.
    * otherwise — ``job`` describes the download that was queued, or the one
      that was already in flight for the same item.
    """

    already_in_library: bool
    track_id: uuid.UUID | None
    job: DownloadJobOut | None
+35
View File
@@ -0,0 +1,35 @@
"""Schemas for searching external (fetch) sources — the §A4 discover screen."""
from pydantic import BaseModel
from app.domain.sources import SearchResult
class ExternalSearchResultOut(BaseModel):
    """API representation of a single hit from an external (fetch) source."""

    # Where the hit came from and the handle to request it with.
    source: str
    source_id: str
    # Display fields; everything except the title may be missing.
    title: str
    artist: str | None
    album: str | None
    duration_seconds: int | None
    thumbnail_url: str | None

    @classmethod
    def from_entity(cls, r: SearchResult) -> ExternalSearchResultOut:
        """Copy the exposed attributes of ``r`` into the schema (``raw`` is not exposed)."""
        exposed = (
            "source",
            "source_id",
            "title",
            "artist",
            "album",
            "duration_seconds",
            "thumbnail_url",
        )
        return cls(**{attr: getattr(r, attr) for attr in exposed})
class ExternalSearchResponse(BaseModel):
    """Flat list of hits across one or more searchable sources, plus the names
    of the sources that were actually queried.

    ``searched_sources`` lists what was searched, not what was skipped: a
    source absent from it contributed no results. A client that wants to show
    a soft "source unavailable" warning has to compare this list against the
    sources it expected.
    """

    # Hits from every queried source, in the order the sources were queried.
    results: list[ExternalSearchResultOut]
    # Names of the sources that produced (or could have produced) ``results``.
    searched_sources: list[str]
+60 -18
View File
@@ -1,36 +1,78 @@
"""Download job endpoints. Heavy work is dispatched to arq workers.""" """Download job endpoints (§A5). Heavy work is dispatched to arq workers — these
handlers only create/inspect/cancel/retry job records."""
import uuid import uuid
from typing import Any
from fastapi import APIRouter from fastapi import APIRouter, Query, Response
from app.api.deps import CurrentUser, DownloadServiceDep
from app.api.schemas.download import DownloadCreate, DownloadCreateResponse, DownloadJobOut
from app.api.schemas.pagination import PagedResponse
router = APIRouter(prefix="/downloads", tags=["downloads"]) router = APIRouter(prefix="/downloads", tags=["downloads"])
@router.get("") @router.get("")
async def list_downloads() -> Any: ... async def list_downloads(
service: DownloadServiceDep,
user: CurrentUser,
status: str | None = Query(default=None),
mine: bool = Query(default=False),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[DownloadJobOut]:
jobs, total = await service.list(
requested_by=user.id if mine else None,
status=status,
limit=limit,
offset=offset,
)
return PagedResponse(
items=[DownloadJobOut.from_entity(j) for j in jobs],
total=total,
limit=limit,
offset=offset,
)
@router.post("") @router.post("", status_code=202)
async def create_download() -> Any: ... async def create_download(
body: DownloadCreate,
service: DownloadServiceDep,
user: CurrentUser,
) -> DownloadCreateResponse:
result = await service.request(
source=body.source,
source_id=body.source_id,
query=body.query,
requested_by=user.id,
)
return DownloadCreateResponse(
already_in_library=result.already_in_library,
track_id=result.track_id,
job=DownloadJobOut.from_entity(result.job) if result.job is not None else None,
)
@router.get("/{job_id}") @router.get("/{job_id}")
async def get_download(job_id: uuid.UUID) -> Any: ... async def get_download(
job_id: uuid.UUID, service: DownloadServiceDep, _: CurrentUser
) -> DownloadJobOut:
job = await service.get(job_id)
return DownloadJobOut.from_entity(job)
@router.delete("/{job_id}") @router.delete("/{job_id}", status_code=204)
async def cancel_download(job_id: uuid.UUID) -> Any: ... async def cancel_download(
job_id: uuid.UUID, service: DownloadServiceDep, _: CurrentUser
) -> Response:
await service.cancel(job_id)
return Response(status_code=204)
@router.post("/{job_id}/retry") @router.post("/{job_id}/retry")
async def retry_download(job_id: uuid.UUID) -> Any: ... async def retry_download(
job_id: uuid.UUID, service: DownloadServiceDep, _: CurrentUser
) -> DownloadJobOut:
@router.post("/pause") job = await service.retry(job_id)
async def pause_downloads() -> Any: ... return DownloadJobOut.from_entity(job)
@router.post("/resume")
async def resume_downloads() -> Any: ...
+22 -4
View File
@@ -1,12 +1,11 @@
"""Search endpoints: global and library-scoped.""" """Search endpoints: global and library-scoped."""
from typing import Any
from fastapi import APIRouter, Query from fastapi import APIRouter, Query
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, TrackRepoDep from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, SourceRegistryDep, TrackRepoDep
from app.api.schemas.album import AlbumOut from app.api.schemas.album import AlbumOut
from app.api.schemas.artist import ArtistOut from app.api.schemas.artist import ArtistOut
from app.api.schemas.external_search import ExternalSearchResponse, ExternalSearchResultOut
from app.api.schemas.search import LibrarySearchResponse from app.api.schemas.search import LibrarySearchResponse
from app.api.schemas.track import TrackOut from app.api.schemas.track import TrackOut
from app.api.v1.albums import _build_album_out from app.api.v1.albums import _build_album_out
@@ -16,7 +15,26 @@ router = APIRouter(prefix="/search", tags=["search"])
@router.get("") @router.get("")
async def search(_: CurrentUser) -> Any: ... async def search(
_: CurrentUser,
registry: SourceRegistryDep,
q: str = Query(min_length=1),
limit: int = Query(20, ge=1, le=50),
) -> ExternalSearchResponse:
"""Search every available fetch source and merge the hits (§A4 discover).
A source that is down contributes nothing rather than failing the whole
request (graceful degradation); only available sources are reported as
searched."""
results: list[ExternalSearchResultOut] = []
searched: list[str] = []
for backend in registry.searchables():
if not backend.is_available():
continue
searched.append(backend.name)
hits = await backend.search(q, limit=limit)
results.extend(ExternalSearchResultOut.from_entity(h) for h in hits)
return ExternalSearchResponse(results=results, searched_sources=searched)
@router.get("/library") @router.get("/library")
+20 -9
View File
@@ -1,14 +1,13 @@
"""External source endpoints: enumerate sources and trigger imports. """External source endpoints: enumerate sources, search, and trigger imports.
Listing/health are read-only (any authenticated user). Scanning a source is an Listing/health/search are read-only (any authenticated user). Scanning a source
admin action and runs in a worker — the endpoint only enqueues it. is an admin action and runs in a worker — the endpoint only enqueues it.
""" """
from typing import Any from fastapi import APIRouter, Query
from fastapi import APIRouter
from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser
from app.api.schemas.external_search import ExternalSearchResponse, ExternalSearchResultOut
from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut
from app.domain.errors import DependencyUnavailableError from app.domain.errors import DependencyUnavailableError
from app.workers.queue import enqueue from app.workers.queue import enqueue
@@ -39,6 +38,18 @@ async def source_health(
@router.get("/{source}/search") @router.get("/{source}/search")
async def search_source(source: str, _: CurrentUser) -> Any: async def search_source(
# Search is for fetch-style sources (youtube, …) — not yet implemented. source: str,
... _: CurrentUser,
registry: SourceRegistryDep,
q: str = Query(min_length=1),
limit: int = Query(20, ge=1, le=50),
) -> ExternalSearchResponse:
backend = registry.searchable(source) # 404 if unknown, 422 if not searchable
if not backend.is_available():
raise DependencyUnavailableError(f"Source {source!r} is not available.")
results = await backend.search(q, limit=limit)
return ExternalSearchResponse(
results=[ExternalSearchResultOut.from_entity(r) for r in results],
searched_sources=[source],
)
+1 -2
View File
@@ -63,8 +63,7 @@ async def get_storage_stats(
by_metadata_status=stats.by_metadata_status, by_metadata_status=stats.by_metadata_status,
by_source=stats.by_source, by_source=stats.by_source,
top_genres=[ top_genres=[
GenreCountOut(genre=genre, track_count=count) GenreCountOut(genre=genre, track_count=count) for genre, count in genres[:_TOP_GENRES]
for genre, count in genres[:_TOP_GENRES]
], ],
disk=DiskUsageOut(total=disk.total, used=disk.used, free=disk.free) if disk else None, disk=DiskUsageOut(total=disk.total, used=disk.used, free=disk.free) if disk else None,
) )
+3 -1
View File
@@ -71,6 +71,7 @@ async def list_tracks(
artist_id: uuid.UUID | None = None, artist_id: uuid.UUID | None = None,
album_id: uuid.UUID | None = None, album_id: uuid.UUID | None = None,
q: str | None = None, q: str | None = None,
source: str | None = Query(None, max_length=32),
sort_by: str = Query("created_at", pattern="^(title|created_at|artist)$"), sort_by: str = Query("created_at", pattern="^(title|created_at|artist)$"),
order: str = Query("desc", pattern="^(asc|desc)$"), order: str = Query("desc", pattern="^(asc|desc)$"),
limit: int = Query(50, ge=1, le=200), limit: int = Query(50, ge=1, le=200),
@@ -80,12 +81,13 @@ async def list_tracks(
artist_id=artist_id, artist_id=artist_id,
album_id=album_id, album_id=album_id,
q=q, q=q,
source=source,
sort_by=sort_by, sort_by=sort_by,
order=order, order=order,
limit=limit, limit=limit,
offset=offset, offset=offset,
) )
total = await track_repo.count(artist_id=artist_id, album_id=album_id, q=q) total = await track_repo.count(artist_id=artist_id, album_id=album_id, q=q, source=source)
artist_ids = list({t.artist_id for t in tracks}) artist_ids = list({t.artist_id for t in tracks})
album_ids = list({t.album_id for t in tracks if t.album_id is not None}) album_ids = list({t.album_id for t in tracks if t.album_id is not None})
+183
View File
@@ -0,0 +1,183 @@
"""DownloadService — request external downloads and import their results.
Two roles (plan §6.1):
* **Request side** (HTTP): validate + dedup a download request, create a
``queued`` job, and enqueue the worker. Dedup is on ``(source, source_id)``
against both the library (already imported) and in-flight jobs (a double-click
must not queue twice) — idempotency per CLAUDE.md.
* **Worker side**: ``store_result`` turns a backend's :class:`DownloadResult`
into a managed file + minimal ``pending`` track (sibling of
:class:`~app.application.import_service.LibraryImportService`); enrichment
(§6.2) fills the rest.
The fingerprint-level dedup (a different id that turns out to be the same audio)
happens later in enrichment, where the fingerprint is computed.
"""
import contextlib
import uuid
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
import anyio
from app.core.logging import get_logger
from app.domain.entities.download import DownloadJob
from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import (
ArtistRepository,
DownloadJobRepository,
FileStorage,
TrackRepository,
)
from app.domain.sources import DownloadResult
log = get_logger(__name__)
_UNKNOWN_ARTIST = "Unknown Artist"
# (job_id) -> None — enqueue the download worker, deferred so the job row is
# committed before the worker reads it (same pattern as enrich).
DownloadEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
EnrichEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
@dataclass(frozen=True)
class DownloadRequest:
    """Outcome of asking for a download.

    Exactly one of two shapes holds:

    * the item is already in the library — ``already_in_library`` is true,
      ``track_id`` is set and ``job`` is ``None``;
    * a job covers the item — ``job`` is set (either an unfinished job that
      already existed or one that was just created), ``track_id`` is ``None``
      and ``already_in_library`` is false, so the UI can route to the
      download manager.

    The value does not distinguish a pre-existing job from a new one.
    """

    # The job covering the item, or None when it is already in the library.
    job: DownloadJob | None
    # The library track, set only when ``already_in_library`` is true.
    track_id: uuid.UUID | None
    already_in_library: bool
class DownloadService:
    """Creates and manages download jobs, and imports finished downloads.

    The HTTP layer uses the request-side methods (``request``, ``list``,
    ``get``, ``cancel``, ``retry``); ``store_result`` turns a downloaded file
    into a stored file plus a ``pending`` track.
    """

    def __init__(
        self,
        *,
        jobs: DownloadJobRepository,
        tracks: TrackRepository,
        artists: ArtistRepository,
        storage: FileStorage,
        enqueue_download: DownloadEnqueuer | None = None,
        enqueue_enrich: EnrichEnqueuer | None = None,
    ) -> None:
        """Store the collaborators; both enqueuers are optional and skipped when ``None``."""
        self._jobs = jobs
        self._tracks = tracks
        self._artists = artists
        self._storage = storage
        self._enqueue_download = enqueue_download
        self._enqueue_enrich = enqueue_enrich

    # -- request side ---------------------------------------------------------

    async def request(
        self,
        *,
        source: str,
        source_id: str,
        query: str | None,
        requested_by: uuid.UUID | None,
    ) -> DownloadRequest:
        """Ask for ``(source, source_id)`` to be downloaded, deduplicating first.

        Checks, in order: a library track with the same source/id, then an
        unfinished job for the same source/id. Only when both miss is a new
        job added and handed to the download enqueuer.

        Raises:
            ValidationError: ``source_id`` is empty after stripping whitespace.
        """
        source_id = source_id.strip()
        if not source_id:
            raise ValidationError("A source_id is required to download.")

        in_library = await self._tracks.get_by_source(source, source_id)
        if in_library is not None:
            return DownloadRequest(job=None, track_id=in_library.id, already_in_library=True)

        in_flight = await self._jobs.get_active_for_source(source, source_id)
        if in_flight is not None:
            return DownloadRequest(job=in_flight, track_id=None, already_in_library=False)

        created = await self._jobs.add(
            source=source,
            source_id=source_id,
            query=query,
            requested_by=requested_by,
        )
        enqueue = self._enqueue_download
        if enqueue is not None:
            await enqueue(created.id)
        return DownloadRequest(job=created, track_id=None, already_in_library=False)

    async def list(
        self,
        *,
        requested_by: uuid.UUID | None,
        status: str | None,
        limit: int,
        offset: int,
    ) -> tuple[list[DownloadJob], int]:
        """Return one page of jobs and the total count for the same filters."""
        page = await self._jobs.list(
            requested_by=requested_by, status=status, limit=limit, offset=offset
        )
        matching = await self._jobs.count(requested_by=requested_by, status=status)
        return page, matching

    async def get(self, job_id: uuid.UUID) -> DownloadJob:
        """Fetch a job by id.

        Raises:
            NotFoundError: no job has that id.
        """
        found = await self._jobs.get_by_id(job_id)
        if found is not None:
            return found
        raise NotFoundError(f"Download job {job_id} not found.")

    async def cancel(self, job_id: uuid.UUID) -> None:
        """Delete the job record.

        This only removes the row; it does not interrupt a download that is
        already running (documented as out of scope for the MVP, with the
        worker expected to tolerate a vanished job row).

        Raises:
            NotFoundError: no job has that id.
        """
        await self.get(job_id)  # existence check; raises NotFoundError when missing
        await self._jobs.delete(job_id)

    async def retry(self, job_id: uuid.UUID) -> DownloadJob:
        """Put a job back to ``queued``, clear its error, and enqueue it again.

        Returns the job as re-read after the update, or the copy read before
        the update if the re-read finds nothing.

        Raises:
            NotFoundError: no job has that id.
        """
        before = await self.get(job_id)
        await self._jobs.set_status(job_id, status="queued", error_message=None)
        enqueue = self._enqueue_download
        if enqueue is not None:
            await enqueue(job_id)
        after = await self._jobs.get_by_id(job_id)
        if after is None:
            return before
        return after

    # -- worker side ----------------------------------------------------------

    async def store_result(
        self,
        *,
        source: str,
        result: DownloadResult,
        requested_by: uuid.UUID | None,
    ) -> uuid.UUID:
        """Save a downloaded file into storage and create its ``pending`` track.

        Returns the id of the new track. If creating the track fails, the
        stored file is deleted again (best effort) and the error propagates.
        The temp file at ``result.path`` is unlinked on every path, also best
        effort. Enqueuing enrichment is left to the caller.
        """
        new_track_id = uuid.uuid4()
        # Keys are grouped under the first two characters of the track id.
        storage_key = f"tracks/{str(new_track_id)[:2]}/{new_track_id}.{result.file_format}"
        try:
            await self._storage.save_file(storage_key, result.path)
            try:
                await self._add_pending_track(
                    track_id=new_track_id,
                    storage_key=storage_key,
                    source=source,
                    result=result,
                    requested_by=requested_by,
                )
            except Exception:
                # Don't leave an orphaned file behind when the DB write fails.
                with contextlib.suppress(Exception):
                    await self._storage.delete(storage_key)
                raise
        finally:
            with contextlib.suppress(Exception):
                await anyio.Path(result.path).unlink(missing_ok=True)
        return new_track_id

    async def _add_pending_track(
        self,
        *,
        track_id: uuid.UUID,
        storage_key: str,
        source: str,
        result: DownloadResult,
        requested_by: uuid.UUID | None,
    ) -> None:
        """Insert the minimal track row, attributed to the placeholder artist."""
        placeholder = await self._artists.get_or_create(_UNKNOWN_ARTIST)
        await self._tracks.add(
            id=track_id,
            title=result.suggested_title,
            artist_id=placeholder.id,
            storage_uri=storage_key,
            file_format=result.file_format,
            file_size=result.file_size,
            source=source,
            source_id=result.source_id,
            metadata_status="pending",
            added_by=requested_by,
        )
+8
View File
@@ -71,6 +71,9 @@ class Settings(BaseSettings):
media_path: Path = Path("/data/media") media_path: Path = Path("/data/media")
transcode_cache_path: Path = Path("/data/transcode-cache") transcode_cache_path: Path = Path("/data/transcode-cache")
max_parallel_downloads: int = 2 max_parallel_downloads: int = 2
# How many times the download worker retries a failed fetch (yt-dlp fails
# often) before marking the job ``failed`` — exponential backoff between tries.
download_max_retries: int = 3
storage_backend: Literal["local", "s3"] = "local" storage_backend: Literal["local", "s3"] = "local"
upload_tmp_dir: Path | None = None upload_tmp_dir: Path | None = None
@@ -100,6 +103,11 @@ class Settings(BaseSettings):
# deployments should set their own contact email; see # deployments should set their own contact email; see
# ``musicbrainz_user_agent`` below for how it's used. # ``musicbrainz_user_agent`` below for how it's used.
musicbrainz_owner_email: str | None = None musicbrainz_owner_email: str | None = None
# ``youtube`` fetch source (search + download via ytmusicapi/yt-dlp). Enabled
# by default; the source still reports unavailable if the libs aren't present.
youtube_enabled: bool = True
# Optional cookies file (Netscape format) for yt-dlp — lets it fetch
# age-restricted / region-locked items via an authenticated session.
youtube_cookies_path: Path | None = None youtube_cookies_path: Path | None = None
# -- enrichment ------------------------------------------------------- # -- enrichment -------------------------------------------------------
+2
View File
@@ -2,6 +2,7 @@
from app.domain.entities.album import Album from app.domain.entities.album import Album
from app.domain.entities.cover import CoverArt from app.domain.entities.cover import CoverArt
from app.domain.entities.download import DownloadJob
from app.domain.entities.history import PlayHistoryEntry from app.domain.entities.history import PlayHistoryEntry
from app.domain.entities.like import Like from app.domain.entities.like import Like
from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch
@@ -22,6 +23,7 @@ __all__ = [
"CoverArt", "CoverArt",
"Credentials", "Credentials",
"DiskUsage", "DiskUsage",
"DownloadJob",
"Fingerprint", "Fingerprint",
"FormatBreakdown", "FormatBreakdown",
"LibraryStats", "LibraryStats",
+26
View File
@@ -0,0 +1,26 @@
"""Download job domain entity (plan §6.1).
A queued fetch from an external source, tracked through its lifecycle so the UI
download manager (screen §A5) can show progress, errors, and retries. The
``status`` strings mirror :class:`~app.infrastructure.db.models.enums.DownloadStatus`.
"""
import datetime as dt
import uuid
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class DownloadJob:
id: uuid.UUID
source: str
source_id: str | None
query: str | None
requested_by: uuid.UUID | None
status: str
progress: float
error_message: str | None
retry_count: int
track_id: uuid.UUID | None
created_at: dt.datetime
updated_at: dt.datetime
+81 -2
View File
@@ -7,7 +7,7 @@ are bound to these ports at the composition root (``app.api.deps``).
import datetime as dt import datetime as dt
import uuid import uuid
from collections.abc import AsyncIterator, Iterator from collections.abc import AsyncIterator, Awaitable, Callable, Iterator
from contextlib import AbstractAsyncContextManager from contextlib import AbstractAsyncContextManager
from pathlib import Path from pathlib import Path
from typing import Protocol from typing import Protocol
@@ -18,6 +18,7 @@ from app.domain.entities import (
CoverArt, CoverArt,
Credentials, Credentials,
DiskUsage, DiskUsage,
DownloadJob,
Fingerprint, Fingerprint,
LibraryStats, LibraryStats,
Like, Like,
@@ -29,9 +30,14 @@ from app.domain.entities import (
User, User,
) )
from app.domain.entities.track import Artist, Track from app.domain.entities.track import Artist, Track
from app.domain.sources import SourceFile, SourceInfo from app.domain.sources import DownloadResult, RawMetadata, SearchResult, SourceFile, SourceInfo
from app.domain.tokens import IssuedToken, TokenClaims, TokenType from app.domain.tokens import IssuedToken, TokenClaims, TokenType
# A fetch source reports download progress as a fraction in [0.0, 1.0]. It's a
# plain callback (not a port) because it's an inversion of control supplied per
# call by the worker, which persists it to the download job.
ProgressCallback = Callable[[float], Awaitable[None]]
class UserRepository(Protocol): class UserRepository(Protocol):
async def get_by_id(self, user_id: uuid.UUID) -> User | None: ... async def get_by_id(self, user_id: uuid.UUID) -> User | None: ...
@@ -145,6 +151,7 @@ class TrackRepository(Protocol):
artist_id: uuid.UUID | None, artist_id: uuid.UUID | None,
album_id: uuid.UUID | None, album_id: uuid.UUID | None,
q: str | None, q: str | None,
source: str | None = None,
sort_by: str, sort_by: str,
order: str, order: str,
limit: int, limit: int,
@@ -156,6 +163,7 @@ class TrackRepository(Protocol):
artist_id: uuid.UUID | None, artist_id: uuid.UUID | None,
album_id: uuid.UUID | None, album_id: uuid.UUID | None,
q: str | None, q: str | None,
source: str | None = None,
) -> int: ... ) -> int: ...
async def update( async def update(
self, self,
@@ -273,6 +281,54 @@ class HistoryRepository(Protocol):
async def count(self, *, user_id: uuid.UUID) -> int: ... async def count(self, *, user_id: uuid.UUID) -> int: ...
class DownloadJobRepository(Protocol):
    """Port for storing and querying download jobs (plan §6.1).

    Used by the download manager (§A5) and by the worker's retry/backoff loop.
    """

    async def add(
        self,
        *,
        source: str,
        source_id: str | None,
        query: str | None,
        requested_by: uuid.UUID | None,
    ) -> DownloadJob:
        """Create a job for the given item and return it."""

    async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
        """Return the job with ``job_id``, or ``None``."""

    async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
        """Return an unfinished (queued/downloading/enriching) job for the same
        item, if there is one. Callers check this before enqueuing so that a
        double-click cannot queue the same item twice."""

    async def list(
        self,
        *,
        requested_by: uuid.UUID | None,
        status: str | None,
        limit: int,
        offset: int,
    ) -> list[DownloadJob]:
        """Return one page of jobs for the given ``requested_by``/``status`` arguments."""

    async def count(self, *, requested_by: uuid.UUID | None, status: str | None) -> int:
        """Return the number of jobs for the given ``requested_by``/``status`` arguments."""

    async def set_status(
        self,
        job_id: uuid.UUID,
        *,
        status: str,
        error_message: str | None = None,
        track_id: uuid.UUID | None = None,
    ) -> None:
        """Record a new status for the job, with an optional error message and track link."""

    async def set_progress(self, job_id: uuid.UUID, progress: float) -> None:
        """Record the job's download progress."""

    async def increment_retry(self, job_id: uuid.UUID) -> int:
        """Increase ``retry_count`` by one and return the resulting value."""

    async def delete(self, job_id: uuid.UUID) -> None:
        """Remove the job."""

    async def failure_rate(self, source: str, *, since: dt.datetime) -> float:
        """Return the share of ``source`` jobs created since ``since`` that ended
        ``failed``; 0.0 when there are no such jobs. Feeds the §A5 "source
        unhealthy" banner."""
class SourceBackend(Protocol): class SourceBackend(Protocol):
"""A registered source of tracks (mounted folder, YouTube, …). """A registered source of tracks (mounted folder, YouTube, …).
@@ -291,6 +347,29 @@ class IndexableSource(SourceBackend, Protocol):
def scan(self) -> Iterator[SourceFile]: ... def scan(self) -> Iterator[SourceFile]: ...
class SearchableSource(SourceBackend, Protocol):
    """A source that supports free-text search (e.g. YouTube Music).

    Contract: implementations return ``[]`` instead of raising, both when
    nothing matches and when the service is down, so that the discover screen
    shows "nothing found" rather than an error.
    """

    async def search(self, query: str, *, limit: int) -> list[SearchResult]:
        """Return hits for ``query``; ``limit`` is keyword-only."""
class FetchableSource(SourceBackend, Protocol):
    """A source that can download an item that was discovered earlier.

    Contract: ``fetch`` is meant to run only in a worker (heavy I/O) and to
    raise on failure, so that the download task can retry with backoff.
    """

    async def fetch(
        self,
        source_id: str,
        *,
        on_progress: ProgressCallback | None = None,
    ) -> DownloadResult:
        """Resolve ``source_id`` (taken from a :class:`SearchResult`) into a
        local file, reporting progress through ``on_progress`` when given."""

    async def get_metadata(self, source_id: str) -> RawMetadata | None:
        """Return the source's own metadata for ``source_id``, or ``None``."""
# -- metadata enrichment (plan §6.2) ----------------------------------------- # -- metadata enrichment (plan §6.2) -----------------------------------------
class AudioTagReader(Protocol): class AudioTagReader(Protocol):
"""Reads embedded tags from a local audio file. Returns ``None`` only when """Reads embedded tags from a local audio file. Returns ``None`` only when
+58 -2
View File
@@ -10,8 +10,14 @@ here — a source yields a file plus a minimal title; enrichment (plan §6.2) fi
the rest later, so this stays a thin discovery layer (CLAUDE.md: no duplicated the rest later, so this stays a thin discovery layer (CLAUDE.md: no duplicated
business logic).""" business logic)."""
from dataclasses import dataclass from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any
# A source's ``kind`` describes which ports it satisfies, so the UI/admin can
# tell an indexed folder from a searchable fetch-source. A backend may be both.
KIND_INDEXABLE = "indexable" # enumerates files already on disk (local folder)
KIND_FETCH = "fetch" # searches + downloads from an external service (YTM, …)
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
@@ -20,7 +26,7 @@ class SourceInfo:
name: str name: str
label: str label: str
kind: str # "indexable" (more kinds — search/download — arrive with youtube) kind: str # KIND_INDEXABLE | KIND_FETCH
available: bool available: bool
@@ -37,3 +43,53 @@ class SourceFile:
suggested_title: str suggested_title: str
file_format: str file_format: str
file_size: int file_size: int
@dataclass(frozen=True, slots=True)
class SearchResult:
"""One hit from a searchable source (plan §5), shown on the discover screen.
``source_id`` is the stable handle the same backend later resolves in
``fetch`` — it must round-trip a download request without re-searching.
``raw`` carries the backend's untouched payload for debugging / future use.
"""
source: str
source_id: str
title: str
artist: str | None
album: str | None
duration_seconds: int | None
thumbnail_url: str | None
raw: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True, slots=True)
class RawMetadata:
"""Metadata a fetch-source can offer about an item *before* enrichment.
Best-effort and source-shaped — the canonical metadata still comes from the
enrichment pipeline (plan §6.2). Used to seed a more useful provisional
title than a bare id while a download is queued."""
title: str | None
artist: str | None
album: str | None
year: int | None
extra: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True, slots=True)
class DownloadResult:
"""A file a fetch-source produced on local disk (plan §5).
``path`` is a temp file the caller owns: it is stored into managed storage
and then removed (same lifecycle as an upload). ``source_id`` is echoed back
because some backends only learn the canonical id during the download."""
source_id: str
path: Path
file_format: str
file_size: int
bitrate: int | None
suggested_title: str
@@ -35,3 +35,9 @@ class DownloadJobModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
progress: Mapped[float] = mapped_column(Float, nullable=False, default=0.0) progress: Mapped[float] = mapped_column(Float, nullable=False, default=0.0)
error_message: Mapped[str | None] = mapped_column(Text, nullable=True) error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
retry_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) retry_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
# Set once the download finishes and the track is imported — lets the UI
# link a completed job to its library track.
track_id: Mapped[uuid.UUID | None] = mapped_column(
ForeignKey("tracks.id", ondelete="SET NULL"),
nullable=True,
)
@@ -2,6 +2,9 @@
from app.infrastructure.db.repositories.album_repository import SqlAlchemyAlbumRepository from app.infrastructure.db.repositories.album_repository import SqlAlchemyAlbumRepository
from app.infrastructure.db.repositories.artist_repository import SqlAlchemyArtistRepository from app.infrastructure.db.repositories.artist_repository import SqlAlchemyArtistRepository
from app.infrastructure.db.repositories.download_job_repository import (
SqlAlchemyDownloadJobRepository,
)
from app.infrastructure.db.repositories.history_repository import SqlAlchemyHistoryRepository from app.infrastructure.db.repositories.history_repository import SqlAlchemyHistoryRepository
from app.infrastructure.db.repositories.like_repository import SqlAlchemyLikeRepository from app.infrastructure.db.repositories.like_repository import SqlAlchemyLikeRepository
from app.infrastructure.db.repositories.playlist_repository import SqlAlchemyPlaylistRepository from app.infrastructure.db.repositories.playlist_repository import SqlAlchemyPlaylistRepository
@@ -14,6 +17,7 @@ from app.infrastructure.db.repositories.user_repository import SqlAlchemyUserRep
__all__ = [ __all__ = [
"SqlAlchemyAlbumRepository", "SqlAlchemyAlbumRepository",
"SqlAlchemyArtistRepository", "SqlAlchemyArtistRepository",
"SqlAlchemyDownloadJobRepository",
"SqlAlchemyHistoryRepository", "SqlAlchemyHistoryRepository",
"SqlAlchemyLikeRepository", "SqlAlchemyLikeRepository",
"SqlAlchemyPlaylistRepository", "SqlAlchemyPlaylistRepository",
@@ -0,0 +1,164 @@
"""Download job repository — adapter over ``AsyncSession`` (plan §6.1)."""
import datetime as dt
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities.download import DownloadJob
from app.infrastructure.db.models.download_job import DownloadJobModel
from app.infrastructure.db.models.enums import DownloadStatus
# Jobs that are not yet finished — used to dedup an in-flight download.
_ACTIVE_STATUSES = (
DownloadStatus.QUEUED.value,
DownloadStatus.DOWNLOADING.value,
DownloadStatus.ENRICHING.value,
)
def _to_entity(row: DownloadJobModel) -> DownloadJob:
    """Copy an ORM row into a :class:`DownloadJob` entity, field by field."""
    # The entity's constructor keywords share their names with the row's
    # attributes, so the mapping is driven by this one list.
    copied_fields = (
        "id",
        "source",
        "source_id",
        "query",
        "requested_by",
        "status",
        "progress",
        "error_message",
        "retry_count",
        "track_id",
        "created_at",
        "updated_at",
    )
    return DownloadJob(**{name: getattr(row, name) for name in copied_fields})
class SqlAlchemyDownloadJobRepository:
    """Download-job persistence on top of an ``AsyncSession``.

    Rows are mapped to :class:`DownloadJob` entities before they are returned.
    Mutating methods flush the session but never commit it.
    """

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def add(
        self,
        *,
        source: str,
        source_id: str | None,
        query: str | None,
        requested_by: uuid.UUID | None,
    ) -> DownloadJob:
        """Insert a new ``queued`` job (zero progress, zero retries)."""
        job_row = DownloadJobModel(
            source=source,
            source_id=source_id,
            query=query,
            requested_by=requested_by,
            status=DownloadStatus.QUEUED.value,
            progress=0.0,
            retry_count=0,
        )
        session = self._session
        session.add(job_row)
        await session.flush()
        # Reload the row before mapping it to an entity.
        await session.refresh(job_row)
        return _to_entity(job_row)

    async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
        """Return the job with ``job_id``, or ``None`` when there is none."""
        job_row = await self._session.get(DownloadJobModel, job_id)
        if job_row is None:
            return None
        return _to_entity(job_row)

    async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
        """Return the newest unfinished job for ``(source, source_id)``, if any."""
        newest_active = (
            select(DownloadJobModel)
            .where(
                DownloadJobModel.source == source,
                DownloadJobModel.source_id == source_id,
                DownloadJobModel.status.in_(_ACTIVE_STATUSES),
            )
            .order_by(DownloadJobModel.created_at.desc())
            .limit(1)
        )
        result = await self._session.execute(newest_active)
        job_row = result.scalar_one_or_none()
        if job_row is None:
            return None
        return _to_entity(job_row)

    async def list(
        self,
        *,
        requested_by: uuid.UUID | None,
        status: str | None,
        limit: int,
        offset: int,
    ) -> list[DownloadJob]:
        """Return one page of jobs, newest first.

        A filter whose argument is ``None`` is not applied.
        """
        page = select(DownloadJobModel)
        if requested_by is not None:
            page = page.where(DownloadJobModel.requested_by == requested_by)
        if status is not None:
            page = page.where(DownloadJobModel.status == status)
        page = page.order_by(DownloadJobModel.created_at.desc())
        page = page.limit(limit).offset(offset)
        result = await self._session.execute(page)
        return [_to_entity(job_row) for job_row in result.scalars().all()]

    async def count(self, *, requested_by: uuid.UUID | None, status: str | None) -> int:
        """Count jobs using the same optional filters as :meth:`list`."""
        counter = select(func.count()).select_from(DownloadJobModel)
        if requested_by is not None:
            counter = counter.where(DownloadJobModel.requested_by == requested_by)
        if status is not None:
            counter = counter.where(DownloadJobModel.status == status)
        result = await self._session.execute(counter)
        return result.scalar_one()

    async def set_status(
        self,
        job_id: uuid.UUID,
        *,
        status: str,
        error_message: str | None = None,
        track_id: uuid.UUID | None = None,
    ) -> None:
        """Move a job to ``status``; a missing row is silently ignored."""
        job_row = await self._session.get(DownloadJobModel, job_id)
        if job_row is None:
            return
        job_row.status = status
        # Written unconditionally so that a successful transition wipes the
        # stale reason left by an earlier failed attempt.
        job_row.error_message = error_message
        # ``track_id`` is only ever set here, never cleared.
        if track_id is not None:
            job_row.track_id = track_id
        # A finished job always reports full progress.
        if status == DownloadStatus.DONE.value:
            job_row.progress = 1.0
        await self._session.flush()

    async def set_progress(self, job_id: uuid.UUID, progress: float) -> None:
        """Store ``progress`` clamped into ``[0.0, 1.0]``; no-op if the row is gone."""
        job_row = await self._session.get(DownloadJobModel, job_id)
        if job_row is None:
            return
        clamped = max(0.0, min(1.0, progress))
        job_row.progress = clamped
        await self._session.flush()

    async def increment_retry(self, job_id: uuid.UUID) -> int:
        """Bump the retry counter and return its new value (``0`` if the row is gone)."""
        job_row = await self._session.get(DownloadJobModel, job_id)
        if job_row is None:
            return 0
        job_row.retry_count = job_row.retry_count + 1
        await self._session.flush()
        return job_row.retry_count

    async def delete(self, job_id: uuid.UUID) -> None:
        """Delete the job row if it exists."""
        job_row = await self._session.get(DownloadJobModel, job_id)
        if job_row is None:
            return
        await self._session.delete(job_row)
        await self._session.flush()

    async def failure_rate(self, source: str, *, since: dt.datetime) -> float:
        """Return the share of ``source`` jobs created since ``since`` that failed.

        Yields ``0.0`` when no job falls inside the window.
        """
        failed_jobs = func.count().filter(
            DownloadJobModel.status == DownloadStatus.FAILED.value
        )
        stats = (
            select(func.count(), failed_jobs)
            .select_from(DownloadJobModel)
            .where(
                DownloadJobModel.source == source,
                DownloadJobModel.created_at >= since,
            )
        )
        total, failed = (await self._session.execute(stats)).one()
        if not total:
            return 0.0
        return failed / total
@@ -170,6 +170,7 @@ class SqlAlchemyTrackRepository:
artist_id: uuid.UUID | None, artist_id: uuid.UUID | None,
album_id: uuid.UUID | None, album_id: uuid.UUID | None,
q: str | None, q: str | None,
source: str | None = None,
sort_by: str = "created_at", sort_by: str = "created_at",
order: str = "desc", order: str = "desc",
limit: int = 50, limit: int = 50,
@@ -180,6 +181,8 @@ class SqlAlchemyTrackRepository:
stmt = stmt.where(TrackModel.artist_id == artist_id) stmt = stmt.where(TrackModel.artist_id == artist_id)
if album_id is not None: if album_id is not None:
stmt = stmt.where(TrackModel.album_id == album_id) stmt = stmt.where(TrackModel.album_id == album_id)
if source is not None:
stmt = stmt.where(TrackModel.source == source)
if q: if q:
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%")) stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
@@ -204,12 +207,15 @@ class SqlAlchemyTrackRepository:
artist_id: uuid.UUID | None, artist_id: uuid.UUID | None,
album_id: uuid.UUID | None, album_id: uuid.UUID | None,
q: str | None, q: str | None,
source: str | None = None,
) -> int: ) -> int:
stmt = select(func.count()).select_from(TrackModel) stmt = select(func.count()).select_from(TrackModel)
if artist_id is not None: if artist_id is not None:
stmt = stmt.where(TrackModel.artist_id == artist_id) stmt = stmt.where(TrackModel.artist_id == artist_id)
if album_id is not None: if album_id is not None:
stmt = stmt.where(TrackModel.album_id == album_id) stmt = stmt.where(TrackModel.album_id == album_id)
if source is not None:
stmt = stmt.where(TrackModel.source == source)
if q: if q:
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%")) stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
return (await self._session.execute(stmt)).scalar_one() return (await self._session.execute(stmt)).scalar_one()
+1 -1
View File
@@ -78,7 +78,7 @@ class AcoustIdHttpClient:
) )
resp.raise_for_status() resp.raise_for_status()
return resp.json() # type: ignore[no-any-return] return resp.json() # type: ignore[no-any-return]
except (httpx.HTTPError, ValueError): except httpx.HTTPError, ValueError:
log.warning("acoustid_lookup_failed") log.warning("acoustid_lookup_failed")
return None return None
+27 -2
View File
@@ -2,16 +2,18 @@
Built from settings at the composition root. Only sources that are configured Built from settings at the composition root. Only sources that are configured
are registered (e.g. ``local`` appears only when ``LOCAL_MEDIA_IMPORT_PATH`` is are registered (e.g. ``local`` appears only when ``LOCAL_MEDIA_IMPORT_PATH`` is
set), so enumeration reflects what the instance can actually use. set; ``youtube`` only when ``YOUTUBE_ENABLED``), so enumeration reflects what the
instance can actually use.
""" """
from typing import cast from typing import cast
from app.core.config import Settings from app.core.config import Settings
from app.domain.errors import NotFoundError, ValidationError from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import IndexableSource, SourceBackend from app.domain.ports import FetchableSource, IndexableSource, SearchableSource, SourceBackend
from app.domain.sources import SourceInfo from app.domain.sources import SourceInfo
from app.infrastructure.sources.local_folder import LocalFolderSource from app.infrastructure.sources.local_folder import LocalFolderSource
from app.infrastructure.sources.youtube import YouTubeMusicSource
class SourceRegistry: class SourceRegistry:
@@ -30,6 +32,22 @@ class SourceRegistry:
raise ValidationError(f"Source {name!r} cannot be indexed.") raise ValidationError(f"Source {name!r} cannot be indexed.")
return cast(IndexableSource, backend) return cast(IndexableSource, backend)
def searchable(self, name: str) -> SearchableSource:
    """Return the backend registered as ``name``, typed as a search source.

    The backend is looked up through ``self.get``; a backend without a
    ``search`` attribute is rejected with :class:`ValidationError`.
    """
    candidate = self.get(name)
    if hasattr(candidate, "search"):
        return cast(SearchableSource, candidate)
    raise ValidationError(f"Source {name!r} cannot be searched.")
def fetchable(self, name: str) -> FetchableSource:
    """Return the backend registered as ``name``, typed as a fetch source.

    The backend is looked up through ``self.get``; a backend without a
    ``fetch`` attribute is rejected with :class:`ValidationError`.
    """
    candidate = self.get(name)
    if hasattr(candidate, "fetch"):
        return cast(FetchableSource, candidate)
    raise ValidationError(f"Source {name!r} cannot download.")
def searchables(self) -> list[SearchableSource]:
    """List all registered sources that can be searched (for cross-source search)."""
    found: list[SearchableSource] = []
    for backend in self._by_name.values():
        # A backend counts as searchable when it exposes ``search``.
        if hasattr(backend, "search"):
            found.append(cast(SearchableSource, backend))
    return found
def infos(self) -> list[SourceInfo]: def infos(self) -> list[SourceInfo]:
return [backend.info() for backend in self._by_name.values()] return [backend.info() for backend in self._by_name.values()]
@@ -38,4 +56,11 @@ def build_source_registry(settings: Settings) -> SourceRegistry:
backends: list[SourceBackend] = [] backends: list[SourceBackend] = []
if settings.local_media_import_path is not None: if settings.local_media_import_path is not None:
backends.append(LocalFolderSource(settings.local_media_import_path)) backends.append(LocalFolderSource(settings.local_media_import_path))
if settings.youtube_enabled:
backends.append(
YouTubeMusicSource(
cookies_path=settings.youtube_cookies_path,
tmp_dir=settings.upload_tmp_dir,
)
)
return SourceRegistry(backends) return SourceRegistry(backends)
+207
View File
@@ -0,0 +1,207 @@
"""``youtube`` source — YouTube Music search + download (plan §5).
A *fetch* source: it searches YouTube Music (via ``ytmusicapi``, which returns
clean song/artist/album/duration rows) and downloads the chosen item with
``yt-dlp``. The two libraries are synchronous, so every call is bounced to a
worker thread (``anyio.to_thread``); the sync yt-dlp progress hook bridges back
to the async progress callback via ``anyio.from_thread``.
Both libraries are optional dependencies — if either is missing the source is
simply *unavailable* (it never crashes import or the registry; graceful
degradation per CLAUDE.md). The audio stream is stored **as-is** (YouTube serves
lossy Opus/AAC; re-encoding would be lossy→lossy, plan §6.6).
``source_id`` is the YouTube ``videoId`` — stable, so a re-download of the same
id is idempotent and dedups against an existing track.
"""
import functools
import tempfile
from collections.abc import Callable
from pathlib import Path
from typing import Any
import anyio
from app.core.logging import get_logger
from app.domain.ports import ProgressCallback
from app.domain.sources import (
KIND_FETCH,
DownloadResult,
RawMetadata,
SearchResult,
SourceInfo,
)
from app.infrastructure.db.models.enums import TrackSource
log = get_logger(__name__)
# Functions a caller may inject for testing (defaults do the real library work).
SearchFn = Callable[[str, int], list[dict[str, Any]]]
# (video_id, tmp_dir, progress_hook, cookies_path) -> normalized download dict
DownloadFn = Callable[[str, Path, Callable[[dict[str, Any]], None], Path | None], dict[str, Any]]
def _libs_available() -> bool:
try:
import yt_dlp # noqa: F401
import ytmusicapi # noqa: F401
except ImportError:
return False
return True
def _watch_url(video_id: str) -> str:
return f"https://music.youtube.com/watch?v={video_id}"
class YouTubeMusicSource:
    """YouTube Music backend: search plus download.

    Implements :class:`app.domain.ports.SearchableSource` and
    :class:`~app.domain.ports.FetchableSource`. The search and download work is
    delegated to two plain synchronous callables (the real library wrappers by
    default, injectable for tests) that are run in a worker thread.
    """

    name = TrackSource.YOUTUBE.value

    def __init__(
        self,
        *,
        cookies_path: Path | None = None,
        tmp_dir: Path | None = None,
        search_fn: SearchFn | None = None,
        download_fn: DownloadFn | None = None,
    ) -> None:
        """Configure the source.

        Args:
            cookies_path: Optional cookies file handed to the download function.
            tmp_dir: Directory downloads are written to; the system temp
                directory is used when omitted.
            search_fn: Replacement for the real search call (testing).
            download_fn: Replacement for the real download call (testing).
        """
        self._cookies_path = cookies_path
        self._tmp_dir = tmp_dir
        self._search_fn = search_fn or _default_search
        self._download_fn = download_fn or _default_download
        # Only the real library path needs the deps; an injected fn is self-contained.
        self._injected = search_fn is not None or download_fn is not None

    def info(self) -> SourceInfo:
        """Describe this source for enumeration, including current availability."""
        return SourceInfo(
            name=self.name,
            label="YouTube Music",
            kind=KIND_FETCH,
            available=self.is_available(),
        )

    def is_available(self) -> bool:
        """Return ``True`` when a function was injected or both libraries import."""
        return True if self._injected else _libs_available()

    async def search(self, query: str, *, limit: int) -> list[SearchResult]:
        """Search for ``query`` and map the raw rows to :class:`SearchResult`.

        Never raises: a blank query, a failing search call, or an empty result
        all yield ``[]``. Rows without a ``videoId`` and rows whose shape cannot
        be mapped are skipped individually, so one malformed row neither breaks
        the whole search nor discards the usable rows around it.
        """
        query = query.strip()
        if not query:
            return []
        try:
            rows = await anyio.to_thread.run_sync(functools.partial(self._search_fn, query, limit))
        except Exception:
            # No results / service down → degrade to empty (plan §5, CLAUDE.md).
            log.warning("ytm_search_failed", query=query)
            return []

        results: list[SearchResult] = []
        # ``or ()`` tolerates a search function that hands back ``None``.
        for row in rows or ():
            try:
                result = self._to_result(row)
            except (AttributeError, IndexError, KeyError, TypeError):
                # Unexpected row shape: skip it instead of failing the search.
                log.warning("ytm_search_row_skipped", query=query)
                continue
            if result is not None:
                results.append(result)
        return results

    async def fetch(
        self, source_id: str, *, on_progress: ProgressCallback | None = None
    ) -> DownloadResult:
        """Download ``source_id`` into the temp directory and describe the file.

        The download function runs in a worker thread. Its progress events are
        turned into a fraction and forwarded to ``on_progress``. Errors from
        the download function are not caught here.
        """
        tmp_dir = self._tmp_dir or Path(tempfile.gettempdir())

        def hook(d: dict[str, Any]) -> None:
            # Only "downloading" events carry byte counts worth reporting.
            if on_progress is None or d.get("status") != "downloading":
                return
            total = d.get("total_bytes") or d.get("total_bytes_estimate")
            done = d.get("downloaded_bytes")
            if not total or done is None:
                return
            # Cap below 1.0 — the job only reaches 1.0 once stored + imported.
            frac = min(done / total, 0.99)
            # Bridge sync hook (worker thread) → async callback (event loop).
            anyio.from_thread.run(on_progress, frac)

        def _run() -> dict[str, Any]:
            return self._download_fn(source_id, tmp_dir, hook, self._cookies_path)

        info = await anyio.to_thread.run_sync(_run)
        path = Path(info["filepath"])
        # The size is read from the file on disk, not from the download dict.
        stat = await anyio.Path(path).stat()
        return DownloadResult(
            source_id=source_id,
            path=path,
            file_format=info["file_format"],
            file_size=stat.st_size,
            bitrate=info.get("bitrate"),
            suggested_title=info.get("title") or source_id,
        )

    async def get_metadata(self, source_id: str) -> RawMetadata | None:
        """Return ``None``: no dedicated metadata lookup is implemented yet."""
        # The search result already carries a usable title/artist, and the
        # canonical metadata comes from enrichment (§6.2). A dedicated lookup is
        # an optional refinement — skipped for now (returns None gracefully).
        return None

    def _to_result(self, row: dict[str, Any]) -> SearchResult | None:
        """Map one raw search row to a :class:`SearchResult`.

        Returns ``None`` for a row without a ``videoId``.
        """
        video_id = row.get("videoId")
        if not video_id:
            return None  # non-playable row (e.g. a video without audio id)
        artists = row.get("artists") or []
        # Several artists are joined into one display string.
        artist = ", ".join(a["name"] for a in artists if a.get("name")) or None
        album_row = row.get("album")
        album = album_row.get("name") if isinstance(album_row, dict) else None
        thumbnails = row.get("thumbnails") or []
        # The last thumbnail entry is the one used.
        thumbnail = thumbnails[-1].get("url") if thumbnails else None
        return SearchResult(
            source=self.name,
            source_id=str(video_id),
            title=row.get("title") or "Unknown",
            artist=artist,
            album=album,
            duration_seconds=row.get("duration_seconds"),
            thumbnail_url=thumbnail,
            raw=row,
        )
def _default_search(query: str, limit: int) -> list[dict[str, Any]]:
    """Run the real ytmusicapi song search. Executed in a worker thread."""
    from ytmusicapi import YTMusic

    client = YTMusic()  # unauthenticated: public search needs no login
    hits: list[dict[str, Any]] = client.search(query, filter="songs", limit=limit)
    # Trim in case the library hands back more rows than were asked for.
    trimmed = hits[:limit]
    return trimmed
def _default_download(
    video_id: str,
    tmp_dir: Path,
    progress_hook: Callable[[dict[str, Any]], None],
    cookies_path: Path | None,
) -> dict[str, Any]:
    """Download the best audio stream with yt-dlp. Executed in a worker thread.

    The original stream is kept (no transcode — plan §6.3/§6.6). The returned
    dict is the normalized shape the adapter turns into a
    :class:`DownloadResult`: ``filepath``, ``file_format``, ``bitrate`` and
    ``title``.
    """
    from yt_dlp import YoutubeDL

    output_template = tmp_dir / "%(id)s.%(ext)s"
    ydl_options: dict[str, Any] = {
        "format": "bestaudio/best",
        "outtmpl": str(output_template),
        "quiet": True,
        "no_warnings": True,
        "noprogress": True,
        "progress_hooks": [progress_hook],
    }
    # The cookies path may be configured unconditionally (e.g. a mounted volume
    # that can be empty), so it is only used when the file really exists.
    # Downloads work without it; cookies just unlock age/region-restricted items.
    has_cookie_file = cookies_path is not None and cookies_path.is_file()
    if has_cookie_file:
        ydl_options["cookiefile"] = str(cookies_path)

    with YoutubeDL(ydl_options) as ydl:
        info = ydl.extract_info(_watch_url(video_id), download=True)
        downloaded = Path(ydl.prepare_filename(info))

    extension = downloaded.suffix.lstrip(".").lower()
    average_bitrate = info.get("abr")
    return {
        "filepath": downloaded,
        # Fall back to "m4a" when the file name carries no extension.
        "file_format": extension or "m4a",
        "bitrate": int(average_bitrate) if average_bitrate else None,
        "title": info.get("title"),
    }
+2 -2
View File
@@ -1,7 +1,6 @@
"""arq worker settings — the queue runtime. Task functions register here. """arq worker settings — the queue runtime. Task functions register here.
Run with: ``arq app.workers.arq_worker.WorkerSettings``. Run with: ``arq app.workers.arq_worker.WorkerSettings``.
Tasks (download, transcode) are appended to ``functions`` in later steps.
""" """
from typing import Any, ClassVar from typing import Any, ClassVar
@@ -10,6 +9,7 @@ from arq.connections import RedisSettings
from app.core.config import get_settings from app.core.config import get_settings
from app.core.logging import configure_logging, get_logger from app.core.logging import configure_logging, get_logger
from app.workers.tasks.download_task import download_track
from app.workers.tasks.enrich_task import enrich_track from app.workers.tasks.enrich_task import enrich_track
from app.workers.tasks.import_task import scan_local_folder from app.workers.tasks.import_task import scan_local_folder
@@ -27,7 +27,7 @@ async def shutdown(_ctx: dict[str, Any]) -> None:
class WorkerSettings: class WorkerSettings:
functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track] functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track, download_track]
on_startup = startup on_startup = startup
on_shutdown = shutdown on_shutdown = shutdown
max_jobs = get_settings().max_parallel_downloads max_jobs = get_settings().max_parallel_downloads
+14
View File
@@ -34,6 +34,20 @@ async def enqueue(function: str, **kwargs: Any) -> str:
return str(job.job_id) return str(job.job_id)
async def enqueue_download(job_id: uuid.UUID) -> None:
    """Hand a download job to the worker queue, best-effort.

    The job row has already been persisted as ``queued``, so enqueueing is a
    follow-up and not a barrier. When the queue cannot be reached the failure
    is logged and swallowed (graceful degradation): the job simply stays
    ``queued`` and can be retried later. The task is deferred by a few seconds
    so that the request's DB transaction commits before the worker reads the
    row (same reason as :func:`enqueue_enrich`).
    """
    job_ref = str(job_id)
    try:
        await enqueue("download_track", job_id=job_ref, _defer_by=3)
    except DependencyUnavailableError:
        log.warning("download_enqueue_failed", job_id=job_ref)
async def enqueue_enrich(track_id: uuid.UUID) -> None: async def enqueue_enrich(track_id: uuid.UUID) -> None:
"""Best-effort enqueue of metadata enrichment for a freshly stored track. """Best-effort enqueue of metadata enrichment for a freshly stored track.
+151
View File
@@ -0,0 +1,151 @@
"""arq task: download one queued job through a fetch source (plan §6.1).
Flow: load job → ``downloading`` → ``backend.fetch`` (progress streamed to the
job row) → ``enriching`` → store file + minimal track → ``done`` → enqueue
enrichment. yt-dlp fails often, so a failed fetch retries with exponential
backoff (``download_max_retries``); only after the last try is the job marked
``failed`` with a reason for the §A5 download manager.
Heavy I/O belongs off the request cycle (CLAUDE.md); the HTTP endpoint only
enqueues. The job row tolerates being deleted mid-flight (cancellation) — status
writes against a missing row are no-ops.
"""
import uuid
from typing import Any
from arq import Retry
from app.application.download_service import DownloadService
from app.core.config import get_settings
from app.core.logging import correlation_id, get_logger
from app.domain.entities.download import DownloadJob
from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import FetchableSource, ProgressCallback
from app.domain.sources import DownloadResult
from app.infrastructure.db import session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyArtistRepository,
SqlAlchemyDownloadJobRepository,
SqlAlchemyTrackRepository,
)
from app.infrastructure.sources.registry import build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich
log = get_logger("worker.download")
# Exponential backoff between retries: 30s, 60s, 120s … capped.
_BACKOFF_BASE_SECONDS = 30
_BACKOFF_MAX_SECONDS = 600
# Only write progress when it advances by at least this much (avoid hammering
# the DB on every yt-dlp chunk).
_PROGRESS_STEP = 0.01
async def download_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]:
    """arq entry point: run one download job from pickup to import.

    Returns a small summary dict with ``job_id`` and a ``status`` of
    ``missing``, ``failed`` or ``done`` (the latter also carries ``track_id``).
    A failed fetch is delegated to ``_handle_failure``, which may raise
    ``arq.Retry`` instead of returning.
    """
    correlation_id.set(f"dl:{job_id}")
    jid = uuid.UUID(job_id)
    settings = get_settings()

    job = await _load_job(jid)
    if job is None:
        # Cancelled before the worker picked it up.
        log.info("download_job_missing", job_id=job_id)
        return {"job_id": job_id, "status": "missing"}

    # Resolve the backend; an unknown or non-fetchable source is terminal.
    registry = build_source_registry(settings)
    try:
        backend = registry.fetchable(job.source)
    except (NotFoundError, ValidationError) as exc:
        await _mark_failed(jid, f"Source unavailable: {exc}")
        return {"job_id": job_id, "status": "failed"}

    source_id = job.source_id
    if source_id is None:
        await _mark_failed(jid, "Job has no source_id to download.")
        return {"job_id": job_id, "status": "failed"}

    # Stage 1: fetch the file. Failures here are retryable.
    await _set_status(jid, "downloading")
    try:
        result = await _run_fetch(backend, source_id, jid)
    except Exception as exc:
        return await _handle_failure(jid, exc, settings.download_max_retries, job_id)

    # Stage 2: store + import. Failures here are terminal (no retry).
    try:
        track_id = await _import_result(jid, job, result)
    except Exception as exc:
        log.exception("download_import_failed", job_id=job_id)
        await _mark_failed(jid, f"Import failed: {type(exc).__name__}: {exc}")
        return {"job_id": job_id, "status": "failed"}

    # Stage 3: hand the new track to metadata enrichment.
    await enqueue_enrich(track_id)
    track_ref = str(track_id)
    log.info("download_complete", job_id=job_id, track_id=track_ref)
    return {"job_id": job_id, "status": "done", "track_id": track_ref}
async def _run_fetch(
    backend: FetchableSource, source_id: str, jid: uuid.UUID
) -> DownloadResult:
    """Run ``backend.fetch`` while mirroring its progress onto the job row.

    One session stays open for the whole download so that progress writes do
    not churn connections. Each throttled update is committed right away so
    API pollers can see it.
    """
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        last_written = 0.0

        async def report(fraction: float) -> None:
            nonlocal last_written
            # Skip updates that advance less than one step since the last write.
            if fraction - last_written < _PROGRESS_STEP:
                return
            last_written = fraction
            await jobs.set_progress(jid, fraction)
            await session.commit()

        progress_callback: ProgressCallback = report
        return await backend.fetch(source_id, on_progress=progress_callback)
async def _import_result(jid: uuid.UUID, job: DownloadJob, result: DownloadResult) -> uuid.UUID:
    """Import a fetched file as a track and close out the job.

    Inside a single session: mark the job ``enriching``, pass the result to
    ``DownloadService.store_result``, then mark the job ``done`` with the id of
    the track that call returned.
    """
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        await jobs.set_status(jid, status="enriching")

        importer = DownloadService(
            jobs=jobs,
            tracks=SqlAlchemyTrackRepository(session),
            artists=SqlAlchemyArtistRepository(session),
            storage=get_file_storage(),
        )
        new_track_id = await importer.store_result(
            source=job.source,
            result=result,
            requested_by=job.requested_by,
        )

        await jobs.set_status(jid, status="done", track_id=new_track_id)
        return new_track_id
async def _handle_failure(
    jid: uuid.UUID, exc: Exception, max_retries: int, job_id: str
) -> dict[str, Any]:
    """Decide what happens after a failed fetch: retry, give up, or stop.

    Args:
        jid: Id of the job whose fetch failed.
        exc: The exception the fetch raised.
        max_retries: Number of retries allowed before the job is failed.
        job_id: The job id as a string, used for logging and the result dict.

    Returns:
        ``{"status": "missing"}`` when the job row no longer exists, or
        ``{"status": "failed"}`` once the retries are used up.

    Raises:
        Retry: While attempts remain, deferred with exponential backoff.
    """
    async with session_scope() as session:
        tries = await SqlAlchemyDownloadJobRepository(session).increment_retry(jid)

    # ``increment_retry`` reports 0 only when the row is gone, i.e. the job was
    # deleted (cancelled) while downloading. Retrying it would only re-run the
    # task to find nothing, after a meaningless fractional backoff.
    if tries == 0:
        log.info("download_job_missing", job_id=job_id)
        return {"job_id": job_id, "status": "missing"}

    if tries <= max_retries:
        # 30s, 60s, 120s … capped at the maximum.
        backoff = min(_BACKOFF_BASE_SECONDS * 2 ** (tries - 1), _BACKOFF_MAX_SECONDS)
        log.warning("download_retry", job_id=job_id, attempt=tries, defer=backoff)
        raise Retry(defer=backoff) from exc

    log.exception("download_failed", job_id=job_id)
    await _mark_failed(jid, f"Download failed after {tries} attempts: {type(exc).__name__}: {exc}")
    return {"job_id": job_id, "status": "failed"}
async def _load_job(jid: uuid.UUID) -> DownloadJob | None:
    """Read the job in a short-lived session; ``None`` when it does not exist."""
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        job = await jobs.get_by_id(jid)
        return job
async def _set_status(jid: uuid.UUID, status: str) -> None:
    """Write ``status`` to the job row in a short-lived session."""
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        await jobs.set_status(jid, status=status)
async def _mark_failed(jid: uuid.UUID, error: str) -> None:
    """Set the job to ``failed`` and store ``error`` as its reason."""
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        await jobs.set_status(jid, status="failed", error_message=error)
+7
View File
@@ -27,6 +27,9 @@ dependencies = [
"httpx>=0.28", "httpx>=0.28",
# embedded audio tag reading (enrichment tag pre-pass) # embedded audio tag reading (enrichment tag pre-pass)
"mutagen>=1.47", "mutagen>=1.47",
# youtube source: search (ytmusicapi) + download (yt-dlp)
"ytmusicapi>=1.8",
"yt-dlp>=2024.12.13",
# S3-compatible object storage # S3-compatible object storage
"aioboto3>=13.0", "aioboto3>=13.0",
# logging # logging
@@ -95,6 +98,10 @@ ignore_missing_imports = true
module = ["aioboto3.*", "aiobotocore.*", "botocore.*"] module = ["aioboto3.*", "aiobotocore.*", "botocore.*"]
ignore_missing_imports = true ignore_missing_imports = true
[[tool.mypy.overrides]]
module = ["ytmusicapi.*", "yt_dlp.*"]
ignore_missing_imports = true
[tool.pytest.ini_options] [tool.pytest.ini_options]
asyncio_mode = "auto" asyncio_mode = "auto"
testpaths = ["tests"] testpaths = ["tests"]
+1 -3
View File
@@ -79,9 +79,7 @@ async def _create_test_db_if_missing() -> None:
except Exception: except Exception:
return return
try: try:
exists = await conn.fetchval( exists = await conn.fetchval("SELECT 1 FROM pg_database WHERE datname = $1", _TEST_DB_NAME)
"SELECT 1 FROM pg_database WHERE datname = $1", _TEST_DB_NAME
)
if not exists: if not exists:
# CREATE DATABASE can't run inside a transaction; asyncpg's implicit # CREATE DATABASE can't run inside a transaction; asyncpg's implicit
# autocommit on a bare connection handles that. # autocommit on a bare connection handles that.
+213
View File
@@ -0,0 +1,213 @@
"""Unit tests for DownloadService — DB-free, in-memory fakes."""
import datetime as dt
import uuid
from pathlib import Path
import pytest
from app.application.download_service import DownloadService
from app.domain.entities import Artist, Track
from app.domain.entities.download import DownloadJob
from app.domain.sources import DownloadResult
pytestmark = pytest.mark.asyncio
class FakeArtistRepo:
    """In-memory artist repo: every call yields a brand-new artist."""

    async def get_or_create(self, name: str) -> Artist:
        stamp = dt.datetime.now(dt.UTC)
        artist = Artist(id=uuid.uuid4(), name=name, created_at=stamp, updated_at=stamp)
        return artist
class FakeTrackRepo:
    """In-memory track repo, indexed by ``(source, source_id)``.

    ``added`` records every track created through :meth:`add`, in order.
    """

    def __init__(self) -> None:
        self.by_source: dict[tuple[str, str], Track] = {}
        self.added: list[Track] = []

    async def get_by_source(self, source: str, source_id: str) -> Track | None:
        lookup_key = (source, source_id)
        return self.by_source.get(lookup_key)

    async def add(self, **kw: object) -> Track:
        # Fields not taken from ``kw`` are filled with ``None``.
        stamp = dt.datetime.now(dt.UTC)
        created = Track(
            id=kw["id"],  # type: ignore[arg-type]
            title=str(kw["title"]),
            artist_id=kw["artist_id"],  # type: ignore[arg-type]
            album_id=None,
            storage_uri=str(kw["storage_uri"]),
            file_format=str(kw["file_format"]),
            file_size=int(kw["file_size"]),  # type: ignore[call-overload]
            source=str(kw["source"]),
            source_id=str(kw["source_id"]),
            duration_seconds=None,
            genre=None,
            year=None,
            track_number=None,
            metadata_status=str(kw["metadata_status"]),
            metadata_error=None,
            enriched_at=None,
            created_at=stamp,
            updated_at=stamp,
        )
        lookup_key = (created.source, created.source_id)
        self.by_source[lookup_key] = created
        self.added.append(created)
        return created
class FakeStorage:
    """In-memory storage double that only records what it was asked to do."""

    def __init__(self) -> None:
        # key -> source path of every saved file
        self.saved: dict[str, Path] = {}
        # keys passed to ``delete``, in call order
        self.deleted: list[str] = []

    async def save_file(self, key: str, src_path: Path) -> int:
        """Remember ``src_path`` under ``key`` and report a fixed value of 1."""
        self.saved.update({key: src_path})
        return 1

    async def delete(self, key: str) -> None:
        """Record the deletion request; saved entries are left untouched."""
        self.deleted += [key]
class FakeJobRepo:
    """In-memory job repo.

    ``jobs`` holds what :meth:`add` created; ``active`` is filled by a test
    directly to simulate an in-flight download.
    """

    def __init__(self) -> None:
        self.jobs: dict[uuid.UUID, DownloadJob] = {}
        self.active: dict[tuple[str, str], DownloadJob] = {}

    def _make(self, **kw: object) -> DownloadJob:
        """Build a fresh ``queued`` job from the given keyword fields."""
        stamp = dt.datetime.now(dt.UTC)
        return DownloadJob(
            id=uuid.uuid4(),
            source=str(kw["source"]),
            source_id=kw.get("source_id"),  # type: ignore[arg-type]
            query=kw.get("query"),  # type: ignore[arg-type]
            requested_by=kw.get("requested_by"),  # type: ignore[arg-type]
            status="queued",
            progress=0.0,
            error_message=None,
            retry_count=0,
            track_id=None,
            created_at=stamp,
            updated_at=stamp,
        )

    async def add(self, **kw: object) -> DownloadJob:
        created = self._make(**kw)
        self.jobs[created.id] = created
        return created

    async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
        return self.jobs.get(job_id)

    async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
        lookup_key = (source, source_id)
        return self.active.get(lookup_key)

    # Status writes are accepted and ignored.
    async def set_status(self, job_id: uuid.UUID, **kw: object) -> None: ...

    async def delete(self, job_id: uuid.UUID) -> None:
        # Removing an unknown id is not an error.
        self.jobs.pop(job_id, None)
def _service(
    *, jobs: FakeJobRepo, tracks: FakeTrackRepo, storage: FakeStorage, enqueued: list[uuid.UUID]
) -> DownloadService:
    """Wire a ``DownloadService`` to the in-memory fakes.

    The enqueue callback handed to the service only appends the job id to
    *enqueued*, so tests can assert on what would have been queued.
    """

    async def _record_enqueue(job_id: uuid.UUID) -> None:
        enqueued.append(job_id)

    artists = FakeArtistRepo()
    return DownloadService(
        jobs=jobs,  # type: ignore[arg-type]
        tracks=tracks,  # type: ignore[arg-type]
        artists=artists,  # type: ignore[arg-type]
        storage=storage,  # type: ignore[arg-type]
        enqueue_download=_record_enqueue,
    )
def _track(source: str, source_id: str) -> Track:
    """Return a minimal placeholder ``Track`` for the given source pair.

    The id and artist id are random; every optional field is ``None``.
    """
    stamp = dt.datetime.now(dt.UTC)
    track_id = uuid.uuid4()
    artist_id = uuid.uuid4()
    return Track(
        id=track_id,
        title="t",
        artist_id=artist_id,
        album_id=None,
        storage_uri="k",
        file_format="mp3",
        file_size=1,
        source=source,
        source_id=source_id,
        duration_seconds=None,
        genre=None,
        year=None,
        track_number=None,
        metadata_status="pending",
        metadata_error=None,
        enriched_at=None,
        created_at=stamp,
        updated_at=stamp,
    )
async def test_request_dedups_against_library() -> None:
    """A request for an item already in the library creates and enqueues nothing."""
    jobs = FakeJobRepo()
    tracks = FakeTrackRepo()
    storage = FakeStorage()
    enqueued: list[uuid.UUID] = []
    # Seed the library with the very item that is about to be requested.
    tracks.by_source[("youtube", "abc")] = _track("youtube", "abc")
    service = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enqueued)

    outcome = await service.request(
        source="youtube", source_id="abc", query=None, requested_by=None
    )

    assert outcome.already_in_library is True
    assert outcome.track_id is not None
    assert outcome.job is None
    assert enqueued == []  # nothing enqueued
async def test_request_returns_existing_active_job() -> None:
    """A request matching an in-flight job hands that job back without re-enqueueing."""
    jobs = FakeJobRepo()
    tracks = FakeTrackRepo()
    storage = FakeStorage()
    enqueued: list[uuid.UUID] = []
    in_flight = await jobs.add(source="youtube", source_id="abc", query=None, requested_by=None)
    # ``add`` does not mark the job active, so register it explicitly.
    jobs.active[("youtube", "abc")] = in_flight
    service = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enqueued)

    outcome = await service.request(
        source="youtube", source_id="abc", query=None, requested_by=None
    )

    assert outcome.already_in_library is False
    assert outcome.job is not None
    assert outcome.job.id == in_flight.id
    assert enqueued == []  # not re-enqueued
async def test_request_creates_and_enqueues_new_job() -> None:
    """With no library hit and no active job, a new job is created and enqueued."""
    jobs = FakeJobRepo()
    tracks = FakeTrackRepo()
    storage = FakeStorage()
    enqueued: list[uuid.UUID] = []
    service = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enqueued)

    outcome = await service.request(
        source="youtube", source_id="abc", query="bohemian", requested_by=None
    )

    assert outcome.already_in_library is False
    assert outcome.job is not None
    # Exactly the freshly created job went to the queue.
    assert enqueued == [outcome.job.id]
async def test_store_result_imports_and_cleans_temp(tmp_path: Path) -> None:
    """Storing a download result adds one track, saves one file and removes the temp file."""
    jobs = FakeJobRepo()
    tracks = FakeTrackRepo()
    storage = FakeStorage()
    enqueued: list[uuid.UUID] = []
    service = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enqueued)

    temp_audio = tmp_path / "abc.webm"
    temp_audio.write_bytes(b"audio" * 20)
    downloaded = DownloadResult(
        source_id="abc",
        path=temp_audio,
        file_format="m4a",
        file_size=100,
        bitrate=160,
        suggested_title="Bohemian Rhapsody",
    )

    track_id = await service.store_result(source="youtube", result=downloaded, requested_by=None)

    assert len(tracks.added) == 1
    imported = tracks.added[0]
    assert imported.id == track_id
    assert imported.source == "youtube"
    assert imported.source_id == "abc"
    assert imported.metadata_status == "pending"
    assert imported.title == "Bohemian Rhapsody"
    assert len(storage.saved) == 1
    assert not temp_audio.exists()  # temp file removed
+252
View File
@@ -0,0 +1,252 @@
"""Integration tests for downloads + external search.
Requires a reachable Postgres; skips otherwise. The download worker task is
invoked directly (no Redis needed) against a fake fetch source, so the full
DB + storage import path is covered without touching the network.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
import pytest
from app.core.config import get_settings
from app.domain.sources import KIND_FETCH, DownloadResult, SearchResult, SourceInfo
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from app.infrastructure.sources.registry import SourceRegistry
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
    """Report whether the database answers ``SELECT 1`` within three seconds.

    The probe runs once; the outcome is kept in ``_db_reachable_cache`` and
    returned directly on later calls.
    """
    global _db_reachable_cache
    if _db_reachable_cache is None:
        from sqlalchemy import text

        try:
            async with asyncio.timeout(3):
                async with get_engine().connect() as conn:
                    await conn.execute(text("SELECT 1"))
        except Exception:
            # Any failure (including the timeout) counts as "not reachable".
            _db_reachable_cache = False
        else:
            _db_reachable_cache = True
    return _db_reachable_cache
class FakeFetchSource:
    """A searchable + fetchable source that writes a local file (no network)."""

    name = "youtube"

    def __init__(self, tmp_dir: Path) -> None:
        # Directory in which ``fetch`` writes its fake download.
        self._tmp_dir = tmp_dir

    def info(self) -> SourceInfo:
        """Describe this source as an available fetch-kind source."""
        return SourceInfo(
            name=self.name,
            label="YouTube Music",
            kind=KIND_FETCH,
            available=True,
        )

    def is_available(self) -> bool:
        """Always available."""
        return True

    async def search(self, query: str, *, limit: int) -> list[SearchResult]:
        """Return one canned hit whose title echoes *query*; *limit* is unused."""
        only_hit = SearchResult(
            source=self.name,
            source_id="vid-1",
            title=f"{query} song",
            artist="Some Artist",
            album="Some Album",
            duration_seconds=200,
            thumbnail_url="http://img/large.jpg",
        )
        return [only_hit]

    async def fetch(self, source_id: str, *, on_progress: Any = None) -> DownloadResult:
        """Write a small fake audio file and describe it as a download result.

        When *on_progress* is given it is awaited once with ``0.5``.
        """
        target = self._tmp_dir / f"{source_id}.m4a"
        target.write_bytes(b"downloaded audio bytes" * 8)
        if on_progress is not None:
            await on_progress(0.5)
        size_on_disk = target.stat().st_size
        return DownloadResult(
            source_id=source_id,
            path=target,
            file_format="webm",
            file_size=size_on_disk,
            bitrate=160,
            suggested_title=f"Title for {source_id}",
        )

    async def get_metadata(self, source_id: str) -> None:
        """This fake has no metadata to offer."""
        return None
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
    """Yield an HTTP client bound to a freshly built app with a fake fetch source.

    Setup: skip when Postgres is unreachable, point ``MEDIA_PATH`` at a
    per-test directory, reset the cached settings and storage provider,
    recreate the schema, create an ``admin`` superuser and override the
    source-registry dependency with ``FakeFetchSource``.

    Teardown: drop the schema, dispose the engine and undo the environment /
    cache changes. A ``MEDIA_PATH`` value that existed before the fixture ran
    is put back rather than deleted, so it does not leak into (or vanish
    from) later tests.
    """
    if not await _db_reachable():
        pytest.skip("Postgres not reachable — integration test skipped.")

    media = tmp_path / "media"
    media.mkdir()
    # Remember any pre-existing value so teardown can restore it.
    previous_media_path = os.environ.get("MEDIA_PATH")
    os.environ["MEDIA_PATH"] = str(media)
    get_settings.cache_clear()

    import app.infrastructure.storage.provider as _storage_provider

    # Drop the cached storage so it is rebuilt against the new MEDIA_PATH.
    _storage_provider._storage = None

    try:
        async with get_engine().begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        from app.application.user_service import UserService
        from app.core.security import Argon2PasswordHasher

        async with session_scope() as session:
            await UserService(
                users=SqlAlchemyUserRepository(session),
                refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
                hasher=Argon2PasswordHasher(),
            ).create_user(username="admin", password="adminpass1", is_superuser=True)

        from app.api.deps import get_source_registry
        from app.main import create_app

        app = create_app()
        # Inject a fake fetch source so search/download never hit the network.
        fake_registry = SourceRegistry([FakeFetchSource(tmp_path / "dl")])  # type: ignore[list-item]
        (tmp_path / "dl").mkdir()
        app.dependency_overrides[get_source_registry] = lambda: fake_registry

        async with LifespanManager(app):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                yield client

        async with get_engine().begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
        await dispose_engine()
    finally:
        _storage_provider._storage = None
        if previous_media_path is None:
            os.environ.pop("MEDIA_PATH", None)
        else:
            os.environ["MEDIA_PATH"] = previous_media_path
        get_settings.cache_clear()
async def _login(api: AsyncClient) -> str:
    """Log in as the seeded ``admin`` user and return the access token."""
    credentials = {"username": "admin", "password": "adminpass1"}
    resp = await api.post("/api/v1/auth/login", json=credentials)
    assert resp.status_code == 200
    payload = resp.json()
    return str(payload["access_token"])
async def test_search_aggregates_fetch_sources(api: AsyncClient) -> None:
    """``GET /search`` reports the searched source and returns the fake hit."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}

    resp = await api.get("/api/v1/search", params={"q": "queen"}, headers=auth)

    assert resp.status_code == 200
    payload = resp.json()
    assert payload["searched_sources"] == ["youtube"]
    results = payload["results"]
    assert len(results) == 1
    first = results[0]
    assert first["source"] == "youtube"
    assert first["source_id"] == "vid-1"
    assert first["title"] == "queen song"
async def test_source_scoped_search(api: AsyncClient) -> None:
    """``GET /sources/youtube/search`` returns the fake source's hit for the query."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}

    resp = await api.get("/api/v1/sources/youtube/search", params={"q": "abba"}, headers=auth)

    assert resp.status_code == 200
    first = resp.json()["results"][0]
    assert first["title"] == "abba song"
async def test_download_create_list_and_complete(
    api: AsyncClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Walk a download from request through worker completion to library dedup."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}
    downloads_url = "/api/v1/downloads"

    # Request a download — Redis is absent, so enqueue degrades but the job persists.
    created = await api.post(
        downloads_url,
        json={"source": "youtube", "source_id": "vid-1", "query": "queen"},
        headers=auth,
    )
    assert created.status_code == 202
    created_body = created.json()
    assert created_body["already_in_library"] is False
    job_id = created_body["job"]["id"]
    assert created_body["job"]["status"] == "queued"

    # It shows up in the listing.
    listing = await api.get(downloads_url, headers=auth)
    assert listing.status_code == 200
    assert any(item["id"] == job_id for item in listing.json()["items"])

    # A duplicate request returns the same in-flight job, not a new one.
    duplicate = await api.post(
        downloads_url,
        json={"source": "youtube", "source_id": "vid-1"},
        headers=auth,
    )
    assert duplicate.json()["job"]["id"] == job_id

    # Run the worker task directly (bypasses Redis) with the fake fetch source.
    import app.workers.tasks.download_task as dl_task

    worker_dir = tmp_path / "worker-dl"
    worker_dir.mkdir()
    worker_registry = SourceRegistry([FakeFetchSource(worker_dir)])  # type: ignore[list-item]
    monkeypatch.setattr(dl_task, "build_source_registry", lambda _settings: worker_registry)
    outcome = await dl_task.download_track({}, job_id=job_id)
    assert outcome["status"] == "done"
    track_id = outcome["track_id"]

    # The job is now done and linked to the imported track.
    job_resp = await api.get(f"/api/v1/downloads/{job_id}", headers=auth)
    job_body = job_resp.json()
    assert job_body["status"] == "done"
    assert job_body["track_id"] == track_id

    # The imported track streams back.
    streamed = await api.get(f"/api/v1/stream/{track_id}", headers=auth)
    assert streamed.status_code == 200
    assert len(streamed.content) > 0

    # A new request for the same item now dedups against the library.
    repeat = await api.post(
        downloads_url,
        json={"source": "youtube", "source_id": "vid-1"},
        headers=auth,
    )
    repeat_body = repeat.json()
    assert repeat_body["already_in_library"] is True
    assert repeat_body["track_id"] == track_id
async def test_cancel_download(api: AsyncClient) -> None:
    """Deleting a queued job answers 204 and the job can no longer be fetched."""
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}

    created = await api.post(
        "/api/v1/downloads",
        json={"source": "youtube", "source_id": "vid-cancel"},
        headers=auth,
    )
    job_id = created.json()["job"]["id"]
    job_url = f"/api/v1/downloads/{job_id}"

    cancelled = await api.delete(job_url, headers=auth)
    assert cancelled.status_code == 204

    lookup = await api.get(job_url, headers=auth)
    assert lookup.status_code == 404
+7 -2
View File
@@ -48,12 +48,17 @@ def test_info_reports_kind_and_availability(tmp_path: Path) -> None:
def test_registry_registers_local_when_path_set(tmp_path: Path) -> None: def test_registry_registers_local_when_path_set(tmp_path: Path) -> None:
registry = build_source_registry(_settings(local_media_import_path=tmp_path)) # Disable youtube to isolate the local-source registration under test.
registry = build_source_registry(
_settings(local_media_import_path=tmp_path, youtube_enabled=False)
)
names = {info.name for info in registry.infos()} names = {info.name for info in registry.infos()}
assert names == {"local"} assert names == {"local"}
assert registry.indexable("local").is_available() is True assert registry.indexable("local").is_available() is True
def test_registry_empty_when_path_unset() -> None: def test_registry_empty_when_path_unset() -> None:
registry = build_source_registry(_settings(local_media_import_path=None)) registry = build_source_registry(
_settings(local_media_import_path=None, youtube_enabled=False)
)
assert registry.infos() == [] assert registry.infos() == []
+1 -3
View File
@@ -107,9 +107,7 @@ async def test_track_out_includes_genre_year_track_number(api: AsyncClient) -> N
token = await _login(api) token = await _login(api)
track_id = await _upload(api, token) track_id = await _upload(api, token)
resp = await api.get( resp = await api.get(f"/api/v1/tracks/{track_id}", headers={"Authorization": f"Bearer {token}"})
f"/api/v1/tracks/{track_id}", headers={"Authorization": f"Bearer {token}"}
)
assert resp.status_code == 200, resp.text assert resp.status_code == 200, resp.text
body = resp.json() body = resp.json()
assert "genre" in body assert "genre" in body
+22
View File
@@ -190,3 +190,25 @@ async def test_upload_requires_auth(api: AsyncClient) -> None:
files={"file": ("x.mp3", b"data", "audio/mpeg")}, files={"file": ("x.mp3", b"data", "audio/mpeg")},
) )
assert resp.status_code == 401 assert resp.status_code == 401
async def test_list_tracks_filters_by_source(api: AsyncClient) -> None:
    """``?source=`` narrows the track list to tracks from that ingest source."""
    # Uploaded tracks carry source="upload"; `?source=` narrows the list (this
    # powers the webui's "Recently uploaded" view, which survives a refresh).
    token = await _login(api)
    auth = {"Authorization": f"Bearer {token}"}
    tracks_url = "/api/v1/tracks"

    await api.post(
        "/api/v1/upload",
        files={"file": ("uploaded.mp3", b"upload bytes" * 80, "audio/mpeg")},
        headers=auth,
    )

    # Matching source: exactly the one uploaded track comes back.
    matching = await api.get(tracks_url, params={"source": "upload"}, headers=auth)
    assert matching.status_code == 200, matching.text
    found = matching.json()["items"]
    assert len(found) == 1
    assert found[0]["source"] == "upload"

    # Non-matching source: the list is empty.
    other = await api.get(tracks_url, params={"source": "youtube"}, headers=auth)
    assert other.status_code == 200
    assert other.json()["items"] == []
+135
View File
@@ -0,0 +1,135 @@
"""Unit tests for YouTubeMusicSource + registry (no network, injected libs)."""
from pathlib import Path
from typing import Any
import pytest
from app.core.config import Settings
from app.domain.sources import KIND_FETCH
from app.infrastructure.sources.registry import build_source_registry
from app.infrastructure.sources.youtube import YouTubeMusicSource
pytestmark = pytest.mark.asyncio
def _song_row(**overrides: Any) -> dict[str, Any]:
row: dict[str, Any] = {
"videoId": "abc123",
"title": "Bohemian Rhapsody",
"artists": [{"name": "Queen", "id": "a1"}],
"album": {"name": "A Night at the Opera", "id": "al1"},
"duration_seconds": 354,
"thumbnails": [
{"url": "http://img/small.jpg", "width": 60, "height": 60},
{"url": "http://img/large.jpg", "width": 240, "height": 240},
],
}
row.update(overrides)
return row
def _settings(**overrides: object) -> Settings:
    """Build a ``Settings`` object, passing *overrides* through as keyword arguments."""
    configured = Settings(**overrides)  # type: ignore[arg-type]
    return configured
async def test_search_maps_ytmusic_rows() -> None:
    """A full song row is mapped field by field onto the search result."""

    def _one_row(q: str, limit: int) -> list[dict[str, Any]]:
        return [_song_row()]

    source = YouTubeMusicSource(search_fn=_one_row)

    hits = await source.search("queen", limit=10)

    [hit] = hits
    assert hit.source == "youtube"
    assert hit.source_id == "abc123"
    assert hit.title == "Bohemian Rhapsody"
    assert hit.artist == "Queen"
    assert hit.album == "A Night at the Opera"
    assert hit.duration_seconds == 354
    assert hit.thumbnail_url == "http://img/large.jpg"  # last (largest)
async def test_search_joins_multiple_artists_and_tolerates_missing_fields() -> None:
    """Artist names are comma-joined; absent album/thumbnail/duration map to ``None``."""
    sparse_row = _song_row(
        artists=[{"name": "Queen"}, {"name": "David Bowie"}],
        album=None,
        thumbnails=[],
        duration_seconds=None,
    )

    def _sparse(q: str, limit: int) -> list[dict[str, Any]]:
        return [sparse_row]

    source = YouTubeMusicSource(search_fn=_sparse)

    hits = await source.search("under pressure", limit=10)

    [hit] = hits
    assert hit.artist == "Queen, David Bowie"
    assert hit.album is None
    assert hit.thumbnail_url is None
    assert hit.duration_seconds is None
async def test_search_drops_rows_without_video_id() -> None:
    """Rows whose ``videoId`` is ``None`` are left out; the others keep their order."""
    mixed_rows = [
        _song_row(),
        _song_row(videoId=None),
        _song_row(videoId="xyz"),
    ]

    def _mixed(q: str, limit: int) -> list[dict[str, Any]]:
        return mixed_rows

    source = YouTubeMusicSource(search_fn=_mixed)

    hits = await source.search("q", limit=10)

    kept_ids = [hit.source_id for hit in hits]
    assert kept_ids == ["abc123", "xyz"]
async def test_search_empty_query_short_circuits() -> None:
    """A whitespace-only query returns ``[]`` without invoking the search function."""
    seen_queries: list[str] = []

    def _tracking_search(q: str, limit: int) -> list[dict[str, Any]]:
        # Record the call so the test can prove it never happened.
        seen_queries.append(q)
        return []

    source = YouTubeMusicSource(search_fn=_tracking_search)

    hits = await source.search(" ", limit=10)

    assert hits == []
    was_called = bool(seen_queries)
    assert was_called is False
async def test_search_degrades_to_empty_on_error() -> None:
    """A search function that raises yields an empty result list, not an exception."""

    def _failing_search(q: str, limit: int) -> list[dict[str, Any]]:
        raise RuntimeError("service down")

    source = YouTubeMusicSource(search_fn=_failing_search)

    hits = await source.search("q", limit=10)

    assert hits == []
async def test_fetch_maps_download_result(tmp_path: Path) -> None:
    """The dict returned by the download function is mapped onto the download result."""
    payload = b"opus-bytes" * 10
    audio_file = tmp_path / "abc123.m4a"
    audio_file.write_bytes(payload)

    def _fake_download(
        video_id: str, tmp_dir: Path, hook: Any, cookies: Path | None
    ) -> dict[str, Any]:
        # Pretend the file above is what the downloader produced.
        return {
            "filepath": audio_file,
            "file_format": "m4a",
            "bitrate": 160,
            "title": "Bohemian Rhapsody",
        }

    source = YouTubeMusicSource(download_fn=_fake_download)

    fetched = await source.fetch("abc123")

    assert fetched.source_id == "abc123"
    assert fetched.path == audio_file
    assert fetched.file_format == "m4a"
    assert fetched.file_size == len(payload)
    assert fetched.bitrate == 160
    assert fetched.suggested_title == "Bohemian Rhapsody"
async def test_info_and_availability_with_injected_fn() -> None:
    """With an injected search function the source reports itself as available."""

    def _no_results(q: str, limit: int) -> list[dict[str, Any]]:
        return []

    source = YouTubeMusicSource(search_fn=_no_results)

    described = source.info()

    assert described.name == "youtube"
    assert described.kind == KIND_FETCH
    assert described.available is True  # injected fn → treated as available
async def test_registry_registers_youtube_when_enabled() -> None:
    """With ``youtube_enabled=True`` the registry exposes a youtube source."""
    settings = _settings(youtube_enabled=True)
    registry = build_source_registry(settings)

    registered = {info.name for info in registry.infos()}
    assert "youtube" in registered

    # youtube is searchable + fetchable, not indexable
    searchable = registry.searchable("youtube")
    assert searchable.name == "youtube"
    fetchable = registry.fetchable("youtube")
    assert fetchable.name == "youtube"
async def test_registry_omits_youtube_when_disabled() -> None:
    """With ``youtube_enabled=False`` no youtube source is registered."""
    settings = _settings(youtube_enabled=False)
    registry = build_source_registry(settings)

    registered = {info.name for info in registry.infos()}

    assert "youtube" not in registered
Generated
+836 -762
View File
File diff suppressed because it is too large Load Diff