RocketMQ快速入门实战
Posted 流楚丶格念
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ快速入门实战相关的知识,希望对你有一定的参考价值。
文章目录
1. RocketMQ基础
1.1 组成结构
RocketMQ组成结构图如下 :
1.1.1 名词解释如下:
-
Producer Cluster 消息生产者群:负责发送消息 ,一般由业务系统负责产生消息。
-
Consumer Cluster 消息消费者群:负责消费消息 ,一般是后台系统负责异步消费。
它有两种消费模式 :
Push Consumer ,服务端向消费者端推送消息 Pull Consumer ,消费者端向服务定时拉取消息
-
NameServer 名称服务器:集群架构中的组织协调员 ,相当于注册中心 ,收集broker的工作情况 ,不负责消息的处理
-
Broker 消息服务器是RocketMQ的核心 ,负责消息的接受 ,存储 ,发送等。
需要定时发送自身状态 到NameServer ,默认10秒发送一次 ,超时2分钟会认为该broker失效。
1.1.2 交互过程如下 :
1 ) Brokder定时发送自身状态 到NameServer。
2 ) Producer 请求NameServer获取Broker的地址。
3 ) Producer 将消息发送到Broker中的消息队列。
4 ) Consumer订阅Broker中的消息队列 ,通过拉取消息 ,或由Broker将消息推送至Consumer。
1.2 安装RocketMQ
使用docker安装RocketMQ,可以参考我转载的另一篇博文:https://yangyongli.blog.csdn.net/article/details/125940018
2. 快速入门
2.1 三种消息发送方式
RocketMQ 支持 3 种消息发送方式 :
1、同步消息 ( sync message )
producer向 broker 发送消息 ,执行 API 时同步等待 ,直到broker 服务器返回发送结果 。
2、异步消息 ( async message )
producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法 ,调用 API 后立即返回 ,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
3、单向消息 ( oneway message )
producer向 broker 发送消息 ,执行 API 时直接返回 ,不等待broker 服务器的结果 。
2.2 消息结构
RocketMQ的消息包括基础属性和扩展属性两部分 :
2.2.1 基础属性
1 ) topic :主题相当于消息的一级分类 ,具有相同topic的消息将发送至该topic下的消息队列中 ,比方说一个电商 系统可以分为商品消息、订单消息、物流消息等 ,就可以在broker中创建商品主题、订单主题等 ,所有商品的消息 发送至该主题下的消息队列中。
2 ) 消息体 :即消息的内容 ,可以的字符串、对象等类型 (可系列化) 。消息的最大长度 是4M。
3 ) 消息 Flag :消息的一个标记 ,RocketMQ不处理 ,留给业务系统使用。
2.2.2 扩展属性
1 ) tag :相当于消息的二级分类 ,用于消费消息时进行过滤 ,可为空 。
2 ) keys: Message 索引键 ,在运维中可以根据这些 key 快速检索到消息 ,可为空 。
3 ) waitStoreMsgOK :消息 发送时是否等消息存储完成后再返回 。
Message 的基础属性主要包括消息所属主题 topic ,消息 Flag(RocketMQ 不做处理) 、扩展属性、消息体 。
2.3 生产者工程
创建生产者工程,工程结构如下 :
1 )创建test-rocketmq
创建一个test-rocketmq的测试工程专门用于rocketmq的功能测试。
test-rocketmq父工程的pom.xml如下 :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yyl</artifactId>
<groupId>com.yyl</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<artifactId>test-rocketmq</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
2 )创建rocketmq-producer生产者工程
rocketmq-producer的pom.xml如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>test-rocketmq</artifactId>
<groupId>com.yyl</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-producer</artifactId>
</project>
3 ) 新建rocketmq-producer工程 的application.yml文件
注意:你的rocketmq server地址要填你自己的
server:
port: 8181 #服务端口
servlet:
context-path: /rocketmq-producer
spring:
application:
name: rocketmq-producer #指定服务名
rocketmq:
nameServer: 你的rocketmq server地址:9876
producer:
group: demo-producer-group
4 ) 新建启动类
package com.yyl.test.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Administrator
* @version 1.0
**/
@SpringBootApplication
public class ProducerApplication
public static void main(String[] args)
SpringApplication.run(ProducerApplication.class, args);
发送同步消息
package com.yyl.test.rocketmq.message;
import com.yyl.test.rocketmq.model.OrderExt;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* 发送消息类ProducerSimple
* @author Administrator
* @version 1.0
**/
@Component
public class ProducerSimple
@Autowired
RocketMQTemplate rocketMQTemplate;
/**
* 发送同步消息
* @param topic 主题
* @param msg 消息内容
*/
public void sendSyncMsg(String topic,String msg)
SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
System.out.println("");
测试
1、在test下编写测试类 ,发送同步消息。
package com.yyl.test.rocketmq.message;
import com.yyl.test.rocketmq.model.OrderExt;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Date;
/**
* @author Administrator
* @version 1.0
**/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerSimpleTest
@Autowired
ProducerSimple producerSimple;
@Test
public void testSendSyncMsg()
producerSimple.sendSyncMsg("my-topic","第1条同步消息");
2、启动NameServer、 Broker、管理端
3、执行testSendSyncMsg方法
进入管理端 ,查询消息。
2.4 消费者工程
创建消费者工程
创建消息消费者工程 ,pom.xml如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>test-rocketmq</artifactId>
<groupId>com.yyl</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-consumer</artifactId>
</project>
2、启动类
package com.yyl.test.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Administrator
* @version 1.0
**/
@SpringBootApplication
public class ConsumerApplication
public static void main(String[] args)
SpringApplication.run(ConsumerApplication.class, args);
3、配置文件application.yml
server:
port: 8182 #服务端口
servlet:
context-path: /rocketmq-consumer
spring:
application:
name: rocketmq-producer #指定服务名
rocketmq:
nameServer: 你的 mqserver IP地址:9876
消费消息
编写消费消息监听类 :
package com.yyl.test.rocketmq.message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author Administrator
* @version 1.0
**/
@Component
@RocketMQMessageListener(topic = "my-topic",consumerGroup="demo-consumer-group")
public class ConsumerSimple implements RocketMQListener<String>
@Override
public void onMessage(String msg)
//此方法被调用表示接收到消息,msg形参就是消息内容
//处理消息...
System.out.println(msg);
监听消息队列需要指定 :
参数 | 说明 |
---|---|
topic | 监听的主题 |
consumerGroup | 消费组 ,相同消费组的消费者共同消费该主题的消息 ,它们组成一个集群。 |
测试
1、启动消费者工程
启动消费者工程 ,观察控制台输出“第1条同步消息”消息内容 ,这说明从消息队列已经读取到消息。
2、保证消费者工程已启动 ,再次发送消息 ,观察控制台是否输出“第一条同步消息”消息内容 ,输出则说明接收消 息成功。
以上是关于RocketMQ快速入门实战的主要内容,如果未能解决你的问题,请参考以下文章