如何在 Spark Scala 高效的全外连接中合并连接多个 DataFrame
Posted
技术标签:
【中文标题】如何在 Spark Scala 高效的全外连接中合并连接多个 DataFrame【英文标题】:How to Merge Join Multiple DataFrames in Spark Scala Efficient Full Outer Join 【发布时间】:2016-04-06 19:12:51 【问题描述】:如何有效地合并/加入多个 Spark DataFrame (Scala)?我想加入一个所有表共有的列,下面的“日期”,并得到一个稀疏数组作为结果。
Data Set A:
Date Col A1 Col A2
-----------------------
1/1/16 A11 A21
1/2/16 A12 A22
1/3/16 A13 A23
1/4/16 A14 A24
1/5/16 A15 A25
Data Set B:
Date Col B1 Col B2
-----------------------
1/1/16 B11 B21
1/3/16 B13 B23
1/5/16 B15 B25
Data Set C:
Date Col C1 Col C2
-----------------------
1/2/16 C12 C22
1/3/16 C13 C23
1/4/16 C14 C24
1/5/16 C15 C25
Expected Result Set:
Date Col A1 Col A2 Col B1 Col B2 Col C1 Col C2
---------------------------------------------------------
1/1/16 A11 A21 B11 B12
1/2/16 A12 A22 C12 C22
1/3/16 A13 A23 B13 B23 C13 C23
1/4/16 A14 A24 C14 C24
1/5/16 A15 A25 B15 B25 C15 C25
这感觉就像是对多个表的完全外部联接,但我不确定。 是否有一些更简单/更有效的方法可以在 DataFrames 上没有 Join 方法的情况下访问这个稀疏数组?
【问题讨论】:
【参考方案1】:这是一篇旧帖子,所以我不确定 OP 是否仍在调整中。无论如何,实现所需结果的简单方法是通过 cogroup()
。将每个RDD
变成[K,V] RDD
,日期为key,然后使用cogroup。这是一个例子:
def mergeFrames(sc: SparkContext, sqlContext: SQLContext) =
import sqlContext.implicits._
// Create three dataframes. All string types assumed.
val dfa = sc.parallelize(Seq(A("1/1/16", "A11", "A21"),
A("1/2/16", "A12", "A22"),
A("1/3/16", "A13", "A23"),
A("1/4/16", "A14", "A24"),
A("1/5/16", "A15", "A25"))).toDF()
val dfb = sc.parallelize(Seq(
B("1/1/16", "B11", "B21"),
B("1/3/16", "B13", "B23"),
B("1/5/16", "B15", "B25"))).toDF()
val dfc = sc.parallelize(Seq(
C("1/2/16", "C12", "C22"),
C("1/3/16", "C13", "C23"),
C("1/4/16", "C14", "C24"),
C("1/5/16", "C15", "C25"))).toDF()
val rdda = dfa.rdd.map(row => row(0) -> row.toSeq.drop(1))
val rddb = dfb.rdd.map(row => row(0) -> row.toSeq.drop(1))
val rddc = dfc.rdd.map(row => row(0) -> row.toSeq.drop(1))
val schema = StructType("date a1 a2 b1 b2 c1 c2".split(" ").map(fieldName => StructField(fieldName, StringType)))
// Form cogroups. `date` is assumed to be a key so there's at most one row for each date in an rdd/df
val cg: RDD[Row] = rdda.cogroup(rddb, rddc).map case (k, (v1, v2, v3)) =>
val cols = Seq(k) ++
(if (v1.nonEmpty) v1.head else Seq(null, null)) ++
(if (v2.nonEmpty) v2.head else Seq(null, null)) ++
(if (v3.nonEmpty) v3.head else Seq(null, null))
Row.fromSeq(cols)
// Turn RDD back to DataFrame
val cgdf = sqlContext.createDataFrame(cg, schema).sort("date")
cgdf.show
【讨论】:
我已经编辑了我的答案并添加了一些示例代码。希望对您有所帮助。以上是关于如何在 Spark Scala 高效的全外连接中合并连接多个 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
查询 - 数据集中的全外连接2个不同的表 - LINQ C#
如何在 Windows 中使用 Scala 将 Cassandra 与 Spark 连接起来
如何进行外连接:Spark Scala SQLContext
如何在scala spark中将数据框的特定列与另一个列连接[重复]