celery使用方法中遇见的问题

Posted Jason_WangYing

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery使用方法中遇见的问题相关的知识,希望对你有一定的参考价值。

1. Windows中使用Celery 4.0及以后版本

Celery 4.0+及以后版本不支持在windows系统上运行。如果你希望在windows系统上使用celery, 有两种方法。

方法一:安装3.1.25版本

pip install celery==3.1.25

方法二:安装gevent

  1. pip install gevent

  2. # 启动worker

  3. celery -A <module> worker -l info -P gevent

2. @task与@shared_task的区别

当我们使用@app.task装饰器定义我们的异步任务时,那么这个任务依赖于根据项目名myproject生成的Celery实例。

  1. app = Celery('myproject')

  2. @app.task(bind=True)

  3. def debug_task(self):

  4. print('Request: {0!r}'.format(self.request))

然而我们在进行Django开发时为了保证每个app的可重用性,我们经常会在每个app文件夹下编写异步任务,这些任务并不依赖于具体的Django项目名。使用@shared_task 装饰器能让我们避免对某个项目名对应Celery实例的依赖,使app的可移植性更强。

 
  1. from __future__ import absolute_import

  2. from celery import shared_task

  3. @shared_task

  4. def add(x, y):

  5. return x + y

3. 如果异步的任务包括耗时的I/O操作

一个无限期阻塞的任务会使得工作单元无法再做其他事情。如果你的任务里有 I/O 操作,请确保给这些操作加上超时时间,例如使用 requests 库时给网络请求添加一个超时时间:

  1. connect_timeout, read_timeout = 5.0, 30.0

  2. response = requests.get(URL, timeout=(connect_timeout, read_timeout))

默认的 prefork 池调度器对长时间任务不是很友好,所以如果你的任务需要运行很长时间,确保在启动工作单元时使能了 -ofair 选项。

4. 使用多装饰器

当使用多个装饰器装饰任务函数时,确保 task 装饰器最后应用(在python中,这意味它必须在第一个位置):

@app.task

@decorator2

@decorator1

def add(x, y):

    return x + y

5. 使用bind=True绑定任务

一个绑定任务意味着任务函数的第一个参数总是任务实例本身(self),就像 Python 绑定方法类似,如下例所示:

  1. from celery.utils.log import get_task_logger

  2. logger = get_task_logger(__name__)

  3. @task(bind=True)

  4. def add(self, x, y):

  5. logger.info(self.request.id)

绑定任务在这些情况下是必须的:任务重试(使用 app.Task.retry() ),访问当前任务请求的信息,以及你添加到自定义任务基类的附加功能。

6. 忽略不想要的结果

如果你不在意任务的返回结果,可以设置 ignore_result 选项,因为存储结果耗费时间和资源。你还可以可以通过 task_ignore_result 设置全局忽略任务结果。

  1. @app.task(ignore_result=True)

  2. def mytask():

  3.     something()

7. 避免启动同步子任务

让一个任务等待另外一个任务的返回结果是很低效的,并且如果工作单元池被耗尽的话这将会导致死锁。

# 坏例子

  1. @app.task

  2. def update_page_info(url):

  3. page = fetch_page.delay(url).get()

  4. info = parse_page.delay(url, page).get()

  5. store_page_info.delay(url, info)

  6. @app.task

  7. def fetch_page(url):

  8. return myhttplib.get(url)

  9. @app.task

  10. def parse_page(url, page):

  11. return myparser.parse_document(page)

  12. @app.task

  13. def store_page_info(url, info):

  14. return PageInfo.objects.create(url, info)

# 好例子

  1. def update_page_info(url):

  2. # fetch_page -> parse_page -> store_page

  3. chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)

  4. chain()

  5. @app.task()

  6. def fetch_page(url):

  7. return myhttplib.get(url)

  8. @app.task()

  9. def parse_page(page):

  10. return myparser.parse_document(page)

  11. @app.task(ignore_result=True)

  12. def store_page_info(info, url):

  13. PageInfo.objects.create(url=url, info=info)

在好例子里,我们将不同的任务签名链接起来创建一个任务链,三个子任务按顺序执行。警告:不建议同步执行子任务!

8.  Django的模型对象不应该作为参数传递给任务

Django 的模型对象。他们不应该作为参数传递给任务。几乎总是在任务运行时从数据库获取对象是最好的,因为老的数据会导致竞态条件。假象有这样一个场景,你有一篇文章,以及自动展开文章中缩写的任务:

  1. class Article(models.Model):

  2. title = models.CharField()

  3. body = models.TextField()

  4. @app.task

  5. def expand_abbreviations(article):

  6. article.body.replace('MyCorp', 'My Corporation')

  7. article.save()

首先,作者创建一篇文章并保存,这时作者点击一个按钮初始化一个缩写展开任务:

  1. >>> article = Article.objects.get(id=102)

  2. >>> expand_abbreviations.delay(article)

现在,队列非常忙,所以任务在2分钟内都不会运行。与此同时,另一个作者修改了这篇文章,当这个任务最终运行,因为老版本的文章作为参数传递给了这个任务,所以这篇文章会回滚到老的版本。修复这个竞态条件很简单,只要参数传递文章的 id 即可,此时可以在任务中重新获取这篇文章:

  1. @app.task

  2. def expand_abbreviations(article_id):

  3. article = Article.objects.get(id=article_id)

  4. article.body.replace('MyCorp', 'My Corporation')

  5. article.save()

9. 使用on_commit函数处理事务

我们再看另外一个celery中处理事务的例子。这是在数据库中创建一个文章对象的 Django 视图,此时传递主键给任务。它使用 commit_on_success 装饰器,当视图返回时该事务会被提交,当视图抛出异常时会进行回滚。

  1. from django.db import transaction

  2. @transaction.commit_on_success

  3. def create_article(request):

  4. article = Article.objects.create()

  5. expand_abbreviations.delay(article.pk)

如果在事务提交之前任务已经开始执行会产生一个竞态条件;数据库对象还不存在。解决方案是使用 on_commit 回调函数来在所有事务提交成功后启动任务。

  1. from django.db.transaction import on_commit

  2. def create_article(request):

  3. article = Article.objects.create()

  4. on_commit(lambda: expand_abbreviations.delay(article.pk))

10. 自定义重试延迟

当任务发送例外时,app.Task.retry() 函数可以用来重新执行任务。当一个任务被重试,它在重试前会等待给定的时间,并且默认的由 default_retry_delay 属性定义。默认设置为 3 分钟。注意延迟设置的单位是秒(int 或者 float)。你可以通过提供 countdown 参数覆盖这个默认值。

 
  1. # retry in 30 minutes.

  2. @app.task(bind=True, default_retry_delay=30 * 60)

  3. def add(self, x, y):

  4. try:

  5. something_raising()

  6. except Exception as exc:

  7. # overrides the default delay to retry after 1 minute

  8. raise self.retry(exc=exc, countdown=60)

以上是关于celery使用方法中遇见的问题的主要内容,如果未能解决你的问题,请参考以下文章

在 Celery 中使用 Python 标准日志记录

异步 celery 任务完成后自动调用 PHP 代码(celery-php)

django+celery实现异步任务

Celery 在远程任务上使用 Django Result Backend

尝试使用 celery beat 在 django 中调度一个函数但给出错误

django+celery+rabbitmq 编码错误和 sig-kill