pyspark udf 的可变参数数量
Posted
技术标签:
【中文标题】pyspark udf 的可变参数数量【英文标题】:Variable number of arguments for pyspark udf 【发布时间】:2017-03-21 18:40:40 【问题描述】:我有大约275
列,我想在25
列中搜索正则表达式字符串"^D(410|412)
。如果此搜索字符串出现在任何2
5 列中,我想将true
添加到MyNewColumn
。
使用下面我可以为2
列做到这一点。无论如何传递可变数量的列?
以下代码适用于 2 列
def moreThanTwoArgs(col1,col2):
return bool((re.search("^D(410|412)",col1) or re.search("^D(410|412)",col2)))
twoUDF= udf(moreThanTwoArgs,BooleanType())
df = df.withColumn("MyNewColumn", twoUDF(df["X1"], df["X2"]))
【问题讨论】:
【参考方案1】:我尝试了一些类似的示例代码试试这个并继续:-
df1 = sc.parallelize(
[
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
]).toDF(['c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9', 'c10'])
df1.show()
+---+---+---+---+---+---+---+---+---+---+
| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|
+---+---+---+---+---+---+---+---+---+---+
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10|
+---+---+---+---+---+---+---+---+---+---+
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
def booleanFindFunc(*args):
return sum(args)
udfBoolean = F.udf(booleanFindFunc, T.StringType())
#Below is Sum of three columns (c1+c2+c2)
df1.withColumn("MyNewColumn", booleanFindFunc(F.col("c1"), F.col("c2"), F.col("c2"))).show()
+---+---+---+---+---+---+---+---+---+---+-----------+
| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|MyNewColumn|
+---+---+---+---+---+---+---+---+---+---+-----------+
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 5|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 5|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 5|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 5|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 5|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 5|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 5|
+---+---+---+---+---+---+---+---+---+---+-----------+
#Below is Sum of All Columns (c1+c2+c3---+c10)
df1.withColumn("MyNewColumn", booleanFindFunc(*[F.col(i) for i in df1.columns])).show()
+---+---+---+---+---+---+---+---+---+---+-----------+
| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|MyNewColumn|
+---+---+---+---+---+---+---+---+---+---+-----------+
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 55|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 55|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 55|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 55|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 55|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 55|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 55|
+---+---+---+---+---+---+---+---+---+---+-----------+
#Below is Sum of All odd Columns (c1+c3+c5--+c9)
df1.withColumn("MyNewColumn", booleanFindFunc(*[F.col(i) for i in df1.columns if int(i[1:])%2])).show()
+---+---+---+---+---+---+---+---+---+---+-----------+
| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|MyNewColumn|
+---+---+---+---+---+---+---+---+---+---+-----------+
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 25|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 25|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 25|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 25|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 25|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 25|
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 25|
+---+---+---+---+---+---+---+---+---+---+-----------+
希望这能解决你的问题
【讨论】:
以上是关于pyspark udf 的可变参数数量的主要内容,如果未能解决你的问题,请参考以下文章