Spark 地图端聚合:仅每个分区?

Posted

技术标签:

【中文标题】Spark 地图端聚合:仅每个分区?【英文标题】:Spark map-side aggregation: Per partition only? 【发布时间】:2020-12-08 05:35:18 【问题描述】:

我一直在阅读关于 map-side reduce/aggregation 的内容,但我似乎无法清楚地理解一件事。它是仅在每个分区发生还是范围更广?我的意思是如果同一个key出现在同一个Executor处理的多个分区中,它是否也会减少跨分区?

现在我还有几个问题,具体取决于答案是否为“仅每个分区”。

假设它是每个分区:

在我知道我的数据集很适合在洗牌之前进一步减少跨本地分区的情况时,有哪些好方法可以处理。例如。我为每个 Executor 处理 10 个分区,并且我知道它们都包含许多重叠的键,因此它可能会减少到仅 1/10。基本上我正在寻找一个本地的 reduce() (就像很多人一样)。想到 Coalesce()ing 他们,有什么常用的方法来处理这个问题?

假设它跨分区减少:

每个执行者都会发生这种情况吗?分配给同一个 Worker 节点的 Executor 怎么样,它们是否有能力减少彼此的分区,以识别它们位于同一位置?

它是否在 Executor 中的每个核心(线程)中发生?我问这个的原因是因为我查看的一些图表似乎显示了执行程序的每个核心/线程的映射器,看起来来自该核心的所有任务的结果都转到了单个 Mapper 实例。 (如果我没记错的话,shuffle 会写)

它是确定性的吗?例如。如果我有记录,假设 A=1 在由同一个 Executor 处理的 10 个分区中,我可以期望看到 A=10 用于读取 shuffle 输出的任务吗?还是尽力而为,例如它仍然会减少,但存在一些限制(缓冲区大小等),因此随机读取可能会遇到 A=4 和 A=6。

【问题讨论】:

【参考方案1】:

Map 端聚合类似于 Hadoop combiner 方法。在本地减少对 Spark 也有意义,并且意味着更少的洗牌。所以它适用于每个分区 - 正如你所说的那样。

当应用归约功能时,例如一个 groupBy 和 sum,然后最初发生洗牌,以便键在同一个分区中,这样就可以发生上述情况(自动使用数据帧)。但是,比如说,一个简单的计数也会在本地减少,然后通过将中间结果返回给驱动程序来计算总计数。

因此,结果会在 Executors 的 Driver 上合并 - 取决于实际请求的内容,例如收集,打印一个计数。但是如果在某种性质的聚合之后写出,那么减少仅限于 Worker 上的 Executor。

【讨论】:

谢谢,我等了一会儿,看看是否会有更多答案。不幸的是,如果我没有遗漏一些东西,除非“它适用于每个分区”这句话的意思是“它只适用于每个分区,周期”,这对我来说仍然有些模糊。让我进一步简化,我有 rdd.mapPartitions().reduceByKey(),mapPartitions() 输出为 A=1 和 A=2,用于减少 - 洗牌之前的 2 个单独分区(在同一个 Executor 上)。是否可以在这 2 个不同的分区上应用 map-side reduce,从而可能会为 shuffle 写入 A=3,还是根本不跨分区应用? 这个概念是PER PARTITION。分区是在同一个工作人员上还是由同一个执行者提供服务都没有关系。然后最终 AGGRegation 将覆盖所有这些分区。答案是正确的。

以上是关于Spark 地图端聚合:仅每个分区?的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming - groupByKey 按分区单独

Spark - 地图转换

Spark RDD-行动算子

spark aggregate函数

Spark-core:Spark RDD的高级算子

如何将地图转换为 Spark 的 RDD