Pyspark:在UDF中传递多列以及参数

Posted

技术标签:

【中文标题】Pyspark:在UDF中传递多列以及参数【英文标题】:Pyspark: Pass multiple columns along with an argument in UDF 【发布时间】:2018-10-16 20:20:22 【问题描述】:

我正在编写一个 udf,它将采用两个数据框列以及一个额外的参数(一个常量值),并且应该向数据框添加一个新列。我的功能如下:

def udf_test(column1, column2, constant_var):
    if column1 == column2:
        return column1
    else:
        return constant_var

另外,我正在执行以下操作以传递多个列:

apply_test = udf(udf_test, StringType())
df = df.withColumn('new_column', apply_test('column1', 'column2'))

除非我删除 constant_var 作为函数的第三个参数,否则这现在不起作用,但我真的需要它。因此,我尝试执行以下操作:

constant_var = 'TEST'
apply_test = udf(lambda x: udf_test(x, constant_var), StringType())
df = df.withColumn('new_column', apply_test(constant_var)(col('column1', 'column2')))

apply_test = udf(lambda x,y: udf_test(x, y, constant_var), StringType())

以上都不适合我。我根据this 和this *** 帖子得到了这些想法,我认为我的问题与两者的不同之处很明显。任何帮助将不胜感激。

注意:我在这里简化了功能只是为了讨论,实际功能更复杂。我知道这个操作可以使用whenotherwise 语句来完成。

【问题讨论】:

你可以使用.when().otherwise(),对吧? @Prazy 这个函数实际上更复杂,我把它改成了这个只是为了简化问题。但你是对的,在那种情况下我可以使用 when 和 else 什么是 constant_var? 【参考方案1】:

您不必使用用户定义的函数。你可以使用when()和otherwise()这两个函数:

from pyspark.sql import functions as f
df = df.withColumn('new_column', 
                   f.when(f.col('col1') == f.col('col2'), f.col('col1'))
                    .otherwise('other_value'))

另一种方法是生成用户定义的函数。但是,使用udf 会对性能产生负面影响,因为数据必须在 python 之间进行(反)序列化。要生成用户定义的函数,您需要一个返回(用户定义的)函数的函数。例如:

def generate_udf(constant_var):
    def test(col1, col2):
        if col1 == col2:
            return col1
        else:
            return constant_var
    return f.udf(test, StringType())

df = df.withColumn('new_column', 
                   generate_udf('default_value')(f.col('col1'), f.col('col2')))

【讨论】:

实现 UDF 会影响性能,并且您的 .when() 和 .other() 会替换您编写的整个 UDF 代码(节省时间和性能)。 UDF 反序列化和重新序列化数据 => 负面性能。此外,when()otherwise() 不是运算符,它们是函数 谢谢!我已将您的疑虑添加到答案中 :) 谢谢你。

以上是关于Pyspark:在UDF中传递多列以及参数的主要内容,如果未能解决你的问题,请参考以下文章

使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

在 PySpark 中,有没有办法使用运行时给出的 Python 类的函数来动态注册 UDF? [复制]

Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?

更改 DataFrame 中的列数据类型并将其传递到 UDF - PySpark

以整齐的方式将多列作为分组变量传递给 UDF

pyspark udf 的可变参数数量