在每个分区内具有共享指针的 Spark RDD(以及幻数 200??)

Posted

技术标签:

【中文标题】在每个分区内具有共享指针的 Spark RDD(以及幻数 200??)【英文标题】:Spark RDDs with shared pointers within each partition (and the magic number 200??) 【发布时间】:2014-11-22 16:35:11 【问题描述】:

我正在尝试持久化一个 spark RDD,其中每个分区的元素都共享对单个大对象的访问。但是,该对象似乎多次存储在内存中。将我的问题简化为只有 200 个元素的单个分区的玩具箱:

val nElements = 200
class Elem(val s:Array[Int])

val rdd = sc.parallelize(Seq(1)).mapPartitions( _ => 
    val sharedArray = Array.ofDim[Int](10000000) // Should require ~40MB
    (1 to nElements).toIterator.map(i => new Elem(sharedArray))
).cache()

rdd.count() //force computation    

这会消耗预期的内存量,如日志所示:

storage.MemoryStore:块 rdd_1_0 作为值存储在内存中(估计大小 38.2 MB,空闲 5.7 GB)

但是,200 是最大元素数。设置nElements=201 产生:

storage.MemoryStore:块 rdd_1_0 作为值存储在内存中(估计大小 76.7 MB,空闲 5.7 GB)

这是什么原因造成的?这个神奇的数字 200 是从哪里来的,如何增加呢?


编辑澄清

在函数中添加一个 println 表明它确实只被调用了一次。此外,运行:

rdd.map(_.s.hashCode).min == rdd.map(_.s.hashCode).max  // returns true

..显示所有 10000000 个元素确实都指向同一个对象,因此数据结构基本上表现正确。当 nExamples 大得多(例如 20000)时,问题就出现了,因此它不能持续存在。

storage.MemoryStore:没有足够的空间在内存中缓存 rdd_1_0! (目前计算为 6.1 GB)

当我设置 nExamples=500 时,它成功地将 rdd 保存在内存中,说 estimated size 1907.4 MB,但我可以看到我的内存使用量的实际增加远低于此。

【问题讨论】:

你试过设置preservesPartitioning = true吗?是不是一样的行为? 试一试,是的。没有区别。 你能把println放在mapPartitions的函数中吗?我想知道它是否会被调用两次。另外,rdd.foreach(x => println(x)) 是什么样的?每个元素的指针相同? 该函数只被调用一次。是的,每个元素都有相同的指针。 似乎数据结构正在按其应有的方式运行,因此这根本不是问题,只是随着 nElements 变得更大(如我所愿),spark 失去了坚持的能力。相反,它只是抱怨:没有足够的空间在内存中缓存 rdd_1_0! (目前计算为 6.1 GB) 【参考方案1】:

对于将来遇到此问题的任何人,我最终想出了一个超级 hacky 解决方案(尽管我仍然很高兴听到更好的解决方案)。我没有使用 rdd.cache(),而是定义:

def cached[T: ClassTag](rdd:RDD[T]) = 
    rdd.mapPartitions(p => 
        Iterator(p.toSeq)
    ).cache().mapPartitions(p =>
        p.next().toIterator
    )

这样cached(rdd) 返回一个从“缓存”列表生成的 RDD

【讨论】:

以上是关于在每个分区内具有共享指针的 Spark RDD(以及幻数 200??)的主要内容,如果未能解决你的问题,请参考以下文章

Spark面试题

入门大数据---Spark_RDD

Spark02

Spark:查找 RDD 的每个分区大小

spark系列之基本概念

Spark RDD 核心总结