3分钟快速入门RocketMQ(下)

Posted 冯先生的笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3分钟快速入门RocketMQ(下)相关的知识,希望对你有一定的参考价值。


集群模式

  • 单 master 模式

也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用。

优点:部署简单。
缺点:存在单点故障。
注意:该模式一般只用来个人学习,或者作为开发环境使用,生产环境不推荐使用该模式。

  • 多 master 模式

多个 master 节点组成集群,单个 master 节点宕机或者重启对应用来说没有影响。

优点:所有模式中性能最高。
缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性会受到影响。
注意:使用同步刷盘可以保证消息不丢失,同时每个 Topic 应该均匀分布在集群中每个节点,而不是只在某个节点上,否则,该节点宕机就会对订阅该 Topic 的应用造成影响。

  • 多 master 多 slave 异步复制模式

在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式。

优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
缺点:使用异步复制的同步方式有可能会有消息丢失的问题。

  • 多 master 多 slave 同步双写模式

同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。

优点:同步双写模式能保证数据不丢失。
缺点:发送单个消息 RT 会略长,性能相比异步复制低 10% 左右。
刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)。
同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步方式)。
注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。


单主部署

3分钟快速入门RocketMQ(下)

鉴于是快速入门,我选择的是第一种单 master 的部署模式。先说明一下我的安装环境:

  1. Centos 7.2

  2. jdk 1.8

  3. Maven 3.2.x

  4. Git

这里 git 可用可不用,主要是用来直接下载 github 上的源码。也可以选择自己到 github 上下载,然后上传到服务器上。这里以 git 操作为示例。

1.clone 源码并用 maven 编译

  
    
    
  
  1. > git clone https://github.com/alibaba/RocketMQ.git /opt/RocketMQ

  2. > cd /opt/RocketMQ && mvn -Dmaven.test.skip=true clean package install assembly:assembly -U

  3. > cd target/alibaba-rocketmq-broker/alibaba-rocketmq

2.启动 Name Server

  
    
    
  
  1. > nohup sh /opt/RocketMQ/bin/mqnamesrv &

  2. //执行 jps 查看进程

  3. > jps

  4. 25913 NamesrvStartup

  5. //查看日志确保服务已正常启动

  6. > tail -f ~/logs/rocketmqlogs/namesrv.log

  7. The Name Server boot success...

3.启动 broker

  
    
    
  
  1. > nohup sh /opt/RocketMQ/bin/mqbroker -n localhost:9876 &

  2. //执行 jps 查看进程

  3. > jps

  4. 25954 BrokerStartup

  5. //查看日志确保服务已正常启动

  6. > tail -f ~/logs/rocketmqlogs/broker.log

  7. The broker[broker-a, 10.1.54.121:10911] boot success...

  
    
    
  
  1. > export NAMESRV_ADDR=localhost:9876

  2. > sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Producer

  3. SendResult [sendStatus=SEND_OK, msgId= ...

  4. > sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Consumer

  5. ConsumeMessageThread_%d Receive New Messages: [MessageExt...

5.关闭服务

  
    
    
  
  1. > sh /opt/RocketMQ/bin/mqshutdown broker

  2. The mqbroker(36695) is running...

  3. Send shutdown request to mqbroker(36695) OK

  4. > sh /opt/RocketMQ/bin/mqshutdown namesrv

  5. The mqnamesrv(36664) is running...

  6. Send shutdown request to mqnamesrv(36664) OK

此处可能遇到的问题:
1.执行"git clone https://github.com/alibaba/RocketMQ.git /home/inspkgs/RocketMQ"时出现以下提示:

  
    
    
  
  1. fatal: unable to access 'https://github.com/alibaba/RocketMQ.git/': Could not resolve host: github.com; Unknown error

解决办法:一般是由于网络原因造成的,执行以下命令:

  
    
    
  
  1. > ping github.com

确定可以 ping 通之后,再重新执行 git clone 命令。

2.执行"mvn -Dmaven.test.skip=true clean package install assembly:assembly -U"编译时,可能出现下载相关 jar 包很慢的情况。 这也是由于默认 maven 中央仓库在国外的原因,可以根据需要在 /home/maven/conf/setting.xml 中的 添加以下内容后重新编译:

  
    
    
  
  1. <mirror>

  2.    <id>aliyun</id>

  3.    <mirrorOf>central</mirrorOf>

  4.    <name>aliyun maven</name>

  5.    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>

  6. </mirror>


示例代码

  • 生产者

 
   
   
 
  1. public class Producer {

  2.    public static void main(String[] args) throws MQClientException, InterruptedException {

  3.        //声明并初始化一个producer

  4.        //需要一个producer group名字作为构造方法的参数,这里为producer1

  5.        DefaultMQProducer producer = new DefaultMQProducer("demo_producer");

  6.        producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");

  7.        //调用start()方法启动一个producer实例

  8.        producer.start();

  9.        //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值

  10.        for (int i = 0; i < 10; i++) {

  11.            try {

  12.                Message msg = new Message("TopicTest",// topic

  13.                        "TagA",// tag

  14.                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body

  15.                );

  16.                //调用producer的send()方法发送消息

  17.                //这里调用的是同步的方式,所以会有返回结果

  18.                SendResult sendResult = producer.send(msg);

  19.                //打印返回结果,可以看到消息发送的状态以及一些相关信息

  20.                System.out.println(sendResult);

  21.            } catch (Exception e) {

  22.                e.printStackTrace();

  23.                Thread.sleep(1000);

  24.            }

  25.        }

  26.        //发送完消息之后,调用shutdown()方法关闭producer

  27.        producer.shutdown();

  28.    }

  29. }

  • 消费者

 
   
   
 
  1. public class Consumer {

  2.    public static void main(String[] args) throws InterruptedException, MQClientException {

  3.        //声明并初始化一个consumer

  4.        //需要一个consumer group名字作为构造方法的参数,这里为consumer1

  5.        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer");

  6.        consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");

  7.        //这里设置的是一个consumer的消费策略

  8.        //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息

  9.        //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍

  10.        //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

  11.        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  12.        //设置consumer所订阅的Topic和Tag,*代表全部的Tag

  13.        consumer.subscribe("TopicTest", "*");

  14.        //设置一个Listener,主要进行消息的逻辑处理

  15.        consumer.registerMessageListener(new MessageListenerConcurrently() {

  16.            @Override

  17.            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

  18.                                                            ConsumeConcurrentlyContext context) {

  19.                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

  20.                //返回消费状态

  21.                //CONSUME_SUCCESS 消费成功

  22.                //RECONSUME_LATER 消费失败,需要稍后重新消费

  23.                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

  24.            }

  25.        });

  26.        //调用start()方法启动consumer

  27.        consumer.start();

  28.        System.out.println("Consumer Started.");

  29.    }

  30. }


以上是关于3分钟快速入门RocketMQ(下)的主要内容,如果未能解决你的问题,请参考以下文章

十分钟入门RocketMQ

十分钟入门RocketMQ

阿里中间件团队,十分钟带你入门 RocketMQ

RocketMQ快速入门

5分钟快速入门,用Python做SQLite数据库开发,附代码适合初学

C++基础(3分钟内)快速入门