Spark:持久化和重新分区顺序

Posted

技术标签:

【中文标题】Spark:持久化和重新分区顺序【英文标题】:Spark: persist and repartition order 【发布时间】:2016-02-14 11:09:12 【问题描述】:

我有以下代码:

val data = input.map... .persist(StorageLevel.MEMORY_ONLY_SER).repartition(2000)

我想知道如果我先进行重新分区有什么区别:

val data = input.map... .repartition(2000).persist(StorageLevel.MEMORY_ONLY_SER)

调用reparation和persist的顺序有区别吗?谢谢!

【问题讨论】:

【参考方案1】:

是的,有区别。

在第一种情况下,您会在 map 阶段后获得持久 RDD。这意味着每次访问data都会触发repartition

在第二种情况下,您在重新分区后缓存。当data 被访问并且之前已经物化时,没有额外的工作要做。

为了证明让我们做一个实验:

import  org.apache.spark.storage.StorageLevel

val data1 = sc.parallelize(1 to 10, 8)
  .map(identity)
  .persist(StorageLevel.MEMORY_ONLY_SER)
  .repartition(2000)
data1.count()

val data2 = sc.parallelize(1 to 10, 8)
  .map(identity)
  .repartition(2000)
  .persist(StorageLevel.MEMORY_ONLY_SER)
data2.count()

并查看存储信息:

sc.getRDDStorageInfo

// Array[org.apache.spark.storage.RDDInfo] = Array(
//   RDD "MapPartitionsRDD" (17) StorageLevel:
//       StorageLevel(false, true, false, false, 1);
//     CachedPartitions: 2000; TotalPartitions: 2000; MemorySize: 8.6 KB; 
//     ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B,
//   RDD "MapPartitionsRDD" (7) StorageLevel:
//      StorageLevel(false, true, false, false, 1);
//    CachedPartitions: 8; TotalPartitions: 8; MemorySize: 668.0 B; 
//    ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)

如您所见,有两个持久化 RDD,一个有 2000 个分区,一个有 8 个。

【讨论】:

我迟到了两年才发表评论。但 data1.getNumPartitions 和 data2.getNumPartitions 都返回 2000 @hadooper 它应该。中间对象不同,而不是最终对象。 你能解释一下中间对象吗? @hadooper 在一个场景中,Spark 将 RDD 保存在 8 个分区中,而在另一种场景中,首先进行洗牌,然后将 RDD 保存在 2000 个分区中。如果在第二种方法之后需要 2000 个分区更好,因为 shuffle 执行一次(在持久化之前)。 @hadooper 保留的分区数(8 vs 2000)

以上是关于Spark:持久化和重新分区顺序的主要内容,如果未能解决你的问题,请参考以下文章

spark-partitionBy

spark中的分区和自定义分区器中的重新分区和排序给出数组越界异常

Spark重新分区不均匀分布记录

Spark DataFrame重新分区:未保留的分区数

我们如何在 Apache Spark 中执行动态重新分区?

Spark中的最佳重新分区方式