RabbitMQ:消费者ACK机制生产者消息确认
Posted 呜昂王0521
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ:消费者ACK机制生产者消息确认相关的知识,希望对你有一定的参考价值。
文章目录
基础案例环境搭建:
基础环境搭建(手把手教你环境搭建和五种工作模式):https://blog.csdn.net/m0_48325361/article/details/123174843?spm=1001.2014.3001.5502
环境:
队列:
交换机:
交换机和队列进行绑定:
1. 生产者发送消息确认
如果保证消息的可靠性?需要解决如下问题
问题1:生产者能百分之百将消息发送给消息队列!
- 两种意外情况:
- 第一,消费者发送消息给MQ失败,消息丢失;
- 第二,交换机路由到队列失败,路由键写错;
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
-
confirm 确认模式
-
return 退回模式
rabbitmq 整个消息投递的路径为:
-
消息从生产者(producer)发送消息到交换机(exchange),不论是否成功,都会执行一个确认回调方法confirmCallback 。
-
消息从交换机(exchange)到消息队列( queue )投递失败则会执行一个返回回调方法 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
1.1 confirm 确认模式
目标:演示消息确认模式效果
生产者发布消息确认模式特点,不论消息是否进入交换机均执行回调方法
实现过程:
-
在生产者配置文件中,开启生产者发布消息确认模式
# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调 spring.rabbitmq.publisher-confirm-type=correlated # 开启return退回模式 spring.rabbitmq.publisher-returns=true
-
编写生产者确认回调方法
//发送消息回调确认类,实现回调接口ConfirmCallback,重写其中confirm()方法 @Component public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback /** * 投递到交换机,不论投递成功还是失败都回调次方法 * @param correlationData 投递相关数据 * @param ack 是否投递到交换机 * @param cause 投递失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) if (ack) System.out.println("消息进入交换机成功"); else System.out.println("消息进入交换机失败 , 失败原因:" + cause);
-
在RabbitTemplate中,设置消息发布确认回调方法
@Component public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback @Autowired private RabbitTemplate rabbitTemplate; /** * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法 * 设置消息确认回调方法 * 设置消息回退回调方法 */ @PostConstruct public void initRabbitTemplate() //设置消息确认回调方法 rabbitTemplate.setConfirmCallback(this::confirm); /** * 投递到交换机,不论投递成功还是失败都回调次方法 * @param correlationData 投递相关数据 * @param ack 是否投递到交换机 * @param cause 投递失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) if (ack) System.out.println("消息进入交换机成功"); else System.out.println("消息进入交换机失败 , 失败原因:" + cause);
-
测试:
成功:
失败:
1.2 return 退回模式
消息回退模式特点:消息进入交换机,路由到队列过程中出现异常则执行回调方法
实现ReturnCallback接口
实现过程:
-
在配置文件中,开启生产者发布消息回退模式
# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调 spring.rabbitmq.publisher-returns=true
-
在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback
@Component public class RabbitConfirm implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback //..省略
-
并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法
/** * 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方法 * @param message 投递消息内容 * @param replyCode 返回错误状态码 * @param replyText 返回错误内容 * @param exchange 交换机名称 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) System.out.println("交换机路由至消息队列出错:>>>>>>>"); System.out.println("交换机:"+exchange); System.out.println("路由键:"+routingKey); System.out.println("错误状态码:"+replyCode); System.out.println("错误原因:"+replyText); System.out.println("发送消息内容:"+message.toString()); System.out.println("<<<<<<<<");
-
在RabbitTemplate中,设置消息发布回退回调方法
@PostConstruct public void initRabbitTemplate() //设置消息确认回调方法 rabbitTemplate.setConfirmCallback(this::confirm); //设置消息回退回调方法 rabbitTemplate.setReturnCallback(this::returnedMessage);
测试:
成功:失败:
源代码
@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
* 设置消息确认回调方法
* 设置消息回退回调方法
*/
@PostConstruct
public void initRabbitTemplate()
//设置消息确认回调方法
rabbitTemplate.setConfirmCallback(this::confirm);
//设置消息退回方法
rabbitTemplate.setReturnsCallback(this::returnedMessage);
/**
* 投递到交换机,不论投递成功还是失败都回调次方法
*
* @param correlationData 投递相关数据
* @param ack 是否投递到交换机
* @param cause 投递失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
if (ack)
System.out.println("消息进入交换机成功");
else
System.out.println("消息进入交换机失败 , 失败原因:" + cause);
/**
* 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessage方法
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage)
System.out.println("交换机路由至消息队列出错:>>>>>>>");
System.out.println("错误原因:" + returnedMessage.getReplyText());
System.out.println("发送消息内容:" + returnedMessage.getMessage());
System.out.println("错误状态码:" + returnedMessage.getReplyCode());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
System.out.println("交换机:" + returnedMessage.getExchange());
1.1.3 小结
确认模式:
- 设置publisher-confirms=“true” 开启 确认模式。
- 实现RabbitTemplate.ConfirmCallback接口,重写confirm方法
- 特点:不论消息是否成功投递至交换机,都回调confirm方法,只有在发送失败时需要写业务代码进行处理。
退回模式
- 设置publisher-returns=“true” 开启 退回模式。
- 实现RabbitTemplate.ReturnCallback接口,重写returnedMessage方法
- 特点:消息进入交换机后,只有当从exchange路由到queue失败,才触发回调returnedMessage方法;
2. 消费者签收消息(ACK)
问题2:如何保证消费者能百分百接收到请求,且业务执行过程中还不能出错!
ack指 Acknowledge,拥有确认的含义,是消费端收到消息的一种确认机制;
消息确认的三种类型:
-
自动确认:acknowledge=“none”
-
手动确认:acknowledge=“manual”
-
根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck()
,手动签收,如果出现异常,则调用channel.basicNack()
方法,让其自动重新发送消息。
2.1 代码实现
目标:演示消费者手动确认效果
- 1.在消费者配置文件中开启ack机制
- 2.在@RabbitListener消费者监听器方法中加入Message和Channel参数
实现过程:
在消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现ChannelAwareMessageListener接口
#rabbitmq启动ack机制,手动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
修改消费者监听器方法
测试成功案例:
发送请求:
消费者控制台打印:
测试失败案例:
修改topic_queue1队列的业务逻辑,让其抛出异常
在可视化界面也可以看到消息一直在队列中
- 如果想手动清楚队列的消息
点击队列
源代码
@RabbitListener(queues = "topic_queue2")
public void topic2Ack(String msg, Channel channel, Message message) throws IOException
System.out.println("=====routingInfo====>" + msg);
/**
* 手动拒绝签收
* 参数1:当前消息的投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//ack方式二
@RabbitListener(queues = "topic_queue1")
public void routingAck(String msg, Channel channel, Message message) throws Exception
System.out.println("=====routingAck====>" + msg);
try
int i = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("receiver success");
catch (Exception e)
System.out.println("业务逻辑产生异常" + e.getMessage());
/**
* 手动拒绝签收
* 参数1:当前消息的投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
* 参数3:是否重回队列,true为重回队列,false为不重回
*/
//消息重回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("receiver fail");
总结
后续还会更新TTL,死信队列,延迟队列等内容
以上是关于RabbitMQ:消费者ACK机制生产者消息确认的主要内容,如果未能解决你的问题,请参考以下文章