使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

Posted

技术标签:

【中文标题】使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集【英文标题】:Concatenating datasets of different RDDs in Apache spark using scala 【发布时间】:2015-02-08 07:37:30 【问题描述】:

有没有办法在 spark 中连接两个不同 RDDs 的数据集?

要求是 - 我使用具有相同列名的 scala 创建了两个中间 RDD,需要组合两个 RDD 的这些结果并缓存结果以访问 UI。如何在此处合并数据集?

RDD 的类型为 spark.sql.SchemaRDD

【问题讨论】:

你不能只用++ 吗? @lmm 否。它将向 RDD 添加列。我需要向 RDD 添加行。我有两个具有相同列的 RDD,它们的记录需要合并到一个 RDD。 不,不会,我只是试了一下确定。 ++ 使用两者的结果创建一个联合 RDD。 【参考方案1】:

我想你在找RDD.union

val rddPart1 = ???
val rddPart2 = ???
val rddAll = rddPart1.union(rddPart2)

示例(在 Spark-shell 上)

val rdd1 = sc.parallelize(Seq((1, "Aug", 30),(1, "Sep", 31),(2, "Aug", 15),(2, "Sep", 10)))
val rdd2 = sc.parallelize(Seq((1, "Oct", 10),(1, "Nov", 12),(2, "Oct", 5),(2, "Nov", 15)))
rdd1.union(rdd2).collect

res0: Array[(Int, String, Int)] = Array((1,Aug,30), (1,Sep,31), (2,Aug,15), (2,Sep,10), (1,Oct,10), (1,Nov,12), (2,Oct,5), (2,Nov,15))

【讨论】:

rddPart1.union(rddPart2) 会将 rddPart2 的列添加到 rddPart1。我需要将 rddPart2 的行添加到 rddPart1。仅供参考,在这种情况下,两个 RDD 具有相同的列名和类型 更像是在已经存在的 RDD 中插入记录。不为 RDD 创建新列 @example 添加了一个示例。联合 RDD 没有新列。 虽然该示例看起来像是发生了连接(输出中 rdd1 后跟 rdd2),但我不相信 union 对数据的排序做出任何保证。他们可能会互相混淆。真正的连接并不那么容易,因为它意味着数据中存在顺序依赖关系,这与 spark 的分布式特性作斗争。【参考方案2】:

我遇到了同样的问题。要按行而不是按列组合,请使用 unionAll:

val rddPart1= ???
val rddPart2= ???
val rddAll = rddPart1.unionAll(rddPart2)

我是在阅读数据框的方法摘要后找到的。更多信息请访问:https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html

【讨论】:

不确定这是正确的答案,问题是关于 RDD,而不是如何处理数据帧

以上是关于使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 Scala 在 Apache Spark 中执行用户定义的函数

使用 Spark 和 Scala 进行字数统计

使用 IntelliJ idea 的 Scala 工作表作为 Apache Spark 的 Scala REPL

如何使用scala在Apache spark中用空字符串(“”)替换空值[重复]

在本地文件系统(不是HDFS)中使用Scala读取Apache Spark中的文件时如何更改输入块大小[重复]

在apache spark scala中排序和排名?