如何设置 Celery 在运行任务之前调用自定义初始化函数?
Posted
技术标签:
【中文标题】如何设置 Celery 在运行任务之前调用自定义初始化函数?【英文标题】:How can I set up Celery to call a custom initialization function before running my tasks? 【发布时间】:2011-01-08 22:41:30 【问题描述】:我有一个 Django 项目,我正在尝试使用 Celery 提交任务以进行后台处理 (http://ask.github.com/celery/introduction.html)。 Celery 与 Django 集成得很好,我已经能够提交我的自定义任务并返回结果。
唯一的问题是我找不到在守护进程中执行自定义初始化的合理方法。在开始处理任务之前,我需要调用一个会加载大量内存的昂贵函数,而且我不能每次都调用该函数。
以前有人遇到过这个问题吗?任何想法如何在不修改 Celery 源代码的情况下解决它?
谢谢
【问题讨论】:
您需要运行什么样的自定义初始化? 我需要加载一个约 10MB 的数据结构来处理每个任务(所有任务的结构都相同)。 【参考方案1】:您可以编写自定义加载程序,也可以使用信号。
Loaders 有on_task_init
方法,当一个任务即将执行时被调用,
和on_worker_init
,由celery+celerybeat主进程调用。
使用信号可能是最简单的,可用的信号有:
0.8.x:
task_prerun(task_id, task, args, kwargs)
当任务即将由工作人员(或本地)执行时调度
如果使用apply
/或如果CELERY_ALWAYS_EAGER
已设置)。
task_postrun(task_id, task, args, kwargs, retval)
在与上述相同的条件下执行任务后调度。
task_sent(task_id, task, args, kwargs, eta, taskset)
在应用任务时调用(不适合长时间运行的操作)
0.9.x 中可用的其他信号(github 上的当前主分支):
worker_init()
在 celeryd 启动时调用(在任务初始化之前,所以如果在一个
支持fork
的系统,任何内存变化都会被复制到孩子
工作进程)。
worker_ready()
当 celeryd 能够接收任务时调用。
worker_shutdown()
当 celeryd 关闭时调用。
这是一个在进程中第一次运行任务时预先计算某些内容的示例:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table =
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
如果您想为所有任务运行该函数,只需跳过 sender
参数即可。
【讨论】:
以上是关于如何设置 Celery 在运行任务之前调用自定义初始化函数?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Django admin 使用 Celery 创建自定义任务