pyspark如何使用两列编写UDF

Posted

技术标签:

【中文标题】pyspark如何使用两列编写UDF【英文标题】:pyspark how to write UDF using two columns 【发布时间】:2021-01-07 10:29:58 【问题描述】:
rdd = sc.parallelize( [(['a','b','c'], 'c'), \
                       (['h','j','s'], 'j'), \
                       (['w','x','a'], 'a'), \
                       (['o','b','e'], 'c')] )

df = spark.createDataFrame(rdd, ['seq','target'])

+---------+------+
|      seq|target|
+---------+------+
|[a, b, c]|     c|
|[h, j, s]|     j|
|[w, x, a]|     a|
|[o, b, e]|     c|
+---------+------+

我想写一个 UDF 来从 seq 中删除目标。

+---------+------+---------+
|      seq|target| filtered|
+---------+------+---------+
|[a, b, c]|     c|   [a, b]|
|[h, j, s]|     j|   [h, s]|
|[w, x, a]|     a|   [w, x]|
|[o, b, e]|     c|[o, b, e]|
+---------+------+---------+

请注意,这只是一个展示。实际情况更为复杂。我想通过使用另一列(例如target)作为参数来获得处理一列(例如seq)的正式方式。 有没有通用的解决方案?

【问题讨论】:

【参考方案1】:

你可以使用array_remove:

import pyspark.sql.functions as F

df2 = df.withColumn('filtered', F.expr('array_remove(seq, target)'))

df2.show()
+---------+------+---------+
|      seq|target| filtered|
+---------+------+---------+
|[a, b, c]|     c|   [a, b]|
|[h, j, s]|     j|   [h, s]|
|[w, x, a]|     a|   [w, x]|
|[o, b, e]|     c|[o, b, e]|
+---------+------+---------+

如果您正在寻找 UDF 解决方案,

@F.udf('array<string>')
def array_remove(col1, col2):
    return list(filter(lambda x: x != col2, col1))

df2 = df.withColumn('filtered', array_remove('seq', 'target'))

df2.show()
+---------+------+---------+
|      seq|target| filtered|
+---------+------+---------+
|[a, b, c]|     c|   [a, b]|
|[h, j, s]|     j|   [h, s]|
|[w, x, a]|     a|   [w, x]|
|[o, b, e]|     c|[o, b, e]|
+---------+------+---------+

【讨论】:

是否有任何理由使用表达式array_remove 而不是函数?这 'array&lt;string&gt;' 是否等同于 T.ArrayType(T.StringType())?这是什么符号? @MykolaZotko 该函数不接受列作为第二个参数,所以我需要使用 expr。是的,对于第二个问题,这是您在 SQL 中指定类型的方式 感谢您的帮助。它是否也适用于两列以上的场景? @mck @yanachen 是的,只需在函数参数中添加更多列

以上是关于pyspark如何使用两列编写UDF的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Hive 上下文中的 Pyspark 调用用 Java 编写的 Hive UDF

根据字符串列和其他列 2 & 3 Pyspark UDF 的条件转换两列

udf(用户定义函数)如何在 pyspark 中工作?

如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果

如何在pyspark withcolumn中使用udf和class

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?