对同一个 apache Spark RDD 的操作会导致所有语句重新执行

Posted

技术标签:

【中文标题】对同一个 apache Spark RDD 的操作会导致所有语句重新执行【英文标题】:Actions on the same apache Spark RDD cause all statements re-execution 【发布时间】:2016-01-14 17:31:51 【问题描述】:

我正在使用 Apache Spark 处理大量数据。我需要在同一个 RDD 上执行许多 Spark 操作。我的代码如下所示:

val rdd = /* Get the rdd using the SparkContext */
val map1 = rdd.map(/* Some transformation */)
val map2 = map1.map(/* Some other transformation */)
map2.count
val map3 = map2.map(/* More transformation */)
map3.count

问题在于调用第二个动作map3.count 会强制重新执行转换rdd.mapmap1.map

这到底是怎么回事?我认为 Spark 构建的 DAG 是造成这种行为的原因。

【问题讨论】:

您是否有一个最小的工作示例来重现该行为?我在map 中使用println 尝试了一些明显的操作来显示正在执行的操作,但是当我在第二个上调用collect 时,我只得到第二个,而不是第一个。 我已经更正了我的问题,这不是那么准确。明天我将能够给出一个有效的例子。感谢您的帮助。 【参考方案1】:

这是预期的行为。除非可以从缓存中获取其中一个祖先(通常这意味着在 shuffle 期间已显式或隐式地持久化),否则每个操作都将重新计算整个沿袭。

如果 RDD 已被持久化,但数据已丢失/从缓存中删除,或者可用空间量不足以存储所有记录,则也可以触发重新计算。

在这种特殊情况下,您应该按以下顺序cache

...
val map2 = map1.map(/* Some other transformation */)
map2.cache
map2.count
val map3 = map2.map(/* More transformation */)
...

如果您想避免重复评估 rddmap1map2

【讨论】:

我的 spark 作业也面临同样的问题,但在我的情况下,数据量很大,因此可能无法进行缓存。有没有其他选择可以避免数据重新洗牌。提前致谢 @Atul Shuffled 数据将重用 shuffle 文件。请参阅What does “Stage Skipped” mean in Apache Spark web UI? 和与之相关的其他问题。通常也应该可以使用内存和磁盘进行缓存。

以上是关于对同一个 apache Spark RDD 的操作会导致所有语句重新执行的主要内容,如果未能解决你的问题,请参考以下文章

spark的Pair RDD的转化操作

Spark Rdd DataFrame操作汇总

Spark-RDD/DataFrame/DateSet

IDEA Spark Streaming 操作(RDD队列流)

RDD的转换操作,分三种:单value,双value交互,(k,v)对

Apache Spark基础知识