Files
mcma-backend/app/infrastructure/db/repositories/user_repository.py
T
2026-06-03 10:40:00 +03:00

94 lines
3.2 KiB
Python

"""User repository — adapter over ``AsyncSession`` implementing
``app.domain.ports.UserRepository``.
Translates between ORM rows (``UserModel``) and domain entities (``User`` /
``Credentials``). The domain never sees ORM objects.
"""
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities import Credentials, User
from app.domain.errors import NotFoundError
from app.infrastructure.db.models import UserModel
def _to_entity(row: UserModel) -> User:
return User(
id=row.id,
username=row.username,
is_superuser=row.is_superuser,
is_active=row.is_active,
created_at=row.created_at,
updated_at=row.updated_at,
)
class SqlAlchemyUserRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def _get_row(self, user_id: uuid.UUID) -> UserModel:
row = await self._session.get(UserModel, user_id)
if row is None:
raise NotFoundError("User not found.")
return row
async def get_by_id(self, user_id: uuid.UUID) -> User | None:
row = await self._session.get(UserModel, user_id)
return _to_entity(row) if row is not None else None
async def get_credentials_by_username(self, username: str) -> Credentials | None:
row = (
await self._session.execute(select(UserModel).where(UserModel.username == username))
).scalar_one_or_none()
if row is None:
return None
return Credentials(user=_to_entity(row), password_hash=row.password_hash)
async def add(self, *, username: str, password_hash: str, is_superuser: bool) -> User:
row = UserModel(
username=username,
password_hash=password_hash,
is_superuser=is_superuser,
is_active=True,
)
self._session.add(row)
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def list(self, *, limit: int, offset: int) -> list[User]:
rows = (
await self._session.execute(
select(UserModel).order_by(UserModel.created_at).limit(limit).offset(offset)
)
).scalars()
return [_to_entity(row) for row in rows]
async def set_password_hash(self, user_id: uuid.UUID, password_hash: str) -> None:
row = await self._get_row(user_id)
row.password_hash = password_hash
await self._session.flush()
async def set_superuser(self, user_id: uuid.UUID, is_superuser: bool) -> User:
row = await self._get_row(user_id)
row.is_superuser = is_superuser
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def set_active(self, user_id: uuid.UUID, is_active: bool) -> User:
row = await self._get_row(user_id)
row.is_active = is_active
await self._session.flush()
await self._session.refresh(row)
return _to_entity(row)
async def count(self) -> int:
return (
await self._session.execute(select(func.count()).select_from(UserModel))
).scalar_one()