RocketMQ安装及集群搭建
Posted 小小本科生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ安装及集群搭建相关的知识,希望对你有一定的参考价值。
这里写自定义目录标题
一、下载安装
1. 下载
RocketMQ下载地址:http://rocketmq.apache.org/year-archive/
由于目前最新版为4.8.0,因此我点击Release Notes - Apache RocketMQ - Version 4.8.0进入下载页面。其中Source为包含源码版本,如果对RocketMQ源码感兴趣或者想进行二次开发,可以下载Source版本,普通开发下载Binary版本就够了。
随便选择一个镜像进行下载后,将安装包上传至服务器,并用unzip命令进行解压。
(1)如果下载的是源码包,需要手动安装。我这里下载的安装包名称为rocketmq-all-4.8.0-source-release.zip,因此使用unzip rocketmq-all-4.8.0-source-release.zip
命令解压后得到的文件如下图所示。
进入rocketmq-all-4.8.0-source-release目录执行
# 使用maven进行安装,使用-DskipTests跳过测试会快一点
mvn -Prelease-all -DskipTests clean install -U
# 使用maven会自动将包安装到distribution/target/目录下
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0
(2)如果下载的是二进制包,则不需要安装,直接使用即可。我这里下载的安装包名称为rocketmq-all-4.8.0-bin-release.zip,因此使用unzip rocketmq-all-4.8.0-bin-release.zip
命令解压后得到的文件如下所示。
进入rocketmq-all-4.8.0-bin-release目录,可以看到和刚刚用源码包构建出来的项目一样的目录结构
2. 基础概念
本章节我们先了解一些基础概念,否则都不知道自己启动的是什么。
名字服务(Name Server)
Name Server充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
代理服务器(Broker)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
(生产者)Producer
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
(消费者)Concumer
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
3. 启动RocketMQ
无论您是选择的源码安装,还是二进制安装,请切换到安装好的目录下
(1)准备工作
启动Name Server和Broker之前我们需要做一点准备工作,因为RocketMQ设置的默认参数为堆内存8G,新生代4G,这参数对一般学习RocketMQ的开发者机器开说太大了,因此我们需要将其改小一点,防止启动失败。切换到rocketMQ中的bin目录使用vim编辑runbroker.sh,找到如下行改为和下图中一致即可
继续使用vim编辑runserver.sh,找到如下行改为和下图中一致即可。
(2)启动Name Server
# 切换到bin目录中,nohup和&表示后台启动
nohup ./mqnamesrv &
# 执行如下命令可以查看启动结果
tail -f ~/logs/rocketmqlogs/namesrv.log
由于是后台启动因此我们ctrl+c回到命令行状态并不会中断程序的运行,执行jps命令发现Name Server已经启动了
(3)启动Broker
# 切换到bin目录中,后台启动broker,-n:连接到localhost:9876上的Name Server
nohup ./mqbroker -n localhost:9876 &
# 日志信息
tail -f ~/logs/rocketmqlogs/broker.log
启动后使用jps命令查看Name Server和Broker是否均已启动成功
(4)消息发送测试
# 声明一个临时的环境变量,告诉客户端Name Server的位置
export NAMESRV_ADDR=localhost:9876
# 启动官方提供的生产者发送测试消息
./tools.sh org.apache.rocketmq.example.quickstart.Producer
下图表示消息成功发送
(5)消息消费测试
新开一个ssh连接
# 声明一个临时的环境变量,告诉客户端Name Server的位置
export NAMESRV_ADDR=localhost:9876
# 启动官方提供的消费者测试消费消息
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
下图表示消息消费成功
二、安装RocketMQ console
单节点RocketMQ已经搭建完成,我们也已经完成了消息发送和消息消费的测试,但是结果并不直观。我们需要一个可视化的控制台,能控实时监测RocketMQ的运行情况,以及其中的消息发送与消费情况。RocketMQ官方已经实现了这个控制台,并将其集成在rocketmq-externals项目中。下面我们将一步步安装RocketMQ console。
1. 使用git拉取项目
git地址:https://github.com/apache/rocketmq-externals
如果你的服务器中安装了git,找到一个合适的目录直接执行git clone https://github.com/apache/rocketmq-externals.git
拉取代码,否则可以先拉到本地,然后上传至服务器。
注意:笔者2021年10月8号发现,rocketMQ console改名为dashboard,且地址改为https://github.com/apache/rocketmq-dashboard
2. 修改配置文件
RocketMQ console是一个WEB项目,由于我将拉取到/usr/local下面,因此我进入到如下目录修改配置文件/usr/local/rocketmq-externals/rocketmq-console/src/main/resources,有WEB开发基础的应该都知道项目配置存在于该目录下的application.properties文件中。使用vim编辑该文件
第一个修改位置为项目访问端口,默认是8080,由于我的服务器中8080端口被占用了,因此这里我改为其他端口。如果您的端口未被占用,可以不用改动,或者改成您喜欢的访问端口。
第二个参数为Name Server地址。
3. 打包
修改完成后切换到rocketmq-console目录执行如下maven打包命令
# 打包,跳过测试
mvn clean package -Dmaven.test.skip=true
4. 启动项目
打包成功后,该目录下会出现一个target目录,生成的jar包就在该目录中
# 后台启动,并将日志输出到console.log文件中,如果项目启动失败可以查询此日志
nohup java -jar rocketmq-console-ng-2.0.0.jar > console.log &
注意:如果你用的是虚拟机一定不要忘记在防火墙中开放相应端口(RocketMQ-console默认是8080,我的改完之后应该开放22531;Name Server端口为9876),如果你用的是云服务器,一定不要忘了到安全组中开放相应端口。
(5)浏览器访问
在浏览器中输入服务器地址和RocketMQ-console端口号即可访问控制台
三、搭建双主双从集群
双主双从也就是两个主Broker节点,两个从Broker节点,共需要4台服务器,如果追求高可用,Name Server需要部署在其他服务器上,那么将会需要更多的服务器。目前我手上只有两台服务器,因此我使用如下架构来模仿多服务器场景。主从节点分别在不同的服务器上,那么当一台服务器挂掉后,不至于主节点和从节点全部挂掉;而且也模仿了主从之间的远程连接、同步。
如果您参考了上面单节点RocketMQ的搭建过程,首先执行如下命令关闭上述启动的NameServer和Broker,我们要复用刚才搭建的单节点RocketMQ,然后搭建一个双主双从的RocketMQ集群。
在bin目录下执行如下命令关闭NameServer和Broker
./mqshutdown namesrv
./mqshutdown broker
1. 创建服务器1中的消息存储路径
mkdir -p /usr/local/rocketmq/store/broker-a
mkdir -p /usr/local/rocketmq/store/broker-a/commitlog
mkdir -p /usr/local/rocketmq/store/broker-a/consumequeue
mkdir -p /usr/local/rocketmq/store/broker-a/index
mkdir -p /usr/local/rocketmq/store/broker-b
mkdir -p /usr/local/rocketmq/store/broker-b/commitlog
mkdir -p /usr/local/rocketmq/store/broker-b/consumequeue
mkdir -p /usr/local/rocketmq/store/broker-b/index
2. 修改服务器1配置文件
RocketMQ为我们提供了默认的双主双从配置文件,在rocketmq解压目录下的conf目录中,其中2m-2s-sync目录中为双主双从同步复制,我们此次演示就用这个配置文件。
将服务器1中broker-a.properties的修改为如下所示,表示master1
# 防止broker自动识别到内网ip
brokerIP1=服务器1IP
brokerIP2=服务器1IP
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=服务器1IP:9876;服务器2IP:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/broker-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/broker-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
将服务器1中的broker-b-s.properties改为如下图所示,表示slave2
# 防止broker自动识别到内网ip
brokerIP1=服务器1IP
brokerIP2=服务器1IP
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=服务器1IP:9876;服务器2IP:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/broker-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/broker-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
3. 创建服务器2中的消息存储路径
mkdir -p /usr/local/rocketmq/store/broker-a
mkdir -p /usr/local/rocketmq/store/broker-a/commitlog
mkdir -p /usr/local/rocketmq/store/broker-a/consumequeue
mkdir -p /usr/local/rocketmq/store/broker-a/index
mkdir -p /usr/local/rocketmq/store/broker-b
mkdir -p /usr/local/rocketmq/store/broker-b/commitlog
mkdir -p /usr/local/rocketmq/store/broker-b/consumequeue
mkdir -p /usr/local/rocketmq/store/broker-b/index
4. 修改服务器2配置文件
将服务器2中的broker-b.properties改为如下图所示,表示master2
# 防止broker自动识别到内网ip
brokerIP1=服务器2IP
brokerIP2=服务器2IP
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=服务器1IP:9876;服务器2IP:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/broker-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/broker-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
将服务器2中的broker-a-s.properties改为如下图所示,表示slave1
# 防止broker自动识别到内网ip
brokerIP1=服务器2IP
brokerIP2=服务器2IP
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=服务器1IP:9876;服务器2IP:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/broker-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/broker-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
注意:不要忘记修改新的机器上runserver.sh和runbroker.sh中的JVM参数
5. 分别启动服务器1和服务器2中的Name Server
# 切换至bin目录
cd /usr/local/rocketmq-all-4.8.0-bin-release/bin
# 启动Name Server
nohup ./mqnamesrv &
6. 分别启动服务器1和服务器2中的Broker master和Broker slave
(a)切换至服务器1
# 后台启动服务器1中的broker master a
nohup ./mqbroker -c /usr/local/rocketmq-all-4.8.0-bin-release/conf/2m-2s-sync/broker-a.properties &
# 后台启动服务器1中的broker slave b
nohup ./mqbroker -c /usr/local/rocketmq-all-4.8.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
(b)切换至服务器2
# 启动服务器2中中的broker master b
nohup ./mqbroker -c /usr/local/rocketmq-all-4.8.0-bin-release/conf/2m-2s-sync/broker-b.properties &
# 后台启动服务器2中的broker slave a
nohup ./mqbroker -c /usr/local/rocketmq-all-4.8.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
7. 开放端口
开放服务器1和服务器2中的端口(防火墙端口,如果使用的是云服务器还需要在云服务器安全组中开放端口)
(a)9876:Name Server端口(如果上面已经在服务器1上开放了此端口,那么这里只在服务器2上开放此端口即可)
(b)10911:服务器1中的broker master a端口和服务器2中的broker master b端口
(c)11011:服务器1中的broker slave b端口和服务器2中的broker slave a端口
(d)22531:服务器1中的rocketmq-console端口(如果上面已经开启了,这里可以忽略)
(e)11009
(g)10909
到此集群已经搭建完毕,下面重启RocketMQ Console来查看我们搭建的效果
8. 重启RocketMQ Console
(a)切换至rocketmq console的target目录中
# 切换至目标目录
cd /usr/local/rocketmq-externals/rocketmq-console/target
# 查看rocketmq console jar文件的进程号
ps -ef | grep rocketmq-console-ng-2.0.0.jar
# 杀掉进程
kill -9 25874
找到配置文件application.properties,添加服务器2中的Name Server地址
(b)打包启动
# 打包
mvn clean package -Dmaven.test.skip=true
# 启动
nohup java -jar rocketmq-console-ng-2.0.0.jar > console.log &
以上是关于RocketMQ安装及集群搭建的主要内容,如果未能解决你的问题,请参考以下文章
rocketMQ安装配置+与java交互API操作+集群搭建+高级特性