重新分区分区数据

Posted

技术标签:

【中文标题】重新分区分区数据【英文标题】:Repartitioning partitioned data 【发布时间】:2016-03-31 04:15:27 【问题描述】:

我正在处理一个倾斜的数据问题,例如我的最小分区低于 64MB,而我最大的分区可能大于 1GB。我一直在考虑将几个小分区映射到同一个分区键的策略,从而创建一个由分区组成的分区。这一切都是为了减少任务大小的差异以及存储在磁盘上的文件数量。

在我的 Spark 应用程序的某一时刻,我需要对(未分组的)原始分区进行操作,为此,需要按原始键重新分区。这让我想到了我的问题:

假设我有两个数据集,如下所示。每行都是 (partition_key, (original_key, data)) 形式的元组。在 data0 中可以看到 original_key = 0 在自己的节点上,而 original_key = 4 和 original_key = 5 一起在包含 partition_key = 3 的节点上。在 data1 ,事情没有那么有条理。

如果data0先按partition_key分区,再按original_key分区,会不会出现shuffle?换句话说,在第二个 partitionBy 调用期间 data0 是否比 data1 更有条理?

data0 = [
    (0, (0, 'a')),
    (0, (0, 'b')),
    (0, (0, 'c')),
    (1, (1, 'd')),
    (1, (1, 'e')),
    (1, (2, 'f')),
    (1, (2, 'g')),
    (2, (3, 'h')),
    (2, (3, 'i')),
    (2, (3, 'j')),
    (3, (4, 'k')),
    (3, (4, 'l')),
    (3, (5, 'm')),
    (3, (5, 'n')),
    (3, (5, 'o')),
]

data1 = [
    (0, (0, 'a')),
    (1, (0, 'b')),
    (0, (0, 'c')),
    (1, (1, 'd')),
    (2, (1, 'e')),
    (1, (2, 'f')),
    (3, (2, 'g')),
    (2, (3, 'h')),
    (0, (3, 'i')),
    (3, (3, 'j')),
    (3, (4, 'k')),
    (3, (4, 'l')),
    (1, (5, 'm')),
    (2, (5, 'n')),
    (3, (5, 'o')),
]

rdd0 = sc.parallelize(data0, 3).cache()
partitioned0 = rdd0.partitionBy(4)
partitioned0.map(lambda row: (row[1][0], row[1])).partitionBy(6).collect()

rdd1 = sc.parallelize(data1, 3).cache()
partitioned1 = rdd1.partitionBy(4)
partitioned1.map(lambda row: (row[1][0], row[1])).partitionBy(6).collect()

【问题讨论】:

我不确定我是否明白这一点。因此,您实际上是在尝试使分区更大吗?我可能会更有意义地实际关注大分区的重新分配。关于手头的问题 - 如果新键、分区器和分区数相同,则不应移动数据。所以从某种意义上说,“更有条理”会减少要洗牌的数据量。 【参考方案1】:

当你调用 re-partition shuffle 时。 有多少数据被洗牌是基于原始的RDD。

附带说明:当您执行sc.parallelize(data0,3) 时,3 只是指导方针。如果默认分区

【讨论】:

以上是关于重新分区分区数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark中的最佳重新分区方式

重新分区已经有效地分区数据集,以便将小文件组合成更大的文件

重新分区 pyspark 数据帧失败以及如何避免初始分区大小

火花重新分区和合并

如何根据列在火花中重新分区?

Spark DataFrame重新分区:未保留的分区数