Pyspark:重新分区与分区
Posted
技术标签:
【中文标题】Pyspark:重新分区与分区【英文标题】:Pyspark: repartition vs partitionBy 【发布时间】:2016-02-23 05:29:29 【问题描述】:我现在正在研究这两个概念,并希望了解一下。通过命令行工作,我一直在尝试找出差异以及开发人员何时会使用 repartition 与 partitionBy。
这里是一些示例代码:
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c',1), ('ef',5)])
rdd1 = rdd.repartition(4)
rdd2 = rdd.partitionBy(4)
rdd1.glom().collect()
[[('b', 1), ('ef', 5)], [], [], [('a', 1), ('a', 2), ('b', 3), ('c', 1)]]
rdd2.glom().collect()
[[('a', 1), ('a', 2)], [], [('c', 1)], [('b', 1), ('b', 3), ('ef', 5)]]
我查看了两者的实现,我注意到的唯一区别是partitionBy 可以采用分区函数,或者默认使用portable_hash。所以在 partitionBy 中,所有相同的键都应该在同一个分区中。在重新分区中,我希望这些值在分区上分布得更均匀,但事实并非如此。
鉴于此,为什么有人会使用重新分区?我想我唯一能看到它被使用的情况是我没有使用 PairRDD,或者我有很大的数据偏差?
我有什么遗漏的吗,或者有人可以从不同的角度为我提供启示吗?
【问题讨论】:
【参考方案1】:repartition()
用于指定分区数,考虑核心数和您拥有的数据量。
partitionBy()
用于让shuffle函数更高效,如reduceByKey()
、join()
、cogroup()
等。它只在一个RDD被多次使用的情况下有用,所以通常是其次是persist()
。
两者在行动上的区别:
pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x))
pairs.partitionBy(3).glom().collect()
[[(3, 3), (6, 6), (6, 6)],
[(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)],
[(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]]
pairs.repartition(3).glom().collect()
[[(4, 4), (2, 2), (6, 6), (7, 7), (5, 5), (5, 5)],
[(1, 1), (4, 4), (6, 6), (4, 4)],
[(2, 2), (3, 3), (1, 1), (5, 5), (7, 7)]]
【讨论】:
【参考方案2】:repartition
已经存在于 RDD 中,并且不处理键分区(或除排序之外的任何其他标准)。现在 PairRDDs 添加了键的概念,随后添加了另一种允许按该键进行分区的方法。
所以是的,如果您的数据是键控的,那么您绝对应该按该键进行分区,这在许多情况下首先是使用 PairRDD(对于连接、reduceByKey 等)。
【讨论】:
重新分区不能将元素均匀分布在分区之间的原因是什么?这可能是因为我没有足够的数据,而且我们遇到了小样本问题吗? 好问题,我在尝试(在 Scala 中)时看到分布均匀。 @JoeWiden 只不过是一个简单的概率。repartition
实际上是通过在现有值中添加随机键在内部使用 pair RDD,因此它不能提供关于输出数据分布的强有力保证。顺便说一句,您可能应该接受答案。
@MariusSoutier 实际上,Spark 中的 any 重新分区是使用对 RDD 处理的。如果需要,Spark 只需添加虚拟键或虚拟值即可使其工作。以上是关于Pyspark:重新分区与分区的主要内容,如果未能解决你的问题,请参考以下文章
重新分区 pyspark 数据帧失败以及如何避免初始分区大小