一篇带您搞懂MQ延迟队列实战操作

Posted 栗子~~

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一篇带您搞懂MQ延迟队列实战操作相关的知识,希望对你有一定的参考价值。

前言

  如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
  而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!


RabbitMq 专栏直通车

序号直通车
1干货实战-RabbitMQ(消息队列)的特性-并用具体应用场景来介绍
2windows rabbitMQ安装(轻松简单,快速上手)
3纯干货-详解RabbitMQ的消息模型(看一看,说不定有你想要的呢~~)
4干货实战演练-RabbitMQ普通消息模型(字节流接受模式)<<带源码>>
5干货实战演练-RabbitMQ普通消息模型(对象接受模式)<<带源码>>
6干货实战演练-RabbitMQ广播消息模型(字节流接受模式)<<带源码>>
7干货实战演练-RabbitMQ直连消息模型<<带源码>>
8干货实战演练-RabbitMQ订阅消息模型<<带源码>>
9干货实战-RabbitMQ的消息高可用和确认消费
10干货实战演练-RabbitMQ基于MANUAL机制手动确认消费模型<<带源码>>
11详解-RabbitMQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)
12一篇带您搞懂MQ延迟队列实战操作<<带源码>>

MQ-死信队列(延迟操作)外加消息确认模式

先回顾一下RabbitMq核心基础组件:

  • 【生产者】: 用于生产、发送信息的模块
  • 【消费者】: 用于监听、接受、消费和处理信息的模块
  • 【消息】: 可以看成一串实质的数据,如文字、图片、等等,在整个传输过程中,消息是通过二进制数据流来传递的
  • 【队列】:消费的暂存区或者存储区,可以理解为中转站,即生产者 -> 队列 -> 消费者
  • 【交换机】:同样也可以看成是中转站,用于首次接受和分发消息
  • 【路由】:相当于网关、秘钥、地址等等,一般不单独使用,绑定到交换机上,将消息指定到指定的队列

通过上篇文章,我们了解了死信队列的用处,这篇文章主要讲的就是实战,基本上都是代码。

死信队列消息模型构建大概有几步?

  1. 创建死信队列
  2. 创建基本交换机 —> 面向生产者
  3. 创建基本绑定 —>基本交换机+基本路由 —> 面向生产者
  4. 创建死信交换机
  5. 创建死信路由及其绑定真正的消费队列

好了,现在废话不多说,开始了;

<<<<<<<<<<<<<<<<<<<<<<<<<<开始演练>>>>>>>>>>>>>>>>>>>>>>>>>>>

01::前期准备:引入相关依赖

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
			<version>1.3.3.RELEASE</version>
		</dependency>

02::整合RabbitMQ

02::01-加入RabbitMq相关配置

即在配置文件中加入RabbitMq的配置,ip、端口号、账户密码等等…

spring:
  rabbitmq: #RabbitMq 配置
    virtual-host: /
    host: 127.0.0.1  #IP
    post: 5672  #提供服务时的端口
    username: guest #连接RabbitMQ的账户
    password: guest #连接RabbitMQ的密码

03:: 创建真实队列–交换机、队列、绑定,和确认消费相关配置


/**
 * 基于手动确认消费模式实战配置
 * @author yangzhenyu
 * */
@Configuration
public class ManualMqConfig {
    private static Logger log = LoggerFactory.getLogger(ManualMqConfig.class);
    public ManualMqConfig() {
        log.info("=================== 基于手动确认消费模式实战配置注入IOC===================");
    }
    @Autowired
    private Environment environment;
    //自动装配 RabbitMQ 的连接工厂实例
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    //消费者
    @Autowired
    private ManualConsumer manualConsumer;

    //创建队列
    @Bean(name = "manualQueueOne")
    public Queue manualQueueOne(){
        return new Queue(environment.getProperty("mq.yzy.info.manualqueue.name"),true);
    }

    //交换机
    @Bean
    public DirectExchange manualExchange(){
        return new DirectExchange(environment.getProperty("mq.yzy.info.manualexchange.name"),true,false);
    }

    //创建绑定
    //directQueueOne
    @Bean
    public Binding basicBindingOne(){
        return BindingBuilder.bind(manualQueueOne()).to(manualExchange()).with(environment.getProperty("mq.yzy.info.manualrouting.key.name"));
    }

    /**
     * 基于手动确认消费模式实战配置
     * */
    @Bean(name = "manualListenerContainer")
    public SimpleMessageListenerContainer manualListenerContainer(@Qualifier("manualQueueOne") Queue manualQueue){
        //自定义消息监听器所在的容器工厂
        SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(cachingConnectionFactory);
        //设置消息的确认消费模式,在这里为MANUAL,表示手动确认消费
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //设置并发消费者实例的初始值
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取的消息数量
        factory.setPrefetchCount(1);

        //指定该容器监听的队列
        factory.setQueues(manualQueue);
        //指定该容器中的消费监听器 即消费者
        factory.setMessageListener(manualConsumer);
        return factory;
    }
}

03::01:设置消息的确认消费模式为手动确认

在这里插入图片描述

03::02:指定该容器监听的队列

在这里插入图片描述
在这里插入图片描述

03::03:指定该容器中的消费监听器 即消费者

在这里插入图片描述

在这里插入图片描述

03::04:交换机、队列持久化配置

在这里插入图片描述

03::05:相关配置信息:

mq:
  env: loacl #自定义变量,表示本地开发
  yzy:
    info:
      manualqueue: #消息确认模式
        name: ${mq.env}.middleware.mq.yzy.info.manualqueue.one
      manualexchange: #消息确认模式
        name: ${mq.env}.middleware.mq.yzy.info.manualexchange.one
      manualrouting:
        key: #确认模式
          name: ${mq.env}.middleware.mq.yzy.info.manualrouting.key.one

04:: 制作真实队列消费者,即上述模型中指定的消费者

如下:


/**
 * 认为手动确认消费-消费者-字节流模式
 * @author yangzhenyu
 * */
@Component("manualConsumer")
public class ManualConsumer implements ChannelAwareMessageListener {
    private static Logger log = LoggerFactory.getLogger(ManualConsumer.class);

    //序列化和返序列化
    @Autowired
    private ObjectMapper objectMapper;


    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //获取消息属性
        MessageProperties messageProperties = message.getMessageProperties();
        //获取消息分发时的全局唯一标识
        long tag = messageProperties.getDeliveryTag();
        try{
            //获得消息体
            byte [] msg = message.getBody();
            //解析消息体
            Student student = objectMapper.readValue(msg,Student.class);
            log.info("基于manual机制-确认消息模式-人为手动确定消费-监听到消息:【{}】",objectMapper.writeValueAsString(student));
            //执行完逻辑后手动确认,第一个参数代表消息的分发标识(全局唯一),第二个参数代表是否允许批量确认消费
            channel.basicAck(tag,true);
        }catch (Exception e){
            log.error("确认消息模式-人为手动确定消费-发生异常:",e.fillInStackTrace());
            /**
             * 如果在处理消息的过程中发生异常,则需要人为手动确认消费掉该消息
             * 否则该消息将一直停留在队列中,从而导致重复消费
             * */
            channel.basicReject(tag,false);
        }
    }
}

05:: 制作死信队列模型配置

如下:

/**
 * 死信队列消息模型构建
 * @author yangzhenyu
 * */
@Configuration
public class DeadExchangeConfig {
    private static Logger log = LoggerFactory.getLogger(DeadExchangeConfig.class);
    public DeadExchangeConfig() {
        log.info("=================== 死信队列消息模型构建注入IOC===================");
    }
    @Autowired
    private Environment environment;
    //死信路由
    private final static String DEAD_ROUTING="mq.yzy.info.deadrouting.key.name";
    //死信交换机
    private final static String DEAD_EXCHANGE = "mq.yzy.info.deadexchange.name";
    //死信队列
    private final static String DEAD_QUEUE = "mq.yzy.info.deadqueue.name";
    //死信模型->基本模型->基本交换机(面向生产者)
    private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
    //死信模型->基本模型->基本路由(面向生产者)
    private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
    //真正的队列 -> 面向消费者
    private final static String REAL_QUEUE="mq.yzy.info.manualqueue.name";

    /**
     * 创建死信队列
     * */
    @Bean
    public Queue basicDeadQueue(){
        //创建死信队列的组成成分map,用来存放组成成员的相关成员
        Map<String,Object> args = new HashMap<>(3);
        //创建死信交换机
        args.put("x-dead-letter-exchange",environment.getProperty(DEAD_EXCHANGE));
        //创建死信路由
        args.put("x-dead-letter-routing-key",environment.getProperty(DEAD_ROUTING));
        //设定 TTL ,单位是毫秒,在这里指的是60s
        args.put("x-message-ttl",60000);
        //创建并返回死信队列实例
        return new Queue(environment.getProperty(DEAD_QUEUE),true,false,false,args);
    }

    //创建基本交换机 ---> 面向生产者
    @Bean
    public TopicExchange basicProducerExchange(){
        return new TopicExchange(environment.getProperty(DEAD_EXCHANGE_PRODUCER),true,false);
    }

    //创建基本绑定 --->基本交换机+基本路由 ---> 面向生产者
    @Bean
    public Binding basicProducerBinding(){
        return BindingBuilder.bind(basicDeadQueue()).to(basicProducerExchange()).with(environment.getProperty(DEAD_ROUTING_PRODUCER));
    }
    //====================================================================================



    //创建死信交换机
    @Bean
    public TopicExchange basicDeadExchange(){
        //创建并返回死信交换机实例
        return new TopicExchange(environment.getProperty(DEAD_EXCHANGE),true,false);
    }

    //创建死信路由及其绑定真正的消费队列
    /**
     * @param manualQueue 真正的队列
     * */
    @Bean
    public Binding basicDeadBindingOne(@Qualifier("manualQueueOne") Queue manualQueue){
        return BindingBuilder.bind(manualQueue).to(basicDeadExchange()).with(environment.getProperty(DEAD_ROUTING));
    }

}

05::01 制作死信队列模型配置

mq:
  env: loacl #自定义变量,表示本地开发
  yzy:
    info:
      deadqueue: #死信队列
        name:  ${mq.env}.middleware.mq.yzy.info.deadqueue.one
      deadrouting: #死信路由
        key:
          name: ${mq.env}.middleware.mq.yzy.info.deadrouting.key.one
        producer: #死信消息模型中 基本模型中的路由
          key:
            name: ${mq.env}.middleware.mq.yzy.info.deadrouting.producer.key.one

06:: 制作死信队列-生产者

package com.yzy.demo.rabbitmq.dead.publisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yzy.demo.test.vo.Student;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

/**
 * 死信队列消息模型构建---生产者
 * @author yangzhenyu
 * */
@Component
public class DeadPublisher {
    private static Logger log = LoggerFactory.getLogger(DeadPublisher.class);
    //序列化和返序列化
    @Autowired
    private ObjectMapper objectMapper;
    //定义RabbitMQ 组件
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //定义环境变量读取实例
    @Autowired
    private Environment env;

    //死信模型->基本模型->基本交换机(面向生产者)
    private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
    //死信模型->基本模型->基本路由(面向生产者)
    private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";

    /**
     * 发送对象类型的消息给死信队列
     * @param info
     * */
    public void sendMsg(Student info){
        try{
            if (info != null){
                //定义消息传输的格式为json
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //指定消息模型中的交换机
                rabbitTemplate.setExchange(env.getProperty(DEAD_EXCHANGE_PRODUCER));
                //将字符串值转换成二进制的数据流
                Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info))
                        .build();
                rabbitTemplate.convertAndSend(env.getProperty(DEAD_ROUTING_PRODUCER),msg, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //获取消息的属性
                        MessageProperties messageProperties = message.getMessageProperties();
                        //设置消息的持久化模式
                        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //设置消息头,即指定发送消息的所属对象类型
                        messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Student.class);
                        //设置消息的TTL,当消息和队列同时设置TTL时,取最短 10s
                        messageProperties.setExpiration(String.valueOf(10000));
                        return message;
                    }
                });
                log.info("死信队列消息模型 -生产者发出消息:{}",objectMapper.writeValueAsString(info));
            }
        }catch (Exception e){
            log.error("死信队列消息模型 -生产者发出消息-发生异常:{}",info,e.fillInStackTrace());
        }
    }
}

注: 置消息的TTL,当消息和队列同时设置TTL时,取最短。

07:: 制作测试代码

如下:

    //死信队列-延迟
    @Autowired
    private DeadPublisher deadPublisher;
    @Autowired
    private ObjectMapper objectMapper;
    /**
     * 死信队列模型演示
     * */
    @ApiOperation(value = "死信队列模型演示",notes = "死信队列模型演示")
    @ResponseBody
    @PostMapping("/deadPublisher")
    public ResponseBo deadPublisher(@RequestBody @Valid Student vo) throws JsonProcessingException {
        String msgValue = "deadPublisher";
        long startTime = init(msgValue,objectMapper.writeValueAsString(vo));
        try{
            deadPublisher.sendMsg(vo);
            endLog(msgValue,startTime);
        }catch (Exception e){
            endLogError(msgValue,startTime,e);
        }
        return ResponseBo.ok();
    }

vo:


/**
 * 学生
 * @author yangzhenyu
 * */
public class Student implements Serializable {
    /**
     * 序列号
     */
    private static final long serialVersionUID = -5023112818896544461L;
    @NotNull(message = "学生 id值不能为null")
    @ApiModelProperty(" 学生 id值")
    private String sId;
    @ApiModelProperty(" 学生 名称")
    private String sName;
    @ApiModelProperty(" 学生 班级")
    private String className;

    public String getsId() {
        return sId;
    }

    public 三分钟带您搞懂装饰模式

JavaScript之用new操作符调用函数——一篇带你搞懂

「MQ实战」RabbitMQ 延迟队列,消息延迟推送

一篇带你搞懂 java 集合

一篇带你搞懂 java 集合

MQ-死信队列实现消息延迟