RocketMQ 三种消息发送方式

Posted 流楚丶格念

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 三种消息发送方式相关的知识,希望对你有一定的参考价值。

文章目录

消息发送过程

通过快速入门对消息的发送和接收有一个粗略的认识 ,下边分析具体的消息发送过程 ,如下图 :

消息发送流程如下 :

1. Producer从NameServer中获取主题路由信息

Broker将自己的状态上报给NameServer ,NameServer中存储了每个Broker及主题、消息队列的信息。

Producer根据 topic从NameServer查询所有消息队列 ,查询到的结果例如 :

[
	"brokerName":"Broker ‐1","queueId":0,
	"brokerName":"Broker ‐1","queueId":1,
	"brokerName":"Broker ‐2","queueId":0,
	"brokerName":"Broker ‐2","queueId":1
]

Producer按选择算法从以上队列中选择一个进行消息发送 ,如果发送消息失败则在下次选择的时候 会规避掉失败的broker。

2. 构建消息 ,发送消息

发送消息前进行校验 ,比如消息的内容长度不能为0、消息最大长度、消息必要的属性是否具备等 (topic、消息 体 ,生产组等) 。

如果该topic下还没有队列则自动创建 ,默认一个topic下自动创建4个写队列 ,4个读队列 。

问题

那么为什么要设置多个队列呢 ?

1 ) 高可用:当某个队列不可用时其它队列顶上。

2 ) 提高并发:

  • 发送消息是选择队列进行发送 ,提高发送消息的并发能力。

  • 消息消费时每个消费者可以监听多个队列 ,提高消费消息的并发能力。

生产组有什么用 ?

在事务消息中broker需要回查producer ,同一个生产组的producer组成一个集群 ,提高并发能力。

3. 监听队列 ,消费消息

一个消费组可以包括多个消费者 ,一个消费组可以订阅多个主题。

一个队列同时只允许一个消费者消费 ,一个消费者可以消费多个队列中的消息。

消费组有两种消费模式 :

1 ) 集群模式:一个消费组内的消费者组成一个集群 ,主题下的一条消息只能被一个消费者消费。

2 ) 广播模式:主题下的一条消息能被消费组下的所有消费者消费。

消费者和broker之间通过推模式和拉模式接收消息 ,推模式即broker推送给消费者 ,拉模式是消费者主动从broker 查询消息。

三种消息发送方式

RocketMQ 支持 3 种消息发送方式 ,即同步消息 ( sync message ) 、异步消息 ( async message ) 、单向消息 ( oneway message ) 。

同步消息

producer向 broker 发送消息 ,执行 API 时同步等待 ,直到broker 服务器返回发送结果 。

代码案例:参见快速入门的测试程序:https://yangyongli.blog.csdn.net/article/details/125936067

异步消息

producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法 ,调用 API 后立即返回 ,producer发送消 息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。

在ProducerSimple中编写发送异步消息的方法

/**
 * 异步消息
 * @param topic
 * @param msg
 */
public void sendASyncMsg(String topic,String msg)
    rocketMQTemplate.asyncSend(topic, msg, new SendCallback() 
        //消息发送成功的回调
        @Override
        public void onSuccess(SendResult sendResult) 
            System.out.println(sendResult);
        
        //消息发送失败的回调
        @Override
        public void onException(Throwable throwable) 
            System.out.println(throwable.getMessage());
        
    );

测试 :

@Test
public void testSendASyncMsg()
    producerSimple.sendASyncMsg("my-topic","第1条异步消息");
    try 
        Thread.sleep(5000);
     catch (InterruptedException e) 
        e.printStackTrace();
    

单向消息

producer向 broker 发送消息 ,执行 API 时直接返回 ,不等待broker 服务器的结果 。

/**
 *  发送单向消息 
 *  @param topic 
 *  @param msg 
 *  */
public void sendOneWayMsg(String topic, String msg) 
    this.rocketMQTemplate.sendOneWay(topic, msg);

测试 :

@Test
public void testSendOneMsg()
    producerSimple.sendOneWayMsg("my-topic","第3条单条消息");

以上是关于RocketMQ 三种消息发送方式的主要内容,如果未能解决你的问题,请参考以下文章

rocketMQ之消息的生产与消费

RocketMQ 保证消息不丢失

RocketMQ的消息可靠性(防止消息丢失)

RocketMQ 三种消息发送方式

RocketMq简单的消费者和生产者(示例代码)

RocketMQ(02)——发送消息的三种方式