FastAPI - 支持多种认证依赖

Posted

技术标签:

【中文标题】FastAPI - 支持多种认证依赖【英文标题】:FastAPI - Supporting multiple authentication dependencies 【发布时间】:2021-02-20 04:57:27 【问题描述】:

问题

我目前有一个名为 jwt 的 JWT 依赖项,它确保它在访问端点之前通过 JWT 身份验证阶段,如下所示:

sample_endpoint.py:

from fastapi import APIRouter, Depends, Request
from JWTBearer import JWTBearer
from jwt import jwks

router = APIRouter()

jwt = JWTBearer(jwks)

@router.get("/test_jwt", dependencies=[Depends(jwt)])
async def test_endpoint(request: Request):
    return True

以下是使用 JWT 对用户进行身份验证的 JWT 依赖项(来源:https://medium.com/datadriveninvestor/jwt-authentication-with-fastapi-and-aws-cognito-1333f7f2729e):

JWTBearer.py

from typing import Dict, Optional, List

from fastapi import HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, jwk, JWTError
from jose.utils import base64url_decode
from pydantic import BaseModel
from starlette.requests import Request
from starlette.status import HTTP_403_FORBIDDEN

JWK = Dict[str, str]


class JWKS(BaseModel):
    keys: List[JWK]


class JWTAuthorizationCredentials(BaseModel):
    jwt_token: str
    header: Dict[str, str]
    claims: Dict[str, str]
    signature: str
    message: str


class JWTBearer(HTTPBearer):
    def __init__(self, jwks: JWKS, auto_error: bool = True):
        super().__init__(auto_error=auto_error)

        self.kid_to_jwk = jwk["kid"]: jwk for jwk in jwks.keys

    def verify_jwk_token(self, jwt_credentials: JWTAuthorizationCredentials) -> bool:
        try:
            public_key = self.kid_to_jwk[jwt_credentials.header["kid"]]
        except KeyError:
            raise HTTPException(
                status_code=HTTP_403_FORBIDDEN, detail="JWK public key not found"
            )

        key = jwk.construct(public_key)
        decoded_signature = base64url_decode(jwt_credentials.signature.encode())

        return key.verify(jwt_credentials.message.encode(), decoded_signature)

    async def __call__(self, request: Request) -> Optional[JWTAuthorizationCredentials]:
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)

        if credentials:
            if not credentials.scheme == "Bearer":
                raise HTTPException(
                    status_code=HTTP_403_FORBIDDEN, detail="Wrong authentication method"
                )

            jwt_token = credentials.credentials

            message, signature = jwt_token.rsplit(".", 1)

            try:
                jwt_credentials = JWTAuthorizationCredentials(
                    jwt_token=jwt_token,
                    header=jwt.get_unverified_header(jwt_token),
                    claims=jwt.get_unverified_claims(jwt_token),
                    signature=signature,
                    message=message,
                )
            except JWTError:
                raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="JWK invalid")

            if not self.verify_jwk_token(jwt_credentials):
                raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="JWK invalid")

            return jwt_credentials 

jwt.py:

import os

import requests
from dotenv import load_dotenv
from fastapi import Depends, HTTPException
from starlette.status import HTTP_403_FORBIDDEN

from app.JWTBearer import JWKS, JWTBearer, JWTAuthorizationCredentials

load_dotenv()  # Automatically load environment variables from a '.env' file.

jwks = JWKS.parse_obj(
    requests.get(
        f"https://cognito-idp.os.environ.get('COGNITO_REGION').amazonaws.com/"
        f"os.environ.get('COGNITO_POOL_ID')/.well-known/jwks.json"
    ).json()
)

jwt = JWTBearer(jwks)


async def get_current_user(
    credentials: JWTAuthorizationCredentials = Depends(auth)
) -> str:
    try:
        return credentials.claims["username"]
    except KeyError:
        HTTPException(status_code=HTTP_403_FORBIDDEN, detail="Username missing") 

api_key_dependency.py(现在很简化,会改的):

from fastapi import Security, FastAPI, HTTPException
from fastapi.security.api_key import APIKeyHeader

from starlette.status import HTTP_403_FORBIDDEN

async def get_api_key(
    api_key_header: str = Security(api_key_header)
):
    API_KEY = ... getting API KEY logic ...

    if api_key_header == API_KEY:
        return True
    else:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
        ) 

问题

根据情况,我想先检查它是否在标头中有 API Key,如果存在,则使用它进行身份验证。否则,我想使用 jwt 依赖项进行身份验证。我想确保如果 api-key 身份验证或 jwt 身份验证通过,则用户已通过身份验证。这在 FastAPI 中是否可行(即具有多个依赖项,如果其中一个通过,则通过身份验证)。谢谢!

【问题讨论】:

一个简单的解决方案可能是拥有一个唯一的Dependency 来执行检查并调用正确的身份验证方法(JWT 或 KEY)。如果您需要仅使用 JWT 对某些路径进行身份验证,则可以直接使用该依赖项(相同的方法仅适用于 KEY 身份验证) 谢谢 Isabi,澄清一下,您的意思是添加执行检查并调用正确身份验证依赖项(JWT 依赖项或密钥)的第三个依赖项(这将是 sample_endpoint 的唯一依赖项)依赖)。这是正确的吗? 是的,某种工厂方法 您能否提供一个简单的示例,我可以在工厂依赖项中调用其他依赖项?我对如何做到这一点进行了一些研究,但无法找到一个很好的例子。我已经用 api 密钥验证的示例依赖项更新了帖子。 我尝试创建工厂依赖项(类),在工厂依赖项中有 _call_ 方法,并在类中创建了两个单独的函数(check_api_keycheck_jwt )。例如,它看起来像 def call_api_key(self, b = Depends(get_api_key)):。但它似乎根本没有在 check_api_keycheck_jwt 键中调用相应的依赖项。您能在这里提供一些指导吗? 【参考方案1】:

对不起,有事要办了

端点有一个唯一的依赖,从文件check_auth中调用它检查

端点

from fastapi import APIRouter, Depends, Request
from check_auth import check
from JWTBearer import JWTBearer
from jwt import jwks

router = APIRouter()

jwt = JWTBearer(jwks)

@router.get("/test_jwt", dependencies=[Depends(check)])
async def test_endpoint(request: Request):
    return True

函数检查将依赖于两个独立的依赖项,一个用于 api-key,一个用于 JWT。如果两者或其中之一通过,则身份验证通过。否则,我们将引发异常,如下所示。

依赖

def key_auth(api_key=Header(None)):
    if not api_key:
      return None
    ... verification logic goes here ...

def jwt(authorization=Header(None)):
    if not authorization:
      return None
    ... verification logic goes here ... 
    
async def check(key_result=Depends(jwt_auth), jwt_result=Depends(key_auth)):
    if not (key_result or jwt_result):
        raise Exception
     

【讨论】:

谢谢伊莎比!我完成了这部分,但无法在工厂依赖项中调用其他依赖项。我不确定 FastAPI 是否支持此功能。 可能是由于类所具有的依赖关系。也许直接处理请求可以让您访问密钥 @louprogramming 你设法解决了你的问题吗?如果是这样,请随时改进我的答案(也许在更改之前写 EDIT) 是的,我最终找到了一种解决方法,即将工厂依赖项中的依赖项作为两个独立的依赖项(方法)移动。我打算对此发表评论,但没有机会这样做。我马上更新。 @louprogramming 你能提供一个例子吗?我做了类似的事情(创建了另一个允许两种身份验证方案的依赖项);但后来我失去了开放 api 规范中的授权选项(锁消失了)。【参考方案2】:

这对我有用(JWT 或 APIkey Auth)。如果两种或一种身份验证方法都通过,则身份验证通过。

def jwt_auth(auth: HTTPAuthorizationCredentials = Depends(HTTPBearer(auto_error=False))):
    if not auth:
      return None
    ## validation logic
    return True

def key_auth(apikey_header=Depends(APIKeyHeader(name='X-API-Key', auto_error=False))):
    if not apikey_header:
      return None
    ## validation logic
    return True

async def jwt_or_key_auth(jwt_result=Depends(jwt_auth), key_result=Depends(key_auth)):
    if not (key_result or jwt_result):
        raise HTTPException(status_code=401, detail="Not authenticated")


@app.get("/", dependencies=[Depends(jwt_or_key_auth)])
async def root():
    return "message": "Hello World"

【讨论】:

以上是关于FastAPI - 支持多种认证依赖的主要内容,如果未能解决你的问题,请参考以下文章

17.FastAPI 表单数据

FastAPI:如何访问依赖项中的 APIRoute 对象

fastapi教程翻译(七):Body - 多种参数

FastAPI Web框架 [依赖项]

FastAPI Web框架 [依赖项]

[FastAPI-32]依赖注入缓存