RocketMQ(02)——发送消息的三种方式

Posted elim168

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ(02)——发送消息的三种方式相关的知识,希望对你有一定的参考价值。

发送消息的三种方式

同步发送

Producer在进行消息发送时可以是阻塞的,也可以是非阻塞的。具体对应到发送方式一共有三种,分别是同步、异步和单向的(ONEWAY)。之前介绍的调用send()返回SendResult的方法是阻塞的,它一定要等到Broker进行了响应后才会返回,才能继续往下执行。对于下面的代码就是只有第一条消息发送完了后,才能发送第二条消息,接着是第三条。这种阻塞发送的方式也叫同步发送,它的整个响应时间还包括可能的重试时间。其内部会默认进行两次重试,可以通过setRetryTimesWhenSendFailed()指定同步发送时内部最大的重试次数。

@Test
public void testSyncSend() throws Exception 
  //指定Producer的Group为group1
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  //指定需要连接的Name Server
  producer.setNamesrvAddr(nameServer);
  //发送消息前必须调用start(),其内部会进行一些初始化工作。
  producer.start();
  for (int i = 0; i < 10; i++) 
    //指定消息发送的Topic是topic1。
    Message message = new Message("topic1", ("hello" + i).getBytes());
    //同步发送,发送成功后才会返回
    SendResult sendResult = producer.send(message);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) 
      System.out.println("消息发送成功:" + sendResult);
     else 
      System.out.println("消息发送失败:" + sendResult);
    
  
  //使用完毕后需要把Producer关闭,以释放相应的资源
  producer.shutdown();


异步发送

同步发送时调用send()的线程会阻塞,而异步发送时当前线程是不会阻塞的。发送结果将由一个回调函数进行回调。下面的代码就是异步发送消息的示例,它与同步发送消息的区别是它在发送消息时多传递了一个SendCallback对象,该方法一调用立马返回,而不需要等待Broker的响应返回。消息发送成功或失败后将回调SendCallback对象的对应方法。所以对于下面示例而言,第一条消息Broker还没有确认发送成功时,第二条消息就发送了,第三条消息也是一样。它们真正在Broker发送成功的顺序其实是不确定的。

@Test
public void sendAsync() throws Exception 
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(nameServer);
  producer.start();
  CountDownLatch latch = new CountDownLatch(10);
  for (int i = 0; i < 10; i++) 
    Message message = new Message("topic1", ("send by async, no." + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    producer.send(message, new SendCallback() 
      @Override
      public void onSuccess(SendResult sendResult) 
        System.out.println("发送成功:" + message);
        latch.countDown();
      

      @Override
      public void onException(Throwable throwable) 
        System.out.println("发送失败");
        latch.countDown();
      
    );
  
  latch.await();
  producer.shutdown();

通过异步方式发送消息如果失败了,其内部也是会进行重试的,其最大重试次数是通过setRetryTimesWhenSendAsyncFailed()指定的,默认也是2次。

ONEWAY

除了同步发送和异步发送外,还有一种发送方式叫ONEWAY,它的发送是单向的,即它不需要等待Broker的响应,只管发送即可,而不论发送成功与失败。通常应用于一些消息不是那么重要,可丢失的场景。它的发送是通过调用sendOneway()发送的。

@Test
public void sendOneway() throws Exception 
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(nameServer);
  producer.start();
  for (int i=0; i<10; i++) 
    Message message = new Message("topic1", "tag2", ("message send with oneway, no."+i).getBytes());
    producer.sendOneway(message);
  
  producer.shutdown();

(注:本文是基于Apache RocketMQ4.5.0所写)

以上是关于RocketMQ(02)——发送消息的三种方式的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ消息发送流程

RocketMQ 三种消息发送方式

kafka传递消息的三种方式

RocketMQ专题2:三种常用生产消费方式(顺序广播定时)以及顺序消费源码探究

8RocketMQ 源码解析之消息发送

8RocketMQ 源码解析之消息发送