Flask+Vue Api认证
Posted 木小撇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flask+Vue Api认证相关的知识,希望对你有一定的参考价值。
这篇文章其实是重构,因为最近写一个基于Flask_socket的后台的时候发现基于flask_httpauth的这个Api认证违法满足需求。再兼之上次写得过于匆忙,于是决心重构一波。
首先Flask的项目结构基本采用狗书的Flasky, 采用flask_cors扩展解决跨域的问题。
### 后端部分
项目结构如下,具体请参考开源Flasky。
│ config.py
│ manage.py
│ README.md
│ requirements.txt
└─app
│ models.py
│ __init__.py
│
├─api
│ authentication.py
│ errors.py
│ __init__.py
│
├─auth
│ forms.py
│ views.py
│ __init__.py
│
└─main
views.py
__init__.py
其中api提供认证接口。
项目初始化文件 app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from config import config
lm = LoginManager()
lm.session_protection = 'strong'
lm.login_view = 'auth.login'
db = SQLAlchemy()
def register_blueprints(app):
# Prevents circular imports
from auth import mod as auth
from main import mod as main
from api import mod as api
app.register_blueprint(auth, url_prefix='/auth')
app.register_blueprint(main)
app.register_blueprint(api, url_prefix='/api')
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
register_blueprints(app)
db.init_app(app)
lm.init_app(app)
return app
数据库model
from app import db
from datetime import datetime
from flask_login import UserMixin
from app import lm
from flask import current_app
from werkzeug.security import generate_password_hash,check_password_hash
from itsdangerous import (TimedJSONWebSignatureSerializer
as Serializer, BadSignature, SignatureExpired)
class User(db.Model,UserMixin):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), index=True)
password_hash = db.Column(db.String(64), index=True)
nickname = db.Column(db.String(64), index=True, unique=True)
email = db.Column(db.String(120), index=True, unique=True)
phone = db.Column(db.String(32), index=True)
add_time = db.Column(db.DateTime, default=datetime.now())
update_time = db.Column(db.DateTime, default=datetime.now())
@property
def password(self):
raise AttributeError('password is not readable attribute')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
def generate_auth_token(self, expiration):
s = Serializer(current_app.config['SECRET_KEY'],
expires_in=expiration)
return s.dumps({'id': self.id}).decode('utf-8')
@staticmethod
def verify_auth_token(token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return None
return User.query.get(data['id'])
@lm.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
def to_json(self):
json_user = {
'id':self.id,
'username':self.username,
'email':self.email,
'phone':self.phone
}
return json_user
def __repr__(self):
return '<User %r>' % (self.username)
认证文件api/authentication.py
# -*- coding: utf-8 -*-
from flask import g, jsonify, make_response
from flask_httpauth import HTTPBasicAuth
from ..models import User
from . import mod
from .errors import unauthorized, forbidden
from flask_cors import CORS
auth = HTTPBasicAuth()
CORS(mod)
@auth.verify_password
def verify_password(username_or_token, password):
user = User.verify_auth_token(username_or_token)
if not user:
user = User.query.filter_by(username=username_or_token).first()
if not user or not user.verify_password(password):
return False
g.user = user
g.current_user = User.verify_auth_token(username_or_token)
return True
@auth.error_handler
def auth_error():
return unauthorized('Invalid credentials')
@mod.route('/resource/')
@auth.login_required
def get_resource():
user = g.user
return jsonify(user.to_json())
@mod.route('/token/', methods=['POST'])
@auth.login_required
def get_auth_token():
token = g.user.generate_auth_token(3600)
return jsonify({'token': token.decode('ascii'), 'duration': 3600})
@mod.route("/")
@auth.login_required
def index():
return jsonify('Hello, %s' % g.user.username)
@mod.route('/login/')
def login():
return jsonify({'token': 'chensi', 'duration': 600})
### 前端部分
axios.defaults.baseURL = config.API_URL
axios.defaults.auth = {
username: localStorage.token,
password: localStorage.token
}
axios.interceptors.request.use(function (config) {
if (JwtToken.getToken()){
}
}, function (error) {
return Promise.reject(error);
});
router.beforeEach((to, from, next) => {
if (to.meta.require) {
if (store.state.token || JwtToken.getToken()) {
axios.defaults.auth = {
username: JwtToken.getToken(),
password: JwtToken.getToken(),
}
return next()
} else {
return next({'name': 'login'})
}
}
next()
})
组件login.vue
this.$axios.defaults.auth = {
username: this.loginForm.name,
password: this.loginForm.passwd
}
this.$axios.get('/api/token/').then(response => {
let data = response.data
// 保存token
this.$store.dispatch('login', data)
this.$router.push('/home')
}).catch(error => {
this.$Message.error(error.status)
})
保存token
export default {
//保存到localStorage
actions: {
login({commit, dispatch}, tokenData) {
JwtToken.setToken(tokenData.token)
commit({
type: types.SET_USER_TOKEN,
token: tokenData.token
})
}
}
}
mutations: {
//保存到store
[types.SET_USER_TOKEN](state, payload) {
state.username = payload.token
state.password = ''
}
}
以上是利用flask_httpauth做权限验证,相对来说比较简单,大部分教程在网上可以搜到。flask_httpauth的token其实是在headers里面传递,那么假如让其写一个基于websock的API认证呢,由于websocket不能使用headers传递参数,所以只能重写了
我们可以利用jwt这个包,由于Flask使用装饰器进行认证,我们可以重新写一个装饰器进行认证。
from flask_cors import CORS
from apps.models import User
from functools import wraps
import jwt
# 认证的装饰器
def login_required(func):
@wraps(func)
def checkToken(*args,**kwargs):
access_token = request.args.get('access_token')
if access_token is None:
return jsonify({
'status': 401,
'message': "没有有效的token"
}), 401
# 取消过期时间验证,可在配置文件中定义一个多少时长过期 对比
payload = jwt.decode(access_token, current_app.config['SECRET_KEY'], options={'verify_exp': False})
if 'data' in payload and 'id' in payload['data']:
user = User.query.filter_by(id=payload['data']['id']).first()
g.user = user
return func(*args, **kwargs)
return jsonify({
'status': 401,
'message': "token失效"
}), 401
return checkToken
# 登录
@mod.route('/login')
def login():
# 登录判断 代码省略
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=10),
'iat': datetime.datetime.utcnow(),
'iss': 'ken',
'data': {
'id': 1,
'last_time': datetime.datetime.utcnow()
}
}
# 将最后登录时间写入数据库 代码省略
return jwt.encode(
payload,
current_app.config['SECRET_KEY'],
algorithm='HS256'
)
前端
import VueSocketio from 'vue-socket.io';
Vue.use(VueSocketio, 'http://192.168.10.10:5000/');
//把token放到url之后吧
以上是关于Flask+Vue Api认证的主要内容,如果未能解决你的问题,请参考以下文章
Express实战 - 应用案例- realworld-API - 路由设计 - mongoose - 数据验证 - 密码加密 - 登录接口 - 身份认证 - token - 增删改查API(代码片段