58b98ab5ed
Adds nullable storage fields + availability column on tracks, remote source/source_id identity on albums/artists, TrackRepository.materialize() and get_or_create_remote() repos — groundwork for on-demand YTM library (placeholders saved without audio, materialized in-place on first play). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
173 lines
5.5 KiB
Python
173 lines
5.5 KiB
Python
"""Unit tests for LibraryImportService — DB-free, in-memory fakes."""
|
|
|
|
import datetime as dt
|
|
import uuid
|
|
from collections.abc import Iterator
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from app.application.import_service import LibraryImportService
|
|
from app.domain.entities import Artist, Track
|
|
from app.domain.sources import SourceFile, SourceInfo
|
|
|
|
pytestmark = pytest.mark.asyncio
|
|
|
|
|
|
class FakeArtistRepo:
|
|
def __init__(self) -> None:
|
|
self._by_name: dict[str, Artist] = {}
|
|
|
|
async def get_or_create(self, name: str) -> Artist:
|
|
if name not in self._by_name:
|
|
now = dt.datetime.now(dt.UTC)
|
|
self._by_name[name] = Artist(
|
|
id=uuid.uuid4(),
|
|
name=name,
|
|
source=None,
|
|
source_id=None,
|
|
created_at=now,
|
|
updated_at=now,
|
|
)
|
|
return self._by_name[name]
|
|
|
|
|
|
class FakeTrackRepo:
|
|
def __init__(self, *, fail_on: set[str] | None = None) -> None:
|
|
self.by_source: dict[tuple[str, str], Track] = {}
|
|
self.added: list[Track] = []
|
|
self._fail_on = fail_on or set()
|
|
|
|
async def get_by_source(self, source: str, source_id: str) -> Track | None:
|
|
return self.by_source.get((source, source_id))
|
|
|
|
async def add(self, **kw: object) -> Track:
|
|
source_id = str(kw["source_id"])
|
|
if source_id in self._fail_on:
|
|
raise RuntimeError("simulated add failure")
|
|
now = dt.datetime.now(dt.UTC)
|
|
track = Track(
|
|
id=uuid.UUID(str(kw["id"])) if not isinstance(kw["id"], uuid.UUID) else kw["id"],
|
|
title=str(kw["title"]),
|
|
artist_id=kw["artist_id"], # type: ignore[arg-type]
|
|
album_id=None,
|
|
storage_uri=str(kw["storage_uri"]),
|
|
file_format=str(kw["file_format"]),
|
|
file_size=int(kw["file_size"]), # type: ignore[call-overload]
|
|
source=str(kw["source"]),
|
|
source_id=source_id,
|
|
duration_seconds=None,
|
|
genre=None,
|
|
year=None,
|
|
track_number=None,
|
|
metadata_status=str(kw["metadata_status"]),
|
|
metadata_error=None,
|
|
enriched_at=None,
|
|
availability="local",
|
|
created_at=now,
|
|
updated_at=now,
|
|
)
|
|
self.by_source[(track.source, track.source_id)] = track
|
|
self.added.append(track)
|
|
return track
|
|
|
|
|
|
class FakeStorage:
|
|
def __init__(self) -> None:
|
|
self.saved: dict[str, Path] = {}
|
|
self.deleted: list[str] = []
|
|
|
|
async def save_file(self, key: str, src_path: Path) -> int:
|
|
self.saved[key] = src_path
|
|
return 1
|
|
|
|
async def delete(self, key: str) -> None:
|
|
self.deleted.append(key)
|
|
|
|
|
|
class FakeSource:
|
|
name = "local"
|
|
|
|
def __init__(self, files: list[SourceFile]) -> None:
|
|
self._files = files
|
|
|
|
def info(self) -> SourceInfo:
|
|
return SourceInfo(name=self.name, label="Local", kind="indexable", available=True)
|
|
|
|
def is_available(self) -> bool:
|
|
return True
|
|
|
|
def scan(self) -> Iterator[SourceFile]:
|
|
yield from self._files
|
|
|
|
|
|
def _file(source_id: str) -> SourceFile:
|
|
return SourceFile(
|
|
source_id=source_id,
|
|
path=Path("/music") / source_id,
|
|
suggested_title=Path(source_id).stem,
|
|
file_format="mp3",
|
|
file_size=123,
|
|
)
|
|
|
|
|
|
def _service(tracks: FakeTrackRepo, storage: FakeStorage) -> LibraryImportService:
|
|
return LibraryImportService(tracks=tracks, artists=FakeArtistRepo(), storage=storage) # type: ignore[arg-type]
|
|
|
|
|
|
async def test_imports_new_files() -> None:
|
|
tracks, storage = FakeTrackRepo(), FakeStorage()
|
|
source = FakeSource([_file("a.mp3"), _file("b/c.mp3")])
|
|
|
|
summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type]
|
|
|
|
assert (summary.seen, summary.imported, summary.skipped, summary.failed) == (2, 2, 0, 0)
|
|
assert len(tracks.added) == 2
|
|
assert len(storage.saved) == 2
|
|
assert all(t.metadata_status == "pending" for t in tracks.added)
|
|
assert all(t.source == "local" for t in tracks.added)
|
|
|
|
|
|
async def test_dedup_skips_already_imported() -> None:
|
|
tracks, storage = FakeTrackRepo(), FakeStorage()
|
|
now = dt.datetime.now(dt.UTC)
|
|
tracks.by_source[("local", "a.mp3")] = Track(
|
|
id=uuid.uuid4(),
|
|
title="a",
|
|
artist_id=uuid.uuid4(),
|
|
album_id=None,
|
|
storage_uri="k",
|
|
file_format="mp3",
|
|
file_size=1,
|
|
source="local",
|
|
source_id="a.mp3",
|
|
duration_seconds=None,
|
|
genre=None,
|
|
year=None,
|
|
track_number=None,
|
|
metadata_status="pending",
|
|
metadata_error=None,
|
|
enriched_at=None,
|
|
availability="local",
|
|
created_at=now,
|
|
updated_at=now,
|
|
)
|
|
source = FakeSource([_file("a.mp3"), _file("new.mp3")])
|
|
|
|
summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type]
|
|
|
|
assert (summary.imported, summary.skipped) == (1, 1)
|
|
assert len(storage.saved) == 1 # only the new file copied
|
|
|
|
|
|
async def test_per_file_failure_is_isolated_and_rolls_back_storage() -> None:
|
|
tracks = FakeTrackRepo(fail_on={"bad.mp3"})
|
|
storage = FakeStorage()
|
|
source = FakeSource([_file("good.mp3"), _file("bad.mp3")])
|
|
|
|
summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type]
|
|
|
|
assert (summary.seen, summary.imported, summary.failed) == (2, 1, 1)
|
|
# The failed import's copied file was cleaned up; the good one stays.
|
|
assert len(storage.deleted) == 1
|
|
assert len(tracks.added) == 1
|