Pyspark py4j PickleException:“构造 ClassDict 的预期参数为零”

Posted

技术标签:

【中文标题】Pyspark py4j PickleException:“构造 ClassDict 的预期参数为零”【英文标题】:Pyspark py4j PickleException: "expected zero arguments for construction of ClassDict" 【发布时间】:2015-07-06 18:56:59 【问题描述】:

此问题针对熟悉 py4j 的人员 - 可以帮助解决酸洗错误。我正在尝试向 pyspark PythonMLLibAPI 添加一个方法,该方法接受命名元组的 RDD,执行一些工作,并以 RDD 的形式返回结果。

此方法仿照 PYthonMLLibAPI.trainALSModel() 方法,其类似的现有相关部分是:

  def trainALSModel(
    ratingsJRDD: JavaRDD[Rating],
    .. )

用于对新代码建模的现有 python Rating 类是:

class Rating(namedtuple("Rating", ["user", "product", "rating"])):
    def __reduce__(self):
        return Rating, (int(self.user), int(self.product), float(self.rating))

这是尝试所以这里是相关的类:

python 类 pyspark.mllib.clustering.MatrixEntry:

from collections import namedtuple
class MatrixEntry(namedtuple("MatrixEntry", ["x","y","weight"])):
    def __reduce__(self):
        return MatrixEntry, (long(self.x), long(self.y), float(self.weight))

New 方法 foobarRDD 在 PythonMLLibAPI 中:

  def foobarRdd(
    data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = 
    val rdd = data.rdd.map  d => FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)
    rdd
  

现在让我们试试吧:

from pyspark.mllib.clustering import MatrixEntry

def convert_to_MatrixEntry(tuple):
  return MatrixEntry(*tuple)

from pyspark.mllib.clustering import *
pic = PowerIterationClusteringModel(2)
tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)]
trdd = sc.parallelize(map(convert_to_MatrixEntry,tups))

# print out the RDD on python side just for validation
print "%s" %(repr(trdd.collect()))

from pyspark.mllib.common import callMLlibFunc
pic = callMLlibFunc("foobar", trdd)

结果的相关部分:

[(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5]

这表明输入 rdd 是“整体”。不过酸洗不爽:

5/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict
(for pyspark.mllib.clustering.MatrixEntry)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
    at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167)
    at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

下面是 python 调用堆栈跟踪的视觉效果:

【问题讨论】:

【参考方案1】:

我在使用 MLlib 时遇到了同样的错误,结果证明我在其中一个函数中返回了错误的数据类型。它现在可以在对返回值进行简单转换后工作。这可能不是您正在寻找的答案,但它至少是对未来方向的提示。

【讨论】:

我不再从事该项目 - 因此无法验证。然而,这似乎是一个合理的考虑,因此已投赞成票。【参考方案2】:

多次遇到同样的问题。 numpy 类型没有到 pyspark.sql.types 的隐式转换。

对本机类型系统进行简单的显式转换。 在我的情况下是:

float(vector_a.dot(vector_b))

【讨论】:

【参考方案3】:

我在使用 Spark 版本 >= 2.0 时收到此错误。

Spark 正在将 MLlib 功能转换为更新的 ML 命名空间。 因此,有两种类型的 SparseVector: ml.linalg.SparseVectormllib.linalg.SparseVector

一些 MLlib 函数仍然需要旧的 mllib 类型

from pyspark.ml.linalg import Vectors
# convert ML vector to older MLlib vector
old_vec = Vectors.fromML(new_vec)

HTH

【讨论】:

这非常有帮助 - 谢谢!唯一的问题是,在 2.1.1 版本中,fromML 似乎不再存在,所以我不得不通过 pyspark.mllib.linalg.SparseVector(sv.size, sv.indices, sv.values) 手动创建对象,其中 sv 是我的 pyspark.ml.linalg.SparseVector 对象。

以上是关于Pyspark py4j PickleException:“构造 ClassDict 的预期参数为零”的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark py4j PickleException:“构造 ClassDict 的预期参数为零”

Pyspark 错误:py4j.java_gateway:尝试连接到 Java 服务器时发生错误(127.0.0.1:50532)

pyspark结构化流kafka - py4j.protocol.Py4JJavaError:调用o41.save时发生错误

3 pyspark学习---sparkContext概述

PySpark简介及DF数据处理操作总结

Win7安装PySpark