如何通过 RDD Scala 与 join 进行映射
Posted
技术标签:
【中文标题】如何通过 RDD Scala 与 join 进行映射【英文标题】:How to map with join by RDD Scala 【发布时间】:2019-11-19 20:41:33 【问题描述】:我有一个 (id-(name-value) 对的列表。像这样
val input = sc.parallelize(Array(Array(1, "a 10"),
Array(1, "b 11"),
Array(3, "a 12"),
Array(3, "b 13"),
Array(3, "c 14"),
Array(4, "b 15")))
map阶段的key是id,value是(name-value)字符串。
val rdd = input.map(x => (x(0), x(1)))
我的预期结果是:对于每个 id,使用 f() 函数比较基于名称的值。
例如,id == "3",我们在reduce阶段后得到结果:
(key: ab, value: f(12,13))
(key: ac, value: f(12,14))
(key: bc, value: f(13,14))
【问题讨论】:
【参考方案1】:RDD可以和自己连接获取所有pair,过滤只留下需要的行:
// split string value on two parts
val rdd = input.map(x => (x(0), x(1).toString.split(" ")))
.map( case (key, parts) => (key, (parts(0), parts(1))) )
// join , filter, and transform to expected
val both = rdd
.join(rdd)
.filter( case (_, (v1, v2)) => v1._1 < v2._1 )
.map( case (key, (v1, v2)) => (s"[$key] key: " + v1._1 + v2._1, s"value: f($v1._2,$v2._2)") )
输出:
([1] key: ab,value: f(10,11))
([3] key: ab,value: f(12,13))
([3] key: ac,value: f(12,14))
([3] key: bc,value: f(13,14))
PS:这里可以使用高级过滤。
【讨论】:
以上是关于如何通过 RDD Scala 与 join 进行映射的主要内容,如果未能解决你的问题,请参考以下文章
如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行