RabbitMQ的延时重试队列
Posted 小黄鸡1992
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ的延时重试队列相关的知识,希望对你有一定的参考价值。
1.背景
通过上文学习知道了死信队列,如果只是网络抖动,出现异常那么直接进入死信队列,那么是不合理的。这就可以使用延时重试队列,本文将介绍如何实现延时重试队列。
2.原理
图是俺在网上找的,请原作者谅解。
- 发送到业务队里 如果正常收到 正常运行
- 如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入业务队列
- 如果重试次数大于3 那么进入死信队列
3.代码实现
1.业务队列
这里声明业务队列与绑定关系。
@Configuration
public class BusinessConfig
/**
* yewu1模块direct交换机的名字
*/
public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
/**
* demo业务的队列名称
*/
public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
/**
* demo业务的routekey
*/
public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
/**
* 业务交换机交换机(一个项目一个业务交换机即可)
* 1.定义direct exchange,绑定queueTest
* 2.durable="true" rabbitmq重启的时候不需要创建新的交换机
* 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
* fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
* topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中
*/
@Bean
public DirectExchange yewu1Exchange()
DirectExchange directExchange = new DirectExchange(YEWU1_EXCHANGE, true, false);
return directExchange;
/**
* 新建队列(一个业务需要一个队列一个routekey 命名格式 项目名-业务名)
* 1.队列名称
* 2.durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* 3.exclusive 表示该消息队列是否只在当前connection生效,默认是false
* 4.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* 5.对nack或者发送超时的 发送给死信队列 args是绑定死信队列
*
*/
@Bean
public Queue yewu1DemoQueue()
return new Queue(YEWU1_DEMO_QUEUE, true, false, false);
/**
* 交换机与routekey绑定
*
* @return
*/
@Bean
public Binding yewu1DemoBinding()
return BindingBuilder.bind(yewu1DemoQueue()).to(yewu1Exchange())
.with(YEWU1_DEMO_ROUTINGKEY);
2.延时队列
声明延时队列与绑定关系。
@Configuration
public class RetryConfig
/**
* 延时队列 交换机配置标识符(固定)
*/
public static final String RETRY_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 延时队列交换机绑定配置键标识符(固定)
*/
public static final String RETRY_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 延时队列消息的配置超时时间枚举(固定)
*/
public static final String RETRY_MESSAGE_TTL = "x-message-ttl";
/**
* yewu1模块延时队列交换机
*/
public final static String YEWU1_RETRY_EXCHANGE_NAME = "yewu1_retry_exchange";
/**
* yewu1模块DEMO业务延时队列
*/
public final static String YEWU1_DEMO_RETRY_QUEUE_NAME = "yewu1_demo_retry_queue";
/**
* yewu1模块DEMO延时队列routekey
*/
public final static String YEWU1_DEMO_RETRY_ROUTING_KEY = "yewu1_demo_retry_key";
/**
* 延时队列交换机
*
* @return
*/
@Bean
public DirectExchange yewu1RetryExchange()
DirectExchange directExchange = new DirectExchange(YEWU1_RETRY_EXCHANGE_NAME, true, false);
return directExchange;
/**
* 新建延时队列 一个业务队列需要一个延时队列
*
* @return
*/
@Bean
public Queue yewu1DemoRetryQueue()
Map<String, Object> args = new ConcurrentHashMap<>(3);
// 将消息重新投递到业务交换机Exchange中
args.put(RETRY_LETTER_QUEUE_KEY, BusinessConfig.YEWU1_EXCHANGE);
args.put(RETRY_LETTER_ROUTING_KEY, BusinessConfig.YEWU1_DEMO_ROUTINGKEY);
// 消息在队列中延迟3s后超时,消息会重新投递到x-dead-letter-exchage对应的队列中,routingkey为自己指定
args.put(RETRY_MESSAGE_TTL, 3 * 1000);
return new Queue(YEWU1_DEMO_RETRY_QUEUE_NAME, true, false, false, args);
/**
* 绑定以上定义关系
*
* @return
*/
@Bean
public Binding retryDirectBinding()
return BindingBuilder.bind(yewu1DemoRetryQueue()).to(yewu1RetryExchange())
.with(YEWU1_DEMO_RETRY_ROUTING_KEY);
3.死信队列
声明私信队列与绑定关系。
@Configuration
public class DeadConfig
/**
* 死信队列
*/
public final static String FAIL_QUEUE_NAME = "fail_queue";
/**
* 死信交换机
*/
public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
/**
* 死信routing
*/
public final static String FAIL_ROUTING_KEY = "fail_routing";
/**
* 创建配置死信队列
*
*/
@Bean
public Queue deadQueue()
return new Queue(FAIL_QUEUE_NAME, true, false, false);
/**
* 死信交换机
*
* @return
*/
@Bean
public DirectExchange deadExchange()
DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
return directExchange;
/**
* 绑定关系
*
* @return
*/
@Bean
public Binding failBinding()
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
4.生产者
生产者如上文,通用代码。
@RestController
@RequestMapping("/TestRabbit")
public class ProducerDemo
@Resource
private RabbitTemplate rabbitTemplate;
//@RequestMapping("/sendDirect")
String sendDirect(@RequestBody String message) throws Exception
System.out.println("开始生产");
CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, BusinessConfig.YEWU1_DEMO_ROUTINGKEY,
message, data);
System.out.println("结束生产");
System.out.println("发送id:" + data);
return "OK,sendDirect:" + message;
5.消费者
大量的逻辑,请参考注释。
public enum RabbitEnum
/**
* 处理成功
*/
ACCEPT,
/**
* 可以重试的错误
*/
RETRY,
/**
* 无需重试的错误
*/
REJECT
@Component
public class ConsumerDemo
private final static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
@Resource
private RabbitTemplate rabbitTemplate;
// @RabbitListener(queues = "yewu1_demo_queue")
protected void consumer(Message message, Channel channel) throws Exception
RabbitEnum ackSign = RabbitEnum.RETRY;
System.out.println(message.getMessageProperties().getCorrelationId());
try
// 可以加入重复消费判断
int i = 1 / 0;
catch (Exception e)
ackSign = RabbitEnum.RETRY;
throw e;
finally
// 通过finally块来保证Ack/Nack会且只会执行一次
if (ackSign == RabbitEnum.ACCEPT)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
else if (ackSign == RabbitEnum.RETRY)
String correlationData =
(String)message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
System.out.println(message.getMessageProperties().getCorrelationId());
long retryCount = getRetryCount(message.getMessageProperties());
if (retryCount >= 3)
// 重试次数超过3次,则将消息发送到失败队列等待特定消费者处理或者人工处理
try
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
rabbitTemplate.convertAndSend(DeadConfig.FAIL_EXCHANGE_NAME, DeadConfig.FAIL_ROUTING_KEY,
message, new CorrelationData(correlationData));
logger.info("连续失败三次,将消息发送到死信队列,发送消息:" + new String(message.getBody()));
catch (Exception e1)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
logger.error("发送死信队列报错:" + e1.getMessage() + ",原始消息:" + new String(message.getBody()));
else
try
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重试次数不超过3次,则将消息发送到重试队列等待重新被消费
rabbitTemplate.convertAndSend(RetryConfig.YEWU1_RETRY_EXCHANGE_NAME,
RetryConfig.YEWU1_DEMO_RETRY_ROUTING_KEY, message,
new CorrelationData(correlationData));
logger.info("消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第"
+ (retryCount + 1) + "次重试");
catch (Exception e1)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
logger.error("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息");
/**
* 获取消息被重试的次数
*/
public long getRetryCount(MessageProperties messageProperties)
Long retryCount = 0L;
if (null != messageProperties)
List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
if (deaths != null && deaths.size() > 0)
Map<String, Object> death = (Map<String, Object>)deaths.get(0);
retryCount = (Long)death.get("count");
return retryCount;
参考:https://www.cnblogs.com/mfrank/p/11260355.html
以上是关于RabbitMQ的延时重试队列的主要内容,如果未能解决你的问题,请参考以下文章