大数据:Spark ShuffleShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去
Posted raintungli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据:Spark ShuffleShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去相关的知识,希望对你有一定的参考价值。
1. 前序
关于Executor如何运行算子,请参考前面博文:大数据:Spark Core(四)用LogQuery的例子来说明Executor是如何运算RDD的算子,当Executor进行reduce运算的时候,生成运算结果的临时Shuffle数据,并保存在磁盘中,被最后的Action算子调用,而这个阶段就是在ShuffleMapTask里执行的。
前面博客中也提到了,用什么ShuffleWrite是由ShuffleHandler来决定的,在这篇博客里主要介绍最常见的SortShuffleWrite的核心算法ExternalSorter.
2. 结构AppendOnlyMap
在前面博客中介绍了SortShuffleWriter调用ExternalSorter.insertAll进行数据插入和数据合并的,ExternalSorted里使用了PartitionedAppendOnlyMap作为数据的存储方式
先来看PartitionedAppendOnlyMap的结构
虽然名字为Map,但是在这里和常见的Map的结构并不太一样,里面并没有使用链表结果保存相同的hash值的key,当插入的key的hashcode相同的时但key不相同,会通过i的叠加一直找到数组里空闲的位置。
这里有几个注意点:
- Key 注意这里的Key并不是通过Map里拆分的Key, 而是Tuple2(PartitionId,Key),由分片的段和key组合的联合key
- 如何计算PartitionId? 这是由Partitioner来决定的
2.1 Partitioner
Partitioner的方法
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
通过调用getPartition方法找到对应的partition相应的块,而常用的是HashPartitioner
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
计算 key的hashCode,进行总的分片数求余,分配到对应的片区
3. Spill
在大数据的情况下进行归并,由于合并的数据量非常大,仅仅使用AppendOnlyMap进行数据的归并内存显然是不足够的,在这种情况下需要对讲内存里的已经归并的数据刷到磁盘上避免OOM的风险。
控制Spill到磁盘的阀值
- 内存:虽然Java的堆内存管理是由JVM虚拟机管控,但是Spark自己实现了一个简单的但不精准的内存管理,内存的申请在TaskMemoryManager里进行管理
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory >= myMemoryThreshold
}
在每添加32个元素的时候,检查一下当前的内存状况,currentMemory是Map当前大概使用的内存,myMemoryThreshold是可以使用的内存址,初始的时候受参数控制:
以上是关于大数据:Spark ShuffleShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去的主要内容,如果未能解决你的问题,请参考以下文章
大数据入门核心技术-Spark执行Spark任务的两种方式:spark-submit和spark-shell