Python Celery - 如何在其他任务中调用芹菜任务

Posted

技术标签:

【中文标题】Python Celery - 如何在其他任务中调用芹菜任务【英文标题】:Python Celery - How to call celery tasks inside other task 【发布时间】:2014-03-16 19:05:35 【问题描述】:

我在 Django-Celery 的一个任务中调用一个任务

这是我的任务。

@shared_task
def post_notification(data,url):
    url = "http://posttestserver.com/data/?dir=praful" # when in production, remove this line.
    headers = 'content-type': 'application/json'
    requests.post(url, data=json.dumps(data), headers=headers)


@shared_task
def shipment_server(data,notification_type):
    notification_obj = Notification.objects.get(name = notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)

    for server in server_list:
        task = post_notification.delay(data,server.server_id.url)
        print task.status # it prints 'Nonetype' has no attribute id

如何在任务中调用任务? 我在某处读到它可以使用group 完成,但我无法形成正确的语法。我该怎么做?

我试过了

for server in server_list:
    task = group(post_notification.s(data, server.server_id.url))().get()
    print task.status

抛出警告说

TxIsolationWarning: Polling results w│                                                                        
ith transaction isolation level repeatable-read within the same transacti│                                                                        
on may give outdated results. Be sure to commit the transaction for each │                                                                        
poll iteration.                                                          │                                                                        
  'Polling results with transaction isolation level '

不知道是什么!!!

我该如何解决我的问题?

【问题讨论】:

result = task.delay/task.apply_async 给出了一个AsyncResult 对象。这支持轮询.status 属性,每次访问该属性时都会检查任务的状态。在您发送任务后立即调用 .state 是没有意义的,因为工作人员可能还没有开始执行它。在您后面的示例中,您调用 task = .....get().status 这将不起作用,因为您在任务的返回值上调用状态,而不是结果(result.status 与 result.get().status)。 最后你不应该等待子任务的结果,因为这可能会导致死锁,而应该使用回调任务:(post_notification.s() | do_sometihing_after_posted.s()).delay()。见docs.celeryproject.org/en/latest/userguide/… 和docs.celeryproject.org/en/latest/userguide/canvas.html 【参考方案1】:

这应该可行:

celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3])

【讨论】:

什么是 my_model 和 current_app? current_app 是 celery 模块的属性。 mymodel.tasks 是您的tasks.py 的路径。如有必要,请更改。 由于要调用的任务在同一个模块中,所以我是这样做的task = celery.current_app.send_task('post_notification', args=[data, url]) print task.status 请注意,您不必使用send_task,您可以在没有问题的任务中使用task.delay,您的问题是轮询返回的结果对象。 在这里工作得很好!谢谢...我使用 from celery._state import current_app 到版本 4.2.x【参考方案2】:

你是对的,因为 for 循环中的每个任务都将被覆盖 task 变量。

你可以试试celery.group点赞

from celery import group

@shared_task
def shipment_server(data,notification_type):
    notification_obj = Notification.objects.get(name = notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)


    tasks = [post_notification.s(data, server.server_id.url) for server in server_list]
    results = group(tasks)()
    print results.get() # results.status() what ever you want

【讨论】:

【参考方案3】:

您可以使用延迟函数从任务中调用任务

from app.tasks import celery_add_task
    celery_add_task.apply_async(args=[task_name]) 

...它会起作用的

【讨论】:

以上是关于Python Celery - 如何在其他任务中调用芹菜任务的主要内容,如果未能解决你的问题,请参考以下文章

python3+celery+redis实现异步任务

python celery介绍和基本使用

如何按名称限制运行 Celery 任务的最大数量

44. Python Celery多实例 定时任务

如何让celery接受定制的参数

如何在不延迟任务的情况下优雅地重启 Celery