"""arq task: materialize a remote placeholder track (plan: Model C).

Counterpart to ``download_task`` for tracks that were *saved* from a remote
browse hit without audio (``availability="remote"``, ``storage_uri=NULL``).
The job's ``track_id`` already points at the existing placeholder row — on
success the file is stored and ``TrackRepository.materialize`` fills the row
in place (the track's ``id`` never changes), then enrichment is enqueued as
usual.

Shares its fetch/retry/failure machinery with ``download_task`` — only the
"what happens on success" step differs (fill in an existing row vs. create a
new one).
"""

import contextlib
import uuid
from typing import Any

import anyio

from app.core.config import get_settings
from app.core.logging import correlation_id, get_logger
from app.domain.errors import NotFoundError, ValidationError
from app.domain.sources import DownloadResult
from app.infrastructure.db import session_scope
from app.infrastructure.db.repositories import (
    SqlAlchemyDownloadJobRepository,
    SqlAlchemyTrackRepository,
)
from app.infrastructure.sources.registry import build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich
from app.workers.tasks.download_task import (
    _handle_failure,
    _load_job,
    _mark_failed,
    _run_fetch,
)

log = get_logger("worker.materialize")


async def materialize_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]:
    """Fetch audio for a placeholder track's download job and fill the row in.

    Args:
        _ctx: arq job context (unused).
        job_id: The download job's id, as a UUID string.

    Returns:
        A summary dict with ``job_id`` and ``status``: ``"missing"`` when the
        job row no longer exists, ``"failed"`` when the job was marked failed
        here, ``"done"`` (plus ``track_id``) on success, or whatever
        ``_handle_failure`` returns when the fetch itself raises.

    Raises:
        ValueError: if ``job_id`` is not a valid UUID string.
    """
    correlation_id.set(f"mat:{job_id}")
    jid = uuid.UUID(job_id)
    settings = get_settings()

    job = await _load_job(jid)
    if job is None:
        log.info("materialize_job_missing", job_id=job_id)  # cancelled before pickup
        return {"job_id": job_id, "status": "missing"}
    if job.track_id is None or job.source_id is None:
        await _mark_failed(jid, "Materialize job missing track_id/source_id.")
        return {"job_id": job_id, "status": "failed"}

    registry = build_source_registry(settings)
    try:
        backend = registry.fetchable(job.source)
    except (NotFoundError, ValidationError) as exc:
        await _mark_failed(jid, f"Source unavailable: {exc}")
        return {"job_id": job_id, "status": "failed"}

    await _set_status(jid, "downloading")
    try:
        result = await _run_fetch(backend, job.source_id, jid)
    except Exception as exc:
        # Fetch errors go through the retry/failure policy shared with
        # download_task.
        return await _handle_failure(jid, exc, settings.download_max_retries, job_id)

    try:
        await _materialize_result(jid, job.track_id, result)
    except Exception as exc:
        # Errors after a successful fetch (storage or DB) are marked failed
        # directly rather than routed through _handle_failure's retry logic.
        log.exception("materialize_finalize_failed", job_id=job_id)
        await _mark_failed(jid, f"Materialize failed: {type(exc).__name__}: {exc}")
        return {"job_id": job_id, "status": "failed"}

    await enqueue_enrich(job.track_id)
    log.info("materialize_complete", job_id=job_id, track_id=str(job.track_id))
    return {"job_id": job_id, "status": "done", "track_id": str(job.track_id)}


async def _materialize_result(jid: uuid.UUID, track_id: uuid.UUID, result: DownloadResult) -> None:
    """Store the downloaded file and fill in the placeholder track in place.

    The file is saved under
    ``tracks/<first two chars of track_id>/<track_id>.<file_format>``. Then,
    inside one ``session_scope``, the track row is materialized and the job is
    marked ``done`` with its ``track_id``. The local download at
    ``result.path`` is always removed afterwards, whether or not this
    succeeded.

    Raises:
        Whatever the storage backend or the repositories raise; the caller
        marks the job failed.
    """
    try:
        # Everything that can fail lives inside the ``try`` (including
        # building the storage provider) so the local temp file is always
        # cleaned up by the ``finally`` below.
        key = f"tracks/{str(track_id)[:2]}/{track_id}.{result.file_format}"
        storage = get_file_storage()
        await storage.save_file(key, result.path)
        async with session_scope() as session:
            job_repo = SqlAlchemyDownloadJobRepository(session)
            # NOTE(review): "enriching" is overwritten by "done" later in this
            # same session_scope; if the scope is a single transaction, other
            # readers never observe it. Presumably kept to mirror
            # download_task's status sequence — confirm set_status has no
            # other side effects before removing.
            await job_repo.set_status(jid, status="enriching")
            tracks = SqlAlchemyTrackRepository(session)
            await tracks.materialize(
                track_id,
                storage_uri=key,
                file_format=result.file_format,
                file_size=result.file_size,
                bitrate=result.bitrate,
            )
            await job_repo.set_status(jid, status="done", track_id=track_id)
    finally:
        # Best-effort cleanup: a failure here must not mask the outcome above,
        # but it is logged so leaked temp files are visible.
        try:
            await anyio.Path(result.path).unlink(missing_ok=True)
        except Exception as exc:
            log.warning(
                "materialize_tempfile_cleanup_failed",
                job_id=str(jid),
                path=str(result.path),
                error=f"{type(exc).__name__}: {exc}",
            )


async def _set_status(jid: uuid.UUID, status: str) -> None:
    """Set the download job's ``status`` in a fresh ``session_scope``."""
    async with session_scope() as session:
        await SqlAlchemyDownloadJobRepository(session).set_status(jid, status=status)