captcha
This commit is contained in:
@ -67,3 +67,11 @@ class QueueLog(Base):
|
||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||
action = Column(String)
|
||||
created = Column(DateTime, default=datetime.datetime.utcnow)
|
||||
|
||||
|
||||
class Captcha(Base):
|
||||
__tablename__ = "captcha"
|
||||
|
||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||
prompt = Column(String(length=6))
|
||||
used = Column(Boolean, default=False)
|
||||
|
||||
@ -2,8 +2,9 @@ from datetime import datetime, timedelta, timezone
|
||||
from typing import Annotated, Union
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, Response
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from io import BytesIO
|
||||
|
||||
|
||||
from pydantic import BaseModel
|
||||
@ -46,21 +47,27 @@ async def register(
|
||||
user_data: schemas.UserRegister,
|
||||
db: Annotated[Session, Depends(get_db)],
|
||||
) -> schemas.User:
|
||||
user = services.get_user_by_username(db, user_data.username)
|
||||
if user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="User with this username already exists",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
if user_data.password != user_data.password2:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Passwords do not match",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
user = services.create_user(db=db, user_data=user_data)
|
||||
return user
|
||||
if services.check_captcha(
|
||||
id=user_data.captcha.id, prompt=user_data.captcha.prompt, db=db
|
||||
):
|
||||
user = services.get_user_by_username(db, user_data.username)
|
||||
if user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="User with this username already exists",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
if user_data.password != user_data.password2:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Passwords do not match",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
user = services.create_user(db=db, user_data=user_data)
|
||||
return user
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid captcha"
|
||||
)
|
||||
|
||||
|
||||
@router.get("/me")
|
||||
@ -75,3 +82,16 @@ async def get_qnon_user(
|
||||
anon_user: Annotated[schemas.AnonUser, Depends(services.get_anon_user)]
|
||||
) -> schemas.AnonUser:
|
||||
return anon_user
|
||||
|
||||
|
||||
@router.get(
|
||||
"/captcha/{captcha_id}",
|
||||
responses={200: {"content": {"image/png": {}}}},
|
||||
response_class=Response,
|
||||
)
|
||||
async def generate_captcha(
|
||||
captcha: Annotated[BytesIO, Depends(services.get_captcha)]
|
||||
) -> Response:
|
||||
captcha.seek(0)
|
||||
captcha_bytes = captcha.read()
|
||||
return Response(content=captcha_bytes, media_type="image/png")
|
||||
|
||||
@ -15,9 +15,22 @@ class UserInDB(User):
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class Captcha(BaseModel):
|
||||
id: UUID
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class CaptchaCheck(BaseModel):
|
||||
id: UUID
|
||||
prompt: str
|
||||
|
||||
|
||||
class UserRegister(User):
|
||||
password: str
|
||||
password2: str
|
||||
captcha: CaptchaCheck
|
||||
|
||||
|
||||
class Token(BaseModel):
|
||||
|
||||
@ -6,12 +6,17 @@ from typing import Annotated, Union
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from passlib.context import CryptContext
|
||||
import uuid
|
||||
import random
|
||||
from io import BytesIO
|
||||
from captcha.image import ImageCaptcha
|
||||
|
||||
from ...db import models
|
||||
from . import schemas
|
||||
from ...dependencies import get_db
|
||||
from ...config import jwt_config
|
||||
|
||||
CAPTCHA_SYMBOLS = "abcdefghijklmnopqrstuvwxyz0123456789"
|
||||
|
||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||
|
||||
@ -138,6 +143,42 @@ def get_anon_user(
|
||||
return anon
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_418_IM_A_TEAPOT,
|
||||
detail={"message": "tf dude? trying to spoof your client id?"},
|
||||
)
|
||||
return create_anon_user(db)
|
||||
|
||||
|
||||
def get_captcha(
|
||||
captcha_id: uuid.UUID, db: Annotated[Session, Depends(get_db)]
|
||||
) -> BytesIO:
|
||||
prompt = "".join(random.choice(CAPTCHA_SYMBOLS) for i in range(6))
|
||||
c = models.Captcha(id=captcha_id, prompt=prompt)
|
||||
try:
|
||||
db.add(c)
|
||||
db.commit()
|
||||
except:
|
||||
db.rollback()
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_418_IM_A_TEAPOT,
|
||||
)
|
||||
captcha = ImageCaptcha()
|
||||
data = captcha.generate(prompt)
|
||||
return data
|
||||
|
||||
|
||||
def check_captcha(
|
||||
id: uuid.UUID, prompt: str, db: Annotated[Session, Depends(get_db)]
|
||||
) -> bool:
|
||||
c = (
|
||||
db.query(models.Captcha)
|
||||
.filter(
|
||||
models.Captcha.id == id,
|
||||
models.Captcha.prompt == prompt,
|
||||
models.Captcha.used == False,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
if c:
|
||||
setattr(c, "used", True)
|
||||
db.commit()
|
||||
return True
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user