pyspark 数据框上的自定义函数

Posted

技术标签:

【中文标题】pyspark 数据框上的自定义函数【英文标题】:Custom function over pyspark dataframe 【发布时间】:2017-12-01 16:39:54 【问题描述】:

我正在尝试对 pyspark 数据框中的行应用自定义函数。 此函数采用相同维度的行和 2 个其他向量。它从第二个向量的行中输出每个匹配值的第三个向量的值的总和。

import pandas as pd
import numpy as np

功能:

def V_sum(row,b,c):
    return float(np.sum(c[row==b]))

我想用 pandas 实现很简单:

pd_df = pd.DataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], columns=['t1', 't2', 't3', 't4'])
   t1  t2  t3  t4
0   0   1   0   0
1   1   1   0   0
2   0   0   1   0
3   1   0   1   1
4   1   1   0   0

B = np.array([1,0,1,0])
V = np.array([5,1,2,4])

pd_df.apply(lambda x: V_sum(x, B, V), axis=1)
0    4.0
1    9.0
2    7.0
3    8.0
4    9.0
dtype: int64

我想在 pyspark 中执行相同的操作。

from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)

spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], ['t1', 't2', 't3', 't4'])
spk_df.show()
+---+---+---+---+
| t1| t2| t3| t4|
+---+---+---+---+
|  0|  1|  0|  0|
|  1|  1|  0|  0|
|  0|  0|  1|  0|
|  1|  0|  1|  1|
|  1|  1|  0|  0|
+---+---+---+---+

我想过使用 udf,但我无法让它工作

from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

V_sum_udf = F.udf(V_sum, FloatType()) 
spk_df.select(V_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))).alias("results")).show()

显然我做错了什么,因为它产生了:

Py4JJavaError: An error occurred while calling o27726.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 91, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

【问题讨论】:

【参考方案1】:

如果您想在函数中使用非列数据以及列数据来计算新列,则如 here 所述的 UDF + 闭包 + withColumn 是一个不错的起点。

B = [2,0,1,0] 
V = [5,1,2,4]

v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType())
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))))

【讨论】:

感谢您的回答。我尝试运行您的代码,在我尝试使用 show() 显示结果之前它不会抛出任何错误。 使用 spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns)))) 给了我正确的结果。感谢您为我指明正确的方向。

以上是关于pyspark 数据框上的自定义函数的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark MLlib 中的自定义算法:“函数”对象没有属性“_input_kwargs”

如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?

pyspark groupby 并应用自定义函数

Keras 上的自定义损失函数

pyspark:从 pyspark 调用自定义 java 函数。我需要 Java_Gateway 吗?

pyspark 数据框中的自定义排序