Rabbitmq通过死信队列实现过期监听

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq通过死信队列实现过期监听相关的知识,希望对你有一定的参考价值。

参考技术A 上一篇 <<< Rabbitmq的重试策略
下一篇 >>> Rabbitmq解决分布式事务思路

推荐阅读:
<<< 消息中间件的核心思想
<<< 消息中间件常见问题汇总
<<< 基于Netty简单手写消息中间件思路
<<< 消息队列常用名词与中间件对比
<<< Rabbitmq基础知识
<<< Rabbitmq示例之点对点简单队列
<<< Rabbitmq示例之工作(公平)队列
<<< Rabbitmq示例之发布订阅模式
<<< Rabbitmq示例之路由模式Routing
<<< Rabbitmq示例之通配符模式Topics
<<< Rabbitmq示例之RPC模式
<<< Rabbitmq队列模式总结
<<< Rabbitmq如何保证消息不丢失
<<< Springboot利用AmqpTemplate整合Rabbitmq
<<< Rabbitmq如何保证幂等性
<<< Rabbitmq的重试策略
<<< Rabbitmq解决分布式事务思路
<<< Rabbitmq解决分布式事务demo
<<< Rabbitmq环境安装
<<< Kafka中的专业术语都有哪些
<<< Kafka的设计原理介绍
<<< Kafka集群如何实现相互感知
<<< Kafka如何实现分区及指定分区消费
<<< Kafka如何保证消息顺序消费
<<< Kafka如何保证高吞吐量
<<< Kafka集群环境搭建
<<< RocketMQ架构原理
<<< RocketMQ、RabbitMQ和Kafka的对比
<<< SpringBoot整合RocketMQ示例
<<< RocketMQ保证顺序消费demo
<<< RocketMQ如何动态扩容和缩容
<<< RocketMQ如何解决分布式事务
<<< RocketMQ单机版本安装
<<< RocketMQ集群环境程序启用相关知识点
<<< RocketMQ单机做主备实操
<<< RocketMQ所有配置说明

RabbitMQ实现延时队列(死信队列)

基于队列和基于消息的TTL

TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。

死信交换机DLX

队列中的消息在以下三种情况下会变成死信 (1)消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false; (2)消息的过期时间到期了; (3)队列长度限制超过了。 当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key.

 @Bean
  public Queue lindQueue() 
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//设置死信交换机
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey
        .build();
  

实现的过程

技术图片

完整的代码

@Component
public class AmqpConfig 
  /**
   * 主要测试一个死信队列,功能主要实现延时消费,原理是先把消息发到正常队列,
   * 正常队列有超时时间,当达到时间后自动发到死信队列,然后由消费者去消费死信队列里的消息.
   */
  public static final String LIND_EXCHANGE = "lind.exchange";
  public static final String LIND_DL_EXCHANGE = "lind.dl.exchange";
  public static final String LIND_QUEUE = "lind.queue";
  public static final String LIND_DEAD_QUEUE = "lind.queue.dead";

  public static final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange";
  /**
   * 单位为微秒.
   */
  @Value("$tq.makecall.expire:60000")
  private long makeCallExpire;

  /**
   * 创建普通交换机.
   */
  @Bean
  public TopicExchange lindExchange() 
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true)
        .build();
  

  /**
   * 创建死信交换机.
   */
  @Bean
  public TopicExchange lindExchangeDl() 
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true)
        .build();
  

  /**
   * 创建普通队列.
   */
  @Bean
  public Queue lindQueue() 
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//设置死信交换机
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey
        .build();
  

  /**
   * 创建死信队列.
   */
  @Bean
  public Queue lindDelayQueue() 
    return QueueBuilder.durable(LIND_DEAD_QUEUE).build();
  

  /**
   * 绑定死信队列.
   */
  @Bean
  public Binding bindDeadBuilders() 
    return BindingBuilder.bind(lindDelayQueue())
        .to(lindExchangeDl())
        .with(LIND_DEAD_QUEUE);
  

  /**
   * 绑定普通队列.
   *
   * @return
   */
  @Bean
  public Binding bindBuilders() 
    return BindingBuilder.bind(lindQueue())
        .to(lindExchange())
        .with(LIND_QUEUE);
  

  /**
   * 广播交换机.
   *
   * @return
   */
  @Bean
  public FanoutExchange fanoutExchange() 
    return new FanoutExchange(LIND_FANOUT_EXCHANGE);
  


//-----------------

@Component
public class Publisher 
  @Autowired
  private RabbitTemplate rabbitTemplate;

  public void publish(String message) 
    try 
      rabbitTemplate
          .convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_DELAY_QUEUE,
              message);
     catch (Exception e) 
      e.printStackTrace();
    
  


//-----------------

@Component
@Slf4j
public class Subscriber 
  @RabbitListener(queues = AmqpConfig.LIND_QUEUE)
  public void customerSign(String data) 
    try 

      log.info("从队列拿到数据 :", data);

     catch (Exception ex) 
          e.printStackTrace();
    
  

以上是关于Rabbitmq通过死信队列实现过期监听的主要内容,如果未能解决你的问题,请参考以下文章

利用rabbitMq的死信队列实现延时消息

RabbitMQ实现延时队列(死信队列)

RabbitMQ实现延时队列(死信队列)

RabbitMQ项目使用之死信队列

RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列代码实现

消息队列 - 死信、延迟、重试队列