Flask+Vue Api认证

Posted 木小撇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flask+Vue Api认证相关的知识,希望对你有一定的参考价值。

这篇文章其实是重构,因为最近写一个基于Flask_socket的后台的时候发现基于flask_httpauth的这个Api认证违法满足需求。再兼之上次写得过于匆忙,于是决心重构一波。

首先Flask的项目结构基本采用狗书的Flasky, 采用flask_cors扩展解决跨域的问题。

### 后端部分

项目结构如下,具体请参考开源Flasky。

  config.py
 manage.py
 README.md
 requirements.txt
└─app
     models.py
     __init__.py
   
   ├─api
         authentication.py
         errors.py
         __init__.py
   
   ├─auth
         forms.py
         views.py
         __init__.py
   
   └─main
           views.py
           __init__.py

其中api提供认证接口。

项目初始化文件 app/__init__.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from config import config

lm = LoginManager()
lm.session_protection = 'strong'
lm.login_view = 'auth.login'
db = SQLAlchemy()

def register_blueprints(app):
   # Prevents circular imports
   from auth import mod as auth
   from main import mod as main
   from api import mod as api
   app.register_blueprint(auth, url_prefix='/auth')
   app.register_blueprint(main)
   app.register_blueprint(api, url_prefix='/api')

def create_app(config_name):
   app = Flask(__name__)
   app.config.from_object(config[config_name])
   config[config_name].init_app(app)
   register_blueprints(app)
   db.init_app(app)
   lm.init_app(app)
   return app

数据库model

from app import db
from datetime import datetime
from flask_login import UserMixin
from app import lm
from flask import current_app
from werkzeug.security import generate_password_hash,check_password_hash
from itsdangerous import (TimedJSONWebSignatureSerializer
                         as Serializer, BadSignature, SignatureExpired)
class User(db.Model,UserMixin):
   __tablename__ = 'user'
   id = db.Column(db.Integer, primary_key=True)
   username = db.Column(db.String(64), index=True)
   password_hash = db.Column(db.String(64), index=True)
   nickname = db.Column(db.String(64), index=True, unique=True)
   email = db.Column(db.String(120), index=True, unique=True)
   phone = db.Column(db.String(32), index=True)
   add_time = db.Column(db.DateTime, default=datetime.now())
   update_time = db.Column(db.DateTime, default=datetime.now())

   @property
   def password(self):
       raise AttributeError('password is not readable attribute')

   @password.setter
   def password(self, password):
       self.password_hash = generate_password_hash(password)

   def verify_password(self, password):
       return check_password_hash(self.password_hash, password)

   def generate_auth_token(self, expiration):
       s = Serializer(current_app.config['SECRET_KEY'],
                      expires_in=expiration)
       return s.dumps({'id': self.id}).decode('utf-8')

   @staticmethod
   def verify_auth_token(token):
       s = Serializer(current_app.config['SECRET_KEY'])
       try:
           data = s.loads(token)
       except:
           return None
       return User.query.get(data['id'])

   @lm.user_loader
   def load_user(user_id):
       return User.query.get(int(user_id))

   def to_json(self):
       json_user = {
           'id':self.id,
           'username':self.username,
           'email':self.email,
           'phone':self.phone

       }
       return json_user
       
   def __repr__(self):
       return '<User %r>' % (self.username)

认证文件api/authentication.py

# -*- coding: utf-8 -*-
from flask import g, jsonify, make_response
from flask_httpauth import HTTPBasicAuth
from ..models import User
from . import mod
from .errors import unauthorized, forbidden
from flask_cors import CORS

auth = HTTPBasicAuth()

CORS(mod)

@auth.verify_password
def verify_password(username_or_token, password):
   user = User.verify_auth_token(username_or_token)
   if not user:
       user = User.query.filter_by(username=username_or_token).first()
       if not user or not user.verify_password(password):
           return False
   g.user = user
   g.current_user = User.verify_auth_token(username_or_token)
   return True


@auth.error_handler
def auth_error():
   return unauthorized('Invalid credentials')

@mod.route('/resource/')
@auth.login_required
def get_resource():
   user = g.user
   return jsonify(user.to_json())

@mod.route('/token/', methods=['POST'])
@auth.login_required
def get_auth_token():
   token = g.user.generate_auth_token(3600)
   return jsonify({'token': token.decode('ascii'), 'duration': 3600})


@mod.route("/")
@auth.login_required
def index():
   return jsonify('Hello, %s' % g.user.username)

@mod.route('/login/')
def login():
   return jsonify({'token': 'chensi', 'duration': 600})

### 前端部分
axios.defaults.baseURL = config.API_URL
axios.defaults.auth = {
   username: localStorage.token,
   password: localStorage.token
}

axios.interceptors.request.use(function (config) {
 if (JwtToken.getToken()){

 }
}, function (error) {
 return Promise.reject(error);
});

router.beforeEach((to, from, next) => {
 if (to.meta.require) {
   if (store.state.token || JwtToken.getToken()) {
     axios.defaults.auth = {
       username: JwtToken.getToken(),
       password: JwtToken.getToken(),
     }
     return next()
   } else {
     return next({'name': 'login'})
   }
 }
 next()
})

组件login.vue

this.$axios.defaults.auth = {
   username: this.loginForm.name,
   password: this.loginForm.passwd
}

this.$axios.get('/api/token/').then(response => {
   let data = response.data
   // 保存token
   this.$store.dispatch('login', data)
   this.$router.push('/home')
}).catch(error => {
   this.$Message.error(error.status)
})

保存token

export default {
   //保存到localStorage
   actions: {
       login({commit, dispatch}, tokenData) {
           JwtToken.setToken(tokenData.token)
           commit({
               type: types.SET_USER_TOKEN,
               token: tokenData.token
           })
       }
   }
}

mutations: {
   //保存到store
   [types.SET_USER_TOKEN](state, payload) {
       state.username = payload.token
       state.password = ''
   }
}

以上是利用flask_httpauth做权限验证,相对来说比较简单,大部分教程在网上可以搜到。flask_httpauth的token其实是在headers里面传递,那么假如让其写一个基于websock的API认证呢,由于websocket不能使用headers传递参数,所以只能重写了

我们可以利用jwt这个包,由于Flask使用装饰器进行认证,我们可以重新写一个装饰器进行认证。

from flask_cors import CORS
from apps.models import User
from functools import wraps
import jwt

# 认证的装饰器
def login_required(func):
   @wraps(func)
   def checkToken(*args,**kwargs):
       access_token = request.args.get('access_token')
       if access_token is None:
           return jsonify({
               'status': 401,
               'message': "没有有效的token"
           }), 401
     
       # 取消过期时间验证,可在配置文件中定义一个多少时长过期  对比
       payload = jwt.decode(access_token, current_app.config['SECRET_KEY'], options={'verify_exp': False})
       if 'data' in payload and 'id' in payload['data']:
           user = User.query.filter_by(id=payload['data']['id']).first()
           g.user = user
           return func(*args, **kwargs)
       return jsonify({
           'status': 401,
           'message': "token失效"
       }), 401
   return checkToken

# 登录
@mod.route('/login')
def login():
   # 登录判断 代码省略
   payload = {
       'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=10),
       'iat': datetime.datetime.utcnow(),
       'iss': 'ken',
       'data': {
           'id': 1,
           'last_time': datetime.datetime.utcnow()
       }
   }
   # 将最后登录时间写入数据库  代码省略
   return jwt.encode(
       payload,
       current_app.config['SECRET_KEY'],
       algorithm='HS256'
   )

前端

import VueSocketio from 'vue-socket.io';
Vue.use(VueSocketio, 'http://192.168.10.10:5000/');
//把token放到url之后吧



以上是关于Flask+Vue Api认证的主要内容,如果未能解决你的问题,请参考以下文章

使用 Flask 设计 RESTful 的认证

Flask+vue的Token认证

Express实战 - 应用案例- realworld-API - 路由设计 - mongoose - 数据验证 - 密码加密 - 登录接口 - 身份认证 - token - 增删改查API(代码片段

Flask 编写http接口api及接口自动化测试

[flask] flask api + vue 跨域问题

Flask Restless CSRF 豁免认证 URL