一篇带您搞懂MQ延迟队列实战操作
Posted 栗子~~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一篇带您搞懂MQ延迟队列实战操作相关的知识,希望对你有一定的参考价值。
文章目录
前言
如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!
RabbitMq 专栏直通车
MQ-死信队列(延迟操作)外加消息确认模式
先回顾一下RabbitMq核心基础组件:
- 【生产者】: 用于生产、发送信息的模块
- 【消费者】: 用于监听、接受、消费和处理信息的模块
- 【消息】: 可以看成一串实质的数据,如文字、图片、等等,在整个传输过程中,消息是通过二进制数据流来传递的
- 【队列】:消费的暂存区或者存储区,可以理解为中转站,即生产者 -> 队列 -> 消费者
- 【交换机】:同样也可以看成是中转站,用于首次接受和分发消息
- 【路由】:相当于网关、秘钥、地址等等,一般不单独使用,绑定到交换机上,将消息指定到指定的队列
通过上篇文章,我们了解了死信队列的用处,这篇文章主要讲的就是实战,基本上都是代码。
死信队列消息模型构建大概有几步?
- 创建死信队列
- 创建基本交换机 —> 面向生产者
- 创建基本绑定 —>基本交换机+基本路由 —> 面向生产者
- 创建死信交换机
- 创建死信路由及其绑定真正的消费队列
好了,现在废话不多说,开始了;
<<<<<<<<<<<<<<<<<<<<<<<<<<开始演练>>>>>>>>>>>>>>>>>>>>>>>>>>>
01::前期准备:引入相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.3.3.RELEASE</version>
</dependency>
02::整合RabbitMQ
02::01-加入RabbitMq相关配置
即在配置文件中加入RabbitMq的配置,ip、端口号、账户密码等等…
spring:
rabbitmq: #RabbitMq 配置
virtual-host: /
host: 127.0.0.1 #IP
post: 5672 #提供服务时的端口
username: guest #连接RabbitMQ的账户
password: guest #连接RabbitMQ的密码
03:: 创建真实队列–交换机、队列、绑定,和确认消费相关配置
/**
* 基于手动确认消费模式实战配置
* @author yangzhenyu
* */
@Configuration
public class ManualMqConfig {
private static Logger log = LoggerFactory.getLogger(ManualMqConfig.class);
public ManualMqConfig() {
log.info("=================== 基于手动确认消费模式实战配置注入IOC===================");
}
@Autowired
private Environment environment;
//自动装配 RabbitMQ 的连接工厂实例
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
//消费者
@Autowired
private ManualConsumer manualConsumer;
//创建队列
@Bean(name = "manualQueueOne")
public Queue manualQueueOne(){
return new Queue(environment.getProperty("mq.yzy.info.manualqueue.name"),true);
}
//交换机
@Bean
public DirectExchange manualExchange(){
return new DirectExchange(environment.getProperty("mq.yzy.info.manualexchange.name"),true,false);
}
//创建绑定
//directQueueOne
@Bean
public Binding basicBindingOne(){
return BindingBuilder.bind(manualQueueOne()).to(manualExchange()).with(environment.getProperty("mq.yzy.info.manualrouting.key.name"));
}
/**
* 基于手动确认消费模式实战配置
* */
@Bean(name = "manualListenerContainer")
public SimpleMessageListenerContainer manualListenerContainer(@Qualifier("manualQueueOne") Queue manualQueue){
//自定义消息监听器所在的容器工厂
SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer();
//设置容器工厂所用的实例
factory.setConnectionFactory(cachingConnectionFactory);
//设置消息的确认消费模式,在这里为MANUAL,表示手动确认消费
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置并发消费者实例的初始值
factory.setConcurrentConsumers(1);
//设置并发消费者实例的最大数量
factory.setMaxConcurrentConsumers(1);
//设置并发消费者实例中每个实例拉取的消息数量
factory.setPrefetchCount(1);
//指定该容器监听的队列
factory.setQueues(manualQueue);
//指定该容器中的消费监听器 即消费者
factory.setMessageListener(manualConsumer);
return factory;
}
}
03::01:设置消息的确认消费模式为手动确认
03::02:指定该容器监听的队列
03::03:指定该容器中的消费监听器 即消费者
03::04:交换机、队列持久化配置
03::05:相关配置信息:
mq:
env: loacl #自定义变量,表示本地开发
yzy:
info:
manualqueue: #消息确认模式
name: ${mq.env}.middleware.mq.yzy.info.manualqueue.one
manualexchange: #消息确认模式
name: ${mq.env}.middleware.mq.yzy.info.manualexchange.one
manualrouting:
key: #确认模式
name: ${mq.env}.middleware.mq.yzy.info.manualrouting.key.one
04:: 制作真实队列消费者,即上述模型中指定的消费者
如下:
/**
* 认为手动确认消费-消费者-字节流模式
* @author yangzhenyu
* */
@Component("manualConsumer")
public class ManualConsumer implements ChannelAwareMessageListener {
private static Logger log = LoggerFactory.getLogger(ManualConsumer.class);
//序列化和返序列化
@Autowired
private ObjectMapper objectMapper;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取消息属性
MessageProperties messageProperties = message.getMessageProperties();
//获取消息分发时的全局唯一标识
long tag = messageProperties.getDeliveryTag();
try{
//获得消息体
byte [] msg = message.getBody();
//解析消息体
Student student = objectMapper.readValue(msg,Student.class);
log.info("基于manual机制-确认消息模式-人为手动确定消费-监听到消息:【{}】",objectMapper.writeValueAsString(student));
//执行完逻辑后手动确认,第一个参数代表消息的分发标识(全局唯一),第二个参数代表是否允许批量确认消费
channel.basicAck(tag,true);
}catch (Exception e){
log.error("确认消息模式-人为手动确定消费-发生异常:",e.fillInStackTrace());
/**
* 如果在处理消息的过程中发生异常,则需要人为手动确认消费掉该消息
* 否则该消息将一直停留在队列中,从而导致重复消费
* */
channel.basicReject(tag,false);
}
}
}
05:: 制作死信队列模型配置
如下:
/**
* 死信队列消息模型构建
* @author yangzhenyu
* */
@Configuration
public class DeadExchangeConfig {
private static Logger log = LoggerFactory.getLogger(DeadExchangeConfig.class);
public DeadExchangeConfig() {
log.info("=================== 死信队列消息模型构建注入IOC===================");
}
@Autowired
private Environment environment;
//死信路由
private final static String DEAD_ROUTING="mq.yzy.info.deadrouting.key.name";
//死信交换机
private final static String DEAD_EXCHANGE = "mq.yzy.info.deadexchange.name";
//死信队列
private final static String DEAD_QUEUE = "mq.yzy.info.deadqueue.name";
//死信模型->基本模型->基本交换机(面向生产者)
private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
//死信模型->基本模型->基本路由(面向生产者)
private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
//真正的队列 -> 面向消费者
private final static String REAL_QUEUE="mq.yzy.info.manualqueue.name";
/**
* 创建死信队列
* */
@Bean
public Queue basicDeadQueue(){
//创建死信队列的组成成分map,用来存放组成成员的相关成员
Map<String,Object> args = new HashMap<>(3);
//创建死信交换机
args.put("x-dead-letter-exchange",environment.getProperty(DEAD_EXCHANGE));
//创建死信路由
args.put("x-dead-letter-routing-key",environment.getProperty(DEAD_ROUTING));
//设定 TTL ,单位是毫秒,在这里指的是60s
args.put("x-message-ttl",60000);
//创建并返回死信队列实例
return new Queue(environment.getProperty(DEAD_QUEUE),true,false,false,args);
}
//创建基本交换机 ---> 面向生产者
@Bean
public TopicExchange basicProducerExchange(){
return new TopicExchange(environment.getProperty(DEAD_EXCHANGE_PRODUCER),true,false);
}
//创建基本绑定 --->基本交换机+基本路由 ---> 面向生产者
@Bean
public Binding basicProducerBinding(){
return BindingBuilder.bind(basicDeadQueue()).to(basicProducerExchange()).with(environment.getProperty(DEAD_ROUTING_PRODUCER));
}
//====================================================================================
//创建死信交换机
@Bean
public TopicExchange basicDeadExchange(){
//创建并返回死信交换机实例
return new TopicExchange(environment.getProperty(DEAD_EXCHANGE),true,false);
}
//创建死信路由及其绑定真正的消费队列
/**
* @param manualQueue 真正的队列
* */
@Bean
public Binding basicDeadBindingOne(@Qualifier("manualQueueOne") Queue manualQueue){
return BindingBuilder.bind(manualQueue).to(basicDeadExchange()).with(environment.getProperty(DEAD_ROUTING));
}
}
05::01 制作死信队列模型配置
mq:
env: loacl #自定义变量,表示本地开发
yzy:
info:
deadqueue: #死信队列
name: ${mq.env}.middleware.mq.yzy.info.deadqueue.one
deadrouting: #死信路由
key:
name: ${mq.env}.middleware.mq.yzy.info.deadrouting.key.one
producer: #死信消息模型中 基本模型中的路由
key:
name: ${mq.env}.middleware.mq.yzy.info.deadrouting.producer.key.one
06:: 制作死信队列-生产者
package com.yzy.demo.rabbitmq.dead.publisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yzy.demo.test.vo.Student;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
/**
* 死信队列消息模型构建---生产者
* @author yangzhenyu
* */
@Component
public class DeadPublisher {
private static Logger log = LoggerFactory.getLogger(DeadPublisher.class);
//序列化和返序列化
@Autowired
private ObjectMapper objectMapper;
//定义RabbitMQ 组件
@Autowired
private RabbitTemplate rabbitTemplate;
//定义环境变量读取实例
@Autowired
private Environment env;
//死信模型->基本模型->基本交换机(面向生产者)
private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
//死信模型->基本模型->基本路由(面向生产者)
private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
/**
* 发送对象类型的消息给死信队列
* @param info
* */
public void sendMsg(Student info){
try{
if (info != null){
//定义消息传输的格式为json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//指定消息模型中的交换机
rabbitTemplate.setExchange(env.getProperty(DEAD_EXCHANGE_PRODUCER));
//将字符串值转换成二进制的数据流
Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info))
.build();
rabbitTemplate.convertAndSend(env.getProperty(DEAD_ROUTING_PRODUCER),msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//获取消息的属性
MessageProperties messageProperties = message.getMessageProperties();
//设置消息的持久化模式
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//设置消息头,即指定发送消息的所属对象类型
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Student.class);
//设置消息的TTL,当消息和队列同时设置TTL时,取最短 10s
messageProperties.setExpiration(String.valueOf(10000));
return message;
}
});
log.info("死信队列消息模型 -生产者发出消息:{}",objectMapper.writeValueAsString(info));
}
}catch (Exception e){
log.error("死信队列消息模型 -生产者发出消息-发生异常:{}",info,e.fillInStackTrace());
}
}
}
注: 置消息的TTL,当消息和队列同时设置TTL时,取最短。
07:: 制作测试代码
如下:
//死信队列-延迟
@Autowired
private DeadPublisher deadPublisher;
@Autowired
private ObjectMapper objectMapper;
/**
* 死信队列模型演示
* */
@ApiOperation(value = "死信队列模型演示",notes = "死信队列模型演示")
@ResponseBody
@PostMapping("/deadPublisher")
public ResponseBo deadPublisher(@RequestBody @Valid Student vo) throws JsonProcessingException {
String msgValue = "deadPublisher";
long startTime = init(msgValue,objectMapper.writeValueAsString(vo));
try{
deadPublisher.sendMsg(vo);
endLog(msgValue,startTime);
}catch (Exception e){
endLogError(msgValue,startTime,e);
}
return ResponseBo.ok();
}
vo:
/**
* 学生
* @author yangzhenyu
* */
public class Student implements Serializable {
/**
* 序列号
*/
private static final long serialVersionUID = -5023112818896544461L;
@NotNull(message = "学生 id值不能为null")
@ApiModelProperty(" 学生 id值")
private String sId;
@ApiModelProperty(" 学生 名称")
private String sName;
@ApiModelProperty(" 学生 班级")
private String className;
public String getsId() {
return sId;
}
public 三分钟带您搞懂装饰模式