在每个分区内具有共享指针的 Spark RDD(以及幻数 200??)
Posted
技术标签:
【中文标题】在每个分区内具有共享指针的 Spark RDD(以及幻数 200??)【英文标题】:Spark RDDs with shared pointers within each partition (and the magic number 200??) 【发布时间】:2014-11-22 16:35:11 【问题描述】:我正在尝试持久化一个 spark RDD,其中每个分区的元素都共享对单个大对象的访问。但是,该对象似乎多次存储在内存中。将我的问题简化为只有 200 个元素的单个分区的玩具箱:
val nElements = 200
class Elem(val s:Array[Int])
val rdd = sc.parallelize(Seq(1)).mapPartitions( _ =>
val sharedArray = Array.ofDim[Int](10000000) // Should require ~40MB
(1 to nElements).toIterator.map(i => new Elem(sharedArray))
).cache()
rdd.count() //force computation
这会消耗预期的内存量,如日志所示:
storage.MemoryStore:块 rdd_1_0 作为值存储在内存中(估计大小 38.2 MB,空闲 5.7 GB)
但是,200 是最大元素数。设置nElements=201
产生:
storage.MemoryStore:块 rdd_1_0 作为值存储在内存中(估计大小 76.7 MB,空闲 5.7 GB)
这是什么原因造成的?这个神奇的数字 200 是从哪里来的,如何增加呢?
编辑澄清:
在函数中添加一个 println 表明它确实只被调用了一次。此外,运行:
rdd.map(_.s.hashCode).min == rdd.map(_.s.hashCode).max // returns true
..显示所有 10000000 个元素确实都指向同一个对象,因此数据结构基本上表现正确。当 nExamples 大得多(例如 20000)时,问题就出现了,因此它不能持续存在。
storage.MemoryStore:没有足够的空间在内存中缓存 rdd_1_0! (目前计算为 6.1 GB)
当我设置 nExamples=500
时,它成功地将 rdd 保存在内存中,说 estimated size 1907.4 MB,但我可以看到我的内存使用量的实际增加远低于此。
【问题讨论】:
你试过设置preservesPartitioning = true
吗?是不是一样的行为?
试一试,是的。没有区别。
你能把println
放在mapPartitions
的函数中吗?我想知道它是否会被调用两次。另外,rdd.foreach(x => println(x))
是什么样的?每个元素的指针相同?
该函数只被调用一次。是的,每个元素都有相同的指针。
似乎数据结构正在按其应有的方式运行,因此这根本不是问题,只是随着 nElements 变得更大(如我所愿),spark 失去了坚持的能力。相反,它只是抱怨:没有足够的空间在内存中缓存 rdd_1_0! (目前计算为 6.1 GB)
【参考方案1】:
对于将来遇到此问题的任何人,我最终想出了一个超级 hacky 解决方案(尽管我仍然很高兴听到更好的解决方案)。我没有使用 rdd.cache(),而是定义:
def cached[T: ClassTag](rdd:RDD[T]) =
rdd.mapPartitions(p =>
Iterator(p.toSeq)
).cache().mapPartitions(p =>
p.next().toIterator
)
这样cached(rdd)
返回一个从“缓存”列表生成的 RDD
【讨论】:
以上是关于在每个分区内具有共享指针的 Spark RDD(以及幻数 200??)的主要内容,如果未能解决你的问题,请参考以下文章