我应该如何在芹菜中实现任务集的回调

Posted

技术标签:

【中文标题】我应该如何在芹菜中实现任务集的回调【英文标题】:How should I implement callback for taskset in celery 【发布时间】:2012-06-01 19:40:32 【问题描述】:

问题

我使用 celery 启动如下所示的任务集:

    我执行了一批可以并行运行的任务,这批任务的数量从几十到几千不等。

    我将这些任务的结果汇总为单个答案,然后对这个答案做一些事情——比如存储到数据库、保存到特殊的结果文件等等。基本上在执行完任务后,我必须调用具有以下签名的函数:

    def callback(result_file_name, task_result_list): 
        #store in file
    
    
    def callback(entity_key, task_result_list):
        #store in db 
    

现在第 1 步在 Celery 队列中完成,第 2 步在 celery 外部完成:

    tasks = []

    # add taksks to tasks list 

    task_group = group()
    task_group.tasks = tasks

    result = task_group.apply_async()

    res = result.join()

    # Aggregate results 

    # Save results to file, database whatever

这种方法很麻烦,因为我必须停止单个线程,直到所有任务都执行完毕(这可能需要几个小时)。

我也想以某种方式将第 2 步移到 celery --- 基本上我需要向整个任务集添加一个回调(据我所知它在 Celery 中不受支持)或提交一个在所有这些之后执行的任务子任务。

有人知道怎么做吗?我在 django 环境中使用它,所以我可以在数据库中存储一些状态。

总结一下我最近的发现

和弦不行

我不能直接使用和弦,因为和弦使我能够创建这样的回调:

    def callback(task_result_list): 
        #store in file

没有明显的方法可以将附加参数传递给回调(特别是因为这些回调不能是本地函数)。

也使用数据库

我可以使用TaskSetMeta 来存储结果,但是这个实体没有状态字段 --- 所以即使我要向 TaskSetMeta 添加一个信号,我也必须汇集任务结果,这可能会产生巨大的开销。

【问题讨论】:

你试过chords吗? 我研究了和弦,我认为他们不会这样做。我想将结果存储在数据库或文件或其他任何东西中。在这种情况下,chord 需要传递两个参数,首先是报告文件名,或者任务实体第二个参数的一些细节将被连接 args。截至今天,芹菜和弦仅列出结果。还是我错了? 【参考方案1】:

答案真的很简单,我确实可以使用和弦 --- 并且附加参数(如报告文件名等)必须作为 kwargs 传递。

这是和弦任务:

@task
def print_and_sum(to_sum, file_name):
    print file_name
    print sum(to_sum)
    return file_name, sum(to_sum)

下面是如何实例化它:

subtasks = [...]
result = chord(subtasks)(print_and_sum.subtask(kwargs='file_name' : 'report_file.csv'))

【讨论】:

注意 - result = chord(subtasks)(print_and_sum.s(kwargs = 'file_name' : 'report_file.csv')) 不起作用。它说kwargs 是一个身份不明的论点。使用 substask 而不是 s 时效果很好。不知道原因。 谢谢@Siddharth。问题和答案都已经过时了,我现在并没有真正使用 cleery。随意发布您的发现作为另一个答案,因为它可能对其他人有用。

以上是关于我应该如何在芹菜中实现任务集的回调的主要内容,如果未能解决你的问题,请参考以下文章

如何防止芹菜执行相同的任务?

如何在不重复的情况下重试芹菜任务 - SQS

Python Celery - 如何在其他任务中调用芹菜任务

如何在任务中获取芹菜结果模型(使用 django-celery-results)

在 dask 或 Dramatiq 中带有 (bind=True) 的芹菜?

芹菜:在特定时间间隔后执行任务