Compare commits

..

3 Commits

Author SHA1 Message Date
Senko-san 551afbab13 feat(subsonic): browsing, search, media, playlist, annotation endpoints
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
Thin adapters over the existing services/repositories (no business logic):

- system: ping (auth check), getLicense
- browsing: getArtists/getArtist/getAlbum, getAlbumList(2) (newest/alpha/random),
  getSong, getGenres, getMusicFolders/getIndexes/getMusicDirectory (one folder)
- search: search3 (delegates to the library repos)
- media: stream + download (reuse StreamingService, honor Range); getCoverArt
  returns a placeholder until the cover pipeline lands
- playlists: get/create/update/delete over the playlist repo (owner-scoped)
- annotation: star/unstar → append-only like log, scrobble → play history,
  setRating → clean no-op
- all endpoints also accept the .view suffix and GET+POST for client compat

Repo support: album list ordering (newest/random), track genre facets.
README documents the mandatory-HTTPS requirement and app-password workflow.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:24:06 +03:00
Senko-san b975164fc2 feat(subsonic): response envelope, id scheme, and error mapping
- envelope: one serializer emitting the <subsonic-response> wrapper in XML
  (default) and JSON (f=json), carrying status/version/type/serverVersion
- ids: stable, reversible type-prefixed ids (tr-/al-/ar-/pl-) ↔ UUIDs
- errors: /rest requests render the Subsonic error envelope (always HTTP 200)
  with standard codes (10 missing param, 40 wrong creds, 50, 70 not found)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:23:30 +03:00
Senko-san 7a17e3babd feat(subsonic): per-user encrypted app-password foundation
Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret,
but login passwords are stored as a one-way argon2 hash. Add a separate,
per-user app-password: high-entropy, random, and encrypted at rest with a
Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB).

- SubsonicPasswordCipher + generate_subsonic_password in core.security
- users.subsonic_password_enc column (+ Alembic migration), repo + port methods
- SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle
- self-service GET/POST /users/me/subsonic-password + admin rotate endpoint
- domain SubsonicCredentials + SubsonicCipher port; deps wiring

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:23:19 +03:00
34 changed files with 1951 additions and 83 deletions
+5
View File
@@ -17,6 +17,11 @@ JWT_SECRET=change-me-in-prod
ACCESS_TOKEN_TTL_SECONDS=900
REFRESH_TOKEN_TTL_SECONDS=2592000
# subsonic — key that encrypts per-user Subsonic app-passwords at rest.
# GENERATE a strong secret for prod (`openssl rand -hex 32`); rotating it
# invalidates all stored app-passwords. NOTE: /rest must be served over HTTPS.
SUBSONIC_SECRET_KEY=change-me-subsonic-key
# media / storage
MEDIA_PATH=/data/media
TRANSCODE_CACHE_PATH=/data/transcode-cache
+34
View File
@@ -70,3 +70,37 @@ The DB URL is injected from app settings — never hardcoded in `alembic.ini`.
All settings come from environment variables (or `.env` in dev). See
[`.env.example`](.env.example). External services (ML, AcoustID, MusicBrainz)
are **optional** — the backend degrades gracefully when they are absent.
## Subsonic API (`/rest`)
A Subsonic-compatible API is mounted at `/rest`, so standard clients (Symfonium,
DSub, play:Sub, …) can browse the library and stream. It is a thin adapter over
the native services — it adds no business logic of its own.
**HTTPS is mandatory.** Subsonic authentication puts the credential in the URL
(`t=md5(password+salt)&s=…`, or the legacy `p=`), so `/rest` must only ever be
exposed behind TLS (terminate at the reverse proxy). Never serve it over plain
HTTP.
### App-passwords
Subsonic auth needs a recoverable secret, but login passwords are stored as a
one-way argon2 hash. So Subsonic clients authenticate against a separate,
per-user **app-password** — high-entropy, random, and encrypted at rest with a
key derived from `SUBSONIC_SECRET_KEY` (set this to a strong random string in
prod; rotating it invalidates all stored app-passwords).
Self-service lifecycle (native API, needs a normal JWT login):
```bash
GET /api/v1/users/me/subsonic-password # reveal (generated lazily on first read)
POST /api/v1/users/me/subsonic-password # rotate
# admin, for any user:
POST /api/v1/admin/users/{user_id}/subsonic-password
```
Point the client at the instance URL, use your **username** + the revealed
**app-password** (not your login password).
> **Cover art** (`getCoverArt`) currently returns a placeholder — the cover
> pipeline (`/api/v1/.../cover` endpoints) is not implemented yet.
@@ -0,0 +1,32 @@
"""subsonic: per-user encrypted app-password
Revision ID: 20260608_subsonic_pw
Revises: 20260608_storage_uri
Create Date: 2026-06-08 12:00:00.000000
Adds ``users.subsonic_password_enc`` — the recoverable, Fernet-encrypted
Subsonic app-password (plan §7). NULL until the user generates one.
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "20260608_subsonic_pw"
down_revision: str | None = "20260608_storage_uri"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.add_column(
"users",
sa.Column("subsonic_password_enc", sa.String(length=255), nullable=True),
)
def downgrade() -> None:
op.drop_column("users", "subsonic_password_enc")
+43 -3
View File
@@ -10,19 +10,20 @@ from collections.abc import AsyncIterator
from functools import lru_cache
from typing import Annotated
from fastapi import Depends
from fastapi import Depends, Query
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.application.auth_service import AuthService
from app.application.streaming_service import StreamingService
from app.application.subsonic_auth_service import SubsonicAuthService
from app.application.upload_service import UploadService
from app.application.user_service import UserService
from app.core.config import get_settings
from app.core.security import Argon2PasswordHasher, JwtTokenService
from app.core.security import Argon2PasswordHasher, JwtTokenService, SubsonicPasswordCipher
from app.domain.entities import User
from app.domain.errors import AuthenticationError, PermissionDeniedError
from app.domain.ports import FileStorage, PasswordHasher, TokenService
from app.domain.ports import FileStorage, PasswordHasher, SubsonicCipher, TokenService
from app.infrastructure.db import get_sessionmaker
from app.infrastructure.db.repositories import (
SqlAlchemyAlbumRepository,
@@ -64,6 +65,11 @@ def get_token_service() -> TokenService:
return JwtTokenService(get_settings())
@lru_cache
def get_subsonic_cipher() -> SubsonicCipher:
return SubsonicPasswordCipher(get_settings().subsonic_secret_key.get_secret_value())
# -- request-scoped services ---------------------------------------------------
def get_auth_service(session: SessionDep) -> AuthService:
return AuthService(
@@ -82,8 +88,16 @@ def get_user_service(session: SessionDep) -> UserService:
)
def get_subsonic_auth_service(session: SessionDep) -> SubsonicAuthService:
return SubsonicAuthService(
users=SqlAlchemyUserRepository(session),
cipher=get_subsonic_cipher(),
)
AuthServiceDep = Annotated[AuthService, Depends(get_auth_service)]
UserServiceDep = Annotated[UserService, Depends(get_user_service)]
SubsonicAuthServiceDep = Annotated[SubsonicAuthService, Depends(get_subsonic_auth_service)]
# -- file storage (process-cached) ---------------------------------------------
@@ -187,3 +201,29 @@ async def get_streaming_user(
StreamUser = Annotated[User, Depends(get_streaming_user)]
# -- subsonic (/rest) authentication -------------------------------------------
# Subsonic puts credentials in the query string: u + (t & s) | p, plus c/v/f.
# The dep extracts them and delegates verification to the service; domain errors
# propagate to the rest-aware exception handler, which renders the Subsonic
# error envelope (HTTP 200). HTTPS is mandatory — the secret rides in the URL.
async def get_subsonic_user(
service: SubsonicAuthServiceDep,
u: Annotated[str | None, Query()] = None,
t: Annotated[str | None, Query()] = None,
s: Annotated[str | None, Query()] = None,
p: Annotated[str | None, Query()] = None,
) -> User:
return await service.authenticate(username=u, token=t, salt=s, password=p)
SubsonicUser = Annotated[User, Depends(get_subsonic_user)]
async def get_subsonic_format(f: Annotated[str | None, Query()] = None) -> str | None:
"""The requested response format (``f``): ``xml`` (default) or ``json``."""
return f
SubsonicFormat = Annotated[str | None, Depends(get_subsonic_format)]
+33 -4
View File
@@ -1,8 +1,15 @@
"""Maps domain exceptions to HTTP responses. The only place that knows both."""
"""Maps domain exceptions to HTTP responses. The only place that knows both.
from fastapi import FastAPI, Request, status
Two surfaces share this mapping: the native ``/api/v1`` API answers with a JSON
error body and an HTTP status code, while the Subsonic ``/rest`` layer answers
with its own envelope and **always HTTP 200** (the status lives in the body). A
request is routed to the Subsonic renderer by path prefix.
"""
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import JSONResponse
from app.api.rest.envelope import subsonic_error
from app.core.logging import get_logger
from app.domain.errors import (
AlreadyExistsError,
@@ -30,6 +37,21 @@ _STATUS_BY_ERROR: dict[type[DomainError], int] = {
StorageError: status.HTTP_500_INTERNAL_SERVER_ERROR,
}
# Subsonic error codes (subsonic.org/restapi): 10 missing param, 40 wrong
# credentials, 50 not authorized, 70 not found, 0 generic.
_SUBSONIC_CODE_BY_ERROR: dict[type[DomainError], int] = {
ValidationError: 10,
AuthenticationError: 40,
PermissionDeniedError: 50,
NotFoundError: 70,
}
_SUBSONIC_PREFIX = "/rest"
def _is_subsonic(request: Request) -> bool:
return request.url.path.startswith(_SUBSONIC_PREFIX)
def _error_body(code: str, message: str) -> dict[str, dict[str, str]]:
return {"error": {"code": code, "message": message}}
@@ -45,7 +67,10 @@ def register_exception_handlers(app: FastAPI) -> None:
)
@app.exception_handler(DomainError)
async def _handle_domain_error(_request: Request, exc: DomainError) -> JSONResponse:
async def _handle_domain_error(request: Request, exc: DomainError) -> Response:
if _is_subsonic(request):
code = _SUBSONIC_CODE_BY_ERROR.get(type(exc), 0)
return subsonic_error(code, exc.message, fmt=request.query_params.get("f"))
http_status = _STATUS_BY_ERROR.get(type(exc), status.HTTP_400_BAD_REQUEST)
return JSONResponse(
status_code=http_status,
@@ -53,8 +78,12 @@ def register_exception_handlers(app: FastAPI) -> None:
)
@app.exception_handler(Exception)
async def _handle_unexpected(_request: Request, exc: Exception) -> JSONResponse:
async def _handle_unexpected(request: Request, exc: Exception) -> Response:
log.error("unhandled_exception", exc_info=exc)
if _is_subsonic(request):
return subsonic_error(
0, "An unexpected error occurred.", fmt=request.query_params.get("f")
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=_error_body("internal_error", "An unexpected error occurred."),
+89 -11
View File
@@ -1,23 +1,101 @@
"""Subsonic annotation endpoints: star, rating, scrobble."""
"""Subsonic annotation endpoints: star/unstar, rating, scrobble.
from typing import Any
* ``star``/``unstar`` map to the **append-only** like event-log (a new event per
call — never a mutated boolean; CLAUDE.md invariant). Album/artist stars are
accepted but not persisted (no album/artist likes yet).
* ``scrobble`` appends to play history.
* ``setRating`` has no backing store yet — it's accepted as a clean no-op.
"""
from fastapi import APIRouter
import datetime as dt
from typing import Annotated
from fastapi import APIRouter, Query, Response
from app.api.deps import HistoryRepoDep, LikeRepoDep, SubsonicFormat, SubsonicUser, TrackRepoDep
from app.api.rest.envelope import subsonic_response
from app.api.rest.ids import decode_track
from app.domain.errors import NotFoundError
router = APIRouter()
@router.get("/star")
async def star() -> Any: ...
@router.api_route("/star", methods=["GET", "POST"])
@router.api_route("/star.view", methods=["GET", "POST"])
async def star(
user: SubsonicUser,
fmt: SubsonicFormat,
like_repo: LikeRepoDep,
track_repo: TrackRepoDep,
id: Annotated[list[str] | None, Query()] = None,
albumId: Annotated[list[str] | None, Query()] = None,
artistId: Annotated[list[str] | None, Query()] = None,
) -> Response:
# albumId/artistId are accepted for client compatibility but not persisted.
for raw in id or []:
track_id = decode_track(raw)
if await track_repo.get_by_id(track_id) is None:
raise NotFoundError("Song not found.")
await like_repo.add(user_id=user.id, track_id=track_id, value="like")
return subsonic_response(fmt=fmt)
@router.get("/unstar")
async def unstar() -> Any: ...
@router.api_route("/unstar", methods=["GET", "POST"])
@router.api_route("/unstar.view", methods=["GET", "POST"])
async def unstar(
user: SubsonicUser,
fmt: SubsonicFormat,
like_repo: LikeRepoDep,
track_repo: TrackRepoDep,
id: Annotated[list[str] | None, Query()] = None,
albumId: Annotated[list[str] | None, Query()] = None,
artistId: Annotated[list[str] | None, Query()] = None,
) -> Response:
for raw in id or []:
track_id = decode_track(raw)
if await track_repo.get_by_id(track_id) is None:
raise NotFoundError("Song not found.")
await like_repo.add(user_id=user.id, track_id=track_id, value="neutral")
return subsonic_response(fmt=fmt)
@router.get("/setRating")
async def set_rating() -> Any: ...
@router.api_route("/setRating", methods=["GET", "POST"])
@router.api_route("/setRating.view", methods=["GET", "POST"])
async def set_rating(
_user: SubsonicUser,
fmt: SubsonicFormat,
id: Annotated[str, Query()],
rating: Annotated[int, Query(ge=0, le=5)],
) -> Response:
# No rating store yet — accept cleanly so clients don't error.
return subsonic_response(fmt=fmt)
@router.get("/scrobble")
async def scrobble() -> Any: ...
@router.api_route("/scrobble", methods=["GET", "POST"])
@router.api_route("/scrobble.view", methods=["GET", "POST"])
async def scrobble(
user: SubsonicUser,
fmt: SubsonicFormat,
history_repo: HistoryRepoDep,
track_repo: TrackRepoDep,
id: Annotated[list[str] | None, Query()] = None,
time: Annotated[list[int] | None, Query()] = None,
submission: Annotated[bool, Query()] = True,
) -> Response:
times = time or []
for index, raw in enumerate(id or []):
track_id = decode_track(raw)
if await track_repo.get_by_id(track_id) is None:
raise NotFoundError("Song not found.")
if index < len(times):
played_at = dt.datetime.fromtimestamp(times[index] / 1000, tz=dt.UTC)
else:
played_at = dt.datetime.now(dt.UTC)
await history_repo.add(
user_id=user.id,
track_id=track_id,
played_at=played_at,
play_duration_seconds=None,
completed=submission,
)
return subsonic_response(fmt=fmt)
+254 -24
View File
@@ -1,47 +1,277 @@
"""Subsonic browsing endpoints."""
"""Subsonic browsing endpoints — thin adapters over the library repositories.
from typing import Any
A single synthetic music folder (id ``0``) is exposed; this is a homelab, not a
multi-library server. Heavy lifting stays in the repositories; these handlers
only fan out queries and reshape rows into the Subsonic element dicts.
"""
from fastapi import APIRouter
from typing import Annotated, Any
from fastapi import APIRouter, Query, Response
from app.api.deps import AlbumRepoDep, ArtistRepoDep, SubsonicFormat, SubsonicUser, TrackRepoDep
from app.api.rest.envelope import subsonic_response
from app.api.rest.ids import IdKind, encode_album, encode_artist, parse
from app.api.rest.serializers import album_dict, artist_dict, iso, song_dict
from app.domain.entities import Album, Artist
from app.domain.errors import NotFoundError
router = APIRouter()
@router.get("/getMusicFolders")
async def get_music_folders() -> Any: ...
_IGNORED_ARTICLES = "The El La Los Las Le Les"
_MAX_ARTISTS = 10_000 # homelab scale; one pass is fine
@router.get("/getIndexes")
async def get_indexes() -> Any: ...
async def _artists_index(artist_repo: ArtistRepoDep) -> list[dict[str, Any]]:
"""Group artists into Subsonic A-Z index buckets, each with an album count."""
artists = await artist_repo.list(q=None, limit=_MAX_ARTISTS, offset=0)
buckets: dict[str, list[dict[str, Any]]] = {}
for artist in artists:
album_count = await artist_repo.album_count(artist.id)
letter = artist.name[:1].upper()
if not letter.isalpha():
letter = "#"
buckets.setdefault(letter, []).append(artist_dict(artist, album_count=album_count))
return [{"name": name, "artist": buckets[name]} for name in sorted(buckets)]
@router.get("/getMusicDirectory")
async def get_music_directory() -> Any: ...
async def _albums_for_artist(artist: Artist, album_repo: AlbumRepoDep) -> list[dict[str, Any]]:
albums = await album_repo.list(artist_id=artist.id, q=None, limit=500, offset=0)
counts = await album_repo.track_count_many([a.id for a in albums])
return [album_dict(a, artist, song_count=counts.get(a.id, 0)) for a in albums]
@router.get("/getArtists")
async def get_artists() -> Any: ...
@router.api_route("/getMusicFolders", methods=["GET", "POST"])
@router.api_route("/getMusicFolders.view", methods=["GET", "POST"])
async def get_music_folders(_user: SubsonicUser, fmt: SubsonicFormat) -> Response:
return subsonic_response(
{"musicFolders": {"musicFolder": [{"id": 0, "name": "Music"}]}}, fmt=fmt
)
@router.get("/getArtist")
async def get_artist() -> Any: ...
@router.api_route("/getIndexes", methods=["GET", "POST"])
@router.api_route("/getIndexes.view", methods=["GET", "POST"])
async def get_indexes(
_user: SubsonicUser, fmt: SubsonicFormat, artist_repo: ArtistRepoDep
) -> Response:
index = await _artists_index(artist_repo)
return subsonic_response(
{"indexes": {"ignoredArticles": _IGNORED_ARTICLES, "lastModified": 0, "index": index}},
fmt=fmt,
)
@router.get("/getAlbum")
async def get_album() -> Any: ...
@router.api_route("/getArtists", methods=["GET", "POST"])
@router.api_route("/getArtists.view", methods=["GET", "POST"])
async def get_artists(
_user: SubsonicUser, fmt: SubsonicFormat, artist_repo: ArtistRepoDep
) -> Response:
index = await _artists_index(artist_repo)
return subsonic_response(
{"artists": {"ignoredArticles": _IGNORED_ARTICLES, "index": index}}, fmt=fmt
)
@router.get("/getAlbumList")
async def get_album_list() -> Any: ...
@router.api_route("/getArtist", methods=["GET", "POST"])
@router.api_route("/getArtist.view", methods=["GET", "POST"])
async def get_artist(
_user: SubsonicUser,
fmt: SubsonicFormat,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
id: Annotated[str, Query()],
) -> Response:
_, artist_id = parse(id)
artist = await artist_repo.get_by_id(artist_id)
if artist is None:
raise NotFoundError("Artist not found.")
albums = await _albums_for_artist(artist, album_repo)
payload = {
**artist_dict(artist, album_count=len(albums)),
"album": albums,
}
return subsonic_response({"artist": payload}, fmt=fmt)
@router.get("/getAlbumList2")
async def get_album_list2() -> Any: ...
@router.api_route("/getAlbum", methods=["GET", "POST"])
@router.api_route("/getAlbum.view", methods=["GET", "POST"])
async def get_album(
_user: SubsonicUser,
fmt: SubsonicFormat,
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
track_repo: TrackRepoDep,
id: Annotated[str, Query()],
) -> Response:
_, album_id = parse(id)
album = await album_repo.get_by_id(album_id)
if album is None:
raise NotFoundError("Album not found.")
artist = await artist_repo.get_by_id(album.artist_id)
tracks = await track_repo.list(
artist_id=None,
album_id=album_id,
q=None,
sort_by="title",
order="asc",
limit=500,
offset=0,
)
duration = sum(t.duration_seconds or 0 for t in tracks)
songs = [song_dict(t, artist, album) for t in tracks]
payload = {
**album_dict(album, artist, song_count=len(songs), duration=duration),
"song": songs,
}
return subsonic_response({"album": payload}, fmt=fmt)
@router.get("/getSong")
async def get_song() -> Any: ...
@router.api_route("/getAlbumList", methods=["GET", "POST"])
@router.api_route("/getAlbumList.view", methods=["GET", "POST"])
async def get_album_list(
_user: SubsonicUser,
fmt: SubsonicFormat,
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
type: Annotated[str, Query()] = "newest",
size: Annotated[int, Query(ge=1, le=500)] = 10,
offset: Annotated[int, Query(ge=0)] = 0,
) -> Response:
albums = await _list_albums(album_repo, artist_repo, type, size, offset)
return subsonic_response({"albumList": {"album": albums}}, fmt=fmt)
@router.get("/getGenres")
async def get_genres() -> Any: ...
@router.api_route("/getAlbumList2", methods=["GET", "POST"])
@router.api_route("/getAlbumList2.view", methods=["GET", "POST"])
async def get_album_list2(
_user: SubsonicUser,
fmt: SubsonicFormat,
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
type: Annotated[str, Query()] = "newest",
size: Annotated[int, Query(ge=1, le=500)] = 10,
offset: Annotated[int, Query(ge=0)] = 0,
) -> Response:
albums = await _list_albums(album_repo, artist_repo, type, size, offset)
return subsonic_response({"albumList2": {"album": albums}}, fmt=fmt)
async def _list_albums(
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
type_: str,
size: int,
offset: int,
) -> list[dict[str, Any]]:
if type_ == "alphabeticalByName":
sort_by, order = "title", "asc"
elif type_ == "random":
sort_by, order = "title", "random"
else: # newest / recent / frequent → newest (no play stats yet)
sort_by, order = "created", "desc"
albums = await album_repo.list(
artist_id=None, q=None, limit=size, offset=offset, sort_by=sort_by, order=order
)
return await _decorate_albums(albums, album_repo, artist_repo)
async def _decorate_albums(
albums: list[Album], album_repo: AlbumRepoDep, artist_repo: ArtistRepoDep
) -> list[dict[str, Any]]:
artist_ids = list({a.artist_id for a in albums})
artists = {a.id: a for a in await artist_repo.get_many(artist_ids)}
counts = await album_repo.track_count_many([a.id for a in albums])
return [album_dict(a, artists.get(a.artist_id), song_count=counts.get(a.id, 0)) for a in albums]
@router.api_route("/getSong", methods=["GET", "POST"])
@router.api_route("/getSong.view", methods=["GET", "POST"])
async def get_song(
_user: SubsonicUser,
fmt: SubsonicFormat,
track_repo: TrackRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
id: Annotated[str, Query()],
) -> Response:
_, track_id = parse(id)
track = await track_repo.get_by_id(track_id)
if track is None:
raise NotFoundError("Song not found.")
artist = await artist_repo.get_by_id(track.artist_id)
album = await album_repo.get_by_id(track.album_id) if track.album_id else None
return subsonic_response({"song": song_dict(track, artist, album)}, fmt=fmt)
@router.api_route("/getGenres", methods=["GET", "POST"])
@router.api_route("/getGenres.view", methods=["GET", "POST"])
async def get_genres(
_user: SubsonicUser, fmt: SubsonicFormat, track_repo: TrackRepoDep
) -> Response:
genres = [
{"value": name, "songCount": count, "albumCount": 0}
for name, count in await track_repo.genres()
]
return subsonic_response({"genres": {"genre": genres}}, fmt=fmt)
@router.api_route("/getMusicDirectory", methods=["GET", "POST"])
@router.api_route("/getMusicDirectory.view", methods=["GET", "POST"])
async def get_music_directory(
_user: SubsonicUser,
fmt: SubsonicFormat,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
track_repo: TrackRepoDep,
id: Annotated[str, Query()],
) -> Response:
kind, entity_id = parse(id)
if kind is IdKind.ARTIST:
artist = await artist_repo.get_by_id(entity_id)
if artist is None:
raise NotFoundError("Artist not found.")
albums = await album_repo.list(artist_id=artist.id, q=None, limit=500, offset=0)
counts = await album_repo.track_count_many([a.id for a in albums])
children = [
{
"id": encode_album(a.id),
"parent": encode_artist(artist.id),
"isDir": True,
"title": a.title,
"name": a.title,
"artist": artist.name,
"artistId": encode_artist(artist.id),
"coverArt": encode_album(a.id),
"songCount": counts.get(a.id, 0),
"created": iso(a.created_at),
"year": a.year,
}
for a in albums
]
directory = {"id": id, "name": artist.name, "child": children}
return subsonic_response({"directory": directory}, fmt=fmt)
if kind is IdKind.ALBUM:
album = await album_repo.get_by_id(entity_id)
if album is None:
raise NotFoundError("Album not found.")
artist = await artist_repo.get_by_id(album.artist_id)
tracks = await track_repo.list(
artist_id=None,
album_id=album.id,
q=None,
sort_by="title",
order="asc",
limit=500,
offset=0,
)
children = [song_dict(t, artist, album) for t in tracks]
directory = {
"id": id,
"parent": encode_artist(album.artist_id),
"name": album.title,
"child": children,
}
return subsonic_response({"directory": directory}, fmt=fmt)
raise NotFoundError("Directory not found.")
+102
View File
@@ -0,0 +1,102 @@
"""The Subsonic response envelope — one serializer, two wire formats.
Every Subsonic endpoint answers with a ``<subsonic-response>`` wrapper carrying
``status`` / ``version`` / ``type`` / ``serverVersion``, in XML (default) or JSON
(``f=json``). All handlers return through :func:`subsonic_response`; errors go
through the rest-aware exception handler (see ``app.api.errors``).
Payload data model (shared by both formats):
* a scalar value → an XML attribute / a JSON field
* a nested dict → a single child element / nested object
* a list of dicts → repeated child elements / a JSON array
* the key ``"value"`` → element text content (used by e.g. lyrics)
``None`` values are dropped. Subsonic always replies with **HTTP 200**, even for
errors — the status lives inside the envelope — so clients parse the body.
"""
import json
from collections.abc import Mapping
from typing import Any
from xml.etree import ElementTree as ET
from fastapi import Response
SUBSONIC_API_VERSION = "1.16.1"
SERVER_TYPE = "mcma"
SERVER_VERSION = "0.1.0"
_XML_NS = "http://subsonic.org/restapi"
_XML_MEDIA_TYPE = "application/xml; charset=utf-8"
_JSON_MEDIA_TYPE = "application/json; charset=utf-8"
def _is_json(fmt: str | None) -> bool:
return fmt in ("json", "jsonp")
def _scalar(value: object) -> str:
if isinstance(value, bool):
return "true" if value else "false"
return str(value)
def _build_xml(parent: ET.Element, data: Mapping[str, Any]) -> None:
for key, value in data.items():
if value is None:
continue
if key == "value":
parent.text = _scalar(value)
elif isinstance(value, Mapping):
_build_xml(ET.SubElement(parent, key), value)
elif isinstance(value, list):
for item in value:
_build_xml(ET.SubElement(parent, key), item)
else:
parent.set(key, _scalar(value))
def _strip_none(value: Any) -> Any:
"""Recursively drop ``None`` values so JSON output matches XML (no empty attrs)."""
if isinstance(value, Mapping):
return {k: _strip_none(v) for k, v in value.items() if v is not None}
if isinstance(value, list):
return [_strip_none(v) for v in value]
return value
def _render(body: Mapping[str, Any], fmt: str | None) -> Response:
envelope: dict[str, Any] = {
"status": body["status"],
"version": SUBSONIC_API_VERSION,
"type": SERVER_TYPE,
"serverVersion": SERVER_VERSION,
"openSubsonic": True,
**{k: v for k, v in body.items() if k != "status"},
}
if _is_json(fmt):
payload = json.dumps({"subsonic-response": _strip_none(envelope)})
return Response(content=payload, media_type=_JSON_MEDIA_TYPE)
root = ET.Element("subsonic-response", {"xmlns": _XML_NS})
_build_xml(root, envelope)
xml = b'<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(root, encoding="utf-8")
return Response(content=xml, media_type=_XML_MEDIA_TYPE)
def subsonic_response(
payload: Mapping[str, Any] | None = None, *, fmt: str | None = None
) -> Response:
"""A successful ``status="ok"`` envelope wrapping ``payload``."""
body: dict[str, Any] = {"status": "ok"}
if payload:
body.update(payload)
return _render(body, fmt)
def subsonic_error(code: int, message: str, *, fmt: str | None = None) -> Response:
"""A ``status="failed"`` envelope carrying a Subsonic ``<error>``."""
body = {"status": "failed", "error": {"code": code, "message": message}}
return _render(body, fmt)
+75
View File
@@ -0,0 +1,75 @@
"""Stable, reversible mapping between Subsonic opaque string ids and our UUIDs.
Subsonic ids are opaque strings; ours are UUIDs. We use a type-prefixed,
human-debuggable convention (``tr-<uuid>`` track, ``al-<uuid>`` album,
``ar-<uuid>`` artist, ``pl-<uuid>`` playlist). Cover-art ids reuse the entity's
own id (an album cover is ``al-<uuid>``, a track cover ``tr-<uuid>``). Centralize
encode/decode here so the convention lives in exactly one place.
"""
import uuid
from enum import StrEnum
from app.domain.errors import NotFoundError
class IdKind(StrEnum):
TRACK = "tr"
ALBUM = "al"
ARTIST = "ar"
PLAYLIST = "pl"
def encode(kind: IdKind, value: uuid.UUID) -> str:
return f"{kind.value}-{value}"
def encode_track(value: uuid.UUID) -> str:
return encode(IdKind.TRACK, value)
def encode_album(value: uuid.UUID) -> str:
return encode(IdKind.ALBUM, value)
def encode_artist(value: uuid.UUID) -> str:
return encode(IdKind.ARTIST, value)
def encode_playlist(value: uuid.UUID) -> str:
return encode(IdKind.PLAYLIST, value)
def parse(raw: str) -> tuple[IdKind, uuid.UUID]:
"""Decode any prefixed id into its kind + UUID. Raises ``NotFoundError`` on a
malformed id (an unknown id is, from the client's view, simply not found)."""
prefix, _, rest = raw.partition("-")
try:
kind = IdKind(prefix)
value = uuid.UUID(rest)
except ValueError as exc:
raise NotFoundError(f"Unknown id {raw!r}.") from exc
return kind, value
def _decode_as(raw: str, expected: IdKind) -> uuid.UUID:
kind, value = parse(raw)
if kind is not expected:
raise NotFoundError(f"Expected a {expected.name.lower()} id, got {raw!r}.")
return value
def decode_track(raw: str) -> uuid.UUID:
return _decode_as(raw, IdKind.TRACK)
def decode_album(raw: str) -> uuid.UUID:
return _decode_as(raw, IdKind.ALBUM)
def decode_artist(raw: str) -> uuid.UUID:
return _decode_as(raw, IdKind.ARTIST)
def decode_playlist(raw: str) -> uuid.UUID:
return _decode_as(raw, IdKind.PLAYLIST)
+69 -10
View File
@@ -1,19 +1,78 @@
"""Subsonic media endpoints: stream, download, cover art."""
"""Subsonic media endpoints: stream, download, cover art.
from typing import Any
``stream`` and ``download`` reuse :class:`StreamingService` (honouring HTTP
Range) — they return raw bytes, not the Subsonic envelope. Transcoding params
(``maxBitRate``/``format``) are accepted but ignored; the original file is served
(no in-request ffmpeg — CLAUDE.md). ``getCoverArt`` returns a placeholder until
the cover pipeline lands (the ``/api/v1`` cover endpoints are still stubs).
"""
from fastapi import APIRouter
import base64
from typing import Annotated
from fastapi import APIRouter, Header, Query
from fastapi.responses import Response, StreamingResponse
from app.api.deps import StreamingServiceDep, SubsonicUser, TrackRepoDep
from app.api.rest.ids import decode_track, parse
from app.domain.errors import NotFoundError
router = APIRouter()
@router.get("/stream")
async def stream() -> Any: ...
# 1x1 transparent PNG - a graceful placeholder until cover art is wired up.
_PLACEHOLDER_PNG = base64.b64decode(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNk+M8AAAMBAQDJ/pLvAAAAAElFTkSuQmCC"
)
@router.get("/download")
async def download() -> Any: ...
@router.api_route("/stream", methods=["GET", "POST"])
@router.api_route("/stream.view", methods=["GET", "POST"])
async def stream(
_user: SubsonicUser,
service: StreamingServiceDep,
id: Annotated[str, Query()],
range_header: Annotated[str | None, Header(alias="Range")] = None,
) -> StreamingResponse:
result = await service.open_stream(decode_track(id), range_header)
headers = {"Accept-Ranges": "bytes", "Content-Length": str(result.content_length)}
status_code = 200
if result.is_partial:
headers["Content-Range"] = f"bytes {result.start}-{result.end}/{result.total_size}"
status_code = 206
return StreamingResponse(
result.stream, status_code=status_code, headers=headers, media_type=result.content_type
)
@router.get("/getCoverArt")
async def get_cover_art() -> Any: ...
@router.api_route("/download", methods=["GET", "POST"])
@router.api_route("/download.view", methods=["GET", "POST"])
async def download(
_user: SubsonicUser,
service: StreamingServiceDep,
track_repo: TrackRepoDep,
id: Annotated[str, Query()],
) -> StreamingResponse:
track_id = decode_track(id)
track = await track_repo.get_by_id(track_id)
if track is None:
raise NotFoundError("Song not found.")
result = await service.open_stream(track_id, None)
filename = f"{track.title}.{track.file_format}"
headers = {
"Content-Length": str(result.content_length),
"Content-Disposition": f'attachment; filename="{filename}"',
}
return StreamingResponse(result.stream, headers=headers, media_type=result.content_type)
@router.api_route("/getCoverArt", methods=["GET", "POST"])
@router.api_route("/getCoverArt.view", methods=["GET", "POST"])
async def get_cover_art(
_user: SubsonicUser,
id: Annotated[str, Query()],
size: Annotated[int | None, Query()] = None,
) -> Response:
# Validate the id shape so clients get a clean error on garbage, then serve a
# placeholder. TODO: stream real covers once the cover pipeline exists.
parse(id)
return Response(content=_PLACEHOLDER_PNG, media_type="image/png")
+168 -13
View File
@@ -1,27 +1,182 @@
"""Subsonic playlist endpoints."""
"""Subsonic playlist endpoints — adapters over the playlist repository.
from typing import Any
Playlists are private to their owner (no public-playlist concept yet), so every
read/write is scoped to the authenticated user. ``createPlaylist`` doubles as a
full replace when given a ``playlistId`` (Subsonic overloads it that way).
"""
from fastapi import APIRouter
from typing import Annotated, Any
from fastapi import APIRouter, Query, Response
from app.api.deps import (
AlbumRepoDep,
ArtistRepoDep,
PlaylistRepoDep,
SubsonicFormat,
SubsonicUser,
)
from app.api.rest.envelope import subsonic_response
from app.api.rest.ids import decode_playlist, decode_track, encode_playlist
from app.api.rest.serializers import iso, song_dict
from app.domain.entities import Playlist, User
from app.domain.errors import NotFoundError, PermissionDeniedError
router = APIRouter()
@router.get("/getPlaylists")
async def get_playlists() -> Any: ...
def _playlist_dict(playlist: Playlist, owner: str, *, song_count: int) -> dict[str, Any]:
return {
"id": encode_playlist(playlist.id),
"name": playlist.name,
"comment": playlist.description,
"owner": owner,
"public": False,
"songCount": song_count,
"created": iso(playlist.created_at),
"changed": iso(playlist.updated_at),
}
@router.get("/getPlaylist")
async def get_playlist() -> Any: ...
async def _owned_playlist(
playlist_id_raw: str, playlist_repo: PlaylistRepoDep, user: User
) -> Playlist:
playlist = await playlist_repo.get_by_id(decode_playlist(playlist_id_raw))
if playlist is None:
raise NotFoundError("Playlist not found.")
if playlist.owner_id != user.id:
raise PermissionDeniedError("You don't own this playlist.")
return playlist
@router.get("/createPlaylist")
async def create_playlist() -> Any: ...
async def _playlist_songs(
playlist_id: str,
playlist_repo: PlaylistRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
) -> list[dict[str, Any]]:
tracks = await playlist_repo.get_tracks(decode_playlist(playlist_id), limit=10_000, offset=0)
artist_map = {a.id: a for a in await artist_repo.get_many(list({t.artist_id for t in tracks}))}
album_map = {
a.id: a
for a in await album_repo.get_many(
list({t.album_id for t in tracks if t.album_id is not None})
)
}
return [
song_dict(
t,
artist_map.get(t.artist_id),
album_map.get(t.album_id) if t.album_id is not None else None,
)
for t in tracks
]
@router.get("/updatePlaylist")
async def update_playlist() -> Any: ...
@router.api_route("/getPlaylists", methods=["GET", "POST"])
@router.api_route("/getPlaylists.view", methods=["GET", "POST"])
async def get_playlists(
user: SubsonicUser, fmt: SubsonicFormat, playlist_repo: PlaylistRepoDep
) -> Response:
playlists = await playlist_repo.list(owner_id=user.id, limit=500, offset=0)
counts = await playlist_repo.track_count_many([p.id for p in playlists])
items = [_playlist_dict(p, user.username, song_count=counts.get(p.id, 0)) for p in playlists]
return subsonic_response({"playlists": {"playlist": items}}, fmt=fmt)
@router.get("/deletePlaylist")
async def delete_playlist() -> Any: ...
@router.api_route("/getPlaylist", methods=["GET", "POST"])
@router.api_route("/getPlaylist.view", methods=["GET", "POST"])
async def get_playlist(
user: SubsonicUser,
fmt: SubsonicFormat,
playlist_repo: PlaylistRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
id: Annotated[str, Query()],
) -> Response:
playlist = await _owned_playlist(id, playlist_repo, user)
songs = await _playlist_songs(id, playlist_repo, artist_repo, album_repo)
payload = {**_playlist_dict(playlist, user.username, song_count=len(songs)), "entry": songs}
return subsonic_response({"playlist": payload}, fmt=fmt)
@router.api_route("/createPlaylist", methods=["GET", "POST"])
@router.api_route("/createPlaylist.view", methods=["GET", "POST"])
async def create_playlist(
user: SubsonicUser,
fmt: SubsonicFormat,
playlist_repo: PlaylistRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
name: Annotated[str | None, Query()] = None,
playlistId: Annotated[str | None, Query()] = None,
songId: Annotated[list[str] | None, Query()] = None,
) -> Response:
song_ids = [decode_track(s) for s in (songId or [])]
if playlistId is not None:
# Overloaded form: replace the existing playlist's tracks (and name).
playlist = await _owned_playlist(playlistId, playlist_repo, user)
if name is not None:
playlist = await playlist_repo.update(playlist.id, name=name, description=None)
existing = await playlist_repo.get_tracks(playlist.id, limit=10_000, offset=0)
for t in existing:
await playlist_repo.remove_track(playlist.id, t.id)
else:
playlist = await playlist_repo.add(
name=name or "Untitled", description=None, owner_id=user.id
)
for position, track_id in enumerate(song_ids, start=1):
await playlist_repo.add_track(playlist.id, track_id, position=float(position))
encoded = encode_playlist(playlist.id)
songs = await _playlist_songs(encoded, playlist_repo, artist_repo, album_repo)
payload = {**_playlist_dict(playlist, user.username, song_count=len(songs)), "entry": songs}
return subsonic_response({"playlist": payload}, fmt=fmt)
@router.api_route("/updatePlaylist", methods=["GET", "POST"])
@router.api_route("/updatePlaylist.view", methods=["GET", "POST"])
async def update_playlist(
user: SubsonicUser,
fmt: SubsonicFormat,
playlist_repo: PlaylistRepoDep,
playlistId: Annotated[str, Query()],
name: Annotated[str | None, Query()] = None,
comment: Annotated[str | None, Query()] = None,
songIdToAdd: Annotated[list[str] | None, Query()] = None,
songIndexToRemove: Annotated[list[int] | None, Query()] = None,
) -> Response:
playlist = await _owned_playlist(playlistId, playlist_repo, user)
if name is not None or comment is not None:
playlist = await playlist_repo.update(playlist.id, name=name, description=comment)
# Removals are by index into the current ordered track list — resolve first.
if songIndexToRemove:
current = await playlist_repo.get_tracks(playlist.id, limit=10_000, offset=0)
for index in sorted(set(songIndexToRemove)):
if 0 <= index < len(current):
await playlist_repo.remove_track(playlist.id, current[index].id)
if songIdToAdd:
position = await playlist_repo.max_position(playlist.id)
for raw in songIdToAdd:
position += 1.0
await playlist_repo.add_track(playlist.id, decode_track(raw), position=position)
return subsonic_response(fmt=fmt)
@router.api_route("/deletePlaylist", methods=["GET", "POST"])
@router.api_route("/deletePlaylist.view", methods=["GET", "POST"])
async def delete_playlist(
user: SubsonicUser,
fmt: SubsonicFormat,
playlist_repo: PlaylistRepoDep,
id: Annotated[str, Query()],
) -> Response:
playlist = await _owned_playlist(id, playlist_repo, user)
await playlist_repo.delete(playlist.id)
return subsonic_response(fmt=fmt)
+80 -5
View File
@@ -1,11 +1,86 @@
"""Subsonic search endpoints."""
"""Subsonic search endpoints — search3 over the library repositories.
from typing import Any
Mirrors the native ``/api/v1/search/library`` fan-out (tracks/albums/artists),
reshaped into the Subsonic ``searchResult3`` element. An empty query returns
results so clients can use search3 to browse.
"""
from fastapi import APIRouter
from typing import Annotated, Any
from fastapi import APIRouter, Query, Response
from app.api.deps import AlbumRepoDep, ArtistRepoDep, SubsonicFormat, SubsonicUser, TrackRepoDep
from app.api.rest.envelope import subsonic_response
from app.api.rest.serializers import album_dict, artist_dict, song_dict
router = APIRouter()
@router.get("/search3")
async def search3() -> Any: ...
@router.api_route("/search3", methods=["GET", "POST"])
@router.api_route("/search3.view", methods=["GET", "POST"])
async def search3(
_user: SubsonicUser,
fmt: SubsonicFormat,
track_repo: TrackRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
query: Annotated[str, Query()] = "",
artistCount: Annotated[int, Query(ge=0, le=500)] = 20,
artistOffset: Annotated[int, Query(ge=0)] = 0,
albumCount: Annotated[int, Query(ge=0, le=500)] = 20,
albumOffset: Annotated[int, Query(ge=0)] = 0,
songCount: Annotated[int, Query(ge=0, le=500)] = 20,
songOffset: Annotated[int, Query(ge=0)] = 0,
) -> Response:
# Subsonic sends "" (and some clients '""') to mean "everything".
q: str | None = query.strip().strip('"') or None
artists_out: list[dict[str, Any]] = []
if artistCount:
artists = await artist_repo.list(q=q, limit=artistCount, offset=artistOffset)
for a in artists:
album_count = await artist_repo.album_count(a.id)
artists_out.append(artist_dict(a, album_count=album_count))
albums_out: list[dict[str, Any]] = []
if albumCount:
albums = await album_repo.list(artist_id=None, q=q, limit=albumCount, offset=albumOffset)
album_artist_ids = list({a.artist_id for a in albums})
album_artist_map = {a.id: a for a in await artist_repo.get_many(album_artist_ids)}
counts = await album_repo.track_count_many([a.id for a in albums])
albums_out = [
album_dict(a, album_artist_map.get(a.artist_id), song_count=counts.get(a.id, 0))
for a in albums
]
songs_out: list[dict[str, Any]] = []
if songCount:
tracks = await track_repo.list(
artist_id=None,
album_id=None,
q=q,
sort_by="title",
order="asc",
limit=songCount,
offset=songOffset,
)
song_artist_map = {
a.id: a for a in await artist_repo.get_many(list({t.artist_id for t in tracks}))
}
song_album_map = {
a.id: a
for a in await album_repo.get_many(
list({t.album_id for t in tracks if t.album_id is not None})
)
}
songs_out = [
song_dict(
t,
song_artist_map.get(t.artist_id),
song_album_map.get(t.album_id) if t.album_id is not None else None,
)
for t in tracks
]
payload = {"searchResult3": {"artist": artists_out, "album": albums_out, "song": songs_out}}
return subsonic_response(payload, fmt=fmt)
+92
View File
@@ -0,0 +1,92 @@
"""Entity → Subsonic child-dict mappers (presentation only).
Pure functions turning domain entities into the attribute dicts the envelope
serializer renders as ``<artist>`` / ``<album>`` / ``<song>`` elements (or their
JSON equivalents). No business logic — they only reshape and rename.
"""
import datetime as dt
from typing import Any
from app.api.rest.ids import encode_album, encode_artist, encode_track
from app.domain.entities import Album, Artist, Track
# Suffix → MIME, for the ``contentType``/``suffix`` song attributes. A
# presentation detail (mirrors StreamingService's content-type negotiation).
_CONTENT_TYPE: dict[str, str] = {
"mp3": "audio/mpeg",
"flac": "audio/flac",
"m4a": "audio/mp4",
"aac": "audio/aac",
"ogg": "audio/ogg",
"opus": "audio/ogg",
"wav": "audio/wav",
"aiff": "audio/aiff",
"aif": "audio/aiff",
}
def iso(value: dt.datetime) -> str:
return value.astimezone(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.000Z")
def content_type_for(file_format: str) -> str:
return _CONTENT_TYPE.get(file_format.lower(), "application/octet-stream")
def artist_dict(artist: Artist, *, album_count: int) -> dict[str, Any]:
return {
"id": encode_artist(artist.id),
"name": artist.name,
"albumCount": album_count,
"coverArt": encode_artist(artist.id),
}
def album_dict(
album: Album,
artist: Artist | None,
*,
song_count: int,
duration: int | None = None,
) -> dict[str, Any]:
return {
"id": encode_album(album.id),
"name": album.title,
"title": album.title,
"artist": artist.name if artist is not None else None,
"artistId": encode_artist(album.artist_id),
"coverArt": encode_album(album.id),
"songCount": song_count,
"duration": duration,
"created": iso(album.created_at),
"year": album.year,
}
def song_dict(
track: Track,
artist: Artist | None,
album: Album | None,
) -> dict[str, Any]:
cover = encode_album(track.album_id) if track.album_id is not None else encode_track(track.id)
return {
"id": encode_track(track.id),
"parent": encode_album(track.album_id) if track.album_id is not None else None,
"isDir": False,
"title": track.title,
"album": album.title if album is not None else None,
"artist": artist.name if artist is not None else None,
"albumId": encode_album(track.album_id) if track.album_id is not None else None,
"artistId": encode_artist(track.artist_id),
"coverArt": cover,
"size": track.file_size,
"contentType": content_type_for(track.file_format),
"suffix": track.file_format,
"duration": track.duration_seconds,
"year": track.year,
"genre": track.genre,
"created": iso(track.created_at),
"type": "music",
"isVideo": False,
}
+13 -6
View File
@@ -1,15 +1,22 @@
"""Subsonic system endpoints: ping and license."""
from typing import Any
from fastapi import APIRouter, Response
from fastapi import APIRouter
from app.api.deps import SubsonicFormat, SubsonicUser
from app.api.rest.envelope import subsonic_response
router = APIRouter()
@router.get("/ping")
async def ping() -> Any: ...
@router.api_route("/ping", methods=["GET", "POST"])
@router.api_route("/ping.view", methods=["GET", "POST"])
async def ping(_user: SubsonicUser, fmt: SubsonicFormat) -> Response:
# Requiring auth makes ping a credential check — exactly how clients use it.
return subsonic_response(fmt=fmt)
@router.get("/getLicense")
async def get_license() -> Any: ...
@router.api_route("/getLicense", methods=["GET", "POST"])
@router.api_route("/getLicense.view", methods=["GET", "POST"])
async def get_license(_user: SubsonicUser, fmt: SubsonicFormat) -> Response:
# Self-hosted and free — the license is always valid.
return subsonic_response({"license": {"valid": True}}, fmt=fmt)
+13
View File
@@ -0,0 +1,13 @@
"""Schemas for Subsonic app-password self-service (native /api/v1 surface).
The Subsonic /rest layer itself returns its own XML/JSON envelope, not these
pydantic models — these only back the lifecycle endpoints that reveal/rotate the
recoverable app-password."""
from pydantic import BaseModel
class SubsonicPasswordResponse(BaseModel):
"""The plaintext Subsonic app-password, for pasting into a client."""
password: str
+10 -1
View File
@@ -9,7 +9,8 @@ from typing import Any
from fastapi import APIRouter, Query, status
from app.api.deps import SuperUser, UserServiceDep
from app.api.deps import SubsonicAuthServiceDep, SuperUser, UserServiceDep
from app.api.schemas.subsonic import SubsonicPasswordResponse
from app.api.schemas.user import (
CreateUserRequest,
ResetPasswordRequest,
@@ -81,6 +82,14 @@ async def deactivate_user(
return UserResponse.from_entity(await users.deactivate(user_id))
@router.post("/users/{user_id}/subsonic-password", response_model=SubsonicPasswordResponse)
async def rotate_user_subsonic_password(
user_id: uuid.UUID, _admin: SuperUser, subsonic: SubsonicAuthServiceDep
) -> SubsonicPasswordResponse:
"""Rotate any user's Subsonic app-password and return the new plaintext."""
return SubsonicPasswordResponse(password=await subsonic.rotate(user_id))
@router.get("/services")
async def list_services(_admin: SuperUser) -> Any: ...
+21 -1
View File
@@ -2,7 +2,8 @@
from fastapi import APIRouter, status
from app.api.deps import CurrentUser, UserServiceDep
from app.api.deps import CurrentUser, SubsonicAuthServiceDep, UserServiceDep
from app.api.schemas.subsonic import SubsonicPasswordResponse
from app.api.schemas.user import ChangePasswordRequest
router = APIRouter(prefix="/users", tags=["users"])
@@ -17,3 +18,22 @@ async def change_my_password(
current_password=body.current_password,
new_password=body.new_password,
)
@router.get("/me/subsonic-password", response_model=SubsonicPasswordResponse)
async def reveal_my_subsonic_password(
user: CurrentUser, subsonic: SubsonicAuthServiceDep
) -> SubsonicPasswordResponse:
"""Reveal the caller's Subsonic app-password for copying into a client.
It's recoverable, so it can be read on demand; one is generated lazily on
first access. Paste it (with the username) into Symfonium/DSub."""
return SubsonicPasswordResponse(password=await subsonic.reveal(user.id))
@router.post("/me/subsonic-password", response_model=SubsonicPasswordResponse)
async def rotate_my_subsonic_password(
user: CurrentUser, subsonic: SubsonicAuthServiceDep
) -> SubsonicPasswordResponse:
"""Rotate the caller's Subsonic app-password (invalidates the previous one)."""
return SubsonicPasswordResponse(password=await subsonic.rotate(user.id))
+100
View File
@@ -0,0 +1,100 @@
"""SubsonicAuthService — app-password lifecycle + Subsonic auth verification.
The Subsonic protocol authenticates with either ``t=md5(password+salt)`` (+``s``)
or the legacy ``p=`` (plaintext or ``enc:<hex>``). Both need a *recoverable*
secret server-side, so Subsonic clients authenticate against a dedicated,
high-entropy app-password — never the argon2 login password. That app-password
is encrypted at rest (:class:`~app.domain.ports.SubsonicCipher`) and decrypted
only here, to verify a request or to reveal it for copying into a client.
This is an adapter over the existing user store; it adds no business state of its
own beyond the app-password column (CLAUDE.md: Subsonic is an adapter, not a
reimplementation).
"""
import hashlib
import hmac
import uuid
from app.core.security import generate_subsonic_password
from app.domain.entities import User
from app.domain.errors import AuthenticationError, NotFoundError, ValidationError
from app.domain.ports import SubsonicCipher, UserRepository
def _md5_hex(value: str) -> str:
return hashlib.md5(value.encode("utf-8"), usedforsecurity=False).hexdigest()
def _decode_legacy_password(p: str) -> str:
"""Decode a Subsonic ``p`` param: ``enc:<hex>`` (hex-encoded) or plaintext."""
if p.startswith("enc:"):
try:
return bytes.fromhex(p[4:]).decode("utf-8")
except ValueError as exc:
raise AuthenticationError("Wrong username or password.") from exc
return p
class SubsonicAuthService:
def __init__(self, *, users: UserRepository, cipher: SubsonicCipher) -> None:
self._users = users
self._cipher = cipher
async def authenticate(
self,
*,
username: str | None,
token: str | None,
salt: str | None,
password: str | None,
) -> User:
"""Resolve Subsonic query auth params to a domain :class:`User`.
Raises :class:`ValidationError` (Subsonic code 10) for missing params and
:class:`AuthenticationError` (code 40) for any credential mismatch — an
unknown user is reported identically to a wrong password (no enumeration).
"""
if not username:
raise ValidationError("Required parameter 'u' is missing.")
if not ((token and salt) or password):
raise ValidationError("Required authentication parameter is missing.")
creds = await self._users.get_subsonic_credentials_by_username(username)
if creds is None or not creds.user.is_active or creds.password_enc is None:
raise AuthenticationError("Wrong username or password.")
app_password = self._cipher.decrypt(creds.password_enc)
if token and salt:
expected = _md5_hex(app_password + salt)
if not hmac.compare_digest(expected, token.lower()):
raise AuthenticationError("Wrong username or password.")
else:
assert password is not None # guaranteed by the missing-param check above
supplied = _decode_legacy_password(password)
if not hmac.compare_digest(supplied, app_password):
raise AuthenticationError("Wrong username or password.")
return creds.user
async def rotate(self, user_id: uuid.UUID) -> str:
"""Generate a fresh app-password, store it encrypted, return the plaintext."""
await self._require_user(user_id)
password = generate_subsonic_password()
await self._users.set_subsonic_password_enc(user_id, self._cipher.encrypt(password))
return password
async def reveal(self, user_id: uuid.UUID) -> str:
"""Return the current app-password, generating one on first access."""
await self._require_user(user_id)
enc = await self._users.get_subsonic_password_enc(user_id)
if enc is None:
return await self.rotate(user_id)
return self._cipher.decrypt(enc)
async def _require_user(self, user_id: uuid.UUID) -> User:
user = await self._users.get_by_id(user_id)
if user is None:
raise NotFoundError("User not found.")
return user
+6
View File
@@ -45,6 +45,12 @@ class Settings(BaseSettings):
access_token_ttl_seconds: int = 60 * 15 # 15 min
refresh_token_ttl_seconds: int = 60 * 60 * 24 * 30 # 30 days (offline-first)
# -- subsonic ---------------------------------------------------------
# Symmetric key (any string) used to encrypt each user's recoverable
# Subsonic app-password at rest. A Fernet key is derived from it; rotating
# this value renders stored app-passwords undecryptable (rotate them too).
subsonic_secret_key: SecretStr = SecretStr("change-me-subsonic-key")
# -- media / storage --------------------------------------------------
media_path: Path = Path("/data/media")
transcode_cache_path: Path = Path("/data/transcode-cache")
+38
View File
@@ -6,16 +6,54 @@ to this module (CLAUDE.md: security is a cross-cutting concern in ``core``).
Higher layers depend only on the Protocols, never on pwdlib/pyjwt directly.
"""
import base64
import datetime as dt
import hashlib
import secrets
import uuid
import jwt
from cryptography.fernet import Fernet, InvalidToken
from pwdlib import PasswordHash
from app.core.config import Settings
from app.domain.errors import AuthenticationError
from app.domain.tokens import IssuedToken, TokenClaims, TokenType
# Length (in bytes of entropy) of a generated Subsonic app-password. 18 bytes of
# url-safe base64 → 24 characters, well above the Subsonic auth threat model.
_SUBSONIC_PASSWORD_ENTROPY_BYTES = 18
def generate_subsonic_password() -> str:
"""A fresh, high-entropy Subsonic app-password (url-safe, ~24 chars)."""
return secrets.token_urlsafe(_SUBSONIC_PASSWORD_ENTROPY_BYTES)
class SubsonicPasswordCipher:
"""Symmetric encrypt/decrypt for the recoverable Subsonic app-password.
Subsonic auth (``t=md5(password+salt)`` and legacy ``p=``) needs the plaintext
password server-side, so — unlike the argon2-hashed login password — the
app-password is stored *encrypted*, not hashed. A Fernet key (AES-128-CBC +
HMAC) is derived from the configured secret; the plaintext key never touches
the DB. Implements :class:`app.domain.ports.SubsonicCipher`.
"""
def __init__(self, secret_key: str) -> None:
digest = hashlib.sha256(secret_key.encode("utf-8")).digest()
self._fernet = Fernet(base64.urlsafe_b64encode(digest))
def encrypt(self, plaintext: str) -> str:
return self._fernet.encrypt(plaintext.encode("utf-8")).decode("ascii")
def decrypt(self, token: str) -> str:
try:
return self._fernet.decrypt(token.encode("ascii")).decode("utf-8")
except InvalidToken as exc:
# Wrong/rotated secret key, or corrupted ciphertext.
raise AuthenticationError("Stored Subsonic password could not be decrypted.") from exc
class Argon2PasswordHasher:
"""argon2id hasher with sensible defaults from pwdlib."""
+2 -1
View File
@@ -6,7 +6,7 @@ from app.domain.entities.like import Like
from app.domain.entities.playlist import Playlist
from app.domain.entities.storage import ObjectStat
from app.domain.entities.track import Artist, Track
from app.domain.entities.user import Credentials, User
from app.domain.entities.user import Credentials, SubsonicCredentials, User
__all__ = [
"Album",
@@ -16,6 +16,7 @@ __all__ = [
"ObjectStat",
"PlayHistoryEntry",
"Playlist",
"SubsonicCredentials",
"Track",
"User",
]
+11
View File
@@ -31,3 +31,14 @@ class Credentials:
user: User
password_hash: str
@dataclass(frozen=True, slots=True)
class SubsonicCredentials:
"""A user paired with their *encrypted* Subsonic app-password.
``password_enc`` is ``None`` until the user generates one. Stays inside the
application layer; the plaintext is only recovered for auth verification."""
user: User
password_enc: str | None
+25 -1
View File
@@ -19,6 +19,7 @@ from app.domain.entities import (
ObjectStat,
PlayHistoryEntry,
Playlist,
SubsonicCredentials,
User,
)
from app.domain.entities.track import Artist, Track
@@ -34,6 +35,19 @@ class UserRepository(Protocol):
async def set_superuser(self, user_id: uuid.UUID, is_superuser: bool) -> User: ...
async def set_active(self, user_id: uuid.UUID, is_active: bool) -> User: ...
async def count(self) -> int: ...
# -- subsonic app-password (recoverable, encrypted at rest) ----------
async def get_subsonic_credentials_by_username(
self, username: str
) -> SubsonicCredentials | None: ...
async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None: ...
async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None: ...
class SubsonicCipher(Protocol):
"""Symmetric encrypt/decrypt for the recoverable Subsonic app-password."""
def encrypt(self, plaintext: str) -> str: ...
def decrypt(self, token: str) -> str: ...
class RefreshTokenRepository(Protocol):
@@ -109,6 +123,9 @@ class TrackRepository(Protocol):
added_by: uuid.UUID | None,
) -> Track: ...
async def delete(self, track_id: uuid.UUID) -> None: ...
# genres must come before ``list`` — the method named ``list`` shadows the
# builtin in later annotations (same pattern as AlbumRepository below).
async def genres(self) -> list[tuple[str, int]]: ...
async def list(
self,
*,
@@ -145,7 +162,14 @@ class AlbumRepository(Protocol):
async def track_count_many(self, album_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]: ...
# list must come after any method using list[...] in its signature (name shadowing)
async def list(
self, *, artist_id: uuid.UUID | None, q: str | None, limit: int, offset: int
self,
*,
artist_id: uuid.UUID | None,
q: str | None,
limit: int,
offset: int,
sort_by: str = "title",
order: str = "asc",
) -> list[Album]: ...
+3
View File
@@ -18,6 +18,9 @@ class UserModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
# Admin is a single flag in Phase 1 — no role system (plan §3.5).
is_superuser: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
# Recoverable Subsonic app-password, Fernet-encrypted at rest. NULL until the
# user generates one. Never the argon2 login password — see core.security.
subsonic_password_enc: Mapped[str | None] = mapped_column(String(255), nullable=True)
class RefreshTokenModel(UUIDPrimaryKeyMixin, Base):
@@ -76,12 +76,20 @@ class SqlAlchemyAlbumRepository:
q: str | None,
limit: int,
offset: int,
sort_by: str = "title",
order: str = "asc",
) -> list[Album]:
stmt = select(AlbumModel)
if artist_id is not None:
stmt = stmt.where(AlbumModel.artist_id == artist_id)
if q:
stmt = stmt.where(AlbumModel.title.ilike(f"%{q}%"))
stmt = stmt.order_by(AlbumModel.title).limit(limit).offset(offset)
if order == "random":
stmt = stmt.order_by(func.random())
else:
col = AlbumModel.created_at if sort_by == "created" else AlbumModel.title
stmt = stmt.order_by(col.desc() if order == "desc" else col.asc())
stmt = stmt.limit(limit).offset(offset)
rows = (await self._session.execute(stmt)).scalars().all()
return [_to_entity(r) for r in rows]
@@ -87,6 +87,21 @@ class SqlAlchemyTrackRepository:
await self._session.delete(row)
await self._session.flush()
async def genres(self) -> list[tuple[str, int]]:
"""Distinct non-null genres with their song counts, most common first.
Defined before ``list`` — the method named ``list`` shadows the builtin
in later annotations within the class body."""
rows = (
await self._session.execute(
select(TrackModel.genre, func.count(TrackModel.id).label("cnt"))
.where(TrackModel.genre.is_not(None))
.group_by(TrackModel.genre)
.order_by(func.count(TrackModel.id).desc())
)
).all()
return [(row.genre, row.cnt) for row in rows]
async def list(
self,
*,
@@ -10,7 +10,7 @@ import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities import Credentials, User
from app.domain.entities import Credentials, SubsonicCredentials, User
from app.domain.errors import NotFoundError
from app.infrastructure.db.models import UserModel
@@ -91,3 +91,22 @@ class SqlAlchemyUserRepository:
return (
await self._session.execute(select(func.count()).select_from(UserModel))
).scalar_one()
async def get_subsonic_credentials_by_username(
self, username: str
) -> SubsonicCredentials | None:
row = (
await self._session.execute(select(UserModel).where(UserModel.username == username))
).scalar_one_or_none()
if row is None:
return None
return SubsonicCredentials(user=_to_entity(row), password_enc=row.subsonic_password_enc)
async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None:
row = await self._get_row(user_id)
return row.subsonic_password_enc
async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None:
row = await self._get_row(user_id)
row.subsonic_password_enc = password_enc
await self._session.flush()
+7
View File
@@ -21,6 +21,8 @@ dependencies = [
# auth
"pyjwt>=2.10",
"pwdlib[argon2]>=0.2.1",
# symmetric encryption for the recoverable Subsonic app-password (Fernet)
"cryptography>=44.0",
# outbound http (ML client, MusicBrainz, AcoustID)
"httpx>=0.28",
# S3-compatible object storage
@@ -71,6 +73,11 @@ select = [
]
ignore = ["B008"] # FastAPI Depends() in defaults is idiomatic
[tool.ruff.lint.per-file-ignores]
# Subsonic query params are camelCase by spec (artistCount, songId, …); the
# handler arg names must match the wire names exactly.
"app/api/rest/*" = ["N803"]
[tool.mypy]
python_version = "3.14"
strict = true
+18 -1
View File
@@ -4,13 +4,14 @@ import datetime as dt
import uuid
from dataclasses import dataclass, replace
from app.domain.entities import Credentials, User
from app.domain.entities import Credentials, SubsonicCredentials, User
@dataclass
class _Stored:
user: User
password_hash: str
subsonic_password_enc: str | None = None
class InMemoryUserRepository:
@@ -61,6 +62,22 @@ class InMemoryUserRepository:
async def count(self) -> int:
return len(self._by_id)
async def get_subsonic_credentials_by_username(
self, username: str
) -> SubsonicCredentials | None:
for stored in self._by_id.values():
if stored.user.username == username:
return SubsonicCredentials(
user=stored.user, password_enc=stored.subsonic_password_enc
)
return None
async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None:
return self._by_id[user_id].subsonic_password_enc
async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None:
self._by_id[user_id].subsonic_password_enc = password_enc
@dataclass
class _Token:
+236
View File
@@ -0,0 +1,236 @@
"""Integration tests for the Subsonic /rest layer (happy path per endpoint group).
Requires a reachable Postgres; skips otherwise (mirrors test_upload_stream_api).
Drives the real ASGI app: seeds a user + a track, mints a Subsonic app-password
via the native API, then exercises /rest with real query-string auth.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
from xml.etree import ElementTree as ET
import pytest
from app.core.config import get_settings
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
global _db_reachable_cache
if _db_reachable_cache is not None:
return _db_reachable_cache
from sqlalchemy import text
try:
async with asyncio.timeout(3):
async with get_engine().connect() as conn:
await conn.execute(text("SELECT 1"))
_db_reachable_cache = True
except Exception:
_db_reachable_cache = False
return _db_reachable_cache
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
if not await _db_reachable():
pytest.skip("Postgres not reachable — integration test skipped.")
os.environ["MEDIA_PATH"] = str(tmp_path)
get_settings.cache_clear()
import app.infrastructure.storage.provider as _storage_provider
_storage_provider._storage = None
try:
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
from app.application.user_service import UserService
from app.core.security import Argon2PasswordHasher
async with session_scope() as session:
await UserService(
users=SqlAlchemyUserRepository(session),
refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
hasher=Argon2PasswordHasher(),
).create_user(username="testuser", password="testpass1", is_superuser=False)
from app.main import create_app
app = create_app()
async with LifespanManager(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await dispose_engine()
finally:
_storage_provider._storage = None
os.environ.pop("MEDIA_PATH", None)
get_settings.cache_clear()
async def _login(api: AsyncClient) -> str:
resp = await api.post(
"/api/v1/auth/login", json={"username": "testuser", "password": "testpass1"}
)
assert resp.status_code == 200
return str(resp.json()["access_token"])
async def _subsonic_password(api: AsyncClient, token: str) -> str:
resp = await api.get(
"/api/v1/users/me/subsonic-password", headers={"Authorization": f"Bearer {token}"}
)
assert resp.status_code == 200, resp.text
return str(resp.json()["password"])
async def _seed_track(api: AsyncClient, token: str) -> str:
resp = await api.post(
"/api/v1/upload",
files={"file": ("song.mp3", b"audio bytes for subsonic" * 20, "audio/mpeg")},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 200, resp.text
return str(resp.json()["track_id"])
def _auth_params(password: str) -> dict[str, str]:
# Legacy plaintext password auth (p=) keeps the test simple; t+s is covered
# by the auth-service unit tests.
return {"u": "testuser", "p": password, "c": "pytest", "v": "1.16.1", "f": "json"}
async def _setup(api: AsyncClient) -> tuple[dict[str, str], str]:
token = await _login(api)
password = await _subsonic_password(api, token)
track_id = await _seed_track(api, token)
return _auth_params(password), track_id
async def test_ping_ok(api: AsyncClient) -> None:
params, _ = await _setup(api)
resp = await api.get("/rest/ping", params=params)
assert resp.status_code == 200
assert resp.json()["subsonic-response"]["status"] == "ok"
async def test_ping_bad_credentials_returns_code_40(api: AsyncClient) -> None:
await _setup(api)
resp = await api.get(
"/rest/ping",
params={"u": "testuser", "p": "wrong", "c": "pytest", "v": "1.16.1", "f": "json"},
)
# Subsonic errors are HTTP 200 with the failure inside the envelope.
assert resp.status_code == 200
body = resp.json()["subsonic-response"]
assert body["status"] == "failed"
assert body["error"]["code"] == 40
async def test_ping_xml_default(api: AsyncClient) -> None:
params, _ = await _setup(api)
xml_params = {k: v for k, v in params.items() if k != "f"}
resp = await api.get("/rest/ping", params=xml_params)
assert resp.status_code == 200
assert resp.headers["content-type"].startswith("application/xml")
root = ET.fromstring(resp.content)
assert root.attrib["status"] == "ok"
async def test_get_artists(api: AsyncClient) -> None:
params, _ = await _setup(api)
resp = await api.get("/rest/getArtists", params=params)
body = resp.json()["subsonic-response"]
assert body["status"] == "ok"
assert "artists" in body
async def test_get_album_list2(api: AsyncClient) -> None:
params, _ = await _setup(api)
resp = await api.get("/rest/getAlbumList2", params={**params, "type": "newest"})
body = resp.json()["subsonic-response"]
assert body["status"] == "ok"
assert "albumList2" in body
async def test_search3_finds_song(api: AsyncClient) -> None:
params, track_id = await _setup(api)
resp = await api.get("/rest/search3", params={**params, "query": "song"})
result = resp.json()["subsonic-response"]["searchResult3"]
song_ids = [s["id"] for s in result.get("song", [])]
assert f"tr-{track_id}" in song_ids
async def test_get_song(api: AsyncClient) -> None:
params, track_id = await _setup(api)
resp = await api.get("/rest/getSong", params={**params, "id": f"tr-{track_id}"})
song = resp.json()["subsonic-response"]["song"]
assert song["id"] == f"tr-{track_id}"
async def test_stream_returns_audio(api: AsyncClient) -> None:
params, track_id = await _setup(api)
resp = await api.get("/rest/stream", params={**params, "id": f"tr-{track_id}"})
assert resp.status_code == 200
assert resp.headers["content-type"].startswith("audio/")
assert resp.content == b"audio bytes for subsonic" * 20
async def test_get_cover_art_placeholder(api: AsyncClient) -> None:
params, track_id = await _setup(api)
resp = await api.get("/rest/getCoverArt", params={**params, "id": f"tr-{track_id}"})
assert resp.status_code == 200
assert resp.headers["content-type"] == "image/png"
async def test_playlist_lifecycle(api: AsyncClient) -> None:
params, track_id = await _setup(api)
created = await api.get(
"/rest/createPlaylist", params={**params, "name": "Roadtrip", "songId": f"tr-{track_id}"}
)
playlist = created.json()["subsonic-response"]["playlist"]
assert playlist["name"] == "Roadtrip"
assert playlist["songCount"] == 1
playlist_id = playlist["id"]
listed = await api.get("/rest/getPlaylists", params=params)
names = [p["name"] for p in listed.json()["subsonic-response"]["playlists"]["playlist"]]
assert "Roadtrip" in names
deleted = await api.get("/rest/deletePlaylist", params={**params, "id": playlist_id})
assert deleted.json()["subsonic-response"]["status"] == "ok"
async def test_star_and_scrobble(api: AsyncClient) -> None:
params, track_id = await _setup(api)
star = await api.get("/rest/star", params={**params, "id": f"tr-{track_id}"})
assert star.json()["subsonic-response"]["status"] == "ok"
scrobble = await api.get(
"/rest/scrobble", params={**params, "id": f"tr-{track_id}", "submission": "true"}
)
assert scrobble.json()["subsonic-response"]["status"] == "ok"
# The like landed in the append-only log → it surfaces via the native API.
token = await _login(api)
likes = await api.get("/api/v1/likes", headers={"Authorization": f"Bearer {token}"})
assert any(item["id"] == track_id for item in likes.json()["items"])
+131
View File
@@ -0,0 +1,131 @@
"""Unit tests for SubsonicAuthService — verification + app-password lifecycle.
DB-free: uses the in-memory user repository and a real cipher.
"""
import hashlib
import pytest
from app.application.subsonic_auth_service import SubsonicAuthService
from app.core.security import SubsonicPasswordCipher
from app.domain.errors import AuthenticationError, ValidationError
from tests.fakes import InMemoryUserRepository
pytestmark = pytest.mark.asyncio
_KNOWN_PASSWORD = "s3cret-app-password"
def _md5(value: str) -> str:
return hashlib.md5(value.encode(), usedforsecurity=False).hexdigest()
async def _service_with_user(*, password: str | None = _KNOWN_PASSWORD, active: bool = True):
users = InMemoryUserRepository()
cipher = SubsonicPasswordCipher("test-key")
user = await users.add(username="alice", password_hash="x", is_superuser=False)
if not active:
await users.set_active(user.id, False)
if password is not None:
await users.set_subsonic_password_enc(user.id, cipher.encrypt(password))
service = SubsonicAuthService(users=users, cipher=cipher)
return service, user
async def test_authenticate_token_salt_success() -> None:
service, user = await _service_with_user()
salt = "abcdef"
token = _md5(_KNOWN_PASSWORD + salt)
result = await service.authenticate(username="alice", token=token, salt=salt, password=None)
assert result.id == user.id
async def test_authenticate_plain_password_success() -> None:
service, user = await _service_with_user()
result = await service.authenticate(
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
)
assert result.id == user.id
async def test_authenticate_enc_password_success() -> None:
service, user = await _service_with_user()
enc = "enc:" + _KNOWN_PASSWORD.encode().hex()
result = await service.authenticate(username="alice", token=None, salt=None, password=enc)
assert result.id == user.id
async def test_authenticate_wrong_token_fails() -> None:
service, _ = await _service_with_user()
with pytest.raises(AuthenticationError):
await service.authenticate(
username="alice", token=_md5("wrong" + "abc"), salt="abc", password=None
)
async def test_authenticate_wrong_password_fails() -> None:
service, _ = await _service_with_user()
with pytest.raises(AuthenticationError):
await service.authenticate(username="alice", token=None, salt=None, password="nope")
async def test_authenticate_unknown_user_fails() -> None:
service, _ = await _service_with_user()
with pytest.raises(AuthenticationError):
await service.authenticate(
username="ghost", token=None, salt=None, password=_KNOWN_PASSWORD
)
async def test_authenticate_inactive_user_fails() -> None:
service, _ = await _service_with_user(active=False)
with pytest.raises(AuthenticationError):
await service.authenticate(
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
)
async def test_authenticate_no_password_set_fails() -> None:
service, _ = await _service_with_user(password=None)
with pytest.raises(AuthenticationError):
await service.authenticate(
username="alice", token=None, salt=None, password=_KNOWN_PASSWORD
)
async def test_authenticate_missing_username_is_validation_error() -> None:
service, _ = await _service_with_user()
with pytest.raises(ValidationError):
await service.authenticate(username=None, token=None, salt=None, password=_KNOWN_PASSWORD)
async def test_authenticate_missing_credentials_is_validation_error() -> None:
service, _ = await _service_with_user()
with pytest.raises(ValidationError):
await service.authenticate(username="alice", token=None, salt=None, password=None)
async def test_rotate_then_authenticate() -> None:
users = InMemoryUserRepository()
cipher = SubsonicPasswordCipher("test-key")
user = await users.add(username="bob", password_hash="x", is_superuser=False)
service = SubsonicAuthService(users=users, cipher=cipher)
password = await service.rotate(user.id)
result = await service.authenticate(username="bob", token=None, salt=None, password=password)
assert result.id == user.id
async def test_reveal_generates_then_is_stable() -> None:
users = InMemoryUserRepository()
cipher = SubsonicPasswordCipher("test-key")
user = await users.add(username="cara", password_hash="x", is_superuser=False)
service = SubsonicAuthService(users=users, cipher=cipher)
first = await service.reveal(user.id)
second = await service.reveal(user.id)
assert first == second # lazily generated once, then stable
rotated = await service.rotate(user.id)
assert rotated != first
+72
View File
@@ -0,0 +1,72 @@
"""Unit tests for the Subsonic response envelope (XML + JSON shapes)."""
import json
from xml.etree import ElementTree as ET
from app.api.rest.envelope import (
SUBSONIC_API_VERSION,
subsonic_error,
subsonic_response,
)
def _xml_root(body: bytes) -> ET.Element:
return ET.fromstring(body)
def _local(tag: str) -> str:
return tag.rsplit("}", 1)[-1] # strip namespace
def test_ok_xml_shape() -> None:
resp = subsonic_response({"license": {"valid": True}}, fmt="xml")
assert resp.media_type.startswith("application/xml")
root = _xml_root(resp.body)
assert _local(root.tag) == "subsonic-response"
assert root.attrib["status"] == "ok"
assert root.attrib["version"] == SUBSONIC_API_VERSION
assert root.attrib["type"] == "mcma"
child = root[0]
assert _local(child.tag) == "license"
assert child.attrib["valid"] == "true"
def test_ok_json_shape() -> None:
resp = subsonic_response({"license": {"valid": True}}, fmt="json")
assert resp.media_type.startswith("application/json")
payload = json.loads(resp.body)["subsonic-response"]
assert payload["status"] == "ok"
assert payload["version"] == SUBSONIC_API_VERSION
assert payload["type"] == "mcma"
assert payload["license"] == {"valid": True}
def test_error_xml_shape() -> None:
resp = subsonic_error(40, "Wrong username or password.", fmt="xml")
root = _xml_root(resp.body)
assert root.attrib["status"] == "failed"
error = root[0]
assert _local(error.tag) == "error"
assert error.attrib["code"] == "40"
assert error.attrib["message"] == "Wrong username or password."
def test_error_json_shape() -> None:
resp = subsonic_error(70, "Not found.", fmt="json")
payload = json.loads(resp.body)["subsonic-response"]
assert payload["status"] == "failed"
assert payload["error"] == {"code": 70, "message": "Not found."}
def test_default_format_is_xml() -> None:
resp = subsonic_response(fmt=None)
assert resp.media_type.startswith("application/xml")
assert _xml_root(resp.body).attrib["status"] == "ok"
def test_list_renders_repeated_elements() -> None:
payload = {"genres": {"genre": [{"value": "Rock"}, {"value": "Jazz"}]}}
root = _xml_root(subsonic_response(payload, fmt="xml").body)
genres = root[0]
values = [g.text for g in genres]
assert values == ["Rock", "Jazz"]
+70
View File
@@ -0,0 +1,70 @@
"""Unit tests for Subsonic crypto + id helpers (no DB, no network)."""
import hashlib
import uuid
import pytest
from app.api.rest import ids
from app.api.rest.ids import IdKind
from app.core.security import SubsonicPasswordCipher, generate_subsonic_password
from app.domain.errors import AuthenticationError, NotFoundError
def test_generate_subsonic_password_is_long_and_unique() -> None:
a = generate_subsonic_password()
b = generate_subsonic_password()
assert a != b
assert len(a) >= 20
def test_cipher_roundtrip() -> None:
cipher = SubsonicPasswordCipher("a-secret-key")
plaintext = generate_subsonic_password()
token = cipher.encrypt(plaintext)
assert token != plaintext
assert cipher.decrypt(token) == plaintext
def test_cipher_token_then_md5_matches() -> None:
"""The decrypted app-password must reproduce a client's t=md5(password+salt)."""
cipher = SubsonicPasswordCipher("a-secret-key")
password = generate_subsonic_password()
enc = cipher.encrypt(password)
salt = "c19b2d"
decrypted = cipher.decrypt(enc)
expected = hashlib.md5((decrypted + salt).encode(), usedforsecurity=False).hexdigest()
client_token = hashlib.md5((password + salt).encode(), usedforsecurity=False).hexdigest()
assert expected == client_token
def test_cipher_wrong_key_fails() -> None:
token = SubsonicPasswordCipher("key-one").encrypt("hunter2")
with pytest.raises(AuthenticationError):
SubsonicPasswordCipher("key-two").decrypt(token)
def test_id_encode_decode_roundtrip() -> None:
value = uuid.uuid4()
assert ids.decode_track(ids.encode_track(value)) == value
assert ids.decode_album(ids.encode_album(value)) == value
assert ids.decode_artist(ids.encode_artist(value)) == value
assert ids.decode_playlist(ids.encode_playlist(value)) == value
def test_id_parse_returns_kind() -> None:
value = uuid.uuid4()
kind, parsed = ids.parse(ids.encode_album(value))
assert kind is IdKind.ALBUM
assert parsed == value
def test_id_wrong_prefix_rejected() -> None:
track = ids.encode_track(uuid.uuid4())
with pytest.raises(NotFoundError):
ids.decode_album(track)
def test_id_malformed_rejected() -> None:
with pytest.raises(NotFoundError):
ids.parse("not-a-real-id")
Generated
+55
View File
@@ -415,6 +415,59 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "cryptography"
version = "48.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "cffi", marker = "platform_python_implementation != 'PyPy'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9f/a9/db8f313fdcd85d767d4973515e1db101f9c71f95fced83233de224673757/cryptography-48.0.0.tar.gz", hash = "sha256:5c3932f4436d1cccb036cb0eaef46e6e2db91035166f1ad6505c3c9d5a635920", size = 832984, upload-time = "2026-05-04T22:59:38.133Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/df/3d/01f6dd9190170a5a241e0e98c2d04be3664a9e6f5b9b872cde63aff1c3dd/cryptography-48.0.0-cp311-abi3-macosx_10_9_universal2.whl", hash = "sha256:0c558d2cdffd8f4bbb30fc7134c74d2ca9a476f830bb053074498fbc86f41ed6", size = 8001587, upload-time = "2026-05-04T22:57:36.803Z" },
{ url = "https://files.pythonhosted.org/packages/b2/6e/e90527eef33f309beb811cf7c982c3aeffcce8e3edb178baa4ca3ae4a6fa/cryptography-48.0.0-cp311-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:f5333311663ea94f75dd408665686aaf426563556bb5283554a3539177e03b8c", size = 4690433, upload-time = "2026-05-04T22:57:40.373Z" },
{ url = "https://files.pythonhosted.org/packages/90/04/673510ed51ddff56575f306cf1617d80411ee76831ccd3097599140efdfe/cryptography-48.0.0-cp311-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7995ef305d7165c3f11ae07f2517e5a4f1d5c18da1376a0a9ed496336b69e5f3", size = 4710620, upload-time = "2026-05-04T22:57:42.935Z" },
{ url = "https://files.pythonhosted.org/packages/14/d5/e9c4ef932c8d800490c34d8bd589d64a31d5890e27ec9e9ad532be893294/cryptography-48.0.0-cp311-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:40ba1f85eaa6959837b1d51c9767e230e14612eea4ef110ee8854ada22da1bf5", size = 4696283, upload-time = "2026-05-04T22:57:45.294Z" },
{ url = "https://files.pythonhosted.org/packages/0c/29/174b9dfb60b12d59ecfc6cfa04bc88c21b42a54f01b8aae09bb6e51e4c7f/cryptography-48.0.0-cp311-abi3-manylinux_2_28_ppc64le.whl", hash = "sha256:369a6348999f94bbd53435c894377b20ab95f25a9065c283570e70150d8abc3c", size = 5296573, upload-time = "2026-05-04T22:57:47.933Z" },
{ url = "https://files.pythonhosted.org/packages/95/38/0d29a6fd7d0d1373f0c0c88a04ba20e359b257753ac497564cd660fc1d55/cryptography-48.0.0-cp311-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:a0e692c683f4df67815a2d258b324e66f4738bd7a96a218c826dce4f4bd05d8f", size = 4743677, upload-time = "2026-05-04T22:57:50.067Z" },
{ url = "https://files.pythonhosted.org/packages/30/be/eef653013d5c63b6a490529e0316f9ac14a37602965d4903efed1399f32b/cryptography-48.0.0-cp311-abi3-manylinux_2_31_armv7l.whl", hash = "sha256:18349bbc56f4743c8b12dc32e2bccb2cf83ee8b69a3bba74ef8ae857e26b3d25", size = 4330808, upload-time = "2026-05-04T22:57:52.301Z" },
{ url = "https://files.pythonhosted.org/packages/84/9e/500463e87abb7a0a0f9f256ec21123ecde0a7b5541a15e840ea54551fd81/cryptography-48.0.0-cp311-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:7e8eac43dfca5c4cccc6dad9a80504436fca53bb9bc3100a2386d730fbe6b602", size = 4695941, upload-time = "2026-05-04T22:57:54.603Z" },
{ url = "https://files.pythonhosted.org/packages/e3/dc/7303087450c2ec9e7fbb750e17c2abfbc658f23cbd0e54009509b7cc4091/cryptography-48.0.0-cp311-abi3-manylinux_2_34_ppc64le.whl", hash = "sha256:9ccdac7d40688ecb5a3b4a604b8a88c8002e3442d6c60aead1db2a89a041560c", size = 5252579, upload-time = "2026-05-04T22:57:57.207Z" },
{ url = "https://files.pythonhosted.org/packages/d0/c0/7101d3b7215edcdc90c45da544961fd8ed2d6448f77577460fa75a8443f7/cryptography-48.0.0-cp311-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:bd72e68b06bb1e96913f97dd4901119bc17f39d4586a5adf2d3e47bc2b9d58b5", size = 4743326, upload-time = "2026-05-04T22:57:59.535Z" },
{ url = "https://files.pythonhosted.org/packages/ac/d8/5b833bad13016f562ab9d063d68199a4bd121d18458e439515601d3357ec/cryptography-48.0.0-cp311-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:59baa2cb386c4f0b9905bd6eb4c2a79a69a128408fd31d32ca4d7102d4156321", size = 4826672, upload-time = "2026-05-04T22:58:01.996Z" },
{ url = "https://files.pythonhosted.org/packages/98/e1/7074eb8bf3c135558c73fc2bcf0f5633f912e6fb87e868a55c454080ef09/cryptography-48.0.0-cp311-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:9249e3cd978541d665967ac2cb2787fd6a62bddf1e75b3e347a594d7dacf4f74", size = 4972574, upload-time = "2026-05-04T22:58:03.968Z" },
{ url = "https://files.pythonhosted.org/packages/04/70/e5a1b41d325f797f39427aa44ef8baf0be500065ab6d8e10369d850d4a4f/cryptography-48.0.0-cp311-abi3-win32.whl", hash = "sha256:9c459db21422be75e2809370b829a87eb37f74cd785fc4aa9ea1e5f43b47cda4", size = 3294868, upload-time = "2026-05-04T22:58:06.467Z" },
{ url = "https://files.pythonhosted.org/packages/f4/ac/8ac51b4a5fc5932eb7ee5c517ba7dc8cd834f0048962b6b352f00f41ebf9/cryptography-48.0.0-cp311-abi3-win_amd64.whl", hash = "sha256:5b012212e08b8dd5edc78ef54da83dd9892fd9105323b3993eff6bea65dc21d7", size = 3817107, upload-time = "2026-05-04T22:58:08.845Z" },
{ url = "https://files.pythonhosted.org/packages/6b/84/70e3feea9feea87fd7cbe77efb2712ae1e3e6edf10749dc6e95f4e60e455/cryptography-48.0.0-cp314-cp314t-macosx_10_9_universal2.whl", hash = "sha256:3cb07a3ed6431663cd321ea8a000a1314c74211f823e4177fefa2255e057d1ec", size = 7986556, upload-time = "2026-05-04T22:58:11.172Z" },
{ url = "https://files.pythonhosted.org/packages/89/6e/18e07a618bb5442ba10cf4df16e99c071365528aa570dfcb8c02e25a303b/cryptography-48.0.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:8c7378637d7d88016fa6791c159f698b3d3eed28ebf844ac36b9dc04a14dae18", size = 4684776, upload-time = "2026-05-04T22:58:13.712Z" },
{ url = "https://files.pythonhosted.org/packages/be/6a/4ea3b4c6c6759794d5ee2103c304a5076dc4b19ae1f9fe47dba439e159e9/cryptography-48.0.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:cc90c0b39b2e3c65ef52c804b72e3c58f8a04ab2a1871272798e5f9572c17d20", size = 4698121, upload-time = "2026-05-04T22:58:16.448Z" },
{ url = "https://files.pythonhosted.org/packages/2f/59/6ff6ad6cae03bb887da2a5860b2c9805f8dac969ef01ce563336c49bd1d1/cryptography-48.0.0-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:76341972e1eff8b4bea859f09c0d3e64b96ce931b084f9b9b7db8ef364c30eff", size = 4690042, upload-time = "2026-05-04T22:58:18.544Z" },
{ url = "https://files.pythonhosted.org/packages/ca/b4/fc334ed8cfd705aca282fe4d8f5ae64a8e0f74932e9feecb344610cf6e4d/cryptography-48.0.0-cp314-cp314t-manylinux_2_28_ppc64le.whl", hash = "sha256:55b7718303bf06a5753dcdccf2f3945cf18ad7bffde41b61226e4db31ab89a9c", size = 5282526, upload-time = "2026-05-04T22:58:20.75Z" },
{ url = "https://files.pythonhosted.org/packages/11/08/9f8c5386cc4cd90d8255c7cdd0f5baf459a08502a09de30dc51f553d38dc/cryptography-48.0.0-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:a64697c641c7b1b2178e573cbc31c7c6684cd56883a478d75143dbb7118036db", size = 4733116, upload-time = "2026-05-04T22:58:23.627Z" },
{ url = "https://files.pythonhosted.org/packages/b8/77/99307d7574045699f8805aa500fa0fb83422d115b5400a064ddd306d7750/cryptography-48.0.0-cp314-cp314t-manylinux_2_31_armv7l.whl", hash = "sha256:561215ea3879cb1cbbf272867e2efda62476f240fb58c64de6b393ae19246741", size = 4316030, upload-time = "2026-05-04T22:58:25.581Z" },
{ url = "https://files.pythonhosted.org/packages/fd/36/a608b98337af3cb2aff4818e406649d30572b7031918b04c87d979495348/cryptography-48.0.0-cp314-cp314t-manylinux_2_34_aarch64.whl", hash = "sha256:ad64688338ed4bc1a6618076ba75fd7194a5f1797ac60b47afe926285adb3166", size = 4689640, upload-time = "2026-05-04T22:58:27.747Z" },
{ url = "https://files.pythonhosted.org/packages/dd/a6/825010a291b4438aecc1f568bc428189fc1175515223632477c07dc0a6df/cryptography-48.0.0-cp314-cp314t-manylinux_2_34_ppc64le.whl", hash = "sha256:906cbf0670286c6e0044156bc7d4af9cbb0ef6db9f73e52c3ec56ba6bdde5336", size = 5237657, upload-time = "2026-05-04T22:58:29.848Z" },
{ url = "https://files.pythonhosted.org/packages/b9/09/4e76a09b4caa29aad535ddc806f5d4c5d01885bd978bd984fbc6ca032cae/cryptography-48.0.0-cp314-cp314t-manylinux_2_34_x86_64.whl", hash = "sha256:ea8990436d914540a40ab24b6a77c0969695ed52f4a4874c5137ccf7045a7057", size = 4732362, upload-time = "2026-05-04T22:58:32.009Z" },
{ url = "https://files.pythonhosted.org/packages/18/78/444fa04a77d0cb95f417dda20d450e13c56ba8e5220fc892a1658f44f882/cryptography-48.0.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:c18684a7f0cc9a3cb60328f496b8e3372def7c5d2df39ac267878b05565aaaae", size = 4819580, upload-time = "2026-05-04T22:58:34.254Z" },
{ url = "https://files.pythonhosted.org/packages/38/85/ea67067c70a1fd4be2c63d35eeed82658023021affccc7b17705f8527dd2/cryptography-48.0.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:9be5aafa5736574f8f15f262adc81b2a9869e2cfe9014d52a44633905b40d52c", size = 4963283, upload-time = "2026-05-04T22:58:36.376Z" },
{ url = "https://files.pythonhosted.org/packages/75/54/cc6d0f3deac3e81c7f847e8a189a12b6cdd65059b43dad25d4316abd849a/cryptography-48.0.0-cp314-cp314t-win32.whl", hash = "sha256:c17dfe85494deaeddc5ce251aebd1d60bbe6afc8b62071bb0b469431a000124f", size = 3270954, upload-time = "2026-05-04T22:58:38.791Z" },
{ url = "https://files.pythonhosted.org/packages/49/67/cc947e288c0758a4e5473d1dcb743037ab7785541265a969240b8885441a/cryptography-48.0.0-cp314-cp314t-win_amd64.whl", hash = "sha256:27241b1dc9962e056062a8eef1991d02c3a24569c95975bd2322a8a52c6e5e12", size = 3797313, upload-time = "2026-05-04T22:58:40.746Z" },
{ url = "https://files.pythonhosted.org/packages/f2/63/61d4a4e1c6b6bab6ce1e213cd36a24c415d90e76d78c5eb8577c5541d2e8/cryptography-48.0.0-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:58d00498e8933e4a194f3076aee1b4a97dfec1a6da444535755822fe5d8b0b86", size = 7983482, upload-time = "2026-05-04T22:58:43.769Z" },
{ url = "https://files.pythonhosted.org/packages/d5/ac/f5b5995b87770c693e2596559ffafe195b4033a57f14a82268a2842953f3/cryptography-48.0.0-cp39-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:614d0949f4790582d2cc25553abd09dd723025f0c0e7c67376a1d77196743d6e", size = 4683266, upload-time = "2026-05-04T22:58:46.064Z" },
{ url = "https://files.pythonhosted.org/packages/ec/c6/8b14f67e18338fbc4adb76f66c001f5c3610b3e2d1837f268f47a347dbbb/cryptography-48.0.0-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7ce4bfae76319a532a2dc68f82cc32f5676ee792a983187dac07183690e5c66f", size = 4696228, upload-time = "2026-05-04T22:58:48.22Z" },
{ url = "https://files.pythonhosted.org/packages/ea/73/f808fbae9514bd91b47875b003f13e284c8c6bdfd904b7944e803937eec1/cryptography-48.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:2eb992bbd4661238c5a397594c83f5b4dc2bc5b848c365c8f991b6780efcc5c7", size = 4689097, upload-time = "2026-05-04T22:58:50.9Z" },
{ url = "https://files.pythonhosted.org/packages/93/01/d86632d7d28db8ae83221995752eeb6639ffb374c2d22955648cf8d52797/cryptography-48.0.0-cp39-abi3-manylinux_2_28_ppc64le.whl", hash = "sha256:22a5cb272895dce158b2cacdfdc3debd299019659f42947dbdac6f32d68fe832", size = 5283582, upload-time = "2026-05-04T22:58:53.017Z" },
{ url = "https://files.pythonhosted.org/packages/02/e1/50edc7a50334807cc4791fc4a0ce7468b4a1416d9138eab358bfc9a3d70b/cryptography-48.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:2b4d59804e8408e2fea7d1fbaf218e5ec984325221db76e6a241a9abd6cdd95c", size = 4730479, upload-time = "2026-05-04T22:58:55.611Z" },
{ url = "https://files.pythonhosted.org/packages/6f/af/99a582b1b1641ff5911ac559beb45097cf79efd4ead4657f578ef1af2d47/cryptography-48.0.0-cp39-abi3-manylinux_2_31_armv7l.whl", hash = "sha256:984a20b0f62a26f48a3396c72e4bc34c66e356d356bf370053066b3b6d54634a", size = 4326481, upload-time = "2026-05-04T22:58:57.607Z" },
{ url = "https://files.pythonhosted.org/packages/90/ee/89aa26a06ef0a7d7611788ffd571a7c50e368cc6a4d5eef8b4884e866edb/cryptography-48.0.0-cp39-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:5a5ed8fde7a1d09376ca0b40e68cd59c69fe23b1f9768bd5824f54681626032a", size = 4688713, upload-time = "2026-05-04T22:59:00.077Z" },
{ url = "https://files.pythonhosted.org/packages/70/ba/bcb1b0bb7a33d4c7c0c4d4c7874b4a62ae4f56113a5f4baefa362dfb1f0f/cryptography-48.0.0-cp39-abi3-manylinux_2_34_ppc64le.whl", hash = "sha256:8cd666227ef7af430aa5914a9910e0ddd703e75f039cef0825cd0da71b6b711a", size = 5238165, upload-time = "2026-05-04T22:59:02.317Z" },
{ url = "https://files.pythonhosted.org/packages/c9/70/ca4003b1ce5ca3dc3186ada51908c8a9b9ff7d5cab83cc0d43ee14ec144f/cryptography-48.0.0-cp39-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:9071196d81abc88b3516ac8cdfad32e2b66dd4a5393a8e68a961e9161ddc6239", size = 4729947, upload-time = "2026-05-04T22:59:05.255Z" },
{ url = "https://files.pythonhosted.org/packages/44/a0/4ec7cf774207905aef1a8d11c3750d5a1db805eb380ee4e16df317870128/cryptography-48.0.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:1e2d54c8be6152856a36f0882ab231e70f8ec7f14e93cf87db8a2ed056bf160c", size = 4822059, upload-time = "2026-05-04T22:59:07.802Z" },
{ url = "https://files.pythonhosted.org/packages/1e/75/a2e55f99c16fcac7b5d6c1eb19ad8e00799854d6be5ca845f9259eae1681/cryptography-48.0.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a5da777e32ffed6f85a7b2b3f7c5cbc88c146bfcd0a1d7baf5fcc6c52ee35dd4", size = 4960575, upload-time = "2026-05-04T22:59:09.851Z" },
{ url = "https://files.pythonhosted.org/packages/b8/23/6e6f32143ab5d8b36ca848a502c4bcd477ae75b9e1677e3530d669062578/cryptography-48.0.0-cp39-abi3-win32.whl", hash = "sha256:77a2ccbbe917f6710e05ba9adaa25fb5075620bf3ea6fb751997875aff4ae4bd", size = 3279117, upload-time = "2026-05-04T22:59:12.019Z" },
{ url = "https://files.pythonhosted.org/packages/9d/9a/0fea98a70cf1749d41d738836f6349d97945f7c89433a259a6c2642eefeb/cryptography-48.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:16cd65b9330583e4619939b3a3843eec1e6e789744bb01e7c7e2e62e33c239c8", size = 3792100, upload-time = "2026-05-04T22:59:14.884Z" },
]
[[package]]
name = "fastapi"
version = "0.136.3"
@@ -724,6 +777,7 @@ dependencies = [
{ name = "alembic" },
{ name = "arq" },
{ name = "asyncpg" },
{ name = "cryptography" },
{ name = "fastapi" },
{ name = "httpx" },
{ name = "pwdlib", extra = ["argon2"] },
@@ -752,6 +806,7 @@ requires-dist = [
{ name = "alembic", specifier = ">=1.14" },
{ name = "arq", specifier = ">=0.26" },
{ name = "asyncpg", specifier = ">=0.30" },
{ name = "cryptography", specifier = ">=44.0" },
{ name = "fastapi", specifier = ">=0.115" },
{ name = "httpx", specifier = ">=0.28" },
{ name = "pwdlib", extras = ["argon2"], specifier = ">=0.2.1" },