编写一个 Pyspark UDF,其功能类似于 Python any 函数

Posted

技术标签:

【中文标题】编写一个 Pyspark UDF,其功能类似于 Python any 函数【英文标题】:Writing a Pyspark UDF that functions like the Python any function 【发布时间】:2017-09-17 13:52:10 【问题描述】:

我想编写一个 any_lambda 函数来检查 ArrayType 列中的任何元素是否满足 lambda 函数指定的条件。

这是我的代码不起作用:

def any_lambda(f, l):
    return any(list(map(f, l)))

spark.udf.register("any_lambda", any_lambda)

source_df = spark.createDataFrame(
    [
        ("jose", [1, 2, 3]),
        ("li", [4, 5, 6]),
        ("luisa", [10, 11, 12]),
    ],
    StructType([
        StructField("name", StringType(), True),
        StructField("nums", ArrayType(StringType(), True), True),
    ])
)

actual_df = source_df.withColumn(
    "any_num_greater_than_5",
    any_lambda(lambda n: n > 5, col("nums"))
)

此代码引发TypeError: Column is not iterable

如何创建一个有效的any_lambda 函数?

【问题讨论】:

【参考方案1】:

Udf 期望参数是列,lambda 函数不是列;您可能要做的是定义any_lambda,以便它接受一个lambda函数并返回一个udf

import pyspark.sql.functions as F

def any_lambda(f):
    @F.udf
    def temp_udf(l):
        return any(map(f, l))
    return temp_udf

source_df = spark.createDataFrame(
    [
        ("jose", [1, 2, 3]),
        ("li", [4, 5, 6]),
        ("luisa", [10, 11, 12]),
    ],
    StructType([
        StructField("name", StringType(), True),
        StructField("nums", ArrayType(IntegerType(), True), True),
    ])
)

actual_df = source_df.withColumn(
    "any_num_greater_than_5",
    any_lambda(lambda n: n > 5)(col("nums"))
)

actual_df.show()
+-----+------------+----------------------+
| name|        nums|any_num_greater_than_5|
+-----+------------+----------------------+
| jose|   [1, 2, 3]|                 false|
|   li|   [4, 5, 6]|                  true|
|luisa|[10, 11, 12]|                  true|
+-----+------------+----------------------+

或者正如@Powers 所说,要明确返回的列类型,我们可以在udf 中指定返回的类型,如下所示:

def any_lambda(f):
    def temp_udf(l):
        return any(map(f, l))
    return F.udf(temp_udf, BooleanType())

现在架构看起来像:

actual_df.printSchema()
root
 |-- name: string (nullable = true)
 |-- nums: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- any_num_greater_than_5: boolean (nullable = true)

【讨论】:

很棒的答案。我删除了@F.udf 并添加了return F.udf(temp_udf, BooleanType()) 以使any_num_greater_than_5 成为布尔列。在您的回答中,它是一个字符串列。您能否更新您的答案以显示这两个选项? @Powers 好电话。编辑答案以反映这一点。 嗨,Psidom,你能告诉我这条线是如何工作的............ any_lambda(lambda n: n > 5)(col("nums"))

以上是关于编写一个 Pyspark UDF,其功能类似于 Python any 函数的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 将算法转换为 UDF 并将其应用于 DataFrame

Pyspark:访问 UDF 中行内的列

PySpark 自定义 UDF ModuleNotFoundError: No module named

Pyspark:在UDF中传递多列以及参数

在 pyspark 中使用 UDF 和简单数据帧

PySpark 结构化流将 udf 应用于窗口