如何将一个 RDD 拆分为两个或多个 RDD?

Posted

技术标签:

【中文标题】如何将一个 RDD 拆分为两个或多个 RDD?【英文标题】:How do I split an RDD into two or more RDDs? 【发布时间】:2016-01-03 09:57:04 【问题描述】:

我正在寻找一种将 RDD 拆分为两个或多个 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?,它仍然是单个 RDD。

如果您熟悉 SAS,则如下所示:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;

这导致了两个不同的数据集。它必须立即坚持才能得到我想要的结果......

【问题讨论】:

【参考方案1】:

正如上面提到的其他海报,没有单一的本地 RDD 转换可以拆分 RDD,但这里有一些“多路复用”操作可以有效地模拟 RDD 上的各种“拆分”,没有 多次阅读:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

一些特定于随机分裂的方法:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

方法可从开源 silex 项目中获得:

https://github.com/willb/silex

一篇解释它们如何工作的博文:

http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/

def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
  persist: StorageLevel): Seq[RDD[U]] = 
  val mux = self.mapPartitionsWithIndex  case (id, itr) =>
    Iterator.single(f(id, itr))
  .persist(persist)
  Vector.tabulate(n)  j => mux.mapPartitions  itr => Iterator.single(itr.next()(j))  


def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
  persist: StorageLevel): Seq[RDD[U]] = 
  val mux = self.mapPartitionsWithIndex  case (id, itr) =>
    Iterator.single(f(id, itr))
  .persist(persist)
  Vector.tabulate(n)  j => mux.mapPartitions  itr => itr.next()(j).toIterator  

正如其他地方所提到的,这些方法确实涉及内存与速度的权衡,因为它们通过“急切地”而不是“懒惰地”计算整个分区结果来运行。因此,这些方法可能会在大分区上遇到内存问题,而传统的惰性转换则不会。

【讨论】:

值得在另一个答案上重新陈述部分对话:多路复用允许通过单次计算提高效率,但它通过将结果存储在“非惰性”容器中来实现,因此(取决于正在计算的内容)与传统的多通道变化相比,常驻内存可能会增加,其中计算可能是惰性的。换句话说,多路复用购买会随着内存使用量的增加而提高计算效率 这个评论作为答案的一部分难道不是更好吗?【参考方案2】:

不可能从单个转换中产生多个 RDD*。如果要拆分 RDD,则必须为每个拆分条件应用 filter。例如:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

如果你只有一个二元条件并且计算成本很高,你可能更喜欢这样的东西:

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

这意味着只有一次谓词计算,但需要对所有数据进行额外的传递。

需要注意的是,只要输入 RDD 被正确缓存并且没有关于数据分布的额外假设,重复过滤器和嵌套 if-else 的 for 循环之间的时间复杂度没有显着差异。

对于 N 个元素和 M 个条件,您必须执行的操作数显然与 N 乘以 M 成正比。在 for 循环的情况下,它应该更接近 (N + MN) / 2,并且重复过滤器正好是 NM,但在归根结底,它只不过是 O(NM)。您可以查看我与 Jason Lenderman 的讨论**,了解一些利弊。

在非常高的层次上,您应该考虑两件事:

    Spark 转换是惰性的,直到您执行一个操作,您的 RDD 才会具体化

    为什么重要?回到我的例子:

     rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
    

    如果以后我决定只需要rdd_odd,那么就没有理由实现rdd_even

    如果您查看 SAS 示例来计算 work.split2,您需要同时实现输入数据和 work.split1

    RDD 提供声明式 API。当您使用filtermap 时,如何执行此操作完全取决于 Spark 引擎。只要传递给转换的函数没有副作用,它就会为优化整个管道创造多种可能性。

归根结底,这个案例并不足以证明它自己的转变是合理的。

这个带有过滤器模式的映射实际上用在了核心 Spark 中。请参阅我对How does Sparks RDD.randomSplit actually split the RDD 的回答和randomSplit 方法的relevant part。

如果唯一的目标是实现输入拆分,则可以将partitionBy 子句用于DataFrameWriter 文本输出格式:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy($"key").format("text").save(...)

* Spark 中只有 3 种基本的转换类型:

RDD[T] => RDD[T] RDD[T] => RDD[U] (RDD[T], RDD[U]) => RDD[W]

其中 T、U、W 可以是原子类型或 products / 元组 (K, V)。任何其他操作都必须使用上述的某种组合来表达。详情可查看the original RDD paper。

** https://chat.***.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** 另见Scala Spark: Split collection into several RDD?

【讨论】:

非常有用:)。我想知道为什么在 spark 中没有等效的分区方法。有什么想法吗? @Rakshith 简单。而且由于我们查看的是血统,因此无论如何都会丢弃一个分支。 有一些方法可以在不使用 'filter' 的情况下拆分 RDD,请参阅:***.com/a/37956034/3669757 @eje 前段时间Jason Lenderman 提出了类似的方法,并且已在此答案中链接。我看到的问题是假设数据适合执行程序内存,这通常是无法做到的。 @zero323,所有分区数据都必须适合执行程序内存,至少在计算时是这样。多路复用的 RDD 也不例外。可以指定存储类别来控制计算后是否缓存、溢出等。【参考方案3】:

一种方法是使用自定义分区器根据您的过滤条件对数据进行分区。这可以通过扩展Partitioner 并实现类似于RangePartitioner 的东西来实现。

然后可以使用映射分区从分区的 RDD 构造多个 RDD,而无需读取所有数据。

val filtered = partitioned.mapPartitions  iter => 

  new Iterator[Int]()
    override def hasNext: Boolean = 
      if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) 
        false
       else 
        iter.hasNext
      
    

    override def next():Int = iter.next()
  

请注意,过滤后的 RDD 中的分区数将与分区 RDD 中的数相同,因此应使用合并来减少此数量并删除空分区。

【讨论】:

有点,每次调用 mapPartitions 时它都会为每个分区运行任务,但是如果只读取一次分区内的实际数据 好的,但是如果我立即坚持下去,我将只触及每个观察一次,并且我将有两个 RDD 作为不同的输出,对吗? 是的,就是这样。 @JemTucker 您可以使用mapPartitionsWithIndex 而不是访问TaskContext。在旁注中,每个观察结果只会被触及一次是不正确的。由于它需要洗牌,这本身就是不好的,因此至少有一部分数据将被读取、序列化、传输、反序列化和可选地写入。这不仅意味着数据被多次访问,而且以更昂贵的方式访问。 这确实有道理,但是在使用这种方法过滤大量 RDD 时,我取得了良好的性能。我同意洗牌很昂贵,但通常在前面的步骤中强制洗牌,因此可以在这些步骤中使用自定义分区器来有效地对分区进行排序,从而避免使用一组过滤器。【参考方案4】:

如果您使用 randomSplit API call 拆分 RDD,您将返回一个 RDD 数组。

如果要返回 5 个 RDD,请传入 5 个权重值。

例如

val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)

splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)

【讨论】:

这不是和@zero323 的解决方案一样吗?他说它会多次阅读,这是我试图避免的

以上是关于如何将一个 RDD 拆分为两个或多个 RDD?的主要内容,如果未能解决你的问题,请参考以下文章

如何将一个RDD拆分成多个RDD并相互比较

火花。将 RDD 拆分为批次

如何通过 Delimiter 拆分 Spark RDD 的行

将 DataFrame 转换为 RDD 并将 RDD 动态拆分为与 DataFrame 相同数量的 Columns

在pySpark中将RDD拆分为n个部分

拆分数据数据类型后的Spark RDD如何在不更改数据类型的情况下拆分