Files
Senko-san 7a17e3babd feat(subsonic): per-user encrypted app-password foundation
Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret,
but login passwords are stored as a one-way argon2 hash. Add a separate,
per-user app-password: high-entropy, random, and encrypted at rest with a
Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB).

- SubsonicPasswordCipher + generate_subsonic_password in core.security
- users.subsonic_password_enc column (+ Alembic migration), repo + port methods
- SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle
- self-service GET/POST /users/me/subsonic-password + admin rotate endpoint
- domain SubsonicCredentials + SubsonicCipher port; deps wiring

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:23:19 +03:00

115 lines
3.6 KiB
Python

"""Admin endpoints: user management, services, sources, reindex, settings.
Registration is admin-only — this is a private instance, there is no public
sign-up (plan §6.4).
"""
import uuid
from typing import Any
from fastapi import APIRouter, Query, status
from app.api.deps import SubsonicAuthServiceDep, SuperUser, UserServiceDep
from app.api.schemas.subsonic import SubsonicPasswordResponse
from app.api.schemas.user import (
CreateUserRequest,
ResetPasswordRequest,
UpdateUserRequest,
UserResponse,
)
# Router collecting every handler in this module; all paths below are
# registered under the "/admin" prefix and tagged "admin".
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/users", response_model=list[UserResponse])
async def list_users(
    _admin: SuperUser,
    users: UserServiceDep,
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> list[UserResponse]:
    """Return one page of users as response models.

    ``limit`` is validated to the range 1..200 (default 50) and ``offset``
    must be non-negative (default 0); both are forwarded unchanged to the
    user service.
    """
    page = await users.list_users(limit=limit, offset=offset)
    # Convert each entity returned by the service into its API schema.
    return list(map(UserResponse.from_entity, page))
@router.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    body: CreateUserRequest,
    _admin: SuperUser,
    users: UserServiceDep,
) -> UserResponse:
    """Create a user from the request body and return it (HTTP 201).

    The username, password and superuser flag are taken from ``body`` and
    handed to the user service as keyword arguments.
    """
    return UserResponse.from_entity(
        await users.create_user(
            username=body.username,
            password=body.password,
            is_superuser=body.is_superuser,
        )
    )
@router.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: uuid.UUID,
    _admin: SuperUser,
    users: UserServiceDep,
) -> UserResponse:
    """Look up a single user by id and return it as a response model."""
    found = await users.get_user(user_id)
    return UserResponse.from_entity(found)
@router.patch("/users/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: uuid.UUID,
    body: UpdateUserRequest,
    _admin: SuperUser,
    users: UserServiceDep,
) -> UserResponse:
    """Apply the optional flags in ``body`` to a user and return the result.

    The user is always fetched first, so the lookup runs even when the body
    changes nothing. ``is_superuser`` is applied before ``is_active``; a
    field left as ``None`` is skipped. The entity returned by the last
    service call performed is the one serialized.
    """
    current = await users.get_user(user_id)

    if (superuser_flag := body.is_superuser) is not None:
        current = await users.set_superuser(user_id, is_superuser=superuser_flag)

    if (active_flag := body.is_active) is not None:
        current = await users.set_active(user_id, is_active=active_flag)

    return UserResponse.from_entity(current)
@router.post("/users/{user_id}/reset-password", status_code=status.HTTP_204_NO_CONTENT)
async def reset_password(
    user_id: uuid.UUID,
    body: ResetPasswordRequest,
    _admin: SuperUser,
    users: UserServiceDep,
) -> None:
    """Set a new password for the given user; responds with HTTP 204 and no body."""
    replacement = body.new_password
    await users.reset_password(user_id, new_password=replacement)
@router.delete("/users/{user_id}", response_model=UserResponse)
async def deactivate_user(
    user_id: uuid.UUID,
    _admin: SuperUser,
    users: UserServiceDep,
) -> UserResponse:
    """Soft delete — deactivates the account and revokes its sessions.

    The work is delegated to the user service's ``deactivate`` call; the
    entity it returns is sent back as the response.
    """
    deactivated = await users.deactivate(user_id)
    return UserResponse.from_entity(deactivated)
@router.post("/users/{user_id}/subsonic-password", response_model=SubsonicPasswordResponse)
async def rotate_user_subsonic_password(
    user_id: uuid.UUID,
    _admin: SuperUser,
    subsonic: SubsonicAuthServiceDep,
) -> SubsonicPasswordResponse:
    """Rotate any user's Subsonic app-password and return the new plaintext.

    The value produced by the Subsonic auth service's ``rotate`` call is
    placed in the ``password`` field of the response.
    """
    fresh_password = await subsonic.rotate(user_id)
    return SubsonicPasswordResponse(password=fresh_password)
@router.get("/services")
async def list_services(_admin: SuperUser) -> Any:
    """Stub for the ``/services`` GET route: no logic yet, returns ``None``.

    Only the ``SuperUser`` dependency is declared.
    """
    return None
@router.get("/sources")
async def list_admin_sources(_admin: SuperUser) -> Any:
    """Stub for the ``/sources`` GET route: no logic yet, returns ``None``.

    Only the ``SuperUser`` dependency is declared.
    """
    return None
@router.patch("/sources/{source}")
async def update_admin_source(source: str, _admin: SuperUser) -> Any:
    """Stub for the ``/sources/{source}`` PATCH route: returns ``None``.

    The ``source`` path parameter is accepted but not used yet.
    """
    return None
@router.post("/reindex")
async def trigger_reindex(_admin: SuperUser) -> Any:
    """Stub for the ``/reindex`` POST route: no logic yet, returns ``None``.

    Only the ``SuperUser`` dependency is declared.
    """
    return None
@router.get("/settings")
async def get_admin_settings(_admin: SuperUser) -> Any:
    """Stub for the ``/settings`` GET route: no logic yet, returns ``None``.

    Only the ``SuperUser`` dependency is declared.
    """
    return None
@router.patch("/settings")
async def update_admin_settings(_admin: SuperUser) -> Any:
    """Stub for the ``/settings`` PATCH route: no logic yet, returns ``None``.

    Only the ``SuperUser`` dependency is declared; no request body is read.
    """
    return None