Spark Dataframe Join shuffle
Posted
技术标签:
【中文标题】Spark Dataframe Join shuffle【英文标题】: 【发布时间】:2019-06-14 09:16:24 【问题描述】:Spark 版本 1.6.0
我在两个具有 100 个分区的数据帧之间使用连接函数,该应用程序在一个集群上运行,我为每 20 个执行程序使用 5 个内核,总共 100 个内核。
我的问题是,当我进行连接时,所有记录都在一个执行器上计算,而其他执行器没有使用,如下图所示:
这会导致性能下降,因为所有数据都是使用一个执行器与其他 19 个可用的执行器计算的。
看起来 spark join 只在一个分区中“带来”所有记录,有没有办法避免这种情况?
为了确保它不会重新分配到 1,我还设置了这个 spark 属性:spark.sql.shuffle.partitions=100
实际上,两个输入数据帧有 100 个与输出数据帧相同的分区
【问题讨论】:
【参考方案1】:简答:
这是因为你的数据,而不是因为 spark。
长答案:
为了执行join
操作,spark 需要将具有相同键(您要加入的列的值)的数据移动到相同的工作人员。例如。如果您将 A 列与 B 列连接,则两个表中包含相同值的行将被移动到相同的工作人员,然后再连接。
此外 - 具有不同键的行也可能移动到同一个节点 - 这取决于您拥有的分区器。您可以阅读更多 here - 但一般的想法是默认分区器 - HashPartitioner
和 RangePartitioner
。不管使用哪一个 - 它决定了哪个工人行。例如 - 如果您有 RangePartitioner 范围为 [0, 5)[5. 7)[7, 10] 然后键 1, 2, 3, 4 将全部交给同一个工人。如果您的数据中只有这些键 - 将只使用一名工作人员。
【讨论】:
以上是关于Spark Dataframe Join shuffle的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL中Dataframe join操作含null值的列