diff --git a/.env.example b/.env.example index 9382805..0c9d15a 100644 --- a/.env.example +++ b/.env.example @@ -27,6 +27,10 @@ MEDIA_PATH=/data/media TRANSCODE_CACHE_PATH=/data/transcode-cache MAX_PARALLEL_DOWNLOADS=2 +# sources — mounted folder the `local` source indexes (copies into MEDIA_PATH). +# Unset → the local source is not registered. Mount read-only in compose. +# LOCAL_MEDIA_IMPORT_PATH=/import + # external services (all optional — backend degrades gracefully if unset) # ML_SERVICE_URL=http://ml:9000 # ACOUSTID_API_KEY= diff --git a/README.md b/README.md index 180d489..89a35ae 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,26 @@ All settings come from environment variables (or `.env` in dev). See [`.env.example`](.env.example). External services (ML, AcoustID, MusicBrainz) are **optional** — the backend degrades gracefully when they are absent. +## Sources & importing music + +Music enters the library through **source backends** (`app/infrastructure/sources`), +selected via a registry. The first backend is **`local`** — it indexes a mounted +folder, copying each audio file into managed storage and creating a track +(`metadata_status=pending`; real metadata is filled later by enrichment). + +```bash +# point the instance at an existing library (mount read-only in compose) +LOCAL_MEDIA_IMPORT_PATH=/import + +GET /api/v1/sources # list configured sources + availability +POST /api/v1/sources/local/scan # admin: enqueue an import (runs in the worker) +GET /api/v1/sources/local/health # availability check +``` + +Scanning is a background job (arq worker) — the endpoint only enqueues it; the +walk + file copies never run in the request cycle. Re-scans are idempotent +(dedup on `(source, source_id)`, where `source_id` is the path within the root). + ## Subsonic API (`/rest`) A Subsonic-compatible API is mounted at `/rest`, so standard clients (Symfonium, diff --git a/app/api/deps.py b/app/api/deps.py index 2917434..8329ed7 100644 --- a/app/api/deps.py +++ b/app/api/deps.py @@ -35,6 +35,7 @@ from app.infrastructure.db.repositories import ( SqlAlchemyTrackRepository, SqlAlchemyUserRepository, ) +from app.infrastructure.sources.registry import SourceRegistry, build_source_registry from app.infrastructure.storage.provider import get_file_storage @@ -70,6 +71,14 @@ def get_subsonic_cipher() -> SubsonicCipher: return SubsonicPasswordCipher(get_settings().subsonic_secret_key.get_secret_value()) +@lru_cache +def get_source_registry() -> SourceRegistry: + return build_source_registry(get_settings()) + + +SourceRegistryDep = Annotated[SourceRegistry, Depends(get_source_registry)] + + # -- request-scoped services --------------------------------------------------- def get_auth_service(session: SessionDep) -> AuthService: return AuthService( diff --git a/app/api/schemas/source.py b/app/api/schemas/source.py new file mode 100644 index 0000000..f5fbf04 --- /dev/null +++ b/app/api/schemas/source.py @@ -0,0 +1,29 @@ +"""Schemas for the source endpoints.""" + +from pydantic import BaseModel + +from app.domain.sources import SourceInfo + + +class SourceInfoOut(BaseModel): + name: str + label: str + kind: str + available: bool + + @classmethod + def from_entity(cls, info: SourceInfo) -> SourceInfoOut: + return cls(name=info.name, label=info.label, kind=info.kind, available=info.available) + + +class ScanResponse(BaseModel): + """Result of enqueuing a source scan.""" + + source: str + job_id: str + status: str = "queued" + + +class SourceHealthOut(BaseModel): + name: str + available: bool diff --git a/app/api/v1/sources.py b/app/api/v1/sources.py index bd8a0c9..383b748 100644 --- a/app/api/v1/sources.py +++ b/app/api/v1/sources.py @@ -1,19 +1,44 @@ -"""External source endpoints (yt-dlp etc.).""" +"""External source endpoints: enumerate sources and trigger imports. + +Listing/health are read-only (any authenticated user). Scanning a source is an +admin action and runs in a worker — the endpoint only enqueues it. +""" from typing import Any from fastapi import APIRouter +from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser +from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut +from app.domain.errors import DependencyUnavailableError +from app.workers.queue import enqueue + router = APIRouter(prefix="/sources", tags=["sources"]) @router.get("") -async def list_sources() -> Any: ... +async def list_sources(_: CurrentUser, registry: SourceRegistryDep) -> list[SourceInfoOut]: + return [SourceInfoOut.from_entity(info) for info in registry.infos()] -@router.get("/{source}/search") -async def search_source(source: str) -> Any: ... +@router.post("/{source}/scan") +async def scan_source(source: str, user: SuperUser, registry: SourceRegistryDep) -> ScanResponse: + backend = registry.indexable(source) # 404 if unknown, 422 if not indexable + if not backend.is_available(): + raise DependencyUnavailableError(f"Source {source!r} is not available.") + job_id = await enqueue("scan_local_folder", source=source, added_by=str(user.id)) + return ScanResponse(source=source, job_id=job_id) @router.get("/{source}/health") -async def source_health(source: str) -> Any: ... +async def source_health( + source: str, _: CurrentUser, registry: SourceRegistryDep +) -> SourceHealthOut: + backend = registry.get(source) # 404 if unknown + return SourceHealthOut(name=backend.name, available=backend.is_available()) + + +@router.get("/{source}/search") +async def search_source(source: str, _: CurrentUser) -> Any: + # Search is for fetch-style sources (youtube, …) — not yet implemented. + ... diff --git a/app/application/import_service.py b/app/application/import_service.py new file mode 100644 index 0000000..056e64e --- /dev/null +++ b/app/application/import_service.py @@ -0,0 +1,96 @@ +"""LibraryImportService — imports files discovered by an indexable source. + +Batch sibling of :class:`UploadService`: for each discovered file it dedups on +``(source, source_id)``, copies the file into managed storage, creates a minimal +track (artist ``Unknown Artist``, ``metadata_status=pending``), and leaves the +rest to enrichment (plan §6.2). Per-file failures are isolated — one bad file +must not abort the whole scan (graceful degradation). +""" + +import contextlib +import uuid +from dataclasses import dataclass + +from app.core.logging import get_logger +from app.domain.ports import ArtistRepository, FileStorage, IndexableSource, TrackRepository +from app.domain.sources import SourceFile + +log = get_logger(__name__) + +_UNKNOWN_ARTIST = "Unknown Artist" + + +@dataclass(frozen=True) +class ImportSummary: + source: str + seen: int + imported: int + skipped: int + failed: int + + +class LibraryImportService: + def __init__( + self, + *, + tracks: TrackRepository, + artists: ArtistRepository, + storage: FileStorage, + ) -> None: + self._tracks = tracks + self._artists = artists + self._storage = storage + + async def scan_and_import( + self, source: IndexableSource, *, added_by: uuid.UUID | None + ) -> ImportSummary: + seen = imported = skipped = failed = 0 + for file in source.scan(): + seen += 1 + try: + existing = await self._tracks.get_by_source(source.name, file.source_id) + if existing is not None: + skipped += 1 + continue + await self._import_one(source.name, file, added_by) + imported += 1 + except Exception: + failed += 1 + log.warning("import_file_failed", source=source.name, source_id=file.source_id) + summary = ImportSummary( + source=source.name, seen=seen, imported=imported, skipped=skipped, failed=failed + ) + log.info( + "import_complete", + source=summary.source, + seen=summary.seen, + imported=summary.imported, + skipped=summary.skipped, + failed=summary.failed, + ) + return summary + + async def _import_one( + self, source_name: str, file: SourceFile, added_by: uuid.UUID | None + ) -> None: + track_id = uuid.uuid4() + key = f"tracks/{str(track_id)[:2]}/{track_id}.{file.file_format}" + await self._storage.save_file(key, file.path) + try: + artist = await self._artists.get_or_create(_UNKNOWN_ARTIST) + await self._tracks.add( + id=track_id, + title=file.suggested_title, + artist_id=artist.id, + storage_uri=key, + file_format=file.file_format, + file_size=file.file_size, + source=source_name, + source_id=file.source_id, + metadata_status="pending", + added_by=added_by, + ) + except Exception: + with contextlib.suppress(Exception): + await self._storage.delete(key) + raise diff --git a/app/core/config.py b/app/core/config.py index 86e2fe6..4d9a3c4 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -58,6 +58,11 @@ class Settings(BaseSettings): storage_backend: Literal["local", "s3"] = "local" upload_tmp_dir: Path | None = None + # -- sources ---------------------------------------------------------- + # Mounted folder the ``local`` source indexes (copies into managed storage). + # Unset → the local source is simply not registered. + local_media_import_path: Path | None = None + # -- S3 storage (deferred; set storage_backend="s3" to use) ---------- s3_endpoint_url: str | None = None s3_bucket: str | None = None diff --git a/app/domain/ports.py b/app/domain/ports.py index e69c9c8..efcf387 100644 --- a/app/domain/ports.py +++ b/app/domain/ports.py @@ -7,7 +7,7 @@ are bound to these ports at the composition root (``app.api.deps``). import datetime as dt import uuid -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Iterator from contextlib import AbstractAsyncContextManager from pathlib import Path from typing import Protocol @@ -23,6 +23,7 @@ from app.domain.entities import ( User, ) from app.domain.entities.track import Artist, Track +from app.domain.sources import SourceFile, SourceInfo from app.domain.tokens import IssuedToken, TokenClaims, TokenType @@ -221,3 +222,21 @@ class HistoryRepository(Protocol): self, *, user_id: uuid.UUID, limit: int, offset: int ) -> list[PlayHistoryEntry]: ... async def count(self, *, user_id: uuid.UUID) -> int: ... + + +class SourceBackend(Protocol): + """A registered source of tracks (mounted folder, YouTube, …). + + ``name`` is the stable identifier used in URLs and stored on ``track.source``. + """ + + name: str + + def info(self) -> SourceInfo: ... + def is_available(self) -> bool: ... + + +class IndexableSource(SourceBackend, Protocol): + """A source that enumerates files already on disk (e.g. the local folder).""" + + def scan(self) -> Iterator[SourceFile]: ... diff --git a/app/domain/sources.py b/app/domain/sources.py new file mode 100644 index 0000000..e0c0e9c --- /dev/null +++ b/app/domain/sources.py @@ -0,0 +1,39 @@ +"""Source-backend value objects — framework-free. + +A *source* is a place tracks come from (a mounted folder, YouTube, an upload). +Backends are driven adapters (``app.infrastructure.sources``); these are the +shapes they speak in, and the ports they satisfy live in ``app.domain.ports``. + +The first backend, ``local``, is *indexable*: it enumerates files already on +disk. Concrete metadata (artist/album/tags) is intentionally **not** resolved +here — a source yields a file plus a minimal title; enrichment (plan §6.2) fills +the rest later, so this stays a thin discovery layer (CLAUDE.md: no duplicated +business logic).""" + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass(frozen=True, slots=True) +class SourceInfo: + """Describes a registered source for enumeration / health (UI, admin).""" + + name: str + label: str + kind: str # "indexable" (more kinds — search/download — arrive with youtube) + available: bool + + +@dataclass(frozen=True, slots=True) +class SourceFile: + """A single importable file discovered by an indexable source. + + ``source_id`` is stable per source (the local backend uses the path relative + to its root) so re-scans are idempotent — already-imported files are skipped. + """ + + source_id: str + path: Path + suggested_title: str + file_format: str + file_size: int diff --git a/app/infrastructure/sources/__init__.py b/app/infrastructure/sources/__init__.py new file mode 100644 index 0000000..49e6e04 --- /dev/null +++ b/app/infrastructure/sources/__init__.py @@ -0,0 +1 @@ +"""Source backends — driven adapters that discover/fetch tracks.""" diff --git a/app/infrastructure/sources/local_folder.py b/app/infrastructure/sources/local_folder.py new file mode 100644 index 0000000..901c02b --- /dev/null +++ b/app/infrastructure/sources/local_folder.py @@ -0,0 +1,60 @@ +"""``local`` source — indexes audio files from a mounted folder. + +Walks a configured root directory and yields each audio file as a +:class:`SourceFile`. It does **not** parse tags or resolve artist/album — that's +enrichment's job (plan §6.2); this stays a thin discovery layer. ``source_id`` +is the path relative to the root, so re-scans are idempotent. +""" + +import os +from collections.abc import Iterator +from pathlib import Path + +from app.domain.sources import SourceFile, SourceInfo +from app.infrastructure.db.models.enums import TrackSource + +# Extensions we treat as audio. Mirrors the formats StreamingService serves. +_AUDIO_EXTENSIONS = frozenset( + {"mp3", "flac", "m4a", "aac", "ogg", "opus", "wav", "wma", "aiff", "aif", "alac"} +) + + +class LocalFolderSource: + """Implements :class:`app.domain.ports.IndexableSource`.""" + + name = TrackSource.LOCAL.value + + def __init__(self, root: Path) -> None: + self._root = root + + def info(self) -> SourceInfo: + return SourceInfo( + name=self.name, + label="Local folder", + kind="indexable", + available=self.is_available(), + ) + + def is_available(self) -> bool: + return self._root.is_dir() + + def scan(self) -> Iterator[SourceFile]: + if not self.is_available(): + return + for dirpath, _dirnames, filenames in os.walk(self._root): + for filename in sorted(filenames): + ext = Path(filename).suffix.lower().lstrip(".") + if ext not in _AUDIO_EXTENSIONS: + continue + path = Path(dirpath) / filename + try: + size = path.stat().st_size + except OSError: + continue # vanished/unreadable between walk and stat → skip + yield SourceFile( + source_id=path.relative_to(self._root).as_posix(), + path=path, + suggested_title=path.stem or "Unknown", + file_format=ext, + file_size=size, + ) diff --git a/app/infrastructure/sources/registry.py b/app/infrastructure/sources/registry.py new file mode 100644 index 0000000..505ce4c --- /dev/null +++ b/app/infrastructure/sources/registry.py @@ -0,0 +1,41 @@ +"""Source registry — selection + enumeration of configured backends. + +Built from settings at the composition root. Only sources that are configured +are registered (e.g. ``local`` appears only when ``LOCAL_MEDIA_IMPORT_PATH`` is +set), so enumeration reflects what the instance can actually use. +""" + +from typing import cast + +from app.core.config import Settings +from app.domain.errors import NotFoundError, ValidationError +from app.domain.ports import IndexableSource, SourceBackend +from app.domain.sources import SourceInfo +from app.infrastructure.sources.local_folder import LocalFolderSource + + +class SourceRegistry: + def __init__(self, backends: list[SourceBackend]) -> None: + self._by_name = {backend.name: backend for backend in backends} + + def get(self, name: str) -> SourceBackend: + backend = self._by_name.get(name) + if backend is None: + raise NotFoundError(f"Source {name!r} is not configured.") + return backend + + def indexable(self, name: str) -> IndexableSource: + backend = self.get(name) + if not hasattr(backend, "scan"): + raise ValidationError(f"Source {name!r} cannot be indexed.") + return cast(IndexableSource, backend) + + def infos(self) -> list[SourceInfo]: + return [backend.info() for backend in self._by_name.values()] + + +def build_source_registry(settings: Settings) -> SourceRegistry: + backends: list[SourceBackend] = [] + if settings.local_media_import_path is not None: + backends.append(LocalFolderSource(settings.local_media_import_path)) + return SourceRegistry(backends) diff --git a/app/workers/arq_worker.py b/app/workers/arq_worker.py index ae9937d..67b0263 100644 --- a/app/workers/arq_worker.py +++ b/app/workers/arq_worker.py @@ -10,6 +10,7 @@ from arq.connections import RedisSettings from app.core.config import get_settings from app.core.logging import configure_logging, get_logger +from app.workers.tasks.import_task import scan_local_folder log = get_logger("worker") @@ -24,12 +25,8 @@ async def shutdown(_ctx: dict[str, Any]) -> None: log.info("worker_shutdown") -async def _noop(_ctx: dict[str, Any]) -> None: - pass - - class WorkerSettings: - functions: ClassVar[list[Any]] = [_noop] # populated as tasks are implemented + functions: ClassVar[list[Any]] = [scan_local_folder] on_startup = startup on_shutdown = shutdown max_jobs = get_settings().max_parallel_downloads diff --git a/app/workers/queue.py b/app/workers/queue.py new file mode 100644 index 0000000..b84bab6 --- /dev/null +++ b/app/workers/queue.py @@ -0,0 +1,30 @@ +"""Enqueue helper — submit a job to the arq queue from the request cycle. + +A short-lived pool per call keeps things simple (enqueues are rare, admin-driven +actions). Redis being down degrades to a clean 503 rather than a crash +(graceful degradation).""" + +from typing import Any + +from arq import create_pool +from arq.connections import RedisSettings + +from app.core.config import get_settings +from app.domain.errors import DependencyUnavailableError + + +async def enqueue(function: str, **kwargs: Any) -> str: + """Enqueue ``function`` by name, returning the job id. Raises + :class:`DependencyUnavailableError` if the queue can't be reached.""" + settings = get_settings() + try: + pool = await create_pool(RedisSettings.from_dsn(str(settings.redis_url))) + except Exception as exc: + raise DependencyUnavailableError("Task queue (Redis) is unavailable.") from exc + try: + job = await pool.enqueue_job(function, **kwargs) + finally: + await pool.aclose() + if job is None: + raise DependencyUnavailableError("Could not enqueue job.") + return str(job.job_id) diff --git a/app/workers/tasks/__init__.py b/app/workers/tasks/__init__.py new file mode 100644 index 0000000..7c2bdd5 --- /dev/null +++ b/app/workers/tasks/__init__.py @@ -0,0 +1 @@ +"""arq task functions. Registered in ``app.workers.arq_worker.WorkerSettings``.""" diff --git a/app/workers/tasks/import_task.py b/app/workers/tasks/import_task.py new file mode 100644 index 0000000..ede9041 --- /dev/null +++ b/app/workers/tasks/import_task.py @@ -0,0 +1,46 @@ +"""arq task: scan an indexable source and import its files into the library. + +Heavy work (directory walk + file copies) belongs off the request cycle +(CLAUDE.md). The HTTP endpoint enqueues this; the worker runs it with its own +transactional session. +""" + +import uuid +from typing import Any + +from app.application.import_service import LibraryImportService +from app.core.config import get_settings +from app.core.logging import get_logger +from app.infrastructure.db import session_scope +from app.infrastructure.db.repositories import ( + SqlAlchemyArtistRepository, + SqlAlchemyTrackRepository, +) +from app.infrastructure.sources.registry import build_source_registry +from app.infrastructure.storage.provider import get_file_storage + +log = get_logger("worker.import") + + +async def scan_local_folder( + _ctx: dict[str, Any], *, source: str = "local", added_by: str | None = None +) -> dict[str, Any]: + registry = build_source_registry(get_settings()) + backend = registry.indexable(source) + actor = uuid.UUID(added_by) if added_by else None + + async with session_scope() as session: + service = LibraryImportService( + tracks=SqlAlchemyTrackRepository(session), + artists=SqlAlchemyArtistRepository(session), + storage=get_file_storage(), + ) + summary = await service.scan_and_import(backend, added_by=actor) + + return { + "source": summary.source, + "seen": summary.seen, + "imported": summary.imported, + "skipped": summary.skipped, + "failed": summary.failed, + } diff --git a/tests/test_import_service.py b/tests/test_import_service.py new file mode 100644 index 0000000..45d87ba --- /dev/null +++ b/tests/test_import_service.py @@ -0,0 +1,157 @@ +"""Unit tests for LibraryImportService — DB-free, in-memory fakes.""" + +import datetime as dt +import uuid +from collections.abc import Iterator +from pathlib import Path + +import pytest +from app.application.import_service import LibraryImportService +from app.domain.entities import Artist, Track +from app.domain.sources import SourceFile, SourceInfo + +pytestmark = pytest.mark.asyncio + + +class FakeArtistRepo: + def __init__(self) -> None: + self._by_name: dict[str, Artist] = {} + + async def get_or_create(self, name: str) -> Artist: + if name not in self._by_name: + now = dt.datetime.now(dt.UTC) + self._by_name[name] = Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now) + return self._by_name[name] + + +class FakeTrackRepo: + def __init__(self, *, fail_on: set[str] | None = None) -> None: + self.by_source: dict[tuple[str, str], Track] = {} + self.added: list[Track] = [] + self._fail_on = fail_on or set() + + async def get_by_source(self, source: str, source_id: str) -> Track | None: + return self.by_source.get((source, source_id)) + + async def add(self, **kw: object) -> Track: + source_id = str(kw["source_id"]) + if source_id in self._fail_on: + raise RuntimeError("simulated add failure") + now = dt.datetime.now(dt.UTC) + track = Track( + id=uuid.UUID(str(kw["id"])) if not isinstance(kw["id"], uuid.UUID) else kw["id"], + title=str(kw["title"]), + artist_id=kw["artist_id"], # type: ignore[arg-type] + album_id=None, + storage_uri=str(kw["storage_uri"]), + file_format=str(kw["file_format"]), + file_size=int(kw["file_size"]), # type: ignore[call-overload] + source=str(kw["source"]), + source_id=source_id, + duration_seconds=None, + genre=None, + year=None, + metadata_status=str(kw["metadata_status"]), + created_at=now, + updated_at=now, + ) + self.by_source[(track.source, track.source_id)] = track + self.added.append(track) + return track + + +class FakeStorage: + def __init__(self) -> None: + self.saved: dict[str, Path] = {} + self.deleted: list[str] = [] + + async def save_file(self, key: str, src_path: Path) -> int: + self.saved[key] = src_path + return 1 + + async def delete(self, key: str) -> None: + self.deleted.append(key) + + +class FakeSource: + name = "local" + + def __init__(self, files: list[SourceFile]) -> None: + self._files = files + + def info(self) -> SourceInfo: + return SourceInfo(name=self.name, label="Local", kind="indexable", available=True) + + def is_available(self) -> bool: + return True + + def scan(self) -> Iterator[SourceFile]: + yield from self._files + + +def _file(source_id: str) -> SourceFile: + return SourceFile( + source_id=source_id, + path=Path("/music") / source_id, + suggested_title=Path(source_id).stem, + file_format="mp3", + file_size=123, + ) + + +def _service(tracks: FakeTrackRepo, storage: FakeStorage) -> LibraryImportService: + return LibraryImportService(tracks=tracks, artists=FakeArtistRepo(), storage=storage) # type: ignore[arg-type] + + +async def test_imports_new_files() -> None: + tracks, storage = FakeTrackRepo(), FakeStorage() + source = FakeSource([_file("a.mp3"), _file("b/c.mp3")]) + + summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type] + + assert (summary.seen, summary.imported, summary.skipped, summary.failed) == (2, 2, 0, 0) + assert len(tracks.added) == 2 + assert len(storage.saved) == 2 + assert all(t.metadata_status == "pending" for t in tracks.added) + assert all(t.source == "local" for t in tracks.added) + + +async def test_dedup_skips_already_imported() -> None: + tracks, storage = FakeTrackRepo(), FakeStorage() + now = dt.datetime.now(dt.UTC) + tracks.by_source[("local", "a.mp3")] = Track( + id=uuid.uuid4(), + title="a", + artist_id=uuid.uuid4(), + album_id=None, + storage_uri="k", + file_format="mp3", + file_size=1, + source="local", + source_id="a.mp3", + duration_seconds=None, + genre=None, + year=None, + metadata_status="pending", + created_at=now, + updated_at=now, + ) + source = FakeSource([_file("a.mp3"), _file("new.mp3")]) + + summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type] + + assert (summary.imported, summary.skipped) == (1, 1) + assert len(storage.saved) == 1 # only the new file copied + + +async def test_per_file_failure_is_isolated_and_rolls_back_storage() -> None: + tracks = FakeTrackRepo(fail_on={"bad.mp3"}) + storage = FakeStorage() + source = FakeSource([_file("good.mp3"), _file("bad.mp3")]) + + summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type] + + assert (summary.seen, summary.imported, summary.failed) == (2, 1, 1) + # The failed import's copied file was cleaned up; the good one stays. + assert len(storage.deleted) == 1 + assert len(tracks.added) == 1 diff --git a/tests/test_local_folder_source.py b/tests/test_local_folder_source.py new file mode 100644 index 0000000..6b8809e --- /dev/null +++ b/tests/test_local_folder_source.py @@ -0,0 +1,59 @@ +"""Unit tests for the local-folder source + registry (no DB, no network).""" + +from pathlib import Path + +from app.core.config import Settings +from app.infrastructure.sources.local_folder import LocalFolderSource +from app.infrastructure.sources.registry import build_source_registry + + +def _settings(**overrides: object) -> Settings: + return Settings(**overrides) # type: ignore[arg-type] + + +def test_scan_discovers_audio_recursively(tmp_path: Path) -> None: + (tmp_path / "a.mp3").write_bytes(b"x") + (tmp_path / "sub").mkdir() + (tmp_path / "sub" / "b.flac").write_bytes(b"yy") + (tmp_path / "notes.txt").write_bytes(b"ignore me") # non-audio → skipped + + files = list(LocalFolderSource(tmp_path).scan()) + by_id = {f.source_id: f for f in files} + + assert set(by_id) == {"a.mp3", "sub/b.flac"} + assert by_id["a.mp3"].file_format == "mp3" + assert by_id["a.mp3"].suggested_title == "a" + assert by_id["sub/b.flac"].file_format == "flac" + assert by_id["sub/b.flac"].file_size == 2 + + +def test_source_id_is_stable_relative_path(tmp_path: Path) -> None: + (tmp_path / "x.opus").write_bytes(b"z") + [only] = list(LocalFolderSource(tmp_path).scan()) + assert only.source_id == "x.opus" + assert only.path == tmp_path / "x.opus" + + +def test_is_available_false_when_missing(tmp_path: Path) -> None: + source = LocalFolderSource(tmp_path / "nope") + assert source.is_available() is False + assert list(source.scan()) == [] # scanning an unavailable source yields nothing + + +def test_info_reports_kind_and_availability(tmp_path: Path) -> None: + info = LocalFolderSource(tmp_path).info() + assert info.name == "local" + assert info.kind == "indexable" + assert info.available is True + + +def test_registry_registers_local_when_path_set(tmp_path: Path) -> None: + registry = build_source_registry(_settings(local_media_import_path=tmp_path)) + names = {info.name for info in registry.infos()} + assert names == {"local"} + assert registry.indexable("local").is_available() is True + + +def test_registry_empty_when_path_unset() -> None: + registry = build_source_registry(_settings(local_media_import_path=None)) + assert registry.infos() == [] diff --git a/tests/test_sources_api.py b/tests/test_sources_api.py new file mode 100644 index 0000000..3757afc --- /dev/null +++ b/tests/test_sources_api.py @@ -0,0 +1,151 @@ +"""Integration tests for sources: enumeration + the real import path. + +Requires a reachable Postgres; skips otherwise. The scan worker task is invoked +directly (no Redis needed) so the full DB + storage import path is covered. +""" + +import asyncio +import os +from collections.abc import AsyncIterator +from pathlib import Path + +import pytest +from app.core.config import get_settings +from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope +from app.infrastructure.db.repositories import ( + SqlAlchemyRefreshTokenRepository, + SqlAlchemyUserRepository, +) +from asgi_lifespan import LifespanManager +from httpx import ASGITransport, AsyncClient + +pytestmark = pytest.mark.asyncio + +_db_reachable_cache: bool | None = None + + +async def _db_reachable() -> bool: + global _db_reachable_cache + if _db_reachable_cache is not None: + return _db_reachable_cache + from sqlalchemy import text + + try: + async with asyncio.timeout(3): + async with get_engine().connect() as conn: + await conn.execute(text("SELECT 1")) + _db_reachable_cache = True + except Exception: + _db_reachable_cache = False + return _db_reachable_cache + + +@pytest.fixture +async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]: + if not await _db_reachable(): + pytest.skip("Postgres not reachable — integration test skipped.") + + media = tmp_path / "media" + music = tmp_path / "music" + media.mkdir() + music.mkdir() + # Two audio files (+ a non-audio file that must be ignored) in a subfolder. + (music / "one.mp3").write_bytes(b"first track bytes" * 8) + (music / "artist").mkdir() + (music / "artist" / "two.flac").write_bytes(b"second track bytes" * 8) + (music / "cover.txt").write_bytes(b"not audio") + + os.environ["MEDIA_PATH"] = str(media) + os.environ["LOCAL_MEDIA_IMPORT_PATH"] = str(music) + get_settings.cache_clear() + + import app.infrastructure.storage.provider as _storage_provider + + _storage_provider._storage = None + + try: + async with get_engine().begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + + from app.application.user_service import UserService + from app.core.security import Argon2PasswordHasher + + async with session_scope() as session: + await UserService( + users=SqlAlchemyUserRepository(session), + refresh_tokens=SqlAlchemyRefreshTokenRepository(session), + hasher=Argon2PasswordHasher(), + ).create_user(username="admin", password="adminpass1", is_superuser=True) + + from app.main import create_app + + app = create_app() + async with LifespanManager(app): + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as client: + yield client + + async with get_engine().begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await dispose_engine() + finally: + _storage_provider._storage = None + os.environ.pop("MEDIA_PATH", None) + os.environ.pop("LOCAL_MEDIA_IMPORT_PATH", None) + get_settings.cache_clear() + + +async def _login(api: AsyncClient) -> str: + resp = await api.post( + "/api/v1/auth/login", json={"username": "admin", "password": "adminpass1"} + ) + assert resp.status_code == 200 + return str(resp.json()["access_token"]) + + +async def test_list_sources_includes_local(api: AsyncClient) -> None: + token = await _login(api) + resp = await api.get("/api/v1/sources", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 200 + sources = {s["name"]: s for s in resp.json()} + assert "local" in sources + assert sources["local"]["available"] is True + assert sources["local"]["kind"] == "indexable" + + +async def test_local_import_creates_streamable_tracks(api: AsyncClient) -> None: + token = await _login(api) + headers = {"Authorization": f"Bearer {token}"} + + # Run the worker task directly (bypasses Redis); it imports against the DB. + from app.workers.tasks.import_task import scan_local_folder + + summary = await scan_local_folder({}, source="local", added_by=None) + assert summary["seen"] == 2 + assert summary["imported"] == 2 + assert summary["failed"] == 0 + + # A second run is idempotent — everything already indexed. + again = await scan_local_folder({}, source="local", added_by=None) + assert again["imported"] == 0 + assert again["skipped"] == 2 + + listing = await api.get("/api/v1/tracks", headers=headers) + assert listing.status_code == 200 + items = listing.json()["items"] + assert len(items) == 2 + titles = {t["title"] for t in items} + assert titles == {"one", "two"} + + # And the imported file actually streams back. + track_id = items[0]["id"] + stream = await api.get(f"/api/v1/stream/{track_id}", headers=headers) + assert stream.status_code == 200 + assert len(stream.content) > 0 + + +async def test_scan_requires_admin(api: AsyncClient) -> None: + # The scan endpoint enqueues to Redis; here we only assert it's admin-gated. + resp = await api.post("/api/v1/sources/local/scan") + assert resp.status_code == 401