向 pyspark 中的数据框添加列

Posted

技术标签:

【中文标题】向 pyspark 中的数据框添加列【英文标题】:Adding a column to a dataframe in pyspark 【发布时间】:2021-05-11 06:29:09 【问题描述】:

我想使用pyspark根据现有列的值向数据框添加新列。

例如,如果这是原始数据框,我想添加一个名为“parent's data”的新列,其中包含基于列“parent_id”的父级数据,以便生成的数据框如下所示。

任何帮助将不胜感激。谢谢。

【问题讨论】:

你可以在 id 和 parent_id 上使用 self join 来实现你想要的输出。 你试过什么?这只是一个简单的连接。 【参考方案1】:

我确信有多种方法可以实现这一目标。但是,最简单的方法是使用 2 列现有数据框创建一个新数据框。然后加入 2 个数据框来实现这一点。

这里是代码

df1 = pd.DataFrame([[1, 'a', 2], [2, 'b', 3], [3, 'c', 1]], columns=["id", "data", "parent_id"])
print(df1)
sparkdf=spark.createDataFrame(df1)
sparkdf.show()
sparkdf2=sparkdf.select('id','data')
sparkdf2.show()
sparkdf.registerTempTable("sparkdf")
sparkdf2.registerTempTable("sparkdf2")

sparkdf3=spark.sql('select a.id,a.data,a.parent_id,b.data from sparkdf as a join sparkdf2 as b on a.parent_id=b.id')
sparkdf3.show()

【讨论】:

【参考方案2】:

您可以从 id 和 data 两列创建字典,然后使用 withColumn 添加新列:

>>> d = row["id"]:row["data"] for row in df.collect()
Out[260]: 1: 'a', 2: 'b', 3: 'c'

from itertools import chain
from pyspark.sql.functions import create_map, lit

m = create_map([lit(x) for x in chain(*d.items())])
df = df.withColumn('parent_data', m[df['parent_id']])

打印回来:

>>> df.show(truncate=False)

+---+----+---------+-----------+
|id |data|parent_id|parent_data|
+---+----+---------+-----------+
|1  |a   |2        |b          |
|2  |b   |3        |c          |
|3  |c   |1        |a          |
+---+----+---------+-----------+

【讨论】:

以上是关于向 pyspark 中的数据框添加列的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 向数据框添加顺序和确定性索引

将row_number添加到数据框pyspark中的连接列

PySpark向现有DataFrame添加列 - TypeError:无效参数,不是字符串或列

怎么解决 ? (将列表添加到列数据框pyspark)

如何创建 Pyspark UDF 以向数据框添加新列

PySpark - 将列表作为参数传递给 UDF + 迭代数据框列添加