如何在 spark scala 中加入 2 rdd

Posted

技术标签:

【中文标题】如何在 spark scala 中加入 2 rdd【英文标题】:how to join 2 rdd's in spark scala 【发布时间】:2021-09-08 17:23:03 【问题描述】:

我有 2 个如下的 RDD

val rdd1 = spark.sparkContext.parallelize(Seq((123, List(("000000011119",20),("000000011120",30),("000000011121",50))),(234, List(("000000011119",20),("000000011120",30),("000000011121",50)))))
val rdd2 = spark.sparkContext.parallelize(Seq((123, List("000000011119","000000011120")),(234, List("000000011121","000000011120"))))

我想根据 rdd2 中的密钥对来执行 rdd1 中的值相加。

需要输出:

RDD[(123,50),(234,80)]

任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

实际上,这是对行的第一个元素和每个内容的第一个元素的连接。

所以我会把它分解成多行并以这种方式加入

val flat1 = rdd1.flatMap(r => r._2.map(e => ((r._1, e._1), e._2))) // looks like ((234,000000011119),20)
val flat2 = rdd2.flatMap(r => r._2.map(e => ((r._1, e), true))) // looks like ((234,000000011121),true)

val res =  flat1.join(flat2)
  .map(r => (r._1._1, r._2._1))  // looks like (123, 30)
  .reduceByKey(_ + _)  // total each key group

.foreach(println) 的结果

scala> :pas
// Entering paste mode (ctrl-D to finish)

flat1.join(flat2)
  .map(r => (r._1._1, r._2._1))  // looks like (123, 30)
  .reduceByKey(_ + _)  // total each key group
  .foreach(println)

// Exiting paste mode, now interpreting.

(123,50)
(234,80)

像往常一样,使用 Dataset 时这些东西要简单得多,所以这将是我对未来的建议。

【讨论】:

以上是关于如何在 spark scala 中加入 2 rdd的主要内容,如果未能解决你的问题,请参考以下文章

hadoop2.0中加入全新的集群资源管理器,下面哪个不是yarn中的组件

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD

在 Spark 中加入数据集

Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

如何在 Scala Spark 中对 RDD 进行排序?

如何知道 Spark 使用 Scala 推断出的 RDD 类型是啥