Files
Senko-san 78007461e1
Docker Build & Publish / build (push) Successful in 2m39s
Docker Build & Publish / push (push) Failing after 36s
Docker Build & Publish / Prune old image versions (push) Has been skipped
feat(sources): YouTube Music search + download pipeline (§1C/§1E)
Pluggable fetch source: ytmusicapi search + yt-dlp download (cookies-file guard), DownloadJob entity/repo + DownloadService, download_task worker with exponential-backoff retries, and wired /search, /sources/{source}/search, and /downloads endpoints. Adds youtube_enabled/cookies config, yt-dlp+ytmusicapi deps, and the download_jobs.track_id migration. Snapshot also bundles in-progress storage/tracks/acoustid edits.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 14:04:33 +03:00

208 lines
7.6 KiB
Python

"""``youtube`` source — YouTube Music search + download (plan §5).
A *fetch* source: it searches YouTube Music (via ``ytmusicapi``, which returns
clean song/artist/album/duration rows) and downloads the chosen item with
``yt-dlp``. The two libraries are synchronous, so every call is bounced to a
worker thread (``anyio.to_thread``); the sync yt-dlp progress hook bridges back
to the async progress callback via ``anyio.from_thread``.
Both libraries are optional dependencies — if either is missing the source is
simply *unavailable* (it never crashes import or the registry; graceful
degradation per CLAUDE.md). The audio stream is stored **as-is** (YouTube serves
lossy Opus/AAC; re-encoding would be lossy→lossy, plan §6.6).
``source_id`` is the YouTube ``videoId`` — stable, so a re-download of the same
id is idempotent and dedups against an existing track.
"""
import functools
import tempfile
from collections.abc import Callable
from pathlib import Path
from typing import Any
from urllib.parse import quote

import anyio

from app.core.logging import get_logger
from app.domain.ports import ProgressCallback
from app.domain.sources import (
    KIND_FETCH,
    DownloadResult,
    RawMetadata,
    SearchResult,
    SourceInfo,
)
from app.infrastructure.db.models.enums import TrackSource
# Module-level logger, named after this module.
log = get_logger(__name__)
# Functions a caller may inject for testing (defaults do the real library work).
# (query, limit) -> raw search rows, one dict per hit
SearchFn = Callable[[str, int], list[dict[str, Any]]]
# (video_id, tmp_dir, progress_hook, cookies_path) -> normalized download dict
# The dict must carry "filepath" and "file_format"; "bitrate" and "title" are
# optional (they are read with .get() by the source's fetch()).
DownloadFn = Callable[[str, Path, Callable[[dict[str, Any]], None], Path | None], dict[str, Any]]
def _libs_available() -> bool:
try:
import yt_dlp # noqa: F401
import ytmusicapi # noqa: F401
except ImportError:
return False
return True
def _watch_url(video_id: str) -> str:
return f"https://music.youtube.com/watch?v={video_id}"
class YouTubeMusicSource:
    """YouTube Music source: searchable and fetchable.

    Implements :class:`app.domain.ports.SearchableSource` and
    :class:`~app.domain.ports.FetchableSource`.

    The search and download callables are synchronous, so each call is run in a
    worker thread via ``anyio.to_thread``. Both callables can be replaced
    through the constructor (used for testing); the defaults do the real
    library work.
    """

    name = TrackSource.YOUTUBE.value

    def __init__(
        self,
        *,
        cookies_path: Path | None = None,
        tmp_dir: Path | None = None,
        search_fn: SearchFn | None = None,
        download_fn: DownloadFn | None = None,
    ) -> None:
        """Configure the source.

        Args:
            cookies_path: Optional cookies file handed to the download function.
            tmp_dir: Directory downloads are written to; the system temp
                directory is used when omitted.
            search_fn: Replacement for the default search implementation.
            download_fn: Replacement for the default download implementation.
        """
        self._cookies_path = cookies_path
        self._tmp_dir = tmp_dir
        self._search_fn = search_fn or _default_search
        self._download_fn = download_fn or _default_download
        # Only the real library path needs the deps; an injected fn is self-contained.
        self._injected = search_fn is not None or download_fn is not None

    def info(self) -> SourceInfo:
        """Describe this source (name, label, kind, current availability)."""
        return SourceInfo(
            name=self.name,
            label="YouTube Music",
            kind=KIND_FETCH,
            available=self.is_available(),
        )

    def is_available(self) -> bool:
        """Return ``True`` when a function was injected or both libraries import."""
        return self._injected or _libs_available()

    async def search(self, query: str, *, limit: int) -> list[SearchResult]:
        """Search for ``query`` and return the usable results.

        Args:
            query: Free-text query; surrounding whitespace is ignored.
            limit: Number of results requested from the search function.

        Returns:
            The mapped results. The list is empty when the query is blank, when
            ``limit`` is not positive, or when the search function raises.
        """
        query = query.strip()
        if not query or limit <= 0:
            # Nothing to ask for: skip the worker-thread round trip entirely.
            return []
        try:
            rows = await anyio.to_thread.run_sync(
                functools.partial(self._search_fn, query, limit)
            )
        except Exception as exc:
            # No results / service down → degrade to empty (plan §5, CLAUDE.md).
            # The error is recorded so the failure can still be diagnosed.
            log.warning("ytm_search_failed", query=query, error=repr(exc))
            return []
        return [r for r in map(self._to_result, rows or ()) if r is not None]

    async def fetch(
        self, source_id: str, *, on_progress: ProgressCallback | None = None
    ) -> DownloadResult:
        """Download ``source_id`` and describe the resulting file.

        Args:
            source_id: Id of the item to download.
            on_progress: Optional async callback that receives a fraction in
                ``[0, 0.99]`` while the download is running.

        Returns:
            A :class:`DownloadResult` built from the download function's dict
            and the size of the file on disk.

        Raises:
            Whatever the download function raises; ``KeyError`` if its result
            lacks ``"filepath"`` or ``"file_format"``; ``OSError`` if the
            reported file cannot be stat'ed.
        """
        tmp_dir = self._tmp_dir or Path(tempfile.gettempdir())

        def hook(d: dict[str, Any]) -> None:
            # Progress hook handed to the download function; only
            # "downloading" events with a known size are forwarded.
            if on_progress is None or d.get("status") != "downloading":
                return
            total = d.get("total_bytes") or d.get("total_bytes_estimate")
            done = d.get("downloaded_bytes")
            if not total or done is None:
                return
            # Cap below 1.0 — the job only reaches 1.0 once stored + imported.
            frac = min(done / total, 0.99)
            # Bridge sync hook (worker thread) → async callback (event loop).
            anyio.from_thread.run(on_progress, frac)

        info = await anyio.to_thread.run_sync(
            functools.partial(
                self._download_fn, source_id, tmp_dir, hook, self._cookies_path
            )
        )
        path = Path(info["filepath"])
        stat = await anyio.Path(path).stat()
        return DownloadResult(
            source_id=source_id,
            path=path,
            file_format=info["file_format"],
            file_size=stat.st_size,
            bitrate=info.get("bitrate"),
            suggested_title=info.get("title") or source_id,
        )

    async def get_metadata(self, source_id: str) -> RawMetadata | None:
        """Return extra metadata for ``source_id``; currently always ``None``."""
        # The search result already carries a usable title/artist, and the
        # canonical metadata comes from enrichment (§6.2). A dedicated lookup is
        # an optional refinement — skipped for now (returns None gracefully).
        return None

    def _to_result(self, row: dict[str, Any]) -> SearchResult | None:
        """Map one raw search row to a :class:`SearchResult`.

        Returns ``None`` for rows that cannot be used: anything that is not a
        dict, or a row without a ``videoId``. Malformed nested entries (a
        non-dict artist or thumbnail) are skipped instead of raising, so one
        bad row cannot fail the whole search.
        """
        if not isinstance(row, dict):
            return None
        video_id = row.get("videoId")
        if not video_id:
            return None  # non-playable row (e.g. a video without audio id)

        artists = row.get("artists")
        artist_names = (
            [str(a["name"]) for a in artists if isinstance(a, dict) and a.get("name")]
            if isinstance(artists, (list, tuple))
            else []
        )

        album = row.get("album")
        album_name = album.get("name") if isinstance(album, dict) else None

        # The last thumbnail entry is used — presumably the largest rendition;
        # confirm against the search library's ordering.
        thumbnails = row.get("thumbnails")
        last_thumb = (
            thumbnails[-1]
            if isinstance(thumbnails, (list, tuple)) and thumbnails
            else None
        )
        thumbnail_url = last_thumb.get("url") if isinstance(last_thumb, dict) else None

        return SearchResult(
            source=self.name,
            source_id=str(video_id),
            title=row.get("title") or "Unknown",
            artist=", ".join(artist_names) or None,
            album=album_name,
            duration_seconds=row.get("duration_seconds"),
            thumbnail_url=thumbnail_url,
            raw=row,
        )
def _default_search(query: str, limit: int) -> list[dict[str, Any]]:
"""Real ytmusicapi search (songs only). Runs in a worker thread."""
from ytmusicapi import YTMusic
yt = YTMusic() # unauthenticated: public search needs no login
results: list[dict[str, Any]] = yt.search(query, filter="songs", limit=limit)
return results[:limit]
def _default_download(
    video_id: str,
    tmp_dir: Path,
    progress_hook: Callable[[dict[str, Any]], None],
    cookies_path: Path | None,
) -> dict[str, Any]:
    """Real yt-dlp download of the best audio stream. Runs in a worker thread.

    Stores the original stream (no transcode — plan §6.3/§6.6). Returns a
    normalized dict the adapter maps to :class:`DownloadResult`.

    Args:
        video_id: Id of the video to download.
        tmp_dir: Directory the file is written to, named ``<id>.<ext>``.
        progress_hook: Callback registered as a yt-dlp progress hook.
        cookies_path: Optional cookies file; used only if it exists.

    Returns:
        A dict with ``"filepath"`` (``Path``), ``"file_format"`` (lower-case
        extension, ``"m4a"`` if the file has none), ``"bitrate"`` (``int`` or
        ``None``) and ``"title"``.

    Raises:
        ImportError: If ``yt-dlp`` is not installed.
        yt_dlp.utils.DownloadError: If the download fails or yields no info.
    """
    from yt_dlp import YoutubeDL
    from yt_dlp.utils import DownloadError

    opts: dict[str, Any] = {
        "format": "bestaudio/best",
        "outtmpl": str(tmp_dir / "%(id)s.%(ext)s"),
        "quiet": True,
        "no_warnings": True,
        "noprogress": True,
        # Exactly one item is expected per call; never expand a playlist
        # context into multiple downloads.
        "noplaylist": True,
        "progress_hooks": [progress_hook],
    }
    # Use cookies only when the file is actually present: the path can be set
    # unconditionally (e.g. a mounted volume that may be empty) and downloads
    # still work without it — cookies just unlock age/region-restricted items.
    if cookies_path is not None and cookies_path.is_file():
        opts["cookiefile"] = str(cookies_path)

    with YoutubeDL(opts) as ydl:
        info = ydl.extract_info(_watch_url(video_id), download=True)
        if info is None:
            # Fail with a clear, catchable error instead of an obscure
            # TypeError/AttributeError further down.
            raise DownloadError(f"yt-dlp returned no info for video {video_id!r}")
        # Prefer the path yt-dlp reports for the finished download; fall back
        # to recomputing it from the output template.
        downloads = info.get("requested_downloads") or []
        reported = downloads[0].get("filepath") if downloads else None
        filepath = Path(reported or ydl.prepare_filename(info))

    abr = info.get("abr")
    return {
        "filepath": filepath,
        "file_format": filepath.suffix.lstrip(".").lower() or "m4a",
        "bitrate": int(abr) if abr else None,
        "title": info.get("title"),
    }