Pyspark:重新分区与分区

Posted

技术标签:

【中文标题】Pyspark:重新分区与分区【英文标题】:Pyspark: repartition vs partitionBy 【发布时间】:2016-02-23 05:29:29 【问题描述】:

我现在正在研究这两个概念,并希望了解一下。通过命令行工作,我一直在尝试找出差异以及开发人员何时会使用 repartition 与 partitionBy。

这里是一些示例代码:

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c',1), ('ef',5)])
rdd1 = rdd.repartition(4)
rdd2 = rdd.partitionBy(4)

rdd1.glom().collect()
[[('b', 1), ('ef', 5)], [], [], [('a', 1), ('a', 2), ('b', 3), ('c', 1)]]

rdd2.glom().collect()
[[('a', 1), ('a', 2)], [], [('c', 1)], [('b', 1), ('b', 3), ('ef', 5)]]

我查看了两者的实现,我注意到的唯一区别是partitionBy 可以采用分区函数,或者默认使用portable_hash。所以在 partitionBy 中,所有相同的键都应该在同一个分区中。在重新分区中,我希望这些值在分区上分布得更均匀,但事实并非如此。

鉴于此,为什么有人会使用重新分区?我想我唯一能看到它被使用的情况是我没有使用 PairRDD,或者我有很大的数据偏差?

我有什么遗漏的吗,或者有人可以从不同的角度为我提供启示吗?

【问题讨论】:

【参考方案1】:

repartition() 用于指定分区数,考虑核心数和您拥有的数据量。

partitionBy()用于让shuffle函数更高效,如reduceByKey()join()cogroup()等。它只在一个RDD被多次使用的情况下有用,所以通常是其次是persist()

两者在行动上的区别:

pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x))

pairs.partitionBy(3).glom().collect()
[[(3, 3), (6, 6), (6, 6)],
 [(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)],
 [(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]]

pairs.repartition(3).glom().collect()
[[(4, 4), (2, 2), (6, 6), (7, 7), (5, 5), (5, 5)],
 [(1, 1), (4, 4), (6, 6), (4, 4)],
 [(2, 2), (3, 3), (1, 1), (5, 5), (7, 7)]]

【讨论】:

【参考方案2】:

repartition 已经存在于 RDD 中,并且不处理键分区(或除排序之外的任何其他标准)。现在 PairRDDs 添加了键的概念,随后添加了另一种允许按该键进行分区的方法。

所以是的,如果您的数据是键控的,那么您绝对应该按该键进行分区,这在许多情况下首先是使用 PairRDD(对于连接、reduceByKey 等)。

【讨论】:

重新分区不能将元素均匀分布在分区之间的原因是什么?这可能是因为我没有足够的数据,而且我们遇到了小样本问题吗? 好问题,我在尝试(在 Scala 中)时看到分布均匀。 @JoeWiden 只不过是一个简单的概率。 repartition 实际上是通过在现有值中添加随机键在内部使用 pair RDD,因此它不能提供关于输出数据分布的强有力保证。顺便说一句,您可能应该接受答案。 @MariusSoutier 实际上,Spark 中的 any 重新分区是使用对 RDD 处理的。如果需要,Spark 只需添加虚拟键或虚拟值即可使其工作。

以上是关于Pyspark:重新分区与分区的主要内容,如果未能解决你的问题,请参考以下文章

pyspark:重新分区后出现“太多值”错误

PySpark 根据特定列重新分区

重新分区 pyspark 数据帧失败以及如何避免初始分区大小

Pyspark 数据帧重新分区将所有数据放在一个分区中

让 PySpark 每列值输出一个文件(重新分区/分区不工作)

PySpark 重新分区 RDD 元素