RabbitMQ——死信队列的三大来源应用举例
Posted 张起灵-小哥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ——死信队列的三大来源应用举例相关的知识,希望对你有一定的参考价值。
1.什么是死信队列?
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景: 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
三大来源:
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
2.三大来源之消息TTL过期
我们就参考上面的架构图来写代码。首先是生产者,其中还有一个工具类代码。
package com.szh.rabbitmq.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
*/
public class RabbitMqUtils
public static Channel getChannel()
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.130");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection = null;
Channel channel = null;
try
connection = factory.newConnection();
channel = connection.createChannel();
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
return channel;
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 死信队列之生产者
*/
public class Producer
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException
Channel channel = RabbitMqUtils.getChannel();
//设置接收消息的过期时间,超过这个时间则转到死信队列
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").build();
for (int i = 1; i <= 10; i++)
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zql",properties,message.getBytes(StandardCharsets.UTF_8));
下面是两个消费者。
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* 死信队列之消费者01
*/
public class Consumer01
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机的名称
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列的名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException
Channel channel = RabbitMqUtils.getChannel();
//绑定交换机、声明交换机的类型
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
//绑定普通队列, 正常队列绑定死信队列信息
Map<String, Object> arguments = new HashMap<>();
//正常队列设置死信交换机, 参数 key 是固定值
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//正常队列设置死信 routing-key, 参数 key 是固定值
arguments.put("x-dead-letter-routing-key","szh");
//正常队列最大长度限制
//arguments.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
//绑定死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//普通交换机与普通队列进行绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zql");
//死信交换机与死信队列进行绑定
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"szh");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag,message) ->
// String msg = new String(message.getBody());
// if ("info5".equals(msg))
// System.out.println(msg + "此消息已被Consumer01拒绝....");
// //执行拒绝策略,被拒绝的消息将转到死信队列中
// channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
// else
// System.out.println("Consumer01接收的消息是:" + msg);
// channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
//
String msg = new String(message.getBody());
System.out.println("Consumer01接收的消息是:" + msg);
;
//修改autoAck为false,表示不自动应答
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> );
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
/**
*
*/
public class Consumer02
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
;
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> );
下面我们测试一下,因为有关队列的声明都写在了第一个消费者中,所以先启动第一个消费者,然后模拟消息TTL过期(直接将消费者1down掉)。
然后再启动生产者,此时会向MQ中发送10条消息,这些消息此时会存在normal_queue中。由于消费者1已经down掉,它自然接收不到消息,那么等消息过期之后(在生产者代码中设定的是10s),这些消息会被转到死信队列dead_queue中,此时再启动消费者2,它就可以从死信队列中接收到这10条消息。
3.三大来源之队列达到最大长度
工具类和上面的案例是一样的,其余的生产者和消费者稍有变动。
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 死信队列之生产者
*/
public class Producer
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException
Channel channel = RabbitMqUtils.getChannel();
//设置接收消息的过期时间,超过这个时间则转到死信队列
// AMQP.BasicProperties properties = new AMQP.BasicProperties()
// .builder().expiration("10000").build();
for (int i = 1; i <= 10; i++)
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zql",null,message.getBytes(StandardCharsets.UTF_8));
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* 死信队列之消费者01
*/
public class Consumer01
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机的名称
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列的名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException
Channel channel = RabbitMqUtils.getChannel();
//绑定交换机、声明交换机的类型
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
//绑定普通队列, 正常队列绑定死信队列信息
Map<String, Object> arguments = new HashMap<>();
//正常队列设置死信交换机, 参数 key 是固定值
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//正常队列设置死信 routing-key, 参数 key 是固定值
arguments.put("x-dead-letter-routing-key","szh");
//正常队列最大长度限制
arguments.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
//绑定死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//普通交换机与普通队列进行绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zql");
//死信交换机与死信队列进行绑定
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"szh");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag,message) ->
// String msg = new String(message.getBody());
// if ("info5".equals(msg))
// System.out.println(msg + "此消息已被Consumer01拒绝....");
// //执行拒绝策略,被拒绝的消息将转到死信队列中
// channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
// else
// System.out.println("Consumer01接收的消息是:" + msg);
// channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
//
String msg = new String(message.getBody());
System.out.println("Consumer01接收的消息是:" + msg);
;
//修改autoAck为false,表示不自动应答
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> );
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
/**
*
*/
public class Consumer02
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
;
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> );
下面我们测试一下,还是先启动消费者1确保MQ中已经有了相应的交换机和队列,然后将消费者1先停掉,去启动生产者,先向MQ中发送10条消息,看看结果。
此时由于消费者1被停掉了,它就无法接收消息,而它所承受的队列最大长度为6,所以这6个会堆积在normal_queue队列中,剩下的 10-6=4 条消息会转到死信队列中。
当我们启动消费者1、2之后,可以看到它们能够接收到相应队列中的消息。
4.三大来源之消息被拒绝
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 死信队列之生产者
*/
public class Producer
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException
Channel channel = RabbitMqUtils.getChannel();
//设置接收消息的过期时间,超过这个时间则转到死信队列
// AMQP.BasicProperties properties = new AMQP.BasicProperties()
// .builder().expiration("10000").build();
for (int i = 1; i <= 10; i++)
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zql",null,message.getBytes(StandardCharsets.UTF_8));
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* 死信队列之消费者01
*/
public class Consumer01
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机的名称
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列的名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException
Channel channel = RabbitMqUtils.getChannel();
//绑定交换机、声明交换机的类型
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
//绑定普通队列, 正常队列绑定死信队列信息
Map<String, Object> arguments = new HashMap<>();
//正常队列设置死信交换机, 参数 key 是固定值
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//正常队列设置死信 routing-key, 参数 key 是固定值
arguments.put("x-dead-letter-routing-key","szh");
//正常队列最大长度限制
//arguments.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
//绑定死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//普通交换机与普通队列进行绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zql");
//死信交换机与死信队列进行绑定
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"szh");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag,message) ->
String msg = new String(message.getBody());
if ("info5".equals(msg))
System.out.println(msg + "此消息已被Consumer01拒绝....");
//执行拒绝策略,被拒绝的消息将转到死信队列中
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
else
System.out.println("Consumer01接收的消息是:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
// String msg = new String(message.getBody());
// System.out.println("Consumer01接收的消息是:" + msg);
;
//修改autoAck为false,表示不自动应答
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> );
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
/**
*
*/
public class Consumer02
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
;
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> );
下面我们测试一下,还是先启动消费者1确保MQ中已经有了相应的交换机和队列,然后将消费者1先停掉,去启动生产者,先向MQ中发送10条消息,看看结果。
生产者消息发送完毕之后,因为消费者1被down掉了,所以这10条消息被堆积到了normal_queue队列中。
此时我们再启动消费者1,可以看到它正常的去MQ中消费,但是其中的info5被拒绝了,而这个拒绝的消息就会转到死信队列中。
在死信队列中就看到了info5这条消息,此时再启动消费者2,它就可以顺利的去死信队列中消费了。
以上是关于RabbitMQ——死信队列的三大来源应用举例的主要内容,如果未能解决你的问题,请参考以下文章