使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集
Posted
技术标签:
【中文标题】使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集【英文标题】:Concatenating datasets of different RDDs in Apache spark using scala 【发布时间】:2015-02-08 07:37:30 【问题描述】:有没有办法在 spark 中连接两个不同 RDD
s 的数据集?
要求是 - 我使用具有相同列名的 scala 创建了两个中间 RDD,需要组合两个 RDD 的这些结果并缓存结果以访问 UI。如何在此处合并数据集?
RDD 的类型为 spark.sql.SchemaRDD
【问题讨论】:
你不能只用++
吗?
@lmm 否。它将向 RDD 添加列。我需要向 RDD 添加行。我有两个具有相同列的 RDD,它们的记录需要合并到一个 RDD。
不,不会,我只是试了一下确定。 ++
使用两者的结果创建一个联合 RDD。
【参考方案1】:
我想你在找RDD.union
val rddPart1 = ???
val rddPart2 = ???
val rddAll = rddPart1.union(rddPart2)
示例(在 Spark-shell 上)
val rdd1 = sc.parallelize(Seq((1, "Aug", 30),(1, "Sep", 31),(2, "Aug", 15),(2, "Sep", 10)))
val rdd2 = sc.parallelize(Seq((1, "Oct", 10),(1, "Nov", 12),(2, "Oct", 5),(2, "Nov", 15)))
rdd1.union(rdd2).collect
res0: Array[(Int, String, Int)] = Array((1,Aug,30), (1,Sep,31), (2,Aug,15), (2,Sep,10), (1,Oct,10), (1,Nov,12), (2,Oct,5), (2,Nov,15))
【讨论】:
rddPart1.union(rddPart2) 会将 rddPart2 的列添加到 rddPart1。我需要将 rddPart2 的行添加到 rddPart1。仅供参考,在这种情况下,两个 RDD 具有相同的列名和类型 更像是在已经存在的 RDD 中插入记录。不为 RDD 创建新列 @example 添加了一个示例。联合 RDD 没有新列。 虽然该示例看起来像是发生了连接(输出中 rdd1 后跟 rdd2),但我不相信union
对数据的排序做出任何保证。他们可能会互相混淆。真正的连接并不那么容易,因为它意味着数据中存在顺序依赖关系,这与 spark 的分布式特性作斗争。【参考方案2】:
我遇到了同样的问题。要按行而不是按列组合,请使用 unionAll:
val rddPart1= ???
val rddPart2= ???
val rddAll = rddPart1.unionAll(rddPart2)
我是在阅读数据框的方法摘要后找到的。更多信息请访问:https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html
【讨论】:
不确定这是正确的答案,问题是关于 RDD,而不是如何处理数据帧以上是关于使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集的主要内容,如果未能解决你的问题,请参考以下文章
无法使用 Scala 在 Apache Spark 中执行用户定义的函数
使用 IntelliJ idea 的 Scala 工作表作为 Apache Spark 的 Scala REPL
如何使用scala在Apache spark中用空字符串(“”)替换空值[重复]