Rocketmq
Posted mingyuewu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rocketmq相关的知识,希望对你有一定的参考价值。
Rocketmq安装
时间 | 2021-08-09 |
---|---|
环境 | centos7.6 jdk 1.8 rocketmq4.9.0 maven3.8 |
一、Rocketmq简介
消息队列是一个符合先进先出原则的单向队列:一方发送消息,并存到消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。
1 消息队列优点:
(1 应用解耦:消息队列可以将同步的系统调用转为异步的消息投递,一定程序上解除业务子系统间的耦合
(2 流量削峰:消息队列作为缓冲,将每一个用户下单请求都作为一条消息存入消息队列。消息队列会根据消费者的速度稳定的将流量传递给下游的消费者系统。生产者生产速度很快,消费者消费速度慢,加一个消息队列让二者的速度匹配。
(3 异步处理
提供并发
2 rocketmq组成部分
producer、consumer ,可以看做是rocketmq的客户端。brokerServer、nameServer,可以看做是rocketmq的服务端
(1 producer 生产消息
producer将消息发送到brokerServer,由brokerServer统一进行消息的分发。
rocketmq 支持多种消息发送方式:同步消息发送、异步回调消息发送、顺序消息发送以及单向消息发送(异步无回调)
除了单向消息发送,其余的发送方式均需要brokerServer返回发送结果的确认消息。支持发送事务消息。
(2 Consumer消费producer生产的消息
consumer会从brokerServer获取消息,并传递给应用程序。如果一定时间内没有接收到consumer确定消费的响应结果,会将同一条消息再次投递给consumer。consumer分为两种方式来获取信息。
推模式(push consume):brokerServer将消息退给了consumer;
拉模式(pull consume):consumer主动区brokerServer拉取消息。
(3 BrokerServer 接收存储分发消息
brokerServer是消息队列的消息存储、转发服务器,是rocketmq最核心的组成部分。
为实现高可用和高吞吐量,brokerServer通常采用集群部署,共同对外服务。
每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
(4 NameServer提供路由元数据
brokerServer通常是集群部署的,其拓扑结构会经常发送变化。如果每次集群中broker机器的上下线都需要通知所有的消费者和生产者,效率低。 nameServer作为brokerServer路由信息的维护者,broker的每次上下线都和nameServer通信,由nameServer来维护broker的路由信息。producer和consumer通过访问nameServer获得broker的访问地址后,再向对应的broker发起请求。nameServer解除了broker和客户端的耦合依赖关系。(zookeeper的作用)
nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除。
NameServer集群间互不通信,没有主备的概念
nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化 ,所以他是无状态节点
3 rocketmq架构图
4 rocketmq的基本概念模型
(1 topic主题
topic主题,代表一系列消息的集合,任何消息都只能属于一个topic主题,主题是rocketmq进行消息发布订阅的最小单位,
topic和queue的本质区别:一条消息能不能被多个消费者消费
queue: 任何一条消息只能由一个消费者接收到。(单播),每个队列上只能被一个线程消费
topic : 一条消息可以被多个消费者消费。
每个主题包含多个队列(分区),实现消息消费的并行,生产的并行。
(2 tag标签
tag标签,tag从属于topic主题,主要用于对同一主题下的消息进行进一步区分。(二级主题)
(3 group组
group组代表同一类客户端的集合。可分为消费者组(consumer group)和生产者组(producer group)两种。消费者组和生产者组之间没有任何关联。
同一消费者组内的消费者,通常消费同样的消息,且消息消费逻辑一致。
(4 message 消息
message消息是rocketmq中传递消息的主体,消息具有全局唯一的messageID属性,可根据此ID进行精确查询。
(5 Message Model
消息模型:集群(Clustering)和广播(Broadcasting)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
(6 Queue
1个Topic会被分为N个Queue,数量是可配置的。message本身其实是存储到queue上的,消费者消费的也是queue上的消息
(7 Message Order
(8 分布式事务
消息顺序:顺序(Orderly)和并发(Concurrently)
(9 消息不丢失
利用消息队列的有序性验证消息是否丢失,消费者收到的消息序号递增。
kafka和rockmq不保证在topic上的严格顺序,保证在queue上的顺序,验证在queue上的序列号。
有多个producer实例时,要验证每个producer的顺序,需要producer的标识。
(10 消息重复消费
幂等操作:任意多次执行产生的影响与一次执行产生的影响是一致的,不用担心重复执行给系统带来影响。
幂等操作的实现:给数据增加版本号。
在发送消息时,给每个消息指定一个全局唯一的ID,消费者消费该消息时,会先查询该ID的消息是否被消费过,如果没有没消费过,才关系数据,然后将状态置为已消费。
(11 消息堆积
原因:消息生产者和消息消费者性能倒挂,生产了太多,消费者消费不完,使消息堆积。
解决:物理扩容,增加consumer实例数量,同时增加queue数量。
二、软件安装路径及端口规划
端口 | nameserver listenPort=9876 |
---|---|
家目录 | /usr/local/rocketmq/distribution/target/rocketmq-4.9.0/rocketmq-4.9.0 |
bin文件目录 | /usr/local/rocketmq/distribution/target/rocketmq-4.9.0/rocketmq-4.9.0/bin |
日志文件 | ~/logs/rocketmqlogs 运行程序后,自动在该用户的家目录下创建该日志文件 |
启动nameServer命令 | 到ROCKETMQ家目录下,nohup sh bin/mqnamesrv |
启动broker命令 | 到ROCKETMQ家目录下,nohup sh bin/mqbroker -n localhost:9876 & |
三、单机安装过程
0 参考文档
官方安装文档英文 | https://rocketmq.apache.org/docs/quick-start/ |
---|---|
原理、架构、部署、应用 | https://github.com/apache/rocketmq/tree/master/docs/cn |
Apache 上开源官网站 | https://rocketmq.apache.org/ |
阿里官方的介绍文档 | http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/ |
集群部署参考文档 | https://github.com/apache/rocketmq/blob/master/docs/cn/operation.md |
1 下载源码包
https://github.com/apache/rocketmq
2 解压编译
使用Maven来编译整个项目
#解压
unzip rocketmq
#编译
进入解压目录
mvn -Prelease-all -DskipTests clean install -U
# 配置环境变量
vim /etc/profile
export ROCKETMQ_HOME=/usr/local/rocketmq/distribution/target/rocketmq-4.9.0/rocketmq-4.9.0
export PATH=$PATH:$ROCKETMQ_HOME/bin
source /etc/profile
3 修改内存配置
bin目录下的runbroker.sh和runserver.sh文件中默认分配的内存较大,官方要求有4g可用内存以上。
(1修改runserver.sh文件
#JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
(2 修改runbroker.sh 文件
# JVM Configuration
#===========================================================================================
#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"
4 启动与关闭
(1) 先启动namesrv,再启动broker
进入RocketMq安装后的家目录,启动的时候先启动namesrv,然后启动broker。
#后台启动mqnamesrv nohup后台启动,在当前目录下生产nohup.out日志文件,也可以指定日志输出位置。
[root@node01 rocketmq-4.9.0] pwd
/usr/local/rocketmq/distribution/target/rocketmq-4.9.0/rocketmq-4.9.0
[root@node01 rocketmq-4.9.0] nohup sh bin/mqnamesrv &
[1] 23356
[root@node01 rocketmq-4.9.0] nohup: ignoring input and appending output to ‘nohup.out’
# 验证 只有在运行mqnamesrv后,才会在运行该命令的用户的家目录下产生log文件
[root@node01 rocketmq-4.9.0]# tail -f ~/logs/rocketmqlogs/namesrv.log
2021-08-10 10:28:51 INFO main - tls.client.authServer = false
2021-08-10 10:28:51 INFO main - tls.client.trustCertPath = null
2021-08-10 10:28:51 INFO main - Using JDK SSL provider
2021-08-10 10:28:51 INFO main - SSLContext created for server
2021-08-10 10:28:51 INFO main - Try to start service thread:FileWatchService started:false lastThread:null
2021-08-10 10:28:51 INFO NettyEventExecutor - NettyEventExecutor service started
2021-08-10 10:28:51 INFO main - The Name Server boot success. serializeType=JSON
# 后台启动Broker中间件 -n 后面的参数为 连接到的NameServer的ip和端口,语法ip:port;ip:port
[root@node01 rocketmq-4.9.0] pwd
/usr/local/rocketmq/distribution/target/rocketmq-4.9.0/rocketmq-4.9.0
[root@node01 rocketmq-4.9.0] nohup sh bin/mqbroker -n localhost:9876 &
[2] 29838
[root@node01 rocketmq-4.9.0]# nohup: ignoring input and appending output to ‘nohup.out’
# 验证
tail -200 ~/logs/rocketmqlogs/broker.log 或者输入 jps
(2) producer、consumer测试启动情况
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
(3) 先关闭broker,再关闭nameserv
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664)
(4)其他命令
#查看集群情况
mqadmin clusterList -n 127.0.0.1:9876
#查看broker状态
mqadmin brokerStatus -n 127.0.0.1:9876 -b IP:port
#查看topic列表
mqadmin topicList -n 127.0.0.1:9876
#查看topic状态
mqadmin topicStatus -n 127.0.0.1:9876 -t MyTopic
#查看topic路由
mqadmin topicRoute -n 127.0.0.1:9876 -t MyTopic
打开防火墙
firewall-cmd --zone=public --add-port=9876/tcp --permanent #开启 9876 端口
#重启防火墙
firewall-cmd --reload #查看开放的端口
firewall-cmd --zone=public --list-ports
四、集群部署方案
这里指broker的集群,nameserver的集群和节点之间无联系。
疑问@@
每个Master存的消息一样吗?
如何确保消息不被重复消费?
参考:https://github.com/apache/rocketmq/blob/master/docs/cn/operation.md
1 单Master模式
风险较大,一旦Broker重启或者宕机,会导致整个服务不可用。不建议线上环境使用,可用于本地测试
2 多Master模式
一个集群全是Master,无Slave,例如两个Master
优点:配置简单,单个Master宕机或重启对应用无影响,在磁盘配置为RAID10时,即使机器不可恢复,由于RAID磁盘非常可靠,消息也不会丢失
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可被订阅,消息实时性会受到影响。
3 多Master多Slave模式–异步复制
每个Master配置一个Slave,有多对Master-Slave 。主备有短暂消息延迟(毫秒级)
优点:Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
缺单:Master宕机,磁盘损坏情况下会丢失少量消息。
4 多Master多Slave模式–同步双写
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写模式,即只有主备都写成功,才向应用返回成功。
优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
缺点:性能比异步复制模式略低(大约低10%),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机
五、其他知识点
1 maven:帮助开发者下载jar包并解决依赖关系的工具
(jar坐标唯一定位一个jar,实训的时候学过在,pom.xml配置文件中写好jar坐标,类似URL,maven会给下载好jar包)
rockmq要求的maven版本是3.2及以上,yum安装的maven版本过低,需要卸载。
# 卸载yum安装的包
yum list installed | grep maven
yum -y remove maven-wagon.noarch
# 验证是否卸载干净
[root@node01 ~]# mvn
-bash: /usr/bin/mvn: No such file or directory
从官网下载版本符合的二进制版
#下载地址
https://maven.apache.org/download.cgi
# 安装教程 解压安装,加环境变量bin,官网有安装手册
unzip apache-maven-3.8.1-bin.zip
tar xzvf apache-maven-3.8.1-bin.tar.gz
#验证是否安装成功
[root@node01 src]# mvn -v
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: /usr/local/src/apache-maven
Java version: 1.8.0_231, vendor: Oracle Corporation, runtime: /usr/local/jdk1.8.0_231/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.21.3.el7.x86_64", arch: "amd64", family: "unix"
以上是关于Rocketmq的主要内容,如果未能解决你的问题,请参考以下文章