从另一个 DataFrame 将列添加到 Pyspark DataFrame
Posted
技术标签:
【中文标题】从另一个 DataFrame 将列添加到 Pyspark DataFrame【英文标题】:Add column to Pyspark DataFrame from another DataFrame 【发布时间】:2020-12-04 21:35:11 【问题描述】:我有这个:
df_e :=
|country, name, year, c2, c3, c4|
|Austria, Jon Doe, 2003, 21.234, 54.234, 345.434|
...
df_p :=
|name, 2001, 2002, 2003, 2004|
|Jon Doe, 2849234, 12384312, 123908234, 12398193|
...
从 csv 文件读取的两个 Pyspark 数据帧。
如何在df_e中创建一个名为“amount”的新列,它以每条记录的名称和年份值作为df_e的引用,并从df_p获取相应的金额?使用 Pyspark。
在这种情况下,我应该得到以下 DataFrame:
df_e :=
|country, name, year, c2, c3, c4, amount|
|Austria, Jon Doe, 2003, 21.234, 54.234, 345.434, 123908234|
...
感谢您的帮助!
编辑:
这就是我阅读文件的方式:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))
spark = SparkSession.builder.getOrCreate()
df_e = spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/e.csv')
df_p = spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/p.csv')
我是从 Pyspark 开始的,所以我真的不知道我可以用什么函数来解决这个问题。
对于 pandas,我会通过遍历 DataFrame 来做到这一点,如下所示:
for i in df_e.index:
p[i] = df_p.query('name == ""'.format(df_e['name'][i]))[''.format(df_e['year'][i])]
然后将列表 p 作为新列添加到 df_e(虽然我知道可能有更好的方法来做到这一点)。
【问题讨论】:
也许你想分享你所做的工作。 【参考方案1】:import pyspark.sql.functions as F
### i am assumming all the columns are years in this except the first one
### you can manually specify the list also ['2003','2005'] etc ..
columns_to_transpose=df_p .columns[1:]
k=[]
for x in columns_to_pivot:
k.append(F.struct(F.lit(f'x').alias('year'),F.col(x).alias('year_value')))
df_p_new=df_p.withColumn('New',F.explode(F.array(k))).select([F.col('Name').alias('JOIN_NAME'),F.col('New')['YEAR'].alias('NEW_YEAR'),F.col('New')['year_value'].alias('YEAR_VALUE')])
>>> df_p_new.show()
+---------+--------+----------+
|JOIN_NAME|NEW_YEAR|YEAR_VALUE|
+---------+--------+----------+
|John Doe | 2001| 2849234|
|John Doe | 2002| 12384312|
|John Doe | 2003| 123908234|
|John Doe | 2004| 12398193|
+---------+--------+----------+
##Column Names are case sensitive
df_answer=df_e.join(df_p_new,(df_p_new.JOIN_NAME==df_e.name) & (df_p_new.NEW_YEAR==df_e.year), how='left').select(*df_e.columns,'YEAR_VALUE')
df_answer.show()
+-------+--------+----+------+------+-------+----------+
|country| name|year| c2| c3| c4|YEAR_VALUE|
+-------+--------+----+------+------+-------+----------+
|Austria|John Doe|2003|21.234|54.234|345.434| 123908234|
+-------+--------+----+------+------+-------+----------+
df_answer.select([*df_e.columns,'YEAR_VALUE'])
## you can use the alias to rename the columns
【讨论】:
以上是关于从另一个 DataFrame 将列添加到 Pyspark DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
更快地遍历一个 DataFrame 的行以将列添加到第二个 DataFrame
PySpark:当列是列表时,将列添加到 DataFrame
将列添加到包含其他列值列表的 pandas DataFrame