RocketMQ (6): Cluster Setup Theory
Replication Strategy
The replication strategy is how data is synchronized between a Broker's Master and Slave. There are two modes: with synchronous replication, the master waits until the slave has received the data before acknowledging the producer; with asynchronous replication, the master acknowledges as soon as the write succeeds locally.
Flush Strategy
The flush strategy is how a broker persists messages to disk, i.e., how a message is written from the broker's memory to the disk. There are two modes: with synchronous flush, the broker acknowledges only after the message has actually been written to disk; with asynchronous flush, it acknowledges once the message is in memory.
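Both strategies are chosen per broker in its .properties file. As a sketch, the fragment below mirrors the values used in the synchronous-dual-write configs later in this post:

```properties
# Replication: SYNC_MASTER waits for the slave's ack before acking the
# producer; ASYNC_MASTER returns as soon as the master has the message.
brokerRole=SYNC_MASTER
# Flush: SYNC_FLUSH acks only after the message is written to disk;
# ASYNC_FLUSH acks once the message is in memory.
flushDiskType=ASYNC_FLUSH
```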
Depending on how the nodes in a Broker cluster relate to one another, Broker clusters fall into the following categories:
Single Master
There is only one broker. This mode is suitable only for testing, since it is a single point of failure.
Multiple Masters, No Slave
The broker cluster consists solely of multiple masters, with no slaves. The queues of a given Topic are distributed evenly across the master nodes.
Multiple Masters and Multiple Slaves: Asynchronous Replication
The broker cluster consists of multiple masters, each configured with one or more slaves (with a RAID10 disk array in place, one slave per master is generally enough). Master and slave form an active-standby pair: the master handles message read and write requests, while the slave only backs up messages and takes over the master role if the master goes down.
The defining feature of this mode is that when a master goes down, a slave can automatically switch to become the master. However, because slave-from-master synchronization lags slightly (on the order of milliseconds), a small number of messages may be lost when the master goes down.
Multiple Masters and Multiple Slaves: Synchronous Dual Write
Synchronous dual write means that after a message is written to the master successfully, the master waits for the slave to finish synchronizing the data before returning a success ACK to the producer. Compared with asynchronous replication, this mode gives stronger message safety, but per-message RT is slightly higher, so performance is somewhat lower.
The biggest problem with this mode: when the Master goes down, the Slave does not automatically switch to become the Master.
Best Practice
In practice, the Master is usually backed by a RAID10 disk array and also given one Slave. This both leverages the performance and safety of RAID10 and avoids the problem that could otherwise interrupt subscription (consumers can continue reading from the Slave if the Master fails).
Learning RocketMQ from Scratch: Cluster Setup
Using two servers, we will set up a dual-master, dual-slave, highly available RocketMQ cluster with no single point of failure. Assume the physical IPs of the two servers are 192.168.50.1 and 192.168.50.2.
Contents
1. Start the NameServer cluster
2. Start the Broker cluster
3. RocketMQ web console: rocketmq-console
4. Cluster test
1. Start the NameServer Cluster
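Each server runs its own NameServer instance. A minimal start sequence, assuming the rocketmq-all-4.3.2-bin-release installation path used below and the default log location, to run on both 192.168.50.1 and 192.168.50.2:

```shell
cd /usr/local/rocketmq-all-4.3.2-bin-release
# Start a NameServer instance (listens on port 9876 by default)
nohup sh bin/mqnamesrv &
# Check the NameServer log (default location) to confirm startup
tail -n 5 ~/logs/rocketmqlogs/namesrv.log
```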
2. Start the Broker Cluster
Modify the Broker configuration files so that each server can run one Master-role Broker and one Slave-role Broker.
First locate the Broker configuration files. Since we are setting up a synchronous-dual-write cluster, the broker configuration files under the 2m-2s-sync directory need to be modified:
[root@157-89 ~]# cd /usr/local/rocketmq-all-4.3.2-bin-release/conf/
[root@157-89 conf]# ls
2m-2s-async 2m-2s-sync 2m-noslave broker.conf logback_broker.xml logback_namesrv.xml logback_tools.xml
[root@157-89 conf]# cd 2m-2s-sync/
[root@157-89 2m-2s-sync]# ls
broker-a.properties broker-a-s.properties broker-b.properties broker-b-s.properties
1) On server 192.168.50.1, edit broker-a.properties as the Master-role Broker:
namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.1
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/abort
2) On server 192.168.50.2, edit broker-b.properties as the Master-role Broker:
namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.2
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/abort
3) On server 192.168.50.1, edit broker-b-s.properties as the Slave-role Broker:
namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.1
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/abort
4) On server 192.168.50.2, edit broker-a-s.properties as the Slave-role Broker:
namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.2
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/abort
When starting multiple Brokers on a single server, assign each a different listen port, and remember to open the ports used by the NameServer and Brokers in the firewall.
Start the four Brokers, each with its own configuration file:
nohup sh bin/mqbroker -c broker_config_file &
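Concretely, with the configuration files above, that means the following (note that each server runs the master of one broker group and the slave of the other):

```shell
# On 192.168.50.1: master broker-a plus slave broker-b-s
cd /usr/local/rocketmq-all-4.3.2-bin-release
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &

# On 192.168.50.2: master broker-b plus slave broker-a-s
cd /usr/local/rocketmq-all-4.3.2-bin-release
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties &
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties &
```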
3. RocketMQ Web Console: rocketmq-console
Installing it on server 192.168.50.1 alone is sufficient; it does not need to be clustered.
[root@153-215 local]# git clone https://github.com/apache/rocketmq-externals.git
Cloning into 'rocketmq-externals'...
remote: Enumerating objects: 10, done.
remote: Counting objects: 100% (10/10), done.
remote: Compressing objects: 100% (10/10), done.
remote: Total 9425 (delta 2), reused 1 (delta 0), pack-reused 9415
Receiving objects: 100% (9425/9425), 11.86 MiB | 232.00 KiB/s, done.
Resolving deltas: 100% (4235/4235), done.
[root@153-215 local]# cd rocketmq-externals/
[root@153-215 rocketmq-externals]# ls
dev README.md rocketmq-console rocketmq-docker rocketmq-flink rocketmq-flume rocketmq-hbase rocketmq-iot-bridge rocketmq-jms rocketmq-mysql rocketmq-php rocketmq-redis rocketmq-sentinel rocketmq-serializer rocketmq-spark
[root@153-215 rocketmq-externals]# git branch
* master
[root@153-215 rocketmq-externals]# git fetch origin release-rocketmq-console-1.0.0
From https://github.com/apache/rocketmq-externals
* branch release-rocketmq-console-1.0.0 -> FETCH_HEAD
[root@153-215 rocketmq-externals]# git checkout -b release-1.0.0 origin/release-rocketmq-console-1.0.0
Branch 'release-1.0.0' set up to track remote branch 'release-rocketmq-console-1.0.0' from 'origin'.
Switched to a new branch 'release-1.0.0'
[root@153-215 rocketmq-externals]# ls
README.md rocketmq-console
[root@153-215 rocketmq-externals]# ls rocketmq-console/
doc LICENSE NOTICE pom.xml README.md src style
[root@153-215 rocketmq-externals]# vim rocketmq-console/src/main/resources/application.properties
Edit application.properties:
server.contextPath=/rocketmq
server.port=8080
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
Move the rocketmq-console directory out, then build and start rocketmq-console:
[root@153-215 rocketmq-console]# mv /usr/local/rocketmq-externals/rocketmq-console /usr/local/rocketmq-console
[root@153-215 rocketmq-console]# cd /usr/local/rocketmq-console/
[root@153-215 rocketmq-console]# ls
doc LICENSE NOTICE pom.xml README.md src style
[root@153-215 rocketmq-console]# mvn clean package -Dmaven.test.skip=true
........
[INFO] Building jar: /usr/local/rocketmq-console/target/rocketmq-console-ng-1.0.0-sources.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:54 min
[INFO] Finished at: 2019-01-11T17:02:34+08:00
[INFO] ------------------------------------------------------------------------
[root@153-215 rocketmq-console]# ls
doc LICENSE NOTICE pom.xml README.md src style target
[root@153-215 rocketmq-console]# ls target/
checkstyle-cachefile checkstyle-checker.xml checkstyle-result.xml classes generated-sources maven-archiver maven-status rocketmq-console-ng-1.0.0.jar rocketmq-console-ng-1.0.0.jar.original rocketmq-console-ng-1.0.0-sources.jar
[root@153-215 rocketmq-console]# java -jar target/rocketmq-console-ng-1.0.0.jar
.......
[2019-01-11 17:04:15.980] INFO Initializing ProtocolHandler ["http-nio-8080"]
[2019-01-11 17:04:15.991] INFO Starting ProtocolHandler [http-nio-8080]
[2019-01-11 17:04:16.232] INFO Using a shared selector for servlet write/read
[2019-01-11 17:04:16.251] INFO Tomcat started on port(s): 8080 (http)
[2019-01-11 17:04:16.257] INFO Started App in 6.594 seconds (JVM running for 7.239)
4. Cluster Test
Producer test code:
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducerTest {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test_group");
        producer.setNamesrvAddr("192.168.50.1:9876;192.168.50.2:9876");
        try {
            producer.start();
            for (int i = 0; i < 100; i++) {
                // Use the loop index in the body so each message is distinct
                Message message = new Message("topic_test", "tag_test",
                        ("Hello World" + i).getBytes("UTF-8"));
                SendResult sendResult = producer.send(message);
                System.out.println(JSON.toJSON(sendResult));
            }
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Consumer test code:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class SyncConsumerTest {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test_group");
        consumer.setNamesrvAddr("192.168.50.1:9876;192.168.50.2:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            consumer.subscribe("topic_test", "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (messageList, context) -> {
                System.out.println(Thread.currentThread().getName() + " Receive New Message: " + messageList);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
SyncProducerTest run log:
SyncConsumerTest run log:
The logs show that the producer and consumer are sending and receiving messages normally. Next, check the web console at http://192.168.50.1:8080/rocketmq:
The console confirms that the dual-master, dual-slave broker cluster is fully healthy, and it also shows how each broker is processing messages.
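Cluster state can also be checked from the command line with the bundled mqadmin tool, a quick alternative to the console (run from the RocketMQ installation directory on either server):

```shell
# List all brokers registered with the NameServers; a healthy cluster shows
# broker-a and broker-b, each with BID 0 (master) and BID 1 (slave)
sh bin/mqadmin clusterList -n '192.168.50.1:9876;192.168.50.2:9876'
```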