"""arq task: download one queued job through a fetch source (plan §6.1).

Flow: load job → ``downloading`` → ``backend.fetch`` (progress streamed to the
job row) → ``enriching`` → store file + minimal track → ``done`` → enqueue
enrichment.

yt-dlp fails often, so a failed fetch retries with exponential backoff
(``download_max_retries``); only after the last try is the job marked
``failed`` with a reason for the §A5 download manager. Failures that are not
fetch errors — an unavailable source, a job without a ``source_id``, or an
error while importing the fetched result — mark the job ``failed`` right
away, without retrying.

Heavy I/O belongs off the request cycle (CLAUDE.md); the HTTP endpoint only
enqueues. The job row tolerates being deleted mid-flight (cancellation) —
status writes against a missing row are no-ops.
"""

import uuid
from typing import Any

from arq import Retry

from app.application.download_service import DownloadService
from app.core.config import get_settings
from app.core.logging import correlation_id, get_logger
from app.domain.entities.download import DownloadJob
from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import FetchableSource, ProgressCallback
from app.domain.sources import DownloadResult
from app.infrastructure.db import session_scope
from app.infrastructure.db.repositories import (
    SqlAlchemyArtistRepository,
    SqlAlchemyDownloadJobRepository,
    SqlAlchemyTrackRepository,
)
from app.infrastructure.sources.registry import build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich

log = get_logger("worker.download")

# Exponential backoff between retries: 30s, 60s, 120s … doubling per attempt,
# capped at _BACKOFF_MAX_SECONDS (600s = 10 minutes).
_BACKOFF_BASE_SECONDS = 30
_BACKOFF_MAX_SECONDS = 600

# Only write progress when it advances by at least this much (avoid hammering
# the DB on every yt-dlp chunk).
_PROGRESS_STEP = 0.01


async def download_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]:
    """arq entry point: fetch, import and hand off one queued download job.

    Args:
        _ctx: arq worker context (unused).
        job_id: String form of the ``DownloadJob`` UUID to process.

    Returns:
        A summary dict with ``job_id`` and ``status`` — ``"missing"`` (row gone
        before pickup), ``"failed"``, or ``"done"`` (which also carries
        ``track_id``).

    Raises:
        arq.Retry: Raised via ``_handle_failure`` when the fetch failed and
            retries remain; arq re-runs this task after the deferral.
        ValueError: If ``job_id`` is not a valid UUID string.
    """
    correlation_id.set(f"dl:{job_id}")
    jid = uuid.UUID(job_id)
    settings = get_settings()

    job = await _load_job(jid)
    if job is None:
        log.info("download_job_missing", job_id=job_id)  # cancelled before pickup
        return {"job_id": job_id, "status": "missing"}

    registry = build_source_registry(settings)
    try:
        backend = registry.fetchable(job.source)
    except (NotFoundError, ValidationError) as exc:
        # Not a transient fetch error: fail immediately, no retry.
        await _mark_failed(jid, f"Source unavailable: {exc}")
        return {"job_id": job_id, "status": "failed"}

    if job.source_id is None:
        await _mark_failed(jid, "Job has no source_id to download.")
        return {"job_id": job_id, "status": "failed"}

    await _set_status(jid, "downloading")
    try:
        result = await _run_fetch(backend, job.source_id, jid)
    except Exception as exc:
        # Either raises arq.Retry (attempts left) or marks the job failed.
        # Called inside this ``except`` so its log.exception sees the traceback.
        return await _handle_failure(jid, exc, settings.download_max_retries, job_id)

    # NOTE(review): the job row is not re-read after the fetch, so a job
    # cancelled (deleted) mid-download is still imported here; only its status
    # writes become no-ops. Confirm whether cancellation should skip import.
    try:
        track_id = await _import_result(jid, job, result)
    except Exception as exc:
        # Import errors are not retried: the job is marked failed immediately.
        log.exception("download_import_failed", job_id=job_id)
        await _mark_failed(jid, f"Import failed: {type(exc).__name__}: {exc}")
        return {"job_id": job_id, "status": "failed"}

    await enqueue_enrich(track_id)
    log.info("download_complete", job_id=job_id, track_id=str(track_id))
    return {"job_id": job_id, "status": "done", "track_id": str(track_id)}


async def _run_fetch(
    backend: FetchableSource, source_id: str, jid: uuid.UUID
) -> DownloadResult:
    """Fetch the file, streaming progress into the job row.

    A single session is held for the download so progress writes don't churn
    connections; each throttled update is committed so API pollers see it.
    Updates are written only when progress advances by at least
    ``_PROGRESS_STEP``. The one exception is reaching 1.0, which is always
    recorded (once), so a finished download never stays stuck just below 100%.
    """
    async with session_scope() as session:
        repo = SqlAlchemyDownloadJobRepository(session)
        last = 0.0

        async def on_progress(frac: float) -> None:
            nonlocal last
            # Clamp into [0, 1] in case the backend reports out-of-range
            # values. The argument order also maps NaN to 0.0: every NaN
            # comparison is false, so max/min keep their first argument.
            frac = min(1.0, max(0.0, frac))
            if frac <= last:
                return
            if frac < 1.0 and frac - last < _PROGRESS_STEP:
                return
            last = frac
            await repo.set_progress(jid, frac)
            await session.commit()

        cb: ProgressCallback = on_progress
        return await backend.fetch(source_id, on_progress=cb)


async def _import_result(jid: uuid.UUID, job: DownloadJob, result: DownloadResult) -> uuid.UUID:
    """Store the fetched result as a track and mark the job ``done``.

    Moves the job to ``enriching`` and commits it, so pollers can see that
    state while ``DownloadService.store_result`` runs. Then stores the result
    and sets the job to ``done`` with the new track id.

    Returns:
        The id of the stored track.
    """
    async with session_scope() as session:
        repo = SqlAlchemyDownloadJobRepository(session)
        await repo.set_status(jid, status="enriching")
        # Repository writes are not visible until committed (see the explicit
        # commits in _run_fetch). Without this, "enriching" and "done" would
        # land in the same transaction and "enriching" would never be seen.
        await session.commit()
        service = DownloadService(
            jobs=repo,
            tracks=SqlAlchemyTrackRepository(session),
            artists=SqlAlchemyArtistRepository(session),
            storage=get_file_storage(),
        )
        track_id = await service.store_result(
            source=job.source, result=result, requested_by=job.requested_by
        )
        await repo.set_status(jid, status="done", track_id=track_id)
        return track_id


async def _handle_failure(
    jid: uuid.UUID, exc: Exception, max_retries: int, job_id: str
) -> dict[str, Any]:
    """Schedule a retry with exponential backoff, or mark the job failed.

    Increments the job's retry counter in its own session, so the count is
    persisted before ``Retry`` is raised.

    - While the count is at most ``max_retries``, raises ``arq.Retry``. The
      deferral is ``_BACKOFF_BASE_SECONDS * 2 ** (tries - 1)`` seconds, capped
      at ``_BACKOFF_MAX_SECONDS``.
    - Otherwise, marks the job ``failed`` and returns a ``"failed"`` summary.

    Must be awaited from inside the caller's ``except`` block: the
    ``log.exception`` call relies on the active exception for its traceback.

    NOTE(review): arq enforces its own per-job ``max_tries``. If that is lower
    than ``download_max_retries + 1``, arq gives up first and this function
    never marks the row ``failed``. Confirm the worker setting.
    """
    async with session_scope() as session:
        tries = await SqlAlchemyDownloadJobRepository(session).increment_retry(jid)
    if tries <= max_retries:
        backoff = min(_BACKOFF_BASE_SECONDS * 2 ** (tries - 1), _BACKOFF_MAX_SECONDS)
        log.warning("download_retry", job_id=job_id, attempt=tries, defer=backoff)
        raise Retry(defer=backoff) from exc
    log.exception("download_failed", job_id=job_id)
    await _mark_failed(jid, f"Download failed after {tries} attempts: {type(exc).__name__}: {exc}")
    return {"job_id": job_id, "status": "failed"}


async def _load_job(jid: uuid.UUID) -> DownloadJob | None:
    """Return the job with id ``jid``, or ``None`` if the row does not exist."""
    async with session_scope() as session:
        return await SqlAlchemyDownloadJobRepository(session).get_by_id(jid)


async def _set_status(jid: uuid.UUID, status: str) -> None:
    """Set the job's status in a short-lived session of its own."""
    async with session_scope() as session:
        await SqlAlchemyDownloadJobRepository(session).set_status(jid, status=status)


async def _mark_failed(jid: uuid.UUID, error: str) -> None:
    """Set the job to ``failed`` and record ``error`` as its error message."""
    async with session_scope() as session:
        await SqlAlchemyDownloadJobRepository(session).set_status(
            jid, status="failed", error_message=error
        )