在 Celery 任务之间共享一个通用的实用函数
Posted
技术标签:
【中文标题】在 Celery 任务之间共享一个通用的实用函数【英文标题】:Share a common utility function between Celery tasks 【发布时间】:2015-07-04 05:35:12 【问题描述】:我在 Celery 中有很多任务都使用 canvas chain
连接。
@shared_task(bind=True)
def my_task_A(self):
try:
logger.debug('running task A')
do something
except Exception:
run common cleanup function
@shared_task(bind=True)
def my_task_B(self):
try:
logger.debug('running task B')
do something else
except Exception:
run common cleanup function
...
到目前为止一切顺利。问题是我正在寻找使用这样的通用实用程序功能的最佳实践:
def cleanup_and_notify_user(task_data):
logger.debug('task failed')
send email
delete folders
...
在没有任务阻塞的情况下最好的方法是什么?
例如,我可以将run common cleanup function
替换为对cleanup_and_notify_user(task_data)
的调用吗?如果来自多个工作人员的多个任务尝试同时调用该函数会发生什么?
每个工人都有自己的副本吗?我显然对这里的几个概念有点困惑。非常感谢任何帮助。
提前谢谢大家。
【问题讨论】:
任务在自己的进程中运行,因此它们将被足够隔离,Python 明智。至于同时修改外部资源的行为如何......很难说。可能想在问题中添加您的操作系统,并考虑外部同步/排除机制,例如创建工作标记文件或操作系统多进程信号机制。 @JLPeyret 我只关心某种机制来“协调”任务失败后对回退功能的访问......顺便说一句,我使用的是 ubuntu 机器。 在功能级别无关紧要。在功能正在修改的级别上执行。即 func X 和 Y 都删除文件夹可能是一个问题,无论它们是不同的功能。而且,由于您不在此处使用线程,因此请注意线程上下文中给出的 Python 指令。 我对锁之类的东西不是很聪明,所以我不想诱导你出错。但是,Python 有一种称为多进程的替代并发机制,它不是基于线程的,因此对于您的上下文来说就像芹菜一样。这个搜索,[python] lock multiprocess,可能会有所启发。另请参阅***.com/questions/28670524/…,这是另一种方法 - 告诉 Celery 不要同时运行某些任务(在你的情况下也是如此) - 我认为这可能是最简单的方法。 @JLPeyret 感谢您的提示。我会调查的。 【参考方案1】:在 celery 任务中,您只是在编写 python 代码,因此该任务有自己的进程,并且该函数将像在任何基本 OOP 逻辑中一样为每个任务实例化。 当然,如果此清理功能尝试删除系统文件夹或数据库行等共享资源,您最终会遇到外部资源的并发访问问题,您需要以其他方式解决,例如在文件系统的情况下,您可以为每个任务创建一个沙箱. 希望这会有所帮助
【讨论】:
以上是关于在 Celery 任务之间共享一个通用的实用函数的主要内容,如果未能解决你的问题,请参考以下文章
在共享一个通用插件的两个 grails 应用程序之间共享配置