UDAF 合并 Spark DataSet/Dataframe 中第一个 orderdby 的行

Posted

技术标签:

【中文标题】UDAF 合并 Spark DataSet/Dataframe 中第一个 orderdby 的行【英文标题】:UDAF merge rows where are first orderdby in a Spark DataSet/Dataframe 【发布时间】:2017-06-06 09:13:34 【问题描述】:

假设我们在 Spark 中有一个 dataset/dataframe,其中有 3 列 IDWordTimestamp

我想写一个UDAF 函数,我可以在其中做这样的事情

df.show()

ID | Word | Timestamp
1  | I    | "2017-1-1 00:01"
1  | am   | "2017-1-1 00:02"
1  | Chris | "2017-1-1 00:03"
2  | I    | "2017-1-1 00:01"
2  | am   | "2017-1-1 00:02"
2  | Jessica | "2017-1-1 00:03"

val df_merged = df.groupBy("ID")
  .sort("ID", "Timestamp")
  .agg(custom_agg("ID", "Word", "Timestamp")

df_merged.show

ID | Words         | StartTime        |      EndTime     |
1  | "I am Chris"  | "2017-1-1 00:01" | "2017-1-1 00:03" |
1  | "I am Jessica"  | "2017-1-1 00:01" | "2017-1-1 00:03" |

问题是如何确保Words 列在我的UDAF 中以正确的顺序合并?

【问题讨论】:

udaf 返回的列将始终位于数据帧中列的末尾。但是您可以使用select 随意订购它们。 【参考方案1】:

这里是 Spark 2 的groupByKey 的解决方案(与无类型的Dataset 一起使用)。groupByKey 的优点是您可以访问该组(您在mapGroups 中获得一个Iterator[Row]):

 df.groupByKey(r => r.getAs[Int]("ID"))
      .mapGroupscase(id,rows) => 
        val sorted = rows
          .toVector
          .map(r => (r.getAs[String]("Word"),r.getAs[java.sql.Timestamp]("Timestamp")))
          .sortBy(_._2.getTime)

        (id, 
         sorted.map(_._1).mkString(" "),
         sorted.map(_._2).head,
         sorted.map(_._2).last
         )  
        
      .toDF("ID","Words","StartTime","EndTime")

【讨论】:

这个答案也是正确的。但是 groupByKey 方案比 Window 方案快很多,至少对于小案例数据的场景是这样。有什么特别的原因吗?【参考方案2】:

对不起,我没有使用 Scala,希望您能阅读。

Window 函数可以为所欲为:

df = df.withColumn('Words', f.collect_list(df['Word']).over(
    Window().partitionBy(df['ID']).orderBy('Timestamp').rowsBetween(start=Window.unboundedPreceding,
                                                                    end=Window.unboundedFollowing)))

输出:

+---+-------+-----------------+----------------+                                
| ID|   Word|        Timestamp|           Words|
+---+-------+-----------------+----------------+
|  1|      I|2017-1-1 00:01:00|  [I, am, Chris]|
|  1|     am|2017-1-1 00:02:00|  [I, am, Chris]|
|  1|  Chris|2017-1-1 00:03:00|  [I, am, Chris]|
|  2|      I|2017-1-1 00:01:00|[I, am, Jessica]|
|  2|     am|2017-1-1 00:02:00|[I, am, Jessica]|
|  2|Jessica|2017-1-1 00:03:00|[I, am, Jessica]|
+---+-------+-----------------+----------------+

然后groupBy以上数据:

df = df.groupBy(df['ID'], df['Words']).agg(
    f.min(df['Timestamp']).alias('StartTime'), f.max(df['Timestamp']).alias('EndTime'))
df = df.withColumn('Words', f.concat_ws(' ', df['Words']))

输出:

+---+------------+-----------------+-----------------+                          
| ID|       Words|        StartTime|          EndTime|
+---+------------+-----------------+-----------------+
|  1|  I am Chris|2017-1-1 00:01:00|2017-1-1 00:03:00|
|  2|I am Jessica|2017-1-1 00:01:00|2017-1-1 00:03:00|
+---+------------+-----------------+-----------------+

【讨论】:

这是python 不是scala 代码,其次也许你可以向OP 解释一下代码?

以上是关于UDAF 合并 Spark DataSet/Dataframe 中第一个 orderdby 的行的主要内容,如果未能解决你的问题,请参考以下文章

极简spark教程spark聚合函数

在 spark sql 中注册 UDAF

spark UDAF

Spark之UDAF

spark的udf和udaf的注册

如何将数组传递给 Spark (UDAF) 中的用户定义聚合函数