Compare commits

...

5 Commits

Author SHA1 Message Date
Senko-san c72d19599a feat(enrichment): tag-first metadata pipeline (§1D)
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
Docker Build & Publish / build (push) Failing after 10m8s
Implements the §6.2 enrichment pipeline: embedded tags → Chromaprint
fingerprint → AcoustID lookup. Well-tagged files get correct
artist/album/title offline; the rest are identified via AcoustID
(which also yields a MusicBrainz recording id in one call).

- domain: AudioTags/Fingerprint/RecordingMatch value objects; ports
  AudioTagReader, AudioFingerprinter, AcoustIdClient; TrackRepository
  .apply_enrichment (gap-fill, never erases) + AlbumRepository.get_or_create
- infrastructure/metadata: MutagenTagReader, FpcalcFingerprinter,
  AcoustIdHttpClient (rich meta=recordings+releasegroups, throttled)
- application: MetadataEnrichmentService — tags preferred, AcoustID fills
  gaps; resolves artist/album; status enriched/failed; skips manual;
  every external step wrapped (graceful degradation)
- workers: enrich_task registered; enqueue_enrich is best-effort and
  deferred so the caller's txn commits before the worker reads the row
- wiring: upload enqueues after add; import returns imported_ids and
  enqueues post-commit (mid-scan would race the worker); manual
  POST /tracks/{id}/metadata/enrich endpoint
- deps: add mutagen (fpcalc/ffmpeg already in the image)

Tests: metadata service orchestration, AcoustID parser, tag helpers.
125 passed; mypy strict + ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 13:04:02 +03:00
Senko-san 48e3418c7f feat(sources): local_folder source backend + import pipeline
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
First ingest path beyond manual upload (plan §1C). Source abstraction +
the first concrete backend, so a homelab can index an existing library.

- domain: SourceBackend/IndexableSource ports + SourceInfo/SourceFile shapes
- infrastructure/sources: LocalFolderSource (walks a mounted dir, idempotent
  source_id = relative path) + registry built from settings
- application: LibraryImportService — batch sibling of UploadService; dedup on
  (source, source_id), copy into storage, minimal track (metadata_status=pending,
  enrichment fills the rest in 1D), per-file failures isolated
- workers: scan_local_folder arq task (registered) + enqueue helper (503 if
  Redis down)
- api: GET /sources, POST /sources/{source}/scan (admin, enqueues), /health
- config: LOCAL_MEDIA_IMPORT_PATH; README + .env.example documented
- tests: scanner, registry, import service (fakes) + DB-gated sources API path

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 20:02:09 +03:00
Senko-san 551afbab13 feat(subsonic): browsing, search, media, playlist, annotation endpoints
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
Thin adapters over the existing services/repositories (no business logic):

- system: ping (auth check), getLicense
- browsing: getArtists/getArtist/getAlbum, getAlbumList(2) (newest/alpha/random),
  getSong, getGenres, getMusicFolders/getIndexes/getMusicDirectory (one folder)
- search: search3 (delegates to the library repos)
- media: stream + download (reuse StreamingService, honor Range); getCoverArt
  returns a placeholder until the cover pipeline lands
- playlists: get/create/update/delete over the playlist repo (owner-scoped)
- annotation: star/unstar → append-only like log, scrobble → play history,
  setRating → clean no-op
- all endpoints also accept the .view suffix and GET+POST for client compat

Repo support: album list ordering (newest/random), track genre facets.
README documents the mandatory-HTTPS requirement and app-password workflow.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:24:06 +03:00
Senko-san b975164fc2 feat(subsonic): response envelope, id scheme, and error mapping
- envelope: one serializer emitting the <subsonic-response> wrapper in XML
  (default) and JSON (f=json), carrying status/version/type/serverVersion
- ids: stable, reversible type-prefixed ids (tr-/al-/ar-/pl-) ↔ UUIDs
- errors: /rest requests render the Subsonic error envelope (always HTTP 200)
  with standard codes (10 missing param, 40 wrong creds, 50, 70 not found)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:23:30 +03:00
Senko-san 7a17e3babd feat(subsonic): per-user encrypted app-password foundation
Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret,
but login passwords are stored as a one-way argon2 hash. Add a separate,
per-user app-password: high-entropy, random, and encrypted at rest with a
Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB).

- SubsonicPasswordCipher + generate_subsonic_password in core.security
- users.subsonic_password_enc column (+ Alembic migration), repo + port methods
- SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle
- self-service GET/POST /users/me/subsonic-password + admin rotate endpoint
- domain SubsonicCredentials + SubsonicCipher port; deps wiring

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:23:19 +03:00
60 changed files with 4635 additions and 807 deletions
+9
View File
@@ -17,11 +17,20 @@ JWT_SECRET=change-me-in-prod
ACCESS_TOKEN_TTL_SECONDS=900
REFRESH_TOKEN_TTL_SECONDS=2592000
# subsonic — key that encrypts per-user Subsonic app-passwords at rest.
# GENERATE a strong secret for prod (`openssl rand -hex 32`); rotating it
# invalidates all stored app-passwords. NOTE: /rest must be served over HTTPS.
SUBSONIC_SECRET_KEY=change-me-subsonic-key
# media / storage
MEDIA_PATH=/data/media
TRANSCODE_CACHE_PATH=/data/transcode-cache
MAX_PARALLEL_DOWNLOADS=2
# sources — mounted folder the `local` source indexes (copies into MEDIA_PATH).
# Unset → the local source is not registered. Mount read-only in compose.
# LOCAL_MEDIA_IMPORT_PATH=/import
# external services (all optional — backend degrades gracefully if unset)
# ML_SERVICE_URL=http://ml:9000
# ACOUSTID_API_KEY=
+54
View File
@@ -70,3 +70,57 @@ The DB URL is injected from app settings — never hardcoded in `alembic.ini`.
All settings come from environment variables (or `.env` in dev). See
[`.env.example`](.env.example). External services (ML, AcoustID, MusicBrainz)
are **optional** — the backend degrades gracefully when they are absent.
## Sources & importing music
Music enters the library through **source backends** (`app/infrastructure/sources`),
selected via a registry. The first backend is **`local`** — it indexes a mounted
folder, copying each audio file into managed storage and creating a track
(`metadata_status=pending`; real metadata is filled later by enrichment).
```bash
# point the instance at an existing library (mount read-only in compose)
LOCAL_MEDIA_IMPORT_PATH=/import
GET /api/v1/sources # list configured sources + availability
POST /api/v1/sources/local/scan # admin: enqueue an import (runs in the worker)
GET /api/v1/sources/local/health # availability check
```
Scanning is a background job (arq worker) — the endpoint only enqueues it; the
walk + file copies never run in the request cycle. Re-scans are idempotent
(dedup on `(source, source_id)`, where `source_id` is the path within the root).
## Subsonic API (`/rest`)
A Subsonic-compatible API is mounted at `/rest`, so standard clients (Symfonium,
DSub, play:Sub, …) can browse the library and stream. It is a thin adapter over
the native services — it adds no business logic of its own.
**HTTPS is mandatory.** Subsonic authentication puts the credential in the URL
(`t=md5(password+salt)&s=…`, or the legacy `p=`), so `/rest` must only ever be
exposed behind TLS (terminate at the reverse proxy). Never serve it over plain
HTTP.
### App-passwords
Subsonic auth needs a recoverable secret, but login passwords are stored as a
one-way argon2 hash. So Subsonic clients authenticate against a separate,
per-user **app-password** — high-entropy, random, and encrypted at rest with a
key derived from `SUBSONIC_SECRET_KEY` (set this to a strong random string in
prod; rotating it invalidates all stored app-passwords).
Self-service lifecycle (native API, needs a normal JWT login):
```bash
GET /api/v1/users/me/subsonic-password # reveal (generated lazily on first read)
POST /api/v1/users/me/subsonic-password # rotate
# admin, for any user:
POST /api/v1/admin/users/{user_id}/subsonic-password
```
Point the client at the instance URL, use your **username** + the revealed
**app-password** (not your login password).
> **Cover art** (`getCoverArt`) currently returns a placeholder — the cover
> pipeline (`/api/v1/.../cover` endpoints) is not implemented yet.
@@ -0,0 +1,32 @@
"""subsonic: per-user encrypted app-password
Revision ID: 20260608_subsonic_pw
Revises: 20260608_storage_uri
Create Date: 2026-06-08 12:00:00.000000
Adds ``users.subsonic_password_enc`` — the recoverable, Fernet-encrypted
Subsonic app-password (plan §7). NULL until the user generates one.
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "20260608_subsonic_pw"
down_revision: str | None = "20260608_storage_uri"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.add_column(
"users",
sa.Column("subsonic_password_enc", sa.String(length=255), nullable=True),
)
def downgrade() -> None:
op.drop_column("users", "subsonic_password_enc")
+54 -3
View File
@@ -10,19 +10,20 @@ from collections.abc import AsyncIterator
from functools import lru_cache
from typing import Annotated
from fastapi import Depends
from fastapi import Depends, Query
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.auth_service import AuthService
from app.application.streaming_service import StreamingService
from app.application.subsonic_auth_service import SubsonicAuthService
from app.application.upload_service import UploadService
from app.application.user_service import UserService
from app.core.config import get_settings
from app.core.security import Argon2PasswordHasher, JwtTokenService
from app.core.security import Argon2PasswordHasher, JwtTokenService, SubsonicPasswordCipher
from app.domain.entities import User
from app.domain.errors import AuthenticationError, PermissionDeniedError
from app.domain.ports import FileStorage, PasswordHasher, TokenService
from app.domain.ports import FileStorage, PasswordHasher, SubsonicCipher, TokenService
from app.infrastructure.db import get_sessionmaker
from app.infrastructure.db.repositories import (
SqlAlchemyAlbumRepository,
@@ -34,7 +35,9 @@ from app.infrastructure.db.repositories import (
SqlAlchemyTrackRepository,
SqlAlchemyUserRepository,
)
from app.infrastructure.sources.registry import SourceRegistry, build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich
async def get_session() -> AsyncIterator[AsyncSession]:
@@ -64,6 +67,19 @@ def get_token_service() -> TokenService:
return JwtTokenService(get_settings())
@lru_cache
def get_subsonic_cipher() -> SubsonicCipher:
return SubsonicPasswordCipher(get_settings().subsonic_secret_key.get_secret_value())
@lru_cache
def get_source_registry() -> SourceRegistry:
return build_source_registry(get_settings())
SourceRegistryDep = Annotated[SourceRegistry, Depends(get_source_registry)]
# -- request-scoped services ---------------------------------------------------
def get_auth_service(session: SessionDep) -> AuthService:
return AuthService(
@@ -82,8 +98,16 @@ def get_user_service(session: SessionDep) -> UserService:
)
def get_subsonic_auth_service(session: SessionDep) -> SubsonicAuthService:
return SubsonicAuthService(
users=SqlAlchemyUserRepository(session),
cipher=get_subsonic_cipher(),
)
AuthServiceDep = Annotated[AuthService, Depends(get_auth_service)]
UserServiceDep = Annotated[UserService, Depends(get_user_service)]
SubsonicAuthServiceDep = Annotated[SubsonicAuthService, Depends(get_subsonic_auth_service)]
# -- file storage (process-cached) ---------------------------------------------
@@ -97,6 +121,7 @@ def get_upload_service(session: SessionDep, storage: FileStorageDep) -> UploadSe
artists=SqlAlchemyArtistRepository(session),
storage=storage,
tmp_dir=settings.upload_tmp_dir,
enqueue_enrich=enqueue_enrich,
)
@@ -187,3 +212,29 @@ async def get_streaming_user(
StreamUser = Annotated[User, Depends(get_streaming_user)]
# -- subsonic (/rest) authentication -------------------------------------------
# Subsonic puts credentials in the query string: u + (t & s) | p, plus c/v/f.
# The dep extracts them and delegates verification to the service; domain errors
# propagate to the rest-aware exception handler, which renders the Subsonic
# error envelope (HTTP 200). HTTPS is mandatory — the secret rides in the URL.
async def get_subsonic_user(
service: SubsonicAuthServiceDep,
u: Annotated[str | None, Query()] = None,
t: Annotated[str | None, Query()] = None,
s: Annotated[str | None, Query()] = None,
p: Annotated[str | None, Query()] = None,
) -> User:
return await service.authenticate(username=u, token=t, salt=s, password=p)
SubsonicUser = Annotated[User, Depends(get_subsonic_user)]
async def get_subsonic_format(f: Annotated[str | None, Query()] = None) -> str | None:
"""The requested response format (``f``): ``xml`` (default) or ``json``."""
return f
SubsonicFormat = Annotated[str | None, Depends(get_subsonic_format)]
+33 -4
View File
@@ -1,8 +1,15 @@
"""Maps domain exceptions to HTTP responses. The only place that knows both."""
"""Maps domain exceptions to HTTP responses. The only place that knows both.
from fastapi import FastAPI, Request, status
Two surfaces share this mapping: the native ``/api/v1`` API answers with a JSON
error body and an HTTP status code, while the Subsonic ``/rest`` layer answers
with its own envelope and **always HTTP 200** (the status lives in the body). A
request is routed to the Subsonic renderer by path prefix.
"""
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import JSONResponse
from app.api.rest.envelope import subsonic_error
from app.core.logging import get_logger
from app.domain.errors import (
AlreadyExistsError,
@@ -30,6 +37,21 @@ _STATUS_BY_ERROR: dict[type[DomainError], int] = {
StorageError: status.HTTP_500_INTERNAL_SERVER_ERROR,
}
# Subsonic error codes (subsonic.org/restapi): 10 missing param, 40 wrong
# credentials, 50 not authorized, 70 not found, 0 generic.
_SUBSONIC_CODE_BY_ERROR: dict[type[DomainError], int] = {
ValidationError: 10,
AuthenticationError: 40,
PermissionDeniedError: 50,
NotFoundError: 70,
}
_SUBSONIC_PREFIX = "/rest"
def _is_subsonic(request: Request) -> bool:
return request.url.path.startswith(_SUBSONIC_PREFIX)
def _error_body(code: str, message: str) -> dict[str, dict[str, str]]:
return {"error": {"code": code, "message": message}}
@@ -45,7 +67,10 @@ def register_exception_handlers(app: FastAPI) -> None:
)
@app.exception_handler(DomainError)
async def _handle_domain_error(_request: Request, exc: DomainError) -> JSONResponse:
async def _handle_domain_error(request: Request, exc: DomainError) -> Response:
if _is_subsonic(request):
code = _SUBSONIC_CODE_BY_ERROR.get(type(exc), 0)
return subsonic_error(code, exc.message, fmt=request.query_params.get("f"))
http_status = _STATUS_BY_ERROR.get(type(exc), status.HTTP_400_BAD_REQUEST)
return JSONResponse(
status_code=http_status,
@@ -53,8 +78,12 @@ def register_exception_handlers(app: FastAPI) -> None:
)
@app.exception_handler(Exception)
async def _handle_unexpected(_request: Request, exc: Exception) -> JSONResponse:
async def _handle_unexpected(request: Request, exc: Exception) -> Response:
log.error("unhandled_exception", exc_info=exc)
if _is_subsonic(request):
return subsonic_error(
0, "An unexpected error occurred.", fmt=request.query_params.get("f")
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=_error_body("internal_error", "An unexpected error occurred."),
+89 -11
View File
@@ -1,23 +1,101 @@
"""Subsonic annotation endpoints: star, rating, scrobble."""
"""Subsonic annotation endpoints: star/unstar, rating, scrobble.
from typing import Any
* ``star``/``unstar`` map to the **append-only** like event-log (a new event per
call — never a mutated boolean; CLAUDE.md invariant). Album/artist stars are
accepted but not persisted (no album/artist likes yet).
* ``scrobble`` appends to play history.
* ``setRating`` has no backing store yet — it's accepted as a clean no-op.
"""
from fastapi import APIRouter
import datetime as dt
from typing import Annotated
from fastapi import APIRouter, Query, Response
from app.api.deps import HistoryRepoDep, LikeRepoDep, SubsonicFormat, SubsonicUser, TrackRepoDep
from app.api.rest.envelope import subsonic_response
from app.api.rest.ids import decode_track
from app.domain.errors import NotFoundError
router = APIRouter()
@router.get("/star")
async def star() -> Any: ...
@router.api_route("/star", methods=["GET", "POST"])
@router.api_route("/star.view", methods=["GET", "POST"])
async def star(
user: SubsonicUser,
fmt: SubsonicFormat,
like_repo: LikeRepoDep,
track_repo: TrackRepoDep,
id: Annotated[list[str] | None, Query()] = None,
albumId: Annotated[list[str] | None, Query()] = None,
artistId: Annotated[list[str] | None, Query()] = None,
) -> Response:
# albumId/artistId are accepted for client compatibility but not persisted.
for raw in id or []:
track_id = decode_track(raw)
if await track_repo.get_by_id(track_id) is None:
raise NotFoundError("Song not found.")
await like_repo.add(user_id=user.id, track_id=track_id, value="like")
return subsonic_response(fmt=fmt)
@router.get("/unstar")
async def unstar() -> Any: ...
@router.api_route("/unstar", methods=["GET", "POST"])
@router.api_route("/unstar.view", methods=["GET", "POST"])
async def unstar(
user: SubsonicUser,
fmt: SubsonicFormat,
like_repo: LikeRepoDep,
track_repo: TrackRepoDep,
id: Annotated[list[str] | None, Query()] = None,
albumId: Annotated[list[str] | None, Query()] = None,
artistId: Annotated[list[str] | None, Query()] = None,
) -> Response:
for raw in id or []:
track_id = decode_track(raw)
if await track_repo.get_by_id(track_id) is None:
raise NotFoundError("Song not found.")
await like_repo.add(user_id=user.id, track_id=track_id, value="neutral")
return subsonic_response(fmt=fmt)
@router.get("/setRating")
async def set_rating() -> Any: ...
@router.api_route("/setRating", methods=["GET", "POST"])
@router.api_route("/setRating.view", methods=["GET", "POST"])
async def set_rating(
_user: SubsonicUser,
fmt: SubsonicFormat,
id: Annotated[str, Query()],
rating: Annotated[int, Query(ge=0, le=5)],
) -> Response:
# No rating store yet — accept cleanly so clients don't error.
return subsonic_response(fmt=fmt)
@router.get("/scrobble")
async def scrobble() -> Any: ...
@router.api_route("/scrobble", methods=["GET", "POST"])
@router.api_route("/scrobble.view", methods=["GET", "POST"])
async def scrobble(
user: SubsonicUser,
fmt: SubsonicFormat,
history_repo: HistoryRepoDep,
track_repo: TrackRepoDep,
id: Annotated[list[str] | None, Query()] = None,
time: Annotated[list[int] | None, Query()] = None,
submission: Annotated[bool, Query()] = True,
) -> Response:
times = time or []
for index, raw in enumerate(id or []):
track_id = decode_track(raw)
if await track_repo.get_by_id(track_id) is None:
raise NotFoundError("Song not found.")
if index < len(times):
played_at = dt.datetime.fromtimestamp(times[index] / 1000, tz=dt.UTC)
else:
played_at = dt.datetime.now(dt.UTC)
await history_repo.add(
user_id=user.id,
track_id=track_id,
played_at=played_at,
play_duration_seconds=None,
completed=submission,
)
return subsonic_response(fmt=fmt)
+254 -24
View File
@@ -1,47 +1,277 @@
"""Subsonic browsing endpoints."""
"""Subsonic browsing endpoints — thin adapters over the library repositories.
from typing import Any
A single synthetic music folder (id ``0``) is exposed; this is a homelab, not a
multi-library server. Heavy lifting stays in the repositories; these handlers
only fan out queries and reshape rows into the Subsonic element dicts.
"""
from fastapi import APIRouter
from typing import Annotated, Any
from fastapi import APIRouter, Query, Response
from app.api.deps import AlbumRepoDep, ArtistRepoDep, SubsonicFormat, SubsonicUser, TrackRepoDep
from app.api.rest.envelope import subsonic_response
from app.api.rest.ids import IdKind, encode_album, encode_artist, parse
from app.api.rest.serializers import album_dict, artist_dict, iso, song_dict
from app.domain.entities import Album, Artist
from app.domain.errors import NotFoundError
router = APIRouter()
@router.get("/getMusicFolders")
async def get_music_folders() -> Any: ...
_IGNORED_ARTICLES = "The El La Los Las Le Les"
_MAX_ARTISTS = 10_000 # homelab scale; one pass is fine
@router.get("/getIndexes")
async def get_indexes() -> Any: ...
async def _artists_index(artist_repo: ArtistRepoDep) -> list[dict[str, Any]]:
"""Group artists into Subsonic A-Z index buckets, each with an album count."""
artists = await artist_repo.list(q=None, limit=_MAX_ARTISTS, offset=0)
buckets: dict[str, list[dict[str, Any]]] = {}
for artist in artists:
album_count = await artist_repo.album_count(artist.id)
letter = artist.name[:1].upper()
if not letter.isalpha():
letter = "#"
buckets.setdefault(letter, []).append(artist_dict(artist, album_count=album_count))
return [{"name": name, "artist": buckets[name]} for name in sorted(buckets)]
@router.get("/getMusicDirectory")
async def get_music_directory() -> Any: ...
async def _albums_for_artist(artist: Artist, album_repo: AlbumRepoDep) -> list[dict[str, Any]]:
albums = await album_repo.list(artist_id=artist.id, q=None, limit=500, offset=0)
counts = await album_repo.track_count_many([a.id for a in albums])
return [album_dict(a, artist, song_count=counts.get(a.id, 0)) for a in albums]
@router.get("/getArtists")
async def get_artists() -> Any: ...
@router.api_route("/getMusicFolders", methods=["GET", "POST"])
@router.api_route("/getMusicFolders.view", methods=["GET", "POST"])
async def get_music_folders(_user: SubsonicUser, fmt: SubsonicFormat) -> Response:
return subsonic_response(
{"musicFolders": {"musicFolder": [{"id": 0, "name": "Music"}]}}, fmt=fmt
)
@router.get("/getArtist")
async def get_artist() -> Any: ...
@router.api_route("/getIndexes", methods=["GET", "POST"])
@router.api_route("/getIndexes.view", methods=["GET", "POST"])
async def get_indexes(
_user: SubsonicUser, fmt: SubsonicFormat, artist_repo: ArtistRepoDep
) -> Response:
index = await _artists_index(artist_repo)
return subsonic_response(
{"indexes": {"ignoredArticles": _IGNORED_ARTICLES, "lastModified": 0, "index": index}},
fmt=fmt,
)
@router.get("/getAlbum")
async def get_album() -> Any: ...
@router.api_route("/getArtists", methods=["GET", "POST"])
@router.api_route("/getArtists.view", methods=["GET", "POST"])
async def get_artists(
_user: SubsonicUser, fmt: SubsonicFormat, artist_repo: ArtistRepoDep
) -> Response:
index = await _artists_index(artist_repo)
return subsonic_response(
{"artists": {"ignoredArticles": _IGNORED_ARTICLES, "index": index}}, fmt=fmt
)
@router.get("/getAlbumList")
async def get_album_list() -> Any: ...
@router.api_route("/getArtist", methods=["GET", "POST"])
@router.api_route("/getArtist.view", methods=["GET", "POST"])
async def get_artist(
_user: SubsonicUser,
fmt: SubsonicFormat,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
id: Annotated[str, Query()],
) -> Response:
_, artist_id = parse(id)
artist = await artist_repo.get_by_id(artist_id)
if artist is None:
raise NotFoundError("Artist not found.")
albums = await _albums_for_artist(artist, album_repo)
payload = {
**artist_dict(artist, album_count=len(albums)),
"album": albums,
}
return subsonic_response({"artist": payload}, fmt=fmt)
@router.get("/getAlbumList2")
async def get_album_list2() -> Any: ...
@router.api_route("/getAlbum", methods=["GET", "POST"])
@router.api_route("/getAlbum.view", methods=["GET", "POST"])
async def get_album(
_user: SubsonicUser,
fmt: SubsonicFormat,
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
track_repo: TrackRepoDep,
id: Annotated[str, Query()],
) -> Response:
_, album_id = parse(id)
album = await album_repo.get_by_id(album_id)
if album is None:
raise NotFoundError("Album not found.")
artist = await artist_repo.get_by_id(album.artist_id)
tracks = await track_repo.list(
artist_id=None,
album_id=album_id,
q=None,
sort_by="title",
order="asc",
limit=500,
offset=0,
)
duration = sum(t.duration_seconds or 0 for t in tracks)
songs = [song_dict(t, artist, album) for t in tracks]
payload = {
**album_dict(album, artist, song_count=len(songs), duration=duration),
"song": songs,
}
return subsonic_response({"album": payload}, fmt=fmt)
@router.get("/getSong")
async def get_song() -> Any: ...
@router.api_route("/getAlbumList", methods=["GET", "POST"])
@router.api_route("/getAlbumList.view", methods=["GET", "POST"])
async def get_album_list(
_user: SubsonicUser,
fmt: SubsonicFormat,
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
type: Annotated[str, Query()] = "newest",
size: Annotated[int, Query(ge=1, le=500)] = 10,
offset: Annotated[int, Query(ge=0)] = 0,
) -> Response:
albums = await _list_albums(album_repo, artist_repo, type, size, offset)
return subsonic_response({"albumList": {"album": albums}}, fmt=fmt)
@router.get("/getGenres")
async def get_genres() -> Any: ...
@router.api_route("/getAlbumList2", methods=["GET", "POST"])
@router.api_route("/getAlbumList2.view", methods=["GET", "POST"])
async def get_album_list2(
_user: SubsonicUser,
fmt: SubsonicFormat,
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
type: Annotated[str, Query()] = "newest",
size: Annotated[int, Query(ge=1, le=500)] = 10,
offset: Annotated[int, Query(ge=0)] = 0,
) -> Response:
albums = await _list_albums(album_repo, artist_repo, type, size, offset)
return subsonic_response({"albumList2": {"album": albums}}, fmt=fmt)
async def _list_albums(
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
type_: str,
size: int,
offset: int,
) -> list[dict[str, Any]]:
if type_ == "alphabeticalByName":
sort_by, order = "title", "asc"
elif type_ == "random":
sort_by, order = "title", "random"
else: # newest / recent / frequent → newest (no play stats yet)
sort_by, order = "created", "desc"
albums = await album_repo.list(
artist_id=None, q=None, limit=size, offset=offset, sort_by=sort_by, order=order
)
return await _decorate_albums(albums, album_repo, artist_repo)
async def _decorate_albums(
albums: list[Album], album_repo: AlbumRepoDep, artist_repo: ArtistRepoDep
) -> list[dict[str, Any]]:
artist_ids = list({a.artist_id for a in albums})
artists = {a.id: a for a in await artist_repo.get_many(artist_ids)}
counts = await album_repo.track_count_many([a.id for a in albums])
return [album_dict(a, artists.get(a.artist_id), song_count=counts.get(a.id, 0)) for a in albums]
@router.api_route("/getSong", methods=["GET", "POST"])
@router.api_route("/getSong.view", methods=["GET", "POST"])
async def get_song(
_user: SubsonicUser,
fmt: SubsonicFormat,
track_repo: TrackRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
id: Annotated[str, Query()],
) -> Response:
_, track_id = parse(id)
track = await track_repo.get_by_id(track_id)
if track is None:
raise NotFoundError("Song not found.")
artist = await artist_repo.get_by_id(track.artist_id)
album = await album_repo.get_by_id(track.album_id) if track.album_id else None
return subsonic_response({"song": song_dict(track, artist, album)}, fmt=fmt)
@router.api_route("/getGenres", methods=["GET", "POST"])
@router.api_route("/getGenres.view", methods=["GET", "POST"])
async def get_genres(
_user: SubsonicUser, fmt: SubsonicFormat, track_repo: TrackRepoDep
) -> Response:
genres = [
{"value": name, "songCount": count, "albumCount": 0}
for name, count in await track_repo.genres()
]
return subsonic_response({"genres": {"genre": genres}}, fmt=fmt)
@router.api_route("/getMusicDirectory", methods=["GET", "POST"])
@router.api_route("/getMusicDirectory.view", methods=["GET", "POST"])
async def get_music_directory(
_user: SubsonicUser,
fmt: SubsonicFormat,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
track_repo: TrackRepoDep,
id: Annotated[str, Query()],
) -> Response:
kind, entity_id = parse(id)
if kind is IdKind.ARTIST:
artist = await artist_repo.get_by_id(entity_id)
if artist is None:
raise NotFoundError("Artist not found.")
albums = await album_repo.list(artist_id=artist.id, q=None, limit=500, offset=0)
counts = await album_repo.track_count_many([a.id for a in albums])
children = [
{
"id": encode_album(a.id),
"parent": encode_artist(artist.id),
"isDir": True,
"title": a.title,
"name": a.title,
"artist": artist.name,
"artistId": encode_artist(artist.id),
"coverArt": encode_album(a.id),
"songCount": counts.get(a.id, 0),
"created": iso(a.created_at),
"year": a.year,
}
for a in albums
]
directory = {"id": id, "name": artist.name, "child": children}
return subsonic_response({"directory": directory}, fmt=fmt)
if kind is IdKind.ALBUM:
album = await album_repo.get_by_id(entity_id)
if album is None:
raise NotFoundError("Album not found.")
artist = await artist_repo.get_by_id(album.artist_id)
tracks = await track_repo.list(
artist_id=None,
album_id=album.id,
q=None,
sort_by="title",
order="asc",
limit=500,
offset=0,
)
children = [song_dict(t, artist, album) for t in tracks]
directory = {
"id": id,
"parent": encode_artist(album.artist_id),
"name": album.title,
"child": children,
}
return subsonic_response({"directory": directory}, fmt=fmt)
raise NotFoundError("Directory not found.")
+102
View File
@@ -0,0 +1,102 @@
"""The Subsonic response envelope — one serializer, two wire formats.
Every Subsonic endpoint answers with a ``<subsonic-response>`` wrapper carrying
``status`` / ``version`` / ``type`` / ``serverVersion``, in XML (default) or JSON
(``f=json``). All handlers return through :func:`subsonic_response`; errors go
through the rest-aware exception handler (see ``app.api.errors``).
Payload data model (shared by both formats):
* a scalar value → an XML attribute / a JSON field
* a nested dict → a single child element / nested object
* a list of dicts → repeated child elements / a JSON array
* the key ``"value"`` → element text content (used by e.g. lyrics)
``None`` values are dropped. Subsonic always replies with **HTTP 200**, even for
errors — the status lives inside the envelope — so clients parse the body.
"""
import json
from collections.abc import Mapping
from typing import Any
from xml.etree import ElementTree as ET
from fastapi import Response
SUBSONIC_API_VERSION = "1.16.1"
SERVER_TYPE = "mcma"
SERVER_VERSION = "0.1.0"
_XML_NS = "http://subsonic.org/restapi"
_XML_MEDIA_TYPE = "application/xml; charset=utf-8"
_JSON_MEDIA_TYPE = "application/json; charset=utf-8"
def _is_json(fmt: str | None) -> bool:
return fmt in ("json", "jsonp")
def _scalar(value: object) -> str:
if isinstance(value, bool):
return "true" if value else "false"
return str(value)
def _build_xml(parent: ET.Element, data: Mapping[str, Any]) -> None:
for key, value in data.items():
if value is None:
continue
if key == "value":
parent.text = _scalar(value)
elif isinstance(value, Mapping):
_build_xml(ET.SubElement(parent, key), value)
elif isinstance(value, list):
for item in value:
_build_xml(ET.SubElement(parent, key), item)
else:
parent.set(key, _scalar(value))
def _strip_none(value: Any) -> Any:
"""Recursively drop ``None`` values so JSON output matches XML (no empty attrs)."""
if isinstance(value, Mapping):
return {k: _strip_none(v) for k, v in value.items() if v is not None}
if isinstance(value, list):
return [_strip_none(v) for v in value]
return value
def _render(body: Mapping[str, Any], fmt: str | None) -> Response:
envelope: dict[str, Any] = {
"status": body["status"],
"version": SUBSONIC_API_VERSION,
"type": SERVER_TYPE,
"serverVersion": SERVER_VERSION,
"openSubsonic": True,
**{k: v for k, v in body.items() if k != "status"},
}
if _is_json(fmt):
payload = json.dumps({"subsonic-response": _strip_none(envelope)})
return Response(content=payload, media_type=_JSON_MEDIA_TYPE)
root = ET.Element("subsonic-response", {"xmlns": _XML_NS})
_build_xml(root, envelope)
xml = b'<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(root, encoding="utf-8")
return Response(content=xml, media_type=_XML_MEDIA_TYPE)
def subsonic_response(
payload: Mapping[str, Any] | None = None, *, fmt: str | None = None
) -> Response:
"""A successful ``status="ok"`` envelope wrapping ``payload``."""
body: dict[str, Any] = {"status": "ok"}
if payload:
body.update(payload)
return _render(body, fmt)
def subsonic_error(code: int, message: str, *, fmt: str | None = None) -> Response:
"""A ``status="failed"`` envelope carrying a Subsonic ``<error>``."""
body = {"status": "failed", "error": {"code": code, "message": message}}
return _render(body, fmt)
+75
View File
@@ -0,0 +1,75 @@
"""Stable, reversible mapping between Subsonic opaque string ids and our UUIDs.
Subsonic ids are opaque strings; ours are UUIDs. We use a type-prefixed,
human-debuggable convention (``tr-<uuid>`` track, ``al-<uuid>`` album,
``ar-<uuid>`` artist, ``pl-<uuid>`` playlist). Cover-art ids reuse the entity's
own id (an album cover is ``al-<uuid>``, a track cover ``tr-<uuid>``). Centralize
encode/decode here so the convention lives in exactly one place.
"""
import uuid
from enum import StrEnum
from app.domain.errors import NotFoundError
class IdKind(StrEnum):
TRACK = "tr"
ALBUM = "al"
ARTIST = "ar"
PLAYLIST = "pl"
def encode(kind: IdKind, value: uuid.UUID) -> str:
return f"{kind.value}-{value}"
def encode_track(value: uuid.UUID) -> str:
return encode(IdKind.TRACK, value)
def encode_album(value: uuid.UUID) -> str:
return encode(IdKind.ALBUM, value)
def encode_artist(value: uuid.UUID) -> str:
return encode(IdKind.ARTIST, value)
def encode_playlist(value: uuid.UUID) -> str:
return encode(IdKind.PLAYLIST, value)
def parse(raw: str) -> tuple[IdKind, uuid.UUID]:
"""Decode any prefixed id into its kind + UUID. Raises ``NotFoundError`` on a
malformed id (an unknown id is, from the client's view, simply not found)."""
prefix, _, rest = raw.partition("-")
try:
kind = IdKind(prefix)
value = uuid.UUID(rest)
except ValueError as exc:
raise NotFoundError(f"Unknown id {raw!r}.") from exc
return kind, value
def _decode_as(raw: str, expected: IdKind) -> uuid.UUID:
kind, value = parse(raw)
if kind is not expected:
raise NotFoundError(f"Expected a {expected.name.lower()} id, got {raw!r}.")
return value
def decode_track(raw: str) -> uuid.UUID:
return _decode_as(raw, IdKind.TRACK)
def decode_album(raw: str) -> uuid.UUID:
return _decode_as(raw, IdKind.ALBUM)
def decode_artist(raw: str) -> uuid.UUID:
return _decode_as(raw, IdKind.ARTIST)
def decode_playlist(raw: str) -> uuid.UUID:
return _decode_as(raw, IdKind.PLAYLIST)
+69 -10
View File
@@ -1,19 +1,78 @@
"""Subsonic media endpoints: stream, download, cover art."""
"""Subsonic media endpoints: stream, download, cover art.
from typing import Any
``stream`` and ``download`` reuse :class:`StreamingService` (honouring HTTP
Range) — they return raw bytes, not the Subsonic envelope. Transcoding params
(``maxBitRate``/``format``) are accepted but ignored; the original file is served
(no in-request ffmpeg — CLAUDE.md). ``getCoverArt`` returns a placeholder until
the cover pipeline lands (the ``/api/v1`` cover endpoints are still stubs).
"""
from fastapi import APIRouter
import base64
from typing import Annotated
from fastapi import APIRouter, Header, Query
from fastapi.responses import Response, StreamingResponse
from app.api.deps import StreamingServiceDep, SubsonicUser, TrackRepoDep
from app.api.rest.ids import decode_track, parse
from app.domain.errors import NotFoundError
router = APIRouter()
@router.get("/stream")
async def stream() -> Any: ...
# 1x1 transparent PNG - a graceful placeholder until cover art is wired up.
_PLACEHOLDER_PNG = base64.b64decode(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNk+M8AAAMBAQDJ/pLvAAAAAElFTkSuQmCC"
)
@router.get("/download")
async def download() -> Any: ...
@router.api_route("/stream", methods=["GET", "POST"])
@router.api_route("/stream.view", methods=["GET", "POST"])
async def stream(
_user: SubsonicUser,
service: StreamingServiceDep,
id: Annotated[str, Query()],
range_header: Annotated[str | None, Header(alias="Range")] = None,
) -> StreamingResponse:
result = await service.open_stream(decode_track(id), range_header)
headers = {"Accept-Ranges": "bytes", "Content-Length": str(result.content_length)}
status_code = 200
if result.is_partial:
headers["Content-Range"] = f"bytes {result.start}-{result.end}/{result.total_size}"
status_code = 206
return StreamingResponse(
result.stream, status_code=status_code, headers=headers, media_type=result.content_type
)
@router.get("/getCoverArt")
async def get_cover_art() -> Any: ...
@router.api_route("/download", methods=["GET", "POST"])
@router.api_route("/download.view", methods=["GET", "POST"])
async def download(
_user: SubsonicUser,
service: StreamingServiceDep,
track_repo: TrackRepoDep,
id: Annotated[str, Query()],
) -> StreamingResponse:
track_id = decode_track(id)
track = await track_repo.get_by_id(track_id)
if track is None:
raise NotFoundError("Song not found.")
result = await service.open_stream(track_id, None)
filename = f"{track.title}.{track.file_format}"
headers = {
"Content-Length": str(result.content_length),
"Content-Disposition": f'attachment; filename="{filename}"',
}
return StreamingResponse(result.stream, headers=headers, media_type=result.content_type)
@router.api_route("/getCoverArt", methods=["GET", "POST"])
@router.api_route("/getCoverArt.view", methods=["GET", "POST"])
async def get_cover_art(
_user: SubsonicUser,
id: Annotated[str, Query()],
size: Annotated[int | None, Query()] = None,
) -> Response:
# Validate the id shape so clients get a clean error on garbage, then serve a
# placeholder. TODO: stream real covers once the cover pipeline exists.
parse(id)
return Response(content=_PLACEHOLDER_PNG, media_type="image/png")
+168 -13
View File
@@ -1,27 +1,182 @@
"""Subsonic playlist endpoints."""
"""Subsonic playlist endpoints — adapters over the playlist repository.
from typing import Any
Playlists are private to their owner (no public-playlist concept yet), so every
read/write is scoped to the authenticated user. ``createPlaylist`` doubles as a
full replace when given a ``playlistId`` (Subsonic overloads it that way).
"""
from fastapi import APIRouter
from typing import Annotated, Any
from fastapi import APIRouter, Query, Response
from app.api.deps import (
AlbumRepoDep,
ArtistRepoDep,
PlaylistRepoDep,
SubsonicFormat,
SubsonicUser,
)
from app.api.rest.envelope import subsonic_response
from app.api.rest.ids import decode_playlist, decode_track, encode_playlist
from app.api.rest.serializers import iso, song_dict
from app.domain.entities import Playlist, User
from app.domain.errors import NotFoundError, PermissionDeniedError
router = APIRouter()
@router.get("/getPlaylists")
async def get_playlists() -> Any: ...
def _playlist_dict(playlist: Playlist, owner: str, *, song_count: int) -> dict[str, Any]:
return {
"id": encode_playlist(playlist.id),
"name": playlist.name,
"comment": playlist.description,
"owner": owner,
"public": False,
"songCount": song_count,
"created": iso(playlist.created_at),
"changed": iso(playlist.updated_at),
}
@router.get("/getPlaylist")
async def get_playlist() -> Any: ...
async def _owned_playlist(
playlist_id_raw: str, playlist_repo: PlaylistRepoDep, user: User
) -> Playlist:
playlist = await playlist_repo.get_by_id(decode_playlist(playlist_id_raw))
if playlist is None:
raise NotFoundError("Playlist not found.")
if playlist.owner_id != user.id:
raise PermissionDeniedError("You don't own this playlist.")
return playlist
@router.get("/createPlaylist")
async def create_playlist() -> Any: ...
async def _playlist_songs(
playlist_id: str,
playlist_repo: PlaylistRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
) -> list[dict[str, Any]]:
tracks = await playlist_repo.get_tracks(decode_playlist(playlist_id), limit=10_000, offset=0)
artist_map = {a.id: a for a in await artist_repo.get_many(list({t.artist_id for t in tracks}))}
album_map = {
a.id: a
for a in await album_repo.get_many(
list({t.album_id for t in tracks if t.album_id is not None})
)
}
return [
song_dict(
t,
artist_map.get(t.artist_id),
album_map.get(t.album_id) if t.album_id is not None else None,
)
for t in tracks
]
@router.get("/updatePlaylist")
async def update_playlist() -> Any: ...
@router.api_route("/getPlaylists", methods=["GET", "POST"])
@router.api_route("/getPlaylists.view", methods=["GET", "POST"])
async def get_playlists(
user: SubsonicUser, fmt: SubsonicFormat, playlist_repo: PlaylistRepoDep
) -> Response:
playlists = await playlist_repo.list(owner_id=user.id, limit=500, offset=0)
counts = await playlist_repo.track_count_many([p.id for p in playlists])
items = [_playlist_dict(p, user.username, song_count=counts.get(p.id, 0)) for p in playlists]
return subsonic_response({"playlists": {"playlist": items}}, fmt=fmt)
@router.get("/deletePlaylist")
async def delete_playlist() -> Any: ...
@router.api_route("/getPlaylist", methods=["GET", "POST"])
@router.api_route("/getPlaylist.view", methods=["GET", "POST"])
async def get_playlist(
user: SubsonicUser,
fmt: SubsonicFormat,
playlist_repo: PlaylistRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
id: Annotated[str, Query()],
) -> Response:
playlist = await _owned_playlist(id, playlist_repo, user)
songs = await _playlist_songs(id, playlist_repo, artist_repo, album_repo)
payload = {**_playlist_dict(playlist, user.username, song_count=len(songs)), "entry": songs}
return subsonic_response({"playlist": payload}, fmt=fmt)
@router.api_route("/createPlaylist", methods=["GET", "POST"])
@router.api_route("/createPlaylist.view", methods=["GET", "POST"])
async def create_playlist(
user: SubsonicUser,
fmt: SubsonicFormat,
playlist_repo: PlaylistRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
name: Annotated[str | None, Query()] = None,
playlistId: Annotated[str | None, Query()] = None,
songId: Annotated[list[str] | None, Query()] = None,
) -> Response:
song_ids = [decode_track(s) for s in (songId or [])]
if playlistId is not None:
# Overloaded form: replace the existing playlist's tracks (and name).
playlist = await _owned_playlist(playlistId, playlist_repo, user)
if name is not None:
playlist = await playlist_repo.update(playlist.id, name=name, description=None)
existing = await playlist_repo.get_tracks(playlist.id, limit=10_000, offset=0)
for t in existing:
await playlist_repo.remove_track(playlist.id, t.id)
else:
playlist = await playlist_repo.add(
name=name or "Untitled", description=None, owner_id=user.id
)
for position, track_id in enumerate(song_ids, start=1):
await playlist_repo.add_track(playlist.id, track_id, position=float(position))
encoded = encode_playlist(playlist.id)
songs = await _playlist_songs(encoded, playlist_repo, artist_repo, album_repo)
payload = {**_playlist_dict(playlist, user.username, song_count=len(songs)), "entry": songs}
return subsonic_response({"playlist": payload}, fmt=fmt)
@router.api_route("/updatePlaylist", methods=["GET", "POST"])
@router.api_route("/updatePlaylist.view", methods=["GET", "POST"])
async def update_playlist(
user: SubsonicUser,
fmt: SubsonicFormat,
playlist_repo: PlaylistRepoDep,
playlistId: Annotated[str, Query()],
name: Annotated[str | None, Query()] = None,
comment: Annotated[str | None, Query()] = None,
songIdToAdd: Annotated[list[str] | None, Query()] = None,
songIndexToRemove: Annotated[list[int] | None, Query()] = None,
) -> Response:
playlist = await _owned_playlist(playlistId, playlist_repo, user)
if name is not None or comment is not None:
playlist = await playlist_repo.update(playlist.id, name=name, description=comment)
# Removals are by index into the current ordered track list — resolve first.
if songIndexToRemove:
current = await playlist_repo.get_tracks(playlist.id, limit=10_000, offset=0)
for index in sorted(set(songIndexToRemove)):
if 0 <= index < len(current):
await playlist_repo.remove_track(playlist.id, current[index].id)
if songIdToAdd:
position = await playlist_repo.max_position(playlist.id)
for raw in songIdToAdd:
position += 1.0
await playlist_repo.add_track(playlist.id, decode_track(raw), position=position)
return subsonic_response(fmt=fmt)
@router.api_route("/deletePlaylist", methods=["GET", "POST"])
@router.api_route("/deletePlaylist.view", methods=["GET", "POST"])
async def delete_playlist(
user: SubsonicUser,
fmt: SubsonicFormat,
playlist_repo: PlaylistRepoDep,
id: Annotated[str, Query()],
) -> Response:
playlist = await _owned_playlist(id, playlist_repo, user)
await playlist_repo.delete(playlist.id)
return subsonic_response(fmt=fmt)
+80 -5
View File
@@ -1,11 +1,86 @@
"""Subsonic search endpoints."""
"""Subsonic search endpoints — search3 over the library repositories.
from typing import Any
Mirrors the native ``/api/v1/search/library`` fan-out (tracks/albums/artists),
reshaped into the Subsonic ``searchResult3`` element. An empty query returns
results so clients can use search3 to browse.
"""
from fastapi import APIRouter
from typing import Annotated, Any
from fastapi import APIRouter, Query, Response
from app.api.deps import AlbumRepoDep, ArtistRepoDep, SubsonicFormat, SubsonicUser, TrackRepoDep
from app.api.rest.envelope import subsonic_response
from app.api.rest.serializers import album_dict, artist_dict, song_dict
router = APIRouter()
@router.get("/search3")
async def search3() -> Any: ...
@router.api_route("/search3", methods=["GET", "POST"])
@router.api_route("/search3.view", methods=["GET", "POST"])
async def search3(
_user: SubsonicUser,
fmt: SubsonicFormat,
track_repo: TrackRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
query: Annotated[str, Query()] = "",
artistCount: Annotated[int, Query(ge=0, le=500)] = 20,
artistOffset: Annotated[int, Query(ge=0)] = 0,
albumCount: Annotated[int, Query(ge=0, le=500)] = 20,
albumOffset: Annotated[int, Query(ge=0)] = 0,
songCount: Annotated[int, Query(ge=0, le=500)] = 20,
songOffset: Annotated[int, Query(ge=0)] = 0,
) -> Response:
# Subsonic sends "" (and some clients '""') to mean "everything".
q: str | None = query.strip().strip('"') or None
artists_out: list[dict[str, Any]] = []
if artistCount:
artists = await artist_repo.list(q=q, limit=artistCount, offset=artistOffset)
for a in artists:
album_count = await artist_repo.album_count(a.id)
artists_out.append(artist_dict(a, album_count=album_count))
albums_out: list[dict[str, Any]] = []
if albumCount:
albums = await album_repo.list(artist_id=None, q=q, limit=albumCount, offset=albumOffset)
album_artist_ids = list({a.artist_id for a in albums})
album_artist_map = {a.id: a for a in await artist_repo.get_many(album_artist_ids)}
counts = await album_repo.track_count_many([a.id for a in albums])
albums_out = [
album_dict(a, album_artist_map.get(a.artist_id), song_count=counts.get(a.id, 0))
for a in albums
]
songs_out: list[dict[str, Any]] = []
if songCount:
tracks = await track_repo.list(
artist_id=None,
album_id=None,
q=q,
sort_by="title",
order="asc",
limit=songCount,
offset=songOffset,
)
song_artist_map = {
a.id: a for a in await artist_repo.get_many(list({t.artist_id for t in tracks}))
}
song_album_map = {
a.id: a
for a in await album_repo.get_many(
list({t.album_id for t in tracks if t.album_id is not None})
)
}
songs_out = [
song_dict(
t,
song_artist_map.get(t.artist_id),
song_album_map.get(t.album_id) if t.album_id is not None else None,
)
for t in tracks
]
payload = {"searchResult3": {"artist": artists_out, "album": albums_out, "song": songs_out}}
return subsonic_response(payload, fmt=fmt)
+92
View File
@@ -0,0 +1,92 @@
"""Entity → Subsonic child-dict mappers (presentation only).
Pure functions turning domain entities into the attribute dicts the envelope
serializer renders as ``<artist>`` / ``<album>`` / ``<song>`` elements (or their
JSON equivalents). No business logic — they only reshape and rename.
"""
import datetime as dt
from typing import Any
from app.api.rest.ids import encode_album, encode_artist, encode_track
from app.domain.entities import Album, Artist, Track
# Suffix → MIME, for the ``contentType``/``suffix`` song attributes. A
# presentation detail (mirrors StreamingService's content-type negotiation).
_CONTENT_TYPE: dict[str, str] = {
"mp3": "audio/mpeg",
"flac": "audio/flac",
"m4a": "audio/mp4",
"aac": "audio/aac",
"ogg": "audio/ogg",
"opus": "audio/ogg",
"wav": "audio/wav",
"aiff": "audio/aiff",
"aif": "audio/aiff",
}
def iso(value: dt.datetime) -> str:
return value.astimezone(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.000Z")
def content_type_for(file_format: str) -> str:
return _CONTENT_TYPE.get(file_format.lower(), "application/octet-stream")
def artist_dict(artist: Artist, *, album_count: int) -> dict[str, Any]:
return {
"id": encode_artist(artist.id),
"name": artist.name,
"albumCount": album_count,
"coverArt": encode_artist(artist.id),
}
def album_dict(
album: Album,
artist: Artist | None,
*,
song_count: int,
duration: int | None = None,
) -> dict[str, Any]:
return {
"id": encode_album(album.id),
"name": album.title,
"title": album.title,
"artist": artist.name if artist is not None else None,
"artistId": encode_artist(album.artist_id),
"coverArt": encode_album(album.id),
"songCount": song_count,
"duration": duration,
"created": iso(album.created_at),
"year": album.year,
}
def song_dict(
track: Track,
artist: Artist | None,
album: Album | None,
) -> dict[str, Any]:
cover = encode_album(track.album_id) if track.album_id is not None else encode_track(track.id)
return {
"id": encode_track(track.id),
"parent": encode_album(track.album_id) if track.album_id is not None else None,
"isDir": False,
"title": track.title,
"album": album.title if album is not None else None,
"artist": artist.name if artist is not None else None,
"albumId": encode_album(track.album_id) if track.album_id is not None else None,
"artistId": encode_artist(track.artist_id),
"coverArt": cover,
"size": track.file_size,
"contentType": content_type_for(track.file_format),
"suffix": track.file_format,
"duration": track.duration_seconds,
"year": track.year,
"genre": track.genre,
"created": iso(track.created_at),
"type": "music",
"isVideo": False,
}
+13 -6
View File
@@ -1,15 +1,22 @@
"""Subsonic system endpoints: ping and license."""
from typing import Any
from fastapi import APIRouter, Response
from fastapi import APIRouter
from app.api.deps import SubsonicFormat, SubsonicUser
from app.api.rest.envelope import subsonic_response
router = APIRouter()
@router.get("/ping")
async def ping() -> Any: ...
@router.api_route("/ping", methods=["GET", "POST"])
@router.api_route("/ping.view", methods=["GET", "POST"])
async def ping(_user: SubsonicUser, fmt: SubsonicFormat) -> Response:
# Requiring auth makes ping a credential check — exactly how clients use it.
return subsonic_response(fmt=fmt)
@router.get("/getLicense")
async def get_license() -> Any: ...
@router.api_route("/getLicense", methods=["GET", "POST"])
@router.api_route("/getLicense.view", methods=["GET", "POST"])
async def get_license(_user: SubsonicUser, fmt: SubsonicFormat) -> Response:
# Self-hosted and free — the license is always valid.
return subsonic_response({"license": {"valid": True}}, fmt=fmt)
+29
View File
@@ -0,0 +1,29 @@
"""Schemas for the source endpoints."""
from pydantic import BaseModel
from app.domain.sources import SourceInfo
class SourceInfoOut(BaseModel):
name: str
label: str
kind: str
available: bool
@classmethod
def from_entity(cls, info: SourceInfo) -> SourceInfoOut:
return cls(name=info.name, label=info.label, kind=info.kind, available=info.available)
class ScanResponse(BaseModel):
"""Result of enqueuing a source scan."""
source: str
job_id: str
status: str = "queued"
class SourceHealthOut(BaseModel):
name: str
available: bool
+13
View File
@@ -0,0 +1,13 @@
"""Schemas for Subsonic app-password self-service (native /api/v1 surface).
The Subsonic /rest layer itself returns its own XML/JSON envelope, not these
pydantic models — these only back the lifecycle endpoints that reveal/rotate the
recoverable app-password."""
from pydantic import BaseModel
class SubsonicPasswordResponse(BaseModel):
"""The plaintext Subsonic app-password, for pasting into a client."""
password: str
+10 -1
View File
@@ -9,7 +9,8 @@ from typing import Any
from fastapi import APIRouter, Query, status
from app.api.deps import SuperUser, UserServiceDep
from app.api.deps import SubsonicAuthServiceDep, SuperUser, UserServiceDep
from app.api.schemas.subsonic import SubsonicPasswordResponse
from app.api.schemas.user import (
CreateUserRequest,
ResetPasswordRequest,
@@ -81,6 +82,14 @@ async def deactivate_user(
return UserResponse.from_entity(await users.deactivate(user_id))
@router.post("/users/{user_id}/subsonic-password", response_model=SubsonicPasswordResponse)
async def rotate_user_subsonic_password(
user_id: uuid.UUID, _admin: SuperUser, subsonic: SubsonicAuthServiceDep
) -> SubsonicPasswordResponse:
"""Rotate any user's Subsonic app-password and return the new plaintext."""
return SubsonicPasswordResponse(password=await subsonic.rotate(user_id))
@router.get("/services")
async def list_services(_admin: SuperUser) -> Any: ...
+30 -5
View File
@@ -1,19 +1,44 @@
"""External source endpoints (yt-dlp etc.)."""
"""External source endpoints: enumerate sources and trigger imports.
Listing/health are read-only (any authenticated user). Scanning a source is an
admin action and runs in a worker — the endpoint only enqueues it.
"""
from typing import Any
from fastapi import APIRouter
from app.api.deps import CurrentUser, SourceRegistryDep, SuperUser
from app.api.schemas.source import ScanResponse, SourceHealthOut, SourceInfoOut
from app.domain.errors import DependencyUnavailableError
from app.workers.queue import enqueue
router = APIRouter(prefix="/sources", tags=["sources"])
@router.get("")
async def list_sources() -> Any: ...
async def list_sources(_: CurrentUser, registry: SourceRegistryDep) -> list[SourceInfoOut]:
return [SourceInfoOut.from_entity(info) for info in registry.infos()]
@router.get("/{source}/search")
async def search_source(source: str) -> Any: ...
@router.post("/{source}/scan")
async def scan_source(source: str, user: SuperUser, registry: SourceRegistryDep) -> ScanResponse:
backend = registry.indexable(source) # 404 if unknown, 422 if not indexable
if not backend.is_available():
raise DependencyUnavailableError(f"Source {source!r} is not available.")
job_id = await enqueue("scan_local_folder", source=source, added_by=str(user.id))
return ScanResponse(source=source, job_id=job_id)
@router.get("/{source}/health")
async def source_health(source: str) -> Any: ...
async def source_health(
source: str, _: CurrentUser, registry: SourceRegistryDep
) -> SourceHealthOut:
backend = registry.get(source) # 404 if unknown
return SourceHealthOut(name=backend.name, available=backend.is_available())
@router.get("/{source}/search")
async def search_source(source: str, _: CurrentUser) -> Any:
# Search is for fetch-style sources (youtube, …) — not yet implemented.
...
+13 -1
View File
@@ -11,6 +11,7 @@ from app.api.schemas.track import TrackOut, TrackUpdate
from app.domain.entities.album import Album
from app.domain.entities.track import Artist, Track
from app.domain.errors import NotFoundError
from app.workers.queue import enqueue
router = APIRouter(prefix="/tracks", tags=["tracks"])
@@ -147,7 +148,18 @@ async def get_track_cover(track_id: uuid.UUID, _: CurrentUser) -> Any: ...
@router.post("/{track_id}/metadata/enrich")
async def enrich_metadata(track_id: uuid.UUID, _: CurrentUser) -> Any: ...
async def enrich_metadata(
track_id: uuid.UUID,
track_repo: TrackRepoDep,
_: CurrentUser,
) -> dict[str, str]:
"""Re-run metadata enrichment for a track (admin/user-triggered). The work
happens in a worker; this only enqueues it. 503 if the queue is down."""
track = await track_repo.get_by_id(track_id)
if track is None:
raise NotFoundError(f"Track {track_id} not found.")
job_id = await enqueue("enrich_track", track_id=str(track_id))
return {"track_id": str(track_id), "job_id": job_id}
@router.get("/{track_id}/metadata/matches")
+21 -1
View File
@@ -2,7 +2,8 @@
from fastapi import APIRouter, status
from app.api.deps import CurrentUser, UserServiceDep
from app.api.deps import CurrentUser, SubsonicAuthServiceDep, UserServiceDep
from app.api.schemas.subsonic import SubsonicPasswordResponse
from app.api.schemas.user import ChangePasswordRequest
router = APIRouter(prefix="/users", tags=["users"])
@@ -17,3 +18,22 @@ async def change_my_password(
current_password=body.current_password,
new_password=body.new_password,
)
@router.get("/me/subsonic-password", response_model=SubsonicPasswordResponse)
async def reveal_my_subsonic_password(
user: CurrentUser, subsonic: SubsonicAuthServiceDep
) -> SubsonicPasswordResponse:
"""Reveal the caller's Subsonic app-password for copying into a client.
It's recoverable, so it can be read on demand; one is generated lazily on
first access. Paste it (with the username) into Symfonium/DSub."""
return SubsonicPasswordResponse(password=await subsonic.reveal(user.id))
@router.post("/me/subsonic-password", response_model=SubsonicPasswordResponse)
async def rotate_my_subsonic_password(
user: CurrentUser, subsonic: SubsonicAuthServiceDep
) -> SubsonicPasswordResponse:
"""Rotate the caller's Subsonic app-password (invalidates the previous one)."""
return SubsonicPasswordResponse(password=await subsonic.rotate(user.id))
+106
View File
@@ -0,0 +1,106 @@
"""LibraryImportService — imports files discovered by an indexable source.
Batch sibling of :class:`UploadService`: for each discovered file it dedups on
``(source, source_id)``, copies the file into managed storage, creates a minimal
track (artist ``Unknown Artist``, ``metadata_status=pending``), and leaves the
rest to enrichment (plan §6.2). Per-file failures are isolated — one bad file
must not abort the whole scan (graceful degradation).
"""
import contextlib
import uuid
from dataclasses import dataclass, field
from app.core.logging import get_logger
from app.domain.ports import ArtistRepository, FileStorage, IndexableSource, TrackRepository
from app.domain.sources import SourceFile
log = get_logger(__name__)
_UNKNOWN_ARTIST = "Unknown Artist"
@dataclass(frozen=True)
class ImportSummary:
source: str
seen: int
imported: int
skipped: int
failed: int
# IDs of freshly imported tracks, for the caller to enqueue enrichment
# *after* its transaction commits (enqueuing mid-scan would race the worker).
imported_ids: list[uuid.UUID] = field(default_factory=list)
class LibraryImportService:
def __init__(
self,
*,
tracks: TrackRepository,
artists: ArtistRepository,
storage: FileStorage,
) -> None:
self._tracks = tracks
self._artists = artists
self._storage = storage
async def scan_and_import(
self, source: IndexableSource, *, added_by: uuid.UUID | None
) -> ImportSummary:
seen = skipped = failed = 0
imported_ids: list[uuid.UUID] = []
for file in source.scan():
seen += 1
try:
existing = await self._tracks.get_by_source(source.name, file.source_id)
if existing is not None:
skipped += 1
continue
track_id = await self._import_one(source.name, file, added_by)
imported_ids.append(track_id)
except Exception:
failed += 1
log.warning("import_file_failed", source=source.name, source_id=file.source_id)
summary = ImportSummary(
source=source.name,
seen=seen,
imported=len(imported_ids),
skipped=skipped,
failed=failed,
imported_ids=imported_ids,
)
log.info(
"import_complete",
source=summary.source,
seen=summary.seen,
imported=summary.imported,
skipped=summary.skipped,
failed=summary.failed,
)
return summary
async def _import_one(
self, source_name: str, file: SourceFile, added_by: uuid.UUID | None
) -> uuid.UUID:
track_id = uuid.uuid4()
key = f"tracks/{str(track_id)[:2]}/{track_id}.{file.file_format}"
await self._storage.save_file(key, file.path)
try:
artist = await self._artists.get_or_create(_UNKNOWN_ARTIST)
await self._tracks.add(
id=track_id,
title=file.suggested_title,
artist_id=artist.id,
storage_uri=key,
file_format=file.file_format,
file_size=file.file_size,
source=source_name,
source_id=file.source_id,
metadata_status="pending",
added_by=added_by,
)
except Exception:
with contextlib.suppress(Exception):
await self._storage.delete(key)
raise
return track_id
+174
View File
@@ -0,0 +1,174 @@
"""MetadataEnrichmentService — the §6.2 pipeline orchestrator.
Order (tag-first): embedded tags → Chromaprint fingerprint → AcoustID lookup.
Tags fix the common well-tagged case offline; AcoustID identifies the rest and
supplies a MusicBrainz id. The result updates the track and sets
``metadata_status`` to ``enriched`` (identity found) or ``failed`` (nothing).
Invariants (plan §6.2, CLAUDE.md):
- **Never touch ``manual``** — a user-edited track is returned untouched.
- **Graceful degradation** — every external step is wrapped; one failure (no
fpcalc, no API key, service down) degrades the result, never crashes.
- **Idempotent** — re-running only fills gaps; ``apply_enrichment`` never erases.
"""
import uuid
from dataclasses import dataclass
from app.core.logging import get_logger
from app.domain.entities.metadata import AudioTags, RecordingMatch
from app.domain.ports import (
AcoustIdClient,
AlbumRepository,
ArtistRepository,
AudioFingerprinter,
AudioTagReader,
FileStorage,
TrackRepository,
)
log = get_logger(__name__)
_UNKNOWN_ARTIST = "Unknown Artist"
@dataclass(frozen=True)
class EnrichmentResult:
track_id: uuid.UUID
status: str # "enriched" | "failed" | "skipped"
matched_mbid: str | None = None
class MetadataEnrichmentService:
def __init__(
self,
*,
tracks: TrackRepository,
artists: ArtistRepository,
albums: AlbumRepository,
storage: FileStorage,
tag_reader: AudioTagReader,
fingerprinter: AudioFingerprinter,
acoustid: AcoustIdClient,
) -> None:
self._tracks = tracks
self._artists = artists
self._albums = albums
self._storage = storage
self._tag_reader = tag_reader
self._fingerprinter = fingerprinter
self._acoustid = acoustid
async def enrich(self, track_id: uuid.UUID) -> EnrichmentResult:
track = await self._tracks.get_by_id(track_id)
if track is None:
log.info("enrich_track_missing", track_id=str(track_id))
return EnrichmentResult(track_id=track_id, status="skipped")
if track.metadata_status == "manual":
log.info("enrich_skip_manual", track_id=str(track_id))
return EnrichmentResult(track_id=track_id, status="skipped")
tags = await self._read_local(track.storage_uri)
match = await self._identify(track.storage_uri)
# Merge sources: prefer embedded tags, fall back to the AcoustID match.
# ``title`` is guaranteed non-None by the existing track title; the rest
# stay None when neither source has them.
tag_title = tags.title if tags else None
tag_artist = tags.artist if tags else None
tag_album = tags.album if tags else None
title = _opt_str(tag_title, match.title if match else None) or track.title
artist_name = _opt_str(tag_artist, match.artist if match else None)
album_title = _opt_str(tag_album, match.album if match else None)
year = _first_int(tags.year if tags else None, match.year if match else None)
genre = tags.genre if tags else None
track_number = tags.track_number if tags else None
duration = _first_int(
tags.duration_seconds if tags else None,
track.duration_seconds,
)
bitrate = tags.bitrate if tags else None
mbid = match.recording_mbid if match else None
acoustid_id = match.acoustid if match else None
artist_id = await self._resolve_artist(artist_name, fallback=track.artist_id)
album_id = await self._resolve_album(album_title, artist_id=artist_id, year=year, mbid=mbid)
identified = bool(artist_name) or album_id is not None or mbid is not None
status = "enriched" if identified else "failed"
await self._tracks.apply_enrichment(
track_id,
title=title,
artist_id=artist_id,
album_id=album_id,
genre=genre,
year=year,
track_number=track_number,
duration_seconds=duration,
bitrate=bitrate,
acoustid_fingerprint=acoustid_id,
musicbrainz_id=mbid,
metadata_status=status,
)
log.info("enrich_complete", track_id=str(track_id), status=status, mbid=mbid)
return EnrichmentResult(track_id=track_id, status=status, matched_mbid=mbid)
async def _read_local(self, storage_uri: str) -> AudioTags | None:
try:
async with self._storage.as_local_path(storage_uri) as path:
return await self._tag_reader.read(path)
except Exception:
log.warning("enrich_tag_step_failed", storage_uri=storage_uri)
return None
async def _identify(self, storage_uri: str) -> RecordingMatch | None:
if not self._acoustid.is_available() or not self._fingerprinter.is_available():
return None
try:
async with self._storage.as_local_path(storage_uri) as path:
fingerprint = await self._fingerprinter.calculate(path)
if fingerprint is None:
return None
return await self._acoustid.lookup(fingerprint)
except Exception:
log.warning("enrich_identify_step_failed", storage_uri=storage_uri)
return None
async def _resolve_artist(self, name: str | None, *, fallback: uuid.UUID) -> uuid.UUID:
if not name or name == _UNKNOWN_ARTIST:
return fallback
artist = await self._artists.get_or_create(name)
return artist.id
async def _resolve_album(
self,
title: str | None,
*,
artist_id: uuid.UUID,
year: int | None,
mbid: str | None,
) -> uuid.UUID | None:
if not title:
return None
album = await self._albums.get_or_create(
title=title,
artist_id=artist_id,
year=year,
musicbrainz_id=mbid,
)
return album.id
def _opt_str(*values: str | None) -> str | None:
for value in values:
if value:
return value
return None
def _first_int(*values: int | None) -> int | None:
for value in values:
if value is not None:
return value
return None
+100
View File
@@ -0,0 +1,100 @@
"""SubsonicAuthService — app-password lifecycle + Subsonic auth verification.
The Subsonic protocol authenticates with either ``t=md5(password+salt)`` (+``s``)
or the legacy ``p=`` (plaintext or ``enc:<hex>``). Both need a *recoverable*
secret server-side, so Subsonic clients authenticate against a dedicated,
high-entropy app-password — never the argon2 login password. That app-password
is encrypted at rest (:class:`~app.domain.ports.SubsonicCipher`) and decrypted
only here, to verify a request or to reveal it for copying into a client.
This is an adapter over the existing user store; it adds no business state of its
own beyond the app-password column (CLAUDE.md: Subsonic is an adapter, not a
reimplementation).
"""
import hashlib
import hmac
import uuid
from app.core.security import generate_subsonic_password
from app.domain.entities import User
from app.domain.errors import AuthenticationError, NotFoundError, ValidationError
from app.domain.ports import SubsonicCipher, UserRepository
def _md5_hex(value: str) -> str:
return hashlib.md5(value.encode("utf-8"), usedforsecurity=False).hexdigest()
def _decode_legacy_password(p: str) -> str:
"""Decode a Subsonic ``p`` param: ``enc:<hex>`` (hex-encoded) or plaintext."""
if p.startswith("enc:"):
try:
return bytes.fromhex(p[4:]).decode("utf-8")
except ValueError as exc:
raise AuthenticationError("Wrong username or password.") from exc
return p
class SubsonicAuthService:
def __init__(self, *, users: UserRepository, cipher: SubsonicCipher) -> None:
self._users = users
self._cipher = cipher
async def authenticate(
self,
*,
username: str | None,
token: str | None,
salt: str | None,
password: str | None,
) -> User:
"""Resolve Subsonic query auth params to a domain :class:`User`.
Raises :class:`ValidationError` (Subsonic code 10) for missing params and
:class:`AuthenticationError` (code 40) for any credential mismatch — an
unknown user is reported identically to a wrong password (no enumeration).
"""
if not username:
raise ValidationError("Required parameter 'u' is missing.")
if not ((token and salt) or password):
raise ValidationError("Required authentication parameter is missing.")
creds = await self._users.get_subsonic_credentials_by_username(username)
if creds is None or not creds.user.is_active or creds.password_enc is None:
raise AuthenticationError("Wrong username or password.")
app_password = self._cipher.decrypt(creds.password_enc)
if token and salt:
expected = _md5_hex(app_password + salt)
if not hmac.compare_digest(expected, token.lower()):
raise AuthenticationError("Wrong username or password.")
else:
assert password is not None # guaranteed by the missing-param check above
supplied = _decode_legacy_password(password)
if not hmac.compare_digest(supplied, app_password):
raise AuthenticationError("Wrong username or password.")
return creds.user
async def rotate(self, user_id: uuid.UUID) -> str:
"""Generate a fresh app-password, store it encrypted, return the plaintext."""
await self._require_user(user_id)
password = generate_subsonic_password()
await self._users.set_subsonic_password_enc(user_id, self._cipher.encrypt(password))
return password
async def reveal(self, user_id: uuid.UUID) -> str:
"""Return the current app-password, generating one on first access."""
await self._require_user(user_id)
enc = await self._users.get_subsonic_password_enc(user_id)
if enc is None:
return await self.rotate(user_id)
return self._cipher.decrypt(enc)
async def _require_user(self, user_id: uuid.UUID) -> User:
user = await self._users.get_by_id(user_id)
if user is None:
raise NotFoundError("User not found.")
return user
+7 -1
View File
@@ -5,6 +5,7 @@ import hashlib
import os
import tempfile
import uuid
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
@@ -14,6 +15,8 @@ import anyio
from app.domain.entities.user import User
from app.domain.ports import ArtistRepository, FileStorage, TrackRepository
EnrichEnqueuer = Callable[[uuid.UUID], Awaitable[None]]
class UploadFileProtocol(Protocol):
filename: str | None
@@ -49,11 +52,13 @@ class UploadService:
artists: ArtistRepository,
storage: FileStorage,
tmp_dir: Path | None = None,
enqueue_enrich: EnrichEnqueuer | None = None,
) -> None:
self._tracks = tracks
self._artists = artists
self._storage = storage
self._tmp_dir = tmp_dir
self._enqueue_enrich = enqueue_enrich
async def handle_upload(
self,
@@ -105,7 +110,8 @@ class UploadService:
await self._storage.delete(key)
raise
# TODO(1D): enqueue metadata enrichment task
if self._enqueue_enrich is not None:
await self._enqueue_enrich(track.id)
return UploadResult(
track_id=track.id,
+17
View File
@@ -45,6 +45,12 @@ class Settings(BaseSettings):
access_token_ttl_seconds: int = 60 * 15 # 15 min
refresh_token_ttl_seconds: int = 60 * 60 * 24 * 30 # 30 days (offline-first)
# -- subsonic ---------------------------------------------------------
# Symmetric key (any string) used to encrypt each user's recoverable
# Subsonic app-password at rest. A Fernet key is derived from it; rotating
# this value renders stored app-passwords undecryptable (rotate them too).
subsonic_secret_key: SecretStr = SecretStr("change-me-subsonic-key")
# -- media / storage --------------------------------------------------
media_path: Path = Path("/data/media")
transcode_cache_path: Path = Path("/data/transcode-cache")
@@ -52,6 +58,11 @@ class Settings(BaseSettings):
storage_backend: Literal["local", "s3"] = "local"
upload_tmp_dir: Path | None = None
# -- sources ----------------------------------------------------------
# Mounted folder the ``local`` source indexes (copies into managed storage).
# Unset → the local source is simply not registered.
local_media_import_path: Path | None = None
# -- S3 storage (deferred; set storage_backend="s3" to use) ----------
s3_endpoint_url: str | None = None
s3_bucket: str | None = None
@@ -62,9 +73,15 @@ class Settings(BaseSettings):
# -- external services (all optional; graceful degradation) ----------
ml_service_url: str | None = None
acoustid_api_key: SecretStr | None = None
acoustid_api_url: str = "https://api.acoustid.org/v2/lookup"
musicbrainz_user_agent: str = "mcma-backend/0.1.0 ( https://github.com/your/repo )"
youtube_cookies_path: Path | None = None
# -- enrichment -------------------------------------------------------
# ``fpcalc`` (Chromaprint) binary; resolved on PATH by default. The Docker
# image installs it via libchromaprint-tools.
fpcalc_path: str = "fpcalc"
@field_validator("database_url")
@classmethod
def _require_async_driver(cls, v: str) -> str:
+38
View File
@@ -6,16 +6,54 @@ to this module (CLAUDE.md: security is a cross-cutting concern in ``core``).
Higher layers depend only on the Protocols, never on pwdlib/pyjwt directly.
"""
import base64
import datetime as dt
import hashlib
import secrets
import uuid
import jwt
from cryptography.fernet import Fernet, InvalidToken
from pwdlib import PasswordHash
from app.core.config import Settings
from app.domain.errors import AuthenticationError
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
# Length (in bytes of entropy) of a generated Subsonic app-password. 18 bytes of
# url-safe base64 → 24 characters, well above the Subsonic auth threat model.
_SUBSONIC_PASSWORD_ENTROPY_BYTES = 18
def generate_subsonic_password() -> str:
"""A fresh, high-entropy Subsonic app-password (url-safe, ~24 chars)."""
return secrets.token_urlsafe(_SUBSONIC_PASSWORD_ENTROPY_BYTES)
class SubsonicPasswordCipher:
"""Symmetric encrypt/decrypt for the recoverable Subsonic app-password.
Subsonic auth (``t=md5(password+salt)`` and legacy ``p=``) needs the plaintext
password server-side, so — unlike the argon2-hashed login password — the
app-password is stored *encrypted*, not hashed. A Fernet key (AES-128-CBC +
HMAC) is derived from the configured secret; the plaintext key never touches
the DB. Implements :class:`app.domain.ports.SubsonicCipher`.
"""
def __init__(self, secret_key: str) -> None:
digest = hashlib.sha256(secret_key.encode("utf-8")).digest()
self._fernet = Fernet(base64.urlsafe_b64encode(digest))
def encrypt(self, plaintext: str) -> str:
return self._fernet.encrypt(plaintext.encode("utf-8")).decode("ascii")
def decrypt(self, token: str) -> str:
try:
return self._fernet.decrypt(token.encode("ascii")).decode("utf-8")
except InvalidToken as exc:
# Wrong/rotated secret key, or corrupted ciphertext.
raise AuthenticationError("Stored Subsonic password could not be decrypted.") from exc
class Argon2PasswordHasher:
"""argon2id hasher with sensible defaults from pwdlib."""
+6 -1
View File
@@ -3,19 +3,24 @@
from app.domain.entities.album import Album
from app.domain.entities.history import PlayHistoryEntry
from app.domain.entities.like import Like
from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch
from app.domain.entities.playlist import Playlist
from app.domain.entities.storage import ObjectStat
from app.domain.entities.track import Artist, Track
from app.domain.entities.user import Credentials, User
from app.domain.entities.user import Credentials, SubsonicCredentials, User
__all__ = [
"Album",
"Artist",
"AudioTags",
"Credentials",
"Fingerprint",
"Like",
"ObjectStat",
"PlayHistoryEntry",
"Playlist",
"RecordingMatch",
"SubsonicCredentials",
"Track",
"User",
]
+53
View File
@@ -0,0 +1,53 @@
"""Value objects for the metadata-enrichment pipeline (plan §6.2).
Pure data carriers between the enrichment service and its adapters (tag reader,
fingerprinter, AcoustID). No framework imports — these cross the domain boundary.
"""
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class AudioTags:
"""Embedded tags read from the file itself (ID3 / Vorbis / MP4 …).
Every field is optional — files are tagged inconsistently. The reader fills
what it can and leaves the rest ``None`` for downstream identification.
"""
title: str | None = None
artist: str | None = None
album: str | None = None
album_artist: str | None = None
genre: str | None = None
year: int | None = None
track_number: int | None = None
duration_seconds: int | None = None
bitrate: int | None = None
@dataclass(frozen=True, slots=True)
class Fingerprint:
"""Chromaprint fingerprint plus the decoded duration (both needed by AcoustID)."""
fingerprint: str
duration_seconds: int
@dataclass(frozen=True, slots=True)
class RecordingMatch:
"""A single AcoustID result, flattened to the fields enrichment cares about.
``acoustid`` is the stable AcoustID identifier (a UUID) — used as the
dedup key persisted on ``track.acoustid_fingerprint`` (fits the 64-char
column; the raw fingerprint does not). ``recording_mbid`` is the MusicBrainz
recording id when present.
"""
acoustid: str
score: float
recording_mbid: str | None = None
title: str | None = None
artist: str | None = None
album: str | None = None
year: int | None = None
+11
View File
@@ -31,3 +31,14 @@ class Credentials:
user: User
password_hash: str
@dataclass(frozen=True, slots=True)
class SubsonicCredentials:
"""A user paired with their *encrypted* Subsonic app-password.
``password_enc`` is ``None`` until the user generates one. Stays inside the
application layer; the plaintext is only recovered for auth verification."""
user: User
password_enc: str | None
+102 -2
View File
@@ -7,21 +7,26 @@ are bound to these ports at the composition root (``app.api.deps``).
import datetime as dt
import uuid
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Iterator
from contextlib import AbstractAsyncContextManager
from pathlib import Path
from typing import Protocol
from app.domain.entities import (
Album,
AudioTags,
Credentials,
Fingerprint,
Like,
ObjectStat,
PlayHistoryEntry,
Playlist,
RecordingMatch,
SubsonicCredentials,
User,
)
from app.domain.entities.track import Artist, Track
from app.domain.sources import SourceFile, SourceInfo
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
@@ -34,6 +39,19 @@ class UserRepository(Protocol):
async def set_superuser(self, user_id: uuid.UUID, is_superuser: bool) -> User: ...
async def set_active(self, user_id: uuid.UUID, is_active: bool) -> User: ...
async def count(self) -> int: ...
# -- subsonic app-password (recoverable, encrypted at rest) ----------
async def get_subsonic_credentials_by_username(
self, username: str
) -> SubsonicCredentials | None: ...
async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None: ...
async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None: ...
class SubsonicCipher(Protocol):
"""Symmetric encrypt/decrypt for the recoverable Subsonic app-password."""
def encrypt(self, plaintext: str) -> str: ...
def decrypt(self, token: str) -> str: ...
class RefreshTokenRepository(Protocol):
@@ -109,6 +127,9 @@ class TrackRepository(Protocol):
added_by: uuid.UUID | None,
) -> Track: ...
async def delete(self, track_id: uuid.UUID) -> None: ...
# genres must come before ``list`` — the method named ``list`` shadows the
# builtin in later annotations (same pattern as AlbumRepository below).
async def genres(self) -> list[tuple[str, int]]: ...
async def list(
self,
*,
@@ -135,9 +156,38 @@ class TrackRepository(Protocol):
genre: str | None,
year: int | None,
) -> Track: ...
async def apply_enrichment(
self,
track_id: uuid.UUID,
*,
title: str,
artist_id: uuid.UUID,
album_id: uuid.UUID | None,
genre: str | None,
year: int | None,
track_number: int | None,
duration_seconds: int | None,
bitrate: int | None,
acoustid_fingerprint: str | None,
musicbrainz_id: str | None,
metadata_status: str,
) -> Track:
"""Persist auto-enrichment results. Nullable fields are filled only when
a non-``None`` value is supplied (re-enrich never erases prior data);
``title``/``artist_id``/``metadata_status`` are always written. Callers
must not invoke this for ``metadata_status == 'manual'`` tracks."""
...
class AlbumRepository(Protocol):
async def get_or_create(
self,
*,
title: str,
artist_id: uuid.UUID,
year: int | None,
musicbrainz_id: str | None,
) -> Album: ...
async def get_by_id(self, album_id: uuid.UUID) -> Album | None: ...
async def get_many(self, ids: list[uuid.UUID]) -> list[Album]: ...
async def count(self, *, artist_id: uuid.UUID | None, q: str | None) -> int: ...
@@ -145,7 +195,14 @@ class AlbumRepository(Protocol):
async def track_count_many(self, album_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]: ...
# list must come after any method using list[...] in its signature (name shadowing)
async def list(
self, *, artist_id: uuid.UUID | None, q: str | None, limit: int, offset: int
self,
*,
artist_id: uuid.UUID | None,
q: str | None,
limit: int,
offset: int,
sort_by: str = "title",
order: str = "asc",
) -> list[Album]: ...
@@ -197,3 +254,46 @@ class HistoryRepository(Protocol):
self, *, user_id: uuid.UUID, limit: int, offset: int
) -> list[PlayHistoryEntry]: ...
async def count(self, *, user_id: uuid.UUID) -> int: ...
class SourceBackend(Protocol):
"""A registered source of tracks (mounted folder, YouTube, …).
``name`` is the stable identifier used in URLs and stored on ``track.source``.
"""
name: str
def info(self) -> SourceInfo: ...
def is_available(self) -> bool: ...
class IndexableSource(SourceBackend, Protocol):
"""A source that enumerates files already on disk (e.g. the local folder)."""
def scan(self) -> Iterator[SourceFile]: ...
# -- metadata enrichment (plan §6.2) -----------------------------------------
class AudioTagReader(Protocol):
"""Reads embedded tags from a local audio file. Returns ``None`` only when
the file can't be parsed at all — never raises (graceful degradation)."""
async def read(self, path: Path) -> AudioTags | None: ...
class AudioFingerprinter(Protocol):
"""Chromaprint (fpcalc) wrapper. ``is_available`` reflects whether the
binary is present; ``calculate`` returns ``None`` on any failure."""
def is_available(self) -> bool: ...
async def calculate(self, path: Path) -> Fingerprint | None: ...
class AcoustIdClient(Protocol):
"""AcoustID lookup. ``is_available`` is False without an API key (the whole
fingerprint path is then skipped). ``lookup`` returns the best match or
``None`` (no result / service down), never raising."""
def is_available(self) -> bool: ...
async def lookup(self, fingerprint: Fingerprint) -> RecordingMatch | None: ...
+39
View File
@@ -0,0 +1,39 @@
"""Source-backend value objects — framework-free.
A *source* is a place tracks come from (a mounted folder, YouTube, an upload).
Backends are driven adapters (``app.infrastructure.sources``); these are the
shapes they speak in, and the ports they satisfy live in ``app.domain.ports``.
The first backend, ``local``, is *indexable*: it enumerates files already on
disk. Concrete metadata (artist/album/tags) is intentionally **not** resolved
here — a source yields a file plus a minimal title; enrichment (plan §6.2) fills
the rest later, so this stays a thin discovery layer (CLAUDE.md: no duplicated
business logic)."""
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True, slots=True)
class SourceInfo:
"""Describes a registered source for enumeration / health (UI, admin)."""
name: str
label: str
kind: str # "indexable" (more kinds — search/download — arrive with youtube)
available: bool
@dataclass(frozen=True, slots=True)
class SourceFile:
"""A single importable file discovered by an indexable source.
``source_id`` is stable per source (the local backend uses the path relative
to its root) so re-scans are idempotent — already-imported files are skipped.
"""
source_id: str
path: Path
suggested_title: str
file_format: str
file_size: int
+3
View File
@@ -18,6 +18,9 @@ class UserModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
# Admin is a single flag in Phase 1 — no role system (plan §3.5).
is_superuser: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
# Recoverable Subsonic app-password, Fernet-encrypted at rest. NULL until the
# user generates one. Never the argon2 login password — see core.security.
subsonic_password_enc: Mapped[str | None] = mapped_column(String(255), nullable=True)
class RefreshTokenModel(UUIDPrimaryKeyMixin, Base):
@@ -27,6 +27,42 @@ class SqlAlchemyAlbumRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_or_create(
self,
*,
title: str,
artist_id: uuid.UUID,
year: int | None,
musicbrainz_id: str | None,
) -> Album:
"""Resolve an album by ``(title, artist_id)``, creating it if absent.
Backfills ``year``/``musicbrainz_id`` onto an existing row when it lacks
them and enrichment now has values (gap-fill, never overwrite)."""
row = (
await self._session.execute(
select(AlbumModel).where(
AlbumModel.title == title,
AlbumModel.artist_id == artist_id,
)
)
).scalar_one_or_none()
if row is None:
row = AlbumModel(
title=title,
artist_id=artist_id,
year=year,
musicbrainz_id=musicbrainz_id,
)
self._session.add(row)
else:
if row.year is None and year is not None:
row.year = year
if row.musicbrainz_id is None and musicbrainz_id is not None:
row.musicbrainz_id = musicbrainz_id
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def get_by_id(self, album_id: uuid.UUID) -> Album | None:
row = await self._session.get(AlbumModel, album_id)
return _to_entity(row) if row is not None else None
@@ -76,12 +112,20 @@ class SqlAlchemyAlbumRepository:
q: str | None,
limit: int,
offset: int,
sort_by: str = "title",
order: str = "asc",
) -> list[Album]:
stmt = select(AlbumModel)
if artist_id is not None:
stmt = stmt.where(AlbumModel.artist_id == artist_id)
if q:
stmt = stmt.where(AlbumModel.title.ilike(f"%{q}%"))
stmt = stmt.order_by(AlbumModel.title).limit(limit).offset(offset)
if order == "random":
stmt = stmt.order_by(func.random())
else:
col = AlbumModel.created_at if sort_by == "created" else AlbumModel.title
stmt = stmt.order_by(col.desc() if order == "desc" else col.asc())
stmt = stmt.limit(limit).offset(offset)
rows = (await self._session.execute(stmt)).scalars().all()
return [_to_entity(r) for r in rows]
@@ -87,6 +87,21 @@ class SqlAlchemyTrackRepository:
await self._session.delete(row)
await self._session.flush()
async def genres(self) -> list[tuple[str, int]]:
"""Distinct non-null genres with their song counts, most common first.
Defined before ``list`` — the method named ``list`` shadows the builtin
in later annotations within the class body."""
rows = (
await self._session.execute(
select(TrackModel.genre, func.count(TrackModel.id).label("cnt"))
.where(TrackModel.genre.is_not(None))
.group_by(TrackModel.genre)
.order_by(func.count(TrackModel.id).desc())
)
).all()
return [(row.genre, row.cnt) for row in rows]
async def list(
self,
*,
@@ -158,3 +173,47 @@ class SqlAlchemyTrackRepository:
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def apply_enrichment(
self,
track_id: uuid.UUID,
*,
title: str,
artist_id: uuid.UUID,
album_id: uuid.UUID | None,
genre: str | None,
year: int | None,
track_number: int | None,
duration_seconds: int | None,
bitrate: int | None,
acoustid_fingerprint: str | None,
musicbrainz_id: str | None,
metadata_status: str,
) -> Track:
row = await self._session.get(TrackModel, track_id)
if row is None:
raise NotFoundError(f"Track {track_id} not found.")
# Identity + status are authoritative for an enrichment run.
row.title = title
row.artist_id = artist_id
row.metadata_status = metadata_status
# Nullable extras: fill gaps only — never erase data a prior run found.
if album_id is not None:
row.album_id = album_id
if genre is not None:
row.genre = genre
if year is not None:
row.year = year
if track_number is not None:
row.track_number = track_number
if duration_seconds is not None:
row.duration_seconds = duration_seconds
if bitrate is not None:
row.bitrate = bitrate
if acoustid_fingerprint is not None:
row.acoustid_fingerprint = acoustid_fingerprint
if musicbrainz_id is not None:
row.musicbrainz_id = musicbrainz_id
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
@@ -10,7 +10,7 @@ import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities import Credentials, User
from app.domain.entities import Credentials, SubsonicCredentials, User
from app.domain.errors import NotFoundError
from app.infrastructure.db.models import UserModel
@@ -91,3 +91,22 @@ class SqlAlchemyUserRepository:
return (
await self._session.execute(select(func.count()).select_from(UserModel))
).scalar_one()
async def get_subsonic_credentials_by_username(
self, username: str
) -> SubsonicCredentials | None:
row = (
await self._session.execute(select(UserModel).where(UserModel.username == username))
).scalar_one_or_none()
if row is None:
return None
return SubsonicCredentials(user=_to_entity(row), password_enc=row.subsonic_password_enc)
async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None:
row = await self._get_row(user_id)
return row.subsonic_password_enc
async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None:
row = await self._get_row(user_id)
row.subsonic_password_enc = password_enc
await self._session.flush()
+1
View File
@@ -0,0 +1 @@
"""Metadata-enrichment adapters: tag reader, fingerprinter, AcoustID client."""
+129
View File
@@ -0,0 +1,129 @@
"""AcoustIdHttpClient — identifies a recording from its Chromaprint fingerprint.
One ``/v2/lookup`` call with ``meta=recordings+releasegroups`` returns the
AcoustID id, the MusicBrainz recording id, and canonical title/artist/album —
metadata that itself originates from MusicBrainz, so a separate MB call is not
needed for Phase 1 (plan §6.2 steps 2-3 collapsed into one request).
Graceful degradation: no API key → ``is_available()`` is False and the whole
fingerprint path is skipped; any network/parse error → ``lookup`` returns
``None``. A small inter-call delay keeps us within AcoustID's rate limit.
"""
import asyncio
import time
import httpx
from app.core.logging import get_logger
from app.domain.entities.metadata import Fingerprint, RecordingMatch
log = get_logger(__name__)
_DEFAULT_URL = "https://api.acoustid.org/v2/lookup"
_TIMEOUT_SECONDS = 10.0
_MIN_INTERVAL_SECONDS = 0.34 # AcoustID allows ~3 req/s; stay polite
class AcoustIdHttpClient:
"""Implements :class:`app.domain.ports.AcoustIdClient`."""
_throttle_lock = asyncio.Lock()
_last_call_monotonic = 0.0
def __init__(
self,
*,
api_key: str | None,
user_agent: str,
api_url: str = _DEFAULT_URL,
) -> None:
self._api_key = api_key
self._user_agent = user_agent
self._api_url = api_url
def is_available(self) -> bool:
return bool(self._api_key)
async def lookup(self, fingerprint: Fingerprint) -> RecordingMatch | None:
if not self._api_key:
return None
try:
await self._throttle()
async with httpx.AsyncClient(
timeout=_TIMEOUT_SECONDS,
headers={"User-Agent": self._user_agent},
) as client:
resp = await client.get(
self._api_url,
params={
"client": self._api_key,
"duration": str(fingerprint.duration_seconds),
"fingerprint": fingerprint.fingerprint,
"meta": "recordings releasegroups",
"format": "json",
},
)
resp.raise_for_status()
payload = resp.json()
except (httpx.HTTPError, ValueError):
log.warning("acoustid_lookup_failed")
return None
return _parse_best_match(payload)
@classmethod
async def _throttle(cls) -> None:
async with cls._throttle_lock:
elapsed = time.monotonic() - cls._last_call_monotonic
wait = _MIN_INTERVAL_SECONDS - elapsed
if wait > 0:
await asyncio.sleep(wait)
cls._last_call_monotonic = time.monotonic()
def _parse_best_match(payload: object) -> RecordingMatch | None:
if not isinstance(payload, dict) or payload.get("status") != "ok":
return None
results = payload.get("results")
if not isinstance(results, list) or not results:
return None
# Results are returned best-score-first; take the top scoring one.
best = max(results, key=lambda r: r.get("score", 0.0) if isinstance(r, dict) else 0.0)
if not isinstance(best, dict):
return None
acoustid = best.get("id")
if not isinstance(acoustid, str):
return None
score = float(best.get("score", 0.0))
recording_mbid: str | None = None
title: str | None = None
artist: str | None = None
album: str | None = None
recordings = best.get("recordings")
if isinstance(recordings, list) and recordings and isinstance(recordings[0], dict):
rec = recordings[0]
recording_mbid = rec.get("id") if isinstance(rec.get("id"), str) else None
title = rec.get("title") if isinstance(rec.get("title"), str) else None
artists = rec.get("artists")
if isinstance(artists, list) and artists and isinstance(artists[0], dict):
name = artists[0].get("name")
artist = name if isinstance(name, str) else None
groups = rec.get("releasegroups")
if isinstance(groups, list) and groups and isinstance(groups[0], dict):
gtitle = groups[0].get("title")
album = gtitle if isinstance(gtitle, str) else None
return RecordingMatch(
acoustid=acoustid,
score=score,
recording_mbid=recording_mbid,
title=title,
artist=artist,
album=album,
year=None,
)
@@ -0,0 +1,62 @@
"""FpcalcFingerprinter — Chromaprint fingerprint via the ``fpcalc`` binary.
``fpcalc -json <file>`` emits ``{"duration": float, "fingerprint": str}``. The
binary ships in the Docker image (``libchromaprint-tools``). Any failure (binary
missing, bad file, timeout) degrades to ``None`` — the pipeline then falls back
to tag-only metadata (plan §6.2: one external dependency must never crash it).
"""
import asyncio
import json
import shutil
from pathlib import Path
from app.core.logging import get_logger
from app.domain.entities.metadata import Fingerprint
log = get_logger(__name__)
_TIMEOUT_SECONDS = 30
class FpcalcFingerprinter:
"""Implements :class:`app.domain.ports.AudioFingerprinter`."""
def __init__(self, binary: str = "fpcalc") -> None:
self._binary = binary
def is_available(self) -> bool:
return shutil.which(self._binary) is not None
async def calculate(self, path: Path) -> Fingerprint | None:
if not self.is_available():
return None
try:
proc = await asyncio.create_subprocess_exec(
self._binary,
"-json",
str(path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
async with asyncio.timeout(_TIMEOUT_SECONDS):
stdout, _stderr = await proc.communicate()
except (TimeoutError, OSError):
log.warning("fpcalc_failed", path=str(path))
return None
if proc.returncode != 0:
log.warning("fpcalc_nonzero", path=str(path), returncode=proc.returncode)
return None
try:
data = json.loads(stdout)
fingerprint = str(data["fingerprint"])
duration = round(float(data["duration"]))
except (json.JSONDecodeError, KeyError, ValueError):
log.warning("fpcalc_bad_output", path=str(path))
return None
if not fingerprint or duration <= 0:
return None
return Fingerprint(fingerprint=fingerprint, duration_seconds=duration)
+88
View File
@@ -0,0 +1,88 @@
"""MutagenTagReader — reads embedded tags from a local audio file.
The offline first pass of enrichment (plan §6.2): well-tagged files get correct
artist/album/title without any network call. mutagen's ``easy=True`` mode
normalises tag keys across ID3 / Vorbis / MP4, so one code path covers all the
formats the library accepts. Parsing is blocking, so it runs in a worker thread.
"""
import re
from pathlib import Path
import anyio
from mutagen import File as MutagenFile # type: ignore[attr-defined]
from app.core.logging import get_logger
from app.domain.entities.metadata import AudioTags
log = get_logger(__name__)
_YEAR_RE = re.compile(r"(\d{4})")
def _first(value: object) -> str | None:
"""EasyXxx tags expose values as lists; take the first non-empty string."""
if isinstance(value, list):
value = value[0] if value else None
if value is None:
return None
text = str(value).strip()
return text or None
def _parse_year(value: object) -> int | None:
text = _first(value)
if text is None:
return None
m = _YEAR_RE.search(text)
return int(m.group(1)) if m else None
def _parse_track_number(value: object) -> int | None:
text = _first(value)
if text is None:
return None
# "3" or "3/12" → 3
head = text.split("/", 1)[0].strip()
return int(head) if head.isdigit() else None
class MutagenTagReader:
"""Implements :class:`app.domain.ports.AudioTagReader`."""
async def read(self, path: Path) -> AudioTags | None:
try:
return await anyio.to_thread.run_sync(self._read_sync, path)
except Exception:
log.warning("tag_read_failed", path=str(path))
return None
def _read_sync(self, path: Path) -> AudioTags | None:
audio = MutagenFile(str(path), easy=True)
if audio is None:
return None # unrecognised container
tags = audio.tags or {}
info = getattr(audio, "info", None)
duration = None
bitrate = None
if info is not None:
length = getattr(info, "length", None)
if length:
duration = round(float(length))
raw_bitrate = getattr(info, "bitrate", None)
if raw_bitrate:
bitrate = int(raw_bitrate) // 1000 # bits/s → kbps for display
return AudioTags(
title=_first(tags.get("title")),
artist=_first(tags.get("artist")),
album=_first(tags.get("album")),
album_artist=_first(tags.get("albumartist")),
genre=_first(tags.get("genre")),
year=_parse_year(tags.get("date") or tags.get("year")),
track_number=_parse_track_number(tags.get("tracknumber")),
duration_seconds=duration,
bitrate=bitrate,
)
+1
View File
@@ -0,0 +1 @@
"""Source backends — driven adapters that discover/fetch tracks."""
@@ -0,0 +1,60 @@
"""``local`` source — indexes audio files from a mounted folder.
Walks a configured root directory and yields each audio file as a
:class:`SourceFile`. It does **not** parse tags or resolve artist/album — that's
enrichment's job (plan §6.2); this stays a thin discovery layer. ``source_id``
is the path relative to the root, so re-scans are idempotent.
"""
import os
from collections.abc import Iterator
from pathlib import Path
from app.domain.sources import SourceFile, SourceInfo
from app.infrastructure.db.models.enums import TrackSource
# Extensions we treat as audio. Mirrors the formats StreamingService serves.
_AUDIO_EXTENSIONS = frozenset(
{"mp3", "flac", "m4a", "aac", "ogg", "opus", "wav", "wma", "aiff", "aif", "alac"}
)
class LocalFolderSource:
"""Implements :class:`app.domain.ports.IndexableSource`."""
name = TrackSource.LOCAL.value
def __init__(self, root: Path) -> None:
self._root = root
def info(self) -> SourceInfo:
return SourceInfo(
name=self.name,
label="Local folder",
kind="indexable",
available=self.is_available(),
)
def is_available(self) -> bool:
return self._root.is_dir()
def scan(self) -> Iterator[SourceFile]:
if not self.is_available():
return
for dirpath, _dirnames, filenames in os.walk(self._root):
for filename in sorted(filenames):
ext = Path(filename).suffix.lower().lstrip(".")
if ext not in _AUDIO_EXTENSIONS:
continue
path = Path(dirpath) / filename
try:
size = path.stat().st_size
except OSError:
continue # vanished/unreadable between walk and stat → skip
yield SourceFile(
source_id=path.relative_to(self._root).as_posix(),
path=path,
suggested_title=path.stem or "Unknown",
file_format=ext,
file_size=size,
)
+41
View File
@@ -0,0 +1,41 @@
"""Source registry — selection + enumeration of configured backends.
Built from settings at the composition root. Only sources that are configured
are registered (e.g. ``local`` appears only when ``LOCAL_MEDIA_IMPORT_PATH`` is
set), so enumeration reflects what the instance can actually use.
"""
from typing import cast
from app.core.config import Settings
from app.domain.errors import NotFoundError, ValidationError
from app.domain.ports import IndexableSource, SourceBackend
from app.domain.sources import SourceInfo
from app.infrastructure.sources.local_folder import LocalFolderSource
class SourceRegistry:
def __init__(self, backends: list[SourceBackend]) -> None:
self._by_name = {backend.name: backend for backend in backends}
def get(self, name: str) -> SourceBackend:
backend = self._by_name.get(name)
if backend is None:
raise NotFoundError(f"Source {name!r} is not configured.")
return backend
def indexable(self, name: str) -> IndexableSource:
backend = self.get(name)
if not hasattr(backend, "scan"):
raise ValidationError(f"Source {name!r} cannot be indexed.")
return cast(IndexableSource, backend)
def infos(self) -> list[SourceInfo]:
return [backend.info() for backend in self._by_name.values()]
def build_source_registry(settings: Settings) -> SourceRegistry:
backends: list[SourceBackend] = []
if settings.local_media_import_path is not None:
backends.append(LocalFolderSource(settings.local_media_import_path))
return SourceRegistry(backends)
+4 -6
View File
@@ -1,7 +1,7 @@
"""arq worker settings — the queue runtime. Task functions register here.
Run with: ``arq app.workers.arq_worker.WorkerSettings``.
Tasks (download, enrich, transcode) are appended to ``functions`` in later steps.
Tasks (download, transcode) are appended to ``functions`` in later steps.
"""
from typing import Any, ClassVar
@@ -10,6 +10,8 @@ from arq.connections import RedisSettings
from app.core.config import get_settings
from app.core.logging import configure_logging, get_logger
from app.workers.tasks.enrich_task import enrich_track
from app.workers.tasks.import_task import scan_local_folder
log = get_logger("worker")
@@ -24,12 +26,8 @@ async def shutdown(_ctx: dict[str, Any]) -> None:
log.info("worker_shutdown")
async def _noop(_ctx: dict[str, Any]) -> None:
pass
class WorkerSettings:
functions: ClassVar[list[Any]] = [_noop] # populated as tasks are implemented
functions: ClassVar[list[Any]] = [scan_local_folder, enrich_track]
on_startup = startup
on_shutdown = shutdown
max_jobs = get_settings().max_parallel_downloads
+49
View File
@@ -0,0 +1,49 @@
"""Enqueue helper — submit a job to the arq queue from the request cycle.
A short-lived pool per call keeps things simple (enqueues are rare, admin-driven
actions). Redis being down degrades to a clean 503 rather than a crash
(graceful degradation)."""
import uuid
from typing import Any
from arq import create_pool
from arq.connections import RedisSettings
from app.core.config import get_settings
from app.core.logging import get_logger
from app.domain.errors import DependencyUnavailableError
log = get_logger("worker.queue")
async def enqueue(function: str, **kwargs: Any) -> str:
"""Enqueue ``function`` by name, returning the job id. Raises
:class:`DependencyUnavailableError` if the queue can't be reached."""
settings = get_settings()
try:
pool = await create_pool(RedisSettings.from_dsn(str(settings.redis_url)))
except Exception as exc:
raise DependencyUnavailableError("Task queue (Redis) is unavailable.") from exc
try:
job = await pool.enqueue_job(function, **kwargs)
finally:
await pool.aclose()
if job is None:
raise DependencyUnavailableError("Could not enqueue job.")
return str(job.job_id)
async def enqueue_enrich(track_id: uuid.UUID) -> None:
"""Best-effort enqueue of metadata enrichment for a freshly stored track.
The track is already persisted, so enrichment is a follow-up, not a barrier:
if the queue is unreachable we log and move on (graceful degradation). The
track stays ``metadata_status=pending`` and can be re-enriched later.
Deferred a few seconds so the caller's DB transaction is committed before the
worker looks the track up (the upload request commits only after it returns)."""
try:
await enqueue("enrich_track", track_id=str(track_id), _defer_by=5)
except DependencyUnavailableError:
log.warning("enrich_enqueue_failed", track_id=str(track_id))
+1
View File
@@ -0,0 +1 @@
"""arq task functions. Registered in ``app.workers.arq_worker.WorkerSettings``."""
+56
View File
@@ -0,0 +1,56 @@
"""arq task: enrich one track's metadata (plan §6.2, §1D).
Wires the §6.2 pipeline adapters to :class:`MetadataEnrichmentService` and runs
it in the worker's own transactional session. Enqueued (deferred) after upload
and after a local-folder import. Idempotent and best-effort — a missing track or
a ``manual`` one is a clean no-op.
"""
import uuid
from typing import Any
from app.application.metadata_service import MetadataEnrichmentService
from app.core.config import get_settings
from app.core.logging import get_logger
from app.infrastructure.db import session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyAlbumRepository,
SqlAlchemyArtistRepository,
SqlAlchemyTrackRepository,
)
from app.infrastructure.metadata.acoustid import AcoustIdHttpClient
from app.infrastructure.metadata.fingerprint import FpcalcFingerprinter
from app.infrastructure.metadata.tags import MutagenTagReader
from app.infrastructure.storage.provider import get_file_storage
log = get_logger("worker.enrich")
async def enrich_track(_ctx: dict[str, Any], *, track_id: str) -> dict[str, Any]:
settings = get_settings()
api_key = (
settings.acoustid_api_key.get_secret_value() if settings.acoustid_api_key else None
)
acoustid = AcoustIdHttpClient(
api_key=api_key,
user_agent=settings.musicbrainz_user_agent,
api_url=settings.acoustid_api_url,
)
async with session_scope() as session:
service = MetadataEnrichmentService(
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
albums=SqlAlchemyAlbumRepository(session),
storage=get_file_storage(),
tag_reader=MutagenTagReader(),
fingerprinter=FpcalcFingerprinter(settings.fpcalc_path),
acoustid=acoustid,
)
result = await service.enrich(uuid.UUID(track_id))
return {
"track_id": str(result.track_id),
"status": result.status,
"mbid": result.matched_mbid,
}
+52
View File
@@ -0,0 +1,52 @@
"""arq task: scan an indexable source and import its files into the library.
Heavy work (directory walk + file copies) belongs off the request cycle
(CLAUDE.md). The HTTP endpoint enqueues this; the worker runs it with its own
transactional session.
"""
import uuid
from typing import Any
from app.application.import_service import LibraryImportService
from app.core.config import get_settings
from app.core.logging import get_logger
from app.infrastructure.db import session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyArtistRepository,
SqlAlchemyTrackRepository,
)
from app.infrastructure.sources.registry import build_source_registry
from app.infrastructure.storage.provider import get_file_storage
from app.workers.queue import enqueue_enrich
log = get_logger("worker.import")
async def scan_local_folder(
_ctx: dict[str, Any], *, source: str = "local", added_by: str | None = None
) -> dict[str, Any]:
registry = build_source_registry(get_settings())
backend = registry.indexable(source)
actor = uuid.UUID(added_by) if added_by else None
async with session_scope() as session:
service = LibraryImportService(
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
storage=get_file_storage(),
)
summary = await service.scan_and_import(backend, added_by=actor)
# Enqueue enrichment only after the import transaction has committed above,
# so the enrich worker is guaranteed to see the new rows.
for track_id in summary.imported_ids:
await enqueue_enrich(track_id)
return {
"source": summary.source,
"seen": summary.seen,
"imported": summary.imported,
"skipped": summary.skipped,
"failed": summary.failed,
}
+9
View File
@@ -21,8 +21,12 @@ dependencies = [
# auth
"pyjwt>=2.10",
"pwdlib[argon2]>=0.2.1",
# symmetric encryption for the recoverable Subsonic app-password (Fernet)
"cryptography>=44.0",
# outbound http (ML client, MusicBrainz, AcoustID)
"httpx>=0.28",
# embedded audio tag reading (enrichment tag pre-pass)
"mutagen>=1.47",
# S3-compatible object storage
"aioboto3>=13.0",
# logging
@@ -71,6 +75,11 @@ select = [
]
ignore = ["B008"] # FastAPI Depends() in defaults is idiomatic
[tool.ruff.lint.per-file-ignores]
# Subsonic query params are camelCase by spec (artistCount, songId, …); the
# handler arg names must match the wire names exactly.
"app/api/rest/*" = ["N803"]
[tool.mypy]
python_version = "3.14"
strict = true
+18 -1
View File
@@ -4,13 +4,14 @@ import datetime as dt
import uuid
from dataclasses import dataclass, replace
from app.domain.entities import Credentials, User
from app.domain.entities import Credentials, SubsonicCredentials, User
@dataclass
class _Stored:
user: User
password_hash: str
subsonic_password_enc: str | None = None
class InMemoryUserRepository:
@@ -61,6 +62,22 @@ class InMemoryUserRepository:
async def count(self) -> int:
return len(self._by_id)
async def get_subsonic_credentials_by_username(
self, username: str
) -> SubsonicCredentials | None:
for stored in self._by_id.values():
if stored.user.username == username:
return SubsonicCredentials(
user=stored.user, password_enc=stored.subsonic_password_enc
)
return None
async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None:
return self._by_id[user_id].subsonic_password_enc
async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None:
self._by_id[user_id].subsonic_password_enc = password_enc
@dataclass
class _Token:
+75
View File
@@ -0,0 +1,75 @@
"""Unit tests for the AcoustID response parser — pure, no network."""
from app.infrastructure.metadata.acoustid import _parse_best_match
def _payload_with_results(results: list[object]) -> dict[str, object]:
return {"status": "ok", "results": results}
def test_parses_full_recording() -> None:
payload = _payload_with_results(
[
{
"id": "acoustid-1",
"score": 0.97,
"recordings": [
{
"id": "mb-rec-1",
"title": "One More Time",
"artists": [{"id": "a1", "name": "Daft Punk"}],
"releasegroups": [{"id": "rg1", "title": "Discovery"}],
}
],
}
]
)
match = _parse_best_match(payload)
assert match is not None
assert match.acoustid == "acoustid-1"
assert match.recording_mbid == "mb-rec-1"
assert match.title == "One More Time"
assert match.artist == "Daft Punk"
assert match.album == "Discovery"
assert match.score == 0.97
def test_picks_highest_score() -> None:
payload = _payload_with_results(
[
{"id": "low", "score": 0.40, "recordings": [{"id": "r-low", "title": "Low"}]},
{"id": "high", "score": 0.92, "recordings": [{"id": "r-high", "title": "High"}]},
]
)
match = _parse_best_match(payload)
assert match is not None
assert match.acoustid == "high"
assert match.title == "High"
def test_result_without_recordings_still_returns_id() -> None:
payload = _payload_with_results([{"id": "acoustid-only", "score": 0.5}])
match = _parse_best_match(payload)
assert match is not None
assert match.acoustid == "acoustid-only"
assert match.recording_mbid is None
assert match.title is None
def test_error_status_returns_none() -> None:
assert _parse_best_match({"status": "error", "error": {"message": "bad"}}) is None
def test_empty_results_returns_none() -> None:
assert _parse_best_match(_payload_with_results([])) is None
def test_non_dict_payload_returns_none() -> None:
assert _parse_best_match("nonsense") is None
assert _parse_best_match(None) is None
+157
View File
@@ -0,0 +1,157 @@
"""Unit tests for LibraryImportService — DB-free, in-memory fakes."""
import datetime as dt
import uuid
from collections.abc import Iterator
from pathlib import Path
import pytest
from app.application.import_service import LibraryImportService
from app.domain.entities import Artist, Track
from app.domain.sources import SourceFile, SourceInfo
pytestmark = pytest.mark.asyncio
class FakeArtistRepo:
def __init__(self) -> None:
self._by_name: dict[str, Artist] = {}
async def get_or_create(self, name: str) -> Artist:
if name not in self._by_name:
now = dt.datetime.now(dt.UTC)
self._by_name[name] = Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
return self._by_name[name]
class FakeTrackRepo:
def __init__(self, *, fail_on: set[str] | None = None) -> None:
self.by_source: dict[tuple[str, str], Track] = {}
self.added: list[Track] = []
self._fail_on = fail_on or set()
async def get_by_source(self, source: str, source_id: str) -> Track | None:
return self.by_source.get((source, source_id))
async def add(self, **kw: object) -> Track:
source_id = str(kw["source_id"])
if source_id in self._fail_on:
raise RuntimeError("simulated add failure")
now = dt.datetime.now(dt.UTC)
track = Track(
id=uuid.UUID(str(kw["id"])) if not isinstance(kw["id"], uuid.UUID) else kw["id"],
title=str(kw["title"]),
artist_id=kw["artist_id"], # type: ignore[arg-type]
album_id=None,
storage_uri=str(kw["storage_uri"]),
file_format=str(kw["file_format"]),
file_size=int(kw["file_size"]), # type: ignore[call-overload]
source=str(kw["source"]),
source_id=source_id,
duration_seconds=None,
genre=None,
year=None,
metadata_status=str(kw["metadata_status"]),
created_at=now,
updated_at=now,
)
self.by_source[(track.source, track.source_id)] = track
self.added.append(track)
return track
class FakeStorage:
def __init__(self) -> None:
self.saved: dict[str, Path] = {}
self.deleted: list[str] = []
async def save_file(self, key: str, src_path: Path) -> int:
self.saved[key] = src_path
return 1
async def delete(self, key: str) -> None:
self.deleted.append(key)
class FakeSource:
name = "local"
def __init__(self, files: list[SourceFile]) -> None:
self._files = files
def info(self) -> SourceInfo:
return SourceInfo(name=self.name, label="Local", kind="indexable", available=True)
def is_available(self) -> bool:
return True
def scan(self) -> Iterator[SourceFile]:
yield from self._files
def _file(source_id: str) -> SourceFile:
return SourceFile(
source_id=source_id,
path=Path("/music") / source_id,
suggested_title=Path(source_id).stem,
file_format="mp3",
file_size=123,
)
def _service(tracks: FakeTrackRepo, storage: FakeStorage) -> LibraryImportService:
return LibraryImportService(tracks=tracks, artists=FakeArtistRepo(), storage=storage) # type: ignore[arg-type]
async def test_imports_new_files() -> None:
tracks, storage = FakeTrackRepo(), FakeStorage()
source = FakeSource([_file("a.mp3"), _file("b/c.mp3")])
summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type]
assert (summary.seen, summary.imported, summary.skipped, summary.failed) == (2, 2, 0, 0)
assert len(tracks.added) == 2
assert len(storage.saved) == 2
assert all(t.metadata_status == "pending" for t in tracks.added)
assert all(t.source == "local" for t in tracks.added)
async def test_dedup_skips_already_imported() -> None:
tracks, storage = FakeTrackRepo(), FakeStorage()
now = dt.datetime.now(dt.UTC)
tracks.by_source[("local", "a.mp3")] = Track(
id=uuid.uuid4(),
title="a",
artist_id=uuid.uuid4(),
album_id=None,
storage_uri="k",
file_format="mp3",
file_size=1,
source="local",
source_id="a.mp3",
duration_seconds=None,
genre=None,
year=None,
metadata_status="pending",
created_at=now,
updated_at=now,
)
source = FakeSource([_file("a.mp3"), _file("new.mp3")])
summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type]
assert (summary.imported, summary.skipped) == (1, 1)
assert len(storage.saved) == 1 # only the new file copied
async def test_per_file_failure_is_isolated_and_rolls_back_storage() -> None:
tracks = FakeTrackRepo(fail_on={"bad.mp3"})
storage = FakeStorage()
source = FakeSource([_file("good.mp3"), _file("bad.mp3")])
summary = await _service(tracks, storage).scan_and_import(source, added_by=None) # type: ignore[arg-type]
assert (summary.seen, summary.imported, summary.failed) == (2, 1, 1)
# The failed import's copied file was cleaned up; the good one stays.
assert len(storage.deleted) == 1
assert len(tracks.added) == 1
+59
View File
@@ -0,0 +1,59 @@
"""Unit tests for the local-folder source + registry (no DB, no network)."""
from pathlib import Path
from app.core.config import Settings
from app.infrastructure.sources.local_folder import LocalFolderSource
from app.infrastructure.sources.registry import build_source_registry
def _settings(**overrides: object) -> Settings:
return Settings(**overrides) # type: ignore[arg-type]
def test_scan_discovers_audio_recursively(tmp_path: Path) -> None:
(tmp_path / "a.mp3").write_bytes(b"x")
(tmp_path / "sub").mkdir()
(tmp_path / "sub" / "b.flac").write_bytes(b"yy")
(tmp_path / "notes.txt").write_bytes(b"ignore me") # non-audio → skipped
files = list(LocalFolderSource(tmp_path).scan())
by_id = {f.source_id: f for f in files}
assert set(by_id) == {"a.mp3", "sub/b.flac"}
assert by_id["a.mp3"].file_format == "mp3"
assert by_id["a.mp3"].suggested_title == "a"
assert by_id["sub/b.flac"].file_format == "flac"
assert by_id["sub/b.flac"].file_size == 2
def test_source_id_is_stable_relative_path(tmp_path: Path) -> None:
(tmp_path / "x.opus").write_bytes(b"z")
[only] = list(LocalFolderSource(tmp_path).scan())
assert only.source_id == "x.opus"
assert only.path == tmp_path / "x.opus"
def test_is_available_false_when_missing(tmp_path: Path) -> None:
source = LocalFolderSource(tmp_path / "nope")
assert source.is_available() is False
assert list(source.scan()) == [] # scanning an unavailable source yields nothing
def test_info_reports_kind_and_availability(tmp_path: Path) -> None:
info = LocalFolderSource(tmp_path).info()
assert info.name == "local"
assert info.kind == "indexable"
assert info.available is True
def test_registry_registers_local_when_path_set(tmp_path: Path) -> None:
registry = build_source_registry(_settings(local_media_import_path=tmp_path))
names = {info.name for info in registry.infos()}
assert names == {"local"}
assert registry.indexable("local").is_available() is True
def test_registry_empty_when_path_unset() -> None:
registry = build_source_registry(_settings(local_media_import_path=None))
assert registry.infos() == []
+283
View File
@@ -0,0 +1,283 @@
"""Unit tests for MetadataEnrichmentService — DB-free, in-memory fakes.
Covers the §6.2 orchestration contract: tag-first merge, AcoustID fallback,
artist/album resolution, status transitions, and the hard invariants
(``manual`` untouched, graceful degradation, idempotent gap-fill).
"""
import datetime as dt
import uuid
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
import pytest
from app.application.metadata_service import MetadataEnrichmentService
from app.domain.entities import Artist, Track
from app.domain.entities.album import Album
from app.domain.entities.metadata import AudioTags, Fingerprint, RecordingMatch
pytestmark = pytest.mark.asyncio
_UNKNOWN = "Unknown Artist"
def _track(*, metadata_status: str = "pending", title: str = "raw-stem") -> Track:
now = dt.datetime.now(dt.UTC)
return Track(
id=uuid.uuid4(),
title=title,
artist_id=uuid.uuid4(), # the "Unknown Artist" id
album_id=None,
storage_uri="tracks/aa/song.mp3",
file_format="mp3",
file_size=123,
source="upload",
source_id="deadbeef",
duration_seconds=None,
genre=None,
year=None,
metadata_status=metadata_status,
created_at=now,
updated_at=now,
)
class FakeTrackRepo:
def __init__(self, track: Track | None) -> None:
self._track = track
self.applied: dict[str, object] | None = None
async def get_by_id(self, track_id: uuid.UUID) -> Track | None:
return self._track
async def apply_enrichment(self, track_id: uuid.UUID, **kw: object) -> Track:
self.applied = kw
return self._track # type: ignore[return-value]
class FakeArtistRepo:
def __init__(self) -> None:
self.created: list[str] = []
async def get_or_create(self, name: str) -> Artist:
self.created.append(name)
now = dt.datetime.now(dt.UTC)
return Artist(id=uuid.uuid4(), name=name, created_at=now, updated_at=now)
class FakeAlbumRepo:
def __init__(self) -> None:
self.created: list[tuple[str, uuid.UUID]] = []
async def get_or_create(
self, *, title: str, artist_id: uuid.UUID, year: int | None, musicbrainz_id: str | None
) -> Album:
self.created.append((title, artist_id))
now = dt.datetime.now(dt.UTC)
return Album(
id=uuid.uuid4(),
title=title,
artist_id=artist_id,
year=year,
cover_path=None,
musicbrainz_id=musicbrainz_id,
created_at=now,
updated_at=now,
)
class FakeStorage:
@asynccontextmanager
async def as_local_path(self, key: str) -> AsyncIterator[Path]:
yield Path("/tmp") / key
class FakeTagReader:
def __init__(self, tags: AudioTags | None) -> None:
self._tags = tags
async def read(self, path: Path) -> AudioTags | None:
return self._tags
class FakeFingerprinter:
def __init__(self, fp: Fingerprint | None, *, available: bool = True) -> None:
self._fp = fp
self._available = available
def is_available(self) -> bool:
return self._available
async def calculate(self, path: Path) -> Fingerprint | None:
return self._fp
class FakeAcoustId:
def __init__(self, match: RecordingMatch | None, *, available: bool = True) -> None:
self._match = match
self._available = available
self.calls = 0
def is_available(self) -> bool:
return self._available
async def lookup(self, fingerprint: Fingerprint) -> RecordingMatch | None:
self.calls += 1
return self._match
def _service(
*,
track: Track | None,
tags: AudioTags | None = None,
fp: Fingerprint | None = None,
match: RecordingMatch | None = None,
fp_available: bool = True,
acoustid_available: bool = True,
) -> tuple[MetadataEnrichmentService, FakeTrackRepo, FakeArtistRepo, FakeAlbumRepo, FakeAcoustId]:
tracks = FakeTrackRepo(track)
artists = FakeArtistRepo()
albums = FakeAlbumRepo()
acoustid = FakeAcoustId(match, available=acoustid_available)
service = MetadataEnrichmentService(
tracks=tracks, # type: ignore[arg-type]
artists=artists, # type: ignore[arg-type]
albums=albums, # type: ignore[arg-type]
storage=FakeStorage(), # type: ignore[arg-type]
tag_reader=FakeTagReader(tags), # type: ignore[arg-type]
fingerprinter=FakeFingerprinter(fp, available=fp_available), # type: ignore[arg-type]
acoustid=acoustid, # type: ignore[arg-type]
)
return service, tracks, artists, albums, acoustid
async def test_tags_only_enriches_and_relinks_artist_and_album() -> None:
track = _track()
tags = AudioTags(
title="Real Title",
artist="Pink Floyd",
album="The Wall",
genre="Rock",
year=1979,
track_number=1,
duration_seconds=222,
)
service, tracks, artists, albums, acoustid = _service(track=track, tags=tags)
result = await service.enrich(track.id)
assert result.status == "enriched"
assert acoustid.calls == 0 # no fingerprint → no lookup needed
assert "Pink Floyd" in artists.created
assert albums.created and albums.created[0][0] == "The Wall"
applied = tracks.applied
assert applied is not None
assert applied["title"] == "Real Title"
assert applied["genre"] == "Rock"
assert applied["year"] == 1979
assert applied["track_number"] == 1
assert applied["duration_seconds"] == 222
assert applied["metadata_status"] == "enriched"
async def test_manual_track_is_never_touched() -> None:
track = _track(metadata_status="manual")
service, tracks, _, _, _ = _service(track=track, tags=AudioTags(artist="X"))
result = await service.enrich(track.id)
assert result.status == "skipped"
assert tracks.applied is None # nothing written
async def test_missing_track_is_a_clean_noop() -> None:
service, tracks, _, _, _ = _service(track=None)
result = await service.enrich(uuid.uuid4())
assert result.status == "skipped"
assert tracks.applied is None
async def test_nothing_found_marks_failed() -> None:
track = _track()
# No tags, no fingerprint → no identity at all.
service, tracks, artists, albums, _acoustid = _service(track=track, tags=None, fp=None)
result = await service.enrich(track.id)
assert result.status == "failed"
assert artists.created == [] # artist stays the original unknown
assert albums.created == []
applied = tracks.applied
assert applied is not None
assert applied["artist_id"] == track.artist_id # fallback kept
assert applied["metadata_status"] == "failed"
async def test_acoustid_path_fills_when_tags_absent() -> None:
track = _track()
fp = Fingerprint(fingerprint="AQAAxyz", duration_seconds=200)
match = RecordingMatch(
acoustid="acoustid-uuid",
score=0.95,
recording_mbid="mb-recording-id",
title="Identified Title",
artist="Daft Punk",
album="Discovery",
)
service, tracks, artists, _albums, acoustid = _service(
track=track, tags=None, fp=fp, match=match
)
result = await service.enrich(track.id)
assert result.status == "enriched"
assert result.matched_mbid == "mb-recording-id"
assert acoustid.calls == 1
applied = tracks.applied
assert applied is not None
assert applied["title"] == "Identified Title"
assert applied["musicbrainz_id"] == "mb-recording-id"
assert applied["acoustid_fingerprint"] == "acoustid-uuid"
assert "Daft Punk" in artists.created
async def test_tags_win_over_acoustid_for_overlapping_fields() -> None:
track = _track()
fp = Fingerprint(fingerprint="AQAA", duration_seconds=200)
tags = AudioTags(title="Tagged Title", artist="Tagged Artist")
match = RecordingMatch(
acoustid="aid",
score=0.9,
recording_mbid="mbid",
title="AcoustID Title",
artist="AcoustID Artist",
)
service, tracks, artists, _albums, _acoustid = _service(
track=track, tags=tags, fp=fp, match=match
)
await service.enrich(track.id)
applied = tracks.applied
assert applied is not None
assert applied["title"] == "Tagged Title" # tag preferred
assert "Tagged Artist" in artists.created
# but the MBID from AcoustID is still captured
assert applied["musicbrainz_id"] == "mbid"
async def test_fingerprint_skipped_when_acoustid_unavailable() -> None:
track = _track()
fp = Fingerprint(fingerprint="AQAA", duration_seconds=200)
service, _tracks, _artists, _albums, acoustid = _service(
track=track, tags=AudioTags(artist="Tagged"), fp=fp, acoustid_available=False
)
result = await service.enrich(track.id)
# tags still enrich, but no AcoustID call is attempted
assert acoustid.calls == 0
assert result.status == "enriched"
+151
View File
@@ -0,0 +1,151 @@
"""Integration tests for sources: enumeration + the real import path.
Requires a reachable Postgres; skips otherwise. The scan worker task is invoked
directly (no Redis needed) so the full DB + storage import path is covered.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
import pytest
from app.core.config import get_settings
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
global _db_reachable_cache
if _db_reachable_cache is not None:
return _db_reachable_cache
from sqlalchemy import text
try:
async with asyncio.timeout(3):
async with get_engine().connect() as conn:
await conn.execute(text("SELECT 1"))
_db_reachable_cache = True
except Exception:
_db_reachable_cache = False
return _db_reachable_cache
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
if not await _db_reachable():
pytest.skip("Postgres not reachable — integration test skipped.")
media = tmp_path / "media"
music = tmp_path / "music"
media.mkdir()
music.mkdir()
# Two audio files (+ a non-audio file that must be ignored) in a subfolder.
(music / "one.mp3").write_bytes(b"first track bytes" * 8)
(music / "artist").mkdir()
(music / "artist" / "two.flac").write_bytes(b"second track bytes" * 8)
(music / "cover.txt").write_bytes(b"not audio")
os.environ["MEDIA_PATH"] = str(media)
os.environ["LOCAL_MEDIA_IMPORT_PATH"] = str(music)
get_settings.cache_clear()
import app.infrastructure.storage.provider as _storage_provider
_storage_provider._storage = None
try:
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
from app.application.user_service import UserService
from app.core.security import Argon2PasswordHasher
async with session_scope() as session:
await UserService(
users=SqlAlchemyUserRepository(session),
refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
hasher=Argon2PasswordHasher(),
).create_user(username="admin", password="adminpass1", is_superuser=True)
from app.main import create_app
app = create_app()
async with LifespanManager(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await dispose_engine()
finally:
_storage_provider._storage = None
os.environ.pop("MEDIA_PATH", None)
os.environ.pop("LOCAL_MEDIA_IMPORT_PATH", None)
get_settings.cache_clear()
async def _login(api: AsyncClient) -> str:
resp = await api.post(
"/api/v1/auth/login", json={"username": "admin", "password": "adminpass1"}
)
assert resp.status_code == 200
return str(resp.json()["access_token"])
async def test_list_sources_includes_local(api: AsyncClient) -> None:
token = await _login(api)
resp = await api.get("/api/v1/sources", headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 200
sources = {s["name"]: s for s in resp.json()}
assert "local" in sources
assert sources["local"]["available"] is True
assert sources["local"]["kind"] == "indexable"
async def test_local_import_creates_streamable_tracks(api: AsyncClient) -> None:
token = await _login(api)
headers = {"Authorization": f"Bearer {token}"}
# Run the worker task directly (bypasses Redis); it imports against the DB.
from app.workers.tasks.import_task import scan_local_folder
summary = await scan_local_folder({}, source="local", added_by=None)
assert summary["seen"] == 2
assert summary["imported"] == 2
assert summary["failed"] == 0
# A second run is idempotent — everything already indexed.
again = await scan_local_folder({}, source="local", added_by=None)
assert again["imported"] == 0
assert again["skipped"] == 2
listing = await api.get("/api/v1/tracks", headers=headers)
assert listing.status_code == 200
items = listing.json()["items"]
assert len(items) == 2
titles = {t["title"] for t in items}
assert titles == {"one", "two"}
# And the imported file actually streams back.
track_id = items[0]["id"]
stream = await api.get(f"/api/v1/stream/{track_id}", headers=headers)
assert stream.status_code == 200
assert len(stream.content) > 0
async def test_scan_requires_admin(api: AsyncClient) -> None:
# The scan endpoint enqueues to Redis; here we only assert it's admin-gated.
resp = await api.post("/api/v1/sources/local/scan")
assert resp.status_code == 401
+236
View File
@@ -0,0 +1,236 @@
"""Integration tests for the Subsonic /rest layer (happy path per endpoint group).
Requires a reachable Postgres; skips otherwise (mirrors test_upload_stream_api).
Drives the real ASGI app: seeds a user + a track, mints a Subsonic app-password
via the native API, then exercises /rest with real query-string auth.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
from xml.etree import ElementTree as ET
import pytest
from app.core.config import get_settings
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
global _db_reachable_cache
if _db_reachable_cache is not None:
return _db_reachable_cache
from sqlalchemy import text
try:
async with asyncio.timeout(3):
async with get_engine().connect() as conn:
await conn.execute(text("SELECT 1"))
_db_reachable_cache = True
except Exception:
_db_reachable_cache = False
return _db_reachable_cache
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
if not await _db_reachable():
pytest.skip("Postgres not reachable — integration test skipped.")
os.environ["MEDIA_PATH"] = str(tmp_path)
get_settings.cache_clear()
import app.infrastructure.storage.provider as _storage_provider
_storage_provider._storage = None
try:
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
from app.application.user_service import UserService
from app.core.security import Argon2PasswordHasher
async with session_scope() as session:
await UserService(
users=SqlAlchemyUserRepository(session),
refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
hasher=Argon2PasswordHasher(),
).create_user(username="testuser", password="testpass1", is_superuser=False)
from app.main import create_app
app = create_app()
async with LifespanManager(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await dispose_engine()
finally:
_storage_provider._storage = None
os.environ.pop("MEDIA_PATH", None)
get_settings.cache_clear()
async def _login(api: AsyncClient) -> str:
resp = await api.post(
"/api/v1/auth/login", json={"username": "testuser", "password": "testpass1"}
)
assert resp.status_code == 200
return str(resp.json()["access_token"])
async def _subsonic_password(api: AsyncClient, token: str) -> str:
resp = await api.get(
"/api/v1/users/me/subsonic-password", headers={"Authorization": f"Bearer {token}"}
)
assert resp.status_code == 200, resp.text
return str(resp.json()["password"])
async def _seed_track(api: AsyncClient, token: str) -> str:
resp = await api.post(
"/api/v1/upload",
files={"file": ("song.mp3", b"audio bytes for subsonic" * 20, "audio/mpeg")},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 200, resp.text
return str(resp.json()["track_id"])
def _auth_params(password: str) -> dict[str, str]:
# Legacy plaintext password auth (p=) keeps the test simple; t+s is covered
# by the auth-service unit tests.
return {"u": "testuser", "p": password, "c": "pytest", "v": "1.16.1", "f": "json"}
async def _setup(api: AsyncClient) -> tuple[dict[str, str], str]:
token = await _login(api)
password = await _subsonic_password(api, token)
track_id = await _seed_track(api, token)
return _auth_params(password), track_id
async def test_ping_ok(api: AsyncClient) -> None:
params, _ = await _setup(api)
resp = await api.get("/rest/ping", params=params)
assert resp.status_code == 200
assert resp.json()["subsonic-response"]["status"] == "ok"
async def test_ping_bad_credentials_returns_code_40(api: AsyncClient) -> None:
await _setup(api)
resp = await api.get(
"/rest/ping",
params={"u": "testuser", "p": "wrong", "c": "pytest", "v": "1.16.1", "f": "json"},
)
# Subsonic errors are HTTP 200 with the failure inside the envelope.
assert resp.status_code == 200
body = resp.json()["subsonic-response"]
assert body["status"] == "failed"
assert body["error"]["code"] == 40
async def test_ping_xml_default(api: AsyncClient) -> None:
params, _ = await _setup(api)
xml_params = {k: v for k, v in params.items() if k != "f"}
resp = await api.get("/rest/ping", params=xml_params)
assert resp.status_code == 200
assert resp.headers["content-type"].startswith("application/xml")
root = ET.fromstring(resp.content)
assert root.attrib["status"] == "ok"
async def test_get_artists(api: AsyncClient) -> None:
params, _ = await _setup(api)
resp = await api.get("/rest/getArtists", params=params)
body = resp.json()["subsonic-response"]
assert body["status"] == "ok"
assert "artists" in body
async def test_get_album_list2(api: AsyncClient) -> None:
params, _ = await _setup(api)
resp = await api.get("/rest/getAlbumList2", params={**params, "type": "newest"})
body = resp.json()["subsonic-response"]
assert body["status"] == "ok"
assert "albumList2" in body
async def test_search3_finds_song(api: AsyncClient) -> None:
params, track_id = await _setup(api)
resp = await api.get("/rest/search3", params={**params, "query": "song"})
result = resp.json()["subsonic-response"]["searchResult3"]
song_ids = [s["id"] for s in result.get("song", [])]
assert f"tr-{track_id}" in song_ids
async def test_get_song(api: AsyncClient) -> None:
params, track_id = await _setup(api)
resp = await api.get("/rest/getSong", params={**params, "id": f"tr-{track_id}"})
song = resp.json()["subsonic-response"]["song"]
assert song["id"] == f"tr-{track_id}"
async def test_stream_returns_audio(api: AsyncClient) -> None:
params, track_id = await _setup(api)
resp = await api.get("/rest/stream", params={**params, "id": f"tr-{track_id}"})
assert resp.status_code == 200
assert resp.headers["content-type"].startswith("audio/")
assert resp.content == b"audio bytes for subsonic" * 20
async def test_get_cover_art_placeholder(api: AsyncClient) -> None:
params, track_id = await _setup(api)
resp = await api.get("/rest/getCoverArt", params={**params, "id": f"tr-{track_id}"})
assert resp.status_code == 200
assert resp.headers["content-type"] == "image/png"
async def test_playlist_lifecycle(api: AsyncClient) -> None:
params, track_id = await _setup(api)
created = await api.get(
"/rest/createPlaylist", params={**params, "name": "Roadtrip", "songId": f"tr-{track_id}"}
)
playlist = created.json()["subsonic-response"]["playlist"]
assert playlist["name"] == "Roadtrip"
assert playlist["songCount"] == 1
playlist_id = playlist["id"]
listed = await api.get("/rest/getPlaylists", params=params)
names = [p["name"] for p in listed.json()["subsonic-response"]["playlists"]["playlist"]]
assert "Roadtrip" in names
deleted = await api.get("/rest/deletePlaylist", params={**params, "id": playlist_id})
assert deleted.json()["subsonic-response"]["status"] == "ok"
async def test_star_and_scrobble(api: AsyncClient) -> None:
params, track_id = await _setup(api)
star = await api.get("/rest/star", params={**params, "id": f"tr-{track_id}"})
assert star.json()["subsonic-response"]["status"] == "ok"
scrobble = await api.get(
"/rest/scrobble", params={**params, "id": f"tr-{track_id}", "submission": "true"}
)
assert scrobble.json()["subsonic-response"]["status"] == "ok"
# The like landed in the append-only log → it surfaces via the native API.
token = await _login(api)
likes = await api.get("/api/v1/likes", headers={"Authorization": f"Bearer {token}"})
assert any(item["id"] == track_id for item in likes.json()["items"])
+131
View File
@@ -0,0 +1,131 @@
"""Unit tests for SubsonicAuthService — verification + app-password lifecycle.
DB-free: uses the in-memory user repository and a real cipher.
"""
import hashlib
import pytest
from app.application.subsonic_auth_service import SubsonicAuthService
from app.core.security import SubsonicPasswordCipher
from app.domain.errors import AuthenticationError, ValidationError
from tests.fakes import InMemoryUserRepository
pytestmark = pytest.mark.asyncio
_KNOWN_PASSWORD = "s3cret-app-password"
def _md5(value: str) -> str:
return hashlib.md5(value.encode(), usedforsecurity=False).hexdigest()
async def _service_with_user(*, password: str | None = _KNOWN_PASSWORD, active: bool = True):
users = InMemoryUserRepository()
cipher = SubsonicPasswordCipher("test-key")
user = await users.add(username="alice", password_hash="x", is_superuser=False)
if not active:
await users.set_active(user.id, False)
if password is not None:
await users.set_subsonic_password_enc(user.id, cipher.encrypt(password))
service = SubsonicAuthService(users=users, cipher=cipher)
return service, user
async def test_authenticate_token_salt_success() -> None:
service, user = await _service_with_user()
salt = "abcdef"
token = _md5(_KNOWN_PASSWORD + salt)
result = await service.authenticate(username="alice", token=token, salt=salt, password=None)
assert result.id == user.id
async def test_authenticate_plain_password_success() -> None:
service, user = await _service_with_user()
result = await service.authenticate(
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
)
assert result.id == user.id
async def test_authenticate_enc_password_success() -> None:
service, user = await _service_with_user()
enc = "enc:" + _KNOWN_PASSWORD.encode().hex()
result = await service.authenticate(username="alice", token=None, salt=None, password=enc)
assert result.id == user.id
async def test_authenticate_wrong_token_fails() -> None:
service, _ = await _service_with_user()
with pytest.raises(AuthenticationError):
await service.authenticate(
username="alice", token=_md5("wrong" + "abc"), salt="abc", password=None
)
async def test_authenticate_wrong_password_fails() -> None:
service, _ = await _service_with_user()
with pytest.raises(AuthenticationError):
await service.authenticate(username="alice", token=None, salt=None, password="nope")
async def test_authenticate_unknown_user_fails() -> None:
service, _ = await _service_with_user()
with pytest.raises(AuthenticationError):
await service.authenticate(
username="ghost", token=None, salt=None, password=_KNOWN_PASSWORD
)
async def test_authenticate_inactive_user_fails() -> None:
service, _ = await _service_with_user(active=False)
with pytest.raises(AuthenticationError):
await service.authenticate(
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
)
async def test_authenticate_no_password_set_fails() -> None:
service, _ = await _service_with_user(password=None)
with pytest.raises(AuthenticationError):
await service.authenticate(
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
)
async def test_authenticate_missing_username_is_validation_error() -> None:
service, _ = await _service_with_user()
with pytest.raises(ValidationError):
await service.authenticate(username=None, token=None, salt=None, password=_KNOWN_PASSWORD)
async def test_authenticate_missing_credentials_is_validation_error() -> None:
service, _ = await _service_with_user()
with pytest.raises(ValidationError):
await service.authenticate(username="alice", token=None, salt=None, password=None)
async def test_rotate_then_authenticate() -> None:
users = InMemoryUserRepository()
cipher = SubsonicPasswordCipher("test-key")
user = await users.add(username="bob", password_hash="x", is_superuser=False)
service = SubsonicAuthService(users=users, cipher=cipher)
password = await service.rotate(user.id)
result = await service.authenticate(username="bob", token=None, salt=None, password=password)
assert result.id == user.id
async def test_reveal_generates_then_is_stable() -> None:
users = InMemoryUserRepository()
cipher = SubsonicPasswordCipher("test-key")
user = await users.add(username="cara", password_hash="x", is_superuser=False)
service = SubsonicAuthService(users=users, cipher=cipher)
first = await service.reveal(user.id)
second = await service.reveal(user.id)
assert first == second # lazily generated once, then stable
rotated = await service.rotate(user.id)
assert rotated != first
+72
View File
@@ -0,0 +1,72 @@
"""Unit tests for the Subsonic response envelope (XML + JSON shapes)."""
import json
from xml.etree import ElementTree as ET
from app.api.rest.envelope import (
SUBSONIC_API_VERSION,
subsonic_error,
subsonic_response,
)
def _xml_root(body: bytes) -> ET.Element:
return ET.fromstring(body)
def _local(tag: str) -> str:
return tag.rsplit("}", 1)[-1] # strip namespace
def test_ok_xml_shape() -> None:
resp = subsonic_response({"license": {"valid": True}}, fmt="xml")
assert resp.media_type.startswith("application/xml")
root = _xml_root(resp.body)
assert _local(root.tag) == "subsonic-response"
assert root.attrib["status"] == "ok"
assert root.attrib["version"] == SUBSONIC_API_VERSION
assert root.attrib["type"] == "mcma"
child = root[0]
assert _local(child.tag) == "license"
assert child.attrib["valid"] == "true"
def test_ok_json_shape() -> None:
resp = subsonic_response({"license": {"valid": True}}, fmt="json")
assert resp.media_type.startswith("application/json")
payload = json.loads(resp.body)["subsonic-response"]
assert payload["status"] == "ok"
assert payload["version"] == SUBSONIC_API_VERSION
assert payload["type"] == "mcma"
assert payload["license"] == {"valid": True}
def test_error_xml_shape() -> None:
resp = subsonic_error(40, "Wrong username or password.", fmt="xml")
root = _xml_root(resp.body)
assert root.attrib["status"] == "failed"
error = root[0]
assert _local(error.tag) == "error"
assert error.attrib["code"] == "40"
assert error.attrib["message"] == "Wrong username or password."
def test_error_json_shape() -> None:
resp = subsonic_error(70, "Not found.", fmt="json")
payload = json.loads(resp.body)["subsonic-response"]
assert payload["status"] == "failed"
assert payload["error"] == {"code": 70, "message": "Not found."}
def test_default_format_is_xml() -> None:
resp = subsonic_response(fmt=None)
assert resp.media_type.startswith("application/xml")
assert _xml_root(resp.body).attrib["status"] == "ok"
def test_list_renders_repeated_elements() -> None:
payload = {"genres": {"genre": [{"value": "Rock"}, {"value": "Jazz"}]}}
root = _xml_root(subsonic_response(payload, fmt="xml").body)
genres = root[0]
values = [g.text for g in genres]
assert values == ["Rock", "Jazz"]
+70
View File
@@ -0,0 +1,70 @@
"""Unit tests for Subsonic crypto + id helpers (no DB, no network)."""
import hashlib
import uuid
import pytest
from app.api.rest import ids
from app.api.rest.ids import IdKind
from app.core.security import SubsonicPasswordCipher, generate_subsonic_password
from app.domain.errors import AuthenticationError, NotFoundError
def test_generate_subsonic_password_is_long_and_unique() -> None:
a = generate_subsonic_password()
b = generate_subsonic_password()
assert a != b
assert len(a) >= 20
def test_cipher_roundtrip() -> None:
cipher = SubsonicPasswordCipher("a-secret-key")
plaintext = generate_subsonic_password()
token = cipher.encrypt(plaintext)
assert token != plaintext
assert cipher.decrypt(token) == plaintext
def test_cipher_token_then_md5_matches() -> None:
"""The decrypted app-password must reproduce a client's t=md5(password+salt)."""
cipher = SubsonicPasswordCipher("a-secret-key")
password = generate_subsonic_password()
enc = cipher.encrypt(password)
salt = "c19b2d"
decrypted = cipher.decrypt(enc)
expected = hashlib.md5((decrypted + salt).encode(), usedforsecurity=False).hexdigest()
client_token = hashlib.md5((password + salt).encode(), usedforsecurity=False).hexdigest()
assert expected == client_token
def test_cipher_wrong_key_fails() -> None:
token = SubsonicPasswordCipher("key-one").encrypt("hunter2")
with pytest.raises(AuthenticationError):
SubsonicPasswordCipher("key-two").decrypt(token)
def test_id_encode_decode_roundtrip() -> None:
value = uuid.uuid4()
assert ids.decode_track(ids.encode_track(value)) == value
assert ids.decode_album(ids.encode_album(value)) == value
assert ids.decode_artist(ids.encode_artist(value)) == value
assert ids.decode_playlist(ids.encode_playlist(value)) == value
def test_id_parse_returns_kind() -> None:
value = uuid.uuid4()
kind, parsed = ids.parse(ids.encode_album(value))
assert kind is IdKind.ALBUM
assert parsed == value
def test_id_wrong_prefix_rejected() -> None:
track = ids.encode_track(uuid.uuid4())
with pytest.raises(NotFoundError):
ids.decode_album(track)
def test_id_malformed_rejected() -> None:
with pytest.raises(NotFoundError):
ids.parse("not-a-real-id")
+27
View File
@@ -0,0 +1,27 @@
"""Unit tests for the mutagen tag-parsing helpers — pure, no files."""
from app.infrastructure.metadata.tags import _first, _parse_track_number, _parse_year
def test_first_takes_head_of_list() -> None:
assert _first(["Pink Floyd", "other"]) == "Pink Floyd"
assert _first("Solo") == "Solo"
assert _first([]) is None
assert _first(None) is None
assert _first([" "]) is None # whitespace-only → None
def test_parse_year_extracts_four_digits() -> None:
assert _parse_year(["1979"]) == 1979
assert _parse_year(["1979-01-02"]) == 1979
assert _parse_year("2021-12") == 2021
assert _parse_year(["no year"]) is None
assert _parse_year(None) is None
def test_parse_track_number_handles_slash_form() -> None:
assert _parse_track_number(["3/12"]) == 3
assert _parse_track_number(["7"]) == 7
assert _parse_track_number("1/10") == 1
assert _parse_track_number(["A1"]) is None
assert _parse_track_number(None) is None
Generated
+783 -710
View File
File diff suppressed because it is too large Load Diff