Spark:持久化和重新分区顺序
Posted
技术标签:
【中文标题】Spark:持久化和重新分区顺序【英文标题】:Spark: persist and repartition order 【发布时间】:2016-02-14 11:09:12 【问题描述】:我有以下代码:
val data = input.map... .persist(StorageLevel.MEMORY_ONLY_SER).repartition(2000)
我想知道如果我先进行重新分区有什么区别:
val data = input.map... .repartition(2000).persist(StorageLevel.MEMORY_ONLY_SER)
调用reparation和persist的顺序有区别吗?谢谢!
【问题讨论】:
【参考方案1】:是的,有区别。
在第一种情况下,您会在 map 阶段后获得持久 RDD。这意味着每次访问data
都会触发repartition
。
在第二种情况下,您在重新分区后缓存。当data
被访问并且之前已经物化时,没有额外的工作要做。
为了证明让我们做一个实验:
import org.apache.spark.storage.StorageLevel
val data1 = sc.parallelize(1 to 10, 8)
.map(identity)
.persist(StorageLevel.MEMORY_ONLY_SER)
.repartition(2000)
data1.count()
val data2 = sc.parallelize(1 to 10, 8)
.map(identity)
.repartition(2000)
.persist(StorageLevel.MEMORY_ONLY_SER)
data2.count()
并查看存储信息:
sc.getRDDStorageInfo
// Array[org.apache.spark.storage.RDDInfo] = Array(
// RDD "MapPartitionsRDD" (17) StorageLevel:
// StorageLevel(false, true, false, false, 1);
// CachedPartitions: 2000; TotalPartitions: 2000; MemorySize: 8.6 KB;
// ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B,
// RDD "MapPartitionsRDD" (7) StorageLevel:
// StorageLevel(false, true, false, false, 1);
// CachedPartitions: 8; TotalPartitions: 8; MemorySize: 668.0 B;
// ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)
如您所见,有两个持久化 RDD,一个有 2000 个分区,一个有 8 个。
【讨论】:
我迟到了两年才发表评论。但 data1.getNumPartitions 和 data2.getNumPartitions 都返回 2000 @hadooper 它应该。中间对象不同,而不是最终对象。 你能解释一下中间对象吗? @hadooper 在一个场景中,Spark 将 RDD 保存在 8 个分区中,而在另一种场景中,首先进行洗牌,然后将 RDD 保存在 2000 个分区中。如果在第二种方法之后需要 2000 个分区更好,因为 shuffle 执行一次(在持久化之前)。 @hadooper 保留的分区数(8 vs 2000)以上是关于Spark:持久化和重新分区顺序的主要内容,如果未能解决你的问题,请参考以下文章