spark:根据另一个 rdd 的序列加入 rdd

Posted

技术标签:

【中文标题】spark:根据另一个 rdd 的序列加入 rdd【英文标题】:spark: join rdd based on sequence of another rdd 【发布时间】:2017-05-11 10:04:44 【问题描述】:

我有一个 rdd 说 sample_rdd 类型为 RDD[(String, String, Int))] 有 3 列 id、item、count。样本数据:

id1|item1|1 id1|item2|3 id1|item3|4 id2|item1|3 id2|item4|2

我想将每个 id 加入到 lookup_rdd 这个:

item1|0 item2|0 item3|0 item4|0 item5|0

输出应该为我提供以下 id1、带有查找表的外连接:

item1|1 item2|3 item3|4 item4|0 item5|0

同样对于 id2 我应该得到:

item1|3 item2|0 item3|0 item4|2 item5|0

每个 id 的最终输出应该具有 id 的所有计数:

id1,1,3,4,0,0 id2,3,0,0,2,0

重要提示:此输出应始终按照查找中的顺序进行排序

这是我尝试过的:

val line = rdd_sample.map case (id, item, count) => (id, (item,count)) .map(row=>(row._1,row._2)).groupByKey() get(line).map(l=>(l._1,l._2)).mapValues(item_count=>lookup_r‌​dd.leftOuterJoin(ite‌​m_count))

def get (line: RDD[(String, Iterable[(String, Int)])]) = for (id, item_cnt) <- line i = item_cnt.map(tuple => (tuple._1,tuple._2)) yield (id,i)

【问题讨论】:

val line = rdd_sample.map case (id, item, count) => (id, (item,count)) .map(row=>(row._1,row._2)).groupByKey() get(line).map(l=>(l._1,l._2)).mapValues(item_count=>lookup_rdd.leftOuterJoin(item_count)) 函数:def get (line: RDD[(String, Iterable[(String, Int)])]) = for (id, item_cnt) <- line i = item_cnt.map(tuple => (tuple._1,tuple._2)) yield (id,i) 您可以将其编辑到问题中。 @NanditaDwivedi 你试过解决方案了吗? 【参考方案1】:

试试下面。在本地控制台上运行每个步骤以了解详细情况。

思路是zipwithindex,根据lookup_rdd形成seq。 (i1,0),(i2,1)..(i5,4)(id1,0),(id2,1)

Index of final result wanted = [delta(length of lookup_rdd seq) * index of id1..id2 ] + index of i1...i5

所以生成的基本序列将是(0,(i1,id1)),(1,(i2,id1))...(8,(i4,id2)),(9,(i5,id2))

然后根据key(i1,id1)减少并计算count。

val res2 = sc.parallelize(arr) //sample_rdd
val res3 = sc.parallelize(cart) //lookup_rdd
val delta = res3.count

val res83 = res3.map(_._1).zipWithIndex.cartesian(res2.map(_._1).distinct.zipWithIndex).map(x => (((x._1._1,x._2._1),((delta * x._2._2) + x._1._2, 0)))

val res86 = res2.map(x => ((x._2,x._1),x._3)).reduceByKey(_+_)

val res88 = res83.leftOuterJoin(res86)

val res91 = res88.map( x => 
    x._2._2 match 
       case Some(x1) => (x._2._1._1, (x._1,x._2._1._2+x1))
       case None => (x._2._1._1, (x._1,x._2._1._2))
    
)

val res97 = res91.sortByKey(true).map( x => 
(x._2._1._2,List(x._2._2))).reduceByKey(_++_)

res97.collect

// SOLUTION: Array((id1,List(1,3,4,0,0)),(id2,List(3,0,0,2,0)))

【讨论】:

感谢您的解决方案,它有效!但是如果 lookup_rdd 有 6k 个 id 并且 sample_rdd 文件可以达到 10GB,这种方法是否好? 你可以试一试并发布表演吗?我没有那么多数据。

以上是关于spark:根据另一个 rdd 的序列加入 rdd的主要内容,如果未能解决你的问题,请参考以下文章

从另一个 rdd 中搜索 rdd 的值

浅谈Spark算子

来自 RDD 映射的 Spark Scala 序列化错误

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD

如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行

在另一个 RDD 的基础上修剪一个 RDD