漫谈Kafka集群性能调优

Posted 中国移动大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了漫谈Kafka集群性能调优相关的知识,希望对你有一定的参考价值。

前言

Kafka是由LinkedIn开发并贡献给Apache基金会的分布式消息系统,由于其分布式架构、高吞吐量、低延迟的设计特征,使其特别适合作为海量数据场景下的消息采集和分发工具,当前Kafka已经成为大数据技术领域最主流的组件之一。Kafka能够适用于多种不同的场景,作为大数据集群的运维管理人员,如何保障集群稳定并根据实际场景不断的优化集群,改善集群性能,使其满足我们的需求,成为一个重要的问题。本文就两个实际场景出发,分享一下近期在解决Kafka性能问题上的实践经验,同时进一步按照四个应用场景对Kafka集群的优化方案进行探究。



Kafka集群性能问题案例


1

场景一:消息延时过大问题

通过Kafka集群接收的地市共享数据(实时数据,原则上延时不超过60秒,最大延时不超过3分钟),由于消费端数据消费不及时,Kafka集群上面的数据不断积压,导致数据延迟时间持续增长,最大已达到8个小时的延迟。

Kafka的处理架构非常类似于经典的生产者-消费者模型,生产者1分钟内生产了10000条数据,消费者按照要求应该在1分钟内及时消费掉这10000条数据,但是最终发现只消费了6000条,产生了4000条的数据积压,数据在实时产生,导致雪球效应,积压越来越多,延迟越来越高,就产生了严重的处理延迟性能问题。

可以明显看到,在该场景下,应该需要针对消费端进行优化,具体如何优化,稍后再叙。


2

场景二:吞吐量不足问题

数据源一个时间周期内产生大量的数据需要发送到Kafka集群,再通过Kafka集群共享给各业务线用于数据分析。由于数据源数据量太大,出现大量数据无法及时发送给kafka集群(Kafka吞吐量不足造成),数据在producer端产生大量积压,进而影响正常的数据分析工作。

再以生产者-消费者模型来描述,生产者在11分钟内产生了10000条数据,发送到kafka集群并被成功接收的只有7000条,这样在数据源也就是producer端产生了3000条的数据积压,数据源持续不断生成数据,生产者的数据积压也越来越严重,进而造成严重的性能问题。

可以看出,这是一个生产端的问题,需要针对生产端进行优化。事实上,在实际的生产环境中,可能会同时出现生产和消费的问题,针对具体的问题需要具体的分析,下面按照四种场景来探讨Kafka的优化问题。



Kafka集群性能调优


Kafka是一种高吞吐量的分布式发布订阅消息系统。要充分发挥Kafka集群的性能,可以主要从以下4个维度考虑:

(1)吞吐量(throughput):生产者和消费者时间周期内数据处理速度;

(2)延时性(latency):数据时效性,实时数据对延时性要求很高;

(3)持久性(durability):数据存储周期;

(4)可用性(availability):Kafka集群高可用。

在Kafka性能调优之前,我们需要明确:不同的用户使用场景,对Kafka集群的性能要求是不同的。Kafka集群的性能指标,需要在吞吐量和延时性权衡、持久性和可用性之间权衡。但是作为集群管理者,要求我们必须全局考虑,不能孤立地只考虑其中某一个方面。下面我们将分别讨论在不同的使用场景Kafka集群的优化方案。


1

Kafka集群高吞吐量调优

高吞吐量Kafka集群是以实现高吞吐量(TPS,即producer生产速度和consumer消费速度),比如几百万、上千万TPS为目标的集群,通常这种Kafka集群的数据源数据量巨大,对Kafka集群的数据吞吐量要求很高。针对这种场景,我们的优化原则是:减少操作频次,增加单次处理数据量。我们可以对如下参数进行优化:

1.1

漫谈Kafka集群性能调优

参数配置优化

漫谈Kafka集群性能调优

producer端:

(1)batch.size   默认值:16384   推荐值:100000-200000(适当扩大)

参数说明:Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。

(2)linger.ms   默认值:0   推荐值:10-100

参数说明:Producer默认会把两次发送时间间隔内收集到的所Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。

(3)compression.type  默认值:none  推荐值:snappy、gzip、lz4

参数说明:默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。

(4)acks  默认值:1  推荐值:1

参数说明:producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指producer需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:
    (a)acks=0: 设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket  buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1;
   (b)acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
   (c)acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据,这是最强的保证。

(5) buffer.memory     默认值:32m  推荐值:适当增加

参数说明:producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”配置来决定。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。

consumer端:

(1) fetch.min.bytes  默认值:1  推荐值:10-100000

参数说明:每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。

(2) num.consumer.fetchers  默认值:1  推荐值:适当扩大

参数说明:启动Consumer的个数,适当增加可以提高并发度。

1.2

漫谈Kafka集群性能调优

其他层面优化

漫谈Kafka集群性能调优


除了在上述参数的调整外,还可以从以下三个方面提高吞吐量:

(1)磁盘读写方式:利用了磁盘连续读写的性能远远高于随机读写的特点;

(2)高并发:将一个topic拆分多个partition,提高数据处理并发线程数;

3数据压缩/聚合。

Kafka读写的单位是partition,因此将一个topic拆分为多个partition可以提高吞吐量。但是,这里有个前提,就是不同partition需要位于不同的磁盘(可以在同一个机器)。如果多个partition位于同一个磁盘,那么意味着有多个进程同时对一个磁盘的多个文件进行读写,使得操作系统会对磁盘读写进行频繁调度,也就是破坏了磁盘读写的连续性。所以在服务器磁盘挂载时,需要为每块磁盘独立挂载目录,并且不需要做raid,就是为了充分利用多磁盘并发读写,又保证每个磁盘连续读写的特性。

例如,一台服务器有六块硬盘,配置要求每块硬盘作为一个数据存储目录配置到broker的log.dirs, 配置信息如下:log.dirs= /data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs,/data4/kafka-logs,/data5/kafka-logs,/data6/kafka-logs

漫谈Kafka集群性能调优

Kafka会在新建partition的时候,将新partition分布在partition最少的目录上,因此,一般不能将同一个磁盘的多个目录设置到log.dirs。同一个ConsumerGroup内的Consumer和Partition在同一时间内必须保证是一对一的消费关系。任意Partition在某一个时刻只能被一个Consumer Group内的一个Consumer消费(反过来一个Consumer则可以同时消费多个Partition)。

很多时候,性能的瓶颈并非CPU、内存或硬盘而是网络带宽,对于需要在数据中心之传送大量数据的应用更是如此。用户可以在没有Kafka支持的情况下各自压缩自己的消息,举例说明:我们需要对10000W条数据进行传输,我们可以提前将10条数据聚合成一条,那么我们总得传输次数变成1000次,这一点在信令数据传输过程中,性能上有极大提升。

PS:Kafka采用了端到端的压缩:因为有“消息集”的概念,客户端的消息可以一起被压缩后送到服务端,并以压缩后的格式写入日志文件,以压缩的格式发送到consumer,消息producer发出到consumer拿到都被是压缩的,只有在consumer使用的时候才被解压缩,所以叫做“端到端的压缩”。


2

Kafka集群延时性调优

Kafka集群延时性主要指数据写入和消费的时效性。Kafka集群实现低延时,就要求消息从被写入到被读取之间的时间间隔越小越好。这一点主要用于实时数据的处理,低延时的一个实际应用场景就是平时的聊天或短信程序,接收到某一条消息越快越好。这种场景的优化原则:减少数据操作过程(聚合、压缩),遵循即来即处理原则。

2.1

漫谈Kafka集群性能调优

参数配置优化

漫谈Kafka集群性能调优

producer端:

(1)batch.size   根据产生数据速度调整,如果数据量小,建议设置较小的值

(2)linger.ms,同前一个场景的描述,推荐值:0

(3)compression.type,推荐值:none

(4)acks    推荐值:1

consumer端:

fetch.min.bytes        推荐值:1

broker端:

num.replica.fetchers   默认值:1    推荐值:不超过CPU核数+1

参数说明:从leader备份数据的线程数,如果发生ISR频繁进出的情况或follower无法追上leader的情况则,适当增加该值,但通常不要超过CPU核数+1。


3

Kafka集群持久性调优

Kafka实现高持久性,即被成功提交的消息永远不能丢失?比如以Kafka作为底层数据存储的应用服务,那么就要求Kafka不能数据丢失。通常Kafka上的数据不会长时间存储,参数log.retention.hours 和 log.retention.bytes定义日志文件删除策略,无论哪个属性已经达到阀值,都将自动删除存储的日志文件。

3.1

漫谈Kafka集群性能调优

参数配置优化

漫谈Kafka集群性能调优

producer端:

acks 推荐值:allmax.in.flight.requests.per.connection     推荐值:1   

参数说明:限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示Kafka broker在响应请求之前client不能再向同一个broker发送请求。设置此参数是为了避免消息乱序。

broker端:

unclean.leader.election.enable     默认值:true  推荐值:false 

参数说明:关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失。

default.replication.factor        默认值:1   推荐值:3

参数说明: 默认备份因子数、

min.insync.replicas             默认值:1     推荐值:2

参数说明:消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用,即设置为replication factor - 1。

auto.create.topics.enable          默认值:true  推荐值:false  

参数说明:是否允许自动创建topic。如果是真的,则produce或者fetch 不存在的topic时,会自动创建这个topic,否则需要使用命令行创建topic。

broker.rack         推荐值:如果有机架信息,则最好设置该值,保证数据在多个rack间的分布性以达到高持久化。

log.flush.interval.messages      

参数说明:log文件“sync”到磁盘之前累积的消息条数。因为磁盘IO操作是一个慢操作,但又是一个“数据可靠性”的必要手段,所以检查是否需要固化到硬盘的时间间隔,需要在“数据可靠性”与“性能”之间做必要的权衡,如果此值过大,将会导致每次“发sync”的时间过长(IO阻塞),如果此值过小,将会导致“fsync”的时间较长(IO阻塞),如果此值过小,将会导致发“sync”的次数较多,这也就意味着整体的client请求有一定的延迟,物理server故障,将会导致没有fsync的消息丢失。如果是特别重要的topic并且TPS本身也不高,则推荐设置成比较低的值。注意Kafka数据只有flash到硬盘才能被后续消费者消费。

log.flush.interval.ms           

参数说明:仅仅通过interval来控制消息的磁盘写入时机,是不足的,这个数用来控制”fsync”的时间间隔,如果消息量始终没有达到固化到磁盘的消息数,但是离上次磁盘同步的时间间隔达到阈值,也将触发磁盘同步。如果是特别重要的topic并且TPS本身也不高,则推荐设置成比较低的值。注意Kafka数据只有flash到硬盘才能被后续消费者消费。

consumer端  :

auto.commit.enable       默认值true    推荐值:false 

参数说明:如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这样提交的offset将在进程挂掉时,由新的consumer使用。改为自己控制offset值。


4

Kafka集群高可用调优

Kafka本身是分布式系统,自身具有对抗服务崩溃的特性。如果集群高可用是用户的主要目标,配置特定的参数确保Kafka可以及时从崩溃中恢复就显得至关重要了。

4.1

漫谈Kafka集群性能调优

参数配置优化

broker端:

unclean.leader.election.enable  推荐值:true

参数说明:关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失。

num.recovery.threads.per.data.dir  默认值:1  推荐值:log.dirs中配置
的目录数

参数说明:每个数据目录用来日志恢复的线程数目。



总结


结合上述四种场景的方案,回到文初的两个场景,对于场景一,通过减少数据操作过程(聚合、压缩),遵循即来即处理原则,同时提高消费者并发线程数来解决。对于场景二,通过数据压缩聚合,增加单次处理数据量,减少数据发送频次,扩大topic partition数(相当于提高并发吞吐能力)处理来解决。

本文列出的Kafka调优参数只是众多参数的很少部分,许多基础参数都没有一一列出,任何系统调优工作都离不开实际应用服务的配合。Kafka集群的性能指标,需要在吞吐量和延时性权衡、持久性和可用性之间权衡,很难找到一种适合所有场景下的参数配置。作为大数据集群管理者只有结合具体应用,不断调整Kafka配置参数才能实现集群的高吞吐、低时延和高可用等目标。


以上是关于漫谈Kafka集群性能调优的主要内容,如果未能解决你的问题,请参考以下文章

Kafka性能调优

Kafka跨集群迁移方案MirrorMaker原理使用以及性能调优实践

信步漫谈之Eclipse—性能调优

Spark Core 性能调优之配置进程参数

[数据库]漫谈ElasticSearch关于ES性能调优几件必须知道的事(转)

漫谈ElasticSearch关于ES性能调优几件必须知道的事(转)