RabbitMQ项目使用之死信队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ项目使用之死信队列相关的知识,希望对你有一定的参考价值。

参考技术A 消息消费失败处理方式:

一 进入死信队列(进入死信的三种方式)

1.消息被拒绝(basic.reject or basic.nack)并且requeue=false

2.消息TTL过期过期时间

3.队列达到最大长度

DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理, 这个特性可以弥补R abbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

rabbitmq的三种模式:

一. Fanout Exchange  广播

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。Fanout Exchange  不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。

二. Direct Exchange  点对点

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

三. Topic Exchange  模糊匹配

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。所以,Topic Exchange 使用非常灵活。

mq也支持重发机制:

rabbitmq的消息确认机制分两部分一部分是生产端,一部分是消费端生产端有两种选择,transaction   和   confirm。confirm  的性能要好于transaction

//transaction 机制

channel.txSelect();

String msg ="msg  test !!!";

for(inti=0;i<10000;i++)

   msg = i+" : msg  test !!!";

  channel.basicPublish(EXCHAGE, QUEUE_NAME,null,msg.getBytes());

  System.out.println("publish msg "+msg);

  if(i>0&&i%100==0)

    //批量提交

    channel.txCommit();

   

  // 若出现异常 进行 channel.txRollback(),对相应批次的msg进行重发或记录

channel.txCommit();

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xsi:schemaLocation="

        http://www.springframework.org/schema/beans

        http://www.springframework.org/schema/beans/spring-beans.xsd

        http://www.springframework.org/schema/rabbit

         http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

<rabbit:connection-factoryid="connectionFactory"

    host="10.153.25.15"username="insurance"password="insurance"port="5672"/>

<rabbit:adminconnection-factory="connectionFactory"/>

<rabbit:queueid="queue_insurance"durable="true"auto-delete="false"

        exclusive="false"name="queue_insurance">正常队列当中指向死信

<rabbit:queue-arguments>

        <entrykey="x-message-ttl">设置超时

            <valuetype="java.lang.Long">30000

        </entry>

        <entrykey="x-dead-letter-exchange">指定交换机

            <valuetype="java.lang.String">alter</value>

        </entry>

  </rabbit:queue-arguments>

</rabbit:queue>

<rabbit:queueid="alter_queue"durable="true"auto-delete="false"exclusive="false"name="alter_queue"/>死信队列

<rabbit:direct-exchangename="alter"
        durable="true"auto-delete="false"id="alter">死信交换机

        <rabbit:bindings>

             <rabbit:bindingqueue="alter_queue"key="queue_key_insurance"/>

        </rabbit:bindings>
< /rabbit:direct-exchange>

<rabbit:direct-exchangename="exchange_insurance"
      durable="true"auto-delete="false"id="exchange_insurance">正常交换机

     <rabbit:bindings>

           <rabbit:bindingqueue="queue_insurance"key="queue_key_insurance"/>

     </rabbit:bindings>
< /rabbit:direct-exchange>

<!-- (5)客户端投递消息到exchange。 -->

<rabbit:templateid="amqpTemplate"exchange="exchange_insurance"

      connection-factory="connectionFactory"/>

</beans>

消费者配置:

<?xml version="1.0"encoding="UTF-8"?>

<beansxmlns="http://www.springframework.org/schema/beans"

     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

      xmlns:rabbit="http://www.springframework.org/schema/rabbit"

      xsi:schemaLocation="

              http://www.springframework.org/schema/beans

              http://www.springframework.org/schema/beans/spring-beans.xsd

              http://www.springframework.org/schema/rabbit

               http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

<!-- 连接服务配置 -->
<rabbit:connection-factoryid="connectionFactory2"host="10.153.25.15"

               username="insurance"password="insurance"port="5672"/>

<rabbit:adminconnection-factory="connectionFactory2"/>

<!-- queue 队列声明 -->
<!-- queue 队列声明  name 队里的额name 是关联生产表和消费表的为唯一线索  -->

<rabbit:queueid="queue_insurance"name="queue_insurance">

        <rabbit:queue-argumentsvalue-type="java.lang.Long">

             <entrykey="x-message-ttl"value="30000"/>

        </rabbit:queue-arguments>

</rabbit:queue>

<!-- 定义消费者监听器 -->

<!-- 创建一个bean实例,bean实例中声明处理请求的类 -->

<beanid="consumerLitener2"class="com.insurance.mq.CommissionController"></bean>
<rabbit:listener-container connection-factory="connectionFactory2"acknowledge="auto"concurrency="8">
<!-- queues属性从那个队列中接收消息,ref属性是当存在消息是使用哪个类去处理 -->
< rabbit:listenerqueues="queue_insurance"ref="consumerLitener2"/>
</rabbit:listener-container>

</beans>

rabbitmq死信队列及延迟队列

RabbitMQ死信队列及延迟队列

新手笔记,存在错误还请指正

尚硅谷RabbitMQ
rabbitmq官方文档

RabbitMQ死信队列及延迟队列

死信队列:

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息。进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时
间未支付时自动失效。
死信的来源
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

死信的例子

消息 TTL 过期


生产者代码:


/**
 * 死信队列,生产者
 */
public class Producer 
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception
        Channel channel= (Channel) RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
         //设置死信消息的 TTL 时间   10s
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties().builder().expiration("10000").build();
         //该信息是用作演示队列个数限制
        for (int i = 1; i <11 ; i++) 
            String message="info"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
            System.out.println("生产者发送消息:"+message);
        

    


消费者01代码

/**
 * 死信队列
 * 消费者1
 */
public class Consumer01 
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
       //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定死信交换机与 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
//        //过期时间
//        params.put("x-message-ttl",10000);//10s
      //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
     //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        //声明正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        //普通队列绑定普通交换机与 routingkey
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");


        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) ->
        String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息"+message);
        ;
        channel.basicConsume(normalQueue,true,deliverCallback,consumerTag -> );

    



消费者 2 代码(以上步骤完成后 启动 2 消费者 它消费死信队列里面的消息)


/**
 * 死信队列
 * 消费者2
 */
public class Consumer02 
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) ->
        String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息"+message);
        ;
        channel.basicConsume(deadQueue,true,deliverCallback,consumerTag -> );

    


队列达到最大长度

1、 消息生产者代码去掉 TTL 属性
2、1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)


     //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        //声明正常队列长度限制
        params.put("x-max-length",6);

注意此时需要把原先队列删除 因为参数改变了.
3. 2 消费者代码不变(启动 2 消费者)

消息被拒

1.消息生产者代码同上生产者一致
2.1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)



/**
 * 死信队列
 * 消费者1
 */
public class Consumer01 
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
       //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定死信交换机与 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
//        //过期时间
//        params.put("x-message-ttl",10000);//10s
      //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
     //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        //声明正常队列长度限制
       // params.put("x-max-length",6);
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        //普通队列绑定普通交换机与 routingkey
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");


        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) ->
        String msg = new String(delivery.getBody(), "UTF-8");
        if(msg.equals("info5"))
        
            System.out.println("Consumer01 接收到消息"+msg+"此消息是被c1拒绝的");
            //拒绝该消息(不放回普通队列)/requeue 设置为 false 代表拒绝重新入队
            channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);
        else
            System.out.println("Consumer01 接收到消息"+msg);
        

        ;
        //开启手动应答
        channel.basicConsume(normalQueue,false,deliverCallback,consumerTag -> );

    


延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的
元素的队列。

延迟队列使用场景

1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
即:需要在某个事件发生之后或者之前的指定时间点完成某一项任务。
如:
发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎
使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果
数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,
如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支
付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十
分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万
级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单
的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

延迟队列测试

引入的依赖:

<!--amqp-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
				<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.76</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<!--swagger-->
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger2</artifactId>
			<version>2.9.2</version>
		</dependency>
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger-ui</artifactId>
			<version>2.9.2</version>
		</dependency>
		<!--RabbitMQ 测试依赖-->
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>

properties配置文件:

spring.rabbitmq.host=xxx.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.mvc.pathmatch.matching-strategy=ant_path_matcher  #springboot版本过高时报空指针错误,加上这句话

添加Swagger 配置类

package com.atguigu.rabbitmq.Config;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig 
    public Docket webApiConfig()
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
           
       private ApiInfo webApiInfo()
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("enjoy6288", "http://atguigu.com","1551388580@qq.com"))
                 .build();
    



代码架构

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:

配置文件类代码

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * TTL队列,配置文件类代码
 */
@Configuration
public class TtlQueueConfig 
    //普通交换机的名称
    public static final String X_EXCHANGE = "X";

    //普通队列的名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B="QB";

    //死信交换机的名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //死信队列的名称
    public static final String DEAD_LETTER_QUEUE="QD";

    // 声明xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() 
        return new DirectExchange(X_EXCHANGE);
    
    // 声明yExchange
    @Bean("yExchange")
    public DirectExchange yExchange()
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    
   //声明队列Attl为10s并绑定到对应的死信交换机
   @Bean("queueA")
   public Queue queueA()
       Map<String,Object> args=new HashMap<>(3);
        //声明当前队列绑定的死信交换机
       args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由key
       args.put("x-dead-letter-routing-key","YD");
        //声明队列的TTL
       args.put("x-message-ttl",10000);
       return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    

    // 声明队列A绑定X交换机
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA")Queue queueA, @Qualifier("xExchange")DirectExchange xExchange) 
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    
    //声明队列Bttl为40s并绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB()
        Map<String,Object>args=new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key","YD");
        //声明队列的TTL
        args.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    
    //声明队列B绑定X交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB")Queue queue1B,@Qualifier("xExchange")DirectExchange xExchange)
        return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
   

    //声明死信队列QD
    @Bean("queueD")
    public Queue queueD()
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    
    //声明死信队列QD绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange")DirectExchange yExchange)
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    



消息生产者代码


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

/**
 *  * @author lenovo
 * 发送延迟消息
 * http:/localhost:8080/ttl/sendMsg/嘻嘻嘻
 */
@Slf4j
@RequestMapping("/ttl")
@RestController
public class SendMsgController
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //开始发消息
    @GetMapping("/sendMsg/message")
    public void sendMsg(@PathVariable String message)
    
        log.info("当前时间:,发送一条信息给两个TTL队列:", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10S的队列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40S的队列: " + message);

    




消息消费者代码

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

/**
 * 队列TTL  消费者
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer 

    @RabbitListener(queues="QD")
    public void receiveD(Message message, Channel channel) throws IOException 
        String msg = new String(message.getBody());
        log.info("当前时间:,收到死信队列信息", new Date().toString(), msg);
    



启动服务发送消息

http://localhost:8080/ttl/sendMsg/嘻嘻嘻

延迟队列优化

新增了一个队列QC,绑定关系如下,该队列不设置TTL时间

配置文件类代码修改

@Configuration
public class TtlQueueConfig 
    //普通交换机的名称
    public static final String X_EXCHANGE = "X";

    //普通队列的名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B="QB";
    //增加普通·队列qc
    public static final String QUEUE_C="QC";
    ......
    ......
    ......
 //声明队列C死信交换机
    @Bean("queueC")
    public Queue queueC()
        Map<String,Object>args=new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key","YD");
        //没有声明TTL属性
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    
   //声明队列C绑定X交换机
   @Bean
   public Binding queuecBindingX(@Qualifier("queueC")Queue queue1C,@Qualifier("xExchange")DirectExchange xExchange)
       return BindingBuilder.bind(queue1C).to(xExchange).with("XC");
   

消息生产者代码

@GetMapping("sendExpirationMsg/message/ttlTime")
    public void sendMsg (@PathVariable String  message,@PathVariable String ttlTime) 
        rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->
        
            //发送消息的时候延迟时长
            correlationData.getMessageProperties().setExpiration(ttlTime);
                return correlationData;
         );
        log.info("当前时间:,发送一条时长(毫秒TTL信息给队列c: ", new Date(), ttlTime, message);
    

发起请求:

http://localhost:8080/ttl/sendExpirationMsg/你好1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好2/2000


看起来似乎没什么问题,但是如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。即队列先进先出的特性,耗时短的消息仍在耗时长的消息之后排队。这个特性的不足可以使用插件进行了弥补。

基于插件实现的延迟队列

如果不能实现在消息粒度上的TTL,并使其在设置的TTL时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢->rabbitmq_delayed_message_exchange插件(基于插件的延迟队列实现是在交换机部分,所以可以解决上述问题)
官网下载地址
下载好后解压放置到RabbitMQ的插件目录。
linux下是进入RabbitMQ的安装目录下的plgins目录,执行下面命令让该插件生效,然后重

RabbitMQ /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-pluginsenablerabbitmq_delayed_message_exchange


代码架构图

新增了一个队列delayed.queue,一个自定义交换机delayed.exchange,绑定关系如下:
配置文件类代码
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 基于插件的延迟队列配置
 */
@Configuration
public class DelayedQueueConfig 

    //队列
    public static final String DELAYED_QUEUE_NAME="delayed.queue";
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY="delayed.routingkey";
    @Bean
    public Queue delayedQueue()
        return new Queue(DELAYED_QUEUE_NAME);
    
    //自定义交换机我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange()
    
        Map<String,Object> args=new HashMap<>();//自定义交换机的类型
        args.put("x-delayed-type","direct");

        /**
         *     1.交换机的名称
         *     2.交换机的类型
         *     3.是否需要持久化
         *     4.是否需要自动删除
         *     5.其他参数
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args);
    

    //绑定
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue")Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange)
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    


消息生产者代码

/**
 * 基于插件的消息及延迟
 */
      public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
      public static final String DELAYED_ROUTING_KEY="delayed.routingkey";
    @GetMapping("sendDelayMsg/message/delayTime")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime)
          rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME,DELAYED_ROUTING_KEY,message,correlationData->
            correlationData.getMessageProperties().setDelay(delayTime);
            return correlationData;
        );
        log.info("当前时间:, 发送一条延迟毫秒的信息给延迟队列", new Date(), delayTime, message);
    

消息消费者代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 消费者,基于插件的延迟消息
 */
@Slf4j
@Component
public class DelayQueueConsumer 
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //监听消息
    @RabbitListener(queues=DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) 
        String msg = new String(message.getBody());
        log.info("当前时间:,收到延时队列的消息:", new Date().toString(), msg);
    



发起请求:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000 
http://localhost:8080/ttl/sendDelayMsg/comeonbaby2/2000

第二个消息被先消费掉了,符合预期.

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失.
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz 或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景.

以上是关于RabbitMQ项目使用之死信队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习(中)——交换机死信队列和延迟队列

RabbitMq高级特性之死信队列 通俗易懂 超详细 内含案例

springBoot集成rabbitmq 之延时(死信)队列

RabbitMQ之死信队列

SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)

RabbitMQ之过期时间(TTL)