pyspark中未定义的函数UDF?

Posted

技术标签:

【中文标题】pyspark中未定义的函数UDF?【英文标题】:Undefined function UDF in pyspark? 【发布时间】:2017-12-22 20:11:36 【问题描述】:

我有一个在 Dataframe 中调用的 UDF,但我得到未定义的 udf。

global ac
ac = sc.accumulator(0)

def incrementAC():
  ac.add(1)
  return str(ac.value)

df = sc.parallelize([('Java',90),('Scala',95),('Spark',92)]).toDF(["language","rank"])

df.withColumn("lang_and_rank", expr("concat(language,'blah')")).show()

+--------+----+-------------+
|language|rank|lang_and_rank|
+--------+----+-------------+
|    Java|  90|     Javablah|
|   Scala|  95|    Scalablah|
|   Spark|  92|    Sparkblah|
+--------+----+-------------+

myudf = udf(incrementAC,StringType())
df.withColumn("lang_and_rank", expr("concat(language,myudf())")).show()

.utils.AnalysisException: u'undefined function myudf;'

【问题讨论】:

【参考方案1】:

必须注册与expr 一起使用的函数:

spark.udf.register("incrementAC", incrementAC)

另外,转换中使用的accumualtors 也不可靠。

【讨论】:

【参考方案2】:

希望这会有所帮助!

from pyspark.sql.functions import udf, expr, concat, col
from pyspark.sql.types import StringType

ac = sc.accumulator(0)

def incrementAC():
  ac.add(1)
  return str(ac)

#sample data
df = sc.parallelize([('Java',90),('Scala',95),('Spark',92)]).toDF(["language","rank"])

方法一:

#solution using usual udf definition
myudf = udf(incrementAC, StringType())
df.withColumn("lang_and_rank", concat(col('language'), myudf())).show()

方法2:

#another solution if you want to use 'expr' (as rightly pointed out by @user9132725)
sqlContext.udf.register("myudf", incrementAC, StringType())
df = df.withColumn("lang_and_rank", expr("concat(language, myudf())"))
df.show()

输出是:

+--------+----+-------------+
|language|rank|lang_and_rank|
+--------+----+-------------+
|    Java|  90|        Java1|
|   Scala|  95|       Scala1|
|   Spark|  92|       Spark2|
+--------+----+-------------+

【讨论】:

【参考方案3】:

我在重命名 Java 文件/函数后遇到了这个问题。我重新启动了我的 Spark 服务器并复制了 JAR 文件,但在尝试使用新的 JAR 文件运行我当前正在运行的 Jupyter Notebook 时出现未定义函数 UDF 错误。

解决方案是重新启动我的 Jupyter Notebook。我怀疑新的 UDF 注册没有刷新一些正在运行的 Python 环境。

【讨论】:

以上是关于pyspark中未定义的函数UDF?的主要内容,如果未能解决你的问题,请参考以下文章

udf(用户定义函数)如何在 pyspark 中工作?

pyspark 的用户定义函数 (UDF) 是不是需要单元测试?

PySpark 用户定义函数 (UDF) 创建新列

在 PySpark Pandas UDF 中指定用户定义函数的正确方法

从Pyspark UDF调用另一个自定义Python函数

PySpark UDF 无法识别参数数量