Spark源码:如何理解withScope方法
Posted
技术标签:
【中文标题】Spark源码:如何理解withScope方法【英文标题】:Spark Source code: How to understand withScope method 【发布时间】:2016-06-08 00:11:40 【问题描述】:看不懂withScope方法的功能(其实我也不是很懂RDDOperationScope类的意思)
特别是withScope方法的参数列表中的(body: => T)是什么意思:
private[spark] def withScope[T](
sc: SparkContext,
name: String,
allowNesting: Boolean,
ignoreParent: Boolean)(body: => T): T =
// Save the old scope to restore it later
val scopeKey = SparkContext.RDD_SCOPE_KEY
val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
val oldScopeJson = sc.getLocalProperty(scopeKey)
val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
val oldNoOverride = sc.getLocalProperty(noOverrideKey)
try
if (ignoreParent)
// Ignore all parent settings and scopes and start afresh with our own root scope
sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
else if (sc.getLocalProperty(noOverrideKey) == null)
// Otherwise, set the scope only if the higher level caller allows us to do so
sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
// Optionally disallow the child body to override our scope
if (!allowNesting)
sc.setLocalProperty(noOverrideKey, "true")
body
finally
// Remember to restore any state that was modified before exiting
sc.setLocalProperty(scopeKey, oldScopeJson)
sc.setLocalProperty(noOverrideKey, oldNoOverride)
您可以通过此链接找到源代码: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
谁能帮帮我?谢谢,我困惑了很久。
【问题讨论】:
这是一个私有方法。方法文档很好地解释了它。您需要非常了解 Spark 内部思想。 谢谢,有什么关于 spark internal 的推荐吗? 【参考方案1】:以下代码可能对您有所帮助
object TestWithScope
def withScope(func: => String) =
println("withscope")
func
def bar(foo: String) = withScope
println("Bar: " + foo)
"BBBB"
def main(args: Array[String]): Unit =
println(bar("AAAA"));
可能的输出
withscope
Bar: AAAA
BBBB
【讨论】:
这个例子很好地解释了它。为了完整起见,您能否解释一下def withScope(func: => String) = ...
中:
的含义【参考方案2】:
你需要看看 withScope 是如何被调用的。这是 RDD.scala 中的一个示例
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
基本上它会创建一个新的范围(代码块),这样前一个函数中的变量就不会与当前函数混合。范围的主体是在 withScope 之后传递的内容,在这种情况下是
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
我还没有到恢复旧范围的地步。
【讨论】:
以上是关于Spark源码:如何理解withScope方法的主要内容,如果未能解决你的问题,请参考以下文章