unhashable type: 'list' while joining PySpark RDDs
Posted: 2016-02-17 11:23:03

I get an error when running collect on a PipelinedRDD. I have also checked View RDD contents in Python Spark? I have two input files that I need to process in Spark.
The input files look like this. fileA:
able,991
about,11
burger,15
actor,22
[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/input/join1_FileB.txt
n-01 able,5
Feb-02 about,3
Mar-03 about,8
Apr-04 able,13
Feb-22 actor,3
Feb-23 burger,5
Mar-08 burger,2
Dec-15 able,100
I have also created mappers for fileA and fileB and verified their results.
def split_fileB(line):
    key_val1 = line.split(",")
    dt_word = key_val1[0].split(" ")
    count = key_val1[1]
    date = dt_word[0]
    word = dt_word[1]
    return (word, date + " " + count)
def split_fileA(line):
    key_val = line.split(",")
    word = key_val[0].split(" ")
    count = int(key_val[1])
    return (word, count)
fileA_data = fileA.map(split_fileA)
fileA_data.collect()
## [(u'able', 991), (u'about', 11), (u'burger', 15), (u'actor', 22)]
fileB_data = fileB.map(split_fileB)
fileB_data.collect()
## [(u'able', u'n-01 5'),
## (u'about', u'Feb-02 3'),
## (u'about', u'Mar-03 8'),
## (u'able', u'Apr-04 13'),
## (u'actor', u'Feb-22 3'),
## (u'burger', u'Feb-23 5'),
## (u'burger', u'Mar-08 2'),
## (u'able', u'Dec-15 100')]
fileB_joined_fileA = fileB_data.join(fileA_data)
fileB_joined_fileA.collect()
Traceback:
16/02/17 03:20:01 INFO scheduler.DAGScheduler: Job 14 failed: collect at <ipython-input-45-20609ef53c7a>:1, took 0.318953 s
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-45-20609ef53c7a> in <module>()
----> 1 fileB_joined_fileA.collect()
/usr/lib/spark/python/pyspark/rdd.py in collect(self)
699 """
700 with SCCallSiteSync(self.context) as css:
--> 701 bytesInJava = self._jrdd.collect().iterator()
702 return list(self._collect_iterator_through_file(bytesInJava))
703
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling o319.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 17.0 failed 1 times, most recent failure: Lost task 1.0 in stage 17.0 (TID 18, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/worker.py", line 101, in main
process()
File "/usr/lib/spark/python/pyspark/worker.py", line 96, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1706, in combineLocally
merger.mergeValues(iterator)
File "/usr/lib/spark/python/pyspark/shuffle.py", line 253, in mergeValues
d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'
Answer 1:

To begin with, the output you've provided doesn't match your code. split_fileA first splits the input string on a comma:
key_val = line.split(",")
and then splits the first element of key_val on whitespace:
word = key_val[0].split(" ")
This means that, assuming there are no malformed lines, word is actually a list, not a string:
"able,991".split(",")[0].split(" ")
## ['able']
As a result, the join transformation is meaningless and simply cannot work, because lists are not hashable.
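The failure is easy to reproduce in plain Python (a quick illustration of my own, not part of the original answer): the shuffle behind join stores keys in a dict, which requires hashable keys:

hash(u"able")    ## fine: strings are hashable
hash([u"able"])  ## TypeError: unhashable type: 'list'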
See also A list as a key for PySpark's reduceByKey.
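For completeness, here is a minimal sketch of one possible fix (my own suggestion, not part of the original answer): make split_fileA return the word itself rather than a single-element list, for example by taking the first element of the whitespace split:

def split_fileA(line):
    # "able,991" -> ("able", 991); the key stays a string, not a list
    key_val = line.split(",")
    word = key_val[0].split(" ")[0]
    count = int(key_val[1])
    return (word, count)

With string keys on both sides, fileB_data.join(fileA_data).collect() should then return pairs such as (u'able', (u'n-01 5', 991)).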