2RocketMQ 源码解析之 与 Spring Boot 集成

Posted carl-zhao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2RocketMQ 源码解析之 与 Spring Boot 集成相关的知识,希望对你有一定的参考价值。

上一篇文章分析了一下 RocketMQ 的基本架构、概念、安装以及使用方式。现在大多数项目都是基于 Spring Boot。因为它很方便的自动装配机制,所以现在构建项目都是基于 Spring Boot。下面我们来分析一下 RocketMQ 是如何集成 Spring Boot 的。

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

RocketMQ 具体的相关概念可以查看 Rocket MQ 基本概念 .

1、项目结构

2、项目依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3、项目配置

application.yml

## Server Config
server:
  port: 8080

## RocketMQ Config
rocketmq:
  name-server: localhost:9876
  producer:
    group: test-topic-producer-group

4、订单实体类

Order 类是我们定义的消息发送的具体实体类。

Order.java

@Data
public class Order 

    /** 订单ID */
    private String id;
    /** 订单名称 */
    private String name;
    /** 订单状态 */
    private Integer status;


5、项目启动类

DemoApplication 这个类是 Spring Boot 启动类,并且依赖注入了 RocketMQTemplate 用于发送消息。

DemoApplication.java

@RestController
@SpringBootApplication
public class DemoApplication 

    @Resource(name = "rocketMQTemplate")
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) 
        SpringApplication.run(DemoApplication.class, args);
    

    @RequestMapping("/api/test")
    public String test()
        Order order = new Order();
        order.setId("123456");
        order.setName("MacBook Pro");
        order.setStatus(1);
        rocketMQTemplate.convertAndSend("test-topic", order);
        return "ok";
    


6、消息监听类

TestTopicListener 为消息监听类,监听主题为 test-topic 接收到消息发送方的消息。

@Component
@RocketMQMessageListener(consumerGroup = "test-topic-consumer-group", topic = "test-topic")
public class TestTopicListener implements RocketMQListener<Order> 

    public void onMessage(Order order) 
        System.out.println(JSON.toJSONString(order));
    


7、测试

通过 Postman 发送请求:

请求发送成功,并且响应 ok

并且后台控制台也成功打印我们发送出去的消息:

参考文章:

  • https://rocketmq.apache.org/docs/rmq-arc/
  • https://github.com/apache/rocketmq/blob/master/docs/cn/concept.md

以上是关于2RocketMQ 源码解析之 与 Spring Boot 集成的主要内容,如果未能解决你的问题,请参考以下文章

spring与ibatis集成之事务部分源码解析

Spring源码解析之@Configuration

spring cloud kubernetes源码解析之feign与loadbalancer

spring cloud kubernetes源码解析之feign与loadbalancer

spring cloud kubernetes源码解析之feign与loadbalancer

Spring 源码解析之HandlerAdapter源码解析