如何在 Spark DataFrame 上应用部分排序?
Posted
技术标签:
【中文标题】如何在 Spark DataFrame 上应用部分排序?【英文标题】:How to apply partial sort on a Spark DataFrame? 【发布时间】:2020-07-26 15:55:54 【问题描述】:以下代码:
val myDF = Seq(83, 90, 40, 94, 12, 70, 56, 70, 28, 91).toDF("number")
myDF.orderBy("number").limit(3).show
输出:
+------+
|number|
+------+
| 12|
| 28|
| 40|
+------+
Spark 的惰性结合limit
调用和orderBy
的实现是否会自动生成partially sorted 数据帧,或者剩余的7 个数字是否也已排序,即使它不需要?如果是这样,有没有办法避免这种不必要的计算工作?
使用.explain()
显示,执行两个排序阶段,首先在每个分区上,然后(每个分区前 3 个)一个全局阶段。但它没有说明这些种类是完整的还是部分的。
myDF.orderBy("number").limit(3).explain(true)
== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3416 ASC NULLS FIRST], true
+- Project [value#3414 AS number#3416]
+- LocalRelation [value#3414]
== Analyzed Logical Plan ==
number: int
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3416 ASC NULLS FIRST], true
+- Project [value#3414 AS number#3416]
+- LocalRelation [value#3414]
== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3416 ASC NULLS FIRST], true
+- LocalRelation [number#3416]
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[number#3416 ASC NULLS FIRST], output=[number#3416])
+- LocalTableScan [number#3416]
【问题讨论】:
可能相关***.com/questions/59195346/… 【参考方案1】:如果您 explain()
您的数据框,您会发现 Spark 将首先在每个分区中进行“本地”排序,然后从每个分区中仅选择前三个元素进行最终全局排序,然后再取出前三个它。
scala> myDF.orderBy("number").limit(3).explain(true)
== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3 ASC NULLS FIRST], true
+- Project [value#1 AS number#3]
+- LocalRelation [value#1]
== Analyzed Logical Plan ==
number: int
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3 ASC NULLS FIRST], true
+- Project [value#1 AS number#3]
+- LocalRelation [value#1]
== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3 ASC NULLS FIRST], true
+- LocalRelation [number#3]
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[number#3 ASC NULLS FIRST], output=[number#3])
+- LocalTableScan [number#3]
我认为它最好在优化逻辑计划部分中看到,但物理也一样。
【讨论】:
很好,谢谢。你知道局部排序和最终的全局排序是完整的还是部分的? (我已经相应地编辑了我的问题。) 好问题!我相信它在内部使用了 Guava 的 TopK,所以它应该是部分的。但是如果你想确定的话,查看源代码可能是唯一的选择:) @abiratsis 指出他的出色回答证实了这一点 ^ 对,所以在两个级别(部分和最终)上都没有发生完整的排序。那是好消息! :)【参考方案2】:-
myDF.orderBy("number").limit(3).show
myDF.limit(3).orderBy("number").show
1 => 将进行完整排序,然后选择前 3 个元素。
2 => 将返回包含前 3 个元素的数据框并进行排序。
【讨论】:
我知道。但这就是我现在要问的。 ;)以上是关于如何在 Spark DataFrame 上应用部分排序?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark Dataframe 上获取按结果分组的元组?
如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?