Files
Senko-san fa23568214
Docker Build & Publish / build (push) Has been cancelled
Docker Build & Publish / push (push) Has been cancelled
Docker Build & Publish / Prune old image versions (push) Has been cancelled
feat(storage): library + disk statistics endpoint (§A6)
Implement `GET /api/v1/storage`, replacing the stub. Returns aggregate
library facts (track/artist/album counts, total footprint, playtime,
per-format / per-source / metadata-status breakdowns, top genres) plus
the real capacity of the backing volume.

- domain: `LibraryStats`, `FormatBreakdown`, `DiskUsage` value objects
- ports: `FileStorage.disk_usage()` (local = shutil.disk_usage walking up
  to the nearest existing ancestor; S3 returns None — no fixed disk)
- repo: `TrackRepository.library_stats()` (single set of GROUP BYs)
- tests: storage stats API (auth, empty library, upload counting)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 01:19:53 +03:00

96 lines
3.3 KiB
Python

"""LocalFileStorage — stores files on the local filesystem."""
import os
import shutil
from collections.abc import AsyncGenerator, AsyncIterator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from pathlib import Path
import anyio
from app.domain.entities.storage import DiskUsage, ObjectStat
from app.domain.errors import StorageError
# Lookup table from a lower-case file extension (without the leading dot) to
# the MIME type reported for stored audio objects. Extensions that are absent
# here have no known content type. Note that ``opus`` shares the ``audio/ogg``
# type with ``ogg``, and both ``aiff`` spellings map to ``audio/aiff``.
_EXT_CONTENT_TYPE: dict[str, str] = {
    "mp3": "audio/mpeg",
    "flac": "audio/flac",
    "m4a": "audio/mp4",
    "aac": "audio/aac",
    "ogg": "audio/ogg",
    "opus": "audio/ogg",
    "wav": "audio/wav",
    "wma": "audio/x-ms-wma",
    "aiff": "audio/aiff",
    "aif": "audio/aiff",
}
class LocalFileStorage:
    """Store objects as plain files beneath a media root directory.

    Every ``key`` is joined onto ``media_path`` to obtain the on-disk
    location of the object.

    NOTE(review): keys are joined verbatim, so an absolute key or one
    containing ``..`` would resolve outside the media root — confirm that
    callers only ever pass internally generated keys.
    """

    # Upper bound, in bytes, on each chunk yielded by ``open_range``.
    _CHUNK_SIZE = 65536

    def __init__(self, media_path: Path) -> None:
        """Remember *media_path* as the root directory for all objects."""
        self._media_path = media_path

    def _locate(self, key: str) -> tuple[Path, os.stat_result]:
        """Return the on-disk path and ``stat`` result for *key*.

        A single ``stat`` call is used instead of ``exists()`` followed by
        ``stat()`` so that a file removed in between cannot leak a raw
        ``FileNotFoundError`` to the caller.

        Raises:
            StorageError: If the object cannot be stat'ed (it is missing, or
                the lookup fails at the OS level). The underlying error is
                kept as ``__cause__``.
        """
        path = self._media_path / key
        try:
            return path, path.stat()
        except (OSError, ValueError) as err:
            raise StorageError(f"Object not found: {key}") from err

    async def save_file(self, key: str, src_path: Path) -> int:
        """Copy *src_path* into storage under *key* and return its size.

        The data is first copied to a sibling ``<name>.part`` file and then
        moved over the final path with ``os.replace``, so the final path is
        only ever replaced by a fully copied file. The blocking copy runs in
        a worker thread to keep the event loop responsive.

        Returns:
            The size in bytes of the stored file.

        Raises:
            OSError: If creating the directory, copying or renaming fails.
                A partially written ``.part`` file is removed on failure.
        """
        dest = self._media_path / key

        def _copy_into_place() -> int:
            dest.parent.mkdir(parents=True, exist_ok=True)
            part = dest.with_suffix(dest.suffix + ".part")
            try:
                shutil.copyfile(str(src_path), str(part))
                os.replace(str(part), str(dest))
            except shutil.SameFileError:
                # The source *is* the temp file; removing it would destroy
                # the caller's data, so leave it untouched.
                raise
            except OSError:
                # Best-effort cleanup of the half-written temp file; the
                # original failure is what the caller needs to see.
                try:
                    part.unlink(missing_ok=True)
                except OSError:
                    pass
                raise
            return dest.stat().st_size

        return await anyio.to_thread.run_sync(_copy_into_place)

    async def open_range(
        self, key: str, start: int, end: int | None
    ) -> tuple[AsyncIterator[bytes], int]:
        """Open a byte range of the object stored under *key*.

        Args:
            key: Storage key of the object.
            start: Offset of the first byte to return.
            end: Offset of the last byte to return (inclusive), or ``None``
                to read through to the end of the file.

        Returns:
            A pair ``(chunks, total_size)`` where ``chunks`` lazily yields
            the requested bytes in pieces of at most ``_CHUNK_SIZE`` bytes
            and ``total_size`` is the size of the whole file. The file is
            not opened until iteration begins.

        Raises:
            StorageError: If the object cannot be found.
        """
        path, st = self._locate(key)
        total_size = st.st_size

        async def _iter() -> AsyncGenerator[bytes]:
            async with await anyio.open_file(path, "rb") as f:
                await f.seek(start)
                if end is not None:
                    remaining = end - start + 1
                else:
                    remaining = total_size - start
                while remaining > 0:
                    chunk: bytes = await f.read(min(self._CHUNK_SIZE, remaining))
                    if not chunk:
                        # Reached end of file before the range was exhausted.
                        break
                    yield chunk
                    remaining -= len(chunk)

        aiter: AsyncIterator[bytes] = _iter()
        return aiter, total_size

    async def stat(self, key: str) -> ObjectStat:
        """Return the size and content type of the object under *key*.

        The content type is looked up from the lower-cased file extension in
        ``_EXT_CONTENT_TYPE`` and is ``None`` for unknown extensions.

        Raises:
            StorageError: If the object cannot be found.
        """
        path, st = self._locate(key)
        ext = path.suffix.lower().lstrip(".")
        return ObjectStat(size=st.st_size, content_type=_EXT_CONTENT_TYPE.get(ext))

    async def exists(self, key: str) -> bool:
        """Return whether a file exists under *key*."""
        return (self._media_path / key).exists()

    async def delete(self, key: str) -> None:
        """Remove the file under *key*; a missing file is not an error."""
        (self._media_path / key).unlink(missing_ok=True)

    async def disk_usage(self) -> DiskUsage | None:
        """Report total, used and free bytes of the volume holding the media root.

        This implementation always returns a value; the ``None`` in the
        return annotation is presumably there for storage backends that have
        no fixed disk.
        """

        def _usage():
            # The media root may not exist yet on a fresh instance — walk up
            # to the nearest existing ancestor so we still report the
            # underlying volume.
            path = self._media_path
            while not path.exists() and path != path.parent:
                path = path.parent
            return shutil.disk_usage(str(path))

        usage = await anyio.to_thread.run_sync(_usage)
        return DiskUsage(total=usage.total, used=usage.used, free=usage.free)

    def as_local_path(self, key: str) -> AbstractAsyncContextManager[Path]:
        """Return an async context manager yielding a local path for *key*.

        For local storage the object already lives on disk, so the yielded
        path is simply the object's own location; nothing is copied or
        cleaned up on exit.
        """
        return self._as_local_path_cm(key)

    @asynccontextmanager
    async def _as_local_path_cm(self, key: str) -> AsyncGenerator[Path]:
        """Yield the on-disk path of *key* without checking that it exists."""
        yield self._media_path / key