Spark SQL using a window - collect data from rows after current row based on a column condition

Posted: 2018-11-29 17:43:05

Question:

I have a Spark DataFrame like this (in Scala):
+---------+-------------+------+---------+------------+
|  user_id|      item_id|  mood|     time|sessionBegin|
+---------+-------------+------+---------+------------+
|        1|            A| Happy|        0|           0|
|        1|            B| Happy|        1|           0|
|        1|            C| Happy|        3|           0|
|        1|            D| Happy|        5|           0|
|        1|            C| Happy|        6|           0|
|        1|            D|   Sad|        6|           0|
|        1|            C|   Sad|       10|           0|
|        1|            A| Happy|       28|           0|
|        1|            E| Happy|       35|           0|
|        1|            E|   Sad|       60|           0|
|        2|            F| Happy|        6|           6|
|        2|            E| Happy|       17|           6|
|        2|            D| Happy|       20|           6|
|        2|            D|   Sad|       21|           6|
|        2|            E| Happy|       27|           6|
|        2|            G| Happy|       37|           6|
|        2|            H| Happy|       39|           6|
|        2|            G|   Sad|       45|           6|
+---------+-------------+------+---------+------------+
I have defined a window on the columns (user_id, sessionBegin), ordered by time:
val window = Window.partitionBy("user_id","sessionBegin").orderBy("time")
Now I want to add a column result that does the following:

1) If the mood is Happy, collect all item_id from the rows after the current row that have mood = Sad. Otherwise, if the mood is Sad, put an empty array.

2) This must be computed over the window I specified above.
(For example, this DataFrame has two windows: the first is (user_id = 1, sessionBegin = 0) and the second is (user_id = 2, sessionBegin = 6).)

So the resulting DF would be:
+---------+-------------+------+---------+------------+---------+
|  user_id|      item_id|  mood|     time|sessionBegin|   result|
+---------+-------------+------+---------+------------+---------+
|        1|            A| Happy|        0|           0|  [D,C,E]|
|        1|            B| Happy|        1|           0|  [D,C,E]|
|        1|            C| Happy|        3|           0|  [D,C,E]|
|        1|            D| Happy|        5|           0|  [D,C,E]|
|        1|            C| Happy|        6|           0|  [D,C,E]|
|        1|            D|   Sad|        6|           0|       []|
|        1|            C|   Sad|       10|           0|       []|
|        1|            A| Happy|       28|           0|      [E]|
|        1|            E| Happy|       35|           0|      [E]|
|        1|            E|   Sad|       60|           0|       []|
|        2|            F| Happy|        6|           6|    [D,G]|
|        2|            E| Happy|       17|           6|    [D,G]|
|        2|            D| Happy|       20|           6|    [D,G]|
|        2|            D|   Sad|       21|           6|       []|
|        2|            E| Happy|       27|           6|      [G]|
|        2|            G| Happy|       37|           6|      [G]|
|        2|            H| Happy|       39|           6|      [G]|
|        2|            G|   Sad|       45|           6|       []|
+---------+-------------+------+---------+------------+---------+
I tried using when..otherwise with the collect_set method over the window, but I couldn't figure out two things:

- how to consider only the rows after the current row
- for all rows with mood=Happy, how to collect_set the item_id only when mood=Sad

Is there any way to approach this?
Comments:
You can use a row frame in the window spec by calling the rowsBetween function. See databricks.com/blog/2015/07/15/…

Any reason the first row doesn't include "B"? Your explanation and the expected result don't match...

@stack0114106 "B" should not be included in the result column, because I only want to collect the item_id of rows with mood = Sad. I have edited the expected result now; the first row should contain [D,C,E].
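For reference, the row frame mentioned in the first comment could be written roughly like this with the Scala Window API (a sketch, not code from the question; as the answer below notes, a frame starting at the row after the current one may not be accepted on every Spark version):

import org.apache.spark.sql.expressions.Window

// frame running from the row right after the current one to the end of the partition
val windowAfter = Window
  .partitionBy("user_id", "sessionBegin")
  .orderBy("time")
  .rowsBetween(1, Window.unboundedFollowing)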
Answer 1:
I couldn't get a frame running from the row after the current one to the end of the partition, so I used current row and unbounded following, and then dropped the first array element with a udf. I've used a bit of everything - spark.sql, a udf, and DataFrame operations. Check this out:
val df = Seq((1,"A","Happy","0","0"),(1,"B","Happy","1","0"),(1,"C","Happy","3","0"),(1,"D","Happy","5","0"),(1,"C","Happy","6","0"),(1,"D","Sad","6","0"),(1,"C","Sad","10","0"),(1,"A","Happy","28","0"),(1,"E","Happy","35","0"),(1,"E","Sad","60","0"),(2,"F","Happy","6","6"),(2,"E","Happy","17","6"),(2,"D","Happy","20","6"),(2,"D","Sad","21","6"),(2,"E","Happy","27","6"),(2,"G","Happy","37","6"),(2,"H","Happy","39","6"),(2,"G","Sad","45","6")).toDF("user_id","item_id","mood","time","sessionBegin")
val df2 = df.withColumn("time", 'time.cast("int"))
df2.createOrReplaceTempView("user")
val df3 = spark.sql(
"""
select user_id, item_id, mood, time, sessionBegin,
case when mood='Happy' then
collect_list(case when mood='Happy' then ' ' when mood='Sad' then item_id end) over(partition by user_id order by time rows between current row and unbounded following )
when mood='Sad' then array()
end as result from user
""")
def sliceResult(x: Seq[String]): Seq[String] = {
  val y = x.drop(1).filter(_ != " ")
  y.toSet.toSeq
}
val udf_sliceResult = udf ( sliceResult(_:Seq[String]):Seq[String] )
df3.withColumn("result1", udf_sliceResult('result) ).show(false)
Result:
+-------+-------+-----+----+------------+------------------------------+---------+
|user_id|item_id|mood |time|sessionBegin|result                        |result1  |
+-------+-------+-----+----+------------+------------------------------+---------+
|1      |A      |Happy|0   |0           |[ ,  ,  ,  ,  , D, C,  ,  , E]|[D, C, E]|
|1      |B      |Happy|1   |0           |[ ,  ,  ,  , D, C,  ,  , E]   |[D, C, E]|
|1      |C      |Happy|3   |0           |[ ,  ,  , D, C,  ,  , E]      |[D, C, E]|
|1      |D      |Happy|5   |0           |[ ,  , D, C,  ,  , E]         |[D, C, E]|
|1      |C      |Happy|6   |0           |[ , D, C,  ,  , E]            |[D, C, E]|
|1      |D      |Sad  |6   |0           |[]                            |[]       |
|1      |C      |Sad  |10  |0           |[]                            |[]       |
|1      |A      |Happy|28  |0           |[ ,  , E]                     |[E]      |
|1      |E      |Happy|35  |0           |[ , E]                        |[E]      |
|1      |E      |Sad  |60  |0           |[]                            |[]       |
|2      |F      |Happy|6   |6           |[ ,  ,  , D,  ,  ,  , G]      |[D, G]   |
|2      |E      |Happy|17  |6           |[ ,  , D,  ,  ,  , G]         |[D, G]   |
|2      |D      |Happy|20  |6           |[ , D,  ,  ,  , G]            |[D, G]   |
|2      |D      |Sad  |21  |6           |[]                            |[]       |
|2      |E      |Happy|27  |6           |[ ,  ,  , G]                  |[G]      |
|2      |G      |Happy|37  |6           |[ ,  , G]                     |[G]      |
|2      |H      |Happy|39  |6           |[ , G]                        |[G]      |
|2      |G      |Sad  |45  |6           |[]                            |[]       |
+-------+-------+-----+----+------------+------------------------------+---------+
EDIT1:
As the OP pointed out, the ' ' can be replaced with null, in which case df3 itself is the final result and the udf() can be avoided:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df3 = spark.sql(
"""
select user_id, item_id, mood, time, sessionBegin,
case when mood='Happy' then
collect_list(case when mood='Happy' then null when mood='Sad' then item_id end) over(partition by user_id order by time rows between current row and unbounded following )
when mood='Sad' then array()
end as result from user
""")
// Exiting paste mode, now interpreting.
df3: org.apache.spark.sql.DataFrame = [user_id: int, item_id: string ... 4 more fields]
scala> df3.show(false)
+-------+-------+-----+----+------------+---------+
|user_id|item_id|mood |time|sessionBegin|result   |
+-------+-------+-----+----+------------+---------+
|1      |A      |Happy|0   |0           |[D, C, E]|
|1      |B      |Happy|1   |0           |[D, C, E]|
|1      |C      |Happy|3   |0           |[D, C, E]|
|1      |D      |Happy|5   |0           |[D, C, E]|
|1      |C      |Happy|6   |0           |[D, C, E]|
|1      |D      |Sad  |6   |0           |[]       |
|1      |C      |Sad  |10  |0           |[]       |
|1      |A      |Happy|28  |0           |[E]      |
|1      |E      |Happy|35  |0           |[E]      |
|1      |E      |Sad  |60  |0           |[]       |
|2      |F      |Happy|6   |6           |[D, G]   |
|2      |E      |Happy|17  |6           |[D, G]   |
|2      |D      |Happy|20  |6           |[D, G]   |
|2      |D      |Sad  |21  |6           |[]       |
|2      |E      |Happy|27  |6           |[G]      |
|2      |G      |Happy|37  |6           |[G]      |
|2      |H      |Happy|39  |6           |[G]      |
|2      |G      |Sad  |45  |6           |[]       |
+-------+-------+-----+----+------------+---------+
scala>
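As a side note, the same idea can also be written with the DataFrame API instead of spark.sql. This is an untested sketch (the window here also includes sessionBegin, as the question's window did; typedLit needs Spark 2.2+, and the names w and resultDf are made up for illustration):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// frame from the current row to the end of the (user_id, sessionBegin) partition
val w = Window
  .partitionBy("user_id", "sessionBegin")
  .orderBy("time")
  .rowsBetween(Window.currentRow, Window.unboundedFollowing)

// collect_list skips nulls, so only the item_id of Sad rows is collected;
// Sad rows themselves get an empty array
val resultDf = df2.withColumn("result",
  when(col("mood") === "Happy",
    collect_list(when(col("mood") === "Sad", col("item_id"))).over(w))
    .otherwise(typedLit(Seq.empty[String])))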
Comments:
I'm getting org.apache.spark.sql.AnalysisException from your sql statement, saying THEN and ELSE expressions should all be same type or coercible to a common type on 2.1.0... I fixed it by putting null instead of array() in the else part. It works now! Thanks!!

Yes, if you put null in place of the whitespace, then you don't need to slice the result / drop the first element of the array - so df3 will be the final output.
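In other words, that fix amounts to replacing the array() branch with null, roughly like this (a sketch; the name df4 is hypothetical, and Sad rows then come out as null rather than an empty array):

val df4 = spark.sql(
  """
  select user_id, item_id, mood, time, sessionBegin,
    case when mood='Happy' then
      collect_list(case when mood='Happy' then null when mood='Sad' then item_id end)
        over(partition by user_id order by time rows between current row and unbounded following)
    when mood='Sad' then null
    end as result from user
  """)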