celery使用方法中遇见的问题
Posted Jason_WangYing
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery使用方法中遇见的问题相关的知识,希望对你有一定的参考价值。
1. Windows中使用Celery 4.0及以后版本
Celery 4.0+及以后版本不支持在windows系统上运行。如果你希望在windows系统上使用celery, 有两种方法。
方法一:安装3.1.25版本
pip install celery==3.1.25
方法二:安装gevent
pip install gevent
# 启动worker
celery -A <module> worker -l info -P gevent
2. @task与@shared_task的区别
当我们使用@app.task装饰器定义我们的异步任务时,那么这个任务依赖于根据项目名myproject生成的Celery实例。
app = Celery('myproject')
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
然而我们在进行Django开发时为了保证每个app的可重用性,我们经常会在每个app文件夹下编写异步任务,这些任务并不依赖于具体的Django项目名。使用@shared_task 装饰器能让我们避免对某个项目名对应Celery实例的依赖,使app的可移植性更强。
from __future__ import absolute_import
from celery import shared_task
@shared_task
def add(x, y):
return x + y
3. 如果异步的任务包括耗时的I/O操作
一个无限期阻塞的任务会使得工作单元无法再做其他事情。如果你的任务里有 I/O 操作,请确保给这些操作加上超时时间,例如使用 requests 库时给网络请求添加一个超时时间:
connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))
默认的 prefork 池调度器对长时间任务不是很友好,所以如果你的任务需要运行很长时间,确保在启动工作单元时使能了 -ofair 选项。
4. 使用多装饰器
当使用多个装饰器装饰任务函数时,确保 task 装饰器最后应用(在python中,这意味它必须在第一个位置):
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
5. 使用bind=True绑定任务
一个绑定任务意味着任务函数的第一个参数总是任务实例本身(self
),就像 Python 绑定方法类似,如下例所示:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@task(bind=True)
def add(self, x, y):
logger.info(self.request.id)
绑定任务在这些情况下是必须的:任务重试(使用 app.Task.retry()
),访问当前任务请求的信息,以及你添加到自定义任务基类的附加功能。
6. 忽略不想要的结果
如果你不在意任务的返回结果,可以设置 ignore_result
选项,因为存储结果耗费时间和资源。你还可以可以通过 task_ignore_result
设置全局忽略任务结果。
@app.task(ignore_result=True)
def mytask():
something()
7. 避免启动同步子任务
让一个任务等待另外一个任务的返回结果是很低效的,并且如果工作单元池被耗尽的话这将会导致死锁。
# 坏例子
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(url, page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(url, page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
# 好例子
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
在好例子里,我们将不同的任务签名链接起来创建一个任务链,三个子任务按顺序执行。警告:不建议同步执行子任务!
8. Django的模型对象不应该作为参数传递给任务
Django 的模型对象。他们不应该作为参数传递给任务。几乎总是在任务运行时从数据库获取对象是最好的,因为老的数据会导致竞态条件。假象有这样一个场景,你有一篇文章,以及自动展开文章中缩写的任务:
class Article(models.Model):
title = models.CharField()
body = models.TextField()
@app.task
def expand_abbreviations(article):
article.body.replace('MyCorp', 'My Corporation')
article.save()
首先,作者创建一篇文章并保存,这时作者点击一个按钮初始化一个缩写展开任务:
>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)
现在,队列非常忙,所以任务在2分钟内都不会运行。与此同时,另一个作者修改了这篇文章,当这个任务最终运行,因为老版本的文章作为参数传递给了这个任务,所以这篇文章会回滚到老的版本。修复这个竞态条件很简单,只要参数传递文章的 id 即可,此时可以在任务中重新获取这篇文章:
@app.task
def expand_abbreviations(article_id):
article = Article.objects.get(id=article_id)
article.body.replace('MyCorp', 'My Corporation')
article.save()
9. 使用on_commit函数处理事务
我们再看另外一个celery中处理事务的例子。这是在数据库中创建一个文章对象的 Django 视图,此时传递主键给任务。它使用 commit_on_success
装饰器,当视图返回时该事务会被提交,当视图抛出异常时会进行回滚。
from django.db import transaction
@transaction.commit_on_success
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
如果在事务提交之前任务已经开始执行会产生一个竞态条件;数据库对象还不存在。解决方案是使用 on_commit
回调函数来在所有事务提交成功后启动任务。
from django.db.transaction import on_commit
def create_article(request):
article = Article.objects.create()
on_commit(lambda: expand_abbreviations.delay(article.pk))
10. 自定义重试延迟
当任务发送例外时,app.Task.retry()
函数可以用来重新执行任务。当一个任务被重试,它在重试前会等待给定的时间,并且默认的由 default_retry_delay
属性定义。默认设置为 3 分钟。注意延迟设置的单位是秒(int 或者 float)。你可以通过提供 countdown
参数覆盖这个默认值。
# retry in 30 minutes.
@app.task(bind=True, default_retry_delay=30 * 60)
def add(self, x, y):
try:
something_raising()
except Exception as exc:
# overrides the default delay to retry after 1 minute
raise self.retry(exc=exc, countdown=60)
以上是关于celery使用方法中遇见的问题的主要内容,如果未能解决你的问题,请参考以下文章
异步 celery 任务完成后自动调用 PHP 代码(celery-php)
Celery 在远程任务上使用 Django Result Backend