使用 spark 用下一行值填充 null 或空
Posted
技术标签:
【中文标题】使用 spark 用下一行值填充 null 或空【英文标题】:Fill null or empty with next Row value with spark 【发布时间】:2019-05-21 10:17:30 【问题描述】:有没有办法用下一行非空值替换 spark 数据帧中的空值。为 Windows 分区和排序添加了额外的 row_count 列。更具体地说,我想达到以下结果:
+---------+-----------+ +---------+--------+
| row_count | id| |row_count | id|
+---------+-----------+ +------+-----------+
| 1| null| | 1| 109|
| 2| 109| | 2| 109|
| 3| null| | 3| 108|
| 4| null| | 4| 108|
| 5| 108| => | 5| 108|
| 6| null| | 6| 110|
| 7| 110| | 7| 110|
| 8| null| | 8| null|
| 9| null| | 9| null|
| 10| null| | 10| null|
+---------+-----------+ +---------+--------+
我尝试了下面的代码,它没有给出正确的结果。
val ss = dataframe.select($"*", sum(when(dataframe("id").isNull||dataframe("id") === "", 1).otherwise(0)).over(Window.orderBy($"row_count")) as "value")
val window1=Window.partitionBy($"value").orderBy("id").rowsBetween(0, Long.MaxValue)
val selectList=ss.withColumn("id_fill_from_below",last("id").over(window1)).drop($"row_count").drop($"value")
【问题讨论】:
Spark / Scala: forward fill with last observation的可能重复 @user10938362 这是不同的。在此用 First 值填充 null。表示在窗口函数的这个使用的 first_value 特征中。在此解决方案中,从上到下填充数据,但我们需要从上到下填充数据。 您可以将数据的顺序更改为 row_count desc,然后应用***.com/questions/33621319/…中的解决方案 我在另一个 SO 问题中添加了一个仅使用窗口函数的解决方案:***.com/a/58876725/2166220 johnpaton.net/posts/forward-fill-spark 这篇文章帮助了我。这是我能找到的最佳解决方案。 【参考方案1】:这是一种方法
-
过滤非空值 (dfNonNulls)
过滤空值 (dfNulls)
使用连接和窗口函数找到空 id 的正确值
填充空数据框 (dfNullFills)
联合 dfNonNulls 和 dfNullFills
数据.csv
row_count,id
1,
2,109
3,
4,
5,108
6,
7,110
8,
9,
10,
var df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("data.csv")
var dfNulls = df.filter(
$"id".isNull
).withColumnRenamed(
"row_count","row_count_nulls"
).withColumnRenamed(
"id","id_nulls"
)
val dfNonNulls = df.filter(
$"id".isNotNull
).withColumnRenamed(
"row_count","row_count_values"
).withColumnRenamed(
"id","id_values"
)
dfNulls = dfNulls.join(
dfNonNulls, $"row_count_nulls" lt $"row_count_values","left"
).select(
$"id_nulls",$"id_values",$"row_count_nulls",$"row_count_values"
)
val window = Window.partitionBy("row_count_nulls").orderBy("row_count_values")
val dfNullFills = dfNulls.withColumn(
"rn", row_number.over(window)
).where($"rn" === 1).drop("rn").select(
$"row_count_nulls".alias("row_count"),$"id_values".alias("id"))
dfNullFills .union(dfNonNulls).orderBy($"row_count").show()
导致
+---------+----+
|row_count| id|
+---------+----+
| 1| 109|
| 2| 109|
| 3| 108|
| 4| 108|
| 5| 108|
| 6| 110|
| 7| 110|
| 8|null|
| 9|null|
| 10|null|
+---------+----+
【讨论】:
以上是关于使用 spark 用下一行值填充 null 或空的主要内容,如果未能解决你的问题,请参考以下文章