springboot rabbitmq 非阻塞重试机制实现
Posted wangjun5159
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot rabbitmq 非阻塞重试机制实现相关的知识,希望对你有一定的参考价值。
重试的应用场景
比如,系统之间同步数据,A系统发送数据给B系统,因为网络原因或者B系统正在重启,可能收不到信息,为了确保B能收到消息就得重试几次;经典的比如,微信支付回调
对后台通知交互时,如果微信收到商户的应答不符合规范或超时,微信认为通知失败,微信会通过一定的策略定期重新发起通知,尽可能提高通知的成功率,但微信不保证通知最终能成功。(通知频率为15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h
- 总计 24h4m)
程序执行完后必须打印输出 success。如果商家反馈给支付宝的字符不是 success 这 7
个字符,支付宝服务器会不断重发通知,直到超过 24 小时 22 分钟。一般情况下,25 小时以内完成 8
次通知(通知的间隔频率一般是:4m,10m,10m,1h,2h,6h,15h)。
问题背景
spring-retry的@Retryable方式,是阻塞式的,rabbitmq使用这种方式,如果重试次数过多,后边的消息会阻塞一直得不到处理,重试次数过少则不能保证对方收到回调;那提高消费者数量可以吗?也是不行的,最终会耗尽所有消费者。这就相当于你去银行办业务,轮到你时,你要办的业务正好办不了,窗口就一直等着,后边的人无法办业务;如果增加窗口的数量,同样的原因,最终导致全部窗口阻塞;
解决思路
解决思路是,每个消息只分配给一次机会,失败后,放入延迟队列,然后处理下一个消息,到达延迟时间,消息再次入列,这样消息不会阻塞。这就相当于,轮到你时,要办的业务办不了,那么就让你等一会,先让后边的人办,x时间后再把你放到窗口的队列中;
用一张图概括就是RabbitMQ Non-Blocking Retry Solutions in SpringBoot Solution B — Delay Plugin
具体步骤
安装延迟插件
首先rabbitmq要安装rabbitmq-delayed-message-exchange插件,我的rabbitmq用的是3.8.27所以使用对应的插件3.8.17
找到放插件的目录
root@xx:/# rabbitmq-plugins directories -s
Plugin archives directory: /opt/rabbitmq/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@3f09bf7c586f-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins
将插件,放到/opt/rabbitmq/plugins
里,启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
springboot配置
application.yml配置
#RabbitMQ配置
rabbitmq:
host: localhost
port: 5672
username: username
password: password
listener:
simple:
# consumer收到消息,不回复ack
acknowledge-mode: NONE
# 抛出异常时,不再入队列
default-requeue-rejected: false
#自定义的mq errorhandler,处理失败时的重试次数
retry:
# 最大重试次数
max_retry: 5
# 延迟(毫秒)
delay: 1000
创建延迟exchange
搜了很多博客,都说要先通过management ui手动创建exchange,其实是不需要的,照下边这样就可以用代码创建exchange。
@Slf4j
@Configuration
public class MqConfig
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 延迟交换机名称
*/
public static final String DELAY_EXCHANGE_NAME="delay-exchange";
/**
* 延迟交换机的类型
*/
public static final String DELAY_EXCHANGE_TYPE = "x-delayed-message";
/**
* 错误重试延迟
*/
private static Integer DELAY;
/**
* 最大重试次数,2880次*30秒=24小时,也就是说,默认重试24小时
*/
private static Integer MAX_RETRY;
@Value("$retry.max_retry")
public void setMaxRetry(Integer maxRetry)
MAX_RETRY = maxRetry;
@Value("$retry.delay")
public void setDelay(Integer delay)
DELAY = delay;
/**
* 创建延迟交换机,必须先创建才能监听
* @return
*/
@Bean
public CustomExchange delayExchange()
CustomExchange customExchange = new CustomExchange(MqConfig.DELAY_EXCHANGE_NAME,
MqConfig.DELAY_EXCHANGE_TYPE);
customExchange.getArguments().put("x-delayed-type",
ExchangeTypes.DIRECT);
return customExchange;
// mq的异常处理器
@Bean
public RabbitListenerErrorHandler retryErrorHandler()
RabbitListenerErrorHandler errorHandler = (amqpMessage, message, exception) ->
log.error("message监听器出错了",exception);
MessageProperties messageProperties = amqpMessage.getMessageProperties();
Map<String, Object> headers = messageProperties.getHeaders();
Integer xRetryCount = ((Integer) headers.get("retry-count"));
//根据失败次数,决定是否继续发送到延迟队列
if (xRetryCount == null)
xRetryCount = MAX_RETRY;
Integer retriedCount = (Integer) headers.get("retried-count");
if (retriedCount == null)
retriedCount = 1;
log.info("已执行次数:,最大重试次数:",retriedCount,xRetryCount);
if(retriedCount < xRetryCount)
log.info("已执行次数小于最大重试次数");
retriedCount++;
headers.put("retried-count",retriedCount);
String routingKey = messageProperties.getConsumerQueue();
messageProperties.setDelay(DELAY);
log.info("延迟:毫秒", DELAY);
log.info("路由key:",routingKey);
amqpTemplate.send(DELAY_EXCHANGE_NAME, routingKey, amqpMessage);
return null;
log.info("已执行次数达到最大重试次数了,不再进行重试");
return null;
;
return errorHandler;
监听器
/**
* 接收到消息后,将消息传给目的地
* @param message
*/
@RabbitListener(bindings =@QueueBinding(value = @Queue("队列名称"),
exchange = @Exchange(value = MqConfig.DELAY_EXCHANGE_NAME,
type = MqConfig.DELAY_EXCHANGE_TYPE),
key = "队列名称"),errorHandler = "retryErrorHandler")
public void doSynDataToThirdApp(String message)
//http请求发送给第三方,如果出错,则会执行errorHandler ,从而实现重试
简单来说,就是mq监听器处理消息失败,errorHandler会捕获到,如果未达到重试次数,则放入延迟队列,延迟时间后,再次处理;如果已经达到重试次数,则结束,不再入队列;
参考
- Message Listener Container Configuration
- Scheduling Messages with RabbitMQ
- RabbitMQ Delayed Message Plugin
- RabbitMQ Non-Blocking Retry Solutions in SpringBoot
以上是关于springboot rabbitmq 非阻塞重试机制实现的主要内容,如果未能解决你的问题,请参考以下文章