UDAF 合并 Spark DataSet/Dataframe 中第一个 orderdby 的行
Posted
技术标签:
【中文标题】UDAF 合并 Spark DataSet/Dataframe 中第一个 orderdby 的行【英文标题】:UDAF merge rows where are first orderdby in a Spark DataSet/Dataframe 【发布时间】:2017-06-06 09:13:34 【问题描述】:假设我们在 Spark 中有一个 dataset
/dataframe
,其中有 3 列
ID
、Word
、Timestamp
我想写一个UDAF
函数,我可以在其中做这样的事情
df.show()
ID | Word | Timestamp
1 | I | "2017-1-1 00:01"
1 | am | "2017-1-1 00:02"
1 | Chris | "2017-1-1 00:03"
2 | I | "2017-1-1 00:01"
2 | am | "2017-1-1 00:02"
2 | Jessica | "2017-1-1 00:03"
val df_merged = df.groupBy("ID")
.sort("ID", "Timestamp")
.agg(custom_agg("ID", "Word", "Timestamp")
df_merged.show
ID | Words | StartTime | EndTime |
1 | "I am Chris" | "2017-1-1 00:01" | "2017-1-1 00:03" |
1 | "I am Jessica" | "2017-1-1 00:01" | "2017-1-1 00:03" |
问题是如何确保Words
列在我的UDAF
中以正确的顺序合并?
【问题讨论】:
从udaf
返回的列将始终位于数据帧中列的末尾。但是您可以使用select
随意订购它们。
【参考方案1】:
这里是 Spark 2 的groupByKey
的解决方案(与无类型的Dataset
一起使用)。groupByKey 的优点是您可以访问该组(您在mapGroups
中获得一个Iterator[Row]
):
df.groupByKey(r => r.getAs[Int]("ID"))
.mapGroupscase(id,rows) =>
val sorted = rows
.toVector
.map(r => (r.getAs[String]("Word"),r.getAs[java.sql.Timestamp]("Timestamp")))
.sortBy(_._2.getTime)
(id,
sorted.map(_._1).mkString(" "),
sorted.map(_._2).head,
sorted.map(_._2).last
)
.toDF("ID","Words","StartTime","EndTime")
【讨论】:
这个答案也是正确的。但是 groupByKey 方案比 Window 方案快很多,至少对于小案例数据的场景是这样。有什么特别的原因吗?【参考方案2】:对不起,我没有使用 Scala,希望您能阅读。
Window
函数可以为所欲为:
df = df.withColumn('Words', f.collect_list(df['Word']).over(
Window().partitionBy(df['ID']).orderBy('Timestamp').rowsBetween(start=Window.unboundedPreceding,
end=Window.unboundedFollowing)))
输出:
+---+-------+-----------------+----------------+
| ID| Word| Timestamp| Words|
+---+-------+-----------------+----------------+
| 1| I|2017-1-1 00:01:00| [I, am, Chris]|
| 1| am|2017-1-1 00:02:00| [I, am, Chris]|
| 1| Chris|2017-1-1 00:03:00| [I, am, Chris]|
| 2| I|2017-1-1 00:01:00|[I, am, Jessica]|
| 2| am|2017-1-1 00:02:00|[I, am, Jessica]|
| 2|Jessica|2017-1-1 00:03:00|[I, am, Jessica]|
+---+-------+-----------------+----------------+
然后groupBy
以上数据:
df = df.groupBy(df['ID'], df['Words']).agg(
f.min(df['Timestamp']).alias('StartTime'), f.max(df['Timestamp']).alias('EndTime'))
df = df.withColumn('Words', f.concat_ws(' ', df['Words']))
输出:
+---+------------+-----------------+-----------------+
| ID| Words| StartTime| EndTime|
+---+------------+-----------------+-----------------+
| 1| I am Chris|2017-1-1 00:01:00|2017-1-1 00:03:00|
| 2|I am Jessica|2017-1-1 00:01:00|2017-1-1 00:03:00|
+---+------------+-----------------+-----------------+
【讨论】:
这是python
不是scala
代码,其次也许你可以向OP 解释一下代码?以上是关于UDAF 合并 Spark DataSet/Dataframe 中第一个 orderdby 的行的主要内容,如果未能解决你的问题,请参考以下文章