rebuild project with rsbuild
This commit is contained in:
@ -9,5 +9,6 @@ class User(Base):
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
email = Column(String, unique=True, index=True)
|
||||
username = Column(String(length=24), unique=True, index=True)
|
||||
hashed_password = Column(String)
|
||||
is_active = Column(Boolean, default=True)
|
||||
|
||||
@ -1,5 +1,8 @@
|
||||
from fastapi import APIRouter, Depends, FastAPI, HTTPException
|
||||
from fastapi import APIRouter, Depends, FastAPI, HTTPException, status
|
||||
from typing import Annotated, Union
|
||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from . import crud
|
||||
from ...dependencies import get_db
|
||||
from ...db import schemas
|
||||
@ -10,6 +13,7 @@ router = APIRouter(
|
||||
dependencies=[Depends(get_db)],
|
||||
responses={404: {"description": "Not found"}},
|
||||
)
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||
|
||||
|
||||
@router.post("/users/", response_model=schemas.User)
|
||||
@ -31,3 +35,70 @@ def read_user(user_id: int, db: Session = Depends(get_db)):
|
||||
db_user = crud.get_user(db, user_id=user_id)
|
||||
if db_user is None:
|
||||
raise HTTPException
|
||||
|
||||
|
||||
def fake_hash_password(password: str):
|
||||
return "fakehashed" + password
|
||||
|
||||
|
||||
class User(BaseModel):
|
||||
username: str
|
||||
email: Union[str, None] = None
|
||||
full_name: Union[str, None] = None
|
||||
disabled: Union[bool, None] = None
|
||||
|
||||
|
||||
class UserInDB(User):
|
||||
hashed_password: str
|
||||
|
||||
|
||||
def get_user(db, username: str):
|
||||
if username in db:
|
||||
user_dict = db[username]
|
||||
return UserInDB(**user_dict)
|
||||
|
||||
|
||||
def fake_decode_token(token):
|
||||
# This doesn't provide any security at all
|
||||
# Check the next version
|
||||
user = get_user(fake_users_db, token)
|
||||
return user
|
||||
|
||||
|
||||
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
|
||||
user = fake_decode_token(token)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid authentication credentials",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
return user
|
||||
|
||||
|
||||
async def get_current_active_user(
|
||||
current_user: Annotated[User, Depends(get_current_user)],
|
||||
):
|
||||
if current_user.disabled:
|
||||
raise HTTPException(status_code=400, detail="Inactive user")
|
||||
return current_user
|
||||
|
||||
|
||||
@app.post("/token")
|
||||
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
||||
user_dict = fake_users_db.get(form_data.username)
|
||||
if not user_dict:
|
||||
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
||||
user = UserInDB(**user_dict)
|
||||
hashed_password = fake_hash_password(form_data.password)
|
||||
if not hashed_password == user.hashed_password:
|
||||
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
||||
|
||||
return {"access_token": user.username, "token_type": "bearer"}
|
||||
|
||||
|
||||
@app.get("/users/me")
|
||||
async def read_users_me(
|
||||
current_user: Annotated[User, Depends(get_current_active_user)],
|
||||
):
|
||||
return current_user
|
||||
|
||||
@ -3,10 +3,14 @@ from sqlalchemy.orm import Session
|
||||
from ...db import models, schemas
|
||||
|
||||
|
||||
def get_user(db: Session, user_id: int):
|
||||
def get_user_by_id(db: Session, user_id: int):
|
||||
return db.query(models.User).filter(models.User.id == user_id).first()
|
||||
|
||||
|
||||
def get_user_by_username(db: Session, username: int):
|
||||
return db.query(models.User).filter(models.User.username == username).first()
|
||||
|
||||
|
||||
def get_user_by_email(db: Session, email: str):
|
||||
return db.query(models.User).filter(models.User.email == email).first()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user