从rocketmq到kafka

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从rocketmq到kafka相关的知识,希望对你有一定的参考价值。

参考技术A 在基于了解或掌握其他同类MQ的基础知识上,怎么比较快速的掌握kafka的核心设计,确保在使用的过程中做到心中有数,做到知其然并知其所以然?本篇文章主要是笔者在已有的rmq的基础上学习kafka的思路以及过程的总结。

ps、rmq指RocketMQ

ps、文章写着写着发现有点长,应该挺乱了……

ps、因为是学习笔记,所以就这样吧,随便看看……

带着问题去学习新的技能,也许会更贴近自己原有的知识储备,也能更好的把新知识纳入自己原有的知识体系并加以补充或者延展,形成更完整的知识脉络。基于原有的rmq的知识体系,在提前梳理了几个相关的,并且浅显的问题,主要是两个方面的内容,一类是MQ模型中生产者客户端的设计与消费者客户端的设计,一类是服务端的总体架构设计。

服务端的总体架构设计

客户端的总体架构设计

相信很多人不管是在面试中,还是在做MQ选型时,都会遇到几个问题,比如Kafka在超过2k的topic时性能会急剧下降,但是rmq在超过2k的topic时性能不存大规模下降,比如Kafka是一个分布式消息队列中间件,而rmq更像一个单机版消息队列中间件等。这些问题的背后,正是两个消息中间件在架构设计上的差异性所导致的,各有优劣势,我们更多的关注设计思路。先看这几个问题。

在rmq服务端写入时,完全是基于commit log 做log append,避免了磁盘的随机读写,再配合零拷贝等技术特性,成为了MQ的高并发利器。而由于RMQ的全量日志都维护在commit log,这也是其余kafka的一个架构设计上的区别。相信初步了解过kafka的同学,都应该知道其设计理念中关于分区与副本的概念,一个topic在集群中存在多个分区,一个分区在集群中存在多个副本,不同的topic之间分区是互不关联的,当单机维护超过2k的topic时,意味着单机存在2k多个分区,即便topic内日志采用log append,那么在高并发写入刷盘时,磁头在这些分区的副本文件上移来移去,性能自然会随之下降,看起来像是‘随机读写’。

这个说法是在一个中间件爱好群里看到大家在讨论时聊到的,感觉相当有意思,这种看法背后又是怎么的逻辑呢?首先,网上能找到的二者大量的对比都是基于单机的对比,集群对比很少。从分区+副本的思路来看,kafka的部署架构看起来是多个broker组成集群,但是内部运转逻辑是分区维度的多副本间高可用,即topic在多个broker之间做高可用的保证,而副本间的运转逻辑是基于zookeeper的ZAB机制。反观rmq最开始的架构确实主从架构,看起来更简单,但是可用性的保证上完全不一样,由于所有的topic都在主节点上,主节点挂了整个集群就运转不下去了,因为只有主可以支持写,所以rmq推荐使用双主架构,后来才引入raft协议支持选举,但依旧是基于broker的选举。二者最大的区别在于,集群中某个节点挂机对于整个集群的影响程度不同,毫无疑问,rmq显得更重。同样的多节点集群中,每个kafka broker都在提供读写能力,因为不同的topic的副本散落在各个broker中,而每个topic的leader副本也会分散在整个集群中,而rmq则不同,所以理论上kafka集群能提供的吞吐量应该会比rmq更高。

从前两个问题,提到了几个很核心的概念,包括分区,副本,而这也是kafka最核心设计内容。kafka的分区这个设计很有意思( 关于kafka分区 ),kafka的集群是一个整体,对于topic而言,分区个数相当于多少个可读写节点,一个分区下存在多个副本组成一个分布式可选主的‘集群’。

如上图,在一个kafka集群中,部署了三个服务端节点,在topic-a创建时,创建了2个分区,3个副本,在这个部署下,提供读写能力的只有broker1节点上1分区的副本,broker2节点上2分区的副本。对于部署节点broker而言并无主次之分,分区与分区间相互独立,分区内副本间组成集群为topic-a : partation-1提供服务。topic 与 分区 可以看做是逻辑概念,副本为物理概念。所以,前文提到弱化broker的概念就在于,它是基于分区提供服务,这个与rmq的设定完全不同,也许是先入为主的关系,又或者在rmq架构中broker的设定更像是mysql的主从设定,rmq的broker理解起来更简单。

那么什么是isr, asr? 在说这个之前,先说说对于一条消息而言,kafka理论上应该如何在兼顾一定性能的情况下获取更高的可靠性?请求写入分区1的leader副本,就能保证数据一定不丢失吗?如果此时leader节点宕机发生选举,由于follower节点还没同步leader数据,那是不是一段时间内的数据就丢失了呢?那为了更高的可靠性,是不是可以选择等所有副本都同步到当前消息才算本次写入成功?follower节点的数据时从leader节点复制而来(此处会抽象一个很常见的水位高低的概念,但是还没详细了解,暂时忽略),那如果follower节点的数据跟leader节点的数据很接近的话,那么复制会很快完成,但是如果某个follower节点的数据落后leader的节点很多,等待完全同步需要更长的时间,毫无疑问将会引发灾难性的结果。那么,有没有一种相对均衡,可接受的方案,比如只等待落后leader节点数据量较低的follower节点成功复制就算成功?技术方案的选择往往都是取舍,特别是多副本间的数据一致性的问题。

isr集合,俗称副本同步集。kafka并非是根据副本间数据复制的偏移量来计算集合,而是根据数据同步的时间间隔(参数为 **[replica.lag.time.max.ms](http://replica.lag.time.max.ms/)** ,默认为10s),将相同分区中leader与follower之间同步消息的时间间隔不超过设置的阔值的副本放入isr集合,而asr则表示所有副本集合。

有了isr集合,那么副本数据间的一定程度的一致性,可以转为只要写入isr成功就算成功,但是就算这样就可以了吗?如果leader副本宕机了集群要重新选举,选出了一个落后的follower副本,那数据还是照样丢了,kafka是不是要确保一个非isr集群的副本不能参加选举?其次,如果isr长期只有一个节点,那是不是风险依旧很高?鉴于这些问题,另外提供两个参数以供解决(参数 unclean.leader.election.enable , 设置为false表示非isr节点不参与选举, 参考文章 。参数 min.insync.replicas ,最小同步副本个数,既isr集合大小,推荐设置asr -1 ,具体视副本数大小设定 ), 推荐查阅的isr文章 。在兼顾kafka的性能与可靠性间取舍,通过生产者端的 acks 参数来设定。

在聊kafka的isr集合时,让笔者想到rmq的几个参数,客户端的同步/异步发送,服务端的同步刷盘,异步刷盘,同步复制,异步复制,由于后期rmq基于raft协议做集群的选举,并不知道是否还有其他的副本间数据一致性的方案,并且也有数年未翻过rmq的源码,细节了解不多,所以不好下定论,但是从颗粒度的设计上无疑rmq会更粗糙一些,理解上也更加简单,但是,个人认为kafka的设计相对更高级,在生产环境中使用会更加放心。

以上大致也谈了kafka集群的部署,以及isr合集,分区副本等概念,还未谈及kafka服务端的日志的设计,从网上大致了解了一番,感觉跟rmq的设计有相似之处(rmq的设计理念很多也是借鉴了kafka),这块内容待后续补充吧。

前文带着问题来学习时提到客户端中生产者我们需要关注的内容,包括同步异步,负载均衡,流控等的内容。生产者的本质是什么,无疑是如何将消息正确的投递到服务端。由于目前笔者使用的是Java,所以直接翻阅了Java语言实现的客户端程序(spring-kafka-2.1.7, kafka-client-1.0.1),其他语言实现的客户端程序可能存在一定差异。先提供几篇学习资料的传送门: 初识kafka producer 、 Kafka消息发送流程 、 Sender线程详解

不得不提的几个参数

其中笔者觉得有几个点是需要重点关注的,如下:

1、分区器是重点需要关注

分区器的逻辑相对简单,但是却要关注,因为它很重要,它在做选择分区的活。

2、acks=0的逻辑(其他的逻辑需要原来服务端的响应,可以看看NetworkClient#handleCompletedReceives)

3、RecordAccumulator#ready 中判断了什么

4、回调里究竟做了什么?

5、流控问题

应该算初步掌握Kafka Producer,至少日常使用应该是没问题了,前文提到同步、异步、流控、负载均衡等问题都应该有了了解,其他的细节有时间有遇到问题再单独补充吧~

消费端的核心流程一般有三个:1、元数据同步过来后的负载均衡,消费端剔除,reblance等。2、消息的poll流程。3、offset commit流程。先提供几篇学习资料: 线程拉取模型 , 核心参数 , 重平衡

基本上看完了消费者客户端的核心流程,总体感觉相对复杂。相比rmq而言,印象中,rmq的rebalance的设计更简单一些,有一条线程去做reblance,有一条线程去做poll,线程间相互隔离,relbance的流程不会阻塞poll流程。并且由于是客户端均分负载,每个客户端只负责从ns中拿到元数据,然后客户端按照算法本地均分,结果并不会反馈给服务端,既rmq的客户端如果采用算法不一样,就会有风险。

1、kafka的reblance中需要引入一个协调器Coordinator,客户端从本地缓存的元数据中随机找到一个可用的leader节点,向其发送寻找协调器的节点。主要向服务端带了一个grouopId的数据去找到合适的协调器,寻找协调器的过程中是超时阻塞的。

2、找到协调器后,判断是否需要加入总的分配分组(仅看选择了自动分配的订阅类型的)。判断上一次分配的元数据跟当前的元数据是否发生变更,判断已加入的主题列表与当前的主题列表是否发生变更,已经是否需要rejoin的标记位。

3、确认需要加入分组后,如果是自动分配分区的场景下,需要拉去最新的元数据,之后进入reblance阶段。

4、reblance分为两个阶段,一个是prepare阶段,主要做包括(自动的)同步本地的最新commit,触发reblance钩子函数,重置分组订阅关系,还需要停掉当前的心跳(防止它影响分配的过程)。另外一个就是加入阶段,向服务端发起请求后等待结果异步回调,回调中会判断如果当前的节点是leader节点,则执行分配逻辑,并将分配逻辑发给服务端。如果当前的节点是follower节点,则向服务端发送空的分配结果,并等待服务端回调最终的分配结果。等服务端返回最终结果都是阻塞状态。

5、执行协调器分配完成。

从中可以理解,分配的逻辑在客户端中的某一个节点,客户端分配完成后将结果返回给服务端,服务端再分发给各个节点,确保整个客户端集群的分配算法是统一的。触发reblance之后,如果集群中的节点发生变更,会怎么触发第二次reblance?在客户端父服务端的交互中,比如通过心跳上报时,服务端返回 Errors.REBALANCE_IN_PROGRESS ,客户端重置本地 needRejoin 标记位,等待下一次发起poll时进行第二次reblance。同理,提交偏移量时会返回需要reblance。

RocketMQ—基础篇

前言

RocketMQ最初是cooy的Kafka,改成了java语言。所以,RocketMQ中充斥着Kafka的影子。学习RocketMQ前,最好学习Kafka知识。
RocketMQ需要占用的内存较大。默认的配置中,配置到了8G的内存。所以想玩RocketMQ,必须保证有足够的运行内存。

一、物理架构

在这里插入图片描述
NameServer:
NameServer类似于Kafka中的Zookeeper,是RocketMQ的服务注册中心,所以启动RocketMQ需要先启动NameServer再启动Broker。
Broker在启动时向所有NameServer注册服务器地址等信息,生产者在发送消息之前先从NameServer获取Broker服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。
NameServer与每台Broker服务保持长连接,间隔30S检查Broker是否存活,如果宕机,则从路由注册表中将其移出。

二、核心概念

分组(Group)
生产者:标识发送同一类消息的Producer,发送普通消息时,仅标识使用。主要用作事务消息。

消费者:标识一类Consumer的集合,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例共同消费topic的消息,起到负载均衡的作用。这句话的意思是,同一类的Consumer Group,共同订阅并消费某一个topic,且他们的处理逻辑都一样。

Consumer Group 和Kafka中的消费者群组类似,一个群组内的消费者共同消费一个主题,偏移量是共享的。不同群组的消费者消费一个主题,偏移量是各自群组的。

主题(Topic)
RocketMQ中的主题,引入了标签(Tag) 概念,消息发送时,给消息打Tag,在消费时,再指定Tag,那么消费者就只能消费这个主题中携带这个Tag的消息。

消息队列(Message Queue)
类似于Kafka中分区的概念。消息的物理管理单位。一个Topic可以有多个Queue。若一个Topic创建在 不同的Broker上,则不同的Broker上都有若干个Queue,消息将物理地址存储落在不同Broker节点上。这样可以提高消费者的消费速度,提高并发度。
生产者和消费者,最终交互的,都是topic下的Queue。这和Kafka的模式一样。

偏移量(Offset)
RocketMQ 中,有很多 offset 的概念。一般我们只关心暴露到客户端的 offset。不指定的话,就是指 Message Queue 下面的 offset。
Message queue 是无限长的数组。一条消息进来下标就会涨 1,而这个数组的下标就是 offset,Message queue 中的 max offset 表示消息的最大 offset。
offset实际上表示的是下次拉取的 offset 位置。
和Kafka中分区偏移量意思类似。

消息(Message)
Message 是 RocketMQ 消息引擎中的主体。messageId 是全局唯一的。MessageKey 是业务系统(生产者)生成的,所以如果要结合业务,可以使用 MessageKey 作为业务系统的唯一索引。

三、零拷贝与MMAP

以上是关于从rocketmq到kafka的主要内容,如果未能解决你的问题,请参考以下文章

消息队列技术选型(Kafka + RocketMQ)

Canal利用canal实现mysql实时增量备份并对接kafka

从rocketmq到kafka

RocketMQ 升级到主从切换(DLedger多副本)实战

从RocketMQ的Broker源码层面验证一下这两个点

44 源码:从Github拉取RocketMQ源码以及导入IntellijIDEA中