包含具有属性的 celery 任务的装饰器

Posted

技术标签:

【中文标题】包含具有属性的 celery 任务的装饰器【英文标题】:Decorator that includes celery task with attributes 【发布时间】:2021-06-25 11:14:25 【问题描述】:

我正在开发一个包含多个应用程序的 Django 项目,每个应用程序都有自己的任务集,并且运行良好。但是,一般来说,每个应用程序的任务集使用相同的属性来进行速率限制、重试等。我试图创建一个具有所有这些通用属性的装饰器,并将目标函数设置为任务。

所以,我的example/tasks.py 中有这个:

from celery import current_app


@current_app.task(
    queue='example_queue',
    max_retries=3,
    rate_limit='10/s')
def example_task_1():
    ...

@current_app.task(
    queue='example_queue',
    max_retries=3,
    rate_limit='10/s')
def example_task_2():
    ...

我正在尝试类似的东西:

from celery import current_app, Task


class ExampleTask(Task):

    def __init__(self, task, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.task = task

    def run(self):
        self.task()


def example_decorator_task(func):

    @wraps(func)
    def wrapped(self, *args, **kwargs):
        return ExampleTask(func).delay(
            queue='example_queue',
            max_retries=3,
            rate_limit='10/s')

@example_decorator_task
def example_task_1():
    ...

@example_decorator_task
def example_task_2():
    ...

我得到了这个工作,但调用example_task_1.delay(...),任务将无法像往常一样工作,因为正在包装内执行。 有什么想法吗?

【问题讨论】:

【参考方案1】:

当您传递参数时,参数化装饰器会转换为无参数装饰器,但不会将它们应用于函数。因此,您可以通过将结果存储到变量中来创建新的装饰器,然后您可以将其应用于各种任务函数。见:

from celery import current_app

rate_limited_task = current_app.task(
    queue='example_queue',
    max_retries=3,
    rate_limit='10/s'
)

@rate_limited_task
def example_task_1():
    ...

@rate_limited_task
def example_task_2():
    ...

【讨论】:

【参考方案2】:

使用apply_async而不是delay,你的装饰器应该变成这样:

def decorator(function):
    def wrapper(*args, **kwargs):
        return function.apply_async(args=[*args], kwargs=**kwargs, **
            'queue': 'example_queue',
            'max_retries': 3,
            'rate_limit': '10/s'
        )
    return wrapper

更多详情:

apply_async

delay

【讨论】:

好的,很酷 :) 但有了这个我仍然需要同时使用:@current_app.task@decorator,是否可以使 decorator 具有与 @current_app.task 相同的功能 +我想要的设置? 对不起,我一开始没有抓住问题,所以我没有确切的答案,但调用 celeryapp.task 装饰器并将函数传递给它,然后调用返回值上的apply_async 可能有效。

以上是关于包含具有属性的 celery 任务的装饰器的主要内容,如果未能解决你的问题,请参考以下文章

具有多个装饰器的芹菜任务不自动注册任务名称

@shared_task 装饰器不起作用

装饰器设置函数的属性

TypeScript基础入门之装饰器(三)

JS 装饰器解析

Python面向对象编程第18篇 属性装饰器