重新分区分区数据
Posted
技术标签:
【中文标题】重新分区分区数据【英文标题】:Repartitioning partitioned data 【发布时间】:2016-03-31 04:15:27 【问题描述】:我正在处理一个倾斜的数据问题,例如我的最小分区低于 64MB,而我最大的分区可能大于 1GB。我一直在考虑将几个小分区映射到同一个分区键的策略,从而创建一个由分区组成的分区。这一切都是为了减少任务大小的差异以及存储在磁盘上的文件数量。
在我的 Spark 应用程序的某一时刻,我需要对(未分组的)原始分区进行操作,为此,需要按原始键重新分区。这让我想到了我的问题:
假设我有两个数据集,如下所示。每行都是 (partition_key, (original_key, data)) 形式的元组。在 data0 中可以看到 original_key = 0 在自己的节点上,而 original_key = 4 和 original_key = 5 一起在包含 partition_key = 3 的节点上。在 data1 ,事情没有那么有条理。
如果data0先按partition_key分区,再按original_key分区,会不会出现shuffle?换句话说,在第二个 partitionBy 调用期间 data0 是否比 data1 更有条理?
data0 = [
(0, (0, 'a')),
(0, (0, 'b')),
(0, (0, 'c')),
(1, (1, 'd')),
(1, (1, 'e')),
(1, (2, 'f')),
(1, (2, 'g')),
(2, (3, 'h')),
(2, (3, 'i')),
(2, (3, 'j')),
(3, (4, 'k')),
(3, (4, 'l')),
(3, (5, 'm')),
(3, (5, 'n')),
(3, (5, 'o')),
]
data1 = [
(0, (0, 'a')),
(1, (0, 'b')),
(0, (0, 'c')),
(1, (1, 'd')),
(2, (1, 'e')),
(1, (2, 'f')),
(3, (2, 'g')),
(2, (3, 'h')),
(0, (3, 'i')),
(3, (3, 'j')),
(3, (4, 'k')),
(3, (4, 'l')),
(1, (5, 'm')),
(2, (5, 'n')),
(3, (5, 'o')),
]
rdd0 = sc.parallelize(data0, 3).cache()
partitioned0 = rdd0.partitionBy(4)
partitioned0.map(lambda row: (row[1][0], row[1])).partitionBy(6).collect()
rdd1 = sc.parallelize(data1, 3).cache()
partitioned1 = rdd1.partitionBy(4)
partitioned1.map(lambda row: (row[1][0], row[1])).partitionBy(6).collect()
【问题讨论】:
我不确定我是否明白这一点。因此,您实际上是在尝试使分区更大吗?我可能会更有意义地实际关注大分区的重新分配。关于手头的问题 - 如果新键、分区器和分区数相同,则不应移动数据。所以从某种意义上说,“更有条理”会减少要洗牌的数据量。 【参考方案1】:当你调用 re-partition shuffle 时。 有多少数据被洗牌是基于原始的RDD。
附带说明:当您执行sc.parallelize(data0,3)
时,3 只是指导方针。如果默认分区
【讨论】:
以上是关于重新分区分区数据的主要内容,如果未能解决你的问题,请参考以下文章