在 UDF 之后将新列附加到现有 PySpark 数据帧

Posted

技术标签:

【中文标题】在 UDF 之后将新列附加到现有 PySpark 数据帧【英文标题】:Appending new column after UDF to existing PySpark dataframe 【发布时间】:2017-09-08 15:15:28 【问题描述】:

我有以下示例数据框。

+-------+--------+--------+--------+
| data1 | data 2 | data 3 | data 4 |
+-------+--------+--------+--------+
|1      |abc     |abd     |3       |
+-------+--------+--------+--------+
|3      |abd     |abd     |3       |
+-------+--------+--------+--------+
|2      |abe     |abg     |2       |

例如,我正在应用一个 UDF,它将数据 4 转换为 True(如果为 3)和False(如果为 2)。

我正在使用以下代码生成一个独立的DataFrame,其中包含一列中的旧值和新值:

UDF = udf(converterFnc,StringType())
tempDF = mydata.select('data 4', UDF('data 4').alias('newdata 4'))

并获得以下 DataFrame:

+--------+-----------+
| data 4 | newdata 4 |
+--------+-----------+
| 3      | True      |
+--------+-----------+
| 2      | False     |

我试图弄清楚如何将其合并回原始数据框,但我发现使用join 时遇到了一个奇怪的问题,其中所有连接的值都只是整个数据框的第一个值。

我想要的输出:

+-------+--------+--------+--------+-----------+
| data1 | data 2 | data 3 | data 4 | newdata 4 |
+-------+--------+--------+--------+-----------+
|1      |abc     |abd     |3       | True      |
+-------+--------+--------+--------+-----------+
|3      |abd     |abd     |3       | True      |
+-------+--------+--------+--------+-----------+
|2      |abe     |abg     |2       | False     |

谢谢!

【问题讨论】:

【参考方案1】:

您可以使用withColumnwhen.otherwise 创建一个没有joining 进程的新列:

import pyspark.sql.functions as F
df.withColumn("newdata 4", F.when(df["data 4"] == 3, True).otherwise(F.when(df["data 4"] == 2, False))).show()
+-----+------+------+------+---------+
|data1|data 2|data 3|data 4|newdata 4|
+-----+------+------+------+---------+

|    1|   abc|   abd|     3|     true|
|    3|   abd|   abd|     3|     true|
|    2|   abe|   abg|     2|    false|
+-----+------+------+------+---------+

【讨论】:

谢谢!我的方法比简单的 True/False 分类器要复杂一些,但 withColumn 方法我能够让它工作! 酷。很高兴它有帮助!

以上是关于在 UDF 之后将新列附加到现有 PySpark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何创建 Pyspark UDF 以向数据框添加新列

如何将新列添加到现有表 symfony - orocommerce

PySpark / 计算出现次数并使用 UDF 创建新列

PySpark 用户定义函数 (UDF) 创建新列

Pyspark:使用带有参数的UDF创建一个新列[重复]

如何使用 jdbc pyspark python 在现有表中添加新列?