使用 reduceByKey(numPartitions) 或 repartition 规范化 SPARK RDD 分区

Posted

技术标签:

【中文标题】使用 reduceByKey(numPartitions) 或 repartition 规范化 SPARK RDD 分区【英文标题】:Normalize SPARK RDD partitions using reduceByKey(numPartitions) or repartition 【发布时间】:2020-09-04 07:00:41 【问题描述】:

使用 Spark 2.4.0。 我的生产数据非常倾斜,因此其中一项任务花费的时间是其他任务的 7 倍。 我尝试了不同的策略来规范化数据,以便所有执行者都能平等地工作 -

    spark.default.parallelism reduceByKey(numPartitions) 重新分区(numPartitions)

我的期望是它们三个应该均匀分区,但是在 Spark Local/Standalone 上使用一些虚拟的非生产数据表明选项 1,2 比 3 更规范化。

数据如下:(我正在尝试对每个帐户的余额+ccy组合进行简单的减少

accountdateccyamount
A12020/01/20USD100.12
A22010/01/20SGD200.24
A22010/01/20USD300.36
A12020/01/20USD400.12

预期结果应该是[A1-USD,500.24], [A2-SGD,200.24], [A2-USD,300.36] 理想情况下,这些应该被划分为 3 个不同的分区。

javaRDDWithoutHeader
.mapToPair((PairFunction<Balance, String, Integer>) balance -> new Tuple2<>(balance.getAccount() + balance.getCcy(), 1))        
    .mapToPair(new MyPairFunction())
   .reduceByKey(new ReductionFunction())

检查分区的代码

     System.out.println("b4 = " +pairRDD.getNumPartitions());
     System.out.println(pairRDD.glom().collect());
     JavaPairRDD<DummyString, BigDecimal> newPairRDD = pairRDD.repartition(3);
     System.out.println("Number of partitions = " +newPairRDD.getNumPartitions());
     System.out.println(newPairRDD.glom().collect());
    选项 1:什么都不做 选项 2:将 spark.default.parallelism 设置为 3 选项 3:reduceByKey 与 numPartitions = 3

    选项4:重新分区(3)

    对于选项 1 分区数 = 2 [ [(DummyStringaccount='A2', ccy='SGD',200.24), (DummyString account='A2', ccy='USD',300.36)], [(DummyStringaccount='A1', ccy='USD',500.24)] ]

    对于选项 2

    分区数 = 3 [ [(DummyStringaccount='A1', ccy='USD',500.24)], [(DummyStringaccount='A2', ccy='USD',300.36)], [(DummyStringaccount='A2', ccy='SGD',200.24)]]

    对于选项 3 分区数 = 3 [ [(DummyStringaccount='A1', ccy='USD',500.24)], [(DummyStringaccount='A2', ccy='USD',300.36)], [(DummyStringaccount='A2', ccy='SGD',200.24)] ]

    对于选项 4 分区数 = 3 [[], [(虚拟字符串 account='A2', ccy='SGD',200.24)], [(DummyString account='A2', ccy='USD',300.36), (DummyString account='A1', ccy='USD',500.24)]]

结论: 选项 2(spark.default.parallelism) 和 3(reduceByKey(numPartitions) 归一化比选项 4(重新分区)好得多 相当确定的结果,从未见过 option4 归一化为 3 个分区。

问题:

    reduceByKey(numPartitions) 比 repartition 好很多 这仅仅是因为样本数据集太小了吗?或 当我们通过 YARN 集群提交时,这种行为是否会有所不同

【问题讨论】:

阶段或任务——我假设是后者。 1 个任务(在一个阶段的 250 多个任务中)花费了很长时间 你能展示一下 Spark UI 的东西吗?我怀疑250在输入端。什么是绝对值的 7 倍? 哪个阶段?为什么你认为散列应该都在单独的桶中?哈希结果就是它们的样子。 见***.com/questions/43027306/… 【参考方案1】:

我认为这个问题涉及到一些问题,因此更难回答。

首先是与静态数据相关的分区和并行性,因此在读入时;在没有重新沸腾海洋的情况下,这是一个很好的 SO 答案,可以解决这个问题:当文件无法放入 spark 的主内存时,spark 如何读取大文件(PB)。无论如何,没有散列或任何事情发生,只是“原样”。

此外,RDD 与 DF 相比没有得到很好的优化。

Spark 中的各种操作会在调用 Action 后导致洗牌:

reduceByKey 将减少洗牌,使用散列进行最终聚合和更有效的本地分区聚合 也使用随机性进行重新分区 partitionBy(new HashPartitioner(n)) 等你没有提到的 reduceByKey(aggr.function, N partitions) 奇怪的是,它似乎首先比重新分区更有效

您的后一条评论通常暗示数据偏斜。太多条目散列到 reduceByKey 的相同“桶”/分区。缓解方式:

一般来说,先尝试使用更多的分区(读入时) - 但我在这里看不到您的转换和方法,所以我们将此作为一般建议。

一般来说,尝试使用合适的散列来预先(读入时)使用更多的分区 - 但我在这里看不到您的转换和方法,因此我们将此作为一般建议。

李>

或者在某些情况下,通过添加后缀来“加盐”密钥,然后再次通过 reduceByKey 和 reduceByKey 来“取消加盐”以获得原始密钥。取决于所花费的额外时间与保持原样或执行其他选项。

repartition(n) 应用随机排序,所以你洗牌,然后需要再次洗牌。不必要的imo。正如另一篇文章所示(请参阅您的问题中的 cmets),这看起来像是完成了不必要的工作,但这些都是旧式 RDD。

顺便说一句,使用数据帧更容易。

由于我们不知道您的完整编码,希望这会有所帮助。

【讨论】:

以上是关于使用 reduceByKey(numPartitions) 或 repartition 规范化 SPARK RDD 分区的主要内容,如果未能解决你的问题,请参考以下文章

Spark入门--Spark的reduce和reduceByKey

Spark中的treeReduce与reduceByKey

Spark/Scala:仅使用 RDD 使用 ReduceByKey 创建嵌套结构

reduceByKey和groupByKey的区别

IndexError:在pyspark shell上使用reduceByKey操作时列出索引超出范围

Spark笔记004-reduceByKey和groupBykey