跨越分区的SparkSQL DataFrame顺序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了跨越分区的SparkSQL DataFrame顺序相关的知识,希望对你有一定的参考价值。

我正在使用spark sql对我的数据集运行查询。查询的结果非常小但仍然是分区的。

我想合并生成的DataFrame并按列排序行。我试过了

DataFrame result = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")

我也试过了

DataFrame result = sparkSQLContext.sql("my sql").repartition(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")

输出文件以块的形式排序(即分区是有序的,但数据帧不作为整体排序)。例如,而不是

1, value
2, value
4, value
4, value
5, value
5, value
...

我明白了

2, value
4, value
5, value
-----------> partition boundary
1, value
4, value
5, value
  1. 获取查询结果的绝对排序的正确方法是什么?
  2. 为什么数据框不会合并为单个分区?
答案

我想在这里提几件事。 1-源代码显示orderBy语句在内部调用排序api,全局排序设置为true。因此,输出级别缺乏排序表明在写入目标时排序丢失。我的观点是,对orderBy的调用始终需要全局订单。

2-使用剧烈的聚结,如在你的情况下强制单个分区,可能是非常危险的。我建议你不要这样做。源代码表明调用coalesce(1)可能会导致上游转换使用单个分区。这将是残酷的表现。

3-您似乎希望orderBy语句可以使用单个分区执行。我不认为我同意这一说法。这将使Spark成为一个非常愚蠢的分布式框架。

如果您同意或不同意声明,请告知我们。

你是如何从输出中收集数据的呢?

也许输出实际上包含已排序的数据,但您为了从输出中读取而执行的转换/操作是导致订单丢失的原因。

另一答案

orderBy将在合并后生成新分区。要拥有单个输出分区,请重新排序操作...

DataFrame result = spark.sql("my sql").orderBy("col1").coalesce(1)
result.write.json("results.json")

正如@JavaPlanet所提到的,对于非常大的数据,您不希望合并到单个分区中。它将大大降低您的并行度。

以上是关于跨越分区的SparkSQL DataFrame顺序的主要内容,如果未能解决你的问题,请参考以下文章

DataFrame,SparkSQL

SparkSQL使用IDEA创建DataFrame

sparkSQL创建DataFrame

小记--------sparksql和DataFrame的小小案例javascala版本

大数据SparkSql连接查询中的谓词下推处理

SparkSql之DataFrame操作