Spark 在值系列中查找 NULL 值块

Posted

技术标签:

【中文标题】Spark 在值系列中查找 NULL 值块【英文标题】:Spark find NULL value blocks in Series of values 【发布时间】:2018-06-09 17:52:35 【问题描述】:

假设这是我的数据:

date         value
2016-01-01   1
2016-01-02   NULL
2016-01-03   NULL
2016-01-04   2
2016-01-05   3
2016-01-06   NULL
2016-01-07   NULL
2016-01-08   NULL
2016-01-09   1

我正在尝试查找围绕 NULL 值组的开始和结束日期。示例输出如下:

start        end
2016-01-01   2016-01-04
2016-01-05   2016-01-09

我对这个问题的第一次尝试产生了以下结果:

df.filter($"value".isNull)\
    .agg(to_date(date_add(max("date"), 1)) as "max", 
         to_date(date_sub(min("date"),1)) as "min"
        )

但这只能找到总的最小值和最大值。我想过使用 groupBy 但不知道如何为每个空值块创建一个列。

【问题讨论】:

【参考方案1】:

棘手的部分是获取组的边界,因此您需要几个步骤。

首先构建空值/非空值组(使用窗口函数) 然后按块分组以获得块内的边界 然后再次使用窗口函数来扩展边框

这是一个工作示例:

import ss.implicits._

val df = Seq(
  ("2016-01-01", Some(1)),
  ("2016-01-02", None),
  ("2016-01-03", None),
  ("2016-01-04", Some(2)),
  ("2016-01-05", Some(3)),
  ("2016-01-06", None),
  ("2016-01-07", None),
  ("2016-01-08", None),
  ("2016-01-09", Some(1))
).toDF("date", "value")


df
  // build blocks
  .withColumn("isnull", when($"value".isNull, true).otherwise(false))
  .withColumn("lag_isnull", lag($"isnull",1).over(Window.orderBy($"date")))
  .withColumn("change", coalesce($"isnull"=!=$"lag_isnull",lit(false)))
  .withColumn("block", sum($"change".cast("int")).over(Window.orderBy($"date")))
  // now calculate min/max within groups
  .groupBy($"block")
  .agg(
    min($"date").as("tmp_min"),
    max($"date").as("tmp_max"),
    (count($"value")===0).as("null_block")
  )
  // now extend groups to include borders
  .withColumn("min", lag($"tmp_max", 1).over(Window.orderBy($"tmp_min")))
  .withColumn("max", lead($"tmp_min", 1).over(Window.orderBy($"tmp_max")))
  // only select null-groups
  .where($"null_block")
  .select($"min", $"max")
  .orderBy($"min")
  .show()

给予

+----------+----------+
|       min|       max|
+----------+----------+
|2016-01-01|2016-01-04|
|2016-01-05|2016-01-09|
+----------+----------+

【讨论】:

【参考方案2】:

我没有可行的解决方案,但我有一些建议。

Look at using a lag;您还必须稍微更改该代码以生成引导列。

现在假设您有滞后和领先列。您的结果数据框现在将如下所示:

date         value     lag_value     lead_value
2016-01-01   1         NULL          1 
2016-01-02   NULL      NULL          1
2016-01-03   NULL      2             NULL
2016-01-04   2         3             NULL
2016-01-05   3         NULL          2
2016-01-06   NULL      NULL          3
2016-01-07   NULL      NULL          NULL
2016-01-08   NULL      1             NULL
2016-01-09   1         1             NULL

现在您要做的只是按以下条件过滤:

min date:
df.filter("value IS NOT NULL AND lag_value IS NULL")

max date:
df.filter("value IS NULL AND lead_value IS NOT NULL")

如果您想更高级一点,还可以使用when 命令创建一个新列,用于说明日期是空组的开始日期还是结束日期:

date         value     lag_value     lead_value   group_date_type
2016-01-01   1         NULL          1            start
2016-01-02   NULL      NULL          1            NULL
2016-01-03   NULL      2             NULL         NULL   
2016-01-04   2         3             NULL         end
2016-01-05   3         NULL          2            start
2016-01-06   NULL      NULL          3            NULL
2016-01-07   NULL      NULL          NULL         NULL
2016-01-08   NULL      1             NULL         NULL
2016-01-09   1         1             NULL         end 

这可以用如下所示的东西创建:

from pyspark.sql import functions as F
df_2.withColumn('group_date_type', 
                F.when("value IS NOT NULL AND lag_value IS NULL", start)\
                  .when("value IS NULL AND lead_value IS NOT NULL", end)\
                  .otherwise(None)
                 )

【讨论】:

不知道为什么这不是一个可行的解决方案,对我有用,谢谢?

以上是关于Spark 在值系列中查找 NULL 值块的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 数据框中的 n 列中按行查找最频繁的值

在两个 Spark 数据框列中查找公共元素的有效方法

Spark数据框选择该行的任何列中至少有一个null或空白的行

我爱java系列之---如何把数据库中查到的Skulist数据转换成elastic索引库中SkuInfo类数据?

从系列/列中查找第一个元素的索引(例如“True”)

使用 Spark/Cassandra 的时间序列 - 如何在值满足条件时找到时间戳?