芹菜任务和自定义装饰器

Posted

技术标签:

【中文标题】芹菜任务和自定义装饰器【英文标题】:celery task and customize decorator 【发布时间】:2011-09-17 15:26:31 【问题描述】:

我正在开发一个使用 django 和 celery(django-celery) 的项目。我们的团队决定将所有数据访问代码封装在(app-name)/manager.py 中(而不是像django 那样封装到管理器中),并让 (app-name)/task.py 中的代码只处理 celery 的组装和执行任务(所以我们在这一层没有 django ORM 依赖)。

在我的manager.py,我有这样的东西:

def get_tag(tag_name):
    ctype = ContentType.objects.get_for_model(Photo)
    try:
        tag = Tag.objects.get(name=tag_name)
    except ObjectDoesNotExist:
        return Tag.objects.none()
    return tag

def get_tagged_photos(tag):
    ctype = ContentType.objects.get_for_model(Photo)
    return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):
    return get_tagged_photos(tag).count()

在我的task.py中,我喜欢将它们包装成任务(然后可能使用这些任务来完成更复杂的任务),所以我编写了这个装饰器:

import manager #the module within same app containing data access functions

class mfunc_to_task(object):
    def __init__(mfunc_type='get'):
        self.mfunc_type = mfunc_type

    def __call__(self, f):
        def wrapper_f(*args, **kwargs):
            callback = kwargs.pop('callback', None)

            mfunc = getattr(manager, f.__name__)

            result = mfunc(*args, **kwargs)
            if callback:
                if self.mfunc_type == 'get':
                    subtask(callback).delay(result)
                elif self.mfunc_type == 'get_or_create':
                    subtask(callback).delay(result[0])
                else:
                    subtask(callback).delay()
            return result            

        return wrapper_f

那么(还在task.py):

#@task
@mfunc_to_task()
def get_tag():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos_count():
    pass

没有@task 一切正常。 但是,在应用了 @task 装饰器(按照 celery 文档的说明放在顶部)之后,事情就开始崩溃了。显然,每次调用mfunc_to_task.__call__ 时,都会将相同的task.get_tag 函数作为f 传递。所以我每次都得到相同的wrapper_f,现在我唯一要做的就是得到一个标签。

我是装饰器的新手。任何人都可以帮助我理解这里出了什么问题,或者指出其他方法来完成任务?我真的很讨厌为我的每个数据访问函数编写相同的任务包装代码。

【问题讨论】:

【参考方案1】:

不太清楚为什么传递参数不起作用?

如果你使用这个例子:

@task()
def add(x, y):
    return x + y

让我们为 MyCoolTask​​ 添加一些日志记录:

from celery import task
from celery.registry import tasks

import logging
import celery

logger = logging.getLogger(__name__)

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        logger.info("Ending run")
        pass

并创建一个扩展类(扩展 MyCoolTask​​,但现在带有参数):

class AddTask(MyCoolTask):

    def run(self,x,y):
        if x and y:
            result=add(x,y)
            logger.info('result = %d' % result)
            return result
        else:
            logger.error('No x or y in arguments')

tasks.register(AddTask)

并确保将 kwargs 作为 json 数据传递:

"x":8,"y":9

我得到了结果:

[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17

【讨论】:

使用 Celery4.0 执行此操作会产生以下错误:TypeError: __init__() takes exactly 3 arguments (1 given)。尝试使用任务装饰器@app.task(bind=True, base=AddTask) 创建一个单独的函数时也是如此,如here 所示。我知道的唯一选择是在注册之前创建一个类的实例。有没有其他方法,类似于你在这里所做的? 是的,这是 2013 年的一篇文章...... 5 年前......自那以后发生了很多变化。我可以看看我是否可以用 celery 4.0 制作一个更好的例子。你能定义一个新问题吗?我有点摆脱了基于课堂的任务……最终对我来说没有成功。我现在只使用@shared_task.... 你如何运行这个? delay 给出错误。【参考方案2】:

为什么不创建扩展celery.Task 的基类而不是使用装饰器?

通过这种方式,您的所有任务都可以扩展您自定义的任务类,您可以在其中使用方法__call__after_return 来实现您的个人行为 . 您还可以为所有任务定义通用方法和对象。

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        pass

【讨论】:

谢谢,伙计。扩展 cerlery.task.Task 肯定是一种方法,但由于 celery 的一些深层元类黑魔法,我发现我无法将参数传递给 MyTask 的 init 并在 callrun,所以我将所有逻辑放在 MyTask 中,并提出了一个命名模式来通过 self.__class__.__name__ 传递参数。然后使用命名模式为我的每个数据访问函数扩展 MyTask 以传递参数,并实例化一次以获得我需要的任务。这样我确实把所有的逻辑都放在了一个地方,但看起来还是有点乱。有什么优雅的解决方案吗? 真诚地认为这已经足够优雅了。个人建议是,当您找不到优雅的解决方案时,可能整个方法都是错误的。但有可能我错了;-),由你决定 @MauroRocco:感谢您在 europycon 2011 youtube 演示文稿中的回答和清晰的解释,这让我(在更加困惑之后)得到了我在这里给出的答案。 (见:slideshare.net/fireantology/…)

以上是关于芹菜任务和自定义装饰器的主要内容,如果未能解决你的问题,请参考以下文章

关于路径和自定义装饰器的 Python3 Django 问题

你如何使用 Flask-Login 和自定义 Python 装饰器来分配用户权限?

芹菜 - 如何使用多个队列?

类方法的python装饰器

django 内置“信号”机制和自定义方法

python进阶之装饰器之2.定义一个可接受参数的装饰器如何定义一个属性可由用户修改的装饰器定义一个能接受可选参数的装饰器