How to join two RDDs by a key? [duplicate]
【Posted】: 2015-11-22 15:20:33
【Question】:
animals_population_file = sc.textFile("input/myFile1.txt")
animals_place_file = sc.textFile("input/myFile2.txt")
animals_population_file:
Dogs, 5
Cats, 6
animals_place_file:
Dogs, Italy
Cats, Italy
Dogs, Spain
Now I want to join animals_population_file and animals_place_file, using the animal type as the key.
The result should be:
Dogs, [Italy, Spain, 5]
Cats, [Italy, 6]
I tried joined = animals_population_file.join(animals_place_file), but I don't know how to define the key. Also, when I run joined.collect(), it gives me an error:
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling o247.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 29, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/worker.py", line 101, in main
process()
File "/usr/lib/spark/python/pyspark/worker.py", line 96, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/serializers.py", line 236, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/lib/spark/python/pyspark/rdd.py", line 1807, in <lambda>
map_values_fn = lambda (k, v): (k, f(v))
ValueError: too many values to unpack
【Comments】:
@zero323: I tried the solution from the linked thread. My problem is that I don't know how to specify the key in this code: df1.join(df2, df1.k == df2.k, joinType='inner'). Also, if I just run df1.join(df2), collect() fails. The linked thread does not explain this.
Is what you showed above the exact content of the files?
@zero323: The exact content: RDD1 = [u'Surreal_Games', u'269', u'Hourly_Games', u'428', u'Hot_Talking', u'747', u'Almost_Sports ', u'350'] and the other one, RDD2 = [ u'Loud_Games', u'BAT', u'Cold_Talking', u'DEF', u'Surreal_Sports', u'XYZ', u'Hourly_Sports', u'CAB ', u'Hot_Talking', u'MAN', u'Almost_Cooking', u'BAT']
No inner tuples or lists? Like [(u'Surreal_Games', u'269'), ...].
@zero323: Yes, what I posted is an exact copy-paste of RDD1.collect() and RDD2.collect().
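The flat strings shown in these comments would explain the ValueError in the question: join expects every element to be a (key, value) pair, and the `lambda (k, v)` in the traceback tries to unpack each element into two names. Below is a minimal plain-Python sketch of that failure, with no Spark involved. The helper name `try_unpack` is made up for illustration and is not part of PySpark.

```python
def try_unpack(element):
    """Mimic the two-name unpacking that a pair-RDD operation applies to each element."""
    try:
        k, v = element
        return "ok: key=%r, value=%r" % (k, v)
    except ValueError as err:
        return "ValueError: %s" % err


# A raw line from textFile is one string, so unpacking iterates over its characters.
print(try_unpack("Dogs, 5"))
# A (key, value) tuple unpacks cleanly.
print(try_unpack(("Dogs", "5")))
```

Any string that is not exactly two characters long fails this unpacking, which is why the elements have to be turned into tuples before calling join.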
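【Answer 1】:
textFile does not give you a pair RDD (judging by the RDD contents in the comments); it returns an RDD of plain strings. join, however, is only defined on pair RDDs, that is, RDDs of (key, value) tuples. So turn your inputs into pair RDDs first: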
val rdd1 = sc.textFile("input/myFile1.txt")
val rdd2 = sc.textFile("input/myFile2.txt")
val data1 = rdd1.map(line => line.split(",").map(elem => elem.trim))
val data2 = rdd2.map(line => line.split(",").map(elem => elem.trim))
val pairRdd1 = data1.map(r => (r(0), r)) /** zero index is the animal type which is the key in file 1*/
val pairRdd2 = data2.map(r => (r(0), r)) /** zero index is the animal type which is the key in file 2 as well */
val joined = pairRdd1.join(pairRdd2)
val local = joined.collect()
local.foreach { case (k, v) =>
  print(k + " : ")
  println(v._1.mkString("|") + "|" + v._2.mkString("|"))
}
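Since the question uses PySpark, the same idea can be sketched in Python. This is only a sketch under a few assumptions: every line has the form `key, value`, `sc` is an already running SparkContext, and the helper names (`parse_line`, `append_population`, `join_animals`) are invented here. It groups the places per animal before joining, which is one way to get the list shape asked for in the question.

```python
def parse_line(line):
    """Split a line such as 'Dogs, 5' into the pair ('Dogs', '5')."""
    key, value = [part.strip() for part in line.split(",", 1)]
    return key, value


def append_population(places_and_population):
    """Turn (['Italy', 'Spain'], '5') into ['Italy', 'Spain', '5']."""
    places, population = places_and_population
    return list(places) + [population]


def join_animals(sc, population_path="input/myFile1.txt",
                 place_path="input/myFile2.txt"):
    """Join both files on the animal type; sc is an existing SparkContext."""
    population = sc.textFile(population_path).map(parse_line)  # (animal, count)
    places = sc.textFile(place_path).map(parse_line)           # (animal, country)
    grouped_places = places.groupByKey()                       # (animal, countries)
    # join yields (animal, (countries, count)); flatten that into one list.
    return grouped_places.join(population).mapValues(append_population)


# Usage, given a live SparkContext named sc:
# for animal, values in join_animals(sc).collect():
#     print(animal, values)
```

The values stay strings (for example '5' rather than 5), and the order of the countries inside each list is not guaranteed. A line without a comma would make `parse_line` raise, so real input may need filtering first.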
【Comments】: