使用 oauth2 和 jwt 在 FastApi 中进行身份验证

Posted

技术标签:

【中文标题】使用 oauth2 和 jwt 在 FastApi 中进行身份验证【英文标题】:Authentication in FastApi using oauth2 and jwt 【发布时间】:2021-07-18 01:04:07 【问题描述】:

我是 FastApi 的新手。我正在尝试对用户进行身份验证并将他重定向到受保护的端点。在 /docs 或 /token 中提供用户名和密码(johndoe、secret)后,我将获得身份验证令牌。在示例中,他们使用 cURL 将请求“传递令牌”连同请求一起发送到受保护的端点,但是在部署期间我该如何做呢?

curl -X 'GET' \
  'http://127.0.0.1:8000/view' \
  -H 'accept: application/json' \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNjE5MjEwMjYxfQ.g-NCbaEP0DzSw6FY1SQVHMlH3hzG3lTvWBkeJabuLwE'

以下代码来自教程。

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi import FastAPI, Request, Depends, HTTPException, status
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import htmlResponse
from passlib.context import CryptContext
from datetime import datetime, timedelta
from pydantic import BaseModel
from jose import JWTError, jwt
from typing import Optional


fake_users_db = 
    "johndoe": 
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    



# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

app = FastAPI()

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# for css files
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str


def get_password_hash(password):
    return pwd_context.hash(password)

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update("exp": expire)
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers="WWW-Authenticate": "Bearer",
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.get("/token", response_class=HTMLResponse)
async def read_login(request: Request):
    # login.html is a simple html form that takes in an username and a password
    return templates.TemplateResponse("login.html", "request": request)

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers="WWW-Authenticate": "Bearer",
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data="sub": user.username, expires_delta=access_token_expires
    )
    return "access_token": access_token, "token_type": "bearer"

@app.get("/view"): # private endpoint. redirect here!
async def read_view(current_user: User = Depends(get_current_active_user)):
    return "Neo": "We need guns. Lots of guns."

简而言之,我希望服务器进行身份验证(完成)并将用户重定向到私有端点,并且浏览器记住令牌并在它请求私有端点时传递它。对不起,如果这很愚蠢!

【问题讨论】:

【参考方案1】:

你试过了吗

from starlette.responses import RedirectResponse

@app.get("/")
def read_view(current_user: User = Depends(get_current_active_user)):
    return RedirectResponse("/view")

【讨论】:

以上是关于使用 oauth2 和 jwt 在 FastApi 中进行身份验证的主要内容,如果未能解决你的问题,请参考以下文章

存储用于编码/解码令牌数据的 JWT 密钥的安全方式

26.FastAPI安全性

Oauth2 和 JWT 的区别

FastAPI - 支持多种认证依赖

是否可以使用用户和凭据(JWT)登录,而 Oauth2 可以使用 google 登录?

如何以管理员用户身份撤销用户的访问令牌和刷新令牌?在 Oauth2 中使用 JWT