Rabbitmq 实现延时任务

Posted hsz-csy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq 实现延时任务相关的知识,希望对你有一定的参考价值。

1、需要用到插件 rabbitmq_delayed_message_exchange 来实现,插件下载地址:https://www.rabbitmq.com/community-plugins.html

 

 技术图片

 

2、下载后把插件放到 plugins 里面,然后到 sbin里面打开cmd,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令

 

技术图片

 

 3、插件装好后,重新启动mq,然后集成mq。

  首先,导包

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

 

  

  然后,配置文件配置连接信息:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.acknowledge-mode=manual

 

  

  mq 配置:

  

技术图片
package com.rrg.gz.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 *  mq配置
 * @author huangsz  2019/4/25 0025
 */
@Configuration
public class RabbitPluginConfig 

    /**
     * 延时队列交换机
     * 注意这里的交换机类型:CustomExchange
     * @return
     */
    @Bean
    public CustomExchange delayExchange()
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("rrg_delay_exchange","x-delayed-message",true, false,args);
    

    /**
     * 延时队列
     * @return
     */
    @Bean
    public Queue delayQueue()
        return new Queue("rrg_delay_queue",true);
    

    /**
     * 给延时队列绑定交换机
     * @return
     */
    @Bean
    public Binding cfgDelayBinding(Queue cfgDelayQueue, CustomExchange cfgUserDelayExchange)
        return BindingBuilder.bind(cfgDelayQueue).to(cfgUserDelayExchange).with("rrg_delay_key").noargs();
    
View Code

 

 

  发送消息类、接收类和信息类,信息类是我们自己时间业务封装需要消费的信息。

技术图片
package com.rrg.gz.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 消息发送者
 *
 * @author huangsz  2019/3/7 0007
 */
@Component
public class Sender 
    private static Logger log = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayMessage(MqEntity entity, long time) 
        // 这里的消息可以是任意对象,无需额外配置,直接传即可
        log.info("延时队列生产消息");
        this.rabbitTemplate.convertAndSend(
                "rrg_delay_exchange",
                "rrg_delay_key",
                entity,
                message -> 
                    // 注意这里时间可以使long,而且是设置header
                    message.getMessageProperties().setHeader("x-delay",time);
                    return message;
                
        );
        log.info("ms后执行", time);
    

View Code

 

技术图片
package com.rrg.gz.mq;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 接受者
 *
 * @author huangsz  2019/3/7 0007
 */

@Component
public class Receiver 
    private static Logger log = LoggerFactory.getLogger(Receiver.class);
    @Autowired
    private Sender sender;

    @RabbitListener(queues = "rrg_delay_queue")
    public void cfgUserReceiveDealy(MqEntity entity, Message message, Channel channel) throws Exception
        log.info("开始接受消息!");
        // 通知 MQ 消息已被接收,可以ACK(从队列中删除)了
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        System.out.println("接收消息并打印");
        System.out.println(entity);
    
View Code
技术图片
package com.rrg.gz.mq;


import java.io.Serializable;

/**
 *  一定要实现 Serializable
 * @author huangsz  2019/3/7 0007
 */
public class MqEntity implements Serializable 

    private Integer userId;
    private String msg;

    public MqEntity() 
    

    public MqEntity(Integer userId, String msg) 
        this.userId = userId;
        this.msg = msg;
    

    public Integer getUserId() 
        return userId;
    

    public void setUserId(Integer userId) 
        this.userId = userId;
    

    public String getMsg() 
        return msg;
    

    public void setMsg(String msg) 
        this.msg = msg;
    

    @Override
    public String toString() 
        return "MqEntity" +
                "userId=" + userId +
                ", msg=‘" + msg + ‘\\‘‘ +
                ‘‘;
    
View Code

 

 

4、写一个controller测试:

  

@RequestMapping("/test1")
    public void test()
        MqEntity mqEntity = new MqEntity(1,"30秒后消费");
        sender.sendDelayMessage(mqEntity,30000);
    

    @RequestMapping("/test2")
    public void test2()
        MqEntity mqEntity = new MqEntity(1,"10秒后消费");
        sender.sendDelayMessage(mqEntity,10000);
    

 

 

先执行test1,然后执行test2,这个时候,不需要等test1消费完之后,test2才消费信息。

 

技术图片

 

以上是关于Rabbitmq 实现延时任务的主要内容,如果未能解决你的问题,请参考以下文章

C# 通过 RabbitMQ 实现定时任务 (延时队列)

springboot使用RabbitMQ实现延时任务

rabbitmq 延时队列实现定时任务

rabbitmq 延时队列实现定时任务

通过RabbitMQ的DIRECT模式以及死信队列实现延时任务

RabbitMQ:伪延时队列