Files
mcma-backend/app/api/v1/tracks.py
T
Senko-san 5c5df5d3cc feat(storage): S3-compatible storage adapter + storage_uri rename
Add S3FileStorage adapter (any S3-compatible backend: AWS, MinIO, Garage)
alongside the local adapter, selected via STORAGE_BACKEND. Proxied range
streaming via get_object+Range; as_local_path downloads to a tempfile for
ffmpeg/fpcalc. Rename track.file_path -> storage_uri across domain entity,
ORM, repositories, port, and services, with an Alembic migration. Adds
mocked S3 unit tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 17:11:35 +03:00

159 lines
4.8 KiB
Python

"""Track endpoints."""
import uuid
from typing import Any
from fastapi import APIRouter, Query, Response
from app.api.deps import AlbumRepoDep, ArtistRepoDep, CurrentUser, FileStorageDep, TrackRepoDep
from app.api.schemas.pagination import PagedResponse
from app.api.schemas.track import TrackOut, TrackUpdate
from app.domain.entities.album import Album
from app.domain.entities.track import Artist, Track
from app.domain.errors import NotFoundError
# Router collecting the track endpoints defined in this module; every route
# registered on it lives under the "/tracks" prefix and carries the "tracks" tag.
router = APIRouter(prefix="/tracks", tags=["tracks"])
async def _build_track_out(
    tracks: list[Track],
    artists: dict[uuid.UUID, Artist],
    albums: dict[uuid.UUID, Album],
) -> list[TrackOut]:
    """Convert domain tracks into ``TrackOut`` response models.

    Artist names and album titles are resolved purely from the lookup maps
    passed in; this function performs no repository access of its own.

    Args:
        tracks: Tracks to convert, in the order they should appear in the
            result.
        artists: Artists keyed by id. A track whose ``artist_id`` is not a
            key is reported with the name ``"Unknown Artist"``.
        albums: Albums keyed by id. A track with a falsy ``album_id``, or one
            that is not a key, is reported with ``album_title=None``.

    Returns:
        One ``TrackOut`` per input track, in input order.
    """

    def _to_out(track: Track) -> TrackOut:
        # Resolve the display name, falling back when the artist is missing
        # from the lookup map.
        if track.artist_id in artists:
            artist_name = artists[track.artist_id].name
        else:
            artist_name = "Unknown Artist"

        # The album is optional: only resolve a title when the track has an
        # album id and that album was supplied.
        album_title = None
        if track.album_id and track.album_id in albums:
            album_title = albums[track.album_id].title

        return TrackOut(
            id=track.id,
            title=track.title,
            artist_id=track.artist_id,
            artist_name=artist_name,
            album_id=track.album_id,
            album_title=album_title,
            duration_seconds=track.duration_seconds,
            file_format=track.file_format,
            file_size=track.file_size,
            metadata_status=track.metadata_status,
            source=track.source,
            created_at=track.created_at,
        )

    return [_to_out(track) for track in tracks]
@router.get("")
async def list_tracks(
    track_repo: TrackRepoDep,
    artist_repo: ArtistRepoDep,
    album_repo: AlbumRepoDep,
    _: CurrentUser,
    artist_id: uuid.UUID | None = None,
    album_id: uuid.UUID | None = None,
    q: str | None = None,
    sort_by: str = Query("created_at", pattern="^(title|created_at|artist)$"),
    order: str = Query("desc", pattern="^(asc|desc)$"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
) -> PagedResponse[TrackOut]:
    # Return one page of tracks plus the total number of tracks matching the
    # same filters.
    #
    # Query parameters:
    #   artist_id / album_id / q -- optional filters, forwarded unchanged to
    #       both the page query and the count query.
    #   sort_by -- one of "title", "created_at", "artist" (default "created_at").
    #   order   -- "asc" or "desc" (default "desc").
    #   limit   -- page size, 1..200 (default 50).
    #   offset  -- number of rows to skip, >= 0 (default 0).
    #
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.

    # The page and the total must agree on filtering, so build the filter set once.
    filters = {"artist_id": artist_id, "album_id": album_id, "q": q}

    page = await track_repo.list(
        **filters,
        sort_by=sort_by,
        order=order,
        limit=limit,
        offset=offset,
    )
    total = await track_repo.count(**filters)

    # Collect the distinct artists/albums referenced by this page so that each
    # kind is loaded with a single batched lookup.
    artist_ids = list({track.artist_id for track in page})
    album_ids = list({track.album_id for track in page if track.album_id is not None})

    artist_rows = await artist_repo.get_many(artist_ids)
    artists_by_id = {artist.id: artist for artist in artist_rows}

    album_rows = await album_repo.get_many(album_ids)
    albums_by_id = {album.id: album for album in album_rows}

    items = await _build_track_out(page, artists_by_id, albums_by_id)
    return PagedResponse(items=items, total=total, limit=limit, offset=offset)
@router.get("/{track_id}")
async def get_track(
    track_id: uuid.UUID,
    track_repo: TrackRepoDep,
    artist_repo: ArtistRepoDep,
    album_repo: AlbumRepoDep,
    _: CurrentUser,
) -> TrackOut:
    # Return a single track, enriched with its artist name and album title.
    #
    # Raises NotFoundError when the repository has no track with `track_id`.
    #
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.
    found = await track_repo.get_by_id(track_id)
    if found is None:
        raise NotFoundError(f"Track {track_id} not found.")

    # The album is optional; skip it in the lookup when the track has none.
    wanted_artists = [found.artist_id]
    wanted_albums = [found.album_id] if found.album_id else []

    artist_rows = await artist_repo.get_many(wanted_artists)
    artists_by_id = {artist.id: artist for artist in artist_rows}

    album_rows = await album_repo.get_many(wanted_albums)
    albums_by_id = {album.id: album for album in album_rows}

    # The shared builder works on lists; exactly one track goes in, so exactly
    # one item comes out.
    (track_out,) = await _build_track_out([found], artists_by_id, albums_by_id)
    return track_out
@router.patch("/{track_id}")
async def update_track(
    track_id: uuid.UUID,
    body: TrackUpdate,
    track_repo: TrackRepoDep,
    artist_repo: ArtistRepoDep,
    album_repo: AlbumRepoDep,
    _: CurrentUser,
) -> TrackOut:
    # Apply the title/genre/year from the request body to a track and return
    # the updated track, enriched with its artist name and album title.
    #
    # NOTE(review): there is no explicit not-found check here; presumably
    # track_repo.update raises for an unknown id -- confirm.
    # NOTE(review): all three fields are forwarded as-is, including any that
    # are None; presumably the repository treats None as "leave unchanged" --
    # confirm.
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.
    updated = await track_repo.update(
        track_id,
        title=body.title,
        genre=body.genre,
        year=body.year,
    )

    # The album is optional; skip it in the lookup when the track has none.
    wanted_artists = [updated.artist_id]
    wanted_albums = [updated.album_id] if updated.album_id else []

    artist_rows = await artist_repo.get_many(wanted_artists)
    artists_by_id = {artist.id: artist for artist in artist_rows}

    album_rows = await album_repo.get_many(wanted_albums)
    albums_by_id = {album.id: album for album in album_rows}

    # The shared builder works on lists; exactly one track goes in, so exactly
    # one item comes out.
    (track_out,) = await _build_track_out([updated], artists_by_id, albums_by_id)
    return track_out
@router.delete("/{track_id}", status_code=204)
async def delete_track(
    track_id: uuid.UUID,
    track_repo: TrackRepoDep,
    storage: FileStorageDep,
    _: CurrentUser,
) -> Response:
    # Delete a track record and then the object stored at its `storage_uri`.
    #
    # Raises NotFoundError when the repository has no track with `track_id`.
    # Responds with an empty 204 on success.
    #
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.
    doomed = await track_repo.get_by_id(track_id)
    if doomed is None:
        raise NotFoundError(f"Track {track_id} not found.")

    # Order matters: the record is removed first, the stored file second. The
    # URI is read from the entity fetched above, so it is still available
    # after the record is gone.
    await track_repo.delete(track_id)
    await storage.delete(doomed.storage_uri)

    return Response(status_code=204)
@router.get("/{track_id}/similar")
async def get_similar_tracks(
    track_id: uuid.UUID,
    _: CurrentUser,
) -> Any:
    # Placeholder: the route is registered but has no implementation yet, so
    # the handler does nothing and returns None.
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.
    return None
@router.post("/{track_id}/optimize")
async def optimize_track(
    track_id: uuid.UUID,
    _: CurrentUser,
) -> Any:
    # Placeholder: the route is registered but has no implementation yet, so
    # the handler does nothing and returns None.
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.
    return None
@router.get("/{track_id}/cover")
async def get_track_cover(
    track_id: uuid.UUID,
    _: CurrentUser,
) -> Any:
    # Placeholder: the route is registered but has no implementation yet, so
    # the handler does nothing and returns None.
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.
    return None
@router.post("/{track_id}/metadata/enrich")
async def enrich_metadata(
    track_id: uuid.UUID,
    _: CurrentUser,
) -> Any:
    # Placeholder: the route is registered but has no implementation yet, so
    # the handler does nothing and returns None.
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.
    return None
@router.get("/{track_id}/metadata/matches")
async def get_metadata_matches(
    track_id: uuid.UUID,
    _: CurrentUser,
) -> Any:
    # Placeholder: the route is registered but has no implementation yet, so
    # the handler does nothing and returns None.
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.
    return None
@router.put("/{track_id}/metadata")
async def set_metadata(
    track_id: uuid.UUID,
    _: CurrentUser,
) -> Any:
    # Placeholder: the route is registered but has no implementation yet, so
    # the handler does nothing and returns None.
    # NOTE(review): `_: CurrentUser` is resolved but never read; presumably it
    # only enforces authentication -- confirm in app.api.deps.
    return None