如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行
Posted
技术标签:
【中文标题】如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行【英文标题】:How to merge/join Spark/Scala RDD to List so each value in RDD gets a new row with each List item 【发布时间】:2021-08-31 16:11:51 【问题描述】:假设我有一个 List[String],我想将它与一个 RDD 对象合并,以便 RDD 中的每个对象都将列表中的每个值添加到它:
List[String] myBands = ["Band1","Band2"];
表:乐队成员 |名称 |仪器 | | ----- | ---------- | |斜线 |吉他 | |车轴 |人声|
case class BandMembers ( name:String, instrument:String );
var myRDD = BandMembersTable.map(a => new BandMembers(a.name, a.instrument));
//join the myRDD to myBands
// how do I do this?
//var result = myRdd.join/merge/union(myBands);
期望的结果: |名称 |仪器 |乐队 | | ----- | ---------- |------| |斜线 |吉他 |乐队1| |斜线 |吉他 |带2| |车轴 |人声 |乐队1| |车轴 |人声 |乐队2|
我不太确定如何以最适合 Spark/Scala 的方式解决此问题。我知道我可以转换为 DF,然后使用 spark sql 进行连接,但必须有更好的方法来使用 RDD 和 List,或者我认为。
【问题讨论】:
【参考方案1】:这里的风格有点偏离,但假设你真的需要 RDD 而不是 Dataset
所以对于 RDD:
case class BandMembers ( name:String, instrument:String )
val myRDD = spark.sparkContext.parallelize(BandMembersTable.map(a => new BandMembers(a.name, a.instrument)))
val myBands = spark.sparkContext.parallelize(Seq("Band1","Band2"))
val res = myRDD.cartesian(myBands).map case (a,b) => Row(a.name, a.instrument, b)
使用数据集:
case class BandMembers ( name:String, instrument:String )
val myRDD = BandMembersTable.map(a => new BandMembers(a.name, a.instrument)).toDS
val myBands = Seq("Band1","Band2").toDS
val res = myRDD.crossJoin(myBands)
输入数据:
val BandMembersTable = Seq(BandMembers("a", "b"), BandMembers("c", "d"))
val myBands = Seq("Band1","Band2")
带有数据集的输出:
+----+----------+-----+
|name|instrument|value|
+----+----------+-----+
|a |b |Band1|
|a |b |Band2|
|c |d |Band1|
|c |d |Band2|
+----+----------+-----+
带有 RDD 的 Println(这些是行)
[a,b,Band1]
[c,d,Band2]
[c,d,Band1]
[a,b,Band2]
【讨论】:
【参考方案2】:考虑为此使用 RDD zip。来自官方文档
RDD
【讨论】:
以上是关于如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行的主要内容,如果未能解决你的问题,请参考以下文章
在 Spark Scala 中将 RDD[(String, String, String)] 转换为 RDD[(String, (String, String))]
如何在 Spark Scala 高效的全外连接中合并连接多个 DataFrame
Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行