一篇带您搞懂MQ延迟队列实战操作

Posted 栗子~~

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一篇带您搞懂MQ延迟队列实战操作相关的知识,希望对你有一定的参考价值。

文章目录

前言

  如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
  而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!


RabbitMq 专栏直通车

序号直通车
1干货实战-RabbitMQ(消息队列)的特性-并用具体应用场景来介绍
2windows rabbitMQ安装(轻松简单,快速上手)
3纯干货-详解RabbitMQ的消息模型(看一看,说不定有你想要的呢~~)
4干货实战演练-RabbitMQ普通消息模型(字节流接受模式)<<带源码>>
5干货实战演练-RabbitMQ普通消息模型(对象接受模式)<<带源码>>
6干货实战演练-RabbitMQ广播消息模型(字节流接受模式)<<带源码>>
7干货实战演练-RabbitMQ直连消息模型<<带源码>>
8干货实战演练-RabbitMQ订阅消息模型<<带源码>>
9干货实战-RabbitMQ的消息高可用和确认消费
10干货实战演练-RabbitMQ基于MANUAL机制手动确认消费模型<<带源码>>
11详解-RabbitMQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)
12一篇带您搞懂MQ延迟队列实战操作<<带源码>>

MQ-死信队列(延迟操作)外加消息确认模式

先回顾一下RabbitMq核心基础组件:

  • 【生产者】: 用于生产、发送信息的模块
  • 【消费者】: 用于监听、接受、消费和处理信息的模块
  • 【消息】: 可以看成一串实质的数据,如文字、图片、等等,在整个传输过程中,消息是通过二进制数据流来传递的
  • 【队列】:消费的暂存区或者存储区,可以理解为中转站,即生产者 -> 队列 -> 消费者
  • 【交换机】:同样也可以看成是中转站,用于首次接受和分发消息
  • 【路由】:相当于网关、秘钥、地址等等,一般不单独使用,绑定到交换机上,将消息指定到指定的队列

通过上篇文章,我们了解了死信队列的用处,这篇文章主要讲的就是实战,基本上都是代码。

死信队列消息模型构建大概有几步?

  1. 创建死信队列
  2. 创建基本交换机 —> 面向生产者
  3. 创建基本绑定 —>基本交换机+基本路由 —> 面向生产者
  4. 创建死信交换机
  5. 创建死信路由及其绑定真正的消费队列

好了,现在废话不多说,开始了;

<<<<<<<<<<<<<<<<<<<<<<<<<<开始演练>>>>>>>>>>>>>>>>>>>>>>>>>>>

01::前期准备:引入相关依赖

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
			<version>1.3.3.RELEASE</version>
		</dependency>

02::整合RabbitMQ

02::01-加入RabbitMq相关配置

即在配置文件中加入RabbitMq的配置,ip、端口号、账户密码等等…

spring:
  rabbitmq: #RabbitMq 配置
    virtual-host: /
    host: 127.0.0.1  #IP
    post: 5672  #提供服务时的端口
    username: guest #连接RabbitMQ的账户
    password: guest #连接RabbitMQ的密码

03:: 创建真实队列–交换机、队列、绑定,和确认消费相关配置


/**
 * 基于手动确认消费模式实战配置
 * @author yangzhenyu
 * */
@Configuration
public class ManualMqConfig 
    private static Logger log = LoggerFactory.getLogger(ManualMqConfig.class);
    public ManualMqConfig() 
        log.info("=================== 基于手动确认消费模式实战配置注入IOC===================");
    
    @Autowired
    private Environment environment;
    //自动装配 RabbitMQ 的连接工厂实例
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    //消费者
    @Autowired
    private ManualConsumer manualConsumer;

    //创建队列
    @Bean(name = "manualQueueOne")
    public Queue manualQueueOne()
        return new Queue(environment.getProperty("mq.yzy.info.manualqueue.name"),true);
    

    //交换机
    @Bean
    public DirectExchange manualExchange()
        return new DirectExchange(environment.getProperty("mq.yzy.info.manualexchange.name"),true,false);
    

    //创建绑定
    //directQueueOne
    @Bean
    public Binding basicBindingOne()
        return BindingBuilder.bind(manualQueueOne()).to(manualExchange()).with(environment.getProperty("mq.yzy.info.manualrouting.key.name"));
    

    /**
     * 基于手动确认消费模式实战配置
     * */
    @Bean(name = "manualListenerContainer")
    public SimpleMessageListenerContainer manualListenerContainer(@Qualifier("manualQueueOne") Queue manualQueue)
        //自定义消息监听器所在的容器工厂
        SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(cachingConnectionFactory);
        //设置消息的确认消费模式,在这里为MANUAL,表示手动确认消费
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //设置并发消费者实例的初始值
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取的消息数量
        factory.setPrefetchCount(1);

        //指定该容器监听的队列
        factory.setQueues(manualQueue);
        //指定该容器中的消费监听器 即消费者
        factory.setMessageListener(manualConsumer);
        return factory;
    

03::01:设置消息的确认消费模式为手动确认

03::02:指定该容器监听的队列


03::03:指定该容器中的消费监听器 即消费者

03::04:交换机、队列持久化配置

03::05:相关配置信息:

mq:
  env: loacl #自定义变量,表示本地开发
  yzy:
    info:
      manualqueue: #消息确认模式
        name: $mq.env.middleware.mq.yzy.info.manualqueue.one
      manualexchange: #消息确认模式
        name: $mq.env.middleware.mq.yzy.info.manualexchange.one
      manualrouting:
        key: #确认模式
          name: $mq.env.middleware.mq.yzy.info.manualrouting.key.one

04:: 制作真实队列消费者,即上述模型中指定的消费者

如下:


/**
 * 认为手动确认消费-消费者-字节流模式
 * @author yangzhenyu
 * */
@Component("manualConsumer")
public class ManualConsumer implements ChannelAwareMessageListener 
    private static Logger log = LoggerFactory.getLogger(ManualConsumer.class);

    //序列化和返序列化
    @Autowired
    private ObjectMapper objectMapper;


    @Override
    public void onMessage(Message message, Channel channel) throws Exception 
        //获取消息属性
        MessageProperties messageProperties = message.getMessageProperties();
        //获取消息分发时的全局唯一标识
        long tag = messageProperties.getDeliveryTag();
        try
            //获得消息体
            byte [] msg = message.getBody();
            //解析消息体
            Student student = objectMapper.readValue(msg,Student.class);
            log.info("基于manual机制-确认消息模式-人为手动确定消费-监听到消息:【】",objectMapper.writeValueAsString(student));
            //执行完逻辑后手动确认,第一个参数代表消息的分发标识(全局唯一),第二个参数代表是否允许批量确认消费
            channel.basicAck(tag,true);
        catch (Exception e)
            log.error("确认消息模式-人为手动确定消费-发生异常:",e.fillInStackTrace());
            /**
             * 如果在处理消息的过程中发生异常,则需要人为手动确认消费掉该消息
             * 否则该消息将一直停留在队列中,从而导致重复消费
             * */
            channel.basicReject(tag,false);
        
    

05:: 制作死信队列模型配置

如下:

/**
 * 死信队列消息模型构建
 * @author yangzhenyu
 * */
@Configuration
public class DeadExchangeConfig 
    private static Logger log = LoggerFactory.getLogger(DeadExchangeConfig.class);
    public DeadExchangeConfig() 
        log.info("=================== 死信队列消息模型构建注入IOC===================");
    
    @Autowired
    private Environment environment;
    //死信路由
    private final static String DEAD_ROUTING="mq.yzy.info.deadrouting.key.name";
    //死信交换机
    private final static String DEAD_EXCHANGE = "mq.yzy.info.deadexchange.name";
    //死信队列
    private final static String DEAD_QUEUE = "mq.yzy.info.deadqueue.name";
    //死信模型->基本模型->基本交换机(面向生产者)
    private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
    //死信模型->基本模型->基本路由(面向生产者)
    private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
    //真正的队列 -> 面向消费者
    private final static String REAL_QUEUE="mq.yzy.info.manualqueue.name";

    /**
     * 创建死信队列
     * */
    @Bean
    public Queue basicDeadQueue()
        //创建死信队列的组成成分map,用来存放组成成员的相关成员
        Map<String,Object> args = new HashMap<>(3);
        //创建死信交换机
        args.put("x-dead-letter-exchange",environment.getProperty(DEAD_EXCHANGE));
        //创建死信路由
        args.put("x-dead-letter-routing-key",environment.getProperty(DEAD_ROUTING));
        //设定 TTL ,单位是毫秒,在这里指的是60s
        args.put("x-message-ttl",60000);
        //创建并返回死信队列实例
        return new Queue(environment.getProperty(DEAD_QUEUE),true,false,false,args);
    

    //创建基本交换机 ---> 面向生产者
    @Bean
    public TopicExchange basicProducerExchange()
        return new TopicExchange(environment.getProperty(DEAD_EXCHANGE_PRODUCER),true,false);
    

    //创建基本绑定 --->基本交换机+基本路由 ---> 面向生产者
    @Bean
    public Binding basicProducerBinding()
        return BindingBuilder.bind(basicDeadQueue()).to(basicProducerExchange()).with(environment.getProperty(DEAD_ROUTING_PRODUCER));
    
    //====================================================================================



    //创建死信交换机
    @Bean
    public TopicExchange basicDeadExchange()
        //创建并返回死信交换机实例
        return new TopicExchange(environment.getProperty(DEAD_EXCHANGE),true,false);
    

    //创建死信路由及其绑定真正的消费队列
    /**
     * @param manualQueue 真正的队列
     * */
    @Bean
    public Binding basicDeadBindingOne(@Qualifier("manualQueueOne") Queue manualQueue)
        return BindingBuilder.bind(manualQueue).to(basicDeadExchange()).with(environment.getProperty(DEAD_ROUTING));
    


05::01 制作死信队列模型配置

mq:
  env: loacl #自定义变量,表示本地开发
  yzy:
    info:
      deadqueue: #死信队列
        name:  $mq.env.middleware.mq.yzy.info.deadqueue.one
      deadrouting: #死信路由
        key:
          name: $mq.env.middleware.mq.yzy.info.deadrouting.key.one
        producer: #死信消息模型中 基本模型中的路由
          key:
            name: $mq.env.middleware.mq.yzy.info.deadrouting.producer.key.one

06:: 制作死信队列-生产者

package com.yzy.demo.rabbitmq.dead.publisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yzy.demo.test.vo.Student;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

/**
 * 死信队列消息模型构建---生产者
 * @author yangzhenyu
 * */
@Component
public class DeadPublisher 
    private static Logger log = LoggerFactory.getLogger(DeadPublisher.class);
    //序列化和返序列化
    @Autowired
    private ObjectMapper objectMapper;
    //定义RabbitMQ 组件
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //定义环境变量读取实例
    @Autowired
    private Environment env;

    //死信模型->基本模型->基本交换机(面向生产者)
    private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
    //死信模型->基本模型->基本路由(面向生产者)
    private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";

    /**
     * 发送对象类型的消息给死信队列
     * @param info
     * */
    public void sendMsg(Student info)
        try
            if (info != null)
                //定义消息传输的格式为json
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //指定消息模型中的交换机
                rabbitTemplate.setExchange(env.getProperty(DEAD_EXCHANGE_PRODUCER));
                //将字符串值转换成二进制的数据流
                Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info))
                        .build();
                rabbitTemplate.convertAndSend(env.getProperty(DEAD_ROUTING_PRODUCER),msg, new MessagePostProcessor() 
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException 
                        //获取消息的属性
                        MessageProperties messageProperties = message.getMessageProperties();
                        //设置消息的持久化模式
                        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //设置消息头,即指定发送消息的所属对象类型
                        messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Student.class);
                        //设置消息的TTL,当消息和队列同时设置TTL时,取最短 10s
                        messageProperties.setExpiration(String.valueOf(10000));
                        return message;
                    
                );
                log.info("死信队列消息模型 -生产者发出消息:",objectMapper.writeValueAsString(info));
            
        catch (Exception e)
            log.error("死信队列消息模型 -生产者发出消息-发生异常:",info,e.fillInStackTrace());
        
    


注: 置消息的TTL,当消息和队列同时设置TTL时,取最短。

07:: 制作测试代码

如下:

    //死信队列-延迟
    @Autowired
    private DeadPublisher deadPublisher;
    @Autowired
    private ObjectMapper objectMapper;
    /**
     * 死信队列模型演示
     * */
    @ApiOperation(value = "死信队列模型演示",notes = "死信队列模型演示")
    @ResponseBody
    @PostMapping("/deadPublisher")
    public ResponseBo deadPublisher(@RequestBody @Valid Student vo) throws JsonProcessingException 
        String msgValue = "deadPublisher";
        long startTime = init(msgValue,objectMapper.writeValueAsString(vo));
        try
            deadPublisher.sendMsg(vo);
            endLog(msgValue,startTime);
        catch (Exception e)
            endLogError(msgValue,startTime,e);
        
        return ResponseBo.ok();
    

vo:


/**
 * 学生
 * @author yangzhenyu
 * */
public class Student implements Serializable 
    /**
     * 序列号
     */
    private static final long serialVersionUID = -5023112818896544461L;
    @NotNull(message = "学生 id值不能为null")
    @ApiModelProperty(" 学生 id值")
    private String sId;
    @ApiModelProperty(" 学生 名称")
    private String sName;
    @ApiModelProperty(" 学生 班级")
    private String className;

    public String getsId() 
        return sId;
    

    public void setsId(String sId) 「MQ实战」RabbitMQ 延迟队列,消息延迟推送

三分钟带您搞懂装饰模式

MQ-死信队列实现消息延迟

MQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)

MQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)

RabbitMQ-消息可靠性&延迟消息