springboot rabbitmq 非阻塞重试机制实现

Posted wangjun5159

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot rabbitmq 非阻塞重试机制实现相关的知识,希望对你有一定的参考价值。

重试的应用场景

比如,系统之间同步数据,A系统发送数据给B系统,因为网络原因或者B系统正在重启,可能收不到信息,为了确保B能收到消息就得重试几次;经典的比如,微信支付回调

对后台通知交互时,如果微信收到商户的应答不符合规范或超时,微信认为通知失败,微信会通过一定的策略定期重新发起通知,尽可能提高通知的成功率,但微信不保证通知最终能成功。(通知频率为15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h

  • 总计 24h4m)

支付宝支付回调

程序执行完后必须打印输出 success。如果商家反馈给支付宝的字符不是 success 这 7
个字符,支付宝服务器会不断重发通知,直到超过 24 小时 22 分钟。一般情况下,25 小时以内完成 8
次通知(通知的间隔频率一般是:4m,10m,10m,1h,2h,6h,15h)。

问题背景

spring-retry的@Retryable方式,是阻塞式的,rabbitmq使用这种方式,如果重试次数过多,后边的消息会阻塞一直得不到处理,重试次数过少则不能保证对方收到回调;那提高消费者数量可以吗?也是不行的,最终会耗尽所有消费者。这就相当于你去银行办业务,轮到你时,你要办的业务正好办不了,窗口就一直等着,后边的人无法办业务;如果增加窗口的数量,同样的原因,最终导致全部窗口阻塞;

解决思路

解决思路是,每个消息只分配给一次机会,失败后,放入延迟队列,然后处理下一个消息,到达延迟时间,消息再次入列,这样消息不会阻塞。这就相当于,轮到你时,要办的业务办不了,那么就让你等一会,先让后边的人办,x时间后再把你放到窗口的队列中;
用一张图概括就是RabbitMQ Non-Blocking Retry Solutions in SpringBoot Solution B — Delay Plugin

具体步骤

安装延迟插件

首先rabbitmq要安装rabbitmq-delayed-message-exchange插件,我的rabbitmq用的是3.8.27所以使用对应的插件3.8.17

找到放插件的目录

root@xx:/# rabbitmq-plugins directories -s
Plugin archives directory: /opt/rabbitmq/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@3f09bf7c586f-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins

将插件,放到/opt/rabbitmq/plugins里,启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

springboot配置

application.yml配置

#RabbitMQ配置
  rabbitmq:
    host: localhost
    port: 5672
    username: username
    password: password
    listener:
      simple:
       # consumer收到消息,不回复ack
        acknowledge-mode: NONE
        # 抛出异常时,不再入队列
        default-requeue-rejected: false

#自定义的mq errorhandler,处理失败时的重试次数
retry:
  # 最大重试次数
  max_retry: 5
  # 延迟(毫秒)
  delay: 1000

创建延迟exchange

搜了很多博客,都说要先通过management ui手动创建exchange,其实是不需要的,照下边这样就可以用代码创建exchange。

@Slf4j
@Configuration
public class MqConfig 
    @Autowired
    private AmqpTemplate amqpTemplate;
 /**
     * 延迟交换机名称
     */
    public static final String DELAY_EXCHANGE_NAME="delay-exchange";
    /**
     * 延迟交换机的类型
     */
    public static final String DELAY_EXCHANGE_TYPE = "x-delayed-message";
      /**
     * 错误重试延迟
     */
    private static  Integer DELAY;
    /**
     * 最大重试次数,2880次*30秒=24小时,也就是说,默认重试24小时
     */
    private static  Integer MAX_RETRY;
    @Value("$retry.max_retry")
    public void setMaxRetry(Integer maxRetry)
        MAX_RETRY = maxRetry;
    
    @Value("$retry.delay")
    public void setDelay(Integer delay)
        DELAY = delay;
    
/**
     * 创建延迟交换机,必须先创建才能监听
     * @return
     */
    @Bean
    public CustomExchange delayExchange() 
        CustomExchange customExchange = new CustomExchange(MqConfig.DELAY_EXCHANGE_NAME,
                MqConfig.DELAY_EXCHANGE_TYPE);
        customExchange.getArguments().put("x-delayed-type",
                ExchangeTypes.DIRECT);
        return customExchange;
    
// mq的异常处理器
@Bean
    public RabbitListenerErrorHandler retryErrorHandler() 
        RabbitListenerErrorHandler errorHandler = (amqpMessage, message, exception) -> 
            log.error("message监听器出错了",exception);
            MessageProperties messageProperties = amqpMessage.getMessageProperties();
            Map<String, Object> headers = messageProperties.getHeaders();
            Integer xRetryCount = ((Integer) headers.get("retry-count"));
            //根据失败次数,决定是否继续发送到延迟队列
            if (xRetryCount == null) 
                xRetryCount = MAX_RETRY;
            
            Integer retriedCount = (Integer) headers.get("retried-count");
            if (retriedCount == null) 
                retriedCount = 1;
            
            log.info("已执行次数:,最大重试次数:",retriedCount,xRetryCount);
            if(retriedCount < xRetryCount)
                log.info("已执行次数小于最大重试次数");
                retriedCount++;
                headers.put("retried-count",retriedCount);
                String routingKey = messageProperties.getConsumerQueue();
                messageProperties.setDelay(DELAY);
                log.info("延迟:毫秒", DELAY);
                log.info("路由key:",routingKey);
                amqpTemplate.send(DELAY_EXCHANGE_NAME, routingKey, amqpMessage);
                return null;
            
            log.info("已执行次数达到最大重试次数了,不再进行重试");
            return null;
        ;
        return errorHandler;
    
    

监听器

 /**
     * 接收到消息后,将消息传给目的地
     * @param message
     */
    @RabbitListener(bindings =@QueueBinding(value = @Queue("队列名称"),
            exchange = @Exchange(value = MqConfig.DELAY_EXCHANGE_NAME,
                    type = MqConfig.DELAY_EXCHANGE_TYPE),
            key = "队列名称"),errorHandler = "retryErrorHandler")
    public void doSynDataToThirdApp(String message)
        //http请求发送给第三方,如果出错,则会执行errorHandler  ,从而实现重试
    

简单来说,就是mq监听器处理消息失败,errorHandler会捕获到,如果未达到重试次数,则放入延迟队列,延迟时间后,再次处理;如果已经达到重试次数,则结束,不再入队列;

参考

以上是关于springboot rabbitmq 非阻塞重试机制实现的主要内容,如果未能解决你的问题,请参考以下文章

springboot rabbitmq 非阻塞重试机制实现

SpringBoot整合RabbitMQ重试机制及配置

SpringBoot整合RabbitMQ--重试/消息序列化--方法/实例

在使用 Flux 时顺序调用非阻塞操作,包括重试

springboot使用RabbitMQ实现延时任务

Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列