在日期列比较上过滤 DataFrame

Posted

技术标签:

【中文标题】在日期列比较上过滤 DataFrame【英文标题】:Filtering a DataFrame on date columns comparison 【发布时间】:2019-01-15 15:01:38 【问题描述】:

我正在尝试使用 Scala 和 Spark 过滤比较两个日期列的 DataFrame。基于过滤后的 DataFrame,在顶部运行计算以计算新列。 简化后我的数据框具有以下架构:

|-- received_day: date (nullable = true)
|-- finished: int (nullable = true)

除此之外,我还创建了两个新列 t_startt_end,用于过滤 DataFrame。它们与原始列 received_day 相差 10 天和 20 天:

val dfWithDates= df
      .withColumn("t_end",date_sub(col("received_day"),10))
      .withColumn("t_start",date_sub(col("received_day"),20))

我现在想要一个新的计算列,它为每一行数据指示在t_startt_end 期间有多少行数据帧。我想我可以通过以下方式实现:

val dfWithCount = dfWithDates
       .withColumn("cnt", lit(
        dfWithDates.filter(
          $"received_day".lt(col("t_end")) 
          && $"received_day".gt(col("t_start"))).count()))

但是,这个计数只返回 0,我相信问题出在我传递给 ltgt 的参数上。

从这里Filtering a spark dataframe based on date 关注该问题,我意识到我需要传递一个字符串值。如果我尝试使用像lt(lit("2018-12-15")) 这样的硬编码值,那么过滤就会起作用。所以我试着把我的专栏投到StringType:

val dfWithDates= df
      .withColumn("t_end",date_sub(col("received_day"),10).cast(DataTypes.StringType))
      .withColumn("t_start",date_sub(col("received_day"),20).cast(DataTypes.StringType))

但过滤器仍然返回一个空的数据帧。 我会假设我没有正确处理数据类型。

我在带有 Spark 2.0.2 的 Scala 2.11.0 上运行。

【问题讨论】:

也许还有人知道我在哪里可以找到关于lt()gt() 的文档?我试着搜索它,但找不到我要找的东西。 您可以在Column数据类型上找到所有可操作的函数:spark.apache.org/docs/latest/api/scala/… 【参考方案1】:

是的,你是对的。对于$"received_day".lt(col("t_end"),每个reveived_day 值与当前行的t_end 值进行比较,而不是整个数据帧。所以每次你都会得到零作为计数。 您可以通过编写一个简单的 udf 来解决这个问题。以下是解决问题的方法:

创建示例输入数据集

import org.apache.spark.sql.Row, SparkSession
import java.sql.Date
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq((Date.valueOf("2018-10-12"),1),
              (Date.valueOf("2018-10-13"),1),
              (Date.valueOf("2018-09-25"),1),
              (Date.valueOf("2018-10-14"),1)).toDF("received_day", "finished")

val dfWithDates= df
  .withColumn("t_start",date_sub(col("received_day"),20))
  .withColumn("t_end",date_sub(col("received_day"),10))
dfWithDates.show()
    +------------+--------+----------+----------+
|received_day|finished|   t_start|     t_end|
+------------+--------+----------+----------+
|  2018-10-12|       1|2018-09-22|2018-10-02|
|  2018-10-13|       1|2018-09-23|2018-10-03|
|  2018-09-25|       1|2018-09-05|2018-09-15|
|  2018-10-14|       1|2018-09-24|2018-10-04|
+------------+--------+----------+----------+

这里对于2018-09-25,我们需要计数 3

生成输出

val count_udf = udf((received_day:Date) => 
        (dfWithDates.filter((col("t_end").gt(s"$received_day")) && col("t_start").lt(s"$received_day")).count())
    )
    val dfWithCount = dfWithDates.withColumn("count",count_udf(col("received_day")))
    dfWithCount.show()
    +------------+--------+----------+----------+-----+
|received_day|finished|   t_start|     t_end|count|
+------------+--------+----------+----------+-----+
|  2018-10-12|       1|2018-09-22|2018-10-02|    0|
|  2018-10-13|       1|2018-09-23|2018-10-03|    0|
|  2018-09-25|       1|2018-09-05|2018-09-15|    3|
|  2018-10-14|       1|2018-09-24|2018-10-04|    0|
+------------+--------+----------+----------+-----+

为了使计算更快,我建议缓存dfWithDates,因为每一行都有相同的操作重复。

【讨论】:

这实际上对我有用,我只是不明白为什么你的答案在没有任何评论的情况下被降级。里面有什么不好的做法吗? @Inna 我仍然不明白为什么它被降级。没有任何评论的降级是不可接受的。你问了一个很好的问题,起初我看到这个问题被降级而没有任何评论。如果可行,请您接受答案吗?【参考方案2】:

您可以使用 DateTimeFormatter 将日期值转换为具有任何模式的字符串

import java.time.format.DateTimeFormatter

date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))

【讨论】:

如果我有 DateTime 格式的 val t_end 则此方法有效,但如果我尝试执行 .withColumn("t_end",date_sub(col("received_day"),10).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))),则会收到以下错误:error: value format is not a member of org.apache.spark.sql.Column

以上是关于在日期列比较上过滤 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

剑道列过滤器内的角度材料日期选择器问题

过滤数字列名称上的非NA

剑道网格中的日期时间过滤器

数据表日期过滤器

过滤特定月份的日期列

PF过滤包含日期的数据表列