RabbitMQ - Implementing a Delay Queue with DLX (Dead Letter Exchanges)
Posted by OkidoGreen
I. Delay queue design
A message in a queue becomes a dead letter in any of the following cases:
1) The message is rejected (basic.reject or basic.nack) with requeue=false;
2) The TTL for the message expires; or
3) The queue length limit is exceeded.
If the queue is declared with the x-dead-letter-exchange and x-dead-letter-routing-key arguments, its dead letters are republished to the specified DLX using the specified routing key.
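As the figure above shows, the delay queue has no consumers and is declared with the following arguments:
x-dead-letter-exchange: DLX
x-dead-letter-routing-key: DLK
x-max-length: 1000000 // max length = 1 million messages
x-message-ttl: 30000 // TTL = 30 s, given in milliseconds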
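Messages published by the producer are routed to this delay queue. After 30 seconds each message becomes a dead letter and is republished to the dead letter exchange DLX. The consumer subscribes to the dead letter queue DLQ and consumes the messages from there.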
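The flow described above can be illustrated with a small in-memory model. This is only a sketch and does not talk to RabbitMQ: the class `DelayQueueModel` and its methods are hypothetical names introduced here, time is passed in explicitly rather than read from a clock, and overflow is assumed to dead-letter the oldest message, which matches the broker's default behaviour for a length-limited queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of the delay queue design; it does not talk to RabbitMQ. */
final class DelayQueueModel {

    /** A message body plus the time (in ms) at which it entered the delay queue. */
    private static final class Entry {
        final String body;
        final long enqueuedAtMs;

        Entry(String body, long enqueuedAtMs) {
            this.body = body;
            this.enqueuedAtMs = enqueuedAtMs;
        }
    }

    private final long ttlMs;     // plays the role of x-message-ttl
    private final int maxLength;  // plays the role of x-max-length
    private final Deque<Entry> delayQueue = new ArrayDeque<>();
    private final List<String> deadLetterQueue = new ArrayList<>();

    DelayQueueModel(long ttlMs, int maxLength) {
        this.ttlMs = ttlMs;
        this.maxLength = maxLength;
    }

    /** Publishes a message; if the queue overflows, the oldest message is dead-lettered. */
    void publish(String body, long nowMs) {
        delayQueue.addLast(new Entry(body, nowMs));
        while (delayQueue.size() > maxLength) {
            deadLetterQueue.add(delayQueue.removeFirst().body);
        }
    }

    /** Dead-letters every message at the head whose TTL has elapsed by nowMs. */
    void expire(long nowMs) {
        while (!delayQueue.isEmpty()
                && nowMs - delayQueue.peekFirst().enqueuedAtMs >= ttlMs) {
            deadLetterQueue.add(delayQueue.removeFirst().body);
        }
    }

    /** Messages a consumer subscribed to the DLQ would receive, in order. */
    List<String> deadLetters() {
        return new ArrayList<>(deadLetterQueue);
    }

    /** Number of messages still waiting in the delay queue. */
    int pending() {
        return delayQueue.size();
    }
}
```

For example, with `new DelayQueueModel(30_000L, 1_000_000)`, a message published at time 0 is still pending after `expire(29_999L)` and appears in `deadLetters()` after `expire(30_000L)`. Nothing ever reads from the delay queue itself, which is why it needs no consumer.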
II. Delay queue implementation
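import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

@Configuration
@PropertySource("classpath:rabbitmq-cfg.properties")
public class RabbitConfig {

    /**
     * Configures the RabbitMQ connection factory.
     *
     * @param host broker host name
     * @param port broker port
     * @param username user name
     * @param password password
     * @param virtualHost virtual host
     * @param connectionTimeout connection timeout in milliseconds
     * @return the connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.host}") String host,
            @Value("${spring.rabbitmq.port}") String port,
            @Value("${spring.rabbitmq.username}") String username,
            @Value("${spring.rabbitmq.password}") String password,
            @Value("${spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${spring.rabbitmq.connection-timeout}") String connectionTimeout) {
        CachingConnectionFactory ret = new CachingConnectionFactory();
        ret.setHost(host);
        ret.setPort(Integer.parseInt(port));
        ret.setUsername(username);
        ret.setPassword(password);
        ret.setVirtualHost(virtualHost);
        ret.setConnectionTimeout(Integer.parseInt(connectionTimeout));
        return ret;
    }

    /**
     * Declares the 30 s delay queue.
     *
     * @param name queue name
     * @return the delay queue
     */
    @Bean
    public Queue queueDelay30s(@Value("${atf.queue.delay.30s}") String name) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 30 * 1000); // Message TTL = 30 s
        arguments.put("x-max-length", 1000000); // Max length = 1 million
        // Route dead letters to the dead letter exchange DLX.
        // These literals must match the DLX name and the DLK binding key declared below.
        arguments.put("x-dead-letter-exchange", "atf.exchange.delay.dlx");
        arguments.put("x-dead-letter-routing-key", "atf.routingkey.delay.dlk");
        // durable = true, exclusive = false, autoDelete = false
        return new Queue(name, true, false, false, arguments);
    }

    /**
     * Declares the dead letter queue (DLQ).
     *
     * @param name queue name
     * @return the dead letter queue
     */
    @Bean
    public Queue queueDelayDlq(@Value("${atf.queue.delay.dlq}") String name) {
        return new Queue(name, true, false, false);
    }

    /**
     * Declares the delay exchange.
     *
     * @param name exchange name
     * @return the delay exchange
     */
    @Bean
    public DirectExchange exchangeDelay(@Value("${atf.exchange.delay}") String name) {
        // durable = true, autoDelete = false
        DirectExchange ret = new DirectExchange(name, true, false);
        ret.setInternal(false);
        return ret;
    }

    /**
     * Declares the dead letter exchange (DLX).
     *
     * @param name exchange name
     * @return the dead letter exchange
     */
    @Bean
    public DirectExchange exchangeDelayDlx(@Value("${atf.exchange.delay.dlx}") String name) {
        return new DirectExchange(name, true, false);
    }

    /**
     * Binds the 30 s delay queue to the delay exchange.
     *
     * @param queue the delay queue
     * @param exchange the delay exchange
     * @param routingKey routing key used by the producer
     * @return the binding
     */
    @Bean
    public Binding bindingDelay(@Qualifier("queueDelay30s") Queue queue,
            @Qualifier("exchangeDelay") DirectExchange exchange,
            @Value("${atf.routingkey.delay.30s}") String routingKey) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    /**
     * Binds the dead letter queue (DLQ) to the dead letter exchange (DLX).
     *
     * @param queue the dead letter queue
     * @param exchange the dead letter exchange
     * @param routingKey dead letter routing key (DLK)
     * @return the binding
     */
    @Bean
    public Binding bindingDelayConsume(@Qualifier("queueDelayDlq") Queue queue,
            @Qualifier("exchangeDelayDlx") DirectExchange exchange,
            @Value("${atf.routingkey.delay.dlk}") String routingKey) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}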
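// Producer side (in a class with an injected RabbitTemplate): publish to the delay exchange.
public void sendDelay(AtfEventPayload payload) {
    rabbitTemplate.convertAndSend("atf.exchange.delay", "atf.routingkey.delay.30s", payload);
}

// Consumer side: listen on the DLQ, so a message arrives only after its 30 s TTL has expired.
@RabbitListener(queues = "atf.queue.delay.dlq", containerFactory = "rabbitListenerContainerFactory")
public void consumeDelay(AtfEventPayload payload) {
    try {
        AtWorkflow workflow = workflowSvs.find(payload.getWorkflowName());
        workflowSvs.execute(workflow, null);
    } catch (Exception e) {
        // The exception is printed and swallowed, so the listener method returns normally.
        e.printStackTrace();
    }
}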
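The article does not show rabbitmq-cfg.properties, so the following is only a guess at what it might contain. The connection values are placeholders. The value of atf.queue.delay.30s is an assumption, and the remaining names are taken from the string literals hard-coded in the code above, which the property values have to match for the dead-letter routing, the producer, and the listener to line up.

```properties
# Connection settings: placeholder values, not from the article
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Timeout in milliseconds
spring.rabbitmq.connection-timeout=5000

# Delay queue name: assumed, the code never states it
atf.queue.delay.30s=atf.queue.delay.30s
# Must match the queue named in @RabbitListener
atf.queue.delay.dlq=atf.queue.delay.dlq
# Must match the exchange and routing key used by sendDelay
atf.exchange.delay=atf.exchange.delay
atf.routingkey.delay.30s=atf.routingkey.delay.30s
# Must match x-dead-letter-exchange and x-dead-letter-routing-key on the delay queue
atf.exchange.delay.dlx=atf.exchange.delay.dlx
atf.routingkey.delay.dlk=atf.routingkey.delay.dlk
```

If the DLX name or the DLK routing key in this file differed from the literals passed as queue arguments, expired messages would be republished to an exchange or key with no matching binding and would never reach the DLQ.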
References:
[1] RabbitMQ - Dead Letter Exchanges (http://www.rabbitmq.com/dlx.html)