使用类方法作为 celery 任务

Posted

技术标签:

【中文标题】使用类方法作为 celery 任务【英文标题】:using class methods as celery tasks 【发布时间】:2012-03-04 06:03:45 【问题描述】:

我正在尝试将类的方法用作 django-celery 任务,并使用 @task 装饰器对其进行标记。 Anand Jeyahar 提出了同样的情况here。 是这样的

class A:
    @task
    def foo(self, bar):
        ...

def main():
    a = A()
    ...
    # what i need
    a.foo.delay(bar) # executes as celery task 
    a.foo(bar) # executes locally

问题是即使我使用像 a.foo.delay(bar) 这样的类实例,它说 foo 至少需要两个参数,这意味着 self 指针丢失。

更多信息:

由于继承,我无法将类转换为模块 方法强烈依赖于类成员,所以我不能让它们静态 使用@task 装饰器将标记为任务使该类本身成为任务,并且可以使用某些参数作为方法选择的键来执行run()方法中的方法,但这并不是我想要的。 创建一个类的实例并将其作为self 参数传递给方法会改变我执行方法的方式不是像芹菜一样,而是像通常的方法一样(即在测试时) 我试图找出如何以动态方式注册任务,例如从构造函数,但 celery 在工作人员之间共享代码,所以这似乎是不可能的。

感谢您的帮助!

【问题讨论】:

如何执行?同样的例子也适用于我。 a = A() a.method(1,2) or a.method.delay(1,2) -- 结果是一样的 【参考方案1】:

Jeremy Satterfield 有一个简洁明了的教程来编写基于类的任务,如果这是您想要完成的。您可以查看here。

魔术基本上是扩展 celery.Task 类,包括一个 run() 方法,就像这样:

from celery import Task

class CustomTask(Task):
    ignore_result = True

    def __init__(self, arg):
        self.arg = arg

    def run(self):
        do_something_with_arg(self.arg)

然后像这样运行任务:

your_arg = 3

custom_task = CustomTask()
custom_task.delay(your_arg)

我不确定ignore_result = True 部分是否必要。

【讨论】:

我试过这个,但它不起作用,它抛出这个错误 - AttributeError: FileTask instance has no attribute 'delay' 您不应该在基于类的任务中覆盖 init。 @SheeshMohsin 请去掉init,直接在run函数中获取arg def run(self, arg): do_something_with_arg(arg)【参考方案2】:

我遇到了类似的情况,并决定将类方法包装在一个简单的函数中,该函数会将其参数重定向到类的实例并执行此类方法:

class A:
    def foo(self, bar):
       # do this

a = A()

@app.task
def a_wrapper(bar):
    return a.foo(bar)

# probably in a different size with an import in-place:

a_wrapper.delay(bar)

【讨论】:

【参考方案3】:

Celery 自 3.0 版起实验性支持将方法用作任务。

这方面的文档在 celery.contrib.methods 中,还提到了一些您应该注意的警告:

https://docs.celeryproject.org/en/3.1/reference/celery.contrib.methods.html

注意:从 Celery 中删除对 contrib.methods 的支持自 4.0 起

【讨论】:

很高兴在未来的版本中看到这一点! @ThiefMaster 这实际上已经在 Celery 3.0 中了,见celery.contrib.methods:docs.celeryproject.org/en/latest/reference/… 很好,考虑更新您的答案以包含此内容。 请注意:这似乎在 2014 年 10 月被删除,因为它的错误太大而无法使用 - github.com/celery/celery/commit/…【参考方案4】:

对我来说唯一有效的是 celery.current_app 因为这会将self 传递给方法。

所以这应该是这样的:

from celery import current_app
from celery.contrib.methods import task_method

class A:
@current_app.task(filter=task_method, name='A.foo')
def foo(self, bar):
    ...

如果您在不同的类中有同名的方法,则必须使用该名称。

【讨论】:

正如@asksol 提到的,自从芹菜版本4 docs.celeryproject.org/en/latest/history/…987654321@【参考方案5】:

当你有:

    a = A()

你可以这样做:

    A.foo.delay(a, param0, .., paramN)

干杯

【讨论】:

如何对apply_async做同样的事情,只是为了在一个地方获取语法?

以上是关于使用类方法作为 celery 任务的主要内容,如果未能解决你的问题,请参考以下文章

celery中filter=task_method, bind=True修饰实例方法和类方法传参

Django Celery 结果将任务 ID 设置为人类可读的内容?

如何将基于类的任务传递给 CELERY_BEAT_SCHEDULE

如何按任务名称检查和取消 Celery 任务

celery 4.0.0 和基于类的任务工作流程

您可以使用 ctypes 通过其内存 id 将对象传递给 celery 任务吗?