290 lines
9.1 KiB
Python
290 lines
9.1 KiB
Python
from fastapi import Depends, HTTPException, status
|
|
from typing import Annotated
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from sqlalchemy import func
|
|
from uuid import UUID
|
|
import redis
|
|
import asyncio
|
|
|
|
from ...dependencies import get_db, get_pubsub, get_redis
|
|
from ...db import models
|
|
|
|
from ..auth import services as auth_services
|
|
from ..auth import schemas as auth_schemas
|
|
|
|
from . import schemas
|
|
|
|
|
|
def get_queue_by_id(
|
|
queue_id: UUID, db: Annotated[Session, Depends(get_db)]
|
|
) -> models.Queue:
|
|
q = db.query(models.Queue).filter(models.Queue.id == queue_id).first()
|
|
return q
|
|
|
|
|
|
def get_owned_queues(
|
|
current_user: Annotated[auth_schemas.User, Depends(auth_services.get_current_user)]
|
|
) -> list[schemas.QueueInDb]:
|
|
return [schemas.QueueInDb.model_validate(q) for q in current_user.owns_queues]
|
|
|
|
|
|
def get_user_queues(
|
|
current_user: Annotated[auth_schemas.User, Depends(auth_services.get_anon_user)]
|
|
) -> list[schemas.QueueInDb]:
|
|
return [
|
|
schemas.QueueInDb.model_validate(q.queue)
|
|
for q in current_user.parts_in_queues.filter(models.QueueUser.passed == False)
|
|
]
|
|
|
|
|
|
def create_queue(
|
|
new_queue: schemas.Queue,
|
|
current_user: auth_schemas.UserInDB,
|
|
db: Session,
|
|
) -> schemas.QueueInDb:
|
|
q = models.Queue(
|
|
name=new_queue.name,
|
|
description=new_queue.description,
|
|
owner_id=current_user.id,
|
|
status="created",
|
|
)
|
|
db.add(q)
|
|
db.commit()
|
|
if new_queue.groups:
|
|
db.add_all(
|
|
instances=[
|
|
models.QueueGroup(name=qg.name, priority=qg.priority, queue_id=q.id)
|
|
for qg in new_queue.groups
|
|
]
|
|
)
|
|
db.commit()
|
|
return schemas.QueueInDb.model_validate(q)
|
|
|
|
|
|
def get_detailed_queue(
|
|
queue_id: UUID,
|
|
db: Annotated[Session, Depends(get_db)],
|
|
) -> schemas.QueueDetail:
|
|
q = db.query(models.Queue).filter(models.Queue.id == queue_id).first()
|
|
if q:
|
|
return schemas.QueueDetail(
|
|
id=q.id,
|
|
name=q.name,
|
|
description=q.description,
|
|
status=q.status,
|
|
owner_id=q.owner_id,
|
|
groups=q.groups.order_by(models.QueueGroup.priority.asc()),
|
|
participants=schemas.ParticipantInfo(
|
|
total=q.users.count(),
|
|
remaining=q.users.filter(models.QueueUser.passed == False).count(),
|
|
users_list=q.users.filter(models.QueueUser.passed == False).order_by(
|
|
models.QueueUser.position.asc()
|
|
),
|
|
),
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Not Found",
|
|
)
|
|
|
|
|
|
async def join_queue(
|
|
queue_id: UUID,
|
|
join_request: schemas.JoinRequest | None,
|
|
client: Annotated[auth_schemas.AnonUser, Depends(auth_services.get_anon_user)],
|
|
db: Annotated[Session, Depends(get_db)],
|
|
r: Annotated[redis.client.Redis, Depends(get_redis)],
|
|
) -> schemas.QueueUser:
|
|
q = get_queue_by_id(queue_id, db)
|
|
if q:
|
|
if not q.users.filter(models.QueueUser.user_id == client.id).first():
|
|
last_qu = q.users.order_by(models.QueueUser.position.desc()).first()
|
|
position = last_qu.position + 1 if last_qu else 0
|
|
new_qu = models.QueueUser(
|
|
user_id=client.id,
|
|
queue_id=q.id,
|
|
position=position,
|
|
group_id=(
|
|
join_request.group_id
|
|
if join_request and join_request.group_id
|
|
else None
|
|
),
|
|
)
|
|
db.add(new_qu)
|
|
db.commit()
|
|
await rebuild_queue(queue=q, db=db)
|
|
await r.publish(str(queue_id), "updated")
|
|
return new_qu
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail="Already joined",
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Not Found",
|
|
)
|
|
|
|
|
|
async def set_queue_listener(
|
|
queue_id: UUID,
|
|
db: Annotated[Session, Depends(get_db)],
|
|
ps: Annotated[redis.client.PubSub, Depends(get_pubsub)],
|
|
) -> schemas.QueueDetail:
|
|
await ps.subscribe(str(queue_id))
|
|
async for m in ps.listen():
|
|
if m.get("data", None) == b"updated":
|
|
break
|
|
await ps.unsubscribe()
|
|
new_queue = get_detailed_queue(queue_id=queue_id, db=db)
|
|
return new_queue
|
|
|
|
|
|
async def get_queue_owner(
|
|
queue: Annotated[models.Queue, Depends(get_queue_by_id)]
|
|
) -> auth_schemas.UserInDB:
|
|
return queue.owner if queue else None
|
|
|
|
|
|
async def verify_queue_owner(
|
|
queue_owner: Annotated[auth_schemas.UserInDB, Depends(get_queue_owner)],
|
|
current_user: Annotated[
|
|
auth_schemas.UserInDB, Depends(auth_services.get_current_user_or_none)
|
|
],
|
|
) -> bool:
|
|
return queue_owner.id == current_user.id if queue_owner and current_user else False
|
|
|
|
|
|
async def rebuild_queue(
|
|
queue: Annotated[models.Queue, Depends(get_queue_by_id)],
|
|
db: Annotated[Session, Depends(get_db)],
|
|
):
|
|
query = (
|
|
db.query(models.QueueUser)
|
|
.join(
|
|
models.QueueGroup,
|
|
models.QueueUser.group_id == models.QueueGroup.id,
|
|
isouter=True,
|
|
)
|
|
.filter(models.QueueUser.passed == False, models.QueueUser.queue_id == queue.id)
|
|
.order_by(
|
|
func.coalesce(models.QueueGroup.priority, 0).asc(),
|
|
models.QueueUser.position.asc(),
|
|
)
|
|
.options(joinedload(models.QueueUser.group))
|
|
)
|
|
queueusers = query.all()
|
|
first_qu_found_and_queue_in_process = False
|
|
if queue.status == "active":
|
|
for i, qu in enumerate(queueusers):
|
|
if qu.position == 0:
|
|
first_qu_found_and_queue_in_process = True
|
|
del queueusers[i]
|
|
break
|
|
for i, qu in enumerate(queueusers):
|
|
if first_qu_found_and_queue_in_process:
|
|
setattr(qu, "position", i + 1)
|
|
continue
|
|
setattr(qu, "position", i)
|
|
db.commit()
|
|
|
|
|
|
async def kick_first(
|
|
is_owner: Annotated[bool, Depends(verify_queue_owner)],
|
|
queue: Annotated[models.Queue, Depends(get_queue_by_id)],
|
|
db: Annotated[Session, Depends(get_db)],
|
|
):
|
|
if is_owner:
|
|
first_user = (
|
|
queue.users.filter(models.QueueUser.passed == False)
|
|
.order_by(models.QueueUser.position.asc())
|
|
.first()
|
|
)
|
|
if first_user:
|
|
setattr(first_user, "passed", True)
|
|
|
|
db.commit()
|
|
await rebuild_queue(queue=queue, db=db)
|
|
return
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="No first user",
|
|
)
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="You are not a queue owner!",
|
|
)
|
|
|
|
|
|
async def start_queue(
|
|
is_owner: Annotated[bool, Depends(verify_queue_owner)],
|
|
queue: Annotated[models.Queue, Depends(get_queue_by_id)],
|
|
db: Annotated[Session, Depends(get_db)],
|
|
):
|
|
if queue and is_owner:
|
|
setattr(queue, "status", "active")
|
|
db.commit()
|
|
await rebuild_queue(queue=queue, db=db)
|
|
return
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="You are not a queue owner!",
|
|
)
|
|
|
|
|
|
async def pass_queueuser(
|
|
queue: Annotated[models.Queue, Depends(get_queue_by_id)],
|
|
anon_user: Annotated[auth_schemas.AnonUser, Depends(auth_services.get_anon_user)],
|
|
db: Annotated[Session, Depends(get_db)],
|
|
):
|
|
if anon_user:
|
|
qu = (
|
|
db.query(models.QueueUser)
|
|
.filter(
|
|
models.QueueUser.queue_id == queue.id,
|
|
models.QueueUser.user_id == anon_user.id,
|
|
)
|
|
.first()
|
|
)
|
|
if qu:
|
|
setattr(qu, "passed", True)
|
|
db.commit()
|
|
await rebuild_queue(queue=queue, db=db)
|
|
return
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found!",
|
|
)
|
|
|
|
|
|
async def action_wrapper(
|
|
action: str,
|
|
is_owner: Annotated[bool, Depends(verify_queue_owner)],
|
|
queue: Annotated[models.Queue, Depends(get_queue_by_id)],
|
|
anon_user: Annotated[auth_schemas.AnonUser, Depends(auth_services.get_anon_user)],
|
|
db: Annotated[Session, Depends(get_db)],
|
|
r: Annotated[redis.client.Redis, Depends(get_redis)],
|
|
) -> schemas.ActionResult:
|
|
if queue:
|
|
if action == "kick-first":
|
|
await kick_first(is_owner=is_owner, queue=queue, db=db)
|
|
await r.publish(str(queue.id), "updated")
|
|
return {"action": action, "status": "success"}
|
|
if action == "pass":
|
|
await pass_queueuser(queue=queue, anon_user=anon_user, db=db)
|
|
await r.publish(str(queue.id), "updated")
|
|
return {"action": action, "status": "success"}
|
|
if action == "start":
|
|
await start_queue(queue=queue, is_owner=is_owner, db=db)
|
|
await r.publish(str(queue.id), "updated")
|
|
return {"action": action, "status": "success"}
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Action not found!",
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Queue not found!",
|
|
)
|