在 Spark SQL 中使用窗口函数结束日期记录

Posted

技术标签:

【中文标题】在 Spark SQL 中使用窗口函数结束日期记录【英文标题】:End-dating records using window functions in Spark SQL 【发布时间】:2018-12-14 04:16:19 【问题描述】:

我有一个如下所示的数据框

+----+----+----------+----------+
|colA|colB|      colC|      colD|
+----+----+----------+----------+
|   a|   2|2013-12-12|2999-12-31|
|   b|   3|2011-12-14|2999-12-31|
|   a|   4|2013-12-17|2999-12-31|
|   b|   8|2011-12-19|2999-12-31|
|   a|   6|2013-12-23|2999-12-31|
+----+----+----------+----------+

我需要根据 ColA 对记录进行分组,并根据 colC 对记录进行排名(最近的日期排名更大),然后通过从相邻排名的 colC 记录中减去一天来更新 colD 中的日期。

最终的数据框应如下所示

+----+----+----------+----------+
|colA|colB|      colC|      colD|
+----+----+----------+----------+
|   a|   2|2013-12-12|2013-12-16|
|   a|   4|2013-12-17|2013-12-22|
|   a|   6|2013-12-23|2999-12-31|
|   b|   3|2011-12-14|2011-12-18|
|   b|   8|2011-12-29|2999-12-31|
+----+----+----------+----------+

【问题讨论】:

【参考方案1】:

您可以使用窗口函数获取它

scala> val df = Seq(("a",2,"2013-12-12","2999-12-31"),("b",3,"2011-12-14","2999-12-31"),("a",4,"2013-12-17","2999-12-31"),("b",8,"2011-12-19","2999-12-31"),("a",6,"2013-12-23","2999-12-31")).toDF("colA","colB","colC","colD")
df: org.apache.spark.sql.DataFrame = [colA: string, colB: int ... 2 more fields]

scala> val df2 = df.withColumn("colc",'colc.cast("date")).withColumn("cold",'cold.cast("date"))
df2: org.apache.spark.sql.DataFrame = [colA: string, colB: int ... 2 more fields]

scala> df2.createOrReplaceTempView("yash")

scala> spark.sql(""" select cola,colb,colc,cold, rank() over(partition by cola order by colc) c1, coalesce(date_sub(lead(colc) over(partition by cola order by colc),1),cold) as cold2 from yash """).show
+----+----+----------+----------+---+----------+
|cola|colb|      colc|      cold| c1|     cold2|
+----+----+----------+----------+---+----------+
|   b|   3|2011-12-14|2999-12-31|  1|2011-12-18|
|   b|   8|2011-12-19|2999-12-31|  2|2999-12-31|
|   a|   2|2013-12-12|2999-12-31|  1|2013-12-16|
|   a|   4|2013-12-17|2999-12-31|  2|2013-12-22|
|   a|   6|2013-12-23|2999-12-31|  3|2999-12-31|
+----+----+----------+----------+---+----------+


scala> 

删除不必要的列

scala> spark.sql(""" select cola,colb,colc, coalesce(date_sub(lead(colc) over(partition by cola order by colc),1),cold) as cold from yash """).show
+----+----+----------+----------+
|cola|colb|      colc|      cold|
+----+----+----------+----------+
|   b|   3|2011-12-14|2011-12-18|
|   b|   8|2011-12-19|2999-12-31|
|   a|   2|2013-12-12|2013-12-16|
|   a|   4|2013-12-17|2013-12-22|
|   a|   6|2013-12-23|2999-12-31|
+----+----+----------+----------+


scala>

【讨论】:

【参考方案2】:

您可以按 colA 在分区上创建 row_number 并按 colC 排序,然后在数据帧上进行自连接。代码应如下所示。

val rnkDF = df.withColumn("rnk", row_number().over(Window.partitionBy("colA").orderBy($"colC".asc)))
  .withColumn("rnkminusone", $"rnk" - lit(1))

val joinDF = rnkDF.alias('A).join(rnkDF.alias('B), ($"A.colA" ===  $"B.colA").and($"A.rnk" === $"B.rnkminusone"),"left")
    .select($"A.colA".as("colA")
    , $"A.colB".as("colB")
    , $"A.colC".as("colC")
    , when($"B.colC".isNull, $"A.colD").otherwise(date_sub($"B.colC", 1)).as("colD"))

结果如下。我希望这会有所帮助。

+----+----+----------+----------+ |colA|colB| colC| colD| +----+----+----------+----------+ | a| 2|2013-12-12|2013-12-16| | a| 4|2013-12-17|2013-12-22| | a| 6|2013-12-23|2999-12-31| | b| 3|2011-12-14|2011-12-18| | b| 8|2011-12-19|2999-12-31| +----+----+----------+----------+

【讨论】:

以上是关于在 Spark SQL 中使用窗口函数结束日期记录的主要内容,如果未能解决你的问题,请参考以下文章

用于构建时间线的 Spark 窗口函数

在 SQL/Spark 中使用窗口函数执行特定过滤器

如何在 Hive/Spark SQL 中使用窗口函数删除重叠

Spark 窗口函数 - rangeBetween 日期

Spark SQL 中分组依据和窗口函数如何交互?

在 Apache Spark SQL 中将中值作为窗口函数 (UDAF) 移动