在 Celery 任务之间共享一个通用的实用函数

Posted

技术标签:

【中文标题】在 Celery 任务之间共享一个通用的实用函数【英文标题】:Share a common utility function between Celery tasks 【发布时间】:2015-07-04 05:35:12 【问题描述】:

我在 Celery 中有很多任务都使用 canvas chain 连接。

@shared_task(bind=True)
def my_task_A(self):
    try:
        logger.debug('running task A')
        do something
    except Exception:
        run common cleanup function

@shared_task(bind=True)
def my_task_B(self):
    try:
        logger.debug('running task B')
        do something else
    except Exception:
        run common cleanup function

...

到目前为止一切顺利。问题是我正在寻找使用这样的通用实用程序功能的最佳实践:

def cleanup_and_notify_user(task_data):
    logger.debug('task failed')
    send email
    delete folders
    ...

在没有任务阻塞的情况下最好的方法是什么? 例如,我可以将run common cleanup function 替换为对cleanup_and_notify_user(task_data) 的调用吗?如果来自多个工作人员的多个任务尝试同时调用该函数会发生什么?

每个工人都有自己的副本吗?我显然对这里的几个概念有点困惑。非常感谢任何帮助。

提前谢谢大家。

【问题讨论】:

任务在自己的进程中运行,因此它们将被足够隔离,Python 明智。至于同时修改外部资源的行为如何......很难说。可能想在问题中添加您的操作系统,并考虑外部同步/排除机制,例如创建工作标记文件或操作系统多进程信号机制。 @JLPeyret 我只关心某种机制来“协调”任务失败后对回退功能的访问......顺便说一句,我使用的是 ubuntu 机器。 在功能级别无关紧要。在功能正在修改的级别上执行。即 func X 和 Y 都删除文件夹可能是一个问题,无论它们是不同的功能。而且,由于您不在此处使用线程,因此请注意线程上下文中给出的 Python 指令。 我对锁之类的东西不是很聪明,所以我不想诱导你出错。但是,Python 有一种称为多进程的替代并发机制,它不是基于线程的,因此对于您的上下文来说就像芹菜一样。这个搜索,[python] lock multiprocess,可能会有所启发。另请参阅***.com/questions/28670524/…,这是另一种方法 - 告诉 Celery 不要同时运行某些任务(在你的情况下也是如此) - 我认为这可能是最简单的方法。 @JLPeyret 感谢您的提示。我会调查的。 【参考方案1】:

在 celery 任务中,您只是在编写 python 代码,因此该任务有自己的进程,并且该函数将像在任何基本 OOP 逻辑中一样为每个任务实例化。 当然,如果此清理功能尝试删除系统文件夹或数据库行等共享资源,您最终会遇到外部资源的并发访问问题,您需要以其他方式解决,例如在文件系统的情况下,您可以为每个任务创建一个沙箱. 希望这会有所帮助

【讨论】:

以上是关于在 Celery 任务之间共享一个通用的实用函数的主要内容,如果未能解决你的问题,请参考以下文章

在共享一个通用插件的两个 grails 应用程序之间共享配置

包含具有属性的 celery 任务的装饰器

Celery异步任务

你如何对 Celery 任务进行单元测试?

Django post_save 信号和 celery 任务之间可能的竞争条件

php怎么调用celery任务