spark pyspark mllib model - when prediction rdd is generated using map, it throws exception on collect()

Posted: 2015-11-20 04:14:14

Question:

I am using Spark 1.2.0 (I cannot upgrade, since I do not control the cluster). I am building a model with MLlib:

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD

# pair each label with its TF-IDF feature vector
points = labels.zip(tfidf).map(lambda t: LabeledPoint(t[0], t[1]))
train_data, test_data = points.randomSplit([0.6, 0.4], 17)

iterations = 3
model = LogisticRegressionWithSGD.train(train_data, iterations)

# build (label, prediction) pairs inside a map transformation
labelsAndPreds = test_data.map(lambda p: (p.label, model.predict(p.features)))
print("labels = " + str(labelsAndPreds.collect()))

When I run this code, I get a NullPointerException on collect(). In fact, any action on the prediction RDD raises this exception:

15/08/26 04:02:43 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 26, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:43 INFO BlockManagerInfo: Added broadcast_15_piece0 in memory on dojo3s10118.rtp1.hadoop.fmr.com:41145 (size: 9.6 KB, free: 529.8 MB)
15/08/26 04:02:43 INFO BlockManagerInfo: Added broadcast_14_piece0 in memory on dojo3s10118.rtp1.hadoop.fmr.com:41145 (size: 68.0 B, free: 529.8 MB)
15/08/26 04:02:43 WARN TaskSetManager: Lost task 0.0 in stage 17.0 (TID 26, dojo3s10118.rtp1.hadoop.fmr.com): java.lang.NullPointerException
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1468)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

15/08/26 04:02:43 INFO TaskSetManager: Starting task 0.1 in stage 17.0 (TID 27, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:44 INFO TaskSetManager: Lost task 0.1 in stage 17.0 (TID 27) on executor dojo3s10118.rtp1.hadoop.fmr.com: java.lang.NullPointerException (null) [duplicate 1]
15/08/26 04:02:44 INFO TaskSetManager: Starting task 0.2 in stage 17.0 (TID 28, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:44 INFO TaskSetManager: Lost task 0.2 in stage 17.0 (TID 28) on executor dojo3s10118.rtp1.hadoop.fmr.com: java.lang.NullPointerException (null) [duplicate 2]
15/08/26 04:02:44 INFO TaskSetManager: Starting task 0.3 in stage 17.0 (TID 29, dojo3s10118.rtp1.hadoop.fmr.com, PROCESS_LOCAL, 1464 bytes)
15/08/26 04:02:44 INFO TaskSetManager: Lost task 0.3 in stage 17.0 (TID 29) on executor dojo3s10118.rtp1.hadoop.fmr.com: java.lang.NullPointerException (null) [duplicate 3]
15/08/26 04:02:44 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job
15/08/26 04:02:44 INFO YarnClientClusterScheduler: Removed TaskSet 17.0, whose tasks have all completed, from pool
15/08/26 04:02:44 INFO YarnClientClusterScheduler: Cancelling stage 17
15/08/26 04:02:44 INFO DAGScheduler: Job 8 failed: collect at /home/a560975/spark-exp/./ml-py-exp-2.py:102, took 0.209401 s
Traceback (most recent call last):
  File "/home/a560975/spark-exp/./ml-py-exp-2.py", line 102, in <module>
    print("labels = "+str(labelsAndPreds.collect()))
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p711.386/lib/spark/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p711.386/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p711.386/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o118.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 29, dojo3s10118.rtp1.hadoop.fmr.com): java.lang.NullPointerException
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)

However, if instead of test_data.map(lambda p: (p.label, model.predict(p.features))) I do the following:

# collect the test data to the driver and predict serially, one record at a time
for lp in test_data.collect():
    print("predicted = " + str(model.predict(lp.features)))

then prediction does not throw any exception, but it is not parallel. Why do I get the exception when I run the model's predict inside a map function, and how can I get around it?

I have already tried broadcasting the model with sc.broadcast(model), but I still see the same problem. Please help.
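Roughly, the broadcast attempt was a sketch like this (bcast_model is just the name I gave the broadcast variable):

bcast_model = sc.broadcast(model)
labelsAndPreds = test_data.map(lambda p: (p.label, bcast_model.value.predict(p.features)))

It fails with the same NullPointerException on collect().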

Question comments:

Your test_data variable must be empty, check it!

I have tested it by printing its contents; it is not empty.

The installation seems to be at fault. I moved the same code to another setup and it works fine. I need to dig deeper to find the installation problem.

Possible duplicate of How to generate tuples of (original label, predicted label) on Spark with MLlib?

Answer 1:

If you are using Python, the reason is the restriction noted in the MLlib documentation: "In Python, predict cannot currently be used within an RDD transformation or action. Call predict directly on the RDD instead." The PySpark model is a thin wrapper around a JVM object, and that wrapper cannot be shipped to the executors inside a map closure, which is consistent with the NullPointerException surfacing in PythonRDD.writeUTF.
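A minimal sketch of that fix, reusing the names from the question (it assumes model.predict accepts an RDD of feature vectors, which is the pattern the MLlib Python examples use):

# call predict once, on the driver, passing the whole RDD of feature vectors
predictions = model.predict(test_data.map(lambda p: p.features))
# zip the labels back with the predictions
labelsAndPreds = test_data.map(lambda p: p.label).zip(predictions)
print("labels = " + str(labelsAndPreds.collect()))

Since predict is an element-wise map over the input, the zip keeps each label aligned with its prediction, and the computation still runs in parallel across the cluster while the JVM-backed model stays out of the worker closures.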

Answer comments:
