Spark Dataframe - 窗口函数 - 插入和更新输出的滞后和领先

Posted

技术标签:

【中文标题】Spark Dataframe - 窗口函数 - 插入和更新输出的滞后和领先【英文标题】:Spark Dataframe - Windowing Function - Lag & Lead for Insert & Update output 【发布时间】:2018-01-11 11:08:08 【问题描述】:

我需要使用窗口函数 Lag 和 Lead 对数据帧执行以下操作。

对于每个 Key,我需要在最终输出中执行下面的插入和更新

插入条件: 1. 默认情况下,LAYER_NO=0,需要写入输出。 2.如果COL1,COL2,COL3的值有任何变化,相对于其宝贵的记录,那么该记录需要写入输出。

示例:key_1 与 layer_no=2,COL3 中值从 400 变为 600

更新条件: 1. 如果 COL1,COL2,COL3 的值相对于其之前的记录没有变化,但“DEPART 列”有变化,则需要在输出中更新该值。

示例:key_1 with layer_no=3,COL1,COL2,COL3 没有变化,但是 DEPART 列中的值变化为“xyz”,因此需要在输出中更新。 2.连LAYER_NO也要依次更新,插入layer_no=0的记录后

    val inputDF = values.toDF("KEY","LAYER_NO","COl1","COl2","COl3","DEPART")

    inputDF.show()   
    +-----+--------+----+----+----+------+
    |  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
    +-----+--------+----+----+----+------+
    |key_1|       0| 200| 300| 400|   abc|->default write
    |key_1|       1| 200| 300| 400|   abc|
    |key_1|       2| 200| 300| 600|   uil|--->change in col3,so write
    |key_1|       2| 200| 300| 600|   uil|
    |key_1|       3| 200| 300| 600|   xyz|--->change in col4,so update
    |key_2|       0| 500| 700| 900|   prq|->default write
    |key_2|       1| 888| 555| 900|   tep|--->change in col1 & col 2,so write
    |key_3|       0| 111| 222| 333|   lgh|->default write
    |key_3|       1| 084| 222| 333|   lgh|--->change in col1,so write
    |key_3|       2| 084| 222| 333|   rrr|--->change in col4,so update
    +-----+--------+----+----+----+------+

预期输出:

outputDF.show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+

【问题讨论】:

为什么|key_1| 2| 200| 300| 600| uil 没有出现在输出中? |key_1| 1| 200| 300| 600|用户|写入输出,但在下一条记录中,DEPART 列中的值发生了变化,因此这会将“uil”更新为“xyz”。所以最终记录的将是|key_1| 1| 200| 300| 600| xyz| 那么为什么 layer_no 是 1 ?不应该是3吗? 对不起,它错过了信息....现在更新了问题..在插入 layer_no=0 的记录后,即使 LAYER_NO 也应该按顺序更新 【参考方案1】:

我们需要定义两个Window 来达到您的预期输出。一个用于检查DEPART 列的变化,第二个用于检查COL1COL3 之和的差异。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
                  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")

然后我们只需将DEPART 列中的值替换为正确的值,并将数据过滤到滞后总和与当前列总和不同的行(以及LAYER_NO === 0 的行)。最后,我们用排名替换LAYER_NO

inputDF.withColumn("DEPART", last("DEPART").over(w_col))
   .withColumn("row_sum",($"COL1" + $"COL2" + $"COL3"))
   .withColumn("lag_sum", lag($"row_sum",1).over(w_key))
   .filter($"LAYER_NO" === 0 || not($"row_sum" === $"lag_sum"))
   .withColumn("LAYER_NO", rank.over(w_key)-1)
   .drop("row_sum", "lag_sum").show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+

【讨论】:

是的..我还需要更新 LAYER_NO。如何做到这一点? 我们可以在第二个窗口中使用rank(),查看更新

以上是关于Spark Dataframe - 窗口函数 - 插入和更新输出的滞后和领先的主要内容,如果未能解决你的问题,请参考以下文章

没有 orderBy 的 Spark 窗口函数

具有复杂条件的 Spark SQL 窗口函数

具有复杂条件的 Spark SQL 窗口函数

如何在窗口 scala/spark 中使用 partitionBy 函数

sparksql系列 sparksql列操作窗口函数join

在按天分区的数据上过滤 n 天窗口的 spark DataFrame