10. spark源代码分析(基于yarn cluster模式)- 聊聊RDD和Depedency
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了10. spark源代码分析(基于yarn cluster模式)- 聊聊RDD和Depedency相关的知识,希望对你有一定的参考价值。
我们知道在Spark中RDD是一个特别重要的概念,可以说spark所有的逻辑都需要依赖RDD,这篇我们简单聊聊Spark中的RDD,spark中RDD的定义如下:
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
}
每个RDD会包含如下5个属性:
- 该RDD的分区列表
- 每个数据文件计算函数
- 对其他RDD的依赖关系
- 分区选择器(可选)
- 每个数据文件的位置信息(可选)
为了更好的理解,这里我们用大家在HDFS上常见的HDFS实现:HadoopRDD
的实现来研究。
我们首先看看HadoopRDD
是怎么获取分区信息的:
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
SparkHadoopUtil.get.addCredentials(jobConf)
try {
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
} catch {
case e: InvalidInputException if ignoreMissingFiles =>
Array.empty[Partition]
}
}
可以看到,这里HadoopRDD
就是获取了对应数据的底层文件信息,即hadoop中的块信息,然后一个块文件就是一个分区。这里HadoopRDD
对应的分区信息是封装成了HadoopPartition
:
trait Partition extends Serializable {
def index: Int
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
extends Partition {
val inputSplit = new SerializableWritable[InputSplit](s)
override def hashCode(): Int = 31 * (31 + rddId) + index
override def equals(other: Any): Boolean = super.equals(other)
def getPipeEnvVars(): Map[String, String] = {
val envVars: Map[String, String] = if (inputSplit.value.isInstanceOf[FileSplit]) {
val is: FileSplit = inputSplit.value.asInstanceOf[FileSplit]
// since it's not removed yet
Map("map_input_file" -> is.getPath().toString(),
"mapreduce_map_input_file" -> is.getPath().toString())
} else {
Map()
}
envVars
}
}
主要包含了几个信息:
- RDD的id
- 分区的序号
- 分区对应的文件块
而对于依赖关系,由于这里不会设计到任何Shuffle,依赖关系不存在,在构造实例化的时候,传入的依赖列表为空。
如果我们对其做相关计算,比如reparation的话,在RDD中该方法实现如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
可以看到,这时候返回的是一个CoalescedRDD
,里面套着一个ShuffledRDD
,然后这个ShuffledRDD在套着一个MapPartitionsRDD
,我们先看看这个MapPartitionsRDD:
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false,
isOrderSensitive: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
preservesPartitioning = preservesPartitioning,
isOrderSensitive = isOrderSensitive)
}
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev) {
....
}
注意如果父类RDD通过这种方式构造传入RDD[U](prev)
:
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
其依赖关系为OneToOneDependency
,这里MapPartitionsRDD
的父RDD就是当前RDD,也就是我们分析的HadoopRDD
我们看看其上述5个属性
分区
其分区获取函数:
override def getPartitions: Array[Partition] = firstParent[T].partitions
protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}
可以看到,这里获取到当前依赖列表中第一个依赖对应的分区,也就是HadoopRDD的分区信息
计算函数
MapPartitionsRDD的计算函数是通过构造传入
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
// ------------------------------------------
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
可以看到,这里执行repartition
的时候,就是根据父RDD的每个分区,将分区序号按照hash进行随机打散,返回的就是每条数据和新的分区ID。
另外这里firstParent[T].iterator(split, context)
来读取父RDD的数据,这个实现我们可以看看:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
可以看到,这里就又回到了我们之前说的Task执行的时候,读取RDD的数据,这里我们实际上就是HadoopRDD
,读取到的数据就是一个一个数据块,然后对块数据进行处理,每个块里面的数据会按照hash分区打散到不同的分区中。
在看看ShuffledRDD
,其父RDD是上面的MapPartitionsRDD:
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, C)](prev.context, Nil) {
...
}
其分区列表返回如下:
override def getPartitions: Array[Partition] = {
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}
这里的分区返回的是一个ShuffledRDDPartition
,里面包含了一个分区序号(根据传入的HashPartitioner
需要重分区的个数)
其返回依赖关系如下:
override def getDependencies: Seq[Dependency[_]] = {
val serializer = userSpecifiedSerializer.getOrElse {
val serializerManager = SparkEnv.get.serializerManager
if (mapSideCombine) {
serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
} else {
serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
}
}
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
可以看到,这里返回的就是一个ShuffleDependency
,之前我们研究Stage划分的时候只要遇到ShuffleDependency
则会划分一个新的Stage,结合我们之前的分析,在这里划分Stage,如果后续还要进行计算,则会生成一个ShuffleMapTask,作为Map端将数据写入。
而这里读取数据的时候会调用`ShuffleRDD.compute’方法:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
这里就是我们在分析Task执行时候的方法了,每个执行节点会拿到自己节点应该要处理的数据,这里就是拿到对应的HadoopRDD的对应块数据文件,然后根据分区器,返回每条数据对应的分区和数据,然后每个执行节点根据生成的数据将同一个分区的数据(重分区后)放到一起然后写入本地临时文件中。
我们在看下最外围的CoalescedRDD
:
private[spark] class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
partitionCoalescer: Option[PartitionCoalescer] = None)
extends RDD[T](prev.context, Nil) {
...
}
通过之前的分析我们知道,在这里如果我们后续没有其他处理,则会生成一个ResultStage
对应ResultTask
去执行用户传入的方法。
这里CoalescedRDD
的依赖:
override def getDependencies: Seq[Dependency[_]] = {
Seq(new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] =
partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
})
}
可以看到,返回的是一个NarrowDependency
,如果后续还有其他处理则还会根据该RDD生成新的RDD,并且遇到ShuffleDependency才划分Stage。
这里的计算函数compute
实现如下:
override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
firstParent[T].iterator(parentPartition, context)
}
}
可以看到,这里就是调用了构造是传入ShuffledRDD,用这个ShuffledRDD
来读取数据,至于这个怎么实现的,可以看之前的源码分析。
以上是关于10. spark源代码分析(基于yarn cluster模式)- 聊聊RDD和Depedency的主要内容,如果未能解决你的问题,请参考以下文章
9. spark源代码分析(基于yarn cluster模式)- Task执行,Reduce端读取shuffle数据文件
3. spark-2.4.6源码分析(基于yarn cluster模式)-YARN ApplicationMaster启动
2. spark-2.4.6源码分析(基于yarn cluster模式)-YARN client启动,提交ApplicationMaster
7. spark源码分析(基于yarn cluster模式)- Task划分提交
3. spark-2.4.6源码分析(基于yarn cluster模式)-YARN contaienr启动-CoarseGrainedExecutorBackend