SpringBoot整合RocketMQ

Posted l_learning

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RocketMQ相关的知识,希望对你有一定的参考价值。

项目环境
jdk:1.8
rocketmq:4.5.1
springboot:2.6.3

引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

设置配置

rocketmq.name-server=192.168.241.200:9876
rocketmq.producer.group=test-group

生产者

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
public class DelayProduce 

    Logger log = LoggerFactory.getLogger(DelayProduce.class);

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendDelayMessage(String topic, String message, int delayLevel)
        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 5000, delayLevel);
        log.info("send time: ", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()));
        log.info("sendResult: ", sendResult);
    

消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "consumer")
public class DelayConsume implements RocketMQListener<String> 

    Logger log = LoggerFactory.getLogger(DelayConsume.class);

    @Override
    public void onMessage(String message) 
        log.info("received message time: ", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()));
        log.info("received message:", message);
    


测试

  1. 先启动一个消费者
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayConsumeTest 

    @Test
    public void onMessage() 
        while (true)

        
    


  1. 启动生产者发送消息
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest 

   @Autowired
   private DelayProduce delayProduce;

   @Test
   public void sendDelayMessage()
       delayProduce.sendDelayMessage("delay-topic","hello world", 5);
   


以上是关于SpringBoot整合RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ(二十四)整合SpringBoot

SpringBoot整合RocketMQ

SpringBoot整合RocketMQ

SpringBoot整合RocketMQ实现入门案例

SpringBoot整合rocketmq

springboot整合RocketMq(非事务)