celery实现任务进度查询

Posted staff

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery实现任务进度查询相关的知识,希望对你有一定的参考价值。

 

如果想在任务成功或者失败额外做点事,可以重写Task类。

tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app
from celery.task import Task

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print(task done==================>: {0}.format(retval))
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(task fail, reason=================>: {0}.format(exc))
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

@app.task(base=MyTask)
def add(x, y):
    return x + y

 

@app.task(bind=True)

意思是绑定任务为实例方法,执行中的任务能获取到了自己执行任务的各种信息,可以根据这些信息做很多其他操作,例如判断链式任务是否到结尾等等。

示例:

# tasks.py
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
    logger.info(self.request.__dict__)
    return x + y

 

案例模拟进度条,查看任务进度

------------------目录结构-------------------

--start.py

--proj

   --celery.py

   --tasks.py

--------------------------------------------------

技术图片
from __future__ import absolute_import, unicode_literals
#导入安装的celery(所以上面要写绝对导入),而不是自己导自己(from .celery)
from celery import Celery

app = Celery(proj,
             broker=redis://,
             backend=redis://,
             include=[proj.tasks,])  # 这个celery管理了哪些task文件可以有多个,其中有个定时任务periodic_task

# Optional configuration, see the application user guide.
# update方法更新配置,也可以直接写在上面初始化Celery里面
app.conf.update(
    result_expires=3600,  # 任务结果一小时内没人取就丢弃
)

if __name__ == __main__:
    app.start()
celery.py

 

技术图片
from __future__ import absolute_import, unicode_literals
from .celery import app
#from celery import Celery
import time

@app.task(bind=True)
def test_mes(self):
    for i in range(1, 11):
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={p: i*10})
    return finish
tasks.py

 

技术图片
from proj.tasks import test_mes
import sys

def pm(body):
    res = body.get(result)
    if body.get(status) == PROGRESS:
        sys.stdout.write(
任务进度: {0}%.format(res.get(p)))
        sys.stdout.flush()
    else:
        print(
)
        print(res)
r = test_mes.delay()
r.get(on_message=pm, propagate=False)  #‘FINISH‘
run.py

 

excel入库步骤进度显示

tasks.py

from .celery import app
import time

@app.task(bind=True)
def excel_info_db(self):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={step:加载excel到内存, progress:100%})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={step: 在内存中校验excel, progress: 100%})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={step: 入库, progress: 100%})
    return finish

 

链式任务:

有些任务可能需由几个子任务组成,此时调用各个子任务的方式就变的很重要,尽量不要以同步阻塞的方式调用子任务,而是用异步回调的方式进行链式任务的调用:

from .celery import app
import time

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()
‘‘‘
也可以这样写:
fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
‘‘‘


@app.task()
def fetch_page(url):
    time.sleep(3)
    return "抓取到了%s的内容" % url

@app.task()
def parse_page(page):
    # page是fentch_page的返回值
    time.sleep(3)
    return "对%s做解析,得到数据" % page

@app.task(ignore_result=True)
def store_page_info(info, url):
    # info是parse_page的返回值,url是调用时传的参
    time.sleep(3)
    print(info)
    print("%s的数据成功入库" % url)
    return(入库成功)

调用:

>>> from proj.tasks import update_page_info
>>> t = update_page_info(‘www.google.com‘)

技术图片

 

 

链式任务中前一个任务的返回值默认是下一个任务的输入值之一 ( 不想让返回值做默认参数可以用 si() 或者 s(immutable=True) 的方式调用 )。

这里的 s() 是方法 celery.signature() 的快捷调用方式,signature 具体作用就是生成一个包含调用任务及其调用参数与其他信息的对象,个人感觉有点类似偏函数的概念:先不执行任务,而是把任务与任务参数存起来以供其他地方调用。

调用任务方式二:fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])

前面讲了调用任务不能直接使用普通的调用方式,而是要用类似 add.delay(2, 2) 的方式调用,而链式任务中又用到了 apply_async 方法进行调用,实际上 delay 只是 apply_async 的快捷方式,二者作用相同,只是 apply_async 可以进行更多的任务属性设置,比如 callbacks/errbacks 正常回调与错误回调、执行超时、重试、重试时间等等,具体参数可以参考:https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async

 

以上是关于celery实现任务进度查询的主要内容,如果未能解决你的问题,请参考以下文章

是否可以显示 django celery 任务的进度条? [复制]

celery使用group或者chord如何实时更新状态进度?

Celery实现定时任务crontab

Celery学习--- Celery 最佳实践之与django结合实现异步任务

Django-Celery 进度条

Celery+Rabbitmq实现异步任务