如何在条件下在pyspark上创建一个新列?
Posted
技术标签:
【中文标题】如何在条件下在pyspark上创建一个新列?【英文标题】:How to create a new column on pyspark under condition? 【发布时间】:2018-05-28 14:18:48 【问题描述】:我在spark
中有以下data.frame
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import functions as sf
from pyspark.sql.functions import col, when, lit
ddf = spark.createDataFrame([[None, 'Michael',2],
[30, 'Andy',3],
[19, 'Justin',4],
[30, 'James Dr No From Russia with Love Bond',6]],
schema=['age', 'name','weights'])
ddf.show()
在这个简单的示例中,我想创建两列:一列是 weighted.mean
的 weighted.mean
,如果 age>29
(名称为 weighted_age
),另一列是 age^2
,如果 age<=29
(带有名字age_squared
)
【问题讨论】:
weighted.mean 是什么意思? 所以在这个例子中,age>29
的人的加权平均值将是 (30*3 + 30*6)/(6+3)
如果它回答了你的问题,请在下面查看我的回答
【参考方案1】:
您应该首先使用age > 29
从整个数据集中找到weighted.mean
,然后使用withColumn
进行填充。这是因为weighted.mean
依赖于整个数据集。
age_squared
可以逐行做为
from pyspark.sql import functions as f
weightedMean = ddf.filter(f.col('age')>29).select(f.sum(f.col('age')*f.col('weights'))/f.sum(f.col('weights'))).first()[0]
ddf.withColumn('weighted_age', f.when(f.col('age') > 29, weightedMean))\
.withColumn('age_squared', f.when(f.col('age') <= 29, f.col('age')*f.col('age')))\
.show(truncate=False)
这应该给你
+----+--------------------------------------+-------+------------+-----------+
|age |name |weights|weighted_age|age_squared|
+----+--------------------------------------+-------+------------+-----------+
|null|Michael |2 |null |null |
|30 |Andy |3 |30.0 |null |
|19 |Justin |4 |null |361 |
|30 |James Dr No From Russia with Love Bond|6 |30.0 |null |
+----+--------------------------------------+-------+------------+-----------+
您可以使用 .otherwise
和 when
函数填充其他值,而不是填充默认的 null
【讨论】:
感谢您的回答。有没有什么办法可以并行执行这两个if
条件的操作?
因此,创建了两列。一个在age >29
条件下创建(第一个if
条件),另一个在age <= 29
条件下创建(第二个if
条件)
我要求教育目的。因为如果数据集大约有 10m 行。那么把它分成两部分是有意义的,对吧?
dataframes 已经是分布式的,并且 withColumn 在本质上是分布式的。您只需要定义适合您的集群和数据的分区即可。
我该怎么做?我应该再发一个问题吗?以上是关于如何在条件下在pyspark上创建一个新列?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 pyspark 数据框列上拟合内核密度估计并将其用于创建具有估计的新列
如何在pyspark数据框中添加多个带有when条件的新列?