Pyspark 将函数作为参数传递给 UDF

Posted

技术标签:

【中文标题】Pyspark 将函数作为参数传递给 UDF【英文标题】:Pyspark pass function as a parameter to UDF 【发布时间】:2020-08-05 08:11:27 【问题描述】:

我正在尝试创建一个将另一个函数作为参数的 UDF。但是执行以异常结束。 我运行的代码:

import pandas as pd
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import MapType, DataType, StringType
from pyspark.sql.functions import udf, struct, lit
import os

sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

df_to_test = sqlContext.createDataFrame(
    pd.DataFrame(
        'inn': ['111', '222', '333'],
        'field1': [1, 2, 3],
        'field2': ['a', 'b', 'c']
    ))

def foo_fun(row, b) -> str:
    return 'a' + b()

def bar_fun():
    return 'I am bar'

foo_fun_udf = udf(foo_fun, StringType())
df_to_test.withColumn(
    'foo', 
    foo_fun_udf(struct([df_to_test[x] for x in df_to_test.columns]), bar_fun)
).show()

例外:

Invalid argument, not a string or column: <function bar_fun at 0x7f0e69ce6268> of type <class 'function'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

我试图将bar_fun 包装到 udf 中,但没有成功。有没有办法将函数作为参数传递?

【问题讨论】:

【参考方案1】:

您离解决方案不远了。这是我的做法:

def foo_fun_udf(func):

    def foo_fun(row) -> str:
        return 'a' + func()

    out_udf = udf(foo_fun, StringType())
    return out_udf 

df_to_test.withColumn(
    'foo', 
    foo_fun_udf(bar_fun)(struct([df_to_test[x] for x in df_to_test.columns]))
).show()

【讨论】:

以上是关于Pyspark 将函数作为参数传递给 UDF的主要内容,如果未能解决你的问题,请参考以下文章

将查询作为参数传递给 udf 函数

如何在 PySpark 中将多个参数传递给 Pandas UDF?

如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?

PySpark 将 Dataframe 作为额外参数传递给映射

将列表作为参数传递给 udf 方法

将 UDF 方法作为参数传递给 KSQL 中的其他 UDF