你应该总是在中间计数之前缓存一个 RDD 吗?

Posted

技术标签:

【中文标题】你应该总是在中间计数之前缓存一个 RDD 吗?【英文标题】:Should you always cache an RDD before an intermediate count? 【发布时间】:2017-11-21 21:30:32 【问题描述】:

我想在开始和结束转换之间记录 RDD 中的行数。我的代码目前如下所示:

val transformation1 = firstTransformation(inputdata).cache  // Is this cache recommended or can I remove it?
log("Transformation1 count: " + tranformation1.count)
val tranformation2 = secondTransformation(transformation1).cache
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = tranformation2.filter(row => row.contains("y"))

我的问题是 transformation1 是一个巨大的 RDD 并且占用了大量内存(它适合内存但稍后会导致内存问题)。但是,我知道,由于我在 transformation1(.count()secondTransformation())上执行 2 种不同的操作,因此通常建议将其缓存。

这种情况可能很常见,那么推荐的处理方法是什么?您应该始终在中间计数之前缓存 RDD,还是可以删除 transformation1 上的 .cache()

【问题讨论】:

【参考方案1】:

如果您遇到内存问题,应尽快取消持久化,也可以在磁盘上持久化。

val transformation1 = firstTransformation(inputdata).persist(StorageLevel.DISK_ONLY)  // Is this cache recommended or can I remove it?
log("Transformation1 count: " + tranformation1.count)
val tranformation2 = secondTransformation(transformation1).persist(StorageLevel.DISK_ONLY)
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = tranformation2.filter(row => row.contains("y"))
// All the actions are done
transformation1.unpersist()
transformation2.unpersist()

如果您可以在内存问题发生之前使用 unpersist,那么如果您缓存而不是持久存储在磁盘上会更好

【讨论】:

取消持久化时,Spark 崩溃并显示Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout 这是一个不同的问题,看看***.com/questions/41123846/…

以上是关于你应该总是在中间计数之前缓存一个 RDD 吗?的主要内容,如果未能解决你的问题,请参考以下文章

我们可以使用SizeEstimator.estimate估算RDD / DataFrame的大小吗?

使用 Pyspark 从单词列表的行条目创建元组并使用 RDD 计数

如何在 Spark Scala 中的 Schema RDD [从案例类中创建] 中查找重复项以及相应的重复计数?

洛谷——P4017 最大食物链计数

重置窗口函数的计数器

我应该总是为失败的初始化方法释放自我吗?