如何在 Spark DataFrame 上应用部分排序?

Posted

技术标签:

【中文标题】如何在 Spark DataFrame 上应用部分排序?【英文标题】:How to apply partial sort on a Spark DataFrame? 【发布时间】:2020-07-26 15:55:54 【问题描述】:

以下代码:

val myDF = Seq(83, 90, 40, 94, 12, 70, 56, 70, 28, 91).toDF("number")
myDF.orderBy("number").limit(3).show

输出:

+------+
|number|
+------+
|    12|
|    28|
|    40|
+------+

Spark 的惰性结合limit 调用和orderBy 的实现是否会自动生成partially sorted 数据帧,或者剩余的7 个数字是否也已排序,即使它不需要?如果是这样,有没有办法避免这种不必要的计算工作?


使用.explain() 显示,执行两个排序阶段,首先在每个分区上,然后(每个分区前 3 个)一个全局阶段。但它没有说明这些种类是完整的还是部分的。

myDF.orderBy("number").limit(3).explain(true)
== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3416 ASC NULLS FIRST], true
      +- Project [value#3414 AS number#3416]
         +- LocalRelation [value#3414]

== Analyzed Logical Plan ==
number: int
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3416 ASC NULLS FIRST], true
      +- Project [value#3414 AS number#3416]
         +- LocalRelation [value#3414]

== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3416 ASC NULLS FIRST], true
      +- LocalRelation [number#3416]

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[number#3416 ASC NULLS FIRST], output=[number#3416])
+- LocalTableScan [number#3416]

【问题讨论】:

可能相关***.com/questions/59195346/… 【参考方案1】:

如果您 explain() 您的数据框,您会发现 Spark 将首先在每个分区中进行“本地”排序,然后从每个分区中仅选择前三个元素进行最终全局排序,然后再取出前三个它。

scala> myDF.orderBy("number").limit(3).explain(true)
== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3 ASC NULLS FIRST], true
      +- Project [value#1 AS number#3]
         +- LocalRelation [value#1]

== Analyzed Logical Plan ==
number: int
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3 ASC NULLS FIRST], true
      +- Project [value#1 AS number#3]
         +- LocalRelation [value#1]

== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3 ASC NULLS FIRST], true
      +- LocalRelation [number#3]

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[number#3 ASC NULLS FIRST], output=[number#3])
+- LocalTableScan [number#3]

我认为它最好在优化逻辑计划部分中看到,但物理也一样。

【讨论】:

很好,谢谢。你知道局部排序和最终的全局排序是完整的还是部分的? (我已经相应地编辑了我的问题。) 好问题!我相信它在内部使用了 Guava 的 TopK,所以它应该是部分的。但是如果你想确定的话,查看源代码可能是唯一的选择:) @abiratsis 指出他的出色回答证实了这一点 ^ 对,所以在两个级别(部分和最终)上都没有发生完整的排序。那是好消息! :)【参考方案2】:
    myDF.orderBy("number").limit(3).show myDF.limit(3).orderBy("number").show

1 => 将进行完整排序,然后选择前 3 个元素。

2 => 将返回包含前 3 个元素的数据框并进行排序。

【讨论】:

我知道。但这就是我现在要问的。 ;)

以上是关于如何在 Spark DataFrame 上应用部分排序?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark Dataframe 上获取按结果分组的元组?

Spark SQL DataFrame - 异常处理

在 DataFrame 上应用映射函数

如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?

Spark scala Dataframe:如何将自定义类型应用于现有数据框?

如何使延迟加载 Apache Spark Dataframe 连接到 REST API