from fastapi import Depends, HTTPException, status from typing import Annotated from sqlalchemy.orm import Session from uuid import UUID import redis import asyncio from ...dependencies import get_db, get_pubsub, get_redis from ...db import models from ..auth import services as auth_services from ..auth import schemas as auth_schemas from . import schemas def get_queue_by_id( queue_id: UUID, db: Annotated[Session, Depends(get_db)] ) -> models.Queue: q = db.query(models.Queue).filter(models.Queue.id == queue_id).first() return q def get_owned_queues( current_user: Annotated[auth_schemas.User, Depends(auth_services.get_current_user)] ) -> list[schemas.QueueInDb]: return [schemas.QueueInDb.model_validate(q) for q in current_user.owns_queues] def get_user_queues( current_user: Annotated[auth_schemas.User, Depends(auth_services.get_anon_user)] ) -> list[schemas.QueueInDb]: return [ schemas.QueueInDb.model_validate(q.queue) for q in current_user.parts_in_queues.filter(models.QueueUser.passed == False) ] def create_queue( new_queue: schemas.Queue, current_user: auth_schemas.UserInDB, db: Session, ) -> schemas.QueueInDb: q = models.Queue( name=new_queue.name, description=new_queue.description, owner_id=current_user.id, status="created", ) db.add(q) db.commit() return schemas.QueueInDb.model_validate(q) def get_detailed_queue( queue_id: UUID, db: Annotated[Session, Depends(get_db)], ) -> schemas.QueueDetail: q = db.query(models.Queue).filter(models.Queue.id == queue_id).first() if q: return schemas.QueueDetail( id=q.id, name=q.name, description=q.description, status=q.status, owner_id=q.owner_id, participants=schemas.ParticipantInfo( total=q.users.count(), remaining=q.users.filter(models.QueueUser.passed == False).count(), users_list=q.users.filter(models.QueueUser.passed == False).order_by( models.QueueUser.position.asc() ), ), ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Not Found", ) async def join_queue( queue_id: UUID, client: Annotated[auth_schemas.AnonUser, Depends(auth_services.get_anon_user)], db: Annotated[Session, Depends(get_db)], r: Annotated[redis.client.Redis, Depends(get_redis)], ) -> schemas.QueueUser: q = get_queue_by_id(queue_id, db) if q: if not q.users.filter(models.QueueUser.user_id == client.id).first(): last_qu = q.users.order_by(models.QueueUser.position.desc()).first() position = last_qu.position + 1 if last_qu else 0 new_qu = models.QueueUser( user_id=client.id, queue_id=q.id, position=position ) db.add(new_qu) db.commit() await r.publish(str(queue_id), "updated") return new_qu raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Already joined", ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Not Found", ) async def set_queue_listener( queue_id: UUID, db: Annotated[Session, Depends(get_db)], ps: Annotated[redis.client.PubSub, Depends(get_pubsub)], ) -> schemas.QueueDetail: await ps.subscribe(str(queue_id)) async for m in ps.listen(): if m.get("data", None) == b"updated": break await ps.unsubscribe() new_queue = get_detailed_queue(queue_id=queue_id, db=db) return new_queue async def get_queue_owner( queue: Annotated[models.Queue, Depends(get_queue_by_id)] ) -> auth_schemas.UserInDB: return queue.owner if queue else None async def verify_queue_owner( queue_owner: Annotated[auth_schemas.UserInDB, Depends(get_queue_owner)], current_user: Annotated[ auth_schemas.UserInDB, Depends(auth_services.get_current_user) ], ) -> bool: return queue_owner.id == current_user.id if queue_owner and current_user else False async def rebuild_queue( queue: Annotated[models.Queue, Depends(get_queue_by_id)], db: Annotated[Session, Depends(get_db)], ): for i, qu in enumerate( queue.users.filter(models.QueueUser.passed == False).order_by( models.QueueUser.position.asc() ) ): setattr(qu, "position", i) db.commit() async def kick_first( is_owner: Annotated[bool, Depends(verify_queue_owner)], queue: Annotated[models.Queue, Depends(get_queue_by_id)], db: Annotated[Session, Depends(get_db)], ): if is_owner: first_user = ( queue.users.filter(models.QueueUser.passed == False) .order_by(models.QueueUser.position.asc()) .first() ) if first_user: setattr(first_user, "passed", True) db.commit() await rebuild_queue(queue=queue, db=db) return raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No first user", ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="You are not a queue owner!", ) async def start_queue( is_owner: Annotated[bool, Depends(verify_queue_owner)], queue: Annotated[models.Queue, Depends(get_queue_by_id)], db: Annotated[Session, Depends(get_db)], ): if queue and is_owner: setattr(queue, "status", "active") db.commit() await rebuild_queue(queue=queue, db=db) return raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="You are not a queue owner!", ) async def pass_queueuser( queue: Annotated[models.Queue, Depends(get_queue_by_id)], anon_user: Annotated[auth_schemas.AnonUser, Depends(auth_services.get_anon_user)], db: Annotated[Session, Depends(get_db)], ): if anon_user: qu = ( db.query(models.QueueUser) .filter( models.QueueUser.queue_id == queue.id, models.QueueUser.user_id == anon_user.id, ) .first() ) if qu: setattr(qu, "passed", True) db.commit() await rebuild_queue(queue=queue, db=db) return raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found!", ) async def action_wrapper( action: str, is_owner: Annotated[bool, Depends(verify_queue_owner)], queue: Annotated[models.Queue, Depends(get_queue_by_id)], anon_user: Annotated[auth_schemas.AnonUser, Depends(auth_services.get_anon_user)], db: Annotated[Session, Depends(get_db)], r: Annotated[redis.client.Redis, Depends(get_redis)], ) -> schemas.ActionResult: if queue: if action == "kick-first": await kick_first(is_owner=is_owner, queue=queue, db=db) await r.publish(str(queue.id), "updated") return {"action": action, "status": "success"} if action == "pass": await pass_queueuser(queue=queue, anon_user=anon_user, db=db) await r.publish(str(queue.id), "updated") return {"action": action, "status": "success"} if action == "start": await start_queue(queue=queue, is_owner=is_owner, db=db) await r.publish(str(queue.id), "updated") return {"action": action, "status": "success"} raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Action not found!", ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Queue not found!", )