聊聊 Kafka:Kafka 如何保证一致性
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊 Kafka:Kafka 如何保证一致性相关的知识,希望对你有一定的参考价值。
参考技术A在如今的分布式环境时代,任何一款中间件产品,大多都有一套机制去保证一致性的,Kafka 作为一个商业级消息中间件,消息一致性的重要性可想而知,那 Kafka 如何保证一致性的呢?本文从高水位更新机制、副本同步机制以及 Leader Epoch 几个方面去介绍 Kafka 是如何保证一致性的。
要想 Kafka 保证一致性,我们必须先了解 HW(High Watermark)高水位和 LEO(Log End Offset)日志末端位移,看下面这张图你就清晰了:
高水位的作用:
这里我们不讨论 Kafka 事务,因为事务机制会影响消费者所能看到的消息的范围,它不只是简单依赖高水位来判断。它依靠一个名为 LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性。
日志末端位移的作用:
高水位和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。
现在,我们知道了每个副本对象都保存了一组高水位值和 LEO 值,但实际上,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值,请看下图:
从图中可以看出,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO, 但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分 。
这里你可能就困惑了?
别着急,老周带你看下源码:
在 kafka.cluster.Partition#makeLeader 中:
Leader 副本所在的 Broker 上只有重置更新远程副本的 LEO,并没有远程副本的 HW。
这里你又可能会问了?
Broker 0 上保存这些远程副本的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。
第二个问题我们直接来看下 HW 和 LEO 被更新的时机:
处理生产者请求的逻辑如下:
处理 Follower 副本拉取消息的逻辑如下:
从 Leader 拉取消息的处理逻辑如下:
搞清楚了上面 HW 和 LEO 的更新机制后,我们举一个单分区且有两个副本的主题来演示下 Kafka 副本同步的全流程。
当生产者发送一条消息时,Leader 和 Follower 副本对应的 HW 和 LEO 是怎么被更新的呢?
首先是初始状态。下面这张图中的 remote LEO 就是刚才的远程副本的 LEO 值。在初始状态时,所有值都是 0。
当生产者给主题分区发送一条消息后,状态变更为:
此时,Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。
Follower 再次尝试从 Leader 拉取消息。和之前不同的是,这次有消息可以拉取了,因此状态进一步变更为:
这时,Follower 副本也成功地更新 LEO 为 1。此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,还没有被更新。它们需要在下一轮的拉取中被更新,如下图所示:
在新一轮的拉取请求中,由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值 =1 的消息。Leader 副本接收到此请求后,更新远程副本 LEO 为 1,然后更新 Leader 高水位为 1。做完这些之后,它会将当前已更新过的高水位值 1 发送给 Follower 副本。Follower 副本接收到以后,也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步。
上面的副本同步机制似乎很完美,我们不妨来思考下这种场景:
从刚才的分析中,我们知道,Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。基于此,社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。
所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。
我举个例子来说明一下 Leader Epoch。假设现在有两个 Leader Epoch<0, 0> 和 <1, 120>,那么,第一个 Leader Epoch 表示版本号是 0,这个版本的 Leader 从位移 0 开始保存消息,一共保存了 120 条消息。之后,Leader 发生了变更,版本号增加到 1,新版本的起始位移是 120。
Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。
源码在 org.apache.kafka.raft.LeaderState 中:
Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据:
同时它还会定期地将这些信息持久化到一个 checkpoint 文件中:
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState#write
接下来,我们来看一个实际的例子,它展示的是 Leader Epoch 是如何防止数据丢失的。请先看下图:
开始时,副本 A 和副本 B 都处于正常状态,A 是 Leader 副本。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。
现在我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新——这是可能出现的。还记得吧,Follower 端高水位的更新与 Leader 端有时间错配。倘若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。这就是说,位移值为 1 的那条消息被副本 B 从磁盘中删除,此时副本 B 的底层磁盘文件中只保存有 1 条消息,即位移值为 0 的那条消息。
当执行完截断操作后,副本 B 开始从 A 拉取消息,执行正常的消息同步。如果就在这个节骨眼上,副本 A 所在的 Broker 宕机了,那么 Kafka 就别无选择,只能让副本 B 成为新的 Leader,此时,当 A 回来后,需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。这就是这张图要展示的数据丢失场景。
严格来说 ,这个场景发生的前提是 Broker 端参数 min.insync.replicas 设置为 1 。此时一旦消息被写入到 Leader 副本的磁盘,就会被认为是“已提交状态”,但现有的时间错配问题导致 Follower 端的高水位更新是有滞后的。如果在这个短暂的滞后时间窗口内,接连发生 Broker 宕机,那么这类数据的丢失就是不可避免的。
现在,我们来看下如何利用 Leader Epoch 机制来规避这种数据丢失。请看下图:
场景和之前大致是类似的,只不过引用 Leader Epoch 机制后,Follower 副本 B 重启回来后,需要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。在这个例子中,该值为 2。当获知到 Leader LEO=2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目,因此 B 无需执行任何日志截断操作。这是对高水位机制的一个明显改进,即副本是否执行日志截断不再依赖于高水位进行判断。
现在,副本 A 宕机了,B 成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。
Kafka 消息队列
目录
主流的消息队列
目前我们市面上比较常见的消息队列主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。但是这么多的消息队列我们如何进行选择呢?
在大数据的场景中我们主要采用Kafka作为消息队列。我们在JavaEE的开发当中主要采用ActiveMQ、RabbitMQ、RocketMQ。
消息队列的应用场景
虽然我现在学了几个消息队列,但是我自己对他没有什么明确的定义。
那现在就一定要记住它的功能。
传统的消息队列的主要应用场景包括:缓存/肖锋、解耦和异步通信。
缓存/肖锋
缓存/肖锋:有利于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
比如有下面这种情况:我们在大型活动中比如像双十一这样的活动。比如双十一参与的用户,每秒可以达到10亿/s,来进行一次秒杀,但是我们后台的最大的处理能力只有1千万/s,但是我这个系统扛不住这个海量的数据,但是我还要搞这次活动怎么办,那么我们就可以使用消息队列来处理。
这个情况下,我们的消息队列就起到了非常重要的作用。我们可以将我们的10亿次/s的数据先缓存到消息队列MQ中,一条一条缓存进来。然后再按照我们的处理系统1千万/s的处理能力,多花个几秒钟的时间就可以把它给处理完了。这也是消息队列中一个非常典型的应用场景。
解耦
解耦:允许在消息队列两边修改或者是独立拓展处理,只需要确保他们遵循MQ统一的约束接口就行。
比如我们需要将我们的多个数据源如MySQL、MongoDB等传送到我们的数据处理框架中如SpringBoot、Spark中。如果没有消息队列的话,那我们就要对他们进行一一处理,编写相应的框架代码,这样开发成本很高、而且后续的修改也需要一一修改,这样麻烦。但是如果我们有了消息队列,我们的数据源只需要往消息队列里面写数据就行了,然后我们后端处理框架需要数据的,只需要从MQ中去取就行了,这样就减少了开发成本,有一个解耦的作用。
异步处理
异步处理区别于同步处理
同步处理就是一步一步把所有的事情都给做完,比如我们用户填写注册信息,然后后台将用户信息写入数据库,然后在调用发送短信接口来发送短信,假设发送短信这个过程需要等待3秒,然后之后页面才响应注册成功。
而**异步处理是我们把核心的事情处理完毕之后,其他另一些不太重要的事情交给MQ去处理,哪怕失败了没有关系,不影响核心任务的完成。**如我们上述说的如果用异步来进行处理的话,用户填完注册信息之后,我们后台将用户信息写入数据库,这个时候,我们就可以响应给用户注册成功了,而发送短信的这个过程交给异步来做。这样子用户就不需要进行等待了。
但是异步处理也不代表不重要,像现在大多数电商分布式场景基本都是使用异步处理的,相反异步处理带来的重试机制反而可以提高业务的成功率的。
我们使用异步处理的主要目的是:减少请求响应时间,实现流程异步化,提高系统响应性能。
Kafka
Kafka的定义
什么是Kafka?
Kafka是一个基于分布式的、具有高吞吐量的发布、订阅模式的消息队列,主要应用于大数据领域,它可以承担大数据的存储、分析和计算。
Kafka的底层基础架构
首先我们先来说一下,Kafka的基本组成。
首先是Kafka 的Producer生产者,用来对接外部传进来的数据的。
然后是Kafka 的Consumer消费者,是用来消费Kafka的数据的。
再有就是Kafka的中间部分,我们Kafka一般都是基于分布式的,所以中间是我们的Kafka集群,集群中存储的是Kafka的Topic主题,用来存储各种各样的数据的地方。(我们Topic主题的分区,可以放在不同的服务器上)
假设我们现在有100T的数据,我们需要把这100T的数据传入到Kafka里面,同时做一个消费,100T的数据,一般情况下,我们的单独一台服务器来存储,肯定是存不下的。所以需要海量的数据,我们需要将数据分而治之,所以Kafka也采用了这种思想。Kafka把一个Topic主题分成多个Partition分区,这样我们一个Topic每个Partition就可以分别承担存储数据的压力。
为了配合Kafka集群中主题分区的设计,Kafka官方提出了消费者组的概念,组内可以多个消费者,并且为了后续的管理,主题中的一个分区,只能由消费者组内一个消费者来消费。
而为了提高kafka的高可靠,为每一个Partition分区增加了副本,并且分区的副本有Leader和Follower之分,这里需要注意的是,无论是Kafka的生产者和消费者,来处理数据的时候,只针对Leader分区进行生产和消费,Follower主要是用来当作备份,如果Kafka分区Leader挂掉之后,Follower副本有条件可以成为Leader。
在Kafka2.8.0之前,Kafka必须配置Zokeeper进行使用的,Kafka一些数据是存储在Zookeeper里面的,Zookeeper集群主要存储Kafka中哪些brokers服务器上线了,并且记录了每一个分区谁是Leader。而在Kafka2.8.0之后,Zookeeper是可选的,我们可以选择Zookeeper来存储信息,也可以使用Kafka的Kraft模式。
这就是Kafka的基础架构。
Kafka分区如何保证Leader选举
Kafka集群中每一个broker都会有一个Controller,而只有一个broker的Controller会被选举为Controller Leader,它负责管理Kafka broker集群的上下线并负责Leader的选举工作。而Kafka分区的Leader和Follower的信息同步工作是依赖于Zookeeper的。
假设我们已经启动了Zookeeper集群和Kafka集群。
Kafka每启动一个节点Broker就会在Zookeeper当中注册一个节点信息。
而Kafka的每一个节点Broker都会有Controller,而这些Controller会抢着去Zookeeper集群中注册controller,谁先注册到,谁就可以成为Controller,负责Leader的选举工作。
假如我们Kafka Broker0抢到了Controller Leader,那么Broker0第一时间会监控Zookeeper当中对应的kafka集群信息brokers/ids/里面的节点变化,里面如果有任何信息变化都能快速的捕捉到。
下面我们来讲一下Kafka中Leader真正的选举策略。
选举规则:在ISR 中存活为前提,安装AR中排在前面的优先。
例如ISR[1,0,2],AR[1,0,2],那么Leader就会安装1,0,2的顺序轮询。
选举出来后,Controller Leader将对应的主题的ISR和Leader信息上传到Zookeeper,防止Controller Leader节点挂了,其他节点可以快速识别信息。
现在理顺一下:假设我们有三个Kafka Broker0/1/2 , 1 为Leader,假设broker1挂掉了,那么在Zookeeper中kafka集群信息brokers/ids/中就会去掉broker1,而broker0(Leader Controller)已经监控了Zookeeper中kafka集群信息的变化,首先它会在Zookeeper集群中拉出对应的主题的ISR和Leader信息,拉过来之后,就按照上述的选举规则进行选举,Leader选举出来之后,Controller Leader将对应的主题的ISR和Leader信息上传到Zookeeper。
Kafka分区如何保证Leader和Follower数据的一致性
Leader和Follower数据的一致性、Kafka数据的同步机制都是一致的说法。
在我们探讨Kafka底层的基础架构的时候,我们说Kafka的Topic主题分区有Leader和Follower之分,而无论是Kafka的生产者和消费者,来处理数据的时候,只针对Leader分区进行生产和消费,而所有的Follower都是从Leader中拉取数据进行数据 同步的,由于生产和消费只针对Leader,所以Leader一般数据较多,而Follower较少,那Kafka到底是怎么样同步的?
假设我们有三个Kafka节点broker0/1/2,broker0为Leader。
首先我们需要明白几个概念
LEO:每个副本最后一个offset+1。就像我们的数组a[8],LEO就是8。
HW(High Watermark)高水位线:所有副本最小的LEO。消费者能够看到最大的数据就是HW-1。
Leader和Follower数据的一致性分为两种情况
1)Leader故障
(1)Leader发生故障之后会被踢出ISR,然后根据Leader选举规则选举新的Leader。
(2)由于生产和消费只针对Leader,所以Leader一般数据较多,而Follower较少,为保证多个副本数据之间的一致性,其余的Follower会先将给的log文件高于Leader的HW的部分截掉,然后大家的数据就都一致了。
2)Follower故障
(1)Follower发送故障之后会被踢出ISR。
(2)这个期间存活的节点继续接收数据。
(3)待该Follower恢复后,Follower会读取本地磁盘记录上一次的HW,并通过log文件将高于HW的部分截取掉,从HW开始向Leader继续同步。
(4)等该Follower的LEO达到该Topic的HW,该Follower就可以重新加入ISR了。
由上述的过程我们可以知道Kafka只能保证数据的一致性,并不能保证数据不丢失或者不重复。
Kafka 中消费者的消费方式
在正常情况下,MQ消息队列中,消费者有两种消费方式:由MQ集群推(push)和由消费者进行拉(pull)。
而Kafka采用的是消费者主动拉取(pull)的方式。
如果Kafka采用Push推的方式,消费者方面各消费速度可能不同,很难适应所有的消费者的消费速率。如Broker是推10ms/s、20m/s还是50m/s。
而且push的方式速度固定,忽略了消费消费能力,如果速度太快可能导致拒绝服务或者排队堵塞的情况。
拉取(pull)模式中
优点:
(1)可以根据消费者consumer的消费能力进行拉取速率,起到一个控制速率的作用。
(2)pull中的拉取可以是批量拉取,也可以是单条数据的拉取。
缺点:
如果kafka集群中没有数据,消费者可能陷入空循环当中,一种返回空数据,消耗资源。
Kafka 高效读写数据的原因(高性能吞吐的原因)(重点)
(1)Kafka本身是分布式集群,采用了分区技术,也采用了消费者组并行消费数据,并行度高。
Topic主题采用了分区技术,针对于生产者,提高了生产者存储数据的并行度,分区也解决了存储大数据量的问题,针对于消费者,采用了消费者组的方式来消费分区的数据,提高消费者消费数据的并行度。
(2)Kafka写数据的时候,是按照顺序写入磁盘的。
Kafka生产者生产数据的时候,要写入log文件中,写的过程是一直追加到文件的末端,为顺序写。官方有数据表名,同样的磁盘,顺序写的速度能到600M/s,而随机写只有100K/s。这样磁盘的结构有关,顺序写之所以快,是因为省去了大量磁头寻址的时间。
(3)Kafka读数据的时候,采用稀疏索引,可以快速定位要消费的数据。
这个我们可以从Kafka的文件存储机制说起,在Kafka集群中,Topic是逻辑上的概念,而Partition分区是物理上的概念,所以Topic主题在硬盘上是以分区的形式存在的。一个Topic分为多个Partition,一个Partition对应一个log,而这个log文件中存储的就是Producer生成的数据。Producer生产的数据会被不断追加到该log文件的末端。为防止log文件过大导致数据定位效率低下,kafka采用了分片和索引机制,将每个partition分为多个Segment。每个Segment大约是1G左右,其中每个segment包括:.index文件,.log文件和.timeindex文件。Kafka生产者在生产日志的时候,Kafka集群存储日志的时候,索引是按照稀疏索引的方式进行存储的,大约每往.log文件中存储4KB的数据,就会在.index中写入一条索引。因为Segment大约1个G,如果我们直接查找1个G的文件,会比较慢,所以Kafka在Segment中创建了一个索引,方便对数据的定位。
(4)零拷贝技术和页缓存。
pageCache叶缓存:kafka依赖底层操作系统提供的pageCache功能。当上层有数据操作时,操作系统将数据写入叶缓存中,当读数据操作发生时,先从页缓存中找,如果找不到,再从磁盘中读取。pageCache页缓存是让多的内存当做磁盘缓存来使用。
零拷贝技术:Kafka对数据的加工和处理都交由Kafka的生产者和Kafka消费者去做,Kafka 集群应用层不关心存储的数据,所以就不用走应用层,传输效率高。
Kafka在页缓存中获取到数据之后,如果没有零拷贝技术,走的是应用层,需要把数据传输到应用层,然后再重新发送到操作系统内核中的Socket套接字缓存,之后再传输给网卡。而又了零拷贝技术,获取到页缓存获取到数据之后直接走网卡,传输效率高。
Kafka 数据可靠性(如何实现高可靠)
Kafka 数据的高可靠,需要靠Kafka生产者、Kafka集群和Kafka消费者三方面共同合作完成。
生产者数据可靠性
生产者数据的可靠性,只要是根据ack应答机制来决定的。
- ack = 0:生产者发送消息过来之后,不需要进行ack应答,生产者可以发送下一波数据。
- ack = 1:生产者发送新数据过来,Leader收到数据就进行ack应答,生产者可以发送下一波数据。
- ack = -1:生产者发送新数据过来,Leader和所有的Follower节点收到数据之后就行ack应答。
所以,在Kafka生产者中保证数据的可靠性,我们需要设置ack=-1。
Kafka集群数据可靠性
如果生产者ack应答设置为-1,可能会出现一个问题,生产者发送数据过来之后,Leader收到数据,所有Follower都开始同步数据,但如果有一个Follower因为发送故障,迟迟不能与Leader进行同步,怎么解决。
我们来看看kafka是如何解决的,Kafka让Leader维护一个动态的ISR队列,ISR队列为和Leader保持同步的Follower+Leader集合。其过程是如果Follower长时间未向Leader发送通信请求或者同步数据,那么该Follower将会被踢出ISR队列。
这样就不需要等长期联系不上或者是已经故障的节点了,保障了Kafka集群的健康。
那么,我们来假设一种情况,如果Topic主题分区的副本数设置为1个,或者ISR里的副本数量只有1个,这样的效果就相当于只有一个Leader,和ack=1的效果是一样的,这样如果Leader挂掉之后就会产生丢数据的风险了。
所以,在Kafka集群中数据的可靠性,我们需要保证主题分区副本数>=2 + ISR里最小副本数>=2。
数据完全可靠的条件 : ack = -1 + 主题分区副本数>=2 + ISR里最小副本数>=2
数据虽然完全可靠,但是会出现数据重复问题,比如Leader在和Follower中的节点同步之后,已经存了数据了,在要发送ack应答的一瞬间,这个时候,Leader挂掉了,应答没有成功,这个时候又会重新选举出来新的Leader,由于没有应答成功,Kafka生产者会进行重试,会再次发送一次一模一样的数据,这个时候就出现数据重复了。
我们数据完全可靠会实现至少一次,而如果我们需要做到数据精确一次,既数据不能重复也不能丢失,比如和钱相关的数据就需要这样。
所以,我们在保证数据完全可靠的条件下,我们还需要使用Kafka的幂等性务来实现精准一次。
Kafka幂等性是指:无论生产者发送多少重复数据给Kafka集群,Kafka集群之后持久化一次,保证了数据不重复。而新版的Kafka幂等性配置默认是打开的。
1)生产者角度
- acks设置为-1 (acks=-1)。
- 幂等性(enable.idempotence = true) + 事务 。
2)broker服务端角度
- 分区副本大于等于2 (–replication-factor 2)。
- ISR里应答的最小副本数量大于等于2 (min.insync.replicas = 2)。
3)消费者
- 事务 + 手动提交offset (enable.auto.commit = false)。
- 消费者输出的目的地必须支持事务(MySQL、Kafka)。
Kafka 消息丢失的场景及解决方案
Kafka消息丢失的场景:
1、ack = 0,生产者发送完消息之后不应答,如果发送失败消息就丢失了。
这种情况我们可以让ack = -1来解决,让生产者发送消息之后,Leader和Follower收到消息之后再进行应答。
2、Topic主题分区的副本数设置为1个,或者ISR里的副本数量只有1个,这样的效果就相当于只有一个Leader,和ack=1的效果是一样的,这样如果Leader挂掉之后就会产生丢数据的风险了。
这种情况下,要保证分区副本数>=2 + ISR里面的副本数量>=2。
Kafka 数据乱序(重点)
数据乱序,是指Kafka生产者通过Sender线程最多给Kafka集群发送五个请求,假如现在发送4个数据1234的时候出现了这种情况,1、2发送成功,3发送失败了进行重试,4发送成功了,这个时候就出现了1、 2 、4 、3的顺序,这就导致了数据乱序。
解决方案如下:
1)Kafka在1.x版本之前要保证数据单分区有序,
在不需要考虑是否开启幂等性的情况下,设置max.in.flight.requests.per.connection = 1
2)Kafka1.x版本之后要保证数据单分区有序,
(1)在未开启幂等性的情况下,设置max.in.flight.requests.per.connection = 1。
(2)在开启幂等性的情况下,需要设置max.in.flight.requests.per.connection 小于等于5,原因是在Kafka1.x以后,启用幂等性后,Kafka服务端会缓存producer发来最近5个的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。
Kafka 提高吞吐量(Kafka调优)
我们来说一种数据积压的场景,在Kafka当中默认的日志存储时间为7天,过了7天之后就会被删除,如果我们消费某个主题的数据,我们已经消费4天了,但是也只是消费者到10%,那么再过三天也消费不了,就这是数据积压。
解决数据积压的方式:提高消费者的吞吐量。
消费者:
(1)如果是Kafka的消费能力不足,我们可以考虑增加Topic主题的分区数,并同时提升消费者组的数据,让消费者数 = 分区数。
(2)如果是我们消费者消费数据能力过剩,而从集群中拉取的批次数据过少,使我们处理的数据小于生成的数据,也会造成数据积压。这个时候我们可以通过参数提高我们批次拉取的数据,从500条增加到1000条(max.poll.records),这里需要注意一下,默认每批次拉取的数据是50M,计算方式是:条数*日志大小,如果总大小大于50M,我们也需要通过参数提高50M这个值(fetch.max.bytes)。
提高生产者的吞吐量,在提生产者的吞吐量前,我们需要知道几个参数的意义
bitch.size:批次发送数据大小,默认16K。
linger.ms:等待时间,默认0ms。
compression.type:压缩类型。
RecordAccumulator:Kafka发送数据的缓冲区大小,默认是32M
了解完这几个核心参数之后,我们还需要知道生产者吞吐数据的流程:
在生产者发送数据的队列里面,在linger.ms等待时间里面,如果我们的数据迟迟未到达bitch.size,生产者sender等待linger.ms之后就会发送数据。linger.ms的默认值为0ms,说明如果我们不对它进行设置,那么队列里面只要有数据,sender线程就可以从队列里面拉取数据并发送到Kafka集群中,0延迟。并且batch.size在linger.ms为0ms的面前是不起作用的,触发我们把这个时间加长,bitch.siez才会起作用。这种来一个拉一个的效率其实不高。
一般我们会这样设置这几个参数的大小来提高Kafka生产者的吞吐量。
batch.size:每批发送数据的大小从默认的16K修改为32K。
linger.ms:等待时间,从默认的0ms修改为5-100ms(但是我们需要注意修改linger.ms会导致延迟变大,适当调节就好)
compression.type:压缩类型,我们使用snappy压缩,如果我们将数据压缩之后,能拉取的数据就更多了。
RecordAcumulator:缓冲区大小,也就是我们的队列,我们可以把默认的32M修改为64M。(buffer.memory)
以上是关于聊聊 Kafka:Kafka 如何保证一致性的主要内容,如果未能解决你的问题,请参考以下文章
Kafka在高并发的情况下,如何避免消息丢失和消息重复?kafka消费怎么保证数据消费一次?数据的一致性和统一性?数据的完整性?