RabbitMQ的死信队列
Posted 小黄鸡1992
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ的死信队列相关的知识,希望对你有一定的参考价值。
1.业务背景
如果有有错误消息,如果手动nack同时将消息放回到队列中,那么这条消息会反复消费,留在队列中 。
如果nack后将消息丢弃,那么如果碰到网络抖动,消息也会丢失 。所以 通过建立死信队列避免消息丢失。
2.实现
文件目录如下:
1.原理
我们额外建立一条队列。当消息进入进入业务队列后,如果收到nack那么就将这条消息放入这条条队列中 。
2.修改pom文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.修改配置文件
server:
port: 8088
spring:
rabbitmq:
host: 192.168.*.*
port: 5672
username: root
password: root
virtual-host: /
listener:
simple:
acknowledge-mode: manual #手动应答
prefetch: 1 # 每次只处理一个信息
publisher-confirms: true #开启消息确认机制
publisher-returns: true #支持消息发送失败返回队列
4.rabbitmq的配置
@Configuration
public class RabbitMqConfig
/**
* 连接工厂
*/
@Autowired
private ConnectionFactory connectionFactory;
/**
* 定制化amqp模版
*
* ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack
* ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack
*/
@Bean
public RabbitTemplate rabbitTemplate()
Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
// 发送消息确认, yml需要配置 publisher-confirms: true
rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
// 消息返回, yml需要配置 publisher-returns: true
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
String correlationId = message.getMessageProperties().getCorrelationId().toString();
logger.debug("消息: 发送失败, 应答码: 原因: 交换机: 路由键: ", correlationId, replyCode, replyText, exchange,
routingKey);
);
return rabbitTemplate;
/**
* 确认发送消息是否成功(调用util方法)
*
* @return
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack()
return new MsgSendConfirmCallBack();
5.util类
发送是否成功的回调方法。
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback
/**
* 回调方法
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);
if (ack)
System.out.println("消息发送成功");
else
//可以将消息写入本地,使用定时任务重新发送
System.out.println("消息发送失败:" + cause + "\\n重新发送");
这里有一个点,如果想做实现消息失败重新发送,在注释处可以实现。需要将消息写入本地,如果失败从本地读取,然后发送,如果成功删除本地信息。
6.业务队列(如:订单业务)
这里声明了一个业务队列 ,关键点在于x-dead-letter-exchange,x-dead-letter-routing-key 两个参数。
@Configuration
public class BusinessConfig
/**
* 业务1模块direct交换机的名字
*/
public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
/**
* 业务1 demo业务的队列名称
*/
public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
/**
* 业务1 demo业务的routekey
*/
public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
@Bean
public Queue yewu1DemoDeadQueue()
// 将普通队列绑定到死信队列交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
return new Queue("yewu1_demo_dead_queue", true, false, false, args);
/**
* 将消息队列和交换机进行绑定
*/
@Bean
public Binding binding_one()
return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
.with("yewu1_demo_dead_key");
这里有一个点如果想持久化消息到磁盘,需要新建队列时,new Queue将第二个参数输入为true,但是面对大并发时效率会变低 。
7.死信队列
这里声明死信队列与绑定关系。
@Configuration
public class DeadConfig
/**
* 死信队列
*/
public final static String FAIL_QUEUE_NAME = "fail_queue";
/**
* 死信交换机
*/
public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
/**
* 死信routing
*/
public final static String FAIL_ROUTING_KEY = "fail_routing";
/**
* 创建配置死信队列
*
*/
@Bean
public Queue deadQueue()
return new Queue(FAIL_QUEUE_NAME, true, false, false);
/**
* 死信交换机
*
* @return
*/
@Bean
public DirectExchange deadExchange()
DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
return directExchange;
/**
* 绑定关系
*
* @return
*/
@Bean
public Binding failBinding()
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
8.生产者消费者
生产者与消费者的代码实现。
public enum RabbitEnum
/**
* 处理成功
*/
ACCEPT,
/**
* 可以重试的错误
*/
RETRY,
/**
* 无需重试的错误
*/
REJECT
@RequestMapping("/sendDirectDead")
String sendDirectDead(@RequestBody String message) throws Exception
System.out.println("开始生产");
CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
message, data);
System.out.println("结束生产");
System.out.println("发送id:" + data);
return "OK,sendDirect:" + message;
@RabbitListener(queues = "yewu1_demo_dead_queue")
protected void consumerDead(Message message, Channel channel) throws Exception
RabbitEnum ackSign = RabbitEnum.RETRY;
try
int i = 10 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
catch (Exception e)
ackSign = RabbitEnum.RETRY;
throw e;
finally
// 通过finally块来保证Ack/Nack会且只会执行一次
if (ackSign == RabbitEnum.ACCEPT)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
else if (ackSign == RabbitEnum.RETRY)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
9.实验
当发送yewu1_demo_dead_queue队列时,如果抛出异常,会放入死信队列中。
以上是关于RabbitMQ的死信队列的主要内容,如果未能解决你的问题,请参考以下文章