基于Python语言使用RabbitMQ消息队列
Posted ExplorerMan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Python语言使用RabbitMQ消息队列相关的知识,希望对你有一定的参考价值。
介绍
RabbitMQ 是一个消息中间人(broker): 它接收并且发送消息. 你可以把它想象成一个邮局: 当你把想要寄出的信放到邮筒里时, 你可以确定邮递员会把信件送到收信人那里. 在这个比喻中, RabbitMQ 就是一个邮筒, 同时也是邮局和邮递员 .
和邮局的主要不同点在于RabbitMQ不处理纸质信件, 而是 接收(accepts), 存储(stores) 和转发(forwards)二进制数据块 —— 消息(messages).
在RabbitMQ中有一些自己的行业术语要了解 .
生产(producing)在这里的意思就是发送(sending). 一个发送消息的程序就是生产者( producer) :
队列(queue) 可以看做是邮筒的别名 ,它存在于RabbitMQ中. 虽然消息在RabbitMQ和你的应用程序中流转, 但它只能被存储在队列当中. 一个队列只受到主机的内存和磁盘的限制, 它实际上是个大的消息缓冲区. 许多生产者可以发送消息到一个队列, 许多消费者可以从队列中接收数据. 下面是队列的示意图:
消费(consuming) 与接收(receiving)有相似的含义. 消费者(consumer)就是等待接收消息的程序 :
要注意的是 生产者, 消费者, 和中间人不必在相同的主机上,实际上大多数情况下它们都不在同一台主机上
(using the pika 0.10.0 Python client)
在教程的这部分里我们用Python写两个小程序; 一个 发送消息的生产者 (sender), 一个接收消息并把它打印出来的消费者consumer (receiver)
在下面的图例中, “P” 代表我们的生产者 , “C”代表我们的消费者. 中间的盒子是一个队列—由RabbitMQ 维持的消息缓冲区.
我们的整体设计大致如下图所示:
生产者发送消息到名为 “hello”的 队列. 消费者从那个队列中接收消息
RabbitMQ 库
RabbitMQ遵循 AMQP 0.9.1, 这是一个开源的, 多用途(general-purpose)的消息发送协议.
针对RabbitMQ,在不同语言中有多种客户端可用. 在本教程系列中我们将使用 Pika, 这是由RabbitMQ团队推荐的 Python客户端. 你可以使用pip安装.
发送
我们的第一个程序send.py 将会发送一条消息到队列中. 我们要做的第一件事是和 RabbitMQ 服务建立连接.
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
channel = connection.channel()
- 1
- 2
- 3
- 4
- 5
现在我们已经建立了一个到本地机器的中间人(broker)的连接, 如果想要连接到不同的机器上的中间人,只要把‘localhost’替换成指定的名字和IP地址即可.
下一步, 在发送前我们要确保接收的队列存在. 如果我们发送消息到一个不存在的地址, RabbitMQ 会把消息丢弃掉. 我们创建一个名为‘hello’的队列 ,把消息发送到这个队列中:
channel.queue_declare(queue=‘hello‘)
- 1
到这里我们准备好要发送消息了,第一条消息只是一个简单的字符串“hello world!”,把它发送到队列中
In RabbitMQ 一条消息从不会被直接发送到队列, 它会先经过一个交换所(exchange). 但我我们不要被细节缠住 ? 你会在教程的第三部分了解更多关于交换所的内容. 目前我们需要知道的就是如何使用有空字符串所指定的默认交换所。这个交换所允许我们准确指定消息应该前往哪个队列。 队列名由 “routing_key”参数指定:
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
- 1
- 2
- 3
- 4
退出程序前我们需要确保网络缓冲区(network buffers)被冲刷(flushed),并且我们的消息真的被发送到了RabbitMQ. 这只需要通过关闭连接来完成:
connection.close()
- 1
接收
我们的第二个程序 receive.py 将会从队列接收消息并且打印出来。
同样,我们首先要连接到RabbitMQ 服务。 连接到Rabbit的代码同前面的一样 。
下一步,同先前一样,要确保队列存在. 使用queue_declare 创建队列是一个幂等(idempotent)操作 ? 我们想运行多少次这个命令都可以, 但只有一个队列被创建.
channel.queue_declare(queue=‘hello‘)
- 1
你可能会问为什么又一次声明队列 ? 我们在前面的代码中已经声明过一次. 如果我们确定队列存在的话的话可以避免那么做. 例如 send.py 已经运行了. 但我们不确定哪个程序先运行. 在这种情况下最好在两个程序中都声明一下,这是一个好的习惯。
列出所有队列
如果你想查看RabbitMQ 拥有哪些队列,有多少消息在其中.你可以使用 rabbitmqctl 工具:
sudo rabbitmqctl list_queues
在 Windows中:rabbitmqctl.bat list_queues
从队列中接收消息会稍微复杂一些. 通过给队列提供一个callback 函数来实现. 无论何时接收到消息, 这个callback 函数都会被 Pika 库调用. 在我们这里,这个函数会打印出接收到的消息.
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
- 1
- 2
下一步, 我们需要告诉 RabbitMQ 这个callback函数应该从我们的 “hello”队列中接收消息:
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=True)
- 1
- 2
- 3
这里的“no_ack ”参数会在后面有介绍.
最后我们加一个等待接收数据并且在必要时运行回调函数的永远不会终止的循环.
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
- 1
- 2
整合
send.py的完整代码:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
connection.close()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
receive.py的完整代码:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=True)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
现在在终端运行我们的程序. 首先,启动一个消费者程序, 这会持续运行来等待接收消息:
python receive.py
- 1
下面是在我的Ubuntu终端上的运行结果:
现在来启动生产者. 生产者程序在运行完会退出:
python send.py
- 1
在回头看之前打开的消费者程序终端,已经接到了消息:
我们已经学会了如何向一个命名队列中发送和接收消息. 下一节我们来构建一个简单的工作队列(work queue)
以上是关于基于Python语言使用RabbitMQ消息队列的主要内容,如果未能解决你的问题,请参考以下文章