RocketMQ快速入门:消息发送延迟消息消费重试
Posted 一宿君
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ快速入门:消息发送延迟消息消费重试相关的知识,希望对你有一定的参考价值。
一起学编程,让生活更随和!
如果你觉得是个同道中人,欢迎关注博主gzh:【随和的皮蛋桑】。
专注于Java基础、进阶、面试以及计算机基础知识分享🐳。偶尔认知思考、日常水文🐌。
目录
1、RocketMQ消息结构
1.1、消息结构
RocketMQ的消息包括基础属性和扩展属性两部分:
1)基础属性
- topic : 主题相当于消息的一级分类,具有相同topic的消息将发送至该topic下的消息队列中,比方说一个电商系统可以分为商品消息、订单消息、物流消息等,就可以在broker中创建商品主题、订单主题等,所有商品的消息发送至该主题下的消息队列中。
- 消息体:即消息的内容 ,可以的字符串、对象等类型(可系列化)。消息的最大长度 是4M。
- 消息Flag:消息的一个标记,RocketMQ不处理,留给业务系统使用。
2)扩展属性:
- tag :相当于消息的二级分类,用于消费消息时进行过滤,可为空 。
- keys: Message 索引键,在运维中可以根据这些 key 快速检索到消息, 可为空 。
- waitStoreMsgOK:消息发送时是否等消息存储完成后再返回 。Message 的基础属性主要包括消息所属主题 topic , 消息 Flag(RocketMQ 不做处理)、 扩展属性、消息体 。
1.2、三种消息发松方式
RocketMQ 支持 3 种消息发送方式 :
1)同步消息(sync message )
producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果 。
2)异步消息(async message)
producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,
producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
3)单向消息(oneway message)
producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。
2、快速搭建工程
2.1、创建rocketmq-demo父工程
pom.xml引入一下依赖:
<!-- spring-boot父依赖 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<!-- starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
2.2、生产者工程
1)创建rocketmq-producer
生产者工程
2)新建rocketmq-producer
工程的application.yml
文件
# 端口号、上下文路径
server:
port: 8181
servlet:
context-path: /rocketmq-producer
# 服务名
spring:
application:
name: rocketmq-producer
# rocketmq
rocketmq:
name-server: 106.15.0.30:9876 #命名空间地址和端口号
producer:
group: demo-producer-group #生产者组
3) 新建启动类
package com.wbs;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author yixiujun
* @version Id: ProducerApplication.java, v 0.1 Administrator Exp $$
* @date 2023-02-15 09:22:20
* @desc 生产者启动类
*/
@SpringBootApplication
public class ProducerApplication
public static void main(String[] args)
SpringApplication.run(ProducerApplication.class, args);
4)创建发送同步消息方法
package com.wbs.test.message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author yixiujun
* @version Id: ProducerSimple.java, v 0.1 Administrator Exp $$
* @date 2023-02-15 09:24:13
* @desc rocketmq发送消息类
*/
@Component
public class ProducerSimple
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送同步消息
*
* @param topic 主题
* @param msg 消息
*/
public void sendSyncMsg(String topic, String msg)
rocketMQTemplate.syncSend(topic, msg);
5)测试
在test
包下创建单元测试ProducerSimpleTest
类:
package com.wbs.test.message;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
* @author yixiujun
* @version Id: ProducerSimpleTest.java, v 0.1 Administrator Exp $$
* @date 2023-02-19 16:24:20
* @desc 生产者发送消息测试
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerSimpleTest
@Resource
private ProducerSimple producerSimple;
/**
* 测试发送同步消息
*/
@Test
public void testSendSyncMsg()
this.producerSimple.sendSyncMsg("my-topic", "第一条同步消息");
System.out.println("end...");
启动服务器端
- NameServer
- Broker
- console管理端
执行上述单元测试testSendSyncMsg
方法,观察控制台和管理端控制台出现end...
表示消息发送成功。
进入管理端,查询消息。
2.3、消费者工程
1)创建消息消费者工程rocketmq-consumer
**2)创建application.yml
文件 **
# 端口号、上下文路径
server:
port: 8182
servlet:
context-path: /rocketmq-consumer
# 服务名
spring:
application:
name: rocketmq-consumer
# rocketmq
rocketmq:
name-server: 106.15.0.30:9876
producer:
group: demo-consumer-group
3)创建启动类
package com.wbs;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author yixiujun
* @version Id: ConsumerApplication.java, v 0.1 Administrator Exp $$
* @date 2023-02-19 16:46:00
* @desc 消费者启动类
*/
@SpringBootApplication
public class ConsumerApplication
public static void main(String[] args)
SpringApplication.run(ConsumerApplication.class, args);
4)消费消息
编写消费消息监听类:
package com.wbs.test.message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author yixiujun
* @version Id: ConsumerSimple.java, v 0.1 Administrator Exp $$
* @date 2023-02-19 16:48:16
* @desc 消费者监听类
*/
@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "demo-consumer-group")
public class ConsumerSimple implements RocketMQListener<String>
/**
* 接收到消息后调用此方法
* @param s 消息内容
*/
@Override
public void onMessage(String s)
System.out.println(s + ":已被消费!");
监听消息队列 需要指定:
- topic:监听的主题
- consumerGroup:消费组,相同消费组的消费者共同消费该主题的消息,它们组成一个集群。
2.4、消息发送过程
启动消费者工程,观察控制台输出“第一条同步消息”消息内容,这说明从消息队列已经读取到消息。
保证消费者工程已启动,再次发送消息,观察控制台是否输出“第一条同步消息”消息内容,输出则说明接收消息成功。
3、消息发送过程
通过快速入门对消息的发送和接收有一个粗略的认识,下边分析具体的消息发送过程,如下图:
消息发送流程如下:
1、Producer从NameServer中获取主题路由信息
Broker将自己的状态上报给NameServer,NameServer中存储了每个Broker及主题、消息队列的信息。
Producer根据 topic从NameServer查询所有消息队列,查询到的结果例如:
[
"brokerName":"Broker‐1","queueId":0,
"brokerName":"Broker‐1","queueId":1,
"brokerName":"Broker‐2","queueId":0,
"brokerName":"Broker‐2","queueId":1
]
Producer按选择算法从以上队列中选择一个进行消息发送,如果发送消息失败则在下次选择的时候 会规避掉失败的broker。
2、构建消息,发送消息
发送消息前进行校验,比如消息的内容长度不能为0、消息最大长度、消息必要的属性是否具备等(topic、消息体,生产组等)。
如果该topic下还没有队列则自动创建,默认一个topic下自动创建4个写队列,4个读队列 。为什么要多个队列 ?
- 高可用:当某个队列不可用时其它队列顶上。
- 提高并发:发送消息是选择队列进行发送,提高发送消息的并发能力。消息消费时每个消费者可以监听多个队列,提高消费消息的并发能力。
生产组有什么用?
在事务消息中broker需要回查producer,同一个生产组的producer组成一个集群,提高并发能力。
3、监听队列,消费消息
一个消费组可以包括多个消费者,一个消费组可以订阅多个主题。
一个队列同时只允许一个消费者消费,一个消费者可以消费多个队列中的消息。
消费组有两种消费模式:
1)集群模式
一个消费组内的消费者组成一个集群,主题下的一条消息只能被一个消费者消费。
2)广播模式
主题下的一条消息能被消费组下的所有消费者消费。
消费者和broker之间通过推模式
和拉模式
接收消息,推模式即broker推送给消费者,拉模式是消费者主动从broker查询消息。
4、三种消息发送方式
RocketMQ 支持 3 种消息发送方式 ,即
- 同步消息(sync message )
- 异步消息(async message)
- 单向消息(oneway message)
4.1、同步消息
参考2快速搭建工程(同步消息的简单示例)。
4.2、异步消息
producer
向broker
发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer
发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
在ProducerSimple
中编写发送异步消息的方法:
/**
* 发送异步消息
*
* @param topic 主题
* @param msg 消息
*/
public void sendAsyncMsg(String topic, String msg)
rocketMQTemplate.asyncSend(topic, msg, new SendCallback()
@Override
public void onSuccess(SendResult sendResult)
// 回调成功
System.out.println(sendResult.getSendStatus());
@Override
public void onException(Throwable throwable)
// 回调异常
System.out.println(throwable.getMessage());
);
单元测试:
/**
* 测试发送异步消息
*
* @throws InterruptedException 异常
*/
@Test
public void testSendASyncMsg() throws InterruptedException
this.producerSimple.sendAsyncMsg("my-topic", "第一条异步消息");
System.out.println("end……");
// 异步消息,为跟踪回调线程这里加入延迟
Thread.sleep(3000);
4.3、单向消息
producer
向broker
发送消息,执行 API 时直接返回,不等待broker
服务器的结果 。
在ProducerSimple
中编写发送单项消息的方法:
/**
* 发送单向消息
*
* @param topic 主题
* @param msg 消息
*/
public void sendOneWayMsg(String topic, String msg)
this.rocketMQTemplate.sendOneWay(topic, msg);
测试:
/**
* 测试发送异步消息
*/
@Test
public void testSendOneWayMsg()
this.producerSimple.sendOneWayMsg("my-topic", "第一条单项消息");
System.out.println("end……");
5、自定义消息格式
前边我们发送的消息内容格式都是字符串,在生产开发中消息内容格式是相对较复杂的,下面介绍如何对消息格式进行自定义
。
JSON是互联网开发中非常常用的数据格式,它具有格式标准,扩展方便的特点,将消息的格式使用JSON进行定义,可以提高消息内容的扩展性,RocketMQ支持传递JSON数据格式。
在生产端和消费端定义模型类:
package com.wbs.test.model;
/**
* 自定义消息实体类
*
* @author yixiujun
* @version Id: OrderExt.java, v 0.1 Administrator Exp $$
* @date 2023-02-19 20:55:25
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class OrderExt implements Serializable
private final static Long SERIALIZABLE_UID = -1L;
/**
* 主键id
*/
private String id;
/**
* 创建时间
*/
private Date createTime;
/**
* money
*/
private Long money;
/**
* 标题
*/
private String title;
创建ProducerUserDefineSimple
用户发送自定义消息模板类,创建发送消息内容为json格式的方法,
生产端:
package com.wbs.test.message;
/**
* 用户发送自定义消息模板
*
* @author yixiujun
* @version Id: ProducerUserDefineSimple.java, v 0.1 Administrator Exp $$
* @date 2023-02-19 21:07:16
*/
@Component
public class ProducerUserDefineSimple
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 消息内容为json格式
*
* @param topic 主题
* @param orderExt 消息实体
*/
public void sendMsgByJson(String topic, OrderExt orderExt)
// 发送同步消息,消息内容将orderExt转为json
this.rocketMQTemplate.convertAndSend(topic, orderExt);
System.out.printf("send msg : %s", orderExt);
编写单元测试方法:
/**
* 测试发送JSON格式的内容消息
*/
@Test
public void sendMsgByJson()
OrderExt orderExt = new OrderExt();
orderExt.setId("001");
orderExt.setCreateTime(new Date());
orderExt.setMoney(10000L);
orderExt.setTitle("这是JSON格式数据");
this.producerUserDefineSimple.sendMsgByJson("my-topic", orderExt);
System.out.println("end……");
消费端:
@Component
@RocketMQMessageListener(topic = "my-topic-obj", consumerGroup = "demo-consumer-group-obj")
public class ConsumerUserDefineSimple implements RocketMQListener<String>
/**
* 接收到消息后调用此方法
*
* @param s 消息内容
*/
@Override
public void onMessage(String s)
// 如果是json数据,可以将json转为对象
OrderExt orderExt = JSON.parseObject(s, OrderExt.class);
System.out.println(orderExt);
上例实现了RocketMQ传输JSON消息的过程,消费端在接收到JSON手动将JSON转成对象,也可以自动转换成对象,定义新的监听类,RocketMQListener泛型指定要转换的对象类型
。
6、延迟消息
6.1、延迟消息介绍
延迟消息也叫作定时消息,比如在电商项目的交易系统中,当用户下单之后超过一段时间之后仍然没有支付,此时就需要将该订单关l闭。
要实现该功能的话,可以在用户创建订单时就发送一条包含订单内容的延迟消息,该消息在一段时间之后投递给消息消费者,当消息消费者接收到该消息后,判断该订单的支付状态,如果处于未支付状态,则将该订单关闭。
RocketMQ的延迟消息实现非常简单,只需要发送消息前设置延迟的时间,延迟时间存在十八个等级
(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel()
设置与时间相对应的延迟级别即可。
6.2、同步消息延迟发送
生产端:
/**
* 发送同步延迟消息(消息内容为json格式)
*
* @param topic 主题
* @param orderExt 消息体
*/
public void sendMsgByJsonDelay(String topic, OrderExt orderExt)
// 发送同步消息,消息内容将orderExt转为json
Message<OrderExt> message = MessageBuilder.withPayload(orderExt).build();
// 指定发送超时时间(毫秒)和延迟等级
this.rocketMQTemplate.syncSend(topic, message, 1000, 3);
System.out.printf("send msg : %s", orderExt);
消费端监听:
/**
* 消费者监听类
*
* @author yixiujun
* @version Id: ConsumerSimple.java, v 0.1 Administrator Exp $$
* @date 2023-02-19 16:48:16
*/
@Component
@RocketMQMessageListener(topic = "my-topic-obj", consumerGroup = "demo-consumer-group-obj")
public class ConsumerUserDefineSimple implements RocketMQListener<OrderExt>
/**
* 根据泛型默认接收将接收到的json数据转化为对应的实体类
*
* @param orderExt json对应实体
*/
@Override
public void onMessage(OrderExt orderExt)
System.out.println(orderExt);
单元测试:
/**
* 测试发送同步延迟消息
*/
@Test
public void testSendMsgByJsonDelay()
OrderExt orderExt = new OrderExt();
orderExt.setId(UUID.randomUUID().toString());
orderExt.setCreateTime(new Date());
orderExt.setMoney(20000L);
orderExt.setTitle("测试订单");
this.producerUserDefineSimple.sendMsgByJsonDelay("my-topic-obj", orderExt);
System.out.println("end……");
6.3、异步消息延迟发送
生产端:
/**
* 发送异步延迟消息(消息内容为json格式)
*
* @param topic 主题
* @param orderE以上是关于RocketMQ快速入门:消息发送延迟消息消费重试的主要内容,如果未能解决你的问题,请参考以下文章