使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

Posted

技术标签:

【中文标题】使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧【英文标题】:Adding a custom column to a pyspark dataframe using udf passing columns as an argument 【发布时间】:2018-11-30 05:24:03 【问题描述】:

我有一个包含两列的 spark 数据框,我正在尝试添加一个新列,该列引用这些列的新值。我从包含该列正确值的字典中获取这些值

+--------------+--------------------+
|       country|                 zip|
+--------------+--------------------+
|        Brazil|                7541|
|United Kingdom|                5678|
|         Japan|                1234|
|       Denmark|                2345|
|        Canada|                4567|
|         Italy|                6031|
|        Sweden|                4205|
|        France|                6111|
|         Spain|                8555|
|         India|                2552|
+--------------+--------------------+

国家/地区的正确值应该是 India,zip 应该是 1234,并且存储在字典中

column_dict = 'country' : 'India', zip: 1234

我正在尝试将新列值设为“巴西:印度,邮编:1234”,其中列的值与这些值不同。

我已经尝试过以下方式,但它返回的是空列,但函数正在返回所需的值

     cols = list(df.columns)
     col_list = list(column_dict.keys())

def update(df, cols = cols , col_list = col_list):
   z = []
   for col1, col2 in zip(cols,col_list):
      if col1 == col2:
         if df.col1 != column_dict[col2]: 
            z.append("'col':" + col2  + ", 'reco': " + str(column_dict[col2]) + "")   
         else:
            z.append("'col':" + col2  + ", 'reco': ")

my_udf = udf(lambda x: update(x, cols, col_list))
z = y.withColumn("NewValue", lit(my_udf(y, cols,col_list)))

如果我将相同的输出数据帧导出到 csv 值,则附带“\”的部分。如何准确获取列上的函数值?

【问题讨论】:

您期待什么样的数据框?你的问题不是很清楚...你能告诉我们你感兴趣的结果吗? 【参考方案1】:

一种简单的方法是从您的dictionaryunion() 创建一个数据框到您的主数据框,然后groupby 并获取last 值。在这里你可以这样做:

sc = SparkContext.getOrCreate()

newDf = sc.parallelize([
    'country' : 'India', 'zip': 1234
]).toDF()

newDF.show()

新DF:

+-------+----+
|country| zip|
+-------+----+
|  India|1234|
+-------+----+

和 finalDF:

unionDF = df.union(newDF)

unionDF.show()
+--------------+--------------------+
|       country|                 zip|
+--------------+--------------------+
|        Brazil|                7541|
|United Kingdom|                5678|
|         Japan|                1234|
|       Denmark|                2345|
|        Canada|                4567|
|         Italy|                6031|
|        Sweden|                4205|
|        France|                6111|
|         Spain|                8555|
|         India|                2552|
|         India|                1234|
+--------------+--------------------+

最后做groupbylast

import pyspark.sql.functions as f

finalDF = unionDF.groupbby('country').agg(f.last('zip'))

finalDF.show()

+--------------+--------------------+
|       country|                 zip|
+--------------+--------------------+
|        Brazil|                7541|
|United Kingdom|                5678|
|         Japan|                1234|
|       Denmark|                2345|
|        Canada|                4567|
|         Italy|                6031|
|        Sweden|                4205|
|        France|                6111|
|         Spain|                8555|
|         India|                1234|
+--------------+--------------------+

【讨论】:

以上是关于使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

阿帕奇火花。 UDF 列基于另一列而不将其名称作为参数传递。

在 UDF 正文中将列作为参数传递

更改 DataFrame 中的列数据类型并将其传递到 UDF - PySpark

PySpark - 将列表作为参数传递给 UDF + 迭代数据框列添加

如何将自定义 css 工具提示添加到 extjs 列标题?

以整齐的方式将多列作为分组变量传递给 UDF