RabbitMQ:如何在 Python 生产者和消费者之间发送 Python 字典?

Posted

技术标签:

【中文标题】RabbitMQ:如何在 Python 生产者和消费者之间发送 Python 字典?【英文标题】:RabbitMQ: How to send Python dictionary between Python producer and consumer? 【发布时间】:2016-04-04 16:51:56 【问题描述】:

我正在尝试使用 RabbitMQ 将 python 字典从 python 生产者发送到 python 消费者。生产者首先建立与本地 RabbitMQ 服务器的连接。然后它创建一个消息将被传递到的队列,最后发送消息。消费者首先连接到 RabbitMQ 服务器,然后通过创建相同的队列来确保队列存在。然后它在回调函数中接收来自生产者的消息,并打印“id”值(1)。以下是生产者和消费者的脚本:

producer.py 脚本:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = 'id': 1, 'name': 'name1'
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

consumer.py 脚本:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body['id'])
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

但是,当我运行 producer.py 时,我得到了这个错误:

line 18, in <module>
    delivery_mode = 2, # make message persistent
  File "/Library/Python/2.7/site-packages/pika/adapters/blocking_connection.py", line 1978, in basic_publish
    mandatory, immediate)
  File "/Library/Python/2.7/site-packages/pika/adapters/blocking_connection.py", line 2064, in publish
    immediate=immediate)
  File "/Library/Python/2.7/site-packages/pika/channel.py", line 338, in basic_publish
    (properties, body))
  File "/Library/Python/2.7/site-packages/pika/channel.py", line 1150, in _send_method
    self.connection._send_method(self.channel_number, method_frame, content)
  File "/Library/Python/2.7/site-packages/pika/connection.py", line 1571, in _send_method
    self._send_message(channel_number, method_frame, content)
  File "/Library/Python/2.7/site-packages/pika/connection.py", line 1596, in _send_message
    content[1][s:e]).marshal())
TypeError: unhashable type

有人可以帮我吗?谢谢!

【问题讨论】:

您可以尝试将您的消息转换为 json 对象然后发送。 【参考方案1】:

您不能将本机 Python 类型作为有效负载发送,您必须先将它们序列化。我推荐使用 JSON:

import json
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=json.dumps(message),
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))

def callback(ch, method, properties, body):
print(" [x] Received %r" % json.loads(body))

【讨论】:

谢谢!我已经成功发送了消息,但是在运行消费者之后出现了这个错误:ValueError: No JSON object could be decoded 好吧,您可以打印出body 来看看它的样子。 JSON 只是一个字符串,因此很容易被人类解析。 修复json消息格式后没有错误。非常感谢您的解决方案。你拯救了我的一天:) 感谢简短而直接的回答。它适用于python3.8

以上是关于RabbitMQ:如何在 Python 生产者和消费者之间发送 Python 字典?的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ介绍 + python操作

python操作RabbitMQ

python操作RabbitMQ

python-- RabbitMQ

python学习-day11

python使用rabbitMQ介绍三(发布订阅模式)