Project started 🍾

This commit is contained in:
2026-06-01 18:47:59 +03:00
commit 4bca90a50e
39 changed files with 2340 additions and 0 deletions
+13
View File
@@ -0,0 +1,13 @@
.git
.gitignore
.venv
__pycache__/
*.py[oc]
.mypy_cache/
.ruff_cache/
.pytest_cache/
.env
*.md
tests/
media/
data/
+29
View File
@@ -0,0 +1,29 @@
# Copy to .env and adjust. Never commit real secrets.
# runtime
ENVIRONMENT=dev # dev | test | prod
LOG_LEVEL=INFO
LOG_JSON=false # true in prod
# database (async driver required)
DATABASE_URL=postgresql+asyncpg://mcma:mcma@localhost:5432/mcma
DB_ECHO=false
# redis (cache + arq broker)
REDIS_URL=redis://localhost:6379/0
# auth — GENERATE a strong secret for prod: `openssl rand -hex 32`
JWT_SECRET=change-me-in-prod
ACCESS_TOKEN_TTL_SECONDS=900
REFRESH_TOKEN_TTL_SECONDS=2592000
# media / storage
MEDIA_PATH=/data/media
TRANSCODE_CACHE_PATH=/data/transcode-cache
MAX_PARALLEL_DOWNLOADS=2
# external services (all optional — backend degrades gracefully if unset)
# ML_SERVICE_URL=http://ml:9000
# ACOUSTID_API_KEY=
MUSICBRAINZ_USER_AGENT=mcma-backend/0.1.0 ( https://github.com/your/repo )
# YOUTUBE_COOKIES_PATH=/data/cookies.txt
+22
View File
@@ -0,0 +1,22 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
# Tooling caches
.mypy_cache/
.ruff_cache/
.pytest_cache/
# Secrets & local config
.env
# Local data (media, transcode cache, db dumps)
/data/
media/
+1
View File
@@ -0,0 +1 @@
3.14
+70
View File
@@ -0,0 +1,70 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## What this is
`mcma-backend` — a self-hosted, **offline-first** music service (FOSS, homelab scale: unitstens of users). Downloads music from external sources (yt-dlp etc.), stores files + metadata, streams to clients, serves a native REST API plus a Subsonic-compatible API. The server is a **sync layer**, not the sole source of truth — clients are expected to work offline, which shapes the data model (timestamps, event-logs) even before sync is built.
Status: **Phase 1 (MVP)**. Step 1 (skeleton) is done; the data model, auth, sources, enrichment, streaming, and Subsonic layer follow. There is no in-repo plan document — the build order and deferred work live in Claude's project memory.
## Commands
Uses **uv** (managed Python 3.14) for everything.
```bash
uv sync # install deps (runtime + dev group)
uv run uvicorn app.main:app --reload # run API (needs Postgres + Redis up)
uv run arq app.workers.arq_worker.WorkerSettings # run background worker
uv run ruff check . # lint
uv run ruff format . # format
uv run mypy app # type-check (strict)
uv run pytest # all tests
uv run pytest tests/test_health.py::test_liveness_ok # single test
uv run alembic revision --autogenerate -m "msg" # new migration (after model changes)
uv run alembic upgrade head # apply migrations
docker compose up --build # full stack: api, worker, db, redis
```
Tests run in-process against the ASGI app (httpx + asgi-lifespan) — no server, no network. They do **not** require a running DB/Redis: dependency-backed checks degrade rather than fail.
## Architecture — hexagonal (ports & adapters)
Dependencies point **inward**. The domain knows nothing about frameworks; outer layers are wired together only at the composition root.
- `app/domain/` — pure business core: entities, value objects, errors, and **ports** (Protocols). No framework imports (no FastAPI/SQLAlchemy/redis here, ever).
- `app/application/` — use cases / services. Depend on domain ports, never on concrete adapters.
- `app/infrastructure/` — driven adapters: ORM models + repositories (`db/`), Redis client (`cache/`), source backends, ML/HTTP clients. All vendor-specific code is confined here.
- `app/api/` — driving adapter (FastAPI): routers, schemas, `deps.py` (request-scoped wiring / composition root), middleware, error mapping.
- `app/core/` — cross-cutting: config, logging, security.
- `app/workers/` — arq background tasks (heavy I/O and CPU work).
**Composition roots** are `app/main.py` (`create_app`, lifespan) and `app/api/deps.py` (binds concrete adapters to ports per request). When adding a feature: define the port in `domain`, the service in `application`, the adapter in `infrastructure`, and wire it in `deps.py`.
### Cross-cutting conventions
- **Errors:** services raise framework-agnostic exceptions from `app/domain/errors.py`. `app/api/errors.py` is the *only* place that maps domain errors → HTTP status codes. Don't reference HTTP status from domain/application code.
- **Config:** `app/core/config.py``get_settings()` is a cached singleton over pydantic-settings. Everything comes from env (or `.env` in dev); nothing is hardcoded. `database_url` must use the `+asyncpg` driver (validated).
- **Logging:** `app/core/logging.py` — structlog, console in dev / JSON in prod. A `correlation_id` contextvar is bound per HTTP request (middleware) and should be bound per worker task; it auto-attaches to every log line.
- **DB sessions:** API uses `SessionDep` (request-scoped, commit-on-success/rollback-on-error). Workers/scripts use `session_scope()` from `app.infrastructure.db`.
- **Migrations:** Alembic is async and settings-driven — the URL is injected in `alembic/env.py`, never written to `alembic.ini`. Uncomment the models import in `env.py` once ORM models exist so autogenerate sees them.
- **Health:** `/health` is liveness; `/health/ready` checks db + redis (required) + ml (optional). Every dependency check is wrapped in `asyncio.timeout` — a readiness probe must never hang.
## Non-negotiable domain invariants
These exist for future sync + ML and are easy to violate by accident:
- **Likes are an append-only event-log, not a boolean.** Current state = the latest event per `(user, track)`. Never update a like row in place.
- **`track.id` is the stable `content_id`** — a uuid that never changes; it's the basis of client-side caching. Don't regenerate it.
- **Dedup on both** `(source, source_id)` **and** `acoustid_fingerprint`. Downloads/imports must be idempotent.
- **Graceful degradation is mandatory.** ML, music sources, MusicBrainz/AcoustID are all optional. One being down must degrade a feature, never crash the system. The ML service is *not* a required dependency — radio must still work (worse) without it.
- **Never overwrite `metadata_status = manual`** with automatic enrichment.
- **No heavy work in the request cycle.** ffmpeg, fingerprinting, downloads — all go to arq workers.
- **Subsonic compatibility is a strategic goal:** the Subsonic API is an adapter over services, not a reimplementation of logic.
## Python 3.14 note
The project targets Python 3.14, which makes annotations lazy by default (PEP 649). `from __future__ import annotations` is therefore intentionally **absent** — do not add it back. All pins (pyproject `requires-python`, ruff `target-version`, mypy `python_version`, Dockerfile, `.python-version`) are on 3.14.
+34
View File
@@ -0,0 +1,34 @@
# syntax=docker/dockerfile:1
# Single image serves both `api` and `worker` (different commands in compose).
FROM python:3.14-slim AS base
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
UV_COMPILE_BYTECODE=1 \
UV_LINK_MODE=copy \
VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH"
# Runtime tools: ffmpeg (transcode/HLS), fpcalc (Chromaprint fingerprinting).
RUN apt-get update \
&& apt-get install -y --no-install-recommends ffmpeg libchromaprint-tools \
&& rm -rf /var/lib/apt/lists/*
COPY --from=ghcr.io/astral-sh/uv:0.4 /uv /uvx /bin/
WORKDIR /app
# Dependency layer — cached unless lockfile changes.
COPY pyproject.toml uv.lock* ./
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-install-project --no-dev
# Project layer.
COPY . .
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-dev
EXPOSE 8000
# Default: API server. Worker overrides command in docker-compose.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
+68
View File
@@ -0,0 +1,68 @@
# mcma-backend
Self-hosted, **offline-first** music service — backend. Searches/downloads music
from external sources, stores files + metadata, streams to clients, and serves a
native REST API plus a Subsonic-compatible API.
> Status: **Phase 1 (core/MVP)** — project skeleton in place. See the
> implementation plan for the full roadmap.
## Architecture — hexagonal (ports & adapters)
```
app/
├── domain/ pure core: entities, value objects, errors, ports (Protocols)
├── application/ use cases / services — orchestrate domain via ports
├── infrastructure/ driven adapters: ORM, repositories, db, redis, sources, ML client
│ ├── db/ declarative base, async engine, session factory
│ └── cache/ redis client
├── api/ driving adapter: FastAPI routers, schemas, deps, middleware
├── workers/ arq background tasks (download, enrich, transcode)
└── core/ cross-cutting: config, logging, security
```
**Rule:** dependencies point inward. `domain` imports nothing framework-specific;
`application` depends on domain ports; `infrastructure`/`api` are the outer ring
and are wired together at the composition root (`app/main.py`, `app/api/deps.py`).
## Quick start (Docker)
```bash
cp .env.example .env # then set a real JWT_SECRET
docker compose up --build # api on :8000, worker, postgres, redis
curl localhost:8000/health # {"status":"ok"}
curl localhost:8000/health/ready
```
## Local dev (without Docker)
```bash
uv sync # install deps (uses managed Python 3.14)
# start Postgres + Redis (e.g. `docker compose up db redis`)
cp .env.example .env
uv run uvicorn app.main:app --reload
```
## Tooling
```bash
uv run ruff check . # lint
uv run ruff format . # format
uv run mypy app # type-check (strict)
uv run pytest # tests
```
## Database migrations (Alembic)
```bash
uv run alembic revision --autogenerate -m "message" # after model changes
uv run alembic upgrade head # apply
```
The DB URL is injected from app settings — never hardcoded in `alembic.ini`.
## Configuration
All settings come from environment variables (or `.env` in dev). See
[`.env.example`](.env.example). External services (ML, AcoustID, MusicBrainz)
are **optional** — the backend degrades gracefully when they are absent.
+42
View File
@@ -0,0 +1,42 @@
# Alembic configuration. The database URL is injected at runtime from
# app settings (see alembic/env.py) — do NOT hardcode it here.
[alembic]
script_location = alembic
prepend_sys_path = .
path_separator = os
# Use a timestamp + slug for migration filenames.
file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
+67
View File
@@ -0,0 +1,67 @@
"""Alembic environment — async, settings-driven, model-aware.
The DB URL comes from app settings (env), never alembic.ini. ``target_metadata``
points at the ORM ``Base.metadata``; importing ``app.infrastructure.db.models``
registers every model so autogenerate sees the full schema.
"""
import asyncio
from logging.config import fileConfig
from alembic import context
from app.core.config import get_settings
from app.infrastructure.db import Base
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
# Import side-effect: registers all ORM models on Base.metadata.
# Uncomment once models exist (plan §11 step 2):
# import app.infrastructure.db.models
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
config.set_main_option("sqlalchemy.url", get_settings().database_url)
target_metadata = Base.metadata
def _run_migrations(connection: Connection) -> None:
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
render_as_batch=False,
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_offline() -> None:
context.configure(
url=config.get_main_option("sqlalchemy.url"),
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online() -> None:
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
)
async with connectable.connect() as connection:
await connection.run_sync(_run_migrations)
await connectable.dispose()
if context.is_offline_mode():
run_migrations_offline()
else:
asyncio.run(run_migrations_online())
+28
View File
@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from __future__ import annotations
from collections.abc import Sequence
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: str | None = ${repr(down_revision)}
branch_labels: str | Sequence[str] | None = ${repr(branch_labels)}
depends_on: str | Sequence[str] | None = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}
View File
+14
View File
@@ -0,0 +1,14 @@
"""mcma-backend — self-hosted, offline-first music service.
Hexagonal (ports & adapters) architecture:
* ``app.domain`` — pure business core: entities, value objects, errors,
and *ports* (Protocols). No framework imports.
* ``app.application`` — use cases / services. Orchestrate domain via ports.
* ``app.infrastructure`` — driven adapters: ORM models, repositories, db, redis,
source backends, ML/HTTP clients.
* ``app.api`` — driving adapter: FastAPI routers & schemas.
* ``app.core`` — cross-cutting concerns: config, logging, security.
"""
__version__ = "0.1.0"
+1
View File
@@ -0,0 +1 @@
"""Driving adapter — FastAPI routers, schemas, dependency wiring."""
+30
View File
@@ -0,0 +1,30 @@
"""Shared FastAPI dependencies — the composition root for request-scoped wiring.
Concrete adapters are bound to ports here so routers and services stay
decoupled from infrastructure. Repository/service providers are added in
later steps as the domain grows.
"""
from collections.abc import AsyncIterator
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.infrastructure.db import get_sessionmaker
async def get_session() -> AsyncIterator[AsyncSession]:
"""Request-scoped DB session. Commits on success, rolls back on exception."""
session = get_sessionmaker()()
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
SessionDep = Annotated[AsyncSession, Depends(get_session)]
+50
View File
@@ -0,0 +1,50 @@
"""Maps domain exceptions to HTTP responses. The only place that knows both."""
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from app.core.logging import get_logger
from app.domain.errors import (
AlreadyExistsError,
AuthenticationError,
ConflictError,
DependencyUnavailableError,
DomainError,
NotFoundError,
PermissionDeniedError,
ValidationError,
)
log = get_logger(__name__)
_STATUS_BY_ERROR: dict[type[DomainError], int] = {
NotFoundError: status.HTTP_404_NOT_FOUND,
AlreadyExistsError: status.HTTP_409_CONFLICT,
ConflictError: status.HTTP_409_CONFLICT,
ValidationError: status.HTTP_422_UNPROCESSABLE_CONTENT,
AuthenticationError: status.HTTP_401_UNAUTHORIZED,
PermissionDeniedError: status.HTTP_403_FORBIDDEN,
DependencyUnavailableError: status.HTTP_503_SERVICE_UNAVAILABLE,
}
def _error_body(code: str, message: str) -> dict[str, dict[str, str]]:
return {"error": {"code": code, "message": message}}
def register_exception_handlers(app: FastAPI) -> None:
@app.exception_handler(DomainError)
async def _handle_domain_error(_request: Request, exc: DomainError) -> JSONResponse:
http_status = _STATUS_BY_ERROR.get(type(exc), status.HTTP_400_BAD_REQUEST)
return JSONResponse(
status_code=http_status,
content=_error_body(exc.code, exc.message),
)
@app.exception_handler(Exception)
async def _handle_unexpected(_request: Request, exc: Exception) -> JSONResponse:
log.error("unhandled_exception", exc_info=exc)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=_error_body("internal_error", "An unexpected error occurred."),
)
+83
View File
@@ -0,0 +1,83 @@
"""Health & readiness endpoints — used by compose healthchecks and the admin UI.
* ``/health`` — liveness: the process is up. Always 200 if serving.
* ``/health/ready`` — readiness: checks DB, Redis, and (optionally) ML.
Returns 503 if a *required* dependency is down. ML is optional — its absence
degrades, never fails, readiness (graceful degradation, see plan §6.5).
"""
import asyncio
from typing import Literal
from fastapi import APIRouter, Response, status
from pydantic import BaseModel
from sqlalchemy import text
from app.core.config import get_settings
from app.core.logging import get_logger
from app.infrastructure.cache import get_redis
from app.infrastructure.db import get_sessionmaker
log = get_logger(__name__)
router = APIRouter(tags=["health"])
CheckStatus = Literal["ok", "down", "skipped"]
# A readiness probe must answer fast and never hang — bound every dependency
# check. A check that exceeds this is reported "down".
CHECK_TIMEOUT_SECONDS = 2.0
class HealthResponse(BaseModel):
status: Literal["ok"] = "ok"
class ReadinessResponse(BaseModel):
status: Literal["ready", "degraded"]
checks: dict[str, CheckStatus]
@router.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
return HealthResponse()
async def _check_db() -> CheckStatus:
try:
async with asyncio.timeout(CHECK_TIMEOUT_SECONDS):
async with get_sessionmaker()() as session:
await session.execute(text("SELECT 1"))
return "ok"
except Exception as exc:
log.warning("healthcheck_db_down", error=str(exc))
return "down"
async def _check_redis() -> CheckStatus:
try:
async with asyncio.timeout(CHECK_TIMEOUT_SECONDS):
await get_redis().ping()
return "ok"
except Exception as exc:
log.warning("healthcheck_redis_down", error=str(exc))
return "down"
async def _check_ml() -> CheckStatus:
# Optional dependency. A real client lands in step 12; absence is fine.
return "skipped" if get_settings().ml_service_url is None else "ok"
@router.get("/health/ready", response_model=ReadinessResponse)
async def readiness(response: Response) -> ReadinessResponse:
db, redis, ml = await asyncio.gather(_check_db(), _check_redis(), _check_ml())
checks: dict[str, CheckStatus] = {"database": db, "redis": redis, "ml": ml}
required_down = db == "down" or redis == "down"
if required_down:
response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
return ReadinessResponse(
status="degraded" if required_down else "ready",
checks=checks,
)
+52
View File
@@ -0,0 +1,52 @@
"""HTTP middleware: bind a correlation id and log each request."""
import time
import uuid
from starlette.types import ASGIApp, Message, Receive, Scope, Send
from app.core.logging import correlation_id, get_logger
log = get_logger("http")
_HEADER = "x-correlation-id"
class CorrelationIdMiddleware:
"""Pure-ASGI middleware: reuse inbound ``X-Correlation-Id`` or mint one,
bind it for downstream logs, echo it back, and log request completion."""
def __init__(self, app: ASGIApp) -> None:
self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
headers = dict(scope["headers"])
inbound = headers.get(_HEADER.encode())
cid = inbound.decode() if inbound else uuid.uuid4().hex
token = correlation_id.set(cid)
started = time.perf_counter()
status_code = 0
async def send_wrapper(message: Message) -> None:
nonlocal status_code
if message["type"] == "http.response.start":
status_code = message["status"]
message.setdefault("headers", [])
message["headers"].append((_HEADER.encode(), cid.encode()))
await send(message)
try:
await self.app(scope, receive, send_wrapper)
finally:
log.info(
"request",
method=scope["method"],
path=scope["path"],
status=status_code,
duration_ms=round((time.perf_counter() - started) * 1000, 1),
)
correlation_id.reset(token)
+6
View File
@@ -0,0 +1,6 @@
"""Application layer — use cases / services.
Services orchestrate the domain through *ports* (Protocols defined in
``app.domain``). They never import concrete adapters directly; adapters are
injected at the composition root (``app.main`` / ``app.api.deps``).
"""
+25
View File
@@ -0,0 +1,25 @@
"""Management CLI (``mcma``). Admin/seed commands land here in later steps.
For now it exposes ``mcma version``. ``mcma create-admin`` arrives with auth
(plan §11 step 3).
"""
import argparse
from app import __version__
def main() -> None:
parser = argparse.ArgumentParser(prog="mcma", description="mcma-backend management CLI")
sub = parser.add_subparsers(dest="command")
sub.add_parser("version", help="Print the backend version")
args = parser.parse_args()
if args.command == "version":
print(__version__)
else:
parser.print_help()
if __name__ == "__main__":
main()
+1
View File
@@ -0,0 +1 @@
"""Cross-cutting concerns: configuration, logging, security."""
+74
View File
@@ -0,0 +1,74 @@
"""Application settings — single source of truth, sourced from environment.
Nothing is hardcoded. All knobs come from env vars (or a local ``.env`` during
development). Access the cached singleton via :func:`get_settings`.
"""
from functools import lru_cache
from pathlib import Path
from typing import Literal
from pydantic import Field, SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
case_sensitive=False,
)
# -- runtime ----------------------------------------------------------
environment: Literal["dev", "test", "prod"] = "dev"
log_level: str = "INFO"
log_json: bool = Field(
default=False,
description="Structured JSON logs (enable in prod).",
)
# -- database ---------------------------------------------------------
# Async driver required (asyncpg). Example:
# postgresql+asyncpg://mcma:mcma@db:5432/mcma
database_url: str = "postgresql+asyncpg://mcma:mcma@localhost:5432/mcma"
db_echo: bool = False
db_pool_size: int = 5
db_max_overflow: int = 10
# -- redis (cache + arq broker) --------------------------------------
redis_url: str = "redis://localhost:6379/0"
# -- auth -------------------------------------------------------------
jwt_secret: SecretStr = SecretStr("change-me-in-prod")
jwt_algorithm: str = "HS256"
access_token_ttl_seconds: int = 60 * 15 # 15 min
refresh_token_ttl_seconds: int = 60 * 60 * 24 * 30 # 30 days (offline-first)
# -- media / storage --------------------------------------------------
media_path: Path = Path("/data/media")
transcode_cache_path: Path = Path("/data/transcode-cache")
max_parallel_downloads: int = 2
# -- external services (all optional; graceful degradation) ----------
ml_service_url: str | None = None
acoustid_api_key: SecretStr | None = None
musicbrainz_user_agent: str = "mcma-backend/0.1.0 ( https://github.com/your/repo )"
youtube_cookies_path: Path | None = None
@field_validator("database_url")
@classmethod
def _require_async_driver(cls, v: str) -> str:
if "+asyncpg" not in v:
raise ValueError("database_url must use the asyncpg driver: postgresql+asyncpg://")
return v
@property
def is_prod(self) -> bool:
return self.environment == "prod"
@lru_cache
def get_settings() -> Settings:
"""Cached settings singleton. Patch the cache in tests via ``get_settings.cache_clear()``."""
return Settings()
+52
View File
@@ -0,0 +1,52 @@
"""Structured logging via structlog.
Emits key=value (dev) or JSON (prod) with a per-request/task ``correlation_id``
bound through a contextvar. Call :func:`configure_logging` once at startup.
"""
import logging
from contextvars import ContextVar
from typing import cast
import structlog
from structlog.typing import EventDict, FilteringBoundLogger, Processor, WrappedLogger
# Bound by middleware (HTTP) and worker entrypoints (tasks) so every log line
# downstream carries the same id without explicit passing.
correlation_id: ContextVar[str | None] = ContextVar("correlation_id", default=None)
def _add_correlation_id(
_logger: WrappedLogger, _method: str, event_dict: EventDict
) -> EventDict:
cid = correlation_id.get()
if cid is not None:
event_dict["correlation_id"] = cid
return event_dict
def configure_logging(*, level: str = "INFO", json: bool = False) -> None:
shared: list[Processor] = [
structlog.contextvars.merge_contextvars,
_add_correlation_id,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
]
renderer = (
structlog.processors.JSONRenderer() if json else structlog.dev.ConsoleRenderer(colors=True)
)
structlog.configure(
processors=[*shared, renderer],
wrapper_class=structlog.make_filtering_bound_logger(logging.getLevelName(level.upper())),
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
# Route stdlib logging (uvicorn, sqlalchemy) through structlog formatting.
logging.basicConfig(level=level.upper(), format="%(message)s")
def get_logger(name: str | None = None) -> FilteringBoundLogger:
return cast(FilteringBoundLogger, structlog.get_logger(name))
+5
View File
@@ -0,0 +1,5 @@
"""Pure business core — entities, value objects, errors, and ports (Protocols).
This package MUST NOT import frameworks (FastAPI, SQLAlchemy, redis, …).
Adapters depend on the domain; the domain depends on nothing but stdlib.
"""
+63
View File
@@ -0,0 +1,63 @@
"""Domain exception hierarchy — framework-agnostic.
Services raise these; the API layer maps them to HTTP responses
(see ``app.api.errors``). The domain never references HTTP status codes.
"""
class DomainError(Exception):
"""Base for all expected, business-meaningful failures.
``code`` is a stable, machine-readable identifier returned to clients.
"""
code: str = "domain_error"
def __init__(self, message: str | None = None) -> None:
super().__init__(message or self.__class__.__doc__ or self.code)
self.message = str(self)
class NotFoundError(DomainError):
"""Requested resource does not exist."""
code = "not_found"
class AlreadyExistsError(DomainError):
"""Resource conflicts with an existing one (e.g. duplicate)."""
code = "already_exists"
class ConflictError(DomainError):
"""Operation conflicts with current state (e.g. stale version on write)."""
code = "conflict"
class ValidationError(DomainError):
"""Input is well-formed but violates a business rule."""
code = "validation_error"
class AuthenticationError(DomainError):
"""Caller could not be authenticated."""
code = "authentication_error"
class PermissionDeniedError(DomainError):
"""Caller authenticated but not authorized for this action."""
code = "permission_denied"
class DependencyUnavailableError(DomainError):
"""An external dependency (source, ML, MusicBrainz) is unavailable.
Callers should degrade gracefully rather than propagate as a hard 500.
"""
code = "dependency_unavailable"
+6
View File
@@ -0,0 +1,6 @@
"""Driven adapters — concrete implementations of domain ports.
ORM models, repositories, the async DB engine, the Redis client, source
backends, and external HTTP clients live here. Everything framework- and
vendor-specific is confined to this package.
"""
+5
View File
@@ -0,0 +1,5 @@
"""Redis adapter: shared connection pool for cache and arq broker."""
from app.infrastructure.cache.redis import close_redis, get_redis
__all__ = ["close_redis", "get_redis"]
+26
View File
@@ -0,0 +1,26 @@
"""Redis client provider — a single shared async connection pool."""
from redis.asyncio import Redis
from app.core.config import get_settings
_client: Redis | None = None
def get_redis() -> Redis:
"""Return the process-wide Redis client (created on first use)."""
global _client
if _client is None:
_client = Redis.from_url(
str(get_settings().redis_url),
encoding="utf-8",
decode_responses=True,
)
return _client
async def close_redis() -> None:
global _client
if _client is not None:
await _client.aclose()
_client = None
+17
View File
@@ -0,0 +1,17 @@
"""Database adapter: declarative base, async engine, session factory."""
from app.infrastructure.db.base import Base
from app.infrastructure.db.engine import (
dispose_engine,
get_engine,
get_sessionmaker,
session_scope,
)
__all__ = [
"Base",
"dispose_engine",
"get_engine",
"get_sessionmaker",
"session_scope",
]
+22
View File
@@ -0,0 +1,22 @@
"""Declarative base with a fixed naming convention.
The naming convention makes Alembic autogenerate deterministic, named
constraints — essential for clean, reversible migrations.
"""
from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeBase
NAMING_CONVENTION = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}
class Base(DeclarativeBase):
"""Base for all ORM models. Import models so Alembic sees their metadata."""
metadata = MetaData(naming_convention=NAMING_CONVENTION)
+61
View File
@@ -0,0 +1,61 @@
"""Async engine + session factory, created lazily from settings.
The engine is process-global and cached. The API binds a session per request
(see ``app.api.deps``); workers and scripts use :func:`session_scope`.
"""
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from functools import lru_cache
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from app.core.config import get_settings
@lru_cache
def get_engine() -> AsyncEngine:
settings = get_settings()
return create_async_engine(
settings.database_url,
echo=settings.db_echo,
pool_size=settings.db_pool_size,
max_overflow=settings.db_max_overflow,
pool_pre_ping=True, # survive Postgres restarts / idle drops
)
@lru_cache
def get_sessionmaker() -> async_sessionmaker[AsyncSession]:
return async_sessionmaker(
bind=get_engine(),
expire_on_commit=False,
autoflush=False,
)
@asynccontextmanager
async def session_scope() -> AsyncIterator[AsyncSession]:
"""Transactional session for workers/scripts: commit on success, rollback on error."""
session = get_sessionmaker()()
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def dispose_engine() -> None:
"""Dispose the pooled engine on shutdown. Safe to call if never initialized."""
if get_engine.cache_info().currsize:
await get_engine().dispose()
get_engine.cache_clear()
get_sessionmaker.cache_clear()
+51
View File
@@ -0,0 +1,51 @@
"""FastAPI composition root: wiring, lifespan, middleware, routers."""
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.api.errors import register_exception_handlers
from app.api.health import router as health_router
from app.api.middleware import CorrelationIdMiddleware
from app.core.config import get_settings
from app.core.logging import configure_logging, get_logger
from app.infrastructure.cache import close_redis
from app.infrastructure.db import dispose_engine
log = get_logger(__name__)
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
settings = get_settings()
log.info("startup", environment=settings.environment)
yield
log.info("shutdown")
await dispose_engine()
await close_redis()
def create_app() -> FastAPI:
settings = get_settings()
configure_logging(level=settings.log_level, json=settings.log_json)
app = FastAPI(
title="mcma-backend",
version="0.1.0",
summary="Self-hosted, offline-first music service.",
lifespan=lifespan,
)
app.add_middleware(CorrelationIdMiddleware)
register_exception_handlers(app)
app.include_router(health_router)
# Versioned API routers (auth, library, …) are mounted in later steps:
# app.include_router(api_v1_router, prefix="/api/v1")
# app.include_router(subsonic_router, prefix="/rest")
return app
app = create_app()
+4
View File
@@ -0,0 +1,4 @@
"""arq worker — background tasks (downloads, enrichment, transcoding).
CPU/IO-heavy work runs here, never in the request cycle (plan §2.6).
"""
+32
View File
@@ -0,0 +1,32 @@
"""arq worker settings — the queue runtime. Task functions register here.
Run with: ``arq app.workers.arq_worker.WorkerSettings``.
Tasks (download, enrich, transcode) are appended to ``functions`` in later steps.
"""
from typing import Any, ClassVar
from arq.connections import RedisSettings
from app.core.config import get_settings
from app.core.logging import configure_logging, get_logger
log = get_logger("worker")
async def startup(_ctx: dict[str, Any]) -> None:
settings = get_settings()
configure_logging(level=settings.log_level, json=settings.log_json)
log.info("worker_startup", environment=settings.environment)
async def shutdown(_ctx: dict[str, Any]) -> None:
log.info("worker_shutdown")
class WorkerSettings:
functions: ClassVar[list[Any]] = [] # populated as tasks are implemented
on_startup = startup
on_shutdown = shutdown
max_jobs = get_settings().max_parallel_downloads
redis_settings = RedisSettings.from_dsn(str(get_settings().redis_url))
+97
View File
@@ -0,0 +1,97 @@
# Dev/prod stack. Worker reuses the api image with a different command.
# The ML service is intentionally a commented placeholder — it is optional
# (graceful degradation) and lands in a later phase.
services:
api:
build: .
command: uvicorn app.main:app --host 0.0.0.0 --port 8000
env_file: .env
environment:
DATABASE_URL: postgresql+asyncpg://mcma:mcma@db:5432/mcma
REDIS_URL: redis://redis:6379/0
ports:
- "8000:8000"
volumes:
- media:/data/media
- transcode_cache:/data/transcode-cache
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8000/health').status==200 else 1)"]
interval: 30s
timeout: 5s
retries: 3
restart: unless-stopped
worker:
build: .
command: arq app.workers.arq_worker.WorkerSettings
env_file: .env
environment:
DATABASE_URL: postgresql+asyncpg://mcma:mcma@db:5432/mcma
REDIS_URL: redis://redis:6379/0
volumes:
- media:/data/media
- transcode_cache:/data/transcode-cache
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
restart: unless-stopped
db:
image: postgres:16-alpine
environment:
POSTGRES_USER: mcma
POSTGRES_PASSWORD: mcma
POSTGRES_DB: mcma
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U mcma"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
redis:
image: redis:7-alpine
command: redis-server --save 60 1 --loglevel warning
volumes:
- redisdata:/data
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
# Reverse proxy + auto-HTTPS (enable on prod with a real domain).
# caddy:
# image: caddy:2-alpine
# ports: ["80:80", "443:443"]
# volumes:
# - ./Caddyfile:/etc/caddy/Caddyfile
# - caddy_data:/data
# depends_on: [api]
# restart: unless-stopped
# ML recommendation service — OPTIONAL, added in a later phase.
# The backend must run fully without it (set ML_SERVICE_URL to enable).
# ml:
# image: mcma-ml:latest
# environment:
# MODEL_PATH: /models
# restart: unless-stopped
volumes:
pgdata:
redisdata:
media:
transcode_cache:
# caddy_data:
+87
View File
@@ -0,0 +1,87 @@
[project]
name = "mcma-backend"
version = "0.1.0"
description = "Self-hosted, offline-first music service — backend (hexagonal architecture)"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
# web
"fastapi>=0.115",
"uvicorn[standard]>=0.32",
"python-multipart>=0.0.12",
# data
"sqlalchemy[asyncio]>=2.0.36",
"asyncpg>=0.30",
"alembic>=1.14",
# config / validation
"pydantic>=2.10",
"pydantic-settings>=2.6",
# cache / queue
"redis>=5.2",
"arq>=0.26",
# auth
"pyjwt>=2.10",
"pwdlib[argon2]>=0.2.1",
# outbound http (ML client, MusicBrainz, AcoustID)
"httpx>=0.28",
# logging
"structlog>=24.4",
]
[project.scripts]
mcma = "app.cli:main"
[dependency-groups]
dev = [
"pytest>=8.3",
"pytest-asyncio>=0.24",
"ruff>=0.8",
"mypy>=1.13",
"asgi-lifespan>=2.1",
]
[tool.uv]
package = true
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["app"]
# ---------------------------------------------------------------------------
# Tooling
# ---------------------------------------------------------------------------
[tool.ruff]
target-version = "py314"
line-length = 100
src = ["app", "tests"]
[tool.ruff.lint]
select = [
"E", "F", "W", # pycodestyle / pyflakes
"I", # isort
"N", # pep8-naming
"UP", # pyupgrade
"B", # bugbear
"ASYNC", # async pitfalls
"RUF",
]
ignore = ["B008"] # FastAPI Depends() in defaults is idiomatic
[tool.mypy]
python_version = "3.14"
strict = true
plugins = ["pydantic.mypy"]
warn_unused_ignores = true
disallow_untyped_defs = true
[[tool.mypy.overrides]]
module = ["arq.*"]
ignore_missing_imports = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "-ra"
View File
+28
View File
@@ -0,0 +1,28 @@
"""Shared test fixtures.
The ASGI app is driven in-process via httpx + asgi-lifespan (no network, no
running server). DB/Redis-backed integration fixtures arrive with the data
layer (plan §11 step 2).
"""
import os
from collections.abc import AsyncIterator
import pytest
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
# Force a test-safe environment before settings load.
os.environ.setdefault("ENVIRONMENT", "test")
os.environ.setdefault("JWT_SECRET", "test-secret")
@pytest.fixture
async def client() -> AsyncIterator[AsyncClient]:
from app.main import create_app
app = create_app()
async with LifespanManager(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
+22
View File
@@ -0,0 +1,22 @@
"""Smoke tests for the health endpoints."""
from httpx import AsyncClient
async def test_liveness_ok(client: AsyncClient) -> None:
resp = await client.get("/health")
assert resp.status_code == 200
assert resp.json() == {"status": "ok"}
async def test_correlation_id_echoed(client: AsyncClient) -> None:
resp = await client.get("/health", headers={"X-Correlation-Id": "abc123"})
assert resp.headers.get("x-correlation-id") == "abc123"
async def test_readiness_reports_checks(client: AsyncClient) -> None:
# No DB/Redis running in unit env → degraded, but the contract holds.
resp = await client.get("/health/ready")
body = resp.json()
assert set(body["checks"]) == {"database", "redis", "ml"}
assert body["status"] in {"ready", "degraded"}
Generated
+1052
View File
File diff suppressed because it is too large Load Diff