pyspark 数据框上的自定义函数
Posted
技术标签:
【中文标题】pyspark 数据框上的自定义函数【英文标题】:Custom function over pyspark dataframe 【发布时间】:2017-12-01 16:39:54 【问题描述】:我正在尝试对 pyspark 数据框中的行应用自定义函数。 此函数采用相同维度的行和 2 个其他向量。它从第二个向量的行中输出每个匹配值的第三个向量的值的总和。
import pandas as pd
import numpy as np
功能:
def V_sum(row,b,c):
return float(np.sum(c[row==b]))
我想用 pandas 实现很简单:
pd_df = pd.DataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], columns=['t1', 't2', 't3', 't4'])
t1 t2 t3 t4
0 0 1 0 0
1 1 1 0 0
2 0 0 1 0
3 1 0 1 1
4 1 1 0 0
B = np.array([1,0,1,0])
V = np.array([5,1,2,4])
pd_df.apply(lambda x: V_sum(x, B, V), axis=1)
0 4.0
1 9.0
2 7.0
3 8.0
4 9.0
dtype: int64
我想在 pyspark 中执行相同的操作。
from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)
spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], ['t1', 't2', 't3', 't4'])
spk_df.show()
+---+---+---+---+
| t1| t2| t3| t4|
+---+---+---+---+
| 0| 1| 0| 0|
| 1| 1| 0| 0|
| 0| 0| 1| 0|
| 1| 0| 1| 1|
| 1| 1| 0| 0|
+---+---+---+---+
我想过使用 udf,但我无法让它工作
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
V_sum_udf = F.udf(V_sum, FloatType())
spk_df.select(V_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))).alias("results")).show()
显然我做错了什么,因为它产生了:
Py4JJavaError: An error occurred while calling o27726.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 91, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
【问题讨论】:
【参考方案1】:如果您想在函数中使用非列数据以及列数据来计算新列,则如 here 所述的 UDF + 闭包 + withColumn 是一个不错的起点。
B = [2,0,1,0]
V = [5,1,2,4]
v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType())
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))))
【讨论】:
感谢您的回答。我尝试运行您的代码,在我尝试使用 show() 显示结果之前它不会抛出任何错误。 使用 spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns)))) 给了我正确的结果。感谢您为我指明正确的方向。以上是关于pyspark 数据框上的自定义函数的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark MLlib 中的自定义算法:“函数”对象没有属性“_input_kwargs”
如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?