Spark 3.0 排序并应用于组 Scala/Java
Posted
技术标签:
【中文标题】Spark 3.0 排序并应用于组 Scala/Java【英文标题】:Spark 3.0 Sort and apply on group Scala/Java 【发布时间】:2020-07-01 04:04:06 【问题描述】:我有 spark DataSet 让我们用 A、B、C 列查看
我要获取数据集
A 列上的组 B 列上的排序组(不是整个数据集) 迭代单个组,查找连续 N 行之间的一些序列/模式,并根据形成结果数据集的标准返回行在 Flink 中
dataset.groupBy(0).sortGroup(1, Order.ASCENDING)
.reduceGroup()
在 Pyspark 中
我们可以在 Pandas 中对组调用 apply 函数并在 pandas 中排序 但是与 Flink 相比,它的速度非常慢 10 倍
注意:我想对分组数据进行处理并返回另一个不是标准聚合的数据集
有人可以指出我在 Spark 中如何使用 java/scala 的类似代码吗?
【问题讨论】:
【参考方案1】:几种可能的方法取决于迭代逻辑:
使用数据集 API
给定
val df =
Seq(("a", 0, "foo"), ("b", 1, "foo"), ("a", 1, "foobar"))
.toDF("A", "B", "C")
先对它进行一点预处理
df.select($"A", struct($"B", $"C") as $"S").show()
得到
+---+-----------+
| A| S|
+---+-----------+
| a| [0, foo]|
| b| [1, foo]|
| a|[1, foobar]|
+---+-----------+
现在我们可以将任何 Scala 代码应用于元组 S 的序列,包括排序:
df.select($"A", struct($"B", $"C") as $"S")
.groupBy("A")
.agg(collect_list("S"))
.as[(String, Seq[(Int, String)])]
.map
case (a, l) => (a, l.sortBy(_._1).map(_._2).maxBy(_.length))
.show()
使用 UDAF
实现自定义UDAF:
class MyAgg extends Aggregator[
(Int, String),
mutable.ListBuffer[(Int, String)],
/* any output type here */]
...
并使用它进行聚合:
val myagg = udaf(new MyAgg())
df.select($"A", struct($"B", $"C") as "S").groupBy($"A").agg(myagg($"S"))
【讨论】:
有没有方便的记录排序方式?此外,我的 agg dosent 提供了迭代组中记录的方法 记录可以在finish
方法spark.apache.org/docs/latest/api/java/index.html?org/apache/…进行迭代
而 AFAIK 排序需要像第一个 sn-p 那样使用 Scala 集合排序来完成以上是关于Spark 3.0 排序并应用于组 Scala/Java的主要内容,如果未能解决你的问题,请参考以下文章
Spark 3.0开发近两年终于发布,流PythonSQL重大更新详解
Spark 3.0重磅发布!开发近两年,流PythonSQL重大更新全面解读
将不同的功能应用于组对象中的不同项目:Python pandas