基于 Pyspark 中的键加入 RDD
Posted
技术标签:
【中文标题】基于 Pyspark 中的键加入 RDD【英文标题】:Join RDDs based on keys in Pyspark 【发布时间】:2018-10-24 05:39:02 【问题描述】:我有一个如下所示的文本文件:
OrderId|OrderItem|OrderDate|OrderPrice|ItemQuantity 1|气体|2018-01-17|1895|1 1|空调|2018-01-28|19000|3 1|燃气|2018-01-17|2300|1 1|电视|2018-01-11|45000|2 2|气|2018-01-17|1895|1 2|空调|2017-01-28|19000|3 2|气体|2016-01-17|2300|1 1|瓶&&|2018-03-24|45|10 1|食用油|2018-04-22|100|3 3|逆变器|2015-11-02|29000|1 3|气体|2014-01-09|2300|1 3|电视|2018-01-17|45000|2 1|气体|2011-01-27|1895|1 1|空调|2018-01-28|19000|3 4|燃气|2018-01-17|2300|1 4|电视$$|2018-01-17|45000|2 5|医药|2016-03-14|23.50|8 5|止咳糖浆|2016-01-28|190|1 5|冰淇淋|2014-09-23|300|7 5|意大利面|2015-06-30|65|2
textdata = sc.textFile("/user/OrderInputFile")
header=textdata.first();
textnewdata = textdata.filter(lambda x:x != header)
splittextdataRDD= textnewdata.map(lambda x: x.split('|'))
filtersplittextdataRDD1 = splittextdataRDD.filter(lambda x : x[0]=='1' or x[0]=='4')
filtersplittextdataRDD2 = splittextdataRDD.filter(lambda x : x[0]=='2' or x[0]=='4')
#creating pair RDDS using key on first position:
pairfiltersplittextdataRDD1 = filtersplittextdataRDD1.map(lambda x :(x[0],x[1:]))
pairfiltersplittextdataRDD2 = filtersplittextdataRDD2.map(lambda x :(x[0],x[1:]))
I am facing issues while joining RDDs on keys
pairjoinRDD = filtersplittextdataRDD1.join(filtersplittextdataRDD2).map(lambda(x[0],(x1,x2)):x1+x2)
【问题讨论】:
【参考方案1】:我能够加入我的 RDD。以下是解决方案:
pairjoinRDD = filtersplittextdataRDD1.map(lambda x:(x[0],x)).join(filtersplittextdataRDD2.map(lambda x:(x[0],x )))
这是输出:
>>> newRdd.take(20);
[(u'4', ([u'4', u'Gas', u'2018-01-17', u'2300', u'1'], [u'4', u'Gas', u'2018-01-17', u'2300', u'1'])), (u'4', ([u'4', u'Gas', u'2018-01-17', u'2300', u'1'], [u'4', u'Television', u'2018-01-17', u'45000', u'2'])), (u'4', ([u'4', u'Television', u'2018-01-17', u'45000', u'2'], [u'4', u'Gas', u'2018-01-17', u'2300', u'1'])), (u'4', ([u'4', u'Television', u'2018-01-17', u'45000', u'2'], [u'4', u'Television', u'2018-01-17', u'45000', u'2']))]
【讨论】:
以上是关于基于 Pyspark 中的键加入 RDD的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark 中的广播加入得到 OnOutOfMemoryError