Spark SQL:窗口函数滞后直到满足条件
Posted
技术标签:
【中文标题】Spark SQL:窗口函数滞后直到满足条件【英文标题】:Spark SQL: Window function lag until a condition met 【发布时间】:2019-04-09 16:52:58 【问题描述】:我正在 Spark 中处理这个数据集:
+------------+------------+------------+
| ColumnA| ColumnB| Result |
+------------+------------+------------+
| ABCDEF| MNOPQRST| true |
| 123455| UVWXYZ| false |
| ABCDEF| MNOPQRST| false | (should be true)
| 123455| UVWXYZ| false |
| 123455| UVWXYZ| false |
| ABCDEF| EFGHIJK | false |
+------------+------------+------------+
规则是:
-
如果给定分区集
Result
的等级为 1,则为 true。
如果等级不是 1 且 ColumnA
的值为 123455
,则将 Result
的值设置为 false
如果排名不是 1 且 ColumnA
值不是 123455
并且如果 ColumnB
值与前一行的 ColumnB
值匹配,则将 Result 设置为 true。确保上一行的ColumnA的值不是123455
WindowSpec w = Window.partitionBy("ColumnA, ColumnB");
列 matchColumnB = functions.col("ColumnB").equalTo( functions.lag("ColumnB", 1).over(w));
这里窗口函数检查上一行而不考虑上一行的ColumnA值。
例如在上面的数据集中,第 3 行的 ColumnB 值应该与第 1 行而不是第 2 行进行比较。
我尝试查看Window.unboundedPreceding
,但不确定如何在这种情况下使用它。
有没有办法做到这一点?
【问题讨论】:
【参考方案1】:复制 DF:
val df = sc.parallelize(List(("ABCDEF","MNOPQRST"),
("123455","UVWXYZ"),
("ABCDEF","MNOPQRST"),
("123455","UVWXYZ"),
("123455","UVWXYZ"),
("ABCDEF","EFGHIJK")))
.toDF("ColumnA","ColumnB")
提供的信息中存在一些矛盾,例如,您的窗口实现使得无法应用上述条件。
在根据行的顺序[排名和与前一行的比较]进行工作时,窗口分析有一些基本要素
您需要定义适当的分区列。如果窗口被columnA
和columnB
划分,那么它们的值对于给定的窗口将保持不变。因此,如果需要在lead
或lag
行之间比较columnA
和columnB
,则DF 需要按其他列进行分区。 举例说明为什么它是问题
val w = Window.partitionBy("ColumnA", "ColumnB").orderBy("ColumnA", "ColumnB");
df.withColumn("rank", rank.over(w)).show
+-------+--------+----+
|ColumnA| ColumnB|rank|
+-------+--------+----+
| ABCDEF| EFGHIJK| 1|
| ABCDEF|MNOPQRST| 1|
| ABCDEF|MNOPQRST| 1|
| 123455| UVWXYZ| 1|
| 123455| UVWXYZ| 1|
| 123455| UVWXYZ| 1|
+-------+--------+----+
现在每一行都充当自己的窗口。注意顺序,在第2点解释。
窗口中还需要具体的order by
语句。如果没有rank
,“滞后”、“领先”等将变得不确定,因此没有多大意义。 Spark 会尝试保护它,如果没有 order by 子句,窗口函数将抛出异常。 举例说明为什么它是问题
val w = Window.partitionBy("ColumnA", "ColumnB")
df.withColumn("result", lag("columnB", 1).over(w))
导致:
org.apache.spark.sql.AnalysisException: Window function lag('columnB, 1, null) requires window to be ordered, please add ORDER BY clause. For example SELECT lag('columnB, 1, null)(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
解决方案 回答这个问题本身:我将为您的问题考虑另外两列。
val df = sc.parallelize(List(("ABCDEF","MNOPQRST", "P1", "1"),
("123455","UVWXYZ", "P1", "2"),
("ABCDEF","MNOPQRST", "P1", "3"),
("123455","UVWXYZ", "P1", "4"),
("123455","UVWXYZ", "P1", "5"),
("BLABLAH","UVWXYZ", "P1", "6"),
("ABCDEF","EFGHIJK", "P1", "7")))
.toDF("ColumnA","ColumnB", "ColumnP", "ColumnO")
+-------+--------+-------+-------+
|ColumnA| ColumnB|ColumnP|ColumnO|
+-------+--------+-------+-------+
| ABCDEF|MNOPQRST| P1| 1|
| 123455| UVWXYZ| P1| 2|
| ABCDEF|MNOPQRST| P1| 3|
| 123455| UVWXYZ| P1| 4|
| 123455| UVWXYZ| P1| 5|
|BLABLAH| UVWXYZ| P1| 5|
| ABCDEF| EFGHIJK| P1| 6|
+-------+--------+-------+-------+
这里,分区列是columnP
,按列排序是ColumnO
val w = Window.partitionBy("ColumnP").orderBy("ColumnO")
val dfWithWindowing = df.withColumn("lag_columnB", lag("columnB", 1).over(w))
.withColumn("rank", rank().over(w))
dfWithWindowing.show
+-------+--------+-------+-------+-----------+----+
|ColumnA| ColumnB|ColumnP|ColumnO|lag_columnB|rank|
+-------+--------+-------+-------+-----------+----+
| ABCDEF|MNOPQRST| P1| 1| null| 1|
| 123455| UVWXYZ| P1| 2| MNOPQRST| 2|
| ABCDEF|MNOPQRST| P1| 3| UVWXYZ| 3|
| 123455| UVWXYZ| P1| 4| MNOPQRST| 4|
| 123455| UVWXYZ| P1| 5| UVWXYZ| 5|
|BLABLAH| UVWXYZ| P1| 6| UVWXYZ| 6|
| ABCDEF| EFGHIJK| P1| 7| UVWXYZ| 7|
+-------+--------+-------+-------+-----------+----+
现在我们拥有了执行所需计算所需的所有信息。当结果不满足任何条件时,规则中没有关于结果值的规范,实现认为这是真的。
val resultDF = dfWithWindowing.withColumn("result", when($"rank"==="1",true).otherwise(
when($"ColumnA"==="123455", false).otherwise(
when($"ColumnB"===$"lag_columnB", true).otherwise(true)
)
)
).drop("ColumnP", "ColumnO","lag_columnB","rank")
+-------+--------+------+
|ColumnA| ColumnB|result|
+-------+--------+------+
| ABCDEF|MNOPQRST| true|
| 123455| UVWXYZ| false|
| ABCDEF|MNOPQRST| true|
| 123455| UVWXYZ| false|
| 123455| UVWXYZ| false|
|BLABLAH| UVWXYZ| true|
| ABCDEF| EFGHIJK| true|
+-------+--------+------+
想了解更多开窗知识,请参考https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
【讨论】:
以上是关于Spark SQL:窗口函数滞后直到满足条件的主要内容,如果未能解决你的问题,请参考以下文章