Files
mcma-backend/tests/test_download_service.py
Senko-san 58b98ab5ed
Docker Build & Publish / build (push) Successful in 1m10s
Docker Build & Publish / push (push) Failing after 7s
Docker Build & Publish / Prune old image versions (push) Has been skipped
feat(library): lazy materialization foundation for remote tracks (§Phase1)
Adds nullable storage fields + availability column on tracks, remote
source/source_id identity on albums/artists, TrackRepository.materialize()
and get_or_create_remote() repos — groundwork for on-demand YTM library
(placeholders saved without audio, materialized in-place on first play).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 17:51:43 +03:00

223 lines
7.1 KiB
Python

"""Unit tests for DownloadService — DB-free, in-memory fakes."""
import datetime as dt
import uuid
from pathlib import Path
import pytest
from app.application.download_service import DownloadService
from app.domain.entities import Artist, Track
from app.domain.entities.download import DownloadJob
from app.domain.sources import DownloadResult
pytestmark = pytest.mark.asyncio
class FakeArtistRepo:
async def get_or_create(self, name: str) -> Artist:
now = dt.datetime.now(dt.UTC)
return Artist(
id=uuid.uuid4(),
name=name,
source=None,
source_id=None,
created_at=now,
updated_at=now,
)
class FakeTrackRepo:
def __init__(self) -> None:
self.by_source: dict[tuple[str, str], Track] = {}
self.added: list[Track] = []
async def get_by_source(self, source: str, source_id: str) -> Track | None:
return self.by_source.get((source, source_id))
async def add(self, **kw: object) -> Track:
now = dt.datetime.now(dt.UTC)
track = Track(
id=kw["id"], # type: ignore[arg-type]
title=str(kw["title"]),
artist_id=kw["artist_id"], # type: ignore[arg-type]
album_id=None,
storage_uri=str(kw["storage_uri"]),
file_format=str(kw["file_format"]),
file_size=int(kw["file_size"]), # type: ignore[call-overload]
source=str(kw["source"]),
source_id=str(kw["source_id"]),
duration_seconds=None,
genre=None,
year=None,
track_number=None,
metadata_status=str(kw["metadata_status"]),
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
self.by_source[(track.source, track.source_id)] = track
self.added.append(track)
return track
class FakeStorage:
def __init__(self) -> None:
self.saved: dict[str, Path] = {}
self.deleted: list[str] = []
async def save_file(self, key: str, src_path: Path) -> int:
self.saved[key] = src_path
return 1
async def delete(self, key: str) -> None:
self.deleted.append(key)
class FakeJobRepo:
def __init__(self) -> None:
self.jobs: dict[uuid.UUID, DownloadJob] = {}
self.active: dict[tuple[str, str], DownloadJob] = {}
def _make(self, **kw: object) -> DownloadJob:
now = dt.datetime.now(dt.UTC)
return DownloadJob(
id=uuid.uuid4(),
source=str(kw["source"]),
source_id=kw.get("source_id"), # type: ignore[arg-type]
query=kw.get("query"), # type: ignore[arg-type]
requested_by=kw.get("requested_by"), # type: ignore[arg-type]
status="queued",
progress=0.0,
error_message=None,
retry_count=0,
track_id=None,
created_at=now,
updated_at=now,
)
async def add(self, **kw: object) -> DownloadJob:
job = self._make(**kw)
self.jobs[job.id] = job
return job
async def get_by_id(self, job_id: uuid.UUID) -> DownloadJob | None:
return self.jobs.get(job_id)
async def get_active_for_source(self, source: str, source_id: str) -> DownloadJob | None:
return self.active.get((source, source_id))
async def set_status(self, job_id: uuid.UUID, **kw: object) -> None: ...
async def delete(self, job_id: uuid.UUID) -> None:
self.jobs.pop(job_id, None)
def _service(
*, jobs: FakeJobRepo, tracks: FakeTrackRepo, storage: FakeStorage, enqueued: list[uuid.UUID]
) -> DownloadService:
async def enqueue_download(job_id: uuid.UUID) -> None:
enqueued.append(job_id)
return DownloadService(
jobs=jobs, # type: ignore[arg-type]
tracks=tracks, # type: ignore[arg-type]
artists=FakeArtistRepo(), # type: ignore[arg-type]
storage=storage, # type: ignore[arg-type]
enqueue_download=enqueue_download,
)
def _track(source: str, source_id: str) -> Track:
now = dt.datetime.now(dt.UTC)
return Track(
id=uuid.uuid4(),
title="t",
artist_id=uuid.uuid4(),
album_id=None,
storage_uri="k",
file_format="mp3",
file_size=1,
source=source,
source_id=source_id,
duration_seconds=None,
genre=None,
year=None,
track_number=None,
metadata_status="pending",
metadata_error=None,
enriched_at=None,
availability="local",
created_at=now,
updated_at=now,
)
async def test_request_dedups_against_library() -> None:
jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
tracks.by_source[("youtube", "abc")] = _track("youtube", "abc")
svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)
result = await svc.request(source="youtube", source_id="abc", query=None, requested_by=None)
assert result.already_in_library is True
assert result.track_id is not None
assert result.job is None
assert enq == [] # nothing enqueued
async def test_request_returns_existing_active_job() -> None:
jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
existing = await jobs.add(source="youtube", source_id="abc", query=None, requested_by=None)
jobs.active[("youtube", "abc")] = existing
svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)
result = await svc.request(source="youtube", source_id="abc", query=None, requested_by=None)
assert result.already_in_library is False
assert result.job is not None
assert result.job.id == existing.id
assert enq == [] # not re-enqueued
async def test_request_creates_and_enqueues_new_job() -> None:
jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)
result = await svc.request(
source="youtube", source_id="abc", query="bohemian", requested_by=None
)
assert result.already_in_library is False
assert result.job is not None
assert enq == [result.job.id]
async def test_store_result_imports_and_cleans_temp(tmp_path: Path) -> None:
jobs, tracks, storage, enq = FakeJobRepo(), FakeTrackRepo(), FakeStorage(), []
svc = _service(jobs=jobs, tracks=tracks, storage=storage, enqueued=enq)
audio = tmp_path / "abc.webm"
audio.write_bytes(b"audio" * 20)
result = DownloadResult(
source_id="abc",
path=audio,
file_format="m4a",
file_size=100,
bitrate=160,
suggested_title="Bohemian Rhapsody",
)
track_id = await svc.store_result(source="youtube", result=result, requested_by=None)
assert len(tracks.added) == 1
stored = tracks.added[0]
assert stored.id == track_id
assert stored.source == "youtube"
assert stored.source_id == "abc"
assert stored.metadata_status == "pending"
assert stored.title == "Bohemian Rhapsody"
assert len(storage.saved) == 1
assert not audio.exists() # temp file removed