RabbitMQ - DLX (Dead Letter Exchanges)实现延迟队列

Posted OkidoGreen

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ - DLX (Dead Letter Exchanges)实现延迟队列相关的知识,希望对你有一定的参考价值。

一、延迟队列设计
队列中的消息成为死信(Dead Letter)的几种情况:
1) The message is rejected (basic.reject or basic.nack) with requeue=false,
2) The TTL for the message expires; or
3) The queue length limit is exceeded.
而通过配置队列的x-dead-letter-exchange及x-dead-letter-routing-key键值,Dead Letter就会被重新发送到指定的DLX中。
  
如上图所示,注意延迟队列并没有消费者,并且设置参数: 

x-dead-letter-exchange:DLX
x-dead-letter-routing-key:DLK
x-max-length:1000000 // length = 1million
x-message-ttl:30000 // TTL = 30s

生产者发布的消息路由到该延迟队列,30秒后消息成为死信并被重新发送到死信交换器DLX中;消费者订阅死信队列DLQ,消费消息。
二、延迟队列实现

@Configuration
@PropertySource("classpath:rabbitmq-cfg.properties")
public class RabbitConfig 
    /**
     * 配置RabbitMQ连接工厂
     * 
     * @param host
     * @param port
     * @param username
     * @param password
     * @param virtualHost
     * @param connectionTimeout
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory(@Value("$spring.rabbitmq.host") String host,
            @Value("$spring.rabbitmq.port") String port, @Value("$spring.rabbitmq.username") String username,
            @Value("$spring.rabbitmq.password") String password,
            @Value("$spring.rabbitmq.virtual-host") String virtualHost,
            @Value("$spring.rabbitmq.connection-timeout") String connectionTimeout) 
        CachingConnectionFactory ret = new CachingConnectionFactory();
        ret.setHost(host);
        ret.setPort(Integer.parseInt(port));
        ret.setUsername(username);
        ret.setPassword(password);
        ret.setVirtualHost(virtualHost);
        ret.setConnectionTimeout(Integer.parseInt(connectionTimeout));
        return ret;
    
 
    /**
     * 声明30s延迟队列
     * 
     * @param name
     * @return
     */
    @Bean
    public Queue queueDelay30s(@Value("$atf.queue.delay.30s") String name) 
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", 30 * 1000); // Message TTL = 30s
        arguments.put("x-max-length", 1000000); // Max length = 1million
        // 死信路由到死信交换器DLX
        arguments.put("x-dead-letter-exchange", "atf.exchange.delay.dlx");
        arguments.put("x-dead-letter-routing-key", "atf.routingkey.delay.dlk");
        return new Queue(name, true, false, false, arguments);
    
 
    /**
     * 声明死信队列DLQ
     * 
     * @param name
     * @return
     */
    @Bean
    public Queue queueDelayDlq(@Value("$atf.queue.delay.dlq") String name) 
        return new Queue(name, true, false, false);
    
 
    /**
     * 声明延迟交换器
     * 
     * @param name
     * @return
     */
    @Bean
    public DirectExchange exchangeDelay(@Value("$atf.exchange.delay") String name) 
        DirectExchange ret = new DirectExchange(name, true, false);
        ret.setInternal(false);
        return ret;
    
 
    /**
     * 声明死信交换器DLX
     * 
     * @param name
     * @return
     */
    @Bean
    public DirectExchange exchangeDelayDlx(@Value("$atf.exchange.delay.dlx") String name) 
        return new DirectExchange(name, true, false);
    
 
    /**
     * 绑定延迟交换器和30s延迟队列
     * 
     * @param queue
     * @param exchange
     * @param routingKey
     * @return
     */
    @Bean
    public Binding bindingDelay(@Qualifier("queueDelay30s") Queue queue,
            @Qualifier("exchangeDelay") DirectExchange exchange,
            @Value("$atf.routingkey.delay.30s") String routingKey) 
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    
 
    /**
     * 绑定死信交换器DLX和死信队列DLQ
     * 
     * @param queue
     * @param exchange
     * @param routingKey
     * @return
     */
    @Bean
    public Binding bindingDelayConsume(@Qualifier("queueDelayDlq") Queue queue,
            @Qualifier("exchangeDelayDlx") DirectExchange exchange,
            @Value("$atf.routingkey.delay.dlk") String routingKey) 
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    

public void sendDelay(AtfEventPayload payload) 
rabbitTemplate.convertAndSend("atf.exchange.delay", "atf.routingkey.delay.30s", payload);

@RabbitListener(queues = "atf.queue.delay.dlq", containerFactory = "rabbitListenerContainerFactory")
public void consumeDelay(AtfEventPayload payload) 
    try 
        AtWorkflow workflow = workflowSvs.find(payload.getWorkflowName());
        workflowSvs.execute(workflow, null);
     catch (Exception e) 
        e.printStackTrace();
    


————————————————
参考文献:
[1] RabbitMQ - Dead Letter Exchanges(http://www.rabbitmq.com/dlx.html)

以上是关于RabbitMQ - DLX (Dead Letter Exchanges)实现延迟队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ通过DLX实现消息延迟接收

RabbitMQ之死信队列

RabbitMQ死信队列

RabbitMQ 死信队列

使用RabbitMQ处理死信队列

死信交换 RabbitMQ 丢弃消息