包含具有属性的 celery 任务的装饰器
Posted
技术标签:
【中文标题】包含具有属性的 celery 任务的装饰器【英文标题】:Decorator that includes celery task with attributes 【发布时间】:2021-06-25 11:14:25 【问题描述】:我正在开发一个包含多个应用程序的 Django 项目,每个应用程序都有自己的任务集,并且运行良好。但是,一般来说,每个应用程序的任务集使用相同的属性来进行速率限制、重试等。我试图创建一个具有所有这些通用属性的装饰器,并将目标函数设置为任务。
所以,我的example/tasks.py
中有这个:
from celery import current_app
@current_app.task(
queue='example_queue',
max_retries=3,
rate_limit='10/s')
def example_task_1():
...
@current_app.task(
queue='example_queue',
max_retries=3,
rate_limit='10/s')
def example_task_2():
...
我正在尝试类似的东西:
from celery import current_app, Task
class ExampleTask(Task):
def __init__(self, task, *args, **kwargs):
super().__init__(*args, **kwargs)
self.task = task
def run(self):
self.task()
def example_decorator_task(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
return ExampleTask(func).delay(
queue='example_queue',
max_retries=3,
rate_limit='10/s')
@example_decorator_task
def example_task_1():
...
@example_decorator_task
def example_task_2():
...
我得到了这个工作,但调用example_task_1.delay(...)
,任务将无法像往常一样工作,因为正在包装内执行。
有什么想法吗?
【问题讨论】:
【参考方案1】:当您传递参数时,参数化装饰器会转换为无参数装饰器,但不会将它们应用于函数。因此,您可以通过将结果存储到变量中来创建新的装饰器,然后您可以将其应用于各种任务函数。见:
from celery import current_app
rate_limited_task = current_app.task(
queue='example_queue',
max_retries=3,
rate_limit='10/s'
)
@rate_limited_task
def example_task_1():
...
@rate_limited_task
def example_task_2():
...
【讨论】:
【参考方案2】:使用apply_async
而不是delay
,你的装饰器应该变成这样:
def decorator(function):
def wrapper(*args, **kwargs):
return function.apply_async(args=[*args], kwargs=**kwargs, **
'queue': 'example_queue',
'max_retries': 3,
'rate_limit': '10/s'
)
return wrapper
更多详情:
apply_async
delay
【讨论】:
好的,很酷 :) 但有了这个我仍然需要同时使用:@current_app.task
和 @decorator
,是否可以使 decorator
具有与 @current_app.task
相同的功能 +我想要的设置?
对不起,我一开始没有抓住问题,所以我没有确切的答案,但调用 celeryapp.task
装饰器并将函数传递给它,然后调用返回值上的apply_async
可能有效。以上是关于包含具有属性的 celery 任务的装饰器的主要内容,如果未能解决你的问题,请参考以下文章