了解 celery 工作节点

Posted

技术标签:

【中文标题】了解 celery 工作节点【英文标题】:Understanding celery worker nodes 【发布时间】:2017-10-19 21:24:24 【问题描述】:

我想在这里了解 celery 和 AMQP 的工作原理。

我的场景

我在我的机器上安装了 celery

pip 安装芹菜

我使用

制作任务
from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task
def print_hello():
    print 'hello there'

据我了解,celery 将此任务转换为消息并通过 AMQP 协议发送到代理(redis 或 rabbitmq)。然后将这些消息排队并传递到工作节点以处理消息。

我的问题是,

    假设我在 Java 环境中创建任务,如果消息发送到外部工作节点,是否意味着工作节点服务器必须安装 Java 才能执行任务? 如果消息被外部工作节点选中,工作节点和代理如何找到对方?在上面的代码中,我只有代理地址来存储任务。

另外,为什么我们将任务存储在代理中?为什么我们不能在 celery 中实现交换算法并将消息直接发送给工人?

SOAP 和 AMQP 有什么区别?

【问题讨论】:

【参考方案1】:

工作人员不仅需要 Python,还需要您要在其上运行的任务的所有代码。

但是您没有专门针对节点,这正是存在代理的原因。你把你的任务放在队列中,然后工人来接它们。

我不知道您为什么在这种情况下提到 SOAP。它与任何事情都没有任何关系。

【讨论】:

谢谢。那么,无论我的应用程序在哪里创建任务(应用程序服务器),芹菜和工人也住在那里吗?我们不能有多个外部工作节点? 不,不一定。工作人员可以在应用程序服务器上,也可以是(许多)外部机器。正如我所说,重要的是它具有运行所需任务的代码。 好吧,让我这样提出我的问题。如果我在本地机器上创建了任务并将其发送到外部代理(test.redis.cache:6357)。现在此消息存储在代理中。假设如果我有一个外部节点工作者,这个节点如何知道我的代理(test.redis.cahe:6357)来获取任务? 当你在一个worker上启动Celery时,你用broker的地址来配置它,这样它就可以连接了。 Running the Celery worker server【参考方案2】:

您的问题的具体答案是:

    “如果消息被发送到外部工作节点”有点误导。消息本身不会发送到工作节点。它被发送到代理(由 URL 标识),特别是该代理上的 Exchange,并带有一个路由密钥,该路由密钥看到它进入队列。 Worker 都配置了相同的 Broker URL 并读取此 Queue,这在很大程度上是 [first-in-best-dressed][1] 的情况,第一个使用消息的 Worker(读取 AMQP 中的消息)在一个原子操作中从队列中删除)。 [消息][2] 与语言无关。然而,Workers 是用 Python 编写的,并且任务定义必须用 Python 编写,尽管 Python 任务定义当然可以通过任何方式调用任何其他库来执行任务。但从某种意义上说是的,无论您的任务需要什么运行时库来运行它,它都需要与 Worker 在同一台机器上,并且它们必须有一个 Python 包装器,以便 Worker 可以加载它们。

    "如果消息被外部工作节点选中,工作节点和代理如何找到对方?" - 这个问题具有误导性。他们没有找到对方。 Worker 配置了与 Client 完全相同的 Broker URL。它知道 URL。 Celery 通常在 Python 中解决此问题的方式是您共享的代码 sn-p 由 Client 和 Worker 加载。这实际上是芹菜的美丽之一。你用 Python 编写任务,然后将定义加载到 Worker 中而不会改变。因此,它们使用相同的代理,并定义了相同的任务。 @app.task 实际上创建了一个 Task 类实例,它有两个非常重要的方法:apply_async() 用于创建和发送请求任务的消息,run() 用于运行修饰函数。前者被称为客户。后者由 Worker(实际运行任务)。

    “为什么我们将任务存储在代理中?” - 任务不存储在代理中。该任务在 python 文件中定义,例如您的代码 sn-p。如 2 所述。相同的定义被 Client 和 Worker 读取。一条消息从客户端发送到 Worker,要求它运行任务。

    “为什么我们不能在 celery 中实现交换算法并将消息直接发送给工人?” - 我不得不在这里猜测一下,但我会问,为什么要重新发明***?定义了一个标准,AMQP(高级消息队列协议),并且该标准有许多实现。为什么还要写一篇? Celery 是 FOSS,和很多 FOSS 一样,我想开始编写它的人想要专注于任务管理而不是消息管理,并选择依靠 AMQP 进行消息管理。一个公平的选择。但值得一提的是,Celery 确实在 Kombu 中实现了很多,为 AMQP 提供 Python API。

SOAP(简单对象访问协议的缩写)是一种消息传递协议规范,用于在计算机网络中实现 Web 服务时交换结构化信息。

AMQP(Advanced Message Queuing Protocol 的缩写)是面向消息中间件的开放标准应用层协议。 AMQP 的定义特性是消息导向、队列、路由(包括点对点和发布订阅)、可靠性和安全性。

SOAP 在协议栈中通常是更高级别的。此处描述:

https://www.amqp.org/product/different

【讨论】:

以上是关于了解 celery 工作节点的主要内容,如果未能解决你的问题,请参考以下文章

使用java程序作为celery的工作节点

如何在多个机器上搭建celery的集群环境

Celery 检查工作任务并检索任务文档字符串

django/celery - 芹菜状态:错误:在时间限制内没有节点回复

基础入门_Python-模块和包.深入Celery之节点管理/任务调度/任务追踪?

Celery-定时任务