Pyspark - 每个键添加缺失值?

Posted

技术标签:

【中文标题】Pyspark - 每个键添加缺失值?【英文标题】:Pyspark - add missing values per key? 【发布时间】:2020-03-11 12:00:15 【问题描述】:

我有一个 Pyspark 数据框,其中包含一些非唯一键 key 和一些列 numbervalue

对于大多数keysnumber 列从 1 到 12,但对于其中一些,numbers 中存在间隙(例如,我们有数字 [1, 2, 5, 9])。我想添加缺失的行,这样对于每个 key,我们都会在 1-12 范围内填充最后看到的值的所有 numbers

所以对于表

key    number    value
a      1         6
a      2         10
a      5         20
a      9         25

我想得到

key    number    value
a      1         6
a      2         10
a      3         10
a      4         10
a      5         20
a      6         20
a      7         20
a      8         20
a      9         25
a      10        25
a      11        25
a      12        25

我考虑过创建一个a 的表和一个 1-12 的数组,分解该数组并加入我的原始表,然后使用以当前行为边界的窗口函数分别用先前的值填充 value 列.不过好像有点不雅,不知道有没有更好的方法来实现我想要的?

【问题讨论】:

我认为在 pyspark 上下文中,你想要的很棘手。我的理解是 spark 最适合以列方式工作,添加、过滤、删除列等。我可以假设您事先不知道数字列中哪些键有间隙吗?您可以通过使用 groupBy() 和 agg() (hackingandslacking.com/…) 提取它,然后过滤以仅保留具有 【参考方案1】:

我考虑过创建一个 a 的表和一个 1-12 的数组,分解数组并与我的原始表连接,然后使用以当前行为边界的窗口函数分别用先前的值填充 value 列。不过好像有点不雅,不知道有没有更好的方法来实现我想要的?

我不认为您提出的方法不优雅 - 但您可以使用 range 而不是 explode 来实现同样的效果。

首先创建一个包含您范围内所有数字的数据框。您还需要将其与 DataFrame 中不同的 key 列交叉连接。

all_numbers = spark.range(1, 13).withColumnRenamed("id", "number")
all_numbers = all_numbers.crossJoin(df.select("key").distinct()).cache()
all_numbers.show()
#+------+---+
#|number|key|
#+------+---+
#|     1|  a|
#|     2|  a|
#|     3|  a|
#|     4|  a|
#|     5|  a|
#|     6|  a|
#|     7|  a|
#|     8|  a|
#|     9|  a|
#|    10|  a|
#|    11|  a|
#|    12|  a|
#+------+---+

现在您可以将其外部连接到您的原始 DataFrame 和 forward fill using the last known good value。如果key的数量足够少,或许可以广播

from pyspark.sql.functions import broadcast, last
from pyspark.sql import Window

df.join(broadcast(all_numbers), on=["number", "key"], how="outer")\
    .withColumn(
        "value", 
        last(
            "value", 
            ignorenulls=True
        ).over(
            Window.partitionBy("key").orderBy("number")\
                .rowsBetween(Window.unboundedPreceding, 0)
        )
    )\
    .show()
#+------+---+-----+
#|number|key|value|
#+------+---+-----+
#|     1|  a|    6|
#|     2|  a|   10|
#|     3|  a|   10|
#|     4|  a|   10|
#|     5|  a|   20|
#|     6|  a|   20|
#|     7|  a|   20|
#|     8|  a|   20|
#|     9|  a|   25|
#|    10|  a|   25|
#|    11|  a|   25|
#|    12|  a|   25|
#+------+---+-----+

【讨论】:

【参考方案2】:

无需加入也可以做到这一点。我已经对此进行了多次测试,具有不同的差距,它总是会工作只要始终提供数字 1 作为输入(因为你需要从那里开始的序列),它的范围总是直到 12。我使用了一个情侣窗口s来获得一个我可以在sequence中使用的列,然后使用表达式制作一个自定义序列,然后exploded 得到想要的结果。如果由于某种原因,您的输入中没有数字 1,请告诉我,我将更新我的解决方案。

from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import when
w=Window().partitionBy("key").orderBy("number")
w2=Window().partitionBy("key").orderBy("number").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("number2", F.lag("number").over(w)).withColumn("diff", F.when((F.col("number2").isNotNull()) & ((F.col("number")-F.col("number2")) > 1), (F.col("number")-F.col("number2"))).otherwise(F.lit(0)))\
.withColumn("diff2", F.lead("diff").over(w)).withColumn("diff2", F.when(F.col("diff2").isNull(), F.lit(0)).otherwise(F.col("diff2"))).withColumn("diff2", F.when(F.col("diff2")!=0, F.col("diff2")-1).otherwise(F.col("diff2"))).withColumn("max", F.max("number").over(w2))\
.withColumn("diff2", F.when((F.col("number")==F.col("max")) & (F.col("number")<F.lit(12)), F.lit(12)-F.col("number")).otherwise(F.col("diff2")))\
.withColumn("number2", F.when(F.col("diff2")!=0,F.expr("""sequence(number,number+diff2,1)""")).otherwise(F.expr("""sequence(number,number+diff2,0)""")))\
.drop("diff","diff2","max")\
.withColumn("number2", F.explode("number2")).drop("number")\
.select("key", F.col("number2").alias("number"), "value")\
.show()


+---+------+-----+
|key|number|value|
+---+------+-----+
|  a|     1|    6|
|  a|     2|   10|
|  a|     3|   10|
|  a|     4|   10|
|  a|     5|   20|
|  a|     6|   20|
|  a|     7|   20|
|  a|     8|   20|
|  a|     9|   25|
|  a|    10|   25|
|  a|    11|   25|
|  a|    12|   25|
+---+------+-----+

【讨论】:

以上是关于Pyspark - 每个键添加缺失值?的主要内容,如果未能解决你的问题,请参考以下文章

我想用 Pyspark 中的最后一行值填充缺失值:

在pyspark中用平均值填充缺失值

Pyspark 以递减的方式填充缺失值

在pyspark中填充每组的缺失值?

Pyspark Dataframe Imputations - 根据指定条件用列平均值替换未知和缺失值

Spark 数据框添加缺失值