SpringBoot整合RocketMQ
Posted l_learning
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RocketMQ相关的知识,希望对你有一定的参考价值。
项目环境
jdk:1.8
rocketmq:4.5.1
springboot:2.6.3
引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
设置配置
rocketmq.name-server=192.168.241.200:9876
rocketmq.producer.group=test-group
生产者
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
public class DelayProduce
Logger log = LoggerFactory.getLogger(DelayProduce.class);
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendDelayMessage(String topic, String message, int delayLevel)
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 5000, delayLevel);
log.info("send time: ", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()));
log.info("sendResult: ", sendResult);
消费者
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "consumer")
public class DelayConsume implements RocketMQListener<String>
Logger log = LoggerFactory.getLogger(DelayConsume.class);
@Override
public void onMessage(String message)
log.info("received message time: ", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()));
log.info("received message:", message);
测试
- 先启动一个消费者
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayConsumeTest
@Test
public void onMessage()
while (true)
- 启动生产者发送消息
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest
@Autowired
private DelayProduce delayProduce;
@Test
public void sendDelayMessage()
delayProduce.sendDelayMessage("delay-topic","hello world", 5);
以上是关于SpringBoot整合RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章