Pyspark Dataframe Imputations - 根据指定条件用列平均值替换未知和缺失值

Posted

技术标签:

【中文标题】Pyspark Dataframe Imputations - 根据指定条件用列平均值替换未知和缺失值【英文标题】:Pyspark Dataframe Imputations -- Replace Unknown & Missing Values with Column Mean based on specified condition 【发布时间】:2016-09-22 08:06:52 【问题描述】:

给定一个 Spark 数据框,我想根据该列的非缺失值和非未知值计算列平均值。然后我想取这个平均值并用它来替换列的缺失和未知值。

例如,假设我正在使用:

名为 df 的数据框,其中每条记录代表一个人,所有列都是整数或数字 名为年龄的列(每条记录的年龄) 名为 missing_age 的列(如果此人没有年龄,则为 1,否则为 0) 名为 unknown_age 的列(如果此人年龄未知,则为 1,否则为 0)

然后我可以计算这个平均值,如下所示。

calc_mean = df.where((col("unknown_age") == 0) & (col("missing_age") == 0))
.agg(avg(col("age")))

或通过 SQL 和 windows 函数,

mean_compute = hiveContext.sql("select avg(age) over() as mean from df 
where missing_age = 0 and unknown_age = 0")

如果我能提供帮助,我想使用 SQL/windows 函数。我的挑战是采用这种平均值并使用非 SQL 方法用它替换未知/缺失值。

我尝试过使用 when()、where()、replace()、withColumn、UDF 和组合...无论我做什么,我要么得到错误,要么结果不是我所期望的。这是我尝试过但不起作用的许多事情之一的示例。

imputed = df.when((col("unknown_age") == 1) | (col("missing_age") == 1),
calc_mean).otherwise("age")

我在网上搜索过,但没有找到类似的插补类型问题,因此非常感谢任何帮助。这可能是我错过的非常简单的事情。

附注——我正在尝试将此代码应用于 Spark Dataframe 中列名中没有 unknown_ 或 missing_ 的所有列。我可以将 Spark 相关代码包装在 Python 的“for 循环”中并遍历所有适用的列来执行此操作吗?

更新:

还想出了如何遍历列...这是一个示例。

for x in df.columns:
    if 'unknown_' not in x and 'missing_' not in x:
        avg_compute = df.where(df['missing_' + x] != 1).agg(avg(x)).first()[0]
        df = df.withColumn(x + 'mean_miss_imp', when((df['missing_' + x] == 1), 
        avg_compute).otherwise(df[x]))

【问题讨论】:

【参考方案1】:

如果未知或失踪的年龄是某个值:

from pyspark.sql.functions import col, avg, when

df = sc.parallelize([
    (10, 0, 0), (20, 0, 0), (-1, 1, 0), (-1, 0, 1)
]).toDF(["age", "missing_age", "unknown_age"])

avg_age = df.where(
    (col("unknown_age") != 1) & (col("missing_age") != 1)
).agg(avg("age")).first()[0]

df.withColumn("age_imp", when(
    (col("unknown_age") == 1) | (col("missing_age") == 1), avg_age
).otherwise(col("age")))

如果未知或缺失的年龄为 NULL,您可以将其简化为:

df = sc.parallelize([
    (10, 0, 0), (20, 0, 0), (None, 1, 0), (None, 0, 1)
]).toDF(["age", "missing_age", "unknown_age"])

df.na.fill(df.na.drop().agg(avg("age")).first()[0], ["age"])

【讨论】:

非常感谢!你的帮助成就了我的一周!我还想出了如何应用到所有列并更新了帖子。

以上是关于Pyspark Dataframe Imputations - 根据指定条件用列平均值替换未知和缺失值的主要内容,如果未能解决你的问题,请参考以下文章

PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

PySpark:转换DataFrame中给定列的值

PySpark|比RDD更快的DataFrame

Pyspark:将 pyspark.sql.row 转换为 Dataframe

是否可以在 Pyspark 中对 DataFrame 进行子类化?

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe