Kafka的分区规则(轮询分区黏性分区)/ 生产者实现生产数据的负载均衡
Posted 在上树的路上
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka的分区规则(轮询分区黏性分区)/ 生产者实现生产数据的负载均衡相关的知识,希望对你有一定的参考价值。
1.分区参数partition说明
在说Kafka分区规则前,先看一下partition的计算方法以确定数据的分区。从而使topic加上分区编号构建分区对象,将数据写入该分区中。代码如下。
int partition = this.partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
record是生产者数据对象,它的全参构建方法代码如下,通过参数Integer partition可以指定数据的传入分区。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
this(topic, partition, timestamp, key, value, (Iterable)null);
下面是对于传入参数的处理。根据传入的ProducerRecord对象,判断是否有指定数据传入分区,如果没有指定,则默认使用KafkaProducer分区规则;如果指定了,则直接返回指定分区。
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster)
Integer partition = record.partition();
return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
2.默认分区规则
- 如果指定了key:按照key的hash/mur取余分区的个数,来写入对应分区
- 如果没有指定key:按照黏性分区写入
public class DefaultPartitioner implements Partitioner
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public DefaultPartitioner()
public void configure(Map<String, ?> configs)
//*客官~~~来~~看这边看这边看这边*
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
//首先判断是否有传入key
if (keyBytes == null)
//如果没有,则返回**黏性分区**
return this.stickyPartitionCache.partition(topic, cluster);
else
//获取这个topic的所有分区对象
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//获取分区个数
int numPartitions = partitions.size();
//使用key的mur值取余分区个数(和哈希取余的方法类似)得到当前key对应value的写入分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
public void close()
public void onNewBatch(String topic, Cluster cluster, int prevPartition)
this.stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
3.轮询分区
在2.4版本之前,没有黏性分区,默认的是轮询分区。
它的原理很简单:就是轮着来给分区。
Topic | part | key | value |
---|---|---|---|
topic1 | 0 | 1 | value1 |
topic1 | 1 | 2 | value2 |
topic1 | 2 | 3 | value3 |
topic1 | 0 | 4 | value4 |
topic1 | 1 | 5 | value5 |
topic1 | 2 | 6 | value6 |
优点:数据分配均衡
缺点:性能较差
性能差的原因:
- Kafka生产者写入数据过程
- step1:先将数据放入一个批次中,判断是否达到条件,达到条件才将整个批次的数据写入kafka
- 批次满了【batch.size】
- 达到一定时间【linger.ms】
- step2:根据数据属于哪个分区,就与分区构建一个连接,发送这个分区的批次的数据
- 第一条数据:先构建0分区的连接,第二条不是0分区的,所以直接构建一个批次,发送第—条
- 第二条数据:先构建1分区的连接,第三条不是1分区的,所以直接构建一个批次,发送第二条
- …
- 每条数据需要构建一个批次,9条数据,9个批次,每个批次只有—条数据,每个批次又得构建连接,性能很差
- 总结:轮询分区会将一条数据构成一个批次,每个批次都需要构建一个连接,性能差
- 希望:批次少,每个批次数据量多,性能比较好
源码:
public class RoundRobinPartitioner implements Partitioner
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
public RoundRobinPartitioner()
public void configure(Map<String, ?> configs)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty())
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
else
return Utils.toPositive(nextValue) % numPartitions;
private int nextValue(String topic)
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) ->
return new AtomicInteger(0);
);
return counter.getAndIncrement();
public void close()
4.黏性分区
2.4版本后,默认分区改为黏性分区
- 设计:实现少批次多数据
- 规则:判断缓存中是否有这个topic的分区连接,如果有,直接使用,如果没有随机写入一个分区,并且放入缓存
- 优点:性能好,整体数据的分配相对均衡
- 缺点:分配没有绝对均衡
- 思想总结:随机选择分区
- 将一个批次的数据放入缓存,随机与一个分区构建连接,将数据全部传入分区,清空缓存。然后再将一个批次的数据放入缓存,随机·······
数据写入过程:
- 第一次:将所有数据随机选择一个分区,全部写入这个分区中,将这次的分区编号放入缓存中
Topic | part | key | value |
---|---|---|---|
topic1 | 1 | 1 | value1 |
topic1 | 1 | 2 | value2 |
topic1 | 1 | 3 | value3 |
topic1 | 1 | 4 | value4 |
topic1 | 1 | 5 | value5 |
topic1 | 1 | 6 | value6 |
- 第二次开始根据缓存中是否有上一次的编号
- 有:直接使用上一次的编号
- 没有:重新选择一个
源码:
public class StickyPartitionCache
private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap();
public StickyPartitionCache()
public int partition(String topic, Cluster cluster)
Integer part = (Integer)this.indexCache.get(topic);
return part == null ? this.nextPartition(topic, cluster, -1) : part;
public int nextPartition(String topic, Cluster cluster, int prevPartition)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = (Integer)this.indexCache.get(topic);
Integer newPart = oldPart;
if (oldPart != null && oldPart != prevPartition)
return (Integer)this.indexCache.get(topic);
else
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
Integer random;
if (availablePartitions.size() < 1)
//这边-------------这边
//核心思想:随机选择一个分区
random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
else if (availablePartitions.size() == 1)
newPart = ((PartitionInfo)availablePartitions.get(0)).partition();
else
while(newPart == null || newPart.equals(oldPart))
random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = ((PartitionInfo)availablePartitions.get(random % availablePartitions.size())).partition();
if (oldPart == null)
this.indexCache.putIfAbsent(topic, newPart);
else
this.indexCache.replace(topic, prevPartition, newPart);
return (Integer)this.indexCache.get(topic);
5.总结
Kafka中生产数据的分区规则是什么?
Kafka生产者怎么实现生产数据的负载均衡?
为什么生产数据的方式不同,分区规则就不一样?
- 先判断是否指定了分区
- 如果指定了,就写入指定的分区
- 再判断是否指定了Key
- 如果指定了Key,按照Key的mur取余分区个数来决定
- 如果没有指定Key,按照黏性分区
以上是关于Kafka的分区规则(轮询分区黏性分区)/ 生产者实现生产数据的负载均衡的主要内容,如果未能解决你的问题,请参考以下文章