Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 78007461e1 | |||
| ea880edd57 |
@@ -0,0 +1,47 @@
|
|||||||
|
"""download_jobs: link finished job to its imported track
|
||||||
|
|
||||||
|
Revision ID: 20260614_dl_track_id
|
||||||
|
Revises: 20260613_enrich_outcome
|
||||||
|
Create Date: 2026-06-14 10:00:00.000000
|
||||||
|
|
||||||
|
Adds ``download_jobs.track_id`` (nullable FK → ``tracks.id``) so a completed
|
||||||
|
download can point at the library track it produced — the §A5 download manager
|
||||||
|
links a "done" job to the track, and re-runs can tell a job already imported
|
||||||
|
(plan §6.1).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import Sequence
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
revision: str = "20260614_dl_track_id"
|
||||||
|
down_revision: str | None = "20260613_enrich_outcome"
|
||||||
|
branch_labels: str | Sequence[str] | None = None
|
||||||
|
depends_on: str | Sequence[str] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
    """Add ``download_jobs.track_id`` and its foreign key to ``tracks.id``."""
    table = "download_jobs"
    column = "track_id"

    # The column is nullable, so existing rows need no backfill.
    op.add_column(table, sa.Column(column, sa.Uuid(), nullable=True))

    # ``SET NULL``: deleting the referenced track clears the link on the job
    # row rather than blocking the delete.
    fk_name = op.f("fk_download_jobs_track_id_tracks")
    op.create_foreign_key(fk_name, table, "tracks", [column], ["id"], ondelete="SET NULL")
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
    """Drop the ``track_id`` foreign key, then the column itself."""
    table = "download_jobs"
    fk_name = op.f("fk_download_jobs_track_id_tracks")

    # The constraint goes first; the column it is defined on is dropped after.
    op.drop_constraint(fk_name, table, type_="foreignkey")
    op.drop_column(table, "track_id")
|
||||||
+16
-4
@@ -15,6 +15,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from app.application.auth_service import AuthService
|
from app.application.auth_service import AuthService
|
||||||
|
from app.application.download_service import DownloadService
|
||||||
from app.application.metadata_service import MetadataEnrichmentService
|
from app.application.metadata_service import MetadataEnrichmentService
|
||||||
from app.application.streaming_service import StreamingService
|
from app.application.streaming_service import StreamingService
|
||||||
from app.application.subsonic_auth_service import SubsonicAuthService
|
from app.application.subsonic_auth_service import SubsonicAuthService
|
||||||
@@ -29,6 +30,7 @@ from app.infrastructure.db import get_sessionmaker
|
|||||||
from app.infrastructure.db.repositories import (
|
from app.infrastructure.db.repositories import (
|
||||||
SqlAlchemyAlbumRepository,
|
SqlAlchemyAlbumRepository,
|
||||||
SqlAlchemyArtistRepository,
|
SqlAlchemyArtistRepository,
|
||||||
|
SqlAlchemyDownloadJobRepository,
|
||||||
SqlAlchemyHistoryRepository,
|
SqlAlchemyHistoryRepository,
|
||||||
SqlAlchemyLikeRepository,
|
SqlAlchemyLikeRepository,
|
||||||
SqlAlchemyPlaylistRepository,
|
SqlAlchemyPlaylistRepository,
|
||||||
@@ -41,7 +43,7 @@ from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
|
|||||||
from app.infrastructure.metadata.tags import MutagenTagReader
|
from app.infrastructure.metadata.tags import MutagenTagReader
|
||||||
from app.infrastructure.sources.registry import SourceRegistry, build_source_registry
|
from app.infrastructure.sources.registry import SourceRegistry, build_source_registry
|
||||||
from app.infrastructure.storage.provider import get_file_storage
|
from app.infrastructure.storage.provider import get_file_storage
|
||||||
from app.workers.queue import enqueue_enrich
|
from app.workers.queue import enqueue_download, enqueue_enrich
|
||||||
|
|
||||||
|
|
||||||
async def get_session() -> AsyncIterator[AsyncSession]:
|
async def get_session() -> AsyncIterator[AsyncSession]:
|
||||||
@@ -136,9 +138,7 @@ def get_streaming_service(session: SessionDep, storage: FileStorageDep) -> Strea
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_metadata_service(
|
def get_metadata_service(session: SessionDep, storage: FileStorageDep) -> MetadataEnrichmentService:
|
||||||
session: SessionDep, storage: FileStorageDep
|
|
||||||
) -> MetadataEnrichmentService:
|
|
||||||
"""Wires the §6.2 fingerprint/AcoustID adapters for read-only, inline use
|
"""Wires the §6.2 fingerprint/AcoustID adapters for read-only, inline use
|
||||||
(the metadata editor's "find matches" — §A7). The full pipeline (incl.
|
(the metadata editor's "find matches" — §A7). The full pipeline (incl.
|
||||||
cover art) stays in the worker (`tasks/enrich_task.py`)."""
|
cover art) stays in the worker (`tasks/enrich_task.py`)."""
|
||||||
@@ -161,9 +161,21 @@ def get_metadata_service(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_download_service(session: SessionDep, storage: FileStorageDep) -> DownloadService:
    """Build a :class:`DownloadService` for the current request.

    All three repositories share the request's ``session``; the download and
    enrich enqueuers come from ``app.workers.queue``.
    """
    job_repo = SqlAlchemyDownloadJobRepository(session)
    track_repo = SqlAlchemyTrackRepository(session)
    artist_repo = SqlAlchemyArtistRepository(session)
    return DownloadService(
        jobs=job_repo,
        tracks=track_repo,
        artists=artist_repo,
        storage=storage,
        enqueue_download=enqueue_download,
        enqueue_enrich=enqueue_enrich,
    )
|
||||||
|
|
||||||
|
|
||||||
UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)]
|
UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)]
|
||||||
StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)]
|
StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)]
|
||||||
MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)]
|
MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)]
|
||||||
|
DownloadServiceDep = Annotated[DownloadService, Depends(get_download_service)]
|
||||||
|
|
||||||
|
|
||||||
# -- library repository deps ---------------------------------------------------
|
# -- library repository deps ---------------------------------------------------
|
||||||
|
|||||||
@@ -0,0 +1,59 @@
|
|||||||
|
"""Schemas for the download job endpoints (§A5 download manager)."""
|
||||||
|
|
||||||
|
import datetime as dt
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
from app.domain.entities.download import DownloadJob
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadCreate(BaseModel):
    """Request to download an item discovered on a fetch source."""

    # Name of the source the item was found on.
    source: str
    # The item's identifier on that source; an empty string is rejected.
    source_id: str = Field(min_length=1)
    # Optional free-text the result came from — stored for display only.
    query: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadJobOut(BaseModel):
    # API view of a download job. It carries the same fields as the domain
    # ``DownloadJob`` except ``requested_by``, which is not exposed.
    id: uuid.UUID
    source: str
    source_id: str | None
    query: str | None
    status: str
    progress: float
    error_message: str | None
    retry_count: int
    track_id: uuid.UUID | None
    created_at: dt.datetime
    updated_at: dt.datetime

    @classmethod
    def from_entity(cls, job: DownloadJob) -> DownloadJobOut:
        """Copy the exposed fields of a domain ``DownloadJob`` into the schema."""
        payload = {
            "id": job.id,
            "source": job.source,
            "source_id": job.source_id,
            "query": job.query,
            "status": job.status,
            "progress": job.progress,
            "error_message": job.error_message,
            "retry_count": job.retry_count,
            "track_id": job.track_id,
            "created_at": job.created_at,
            "updated_at": job.updated_at,
        }
        return cls(**payload)
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadCreateResponse(BaseModel):
    """Result of requesting a download.

    ``already_in_library`` → the item was already imported (``track_id`` set, no
    job). Otherwise ``job`` describes the queued (or already in-flight) download.
    """

    # True when the requested item already exists as a library track.
    already_in_library: bool
    # The existing library track when ``already_in_library`` is true.
    track_id: uuid.UUID | None
    # The queued or in-flight job; ``None`` when the item was already imported.
    job: DownloadJobOut | None
|
||||||
@@ -0,0 +1,35 @@
|
|||||||
|
"""Schemas for searching external (fetch) sources — the §A4 discover screen."""
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from app.domain.sources import SearchResult
|
||||||
|
|
||||||
|
|
||||||
|
class ExternalSearchResultOut(BaseModel):
    # One hit returned by an external source, as exposed over the API.
    source: str
    source_id: str
    title: str
    artist: str | None
    album: str | None
    duration_seconds: int | None
    thumbnail_url: str | None

    @classmethod
    def from_entity(cls, r: SearchResult) -> ExternalSearchResultOut:
        """Copy a domain ``SearchResult`` into the API schema, field by field."""
        payload = {
            "source": r.source,
            "source_id": r.source_id,
            "title": r.title,
            "artist": r.artist,
            "album": r.album,
            "duration_seconds": r.duration_seconds,
            "thumbnail_url": r.thumbnail_url,
        }
        return cls(**payload)
|
||||||
|
|
||||||
|
|
||||||
|
class ExternalSearchResponse(BaseModel):
    """Flat list of hits across one or more searchable sources, plus the names of
    the sources that were actually searched. A source that was skipped as
    unavailable is absent from ``searched_sources``, so the UI can show a soft
    warning for it."""

    # Hits from every searched source, in the order the sources were queried.
    results: list[ExternalSearchResultOut]
    # Names of the sources that were queried for this response.
    searched_sources: list[str]
|
||||||
+60
-18
@@ -1,36 +1,78 @@
|
|||||||
"""Download job endpoints. Heavy work is dispatched to arq workers."""
|
"""Download job endpoints (§A5). Heavy work is dispatched to arq workers — these
|
||||||
|
handlers only create/inspect/cancel/retry job records."""
|
||||||
|
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastapi import APIRouter
|
from fastapi import APIRouter, Query, Response
|
||||||
|
|
||||||
|
from app.api.deps import CurrentUser, DownloadServiceDep
|
||||||
|
from app.api.schemas.download import DownloadCreate, DownloadCreateResponse, DownloadJobOut
|
||||||
|
from app.api.schemas.pagination import PagedResponse
|
||||||
|
|
||||||
router = APIRouter(prefix="/downloads", tags=["downloads"])
|
router = APIRouter(prefix="/downloads", tags=["downloads"])
|
||||||
|
|
||||||
|
|
||||||
@router.get("")
|
@router.get("")
|
||||||
async def list_downloads() -> Any: ...
|
async def list_downloads(
|
||||||
|
service: DownloadServiceDep,
|
||||||
|
user: CurrentUser,
|
||||||
|
status: str | None = Query(default=None),
|
||||||
|
mine: bool = Query(default=False),
|
||||||
|
limit: int = Query(50, ge=1, le=200),
|
||||||
|
offset: int = Query(0, ge=0),
|
||||||
|
) -> PagedResponse[DownloadJobOut]:
|
||||||
|
jobs, total = await service.list(
|
||||||
|
requested_by=user.id if mine else None,
|
||||||
|
status=status,
|
||||||
|
limit=limit,
|
||||||
|
offset=offset,
|
||||||
|
)
|
||||||
|
return PagedResponse(
|
||||||
|
items=[DownloadJobOut.from_entity(j) for j in jobs],
|
||||||
|
total=total,
|
||||||
|
limit=limit,
|
||||||
|
offset=offset,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.post("")
|
@router.post("", status_code=202)
|
||||||
async def create_download() -> Any: ...
|
async def create_download(
|
||||||
|
body: DownloadCreate,
|
||||||
|
service: DownloadServiceDep,
|
||||||
|
user: CurrentUser,
|
||||||
|
) -> DownloadCreateResponse:
|
||||||
|
result = await service.request(
|
||||||
|
source=body.source,
|
||||||
|
source_id=body.source_id,
|
||||||
|
query=body.query,
|
||||||
|
requested_by=user.id,
|
||||||
|
)
|
||||||
|
return DownloadCreateResponse(
|
||||||
|
already_in_library=result.already_in_library,
|
||||||
|
track_id=result.track_id,
|
||||||
|
job=DownloadJobOut.from_entity(result.job) if result.job is not None else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{job_id}")
|
@router.get("/{job_id}")
|
||||||
async def get_download(job_id: uuid.UUID) -> Any: ...
|
async def get_download(
|
||||||
|
job_id: uuid.UUID, service: DownloadServiceDep, _: CurrentUser
|
||||||
|
) -> DownloadJobOut:
|
||||||
|
job = await service.get(job_id)
|
||||||
|
return DownloadJobOut.from_entity(job)
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/{job_id}")
|
@router.delete("/{job_id}", status_code=204)
|
||||||
async def cancel_download(job_id: uuid.UUID) -> Any: ...
|
async def cancel_download(
|
||||||
|
job_id: uuid.UUID, service: DownloadServiceDep, _: CurrentUser
|
||||||
|
) -> Response:
|
||||||
|
await service.cancel(job_id)
|
||||||
|
return Response(status_code=204)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/{job_id}/retry")
|
@router.post("/{job_id}/retry")
|
||||||
async def retry_download(job_id: uuid.UUID) -> Any: ...
|
async def retry_download(
|
||||||
|
job_id: uuid.UUID, service: DownloadServiceDep, _: CurrentUser
|
||||||
|
) -> DownloadJobOut:
|
||||||
@router.post("/pause")
|
job = await service.retry(job_id)
|
||||||
async def pause_downloads() -> Any: ...
|
return DownloadJobOut.from_entity(job)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/resume")
|
|
||||||
async def resume_downloads() -> Any: ...
|
|
||||||
|
|||||||
+22
-4
@@ -1,12 +1,11 @@
|
|||||||
"""Search endpoints: global and library-scoped."""
|
"""Search endpoints: global and library-scoped."""
|
||||||
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastapi import APIRouter, Query
|
from fastapi import APIRouter, Query
|
||||||
|
|
||||||
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, TrackRepoDep
|
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, SourceRegistryDep, TrackRepoDep
|
||||||
from app.api.schemas.album import AlbumOut
|
from app.api.schemas.album import AlbumOut
|
||||||
from app.api.schemas.artist import ArtistOut
|
from app.api.schemas.artist import ArtistOut
|
||||||
|
from app.api.schemas.external_search import ExternalSearchResponse, ExternalSearchResultOut
|
||||||
from app.api.schemas.search import LibrarySearchResponse
|
from app.api.schemas.search import LibrarySearchResponse
|
||||||
from app.api.schemas.track import TrackOut
|
from app.api.schemas.track import TrackOut
|
||||||
from app.api.v1.albums import _build_album_out
|
from app.api.v1.albums import _build_album_out
|
||||||
@@ -16,7 +15,26 @@ router = APIRouter(prefix="/search", tags=["search"])
|
|||||||
|
|
||||||
|
|
||||||
@router.get("")
|
@router.get("")
|
||||||
async def search(_: CurrentUser) -> Any: ...
|
async def search(
|
||||||
|
_: CurrentUser,
|
||||||
|
registry: SourceRegistryDep,
|
||||||
|
q: str = Query(min_length=1),
|
||||||
|
limit: int = Query(20, ge=1, le=50),
|
||||||
|
) -> ExternalSearchResponse:
|
||||||
|
"""Search every available fetch source and merge the hits (§A4 discover).
|
||||||
|
|
||||||
|
A source that is down contributes nothing rather than failing the whole
|
||||||
|
request (graceful degradation); only available sources are reported as
|
||||||
|
searched."""
|
||||||
|
results: list[ExternalSearchResultOut] = []
|
||||||
|
searched: list[str] = []
|
||||||
|
for backend in registry.searchables():
|
||||||
|
if not backend.is_available():
|
||||||
|
continue
|
||||||
|
searched.append(backend.name)
|
||||||
|
hits = await backend.search(q, limit=limit)
|
||||||
|
results.extend(ExternalSearchResultOut.from_entity(h) for h in hits)
|
||||||
|
return ExternalSearchResponse(results=results, searched_sources=searched)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/library")
|
@router.get("/library")
|
||||||
|
|||||||
+20
-9
@@ -1,14 +1,13 @@
|
|||||||
"""External source endpoints: enumerate sources and trigger imports.
|
"""External source endpoints: enumerate sources, search, and trigger imports.
|
||||||
|
|
||||||
Listing/health are read-only (any authenticated user). Scanning a source is an
|
Listing/health/search are read-only (any authenticated user). Scanning a source
|
||||||
admin action and runs in a worker — the endpoint only enqueues it.
|
is an admin action and runs in a worker — the endpoint only enqueues it.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import Any
|
from fastapi import APIRouter, Query
|
||||||
|
|
||||||
from fastapi import APIRouter
|
|
||||||
|
|
||||||
from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser
|
from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser
|
||||||
|
from app.api.schemas.external_search import ExternalSearchResponse, ExternalSearchResultOut
|
||||||
from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut
|
from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut
|
||||||
from app.domain.errors import DependencyUnavailableError
|
from app.domain.errors import DependencyUnavailableError
|
||||||
from app.workers.queue import enqueue
|
from app.workers.queue import enqueue
|
||||||
@@ -39,6 +38,18 @@ async def source_health(
|
|||||||
|
|
||||||
|
|
||||||
@router.get("/{source}/search")
|
@router.get("/{source}/search")
|
||||||
async def search_source(source: str, _: CurrentUser) -> Any:
|
async def search_source(
|
||||||
# Search is for fetch-style sources (youtube, …) — not yet implemented.
|
source: str,
|
||||||
...
|
_: CurrentUser,
|
||||||
|
registry: SourceRegistryDep,
|
||||||
|
q: str = Query(min_length=1),
|
||||||
|
limit: int = Query(20, ge=1, le=50),
|
||||||
|
) -> ExternalSearchResponse:
|
||||||
|
backend = registry.searchable(source) # 404 if unknown, 422 if not searchable
|
||||||
|
if not backend.is_available():
|
||||||
|
raise DependencyUnavailableError(f"Source {source!r} is not available.")
|
||||||
|
results = await backend.search(q, limit=limit)
|
||||||
|
return ExternalSearchResponse(
|
||||||
|
results=[ExternalSearchResultOut.from_entity(r) for r in results],
|
||||||
|
searched_sources=[source],
|
||||||
|
)
|
||||||
|
|||||||
@@ -63,8 +63,7 @@ async def get_storage_stats(
|
|||||||
by_metadata_status=stats.by_metadata_status,
|
by_metadata_status=stats.by_metadata_status,
|
||||||
by_source=stats.by_source,
|
by_source=stats.by_source,
|
||||||
top_genres=[
|
top_genres=[
|
||||||
GenreCountOut(genre=genre, track_count=count)
|
GenreCountOut(genre=genre, track_count=count) for genre, count in genres[:_TOP_GENRES]
|
||||||
for genre, count in genres[:_TOP_GENRES]
|
|
||||||
],
|
],
|
||||||
disk=DiskUsageOut(total=disk.total, used=disk.used, free=disk.free) if disk else None,
|
disk=DiskUsageOut(total=disk.total, used=disk.used, free=disk.free) if disk else None,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -71,6 +71,7 @@ async def list_tracks(
|
|||||||
artist_id: uuid.UUID | None = None,
|
artist_id: uuid.UUID | None = None,
|
||||||
album_id: uuid.UUID | None = None,
|
album_id: uuid.UUID | None = None,
|
||||||
q: str | None = None,
|
q: str | None = None,
|
||||||
|
source: str | None = Query(None, max_length=32),
|
||||||
sort_by: str = Query("created_at", pattern="^(title|created_at|artist)$"),
|
sort_by: str = Query("created_at", pattern="^(title|created_at|artist)$"),
|
||||||
order: str = Query("desc", pattern="^(asc|desc)$"),
|
order: str = Query("desc", pattern="^(asc|desc)$"),
|
||||||
limit: int = Query(50, ge=1, le=200),
|
limit: int = Query(50, ge=1, le=200),
|
||||||
@@ -80,12 +81,13 @@ async def list_tracks(
|
|||||||
artist_id=artist_id,
|
artist_id=artist_id,
|
||||||
album_id=album_id,
|
album_id=album_id,
|
||||||
q=q,
|
q=q,
|
||||||
|
source=source,
|
||||||
sort_by=sort_by,
|
sort_by=sort_by,
|
||||||
order=order,
|
order=order,
|
||||||
limit=limit,
|
limit=limit,
|
||||||
offset=offset,
|
offset=offset,
|
||||||
)
|
)
|
||||||
total = await track_repo.count(artist_id=artist_id, album_id=album_id, q=q)
|
total = await track_repo.count(artist_id=artist_id, album_id=album_id, q=q, source=source)
|
||||||
|
|
||||||
artist_ids = list({t.artist_id for t in tracks})
|
artist_ids = list({t.artist_id for t in tracks})
|
||||||
album_ids = list({t.album_id for t in tracks if t.album_id is not None})
|
album_ids = list({t.album_id for t in tracks if t.album_id is not None})
|
||||||
|
|||||||
@@ -0,0 +1,183 @@
|
|||||||
|
"""DownloadService — request external downloads and import their results.
|
||||||
|
|
||||||
|
Two roles (plan §6.1):
|
||||||
|
|
||||||
|
* **Request side** (HTTP): validate + dedup a download request, create a
|
||||||
|
``queued`` job, and enqueue the worker. Dedup is on ``(source, source_id)``
|
||||||
|
against both the library (already imported) and in-flight jobs (a double-click
|
||||||
|
must not queue twice) — idempotency per CLAUDE.md.
|
||||||
|
* **Worker side**: ``store_result`` turns a backend's :class:`DownloadResult`
|
||||||
|
into a managed file + minimal ``pending`` track (sibling of
|
||||||
|
:class:`~app.application.import_service.LibraryImportService`); enrichment
|
||||||
|
(§6.2) fills the rest.
|
||||||
|
|
||||||
|
The fingerprint-level dedup (a different id that turns out to be the same audio)
|
||||||
|
happens later in enrichment, where the fingerprint is computed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import uuid
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
import anyio
|
||||||
|
|
||||||
|
from app.core.logging import get_logger
|
||||||
|
from app.domain.entities.download import DownloadJob
|
||||||
|
from app.domain.errors import NotFoundError, ValidationError
|
||||||
|
from app.domain.ports import (
|
||||||
|
ArtistRepository,
|
||||||
|
DownloadJobRepository,
|
||||||
|
FileStorage,
|
||||||
|
TrackRepository,
|
||||||
|
)
|
||||||
|
from app.domain.sources import DownloadResult
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
# Artist name attached to freshly downloaded tracks by ``store_result``.
_UNKNOWN_ARTIST = "Unknown Artist"

# (job_id) -> None — enqueue the download worker, deferred so the job row is
# committed before the worker reads it (same pattern as enrich).
DownloadEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
# Same callable shape for the enrichment worker. The argument is presumably the
# track id — TODO confirm against ``enqueue_enrich``.
EnrichEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
class DownloadRequest:
    """Outcome of asking for a download.

    As built by ``DownloadService.request``, exactly one of two shapes holds:

    * the item is already in the library — ``already_in_library`` is true,
      ``track_id`` is set and ``job`` is ``None``;
    * a job covers it — ``job`` is set (either an already in-flight job or the
      one just created), ``track_id`` is ``None`` and ``already_in_library`` is
      false — so the UI can route to the download manager.
    """

    # The in-flight or newly created job; ``None`` when already imported.
    job: DownloadJob | None
    # The existing library track; ``None`` when a job is returned instead.
    track_id: uuid.UUID | None
    # True only when the item was found in the library.
    already_in_library: bool
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadService:
    """Request-side and worker-side operations on download jobs."""

    def __init__(
        self,
        *,
        jobs: DownloadJobRepository,
        tracks: TrackRepository,
        artists: ArtistRepository,
        storage: FileStorage,
        enqueue_download: DownloadEnqueuer | None = None,
        enqueue_enrich: EnrichEnqueuer | None = None,
    ) -> None:
        """Keep references to the collaborators.

        Both enqueuers are optional; when ``enqueue_download`` is ``None`` the
        request/retry paths simply skip enqueueing.
        """
        self._jobs, self._tracks, self._artists = jobs, tracks, artists
        self._storage = storage
        self._enqueue_download = enqueue_download
        self._enqueue_enrich = enqueue_enrich

    # -- request side ---------------------------------------------------------

    async def request(
        self,
        *,
        source: str,
        source_id: str,
        query: str | None,
        requested_by: uuid.UUID | None,
    ) -> DownloadRequest:
        """Ask for ``(source, source_id)`` to be downloaded, deduplicating first.

        Returns the existing library track if there is one, else the active job
        for the same item if there is one, else a newly added (and enqueued) job.

        Raises:
            ValidationError: if ``source_id`` is empty after stripping whitespace.
        """
        cleaned_id = source_id.strip()
        if not cleaned_id:
            raise ValidationError("A source_id is required to download.")

        imported = await self._tracks.get_by_source(source, cleaned_id)
        if imported is not None:
            return DownloadRequest(job=None, track_id=imported.id, already_in_library=True)

        job = await self._jobs.get_active_for_source(source, cleaned_id)
        if job is None:
            # Nothing in flight for this item: create the job and hand it to the worker.
            job = await self._jobs.add(
                source=source,
                source_id=cleaned_id,
                query=query,
                requested_by=requested_by,
            )
            if self._enqueue_download is not None:
                await self._enqueue_download(job.id)
        return DownloadRequest(job=job, track_id=None, already_in_library=False)

    async def list(
        self,
        *,
        requested_by: uuid.UUID | None,
        status: str | None,
        limit: int,
        offset: int,
    ) -> tuple[list[DownloadJob], int]:
        """Return one page of jobs plus the total count for the same filters."""
        filters = {"requested_by": requested_by, "status": status}
        page = await self._jobs.list(**filters, limit=limit, offset=offset)
        total = await self._jobs.count(**filters)
        return page, total

    async def get(self, job_id: uuid.UUID) -> DownloadJob:
        """Fetch a job by id.

        Raises:
            NotFoundError: if no job has that id.
        """
        found = await self._jobs.get_by_id(job_id)
        if found is not None:
            return found
        raise NotFoundError(f"Download job {job_id} not found.")

    async def cancel(self, job_id: uuid.UUID) -> None:
        """Remove the job record. True mid-flight cancellation of an in-progress
        yt-dlp download is out of scope (MVP); the worker tolerates a vanished
        job row (its status writes become no-ops)."""
        # ``get`` performs the same lookup and raises the same NotFoundError.
        await self.get(job_id)
        await self._jobs.delete(job_id)

    async def retry(self, job_id: uuid.UUID) -> DownloadJob:
        """Put a job back to ``queued``, clear its error, and re-enqueue it.

        Returns the job as re-read after the update, or the pre-update snapshot
        if the row can no longer be found.

        Raises:
            NotFoundError: if no job has that id.
        """
        snapshot = await self.get(job_id)
        await self._jobs.set_status(job_id, status="queued", error_message=None)
        if self._enqueue_download is not None:
            await self._enqueue_download(job_id)
        latest = await self._jobs.get_by_id(job_id)
        if latest is None:
            return snapshot
        return latest

    # -- worker side ----------------------------------------------------------

    async def store_result(
        self,
        *,
        source: str,
        result: DownloadResult,
        requested_by: uuid.UUID | None,
    ) -> uuid.UUID:
        """Store a freshly downloaded file and create a minimal ``pending`` track.

        Returns the new track id (the caller enqueues enrichment after commit).
        The temp file produced by the backend is always removed."""
        track_id = uuid.uuid4()
        key = f"tracks/{str(track_id)[:2]}/{track_id}.{result.file_format}"
        try:
            await self._storage.save_file(key, result.path)
            await self._add_pending_track(
                track_id=track_id,
                key=key,
                source=source,
                result=result,
                requested_by=requested_by,
            )
        finally:
            # Best-effort temp-file cleanup; a failure here must not mask the outcome.
            with contextlib.suppress(Exception):
                await anyio.Path(result.path).unlink(missing_ok=True)
        return track_id

    async def _add_pending_track(
        self,
        *,
        track_id: uuid.UUID,
        key: str,
        source: str,
        result: DownloadResult,
        requested_by: uuid.UUID | None,
    ) -> None:
        """Create the track row for an already stored file; on failure delete
        the stored file (best effort) and re-raise."""
        try:
            placeholder = await self._artists.get_or_create(_UNKNOWN_ARTIST)
            await self._tracks.add(
                id=track_id,
                title=result.suggested_title,
                artist_id=placeholder.id,
                storage_uri=key,
                file_format=result.file_format,
                file_size=result.file_size,
                source=source,
                source_id=result.source_id,
                metadata_status="pending",
                added_by=requested_by,
            )
        except Exception:
            with contextlib.suppress(Exception):
                await self._storage.delete(key)
            raise
|
||||||
@@ -71,6 +71,9 @@ class Settings(BaseSettings):
|
|||||||
media_path: Path = Path("/data/media")
|
media_path: Path = Path("/data/media")
|
||||||
transcode_cache_path: Path = Path("/data/transcode-cache")
|
transcode_cache_path: Path = Path("/data/transcode-cache")
|
||||||
max_parallel_downloads: int = 2
|
max_parallel_downloads: int = 2
|
||||||
|
# How many times the download worker retries a failed fetch (yt-dlp fails
|
||||||
|
# often) before marking the job ``failed`` — exponential backoff between tries.
|
||||||
|
download_max_retries: int = 3
|
||||||
storage_backend: Literal["local", "s3"] = "local"
|
storage_backend: Literal["local", "s3"] = "local"
|
||||||
upload_tmp_dir: Path | None = None
|
upload_tmp_dir: Path | None = None
|
||||||
|
|
||||||
@@ -100,6 +103,11 @@ class Settings(BaseSettings):
|
|||||||
# deployments should set their own contact email; see
|
# deployments should set their own contact email; see
|
||||||
# ``musicbrainz_user_agent`` below for how it's used.
|
# ``musicbrainz_user_agent`` below for how it's used.
|
||||||
musicbrainz_owner_email: str | None = None
|
musicbrainz_owner_email: str | None = None
|
||||||
|
# ``youtube`` fetch source (search + download via ytmusicapi/yt-dlp). Enabled
|
||||||
|
# by default; the source still reports unavailable if the libs aren't present.
|
||||||
|
youtube_enabled: bool = True
|
||||||
|
# Optional cookies file (Netscape format) for yt-dlp — lets it fetch
|
||||||
|
# age-restricted / region-locked items via an authenticated session.
|
||||||
youtube_cookies_path: Path | None = None
|
youtube_cookies_path: Path | None = None
|
||||||
|
|
||||||
# -- enrichment -------------------------------------------------------
|
# -- enrichment -------------------------------------------------------
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
from app.domain.entities.album import Album
|
from app.domain.entities.album import Album
|
||||||
from app.domain.entities.cover import CoverArt
|
from app.domain.entities.cover import CoverArt
|
||||||
|
from app.domain.entities.download import DownloadJob
|
||||||
from app.domain.entities.history import PlayHistoryEntry
|
from app.domain.entities.history import PlayHistoryEntry
|
||||||
from app.domain.entities.like import Like
|
from app.domain.entities.like import Like
|
||||||
from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch
|
from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch
|
||||||
@@ -22,6 +23,7 @@ __all__ = [
|
|||||||
"CoverArt",
|
"CoverArt",
|
||||||
"Credentials",
|
"Credentials",
|
||||||
"DiskUsage",
|
"DiskUsage",
|
||||||
|
"DownloadJob",
|
||||||
"Fingerprint",
|
"Fingerprint",
|
||||||
"FormatBreakdown",
|
"FormatBreakdown",
|
||||||
"LibraryStats",
|
"LibraryStats",
|
||||||
|
|||||||
@@ -0,0 +1,26 @@
|
|||||||
|
"""Download job domain entity (plan §6.1).
|
||||||
|
|
||||||
|
A queued fetch from an external source, tracked through its lifecycle so the UI
|
||||||
|
download manager (screen §A5) can show progress, errors, and retries. The
|
||||||
|
``status`` strings mirror :class:`~app.infrastructure.db.models.enums.DownloadStatus`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import datetime as dt
|
||||||
|
import uuid
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
class DownloadJob:
    """Immutable snapshot of one download job record."""

    id: uuid.UUID
    # Name of the external source and the item's identifier on it.
    source: str
    source_id: str | None
    # Free-text query the request originated from, if any.
    query: str | None
    # Id of the requester, if any (the HTTP route passes the current user's id).
    requested_by: uuid.UUID | None
    # Lifecycle state; the strings mirror ``DownloadStatus``.
    status: str
    # Download progress; presumably a fraction in [0.0, 1.0] — TODO confirm.
    progress: float
    error_message: str | None
    retry_count: int
    # Library track produced by this job, once it has been imported.
    track_id: uuid.UUID | None
    created_at: dt.datetime
    updated_at: dt.datetime
|
||||||
+81
-2
@@ -7,7 +7,7 @@ are bound to these ports at the composition root (``app.api.deps``).
|
|||||||
|
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
import uuid
|
import uuid
|
||||||
from collections.abc import AsyncIterator, Iterator
|
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator
|
||||||
from contextlib import AbstractAsyncContextManager
|
from contextlib import AbstractAsyncContextManager
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Protocol
|
from typing import Protocol
|
||||||
@@ -18,6 +18,7 @@ from app.domain.entities import (
|
|||||||
CoverArt,
|
CoverArt,
|
||||||
Credentials,
|
Credentials,
|
||||||
DiskUsage,
|
DiskUsage,
|
||||||
|
DownloadJob,
|
||||||
Fingerprint,
|
Fingerprint,
|
||||||
LibraryStats,
|
LibraryStats,
|
||||||
Like,
|
Like,
|
||||||
@@ -29,9 +30,14 @@ from app.domain.entities import (
|
|||||||
User,
|
User,
|
||||||
)
|
)
|
||||||
from app.domain.entities.track import Artist, Track
|
from app.domain.entities.track import Artist, Track
|
||||||
from app.domain.sources import SourceFile, SourceInfo
|
from app.domain.sources import DownloadResult, RawMetadata, SearchResult, SourceFile, SourceInfo
|
||||||
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
|
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
|
||||||
|
|
||||||
|
# A fetch source reports download progress as a fraction in [0.0, 1.0]. It's a
|
||||||
|
# plain callback (not a port) because it's an inversion of control supplied per
|
||||||
|
# call by the worker, which persists it to the download job.
|
||||||
|
ProgressCallback = Callable[[float], Awaitable[None]]
|
||||||
|
|
||||||
|
|
||||||
class UserRepository(Protocol):
|
class UserRepository(Protocol):
|
||||||
async def get_by_id(self, user_id: uuid.UUID) -> User | None: ...
|
async def get_by_id(self, user_id: uuid.UUID) -> User | None: ...
|
||||||
@@ -145,6 +151,7 @@ class TrackRepository(Protocol):
|
|||||||
artist_id: uuid.UUID | None,
|
artist_id: uuid.UUID | None,
|
||||||
album_id: uuid.UUID | None,
|
album_id: uuid.UUID | None,
|
||||||
q: str | None,
|
q: str | None,
|
||||||
|
source: str | None = None,
|
||||||
sort_by: str,
|
sort_by: str,
|
||||||
order: str,
|
order: str,
|
||||||
limit: int,
|
limit: int,
|
||||||
@@ -156,6 +163,7 @@ class TrackRepository(Protocol):
|
|||||||
artist_id: uuid.UUID | None,
|
artist_id: uuid.UUID | None,
|
||||||
album_id: uuid.UUID | None,
|
album_id: uuid.UUID | None,
|
||||||
q: str | None,
|
q: str | None,
|
||||||
|
source: str | None = None,
|
||||||
) -> int: ...
|
) -> int: ...
|
||||||
async def update(
|
async def update(
|
||||||
self,
|
self,
|
||||||
@@ -273,6 +281,54 @@ class HistoryRepository(Protocol):
|
|||||||
async def count(self, *, user_id: uuid.UUID) -> int: ...
|
async def count(self, *, user_id: uuid.UUID) -> int: ...
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadJobRepository(Protocol):
    """Port for storing and querying download jobs (plan §6.1).

    Backs the §A5 download manager and the worker's retry/backoff loop.
    """

    async def add(
        self,
        *,
        source: str,
        source_id: str | None,
        query: str | None,
        requested_by: uuid.UUID | None,
    ) -> DownloadJob:
        """Create a job for the given source item and return it."""
        ...

    async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
        """Return the job with ``job_id``, or ``None`` when it does not exist."""
        ...

    async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
        """Return an unfinished (queued/downloading/enriching) job for the same
        item, if any.

        Used to dedup before enqueuing, so a double-click cannot queue the same
        item twice.
        """
        ...

    async def list(
        self,
        *,
        requested_by: uuid.UUID | None,
        status: str | None,
        limit: int,
        offset: int,
    ) -> list[DownloadJob]:
        """Return one page of jobs, optionally narrowed by requester and status."""
        ...

    async def count(self, *, requested_by: uuid.UUID | None, status: str | None) -> int:
        """Count jobs, optionally narrowed by requester and status."""
        ...

    async def set_status(
        self,
        job_id: uuid.UUID,
        *,
        status: str,
        error_message: str | None = None,
        track_id: uuid.UUID | None = None,
    ) -> None:
        """Move a job to ``status``, optionally recording an error message and
        the track it produced."""
        ...

    async def set_progress(self, job_id: uuid.UUID, progress: float) -> None:
        """Record the download progress of a job."""
        ...

    async def increment_retry(self, job_id: uuid.UUID) -> int:
        """Increase ``retry_count`` by one and return the new value."""
        ...

    async def delete(self, job_id: uuid.UUID) -> None:
        """Remove a job."""
        ...

    async def failure_rate(self, source: str, *, since: dt.datetime) -> float:
        """Return the fraction of jobs for ``source`` created since ``since``
        that ended ``failed``; ``0.0`` when there are none.

        Drives the §A5 "source unhealthy" banner.
        """
        ...
|
||||||
|
|
||||||
|
|
||||||
class SourceBackend(Protocol):
|
class SourceBackend(Protocol):
|
||||||
"""A registered source of tracks (mounted folder, YouTube, …).
|
"""A registered source of tracks (mounted folder, YouTube, …).
|
||||||
|
|
||||||
@@ -291,6 +347,29 @@ class IndexableSource(SourceBackend, Protocol):
|
|||||||
def scan(self) -> Iterator[SourceFile]: ...
|
def scan(self) -> Iterator[SourceFile]: ...
|
||||||
|
|
||||||
|
|
||||||
|
class SearchableSource(SourceBackend, Protocol):
    """Source backend that supports free-text search (e.g. YouTube Music).

    Implementations never raise from ``search``: no results, or the service
    being down, both yield ``[]`` so the discover screen degrades to "nothing
    found" rather than erroring.
    """

    async def search(self, query: str, *, limit: int) -> list[SearchResult]:
        """Return up to ``limit`` hits for ``query`` (possibly none)."""
        ...
|
||||||
|
|
||||||
|
|
||||||
|
class FetchableSource(SourceBackend, Protocol):
    """Source backend that can download a previously discovered item to disk.

    ``fetch`` is heavy I/O and runs only in a worker. Unlike search it raises
    on failure, so the download task can retry with backoff.
    """

    async def fetch(
        self,
        source_id: str,
        *,
        on_progress: ProgressCallback | None = None,
    ) -> DownloadResult:
        """Resolve ``source_id`` (taken from a :class:`SearchResult`) into a
        local file, reporting progress through ``on_progress`` when given."""
        ...

    async def get_metadata(self, source_id: str) -> RawMetadata | None:
        """Return the metadata the source can offer for ``source_id``, or
        ``None``."""
        ...
|
||||||
|
|
||||||
|
|
||||||
# -- metadata enrichment (plan §6.2) -----------------------------------------
|
# -- metadata enrichment (plan §6.2) -----------------------------------------
|
||||||
class AudioTagReader(Protocol):
|
class AudioTagReader(Protocol):
|
||||||
"""Reads embedded tags from a local audio file. Returns ``None`` only when
|
"""Reads embedded tags from a local audio file. Returns ``None`` only when
|
||||||
|
|||||||
+58
-2
@@ -10,8 +10,14 @@ here — a source yields a file plus a minimal title; enrichment (plan §6.2) fi
|
|||||||
the rest later, so this stays a thin discovery layer (CLAUDE.md: no duplicated
|
the rest later, so this stays a thin discovery layer (CLAUDE.md: no duplicated
|
||||||
business logic)."""
|
business logic)."""
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
# A source's ``kind`` describes which ports it satisfies, so the UI/admin can
|
||||||
|
# tell an indexed folder from a searchable fetch-source. A backend may be both.
|
||||||
|
KIND_INDEXABLE = "indexable" # enumerates files already on disk (local folder)
|
||||||
|
KIND_FETCH = "fetch" # searches + downloads from an external service (YTM, …)
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True, slots=True)
|
@dataclass(frozen=True, slots=True)
|
||||||
@@ -20,7 +26,7 @@ class SourceInfo:
|
|||||||
|
|
||||||
name: str
|
name: str
|
||||||
label: str
|
label: str
|
||||||
kind: str # "indexable" (more kinds — search/download — arrive with youtube)
|
kind: str # KIND_INDEXABLE | KIND_FETCH
|
||||||
available: bool
|
available: bool
|
||||||
|
|
||||||
|
|
||||||
@@ -37,3 +43,53 @@ class SourceFile:
|
|||||||
suggested_title: str
|
suggested_title: str
|
||||||
file_format: str
|
file_format: str
|
||||||
file_size: int
|
file_size: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
|
||||||
|
class SearchResult:
|
||||||
|
"""One hit from a searchable source (plan §5), shown on the discover screen.
|
||||||
|
|
||||||
|
``source_id`` is the stable handle the same backend later resolves in
|
||||||
|
``fetch`` — it must round-trip a download request without re-searching.
|
||||||
|
``raw`` carries the backend's untouched payload for debugging / future use.
|
||||||
|
"""
|
||||||
|
|
||||||
|
source: str
|
||||||
|
source_id: str
|
||||||
|
title: str
|
||||||
|
artist: str | None
|
||||||
|
album: str | None
|
||||||
|
duration_seconds: int | None
|
||||||
|
thumbnail_url: str | None
|
||||||
|
raw: dict[str, Any] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
|
||||||
|
class RawMetadata:
|
||||||
|
"""Metadata a fetch-source can offer about an item *before* enrichment.
|
||||||
|
|
||||||
|
Best-effort and source-shaped — the canonical metadata still comes from the
|
||||||
|
enrichment pipeline (plan §6.2). Used to seed a more useful provisional
|
||||||
|
title than a bare id while a download is queued."""
|
||||||
|
|
||||||
|
title: str | None
|
||||||
|
artist: str | None
|
||||||
|
album: str | None
|
||||||
|
year: int | None
|
||||||
|
extra: dict[str, Any] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
|
||||||
|
class DownloadResult:
|
||||||
|
"""A file a fetch-source produced on local disk (plan §5).
|
||||||
|
|
||||||
|
``path`` is a temp file the caller owns: it is stored into managed storage
|
||||||
|
and then removed (same lifecycle as an upload). ``source_id`` is echoed back
|
||||||
|
because some backends only learn the canonical id during the download."""
|
||||||
|
|
||||||
|
source_id: str
|
||||||
|
path: Path
|
||||||
|
file_format: str
|
||||||
|
file_size: int
|
||||||
|
bitrate: int | None
|
||||||
|
suggested_title: str
|
||||||
|
|||||||
@@ -35,3 +35,9 @@ class DownloadJobModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
|
|||||||
progress: Mapped[float] = mapped_column(Float, nullable=False, default=0.0)
|
progress: Mapped[float] = mapped_column(Float, nullable=False, default=0.0)
|
||||||
error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
|
error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||||
retry_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
retry_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
||||||
|
# Set once the download finishes and the track is imported — lets the UI
|
||||||
|
# link a completed job to its library track.
|
||||||
|
track_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||||
|
ForeignKey("tracks.id", ondelete="SET NULL"),
|
||||||
|
nullable=True,
|
||||||
|
)
|
||||||
|
|||||||
@@ -2,6 +2,9 @@
|
|||||||
|
|
||||||
from app.infrastructure.db.repositories.album_repository import SqlAlchemyAlbumRepository
|
from app.infrastructure.db.repositories.album_repository import SqlAlchemyAlbumRepository
|
||||||
from app.infrastructure.db.repositories.artist_repository import SqlAlchemyArtistRepository
|
from app.infrastructure.db.repositories.artist_repository import SqlAlchemyArtistRepository
|
||||||
|
from app.infrastructure.db.repositories.download_job_repository import (
|
||||||
|
SqlAlchemyDownloadJobRepository,
|
||||||
|
)
|
||||||
from app.infrastructure.db.repositories.history_repository import SqlAlchemyHistoryRepository
|
from app.infrastructure.db.repositories.history_repository import SqlAlchemyHistoryRepository
|
||||||
from app.infrastructure.db.repositories.like_repository import SqlAlchemyLikeRepository
|
from app.infrastructure.db.repositories.like_repository import SqlAlchemyLikeRepository
|
||||||
from app.infrastructure.db.repositories.playlist_repository import SqlAlchemyPlaylistRepository
|
from app.infrastructure.db.repositories.playlist_repository import SqlAlchemyPlaylistRepository
|
||||||
@@ -14,6 +17,7 @@ from app.infrastructure.db.repositories.user_repository import SqlAlchemyUserRep
|
|||||||
__all__ = [
|
__all__ = [
|
||||||
"SqlAlchemyAlbumRepository",
|
"SqlAlchemyAlbumRepository",
|
||||||
"SqlAlchemyArtistRepository",
|
"SqlAlchemyArtistRepository",
|
||||||
|
"SqlAlchemyDownloadJobRepository",
|
||||||
"SqlAlchemyHistoryRepository",
|
"SqlAlchemyHistoryRepository",
|
||||||
"SqlAlchemyLikeRepository",
|
"SqlAlchemyLikeRepository",
|
||||||
"SqlAlchemyPlaylistRepository",
|
"SqlAlchemyPlaylistRepository",
|
||||||
|
|||||||
@@ -0,0 +1,164 @@
|
|||||||
|
"""Download job repository — adapter over ``AsyncSession`` (plan §6.1)."""
|
||||||
|
|
||||||
|
import datetime as dt
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from sqlalchemy import func, select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from app.domain.entities.download import DownloadJob
|
||||||
|
from app.infrastructure.db.models.download_job import DownloadJobModel
|
||||||
|
from app.infrastructure.db.models.enums import DownloadStatus
|
||||||
|
|
||||||
|
# Jobs that are not yet finished — used to dedup an in-flight download.
|
||||||
|
_ACTIVE_STATUSES = (
|
||||||
|
DownloadStatus.QUEUED.value,
|
||||||
|
DownloadStatus.DOWNLOADING.value,
|
||||||
|
DownloadStatus.ENRICHING.value,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _to_entity(row: DownloadJobModel) -> DownloadJob:
    """Copy an ORM row into an immutable :class:`DownloadJob` entity.

    Each entity field is read from the row attribute of the same name.
    """
    columns = (
        "id",
        "source",
        "source_id",
        "query",
        "requested_by",
        "status",
        "progress",
        "error_message",
        "retry_count",
        "track_id",
        "created_at",
        "updated_at",
    )
    return DownloadJob(**{column: getattr(row, column) for column in columns})
|
||||||
|
|
||||||
|
|
||||||
|
class SqlAlchemyDownloadJobRepository:
    """Download job store backed by a SQLAlchemy ``AsyncSession``.

    Every write is flushed through the session passed at construction; nothing
    in this class commits.
    """

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def add(
        self,
        *,
        source: str,
        source_id: str | None,
        query: str | None,
        requested_by: uuid.UUID | None,
    ) -> DownloadJob:
        """Insert a new job in the ``queued`` state with zero progress and zero
        retries, and return it as an entity."""
        session = self._session
        new_row = DownloadJobModel(
            source=source,
            source_id=source_id,
            query=query,
            requested_by=requested_by,
            status=DownloadStatus.QUEUED.value,
            progress=0.0,
            retry_count=0,
        )
        session.add(new_row)
        await session.flush()
        # Reload the row's attributes from the database before mapping it.
        await session.refresh(new_row)
        return _to_entity(new_row)

    async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
        """Return the job with ``job_id``, or ``None`` when there is no such row."""
        found = await self._session.get(DownloadJobModel, job_id)
        if found is None:
            return None
        return _to_entity(found)

    async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
        """Return the most recently created unfinished job for this source item,
        or ``None`` when nothing is in flight."""
        newest_active = (
            select(DownloadJobModel)
            .where(
                DownloadJobModel.source == source,
                DownloadJobModel.source_id == source_id,
                DownloadJobModel.status.in_(_ACTIVE_STATUSES),
            )
            .order_by(DownloadJobModel.created_at.desc())
            .limit(1)
        )
        result = await self._session.execute(newest_active)
        found = result.scalar_one_or_none()
        return None if found is None else _to_entity(found)

    async def list(
        self,
        *,
        requested_by: uuid.UUID | None,
        status: str | None,
        limit: int,
        offset: int,
    ) -> list[DownloadJob]:
        """Return one page of jobs, newest first.

        ``requested_by`` and ``status`` each narrow the result only when they
        are not ``None``.
        """
        page = select(DownloadJobModel).order_by(DownloadJobModel.created_at.desc())
        if status is not None:
            page = page.where(DownloadJobModel.status == status)
        if requested_by is not None:
            page = page.where(DownloadJobModel.requested_by == requested_by)
        page = page.limit(limit).offset(offset)
        result = await self._session.execute(page)
        return [_to_entity(found) for found in result.scalars().all()]

    async def count(self, *, requested_by: uuid.UUID | None, status: str | None) -> int:
        """Count jobs, applying the same optional filters as :meth:`list`."""
        counter = select(func.count()).select_from(DownloadJobModel)
        if status is not None:
            counter = counter.where(DownloadJobModel.status == status)
        if requested_by is not None:
            counter = counter.where(DownloadJobModel.requested_by == requested_by)
        result = await self._session.execute(counter)
        return result.scalar_one()

    async def set_status(
        self,
        job_id: uuid.UUID,
        *,
        status: str,
        error_message: str | None = None,
        track_id: uuid.UUID | None = None,
    ) -> None:
        """Move a job to ``status``; a missing job is silently ignored.

        ``track_id`` is written only when given, so an omitted value never
        unlinks an already linked track. Reaching ``done`` also forces
        ``progress`` to ``1.0``.
        """
        job = await self._session.get(DownloadJobModel, job_id)
        if job is None:
            return
        job.status = status
        if status == DownloadStatus.DONE.value:
            job.progress = 1.0
        if track_id is not None:
            job.track_id = track_id
        # The message is overwritten unconditionally: a successful transition
        # must clear the stale reason left by an earlier failed attempt.
        job.error_message = error_message
        await self._session.flush()

    async def set_progress(self, job_id: uuid.UUID, progress: float) -> None:
        """Store ``progress`` clamped into ``[0.0, 1.0]``; a missing job is ignored."""
        job = await self._session.get(DownloadJobModel, job_id)
        if job is None:
            return
        clamped = max(0.0, min(1.0, progress))
        job.progress = clamped
        await self._session.flush()

    async def increment_retry(self, job_id: uuid.UUID) -> int:
        """Add one to ``retry_count`` and return the new value.

        Returns ``0`` when the job does not exist.
        """
        job = await self._session.get(DownloadJobModel, job_id)
        if job is None:
            return 0
        job.retry_count += 1
        await self._session.flush()
        return job.retry_count

    async def delete(self, job_id: uuid.UUID) -> None:
        """Delete the job when it exists; the session is flushed either way."""
        doomed = await self._session.get(DownloadJobModel, job_id)
        if doomed is not None:
            await self._session.delete(doomed)
        await self._session.flush()

    async def failure_rate(self, source: str, *, since: dt.datetime) -> float:
        """Return failed jobs divided by all jobs for ``source`` created at or
        after ``since``; ``0.0`` when there are no such jobs."""
        is_failed = DownloadJobModel.status == DownloadStatus.FAILED.value
        tally = (
            select(func.count(), func.count().filter(is_failed))
            .select_from(DownloadJobModel)
            .where(
                DownloadJobModel.source == source,
                DownloadJobModel.created_at >= since,
            )
        )
        # A single aggregate row holding both counts.
        total, failed = (await self._session.execute(tally)).one()
        if not total:
            return 0.0
        return failed / total
|
||||||
@@ -170,6 +170,7 @@ class SqlAlchemyTrackRepository:
|
|||||||
artist_id: uuid.UUID | None,
|
artist_id: uuid.UUID | None,
|
||||||
album_id: uuid.UUID | None,
|
album_id: uuid.UUID | None,
|
||||||
q: str | None,
|
q: str | None,
|
||||||
|
source: str | None = None,
|
||||||
sort_by: str = "created_at",
|
sort_by: str = "created_at",
|
||||||
order: str = "desc",
|
order: str = "desc",
|
||||||
limit: int = 50,
|
limit: int = 50,
|
||||||
@@ -180,6 +181,8 @@ class SqlAlchemyTrackRepository:
|
|||||||
stmt = stmt.where(TrackModel.artist_id == artist_id)
|
stmt = stmt.where(TrackModel.artist_id == artist_id)
|
||||||
if album_id is not None:
|
if album_id is not None:
|
||||||
stmt = stmt.where(TrackModel.album_id == album_id)
|
stmt = stmt.where(TrackModel.album_id == album_id)
|
||||||
|
if source is not None:
|
||||||
|
stmt = stmt.where(TrackModel.source == source)
|
||||||
if q:
|
if q:
|
||||||
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
|
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
|
||||||
|
|
||||||
@@ -204,12 +207,15 @@ class SqlAlchemyTrackRepository:
|
|||||||
artist_id: uuid.UUID | None,
|
artist_id: uuid.UUID | None,
|
||||||
album_id: uuid.UUID | None,
|
album_id: uuid.UUID | None,
|
||||||
q: str | None,
|
q: str | None,
|
||||||
|
source: str | None = None,
|
||||||
) -> int:
|
) -> int:
|
||||||
stmt = select(func.count()).select_from(TrackModel)
|
stmt = select(func.count()).select_from(TrackModel)
|
||||||
if artist_id is not None:
|
if artist_id is not None:
|
||||||
stmt = stmt.where(TrackModel.artist_id == artist_id)
|
stmt = stmt.where(TrackModel.artist_id == artist_id)
|
||||||
if album_id is not None:
|
if album_id is not None:
|
||||||
stmt = stmt.where(TrackModel.album_id == album_id)
|
stmt = stmt.where(TrackModel.album_id == album_id)
|
||||||
|
if source is not None:
|
||||||
|
stmt = stmt.where(TrackModel.source == source)
|
||||||
if q:
|
if q:
|
||||||
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
|
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
|
||||||
return (await self._session.execute(stmt)).scalar_one()
|
return (await self._session.execute(stmt)).scalar_one()
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ class AcoustIdHttpClient:
|
|||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
return resp.json() # type: ignore[no-any-return]
|
return resp.json() # type: ignore[no-any-return]
|
||||||
except (httpx.HTTPError, ValueError):
|
except httpx.HTTPError, ValueError:
|
||||||
log.warning("acoustid_lookup_failed")
|
log.warning("acoustid_lookup_failed")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|||||||
@@ -2,16 +2,18 @@
|
|||||||
|
|
||||||
Built from settings at the composition root. Only sources that are configured
|
Built from settings at the composition root. Only sources that are configured
|
||||||
are registered (e.g. ``local`` appears only when ``LOCAL_MEDIA_IMPORT_PATH`` is
|
are registered (e.g. ``local`` appears only when ``LOCAL_MEDIA_IMPORT_PATH`` is
|
||||||
set), so enumeration reflects what the instance can actually use.
|
set; ``youtube`` only when ``YOUTUBE_ENABLED``), so enumeration reflects what the
|
||||||
|
instance can actually use.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import cast
|
from typing import cast
|
||||||
|
|
||||||
from app.core.config import Settings
|
from app.core.config import Settings
|
||||||
from app.domain.errors import NotFoundError, ValidationError
|
from app.domain.errors import NotFoundError, ValidationError
|
||||||
from app.domain.ports import IndexableSource, SourceBackend
|
from app.domain.ports import FetchableSource, IndexableSource, SearchableSource, SourceBackend
|
||||||
from app.domain.sources import SourceInfo
|
from app.domain.sources import SourceInfo
|
||||||
from app.infrastructure.sources.local_folder import LocalFolderSource
|
from app.infrastructure.sources.local_folder import LocalFolderSource
|
||||||
|
from app.infrastructure.sources.youtube import YouTubeMusicSource
|
||||||
|
|
||||||
|
|
||||||
class SourceRegistry:
|
class SourceRegistry:
|
||||||
@@ -30,6 +32,22 @@ class SourceRegistry:
|
|||||||
raise ValidationError(f"Source {name!r} cannot be indexed.")
|
raise ValidationError(f"Source {name!r} cannot be indexed.")
|
||||||
return cast(IndexableSource, backend)
|
return cast(IndexableSource, backend)
|
||||||
|
|
||||||
|
def searchable(self, name: str) -> SearchableSource:
    """Return the source registered as ``name`` as a searchable source.

    Raises ``ValidationError`` when that source has no ``search`` attribute.
    """
    candidate = self.get(name)
    if hasattr(candidate, "search"):
        return cast(SearchableSource, candidate)
    raise ValidationError(f"Source {name!r} cannot be searched.")
|
||||||
|
|
||||||
|
def fetchable(self, name: str) -> FetchableSource:
    """Return the source registered as ``name`` as a fetchable source.

    Raises ``ValidationError`` when that source has no ``fetch`` attribute.
    """
    candidate = self.get(name)
    if hasattr(candidate, "fetch"):
        return cast(FetchableSource, candidate)
    raise ValidationError(f"Source {name!r} cannot download.")
|
||||||
|
|
||||||
|
def searchables(self) -> list[SearchableSource]:
    """Return every registered source that has a ``search`` attribute.

    Used for cross-source search.
    """
    found: list[SearchableSource] = []
    for backend in self._by_name.values():
        if hasattr(backend, "search"):
            found.append(cast(SearchableSource, backend))
    return found
|
||||||
|
|
||||||
def infos(self) -> list[SourceInfo]:
|
def infos(self) -> list[SourceInfo]:
|
||||||
return [backend.info() for backend in self._by_name.values()]
|
return [backend.info() for backend in self._by_name.values()]
|
||||||
|
|
||||||
@@ -38,4 +56,11 @@ def build_source_registry(settings: Settings) -> SourceRegistry:
|
|||||||
backends: list[SourceBackend] = []
|
backends: list[SourceBackend] = []
|
||||||
if settings.local_media_import_path is not None:
|
if settings.local_media_import_path is not None:
|
||||||
backends.append(LocalFolderSource(settings.local_media_import_path))
|
backends.append(LocalFolderSource(settings.local_media_import_path))
|
||||||
|
if settings.youtube_enabled:
|
||||||
|
backends.append(
|
||||||
|
YouTubeMusicSource(
|
||||||
|
cookies_path=settings.youtube_cookies_path,
|
||||||
|
tmp_dir=settings.upload_tmp_dir,
|
||||||
|
)
|
||||||
|
)
|
||||||
return SourceRegistry(backends)
|
return SourceRegistry(backends)
|
||||||
|
|||||||
@@ -0,0 +1,207 @@
|
|||||||
|
"""``youtube`` source — YouTube Music search + download (plan §5).
|
||||||
|
|
||||||
|
A *fetch* source: it searches YouTube Music (via ``ytmusicapi``, which returns
|
||||||
|
clean song/artist/album/duration rows) and downloads the chosen item with
|
||||||
|
``yt-dlp``. The two libraries are synchronous, so every call is bounced to a
|
||||||
|
worker thread (``anyio.to_thread``); the sync yt-dlp progress hook bridges back
|
||||||
|
to the async progress callback via ``anyio.from_thread``.
|
||||||
|
|
||||||
|
Both libraries are optional dependencies — if either is missing the source is
|
||||||
|
simply *unavailable* (it never crashes import or the registry; graceful
|
||||||
|
degradation per CLAUDE.md). The audio stream is stored **as-is** (YouTube serves
|
||||||
|
lossy Opus/AAC; re-encoding would be lossy→lossy, plan §6.6).
|
||||||
|
|
||||||
|
``source_id`` is the YouTube ``videoId`` — stable, so a re-download of the same
|
||||||
|
id is idempotent and dedups against an existing track.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import tempfile
|
||||||
|
from collections.abc import Callable
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import anyio
|
||||||
|
|
||||||
|
from app.core.logging import get_logger
|
||||||
|
from app.domain.ports import ProgressCallback
|
||||||
|
from app.domain.sources import (
|
||||||
|
KIND_FETCH,
|
||||||
|
DownloadResult,
|
||||||
|
RawMetadata,
|
||||||
|
SearchResult,
|
||||||
|
SourceInfo,
|
||||||
|
)
|
||||||
|
from app.infrastructure.db.models.enums import TrackSource
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
# Functions a caller may inject for testing (defaults do the real library work).
|
||||||
|
SearchFn = Callable[[str, int], list[dict[str, Any]]]
|
||||||
|
# (video_id, tmp_dir, progress_hook, cookies_path) -> normalized download dict
|
||||||
|
DownloadFn = Callable[[str, Path, Callable[[dict[str, Any]], None], Path | None], dict[str, Any]]
|
||||||
|
|
||||||
|
|
||||||
|
def _libs_available() -> bool:
|
||||||
|
try:
|
||||||
|
import yt_dlp # noqa: F401
|
||||||
|
import ytmusicapi # noqa: F401
|
||||||
|
except ImportError:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _watch_url(video_id: str) -> str:
|
||||||
|
return f"https://music.youtube.com/watch?v={video_id}"
|
||||||
|
|
||||||
|
|
||||||
|
class YouTubeMusicSource:
    """YouTube Music backend: free-text search plus audio download.

    Implements :class:`app.domain.ports.SearchableSource` and
    :class:`~app.domain.ports.FetchableSource`. The search and download
    callables are synchronous, so both are run through ``anyio.to_thread``.
    """

    name = TrackSource.YOUTUBE.value

    def __init__(
        self,
        *,
        cookies_path: Path | None = None,
        tmp_dir: Path | None = None,
        search_fn: SearchFn | None = None,
        download_fn: DownloadFn | None = None,
    ) -> None:
        """Store the configuration; ``search_fn`` / ``download_fn`` replace the
        real library calls when supplied (e.g. in tests)."""
        # An injected callable is self-contained, so the optional libraries are
        # only required when both defaults are in use.
        self._injected = not (search_fn is None and download_fn is None)
        self._search_fn = search_fn or _default_search
        self._download_fn = download_fn or _default_download
        self._cookies_path = cookies_path
        self._tmp_dir = tmp_dir

    def info(self) -> SourceInfo:
        """Describe this source as a fetch-kind backend with its current
        availability."""
        return SourceInfo(
            name=self.name,
            label="YouTube Music",
            kind=KIND_FETCH,
            available=self.is_available(),
        )

    def is_available(self) -> bool:
        """Return ``True`` when a callable was injected, otherwise whether the
        optional libraries import."""
        return self._injected or _libs_available()

    async def search(self, query: str, *, limit: int) -> list[SearchResult]:
        """Search for ``query`` and return the rows that map to a result.

        A blank query returns ``[]`` without calling the search function. Any
        exception raised by the search call is logged as a warning and turned
        into ``[]`` (no results / service down degrade to empty — plan §5,
        CLAUDE.md).
        """
        cleaned = query.strip()
        if not cleaned:
            return []
        try:
            run_search = functools.partial(self._search_fn, cleaned, limit)
            rows = await anyio.to_thread.run_sync(run_search)
        except Exception:
            log.warning("ytm_search_failed", query=cleaned)
            return []
        hits: list[SearchResult] = []
        for row in rows:
            hit = self._to_result(row)
            if hit is not None:
                hits.append(hit)
        return hits

    async def fetch(
        self, source_id: str, *, on_progress: ProgressCallback | None = None
    ) -> DownloadResult:
        """Download ``source_id`` into the temp directory and describe the file.

        Exceptions from the download function are not caught here. Progress is
        forwarded to ``on_progress`` as a fraction capped at ``0.99``.
        """
        download_dir = self._tmp_dir or Path(tempfile.gettempdir())

        def report(event: dict[str, Any]) -> None:
            # Only "downloading" events that carry usable byte counts are forwarded.
            if on_progress is None:
                return
            if event.get("status") != "downloading":
                return
            total = event.get("total_bytes") or event.get("total_bytes_estimate")
            received = event.get("downloaded_bytes")
            if received is None or not total:
                return
            # Capped below 1.0: the job only reaches 1.0 once stored + imported.
            fraction = min(received / total, 0.99)
            # Bridge the sync hook (worker thread) to the async callback (event loop).
            anyio.from_thread.run(on_progress, fraction)

        run_download = functools.partial(
            self._download_fn, source_id, download_dir, report, self._cookies_path
        )
        outcome = await anyio.to_thread.run_sync(run_download)
        file_path = Path(outcome["filepath"])
        file_stat = await anyio.Path(file_path).stat()
        return DownloadResult(
            source_id=source_id,
            path=file_path,
            file_format=outcome["file_format"],
            file_size=file_stat.st_size,
            bitrate=outcome.get("bitrate"),
            # Fall back to the id when the download reported no title.
            suggested_title=outcome.get("title") or source_id,
        )

    async def get_metadata(self, source_id: str) -> RawMetadata | None:
        """Always return ``None``.

        The search result already carries a usable title/artist and canonical
        metadata comes from enrichment (§6.2); a dedicated lookup is an optional
        refinement that is skipped for now.
        """
        return None

    def _to_result(self, row: dict[str, Any]) -> SearchResult | None:
        """Map one raw search row to a :class:`SearchResult`.

        Returns ``None`` for a row without a ``videoId`` (non-playable).
        """
        video_id = row.get("videoId")
        if not video_id:
            return None
        artist_names = [
            entry["name"] for entry in (row.get("artists") or []) if entry.get("name")
        ]
        album_info = row.get("album")
        thumbnails = row.get("thumbnails") or []
        return SearchResult(
            source=self.name,
            source_id=str(video_id),
            title=row.get("title") or "Unknown",
            # No named artist at all collapses to None rather than "".
            artist=", ".join(artist_names) or None,
            album=album_info.get("name") if isinstance(album_info, dict) else None,
            duration_seconds=row.get("duration_seconds"),
            # The last thumbnail entry is the one used.
            thumbnail_url=thumbnails[-1].get("url") if thumbnails else None,
            raw=row,
        )
|
||||||
|
|
||||||
|
|
||||||
|
def _default_search(query: str, limit: int) -> list[dict[str, Any]]:
    """Search YouTube Music for songs matching ``query``. Runs in a worker thread.

    Uses an unauthenticated ``YTMusic`` client (public search needs no login)
    and truncates the returned rows to at most ``limit`` entries.
    """
    from ytmusicapi import YTMusic

    client = YTMusic()  # unauthenticated: public search needs no login
    rows: list[dict[str, Any]] = client.search(query, filter="songs", limit=limit)
    return rows[:limit]
|
||||||
|
|
||||||
|
|
||||||
|
def _default_download(
    video_id: str,
    tmp_dir: Path,
    progress_hook: Callable[[dict[str, Any]], None],
    cookies_path: Path | None,
) -> dict[str, Any]:
    """Download the best audio stream with yt-dlp. Runs in a worker thread.

    The original stream is kept as-is (no transcode — plan §6.3/§6.6) and is
    written to ``tmp_dir`` as ``<id>.<ext>``.

    Returns:
        A normalized dict (``filepath``, ``file_format``, ``bitrate``,
        ``title``) that the adapter maps to :class:`DownloadResult`.
    """
    from yt_dlp import YoutubeDL

    ydl_opts: dict[str, Any] = {
        "format": "bestaudio/best",
        "outtmpl": str(tmp_dir / "%(id)s.%(ext)s"),
        "quiet": True,
        "no_warnings": True,
        "noprogress": True,
        "progress_hooks": [progress_hook],
    }

    # The cookies path may be configured unconditionally (e.g. a mounted volume
    # that may be empty), so it is only passed on when the file really exists.
    # Downloads work without it; cookies just unlock age/region-restricted items.
    cookies_usable = cookies_path is not None and cookies_path.is_file()
    if cookies_usable:
        ydl_opts["cookiefile"] = str(cookies_path)

    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(_watch_url(video_id), download=True)
        filepath = Path(ydl.prepare_filename(info))

    extension = filepath.suffix.lstrip(".").lower()
    abr = info.get("abr")
    return {
        "filepath": filepath,
        "file_format": extension or "m4a",  # fall back when the file has no suffix
        "bitrate": None if not abr else int(abr),
        "title": info.get("title"),
    }
|
||||||
@@ -1,7 +1,6 @@
|
|||||||
"""arq worker settings — the queue runtime. Task functions register here.
|
"""arq worker settings — the queue runtime. Task functions register here.
|
||||||
|
|
||||||
Run with: ``arq app.workers.arq_worker.WorkerSettings``.
|
Run with: ``arq app.workers.arq_worker.WorkerSettings``.
|
||||||
Tasks (download, transcode) are appended to ``functions`` in later steps.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import Any, ClassVar
|
from typing import Any, ClassVar
|
||||||
@@ -10,6 +9,7 @@ from arq.connections import RedisSettings
|
|||||||
|
|
||||||
from app.core.config import get_settings
|
from app.core.config import get_settings
|
||||||
from app.core.logging import configure_logging, get_logger
|
from app.core.logging import configure_logging, get_logger
|
||||||
|
from app.workers.tasks.download_task import download_track
|
||||||
from app.workers.tasks.enrich_task import enrich_track
|
from app.workers.tasks.enrich_task import enrich_track
|
||||||
from app.workers.tasks.import_task import scan_local_folder
|
from app.workers.tasks.import_task import scan_local_folder
|
||||||
|
|
||||||
@@ -27,7 +27,7 @@ async def shutdown(_ctx: dict[str, Any]) -> None:
|
|||||||
|
|
||||||
|
|
||||||
class WorkerSettings:
|
class WorkerSettings:
|
||||||
functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track]
|
functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track, download_track]
|
||||||
on_startup = startup
|
on_startup = startup
|
||||||
on_shutdown = shutdown
|
on_shutdown = shutdown
|
||||||
max_jobs = get_settings().max_parallel_downloads
|
max_jobs = get_settings().max_parallel_downloads
|
||||||
|
|||||||
@@ -34,6 +34,20 @@ async def enqueue(function: str, **kwargs: Any) -> str:
|
|||||||
return str(job.job_id)
|
return str(job.job_id)
|
||||||
|
|
||||||
|
|
||||||
|
async def enqueue_download(job_id: uuid.UUID) -> None:
    """Best-effort hand-off of a download job to the worker queue.

    The job row is already persisted as ``queued``, so enqueueing is a
    follow-up rather than a barrier: when the queue is unreachable the failure
    is logged and swallowed (graceful degradation), leaving the job ``queued``
    so it can be retried later. The job is deferred by a few seconds so the
    request's DB transaction commits before the worker reads the row (same
    reason as :func:`enqueue_enrich`).
    """
    job_ref = str(job_id)
    try:
        await enqueue("download_track", job_id=job_ref, _defer_by=3)
    except DependencyUnavailableError:
        log.warning("download_enqueue_failed", job_id=job_ref)
|
||||||
|
|
||||||
|
|
||||||
async def enqueue_enrich(track_id: uuid.UUID) -> None:
|
async def enqueue_enrich(track_id: uuid.UUID) -> None:
|
||||||
"""Best-effort enqueue of metadata enrichment for a freshly stored track.
|
"""Best-effort enqueue of metadata enrichment for a freshly stored track.
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,151 @@
|
|||||||
|
"""arq task: download one queued job through a fetch source (plan §6.1).
|
||||||
|
|
||||||
|
Flow: load job → ``downloading`` → ``backend.fetch`` (progress streamed to the
|
||||||
|
job row) → ``enriching`` → store file + minimal track → ``done`` → enqueue
|
||||||
|
enrichment. yt-dlp fails often, so a failed fetch retries with exponential
|
||||||
|
backoff (``download_max_retries``); only after the last try is the job marked
|
||||||
|
``failed`` with a reason for the §A5 download manager.
|
||||||
|
|
||||||
|
Heavy I/O belongs off the request cycle (CLAUDE.md); the HTTP endpoint only
|
||||||
|
enqueues. The job row tolerates being deleted mid-flight (cancellation) — status
|
||||||
|
writes against a missing row are no-ops.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from arq import Retry
|
||||||
|
|
||||||
|
from app.application.download_service import DownloadService
|
||||||
|
from app.core.config import get_settings
|
||||||
|
from app.core.logging import correlation_id, get_logger
|
||||||
|
from app.domain.entities.download import DownloadJob
|
||||||
|
from app.domain.errors import NotFoundError, ValidationError
|
||||||
|
from app.domain.ports import FetchableSource, ProgressCallback
|
||||||
|
from app.domain.sources import DownloadResult
|
||||||
|
from app.infrastructure.db import session_scope
|
||||||
|
from app.infrastructure.db.repositories import (
|
||||||
|
SqlAlchemyArtistRepository,
|
||||||
|
SqlAlchemyDownloadJobRepository,
|
||||||
|
SqlAlchemyTrackRepository,
|
||||||
|
)
|
||||||
|
from app.infrastructure.sources.registry import build_source_registry
|
||||||
|
from app.infrastructure.storage.provider import get_file_storage
|
||||||
|
from app.workers.queue import enqueue_enrich
|
||||||
|
|
||||||
|
log = get_logger("worker.download")
|
||||||
|
|
||||||
|
# Exponential backoff between retries: 30s, 60s, 120s … capped.
|
||||||
|
_BACKOFF_BASE_SECONDS = 30
|
||||||
|
_BACKOFF_MAX_SECONDS = 600
|
||||||
|
# Only write progress when it advances by at least this much (avoid hammering
|
||||||
|
# the DB on every yt-dlp chunk).
|
||||||
|
_PROGRESS_STEP = 0.01
|
||||||
|
|
||||||
|
|
||||||
|
async def download_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]:
    """Run one download job end to end and report its outcome.

    Flow: load job → ``downloading`` → fetch → import (``enriching`` → ``done``)
    → enqueue enrichment.

    Args:
        _ctx: arq worker context (unused).
        job_id: String form of the job's UUID.

    Returns:
        A dict with ``job_id`` and ``status`` (``"missing"``, ``"failed"`` or
        ``"done"``); ``track_id`` is included when the status is ``"done"``.

    Raises:
        ValueError: If ``job_id`` is not a valid UUID string.
        arq.Retry: Raised by ``_handle_failure`` while fetch attempts remain.
    """
    correlation_id.set(f"dl:{job_id}")
    jid = uuid.UUID(job_id)
    settings = get_settings()

    job = await _load_job(jid)
    if job is None:
        log.info("download_job_missing", job_id=job_id)  # cancelled before pickup
        return {"job_id": job_id, "status": "missing"}

    # arq may execute a job more than once, so the task has to be idempotent:
    # a job that already finished and is linked to its track is reported as
    # done instead of being downloaded and imported a second time.
    if job.status == "done" and job.track_id is not None:
        log.info("download_already_done", job_id=job_id, track_id=str(job.track_id))
        return {"job_id": job_id, "status": "done", "track_id": str(job.track_id)}

    registry = build_source_registry(settings)
    try:
        backend = registry.fetchable(job.source)
    except (NotFoundError, ValidationError) as exc:
        await _mark_failed(jid, f"Source unavailable: {exc}")
        return {"job_id": job_id, "status": "failed"}
    if job.source_id is None:
        await _mark_failed(jid, "Job has no source_id to download.")
        return {"job_id": job_id, "status": "failed"}

    await _set_status(jid, "downloading")
    try:
        result = await _run_fetch(backend, job.source_id, jid)
    except Exception as exc:
        # Fetch failures are retried with backoff; _handle_failure either
        # raises arq's Retry or marks the job failed after the last attempt.
        return await _handle_failure(jid, exc, settings.download_max_retries, job_id)

    try:
        track_id = await _import_result(jid, job, result)
    except Exception as exc:
        # Import failures are not retried: the job is marked failed right away.
        log.exception("download_import_failed", job_id=job_id)
        await _mark_failed(jid, f"Import failed: {type(exc).__name__}: {exc}")
        return {"job_id": job_id, "status": "failed"}

    await enqueue_enrich(track_id)
    log.info("download_complete", job_id=job_id, track_id=str(track_id))
    return {"job_id": job_id, "status": "done", "track_id": str(track_id)}
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_fetch(
    backend: FetchableSource, source_id: str, jid: uuid.UUID
) -> DownloadResult:
    """Fetch the file, streaming progress into the job row.

    A single session is held for the download so progress writes don't churn
    connections; each throttled update is committed so API pollers see it.

    Args:
        backend: Source used to fetch the item.
        source_id: Identifier of the item within that source.
        jid: Id of the job row that receives progress updates.

    Returns:
        Whatever ``backend.fetch`` returns for the item.
    """
    async with session_scope() as session:
        repo = SqlAlchemyDownloadJobRepository(session)
        last = 0.0

        async def on_progress(frac: float) -> None:
            nonlocal last
            if frac <= last:
                return  # no advance: nothing new to record
            # Throttle intermediate updates, but always flush a value that
            # reaches completion; otherwise a final step smaller than
            # _PROGRESS_STEP is dropped and the row stays just below 100%.
            if frac < 1.0 and frac - last < _PROGRESS_STEP:
                return
            last = frac
            await repo.set_progress(jid, frac)
            await session.commit()

        cb: ProgressCallback = on_progress
        return await backend.fetch(source_id, on_progress=cb)
|
||||||
|
|
||||||
|
|
||||||
|
async def _import_result(jid: uuid.UUID, job: DownloadJob, result: DownloadResult) -> uuid.UUID:
    """Store a fetched file as a library track and mark the job ``done``.

    Within one session: the job is set to ``enriching``, the result is stored
    through :class:`DownloadService`, and the job is then set to ``done`` and
    linked to the new track.

    Returns:
        The id of the stored track.
    """
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        await jobs.set_status(jid, status="enriching")

        importer = DownloadService(
            jobs=jobs,
            tracks=SqlAlchemyTrackRepository(session),
            artists=SqlAlchemyArtistRepository(session),
            storage=get_file_storage(),
        )
        new_track_id = await importer.store_result(
            source=job.source,
            result=result,
            requested_by=job.requested_by,
        )

        await jobs.set_status(jid, status="done", track_id=new_track_id)
    return new_track_id
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_failure(
    jid: uuid.UUID, exc: Exception, max_retries: int, job_id: str
) -> dict[str, Any]:
    """Decide between retrying a failed fetch and giving up on the job.

    The job's retry counter is incremented first. While the new count is within
    ``max_retries`` an arq ``Retry`` is raised with exponential backoff
    (base ``_BACKOFF_BASE_SECONDS``, capped at ``_BACKOFF_MAX_SECONDS``).
    Otherwise the job is marked ``failed`` with the reason.

    Returns:
        The ``failed`` result dict, only once attempts are exhausted.

    Raises:
        Retry: While attempts remain; chained from ``exc``.
    """
    async with session_scope() as session:
        attempt = await SqlAlchemyDownloadJobRepository(session).increment_retry(jid)

    if attempt > max_retries:
        # Called from the caller's ``except`` block, so the active exception is
        # what log.exception records.
        log.exception("download_failed", job_id=job_id)
        reason = f"Download failed after {attempt} attempts: {type(exc).__name__}: {exc}"
        await _mark_failed(jid, reason)
        return {"job_id": job_id, "status": "failed"}

    delay = min(_BACKOFF_BASE_SECONDS * 2 ** (attempt - 1), _BACKOFF_MAX_SECONDS)
    log.warning("download_retry", job_id=job_id, attempt=attempt, defer=delay)
    raise Retry(defer=delay) from exc
|
||||||
|
|
||||||
|
|
||||||
|
async def _load_job(jid: uuid.UUID) -> DownloadJob | None:
    """Fetch the job row by id in its own session; ``None`` when it does not exist."""
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        return await jobs.get_by_id(jid)
|
||||||
|
|
||||||
|
|
||||||
|
async def _set_status(jid: uuid.UUID, status: str) -> None:
    """Write ``status`` onto the job row using a short-lived session."""
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        await jobs.set_status(jid, status=status)
|
||||||
|
|
||||||
|
|
||||||
|
async def _mark_failed(jid: uuid.UUID, error: str) -> None:
    """Set the job to ``failed`` and record ``error`` as its error message."""
    async with session_scope() as session:
        jobs = SqlAlchemyDownloadJobRepository(session)
        await jobs.set_status(jid, status="failed", error_message=error)
|
||||||
@@ -27,6 +27,9 @@ dependencies = [
|
|||||||
"httpx>=0.28",
|
"httpx>=0.28",
|
||||||
# embedded audio tag reading (enrichment tag pre-pass)
|
# embedded audio tag reading (enrichment tag pre-pass)
|
||||||
"mutagen>=1.47",
|
"mutagen>=1.47",
|
||||||
|
# youtube source: search (ytmusicapi) + download (yt-dlp)
|
||||||
|
"ytmusicapi>=1.8",
|
||||||
|
"yt-dlp>=2024.12.13",
|
||||||
# S3-compatible object storage
|
# S3-compatible object storage
|
||||||
"aioboto3>=13.0",
|
"aioboto3>=13.0",
|
||||||
# logging
|
# logging
|
||||||
@@ -95,6 +98,10 @@ ignore_missing_imports = true
|
|||||||
module = ["aioboto3.*", "aiobotocore.*", "botocore.*"]
|
module = ["aioboto3.*", "aiobotocore.*", "botocore.*"]
|
||||||
ignore_missing_imports = true
|
ignore_missing_imports = true
|
||||||
|
|
||||||
|
[[tool.mypy.overrides]]
|
||||||
|
module = ["ytmusicapi.*", "yt_dlp.*"]
|
||||||
|
ignore_missing_imports = true
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
asyncio_mode = "auto"
|
asyncio_mode = "auto"
|
||||||
testpaths = ["tests"]
|
testpaths = ["tests"]
|
||||||
|
|||||||
+1
-3
@@ -79,9 +79,7 @@ async def _create_test_db_if_missing() -> None:
|
|||||||
except Exception:
|
except Exception:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
exists = await conn.fetchval(
|
exists = await conn.fetchval("SELECT 1 FROM pg_database WHERE datname = $1", _TEST_DB_NAME)
|
||||||
"SELECT 1 FROM pg_database WHERE datname = $1", _TEST_DB_NAME
|
|
||||||
)
|
|
||||||
if not exists:
|
if not exists:
|
||||||
# CREATE DATABASE can't run inside a transaction; asyncpg's implicit
|
# CREATE DATABASE can't run inside a transaction; asyncpg's implicit
|
||||||
# autocommit on a bare connection handles that.
|
# autocommit on a bare connection handles that.
|
||||||
|
|||||||
@@ -0,0 +1,213 @@
|
|||||||
|
"""Unit tests for DownloadService — DB-free, in-memory fakes."""
|
||||||
|
|
||||||
|
import datetime as dt
|
||||||
|
import uuid
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from app.application.download_service import DownloadService
|
||||||
|
from app.domain.entities import Artist, Track
|
||||||
|
from app.domain.entities.download import DownloadJob
|
||||||
|
from app.domain.sources import DownloadResult
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.asyncio
|
||||||
|
|
||||||
|
|
||||||
|
class FakeArtistRepo:
    """Artist repository double: every call yields a brand-new ``Artist``."""

    async def get_or_create(self, name: str) -> Artist:
        stamp = dt.datetime.now(dt.UTC)
        return Artist(id=uuid.uuid4(), name=name, created_at=stamp, updated_at=stamp)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeTrackRepo:
    """In-memory track repository double.

    ``by_source`` indexes tracks by ``(source, source_id)``; ``added`` records
    every track created through :meth:`add`, in call order.
    """

    def __init__(self) -> None:
        self.by_source: dict[tuple[str, str], Track] = {}
        self.added: list[Track] = []

    async def get_by_source(self, source: str, source_id: str) -> Track | None:
        key = (source, source_id)
        return self.by_source.get(key)

    async def add(self, **kw: object) -> Track:
        """Build a ``Track`` from the keyword arguments and remember it."""
        stamp = dt.datetime.now(dt.UTC)
        track = Track(
            id=kw["id"],  # type: ignore[arg-type]
            title=str(kw["title"]),
            artist_id=kw["artist_id"],  # type: ignore[arg-type]
            album_id=None,
            storage_uri=str(kw["storage_uri"]),
            file_format=str(kw["file_format"]),
            file_size=int(kw["file_size"]),  # type: ignore[call-overload]
            source=str(kw["source"]),
            source_id=str(kw["source_id"]),
            duration_seconds=None,
            genre=None,
            year=None,
            track_number=None,
            metadata_status=str(kw["metadata_status"]),
            metadata_error=None,
            enriched_at=None,
            created_at=stamp,
            updated_at=stamp,
        )
        index_key = (track.source, track.source_id)
        self.by_source[index_key] = track
        self.added.append(track)
        return track
|
||||||
|
|
||||||
|
|
||||||
|
class FakeStorage:
    """Storage double that records calls instead of touching any backend."""

    def __init__(self) -> None:
        self.saved: dict[str, Path] = {}
        self.deleted: list[str] = []

    async def save_file(self, key: str, src_path: Path) -> int:
        """Remember which source path was saved under ``key``; always reports 1."""
        self.saved.update({key: src_path})
        return 1

    async def delete(self, key: str) -> None:
        """Record the deleted key."""
        self.deleted += [key]
|
||||||
|
|
||||||
|
|
||||||
|
class FakeJobRepo:
    """In-memory download-job repository double.

    ``jobs`` holds jobs created through :meth:`add`; ``active`` is filled by
    tests to simulate an in-flight job for a ``(source, source_id)`` pair.
    """

    def __init__(self) -> None:
        self.jobs: dict[uuid.UUID, DownloadJob] = {}
        self.active: dict[tuple[str, str], DownloadJob] = {}

    def _make(self, **kw: object) -> DownloadJob:
        """Build a fresh ``queued`` job from the keyword arguments."""
        stamp = dt.datetime.now(dt.UTC)
        return DownloadJob(
            id=uuid.uuid4(),
            source=str(kw["source"]),
            source_id=kw.get("source_id"),  # type: ignore[arg-type]
            query=kw.get("query"),  # type: ignore[arg-type]
            requested_by=kw.get("requested_by"),  # type: ignore[arg-type]
            status="queued",
            progress=0.0,
            error_message=None,
            retry_count=0,
            track_id=None,
            created_at=stamp,
            updated_at=stamp,
        )

    async def add(self, **kw: object) -> DownloadJob:
        created = self._make(**kw)
        self.jobs[created.id] = created
        return created

    async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
        return self.jobs.get(job_id)

    async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
        key = (source, source_id)
        return self.active.get(key)

    # Status writes are accepted and ignored by this double.
    async def set_status(self, job_id: uuid.UUID, **kw: object) -> None: ...

    async def delete(self, job_id: uuid.UUID) -> None:
        self.jobs.pop(job_id, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _service(
    *, jobs: FakeJobRepo, tracks: FakeTrackRepo, storage: FakeStorage, enqueued: list[uuid.UUID]
) -> DownloadService:
    """Wire a ``DownloadService`` to the fakes.

    The injected ``enqueue_download`` appends each job id to ``enqueued`` so
    tests can assert on what would have been queued.
    """

    async def enqueue_download(job_id: uuid.UUID) -> None:
        enqueued.append(job_id)

    artists = FakeArtistRepo()
    return DownloadService(
        jobs=jobs,  # type: ignore[arg-type]
        tracks=tracks,  # type: ignore[arg-type]
        artists=artists,  # type: ignore[arg-type]
        storage=storage,  # type: ignore[arg-type]
        enqueue_download=enqueue_download,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def _track(source: str, source_id: str) -> Track:
    """Build a minimal ``pending`` track for the given source identity."""
    stamp = dt.datetime.now(dt.UTC)
    return Track(
        id=uuid.uuid4(),
        title="t",
        artist_id=uuid.uuid4(),
        album_id=None,
        storage_uri="k",
        file_format="mp3",
        file_size=1,
        source=source,
        source_id=source_id,
        duration_seconds=None,
        genre=None,
        year=None,
        track_number=None,
        metadata_status="pending",
        metadata_error=None,
        enriched_at=None,
        created_at=stamp,
        updated_at=stamp,
    )
|
||||||
|
|
||||||
|
|
||||||
|
async def test_request_dedups_against_library() -> None:
    """A request for an item already in the library creates and enqueues no job."""
    jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
    tracks.by_source[("youtube", "abc")] = _track("youtube", "abc")
    svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)

    outcome = await svc.request(source="youtube", source_id="abc", query=None, requested_by=None)

    assert outcome.already_in_library is True
    assert outcome.track_id is not None
    assert outcome.job is None
    assert enq == []  # nothing enqueued
|
||||||
|
|
||||||
|
|
||||||
|
async def test_request_returns_existing_active_job() -> None:
    """A duplicate request gets the in-flight job back and is not re-enqueued."""
    jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
    in_flight = await jobs.add(source="youtube", source_id="abc", query=None, requested_by=None)
    jobs.active[("youtube", "abc")] = in_flight
    svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)

    outcome = await svc.request(source="youtube", source_id="abc", query=None, requested_by=None)

    assert outcome.already_in_library is False
    assert outcome.job is not None
    assert outcome.job.id == in_flight.id
    assert enq == []  # not re-enqueued
|
||||||
|
|
||||||
|
|
||||||
|
async def test_request_creates_and_enqueues_new_job() -> None:
    """A request for an unknown item creates a job and enqueues exactly that job."""
    jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
    svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)

    outcome = await svc.request(
        source="youtube",
        source_id="abc",
        query="bohemian",
        requested_by=None,
    )

    assert outcome.already_in_library is False
    assert outcome.job is not None
    assert enq == [outcome.job.id]
|
||||||
|
|
||||||
|
|
||||||
|
async def test_store_result_imports_and_cleans_temp(tmp_path: Path) -> None:
    """Storing a result adds one pending track, saves one file and removes the temp file."""
    jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
    svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)

    temp_audio = tmp_path / "abc.webm"
    temp_audio.write_bytes(b"audio" * 20)
    download = DownloadResult(
        source_id="abc",
        path=temp_audio,
        file_format="m4a",
        file_size=100,
        bitrate=160,
        suggested_title="Bohemian Rhapsody",
    )

    track_id = await svc.store_result(source="youtube", result=download, requested_by=None)

    assert len(tracks.added) == 1
    (stored,) = tracks.added
    assert stored.id == track_id
    assert stored.source == "youtube"
    assert stored.source_id == "abc"
    assert stored.metadata_status == "pending"
    assert stored.title == "Bohemian Rhapsody"
    assert len(storage.saved) == 1
    assert not temp_audio.exists()  # temp file removed
|
||||||
@@ -0,0 +1,252 @@
|
|||||||
|
"""Integration tests for downloads + external search.
|
||||||
|
|
||||||
|
Requires a reachable Postgres; skips otherwise. The download worker task is
|
||||||
|
invoked directly (no Redis needed) against a fake fetch source, so the full
|
||||||
|
DB + storage import path is covered without touching the network.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
from collections.abc import AsyncIterator
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from app.core.config import get_settings
|
||||||
|
from app.domain.sources import KIND_FETCH, DownloadResult, SearchResult, SourceInfo
|
||||||
|
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
|
||||||
|
from app.infrastructure.db.repositories import (
|
||||||
|
SqlAlchemyRefreshTokenRepository,
|
||||||
|
SqlAlchemyUserRepository,
|
||||||
|
)
|
||||||
|
from app.infrastructure.sources.registry import SourceRegistry
|
||||||
|
from asgi_lifespan import LifespanManager
|
||||||
|
from httpx import ASGITransport, AsyncClient
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.asyncio
|
||||||
|
|
||||||
|
_db_reachable_cache: bool | None = None
|
||||||
|
|
||||||
|
|
||||||
|
async def _db_reachable() -> bool:
    """Report whether Postgres answers ``SELECT 1`` within three seconds.

    The answer is computed once and cached in ``_db_reachable_cache``; any
    exception during the probe (including the timeout) counts as unreachable.
    """
    global _db_reachable_cache
    if _db_reachable_cache is None:
        from sqlalchemy import text

        try:
            async with asyncio.timeout(3):
                async with get_engine().connect() as conn:
                    await conn.execute(text("SELECT 1"))
        except Exception:
            _db_reachable_cache = False
        else:
            _db_reachable_cache = True
    return _db_reachable_cache
|
||||||
|
|
||||||
|
|
||||||
|
class FakeFetchSource:
    """A searchable + fetchable source that writes a local file (no network)."""

    name = "youtube"

    def __init__(self, tmp_dir: Path) -> None:
        self._tmp_dir = tmp_dir

    def info(self) -> SourceInfo:
        return SourceInfo(name=self.name, label="YouTube Music", kind=KIND_FETCH, available=True)

    def is_available(self) -> bool:
        return True

    async def search(self, query: str, *, limit: int) -> list[SearchResult]:
        """Return a single canned hit whose title echoes ``query``."""
        hit = SearchResult(
            source=self.name,
            source_id="vid-1",
            title=f"{query} song",
            artist="Some Artist",
            album="Some Album",
            duration_seconds=200,
            thumbnail_url="http://img/large.jpg",
        )
        return [hit]

    async def fetch(self, source_id: str, *, on_progress: Any = None) -> DownloadResult:
        """Write a small fake audio file, report 50% progress once, describe the file."""
        target = self._tmp_dir / f"{source_id}.m4a"
        target.write_bytes(b"downloaded audio bytes" * 8)
        if on_progress is not None:
            await on_progress(0.5)
        return DownloadResult(
            source_id=source_id,
            path=target,
            file_format="webm",
            file_size=target.stat().st_size,
            bitrate=160,
            suggested_title=f"Title for {source_id}",
        )

    async def get_metadata(self, source_id: str) -> None:
        return None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
    """Yield an HTTP client bound to a fresh app, schema and media directory.

    Skips when Postgres is unreachable. Setup points ``MEDIA_PATH`` at a temp
    directory, resets the cached settings and storage provider, recreates the
    schema, creates the ``admin`` superuser and overrides the source registry
    with a fake fetch source so search/download never hit the network.
    Teardown drops the schema, disposes the engine and restores the
    environment, settings cache and storage provider.
    """
    if not await _db_reachable():
        pytest.skip("Postgres not reachable — integration test skipped.")

    media_dir = tmp_path / "media"
    media_dir.mkdir()
    os.environ["MEDIA_PATH"] = str(media_dir)
    get_settings.cache_clear()

    import app.infrastructure.storage.provider as _storage_provider

    _storage_provider._storage = None

    try:
        async with get_engine().begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        from app.application.user_service import UserService
        from app.core.security import Argon2PasswordHasher

        async with session_scope() as session:
            user_service = UserService(
                users=SqlAlchemyUserRepository(session),
                refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
                hasher=Argon2PasswordHasher(),
            )
            await user_service.create_user(
                username="admin", password="adminpass1", is_superuser=True
            )

        from app.api.deps import get_source_registry
        from app.main import create_app

        application = create_app()
        # Inject a fake fetch source so search/download never hit the network.
        download_dir = tmp_path / "dl"
        fake_registry = SourceRegistry([FakeFetchSource(download_dir)])  # type: ignore[list-item]
        download_dir.mkdir()
        application.dependency_overrides[get_source_registry] = lambda: fake_registry

        async with LifespanManager(application):
            transport = ASGITransport(app=application)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                yield client

        async with get_engine().begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
        await dispose_engine()
    finally:
        _storage_provider._storage = None
        os.environ.pop("MEDIA_PATH", None)
        get_settings.cache_clear()
|
||||||
|
|
||||||
|
|
||||||
|
async def _login(api: AsyncClient) -> str:
    """Log in as the seeded ``admin`` user and return the access token."""
    credentials = {"username": "admin", "password": "adminpass1"}
    resp = await api.post("/api/v1/auth/login", json=credentials)
    assert resp.status_code == 200
    return str(resp.json()["access_token"])
|
||||||
|
|
||||||
|
|
||||||
|
async def test_search_aggregates_fetch_sources(api: AsyncClient) -> None:
    """The aggregate search endpoint returns the fake source's single hit."""
    headers = {"Authorization": f"Bearer {await _login(api)}"}

    resp = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers)
    assert resp.status_code == 200

    payload = resp.json()
    assert payload["searched_sources"] == ["youtube"]
    assert len(payload["results"]) == 1

    first = payload["results"][0]
    assert first["source"] == "youtube"
    assert first["source_id"] == "vid-1"
    assert first["title"] == "queen song"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_source_scoped_search(api: AsyncClient) -> None:
    """Searching a single named source returns that source's results."""
    headers = {"Authorization": f"Bearer {await _login(api)}"}

    resp = await api.get(
        "/api/v1/sources/youtube/search",
        params={"q": "abba"},
        headers=headers,
    )

    assert resp.status_code == 200
    first_hit = resp.json()["results"][0]
    assert first_hit["title"] == "abba song"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_download_create_list_and_complete(
    api: AsyncClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Walk a download from request through worker run to library dedup."""
    headers = {"Authorization": f"Bearer {await _login(api)}"}
    downloads_url = "/api/v1/downloads"
    same_item = {"source": "youtube", "source_id": "vid-1"}

    # Request a download — Redis is absent, so enqueue degrades but the job persists.
    create = await api.post(
        downloads_url,
        json={"source": "youtube", "source_id": "vid-1", "query": "queen"},
        headers=headers,
    )
    assert create.status_code == 202
    created = create.json()
    assert created["already_in_library"] is False
    job_id = created["job"]["id"]
    assert created["job"]["status"] == "queued"

    # It shows up in the listing.
    listing = await api.get(downloads_url, headers=headers)
    assert listing.status_code == 200
    assert any(item["id"] == job_id for item in listing.json()["items"])

    # A duplicate request returns the same in-flight job, not a new one.
    dup = await api.post(downloads_url, json=same_item, headers=headers)
    assert dup.json()["job"]["id"] == job_id

    # Run the worker task directly (bypasses Redis) with the fake fetch source.
    import app.workers.tasks.download_task as dl_task

    worker_dl = tmp_path / "worker-dl"
    worker_dl.mkdir()
    fake = SourceRegistry([FakeFetchSource(worker_dl)])  # type: ignore[list-item]
    monkeypatch.setattr(dl_task, "build_source_registry", lambda _settings: fake)

    result = await dl_task.download_track({}, job_id=job_id)
    assert result["status"] == "done"
    track_id = result["track_id"]

    # The job is now done and linked to the imported track.
    got = await api.get(f"/api/v1/downloads/{job_id}", headers=headers)
    assert got.json()["status"] == "done"
    assert got.json()["track_id"] == track_id

    # The imported track streams back.
    stream = await api.get(f"/api/v1/stream/{track_id}", headers=headers)
    assert stream.status_code == 200
    assert len(stream.content) > 0

    # A new request for the same item now dedups against the library.
    again = await api.post(downloads_url, json=same_item, headers=headers)
    assert again.json()["already_in_library"] is True
    assert again.json()["track_id"] == track_id
|
||||||
|
|
||||||
|
|
||||||
|
async def test_cancel_download(api: AsyncClient) -> None:
    # Queue a download, cancel it with DELETE, then confirm the job can no
    # longer be fetched.
    bearer = await _login(api)
    auth = {"Authorization": f"Bearer {bearer}"}
    request_body = {"source": "youtube", "source_id": "vid-cancel"}

    created = await api.post("/api/v1/downloads", json=request_body, headers=auth)
    job_url = f"/api/v1/downloads/{created.json()['job']['id']}"

    # Cancelling answers 204 ...
    deleted = await api.delete(job_url, headers=auth)
    assert deleted.status_code == 204

    # ... and a follow-up lookup of the same job answers 404.
    lookup = await api.get(job_url, headers=auth)
    assert lookup.status_code == 404
|
||||||
@@ -48,12 +48,17 @@ def test_info_reports_kind_and_availability(tmp_path: Path) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def test_registry_registers_local_when_path_set(tmp_path: Path) -> None:
    # Disable youtube to isolate the local-source registration under test.
    settings = _settings(local_media_import_path=tmp_path, youtube_enabled=False)
    registry = build_source_registry(settings)

    registered = {info.name for info in registry.infos()}
    assert registered == {"local"}
    assert registry.indexable("local").is_available() is True
|
||||||
|
|
||||||
|
|
||||||
def test_registry_empty_when_path_unset() -> None:
    # No local import path and youtube switched off: nothing is registered.
    settings = _settings(local_media_import_path=None, youtube_enabled=False)
    registry = build_source_registry(settings)
    assert registry.infos() == []
|
||||||
|
|||||||
@@ -107,9 +107,7 @@ async def test_track_out_includes_genre_year_track_number(api: AsyncClient) -> N
|
|||||||
token = await _login(api)
|
token = await _login(api)
|
||||||
track_id = await _upload(api, token)
|
track_id = await _upload(api, token)
|
||||||
|
|
||||||
resp = await api.get(
|
resp = await api.get(f"/api/v1/tracks/{track_id}", headers={"Authorization": f"Bearer {token}"})
|
||||||
f"/api/v1/tracks/{track_id}", headers={"Authorization": f"Bearer {token}"}
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200, resp.text
|
assert resp.status_code == 200, resp.text
|
||||||
body = resp.json()
|
body = resp.json()
|
||||||
assert "genre" in body
|
assert "genre" in body
|
||||||
|
|||||||
@@ -190,3 +190,25 @@ async def test_upload_requires_auth(api: AsyncClient) -> None:
|
|||||||
files={"file": ("x.mp3", b"data", "audio/mpeg")},
|
files={"file": ("x.mp3", b"data", "audio/mpeg")},
|
||||||
)
|
)
|
||||||
assert resp.status_code == 401
|
assert resp.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
async def test_list_tracks_filters_by_source(api: AsyncClient) -> None:
    # Uploaded tracks carry source="upload"; `?source=` narrows the list (this
    # powers the webui's "Recently uploaded" view, which survives a refresh).
    token = await _login(api)
    headers = {"Authorization": f"Bearer {token}"}
    upload = await api.post(
        "/api/v1/upload",
        files={"file": ("uploaded.mp3", b"upload bytes" * 80, "audio/mpeg")},
        headers=headers,
    )
    # Fail right here, with the response body, if the seed upload was rejected;
    # otherwise the problem only shows up below as a confusing empty listing.
    assert upload.status_code < 400, upload.text

    # Filtering on the matching source returns exactly the uploaded track.
    hit = await api.get("/api/v1/tracks", params={"source": "upload"}, headers=headers)
    assert hit.status_code == 200, hit.text
    items = hit.json()["items"]
    assert len(items) == 1
    assert items[0]["source"] == "upload"

    # Filtering on a source with no tracks returns an empty page, not an error.
    miss = await api.get("/api/v1/tracks", params={"source": "youtube"}, headers=headers)
    assert miss.status_code == 200
    assert miss.json()["items"] == []
|
||||||
|
|||||||
@@ -0,0 +1,135 @@
|
|||||||
|
"""Unit tests for YouTubeMusicSource + registry (no network, injected libs)."""
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from app.core.config import Settings
|
||||||
|
from app.domain.sources import KIND_FETCH
|
||||||
|
from app.infrastructure.sources.registry import build_source_registry
|
||||||
|
from app.infrastructure.sources.youtube import YouTubeMusicSource
|
||||||
|
|
||||||
|
# Apply the `asyncio` marker to every test in this module so the async test
# functions below are collected and run as coroutines.
pytestmark = pytest.mark.asyncio
|
||||||
|
|
||||||
|
|
||||||
|
def _song_row(**overrides: Any) -> dict[str, Any]:
|
||||||
|
row: dict[str, Any] = {
|
||||||
|
"videoId": "abc123",
|
||||||
|
"title": "Bohemian Rhapsody",
|
||||||
|
"artists": [{"name": "Queen", "id": "a1"}],
|
||||||
|
"album": {"name": "A Night at the Opera", "id": "al1"},
|
||||||
|
"duration_seconds": 354,
|
||||||
|
"thumbnails": [
|
||||||
|
{"url": "http://img/small.jpg", "width": 60, "height": 60},
|
||||||
|
{"url": "http://img/large.jpg", "width": 240, "height": 240},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
row.update(overrides)
|
||||||
|
return row
|
||||||
|
|
||||||
|
|
||||||
|
def _settings(**overrides: object) -> Settings:
    """Construct a ``Settings`` object from the given keyword overrides."""
    configured = Settings(**overrides)  # type: ignore[arg-type]
    return configured
|
||||||
|
|
||||||
|
|
||||||
|
async def test_search_maps_ytmusic_rows() -> None:
    # A single default song row goes in; each mapped field of the one search
    # result is checked.
    def _one_song(q: str, limit: int) -> list[dict[str, Any]]:
        return [_song_row()]

    source = YouTubeMusicSource(search_fn=_one_song)
    (result,) = await source.search("queen", limit=10)

    assert result.source == "youtube"
    assert result.source_id == "abc123"
    assert result.title == "Bohemian Rhapsody"
    assert result.artist == "Queen"
    assert result.album == "A Night at the Opera"
    assert result.duration_seconds == 354
    # The last thumbnail in the row (the largest one) is the one reported.
    assert result.thumbnail_url == "http://img/large.jpg"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_search_joins_multiple_artists_and_tolerates_missing_fields() -> None:
    # Two artists, and no album / thumbnails / duration in the input row.
    sparse_row = _song_row(
        artists=[{"name": "Queen"}, {"name": "David Bowie"}],
        album=None,
        thumbnails=[],
        duration_seconds=None,
    )

    def _sparse_search(q: str, limit: int) -> list[dict[str, Any]]:
        return [sparse_row]

    source = YouTubeMusicSource(search_fn=_sparse_search)
    (result,) = await source.search("under pressure", limit=10)

    # Artist names are joined with ", "; the absent fields come back as None.
    assert result.artist == "Queen, David Bowie"
    assert result.album is None
    assert result.thumbnail_url is None
    assert result.duration_seconds is None
|
||||||
|
|
||||||
|
|
||||||
|
async def test_search_drops_rows_without_video_id() -> None:
    # The middle row has videoId=None and must not appear in the results;
    # the other two keep their input order.
    rows = [
        _song_row(),
        _song_row(videoId=None),
        _song_row(videoId="xyz"),
    ]

    def _fixed_rows(q: str, limit: int) -> list[dict[str, Any]]:
        return rows

    source = YouTubeMusicSource(search_fn=_fixed_rows)
    found = await source.search("q", limit=10)

    source_ids = [item.source_id for item in found]
    assert source_ids == ["abc123", "xyz"]
|
||||||
|
|
||||||
|
|
||||||
|
async def test_search_empty_query_short_circuits() -> None:
    # Record every query that reaches the injected search function; a
    # whitespace-only query must return [] without ever reaching it.
    seen_queries: list[str] = []

    def _search(q: str, limit: int) -> list[dict[str, Any]]:
        seen_queries.append(q)
        return []

    source = YouTubeMusicSource(search_fn=_search)
    outcome = await source.search(" ", limit=10)

    assert outcome == []
    assert not seen_queries
|
||||||
|
|
||||||
|
|
||||||
|
async def test_search_degrades_to_empty_on_error() -> None:
    # The injected search function always raises; search() is expected to
    # return an empty list rather than propagate the error.
    def _failing_search(q: str, limit: int) -> list[dict[str, Any]]:
        raise RuntimeError("service down")

    source = YouTubeMusicSource(search_fn=_failing_search)
    outcome = await source.search("q", limit=10)
    assert outcome == []
|
||||||
|
|
||||||
|
|
||||||
|
async def test_fetch_maps_download_result(tmp_path: Path) -> None:
    # Put a real file on disk so the reported file size can be checked
    # against the bytes actually written.
    payload = b"opus-bytes" * 10
    audio = tmp_path / "abc123.m4a"
    audio.write_bytes(payload)

    def _download(video_id: str, tmp_dir: Path, hook: Any, cookies: Path | None) -> dict[str, Any]:
        # Stand-in downloader: ignores its arguments and reports the file above.
        return {
            "filepath": audio,
            "file_format": "m4a",
            "bitrate": 160,
            "title": "Bohemian Rhapsody",
        }

    source = YouTubeMusicSource(download_fn=_download)
    fetched = await source.fetch("abc123")

    assert fetched.source_id == "abc123"
    assert fetched.path == audio
    assert fetched.file_format == "m4a"
    assert fetched.file_size == len(payload)
    assert fetched.bitrate == 160
    assert fetched.suggested_title == "Bohemian Rhapsody"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_info_and_availability_with_injected_fn() -> None:
    # A source built with an injected search function reports itself as the
    # "youtube" fetch-kind source and as available.
    def _no_results(q: str, limit: int) -> list[dict[str, Any]]:
        return []

    described = YouTubeMusicSource(search_fn=_no_results).info()

    assert described.name == "youtube"
    assert described.kind == KIND_FETCH
    assert described.available is True  # injected fn → treated as available
|
||||||
|
|
||||||
|
|
||||||
|
async def test_registry_registers_youtube_when_enabled() -> None:
    # With youtube enabled, the registry lists it and resolves it through
    # both the searchable and the fetchable lookups.
    settings = _settings(youtube_enabled=True)
    registry = build_source_registry(settings)

    registered = {info.name for info in registry.infos()}
    assert "youtube" in registered

    # youtube is searchable + fetchable, not indexable
    for resolved in (registry.searchable("youtube"), registry.fetchable("youtube")):
        assert resolved.name == "youtube"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_registry_omits_youtube_when_disabled() -> None:
    # With youtube disabled, no registered source is named "youtube".
    settings = _settings(youtube_enabled=False)
    registry = build_source_registry(settings)

    registered = {info.name for info in registry.infos()}
    assert "youtube" not in registered
|
||||||
Reference in New Issue
Block a user