RabbitMQ之发布订阅
Posted zengnansheng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ之发布订阅相关的知识,希望对你有一定的参考价值。
将同一个队列的消息发送给多个消费者的模式就是“发布/订阅”,
这种模式的基础是将消息广播到所有的接收器上。
实际上,RabbitMQ中消息传递模型的核心思想是:
生产者不直接发送消息到队列。
实际的运行环境中,生产者是不知道消息会发送到哪个队列上,
她只会将消息发送到一个交换器,
交换器也像一个生产线,她一边接收生产者发来的消息,另外一边则根据交换规则,将消息放到队列中。
交换器必须知道她所接收的消息是什么?它应该被放到哪个队列中?它应该被添加到多个队列吗?还是应该丢弃?
这些规则都是按照交换器的规则来确定的。
交换器的规则有:
direct (直连)
topic (主题)
headers (标题)
fanout (分发)也有翻译为扇出的
我们将使用【fanout】类型创建一个名称为myexchange的交换器
channel.exchangeDeclare("myexchange", "fanout");
分发交换器很简单,你通过名称也能想到,她是广播所有的消息
通过rabbitmqctl list_exchanges指令可以列出服务器上所有可用的交换器列表
这个列表里面所有以【amq.*】开头的交换器都是RabbitMQ默认创建的。在生产环境中,可以自己定义。
在之前的实例中,我们知道,发送消息到队列时根本没有使用交换器,但是消息也能发送到队列。
这是因为RabbitMQ选择了一个空“”字符串的默认交换器。
来看看我们之前的代码:
channel.basicPublish("", "myqueue", null, message.getBytes());
第一个参数就是交换器的名称。如果输入“”空字符串,表示使用默认的匿名交换器。
第二个参数是【routingKey】路由线索
匿名交换器规则:
发送到routingKey名称对应的队列
现在,我们可以发送消息到交换器中
channel.basicPublish( "myexchange", "", null, message.getBytes());
如果要在生产者和消费者之间创建一个新的队列,又不想使用原来的队列,临时队列就是为这个场景而生的:
1.首先,每当我们连接到RabbitMQ,我们需要一个新的空队列,我们可以用一个随机名称来创建,或者说让服务器选择一个随机队列名称给我们。
2.一旦我们断开消费者,队列应该立即被删除。
提供queuedeclare()为我们创建一个非持久化、独立、自动删除的随机队列名称
String queueName = channel.queueDeclare().getQueue();
如果我们已经创建了一个分发交换器和队列,现在我们就可以就将我们的队列跟交换器进行绑定。
channel.queueBind(queueName, exchangeName, "");
执行完这段代码后,交换器会将消息添加到该队列中
执行rabbitmqctl list_bindings 命令可以查看绑定列表
以下是代码:
生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static final String EXCHANGE_NAME = "myexchange"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分发消息 for (int i = 1; i <= 5; i++) { String message = "Hello World! " + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" p Sent ‘" + message + "‘"); } channel.close(); connection.close(); } }
消费者1
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer1 { private static final String EXCHANGE_NAME = "myexchange"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" Consumer1 Waiting for messages..."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" Consumer1 Received ‘" + message + "‘"); } }; channel.basicConsume(queueName, true, consumer); } }
消费者2
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer2 { private static final String EXCHANGE_NAME = "myexchange"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" Consumer2 Waiting for messages..."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" Consumer2 Received ‘" + message + "‘"); } }; channel.basicConsume(queueName, true, consumer); } }
运行Consumer1 打印
Consumer1 Waiting for messages...
运行Consumer2 打印
Consumer2 Waiting for messages...
运行Producer 打印
p Sent ‘Hello World! 1‘
p Sent ‘Hello World! 2‘
p Sent ‘Hello World! 3‘
p Sent ‘Hello World! 4‘
p Sent ‘Hello World! 5‘
Consumer1监听到消息最终打印
Consumer1 Waiting for messages...
Consumer1 Received ‘Hello World! 1‘
Consumer1 Received ‘Hello World! 2‘
Consumer1 Received ‘Hello World! 3‘
Consumer1 Received ‘Hello World! 4‘
Consumer1 Received ‘Hello World! 5‘
Consumer2监听到消息最终打印
Consumer2 Waiting for messages...
Consumer2 Received ‘Hello World! 1‘
Consumer2 Received ‘Hello World! 2‘
Consumer2 Received ‘Hello World! 3‘
Consumer2 Received ‘Hello World! 4‘
Consumer2 Received ‘Hello World! 5‘
可以看到同一个消息被2个消费者接收
使用rabbitmqctl list_bindings命令可以看到两个临时队列的名称
以上是关于RabbitMQ之发布订阅的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)
RabbitMQ : 订阅者模式之路由模式 ( direct )