Scala - 基于 Spark 中的键合并两个 RDD

Posted

技术标签:

【中文标题】Scala - 基于 Spark 中的键合并两个 RDD【英文标题】:Scala - Merge Two RDDs Based on Key in Spark 【发布时间】:2018-09-11 12:27:32 【问题描述】:

我正在使用 Scala 编写 Spark 应用程序。我有以下两个 RDD:

(a, 1, some_values1)
(b, 1, some_values2)
(c, 1, some_values3)

(a, 2, some_values1)
(b, 2, some_values2)
(a, 3, some_values1)
(b, 3, some_values2)

我正在尝试得到这个输出:

(a, 1, 2, computed_values1)
(b, 1, 2, computed_values2)
(c, 1, 2, None)
(a, 1, 3, computed_values1)
(b, 1, 3, computed_values2)
(c, 1, 3, None)

因此,这里的字母用于将第一个 RDD 中的每条记录与第二个 RDD 匹配。我尝试使用join 方法,但没有记录c。我怎样才能做到这一点?

更新

另一个例子:

(a, 1, some_values1)
(b, 1, some_values2)
(c, 1, some_values3)

(a, 2, some_values1)
(b, 2, some_values2)
(a, 3, some_values1)
(b, 3, some_values2)
(c, 3, some_values2)

我正在尝试得到这个输出:

(a, 1, 2, computed_values1)
(b, 1, 2, computed_values2)
(c, 1, 2, None)
(a, 1, 3, computed_values1)
(b, 1, 3, computed_values2)
(c, 1, 3, computed_values3)

【问题讨论】:

Join "outer" 可用于保留两个数据帧中的行,这里有一些信息:***.com/questions/45990633/… @pasha701 是的,我知道,实际上我尝试使用外连接来实现,但没有成功。 【参考方案1】:

如果我正确理解您的要求,这是一种方法:

    创建一个 RDD,比如 rdd2c2,它具有来自 rdd2 的第二列的不同值 对rdd1rdd2c2 执行cartesian join 并将结果转换为RDD[K,V] 以使the 第一列和rdd2c2 列作为其keyrdd2 转换为RDD[K,V] 以使其第一列和第二列作为key 对两个 RDD[K,V] 执行 leftOuterJoin 并将元素转换为所需的结构

示例代码:

val rdd1 = sc.parallelize(Seq(
  ("a", 1, "some_values1"),
  ("b", 1, "some_values2"),
  ("c", 1, "some_values3")
))

val rdd2 = sc.parallelize(Seq(
  ("a", 2, "some_values1"),
  ("b", 2, "some_values2"),
  ("a", 3, "some_values1"),
  ("b", 3, "some_values2"),
  ("c", 3, "some_values2")
))

val rdd2c2 = rdd2.map(_._2).distinct
// rdd2c2.collect: Array[Int] = Array(2, 3)

val rddKV1 = rdd1.cartesian(rdd2c2).
  map case (a, b) => ((a._1, b), (a._2, a._3))
// rddKV1.collect: Array[((String, Int), (Int, String))] = Array(
//   ((a,2),(1,some_values1)),
//   ((a,3),(1,some_values1)),
//   ((b,2),(1,some_values2)),
//   ((b,3),(1,some_values2)),
//   ((c,2),(1,some_values3)),
//   ((c,3),(1,some_values3)))

val rddKV2 = rdd2.map(r => ((r._1, r._2), r._3))
// rddKV2.collect: Array[((String, Int), String)] = Array(
//   ((a,2),some_values1),
//   ((b,2),some_values2),
//   ((a,3),some_values1),
//   ((b,3),some_values2),
//   ((c,3),some_values2))

val rddJoined = rddKV1.leftOuterJoin(rddKV2).
  map case (k, v) => (k._1, v._1._1, k._2, v._2) 
// rddJoined.collect: Array[(String, Int, Int, Option[String])] = Array(
//   (a,1,3,Some(some_values1)),
//   (a,1,2,Some(some_values1)),
//   (c,1,2,None),
//   (b,1,2,Some(some_values2)),
//   (b,1,3,Some(some_values2)),
//   (c,1,3,Some(some_values2)))

【讨论】:

感谢您的回答。这实际上将产生给定示例的预期结果。但这里的问题是,如果 rdd2 让我们说“c”和“3”,则生成的 rdd 将不包括:(c, 1, 2, None)。我将更新我的问题以添加此示例。 @m2008m1033m,请根据我对您明确要求的理解查看修改后的解决方案。 成功了,谢谢!关于最后的注释,如果 distinct 列表非常大(因为它实际上是),我们可以使用 sc.broadcast 吗? 事实上,使用适当制作的cartesian join 将消除对collect 的需要。解决方案进一步修订。【参考方案2】:

如果“c”必须只出现一次(猜测,在所需的输出中打印错误),可以使用这样的代码来实现:

val data1 = List(
  ("a", 1, "some_values1"),
  ("b", 1, "some_values2"),
  ("c", 1, "some_values3")
)

val data2 = List(
  ("a", 2, "some_values1"),
  ("b", 2, "some_values2"),
  ("a", 3, "some_values1"),
  ("b", 3, "some_values2")
)

val rdd1 = sparkContext.parallelize(data1)
val rdd2 = sparkContext.parallelize(data2)

val rdd1WithKey = rdd1.map(v => (v._1, (v._2, v._3)))
val rdd2WithKey = rdd2.map(v => (v._1, (v._2, v._3)))

val joined = rdd1WithKey.fullOuterJoin(rdd2WithKey)
joined.foreach(println)

输出:

(b,(Some((1,some_values2)),Some((2,some_values2))))
(a,(Some((1,some_values1)),Some((2,some_values1))))
(b,(Some((1,some_values2)),Some((3,some_values2))))
(a,(Some((1,some_values1)),Some((3,some_values1))))
(c,(Some((1,some_values3)),None))

【讨论】:

这就是问题所在。我正在尝试为第二个 rdd 中的每个数字获取“c”。

以上是关于Scala - 基于 Spark 中的键合并两个 RDD的主要内容,如果未能解决你的问题,请参考以下文章

将新行与spark scala中数据框中的前一行数据合并

如何根据javascript中的键合并和替换两个数组中的对象?

基于没有重复的键合并两个字典

如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行

Scala中的Spark分组映射UDF

Spark Scala 统计 Map Key 中字符串数组的出现次数