使用类方法作为 celery 任务
Posted
技术标签:
【中文标题】使用类方法作为 celery 任务【英文标题】:using class methods as celery tasks 【发布时间】:2012-03-04 06:03:45 【问题描述】:我正在尝试将类的方法用作 django-celery 任务,并使用 @task 装饰器对其进行标记。 Anand Jeyahar 提出了同样的情况here。 是这样的
class A:
@task
def foo(self, bar):
...
def main():
a = A()
...
# what i need
a.foo.delay(bar) # executes as celery task
a.foo(bar) # executes locally
问题是即使我使用像 a.foo.delay(bar)
这样的类实例,它说 foo
至少需要两个参数,这意味着 self
指针丢失。
更多信息:
由于继承,我无法将类转换为模块 方法强烈依赖于类成员,所以我不能让它们静态 使用@task 装饰器将类标记为任务使该类本身成为任务,并且可以使用某些参数作为方法选择的键来执行run()
方法中的方法,但这并不是我想要的。
创建一个类的实例并将其作为self
参数传递给方法会改变我执行方法的方式不是像芹菜一样,而是像通常的方法一样(即在测试时)
我试图找出如何以动态方式注册任务,例如从构造函数,但 celery 在工作人员之间共享代码,所以这似乎是不可能的。
感谢您的帮助!
【问题讨论】:
如何执行?同样的例子也适用于我。a = A()
a.method(1,2)
or a.method.delay(1,2)
-- 结果是一样的
【参考方案1】:
Jeremy Satterfield 有一个简洁明了的教程来编写基于类的任务,如果这是您想要完成的。您可以查看here。
魔术基本上是扩展 celery.Task
类,包括一个 run()
方法,就像这样:
from celery import Task
class CustomTask(Task):
ignore_result = True
def __init__(self, arg):
self.arg = arg
def run(self):
do_something_with_arg(self.arg)
然后像这样运行任务:
your_arg = 3
custom_task = CustomTask()
custom_task.delay(your_arg)
我不确定ignore_result = True
部分是否必要。
【讨论】:
我试过这个,但它不起作用,它抛出这个错误 - AttributeError: FileTask instance has no attribute 'delay' 您不应该在基于类的任务中覆盖 init。 @SheeshMohsin 请去掉init,直接在run函数中获取arg def run(self, arg): do_something_with_arg(arg)【参考方案2】:我遇到了类似的情况,并决定将类方法包装在一个简单的函数中,该函数会将其参数重定向到类的实例并执行此类方法:
class A:
def foo(self, bar):
# do this
a = A()
@app.task
def a_wrapper(bar):
return a.foo(bar)
# probably in a different size with an import in-place:
a_wrapper.delay(bar)
【讨论】:
【参考方案3】:Celery 自 3.0 版起实验性支持将方法用作任务。
这方面的文档在 celery.contrib.methods
中,还提到了一些您应该注意的警告:
https://docs.celeryproject.org/en/3.1/reference/celery.contrib.methods.html
注意:从 Celery 中删除对 contrib.methods
的支持自 4.0 起
【讨论】:
很高兴在未来的版本中看到这一点! @ThiefMaster 这实际上已经在 Celery 3.0 中了,见celery.contrib.methods
:docs.celeryproject.org/en/latest/reference/…
很好,考虑更新您的答案以包含此内容。
请注意:这似乎在 2014 年 10 月被删除,因为它的错误太大而无法使用 - github.com/celery/celery/commit/…【参考方案4】:
对我来说唯一有效的是 celery.current_app 因为这会将self
传递给方法。
所以这应该是这样的:
from celery import current_app
from celery.contrib.methods import task_method
class A:
@current_app.task(filter=task_method, name='A.foo')
def foo(self, bar):
...
如果您在不同的类中有同名的方法,则必须使用该名称。
【讨论】:
正如@asksol 提到的,自从芹菜版本4 docs.celeryproject.org/en/latest/history/…987654321@【参考方案5】:当你有:
a = A()
你可以这样做:
A.foo.delay(a, param0, .., paramN)
干杯
【讨论】:
如何对apply_async
做同样的事情,只是为了在一个地方获取语法?以上是关于使用类方法作为 celery 任务的主要内容,如果未能解决你的问题,请参考以下文章
celery中filter=task_method, bind=True修饰实例方法和类方法传参
Django Celery 结果将任务 ID 设置为人类可读的内容?