e45e578f54
Search results now report whether a hit is already saved (in_library,
track_id, availability). New RemoteLibraryService backs POST
/tracks/remote (idempotent placeholder save) and POST
/tracks/{id}/materialize (on-demand fetch via a new materialize_track
arq task, reusing in-flight jobs).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
75 lines
3.0 KiB
Python
75 lines
3.0 KiB
Python
"""Enqueue helper — submit a job to the arq queue from the request cycle.
|
|
|
|
A short-lived pool per call keeps things simple (enqueues are rare, admin-driven
|
|
actions). Redis being down degrades to a clean 503 rather than a crash
|
|
(graceful degradation)."""
|
|
|
|
import uuid
|
|
from typing import Any
|
|
|
|
from arq import create_pool
|
|
from arq.connections import RedisSettings
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.logging import get_logger
|
|
from app.domain.errors import DependencyUnavailableError
|
|
|
|
log = get_logger("worker.queue")
|
|
|
|
|
|
async def enqueue(function: str, **kwargs: Any) -> str:
|
|
"""Enqueue ``function`` by name, returning the job id. Raises
|
|
:class:`DependencyUnavailableError` if the queue can't be reached."""
|
|
settings = get_settings()
|
|
try:
|
|
pool = await create_pool(RedisSettings.from_dsn(str(settings.redis_url)))
|
|
except Exception as exc:
|
|
raise DependencyUnavailableError("Task queue (Redis) is unavailable.") from exc
|
|
try:
|
|
job = await pool.enqueue_job(function, **kwargs)
|
|
finally:
|
|
await pool.aclose()
|
|
if job is None:
|
|
raise DependencyUnavailableError("Could not enqueue job.")
|
|
return str(job.job_id)
|
|
|
|
|
|
async def enqueue_download(job_id: uuid.UUID) -> None:
|
|
"""Best-effort enqueue of a download job for the worker.
|
|
|
|
The job row is already persisted as ``queued``, so this is a follow-up, not a
|
|
barrier: if the queue is unreachable we log and move on (graceful
|
|
degradation) — the job stays ``queued`` and can be retried later. Deferred a
|
|
few seconds so the request's DB transaction commits before the worker reads
|
|
the row (same reason as :func:`enqueue_enrich`)."""
|
|
try:
|
|
await enqueue("download_track", job_id=str(job_id), _defer_by=3)
|
|
except DependencyUnavailableError:
|
|
log.warning("download_enqueue_failed", job_id=str(job_id))
|
|
|
|
|
|
async def enqueue_materialize(job_id: uuid.UUID) -> None:
|
|
"""Best-effort enqueue of a materialize job for the worker (plan: Model C
|
|
lazy materialization). Same deferred-commit reasoning as
|
|
:func:`enqueue_download` — the job row stays ``queued`` and can be retried
|
|
if the queue is unreachable."""
|
|
try:
|
|
await enqueue("materialize_track", job_id=str(job_id), _defer_by=3)
|
|
except DependencyUnavailableError:
|
|
log.warning("materialize_enqueue_failed", job_id=str(job_id))
|
|
|
|
|
|
async def enqueue_enrich(track_id: uuid.UUID) -> None:
|
|
"""Best-effort enqueue of metadata enrichment for a freshly stored track.
|
|
|
|
The track is already persisted, so enrichment is a follow-up, not a barrier:
|
|
if the queue is unreachable we log and move on (graceful degradation). The
|
|
track stays ``metadata_status=pending`` and can be re-enriched later.
|
|
|
|
Deferred a few seconds so the caller's DB transaction is committed before the
|
|
worker looks the track up (the upload request commits only after it returns)."""
|
|
try:
|
|
await enqueue("enrich_track", track_id=str(track_id), _defer_by=5)
|
|
except DependencyUnavailableError:
|
|
log.warning("enrich_enqueue_failed", track_id=str(track_id))
|