你应该总是在中间计数之前缓存一个 RDD 吗?
Posted
技术标签:
【中文标题】你应该总是在中间计数之前缓存一个 RDD 吗?【英文标题】:Should you always cache an RDD before an intermediate count? 【发布时间】:2017-11-21 21:30:32 【问题描述】:我想在开始和结束转换之间记录 RDD 中的行数。我的代码目前如下所示:
val transformation1 = firstTransformation(inputdata).cache // Is this cache recommended or can I remove it?
log("Transformation1 count: " + tranformation1.count)
val tranformation2 = secondTransformation(transformation1).cache
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = tranformation2.filter(row => row.contains("y"))
我的问题是 transformation1
是一个巨大的 RDD 并且占用了大量内存(它适合内存但稍后会导致内存问题)。但是,我知道,由于我在 transformation1(.count()
和 secondTransformation()
)上执行 2 种不同的操作,因此通常建议将其缓存。
这种情况可能很常见,那么推荐的处理方法是什么?您应该始终在中间计数之前缓存 RDD,还是可以删除 transformation1 上的 .cache()
?
【问题讨论】:
【参考方案1】:如果您遇到内存问题,应尽快取消持久化,也可以在磁盘上持久化。
val transformation1 = firstTransformation(inputdata).persist(StorageLevel.DISK_ONLY) // Is this cache recommended or can I remove it?
log("Transformation1 count: " + tranformation1.count)
val tranformation2 = secondTransformation(transformation1).persist(StorageLevel.DISK_ONLY)
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = tranformation2.filter(row => row.contains("y"))
// All the actions are done
transformation1.unpersist()
transformation2.unpersist()
如果您可以在内存问题发生之前使用 unpersist,那么如果您缓存而不是持久存储在磁盘上会更好
【讨论】:
取消持久化时,Spark 崩溃并显示Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
这是一个不同的问题,看看***.com/questions/41123846/…以上是关于你应该总是在中间计数之前缓存一个 RDD 吗?的主要内容,如果未能解决你的问题,请参考以下文章
我们可以使用SizeEstimator.estimate估算RDD / DataFrame的大小吗?
使用 Pyspark 从单词列表的行条目创建元组并使用 RDD 计数