pythonCelery实现异步任务
Posted sysu_lluozh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pythonCelery实现异步任务相关的知识,希望对你有一定的参考价值。
一、简介
Celery是一个异步任务的调度工具
Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个worker的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农
关于Celery的介绍可查看官方文档
二、工作流程
Broker
在Python中定义Celery的时候,要引入Broker(消息中间件),Broker起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做
Backend
这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的,所以要引入Backend来保存每次任务的结果。这个Backend有点像Broker,也是存储任务的信息,只不过这里存的是那些任务的返回结果。
可以选择只让错误执行的任务返回结果到Backend,这样取回结果便可以知道有多少任务执行失败
三、应用场景
- 在项目中有一些耗时的操作需要执行,但是又不想一直阻塞前端,那么可以尝试使用Celery的后台任务,将请求发送到Celery后台,然后前端不再阻塞,最后Celery后台将任务完成之后将结果返回。用户即可知道任务是否执行成功
- 做数据库异步更新,可能要在某个时间点与远程数据库进行同步,更新的时间也比较久,不希望阻塞前端的进程
- 执行定时任务,比如每天检测一下所有客户的资料,如果发现今天是客户的生日,就给他发个短信祝福
四、使用示例
4.1 安装依赖包
pip install celery
4.2 在Flask中初始化Celery
from flask import Flask
from celery import Celery
app = Flask(__name__)
# 配置
# 配置消息代理的路径,如果是在远程服务器上,则配置远程服务器中redis的URL
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
# 要存储 Celery 任务的状态或运行结果时就必须要配置
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# 初始化Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# 将Flask中的配置直接传递给Celery
celery.conf.update(app.config)
上述代码中,通过Celery类初始化celery对象,传入的应用名称与消息代理的连接URL
4.3 通过celery.task装饰器装饰对应函数
@app.route('/', methods=['GET', 'POST'])
def index():
task = long_task.delay(1, 2)
delay()方法是applyasync()方法的快捷方式,applyasync()参数更多,可以更加细致的控制耗时任务,比如想要long_task()在一分钟后再执行
@app.route('/', methods=['GET', 'POST'])
def index():
task = long_task.apply_async(args=[1, 2], countdown=60)
delay()与apply_async()会返回一个任务对象,该对象可以获取任务的状态与各种相关信息
五、获取状态信息
接着具体来实现让前端可以通过一个进度条来判断后端任务的执行情况的需求
# bind为True,会传入self给被装饰的方法
@celery.task(bind=True)
def long_task(self):
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
for i in range(total):
if not message or random.random() < 0.25:
# 随机的获取一些信息
message = '0 1 2...'.format(
random.choice(verb),
random.choice(adjective),
random.choice(noun)
)
# 更新Celery任务状态
self.update_state(
state='PROGRESS',
meta='current': i, 'total': total,'status': message
)
time.sleep(1)
# 返回字典
return
'current': 100,
'total': 100,
'status': 'Task completed!',
'result': 42
上述代码中,celery.task()装饰器使用了bind=True参数,这个参数会让Celery将Celery本身传入,可以用于记录与更新任务状态
然后就是一个for迭代,迭代的逻辑没什么意义,就是随机从 list 中抽取一些词汇来模拟一些逻辑的运行,为了表示这是耗时逻辑,通过time.sleep(1)休眠一秒
每次获取一次词汇,就通过self.update_state()更新Celery任务的状态,Celery包含一些内置状态,如SUCCESS、STARTED等等,这里使用了自定义状态PROGRESS,除了状态外,还将本次循环的一些信息通过meta参数(元数据)以字典的形式存储起来。有了这些数据,前端即可显示进度条
定义好耗时方法后,再定义一个Flask接口方法来调用该耗时方法
@app.route('/longtask', methods=['POST'])
def longtask():
# 异步调用
task = long_task.apply_async()
# 返回 202,与Location头
return jsonify(), 202, 'Location': url_for('taskstatus', task_id=task.id)
简单而言,前端通过POST请求到/longtask,让后端开始去执行耗时任务
返回的状态码为202,202通常表示一个请求正在进行中,然后还在返回数据包的包头(Header)中添加了 Location头信息,前端可以通过读取数据包中Header中的Location的信息来获取任务id对应的完整url
前端有了任务id对应的url后,还需要提供一个接口给前端,让前端可以通过任务id去获取当前时刻任务的具体状态
@app.route('/status/<task_id>')
def taskstatus(task_id):
task = long_task.AsyncResult(task_id)
if task.state == 'PENDING': # 在等待
response =
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
elif task.state != 'FAILURE': # 没有失败
response =
'state': task.state, # 状态
# meta中的数据,通过task.info.get()可以获得
'current': task.info.get('current', 0), # 当前循环进度
'total': task.info.get('total', 1), # 总循环进度
'status': task.info.get('status', '')
if 'result' in task.info:
response['result'] = task.info['result']
else:
# 后端执行任务出现了一些问题
response =
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # 报错的具体异常
return jsonify(response)
为了可以获得任务对象中的信息,使用任务id初始化AsyncResult类,获得任务对象,然后可以从任务对象中获得当前任务的信息
该方法会返回一个JSON,其中包含了任务状态以及meta中指定的信息,前端可以利用这些信息构建一个进度条
如果任务在PENDING状态,表示该任务还没有开始,在这种状态下,任务中是没有什么信息的,这里人为的返回一些数据。如果任务执行失败,就返回task.info中包含的异常信息,此外就是正常执行,正常执行可以通task.info获得任务中具体的信息
六、服务启动
首先运行Redis
redis-server
然后运行celery
celery worker -A app.celery --loglevel=info
最后运行Flask项目
python app.py
以上是关于pythonCelery实现异步任务的主要内容,如果未能解决你的问题,请参考以下文章