SpringCloud系列[MQ 篇] - 详述 RabbitMQ 五种模型的结构及具体实现
Posted Fug_Lee
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud系列[MQ 篇] - 详述 RabbitMQ 五种模型的结构及具体实现相关的知识,希望对你有一定的参考价值。
各种模型的具体实现的前提是你的虚拟机已经部署了 RabbitMQ 并启动.具体部署步骤请看此文章:SpringCloud系列(十)[MQ 篇] - RabbitMQ 初步学习及详细部署步骤.
RabbitMQ 五种模型
- 🐰 BasicQueue 简单队列模型
- 🐰🐰 WorkQueue 工作消息队列模型
- 🐰🐰🐰 Fanout Exchange 扇形交换机(广播模型)
- 🐰🐰🐰🐰 Direct Exchange 直连交换机(路由模型)
- 🐰🐰🐰🐰🐰 Topic Exchange 主题交换机(主题模型)
前提: 启动 RabbitMQ;
如果没有启动, 则执行指令 docker ps -a
找到容器的 ID, 然后执行 docker start [ID]
启动.
启动成功后进行登录, 如下图所示:
🐰 BasicQueue 简单队列模型
模型:
前提: 新建一个 simple.queue 队列;
消息发送:
步骤一: 在父工程 pom.xml 中引入 AMQP 依赖 (注意这里面已经包含了 RabbitMQ);
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
步骤二: 配置 RabbitMQ 地址, 在发布者(Publisher) 服务的 yml 文件中添加配置:
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: 172.16.**.** # rabbitMQ 的 ip 地址, 也就是你的虚拟机 ip 地址
port: 5672 # 端口号
username: myRabbitMQ
password: 123456 # 这里的用户名和密码是 docker 运行 RabbitMQ 容器时自己设定的, 具体指令看上篇文章
virtual-host: /
步骤三: 在 Publisher 服务中编写测试类, 并利用 RabbitTemplate 实现消息的发送;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue()
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
消息接收:
步骤一: 配置 RabbitMQ 地址, 在消费者(Consumer) 服务的 yml 文件中添加配置:
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: 172.16.**.*** # rabbitMQ 的 ip 地址
port: 5672 # 端口
username: myRabbitMQ
password: 123456
virtual-host: /
listener:
direct:
prefetch: 1
步骤二: 在 consumer 服务中新建 listener 包, 并新建 SpringRabbitListener 类用来接收消息;
@Component
public class SpringRabbitListener
@RabbitListener(queues="simple.queue")
public void listenSimpleQueue(String msg)
System.out.println("消费者接收到 simple.queue 的消息: [" + msg + "]");
测试:
启动 consumer 服务, 运行 publisher 中的测试代码, 进行消息的发送和接收;
🐰🐰 WorkQueue 工作消息队列模型
WorkQueue 又称 TaskQueue 任务模型, 也就是说让多个消费者绑定到一个队列来共同消费队列中的消息; 主要针对的问题是当消息处理比较耗时的时候, 可能生产消息的速度远远大于消息的消费速度, 这样队列中会堆积越来越多的消息无法及时处理, 而此模型多个消费者共同处理消息, 速度就能大大提高.
模型:
前提: 新建一个 work.queue 队列;
消息发送:
因为要解决的就是大量消息的堆积问题, 因此这里我们循环发送信息;
@Test
public void testSendMessage2WorkQueue() throws InterruptedException
String queueName = "work.queue";
String message = "hello, message --- ";
for (int i = 0; i <= 50; i++)
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
消息接收:
消息的接收也要模拟两个消费者共同绑定一个队列, 于是在 consumer 的 SpringRabbitListener 中添加了两个方法:
@RabbitListener(queues="work.queue")
public void listenSimpleQueue1(String msg) throws InterruptedException
System.out.println("消费者11111接收到 work.queue 的消息: [" + msg + "]" + LocalTime.now());
Thread.sleep(20);
@RabbitListener(queues="work.queue")
public void listenSimpleQueue2(String msg) throws InterruptedException
System.err.println("消费者22222接收到 work.queue 的消息: [" + msg + "]" + LocalTime.now());
Thread.sleep(200);
测试:
通过上面的结果可以看得出来, consumer1 很快就完成了自己的消息, 而 consumer2 在缓慢的处理自己的消息; 也就是说 consumer1 和 consumer2 都是处理相同数量的消息, 但是如果考虑到消费者的处理能力, 这样显然是有问题的, 因此最好的方式就是"能者多劳", 处理能力强的 consumer 处理的数量多才对, 因此需要在 consumer 的 yml 文件中需要添加以下配置, 目的是每个 consumer 每次只能获取一个消息, 只有处理完成了才能获取下一个消息.
测试结果如下:
很明显, consumer1 的处理能力比较强, 因此处理的消息也就比 consumer2 多得多.
总之:
- 多个消费者绑定到一个队列上, 同一条消息只会被一个消费者处理;
- 通过在 yml 中设置 prefetch 来控制消费者预取的消息数量.
🐰🐰🐰 Fanout Exchange 扇形交换机(广播模型)
消息发送:
在广播模式下, 消息的发送流程如下:
- 可以有多个队列, 但是多个队列都绑定到了 Exchange 交换机上;
- 发布者发送的消息只能发送到交换机, 交换机来决定要发给哪个队列, 发布者无法决定;
- 交换机将消息发送到绑定过的所有队列, 订阅这些队列的消费者就可以拿到消息;
- 交换机不能缓存消息, 当路由失败时, 消息也就丢失了.
步骤一: 声明队列和交换机
在 consumer 的 config 包中新建 FanoutConfig 类, 声明两个队列和一个交换机;
@Configuration
public class FanoutConfig
/**
* 声明交换机
*/
@Bean
public FanoutExchange fanoutExchange()
return new FanoutExchange("myrabbitmq.fanout");
/**
* 声明第一个队列
*/
@Bean
public Queue fanoutQueue1()
return new Queue("fanout.queue1");
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange)
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
/**
* 声明第二个队列
*/
@Bean
public Queue fanoutQueue2()
return new Queue("fanout.queue2");
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange)
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
步骤二: 消息发送
在 publisher 测试类中添加消息发送的测试方法;
@Test
public void testFanoutExchange()
// 交换机名称
String exchangeName = "myrabbitmq.fanout";
String message = "您好, 我是 FanoutExchange";
// FanoutExchange 路由键设置为空即可
rabbitTemplate.convertAndSend(exchangeName,"",message);
步骤三: 消息接收
/**
* FanoutExchange 消息接收
*/
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg)
System.out.println("consumer1 接收到 Fanout 的消息: [" + msg + "]");
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg)
System.out.println("consumer2 接收到 Fanout 的消息: [" + msg + "]");
测试:
在 RabbitMQ 管理网站上也可以看到我们的交换机和队列信息:
🐰🐰🐰🐰 Direct Exchange 直连交换机(路由模型)
在广播模式下我们可以知道一条消息可以被多个消费者订阅, 但是在一些场景下我们希望不同的消息被不同的队列消费, 这时候 Direct Exchange 就出现了.
**消息发送:**
在 Direct Exchange 模式下:
- 队列与交换机的绑定不再是任意绑定了, 而是需要指定一个 路由键 (RoutingKey);
- 发布者向 Exchange 发送消息时, 也必须指定消息的路由键;
- 交换机不再讲消息给每一个绑定的队列, 而是根据消息的路由键进行判断, 只有队列的路由键和消息的路由键完全一致才会接收到消息.
步骤一: 在 publisher 服务的测试类中添加消息发送测试;
/**
* DirectExchange
*/
@Test
public void testSendDirectExchange()
// 交换机名称
String exchangeName = "myrabbitmq.direct";
String message = "您好, 这里是 DirectExchange";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"routingkey1",message);
消息接收:
步骤一: 声明队列和交换机 (这次我们基于注解的形式来进行声明)
在 consumer 的 SpringRabbitListener 中添加两个消费者,同时基于注解来声明队列和交换机;
/**
* 1. 基于注解的方式声明队列和交换机
* 2. DirectExchange 消息接收
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "myrabbitmq.direct", type = ExchangeTypes.DIRECT),
key = "routingkey1","routingkey2" // 可以设置任意个路由键
))
public void listenDirectQueue1(String msg)
System.out.println("消费者接收到了 direct.queue1 的消息: [" + msg + "]");
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "myrabbitmq.direct", type = ExchangeTypes.DIRECT),
key = "routingkey3","routingkey4" // 可以设置任意个路由键
))
public void listenDirectQueue2(String msg)
System.out.println("消费者接收到了 direct.queue2 的消息: [" + msg + "]");
测试:
路由键为 routingkey1 时发送消息:
路由键为 routingkey3 时发送消息:
在 RabbitMQ 管理网站上也可以看到我们的交换机和队列信息:
绑定信息:
🐰🐰🐰🐰🐰 Topic Exchange 主题交换机(主题模型)
TopicExchange 类型的 Exchange 与 DirectExchange 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过TopicExchange 类型的 Exchange 可以让队列在绑定 Routing key 的同时使用通配符!
这里的 RoutingKey 一般都是由一个或者多个单词组成, 多个单词之间以 “.” 分割, 规则如下:
- #: 匹配一个或多个词, 如 topic.# 能够匹配 topic.aa.bb 或者 topic.aa;
- *: 匹配一个词, 只能匹配成 topic.aa 这种形式
消息发送:
/**
* TopicExchange
*/
@Test
public void testSendTopicExchange()
// 交换机名称
String exchangeName = "myrabbitmq.topic";
String message = "您好, 这里是 TopicExchange";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"AABBcc.key1",message);
消息接收:
/**
* 1. 基于注解的方式声明队列和交换机
* 2. TopicExchange 消息接收
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "myrabbitmq.topic", type = ExchangeTypes.TOPIC),
key = "#.key1" // 所有后缀是 key1 的都可以接收到
))
public void listenTopicQueue1(String msg)
System.out.println("消费者接收到了 topic.queue1 的消息: [" + msg + "]");
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "myrabbitmq.topic", type = ExchangeTypes.TOPIC),
key = "key2.*" // 类似于 key2.aa / key2.bb 这样的可以接收到
))
public void listenTopicQueue2(String msg)
System.out.println("消费者接收到了 topic.queue2 的消息: [" + msg + "]");
测试:
在 RabbitMQ 管理网站上也可以看到我们的交换机和队列信息:
以上是关于SpringCloud系列[MQ 篇] - 详述 RabbitMQ 五种模型的结构及具体实现的主要内容,如果未能解决你的问题,请参考以下文章
重学SpringCloud系列八之微服务网关安全认证-JWT篇
Java面试题超详细讲解微服务系列之十六SpringCloud篇
SpringCloud Alibaba系列Dubbo高级特性篇
SpringCloud Alibaba系列Dubbo基础入门篇
ESP 保姆级教程 预告疯狂Node.js服务器篇 ——案例:ESP8266 + MQ系列 + NodeJs本地服务 + 文件存储数据