分布式流处理组件-优化篇:Producer生产调优之核心参数

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式流处理组件-优化篇:Producer生产调优之核心参数相关的知识,希望对你有一定的参考价值。

前言

前面我们花了较长的时间对生产者Producer理论、Producer分区做了一个比较细致的介绍。详细大家在认真阅读完前两节的内容之后会对Kafka的生产者有一个比较清晰的认知。

其中我们需要重点掌握的内容是:Producer发送消息的过程,如果有不清楚的建议返回好好品味。

  • 《分布式流处理组件-理论篇:Producer入门理论》

我们需要注意的是:

  • 在生产环境由于物理机器等资源配置的影响,也为了更大程度上保证资源的利用率,我们都会对各个组件进行适配。

而Kafka的Producer也是一样的。在生产环境中也有很多需要注意的点。本章我们就来好好的聊一聊~

啰嗦两句Producer的消息发送原理

如上图所示,消息数据通过主线程调用producer.send()将其发送出去,其中经过拦截器、序列化器、分区器的层层加工之后,记录缓冲区RecordAccumulator会将加工之后的消息记录添加到其中。而消息也不是单纯的存在于RecordAccumulator中,为了降低网络IO,Producer将其按照batch的形式进行存放。

而batch消息也会根据分区器计算得到的分区号存在于对应的Deque双端队列中,所以他们的关系就是图中一层包一层的样子

当batch满足某个条件或者消息等待指定时间之后,sender线程被拉起,Sender程序将不断从缓冲区取出数据,进而进入到另外一个阶段

从缓冲区拉取出来的数据会被封装为Request对象,并且与缓存区类似的是:

  • NetworkClient中同样会存在一个类似缓冲区的存在:InFlightRequests。其中会按照分区对Request进行存储。所以他们的逻辑关系其实是这样的

随后进行发送,而在默认情况下,如果Broker端一直没有响应,每个分区下的Request只能存放5个请求。而超出的情况将会阻塞发送逻辑。

消息发送成功后,将会清空原始数据。否则尝试重试等操作

Producer高吞吐

前置条件

来吧,啰嗦完Producer发送过程之后,就到了精彩的测试验证环节。有句话需要重点说明下:

  • 以下验证结果不属银弹,实际生产中参数配置如何: 还是需要结合实际业务场景和资源配置给出最佳参数

机器配置:

  • 3台 2C4G 的虚拟机
  • Topic设置分区数为3,副本因子为2

测试脚本

本次Producer吞吐量测试脚本在Kafka中已经提供,我们在$KAFKA_HOME/kafka_2.13-3.3.1/bin中可以找到kafka-producer-perf-test.sh。如下对执行参数进行简单说明:

  • --throughput: 限制测试发送消息的最大吞吐量,-1表示不受限制
  • --num-records: 测试消息的数据量。
  • --record-size: 每条消息的大小。
  • --producer-props: Producer可配置参数信息,我们也可以通过properties文件的方式配置到--producer.config

默认参数负载情况

kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 --throughput -1 --num-records 100000  --record-size 1024

发送10W条消息大小为1KB的消息到newTopic_test001上,不设置吞吐限制

30421 records sent, 6083.0 records/sec (5.94 MB/sec), 2125.8 ms avg latency, 3695.0 ms max latency.
55005 records sent, 11001.0 records/sec (10.74 MB/sec), 3129.7 ms avg latency, 4864.0 ms max latency.
100000 records sent, 9042.408898 records/sec (8.83 MB/sec), 2719.28 ms avg latency, 4864.00 ms max latency, 2598 ms 50th, 4250 ms 95th, 4734 ms 99th, 4832 ms 99.9th.

输出表示:

  • 成功消费100000记录,吞吐量:10101.010101条/s (9.86 MB/sec)
  • 平均延迟2451.60ms,最大延迟4025.00ms
  • 50%消息延迟2455ms, 95%消息延迟3581ms, 99%消息延迟3875ms, 99.9%消息延迟4016ms。

参数介绍

下面要介绍的这几个参数,在Producer端:任意一个都可以称为王牌的存在。接下来我们就一一来看看:

batch.size

每当发送多个消息时,为了提高客户端和服务器的性能,生产者将尝试对多个消息进行打包成批,保证这一批可以在同一个分区内。默认为16384【16KB】

为了尽可能的提高吞吐量,在实际生产中需要对发送的消息进行合理预估,根据实际情况选择一个合理的大小,避免出现如下情况:

  • 单条消息超过batch.size,Producer有可能不会处理此消息
  • batch.size过大,有可能会造成Producer端内存空间的浪费
  • batch.size过小,频繁的网络IO会降低Producer的吞吐

linger.ms

如果消息迟迟没有达到batch.size,那么将尝试等待linger.ms时间发送。默认等待时间为0,也就是当消息到达之后立即发送

这两个参数我们或多或少都介绍过,但是并没有测试调整它们对吞吐量的影响。接下来我们就先来测试一波吧!!!

测试batch.sizelinger.ms对吞吐量的影响

对照以上默认测试结果,然后我们开始进行参数调整。

跟着我的节奏,先对单个参数进行调整,对比差异

  • batch.size=32768
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=32768 --throughput -1 --num-records 100000  --record-size 1024
68697 records sent, 13733.9 records/sec (13.41 MB/sec), 900.0 ms avg latency, 1812.0 ms max latency.
100000 records sent, 14817.009927 records/sec (14.47 MB/sec), 1129.92 ms avg latency, 1957.00 ms max latency, 1167 ms 50th, 1884 ms 95th, 1932 ms 99th, 1951 ms 99.9th.

100000 records sent, 22311.468095 records/sec (21.79 MB/sec), 1036.42 ms avg latency, 1467.00 ms max latency, 1100 ms 50th, 1431 ms 95th, 1449 ms 99th, 1462 ms 99.9th.

100000 records sent, 21753.317381 records/sec (21.24 MB/sec), 1001.94 ms avg latency, 1646.00 ms max latency, 955 ms 50th, 1614 ms 95th, 1631 ms 99th, 1639 ms 99.9th.

对比默认结果,我们可以看到在吞吐量上已经有了非常明显的提高

  • linger.ms=3000
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 --throughput -1 --num-records 100000  --record-size 1024
60526 records sent, 12102.8 records/sec (11.82 MB/sec), 1632.5 ms avg latency, 2337.0 ms max latency.
100000 records sent, 12280.486307 records/sec (11.99 MB/sec), 1882.64 ms avg latency, 2591.00 ms max latency, 2050 ms 50th, 2553 ms 95th, 2571 ms 99th, 2584 ms 99.9th.

64966 records sent, 12990.6 records/sec (12.69 MB/sec), 1556.1 ms avg latency, 2757.0 ms max latency.
100000 records sent, 13540.961408 records/sec (13.22 MB/sec), 1719.67 ms avg latency, 2757.00 ms max latency, 1474 ms 50th, 2701 ms 95th, 2730 ms 99th, 2744 ms 99.9th.

71776 records sent, 14355.2 records/sec (14.02 MB/sec), 1506.2 ms avg latency, 2176.0 ms max latency.
100000 records sent, 14320.492625 records/sec (13.98 MB/sec), 1577.49 ms avg latency, 2176.00 ms max latency, 1668 ms 50th, 2134 ms 95th, 2154 ms 99th, 2170 ms 99.9th.

同样还是要和默认测试结果进行对比,虽然吞吐量没有配置batch.size的效果差异,但也不能说本次调整不重要

  • 合并参数测试
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 batch.size=32768 --throughput -1 --num-records 100000  --record-size 1024
100000 records sent, 21739.130435 records/sec (21.23 MB/sec), 1007.46 ms avg latency, 1428.00 ms max latency, 995 ms 50th, 1400 ms 95th, 1411 ms 99th, 1424 ms 99.9th.
100000 records sent, 23041.474654 records/sec (22.50 MB/sec), 952.25 ms avg latency, 1334.00 ms max latency, 919 ms 50th, 1302 ms 95th, 1318 ms 99th, 1328 ms 99.9th.
100000 records sent, 24271.844660 records/sec (23.70 MB/sec), 916.17 ms avg latency, 1318.00 ms max latency, 912 ms 50th, 1278 ms 95th, 1309 ms 99th, 1314 ms 99.9th.

已经可以看出对比了吧。 我们继续~~~

compression.type

该参数对Producer生产的数据进行压缩,主要针对批数据压缩。默认是none【无压缩】,可以用来设置的值:

  • gzip
  • snappy
  • lz4
  • zstd

接下来我们直接测试这几个压缩算法的性能吧,为了方便写个脚本

#!/bin/bash

for i in gzip snappy lz4 zstd
do
	for ((j=0; j<3; j++))
	do
        echo "----$i方式的第$j次测试----"

		kafka-producer-perf-test.sh --topic newTopic_test002 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=65536 compression.type=$i --throughput -1 --num-records 100000  --record-size 1024

	done
done
----gzip方式的第0次测试----
100000 records sent, 25425.883549 records/sec (24.83 MB/sec), 136.45 ms avg latency, 744.00 ms max latency, 97 ms 50th, 451 ms 95th, 677 ms 99th, 742 ms 99.9th.
----gzip方式的第1次测试----
100000 records sent, 27019.724399 records/sec (26.39 MB/sec), 54.22 ms avg latency, 291.00 ms max latency, 27 ms 50th, 174 ms 95th, 228 ms 99th, 288 ms 99.9th.
----gzip方式的第2次测试----
100000 records sent, 27940.765577 records/sec (27.29 MB/sec), 21.55 ms avg latency, 267.00 ms max latency, 11 ms 50th, 75 ms 95th, 97 ms 99th, 130 ms 99.9th.
===========分割线=============
----snappy方式的第0次测试----
100000 records sent, 34153.005464 records/sec (33.35 MB/sec), 518.28 ms avg latency, 1047.00 ms max latency, 479 ms 50th, 969 ms 95th, 1029 ms 99th, 1044 ms 99.9th.
----snappy方式的第1次测试----
100000 records sent, 32765.399738 records/sec (32.00 MB/sec), 474.20 ms avg latency, 985.00 ms max latency, 476 ms 50th, 921 ms 95th, 971 ms 99th, 984 ms 99.9th.
----snappy方式的第2次测试----
100000 records sent, 34578.146611 records/sec (33.77 MB/sec), 450.48 ms avg latency, 932.00 ms max latency, 442 ms 50th, 869 ms 95th, 911 ms 99th, 928 ms 99.9th.
===========分割线=============
----lz4方式的第0次测试----
100000 records sent, 34059.945504 records/sec (33.26 MB/sec), 474.75 ms avg latency, 871.00 ms max latency, 461 ms 50th, 818 ms 95th, 852 ms 99th, 867 ms 99.9th.
----lz4方式的第1次测试----
100000 records sent, 37174.721190 records/sec (36.30 MB/sec), 408.28 ms avg latency, 769.00 ms max latency, 418 ms 50th, 716 ms 95th, 753 ms 99th, 767 ms 99.9th.
----lz4方式的第2次测试----
100000 records sent, 32404.406999 records/sec (31.64 MB/sec), 483.38 ms avg latency, 1145.00 ms max latency, 372 ms 50th, 1077 ms 95th, 1122 ms 99th, 1142 ms 99.9th.
===========分割线=============
----zstd方式的第0次测试----
100000 records sent, 51975.051975 records/sec (50.76 MB/sec), 57.53 ms avg latency, 279.00 ms max latency, 54 ms 50th, 108 ms 95th, 141 ms 99th, 158 ms 99.9th.
----zstd方式的第1次测试----
100000 records sent, 47755.491882 records/sec (46.64 MB/sec), 66.12 ms avg latency, 333.00 ms max latency, 49 ms 50th, 208 ms 95th, 316 ms 99th, 332 ms 99.9th.
----zstd方式的第2次测试----
100000 records sent, 51203.277010 records/sec (50.00 MB/sec), 60.27 ms avg latency, 265.00 ms max latency, 49 ms 50th, 148 ms 95th, 172 ms 99th, 193 ms 99.9th.
===========分割线=============

根据本次测试,而且测试命令也运行了多次,确实是zstd较好。 而我们再看看官网给出的说明:

acks

前面章节中其实我们介绍过acks这个参数。

  • "0": 当消息调用send()发送出去之后就表示消息已经发送成功,不管消息是否已经到达broker
  • "1": 消息发送后,Leader接收到消息并记录到本地之后,不需要同步数据到副本就能进行ack返回
  • "all": 当消息在Leader接收记录,并且等待副本数据同步完成之后,才会返回ack。 该级别也属于Java#Producer的默认配置

基本就是默认配置,只不过多了一个acks的配置项

#!/bin/bash
for i in 0 1 all
do
echo "---------------acks=$i------------------"
        for ((j=0; j<3; j++))
        do
        kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 acks=$i --throughput -1 --num-records 100000  --record-size 1024
        done
done
--------------acks=0------------------
100000 records sent, 43878.894252 records/sec (42.85 MB/sec), 60.90 ms avg latency, 367.00 ms max latency, 20 ms 50th, 218 ms 95th, 297 ms 99th, 363 ms 99.9th.
100000 records sent, 38699.690402 records/sec (37.79 MB/sec), 121.56 ms avg latency, 823.00 ms max latency, 60 ms 50th, 469 ms 95th, 736 ms 99th, 814 ms 99.9th.
100000 records sent, 37650.602410 records/sec (36.77 MB/sec), 127.71 ms avg latency, 441.00 ms max latency, 100 ms 50th, 351 ms 95th, 403 ms 99th, 440 ms 99.9th.
---------------acks=1------------------
100000 records sent, 30284.675954 records/sec (29.57 MB/sec), 387.87 ms avg latency, 897.00 ms max latency, 340 ms 50th, 799 ms 95th, 855 ms 99th, 893 ms 99.9th.
100000 records sent, 36995.930448 records/sec (36.13 MB/sec), 247.35 ms avg latency, 826.00 ms max latency, 199 ms 50th, 703 ms 95th, 789 ms 99th, 813 ms 99.9th.
100000 records sent, 37425.149701 records/sec (36.55 MB/sec), 394.01 ms avg latency, 850.00 ms max latency, 389 ms 50th, 784 ms 95th, 814 ms 99th, 841 ms 99.9th.
---------------acks=all------------------
76362 records sent, 15272.4 records/sec (14.91 MB/sec), 1422.4 ms avg latency, 1980.0 ms max latency.
100000 records sent, 16084.928422 records/sec (15.71 MB/sec), 1463.71 ms avg latency, 1980.00 ms max latency, 1628 ms 50th, 1903 ms 95th, 1960 ms 99th, 1974 ms 99.9th.

81151 records sent, 16223.7 records/sec (15.84 MB/sec), 1404.0 ms avg latency, 2201.0 ms max latency.
100000 records sent, 16906.170752 records/sec (16.51 MB/sec), 1397.82 ms avg latency, 2201.00 ms max latency, 1491 ms 50th, 2001 ms 95th, 2136 ms 99th, 2193 ms 99.9th.

88156 records sent, 17631.2 records/sec (17.22 MB/sec), 1245.0 ms avg latency, 1688.0 ms max latency.
100000 records sent, 17969.451932 records/sec (17.55 MB/sec), 1255.73 ms avg latency, 1688.00 ms max latency, 1403 ms 50th, 1598 ms 95th, 1635 ms 99th, 1668 ms 99.9th.

所以:

  • 如果是类似日志、行为等不重要的消息,建议将acks设置为0.
  • 其他的就根据消息的安全程度来进行合理的选择吧~

下期预告

本期针对Producer调优参数的介绍和测试对比到这里就已经结束了。还是一句话:

  • 从数据看结果:实际生产中进行测试,选择合理的配置参数信息是必须得~

下一期针对数据可靠性我们来做一个详细的介绍。 期待~

分布式流式处理组件-理论篇: Producer分区

前言

前面我们已经对Producer发送原理做了一个比较详细的说明,其中我们提到了分区器。其实从整体结构上来讲,分区器也是属于一个非常重要的知识点,所以我们来专门对分区以及分区策略等内容做一个介绍。

为什么需要分区

分区的作用

  • 合理的使用存储资源:把海量的数据按照分区切割成一小块的数据存储在多台Broker上。此时能够保证每台服务器存储资源能够被充分利用到。而且小块数据在寻址时间上更有优势~
  • 负载均衡: 数据生产或消费期间,生产者已分区的单位发送数据消费者分区的单位进行消费。 期间,各分区生产和消费数据互不影响,这样能够达到合理控制分区任务的程度,提高任务的并行度。从而达到负载均衡的效果。

刚才我们提到:生产者已分区为单位向Broker发送数据。那么问题来了:

  • 生产者是怎么知道该向哪个分区发送数据呢?

这就是我们接下来要研究的分区策略

分区策略

其实我们在上一篇文章中已经见到了,看这里:

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) 
    // 如果在消息中指定了分区
    if (record.partition() != null)
        return record.partition();

    if (partitioner != null) 
        // 分区器通过计算得到分区
        int customPartition = partitioner.partition(
            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        if (customPartition < 0) 
            throw new IllegalArgumentException(String.format(
                "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
        
        return customPartition;
    
  
    // 通过序列化key计算分区
    if (serializedKey != null && !partitionerIgnoreKeys) 
        // hash the keyBytes to choose a partition
        return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
     else 
        // 返回-1
        return RecordMetadata.UNKNOWN_PARTITION;
    

下面的代码可以说是整个分区器的核心部分,可以通过以下的步骤进行说明:

  • 如果在生产消息的时候,已经指定了需要发送的分区位置,那么就会直接使用已经指定的份具体的位置,这样子还节省了也不计算的时间
  • 如果在生产者配置Properties中指定了分区策略类,那么消息生产就会通过已经指定的分区策略类进行分区计算
  • 否则就会以serializedKey作为参数,通过hash取模的方式计算。如果serializedKey == null,那么就会采用粘性分区的逻辑。 这在Kafka中属于默认分区器。
  • 如果以上情况都没有包含,那么他就会直接返回-1。相当于ack=0的情况。

在Kafka中分区策略我们是可以自定义的。当然Kafka也为我们内置了三种分区策略类。 接下来我们挑个重点来介绍,来给我们自定义分区器做一个铺垫~

DefaultPartitioner

在当前版本中,如果没有对partitioner.class进行配置,此时的分区策略就会采用当前类作为默认分区策略类。

而以下是DefaultPartitioner策略类的核心实现方式,并且标记部分的代码实现其实就是UniformStickyPartitioner的计算逻辑

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) 
    if (keyBytes == null) 
        // 就是这段属于UniformStickyPartitioner的实现逻辑
        return stickyPartitionCache.partition(topic, cluster);
    
    return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);

还有一段代码让我们来一起看看

public static int partitionForKey(final byte[] serializedKey, final int numPartitions) 
    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;

这段代码不管有多复杂,调用方法有多少,但最终我们是能够发现:

  • 它的本质其实是在对序列化Key做哈希计算,然后通过hash值和分区数做取模运算,然后得到结果分区位置

这是一种比较重要的计算方式,但却不是唯一的方式

---这是分割线---

接下来继续,我们看看如果无法对序列化Key计算,会是怎么样的计算逻辑?

我们先开始来看一下,是在哪个地方得到的serializedKey,并且什么情况下serializedKey会是NULL

看看下面的这个代码眼熟不?

// 生产者生产消息对象
ProducerRecord<String, String> record = new ProducerRecord<>(
        "newTopic001",
        "data from " + KafkaQuickProducer.class.getName()
);

// KafkaProducer#doSend()
// line994
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
public class StringSerializer implements Serializer<String> 
    // 省略。。。
    @Override
    public byte[] serialize(String topic, String data) 
        if (data == null) 
            return null;
         else 
            return data.getBytes(encoding);
        
    

从上面的代码来看,基本上能够实锤了:

  • 当在生成ProducerRecord对象的时候,如果没有对消息设置key参数,此时序列化之后的key就是个null
  • 那么当序列化之后的Key为NULL之后,此时分区计算逻辑就会改变。

此时相当于我们已经进入到UniformStickyPartitioner的计算逻辑, 当然了在我们使用的3.3版本中当前类也已经被标注为过期

根据前面的说法,粘性分区主要解决了消息无Key的分区计算逻辑,那么粘性分区并不是说每次都使用同一个分区

它是通过一个大Batch为单位,尽量将batch内的消息固定在同一个分区内,这样在很大程度上能够保证:

  • 防止消息无规律的分散在不同的分区内,降低分区倾斜
  • 同时不需要每次进行分区计算,也降低了Producer的延迟

而当Batch内消息满足发送条件被发送出去之后,才会开始再次计算下一个分区,为此在KafkaProducer中还专门调用了新的方法

partitioner.onNewBatch(topic, cluster, prevPartition);
public void onNewBatch(String topic, Cluster cluster, int prevPartition) 
    stickyPartitionCache.nextPartition(topic, cluster, prevPartition);

RoundRobinPartitioner

这是在当前版本中唯一没有被标注的类,未来说不定会成为默认分区策略类,我们不看

以上是关于分布式流处理组件-优化篇:Producer生产调优之核心参数的主要内容,如果未能解决你的问题,请参考以下文章

夯实Kafka知识体系及基本功分析一下生产者(Producer)实现原理分析「原理篇」

分布式流处理组件-理论篇:Kafka与安装配置

分布式流处理组件-理论实战结合篇:Kafka架构理论

Kafka 详解------Producer生产者

Kafka 调优

2.kafka入门