Celery 在使用 app.control.purge() 时运行任务会发生啥?

Posted

技术标签:

【中文标题】Celery 在使用 app.control.purge() 时运行任务会发生啥?【英文标题】:Celery what happen to running tasks when using app.control.purge()?Celery 在使用 app.control.purge() 时运行任务会发生什么? 【发布时间】:2020-06-18 19:03:10 【问题描述】:

目前我有一个用 django 运行的芹菜批次,如下所示:

芹菜.py:

from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django

load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
   app.control.purge()
   sender.add_periodic_task(30.0, check_loop.s())
   recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
   print("setup_periodic_tasks")

@app.task()
def check_loop():
    .....
    start = database start number
    end = database end number
    callling apis in a list from id=start to id=end
    create objects
    update database(start number = end, end number = end + 3)

    ....


@app.task()
def recursion_function(default_retry_delay=10):
   .....
   do some looping
   ....
   #when finished, call itself again
   recursion_function.apply_async(countdown=30)

我的目标是每当 celery 文件被编辑然后它会重新启动所有任务 - 删除尚未执行的排队任务(我这样做是因为recursion_function 将在完成检查每条记录的工作后再次运行我的数据库中的一个表,所以我不担心它会在中途停止)。

check_loop 函数将调用具有分页功能的 api 以返回对象列表,我将按表中的记录将其与表中的记录进行比较,如果匹配则创建另一个模型的新自定义记录

我的问题是当我清除所有消息时,当前正在运行的任务会中途停止还是继续运行?因为如果 check_loop 函数中途停止循环遍历 api 列表,那么它将再次运行循环,我将创建我不想要的新重复记录

示例:

check_loop() 的运行任务期间,它在中途创建了对象(在从元素 id=2 到 id=5 的 api 列表上),服务器重新启动 -> 再次运行,现在 check_loop() 从头开始​​运行(在元素 id 的 api 列表上=2 到 id=5) 并再次从该列表创建对象(100% 我不想要)

它是这样运行的吗?我只需要确认

编辑:

https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks

我添加了app.control.purge(),因为当我重新启动时,recursion_functionsetup_periodic_tasks 中再次被调用,而之前来自recursion_function.apply_async(countdown=30)recursion_function 也执行,所以它会自我繁殖

【问题讨论】:

【参考方案1】:

我不会写像上面 Oleg 的优秀文章那样的文章。答案很简单 - 所有正在运行的任务都会继续运行purge 是关于队列中的所有任务,等待 Celery 工人挑选。

【讨论】:

【参考方案2】:

是的,除非工人也重新启动,否则工人将continue execution of currently running task。

此外,Celery Way始终期望任务在并发环境中运行,并考虑以下几点:

有许多任务同时运行 有很多 celery 工人在执行任务 同样的任务可能会再次运行 同一任务的多个实例可能同时运行 任何任务都可以随时终止

即使您确定在您的环境中只有一个工作人员手动启动/停止并且这些不适用 - 应该以这样的方式创建任务以允许这一切发生。

一些有用的技巧:

使用数据库事务 使用锁定 将长时间运行的任务拆分为更快的任务 如果任务有要保存的中间值或者它们很重要(即像某些 api 调用那样不可重现)并且它们在下一步的处理需要时间 - 考虑拆分成几个链式任务

如果您一次只需要运行一个任务实例 - 使用某种锁定 - 在数据库或数据库中创建/更新锁定记录缓存以便其他人(相同任务)可以检查并知道此任务正在运行,然后返回或等待前一个任务完成。

recursion_function 也可以是Periodic Task。作为周期性任务将确保它在每个间隔运行,即使前一个因任何原因失败(因此无法像在常规非周期性任务中一样再次排队)。通过锁定,您可以确保一次只运行一个。


check_loop():

首先,建议将结果在一个事务中保存在数据库中,以确保在数据库中保存/修改全部或全部。

您还可以保存一些标记,指示已保存对象的数量/状态,因此以后的任务可以只检查这个标记,而不是每个对象。

或者在创建每个元素之前以某种方式检查它是否已经存在于数据库中。

【讨论】:

为了解释,我决定在每个循环之后将每个“结束”分页保存到模型中,所以如果它重新启动,那么它将从上一个正在运行的分页开始

以上是关于Celery 在使用 app.control.purge() 时运行任务会发生啥?的主要内容,如果未能解决你的问题,请参考以下文章

python之celery在flask中使用

使用celery遇到的坑

celery 设置多少时间后运行

Django 中使用 Celery

异步任务利器Celery在django项目中使用Celery

如何让celery接受定制的参数