Kombu/Celery 消息传递

Posted

技术标签:

【中文标题】Kombu/Celery 消息传递【英文标题】:Kombu/Celery messaging 【发布时间】:2015-06-26 09:02:12 【问题描述】:

我有一个简单的应用程序,可以发送和接收消息、kombu,并使用 Celery 来处理消息。 Kombu alon,我可以正确接收消息。当我发送“Hello”时,kombu 会收到“Hello”。但是当我添加任务时,kombu 收到的是芹菜的任务 ID。

我做这个项目的目的是让我可以安排何时发送和接收消息,因此是 Celery。

我想知道的是为什么 kombu 接收的是任务 ID 而不是发送的消息?我找了又找,没有找到关于这件事的任何相关结果。我是使用此应用程序的初学者,如果能帮助我解决此问题,我将不胜感激。

我的代码:

task.py

from celery import Celery

app = Celery('tasks', broker='amqp://xx:xx@localhost/xx', backend='amqp://')

@app.task(name='task.add')
def add(x, y):
    return x+y

发送.py

import kombu
from task import add
#declare connection with broker connection
connection = kombu.Connection(hostname='xx',
                              userid='xx',
                              password='xx',
                              virtual_host='xx')

connection.connect()
if connection.connect() is False:
    print("not connected")
else:
    print("connected")

#checks if connection is okay


#rabbitmq connection
channel = connection.channel()

#queue & exchange for kombu
exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')

#message here

x = input ("Enter first name: ")
y = input ("Enter last name: ")
result= add.delay(x,y)
print(result)



#syntax used for sending messages to queue
producer = kombu.Producer(channel, exchange)
producer.publish(result,
                 exchange = exchange,
                 routing_key='queue1')

print("Message sent: [x]")
connection.release()

receive.py

import kombu

#receive
connection = kombu.Connection(hostname='xx',
                              userid='xx',
                              password='xx',
                              virtual_host='xx')
connection.connect()

channel = connection.channel()

exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')

print("Waiting for messages...")
def callback(body, message):
    print('Got message - %s' % body)
    message.ack()

consumer = kombu.Consumer(channel,
                          queues=queue,
                          callbacks=[callback])
consumer.consume()

while True:
    connection.drain_events()

我正在使用:

Kombu 3.0.26
Celery 3.1.18
RabbitMQ as the broker

我发送的内容:

xxx
yyy

kombu 收到什么:

Got message - d22880c9-b22c-48d8-bc96-5d839b224f2a

【问题讨论】:

Celery 'Getting Started' not able to retrieve results; always pending的可能重复 这个问题已经结束了。 【参考方案1】:

我找到了我的问题的答案,对于任何可能遇到此类问题的人,我将分享对我有用的答案。

I found the solution here.

Or here - 如果第一个链接不起作用,用户 jennaliu 的回答可能会对您有所帮助。

【讨论】:

【参考方案2】:

您需要调用result.get() 来接收add.delay() 的实际值。您所看到的消息正文是字符串格式的AsyncResult 实例。这没有多大意义。

【讨论】:

当我在 result = add.delay(x,y) 之后添加 result.get() 时显示 AttributeError: 'str' object has no attribute 'get'

以上是关于Kombu/Celery 消息传递的主要内容,如果未能解决你的问题,请参考以下文章

akka消息传递

DGL中的消息传递相关内容的讲解

Android消息传递用啥好?

objc_msgSend消息传递学习笔记 – 对象方法消息传递流程

Redis应用----消息传递

我们如何确定 XMPP 消息何时传递?