RRabbitMQ05_消息可靠性投递ACK限流处理TTL队列死信交换机延迟队列
Posted 所得皆惊喜
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RRabbitMQ05_消息可靠性投递ACK限流处理TTL队列死信交换机延迟队列相关的知识,希望对你有一定的参考价值。
文章目录
- ①. 消息可靠性投递 - 生产端
- ②. Consumer ACK - 消费端
- ③. 限流处理limit - 消费端
- ④. TTL过期队列 - 生产端
- ⑤. 死信队列 - 交换机
- ⑥. 延迟队列 - ttl+死信
①. 消息可靠性投递 - 生产端
- ①. 在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式
- confirm确认模式:消息从producer到exchange则会返回一个confirmCallback
- return退回模式:消息从exchange到queue投递失败则会返回一个returnCallback
- 我们将利用这两个callback控制消息的可靠性投递
- ②. rabbitmq整个消息投递的路径为:
- ③. confirm生产端配置文件编写
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="$rabbitmq.host"
port="$rabbitmq.port"
username="$rabbitmq.username"
password="$rabbitmq.password"
virtual-host="$rabbitmq.virtual-host"
publisher-confirms="true"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--消息的可靠性投递(生产端)-->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<rabbit:direct-exchange id="test_exchange_confirm" name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
- ④. 发送消息
@Test
public void testConfirm()
/**
* 确认模式,步骤:
* 1. 确认模式开启:connectionFactory中publisher-confirms="true
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
*/
//2.定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
/**
* @param correlationData 相关配置信息
* @param ack exchange交换机,是否成功收到了消息,true 成功 false 失败
* @param cause 失败原因,如果ack为false,那么cause就有值
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
if(ack)
// 接收成功
System.out.println("接收成功消息:"+cause);
else
// 接收失败
System.out.println("接收失败信息:"+cause);
// 做一些处理,让消息再次发送
System.out.println("confirm方法被执行了...");
);
//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm");
- ⑤. 回退模式ReturnCallBack代码编写
- 开启回退模式:publisher-returns=“true”
- 设置ReturnCallBack
- 设置Exchange处理消息的模式:
如果消息没有路由到Queue,则丢弃消息(默认)
如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
@Test
public void testReturn()
//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
//2.设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback()
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
//处理
);
//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm2", "message confirm....");
return 执行了....
10:44:56.551 [main] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@41e350f1 size = 1, maxSize = 32, parentContextCount = 0, hitCount = 2, missCount = 1]
(Body:'message confirm....' MessageProperties [headers=, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
312
NO_ROUTE
test_exchange_confirm
confirm2
②. Consumer ACK - 消费端
- ①. ack指Acknowledge,确认。 表示消费端收到消息后的确认方式,有三种确认方式:
- 自动确认:acknowledge=“none”,如果业务处理失败了,消息就丢失了
- 手动确认:acknowledge=“manual”,会等待业务处理成功,才去消费
- 根据异常情况确认:acknowledge=“auto”(这种方式使用麻烦,不作讲解)
-
②. 其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失
-
③. 如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
-
④. 消费端代码实现
- 配置文件中设置acknowledge=“manual”,手动签收
- 让监听器类实现ChannelAwareMessageListener接口
- 如果消息成功处理,则调用channel的basicAck()签收
- 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="$rabbitmq.host"
port="$rabbitmq.port"
username="$rabbitmq.username"
password="$rabbitmq.password"
virtual-host="$rabbitmq.virtual-host"/>
<bean id="ackListener" class="com.xiaozhi.rabbitmq.listener.AckListener"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
</rabbit:listener-container>
</beans>
public class AckListener implements ChannelAwareMessageListener
/**
* Consumer ACK机制:
* 1. 设置手动签收。acknowledge="manual"
* 2. 让监听器类实现ChannelAwareMessageListener接口
* 3. 如果消息成功处理,则调用channel的 basicAck()签收
* 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception
// 接收到的一个标签
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
//int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,false);
System.out.println("============");
catch (Exception e)
//e.printStackTrace();
//4.拒绝签收
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
//channel.basicReject(deliveryTag,true);
③. 限流处理limit - 消费端
-
①. 如下图所示:如果在A系统中需要维护相关的业务功能,可能需要将A系统的服务停止,那么这个时候消息的生产者还是一直会向MQ中发送待处理的消息,消费者此时服务已经关闭,导致大量的消息都会在MQ中累积。如果当A系统成功启动后,默认情况下消息的消费者会一次性将MQ中累积的大量的消息全部拉取到自己的服务,导致服务在短时间内会处理大量的业务,可能会导致系统服务的崩溃。 所以消费端限流是非常有必要的
-
②. 给mq中发送100条消息
-
③. 可以通过MQ中的 listener-container配置属性
perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉取下一条消息 -
④. 代码实现
- 编写QosListener监听类,保证当前的监听类消息处理机制是ACK为手动方式
- 在配置文件的listener-container配置属性中添加配置
perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
public class QosListener implements ChannelAwareMessageListener
@Override
public void onMessage(Message message, Channel channel) throws Exception
Thread.sleep(1000);
//1.获取消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
//3. 签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
④. TTL过期队列 - 生产端
- ①. TTL全称Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间
- ②. RabbitMQ管理控制台设置过期时间
- ③. 设置队列的过期时间
- 在消息的生产方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:
- 编写发送消息测试方法
<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--队列的过期时间-->
<entry key="x-message-ttl" value="30000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl">
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
/**
* TTL:过期时间
* 1. 队列统一过期
* 2. 消息的单独过期
*/
@Test
public void ttlTest()
for (int i = 0; i < 10; i++)
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.message","ttl message");
- ④. 设置单个消息的过期时间
- 如果设置了消息的过期时间,也设置了队列的过期时间,它以短的为准
- 队列过期后,会将队列所有消息全部移除
- 消息过期后,只有消息在队列顶端,才会去判断是否过期,如果过期,会移除掉
/**
* 如果设置了消息的过期时间,也设置了队列的过期时间,它以短的为准
* 队列过期后,会将队列所有消息全部移除
* 消息过期后,只有消息在队列顶端,才会去判断是否过期,如果过期,会移除掉
*/
@Test
public void ttlTest2()
for (int i = 0; i < 10; i++)
MessagePostProcessor postProcessor = new MessagePostProcessor()
@Override
public Message postProcessMessage(Message message) throws AmqpException
//1. 设置message的信息
message.getMessageProperties().setExpiration("5000");//消息的过期时间
return message;
;
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.message","ttl message",postProcessor);
- ⑤. TTL队列总结
- 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期
- 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期
- 如果两者都进行了设置,以时间短的为准
⑤. 死信队列 - 交换机
- ①. 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX
- ②. 消息成为死信的三种情况
- 队列消息长度到达限制(设置了某个消息最大为10,超过10后的消息为死信)
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
- 原队列存在消息过期设置,消息到达超时时间未被消费
-
③. 队列绑定死信交换机
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
-
④. 代码编码
<!--死信队列-->
<!--1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交换机名称-->
<!--<entry key="x-dead-letter-exchange" value="exchange_dlx"/>-->
<!--3.2 x-dead-letter-routing-key:发送给死信交换机的routing key (发送消息时指定的route key)-->
<entry key="x-dead-letter-routing-key" value="dlx.dead"></entry>
<!--(1). 设置队列的过期时间 x-message-ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
<!--(2). 设置队列的长度限制 x-max-length-->
<entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)-->
<rabbit:queue name="queue_dlx" id="queue_dlx">以上是关于RRabbitMQ05_消息可靠性投递ACK限流处理TTL队列死信交换机延迟队列的主要内容,如果未能解决你的问题,请参考以下文章
消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费