apache spark - 迭代地跳过并从 RDD 中获取

Posted

技术标签:

【中文标题】apache spark - 迭代地跳过并从 RDD 中获取【英文标题】:apache spark - iteratively skip and take from RDD 【发布时间】:2018-03-19 17:04:21 【问题描述】:

给定一个 RDD,对它进行排序然后在离散大小的块中使用它的最佳方法是什么?例如:

  JavaRDD<Integer> baseRdd = sc.parallelize(Arrays.asList(1,2,5,3,4));

  JavaRDD<Integer> sorted = baseRdd.sortBy(x -> x, true, 5);   

  // returns 1, 2   
  List<Integer> first = sorted.take(2);

  // returns 1, 2.  How to skip 2 and then take?
  List<Integer> second = sorted.take(2);

我真正想要的是在第一次调用take(2) 时使用1, 2,然后将某种“跳过”参数传递给第二个take(2) 以返回3, 4

由于当前 RDD 功能中似乎不存在“跳​​过”功能,将已排序的 RDD 拆分为可以独立操作的已知大小的块的最有效方法是什么?

【问题讨论】:

您只想要索引 (0, 1) 和 (2, 3) 处的元素还是所有 (n, n+1) 处的元素? 不只是 (n, n+1)。如果我有一个包含 75,000 个条目的 RDD,我希望在第一次调用 take() 时获得前 25,000 个条目,然后在第二次调用 take() 时获得条目 25001 到 50000,其余条目在第三个等等。数字 2在我最初的问题中,以及这里的数字 25,000 只是示例。 【参考方案1】:

为了提高效率,不要忘记您可以随时缓存您的 RDD。这将避免每次调用take 时都从文本文件中重新计算已排序的 RDD。由于我们将多次使用sorted RDD,我们将其缓存:

JavaRDD<Integer> sorted = baseRdd.sortBy(x -> x, true, 5).cache();

然后要从给定索引中获取元素到另一个索引,我们可以结合zipWithIndexfilterzipWithIndex 将 RDD 转换为元组的 RDD,其中元组的第一部分是已排序 RDD 的元素,第二部分是它的索引。一旦我们有了这些索引记录,我们就可以根据它们的索引过滤它们(假设 offset = 2 和 window = 2):

List<Integer> nth =
  sorted.zipWithIndex()
  .filter(x -> x._2() >= offset && x._2() < offset + window)
  .map(x -> x._1())
  .collect();

返回:

[3, 4]

最终的结果是:

JavaRDD<Integer> sorted = baseRdd.sortBy(x -> x, true, 5).zipWithIndex().cache();

Integer offset = 2;
Integer window = 2;

List<Integer> nth =
  sorted
  .filter(x -> x._2() >= offset && x._2() < offset + window)
  .map(x -> x._1())
  .collect();

这里我只在使用 index 压缩后才缓存 rdd,以免每次在不同窗口上执行此操作时都执行压缩部分。

然后,您可以根据您要如何创建不同的窗口列表,将此 nth creation sn-p 映射到循环或映射中。

【讨论】:

【参考方案2】:
rdd1=sc.parallelize((1,2,3,4,5,6,7,8))
rdd2=rdd1.take(2)

现在你根据 rdd2 过滤你的初始 rdd

rdd1.filter(lambda line:line not in rdd2).take(2)

这给出 [3, 4]

使用 PySpark

【讨论】:

这需要一个复杂度为 O(nlogn) 的排序函数并将其转换为 O(n^2) 问题。我的 RDD 中有数百万个条目,我无法做到这一点。 我的另一个选择是使用.zipWithUniqueId().filter(lambda x : x[1]&gt;50) 猜猜它不会满足您的需求,祝你好运! 谢谢安德烈,您的第二个选项大致类似于下面@Xavier 的回答。似乎是个好方法!

以上是关于apache spark - 迭代地跳过并从 RDD 中获取的主要内容,如果未能解决你的问题,请参考以下文章

如何有条件地跳过python中for循环中的迭代步骤数?

有条件地跳过案例

我可以有条件地跳过在同一个文件中加载“更多”红宝石代码吗?

rails .each 无缘无故地跳过第二个元素

运行测试时如何优雅地跳过 express-jwt 中间件?

字符串长度的for循环奇怪地跳过了一些字符