如何在 Spark RDD 中选择一系列元素?
Posted
技术标签:
【中文标题】如何在 Spark RDD 中选择一系列元素?【英文标题】:How do I select a range of elements in Spark RDD? 【发布时间】:2014-08-31 20:47:06 【问题描述】:我想在 Spark RDD 中选择一系列元素。比如我有一个有一百个元素的RDD,我需要选择60到80个元素,我该怎么做呢?
我看到 RDD 有一个 take(i: int) 方法,它返回前 i 个元素。但是没有对应的方法来取最后的i个元素,或者从某个索引开始从中间开始的i个元素。
【问题讨论】:
我认为从 1.0.0 开始执行此操作的最自然方法是将 RDD 注册为表并使用 Spark SQL 将ROW_NUMBER()
或 RANK()
添加到数据集,然后 @ 987654324@ 所需的行。对于较小的 RDD,这是多余的,但这种方法应该对非常大的 RDD 有效。
这里是a discussion about this on the Spark User mailing list。
@NickChammas Checkout 更新答案
【参考方案1】:
对于那些偶然发现这个问题并寻找与 Spark 2.x 兼容的答案的人,您可以使用filterByRange
【讨论】:
【参考方案2】:我认为目前还没有有效的方法来做到这一点。但最简单的方法是使用filter()
,假设你有一个RDD,pairs
和键值对,你只想要60 到80 之间的元素就行了。
val 60to80 = pairs.filter
_ match
case (k,v) => k >= 60 && k <= 80
case _ => false //incase of invalid input
我认为通过使用sortByKey
并保存有关映射到每个分区的值范围的信息,将来可能会更有效地完成此操作。请记住,如果您计划多次查询范围,这种方法只会节省任何东西,因为排序显然很昂贵。
通过查看 spark 源,肯定可以使用 RangePartitioner
进行有效的范围查询:
// An array of upper bounds for the first (partitions - 1) partitions
private val rangeBounds: Array[K] =
这是RangePartitioner
的私有成员,知道所有分区的上限,因此很容易只查询必要的分区。看起来这是火花用户将来可能会看到的东西:SPARK-911
更新:更好的答案,基于我为 SPARK-911 编写的拉取请求。如果对 RDD 进行排序并且您多次查询它,它将有效地运行。
val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]];
val (lower, upper) = (10, 20)
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) =>
if (range.contains(i))
for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
else
Iterator.empty
for((k,v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")
如果将整个分区放在内存中是可以接受的,您甚至可以这样做。val glommedAndCached = sorted.glom()cache();
glommedAndCached.map(a => a.slice(a.search(lower),a.search(upper)+1)).collect()
search
不是成员 BTW 我只是做了一个具有二分查找功能的隐式类,此处未显示
【讨论】:
应该case (k,v) => k >= 60 || k <= 80
没有&&
而不是||
?
@Tom 哦,是的,看起来以前没有人发现它,已修复【参考方案3】:
以下应该可以得到范围。注意缓存会为你节省一些开销,因为 zipWithIndex 内部需要扫描 RDD 分区以获取每个分区中的元素数。
scala>val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3).cache
scala>val r2 = r1.zipWithIndex
scala>val r3 = r2.filter(x=> x._2>2 && x._2 < 4).map(x=>x._1)
scala>r3.foreach(println)
d
【讨论】:
您需要添加此代码,并重构您的代码:val r4 = r3.collect r4.foreach(println) 值得注意的是,zipWithIndex 并非纯粹是惰性的——即使对于文档中的示例,它也做了很多分布式处理。【参考方案4】:您的数据集有多大?您也许可以做您需要的事情:
data.take(80).drop(59)
这似乎效率低下,但对于中小型数据,应该可以。
是否可以通过其他方式解决此问题?从数据中间准确挑选某个范围的情况是什么? takeSample
会更好地为您服务吗?
【讨论】:
RDD 可能很大(数十到数百 GB)。takeSample
也不起作用,我需要一个范围(从...到)。
@user1698678 如何在如此庞大的集合中定义范围?鉴于 RDD 将分布在许多节点中,您将需要一种方法来索引您的数据。 getRange (from,to)
在分布式模型中不起作用。
@maasg:你是在告诉我范围只为小型 RDD 定义吗?或者对于小型和大型的定义不同? take() 方法似乎适用于所有尺寸,所以我不明白为什么 takeLast() 和 takeMiddle() 方法不能存在。
@user1698678 take()
是一个动作,它将强制在驱动程序处收集 RDD。它确实只适用于适合驾驶员记忆的小型设备。如果您没有索引或排序,range(...)
是不稳定的,所以我想您需要某种形式的键来进行选择。以上是关于如何在 Spark RDD 中选择一系列元素?的主要内容,如果未能解决你的问题,请参考以下文章