Springboot 整合 rocketmq及调度方案实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot 整合 rocketmq及调度方案实现相关的知识,希望对你有一定的参考价值。

参考技术A 最近新启动一个项目,用springboot开发,需要整合rocketmq,到网上收集了一下资料,大多都是一些简单的实现,类似于练习作业的那种。作为产品开发缺乏的东西太多。这里我给出完整的集成和消息调度方案,有不足之处欢迎留言指正。

这里先讲mq的集成,再讲消息发送的设计和消息消费的调度

消息发送实现类

消息体基类

消息

具体业务对应的消息体

监听类

mq消息接收启动类

消息分发

具体业务处理类

注解

消息类型定义

当然消费者端也可以用topic+tag的方式去调用业务实现类,这里给个简单的例子。

SpringBoot整合RocketMQ

项目环境
jdk:1.8
rocketmq:4.5.1
springboot:2.6.3

引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

设置配置

rocketmq.name-server=192.168.241.200:9876
rocketmq.producer.group=test-group

生产者

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
public class DelayProduce 

    Logger log = LoggerFactory.getLogger(DelayProduce.class);

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendDelayMessage(String topic, String message, int delayLevel)
        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 5000, delayLevel);
        log.info("send time: ", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()));
        log.info("sendResult: ", sendResult);
    

消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "consumer")
public class DelayConsume implements RocketMQListener<String> 

    Logger log = LoggerFactory.getLogger(DelayConsume.class);

    @Override
    public void onMessage(String message) 
        log.info("received message time: ", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()));
        log.info("received message:", message);
    


测试

  1. 先启动一个消费者
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayConsumeTest 

    @Test
    public void onMessage() 
        while (true)

        
    


  1. 启动生产者发送消息
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest 

   @Autowired
   private DelayProduce delayProduce;

   @Test
   public void sendDelayMessage()
       delayProduce.sendDelayMessage("delay-topic","hello world", 5);
   


以上是关于Springboot 整合 rocketmq及调度方案实现的主要内容,如果未能解决你的问题,请参考以下文章

6 SpringBoot整合RocketMQ发送异步消息

SpringBoot(17)---SpringBoot整合RocketMQ

SpringBoot整合RocketMQ

SpringBoot整合RocketMQ

SpringBoot整合RocketMQ

SpringBoot整合RocketMQ实现入门案例