RabbitMQ队列延迟

Posted ly-0919

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ队列延迟相关的知识,希望对你有一定的参考价值。

 

RabbitMQ队列延迟

1. 场景:

“订单下单成功后,15分钟未支付自动取消”  
1.传统处理超时订单
     采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,
     并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,
     即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,
     然后再做其他的业务操作
   2.rabbitMQ延时队列方案
     一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,
     并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失

2. TTL和DLX

   rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换器(DLX)和设置过期时间(TTL)结合起来实现延迟队列
   1.TTL
     TTL是Time To Live的缩写, 也就是生存时间。
     RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
     如果两种方式一起使用消息对TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
     默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。
     设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
     设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。
   2.DLX和死信队列
     DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。
     死信队列是指队列(正常)上的消息(过期)变成死信后,能够后发送到另外一个交换机(DLX),然后被路由到一个队列上,
     这个队列,就是死信队列
     成为死信一般有以下几种情况:
     消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
     消息的TTL-存活时间已经过期
     队列长度限制被超越(队列满)
    
     注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,
     注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明
          x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键
   

3. 延迟队列

   通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费
  
   注1:延迟队列(即死信队列)产生流程见“images/01 死信队列产生流程.png”

4. 开发步骤

   1.生产者创建一个正常消息,并添加消息过期时间/死信交换机/死信路由键这3个参数
     关键代码1
     new Queue(name, durable, exclusive, autoDelete, arguments);
     new Queue(NORMAL_QUEUE, true, false, false, map)
     参数说明:
     name:队列名字
     durable:true则持久队列
     exclusive:如果我们声明一个排他队列(该队列将仅由声明者的连接使用),则为true
     autoDelete:服务器不再使用时应删除队列,则为true
     arguments:用于声明队列的参数
       map.put("x-message-ttl", 10000);//message在该队列queue的存活时间最大为10秒
       map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
       map.put("x-dead-letter-routing-key", DELAY_ROUTING_KEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键
     关键代码2
     new DirectExchange(NORMAL_EXCHANGE, true, false);
   2.消费者A
     正常情况下,由消费者A去消费队列“normal-queue”中的消息,但实际上没有,而是等消息过期
   3.消费者B
     消息过期后,变成死信,根据配置会被投递到DLX,然后根据死信路由键投到死信队列(即延时队列)中
 

5. 子模块间共享Model

   1.创建公共子模块common
     添加公共的JavaBean对象,并使用lombok简化代码
     @Data:会为类的所有属性自动生成setter/getter、equals、canEqual、hashCode、toString方法
     @NoArgsConstructor:无参构造器
     @AllArgsConstructor:全参构造器
    
   2.主模块
     <!-- 1.packaging模式改为pom -->
     <packaging>pom</packaging>
     <!-- 2.添加子模块 -->
     <modules>
        <module>rabbitmq-provider</module>
        <module>rabbitmq-consumer</module>
        <module>common</module>
     </modules>
    
   3.各子模块
     <!-- 1.packaging模式改为jar -->
     <packaging>jar</packaging>
   4.配置公共common模块
  
   在主模块的POM的<dependencies>中添加公共子模块common
   <dependencies>
     <!--添加子模块common-->
     <dependency>
       <groupId>com.zking</groupId>
       <artifactId>common</artifactId>
       <version>0.0.1-SNAPSHOT</version>
     </dependency>
     ...
   </dependencies>
 

看代码

创建一个工程rabbitmq03  ,普通maven项目
pom.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4   <modelVersion>4.0.0</modelVersion>
 5   <parent>
 6     <groupId>org.springframework.boot</groupId>
 7     <artifactId>spring-boot-starter-parent</artifactId>
 8     <version>2.2.2.RELEASE</version>
 9     <relativePath/> <!-- lookup parent from repository -->
10   </parent>
11   <groupId>com.yuan</groupId>
12   <artifactId>rabbitmq03</artifactId>
13   <version>0.0.1-SNAPSHOT</version>
14   <name>rabbitmq03</name>
15   <packaging>pom</packaging>
16   <description>Demo project for Spring Boot</description>
17 
18   <properties>
19     <java.version>1.8</java.version>
20   </properties>
21 
22   <modules>
23     <module>rabbitmq-provider</module>
24     <module>rabbitmq-consumer</module>
25   </modules>
26 
27   <dependencies>
28     <dependency>
29       <groupId>org.springframework.boot</groupId>
30       <artifactId>spring-boot-starter-amqp</artifactId>
31     </dependency>
32     <dependency>
33       <groupId>junit</groupId>
34       <artifactId>junit</artifactId>
35       <scope>test</scope>
36     </dependency>
37     <dependency>
38       <groupId>org.springframework.boot</groupId>
39       <artifactId>spring-boot-starter-web</artifactId>
40     </dependency>
41 
42     <dependency>
43       <groupId>org.projectlombok</groupId>
44       <artifactId>lombok</artifactId>
45       <version>1.18.10</version>
46       <scope>provided</scope>
47     </dependency>
48 
49   </dependencies>
50 
51   <build>
52     <plugins>
53       <plugin>
54         <groupId>org.springframework.boot</groupId>
55         <artifactId>spring-boot-maven-plugin</artifactId>
56       </plugin>
57     </plugins>
58   </build>
59 
60 </project>

 

创建生产者模块rabbitmq-provider
技术图片

 

 

pom.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5 
 6     <parent>
 7         <groupId>com.yuan</groupId>
 8         <artifactId>rabbitmq03</artifactId>
 9         <version>0.0.1-SNAPSHOT</version>
10     </parent>
11     <artifactId>rabbitmq-provider</artifactId>
12     <version>0.0.1-SNAPSHOT</version>
13     <name>rabbitmq-provider</name>
14     <description>子模块-生产者</description>
15     <packaging>jar</packaging>
16 </project>

QueueDelayConfig

 1 package com.yuan.rabbitmqprovider.rabbitmq;
 2 
 3 
 4 import org.springframework.amqp.core.Binding;
 5 import org.springframework.amqp.core.BindingBuilder;
 6 import org.springframework.amqp.core.DirectExchange;
 7 import org.springframework.amqp.core.Queue;
 8 import org.springframework.context.annotation.Bean;
 9 import org.springframework.context.annotation.Configuration;
10 
11 import javax.lang.model.element.NestingKind;
12 import java.util.HashMap;
13 import java.util.Map;
14 
15 @Configuration
16 public class QueueDelayConfig {
17 
18     /**
19      * 定义正常的队列、交换机、路由键
20      */
21     public static final String NORMAL_QUEUE="normal-queue";
22     public static final String NORMAL_EXCHANGE="normal-exchange";
23     public static final String NORMAL_ROUTINGKEY="normal-routingkey";
24 
25     /**
26      * 定义死信的队列、交换机、路由键
27      */
28     public static final String DELAY_QUEUE="delay-queue";
29     public static final String DELAY_EXCHANGE="delay-exchange";
30     public static final String DELAY_ROUTINGKEY="delay-routingkey";
31 
32 
33     /**
34      * 定义正常队列
35      * @return
36      */
37     @Bean
38     public Queue normalQueue(){
39         //设定消息过期时间/死信交换机/死信路由键3个参数
40         Map<String, Object> map = new HashMap<String, Object>();
41         map.put("x-message-ttl", 15000);//message在该队列queue的存活时间最大为15秒
42         map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
43         map.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键
44 
45         return new Queue(NORMAL_QUEUE, true, false, false, map);
46     }
47 
48     @Bean
49     public DirectExchange normalExchange(){
50         return new DirectExchange(NORMAL_EXCHANGE, true, false);
51     }
52 
53     @Bean
54     public Binding normalRoutingkey(){
55         return BindingBuilder.bind(normalQueue())
56                 .to(normalExchange())
57                 .with(NORMAL_ROUTINGKEY);
58     }
59 
60 
61     /**
62      * 定义死信队列
63      */
64     @Bean
65     public Queue delayQueue(){
66         return new Queue(DELAY_QUEUE, true);
67     }
68 
69     @Bean
70     public DirectExchange delayExchange(){
71         return new DirectExchange(DELAY_EXCHANGE);
72     }
73 
74     @Bean
75     public Binding delayRoutingkey(){
76         return BindingBuilder.bind(delayQueue())
77                 .to(delayExchange())
78                 .with(DELAY_ROUTINGKEY);
79     }
80 
81 
82 
83 
84 }

SendController

 1 package com.yuan.rabbitmqprovider.controller;
 2 
 3 
 4 import com.yuan.rabbitmqprovider.rabbitmq.QueueDelayConfig;
 5 import lombok.extern.slf4j.Slf4j;
 6 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 import org.springframework.web.bind.annotation.RequestMapping;
 9 import org.springframework.web.bind.annotation.RestController;
10 
11 import java.time.LocalDateTime;
12 import java.time.format.DateTimeFormatter;
13 import java.util.HashMap;
14 import java.util.Map;
15 
16 @RestController
17 @Slf4j
18 public class SendController  {
19 
20     @Autowired
21     private RabbitTemplate rabbitTemplate;
22 
23     @RequestMapping("/sender")
24     public Map<String, Object> sender(){
25         Map<String, Object> data = this.createData();
26 
27         rabbitTemplate.convertAndSend(QueueDelayConfig.NORMAL_EXCHANGE,
28                 QueueDelayConfig.NORMAL_ROUTINGKEY,data);
29         Map<String, Object> result = new HashMap<String, Object>();
30     result.put("msg","OK");
31     result.put("code","1");
32     return result;
33     }
34 
35 
36 
37     private Map<String, Object> createData(){
38         Map<String, Object> map = new HashMap<String, Object>();
39 
40         String date = LocalDateTime.now().format(DateTimeFormatter.BASIC_ISO_DATE.
41                 ofPattern("yyyy-MM-dd HH:mm:ss"));
42         map.put("msg","hello rabbitmq!!");
43         map.put("success",true);
44         map.put("createdate", date);
45 
46 
47         return map;
48     }
49 
50 
51 
52 }

 

最后配置一下yml文件
 1 server:
 2   port: 8081
 3   servlet:
 4     context-path: /rabbitmq-provider
 5 spring:
 6   rabbitmq:
 7     virtual-host: /
 8     username: guest
 9     password: guest
10     host: 192.168.238.129
11     port: 5672

 

创建消费者模块rabbitmq-consumer
pom.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5     <parent>
 6         <groupId>com.yuan</groupId>
 7         <artifactId>rabbitmq03</artifactId>
 8         <version>0.0.1-SNAPSHOT</version>
 9     </parent>
10     <artifactId>rabbitmq-consumer</artifactId>
11     <version>0.0.1-SNAPSHOT</version>
12     <name>rabbitmq-consumer</name>
13     <description>子模块-消费者</description>
14     <packaging>jar</packaging>
15 </project>

 

QueueRecevier
 1 package com.yuan.rabbitmqconsumer.controller;
 2 
 3 import lombok.extern.slf4j.Slf4j;
 4 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 5 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 6 import org.springframework.stereotype.Component;
 7 
 8 import java.util.Map;
 9 
10 @Component
11 @Slf4j
12 @RabbitListener(queues = {"delay-queue"})  //消费端监听队列,如果delay-queue死信队列中有消息过来就会被消费掉
13 public class QueueRecevier {
14 
15     @RabbitHandler
16     public void handlerMessage(Map<String, Object> data){
17         log.info("QueueRecevier.handlerMessage,data={}",data);
18     }
19 
20 
21 
22 
23 }

标红处的log使用需要下载一个插件Lombok

技术图片

 

 

直接右边install, 然后重启idea

 

yml文件配置
 1 server:
 2   port: 8082
 3   servlet:
 4     context-path: /rabbitmq-consumer
 5 spring:
 6   rabbitmq:
 7     virtual-host: /
 8     username: guest
 9     password: guest
10     host: 192.168.238.129
11     port: 5672

 

启动生产者,访问http://localhost:8081/rabbitmq-provider/sender  发送请求。
 
 
技术图片
 
生产端推送消息到正常队列等待被消费,我们设定的过期时间是15秒,,,
技术图片

 

 

技术图片

 

 启动消费端,消费端会根据我们设定的监听去监听队列中是否有消息有则会被消费掉。。

技术图片

 

6. json转换

   1.生产者
     @Bean
     public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);//指定json转换器
        return rabbitTemplate;
     }
   2.消费者
     @Bean
     public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jackson2JsonMessageConverter);
        return factory;
    
}
 
创建公共子模块common-vo
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5     <parent>
 6         <groupId>com.yuan</groupId>
 7         <artifactId>rabbitmq03</artifactId>
 8         <version>0.0.1-SNAPSHOT</version>
 9     </parent>
10     <artifactId>common-vo</artifactId>
11     <version>0.0.1-SNAPSHOT</version>
12     <name>common-vo</name>
13     <packaging>jar</packaging>
14     <description>公共子模块</description>
15 
16 
17 
18 </project>

 

创建一个model的Package,创建一个Order

package com.yuan.commonvo.model;

import lombok.Data;

import java.lang.reflect.ParameterizedType;
import java.util.Date;


@Data
public class Order {

    private  long orderId;
    private  String orderNo;
    private Date createdate;


}

 

vo包下创建一个OrderVo

 
package com.yuan.commonvo.vo;

import com.yuan.commonvo.model.Order;


public class OrderVo extends Order {

}

 

完了之后在父模块中添加common-vo子模块的一个pom依赖

<modules>
    <module>rabbitmq-provider</module>
    <module>rabbitmq-consumer</module>
    <module>common-vo</module>
  </modules>


<dependency>
<groupId>com.yuan</groupId>
<artifactId>common-vo</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

 

修改生产者SendController

 @RequestMapping("/sender")
    public Map<String, Object> sender(){
//        Map<String, Object> data = this.createData();

        OrderVo orderVo = new OrderVo();
        orderVo.setOrderId(1);
        orderVo.setOrderNo("P001");

        rabbitTemplate.convertAndSend(QueueDelayConfig.NORMAL_EXCHANGE,
                QueueDelayConfig.NORMAL_ROUTINGKEY,orderVo);
        Map<String, Object> result = new HashMap<String, Object>();
    result.put("msg","OK");
    result.put("code","1");
    return result;
    }

 

添加QueueProviderMessageConvert

package com.yuan.rabbitmqprovider.rabbitmq;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueProviderMessageConvert {

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate=new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}

}

 

 修改消费端QueueRecevier

package com.yuan.rabbitmqconsumer.controller;


import com.yuan.commonvo.vo.OrderVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RabbitListener(queues = {"delay-queue"})  //消费端监听队列,如果delay-queue死信队列中有消息过来就会被消费掉
public class QueueRecevier {

    @RabbitHandler
    public void handlerMessage(OrderVo orderVo){
        log.info("QueueRecevier.handlerMessage,data={}",orderVo);
    }




}

 

添加消费端QueueRecevierMessageConvert

 
package com.yuan.rabbitmqconsumer.controller;


import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueRecevierMessageConvert {

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate=new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}

}

 

 测试:

技术图片

 

 

以上是关于RabbitMQ队列延迟的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq死信队列及延迟队列

SpringBoot RabbitMQ 延迟队列代码实现

SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)

RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列代码实现

RabbitMQ-消息可靠性&延迟消息

Docker版RabbitMQ安装延迟队列插件及延迟队列项目应用实战