RabbitMQ 消息队列

Posted Zoe233

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 消息队列相关的知识,希望对你有一定的参考价值。

一、前提

Python中的队列:

1.线程QUEUE

  线程队列不能跨进程,只是单线程下的多个线程间的数据交互

2.进程QUEUE

  支持父进程于子进程进行交互,或者同属于同一父进程下的多个子进程进行交互。

 

  因此,两个独立的程序之间是不能使用Python中的QUEUE实现交互。(因为每个程序是独立的,是一个独立的进程,所以Python的队列无法实现两个独立的程序之间的交互)。

  所以想要实现两个独立的进程间的通信,可以使用下面几种方法:

  如:

  在独立的程序之间建立socket,实现通信;

  将要通信的内容通过json写到硬盘中,a程序写入到硬盘中,b程序中硬盘中读取(耗时);

  使用broker中间商代理,a程序与代理建立socket,b程序也与代理建立socket,a程序的消息发给代理,代理再发给a程序(易于维护)。

  本节主要介绍broker的使用,目前流行的broker有RabbitMQ,ZeroMQ,ActiveMQ,MSMQ。

 

二、RabbitMQ 消息队列介绍

  RabbitMQ是用erlang开发的,所以RabbitMQ依赖于erlang语言。在windows上安装和使用RabbitMQ的时候,要先装上erlang语言。

 

  在windows上安装完成erlang和RabbitMQ之后就自动启动,RabbitMQ在任务管理器的“服务”中可以进行查看确认。

  RabbitMQ支持多种语言,如:Java,.NET,Ruby,Python,phpjavascript等,可以在官网 http://www.rabbitmq.com查看相应的语言支持的模块。
  在Python中:

  pika,a pure-Python AMQP 0-9-1 client ( source code ,API reference )  

  Celery ,a distributed task queue for Django and pure Python

  Halgha, an asynchrounous AMQP 0-0-1 client based on libevent ( the source code and docs are on github )

  其中,pika是本文介绍使用的模块。

 

三、RabbitMQ基础信息

在RabbitMQ中,关于在Python中如何使用的官方文档:http://www.rabbitmq.com/tutorials/tutorial-one-python.html

3.1 RabbitMQ官方介绍

RabbitMQ是一个消息代理(中间商):它接收并推送消息。

想象一下,有一个邮政局:当你把想要邮寄出去的信放入邮箱,你可以确定邮政员先生最终可以将你的信发送到你的收信方。类似的,RabbitMQ在这种情况下,就是一个邮箱,邮政局和邮政员先生。

RabbitMQ与邮政局最大的区别是,邮政局可以发送纸质的信息,但是RabbitMQ不行。RabbitMQ只接收,存储和发送二进制的数据信息。

RabbitMQ一般情况下使用一些术语来传输信息。

3.1.1 数据定义

Producing就是发送。发送信息的程序是生产者。用P来指代。

Consuming就是接收。接收信息的程序就是消费者。用C来指代。

queue(队列)是在RabbitMQ内的一个邮箱的名称。尽管消息通过RabbitMQ和你的应用程序传输,信息只能被存储在queue中。队列仅有主机的内存和字旁限制绑定,它本质上是一个大的消息缓冲区。许多生产者可以发送信息到指定的队列,许多的消费者可以从指定的队列中收取信息。用queue_name表示。

注意:生产者,消费者和代理不必驻留在同一台主机上。在实际大多数应用中,也是如此。

3.1.2 RabbitMQ库介绍 

RabbitMQ说的是AMQP 0.9.1,它是一个开放的、通用的消息传递协议。有许多不同语言的RabbitMQ客户端。在这个实例中,我们将使用Pika,这是RabbitMQ团队推荐的Python模块。要安装它,您可以使用pip包管理工具。

pip install pika

3.1.3 生产者,消费者和代理之间实现通信的图示 

在生产者消费者模型中,生产者产生的信息发送隔离Broker,由Broker发送给相应的消费者。

 

四、在Python中利用pika模块实现不同类型的队列通信

实例1:最简单的使用pika实例——Hello World!

在官方文档中,关于使用pika实现的最简单的Hello World案例中,将用Python写两个小程序。Proceder(发送方)发送信息,Consumer(接收方)接收信息并且打印这些信息。要传输的信息就是\'Hello World!\'

在下面的图表中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列——RabbitMQ代表消费者保存的消息缓冲区。

我们的整体设计将会是:

生产者将消息发送到“hello”队列。消费者从该队列接收消息。

 

Sending

 

第一个程序send.py将会把单条信息\'Hello World!\'发送给队列hello。在send.py文件中,我们需要做的第一个事就是建立和RabbitMQ server的连接。

import pika  #导入模块pika

connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) 
channel=connection.channel()  #建立和RabbitMQ server的连接我们 

通过上面步骤,我们的send.py小程序实现了连接到本地机器上的broker(代理)。如果我们想要连接到另一台机器上的代理,我们只需在这里指定它的名称或IP地址。

 

接下来,在发送之前,我们需要确定收件人队列是否存在。如果我们将消息发送到不存在的位置,RabbitMQ将会删除该消息。让我们创建一个名为hello的队列,send发送的消息将会发送到hello队列:

channel.queue_declare(queue=\'hello\')

在这时,我们准备发送一条消息。我们的第一个消息仅为\'Hello World !\'字符串。我们想把它发送到我们的hello队列。

 

在RabbitMQ中,消息从来不能直接发送到队列中,它总是需要通过交换。但是,让我们不要被细节所拖累——如果想要了解关于exchange的内容,可以在本教程的第三部分阅读更多关于交流exchange的内容。现在我们需要知道的是如何使用由空字符串标识的默认交换(exchange=\'\')。这个交换exchange是特殊的——它允许我们精确地指定消息应该发送到哪个队列。在routing_key参数中需要指定队列名称:

channel.basic_publish(exchange=\'\',routing_key=\'hello\',body=\'Hello World!\')
print("[x] Sent \'Hello World!\'")

在退出程序之前,我们需要确保网络缓冲区已被刷新,并且我们的消息已发送到RabbitMQ。我们可以关闭连接。

connection.close()

 

综上所述:

 1 import pika
 2 #第一步:与RabbitMQ server建立连接
 3 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\'))
 4 channel=connection.channel()
 5 
 6 #第二步:生产指定要发送的队列
 7 channel.queue_declare(queue=\'hello\')
 8 
 9 #第三步:通过交换exchange,发送数据到相应的队列;通过routing_key指定queue name;body中存放要传输的信息
10 channel.basic_publish(exchange=\'\',routing_key=\'hello\',body=\'Hello World!\')
11 print("[x] Sent \'Hello World!\'")
12 
13 #第四步:关闭连接
14 connection.close()

 

可能出现的问题:Sending doesn\'t work!

如果这是你第一次使用RabbitMQ,而你没有看到“发送”的消息,那么你可能会挠头,想知道到底出了什么问题。可能代理开始时没有足够的空闲磁盘空间(默认情况下,它至少需要200 MB),因此代理拒绝接收消息。检查代理日志文件以确认并减少必要的限制。配置文件文档将向您展示如何设置disk_free_limit。

 

 

Receiving

 

 

  

我们的第二个程序receive.py将从队列queue接收消息并在屏幕上打印它们。

首先,我们需要连接到RabbitMQ服务器。连接到RabbitMQ的代码和send.py中一样。

import pika  #导入模块pika

connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) 
channel=connection.channel()  #建立和RabbitMQ server的连接我们 

  

第二步,就像send.py一样,确保队列存在。使用queue_declare创建队列是具有幂等性的——我们可以按照我们喜欢的次数运行这个命令,并且只创建一个命令。

channel.queue_declare(queue=\'hello\')

  您可能会问为什么我们再次声明队列——我们已经在以前的代码中声明了它。如果我们确信队列已经存在,我们可以避免这种情况。例如,如果send.py程序之前运行过。但是我们还不能确定先运行哪个程序。在这种情况下,重复在两个程序中声明队列是很好的做法。

 

Listing queues中看RabbitMQ中的queues和messages

您可能希望看到RabbitMQ中有哪些队列,有多少消息在队列中。可以使用rabbitmqctl工具(作为特权用户):

sudo rabbitmqctl list_queues

在Windows上,省略了sudo

rabbitmqctl.bat list_queues

  

从队列接收消息更加复杂。它的工作方式是订阅一个回调函数到一个队列。当我们收到消息时,这个回调函数被Pika库调用。在我们的例子中,这个函数将在屏幕上打印消息的内容。

def callback(ch,method,properties,body):
    print("[x] Received %r"%body)

  

 接下来,我们需要告诉RabbitMQ,这个特定的回调函数应该接收来自我们的hello队列的消息:

channel.basic_consume(callback,queue=\'hello\',no_ack=True)

Receiving.py要成功运行,我们必须确保我们希望订阅的队列是存在的。幸运的是,我们对此有信心——我们已经创建了一个上面的队列——使用queue_declare。

no_ack参数在之后会进行解释。

 

最后,我们输入一个永无止境的循环,等待数据并在必要时运行回调函数。

print("[x] Waiting for messages.To exit press CTRL")
channel.start_consuming()

 

综上所述:

 1 import pika
 2 #第一步:与RabbitMQ server建立连接
 3 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\'))
 4 channel=connection.channel()
 5 
 6 #第二步:生产指定要接收的队列
 7 channel.queue_declare(queue=\'hello\')
 8 
 9 #第三步:用回调函数到队列queue中,接收到信息时回调函数会被Pika库调用
10 def callback(ch,method,properties,body):
11     print("[x] Received %r"%body)
12 
13 #第四步:告诉RabbitMQ,这个特定的回调函数将接收来自hello队列的信息
14 channel.basic_consume(callback,queue=\'hello\',no_ack=True)
15 
16 #第五步:进入死循环等待数据,在必要的时候运行回调函数。
17 print("[x] Waiting for messages.To exit press CTRL + C")
18 channel.start_consuming()

 

现在我们就完成了一个一对一的Producer和Consumer的单条消息队列传输。

首先运行receive.py,它会一直等待数据。再运行send.py之后,Producer会在数据传输完成后停止。Consumer还会继续保持等待数据的状态。(receive.py可以一直运行,但是可能可以被Ctrl+C打断)

消息分发轮询:

先运行N个receive.py,再运行send.py。每个receive.py会按照运行的先后相继收到新触发的send.py的消息。每个Consumer轮询地去接收Producer的信息。

 

实例2:构建一个简单的工作队列 Work queues

 

在上面的第一个"Hello World!"的例子中,我们编写了一些程序来从一个已命名的队列发送和接收消息。在这个过程中,我们将创建一个工作队列work queue,用于在多个工作之间分配耗时的任务。

工作队列work queues(即任务队列task queues)背后的主要思想是避免立即执行资源密集型任务,并避免进程必须等待它完成。相反,我们可以把任务安排在以后做。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程会弹出任务并最终执行任务。当你运行多个工作时,任务将在他们之间共享。

Work queues这个概念在web应用程序中特别有用,因为在短HTTP请求窗口中无法处理复杂的任务。

Preparation

在教程的"Hello World!"例子中,我们发送了一个包含“Hello World !”的消息。现在我们将发送用于复杂任务的字符串。我们没有现实世界的任务,比如要调整图像大小或者呈现pdf文件,所以让我们通过使用time . sleep()函数来假装我们很忙。我们取字符串中的点dot的个数作为它的复杂度的依据(假设情况);字符串中有多少个dot,就说明处理该条字符串需要多少秒例如,一个“Hello…”字符串中有三个点,因此假定处理该字符串需要三秒钟。

我们会稍微修改一下send.py,使得Producer允许从命令行发送任意消息。这个修改过的程序将任务调度到我们的工作队列中。我们将其命名为new_task.py:

 1 import sys
 2 import pika
 3 
 4 #step1:create connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\'))
 6 channel=connection.channel()
 7 
 8 #step2:create a named queue
 9 channel.queue_declare(queue=\'hello\')
10 
11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数
12 message=\'\'.join(sys.argv[1:]) or "Hello World!"
13 channel.basic_publish(exchange=\'\',routing_key=\'hello\',body=message)
14 print("[x] Sent %r"%message)
15 
16 #step4:close
17 connection.close()

 

我们也要稍微修改一下receive.py,使得接收的数据不立即在屏幕上打印,而是等待数据处理(依据字符串的dot数作为处理时长)完成后再向屏幕打印数据。新的Consumer将被命名为work.py:

 1 import pika
 2 import time
 3 
 4 #step1:create connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\'))
 6 channel=connection.channel()
 7 
 8 #step2:create a named queue
 9 channel.queue_declare(queue=\'hello\')
10 
11 #step3:define callback function.
12 def callback(ch,method,properties,body):
13     print(\'[x] Received %r\'%body)
14     time.sleep(body.count(b\'.\'))
15     print(\'[x] Done.\')
16 
17 #step4:when Consumer receive message,the program call callback function to deal with message.
18 channel.basic_consume(callback,queue=\'hello\',no_ack=True)
19 
20 print(\'[x] Waiting for messages. To exit press "CTRL+C"\')
21 
22 #write an endless loop.
23 channel.start_consuming()

 

 

Round-robin dispatching循环调度

使用任务队列的优点之一是能够很容易地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务,这样就可以很容易地扩大规模。

首先,让我们同时运行两个worker.py脚本。他们将从队列中获取消息,但具体如何呢?让我们来看看。

你需要三个控制台。两个将会运行这个worker.py脚本。这些控制台将是我们的两个消费者——C1和C2。

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

  

# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

在第三个console,可以发布Producer任务:

# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

返回结果:

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received \'First message.\'
# => [x] Received \'Third message...\'
# => [x] Received \'Fifth message.....\'

  

# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received \'Second message..\'
# => [x] Received \'Fourth message....\'

  默认情况下,RabbitMQ将按顺序将每个消息发送到下一个消费者。平均每个消费者将得到相同数量的消息。这种分配消息的方式称为循环。

 

Message acknowledgment消息确认

完成一个任务可能需要几秒钟。如果其中一个消费者开始了一项很长的任务,而只在一定程度上完成了,那么会发生什么呢?使用我们当前的代码,一旦RabbitMQ将消息传递给Consumer,RabbitMQ就会立即从内存中删除队列中的这条信息。在这种情况下,如果Consumer中断,我们将失去这个Consumer正在处理的信息。我们也将丢失所有发送给这个特定Consumer的消息,但是实际上这条信息还没有被Consumer处理。 

但我们不想失去任何任务信息。如果一个Consumer中断,我们希望把任务交给另一个Consumer。

为了确保消息不会丢失,RabbitMQ支持消息确认。Consumer在处理完该任务信息后,会给RabbitMQ发送一个ack(全称 acknowledgement),告诉RabbittMQ可以删除它。

如果一个Consumer中断(它的通道关闭了,连接关闭了,或者TCP连接丢失了),而没有发送ack,RabbitMQ将理解一条消息没有被完全处理,并且将重新排队。如果同时有其他Consumer在运行,它会很快将其转递给另一个Consumer。这样我们就可以确信,即使Consumer偶尔死亡,也不会失去任何信息。 

其中,没有任何消息超时的限制。RabbitMQ将在Consumer中段后重新传递消息。即使处理消息需要很长时间,也不会由于超时原因导致失败。 

默认情况下打开消息确认。在前面的例子中,我们明确地通过no_ack = True标记关闭了它们。

所以,在默认情况下,我们都不用设置no_ack,直接使用默认的no_ack=False即可。这样确保每次Consumer收到处理完消息后给Producer返回ack消息。

def callback(ch, method, properties, body):
    print ("[x] Received %r" % (body,))
    time.sleep( body.count(\'.\') )
    print ("[x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue=\'hello\')  #删除了no_ack=True

  把worker.py脚本中的代码按照上面的代码进行修改,就能保证Consumer中止,未给RabbitMQ发送acknowlegement,队列中的信息不会被删除。再次运行新的Consumer,信息还是会再次发送给新的在运行中的Consumer。

 

Forgotten acknowledgment

  忽略basic_ack是一个常见的错误。这是一个简单的错误,但后果却是严重的。当您的客户端退出时,消息将会被重新发送(看起来可能是随机的再次发送),但是RabbitMQ将会消耗越来越多的内存,因为它将无法释放任何未加处理的消息。

  为了调试这种错误,您可以使用rabbitmqctl来打印messages_unacknowledged字段:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

  如果脚本中忽略了basic_ack,那么队列中的信息会不停地在内存中堆积,队列中已经删除的信息,在内存中还是会放着。

c:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.6.10\\sbin>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Listing queues
hello   0       3

  通过上面在终端中查看的结果发现,队列中所有的信息已经被Consumer处理完后,messages

_unacknowleged的数量并未随着Consumer处理掉的信息(序列中删除的信息)的结果而减少相应的条数。

  加上了basic_ack之后,随着Consumer的处理结束,messages_unacknowleged的数量也随之减少,最后降为0.

 

Message durability 消息持久化 

我们已经学会了如何确保即使Consumer中断或停止,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。 

当RabbitMQ退出或崩溃时,它将忘记队列queues和消息messages。为了将队列和消息持久化,我们需要做两件事.。

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了这样做,我们需要宣布它是持久的:

channel.queue_declare(queue=\'hello\',durable=True)

尽管这个命令本身是正确的,但它在我们的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许您重新定义一个具有不同参数的现有队列,并将对试图执行此操作的任何程序返回一个错误。但是有一个快速的解决方案——让我们用不同的名称来声明一个队列,例如task_queue:

channel.queue_declare(queue=\'task_queue\', durable=True)

上面的queue_declare的修改在Producer和Consumer之间都需要修改。

在这一点上,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要把我们的信息标记为persisitent持久的。标记信息为persisitent,可以通过下面的代码实现:

channel.basic_publish(exchange=\'\',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

  

Note on message persistence

将消息标记为持久的并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受了消息并保存消息期间,仍然有很短的时间窗口。而且,RabbitMQ对每条消息都不执行fsync(2),它可能只保存到缓存,而不是真正写入磁盘。持久性保证不强,但是对于简单的任务队列来说已经足够了。如果你需要一个更有力的保证,那么你可以使用publisher confirms.

 

Fair dispatch 消息公平分发

您可能已经注意到,调度仍然不像我们希望的那样工作。例如,在一个有两个Consumer在运行的情况下,当部分消息需要很长时间处理,部分消息只需极短的时间处理时,一个Consumer会不停地在处理,另一个Consumer几乎没有什么计算量。RabbitMQ对此一无所知,并且仍然还是会轮询地平均分配消息。

这是因为当消息进入队列时,RabbitMQ才会发送一条消息。它不考虑Consumer未确认的消息messages_noackownledgement的数量。它只是盲目地向n个Consumer发送n个消息。

 

为了解决上面的这个问题,我们可以是basic.qos方法,将prefetch_count设置为1。

channel.basic_qos(prefetch_count=1)

这告诉RabbitMQ一次不给一个Consumer发送一个以上的消息。或者,换句话说,在Consumer没有处理并确认之前的消息之前,不要向员工发送新消息。相反,它会把它发送给下一个不在处理的Consumer。

Note about queue size

 如果所有在运行中的Consumers都在处理中。如果想要对此保持关注,那么可能需要增加更多的Consumer,或者使用message TTL

 

最后,下面是实现消息确认+消息和队列持久化+消息公平分发的Work queues:

 new_task.py:

 1 import sys
 2 import pika
 3 
 4 #step1:create connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\'))
 6 channel=connection.channel()
 7 
 8 #step2:create a named queue
 9 channel.queue_declare(queue=\'hello\',durable=True)  #durable=True实现队列持久化
10 
11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数
12 message=\'\'.join(sys.argv[1:]) or "Hello World!"
13 channel.basic_publish(exchange=\'\',routing_key=\'hello\',body=message,
14                       properties=pika.BasicProperties(delivery_mode=2,)  #make message persisitent实现消息持久化
15                       )
16 print("[x] Sent %r"%message)
17 
18 #step4:close
19 connection.close()

 

work.py:

 1 import pika
 2 import time
 3 
 4 #step1:create connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\'))
 6 channel=connection.channel()
 7 
 8 #step2:create a named queue
 9 channel.queue_declare(queue=\'hello\',durable=True)  #durable=True实现队列持久化,C和P都要保持一致
10 
11 #step3:define callback function.
12 def callback(ch,method,properties,body):
13     print(\'[x] Received %r\'%body)
14     time.sleep(body.count(b\'.\'))
15     print(\'[x] Done.\')
16     ch.basic_ack(delivery_tag=method.delivery_tag)  #basic_ack是消息确认的不可缺少的一部分,少了它内存中的messages_noacknowledgement不会自动较少
17 
18 
19 #step4:when Consumer receive message,the program call callback function to deal with message.
20 channel.basic_qos(prefetch_count=1)  #消息公平分发Fair dispatch ,设置RabbitMQ只向每个C发送一条message
21 channel.basic_consume(callback,queue=\'hello\') #使用no_ack=False的默认值,确保消息确认
22 
23 print(\'[x] Waiting for messages. To exit press "CTRL+C"\')
24 
25 #write an endless loop.
26 channel.start_consuming()

 

实例3:构建一个简单的日志记录系统:消息发布\\订阅Publish/Subscribe

在第二个实例中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都交付给一个worker。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递消息。这种模式被称为“发布/订阅”。

为了说明这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。

在我们的日志系统中,接收程序Consumer的每一个运行副本都将得到消息。这样我们就能运行一个接收器,并将日志引导到磁盘;同时,我们可以运行另一个接收器,在屏幕上打印日志。

基本上,发布的日志消息将被所有Consumer接收,这种类型叫做广播broadcast。

 

Exchanges

在前两个实例中,Producer发送消息到队列,Consumer从队列中接收消息列。现在,我们来介绍一下RabbitMQ的完整消息传递模型。

让我们快速回顾一下前面的教程中介绍的内容:

  生产者是发送消息的用户应用程序。

  队列是存储消息的缓冲区。

  消费者是接收消息的用户应用程序。

RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到哪个队列。

生产者只能将消息发送到exchange。exchange接收来自生产者的消息,再将消息推送到队列中。

exchange必须知道如何处理它收到的消息:

  它是否应该附加到特定的队列?

  它应该被附加到许多队列吗?

  或者应该被抛弃。

这些规则由交换类型定义。

 

可用的交换类型有:direct,topic,headers,fanout。

1.fanout  以类似广播的方式向所有的C发送信息:所有bind到类型为fanout的这个exchange的queue都可以接收信息

2.direct  通过routingKey和exchange决定的那个唯一的queue可以接收消息

3.topic 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

4.headers

 

在这个实例中,我们将使用fanout类型的exchange来实现。

channel.exchange_declare(exchange=\'logs\', type=\'fanout\')

上面的代码声明了类型为fanout,名为logs的exchange。

 

fanout交换非常简单。它只是将接收到的所有消息广播到它所知道的所有队列。

代码实现:

channel.basic_publish(exchange=\'log\',  #exchange的名称为logs
                      routing_key=\'\',   #不指定序列名称
                      body=message)
                 

  

 

Listing exchanges

想要内存在RabbitMQ Server中的所有exchanges,也可以使用rabbitmqctl工具:

rabbitmqctl list_exchanges

在返回的列表中会有一些amq.*交换 和 默认的(未命名)交换。这些exchange是默认创建的,但现在我们不太可能需要使用它们。

 

 

The default exchange

在本教程的前几部分中,我们对交换一无所知,但仍然能够将消息发送到队列。这是因为我们使用的是默认交换,我们通过空字符串(“”)来识别它。

交换参数是交换的名称。空字符串表示默认或匿名交换:消息被发送到参数routing_key指定的队列中去。

 

Temporary queues 临时队列

在前两个实例中,我们使用有指定名称的队列(如:hello)。对我们来说,能够命名队列是至关重要的——我们需要将Consumer指向相同的队列。当您想要在生产者和消费者之间共享队列时,给队列命名是很重要的。

但在这个实例中,命名队列的情况却并非如此。

我们想要听到所有日志信息,而不仅仅是他们的子集。我们也只对当前flowing messages感兴趣,而不是旧消息。要解决这两个问题,我们需要两步。

第一步,当我们连接RabbbitMQ时,我们需要一个新的空队列。我们可以创建一个随机名称的队列或让server选择一个随机队列名称。这个可以通过不在queue_declare()中设置参数queue来实现:

result = channel.queue_declare()

result.method.queue包含了一个随机队列名。比方说“amq.gen-JzTY20BRgKO-HjmUJj0wLg”这样的类似的名称。

第二步,一旦Consumer不和该随机名称的队列连接了,这个队列就可以删除。通过设置exclusive参数实现。

result = channel.queue_declare(exclusive=True)  

 

Bindings

 我们已经创建了一个类型为fanout的exchange和一个队列。现在我们需要告诉exchange将消息发送到我们的队列中去。exchange和queue之间的联系被称为binding绑定。

channel.queue_bind(exchange=\'logs\',
            queue=result.method.queue #result.method.queue中生成一个随机名的queue )

从现在起,名为logs的exchange将会向我们的队列中添加消息。

 

Listing bindings

 我们可以通过rabbitmqctl工具列出现有的bindings。

rabbitmqctl list_bindings

  

综上所述:

Producer程序负责发送日志信息,大部分实现和前面两个实例中的Producer没有大的区别。最重要的变化是,我们现在想要把消息发布到名为logs,类型为fanout的exchange中去,而不是默认的exchange。我们需要在发送时提供一个routing_key,但是它的值被忽略了。

下面是Producer的代码,emit_log.py: