不可散列的类型:加入 PySpark RDD 时的“列表”

Posted

技术标签:

【中文标题】不可散列的类型:加入 PySpark RDD 时的“列表”【英文标题】:unhashable type: 'list' while joining PySpark RDDs 【发布时间】:2016-02-17 11:23:03 【问题描述】:

在 pipelineRDD 上运行 collect 时出现错误。我也查了 View RDD contents in Python Spark? 我有两个需要在 spark 中运行的输入文件。

文件格式如下:

able,991
about,11
burger,15
actor,22
[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/input/join1_FileB.txt
n-01 able,5
Feb-02 about,3
Mar-03 about,8
Apr-04 able,13
Feb-22 actor,3
Feb-23 burger,5
Mar-08 burger,2
Dec-15 able,100

我还为fileAfileB 创建了映射器并验证了结果。

def split_fileB(line):
    key_val1 = line.split(",")
    dt_word  = key_val1[0].split(" ")
    count    = key_val1[1]
    date     = dt_word[0]
    word     = dt_word[1]
    return(word,date + " " + count)

def split_fileA(line):
    key_val = line.split(",")
    word = key_val[0].split(" ")
    count = int(key_val[1])
    return(word,count)

fileA_data = fileA.map(split_fileA)
fileA_data.collect()
## [(u'able', 991), (u'about', 11), (u'burger', 15), (u'actor', 22)] 

fileB_data = fileA.map(split_fileB)
fileB_data.collect()
## [(u'able', u'n-01 5'),
## (u'about', u'Feb-02 3'),
## (u'about', u'Mar-03 8'),
## (u'able', u'Apr-04 13'),
## (u'actor', u'Feb-22 3'),
## (u'burger', u'Feb-23 5'),
## (u'burger', u'Mar-08 2'),
## (u'able', u'Dec-15 100')]

fileB_joined_fileA = fileB_data.join(fileA_data)
fileB_joined_fileA.collect()


fileB_joined_fileA.collect()

追溯:

    16/02/17 03:20:01 INFO scheduler.DAGScheduler: Job 14 failed: collect at <ipython-input-45-20609ef53c7a>:1, took 0.318953 s
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-45-20609ef53c7a> in <module>()
----> 1 fileB_joined_fileA.collect()

/usr/lib/spark/python/pyspark/rdd.py in collect(self)
    699         """
    700         with SCCallSiteSync(self.context) as css:
--> 701             bytesInJava = self._jrdd.collect().iterator()
    702         return list(self._collect_iterator_through_file(bytesInJava))
    703 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling 012.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o319.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 17.0 failed 1 times, most recent failure: Lost task 1.0 in stage 17.0 (TID 18, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 101, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1706, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/lib/spark/python/pyspark/shuffle.py", line 253, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'

【问题讨论】:

【参考方案1】:

首先,您提供的输出与您的代码不匹配。 split_fileA首先用逗号分割输入字符串a:

key_val = line.split(",")

然后key_val 的第一个元素在空白处:

word = key_val[0].split(" ")

这意味着,假设没有格式错误的行,word 实际上是 list,而不是 string

"able,991".split(",")[0].split(" ")
## ['able']

因此,join 转换毫无意义并且无法工作,因为列表不可散列。

另见A list as a key for PySpark's reduceByKey

【讨论】:

以上是关于不可散列的类型:加入 PySpark RDD 时的“列表”的主要内容,如果未能解决你的问题,请参考以下文章

Python - 重复数据删除问题:TypeError:不可散列的类型:'numpy.ndarray'

nltk 的朴素基分类器给出不可散列的类型错误

多标签计算类权重 - 不可散列的类型

如何在其上制作一个简单的 Django URLconf 和 reverse() 进行测试? (获取TypeError:不可散列的类型:'list')

随机数生成器,不可散列的类型“列表”

TypeError:使用一组 UDT 创建 Cassandra Python 驱动程序模型时不可散列的类型 UserType