RabbitMQ 死信队列 定时队列 延时队列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 死信队列 定时队列 延时队列相关的知识,希望对你有一定的参考价值。
参考技术A Dead-Letter-Exchange利用DLX, 当消息在一个队列中变成死信(dead message)之后, 它能被重新publish到另一个Exchange, 这个Exchange就是DLX
DLX也是一个正常的Exchange, 和一般的Exchange没有区别, 它能在任何队列上被指定, 实际上就是设置某个队列的属性为死信队列
当这个队列中有死信时, RabbitMQ就会自动将这个消息重新发布到设置的Exchange上去, 进而被路由到另一个队列 可以监听这个队列中消息做相应的处理, 这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能
消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false 参数
消息的TTL-存活时间已经过期
队列长度限制被超越(队列满)
首先要设置死信队列的exchange和queue, 然后进行绑定
1. Exchange : dlx.exchange
2. Queue : dlx.queue
3. RoutingKey : #
然后正常声明交换机, 队列, 绑定, 只不过需要在队列加上一个扩展参数即可 : arguments.put(“x-dead-letter-exchange”, “dlx.exchange”);
1. 列信息因消费不及时大量积压,消费方服务先处理不及时会影响到队列的生产者,进而会影响到所有消费此topic的队列
2. 队列的消息超时后消失,生产者和消费者完全无感知,只能靠查落地存储的历史记录,很不方便同时业务无法再消费处理
增加延时队列,通过dead-letter方式实现超过有效期后进入可以被消费的队列中,让消费者处理
计算倒计时,再放入延时队列让消费者处理
可接受队列,延时等待队列,死信队列分别加上特定的前缀
然后通过元注解参数来区分生产者发送的队列类型
在注解监听器监听参数,如果是延时队列发送给延时等待队列,普通队列发送给可接受队列
所有消费者统一消费可接受队列
RabbitMQ实现延时队列(死信队列)
基于队列和基于消息的TTLTTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。
死信交换机DLX
队列中的消息在以下三种情况下会变成死信 (1)消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false; (2)消息的过期时间到期了; (3)队列长度限制超过了。 当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key.
@Bean
public Queue lindQueue()
return QueueBuilder.durable(LIND_QUEUE)
.withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//设置死信交换机
.withArgument("x-message-ttl", makeCallExpire)
.withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey
.build();
实现的过程
完整的代码
@Component
public class AmqpConfig
/**
* 主要测试一个死信队列,功能主要实现延时消费,原理是先把消息发到正常队列,
* 正常队列有超时时间,当达到时间后自动发到死信队列,然后由消费者去消费死信队列里的消息.
*/
public static final String LIND_EXCHANGE = "lind.exchange";
public static final String LIND_DL_EXCHANGE = "lind.dl.exchange";
public static final String LIND_QUEUE = "lind.queue";
public static final String LIND_DEAD_QUEUE = "lind.queue.dead";
public static final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange";
/**
* 单位为微秒.
*/
@Value("$tq.makecall.expire:60000")
private long makeCallExpire;
/**
* 创建普通交换机.
*/
@Bean
public TopicExchange lindExchange()
return (TopicExchange) ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true)
.build();
/**
* 创建死信交换机.
*/
@Bean
public TopicExchange lindExchangeDl()
return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true)
.build();
/**
* 创建普通队列.
*/
@Bean
public Queue lindQueue()
return QueueBuilder.durable(LIND_QUEUE)
.withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//设置死信交换机
.withArgument("x-message-ttl", makeCallExpire)
.withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey
.build();
/**
* 创建死信队列.
*/
@Bean
public Queue lindDelayQueue()
return QueueBuilder.durable(LIND_DEAD_QUEUE).build();
/**
* 绑定死信队列.
*/
@Bean
public Binding bindDeadBuilders()
return BindingBuilder.bind(lindDelayQueue())
.to(lindExchangeDl())
.with(LIND_DEAD_QUEUE);
/**
* 绑定普通队列.
*
* @return
*/
@Bean
public Binding bindBuilders()
return BindingBuilder.bind(lindQueue())
.to(lindExchange())
.with(LIND_QUEUE);
/**
* 广播交换机.
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange()
return new FanoutExchange(LIND_FANOUT_EXCHANGE);
//-----------------
@Component
public class Publisher
@Autowired
private RabbitTemplate rabbitTemplate;
public void publish(String message)
try
rabbitTemplate
.convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_DELAY_QUEUE,
message);
catch (Exception e)
e.printStackTrace();
//-----------------
@Component
@Slf4j
public class Subscriber
@RabbitListener(queues = AmqpConfig.LIND_QUEUE)
public void customerSign(String data)
try
log.info("从队列拿到数据 :", data);
catch (Exception ex)
e.printStackTrace();
以上是关于RabbitMQ 死信队列 定时队列 延时队列的主要内容,如果未能解决你的问题,请参考以下文章