Kafka的分区规则(轮询分区黏性分区)/ 生产者实现生产数据的负载均衡

Posted 在上树的路上

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka的分区规则(轮询分区黏性分区)/ 生产者实现生产数据的负载均衡相关的知识,希望对你有一定的参考价值。

1.分区参数partition说明

在说Kafka分区规则前,先看一下partition的计算方法以确定数据的分区。从而使topic加上分区编号构建分区对象,将数据写入该分区中。代码如下。

int partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

record是生产者数据对象,它的全参构建方法代码如下,通过参数Integer partition可以指定数据的传入分区。

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) 
        this(topic, partition, timestamp, key, value, (Iterable)null);
    

下面是对于传入参数的处理。根据传入的ProducerRecord对象,判断是否有指定数据传入分区,如果没有指定,则默认使用KafkaProducer分区规则;如果指定了,则直接返回指定分区。

 private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) 
        Integer partition = record.partition();
        return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    

2.默认分区规则

  • 如果指定了key:按照key的hash/mur取余分区的个数,来写入对应分区
  • 如果没有指定key:按照黏性分区写入
public class DefaultPartitioner implements Partitioner 
    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public DefaultPartitioner() 
    

    public void configure(Map<String, ?> configs) 
    
	//*客官~~~来~~看这边看这边看这边*
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) 
		//首先判断是否有传入key
        if (keyBytes == null) 
        	//如果没有,则返回**黏性分区**
            return this.stickyPartitionCache.partition(topic, cluster);
         else 
        	//获取这个topic的所有分区对象
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            //获取分区个数
            int numPartitions = partitions.size();
            //使用key的mur值取余分区个数(和哈希取余的方法类似)得到当前key对应value的写入分区
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        
    

    public void close() 
    

    public void onNewBatch(String topic, Cluster cluster, int prevPartition) 
        this.stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    


3.轮询分区

在2.4版本之前,没有黏性分区,默认的是轮询分区。
它的原理很简单:就是轮着来给分区。

Topicpartkeyvalue
topic101value1
topic112value2
topic123value3
topic104value4
topic115value5
topic126value6

优点:数据分配均衡
缺点:性能较差

性能差的原因:

  • Kafka生产者写入数据过程
  • step1:先将数据放入一个批次中,判断是否达到条件,达到条件才将整个批次的数据写入kafka
    • 批次满了【batch.size】
    • 达到一定时间【linger.ms】
  • step2:根据数据属于哪个分区,就与分区构建一个连接,发送这个分区的批次的数据
    • 第一条数据:先构建0分区的连接,第二条不是0分区的,所以直接构建一个批次,发送第—条
    • 第二条数据:先构建1分区的连接,第三条不是1分区的,所以直接构建一个批次,发送第二条
    • 每条数据需要构建一个批次,9条数据,9个批次,每个批次只有—条数据,每个批次又得构建连接,性能很差
  • 总结:轮询分区会将一条数据构成一个批次,每个批次都需要构建一个连接,性能差
  • 希望:批次少,每个批次数据量多,性能比较好

源码:

public class RoundRobinPartitioner implements Partitioner 
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() 
    

    public void configure(Map<String, ?> configs) 
    

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) 
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
         else 
            return Utils.toPositive(nextValue) % numPartitions;
        
    

    private int nextValue(String topic) 
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> 
            return new AtomicInteger(0);
        );
        return counter.getAndIncrement();
    

    public void close() 
    

4.黏性分区

2.4版本后,默认分区改为黏性分区

  • 设计:实现少批次多数据
  • 规则:判断缓存中是否有这个topic的分区连接,如果有,直接使用,如果没有随机写入一个分区,并且放入缓存
  • 优点:性能好,整体数据的分配相对均衡
  • 缺点:分配没有绝对均衡
  • 思想总结:随机选择分区
  • 将一个批次的数据放入缓存,随机与一个分区构建连接,将数据全部传入分区,清空缓存。然后再将一个批次的数据放入缓存,随机·······

数据写入过程:

  • 第一次:将所有数据随机选择一个分区,全部写入这个分区中,将这次的分区编号放入缓存中
Topicpartkeyvalue
topic111value1
topic112value2
topic113value3
topic114value4
topic115value5
topic116value6
  • 第二次开始根据缓存中是否有上一次的编号
    • 有:直接使用上一次的编号
    • 没有:重新选择一个

源码:

public class StickyPartitionCache 
    private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap();

    public StickyPartitionCache() 
    

    public int partition(String topic, Cluster cluster) 
        Integer part = (Integer)this.indexCache.get(topic);
        return part == null ? this.nextPartition(topic, cluster, -1) : part;
    

    public int nextPartition(String topic, Cluster cluster, int prevPartition) 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = (Integer)this.indexCache.get(topic);
        Integer newPart = oldPart;
        if (oldPart != null && oldPart != prevPartition) 
            return (Integer)this.indexCache.get(topic);
         else 
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            Integer random;
            if (availablePartitions.size() < 1) 
            	//这边-------------这边
            	//核心思想:随机选择一个分区
                random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
             else if (availablePartitions.size() == 1) 
                newPart = ((PartitionInfo)availablePartitions.get(0)).partition();
             else 
                while(newPart == null || newPart.equals(oldPart)) 
                    random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = ((PartitionInfo)availablePartitions.get(random % availablePartitions.size())).partition();
                
            

            if (oldPart == null) 
                this.indexCache.putIfAbsent(topic, newPart);
             else 
                this.indexCache.replace(topic, prevPartition, newPart);
            

            return (Integer)this.indexCache.get(topic);
        
    

5.总结

Kafka中生产数据的分区规则是什么?
Kafka生产者怎么实现生产数据的负载均衡?
为什么生产数据的方式不同,分区规则就不一样?

  1. 先判断是否指定了分区
  2. 如果指定了,就写入指定的分区
  3. 再判断是否指定了Key
  4. 如果指定了Key,按照Key的mur取余分区个数来决定
  5. 如果没有指定Key,按照黏性分区

以上是关于Kafka的分区规则(轮询分区黏性分区)/ 生产者实现生产数据的负载均衡的主要内容,如果未能解决你的问题,请参考以下文章

Kafka生产者

kafka分区

kafka自定义分区规则

kafka-2-生产者分区机制/消息压缩/无丢失配置介绍

Kafka-分区分配策略

小记--------kafka生产者分区原则及故障处理机制