Spark SQL using a window - collect data from rows after current row based on a column condition

Posted: 2018-11-29 17:43:05

Question:

I have a Spark DataFrame (in Scala) like this:

+---------+-------------+------+---------+------------+
|  user_id|      item_id|  mood|     time|sessionBegin|
+---------+-------------+------+---------+------------+
|        1|            A| Happy|        0|           0|
|        1|            B| Happy|        1|           0|
|        1|            C| Happy|        3|           0|
|        1|            D| Happy|        5|           0|
|        1|            C| Happy|        6|           0|
|        1|            D|   Sad|        6|           0|
|        1|            C|   Sad|       10|           0|
|        1|            A| Happy|       28|           0|
|        1|            E| Happy|       35|           0|
|        1|            E|   Sad|       60|           0|
|        2|            F| Happy|        6|           6|
|        2|            E| Happy|       17|           6|
|        2|            D| Happy|       20|           6|
|        2|            D|   Sad|       21|           6|
|        2|            E| Happy|       27|           6|
|        2|            G| Happy|       37|           6|
|        2|            H| Happy|       39|           6|
|        2|            G|   Sad|       45|           6|
+---------+-------------+------+---------+------------+

I have defined a window over the columns (user_id, sessionBegin), ordered by time:

import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("user_id", "sessionBegin").orderBy("time")

Now I want to add a column result that does the following:

1) If the mood of the current row is Happy, collect all item_ids from the rows after the current row whose mood is Sad. If the mood of the current row is Sad, put an empty array.

2) This has to be done over the window I specified above. (For example, this DataFrame has two windows: the first is (user_id = 1, sessionBegin = 0) and the second is (user_id = 2, sessionBegin = 6).)

So the resulting DataFrame would be (for instance, for the first row (item_id = A, time = 0) the Sad rows that follow it are D, C and E, hence [D,C,E]):

+---------+-------------+------+---------+------------+---------+
|  user_id|      item_id|  mood|     time|sessionBegin|   result|
+---------+-------------+------+---------+------------+---------+
|        1|            A| Happy|        0|           0|  [D,C,E]|
|        1|            B| Happy|        1|           0|  [D,C,E]|
|        1|            C| Happy|        3|           0|  [D,C,E]|
|        1|            D| Happy|        5|           0|  [D,C,E]|
|        1|            C| Happy|        6|           0|  [D,C,E]|
|        1|            D|   Sad|        6|           0|       []|
|        1|            C|   Sad|       10|           0|       []|
|        1|            A| Happy|       28|           0|      [E]|
|        1|            E| Happy|       35|           0|      [E]|
|        1|            E|   Sad|       60|           0|       []|
|        2|            F| Happy|        6|           6|    [D,G]|
|        2|            E| Happy|       17|           6|    [D,G]|
|        2|            D| Happy|       20|           6|    [D,G]|
|        2|            D|   Sad|       21|           6|       []|
|        2|            E| Happy|       27|           6|      [G]|
|        2|            G| Happy|       37|           6|      [G]|
|        2|            H| Happy|       39|           6|      [G]|
|        2|            G|   Sad|       45|           6|       []|
+---------+-------------+------+---------+------------+---------+

I tried using collect_set with when..otherwise over the window, but I couldn't figure out two things:

1) how to consider only the rows after the current row, and
2) for rows with mood = Happy, how to collect_set the item_id only from rows where mood = Sad.

Is there any way to solve this?

Comments:

You can use a row frame in the window spec by calling the rowsBetween function. See databricks.com/blog/2015/07/15/…

Any reason the first row does not contain "B"? ... Your explanation and the expected result don't match...

@stack0114106 "B" should not be included in the result column, because I only want to collect the item_ids whose mood = Sad. I have edited the expected result now; the first row should contain [D,C,E].
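For reference, a minimal sketch of the row-frame idea from the first comment, using only the DataFrame API. This is an editorial illustration, not code from the thread; it assumes Spark 2.x, where rowsBetween takes Long bounds and Window.unboundedFollowing is available, and the names afterCurrent / withResult are made up here:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{array, col, collect_list, when}

// Frame covering every row strictly after the current row within the same session.
val afterCurrent = Window
  .partitionBy("user_id", "sessionBegin")
  .orderBy("time")
  .rowsBetween(1, Window.unboundedFollowing)

// collect_list skips nulls, so only Sad rows contribute their item_id;
// the cast keeps both branches of when/otherwise the same array<string> type.
val withResult = df.withColumn(
  "result",
  when(col("mood") === "Happy",
    collect_list(when(col("mood") === "Sad", col("item_id"))).over(afterCurrent))
    .otherwise(array().cast("array<string>")))

collect_list keeps the Sad items in window (time) order; collect_set would also deduplicate, but without an ordering guarantee.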

Answer 1:

I couldn't get a frame that runs from the row after the current row to the end of the partition, so I used "current row and unbounded following" and then removed the first Array element with a udf. I've used a bit of everything - spark.sql, a udf, and DataFrame operations. Check this out:

val df = Seq((1,"A","Happy","0","0"),(1,"B","Happy","1","0"),(1,"C","Happy","3","0"),(1,"D","Happy","5","0"),(1,"C","Happy","6","0"),(1,"D","Sad","6","0"),(1,"C","Sad","10","0"),(1,"A","Happy","28","0"),(1,"E","Happy","35","0"),(1,"E","Sad","60","0"),(2,"F","Happy","6","6"),(2,"E","Happy","17","6"),(2,"D","Happy","20","6"),(2,"D","Sad","21","6"),(2,"E","Happy","27","6"),(2,"G","Happy","37","6"),(2,"H","Happy","39","6"),(2,"G","Sad","45","6")).toDF("user_id","item_id","mood","time","sessionBegin")
val df2 = df.withColumn("time", 'time.cast("int"))
df2.createOrReplaceTempView("user")

val df3 = spark.sql(
  """
    select user_id, item_id, mood, time, sessionBegin,
    case when mood='Happy' then
    collect_list(case when mood='Happy' then ' ' when mood='Sad' then item_id end) over(partition by user_id order by time rows between current row  and unbounded following )
    when mood='Sad' then array()
    end as result from user

  """)
// Drop the first element (the current row itself) and the " " placeholders,
// then deduplicate the remaining item_ids.
def sliceResult(x: Seq[String]): Seq[String] = {
  val y = x.drop(1).filter(_ != " ")
  y.toSet.toSeq
}

val udf_sliceResult = udf(sliceResult(_: Seq[String]): Seq[String])
df3.withColumn("result1", udf_sliceResult('result) ).show(false)

Result:

+-------+-------+-----+----+------------+------------------------------+---------+
|user_id|item_id|mood |time|sessionBegin|result                        |result1  |
+-------+-------+-----+----+------------+------------------------------+---------+
|1      |A      |Happy|0   |0           |[ ,  ,  ,  ,  , D, C,  ,  , E]|[D, C, E]|
|1      |B      |Happy|1   |0           |[ ,  ,  ,  , D, C,  ,  , E]   |[D, C, E]|
|1      |C      |Happy|3   |0           |[ ,  ,  , D, C,  ,  , E]      |[D, C, E]|
|1      |D      |Happy|5   |0           |[ ,  , D, C,  ,  , E]         |[D, C, E]|
|1      |C      |Happy|6   |0           |[ , D, C,  ,  , E]            |[D, C, E]|
|1      |D      |Sad  |6   |0           |[]                            |[]       |
|1      |C      |Sad  |10  |0           |[]                            |[]       |
|1      |A      |Happy|28  |0           |[ ,  , E]                     |[E]      |
|1      |E      |Happy|35  |0           |[ , E]                        |[E]      |
|1      |E      |Sad  |60  |0           |[]                            |[]       |
|2      |F      |Happy|6   |6           |[ ,  ,  , D,  ,  ,  , G]      |[D, G]   |
|2      |E      |Happy|17  |6           |[ ,  , D,  ,  ,  , G]         |[D, G]   |
|2      |D      |Happy|20  |6           |[ , D,  ,  ,  , G]            |[D, G]   |
|2      |D      |Sad  |21  |6           |[]                            |[]       |
|2      |E      |Happy|27  |6           |[ ,  ,  , G]                  |[G]      |
|2      |G      |Happy|37  |6           |[ ,  , G]                     |[G]      |
|2      |H      |Happy|39  |6           |[ , G]                        |[G]      |
|2      |G      |Sad  |45  |6           |[]                            |[]       |
+-------+-------+-----+----+------------+------------------------------+---------+

EDIT1:

As the OP pointed out, ' ' can be replaced with null, in which case df3 itself is already the final result. That way the udf() can be avoided:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df3 = spark.sql(
  """
    select user_id, item_id, mood, time, sessionBegin,
    case when mood='Happy' then
    collect_list(case when mood='Happy' then null when mood='Sad' then item_id end) over(partition by user_id order by time rows between current row  and unbounded following )
    when mood='Sad' then array()
    end as result from user
  """)

// Exiting paste mode, now interpreting.

df3: org.apache.spark.sql.DataFrame = [user_id: int, item_id: string ... 4 more fields]

scala> df3.show(false)
+-------+-------+-----+----+------------+---------+
|user_id|item_id|mood |time|sessionBegin|result   |
+-------+-------+-----+----+------------+---------+
|1      |A      |Happy|0   |0           |[D, C, E]|
|1      |B      |Happy|1   |0           |[D, C, E]|
|1      |C      |Happy|3   |0           |[D, C, E]|
|1      |D      |Happy|5   |0           |[D, C, E]|
|1      |C      |Happy|6   |0           |[D, C, E]|
|1      |D      |Sad  |6   |0           |[]       |
|1      |C      |Sad  |10  |0           |[]       |
|1      |A      |Happy|28  |0           |[E]      |
|1      |E      |Happy|35  |0           |[E]      |
|1      |E      |Sad  |60  |0           |[]       |
|2      |F      |Happy|6   |6           |[D, G]   |
|2      |E      |Happy|17  |6           |[D, G]   |
|2      |D      |Happy|20  |6           |[D, G]   |
|2      |D      |Sad  |21  |6           |[]       |
|2      |E      |Happy|27  |6           |[G]      |
|2      |G      |Happy|37  |6           |[G]      |
|2      |H      |Happy|39  |6           |[G]      |
|2      |G      |Sad  |45  |6           |[]       |
+-------+-------+-----+----+------------+---------+


scala>

Discussion:

I get an org.apache.spark.sql.AnalysisException from your SQL statement; it says THEN and ELSE expressions should all be same type or coercible to a common type (on 2.1.0)... I fixed it by putting null instead of array() in the else part. It works now! Thanks!!

Yes, if you put null in place of the whitespace, then you don't need to slice the result / drop the first element of the array - so df3 will be the final output.
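To make the fix from this discussion concrete, here is a sketch of the adjusted query (an assumption on my part, since the fixed query itself is not posted in the thread): it is EDIT1's query with null in place of array() for the Sad branch, which keeps both CASE branches coercible on Spark 2.1.0. The trade-off is that Sad rows end up with NULL rather than an empty array; the name df4 is hypothetical.

// Sketch (assumption): EDIT1's query with the Sad branch returning null instead of array().
val df4 = spark.sql(
  """
    select user_id, item_id, mood, time, sessionBegin,
    case when mood='Happy' then
    collect_list(case when mood='Sad' then item_id end) over(partition by user_id order by time rows between current row and unbounded following)
    when mood='Sad' then null
    end as result from user
  """)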
