Spark DataFrame:orderBy之后的groupBy是不是保持该顺序?

Posted

技术标签:

【中文标题】Spark DataFrame:orderBy之后的groupBy是不是保持该顺序?【英文标题】:Spark DataFrame: does groupBy after orderBy maintain that order?Spark DataFrame:orderBy之后的groupBy是否保持该顺序? 【发布时间】:2017-01-23 03:42:43 【问题描述】:

我有一个 Spark 2.0 数据框 example,其结构如下:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.

每个 id 包含 24 个条目(一天中的每个小时一个),并使用 orderBy 函数按 id、小时排序。

我创建了一个聚合器groupConcat

  def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable 
    override def zero: String = ""

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

    override def merge(b1: String, b2: String) = b1 + b2

    override def finish(b: String) = b.substring(1)

    override def bufferEncoder: Encoder[String] = Encoders.STRING

    override def outputEncoder: Encoder[String] = Encoders.STRING
  .toColumn

它帮助我将列连接成字符串以获得这个最终数据帧:

id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.

我的问题是,如果我这样做example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"),这是否保证每小时计数将在各自的存储桶中正确排序?

我读到 RDD 不一定是这种情况(请参阅Spark sort by key and then group by to get ordered iterable?),但 DataFrames 可能不同?

如果没有,我该如何解决?

【问题讨论】:

【参考方案1】:

正如其他人指出的那样,orderBy 之后的groupBy 无法维持秩序。您想要做的是使用 Window 函数,按 id 分区并按小时排序。您可以 collect_list 对此,然后取最大(最大)的结果列表,因为它们是累积的(即第一个小时只会在列表中包含自己,第二个小时将在列表中包含 2 个元素,依此类推)。

完整示例代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(( "id1", 0, 12),
  ("id1", 1, 55),
  ("id1", 23, 44),
  ("id2", 0, 12),
  ("id2", 1, 89),
  ("id2", 23, 34)).toDF("id", "hour", "count")

    val mergeList = udf(strings: Seq[String]) => strings.mkString(":")
    data.withColumn("collected", collect_list($"count")
                                                    .over(Window.partitionBy("id")
                                                                 .orderBy("hour")))
            .groupBy("id")
            .agg(max($"collected").as("collected"))
            .withColumn("hourly_count", mergeList($"collected"))
            .select("id", "hourly_count").show

这使我们保持在 DataFrame 世界中。我还简化了 OP 使用的 UDF 代码。

输出:

+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+

【讨论】:

【参考方案2】:

如果您想在 Java 中解决实现问题(Scala 和 Python 应该类似):

example.orderBy("hour")
    .groupBy("id")
    .agg(functions.sort_array(
      functions.collect_list( 
        functions.struct(dataRow.col("hour"),
                         dataRow.col("count"))),false)
    .as("hourly_count"));

【讨论】:

【参考方案3】:

我有一个订单并不总是保持不变的情况:有时是,大多数时候不是。

我的数据框有 200 个在 Spark 1.6 上运行的分区

df_group_sort = data.orderBy(times).groupBy(group_key).agg(
                                                  F.sort_array(F.collect_list(times)),
                                                  F.collect_list(times)
                                                           )

为了检查排序,我比较了返回值

F.sort_array(F.collect_list(times))

F.collect_list(times)

给予例如(左:sort_array(collect_list());右:collect_list())

2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000

左列总是排序的,而右列只包含排序的块。 对于 take() 的不同执行,右列中块的顺序是不同的。

【讨论】:

接受的答案表明您需要按要排序的列以及分组的列进行排序,即orderBy(times, group_key).groupBy(group_key)。你试过了吗?【参考方案4】:

顺序可能相同也可能不同,具体取决于分区数量和数据分布。我们可以使用 rdd 本身来解决。

例如::

我将以下示例数据保存在一个文件中,并将其加载到 hdfs 中。

1,type1,300
2,type1,100
3,type2,400
4,type2,500
5,type1,400
6,type3,560
7,type2,200
8,type3,800

并执行以下命令:

sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()

输出:

Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))

也就是说,我们按类型对数据进行分组,然后按价格排序,并以“~”作为分隔符连接 ID。 上面的命令可以分解如下:

val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3)

val groupedData=validData.groupBy(_(1))  //group data rdds

val sortedJoinedData=groupedData.mapValues(x=>
   val list=x.toList
   val sortedList=list.sortBy(_(2))
   val idOnlyList=sortedList.map(_(0))
   idOnlyList.mkString("~")

)
sortedJoinedData.collect()

然后我们可以使用命令获取特定组

sortedJoinedData.filter(_._1=="type1").collect()

输出:

Array[(String, String)] = Array((type1,2~1~5))

【讨论】:

【参考方案5】:

不,groupByKey 内的排序不一定会保持,但众所周知,这很难在一个节点的内存中重现。如前所述,发生这种情况的最典型方式是需要重新分区以使groupByKey 发生。我设法通过在sort 之后手动执行repartition 来重现这一点。然后我将结果传递给groupByKey

case class Numbered(num:Int, group:Int, otherData:Int)

// configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number 

val v =
  (1 to 100000)
    // Make waaay more groups then partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won't be monotonic, not sure if needed)
    .map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS()
    // Be sure they are stored in a small number of partitions
    .repartition(2)
    .sort($"num")
    // Repartition again with a waaay bigger number then there are groups so that when things need to be merged you can get them out of order.
    .repartition(200)
    .groupByKey(_.group)
    .mapGroups 
      case (g, nums) =>
        nums             // all you need is .sortBy(_.num) here to fix the problem          
          .map(_.num)
          .mkString("~")
    
    .collect()

// Walk through the concatenated strings. If any number ahead 
// is smaller than the number before it, you know that something
// is out of order.
v.zipWithIndex.map  case (r, i) =>
  r.split("~").map(_.toInt).foldLeft(0)  case (prev, next) =>
    if (next < prev) 
      println(s"*** Next: $next less then $prev for dataset $i + 1 ***")
    
    next
  

【讨论】:

【参考方案6】:

简短的回答是肯定的,每小时计数将保持相同的顺序。

概括地说,在分组之前进行排序很重要。此外,排序必须与您实际想要排序的组 + 列相同。

举个例子:

employees
    .sort("company_id", "department_id", "employee_role")
    .groupBy("company_id", "department_id")
    .agg(Aggregators.groupConcat(":", 2) as "count_per_role")

【讨论】:

您是否有任何参考资料表明 groupBy 保持顺序?我在官方文档中找不到任何内容 我没有官方文档,但我有这篇文章可以更好地解释机制bzhangusc.wordpress.com/2015/05/28/…。cmets 也很有趣。 有趣的是,即使是 Sean Owen 本人也表示可能不会保留排序 (issues.apache.org/jira/browse/…) 有人看过我2017年6月7日添加的文章和cmets吗?

以上是关于Spark DataFrame:orderBy之后的groupBy是不是保持该顺序?的主要内容,如果未能解决你的问题,请参考以下文章

没有 orderBy 的 Spark 窗口函数

如何在 Spark DataFrame 上应用部分排序?

Spark2 DataFrame数据框常用操作之cube与rollup

大数据Spark DataFrame/DataSet常用操作

大数据Spark DataFrame/DataSet常用操作

哪个更快? Spark SQL with Where 子句或在 Spark SQL 之后在 Dataframe 中使用过滤器