缓存后正在重新评估 Spark 数据帧
Posted
技术标签:
【中文标题】缓存后正在重新评估 Spark 数据帧【英文标题】:Spark dataframe is being re-evaluated after a cache 【发布时间】:2019-10-04 17:13:32 【问题描述】:我在 spark 数据帧上使用缓存时遇到了一些问题。我的期望是,在数据帧上缓存之后,数据帧会在第一次需要时被创建和缓存。对数据帧的任何进一步调用都应该来自缓存
这是我的代码:
val mydf = spark.sql("read about 400 columns from a hive table").
withColumn ("newcol", someudf("existingcol")).
cache()
为了测试,我运行了 mydf.count() 两次。我希望第一次需要一些时间,因为数据正在被缓存。但是第二次应该是瞬时的吧?
我实际看到的是,这两个计数都需要相同的时间。第一个很快就回来了,我认为这告诉我数据没有被缓存。如果我删除代码的 withColumn 部分并只缓存原始数据,第二个计数是瞬时的
我做错了吗?如何从配置单元加载原始数据,添加列,然后缓存数据帧以供进一步使用?使用火花 2.3
任何帮助都会很棒!
【问题讨论】:
您能否检查一下explainPlan() 和Spark UI 的存储选项卡,看看数据是否真的被持久化了? 【参考方案1】:您的情况的问题是 mydf.count()
实际上并没有具体化数据框(即,并非所有列都被读取,您的 udf 将不会被调用)。那是因为count()
是高度优化的。
为确保将整个数据帧缓存到内存中,您应该使用 mydf.rdd.count()
或其他查询(例如使用排序和/或聚合)重复您的实验
参见例如this SO question
【讨论】:
这并没有什么不同。我也尝试运行 agg,多个查询的响应时间保持不变。我可以让它快速工作的唯一方法是删除 withColumn,但我需要它【参考方案2】:在缓存数据集/数据帧时,请查看documented default behavior:
def cache(): Dataset.this.type
使用默认存储级别 (
MEMORY_AND_DISK
) 保留此数据集。
所以你可以试试persist(MEMORY_ONLY)
def persist(newLevel: StorageLevel): Dataset.this.type
使用给定的存储级别保留此数据集。
newLevel
之一:MEMORY_ONLY
、MEMORY_AND_DISK
、MEMORY_ONLY_SER
、MEMORY_AND_DISK_SER
、DISK_ONLY
、MEMORY_ONLY_2
、MEMORY_AND_DISK_2
等
【讨论】:
【参考方案3】:如果相关
.cache/persist
是惰性求值,要强制它,你可以使用 spark SQL 的 API,它具有从惰性到渴望的能力变化。
CACHE [ LAZY ] TABLE table_identifier
[ OPTIONS ( 'storageLevel' [ = ] value ) ] [ [ AS ] query ]
除非 LAZY 指定它是 Eager 模式,否则您需要在此之前注册一个临时表。
伪代码是:
df.createOrReplaceTempView("dummyTbl")
spark.sql("cache table dummyTbl")
有关文档参考的更多信息 - https://spark.apache.org/docs/latest/sql-ref-syntax-aux-cache-cache-table.html
【讨论】:
以上是关于缓存后正在重新评估 Spark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章