Big Data: Spark Core, Using the LogQuery Example to Show How an Executor Evaluates RDD Operators

Posted by raintungli


1. How does it actually run?

Many blog posts explain at length what RDD, Dependency, and Shuffle are, but how exactly do the Executors run the code you submit? Below is a log-analysis example taken from Spark's bundled examples:
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Query")
    val sc = new SparkContext(sparkConf)
    val dataSet =
      if (args.length == 1) sc.textFile(args(0)) else sc.parallelize(exampleApacheLogs)
    // scalastyle:off
    val apacheLogRegex =
      """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
    // scalastyle:on
    /** Tracks the total query count and number of aggregate bytes for a particular group. */
    class Stats(val count: Int, val numBytes: Int) extends Serializable {
      def merge(other: Stats): Stats = {
        new Stats(count + other.count, numBytes + other.numBytes)
      }
      override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
    }

    def extractKey(line: String): (String, String, String) = {
      apacheLogRegex.findFirstIn(line) match {
        case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
          if (user != "\"-\"") (ip, user, query)
          else (null, null, null)
        case _ => (null, null, null)
      }
    }

    def extractStats(line: String): Stats = {
      apacheLogRegex.findFirstIn(line) match {
        case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
          new Stats(1, bytes.toInt)
        case _ => new Stats(1, 0)
      }
    }

    dataSet.map(line => (extractKey(line), extractStats(line)))
      .reduceByKey((c, d) => c.merge(d))
      .collect().foreach {
        case (user, query) => println("%s\t%s".format(user, query))
      }

    sc.stop()
  }

In the map operator, the user-defined functions extractKey and extractStats are called, and in reduceByKey a merge function is defined for values that share the same key. How are these functions delivered to the executor and evaluated there?

1.1 RDD and ShuffleDependency

A previous post (on how a task is launched on the Executor) already discussed how the Executor obtains the Driver's RDD and Dependency. So how does the RDD end up running these functions?
The ShuffleMapTask submitted through the DAG is deserialized on the Executor from the serializedTask field of the TaskDescription. Here is ShuffleMapTask's runTask method:
 override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }
Here we can see that a ShuffleWriter is used to write out the data obtained by iterating the RDD.

1.1.1 ShuffleWriter

The ShuffleWriter is obtained through the shuffleManager; in SortShuffleManager.scala:
 /** Get a writer for a given partition. Called on executors by map tasks. */
  override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }
The ShuffleDependency holds a ShuffleHandle, and the ShuffleHandle in turn holds the dependency:
  1. On the Driver, registerShuffle in the DAG scheduling path picks the ShuffleHandle based on the dependency.
  2. On the Executor, the shuffleManager picks the ShuffleWriter based on the ShuffleHandle carried by the dependency.
Side note: the Dependency alone could determine the ShuffleWriter directly; the ShuffleHandle is only used on the SortShuffleWriter path to get back the dependency, which the Executor-side SortShuffleWriter could obtain anyway, so the ShuffleHandle feels rather redundant.
In this log-analysis example, the writer returned is a SortShuffleWriter; the sketch below walks through why.
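A simplified, self-contained model of that selection. The names WriterChoiceSketch and chooseHandle and the literal threshold are illustrative only, not Spark's API; the real logic lives in SortShuffleManager (registerShuffle on the Driver, getWriter shown above on the Executor):

// Minimal sketch of which handle (and therefore which writer) a shuffle ends up with.
object WriterChoiceSketch {
  sealed trait Handle
  case object SerializedHandle extends Handle       // would map to UnsafeShuffleWriter
  case object BypassMergeSortHandle extends Handle  // would map to BypassMergeSortShuffleWriter
  case object BaseHandle extends Handle             // would map to SortShuffleWriter

  def chooseHandle(mapSideCombine: Boolean,
                   hasAggregator: Boolean,
                   relocatableSerializer: Boolean,
                   numPartitions: Int,
                   bypassThreshold: Int = 200): Handle = {
    // Bypass path: only when there is no map-side combine and few partitions.
    if (!mapSideCombine && numPartitions <= bypassThreshold) BypassMergeSortHandle
    // Serialized (unsafe) path: only when no aggregator is defined.
    else if (!hasAggregator && relocatableSerializer) SerializedHandle
    // Everything else falls back to the base handle.
    else BaseHandle
  }

  def main(args: Array[String]): Unit = {
    // reduceByKey defines an aggregator and enables map-side combine, so the
    // bypass and serialized paths are ruled out and SortShuffleWriter is used.
    println(chooseHandle(mapSideCombine = true, hasAggregator = true,
      relocatableSerializer = true, numPartitions = 2))   // prints BaseHandle
  }
}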

1.1.2 RDD.iterator

In the runTask method of ShuffleMapTask:
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

The call to writer.write is passed rdd.iterator, that is, an iterator built from the RDD:
 final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

The RDD produced by map is a MapPartitionsRDD, and this is where its iterator is built. MapPartitionsRDD has no caching or storage configured, so its StorageLevel is NONE and computeOrReadCheckpoint is called:

  /**
   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
   */
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }
It has not been checkpointed either, so compute is called:
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
Let's first look at firstParent:
  /** Returns the first parent RDD */
  protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
    dependencies.head.rdd.asInstanceOf[RDD[U]]
  }
Every RDD keeps a sequence of Dependencies, and each Dependency carries a reference to an RDD. The RDD of the first dependency in that sequence is the parent that feeds the data, namely dataSet in the code below:
val dataSet =
      if (args.length == 1) sc.textFile(args(0)) else sc.parallelize(exampleApacheLogs)
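That parent lookup can be pictured with a toy model. The names below (DependencyChainSketch, Node, Source, Mapped) are illustrative only, not Spark's API; each node keeps its dependency list, and deps.head.rdd is the parent whose iterator feeds compute:

// Toy model of "dependencies.head.rdd" driving the iterator chain.
object DependencyChainSketch {
  final case class Dep(rdd: Node)
  trait Node {
    def deps: Seq[Dep]
    def firstParent: Node = deps.head.rdd
    def iterator: Iterator[String]
  }
  // Stands in for ParallelCollectionRDD: no dependencies, just the values.
  final case class Source(values: Seq[String]) extends Node {
    val deps: Seq[Dep] = Seq.empty
    def iterator: Iterator[String] = values.iterator
  }
  // Stands in for MapPartitionsRDD: pulls from its first parent and applies f.
  final case class Mapped(parent: Node, f: String => String) extends Node {
    val deps: Seq[Dep] = Seq(Dep(parent))
    def iterator: Iterator[String] = firstParent.iterator.map(f)
  }

  def main(args: Array[String]): Unit = {
    val dataSet = Source(Seq("a", "b"))
    val mapped = Mapped(dataSet, _.toUpperCase)
    mapped.iterator.foreach(println)   // prints A, B
  }
}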
Taking the parallelize branch as our example, the corresponding RDD is ParallelCollectionRDD. Back to
firstParent[T].iterator(split, context)
the iterator function is the same RDD method shown earlier; the StorageLevel is again NONE and no checkpoint exists, so once again compute is called:
  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }
This creates an InterruptibleIterator, which is essentially just a delegating iterator:
@DeveloperApi
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = {
    // TODO(aarondav/rxin): Check Thread.interrupted instead of context.interrupted if interrupt
    // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
    // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
    // introduces an expensive read fence.
    if (context.isInterrupted) {
      throw new TaskKilledException
    } else {
      delegate.hasNext
    }
  }

  def next(): T = delegate.next()
}
When a kill/interrupt is detected it throws a TaskKilledException immediately. The iterator it delegates to is
s.asInstanceOf[ParallelCollectionPartition[T]].iterator
and the Partition type of ParallelCollectionRDD is ParallelCollectionPartition:
private[spark] class ParallelCollectionPartition[T: ClassTag](
    var rddId: Long,
    var slice: Int,
    var values: Seq[T]
  ) extends Partition with Serializable {

  def iterator: Iterator[T] = values.iterator
  .......
values is a sequence that must be serializable. On the Driver side, ParallelCollectionRDD slices the data into ParallelCollectionPartitions; each slice's values are stored inside its ParallelCollectionPartition, not inside the ParallelCollectionRDD itself. So the data being computed is not carried over via the RDD, but obtained by deserializing the ShuffleMapTask, which travels over the direct RPC channel:
private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation],
    metrics: TaskMetrics,
    localProperties: Properties,
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None)
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
    appId, appAttemptId)
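The idea that the per-partition values ride inside the serialized partition rather than inside the RDD object can be pictured with a small, Spark-free sketch. This is not ParallelCollectionRDD's actual slice implementation; SliceSketch and its slicing formula are illustrative only:

// Cut an in-memory Seq into numSlices pieces; each piece would be the "values"
// of one serialized partition shipped with its task.
object SliceSketch {
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    (0 until numSlices).map { i =>
      val start = (i * data.length) / numSlices
      val end = ((i + 1) * data.length) / numSlices
      data.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    val logs = Seq("line1", "line2", "line3", "line4", "line5")
    // Two partitions: Seq(line1, line2) and Seq(line3, line4, line5)
    slice(logs, 2).zipWithIndex.foreach { case (values, idx) =>
      println(s"partition $idx -> $values")
    }
  }
}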

Back to the original MapPartitionsRDD compute function:
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
So what is f? See the RDD.map function:
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

And recall how we call map:
dataSet.map(line => (extractKey(line), extractStats(line)))

So f(context, split.index, firstParent[T].iterator(split, context)) invokes (context, pid, iter) => iter.map(cleanF). The key piece is iter.map, a plain Scala library function; see Iterator.scala in the Scala standard library:
 def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
  }
The result can simply be thought of as an AbstractIterator whose self points at the InterruptibleIterator and whose f is line => (extractKey(line), extractStats(line)).
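A small, Spark-free sketch of that chain, with a plain delegating wrapper standing in for InterruptibleIterator and a toy function standing in for extractKey/extractStats, shows that nothing is computed until a consumer pulls next():

object IteratorChainSketch {
  // Stand-in for InterruptibleIterator: it just forwards hasNext/next.
  class Delegating[T](delegate: Iterator[T]) extends Iterator[T] {
    def hasNext: Boolean = delegate.hasNext
    def next(): T = delegate.next()
  }

  def main(args: Array[String]): Unit = {
    val partitionValues = Seq("10.0.0.1 GET /a", "10.0.0.2 GET /b")
    val base = new Delegating(partitionValues.iterator)

    // Iterator.map only records the function; nothing runs yet.
    val mapped = base.map { line =>
      println(s"applying f to: $line")   // executes only when next() is pulled
      val parts = line.split(" ")
      (parts(0), parts.length)            // a toy (key, stats) pair
    }

    println("before iteration")
    while (mapped.hasNext) {              // same consumption pattern as ExternalSorter
      println(mapped.next())
    }
  }
}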
Now look at the code in ExternalSorter.scala that pulls the partition's data through this iterator and computes on it:
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }

  • AbstractIterator.hasNext -> InterruptibleIterator.hasNext -> Elements(Seq.iterator).hasNext -> def hasNext: Boolean = index < end
  • AbstractIterator.next() -> InterruptibleIterator.next() -> Elements(Seq.iterator).next() -> f(InterruptibleIterator.next()) -> (extractKey(InterruptibleIterator.next()), extractStats(InterruptibleIterator.next()))
After extractKey and extractStats run, each element returned is a KV value of type Product2[Tuple3[String, String, String], Stats].
Remember that the executor loads the driver's jar? Although functions defined in Scala support serialization by default, running the method itself does not require deserializing its code; it is enough to load the jar and let the classloader resolve the driver classes we wrote.
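That default serializability is easy to check outside Spark. The sketch below round-trips a plain Scala function value through JDK serialization, which is conceptually what the closure serializer does (minus closure cleaning); it is a standalone illustration, not Spark code:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // A function value much like the one passed to map(); Scala function
    // objects are serializable, so they can be shipped as bytes.
    val f: String => Int = line => line.split(" ").length

    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(f)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val revived = in.readObject().asInstanceOf[String => Int]
    println(revived("GET /index.html HTTP/1.1"))   // prints 3
  }
}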

1.1.3 The reduceByKey operator

In LogQuery:
.reduceByKey((c, d) => c.merge(d))
Let's look at reduceByKey in PairRDDFunctions.scala (why this lives in PairRDDFunctions rather than RDD was covered in an earlier post):
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
Inside combineByKeyWithClassTag:
def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

The Aggregator has not been covered in earlier posts, so a short introduction is in order. An Aggregator carries three key functions (a minimal sketch follows the list):
  1. createCombiner: for a new KV pair coming out of map, turns V into a combiner C when the key has not been seen yet
  2. mergeValue: for a new KV pair coming out of map, folds the new V into the existing C when the key already exists
  3. mergeCombiners: when the per-partition results are finally merged in the distributed computation, combines the already-aggregated C of one partition with the already-aggregated C of another partition for the same key
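Here is a minimal, Spark-free sketch of the three roles, reusing the Stats type from the example. The object name and wiring are illustrative only, not Spark's Aggregator class:

object AggregatorRolesSketch {
  class Stats(val count: Int, val numBytes: Int) extends Serializable {
    def merge(other: Stats): Stats = new Stats(count + other.count, numBytes + other.numBytes)
    override def toString: String = s"bytes=$numBytes n=$count"
  }

  // reduceByKey wires these as: createCombiner = identity, mergeValue = mergeCombiners = (c, d) => c.merge(d)
  val createCombiner: Stats => Stats = v => v                           // first value seen for a key
  val mergeValue: (Stats, Stats) => Stats = (c, v) => c.merge(v)        // fold another value into the combiner
  val mergeCombiners: (Stats, Stats) => Stats = (c1, c2) => c1.merge(c2) // merge two per-partition combiners

  def main(args: Array[String]): Unit = {
    val first = createCombiner(new Stats(1, 512))            // key seen for the first time
    val updated = mergeValue(first, new Stats(1, 1024))       // same key seen again in this partition
    val merged = mergeCombiners(updated, new Stats(2, 256))   // combine with another partition's result
    println(merged)                                            // bytes=1792 n=4
  }
}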
In the LogQuery example, mergeValue and mergeCombiners are both (c, d) => c.merge(d), and createCombiner simply passes the Stats value through unchanged. Back to insertAll in ExternalSorter.scala:
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
We can see that when map.changeValue is called, entries with the same key are updated through the update closure:
val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
mergeValue and createCombiner are obtained from the Aggregator. The Aggregator is stored in the ShuffledRDD and in the ShuffleDependency, and the ShuffleDependency is shipped from the Driver to the Executor over RPC, so the Executor can get the Aggregator from the ShuffleDependency and apply the functions it specifies to the KV pairs. Here mergeValue is the driver's (c, d) => c.merge(d), and c is a Stats object:
    class Stats(val count: Int, val numBytes: Int) extends Serializable {
      def merge(other: Stats): Stats = {
        new Stats(count + other.count, numBytes + other.numBytes)
      }
      override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
    }
which ends up calling the Stats.merge method.

2. Summary

  • By deserializing the RDD (not the ShuffledRDD) and walking its Dependency list, the Executor obtains iterator A of the RDD that originally holds the data.
  • The map operator wraps iterator A in a new AbstractIterator; each time A yields an element, the map function line => (extractKey(line), extractStats(line)) is applied and a KV result is returned.
  • The function given to the reduceByKey operator is delivered through the aggregator stored in the ShuffleDependency.
  • The Executor only needs to iterate the AbstractIterator to obtain KV pairs and call the aggregator's functions to combine the values of identical keys, thereby completing the RDD computation defined in the Driver's main function; a compact end-to-end model follows below.
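To tie this together, the following compact, Spark-free model performs the same steps for a couple of toy log lines: iterate the partition, apply the map function lazily, and fold values that share a key, which is what the Aggregator-driven ExternalSorter effectively does. Names like ExecutorPipelineSketch and the "ip bytes" line format are illustrative only:

import scala.collection.mutable

object ExecutorPipelineSketch {
  class Stats(val count: Int, val numBytes: Int) extends Serializable {
    def merge(other: Stats): Stats = new Stats(count + other.count, numBytes + other.numBytes)
    override def toString: String = s"bytes=$numBytes n=$count"
  }

  // Illustrative stand-ins for extractKey/extractStats over "ip bytes" lines.
  def extractKey(line: String): String = line.split(" ")(0)
  def extractStats(line: String): Stats = new Stats(1, line.split(" ")(1).toInt)

  def main(args: Array[String]): Unit = {
    val partition = Seq("10.0.0.1 200", "10.0.0.2 50", "10.0.0.1 300").iterator

    // "map side": lazily turn each line into a (key, Stats) pair.
    val records = partition.map(line => (extractKey(line), extractStats(line)))

    // "combine side": fold values with the same key, like update/changeValue.
    val combined = mutable.Map.empty[String, Stats]
    while (records.hasNext) {
      val (k, v) = records.next()
      combined(k) = combined.get(k).map(_.merge(v)).getOrElse(v)
    }
    combined.foreach { case (k, s) => println(s"$k\t$s") }
    // 10.0.0.1 -> bytes=500 n=2, 10.0.0.2 -> bytes=50 n=1
  }
}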




