"""Unit tests for LibraryImportService — DB-free, in-memory fakes.""" import datetime as dt import uuid from collections.abc import Iterator from pathlib import Path import pytest from app.application.import_service import LibraryImportService from app.domain.entities import Artist, Track from app.domain.sources import SourceFile, SourceInfo pytestmark = pytest.mark.asyncio class FakeArtistRepo: def __init__(self) -> None: self._by_name: dict[str, Artist] = {} async def get_or_create(self, name: str) -> Artist: if name not in self._by_name: now = dt.datetime.now(dt.UTC) self._by_name[name] = Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now) return self._by_name[name] class FakeTrackRepo: def __init__(self, *, fail_on: set[str] | None = None) -> None: self.by_source: dict[tuple[str, str], Track] = {} self.added: list[Track] = [] self._fail_on = fail_on or set() async def get_by_source(self, source: str, source_id: str) -> Track | None: return self.by_source.get((source, source_id)) async def add(self, **kw: object) -> Track: source_id = str(kw["source_id"]) if source_id in self._fail_on: raise RuntimeError("simulated add failure") now = dt.datetime.now(dt.UTC) track = Track( id=uuid.UUID(str(kw["id"])) if not isinstance(kw["id"], uuid.UUID) else kw["id"], title=str(kw["title"]), artist_id=kw["artist_id"], # type: ignore[arg-type] album_id=None, storage_uri=str(kw["storage_uri"]), file_format=str(kw["file_format"]), file_size=int(kw["file_size"]), # type: ignore[call-overload] source=str(kw["source"]), source_id=source_id, duration_seconds=None, genre=None, year=None, track_number=None, metadata_status=str(kw["metadata_status"]), metadata_error=None, enriched_at=None, created_at=now, updated_at=now, ) self.by_source[(track.source, track.source_id)] = track self.added.append(track) return track class FakeStorage: def __init__(self) -> None: self.saved: dict[str, Path] = {} self.deleted: list[str] = [] async def save_file(self, key: str, src_path: Path) -> int: self.saved[key] = src_path return 1 async def delete(self, key: str) -> None: self.deleted.append(key) class FakeSource: name = "local" def __init__(self, files: list[SourceFile]) -> None: self._files = files def info(self) -> SourceInfo: return SourceInfo(name=self.name, label="Local", kind="indexable", available=True) def is_available(self) -> bool: return True def scan(self) -> Iterator[SourceFile]: yield from self._files def _file(source_id: str) -> SourceFile: return SourceFile( source_id=source_id, path=Path("/music") / source_id, suggested_title=Path(source_id).stem, file_format="mp3", file_size=123, ) def _service(tracks: FakeTrackRepo, storage: FakeStorage) -> LibraryImportService: return LibraryImportService(tracks=tracks, artists=FakeArtistRepo(), storage=storage) # type: ignore[arg-type] async def test_imports_new_files() -> None: tracks, storage = FakeTrackRepo(), FakeStorage() source = FakeSource([_file("a.mp3"), _file("b/c.mp3")]) summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type] assert (summary.seen, summary.imported, summary.skipped, summary.failed) == (2, 2, 0, 0) assert len(tracks.added) == 2 assert len(storage.saved) == 2 assert all(t.metadata_status == "pending" for t in tracks.added) assert all(t.source == "local" for t in tracks.added) async def test_dedup_skips_already_imported() -> None: tracks, storage = FakeTrackRepo(), FakeStorage() now = dt.datetime.now(dt.UTC) tracks.by_source[("local", "a.mp3")] = Track( id=uuid.uuid4(), title="a", artist_id=uuid.uuid4(), album_id=None, storage_uri="k", file_format="mp3", file_size=1, source="local", source_id="a.mp3", duration_seconds=None, genre=None, year=None, track_number=None, metadata_status="pending", metadata_error=None, enriched_at=None, created_at=now, updated_at=now, ) source = FakeSource([_file("a.mp3"), _file("new.mp3")]) summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type] assert (summary.imported, summary.skipped) == (1, 1) assert len(storage.saved) == 1 # only the new file copied async def test_per_file_failure_is_isolated_and_rolls_back_storage() -> None: tracks = FakeTrackRepo(fail_on={"bad.mp3"}) storage = FakeStorage() source = FakeSource([_file("good.mp3"), _file("bad.mp3")]) summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type] assert (summary.seen, summary.imported, summary.failed) == (2, 1, 1) # The failed import's copied file was cleaned up; the good one stays. assert len(storage.deleted) == 1 assert len(tracks.added) == 1