feat(sources): YouTube Music search + download pipeline (§1C/§1E)
Pluggable fetch source: ytmusicapi search + yt-dlp download (cookies-file guard), DownloadJob entity/repo + DownloadService, download_task worker with exponential-backoff retries, and wired /search, /sources/{source}/search, and /downloads endpoints. Adds youtube_enabled/cookies config, yt-dlp+ytmusicapi deps, and the download_jobs.track_id migration. Snapshot also bundles in-progress storage/tracks/acoustid edits.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
"""arq worker settings — the queue runtime. Task functions register here.
|
||||
|
||||
Run with: ``arq app.workers.arq_worker.WorkerSettings``.
|
||||
Tasks (download, transcode) are appended to ``functions`` in later steps.
|
||||
"""
|
||||
|
||||
from typing import Any, ClassVar
|
||||
@@ -10,6 +9,7 @@ from arq.connections import RedisSettings
|
||||
|
||||
from app.core.config import get_settings
|
||||
from app.core.logging import configure_logging, get_logger
|
||||
from app.workers.tasks.download_task import download_track
|
||||
from app.workers.tasks.enrich_task import enrich_track
|
||||
from app.workers.tasks.import_task import scan_local_folder
|
||||
|
||||
@@ -27,7 +27,7 @@ async def shutdown(_ctx: dict[str, Any]) -> None:
|
||||
|
||||
|
||||
class WorkerSettings:
|
||||
functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track]
|
||||
functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track, download_track]
|
||||
on_startup = startup
|
||||
on_shutdown = shutdown
|
||||
max_jobs = get_settings().max_parallel_downloads
|
||||
|
||||
@@ -34,6 +34,20 @@ async def enqueue(function: str, **kwargs: Any) -> str:
|
||||
return str(job.job_id)
|
||||
|
||||
|
||||
async def enqueue_download(job_id: uuid.UUID) -> None:
    """Schedule the ``download_track`` task for a job, tolerating queue outages.

    The job row is expected to be persisted as ``queued`` before this is
    called, so enqueueing is a follow-up rather than a barrier. The task is
    deferred by a few seconds so the request's DB transaction can commit before
    the worker reads the row (same reasoning as :func:`enqueue_enrich`).

    When the queue is unreachable the failure is logged and swallowed
    (graceful degradation): the job stays ``queued`` and can be retried later.
    """
    job_ref = str(job_id)
    try:
        await enqueue("download_track", job_id=job_ref, _defer_by=3)
    except DependencyUnavailableError:
        log.warning("download_enqueue_failed", job_id=job_ref)
|
||||
|
||||
|
||||
async def enqueue_enrich(track_id: uuid.UUID) -> None:
|
||||
"""Best-effort enqueue of metadata enrichment for a freshly stored track.
|
||||
|
||||
|
||||
@@ -0,0 +1,151 @@
|
||||
"""arq task: download one queued job through a fetch source (plan §6.1).
|
||||
|
||||
Flow: load job → ``downloading`` → ``backend.fetch`` (progress streamed to the
|
||||
job row) → ``enriching`` → store file + minimal track → ``done`` → enqueue
|
||||
enrichment. yt-dlp fails often, so a failed fetch retries with exponential
|
||||
backoff (``download_max_retries``); only after the last try is the job marked
|
||||
``failed`` with a reason for the §A5 download manager.
|
||||
|
||||
Heavy I/O belongs off the request cycle (CLAUDE.md); the HTTP endpoint only
|
||||
enqueues. The job row tolerates being deleted mid-flight (cancellation) — status
|
||||
writes against a missing row are no-ops.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
||||
from arq import Retry
|
||||
|
||||
from app.application.download_service import DownloadService
|
||||
from app.core.config import get_settings
|
||||
from app.core.logging import correlation_id, get_logger
|
||||
from app.domain.entities.download import DownloadJob
|
||||
from app.domain.errors import NotFoundError, ValidationError
|
||||
from app.domain.ports import FetchableSource, ProgressCallback
|
||||
from app.domain.sources import DownloadResult
|
||||
from app.infrastructure.db import session_scope
|
||||
from app.infrastructure.db.repositories import (
|
||||
SqlAlchemyArtistRepository,
|
||||
SqlAlchemyDownloadJobRepository,
|
||||
SqlAlchemyTrackRepository,
|
||||
)
|
||||
from app.infrastructure.sources.registry import build_source_registry
|
||||
from app.infrastructure.storage.provider import get_file_storage
|
||||
from app.workers.queue import enqueue_enrich
|
||||
|
||||
log = get_logger("worker.download")

# Retry delays double on every attempt starting from the base (30s, 60s,
# 120s, ...) and are clamped to the maximum.
_BACKOFF_BASE_SECONDS = 30
_BACKOFF_MAX_SECONDS = 600
# Smallest progress advance that is worth a DB write; anything smaller is
# skipped so the job row is not rewritten on every yt-dlp chunk.
_PROGRESS_STEP = 0.01
|
||||
|
||||
|
||||
async def download_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]:
    """Run one queued download job end to end and report its outcome.

    Steps: load the job row, resolve its fetch backend, mark the job
    ``downloading``, fetch the file, import it as a track, then enqueue
    enrichment for that track.

    Args:
        _ctx: Worker context passed by the queue runtime (unused here).
        job_id: String form of the download job's UUID.

    Returns:
        A dict with ``job_id`` and ``status`` (``"missing"``, ``"failed"`` or
        ``"done"``); a ``"done"`` result also carries ``track_id``.

    Raises:
        ValueError: If ``job_id`` is not a valid UUID string.
        Retry: Raised by the failure handler when a failed fetch should be
            retried later.
    """
    correlation_id.set(f"dl:{job_id}")
    job_uuid = uuid.UUID(job_id)
    settings = get_settings()
    failure: dict[str, Any] = {"job_id": job_id, "status": "failed"}

    job = await _load_job(job_uuid)
    if job is None:
        # No row to work on: the job was cancelled before pickup.
        log.info("download_job_missing", job_id=job_id)
        return {"job_id": job_id, "status": "missing"}

    registry = build_source_registry(settings)
    try:
        backend = registry.fetchable(job.source)
    except (NotFoundError, ValidationError) as exc:
        await _mark_failed(job_uuid, f"Source unavailable: {exc}")
        return failure

    if job.source_id is None:
        await _mark_failed(job_uuid, "Job has no source_id to download.")
        return failure

    await _set_status(job_uuid, "downloading")

    try:
        fetched = await _run_fetch(backend, job.source_id, job_uuid)
    except Exception as exc:
        # Called from inside the handler so the active exception is still
        # available to the failure handler's logging.
        return await _handle_failure(
            job_uuid, exc, settings.download_max_retries, job_id
        )

    try:
        track_id = await _import_result(job_uuid, job, fetched)
    except Exception as exc:
        # Import problems are not retried; the job is failed with the reason.
        log.exception("download_import_failed", job_id=job_id)
        await _mark_failed(job_uuid, f"Import failed: {type(exc).__name__}: {exc}")
        return failure

    await enqueue_enrich(track_id)
    log.info("download_complete", job_id=job_id, track_id=str(track_id))
    return {"job_id": job_id, "status": "done", "track_id": str(track_id)}
|
||||
|
||||
|
||||
async def _run_fetch(
    backend: FetchableSource, source_id: str, jid: uuid.UUID
) -> DownloadResult:
    """Fetch the file, streaming throttled progress into the job row.

    A single session is held for the whole download so progress writes don't
    churn connections; each accepted update is committed immediately so API
    pollers see it.

    A progress value is written when it has advanced by at least
    ``_PROGRESS_STEP`` since the last write, or when it reaches completion, so
    the final value is never swallowed by the throttle. Values that do not
    advance past the last written one are ignored.

    Args:
        backend: Fetch source that performs the download.
        source_id: Identifier of the item within that source.
        jid: Download job whose row receives the progress updates.

    Returns:
        Whatever ``backend.fetch`` returns for the downloaded item.
    """
    async with session_scope() as session:
        repo = SqlAlchemyDownloadJobRepository(session)
        last = 0.0

        async def on_progress(frac: float) -> None:
            nonlocal last
            if frac <= last:
                # Nothing new to report (repeat or regression).
                return
            # NOTE(review): assumes ``frac`` is a 0..1 fraction, as the 0.01
            # step size implies - confirm against the FetchableSource contract.
            if frac < 1.0 and frac - last < _PROGRESS_STEP:
                # Too small a step to be worth a DB write.
                return
            last = frac
            await repo.set_progress(jid, frac)
            await session.commit()

        cb: ProgressCallback = on_progress
        return await backend.fetch(source_id, on_progress=cb)
|
||||
|
||||
|
||||
async def _import_result(jid: uuid.UUID, job: DownloadJob, result: DownloadResult) -> uuid.UUID:
    """Store a fetched file as a track and mark the job ``done``.

    The job is first moved to ``enriching`` and that status is committed on its
    own, so API pollers can observe the stage; without the commit it would be
    overwritten by ``done`` inside the same transaction and never be visible.

    Args:
        jid: Download job being completed.
        job: The loaded job; its ``source`` and ``requested_by`` are passed on
            to the download service.
        result: Outcome of the fetch to be stored.

    Returns:
        The track id returned by ``DownloadService.store_result``.
    """
    async with session_scope() as session:
        repo = SqlAlchemyDownloadJobRepository(session)
        await repo.set_status(jid, status="enriching")
        # Publish the intermediate status before the (possibly slow) store.
        await session.commit()

        service = DownloadService(
            jobs=repo,
            tracks=SqlAlchemyTrackRepository(session),
            artists=SqlAlchemyArtistRepository(session),
            storage=get_file_storage(),
        )
        track_id = await service.store_result(
            source=job.source, result=result, requested_by=job.requested_by
        )
        await repo.set_status(jid, status="done", track_id=track_id)
        return track_id
|
||||
|
||||
|
||||
async def _handle_failure(
    jid: uuid.UUID, exc: Exception, max_retries: int, job_id: str
) -> dict[str, Any]:
    """Decide between retrying a failed fetch and giving up on the job.

    The job's retry counter is incremented first. While the resulting attempt
    count is within ``max_retries`` the task is rescheduled with an
    exponentially growing, capped delay; otherwise the job is marked
    ``failed`` with a reason built from ``exc``.

    Args:
        jid: Download job that failed.
        exc: The exception raised by the fetch.
        max_retries: Highest attempt count that still triggers a retry.
        job_id: String form of the job id, used for logging and the result.

    Returns:
        ``{"job_id": ..., "status": "failed"}`` once retries are exhausted.

    Raises:
        Retry: When the task should be run again after the backoff delay.
    """
    # The counter update gets its own session so it is finished before any
    # Retry is raised below.
    async with session_scope() as session:
        attempts = await SqlAlchemyDownloadJobRepository(session).increment_retry(jid)

    if attempts > max_retries:
        log.exception("download_failed", job_id=job_id)
        reason = f"Download failed after {attempts} attempts: {type(exc).__name__}: {exc}"
        await _mark_failed(jid, reason)
        return {"job_id": job_id, "status": "failed"}

    delay = min(_BACKOFF_BASE_SECONDS * 2 ** (attempts - 1), _BACKOFF_MAX_SECONDS)
    log.warning("download_retry", job_id=job_id, attempt=attempts, defer=delay)
    raise Retry(defer=delay) from exc
|
||||
|
||||
|
||||
async def _load_job(jid: uuid.UUID) -> DownloadJob | None:
    """Look up a download job by id in its own short-lived session.

    Returns the repository's ``get_by_id`` result; callers treat ``None`` as
    "the job row no longer exists".
    """
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        return await jobs.get_by_id(jid)
|
||||
|
||||
|
||||
async def _set_status(jid: uuid.UUID, status: str) -> None:
    """Write a new status onto the job row using a dedicated session."""
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        await jobs.set_status(jid, status=status)
|
||||
|
||||
|
||||
async def _mark_failed(jid: uuid.UUID, error: str) -> None:
    """Set the job's status to ``failed`` and record ``error`` as its message.

    Uses a dedicated session, separate from any session the caller may hold.
    """
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        await jobs.set_status(jid, status="failed", error_message=error)
|
||||
Reference in New Issue
Block a user