kafka2:性能优化

Posted 大数据技术官

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka2:性能优化相关的知识,希望对你有一定的参考价值。

来源:myparamita


1.partition数量配置

partition数量由topic的并发决定,并发少则1个分区就可以,并发越高,分区数越多,可以提高吞吐量。

创建topic时指定topic数量
bin/kafka-topics.sh --create --zookeeper 10.25.58.35:2181 --replication-factor 3 --partitions 3 --topic test8


2.日志保留策略设置

当kafka broker的被写入海量消息后,会生成很多数据文件,占用大量磁盘空间,kafka默认是保留7天,建议根据磁盘情况配置,避免磁盘撑爆。

log.retention.hours=72

段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件)

log.segment.bytes=1073741824


3.文件刷盘策略

为了大幅度提高producer写入吞吐量,需要定期批量写文件。建议配置:

每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000

每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000


4.网络和io操作线程配置优化

一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

broker处理消息的最大线程数
num.network.threads=xxx

num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.

broker处理磁盘IO的线程数
num.io.threads=xxx

加入队列的最大请求数,超过该值,network thread阻塞

queued.max.requests=5000

server使用的send buffer大小。

socket.send.buffer.bytes=1024000

server使用的recive buffer大小。

socket.receive.buffer.bytes=1024000


5.异步提交(kafka.javaapi.producer)

采用同步:1000条8s;
采用异步:100条或3s异步写入,速度提升为1w条2s(ProducerConfig)

request.required.acks=0   producer.type=async     ##在异步模式下,一个batch发送的消息数量。producer会等待直到要发送的消息数量达到这个值,之后才会发送。但如果消息数量不够,达到queue.buffer.max.ms时也会直接发送。       batch.num.messages=100  ##默认值:200,当使用异步模式时,缓冲数据的最大时间。例如设为100的话,会每隔100毫秒把所有的消息批量发送。这会提高吞吐量,但是会增加消息的到达延时queue.buffering.max.ms=100  ##默认值:5000,在异步模式下,producer端允许buffer的最大消息数量,如果producer无法尽快将消息发送给broker,从而导致消息在producer端大量沉积,如果消息的条数达到此配置值,将会导致producer端阻塞或者消息被抛弃。queue.buffering.max.messages=1000 ##发送队列缓冲长度##默认值:10000,当消息在producer端沉积的条数达到 queue.buffering.max.meesages 时,阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息)。此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制阻塞的时间,如果值为-1(默认值)则 无阻塞超时限制,消息不会被抛弃;如果值为0 则立即清空队列,消息被抛弃。queue.enqueue.timeout.ms=100     compression.codec=gzip


6.producer版本

使用新producer发送少量消息时丢失

新producer:org.apache.kafka.clients.producer(KafkaProducer.java)
老producer:kafka.javaapi.producer(Producer.scala)

  • 查阅资料后,原因为使用producer时必须调用producer.close(),且在发送后Thread.sleep适当时间,则不会丢失数据。否则会造成资源泄露,导致数据丢失。

  • 当使用多个producer进行发送时(使用apache线程池),当同时有多个producer并发发送时,依然会造成数据丢失。sleep后有好转,但仍然丢失。

  • 使用老producer,且compression.codec不为snappy时,不会造成数据丢失。使用线程池也不会丢失。


7.性能测试

kafka自带的性能测试工具,位于bin/kafka-producer-perf-test.sh。


8.生产端发送堵塞

  • 调整producer缓冲区大小 queue.buffering.max.messages

  • 增加通道数量:多建几个producer,使用连接池管理producer

producer使用线程池

  1. buffer.memory设置的缓存是针对每个producerThread
    针对每个producerThread,不应设置高,以免影响内存

  2. 线程池中线程数量如何设置?
    监视剩余线程数据,进行动态调整,并针对可能出现的峰值预留一定的线程。

  3. 使用tryAcquire()还是acquire()??阻塞或放弃消息??
    使用apache的线程池即可,设置阻塞时的等待时间,超过后则抛出异常。

  4. 是否对线程池容量进行动态调整?
    使用apache的线程池即可。

  5. 线程池最大线程数100,启用50个thread同时发送日志,报错:

kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(test12,null,null,........
报错原因为生产速度大于发送速度(网络传输等决定),可设置继续等待时间,超过此时间后丢弃消息;或设置一直阻塞,排队等待消息发送完毕(会造成线程死锁)。



以上是关于kafka2:性能优化的主要内容,如果未能解决你的问题,请参考以下文章

优化片段着色器

kafka2.5.0自定义数据序列化类

JavaScript性能优化5——JSBench工具的使用

JavaScript性能优化5——JSBench工具的使用

优化 C# 代码片段、ObservableCollection 和 AddRange

使用 C++ 反转句子中的每个单词需要对我的代码片段进行代码优化