RocketMQ
Posted Zephyr丶J
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ相关的知识,希望对你有一定的参考价值。
RocketMQ
官网:https://rocketmq.apache.org/
看中文文档
基本概念
消息类型很多,java默认的机制并没有对消息进行归类,只会一股脑的消费,有时候就会比较混乱,性质就会受到影响;消息太多,没有分类,如果我只想消费一类数据,就实现不了
RocketMQ消费的时候,是基于主题的,在发消息的时候,可以给消息定义主题,比如说秒杀、付款…消费者只关注对应的主题,只消费对应主题
1 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
2 消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
3 消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
4 主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
5 代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
MQ的服务器不是一个服务器,是要做集群的,那么集群是基于什么协议呢?是有一个路由去整体管理数据结点,一个中心节点管理其他数据节点,还是节点之间互相交叉通信,采用留言协议,一种是有中心的,一种是无中心的
MQ采用的是有中心的,最终处理数据的节点叫做Broker,是干活的节点,有多个Broker,NameServer是中心节点,路由
6 名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
7 拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
8 推动式消费(Push Consumer)
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
(我们项目采用这个方式)
(以下不太重要)
9 生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
一组生产者是类似的
10 消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
一组消费者是类似的
11 集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
12 广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
13 普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
14 严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
15 消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
16 标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
架构
Broker集群,有多个数据结点,数据结点可以做主从备份,有多组主备的存在
左边生产者集群,右边消费者集群,就是应用程序,往往生产者和消费者在一个应用程序里,既生产又消费
NameServer路由的集群
生成者要发消息,但是Broker有多个,发送给谁;去询问NameServer,NameServer会和Broker保持长时间的联系,不断检测这些Broker到底是死是活,根据它们的状态找一个合适的返回给生产者;消费者同样是这个逻辑
NameServer是中心,Broker是数据结点
部署好以后,启用的过程应该是怎么样的?
一般先启动NameServer,把它的集群先组建好,它们时刻准备着去发现broker;把broker启动以后,就会迅速发现broker,并把它注册进来,然后再启动生产者消费者,生产者就可以通过NameServer去访问broker
设计
生产者和消费者通过队列去沟通,核心队列一个CommitLog,生产者所有消息都先放到这个队列里,
存储基于两个东西,一个CommitLog
生产者往redis的节点里发数据,是把它放在commitLog文件里
消费者消费数据,不是从CommitLog里拿,为什么呢,因为commitLog里的数据,很多数据是有不同的主题的,因为是发布订阅模式,生产者按照主题发布消息,消费者按照主题订阅消息,如果要直接从CommitLog中去消费的话,需要遍历查看是哪一个主题而去消费,效率低;如何把分散的、随即的数据访问方式变成有序的?怎么解决性能的问题呢,怎么提高对这个队列读取的并发的能力?
就有了消费者队列ConsumerQueue,默认有三个,三个队列同时读取CommitLog,有了并发能力,并且这个队列使得同一个主题的消息逻辑上顺序的组织在一起,这样就可以按顺序消费了;可以看成是ConsumerQueue对CommitLog以做了一个索引,可以看成是CommitLog的索引,这里包含了相同主题的一些消息,我们顺序去读同样主题的一些消息,就快很多
不是基于内存的,是基于磁盘的,我们在磁盘中是可以看到文件的,因为消息量大,内存存不下;所以是持久化的,基于磁盘的随即读写效率很差,所以才有这样的设计
生产者顺序写磁盘性能很好,同一个消费队列里存放的是相同主题的有序的数据,消费者消费的时候是顺序读,顺序读写磁盘的性能不亚于随即读写内存,所以性能能够得到保证
从ConsumerQueue消费的话是通过消息的主题加ID去消费的,IndexFile是对整个CommitLog消息做了另外一个维度的索引,通过key或时间创建索引,所以有时候我们可能不想从ConsumerQueue中读,而是想以时间等维度读取数据,就可以走这个索引文件,但是默认不会用
消息刷盘
生产者往CommitLog里发消息,那么消息是什么时候刷盘的,有两种机制
一种同步刷盘,生产者递交消息给了Broker,Broker底层访问JavaHeap,堆访问虚拟机,最终虚拟机要把数据刷到硬盘里去;只有当最终落盘了,才会给一个反馈,进行下一次行为,显然这种方式比较慢
还有一种异步的方式,是说当存到虚拟机内存中以后,就返回ACK,但是内存不是迅速同步到磁盘中,而是根据系统调度同步磁盘的,什么时候落盘,基于操作系统机制或者RocketMQ的设置(1s一次还是怎么样)
我们一般采用异步的方式,因为我们关注的是吞吐量
事务消息
MQ Server可以理解为Broker(这里没有画Namesrv),
消息发送给Server,最终是由Consumer消费的,是由Producer来发送消息的
要保证分布式事务的一致性,要做两阶段提交,和mysql分布式事务一样
我们所谓的保证消息的一致性,是指本地做一件事务,最终向Server发的消息,消费者要能消费到,而这个事务,要提交成功了,就能消费到;提交失败,就消费不到;我们要保证的是本地事务和消息消费的最终一致性
1.首先,MQ Producer要想发送事务型消息,先得给Server打一个招呼,一会我要干什么,这个消息是个半成品,叫做half消息,不是发过去就让消费,暂时不让消费,什么时候可以被消费,等再通知
2.Server表示收到了,先保存这个消息,保护起来,但是不会让消费者Consumer感知到,这两步是准备工作
3.这时候Producer才要做一件事,本地一般是访问数据库,本地就会访问Mysql,执行一个事务
4.如果成功了,那么Producer通知Server本地事务成功了,就可以提交消息,让消费者消费了;如果本地事务失败了,就rollback,提醒这个消息不要让消费者感知到,将消息作废;所以会有一个通知commit或者rollback
但是到现在为止,还是不靠谱的,因为有可能事务知道执行成功还是失败了,但是生产者通知Server的时候有可能没有传达到,如果没有传达到,但是事务提交成功了,这时Server得到的半成品消息永远不会被消费到,就不能保证一致了
5.那么如何解决呢?因为第一步发送了半成品消息,MQ Server只有一个半成品的消息,所以加一个回查的机制,就是每隔一段时间看一眼这个半成品成功了吗,这个间隔时间会递增
6.Producer这时会查数据库,从数据库里检查是否成功了
7.Producer将结果反馈给Server
8.然后消费者就可以消费了
总之,事务消息是要保证本地事务和消息的一致性
所以我们需要对第1步,第三步,第六步进行实现,而第4步是基于第3步,第7步是基于第6步
下载and使用
https://rocketmq.apache.org/dowloading/releases/
source是带源码的,binary是不带的
复制链接的地址:
然后wget 链接 下载
下载好就是一个压缩包
然后unzip解压,得到一个文件夹,需要执行,所以需要赋予权限,直接给最高,只是测试
然后进去该目录下
配置
启动之前需要配置,直接启动不起来,因为它默认分配的内存是4G,而我服务器总共就4G
先改启动命令runserver和runbroker,因为我们启动的时候最少启动一个nameserver和一个broker,不能更少了
runserver.sh (82行)
改成:
分别是最小堆内存,最大堆内存,新生代内存,元空间大小,最大元空间大小
然后改/bin/runbroker.sh (67行)
再改一个配置文件:
在 /conf/broker.conf (追加),是broker的配置,需要声明有多少个broker,这样nameserver启动以后好去发现这些broker,把它们注册进来
我们只有一个所以只写一个,autoCreateTopicEnable = true 允许自动创建主题;意思是如果启动了这个选项,那么如果broker中没有生产者生产的主题,会自动创建主题,否则就得手动在broker中创建
启动
启动用到的是bin目录下对应的启动命令,底层调用的是run
后台启动:
先启动nameserver,指定端口,一般是9876,& 的意思是后台启动
看一下日志,发现启动好了
再启动broker
需要指定broker注册给谁,指定配置文件
可以看到报错,commitlg不存在
当首次安装好RocketMQ并启动之后,你看broker.log发现报错,提示说commitlog不存在。这个问题可以忽略,因为你还没有发放任何消息,这个文件还没有被初始化,当你发送任何一条消息值,它自动就会创建好。
测试
自带的测试工具tools
测试需要声明一个变量:
然后模拟生产者发送消息:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
然后看到发送了一大片消息:
然后消费测试一下:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
又是一大片
关闭:
sh ./bin/mqshutdown broker
sh ./bin/mqshutdown namesrv
简单代码测试
单向发送消息是发送过去不用回应,一般会用异步,异步是要提高性能;不管什么消息,消费逻辑类似
顺序产生消息,commer有三个queue,可以并发的访问,并发的消费,在某一个队列里消费,必须是按顺序的,从前往后;但是先在哪个队列里消费并不是确定的,如果我们要保证顺序的消费消息,底层就有一个顺序消息样例,保证多个消息之间是有序
延时消息:
一个消息延时多长时间以后再消费;场景:下单然后支付,如果长时间没有支付就取消订单;或者笔试答题,多长时间自动交卷。所有这种一开始以后达到一个固定的时间去做一个特殊的处理,都可以用延时消息
写个测试类测试一下:
生产者生产消息,需要指定生产者所在的组,nameserver的地址
消息有主题和标签,还有内容
异步发送消息,有一个回调,当返回数据以后,通过回调函数去处理,和ajax类似
消费者消费消息,设置消费者的组名,nameserver地址,订阅的是哪个主题、标签的消息
然后注册一个监听器,一旦发现有消息可以消费,就调用consumeMessage方法
然后启动消费者
测试结果如下:
前面生产了十个
body是二进制的,要用需要转换成字符串
自动配置RocketMQAutoConfiguration
首先第二个注释,启用配置类
第四个注释,有这个配置生效
最后一个,在那个类配置前做这个配置
同样,前缀是rocketmq,是为了读取配置文件中的配置
这里有一个方法,把默认的生产者和消费者设置到了这个方法里,所以要在spring架构下用rocketMQ,只要注入这个rocketMQTemplate,没有必要自己去new:
基于spring如何用?
普通消息
普通的生产者:
消费的话怎么办,不用去写一个方法去消费,在spring模式下消费是创建一个内部类,加上Service注解,声明一个监听器,只要声明,底层就会自动创建,为什么这样做,因为我们可能会有很多个主题、标签,要很多个消费者;如果直接用rocketMQTemplate去获取消费者的话,消费者只有一个,不符合我们需要,而这样写每个Bean就是一个独立的消费者,有一个独立的监听器,这样的话相当于程序中有多个消费者,这样方便
这里声明了三个消费者,一个消费0标签,一个消费1标签,一个都消费
这里我运行出错了,显示连接那个IP失败,应该是没有开放端口,把端口开放一下:
还需要开放10909和10911端口
然后成功了:
可以看到消费者0只消费偶数消息,消费者1消费奇数,X都消费
事务消息
这里代码表示第一步,发送半消息,并收到返回的结果
这里用一个组件来实现第三步和第六步,生产者和数据库的沟通
但是这个监听器只能有一个,所以事务消息都在这里处理;这里可以做判断,如果标签是什么就怎么样处理…
可以看到,因为直接返回的是COMMIT,所以消费成功,没有问题:
如果都写ROLLBACK,不会被消费
未知,就是说提交事务的时候,发生阻塞,或者死锁,导致没有结果
所以刚开始不会被消费,但是当回查的时候,返回的是COMMIT,所以过30s以后,就会被消费
以上是关于RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章