springboot集成Rabbitmq——消费者手动应答
Posted 替罪的羊
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot集成Rabbitmq——消费者手动应答相关的知识,希望对你有一定的参考价值。
消费者默认是自动确认的,这是及其不安全的。一般情况下我们都需要手动确认去保证数据的安全性。
消费者确认方式
1.basicAck
basicAck方法是肯定的交付,一般在该消息处理完后执行,该消息才会在队列里面被删除,不然会处于UnAcked的状态存在队列中。
其方法有两个参数:
参数1:消费消息的index
参数2: 是否批量确认消息,前提是在同一个channel里面,且是在该消息确认前没有被确认的消息才能批量确认。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
2.basicReject
basicReject是否定的交付,一般在消费消息时出现异常等的时候执行。可以将该消息丢弃或重排序去重新处理消息
其方法有两个参数:
参数1: 消费消息的index
参数2: 对异常消息的处理,true表示重排序,false表示丢弃
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
3.basicNack
basicNack也是否定的交付,其功能和basicReject是一样的。区别是basicNack比basicReject的功能更强一些。他能够一次丢弃多个或重排序多个消息
其方法有三个参数:
参数1:消费消息的index
参数2:是否批量否定多个消息,设为false就与basicReject功能一样,triue的前提也是在同一个channel,且在该消息否定前存在未确认的消息
参数3: 对异常消息的处理,true表示重排序,false表示丢弃
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
案例
一.配置文件
#消费者每次消费一个消息
spring.rabbitmq.listener.simple.prefetch=1
#消费者手动答应
spring.rabbitmq.listener.simple.acknowledge-mode=manual
二.创建队列
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig
@Bean
public Queue beanHello()
return new Queue("beanHello", true, false, false, null);
三.生产者
import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.UUID;
@RestController
public class MyRabbitmqController
@Resource
private RabbitTemplate rabbitTemplate;
private Logger logger = LoggerFactory.getLogger(MyRabbitmqController.class);
@GetMapping("beanHello")
public void hello(String mess)
MessageProperties messageProperties = new MessageProperties();
//设置消息唯一ID
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(mess.getBytes(),messageProperties);
logger.info("生产者 消息id: ",messageProperties.getMessageId());
rabbitTemplate.convertAndSend("beanHello",message);
四.消费者
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* @author jitwxs
* @date 2023年02月06日 11:56
*/
@Component
public class MyRabbitListener
private Logger logger = LoggerFactory.getLogger(MyRabbitmqController.class);
@RabbitListener(queuesToDeclare = @Queue("beanHello"))
public void hello(Message message, Channel channel) throws IOException
String messageId = message.getMessageProperties().getMessageId();
String s = new String(message.getBody(), StandardCharsets.UTF_8);
logger.info("消费者 消息id:,消息为:",messageId, s);
try
/*
* 业务代码,
* int n = 0/0 模拟异常
*
* */
int n = 0 / 0;
//确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), fale);
logger.info("ok");
catch (Exception e)
//退回消息,将消息重新放回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
logger.info("error");
五.测试
以上是关于springboot集成Rabbitmq——消费者手动应答的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot(二十二)集成RabbitMQ---MQ实战演练
RabbitMQ 进阶 -- SpringBoot 集成 RabbitMQ实现生产者与消费者模式
SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)