pyspark如何使用两列编写UDF
Posted
技术标签:
【中文标题】pyspark如何使用两列编写UDF【英文标题】:pyspark how to write UDF using two columns 【发布时间】:2021-01-07 10:29:58 【问题描述】:rdd = sc.parallelize( [(['a','b','c'], 'c'), \
(['h','j','s'], 'j'), \
(['w','x','a'], 'a'), \
(['o','b','e'], 'c')] )
df = spark.createDataFrame(rdd, ['seq','target'])
+---------+------+
| seq|target|
+---------+------+
|[a, b, c]| c|
|[h, j, s]| j|
|[w, x, a]| a|
|[o, b, e]| c|
+---------+------+
我想写一个 UDF 来从 seq 中删除目标。
+---------+------+---------+
| seq|target| filtered|
+---------+------+---------+
|[a, b, c]| c| [a, b]|
|[h, j, s]| j| [h, s]|
|[w, x, a]| a| [w, x]|
|[o, b, e]| c|[o, b, e]|
+---------+------+---------+
请注意,这只是一个展示。实际情况更为复杂。我想通过使用另一列(例如target
)作为参数来获得处理一列(例如seq
)的正式方式。
有没有通用的解决方案?
【问题讨论】:
【参考方案1】:你可以使用array_remove
:
import pyspark.sql.functions as F
df2 = df.withColumn('filtered', F.expr('array_remove(seq, target)'))
df2.show()
+---------+------+---------+
| seq|target| filtered|
+---------+------+---------+
|[a, b, c]| c| [a, b]|
|[h, j, s]| j| [h, s]|
|[w, x, a]| a| [w, x]|
|[o, b, e]| c|[o, b, e]|
+---------+------+---------+
如果您正在寻找 UDF 解决方案,
@F.udf('array<string>')
def array_remove(col1, col2):
return list(filter(lambda x: x != col2, col1))
df2 = df.withColumn('filtered', array_remove('seq', 'target'))
df2.show()
+---------+------+---------+
| seq|target| filtered|
+---------+------+---------+
|[a, b, c]| c| [a, b]|
|[h, j, s]| j| [h, s]|
|[w, x, a]| a| [w, x]|
|[o, b, e]| c|[o, b, e]|
+---------+------+---------+
【讨论】:
是否有任何理由使用表达式array_remove
而不是函数?这 'array<string>'
是否等同于 T.ArrayType(T.StringType())
?这是什么符号?
@MykolaZotko 该函数不接受列作为第二个参数,所以我需要使用 expr。是的,对于第二个问题,这是您在 SQL 中指定类型的方式
感谢您的帮助。它是否也适用于两列以上的场景? @mck
@yanachen 是的,只需在函数参数中添加更多列以上是关于pyspark如何使用两列编写UDF的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Hive 上下文中的 Pyspark 调用用 Java 编写的 Hive UDF
根据字符串列和其他列 2 & 3 Pyspark UDF 的条件转换两列
如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果