from datetime import datetime, timedelta import jwt from fastapi import Depends, FastAPI, HTTPException from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jwt import PyJWTError from passlib.context import CryptContext from starlette.status import HTTP_401_UNAUTHORIZED from .models import Token, TokenData, TicketCollection, User, UserInDB, NewCollection, NewUser, TicketCollectionDB from typing import List # to get a string like this run: # openssl rand -hex 32 SECRET_KEY = "da06994dd99d0c3a01df358c9e9bcec40db567378c3bbd47e10661d5fd8d7359" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") app = FastAPI(title="Ticket Backend", description="The backend for the ticket scanner application", redoc_url=None) def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) async def get_user(username: str): try: return await UserInDB.objects.get(username=username) except: return None async def authenticate_user(username: str, password: str): user = await get_user(username) if not user: return False if not verify_password(password, user.hashed_password): return False return user def create_access_token(*, data: dict, expires_delta: timedelta = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except PyJWTError: raise credentials_exception user = await get_user(username=token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = await authenticate_user( form_data.username, form_data.password) if not user: raise HTTPException( status_code=HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=30) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user @app.get("/users/me/items/") async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}] @app.post("/users/new", response_model=User) async def add_new_user(them: NewUser): return await UserInDB.objects.create(username=them.username, hashed_password=get_password_hash(them.password), email=str(them.email), full_name=them.name) @app.post("/collections/new", response_model=TicketCollection) async def add_new_collection(collection: NewCollection, current_user: UserInDB = Depends(get_current_active_user)): return await TicketCollectionDB.objects.create(name=collection.name, user=current_user, price=int(collection.price*100), description=collection.description, requires_student=collection.requires_student) @app.get("/collections", response_model=List[TicketCollection]) async def get_all_collections(current_user: User = Depends(get_current_active_user)): return await TicketCollectionDB.objects.filter(user=current_user).all() @app.on_event("startup") async def start(): try: await UserInDB.objects.create( username="johndoe", hashed_password="$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", email="johndoe@example.com", full_name="John Doe") except: pass