当我迭代地重复使用旧的缓存数据时,Spark Dataframe突然变得非常慢
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了当我迭代地重复使用旧的缓存数据时,Spark Dataframe突然变得非常慢相关的知识,希望对你有一定的参考价值。
当我尝试将缓存结果保存在List中并尝试通过每次迭代中最后一个列表中的所有数据计算新DataFrame时,问题就发生了。但是,即使我使用空的DataFrame并且每次都得到一个空的结果,该函数在大约8~12轮之后会突然变得很慢。
这是我的代码
testLoop(Nil)
def testLoop(lastDfList:List[DataFrame]){
// do some dummy transformation like union and cache the result
val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache
// always get 0, of course
println(resultDf.count)
// benchmark action
benchmark(resultDf.count)
testLoop(resultDf::lastDfList)
}
基准测试结果
1~6 round : < 200ms
7 round : 367ms
8 round : 918ms
9 round : 2476ms
10 round : 7833ms
11 round : 24231ms
我不认为GC或Block驱逐是我的问题,因为我已经使用了一个空的DataFrame,但我不知道是什么原因?我是否误解了缓存或其他什么的含义?
谢谢!
在阅读ImDarrenG的解决方案后,我将我的代码更改为以下内容:
spark.sparkContext.setCheckpointDir("/tmp")
testLoop(Nil)
def testLoop(lastDfList:List[DataFrame]){
// do some dummy transformation like union and cache the result
val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache
resultDf.checkpoint()
// always get 0, of course
println(resultDf.count)
// benchmark action
benchmark(resultDf.count)
testLoop(resultDf::lastDfList)
}
但经过几次迭代后它仍然变得很慢。
在这里,您可以通过将DataFrames
添加到resultDf
的开头来创建lastDfList
列表,并将其传递给下一个testLoop
迭代:
testLoop(resultDf::lastDfList)
所以lastDfList
每次传球都会变长。
这条线由DataFrame
ing union
的每个成员创建一个新的lastDfList
:
val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf))}.cache
lastDfList
的每个成员都是它的前辈的联盟,因此,Spark保持一个谱系,随着testLoop
的每次传递变得指数级。
我预计时间的增加是由发展议程集团的内务管理造成的。缓存数据帧不需要重复转换,但仍然必须通过spark维护谱系。
缓存数据或不,看起来你正在构建一个非常复杂的DAG,将每个DataFrame
与testLoop
的每次传递的所有前辈联合起来。
您可以使用checkpoint
修剪谱系,并引入一些检查以防止无限递归。
根据API和code,checkpoint
将返回一个新的数据集,而不是更改原始数据集。
以上是关于当我迭代地重复使用旧的缓存数据时,Spark Dataframe突然变得非常慢的主要内容,如果未能解决你的问题,请参考以下文章