我应该如何在芹菜中实现任务集的回调
Posted
技术标签:
【中文标题】我应该如何在芹菜中实现任务集的回调【英文标题】:How should I implement callback for taskset in celery 【发布时间】:2012-06-01 19:40:32 【问题描述】:问题
我使用 celery 启动如下所示的任务集:
-
我执行了一批可以并行运行的任务,这批任务的数量从几十到几千不等。
我将这些任务的结果汇总为单个答案,然后对这个答案做一些事情——比如存储到数据库、保存到特殊的结果文件等等。基本上在执行完任务后,我必须调用具有以下签名的函数:
def callback(result_file_name, task_result_list):
#store in file
def callback(entity_key, task_result_list):
#store in db
现在第 1 步在 Celery 队列中完成,第 2 步在 celery 外部完成:
tasks = []
# add taksks to tasks list
task_group = group()
task_group.tasks = tasks
result = task_group.apply_async()
res = result.join()
# Aggregate results
# Save results to file, database whatever
这种方法很麻烦,因为我必须停止单个线程,直到所有任务都执行完毕(这可能需要几个小时)。
我也想以某种方式将第 2 步移到 celery --- 基本上我需要向整个任务集添加一个回调(据我所知它在 Celery 中不受支持)或提交一个在所有这些之后执行的任务子任务。
有人知道怎么做吗?我在 django 环境中使用它,所以我可以在数据库中存储一些状态。
总结一下我最近的发现
和弦不行
我不能直接使用和弦,因为和弦使我能够创建这样的回调:
def callback(task_result_list):
#store in file
没有明显的方法可以将附加参数传递给回调(特别是因为这些回调不能是本地函数)。
也使用数据库
我可以使用TaskSetMeta
来存储结果,但是这个实体没有状态字段 --- 所以即使我要向 TaskSetMeta 添加一个信号,我也必须汇集任务结果,这可能会产生巨大的开销。
【问题讨论】:
你试过chords吗? 我研究了和弦,我认为他们不会这样做。我想将结果存储在数据库或文件或其他任何东西中。在这种情况下,chord 需要传递两个参数,首先是报告文件名,或者任务实体第二个参数的一些细节将被连接 args。截至今天,芹菜和弦仅列出结果。还是我错了? 【参考方案1】:答案真的很简单,我确实可以使用和弦 --- 并且附加参数(如报告文件名等)必须作为 kwargs 传递。
这是和弦任务:
@task
def print_and_sum(to_sum, file_name):
print file_name
print sum(to_sum)
return file_name, sum(to_sum)
下面是如何实例化它:
subtasks = [...]
result = chord(subtasks)(print_and_sum.subtask(kwargs='file_name' : 'report_file.csv'))
【讨论】:
注意 -result = chord(subtasks)(print_and_sum.s(kwargs = 'file_name' : 'report_file.csv'))
不起作用。它说kwargs
是一个身份不明的论点。使用 substask
而不是 s
时效果很好。不知道原因。
谢谢@Siddharth。问题和答案都已经过时了,我现在并没有真正使用 cleery。随意发布您的发现作为另一个答案,因为它可能对其他人有用。以上是关于我应该如何在芹菜中实现任务集的回调的主要内容,如果未能解决你的问题,请参考以下文章
Python Celery - 如何在其他任务中调用芹菜任务
如何在任务中获取芹菜结果模型(使用 django-celery-results)