使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧
Posted
技术标签:
【中文标题】使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧【英文标题】:Adding a custom column to a pyspark dataframe using udf passing columns as an argument 【发布时间】:2018-11-30 05:24:03 【问题描述】:我有一个包含两列的 spark 数据框,我正在尝试添加一个新列,该列引用这些列的新值。我从包含该列正确值的字典中获取这些值
+--------------+--------------------+
| country| zip|
+--------------+--------------------+
| Brazil| 7541|
|United Kingdom| 5678|
| Japan| 1234|
| Denmark| 2345|
| Canada| 4567|
| Italy| 6031|
| Sweden| 4205|
| France| 6111|
| Spain| 8555|
| India| 2552|
+--------------+--------------------+
国家/地区的正确值应该是 India,zip 应该是 1234,并且存储在字典中
column_dict = 'country' : 'India', zip: 1234
我正在尝试将新列值设为“巴西:印度,邮编:1234”,其中列的值与这些值不同。
我已经尝试过以下方式,但它返回的是空列,但函数正在返回所需的值
cols = list(df.columns)
col_list = list(column_dict.keys())
def update(df, cols = cols , col_list = col_list):
z = []
for col1, col2 in zip(cols,col_list):
if col1 == col2:
if df.col1 != column_dict[col2]:
z.append("'col':" + col2 + ", 'reco': " + str(column_dict[col2]) + "")
else:
z.append("'col':" + col2 + ", 'reco': ")
my_udf = udf(lambda x: update(x, cols, col_list))
z = y.withColumn("NewValue", lit(my_udf(y, cols,col_list)))
如果我将相同的输出数据帧导出到 csv 值,则附带“\”的部分。如何准确获取列上的函数值?
【问题讨论】:
您期待什么样的数据框?你的问题不是很清楚...你能告诉我们你感兴趣的结果吗? 【参考方案1】:一种简单的方法是从您的dictionary
和union()
创建一个数据框到您的主数据框,然后groupby
并获取last
值。在这里你可以这样做:
sc = SparkContext.getOrCreate()
newDf = sc.parallelize([
'country' : 'India', 'zip': 1234
]).toDF()
newDF.show()
新DF:
+-------+----+
|country| zip|
+-------+----+
| India|1234|
+-------+----+
和 finalDF:
unionDF = df.union(newDF)
unionDF.show()
+--------------+--------------------+
| country| zip|
+--------------+--------------------+
| Brazil| 7541|
|United Kingdom| 5678|
| Japan| 1234|
| Denmark| 2345|
| Canada| 4567|
| Italy| 6031|
| Sweden| 4205|
| France| 6111|
| Spain| 8555|
| India| 2552|
| India| 1234|
+--------------+--------------------+
最后做groupby
和last
:
import pyspark.sql.functions as f
finalDF = unionDF.groupbby('country').agg(f.last('zip'))
finalDF.show()
+--------------+--------------------+
| country| zip|
+--------------+--------------------+
| Brazil| 7541|
|United Kingdom| 5678|
| Japan| 1234|
| Denmark| 2345|
| Canada| 4567|
| Italy| 6031|
| Sweden| 4205|
| France| 6111|
| Spain| 8555|
| India| 1234|
+--------------+--------------------+
【讨论】:
以上是关于使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章
阿帕奇火花。 UDF 列基于另一列而不将其名称作为参数传递。
更改 DataFrame 中的列数据类型并将其传递到 UDF - PySpark