从另一个 DataFrame 将列添加到 Pyspark DataFrame

Posted

技术标签:

【中文标题】从另一个 DataFrame 将列添加到 Pyspark DataFrame【英文标题】:Add column to Pyspark DataFrame from another DataFrame 【发布时间】:2020-12-04 21:35:11 【问题描述】:

我有这个:

df_e :=     
|country, name, year, c2, c3, c4|       
|Austria, Jon Doe, 2003, 21.234, 54.234, 345.434|       
...

df_p :=     
|name, 2001, 2002, 2003, 2004|       
|Jon Doe, 2849234, 12384312, 123908234, 12398193|       
...

从 csv 文件读取的两个 Pyspark 数据帧。

如何在df_e中创建一个名为“amount”的新列,它以每条记录的名称和年份值作为df_e的引用,并从df_p获取相应的金额?使用 Pyspark。

在这种情况下,我应该得到以下 DataFrame:

df_e :=     
|country, name, year, c2, c3, c4, amount|       
|Austria, Jon Doe, 2003, 21.234, 54.234, 345.434, 123908234|       
...

感谢您的帮助!

编辑:

这就是我阅读文件的方式:

from pyspark import SparkContext, SparkConf       
from pyspark.sql import SparkSession       

sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))       
spark = SparkSession.builder.getOrCreate()       

df_e = spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/e.csv')       
df_p = spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/p.csv')       

我是从 Pyspark 开始的,所以我真的不知道我可以用什么函数来解决这个问题。

对于 pandas,我会通过遍历 DataFrame 来做到这一点,如下所示:

for i in df_e.index:       
    p[i] = df_p.query('name == ""'.format(df_e['name'][i]))[''.format(df_e['year'][i])]

然后将列表 p 作为新列添加到 df_e(虽然我知道可能有更好的方法来做到这一点)。

【问题讨论】:

也许你想分享你所做的工作。 【参考方案1】:
import pyspark.sql.functions as F

### i am assumming all the columns are years in this except the first one 
### you can manually specify the list also   ['2003','2005']  etc .. 
columns_to_transpose=df_p .columns[1:] 
k=[]
for x in columns_to_pivot:
    k.append(F.struct(F.lit(f'x').alias('year'),F.col(x).alias('year_value')))
df_p_new=df_p.withColumn('New',F.explode(F.array(k))).select([F.col('Name').alias('JOIN_NAME'),F.col('New')['YEAR'].alias('NEW_YEAR'),F.col('New')['year_value'].alias('YEAR_VALUE')])

>>> df_p_new.show()
+---------+--------+----------+
|JOIN_NAME|NEW_YEAR|YEAR_VALUE|
+---------+--------+----------+
|John Doe |    2001|   2849234|
|John Doe |    2002|  12384312|
|John Doe |    2003| 123908234|
|John Doe |    2004|  12398193|
+---------+--------+----------+

##Column Names are case sensitive 
df_answer=df_e.join(df_p_new,(df_p_new.JOIN_NAME==df_e.name) & (df_p_new.NEW_YEAR==df_e.year), how='left').select(*df_e.columns,'YEAR_VALUE')
df_answer.show()

    
+-------+--------+----+------+------+-------+----------+
|country|    name|year|    c2|    c3|     c4|YEAR_VALUE|
+-------+--------+----+------+------+-------+----------+
|Austria|John Doe|2003|21.234|54.234|345.434| 123908234|
+-------+--------+----+------+------+-------+----------+


df_answer.select([*df_e.columns,'YEAR_VALUE'])
    
    ## you can use the alias  to  rename the columns 

【讨论】:

以上是关于从另一个 DataFrame 将列添加到 Pyspark DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

如何动态地将列添加到 DataFrame?

更快地遍历一个 DataFrame 的行以将列添加到第二个 DataFrame

PySpark:当列是列表时,将列添加到 DataFrame

将列添加到包含其他列值列表的 pandas DataFrame

如何在不从 DataFrame 转换并访问它的情况下将列添加到 Dataset?

Python pandas:使用方法链接将列添加到分组的 DataFrame