Spark 1.6 数据帧缓存无法正常工作

Posted

技术标签:

【中文标题】Spark 1.6 数据帧缓存无法正常工作【英文标题】:Spark 1.6 Dataframe cache not working correctly 【发布时间】:2017-09-05 20:15:50 【问题描述】:

我的理解是,如果我有一个数据帧,如果我缓存()它并触发像 df.take(1) 或 df.count() 这样的操作,它应该计算数据帧并将其保存在内存中,并且每当调用缓存的数据帧时在程序中它使用缓存中已经计算的数据帧。

但这不是我的程序的工作方式。

我有一个如下所示的数据框,我正在缓存它,然后我立即运行df.count 操作。

    val df = inputDataFrame.select().where().withColumn("newcol" , "").cache()

    df.count

当我运行程序时。在 Spark UI 中,我看到第一行运行了 4 分钟,然后 当涉及到第二行时,它再次运行 4 分钟,基本上第一行被重新计算两次?

不应该在第二行触发时计算并缓存第一行吗?

如何解决此问题。我被卡住了,请指教。

【问题讨论】:

【参考方案1】:

我的理解是,如果我有一个数据帧,如果我缓存()它并触发像 df.take(1) 或 df.count() 这样的操作,它应该计算数据帧并将其保存在内存中,

这是不正确的。简单的cachecounttake 也不适用于RDD)是RDD 的有效方法,但Datasets 并非如此,它们使用了更高级的优化。带查询:

df.select(...).where(...).withColumn("newcol" , "").count()

where 子句中未使用的任何列都可以忽略。

有一个重要的discussion on the developer list和引用Sean Owen

我认为正确的答案是“不要那样做”,但如果你真的不得不这样做,你可以触发一个对每个分区不执行任何操作的 Dataset 操作。我认为这会更可靠,因为必须计算整个分区才能使其在实践中可用。或者,甚至遍历每个元素。

翻译成代码:

df.foreach(_ => ())

df.registerAsTempTable("df")
sqlContext.sql("CACHE TABLE df")

这是急切的,但它不再(Spark 2 及更高版本)记录在案,应该避免。

【讨论】:

嗨,Alper,我有一个数据框列表:df1、df2、df3。每个都是 join、groupby 和 transformations 的结果。我一步一步将他们加入到一个单独的df中。当我运行它时,我发现要计算 df 它总是同时计算 df1、df2、df3,这会导致大型磁盘改组使用。我试图强制 Spark 在使用 save/load、cache.count、persist.count 或 df.rdd.count 创建 df 之前实现 df1、df2、df3,但只有第一个解决方案有效。在第一个解决方案中,我需要将数据帧保存到临时位置并加载它们。您还有其他不需要保存/加载的解决方案吗?【参考方案2】:

不,如果您在 DataFrame 上调用 cache,此时它不会被缓存,它只会被“标记”以备将来缓存。实际缓存仅在稍后执行操作时完成。您还可以在 Spark UI 中的“Storage”下查看缓存的 DataFrame

代码中的另一个问题是 DataFrame 上的 count 不会计算整个 DataFrame,因为并非所有列都需要为此计算。您可以使用df.rdd.count() 强制执行整个评估(请参阅How to force DataFrame evaluation in Spark)。

问题是为什么您的第一个操作需要这么长时间,即使没有调用任何操作。我认为这与调用缓存时计算的缓存逻辑(例如大小估计等)有关(参见例如Why is rdd.map(identity).cache slow when rdd items are big?)

【讨论】:

以上是关于Spark 1.6 数据帧缓存无法正常工作的主要内容,如果未能解决你的问题,请参考以下文章

Spark 1.6 在数据帧保持分区字段中加载特定分区

在 2 个数据帧 Spark 中缓存同一张表两次

Spark 数据框无法比较 Null 值

Spark 2.0 将 json 读入带有引号的数据帧中 - 与 spark 1.6 不同的行为......错误?

缓存后正在重新评估 Spark 数据帧

从缓存中删除 spark 数据帧