Rabbitmq 实现延时任务
Posted hsz-csy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq 实现延时任务相关的知识,希望对你有一定的参考价值。
1、需要用到插件 rabbitmq_delayed_message_exchange 来实现,插件下载地址:https://www.rabbitmq.com/community-plugins.html
2、下载后把插件放到 plugins 里面,然后到 sbin里面打开cmd,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令
3、插件装好后,重新启动mq,然后集成mq。
首先,导包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
然后,配置文件配置连接信息:
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.listener.simple.acknowledge-mode=manual
mq 配置:
package com.rrg.gz.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * mq配置 * @author huangsz 2019/4/25 0025 */ @Configuration public class RabbitPluginConfig /** * 延时队列交换机 * 注意这里的交换机类型:CustomExchange * @return */ @Bean public CustomExchange delayExchange() Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("rrg_delay_exchange","x-delayed-message",true, false,args); /** * 延时队列 * @return */ @Bean public Queue delayQueue() return new Queue("rrg_delay_queue",true); /** * 给延时队列绑定交换机 * @return */ @Bean public Binding cfgDelayBinding(Queue cfgDelayQueue, CustomExchange cfgUserDelayExchange) return BindingBuilder.bind(cfgDelayQueue).to(cfgUserDelayExchange).with("rrg_delay_key").noargs();
发送消息类、接收类和信息类,信息类是我们自己时间业务封装需要消费的信息。
package com.rrg.gz.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息发送者 * * @author huangsz 2019/3/7 0007 */ @Component public class Sender private static Logger log = LoggerFactory.getLogger(Sender.class); @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayMessage(MqEntity entity, long time) // 这里的消息可以是任意对象,无需额外配置,直接传即可 log.info("延时队列生产消息"); this.rabbitTemplate.convertAndSend( "rrg_delay_exchange", "rrg_delay_key", entity, message -> // 注意这里时间可以使long,而且是设置header message.getMessageProperties().setHeader("x-delay",time); return message; ); log.info("ms后执行", time);
package com.rrg.gz.mq; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 接受者 * * @author huangsz 2019/3/7 0007 */ @Component public class Receiver private static Logger log = LoggerFactory.getLogger(Receiver.class); @Autowired private Sender sender; @RabbitListener(queues = "rrg_delay_queue") public void cfgUserReceiveDealy(MqEntity entity, Message message, Channel channel) throws Exception log.info("开始接受消息!"); // 通知 MQ 消息已被接收,可以ACK(从队列中删除)了 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("接收消息并打印"); System.out.println(entity);
package com.rrg.gz.mq; import java.io.Serializable; /** * 一定要实现 Serializable * @author huangsz 2019/3/7 0007 */ public class MqEntity implements Serializable private Integer userId; private String msg; public MqEntity() public MqEntity(Integer userId, String msg) this.userId = userId; this.msg = msg; public Integer getUserId() return userId; public void setUserId(Integer userId) this.userId = userId; public String getMsg() return msg; public void setMsg(String msg) this.msg = msg; @Override public String toString() return "MqEntity" + "userId=" + userId + ", msg=‘" + msg + ‘\\‘‘ + ‘‘;
4、写一个controller测试:
@RequestMapping("/test1") public void test() MqEntity mqEntity = new MqEntity(1,"30秒后消费"); sender.sendDelayMessage(mqEntity,30000); @RequestMapping("/test2") public void test2() MqEntity mqEntity = new MqEntity(1,"10秒后消费"); sender.sendDelayMessage(mqEntity,10000);
先执行test1,然后执行test2,这个时候,不需要等test1消费完之后,test2才消费信息。
以上是关于Rabbitmq 实现延时任务的主要内容,如果未能解决你的问题,请参考以下文章