RocketMQRocketMQ集群架构
Posted 每天都要进步一点点
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQRocketMQ集群架构相关的知识,希望对你有一定的参考价值。
一、RocketMQ集群架构
RocketMQ网络部署图如下:
RocketMQ中主要涉及到四种角色:NameServer注册服务器、Broker服务器、Producer生产者、Consumer消费者。每种角色都可以单独搭建集群,下面我们分别介绍一下NameServer 集群、Broker 集群、Producer 集群、Customer 集群。
- (一)、NameServer 集群
NameServer 是一个无状态的节点,可集群部署,节点都是各自独立的,无任何信息同步。
- (二)、 Broker 集群
① Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但一个 Slave 只能对应一个Master;
② Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerID 来定义,id 为 0 表示 Master, 非 0 表示 Slave;
③ 可以部署多个 Master 实现 Broker 集群,即多组 Master - Slave 的 Broker 节点;
④ Master 通常用于写入数据,Slave 用于读取数据;
⑤ 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer;
- (三)、Producer 集群
① Producer 为消息的生产者,都是各自独立的无状态的节点,可以认为只要向 mq 中推送消息的节点都算作 Producer 节点。
② Producer 节点与 NameServer 集群中的随机一个节点建立长连接,定期从 NameServer 取出 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。
- (四)、Customer 集群
① Customer 为消息的消费者,都是各自独立的无状态的节点,可以认为只要向 mq 中获取消息的节点都算作 Customer 节点;
② Customer 节点与 NameServer 集群中的随机一个节点建立长连接,定期从 NameServer 取出 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳;
③ Customer 节点既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定;
二、RocketMQ的四种集群模式
(一)、单Master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
前面我们搭建的单机版RocketMQ其实就是单Master这种模式,具体的搭建过程读者朋友可参考前面的文章。
(二)、多 Master 模式
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
- 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息消费的实时性会受到影响。
(三)、多Master多Slave模式-异步复制
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样。
- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
(四)、多Master多Slave模式-同步双写
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
- 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高。
三、RocketMQ集群工作流程
- 启动NameServer,NameServer启动后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
Master支持读和写,Slave仅支持读,也就是 Producer只能和Master连接写入消息;Consumer可以连接 Master,也可以连接Slave来读取消息。
四、RocketMQ刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:同步刷盘和异步刷盘。如下图:
1)同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
2)异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
3)怎么配置刷盘方式?
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType参数设置的,这个参数被配置成SYNC_FLUSH(同步刷盘)、ASYNC_FLUSH(异步刷盘)中的 一个。
五、RocketMQ主从复制机制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
1)同步复制
同步复制方式是等Master和Slave均写成功后,才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
2)异步复制
异步复制方式是只要Master写成功,即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;
3)怎么配置复制方式?
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER(同步Master)、 SYNC_MASTER(异步Master)、SLAVE(从节点)三个值中的一个。
4)总结
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH(异步刷盘)的刷盘方式,主从之间配置成SYNC_MASTER(同步复制)的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
六、总结
本篇文章主要介绍了RocketMQ的集群架构、四种集群模式、RocketMQ集群的工作流程,以及总结了RocketMQ提供的刷盘策略和主从同步复制的方式,下面一篇文章,我们就实践一下如何搭建多Master多Slave模式-异步复制的RocketMQ集群,实现RocketMQ的高可用。
RocketMQ单台,各种集群,可视化安装步骤
文章目录
下载地址
https://mirror.bit.edu.cn/apache/rocketmq/
安装JDK1.8
上传jdk到linux主机/usr/local/src
解压压缩包
tar -xvf jdk-8u51-linux-x64.tar.gz
为了方便好记可以把解压后的jdk文件夹改名为jdk1.8
mv jdk1.8.0_51 jdk1.8
编辑环境变量配置文件
vim /etc/profile
#在文本最后
export JAVA_HOME=/usr/local/src/jdk1.8
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib
使jdk生效
source /etc/profile
验证
java -version
安装RocketMQ
直接下载
wget https://mirror.bit.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
或上传下载好的压缩包
把rocketmq 解压到 /usr/local/ 目录
安装解压工具
yum install -y unzip zip
解压
unzip rocketmq-all-4.8.0-bin-release.zip
方便改名
mv rocketmq-all-4.8.0-bin-release rocketmq
配置环境变量方便随时使用MQ命令
vim /etc/profile
# 在文件末尾添加以下内容:
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$ROCKETMQ_HOME/bin:$PATH
使环境变量生效
source /etc/profile
配置MQ启动内存大小
rocketmq需要启动两个服务: name server 和 broker, name server 默认配置JVM使用的内存是4g, broker默认配置JVM使用的内存是8g.
修改 name server 内存改为 256m
cd /usr/local/rocketmq/
# 编辑 bin/runserver.sh
vim bin/runserver.sh
# 找到文件中下面这一行:
JAVA_OPT="$JAVA_OPT -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 将 -Xms4g -Xmx4g -Xmn2g 修改为 -Xms256m -Xmx256m -Xmn128m
JAVA_OPT="$JAVA_OPT -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改 broker 内存改为 256m
# 编辑 bin/runbroker.sh
vim bin/runbroker.sh
# 找到文件中下面这一行:
JAVA_OPT="$JAVA_OPT -server -Xms8g -Xmx8g -Xmn4g"
# 将 -Xms8g -Xmx8g -Xmn4g 修改为 -Xms256m -Xmx256m -Xmn128m
JAVA_OPT="$JAVA_OPT -server -Xms256m -Xmx256m -Xmn128m"
启动 rocketmq
先启动 name server
# 进入 rocketmq 目录
cd /usr/local/rocketmq/
# 启动 name server
nohup sh bin/mqnamesrv &
# 查看运行日志, 看到"The Name Server boot success."表示启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log
再启动 broker
# 启动 broker, 连接name server: localhost:9876
nohup sh bin/mqbroker -n localhost:9876 &
# 查看运行日志, 看到"The broker[......:10911] boot success."表示启动成功
tail -f ~/logs/rocketmqlogs/broker.log
测试
运行测试, 启动生产者发送消息, 启动消费者接收消息
# 通过环境变量, 告诉客户端程序name server的地址
export NAMESRV_ADDR=localhost:9876
# 启动生产者来测试发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 启动消费者来测试接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭RocketMQ
mqshutdown broker
mqshutdown namesrv
RocketMQ可视化界面
github提供了rocketmq 的管理界面: 地址:
https://github.com/apache/rocketmq-externals
使用码云的镜像, 地址:
https://gitee.com/mirrors/RocketMQ-Externals
打包项目
如果没有安装 maven, 请先执行 maven 安装命令
yum install -y maven
打包管理界面项目 rocketmq-console.
# 进入管理界面项目的文件夹
cd RocketMQ-Externals/rocketmq-console
# 执行maven 打包命令, 执行时间较长, 请耐心等待
mvn clean package -Dmaven.test.skip=true
运行启动管理界面
打包的 jar 文件在 target 目录, 进入目录执行jar文件
# 进入 target 目录
cd target
# 运行管理界面
nohup java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876 &
访问管理界面
http://ip:8080
RocketMQ集群方案
4种模式介绍
多Master模式:一个集群无Slave,全是Master
多Master多Slave模式:异步复制,同步复制
Dledger:每个Master配置二个 Slave 组成 Dledger Group,可以有多个 Dledger Group,由 Dledger 实现 Master 选举,可实现动态扩容。
选择:
前三种Master Broker 挂了之后 ,没办法让 Slave Broker 自动 切换为新的 Master Broker,需要手动更改配置将 Slave Broker 设置为 Master Broker,推荐Dledger
双主双从同步-和-异步复制方案搭建
同步方案
整个集群由2个 name server 实例和4个 broker 实例组成
name server:
两台服务器分别启动两个name server
broker A 主从:
服务器1部署 broker A 主服务
服务器2部署 broker A 从服务
broker B 主从:
服务器2部署 broker B 主服务
服务器1部署 broker B 从服务
方便在一台机器上启动多个实例,以文件夹分开
mkdir /usr/local/rocketmq/store/
mkdir /usr/local/rocketmq/store/broker-a
mkdir /usr/local/rocketmq/store/broker-a/commitlog
mkdir /usr/local/rocketmq/store/broker-b
mkdir /usr/local/rocketmq/store/broker-b/commitlog
mkdir /usr/local/rocketmq/store/broker-as
mkdir /usr/local/rocketmq/store/broker-as/commitlog
mkdir /usr/local/rocketmq/store/broker-bs
mkdir /usr/local/rocketmq/store/broker-bs/commitlog
rocketmq/conf 目录下提供了四种集群方案的配置样例
2m-2s-async:双主双从异步复制
2m-2s-sync:双主双从同步复制
2m-noslave:双主
dledger: raft主从切换
broker-a,a主服务器配置
在服务器1修改样例配置文件:rocketmq/conf/2m-2s-sync/broker-a.properties
在样例配置文件中,添加三项配置:
listenPort:我们在一台服务器上要运行两个broker实例,所以两个实例的端口要有所区分。这里broker-a主服务器的端口使用默认的10911。
storePathRootDir:数据存储目录
storePathCommitLog:提交日志存储目录
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/usr/local/rocketmq/store/broker-a
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
broker-a slave,a从服务器配置
在服务器2修改样例配置文件:rocketmq/conf/2m-2s-sync/broker-a-s.properties
在样例配置文件中,添加三项配置:
listenPort:我们在一台服务器上要运行两个broker实例,所以两个实例的端口要有所区分。这里broker-a slave从服务器的端口使用11911。
storePathRootDir:数据存储目录
storePathCommitLog:提交日志存储目录
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=11911
storePathRootDir=/usr/local/rocketmq/store/broker-as
storePathCommitLog=/usr/local/rocketmq/store/broker-as/commitlog
broker-b,b主服务器配置
在服务器2修改样例配置文件:rocketmq/conf/2m-2s-sync/broker-b.properties
在样例配置文件中,添加三项配置:
listenPort:我们在一台服务器上要运行两个broker实例,所以两个实例的端口要有所区分。这里broker-b主服务器的端口使用默认的10911。
storePathRootDir:数据存储目录
storePathCommitLog:提交日志存储目录
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/usr/local/rocketmq/store/broker-b
storePathCommitLog=/usr/local/rocketmq/store/broker-b/commitlog
broker-b slave,b从服务器配置
在服务器1修改样例配置文件:rocketmq/conf/2m-2s-sync/broker-b-s.properties
在样例配置文件中,添加三项配置:
listenPort:我们在一台服务器上要运行两个broker实例,所以两个实例的端口要有所区分。这里broker-b slave从服务器的端口使用11911。
storePathRootDir:数据存储目录
storePathCommitLog:提交日志存储目录
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=11911
storePathRootDir=/usr/local/rocketmq/store/broker-bs
storePathCommitLog=/usr/local/rocketmq/store/broker-bs/commitlog
#支持sql92
enablePropertyFilter=true
配置要点说明
四台服务器的集群名 brokerClusterName 相同。集群名称相同的服务器共同组成服务集群 。
从服务器通过名字与主服务器关联在一起,brokerName 与主服务器相同。
brokerId为0是主服务器。从服务器的值是非零值,例如如果有四个从服务器,他们的 brokerId 应该是 1,2,3,4。
brokerRole的值为 SYNC_MASTER 是同步复制的主服务器。如果是 ASYNC_MASTER 则为异步复制的主服务器。
同步复制:消息复制到从服务器后才向生产者发回反馈信息。
异步复制:消息发到主服务器就向生产者发回反馈信息,之后再向从服务器复制。
启动
启动两个 name server
在两台服务器上启动两个 name server,它们不用做任何集群的配置,都是作为独立服务运行,它们之间也不会进行数据复制。
所有broker服务启动后,要同时连接这两个 name server,向两个 name server 进行注册。
在两台服务器上都启动 name server:
nohup sh mqnamesrv &
启动 broker a 的主从两台服务器
在服务器1上启动 broker a 主服务器:
参数说明:
-n参数:指定name server地址列表,多个地址用分号分隔
-c参数:指定配置文件,使用指定的配置文件启动 broker
nohup sh mqbroker \\
-n '192.168.126.100:9876;192.168.64.152:9876' \\
-c ../conf/2m-2s-sync/broker-a.properties \\
&
在服务器2上启动 broker a 从服务器:
nohup sh mqbroker \\
-n '192.168.126.100:9876;192.168.64.152:9876' \\
-c ../conf/2m-2s-sync/broker-a-s.properties \\
&
启动 broker b 的主从两台服务器
在服务器2上启动 broker b 主服务器:
nohup sh mqbroker \\
-n '192.168.126.100:9876;192.168.64.152:9876' \\
-c ../conf/2m-2s-sync/broker-b.properties \\
&
在服务器1上启动 broker b 从服务器:
nohup sh mqbroker \\
-n '192.168.126.100:9876;192.168.64.152:9876' \\
-c ../conf/2m-2s-sync/broker-b-s.properties \\
&
检查
jps
使用可视化管理
#使用可视化jar包
nohup java -jar rocketmq-console-ng-1.0.1.jar \\
--server.port=8080 \\
--rocketmq.config.namesrvAddr='192.168.126.100:9876;192.168.64.152:9876' \\
&
异步方案
同步方案修改主的配置参数即可
Dledger方案搭建
(一主两从,可配置多个一主两从实现扩容)
部分配置需要自行建好文件夹配置好对应机器ip
conf/dledger配置文件解释
共有三台n0主,n1从,n2从
主n0
## 集群名
brokerClusterName = myCluster
## broker组名,同一个myClusterGroup内,brokerName名要一样
brokerName=myNode00
listenPort=30911
namesrvAddr=192.168.126.100:9876;192.168.16.103:9876
storePathRootDir=/usr/local/rocketmq/store/node00
storePathCommitLog=/usr/local/rocketmq/store/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=myNode00
## n0 n1 n2 分别是broker1,broker2,broker3 的 dLegerSelfId
## 例如:dLegerPeers=n0-服务器1的IP:40911;n1-服务器2的IP:40912;n2-服务器3的IP:40913
dLegerPeers=n0-192.168.126.102:40911;n1-192.168.126.102:40912;n2-192.168.126.102:40913
## must be unique
## 这个值必须是在同一个RaftClusterGroup内唯一的
dLegerSelfId=n0
sendMessageThreadPoolNums=16
从n1
brokerClusterName = myCluster
brokerName=myNode00
listenPort=30911
namesrvAddr=192.168.126.100:9876;192.168.16.103:9876
storePathRootDir=/usr/local/rocketmq/store/node00
storePathCommitLog=/usr/local/rocketmq/store/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=myNode00
dLegerPeers=n0-192.168.126.102:40911;n1-192.168.126.102:40912;n2-192.168.126.102:40913
dLegerSelfId=n1
sendMessageThreadPoolNums=16
从n2以此类推
启动
类似之前集群一样先启动多台nameServer
nohup sh bin/mqnamesrv > nohubNameserv &
再启动三台broker(n0,n1,n2)
nohup sh mqbroker \\
-n '192.168.126.100:9876;192.168.64.152:9876' \\
-c conf/dledger/broker-n0.conf &
参数名 | 默认值 | 说明 |
---|---|---|
listenPort | 10911 | 接受客户端连接的监听端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 网卡的 | InetAddress 当前 broker 监听的 IP |
brokerIP2 | 跟 brokerIP1 一样 | 存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步 |
brokerName | null | broker 的名称 |
brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 |
brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |
fileReservedTime | 72 | 以小时计算的文件保留时间 |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 |
enableDLegerCommitLog | 是否启动 DLedger | true |
dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 |
dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 | n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
dLegerSelfId | 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 |
以上是关于RocketMQRocketMQ集群架构的主要内容,如果未能解决你的问题,请参考以下文章