异步通信rabbitmq——消息重试
Posted exman
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步通信rabbitmq——消息重试相关的知识,希望对你有一定的参考价值。
目标:
利用RabbitMQ实现消息重试和失败处理,实现可靠的消费消费。在消息消费异常时,自动延时将消息重试,当重试超过一定次数后,则列为异常消息,等待后续特殊处理。
准备:
TTL:Time-To-Live,通过给消息、队列设置过期时间(单位:毫秒),来控制消息、队列的生命周期。在达到时间后,消息会变成dead message。
Dead Letter Exchanges:同普通的exchange无区别
消息重制本质是通过消息转发来实现的。消息转发的触发是:
rejected - the message was rejected with requeue=false,
expired - the TTL of the message expired; or
maxlen - the maximum allowed queue length was exceeded.
这里,我们使用expired来实现。给消息设置TTL,到期后消息未被消费,则会变成dead messager,转发到dead letter exchange。
流程图:
实现:
1、创建三个exchange。没有特殊要求
2、创建三个queue。
[email protected]作为重试队列,需要特殊处理:
x-dead-letter-exchange: clickExchange
x-dead-letter-routing-key: clickKey
x-message-ttl: 30000
3、处理代码
public void retry() throws IOException {
//消息消费
GetResponse getResponse = null;
try {
getResponse = rabbitUtil.fetch(DQConstant.CLICK_QUEUE_NAME, false);
/**
* 业务处理
*/
throw new RuntimeException("错粗了");
} catch (Exception e) {
if(null != getResponse) {
long retryCount = getRetryCount(getResponse.getProps());
if(retryCount > 3) {
//重试超过3次的,直接存入失败队列
AMQP.BasicProperties properties = getResponse.getProps();
Map<String, Object> headers = properties.getHeaders();
if(null == headers) {
headers = new HashMap<>();
}
properties.builder().headers(headers);
rabbitUtil.send(DQConstant.CLICK_FAILED_EXCHANGE_NAME, DQConstant.CLICK_FAILED_ROUTING_KEY, properties, getResponse.getBody());
} else {
//重试不超过3次的,加入到重试队列
AMQP.BasicProperties properties = getResponse.getProps();
Map<String, Object> headers = properties.getHeaders();
if(null == headers) {
headers = new HashMap<>();
}
properties.builder().headers(headers);
rabbitUtil.send(DQConstant.CLICK_RETRY_EXCHANGE_NAME, DQConstant.CLICK_RETRY_ROUTING_KEY, properties, getResponse.getBody());
}
}
}
if(null != getResponse) {
rabbitUtil.ack(getResponse);
}
}
private long getRetryCount(AMQP.BasicProperties properties) {
long retryCount = 0;
Map<String, Object> headers = properties.getHeaders();
if(null != headers) {
if(headers.containsKey("x-death")) {
List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
if(!deathList.isEmpty()) {
Map<String, Object> deathEntry = deathList.get(0);
retryCount = (Long)deathEntry.get("count");
}
}
}
return retryCount;
}
4、x-death的使用:message在转换成dead letter时,会在其header里添加一个名为x-death的数组。数组元素就是一次dead lettering event的记录。包含count:消息几次变成了dead letter。
总结:
此处只是本人的拙见。如有更好的提议,欢迎拍砖。
---------------------
作者:洛杉矶的管理局
来源:CSDN
原文:://blog.csdn.net/qq_18991441/article/details/80692255
版权声明:本文为博主原创文章,转载请附上博文链接!
以上是关于异步通信rabbitmq——消息重试的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)SpringAMQP发布订阅模式(FanoutExchangeDirectExchangeTopicExchange)消息转换器
RabbitMQ 服务异步通信 -- 初识MQ(同步通信和异步通信MQ几种常见MQ的对比)RabbitMQ安装和介绍