spark RDD 上的有序联合

Posted

技术标签:

【中文标题】spark RDD 上的有序联合【英文标题】:Ordered union on spark RDDs 【发布时间】:2015-05-21 12:51:20 【问题描述】:

我正在尝试使用 apache spark 对键记录对的键进行排序。键长 10 字节,值长约 90 字节。换句话说,我正在尝试复制用于break the sorting record 的排序基准 Databricks。我从文档中注意到的一件事是,它们对键行号对进行排序,而不是键记录对,可能​​对缓存/tlb 友好。我试图复制这种方法,但没有找到合适的解决方案。这是我尝试过的:

var keyValueRDD_1 = input.map(x => (x.substring(0, 10), x.substring(12, 13)))
var keyValueRDD_2 = input.map(x => (x.substring(0, 10), x.substring(14, 98))
var result = keyValueRDD_1.sortByKey(true, 1) // assume partitions = 1
var unionResult = result.union(keyValueRDD_2) 
var finalResult = unionResult.foldByKey("")(_+_)

当我对结果 RDD 和 keyValueRDD_2 RDD 进行联合并打印 unionResultRDD 的输出时,结果和 keyValueRDD_2 不会交错。换句话说,unionResult RDD 看起来有 keyValueRDD_2 内容,然后是结果 RDD 内容。但是,当我执行将相同键的值组合成单个键值对的 foldByKey 操作时,排序顺序被破坏。我需要通过按键操作进行折叠,以便将结果保存为原始键记录对。是否有可用于实现此目的的替代 rdd 函数?

任何提示或建议都会非常有用。 谢谢

【问题讨论】:

你试过RDD.zip和/或RDD.zipWithIndex吗? 我对 RDD zip 的理解是它会明智地压缩两个 RDD 元素的元素。因此,虽然订购了一个 RDD,但未订购另一个 RDD,这将导致@daniel 指出的相同问题。谢谢。 【参考方案1】:

union 方法只是将两个 RDD 一个接一个地放置,除非它们具有相同的分区器。然后它加入分区。

你想做的事是不可能的。

当您有一个已排序的 RDD (keyValueRDD_1) 和另一个具有相同键 (keyValueRDD_2) 的未排序 RDD 时,让第二个 RDD 排序的唯一方法是对其进行排序。

排序后的RDD的存在并不能帮助我们对第二个RDD进行排序。

Databricks article 谈到了在本地执行器上发生的优化。在 shuffle 步骤之后,对记录进行粗略排序。现在每个分区都包含一系列键,但分区是未排序的。

现在您必须在本地对每个分区进行排序,这就是前缀优化有助于缓存局部性的地方。

【讨论】:

我明白了。是的,这是有道理的。因此,要么这个前缀优化已经内置在 spark-engine 中,要么应该以这样一种方式对其进行编码,以便使用这种优化是我需要探索的东西。但是,是的,我明白以上是不可能的,我倾向于那个结果。谢谢@Daniel。

以上是关于spark RDD 上的有序联合的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark - 多个 RDD 的交集

循环 PySpark 后的联合 RDD

org.elasticsearch.spark.rdd.api.java.javaesspark哪个包

多个小型 RDD 的高效联合

从 RDD 到联合数据帧 PySpark

数据集上的慢联合