Flask+Celery+Redis实现队列化异步任务
Posted 心软且酷丶
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flask+Celery+Redis实现队列化异步任务相关的知识,希望对你有一定的参考价值。
概述:
我们考虑一个场景,公司有一个需求,现在需要做一套web系统,而这套系统某些功能需要使用一些开源工具的sdk和api,或是运行一些耗时比较大的任务(单个大任务下可能有多个小任务),需要一段时间才能提供执行结果,而前端同事要求不能让用户在页面等待,需要马上提供一个返回结果给他,任务执行完后可以拿到最终结果,并且用户退出web界面或浏览器异常关闭之后,再次返回界面,执行的过程不会中断,并且支持多用户同时执行不同操作的需要。
很明显,这是一个-异步多线程-的场景,在Python中可以想到的有:
1.引入Asyncio模块,利用多协程实现。
2.使用Threading模块,自己编写线程任务,线程等待,睡眠,释放线程的过程。
3.使用异步框架,例如Cerely、Tornado、Twisted等等,装饰异步任务。
这里边最便捷且开发效率最高的应该是使用异步框架,咱们选择使用Celery来实现这个需求。
Celery介绍:
截图与描述来自celery官网:Celery - Distributed Task Queue — Celery 5.2.0 documentation
Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。
Celery 拥有庞大而多样化的用户和贡献者社区,您应该加入我们的 IRC 或我们的邮件列表。
消费者与消费结果:
我们除了需要Celery做异步任务的处理,还需要一个中间件来充当消费者,并保存最终的任务处理结果(消费结果),这里有很多中间件可以选,例如常用的消息中间件,rabbitmq,kafka等,还可以使用mysql,redis等作为消费者并保存消费结果(因为最终的处理结果要返回给前端同事),楼主最终选择了redis。
Redis安装与配置:
这里不再赘述windows下安装redis步骤,只介绍linux下安装redis与配置,我的机器是centos7.6:
yum方式安装(注意:这样安装的redis不是最新版本的,如有对版本要求比较高的,建议去官网下载源码包去手动安装,官网地址:Redis,最新版本:6.2.6)
yum -y install redis
安装完成之后配置redis.conf文件:
vi /etc/redis.conf
修改这一行,改成 0.0.0.0,这样别的应用和组件才可以访问到redis的服务与端口:
同理,redis的默认端口也可以在此配置里修改:
还有一些关闭匿名访问,设置密码等配置的修改,项目若要上到公网环境下,建议配置。
启动并测试redis服务功能是否正常:
启动redis:
redis-cli -h 0.0.0.0
测试redis:
1 redis> set name "zzz"
2
3 OK
4
5 redis> get name
6
7 "zzz"
记住,代码并没有实际引用redis,但也需要安装redis模块,否则会报错。(redis模块版本不要太高,高了也会报错,这些坑都是楼主亲自趟过的,我这里使用2.10.6)
pip install redis==2.10.6
Celery的安装和配置:
windos和linux下都可以使用pip安装:
pip install celery==3.1.25
我的项目目录:(celeryconfig.py与__init__.py文件为celery与redis配置文件):
在项目中先创建一个名为config的python目录,并在__init__.py中导入celery模块并配置:
__init__.py:
from celery import Celery,platforms
platforms.C_FORCE_ROOT = True
app = Celery('prod') # 创建 Celery 实例
app.config_from_object('kernel.config.celeryconfig') # 通过 Celery 实例加载配置模块
platforms.C_FORCE_ROOT = True 这个配置一定要有,否则会报权限问题。
在config目录下的celeryconfig.py中配置任务队列消费者与消费结果保存在redis的地址:
celeryconfig.py:
## celery配置
BROKER_URL = 'redis://redis-host:6379/1' # 指定 Broker消费者,我们使用redis 1号数据库
CELERY_RESULT_BACKEND = 'redis://redis-host:6379/2' # 指定 Backend,最终消费结果,我们使用redis 2号数据库
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC
CELERY_IMPORTS = ( # 指定导入的任务模块
'kernel.views.api' ## 异步任务代码文件路径即可
)
至此,前期需要的工具准备工作全部完毕,我们开始我们的开发任务。
异步任务开发:
楼主因为主要负责后端这块,这里选择使用flask来写,整体的项目模块与版本,大概罗列下:
Python 3.5.4
Mysql 5.5.64
Celery==3.1.25
Flask==1.1.4
Redis==2.10.6
这时我们与前端同事再次详细沟通了下,初步约定如下:
1.前端通过form表单传数据给后端,格式为json,分析:需要解析json数据。
2.因为存在长耗时的任务,要求一旦前端请求过来,后端要马上返回一个中间结果给前端(这样解决了前端页面等待的问题),分析:需要马上提供一个返回结果。
3.前端最终要拿到任务的最终执行结果,分析:我们需要把长耗时异步任务的最终结果推送给前端,需要任务代码最后推送执行结果。(自己先定义回调接口去测试)
1.后端Flask接口代码:
文件名称与路径:
项目名称-kernel-view-api.py,与celery配置下的任务模块对应。
api.py:
# -*- coding: utf-8 -*-
import json, sys
import logging
import requests
import datetime,pymysql
import os,subprocess
from flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Response
from kernel.models.playbook import PlayBook_file
from kernel.utils import render_response, Retval
from kernel.models import db
from sqlalchemy import or_,text
import gitlab ## 导入gitlab模块
from kernel.config import app, cmdb_config,hcacp_config
import pymysql,uuid,hashlib,time
from datetime import timezone
bp = Blueprint('test', __name__) ## 蓝图自己定义,这里只是实例化
log = logging.getLogger(__name__) ## 日志自己定义,这里只是实例化
class status: ## 定义一些状态码
success = 0
warning = 1
pending = 2
faild = -1
## 回调接口
@bp.route('/test/callback/', methods=['GET', 'POST'])
def ansible_aaa():
data1 = request.get_data(as_text=True)
# data2 = json.loads(data1)
log.info(data1)
return data1
@bp.route('/test/add/', methods=['POST', 'GET'])
def devops_add():
'''
获取form表单json数据
'''
# return True
try:
data = request.get_data()
_data = json.loads((str(data, 'utf-8')))
print(_data)
except Exception as requestdata_except:
log.error('获取表单数据异常,异常原因:%s' % requestdata_except)
return render_response(status.faild, u"获取表单数据异常,异常原因:%s" % requestdata_except, {})
## 获取标识tag的结果
try:
'''
工单json数据要带工单标识符select_tag:
create_project:新建项目申请工单
'''
select_tag = _data.get('select_tag')
except Exception as request_select_tag_except:
log.error('获取表单需求标识select_tag异常,异常原因:%s' % request_select_tag_except)
return render_response(status.faild, u"获取表单需求标识select_tag异常,异常原因:%s" % request_select_tag_except, {})
try:
"""
!--当参数select_tag == create_project 时,建立项目--!
"""
if select_tag == 'create_project':
projname = _data.get('projname')
add_project_result = add_project.delay(cmdb_config, _data)
return render_response(status.pending, u"devops系统添加项目工单任务执行中--pending--", {'项目中文名称': projname})
except Exception as do_celery_job_except:
log.error('执行异步celery任务异常,异常原因:%s' % do_celery_job_except)
return render_response(status.faild, u"执行异步celery任务异常,异常原因:%s" % do_celery_job_except, {})
这里代表前端请求过来之后,马上返回一个执行结果,满足需求2:
在devops_add接口里执行异步任务:
add_project_result = add_project.delay(cmdb_config, _data)
官网的示例:
## 1.扩号里为异步任务所需的参数
## 2.add_project_result 是异步任务执行的对象,包含很多属性方法,下边介绍一些常用的:
获取任务结果和状态:
add_project_result = task.apply_async()
add_project_result.ready() # 查看任务状态,返回布尔值, 任务执行完成, 返回 True, 否则返回 False.
add_project_result.wait() # 会阻塞等待任务完成, 返回任务执行结果,很少使用;
add_project_result.get(timeout=1) # 获取任务执行结果,可以设置等待时间,如果超时但任务未完成返回None;
add_project_result.result # 任务执行结果,未完成返回None;
add_project_result.state # PENDING, START, SUCCESS,任务当前的状态
add_project_result.status # PENDING, START, SUCCESS,任务当前的状态
add_project_result.successful # 任务成功返回true
add_project_result.traceback # 如果任务抛出了一个异常,可以获取原始的回溯信息
2.异步任务代码:
文件名称与路径:
项目名称-kernel-view-api.py
api.py
解释:
因为要满足需求3,把最终异步耗时任务的真正结果给到前端,所以我们需要在异步任务里写一个回调的操作。
header = {'Content-Type': 'application/json'} ## 构造请求头和数据类型
_json = {"status": sttaus.faild, "msg": u"失败", "data": {}} ## 失败就返回给前端json类型失败
_json = {"status": sttaus.success, "msg": u"成功", "data": {}} ## 成功就返回给前端json类型成功
requests.post(callback_url, headers=header, data=json.dumps(_json)) ## 带参回调请求
# -*- coding: utf-8 -*-
import json, sys
import logging
import requests
import datetime,pymysql
import os,subprocess
from flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Response
from kernel.utils import render_response, Retval
from datetime import timezone
from kernel.config import * ## 导入config目录下的celery配置
bp = Blueprint('test', __name__) ## 蓝图自己定义,这里只是实例化
log = logging.getLogger(__name__) ## 日志自己定义,这里只是实例化
class status: ## 定义一些状态码
success = 0
warning = 1
pending = 2
faild = -1
## 示例函数:一个添加信息函数,前端给我们json数据,后端接受之后去插入数据库,完成操作并告诉前端
@app.task ## celery添加项目任务
def add_project(mysql_config, _data):
try:
## 系统添加项目信息工单
projname = _data.get('projname') ## 项目名称,必填
prodesc= _data.get('prodesc') ## 项目描述,必填
projctime = datetime.datetime.now() ## 项目发布时间
callback_url = _data.get('callback_url') ## 回调接口地址
except Exception as describe_form_except:
log.error('解析表单数据出现异常,异常原因:%s' % describe_form_except)
header = {'Content-Type': 'application/json'} ## 回调接口请求头
_json = {"status": status.faild, "msg": u"失败", "data": {}}
requests.post(callback_url, headers=header, data=json.dumps(_json))
try:
# 获取数据库连接
conn = pymysql.connect(cmdb_config.server, cmdb_config.user, cmdb_config.password, database=cmdb_config.db)
# 返回连接
cursor = conn.cursor()
except Exception as connect_except:
log.error('系统数据库连接出现异常,异常原因:%s' % connect_except)
_json = {"status": status.faild, "msg": u"失败", "data": {}}
requests.post(callback_url, headers=header, data=json.dumps(_json))
try:
proj_sql = "insert into project_tb_project (projname,prodesc,projctime) VALUES ('{}','{}','{}');".format(projname, prodesc, projctime)
cursor.execute(proj_sql)
conn.commit()
_json = {"status": status.success, "msg": u"成功", "data": {}}
requests.post(callback_url, headers=header, data=json.dumps(_json))
## 任务执行完成之后调用回调接口,返回任务执行成功结果
log.info('系统建项目工单执行成功,%s' % proj_sql)
except Exception as do_add_project_except:
_json = {"status": status.faild, "msg": u"失败", "data": {}}
requests.post(callback_url, headers=header, data=json.dumps(_json))
log.error('执行添加项目工单异常,异常原因:%s' % do_add_project_except)
## 任务执行完成之后调用回调接口,返回任务执行失败结果
楼主用的最简单,没有在task里写一些属性,类似下边的这种方式还可以给task添加一些属性:
@app.task(name='test',bind=True,base=BaseTask)
补充介绍下异步task有的一些属性:
TASK的一般属性:
Task.name:任务名称;
Task.request:当前任务的信息;
Task.max_retries:设置重试的最大次数
Task.throws:预期错误类的可选元组,不应被视为实际错误,而是结果失败;
Task.rate_limit:设置此任务类型的速率限制
Task.time_limit:此任务的硬限时(以秒为单位)。
Task.ignore_result:不存储任务状态。默认False;
Task.store_errors_even_if_ignored:如果True,即使任务配置为忽略结果,也会存储错误。
Task.serializer:标识要使用的默认序列化方法的字符串。
Task.compression:标识要使用的默认压缩方案的字符串。默认为task_compression设置。
Task.backend:指定该任务的结果存储后端用于此任务。
Task.acks_late:如果设置True为此任务的消息将在任务执行后确认 ,而不是在执行任务之前(默认行为),即默认任务执行之前就会发送确认;
Task.track_started:如果True任务在工作人员执行任务时将其状态报告为“已启动”。默认是False;
我们启动celery来看下celery里在执行任务的过程中有什么变化:
(1)启动项目:
楼主用的是gunicorn工具启动,配置多线程:
gunicorn.conf
workers = 16 ## 多线程配置
bind = '0.0.0.0:7777'
proc_name = 'websocket(项目名称)'
limit_request_field_size = 0
limit_request_line = 0
log_level = 'error'
debug = True
chdir = '/data/websocket' ## 项目目录
启动命令:gunicorn -c /项目目录/gunicorn.conf kernel:app
(2)启动celery:
cd 到项目目录下,执行 celery -A kernel.views.api worker -l info
(3)使用postman调用接口:
可以看到直接先返回我们状态码2-等待状态:
(4)从日志看异步任务执行过程:
1.会先在celery里出现一个异步任务,并生成一个异步任务的task-id号:
2.redis去查看是否已有task任务,task-id号是一致的:
用add_project_result保存异步任务执行结果的对象,最终的结果是在redis中,我们也可以去redis里去拿,redis保存的结果。
我们用的redis 2号数据库,select 2 号数据库,keys * 查看redis是否已有任务
任务最终的执行结果(celery日志里也可以看到,在redis里也可以看到,celery日志看的更直观,succeded代表异步任务执行成功):
3. 查看项目日志,状态码为1,是回调接口打印出来的,代表返回给回调接口最终结果是成功。
4.最终去数据库看下新添加记录是否已有,这里就不截图了,记录插入成功,异步任务执行成功,也满足了开始我们沟通的三个需求。
5.前端同学给你竖起了大拇指,直呼你牛!
备注:
celery还可以用来做定时任务,感兴趣的伙伴们可以去官网或者其他途径去研究下,楼主第一次写这么大的博客,有些地方我描述不清楚的或者您没太看懂的可以私信我答疑解惑,我的微信zcw576020095,热爱python,热爱运维,一起加油!
以上是关于Flask+Celery+Redis实现队列化异步任务的主要内容,如果未能解决你的问题,请参考以下文章