Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute '__code__'

Posted: 2018-06-03 23:33:46

【Question】:

I trained a DNN classifier with TensorFlow in Python. Now I want to load it in PySpark and use the model to predict the gender of each record in an RDD. First I build the TensorFlow graph exactly as in the training script, then load the trained model and try to predict on each row of the RDD:

"""
code to generate the tensorflow graph omitted
"""

with tf.Session(graph=graph) as sess:
    # load the trained model
    saver.restore(sess, "./nonClass_gender")
    # lib is the RDD; each Row has the form Row(key=..., values=..., indices=..., shape=...)
    predictions_1 = lib.map(lambda e: Row(key=e["key"],
        prob=y_proba.eval(feed_dict={values: e["values"],
                                     indices: e["indices"],
                                     shape: [1, 2318]})))
    predictions_1.take(5)

Note that in the RDD each row has the form Row(key=..., values=..., indices=..., shape=...). The values, indices, and shape fields are equivalent to values, indices, and dense_shape in this answer: Use coo_matrix in TensorFlow. They are used to build a SparseTensorValue; the difference is that in my code each row produces its own SparseTensorValue.
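For concreteness, here is a minimal sketch (TF 1.x API) of how one row's fields would map onto a SparseTensorValue, following the linked answer; the literal numbers are made-up placeholders:

import tensorflow as tf

# One hypothetical row: two nonzero entries of a 1 x 2318 sparse matrix.
row = {"values": [1.0, 3.0], "indices": [[0, 5], [0, 17]], "shape": [1, 2318]}

sparse_input = tf.SparseTensorValue(indices=row["indices"],
                                    values=row["values"],
                                    dense_shape=row["shape"])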

I then get the following error:

Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 148, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 249, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 297, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 249, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 368, in save_builtin_function
    return self.save_function(obj)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 247, in save_function
    if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
-------------------------------------------------------------------
PicklingError                     Traceback (most recent call last)
<ipython-input-210-74fa9037373f> in <module>()
      6         prob = y_proba.eval(feed_dict={values: e["values"], 
      7         indices: e["indices"], shape: [1,2318]})))
----> 8     predictions_1.take(5)

/usr/local/spark/python/pyspark/rdd.pyc in take(self, num)
   1341 
   1342             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343             res = self.context.runJob(self, takeUpToNumLeft, p)
   1344 
   1345             items += res

/usr/local/spark/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    990         # SparkContext#runJob.
    991         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    993         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    994 

/usr/local/spark/python/pyspark/rdd.pyc in _jrdd(self)
   2453 
   2454         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2455                                       self._jrdd_deserializer, profiler)
   2456         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
   2457                                              self.preservesPartitioning)

/usr/local/spark/python/pyspark/rdd.pyc in _wrap_function(sc, func, deserializer, serializer, profiler)
   2386     assert serializer, "serializer should not be empty"
   2387     command = (func, profiler, deserializer, serializer)
-> 2388     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
   2389     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
   2390                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/usr/local/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command)
   2372     # the serialized command will be compressed by broadcast
   2373     ser = CloudPickleSerializer()
-> 2374     pickled_command = ser.dumps(command)
   2375     if len(pickled_command) > (1 << 20):  # 1M
   2376         # The broadcast will have same life cycle as created PythonRDD

/usr/local/spark/python/pyspark/serializers.pyc in dumps(self, obj)
    458 
    459     def dumps(self, obj):
--> 460         return cloudpickle.dumps(obj, 2)
    461 
    462 

/usr/local/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
    702 
    703     cp = CloudPickler(file,protocol)
--> 704     cp.dump(obj)
    705 
    706     return file.getvalue()

/usr/local/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
    160                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    161             print_exec(sys.stderr)
--> 162             raise pickle.PicklingError(msg)
    163 
    164     def save_memoryview(self, obj):

PicklingError: Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute '__code__'

In the code above, if I replace prob = y_proba.eval(feed_dict={values: e["values"], indices: e["indices"], shape: [1,2318]}) with a plain Python-defined function such as proba = test(e["values"], e["indices"], [1,2318]), it works. Likewise, if I call y_proba.eval in plain Python (not inside the RDD map), it also works.
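The failure is consistent with closure capture: cloudpickle must serialize everything the lambda references, and y_proba drags in the Graph and the live Session, which wrap C-level handles that cannot be pickled. Below is a minimal, self-contained sketch of that mechanism; the Unpicklable class is a hypothetical stand-in for a tf.Session, not TensorFlow code:

from pyspark import cloudpickle  # the serializer PySpark uses for closures

class Unpicklable(object):
    """Hypothetical stand-in for a tf.Session: refuses to be pickled."""
    def __reduce__(self):
        raise TypeError("cannot pickle a live session handle")

handle = Unpicklable()

ok = lambda e: e["values"]          # captures nothing problematic: pickles fine
bad = lambda e: (handle, e["key"])  # captures the handle, like y_proba would

cloudpickle.dumps(ok, 2)            # succeeds
try:
    cloudpickle.dumps(bad, 2)       # fails like the traceback above
except Exception as exc:
    print(repr(exc))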

【Comments】:

Have you tried run instead of eval?

run gives the same error. However, I just found a solution; see my answer below.

【Answer 1】:

Distribute the model to every machine (you can use SparkFiles).

Rewrite the prediction as a function:

def predict(rows, worker_session_path):
    with tf.Session(graph=graph) as sess:
        # load the trained model
        saver.restore(sess, worker_session_path)
        # rows is the partition iterator; each Row has the form Row(key=..., values=..., indices=..., shape=...)
        return map(lambda e: Row(key=e["key"],
            prob=y_proba.eval(feed_dict={values: e["values"],
                                         indices: e["indices"],
                                         shape: [1, 2318]})), rows)

and use it with mapPartitions:

lib.mapPartitions(lambda rows: predict(rows, worker_session_path))
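A hedged sketch of the distribution step, using only the SparkContext.addFile / SparkFiles.get API; sc, lib, and predict are the objects defined above, the checkpoint prefix nonClass_gender comes from the question, and the set of suffixes (.index, .data-..., .meta) is an assumption that depends on the TensorFlow version:

from pyspark import SparkFiles

# Ship each file of the checkpoint to every executor.
for suffix in [".index", ".data-00000-of-00001", ".meta"]:
    sc.addFile("./nonClass_gender" + suffix)

def predict_partition(rows):
    # Resolve the executor-local copy inside the task (not on the driver),
    # then strip the suffix to recover the checkpoint prefix for saver.restore.
    local = SparkFiles.get("nonClass_gender.index")
    return predict(rows, local[:-len(".index")])

lib.mapPartitions(predict_partition)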

【Comments】:

Thanks for your answer. Although running your code gave me the same error, I came up with a solution inspired by it. See the answer I posted for details.

【Answer 2】:

Thanks to @user8371915. Inspired by his answer and the related topic Transform map to mapPartition using pyspark, I got this working. The key to the solution is to build the TensorFlow graph inside the function used by mapPartitions, not outside it. Here is the working code:

# Assumes the imports from the training script (import tensorflow as tf,
# import numpy as np, from pyspark.sql import Row); first_layer, neuron_layer,
# and dropout_rate are also defined in the omitted training code.
def predict(rows, worker_session_path):

    n_inputs = 2318 # the second dimension of the input sparse matrix X
    n_hidden1 = 200 # neurons in the first hidden layer
    n_hidden2 = 20 # neurons in the second hidden layer
    n_outputs = 2 # binary classification
    # build the graph as in the training model
    graph = tf.Graph()
    with graph.as_default():
        # for sparse tensor X
        values = tf.placeholder(tf.float32) 
        indices = tf.placeholder(tf.int64)
        shape = tf.placeholder(tf.int64)

        y = tf.placeholder(tf.int32, shape=(None), name="y")

        training = tf.placeholder_with_default(False, shape=(), name='training')

        with tf.name_scope("dnn"):
            hidden1 = first_layer(values, indices, shape, n_hidden1, name="hidden1", 
                                  activation=tf.nn.relu, n_inputs = n_inputs)
            hidden1_drop = tf.layers.dropout(hidden1, dropout_rate, training=training)
            hidden2 = neuron_layer(hidden1_drop, n_hidden2, name="hidden2",
                                   activation=tf.nn.relu)
            hidden2_drop = tf.layers.dropout(hidden2, dropout_rate, training=training)
            logits = neuron_layer(hidden2_drop, n_outputs, name="outputs")
            y_proba = tf.nn.softmax(logits)

        saver = tf.train.Saver()

    with tf.Session(graph=graph) as sess:
        saver.restore(sess, worker_session_path)
        for e in rows:
            proba = sess.run(y_proba, feed_dict={indices: e["indices"],
                                                 values: e["values"],
                                                 shape: [1, 2318]})
            # np.squeeze converts proba's shape from (1, 2) to (2,)
            yield(Row(key = e['key'], proba = np.squeeze(proba)))

lib2 = lib.mapPartitions(lambda rows: predict(rows, "./nonClass_gender"))
lib2.take(5)
