如何在条件下在pyspark上创建一个新列?

Posted

技术标签:

【中文标题】如何在条件下在pyspark上创建一个新列?【英文标题】:How to create a new column on pyspark under condition? 【发布时间】:2018-05-28 14:18:48 【问题描述】:

我在spark 中有以下data.frame

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import functions as sf
from pyspark.sql.functions import col, when, lit

ddf = spark.createDataFrame([[None, 'Michael',2],
                             [30, 'Andy',3],
                             [19, 'Justin',4],
                             [30, 'James Dr No From Russia with Love Bond',6]],
                            schema=['age', 'name','weights'])
ddf.show()

在这个简单的示例中,我想创建两列:一列是 weighted.meanweighted.mean,如果 age>29(名称为 weighted_age),另一列是 age^2,如果 age<=29(带有名字age_squared)

【问题讨论】:

weighted.mean 是什么意思? 所以在这个例子中,age>29 的人的加权平均值将是 (30*3 + 30*6)/(6+3) 如果它回答了你的问题,请在下面查看我的回答 【参考方案1】:

您应该首先使用age > 29 从整个数据集中找到weighted.mean,然后使用withColumn 进行填充。这是因为weighted.mean 依赖于整个数据集。

age_squared可以逐行做为

from pyspark.sql import functions as f
weightedMean = ddf.filter(f.col('age')>29).select(f.sum(f.col('age')*f.col('weights'))/f.sum(f.col('weights'))).first()[0]

ddf.withColumn('weighted_age', f.when(f.col('age') > 29, weightedMean))\
    .withColumn('age_squared', f.when(f.col('age') <= 29, f.col('age')*f.col('age')))\
    .show(truncate=False)

这应该给你

+----+--------------------------------------+-------+------------+-----------+
|age |name                                  |weights|weighted_age|age_squared|
+----+--------------------------------------+-------+------------+-----------+
|null|Michael                               |2      |null        |null       |
|30  |Andy                                  |3      |30.0        |null       |
|19  |Justin                                |4      |null        |361        |
|30  |James Dr No From Russia with Love Bond|6      |30.0        |null       |
+----+--------------------------------------+-------+------------+-----------+

您可以使用 .otherwisewhen 函数填充其他值,而不是填充默认的 null

【讨论】:

感谢您的回答。有没有什么办法可以并行执行这两个if条件的操作? 因此,创建了两列。一个在age &gt;29条件下创建(第一个if条件),另一个在age &lt;= 29条件下创建(第二个if条件) 我要求教育目的。因为如果数据集大约有 10m 行。那么把它分成两部分是有意义的,对吧? dataframes 已经是分布式的,并且 withColumn 在本质上是分布式的。您只需要定义适合您的集群和数据的分区即可。 我该怎么做?我应该再发一个问题吗?

以上是关于如何在条件下在pyspark上创建一个新列?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 数据框列上拟合内核密度估计并将其用于创建具有估计的新列

如何在pyspark数据框中添加多个带有when条件的新列?

使用 pyspark 基于 if 和 else 条件创建新列

如何创建 Pyspark UDF 以向数据框添加新列

如何根据 Pyspark 中数组列中的值创建新列

如何拆分pyspark数据框并创建新列