Compare commits

...

4 Commits

Author SHA1 Message Date
Senko-san e45e578f54 feat(library): remote browse status + save/materialize API (§Phase2-3)
Docker Build & Publish / build (push) Successful in 1m11s
Docker Build & Publish / push (push) Failing after 6s
Docker Build & Publish / Prune old image versions (push) Has been skipped
Search results now report whether a hit is already saved (in_library,
track_id, availability). New RemoteLibraryService backs POST
/tracks/remote (idempotent placeholder save) and POST
/tracks/{id}/materialize (on-demand fetch via a new materialize_track
arq task, reusing in-flight jobs).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 18:11:01 +03:00
Senko-san 58b98ab5ed feat(library): lazy materialization foundation for remote tracks (§Phase1)
Docker Build & Publish / build (push) Successful in 1m10s
Docker Build & Publish / push (push) Failing after 7s
Docker Build & Publish / Prune old image versions (push) Has been skipped
Adds nullable storage fields + availability column on tracks, remote
source/source_id identity on albums/artists, TrackRepository.materialize()
and get_or_create_remote() repos — groundwork for on-demand YTM library
(placeholders saved without audio, materialized in-place on first play).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 17:51:43 +03:00
Senko-san 78007461e1 feat(sources): YouTube Music search + download pipeline (§1C/§1E)
Docker Build & Publish / build (push) Successful in 2m39s
Docker Build & Publish / push (push) Failing after 36s
Docker Build & Publish / Prune old image versions (push) Has been skipped
Pluggable fetch source: ytmusicapi search + yt-dlp download (cookies-file guard), DownloadJob entity/repo + DownloadService, download_task worker with exponential-backoff retries, and wired /search, /sources/{source}/search, and /downloads endpoints. Adds youtube_enabled/cookies config, yt-dlp+ytmusicapi deps, and the download_jobs.track_id migration. Snapshot also bundles in-progress storage/tracks/acoustid edits.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 14:04:33 +03:00
Senko-san ea880edd57 feat(tracks): filter track list by ingest source
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
Add an optional `source` filter to `GET /api/v1/tracks` (and the
`TrackRepository.list`/`count` port + SQLAlchemy adapter). Lets clients
query, e.g., only uploaded tracks (`?source=upload`) newest-first — the
backing for the webui's persistent "Recently uploaded" view.

- test: upload then list with `?source=upload` (hit) / `?source=youtube`
  (miss)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 01:35:51 +03:00
57 changed files with 3882 additions and 849 deletions
@@ -0,0 +1,47 @@
"""download_jobs: link finished job to its imported track
Revision ID: 20260614_dl_track_id
Revises: 20260613_enrich_outcome
Create Date: 2026-06-14 10:00:00.000000
Adds ``download_jobs.track_id`` (nullable FK → ``tracks.id``) so a completed
download can point at the library track it produced — the §A5 download manager
links a "done" job to the track, and re-runs can tell a job already imported
(plan §6.1).
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "20260614_dl_track_id"
down_revision: str | None = "20260613_enrich_outcome"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.add_column(
"download_jobs",
sa.Column("track_id", sa.Uuid(), nullable=True),
)
op.create_foreign_key(
op.f("fk_download_jobs_track_id_tracks"),
"download_jobs",
"tracks",
["track_id"],
["id"],
ondelete="SET NULL",
)
def downgrade() -> None:
op.drop_constraint(
op.f("fk_download_jobs_track_id_tracks"),
"download_jobs",
type_="foreignkey",
)
op.drop_column("download_jobs", "track_id")
@@ -0,0 +1,65 @@
"""remote placeholders: track availability, album/artist remote ids
Revision ID: dc126696f5a6
Revises: 20260614_dl_track_id
Create Date: 2026-06-14 11:25:30.643588
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = 'dc126696f5a6'
down_revision: str | None = '20260614_dl_track_id'
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('albums', sa.Column('source', sa.String(length=32), nullable=True))
op.add_column('albums', sa.Column('source_id', sa.String(length=512), nullable=True))
op.create_unique_constraint('uq_albums_source_source_id', 'albums', ['source', 'source_id'])
op.add_column('artists', sa.Column('source', sa.String(length=32), nullable=True))
op.add_column('artists', sa.Column('source_id', sa.String(length=512), nullable=True))
op.create_unique_constraint('uq_artists_source_source_id', 'artists', ['source', 'source_id'])
op.add_column(
'tracks',
sa.Column('availability', sa.String(length=16), nullable=False, server_default='local'),
)
op.alter_column('tracks', 'availability', server_default=None)
op.alter_column('tracks', 'storage_uri',
existing_type=sa.VARCHAR(length=2048),
nullable=True)
op.alter_column('tracks', 'file_format',
existing_type=sa.VARCHAR(length=32),
nullable=True)
op.alter_column('tracks', 'file_size',
existing_type=sa.INTEGER(),
nullable=True)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('tracks', 'file_size',
existing_type=sa.INTEGER(),
nullable=False)
op.alter_column('tracks', 'file_format',
existing_type=sa.VARCHAR(length=32),
nullable=False)
op.alter_column('tracks', 'storage_uri',
existing_type=sa.VARCHAR(length=2048),
nullable=False)
op.drop_column('tracks', 'availability')
op.drop_constraint('uq_artists_source_source_id', 'artists', type_='unique')
op.drop_column('artists', 'source_id')
op.drop_column('artists', 'source')
op.drop_constraint('uq_albums_source_source_id', 'albums', type_='unique')
op.drop_column('albums', 'source_id')
op.drop_column('albums', 'source')
# ### end Alembic commands ###
+27 -4
View File
@@ -15,7 +15,9 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.application.auth_service import AuthService from app.application.auth_service import AuthService
from app.application.download_service import DownloadService
from app.application.metadata_service import MetadataEnrichmentService from app.application.metadata_service import MetadataEnrichmentService
from app.application.remote_library_service import RemoteLibraryService
from app.application.streaming_service import StreamingService from app.application.streaming_service import StreamingService
from app.application.subsonic_auth_service import SubsonicAuthService from app.application.subsonic_auth_service import SubsonicAuthService
from app.application.upload_service import UploadService from app.application.upload_service import UploadService
@@ -29,6 +31,7 @@ from app.infrastructure.db import get_sessionmaker
from app.infrastructure.db.repositories import ( from app.infrastructure.db.repositories import (
SqlAlchemyAlbumRepository, SqlAlchemyAlbumRepository,
SqlAlchemyArtistRepository, SqlAlchemyArtistRepository,
SqlAlchemyDownloadJobRepository,
SqlAlchemyHistoryRepository, SqlAlchemyHistoryRepository,
SqlAlchemyLikeRepository, SqlAlchemyLikeRepository,
SqlAlchemyPlaylistRepository, SqlAlchemyPlaylistRepository,
@@ -41,7 +44,7 @@ from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
from app.infrastructure.metadata.tags import MutagenTagReader from app.infrastructure.metadata.tags import MutagenTagReader
from app.infrastructure.sources.registry import SourceRegistry, build_source_registry from app.infrastructure.sources.registry import SourceRegistry, build_source_registry
from app.infrastructure.storage.provider import get_file_storage from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich from app.workers.queue import enqueue_download, enqueue_enrich, enqueue_materialize
async def get_session() -> AsyncIterator[AsyncSession]: async def get_session() -> AsyncIterator[AsyncSession]:
@@ -136,9 +139,7 @@ def get_streaming_service(session: SessionDep, storage: FileStorageDep) -> Strea
) )
def get_metadata_service( def get_metadata_service(session: SessionDep, storage: FileStorageDep) -> MetadataEnrichmentService:
session: SessionDep, storage: FileStorageDep
) -> MetadataEnrichmentService:
"""Wires the §6.2 fingerprint/AcoustID adapters for read-only, inline use """Wires the §6.2 fingerprint/AcoustID adapters for read-only, inline use
(the metadata editor's "find matches" — §A7). The full pipeline (incl. (the metadata editor's "find matches" — §A7). The full pipeline (incl.
cover art) stays in the worker (`tasks/enrich_task.py`).""" cover art) stays in the worker (`tasks/enrich_task.py`)."""
@@ -161,9 +162,31 @@ def get_metadata_service(
) )
def get_download_service(session: SessionDep, storage: FileStorageDep) -> DownloadService:
return DownloadService(
jobs=SqlAlchemyDownloadJobRepository(session),
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
storage=storage,
enqueue_download=enqueue_download,
enqueue_enrich=enqueue_enrich,
)
def get_remote_library_service(session: SessionDep) -> RemoteLibraryService:
return RemoteLibraryService(
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
jobs=SqlAlchemyDownloadJobRepository(session),
enqueue_materialize=enqueue_materialize,
)
UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)] UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)]
StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)] StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)]
MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)] MetadataServiceDep = Annotated[MetadataEnrichmentService, Depends(get_metadata_service)]
DownloadServiceDep = Annotated[DownloadService, Depends(get_download_service)]
RemoteLibraryServiceDep = Annotated[RemoteLibraryService, Depends(get_remote_library_service)]
# -- library repository deps --------------------------------------------------- # -- library repository deps ---------------------------------------------------
+1 -1
View File
@@ -65,7 +65,7 @@ async def download(
if track is None: if track is None:
raise NotFoundError("Song not found.") raise NotFoundError("Song not found.")
result = await service.open_stream(track_id, None) result = await service.open_stream(track_id, None)
filename = f"{track.title}.{track.file_format}" filename = f"{track.title}.{track.file_format or 'bin'}"
headers = { headers = {
"Content-Length": str(result.content_length), "Content-Length": str(result.content_length),
"Content-Disposition": f'attachment; filename="{filename}"', "Content-Disposition": f'attachment; filename="{filename}"',
+2 -2
View File
@@ -80,8 +80,8 @@ def song_dict(
"albumId": encode_album(track.album_id) if track.album_id is not None else None, "albumId": encode_album(track.album_id) if track.album_id is not None else None,
"artistId": encode_artist(track.artist_id), "artistId": encode_artist(track.artist_id),
"coverArt": cover, "coverArt": cover,
"size": track.file_size, "size": track.file_size or 0,
"contentType": content_type_for(track.file_format), "contentType": content_type_for(track.file_format or ""),
"suffix": track.file_format, "suffix": track.file_format,
"duration": track.duration_seconds, "duration": track.duration_seconds,
"year": track.year, "year": track.year,
+59
View File
@@ -0,0 +1,59 @@
"""Schemas for the download job endpoints (§A5 download manager)."""
import datetime as dt
import uuid
from pydantic import BaseModel, Field
from app.domain.entities.download import DownloadJob
class DownloadCreate(BaseModel):
"""Request to download an item discovered on a fetch source."""
source: str
source_id: str = Field(min_length=1)
# Optional free-text the result came from — stored for display only.
query: str | None = None
class DownloadJobOut(BaseModel):
id: uuid.UUID
source: str
source_id: str | None
query: str | None
status: str
progress: float
error_message: str | None
retry_count: int
track_id: uuid.UUID | None
created_at: dt.datetime
updated_at: dt.datetime
@classmethod
def from_entity(cls, job: DownloadJob) -> DownloadJobOut:
return cls(
id=job.id,
source=job.source,
source_id=job.source_id,
query=job.query,
status=job.status,
progress=job.progress,
error_message=job.error_message,
retry_count=job.retry_count,
track_id=job.track_id,
created_at=job.created_at,
updated_at=job.updated_at,
)
class DownloadCreateResponse(BaseModel):
"""Result of requesting a download.
``already_in_library`` → the item was already imported (``track_id`` set, no
job). Otherwise ``job`` describes the queued (or already in-flight) download.
"""
already_in_library: bool
track_id: uuid.UUID | None
job: DownloadJobOut | None
+48
View File
@@ -0,0 +1,48 @@
"""Schemas for searching external (fetch) sources — the §A4 discover screen."""
import uuid
from pydantic import BaseModel
from app.domain.entities.track import Track
from app.domain.sources import SearchResult
class ExternalSearchResultOut(BaseModel):
source: str
source_id: str
title: str
artist: str | None
album: str | None
duration_seconds: int | None
thumbnail_url: str | None
# Remote browse (plan: Model C) — set when this hit is already saved in the
# library, so the UI can show "Play"/"Saved" instead of "Save to library".
in_library: bool
track_id: uuid.UUID | None
availability: str | None
@classmethod
def from_entity(
cls, r: SearchResult, *, existing: Track | None = None
) -> ExternalSearchResultOut:
return cls(
source=r.source,
source_id=r.source_id,
title=r.title,
artist=r.artist,
album=r.album,
duration_seconds=r.duration_seconds,
thumbnail_url=r.thumbnail_url,
in_library=existing is not None,
track_id=existing.id if existing is not None else None,
availability=existing.availability if existing is not None else None,
)
class ExternalSearchResponse(BaseModel):
"""Flat list of hits across one or more searchable sources, plus the names of
sources that were unavailable (so the UI can show a soft warning)."""
results: list[ExternalSearchResultOut]
searched_sources: list[str]
+27 -3
View File
@@ -3,7 +3,9 @@
import datetime as dt import datetime as dt
import uuid import uuid
from pydantic import BaseModel from pydantic import BaseModel, Field
from app.api.schemas.download import DownloadJobOut
class TrackOut(BaseModel): class TrackOut(BaseModel):
@@ -14,14 +16,15 @@ class TrackOut(BaseModel):
album_id: uuid.UUID | None album_id: uuid.UUID | None
album_title: str | None album_title: str | None
duration_seconds: int | None duration_seconds: int | None
file_format: str file_format: str | None
file_size: int file_size: int | None
genre: str | None genre: str | None
year: int | None year: int | None
track_number: int | None track_number: int | None
metadata_status: str metadata_status: str
metadata_error: str | None metadata_error: str | None
enriched_at: dt.datetime | None enriched_at: dt.datetime | None
availability: str
source: str source: str
has_cover: bool has_cover: bool
created_at: dt.datetime created_at: dt.datetime
@@ -61,3 +64,24 @@ class MetadataApply(BaseModel):
year: int | None = None year: int | None = None
genre: str | None = None genre: str | None = None
track_number: int | None = None track_number: int | None = None
class RemoteTrackSave(BaseModel):
"""Save a remote browse hit (§A4 discover) as a library placeholder —
``availability="remote"``, no audio until first play (plan: Model C)."""
source: str
source_id: str = Field(min_length=1)
title: str
artist: str | None = None
class MaterializeResponse(BaseModel):
"""Result of requesting that a placeholder track's audio be fetched.
``job`` is ``None`` when the track is already ``local`` — nothing to wait
for, the caller can stream immediately. Otherwise it's the (new or
already in-flight) job; poll ``GET /downloads/{job.id}`` until ``done``."""
track: TrackOut
job: DownloadJobOut | None
+60 -18
View File
@@ -1,36 +1,78 @@
"""Download job endpoints. Heavy work is dispatched to arq workers.""" """Download job endpoints (§A5). Heavy work is dispatched to arq workers — these
handlers only create/inspect/cancel/retry job records."""
import uuid import uuid
from typing import Any
from fastapi import APIRouter from fastapi import APIRouter, Query, Response
from app.api.deps import CurrentUser, DownloadServiceDep
from app.api.schemas.download import DownloadCreate, DownloadCreateResponse, DownloadJobOut
from app.api.schemas.pagination import PagedResponse
router = APIRouter(prefix="/downloads", tags=["downloads"]) router = APIRouter(prefix="/downloads", tags=["downloads"])
@router.get("") @router.get("")
async def list_downloads() -> Any: ... async def list_downloads(
service: DownloadServiceDep,
user: CurrentUser,
status: str | None = Query(default=None),
mine: bool = Query(default=False),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[DownloadJobOut]:
jobs, total = await service.list(
requested_by=user.id if mine else None,
status=status,
limit=limit,
offset=offset,
)
return PagedResponse(
items=[DownloadJobOut.from_entity(j) for j in jobs],
total=total,
limit=limit,
offset=offset,
)
@router.post("") @router.post("", status_code=202)
async def create_download() -> Any: ... async def create_download(
body: DownloadCreate,
service: DownloadServiceDep,
user: CurrentUser,
) -> DownloadCreateResponse:
result = await service.request(
source=body.source,
source_id=body.source_id,
query=body.query,
requested_by=user.id,
)
return DownloadCreateResponse(
already_in_library=result.already_in_library,
track_id=result.track_id,
job=DownloadJobOut.from_entity(result.job) if result.job is not None else None,
)
@router.get("/{job_id}") @router.get("/{job_id}")
async def get_download(job_id: uuid.UUID) -> Any: ... async def get_download(
job_id: uuid.UUID, service: DownloadServiceDep, _: CurrentUser
) -> DownloadJobOut:
job = await service.get(job_id)
return DownloadJobOut.from_entity(job)
@router.delete("/{job_id}") @router.delete("/{job_id}", status_code=204)
async def cancel_download(job_id: uuid.UUID) -> Any: ... async def cancel_download(
job_id: uuid.UUID, service: DownloadServiceDep, _: CurrentUser
) -> Response:
await service.cancel(job_id)
return Response(status_code=204)
@router.post("/{job_id}/retry") @router.post("/{job_id}/retry")
async def retry_download(job_id: uuid.UUID) -> Any: ... async def retry_download(
job_id: uuid.UUID, service: DownloadServiceDep, _: CurrentUser
) -> DownloadJobOut:
@router.post("/pause") job = await service.retry(job_id)
async def pause_downloads() -> Any: ... return DownloadJobOut.from_entity(job)
@router.post("/resume")
async def resume_downloads() -> Any: ...
+27 -4
View File
@@ -1,12 +1,11 @@
"""Search endpoints: global and library-scoped.""" """Search endpoints: global and library-scoped."""
from typing import Any
from fastapi import APIRouter, Query from fastapi import APIRouter, Query
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, TrackRepoDep from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, SourceRegistryDep, TrackRepoDep
from app.api.schemas.album import AlbumOut from app.api.schemas.album import AlbumOut
from app.api.schemas.artist import ArtistOut from app.api.schemas.artist import ArtistOut
from app.api.schemas.external_search import ExternalSearchResponse, ExternalSearchResultOut
from app.api.schemas.search import LibrarySearchResponse from app.api.schemas.search import LibrarySearchResponse
from app.api.schemas.track import TrackOut from app.api.schemas.track import TrackOut
from app.api.v1.albums import _build_album_out from app.api.v1.albums import _build_album_out
@@ -16,7 +15,31 @@ router = APIRouter(prefix="/search", tags=["search"])
@router.get("") @router.get("")
async def search(_: CurrentUser) -> Any: ... async def search(
_: CurrentUser,
registry: SourceRegistryDep,
track_repo: TrackRepoDep,
q: str = Query(min_length=1),
limit: int = Query(20, ge=1, le=50),
) -> ExternalSearchResponse:
"""Search every available fetch source and merge the hits (§A4 discover).
A source that is down contributes nothing rather than failing the whole
request (graceful degradation); only available sources are reported as
searched. Each hit is checked against the library by ``(source,
source_id)`` so the UI can show "Saved"/"Play" instead of "Save to
library" without a separate round-trip (remote browse, plan: Model C)."""
results: list[ExternalSearchResultOut] = []
searched: list[str] = []
for backend in registry.searchables():
if not backend.is_available():
continue
searched.append(backend.name)
hits = await backend.search(q, limit=limit)
for h in hits:
existing = await track_repo.get_by_source(h.source, h.source_id)
results.append(ExternalSearchResultOut.from_entity(h, existing=existing))
return ExternalSearchResponse(results=results, searched_sources=searched)
@router.get("/library") @router.get("/library")
+23 -10
View File
@@ -1,14 +1,13 @@
"""External source endpoints: enumerate sources and trigger imports. """External source endpoints: enumerate sources, search, and trigger imports.
Listing/health are read-only (any authenticated user). Scanning a source is an Listing/health/search are read-only (any authenticated user). Scanning a source
admin action and runs in a worker — the endpoint only enqueues it. is an admin action and runs in a worker — the endpoint only enqueues it.
""" """
from typing import Any from fastapi import APIRouter, Query
from fastapi import APIRouter from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser, TrackRepoDep
from app.api.schemas.external_search import ExternalSearchResponse, ExternalSearchResultOut
from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser
from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut
from app.domain.errors import DependencyUnavailableError from app.domain.errors import DependencyUnavailableError
from app.workers.queue import enqueue from app.workers.queue import enqueue
@@ -39,6 +38,20 @@ async def source_health(
@router.get("/{source}/search") @router.get("/{source}/search")
async def search_source(source: str, _: CurrentUser) -> Any: async def search_source(
# Search is for fetch-style sources (youtube, …) — not yet implemented. source: str,
... _: CurrentUser,
registry: SourceRegistryDep,
track_repo: TrackRepoDep,
q: str = Query(min_length=1),
limit: int = Query(20, ge=1, le=50),
) -> ExternalSearchResponse:
backend = registry.searchable(source) # 404 if unknown, 422 if not searchable
if not backend.is_available():
raise DependencyUnavailableError(f"Source {source!r} is not available.")
results = await backend.search(q, limit=limit)
out: list[ExternalSearchResultOut] = []
for r in results:
existing = await track_repo.get_by_source(r.source, r.source_id)
out.append(ExternalSearchResultOut.from_entity(r, existing=existing))
return ExternalSearchResponse(results=out, searched_sources=[source])
+1 -2
View File
@@ -63,8 +63,7 @@ async def get_storage_stats(
by_metadata_status=stats.by_metadata_status, by_metadata_status=stats.by_metadata_status,
by_source=stats.by_source, by_source=stats.by_source,
top_genres=[ top_genres=[
GenreCountOut(genre=genre, track_count=count) GenreCountOut(genre=genre, track_count=count) for genre, count in genres[:_TOP_GENRES]
for genre, count in genres[:_TOP_GENRES]
], ],
disk=DiskUsageOut(total=disk.total, used=disk.used, free=disk.free) if disk else None, disk=DiskUsageOut(total=disk.total, used=disk.used, free=disk.free) if disk else None,
) )
+61 -2
View File
@@ -13,14 +13,18 @@ from app.api.deps import (
CurrentUser, CurrentUser,
FileStorageDep, FileStorageDep,
MetadataServiceDep, MetadataServiceDep,
RemoteLibraryServiceDep,
StreamUser, StreamUser,
TrackRepoDep, TrackRepoDep,
) )
from app.api.schemas.download import DownloadJobOut
from app.api.schemas.pagination import PagedResponse from app.api.schemas.pagination import PagedResponse
from app.api.schemas.track import ( from app.api.schemas.track import (
MaterializeResponse,
MetadataApply, MetadataApply,
MetadataMatch, MetadataMatch,
MetadataMatchesOut, MetadataMatchesOut,
RemoteTrackSave,
TrackOut, TrackOut,
TrackUpdate, TrackUpdate,
) )
@@ -54,6 +58,7 @@ async def _build_track_out(
metadata_status=t.metadata_status, metadata_status=t.metadata_status,
metadata_error=t.metadata_error, metadata_error=t.metadata_error,
enriched_at=t.enriched_at, enriched_at=t.enriched_at,
availability=t.availability,
source=t.source, source=t.source,
has_cover=bool(t.album_id and albums.get(t.album_id) and albums[t.album_id].cover_path), has_cover=bool(t.album_id and albums.get(t.album_id) and albums[t.album_id].cover_path),
created_at=t.created_at, created_at=t.created_at,
@@ -71,6 +76,7 @@ async def list_tracks(
artist_id: uuid.UUID | None = None, artist_id: uuid.UUID | None = None,
album_id: uuid.UUID | None = None, album_id: uuid.UUID | None = None,
q: str | None = None, q: str | None = None,
source: str | None = Query(None, max_length=32),
sort_by: str = Query("created_at", pattern="^(title|created_at|artist)$"), sort_by: str = Query("created_at", pattern="^(title|created_at|artist)$"),
order: str = Query("desc", pattern="^(asc|desc)$"), order: str = Query("desc", pattern="^(asc|desc)$"),
limit: int = Query(50, ge=1, le=200), limit: int = Query(50, ge=1, le=200),
@@ -80,12 +86,13 @@ async def list_tracks(
artist_id=artist_id, artist_id=artist_id,
album_id=album_id, album_id=album_id,
q=q, q=q,
source=source,
sort_by=sort_by, sort_by=sort_by,
order=order, order=order,
limit=limit, limit=limit,
offset=offset, offset=offset,
) )
total = await track_repo.count(artist_id=artist_id, album_id=album_id, q=q) total = await track_repo.count(artist_id=artist_id, album_id=album_id, q=q, source=source)
artist_ids = list({t.artist_id for t in tracks}) artist_ids = list({t.artist_id for t in tracks})
album_ids = list({t.album_id for t in tracks if t.album_id is not None}) album_ids = list({t.album_id for t in tracks if t.album_id is not None})
@@ -96,6 +103,57 @@ async def list_tracks(
return PagedResponse(items=items, total=total, limit=limit, offset=offset) return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.post("/remote", status_code=201)
async def save_remote_track(
body: RemoteTrackSave,
service: RemoteLibraryServiceDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
user: CurrentUser,
) -> TrackOut:
"""Save a remote browse hit (§A4 discover) as a library placeholder —
no audio is fetched yet (plan: Model C). Idempotent on ``(source,
source_id)``: saving an already-saved hit returns the existing track."""
track = await service.save_remote(
source=body.source,
source_id=body.source_id,
title=body.title,
artist=body.artist,
added_by=user.id,
)
artists = {a.id: a for a in await artist_repo.get_many([track.artist_id])}
album_ids = [track.album_id] if track.album_id else []
albums = {a.id: a for a in await album_repo.get_many(album_ids)}
items = await _build_track_out([track], artists, albums)
return items[0]
@router.post("/{track_id}/materialize")
async def materialize_track(
track_id: uuid.UUID,
service: RemoteLibraryServiceDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
user: CurrentUser,
) -> MaterializeResponse:
"""Fetch a placeholder track's audio on demand (plan: Model C lazy
materialization). Already-local tracks return ``job=None`` — nothing to
wait for. Otherwise poll ``GET /downloads/{job.id}`` until ``done``, then
stream as usual."""
outcome = await service.request_materialize(track_id, requested_by=user.id)
artists = {a.id: a for a in await artist_repo.get_many([outcome.track.artist_id])}
album_ids = [outcome.track.album_id] if outcome.track.album_id else []
albums = {a.id: a for a in await album_repo.get_many(album_ids)}
track_out = (await _build_track_out([outcome.track], artists, albums))[0]
return MaterializeResponse(
track=track_out,
job=DownloadJobOut.from_entity(outcome.job) if outcome.job is not None else None,
)
@router.get("/{track_id}") @router.get("/{track_id}")
async def get_track( async def get_track(
track_id: uuid.UUID, track_id: uuid.UUID,
@@ -153,7 +211,8 @@ async def delete_track(
if track is None: if track is None:
raise NotFoundError(f"Track {track_id} not found.") raise NotFoundError(f"Track {track_id} not found.")
await track_repo.delete(track_id) await track_repo.delete(track_id)
await storage.delete(track.storage_uri) if track.storage_uri is not None:
await storage.delete(track.storage_uri)
return Response(status_code=204) return Response(status_code=204)
+183
View File
@@ -0,0 +1,183 @@
"""DownloadService — request external downloads and import their results.
Two roles (plan §6.1):
* **Request side** (HTTP): validate + dedup a download request, create a
``queued`` job, and enqueue the worker. Dedup is on ``(source, source_id)``
against both the library (already imported) and in-flight jobs (a double-click
must not queue twice) — idempotency per CLAUDE.md.
* **Worker side**: ``store_result`` turns a backend's :class:`DownloadResult`
into a managed file + minimal ``pending`` track (sibling of
:class:`~app.application.import_service.LibraryImportService`); enrichment
(§6.2) fills the rest.
The fingerprint-level dedup (a different id that turns out to be the same audio)
happens later in enrichment, where the fingerprint is computed.
"""
import contextlib
import uuid
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
import anyio
from app.core.logging import get_logger
from app.domain.entities.download import DownloadJob
from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import (
ArtistRepository,
DownloadJobRepository,
FileStorage,
TrackRepository,
)
from app.domain.sources import DownloadResult
log = get_logger(__name__)
_UNKNOWN_ARTIST = "Unknown Artist"
# (job_id) -> None — enqueue the download worker, deferred so the job row is
# committed before the worker reads it (same pattern as enrich).
DownloadEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
EnrichEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
@dataclass(frozen=True)
class DownloadRequest:
"""Outcome of asking for a download.
Exactly one of the three states holds: the item is already in the library
(``track_id`` set, ``already_in_library``), a job already covers it / was
just created (``job`` set), so the UI can route to the download manager.
"""
job: DownloadJob | None
track_id: uuid.UUID | None
already_in_library: bool
class DownloadService:
def __init__(
self,
*,
jobs: DownloadJobRepository,
tracks: TrackRepository,
artists: ArtistRepository,
storage: FileStorage,
enqueue_download: DownloadEnqueuer | None = None,
enqueue_enrich: EnrichEnqueuer | None = None,
) -> None:
self._jobs = jobs
self._tracks = tracks
self._artists = artists
self._storage = storage
self._enqueue_download = enqueue_download
self._enqueue_enrich = enqueue_enrich
# -- request side ---------------------------------------------------------
async def request(
self,
*,
source: str,
source_id: str,
query: str | None,
requested_by: uuid.UUID | None,
) -> DownloadRequest:
source_id = source_id.strip()
if not source_id:
raise ValidationError("A source_id is required to download.")
existing = await self._tracks.get_by_source(source, source_id)
if existing is not None:
return DownloadRequest(job=None, track_id=existing.id, already_in_library=True)
active = await self._jobs.get_active_for_source(source, source_id)
if active is not None:
return DownloadRequest(job=active, track_id=None, already_in_library=False)
job = await self._jobs.add(
source=source,
source_id=source_id,
query=query,
requested_by=requested_by,
)
if self._enqueue_download is not None:
await self._enqueue_download(job.id)
return DownloadRequest(job=job, track_id=None, already_in_library=False)
async def list(
self,
*,
requested_by: uuid.UUID | None,
status: str | None,
limit: int,
offset: int,
) -> tuple[list[DownloadJob], int]:
jobs = await self._jobs.list(
requested_by=requested_by, status=status, limit=limit, offset=offset
)
total = await self._jobs.count(requested_by=requested_by, status=status)
return jobs, total
async def get(self, job_id: uuid.UUID) -> DownloadJob:
job = await self._jobs.get_by_id(job_id)
if job is None:
raise NotFoundError(f"Download job {job_id} not found.")
return job
async def cancel(self, job_id: uuid.UUID) -> None:
"""Remove the job record. True mid-flight cancellation of an in-progress
yt-dlp download is out of scope (MVP); the worker tolerates a vanished
job row (its status writes become no-ops)."""
job = await self._jobs.get_by_id(job_id)
if job is None:
raise NotFoundError(f"Download job {job_id} not found.")
await self._jobs.delete(job_id)
async def retry(self, job_id: uuid.UUID) -> DownloadJob:
job = await self.get(job_id)
await self._jobs.set_status(job_id, status="queued", error_message=None)
if self._enqueue_download is not None:
await self._enqueue_download(job_id)
refreshed = await self._jobs.get_by_id(job_id)
return refreshed if refreshed is not None else job
# -- worker side ----------------------------------------------------------
async def store_result(
self,
*,
source: str,
result: DownloadResult,
requested_by: uuid.UUID | None,
) -> uuid.UUID:
"""Store a freshly downloaded file and create a minimal ``pending`` track.
Returns the new track id (the caller enqueues enrichment after commit).
The temp file produced by the backend is always removed."""
track_id = uuid.uuid4()
key = f"tracks/{str(track_id)[:2]}/{track_id}.{result.file_format}"
try:
await self._storage.save_file(key, result.path)
try:
artist = await self._artists.get_or_create(_UNKNOWN_ARTIST)
await self._tracks.add(
id=track_id,
title=result.suggested_title,
artist_id=artist.id,
storage_uri=key,
file_format=result.file_format,
file_size=result.file_size,
source=source,
source_id=result.source_id,
metadata_status="pending",
added_by=requested_by,
)
except Exception:
with contextlib.suppress(Exception):
await self._storage.delete(key)
raise
finally:
with contextlib.suppress(Exception):
await anyio.Path(result.path).unlink(missing_ok=True)
return track_id
+9 -3
View File
@@ -79,9 +79,13 @@ class MetadataEnrichmentService:
if track.metadata_status == "manual": if track.metadata_status == "manual":
log.info("enrich_skip_manual", track_id=str(track_id)) log.info("enrich_skip_manual", track_id=str(track_id))
return EnrichmentResult(track_id=track_id, status="skipped") return EnrichmentResult(track_id=track_id, status="skipped")
storage_uri = track.storage_uri
if storage_uri is None:
log.info("enrich_skip_remote", track_id=str(track_id))
return EnrichmentResult(track_id=track_id, status="skipped")
tags = await self._read_local(track.storage_uri) tags = await self._read_local(storage_uri)
match = await self._identify(track.storage_uri) match = await self._identify(storage_uri)
# Merge order is tag-first by default — embedded tags fix the common # Merge order is tag-first by default — embedded tags fix the common
# well-tagged offline case. But a *high-confidence* AcoustID match is the # well-tagged offline case. But a *high-confidence* AcoustID match is the
@@ -125,7 +129,7 @@ class MetadataEnrichmentService:
if album is not None: if album is not None:
await self._resolve_cover( await self._resolve_cover(
album, album,
storage_uri=track.storage_uri, storage_uri=storage_uri,
release_group_mbid=match.release_group_mbid if match else None, release_group_mbid=match.release_group_mbid if match else None,
) )
@@ -175,6 +179,8 @@ class MetadataEnrichmentService:
return [] return []
if not self._acoustid.is_available() or not self._fingerprinter.is_available(): if not self._acoustid.is_available() or not self._fingerprinter.is_available():
return [] return []
if track.storage_uri is None:
return []
try: try:
async with self._storage.as_local_path(track.storage_uri) as path: async with self._storage.as_local_path(track.storage_uri) as path:
fingerprint = await self._fingerprinter.calculate(path) fingerprint = await self._fingerprinter.calculate(path)
+122
View File
@@ -0,0 +1,122 @@
"""RemoteLibraryService — save-to-library + materialize for remote browse hits
(plan: Model C, on-demand YTM library).
Two operations:
* ``save_remote`` persists a placeholder ``Track`` (``availability="remote"``,
``storage_uri=None``) for a remote browse hit. Idempotent on
``(source, source_id)`` — CLAUDE.md dedup.
* ``request_materialize`` lazily fills a placeholder's audio in place: it
creates (or reuses) a ``DownloadJob`` pointing at the existing track and
enqueues the materialize worker, which calls ``TrackRepository.materialize``
on completion. ``track.id`` never changes (CLAUDE.md), so likes/playlists/
queue entries referencing the placeholder keep working once it's filled in.
"""
import uuid
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from app.domain.entities.download import DownloadJob
from app.domain.entities.track import Track
from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import ArtistRepository, DownloadJobRepository, TrackRepository
_UNKNOWN_ARTIST = "Unknown Artist"
# (job_id) -> None — enqueue the materialize worker, same deferred pattern as
# download/enrich enqueuers.
MaterializeEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
@dataclass(frozen=True)
class MaterializeOutcome:
"""Result of requesting materialization.
``job`` is ``None`` when the track is already ``local`` — nothing to do,
the caller can stream immediately. Otherwise it's the (new or already
in-flight) job filling the placeholder."""
track: Track
job: DownloadJob | None
class RemoteLibraryService:
def __init__(
self,
*,
tracks: TrackRepository,
artists: ArtistRepository,
jobs: DownloadJobRepository,
enqueue_materialize: MaterializeEnqueuer | None = None,
) -> None:
self._tracks = tracks
self._artists = artists
self._jobs = jobs
self._enqueue_materialize = enqueue_materialize
async def save_remote(
self,
*,
source: str,
source_id: str,
title: str,
artist: str | None,
added_by: uuid.UUID | None,
) -> Track:
"""Persist a placeholder for a remote browse hit. Idempotent: a hit
already saved (by ``(source, source_id)``) is returned as-is."""
source_id = source_id.strip()
if not source_id:
raise ValidationError("A source_id is required to save.")
existing = await self._tracks.get_by_source(source, source_id)
if existing is not None:
return existing
artist_entity = await self._artists.get_or_create(artist or _UNKNOWN_ARTIST)
return await self._tracks.add(
id=uuid.uuid4(),
title=title,
artist_id=artist_entity.id,
storage_uri=None,
file_format=None,
file_size=None,
source=source,
source_id=source_id,
metadata_status="pending",
added_by=added_by,
availability="remote",
)
async def request_materialize(
self, track_id: uuid.UUID, *, requested_by: uuid.UUID | None
) -> MaterializeOutcome:
"""Kick off (or report on) materializing a placeholder track.
Already-local tracks are a no-op (``job=None``). A track with no
remote ``source_id`` (e.g. a deleted upload row reused for something
else) can't be materialized."""
track = await self._tracks.get_by_id(track_id)
if track is None:
raise NotFoundError(f"Track {track_id} not found.")
if track.availability == "local":
return MaterializeOutcome(track=track, job=None)
if track.source_id is None:
raise ValidationError("Track has no remote source to materialize from.")
active = await self._jobs.get_active_for_source(track.source, track.source_id)
if active is not None:
return MaterializeOutcome(track=track, job=active)
job = await self._jobs.add(
source=track.source,
source_id=track.source_id,
query=None,
requested_by=requested_by,
)
await self._jobs.set_status(job.id, status="queued", track_id=track.id)
if self._enqueue_materialize is not None:
await self._enqueue_materialize(job.id)
refreshed = await self._jobs.get_by_id(job.id)
return MaterializeOutcome(track=track, job=refreshed if refreshed is not None else job)
+6 -3
View File
@@ -72,16 +72,19 @@ class StreamingService:
track = await self._tracks.get_by_id(track_id) track = await self._tracks.get_by_id(track_id)
if track is None: if track is None:
raise NotFoundError("Track not found.") raise NotFoundError("Track not found.")
storage_uri = track.storage_uri
if storage_uri is None:
raise NotFoundError("Track is not yet downloaded.")
stat = await self._storage.stat(track.storage_uri) stat = await self._storage.stat(storage_uri)
total_size = stat.size total_size = stat.size
content_type = stat.content_type or _FORMAT_CONTENT_TYPE.get( content_type = stat.content_type or _FORMAT_CONTENT_TYPE.get(
track.file_format.lower(), "application/octet-stream" (track.file_format or "").lower(), "application/octet-stream"
) )
start, end, is_partial = _parse_range(range_header, total_size) start, end, is_partial = _parse_range(range_header, total_size)
stream, _ = await self._storage.open_range(track.storage_uri, start, end) stream, _ = await self._storage.open_range(storage_uri, start, end)
actual_end = end if end is not None else total_size - 1 actual_end = end if end is not None else total_size - 1
content_length = actual_end - start + 1 content_length = actual_end - start + 1
+8
View File
@@ -71,6 +71,9 @@ class Settings(BaseSettings):
media_path: Path = Path("/data/media") media_path: Path = Path("/data/media")
transcode_cache_path: Path = Path("/data/transcode-cache") transcode_cache_path: Path = Path("/data/transcode-cache")
max_parallel_downloads: int = 2 max_parallel_downloads: int = 2
# How many times the download worker retries a failed fetch (yt-dlp fails
# often) before marking the job ``failed`` — exponential backoff between tries.
download_max_retries: int = 3
storage_backend: Literal["local", "s3"] = "local" storage_backend: Literal["local", "s3"] = "local"
upload_tmp_dir: Path | None = None upload_tmp_dir: Path | None = None
@@ -100,6 +103,11 @@ class Settings(BaseSettings):
# deployments should set their own contact email; see # deployments should set their own contact email; see
# ``musicbrainz_user_agent`` below for how it's used. # ``musicbrainz_user_agent`` below for how it's used.
musicbrainz_owner_email: str | None = None musicbrainz_owner_email: str | None = None
# ``youtube`` fetch source (search + download via ytmusicapi/yt-dlp). Enabled
# by default; the source still reports unavailable if the libs aren't present.
youtube_enabled: bool = True
# Optional cookies file (Netscape format) for yt-dlp — lets it fetch
# age-restricted / region-locked items via an authenticated session.
youtube_cookies_path: Path | None = None youtube_cookies_path: Path | None = None
# -- enrichment ------------------------------------------------------- # -- enrichment -------------------------------------------------------
+2
View File
@@ -2,6 +2,7 @@
from app.domain.entities.album import Album from app.domain.entities.album import Album
from app.domain.entities.cover import CoverArt from app.domain.entities.cover import CoverArt
from app.domain.entities.download import DownloadJob
from app.domain.entities.history import PlayHistoryEntry from app.domain.entities.history import PlayHistoryEntry
from app.domain.entities.like import Like from app.domain.entities.like import Like
from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch
@@ -22,6 +23,7 @@ __all__ = [
"CoverArt", "CoverArt",
"Credentials", "Credentials",
"DiskUsage", "DiskUsage",
"DownloadJob",
"Fingerprint", "Fingerprint",
"FormatBreakdown", "FormatBreakdown",
"LibraryStats", "LibraryStats",
+2
View File
@@ -13,5 +13,7 @@ class Album:
year: int | None year: int | None
cover_path: str | None cover_path: str | None
musicbrainz_id: str | None musicbrainz_id: str | None
source: str | None
source_id: str | None
created_at: dt.datetime created_at: dt.datetime
updated_at: dt.datetime updated_at: dt.datetime
+26
View File
@@ -0,0 +1,26 @@
"""Download job domain entity (plan §6.1).
A queued fetch from an external source, tracked through its lifecycle so the UI
download manager (screen §A5) can show progress, errors, and retries. The
``status`` strings mirror :class:`~app.infrastructure.db.models.enums.DownloadStatus`.
"""
import datetime as dt
import uuid
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class DownloadJob:
id: uuid.UUID
source: str
source_id: str | None
query: str | None
requested_by: uuid.UUID | None
status: str
progress: float
error_message: str | None
retry_count: int
track_id: uuid.UUID | None
created_at: dt.datetime
updated_at: dt.datetime
+6 -3
View File
@@ -9,6 +9,8 @@ from dataclasses import dataclass
class Artist: class Artist:
id: uuid.UUID id: uuid.UUID
name: str name: str
source: str | None
source_id: str | None
created_at: dt.datetime created_at: dt.datetime
updated_at: dt.datetime updated_at: dt.datetime
@@ -19,9 +21,9 @@ class Track:
title: str title: str
artist_id: uuid.UUID artist_id: uuid.UUID
album_id: uuid.UUID | None album_id: uuid.UUID | None
storage_uri: str storage_uri: str | None
file_format: str file_format: str | None
file_size: int file_size: int | None
source: str source: str
source_id: str source_id: str
duration_seconds: int | None duration_seconds: int | None
@@ -31,5 +33,6 @@ class Track:
metadata_status: str metadata_status: str
metadata_error: str | None metadata_error: str | None
enriched_at: dt.datetime | None enriched_at: dt.datetime | None
availability: str
created_at: dt.datetime created_at: dt.datetime
updated_at: dt.datetime updated_at: dt.datetime
+117 -5
View File
@@ -7,7 +7,7 @@ are bound to these ports at the composition root (``app.api.deps``).
import datetime as dt import datetime as dt
import uuid import uuid
from collections.abc import AsyncIterator, Iterator from collections.abc import AsyncIterator, Awaitable, Callable, Iterator
from contextlib import AbstractAsyncContextManager from contextlib import AbstractAsyncContextManager
from pathlib import Path from pathlib import Path
from typing import Protocol from typing import Protocol
@@ -18,6 +18,7 @@ from app.domain.entities import (
CoverArt, CoverArt,
Credentials, Credentials,
DiskUsage, DiskUsage,
DownloadJob,
Fingerprint, Fingerprint,
LibraryStats, LibraryStats,
Like, Like,
@@ -29,9 +30,14 @@ from app.domain.entities import (
User, User,
) )
from app.domain.entities.track import Artist, Track from app.domain.entities.track import Artist, Track
from app.domain.sources import SourceFile, SourceInfo from app.domain.sources import DownloadResult, RawMetadata, SearchResult, SourceFile, SourceInfo
from app.domain.tokens import IssuedToken, TokenClaims, TokenType from app.domain.tokens import IssuedToken, TokenClaims, TokenType
# A fetch source reports download progress as a fraction in [0.0, 1.0]. It's a
# plain callback (not a port) because it's an inversion of control supplied per
# call by the worker, which persists it to the download job.
ProgressCallback = Callable[[float], Awaitable[None]]
class UserRepository(Protocol): class UserRepository(Protocol):
async def get_by_id(self, user_id: uuid.UUID) -> User | None: ... async def get_by_id(self, user_id: uuid.UUID) -> User | None: ...
@@ -108,6 +114,11 @@ class FileStorage(Protocol):
class ArtistRepository(Protocol): class ArtistRepository(Protocol):
async def get_or_create(self, name: str) -> Artist: ... async def get_or_create(self, name: str) -> Artist: ...
async def get_or_create_remote(self, *, name: str, source: str, source_id: str) -> Artist:
"""Resolve/create an artist bound to a remote ``(source, source_id)``
(lazy materialization save-to-library)."""
...
async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None: ... async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None: ...
async def get_many(self, ids: list[uuid.UUID]) -> list[Artist]: ... async def get_many(self, ids: list[uuid.UUID]) -> list[Artist]: ...
async def list(self, *, q: str | None, limit: int, offset: int) -> list[Artist]: ... async def list(self, *, q: str | None, limit: int, offset: int) -> list[Artist]: ...
@@ -125,14 +136,28 @@ class TrackRepository(Protocol):
id: uuid.UUID, id: uuid.UUID,
title: str, title: str,
artist_id: uuid.UUID, artist_id: uuid.UUID,
storage_uri: str, storage_uri: str | None,
file_format: str, file_format: str | None,
file_size: int, file_size: int | None,
source: str, source: str,
source_id: str, source_id: str,
metadata_status: str, metadata_status: str,
added_by: uuid.UUID | None, added_by: uuid.UUID | None,
availability: str = ...,
) -> Track: ... ) -> Track: ...
async def materialize(
self,
track_id: uuid.UUID,
*,
storage_uri: str,
file_format: str,
file_size: int,
bitrate: int | None,
) -> Track:
"""Fill in a remote placeholder's audio fields after a download
(lazy materialization), flipping ``availability`` to ``local``."""
...
async def delete(self, track_id: uuid.UUID) -> None: ... async def delete(self, track_id: uuid.UUID) -> None: ...
# genres / library_stats must come before ``list`` — the method named # genres / library_stats must come before ``list`` — the method named
# ``list`` shadows the builtin in later annotations (same pattern as # ``list`` shadows the builtin in later annotations (same pattern as
@@ -145,6 +170,7 @@ class TrackRepository(Protocol):
artist_id: uuid.UUID | None, artist_id: uuid.UUID | None,
album_id: uuid.UUID | None, album_id: uuid.UUID | None,
q: str | None, q: str | None,
source: str | None = None,
sort_by: str, sort_by: str,
order: str, order: str,
limit: int, limit: int,
@@ -156,6 +182,7 @@ class TrackRepository(Protocol):
artist_id: uuid.UUID | None, artist_id: uuid.UUID | None,
album_id: uuid.UUID | None, album_id: uuid.UUID | None,
q: str | None, q: str | None,
source: str | None = None,
) -> int: ... ) -> int: ...
async def update( async def update(
self, self,
@@ -204,6 +231,20 @@ class AlbumRepository(Protocol):
year: int | None, year: int | None,
musicbrainz_id: str | None, musicbrainz_id: str | None,
) -> Album: ... ) -> Album: ...
async def get_or_create_remote(
self,
*,
title: str,
artist_id: uuid.UUID,
year: int | None,
musicbrainz_id: str | None,
source: str,
source_id: str,
) -> Album:
"""Resolve/create an album bound to a remote ``(source, source_id)``
(lazy materialization save-to-library)."""
...
async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None: ... async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None: ...
async def get_by_id(self, album_id: uuid.UUID) -> Album | None: ... async def get_by_id(self, album_id: uuid.UUID) -> Album | None: ...
async def get_many(self, ids: list[uuid.UUID]) -> list[Album]: ... async def get_many(self, ids: list[uuid.UUID]) -> list[Album]: ...
@@ -273,6 +314,54 @@ class HistoryRepository(Protocol):
async def count(self, *, user_id: uuid.UUID) -> int: ... async def count(self, *, user_id: uuid.UUID) -> int: ...
class DownloadJobRepository(Protocol):
"""Persistence for download jobs (plan §6.1). Drives the §A5 download manager
and the worker's retry/backoff loop."""
async def add(
self,
*,
source: str,
source_id: str | None,
query: str | None,
requested_by: uuid.UUID | None,
) -> DownloadJob: ...
async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None: ...
async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
"""An unfinished (queued/downloading/enriching) job for the same item, if
any — used to dedup before enqueuing so a double-click can't queue twice."""
...
async def list(
self,
*,
requested_by: uuid.UUID | None,
status: str | None,
limit: int,
offset: int,
) -> list[DownloadJob]: ...
async def count(self, *, requested_by: uuid.UUID | None, status: str | None) -> int: ...
async def set_status(
self,
job_id: uuid.UUID,
*,
status: str,
error_message: str | None = None,
track_id: uuid.UUID | None = None,
) -> None: ...
async def set_progress(self, job_id: uuid.UUID, progress: float) -> None: ...
async def increment_retry(self, job_id: uuid.UUID) -> int:
"""Bump ``retry_count`` and return the new value."""
...
async def delete(self, job_id: uuid.UUID) -> None: ...
async def failure_rate(self, source: str, *, since: dt.datetime) -> float:
"""Fraction of jobs for ``source`` created since ``since`` that ended
``failed`` (0.0 when there are none) — drives the §A5 "source unhealthy"
banner."""
...
class SourceBackend(Protocol): class SourceBackend(Protocol):
"""A registered source of tracks (mounted folder, YouTube, …). """A registered source of tracks (mounted folder, YouTube, …).
@@ -291,6 +380,29 @@ class IndexableSource(SourceBackend, Protocol):
def scan(self) -> Iterator[SourceFile]: ... def scan(self) -> Iterator[SourceFile]: ...
class SearchableSource(SourceBackend, Protocol):
"""A source that can be searched by free text (e.g. YouTube Music).
Returns ``[]`` (never raises) on no results / the service being down — the
discover screen degrades to "nothing found" rather than erroring."""
async def search(self, query: str, *, limit: int) -> list[SearchResult]: ...
class FetchableSource(SourceBackend, Protocol):
"""A source that can download a previously-discovered item to local disk.
``fetch`` resolves a ``source_id`` (from a :class:`SearchResult`) into a file
and reports progress through ``on_progress``. It runs only in a worker (heavy
I/O) and raises on failure so the download task can retry with backoff."""
async def fetch(
self, source_id: str, *, on_progress: ProgressCallback | None = None
) -> DownloadResult: ...
async def get_metadata(self, source_id: str) -> RawMetadata | None: ...
# -- metadata enrichment (plan §6.2) ----------------------------------------- # -- metadata enrichment (plan §6.2) -----------------------------------------
class AudioTagReader(Protocol): class AudioTagReader(Protocol):
"""Reads embedded tags from a local audio file. Returns ``None`` only when """Reads embedded tags from a local audio file. Returns ``None`` only when
+58 -2
View File
@@ -10,8 +10,14 @@ here — a source yields a file plus a minimal title; enrichment (plan §6.2) fi
the rest later, so this stays a thin discovery layer (CLAUDE.md: no duplicated the rest later, so this stays a thin discovery layer (CLAUDE.md: no duplicated
business logic).""" business logic)."""
from dataclasses import dataclass from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any
# A source's ``kind`` describes which ports it satisfies, so the UI/admin can
# tell an indexed folder from a searchable fetch-source. A backend may be both.
KIND_INDEXABLE = "indexable" # enumerates files already on disk (local folder)
KIND_FETCH = "fetch" # searches + downloads from an external service (YTM, …)
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
@@ -20,7 +26,7 @@ class SourceInfo:
name: str name: str
label: str label: str
kind: str # "indexable" (more kinds — search/download — arrive with youtube) kind: str # KIND_INDEXABLE | KIND_FETCH
available: bool available: bool
@@ -37,3 +43,53 @@ class SourceFile:
suggested_title: str suggested_title: str
file_format: str file_format: str
file_size: int file_size: int
@dataclass(frozen=True, slots=True)
class SearchResult:
"""One hit from a searchable source (plan §5), shown on the discover screen.
``source_id`` is the stable handle the same backend later resolves in
``fetch`` — it must round-trip a download request without re-searching.
``raw`` carries the backend's untouched payload for debugging / future use.
"""
source: str
source_id: str
title: str
artist: str | None
album: str | None
duration_seconds: int | None
thumbnail_url: str | None
raw: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True, slots=True)
class RawMetadata:
"""Metadata a fetch-source can offer about an item *before* enrichment.
Best-effort and source-shaped — the canonical metadata still comes from the
enrichment pipeline (plan §6.2). Used to seed a more useful provisional
title than a bare id while a download is queued."""
title: str | None
artist: str | None
album: str | None
year: int | None
extra: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True, slots=True)
class DownloadResult:
"""A file a fetch-source produced on local disk (plan §5).
``path`` is a temp file the caller owns: it is stored into managed storage
and then removed (same lifecycle as an upload). ``source_id`` is echoed back
because some backends only learn the canonical id during the download."""
source_id: str
path: Path
file_format: str
file_size: int
bitrate: int | None
suggested_title: str
+11 -1
View File
@@ -2,7 +2,7 @@
import uuid import uuid
from sqlalchemy import ForeignKey, Integer, String from sqlalchemy import ForeignKey, Integer, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base from app.infrastructure.db.base import Base
@@ -11,6 +11,12 @@ from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMi
class AlbumModel(UUIDPrimaryKeyMixin, TimestampMixin, Base): class AlbumModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "albums" __tablename__ = "albums"
__table_args__ = (
# Binds a remote (browsable) album to its local row for re-browse/save
# dedup. Multiple NULLs are allowed by Postgres, so locally-created
# albums (source/source_id both NULL) never collide on this.
UniqueConstraint("source", "source_id", name="uq_albums_source_source_id"),
)
title: Mapped[str] = mapped_column(String(1024), index=True, nullable=False) title: Mapped[str] = mapped_column(String(1024), index=True, nullable=False)
artist_id: Mapped[uuid.UUID] = mapped_column( artist_id: Mapped[uuid.UUID] = mapped_column(
@@ -21,3 +27,7 @@ class AlbumModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
year: Mapped[int | None] = mapped_column(Integer, nullable=True) year: Mapped[int | None] = mapped_column(Integer, nullable=True)
cover_path: Mapped[str | None] = mapped_column(String(1024), nullable=True) cover_path: Mapped[str | None] = mapped_column(String(1024), nullable=True)
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True) musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
# -- remote identity (lazy materialization) --------------------------
source: Mapped[str | None] = mapped_column(String(32), nullable=True)
source_id: Mapped[str | None] = mapped_column(String(512), nullable=True)
+11 -1
View File
@@ -1,6 +1,6 @@
"""ORM model for artists.""" """ORM model for artists."""
from sqlalchemy import String from sqlalchemy import String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base from app.infrastructure.db.base import Base
@@ -9,6 +9,16 @@ from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMi
class ArtistModel(UUIDPrimaryKeyMixin, TimestampMixin, Base): class ArtistModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "artists" __tablename__ = "artists"
__table_args__ = (
# Binds a remote (browsable) artist to its local row for re-browse/save
# dedup. Multiple NULLs are allowed by Postgres, so locally-created
# artists (source/source_id both NULL) never collide on this.
UniqueConstraint("source", "source_id", name="uq_artists_source_source_id"),
)
name: Mapped[str] = mapped_column(String(512), index=True, nullable=False) name: Mapped[str] = mapped_column(String(512), index=True, nullable=False)
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True) musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
# -- remote identity (lazy materialization) --------------------------
source: Mapped[str | None] = mapped_column(String(32), nullable=True)
source_id: Mapped[str | None] = mapped_column(String(512), nullable=True)
@@ -35,3 +35,9 @@ class DownloadJobModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
progress: Mapped[float] = mapped_column(Float, nullable=False, default=0.0) progress: Mapped[float] = mapped_column(Float, nullable=False, default=0.0)
error_message: Mapped[str | None] = mapped_column(Text, nullable=True) error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
retry_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) retry_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
# Set once the download finishes and the track is imported — lets the UI
# link a completed job to its library track.
track_id: Mapped[uuid.UUID | None] = mapped_column(
ForeignKey("tracks.id", ondelete="SET NULL"),
nullable=True,
)
+9
View File
@@ -64,3 +64,12 @@ class LyricsStatus(enum.StrEnum):
FOUND = "found" FOUND = "found"
NOT_FOUND = "not_found" NOT_FOUND = "not_found"
PENDING = "pending" PENDING = "pending"
class TrackAvailability(enum.StrEnum):
"""Whether a track's audio is on local storage or still a remote placeholder
(plan: lazy materialization). ``remote`` tracks have ``storage_uri = NULL``
until ``TrackRepository.materialize`` fills it in."""
LOCAL = "local"
REMOTE = "remote"
+13 -4
View File
@@ -13,7 +13,7 @@ from sqlalchemy import DateTime, ForeignKey, Integer, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base from app.infrastructure.db.base import Base
from app.infrastructure.db.models.enums import MetadataStatus, StoragePolicy from app.infrastructure.db.models.enums import MetadataStatus, StoragePolicy, TrackAvailability
from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMixin from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMixin
@@ -41,11 +41,20 @@ class TrackModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
year: Mapped[int | None] = mapped_column(Integer, nullable=True) year: Mapped[int | None] = mapped_column(Integer, nullable=True)
# -- file (original, stored as-is) ----------------------------------- # -- file (original, stored as-is) -----------------------------------
storage_uri: Mapped[str] = mapped_column(String(2048), nullable=False) # NULL on a remote placeholder (not yet materialized) — see ``availability``.
file_format: Mapped[str] = mapped_column(String(32), nullable=False) storage_uri: Mapped[str | None] = mapped_column(String(2048), nullable=True)
file_size: Mapped[int] = mapped_column(Integer, nullable=False) file_format: Mapped[str | None] = mapped_column(String(32), nullable=True)
file_size: Mapped[int | None] = mapped_column(Integer, nullable=True)
bitrate: Mapped[int | None] = mapped_column(Integer, nullable=True) bitrate: Mapped[int | None] = mapped_column(Integer, nullable=True)
# ``remote`` = placeholder with no local audio yet; materialize() flips this
# to ``local`` once the file is downloaded and ``storage_uri`` is filled in.
availability: Mapped[str] = mapped_column(
String(16),
nullable=False,
default=TrackAvailability.LOCAL.value,
)
# -- dedup / external ids -------------------------------------------- # -- dedup / external ids --------------------------------------------
acoustid_fingerprint: Mapped[str | None] = mapped_column(String(64), index=True, nullable=True) acoustid_fingerprint: Mapped[str | None] = mapped_column(String(64), index=True, nullable=True)
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True) musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
@@ -2,6 +2,9 @@
from app.infrastructure.db.repositories.album_repository import SqlAlchemyAlbumRepository from app.infrastructure.db.repositories.album_repository import SqlAlchemyAlbumRepository
from app.infrastructure.db.repositories.artist_repository import SqlAlchemyArtistRepository from app.infrastructure.db.repositories.artist_repository import SqlAlchemyArtistRepository
from app.infrastructure.db.repositories.download_job_repository import (
SqlAlchemyDownloadJobRepository,
)
from app.infrastructure.db.repositories.history_repository import SqlAlchemyHistoryRepository from app.infrastructure.db.repositories.history_repository import SqlAlchemyHistoryRepository
from app.infrastructure.db.repositories.like_repository import SqlAlchemyLikeRepository from app.infrastructure.db.repositories.like_repository import SqlAlchemyLikeRepository
from app.infrastructure.db.repositories.playlist_repository import SqlAlchemyPlaylistRepository from app.infrastructure.db.repositories.playlist_repository import SqlAlchemyPlaylistRepository
@@ -14,6 +17,7 @@ from app.infrastructure.db.repositories.user_repository import SqlAlchemyUserRep
__all__ = [ __all__ = [
"SqlAlchemyAlbumRepository", "SqlAlchemyAlbumRepository",
"SqlAlchemyArtistRepository", "SqlAlchemyArtistRepository",
"SqlAlchemyDownloadJobRepository",
"SqlAlchemyHistoryRepository", "SqlAlchemyHistoryRepository",
"SqlAlchemyLikeRepository", "SqlAlchemyLikeRepository",
"SqlAlchemyPlaylistRepository", "SqlAlchemyPlaylistRepository",
@@ -18,6 +18,8 @@ def _to_entity(row: AlbumModel) -> Album:
year=row.year, year=row.year,
cover_path=row.cover_path, cover_path=row.cover_path,
musicbrainz_id=row.musicbrainz_id, musicbrainz_id=row.musicbrainz_id,
source=row.source,
source_id=row.source_id,
created_at=row.created_at, created_at=row.created_at,
updated_at=row.updated_at, updated_at=row.updated_at,
) )
@@ -63,6 +65,58 @@ class SqlAlchemyAlbumRepository:
await self._session.refresh(row) await self._session.refresh(row)
return _to_entity(row) return _to_entity(row)
async def get_or_create_remote(
self,
*,
title: str,
artist_id: uuid.UUID,
year: int | None,
musicbrainz_id: str | None,
source: str,
source_id: str,
) -> Album:
"""Resolve an album by ``(source, source_id)`` first (re-browse/save
dedup), falling back to ``(title, artist_id)`` and gap-filling the
remote ids onto an existing row, else creating a new remote-bound row."""
row = (
await self._session.execute(
select(AlbumModel).where(
AlbumModel.source == source,
AlbumModel.source_id == source_id,
)
)
).scalar_one_or_none()
if row is None:
row = (
await self._session.execute(
select(AlbumModel).where(
AlbumModel.title == title,
AlbumModel.artist_id == artist_id,
)
)
).scalar_one_or_none()
if row is None:
row = AlbumModel(
title=title,
artist_id=artist_id,
year=year,
musicbrainz_id=musicbrainz_id,
source=source,
source_id=source_id,
)
self._session.add(row)
else:
if row.year is None and year is not None:
row.year = year
if row.musicbrainz_id is None and musicbrainz_id is not None:
row.musicbrainz_id = musicbrainz_id
if row.source is None and row.source_id is None:
row.source = source
row.source_id = source_id
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None: async def set_cover_path(self, album_id: uuid.UUID, cover_path: str) -> None:
row = await self._session.get(AlbumModel, album_id) row = await self._session.get(AlbumModel, album_id)
if row is not None: if row is not None:
@@ -15,6 +15,8 @@ def _to_entity(row: ArtistModel) -> Artist:
return Artist( return Artist(
id=row.id, id=row.id,
name=row.name, name=row.name,
source=row.source,
source_id=row.source_id,
created_at=row.created_at, created_at=row.created_at,
updated_at=row.updated_at, updated_at=row.updated_at,
) )
@@ -35,6 +37,32 @@ class SqlAlchemyArtistRepository:
await self._session.refresh(row) await self._session.refresh(row)
return _to_entity(row) return _to_entity(row)
async def get_or_create_remote(self, *, name: str, source: str, source_id: str) -> Artist:
"""Resolve an artist by ``(source, source_id)`` first (re-browse/save
dedup), falling back to ``name`` and gap-filling the remote ids onto an
existing row, else creating a new remote-bound row."""
row = (
await self._session.execute(
select(ArtistModel).where(
ArtistModel.source == source,
ArtistModel.source_id == source_id,
)
)
).scalar_one_or_none()
if row is None:
row = (
await self._session.execute(select(ArtistModel).where(ArtistModel.name == name))
).scalar_one_or_none()
if row is None:
row = ArtistModel(name=name, source=source, source_id=source_id)
self._session.add(row)
elif row.source is None and row.source_id is None:
row.source = source
row.source_id = source_id
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None: async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None:
row = await self._session.get(ArtistModel, artist_id) row = await self._session.get(ArtistModel, artist_id)
return _to_entity(row) if row is not None else None return _to_entity(row) if row is not None else None
@@ -0,0 +1,164 @@
"""Download job repository — adapter over ``AsyncSession`` (plan §6.1)."""
import datetime as dt
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities.download import DownloadJob
from app.infrastructure.db.models.download_job import DownloadJobModel
from app.infrastructure.db.models.enums import DownloadStatus
# Jobs that are not yet finished — used to dedup an in-flight download.
_ACTIVE_STATUSES = (
DownloadStatus.QUEUED.value,
DownloadStatus.DOWNLOADING.value,
DownloadStatus.ENRICHING.value,
)
def _to_entity(row: DownloadJobModel) -> DownloadJob:
return DownloadJob(
id=row.id,
source=row.source,
source_id=row.source_id,
query=row.query,
requested_by=row.requested_by,
status=row.status,
progress=row.progress,
error_message=row.error_message,
retry_count=row.retry_count,
track_id=row.track_id,
created_at=row.created_at,
updated_at=row.updated_at,
)
class SqlAlchemyDownloadJobRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def add(
self,
*,
source: str,
source_id: str | None,
query: str | None,
requested_by: uuid.UUID | None,
) -> DownloadJob:
row = DownloadJobModel(
source=source,
source_id=source_id,
query=query,
requested_by=requested_by,
status=DownloadStatus.QUEUED.value,
progress=0.0,
retry_count=0,
)
self._session.add(row)
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
row = await self._session.get(DownloadJobModel, job_id)
return _to_entity(row) if row is not None else None
async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
row = (
await self._session.execute(
select(DownloadJobModel)
.where(
DownloadJobModel.source == source,
DownloadJobModel.source_id == source_id,
DownloadJobModel.status.in_(_ACTIVE_STATUSES),
)
.order_by(DownloadJobModel.created_at.desc())
.limit(1)
)
).scalar_one_or_none()
return _to_entity(row) if row is not None else None
async def list(
self,
*,
requested_by: uuid.UUID | None,
status: str | None,
limit: int,
offset: int,
) -> list[DownloadJob]:
stmt = select(DownloadJobModel)
if requested_by is not None:
stmt = stmt.where(DownloadJobModel.requested_by == requested_by)
if status is not None:
stmt = stmt.where(DownloadJobModel.status == status)
stmt = stmt.order_by(DownloadJobModel.created_at.desc()).limit(limit).offset(offset)
rows = (await self._session.execute(stmt)).scalars().all()
return [_to_entity(r) for r in rows]
async def count(self, *, requested_by: uuid.UUID | None, status: str | None) -> int:
stmt = select(func.count()).select_from(DownloadJobModel)
if requested_by is not None:
stmt = stmt.where(DownloadJobModel.requested_by == requested_by)
if status is not None:
stmt = stmt.where(DownloadJobModel.status == status)
return (await self._session.execute(stmt)).scalar_one()
async def set_status(
self,
job_id: uuid.UUID,
*,
status: str,
error_message: str | None = None,
track_id: uuid.UUID | None = None,
) -> None:
row = await self._session.get(DownloadJobModel, job_id)
if row is None:
return
row.status = status
# ``error_message`` is always written: a successful transition clears a
# stale reason from an earlier failed attempt.
row.error_message = error_message
if track_id is not None:
row.track_id = track_id
if status == DownloadStatus.DONE.value:
row.progress = 1.0
await self._session.flush()
async def set_progress(self, job_id: uuid.UUID, progress: float) -> None:
row = await self._session.get(DownloadJobModel, job_id)
if row is None:
return
row.progress = max(0.0, min(1.0, progress))
await self._session.flush()
async def increment_retry(self, job_id: uuid.UUID) -> int:
row = await self._session.get(DownloadJobModel, job_id)
if row is None:
return 0
row.retry_count += 1
await self._session.flush()
return row.retry_count
async def delete(self, job_id: uuid.UUID) -> None:
row = await self._session.get(DownloadJobModel, job_id)
if row is not None:
await self._session.delete(row)
await self._session.flush()
async def failure_rate(self, source: str, *, since: dt.datetime) -> float:
total, failed = (
await self._session.execute(
select(
func.count(),
func.count().filter(DownloadJobModel.status == DownloadStatus.FAILED.value),
)
.select_from(DownloadJobModel)
.where(
DownloadJobModel.source == source,
DownloadJobModel.created_at >= since,
)
)
).one()
return (failed / total) if total else 0.0
@@ -42,6 +42,7 @@ def _track_to_entity(row: TrackModel) -> Track:
metadata_status=row.metadata_status, metadata_status=row.metadata_status,
metadata_error=row.metadata_error, metadata_error=row.metadata_error,
enriched_at=row.enriched_at, enriched_at=row.enriched_at,
availability=row.availability,
created_at=row.created_at, created_at=row.created_at,
updated_at=row.updated_at, updated_at=row.updated_at,
) )
@@ -41,6 +41,7 @@ def _track_to_entity(row: TrackModel) -> Track:
metadata_status=row.metadata_status, metadata_status=row.metadata_status,
metadata_error=row.metadata_error, metadata_error=row.metadata_error,
enriched_at=row.enriched_at, enriched_at=row.enriched_at,
availability=row.availability,
created_at=row.created_at, created_at=row.created_at,
updated_at=row.updated_at, updated_at=row.updated_at,
) )
@@ -10,6 +10,7 @@ from app.domain.entities.storage import FormatBreakdown, LibraryStats
from app.domain.entities.track import Track from app.domain.entities.track import Track
from app.domain.errors import NotFoundError from app.domain.errors import NotFoundError
from app.infrastructure.db.models.artist import ArtistModel from app.infrastructure.db.models.artist import ArtistModel
from app.infrastructure.db.models.enums import TrackAvailability
from app.infrastructure.db.models.track import TrackModel from app.infrastructure.db.models.track import TrackModel
@@ -31,6 +32,7 @@ def _to_entity(row: TrackModel) -> Track:
metadata_status=row.metadata_status, metadata_status=row.metadata_status,
metadata_error=row.metadata_error, metadata_error=row.metadata_error,
enriched_at=row.enriched_at, enriched_at=row.enriched_at,
availability=row.availability,
created_at=row.created_at, created_at=row.created_at,
updated_at=row.updated_at, updated_at=row.updated_at,
) )
@@ -61,13 +63,14 @@ class SqlAlchemyTrackRepository:
id: uuid.UUID, id: uuid.UUID,
title: str, title: str,
artist_id: uuid.UUID, artist_id: uuid.UUID,
storage_uri: str, storage_uri: str | None,
file_format: str, file_format: str | None,
file_size: int, file_size: int | None,
source: str, source: str,
source_id: str, source_id: str,
metadata_status: str, metadata_status: str,
added_by: uuid.UUID | None, added_by: uuid.UUID | None,
availability: str = TrackAvailability.LOCAL.value,
) -> Track: ) -> Track:
row = TrackModel( row = TrackModel(
id=id, id=id,
@@ -80,12 +83,38 @@ class SqlAlchemyTrackRepository:
source_id=source_id, source_id=source_id,
metadata_status=metadata_status, metadata_status=metadata_status,
added_by=added_by, added_by=added_by,
availability=availability,
) )
self._session.add(row) self._session.add(row)
await self._session.flush() await self._session.flush()
await self._session.refresh(row) await self._session.refresh(row)
return _to_entity(row) return _to_entity(row)
async def materialize(
self,
track_id: uuid.UUID,
*,
storage_uri: str,
file_format: str,
file_size: int,
bitrate: int | None,
) -> Track:
"""Fill in a remote placeholder's audio fields after a download (lazy
materialization). ``track.id`` is unchanged, so likes/playlists/queue
entries that already reference it keep working."""
row = await self._session.get(TrackModel, track_id)
if row is None:
raise NotFoundError(f"Track {track_id} not found.")
row.storage_uri = storage_uri
row.file_format = file_format
row.file_size = file_size
if bitrate is not None:
row.bitrate = bitrate
row.availability = TrackAvailability.LOCAL.value
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def delete(self, track_id: uuid.UUID) -> None: async def delete(self, track_id: uuid.UUID) -> None:
row = await self._session.get(TrackModel, track_id) row = await self._session.get(TrackModel, track_id)
if row is not None: if row is not None:
@@ -130,6 +159,7 @@ class SqlAlchemyTrackRepository:
func.count(TrackModel.id), func.count(TrackModel.id),
func.coalesce(func.sum(TrackModel.file_size), 0), func.coalesce(func.sum(TrackModel.file_size), 0),
) )
.where(TrackModel.file_format.is_not(None))
.group_by(TrackModel.file_format) .group_by(TrackModel.file_format)
.order_by(func.sum(TrackModel.file_size).desc()) .order_by(func.sum(TrackModel.file_size).desc())
) )
@@ -170,6 +200,7 @@ class SqlAlchemyTrackRepository:
artist_id: uuid.UUID | None, artist_id: uuid.UUID | None,
album_id: uuid.UUID | None, album_id: uuid.UUID | None,
q: str | None, q: str | None,
source: str | None = None,
sort_by: str = "created_at", sort_by: str = "created_at",
order: str = "desc", order: str = "desc",
limit: int = 50, limit: int = 50,
@@ -180,6 +211,8 @@ class SqlAlchemyTrackRepository:
stmt = stmt.where(TrackModel.artist_id == artist_id) stmt = stmt.where(TrackModel.artist_id == artist_id)
if album_id is not None: if album_id is not None:
stmt = stmt.where(TrackModel.album_id == album_id) stmt = stmt.where(TrackModel.album_id == album_id)
if source is not None:
stmt = stmt.where(TrackModel.source == source)
if q: if q:
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%")) stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
@@ -204,12 +237,15 @@ class SqlAlchemyTrackRepository:
artist_id: uuid.UUID | None, artist_id: uuid.UUID | None,
album_id: uuid.UUID | None, album_id: uuid.UUID | None,
q: str | None, q: str | None,
source: str | None = None,
) -> int: ) -> int:
stmt = select(func.count()).select_from(TrackModel) stmt = select(func.count()).select_from(TrackModel)
if artist_id is not None: if artist_id is not None:
stmt = stmt.where(TrackModel.artist_id == artist_id) stmt = stmt.where(TrackModel.artist_id == artist_id)
if album_id is not None: if album_id is not None:
stmt = stmt.where(TrackModel.album_id == album_id) stmt = stmt.where(TrackModel.album_id == album_id)
if source is not None:
stmt = stmt.where(TrackModel.source == source)
if q: if q:
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%")) stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
return (await self._session.execute(stmt)).scalar_one() return (await self._session.execute(stmt)).scalar_one()
+1 -1
View File
@@ -78,7 +78,7 @@ class AcoustIdHttpClient:
) )
resp.raise_for_status() resp.raise_for_status()
return resp.json() # type: ignore[no-any-return] return resp.json() # type: ignore[no-any-return]
except (httpx.HTTPError, ValueError): except httpx.HTTPError, ValueError:
log.warning("acoustid_lookup_failed") log.warning("acoustid_lookup_failed")
return None return None
+27 -2
View File
@@ -2,16 +2,18 @@
Built from settings at the composition root. Only sources that are configured Built from settings at the composition root. Only sources that are configured
are registered (e.g. ``local`` appears only when ``LOCAL_MEDIA_IMPORT_PATH`` is are registered (e.g. ``local`` appears only when ``LOCAL_MEDIA_IMPORT_PATH`` is
set), so enumeration reflects what the instance can actually use. set; ``youtube`` only when ``YOUTUBE_ENABLED``), so enumeration reflects what the
instance can actually use.
""" """
from typing import cast from typing import cast
from app.core.config import Settings from app.core.config import Settings
from app.domain.errors import NotFoundError, ValidationError from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import IndexableSource, SourceBackend from app.domain.ports import FetchableSource, IndexableSource, SearchableSource, SourceBackend
from app.domain.sources import SourceInfo from app.domain.sources import SourceInfo
from app.infrastructure.sources.local_folder import LocalFolderSource from app.infrastructure.sources.local_folder import LocalFolderSource
from app.infrastructure.sources.youtube import YouTubeMusicSource
class SourceRegistry: class SourceRegistry:
@@ -30,6 +32,22 @@ class SourceRegistry:
raise ValidationError(f"Source {name!r} cannot be indexed.") raise ValidationError(f"Source {name!r} cannot be indexed.")
return cast(IndexableSource, backend) return cast(IndexableSource, backend)
def searchable(self, name: str) -> SearchableSource:
backend = self.get(name)
if not hasattr(backend, "search"):
raise ValidationError(f"Source {name!r} cannot be searched.")
return cast(SearchableSource, backend)
def fetchable(self, name: str) -> FetchableSource:
backend = self.get(name)
if not hasattr(backend, "fetch"):
raise ValidationError(f"Source {name!r} cannot download.")
return cast(FetchableSource, backend)
def searchables(self) -> list[SearchableSource]:
"""Every registered source that supports search (for cross-source search)."""
return [cast(SearchableSource, b) for b in self._by_name.values() if hasattr(b, "search")]
def infos(self) -> list[SourceInfo]: def infos(self) -> list[SourceInfo]:
return [backend.info() for backend in self._by_name.values()] return [backend.info() for backend in self._by_name.values()]
@@ -38,4 +56,11 @@ def build_source_registry(settings: Settings) -> SourceRegistry:
backends: list[SourceBackend] = [] backends: list[SourceBackend] = []
if settings.local_media_import_path is not None: if settings.local_media_import_path is not None:
backends.append(LocalFolderSource(settings.local_media_import_path)) backends.append(LocalFolderSource(settings.local_media_import_path))
if settings.youtube_enabled:
backends.append(
YouTubeMusicSource(
cookies_path=settings.youtube_cookies_path,
tmp_dir=settings.upload_tmp_dir,
)
)
return SourceRegistry(backends) return SourceRegistry(backends)
+207
View File
@@ -0,0 +1,207 @@
"""``youtube`` source — YouTube Music search + download (plan §5).
A *fetch* source: it searches YouTube Music (via ``ytmusicapi``, which returns
clean song/artist/album/duration rows) and downloads the chosen item with
``yt-dlp``. The two libraries are synchronous, so every call is bounced to a
worker thread (``anyio.to_thread``); the sync yt-dlp progress hook bridges back
to the async progress callback via ``anyio.from_thread``.
Both libraries are optional dependencies — if either is missing the source is
simply *unavailable* (it never crashes import or the registry; graceful
degradation per CLAUDE.md). The audio stream is stored **as-is** (YouTube serves
lossy Opus/AAC; re-encoding would be lossy→lossy, plan §6.6).
``source_id`` is the YouTube ``videoId`` — stable, so a re-download of the same
id is idempotent and dedups against an existing track.
"""
import functools
import tempfile
from collections.abc import Callable
from pathlib import Path
from typing import Any
import anyio
from app.core.logging import get_logger
from app.domain.ports import ProgressCallback
from app.domain.sources import (
KIND_FETCH,
DownloadResult,
RawMetadata,
SearchResult,
SourceInfo,
)
from app.infrastructure.db.models.enums import TrackSource
log = get_logger(__name__)
# Functions a caller may inject for testing (defaults do the real library work).
SearchFn = Callable[[str, int], list[dict[str, Any]]]
# (video_id, tmp_dir, progress_hook, cookies_path) -> normalized download dict
DownloadFn = Callable[[str, Path, Callable[[dict[str, Any]], None], Path | None], dict[str, Any]]
def _libs_available() -> bool:
try:
import yt_dlp # noqa: F401
import ytmusicapi # noqa: F401
except ImportError:
return False
return True
def _watch_url(video_id: str) -> str:
return f"https://music.youtube.com/watch?v={video_id}"
class YouTubeMusicSource:
"""Implements :class:`app.domain.ports.SearchableSource` and
:class:`~app.domain.ports.FetchableSource`."""
name = TrackSource.YOUTUBE.value
def __init__(
self,
*,
cookies_path: Path | None = None,
tmp_dir: Path | None = None,
search_fn: SearchFn | None = None,
download_fn: DownloadFn | None = None,
) -> None:
self._cookies_path = cookies_path
self._tmp_dir = tmp_dir
self._search_fn = search_fn or _default_search
self._download_fn = download_fn or _default_download
# Only the real library path needs the deps; an injected fn is self-contained.
self._injected = search_fn is not None or download_fn is not None
def info(self) -> SourceInfo:
return SourceInfo(
name=self.name,
label="YouTube Music",
kind=KIND_FETCH,
available=self.is_available(),
)
def is_available(self) -> bool:
return True if self._injected else _libs_available()
async def search(self, query: str, *, limit: int) -> list[SearchResult]:
query = query.strip()
if not query:
return []
try:
rows = await anyio.to_thread.run_sync(functools.partial(self._search_fn, query, limit))
except Exception:
# No results / service down → degrade to empty (plan §5, CLAUDE.md).
log.warning("ytm_search_failed", query=query)
return []
return [r for r in (self._to_result(row) for row in rows) if r is not None]
async def fetch(
self, source_id: str, *, on_progress: ProgressCallback | None = None
) -> DownloadResult:
tmp_dir = self._tmp_dir or Path(tempfile.gettempdir())
def hook(d: dict[str, Any]) -> None:
if on_progress is None or d.get("status") != "downloading":
return
total = d.get("total_bytes") or d.get("total_bytes_estimate")
done = d.get("downloaded_bytes")
if not total or done is None:
return
# Cap below 1.0 — the job only reaches 1.0 once stored + imported.
frac = min(done / total, 0.99)
# Bridge sync hook (worker thread) → async callback (event loop).
anyio.from_thread.run(on_progress, frac)
def _run() -> dict[str, Any]:
return self._download_fn(source_id, tmp_dir, hook, self._cookies_path)
info = await anyio.to_thread.run_sync(_run)
path = Path(info["filepath"])
stat = await anyio.Path(path).stat()
return DownloadResult(
source_id=source_id,
path=path,
file_format=info["file_format"],
file_size=stat.st_size,
bitrate=info.get("bitrate"),
suggested_title=info.get("title") or source_id,
)
async def get_metadata(self, source_id: str) -> RawMetadata | None:
# The search result already carries a usable title/artist, and the
# canonical metadata comes from enrichment (§6.2). A dedicated lookup is
# an optional refinement — skipped for now (returns None gracefully).
return None
def _to_result(self, row: dict[str, Any]) -> SearchResult | None:
video_id = row.get("videoId")
if not video_id:
return None # non-playable row (e.g. a video without audio id)
artists = row.get("artists") or []
artist = ", ".join(a["name"] for a in artists if a.get("name")) or None
album = (row.get("album") or {}).get("name") if isinstance(row.get("album"), dict) else None
thumbnails = row.get("thumbnails") or []
thumbnail = thumbnails[-1].get("url") if thumbnails else None
return SearchResult(
source=self.name,
source_id=str(video_id),
title=row.get("title") or "Unknown",
artist=artist,
album=album,
duration_seconds=row.get("duration_seconds"),
thumbnail_url=thumbnail,
raw=row,
)
def _default_search(query: str, limit: int) -> list[dict[str, Any]]:
"""Real ytmusicapi search (songs only). Runs in a worker thread."""
from ytmusicapi import YTMusic
yt = YTMusic() # unauthenticated: public search needs no login
results: list[dict[str, Any]] = yt.search(query, filter="songs", limit=limit)
return results[:limit]
def _default_download(
video_id: str,
tmp_dir: Path,
progress_hook: Callable[[dict[str, Any]], None],
cookies_path: Path | None,
) -> dict[str, Any]:
"""Real yt-dlp download of the best audio stream. Runs in a worker thread.
Stores the original stream (no transcode — plan §6.3/§6.6). Returns a
normalized dict the adapter maps to :class:`DownloadResult`.
"""
from yt_dlp import YoutubeDL
opts: dict[str, Any] = {
"format": "bestaudio/best",
"outtmpl": str(tmp_dir / "%(id)s.%(ext)s"),
"quiet": True,
"no_warnings": True,
"noprogress": True,
"progress_hooks": [progress_hook],
}
# Use cookies only when the file is actually present: the path can be set
# unconditionally (e.g. a mounted volume that may be empty) and downloads
# still work without it — cookies just unlock age/region-restricted items.
if cookies_path is not None and cookies_path.is_file():
opts["cookiefile"] = str(cookies_path)
with YoutubeDL(opts) as ydl:
info = ydl.extract_info(_watch_url(video_id), download=True)
filepath = Path(ydl.prepare_filename(info))
abr = info.get("abr")
return {
"filepath": filepath,
"file_format": filepath.suffix.lstrip(".").lower() or "m4a",
"bitrate": int(abr) if abr else None,
"title": info.get("title"),
}
+8 -2
View File
@@ -1,7 +1,6 @@
"""arq worker settings — the queue runtime. Task functions register here. """arq worker settings — the queue runtime. Task functions register here.
Run with: ``arq app.workers.arq_worker.WorkerSettings``. Run with: ``arq app.workers.arq_worker.WorkerSettings``.
Tasks (download, transcode) are appended to ``functions`` in later steps.
""" """
from typing import Any, ClassVar from typing import Any, ClassVar
@@ -10,8 +9,10 @@ from arq.connections import RedisSettings
from app.core.config import get_settings from app.core.config import get_settings
from app.core.logging import configure_logging, get_logger from app.core.logging import configure_logging, get_logger
from app.workers.tasks.download_task import download_track
from app.workers.tasks.enrich_task import enrich_track from app.workers.tasks.enrich_task import enrich_track
from app.workers.tasks.import_task import scan_local_folder from app.workers.tasks.import_task import scan_local_folder
from app.workers.tasks.materialize_task import materialize_track
log = get_logger("worker") log = get_logger("worker")
@@ -27,7 +28,12 @@ async def shutdown(_ctx: dict[str, Any]) -> None:
class WorkerSettings: class WorkerSettings:
functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track] functions: ClassVar[list[Any]] = [
scan_local_folder,
enrich_track,
download_track,
materialize_track,
]
on_startup = startup on_startup = startup
on_shutdown = shutdown on_shutdown = shutdown
max_jobs = get_settings().max_parallel_downloads max_jobs = get_settings().max_parallel_downloads
+25
View File
@@ -34,6 +34,31 @@ async def enqueue(function: str, **kwargs: Any) -> str:
return str(job.job_id) return str(job.job_id)
async def enqueue_download(job_id: uuid.UUID) -> None:
"""Best-effort enqueue of a download job for the worker.
The job row is already persisted as ``queued``, so this is a follow-up, not a
barrier: if the queue is unreachable we log and move on (graceful
degradation) — the job stays ``queued`` and can be retried later. Deferred a
few seconds so the request's DB transaction commits before the worker reads
the row (same reason as :func:`enqueue_enrich`)."""
try:
await enqueue("download_track", job_id=str(job_id), _defer_by=3)
except DependencyUnavailableError:
log.warning("download_enqueue_failed", job_id=str(job_id))
async def enqueue_materialize(job_id: uuid.UUID) -> None:
"""Best-effort enqueue of a materialize job for the worker (plan: Model C
lazy materialization). Same deferred-commit reasoning as
:func:`enqueue_download` — the job row stays ``queued`` and can be retried
if the queue is unreachable."""
try:
await enqueue("materialize_track", job_id=str(job_id), _defer_by=3)
except DependencyUnavailableError:
log.warning("materialize_enqueue_failed", job_id=str(job_id))
async def enqueue_enrich(track_id: uuid.UUID) -> None: async def enqueue_enrich(track_id: uuid.UUID) -> None:
"""Best-effort enqueue of metadata enrichment for a freshly stored track. """Best-effort enqueue of metadata enrichment for a freshly stored track.
+151
View File
@@ -0,0 +1,151 @@
"""arq task: download one queued job through a fetch source (plan §6.1).
Flow: load job → ``downloading`` → ``backend.fetch`` (progress streamed to the
job row) → ``enriching`` → store file + minimal track → ``done`` → enqueue
enrichment. yt-dlp fails often, so a failed fetch retries with exponential
backoff (``download_max_retries``); only after the last try is the job marked
``failed`` with a reason for the §A5 download manager.
Heavy I/O belongs off the request cycle (CLAUDE.md); the HTTP endpoint only
enqueues. The job row tolerates being deleted mid-flight (cancellation) — status
writes against a missing row are no-ops.
"""
import uuid
from typing import Any
from arq import Retry
from app.application.download_service import DownloadService
from app.core.config import get_settings
from app.core.logging import correlation_id, get_logger
from app.domain.entities.download import DownloadJob
from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import FetchableSource, ProgressCallback
from app.domain.sources import DownloadResult
from app.infrastructure.db import session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyArtistRepository,
SqlAlchemyDownloadJobRepository,
SqlAlchemyTrackRepository,
)
from app.infrastructure.sources.registry import build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich
log = get_logger("worker.download")
# Exponential backoff between retries: 30s, 60s, 120s … capped.
_BACKOFF_BASE_SECONDS = 30
_BACKOFF_MAX_SECONDS = 600
# Only write progress when it advances by at least this much (avoid hammering
# the DB on every yt-dlp chunk).
_PROGRESS_STEP = 0.01
async def download_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]:
correlation_id.set(f"dl:{job_id}")
jid = uuid.UUID(job_id)
settings = get_settings()
job = await _load_job(jid)
if job is None:
log.info("download_job_missing", job_id=job_id) # cancelled before pickup
return {"job_id": job_id, "status": "missing"}
registry = build_source_registry(settings)
try:
backend = registry.fetchable(job.source)
except (NotFoundError, ValidationError) as exc:
await _mark_failed(jid, f"Source unavailable: {exc}")
return {"job_id": job_id, "status": "failed"}
if job.source_id is None:
await _mark_failed(jid, "Job has no source_id to download.")
return {"job_id": job_id, "status": "failed"}
await _set_status(jid, "downloading")
try:
result = await _run_fetch(backend, job.source_id, jid)
except Exception as exc:
return await _handle_failure(jid, exc, settings.download_max_retries, job_id)
try:
track_id = await _import_result(jid, job, result)
except Exception as exc:
log.exception("download_import_failed", job_id=job_id)
await _mark_failed(jid, f"Import failed: {type(exc).__name__}: {exc}")
return {"job_id": job_id, "status": "failed"}
await enqueue_enrich(track_id)
log.info("download_complete", job_id=job_id, track_id=str(track_id))
return {"job_id": job_id, "status": "done", "track_id": str(track_id)}
async def _run_fetch(
backend: FetchableSource, source_id: str, jid: uuid.UUID
) -> DownloadResult:
"""Fetch the file, streaming progress into the job row. A single session is
held for the download so progress writes don't churn connections; each
throttled update is committed so API pollers see it."""
async with session_scope() as session:
repo = SqlAlchemyDownloadJobRepository(session)
last = 0.0
async def on_progress(frac: float) -> None:
nonlocal last
if frac - last < _PROGRESS_STEP:
return
last = frac
await repo.set_progress(jid, frac)
await session.commit()
cb: ProgressCallback = on_progress
return await backend.fetch(source_id, on_progress=cb)
async def _import_result(jid: uuid.UUID, job: DownloadJob, result: DownloadResult) -> uuid.UUID:
async with session_scope() as session:
repo = SqlAlchemyDownloadJobRepository(session)
await repo.set_status(jid, status="enriching")
service = DownloadService(
jobs=repo,
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
storage=get_file_storage(),
)
track_id = await service.store_result(
source=job.source, result=result, requested_by=job.requested_by
)
await repo.set_status(jid, status="done", track_id=track_id)
return track_id
async def _handle_failure(
jid: uuid.UUID, exc: Exception, max_retries: int, job_id: str
) -> dict[str, Any]:
async with session_scope() as session:
tries = await SqlAlchemyDownloadJobRepository(session).increment_retry(jid)
if tries <= max_retries:
backoff = min(_BACKOFF_BASE_SECONDS * 2 ** (tries - 1), _BACKOFF_MAX_SECONDS)
log.warning("download_retry", job_id=job_id, attempt=tries, defer=backoff)
raise Retry(defer=backoff) from exc
log.exception("download_failed", job_id=job_id)
await _mark_failed(jid, f"Download failed after {tries} attempts: {type(exc).__name__}: {exc}")
return {"job_id": job_id, "status": "failed"}
async def _load_job(jid: uuid.UUID) -> DownloadJob | None:
async with session_scope() as session:
return await SqlAlchemyDownloadJobRepository(session).get_by_id(jid)
async def _set_status(jid: uuid.UUID, status: str) -> None:
async with session_scope() as session:
await SqlAlchemyDownloadJobRepository(session).set_status(jid, status=status)
async def _mark_failed(jid: uuid.UUID, error: str) -> None:
async with session_scope() as session:
await SqlAlchemyDownloadJobRepository(session).set_status(
jid, status="failed", error_message=error
)
+101
View File
@@ -0,0 +1,101 @@
"""arq task: materialize a remote placeholder track (plan: Model C).
Counterpart to ``download_task`` for tracks that were *saved* from a remote
browse hit without audio (``availability="remote"``, ``storage_uri=NULL``).
The job's ``track_id`` already points at the existing placeholder row — on
success the file is stored and ``TrackRepository.materialize`` fills the row
in place (the track's ``id`` never changes), then enrichment is enqueued as
usual.
Shares its fetch/retry/failure machinery with ``download_task`` — only the
"what happens on success" step differs (fill in an existing row vs. create a
new one).
"""
import contextlib
import uuid
from typing import Any
import anyio
from app.core.config import get_settings
from app.core.logging import correlation_id, get_logger
from app.domain.errors import NotFoundError, ValidationError
from app.domain.sources import DownloadResult
from app.infrastructure.db import session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyDownloadJobRepository,
SqlAlchemyTrackRepository,
)
from app.infrastructure.sources.registry import build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich
from app.workers.tasks.download_task import _handle_failure, _load_job, _mark_failed, _run_fetch
log = get_logger("worker.materialize")
async def materialize_track(_ctx: dict[str, Any], *, job_id: str) -> dict[str, Any]:
correlation_id.set(f"mat:{job_id}")
jid = uuid.UUID(job_id)
settings = get_settings()
job = await _load_job(jid)
if job is None:
log.info("materialize_job_missing", job_id=job_id) # cancelled before pickup
return {"job_id": job_id, "status": "missing"}
if job.track_id is None or job.source_id is None:
await _mark_failed(jid, "Materialize job missing track_id/source_id.")
return {"job_id": job_id, "status": "failed"}
registry = build_source_registry(settings)
try:
backend = registry.fetchable(job.source)
except (NotFoundError, ValidationError) as exc:
await _mark_failed(jid, f"Source unavailable: {exc}")
return {"job_id": job_id, "status": "failed"}
await _set_status(jid, "downloading")
try:
result = await _run_fetch(backend, job.source_id, jid)
except Exception as exc:
return await _handle_failure(jid, exc, settings.download_max_retries, job_id)
try:
await _materialize_result(jid, job.track_id, result)
except Exception as exc:
log.exception("materialize_finalize_failed", job_id=job_id)
await _mark_failed(jid, f"Materialize failed: {type(exc).__name__}: {exc}")
return {"job_id": job_id, "status": "failed"}
await enqueue_enrich(job.track_id)
log.info("materialize_complete", job_id=job_id, track_id=str(job.track_id))
return {"job_id": job_id, "status": "done", "track_id": str(job.track_id)}
async def _materialize_result(jid: uuid.UUID, track_id: uuid.UUID, result: DownloadResult) -> None:
"""Store the downloaded file and fill in the placeholder track in place."""
key = f"tracks/{str(track_id)[:2]}/{track_id}.{result.file_format}"
storage = get_file_storage()
try:
await storage.save_file(key, result.path)
async with session_scope() as session:
job_repo = SqlAlchemyDownloadJobRepository(session)
await job_repo.set_status(jid, status="enriching")
tracks = SqlAlchemyTrackRepository(session)
await tracks.materialize(
track_id,
storage_uri=key,
file_format=result.file_format,
file_size=result.file_size,
bitrate=result.bitrate,
)
await job_repo.set_status(jid, status="done", track_id=track_id)
finally:
with contextlib.suppress(Exception):
await anyio.Path(result.path).unlink(missing_ok=True)
async def _set_status(jid: uuid.UUID, status: str) -> None:
async with session_scope() as session:
await SqlAlchemyDownloadJobRepository(session).set_status(jid, status=status)
+7
View File
@@ -27,6 +27,9 @@ dependencies = [
"httpx>=0.28", "httpx>=0.28",
# embedded audio tag reading (enrichment tag pre-pass) # embedded audio tag reading (enrichment tag pre-pass)
"mutagen>=1.47", "mutagen>=1.47",
# youtube source: search (ytmusicapi) + download (yt-dlp)
"ytmusicapi>=1.8",
"yt-dlp>=2024.12.13",
# S3-compatible object storage # S3-compatible object storage
"aioboto3>=13.0", "aioboto3>=13.0",
# logging # logging
@@ -95,6 +98,10 @@ ignore_missing_imports = true
module = ["aioboto3.*", "aiobotocore.*", "botocore.*"] module = ["aioboto3.*", "aiobotocore.*", "botocore.*"]
ignore_missing_imports = true ignore_missing_imports = true
[[tool.mypy.overrides]]
module = ["ytmusicapi.*", "yt_dlp.*"]
ignore_missing_imports = true
[tool.pytest.ini_options] [tool.pytest.ini_options]
asyncio_mode = "auto" asyncio_mode = "auto"
testpaths = ["tests"] testpaths = ["tests"]
+1 -3
View File
@@ -79,9 +79,7 @@ async def _create_test_db_if_missing() -> None:
except Exception: except Exception:
return return
try: try:
exists = await conn.fetchval( exists = await conn.fetchval("SELECT 1 FROM pg_database WHERE datname = $1", _TEST_DB_NAME)
"SELECT 1 FROM pg_database WHERE datname = $1", _TEST_DB_NAME
)
if not exists: if not exists:
# CREATE DATABASE can't run inside a transaction; asyncpg's implicit # CREATE DATABASE can't run inside a transaction; asyncpg's implicit
# autocommit on a bare connection handles that. # autocommit on a bare connection handles that.
+222
View File
@@ -0,0 +1,222 @@
"""Unit tests for DownloadService — DB-free, in-memory fakes."""
import datetime as dt
import uuid
from pathlib import Path
import pytest
from app.application.download_service import DownloadService
from app.domain.entities import Artist, Track
from app.domain.entities.download import DownloadJob
from app.domain.sources import DownloadResult
pytestmark = pytest.mark.asyncio
class FakeArtistRepo:
async def get_or_create(self, name: str) -> Artist:
now = dt.datetime.now(dt.UTC)
return Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
class FakeTrackRepo:
def __init__(self) -> None:
self.by_source: dict[tuple[str, str], Track] = {}
self.added: list[Track] = []
async def get_by_source(self, source: str, source_id: str) -> Track | None:
return self.by_source.get((source, source_id))
async def add(self, **kw: object) -> Track:
now = dt.datetime.now(dt.UTC)
track = Track(
id=kw["id"], # type: ignore[arg-type]
title=str(kw["title"]),
artist_id=kw["artist_id"], # type: ignore[arg-type]
album_id=None,
storage_uri=str(kw["storage_uri"]),
file_format=str(kw["file_format"]),
file_size=int(kw["file_size"]), # type: ignore[call-overload]
source=str(kw["source"]),
source_id=str(kw["source_id"]),
duration_seconds=None,
genre=None,
year=None,
track_number=None,
metadata_status=str(kw["metadata_status"]),
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
self.by_source[(track.source, track.source_id)] = track
self.added.append(track)
return track
class FakeStorage:
def __init__(self) -> None:
self.saved: dict[str, Path] = {}
self.deleted: list[str] = []
async def save_file(self, key: str, src_path: Path) -> int:
self.saved[key] = src_path
return 1
async def delete(self, key: str) -> None:
self.deleted.append(key)
class FakeJobRepo:
def __init__(self) -> None:
self.jobs: dict[uuid.UUID, DownloadJob] = {}
self.active: dict[tuple[str, str], DownloadJob] = {}
def _make(self, **kw: object) -> DownloadJob:
now = dt.datetime.now(dt.UTC)
return DownloadJob(
id=uuid.uuid4(),
source=str(kw["source"]),
source_id=kw.get("source_id"), # type: ignore[arg-type]
query=kw.get("query"), # type: ignore[arg-type]
requested_by=kw.get("requested_by"), # type: ignore[arg-type]
status="queued",
progress=0.0,
error_message=None,
retry_count=0,
track_id=None,
created_at=now,
updated_at=now,
)
async def add(self, **kw: object) -> DownloadJob:
job = self._make(**kw)
self.jobs[job.id] = job
return job
async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
return self.jobs.get(job_id)
async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
return self.active.get((source, source_id))
async def set_status(self, job_id: uuid.UUID, **kw: object) -> None: ...
async def delete(self, job_id: uuid.UUID) -> None:
self.jobs.pop(job_id, None)
def _service(
*, jobs: FakeJobRepo, tracks: FakeTrackRepo, storage: FakeStorage, enqueued: list[uuid.UUID]
) -> DownloadService:
async def enqueue_download(job_id: uuid.UUID) -> None:
enqueued.append(job_id)
return DownloadService(
jobs=jobs, # type: ignore[arg-type]
tracks=tracks, # type: ignore[arg-type]
artists=FakeArtistRepo(), # type: ignore[arg-type]
storage=storage, # type: ignore[arg-type]
enqueue_download=enqueue_download,
)
def _track(source: str, source_id: str) -> Track:
now = dt.datetime.now(dt.UTC)
return Track(
id=uuid.uuid4(),
title="t",
artist_id=uuid.uuid4(),
album_id=None,
storage_uri="k",
file_format="mp3",
file_size=1,
source=source,
source_id=source_id,
duration_seconds=None,
genre=None,
year=None,
track_number=None,
metadata_status="pending",
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
async def test_request_dedups_against_library() -> None:
jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
tracks.by_source[("youtube", "abc")] = _track("youtube", "abc")
svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)
result = await svc.request(source="youtube", source_id="abc", query=None, requested_by=None)
assert result.already_in_library is True
assert result.track_id is not None
assert result.job is None
assert enq == [] # nothing enqueued
async def test_request_returns_existing_active_job() -> None:
jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
existing = await jobs.add(source="youtube", source_id="abc", query=None, requested_by=None)
jobs.active[("youtube", "abc")] = existing
svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)
result = await svc.request(source="youtube", source_id="abc", query=None, requested_by=None)
assert result.already_in_library is False
assert result.job is not None
assert result.job.id == existing.id
assert enq == [] # not re-enqueued
async def test_request_creates_and_enqueues_new_job() -> None:
jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)
result = await svc.request(
source="youtube", source_id="abc", query="bohemian", requested_by=None
)
assert result.already_in_library is False
assert result.job is not None
assert enq == [result.job.id]
async def test_store_result_imports_and_cleans_temp(tmp_path: Path) -> None:
jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)
audio = tmp_path / "abc.webm"
audio.write_bytes(b"audio" * 20)
result = DownloadResult(
source_id="abc",
path=audio,
file_format="m4a",
file_size=100,
bitrate=160,
suggested_title="Bohemian Rhapsody",
)
track_id = await svc.store_result(source="youtube", result=result, requested_by=None)
assert len(tracks.added) == 1
stored = tracks.added[0]
assert stored.id == track_id
assert stored.source == "youtube"
assert stored.source_id == "abc"
assert stored.metadata_status == "pending"
assert stored.title == "Bohemian Rhapsody"
assert len(storage.saved) == 1
assert not audio.exists() # temp file removed
+360
View File
@@ -0,0 +1,360 @@
"""Integration tests for downloads + external search.
Requires a reachable Postgres; skips otherwise. The download worker task is
invoked directly (no Redis needed) against a fake fetch source, so the full
DB + storage import path is covered without touching the network.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
import pytest
from app.core.config import get_settings
from app.domain.sources import KIND_FETCH, DownloadResult, SearchResult, SourceInfo
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from app.infrastructure.sources.registry import SourceRegistry
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
global _db_reachable_cache
if _db_reachable_cache is not None:
return _db_reachable_cache
from sqlalchemy import text
try:
async with asyncio.timeout(3):
async with get_engine().connect() as conn:
await conn.execute(text("SELECT 1"))
_db_reachable_cache = True
except Exception:
_db_reachable_cache = False
return _db_reachable_cache
class FakeFetchSource:
"""A searchable + fetchable source that writes a local file (no network)."""
name = "youtube"
def __init__(self, tmp_dir: Path) -> None:
self._tmp_dir = tmp_dir
def info(self) -> SourceInfo:
return SourceInfo(name=self.name, label="YouTube Music", kind=KIND_FETCH, available=True)
def is_available(self) -> bool:
return True
async def search(self, query: str, *, limit: int) -> list[SearchResult]:
return [
SearchResult(
source=self.name,
source_id="vid-1",
title=f"{query} song",
artist="Some Artist",
album="Some Album",
duration_seconds=200,
thumbnail_url="http://img/large.jpg",
)
]
async def fetch(self, source_id: str, *, on_progress: Any = None) -> DownloadResult:
path = self._tmp_dir / f"{source_id}.m4a"
path.write_bytes(b"downloaded audio bytes" * 8)
if on_progress is not None:
await on_progress(0.5)
return DownloadResult(
source_id=source_id,
path=path,
file_format="webm",
file_size=path.stat().st_size,
bitrate=160,
suggested_title=f"Title for {source_id}",
)
async def get_metadata(self, source_id: str) -> None:
return None
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
if not await _db_reachable():
pytest.skip("Postgres not reachable — integration test skipped.")
media = tmp_path / "media"
media.mkdir()
os.environ["MEDIA_PATH"] = str(media)
get_settings.cache_clear()
import app.infrastructure.storage.provider as _storage_provider
_storage_provider._storage = None
try:
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
from app.application.user_service import UserService
from app.core.security import Argon2PasswordHasher
async with session_scope() as session:
await UserService(
users=SqlAlchemyUserRepository(session),
refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
hasher=Argon2PasswordHasher(),
).create_user(username="admin", password="adminpass1", is_superuser=True)
from app.api.deps import get_source_registry
from app.main import create_app
app = create_app()
# Inject a fake fetch source so search/download never hit the network.
fake_registry = SourceRegistry([FakeFetchSource(tmp_path / "dl")]) # type: ignore[list-item]
(tmp_path / "dl").mkdir()
app.dependency_overrides[get_source_registry] = lambda: fake_registry
async with LifespanManager(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await dispose_engine()
finally:
_storage_provider._storage = None
os.environ.pop("MEDIA_PATH", None)
get_settings.cache_clear()
async def _login(api: AsyncClient) -> str:
resp = await api.post(
"/api/v1/auth/login", json={"username": "admin", "password": "adminpass1"}
)
assert resp.status_code == 200
return str(resp.json()["access_token"])
async def test_search_aggregates_fetch_sources(api: AsyncClient) -> None:
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
resp = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers)
assert resp.status_code == 200
body = resp.json()
assert body["searched_sources"] == ["youtube"]
assert len(body["results"]) == 1
hit = body["results"][0]
assert hit["source"] == "youtube"
assert hit["source_id"] == "vid-1"
assert hit["title"] == "queen song"
async def test_search_reports_library_status(api: AsyncClient) -> None:
"""Remote browse (plan: Model C) — a fresh hit isn't in the library; after
saving it as a placeholder, the same search reports it as such."""
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
resp = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers)
hit = resp.json()["results"][0]
assert hit["in_library"] is False
assert hit["track_id"] is None
assert hit["availability"] is None
save = await api.post(
"/api/v1/tracks/remote",
json={
"source": hit["source"],
"source_id": hit["source_id"],
"title": hit["title"],
"artist": hit["artist"],
},
headers=headers,
)
assert save.status_code == 201
saved = save.json()
assert saved["availability"] == "remote"
assert saved["file_format"] is None
resp2 = await api.get("/api/v1/search", params={"q": "queen"}, headers=headers)
hit2 = resp2.json()["results"][0]
assert hit2["in_library"] is True
assert hit2["track_id"] == saved["id"]
assert hit2["availability"] == "remote"
async def test_save_remote_is_idempotent(api: AsyncClient) -> None:
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
payload = {"source": "youtube", "source_id": "vid-idem", "title": "A", "artist": "Artist"}
first = await api.post("/api/v1/tracks/remote", json=payload, headers=headers)
second = await api.post("/api/v1/tracks/remote", json=payload, headers=headers)
assert first.status_code == 201
assert second.status_code == 201
assert first.json()["id"] == second.json()["id"]
async def test_materialize_flow(
api: AsyncClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Save a placeholder, materialize it on demand, and confirm it streams
afterwards (plan: Model C lazy materialization)."""
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
save = await api.post(
"/api/v1/tracks/remote",
json={
"source": "youtube",
"source_id": "vid-mat-1",
"title": "Materialize Me",
"artist": "Artist",
},
headers=headers,
)
track_id = save.json()["id"]
assert save.json()["availability"] == "remote"
# Streaming a placeholder before materialization fails (no audio yet).
stream_before = await api.get(f"/api/v1/stream/{track_id}", headers=headers)
assert stream_before.status_code == 404
materialize = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers)
assert materialize.status_code == 200
body = materialize.json()
assert body["job"] is not None
job_id = body["job"]["id"]
assert body["job"]["track_id"] == track_id
# A second materialize request reuses the same in-flight job.
again = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers)
assert again.json()["job"]["id"] == job_id
# Run the worker task directly (bypasses Redis) with the fake fetch source.
import app.workers.tasks.materialize_task as mat_task
worker_dir = tmp_path / "worker-mat"
worker_dir.mkdir()
fake = SourceRegistry([FakeFetchSource(worker_dir)]) # type: ignore[list-item]
monkeypatch.setattr(mat_task, "build_source_registry", lambda _settings: fake)
result = await mat_task.materialize_track({}, job_id=job_id)
assert result["status"] == "done"
assert result["track_id"] == track_id
got = await api.get(f"/api/v1/tracks/{track_id}", headers=headers)
assert got.json()["availability"] == "local"
assert got.json()["file_format"] == "webm"
# Streaming now works.
stream_after = await api.get(f"/api/v1/stream/{track_id}", headers=headers)
assert stream_after.status_code == 200
# Materializing an already-local track is a no-op.
noop = await api.post(f"/api/v1/tracks/{track_id}/materialize", headers=headers)
assert noop.json()["job"] is None
async def test_source_scoped_search(api: AsyncClient) -> None:
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
resp = await api.get("/api/v1/sources/youtube/search", params={"q": "abba"}, headers=headers)
assert resp.status_code == 200
assert resp.json()["results"][0]["title"] == "abba song"
async def test_download_create_list_and_complete(
api: AsyncClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
# Request a download — Redis is absent, so enqueue degrades but the job persists.
create = await api.post(
"/api/v1/downloads",
json={"source": "youtube", "source_id": "vid-1", "query": "queen"},
headers=headers,
)
assert create.status_code == 202
body = create.json()
assert body["already_in_library"] is False
job_id = body["job"]["id"]
assert body["job"]["status"] == "queued"
# It shows up in the listing.
listing = await api.get("/api/v1/downloads", headers=headers)
assert listing.status_code == 200
assert any(j["id"] == job_id for j in listing.json()["items"])
# A duplicate request returns the same in-flight job, not a new one.
dup = await api.post(
"/api/v1/downloads",
json={"source": "youtube", "source_id": "vid-1"},
headers=headers,
)
assert dup.json()["job"]["id"] == job_id
# Run the worker task directly (bypasses Redis) with the fake fetch source.
import app.workers.tasks.download_task as dl_task
worker_dl = tmp_path / "worker-dl"
worker_dl.mkdir()
fake = SourceRegistry([FakeFetchSource(worker_dl)]) # type: ignore[list-item]
monkeypatch.setattr(dl_task, "build_source_registry", lambda _settings: fake)
result = await dl_task.download_track({}, job_id=job_id)
assert result["status"] == "done"
track_id = result["track_id"]
# The job is now done and linked to the imported track.
got = await api.get(f"/api/v1/downloads/{job_id}", headers=headers)
assert got.json()["status"] == "done"
assert got.json()["track_id"] == track_id
# The imported track streams back.
stream = await api.get(f"/api/v1/stream/{track_id}", headers=headers)
assert stream.status_code == 200
assert len(stream.content) > 0
# A new request for the same item now dedups against the library.
again = await api.post(
"/api/v1/downloads",
json={"source": "youtube", "source_id": "vid-1"},
headers=headers,
)
assert again.json()["already_in_library"] is True
assert again.json()["track_id"] == track_id
async def test_cancel_download(api: AsyncClient) -> None:
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
create = await api.post(
"/api/v1/downloads",
json={"source": "youtube", "source_id": "vid-cancel"},
headers=headers,
)
job_id = create.json()["job"]["id"]
cancel = await api.delete(f"/api/v1/downloads/{job_id}", headers=headers)
assert cancel.status_code == 204
got = await api.get(f"/api/v1/downloads/{job_id}", headers=headers)
assert got.status_code == 404
+10 -1
View File
@@ -20,7 +20,14 @@ class FakeArtistRepo:
async def get_or_create(self, name: str) -> Artist: async def get_or_create(self, name: str) -> Artist:
if name not in self._by_name: if name not in self._by_name:
now = dt.datetime.now(dt.UTC) now = dt.datetime.now(dt.UTC)
self._by_name[name] = Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now) self._by_name[name] = Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
return self._by_name[name] return self._by_name[name]
@@ -55,6 +62,7 @@ class FakeTrackRepo:
metadata_status=str(kw["metadata_status"]), metadata_status=str(kw["metadata_status"]),
metadata_error=None, metadata_error=None,
enriched_at=None, enriched_at=None,
availability="local",
created_at=now, created_at=now,
updated_at=now, updated_at=now,
) )
@@ -139,6 +147,7 @@ async def test_dedup_skips_already_imported() -> None:
metadata_status="pending", metadata_status="pending",
metadata_error=None, metadata_error=None,
enriched_at=None, enriched_at=None,
availability="local",
created_at=now, created_at=now,
updated_at=now, updated_at=now,
) )
+156
View File
@@ -0,0 +1,156 @@
"""Integration tests for the lazy-materialization foundation:
``TrackRepository.materialize`` and ``Album``/``ArtistRepository.get_or_create_remote``.
Requires a reachable Postgres; skips otherwise (same pattern as
``test_upload_stream_api.py``).
"""
import asyncio
import uuid
from collections.abc import AsyncIterator
import pytest
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.models.enums import TrackAvailability
from app.infrastructure.db.repositories import (
SqlAlchemyAlbumRepository,
SqlAlchemyArtistRepository,
SqlAlchemyTrackRepository,
)
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
global _db_reachable_cache
if _db_reachable_cache is not None:
return _db_reachable_cache
from sqlalchemy import text
try:
async with asyncio.timeout(3):
async with get_engine().connect() as conn:
await conn.execute(text("SELECT 1"))
_db_reachable_cache = True
except Exception:
_db_reachable_cache = False
return _db_reachable_cache
@pytest.fixture
async def db() -> AsyncIterator[None]:
if not await _db_reachable():
pytest.skip("Postgres not reachable — integration test skipped.")
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
yield None
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await dispose_engine()
async def test_placeholder_track_materializes_in_place(db: None) -> None:
"""A remote placeholder (no storage) gets its audio fields filled in by
``materialize`` without changing ``track.id`` — the stable content id that
likes/playlists/queue may already reference."""
async with session_scope() as session:
artists = SqlAlchemyArtistRepository(session)
tracks = SqlAlchemyTrackRepository(session)
artist = await artists.get_or_create("Some Artist")
track_id = uuid.uuid4()
placeholder = await tracks.add(
id=track_id,
title="Remote Track",
artist_id=artist.id,
storage_uri=None,
file_format=None,
file_size=None,
source="youtube",
source_id="abc123",
metadata_status="pending",
added_by=None,
availability=TrackAvailability.REMOTE.value,
)
assert placeholder.availability == "remote"
assert placeholder.storage_uri is None
materialized = await tracks.materialize(
track_id,
storage_uri="tracks/ab/abc123.m4a",
file_format="m4a",
file_size=12345,
bitrate=160,
)
assert materialized.id == track_id
assert materialized.availability == "local"
assert materialized.storage_uri == "tracks/ab/abc123.m4a"
assert materialized.file_format == "m4a"
assert materialized.file_size == 12345
async def test_artist_get_or_create_remote_dedups_by_remote_id(db: None) -> None:
async with session_scope() as session:
artists = SqlAlchemyArtistRepository(session)
first = await artists.get_or_create_remote(
name="Daft Punk", source="youtube", source_id="UCabc"
)
again = await artists.get_or_create_remote(
name="Daft Punk (different display name)", source="youtube", source_id="UCabc"
)
assert first.id == again.id
assert again.source == "youtube"
assert again.source_id == "UCabc"
async def test_artist_get_or_create_remote_binds_existing_local_artist(db: None) -> None:
async with session_scope() as session:
artists = SqlAlchemyArtistRepository(session)
local = await artists.get_or_create("Pink Floyd")
remote = await artists.get_or_create_remote(
name="Pink Floyd", source="youtube", source_id="UCxyz"
)
assert remote.id == local.id
assert remote.source == "youtube"
assert remote.source_id == "UCxyz"
async def test_album_get_or_create_remote_dedups_by_remote_id(db: None) -> None:
async with session_scope() as session:
artists = SqlAlchemyArtistRepository(session)
albums = SqlAlchemyAlbumRepository(session)
artist = await artists.get_or_create("Daft Punk")
first = await albums.get_or_create_remote(
title="Discovery",
artist_id=artist.id,
year=2001,
musicbrainz_id=None,
source="youtube",
source_id="MPREb_abc",
)
again = await albums.get_or_create_remote(
title="Discovery",
artist_id=artist.id,
year=None,
musicbrainz_id=None,
source="youtube",
source_id="MPREb_abc",
)
assert first.id == again.id
assert again.source == "youtube"
assert again.source_id == "MPREb_abc"
assert again.year == 2001
+7 -2
View File
@@ -48,12 +48,17 @@ def test_info_reports_kind_and_availability(tmp_path: Path) -> None:
def test_registry_registers_local_when_path_set(tmp_path: Path) -> None: def test_registry_registers_local_when_path_set(tmp_path: Path) -> None:
registry = build_source_registry(_settings(local_media_import_path=tmp_path)) # Disable youtube to isolate the local-source registration under test.
registry = build_source_registry(
_settings(local_media_import_path=tmp_path, youtube_enabled=False)
)
names = {info.name for info in registry.infos()} names = {info.name for info in registry.infos()}
assert names == {"local"} assert names == {"local"}
assert registry.indexable("local").is_available() is True assert registry.indexable("local").is_available() is True
def test_registry_empty_when_path_unset() -> None: def test_registry_empty_when_path_unset() -> None:
registry = build_source_registry(_settings(local_media_import_path=None)) registry = build_source_registry(
_settings(local_media_import_path=None, youtube_enabled=False)
)
assert registry.infos() == [] assert registry.infos() == []
+1 -3
View File
@@ -107,9 +107,7 @@ async def test_track_out_includes_genre_year_track_number(api: AsyncClient) -> N
token = await _login(api) token = await _login(api)
track_id = await _upload(api, token) track_id = await _upload(api, token)
resp = await api.get( resp = await api.get(f"/api/v1/tracks/{track_id}", headers={"Authorization": f"Bearer {token}"})
f"/api/v1/tracks/{track_id}", headers={"Authorization": f"Bearer {token}"}
)
assert resp.status_code == 200, resp.text assert resp.status_code == 200, resp.text
body = resp.json() body = resp.json()
assert "genre" in body assert "genre" in body
+11 -1
View File
@@ -42,6 +42,7 @@ def _track(*, metadata_status: str = "pending", title: str = "raw-stem") -> Trac
metadata_status=metadata_status, metadata_status=metadata_status,
metadata_error=None, metadata_error=None,
enriched_at=None, enriched_at=None,
availability="local",
created_at=now, created_at=now,
updated_at=now, updated_at=now,
) )
@@ -67,7 +68,14 @@ class FakeArtistRepo:
async def get_or_create(self, name: str) -> Artist: async def get_or_create(self, name: str) -> Artist:
self.created.append(name) self.created.append(name)
now = dt.datetime.now(dt.UTC) now = dt.datetime.now(dt.UTC)
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now) return Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
class FakeAlbumRepo: class FakeAlbumRepo:
@@ -88,6 +96,8 @@ class FakeAlbumRepo:
year=year, year=year,
cover_path=self._existing_cover, cover_path=self._existing_cover,
musicbrainz_id=musicbrainz_id, musicbrainz_id=musicbrainz_id,
source=None,
source_id=None,
created_at=now, created_at=now,
updated_at=now, updated_at=now,
) )
+11 -1
View File
@@ -149,6 +149,7 @@ def _pending_track() -> Track:
metadata_status="pending", metadata_status="pending",
metadata_error=None, metadata_error=None,
enriched_at=None, enriched_at=None,
availability="local",
created_at=now, created_at=now,
updated_at=now, updated_at=now,
) )
@@ -180,7 +181,14 @@ class _FakeArtistRepo:
async def get_or_create(self, name: str) -> Artist: async def get_or_create(self, name: str) -> Artist:
self.created.append(name) self.created.append(name)
now = datetime.now(UTC) now = datetime.now(UTC)
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now) return Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
@dataclass @dataclass
@@ -199,6 +207,8 @@ class _FakeAlbumRepo:
year=year, year=year,
cover_path=None, cover_path=None,
musicbrainz_id=musicbrainz_id, musicbrainz_id=musicbrainz_id,
source=None,
source_id=None,
created_at=now, created_at=now,
updated_at=now, updated_at=now,
) )
+255
View File
@@ -0,0 +1,255 @@
"""Unit tests for RemoteLibraryService — DB-free, in-memory fakes (plan: Model C
remote browse + lazy materialization)."""
import datetime as dt
import uuid
import pytest
from app.application.remote_library_service import RemoteLibraryService
from app.domain.entities import Artist, DownloadJob, Track
from app.domain.errors import NotFoundError, ValidationError
pytestmark = pytest.mark.asyncio
class FakeArtistRepo:
def __init__(self) -> None:
self._by_name: dict[str, Artist] = {}
async def get_or_create(self, name: str) -> Artist:
if name not in self._by_name:
now = dt.datetime.now(dt.UTC)
self._by_name[name] = Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
return self._by_name[name]
class FakeTrackRepo:
def __init__(self) -> None:
self.by_id: dict[uuid.UUID, Track] = {}
self.by_source: dict[tuple[str, str], Track] = {}
async def get_by_id(self, track_id: uuid.UUID) -> Track | None:
return self.by_id.get(track_id)
async def get_by_source(self, source: str, source_id: str) -> Track | None:
return self.by_source.get((source, source_id))
async def add(self, **kw: object) -> Track:
now = dt.datetime.now(dt.UTC)
track = Track(
id=kw["id"], # type: ignore[arg-type]
title=str(kw["title"]),
artist_id=kw["artist_id"], # type: ignore[arg-type]
album_id=None,
storage_uri=kw["storage_uri"], # type: ignore[arg-type]
file_format=kw["file_format"], # type: ignore[arg-type]
file_size=kw["file_size"], # type: ignore[arg-type]
source=str(kw["source"]),
source_id=str(kw["source_id"]),
duration_seconds=None,
genre=None,
year=None,
track_number=None,
metadata_status=str(kw["metadata_status"]),
metadata_error=None,
enriched_at=None,
availability=str(kw["availability"]),
created_at=now,
updated_at=now,
)
self.by_id[track.id] = track
self.by_source[(track.source, track.source_id)] = track
return track
def _local_track(source: str = "youtube", source_id: str = "local-1") -> Track:
now = dt.datetime.now(dt.UTC)
return Track(
id=uuid.uuid4(),
title="Already Here",
artist_id=uuid.uuid4(),
album_id=None,
storage_uri="tracks/aa/aa.m4a",
file_format="m4a",
file_size=123,
source=source,
source_id=source_id,
duration_seconds=None,
genre=None,
year=None,
track_number=None,
metadata_status="pending",
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
class FakeJobRepo:
def __init__(self) -> None:
self.jobs: dict[uuid.UUID, DownloadJob] = {}
self.active: dict[tuple[str, str], DownloadJob] = {}
async def add(self, **kw: object) -> DownloadJob:
now = dt.datetime.now(dt.UTC)
job = DownloadJob(
id=uuid.uuid4(),
source=str(kw["source"]),
source_id=kw.get("source_id"), # type: ignore[arg-type]
query=kw.get("query"), # type: ignore[arg-type]
requested_by=kw.get("requested_by"), # type: ignore[arg-type]
status="queued",
progress=0.0,
error_message=None,
retry_count=0,
track_id=None,
created_at=now,
updated_at=now,
)
self.jobs[job.id] = job
return job
async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
return self.jobs.get(job_id)
async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
return self.active.get((source, source_id))
async def set_status(self, job_id: uuid.UUID, **kw: object) -> None:
job = self.jobs[job_id]
track_id = kw.get("track_id")
if track_id is not None:
self.jobs[job_id] = DownloadJob(
id=job.id,
source=job.source,
source_id=job.source_id,
query=job.query,
requested_by=job.requested_by,
status=str(kw.get("status", job.status)),
progress=job.progress,
error_message=job.error_message,
retry_count=job.retry_count,
track_id=track_id, # type: ignore[arg-type]
created_at=job.created_at,
updated_at=job.updated_at,
)
def _service(
*,
tracks: FakeTrackRepo,
artists: FakeArtistRepo,
jobs: FakeJobRepo,
enqueued: list[uuid.UUID],
) -> RemoteLibraryService:
async def enqueue_materialize(job_id: uuid.UUID) -> None:
enqueued.append(job_id)
return RemoteLibraryService(
tracks=tracks, # type: ignore[arg-type]
artists=artists, # type: ignore[arg-type]
jobs=jobs, # type: ignore[arg-type]
enqueue_materialize=enqueue_materialize,
)
async def test_save_remote_creates_placeholder() -> None:
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
track = await service.save_remote(
source="youtube", source_id="abc", title="Bohemian Rhapsody", artist="Queen", added_by=None
)
assert track.availability == "remote"
assert track.storage_uri is None
assert track.file_format is None
assert track.source == "youtube"
assert track.source_id == "abc"
async def test_save_remote_is_idempotent() -> None:
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
first = await service.save_remote(
source="youtube", source_id="abc", title="A", artist="Queen", added_by=None
)
second = await service.save_remote(
source="youtube", source_id="abc", title="B", artist="Other", added_by=None
)
assert first.id == second.id
assert second.title == "A" # untouched by the second call
async def test_materialize_already_local_is_noop() -> None:
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
track = _local_track()
tracks.by_id[track.id] = track
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
outcome = await service.request_materialize(track.id, requested_by=None)
assert outcome.job is None
assert outcome.track.id == track.id
assert enq == []
async def test_materialize_remote_creates_and_enqueues_job() -> None:
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
track = await service.save_remote(
source="youtube", source_id="abc", title="A", artist="Queen", added_by=None
)
outcome = await service.request_materialize(track.id, requested_by=None)
assert outcome.job is not None
assert outcome.job.source == "youtube"
assert outcome.job.source_id == "abc"
assert outcome.job.track_id == track.id
assert enq == [outcome.job.id]
async def test_materialize_reuses_active_job() -> None:
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
track = await service.save_remote(
source="youtube", source_id="abc", title="A", artist="Queen", added_by=None
)
existing = await jobs.add(source="youtube", source_id="abc", query=None, requested_by=None)
jobs.active[("youtube", "abc")] = existing
outcome = await service.request_materialize(track.id, requested_by=None)
assert outcome.job is not None
assert outcome.job.id == existing.id
assert enq == [] # not re-enqueued
async def test_materialize_missing_track_raises() -> None:
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
with pytest.raises(NotFoundError):
await service.request_materialize(uuid.uuid4(), requested_by=None)
async def test_save_remote_requires_source_id() -> None:
tracks, artists, jobs, enq = FakeTrackRepo(), FakeArtistRepo(), FakeJobRepo(), []
service = _service(tracks=tracks, artists=artists, jobs=jobs, enqueued=enq)
with pytest.raises(ValidationError):
await service.save_remote(
source="youtube", source_id=" ", title="A", artist=None, added_by=None
)
+22
View File
@@ -190,3 +190,25 @@ async def test_upload_requires_auth(api: AsyncClient) -> None:
files={"file": ("x.mp3", b"data", "audio/mpeg")}, files={"file": ("x.mp3", b"data", "audio/mpeg")},
) )
assert resp.status_code == 401 assert resp.status_code == 401
async def test_list_tracks_filters_by_source(api: AsyncClient) -> None:
# Uploaded tracks carry source="upload"; `?source=` narrows the list (this
# powers the webui's "Recently uploaded" view, which survives a refresh).
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
await api.post(
"/api/v1/upload",
files={"file": ("uploaded.mp3", b"upload bytes" * 80, "audio/mpeg")},
headers=headers,
)
hit = await api.get("/api/v1/tracks", params={"source": "upload"}, headers=headers)
assert hit.status_code == 200, hit.text
items = hit.json()["items"]
assert len(items) == 1
assert items[0]["source"] == "upload"
miss = await api.get("/api/v1/tracks", params={"source": "youtube"}, headers=headers)
assert miss.status_code == 200
assert miss.json()["items"] == []
+135
View File
@@ -0,0 +1,135 @@
"""Unit tests for YouTubeMusicSource + registry (no network, injected libs)."""
from pathlib import Path
from typing import Any
import pytest
from app.core.config import Settings
from app.domain.sources import KIND_FETCH
from app.infrastructure.sources.registry import build_source_registry
from app.infrastructure.sources.youtube import YouTubeMusicSource
pytestmark = pytest.mark.asyncio
def _song_row(**overrides: Any) -> dict[str, Any]:
row: dict[str, Any] = {
"videoId": "abc123",
"title": "Bohemian Rhapsody",
"artists": [{"name": "Queen", "id": "a1"}],
"album": {"name": "A Night at the Opera", "id": "al1"},
"duration_seconds": 354,
"thumbnails": [
{"url": "http://img/small.jpg", "width": 60, "height": 60},
{"url": "http://img/large.jpg", "width": 240, "height": 240},
],
}
row.update(overrides)
return row
def _settings(**overrides: object) -> Settings:
return Settings(**overrides) # type: ignore[arg-type]
async def test_search_maps_ytmusic_rows() -> None:
source = YouTubeMusicSource(search_fn=lambda q, limit: [_song_row()])
[result] = await source.search("queen", limit=10)
assert result.source == "youtube"
assert result.source_id == "abc123"
assert result.title == "Bohemian Rhapsody"
assert result.artist == "Queen"
assert result.album == "A Night at the Opera"
assert result.duration_seconds == 354
assert result.thumbnail_url == "http://img/large.jpg" # last (largest)
async def test_search_joins_multiple_artists_and_tolerates_missing_fields() -> None:
row = _song_row(
artists=[{"name": "Queen"}, {"name": "David Bowie"}],
album=None,
thumbnails=[],
duration_seconds=None,
)
source = YouTubeMusicSource(search_fn=lambda q, limit: [row])
[result] = await source.search("under pressure", limit=10)
assert result.artist == "Queen, David Bowie"
assert result.album is None
assert result.thumbnail_url is None
assert result.duration_seconds is None
async def test_search_drops_rows_without_video_id() -> None:
rows = [_song_row(), _song_row(videoId=None), _song_row(videoId="xyz")]
source = YouTubeMusicSource(search_fn=lambda q, limit: rows)
results = await source.search("q", limit=10)
assert [r.source_id for r in results] == ["abc123", "xyz"]
async def test_search_empty_query_short_circuits() -> None:
called = False
def _search(q: str, limit: int) -> list[dict[str, Any]]:
nonlocal called
called = True
return []
source = YouTubeMusicSource(search_fn=_search)
assert await source.search(" ", limit=10) == []
assert called is False
async def test_search_degrades_to_empty_on_error() -> None:
def _boom(q: str, limit: int) -> list[dict[str, Any]]:
raise RuntimeError("service down")
source = YouTubeMusicSource(search_fn=_boom)
assert await source.search("q", limit=10) == []
async def test_fetch_maps_download_result(tmp_path: Path) -> None:
audio = tmp_path / "abc123.m4a"
audio.write_bytes(b"opus-bytes" * 10)
def _download(video_id: str, tmp_dir: Path, hook: Any, cookies: Path | None) -> dict[str, Any]:
return {
"filepath": audio,
"file_format": "m4a",
"bitrate": 160,
"title": "Bohemian Rhapsody",
}
source = YouTubeMusicSource(download_fn=_download)
result = await source.fetch("abc123")
assert result.source_id == "abc123"
assert result.path == audio
assert result.file_format == "m4a"
assert result.file_size == len(b"opus-bytes" * 10)
assert result.bitrate == 160
assert result.suggested_title == "Bohemian Rhapsody"
async def test_info_and_availability_with_injected_fn() -> None:
source = YouTubeMusicSource(search_fn=lambda q, limit: [])
info = source.info()
assert info.name == "youtube"
assert info.kind == KIND_FETCH
assert info.available is True # injected fn → treated as available
async def test_registry_registers_youtube_when_enabled() -> None:
registry = build_source_registry(_settings(youtube_enabled=True))
names = {info.name for info in registry.infos()}
assert "youtube" in names
# youtube is searchable + fetchable, not indexable
assert registry.searchable("youtube").name == "youtube"
assert registry.fetchable("youtube").name == "youtube"
async def test_registry_omits_youtube_when_disabled() -> None:
registry = build_source_registry(_settings(youtube_enabled=False))
names = {info.name for info in registry.infos()}
assert "youtube" not in names
Generated
+836 -762
View File
File diff suppressed because it is too large Load Diff