This commit is contained in:
2024-03-27 20:00:59 +03:00
parent f92171ff84
commit 717596f55c
12 changed files with 226 additions and 133 deletions

View File

@ -0,0 +1,4 @@
from .secret import SECRET_KEY
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

View File

@ -1,5 +1,7 @@
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
import uuid
from .database import Base
@ -7,8 +9,8 @@ from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True, index=True)
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String, index=True)
username = Column(String(length=24), unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)

View File

@ -1,18 +0,0 @@
from typing import Union
from pydantic import BaseModel
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
class Config:
orm_mode = True

View File

@ -14,5 +14,5 @@ app.include_router(auth_router)
@app.get("/")
def read_root():
async def read_root():
return {"message": "OK"}

View File

@ -1,11 +1,18 @@
from fastapi import APIRouter, Depends, FastAPI, HTTPException, status
from datetime import datetime, timedelta, timezone
from typing import Annotated, Union
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from . import crud
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from ...config import jwt_config
from ...dependencies import get_db
from ...db import schemas
from . import schemas
from . import services
router = APIRouter(
prefix="/auth",
@ -13,92 +20,57 @@ router = APIRouter(
dependencies=[Depends(get_db)],
responses={404: {"description": "Not found"}},
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@router.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@router.get("/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@router.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException
def fake_hash_password(password: str):
return "fakehashed" + password
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
user = fake_decode_token(token)
@router.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: Annotated[Session, Depends(get_db)],
) -> schemas.Token:
user = services.authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
status_code=status.HTTP_409_CONFLICT,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=jwt_config.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = services.create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return schemas.Token(access_token=access_token, token_type="bearer")
@router.post("/register")
async def register(
user_data: Annotated[schemas.UserRegister, Depends()],
db: Annotated[Session, Depends(get_db)],
) -> schemas.User:
user = services.get_user_by_username(db, user_data.username)
if user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User with this username already exists",
headers={"WWW-Authenticate": "Bearer"},
)
user = services.create_user(
db=db,
username=user_data.username,
plain_password=user_data.password,
name=user_data.name,
)
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
@router.get("/users/me/", response_model=schemas.User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
current_user: Annotated[schemas.User, Depends(services.get_current_active_user)],
):
return current_user
# @app.get("/users/me/items/")
# async def read_own_items(
# current_user: Annotated[User, Depends(get_current_active_user)],
# ):
# return [{"item_id": "Foo", "owner": current_user.username}]

View File

@ -1,28 +0,0 @@
from sqlalchemy.orm import Session
from ...db import models, schemas
def get_user_by_id(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_username(db: Session, username: int):
return db.query(models.User).filter(models.User.username == username).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user

View File

@ -0,0 +1,27 @@
from typing import Union
from pydantic import BaseModel
class User(BaseModel):
username: str
name: Union[str, None] = None
class UserInDB(User):
hashed_password: str
class Config:
from_attributes = True
class UserRegister(User):
plain_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None

View File

@ -0,0 +1,99 @@
from fastapi import status, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from jose import JWTError, jwt
from typing import Annotated, Union
from datetime import timedelta
from passlib.context import CryptContext
from ...db import models
from . import schemas
from ...dependencies import get_db
from ...config import jwt_config
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user_by_id(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_username(db: Session, username: int):
return db.query(models.User).filter(models.User.username == username).first()
def authenticate_user(db: Session, username: str, password: str):
user = get_user_by_username(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode, jwt_config.SECRET_KEY, algorithm=jwt_config.ALGORITHM
)
return encoded_jwt
def create_user(
db: Session, username: str, plain_password: str, name: Union[str, None] = None
) -> schemas.User:
user = models.User(
username=username,
name=name,
hashed_password=get_password_hash(plain_password),
)
db.add(user)
db.commit()
return schemas.User(user)
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[Session, Depends(get_db)],
) -> schemas.User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token, jwt_config.SECRET_KEY, algorithms=[jwt_config.ALGORITHM]
)
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[schemas.User, Depends(get_current_user)],
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user