apache spark - 迭代地跳过并从 RDD 中获取
Posted
技术标签:
【中文标题】apache spark - 迭代地跳过并从 RDD 中获取【英文标题】:apache spark - iteratively skip and take from RDD 【发布时间】:2018-03-19 17:04:21 【问题描述】:给定一个 RDD,对它进行排序然后在离散大小的块中使用它的最佳方法是什么?例如:
JavaRDD<Integer> baseRdd = sc.parallelize(Arrays.asList(1,2,5,3,4));
JavaRDD<Integer> sorted = baseRdd.sortBy(x -> x, true, 5);
// returns 1, 2
List<Integer> first = sorted.take(2);
// returns 1, 2. How to skip 2 and then take?
List<Integer> second = sorted.take(2);
我真正想要的是在第一次调用take(2)
时使用1, 2
,然后将某种“跳过”参数传递给第二个take(2)
以返回3, 4
?
由于当前 RDD 功能中似乎不存在“跳过”功能,将已排序的 RDD 拆分为可以独立操作的已知大小的块的最有效方法是什么?
【问题讨论】:
您只想要索引 (0, 1) 和 (2, 3) 处的元素还是所有 (n, n+1) 处的元素? 不只是 (n, n+1)。如果我有一个包含 75,000 个条目的 RDD,我希望在第一次调用 take() 时获得前 25,000 个条目,然后在第二次调用 take() 时获得条目 25001 到 50000,其余条目在第三个等等。数字 2在我最初的问题中,以及这里的数字 25,000 只是示例。 【参考方案1】:为了提高效率,不要忘记您可以随时缓存您的 RDD。这将避免每次调用take
时都从文本文件中重新计算已排序的 RDD。由于我们将多次使用sorted
RDD,我们将其缓存:
JavaRDD<Integer> sorted = baseRdd.sortBy(x -> x, true, 5).cache();
然后要从给定索引中获取元素到另一个索引,我们可以结合zipWithIndex
和filter
。 zipWithIndex
将 RDD 转换为元组的 RDD,其中元组的第一部分是已排序 RDD 的元素,第二部分是它的索引。一旦我们有了这些索引记录,我们就可以根据它们的索引过滤它们(假设 offset = 2 和 window = 2):
List<Integer> nth =
sorted.zipWithIndex()
.filter(x -> x._2() >= offset && x._2() < offset + window)
.map(x -> x._1())
.collect();
返回:
[3, 4]
最终的结果是:
JavaRDD<Integer> sorted = baseRdd.sortBy(x -> x, true, 5).zipWithIndex().cache();
Integer offset = 2;
Integer window = 2;
List<Integer> nth =
sorted
.filter(x -> x._2() >= offset && x._2() < offset + window)
.map(x -> x._1())
.collect();
这里我只在使用 index 压缩后才缓存 rdd,以免每次在不同窗口上执行此操作时都执行压缩部分。
然后,您可以根据您要如何创建不同的窗口列表,将此 nth
creation sn-p 映射到循环或映射中。
【讨论】:
【参考方案2】:rdd1=sc.parallelize((1,2,3,4,5,6,7,8))
rdd2=rdd1.take(2)
现在你根据 rdd2 过滤你的初始 rdd
rdd1.filter(lambda line:line not in rdd2).take(2)
这给出 [3, 4]
使用 PySpark
【讨论】:
这需要一个复杂度为 O(nlogn) 的排序函数并将其转换为 O(n^2) 问题。我的 RDD 中有数百万个条目,我无法做到这一点。 我的另一个选择是使用.zipWithUniqueId().filter(lambda x : x[1]>50)
猜猜它不会满足您的需求,祝你好运!
谢谢安德烈,您的第二个选项大致类似于下面@Xavier 的回答。似乎是个好方法!以上是关于apache spark - 迭代地跳过并从 RDD 中获取的主要内容,如果未能解决你的问题,请参考以下文章