RocketMQ快速入门实战

Posted 流楚丶格念

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ快速入门实战相关的知识,希望对你有一定的参考价值。

文章目录

1. RocketMQ基础

1.1 组成结构

RocketMQ组成结构图如下 :

1.1.1 名词解释如下:

  1. Producer Cluster 消息生产者群:负责发送消息 ,一般由业务系统负责产生消息。

  2. Consumer Cluster 消息消费者群:负责消费消息 ,一般是后台系统负责异步消费。

    它有两种消费模式 :

    Push Consumer ,服务端向消费者端推送消息
    Pull Consumer ,消费者端向服务定时拉取消息
    
  3. NameServer 名称服务器:集群架构中的组织协调员 ,相当于注册中心 ,收集broker的工作情况 ,不负责消息的处理

  4. Broker 消息服务器是RocketMQ的核心 ,负责消息的接受 ,存储 ,发送等。

    需要定时发送自身状态 到NameServer ,默认10秒发送一次 ,超时2分钟会认为该broker失效。

1.1.2 交互过程如下 :

1 ) Brokder定时发送自身状态 到NameServer。

2 ) Producer 请求NameServer获取Broker的地址。

3 ) Producer 将消息发送到Broker中的消息队列。

4 ) Consumer订阅Broker中的消息队列 ,通过拉取消息 ,或由Broker将消息推送至Consumer。

1.2 安装RocketMQ

使用docker安装RocketMQ,可以参考我转载的另一篇博文:https://yangyongli.blog.csdn.net/article/details/125940018

2. 快速入门

2.1 三种消息发送方式

RocketMQ 支持 3 种消息发送方式 :

1、同步消息 ( sync message )

producer向 broker 发送消息 ,执行 API 时同步等待 ,直到broker 服务器返回发送结果 。

2、异步消息 ( async message )

producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法 ,调用 API 后立即返回 ,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。

3、单向消息 ( oneway message )

producer向 broker 发送消息 ,执行 API 时直接返回 ,不等待broker 服务器的结果 。

2.2 消息结构

RocketMQ的消息包括基础属性和扩展属性两部分 :

2.2.1 基础属性

1 ) topic :主题相当于消息的一级分类 ,具有相同topic的消息将发送至该topic下的消息队列中 ,比方说一个电商 系统可以分为商品消息、订单消息、物流消息等 ,就可以在broker中创建商品主题、订单主题等 ,所有商品的消息 发送至该主题下的消息队列中。

2 ) 消息体 :即消息的内容 ,可以的字符串、对象等类型 (可系列化) 。消息的最大长度 是4M。

3 ) 消息 Flag :消息的一个标记 ,RocketMQ不处理 ,留给业务系统使用。

2.2.2 扩展属性

1 ) tag :相当于消息的二级分类 ,用于消费消息时进行过滤 ,可为空 。

2 ) keys: Message 索引键 ,在运维中可以根据这些 key 快速检索到消息 ,可为空 。

3 ) waitStoreMsgOK :消息 发送时是否等消息存储完成后再返回 。

Message 的基础属性主要包括消息所属主题 topic ,消息 Flag(RocketMQ 不做处理) 、扩展属性、消息体 。

2.3 生产者工程

创建生产者工程,工程结构如下 :

1 )创建test-rocketmq

创建一个test-rocketmq的测试工程专门用于rocketmq的功能测试。

test-rocketmq父工程的pom.xml如下 :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>yyl</artifactId>
        <groupId>com.yyl</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>

    <artifactId>test-rocketmq</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

</project>

2 )创建rocketmq-producer生产者工程

rocketmq-producer的pom.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test-rocketmq</artifactId>
        <groupId>com.yyl</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rocketmq-producer</artifactId>
</project>

3 ) 新建rocketmq-producer工程 的application.yml文件

注意:你的rocketmq server地址要填你自己的

server:
  port: 8181 #服务端口
  servlet:
    context-path: /rocketmq-producer

spring:
  application:
    name: rocketmq-producer #指定服务名
rocketmq:
  nameServer: 你的rocketmq server地址:9876
  producer:
    group: demo-producer-group

4 ) 新建启动类

package com.yyl.test.rocketmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Administrator
 * @version 1.0
 **/
@SpringBootApplication
public class ProducerApplication 

    public static void main(String[] args) 
        SpringApplication.run(ProducerApplication.class, args);
    



发送同步消息

package com.yyl.test.rocketmq.message;

import com.yyl.test.rocketmq.model.OrderExt;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 发送消息类ProducerSimple 
 * @author Administrator
 * @version 1.0
 **/
@Component
public class ProducerSimple 

    @Autowired
    RocketMQTemplate rocketMQTemplate;


    /**
     * 发送同步消息
     * @param topic 主题
     * @param msg 消息内容
     */
    public void sendSyncMsg(String topic,String msg)
        SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
        System.out.println("");
    


测试

1、在test下编写测试类 ,发送同步消息。

package com.yyl.test.rocketmq.message;

import com.yyl.test.rocketmq.model.OrderExt;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Date;

/**
 * @author Administrator
 * @version 1.0
 **/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerSimpleTest 

    @Autowired
    ProducerSimple producerSimple;

    @Test
    public void testSendSyncMsg()
        producerSimple.sendSyncMsg("my-topic","第1条同步消息");
    


2、启动NameServer、 Broker、管理端

3、执行testSendSyncMsg方法

进入管理端 ,查询消息。


2.4 消费者工程

创建消费者工程

创建消息消费者工程 ,pom.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test-rocketmq</artifactId>
        <groupId>com.yyl</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rocketmq-consumer</artifactId>
</project>

2、启动类

package com.yyl.test.rocketmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Administrator
 * @version 1.0
 **/
@SpringBootApplication
public class ConsumerApplication 

    public static void main(String[] args) 
        SpringApplication.run(ConsumerApplication.class, args);
    


3、配置文件application.yml

server:
  port: 8182 #服务端口
  servlet:
    context-path: /rocketmq-consumer

spring:
  application:
    name: rocketmq-producer #指定服务名
rocketmq:
  nameServer: 你的 mqserver IP地址:9876

消费消息

编写消费消息监听类 :

package com.yyl.test.rocketmq.message;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * @author Administrator
 * @version 1.0
 **/
@Component
@RocketMQMessageListener(topic = "my-topic",consumerGroup="demo-consumer-group")
public class ConsumerSimple implements RocketMQListener<String> 

    @Override
    public void onMessage(String msg) 
        //此方法被调用表示接收到消息,msg形参就是消息内容
        //处理消息...
        System.out.println(msg);
    


监听消息队列需要指定 :

参数说明
topic监听的主题
consumerGroup消费组 ,相同消费组的消费者共同消费该主题的消息 ,它们组成一个集群。

测试

1、启动消费者工程

启动消费者工程 ,观察控制台输出“第1条同步消息”消息内容 ,这说明从消息队列已经读取到消息。

2、保证消费者工程已启动 ,再次发送消息 ,观察控制台是否输出“第一条同步消息”消息内容 ,输出则说明接收消 息成功。

以上是关于RocketMQ快速入门实战的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ快速入门

rocketmq4.x快速入门指南

RocketMQ快速入门:消息发送延迟消息消费重试

3分钟快速入门RocketMQ(下)

3分钟快速入门RocketMQ(上)

RocketMQ实战