Files
Senko-san 7a17e3babd feat(subsonic): per-user encrypted app-password foundation
Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret,
but login passwords are stored as a one-way argon2 hash. Add a separate,
per-user app-password: high-entropy, random, and encrypted at rest with a
Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB).

- SubsonicPasswordCipher + generate_subsonic_password in core.security
- users.subsonic_password_enc column (+ Alembic migration), repo + port methods
- SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle
- self-service GET/POST /users/me/subsonic-password + admin rotate endpoint
- domain SubsonicCredentials + SubsonicCipher port; deps wiring

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:23:19 +03:00

113 lines
4.0 KiB
Python

"""User repository — adapter over ``AsyncSession`` implementing
``app.domain.ports.UserRepository``.
Translates between ORM rows (``UserModel``) and domain entities (``User`` /
``Credentials``). The domain never sees ORM objects.
"""
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities import Credentials, SubsonicCredentials, User
from app.domain.errors import NotFoundError
from app.infrastructure.db.models import UserModel
def _to_entity(row: UserModel) -> User:
return User(
id=row.id,
username=row.username,
is_superuser=row.is_superuser,
is_active=row.is_active,
created_at=row.created_at,
updated_at=row.updated_at,
)
class SqlAlchemyUserRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def _get_row(self, user_id: uuid.UUID) -> UserModel:
row = await self._session.get(UserModel, user_id)
if row is None:
raise NotFoundError("User not found.")
return row
async def get_by_id(self, user_id: uuid.UUID) -> User | None:
row = await self._session.get(UserModel, user_id)
return _to_entity(row) if row is not None else None
async def get_credentials_by_username(self, username: str) -> Credentials | None:
row = (
await self._session.execute(select(UserModel).where(UserModel.username == username))
).scalar_one_or_none()
if row is None:
return None
return Credentials(user=_to_entity(row), password_hash=row.password_hash)
async def add(self, *, username: str, password_hash: str, is_superuser: bool) -> User:
row = UserModel(
username=username,
password_hash=password_hash,
is_superuser=is_superuser,
is_active=True,
)
self._session.add(row)
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def list(self, *, limit: int, offset: int) -> list[User]:
rows = (
await self._session.execute(
select(UserModel).order_by(UserModel.created_at).limit(limit).offset(offset)
)
).scalars()
return [_to_entity(row) for row in rows]
async def set_password_hash(self, user_id: uuid.UUID, password_hash: str) -> None:
row = await self._get_row(user_id)
row.password_hash = password_hash
await self._session.flush()
async def set_superuser(self, user_id: uuid.UUID, is_superuser: bool) -> User:
row = await self._get_row(user_id)
row.is_superuser = is_superuser
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def set_active(self, user_id: uuid.UUID, is_active: bool) -> User:
row = await self._get_row(user_id)
row.is_active = is_active
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def count(self) -> int:
return (
await self._session.execute(select(func.count()).select_from(UserModel))
).scalar_one()
async def get_subsonic_credentials_by_username(
self, username: str
) -> SubsonicCredentials | None:
row = (
await self._session.execute(select(UserModel).where(UserModel.username == username))
).scalar_one_or_none()
if row is None:
return None
return SubsonicCredentials(user=_to_entity(row), password_enc=row.subsonic_password_enc)
async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None:
row = await self._get_row(user_id)
return row.subsonic_password_enc
async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None:
row = await self._get_row(user_id)
row.subsonic_password_enc = password_enc
await self._session.flush()