celery 4.0.0 和基于类的任务工作流程

Posted

技术标签:

【中文标题】celery 4.0.0 和基于类的任务工作流程【英文标题】:celery 4.0.0 and Class based task workflow 【发布时间】:2017-03-25 21:22:26 【问题描述】:

基于这些示例:

https://blog.balthazar-rouberol.com/celery-best-practices

https://shulhi.com/2015/10/13/class-based-celery-task/

Class based Task in django celery

我想创建类似的东西:

class CalculationWorker(Task):

    def __init__(self, id_user):
        self._id_user = id_user
        self._user = get_object_or_404(User, pk=self._id_user)

    # I need to understand if the bind work or if it's needed
    def _bind(self, app):
        return super(self.__class__, self).bind(celery_app)

    def _retrieve_some_task(self):
        # long calculation

    def _long_run_task(self):
        # long calculation
        self._retrieve_some_task()

    # Main entry
    def run(self):
        self._long_run_task()


# 
def run_job():  
    worker = CalculationWorker(id_user=323232)
    task = worker.apply_async()

文档似乎说这是可能的(无论如何我不清楚)http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes。它甚至说:

""" 这意味着 init 构造函数每个进程只会被调用一次,并且任务类在语义上更接近于 Actor。 ""

但http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#the-task-base-class-no-longer-automatically-register-tasks 明确表示:“最佳实践是仅使用自定义任务类来覆盖一般行为,然后使用任务装饰器来实现任务”。

因此,由于https://github.com/celery/celery/issues/3548,我得到了一个 NotRegistered 异常,但添加 app.tasks.register(CalculationWorker()) 并没有解决它。 我正在使用 Django 1.10.X 和 Celery 4.0.0

这种方法仍然有效吗?

谢谢

【问题讨论】:

【参考方案1】:

如果你使用 celery-4.0.1,那么你应该检查 chris 1 指出的文档 docs

Task 类不再使用自动在任务注册表中注册任务的特殊元类。

现在你应该像这样注册你的任务

class CustomTask(Task):
    def run(self):
        print('running')
app.register_task(CustomTask())

【讨论】:

我按照您展示的方式进行了操作,您知道如何使用此手动 register_tasks 添加超时等属性吗? @Leow 是的,在 Celery 中检查 class Task,你会看到它只是从配置中填充的类属性,以防它们是 None【参考方案2】:

我不确定这是否是最佳解决方案,但您可以使用解决方法来获得您想要的行为。我正在做类似的事情,以便我可以更清晰地在任务上添加错误处理程序。

来自文档1

最佳做法是仅将自定义任务类用于覆盖 一般行为,然后使用任务装饰器来实现 任务:

@app.task(bind=True, base=CustomTask) 
def custom(self):
    print('running')

但是您可以将所有任务代码放在 CustomTask 中,并在装饰任务声明中留下一个存根。您必须像这样在存根中调用任务的超类:

@app.task(bin=True, base=CustomTask)
def custom(self, *args):
    super(type(self), self).run(*args)

然后,您将修饰函数声明视为调用 celery 任务注册机制的一种方式。不过,我希望 5.0 能有更干净的东西出来。

【讨论】:

以上是关于celery 4.0.0 和基于类的任务工作流程的主要内容,如果未能解决你的问题,请参考以下文章

在 Django 项目中为可重用应用程序创建基于类的 Celery 任务

Celery

php怎么调用celery任务

Django中使用celery来异步处理和定时任务

如何在 Django 中等待芹菜任务的结果

python celery 模块