3.6.4 RabbitMQ教程四 - Publish/Subscribe

Posted infinitecodes

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3.6.4 RabbitMQ教程四 - Publish/Subscribe相关的知识,希望对你有一定的参考价值。

Publish/Subscribe发布/订阅

What This Tutorial Focuses On

In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we‘ll do something completely different -- we‘ll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".

之前的教程我们创建了一个工作队列。工作队列背后的假设是,每个任务都被精确地传递给一个工人。在这一部分中,我们会做一些完全不同的事情 - 我们将向多个消费端传送一条消息。这个模式被称为‘发布/订阅’。

To illustrate the pattern, we‘re going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.

为了说明这个模式,我们将构建一个简单的日志系统。这个日志系统由两个程序组成 - 第一个会发出日志消息,第二个会接收并打印他们。

In our logging system every running copy of the receiver program will get the messages. That way we‘ll be able to run one receiver and direct the logs to disk; and at the same time we‘ll be able to run another receiver and see the logs on the screen.

在我们的日志系统里,接收程序的每个运行中的副本都会接收到消息。如此一来我们就可以运行一个接收端并将日志定向到磁盘;同时我们可以运行另一个接收端并在屏幕上查看日志。

Essentially, published log messages are going to be broadcast to all the receivers.

实际上,发布的日志消息将被广播给所有的接收端。

Exchanges

In previous parts of the tutorial we sent and received messages to and from a queue. Now it‘s time to introduce the full messaging model in Rabbit.

前面的教程里我们从一个队列里发送和接收消息。现在是时候介绍RabbitMQ中的完整的消息模型了。

Let‘s quickly go over what we covered in the previous tutorials:

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

让我们快速过一遍我们在之前教程中学到的:

  • 一个生产端是一个发送消息的用户应用程序
  • 一个队列是一个存储消息的缓冲区
  • 一个消费者是一个接收消息用户应用程序

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn‘t even know if a message will be delivered to any queue at all.

RabbitMQ中的消息模型的核心思想就是生产端从来不直接将任何消息发送到队列中。实际上,通常生产者根本不知道消息是否将被传递到任何队列。

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

相反,生产端只能向exchange(交换)发送消息。交换是个很简单的东西。在这边它从生产端接收消息然后在另一边它把消息推给队列。交换必循准确的知道对它接收的消息要做什么。是将他添加到一个特定的队列?还是将它添加到许多队列?又或者是直接忽略他。其中的规则由exchange type(交换类型)定义。

                                                        技术图片

There are a few exchange types available: direct, topic, headers and fanout. We‘ll focus on the last one -- the fanout. Let‘s create an exchange of that type, and call it logs:

以下是几种可用的交换类型:direct, topic, headers, 和fanout. 我们聚焦于最后一个 - fanout。让我们来创建一个这种类型的交换,定义为logs:

channel.exchange_declare(exchange=‘logs‘,
                         exchange_type=‘fanout‘)

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that‘s exactly what we need for our logger.

fanout交换很简单。就像你可能从名字猜到的那样,它就是对所有它知道的队列guan广播它接收到的所有消息。而那正是我们的logger(日志记录器)所需要的。

Listing exchanges

To list the exchanges on the server you can run the ever useful rabbitmqctl:

要列出服务端上的交换你可以运行非常有用的rabbitmqctl

D:RabbitMQ Server
abbitmq_server-3.7.23sbin>rabbitmqctl list_exchanges

In this list there will be some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you‘ll need to use them at the moment.

这个列表中会有一些amq.*的交换以及默认的(未命名)交换。这些都是由默认创建的,但目前你不太可能需要用到他们。

The default exchange

In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").

在教程前面的部分我们对交换还啥都不知道,但仍可以将消息发送给队列。之所以能行是因为我们使用了默认的交换,用空字符串(‘’)定义的交换。

Recall how we published a message before:

回想一下我们之前是如何发布一条消息的:

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=message)

The exchange parameter is the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists.

交换参数就是该交换的名字。空字符串表示默认或者未命名的交换:消息以routing_key指定的名称路由到队列,如果routing_key存在的话。

Now, we can publish to our named exchange instead:

现在,我们可以(将消息)发布到我们命名的交换中了:

channel.basic_publish(exchange=‘logs‘,
                      routing_key=‘‘,
                      body=message)

Temporary queues

As you may remember previously we were using queues that had specific names (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

就像你也许之前记得的,我们使用的队列都有具体的名字(记得hello和task_queue吗?)。能够对一个队列命名对我们至关重要 - 我们需要将工作端指向同一个队列。当你想在生产端和消费端共享队列时,给队列取一个名字是很重要的。

But that‘s not the case for our logger. We want to hear about all log messages, not just a subset of them. We‘re also interested only in currently flowing messages not in the old ones. To solve that we need two things.

但我们的日志记录器不是这样的。我们想收到所有的日志消息,不仅仅只是其中的一个子集。我们也只对当前的信息流感兴趣,而不是对旧的信息流感兴趣。要解决这个我们需要两个东西

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do it we could create a queue with a random name, or, even better - let the server choose a random queue name for us. We can do this by supplying empty queue parameter to queue_declare:

首先,无论什么时候我们连接至Rabbit时我们都需要一个新的,空的队列。为此,我们可以创建一个具有随机名称的队列,或者,甚至更好 - 让服务器为我们选择一个随机的名字。我们可以通过给queue的参数queue_declare提供一个空值来实现它:

result = channel.queue_declare(queue=‘‘)

At this point result.method.queue contains a random queue name. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.

此时result.method.queue包含一个随机队列名。比如它也许看起来是这样amq.gen-JzTY20BRgKO-HjmUJj0wLg

Secondly, once the consumer connection is closed, the queue should be deleted. There‘s an exclusive flag for that:

其次,一旦消费端连接关闭,队列就会被删除掉。有一个exclusive的标记来标记它

result = channel.queue_declare(queue=‘‘, exclusive=True)

You can learn more about the exclusive flag and other queue properties in the guide on queues.

你可以在指南中queues的部分了解更多关于exclusive标记以及其他队列属性

Bindings

                                                        技术图片

We‘ve already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.

我们已经创建了一个fanout类型的交换和一个队列。现在我们需要告诉该交换发送消息到我们的队列中。存在于该交换和该队列之间的这种关系被称为binding

channel.queue_bind(exchange=‘logs‘,
                   queue=result.method.queue)

From now on the logs exchange will append messages to our queue.

从现在起日志交换会将消息添加至我们的队列。

Listing bindings

You can list existing bindings using, you guessed it,

你可以列出使用中的现有bindings

D:RabbitMQ Server
abbitmq_server-3.7.23sbin>rabbitmqctl list_bindings

Putting it all together

                                                     技术图片

The producer program, which emits log messages, doesn‘t look much different from the previous tutorial. The most important change is that we now want to publish messages to our logs exchange instead of the nameless one. We need to supply a routing_key when sending, but its value is ignored for fanout exchanges.

生产程序,发送日志消息,跟之前教程展示的差不多。最大的区别是现在我们想要发布消息到我们的logs交换而不是没有名字的交换。发送时我们需要提供一个routing_key,但对于一个fanout类型的交换,routing_key的值可以被忽略

emit_log.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘, exchange_type=‘fanout‘)

message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange=‘logs‘, routing_key=‘‘, body=message)
print(" [x] Sent %r" % message)
connection.close()

As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.

如你所见,建立链接后我们声明了交换。这一步是必要的,因为发布消息到一个不存在的交换是被禁止的。

The messages will be lost if no queue is bound to the exchange yet, but that‘s okay for us; if no consumer is listening yet we can safely discard the message.

如果还没有队列绑定到交换那么消息将会丢失,但对我们来说还好;如果还没有消费端收听我们可以安全地丢弃该条消息。

receive_logs.py

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘, exchange_type=‘fanout‘)

result = channel.queue_declare(queue=‘‘, exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange=‘logs‘, queue=queue_name)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

We‘re done. If you want to save logs to a file, just open a console and type:

完成。如果你想将日志保存到文件,就打开一个控制台并输入:

python receive_logs.py > logs_from_rabbit.log

If you wish to see the logs on your screen, spawn a new terminal and run:

如果你希望在你的屏幕上看到日志,生成一个新的终端并运行:

python receive_logs.py

And of course, to emit logs type:

当然,要发送日志就输入:

python emit_log.py

Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two receive_logs.py programs running you should see something like:

使用rabbitmqctl_list_bindings你可以验证代码是否创建了我们想要bindings和queues。运行两个receive_logs.py程序你应该可以看到以下内容:

D:RabbitMQ Server
abbitmq_server-3.7.23sbin>rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-GskokrqP-01WZhxyRmQnLQ  queue   amq.gen-GskokrqP-01WZhxyRmQnLQ  []
        exchange        amq.gen-5e4PGtwbLcztZTPgQ5gpPw  queue   amq.gen-5e4PGtwbLcztZTPgQ5gpPw  []
        exchange        amq.gen-OLz5701qsSsS9aBkIiXTCg  queue   amq.gen-OLz5701qsSsS9aBkIiXTCg  []
        exchange        task_queue      queue   task_queue      []
        exchange        amq.gen-9C_cSdFbqI9NwUEF27Zzfw  queue   amq.gen-9C_cSdFbqI9NwUEF27Zzfw  []
        exchange        amq.gen-NpSv46ZT0muUHLtGfGM6rQ  queue   amq.gen-NpSv46ZT0muUHLtGfGM6rQ  []
logs    exchange        amq.gen-5e4PGtwbLcztZTPgQ5gpPw  queue   amq.gen-5e4PGtwbLcztZTPgQ5gpPw  []
logs    exchange        amq.gen-9C_cSdFbqI9NwUEF27Zzfw  queue   amq.gen-9C_cSdFbqI9NwUEF27Zzfw  []
logs    exchange        amq.gen-GskokrqP-01WZhxyRmQnLQ  queue   amq.gen-GskokrqP-01WZhxyRmQnLQ  []
logs    exchange        amq.gen-NpSv46ZT0muUHLtGfGM6rQ  queue   amq.gen-NpSv46ZT0muUHLtGfGM6rQ  []
logs    exchange        amq.gen-OLz5701qsSsS9aBkIiXTCg  queue   amq.gen-OLz5701qsSsS9aBkIiXTCg  []

The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that‘s exactly what we intended.

对结果的解释很直白:来自名为‘logs’交换的数据发送到了由服务器端指定了名字的两个队列。并且这正是我们想要的。

以上是关于3.6.4 RabbitMQ教程四 - Publish/Subscribe的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ官方中文入门教程PHP版

RabbitMQ入门教程:工作队列(Work Queues)

RabbitMQ的transactionconfirmack三个概念的解释

RabbitMQ学习:远程结果调用

#yyds干货盘点#「MQ」RabbitMQ的基本概念介绍,通俗易懂!

CentOS Stream 9下RabbitMQ安装教程(最新RabbitMQ安装教程)