RabbitMQ-消息可靠性&延迟消息
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-消息可靠性&延迟消息相关的知识,希望对你有一定的参考价值。
- 一、MQ常见问题
- 二、消息可靠性
1、消息丢失可能发生的节点 2、生产者确认机制 3、消息持久化 4、消费者确认消息 5、失败重试机制 - 三、死信交换机
1、死信 2、死信交换机 3、TTL 4、死信交换机&TTL代码实现 - 四、延迟消息
1、延迟队列 2、应用场景 3、延迟队列插件安装 4、延迟队列代码实现 5、修改延时消息报异常的处理逻辑 - 补充:Docker安装RabbitMQ(挂载插件数据卷)
一、MQ常见问题
- ① 消息可靠性
确保发送的消息至少被消费一次;
- ② 延迟消息
实现消息的延迟投递;
- ③ 消息堆积
处理消息无法及时消费的问题;
- ④ 高可用
避免单点MQ故障导致整体不可用;
二、消息可靠性
1、消息丢失可能发生的节点
- ① 发送时丢失
Ⅰ 生产者发送的消息未送达exchange;
Ⅱ 消息到达exchange后未到达queue。
- ② MQ宕机,queue将消息丢失
- ③ consumer接收到消息后未消费就宕机
2、生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
- ①
publisher-confirm
,发送者确认
Ⅰ 消息成功投递到交换机,返回ack; - Ⅱ 消息未投递到交换机,返回nack。
- ②
publisher-return
,发送者回执
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id
,以区分不同消息,避免ack冲突。
- ① application.yml配置
Ⅰ publish-confirm-type
:开启publisher-confirm,这里支持两种类型:
simple:同步等待confirm结果,直到超时;
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback;
Ⅱ publish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback;
Ⅲ template.mandatory
:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。
spring:
rabbitmq:
publisher-confirm-type: correlated # 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns: true # 开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
template:
mandatory: true # 定义消息路由失败时的策略。true:则调用ReturnCallback;false:则直接丢弃消息。
- ② 给RabbitTemplate配置ReturnCallback
注意:每个RabbitTemplate只能配置一个ReturnCallback。
Spring的bean默认为单例,让CommonConfig实现ApplicationContextAware接口,就是为了在Spring准备好容器后给rabbitTemplate对象设置ReturnCallback。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware // Spring容器准备好后通知该类
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。
rabbitTemplate.setReturnCallback((message, replyCode, replyTest, exchange, routingKey) ->
// 记录日志
log.info("发送消息到队列失败,应答码,原因,交换机,路由键,消息",
replyCode, replyTest, exchange, routingKey, message.toString());
// 重发消息代码
);
另外,此处省略了重发消息的代码实现,具体可以根据业务需求编写。
- ③ 代码实现ConfirmCallback
Ⅰ 获取CorrelationData对象,设置全局唯一ID,区分不同消息;
Ⅱ 设置ConfirmCallback的函数实现,消息发送到MQ成功和异常的处理函数;
注意:此处省略了重发消息的代码实现,具体可以根据业务需求编写。
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException
// 路由键
String routingKey = "simple";
// 消息体
String message = "hello, spring amqp!";
// 消息ID,封装到
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 添加callback
correlationData.getFuture().addCallback(
result ->
// 判断结果
if (result.isAck())
// ack,消息发送成功
log.debug("消息发送到交换机成功,ID:", correlationData.getId());
else
// nack,消息发送失败
log.error("消息发送到交换机失败,ID:,原因", correlationData.getId(), result.getReason());
// 重发消息
,
ex ->
// 记录日志
log.error("消息发送异常,ID:,原因", correlationData.getId(), ex.getMessage());
// 重发消息
);
// 发送消息,此处记得添加correlationData
rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
- ④ 处理消息确认的情形
Ⅰ publisher-comfirm:
消息成功发送到exchange,返回ack;
消息发送失败,没有到达交换机,返回nack;
消息发送过程中出现异常,没有收到回执。
Ⅱ 消息成功发送到exchange,但没有路由到queue,调用ReturnCallback。
3、消息持久化
- ① 交换机持久化
@Bean
public DirectExchange simpleExchange()
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.exchange", true, false);
注意:其实直接new也是持久化,默认走如下方法:
- ② 消息队列持久化
@Bean
public Queue simpleQueue()
// 使用QueueBuilder构建队列,durable持久化
return QueueBuilder.durable("simple.queue").build();
注意:直接new Queue("simple.queue");也是持久化的,如下:
- ③ 消息体持久化
@Test
public void testDurableMsg()
// 路由键
String routingKey = "simple";
// 消息体
String message = "Hello, durable.";
// 消息持久化
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
.build();
// 发送消息
rabbitTemplate.convertAndSend("simple.exchange", routingKey, msg);
注意:直接发送普通消息,默认也是持久化的,如下:
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
4、消费者确认消息
- ① 消息确认模式
Ⅰ manual:手动ack,需要在业务代码结束后,调用api发送ack;
Ⅱ auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack;
Ⅲ none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。
- ② 配置文件配置
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack
5、失败重试机制
- ① 默认重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环。
- ② Spring的retry机制
Spring机制重试次数耗尽后,消息会被reject,丢弃。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费失败重试
initial-interval: 1000 # 初始失败等待时间为1秒
multiplier: 3 # 下次失败等待时间的倍数,下次等待时长last*multiplier
max-attempts: 3 # 最大重试次数
stateless: true # 无状态;false则为有状态。业务中包含事务则改为false。
- ③ 失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要由MessageRecoverer接口来处理。
Ⅰ RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject,丢弃消息,默认就是这种方式;
Ⅱ ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack,消息重新入队;
Ⅲ RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机。
覆盖原有策略实现:
Ⅰ 自定义异常消息交换机、队列、绑定:
@Bean
public DirectExchange errorMessageExchange()
return new DirectExchange("error.direct");
@Bean
public Queue errorQueue()
return new Queue("error.queue", true);
@Bean
public Binding errorBinding()
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
Ⅱ 覆盖原有策略实现:
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
三、死信交换机
1、死信
- ① 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue参数设置为false; - ② 消息是一个过期消息,
超时
无人消费; - ③ 要投递的队列消息
堆积满
了,最早的消息可能成为死信。
2、死信交换机
如果队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey。
3、TTL
TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信。
TTL的两种情形:
Ⅰ 消息所在的队列
设置了存活时间;
Ⅱ 消息本身
设置了存活时间。
4、死信交换机&TTL代码实现
- ① 使用注解的形式声明一组死信交换机、队列,并绑定,如下:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg)
log.info("接收到dl.queue的延迟消息:", msg);
- ② 给队列设置超时时间,在声明队列时配置x-message-ttl属性:
@Bean
public DirectExchange ttlExchange()
return new DirectExchange("ttl.direct");
@Bean
public Queue ttlQueue()
return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间为10s
.deadLetterExchange("dl.direct") // 指定死信交换机
.deadLetterRoutingKey("dl") // 指定死信RoutingKey
.build();
@Bean
public Binding ttlBinding()
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("tl");
- ③ 给TTL队列发送消息
@Test
public void testTTLMsg()
// 路由键
String routingKey = "tl";
// 消息体
String message = "Hello, TTL.";
// 消息持久化
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
.setExpiration("5000") // 5s
.build();
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", routingKey, msg);
注意:此处我们设置了queue的超时时间,以及msg的超时时间,最后MQ会以其中较短的时间来实现。
四、延迟消息
1、延迟队列
利用TTL结合死信交换机,实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
2、应用场景
- ① 延迟发送短信;
- ② 用户下单,如果用户在15 分钟内未支付,则自动取消;
- ③ 预约工作会议,20分钟后自动通知所有参会人员。
3、延迟队列插件安装
- ① TTL+死信队列
详见死信交换机章节内容。
- ② 延迟队列插件
插件安装指南:Scheduling Messages with RabbitMQ。
官方插件社区:Community Plugins — RabbitMQ。
插件下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub。
Ⅰ 下载插件
Ⅱ 将插件上传到数据卷目录/var/lib/docker/volumes/mq-plugins/_data
数据卷地址查看指令:docker volume inspect mq-plugins
Ⅲ 安装插件
进入容器内部:docker exec -it mq bash
启动插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4、延迟队列代码实现
- ① 基于@RabbitListener的实现
DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayQueue(String msg)
log.info("接收到delay.queue的延迟消息:", msg);
- ② 基于Java代码的实现
delayed() // 设置delay属性为true
@Bean
public DirectExchange delayedExchange()
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型及名称
.delayed() // 设置delay属性为true
.durable(true) // 持久化
.build();
@Bean
public Queue delayedQueue()
return new Queue("delay.queue");
@Bean
public Binding delayedBinding()
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
- ③ 向延迟队列发送消息
@Test
public void testDelayedMsg()
// 路由键
String routingKey = "delay";
// 消息体
String message = "Hello, Delay.";
// 消息延迟设置
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
.setHeader("x-delay", 10000)
.build();
// 发送消息
rabbitTemplate.convertAndSend("delay.direct", routingKey, msg);
5、修改延时消息报异常的处理逻辑
- ① ReturnCallback处理逻辑
添加延时时间属性值的判断,该属性大于0则是延迟消息,不报错误提示。
// 判断是否是延迟消息
if (message.getMessageProperties().getReceivedDelay() > 0)
// 是一个延迟消息,忽略这个错误提示
return;
- ② 测试
- ③ ReturnCallback完整代码
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware // Spring容器准备好后通知该类
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。
rabbitTemplate.setReturnCallback((message, replyCode, replyTest, exchange, routingKey) ->
// 判断是否是延迟消息
if (message.getMessageProperties().getReceivedDelay() > 0)
// 是一个延迟消息,忽略这个错误提示
return;
// 记录日志
log.info("发送消息到队列失败,应答码,原因,交换机,路由键,消息",
replyCode, replyTest, exchange, routingKey, message.toString());
// 重发消息代码
);
补充:Docker安装RabbitMQ(挂载插件数据卷)
- ① 拉取镜像
docker pull rabbitmq:3.8-management
- ② 运行容器
-e RABBITMQ_DEFAULT_USER=test
设置用户名为test;
-e RABBITMQ_DEFAULT_PASS=123321
设置密码为123456;
-v mq-plugins:/plugins
挂载数据卷;
--name mq
容器名mq;
--hostname mq1
主机名mq1;
-p 15672:15672
管理界面端口(此处前面的端口是我们设置的,后面的是需要被映射的,下同);
-p 5672:5672
MQ端口(内部使用);
docker run \\
-e RABBITMQ_DEFAULT_USER=test \\
-e RABBITMQ_DEFAULT_PASS=123456 \\
-v mq-plugins:/plugins \\
--name mq \\
--hostname mq1 \\
-p 15672:15672 \\
-p 5672:5672 \\
-d \\
rabbitmq:3.8-management
五、结尾
以上即为RabbitMQ-消息可靠性&延迟消息的全部内容
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码
系列文章目录
RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式
前言
本文通过实战代码,Spring整合RabbitMQ,项目分二个模块,consumer和produle。
提示:以下是本篇文章正文内容,下面案例可供参考
一、项目代码
1.生产者
1.项目架构图:
代码如下(示例):
2.pom.xml依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sky</groupId>
<artifactId>spring-rabbitmq-produle</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.spring-rabbitmq-producer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为:"",路由键为队列的名称
-->
<!--
id:bean的名称
name:queue的名称
auto-declare:自动创建
auto-delete:自动删除。 最后一个消费者和该队列断开连接后,自动删除队列
durable:是否持久化
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1" />
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 定义队列-->
<rabbit:queue id="spring_direct_queue" name="spring_direct_queue" auto-declare="true"/>
<!--
定义 Routing 路由模式 交互机
-->
<rabbit:direct-exchange name="spring_direct_exchange" >
<rabbit:bindings>
<!--direct 类型的交换机绑定队列 key :路由key queue:队列名称-->
<rabbit:binding queue="spring_direct_queue" key="direct"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_one" name="spring_topic_queue_one" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_two" name="spring_topic_queue_two" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_three" name="spring_topic_queue_three" auto-declare="true"/>
<!--
声明 topic 类型的交换机
-->
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="one.*" queue="spring_topic_queue_one"/>
<rabbit:binding pattern="two.#" queue="spring_topic_queue_two"/>
<rabbit:binding pattern="three.#" queue="spring_topic_queue_three"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
4.rabbitmq.properties:
rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring
说明:这里免费提供rabbitmq连接方式给大家使用学习
5.ProducerTest:
package com.sky.springrabbitmqprodule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 简单模式发消息
*/
@Test
public void testHelloWorld(){
rabbitTemplate.convertAndSend("spring_queue","hello world spring....");
}
/**
* 广播模式发消息
*/
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");
}
/**
* 路由模式发消息
*/
@Test
public void testDirect(){
rabbitTemplate.convertAndSend("spring_direct_exchange","direct","spring Direct....");
}
/**
* 通配符模式发消息
*/
@Test
public void testTopics(){
rabbitTemplate.convertAndSend("spring_topic_exchange","one.onekey","spring topic one....");
rabbitTemplate.convertAndSend("spring_topic_exchange","two.twokey.topic","spring topic two....");
rabbitTemplate.convertAndSend("spring_topic_exchange","three.threekey.topic","spring topic three....");
}
}
2.消费者
1.项目架构图
代码如下(示例):
2.pom.xml依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sky</groupId>
<artifactId>spring-rabbitmq-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
以上是关于RabbitMQ-消息可靠性&延迟消息的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码
RRabbitMQ05_消息可靠性投递ACK限流处理TTL队列死信交换机延迟队列