Spark 3.0 排序并应用于组 Scala/Java

Posted

技术标签:

【中文标题】Spark 3.0 排序并应用于组 Scala/Java【英文标题】:Spark 3.0 Sort and apply on group Scala/Java 【发布时间】:2020-07-01 04:04:06 【问题描述】:

我有 spark DataSet 让我们用 A、B、C 列查看

我要获取数据集

A 列上的组 B 列上的排序组(不是整个数据集) 迭代单个组,查找连续 N 行之间的一些序列/模式,并根据形成结果数据集的标准返回行

在 Flink 中

dataset.groupBy(0).sortGroup(1, Order.ASCENDING)
                .reduceGroup()

在 Pyspark 中

我们可以在 Pandas 中对组调用 apply 函数并在 pandas 中排序 但是与 Flink 相比,它的速度非常慢 10 倍

注意:我想对分组数据进行处理并返回另一个不是标准聚合的数据集

有人可以指出我在 Spark 中如何使用 java/scala 的类似代码吗?

【问题讨论】:

【参考方案1】:

几种可能的方法取决于迭代逻辑:

使用数据集 API

给定

val df =
      Seq(("a", 0, "foo"), ("b", 1, "foo"), ("a", 1, "foobar"))
        .toDF("A", "B", "C")

先对它进行一点预处理

df.select($"A", struct($"B", $"C") as $"S").show()

得到

+---+-----------+
|  A|          S|
+---+-----------+
|  a|   [0, foo]|
|  b|   [1, foo]|
|  a|[1, foobar]|
+---+-----------+

现在我们可以将任何 Scala 代码应用于元组 S 的序列,包括排序:

df.select($"A", struct($"B", $"C") as $"S")
      .groupBy("A")
      .agg(collect_list("S"))
      .as[(String, Seq[(Int, String)])]
      .map 
        case (a, l) => (a, l.sortBy(_._1).map(_._2).maxBy(_.length))
      
      .show()

使用 UDAF

实现自定义UDAF:

class MyAgg extends Aggregator[
      (Int, String),
      mutable.ListBuffer[(Int, String)],
      /* any output type here */] 
...

并使用它进行聚合:

val myagg = udaf(new MyAgg())
df.select($"A", struct($"B", $"C") as "S").groupBy($"A").agg(myagg($"S"))

【讨论】:

有没有方便的记录排序方式?此外,我的 agg dosent 提供了迭代组中记录的方法 记录可以在finish方法spark.apache.org/docs/latest/api/java/index.html?org/apache/…进行迭代 而 AFAIK 排序需要像第一个 sn-p 那样使用 Scala 集合排序来完成

以上是关于Spark 3.0 排序并应用于组 Scala/Java的主要内容,如果未能解决你的问题,请参考以下文章

Spark 3.0开发近两年终于发布,流PythonSQL重大更新详解

Spark 3.0重磅发布!开发近两年,流PythonSQL重大更新全面解读

从Spark SQL与RDD api编写数据

将不同的功能应用于组对象中的不同项目:Python pandas

云栖大会 | Apache Spark 3.0 和 Koalas 最新进展

spark-3.0 application 调度算法解析