高性能分布式消息中间件—RocketMQ
Posted 云客时间
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高性能分布式消息中间件—RocketMQ相关的知识,希望对你有一定的参考价值。
我是这样规划的,想从以下的技术栈记录:
1:分布式系统消息中间件。
2:分布式搜索引擎。
3:分布式JOB。
4:分布式缓存。
5:分库分表。
6:手把手教你打造一个第三方公众平台。
7:分布式系统性能监控技术。
8:Spring生态源码解读。
9:Spring生态实战技术。
10:并发技术汇总。
11:jvm调优。
12:netty。
13:架构实战。
14:java基础。
15:mysql & TIDB。
简介:
RocketMQ作为一款纯java、是由阿里巴巴公司开发的分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
由开源社区killme2008维护,开源社区非常活跃。https://github.com/killme2008/Metamorphosis。Metaq 2.x。于2012年10月份上线,在淘宝内部被广泛使用 。Metaq 3.0发布时,产品名称改为RocketMQ。基于公司内部开源共建原则,RocketMQ项目只维护核心功能,且去除了所有其他运行时的依赖,核心功能最简化。每个BU的个性化需求都在RocketMQ项目之上进行深度定制。RocketMQ向其他BU提供的仅仅是jar包,例如要定制一个Broker,那么只需要依赖rocketmq-broker这个jar包即可,可通过API进行交互,如果定制client,则依赖rocketmq-client这个jar包,对其提供的api进行再封装。如今的rocketmq孵化成了Apache的顶级开源项目。
特点:
-
支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型 -
在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 (RocketMQ可以保证严格的消息顺序,而ActiveMQ无法保证) -
支持拉(pull)和推(push)两种消息模式 (Push好理解,比如在消费者端设置Listener回调;而Pull,控制权在于应用,即应用需要主动的调用拉消息方法从Broker获取消息,这里面存在一个消费位置记录的问题(如果不记录,会导致消息重复消费)) -
单一队列百万消息的堆积能力 (RocketMQ提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟) -
支持多种消息协议,如 JMS、MQTT 等 -
分布式高可用的部署架构,满足至少一次消息传递语义 (RocketMQ原生就是支持分布式的,而ActiveMQ原生存在单点性) -
提供 docker 镜像用于隔离测试和云集群部署 -
提供配置、指标和监控等功能丰富的 Dashboard
在Metaq1.x/2.x的版本中,分布式协调采用的是Zookeeper,而RocketMQ自己实现了一个NameServer,更加轻量级,性能更好!
-
**组(Group)**有Producer/Consumer Group。ActiveMQ中并没有Group这个概念,而在RocketMQ中理解Group的机制很重要。通过Group机制,让RocketMQ天然的支持消息负载均衡!比如某个Topic有9条消息,其中一个Consumer Group有3个实例(3个进程 OR 3台机器),那么每个实例将均摊3条消息!(注意RocketMQ只有一种模式,即发布订阅模式。) -
消息失败重试机制、高效的订阅者水平扩展能力、强大的API、事务机制等等。
专业术语
Producer
消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。
Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。
Consumer
消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。
Consumer Group
消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。
Topic
Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。
Message
Tag
标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
Broker
Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
Name Server
Name Server 为 producer 和 consumer 提供路由信息。
rocketmq基于Dledger构建高可用
构建集群需要至少3台服务器
本人的RocketMq集群是基于阿里云的centos7.9版本。网上很多教程写的基于虚拟机的本地环境。我认为要玩儿就玩儿的正式一点,真正可以接近生产环境的玩法。
构建安装DLedger
git clone https://github.com/openmessaging/openmessaging-storage-dledger.git
cd openmessaging-storage-dledger
mvn clean install -DskipTests
另外系统中还要安装一个git,用作拉取rocketmq控制台的源码工程。
下载RocketMQ
wget https://github.com/apache/rocketmq/releases
unzip rocketmq-all-4.7.1-source-release.zip
cd rocketmq-all-4.7.1/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1
这里注意,rocketmq只有4.5以上版本支持dledger集群构建
修改配置文件
服务器说明:(生产中应该将 NameServer 部署到其他服务器中,在这为了方便,与Broker部署在一起)
安装Maven
cd /opt
wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz
tar -zxvf apache-maven-3.5.4-bin.tar.gz
# 修改环境变量
vim /etc/profile
# 最下面添加
export MAVEN_HOME=/opt/apache-maven-3.5.4
export PATH=$MAVEN_HOME/bin:$PATH
# 保存退出
source /etc/profile
# 建立软连接
ln -s /opt/apache-maven-3.5.4/bin/mvn /usr/bin/mvn
安装JDK
这里需要自己前往oracle官方下载
配置java环境变量
环境变量配置 | 配置 |
---|---|
"" | export JAVA_HOME=/opt/jdk1.8.0_261 |
vi /etc/profile | export CLASSPATH=${JAVA_HOME}/lib |
"" | export PATH= {JAVA_HOME}/bin |
source /etc/profile 刷新配置
修改配置文件
服务器 | ip | 安装的服务 |
---|---|---|
服务器1-主 | 172.30.134.189 | DLedger,Broker,NameServer |
服务器2-从 | 172.30.134.190 | DLedger,Broker,NameServer |
服务器3-从 | 172.30.134.191 | DLedger,Broker,NameServer |
服务器1配置-Master
进入rocketmq安装目录找到
vim conf/dledger/broker-n0.conf
修改Broker配置
## 集群名
brokerClusterName = RaftCluster
### broker组名,同一个RaftClusterGroup内,brokerName名要一样
brokerName=RaftNode00
### 监听的端口
listenPort=10911
### 你设置的NameServer地址和端口
namesrvAddr=172.30.134.189:9876;172.30.134.190:9876;172.30.134.191:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
### n0 n1 n2 分别是broker1,broker2,broker3 的 dLegerSelfId
### 例如:dLegerPeers=n0-服务器1的IP:40911;n1-服务器2的IP:40912;n2-服务器3的IP:40913
dLegerPeers=n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
### must be unique
autoCreateTopicEnable=true
### 这个值必须是在同一个RaftClusterGroup内唯一的
dLegerSelfId=n0
sendMessageThreadPoolNums=4
### 由于我的虚拟机配置了多个网卡,所以会绑定ip错误,因此我配置了这项,
brokerIP1=master主机的外网IP地址
服务器2配置-Slave
vim conf/dledger/broker-n1.conf
修改Broker配置
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=10921
namesrvAddr=172.30.134.189:9876;172.30.134.190:9876;172.30.134.191:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
## must be unique
dLegerSelfId=n1
autoCreateTopicEnable=true
sendMessageThreadPoolNums=4
brokerIP1=内网IP
这里有个坑需要注意一下,如果说,你的master主机的brokerIP1设置了外网的ip那么你的slave节点的brokerIP1就要设置为内网的IP
服务器3配置-Slave
vim conf/dledger/broker-n2.conf 修改Broker配置
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=10931
namesrvAddr=172.30.134.189:9876;172.30.134.190:9876;172.30.134.191:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
## must be unique
autoCreateTopicEnable=true
dLegerSelfId=n2
sendMessageThreadPoolNums=4
brokerIP1=172.30.134.191
启动集群
在服务器1 执行
nohup sh bin/mqnamesrv -n 172.30.134.189 & > nohubNameserv
nohup sh bin/mqbroker > nohubBroker -c conf/dledger/broker-n0.conf autoCreateTopicEnable=true &
启动集群
在服务器2 执行
nohup sh bin/mqnamesrv -n 172.30.134.190 & > nohubNameserv
nohup sh bin/mqbroker > nohubBroker -c conf/dledger/broker-n1.conf autoCreateTopicEnable=true &
启动集群
在服务器3 执行
nohup sh bin/mqnamesrv -n 172.30.134.191 & > nohubNameserv
nohup sh bin/mqbroker > nohubBroker -c conf/dledger/broker-n2.conf autoCreateTopicEnable=true &
查看集群情况
sh bin/mqadmin clusterList -n 127.0.0.1:9876
备注
Broker 配置
Broker 配置
参考文档:
参数名 | 默认值 | 说明 |
---|---|---|
listenPort | 10911 | 接受客户端连接的监听端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 网卡的 InetAddress | 当前 broker 监听的 IP |
brokerIP2 | 跟 brokerIP1 一样 | 存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步 |
brokerName | null | broker 的名称 |
brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 |
brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |
fileReservedTime | 72 | 以小时计算的文件保留时间 |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 |
enableDLegerCommitLog | 是否启动 DLedger | true |
dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 |
dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 | n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913 |
dLegerSelfId | 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 |
启动内存不够
修改 bin/runbroker.sh 和 bin/runserver.sh 中的
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
报连接超时
查看防火墙服务状态 systemctl status firewalld
将防火墙关闭 systemctl stop firewalld
安装控制台
1.下载文件(/usr/local目录下)
2.解压(/usr/local目录下)
yum install -y unzip zip 前提是:unzip解压文件无法使用 unzip rocketmq-externals-master.zip 解压文件
3.修改配置文件(usr/local/rocketmq-externals-master/目录下)
find -name application.properties 可以查看到两个文件都在rocketmq-console文件目录下
vi application.properties
4.编译(usr/local/rocketmq-externals-master/rocketmq-console/目录下)
mvn clean package -Dmaven.test.skip=true 如果失败多编译几次--可能是网络问题
编译成功后,在rocketmq-console目录下会生成一个目录:target目录,该目录下有启动rocketmq界面的jar文件
5.启动web(usr/local/rocketmq-externals-master/rocketmq-console/target目录下)
java -jar rocketmq-console-ng-1.0.0.jar 启动 ---当终端断了该服务就会停止 nohup java -jar rocketmq-console-ng-1.0.0.jar >>/usr/logs/log.out 2>&1 & 后台启动 --当终端断了也不会停止服务
6.添加端口并在阿里云服务器上开通端口
firewall-cmd --zone=public --add-port=8080/tcp --permanen 永久添加端口并重启防火墙
在阿里云服务器添加出入规则。
查看web监控:
消费者组:
本篇文章就介绍到这里,读者们首先要学会安装rocketmq、以及rocketmq中的基础概念。后续的文章会对rocketmq做具体的介绍以及使用,以及如何集成Springboot和spring cloud stream。
以上是关于高性能分布式消息中间件—RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章