从芹菜任务中获取芹菜工人的名字?

Posted

技术标签:

【中文标题】从芹菜任务中获取芹菜工人的名字?【英文标题】:Get the name of celery worker from inside a celery task? 【发布时间】:2014-07-14 10:52:13 【问题描述】:

我希望 celery 任务能够获取执行它的工作人员的名称,以用于记录目的。我需要从任务中处理这个问题,而不是直接查询代理。有没有办法做到这一点?如果这很重要,我将芹菜与 RabbitMQ 一起使用。

【问题讨论】:

如果对您有用,请将我的解决方案设置为已解决 :) 【参考方案1】:

出于报告目的,我还需要工作人员名称,因此我尝试了@cacois 解决方案,但它似乎不适用于 eventlet(current_process() 没有 initargs 属性)。所以我将把我的解决方案留在这里以供将来参考:

from celery import task

@task(bind=True)
def getName(self):
    return self.request.hostname

属性的名称对我来说听起来很奇怪,但它包含启动工作程序时使用“-n”选项指定的名称。 self 像你期望的类方法一样工作,你不需要在调用函数时指定它(例如:getName.delay())。

【讨论】:

您不需要在每次运行任务时都这样做。在连接时更容易捕获工作人员名称。请参阅下面的答案。 主机名是服务器的名字,不是worker的名字。【参考方案2】:

使用celeryd_after_setup 信号来捕获工人名称,如下所示:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def capture_worker_name(sender, instance, **kwargs):
    os.environ["WORKER_NAME"] = '0'.format(sender)

【讨论】:

这应该是公认的答案,但是我会将名称放在 celery 配置中,而不是放在环境变量中 如果您使用celery_worker 夹具进行测试,请改用worker_init 信号,因为celeryd_after_setup 在测试中未被调用。【参考方案3】:

您需要使用容纳工人的台球:

from celery import task
from billiard import current_process

@task
def getName():
    p = current_process()
    return p.index

然后制作一个全局字典,将 ids->names 映射到进程创建。

【讨论】:

这不是我想要的。当你启动一个 celery worker 时,你可以给它一个名字(-n 标志)。我想知道是否有办法从任务中获取该名称。 不是那么直接,但为什么不制作一个全局 ids->names 字典? 台球绝对是关键,但是这个解决方案没有按要求得到名称。我在回答中添加了一个小更新。【参考方案4】:

您最初是在寻找您使用 -n 标志输入的名称,对吗?它在 initargs 数组中。这是答案的修改版本,可以帮助您:

from celery import task
from billiard import current_process

@task
def getName():
    p = current_process()
    return p.initargs[1].split('@')[1]

【讨论】:

不幸的是,这似乎不再适用于最近的 celery 版本。 current_process() 现在返回一个 ForkProcess 对象,它没有 initargs

以上是关于从芹菜任务中获取芹菜工人的名字?的主要内容,如果未能解决你的问题,请参考以下文章

芹菜工人在当前任务完成后不会再接新任务

Flower UI 不显示芹菜工人和任务

芹菜任务应该在工人迷路时排队

有没有办法非暴力地停止芹菜工人的特定任务?

芹菜节拍服务旧(已删除)任务

芹菜 - 无法获取任务结果