Apache Spark - 多个 RDD 的交集
Posted
技术标签:
【中文标题】Apache Spark - 多个 RDD 的交集【英文标题】:Apache Spark - Intersection of Multiple RDDs 【发布时间】:2016-10-27 15:14:10 【问题描述】:在 apache spark 中,可以使用sparkContext.union()
方法有效地联合多个 RDD。如果有人想与多个 RDD 相交,是否有类似的东西?我在 sparkContext 方法中进行了搜索,但在其他任何地方都找不到任何东西。一种解决方案可能是合并 rdds,然后检索重复项,但我认为它不会那么有效。假设我有以下带有键/值对集合的示例:
val rdd1 = sc.parallelize(Seq((1,1.0),(2,1.0)))
val rdd2 = sc.parallelize(Seq((1,2.0),(3,4.0),(3,1.0)))
我想检索一个包含以下元素的新集合:
(1,2.0) (1,1.0)
但当然是多个 rdds 而不仅仅是两个。
【问题讨论】:
为什么要交叉多个rdds?依据是什么? 我想现在我的问题更好理解了。 【参考方案1】:试试:
val rdds = Seq(
sc.parallelize(Seq(1, 3, 5)),
sc.parallelize(Seq(3, 5)),
sc.parallelize(Seq(1, 3))
)
rdds.map(rdd => rdd.map(x => (x, None))).reduce((x, y) => x.join(y).keys.map(x => (x, None))).keys
【讨论】:
这行得通,谢谢。但是,如果每个集合都有键/值对而不是整数,那是行不通的,对吧?此外,此方法使用连接。通常,哈希分区器是一种很好的做法,对吧? 只要可以散列元素就应该可以工作。除非你想要一些不同的输出。不明白第二个问题。 在 rdds 之间使用 join 之前的一个好习惯是使用 Hash Partitioner 来避免多余的重新洗牌并提高效率。在您的代码中,您不使用任何哈希分区。 重新分区根据定义是随机的,默认加入集合中的最佳分区。【参考方案2】:RDD上有一个intersection method,但它只需要另一个RDD:
def intersection(other: RDD[T]): RDD[T]
让我们根据这个来实现你想要的方法。
def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] =
rdds.reduce case (left, right) => left.intersection(right)
如果你看过 Spark joins 的实现,你可以通过将最大的 RDD 放在首位来优化执行:
def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] =
rdds.sortBy(rdd => -1 * rdd.partitions.length)
.reduce case (left, right) => left.intersection(right)
编辑:看起来我误读了您的示例:您的文本看起来像是在搜索 rdd.union 的反向行为,但您的示例暗示您希望按键相交。我的回答没有解决这种情况。
【讨论】:
以上是关于Apache Spark - 多个 RDD 的交集的主要内容,如果未能解决你的问题,请参考以下文章
类型不匹配;找到:org.apache.spark.sql.DataFrame 需要:org.apache.spark.rdd.RDD
值 toDS 不是 org.apache.spark.rdd.RDD 的成员
值 toDF 不是 org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 的成员