RocketMQ安装及集群搭建

Posted 小小本科生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ安装及集群搭建相关的知识,希望对你有一定的参考价值。

这里写自定义目录标题

一、下载安装

1. 下载

RocketMQ下载地址:http://rocketmq.apache.org/year-archive/
由于目前最新版为4.8.0,因此我点击Release Notes - Apache RocketMQ - Version 4.8.0进入下载页面。其中Source为包含源码版本,如果对RocketMQ源码感兴趣或者想进行二次开发,可以下载Source版本,普通开发下载Binary版本就够了。

随便选择一个镜像进行下载后,将安装包上传至服务器,并用unzip命令进行解压。
(1)如果下载的是源码包,需要手动安装。我这里下载的安装包名称为rocketmq-all-4.8.0-source-release.zip,因此使用unzip rocketmq-all-4.8.0-source-release.zip命令解压后得到的文件如下图所示。

进入rocketmq-all-4.8.0-source-release目录执行

# 使用maven进行安装,使用-DskipTests跳过测试会快一点
mvn -Prelease-all -DskipTests clean install -U
# 使用maven会自动将包安装到distribution/target/目录下
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0

(2)如果下载的是二进制包,则不需要安装,直接使用即可。我这里下载的安装包名称为rocketmq-all-4.8.0-bin-release.zip,因此使用unzip rocketmq-all-4.8.0-bin-release.zip命令解压后得到的文件如下所示。

进入rocketmq-all-4.8.0-bin-release目录,可以看到和刚刚用源码包构建出来的项目一样的目录结构

2. 基础概念

本章节我们先了解一些基础概念,否则都不知道自己启动的是什么。
名字服务(Name Server)
Name Server充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
代理服务器(Broker)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
(生产者)Producer
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
(消费者)Concumer
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

3. 启动RocketMQ

无论您是选择的源码安装,还是二进制安装,请切换到安装好的目录下
(1)准备工作
启动Name Server和Broker之前我们需要做一点准备工作,因为RocketMQ设置的默认参数为堆内存8G,新生代4G,这参数对一般学习RocketMQ的开发者机器开说太大了,因此我们需要将其改小一点,防止启动失败。切换到rocketMQ中的bin目录使用vim编辑runbroker.sh,找到如下行改为和下图中一致即可

继续使用vim编辑runserver.sh,找到如下行改为和下图中一致即可。

(2)启动Name Server

# 切换到bin目录中,nohup和&表示后台启动
nohup ./mqnamesrv &
# 执行如下命令可以查看启动结果
tail -f ~/logs/rocketmqlogs/namesrv.log

由于是后台启动因此我们ctrl+c回到命令行状态并不会中断程序的运行,执行jps命令发现Name Server已经启动了

(3)启动Broker

# 切换到bin目录中,后台启动broker,-n:连接到localhost:9876上的Name Server
nohup ./mqbroker -n localhost:9876 &
# 日志信息
tail -f ~/logs/rocketmqlogs/broker.log 

启动后使用jps命令查看Name Server和Broker是否均已启动成功

(4)消息发送测试

# 声明一个临时的环境变量,告诉客户端Name Server的位置
export NAMESRV_ADDR=localhost:9876
# 启动官方提供的生产者发送测试消息
./tools.sh org.apache.rocketmq.example.quickstart.Producer

下图表示消息成功发送

(5)消息消费测试
新开一个ssh连接

# 声明一个临时的环境变量,告诉客户端Name Server的位置
export NAMESRV_ADDR=localhost:9876
# 启动官方提供的消费者测试消费消息
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

下图表示消息消费成功

二、安装RocketMQ console

单节点RocketMQ已经搭建完成,我们也已经完成了消息发送和消息消费的测试,但是结果并不直观。我们需要一个可视化的控制台,能控实时监测RocketMQ的运行情况,以及其中的消息发送与消费情况。RocketMQ官方已经实现了这个控制台,并将其集成在rocketmq-externals项目中。下面我们将一步步安装RocketMQ console。

1. 使用git拉取项目

git地址:https://github.com/apache/rocketmq-externals
如果你的服务器中安装了git,找到一个合适的目录直接执行git clone https://github.com/apache/rocketmq-externals.git拉取代码,否则可以先拉到本地,然后上传至服务器。
注意:笔者2021年10月8号发现,rocketMQ console改名为dashboard,且地址改为https://github.com/apache/rocketmq-dashboard

2. 修改配置文件

RocketMQ console是一个WEB项目,由于我将拉取到/usr/local下面,因此我进入到如下目录修改配置文件/usr/local/rocketmq-externals/rocketmq-console/src/main/resources,有WEB开发基础的应该都知道项目配置存在于该目录下的application.properties文件中。使用vim编辑该文件

第一个修改位置为项目访问端口,默认是8080,由于我的服务器中8080端口被占用了,因此这里我改为其他端口。如果您的端口未被占用,可以不用改动,或者改成您喜欢的访问端口。
第二个参数为Name Server地址。

3. 打包

修改完成后切换到rocketmq-console目录执行如下maven打包命令

# 打包,跳过测试
mvn clean package -Dmaven.test.skip=true

4. 启动项目

打包成功后,该目录下会出现一个target目录,生成的jar包就在该目录中

# 后台启动,并将日志输出到console.log文件中,如果项目启动失败可以查询此日志
nohup java -jar rocketmq-console-ng-2.0.0.jar > console.log &

注意:如果你用的是虚拟机一定不要忘记在防火墙中开放相应端口(RocketMQ-console默认是8080,我的改完之后应该开放22531;Name Server端口为9876),如果你用的是云服务器,一定不要忘了到安全组中开放相应端口。
(5)浏览器访问
在浏览器中输入服务器地址和RocketMQ-console端口号即可访问控制台

三、搭建双主双从集群

双主双从也就是两个主Broker节点,两个从Broker节点,共需要4台服务器,如果追求高可用,Name Server需要部署在其他服务器上,那么将会需要更多的服务器。目前我手上只有两台服务器,因此我使用如下架构来模仿多服务器场景。主从节点分别在不同的服务器上,那么当一台服务器挂掉后,不至于主节点和从节点全部挂掉;而且也模仿了主从之间的远程连接、同步。

如果您参考了上面单节点RocketMQ的搭建过程,首先执行如下命令关闭上述启动的NameServer和Broker,我们要复用刚才搭建的单节点RocketMQ,然后搭建一个双主双从的RocketMQ集群。
在bin目录下执行如下命令关闭NameServer和Broker

./mqshutdown namesrv
./mqshutdown broker

1. 创建服务器1中的消息存储路径

mkdir -p /usr/local/rocketmq/store/broker-a
mkdir -p /usr/local/rocketmq/store/broker-a/commitlog
mkdir -p /usr/local/rocketmq/store/broker-a/consumequeue
mkdir -p /usr/local/rocketmq/store/broker-a/index
mkdir -p /usr/local/rocketmq/store/broker-b
mkdir -p /usr/local/rocketmq/store/broker-b/commitlog
mkdir -p /usr/local/rocketmq/store/broker-b/consumequeue
mkdir -p /usr/local/rocketmq/store/broker-b/index

2. 修改服务器1配置文件

RocketMQ为我们提供了默认的双主双从配置文件,在rocketmq解压目录下的conf目录中,其中2m-2s-sync目录中为双主双从同步复制,我们此次演示就用这个配置文件。

将服务器1中broker-a.properties的修改为如下所示,表示master1

# 防止broker自动识别到内网ip
brokerIP1=服务器1IP
brokerIP2=服务器1IP
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=服务器1IP:9876;服务器2IP:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/broker-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/broker-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

将服务器1中的broker-b-s.properties改为如下图所示,表示slave2

# 防止broker自动识别到内网ip
brokerIP1=服务器1IP
brokerIP2=服务器1IP
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=服务器1IP:9876;服务器2IP:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/broker-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/broker-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

3. 创建服务器2中的消息存储路径

mkdir -p /usr/local/rocketmq/store/broker-a
mkdir -p /usr/local/rocketmq/store/broker-a/commitlog
mkdir -p /usr/local/rocketmq/store/broker-a/consumequeue
mkdir -p /usr/local/rocketmq/store/broker-a/index
mkdir -p /usr/local/rocketmq/store/broker-b
mkdir -p /usr/local/rocketmq/store/broker-b/commitlog
mkdir -p /usr/local/rocketmq/store/broker-b/consumequeue
mkdir -p /usr/local/rocketmq/store/broker-b/index

4. 修改服务器2配置文件

将服务器2中的broker-b.properties改为如下图所示,表示master2

# 防止broker自动识别到内网ip
brokerIP1=服务器2IP
brokerIP2=服务器2IP
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=服务器1IP:9876;服务器2IP:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/broker-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/broker-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

将服务器2中的broker-a-s.properties改为如下图所示,表示slave1

# 防止broker自动识别到内网ip
brokerIP1=服务器2IP
brokerIP2=服务器2IP
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=服务器1IP:9876;服务器2IP:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/broker-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/broker-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

注意:不要忘记修改新的机器上runserver.sh和runbroker.sh中的JVM参数

5. 分别启动服务器1和服务器2中的Name Server

# 切换至bin目录
cd /usr/local/rocketmq-all-4.8.0-bin-release/bin
# 启动Name Server
nohup ./mqnamesrv &

6. 分别启动服务器1和服务器2中的Broker master和Broker slave

(a)切换至服务器1

# 后台启动服务器1中的broker master a
nohup ./mqbroker -c /usr/local/rocketmq-all-4.8.0-bin-release/conf/2m-2s-sync/broker-a.properties &
# 后台启动服务器1中的broker slave b
nohup ./mqbroker -c /usr/local/rocketmq-all-4.8.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &

(b)切换至服务器2

# 启动服务器2中中的broker master b
nohup ./mqbroker -c /usr/local/rocketmq-all-4.8.0-bin-release/conf/2m-2s-sync/broker-b.properties &
# 后台启动服务器2中的broker slave a
nohup ./mqbroker -c /usr/local/rocketmq-all-4.8.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &

7. 开放端口

开放服务器1和服务器2中的端口(防火墙端口,如果使用的是云服务器还需要在云服务器安全组中开放端口)
(a)9876:Name Server端口(如果上面已经在服务器1上开放了此端口,那么这里只在服务器2上开放此端口即可)
(b)10911:服务器1中的broker master a端口和服务器2中的broker master b端口
(c)11011:服务器1中的broker slave b端口和服务器2中的broker slave a端口
(d)22531:服务器1中的rocketmq-console端口(如果上面已经开启了,这里可以忽略)
(e)11009
(g)10909

到此集群已经搭建完毕,下面重启RocketMQ Console来查看我们搭建的效果

8. 重启RocketMQ Console

(a)切换至rocketmq console的target目录中

# 切换至目标目录
cd /usr/local/rocketmq-externals/rocketmq-console/target
# 查看rocketmq console jar文件的进程号
ps -ef | grep rocketmq-console-ng-2.0.0.jar
# 杀掉进程
kill -9 25874

找到配置文件application.properties,添加服务器2中的Name Server地址

(b)打包启动

# 打包
mvn clean package -Dmaven.test.skip=true
# 启动
nohup java -jar rocketmq-console-ng-2.0.0.jar > console.log &

以上是关于RocketMQ安装及集群搭建的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ安装及集群搭建

RocketMQ集群搭建2

安装部署RocketMQ集群(双主双从)

rocketMQ安装配置+与java交互API操作+集群搭建+高级特性

rocketMQ安装配置+与java交互API操作+集群搭建+高级特性

mac下rocketmq各种集群模式搭建实战