PySpark 传递列表到用户定义函数

Posted

技术标签:

【中文标题】PySpark 传递列表到用户定义函数【英文标题】:PySpark pass list to User Defined Function 【发布时间】:2020-01-10 23:14:48 【问题描述】:

我有一个包含一列的 DataFrame。在该列的每一行中,都有一个具有不同整数个数的列表。例如,第 1 行有一个包含 5 个整数的列表。第 2 行有一个包含 8 个整数的列表,并且... 我想编写一个UDF,它将每一行作为一个列表并将列表中的所有整数相乘并将结果作为数字返回。 我想在另一列中有答案,所以它是 df.withColumn(...) 出于某种原因,我确实想使用 UDF 而不是任何其他预构建的函数或命令。 感谢您的时间和支持。

【问题讨论】:

【参考方案1】:

定义一些样本输入数据:

df = spark.createDataFrame([
    (1, [3, 4, 8]), (2, [7, 2, 6, 8])
], ("id", "list_of_ints"))

df.show()

udf的定义:

from pyspark.sql.functions import udf, col
def product(numbers):
     a = 1
     for num in numbers:
         a *= num
     return a

from pyspark.sql.types import IntegerType
product_udf = udf(lambda z: product(z), IntegerType())

并添加一个包含所有列表元素乘积的列:

df.withColumn("product", product_udf("list_of_ints")).show()

+---+------------+-------+
| id|list_of_ints|product|
+---+------------+-------+
|  1|   [3, 4, 8]|     96|
|  2|[7, 2, 6, 8]|    672|
+---+------------+-------+

希望这会有所帮助!

【讨论】:

以上是关于PySpark 传递列表到用户定义函数的主要内容,如果未能解决你的问题,请参考以下文章

如何将输入传递到用户定义函数中的命名列表

如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?

尝试通过数据框在 Pyspark 中执行用户定义的函数时出错

使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

传递列表项作为 withColumn (Pyspark) 的输入

pyspark 的用户定义函数 (UDF) 是不是需要单元测试?