漫谈Kafka集群性能调优
Posted 中国移动大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了漫谈Kafka集群性能调优相关的知识,希望对你有一定的参考价值。
前言
Kafka是由LinkedIn开发并贡献给Apache基金会的分布式消息系统,由于其分布式架构、高吞吐量、低延迟的设计特征,使其特别适合作为海量数据场景下的消息采集和分发工具,当前Kafka已经成为大数据技术领域最主流的组件之一。Kafka能够适用于多种不同的场景,作为大数据集群的运维管理人员,如何保障集群稳定并根据实际场景不断的优化集群,改善集群性能,使其满足我们的需求,成为一个重要的问题。本文就两个实际场景出发,分享一下近期在解决Kafka性能问题上的实践经验,同时进一步按照四个应用场景对Kafka集群的优化方案进行探究。
一
Kafka集群性能问题案例
1
场景一:消息延时过大问题
通过Kafka集群接收的地市共享数据(实时数据,原则上延时不超过60秒,最大延时不超过3分钟),由于消费端数据消费不及时,Kafka集群上面的数据不断积压,导致数据延迟时间持续增长,最大已达到8个小时的延迟。
Kafka的处理架构非常类似于经典的生产者-消费者模型,生产者1分钟内生产了10000条数据,消费者按照要求应该在1分钟内及时消费掉这10000条数据,但是最终发现只消费了6000条,产生了4000条的数据积压,数据在实时产生,导致雪球效应,积压越来越多,延迟越来越高,就产生了严重的处理延迟性能问题。
可以明显看到,在该场景下,应该需要针对消费端进行优化,具体如何优化,稍后再叙。
2
场景二:吞吐量不足问题
数据源一个时间周期内产生大量的数据需要发送到Kafka集群,再通过Kafka集群共享给各业务线用于数据分析。由于数据源数据量太大,出现大量数据无法及时发送给kafka集群(Kafka吞吐量不足造成),数据在producer端产生大量积压,进而影响正常的数据分析工作。
再以生产者-消费者模型来描述,生产者在11分钟内产生了10000条数据,发送到kafka集群并被成功接收的只有7000条,这样在数据源也就是producer端产生了3000条的数据积压,数据源持续不断生成数据,生产者的数据积压也越来越严重,进而造成严重的性能问题。
可以看出,这是一个生产端的问题,需要针对生产端进行优化。事实上,在实际的生产环境中,可能会同时出现生产和消费的问题,针对具体的问题需要具体的分析,下面按照四种场景来探讨Kafka的优化问题。
二
Kafka集群性能调优
Kafka是一种高吞吐量的分布式发布订阅消息系统。要充分发挥Kafka集群的性能,可以主要从以下4个维度考虑:
(1)吞吐量(throughput):生产者和消费者时间周期内数据处理速度;
(2)延时性(latency):数据时效性,实时数据对延时性要求很高;
(3)持久性(durability):数据存储周期;
(4)可用性(availability):Kafka集群高可用。
在Kafka性能调优之前,我们需要明确:不同的用户使用场景,对Kafka集群的性能要求是不同的。Kafka集群的性能指标,需要在吞吐量和延时性权衡、持久性和可用性之间权衡。但是作为集群管理者,要求我们必须全局考虑,不能孤立地只考虑其中某一个方面。下面我们将分别讨论在不同的使用场景Kafka集群的优化方案。
1
Kafka集群高吞吐量调优
高吞吐量Kafka集群是以实现高吞吐量(TPS,即producer生产速度和consumer消费速度),比如几百万、上千万TPS为目标的集群,通常这种Kafka集群的数据源数据量巨大,对Kafka集群的数据吞吐量要求很高。针对这种场景,我们的优化原则是:减少操作频次,增加单次处理数据量。我们可以对如下参数进行优化:
1.1
参数配置优化
producer端:
(1)batch.size 默认值:16384 推荐值:100000-200000(适当扩大)
参数说明:Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。
(2)linger.ms 默认值:0 推荐值:10-100
参数说明:Producer默认会把两次发送时间间隔内收集到的所Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。
(3)compression.type 默认值:none 推荐值:snappy、gzip、lz4
参数说明:默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。
(4)acks 默认值:1 推荐值:1
参数说明:producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指producer需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:
(a)acks=0: 设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1;
(b)acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(c)acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据,这是最强的保证。
(5) buffer.memory 默认值:32m 推荐值:适当增加
参数说明:producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”配置来决定。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
consumer端:
(1) fetch.min.bytes 默认值:1 推荐值:10-100000
参数说明:每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。
(2) num.consumer.fetchers 默认值:1 推荐值:适当扩大
参数说明:启动Consumer的个数,适当增加可以提高并发度。
1.2
其他层面优化
除了在上述参数的调整外,还可以从以下三个方面提高吞吐量:
(1)磁盘读写方式:利用了磁盘连续读写的性能远远高于随机读写的特点;
(2)高并发:将一个topic拆分多个partition,提高数据处理并发线程数;
(3)数据压缩/聚合。
Kafka读写的单位是partition,因此将一个topic拆分为多个partition可以提高吞吐量。但是,这里有个前提,就是不同partition需要位于不同的磁盘(可以在同一个机器)。如果多个partition位于同一个磁盘,那么意味着有多个进程同时对一个磁盘的多个文件进行读写,使得操作系统会对磁盘读写进行频繁调度,也就是破坏了磁盘读写的连续性。所以在服务器磁盘挂载时,需要为每块磁盘独立挂载目录,并且不需要做raid,就是为了充分利用多磁盘并发读写,又保证每个磁盘连续读写的特性。
例如,一台服务器有六块硬盘,配置要求每块硬盘作为一个数据存储目录配置到broker的log.dirs, 配置信息如下:log.dirs= /data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs,/data4/kafka-logs,/data5/kafka-logs,/data6/kafka-logs
Kafka会在新建partition的时候,将新partition分布在partition最少的目录上,因此,一般不能将同一个磁盘的多个目录设置到log.dirs。同一个ConsumerGroup内的Consumer和Partition在同一时间内必须保证是一对一的消费关系。任意Partition在某一个时刻只能被一个Consumer Group内的一个Consumer消费(反过来一个Consumer则可以同时消费多个Partition)。
很多时候,性能的瓶颈并非CPU、内存或硬盘而是网络带宽,对于需要在数据中心之传送大量数据的应用更是如此。用户可以在没有Kafka支持的情况下各自压缩自己的消息,举例说明:我们需要对10000W条数据进行传输,我们可以提前将10条数据聚合成一条,那么我们总得传输次数变成1000次,这一点在信令数据传输过程中,性能上有极大提升。
PS:Kafka采用了端到端的压缩:因为有“消息集”的概念,客户端的消息可以一起被压缩后送到服务端,并以压缩后的格式写入日志文件,以压缩的格式发送到consumer,消息producer发出到consumer拿到都被是压缩的,只有在consumer使用的时候才被解压缩,所以叫做“端到端的压缩”。
2
Kafka集群延时性调优
Kafka集群延时性主要指数据写入和消费的时效性。Kafka集群实现低延时,就要求消息从被写入到被读取之间的时间间隔越小越好。这一点主要用于实时数据的处理,低延时的一个实际应用场景就是平时的聊天或短信程序,接收到某一条消息越快越好。这种场景的优化原则:减少数据操作过程(聚合、压缩),遵循即来即处理原则。
2.1
参数配置优化
producer端:
(1)batch.size 根据产生数据速度调整,如果数据量小,建议设置较小的值
(2)linger.ms,同前一个场景的描述,推荐值:0
(3)compression.type,推荐值:none
(4)acks 推荐值:1
consumer端:
fetch.min.bytes 推荐值:1
broker端:
num.replica.fetchers 默认值:1 推荐值:不超过CPU核数+1
参数说明:从leader备份数据的线程数,如果发生ISR频繁进出的情况或follower无法追上leader的情况则,适当增加该值,但通常不要超过CPU核数+1。
3
Kafka集群持久性调优
Kafka实现高持久性,即被成功提交的消息永远不能丢失?比如以Kafka作为底层数据存储的应用服务,那么就要求Kafka不能数据丢失。通常Kafka上的数据不会长时间存储,参数log.retention.hours 和 log.retention.bytes定义日志文件删除策略,无论哪个属性已经达到阀值,都将自动删除存储的日志文件。
3.1
参数配置优化
producer端:
acks 推荐值:allmax.in.flight.requests.per.connection 推荐值:1
参数说明:限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示Kafka broker在响应请求之前client不能再向同一个broker发送请求。设置此参数是为了避免消息乱序。
broker端:
unclean.leader.election.enable 默认值:true 推荐值:false
参数说明:关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失。
default.replication.factor 默认值:1 推荐值:3
参数说明: 默认备份因子数、
min.insync.replicas 默认值:1 推荐值:2
参数说明:消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用,即设置为replication factor - 1。
auto.create.topics.enable 默认值:true 推荐值:false
参数说明:是否允许自动创建topic。如果是真的,则produce或者fetch 不存在的topic时,会自动创建这个topic,否则需要使用命令行创建topic。
broker.rack 推荐值:如果有机架信息,则最好设置该值,保证数据在多个rack间的分布性以达到高持久化。
log.flush.interval.messages
参数说明:log文件“sync”到磁盘之前累积的消息条数。因为磁盘IO操作是一个慢操作,但又是一个“数据可靠性”的必要手段,所以检查是否需要固化到硬盘的时间间隔,需要在“数据可靠性”与“性能”之间做必要的权衡,如果此值过大,将会导致每次“发sync”的时间过长(IO阻塞),如果此值过小,将会导致“fsync”的时间较长(IO阻塞),如果此值过小,将会导致发“sync”的次数较多,这也就意味着整体的client请求有一定的延迟,物理server故障,将会导致没有fsync的消息丢失。如果是特别重要的topic并且TPS本身也不高,则推荐设置成比较低的值。注意Kafka数据只有flash到硬盘才能被后续消费者消费。
log.flush.interval.ms
参数说明:仅仅通过interval来控制消息的磁盘写入时机,是不足的,这个数用来控制”fsync”的时间间隔,如果消息量始终没有达到固化到磁盘的消息数,但是离上次磁盘同步的时间间隔达到阈值,也将触发磁盘同步。如果是特别重要的topic并且TPS本身也不高,则推荐设置成比较低的值。注意Kafka数据只有flash到硬盘才能被后续消费者消费。
consumer端 :
auto.commit.enable 默认值true 推荐值:false
参数说明:如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这样提交的offset将在进程挂掉时,由新的consumer使用。改为自己控制offset值。
4
Kafka集群高可用调优
Kafka本身是分布式系统,自身具有对抗服务崩溃的特性。如果集群高可用是用户的主要目标,配置特定的参数确保Kafka可以及时从崩溃中恢复就显得至关重要了。
4.1
参数配置优化
broker端:
unclean.leader.election.enable 推荐值:true
参数说明:关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失。
num.recovery.threads.per.data.dir 默认值:1 推荐值:log.dirs中配置
的目录数
参数说明:每个数据目录用来日志恢复的线程数目。
三
总结
结合上述四种场景的方案,回到文初的两个场景,对于场景一,通过减少数据操作过程(聚合、压缩),遵循即来即处理原则,同时提高消费者并发线程数来解决。对于场景二,通过数据压缩聚合,增加单次处理数据量,减少数据发送频次,扩大topic partition数(相当于提高并发吞吐能力)处理来解决。
本文列出的Kafka调优参数只是众多参数的很少部分,许多基础参数都没有一一列出,任何系统调优工作都离不开实际应用服务的配合。Kafka集群的性能指标,需要在吞吐量和延时性权衡、持久性和可用性之间权衡,很难找到一种适合所有场景下的参数配置。作为大数据集群管理者只有结合具体应用,不断调整Kafka配置参数才能实现集群的高吞吐、低时延和高可用等目标。
以上是关于漫谈Kafka集群性能调优的主要内容,如果未能解决你的问题,请参考以下文章
Kafka跨集群迁移方案MirrorMaker原理使用以及性能调优实践