「MQ实战」RabbitMQ 延迟队列,消息延迟推送
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了「MQ实战」RabbitMQ 延迟队列,消息延迟推送相关的知识,希望对你有一定的参考价值。
参考技术A 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:
在 RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,可以参考之前文章来了解:TTL、死信队列
在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。延迟队列插件下载
首先我们创建交换机和消息队列,application.properties 中配置与上一篇文章相同。
发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延迟时间。
我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 s-delay。等同于我们手动设置 header
测试结果
果然在 6 秒后收到了消息 lazy receive hello spring boot:
一篇带您搞懂MQ延迟队列实战操作
文章目录
前言
如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!
RabbitMq 专栏直通车
MQ-死信队列(延迟操作)外加消息确认模式
先回顾一下RabbitMq核心基础组件:
- 【生产者】: 用于生产、发送信息的模块
- 【消费者】: 用于监听、接受、消费和处理信息的模块
- 【消息】: 可以看成一串实质的数据,如文字、图片、等等,在整个传输过程中,消息是通过二进制数据流来传递的
- 【队列】:消费的暂存区或者存储区,可以理解为中转站,即生产者 -> 队列 -> 消费者
- 【交换机】:同样也可以看成是中转站,用于首次接受和分发消息
- 【路由】:相当于网关、秘钥、地址等等,一般不单独使用,绑定到交换机上,将消息指定到指定的队列
通过上篇文章,我们了解了死信队列的用处,这篇文章主要讲的就是实战,基本上都是代码。
死信队列消息模型构建大概有几步?
- 创建死信队列
- 创建基本交换机 —> 面向生产者
- 创建基本绑定 —>基本交换机+基本路由 —> 面向生产者
- 创建死信交换机
- 创建死信路由及其绑定真正的消费队列
好了,现在废话不多说,开始了;
<<<<<<<<<<<<<<<<<<<<<<<<<<开始演练>>>>>>>>>>>>>>>>>>>>>>>>>>>
01::前期准备:引入相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.3.3.RELEASE</version>
</dependency>
02::整合RabbitMQ
02::01-加入RabbitMq相关配置
即在配置文件中加入RabbitMq的配置,ip、端口号、账户密码等等…
spring:
rabbitmq: #RabbitMq 配置
virtual-host: /
host: 127.0.0.1 #IP
post: 5672 #提供服务时的端口
username: guest #连接RabbitMQ的账户
password: guest #连接RabbitMQ的密码
03:: 创建真实队列–交换机、队列、绑定,和确认消费相关配置
/**
* 基于手动确认消费模式实战配置
* @author yangzhenyu
* */
@Configuration
public class ManualMqConfig {
private static Logger log = LoggerFactory.getLogger(ManualMqConfig.class);
public ManualMqConfig() {
log.info("=================== 基于手动确认消费模式实战配置注入IOC===================");
}
@Autowired
private Environment environment;
//自动装配 RabbitMQ 的连接工厂实例
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
//消费者
@Autowired
private ManualConsumer manualConsumer;
//创建队列
@Bean(name = "manualQueueOne")
public Queue manualQueueOne(){
return new Queue(environment.getProperty("mq.yzy.info.manualqueue.name"),true);
}
//交换机
@Bean
public DirectExchange manualExchange(){
return new DirectExchange(environment.getProperty("mq.yzy.info.manualexchange.name"),true,false);
}
//创建绑定
//directQueueOne
@Bean
public Binding basicBindingOne(){
return BindingBuilder.bind(manualQueueOne()).to(manualExchange()).with(environment.getProperty("mq.yzy.info.manualrouting.key.name"));
}
/**
* 基于手动确认消费模式实战配置
* */
@Bean(name = "manualListenerContainer")
public SimpleMessageListenerContainer manualListenerContainer(@Qualifier("manualQueueOne") Queue manualQueue){
//自定义消息监听器所在的容器工厂
SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer();
//设置容器工厂所用的实例
factory.setConnectionFactory(cachingConnectionFactory);
//设置消息的确认消费模式,在这里为MANUAL,表示手动确认消费
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置并发消费者实例的初始值
factory.setConcurrentConsumers(1);
//设置并发消费者实例的最大数量
factory.setMaxConcurrentConsumers(1);
//设置并发消费者实例中每个实例拉取的消息数量
factory.setPrefetchCount(1);
//指定该容器监听的队列
factory.setQueues(manualQueue);
//指定该容器中的消费监听器 即消费者
factory.setMessageListener(manualConsumer);
return factory;
}
}
03::01:设置消息的确认消费模式为手动确认
03::02:指定该容器监听的队列
03::03:指定该容器中的消费监听器 即消费者
03::04:交换机、队列持久化配置
03::05:相关配置信息:
mq:
env: loacl #自定义变量,表示本地开发
yzy:
info:
manualqueue: #消息确认模式
name: ${mq.env}.middleware.mq.yzy.info.manualqueue.one
manualexchange: #消息确认模式
name: ${mq.env}.middleware.mq.yzy.info.manualexchange.one
manualrouting:
key: #确认模式
name: ${mq.env}.middleware.mq.yzy.info.manualrouting.key.one
04:: 制作真实队列消费者,即上述模型中指定的消费者
如下:
/**
* 认为手动确认消费-消费者-字节流模式
* @author yangzhenyu
* */
@Component("manualConsumer")
public class ManualConsumer implements ChannelAwareMessageListener {
private static Logger log = LoggerFactory.getLogger(ManualConsumer.class);
//序列化和返序列化
@Autowired
private ObjectMapper objectMapper;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取消息属性
MessageProperties messageProperties = message.getMessageProperties();
//获取消息分发时的全局唯一标识
long tag = messageProperties.getDeliveryTag();
try{
//获得消息体
byte [] msg = message.getBody();
//解析消息体
Student student = objectMapper.readValue(msg,Student.class);
log.info("基于manual机制-确认消息模式-人为手动确定消费-监听到消息:【{}】",objectMapper.writeValueAsString(student));
//执行完逻辑后手动确认,第一个参数代表消息的分发标识(全局唯一),第二个参数代表是否允许批量确认消费
channel.basicAck(tag,true);
}catch (Exception e){
log.error("确认消息模式-人为手动确定消费-发生异常:",e.fillInStackTrace());
/**
* 如果在处理消息的过程中发生异常,则需要人为手动确认消费掉该消息
* 否则该消息将一直停留在队列中,从而导致重复消费
* */
channel.basicReject(tag,false);
}
}
}
05:: 制作死信队列模型配置
如下:
/**
* 死信队列消息模型构建
* @author yangzhenyu
* */
@Configuration
public class DeadExchangeConfig {
private static Logger log = LoggerFactory.getLogger(DeadExchangeConfig.class);
public DeadExchangeConfig() {
log.info("=================== 死信队列消息模型构建注入IOC===================");
}
@Autowired
private Environment environment;
//死信路由
private final static String DEAD_ROUTING="mq.yzy.info.deadrouting.key.name";
//死信交换机
private final static String DEAD_EXCHANGE = "mq.yzy.info.deadexchange.name";
//死信队列
private final static String DEAD_QUEUE = "mq.yzy.info.deadqueue.name";
//死信模型->基本模型->基本交换机(面向生产者)
private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
//死信模型->基本模型->基本路由(面向生产者)
private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
//真正的队列 -> 面向消费者
private final static String REAL_QUEUE="mq.yzy.info.manualqueue.name";
/**
* 创建死信队列
* */
@Bean
public Queue basicDeadQueue(){
//创建死信队列的组成成分map,用来存放组成成员的相关成员
Map<String,Object> args = new HashMap<>(3);
//创建死信交换机
args.put("x-dead-letter-exchange",environment.getProperty(DEAD_EXCHANGE));
//创建死信路由
args.put("x-dead-letter-routing-key",environment.getProperty(DEAD_ROUTING));
//设定 TTL ,单位是毫秒,在这里指的是60s
args.put("x-message-ttl",60000);
//创建并返回死信队列实例
return new Queue(environment.getProperty(DEAD_QUEUE),true,false,false,args);
}
//创建基本交换机 ---> 面向生产者
@Bean
public TopicExchange basicProducerExchange(){
return new TopicExchange(environment.getProperty(DEAD_EXCHANGE_PRODUCER),true,false);
}
//创建基本绑定 --->基本交换机+基本路由 ---> 面向生产者
@Bean
public Binding basicProducerBinding(){
return BindingBuilder.bind(basicDeadQueue()).to(basicProducerExchange()).with(environment.getProperty(DEAD_ROUTING_PRODUCER));
}
//====================================================================================
//创建死信交换机
@Bean
public TopicExchange basicDeadExchange(){
//创建并返回死信交换机实例
return new TopicExchange(environment.getProperty(DEAD_EXCHANGE),true,false);
}
//创建死信路由及其绑定真正的消费队列
/**
* @param manualQueue 真正的队列
* */
@Bean
public Binding basicDeadBindingOne(@Qualifier("manualQueueOne") Queue manualQueue){
return BindingBuilder.bind(manualQueue).to(basicDeadExchange()).with(environment.getProperty(DEAD_ROUTING));
}
}
05::01 制作死信队列模型配置
mq:
env: loacl #自定义变量,表示本地开发
yzy:
info:
deadqueue: #死信队列
name: ${mq.env}.middleware.mq.yzy.info.deadqueue.one
deadrouting: #死信路由
key:
name: ${mq.env}.middleware.mq.yzy.info.deadrouting.key.one
producer: #死信消息模型中 基本模型中的路由
key:
name: ${mq.env}.middleware.mq.yzy.info.deadrouting.producer.key.one
06:: 制作死信队列-生产者
package com.yzy.demo.rabbitmq.dead.publisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yzy.demo.test.vo.Student;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
/**
* 死信队列消息模型构建---生产者
* @author yangzhenyu
* */
@Component
public class DeadPublisher {
private static Logger log = LoggerFactory.getLogger(DeadPublisher.class);
//序列化和返序列化
@Autowired
private ObjectMapper objectMapper;
//定义RabbitMQ 组件
@Autowired
private RabbitTemplate rabbitTemplate;
//定义环境变量读取实例
@Autowired
private Environment env;
//死信模型->基本模型->基本交换机(面向生产者)
private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
//死信模型->基本模型->基本路由(面向生产者)
private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
/**
* 发送对象类型的消息给死信队列
* @param info
* */
public void sendMsg(Student info){
try{
if (info != null){
//定义消息传输的格式为json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//指定消息模型中的交换机
rabbitTemplate.setExchange(env.getProperty(DEAD_EXCHANGE_PRODUCER));
//将字符串值转换成二进制的数据流
Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info))
.build();
rabbitTemplate.convertAndSend(env.getProperty(DEAD_ROUTING_PRODUCER),msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//获取消息的属性
MessageProperties messageProperties = message.getMessageProperties();
//设置消息的持久化模式
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//设置消息头,即指定发送消息的所属对象类型
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Student.class);
//设置消息的TTL,当消息和队列同时设置TTL时,取最短 10s
messageProperties.setExpiration(String.valueOf(10000));
return message;
}
});
log.info("死信队列消息模型 -生产者发出消息:{}",objectMapper.writeValueAsString(info));
}
}catch (Exception e){
log.error("死信队列消息模型 -生产者发出消息-发生异常:{}",info,e.fillInStackTrace());
}
}
}
注: 置消息的TTL,当消息和队列同时设置TTL时,取最短。
07:: 制作测试代码
如下:
//死信队列-延迟
@Autowired
private DeadPublisher deadPublisher;
@Autowired
private ObjectMapper objectMapper;
/**
* 死信队列模型演示
* */
@ApiOperation(value = "死信队列模型演示",notes = "死信队列模型演示")
@ResponseBody
@PostMapping("/deadPublisher")
public ResponseBo deadPublisher(@RequestBody @Valid Student vo) throws JsonProcessingException {
String msgValue = "deadPublisher";
long startTime = init(msgValue,objectMapper.writeValueAsString(vo));
try{
deadPublisher.sendMsg(vo);
endLog(msgValue,startTime);
}catch (Exception e){
endLogError(msgValue,startTime,e);
}
return ResponseBo.ok();
}
vo:
/**
* 学生
* @author yangzhenyu
* */
public class Student implements Serializable {
/**
* 序列号
*/
private static final long serialVersionUID = -5023112818896544461L;
@NotNull(message = "学生 id值不能为null")
@ApiModelProperty(" 学生 id值")
private String sId;
@ApiModelProperty(" 学生 名称")
private String sName;
@ApiModelProperty(" 学生 班级")
private String className;
public String getsId() {
return sId;
}
public 一篇带您搞懂MQ延迟队列实战操作
扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题