Kafka Source Code Analysis: Partition Assignment When Creating a Topic

Posted by 顧棟


Kafka 1.1: Partition Assignment When Creating a Topic

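Partition assignment here means assigning partition replicas when a topic is created in the cluster, that is, deciding which brokers host the replicas of each of the topic's partitions.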

Replica assignment modes

Assignment without rack awareness

Code of the main method, assignReplicasToBrokersRackUnaware

private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // Pick a random start index in [0, number of brokers) unless a fixed one is given
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  // Shift between a partition's replicas, used to spread replicas more evenly across brokers
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // Iterate over all partitions and assign each partition's replicas to brokers
  for (_ <- 0 until nPartitions) {
    // When the partition id is greater than 0 and divisible by the number of brokers, increase the shift by 1
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    // Index of the first replica: (current partition id + startIndex) modulo the number of brokers
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    // With the first replica's broker fixed, use replicaIndex to pick the brokers for the remaining replicas
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    // Store this partition's replica list in ret, keyed by partition id
    ret.put(currentPartitionId, replicaBuffer)
    // Move on to the next partition
    currentPartitionId += 1
  }
  ret
}

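Parameters

  • nPartitions: number of partitions
  • replicationFactor: replication factor
  • brokerList: ids of the brokers in the cluster
  • fixedStartIndex: start index for the first replica; defaults to -1, and a negative value means a random start
  • startPartitionId: id of the first partition to assign; defaults to -1, which is clamped to 0

Result

  • ret: the resulting assignment, a map from partition id to the list of replica broker ids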

Replica index calculation

private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}

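Parameters

  • firstReplicaIndex: index of the first replica's broker in the broker list
  • secondReplicaShift: shift between replicas, counted in brokers
  • replicaIndex: index of the replica among the partition's remaining replicas (0 for the second replica)
  • nBrokers: number of brokers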

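During topic creation the upper-level method passes -1 for both fixedStartIndex and startPartitionId when it calls the assignment method, so the start index and the shift are chosen at random and partition ids start at 0.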

Return value

  • the index of the chosen broker in the broker list
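
To make the formula concrete, here is a small Java sketch that evaluates replicaIndex for a three-broker cluster. Because shift always falls in the range 1 to nBrokers - 1, the returned index never equals firstReplicaIndex, so a follower is not placed on the same broker as the first replica. The class name ReplicaIndexDemo and the chosen inputs are illustrative and not part of Kafka; the sketch assumes nBrokers is at least 2.

```java
class ReplicaIndexDemo {

    // Same arithmetic as the Scala replicaIndex method quoted above.
    // Assumes nBrokers >= 2; with a single broker the modulo would divide by zero.
    static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        // shift is always in [1, nBrokers - 1], so it can never be 0
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    public static void main(String[] args) {
        int nBrokers = 3;
        for (int first = 0; first < nBrokers; first++) {
            for (int shift = 0; shift < 4; shift++) {
                int follower = replicaIndex(first, shift, 0, nBrokers);
                System.out.println("firstReplicaIndex=" + first
                        + ", secondReplicaShift=" + shift
                        + " -> follower index " + follower);
            }
        }
    }
}
```

For example, replicaIndex(2, 2, 0, 3) gives shift = 1 + 2 % 2 = 1 and index (2 + 1) % 3 = 0, while replicaIndex(2, 3, 0, 3) gives shift = 2 and index 1.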

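Once all replicas of one partition are assigned, the method moves on to the next partition, until the replicas of every partition are assigned.

The for (_ <- 0 until nPartitions) {} loop assigns replicas starting from the first partition. Each partition's result is stored in the map ret, keyed by partition id, and ret is returned to the caller.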

Simulation

package com.donny.kafka.topic;

import java.util.*;

public class PartitionDistribution {

    public static void main(String[] args) {
        List<Integer> brokerList = new ArrayList<>();
        brokerList.add(1);
        brokerList.add(2);
        brokerList.add(3);
        printMap(assignReplicasToBrokersRackUnaware(6, 2, brokerList, -1, -1));
    }


    private static Map<Integer, List<Integer>> assignReplicasToBrokersRackUnaware(int partitions,
                                                                                  int replicationFactor,
                                                                                  List<Integer> brokerList,
                                                                                  int fixedStartIndex,
                                                                                  int startPartitionId) {
        Random random = new Random();
        Map<Integer, List<Integer>> ret = new HashMap<>();
        int brokerLength = brokerList.size();
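        // Honor fixedStartIndex as the Scala code does; a negative value means a random start
        int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : random.nextInt(brokerLength);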
        System.out.println("startIndex=" + startIndex);
        int currentPartitionId = Math.max(0, startPartitionId);
        int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : random.nextInt(brokerLength);
        for (int i = 0; i < partitions; i++) {
            List<Integer> list = new ArrayList<>();
            if (currentPartitionId > 0 && (currentPartitionId % brokerLength == 0)) {
                nextReplicaShift += 1;
            }
            System.out.println("currentPartitionId=" + currentPartitionId + ", nextReplicaShift=" + nextReplicaShift);
            int firstReplicaIndex = (currentPartitionId + startIndex) % brokerLength;
            list.add(brokerList.get(firstReplicaIndex));
            for (int j = 0; j < replicationFactor - 1; j++) {
                list.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size())));
            }
            ret.put(currentPartitionId, list);
            currentPartitionId += 1;
        }
        return ret;
    }

    private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    private static void printMap(Map<Integer, List<Integer>> map) {
        for (Integer key : map.keySet()) {
            System.out.println("partitionId=" + key +
                    ", partitionDistribution=" + Arrays.toString(map.get(key).toArray()));
        }
    }

}

Output of one run:

startIndex=2
currentPartitionId=0, nextReplicaShift=2
currentPartitionId=1, nextReplicaShift=2
currentPartitionId=2, nextReplicaShift=2
currentPartitionId=3, nextReplicaShift=3
currentPartitionId=4, nextReplicaShift=3
currentPartitionId=5, nextReplicaShift=3
partitionId=0, partitionDistribution=[3, 1]
partitionId=1, partitionDistribution=[1, 2]
partitionId=2, partitionDistribution=[2, 3]
partitionId=3, partitionDistribution=[3, 2]
partitionId=4, partitionDistribution=[1, 3]
partitionId=5, partitionDistribution=[2, 1]

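In this run the assignment is even: each broker is the first replica of two partitions and holds four replicas in total. The random start index and shift vary the starting broker between invocations, which helps keep the distribution even.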
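
The balance can be checked by counting. The Java sketch below hard-codes the assignment printed in the sample run and tallies, per broker, how many partitions have their first replica there and how many replicas it hosts overall. The class name BalanceCheck and its helper methods are illustrative, not part of Kafka.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BalanceCheck {

    // Assignment copied from the sample run: partition id -> replica broker ids.
    static Map<Integer, List<Integer>> sampleAssignment() {
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        assignment.put(0, Arrays.asList(3, 1));
        assignment.put(1, Arrays.asList(1, 2));
        assignment.put(2, Arrays.asList(2, 3));
        assignment.put(3, Arrays.asList(3, 2));
        assignment.put(4, Arrays.asList(1, 3));
        assignment.put(5, Arrays.asList(2, 1));
        return assignment;
    }

    // Number of partitions whose first replica sits on each broker.
    static Map<Integer, Integer> firstReplicaCounts(Map<Integer, List<Integer>> assignment) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (List<Integer> replicas : assignment.values()) {
            counts.merge(replicas.get(0), 1, Integer::sum);
        }
        return counts;
    }

    // Total number of replicas hosted by each broker.
    static Map<Integer, Integer> replicaCounts(Map<Integer, List<Integer>> assignment) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (List<Integer> replicas : assignment.values()) {
            for (Integer broker : replicas) {
                counts.merge(broker, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> assignment = sampleAssignment();
        System.out.println("first replicas per broker: " + firstReplicaCounts(assignment));
        System.out.println("replicas per broker: " + replicaCounts(assignment));
    }
}
```

For the sample run this reports two first replicas and four replicas in total on each of brokers 1, 2 and 3.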

Assignment with rack awareness

Code of the main method, assignReplicasToBrokersRackAware

private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                             replicationFactor: Int,
                                             brokerMetadatas: Seq[BrokerMetadata],
                                             fixedStartIndex: Int,
                                             startPartitionId: Int): Map[Int, Seq[Int]] = {
  val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
    id -> rack
  }.toMap
  // Number of distinct racks
  val numRacks = brokerRackMap.values.toSet.size
  // Broker list arranged by rack, so that consecutive brokers alternate between racks
  val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  val numBrokers = arrangedBrokerList.size
  val ret = mutable.Map[Int, Seq[Int]]()
  // Pick a random start index in [0, number of brokers) unless a fixed one is given
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  // Shift between a partition's replicas, used to spread replicas more evenly across brokers
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_ <- 0 until nPartitions) {
    // When the partition id is greater than 0 and divisible by the number of brokers, increase the shift by 1
    if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    // Broker id of the partition's first replica
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    // Track the racks and brokers that already hold a replica of this partition
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    // Assign the remaining replicas of this partition
    for (_ <- 0 until replicationFactor - 1) {
      var done = false
      while (!done) {
        val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
        val rack = brokerRackMap(broker)
        // Skip this broker if
        // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
        //    that do not have any replica, or
        // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
        if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
          replicaBuffer += broker
          racksWithReplicas += rack
          brokersWithReplicas += broker
          done = true
        }
        k += 1
      }
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

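Parameters

  • nPartitions: number of partitions
  • replicationFactor: replication factor
  • brokerMetadatas: metadata (id and rack) of the brokers in the cluster
  • fixedStartIndex: start index for the first replica; defaults to -1
  • startPartitionId: id of the first partition to assign; defaults to -1

Return value

  • ret: the resulting assignment, a map from partition id to the list of replica broker ids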
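
The method above relies on getRackAlternatedBrokerList, whose body is not shown in this article. The Java sketch below is an assumption about its behavior, based on the method name and the comment in the code: group brokers by rack, then take one broker from each rack in turn so that neighbouring entries sit on different racks. The class name RackAlternatedList, the sorting of racks and broker ids, and the sample rack layout are illustrative choices.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

class RackAlternatedList {

    // Hypothetical re-implementation: interleave brokers rack by rack.
    static List<Integer> rackAlternatedBrokerList(Map<Integer, String> brokerRackMap) {
        // Group broker ids by rack; sorted containers keep the result deterministic (an assumption).
        Map<String, TreeSet<Integer>> brokersByRack = new TreeMap<>();
        for (Map.Entry<Integer, String> entry : brokerRackMap.entrySet()) {
            brokersByRack.computeIfAbsent(entry.getValue(), rack -> new TreeSet<>()).add(entry.getKey());
        }
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (TreeSet<Integer> brokers : brokersByRack.values()) {
            iterators.add(brokers.iterator());
        }
        // Visit racks round-robin, taking one broker per visit and skipping exhausted racks.
        List<Integer> result = new ArrayList<>();
        int rackIndex = 0;
        while (result.size() < brokerRackMap.size()) {
            Iterator<Integer> it = iterators.get(rackIndex);
            if (it.hasNext()) {
                result.add(it.next());
            }
            rackIndex = (rackIndex + 1) % iterators.size();
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> brokerRackMap = new TreeMap<>();
        for (int id = 0; id < 9; id++) {
            brokerRackMap.put(id, "rack" + (id / 3 + 1)); // 0-2 on rack1, 3-5 on rack2, 6-8 on rack3
        }
        System.out.println(rackAlternatedBrokerList(brokerRackMap));
    }
}
```

With three brokers on each of three racks, this arrangement yields 0, 3, 6, 1, 4, 7, 2, 5, 8, so stepping through the list by one position always moves to a different rack.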

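On top of the logic used by the rack-unaware assignment, a candidate broker computed by replicaIndex receives a replica only in the following cases:

  1. If every rack and every broker already holds a replica of this partition, the candidate is accepted.

  2. If the candidate's rack holds no replica of this partition and the candidate itself holds none, the candidate is accepted.

  3. If the candidate's rack holds no replica of this partition and every broker already holds one, the candidate is accepted.

  4. If every rack already holds a replica of this partition and the candidate itself holds none, the candidate is accepted.

In all other cases the candidate is skipped and the next one is tried.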
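
The acceptance test inside the while loop can be isolated into a single predicate. The Java sketch below mirrors the boolean expression from the Scala code and walks through a two-rack, four-broker case; the class name RackAwareCheck, the method name canAssign and the rack labels are illustrative.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class RackAwareCheck {

    // Mirrors the condition in assignReplicasToBrokersRackAware:
    // accept when (rack is new OR all racks are covered) AND (broker is new OR all brokers are covered).
    static boolean canAssign(String rack, int broker,
                             Set<String> racksWithReplicas, Set<Integer> brokersWithReplicas,
                             int numRacks, int numBrokers) {
        boolean rackOk = !racksWithReplicas.contains(rack) || racksWithReplicas.size() == numRacks;
        boolean brokerOk = !brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers;
        return rackOk && brokerOk;
    }

    public static void main(String[] args) {
        // Illustrative layout: brokers 0 and 1 on r1, brokers 2 and 3 on r2; first replica on broker 0.
        Set<String> racks = new HashSet<>(Arrays.asList("r1"));
        Set<Integer> brokers = new HashSet<>(Arrays.asList(0));

        // Broker 1 shares rack r1 with the first replica while r2 is still empty: skipped.
        System.out.println("broker 1 on r1: " + canAssign("r1", 1, racks, brokers, 2, 4));
        // Broker 2 is on the empty rack r2: accepted.
        System.out.println("broker 2 on r2: " + canAssign("r2", 2, racks, brokers, 2, 4));

        racks.add("r2");
        brokers.add(2);
        // Both racks are now covered, so broker 1 becomes acceptable for a third replica.
        System.out.println("broker 1 on r1 after r2 is covered: " + canAssign("r1", 1, racks, brokers, 2, 4));
        // Broker 0 already holds a replica and other brokers are still free: skipped.
        System.out.println("broker 0 on r1 again: " + canAssign("r1", 0, racks, brokers, 2, 4));
    }
}
```

The four calls evaluate to false, true, true and false, which shows how the loop first spreads replicas over racks and only then reuses a rack.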
