用于 Dataframe 与 RDD 的 Pyspark UDF

Posted

技术标签:

【中文标题】用于 Dataframe 与 RDD 的 Pyspark UDF【英文标题】:Pyspark UDF for Dataframe vs RDD 【发布时间】:2018-03-01 14:31:38 【问题描述】:

我的数据框的架构是:

root
     |-- _10: string (nullable = true)
     |-- _11: string (nullable = true)
     |-- _12: string (nullable = true)
     |-- _13: string (nullable = true)
     |-- _14: string (nullable = true)
     |-- _15: string (nullable = true)
     |-- _16: string (nullable = true)
     |-- _17: string (nullable = true)
     |-- _18: string (nullable = true)
     |-- _19: string (nullable = true)
     |-- _20: string (nullable = true)
     |-- _21: string (nullable = true)
     |-- _22: string (nullable = true)
     |-- _23: string (nullable = true)
     |-- _24: string (nullable = true)
     |-- _25: string (nullable = true)
     |-- id: long (nullable = true)
     |-- features: array (nullable = true)
     |    |-- element: double (containsNull = true)

我想使用特征数组做一些操作并将结果存储在一个新列中:预测

def addPred(inp):
    global weights, bias
    for j in range(0,len(weights)):
        if j==0:
            out = sigmoid(np.dot(inp,weights[j]) + bias[j])
        elif j==len(weights)-1:
            out = softmax(np.dot(out,weights[j]) + bias[j])
        else:
            out = sigmoid(np.dot(out,weights[j]) + bias[j])

    if out[0]>out[1]:
        return -1*out[0]
    return out[1]

使用此 UDF 和以下代码,我正在尝试直接向数据框添加一个新列。

udf_addPred = udf(addPred, DoubleType())
test_data = test_data.withColumn('pred', udf_addPred('features'))

但它给了我各种各样的错误。

有时出现“不可序列化错误” 有时会出现“RDD 为空错误”

但是如果我使用 rdd map 做同样的操作,它可以使用下面的代码来工作

col_rdd  = test_data.rdd.map(lambda x: addPred(x.features))
我尝试自己调试问题,但无法找出错误来源 以 RDD 方式执行,然后合并列将需要两次计算 谁能指出错误或提出更好的替代方案?

编辑:

test_data.rdd.first() 的输出:

Row(_10=u'Abu Dhabi Global Market', _11=u'Abu Dhabi Media Company', _12=u'Abu Dhabi Global Market (ADGM) BuildingADGM Square Al Maryah Island PO Box 111999', _13=u'Abu Dhabi Media P.O. Box 63', _14=u'Abu Dhabi', _15=u'Abu Dhabi', _16=u'Abu Dhabi', _17=u'Abu Dhabi', _18=u'United Arab Emirates', _19=u'United Arab Emirates', _20=None, _21=None, _22=u'557942700', _23=u'552544884', _24=u'www.adgm.com', _25=u'http://www.admedia.ae', id=4, features=[0.4782608695652174, 0.2592592592592593, 1.0, 1.0, 1.0, 0.14285714285714285, 0.0, 0.19999999999999996])

权重和偏差是 Spark 的多层感知器中对应的东西

def extWeights():
    weights = []
    bias = []
    last = 0
    for i in range(0,len(model.layers)-1):
        curr = (model.layers[i]+1)*model.layers[i+1]
        weights.append(np.reshape(model.weights[last:last+curr],((model.layers[i]+1),model.layers[i+1])))
        bias.append(weights[i][model.layers[i]])
        weights[i] = weights[i][:model.layers[i]]
        last += curr
    return weights, bias

【问题讨论】:

之后不需要合并。只需修改您的 Lamba 以添加您需要的列以及带有函数结果的其他列。 test_data.rdd.take(1) ?你能给我们看一下数据吗? 我不确定你应该使用全局变量。尝试将它们作为参数传递。 什么是global weights, bias 您是否在range 中意识到您只保留最后一个版本的 out ?你不需要为此做一个 for 循环...... 【参考方案1】:

替代解决方案。 RDD 正在工作,因此,在 lambda 函数中带回您需要的 cols:

col_rdd  = test_data.rdd.map(lambda x: addPred(x.features))

变成

col_rdd  = test_data.rdd.map(lambda x: (x.neededCols, addPred(x.features)))

【讨论】:

从逻辑上讲,在创建 RDD 时,我们基本上是在复制另外 10 个属性。这不是有点糟糕吗?

以上是关于用于 Dataframe 与 RDD 的 Pyspark UDF的主要内容,如果未能解决你的问题,请参考以下文章

045 RDD与DataFrame互相转换

RDD, DataFrame,DataSet区别与相互转化

Spark——DataFrame与RDD互操作方式

spark sql 之 RDD与DataFrame互相转化

pyspark建立RDD以及读取文件成dataframe

spark: RDD与DataFrame之间的相互转换