PySpark:当列是列表时,将列添加到 DataFrame
Posted
技术标签:
【中文标题】PySpark:当列是列表时,将列添加到 DataFrame【英文标题】:PySpark: Add a column to DataFrame when column is a list 【发布时间】:2016-03-21 13:54:34 【问题描述】:我已经阅读了类似的问题,但找不到针对我的具体问题的解决方案。
我有一个清单
l = [1, 2, 3]
和一个数据框
df = sc.parallelize([
['p1', 'a'],
['p2', 'b'],
['p3', 'c'],
]).toDF(('product', 'name'))
我想获得一个新的DataFrame,其中列表l
被添加为另一列,即
+-------+----+---------+
|product|name| new_col |
+-------+----+---------+
| p1| a| 1 |
| p2| b| 2 |
| p3| c| 3 |
+-------+----+---------+
使用 JOIN 的方法,我用
加入 df sc.parallelize([[1], [2], [3]])
失败了。使用withColumn
的方法,如
new_df = df.withColumn('new_col', l)
由于列表不是Column
对象而失败。
【问题讨论】:
我认为这是一个很好的问题,因为它显示了 Spark DataFrames API 中严重缺失的功能。 【参考方案1】:因此,通过阅读here 的一些有趣内容,我确定您实际上不能只是将随机/任意列附加到给定的DataFrame
对象。看来您想要的更多的是zip
,而不是join
。我环顾四周,找到了this ticket,这让我觉得你不能zip
,因为你有DataFrame
而不是RDD
对象。
我能够解决您的问题的唯一方法是离开DataFrame
对象的世界并返回RDD
对象。我还需要为连接目的创建一个索引,这可能适用于您的用例,也可能不适用于您的用例。
l = sc.parallelize([1, 2, 3])
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)
rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)
# just in case!
assert(rdd.count() == l.count())
# perform an inner join on the index we generated above, then map it to look pretty.
new_rdd = rdd_index.join(z).map(lambda (x, y): [y[0][0], y[0][1], y[1]])
new_df = new_rdd.toDF(["product", 'name', 'new_col'])
当我运行new_df.show()
时,我得到:
+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
| p1| a| 1|
| p2| b| 2|
| p3| c| 3|
+-------+----+-------+
旁注:我真的很惊讶这不起作用。看起来像外连接?
from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)
当我运行new_df.show()
时,我得到:
+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
| p1| a| 1|
| p1| a| 2|
| p1| a| 3|
| p2| b| 1|
| p3| c| 1|
| p2| b| 2|
| p2| b| 3|
| p3| c| 2|
| p3| c| 3|
+-------+----+------------+
【讨论】:
请注意,您可以直接使用 df.rdd 将 DataFrame 转换为 RDD,例如在问题中的我的 df 上。【参考方案2】:如果product
列是唯一的,则考虑以下方法:
原始数据框:
df = spark.sparkContext.parallelize([
['p1', 'a'],
['p2', 'b'],
['p3', 'c'],
]).toDF(('product', 'name'))
df.show()
+-------+----+
|product|name|
+-------+----+
| p1| a|
| p2| b|
| p3| c|
+-------+----+
新列(和新索引列):
lst = [1, 2, 3]
indx = ['p1','p2','p3']
从上面的列表中创建一个新的数据框(带有索引):
from pyspark.sql.types import *
myschema= StructType([ StructField("indx", StringType(), True),
StructField("newCol", IntegerType(), True)
])
df1=spark.createDataFrame(zip(indx,lst),schema = myschema)
df1.show()
+----+------+
|indx|newCol|
+----+------+
| p1| 1|
| p2| 2|
| p3| 3|
+----+------+
使用创建的索引将其加入原始数据框:
dfnew = df.join(df1, df.product == df1.indx,how='left')\
.drop(df1.indx)\
.sort("product")
得到:
dfnew.show()
+-------+----+------+
|product|name|newCol|
+-------+----+------+
| p1| a| 1|
| p2| b| 2|
| p3| c| 3|
+-------+----+------+
【讨论】:
【参考方案3】:这可以通过 RDD 实现。
1 将数据帧转换为索引rdds:
df_rdd = df.rdd.zipWithIndex().map(lambda row: (row[1], (row[0][0], row[0][1])))
l_rdd = sc.parallelize(l).zipWithIndex().map(lambda row: (row[1], row[0]))
2 在索引上加入两个 RDD,删除索引并重新排列元素:
res_rdd = df_rdd.join(l_rdd).map(lambda row: [row[1][0][0], row[1][0][1], row[1][1]])
3 将结果转换为数据框:
res_df = res_rdd.toDF(['product', 'name', 'new_col'])
res_df.show()
+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
| p1| a| 1|
| p2| b| 2|
| p3| c| 3|
+-------+----+-------+
【讨论】:
以上是关于PySpark:当列是列表时,将列添加到 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
当列是现有列的布尔测试时,为啥向 Pandas DataFrame 添加列会返回 SettingWithCopy 警告?
当列是NTEXT时,SQL Server:IN('asd')不工作
警告:将列添加到从函数返回的 data.table 时“检测到无效 .internal.selfref”