如果params和任务名称已在服务器中排队,是否可以跳过委托芹菜任务?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如果params和任务名称已在服务器中排队,是否可以跳过委托芹菜任务?相关的知识,希望对你有一定的参考价值。
说我有这个任务:
def do_stuff_for_some_time(some_id):
e = Model.objects.get(id=some_id)
e.domanystuff()
而我正在使用它:
do_stuff_for_some_time.apply_async(args=[some_id], queue='some_queue')
我面临的问题是,使用相同的arg参数有很多重复性任务,并且它在队列中令人难以置信。
只有在队列中没有相同的args和相同的任务时才可以应用异步吗?
答案
celery-singleton解决了这个要求
警告:需要redis代理(用于分布式锁)
pip install celery-singleton
使用Singleton
任务库类:
from celery_singleton import Singleton
@celery_app.task(base=Singleton)
def do_stuff_for_some_time(some_id):
e = Model.objects.get(id=some_id)
e.domanystuff()
来自文档:
对do_stuff.delay()的调用将为新任务排队或为当前排队/正在运行的任务实例返回AsyncResult
另一答案
我不确定芹菜是否有这样的选择。不过,我想建议一个解决方法。
1)为排队的所有芹菜任务创建一个模型。在该模型中,保存task_name,queue_name以及参数
2)对于准备排队的每个芹菜任务,在该模型上使用get_or_create。
3)如果在步骤2中created = True,则允许将任务添加到队列中,否则不要将任务添加到队列中
另一答案
我会尝试混合使用cache lock
和task result backend
来存储每个任务的结果:
- 缓存锁定将阻止具有相同参数的任务多次添加到队列中。 Celery文档包含缓存锁实现here的一个很好的例子,但是如果你不想自己创建它,你可以使用celery-once模块。
- 对于任务结果后端,我们将使用推荐的django-celery-results,它创建一个
TaskResult
表,我们将查询任务结果。
例:
- 安装和配置
django-celery-results
:settings.py
:INSTALLED_APPS = ( ..., 'django_celery_results', ) CELERY_RESULT_BACKEND = 'django-db' # You can also use 'django-cache'
./manage.py migrate django_celery_results
- 安装和配置
celery-once
模块:tasks.py
:from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery('tasks', broker='amqp://guest@localhost//') celery.conf.ONCE = { 'backend': 'celery_once.backends.Redis', 'settings': { 'url': 'redis://localhost:6379/0', 'default_timeout': 60 * 60 } } @celery.task(base=QueueOnce) def do_stuff_for_some_time(some_id): e = Model.objects.get(id=some_id) e.domanystuff()
此时,如果要执行具有相同参数的任务, 将提出AlreadyQueued
例外。 - 让我们用上面的话:
from django_celery_results.models import TaskResult try: result = do_stuff_for_some_time(some_id) except AlreadyQueued: result = TaskResult.objects.get(task_args=some_id)
注意事项:
- 请注意,当
AlreadyQueued
异常出现时,参数=some_id
的初始任务可能无法执行,因此它不会在TaskResult
表中产生结果。 - 记住代码中可能出错的所有内容并挂起上述任何进程(因为它会这样做!)。
额外阅读:
- 另一个Task with Lock DIY implementation
- django-celery-result的
TaskResult
模型。
以上是关于如果params和任务名称已在服务器中排队,是否可以跳过委托芹菜任务?的主要内容,如果未能解决你的问题,请参考以下文章