RocketMQ 三种消息发送方式
Posted 流楚丶格念
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 三种消息发送方式相关的知识,希望对你有一定的参考价值。
文章目录
消息发送过程
通过快速入门对消息的发送和接收有一个粗略的认识 ,下边分析具体的消息发送过程 ,如下图 :
消息发送流程如下 :
1. Producer从NameServer中获取主题路由信息
Broker将自己的状态上报给NameServer ,NameServer中存储了每个Broker及主题、消息队列的信息。
Producer根据 topic从NameServer查询所有消息队列 ,查询到的结果例如 :
[
"brokerName":"Broker ‐1","queueId":0,
"brokerName":"Broker ‐1","queueId":1,
"brokerName":"Broker ‐2","queueId":0,
"brokerName":"Broker ‐2","queueId":1
]
Producer按选择算法从以上队列中选择一个进行消息发送 ,如果发送消息失败则在下次选择的时候 会规避掉失败的broker。
2. 构建消息 ,发送消息
发送消息前进行校验 ,比如消息的内容长度不能为0、消息最大长度、消息必要的属性是否具备等 (topic、消息 体 ,生产组等) 。
如果该topic下还没有队列则自动创建 ,默认一个topic下自动创建4个写队列 ,4个读队列 。
问题
那么为什么要设置多个队列呢 ?
1 ) 高可用:当某个队列不可用时其它队列顶上。
2 ) 提高并发:
-
发送消息是选择队列进行发送 ,提高发送消息的并发能力。
-
消息消费时每个消费者可以监听多个队列 ,提高消费消息的并发能力。
生产组有什么用 ?
在事务消息中broker需要回查producer ,同一个生产组的producer组成一个集群 ,提高并发能力。
3. 监听队列 ,消费消息
一个消费组可以包括多个消费者 ,一个消费组可以订阅多个主题。
一个队列同时只允许一个消费者消费 ,一个消费者可以消费多个队列中的消息。
消费组有两种消费模式 :
1 ) 集群模式:一个消费组内的消费者组成一个集群 ,主题下的一条消息只能被一个消费者消费。
2 ) 广播模式:主题下的一条消息能被消费组下的所有消费者消费。
消费者和broker之间通过推模式和拉模式接收消息 ,推模式即broker推送给消费者 ,拉模式是消费者主动从broker 查询消息。
三种消息发送方式
RocketMQ 支持 3 种消息发送方式 ,即同步消息 ( sync message ) 、异步消息 ( async message ) 、单向消息 ( oneway message ) 。
同步消息
producer向 broker 发送消息 ,执行 API 时同步等待 ,直到broker 服务器返回发送结果 。
代码案例:参见快速入门的测试程序:https://yangyongli.blog.csdn.net/article/details/125936067。
异步消息
producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法 ,调用 API 后立即返回 ,producer发送消 息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
在ProducerSimple中编写发送异步消息的方法
/**
* 异步消息
* @param topic
* @param msg
*/
public void sendASyncMsg(String topic,String msg)
rocketMQTemplate.asyncSend(topic, msg, new SendCallback()
//消息发送成功的回调
@Override
public void onSuccess(SendResult sendResult)
System.out.println(sendResult);
//消息发送失败的回调
@Override
public void onException(Throwable throwable)
System.out.println(throwable.getMessage());
);
测试 :
@Test
public void testSendASyncMsg()
producerSimple.sendASyncMsg("my-topic","第1条异步消息");
try
Thread.sleep(5000);
catch (InterruptedException e)
e.printStackTrace();
单向消息
producer向 broker 发送消息 ,执行 API 时直接返回 ,不等待broker 服务器的结果 。
/**
* 发送单向消息
* @param topic
* @param msg
* */
public void sendOneWayMsg(String topic, String msg)
this.rocketMQTemplate.sendOneWay(topic, msg);
测试 :
@Test
public void testSendOneMsg()
producerSimple.sendOneWayMsg("my-topic","第3条单条消息");
以上是关于RocketMQ 三种消息发送方式的主要内容,如果未能解决你的问题,请参考以下文章