Spark 在值系列中查找 NULL 值块
Posted
技术标签:
【中文标题】Spark 在值系列中查找 NULL 值块【英文标题】:Spark find NULL value blocks in Series of values 【发布时间】:2018-06-09 17:52:35 【问题描述】:假设这是我的数据:
date value
2016-01-01 1
2016-01-02 NULL
2016-01-03 NULL
2016-01-04 2
2016-01-05 3
2016-01-06 NULL
2016-01-07 NULL
2016-01-08 NULL
2016-01-09 1
我正在尝试查找围绕 NULL 值组的开始和结束日期。示例输出如下:
start end
2016-01-01 2016-01-04
2016-01-05 2016-01-09
我对这个问题的第一次尝试产生了以下结果:
df.filter($"value".isNull)\
.agg(to_date(date_add(max("date"), 1)) as "max",
to_date(date_sub(min("date"),1)) as "min"
)
但这只能找到总的最小值和最大值。我想过使用 groupBy 但不知道如何为每个空值块创建一个列。
【问题讨论】:
【参考方案1】:棘手的部分是获取组的边界,因此您需要几个步骤。
首先构建空值/非空值组(使用窗口函数) 然后按块分组以获得块内的边界 然后再次使用窗口函数来扩展边框这是一个工作示例:
import ss.implicits._
val df = Seq(
("2016-01-01", Some(1)),
("2016-01-02", None),
("2016-01-03", None),
("2016-01-04", Some(2)),
("2016-01-05", Some(3)),
("2016-01-06", None),
("2016-01-07", None),
("2016-01-08", None),
("2016-01-09", Some(1))
).toDF("date", "value")
df
// build blocks
.withColumn("isnull", when($"value".isNull, true).otherwise(false))
.withColumn("lag_isnull", lag($"isnull",1).over(Window.orderBy($"date")))
.withColumn("change", coalesce($"isnull"=!=$"lag_isnull",lit(false)))
.withColumn("block", sum($"change".cast("int")).over(Window.orderBy($"date")))
// now calculate min/max within groups
.groupBy($"block")
.agg(
min($"date").as("tmp_min"),
max($"date").as("tmp_max"),
(count($"value")===0).as("null_block")
)
// now extend groups to include borders
.withColumn("min", lag($"tmp_max", 1).over(Window.orderBy($"tmp_min")))
.withColumn("max", lead($"tmp_min", 1).over(Window.orderBy($"tmp_max")))
// only select null-groups
.where($"null_block")
.select($"min", $"max")
.orderBy($"min")
.show()
给予
+----------+----------+
| min| max|
+----------+----------+
|2016-01-01|2016-01-04|
|2016-01-05|2016-01-09|
+----------+----------+
【讨论】:
【参考方案2】:我没有可行的解决方案,但我有一些建议。
Look at using a lag;您还必须稍微更改该代码以生成引导列。
现在假设您有滞后和领先列。您的结果数据框现在将如下所示:
date value lag_value lead_value
2016-01-01 1 NULL 1
2016-01-02 NULL NULL 1
2016-01-03 NULL 2 NULL
2016-01-04 2 3 NULL
2016-01-05 3 NULL 2
2016-01-06 NULL NULL 3
2016-01-07 NULL NULL NULL
2016-01-08 NULL 1 NULL
2016-01-09 1 1 NULL
现在您要做的只是按以下条件过滤:
min date:
df.filter("value IS NOT NULL AND lag_value IS NULL")
max date:
df.filter("value IS NULL AND lead_value IS NOT NULL")
如果您想更高级一点,还可以使用when
命令创建一个新列,用于说明日期是空组的开始日期还是结束日期:
date value lag_value lead_value group_date_type
2016-01-01 1 NULL 1 start
2016-01-02 NULL NULL 1 NULL
2016-01-03 NULL 2 NULL NULL
2016-01-04 2 3 NULL end
2016-01-05 3 NULL 2 start
2016-01-06 NULL NULL 3 NULL
2016-01-07 NULL NULL NULL NULL
2016-01-08 NULL 1 NULL NULL
2016-01-09 1 1 NULL end
这可以用如下所示的东西创建:
from pyspark.sql import functions as F
df_2.withColumn('group_date_type',
F.when("value IS NOT NULL AND lag_value IS NULL", start)\
.when("value IS NULL AND lead_value IS NOT NULL", end)\
.otherwise(None)
)
【讨论】:
不知道为什么这不是一个可行的解决方案,对我有用,谢谢?以上是关于Spark 在值系列中查找 NULL 值块的主要内容,如果未能解决你的问题,请参考以下文章
Spark数据框选择该行的任何列中至少有一个null或空白的行