Pyspark 中的 UDF 和 python 函数

Posted

技术标签:

【中文标题】Pyspark 中的 UDF 和 python 函数【英文标题】:UDF and python function in Pyspark 【发布时间】:2018-12-05 11:03:45 【问题描述】:

我在 Pyspark 中有一个数据框:

listA = [(1,'AAA','USA'),(2,'XXX','CHN'),(3,'KKK','USA'),(4,'PPP','USA'),(5,'EEE','USA'),(5,'HHH','THA')]
df = spark.createDataFrame(listA, ['id', 'name','country'])

我创建了一个字典:

thedict="USA":"WASHINGTON","CHN":"BEIJING","DEFAULT":"KEY NOT FOUND"

然后我创建了一个 UDF 来从字典中获取匹配的键值。

def my_func(letter):
    if(thedict.get(letter) !=None):
        return thedict.get(letter)
    else:
        return thedict.get("DEFAULT")

尝试调用函数时出现以下错误:

df.withColumn('CAPITAL',my_func(df.country))

  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 1848, in withColumn
    assert isinstance(col, Column), "col should be Column"
AssertionError: col should be Column

如果我用 pyspark.sql.functions 嵌入它,它工作正常。

from pyspark.sql.functions import col, udf
udfdict = udf(my_func,StringType())

df.withColumn('CAPITAL',udfdict(df.country)).show()

+---+----+-------+-------------+
| id|name|country|      CAPITAL|
+---+----+-------+-------------+
|  1| AAA|    USA|   WASHINGTON|
|  2| XXX|    CHN|      BEIJING|
|  3| KKK|    USA|   WASHINGTON|
|  4| PPP|    USA|   WASHINGTON|
|  5| EEE|    USA|   WASHINGTON|
|  5| HHH|    THA|KEY NOT FOUND|
+---+----+-------+-------------+

我不明白这两个调用有什么区别?

【问题讨论】:

你写的每一个函数需要应用到列上,你必须把它转换成一个pyspark UDF然后使用它! @阿里。这似乎不是真的。见下面的代码。它工作正常。 listA = [('A',10,20,40,60),('B',10,10,10,40)] df = spark.createDataFrame(listA, ['id', 'M1','M2 ','M3','M4']) def add_column(*args): num=0 for i in args: num = num +i return num newdf = df.withColumn('TOTAL', add_column(df.M1,df .M2,df.M3)) no 我创建了一个 python udf 为 :def add_column(*args): num=0 for i in args: num = num +i return num 是的,我明白了。您是否尝试过任何类似add_column 的功能,但没有使用udf 将其应用于列? 我不知道到底是什么问题,但我建议您使用udf 来完成您的工作! 【参考方案1】:

UDF 函数具有特殊属性,因为它们采用列/s 并按行应用逻辑来生成新列。而一个常见的 Python 函数只接受一个离散参数并产生一个输出。

这就是错误所在。函数的返回值不是列

assert isinstance(col, Column), "col 应该是 Column"

你可以通过两种方式定义udf:

    myudf = udf(LAMBDA_EXPRESSION, RETURN_TYPE) myudf = udf(CUSTOM_FUNCTION, RETURN_TYPE)

【讨论】:

我明白了,感谢您的意见。但是为什么它在评论部分中解释的示例中起作用。我还使用 withCoulmn 调用 python 函数并将多个参数传递给它。我错过了什么吗?

以上是关于Pyspark 中的 UDF 和 python 函数的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:从 Python 到 Pyspark 实现 lambda 函数和 udf

pyspark 中的重型有状态 UDF

SparkSession 中的 udf 和 pyspark.sql.functions 中的 udf 有啥区别

pyspark 中的 UDF 能否返回与列不同的对象?

udf(用户定义函数)如何在 pyspark 中工作?

Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?