TypeError: can't pickle generator objects: Spark collect() fails due to unserializable generator return type (dict_key)

Posted 2019-02-26 06:36:34

I have a library function that returns a composite object containing generators, which cannot be pickled (attempting to pickle it raises the error TypeError: can't pickle dict_keys objects).

When I try to parallelize with Spark, it fails at the collect step because of that pickling failure (note: this runs on Databricks, where sc is provided by default).
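The error is reproducible locally, without Spark, since dict_keys view objects cannot be pickled on their own (a minimal sketch; the exact message wording varies across Python versions):

import pickle

# dict_keys is a view object, not a plain list, and pickle rejects it
pickle.dumps({"a": 1}.keys())  # TypeError: can't pickle dict_keys objects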

Here is a minimal reproduction with Spark:

test_list = ["a": 1, "b": 2, "c": 3, 
             "a": 7, "b": 3, "c": 5, 
             "a": 2, "b": 3, "c": 4, 
             "a": 9, "b": 8, "c": 7]

parallel_test_list = sc.parallelize(test_list)

parallel_results = parallel_test_list.map(lambda x: x.keys())

local_results = parallel_results.collect()

The stack trace I get is quite long; I believe the relevant part is:

Traceback (most recent call last):
      File "/databricks/spark/python/pyspark/worker.py", line 403, in main
        process()
      File "/databricks/spark/python/pyspark/worker.py", line 398, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/databricks/spark/python/pyspark/serializers.py", line 418, in dump_stream
        bytes = self.serializer.dumps(vs)
      File "/databricks/spark/python/pyspark/serializers.py", line 597, in dumps
        return pickle.dumps(obj, protocol)
    TypeError: can't pickle dict_keys objects

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)


Answer 1:

You can write a recursive helper function that "consumes" all nested generator objects, and map this function over all rows in the rdd.

For example, here is a function that converts nested generators to lists:

from inspect import isgenerator, isgeneratorfunction

def consume_all_generators(row):
    # Strings are iterable, but should be returned unchanged
    if isinstance(row, str):
        return row
    # Recurse into dict values, rebuilding a plain dict
    elif isinstance(row, dict):
        return {k: consume_all_generators(v) for k, v in row.items()}

    # Otherwise try to iterate, consuming each element recursively
    output = []
    try:
        for val in row:
            if isgenerator(val) or isgeneratorfunction(val):
                output.append(list(consume_all_generators(val)))
            else:
                output.append(consume_all_generators(val))
        return output
    except TypeError:
        # Not iterable (e.g. int, float): return the value as-is
        return row

Now call map(consume_all_generators) before collect:

local_results = parallel_results.map(consume_all_generators).collect()
print(local_results)
#[['a', 'c', 'b'], ['a', 'c', 'b'], ['a', 'c', 'b'], ['a', 'c', 'b']]
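For this particular repro, where the only unpicklable values are the dict_keys views themselves, a simpler fix (a sketch, not part of the original answer) is to materialize the view into a list inside the map, before anything needs to be serialized:

# list() turns the dict_keys view into a plain, picklable list
local_results = parallel_test_list.map(lambda x: list(x.keys())).collect()

The recursive helper remains useful when rows are arbitrarily nested structures in which generators can appear at any depth.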

