Pyspark - 每个键添加缺失值?
Posted
技术标签:
【中文标题】Pyspark - 每个键添加缺失值?【英文标题】:Pyspark - add missing values per key? 【发布时间】:2020-03-11 12:00:15 【问题描述】:我有一个 Pyspark 数据框,其中包含一些非唯一键 key
和一些列 number
和 value
。
对于大多数keys
,number
列从 1 到 12,但对于其中一些,numbers
中存在间隙(例如,我们有数字 [1, 2, 5, 9]
)。我想添加缺失的行,这样对于每个 key
,我们都会在 1-12 范围内填充最后看到的值的所有 numbers
。
所以对于表
key number value
a 1 6
a 2 10
a 5 20
a 9 25
我想得到
key number value
a 1 6
a 2 10
a 3 10
a 4 10
a 5 20
a 6 20
a 7 20
a 8 20
a 9 25
a 10 25
a 11 25
a 12 25
我考虑过创建一个a
的表和一个 1-12 的数组,分解该数组并加入我的原始表,然后使用以当前行为边界的窗口函数分别用先前的值填充 value
列.不过好像有点不雅,不知道有没有更好的方法来实现我想要的?
【问题讨论】:
我认为在 pyspark 上下文中,你想要的很棘手。我的理解是 spark 最适合以列方式工作,添加、过滤、删除列等。我可以假设您事先不知道数字列中哪些键有间隙吗?您可以通过使用 groupBy() 和 agg() (hackingandslacking.com/…) 提取它,然后过滤以仅保留具有 【参考方案1】:我考虑过创建一个 a 的表和一个 1-12 的数组,分解数组并与我的原始表连接,然后使用以当前行为边界的窗口函数分别用先前的值填充 value 列。不过好像有点不雅,不知道有没有更好的方法来实现我想要的?
我不认为您提出的方法不优雅 - 但您可以使用 range
而不是 explode
来实现同样的效果。
首先创建一个包含您范围内所有数字的数据框。您还需要将其与 DataFrame 中不同的 key
列交叉连接。
all_numbers = spark.range(1, 13).withColumnRenamed("id", "number")
all_numbers = all_numbers.crossJoin(df.select("key").distinct()).cache()
all_numbers.show()
#+------+---+
#|number|key|
#+------+---+
#| 1| a|
#| 2| a|
#| 3| a|
#| 4| a|
#| 5| a|
#| 6| a|
#| 7| a|
#| 8| a|
#| 9| a|
#| 10| a|
#| 11| a|
#| 12| a|
#+------+---+
现在您可以将其外部连接到您的原始 DataFrame 和 forward fill using the last known good value。如果key的数量足够少,或许可以广播
from pyspark.sql.functions import broadcast, last
from pyspark.sql import Window
df.join(broadcast(all_numbers), on=["number", "key"], how="outer")\
.withColumn(
"value",
last(
"value",
ignorenulls=True
).over(
Window.partitionBy("key").orderBy("number")\
.rowsBetween(Window.unboundedPreceding, 0)
)
)\
.show()
#+------+---+-----+
#|number|key|value|
#+------+---+-----+
#| 1| a| 6|
#| 2| a| 10|
#| 3| a| 10|
#| 4| a| 10|
#| 5| a| 20|
#| 6| a| 20|
#| 7| a| 20|
#| 8| a| 20|
#| 9| a| 25|
#| 10| a| 25|
#| 11| a| 25|
#| 12| a| 25|
#+------+---+-----+
【讨论】:
【参考方案2】:无需加入也可以做到这一点。我已经对此进行了多次测试,具有不同的差距,它总是会工作只要始终提供数字 1 作为输入(因为你需要从那里开始的序列),它的范围总是直到 12。我使用了一个情侣窗口s来获得一个我可以在sequence中使用的列,然后使用表达式制作一个自定义序列,然后exploded 得到想要的结果。如果由于某种原因,您的输入中没有数字 1,请告诉我,我将更新我的解决方案。
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import when
w=Window().partitionBy("key").orderBy("number")
w2=Window().partitionBy("key").orderBy("number").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("number2", F.lag("number").over(w)).withColumn("diff", F.when((F.col("number2").isNotNull()) & ((F.col("number")-F.col("number2")) > 1), (F.col("number")-F.col("number2"))).otherwise(F.lit(0)))\
.withColumn("diff2", F.lead("diff").over(w)).withColumn("diff2", F.when(F.col("diff2").isNull(), F.lit(0)).otherwise(F.col("diff2"))).withColumn("diff2", F.when(F.col("diff2")!=0, F.col("diff2")-1).otherwise(F.col("diff2"))).withColumn("max", F.max("number").over(w2))\
.withColumn("diff2", F.when((F.col("number")==F.col("max")) & (F.col("number")<F.lit(12)), F.lit(12)-F.col("number")).otherwise(F.col("diff2")))\
.withColumn("number2", F.when(F.col("diff2")!=0,F.expr("""sequence(number,number+diff2,1)""")).otherwise(F.expr("""sequence(number,number+diff2,0)""")))\
.drop("diff","diff2","max")\
.withColumn("number2", F.explode("number2")).drop("number")\
.select("key", F.col("number2").alias("number"), "value")\
.show()
+---+------+-----+
|key|number|value|
+---+------+-----+
| a| 1| 6|
| a| 2| 10|
| a| 3| 10|
| a| 4| 10|
| a| 5| 20|
| a| 6| 20|
| a| 7| 20|
| a| 8| 20|
| a| 9| 25|
| a| 10| 25|
| a| 11| 25|
| a| 12| 25|
+---+------+-----+
【讨论】:
以上是关于Pyspark - 每个键添加缺失值?的主要内容,如果未能解决你的问题,请参考以下文章