flask-web—— JWT认证方案
Posted 胖虎是只mao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flask-web—— JWT认证方案相关的知识,希望对你有一定的参考价值。
一、JWT & JWS & JWE
Json Web Token(JWT)
JSON Web Token(JWT)
是一个非常轻巧的规范。这个规范允许我们使用JWT在两个组织之间传递安全可靠的信息。
现在网上大多数介绍JWT的文章实际介绍的都是JWS(JSON Web Signature),也往往导致了人们对于JWT的误解,但是JWT并不等于JWS,JWS只是JWT的一种实现,除了JWS外,JWE(JSON Web Encryption)也是JWT的一种实现。
下面就来详细介绍一下JWT与JWE的两种实现方式:
二、JSON Web Signature(JWS)——(常用)
JSON Web Signature
是一个有着简单的统一表达形式的字符串:
头部(Header)
头部用于描述关于该JWT的最基本的信息,例如其类型以及签名所用的算法等。 JSON内容要经Base64
编码生成字符串成为Header。
载荷(PayLoad)
payload的五个字段都是由JWT的标准所定义的。
- iss: 该JWT的签发者
- sub: 该JWT所面向的用户
- aud: 接收该JWT的一方
- exp(expires): 什么时候过期,这里是一个Unix时间戳
- iat(issued at): 在什么时候签发的
后面的信息可以按需补充。 JSON内容要经Base64
编码生成字符串成为PayLoad
。
签名(signature)
这个部分header
与payload
通过header中声明的加密方式,使用密钥secret
进行加密,生成签名。 JWS的主要目的是保证了数据在传输过程中不被修改,验证数据的完整性。但由于仅采用Base64对消息内容编码,因此不保证数据的不可泄露性。所以不适合用于传输敏感数据。
三、JSON Web Encryption(JWE)
相对于JWS,JWE则同时保证了安全性与数据完整性。 JWE由五部分组成:
JWE组成
具体生成步骤为:
- JOSE含义与JWS头部相同。
- 生成一个随机的Content Encryption Key (CEK)。
- 使用RSAES-OAEP 加密算法,用公钥加密CEK,生成JWE Encrypted Key。
- 生成JWE初始化向量。
- 使用AES GCM加密算法对明文部分进行加密生成密文Ciphertext,算法会随之生成一个128位的认证标记Authentication Tag。 6.对五个部分分别进行base64编码。
可见,JWE的计算过程相对繁琐,不够轻量级,因此适合与数据传输而非token认证,但该协议也足够安全可靠,用简短字符串描述了传输内容,兼顾数据的安全性与完整性。
四、JWT的Python库
独立的JWT Python库
-
itsdangerous
JSONWebSignatureSerializer
TimedJSONWebSignatureSerializer (可设置有效期) -
pyjwt
安装
$ pip install pyjwt
用例
>>> import jwt
>>> encoded_jwt = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256')
>>> encoded_jwt
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'
>>> jwt.decode(encoded_jwt, 'secret', algorithms=['HS256'])
{'some': 'payload'}
项目封装
import jwt
from flask import current_app
def generate_jwt(payload, expiry, secret=None):
"""
生成jwt
:param payload: dict 载荷
:param expiry: datetime 有效期
:param secret: 密钥
:return: jwt
"""
_payload = {'exp': expiry}
_payload.update(payload)
if not secret:
secret = current_app.config['JWT_SECRET']
token = jwt.encode(_payload, secret, algorithm='HS256')
return token.decode()
def verify_jwt(token, secret=None):
"""
检验jwt
:param token: jwt
:param secret: 密钥
:return: dict: payload
"""
if not secret:
secret = current_app.config['JWT_SECRET']
try:
payload = jwt.decode(token, secret, algorithm=['HS256'])
except jwt.PyJWTError:
payload = None
return payload
五、项目实施方案
需求
设置有效期,但有效期不宜过长,需要刷新。
如何解决刷新问题?
-
手机号+验证码(或帐号+密码)验证后颁发接口调用token与refresh_token(刷新token)
-
Token 有效期为2小时,在调用接口时携带,每2小时刷新一次
-
提供refresh_token,refresh_token 有效期14天
-
在接口调用token过期后凭借refresh_token 获取新token
-
未携带token 、错误的token或接口调用token过期,返回401状态码
-
refresh_token 过期返回403状态码,前端在使用refresh_token请求新token时遇到403状态码则进入用户登录界面从新认证。
-
token的携带方式是在请求头中使用如下格式:
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg
注意:Bearer前缀与token中间有一个空格
JWT认证流程
JWT刷新流程
实现
注册或登录获取token
toutiao/resources/user/passport.pty
class AuthorizationResource(Resource):
"""
认证
"""
def _generate_tokens(self, user_id, with_refresh_token=True):
"""
生成token 和refresh_token
:param user_id: 用户id
:return: token, refresh_token
"""
# 颁发JWT
now = datetime.utcnow()
expiry = now + timedelta(hours=current_app.config['JWT_EXPIRY_HOURS'])
token = generate_jwt({'user_id': user_id, 'refresh': False}, expiry)
refresh_token = None
if with_refresh_token:
refresh_expiry = now + timedelta(days=current_app.config['JWT_REFRESH_DAYS'])
refresh_token = generate_jwt({'user_id': user_id, 'refresh': True}, refresh_expiry)
return token, refresh_token
def post(self):
"""
登录创建token
"""
json_parser = RequestParser()
json_parser.add_argument('mobile', type=parser.mobile, required=True, location='json')
json_parser.add_argument('code', type=parser.regex(r'^\\d{6}$'), required=True, location='json')
args = json_parser.parse_args()
mobile = args.mobile
code = args.code
# 从redis中获取验证码
key = 'app:code:{}'.format(mobile)
try:
real_code = current_app.redis_master.get(key)
except ConnectionError as e:
current_app.logger.error(e)
real_code = current_app.redis_slave.get(key)
try:
current_app.redis_master.delete(key)
except ConnectionError as e:
current_app.logger.error(e)
if not real_code or real_code.decode() != code:
return {'message': 'Invalid code.'}, 400
# 查询或保存用户
user = User.query.filter_by(mobile=mobile).first()
if user is None:
# 用户不存在,注册用户
user_id = current_app.id_worker.get_id()
user = User(id=user_id, mobile=mobile, name=mobile, last_login=datetime.now())
db.session.add(user)
profile = UserProfile(id=user.id)
db.session.add(profile)
db.session.commit()
else:
if user.status == User.STATUS.DISABLE:
return {'message': 'Invalid user.'}, 403
# 生成用户的jwt token
# 在payload 保存用户的什么信息
# user_id 一定
#
# 当需要从token取出数据 不需要查询数据库时候
# mobile 不一定
# user_name 不一定
token, refresh_token = self._generate_tokens(user.id)
return {'token': token, 'refresh_token': refresh_token}, 201
要生成用户的jwt token
,在payload 保存用户的什么信息
- user_id 一定需要
当需要从token取出数据 不需要查询数据库时候:
- mobile 不一定
- user_name 不一定
请求钩子
common/utils/middlewares.py
from flask import request, g
from .jwt_util import verify_jwt
def jwt_authentication():
"""
根据jwt验证用户身份
"""
g.user_id = None
g.is_refresh_token = False
authorization = request.headers.get('Authorization')
if authorization and authorization.startswith('Bearer '):
token = authorization.strip()[7:]
payload = verify_jwt(token)
if payload:
g.user_id = payload.get('user_id')
g.is_refresh_token = payload.get('refresh')
强制登录装饰器
common/utils/decorators.py
def login_required(func):
"""
用户必须登录装饰器
使用方法:放在method_decorators中
"""
@wraps(func)
def wrapper(*args, **kwargs):
if not g.user_id:
return {'message': 'User must be authorized.'}, 401
elif g.is_refresh_token:
return {'message': 'Do not use refresh token.'}, 403
else:
return func(*args, **kwargs)
return wrapper
更新token接口
toutiao/resources/user/passport.py
class AuthorizationResource(Resource):
"""
认证
"""
...
# 补充put方式 更新token接口
def put(self):
"""
刷新token
"""
user_id = g.user_id
if user_id and g.is_refresh_token:
token, refresh_token = self._generate_tokens(user_id, with_refresh_token=False)
return {'token': token}, 201
else:
return {'message': 'Wrong refresh token.'}, 403
六、JWT禁用问题
需求
token颁发给用户后,在有效期内服务端都会认可,但是如果在token的有效期内需要让token失效,该怎么办?
此问题的应用场景:
- 用户修改密码,需要颁发新的token,禁用还在有效期内的老token
- 后台封禁用户
解决方案
在redis中使用set类型保存新生成的token
key = 'user:{}:token'.format(user_id)
pl = redis_client.pipeline()
pl.sadd(key, new_token)
pl.expire(key, token有效期)
pl.execute()
客户端使用token进行请求时,如果验证token通过,则从redis中判断是否存在该用户的user:{}:token记录:
- 若不存在记录,放行,进入视图进行业务处理
## 首次登录注册
- 若存在,则对比本次请求的token是否在redis保存的set中:
- 若存在,则放行
- 若不在set的数值中,则返回403状态码,不再处理业务逻辑
key = 'user:{}:token'.format(user_id)
valid_tokens = redis_client.smembers(key, token)
if valid_tokens and token not in valid_tokens:
return {'message': 'Invalid token'.}, 403
说明:
- redis记录设置有效期的时长是一个token的有效期,保证旧token过期后,redis的记录也能自动清除,不占用空间。
- ==使用set保存新token的原因是,考虑到用户可能在旧token的有效期内,在其他多个设备进行了登录,需要生成多个新token,这
token接口调用携带的问题
接口调用时,只携带一个token
调用普通业务接口,只携带普通token
调用刷新接口,只携带refresh_token
前端 vue + axios(ajax) 在自己编写axios 的js代码中,设置请求头Authorization 值为token
axios.get()
axios.post()
axios.put()
新token都要保存下来,既保证新token都能正常登录,又能保证旧token被禁用==
hs256 vs sha256
sha256 是数据计算的散列算法 hash算法
hs256
-> 使用sha256计算签名的时候,使用一个secret秘钥字符串参与运算,使用sha256计算验签的时候,使用相同的secret 秘钥字符串参与运算
hs256
利用了sha256来进行计算,只是说明确了参与计算的时候要使用一个秘钥字符串参与一起计算
rs256
-> 使用sha256计算签名的时候,使用一个secret秘钥字符串参与运算,使用sha256计算验签的时候,使用另一个不同的secret 秘钥字符串参与运算
以上是关于flask-web—— JWT认证方案的主要内容,如果未能解决你的问题,请参考以下文章
JWT 登录认证 + Token 自动续期方案,写得太好了!