带有缓存和操作的奇怪 Spark 行为

Posted

技术标签:

【中文标题】带有缓存和操作的奇怪 Spark 行为【英文标题】:Strange Spark behavior with cache and action 【发布时间】:2017-11-22 01:38:17 【问题描述】:

我一直想知道为什么我在运行某个 spark 作业时会出现奇怪的行为。如果我在缓存 DataFrame 之后或在将数据帧写回 hdfs 之前立即执行操作(A .show(1) 方法),则作业将出错。

这里有一篇与 SO 非常相似的帖子:

Spark SQL SaveMode.Overwrite, getting java.io.FileNotFoundException and requiring 'REFRESH TABLE tableName'。

基本上,另一篇文章解释说,当您从要写入的同一个 HDFS 目录中读取数据时,并且您的 SaveMode"overwrite",那么您将得到一个 java.io.FileNotFoundException

但在这里我发现,只是在程序中移动动作所在的位置会产生非常不同的结果——要么完成程序,要么给出这个例外。

我想知道是否有人可以解释为什么 Spark 在这里不一致?

 val myDF = spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", "\t")
    .schema(schema)
    .load(myPath)

// If I cache it here or persist it then do an action after the cache, it will occasionally 
// not throw the error. This is when completely restarting the SparkSession so there is no
// risk of another user interfering on the same JVM.

      myDF.cache()
      myDF.show(1)

// Just an example.
// Many different transformations are then applied...

val secondDF = mergeOtherDFsWithmyDF(myDF, otherDF, thirdDF)

val fourthDF = mergeTwoDFs(thirdDF, StringToCheck, fifthDF)

// Below is the same .show(1) action call as was previously done, only this below
// action ALWAYS results in a successful completion and the above .show(1) sometimes results
// in FileNotFoundException and sometimes results in successful completion. The only
// thing that changes among test runs is only one is executed. Either
// fourthDF.show(1) or myDF.show(1) is left commented out

fourthDF.show(1)
fourthDF.write
    .mode(writeMode)
    .option("header", "false")
    .option("delimiter", "\t")
    .csv(myPath)

【问题讨论】:

【参考方案1】:

尝试使用count 而不是show(1),我认为问题是由于Spark 试图变得聪明而不是加载整个数据帧(因为show 不需要一切)。运行 count 会强制 Spark 加载并正确缓存所有数据,这有望消除不一致。

【讨论】:

我去看看! 您可以考虑使用foreachPartition(x => ) 而不是count。 可能不会有明显差异,但 count 需要聚合,而 foreachPartition 不需要。 是不是表示可以在缓存后使用计数,让缓存现在真正缓存中间数据?【参考方案2】:

Spark 仅按需实现 rdds,大多数操作需要读取 DF 的所有分区,例如 count(),但 take() 和 first() 等操作不需要所有分区。

在您的情况下,它需要一个分区,因此只有 1 个分区被物化和缓存。然后,当您执行 count() 时,所有分区都需要物化并缓存到可用内存允许的范围内。

【讨论】:

以上是关于带有缓存和操作的奇怪 Spark 行为的主要内容,如果未能解决你的问题,请参考以下文章

带有广播连接的 Spark 流式传输

Spark:MEMORY_ONLY_SER_2 - 奇怪的内存行为

浮动操作按钮滚动奇怪的行为

Firefox 基于 max-age 的缓存过期行为奇怪

gcc 的奇怪行为。带有 和 = 的 C++ 对象定义是不是相等?

Spark RDD基本概念宽窄依赖转换行为操作