分布式流处理组件-优化篇:Producer生产调优之核心参数
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式流处理组件-优化篇:Producer生产调优之核心参数相关的知识,希望对你有一定的参考价值。
前言
前面我们花了较长的时间对生产者Producer理论、Producer分区做了一个比较细致的介绍。详细大家在认真阅读完前两节的内容之后会对Kafka的生产者有一个比较清晰的认知。
其中我们需要重点掌握的内容是:Producer发送消息的过程,如果有不清楚的建议返回好好品味。
- 《分布式流处理组件-理论篇:Producer入门理论》
我们需要注意的是:
- 在生产环境由于物理机器等资源配置的影响,也为了更大程度上保证资源的利用率,我们都会对各个组件进行适配。
而Kafka的Producer也是一样的。在生产环境中也有很多需要注意的点。本章我们就来好好的聊一聊~
啰嗦两句Producer的消息发送原理
如上图所示,消息数据通过主线程调用producer.send()
将其发送出去,其中经过拦截器、序列化器、分区器的层层加工之后,记录缓冲区RecordAccumulator
会将加工之后的消息记录添加到其中。而消息也不是单纯的存在于RecordAccumulator
中,为了降低网络IO,Producer将其按照batch的形式进行存放。
而batch消息也会根据分区器计算得到的分区号存在于对应的Deque
双端队列中,所以他们的关系就是图中一层包一层的样子
当batch满足某个条件或者消息等待指定时间之后,sender线程被拉起,Sender程序将不断从缓冲区取出数据,进而进入到另外一个阶段
从缓冲区拉取出来的数据会被封装为Request
对象,并且与缓存区类似的是:
NetworkClient
中同样会存在一个类似缓冲区的存在:InFlightRequests
。其中会按照分区对Request进行存储。所以他们的逻辑关系其实是这样的
随后进行发送,而在默认情况下,如果Broker端一直没有响应,每个分区下的Request只能存放5个请求。而超出的情况将会阻塞发送逻辑。
消息发送成功后,将会清空原始数据。否则尝试重试等操作
Producer高吞吐
前置条件
来吧,啰嗦完Producer发送过程之后,就到了精彩的测试验证环节。有句话需要重点说明下:
- 以下验证结果不属银弹,实际生产中参数配置如何: 还是需要结合实际业务场景和资源配置给出最佳参数
机器配置:
- 3台 2C4G 的虚拟机
- Topic设置分区数为3,副本因子为2
测试脚本
本次Producer吞吐量测试脚本在Kafka中已经提供,我们在$KAFKA_HOME/kafka_2.13-3.3.1/bin
中可以找到kafka-producer-perf-test.sh
。如下对执行参数进行简单说明:
- --throughput: 限制测试发送消息的最大吞吐量,-1表示不受限制
- --num-records: 测试消息的数据量。
- --record-size: 每条消息的大小。
- --producer-props: Producer可配置参数信息,我们也可以通过properties文件的方式配置到
--producer.config
默认参数负载情况
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 --throughput -1 --num-records 100000 --record-size 1024
发送10W条消息大小为1KB的消息到newTopic_test001上,不设置吞吐限制
30421 records sent, 6083.0 records/sec (5.94 MB/sec), 2125.8 ms avg latency, 3695.0 ms max latency.
55005 records sent, 11001.0 records/sec (10.74 MB/sec), 3129.7 ms avg latency, 4864.0 ms max latency.
100000 records sent, 9042.408898 records/sec (8.83 MB/sec), 2719.28 ms avg latency, 4864.00 ms max latency, 2598 ms 50th, 4250 ms 95th, 4734 ms 99th, 4832 ms 99.9th.
输出表示:
- 成功消费100000记录,吞吐量:10101.010101条/s (9.86 MB/sec)
- 平均延迟2451.60ms,最大延迟4025.00ms
- 50%消息延迟2455ms, 95%消息延迟3581ms, 99%消息延迟3875ms, 99.9%消息延迟4016ms。
参数介绍
下面要介绍的这几个参数,在Producer端:任意一个都可以称为王牌的存在。接下来我们就一一来看看:
batch.size
每当发送多个消息时,为了提高客户端和服务器的性能,生产者将尝试对多个消息进行打包成批,保证这一批可以在同一个分区内。默认为16384【16KB】
为了尽可能的提高吞吐量,在实际生产中需要对发送的消息进行合理预估,根据实际情况选择一个合理的大小,避免出现如下情况:
- 单条消息超过
batch.size
,Producer有可能不会处理此消息 batch.size
过大,有可能会造成Producer端内存空间的浪费batch.size
过小,频繁的网络IO会降低Producer的吞吐
linger.ms
如果消息迟迟没有达到batch.size
,那么将尝试等待linger.ms
时间发送。默认等待时间为0,也就是当消息到达之后立即发送
这两个参数我们或多或少都介绍过,但是并没有测试调整它们对吞吐量的影响。接下来我们就先来测试一波吧!!!
测试batch.size
和linger.ms
对吞吐量的影响
对照以上默认测试结果,然后我们开始进行参数调整。
跟着我的节奏,先对单个参数进行调整,对比差异
- batch.size=32768
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=32768 --throughput -1 --num-records 100000 --record-size 1024
68697 records sent, 13733.9 records/sec (13.41 MB/sec), 900.0 ms avg latency, 1812.0 ms max latency.
100000 records sent, 14817.009927 records/sec (14.47 MB/sec), 1129.92 ms avg latency, 1957.00 ms max latency, 1167 ms 50th, 1884 ms 95th, 1932 ms 99th, 1951 ms 99.9th.
100000 records sent, 22311.468095 records/sec (21.79 MB/sec), 1036.42 ms avg latency, 1467.00 ms max latency, 1100 ms 50th, 1431 ms 95th, 1449 ms 99th, 1462 ms 99.9th.
100000 records sent, 21753.317381 records/sec (21.24 MB/sec), 1001.94 ms avg latency, 1646.00 ms max latency, 955 ms 50th, 1614 ms 95th, 1631 ms 99th, 1639 ms 99.9th.
对比默认结果,我们可以看到在吞吐量上已经有了非常明显的提高
- linger.ms=3000
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 --throughput -1 --num-records 100000 --record-size 1024
60526 records sent, 12102.8 records/sec (11.82 MB/sec), 1632.5 ms avg latency, 2337.0 ms max latency.
100000 records sent, 12280.486307 records/sec (11.99 MB/sec), 1882.64 ms avg latency, 2591.00 ms max latency, 2050 ms 50th, 2553 ms 95th, 2571 ms 99th, 2584 ms 99.9th.
64966 records sent, 12990.6 records/sec (12.69 MB/sec), 1556.1 ms avg latency, 2757.0 ms max latency.
100000 records sent, 13540.961408 records/sec (13.22 MB/sec), 1719.67 ms avg latency, 2757.00 ms max latency, 1474 ms 50th, 2701 ms 95th, 2730 ms 99th, 2744 ms 99.9th.
71776 records sent, 14355.2 records/sec (14.02 MB/sec), 1506.2 ms avg latency, 2176.0 ms max latency.
100000 records sent, 14320.492625 records/sec (13.98 MB/sec), 1577.49 ms avg latency, 2176.00 ms max latency, 1668 ms 50th, 2134 ms 95th, 2154 ms 99th, 2170 ms 99.9th.
同样还是要和默认测试结果进行对比,虽然吞吐量没有配置batch.size
的效果差异,但也不能说本次调整不重要
- 合并参数测试
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 linger.ms=3000 batch.size=32768 --throughput -1 --num-records 100000 --record-size 1024
100000 records sent, 21739.130435 records/sec (21.23 MB/sec), 1007.46 ms avg latency, 1428.00 ms max latency, 995 ms 50th, 1400 ms 95th, 1411 ms 99th, 1424 ms 99.9th.
100000 records sent, 23041.474654 records/sec (22.50 MB/sec), 952.25 ms avg latency, 1334.00 ms max latency, 919 ms 50th, 1302 ms 95th, 1318 ms 99th, 1328 ms 99.9th.
100000 records sent, 24271.844660 records/sec (23.70 MB/sec), 916.17 ms avg latency, 1318.00 ms max latency, 912 ms 50th, 1278 ms 95th, 1309 ms 99th, 1314 ms 99.9th.
已经可以看出对比了吧。 我们继续~~~
compression.type
该参数对Producer生产的数据进行压缩,主要针对批数据压缩。默认是none【无压缩】,可以用来设置的值:
- gzip
- snappy
- lz4
- zstd
接下来我们直接测试这几个压缩算法的性能吧,为了方便写个脚本
#!/bin/bash
for i in gzip snappy lz4 zstd
do
for ((j=0; j<3; j++))
do
echo "----$i方式的第$j次测试----"
kafka-producer-perf-test.sh --topic newTopic_test002 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 batch.size=65536 compression.type=$i --throughput -1 --num-records 100000 --record-size 1024
done
done
----gzip方式的第0次测试----
100000 records sent, 25425.883549 records/sec (24.83 MB/sec), 136.45 ms avg latency, 744.00 ms max latency, 97 ms 50th, 451 ms 95th, 677 ms 99th, 742 ms 99.9th.
----gzip方式的第1次测试----
100000 records sent, 27019.724399 records/sec (26.39 MB/sec), 54.22 ms avg latency, 291.00 ms max latency, 27 ms 50th, 174 ms 95th, 228 ms 99th, 288 ms 99.9th.
----gzip方式的第2次测试----
100000 records sent, 27940.765577 records/sec (27.29 MB/sec), 21.55 ms avg latency, 267.00 ms max latency, 11 ms 50th, 75 ms 95th, 97 ms 99th, 130 ms 99.9th.
===========分割线=============
----snappy方式的第0次测试----
100000 records sent, 34153.005464 records/sec (33.35 MB/sec), 518.28 ms avg latency, 1047.00 ms max latency, 479 ms 50th, 969 ms 95th, 1029 ms 99th, 1044 ms 99.9th.
----snappy方式的第1次测试----
100000 records sent, 32765.399738 records/sec (32.00 MB/sec), 474.20 ms avg latency, 985.00 ms max latency, 476 ms 50th, 921 ms 95th, 971 ms 99th, 984 ms 99.9th.
----snappy方式的第2次测试----
100000 records sent, 34578.146611 records/sec (33.77 MB/sec), 450.48 ms avg latency, 932.00 ms max latency, 442 ms 50th, 869 ms 95th, 911 ms 99th, 928 ms 99.9th.
===========分割线=============
----lz4方式的第0次测试----
100000 records sent, 34059.945504 records/sec (33.26 MB/sec), 474.75 ms avg latency, 871.00 ms max latency, 461 ms 50th, 818 ms 95th, 852 ms 99th, 867 ms 99.9th.
----lz4方式的第1次测试----
100000 records sent, 37174.721190 records/sec (36.30 MB/sec), 408.28 ms avg latency, 769.00 ms max latency, 418 ms 50th, 716 ms 95th, 753 ms 99th, 767 ms 99.9th.
----lz4方式的第2次测试----
100000 records sent, 32404.406999 records/sec (31.64 MB/sec), 483.38 ms avg latency, 1145.00 ms max latency, 372 ms 50th, 1077 ms 95th, 1122 ms 99th, 1142 ms 99.9th.
===========分割线=============
----zstd方式的第0次测试----
100000 records sent, 51975.051975 records/sec (50.76 MB/sec), 57.53 ms avg latency, 279.00 ms max latency, 54 ms 50th, 108 ms 95th, 141 ms 99th, 158 ms 99.9th.
----zstd方式的第1次测试----
100000 records sent, 47755.491882 records/sec (46.64 MB/sec), 66.12 ms avg latency, 333.00 ms max latency, 49 ms 50th, 208 ms 95th, 316 ms 99th, 332 ms 99.9th.
----zstd方式的第2次测试----
100000 records sent, 51203.277010 records/sec (50.00 MB/sec), 60.27 ms avg latency, 265.00 ms max latency, 49 ms 50th, 148 ms 95th, 172 ms 99th, 193 ms 99.9th.
===========分割线=============
根据本次测试,而且测试命令也运行了多次,确实是zstd
较好。 而我们再看看官网给出的说明:
acks
前面章节中其实我们介绍过acks
这个参数。
- "0": 当消息调用
send()
发送出去之后就表示消息已经发送成功,不管消息是否已经到达broker - "1": 消息发送后,Leader接收到消息并记录到本地之后,不需要同步数据到副本就能进行ack返回
- "all": 当消息在Leader接收记录,并且等待副本数据同步完成之后,才会返回ack。 该级别也属于
Java#Producer
的默认配置
基本就是默认配置,只不过多了一个acks
的配置项
#!/bin/bash
for i in 0 1 all
do
echo "---------------acks=$i------------------"
for ((j=0; j<3; j++))
do
kafka-producer-perf-test.sh --topic newTopic_test001 --producer-props bootstrap.servers=master:9092,node01:9092,node02:9092 acks=$i --throughput -1 --num-records 100000 --record-size 1024
done
done
--------------acks=0------------------
100000 records sent, 43878.894252 records/sec (42.85 MB/sec), 60.90 ms avg latency, 367.00 ms max latency, 20 ms 50th, 218 ms 95th, 297 ms 99th, 363 ms 99.9th.
100000 records sent, 38699.690402 records/sec (37.79 MB/sec), 121.56 ms avg latency, 823.00 ms max latency, 60 ms 50th, 469 ms 95th, 736 ms 99th, 814 ms 99.9th.
100000 records sent, 37650.602410 records/sec (36.77 MB/sec), 127.71 ms avg latency, 441.00 ms max latency, 100 ms 50th, 351 ms 95th, 403 ms 99th, 440 ms 99.9th.
---------------acks=1------------------
100000 records sent, 30284.675954 records/sec (29.57 MB/sec), 387.87 ms avg latency, 897.00 ms max latency, 340 ms 50th, 799 ms 95th, 855 ms 99th, 893 ms 99.9th.
100000 records sent, 36995.930448 records/sec (36.13 MB/sec), 247.35 ms avg latency, 826.00 ms max latency, 199 ms 50th, 703 ms 95th, 789 ms 99th, 813 ms 99.9th.
100000 records sent, 37425.149701 records/sec (36.55 MB/sec), 394.01 ms avg latency, 850.00 ms max latency, 389 ms 50th, 784 ms 95th, 814 ms 99th, 841 ms 99.9th.
---------------acks=all------------------
76362 records sent, 15272.4 records/sec (14.91 MB/sec), 1422.4 ms avg latency, 1980.0 ms max latency.
100000 records sent, 16084.928422 records/sec (15.71 MB/sec), 1463.71 ms avg latency, 1980.00 ms max latency, 1628 ms 50th, 1903 ms 95th, 1960 ms 99th, 1974 ms 99.9th.
81151 records sent, 16223.7 records/sec (15.84 MB/sec), 1404.0 ms avg latency, 2201.0 ms max latency.
100000 records sent, 16906.170752 records/sec (16.51 MB/sec), 1397.82 ms avg latency, 2201.00 ms max latency, 1491 ms 50th, 2001 ms 95th, 2136 ms 99th, 2193 ms 99.9th.
88156 records sent, 17631.2 records/sec (17.22 MB/sec), 1245.0 ms avg latency, 1688.0 ms max latency.
100000 records sent, 17969.451932 records/sec (17.55 MB/sec), 1255.73 ms avg latency, 1688.00 ms max latency, 1403 ms 50th, 1598 ms 95th, 1635 ms 99th, 1668 ms 99.9th.
所以:
- 如果是类似日志、行为等不重要的消息,建议将
acks
设置为0. - 其他的就根据消息的安全程度来进行合理的选择吧~
下期预告
本期针对Producer调优参数的介绍和测试对比到这里就已经结束了。还是一句话:
- 从数据看结果:实际生产中进行测试,选择合理的配置参数信息是必须得~
下一期针对数据可靠性我们来做一个详细的介绍。 期待~
分布式流式处理组件-理论篇: Producer分区
前言
前面我们已经对Producer发送原理做了一个比较详细的说明,其中我们提到了分区器。其实从整体结构上来讲,分区器也是属于一个非常重要的知识点,所以我们来专门对分区以及分区策略等内容做一个介绍。
为什么需要分区
分区的作用
- 合理的使用存储资源:把海量的数据按照分区切割成一小块的数据存储在多台Broker上。此时能够保证每台服务器存储资源能够被充分利用到。而且小块数据在寻址时间上更有优势~
- 负载均衡: 数据生产或消费期间,生产者已分区的单位发送数据,消费者分区的单位进行消费。 期间,各分区生产和消费数据互不影响,这样能够达到合理控制分区任务的程度,提高任务的并行度。从而达到负载均衡的效果。
刚才我们提到:生产者已分区为单位向Broker发送数据。那么问题来了:
- 生产者是怎么知道该向哪个分区发送数据呢?
这就是我们接下来要研究的分区策略。
分区策略
其实我们在上一篇文章中已经见到了,看这里:
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster)
// 如果在消息中指定了分区
if (record.partition() != null)
return record.partition();
if (partitioner != null)
// 分区器通过计算得到分区
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0)
throw new IllegalArgumentException(String.format(
"The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
return customPartition;
// 通过序列化key计算分区
if (serializedKey != null && !partitionerIgnoreKeys)
// hash the keyBytes to choose a partition
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
else
// 返回-1
return RecordMetadata.UNKNOWN_PARTITION;
下面的代码可以说是整个分区器的核心部分,可以通过以下的步骤进行说明:
- 如果在生产消息的时候,已经指定了需要发送的分区位置,那么就会直接使用已经指定的份具体的位置,这样子还节省了也不计算的时间
- 如果在生产者配置
Properties
中指定了分区策略类,那么消息生产就会通过已经指定的分区策略类进行分区计算 - 否则就会以
serializedKey
作为参数,通过hash取模的方式计算。如果serializedKey == null
,那么就会采用粘性分区的逻辑。 这在Kafka中属于默认分区器。 - 如果以上情况都没有包含,那么他就会直接返回-1。相当于
ack=0
的情况。
在Kafka中分区策略我们是可以自定义的。当然Kafka也为我们内置了三种分区策略类。 接下来我们挑个重点来介绍,来给我们自定义分区器做一个铺垫~
DefaultPartitioner
在当前版本中,如果没有对partitioner.class
进行配置,此时的分区策略就会采用当前类作为默认分区策略类。
而以下是DefaultPartitioner策略类的核心实现方式,并且标记部分的代码实现其实就是UniformStickyPartitioner
的计算逻辑
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions)
if (keyBytes == null)
// 就是这段属于UniformStickyPartitioner的实现逻辑
return stickyPartitionCache.partition(topic, cluster);
return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
还有一段代码让我们来一起看看
public static int partitionForKey(final byte[] serializedKey, final int numPartitions)
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
这段代码不管有多复杂,调用方法有多少,但最终我们是能够发现:
- 它的本质其实是在对
序列化Key
做哈希计算,然后通过hash值和分区数做取模运算,然后得到结果分区位置
这是一种比较重要的计算方式,但却不是唯一的方式
---这是分割线---
接下来继续,我们看看如果无法对序列化Key计算,会是怎么样的计算逻辑?
我们先开始来看一下,是在哪个地方得到的serializedKey
,并且什么情况下serializedKey
会是NULL
看看下面的这个代码眼熟不?
// 生产者生产消息对象
ProducerRecord<String, String> record = new ProducerRecord<>(
"newTopic001",
"data from " + KafkaQuickProducer.class.getName()
);
// KafkaProducer#doSend()
// line994
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
public class StringSerializer implements Serializer<String>
// 省略。。。
@Override
public byte[] serialize(String topic, String data)
if (data == null)
return null;
else
return data.getBytes(encoding);
从上面的代码来看,基本上能够实锤了:
- 当在生成
ProducerRecord
对象的时候,如果没有对消息设置key参数,此时序列化之后的key就是个null - 那么当序列化之后的Key为NULL之后,此时分区计算逻辑就会改变。
此时相当于我们已经进入到UniformStickyPartitioner
的计算逻辑, 当然了在我们使用的3.3版本中当前类也已经被标注为过期
根据前面的说法,粘性分区主要解决了消息无Key的分区计算逻辑,那么粘性分区并不是说每次都使用同一个分区
它是通过一个大Batch为单位,尽量将batch内的消息固定在同一个分区内,这样在很大程度上能够保证:
- 防止消息无规律的分散在不同的分区内,降低分区倾斜
- 同时不需要每次进行分区计算,也降低了Producer的延迟
而当Batch内消息满足发送条件被发送出去之后,才会开始再次计算下一个分区,为此在KafkaProducer
中还专门调用了新的方法
partitioner.onNewBatch(topic, cluster, prevPartition);
public void onNewBatch(String topic, Cluster cluster, int prevPartition)
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
RoundRobinPartitioner
这是在当前版本中唯一没有被标注的类,未来说不定会成为默认分区策略类,我们不看
以上是关于分布式流处理组件-优化篇:Producer生产调优之核心参数的主要内容,如果未能解决你的问题,请参考以下文章