使用 oauth2 和 jwt 在 FastApi 中进行身份验证
Posted
技术标签:
【中文标题】使用 oauth2 和 jwt 在 FastApi 中进行身份验证【英文标题】:Authentication in FastApi using oauth2 and jwt 【发布时间】:2021-07-18 01:04:07 【问题描述】:我是 FastAPI 的新手。我正在尝试对用户进行身份验证,并将其重定向到受保护的端点。在 /docs 或 /token 中提供用户名和密码(johndoe、secret)后,我可以获得身份验证令牌。在示例中,他们使用 cURL 把令牌放在请求头中,随请求一起发送到受保护的端点;但在实际部署时,我该如何做到这一点呢?
# Call the protected /view endpoint, sending the JWT obtained from /token
# as a Bearer token in the Authorization header.
curl -X 'GET' \
'http://127.0.0.1:8000/view' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNjE5MjEwMjYxfQ.g-NCbaEP0DzSw6FY1SQVHMlH3hzG3lTvWBkeJabuLwE'
以下代码来自教程。
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.responses import HTMLResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# In-memory stand-in for a user table, keyed by username. Each record carries
# the fields of UserInDB; "hashed_password" holds a bcrypt hash, never the
# plain-text password.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    },
}
# Signing key for the JWTs issued by this app (passed to jwt.encode and
# jwt.decode). A fresh random value can be generated with:
#   openssl rand -hex 32
# NOTE(review): the key is hard-coded in source — presumably a tutorial
# placeholder; confirm it is loaded from configuration in a real deployment.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# JWT signing algorithm name, used both when encoding and when decoding tokens.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the access tokens issued by the POST /token route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# The FastAPI application instance on which every route below is registered.
app = FastAPI()
# passlib context used by get_password_hash / verify_password; configured
# for the bcrypt scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Dependency that supplies the bearer token string to get_current_user;
# tokenUrl names the route where clients obtain a token (POST /token).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# for css files
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates are loaded from the "templates" directory (login.html is
# rendered from here by the GET /token route).
templates = Jinja2Templates(directory="templates")
# Response body of the POST /token route.
class Token(BaseModel):
    # The encoded JWT returned to the client.
    access_token: str
    # Token scheme reported to the client; the login route sends "bearer".
    token_type: str
# Data extracted from a decoded JWT by get_current_user.
class TokenData(BaseModel):
    # Value of the token's "sub" claim.
    username: Optional[str] = None
# Public view of a user account (no password material).
class User(BaseModel):
    # Unique login name; also the key used in fake_users_db.
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    # When truthy, get_current_active_user rejects the user with HTTP 400.
    disabled: Optional[bool] = None
# A user as stored in the database: User plus the stored password hash.
class UserInDB(User):
    # Hash checked by verify_password; never the plain-text password.
    hashed_password: str
def get_password_hash(password):
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    return pwd_context.verify(plain_password, hashed_password)
def get_user(db, username: str) -> Optional["UserInDB"]:
    """Look up ``username`` in ``db`` and return the record as a ``UserInDB``.

    Args:
        db: Mapping of username to a dict of ``UserInDB`` fields.
        username: Key to look up in ``db``.

    Returns:
        The matching ``UserInDB``, or ``None`` when ``username`` is not a key
        of ``db`` (callers test the result for falsiness / ``None``).
    """
    if username not in db:
        return None
    return UserInDB(**db[username])
def authenticate_user(fake_db, username: str, password: str):
    """Validate a username/password pair against ``fake_db``.

    Args:
        fake_db: User store passed through to ``get_user``.
        username: Login name to look up.
        password: Plain-text password to check against the stored hash.

    Returns:
        The user object from ``get_user`` when the user exists and the
        password verifies; ``False`` otherwise.
    """
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode ``data`` plus an ``exp`` claim as a signed JWT.

    Args:
        data: Claims to place in the token; copied, so the caller's dict is
            not modified.
        expires_delta: Token lifetime. When omitted (or falsy), the token
            expires 15 minutes from now.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    to_encode = data.copy()
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and
    # returns a naive value.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer ``token`` to a user from ``fake_users_db``.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user whose username matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, has no ``sub`` claim, or names a user
            that is not in ``fake_users_db``.
    """
    # One exception object shared by every rejection path below.
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Return the authenticated user, rejecting disabled accounts.

    Args:
        current_user: User supplied by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when its ``disabled`` flag is falsy.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is
            truthy.
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
# GET /token: render the login page. The request object is passed in the
# template context under the "request" key.
@app.get("/token", response_class=HTMLResponse)
async def read_login(request: Request):
    # login.html is a simple html form that takes in an username and a password
    return templates.TemplateResponse("login.html", {"request": request})
# POST /token: exchange a username/password form for a JWT access token.
# Responds 401 (with a WWW-Authenticate: Bearer header) when the credentials
# do not authenticate; otherwise returns the token and the "bearer" type.
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # The username is stored in the "sub" claim, which get_current_user reads.
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
# GET /view: protected route. The get_current_active_user dependency must
# succeed before the handler body runs.
@app.get("/view")  # private endpoint. redirect here!
async def read_view(current_user: User = Depends(get_current_active_user)):
    return {"Neo": "We need guns. Lots of guns."}
简而言之,我希望服务器完成身份验证(这一步已实现)后将用户重定向到私有端点,并且浏览器能记住令牌,在请求私有端点时自动带上它。如果这个问题很幼稚,还请见谅!
【问题讨论】:
【参考方案1】:你试过下面这种做法吗?
from starlette.responses import RedirectResponse
# GET /: once the get_current_active_user dependency has accepted the caller,
# answer with a redirect to the protected /view route.
@app.get("/")
def read_view(current_user: User = Depends(get_current_active_user)):
    return RedirectResponse("/view")
【讨论】:
以上是关于使用 oauth2 和 jwt 在 FastApi 中进行身份验证的主要内容,如果未能解决你的问题,请参考以下文章