Spark 的 reduceByKey 最佳实践
Posted
技术标签:
【中文标题】Spark 的 reduceByKey 最佳实践【英文标题】:Spark best practices for reduceByKey 【发布时间】:2017-05-13 19:37:22 【问题描述】:我有一个带有下一个架构的数据框:
root
|-- id_1: long (nullable = true)
|-- id_2: long (nullable = true)
|-- score: double (nullable = true)
数据如下:
+----+----+------------------+
|id_1|id_2|score |
+----+----+------------------+
|0 |9 |0.5888888888888889|
|0 |1 |0.6166666666666667|
|0 |2 |0.496996996996997 |
|1 |9 |0.6222222222222221|
|1 |6 |0.9082996632996633|
|1 |5 |0.5927450980392157|
|2 |3 |0.665774107440774 |
|3 |8 |0.6872367465504721|
|3 |8 |0.6872367465504721|
|5 |6 |0.5365909090909091|
+----+----+------------------+
目标是为每个 id_1 找到最高分的 id_2。也许我错了,但是……只需要创建配对的 RDD:
root
|-- _1: long (nullable = true)
|-- _2: struct (nullable = true)
| |-- _1: long (nullable = true)
| |-- _2: double (nullable = true)
+---+----------------------+
|_1 |_2 |
+---+----------------------+
|0 |[9,0.5888888888888889]|
|0 |[1,0.6166666666666667]|
|0 |[2,0.496996996996997] |
|1 |[9,0.6222222222222221]|
|1 |[6,0.9082996632996633]|
|1 |[5,0.5927450980392157]|
|2 |[3,0.665774107440774] |
|3 |[8,0.6872367465504721]|
|3 |[8,0.6872367465504721]|
|5 |[6,0.5365909090909091]|
+---+----------------------+
并通过最大键减少。类似的东西
paired_rdd.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))
或与DataFrame API相同(没有配对的rdd):
original_df.groupBy('id_1').max('score')
我有两个问题,如果有人能指出我的错误步骤,我们将不胜感激。
对于 10 亿甚至 1000 亿条记录:实现目标的最佳实践是什么(为每个 id_1 找到 id_2 并获得最高分数)?我已经尝试了 5000 万和 100M 记录,并且通过改组数据获得了更好的结果(这与 Holden Karau 所说的相反)。我已经通过id_1
完成了重新分区
.repartition(X, "id_1")
然后 reduceByKey 更快。为什么?
为什么 DataFrame API 比 RDD API 慢几倍?我哪里错了?
谢谢。
【问题讨论】:
original_df
是如何分区的?
@RaphaelRoth 我相信它取决于核心数量。 (是的,经验法则是每个分区 128MB,但它不是文本数据,也不会消耗太多内存)我尝试了 30 个内核并使用了 30 个,这是更好的时机。使用 60、120 - 更糟糕,少于 30 - 更糟糕,因为并非所有核心都已加载。
也许不是 RDD API 更快,而是您对数据进行了重新分区。如果你的原始dataframe分区太少,那么dataframe API的groupBy
将无法正确使用你的集群资源(因为你没有足够的并行度)
@RaphaelRoth 所有执行程序/核心都很忙,所以我相信,并行性就足够了。我怎样才能检查它?很快将使用 Spark UI 创建具有相同数据但不同数量的分区和 RDD 与 DataFrame 的屏幕。这是 youtube 上的链接,时间为 youtu.be/V6DkTVvy9vk?t=20m26s,其中 RDD 与 DataFrame 速度的条形图。
【参考方案1】:
您的用例是窗口聚合函数的完美用例。试一试,看看它与 RDD 的 reduceByKey
相比如何。
有时不是关于基于 RDD 的管道是否比基于 DataFrame 的管道更快,而是关于一个与另一个相比的表现力如何。基于 DataFrame 的管道几乎总是比基于 RDD 的替代方案更具表现力(并且从长远来看可能更易于维护)。
(我正在使用 Scala 并将代码转换为 Python 作为家庭练习)
scala> dataset.show
+----+----+------------------+
|id_1|id_2| score|
+----+----+------------------+
| 0| 9|0.5888888888888889|
| 0| 1|0.6166666666666667|
| 0| 2| 0.496996996996997|
| 1| 9|0.6222222222222221|
| 1| 6|0.9082996632996633|
| 1| 5|0.5927450980392157|
| 2| 3| 0.665774107440774|
| 3| 8|0.6872367465504721|
| 3| 8|0.6872367465504721|
| 5| 6|0.5365909090909091|
+----+----+------------------+
import org.apache.spark.sql.expressions.Window
val byId_1 = Window.partitionBy("id_1")
original_df.select($"id_1", max() over byId_1)
scala> dataset.
select($"id_1", $"id_2", $"score", max("score") over byId_1 as "max_score").
filter($"score" === $"max_score").
distinct. // <-- id_1 == 3 is duplicated
sort("id_1").
show
+----+----+------------------+------------------+
|id_1|id_2| score| max_score|
+----+----+------------------+------------------+
| 0| 1|0.6166666666666667|0.6166666666666667|
| 1| 6|0.9082996632996633|0.9082996632996633|
| 2| 3| 0.665774107440774| 0.665774107440774|
| 3| 8|0.6872367465504721|0.6872367465504721|
| 5| 6|0.5365909090909091|0.5365909090909091|
+----+----+------------------+------------------+
请注意,默认情况下,DataFrames 使用spark.sql.shuffle.partitions
,即200
,上周我遇到了一个案例,其中大多数分区(以及因此的任务)都是空的,导致数千个任务等待执行,这是徒劳的并烧毁 CPU 周期。我们从几小时缩短到几秒。
了解您的数据及其应如何分区是优化 Spark 查询的第一步,无论它是使用 RDD API 还是 Dataset API 编写的。
【讨论】:
【参考方案2】:感谢 Jacek 的有趣建议。
我已经在 4 * c4.8xlarge 服务器上进行了一些测试(128 个内核,192GB RAM,我希望 32 个工作人员和 partitioning=128 对这个设置有好处)。 使用具有 1,368,598,093 条记录的数据集。
-
“窗口”解决方案 - 大约 43 分钟 并生成大约 31GB 随机播放(15.4GB 随机写入和 15.4GB 随机读取)。参见第 25 阶段。
不按 id 重新分区的 reduceByKey 解决方案 - 40min 和 8.4MB 随机播放(然后 4.2MB 随机写入和 4.2MB 随机读取)参见第 22 阶段
获胜者 - reduceByKey 与 id 重新分区。 22min 和 15GB 随机播放(7.5GB 随机写入和 7.5GB 随机读取)参见第 24 阶段
我相信如果我要处理 200B 条记录,shuffle 会导致一些 IO 问题,最好不要使用某些列的 repartition(因为 shuffle),但我不知道如何在没有它的情况下提高速度。不幸的是 *** 不能给我正确的答案。 :(
感谢各位有趣的建议!
【讨论】:
以上是关于Spark 的 reduceByKey 最佳实践的主要内容,如果未能解决你的问题,请参考以下文章
「SDS极客」Spark On Kubernetes存算分离的最佳实践
Spark入门--Spark的reduce和reduceByKey
在开始取决于reduceByKey结果的连接步骤之前,Spark是不是必须完成reduceByKey步骤中的所有条目的处理?