Compare commits

...

6 Commits

Author SHA1 Message Date
Senko-san 4ade6939b6 feat(stream): require auth on GET /stream/{id} via token query param
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
The audio stream endpoint was unauthenticated. Add a get_streaming_user
dependency that accepts the access token either as a ?token= query param
(the browser <audio> element can't send an Authorization header) or a
bearer header for native clients. Update streaming tests accordingly and
add a test asserting unauthenticated requests are rejected with 401.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 17:11:43 +03:00
Senko-san 5c5df5d3cc feat(storage): S3-compatible storage adapter + storage_uri rename
Add S3FileStorage adapter (any S3-compatible backend: AWS, MinIO, Garage)
alongside the local adapter, selected via STORAGE_BACKEND. Proxied range
streaming via get_object+Range; as_local_path downloads to a tempfile for
ffmpeg/fpcalc. Rename track.file_path -> storage_uri across domain entity,
ORM, repositories, port, and services, with an Alembic migration. Adds
mocked S3 unit tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 17:11:35 +03:00
Senko-san a8348e145a feat: build
Docker Build & Publish / build (push) Successful in 1m43s
Docker Build & Publish / push (push) Successful in 48s
Docker Build & Publish / Prune old image versions (push) Successful in 1s
2026-06-07 21:37:30 +03:00
Senko-san 7c920f38f6 feat: implement 1B domain entities/repos + 1H library API routes
1B — domain layer:
- New entities: Album, Playlist, Like, PlayHistoryEntry
- Track entity extended with album_id, genre, year fields
- New protocols: AlbumRepository, PlaylistRepository, LikeRepository, HistoryRepository
- ArtistRepository / TrackRepository protocols extended (list, count, update, get_many, etc.)
- New repos: SqlAlchemyAlbum/Playlist/Like/HistoryRepository
- Artist and track repos updated to match extended protocols

1H — library API:
- Pagination: PagedResponse[T] generic, offset-based, limit default 50 max 200
- Schemas: TrackOut, AlbumOut, ArtistOut, PlaylistOut/Create/Update,
  LikeEvent/State, HistoryIn/Out, LibrarySearchResponse
- GET/PATCH/DELETE /tracks with filters, sort, pagination
- GET /albums, /albums/{id}, /albums/{id}/tracks
- GET /artists, /artists/{id}, /artists/{id}/albums, /artists/{id}/tracks
- GET /search/library (ILIKE across tracks/albums/artists)
- Full /playlists CRUD + track add/remove (append-only version bump)
- POST /likes (append-only event log), GET /likes, GET /likes/state
- POST /history (scrobble), GET /history
- deps.py: TrackRepoDep, ArtistRepoDep, AlbumRepoDep, PlaylistRepoDep,
  LikeRepoDep, HistoryRepoDep

ruff   mypy   pytest 45/45 

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 16:43:51 +03:00
Senko-san 81ea93c371 feat: local storage logic & endpoints 2026-06-07 15:34:06 +03:00
Senko-san dfd512a13f feat: models 2026-06-07 14:50:35 +03:00
63 changed files with 4820 additions and 555 deletions
+116
View File
@@ -0,0 +1,116 @@
name: Docker Build & Publish
on:
push:
branches: [master]
workflow_dispatch:
env:
# Number of tagged (non-latest) versions to keep per image name.
KEEP_VERSIONS: "5"
jobs:
build:
runs-on: ubuntu-latest
outputs:
host: ${{ steps.meta.outputs.host }}
image: ${{ steps.meta.outputs.image }}
sha: ${{ steps.meta.outputs.sha }}
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Resolve registry metadata
id: meta
run: |
host=$(echo "${{ gitea.server_url }}" | sed 's|https\?://||; s|/$||')
repo_lc=$(echo "${{ gitea.repository }}" | tr '[:upper:]' '[:lower:]')
echo "host=$host" >> "$GITHUB_OUTPUT"
echo "image=$host/$repo_lc" >> "$GITHUB_OUTPUT"
echo "sha=$(git rev-parse --short HEAD)" >> "$GITHUB_OUTPUT"
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build image
uses: docker/build-push-action@v5
with:
context: .
file: dockerfiles/Dockerfile.prod
push: false
tags: |
${{ steps.meta.outputs.image }}:latest
${{ steps.meta.outputs.image }}:${{ steps.meta.outputs.sha }}
outputs: type=docker,dest=/tmp/image.tar
- name: Upload image artifact
uses: actions/upload-artifact@v3
with:
name: docker-image
path: /tmp/image.tar
retention-days: 1
push:
needs: build
runs-on: ubuntu-latest
steps:
- name: Download image artifact
uses: actions/download-artifact@v3
with:
name: docker-image
path: /tmp
- name: Load image
run: docker load < /tmp/image.tar
- name: Log in to Gitea registry
uses: docker/login-action@v3
with:
registry: ${{ needs.build.outputs.host }}
username: ${{ gitea.actor }}
password: ${{ secrets.PACKAGE_REGISTRY_TOKEN }}
- name: Push image
run: |
docker push ${{ needs.build.outputs.image }}:latest
docker push ${{ needs.build.outputs.image }}:${{ needs.build.outputs.sha }}
cleanup:
name: Prune old image versions
needs: push
runs-on: ubuntu-latest
steps:
- name: Delete versions beyond KEEP_VERSIONS
env:
GITEA_URL: ${{ gitea.server_url }}
OWNER: ${{ gitea.repository_owner }}
IMAGE: ${{ gitea.event.repository.name }}
TOKEN: ${{ secrets.PACKAGE_REGISTRY_TOKEN }}
run: |
image=$(echo "$IMAGE" | tr '[:upper:]' '[:lower:]')
response=$(curl -sf \
-H "Authorization: token $TOKEN" \
-H "Accept: application/json" \
"${GITEA_URL}/api/v1/packages/${OWNER}?type=container&limit=50&q=${image}")
to_delete=$(printf '%s' "$response" \
| jq -r \
--arg name "$image" \
--argjson keep "$KEEP_VERSIONS" \
'[.[] | select(.name == $name and .version != "latest")]
| sort_by(.created) | reverse
| .[$keep:][].version')
if [ -z "$to_delete" ]; then
echo "Nothing to prune."
exit 0
fi
while IFS= read -r version; do
echo "Deleting ${image}:${version}"
curl -sf -X DELETE \
-H "Authorization: token $TOKEN" \
"${GITEA_URL}/api/v1/packages/${OWNER}/container/${image}/${version}" \
&& echo " ok" || echo " failed (may already be gone, continuing)"
done <<< "$to_delete"
@@ -0,0 +1,330 @@
"""music schema
Revision ID: e670d6c41d0c
Revises: 0001_auth_users
Create Date: 2026-06-07 11:37:41.420644
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "e670d6c41d0c"
down_revision: str | None = "0001_auth_users"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"artists",
sa.Column("name", sa.String(length=512), nullable=False),
sa.Column("musicbrainz_id", sa.String(length=36), nullable=True),
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_artists")),
)
op.create_index(op.f("ix_artists_musicbrainz_id"), "artists", ["musicbrainz_id"], unique=False)
op.create_index(op.f("ix_artists_name"), "artists", ["name"], unique=False)
op.create_table(
"albums",
sa.Column("title", sa.String(length=1024), nullable=False),
sa.Column("artist_id", sa.Uuid(), nullable=False),
sa.Column("year", sa.Integer(), nullable=True),
sa.Column("cover_path", sa.String(length=1024), nullable=True),
sa.Column("musicbrainz_id", sa.String(length=36), nullable=True),
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["artist_id"],
["artists.id"],
name=op.f("fk_albums_artist_id_artists"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_albums")),
)
op.create_index(op.f("ix_albums_artist_id"), "albums", ["artist_id"], unique=False)
op.create_index(op.f("ix_albums_musicbrainz_id"), "albums", ["musicbrainz_id"], unique=False)
op.create_index(op.f("ix_albums_title"), "albums", ["title"], unique=False)
op.create_table(
"download_jobs",
sa.Column("source", sa.String(length=32), nullable=False),
sa.Column("source_id", sa.String(length=512), nullable=True),
sa.Column("query", sa.String(length=1024), nullable=True),
sa.Column("requested_by", sa.Uuid(), nullable=True),
sa.Column("status", sa.String(length=16), nullable=False),
sa.Column("progress", sa.Float(), nullable=False),
sa.Column("error_message", sa.Text(), nullable=True),
sa.Column("retry_count", sa.Integer(), nullable=False),
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["requested_by"],
["users.id"],
name=op.f("fk_download_jobs_requested_by_users"),
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_download_jobs")),
)
op.create_index(
op.f("ix_download_jobs_requested_by"), "download_jobs", ["requested_by"], unique=False
)
op.create_index(op.f("ix_download_jobs_status"), "download_jobs", ["status"], unique=False)
op.create_table(
"playlists",
sa.Column("name", sa.String(length=512), nullable=False),
sa.Column("description", sa.String(length=2048), nullable=True),
sa.Column("owner_id", sa.Uuid(), nullable=False),
sa.Column("cover_path", sa.String(length=1024), nullable=True),
sa.Column("version", sa.Integer(), nullable=False),
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["owner_id"], ["users.id"], name=op.f("fk_playlists_owner_id_users"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_playlists")),
)
op.create_index(op.f("ix_playlists_owner_id"), "playlists", ["owner_id"], unique=False)
op.create_table(
"tracks",
sa.Column("title", sa.String(length=1024), nullable=False),
sa.Column("artist_id", sa.Uuid(), nullable=False),
sa.Column("album_id", sa.Uuid(), nullable=True),
sa.Column("track_number", sa.Integer(), nullable=True),
sa.Column("duration_seconds", sa.Integer(), nullable=True),
sa.Column("genre", sa.String(length=255), nullable=True),
sa.Column("year", sa.Integer(), nullable=True),
sa.Column("file_path", sa.String(length=2048), nullable=False),
sa.Column("file_format", sa.String(length=32), nullable=False),
sa.Column("file_size", sa.Integer(), nullable=False),
sa.Column("bitrate", sa.Integer(), nullable=True),
sa.Column("acoustid_fingerprint", sa.String(length=64), nullable=True),
sa.Column("musicbrainz_id", sa.String(length=36), nullable=True),
sa.Column("source", sa.String(length=32), nullable=False),
sa.Column("source_id", sa.String(length=512), nullable=False),
sa.Column("is_replaceable", sa.Boolean(), nullable=False),
sa.Column("storage_policy", sa.String(length=16), nullable=False),
sa.Column("metadata_status", sa.String(length=16), nullable=False),
sa.Column("added_by", sa.Uuid(), nullable=True),
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["added_by"], ["users.id"], name=op.f("fk_tracks_added_by_users"), ondelete="SET NULL"
),
sa.ForeignKeyConstraint(
["album_id"], ["albums.id"], name=op.f("fk_tracks_album_id_albums"), ondelete="SET NULL"
),
sa.ForeignKeyConstraint(
["artist_id"],
["artists.id"],
name=op.f("fk_tracks_artist_id_artists"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_tracks")),
sa.UniqueConstraint("source", "source_id", name="uq_tracks_source_source_id"),
)
op.create_index(
op.f("ix_tracks_acoustid_fingerprint"), "tracks", ["acoustid_fingerprint"], unique=False
)
op.create_index(op.f("ix_tracks_added_by"), "tracks", ["added_by"], unique=False)
op.create_index(op.f("ix_tracks_album_id"), "tracks", ["album_id"], unique=False)
op.create_index(op.f("ix_tracks_artist_id"), "tracks", ["artist_id"], unique=False)
op.create_index(op.f("ix_tracks_genre"), "tracks", ["genre"], unique=False)
op.create_index(op.f("ix_tracks_musicbrainz_id"), "tracks", ["musicbrainz_id"], unique=False)
op.create_index(op.f("ix_tracks_title"), "tracks", ["title"], unique=False)
op.create_table(
"likes",
sa.Column("user_id", sa.Uuid(), nullable=False),
sa.Column("track_id", sa.Uuid(), nullable=False),
sa.Column("value", sa.String(length=16), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("id", sa.Uuid(), nullable=False),
sa.ForeignKeyConstraint(
["track_id"], ["tracks.id"], name=op.f("fk_likes_track_id_tracks"), ondelete="CASCADE"
),
sa.ForeignKeyConstraint(
["user_id"], ["users.id"], name=op.f("fk_likes_user_id_users"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_likes")),
)
op.create_index("ix_likes_user_id_track_id", "likes", ["user_id", "track_id"], unique=False)
op.create_table(
"lyrics",
sa.Column("track_id", sa.Uuid(), nullable=False),
sa.Column("synced", sa.Text(), nullable=True),
sa.Column("plain", sa.Text(), nullable=True),
sa.Column("source", sa.String(length=64), nullable=True),
sa.Column("status", sa.String(length=16), nullable=False),
sa.Column(
"fetched_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column("id", sa.Uuid(), nullable=False),
sa.ForeignKeyConstraint(
["track_id"], ["tracks.id"], name=op.f("fk_lyrics_track_id_tracks"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_lyrics")),
sa.UniqueConstraint("track_id", name=op.f("uq_lyrics_track_id")),
)
op.create_table(
"play_history",
sa.Column("user_id", sa.Uuid(), nullable=False),
sa.Column("track_id", sa.Uuid(), nullable=False),
sa.Column(
"played_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False
),
sa.Column("play_duration_seconds", sa.Integer(), nullable=True),
sa.Column("completed", sa.Boolean(), nullable=False),
sa.Column("id", sa.Uuid(), nullable=False),
sa.ForeignKeyConstraint(
["track_id"],
["tracks.id"],
name=op.f("fk_play_history_track_id_tracks"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["user_id"],
["users.id"],
name=op.f("fk_play_history_user_id_users"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_play_history")),
)
op.create_index(op.f("ix_play_history_track_id"), "play_history", ["track_id"], unique=False)
op.create_index(
"ix_play_history_user_id_played_at", "play_history", ["user_id", "played_at"], unique=False
)
op.create_table(
"playlist_tracks",
sa.Column("playlist_id", sa.Uuid(), nullable=False),
sa.Column("track_id", sa.Uuid(), nullable=False),
sa.Column("position", sa.Float(), nullable=False),
sa.Column(
"added_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False
),
sa.Column("id", sa.Uuid(), nullable=False),
sa.ForeignKeyConstraint(
["playlist_id"],
["playlists.id"],
name=op.f("fk_playlist_tracks_playlist_id_playlists"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["track_id"],
["tracks.id"],
name=op.f("fk_playlist_tracks_track_id_tracks"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_playlist_tracks")),
sa.UniqueConstraint(
"playlist_id", "track_id", name="uq_playlist_tracks_playlist_id_track_id"
),
)
op.create_index(
op.f("ix_playlist_tracks_playlist_id"), "playlist_tracks", ["playlist_id"], unique=False
)
op.create_index(
op.f("ix_playlist_tracks_track_id"), "playlist_tracks", ["track_id"], unique=False
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_playlist_tracks_track_id"), table_name="playlist_tracks")
op.drop_index(op.f("ix_playlist_tracks_playlist_id"), table_name="playlist_tracks")
op.drop_table("playlist_tracks")
op.drop_index("ix_play_history_user_id_played_at", table_name="play_history")
op.drop_index(op.f("ix_play_history_track_id"), table_name="play_history")
op.drop_table("play_history")
op.drop_table("lyrics")
op.drop_index("ix_likes_user_id_track_id", table_name="likes")
op.drop_table("likes")
op.drop_index(op.f("ix_tracks_title"), table_name="tracks")
op.drop_index(op.f("ix_tracks_musicbrainz_id"), table_name="tracks")
op.drop_index(op.f("ix_tracks_genre"), table_name="tracks")
op.drop_index(op.f("ix_tracks_artist_id"), table_name="tracks")
op.drop_index(op.f("ix_tracks_album_id"), table_name="tracks")
op.drop_index(op.f("ix_tracks_added_by"), table_name="tracks")
op.drop_index(op.f("ix_tracks_acoustid_fingerprint"), table_name="tracks")
op.drop_table("tracks")
op.drop_index(op.f("ix_playlists_owner_id"), table_name="playlists")
op.drop_table("playlists")
op.drop_index(op.f("ix_download_jobs_status"), table_name="download_jobs")
op.drop_index(op.f("ix_download_jobs_requested_by"), table_name="download_jobs")
op.drop_table("download_jobs")
op.drop_index(op.f("ix_albums_title"), table_name="albums")
op.drop_index(op.f("ix_albums_musicbrainz_id"), table_name="albums")
op.drop_index(op.f("ix_albums_artist_id"), table_name="albums")
op.drop_table("albums")
op.drop_index(op.f("ix_artists_name"), table_name="artists")
op.drop_index(op.f("ix_artists_musicbrainz_id"), table_name="artists")
op.drop_table("artists")
# ### end Alembic commands ###
@@ -0,0 +1,25 @@
"""rename track file_path to storage_uri
Revision ID: 20260608_storage_uri
Revises: e670d6c41d0c
Create Date: 2026-06-08 11:32:00.000000
"""
from __future__ import annotations
from collections.abc import Sequence
from alembic import op
revision: str = "20260608_storage_uri"
down_revision: str | None = "e670d6c41d0c"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.alter_column("tracks", "file_path", new_column_name="storage_uri")
def downgrade() -> None:
op.alter_column("tracks", "storage_uri", new_column_name="file_path")
+88 -1
View File
@@ -15,17 +15,26 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.application.auth_service import AuthService from app.application.auth_service import AuthService
from app.application.streaming_service import StreamingService
from app.application.upload_service import UploadService
from app.application.user_service import UserService from app.application.user_service import UserService
from app.core.config import get_settings from app.core.config import get_settings
from app.core.security import Argon2PasswordHasher, JwtTokenService from app.core.security import Argon2PasswordHasher, JwtTokenService
from app.domain.entities import User from app.domain.entities import User
from app.domain.errors import AuthenticationError, PermissionDeniedError from app.domain.errors import AuthenticationError, PermissionDeniedError
from app.domain.ports import PasswordHasher, TokenService from app.domain.ports import FileStorage, PasswordHasher, TokenService
from app.infrastructure.db import get_sessionmaker from app.infrastructure.db import get_sessionmaker
from app.infrastructure.db.repositories import ( from app.infrastructure.db.repositories import (
SqlAlchemyAlbumRepository,
SqlAlchemyArtistRepository,
SqlAlchemyHistoryRepository,
SqlAlchemyLikeRepository,
SqlAlchemyPlaylistRepository,
SqlAlchemyRefreshTokenRepository, SqlAlchemyRefreshTokenRepository,
SqlAlchemyTrackRepository,
SqlAlchemyUserRepository, SqlAlchemyUserRepository,
) )
from app.infrastructure.storage.provider import get_file_storage
async def get_session() -> AsyncIterator[AsyncSession]: async def get_session() -> AsyncIterator[AsyncSession]:
@@ -77,6 +86,64 @@ AuthServiceDep = Annotated[AuthService, Depends(get_auth_service)]
UserServiceDep = Annotated[UserService, Depends(get_user_service)] UserServiceDep = Annotated[UserService, Depends(get_user_service)]
# -- file storage (process-cached) ---------------------------------------------
FileStorageDep = Annotated[FileStorage, Depends(get_file_storage)]
def get_upload_service(session: SessionDep, storage: FileStorageDep) -> UploadService:
settings = get_settings()
return UploadService(
tracks=SqlAlchemyTrackRepository(session),
artists=SqlAlchemyArtistRepository(session),
storage=storage,
tmp_dir=settings.upload_tmp_dir,
)
def get_streaming_service(session: SessionDep, storage: FileStorageDep) -> StreamingService:
return StreamingService(
tracks=SqlAlchemyTrackRepository(session),
storage=storage,
)
UploadServiceDep = Annotated[UploadService, Depends(get_upload_service)]
StreamingServiceDep = Annotated[StreamingService, Depends(get_streaming_service)]
# -- library repository deps ---------------------------------------------------
def get_track_repository(session: SessionDep) -> SqlAlchemyTrackRepository:
return SqlAlchemyTrackRepository(session)
def get_artist_repository(session: SessionDep) -> SqlAlchemyArtistRepository:
return SqlAlchemyArtistRepository(session)
def get_album_repository(session: SessionDep) -> SqlAlchemyAlbumRepository:
return SqlAlchemyAlbumRepository(session)
def get_playlist_repository(session: SessionDep) -> SqlAlchemyPlaylistRepository:
return SqlAlchemyPlaylistRepository(session)
def get_like_repository(session: SessionDep) -> SqlAlchemyLikeRepository:
return SqlAlchemyLikeRepository(session)
def get_history_repository(session: SessionDep) -> SqlAlchemyHistoryRepository:
return SqlAlchemyHistoryRepository(session)
TrackRepoDep = Annotated[SqlAlchemyTrackRepository, Depends(get_track_repository)]
ArtistRepoDep = Annotated[SqlAlchemyArtistRepository, Depends(get_artist_repository)]
AlbumRepoDep = Annotated[SqlAlchemyAlbumRepository, Depends(get_album_repository)]
PlaylistRepoDep = Annotated[SqlAlchemyPlaylistRepository, Depends(get_playlist_repository)]
LikeRepoDep = Annotated[SqlAlchemyLikeRepository, Depends(get_like_repository)]
HistoryRepoDep = Annotated[SqlAlchemyHistoryRepository, Depends(get_history_repository)]
# -- current user / authorization ---------------------------------------------- # -- current user / authorization ----------------------------------------------
# auto_error=False: we raise domain AuthenticationError (mapped to 401) so the # auto_error=False: we raise domain AuthenticationError (mapped to 401) so the
# error envelope stays consistent with the rest of the API. # error envelope stays consistent with the rest of the API.
@@ -100,3 +167,23 @@ async def get_current_superuser(user: CurrentUser) -> User:
SuperUser = Annotated[User, Depends(get_current_superuser)] SuperUser = Annotated[User, Depends(get_current_superuser)]
async def get_streaming_user(
auth: AuthServiceDep,
credentials: BearerDep,
token: str | None = None,
) -> User:
"""Authenticate a stream request.
The browser ``<audio>`` element cannot send an ``Authorization`` header, so
the access token is accepted as a ``?token=`` query param; native clients may
still use a bearer header. Either way it's the same access token.
"""
raw = token or (credentials.credentials if credentials else None)
if not raw:
raise AuthenticationError("Missing access token.")
return await auth.authenticate_access(raw)
StreamUser = Annotated[User, Depends(get_streaming_user)]
+11
View File
@@ -12,6 +12,8 @@ from app.domain.errors import (
DomainError, DomainError,
NotFoundError, NotFoundError,
PermissionDeniedError, PermissionDeniedError,
RangeNotSatisfiableError,
StorageError,
ValidationError, ValidationError,
) )
@@ -25,6 +27,7 @@ _STATUS_BY_ERROR: dict[type[DomainError], int] = {
AuthenticationError: status.HTTP_401_UNAUTHORIZED, AuthenticationError: status.HTTP_401_UNAUTHORIZED,
PermissionDeniedError: status.HTTP_403_FORBIDDEN, PermissionDeniedError: status.HTTP_403_FORBIDDEN,
DependencyUnavailableError: status.HTTP_503_SERVICE_UNAVAILABLE, DependencyUnavailableError: status.HTTP_503_SERVICE_UNAVAILABLE,
StorageError: status.HTTP_500_INTERNAL_SERVER_ERROR,
} }
@@ -33,6 +36,14 @@ def _error_body(code: str, message: str) -> dict[str, dict[str, str]]:
def register_exception_handlers(app: FastAPI) -> None: def register_exception_handlers(app: FastAPI) -> None:
@app.exception_handler(RangeNotSatisfiableError)
async def _handle_range_error(_request: Request, exc: RangeNotSatisfiableError) -> JSONResponse:
return JSONResponse(
status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
content=_error_body(exc.code, exc.message),
headers={"Content-Range": f"bytes */{exc.total_size}"},
)
@app.exception_handler(DomainError) @app.exception_handler(DomainError)
async def _handle_domain_error(_request: Request, exc: DomainError) -> JSONResponse: async def _handle_domain_error(_request: Request, exc: DomainError) -> JSONResponse:
http_status = _STATUS_BY_ERROR.get(type(exc), status.HTTP_400_BAD_REQUEST) http_status = _STATUS_BY_ERROR.get(type(exc), status.HTTP_400_BAD_REQUEST)
+16
View File
@@ -0,0 +1,16 @@
"""Album request/response schemas."""
import datetime as dt
import uuid
from pydantic import BaseModel
class AlbumOut(BaseModel):
id: uuid.UUID
title: str
artist_id: uuid.UUID
artist_name: str
year: int | None
track_count: int
created_at: dt.datetime
+14
View File
@@ -0,0 +1,14 @@
"""Artist request/response schemas."""
import datetime as dt
import uuid
from pydantic import BaseModel
class ArtistOut(BaseModel):
id: uuid.UUID
name: str
album_count: int
track_count: int
created_at: dt.datetime
+21
View File
@@ -0,0 +1,21 @@
"""Play history request/response schemas."""
import datetime as dt
import uuid
from pydantic import BaseModel
class HistoryIn(BaseModel):
track_id: uuid.UUID
played_at: dt.datetime
play_duration_seconds: int | None = None
completed: bool = False
class HistoryOut(BaseModel):
id: uuid.UUID
track_id: uuid.UUID
played_at: dt.datetime
play_duration_seconds: int | None
completed: bool
+18
View File
@@ -0,0 +1,18 @@
"""Like request/response schemas."""
import datetime as dt
import uuid
from typing import Literal
from pydantic import BaseModel
class LikeEvent(BaseModel):
track_id: uuid.UUID
value: Literal["like", "dislike", "neutral"]
class LikeState(BaseModel):
track_id: uuid.UUID
value: str
updated_at: dt.datetime
+10
View File
@@ -0,0 +1,10 @@
"""Shared pagination envelope for all paged list responses."""
from pydantic import BaseModel
class PagedResponse[T](BaseModel):
items: list[T]
total: int
limit: int
offset: int
+31
View File
@@ -0,0 +1,31 @@
"""Playlist request/response schemas."""
import datetime as dt
import uuid
from pydantic import BaseModel
class PlaylistOut(BaseModel):
id: uuid.UUID
name: str
description: str | None
owner_id: uuid.UUID
version: int
track_count: int
created_at: dt.datetime
class PlaylistCreate(BaseModel):
name: str
description: str | None = None
class PlaylistUpdate(BaseModel):
name: str | None = None
description: str | None = None
class PlaylistAddTrack(BaseModel):
track_id: uuid.UUID
position: float | None = None
+13
View File
@@ -0,0 +1,13 @@
"""Search response schemas."""
from pydantic import BaseModel
from app.api.schemas.album import AlbumOut
from app.api.schemas.artist import ArtistOut
from app.api.schemas.track import TrackOut
class LibrarySearchResponse(BaseModel):
tracks: list[TrackOut]
albums: list[AlbumOut]
artists: list[ArtistOut]
+27
View File
@@ -0,0 +1,27 @@
"""Track request/response schemas."""
import datetime as dt
import uuid
from pydantic import BaseModel
class TrackOut(BaseModel):
id: uuid.UUID
title: str
artist_id: uuid.UUID
artist_name: str
album_id: uuid.UUID | None
album_title: str | None
duration_seconds: int | None
file_format: str
file_size: int
metadata_status: str
source: str
created_at: dt.datetime
class TrackUpdate(BaseModel):
title: str | None = None
genre: str | None = None
year: int | None = None
+11
View File
@@ -0,0 +1,11 @@
"""Schemas for upload responses."""
import uuid
from pydantic import BaseModel
class UploadResponse(BaseModel):
track_id: uuid.UUID
title: str
already_exists: bool
+93 -5
View File
@@ -3,22 +3,110 @@
import uuid import uuid
from typing import Any from typing import Any
from fastapi import APIRouter from fastapi import APIRouter, Query
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, TrackRepoDep
from app.api.schemas.album import AlbumOut
from app.api.schemas.pagination import PagedResponse
from app.api.schemas.track import TrackOut
from app.api.v1.tracks import _build_track_out
from app.domain.entities.album import Album
from app.domain.entities.track import Artist
from app.domain.errors import NotFoundError
router = APIRouter(prefix="/albums", tags=["albums"]) router = APIRouter(prefix="/albums", tags=["albums"])
async def _build_album_out(
albums: list[Album],
artists: dict[uuid.UUID, Artist],
track_counts: dict[uuid.UUID, int],
) -> list[AlbumOut]:
return [
AlbumOut(
id=a.id,
title=a.title,
artist_id=a.artist_id,
artist_name=artists[a.artist_id].name if a.artist_id in artists else "Unknown Artist",
year=a.year,
track_count=track_counts.get(a.id, 0),
created_at=a.created_at,
)
for a in albums
]
@router.get("") @router.get("")
async def list_albums() -> Any: ... async def list_albums(
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
_: CurrentUser,
artist_id: uuid.UUID | None = None,
q: str | None = None,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[AlbumOut]:
albums = await album_repo.list(artist_id=artist_id, q=q, limit=limit, offset=offset)
total = await album_repo.count(artist_id=artist_id, q=q)
artist_ids = list({a.artist_id for a in albums})
artists = {a.id: a for a in await artist_repo.get_many(artist_ids)}
track_counts = await album_repo.track_count_many([a.id for a in albums])
items = await _build_album_out(albums, artists, track_counts)
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.get("/{album_id}") @router.get("/{album_id}")
async def get_album(album_id: uuid.UUID) -> Any: ... async def get_album(
album_id: uuid.UUID,
album_repo: AlbumRepoDep,
artist_repo: ArtistRepoDep,
_: CurrentUser,
) -> AlbumOut:
album = await album_repo.get_by_id(album_id)
if album is None:
raise NotFoundError(f"Album {album_id} not found.")
artists = {a.id: a for a in await artist_repo.get_many([album.artist_id])}
track_counts = await album_repo.track_count_many([album.id])
items = await _build_album_out([album], artists, track_counts)
return items[0]
@router.get("/{album_id}/tracks") @router.get("/{album_id}/tracks")
async def get_album_tracks(album_id: uuid.UUID) -> Any: ... async def get_album_tracks(
album_id: uuid.UUID,
track_repo: TrackRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
_: CurrentUser,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[TrackOut]:
album = await album_repo.get_by_id(album_id)
if album is None:
raise NotFoundError(f"Album {album_id} not found.")
tracks = await track_repo.list(
artist_id=None,
album_id=album_id,
q=None,
sort_by="title",
order="asc",
limit=limit,
offset=offset,
)
total = await track_repo.count(artist_id=None, album_id=album_id, q=None)
artist_ids = list({t.artist_id for t in tracks})
artists = {a.id: a for a in await artist_repo.get_many(artist_ids)}
albums = {album.id: album}
items = await _build_track_out(tracks, artists, albums)
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.get("/{album_id}/cover") @router.get("/{album_id}/cover")
async def get_album_cover(album_id: uuid.UUID) -> Any: ... async def get_album_cover(album_id: uuid.UUID, _: CurrentUser) -> Any: ...
+105 -6
View File
@@ -3,26 +3,125 @@
import uuid import uuid
from typing import Any from typing import Any
from fastapi import APIRouter from fastapi import APIRouter, Query
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, TrackRepoDep
from app.api.schemas.album import AlbumOut
from app.api.schemas.artist import ArtistOut
from app.api.schemas.pagination import PagedResponse
from app.api.schemas.track import TrackOut
from app.api.v1.albums import _build_album_out
from app.api.v1.tracks import _build_track_out
from app.domain.errors import NotFoundError
router = APIRouter(prefix="/artists", tags=["artists"]) router = APIRouter(prefix="/artists", tags=["artists"])
@router.get("") @router.get("")
async def list_artists() -> Any: ... async def list_artists(
artist_repo: ArtistRepoDep,
_: CurrentUser,
q: str | None = None,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[ArtistOut]:
artists = await artist_repo.list(q=q, limit=limit, offset=offset)
total = await artist_repo.count(q=q)
items = []
for a in artists:
album_cnt = await artist_repo.album_count(a.id)
track_cnt = await artist_repo.track_count(a.id)
items.append(
ArtistOut(
id=a.id,
name=a.name,
album_count=album_cnt,
track_count=track_cnt,
created_at=a.created_at,
)
)
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.get("/{artist_id}") @router.get("/{artist_id}")
async def get_artist(artist_id: uuid.UUID) -> Any: ... async def get_artist(
artist_id: uuid.UUID,
artist_repo: ArtistRepoDep,
_: CurrentUser,
) -> ArtistOut:
artist = await artist_repo.get_by_id(artist_id)
if artist is None:
raise NotFoundError(f"Artist {artist_id} not found.")
album_cnt = await artist_repo.album_count(artist_id)
track_cnt = await artist_repo.track_count(artist_id)
return ArtistOut(
id=artist.id,
name=artist.name,
album_count=album_cnt,
track_count=track_cnt,
created_at=artist.created_at,
)
@router.get("/{artist_id}/albums") @router.get("/{artist_id}/albums")
async def get_artist_albums(artist_id: uuid.UUID) -> Any: ... async def get_artist_albums(
artist_id: uuid.UUID,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
_: CurrentUser,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[AlbumOut]:
artist = await artist_repo.get_by_id(artist_id)
if artist is None:
raise NotFoundError(f"Artist {artist_id} not found.")
albums = await album_repo.list(artist_id=artist_id, q=None, limit=limit, offset=offset)
total = await album_repo.count(artist_id=artist_id, q=None)
artists_map = {artist.id: artist}
track_counts = await album_repo.track_count_many([a.id for a in albums])
items = await _build_album_out(albums, artists_map, track_counts)
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.get("/{artist_id}/tracks") @router.get("/{artist_id}/tracks")
async def get_artist_tracks(artist_id: uuid.UUID) -> Any: ... async def get_artist_tracks(
artist_id: uuid.UUID,
artist_repo: ArtistRepoDep,
track_repo: TrackRepoDep,
album_repo: AlbumRepoDep,
_: CurrentUser,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[TrackOut]:
artist = await artist_repo.get_by_id(artist_id)
if artist is None:
raise NotFoundError(f"Artist {artist_id} not found.")
tracks = await track_repo.list(
artist_id=artist_id,
album_id=None,
q=None,
sort_by="title",
order="asc",
limit=limit,
offset=offset,
)
total = await track_repo.count(artist_id=artist_id, album_id=None, q=None)
album_ids = list({t.album_id for t in tracks if t.album_id is not None})
artists_map = {artist.id: artist}
albums_map = {a.id: a for a in await album_repo.get_many(album_ids)}
items = await _build_track_out(tracks, artists_map, albums_map)
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.get("/{artist_id}/similar") @router.get("/{artist_id}/similar")
async def get_similar_artists(artist_id: uuid.UUID) -> Any: ... async def get_similar_artists(artist_id: uuid.UUID, _: CurrentUser) -> Any: ...
+44 -7
View File
@@ -1,15 +1,52 @@
"""Playback history endpoints.""" """Playback history endpoints."""
from typing import Any from fastapi import APIRouter, Query, Response
from fastapi import APIRouter from app.api.deps import CurrentUser, HistoryRepoDep, TrackRepoDep
from app.api.schemas.history import HistoryIn, HistoryOut
from app.api.schemas.pagination import PagedResponse
from app.domain.errors import NotFoundError
router = APIRouter(prefix="/history", tags=["history"]) router = APIRouter(prefix="/history", tags=["history"])
@router.post("", status_code=204)
async def record_history(
body: HistoryIn,
history_repo: HistoryRepoDep,
track_repo: TrackRepoDep,
user: CurrentUser,
) -> Response:
track = await track_repo.get_by_id(body.track_id)
if track is None:
raise NotFoundError(f"Track {body.track_id} not found.")
await history_repo.add(
user_id=user.id,
track_id=body.track_id,
played_at=body.played_at,
play_duration_seconds=body.play_duration_seconds,
completed=body.completed,
)
return Response(status_code=204)
@router.get("") @router.get("")
async def get_history() -> Any: ... async def get_history(
history_repo: HistoryRepoDep,
user: CurrentUser,
@router.post("") limit: int = Query(50, ge=1, le=200),
async def record_history() -> Any: ... offset: int = Query(0, ge=0),
) -> PagedResponse[HistoryOut]:
entries = await history_repo.list(user_id=user.id, limit=limit, offset=offset)
total = await history_repo.count(user_id=user.id)
items = [
HistoryOut(
id=e.id,
track_id=e.track_id,
played_at=e.played_at,
play_duration_seconds=e.play_duration_seconds,
completed=e.completed,
)
for e in entries
]
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
+50 -6
View File
@@ -1,19 +1,63 @@
"""Like endpoints. Likes are an append-only event-log — never updated in place.""" """Like endpoints. Likes are an append-only event-log — never updated in place."""
from typing import Any import uuid
from fastapi import APIRouter from fastapi import APIRouter, Query
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, LikeRepoDep
from app.api.schemas.like import LikeEvent, LikeState
from app.api.schemas.pagination import PagedResponse
from app.api.schemas.track import TrackOut
from app.api.v1.tracks import _build_track_out
router = APIRouter(prefix="/likes", tags=["likes"]) router = APIRouter(prefix="/likes", tags=["likes"])
@router.post("", status_code=201)
async def add_like(
body: LikeEvent,
like_repo: LikeRepoDep,
user: CurrentUser,
) -> LikeState:
like = await like_repo.add(user_id=user.id, track_id=body.track_id, value=body.value)
return LikeState(track_id=like.track_id, value=like.value, updated_at=like.created_at)
@router.get("") @router.get("")
async def get_likes() -> Any: ... async def get_likes(
like_repo: LikeRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
user: CurrentUser,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[TrackOut]:
tracks = await like_repo.list_liked_tracks(user_id=user.id, limit=limit, offset=offset)
total = await like_repo.count_liked_tracks(user_id=user.id)
artist_ids = list({t.artist_id for t in tracks})
album_ids = list({t.album_id for t in tracks if t.album_id is not None})
artists_map = {a.id: a for a in await artist_repo.get_many(artist_ids)}
albums_map = {a.id: a for a in await album_repo.get_many(album_ids)}
@router.post("") items = await _build_track_out(tracks, artists_map, albums_map)
async def add_like() -> Any: ... return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.get("/state") @router.get("/state")
async def get_likes_state() -> Any: ... async def get_likes_state(
like_repo: LikeRepoDep,
user: CurrentUser,
track_ids: str = Query(default=""),
) -> list[LikeState]:
ids: list[uuid.UUID] = []
if track_ids:
try:
ids = [uuid.UUID(tid.strip()) for tid in track_ids.split(",") if tid.strip()]
except ValueError:
return []
likes = await like_repo.get_latest_state(user_id=user.id, track_ids=ids)
return [
LikeState(track_id=lk.track_id, value=lk.value, updated_at=lk.created_at) for lk in likes
]
+156 -15
View File
@@ -3,46 +3,187 @@
import uuid import uuid
from typing import Any from typing import Any
from fastapi import APIRouter from fastapi import APIRouter, Query, Response
from app.api.deps import (
AlbumRepoDep,
ArtistRepoDep,
CurrentUser,
PlaylistRepoDep,
TrackRepoDep,
)
from app.api.schemas.pagination import PagedResponse
from app.api.schemas.playlist import PlaylistAddTrack, PlaylistCreate, PlaylistOut, PlaylistUpdate
from app.api.schemas.track import TrackOut
from app.api.v1.tracks import _build_track_out
from app.domain.entities.playlist import Playlist
from app.domain.errors import NotFoundError, PermissionDeniedError
from app.infrastructure.db.repositories.playlist_repository import SqlAlchemyPlaylistRepository
router = APIRouter(prefix="/playlists", tags=["playlists"]) router = APIRouter(prefix="/playlists", tags=["playlists"])
async def _build_playlist_out(
playlists: list[Playlist], playlist_repo: SqlAlchemyPlaylistRepository
) -> list[PlaylistOut]:
ids = [p.id for p in playlists]
counts = await playlist_repo.track_count_many(ids)
return [
PlaylistOut(
id=p.id,
name=p.name,
description=p.description,
owner_id=p.owner_id,
version=p.version,
track_count=counts.get(p.id, 0),
created_at=p.created_at,
)
for p in playlists
]
@router.get("") @router.get("")
async def list_playlists() -> Any: ... async def list_playlists(
playlist_repo: PlaylistRepoDep,
user: CurrentUser,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[PlaylistOut]:
playlists = await playlist_repo.list(owner_id=user.id, limit=limit, offset=offset)
total = await playlist_repo.count(owner_id=user.id)
items = await _build_playlist_out(playlists, playlist_repo)
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.post("") @router.post("", status_code=201)
async def create_playlist() -> Any: ... async def create_playlist(
body: PlaylistCreate,
playlist_repo: PlaylistRepoDep,
user: CurrentUser,
) -> PlaylistOut:
playlist = await playlist_repo.add(
name=body.name, description=body.description, owner_id=user.id
)
items = await _build_playlist_out([playlist], playlist_repo)
return items[0]
@router.get("/{playlist_id}") @router.get("/{playlist_id}")
async def get_playlist(playlist_id: uuid.UUID) -> Any: ... async def get_playlist(
playlist_id: uuid.UUID,
playlist_repo: PlaylistRepoDep,
_: CurrentUser,
) -> PlaylistOut:
playlist = await playlist_repo.get_by_id(playlist_id)
if playlist is None:
raise NotFoundError(f"Playlist {playlist_id} not found.")
items = await _build_playlist_out([playlist], playlist_repo)
return items[0]
@router.patch("/{playlist_id}") @router.patch("/{playlist_id}")
async def update_playlist(playlist_id: uuid.UUID) -> Any: ... async def update_playlist(
playlist_id: uuid.UUID,
body: PlaylistUpdate,
playlist_repo: PlaylistRepoDep,
user: CurrentUser,
) -> PlaylistOut:
playlist = await playlist_repo.get_by_id(playlist_id)
if playlist is None:
raise NotFoundError(f"Playlist {playlist_id} not found.")
if playlist.owner_id != user.id:
raise PermissionDeniedError("You don't own this playlist.")
updated = await playlist_repo.update(playlist_id, name=body.name, description=body.description)
items = await _build_playlist_out([updated], playlist_repo)
return items[0]
@router.delete("/{playlist_id}") @router.delete("/{playlist_id}", status_code=204)
async def delete_playlist(playlist_id: uuid.UUID) -> Any: ... async def delete_playlist(
playlist_id: uuid.UUID,
playlist_repo: PlaylistRepoDep,
user: CurrentUser,
) -> Response:
playlist = await playlist_repo.get_by_id(playlist_id)
if playlist is None:
raise NotFoundError(f"Playlist {playlist_id} not found.")
if playlist.owner_id != user.id:
raise PermissionDeniedError("You don't own this playlist.")
await playlist_repo.delete(playlist_id)
return Response(status_code=204)
@router.get("/{playlist_id}/tracks") @router.get("/{playlist_id}/tracks")
async def get_playlist_tracks(playlist_id: uuid.UUID) -> Any: ... async def get_playlist_tracks(
playlist_id: uuid.UUID,
playlist_repo: PlaylistRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
_: CurrentUser,
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[TrackOut]:
playlist = await playlist_repo.get_by_id(playlist_id)
if playlist is None:
raise NotFoundError(f"Playlist {playlist_id} not found.")
tracks = await playlist_repo.get_tracks(playlist_id, limit=limit, offset=offset)
total = await playlist_repo.get_track_total(playlist_id)
artist_ids = list({t.artist_id for t in tracks})
album_ids = list({t.album_id for t in tracks if t.album_id is not None})
artists_map = {a.id: a for a in await artist_repo.get_many(artist_ids)}
albums_map = {a.id: a for a in await album_repo.get_many(album_ids)}
items = await _build_track_out(tracks, artists_map, albums_map)
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.post("/{playlist_id}/tracks") @router.post("/{playlist_id}/tracks", status_code=204)
async def add_playlist_tracks(playlist_id: uuid.UUID) -> Any: ... async def add_playlist_track(
playlist_id: uuid.UUID,
body: PlaylistAddTrack,
playlist_repo: PlaylistRepoDep,
track_repo: TrackRepoDep,
user: CurrentUser,
) -> Response:
playlist = await playlist_repo.get_by_id(playlist_id)
if playlist is None:
raise NotFoundError(f"Playlist {playlist_id} not found.")
if playlist.owner_id != user.id:
raise PermissionDeniedError("You don't own this playlist.")
track = await track_repo.get_by_id(body.track_id)
if track is None:
raise NotFoundError(f"Track {body.track_id} not found.")
position = body.position
if position is None:
position = await playlist_repo.max_position(playlist_id) + 1.0
await playlist_repo.add_track(playlist_id, body.track_id, position=position)
return Response(status_code=204)
@router.delete("/{playlist_id}/tracks/{track_id}") @router.delete("/{playlist_id}/tracks/{track_id}", status_code=204)
async def remove_playlist_track(playlist_id: uuid.UUID, track_id: uuid.UUID) -> Any: ... async def remove_playlist_track(
playlist_id: uuid.UUID,
track_id: uuid.UUID,
playlist_repo: PlaylistRepoDep,
user: CurrentUser,
) -> Response:
playlist = await playlist_repo.get_by_id(playlist_id)
if playlist is None:
raise NotFoundError(f"Playlist {playlist_id} not found.")
if playlist.owner_id != user.id:
raise PermissionDeniedError("You don't own this playlist.")
await playlist_repo.remove_track(playlist_id, track_id)
return Response(status_code=204)
@router.put("/{playlist_id}/tracks/reorder") @router.put("/{playlist_id}/tracks/reorder")
async def reorder_playlist_tracks(playlist_id: uuid.UUID) -> Any: ... async def reorder_playlist_tracks(playlist_id: uuid.UUID, _: CurrentUser) -> Any: ...
@router.get("/{playlist_id}/cover") @router.get("/{playlist_id}/cover")
async def get_playlist_cover(playlist_id: uuid.UUID) -> Any: ... async def get_playlist_cover(playlist_id: uuid.UUID, _: CurrentUser) -> Any: ...
+66 -3
View File
@@ -2,14 +2,77 @@
from typing import Any from typing import Any
from fastapi import APIRouter from fastapi import APIRouter, Query
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, TrackRepoDep
from app.api.schemas.album import AlbumOut
from app.api.schemas.artist import ArtistOut
from app.api.schemas.search import LibrarySearchResponse
from app.api.schemas.track import TrackOut
from app.api.v1.albums import _build_album_out
from app.api.v1.tracks import _build_track_out
router = APIRouter(prefix="/search", tags=["search"]) router = APIRouter(prefix="/search", tags=["search"])
@router.get("") @router.get("")
async def search() -> Any: ... async def search(_: CurrentUser) -> Any: ...
@router.get("/library") @router.get("/library")
async def search_library() -> Any: ... async def search_library(
track_repo: TrackRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
_: CurrentUser,
q: str = Query(min_length=1),
types: str = Query(default="tracks,albums,artists"),
limit: int = Query(20, ge=1, le=100),
) -> LibrarySearchResponse:
requested = {t.strip() for t in types.split(",")}
tracks_out: list[TrackOut] = []
albums_out: list[AlbumOut] = []
artists_out: list[ArtistOut] = []
if "tracks" in requested:
tracks = await track_repo.list(
artist_id=None,
album_id=None,
q=q,
sort_by="title",
order="asc",
limit=limit,
offset=0,
)
if tracks:
artist_ids = list({t.artist_id for t in tracks})
album_ids = list({t.album_id for t in tracks if t.album_id is not None})
artists_map = {a.id: a for a in await artist_repo.get_many(artist_ids)}
albums_map = {a.id: a for a in await album_repo.get_many(album_ids)}
tracks_out = await _build_track_out(tracks, artists_map, albums_map)
if "albums" in requested:
albums = await album_repo.list(artist_id=None, q=q, limit=limit, offset=0)
if albums:
artist_ids = list({a.artist_id for a in albums})
artists_map = {a.id: a for a in await artist_repo.get_many(artist_ids)}
track_counts = await album_repo.track_count_many([a.id for a in albums])
albums_out = await _build_album_out(albums, artists_map, track_counts)
if "artists" in requested:
raw_artists = await artist_repo.list(q=q, limit=limit, offset=0)
for a in raw_artists:
album_cnt = await artist_repo.album_count(a.id)
track_cnt = await artist_repo.track_count(a.id)
artists_out.append(
ArtistOut(
id=a.id,
name=a.name,
album_count=album_cnt,
track_count=track_cnt,
created_at=a.created_at,
)
)
return LibrarySearchResponse(tracks=tracks_out, albums=albums_out, artists=artists_out)
+28 -9
View File
@@ -1,20 +1,39 @@
"""Audio streaming endpoints: direct stream and HLS.""" """Audio streaming endpoint direct stream with Range support."""
import uuid import uuid
from typing import Any from typing import Annotated
from fastapi import APIRouter from fastapi import APIRouter, Header
from fastapi.responses import StreamingResponse
from app.api.deps import StreamingServiceDep, StreamUser
router = APIRouter(prefix="/stream", tags=["streaming"]) router = APIRouter(prefix="/stream", tags=["streaming"])
@router.get("/{track_id}") @router.get("/{track_id}")
async def stream_track(track_id: uuid.UUID) -> Any: ... async def stream_track(
track_id: uuid.UUID,
service: StreamingServiceDep,
_user: StreamUser,
range_header: Annotated[str | None, Header(alias="Range")] = None,
) -> StreamingResponse:
result = await service.open_stream(track_id, range_header)
headers = {
"Accept-Ranges": "bytes",
"Content-Length": str(result.content_length),
}
@router.get("/{track_id}/hls/playlist.m3u8") if result.is_partial:
async def hls_playlist(track_id: uuid.UUID) -> Any: ... headers["Content-Range"] = f"bytes {result.start}-{result.end}/{result.total_size}"
status_code = 206
else:
status_code = 200
return StreamingResponse(
@router.get("/{track_id}/hls/{segment}") result.stream,
async def hls_segment(track_id: uuid.UUID, segment: str) -> Any: ... status_code=status_code,
headers=headers,
media_type=result.content_type,
)
+123 -13
View File
@@ -1,48 +1,158 @@
"""Track endpoints (library CRUD, similarity, optimization, cover, metadata, streaming).""" """Track endpoints."""
import uuid import uuid
from typing import Any from typing import Any
from fastapi import APIRouter from fastapi import APIRouter, Query, Response
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, FileStorageDep, TrackRepoDep
from app.api.schemas.pagination import PagedResponse
from app.api.schemas.track import TrackOut, TrackUpdate
from app.domain.entities.album import Album
from app.domain.entities.track import Artist, Track
from app.domain.errors import NotFoundError
router = APIRouter(prefix="/tracks", tags=["tracks"]) router = APIRouter(prefix="/tracks", tags=["tracks"])
async def _build_track_out(
tracks: list[Track],
artists: dict[uuid.UUID, Artist],
albums: dict[uuid.UUID, Album],
) -> list[TrackOut]:
return [
TrackOut(
id=t.id,
title=t.title,
artist_id=t.artist_id,
artist_name=artists[t.artist_id].name if t.artist_id in artists else "Unknown Artist",
album_id=t.album_id,
album_title=albums[t.album_id].title if t.album_id and t.album_id in albums else None,
duration_seconds=t.duration_seconds,
file_format=t.file_format,
file_size=t.file_size,
metadata_status=t.metadata_status,
source=t.source,
created_at=t.created_at,
)
for t in tracks
]
@router.get("") @router.get("")
async def list_tracks() -> Any: ... async def list_tracks(
track_repo: TrackRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
_: CurrentUser,
artist_id: uuid.UUID | None = None,
album_id: uuid.UUID | None = None,
q: str | None = None,
sort_by: str = Query("created_at", pattern="^(title|created_at|artist)$"),
order: str = Query("desc", pattern="^(asc|desc)$"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PagedResponse[TrackOut]:
tracks = await track_repo.list(
artist_id=artist_id,
album_id=album_id,
q=q,
sort_by=sort_by,
order=order,
limit=limit,
offset=offset,
)
total = await track_repo.count(artist_id=artist_id, album_id=album_id, q=q)
artist_ids = list({t.artist_id for t in tracks})
album_ids = list({t.album_id for t in tracks if t.album_id is not None})
artists = {a.id: a for a in await artist_repo.get_many(artist_ids)}
albums = {a.id: a for a in await album_repo.get_many(album_ids)}
items = await _build_track_out(tracks, artists, albums)
return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.get("/{track_id}") @router.get("/{track_id}")
async def get_track(track_id: uuid.UUID) -> Any: ... async def get_track(
track_id: uuid.UUID,
track_repo: TrackRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
_: CurrentUser,
) -> TrackOut:
track = await track_repo.get_by_id(track_id)
if track is None:
raise NotFoundError(f"Track {track_id} not found.")
artist_ids = [track.artist_id]
album_ids = [track.album_id] if track.album_id else []
artists = {a.id: a for a in await artist_repo.get_many(artist_ids)}
albums = {a.id: a for a in await album_repo.get_many(album_ids)}
items = await _build_track_out([track], artists, albums)
return items[0]
@router.patch("/{track_id}") @router.patch("/{track_id}")
async def update_track(track_id: uuid.UUID) -> Any: ... async def update_track(
track_id: uuid.UUID,
body: TrackUpdate,
track_repo: TrackRepoDep,
artist_repo: ArtistRepoDep,
album_repo: AlbumRepoDep,
_: CurrentUser,
) -> TrackOut:
track = await track_repo.update(
track_id,
title=body.title,
genre=body.genre,
year=body.year,
)
artist_ids = [track.artist_id]
album_ids = [track.album_id] if track.album_id else []
artists = {a.id: a for a in await artist_repo.get_many(artist_ids)}
albums = {a.id: a for a in await album_repo.get_many(album_ids)}
items = await _build_track_out([track], artists, albums)
return items[0]
@router.delete("/{track_id}") @router.delete("/{track_id}", status_code=204)
async def delete_track(track_id: uuid.UUID) -> Any: ... async def delete_track(
track_id: uuid.UUID,
track_repo: TrackRepoDep,
storage: FileStorageDep,
_: CurrentUser,
) -> Response:
track = await track_repo.get_by_id(track_id)
if track is None:
raise NotFoundError(f"Track {track_id} not found.")
await track_repo.delete(track_id)
await storage.delete(track.storage_uri)
return Response(status_code=204)
@router.get("/{track_id}/similar") @router.get("/{track_id}/similar")
async def get_similar_tracks(track_id: uuid.UUID) -> Any: ... async def get_similar_tracks(track_id: uuid.UUID, _: CurrentUser) -> Any: ...
@router.post("/{track_id}/optimize") @router.post("/{track_id}/optimize")
async def optimize_track(track_id: uuid.UUID) -> Any: ... async def optimize_track(track_id: uuid.UUID, _: CurrentUser) -> Any: ...
@router.get("/{track_id}/cover") @router.get("/{track_id}/cover")
async def get_track_cover(track_id: uuid.UUID) -> Any: ... async def get_track_cover(track_id: uuid.UUID, _: CurrentUser) -> Any: ...
@router.post("/{track_id}/metadata/enrich") @router.post("/{track_id}/metadata/enrich")
async def enrich_metadata(track_id: uuid.UUID) -> Any: ... async def enrich_metadata(track_id: uuid.UUID, _: CurrentUser) -> Any: ...
@router.get("/{track_id}/metadata/matches") @router.get("/{track_id}/metadata/matches")
async def get_metadata_matches(track_id: uuid.UUID) -> Any: ... async def get_metadata_matches(track_id: uuid.UUID, _: CurrentUser) -> Any: ...
@router.put("/{track_id}/metadata") @router.put("/{track_id}/metadata")
async def set_metadata(track_id: uuid.UUID) -> Any: ... async def set_metadata(track_id: uuid.UUID, _: CurrentUser) -> Any: ...
+17 -4
View File
@@ -1,11 +1,24 @@
"""Local file upload endpoint.""" """Local file upload endpoint."""
from typing import Any from typing import Annotated
from fastapi import APIRouter from fastapi import APIRouter, File, UploadFile
from app.api.deps import CurrentUser, UploadServiceDep
from app.api.schemas.upload import UploadResponse
router = APIRouter(prefix="/upload", tags=["upload"]) router = APIRouter(prefix="/upload", tags=["upload"])
@router.post("") @router.post("", response_model=UploadResponse)
async def upload_file() -> Any: ... async def upload_file(
file: Annotated[UploadFile, File()],
current_user: CurrentUser,
service: UploadServiceDep,
) -> UploadResponse:
result = await service.handle_upload(upload=file, user=current_user)
return UploadResponse(
track_id=result.track_id,
title=result.title,
already_exists=result.already_exists,
)
+97
View File
@@ -0,0 +1,97 @@
"""StreamingService — resolves a track and opens a byte-range stream."""
import re
import uuid
from collections.abc import AsyncIterator
from dataclasses import dataclass
from app.domain.errors import NotFoundError, RangeNotSatisfiableError
from app.domain.ports import FileStorage, TrackRepository
_FORMAT_CONTENT_TYPE: dict[str, str] = {
"mp3": "audio/mpeg",
"flac": "audio/flac",
"m4a": "audio/mp4",
"aac": "audio/aac",
"ogg": "audio/ogg",
"opus": "audio/ogg",
"wav": "audio/wav",
"wma": "audio/x-ms-wma",
"aiff": "audio/aiff",
"aif": "audio/aiff",
}
_RANGE_RE = re.compile(r"bytes=(\d+)-(\d*)")
@dataclass
class StreamResult:
stream: AsyncIterator[bytes]
total_size: int
content_length: int
content_type: str
start: int
end: int
is_partial: bool
def _parse_range(header: str | None, total_size: int) -> tuple[int, int | None, bool]:
"""Return (start, end, is_partial). Raises RangeNotSatisfiableError on invalid range."""
if header is None:
return 0, None, False
m = _RANGE_RE.fullmatch(header.strip())
if not m:
return 0, None, False # malformed → treat as absent per RFC 7233
start = int(m.group(1))
end: int | None = int(m.group(2)) if m.group(2) else None
if start >= total_size:
raise RangeNotSatisfiableError(total_size)
if end is not None:
if end >= total_size:
end = total_size - 1
if end < start:
raise RangeNotSatisfiableError(total_size)
return start, end, True
class StreamingService:
def __init__(self, tracks: TrackRepository, storage: FileStorage) -> None:
self._tracks = tracks
self._storage = storage
async def open_stream(
self,
track_id: uuid.UUID,
range_header: str | None,
) -> StreamResult:
track = await self._tracks.get_by_id(track_id)
if track is None:
raise NotFoundError("Track not found.")
stat = await self._storage.stat(track.storage_uri)
total_size = stat.size
content_type = stat.content_type or _FORMAT_CONTENT_TYPE.get(
track.file_format.lower(), "application/octet-stream"
)
start, end, is_partial = _parse_range(range_header, total_size)
stream, _ = await self._storage.open_range(track.storage_uri, start, end)
actual_end = end if end is not None else total_size - 1
content_length = actual_end - start + 1
return StreamResult(
stream=stream,
total_size=total_size,
content_length=content_length,
content_type=content_type,
start=start,
end=actual_end,
is_partial=is_partial,
)
+116
View File
@@ -0,0 +1,116 @@
"""UploadService — handles user file uploads."""
import contextlib
import hashlib
import os
import tempfile
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
import anyio
from app.domain.entities.user import User
from app.domain.ports import ArtistRepository, FileStorage, TrackRepository
class UploadFileProtocol(Protocol):
filename: str | None
async def read(self, size: int = -1) -> bytes: ...
@dataclass(frozen=True)
class UploadResult:
track_id: uuid.UUID
title: str
already_exists: bool
async def _stream_to_temp(upload: UploadFileProtocol, dest: Path) -> tuple[str, int]:
h = hashlib.sha256()
size = 0
async with await anyio.open_file(dest, "wb") as out:
while True:
chunk = await upload.read(65536)
if not chunk:
break
h.update(chunk)
await out.write(chunk)
size += len(chunk)
return h.hexdigest(), size
class UploadService:
def __init__(
self,
tracks: TrackRepository,
artists: ArtistRepository,
storage: FileStorage,
tmp_dir: Path | None = None,
) -> None:
self._tracks = tracks
self._artists = artists
self._storage = storage
self._tmp_dir = tmp_dir
async def handle_upload(
self,
*,
upload: UploadFileProtocol,
user: User,
) -> UploadResult:
filename = upload.filename or "unknown"
ext = Path(filename).suffix.lower().lstrip(".") or "bin"
title = Path(filename).stem or "Unknown"
fd, tmp_str = tempfile.mkstemp(
suffix=f".{ext}",
dir=str(self._tmp_dir) if self._tmp_dir else None,
)
tmp_path = Path(tmp_str)
try:
os.close(fd)
sha256_hex, file_size = await _stream_to_temp(upload, tmp_path)
existing = await self._tracks.get_by_source("upload", sha256_hex)
if existing is not None:
return UploadResult(
track_id=existing.id,
title=existing.title,
already_exists=True,
)
track_id = uuid.uuid4()
key = f"tracks/{str(track_id)[:2]}/{track_id}.{ext}"
await self._storage.save_file(key, tmp_path)
try:
artist = await self._artists.get_or_create("Unknown Artist")
track = await self._tracks.add(
id=track_id,
title=title,
artist_id=artist.id,
storage_uri=key,
file_format=ext,
file_size=file_size,
source="upload",
source_id=sha256_hex,
metadata_status="pending",
added_by=user.id,
)
except Exception:
with contextlib.suppress(Exception):
await self._storage.delete(key)
raise
# TODO(1D): enqueue metadata enrichment task
return UploadResult(
track_id=track.id,
title=track.title,
already_exists=False,
)
finally:
await anyio.Path(tmp_path).unlink(missing_ok=True)
+9
View File
@@ -49,6 +49,15 @@ class Settings(BaseSettings):
media_path: Path = Path("/data/media") media_path: Path = Path("/data/media")
transcode_cache_path: Path = Path("/data/transcode-cache") transcode_cache_path: Path = Path("/data/transcode-cache")
max_parallel_downloads: int = 2 max_parallel_downloads: int = 2
storage_backend: Literal["local", "s3"] = "local"
upload_tmp_dir: Path | None = None
# -- S3 storage (deferred; set storage_backend="s3" to use) ----------
s3_endpoint_url: str | None = None
s3_bucket: str | None = None
s3_region: str | None = None
s3_access_key: SecretStr | None = None
s3_secret_key: SecretStr | None = None
# -- external services (all optional; graceful degradation) ---------- # -- external services (all optional; graceful degradation) ----------
ml_service_url: str | None = None ml_service_url: str | None = None
+12
View File
@@ -0,0 +1,12 @@
"""File hashing utilities."""
import hashlib
from pathlib import Path
def sha256_of_file(path: Path) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
+17 -1
View File
@@ -1,5 +1,21 @@
"""Domain entities and value objects — pure, framework-free.""" """Domain entities and value objects — pure, framework-free."""
from app.domain.entities.album import Album
from app.domain.entities.history import PlayHistoryEntry
from app.domain.entities.like import Like
from app.domain.entities.playlist import Playlist
from app.domain.entities.storage import ObjectStat
from app.domain.entities.track import Artist, Track
from app.domain.entities.user import Credentials, User from app.domain.entities.user import Credentials, User
__all__ = ["Credentials", "User"] __all__ = [
"Album",
"Artist",
"Credentials",
"Like",
"ObjectStat",
"PlayHistoryEntry",
"Playlist",
"Track",
"User",
]
+17
View File
@@ -0,0 +1,17 @@
"""Album domain entity."""
import datetime as dt
import uuid
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class Album:
id: uuid.UUID
title: str
artist_id: uuid.UUID
year: int | None
cover_path: str | None
musicbrainz_id: str | None
created_at: dt.datetime
updated_at: dt.datetime
+15
View File
@@ -0,0 +1,15 @@
"""Play history domain entity — append-only scrobble log entry."""
import datetime as dt
import uuid
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class PlayHistoryEntry:
id: uuid.UUID
user_id: uuid.UUID
track_id: uuid.UUID
played_at: dt.datetime
play_duration_seconds: int | None
completed: bool
+14
View File
@@ -0,0 +1,14 @@
"""Like domain entity — append-only event log entry."""
import datetime as dt
import uuid
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class Like:
id: uuid.UUID
user_id: uuid.UUID
track_id: uuid.UUID
value: str # "like" | "dislike" | "neutral"
created_at: dt.datetime
+16
View File
@@ -0,0 +1,16 @@
"""Playlist domain entity."""
import datetime as dt
import uuid
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class Playlist:
id: uuid.UUID
name: str
description: str | None
owner_id: uuid.UUID
version: int
created_at: dt.datetime
updated_at: dt.datetime
+9
View File
@@ -0,0 +1,9 @@
"""Value objects for file storage."""
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class ObjectStat:
size: int
content_type: str | None
+32
View File
@@ -0,0 +1,32 @@
"""Track and Artist domain entities."""
import datetime as dt
import uuid
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class Artist:
id: uuid.UUID
name: str
created_at: dt.datetime
updated_at: dt.datetime
@dataclass(frozen=True, slots=True)
class Track:
id: uuid.UUID
title: str
artist_id: uuid.UUID
album_id: uuid.UUID | None
storage_uri: str
file_format: str
file_size: int
source: str
source_id: str
duration_seconds: int | None
genre: str | None
year: int | None
metadata_status: str
created_at: dt.datetime
updated_at: dt.datetime
+16
View File
@@ -61,3 +61,19 @@ class DependencyUnavailableError(DomainError):
""" """
code = "dependency_unavailable" code = "dependency_unavailable"
class StorageError(DomainError):
"""File storage operation failed."""
code = "storage_error"
class RangeNotSatisfiableError(DomainError):
"""Requested byte range cannot be satisfied."""
code = "range_not_satisfiable"
def __init__(self, total_size: int) -> None:
super().__init__("Requested range is not satisfiable.")
self.total_size = total_size
+142 -1
View File
@@ -7,9 +7,21 @@ are bound to these ports at the composition root (``app.api.deps``).
import datetime as dt import datetime as dt
import uuid import uuid
from collections.abc import AsyncIterator
from contextlib import AbstractAsyncContextManager
from pathlib import Path
from typing import Protocol from typing import Protocol
from app.domain.entities import Credentials, User from app.domain.entities import (
Album,
Credentials,
Like,
ObjectStat,
PlayHistoryEntry,
Playlist,
User,
)
from app.domain.entities.track import Artist, Track
from app.domain.tokens import IssuedToken, TokenClaims, TokenType from app.domain.tokens import IssuedToken, TokenClaims, TokenType
@@ -56,3 +68,132 @@ class TokenService(Protocol):
"""Verify signature + expiry and return claims. Raises """Verify signature + expiry and return claims. Raises
:class:`~app.domain.errors.AuthenticationError` on any failure.""" :class:`~app.domain.errors.AuthenticationError` on any failure."""
... ...
class FileStorage(Protocol):
async def save_file(self, key: str, src_path: Path) -> int: ...
async def open_range(
self, key: str, start: int, end: int | None
) -> tuple[AsyncIterator[bytes], int]: ...
async def stat(self, key: str) -> ObjectStat: ...
async def exists(self, key: str) -> bool: ...
async def delete(self, key: str) -> None: ...
def as_local_path(self, key: str) -> AbstractAsyncContextManager[Path]: ...
class ArtistRepository(Protocol):
async def get_or_create(self, name: str) -> Artist: ...
async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None: ...
async def get_many(self, ids: list[uuid.UUID]) -> list[Artist]: ...
async def list(self, *, q: str | None, limit: int, offset: int) -> list[Artist]: ...
async def count(self, *, q: str | None) -> int: ...
async def album_count(self, artist_id: uuid.UUID) -> int: ...
async def track_count(self, artist_id: uuid.UUID) -> int: ...
class TrackRepository(Protocol):
async def get_by_id(self, track_id: uuid.UUID) -> Track | None: ...
async def get_by_source(self, source: str, source_id: str) -> Track | None: ...
async def add(
self,
*,
id: uuid.UUID,
title: str,
artist_id: uuid.UUID,
storage_uri: str,
file_format: str,
file_size: int,
source: str,
source_id: str,
metadata_status: str,
added_by: uuid.UUID | None,
) -> Track: ...
async def delete(self, track_id: uuid.UUID) -> None: ...
async def list(
self,
*,
artist_id: uuid.UUID | None,
album_id: uuid.UUID | None,
q: str | None,
sort_by: str,
order: str,
limit: int,
offset: int,
) -> list[Track]: ...
async def count(
self,
*,
artist_id: uuid.UUID | None,
album_id: uuid.UUID | None,
q: str | None,
) -> int: ...
async def update(
self,
track_id: uuid.UUID,
*,
title: str | None,
genre: str | None,
year: int | None,
) -> Track: ...
class AlbumRepository(Protocol):
async def get_by_id(self, album_id: uuid.UUID) -> Album | None: ...
async def get_many(self, ids: list[uuid.UUID]) -> list[Album]: ...
async def count(self, *, artist_id: uuid.UUID | None, q: str | None) -> int: ...
async def track_count(self, album_id: uuid.UUID) -> int: ...
async def track_count_many(self, album_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]: ...
# list must come after any method using list[...] in its signature (name shadowing)
async def list(
self, *, artist_id: uuid.UUID | None, q: str | None, limit: int, offset: int
) -> list[Album]: ...
class PlaylistRepository(Protocol):
async def get_by_id(self, playlist_id: uuid.UUID) -> Playlist | None: ...
async def count(self, *, owner_id: uuid.UUID) -> int: ...
async def add(self, *, name: str, description: str | None, owner_id: uuid.UUID) -> Playlist: ...
async def update(
self, playlist_id: uuid.UUID, *, name: str | None, description: str | None
) -> Playlist: ...
async def delete(self, playlist_id: uuid.UUID) -> None: ...
async def track_count(self, playlist_id: uuid.UUID) -> int: ...
async def track_count_many(self, playlist_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]: ...
async def get_tracks(
self, playlist_id: uuid.UUID, *, limit: int, offset: int
) -> list[Track]: ...
async def get_track_total(self, playlist_id: uuid.UUID) -> int: ...
async def add_track(
self, playlist_id: uuid.UUID, track_id: uuid.UUID, *, position: float
) -> None: ...
async def remove_track(self, playlist_id: uuid.UUID, track_id: uuid.UUID) -> None: ...
async def max_position(self, playlist_id: uuid.UUID) -> float: ...
# list must come after any method using list[...] in its signature (name shadowing)
async def list(self, *, owner_id: uuid.UUID, limit: int, offset: int) -> list[Playlist]: ...
class LikeRepository(Protocol):
async def add(self, *, user_id: uuid.UUID, track_id: uuid.UUID, value: str) -> Like: ...
async def get_latest_state(
self, *, user_id: uuid.UUID, track_ids: list[uuid.UUID]
) -> list[Like]: ...
async def list_liked_tracks(
self, *, user_id: uuid.UUID, limit: int, offset: int
) -> list[Track]: ...
async def count_liked_tracks(self, *, user_id: uuid.UUID) -> int: ...
class HistoryRepository(Protocol):
async def add(
self,
*,
user_id: uuid.UUID,
track_id: uuid.UUID,
played_at: dt.datetime,
play_duration_seconds: int | None,
completed: bool,
) -> PlayHistoryEntry: ...
async def list(
self, *, user_id: uuid.UUID, limit: int, offset: int
) -> list[PlayHistoryEntry]: ...
async def count(self, *, user_id: uuid.UUID) -> int: ...
+21 -1
View File
@@ -5,6 +5,26 @@ autogenerate and ``create_all`` (tests) see the full schema. ``alembic/env.py``
imports it for exactly this side effect. imports it for exactly this side effect.
""" """
from app.infrastructure.db.models.album import AlbumModel
from app.infrastructure.db.models.artist import ArtistModel
from app.infrastructure.db.models.download_job import DownloadJobModel
from app.infrastructure.db.models.like import LikeModel
from app.infrastructure.db.models.lyrics import LyricsModel
from app.infrastructure.db.models.play_history import PlayHistoryModel
from app.infrastructure.db.models.playlist import PlaylistModel, PlaylistTrackModel
from app.infrastructure.db.models.track import TrackModel
from app.infrastructure.db.models.user import RefreshTokenModel, UserModel from app.infrastructure.db.models.user import RefreshTokenModel, UserModel
__all__ = ["RefreshTokenModel", "UserModel"] __all__ = [
"AlbumModel",
"ArtistModel",
"DownloadJobModel",
"LikeModel",
"LyricsModel",
"PlayHistoryModel",
"PlaylistModel",
"PlaylistTrackModel",
"RefreshTokenModel",
"TrackModel",
"UserModel",
]
+23
View File
@@ -0,0 +1,23 @@
"""ORM model for albums."""
import uuid
from sqlalchemy import ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMixin
class AlbumModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "albums"
title: Mapped[str] = mapped_column(String(1024), index=True, nullable=False)
artist_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("artists.id", ondelete="CASCADE"),
index=True,
nullable=False,
)
year: Mapped[int | None] = mapped_column(Integer, nullable=True)
cover_path: Mapped[str | None] = mapped_column(String(1024), nullable=True)
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
+14
View File
@@ -0,0 +1,14 @@
"""ORM model for artists."""
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMixin
class ArtistModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "artists"
name: Mapped[str] = mapped_column(String(512), index=True, nullable=False)
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
@@ -0,0 +1,37 @@
"""ORM model for download jobs (plan §6.1).
Tracks a queued download through its lifecycle. ``retry_count`` supports the
exponential-backoff retries that yt-dlp needs; ``progress`` drives the UI
download manager.
"""
import uuid
from sqlalchemy import Float, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
from app.infrastructure.db.models.enums import DownloadStatus
from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMixin
class DownloadJobModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "download_jobs"
source: Mapped[str] = mapped_column(String(32), nullable=False)
source_id: Mapped[str | None] = mapped_column(String(512), nullable=True)
query: Mapped[str | None] = mapped_column(String(1024), nullable=True)
requested_by: Mapped[uuid.UUID | None] = mapped_column(
ForeignKey("users.id", ondelete="SET NULL"),
index=True,
nullable=True,
)
status: Mapped[str] = mapped_column(
String(16),
index=True,
nullable=False,
default=DownloadStatus.QUEUED.value,
)
progress: Mapped[float] = mapped_column(Float, nullable=False, default=0.0)
error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
retry_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+66
View File
@@ -0,0 +1,66 @@
"""Domain enums used by ORM columns.
Plain ``str``-valued enums, stored as strings (not native PG enums) — adding a
variant is a code change, never a migration (plan §4). Columns map these via
``mapped_column(String(...))`` and persist ``Enum.value``.
"""
import enum
class TrackSource(enum.StrEnum):
"""Which backend imported a track. Drives ``is_replaceable`` (plan §6.6)."""
YOUTUBE = "youtube"
LOCAL = "local"
UPLOAD = "upload"
SOUNDCLOUD = "soundcloud"
BANDCAMP = "bandcamp"
class StoragePolicy(enum.StrEnum):
"""What the system did / must do with the stored format (plan §6.6).
``master_keep`` is inviolable — never auto-optimized.
"""
AS_IS = "as_is"
OPTIMIZED = "optimized"
MASTER_KEEP = "master_keep"
class MetadataStatus(enum.StrEnum):
"""Enrichment state. ``manual`` is never overwritten by auto-enrichment."""
PENDING = "pending"
ENRICHED = "enriched"
FAILED = "failed"
MANUAL = "manual"
class LikeValue(enum.StrEnum):
"""A like event's value. Likes are an append-only log, not a boolean —
current state is the latest event per ``(user, track)`` (plan §4.1)."""
LIKE = "like"
DISLIKE = "dislike"
NEUTRAL = "neutral"
class DownloadStatus(enum.StrEnum):
"""Lifecycle of a download job (plan §6.1)."""
QUEUED = "queued"
DOWNLOADING = "downloading"
ENRICHING = "enriching"
DONE = "done"
FAILED = "failed"
class LyricsStatus(enum.StrEnum):
"""Lyrics fetch outcome. ``not_found`` is cached too (with TTL) so we don't
hammer the provider for tracks that have no lyrics (plan §6.7)."""
FOUND = "found"
NOT_FOUND = "not_found"
PENDING = "pending"
+39
View File
@@ -0,0 +1,39 @@
"""ORM model for likes — an append-only event log.
A like/dislike is **never** updated in place. Current state for a ``(user,
track)`` pair is the latest event by ``created_at``. This shape is required for
future sync and as a clean ML signal (plan §4.1). Hence: no ``updated_at``,
no unique constraint on ``(user, track)``.
"""
import datetime as dt
import uuid
from sqlalchemy import DateTime, ForeignKey, Index, String, func
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
from app.infrastructure.db.models.mixins import UUIDPrimaryKeyMixin
class LikeModel(UUIDPrimaryKeyMixin, Base):
__tablename__ = "likes"
__table_args__ = (
# Latest-event lookups query by (user, track) ordered by time.
Index("ix_likes_user_id_track_id", "user_id", "track_id"),
)
user_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
)
track_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("tracks.id", ondelete="CASCADE"),
nullable=False,
)
value: Mapped[str] = mapped_column(String(16), nullable=False)
created_at: Mapped[dt.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
+40
View File
@@ -0,0 +1,40 @@
"""ORM model for cached lyrics (plan §6.7).
Cached per track (one row, ``track_id`` unique) so the external provider
(LRCLIB) isn't hit on every play. ``not_found`` is cached too — re-fetch policy
(TTL) lives in the service. ``synced`` holds timestamped LRC; ``plain`` the
fallback text.
"""
import datetime as dt
import uuid
from sqlalchemy import DateTime, ForeignKey, String, Text, func
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
from app.infrastructure.db.models.enums import LyricsStatus
from app.infrastructure.db.models.mixins import UUIDPrimaryKeyMixin
class LyricsModel(UUIDPrimaryKeyMixin, Base):
__tablename__ = "lyrics"
track_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("tracks.id", ondelete="CASCADE"),
unique=True,
nullable=False,
)
synced: Mapped[str | None] = mapped_column(Text, nullable=True)
plain: Mapped[str | None] = mapped_column(Text, nullable=True)
source: Mapped[str | None] = mapped_column(String(64), nullable=True)
status: Mapped[str] = mapped_column(
String(16),
nullable=False,
default=LyricsStatus.PENDING.value,
)
fetched_at: Mapped[dt.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
@@ -0,0 +1,36 @@
"""ORM model for play history — an append-only event log (scrobbles).
``play_duration_seconds`` (how long was actually listened) feeds skip-rate for
future ML; ``completed`` marks a full play (plan §4).
"""
import datetime as dt
import uuid
from sqlalchemy import Boolean, DateTime, ForeignKey, Index, Integer, func
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
from app.infrastructure.db.models.mixins import UUIDPrimaryKeyMixin
class PlayHistoryModel(UUIDPrimaryKeyMixin, Base):
__tablename__ = "play_history"
__table_args__ = (Index("ix_play_history_user_id_played_at", "user_id", "played_at"),)
user_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
)
track_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("tracks.id", ondelete="CASCADE"),
index=True,
nullable=False,
)
played_at: Mapped[dt.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
play_duration_seconds: Mapped[int | None] = mapped_column(Integer, nullable=True)
completed: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
+55
View File
@@ -0,0 +1,55 @@
"""ORM models for playlists and their ordered tracks."""
import datetime as dt
import uuid
from sqlalchemy import DateTime, Float, ForeignKey, Integer, String, UniqueConstraint, func
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMixin
class PlaylistModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "playlists"
name: Mapped[str] = mapped_column(String(512), nullable=False)
description: Mapped[str | None] = mapped_column(String(2048), nullable=True)
owner_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"),
index=True,
nullable=False,
)
cover_path: Mapped[str | None] = mapped_column(String(1024), nullable=True)
# Optimistic-locking / future sync counter — bumped on every mutation.
version: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
class PlaylistTrackModel(UUIDPrimaryKeyMixin, Base):
"""A track's membership in a playlist.
``position`` is a float so a track can be inserted between two others
without reindexing the whole list (plan §4).
"""
__tablename__ = "playlist_tracks"
__table_args__ = (
UniqueConstraint("playlist_id", "track_id", name="uq_playlist_tracks_playlist_id_track_id"),
)
playlist_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("playlists.id", ondelete="CASCADE"),
index=True,
nullable=False,
)
track_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("tracks.id", ondelete="CASCADE"),
index=True,
nullable=False,
)
position: Mapped[float] = mapped_column(Float, nullable=False)
added_at: Mapped[dt.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
+71
View File
@@ -0,0 +1,71 @@
"""ORM model for tracks — the central entity.
``id`` is the stable, client-facing ``content_id`` (generated app-side by
``UUIDPrimaryKeyMixin``) — never regenerate it. Dedup is enforced on both
``(source, source_id)`` (unique) and ``acoustid_fingerprint`` (indexed) so
imports/downloads stay idempotent (plan §4, §6.1).
"""
import uuid
from sqlalchemy import ForeignKey, Integer, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from app.infrastructure.db.base import Base
from app.infrastructure.db.models.enums import MetadataStatus, StoragePolicy
from app.infrastructure.db.models.mixins import TimestampMixin, UUIDPrimaryKeyMixin
class TrackModel(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "tracks"
__table_args__ = (
# Dedup by source identity — a source never yields the same id twice.
UniqueConstraint("source", "source_id", name="uq_tracks_source_source_id"),
)
title: Mapped[str] = mapped_column(String(1024), index=True, nullable=False)
artist_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("artists.id", ondelete="CASCADE"),
index=True,
nullable=False,
)
album_id: Mapped[uuid.UUID | None] = mapped_column(
ForeignKey("albums.id", ondelete="SET NULL"),
index=True,
nullable=True,
)
track_number: Mapped[int | None] = mapped_column(Integer, nullable=True)
duration_seconds: Mapped[int | None] = mapped_column(Integer, nullable=True)
genre: Mapped[str | None] = mapped_column(String(255), index=True, nullable=True)
year: Mapped[int | None] = mapped_column(Integer, nullable=True)
# -- file (original, stored as-is) -----------------------------------
storage_uri: Mapped[str] = mapped_column(String(2048), nullable=False)
file_format: Mapped[str] = mapped_column(String(32), nullable=False)
file_size: Mapped[int] = mapped_column(Integer, nullable=False)
bitrate: Mapped[int | None] = mapped_column(Integer, nullable=True)
# -- dedup / external ids --------------------------------------------
acoustid_fingerprint: Mapped[str | None] = mapped_column(String(64), index=True, nullable=True)
musicbrainz_id: Mapped[str | None] = mapped_column(String(36), index=True, nullable=True)
# -- provenance / policy ---------------------------------------------
source: Mapped[str] = mapped_column(String(32), nullable=False)
source_id: Mapped[str] = mapped_column(String(512), nullable=False)
is_replaceable: Mapped[bool] = mapped_column(nullable=False, default=False)
storage_policy: Mapped[str] = mapped_column(
String(16),
nullable=False,
default=StoragePolicy.AS_IS.value,
)
metadata_status: Mapped[str] = mapped_column(
String(16),
nullable=False,
default=MetadataStatus.PENDING.value,
)
added_by: Mapped[uuid.UUID | None] = mapped_column(
ForeignKey("users.id", ondelete="SET NULL"),
index=True,
nullable=True,
)
+16 -1
View File
@@ -1,8 +1,23 @@
"""SQLAlchemy repository adapters implementing the domain ports.""" """SQLAlchemy repository adapters implementing the domain ports."""
from app.infrastructure.db.repositories.album_repository import SqlAlchemyAlbumRepository
from app.infrastructure.db.repositories.artist_repository import SqlAlchemyArtistRepository
from app.infrastructure.db.repositories.history_repository import SqlAlchemyHistoryRepository
from app.infrastructure.db.repositories.like_repository import SqlAlchemyLikeRepository
from app.infrastructure.db.repositories.playlist_repository import SqlAlchemyPlaylistRepository
from app.infrastructure.db.repositories.refresh_token_repository import ( from app.infrastructure.db.repositories.refresh_token_repository import (
SqlAlchemyRefreshTokenRepository, SqlAlchemyRefreshTokenRepository,
) )
from app.infrastructure.db.repositories.track_repository import SqlAlchemyTrackRepository
from app.infrastructure.db.repositories.user_repository import SqlAlchemyUserRepository from app.infrastructure.db.repositories.user_repository import SqlAlchemyUserRepository
__all__ = ["SqlAlchemyRefreshTokenRepository", "SqlAlchemyUserRepository"] __all__ = [
"SqlAlchemyAlbumRepository",
"SqlAlchemyArtistRepository",
"SqlAlchemyHistoryRepository",
"SqlAlchemyLikeRepository",
"SqlAlchemyPlaylistRepository",
"SqlAlchemyRefreshTokenRepository",
"SqlAlchemyTrackRepository",
"SqlAlchemyUserRepository",
]
@@ -0,0 +1,87 @@
"""Album repository — adapter over ``AsyncSession``."""
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities.album import Album
from app.infrastructure.db.models.album import AlbumModel
from app.infrastructure.db.models.track import TrackModel
def _to_entity(row: AlbumModel) -> Album:
return Album(
id=row.id,
title=row.title,
artist_id=row.artist_id,
year=row.year,
cover_path=row.cover_path,
musicbrainz_id=row.musicbrainz_id,
created_at=row.created_at,
updated_at=row.updated_at,
)
class SqlAlchemyAlbumRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, album_id: uuid.UUID) -> Album | None:
row = await self._session.get(AlbumModel, album_id)
return _to_entity(row) if row is not None else None
async def get_many(self, ids: list[uuid.UUID]) -> list[Album]:
if not ids:
return []
rows = (
(await self._session.execute(select(AlbumModel).where(AlbumModel.id.in_(ids))))
.scalars()
.all()
)
return [_to_entity(r) for r in rows]
async def count(self, *, artist_id: uuid.UUID | None, q: str | None) -> int:
stmt = select(func.count()).select_from(AlbumModel)
if artist_id is not None:
stmt = stmt.where(AlbumModel.artist_id == artist_id)
if q:
stmt = stmt.where(AlbumModel.title.ilike(f"%{q}%"))
return (await self._session.execute(stmt)).scalar_one()
async def track_count(self, album_id: uuid.UUID) -> int:
return (
await self._session.execute(
select(func.count()).select_from(TrackModel).where(TrackModel.album_id == album_id)
)
).scalar_one()
async def track_count_many(self, album_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]:
if not album_ids:
return {}
rows = (
await self._session.execute(
select(TrackModel.album_id, func.count(TrackModel.id).label("cnt"))
.where(TrackModel.album_id.in_(album_ids))
.group_by(TrackModel.album_id)
)
).all()
return {row.album_id: row.cnt for row in rows}
# list must come after methods using list[...] in signatures (builtin name shadowing)
async def list(
self,
*,
artist_id: uuid.UUID | None,
q: str | None,
limit: int,
offset: int,
) -> list[Album]:
stmt = select(AlbumModel)
if artist_id is not None:
stmt = stmt.where(AlbumModel.artist_id == artist_id)
if q:
stmt = stmt.where(AlbumModel.title.ilike(f"%{q}%"))
stmt = stmt.order_by(AlbumModel.title).limit(limit).offset(offset)
rows = (await self._session.execute(stmt)).scalars().all()
return [_to_entity(r) for r in rows]
@@ -0,0 +1,82 @@
"""Artist repository — adapter over ``AsyncSession``."""
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities.track import Artist
from app.infrastructure.db.models.album import AlbumModel
from app.infrastructure.db.models.artist import ArtistModel
from app.infrastructure.db.models.track import TrackModel
def _to_entity(row: ArtistModel) -> Artist:
return Artist(
id=row.id,
name=row.name,
created_at=row.created_at,
updated_at=row.updated_at,
)
class SqlAlchemyArtistRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_or_create(self, name: str) -> Artist:
row = (
await self._session.execute(select(ArtistModel).where(ArtistModel.name == name))
).scalar_one_or_none()
if row is None:
row = ArtistModel(name=name)
self._session.add(row)
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def get_by_id(self, artist_id: uuid.UUID) -> Artist | None:
row = await self._session.get(ArtistModel, artist_id)
return _to_entity(row) if row is not None else None
async def get_many(self, ids: list[uuid.UUID]) -> list[Artist]:
if not ids:
return []
rows = (
(await self._session.execute(select(ArtistModel).where(ArtistModel.id.in_(ids))))
.scalars()
.all()
)
return [_to_entity(r) for r in rows]
async def list(self, *, q: str | None, limit: int, offset: int) -> list[Artist]:
stmt = select(ArtistModel)
if q:
stmt = stmt.where(ArtistModel.name.ilike(f"%{q}%"))
stmt = stmt.order_by(ArtistModel.name).limit(limit).offset(offset)
rows = (await self._session.execute(stmt)).scalars().all()
return [_to_entity(r) for r in rows]
async def count(self, *, q: str | None) -> int:
stmt = select(func.count()).select_from(ArtistModel)
if q:
stmt = stmt.where(ArtistModel.name.ilike(f"%{q}%"))
return (await self._session.execute(stmt)).scalar_one()
async def album_count(self, artist_id: uuid.UUID) -> int:
return (
await self._session.execute(
select(func.count())
.select_from(AlbumModel)
.where(AlbumModel.artist_id == artist_id)
)
).scalar_one()
async def track_count(self, artist_id: uuid.UUID) -> int:
return (
await self._session.execute(
select(func.count())
.select_from(TrackModel)
.where(TrackModel.artist_id == artist_id)
)
).scalar_one()
@@ -0,0 +1,72 @@
"""Play history repository — adapter over ``AsyncSession``."""
import datetime as dt
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities.history import PlayHistoryEntry
from app.infrastructure.db.models.play_history import PlayHistoryModel
def _to_entity(row: PlayHistoryModel) -> PlayHistoryEntry:
return PlayHistoryEntry(
id=row.id,
user_id=row.user_id,
track_id=row.track_id,
played_at=row.played_at,
play_duration_seconds=row.play_duration_seconds,
completed=row.completed,
)
class SqlAlchemyHistoryRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def add(
self,
*,
user_id: uuid.UUID,
track_id: uuid.UUID,
played_at: dt.datetime,
play_duration_seconds: int | None,
completed: bool,
) -> PlayHistoryEntry:
row = PlayHistoryModel(
user_id=user_id,
track_id=track_id,
played_at=played_at,
play_duration_seconds=play_duration_seconds,
completed=completed,
)
self._session.add(row)
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def list(self, *, user_id: uuid.UUID, limit: int, offset: int) -> list[PlayHistoryEntry]:
rows = (
(
await self._session.execute(
select(PlayHistoryModel)
.where(PlayHistoryModel.user_id == user_id)
.order_by(PlayHistoryModel.played_at.desc())
.limit(limit)
.offset(offset)
)
)
.scalars()
.all()
)
return [_to_entity(r) for r in rows]
async def count(self, *, user_id: uuid.UUID) -> int:
return (
await self._session.execute(
select(func.count())
.select_from(PlayHistoryModel)
.where(PlayHistoryModel.user_id == user_id)
)
).scalar_one()
@@ -0,0 +1,150 @@
"""Like repository — adapter over ``AsyncSession``.
Likes are an append-only event log. Current state = latest event per (user, track).
"""
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities.like import Like
from app.domain.entities.track import Track
from app.infrastructure.db.models.like import LikeModel
from app.infrastructure.db.models.track import TrackModel
def _to_entity(row: LikeModel) -> Like:
return Like(
id=row.id,
user_id=row.user_id,
track_id=row.track_id,
value=row.value,
created_at=row.created_at,
)
def _track_to_entity(row: TrackModel) -> Track:
return Track(
id=row.id,
title=row.title,
artist_id=row.artist_id,
album_id=row.album_id,
storage_uri=row.storage_uri,
file_format=row.file_format,
file_size=row.file_size,
source=row.source,
source_id=row.source_id,
duration_seconds=row.duration_seconds,
genre=row.genre,
year=row.year,
metadata_status=row.metadata_status,
created_at=row.created_at,
updated_at=row.updated_at,
)
class SqlAlchemyLikeRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def add(self, *, user_id: uuid.UUID, track_id: uuid.UUID, value: str) -> Like:
row = LikeModel(user_id=user_id, track_id=track_id, value=value)
self._session.add(row)
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def get_latest_state(
self, *, user_id: uuid.UUID, track_ids: list[uuid.UUID]
) -> list[Like]:
if not track_ids:
return []
# Subquery: max(created_at) per track for this user
max_sq = (
select(
LikeModel.track_id,
func.max(LikeModel.created_at).label("latest"),
)
.where(LikeModel.user_id == user_id, LikeModel.track_id.in_(track_ids))
.group_by(LikeModel.track_id)
.subquery()
)
rows = (
(
await self._session.execute(
select(LikeModel)
.join(
max_sq,
(LikeModel.track_id == max_sq.c.track_id)
& (LikeModel.created_at == max_sq.c.latest),
)
.where(LikeModel.user_id == user_id)
)
)
.scalars()
.all()
)
return [_to_entity(r) for r in rows]
async def list_liked_tracks(
self, *, user_id: uuid.UUID, limit: int, offset: int
) -> list[Track]:
# Tracks where the latest like event has value='like', ordered by like time desc
max_sq = (
select(
LikeModel.track_id,
func.max(LikeModel.created_at).label("latest"),
)
.where(LikeModel.user_id == user_id)
.group_by(LikeModel.track_id)
.subquery()
)
liked_sq = (
select(LikeModel.track_id, LikeModel.created_at)
.join(
max_sq,
(LikeModel.track_id == max_sq.c.track_id)
& (LikeModel.created_at == max_sq.c.latest),
)
.where(LikeModel.user_id == user_id, LikeModel.value == "like")
.subquery()
)
rows = (
(
await self._session.execute(
select(TrackModel)
.join(liked_sq, TrackModel.id == liked_sq.c.track_id)
.order_by(liked_sq.c.created_at.desc())
.limit(limit)
.offset(offset)
)
)
.scalars()
.all()
)
return [_track_to_entity(r) for r in rows]
async def count_liked_tracks(self, *, user_id: uuid.UUID) -> int:
max_sq = (
select(
LikeModel.track_id,
func.max(LikeModel.created_at).label("latest"),
)
.where(LikeModel.user_id == user_id)
.group_by(LikeModel.track_id)
.subquery()
)
liked_sq = (
select(LikeModel.track_id)
.join(
max_sq,
(LikeModel.track_id == max_sq.c.track_id)
& (LikeModel.created_at == max_sq.c.latest),
)
.where(LikeModel.user_id == user_id, LikeModel.value == "like")
.subquery()
)
return (
await self._session.execute(select(func.count()).select_from(liked_sq))
).scalar_one()
@@ -0,0 +1,188 @@
"""Playlist repository — adapter over ``AsyncSession``."""
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities.playlist import Playlist
from app.domain.entities.track import Track
from app.infrastructure.db.models.playlist import PlaylistModel, PlaylistTrackModel
from app.infrastructure.db.models.track import TrackModel
def _to_entity(row: PlaylistModel) -> Playlist:
return Playlist(
id=row.id,
name=row.name,
description=row.description,
owner_id=row.owner_id,
version=row.version,
created_at=row.created_at,
updated_at=row.updated_at,
)
def _track_to_entity(row: TrackModel) -> Track:
return Track(
id=row.id,
title=row.title,
artist_id=row.artist_id,
album_id=row.album_id,
storage_uri=row.storage_uri,
file_format=row.file_format,
file_size=row.file_size,
source=row.source,
source_id=row.source_id,
duration_seconds=row.duration_seconds,
genre=row.genre,
year=row.year,
metadata_status=row.metadata_status,
created_at=row.created_at,
updated_at=row.updated_at,
)
class SqlAlchemyPlaylistRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, playlist_id: uuid.UUID) -> Playlist | None:
row = await self._session.get(PlaylistModel, playlist_id)
return _to_entity(row) if row is not None else None
async def count(self, *, owner_id: uuid.UUID) -> int:
return (
await self._session.execute(
select(func.count())
.select_from(PlaylistModel)
.where(PlaylistModel.owner_id == owner_id)
)
).scalar_one()
async def add(self, *, name: str, description: str | None, owner_id: uuid.UUID) -> Playlist:
row = PlaylistModel(name=name, description=description, owner_id=owner_id)
self._session.add(row)
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def update(
self, playlist_id: uuid.UUID, *, name: str | None, description: str | None
) -> Playlist:
row = await self._session.get(PlaylistModel, playlist_id)
if row is None:
from app.domain.errors import NotFoundError
raise NotFoundError(f"Playlist {playlist_id} not found.")
if name is not None:
row.name = name
if description is not None:
row.description = description
row.version = row.version + 1
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def delete(self, playlist_id: uuid.UUID) -> None:
row = await self._session.get(PlaylistModel, playlist_id)
if row is not None:
await self._session.delete(row)
await self._session.flush()
async def track_count(self, playlist_id: uuid.UUID) -> int:
return (
await self._session.execute(
select(func.count())
.select_from(PlaylistTrackModel)
.where(PlaylistTrackModel.playlist_id == playlist_id)
)
).scalar_one()
async def track_count_many(self, playlist_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]:
if not playlist_ids:
return {}
rows = (
await self._session.execute(
select(
PlaylistTrackModel.playlist_id,
func.count(PlaylistTrackModel.id).label("cnt"),
)
.where(PlaylistTrackModel.playlist_id.in_(playlist_ids))
.group_by(PlaylistTrackModel.playlist_id)
)
).all()
return {row.playlist_id: row.cnt for row in rows}
async def get_tracks(self, playlist_id: uuid.UUID, *, limit: int, offset: int) -> list[Track]:
rows = (
(
await self._session.execute(
select(TrackModel)
.join(PlaylistTrackModel, TrackModel.id == PlaylistTrackModel.track_id)
.where(PlaylistTrackModel.playlist_id == playlist_id)
.order_by(PlaylistTrackModel.position)
.limit(limit)
.offset(offset)
)
)
.scalars()
.all()
)
return [_track_to_entity(r) for r in rows]
async def get_track_total(self, playlist_id: uuid.UUID) -> int:
return await self.track_count(playlist_id)
async def add_track(
self, playlist_id: uuid.UUID, track_id: uuid.UUID, *, position: float
) -> None:
row = PlaylistTrackModel(playlist_id=playlist_id, track_id=track_id, position=position)
self._session.add(row)
playlist = await self._session.get(PlaylistModel, playlist_id)
if playlist is not None:
playlist.version = playlist.version + 1
await self._session.flush()
async def remove_track(self, playlist_id: uuid.UUID, track_id: uuid.UUID) -> None:
row = (
await self._session.execute(
select(PlaylistTrackModel).where(
PlaylistTrackModel.playlist_id == playlist_id,
PlaylistTrackModel.track_id == track_id,
)
)
).scalar_one_or_none()
if row is not None:
await self._session.delete(row)
playlist = await self._session.get(PlaylistModel, playlist_id)
if playlist is not None:
playlist.version = playlist.version + 1
await self._session.flush()
async def max_position(self, playlist_id: uuid.UUID) -> float:
result = (
await self._session.execute(
select(func.max(PlaylistTrackModel.position)).where(
PlaylistTrackModel.playlist_id == playlist_id
)
)
).scalar_one_or_none()
return float(result) if result is not None else 0.0
# list must come after methods using list[...] in signatures (builtin name shadowing)
async def list(self, *, owner_id: uuid.UUID, limit: int, offset: int) -> list[Playlist]:
rows = (
(
await self._session.execute(
select(PlaylistModel)
.where(PlaylistModel.owner_id == owner_id)
.order_by(PlaylistModel.updated_at.desc())
.limit(limit)
.offset(offset)
)
)
.scalars()
.all()
)
return [_to_entity(r) for r in rows]
@@ -0,0 +1,160 @@
"""Track repository — adapter over ``AsyncSession``."""
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities.track import Track
from app.domain.errors import NotFoundError
from app.infrastructure.db.models.artist import ArtistModel
from app.infrastructure.db.models.track import TrackModel
def _to_entity(row: TrackModel) -> Track:
return Track(
id=row.id,
title=row.title,
artist_id=row.artist_id,
album_id=row.album_id,
storage_uri=row.storage_uri,
file_format=row.file_format,
file_size=row.file_size,
source=row.source,
source_id=row.source_id,
duration_seconds=row.duration_seconds,
genre=row.genre,
year=row.year,
metadata_status=row.metadata_status,
created_at=row.created_at,
updated_at=row.updated_at,
)
class SqlAlchemyTrackRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, track_id: uuid.UUID) -> Track | None:
row = await self._session.get(TrackModel, track_id)
return _to_entity(row) if row is not None else None
async def get_by_source(self, source: str, source_id: str) -> Track | None:
row = (
await self._session.execute(
select(TrackModel).where(
TrackModel.source == source,
TrackModel.source_id == source_id,
)
)
).scalar_one_or_none()
return _to_entity(row) if row is not None else None
async def add(
self,
*,
id: uuid.UUID,
title: str,
artist_id: uuid.UUID,
storage_uri: str,
file_format: str,
file_size: int,
source: str,
source_id: str,
metadata_status: str,
added_by: uuid.UUID | None,
) -> Track:
row = TrackModel(
id=id,
title=title,
artist_id=artist_id,
storage_uri=storage_uri,
file_format=file_format,
file_size=file_size,
source=source,
source_id=source_id,
metadata_status=metadata_status,
added_by=added_by,
)
self._session.add(row)
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def delete(self, track_id: uuid.UUID) -> None:
row = await self._session.get(TrackModel, track_id)
if row is not None:
await self._session.delete(row)
await self._session.flush()
async def list(
self,
*,
artist_id: uuid.UUID | None,
album_id: uuid.UUID | None,
q: str | None,
sort_by: str = "created_at",
order: str = "desc",
limit: int = 50,
offset: int = 0,
) -> list[Track]:
stmt = select(TrackModel)
if artist_id is not None:
stmt = stmt.where(TrackModel.artist_id == artist_id)
if album_id is not None:
stmt = stmt.where(TrackModel.album_id == album_id)
if q:
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
if sort_by == "artist":
stmt = stmt.join(ArtistModel, TrackModel.artist_id == ArtistModel.id)
col_artist = ArtistModel.name
stmt = stmt.order_by(col_artist.asc() if order == "asc" else col_artist.desc())
elif sort_by == "title":
col_title = TrackModel.title
stmt = stmt.order_by(col_title.asc() if order == "asc" else col_title.desc())
else:
stmt = stmt.order_by(
TrackModel.created_at.asc() if order == "asc" else TrackModel.created_at.desc()
)
stmt = stmt.limit(limit).offset(offset)
rows = (await self._session.execute(stmt)).scalars().all()
return [_to_entity(r) for r in rows]
async def count(
self,
*,
artist_id: uuid.UUID | None,
album_id: uuid.UUID | None,
q: str | None,
) -> int:
stmt = select(func.count()).select_from(TrackModel)
if artist_id is not None:
stmt = stmt.where(TrackModel.artist_id == artist_id)
if album_id is not None:
stmt = stmt.where(TrackModel.album_id == album_id)
if q:
stmt = stmt.where(TrackModel.title.ilike(f"%{q}%"))
return (await self._session.execute(stmt)).scalar_one()
async def update(
self,
track_id: uuid.UUID,
*,
title: str | None,
genre: str | None,
year: int | None,
) -> Track:
row = await self._session.get(TrackModel, track_id)
if row is None:
raise NotFoundError(f"Track {track_id} not found.")
if title is not None:
row.title = title
if genre is not None:
row.genre = genre
if year is not None:
row.year = year
row.metadata_status = "manual"
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
+1
View File
@@ -0,0 +1 @@
"""File storage adapters."""
+86
View File
@@ -0,0 +1,86 @@
"""LocalFileStorage — stores files on the local filesystem."""
import os
import shutil
from collections.abc import AsyncGenerator, AsyncIterator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from pathlib import Path
import anyio
from app.domain.entities.storage import ObjectStat
from app.domain.errors import StorageError
_EXT_CONTENT_TYPE: dict[str, str] = {
"mp3": "audio/mpeg",
"flac": "audio/flac",
"m4a": "audio/mp4",
"aac": "audio/aac",
"ogg": "audio/ogg",
"opus": "audio/ogg",
"wav": "audio/wav",
"wma": "audio/x-ms-wma",
"aiff": "audio/aiff",
"aif": "audio/aiff",
}
class LocalFileStorage:
def __init__(self, media_path: Path) -> None:
self._media_path = media_path
async def save_file(self, key: str, src_path: Path) -> int:
dest = self._media_path / key
dest.parent.mkdir(parents=True, exist_ok=True)
part = dest.with_suffix(dest.suffix + ".part")
shutil.copyfile(str(src_path), str(part))
os.replace(str(part), str(dest))
return dest.stat().st_size
async def open_range(
self, key: str, start: int, end: int | None
) -> tuple[AsyncIterator[bytes], int]:
path = self._media_path / key
if not path.exists():
raise StorageError(f"Object not found: {key}")
total_size = path.stat().st_size
_start = start
_end = end
_total_size = total_size
_path = path
async def _iter() -> AsyncGenerator[bytes]:
async with await anyio.open_file(_path, "rb") as f:
await f.seek(_start)
remaining = (_end - _start + 1) if _end is not None else (_total_size - _start)
while remaining > 0:
chunk: bytes = await f.read(min(65536, remaining))
if not chunk:
break
yield chunk
remaining -= len(chunk)
aiter: AsyncIterator[bytes] = _iter()
return aiter, total_size
async def stat(self, key: str) -> ObjectStat:
path = self._media_path / key
if not path.exists():
raise StorageError(f"Object not found: {key}")
st = path.stat()
ext = path.suffix.lower().lstrip(".")
return ObjectStat(size=st.st_size, content_type=_EXT_CONTENT_TYPE.get(ext))
async def exists(self, key: str) -> bool:
return (self._media_path / key).exists()
async def delete(self, key: str) -> None:
(self._media_path / key).unlink(missing_ok=True)
def as_local_path(self, key: str) -> AbstractAsyncContextManager[Path]:
return self._as_local_path_cm(key)
@asynccontextmanager
async def _as_local_path_cm(self, key: str) -> AsyncGenerator[Path]:
yield self._media_path / key
+31
View File
@@ -0,0 +1,31 @@
"""File storage provider — singleton factory wired from config."""
from app.core.config import get_settings
from app.domain.ports import FileStorage
from app.infrastructure.storage.local import LocalFileStorage
from app.infrastructure.storage.s3 import S3FileStorage
_storage: FileStorage | None = None
def get_file_storage() -> FileStorage:
global _storage
if _storage is None:
settings = get_settings()
if settings.storage_backend == "s3":
if not settings.s3_bucket:
raise RuntimeError("S3_BUCKET must be set when STORAGE_BACKEND=s3")
_storage = S3FileStorage(
settings.s3_bucket,
endpoint_url=settings.s3_endpoint_url,
region_name=settings.s3_region,
access_key=settings.s3_access_key.get_secret_value()
if settings.s3_access_key
else None,
secret_key=settings.s3_secret_key.get_secret_value()
if settings.s3_secret_key
else None,
)
else:
_storage = LocalFileStorage(settings.media_path)
return _storage
+157
View File
@@ -0,0 +1,157 @@
"""S3FileStorage — stores files in any S3-compatible object store."""
import tempfile
from collections.abc import AsyncGenerator, AsyncIterator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from pathlib import Path
from typing import Any
import aioboto3
import anyio
from botocore.exceptions import ClientError
from app.domain.entities.storage import ObjectStat
from app.domain.errors import StorageError
_EXT_CONTENT_TYPE: dict[str, str] = {
"mp3": "audio/mpeg",
"flac": "audio/flac",
"m4a": "audio/mp4",
"aac": "audio/aac",
"ogg": "audio/ogg",
"opus": "audio/ogg",
"wav": "audio/wav",
"wma": "audio/x-ms-wma",
"aiff": "audio/aiff",
"aif": "audio/aiff",
}
def _not_found(key: str, exc: Exception) -> StorageError:
err = StorageError(f"Object not found: {key}")
err.__cause__ = exc
return err
def _is_404(exc: ClientError) -> bool:
return exc.response["Error"]["Code"] in ("404", "NoSuchKey")
class S3FileStorage:
def __init__(
self,
bucket: str,
*,
endpoint_url: str | None = None,
region_name: str | None = None,
access_key: str | None = None,
secret_key: str | None = None,
) -> None:
self._bucket = bucket
self._endpoint_url = endpoint_url
self._session: Any = aioboto3.Session(
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region_name,
)
def _client(self) -> Any:
return self._session.client("s3", endpoint_url=self._endpoint_url)
async def save_file(self, key: str, src_path: Path) -> int:
async with await anyio.open_file(src_path, "rb") as f:
content: bytes = await f.read()
async with self._client() as s3:
await s3.put_object(Bucket=self._bucket, Key=key, Body=content)
return len(content)
async def open_range(
self, key: str, start: int, end: int | None
) -> tuple[AsyncIterator[bytes], int]:
async with self._client() as s3:
try:
head = await s3.head_object(Bucket=self._bucket, Key=key)
except ClientError as exc:
if _is_404(exc):
raise _not_found(key, exc) from exc
raise StorageError(str(exc)) from exc
total_size: int = head["ContentLength"]
range_header = f"bytes={start}-{end}" if end is not None else f"bytes={start}-"
_bucket = self._bucket
_key = key
async def _stream() -> AsyncGenerator[bytes]:
async with self._client() as s3:
try:
resp = await s3.get_object(
Bucket=_bucket, Key=_key, Range=range_header
)
except ClientError as exc:
raise StorageError(str(exc)) from exc
body = resp["Body"]
while True:
chunk: bytes = await body.read(65536)
if not chunk:
break
yield chunk
return _stream(), total_size
async def stat(self, key: str) -> ObjectStat:
async with self._client() as s3:
try:
head = await s3.head_object(Bucket=self._bucket, Key=key)
except ClientError as exc:
if _is_404(exc):
raise _not_found(key, exc) from exc
raise StorageError(str(exc)) from exc
ext = Path(key).suffix.lower().lstrip(".")
return ObjectStat(
size=head["ContentLength"],
content_type=head.get("ContentType") or _EXT_CONTENT_TYPE.get(ext),
)
async def exists(self, key: str) -> bool:
async with self._client() as s3:
try:
await s3.head_object(Bucket=self._bucket, Key=key)
return True
except ClientError as exc:
if _is_404(exc):
return False
raise StorageError(str(exc)) from exc
async def delete(self, key: str) -> None:
async with self._client() as s3:
try:
await s3.delete_object(Bucket=self._bucket, Key=key)
except ClientError as exc:
raise StorageError(str(exc)) from exc
def as_local_path(self, key: str) -> AbstractAsyncContextManager[Path]:
return self._as_local_path_cm(key)
@asynccontextmanager
async def _as_local_path_cm(self, key: str) -> AsyncGenerator[Path]:
suffix = Path(key).suffix
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
tmp_path = Path(f.name)
try:
async with self._client() as s3:
try:
resp = await s3.get_object(Bucket=self._bucket, Key=key)
except ClientError as exc:
if _is_404(exc):
raise _not_found(key, exc) from exc
raise StorageError(str(exc)) from exc
async with await anyio.open_file(tmp_path, "wb") as out:
body = resp["Body"]
while True:
chunk: bytes = await body.read(65536)
if not chunk:
break
await out.write(chunk)
yield tmp_path
finally:
await anyio.Path(tmp_path).unlink(missing_ok=True)
+6
View File
@@ -23,6 +23,8 @@ dependencies = [
"pwdlib[argon2]>=0.2.1", "pwdlib[argon2]>=0.2.1",
# outbound http (ML client, MusicBrainz, AcoustID) # outbound http (ML client, MusicBrainz, AcoustID)
"httpx>=0.28", "httpx>=0.28",
# S3-compatible object storage
"aioboto3>=13.0",
# logging # logging
"structlog>=24.4", "structlog>=24.4",
] ]
@@ -80,6 +82,10 @@ disallow_untyped_defs = true
module = ["arq.*"] module = ["arq.*"]
ignore_missing_imports = true ignore_missing_imports = true
[[tool.mypy.overrides]]
module = ["aioboto3.*", "aiobotocore.*", "botocore.*"]
ignore_missing_imports = true
[tool.pytest.ini_options] [tool.pytest.ini_options]
asyncio_mode = "auto" asyncio_mode = "auto"
testpaths = ["tests"] testpaths = ["tests"]
+2 -1
View File
@@ -10,7 +10,7 @@ from collections.abc import AsyncIterator
import pytest import pytest
from app.core.security import Argon2PasswordHasher from app.core.security import Argon2PasswordHasher
from app.infrastructure.db import Base, get_engine, session_scope from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import ( from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository, SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository, SqlAlchemyUserRepository,
@@ -73,6 +73,7 @@ async def api() -> AsyncIterator[AsyncClient]:
async with get_engine().begin() as conn: async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all) await conn.run_sync(Base.metadata.drop_all)
await dispose_engine()
async def _login(api: AsyncClient, username: str, password: str) -> tuple[str, str]: async def _login(api: AsyncClient, username: str, password: str) -> tuple[str, str]:
+104
View File
@@ -0,0 +1,104 @@
"""Unit tests for LocalFileStorage."""
import pytest
from app.infrastructure.storage.local import LocalFileStorage
pytestmark = pytest.mark.asyncio
async def test_save_and_stat(tmp_path):
storage = LocalFileStorage(tmp_path)
src = tmp_path / "src.mp3"
src.write_bytes(b"test audio data")
size = await storage.save_file("tracks/te/test.mp3", src)
assert size == 15
stat = await storage.stat("tracks/te/test.mp3")
assert stat.size == 15
assert stat.content_type == "audio/mpeg"
async def test_save_creates_parent_dirs(tmp_path):
storage = LocalFileStorage(tmp_path)
src = tmp_path / "src.flac"
src.write_bytes(b"x")
await storage.save_file("tracks/ab/cdef.flac", src)
assert (tmp_path / "tracks" / "ab" / "cdef.flac").exists()
async def test_open_range_full(tmp_path):
storage = LocalFileStorage(tmp_path)
data = b"hello world" * 100
src = tmp_path / "src.flac"
src.write_bytes(data)
await storage.save_file("tracks/he/hello.flac", src)
stream, total = await storage.open_range("tracks/he/hello.flac", 0, None)
result = b"".join([chunk async for chunk in stream])
assert result == data
assert total == len(data)
async def test_open_range_partial(tmp_path):
storage = LocalFileStorage(tmp_path)
data = b"0123456789"
src = tmp_path / "src.mp3"
src.write_bytes(data)
await storage.save_file("tracks/sr/src.mp3", src)
stream, total = await storage.open_range("tracks/sr/src.mp3", 3, 7)
result = b"".join([chunk async for chunk in stream])
assert result == b"34567"
assert total == 10
async def test_open_range_from_offset_to_end(tmp_path):
storage = LocalFileStorage(tmp_path)
data = b"abcdefghij"
src = tmp_path / "src.wav"
src.write_bytes(data)
await storage.save_file("tracks/sr/src.wav", src)
stream, total = await storage.open_range("tracks/sr/src.wav", 5, None)
result = b"".join([chunk async for chunk in stream])
assert result == b"fghij"
assert total == 10
async def test_exists_and_delete(tmp_path):
storage = LocalFileStorage(tmp_path)
src = tmp_path / "src.ogg"
src.write_bytes(b"ogg data")
await storage.save_file("tracks/sr/src.ogg", src)
assert await storage.exists("tracks/sr/src.ogg") is True
await storage.delete("tracks/sr/src.ogg")
assert await storage.exists("tracks/sr/src.ogg") is False
async def test_delete_missing_is_noop(tmp_path):
storage = LocalFileStorage(tmp_path)
await storage.delete("tracks/no/nope.mp3")
async def test_as_local_path(tmp_path):
storage = LocalFileStorage(tmp_path)
src = tmp_path / "src.mp3"
src.write_bytes(b"local bytes")
await storage.save_file("tracks/lo/local.mp3", src)
async with storage.as_local_path("tracks/lo/local.mp3") as path:
assert path.read_bytes() == b"local bytes"
async def test_stat_unknown_extension(tmp_path):
storage = LocalFileStorage(tmp_path)
src = tmp_path / "src.xyz"
src.write_bytes(b"mystery")
await storage.save_file("tracks/my/mystery.xyz", src)
stat = await storage.stat("tracks/my/mystery.xyz")
assert stat.size == 7
assert stat.content_type is None
+254
View File
@@ -0,0 +1,254 @@
"""Unit tests for S3FileStorage — all S3 calls are mocked."""
from __future__ import annotations
import io
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.domain.errors import StorageError
from app.infrastructure.storage.s3 import S3FileStorage
def _make_storage(**kwargs: Any) -> S3FileStorage:
return S3FileStorage("test-bucket", **kwargs)
def _client_error(code: str) -> Exception:
from botocore.exceptions import ClientError
return ClientError({"Error": {"Code": code, "Message": code}}, "op")
class _FakeBody:
"""Async-iterable body that yields chunks from a bytes buffer."""
def __init__(self, data: bytes, chunk_size: int = 65536) -> None:
self._buf = io.BytesIO(data)
self._chunk_size = chunk_size
async def read(self, size: int = -1) -> bytes:
return self._buf.read(size)
def _make_client_ctx(s3_mock: Any) -> Any:
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(return_value=s3_mock)
ctx.__aexit__ = AsyncMock(return_value=False)
return ctx
@pytest.fixture()
def storage() -> S3FileStorage:
return _make_storage()
# ---------------------------------------------------------------------------
# save_file
# ---------------------------------------------------------------------------
async def test_save_file_calls_put_object(tmp_path: Path, storage: S3FileStorage) -> None:
src = tmp_path / "track.mp3"
src.write_bytes(b"audio bytes")
s3 = AsyncMock()
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
size = await storage.save_file("tracks/ab/track.mp3", src)
s3.put_object.assert_awaited_once_with(
Bucket="test-bucket", Key="tracks/ab/track.mp3", Body=b"audio bytes"
)
assert size == 11
# ---------------------------------------------------------------------------
# stat
# ---------------------------------------------------------------------------
async def test_stat_returns_size_and_content_type(storage: S3FileStorage) -> None:
s3 = AsyncMock()
s3.head_object.return_value = {"ContentLength": 1024, "ContentType": "audio/mpeg"}
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
stat = await storage.stat("tracks/ab/track.mp3")
assert stat.size == 1024
assert stat.content_type == "audio/mpeg"
async def test_stat_falls_back_to_ext_content_type(storage: S3FileStorage) -> None:
s3 = AsyncMock()
s3.head_object.return_value = {"ContentLength": 500, "ContentType": None}
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
stat = await storage.stat("tracks/ab/track.flac")
assert stat.content_type == "audio/flac"
async def test_stat_not_found_raises_storage_error(storage: S3FileStorage) -> None:
s3 = AsyncMock()
s3.head_object.side_effect = _client_error("404")
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
with pytest.raises(StorageError, match="not found"):
await storage.stat("tracks/missing.mp3")
# ---------------------------------------------------------------------------
# exists
# ---------------------------------------------------------------------------
async def test_exists_true(storage: S3FileStorage) -> None:
s3 = AsyncMock()
s3.head_object.return_value = {"ContentLength": 1}
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
assert await storage.exists("tracks/ab/track.mp3") is True
async def test_exists_false_on_404(storage: S3FileStorage) -> None:
s3 = AsyncMock()
s3.head_object.side_effect = _client_error("NoSuchKey")
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
assert await storage.exists("tracks/missing.mp3") is False
# ---------------------------------------------------------------------------
# delete
# ---------------------------------------------------------------------------
async def test_delete_calls_delete_object(storage: S3FileStorage) -> None:
s3 = AsyncMock()
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
await storage.delete("tracks/ab/track.mp3")
s3.delete_object.assert_awaited_once_with(Bucket="test-bucket", Key="tracks/ab/track.mp3")
# ---------------------------------------------------------------------------
# open_range
# ---------------------------------------------------------------------------
async def test_open_range_full(storage: S3FileStorage) -> None:
data = b"hello world"
s3 = AsyncMock()
s3.head_object.return_value = {"ContentLength": len(data)}
s3.get_object.return_value = {"Body": _FakeBody(data)}
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
stream, total = await storage.open_range("tracks/ab/t.mp3", 0, None)
chunks = [c async for c in stream]
assert b"".join(chunks) == data
assert total == len(data)
s3.get_object.assert_awaited_once_with(
Bucket="test-bucket", Key="tracks/ab/t.mp3", Range="bytes=0-"
)
async def test_open_range_partial(storage: S3FileStorage) -> None:
full = b"0123456789"
ranged = b"34567"
s3 = AsyncMock()
s3.head_object.return_value = {"ContentLength": len(full)}
s3.get_object.return_value = {"Body": _FakeBody(ranged)}
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
stream, total = await storage.open_range("tracks/ab/t.mp3", 3, 7)
result = b"".join([c async for c in stream])
assert result == ranged
assert total == len(full)
s3.get_object.assert_awaited_once_with(
Bucket="test-bucket", Key="tracks/ab/t.mp3", Range="bytes=3-7"
)
async def test_open_range_not_found_raises_storage_error(storage: S3FileStorage) -> None:
s3 = AsyncMock()
s3.head_object.side_effect = _client_error("NoSuchKey")
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
with pytest.raises(StorageError, match="not found"):
await storage.open_range("tracks/missing.mp3", 0, None)
# ---------------------------------------------------------------------------
# as_local_path
# ---------------------------------------------------------------------------
async def test_as_local_path_yields_file_with_content(storage: S3FileStorage) -> None:
data = b"local copy bytes"
s3 = AsyncMock()
s3.get_object.return_value = {"Body": _FakeBody(data)}
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
async with storage.as_local_path("tracks/ab/track.mp3") as path:
assert path.exists()
assert path.read_bytes() == data
assert not path.exists()
async def test_as_local_path_cleans_up_on_error(storage: S3FileStorage) -> None:
s3 = AsyncMock()
s3.get_object.side_effect = _client_error("NoSuchKey")
captured: list[Path] = []
with patch.object(storage, "_client", return_value=_make_client_ctx(s3)):
with pytest.raises(StorageError):
async with storage.as_local_path("tracks/missing.mp3") as path:
captured.append(path)
if captured:
assert not captured[0].exists()
# ---------------------------------------------------------------------------
# provider wiring
# ---------------------------------------------------------------------------
def test_provider_returns_s3_storage_when_configured(tmp_path: Path) -> None:
from app.core.config import Settings
from app.infrastructure.storage import provider
provider._storage = None
mock_settings = Settings(
database_url="postgresql+asyncpg://x:x@localhost/x",
storage_backend="s3",
s3_bucket="my-bucket",
s3_region="us-east-1",
)
with patch("app.infrastructure.storage.provider.get_settings", return_value=mock_settings):
storage_instance = provider.get_file_storage()
assert isinstance(storage_instance, S3FileStorage)
provider._storage = None # reset singleton for other tests
def test_provider_raises_when_s3_bucket_missing() -> None:
from app.core.config import Settings
from app.infrastructure.storage import provider
provider._storage = None
mock_settings = Settings(
database_url="postgresql+asyncpg://x:x@localhost/x",
storage_backend="s3",
s3_bucket=None,
)
with patch("app.infrastructure.storage.provider.get_settings", return_value=mock_settings):
with pytest.raises(RuntimeError, match="S3_BUCKET"):
provider.get_file_storage()
provider._storage = None
+194
View File
@@ -0,0 +1,194 @@
"""Integration tests for upload and streaming endpoints.
Requires a reachable Postgres; skips otherwise.
"""
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
import pytest
from app.core.config import get_settings
from app.infrastructure.db import Base, dispose_engine, get_engine, session_scope
from app.infrastructure.db.repositories import (
SqlAlchemyRefreshTokenRepository,
SqlAlchemyUserRepository,
)
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.asyncio
_db_reachable_cache: bool | None = None
async def _db_reachable() -> bool:
global _db_reachable_cache
if _db_reachable_cache is not None:
return _db_reachable_cache
from sqlalchemy import text
try:
async with asyncio.timeout(3):
async with get_engine().connect() as conn:
await conn.execute(text("SELECT 1"))
_db_reachable_cache = True
except Exception:
_db_reachable_cache = False
return _db_reachable_cache
@pytest.fixture
async def api(tmp_path: Path) -> AsyncIterator[AsyncClient]:
if not await _db_reachable():
pytest.skip("Postgres not reachable — integration test skipped.")
os.environ["MEDIA_PATH"] = str(tmp_path)
get_settings.cache_clear()
# Also reset the file storage singleton so it picks up the new media_path
import app.infrastructure.storage.provider as _storage_provider
_storage_provider._storage = None
try:
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
from app.application.user_service import UserService
from app.core.security import Argon2PasswordHasher
async with session_scope() as session:
await UserService(
users=SqlAlchemyUserRepository(session),
refresh_tokens=SqlAlchemyRefreshTokenRepository(session),
hasher=Argon2PasswordHasher(),
).create_user(username="testuser", password="testpass1", is_superuser=False)
from app.main import create_app
app = create_app()
async with LifespanManager(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
async with get_engine().begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await dispose_engine()
finally:
_storage_provider._storage = None
os.environ.pop("MEDIA_PATH", None)
get_settings.cache_clear()
async def _login(api: AsyncClient) -> str:
resp = await api.post(
"/api/v1/auth/login", json={"username": "testuser", "password": "testpass1"}
)
assert resp.status_code == 200
return str(resp.json()["access_token"])
async def test_upload_creates_track(api: AsyncClient) -> None:
token = await _login(api)
audio = b"fake mp3 bytes" * 100
resp = await api.post(
"/api/v1/upload",
files={"file": ("song.mp3", audio, "audio/mpeg")},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["already_exists"] is False
assert body["title"] == "song"
assert "track_id" in body
async def test_upload_dedup(api: AsyncClient) -> None:
token = await _login(api)
audio = b"same content" * 50
first = await api.post(
"/api/v1/upload",
files={"file": ("a.mp3", audio, "audio/mpeg")},
headers={"Authorization": f"Bearer {token}"},
)
assert first.status_code == 200
assert first.json()["already_exists"] is False
second = await api.post(
"/api/v1/upload",
files={"file": ("b.mp3", audio, "audio/mpeg")},
headers={"Authorization": f"Bearer {token}"},
)
assert second.status_code == 200
assert second.json()["already_exists"] is True
assert second.json()["track_id"] == first.json()["track_id"]
async def test_stream_full(api: AsyncClient) -> None:
token = await _login(api)
audio = b"audio data for streaming" * 10
up = await api.post(
"/api/v1/upload",
files={"file": ("track.mp3", audio, "audio/mpeg")},
headers={"Authorization": f"Bearer {token}"},
)
assert up.status_code == 200
track_id = up.json()["track_id"]
# Browser <audio> can't send headers — auth rides on the ?token= query param.
resp = await api.get(f"/api/v1/stream/{track_id}?token={token}")
assert resp.status_code == 200
assert resp.content == audio
assert resp.headers["content-type"].startswith("audio/mpeg")
assert "accept-ranges" in resp.headers
async def test_stream_range(api: AsyncClient) -> None:
token = await _login(api)
audio = b"0123456789" * 10
up = await api.post(
"/api/v1/upload",
files={"file": ("range.mp3", audio, "audio/mpeg")},
headers={"Authorization": f"Bearer {token}"},
)
assert up.status_code == 200
track_id = up.json()["track_id"]
resp = await api.get(
f"/api/v1/stream/{track_id}",
headers={"Range": "bytes=0-9", "Authorization": f"Bearer {token}"},
)
assert resp.status_code == 206
assert resp.content == b"0123456789"
assert resp.headers["content-range"] == f"bytes 0-9/{len(audio)}"
assert resp.headers["content-length"] == "10"
async def test_stream_not_found(api: AsyncClient) -> None:
token = await _login(api)
resp = await api.get(
f"/api/v1/stream/00000000-0000-0000-0000-000000000000?token={token}"
)
assert resp.status_code == 404
async def test_stream_requires_auth(api: AsyncClient) -> None:
resp = await api.get("/api/v1/stream/00000000-0000-0000-0000-000000000000")
assert resp.status_code == 401
async def test_upload_requires_auth(api: AsyncClient) -> None:
resp = await api.post(
"/api/v1/upload",
files={"file": ("x.mp3", b"data", "audio/mpeg")},
)
assert resp.status_code == 401
Generated
+903 -481
View File
File diff suppressed because it is too large Load Diff