SpringAMQP处理RabbitMQ的常用五种消息模型
Posted ITLepeng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringAMQP处理RabbitMQ的常用五种消息模型相关的知识,希望对你有一定的参考价值。
1.AMQP
AMQP(Advanced Message Queuing Protocol)AMQP,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
2.配置环境
引入依赖:
<dependencies>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--spring amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
在application.yml中添加RabbitMQ配置
spring:
rabbitmq:
host: 192.168.136.160
port: 5672
username: lepeng
password: lepeng
virtual-host: lepeng
3.SimpleQueue
简单模型,一个生产者,一个队列,一个消费者
消息发送:
@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpPublusherTest {
/**
* 发消息:
* 1、注入RabbitTemplate对象
* 2、调用convertAndSend方法发送消息
*/
@Autowired
private RabbitTemplate template;
//基础消息队列
@Test
public void testSendBaseMessage() {
String queueName = "simple.queue"; //队列名称
String message = "宝 你今天输液了嘛";
template.convertAndSend(queueName,message); //队列名,消息
}
}
发送完成后,可以去RabbitMQ中simple.queue里查看发送的消息<注意先不要启动消费者,消费者如果接收则查看不到了>
消息接收:
消费者需要保持一个开启状态接收信息,所以需要配置监听消息队列的监听器,注意监听器需要与启动类放在同一目录
@Component
public class MessageListener {
/**
* 业务方法:
* 没有返回值,有参数(生产者发送的消息)
* 参数类型和发送的消息参数类型一致
*/
@RabbitListener(queues = "simple.queue")
public void recBaseMessage1(String message) {
System.out.println("获取基础消息:"+message);
//Thread.sleep(5);
}
}
启动消费者类即可!!!
4.WorkQueue
循环模拟大量信息:
@Test
public void testSendWorkMessage() {
for (int i = 0; i < 10; i++) {
String queueName = "simple.queue"; //队列名称
String message = "宝 你今天输液了嘛";
template.convertAndSend(queueName,message); //队列名,消息
}
}
配置两个消费者,也就是监听器
@RabbitListener(queues = "simple.queue")
public void recBaseMessage1(String message) {
System.out.println("消费者1获取基础消息:"+message);
}
@RabbitListener(queues = "simple.queue")
public void recBaseMessage2(String message) {
System.out.println("消费者2获取基础消息:"+message);
}
结果:
但这样消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。
我们可以配置一个能者多劳机制。
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
5.发布/订阅
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- C:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
6.Fanout(广播)
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
广播模式主要实现了消息能发送给多个用户,因为队列中消息一次只能发送一个人,所以需要使用交换机去选择队列,发送给不同用户。
发送消息
//交换机
@Test
public void testFanoutMessage() {
String exchange = "lepengExchange";
String message = "输的什么液、想你的夜";
//交换机名称,null,消息
template.convertAndSend(exchange,"",message);
}
接收消息
@RabbitListener(
bindings = @QueueBinding(
value = @Queue( //绑定队列
name = "fanout.queue1"
),
exchange = @Exchange(
name = "lepengExchange", //绑定交换机
type = ExchangeTypes.FANOUT
)
)
)
public void recFanoutMessage(String message) throws InterruptedException {
System.out.println("消费1-获取基础消息:"+message);
Thread.sleep(200);
}
@RabbitListener(
bindings=@QueueBinding(
value=@Queue( //绑定队列
name = "fanout.quque2"
),
exchange = @Exchange( //绑定交换机
name = "lepengExchange",
type = ExchangeTypes.FANOUT //交换机类型
)
)
)
public void recFanoutMessage2(String message) throws InterruptedException {
System.out.println("消费2-获取基础消息:"+message);
Thread.sleep(200);
}
7.Direct
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
发送消息
//发送Direct模式的消息
@Test
public void testDirectMessage() {
String exchange = "exchange-direct";
String routingKey = "fan" ; //路由键
String message = "你看这牢饭又香又甜";
//交换机名称,路由键,消息
template.convertAndSend(exchange,routingKey,message);
}
接收消息
//监听Direct消息
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "direct.queue1"
),
exchange = @Exchange(
name = "exchange-direct",
type = ExchangeTypes.DIRECT
),
key = {"fan"}
)
)
public void recDirectMessage(String message) throws InterruptedException {
System.out.println("消费1-获取基础消息:"+message);
Thread.sleep(200);
}
8.Topic
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
解释:
- 红色Queue:绑定的是
usa.#
,因此凡是以usa.
开头的routing key
都会被匹配到 - 黄色Queue:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配
发送消息:
//发送topic类型的消息
@Test
public void testTopicMessage() {
String exchange = "exchange-topic";
String routingKey = "china.news";
String message = "吴签刑拘";
// Map message = new HashMap<>();
// message.put("id","123456");
// message.put("name","lepeng");
template.convertAndSend(exchange,routingKey,message);
}
接收消息:
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "topic-queue1"
),
exchange = @Exchange(
name = "exchange-topic",
type = ExchangeTypes.TOPIC
),
key = "*.news"
)
)
//监听Topic类型消息
public void recTopicMessage(String message) throws InterruptedException {
System.out.println("消费1:"+message);
Thread.sleep(200);
}
注意:将消费者停止,去RabbitMQ的查看数据,可以看到我们传入的消息
经过base64解码我们可以看到
出现这种原因是因为:
Spring会把你发送的消息序列化,后进行一个base64的加密后放到队列当中。
默认情况下Spring采用的序列化方式是JDK序列化。JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
9.配置配置JSON转换器
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
并在启动类配置json转换的bean
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
测试:
监听器中修改参数为Map测试
10.源码
以上是关于SpringAMQP处理RabbitMQ的常用五种消息模型的主要内容,如果未能解决你的问题,请参考以下文章
SpringAMQP整合RabbitMQ-五种工作模式Demo