对同一个 apache Spark RDD 的操作会导致所有语句重新执行
Posted
技术标签:
【中文标题】对同一个 apache Spark RDD 的操作会导致所有语句重新执行【英文标题】:Actions on the same apache Spark RDD cause all statements re-execution 【发布时间】:2016-01-14 17:31:51 【问题描述】:我正在使用 Apache Spark 处理大量数据。我需要在同一个 RDD
上执行许多 Spark 操作。我的代码如下所示:
val rdd = /* Get the rdd using the SparkContext */
val map1 = rdd.map(/* Some transformation */)
val map2 = map1.map(/* Some other transformation */)
map2.count
val map3 = map2.map(/* More transformation */)
map3.count
问题在于调用第二个动作map3.count
会强制重新执行转换rdd.map
和map1.map
。
这到底是怎么回事?我认为 Spark 构建的 DAG 是造成这种行为的原因。
【问题讨论】:
您是否有一个最小的工作示例来重现该行为?我在map
中使用println
尝试了一些明显的操作来显示正在执行的操作,但是当我在第二个上调用collect
时,我只得到第二个,而不是第一个。
我已经更正了我的问题,这不是那么准确。明天我将能够给出一个有效的例子。感谢您的帮助。
【参考方案1】:
这是预期的行为。除非可以从缓存中获取其中一个祖先(通常这意味着在 shuffle 期间已显式或隐式地持久化),否则每个操作都将重新计算整个沿袭。
如果 RDD 已被持久化,但数据已丢失/从缓存中删除,或者可用空间量不足以存储所有记录,则也可以触发重新计算。
在这种特殊情况下,您应该按以下顺序cache
...
val map2 = map1.map(/* Some other transformation */)
map2.cache
map2.count
val map3 = map2.map(/* More transformation */)
...
如果您想避免重复评估 rdd
、map1
和 map2
。
【讨论】:
我的 spark 作业也面临同样的问题,但在我的情况下,数据量很大,因此可能无法进行缓存。有没有其他选择可以避免数据重新洗牌。提前致谢 @Atul Shuffled 数据将重用 shuffle 文件。请参阅What does “Stage Skipped” mean in Apache Spark web UI? 和与之相关的其他问题。通常也应该可以使用内存和磁盘进行缓存。以上是关于对同一个 apache Spark RDD 的操作会导致所有语句重新执行的主要内容,如果未能解决你的问题,请参考以下文章
IDEA Spark Streaming 操作(RDD队列流)