Spark Window Functions: Filter out rows with start and end dates within the bounds of another row's start and end dates

Posted: 2019-09-11 22:47:48

Question:

I have a DataFrame (sqlDF) similar to the following (simplified for this example), from which I'm trying to remove any rows whose start_date and end_date fall within the bounds of another row's start and end dates:

+-------+-------------+-------------------+-------------------+
|    id |         type|         start_date|           end_date|
+-------+-------------+-------------------+-------------------+
|  1    |      unknown|2018-11-14 16:03:47|2018-12-06 21:23:22| (remove as it's within the next row's start and end dates)
|  1    |          ios|2018-10-13 14:58:22|2019-08-26 15:50:45|
|  1    |      android|2019-08-29 02:41:40|2019-09-05 23:03:20|
|  2    |          ios|2017-12-19 02:25:34|2019-08-09 15:41:30|
|  2    |      windows|2018-07-10 05:30:52|2018-07-13 10:11:34| (remove as it's within the previous row's start and end dates)
|  2    |      android|2019-05-14 18:33:15|2019-06-27 06:10:53| (remove as it's within another row's start and end dates)

First, the end user asked me to remove all records where the time between start_date and end_date is less than 5 days, which I did like this:

val dfWithoutTempHandsets = sqlDF.filter(datediff(col("end_date"), col("start_date")) > 5)  // keep rows whose day-value difference exceeds 5

which produces a DataFrame like this:

+-------+-------------+-------------------+-------------------+
|    id |         type|         start_date|           end_date|
+-------+-------------+-------------------+-------------------+
|  1    |      unknown|2018-11-14 16:03:47|2018-12-06 21:23:22| 
|  1    |          ios|2018-10-13 14:58:22|2019-08-26 15:50:45|
|  1    |      android|2019-08-29 02:41:40|2019-09-05 23:03:20|
|  2    |          ios|2017-12-19 02:25:34|2019-08-09 15:41:30|
|  2    |      android|2019-05-14 18:33:15|2019-06-27 06:10:53|

Now I need to filter out the rows whose start and end dates fall within the start and end dates of another row with the same id, so that the resulting DataFrame looks like this:

+-------+-------------+-------------------+-------------------+
|    id |         type|         start_date|           end_date|
+-------+-------------+-------------------+-------------------+
|  1    |          ios|2018-10-13 14:58:22|2019-08-26 15:50:45|
|  1    |      android|2019-08-29 02:41:40|2019-09-05 23:03:20|
|  2    |          ios|2017-12-19 02:25:34|2019-08-09 15:41:30|

After reading several blog posts and Stack Overflow answers about Spark window functions, I know window functions are the answer. But I'm struggling to find examples of a similar use case where multiple dates are compared against another row's dates in this way. I believe I have a windowSpec that is close:

val windowSpec = Window.partitionBy("id", "type").orderBy("start_date")

But from there I'm not sure how to leverage the windowSpec to select only the rows whose start and end dates do not fall within another row for that id.

EDIT: I've been given a new requirement to apply the above logic only to rows whose type is "NULL" or "unknown". But the answers here got me much closer!


Answer 1:

Here is the logic I would consider using:

Within a Window partitioned by id and ordered by start_date in ascending order, if the end_date of the current row is earlier than or equal to the end_date of any preceding row, then the date range of the current row must be contained within the date range of one of the previous rows.

Translating that into sample code (which also includes the > 5 days filtering):

import java.sql.Timestamp
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, "unknown", Timestamp.valueOf("2018-11-14 16:03:47"), Timestamp.valueOf("2018-12-06 21:23:22")),
  (1, "ios", Timestamp.valueOf("2018-10-13 14:58:22"), Timestamp.valueOf("2019-08-26 15:50:45")),
  (1, "android", Timestamp.valueOf("2019-08-29 02:41:40"), Timestamp.valueOf("2019-09-05 23:03:20")),
  (2, "ios", Timestamp.valueOf("2017-12-19 02:25:34"), Timestamp.valueOf("2019-08-09 15:41:30")),
  (2, "unknown", Timestamp.valueOf("2018-07-10 05:30:52"), Timestamp.valueOf("2018-07-13 10:11:34")),
  (2, "android", Timestamp.valueOf("2019-05-14 18:33:15"), Timestamp.valueOf("2019-06-27 06:10:53"))
).toDF("id", "type", "start_date", "end_date")

// for each row: all preceding rows within the same id, ordered by start_date
val win = Window.partitionBy("id").orderBy($"start_date").
  rowsBetween(Window.unboundedPreceding, -1)

df.
  where(unix_timestamp($"end_date") - unix_timestamp($"start_date") > 5*24*3600).  // strictly more than 5 full days
  withColumn("isContained",  // contained if end_date is on or before some earlier-starting row's end_date
    when($"end_date" <= max($"end_date").over(win), true).otherwise(false)
  ).
  where(! $"isContained").
  show
// +---+-------+-------------------+-------------------+-----------+
// | id|   type|         start_date|           end_date|isContained|
// +---+-------+-------------------+-------------------+-----------+
// |  1|    ios|2018-10-13 14:58:22|2019-08-26 15:50:45|      false|
// |  1|android|2019-08-29 02:41:40|2019-09-05 23:03:20|      false|
// |  2|    ios|2017-12-19 02:25:34|2019-08-09 15:41:30|      false|
// +---+-------+-------------------+-------------------+-----------+

Note that for the > 5 days filtering I use unix_timestamp rather than datediff, which just mechanically compares the difference of the day values (e.g. datediff(2019-01-06 12:00:00, 2019-01-01 00:00:00) > 5 is false).
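To make the distinction concrete, here is a minimal sketch (reusing the imports above; the output column names are just illustrative) that evaluates both expressions on the timestamps from the note:

Seq(("2019-01-01 00:00:00", "2019-01-06 12:00:00")).toDF("start_date", "end_date").
  select(
    datediff($"end_date", $"start_date").as("day_value_diff"),                                       // 5, so "> 5" is false
    (unix_timestamp($"end_date") - unix_timestamp($"start_date") > 5*24*3600).as("over_5_full_days") // true, since 5.5 days elapsed
  ).
  show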

Comments:

This is awesome! Say I only need to apply this logic to rows whose type is "Null" or "unknown", would I just add a second "when" outside of yours? (See the latest edit.)

@alaskanloops, from what you describe, the additional condition should most likely go inside the existing when clause, e.g. when(($"type" === "unknown" || $"type".isNull) && $"end_date" <= max ...). But keep in mind that the window spec win applies, for each current row, to all preceding rows within the same id, regardless of type.
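A minimal sketch of the condition suggested in that comment, assuming the df and win defined in the answer above (the "unknown"/NULL values come from the question's edit):

df.
  where(unix_timestamp($"end_date") - unix_timestamp($"start_date") > 5*24*3600).
  withColumn("isContained",
    // only rows with a NULL or "unknown" type are eligible to be flagged as contained
    when(($"type" === "unknown" || $"type".isNull) &&
         $"end_date" <= max($"end_date").over(win), true).otherwise(false)
  ).
  where(! $"isContained").
  show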
Answer 2:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import spark.implicits._

val sqlDF = Seq(
  (1, "unknown", "2018-11-14 16:03:47", "2018-12-06 21:23:22"),
  (1, "ios", "2018-10-13 14:58:22", "2019-08-26 15:50:45"),
  (1, "android", "2019-08-29 02:41:40", "2019-09-05 23:03:20"),
  (2, "ios", "2017-12-19 02:25:34", "2019-08-09 15:41:30"),
  (2, "unknown", "2018-07-10 05:30:52", "2018-07-13 10:11:34"),
  (2, "android", "2019-05-14 18:33:15", "2019-06-27 06:10:53")
).toDF("id", "type", "start_date", "end_date")

// drop rows whose day-value difference is 5 or less (same filter as in the question)
val dfWithoutTempHandsets = sqlDF.filter(datediff(col("end_date"), col("start_date")) > 5)

// earliest start_date per id (first value when ordered by start_date ascending)
val windowSpec = Window.partitionBy(dfWithoutTempHandsets("id")).orderBy(dfWithoutTempHandsets("start_date"))
val dense = first(dfWithoutTempHandsets("start_date")).over(windowSpec)

// latest end_date per id (first value when ordered by end_date descending)
val windowSpec1 = Window.partitionBy(dfWithoutTempHandsets("id")).orderBy(dfWithoutTempHandsets("end_date").desc)
val dense1 = first(dfWithoutTempHandsets("end_date")).over(windowSpec1)

val temp = dfWithoutTempHandsets.select(
  dfWithoutTempHandsets("id"), dfWithoutTempHandsets("type"),
  dfWithoutTempHandsets("start_date"), dfWithoutTempHandsets("end_date"),
  dense.alias("min_start_date"), dense1.alias("max_end_date"))

// keep only the rows that carry the earliest start_date or the latest end_date of their id
val finalDf = temp.filter(temp("start_date").leq(temp("min_start_date")).or(temp("end_date").geq(temp("max_end_date"))))

finalDf.show(false)

