Kafka source code analysis: partition assignment when creating a Topic
Posted 顧棟
Preface: this article was compiled by the editors of 小常识网 (cha138.com). It covers how Kafka assigns partitions when a Topic is created.
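Kafka 1.1: partition assignment when creating a Topic
Partition assignment here means placing the replicas of each partition when a Topic is created in the cluster, i.e. deciding which brokers host the replicas of each partition.
Replica assignment strategies
Rack-unaware assignment
Main method: assignReplicasToBrokersRackUnaware
Code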
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // Start index for the first replica: fixedStartIndex if given, otherwise a random number in [0, number of brokers)
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  // Shift (in brokers) between the first replica and the followers, used to spread replicas more evenly across brokers
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // Walk through all partitions and assign each partition's replicas to brokers
  for (_ <- 0 until nPartitions) {
    // When the partition id is > 0 and a multiple of the number of brokers, increase the shift by 1
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    // Index of the first replica: (current partition id + startIndex) modulo the number of brokers
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    // With the first replica's broker fixed, replicaIndex picks the brokers for the remaining replicas
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    // Store this partition's replicas in ret, keyed by partition id
    ret.put(currentPartitionId, replicaBuffer)
    // Move on to the next partition
    currentPartitionId += 1
  }
  ret
}
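Parameters
- nPartitions: number of partitions
- replicationFactor: replication factor
- brokerList: the brokers in the cluster
- fixedStartIndex: fixed start index for the first replica; default -1 (choose randomly)
- startPartitionId: id of the first partition to assign; default -1 (start from partition 0)
Returns
- ret: the assignment result, a map from partition id to the list of broker ids that host its replicas (the first entry is the preferred leader)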
Computing a replica's broker index
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
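Parameters
- firstReplicaIndex: index of the first replica's broker in the broker array
- secondReplicaShift: shift (in brokers) between the first replica and the followers
- replicaIndex: index of the follower being placed (0 for the second replica, 1 for the third, and so on)
- nBrokers: number of brokers
Note: when assignReplicasToBrokersRackUnaware is called from the higher-level method during topic creation, fixedStartIndex and startPartitionId are both passed as -1.
Returns
- the index of the chosen broker in the broker array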
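One property of replicaIndex is worth checking. `(secondReplicaShift + replicaIndex) % (nBrokers - 1)` lies in [0, nBrokers - 2], so the shift lies in [1, nBrokers - 1]. A follower therefore never lands on the first replica's broker. When replicationFactor <= nBrokers, the followers of one partition also land on distinct brokers. The Java sketch below copies the formula and checks this exhaustively for small clusters. ReplicaIndexDemo and replicasDistinct are illustrative names, not Kafka code.

```java
import java.util.HashSet;
import java.util.Set;

class ReplicaIndexDemo {
    // Same formula as Kafka's replicaIndex: shift is always in [1, nBrokers - 1]
    static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    // Hypothetical helper: true if all replicas of one partition get distinct broker indexes
    static boolean replicasDistinct(int firstReplicaIndex, int shift, int replicationFactor, int nBrokers) {
        Set<Integer> seen = new HashSet<>();
        seen.add(firstReplicaIndex);
        for (int j = 0; j < replicationFactor - 1; j++) {
            if (!seen.add(replicaIndex(firstReplicaIndex, shift, j, nBrokers))) {
                return false; // two replicas on the same broker
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Exhaustive check: 2..6 brokers, every first index, shifts 0..9, replicationFactor = nBrokers
        for (int n = 2; n <= 6; n++) {
            for (int first = 0; first < n; first++) {
                for (int shift = 0; shift < 10; shift++) {
                    if (!replicasDistinct(first, shift, n, n)) {
                        throw new AssertionError("collision for n=" + n);
                    }
                }
            }
        }
        System.out.println("no collisions");
    }
}
```

The check holds because, for j = 0..nBrokers-2, the values secondReplicaShift + j are nBrokers - 1 consecutive integers, so their residues modulo nBrokers - 1 are all different.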
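After all replicas of one partition are assigned, the method moves on to the next partition, until the replicas of every partition have been assigned.
Inside the for (_ <- 0 until nPartitions) {} loop, assignment starts from the first partition. Each partition's result is stored in a map keyed by partition id, and the map is returned to the calling method.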
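The round-robin pattern is easier to see without the random numbers. Set fixedStartIndex to 0: according to the Scala code, startIndex and nextReplicaShift then both become 0. The Java port below works under that assumption and also assumes startPartitionId is 0. FixedStartAssignment and assign are hypothetical names. With 5 brokers, 10 partitions and replication factor 3, partitions 0 to 4 place their followers at shifts 1 and 2. Partitions 5 to 9 come after nextReplicaShift has been incremented once, so they use shifts 2 and 3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FixedStartAssignment {
    // Same formula as Kafka's replicaIndex
    static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    // Rack-unaware assignment, assuming fixedStartIndex >= 0 (no randomness) and partitions starting at 0
    static Map<Integer, List<Integer>> assign(int nPartitions, int replicationFactor,
                                              List<Integer> brokers, int fixedStartIndex) {
        Map<Integer, List<Integer>> ret = new LinkedHashMap<>();
        int n = brokers.size();
        int startIndex = fixedStartIndex;       // would be random if fixedStartIndex were -1
        int nextReplicaShift = fixedStartIndex; // likewise
        for (int p = 0; p < nPartitions; p++) {
            // After each full round over the brokers, widen the gap between replicas
            if (p > 0 && p % n == 0) {
                nextReplicaShift++;
            }
            int first = (p + startIndex) % n;
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers.get(first));
            for (int j = 0; j < replicationFactor - 1; j++) {
                replicas.add(brokers.get(replicaIndex(first, nextReplicaShift, j, n)));
            }
            ret.put(p, replicas);
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> a = assign(10, 3, Arrays.asList(0, 1, 2, 3, 4), 0);
        a.forEach((p, r) -> System.out.println("p" + p + " -> " + r));
    }
}
```

For example, p0 -> [0, 1, 2] while p5 -> [0, 2, 3]. Both partitions lead on broker 0, but the extra shift keeps partition 5's followers from repeating partition 0's layout.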
Simulation in Java
package com.donny.kafka.topic;

import java.util.*;

public class PartitionDistribution {
    public static void main(String[] args) {
        List<Integer> brokerList = new ArrayList<>();
        brokerList.add(1);
        brokerList.add(2);
        brokerList.add(3);
        // 6 partitions, replication factor 2; -1 means "choose randomly", as when creating a topic
        printMap(assignReplicasToBrokersRackUnaware(6, 2, brokerList, -1, -1));
    }

    private static Map<Integer, List<Integer>> assignReplicasToBrokersRackUnaware(int partitions,
                                                                                 int replicationFactor,
                                                                                 List<Integer> brokerList,
                                                                                 int fixedStartIndex,
                                                                                 int startPartitionId) {
        Random random = new Random();
        // TreeMap keeps partitions in id order when printing
        Map<Integer, List<Integer>> ret = new TreeMap<>();
        int brokerLength = brokerList.size();
        // Same rule as the Scala code: use fixedStartIndex if it is >= 0, otherwise a random value
        int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : random.nextInt(brokerLength);
        System.out.println("startIndex=" + startIndex);
        int currentPartitionId = Math.max(0, startPartitionId);
        int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : random.nextInt(brokerLength);
        for (int i = 0; i < partitions; i++) {
            List<Integer> list = new ArrayList<>();
            // After each full round over the brokers, increase the shift between replicas
            if (currentPartitionId > 0 && (currentPartitionId % brokerLength == 0)) {
                nextReplicaShift += 1;
            }
            System.out.println("currentPartitionId=" + currentPartitionId + ", nextReplicaShift=" + nextReplicaShift);
            int firstReplicaIndex = (currentPartitionId + startIndex) % brokerLength;
            list.add(brokerList.get(firstReplicaIndex));
            for (int j = 0; j < replicationFactor - 1; j++) {
                list.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerLength)));
            }
            ret.put(currentPartitionId, list);
            currentPartitionId += 1;
        }
        return ret;
    }

    private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    private static void printMap(Map<Integer, List<Integer>> map) {
        for (Map.Entry<Integer, List<Integer>> entry : map.entrySet()) {
            System.out.println("partitionId=" + entry.getKey() +
                    ", partitionDistribution=" + entry.getValue());
        }
    }
}
Output of one run (startIndex and nextReplicaShift are random, so other runs differ):
startIndex=2
currentPartitionId=0, nextReplicaShift=2
currentPartitionId=1, nextReplicaShift=2
currentPartitionId=2, nextReplicaShift=2
currentPartitionId=3, nextReplicaShift=3
currentPartitionId=4, nextReplicaShift=3
currentPartitionId=5, nextReplicaShift=3
partitionId=0, partitionDistribution=[3, 1]
partitionId=1, partitionDistribution=[1, 2]
partitionId=2, partitionDistribution=[2, 3]
partitionId=3, partitionDistribution=[3, 2]
partitionId=4, partitionDistribution=[1, 3]
partitionId=5, partitionDistribution=[2, 1]
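The result shows a fairly even spread. Each of the three brokers is the first replica (preferred leader) of two partitions and holds four replicas in total. The random startIndex and nextReplicaShift serve the same goal: they keep different topics from all starting on the same broker with the same shift.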
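Counting leaders (the first replica of each partition) and total replicas per broker verifies "fairly even" more reliably than eyeballing the output. The sketch below does that in Java for the sample run above. BalanceCheck and its methods are illustrative names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BalanceCheck {
    // Count how many partitions each broker leads (first replica = preferred leader)
    static Map<Integer, Integer> leaderCounts(List<List<Integer>> assignment) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (List<Integer> replicas : assignment) {
            counts.merge(replicas.get(0), 1, Integer::sum);
        }
        return counts;
    }

    // Count how many replicas (leader or follower) each broker hosts
    static Map<Integer, Integer> replicaCounts(List<List<Integer>> assignment) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (List<Integer> replicas : assignment) {
            for (int broker : replicas) {
                counts.merge(broker, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The sample run above, partitions 0..5 in order
        List<List<Integer>> run = Arrays.asList(
                Arrays.asList(3, 1), Arrays.asList(1, 2), Arrays.asList(2, 3),
                Arrays.asList(3, 2), Arrays.asList(1, 3), Arrays.asList(2, 1));
        System.out.println("leaders:  " + leaderCounts(run));  // {1=2, 2=2, 3=2}
        System.out.println("replicas: " + replicaCounts(run)); // {1=4, 2=4, 3=4}
    }
}
```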
Rack-aware assignment
Main method: assignReplicasToBrokersRackAware
Code
private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                             replicationFactor: Int,
                                             brokerMetadatas: Seq[BrokerMetadata],
                                             fixedStartIndex: Int,
                                             startPartitionId: Int): Map[Int, Seq[Int]] = {
  val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
    id -> rack
  }.toMap
  // Number of racks
  val numRacks = brokerRackMap.values.toSet.size
  // Broker list ordered so that consecutive brokers alternate between racks
  val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  val numBrokers = arrangedBrokerList.size
  val ret = mutable.Map[Int, Seq[Int]]()
  // Start index for the first replica: fixedStartIndex if given, otherwise a random number in [0, number of brokers)
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  // Shift (in brokers) between the first replica and the followers, used to spread replicas more evenly across brokers
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_ <- 0 until nPartitions) {
    // When the partition id is > 0 and a multiple of the number of brokers, increase the shift by 1
    if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    // Broker id of the partition's first replica
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    // Racks and brokers that already hold a replica of this partition
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    // Assign the remaining replicas of the partition
    for (_ <- 0 until replicationFactor - 1) {
      var done = false
      while (!done) {
        val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
        val rack = brokerRackMap(broker)
        // Skip this broker if
        // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
        //    that do not have any replica, or
        // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
        if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
          replicaBuffer += broker
          racksWithReplicas += rack
          brokersWithReplicas += broker
          done = true
        }
        k += 1
      }
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
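Parameters
- nPartitions: number of partitions
- replicationFactor: replication factor
- brokerMetadatas: metadata (id and rack) of the brokers in the cluster
- fixedStartIndex: fixed start index for the first replica; default -1 (choose randomly)
- startPartitionId: id of the first partition to assign; default -1 (start from partition 0)
Returns
- ret: the assignment result, a map from partition id to the list of broker ids that host its replicas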
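The article does not show getRackAlternatedBrokerList. As we read Kafka 1.1, it groups brokers by rack and then takes one broker from each rack in turn, so that neighbouring positions in arrangedBrokerList belong to different racks. The Java sketch below reproduces that behaviour. It assumes that racks and the brokers inside each rack are visited in sorted order, and it is not the Kafka source. RackAlternation is an illustrative name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

class RackAlternation {
    // Assumed behaviour: group brokers by rack (racks and brokers sorted),
    // then take one broker from each rack in turn until every broker is used
    static List<Integer> rackAlternatedBrokerList(Map<Integer, String> brokerRackMap) {
        Map<String, TreeSet<Integer>> brokersByRack = new TreeMap<>();
        for (Map.Entry<Integer, String> e : brokerRackMap.entrySet()) {
            brokersByRack.computeIfAbsent(e.getValue(), r -> new TreeSet<>()).add(e.getKey());
        }
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (TreeSet<Integer> brokers : brokersByRack.values()) {
            iterators.add(brokers.iterator());
        }
        List<Integer> result = new ArrayList<>();
        int rackIndex = 0;
        while (result.size() < brokerRackMap.size()) {
            Iterator<Integer> it = iterators.get(rackIndex);
            if (it.hasNext()) {          // racks with fewer brokers are simply skipped once empty
                result.add(it.next());
            }
            rackIndex = (rackIndex + 1) % iterators.size();
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> racks = new TreeMap<>();
        racks.put(0, "rack1"); racks.put(1, "rack3"); racks.put(2, "rack3");
        racks.put(3, "rack2"); racks.put(4, "rack2"); racks.put(5, "rack1");
        System.out.println(rackAlternatedBrokerList(racks)); // [0, 3, 1, 5, 4, 2]
    }
}
```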
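On top of the rack-unaware logic, the rack-aware version checks each candidate broker computed by replicaIndex before accepting it as a follower. The candidate is accepted if:
- every rack and every broker already holds a replica of this partition;
- the candidate's rack holds no replica of this partition yet, and the candidate broker holds none either;
- the candidate's rack holds no replica of this partition yet, and every broker already holds one;
- every rack already holds a replica of this partition, and the candidate broker holds none.
Otherwise the candidate is skipped: k is incremented and the next broker is tried. In short, a broker on a new rack is preferred first, and a new broker second.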
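The pieces above can be combined into a Java port of the rack-aware loop. The port rests on several assumptions. The broker list is already rack-alternated and hard-coded here. fixedStartIndex >= 0 replaces the random values, and startPartitionId is 0. As in the Scala code, the shift passed to replicaIndex is nextReplicaShift * numRacks. RackAwareSketch and assign are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RackAwareSketch {
    // Same formula as Kafka's replicaIndex
    static int replicaIndex(int first, int shift, int k, int nBrokers) {
        return (first + 1 + (shift + k) % (nBrokers - 1)) % nBrokers;
    }

    // Rack-aware assignment over an already rack-alternated broker list (fixedStartIndex >= 0 assumed)
    static Map<Integer, List<Integer>> assign(int nPartitions, int replicationFactor, List<Integer> arranged,
                                              Map<Integer, String> rackOf, int fixedStartIndex) {
        int numBrokers = arranged.size();
        int numRacks = new HashSet<>(rackOf.values()).size();
        int nextReplicaShift = fixedStartIndex;
        Map<Integer, List<Integer>> ret = new LinkedHashMap<>();
        for (int p = 0; p < nPartitions; p++) {
            if (p > 0 && p % numBrokers == 0) {
                nextReplicaShift++;
            }
            int first = (p + fixedStartIndex) % numBrokers;
            int leader = arranged.get(first);
            List<Integer> replicas = new ArrayList<>(Arrays.asList(leader));
            Set<String> racksWithReplicas = new HashSet<>(Arrays.asList(rackOf.get(leader)));
            Set<Integer> brokersWithReplicas = new HashSet<>(Arrays.asList(leader));
            int k = 0;
            for (int r = 0; r < replicationFactor - 1; r++) {
                while (true) {
                    int broker = arranged.get(replicaIndex(first, nextReplicaShift * numRacks, k, numBrokers));
                    String rack = rackOf.get(broker);
                    k++; // k advances whether or not the candidate is accepted, as in the Scala loop
                    // Acceptance test from the Scala code: prefer a new rack, then a new broker
                    if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size() == numRacks)
                            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers)) {
                        replicas.add(broker);
                        racksWithReplicas.add(rack);
                        brokersWithReplicas.add(broker);
                        break;
                    }
                }
            }
            ret.put(p, replicas);
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<Integer, String> rackOf = new HashMap<>();
        rackOf.put(0, "rack1"); rackOf.put(5, "rack1");
        rackOf.put(3, "rack2"); rackOf.put(4, "rack2");
        rackOf.put(1, "rack3"); rackOf.put(2, "rack3");
        // Rack-alternated order for this map: rack1, rack2, rack3, rack1, rack2, rack3
        List<Integer> arranged = Arrays.asList(0, 3, 1, 5, 4, 2);
        assign(6, 3, arranged, rackOf, 0).forEach((p, r) -> System.out.println("p" + p + " -> " + r));
    }
}
```

With 3 racks and replication factor 3, every partition ends up with one replica per rack. For example, partition 0 gets [0, 3, 1], on rack1, rack2 and rack3.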