如何在 pyspark 管道中添加 UDF?

Posted

技术标签:

【中文标题】如何在 pyspark 管道中添加 UDF?【英文标题】:How to add UDF in pyspark pipeline? 【发布时间】:2018-04-28 11:02:20 【问题描述】:

我有以下代码,基本上是在做特征工程管道:

token_q1=Tokenizer(inputCol='question1',outputCol='question1_tokens') 
token_q2=Tokenizer(inputCol='question2',outputCol='question2_tokens')  

remover_q1=StopWordsRemover(inputCol='question1_tokens',outputCol='question1_tokens_filtered')
remover_q2=StopWordsRemover(inputCol='question2_tokens',outputCol='question2_tokens_filtered')

q1w2model = Word2Vec(inputCol='question1_tokens_filtered',outputCol='q1_vectors')
q1w2model.setSeed(1)

q2w2model = Word2Vec(inputCol='question2_tokens_filtered',outputCol='q2_vectors')
q2w2model.setSeed(1)

pipeline=Pipeline(stages[token_q1,token_q2,remover_q1,remover_q2,q1w2model,q2w2model])
model=pipeline.fit(train)
result=model.transform(train)
result.show()

我想将以下 UDF 添加到上述管道中:

charcount_q1 = F.udf(lambda row : sum([len(char) for char in row]),IntegerType())

当我这样做时,我得到 Java 错误。有人可以指出我正确的方向吗?

但是,我使用以下基本上可以工作的代码添加此列:

charCountq1=train.withColumn("charcountq1", charcount_q1("question1"))

但我想将它添加到管道中而不是这样做

【问题讨论】:

【参考方案1】:

如果您想在Pipeline 中使用udf,您需要以下之一:

Create a custom Transformer in PySpark ML 使用SQLTransformer

对于这样一个简单的用例,第一个非常冗长,所以我推荐第二个选项:

from pyspark.sql.functions import udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

charcount_q1 = spark.udf.register(
    "charcount_q1",
    lambda row : sum(len(char) for char in row),
    "integer"
)

df = spark.createDataFrame(
    [(1, ["spark", "java", "python"])],
    ("id", "question1"))

pipeline = Pipeline(stages = [SQLTransformer(
    statement = "SELECT *, charcount_q1(question1) charcountq1 FROM __THIS__"
)])

pipeline.fit(df).transform(df).show()
# +---+--------------------+-----------+
# | id|           question1|charcountq1|
# +---+--------------------+-----------+
# |  1|[spark, java, pyt...|         15|
# +---+--------------------+-----------+

【讨论】:

以上是关于如何在 pyspark 管道中添加 UDF?的主要内容,如果未能解决你的问题,请参考以下文章

udf(用户定义函数)如何在 pyspark 中工作?

如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?

如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果

如何在 PySpark 的 UDF 中返回“元组类型”?

如何在pyspark withcolumn中使用udf和class

如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?