缓存后正在重新评估 Spark 数据帧

Posted

技术标签:

【中文标题】缓存后正在重新评估 Spark 数据帧【英文标题】:Spark dataframe is being re-evaluated after a cache 【发布时间】:2019-10-04 17:13:32 【问题描述】:

我在 spark 数据帧上使用缓存时遇到了一些问题。我的期望是,在数据帧上缓存之后,数据帧会在第一次需要时被创建和缓存。对数据帧的任何进一步调用都应该来自缓存

这是我的代码:

val mydf = spark.sql("read about 400 columns from a hive table").
  withColumn ("newcol", someudf("existingcol")).
  cache()

为了测试,我运行了 mydf.count() 两次。我希望第一次需要一些时间,因为数据正在被缓存。但是第二次应该是瞬时的吧?

我实际看到的是,这两个计数都需要相同的时间。第一个很快就回来了,我认为这告诉我数据没有被缓存。如果我删除代码的 withColumn 部分并只缓存原始数据,第二个计数是瞬时的

我做错了吗?如何从配置单元加载原始数据,添加列,然后缓存数据帧以供进一步使用?使用火花 2.3

任何帮助都会很棒!

【问题讨论】:

您能否检查一下explainPlan() 和Spark UI 的存储选项卡,看看数据是否真的被持久化了? 【参考方案1】:

您的情况的问题是 mydf.count() 实际上并没有具体化数据框(即,并非所有列都被读取,您的 udf 将不会被调用)。那是因为count() 是高度优化的。

为确保将整个数据帧缓存到内存中,您应该使用 mydf.rdd.count() 或其他查询(例如使用排序和/或聚合)重复您的实验

参见例如this SO question

【讨论】:

这并没有什么不同。我也尝试运行 agg,多个查询的响应时间保持不变。我可以让它快速工作的唯一方法是删除 withColumn,但我需要它【参考方案2】:

在缓存数据集/数据帧时,请查看documented default behavior:

def cache(): Dataset.this.type

使用默认存储级别 (MEMORY_AND_DISK) 保留此数据集。

所以你可以试试persist(MEMORY_ONLY)

def persist(newLevel: StorageLevel): Dataset.this.type

使用给定的存储级别保留此数据集。

newLevel 之一:MEMORY_ONLYMEMORY_AND_DISKMEMORY_ONLY_SERMEMORY_AND_DISK_SERDISK_ONLYMEMORY_ONLY_2MEMORY_AND_DISK_2

【讨论】:

【参考方案3】:

如果相关

.cache/persist 是惰性求值,要强制它,你可以使用 spark SQL 的 API,它具有从惰性到渴望的能力变化。

CACHE [ LAZY ] TABLE table_identifier
    [ OPTIONS ( 'storageLevel' [ = ] value ) ] [ [ AS ] query ]

除非 LAZY 指定它是 Eager 模式,否则您需要在此之前注册一个临时表。

伪代码是:

df.createOrReplaceTempView("dummyTbl")
spark.sql("cache table dummyTbl")

有关文档参考的更多信息 - https://spark.apache.org/docs/latest/sql-ref-syntax-aux-cache-cache-table.html

【讨论】:

以上是关于缓存后正在重新评估 Spark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

Spark 2.0+ 即使数据帧被缓存,如果它的源之一发生变化,它会重新计算吗?

Spark 1.6 数据帧缓存无法正常工作

从缓存中删除 spark 数据帧

在内存中缓存 Spark 数据帧是不是有额外的开销?

Apache Spark:广播连接不适用于缓存的数据帧

将大量 Spark 数据帧合并为一个