使用 spark 用下一行值填充 null 或空

Posted

技术标签:

【中文标题】使用 spark 用下一行值填充 null 或空【英文标题】:Fill null or empty with next Row value with spark 【发布时间】:2019-05-21 10:17:30 【问题描述】:

有没有办法用下一行非空值替换 spark 数据帧中的空值。为 Windows 分区和排序添加了额外的 row_count 列。更具体地说,我想达到以下结果:

      +---------+-----------+      +---------+--------+
      | row_count |       id|      |row_count |     id|
      +---------+-----------+      +------+-----------+
      |        1|       null|      |     1|        109|
      |        2|        109|      |     2|        109|
      |        3|       null|      |     3|        108|
      |        4|       null|      |     4|        108|
      |        5|        108| =>   |     5|        108|
      |        6|       null|      |     6|        110|
      |        7|        110|      |     7|        110|
      |        8|       null|      |     8|       null|
      |        9|       null|      |     9|       null|
      |       10|       null|      |    10|       null|
      +---------+-----------+      +---------+--------+

我尝试了下面的代码,它没有给出正确的结果。

      val ss = dataframe.select($"*", sum(when(dataframe("id").isNull||dataframe("id") === "", 1).otherwise(0)).over(Window.orderBy($"row_count")) as "value")
      val window1=Window.partitionBy($"value").orderBy("id").rowsBetween(0, Long.MaxValue)
      val selectList=ss.withColumn("id_fill_from_below",last("id").over(window1)).drop($"row_count").drop($"value")

【问题讨论】:

Spark / Scala: forward fill with last observation的可能重复 @user10938362 这是不同的。在此用 First 值填充 null。表示在窗口函数的这个使用的 first_value 特征中。在此解决方案中,从上到下填充数据,但我们需要从上到下填充数据。 您可以将数据的顺序更改为 row_count desc,然后应用***.com/questions/33621319/…中的解决方案 我在另一个 SO 问题中添加了一个仅使用窗口函数的解决方案:***.com/a/58876725/2166220 johnpaton.net/posts/forward-fill-spark 这篇文章帮助了我。这是我能找到的最佳解决方案。 【参考方案1】:

这是一种方法

    过滤非空值 (dfNonNulls) 过滤空值 (dfNulls) 使用连接和窗口函数找到空 id 的正确值 填充空数据框 (dfNullFills) 联合 dfNonNulls 和 dfNullFills

数据.csv

row_count,id
1,
2,109
3,
4,
5,108
6,
7,110
8,
9,
10,
var df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv")

var dfNulls = df.filter(
  $"id".isNull
).withColumnRenamed(
  "row_count","row_count_nulls"
).withColumnRenamed(
  "id","id_nulls"
)

val dfNonNulls = df.filter(
  $"id".isNotNull
).withColumnRenamed(
  "row_count","row_count_values"
).withColumnRenamed(
  "id","id_values"
)

dfNulls = dfNulls.join(
  dfNonNulls, $"row_count_nulls" lt $"row_count_values","left"
).select(
  $"id_nulls",$"id_values",$"row_count_nulls",$"row_count_values"
)

val window = Window.partitionBy("row_count_nulls").orderBy("row_count_values")

val dfNullFills = dfNulls.withColumn(
  "rn", row_number.over(window)
).where($"rn" === 1).drop("rn").select(
  $"row_count_nulls".alias("row_count"),$"id_values".alias("id"))

dfNullFills .union(dfNonNulls).orderBy($"row_count").show()

导致

+---------+----+
|row_count|  id|
+---------+----+
|        1| 109|
|        2| 109|
|        3| 108|
|        4| 108|
|        5| 108|
|        6| 110|
|        7| 110|
|        8|null|
|        9|null|
|       10|null|
+---------+----+

【讨论】:

以上是关于使用 spark 用下一行值填充 null 或空的主要内容,如果未能解决你的问题,请参考以下文章

值不能为 null 或空. 参数名: value

7645 Null 或空全文谓词

带有空对象的补丁值

Scala Spark用NULL替换空String

spark.read.schema 为数据框列值返回 null [重复]

检查对象中的值是不是为空或空javascript