Spark 的 reduceByKey 最佳实践

Posted

技术标签:

【中文标题】Spark 的 reduceByKey 最佳实践【英文标题】:Spark best practices for reduceByKey 【发布时间】:2017-05-13 19:37:22 【问题描述】:

我有一个带有下一个架构的数据框:

root
 |-- id_1: long (nullable = true)
 |-- id_2: long (nullable = true)
 |-- score: double (nullable = true)

数据如下:

+----+----+------------------+
|id_1|id_2|score             |
+----+----+------------------+
|0   |9   |0.5888888888888889|
|0   |1   |0.6166666666666667|
|0   |2   |0.496996996996997 |
|1   |9   |0.6222222222222221|
|1   |6   |0.9082996632996633|
|1   |5   |0.5927450980392157|
|2   |3   |0.665774107440774 |
|3   |8   |0.6872367465504721|
|3   |8   |0.6872367465504721|
|5   |6   |0.5365909090909091|
+----+----+------------------+

目标是为每个 id_1 找到最高分的 id_2。也许我错了,但是……只需要创建配对的 RDD:

root
 |-- _1: long (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- _1: long (nullable = true)
 |    |-- _2: double (nullable = true)

+---+----------------------+
|_1 |_2                    |
+---+----------------------+
|0  |[9,0.5888888888888889]|
|0  |[1,0.6166666666666667]|
|0  |[2,0.496996996996997] |
|1  |[9,0.6222222222222221]|
|1  |[6,0.9082996632996633]|
|1  |[5,0.5927450980392157]|
|2  |[3,0.665774107440774] |
|3  |[8,0.6872367465504721]|
|3  |[8,0.6872367465504721]|
|5  |[6,0.5365909090909091]|
+---+----------------------+

并通过最大键减少。类似的东西

paired_rdd.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))

或与DataFrame API相同(没有配对的rdd):

original_df.groupBy('id_1').max('score')

我有两个问题,如果有人能指出我的错误步骤,我们将不胜感激。

    对于 10 亿甚至 1000 亿条记录:实现目标的最佳实践是什么(为每个 id_1 找到 id_2 并获得最高分数)?我已经尝试了 5000 万和 100M 记录,并且通过改组数据获得了更好的结果(这与 Holden Karau 所说的相反)。我已经通过id_1完成了重新分区

    .repartition(X, "id_1")

    然后 reduceByKey 更快。为什么?

    为什么 DataFrame API 比 RDD API 慢几倍?我哪里错了?

谢谢。

【问题讨论】:

original_df是如何分区的? @RaphaelRoth 我相信它取决于核心数量。 (是的,经验法则是每个分区 128MB,但它不是文本数据,也不会消耗太多内存)我尝试了 30 个内核并使用了 30 个,这是更好的时机。使用 60、120 - 更糟糕,少于 30 - 更糟糕,因为并非所有核心都已加载。 也许不是 RDD API 更快,而是您对数据进行了重新分区。如果你的原始dataframe分区太少,那么dataframe API的groupBy将无法正确使用你的集群资源(因为你没有足够的并行度) @RaphaelRoth 所有执行程序/核心都很忙,所以我相信,并行性就足够了。我怎样才能检查它?很快将使用 Spark UI 创建具有相同数据但不同数量的分区和 RDD 与 DataFrame 的屏幕。这是 youtube 上的链接,时间为 youtu.be/V6DkTVvy9vk?t=20m26s,其中 RDD 与 DataFrame 速度的条形图。 【参考方案1】:

您的用例是窗口聚合函数的完美用例。试一试,看看它与 RDD 的 reduceByKey 相比如何。


有时不是关于基于 RDD 的管道是否比基于 DataFrame 的管道更快,而是关于一个与另一个相比的表现力如何。基于 DataFrame 的管道几乎总是比基于 RDD 的替代方案更具表现力(并且从长远来看可能更易于维护)。


(我正在使用 Scala 并将代码转换为 Python 作为家庭练习)

scala> dataset.show
+----+----+------------------+
|id_1|id_2|             score|
+----+----+------------------+
|   0|   9|0.5888888888888889|
|   0|   1|0.6166666666666667|
|   0|   2| 0.496996996996997|
|   1|   9|0.6222222222222221|
|   1|   6|0.9082996632996633|
|   1|   5|0.5927450980392157|
|   2|   3| 0.665774107440774|
|   3|   8|0.6872367465504721|
|   3|   8|0.6872367465504721|
|   5|   6|0.5365909090909091|
+----+----+------------------+

import org.apache.spark.sql.expressions.Window
val byId_1 = Window.partitionBy("id_1")
original_df.select($"id_1", max() over byId_1)
scala> dataset.
  select($"id_1", $"id_2", $"score", max("score") over byId_1 as "max_score").
  filter($"score" === $"max_score").
  distinct.  // <-- id_1 == 3 is duplicated
  sort("id_1").
  show
+----+----+------------------+------------------+
|id_1|id_2|             score|         max_score|
+----+----+------------------+------------------+
|   0|   1|0.6166666666666667|0.6166666666666667|
|   1|   6|0.9082996632996633|0.9082996632996633|
|   2|   3| 0.665774107440774| 0.665774107440774|
|   3|   8|0.6872367465504721|0.6872367465504721|
|   5|   6|0.5365909090909091|0.5365909090909091|
+----+----+------------------+------------------+

请注意,默认情况下,DataFrames 使用spark.sql.shuffle.partitions,即200,上周我遇到了一个案例,其中大多数分区(以及因此的任务)都是空的,导致数千个任务等待执行,这是徒劳的并烧毁 CPU 周期。我们从几小时缩短到几秒。

了解您的数据及其应如何分区是优化 Spark 查询的第一步,无论它是使用 RDD API 还是 Dataset API 编写的。

【讨论】:

【参考方案2】:

感谢 Jacek 的有趣建议。

我已经在 4 * c4.8xlarge 服务器上进行了一些测试(128 个内核,192GB RAM,我希望 32 个工作人员和 partitioning=128 对这个设置有好处)。 使用具有 1,368,598,093 条记录的数据集。

    “窗口”解决方案 - 大约 43 分钟 并生成大约 31GB 随机播放(15.4GB 随机写入和 15.4GB 随机读取)。参见第 25 阶段。 不按 id 重新分区的 reduceByKey 解决方案 - 40min 和 8.4MB 随机播放(然后 4.2MB 随机写入和 4.2MB 随机读取)参见第 22 阶段 获胜者 - reduceByKey id 重新分区。 22min 和 15GB 随机播放(7.5GB 随机写入和 7.5GB 随机读取)参见第 24 阶段

我相信如果我要处理 200B 条记录,shuffle 会导致一些 IO 问题,最好不要使用某些列的 repartition(因为 shuffle),但我不知道如何在没有它的情况下提高速度。不幸的是 *** 不能给我正确的答案。 :(

感谢各位有趣的建议!

【讨论】:

以上是关于Spark 的 reduceByKey 最佳实践的主要内容,如果未能解决你的问题,请参考以下文章

「SDS极客」Spark On Kubernetes存算分离的最佳实践

Spark入门--Spark的reduce和reduceByKey

Spark中的treeReduce与reduceByKey

在开始取决于reduceByKey结果的连接步骤之前,Spark是不是必须完成reduceByKey步骤中的所有条目的处理?

Spark笔记004-reduceByKey和groupBykey

在Spark中关于groupByKey与reduceByKey的区别