如何使用 withColumn 将额外的参数传递给 UDF

Posted

技术标签:

【中文标题】如何使用 withColumn 将额外的参数传递给 UDF【英文标题】:How to pass an extra argument to UDF using withColumn 【发布时间】:2020-07-31 14:47:07 【问题描述】:

如何使用 withColumn 向我的 UDF 传递一个额外的参数

df = spark.createDataFrame([
  ["aaa","1"],
  ["bbb","2"],
  ["ccc","5"]
]).toDF("text","id")

def title(x,y):
   if y:
      x = x.title()
   return x

title_udf = udf(lambda x: title(x,y), StringType())
spark.udf.register('title_udf', title_udf)

df = df.withColumn('text_title',title_udf('text',True)

当我尝试这个时,我得到一个错误:Invalid argument, not a string or column....

【问题讨论】:

【参考方案1】:

udf 只能识别行元素。所以要传递一个固定的参数,你必须使用 lit() 函数。您的 udf 定义也必须更正。试试这个:

import pyspark.sql.functions as F
from pyspark.sql.types import *
df = spark.createDataFrame([
  ["aaa","1"],
  ["bbb","2"],
  ["ccc","5"]
]).toDF("text","id")

def title(x,y):
   if y:
      x = x.title()
   return x

title_udf = F.udf(title, StringType())

df = df.withColumn('text_title',title_udf('text',F.lit(True)))

 df.show()
+----+---+----------+
|text| id|text_title|
+----+---+----------+
| aaa|  1|       Aaa|
| bbb|  2|       Bbb|
| ccc|  5|       Ccc|
+----+---+----------+

正如评论中@powers 所指出的,如果此输出是您的最终目的,那么您可以使用 initcap() 函数在没有 udf 的情况下执行此操作

df = df.withColumn("text_title",F.when(F.lit(True),F.initcap(F.col('text'))).otherwise(F.col('text')))

您还可以使用其他列作为条件,例如“id”列

df = df.withColumn("text_title",F.when(F.col('id')>2,F.initcap(F.col('text'))).otherwise(F.col('text')))

【讨论】:

@Raghu - 很好的答案 +1。作为额外的学分分配,您可能还想解释如何在不使用 UDF 的情况下解决这个问题。应尽可能避免使用 UDF,我认为可以使用 Spark 原生函数解决此问题。 @Powers - 感谢您的提示。我一直认为给定的数据是一个玩具示例 :-)【参考方案2】:

您可以使用 initcap 内置函数来避免 udf。

Example:

df = spark.createDataFrame([
  ["aaa","1"],
  ["bbb","2"],
  ["ccc","5"]
]).toDF("text","id")

from pyspark.sql.functions import *

df.withColumn("text_title",initcap(col("text"))).show()
#+----+---+----------+
#|text| id|text_title|
#+----+---+----------+
#| aaa|  1|       Aaa|
#| bbb|  2|       Bbb|
#| ccc|  5|       Ccc|
#+----+---+----------+

【讨论】:

谢谢。我知道,但这只是 UDF 的一个例子

以上是关于如何使用 withColumn 将额外的参数传递给 UDF的主要内容,如果未能解决你的问题,请参考以下文章

如何将额外的参数传递给 Vision 框架?

如何将参数传递给用户定义函数?

如何将额外的参数传递给 django admin 自定义表单?

将额外参数传递给事件处理程序?

如何将一些额外的字符串参数传递给每行的 spark udf?

如何在 SparkR 中将额外的参数传递给 spark.lapply?