如何通过 RDD Scala 与 join 进行映射

Posted

技术标签:

【中文标题】如何通过 RDD Scala 与 join 进行映射【英文标题】:How to map with join by RDD Scala 【发布时间】:2019-11-19 20:41:33 【问题描述】:

我有一个 (id-(name-value) 对的列表。像这样

val input =  sc.parallelize(Array(Array(1, "a 10"),
                                  Array(1, "b 11"), 
                                  Array(3, "a 12"),
                                  Array(3, "b 13"),
                                  Array(3, "c 14"),
                                  Array(4, "b 15")))


map阶段的key是id,value是(name-value)字符串。

val rdd = input.map(x => (x(0), x(1)))

我的预期结果是:对于每个 id,使用 f() 函数比较基于名称的值。

例如,id == "3",我们在reduce阶段后得到结果:

(key: ab, value: f(12,13))
(key: ac, value: f(12,14))
(key: bc, value: f(13,14))

【问题讨论】:

【参考方案1】:

RDD可以和自己连接获取所有pair,过滤只留下需要的行:

// split string value on two parts
val rdd = input.map(x => (x(0), x(1).toString.split(" ")))
  .map( case (key, parts) => (key, (parts(0), parts(1))) )

// join , filter, and transform to expected
val both = rdd
  .join(rdd)
  .filter( case (_, (v1, v2)) => v1._1 < v2._1 )
  .map( case (key, (v1, v2)) => (s"[$key] key: " + v1._1 + v2._1, s"value: f($v1._2,$v2._2)") )

输出:

([1] key: ab,value: f(10,11))
([3] key: ab,value: f(12,13))
([3] key: ac,value: f(12,14))
([3] key: bc,value: f(13,14))

PS:这里可以使用高级过滤。

【讨论】:

以上是关于如何通过 RDD Scala 与 join 进行映射的主要内容,如果未能解决你的问题,请参考以下文章

Scala RDD 反向键值对

如何在 Scala Spark 中对 RDD 进行排序?

如何使用scala对spark中rdd的每一行进行排序?

如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行

如何将 RDD [GenericRecord] 转换为 scala 中的数据框?

RDD和DataFrame转换(Java+Scala)