RocketMQ(01)——简介

Posted elim168

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ(01)——简介相关的知识,希望对你有一定的参考价值。

RocketMQ简介

笔者使用的是Apache RocketMQ,官网是http://rocketmq.apache.org/。RocketMQ是Alibaba开源的一个分布式消息队列,可以通过http://rocketmq.apache.org/dowloading/releases/下载当前最新的版本。下载后解压缩,然后通过bin/mqnamesrv启动一个Name Server,它默认监听在9876端口。然后需要通过bin/mqbroker -n localhost:9876启动一个Broker,并把它注册到刚刚启动的那个Name Server上,Broker默认监听在端口10911上。生产者和消费者都是跟Broker打交道,但是它们不会直接指定Broker的地址,而是通过Name Server来间接的获取Broker的地址。这样做的好处是可以动态的增加Broker,多个Broker之间可以组成一个集群。应用中使用RocketMQ时需要添加RocketMQ Client依赖。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.0</version>
</dependency>

然后可以通过DefaultMQProducer进行消息的发送,每一个生产者必须指定一个Group,下面代码就指定了Group为group1。相同处理逻辑的生产者必须定义相同的Group。这个Group只对于发送事务消息的生产者有用。然后需要通过setNamesrvAddr()指定Name Server的地址。在发送消息前必须调用其start()。发送的消息是通过org.apache.rocketmq.common.message.Message对象表示的。它必须要指定一个Topic,RocketMQ是通过抽象的Topic来管理一组队列的,这个Topic必须在Broker中进行创建。可以通过bin/mqadmin updateTopic -b localhost:10911 -t topic1在本地刚刚启动的Broker上创建名为topic1的Topic。它默认拥有8个读队列,8个写队列。下面的代码指定了消息都将发送到名为topic1的Topic。通过其send()进行消息发送,它是同步发送的,发送完后会返回一个SendResult。其SendStatus为SEND_OK时表示发送成功。下面的代码一共发送了10条消息到topic1,消息内容分别是hello0…hello9。

@Test
public void testSend() throws Exception 
  //指定Producer的Group为group1
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  //指定需要连接的Name Server
  producer.setNamesrvAddr(nameServer);
  //发送消息前必须调用start(),其内部会进行一些初始化工作。
  producer.start();
  for (int i = 0; i < 10; i++) 
    //指定消息发送的Topic是topic1。
    Message message = new Message("topic1", ("hello" + i).getBytes());
    //同步发送,发送成功后才会返回
    SendResult sendResult = producer.send(message);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) 
      System.out.println("消息发送成功:" + sendResult);
     else 
      System.out.println("消息发送失败:" + sendResult);
    
  
  //使用完毕后需要把Producer关闭,以释放相应的资源
  producer.shutdown();

消息的消费者可以通过DefaultMQPushConsumer进行消费。DefaultMQPushConsumer是进行推模式消费的,它也需要指定一个Group。默认情况下相同Group的消费者将对同一个队列中的消息进行集群消费,即同一条消息只会被一个Consumer实例进行消费。DefaultMQPushConsumer也需要通过setNamesrvAddr()指定需要连接的Name Server。通过subscribe()指定需要消费的Topic和对应的Tag。下面指定了需要消费的Topic是topic1,通过*指定将消费所有的Tag。Tag是用来对消息进行分类标记的,需要在发送消息的时候指定。通过registerMessageListener()注册消息监听器,当收到消息后会回调它。下面代码注册的是一个MessageListenerConcurrently类型的监听器。消息正常消费后需要返回CONSUME_SUCCESS,如果消费失败可以返回RECONSUME_LATER,这样可以先跳过这一条消息的消费,Broker会过一段时间再投递这一条消息。Consumer也是需要通过start()进行启动。这样消费者就可以开始进行消息消费了,默认只有它启动之后发送的消息才能收到。

@Test
public void testConsumer() throws Exception 
  //创建Consumer并指定消费者组。
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
  //指定需要连接的Name Server
  consumer.setNamesrvAddr(nameServer);
  //订阅topic1上的所有Tag。
  consumer.subscribe("topic1", "*");
  //注册一个消息监听器
  consumer.registerMessageListener(new MessageListenerConcurrently() 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
      System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
      AtomicInteger counter = new AtomicInteger();
      msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
  );
  //启动消费者
  consumer.start();
  //为了确保Junit线程不立即死掉。
  TimeUnit.SECONDS.sleep(120);

(注:本文是基于Apache RocketMQ4.5.0所写)

CSDN 社区图书馆,开张营业! 深读计划,写书评领图书福利~

以上是关于RocketMQ(01)——简介的主要内容,如果未能解决你的问题,请参考以下文章

rocketmq-spring : 实战与源码解析一网打尽

一文带你理解 RocketMQ 广播模式实现机制

RocketMQ的关键特性

rocketmq广播消息

RocketMQ(05)——消息的群集消费和广播消费

RocketMQ(05)——消息的群集消费和广播消费