将 UDF 应用于 spark 2.0 中的 SparseVector 列

Posted

技术标签:

【中文标题】将 UDF 应用于 spark 2.0 中的 SparseVector 列【英文标题】:Apply UDF to SparseVector column in spark 2.0 【发布时间】:2016-10-10 20:10:05 【问题描述】:

我正在尝试将 UDF 应用于 PySpark df 中包含 SparseVectors 的列(使用 pyspark.ml.feature.IDF 创建)。最初,我试图应用一个更复杂的功能,但在任何应用功能时都会遇到同样的错误。所以举个例子:

udfSum = udf(lambda x: np.sum(x.values), FloatType()) 
df = df.withColumn("vec_sum", udfSum(df.idf)) 
df.take(10) 

我收到此错误:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.sql.execution.python.EvaluatePython.takeAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 55.0 failed 4 times, most recent failure: Lost task 0.3 
in stage 55.0 (TID 111, 10.0.11.102): net.razorvine.pickle.PickleException:
expected zero arguments for construction of ClassDict (for numpy.dtype)

如果我将 df 转换为 Pandas 并应用该函数,我可以确认 FloatType() 是正确的响应类型。这可能是相关的,但错误是不同的:Issue with UDF on a column of Vectors in PySpark DataFrame。

谢谢!

【问题讨论】:

【参考方案1】:

将输出转换为float:

udf(lambda x: float(np.sum(x.values)), FloatType()) 

【讨论】:

工作就像一个魅力!

以上是关于将 UDF 应用于 spark 2.0 中的 SparseVector 列的主要内容,如果未能解决你的问题,请参考以下文章

将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException

Pyspark:UDF 将正则表达式应用于数据帧中的每一行

如何将 pandas udf 应用于大型矩阵数据框

Spark 是不是仅将我的 UDF 应用于正在显示的记录?

spark read 在 Scala UDF 函数中不起作用

如何在 Scala Spark 项目中使用 PySpark UDF?