RabbitMQ 实现消息队列延迟

Posted BUG弄潮儿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 实现消息队列延迟相关的知识,希望对你有一定的参考价值。

1.概述

要实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange插件。但RabbitMQ版本必须是3.5.8以上才支持该插件,否则得用其死信队列功能。

2.安装RabbitMQ延迟插件

  • 检查插件
    使用rabbitmq-plugins list命令用于查看RabbitMQ安装的插件。
rabbitmq-plugins list

检查RabbitMQ插件安装情况

  • 下载插件

如果没有安装插件,则直接访问官网进行下载

https://www.rabbitmq.com/community-plugins.html

  • 安装插件

下载后,将其拷贝到RabbitMQ安装目录的plugins目录;并进行解压,如:

E:\\software\\RabbitMQ Server\\rabbitmq_server-3.11.13\\plugins

打开cmd命令行窗口,如果系统已经配置RabbitMQ环境变量,则直接执行以下的命令进行安装;否则需要进入到RabbitMQ安装目录的sbin目录。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.实现RabbitMQ消息队列延迟功能

  • pom.xml配置信息文件中,添加相关依赖文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.olive</groupId>
	<artifactId>rabbitmq-spring-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.7</version>
		<relativePath />
	</parent>
	<dependencies>
		<!--rabbitmq-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		
	<dependency>
		    <groupId>org.eclipse.paho</groupId>
		    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
		    <version>1.2.5</version>
		</dependency>

	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
  • application.yml配置文件中配置RabbitMQ信息
server:
  port: 8080
spring:
  #给项目来个名字
  application:
    name: rabbitmq-spring-demo
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin123
    #虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的
    virtual-host: /
  • RabbitMQ配置类
package com.olive.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ配置类
 **/
@Configuration
public class RabbitMqConfig 
	
	public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";
	
	public static final String DELAY_QUEUE_NAME = "delay_queue_name";
	
	public static final String DELAY_ROUTING_KEY = "delay_routing_key";

	@Bean
	public CustomExchange delayExchange() 
		Map<String, Object> args = new HashMap<>();
		args.put("x-delayed-type", "direct");
		return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
	

	@Bean
	public Queue queue() 
		Queue queue = new Queue(DELAY_QUEUE_NAME, true);
		return queue;
	

	@Bean
	public Binding binding(Queue queue, CustomExchange delayExchange) 
		return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
	

  • 发送消息

实现消息发送,设置消息延迟5s。

package com.olive.service;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.olive.config.RabbitMqConfig;

/**
 * 消息发送者
 **/
@Service
public class CustomMessageSender 
	
	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void sendMsg(String msg) 
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println("消息发送时间:" + sdf.format(new Date()));
		rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, 
				RabbitMqConfig.DELAY_ROUTING_KEY, 
				msg, 
				new MessagePostProcessor() 
					@Override
					public Message postProcessMessage(Message message) throws AmqpException 
						// 消息延迟5秒
						message.getMessageProperties().setHeader("x-delay", 5000);
						return message;
					
				);
	

  • 接收消息
package com.olive.service;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.olive.config.RabbitMqConfig;

/**
 * 消息接收者
 **/
@Component
public class CustomMessageReceiver 
	
	@RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)
	public void receive(String msg) 
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println(sdf.format(new Date()) + msg);
		System.out.println("Receiver:执行取消订单");
	

  • 测试验证
package com.olive.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.olive.service.CustomMessageSender;

@RestController
public class DelayMessageController 
	
	@Autowired
	private CustomMessageSender customMessageSender;
	
	@GetMapping("/sendMessage")
	public String sendMessage() 
		// 发送消息
		customMessageSender.sendMsg("你已经支付超时,取消订单通知!");
		return "success";
	


发送消息,访问

http://127.0.0.1:8080/sendMessage

查看控制台打印的信息

rabbitmq的延迟消息队列实现

第一部分:延迟消息的实现原理和知识点

使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求。

消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:

技术分享图片

当上面的消息扔到队列中后,过了3分钟,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。

Dead Letter Exchanges

Exchage的概念在这里就不在赘述。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

2. 上面的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

实现延迟队列

延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。

技术分享图片

生产者输出消息到Queue1,并且这个消息是设置有有效时间的,比如3分钟。消息会在Queue1中等待3分钟,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到,处理延迟任务。

完成延迟任务的实现。

 第二部分:具体实现例子

1、新建立消息队列配置文件rabbitmq.properties

 1 #rabbitmq消息队列的属性配置文件properties
 2 rabbitmq.study.host=192.168.56.101
 3 rabbitmq.study.username=duanml
 4 rabbitmq.study.password=[email protected]
 5 rabbitmq.study.port=5672
 6 rabbitmq.study.vhost=studymq
 7 
 8 #Mail 消息队列的相关变量值
 9 mail.exchange=mailExchange
10 mail.exchange.key=mail_queue_key
11 
12 
13 #Phone 消息队列的相关变量值
14 phone.topic.key=phone.one
15 phone.topic.key.more=phone.one.more
16 
17 #delay 延迟消息队列的相关变量值
18 delay.directQueue.key=TradePayNotify_delay_2m
19 delay.directMessage.key=TradePayNotify_delay_3m

2、新建立配置文件,申明延迟队列相关的配置信息如:spring-rabbigmq-dlx.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 5        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
 6 
 7     <!--利用rabbitmq的TTL和延迟队列,实现延迟通知任务的例子
 8         1、申明了一个订单通知服务的队列  queue_Notify
 9         2、申明了一个延迟队列Notify_delay_15s,给整个队列设置消息过期时间 为15秒  ——————》 queue ttl  例子
10         3、申明了一个延迟队列Notify_delay_30s  给发送到这个队列的消息,消息本身设置过期时间 ————————》  message ttl  例子
11         4、当消息发送到2、3队列的时候,达到了过期时间,即转发到订单通知服务工作队列 1、
12         5、给队列1 配置消费者服务工作监听,即可完成延迟任务的结果。
13     -->
14 
15     <!-- ################ 订单通知服务消费者配置 ################ -->
16     <!--队列声明-->
17     <rabbit:queue id="queue_Notify" name="queue_Notify" durable="true" auto-delete="false" exclusive="false"/>
18 
19     <!-- 订单通知服务消费者 exchange -->
20     <rabbit:direct-exchange name="trade_direct" durable="true" auto-delete="false">
21         <rabbit:bindings>
22             <rabbit:binding queue="queue_Notify" key="TradePayNotify"/>
23         </rabbit:bindings>
24     </rabbit:direct-exchange>
25 
26     <!-- 订单通知监听处理器 -->
27     <bean id="notifyConsumerListener" class="org.seckill.rabbitmqListener.notify.NotifyConsumerListener"/>
28     <!--订单消息队列确认回调-->
29     <bean id="notifyConfirmCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyConfirmCallBackListener"></bean>
30     <!--订单消息队列消息发送失败回调-->
31     <bean id="notifyFailedCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyFailedCallBackListener"></bean>
32 
33     <!-- 监听器acknowledge=manual表示手工确认消息已处理(异常时可以不确认消息),auto表示自动确认(只要不抛出异常,消息就会被消费) -->
34     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
35         <rabbit:listener queues="queue_Notify" ref="notifyConsumerListener"/>
36     </rabbit:listener-container>
37 
38     <!--*****************************************分割线*********************************************************-->
39 
40     <!-- ################ 延迟队列生产者配置 ################ -->
41     <rabbit:template id="rabbitTemplateDelay" mandatory="true" exchange="trade_direct_delay"
42                      connection-factory="connectionFactory"
43                      confirm-callback="notifyConfirmCallBackListener"
44                      return-callback="notifyFailedCallBackListener"
45                      message-converter="jsonMessageConverter"/>
46 
47     <!--配置生产消息的延迟队列操作主体类-->
48     <bean id="delayMQProducerImpl" class="org.seckill.utils.rabbitmq.Impl.MQProducerImpl">
49         <property name="rabbitTemplate" ref="rabbitTemplateDelay"></property>
50     </bean>
51 
52     <!--申明一个延迟队列,给整个队列的消息设置消息过期时间 x-message-ttl 2分钟
53         当消息达到过期时间的时候,rabbitmq将会把消息重新定位转发到其它的队列中去,本例子转发到
54         exchange:trade_direct
55         routing-key:TradePayNotify
56         满足如上两点的队列中去即为:queue_Notify
57     -->
58     <rabbit:queue id="Notify_delay_2m" name="Notify_delay_2m" durable="true" auto-delete="false"
59                   exclusive="false">
60         <rabbit:queue-arguments>
61             <entry key="x-message-ttl" value="120000" value-type="java.lang.Long"/>
62             <entry key="x-dead-letter-exchange" value="trade_direct"/>
63             <entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
64         </rabbit:queue-arguments>
65     </rabbit:queue>
66 
67     <!--申明一个延迟队列,在发送消息的时候给消息设置过期时间 3分钟
68            当消息达到过期时间的时候,rabbitmq将会把消息重新定位转发到其它的队列中去,本例子转发到
69            exchange:trade_direct
70            routing-key:TradePayNotify
71            满足如上两点的队列中去即为:queue_Notify
72     -->
73     <rabbit:queue id="Notify_delay_3m" name="Notify_delay_3m" durable="true" auto-delete="false"
74                   exclusive="false">
75         <rabbit:queue-arguments>
76             <entry key="x-dead-letter-exchange" value="trade_direct"/>
77             <entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
78         </rabbit:queue-arguments>
79     </rabbit:queue>
80 
81     <!-- 延迟队列工作的 exchange -->
82     <rabbit:direct-exchange name="trade_direct_delay" durable="true" auto-delete="false">
83         <rabbit:bindings>
84             <rabbit:binding queue="Notify_delay_2m" key="TradePayNotify_delay_2m"/>
85             <rabbit:binding queue="Notify_delay_3m" key="TradePayNotify_delay_3m"/>
86         </rabbit:bindings>
87     </rabbit:direct-exchange>
88 
89 </beans>

3、新建立延迟队列测试Controller

  1 package org.seckill.web;
  2 
  3 import org.seckill.dto.SeckillResult;
  4 import org.seckill.entity.Seckill;
  5 import org.seckill.utils.rabbitmq.Impl.MQProducerImpl;
  6 import org.seckill.utils.rabbitmq.MQProducer;
  7 import org.slf4j.Logger;
  8 import org.slf4j.LoggerFactory;
  9 import org.springframework.amqp.core.Message;
 10 import org.springframework.beans.factory.annotation.Autowired;
 11 import org.springframework.beans.factory.annotation.Value;
 12 import org.springframework.stereotype.Controller;
 13 import org.springframework.web.bind.annotation.RequestMapping;
 14 import org.springframework.web.bind.annotation.ResponseBody;
 15 
 16 import java.util.Date;
 17 
 18 /**
 19  * <p>Title: org.seckill.web</p>
 20  * <p>Company:东软集团(neusoft)</p>
 21  * <p>Copyright:Copyright(c)2018</p>
 22  * User: 段美林
 23  * Date: 2018/5/30 17:33
 24  * Description: 消息队列测试
 25  */
 26 @Controller
 27 @RequestMapping("/rabbitmq")
 28 public class RabbitmqController {
 29 
 30     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 31  40 
 41     @Value("${delay.directQueue.key}")
 42     private String delay_directQueue_key;
 43 
 44     @Value("${delay.directMessage.key}")
 45     private String delay_directMessage_key;
 46  52 
 53     @Autowired
 54     private MQProducerImpl delayMQProducerImpl;111 
112     /**
113      * @Description: 消息队列
114      * @Author:
115      * @CreateTime:
116      */
117     @ResponseBody
118     @RequestMapping("/sendDelayQueue")
119     public SeckillResult<Long> testDelayQueue() {
120         SeckillResult<Long> result = null;
121         Date now = new Date();
122         try {
123             Seckill seckill = new Seckill();
124        //第一种情况,给队列设置消息ttl,详情见配置文件
125             for (int i = 0; i < 2; i++) {
126                 seckill.setSeckillId(1922339387 + i);
127                 seckill.setName("delay_queue_ttl_" + i);
128                 String msgId = delayMQProducerImpl.getMsgId();
129                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
130                 delayMQProducerImpl.sendDataToRabbitMQ(delay_directQueue_key, message);
131             }
132         //第二种情况,给消息设置ttl
133             for (int i = 0; i < 2; i++) {
134                 seckill.setSeckillId(1922339287 + i);
135                 seckill.setName("delay_message_ttl_" + i);
136                 String msgId = delayMQProducerImpl.getMsgId();
137                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
138                 if (message != null) {
139                     //给消息设置过期时间ttl,为3分钟
140                     message.getMessageProperties().setExpiration("180000");
141                     delayMQProducerImpl.sendDataToRabbitMQ(delay_directMessage_key, message);
142                 }
143             }
144             result = new SeckillResult<Long>(true, now.getTime());
145         } catch (Exception e) {
146             logger.error(e.getMessage(), e);
147         }
148         return result;
149     }
150 
151 }

4、编写延迟消息确认类和监听类:

NotifyConfirmCallBackListener.java
 1 package org.seckill.rabbitmqListener.notify;
 2 
 3 import org.slf4j.Logger;
 4 import org.slf4j.LoggerFactory;
 5 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
 6 import org.springframework.amqp.rabbit.support.CorrelationData;
 7 
 8 /**
 9  * <p>Title: org.seckill.rabbitmqListener.notify</p>
10  * <p>Company:东软集团(neusoft)</p>
11  * <p>Copyright:Copyright(c)2018</p>
12  * User: 段美林
13  * Date: 2018/6/3 0:27
14  * Description: 延迟任务测试--->消息确认回调类
15  */
16 public class NotifyConfirmCallBackListener implements ConfirmCallback {
17 
18     private final Logger logger = LoggerFactory.getLogger(this.getClass());
19 
20     /**
21      * Confirmation callback.
22      *
23      * @param correlationData correlation data for the callback.
24      * @param ack             true for ack, false for nack
25      * @param cause           An optional cause, for nack, when available, otherwise null.
26      */
27     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
28         logger.info("延迟测试---确认消息完成-------->confirm--:correlationData:" + correlationData.getId() + ",ack:" + ack + ",cause:" + cause);
29     }
30 }
NotifyConsumerListener.java
 1 package org.seckill.rabbitmqListener.notify;
 2 
 3 import com.alibaba.fastjson.JSONObject;
 4 import com.rabbitmq.client.Channel;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.amqp.core.Message;
 8 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 9 
10 /**
11  * <p>Title: org.seckill.rabbitmqListener.notify</p>
12  * <p>Company:东软集团(neusoft)</p>
13  * <p>Copyright:Copyright(c)2018</p>
14  * User: 段美林
15  * Date: 2018/6/3 0:27
16  * Description: 订单通知队列监听服务
17  * 实现延迟任务的功能
18  */
19 public class NotifyConsumerListener implements ChannelAwareMessageListener {
20 
21 
22     private final Logger logger = LoggerFactory.getLogger(this.getClass());
23 
24     /**
25      * Callback for processing a received Rabbit message.
26      * <p>Implementors are supposed to process the given Message,
27      * typically sending reply messages through the given Session.
28      *
29      * @param message the received AMQP message (never <code>null</code>)
30      * @param channel the underlying Rabbit Channel (never <code>null</code>)
31      * @throws Exception Any.
32      */
33     public void onMessage(Message message, Channel channel) throws Exception {
34         try {
35             //将字节流对象转换成Java对象
36 //            Seckill seckill=(Seckill) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject();
37 
38             String returnStr = new String(message.getBody(),"UTF-8");
39             JSONObject jsStr = JSONObject.parseObject(returnStr);
40 
41             logger.info("延迟测试--消费开始:名称为--===>" + jsStr.getString("name") + "----->返回消息:" + returnStr + "||||消息的Properties:--》" + message.getMessageProperties());
42 
43             //TODO 进行相关业务操作
44 
45             //成功处理业务,那么返回消息确认机制,这个消息成功处理OK
46             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
47 
48         } catch (Exception e) {
49             if (message.getMessageProperties().getRedelivered()) {
50                 //消息已经进行过一次轮询操作,还是失败,将拒绝再次接收本消息
51                 logger.info("消息已重复处理失败,拒绝再次接收...");
52                 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
53 
54                 //TODO 进行相关业务操作
55 
56             } else {
57                 //消息第一次接收处理失败后,将再此回到队列中进行  再一次轮询操作
58                 logger.info("消息即将再次返回队列处理...");
59                 //处理失败,那么返回消息确认机制,这个消息没有成功处理,返回到队列中
60                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
61             }
62         }
63     }
64 }
NotifyFailedCallBackListener.java
 1 package org.seckill.rabbitmqListener.notify;
 2 
 3 import org.slf4j.Logger;
 4 import org.slf4j.LoggerFactory;
 5 import org.springframework.amqp.core.Message;
 6 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
 7 
 8 /**
 9  * <p>Title: org.seckill.rabbitmqListener.notify</p>
10  * <p>Company:东软集团(neusoft)</p>
11  * <p>Copyright:Copyright(c)2018</p>
12  * User: 段美林
13  * Date: 2018/6/3 0:28
14  * Description: 延迟任务测试----> 消息发送失败回调类
15  */
16 public class NotifyFailedCallBackListener implements ReturnCallback {
17 
18     private final Logger logger = LoggerFactory.getLogger(this.getClass());
19 
20     /**
21      * Returned message callback.
22      *
23      * @param message    the returned message.
24      * @param replyCode  the reply code.
25      * @param replyText  the reply text.
26      * @param exchange   the exchange.
27      * @param routingKey the routing key.
28      */
29     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
30         logger.info("延迟测试------------->return--message:" +
31                 new String(message.getBody()) +
32                 ",replyCode:" + replyCode + ",replyText:" + replyText +
33                 ",exchange:" + exchange + ",routingKey:" + routingKey);
34     }
35 }

5、编写消息队列的操作类和接口:

MQProducer.java
  1 package org.seckill.utils.rabbitmq;
  2 
  3 import org.springframework.amqp.core.Message;
  4 import org.springframework.amqp.core.MessagePostProcessor;
  5 import org.springframework.amqp.rabbit.support.CorrelationData;
  6 
  7 /**
  8  * <p>Title: org.seckill.utils.rabbitmq</p>
  9  * <p>Company:东软集团(neusoft)</p>
 10  * <p>Copyright:Copyright(c)2018</p>
 11  * User: 段美林
 12  * Date: 2018/5/30 11:49
 13  * Description: No Description
 14  */
 15 public interface MQProducer {
 16 
 17     /**
 18      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 19      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 20      * @param message
 21      */
 22     void sendDataToRabbitMQ(java.lang.Object message);
 23 
 24     /**
 25      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 26      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 27      * @param message
 28      * @param messagePostProcessor
 29      */
 30     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor);
 31 
 32     /**
 33      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 34      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 35      * @param message
 36      * @param messagePostProcessor
 37      * @param correlationData
 38      */
 39     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
 40 
 41     /**
 42      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
 43      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 44      * @param routingKey
 45      * @param message
 46      */
 47     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message);
 48 
 49     /**
 50      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
 51      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 52      * @param routingKey
 53      * @param message
 54      * @param correlationData
 55      */
 56     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
 57 
 58     /**
 59      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
 60      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 61      * @param routingKey
 62      * @param message
 63      * @param messagePostProcessor
 64      */
 65     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
 66 
 67     /**
 68      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
 69      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 70      * @param routingKey
 71      * @param message
 72      * @param messagePostProcessor
 73      * @param correlationData
 74      */
 75     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
 76 
 77     /**
 78      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
 79      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 80      * @param exchange
 81      * @param routingKey
 82      * @param message
 83      */
 84     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message);
 85 
 86     /**
 87      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
 88      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 89      * @param exchange
 90      * @param routingKey
 91      * @param message
 92      * @param correlationData
 93      */
 94     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
 95 
 96     /**
 97      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
 98      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 99      * @param exchange
100      * @param routingKey
101      * @param message
102      * @param messagePostProcessor
103      */
104     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
105 
106     /**
107      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
108      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
109      * @param exchange
110      * @param routingKey
111      * @param message
112      * @param messagePostProcessor
113      * @param correlationData
114      */
115     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
116 
117     Message messageBuil(Object handleObject, String msgId);
118 
119     String getMsgId();
120 }
MQProducerImpl.java
  1 package org.seckill.utils.rabbitmq.Impl;
  2 
  3 import com.alibaba.fastjson.JSONObject;
  4 import org.seckill.utils.rabbitmq.MQProducer;
  5 import org.slf4j.Logger;
  6 import org.slf4j.LoggerFactory;
  7 import org.springframework.amqp.AmqpException;
  8 import org.springframework.amqp.core.Message;
  9 import org.springframework.amqp.core.MessageBuilder;
 10 import org.springframework.amqp.core.MessagePostProcessor;
 11 import org.springframework.amqp.core.MessageProperties;
 12 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 13 import org.springframework.amqp.rabbit.support.CorrelationData;
 14 import org.springframework.stereotype.Component;
 15 
 16 import java.io.UnsupportedEncodingException;
 17 import java.util.UUID;
 18 
 19 /**
 20  * <p>Title: org.seckill.utils.rabbitmq.Impl</p>
 21  * <p>Company:东软集团(neusoft)</p>
 22  * <p>Copyright:Copyright(c)2018</p>
 23  * User: 段美林
 24  * Date: 2018/6/2 22:54
 25  * Description: 消息生产者操作主体类
 26  */
 27 @Component
 28 public class MQProducerImpl implements MQProducer{
 29 
 30     private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);
 31 
 32     private RabbitTemplate rabbitTemplate;
 33 
 34     /**
 35      * Sets the rabbitTemplate.
 36      * <p>
 37      * <p>You can use getRabbitTemplate() to get the value of rabbitTemplate</p>
 38      *
 39      * @param rabbitTemplate rabbitTemplate
 40      */
 41     public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
 42         this.rabbitTemplate = rabbitTemplate;
 43     }
 44 
 45     /**
 46      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 47      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 48      *
 49      * @param message
 50      */
 51     public void sendDataToRabbitMQ(Object message) {
 52         try {
 53             if (message instanceof Message){
 54                 Message messageSend = (Message) message;
 55                 String msgId = messageSend.getMessageProperties().getCorrelationId();
 56                 CorrelationData correlationData = new CorrelationData(msgId);
 57                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,correlationData);
 58             }else {
 59                 rabbitTemplate.convertAndSend(message);
 60             }
 61         } catch (AmqpException e) {
 62             logger.error(e.getMessage(), e);
 63         }
 64     }
 65 
 66     /**
 67      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 68      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 69      *
 70      * @param message
 71      * @param messagePostProcessor
 72      */
 73     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) {
 74         try {
 75             if (message instanceof Message){
 76                 Message messageSend = (Message) message;
 77                 String msgId = messageSend.getMessageProperties().getCorrelationId();
 78                 CorrelationData correlationData = new CorrelationData(msgId);
 79                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,messagePostProcessor,correlationData);
 80             }else {
 81                 rabbitTemplate.convertAndSend(message, messagePostProcessor);
 82             }
 83         } catch (AmqpException e) {
 84             logger.error(e.getMessage(), e);
 85         }
 86     }
 87 
 88     /**
 89      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 90      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
 91      *
 92      * @param message
 93      * @param messagePostProcessor
 94      * @param correlationData
 95      */
 96     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
 97         try {
 98             rabbitTemplate.convertAndSend(message, messagePostProcessor, correlationData);
 99         } catch (AmqpException e) {
100             logger.error(e.getMessage(), e);
101         }
102     }
103 
104     /**
105      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
106      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
107      *
108      * @param routingKey
109      * @param message
110      */
111     public void sendDataToRabbitMQ(String routingKey, Object message) {
112         try {
113             if (message instanceof Message){
114                 Message messageSend = (Message) message;
115                 String msgId = messageSend.getMessageProperties().getCorrelationId();
116                 CorrelationData correlationData = new CorrelationData(msgId);
117                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
118             }else {
119                 rabbitTemplate.convertAndSend(routingKey, message);
120             }
121         } catch (AmqpException e) {
122             logger.error(e.getMessage(), e);
123         }
124     }
125 
126     /**
127      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
128      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
129      *
130      * @param routingKey
131      * @param message
132      * @param correlationData
133      */
134     public void sendDataToRabbitMQ(String routingKey, Object message, CorrelationData correlationData) {
135         try {
136             rabbitTemplate.convertAndSend(routingKey, message, correlationData);
137         } catch (AmqpException e) {
138             logger.error(e.getMessage(), e);
139         }
140     }
141 
142     /**
143      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
144      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
145      *
146      * @param routingKey
147      * @param message
148      * @param messagePostProcessor
149      */
150     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
151         try {
152             if (message instanceof Message){
153                 Message messageSend = (Message) message;
154                 String msgId = messageSend.getMessageProperties().getCorrelationId();
155                 CorrelationData correlationData = new CorrelationData(msgId);
156                 rabbitTemplate.convertAndSend(routingKey,message,messagePostProcessor,correlationData);
157             }else {
158                 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);
159             }
160         } catch (AmqpException e) {
161             logger.error(e.getMessage(), e);
162         }
163     }
164 
165     /**
166      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
167      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
168      *
169      * @param routingKey
170      * @param message
171      * @param messagePostProcessor
172      * @param correlationData
173      */
174     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
175         try {
176             rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor, correlationData);
177         } catch (AmqpException e) {
178             logger.error(e.getMessage(), e);
179         }
180     }
181 
182     /**
183      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
184      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
185      *
186      * @param exchange
187      * @param routingKey
188      * @param message
189      */
190     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {
191         try {
192             if (message instanceof Message){
193                 Message messageSend = (Message) message;
194                 String msgId = messageSend.getMessageProperties().getCorrelationId();
195                 CorrelationData correlationData = new CorrelationData(msgId);
196                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
197             }else {
198                 rabbitTemplate.convertAndSend(exchange, routingKey, message);
199             }
200         } catch (AmqpException e) {
201             logger.error(e.getMessage(), e);
202         }
203     }
204 
205     /**
206      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
207      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
208      *
209      * @param exchange
210      * @param routingKey
211      * @param message
212      * @param correlationData
213      */
214     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, CorrelationData correlationData) {
215         try {
216             rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
217         } catch (AmqpException e) {
218             logger.error(e.getMessage(), e);
219         }
220     }
221 
222     /**
223      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
224      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
225      *
226      * @param exchange
227      * @param routingKey
228      * @param message
229      * @param messagePostProcessor
230      */
231     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
232         try {
233             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);
234         } catch (AmqpException e) {
235             logger.error(e.getMessage(), e);
236         }
237     }
238 
239     /**
240      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
241      * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
242      *
243      * @param exchange
244      * @param routingKey
245      * @param message
246      * @param messagePostProcessor
247      * @param correlationData
248      */
249     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
250         try {
251             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
252         } catch (AmqpException e) {
253             logger.error(e.getMessage(), e);
254         }
255     }
256 
257     /**
258      * 构建Message对象,进行消息发送
259      * @param handleObject
260      * @param msgId
261      * @return
262      */
263     public Message messageBuil(Object handleObject, String msgId) {
264         try {
265             //先转成JSON
266             String objectJSON = JSONObject.toJSONString(handleObject);
267             //再构建Message对象
268             Message messageBuil = MessageBuilder.withBody(objectJSON.getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
269                     .setCorrelationId(msgId).build();
270             return messageBuil;
271         } catch (UnsupportedEncodingException e) {
272             logger.error("构建Message出错:" + e.getMessage(),e);
273             return null;
274         }
275     }
276 
277     /**
278      * 生成唯一的消息操作id
279      * @return
280      */
281     public String getMsgId() {
282         return UUID.randomUUID().toString();
283     }
284 
285 }

至此就完成了延迟消息队列的所有代码实现,

 

以上是关于RabbitMQ 实现消息队列延迟的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ实现延迟发送消息

基于消息队列(RabbitMQ)实现延迟任务

RabbitMQ如何实现延迟队列?

RabbitMQ---延迟队列,整合springboot

用 RabbitMQ 延迟队列,实现消息延迟推送

实践:RabbitMQ 延迟队列,消息延迟推送的实现