Pyspark UDF for Dataframe vs RDD
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pyspark UDF for Dataframe vs RDD相关的知识,希望对你有一定的参考价值。
我的数据帧的架构是:
root
|-- _10: string (nullable = true)
|-- _11: string (nullable = true)
|-- _12: string (nullable = true)
|-- _13: string (nullable = true)
|-- _14: string (nullable = true)
|-- _15: string (nullable = true)
|-- _16: string (nullable = true)
|-- _17: string (nullable = true)
|-- _18: string (nullable = true)
|-- _19: string (nullable = true)
|-- _20: string (nullable = true)
|-- _21: string (nullable = true)
|-- _22: string (nullable = true)
|-- _23: string (nullable = true)
|-- _24: string (nullable = true)
|-- _25: string (nullable = true)
|-- id: long (nullable = true)
|-- features: array (nullable = true)
| |-- element: double (containsNull = true)
我想使用features数组进行一些操作,并将结果存储在一个新列中:预测
def addPred(inp):
global weights, bias
for j in range(0,len(weights)):
if j==0:
out = sigmoid(np.dot(inp,weights[j]) + bias[j])
elif j==len(weights)-1:
out = softmax(np.dot(out,weights[j]) + bias[j])
else:
out = sigmoid(np.dot(out,weights[j]) + bias[j])
if out[0]>out[1]:
return -1*out[0]
return out[1]
使用此UDF和以下代码我正在尝试直接向数据框添加新列。
udf_addPred = udf(addPred, DoubleType())
test_data = test_data.withColumn('pred', udf_addPred('features'))
但它给了我各种错误。
- 有时'不可序列化的错误'
- 有时'RDD是空错误'
但是如果我使用rdd map执行相同的操作,它可以使用以下代码
col_rdd = test_data.rdd.map(lambda x: addPred(x.features))
- 我自己尝试调试这个问题,但无法弄清楚错误的来源
- 以RDD方式执行然后合并列将需要两倍的计算
- 有人可以指出错误或建议更好的选择吗?
编辑:
test_data.rdd.first()的输出:
Row(_10=u'Abu Dhabi Global Market', _11=u'Abu Dhabi Media Company', _12=u'Abu Dhabi Global Market (ADGM) BuildingADGM Square Al Maryah Island PO Box 111999', _13=u'Abu Dhabi Media P.O. Box 63', _14=u'Abu Dhabi', _15=u'Abu Dhabi', _16=u'Abu Dhabi', _17=u'Abu Dhabi', _18=u'United Arab Emirates', _19=u'United Arab Emirates', _20=None, _21=None, _22=u'557942700', _23=u'552544884', _24=u'www.adgm.com', _25=u'http://www.admedia.ae', id=4, features=[0.4782608695652174, 0.2592592592592593, 1.0, 1.0, 1.0, 0.14285714285714285, 0.0, 0.19999999999999996])
重量和偏差是Spark的Multilayer Perceptron的相应内容
def extWeights():
weights = []
bias = []
last = 0
for i in range(0,len(model.layers)-1):
curr = (model.layers[i]+1)*model.layers[i+1]
weights.append(np.reshape(model.weights[last:last+curr],((model.layers[i]+1),model.layers[i+1])))
bias.append(weights[i][model.layers[i]])
weights[i] = weights[i][:model.layers[i]]
last += curr
return weights, bias
答案
替代方案。因此,RDD正在工作,带回lambda函数中所需的cols:
col_rdd = test_data.rdd.map(lambda x: addPred(x.features))
变
col_rdd = test_data.rdd.map(lambda x: (x.neededCols, addPred(x.features)))
以上是关于Pyspark UDF for Dataframe vs RDD的主要内容,如果未能解决你的问题,请参考以下文章
PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列
PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构
PySpark 将算法转换为 UDF 并将其应用于 DataFrame
用于 Dataframe 与 RDD 的 Pyspark UDF