如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行

Posted

技术标签:

【中文标题】如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行【英文标题】:How to merge/join Spark/Scala RDD to List so each value in RDD gets a new row with each List item 【发布时间】:2021-08-31 16:11:51 【问题描述】:

假设我有一个 List[String],我想将它与一个 RDD 对象合并,以便 RDD 中的每个对象都将列表中的每个值添加到它:

List[String] myBands = ["Band1","Band2"]; 

表:乐队成员 |名称 |仪器 | | ----- | ---------- | |斜线 |吉他 | |车轴 |人声|

case class BandMembers ( name:String, instrument:String );
var myRDD = BandMembersTable.map(a => new BandMembers(a.name, a.instrument));  
//join the myRDD to myBands
// how do I do this?
//var result = myRdd.join/merge/union(myBands); 

期望的结果: |名称 |仪器 |乐队 | | ----- | ---------- |------| |斜线 |吉他 |乐队1| |斜线 |吉他 |带2| |车轴 |人声 |乐队1| |车轴 |人声 |乐队2|

我不太确定如何以最适合 Spark/Scala 的方式解决此问题。我知道我可以转换为 DF,然后使用 spark sql 进行连接,但必须有更好的方法来使用 RDD 和 List,或者我认为。

【问题讨论】:

【参考方案1】:

这里的风格有点偏离,但假设你真的需要 RDD 而不是 Dataset

所以对于 RDD:

case class BandMembers ( name:String, instrument:String )
val myRDD = spark.sparkContext.parallelize(BandMembersTable.map(a => new BandMembers(a.name, a.instrument)))
val myBands = spark.sparkContext.parallelize(Seq("Band1","Band2"))
val res = myRDD.cartesian(myBands).map  case (a,b) => Row(a.name, a.instrument, b) 

使用数据集:

case class BandMembers ( name:String, instrument:String )
val myRDD = BandMembersTable.map(a => new BandMembers(a.name, a.instrument)).toDS
val myBands = Seq("Band1","Band2").toDS
val res = myRDD.crossJoin(myBands)

输入数据:

val BandMembersTable = Seq(BandMembers("a", "b"), BandMembers("c", "d"))
val myBands = Seq("Band1","Band2")

带有数据集的输出:

+----+----------+-----+
|name|instrument|value|
+----+----------+-----+
|a   |b         |Band1|
|a   |b         |Band2|
|c   |d         |Band1|
|c   |d         |Band2|
+----+----------+-----+

带有 RDD 的 Println(这些是行)

[a,b,Band1]
[c,d,Band2]
[c,d,Band1]
[a,b,Band2]

【讨论】:

【参考方案2】:

考虑为此使用 RDD zip。来自官方文档

RDD> zip(RDD other, scala.reflect.ClassTag evidence$11) 用另一个 RDD 压缩这个 RDD,返回键值对,每个 RDD 中的第一个元素,每个 RDD 中的第二个元素,

【讨论】:

以上是关于如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark Scala 中将 RDD[(String, String, String)] 转换为 RDD[(String, (String, String))]

如何在 Spark Scala 高效的全外连接中合并连接多个 DataFrame

如何在 spark scala 中加入 2 rdd

Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

如何对 spark scala RDD 中的元组列表/数组执行转换?

Spark/Scala:仅使用 RDD 使用 ReduceByKey 创建嵌套结构