End-dating records using window functions in Spark SQL
Posted: 2018-12-14 04:16:19

Question: I have a dataframe that looks like this:
+----+----+----------+----------+
|colA|colB|      colC|      colD|
+----+----+----------+----------+
|   a|   2|2013-12-12|2999-12-31|
|   b|   3|2011-12-14|2999-12-31|
|   a|   4|2013-12-17|2999-12-31|
|   b|   8|2011-12-19|2999-12-31|
|   a|   6|2013-12-23|2999-12-31|
+----+----+----------+----------+
I need to group the records by colA, rank them by colC (more recent dates get a higher rank), and then update colD by subtracting one day from the colC value of the adjacently ranked (next) record in the group; the last record of each group keeps its original colD.
The final dataframe should look like this:
+----+----+----------+----------+
|colA|colB|      colC|      colD|
+----+----+----------+----------+
|   a|   2|2013-12-12|2013-12-16|
|   a|   4|2013-12-17|2013-12-22|
|   a|   6|2013-12-23|2999-12-31|
|   b|   3|2011-12-14|2011-12-18|
|   b|   8|2011-12-19|2999-12-31|
+----+----+----------+----------+
Answer 1: You can get this using window functions:
scala> val df = Seq(("a",2,"2013-12-12","2999-12-31"),("b",3,"2011-12-14","2999-12-31"),("a",4,"2013-12-17","2999-12-31"),("b",8,"2011-12-19","2999-12-31"),("a",6,"2013-12-23","2999-12-31")).toDF("colA","colB","colC","colD")
df: org.apache.spark.sql.DataFrame = [colA: string, colB: int ... 2 more fields]
scala> val df2 = df.withColumn("colc",'colc.cast("date")).withColumn("cold",'cold.cast("date"))
df2: org.apache.spark.sql.DataFrame = [colA: string, colB: int ... 2 more fields]
scala> df2.createOrReplaceTempView("yash")
scala> spark.sql(""" select cola,colb,colc,cold, rank() over(partition by cola order by colc) c1, coalesce(date_sub(lead(colc) over(partition by cola order by colc),1),cold) as cold2 from yash """).show
+----+----+----------+----------+---+----------+
|cola|colb|      colc|      cold| c1|     cold2|
+----+----+----------+----------+---+----------+
|   b|   3|2011-12-14|2999-12-31|  1|2011-12-18|
|   b|   8|2011-12-19|2999-12-31|  2|2999-12-31|
|   a|   2|2013-12-12|2999-12-31|  1|2013-12-16|
|   a|   4|2013-12-17|2999-12-31|  2|2013-12-22|
|   a|   6|2013-12-23|2999-12-31|  3|2999-12-31|
+----+----+----------+----------+---+----------+
scala>
Drop the unnecessary columns:
scala> spark.sql(""" select cola,colb,colc, coalesce(date_sub(lead(colc) over(partition by cola order by colc),1),cold) as cold from yash """).show
+----+----+----------+----------+
|cola|colb|      colc|      cold|
+----+----+----------+----------+
|   b|   3|2011-12-14|2011-12-18|
|   b|   8|2011-12-19|2999-12-31|
|   a|   2|2013-12-12|2013-12-16|
|   a|   4|2013-12-17|2013-12-22|
|   a|   6|2013-12-23|2999-12-31|
+----+----+----------+----------+
scala>
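For comparison, the same lead/date_sub logic can also be expressed with the DataFrame API instead of SQL. A minimal sketch, assuming the df2 built in the transcript above (with colC and colD already cast to date) is in scope:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, date_sub, lead}

// Window over each colA group, ordered by colC ascending.
val w = Window.partitionBy("colA").orderBy("colC")

// colD becomes the next row's colC minus one day; the last row of a group keeps its original colD.
val result = df2.withColumn("colD", coalesce(date_sub(lead(col("colC"), 1).over(w), 1), col("colD")))

result.orderBy("colA", "colC").show()

Here lead() looks at the following row within the window, and coalesce() falls back to the original colD when there is no following row in the group.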
Answer 2: You can create a row_number over a window partitioned by colA and ordered by colC, and then self-join the dataframe. The code should look like this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax (already in scope in spark-shell)
// Rank each colA group's rows by colC ascending; rnkminusone = rnk - 1.
val rnkDF = df.withColumn("rnk", row_number().over(Window.partitionBy("colA").orderBy($"colC".asc)))
  .withColumn("rnkminusone", $"rnk" - lit(1))
// Left-join each row (A) to the next-ranked row (B) of the same group: if there is no next row,
// keep the original colD; otherwise use the next row's colC minus one day.
val joinDF = rnkDF.alias("A").join(rnkDF.alias("B"), ($"A.colA" === $"B.colA").and($"A.rnk" === $"B.rnkminusone"), "left")
  .select($"A.colA".as("colA"),
    $"A.colB".as("colB"),
    $"A.colC".as("colC"),
    when($"B.colC".isNull, $"A.colD").otherwise(date_sub($"B.colC", 1)).as("colD"))
The result is shown below. I hope this helps.
+----+----+----------+----------+
|colA|colB|      colC|      colD|
+----+----+----------+----------+
|   a|   2|2013-12-12|2013-12-16|
|   a|   4|2013-12-17|2013-12-22|
|   a|   6|2013-12-23|2999-12-31|
|   b|   3|2011-12-14|2011-12-18|
|   b|   8|2011-12-19|2999-12-31|
+----+----+----------+----------+
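Note that show() after a join does not guarantee row order; one way to reproduce the table above is to sort explicitly before displaying:

// Sort by group and date so the output matches the table above.
joinDF.orderBy("colA", "colC").show()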