Springboot 整合 rocketmq及调度方案实现
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot 整合 rocketmq及调度方案实现相关的知识,希望对你有一定的参考价值。
参考技术A 最近新启动一个项目,用springboot开发,需要整合rocketmq,到网上收集了一下资料,大多都是一些简单的实现,类似于练习作业的那种。作为产品开发缺乏的东西太多。这里我给出完整的集成和消息调度方案,有不足之处欢迎留言指正。这里先讲mq的集成,再讲消息发送的设计和消息消费的调度
消息发送实现类
消息体基类
消息
具体业务对应的消息体
监听类
mq消息接收启动类
消息分发
具体业务处理类
注解
消息类型定义
当然消费者端也可以用topic+tag的方式去调用业务实现类,这里给个简单的例子。
SpringBoot整合RocketMQ
项目环境
jdk:1.8
rocketmq:4.5.1
springboot:2.6.3
引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
设置配置
rocketmq.name-server=192.168.241.200:9876
rocketmq.producer.group=test-group
生产者
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
public class DelayProduce
Logger log = LoggerFactory.getLogger(DelayProduce.class);
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendDelayMessage(String topic, String message, int delayLevel)
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 5000, delayLevel);
log.info("send time: ", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()));
log.info("sendResult: ", sendResult);
消费者
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "consumer")
public class DelayConsume implements RocketMQListener<String>
Logger log = LoggerFactory.getLogger(DelayConsume.class);
@Override
public void onMessage(String message)
log.info("received message time: ", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()));
log.info("received message:", message);
测试
- 先启动一个消费者
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayConsumeTest
@Test
public void onMessage()
while (true)
- 启动生产者发送消息
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest
@Autowired
private DelayProduce delayProduce;
@Test
public void sendDelayMessage()
delayProduce.sendDelayMessage("delay-topic","hello world", 5);
以上是关于Springboot 整合 rocketmq及调度方案实现的主要内容,如果未能解决你的问题,请参考以下文章