火花。将 RDD 拆分为批次

Posted

技术标签:

【中文标题】火花。将 RDD 拆分为批次【英文标题】:Spark. Split RDD into batches 【发布时间】:2018-04-26 05:45:08 【问题描述】:

我有 RDD,其中每条记录都是 int:

[0,1,2,3,4,5,6,7,8]

我需要做的就是将这个 RDD 分成多个批次。 IE。制作另一个 RDD,其中每个元素都是固定大小的元素列表:

[[0,1,2], [3,4,5], [6,7,8]]

这听起来微不足道,但是,这几天我很困惑,除了以下解决方案之外找不到任何东西:

    使用 ZipWithIndex 枚举 RDD 中的记录:

    [0,1,2,3,4,5] -> [(0, 0),(1, 1),(2, 2),(3, 3),(4, 4),(5, 5)]

    使用 map() 遍历这个 RDD 并计算像 index = int(index / batchSize) 这样的索引

    [1,2,3,4,5,6] -> [(0, 0),(0, 1),(0, 2),(1, 3),(1, 4),(1, 5)]

    然后按生成的索引分组。

    [(0, [0,1,2]), (1, [3,4,5])]

这将得到我需要的东西,但是,我不想在这里使用 group by。当您使用普通的 Map Reduce 或 Apache Crunch 之类的抽象时,这很简单。但是有没有办法在 Spark 中产生类似的结果而不使用大量的 group by?

【问题讨论】:

您可以 a) 应用多个过滤器; b) 使用自定义分区器并从每个分区创建 RDD。虽然我无法想象你为什么需要固定大小的 RDD。 @khachik 您能否详细说明“应用多个过滤器”和“实现自定义分区器”?我不需要固定大小的 RDD。我需要 RDD 中的每条记录都是记录数组(批处理)。这是必需的,因为我的数学模型消耗的不是单条记录,而是一批记录并返回一批预测。 【参考方案1】:

您没有清楚地解释为什么需要固定大小的 RDD,这取决于您要完成的工作可能会有更好的解决方案,但要回答已提出的问题,我看到以下选项: 1)根据项目数量和批量大小实施过滤器。例如,如果您在原始 RDD 中有 1000 个项目并希望将它们分成 10 个批次,您最终将应用 10 个过滤器,第一个检查索引是否为 [0, 99],第二个检查索引是否为 [100, 199]等等。应用每个过滤器后,您将拥有一个 RDD。需要注意的是,原始 RDD 可能会在过滤之前被缓存。优点:每个生成的 RDD 可以单独处理,不必在一个节点上完全分配。缺点:这种方法会随着批次数量的增加而变慢。 2) 逻辑上与此类似,但不是过滤器,您只需实现一个自定义分区器,该分区器根据索引(键)返回分区 id,如下所述:Custom partitioner for equally sized partitions。优点:比过滤器快。缺点:每个分区必须适合一个节点。 3)如果原始RDD中的顺序不重要,只需要大致相等的分块,你可以合并/重新分区,这里解释https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html

【讨论】:

非常感谢您的详细解释。让我为您提供更多背景信息。我有一些机器学习模型,它最多需要 1K 条记录作为输入,做一些魔术并返回相同数量的记录。我需要做的就是使用此模型对我的 RDD 中的所有记录进行“评分”。所以基本上这就是为什么我需要将原始 RDD 分成块。每个块应包含不超过 1000 条记录。它是否不符合 Spark 范式?我大部分时间都在使用 Crunch / MapReduce,那里没有这样的问题。提前致谢【参考方案2】:

也许你可以使用aggregateByKey,在这种情况下它比groupByKey 更快更轻量级。 我尝试在 10 个 executor 上将 5 亿条数据拆分成 256 个大小的批次,只需要半个小时就可以完成。

data = data.zipWithIndex().map(lambda x: (x[1] / 256, x[0]))
data = data.aggregateByKey(list(), lambda x, y: x + [y], add)

有关详细信息,请参阅 Spark difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey

【讨论】:

以上是关于火花。将 RDD 拆分为批次的主要内容,如果未能解决你的问题,请参考以下文章

将一个 RDD 拆分为多个 RDDS

如何将一个 RDD 拆分为两个或多个 RDD?

将 DataFrame 转换为 RDD 并将 RDD 动态拆分为与 DataFrame 相同数量的 Columns

在 Tensorflow 中将数据拆分为批次进行分类

在pySpark中将RDD拆分为n个部分

交货单批次拆分