大数据:Spark ShuffleShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去

Posted raintungli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据:Spark ShuffleShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去相关的知识,希望对你有一定的参考价值。

1. 前序

关于Executor如何运行算子,请参考前面博文:大数据:Spark Core(四)用LogQuery的例子来说明Executor是如何运算RDD的算子,当Executor进行reduce运算的时候,生成运算结果的临时Shuffle数据,并保存在磁盘中,被最后的Action算子调用,而这个阶段就是在ShuffleMapTask里执行的。

前面博客中也提到了,用什么ShuffleWrite是由ShuffleHandler来决定的,在这篇博客里主要介绍最常见的SortShuffleWrite的核心算法ExternalSorter.

2. 结构AppendOnlyMap

在前面博客中介绍了SortShuffleWriter调用ExternalSorter.insertAll进行数据插入和数据合并的,ExternalSorted里使用了PartitionedAppendOnlyMap作为数据的存储方式

先来看PartitionedAppendOnlyMap的结构



虽然名字为Map,但是在这里和常见的Map的结构并不太一样,里面并没有使用链表结果保存相同的hash值的key,当插入的key的hashcode相同的时但key不相同,会通过i的叠加一直找到数组里空闲的位置。

这里有几个注意点:

  • Key 注意这里的Key并不是通过Map里拆分的Key, 而是Tuple2(PartitionId,Key),由分片的段和key组合的联合key
  • 如何计算PartitionId? 这是由Partitioner来决定的

2.1 Partitioner

Partitioner的方法

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

通过调用getPartition方法找到对应的partition相应的块,而常用的是HashPartitioner

 def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

计算 key的hashCode,进行总的分片数求余,分配到对应的片区

3. Spill

在大数据的情况下进行归并,由于合并的数据量非常大,仅仅使用AppendOnlyMap进行数据的归并内存显然是不足够的,在这种情况下需要对讲内存里的已经归并的数据刷到磁盘上避免OOM的风险。

控制Spill到磁盘的阀值

  • 内存:虽然Java的堆内存管理是由JVM虚拟机管控,但是Spark自己实现了一个简单的但不精准的内存管理,内存的申请在TaskMemoryManager里进行管理
 if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      shouldSpill = currentMemory >= myMemoryThreshold
    }
在每添加32个元素的时候,检查一下当前的内存状况,currentMemory是Map当前大概使用的内存,myMemoryThreshold是可以使用的内存址,初始的时候受参数控制:

以上是关于大数据:Spark ShuffleShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去的主要内容,如果未能解决你的问题,请参考以下文章

大数据入门核心技术-Spark执行Spark任务的两种方式:spark-submit和spark-shell

大数据(spark sql 和 spark dataframes 连接)

大数据之Spark:Spark 基础

大数据中的Spark指的是啥?

数分-理论-大数据7-Spark

Spark一出,Hadoop必死?Spark才是大数据的未来?