Spring + RabbitMQ 指数退避与 RetryTemplate 无响应

Posted

技术标签:

【中文标题】Spring + RabbitMQ 指数退避与 RetryTemplate 无响应【英文标题】:Spring + RabbitMQ Exponential Backoff with RetryTemplate unresponsive 【发布时间】:2017-12-14 15:11:31 【问题描述】:

我正在开发一个 Spring 项目,并且正在尝试为 RabbitMQ 队列实现带有死字的指数退避。 在此过程中,我创建了一个死信队列和一个死信交换(扇出),并将原始队列的 x-dead-letter-exchange 参数设置为死信交换的名称,并创建了一个带有 ExponentialBackOffPolicy 的 RetryTemplate . 出于测试目的,我的消费者只是通过抛出异常来拒绝它收到的所有消息。

这就是我的 RabbitMQConfiguration 类的样子:

@Configuration
@EnableAutoConfiguration
@PropertySource("file:$HOME/common/config/wave-planning.properties")
public class RabbitMQConfiguration 

    private final static String QUEUE_NAME = "orderPlanQueue";

    private static final String EXCHANGE_NAME = "orderPlanExchange";

    private static final String DL_EXCHANGE_NAME = "deadLetterExchange";

    private static final String DL_QUEUE_NAME = "deadLetterQueue";

    @Value("$rabbitmq.host:localhost")
    private String host;

    @Value("$rabbitmq.port:5672")
    private int port;

    @Value("$rabbitmq.user:guest")
    private String userName;

    @Value("$rabbitmq.password:guest")
    private String password;

    @Value("$rabbitmq.initial_backoff_interval:1000")
    private int INITIAL_INTERVAL_IN_MILLISECONDS;

    @Value("$rabbitmq.max_backoff_interval:10000")
    private int MAX_INTERVAL_IN_MILLISECONDS;

    @Autowired
    OrderPlanService orderPlanService;

    @Bean
    Queue queue() 
        Map<String, Object> qargs = new HashMap<String, Object>();
        qargs.put("x-dead-letter-exchange", DL_EXCHANGE_NAME);
        return new Queue(QUEUE_NAME, false, false, false, qargs);
    

    @Bean
    TopicExchange exchange() 
        return new TopicExchange(EXCHANGE_NAME);
    

    @Bean
    FanoutExchange deadLetterExchange()  return new FanoutExchange(DL_EXCHANGE_NAME); 

    @Bean
    Queue deadLetterQueue()  return new Queue(DL_QUEUE_NAME); 

    @Bean
    Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) 
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
    

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) 
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    

    @Bean
    public ConnectionFactory connectionFactory() 
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    

    @Bean
    public MessageConverter Jackson2JsonMessageConverter() 
        return new Jackson2JsonMessageConverter();
    

    @Bean
    public AmqpTemplate rabbitTemplate() 
        RabbitTemplate template = new RabbitTemplate(connectionFactory());

        RetryTemplate retry = new RetryTemplate();
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();

        policy.setInitialInterval(INITIAL_INTERVAL_IN_MILLISECONDS);
        policy.setMultiplier(2);
        policy.setMaxInterval(MAX_INTERVAL_IN_MILLISECONDS);

        retry.setBackOffPolicy(policy);
        template.setRetryTemplate(retry);

        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(Jackson2JsonMessageConverter());
        return template;
    

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setMessageConverter(Jackson2JsonMessageConverter());
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        container.setDefaultRequeueRejected(false);
        return container;
    

    @Bean
    MessageListenerAdapter listenerAdapter() 
        return new MessageListenerAdapter(orderPlanService, "consume");
    

消费者的相关部分基本上是这样的:

@Service
@Transactional
public class BaseOrderPlanService implements OrderPlanService 

    ....

    @Override
    public void consume(Object object) 
        throw new IllegalArgumentException("Test");
    

对于自动装配的整数值,使用默认值。 在运行此程序时,我看到交换和队列按预期在 rabbitmq 上创建,并在相关时使用预期的绑定和参数。 但是,当我使用路由键“orderPlanQueue”将消息传递给 orderPlanExchange 时,会导致无限循环,因为消息在队列中被拒绝并重复替换。 另一方面,如果 IllegalArgumentException 被 AmqpRejectAndDontRequeueException 替换,则在第一次拒绝尝试时,该消息会被简单地扔到死信队列中。

如果有人能指出我在这里可能做错的地方,即未应用重试策略,我将不胜感激。

编辑: 根据 Artem 的建议,使用 StatefulRetryOperationsInterceptor 编写代码。

@Configuration
@EnableAutoConfiguration
@PropertySource("file:$HOME/common/config/wave-planning.properties")
public class RabbitMQConfiguration 

    private final static String QUEUE_NAME = "orderPlanQueue";

    private static final String EXCHANGE_NAME = "orderPlanExchange";

    private static final String DL_EXCHANGE_NAME = "deadLetterExchange";

    private static final String DL_QUEUE_NAME = "deadLetterQueue";

    @Value("$rabbitmq.host:localhost")
    private String host;

    @Value("$rabbitmq.port:5672")
    private int port;

    @Value("$rabbitmq.user:guest")
    private String userName;

    @Value("$rabbitmq.password:guest")
    private String password;

    @Value("$rabbitmq.initial_backoff_interval:1000")
    private int INITIAL_INTERVAL_IN_MILLISECONDS;

    @Value("$rabbitmq.max_backoff_interval:10000")
    private int MAX_INTERVAL_IN_MILLISECONDS;

    @Autowired
    OrderPlanService orderPlanService;

    @Bean
    Queue queue() 
        Map<String, Object> qargs = new HashMap<String, Object>();
        qargs.put("x-dead-letter-exchange", DL_EXCHANGE_NAME);
        return new Queue(QUEUE_NAME, false, false, false, qargs);
    

    @Bean
    TopicExchange exchange() 
        return new TopicExchange(EXCHANGE_NAME);
    

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) 
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    

    @Bean
    FanoutExchange deadLetterExchange()  return new FanoutExchange(DL_EXCHANGE_NAME); 

    @Bean
    Queue deadLetterQueue()  return new Queue(DL_QUEUE_NAME); 

    @Bean
    Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) 
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
    

    @Bean
    public ConnectionFactory connectionFactory() 
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    

    @Bean
    public MessageConverter Jackson2JsonMessageConverter() 
        return new Jackson2JsonMessageConverter();
    

    @Bean
    public AmqpTemplate rabbitTemplate() 
        RabbitTemplate template = new RabbitTemplate(connectionFactory());

        /*
        RetryTemplate retry = new RetryTemplate();
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();

        policy.setInitialInterval(INITIAL_INTERVAL_IN_MILLISECONDS);
        policy.setMultiplier(2);
        policy.setMaxInterval(MAX_INTERVAL_IN_MILLISECONDS);

        retry.setBackOffPolicy(policy);
        template.setRetryTemplate(retry);
        */

        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(Jackson2JsonMessageConverter());
        return template;
    

    @Bean
    StatefulRetryOperationsInterceptor interceptor() 
        return RetryInterceptorBuilder.stateful()
                .maxAttempts(4)
                .backOffOptions(INITIAL_INTERVAL_IN_MILLISECONDS, 2, MAX_INTERVAL_IN_MILLISECONDS)
                .build();
    

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setMessageConverter(Jackson2JsonMessageConverter());
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        container.setAdviceChain(new Advice[] interceptor());
        return container;
    

    @Bean
    MessageListenerAdapter listenerAdapter() 
        return new MessageListenerAdapter(orderPlanService, "consume");
    


【问题讨论】:

【参考方案1】:

RabbitTemplate 的重试策略与 DLQ/DLX 完全无关。这是针对消费者方面的。

参见参考手册here的区别:

您现在可以将RabbitTemplate 配置为使用RetryTemplate 来帮助处理代理连接问题。

和here:

要限制客户端重新发送的次数,一种选择是在侦听器的建议链中使用StatefulRetryOperationsInterceptor

因此,您必须重新考虑您的逻辑并将重试功能置于SimpleMessageListenerContainer 定义中。

【讨论】:

感谢您的回答。但是,我尝试按照建议通过 setAdviceChain 将带有 maxAttempt 和 backOffOptions 的 StatefulRetryOperationsInterceptor 添加到容器中,但似乎没有任何区别。 不确定发生了什么。有github.com/spring-projects/spring-amqp/blob/master/…在你身边学习。 感谢您的帮助。不确定问题出在哪里,但使用无状态拦截器有效。 嗯,没错。从某些版本开始,默认情况下 Spring Retry 不会为方法参数构建唯一键。 Spring AMQP 2.0 现在对这个问题做了一些不同的逻辑:docs.spring.io/spring-amqp/docs/2.0.0.BUILD-SNAPSHOT/reference/…。关注MissingMessageIdAdvice。但是,是的,stateless 在大多数情况下确实足够了。

以上是关于Spring + RabbitMQ 指数退避与 RetryTemplate 无响应的主要内容,如果未能解决你的问题,请参考以下文章

指数退避和AIMD为什么都青睐数字2

算法6指数退避算法

AWS Beanstalk:SQS 的指数退避?

python中的指数退避实现

RxJava 中的指数退避

如何使用固定超时和尝试次数实现指数退避/延迟计算?