如果params和任务名称已在服务器中排队,是否可以跳过委托芹菜任务?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如果params和任务名称已在服务器中排队,是否可以跳过委托芹菜任务?相关的知识,希望对你有一定的参考价值。

说我有这个任务:

def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)
    e.domanystuff()

而我正在使用它:

do_stuff_for_some_time.apply_async(args=[some_id], queue='some_queue')

我面临的问题是,使用相同的arg参数有很多重复性任务,并且它在队列中令人难以置信。

只有在队列中没有相同的args和相同的任务时才可以应用异步吗?

答案

celery-singleton解决了这个要求

警告:需要redis代理(用于分布式锁)

pip install celery-singleton

使用Singleton任务库类:

from celery_singleton import Singleton

@celery_app.task(base=Singleton)
def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)
    e.domanystuff()

来自文档:

对do_stuff.delay()的调用将为新任务排队或为当前排队/正在运行的任务实例返回AsyncResult

另一答案

我不确定芹菜是否有这样的选择。不过,我想建议一个解决方法。

1)为排队的所有芹菜任务创建一个模型。在该模型中,保存task_name,queue_name以及参数

2)对于准备排队的每个芹菜任务,在该模型上使用get_or_create。

3)如果在步骤2中created = True,则允许将任务添加到队列中,否则不要将任务添加到队列中

另一答案

我会尝试混合使用cache locktask result backend来存储每个任务的结果:

  • 缓存锁定将阻止具有相同参数的任务多次添加到队列中。 Celery文档包含缓存锁实现here的一个很好的例子,但是如果你不想自己创建它,你可以使用celery-once模块。
  • 对于任务结果后端,我们将使用推荐的django-celery-results,它创建一个TaskResult表,我们将查询任务结果。

例:

  • 安装和配置django-celery-resultssettings.pyINSTALLED_APPS = ( ..., 'django_celery_results', ) CELERY_RESULT_BACKEND = 'django-db' # You can also use 'django-cache' ./manage.py migrate django_celery_results
  • 安装和配置celery-once模块: tasks.pyfrom celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery('tasks', broker='amqp://guest@localhost//') celery.conf.ONCE = { 'backend': 'celery_once.backends.Redis', 'settings': { 'url': 'redis://localhost:6379/0', 'default_timeout': 60 * 60 } } @celery.task(base=QueueOnce) def do_stuff_for_some_time(some_id): e = Model.objects.get(id=some_id) e.domanystuff() 此时,如果要执行具有相同参数的任务, 将提出AlreadyQueued例外。
  • 让我们用上面的话: from django_celery_results.models import TaskResult try: result = do_stuff_for_some_time(some_id) except AlreadyQueued: result = TaskResult.objects.get(task_args=some_id)

注意事项:

  • 请注意,当AlreadyQueued异常出现时,参数= some_id的初始任务可能无法执行,因此它不会在TaskResult表中产生结果。
  • 记住代码中可能出错的所有内容并挂起上述任何进程(因为它会这样做!)。

额外阅读:

以上是关于如果params和任务名称已在服务器中排队,是否可以跳过委托芹菜任务?的主要内容,如果未能解决你的问题,请参考以下文章

用于多线程的 NAudio:缓冲区已在播放错误时排队

如何按名称限制运行 Celery 任务的最大数量

银行排队问题平均等到时间计算java?

Java Executors 不排队某些任务

快速失败Cassandra NTR阻止了任务

如何判断任务是不是已经在 django-celery 中排队?