RabbitMQ 实现消息队列延迟
Posted BUG弄潮儿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 实现消息队列延迟相关的知识,希望对你有一定的参考价值。
1.概述
要实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange
插件。但RabbitMQ版本必须是3.5.8以上才支持该插件,否则得用其死信队列功能。
2.安装RabbitMQ延迟插件
- 检查插件
使用rabbitmq-plugins list
命令用于查看RabbitMQ安装的插件。
rabbitmq-plugins list
检查RabbitMQ插件安装情况
- 下载插件
如果没有安装插件,则直接访问官网进行下载
https://www.rabbitmq.com/community-plugins.html
- 安装插件
下载后,将其拷贝到RabbitMQ安装目录的plugins目录;并进行解压,如:
E:\\software\\RabbitMQ Server\\rabbitmq_server-3.11.13\\plugins
打开cmd命令行窗口,如果系统已经配置RabbitMQ环境变量,则直接执行以下的命令进行安装;否则需要进入到RabbitMQ安装目录的sbin目录。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.实现RabbitMQ消息队列延迟功能
- pom.xml配置信息文件中,添加相关依赖文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.olive</groupId>
<artifactId>rabbitmq-spring-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7</version>
<relativePath />
</parent>
<dependencies>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
- application.yml配置文件中配置RabbitMQ信息
server:
port: 8080
spring:
#给项目来个名字
application:
name: rabbitmq-spring-demo
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin123
#虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的
virtual-host: /
- RabbitMQ配置类
package com.olive.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* RabbitMQ配置类
**/
@Configuration
public class RabbitMqConfig
public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";
public static final String DELAY_QUEUE_NAME = "delay_queue_name";
public static final String DELAY_ROUTING_KEY = "delay_routing_key";
@Bean
public CustomExchange delayExchange()
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
@Bean
public Queue queue()
Queue queue = new Queue(DELAY_QUEUE_NAME, true);
return queue;
@Bean
public Binding binding(Queue queue, CustomExchange delayExchange)
return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
- 发送消息
实现消息发送,设置消息延迟5s。
package com.olive.service;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.olive.config.RabbitMqConfig;
/**
* 消息发送者
**/
@Service
public class CustomMessageSender
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg)
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息发送时间:" + sdf.format(new Date()));
rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME,
RabbitMqConfig.DELAY_ROUTING_KEY,
msg,
new MessagePostProcessor()
@Override
public Message postProcessMessage(Message message) throws AmqpException
// 消息延迟5秒
message.getMessageProperties().setHeader("x-delay", 5000);
return message;
);
- 接收消息
package com.olive.service;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.olive.config.RabbitMqConfig;
/**
* 消息接收者
**/
@Component
public class CustomMessageReceiver
@RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)
public void receive(String msg)
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sdf.format(new Date()) + msg);
System.out.println("Receiver:执行取消订单");
- 测试验证
package com.olive.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.olive.service.CustomMessageSender;
@RestController
public class DelayMessageController
@Autowired
private CustomMessageSender customMessageSender;
@GetMapping("/sendMessage")
public String sendMessage()
// 发送消息
customMessageSender.sendMsg("你已经支付超时,取消订单通知!");
return "success";
发送消息,访问
http://127.0.0.1:8080/sendMessage
查看控制台打印的信息
rabbitmq的延迟消息队列实现
第一部分:延迟消息的实现原理和知识点
使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求。
消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。
可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:
当上面的消息扔到队列中后,过了3分钟,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。
Dead Letter Exchanges
Exchage的概念在这里就不在赘述。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
2. 上面的消息的TTL到了,消息过期了。
3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
实现延迟队列
延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。
生产者输出消息到Queue1,并且这个消息是设置有有效时间的,比如3分钟。消息会在Queue1中等待3分钟,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到,处理延迟任务。
完成延迟任务的实现。
第二部分:具体实现例子
1、新建立消息队列配置文件rabbitmq.properties
1 #rabbitmq消息队列的属性配置文件properties 2 rabbitmq.study.host=192.168.56.101 3 rabbitmq.study.username=duanml 4 rabbitmq.study.password=[email protected] 5 rabbitmq.study.port=5672 6 rabbitmq.study.vhost=studymq 7 8 #Mail 消息队列的相关变量值 9 mail.exchange=mailExchange 10 mail.exchange.key=mail_queue_key 11 12 13 #Phone 消息队列的相关变量值 14 phone.topic.key=phone.one 15 phone.topic.key.more=phone.one.more 16 17 #delay 延迟消息队列的相关变量值 18 delay.directQueue.key=TradePayNotify_delay_2m 19 delay.directMessage.key=TradePayNotify_delay_3m
2、新建立配置文件,申明延迟队列相关的配置信息如:spring-rabbigmq-dlx.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> 6 7 <!--利用rabbitmq的TTL和延迟队列,实现延迟通知任务的例子 8 1、申明了一个订单通知服务的队列 queue_Notify 9 2、申明了一个延迟队列Notify_delay_15s,给整个队列设置消息过期时间 为15秒 ——————》 queue ttl 例子 10 3、申明了一个延迟队列Notify_delay_30s 给发送到这个队列的消息,消息本身设置过期时间 ————————》 message ttl 例子 11 4、当消息发送到2、3队列的时候,达到了过期时间,即转发到订单通知服务工作队列 1、 12 5、给队列1 配置消费者服务工作监听,即可完成延迟任务的结果。 13 --> 14 15 <!-- ################ 订单通知服务消费者配置 ################ --> 16 <!--队列声明--> 17 <rabbit:queue id="queue_Notify" name="queue_Notify" durable="true" auto-delete="false" exclusive="false"/> 18 19 <!-- 订单通知服务消费者 exchange --> 20 <rabbit:direct-exchange name="trade_direct" durable="true" auto-delete="false"> 21 <rabbit:bindings> 22 <rabbit:binding queue="queue_Notify" key="TradePayNotify"/> 23 </rabbit:bindings> 24 </rabbit:direct-exchange> 25 26 <!-- 订单通知监听处理器 --> 27 <bean id="notifyConsumerListener" class="org.seckill.rabbitmqListener.notify.NotifyConsumerListener"/> 28 <!--订单消息队列确认回调--> 29 <bean id="notifyConfirmCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyConfirmCallBackListener"></bean> 30 <!--订单消息队列消息发送失败回调--> 31 <bean id="notifyFailedCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyFailedCallBackListener"></bean> 32 33 <!-- 监听器acknowledge=manual表示手工确认消息已处理(异常时可以不确认消息),auto表示自动确认(只要不抛出异常,消息就会被消费) --> 34 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> 35 <rabbit:listener queues="queue_Notify" ref="notifyConsumerListener"/> 36 </rabbit:listener-container> 37 38 <!--*****************************************分割线*********************************************************--> 39 40 <!-- ################ 延迟队列生产者配置 ################ --> 41 <rabbit:template id="rabbitTemplateDelay" mandatory="true" exchange="trade_direct_delay" 42 connection-factory="connectionFactory" 43 confirm-callback="notifyConfirmCallBackListener" 44 return-callback="notifyFailedCallBackListener" 45 message-converter="jsonMessageConverter"/> 46 47 <!--配置生产消息的延迟队列操作主体类--> 48 <bean id="delayMQProducerImpl" class="org.seckill.utils.rabbitmq.Impl.MQProducerImpl"> 49 <property name="rabbitTemplate" ref="rabbitTemplateDelay"></property> 50 </bean> 51 52 <!--申明一个延迟队列,给整个队列的消息设置消息过期时间 x-message-ttl 2分钟 53 当消息达到过期时间的时候,rabbitmq将会把消息重新定位转发到其它的队列中去,本例子转发到 54 exchange:trade_direct 55 routing-key:TradePayNotify 56 满足如上两点的队列中去即为:queue_Notify 57 --> 58 <rabbit:queue id="Notify_delay_2m" name="Notify_delay_2m" durable="true" auto-delete="false" 59 exclusive="false"> 60 <rabbit:queue-arguments> 61 <entry key="x-message-ttl" value="120000" value-type="java.lang.Long"/> 62 <entry key="x-dead-letter-exchange" value="trade_direct"/> 63 <entry key="x-dead-letter-routing-key" value="TradePayNotify"/> 64 </rabbit:queue-arguments> 65 </rabbit:queue> 66 67 <!--申明一个延迟队列,在发送消息的时候给消息设置过期时间 3分钟 68 当消息达到过期时间的时候,rabbitmq将会把消息重新定位转发到其它的队列中去,本例子转发到 69 exchange:trade_direct 70 routing-key:TradePayNotify 71 满足如上两点的队列中去即为:queue_Notify 72 --> 73 <rabbit:queue id="Notify_delay_3m" name="Notify_delay_3m" durable="true" auto-delete="false" 74 exclusive="false"> 75 <rabbit:queue-arguments> 76 <entry key="x-dead-letter-exchange" value="trade_direct"/> 77 <entry key="x-dead-letter-routing-key" value="TradePayNotify"/> 78 </rabbit:queue-arguments> 79 </rabbit:queue> 80 81 <!-- 延迟队列工作的 exchange --> 82 <rabbit:direct-exchange name="trade_direct_delay" durable="true" auto-delete="false"> 83 <rabbit:bindings> 84 <rabbit:binding queue="Notify_delay_2m" key="TradePayNotify_delay_2m"/> 85 <rabbit:binding queue="Notify_delay_3m" key="TradePayNotify_delay_3m"/> 86 </rabbit:bindings> 87 </rabbit:direct-exchange> 88 89 </beans>
3、新建立延迟队列测试Controller
1 package org.seckill.web; 2 3 import org.seckill.dto.SeckillResult; 4 import org.seckill.entity.Seckill; 5 import org.seckill.utils.rabbitmq.Impl.MQProducerImpl; 6 import org.seckill.utils.rabbitmq.MQProducer; 7 import org.slf4j.Logger; 8 import org.slf4j.LoggerFactory; 9 import org.springframework.amqp.core.Message; 10 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Value; 12 import org.springframework.stereotype.Controller; 13 import org.springframework.web.bind.annotation.RequestMapping; 14 import org.springframework.web.bind.annotation.ResponseBody; 15 16 import java.util.Date; 17 18 /** 19 * <p>Title: org.seckill.web</p> 20 * <p>Company:东软集团(neusoft)</p> 21 * <p>Copyright:Copyright(c)2018</p> 22 * User: 段美林 23 * Date: 2018/5/30 17:33 24 * Description: 消息队列测试 25 */ 26 @Controller 27 @RequestMapping("/rabbitmq") 28 public class RabbitmqController { 29 30 private final Logger logger = LoggerFactory.getLogger(this.getClass()); 31 40 41 @Value("${delay.directQueue.key}") 42 private String delay_directQueue_key; 43 44 @Value("${delay.directMessage.key}") 45 private String delay_directMessage_key; 46 52 53 @Autowired 54 private MQProducerImpl delayMQProducerImpl;111 112 /** 113 * @Description: 消息队列 114 * @Author: 115 * @CreateTime: 116 */ 117 @ResponseBody 118 @RequestMapping("/sendDelayQueue") 119 public SeckillResult<Long> testDelayQueue() { 120 SeckillResult<Long> result = null; 121 Date now = new Date(); 122 try { 123 Seckill seckill = new Seckill(); 124 //第一种情况,给队列设置消息ttl,详情见配置文件 125 for (int i = 0; i < 2; i++) { 126 seckill.setSeckillId(1922339387 + i); 127 seckill.setName("delay_queue_ttl_" + i); 128 String msgId = delayMQProducerImpl.getMsgId(); 129 Message message = delayMQProducerImpl.messageBuil(seckill,msgId); 130 delayMQProducerImpl.sendDataToRabbitMQ(delay_directQueue_key, message); 131 } 132 //第二种情况,给消息设置ttl 133 for (int i = 0; i < 2; i++) { 134 seckill.setSeckillId(1922339287 + i); 135 seckill.setName("delay_message_ttl_" + i); 136 String msgId = delayMQProducerImpl.getMsgId(); 137 Message message = delayMQProducerImpl.messageBuil(seckill,msgId); 138 if (message != null) { 139 //给消息设置过期时间ttl,为3分钟 140 message.getMessageProperties().setExpiration("180000"); 141 delayMQProducerImpl.sendDataToRabbitMQ(delay_directMessage_key, message); 142 } 143 } 144 result = new SeckillResult<Long>(true, now.getTime()); 145 } catch (Exception e) { 146 logger.error(e.getMessage(), e); 147 } 148 return result; 149 } 150 151 }
4、编写延迟消息确认类和监听类:
NotifyConfirmCallBackListener.java
1 package org.seckill.rabbitmqListener.notify; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; 6 import org.springframework.amqp.rabbit.support.CorrelationData; 7 8 /** 9 * <p>Title: org.seckill.rabbitmqListener.notify</p> 10 * <p>Company:东软集团(neusoft)</p> 11 * <p>Copyright:Copyright(c)2018</p> 12 * User: 段美林 13 * Date: 2018/6/3 0:27 14 * Description: 延迟任务测试--->消息确认回调类 15 */ 16 public class NotifyConfirmCallBackListener implements ConfirmCallback { 17 18 private final Logger logger = LoggerFactory.getLogger(this.getClass()); 19 20 /** 21 * Confirmation callback. 22 * 23 * @param correlationData correlation data for the callback. 24 * @param ack true for ack, false for nack 25 * @param cause An optional cause, for nack, when available, otherwise null. 26 */ 27 public void confirm(CorrelationData correlationData, boolean ack, String cause) { 28 logger.info("延迟测试---确认消息完成-------->confirm--:correlationData:" + correlationData.getId() + ",ack:" + ack + ",cause:" + cause); 29 } 30 }
NotifyConsumerListener.java
1 package org.seckill.rabbitmqListener.notify; 2 3 import com.alibaba.fastjson.JSONObject; 4 import com.rabbitmq.client.Channel; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.core.Message; 8 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; 9 10 /** 11 * <p>Title: org.seckill.rabbitmqListener.notify</p> 12 * <p>Company:东软集团(neusoft)</p> 13 * <p>Copyright:Copyright(c)2018</p> 14 * User: 段美林 15 * Date: 2018/6/3 0:27 16 * Description: 订单通知队列监听服务 17 * 实现延迟任务的功能 18 */ 19 public class NotifyConsumerListener implements ChannelAwareMessageListener { 20 21 22 private final Logger logger = LoggerFactory.getLogger(this.getClass()); 23 24 /** 25 * Callback for processing a received Rabbit message. 26 * <p>Implementors are supposed to process the given Message, 27 * typically sending reply messages through the given Session. 28 * 29 * @param message the received AMQP message (never <code>null</code>) 30 * @param channel the underlying Rabbit Channel (never <code>null</code>) 31 * @throws Exception Any. 32 */ 33 public void onMessage(Message message, Channel channel) throws Exception { 34 try { 35 //将字节流对象转换成Java对象 36 // Seckill seckill=(Seckill) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject(); 37 38 String returnStr = new String(message.getBody(),"UTF-8"); 39 JSONObject jsStr = JSONObject.parseObject(returnStr); 40 41 logger.info("延迟测试--消费开始:名称为--===>" + jsStr.getString("name") + "----->返回消息:" + returnStr + "||||消息的Properties:--》" + message.getMessageProperties()); 42 43 //TODO 进行相关业务操作 44 45 //成功处理业务,那么返回消息确认机制,这个消息成功处理OK 46 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 47 48 } catch (Exception e) { 49 if (message.getMessageProperties().getRedelivered()) { 50 //消息已经进行过一次轮询操作,还是失败,将拒绝再次接收本消息 51 logger.info("消息已重复处理失败,拒绝再次接收..."); 52 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息 53 54 //TODO 进行相关业务操作 55 56 } else { 57 //消息第一次接收处理失败后,将再此回到队列中进行 再一次轮询操作 58 logger.info("消息即将再次返回队列处理..."); 59 //处理失败,那么返回消息确认机制,这个消息没有成功处理,返回到队列中 60 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 61 } 62 } 63 } 64 }
NotifyFailedCallBackListener.java
1 package org.seckill.rabbitmqListener.notify; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.amqp.core.Message; 6 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; 7 8 /** 9 * <p>Title: org.seckill.rabbitmqListener.notify</p> 10 * <p>Company:东软集团(neusoft)</p> 11 * <p>Copyright:Copyright(c)2018</p> 12 * User: 段美林 13 * Date: 2018/6/3 0:28 14 * Description: 延迟任务测试----> 消息发送失败回调类 15 */ 16 public class NotifyFailedCallBackListener implements ReturnCallback { 17 18 private final Logger logger = LoggerFactory.getLogger(this.getClass()); 19 20 /** 21 * Returned message callback. 22 * 23 * @param message the returned message. 24 * @param replyCode the reply code. 25 * @param replyText the reply text. 26 * @param exchange the exchange. 27 * @param routingKey the routing key. 28 */ 29 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { 30 logger.info("延迟测试------------->return--message:" + 31 new String(message.getBody()) + 32 ",replyCode:" + replyCode + ",replyText:" + replyText + 33 ",exchange:" + exchange + ",routingKey:" + routingKey); 34 } 35 }
5、编写消息队列的操作类和接口:
MQProducer.java
1 package org.seckill.utils.rabbitmq; 2 3 import org.springframework.amqp.core.Message; 4 import org.springframework.amqp.core.MessagePostProcessor; 5 import org.springframework.amqp.rabbit.support.CorrelationData; 6 7 /** 8 * <p>Title: org.seckill.utils.rabbitmq</p> 9 * <p>Company:东软集团(neusoft)</p> 10 * <p>Copyright:Copyright(c)2018</p> 11 * User: 段美林 12 * Date: 2018/5/30 11:49 13 * Description: No Description 14 */ 15 public interface MQProducer { 16 17 /** 18 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 19 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 20 * @param message 21 */ 22 void sendDataToRabbitMQ(java.lang.Object message); 23 24 /** 25 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 26 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 27 * @param message 28 * @param messagePostProcessor 29 */ 30 void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor); 31 32 /** 33 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 34 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 35 * @param message 36 * @param messagePostProcessor 37 * @param correlationData 38 */ 39 void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData); 40 41 /** 42 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 43 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 44 * @param routingKey 45 * @param message 46 */ 47 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message); 48 49 /** 50 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 51 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 52 * @param routingKey 53 * @param message 54 * @param correlationData 55 */ 56 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData); 57 58 /** 59 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 60 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 61 * @param routingKey 62 * @param message 63 * @param messagePostProcessor 64 */ 65 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor); 66 67 /** 68 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 69 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 70 * @param routingKey 71 * @param message 72 * @param messagePostProcessor 73 * @param correlationData 74 */ 75 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData); 76 77 /** 78 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 79 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 80 * @param exchange 81 * @param routingKey 82 * @param message 83 */ 84 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message); 85 86 /** 87 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 88 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 89 * @param exchange 90 * @param routingKey 91 * @param message 92 * @param correlationData 93 */ 94 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData); 95 96 /** 97 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 98 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 99 * @param exchange 100 * @param routingKey 101 * @param message 102 * @param messagePostProcessor 103 */ 104 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor); 105 106 /** 107 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 108 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 109 * @param exchange 110 * @param routingKey 111 * @param message 112 * @param messagePostProcessor 113 * @param correlationData 114 */ 115 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData); 116 117 Message messageBuil(Object handleObject, String msgId); 118 119 String getMsgId(); 120 }
MQProducerImpl.java
1 package org.seckill.utils.rabbitmq.Impl; 2 3 import com.alibaba.fastjson.JSONObject; 4 import org.seckill.utils.rabbitmq.MQProducer; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.AmqpException; 8 import org.springframework.amqp.core.Message; 9 import org.springframework.amqp.core.MessageBuilder; 10 import org.springframework.amqp.core.MessagePostProcessor; 11 import org.springframework.amqp.core.MessageProperties; 12 import org.springframework.amqp.rabbit.core.RabbitTemplate; 13 import org.springframework.amqp.rabbit.support.CorrelationData; 14 import org.springframework.stereotype.Component; 15 16 import java.io.UnsupportedEncodingException; 17 import java.util.UUID; 18 19 /** 20 * <p>Title: org.seckill.utils.rabbitmq.Impl</p> 21 * <p>Company:东软集团(neusoft)</p> 22 * <p>Copyright:Copyright(c)2018</p> 23 * User: 段美林 24 * Date: 2018/6/2 22:54 25 * Description: 消息生产者操作主体类 26 */ 27 @Component 28 public class MQProducerImpl implements MQProducer{ 29 30 private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class); 31 32 private RabbitTemplate rabbitTemplate; 33 34 /** 35 * Sets the rabbitTemplate. 36 * <p> 37 * <p>You can use getRabbitTemplate() to get the value of rabbitTemplate</p> 38 * 39 * @param rabbitTemplate rabbitTemplate 40 */ 41 public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { 42 this.rabbitTemplate = rabbitTemplate; 43 } 44 45 /** 46 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 47 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 48 * 49 * @param message 50 */ 51 public void sendDataToRabbitMQ(Object message) { 52 try { 53 if (message instanceof Message){ 54 Message messageSend = (Message) message; 55 String msgId = messageSend.getMessageProperties().getCorrelationId(); 56 CorrelationData correlationData = new CorrelationData(msgId); 57 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,correlationData); 58 }else { 59 rabbitTemplate.convertAndSend(message); 60 } 61 } catch (AmqpException e) { 62 logger.error(e.getMessage(), e); 63 } 64 } 65 66 /** 67 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 68 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 69 * 70 * @param message 71 * @param messagePostProcessor 72 */ 73 public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) { 74 try { 75 if (message instanceof Message){ 76 Message messageSend = (Message) message; 77 String msgId = messageSend.getMessageProperties().getCorrelationId(); 78 CorrelationData correlationData = new CorrelationData(msgId); 79 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,messagePostProcessor,correlationData); 80 }else { 81 rabbitTemplate.convertAndSend(message, messagePostProcessor); 82 } 83 } catch (AmqpException e) { 84 logger.error(e.getMessage(), e); 85 } 86 } 87 88 /** 89 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 90 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 91 * 92 * @param message 93 * @param messagePostProcessor 94 * @param correlationData 95 */ 96 public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) { 97 try { 98 rabbitTemplate.convertAndSend(message, messagePostProcessor, correlationData); 99 } catch (AmqpException e) { 100 logger.error(e.getMessage(), e); 101 } 102 } 103 104 /** 105 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 106 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 107 * 108 * @param routingKey 109 * @param message 110 */ 111 public void sendDataToRabbitMQ(String routingKey, Object message) { 112 try { 113 if (message instanceof Message){ 114 Message messageSend = (Message) message; 115 String msgId = messageSend.getMessageProperties().getCorrelationId(); 116 CorrelationData correlationData = new CorrelationData(msgId); 117 rabbitTemplate.convertAndSend(routingKey,message,correlationData); 118 }else { 119 rabbitTemplate.convertAndSend(routingKey, message); 120 } 121 } catch (AmqpException e) { 122 logger.error(e.getMessage(), e); 123 } 124 } 125 126 /** 127 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 128 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 129 * 130 * @param routingKey 131 * @param message 132 * @param correlationData 133 */ 134 public void sendDataToRabbitMQ(String routingKey, Object message, CorrelationData correlationData) { 135 try { 136 rabbitTemplate.convertAndSend(routingKey, message, correlationData); 137 } catch (AmqpException e) { 138 logger.error(e.getMessage(), e); 139 } 140 } 141 142 /** 143 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 144 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 145 * 146 * @param routingKey 147 * @param message 148 * @param messagePostProcessor 149 */ 150 public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) { 151 try { 152 if (message instanceof Message){ 153 Message messageSend = (Message) message; 154 String msgId = messageSend.getMessageProperties().getCorrelationId(); 155 CorrelationData correlationData = new CorrelationData(msgId); 156 rabbitTemplate.convertAndSend(routingKey,message,messagePostProcessor,correlationData); 157 }else { 158 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor); 159 } 160 } catch (AmqpException e) { 161 logger.error(e.getMessage(), e); 162 } 163 } 164 165 /** 166 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 167 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 168 * 169 * @param routingKey 170 * @param message 171 * @param messagePostProcessor 172 * @param correlationData 173 */ 174 public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) { 175 try { 176 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor, correlationData); 177 } catch (AmqpException e) { 178 logger.error(e.getMessage(), e); 179 } 180 } 181 182 /** 183 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 184 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 185 * 186 * @param exchange 187 * @param routingKey 188 * @param message 189 */ 190 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) { 191 try { 192 if (message instanceof Message){ 193 Message messageSend = (Message) message; 194 String msgId = messageSend.getMessageProperties().getCorrelationId(); 195 CorrelationData correlationData = new CorrelationData(msgId); 196 rabbitTemplate.convertAndSend(routingKey,message,correlationData); 197 }else { 198 rabbitTemplate.convertAndSend(exchange, routingKey, message); 199 } 200 } catch (AmqpException e) { 201 logger.error(e.getMessage(), e); 202 } 203 } 204 205 /** 206 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 207 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 208 * 209 * @param exchange 210 * @param routingKey 211 * @param message 212 * @param correlationData 213 */ 214 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, CorrelationData correlationData) { 215 try { 216 rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); 217 } catch (AmqpException e) { 218 logger.error(e.getMessage(), e); 219 } 220 } 221 222 /** 223 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 224 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 225 * 226 * @param exchange 227 * @param routingKey 228 * @param message 229 * @param messagePostProcessor 230 */ 231 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) { 232 try { 233 rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor); 234 } catch (AmqpException e) { 235 logger.error(e.getMessage(), e); 236 } 237 } 238 239 /** 240 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 241 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。 242 * 243 * @param exchange 244 * @param routingKey 245 * @param message 246 * @param messagePostProcessor 247 * @param correlationData 248 */ 249 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) { 250 try { 251 rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData); 252 } catch (AmqpException e) { 253 logger.error(e.getMessage(), e); 254 } 255 } 256 257 /** 258 * 构建Message对象,进行消息发送 259 * @param handleObject 260 * @param msgId 261 * @return 262 */ 263 public Message messageBuil(Object handleObject, String msgId) { 264 try { 265 //先转成JSON 266 String objectJSON = JSONObject.toJSONString(handleObject); 267 //再构建Message对象 268 Message messageBuil = MessageBuilder.withBody(objectJSON.getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) 269 .setCorrelationId(msgId).build(); 270 return messageBuil; 271 } catch (UnsupportedEncodingException e) { 272 logger.error("构建Message出错:" + e.getMessage(),e); 273 return null; 274 } 275 } 276 277 /** 278 * 生成唯一的消息操作id 279 * @return 280 */ 281 public String getMsgId() { 282 return UUID.randomUUID().toString(); 283 } 284 285 }
至此就完成了延迟消息队列的所有代码实现,
以上是关于RabbitMQ 实现消息队列延迟的主要内容,如果未能解决你的问题,请参考以下文章