RabbitMQ 消息队列
Posted Zoe233
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 消息队列相关的知识,希望对你有一定的参考价值。
一、前提
Python中的队列:
1.线程QUEUE
线程队列不能跨进程,只是单线程下的多个线程间的数据交互
2.进程QUEUE
支持父进程于子进程进行交互,或者同属于同一父进程下的多个子进程进行交互。
因此,两个独立的程序之间是不能使用Python中的QUEUE实现交互。(因为每个程序是独立的,是一个独立的进程,所以Python的队列无法实现两个独立的程序之间的交互)。
所以想要实现两个独立的进程间的通信,可以使用下面几种方法:
如:
在独立的程序之间建立socket,实现通信;
将要通信的内容通过json写到硬盘中,a程序写入到硬盘中,b程序中硬盘中读取(耗时);
使用broker中间商代理,a程序与代理建立socket,b程序也与代理建立socket,a程序的消息发给代理,代理再发给a程序(易于维护)。
本节主要介绍broker的使用,目前流行的broker有RabbitMQ,ZeroMQ,ActiveMQ,MSMQ。
二、RabbitMQ 消息队列介绍
RabbitMQ是用erlang开发的,所以RabbitMQ依赖于erlang语言。在windows上安装和使用RabbitMQ的时候,要先装上erlang语言。
在windows上安装完成erlang和RabbitMQ之后就自动启动,RabbitMQ在任务管理器的“服务”中可以进行查看确认。
RabbitMQ支持多种语言,如:Java,.NET,Ruby,Python,php,javascript等,可以在官网 http://www.rabbitmq.com查看相应的语言支持的模块。
在Python中:
pika,a pure-Python AMQP 0-9-1 client ( source code ,API reference )
Celery ,a distributed task queue for Django and pure Python
Halgha, an asynchrounous AMQP 0-0-1 client based on libevent ( the source code and docs are on github )
其中,pika是本文介绍使用的模块。
三、RabbitMQ基础信息
在RabbitMQ中,关于在Python中如何使用的官方文档:http://www.rabbitmq.com/tutorials/tutorial-one-python.html
3.1 RabbitMQ官方介绍
RabbitMQ是一个消息代理(中间商):它接收并推送消息。
想象一下,有一个邮政局:当你把想要邮寄出去的信放入邮箱,你可以确定邮政员先生最终可以将你的信发送到你的收信方。类似的,RabbitMQ在这种情况下,就是一个邮箱,邮政局和邮政员先生。
RabbitMQ与邮政局最大的区别是,邮政局可以发送纸质的信息,但是RabbitMQ不行。RabbitMQ只接收,存储和发送二进制的数据信息。
RabbitMQ一般情况下使用一些术语来传输信息。
3.1.1 数据定义
Producing就是发送。发送信息的程序是生产者。用P来指代。
Consuming就是接收。接收信息的程序就是消费者。用C来指代。
queue(队列)是在RabbitMQ内的一个邮箱的名称。尽管消息通过RabbitMQ和你的应用程序传输,信息只能被存储在queue中。队列仅有主机的内存和字旁限制绑定,它本质上是一个大的消息缓冲区。许多生产者可以发送信息到指定的队列,许多的消费者可以从指定的队列中收取信息。用queue_name表示。
注意:生产者,消费者和代理不必驻留在同一台主机上。在实际大多数应用中,也是如此。
3.1.2 RabbitMQ库介绍
RabbitMQ说的是AMQP 0.9.1,它是一个开放的、通用的消息传递协议。有许多不同语言的RabbitMQ客户端。在这个实例中,我们将使用Pika,这是RabbitMQ团队推荐的Python模块。要安装它,您可以使用pip包管理工具。
pip install pika
3.1.3 生产者,消费者和代理之间实现通信的图示
在生产者消费者模型中,生产者产生的信息发送隔离Broker,由Broker发送给相应的消费者。
四、在Python中利用pika模块实现不同类型的队列通信
实例1:最简单的使用pika实例——Hello World!
在官方文档中,关于使用pika实现的最简单的Hello World案例中,将用Python写两个小程序。Proceder(发送方)发送信息,Consumer(接收方)接收信息并且打印这些信息。要传输的信息就是\'Hello World!\'
在下面的图表中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列——RabbitMQ代表消费者保存的消息缓冲区。
我们的整体设计将会是:
生产者将消息发送到“hello”队列。消费者从该队列接收消息。
Sending
第一个程序send.py将会把单条信息\'Hello World!\'发送给队列hello。在send.py文件中,我们需要做的第一个事就是建立和RabbitMQ server的连接。
import pika #导入模块pika connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) channel=connection.channel() #建立和RabbitMQ server的连接我们
通过上面步骤,我们的send.py小程序实现了连接到本地机器上的broker(代理)。如果我们想要连接到另一台机器上的代理,我们只需在这里指定它的名称或IP地址。
接下来,在发送之前,我们需要确定收件人队列是否存在。如果我们将消息发送到不存在的位置,RabbitMQ将会删除该消息。让我们创建一个名为hello的队列,send发送的消息将会发送到hello队列:
channel.queue_declare(queue=\'hello\')
在这时,我们准备发送一条消息。我们的第一个消息仅为\'Hello World !\'字符串。我们想把它发送到我们的hello队列。
在RabbitMQ中,消息从来不能直接发送到队列中,它总是需要通过交换。但是,让我们不要被细节所拖累——如果想要了解关于exchange的内容,可以在本教程的第三部分阅读更多关于交流exchange的内容。现在我们需要知道的是如何使用由空字符串标识的默认交换(exchange=\'\')。这个交换exchange是特殊的——它允许我们精确地指定消息应该发送到哪个队列。在routing_key参数中需要指定队列名称:
channel.basic_publish(exchange=\'\',routing_key=\'hello\',body=\'Hello World!\') print("[x] Sent \'Hello World!\'")
在退出程序之前,我们需要确保网络缓冲区已被刷新,并且我们的消息已发送到RabbitMQ。我们可以关闭连接。
connection.close()
综上所述:
1 import pika 2 #第一步:与RabbitMQ server建立连接 3 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) 4 channel=connection.channel() 5 6 #第二步:生产指定要发送的队列 7 channel.queue_declare(queue=\'hello\') 8 9 #第三步:通过交换exchange,发送数据到相应的队列;通过routing_key指定queue name;body中存放要传输的信息 10 channel.basic_publish(exchange=\'\',routing_key=\'hello\',body=\'Hello World!\') 11 print("[x] Sent \'Hello World!\'") 12 13 #第四步:关闭连接 14 connection.close()
可能出现的问题:Sending doesn\'t work!
如果这是你第一次使用RabbitMQ,而你没有看到“发送”的消息,那么你可能会挠头,想知道到底出了什么问题。可能代理开始时没有足够的空闲磁盘空间(默认情况下,它至少需要200 MB),因此代理拒绝接收消息。检查代理日志文件以确认并减少必要的限制。配置文件文档将向您展示如何设置disk_free_limit。
Receiving
我们的第二个程序receive.py将从队列queue接收消息并在屏幕上打印它们。
首先,我们需要连接到RabbitMQ服务器。连接到RabbitMQ的代码和send.py中一样。
import pika #导入模块pika connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) channel=connection.channel() #建立和RabbitMQ server的连接我们
第二步,就像send.py一样,确保队列存在。使用queue_declare创建队列是具有幂等性的——我们可以按照我们喜欢的次数运行这个命令,并且只创建一个命令。
channel.queue_declare(queue=\'hello\')
您可能会问为什么我们再次声明队列——我们已经在以前的代码中声明了它。如果我们确信队列已经存在,我们可以避免这种情况。例如,如果send.py程序之前运行过。但是我们还不能确定先运行哪个程序。在这种情况下,重复在两个程序中声明队列是很好的做法。
Listing queues中看RabbitMQ中的queues和messages
您可能希望看到RabbitMQ中有哪些队列,有多少消息在队列中。可以使用rabbitmqctl工具(作为特权用户):
sudo rabbitmqctl list_queues
在Windows上,省略了sudo
rabbitmqctl.bat list_queues
从队列接收消息更加复杂。它的工作方式是订阅一个回调函数到一个队列。当我们收到消息时,这个回调函数被Pika库调用。在我们的例子中,这个函数将在屏幕上打印消息的内容。
def callback(ch,method,properties,body): print("[x] Received %r"%body)
接下来,我们需要告诉RabbitMQ,这个特定的回调函数应该接收来自我们的hello队列的消息:
channel.basic_consume(callback,queue=\'hello\',no_ack=True)
Receiving.py要成功运行,我们必须确保我们希望订阅的队列是存在的。幸运的是,我们对此有信心——我们已经创建了一个上面的队列——使用queue_declare。
no_ack参数在之后会进行解释。
最后,我们输入一个永无止境的循环,等待数据并在必要时运行回调函数。
print("[x] Waiting for messages.To exit press CTRL") channel.start_consuming()
综上所述:
1 import pika 2 #第一步:与RabbitMQ server建立连接 3 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) 4 channel=connection.channel() 5 6 #第二步:生产指定要接收的队列 7 channel.queue_declare(queue=\'hello\') 8 9 #第三步:用回调函数到队列queue中,接收到信息时回调函数会被Pika库调用 10 def callback(ch,method,properties,body): 11 print("[x] Received %r"%body) 12 13 #第四步:告诉RabbitMQ,这个特定的回调函数将接收来自hello队列的信息 14 channel.basic_consume(callback,queue=\'hello\',no_ack=True) 15 16 #第五步:进入死循环等待数据,在必要的时候运行回调函数。 17 print("[x] Waiting for messages.To exit press CTRL + C") 18 channel.start_consuming()
现在我们就完成了一个一对一的Producer和Consumer的单条消息队列传输。
首先运行receive.py,它会一直等待数据。再运行send.py之后,Producer会在数据传输完成后停止。Consumer还会继续保持等待数据的状态。(receive.py可以一直运行,但是可能可以被Ctrl+C打断)
消息分发轮询:
先运行N个receive.py,再运行send.py。每个receive.py会按照运行的先后相继收到新触发的send.py的消息。每个Consumer轮询地去接收Producer的信息。
实例2:构建一个简单的工作队列 Work queues
在上面的第一个"Hello World!"的例子中,我们编写了一些程序来从一个已命名的队列发送和接收消息。在这个过程中,我们将创建一个工作队列work queue,用于在多个工作之间分配耗时的任务。
工作队列work queues(即任务队列task queues)背后的主要思想是避免立即执行资源密集型任务,并避免进程必须等待它完成。相反,我们可以把任务安排在以后做。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程会弹出任务并最终执行任务。当你运行多个工作时,任务将在他们之间共享。
Work queues这个概念在web应用程序中特别有用,因为在短HTTP请求窗口中无法处理复杂的任务。
Preparation
在教程的"Hello World!"例子中,我们发送了一个包含“Hello World !”的消息。现在我们将发送用于复杂任务的字符串。我们没有现实世界的任务,比如要调整图像大小或者呈现pdf文件,所以让我们通过使用time . sleep()函数来假装我们很忙。我们取字符串中的点dot的个数作为它的复杂度的依据(假设情况);字符串中有多少个dot,就说明处理该条字符串需要多少秒。例如,一个“Hello…”字符串中有三个点,因此假定处理该字符串需要三秒钟。
我们会稍微修改一下send.py,使得Producer允许从命令行发送任意消息。这个修改过的程序将任务调度到我们的工作队列中。我们将其命名为new_task.py:
1 import sys 2 import pika 3 4 #step1:create connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) 6 channel=connection.channel() 7 8 #step2:create a named queue 9 channel.queue_declare(queue=\'hello\') 10 11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数 12 message=\'\'.join(sys.argv[1:]) or "Hello World!" 13 channel.basic_publish(exchange=\'\',routing_key=\'hello\',body=message) 14 print("[x] Sent %r"%message) 15 16 #step4:close 17 connection.close()
我们也要稍微修改一下receive.py,使得接收的数据不立即在屏幕上打印,而是等待数据处理(依据字符串的dot数作为处理时长)完成后再向屏幕打印数据。新的Consumer将被命名为work.py:
1 import pika 2 import time 3 4 #step1:create connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) 6 channel=connection.channel() 7 8 #step2:create a named queue 9 channel.queue_declare(queue=\'hello\') 10 11 #step3:define callback function. 12 def callback(ch,method,properties,body): 13 print(\'[x] Received %r\'%body) 14 time.sleep(body.count(b\'.\')) 15 print(\'[x] Done.\') 16 17 #step4:when Consumer receive message,the program call callback function to deal with message. 18 channel.basic_consume(callback,queue=\'hello\',no_ack=True) 19 20 print(\'[x] Waiting for messages. To exit press "CTRL+C"\') 21 22 #write an endless loop. 23 channel.start_consuming()
Round-robin dispatching循环调度
使用任务队列的优点之一是能够很容易地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务,这样就可以很容易地扩大规模。
首先,让我们同时运行两个worker.py脚本。他们将从队列中获取消息,但具体如何呢?让我们来看看。
你需要三个控制台。两个将会运行这个worker.py脚本。这些控制台将是我们的两个消费者——C1和C2。
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
在第三个console,可以发布Producer任务:
# shell 3 python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message.....
返回结果:
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received \'First message.\' # => [x] Received \'Third message...\' # => [x] Received \'Fifth message.....\'
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received \'Second message..\' # => [x] Received \'Fourth message....\'
默认情况下,RabbitMQ将按顺序将每个消息发送到下一个消费者。平均每个消费者将得到相同数量的消息。这种分配消息的方式称为循环。
Message acknowledgment消息确认
完成一个任务可能需要几秒钟。如果其中一个消费者开始了一项很长的任务,而只在一定程度上完成了,那么会发生什么呢?使用我们当前的代码,一旦RabbitMQ将消息传递给Consumer,RabbitMQ就会立即从内存中删除队列中的这条信息。在这种情况下,如果Consumer中断,我们将失去这个Consumer正在处理的信息。我们也将丢失所有发送给这个特定Consumer的消息,但是实际上这条信息还没有被Consumer处理。
但我们不想失去任何任务信息。如果一个Consumer中断,我们希望把任务交给另一个Consumer。
为了确保消息不会丢失,RabbitMQ支持消息确认。Consumer在处理完该任务信息后,会给RabbitMQ发送一个ack(全称 acknowledgement),告诉RabbittMQ可以删除它。
如果一个Consumer中断(它的通道关闭了,连接关闭了,或者TCP连接丢失了),而没有发送ack,RabbitMQ将理解一条消息没有被完全处理,并且将重新排队。如果同时有其他Consumer在运行,它会很快将其转递给另一个Consumer。这样我们就可以确信,即使Consumer偶尔死亡,也不会失去任何信息。
其中,没有任何消息超时的限制。RabbitMQ将在Consumer中段后重新传递消息。即使处理消息需要很长时间,也不会由于超时原因导致失败。
默认情况下打开消息确认。在前面的例子中,我们明确地通过no_ack = True标记关闭了它们。
所以,在默认情况下,我们都不用设置no_ack,直接使用默认的no_ack=False即可。这样确保每次Consumer收到处理完消息后给Producer返回ack消息。
def callback(ch, method, properties, body): print ("[x] Received %r" % (body,)) time.sleep( body.count(\'.\') ) print ("[x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,queue=\'hello\') #删除了no_ack=True
把worker.py脚本中的代码按照上面的代码进行修改,就能保证Consumer中止,未给RabbitMQ发送acknowlegement,队列中的信息不会被删除。再次运行新的Consumer,信息还是会再次发送给新的在运行中的Consumer。
Forgotten acknowledgment
忽略basic_ack是一个常见的错误。这是一个简单的错误,但后果却是严重的。当您的客户端退出时,消息将会被重新发送(看起来可能是随机的再次发送),但是RabbitMQ将会消耗越来越多的内存,因为它将无法释放任何未加处理的消息。
为了调试这种错误,您可以使用rabbitmqctl来打印messages_unacknowledged字段:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
如果脚本中忽略了basic_ack,那么队列中的信息会不停地在内存中堆积,队列中已经删除的信息,在内存中还是会放着。
c:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.6.10\\sbin>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged Listing queues hello 0 3
通过上面在终端中查看的结果发现,队列中所有的信息已经被Consumer处理完后,messages
_unacknowleged的数量并未随着Consumer处理掉的信息(序列中删除的信息)的结果而减少相应的条数。
加上了basic_ack之后,随着Consumer的处理结束,messages_unacknowleged的数量也随之减少,最后降为0.
Message durability 消息持久化
我们已经学会了如何确保即使Consumer中断或停止,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它将忘记队列queues和消息messages。为了将队列和消息持久化,我们需要做两件事.。
首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了这样做,我们需要宣布它是持久的:
channel.queue_declare(queue=\'hello\',durable=True)
尽管这个命令本身是正确的,但它在我们的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许您重新定义一个具有不同参数的现有队列,并将对试图执行此操作的任何程序返回一个错误。但是有一个快速的解决方案——让我们用不同的名称来声明一个队列,例如task_queue:
channel.queue_declare(queue=\'task_queue\', durable=True)
上面的queue_declare的修改在Producer和Consumer之间都需要修改。
在这一点上,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要把我们的信息标记为persisitent持久的。标记信息为persisitent,可以通过下面的代码实现:
channel.basic_publish(exchange=\'\', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
Note on message persistence
将消息标记为持久的并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受了消息并保存消息期间,仍然有很短的时间窗口。而且,RabbitMQ对每条消息都不执行fsync(2),它可能只保存到缓存,而不是真正写入磁盘。持久性保证不强,但是对于简单的任务队列来说已经足够了。如果你需要一个更有力的保证,那么你可以使用publisher confirms.
Fair dispatch 消息公平分发
您可能已经注意到,调度仍然不像我们希望的那样工作。例如,在一个有两个Consumer在运行的情况下,当部分消息需要很长时间处理,部分消息只需极短的时间处理时,一个Consumer会不停地在处理,另一个Consumer几乎没有什么计算量。RabbitMQ对此一无所知,并且仍然还是会轮询地平均分配消息。
这是因为当消息进入队列时,RabbitMQ才会发送一条消息。它不考虑Consumer未确认的消息messages_noackownledgement的数量。它只是盲目地向n个Consumer发送n个消息。
为了解决上面的这个问题,我们可以是basic.qos方法,将prefetch_count设置为1。
channel.basic_qos(prefetch_count=1)
这告诉RabbitMQ一次不给一个Consumer发送一个以上的消息。或者,换句话说,在Consumer没有处理并确认之前的消息之前,不要向员工发送新消息。相反,它会把它发送给下一个不在处理的Consumer。
Note about queue size
如果所有在运行中的Consumers都在处理中。如果想要对此保持关注,那么可能需要增加更多的Consumer,或者使用message TTL。
最后,下面是实现消息确认+消息和队列持久化+消息公平分发的Work queues:
new_task.py:
1 import sys 2 import pika 3 4 #step1:create connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) 6 channel=connection.channel() 7 8 #step2:create a named queue 9 channel.queue_declare(queue=\'hello\',durable=True) #durable=True实现队列持久化 10 11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数 12 message=\'\'.join(sys.argv[1:]) or "Hello World!" 13 channel.basic_publish(exchange=\'\',routing_key=\'hello\',body=message, 14 properties=pika.BasicProperties(delivery_mode=2,) #make message persisitent实现消息持久化 15 ) 16 print("[x] Sent %r"%message) 17 18 #step4:close 19 connection.close()
work.py:
1 import pika 2 import time 3 4 #step1:create connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(\'localhost\')) 6 channel=connection.channel() 7 8 #step2:create a named queue 9 channel.queue_declare(queue=\'hello\',durable=True) #durable=True实现队列持久化,C和P都要保持一致 10 11 #step3:define callback function. 12 def callback(ch,method,properties,body): 13 print(\'[x] Received %r\'%body) 14 time.sleep(body.count(b\'.\')) 15 print(\'[x] Done.\') 16 ch.basic_ack(delivery_tag=method.delivery_tag) #basic_ack是消息确认的不可缺少的一部分,少了它内存中的messages_noacknowledgement不会自动较少 17 18 19 #step4:when Consumer receive message,the program call callback function to deal with message. 20 channel.basic_qos(prefetch_count=1) #消息公平分发Fair dispatch ,设置RabbitMQ只向每个C发送一条message 21 channel.basic_consume(callback,queue=\'hello\') #使用no_ack=False的默认值,确保消息确认 22 23 print(\'[x] Waiting for messages. To exit press "CTRL+C"\') 24 25 #write an endless loop. 26 channel.start_consuming()
实例3:构建一个简单的日志记录系统:消息发布\\订阅Publish/Subscribe
在第二个实例中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都交付给一个worker。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递消息。这种模式被称为“发布/订阅”。
为了说明这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。
在我们的日志系统中,接收程序Consumer的每一个运行副本都将得到消息。这样我们就能运行一个接收器,并将日志引导到磁盘;同时,我们可以运行另一个接收器,在屏幕上打印日志。
基本上,发布的日志消息将被所有Consumer接收,这种类型叫做广播broadcast。
Exchanges
在前两个实例中,Producer发送消息到队列,Consumer从队列中接收消息列。现在,我们来介绍一下RabbitMQ的完整消息传递模型。
让我们快速回顾一下前面的教程中介绍的内容:
生产者是发送消息的用户应用程序。
队列是存储消息的缓冲区。
消费者是接收消息的用户应用程序。
RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到哪个队列。
生产者只能将消息发送到exchange。exchange接收来自生产者的消息,再将消息推送到队列中。
exchange必须知道如何处理它收到的消息:
它是否应该附加到特定的队列?
它应该被附加到许多队列吗?
或者应该被抛弃。
这些规则由交换类型定义。
可用的交换类型有:direct,topic,headers,fanout。
1.fanout 以类似广播的方式向所有的C发送信息:所有bind到类型为fanout的这个exchange的queue都可以接收信息
2.direct 通过routingKey和exchange决定的那个唯一的queue可以接收消息
3.topic 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
4.headers
在这个实例中,我们将使用fanout类型的exchange来实现。
channel.exchange_declare(exchange=\'logs\', type=\'fanout\')
上面的代码声明了类型为fanout,名为logs的exchange。
fanout交换非常简单。它只是将接收到的所有消息广播到它所知道的所有队列。
代码实现:
channel.basic_publish(exchange=\'log\', #exchange的名称为logs routing_key=\'\', #不指定序列名称 body=message)
Listing exchanges
想要内存在RabbitMQ Server中的所有exchanges,也可以使用rabbitmqctl工具:
rabbitmqctl list_exchanges
在返回的列表中会有一些amq.*交换 和 默认的(未命名)交换。这些exchange是默认创建的,但现在我们不太可能需要使用它们。
The default exchange
在本教程的前几部分中,我们对交换一无所知,但仍然能够将消息发送到队列。这是因为我们使用的是默认交换,我们通过空字符串(“”)来识别它。
交换参数是交换的名称。空字符串表示默认或匿名交换:消息被发送到参数routing_key指定的队列中去。
Temporary queues 临时队列
在前两个实例中,我们使用有指定名称的队列(如:hello)。对我们来说,能够命名队列是至关重要的——我们需要将Consumer指向相同的队列。当您想要在生产者和消费者之间共享队列时,给队列命名是很重要的。
但在这个实例中,命名队列的情况却并非如此。
我们想要听到所有日志信息,而不仅仅是他们的子集。我们也只对当前flowing messages感兴趣,而不是旧消息。要解决这两个问题,我们需要两步。
第一步,当我们连接RabbbitMQ时,我们需要一个新的空队列。我们可以创建一个随机名称的队列或让server选择一个随机队列名称。这个可以通过不在queue_declare()中设置参数queue来实现:
result = channel.queue_declare()
result.method.queue包含了一个随机队列名。比方说“amq.gen-JzTY20BRgKO-HjmUJj0wLg”这样的类似的名称。
第二步,一旦Consumer不和该随机名称的队列连接了,这个队列就可以删除。通过设置exclusive参数实现。
result = channel.queue_declare(exclusive=True)
Bindings
我们已经创建了一个类型为fanout的exchange和一个队列。现在我们需要告诉exchange将消息发送到我们的队列中去。exchange和queue之间的联系被称为binding绑定。
channel.queue_bind(exchange=\'logs\',
queue=result.method.queue #result.method.queue中生成一个随机名的queue )
从现在起,名为logs的exchange将会向我们的队列中添加消息。
Listing bindings
我们可以通过rabbitmqctl工具列出现有的bindings。
rabbitmqctl list_bindings
综上所述:
Producer程序负责发送日志信息,大部分实现和前面两个实例中的Producer没有大的区别。最重要的变化是,我们现在想要把消息发布到名为logs,类型为fanout的exchange中去,而不是默认的exchange。我们需要在发送时提供一个routing_key,但是它的值被忽略了。
下面是Producer的代码,emit_log.py:
1 import pika 2 import sys 3 4 #step1:create connection with RabbitMQ server 5 connection =pika.BlockingConnection(pika.ConnectionParameters(host=\'localhost\'以上是关于RabbitMQ 消息队列的主要内容,如果未能解决你的问题,请参考以下文章RabbitMQ学习笔记五:RabbitMQ之优先级消息队列
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码