Spark中将一个RDD严格划分为多个RDD
Posted
技术标签:
【中文标题】Spark中将一个RDD严格划分为多个RDD【英文标题】:Strict partition an RDD into multiple RDDs in Spark 【发布时间】:2016-03-05 00:26:36 【问题描述】:我有一个带有 n
分区的 rdd,我想以这样的方式将此 rdd 拆分为 k
rdds
rdd = rdd_1.union(rdd_2).union(rdd_3)...union(rdd_k)
例如,如果n=10
和k=2
我想最终得到 2 个 rdds,其中 rdd1 由 5 个分区组成,而 rdd2 由其他 5 个分区组成。
在 Spark 中最有效的方法是什么?
【问题讨论】:
证明更多关于你想要达到的目标的信息可能会给你一个更有帮助的答案。例如。如果你想平衡你的分区大小,你可以使用repartition
。除非您也拥有自己的分区程序,否则我看不出每个分区有一个 RDD 将如何服务于任何目的。另请注意,有许多函数可以使用分区索引,因此您可以只 return
处理无效分区。最后但同样重要的是,如果您的分区有逻辑拆分,也可以使用groupBy
。
【参考方案1】:
你可以试试这样的:
val rdd: RDD[T] = ???
val k: Integer = ???
val n = rdd.partitions.size
val rdds = (0 until n) // Create Seq of partitions numbers
.grouped(n / k) // group it into fixed sized buckets
.map(idxs => (idxs.head, idxs.last)) // Take the first and the last idx
.map
case(min, max) => rdd.mapPartitionsWithIndex(
// If partition in [min, max] range keep its iterator
// otherwise return empty-one
(i, iter) => if (i >= min & i <= max) iter else Iterator()
)
如果输入 RDD
具有复杂的依赖关系,则应在应用之前对其进行缓存。
【讨论】:
您忘记了“并行化”吗?到目前为止它只在 Seq 上运行 @tribbloid 我没有。这里唯一的 RDD 是输入的 (rdd
)。 parallelize
这里没有位置。以上是关于Spark中将一个RDD严格划分为多个RDD的主要内容,如果未能解决你的问题,请参考以下文章