from fastapi import Depends, HTTPException, status from typing import Annotated from sqlalchemy.orm import Session, joinedload from sqlalchemy import func from uuid import UUID import redis import asyncio from ...dependencies import get_db, get_pubsub, get_redis from ...db import models from ..auth import services as auth_services from ..auth import schemas as auth_schemas from . import schemas def get_queue_by_id( queue_id: UUID, db: Annotated[Session, Depends(get_db)] ) -> models.Queue: q = db.query(models.Queue).filter(models.Queue.id == queue_id).first() return q def get_owned_queues( current_user: Annotated[auth_schemas.User, Depends(auth_services.get_current_user)] ) -> list[schemas.QueueInDb]: return [schemas.QueueInDb.model_validate(q) for q in current_user.owns_queues] def get_user_queues( current_user: Annotated[auth_schemas.User, Depends(auth_services.get_anon_user)] ) -> list[schemas.QueueInDb]: return [ schemas.QueueInDb.model_validate(q.queue) for q in current_user.parts_in_queues.filter(models.QueueUser.passed == False) ] def create_queue( new_queue: schemas.Queue, current_user: auth_schemas.UserInDB, db: Session, ) -> schemas.QueueInDb: q = models.Queue( name=new_queue.name, description=new_queue.description, owner_id=current_user.id, status="created", ) db.add(q) db.commit() if new_queue.groups: db.add_all( instances=[ models.QueueGroup(name=qg.name, priority=qg.priority, queue_id=q.id) for qg in new_queue.groups ] ) db.commit() return schemas.QueueInDb.model_validate(q) def get_detailed_queue( queue_id: UUID, db: Annotated[Session, Depends(get_db)], ) -> schemas.QueueDetail: q = db.query(models.Queue).filter(models.Queue.id == queue_id).first() if q: return schemas.QueueDetail( id=q.id, name=q.name, description=q.description, status=q.status, owner_id=q.owner_id, groups=q.groups.order_by(models.QueueGroup.priority.asc()), participants=schemas.ParticipantInfo( total=q.users.count(), remaining=q.users.filter(models.QueueUser.passed == False).count(), users_list=q.users.filter(models.QueueUser.passed == False).order_by( models.QueueUser.position.asc() ), ), ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Not Found", ) async def join_queue( queue_id: UUID, join_request: schemas.JoinRequest | None, client: Annotated[auth_schemas.AnonUser, Depends(auth_services.get_anon_user)], db: Annotated[Session, Depends(get_db)], r: Annotated[redis.client.Redis, Depends(get_redis)], ) -> schemas.QueueUser: q = get_queue_by_id(queue_id, db) if q: if not q.users.filter(models.QueueUser.user_id == client.id).first(): last_qu = q.users.order_by(models.QueueUser.position.desc()).first() position = last_qu.position + 1 if last_qu else 0 new_qu = models.QueueUser( user_id=client.id, queue_id=q.id, position=position, group_id=( join_request.group_id if join_request and join_request.group_id else None ), ) db.add(new_qu) db.commit() await rebuild_queue(queue=q, db=db) await r.publish(str(queue_id), "updated") return new_qu raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Already joined", ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Not Found", ) async def set_queue_listener( queue_id: UUID, db: Annotated[Session, Depends(get_db)], ps: Annotated[redis.client.PubSub, Depends(get_pubsub)], ) -> schemas.QueueDetail: await ps.subscribe(str(queue_id)) async for m in ps.listen(): if m.get("data", None) == b"updated": break await ps.unsubscribe() new_queue = get_detailed_queue(queue_id=queue_id, db=db) return new_queue async def get_queue_owner( queue: Annotated[models.Queue, Depends(get_queue_by_id)] ) -> auth_schemas.UserInDB: return queue.owner if queue else None async def verify_queue_owner( queue_owner: Annotated[auth_schemas.UserInDB, Depends(get_queue_owner)], current_user: Annotated[ auth_schemas.UserInDB, Depends(auth_services.get_current_user_or_none) ], ) -> bool: return queue_owner.id == current_user.id if queue_owner and current_user else False async def rebuild_queue( queue: Annotated[models.Queue, Depends(get_queue_by_id)], db: Annotated[Session, Depends(get_db)], ): query = ( db.query(models.QueueUser) .join( models.QueueGroup, models.QueueUser.group_id == models.QueueGroup.id, isouter=True, ) .filter(models.QueueUser.passed == False, models.QueueUser.queue_id == queue.id) .order_by( func.coalesce(models.QueueGroup.priority, 0).asc(), models.QueueUser.position.asc(), ) .options(joinedload(models.QueueUser.group)) ) queueusers = query.all() first_qu_found_and_queue_in_process = False if queue.status == "active": for i, qu in enumerate(queueusers): if qu.position == 0: first_qu_found_and_queue_in_process = True del queueusers[i] break for i, qu in enumerate(queueusers): if first_qu_found_and_queue_in_process: setattr(qu, "position", i + 1) continue setattr(qu, "position", i) db.commit() async def kick_first( is_owner: Annotated[bool, Depends(verify_queue_owner)], queue: Annotated[models.Queue, Depends(get_queue_by_id)], db: Annotated[Session, Depends(get_db)], ): if is_owner: first_user = ( queue.users.filter(models.QueueUser.passed == False) .order_by(models.QueueUser.position.asc()) .first() ) if first_user: setattr(first_user, "passed", True) db.commit() await rebuild_queue(queue=queue, db=db) return raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No first user", ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="You are not a queue owner!", ) async def start_queue( is_owner: Annotated[bool, Depends(verify_queue_owner)], queue: Annotated[models.Queue, Depends(get_queue_by_id)], db: Annotated[Session, Depends(get_db)], ): if queue and is_owner: setattr(queue, "status", "active") db.commit() await rebuild_queue(queue=queue, db=db) return raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="You are not a queue owner!", ) async def pass_queueuser( queue: Annotated[models.Queue, Depends(get_queue_by_id)], anon_user: Annotated[auth_schemas.AnonUser, Depends(auth_services.get_anon_user)], db: Annotated[Session, Depends(get_db)], ): if anon_user: qu = ( db.query(models.QueueUser) .filter( models.QueueUser.queue_id == queue.id, models.QueueUser.user_id == anon_user.id, ) .first() ) if qu: setattr(qu, "passed", True) db.commit() await rebuild_queue(queue=queue, db=db) return raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found!", ) async def action_wrapper( action: str, is_owner: Annotated[bool, Depends(verify_queue_owner)], queue: Annotated[models.Queue, Depends(get_queue_by_id)], anon_user: Annotated[auth_schemas.AnonUser, Depends(auth_services.get_anon_user)], db: Annotated[Session, Depends(get_db)], r: Annotated[redis.client.Redis, Depends(get_redis)], ) -> schemas.ActionResult: if queue: if action == "kick-first": await kick_first(is_owner=is_owner, queue=queue, db=db) await r.publish(str(queue.id), "updated") return {"action": action, "status": "success"} if action == "pass": await pass_queueuser(queue=queue, anon_user=anon_user, db=db) await r.publish(str(queue.id), "updated") return {"action": action, "status": "success"} if action == "start": await start_queue(queue=queue, is_owner=is_owner, db=db) await r.publish(str(queue.id), "updated") return {"action": action, "status": "success"} raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Action not found!", ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Queue not found!", )