Files
Senko-san 7a17e3babd feat(subsonic): per-user encrypted app-password foundation
Subsonic auth (t=md5(password+salt), legacy p=) needs a recoverable secret,
but login passwords are stored as a one-way argon2 hash. Add a separate,
per-user app-password: high-entropy, random, and encrypted at rest with a
Fernet key derived from SUBSONIC_SECRET_KEY (never stored in the DB).

- SubsonicPasswordCipher + generate_subsonic_password in core.security
- users.subsonic_password_enc column (+ Alembic migration), repo + port methods
- SubsonicAuthService: verify (t+s / p / p=enc:) and rotate/reveal lifecycle
- self-service GET/POST /users/me/subsonic-password + admin rotate endpoint
- domain SubsonicCredentials + SubsonicCipher port; deps wiring

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:23:19 +03:00

120 lines
4.0 KiB
Python

"""In-memory port implementations for fast, DB-free unit tests."""
import datetime as dt
import uuid
from dataclasses import dataclass, replace
from app.domain.entities import Credentials, SubsonicCredentials, User
@dataclass
class _Stored:
user: User
password_hash: str
subsonic_password_enc: str | None = None
class InMemoryUserRepository:
def __init__(self) -> None:
self._by_id: dict[uuid.UUID, _Stored] = {}
async def get_by_id(self, user_id: uuid.UUID) -> User | None:
stored = self._by_id.get(user_id)
return stored.user if stored else None
async def get_credentials_by_username(self, username: str) -> Credentials | None:
for stored in self._by_id.values():
if stored.user.username == username:
return Credentials(user=stored.user, password_hash=stored.password_hash)
return None
async def add(self, *, username: str, password_hash: str, is_superuser: bool) -> User:
now = dt.datetime.now(dt.UTC)
user = User(
id=uuid.uuid4(),
username=username,
is_superuser=is_superuser,
is_active=True,
created_at=now,
updated_at=now,
)
self._by_id[user.id] = _Stored(user=user, password_hash=password_hash)
return user
async def list(self, *, limit: int, offset: int) -> list[User]:
users = [s.user for s in self._by_id.values()]
users.sort(key=lambda u: u.created_at)
return users[offset : offset + limit]
async def set_password_hash(self, user_id: uuid.UUID, password_hash: str) -> None:
self._by_id[user_id].password_hash = password_hash
async def set_superuser(self, user_id: uuid.UUID, is_superuser: bool) -> User:
stored = self._by_id[user_id]
stored.user = replace(stored.user, is_superuser=is_superuser)
return stored.user
async def set_active(self, user_id: uuid.UUID, is_active: bool) -> User:
stored = self._by_id[user_id]
stored.user = replace(stored.user, is_active=is_active)
return stored.user
async def count(self) -> int:
return len(self._by_id)
async def get_subsonic_credentials_by_username(
self, username: str
) -> SubsonicCredentials | None:
for stored in self._by_id.values():
if stored.user.username == username:
return SubsonicCredentials(
user=stored.user, password_enc=stored.subsonic_password_enc
)
return None
async def get_subsonic_password_enc(self, user_id: uuid.UUID) -> str | None:
return self._by_id[user_id].subsonic_password_enc
async def set_subsonic_password_enc(self, user_id: uuid.UUID, password_enc: str) -> None:
self._by_id[user_id].subsonic_password_enc = password_enc
@dataclass
class _Token:
user_id: uuid.UUID
token_hash: str
expires_at: dt.datetime
revoked_at: dt.datetime | None = None
class InMemoryRefreshTokenRepository:
def __init__(self) -> None:
self._by_jti: dict[uuid.UUID, _Token] = {}
async def add(
self,
*,
jti: uuid.UUID,
user_id: uuid.UUID,
token_hash: str,
expires_at: dt.datetime,
) -> None:
self._by_jti[jti] = _Token(user_id=user_id, token_hash=token_hash, expires_at=expires_at)
async def is_valid(self, jti: uuid.UUID) -> bool:
token = self._by_jti.get(jti)
if token is None or token.revoked_at is not None:
return False
return token.expires_at > dt.datetime.now(dt.UTC)
async def revoke(self, jti: uuid.UUID) -> None:
token = self._by_jti.get(jti)
if token and token.revoked_at is None:
token.revoked_at = dt.datetime.now(dt.UTC)
async def revoke_all_for_user(self, user_id: uuid.UUID) -> None:
now = dt.datetime.now(dt.UTC)
for token in self._by_jti.values():
if token.user_id == user_id and token.revoked_at is None:
token.revoked_at = now