Spark源码:如何理解withScope方法

Posted

技术标签:

【中文标题】Spark源码:如何理解withScope方法【英文标题】:Spark Source code: How to understand withScope method 【发布时间】:2016-06-08 00:11:40 【问题描述】:

看不懂withScope方法的功能(其实我也不是很懂RDDOperationScope类的意思)

特别是withScope方法的参数列表中的(body: => T)是什么意思:

private[spark] def withScope[T](
  sc: SparkContext,
  name: String,
  allowNesting: Boolean,
  ignoreParent: Boolean)(body: => T): T = 
// Save the old scope to restore it later
val scopeKey = SparkContext.RDD_SCOPE_KEY
val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
val oldScopeJson = sc.getLocalProperty(scopeKey)
val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
val oldNoOverride = sc.getLocalProperty(noOverrideKey)
try 
  if (ignoreParent) 
    // Ignore all parent settings and scopes and start afresh with our own root scope
    sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
   else if (sc.getLocalProperty(noOverrideKey) == null) 
    // Otherwise, set the scope only if the higher level caller allows us to do so
    sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
  
  // Optionally disallow the child body to override our scope
  if (!allowNesting) 
    sc.setLocalProperty(noOverrideKey, "true")
  
  body
 finally 
  // Remember to restore any state that was modified before exiting
  sc.setLocalProperty(scopeKey, oldScopeJson)
  sc.setLocalProperty(noOverrideKey, oldNoOverride)


您可以通过此链接找到源代码: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

谁能帮帮我?谢谢,我困惑了很久。

【问题讨论】:

这是一个私有方法。方法文档很好地解释了它。您需要非常了解 Spark 内部思想。 谢谢,有什么关于 spark internal 的推荐吗? 【参考方案1】:

以下代码可能对您有所帮助

object TestWithScope 
    def withScope(func: => String) = 
        println("withscope")
        func
    

    def bar(foo: String) = withScope 
        println("Bar: " + foo)
        "BBBB"
    

    def main(args: Array[String]): Unit = 
        println(bar("AAAA"));
    

可能的输出

withscope
Bar: AAAA
BBBB

【讨论】:

这个例子很好地解释了它。为了完整起见,您能否解释一下def withScope(func: => String) = ...:的含义【参考方案2】:

你需要看看 withScope 是如何被调用的。这是 RDD.scala 中的一个示例

     /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope 
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  

基本上它会创建一个新的范围(代码块),这样前一个函数中的变量就不会与当前函数混合。范围的主体是在 withScope 之后传递的内容,在这种情况下是

      
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
      

我还没有到恢复旧范围的地步。

【讨论】:

以上是关于Spark源码:如何理解withScope方法的主要内容,如果未能解决你的问题,请参考以下文章

Spark1.4源码走读笔记之隐式转换

5. spark-2.4.6源码分析(基于yarn cluster模式)- job任务提交Stage划分Stage提交

Spark算子

Spark算子

spark.mllib源码阅读-优化算法2-Updater

深入理解spark-taskScheduler,schedulerBackend源码分析