如何在 Spark RDD 中选择一系列元素?

Posted

技术标签:

【中文标题】如何在 Spark RDD 中选择一系列元素?【英文标题】:How do I select a range of elements in Spark RDD? 【发布时间】:2014-08-31 20:47:06 【问题描述】:

我想在 Spark RDD 中选择一系列元素。比如我有一个有一百个元素的RDD,我需要选择60到80个元素,我该怎么做呢?

我看到 RDD 有一个 take(i: int) 方法,它返回前 i 个元素。但是没有对应的方法来取最后的i个元素,或者从某个索引开始从中间开始的i个元素。

【问题讨论】:

我认为从 1.0.0 开始执行此操作的最自然方法是将 RDD 注册为表并使用 Spark SQL 将 ROW_NUMBER()RANK() 添加到数据集,然后 @ 987654324@ 所需的行。对于较小的 RDD,这是多余的,但这种方法应该对非常大的 RDD 有效。 这里是a discussion about this on the Spark User mailing list。 @NickChammas Checkout 更新答案 【参考方案1】:

对于那些偶然发现这个问题并寻找与 Spark 2.x 兼容的答案的人,您可以使用filterByRange

【讨论】:

【参考方案2】:

我认为目前还没有有效的方法来做到这一点。但最简单的方法是使用filter(),假设你有一个RDD,pairs 和键值对,你只想要60 到80 之间的元素就行了。

val 60to80 = pairs.filter 
    _ match 
        case (k,v) => k >= 60 && k <= 80
        case _ => false //incase of invalid input
    

我认为通过使用sortByKey 并保存有关映射到每个分区的值范围的信息,将来可能会更有效地完成此操作。请记住,如果您计划多次查询范围,这种方法只会节省任何东西,因为排序显然很昂贵。

通过查看 spark 源,肯定可以使用 RangePartitioner 进行有效的范围查询:

// An array of upper bounds for the first (partitions - 1) partitions
  private val rangeBounds: Array[K] = 

这是RangePartitioner 的私有成员,知道所有分区的上限,因此很容易只查询必要的分区。看起来这是火花用户将来可能会看到的东西:SPARK-911

更新:更好的答案,基于我为 SPARK-911 编写的拉取请求。如果对 RDD 进行排序并且您多次查询它,它将有效地运行。

val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]];
val (lower, upper) = (10, 20)
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => 
  if (range.contains(i))
    for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
  else
    Iterator.empty

for((k,v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")

如果将整个分区放在内存中是可以接受的,您甚至可以这样做。val glommedAndCached = sorted.glom()cache(); glommedAndCached.map(a => a.slice(a.search(lower),a.search(upper)+1)).collect()

search 不是成员 BTW 我只是做了一个具有二分查找功能的隐式类,此处未显示

【讨论】:

应该case (k,v) =&gt; k &gt;= 60 || k &lt;= 80 没有&amp;&amp; 而不是|| @Tom 哦,是的,看起来以前没有人发现它,已修复【参考方案3】:

以下应该可以得到范围。注意缓存会为你节省一些开销,因为 zipWithIndex 内部需要扫描 RDD 分区以获取每个分区中的元素数。

scala>val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3).cache
scala>val r2 = r1.zipWithIndex
scala>val r3 = r2.filter(x=> x._2>2 && x._2 < 4).map(x=>x._1)
scala>r3.foreach(println)
d

【讨论】:

您需要添加此代码,并重构您的代码:val r4 = r3.collect r4.foreach(println) 值得注意的是,zipWithIndex 并非纯粹是惰性的——即使对于文档中的示例,它也做了很多分布式处理。【参考方案4】:

您的数据集有多大?您也许可以做您需要的事情:

data.take(80).drop(59)

这似乎效率低下,但对于中小型数据,应该可以。

是否可以通过其他方式解决此问题?从数据中间准确挑选某个范围的情况是什么? takeSample 会更好地为您服务吗?

【讨论】:

RDD 可能很大(数十到数百 GB)。 takeSample 也不起作用,我需要一个范围(从...到)。 @user1698678 如何在如此庞大的集合中定义范围?鉴于 RDD 将分布在许多节点中,您将需要一种方法来索引您的数据。 getRange (from,to) 在分布式模型中不起作用。 @maasg:你是在告诉我范围只为小型 RDD 定义吗?或者对于小型和大型的定义不同? take() 方法似乎适用于所有尺寸,所以我不明白为什么 takeLast() 和 takeMiddle() 方法不能存在。 @user1698678 take() 是一个动作,它将强制在驱动程序处收集 RDD。它确实只适用于适合驾驶员记忆的小型设备。如果您没有索引或排序,range(...) 是不稳定的,所以我想您需要某种形式的键来进行选择。

以上是关于如何在 Spark RDD 中选择一系列元素?的主要内容,如果未能解决你的问题,请参考以下文章

Spark如何将RDD的前几个元素存入HDFS中。

Spark RDD在Spark中的地位和作用如何?

Spark RDD在Spark中的地位和作用如何?

Spark RDD在Spark中的地位和作用如何?

RDD take()方法如何在内部工作?

spark系列-5RDDDataFrameDataset的区别和各自的优势