0x00 摘要
Groupby和reduce是大数据领域常见的算子,但是很多同学应该对其背后机制不甚了解。本文将从源码入手,为大家解析Flink中Groupby和reduce的原理,看看他们在背后做了什么。
0x01 问题和概括
1.1 问题
探究的原因是想到了几个问题 :
- groupby的算子会对数据进行排序嘛。
- groupby和reduce过程中究竟有几次排序。
- 如果有多个groupby task,什么机制保证所有这些grouby task的输出中,同样的key都分配给同一个reducer。
- groupby和reduce时候,有没有Rebalance 重新分配。
- reduce算子会不会重新划分task。
- reduce算子有没有可能和前后的其他算子组成Operator Chain。
1.2 概括
为了便于大家理解,我们先总结下,对于一个Groupby + Reduce的操作,Flink做了如下处理:
- Group其实没有真实对应的算子,它只是在在reduce过程之前的一个中间步骤或者辅助步骤。
- 在Flink生成批处理执行计划后,有意义的结果是Reduce算子。
- 为了更好的reduce,Flink在reduce之前大量使用了Combine操作。Combine可以理解为是在map端的reduce的操作,对单个map任务的输出结果数据进行合并的操作。
- 在Flink生成批处理优化计划(Optimized Plan)之后,会把reduce分割成两段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE。
- SORTED_PARTIAL_REDUCE就是Combine。
- Flink生成JobGraph之后,Flink形成了一个Operator Chain:Reduce(SORTED_PARTIAL_REDUCE)和其上游合并在一起。
- Flink用Partitioner来保证多个 grouby task 的输出中同样的key都分配给同一个reducer。
- groupby和reduce过程中至少有三次排序:
- combine
- sort + merge
- reduce
这样之前的疑问就基本得到了解释。
0x02 背景概念
2.1 MapReduce细分
MapReduce是一种编程模型,用于大规模数据集的并行运算。概念 "Map(映射)"和"Reduce(归约)" 是它们的主要思想,其是从函数式编程语言,矢量编程语言里借来的特性。
我们目前使用的Flink,Spark都出自于MapReduce,所以我们有必有追根溯源,看看MapReduce是如何区分各个阶段的。
2.2 MapReduce细分
如果把MapReduce细分,可以分为一下几大过程:
- Input-Split(输入分片):此过程是将从HDFS上读取的文件分片,然后送给Map端。有多少分片就有多少Mapper,一般分片的大小和HDFS中的块大小一致。
- Shuffle-Spill(溢写):每个Map任务都有一个环形缓冲区。一旦缓冲区达到阈值80%,一个后台线程便开始把内容“溢写”-“spill”到磁盘。在溢写过程中,map将继续输出到剩余的20%空间中,互不影响,如果缓冲区被填满map会被堵塞直到写磁盘完成。
- Shuffle-Partition(分区):由于每个Map可能处理的数据量不同,所以到达reduce有可能会导致数据倾斜。分区可以帮助我们解决这一问题,在shuffle过程中会按照默认key的哈希码对分区数量取余,reduce便根据分区号来拉取对应的数据,达到数据均衡。分区数量对应Reduce个数。
- Shuffle-Sort(排序):在分区后,会对此分区的数据进行内排序,排序过程会穿插在整个MapReduce中,在很多地方都存在。
- Shuffle-Group(分组):分组过程会把key相同的value分配到一个组中,wordcount程序就利用了分组这一过程。
- Shuffle-Combiner(组合):这一过程我们可以理解为一个小的Reduce阶段,当数据量大的时候可以在map过程中执行一次combine,这样就相当于在map阶段执行了一次reduce。由于reduce和map在不同的节点上运行,所以reduce需要远程拉取数据,combine就可以有效降低reduce拉取数据的量,减少网络负荷(这一过程默认是不开启的,在如求平均值的mapreduce程序中不要使用combine,因为会影响结果)。
- Compress(压缩):在缓冲区溢写磁盘的时候,可以对数据进行压缩,节约磁盘空间,同样减少给reducer传递的数据量。
- Reduce-Merge(合并):reduce端会拉取各个map输出结果对应的分区文件,这样reduce端就会有很多文件,所以在此阶段,reduce再次将它们合并/排序再送入reduce执行。
- Output(输出):在reduce阶段,对已排序输出中的每个键调用reduce函数。此阶段的输出直接写到输出文件系统,一般为HDFS。
2.3 Combine
Combine是我们需要特殊注意的。在mapreduce中,map多,reduce少。在reduce中由于数据量比较多,所以我们干脆在map阶段中先把自己map里面的数据归类,这样到了reduce的时候就减轻了压力。
Combine可以理解为是在map端的reduce的操作,对单个map任务的输出结果数据进行合并的操作。combine是对一个map的,而reduce合并的对象是对于多个map。
map函数操作所产生的键值对会作为combine函数的输入,经combine函数处理后再送到reduce函数进行处理,减少了写入磁盘的数据量,同时也减少了网络中键值对的传输量。在Map端,用户自定义实现的Combine优化机制类Combiner在执行Map端任务的节点本身运行,相当于对map函数的输出做了一次reduce。
集群上的可用带宽往往是有限的,产生的中间临时数据量很大时就会出现性能瓶颈,因此应该尽量避免Map端任务和Reduce端任务之间大量的数据传输。使用Combine机制的意义就在于使Map端输出更紧凑,使得写到本地磁盘和传给Reduce端的数据更少。
2.4 Partition
Partition是分割map每个节点的结果,按照key分别映射给不同的reduce,mapreduce使用哈希HashPartitioner帮我们归类了。这个我们也可以自定义。
这里其实可以理解归类。我们对于错综复杂的数据归类。比如在动物园里有牛羊鸡鸭鹅,他们都是混在一起的,但是到了晚上他们就各自牛回牛棚,羊回羊圈,鸡回鸡窝。partition的作用就是把这些数据归类。只不过是在写程序的时候,
在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。假如我们知道这个job有3个reduce task,到底当前的“aaa”应该交由哪个reduce task去做呢,是需要立刻决定的。
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce task的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
在我们的例子中,假定 “aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。
2.5 Shuffle
shuffle就是map和reduce之间的过程,包含了两端的combine和partition。它比较难以理解,因为我们摸不着,看不到它。它属于mapreduce的框架,编程的时候,我们用不到它。
Shuffle的大致范围就是:怎样把map task的输出结果有效地传送到reduce端。也可以这样理解, Shuffle描述着数据从map task输出到reduce task输入的这段过程。
2.6 Reducer
简单地说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,最终形成一个文件作为reduce task的输入文件。
0x03 代码
我们以Flink的KMeans算法作为样例,具体摘要如下:
public class WordCountExampleReduce {
DataStream ds;
public static void main(String[] args) throws Exception {
//构建环境
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
//通过字符串构建数据集
DataSet<String> text = env.fromElements(
"Who‘s there?",
"I think I hear them. Stand, ho! Who‘s there?");
//分割字符串、按照key进行分组、统计相同的key个数
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return new Tuple2(value1.f0, value1.f1 + value2.f1);
}
});
//打印
wordCounts.print();
}
//分割字符串的方法
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
输出是:
(hear,1)
(ho!,1)
(them.,1)
(I,2)
(Stand,,1)
(Who‘s,2)
(there?,2)
(think,1)
0x04 从Flink JAVA API入手挖掘
首先,我们从Flink基本JAVA API来入手开始挖掘。
4.1 GroupBy是个辅助概念
4.1.1 Grouping
我们需要留意的是:GroupBy并没有对应的Operator。GroupBy只是生成DataSet转换的一个中间步骤或者辅助步骤。
GroupBy功能的基类是Grouping,其只是DataSet转换的一个中间步骤。其几个主要成员是:
- 对应的输入数据DataSet
- 分组所基于的keys
- 用户自定义的Partitioner
// Grouping is an intermediate step for a transformation on a grouped DataSet.
public abstract class Grouping<T> {
protected final DataSet<T> inputDataSet;
protected final Keys<T> keys;
protected Partitioner<?> customPartitioner;
}
Grouping并没有任何业务相关的API,具体API都是在其派生类中,比如UnsortedGrouping。
4.1.2 UnsortedGrouping
我们代码中对应的就是UnsortedGrouping类。我们看到它提供了很多业务API,比如:sum,max,min,reduce,aggregate,reduceGroup,combineGroup.....
回到我们的示例,groupBy做了如下操作
- 首先,groupBy返回的就是一个UnsortedGrouping,这个UnsortedGrouping是用来转换DataSet。
- 其次,
.groupBy(0).reduce(new CentroidAccumulator())
返回的是ReduceOperator。这就对应了前面我们提到的,groupBy只是中间步骤,reduce才能返回一个Operator。
public class UnsortedGrouping<T> extends Grouping<T> {
// groupBy返回一个UnsortedGrouping
public UnsortedGrouping<T> groupBy(int... fields) {
return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
}
// reduce返回一个ReduceOperator
public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
return new ReduceOperator<T>(this, inputDataSet.clean(reducer), Utils.getCallLocationName());
}
}
4.2 reduce才是算子
对于业务来说,reduce才是真正有意义的逻辑算子。
从前文的函数调用和ReduceOperator定义可以看出,.groupBy(0).reduce()
的调用结果是生成一个ReduceOperator,而 UnsortedGrouping 被设置为 ReduceOperator 的 grouper 成员变量,作为辅助操作。
public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {
private final ReduceFunction<IN> function;
private final Grouping<IN> grouper; // UnsortedGrouping被设置在这里,后续reduce操作中会用到。
public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function,
String defaultName) {
this.function = function;
this.grouper = input; // UnsortedGrouping被设置在这里,后续reduce操作中会用到。
this.hint = CombineHint.OPTIMIZER_CHOOSES; // 优化时候会用到。
}
}
让我们顺着Flink程序执行阶段继续看看系统都做了些什么。
0x05 批处理执行计划(Plan)
程序执行的第一步是:当程序运行时候,首先会根据java API的结果来生成执行plan。
public JobClient executeAsync(String jobName) throws Exception {
final Plan plan = createProgramPlan(jobName);
}
其中重要的函数是translateToDataFlow,因为在translateToDataFlow方法中,会从批处理Java API模块中operators包往核心模块中operators包的转换。
对于我们的示例程序,在生成 Graph时,translateToDataFlow会生成一个 SingleInputOperator,为后续runtime使用。下面是代码缩减版。
protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
......
// UnsortedGrouping中的keys被取出,
else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
// reduce with field positions
ReduceOperatorBase<IN, ReduceFunction<IN>> po =
new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
po.setCustomPartitioner(grouper.getCustomPartitioner());
po.setInput(input);
po.setParallelism(getParallelism()); // 没有并行度的变化
return po;//translateToDataFlow会生成一个 SingleInputOperator,为后续runtime使用
}
}
}
我们代码最终生成的执行计划如下,我们可以看出来,执行计划基本符合我们的估计:简单的从输入到输出。中间有意义的算子其实只有Reduce。
GenericDataSourceBase ——> FlatMapOperatorBase ——> ReduceOperatorBase ——> GenericDataSinkBase
具体在代码中体现如下是:
plan = {Plan@1296}
sinks = {ArrayList@1309} size = 1
0 = {GenericDataSinkBase@1313} "collect()"
formatWrapper = {UserCodeObjectWrapper@1315}
input = {ReduceOperatorBase@1316} "ReduceOperatorBase - Reduce at main(WordCountExampleReduceCsv.java:25)"
hint = {ReduceOperatorBase$CombineHint@1325} "OPTIMIZER_CHOOSES"
customPartitioner = null
input = {FlatMapOperatorBase@1326} "FlatMapOperatorBase - FlatMap at main(WordCountExampleReduceCsv.java:23)"
input = {GenericDataSourceBase@1339} "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)"
0x06 批处理优化计划(Optimized Plan)
程序执行的第二步是:Flink对于Plan会继续优化,生成Optimized Plan。其核心代码位于PlanTranslator.compilePlan 函数,这里得到了Optimized Plan。
这个编译的过程不作任何决策与假设,也就是说作业最终如何被执行早已被优化器确定,而编译也是在此基础上做确定性的映射。所以我们将集中精力看如何优化plan。
private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
OptimizedPlan optimizedPlan = optimizer.compile(plan);
JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
}
在内部调用plan的accept方法遍历它。accept会挨个在每个sink上调用accept。对于每个sink会先preVisit,然后 postVisit。
这里优化时候有几个注意点:
-
在 GraphCreatingVisitor.preVisit 中,当发现Operator是 ReduceOperatorBase 类型的时候,会建立ReduceNode。
else if (c instanceof ReduceOperatorBase) { n = new ReduceNode((ReduceOperatorBase<?, ?>) c); }
-
ReduceNode是Reducer Operator的Optimizer表示。
public class ReduceNode extends SingleInputNode { private final List<OperatorDescriptorSingle> possibleProperties; private ReduceNode preReduceUtilityNode; }
-
生成ReduceNode时候,会根据之前提到的 hint 来决定
combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;
public ReduceNode(ReduceOperatorBase<?, ?> operator) { DriverStrategy combinerStrategy; switch(operator.getCombineHint()) { case OPTIMIZER_CHOOSES: combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE; break; } }
生成的优化执行计划如下,我们可以看到,这时候设置了并行度,也把reduce分割成两段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE。
Data Source ——> FlatMap ——> Reduce(SORTED_PARTIAL_REDUCE) ——> Reduce(SORTED_REDUCE) ——> Data Sink
具体在代码中体现如下是:
optimizedPlan = {OptimizedPlan@1506}
allNodes = {HashSet@1510} size = 5
0 = {SourcePlanNode@1512} "Data Source "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
1 = {SingleInputPlanNode@1513} "FlatMap "FlatMap at main(WordCountExampleReduceCsv.java:23)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
2 = {SingleInputPlanNode@1514} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
3 = {SinkPlanNode@1515} "Data Sink "collect()" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
4 = {SingleInputPlanNode@1516} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_PARTIAL_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
parallelism = 4
0x07 JobGraph
程序执行的第三步是:建立JobGraph。LocalExecutor.execute中会生成JobGraph。Optimized Plan 经过优化后生成了 JobGraph, JobGraph是提交给 JobManager 的数据结构。
主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
JobGraph是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
final JobGraph jobGraph = getJobGraph(pipeline, configuration);
}
我们可以看出来,这一步形成了一个Operator Chain:
CHAIN DataSource -> FlatMap -> Combine (Reduce)
于是我们看到,Reduce(SORTED_PARTIAL_REDUCE)和其上游合并在一起。
具体在程序中打印出来:
jobGraph = {JobGraph@1739} "JobGraph(jobId: 30421d78d7eedee6be2c5de39d416eb7)"
taskVertices = {LinkedHashMap@1742} size = 3
{JobVertexID@1762} "e2c43ec0df647ea6735b2421fb7330fb" -> {InputOutputFormatVertex@1763} "CHAIN DataSource (at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCountExampleReduceCsv.java:23)) -> Combine (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.DataSourceTask)"
{JobVertexID@1764} "2de11f497e827e48dda1d63b458dead7" -> {JobVertex@1765} "Reduce (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.BatchTask)"
{JobVertexID@1766} "2bee17f2c86aa1e9439e3dedea58007b" -> {InputOutputFormatVertex@1767} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
0x08 Runtime
Job提交之后,就是程序正式运行了。这里实际上涉及到了三次排序,
- 一次是在FlatMap发送时候调用到了ChainedReduceCombineDriver.sortAndCombine。这部分对应了我们之前提到的MapReduce中的Combine和Partition。
- 一次是在 ReduceDriver 所在的 BatchTask中,由UnilateralSortMerger完成了sort & merge操作。
- 一次是在ReduceDriver,这里做了最后的reducer排序。
8.1 FlatMap
这里是第一次排序。
当一批数据处理完成之后,在ChainedFlatMapDriver中调用到close函数进行发送数据给下游。
public void close() {
this.outputCollector.close();
}
Operator Chain会调用到ChainedReduceCombineDriver.close
public void close() {
// send the final batch
try {
switch (strategy) {
case SORTED_PARTIAL_REDUCE:
sortAndCombine(); // 我们是在这里
break;
case HASHED_PARTIAL_REDUCE:
reduceFacade.emit();
break;
}
} catch (Exception ex2) {
throw new ExceptionInChainedStubException(taskName, ex2);
}
outputCollector.close();
dispose(false);
}
8.1.1 Combine
sortAndCombine中先排序,然后做combine,最后会不断发送数据。
private void sortAndCombine() throws Exception {
final InMemorySorter<T> sorter = this.sorter;
if (!sorter.isEmpty()) {
sortAlgo.sort(sorter); // 这里会先排序
final TypeSerializer<T> serializer = this.serializer;
final TypeComparator<T> comparator = this.comparator;
final ReduceFunction<T> function = this.reducer;
final Collector<T> output = this.outputCollector;
final MutableObjectIterator<T> input = sorter.getIterator();
if (objectReuseEnabled) {
......
} else {
T value = input.next();
// 这里就是combine
// iterate over key groups
while (running && value != null) {
comparator.setReference(value);
T res = value;
// iterate within a key group
while ((value = input.next()) != null) {
if (comparator.equalToReference(value)) {
// same group, reduce
res = function.reduce(res, value);
} else {
// new key group
break;
}
}
output.collect(res); //发送数据
}
}
}
}
8.1.2 Partition
最后发送给哪个下游,是由OutputEmitter.selectChannel决定的。有如下几种决定方式:
hash-partitioning, broadcasting, round-robin, custom partition functions。这里采用的是PARTITION_HASH。
每个task都会把同样字符串统计结果发送给同样的下游ReduceDriver。这就保证了下游Reducer一定不会出现统计出错。
public final int selectChannel(SerializationDelegate<T> record) {
switch (strategy) {
...
case PARTITION_HASH:
return hashPartitionDefault(record.getInstance(), numberOfChannels);
...
}
}
private int hashPartitionDefault(T record, int numberOfChannels) {
int hash = this.comparator.hash(record);
return MathUtils.murmurHash(hash) % numberOfChannels;
}
具体调用栈:
hash:50, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hash:30, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hashPartitionDefault:187, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:147, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
sortAndCombine:254, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:266, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:40, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
close:88, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
invoke:215, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
8.2 UnilateralSortMerger
这里是第二次排序。
在 BatchTask中,会先Sort, Merge输入,然后才会交由Reduce来具体完成过。sort & merge操作具体是在UnilateralSortMerger类中完成的。
getIterator:646, UnilateralSortMerger (org.apache.flink.runtime.operators.sort)
getInput:1110, BatchTask (org.apache.flink.runtime.operators)
prepare:95, ReduceDriver (org.apache.flink.runtime.operators)
run:474, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
UnilateralSortMerger是一个full fledged sorter,它实现了一个多路merge sort。其内部的逻辑被划分到三个线程上(read, sort, spill),这三个线程彼此之间通过一系列blocking queues来构成了一个闭环。
其内存通过MemoryManager分配,所以这个组件不会超过给其分配的内存。
该类主要变量摘录如下:
public class UnilateralSortMerger<E> implements Sorter<E> {
// ------------------------------------------------------------------------
// Threads
// ------------------------------------------------------------------------
/** The thread that reads the input channels into buffers and passes them on to the merger. */
private final ThreadBase<E> readThread;
/** The thread that merges the buffer handed from the reading thread. */
private final ThreadBase<E> sortThread;
/** The thread that handles spilling to secondary storage. */
private final ThreadBase<E> spillThread;
// ------------------------------------------------------------------------
// Memory
// ------------------------------------------------------------------------
/** The memory segments used first for sorting and later for reading/pre-fetching
* during the external merge. */
protected final List<MemorySegment> sortReadMemory;
/** The memory segments used to stage data to be written. */
protected final List<MemorySegment> writeMemory;
/** The memory manager through which memory is allocated and released. */
protected final MemoryManager memoryManager;
// ------------------------------------------------------------------------
// Miscellaneous Fields
// ------------------------------------------------------------------------
/**
* Collection of all currently open channels, to be closed and deleted during cleanup.
*/
private final HashSet<FileIOChannel> openChannels;
/**
* The monitor which guards the iterator field.
*/
protected final Object iteratorLock = new Object();
/**
* The iterator to be returned by the sort-merger. This variable is null, while receiving and merging is still in
* progress and it will be set once we have < merge factor sorted sub-streams that will then be streamed sorted.
*/
protected volatile MutableObjectIterator<E> iterator; // 如果大家经常调试,就会发现driver中的input都是这个兄弟。
private final Collection<InMemorySorter<?>> inMemorySorters;
}
8.2.1 三种线程
ReadingThread:这种线程持续读取输入,然后把数据放入到一个待排序的buffer中。The thread that consumes the input data and puts it into a buffer that will be sorted.
SortingThread : 这种线程对于上游填充好的buffer进行排序。The thread that sorts filled buffers.
SpillingThread:这种线程进行归并操作。The thread that handles the spilling of intermediate results and sets up the merging. It also merges the channels until sufficiently few channels remain to perform the final streamed merge.
8.2.2 MutableObjectIterator
UnilateralSortMerger有一个特殊变量:
protected volatile MutableObjectIterator<E> iterator;
这个变量就是最终sort-merger的输出。如果大家调试过算子,就会发现这个变量就是具体算子的输入input类型。最终算子的输入就是来自于此。
8.3 ReduceDriver
这里是第三次排序,我们可以看出来reduce是怎么和groupby一起运作的。
- 针对
.groupBy(0)
,ReduceDriver就是单纯获取输入的第一个数值T value = input.next();
- 后续代码中有嵌套的两个while,分别是 :遍历各种key,以及某一key中reduce。
- 遍历 group keys的时候,把value赋于比较算子comparator(这个算子概念不是Flink算子,就是为了说明逻辑概念)
comparator.setReference(value);
因为groubBy只是指定按照第一个位置比较,没有指定具体key数值,所以这个value就是key了。此处记为while (1)
,代码中有注解。 - 从输入中读取后续的数值value,如果下一个数值是同一个key,就reduce;如果下一个数值不是同一个key,就跳出循环。放弃比较,把reduce结果输出。此处记为
while (2)
- 跳出
while (2)
之后,代码依然在while (1)
,此时value是新值,所以继续在while (1)
中运行 。把value继续赋于比较算子comparator.setReference(value);
,于是进行新的key比较。
public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
// cache references on the stack
final MutableObjectIterator<T> input = this.input;
final TypeSerializer<T> serializer = this.serializer;
final TypeComparator<T> comparator = this.comparator;
final ReduceFunction<T> function = this.taskContext.getStub();
final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
......
} else {
// 针对 `.groupBy(0)`,ReduceDriver就是单纯获取输入的第一个数值 `T value = input.next();`
T value = input.next();
// while (1)
// iterate over key groups
while (this.running && value != null) {
numRecordsIn.inc();
// 把value赋于比较算子,这个value就是key了。
comparator.setReference(value);
T res = value;
// while (2)
// iterate within a key group,循环比较这个key
while ((value = input.next()) != null) {
numRecordsIn.inc();
if (comparator.equalToReference(value)) {
// same group, reduce,如果下一个数值是同一个key,就reduce
res = function.reduce(res, value);
} else {
// new key group,如果下一个数值不是同一个key,就跳出循环,放弃比较。
break;
}
}
// 把reduce结果输出
output.collect(res);
}
}
}
}
0x09 参考
mapreduce里的shuffle 里的 sort merge 和combine
实战录 | Hadoop Mapreduce shuffle之Combine探讨