"""Track repository — adapter over ``AsyncSession``.""" import uuid from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.domain.entities.track import Track from app.domain.errors import NotFoundError from app.infrastructure.db.models.artist import ArtistModel from app.infrastructure.db.models.track import TrackModel def _to_entity(row: TrackModel) -> Track: return Track( id=row.id, title=row.title, artist_id=row.artist_id, album_id=row.album_id, file_path=row.file_path, file_format=row.file_format, file_size=row.file_size, source=row.source, source_id=row.source_id, duration_seconds=row.duration_seconds, genre=row.genre, year=row.year, metadata_status=row.metadata_status, created_at=row.created_at, updated_at=row.updated_at, ) class SqlAlchemyTrackRepository: def __init__(self, session: AsyncSession) -> None: self._session = session async def get_by_id(self, track_id: uuid.UUID) -> Track | None: row = await self._session.get(TrackModel, track_id) return _to_entity(row) if row is not None else None async def get_by_source(self, source: str, source_id: str) -> Track | None: row = ( await self._session.execute( select(TrackModel).where( TrackModel.source == source, TrackModel.source_id == source_id, ) ) ).scalar_one_or_none() return _to_entity(row) if row is not None else None async def add( self, *, id: uuid.UUID, title: str, artist_id: uuid.UUID, file_path: str, file_format: str, file_size: int, source: str, source_id: str, metadata_status: str, added_by: uuid.UUID | None, ) -> Track: row = TrackModel( id=id, title=title, artist_id=artist_id, file_path=file_path, file_format=file_format, file_size=file_size, source=source, source_id=source_id, metadata_status=metadata_status, added_by=added_by, ) self._session.add(row) await self._session.flush() await self._session.refresh(row) return _to_entity(row) async def delete(self, track_id: uuid.UUID) -> None: row = await self._session.get(TrackModel, track_id) if row is not None: await self._session.delete(row) await self._session.flush() async def list( self, *, artist_id: uuid.UUID | None, album_id: uuid.UUID | None, q: str | None, sort_by: str = "created_at", order: str = "desc", limit: int = 50, offset: int = 0, ) -> list[Track]: stmt = select(TrackModel) if artist_id is not None: stmt = stmt.where(TrackModel.artist_id == artist_id) if album_id is not None: stmt = stmt.where(TrackModel.album_id == album_id) if q: stmt = stmt.where(TrackModel.title.ilike(f"%{q}%")) if sort_by == "artist": stmt = stmt.join(ArtistModel, TrackModel.artist_id == ArtistModel.id) col_artist = ArtistModel.name stmt = stmt.order_by(col_artist.asc() if order == "asc" else col_artist.desc()) elif sort_by == "title": col_title = TrackModel.title stmt = stmt.order_by(col_title.asc() if order == "asc" else col_title.desc()) else: stmt = stmt.order_by( TrackModel.created_at.asc() if order == "asc" else TrackModel.created_at.desc() ) stmt = stmt.limit(limit).offset(offset) rows = (await self._session.execute(stmt)).scalars().all() return [_to_entity(r) for r in rows] async def count( self, *, artist_id: uuid.UUID | None, album_id: uuid.UUID | None, q: str | None, ) -> int: stmt = select(func.count()).select_from(TrackModel) if artist_id is not None: stmt = stmt.where(TrackModel.artist_id == artist_id) if album_id is not None: stmt = stmt.where(TrackModel.album_id == album_id) if q: stmt = stmt.where(TrackModel.title.ilike(f"%{q}%")) return (await self._session.execute(stmt)).scalar_one() async def update( self, track_id: uuid.UUID, *, title: str | None, genre: str | None, year: int | None, ) -> Track: row = await self._session.get(TrackModel, track_id) if row is None: raise NotFoundError(f"Track {track_id} not found.") if title is not None: row.title = title if genre is not None: row.genre = genre if year is not None: row.year = year row.metadata_status = "manual" await self._session.flush() await self._session.refresh(row) return _to_entity(row)